WIP: initial prototype

- Added randomizeHost option
- Added ignore status code option
- Updated error classes
- Added ndjson support
- Retry on 502/3/4
This commit is contained in:
delvedor
2018-10-25 17:10:00 +02:00
parent 145e2ab5e5
commit 8ce9f970f0
5 changed files with 69 additions and 15 deletions

View File

@ -52,6 +52,7 @@ class Client extends EventEmitter {
sniffEndpoint: '_nodes/_all/http',
sniffOnConnectionFault: false,
resurrectStrategy: 'ping',
randomizeHost: true,
ssl: null,
agent: null
}, opts)
@ -61,6 +62,7 @@ class Client extends EventEmitter {
this[kConnectionPool] = new options.ConnectionPool({
pingTimeout: opts.pingTimeout,
resurrectStrategy: opts.resurrectStrategy,
randomizeHost: opts.randomizeHost,
selector: this[kSelector],
ssl: options.ssl,
agent: null

View File

@ -24,6 +24,7 @@ class ConnectionPool {
// the timeout doesn't increase
this.resurrectTimeoutCutoff = 5
this.pingTimeout = opts.pingTimeout
this.randomizeHost = opts.randomizeHost
const resurrectStrategy = opts.resurrectStrategy || 'ping'
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
@ -141,7 +142,8 @@ class ConnectionPool {
this.dead.splice(this.dead.indexOf(id), 1)
connection.status = Connection.statuses.ALIVE
this.connections.set(id, connection)
callback(null, connection)
// eslint-disable-next-line standard/no-callback-literal
callback(true, connection)
}
}
@ -187,6 +189,9 @@ class ConnectionPool {
}
this.connections.set(connection.id, connection)
this.alive.push(connection.id)
if (this.randomizeHost === true) {
this.alive = shuffleArray(this.alive)
}
return connection
}
@ -274,4 +279,10 @@ ConnectionPool.resurrectStrategies = {
optimistic: 2
}
// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4
const shuffleArray = arr => arr
.map(a => [Math.random(), a])
.sort((a, b) => a[0] - b[0])
.map(a => a[1])
module.exports = ConnectionPool

View File

@ -23,6 +23,18 @@ class Serializer {
}
return object
}
ndserialize (array) {
debug('ndserialize', array)
if (Array.isArray(array) === false) {
throw new SerializationError('The argument provided is not an array')
}
var ndjson = ''
for (var i = 0, len = array.length; i < len; i++) {
ndjson += this.serialize(array[i]) + '\n'
}
return ndjson
}
}
module.exports = Serializer

View File

@ -40,6 +40,7 @@ class Transport {
return callback(new NoLivingConnectionsError('There are not living connections'))
}
// handle json body
if (params.body != null) {
try {
params.body = this.serializer.serialize(params.body)
@ -49,6 +50,16 @@ class Transport {
params.headers = params.headers || {}
params.headers['Content-Type'] = 'application/json'
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
// handle ndjson body
} else if (params.bulkBody != null) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
return callback(err)
}
params.headers = params.headers || {}
params.headers['Content-Type'] = 'application/x-ndjson'
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
params.timeout = params.timeout || this.requestTimeout
@ -67,8 +78,8 @@ class Transport {
}
const error = err.message === 'Request timed out'
? new TimeoutError(err.message)
: new ConnectionError(err.message)
? new TimeoutError(err.message, params)
: new ConnectionError(err.message, params)
this.emit('error', error, params)
return callback(error)
@ -77,10 +88,10 @@ class Transport {
var json = ''
response.setEncoding('utf8')
response.on('data', chunk => { json += chunk })
response.on('error', err => callback(new ConnectionError(err.message)))
response.on('error', err => callback(new ConnectionError(err.message, params)))
response.on('end', () => {
debug('JSON response', params, json)
this.connectionPool.markAlive(connection)
const contentType = response.headers['content-type']
if (contentType != null && contentType.indexOf('application/json') > -1) {
try {
@ -92,10 +103,25 @@ class Transport {
} else {
payload = json
}
const { statusCode } = response
this.emit('response', params, { statusCode, payload })
if (statusCode >= 400) {
callback(new ResponseError(payload))
const { statusCode, headers } = response
const ignoreStatusCode = Array.isArray(params.ignore) && params.ignore.indexOf(statusCode) > -1
if (ignoreStatusCode === false &&
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
this.connectionPool.markDead(connection)
if (attempts > 0) {
debug(`Retrying request, there are still ${attempts} attempts`, params)
params[kRemainingAttempts] = attempts - 1
return this.request(params, callback)
}
} else {
this.connectionPool.markAlive(connection)
}
this.emit('response', params, { statusCode, payload, headers })
if (ignoreStatusCode === false && statusCode >= 400) {
callback(new ResponseError(payload, statusCode, headers))
} else {
callback(null, payload)
}

View File

@ -10,20 +10,22 @@ class BadConfigurationError extends Error {
}
class TimeoutError extends Error {
constructor (message) {
constructor (message, request) {
super()
Error.captureStackTrace(this, TimeoutError)
this.name = 'TimeoutError'
this.message = message || 'Timeout Error'
this.request = request
}
}
class ConnectionError extends Error {
constructor (message) {
constructor (message, request) {
super()
Error.captureStackTrace(this, ConnectionError)
this.name = 'ConnectionError'
this.message = message || 'Connection Error'
this.request = request
}
}
@ -55,13 +57,14 @@ class DeserializationError extends Error {
}
class ResponseError extends Error {
constructor (err) {
constructor (payload, statusCode, headers) {
super()
Error.captureStackTrace(this, ResponseError)
this.name = 'ResponseError'
this.message = (err && err.error && err.error.type) || 'Response Error'
this.response = err
this.statusCode = err && err.status
this.message = (payload && payload.error && payload.error.type) || 'Response Error'
this.response = payload
this.statusCode = (payload && payload.status) || statusCode
this.headers = headers
}
}