From 8ce9f970f054d3d21fc9d523a91265d01ee9f53d Mon Sep 17 00:00:00 2001 From: delvedor Date: Thu, 25 Oct 2018 17:10:00 +0200 Subject: [PATCH] WIP: initial prototype - Added randomizeHost option - Added ignore status code option - Updated error classes - Added ndjson support - Retry on 502/3/4 --- index.js | 2 ++ lib/ConnectionPool.js | 13 ++++++++++++- lib/Serializer.js | 12 ++++++++++++ lib/Transport.js | 42 ++++++++++++++++++++++++++++++++++-------- lib/errors.js | 15 +++++++++------ 5 files changed, 69 insertions(+), 15 deletions(-) diff --git a/index.js b/index.js index 0d62f44ad..02dc24202 100644 --- a/index.js +++ b/index.js @@ -52,6 +52,7 @@ class Client extends EventEmitter { sniffEndpoint: '_nodes/_all/http', sniffOnConnectionFault: false, resurrectStrategy: 'ping', + randomizeHost: true, ssl: null, agent: null }, opts) @@ -61,6 +62,7 @@ class Client extends EventEmitter { this[kConnectionPool] = new options.ConnectionPool({ pingTimeout: opts.pingTimeout, resurrectStrategy: opts.resurrectStrategy, + randomizeHost: opts.randomizeHost, selector: this[kSelector], ssl: options.ssl, agent: null diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index 37bed21e0..8f52a1a77 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -24,6 +24,7 @@ class ConnectionPool { // the timeout doesn't increase this.resurrectTimeoutCutoff = 5 this.pingTimeout = opts.pingTimeout + this.randomizeHost = opts.randomizeHost const resurrectStrategy = opts.resurrectStrategy || 'ping' this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy] @@ -141,7 +142,8 @@ class ConnectionPool { this.dead.splice(this.dead.indexOf(id), 1) connection.status = Connection.statuses.ALIVE this.connections.set(id, connection) - callback(null, connection) + // eslint-disable-next-line standard/no-callback-literal + callback(true, connection) } } @@ -187,6 +189,9 @@ class ConnectionPool { } this.connections.set(connection.id, connection) this.alive.push(connection.id) + if (this.randomizeHost === true) { + this.alive = shuffleArray(this.alive) + } return connection } @@ -274,4 +279,10 @@ ConnectionPool.resurrectStrategies = { optimistic: 2 } +// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4 +const shuffleArray = arr => arr + .map(a => [Math.random(), a]) + .sort((a, b) => a[0] - b[0]) + .map(a => a[1]) + module.exports = ConnectionPool diff --git a/lib/Serializer.js b/lib/Serializer.js index ea37ed513..52125ea29 100644 --- a/lib/Serializer.js +++ b/lib/Serializer.js @@ -23,6 +23,18 @@ class Serializer { } return object } + + ndserialize (array) { + debug('ndserialize', array) + if (Array.isArray(array) === false) { + throw new SerializationError('The argument provided is not an array') + } + var ndjson = '' + for (var i = 0, len = array.length; i < len; i++) { + ndjson += this.serialize(array[i]) + '\n' + } + return ndjson + } } module.exports = Serializer diff --git a/lib/Transport.js b/lib/Transport.js index eaa9a1ca2..6885d143c 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -40,6 +40,7 @@ class Transport { return callback(new NoLivingConnectionsError('There are not living connections')) } + // handle json body if (params.body != null) { try { params.body = this.serializer.serialize(params.body) @@ -49,6 +50,16 @@ class Transport { params.headers = params.headers || {} params.headers['Content-Type'] = 'application/json' params.headers['Content-Length'] = '' + Buffer.byteLength(params.body) + // handle ndjson body + } else if (params.bulkBody != null) { + try { + params.body = this.serializer.ndserialize(params.bulkBody) + } catch (err) { + return callback(err) + } + params.headers = params.headers || {} + params.headers['Content-Type'] = 'application/x-ndjson' + params.headers['Content-Length'] = '' + Buffer.byteLength(params.body) } params.timeout = params.timeout || this.requestTimeout @@ -67,8 +78,8 @@ class Transport { } const error = err.message === 'Request timed out' - ? new TimeoutError(err.message) - : new ConnectionError(err.message) + ? new TimeoutError(err.message, params) + : new ConnectionError(err.message, params) this.emit('error', error, params) return callback(error) @@ -77,10 +88,10 @@ class Transport { var json = '' response.setEncoding('utf8') response.on('data', chunk => { json += chunk }) - response.on('error', err => callback(new ConnectionError(err.message))) + response.on('error', err => callback(new ConnectionError(err.message, params))) response.on('end', () => { debug('JSON response', params, json) - this.connectionPool.markAlive(connection) + const contentType = response.headers['content-type'] if (contentType != null && contentType.indexOf('application/json') > -1) { try { @@ -92,10 +103,25 @@ class Transport { } else { payload = json } - const { statusCode } = response - this.emit('response', params, { statusCode, payload }) - if (statusCode >= 400) { - callback(new ResponseError(payload)) + + const { statusCode, headers } = response + const ignoreStatusCode = Array.isArray(params.ignore) && params.ignore.indexOf(statusCode) > -1 + + if (ignoreStatusCode === false && + (statusCode === 502 || statusCode === 503 || statusCode === 504)) { + this.connectionPool.markDead(connection) + if (attempts > 0) { + debug(`Retrying request, there are still ${attempts} attempts`, params) + params[kRemainingAttempts] = attempts - 1 + return this.request(params, callback) + } + } else { + this.connectionPool.markAlive(connection) + } + + this.emit('response', params, { statusCode, payload, headers }) + if (ignoreStatusCode === false && statusCode >= 400) { + callback(new ResponseError(payload, statusCode, headers)) } else { callback(null, payload) } diff --git a/lib/errors.js b/lib/errors.js index 38c82eb67..364808168 100644 --- a/lib/errors.js +++ b/lib/errors.js @@ -10,20 +10,22 @@ class BadConfigurationError extends Error { } class TimeoutError extends Error { - constructor (message) { + constructor (message, request) { super() Error.captureStackTrace(this, TimeoutError) this.name = 'TimeoutError' this.message = message || 'Timeout Error' + this.request = request } } class ConnectionError extends Error { - constructor (message) { + constructor (message, request) { super() Error.captureStackTrace(this, ConnectionError) this.name = 'ConnectionError' this.message = message || 'Connection Error' + this.request = request } } @@ -55,13 +57,14 @@ class DeserializationError extends Error { } class ResponseError extends Error { - constructor (err) { + constructor (payload, statusCode, headers) { super() Error.captureStackTrace(this, ResponseError) this.name = 'ResponseError' - this.message = (err && err.error && err.error.type) || 'Response Error' - this.response = err - this.statusCode = err && err.status + this.message = (payload && payload.error && payload.error.type) || 'Response Error' + this.response = payload + this.statusCode = (payload && payload.status) || statusCode + this.headers = headers } }