diff --git a/api/index.js b/api/index.js index 3b9c905e4..bf88079ff 100644 --- a/api/index.js +++ b/api/index.js @@ -664,7 +664,10 @@ function ESAPI (opts) { return apis function handleError (err, callback) { - if (callback) return callback(err, result) + if (callback) { + process.nextTick(callback, err, result) + return { then: noop, catch: noop, abort: noop } + } return Promise.reject(err) } @@ -699,4 +702,6 @@ function lazyLoad (file, opts) { } } +function noop () {} + module.exports = ESAPI diff --git a/lib/Transport.js b/lib/Transport.js index b32ae8c9b..2d7e2285f 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -124,14 +124,28 @@ class Transport { ? 0 : (typeof options.maxRetries === 'number' ? options.maxRetries : this.maxRetries) const compression = options.compression !== undefined ? options.compression : this.compression var request = { abort: noop } + const transportReturn = { + then (onFulfilled, onRejected) { + return p.then(onFulfilled, onRejected) + }, + catch (onRejected) { + return p.catch(onRejected) + }, + abort () { + meta.aborted = true + request.abort() + debug('Aborting request', params) + return this + } + } const makeRequest = () => { if (meta.aborted === true) { - return callback(new RequestAbortedError(), result) + return process.nextTick(callback, new RequestAbortedError(), result) } meta.connection = this.getConnection({ requestId: meta.request.id }) if (meta.connection == null) { - return callback(new NoLivingConnectionsError(), result) + return process.nextTick(callback, new NoLivingConnectionsError(), result) } this.emit('request', null, result) // perform the actual http request @@ -266,7 +280,8 @@ class Transport { try { params.body = this.serializer.serialize(params.body) } catch (err) { - return callback(err, result) + process.nextTick(callback, err, result) + return transportReturn } } @@ -280,7 +295,8 @@ class Transport { try { params.body = this.serializer.ndserialize(params.bulkBody) } catch (err) { - return callback(err, result) + process.nextTick(callback, err, result) + return transportReturn } } else { params.body = params.bulkBody @@ -333,20 +349,7 @@ class Transport { makeRequest() } - return { - then (onFulfilled, onRejected) { - return p.then(onFulfilled, onRejected) - }, - catch (onRejected) { - return p.catch(onRejected) - }, - abort () { - meta.aborted = true - request.abort() - debug('Aborting request', params) - return this - } - } + return transportReturn } getConnection (opts) { diff --git a/test/unit/api.test.js b/test/unit/api.test.js index ad61a71e1..13e1361fd 100644 --- a/test/unit/api.test.js +++ b/test/unit/api.test.js @@ -299,6 +299,22 @@ test('ConfigurationError (promises)', t => { }) }) +test('The callback with a sync error should be called in the next tick', t => { + t.plan(4) + + const client = new Client({ + node: 'http://localhost:9200' + }) + + const transportReturn = client.index({ body: { foo: 'bar' } }, (err, result) => { + t.ok(err instanceof errors.ConfigurationError) + }) + + t.type(transportReturn.then, 'function') + t.type(transportReturn.catch, 'function') + t.type(transportReturn.abort, 'function') +}) + if (Number(process.version.split('.')[0].slice(1)) >= 8) { require('./api-async')(test) } diff --git a/test/unit/transport.test.js b/test/unit/transport.test.js index 18a62a2ce..169b2df98 100644 --- a/test/unit/transport.test.js +++ b/test/unit/transport.test.js @@ -2537,3 +2537,65 @@ test('Lowercase headers utilty', t => { t.strictEqual(lowerCaseHeaders(undefined), undefined) }) + +test('The callback with a sync error should be called in the next tick - json', t => { + t.plan(4) + const pool = new ConnectionPool({ Connection }) + pool.addConnection('http://localhost:9200') + + const transport = new Transport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: false + }) + + const body = { a: true } + body.o = body + + const transportReturn = transport.request({ + method: 'POST', + path: '/hello', + body + }, (err, { body }) => { + t.ok(err instanceof SerializationError) + }) + + t.type(transportReturn.then, 'function') + t.type(transportReturn.catch, 'function') + t.type(transportReturn.abort, 'function') +}) + +test('The callback with a sync error should be called in the next tick - ndjson', t => { + t.plan(4) + const pool = new ConnectionPool({ Connection }) + pool.addConnection('http://localhost:9200') + + const transport = new Transport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: false + }) + + const field = { a: true } + field.o = field + + const transportReturn = transport.request({ + method: 'POST', + path: '/hello', + bulkBody: [field] + }, (err, { body }) => { + t.ok(err instanceof SerializationError) + }) + + t.type(transportReturn.then, 'function') + t.type(transportReturn.catch, 'function') + t.type(transportReturn.abort, 'function') +})