WIP: initial prototype

- Standardized event emitters
- Refactored transport.request to have a better handling of the state
- Added sniff event
- Improved abort handling
This commit is contained in:
delvedor
2018-12-03 18:06:09 +01:00
parent ab1d7ba992
commit fd738f8425
6 changed files with 112 additions and 67 deletions

1
index.d.ts vendored
View File

@ -468,6 +468,7 @@ declare const events: {
RESPONSE: string;
REQUEST: string;
ERROR: string;
SNIFF: string;
};
export { Client, Transport, ConnectionPool, Connection, Serializer, events, ApiResponse };

View File

@ -5,7 +5,8 @@ const Transport = require('./lib/Transport')
const Connection = require('./lib/Connection')
const ConnectionPool = require('./lib/ConnectionPool')
const Serializer = require('./lib/Serializer')
const { ConfigurationError } = require('./lib/errors')
const errors = require('./lib/errors')
const { ConfigurationError } = errors
const buildApi = require('./api')
@ -94,7 +95,8 @@ class Client extends EventEmitter {
const events = {
RESPONSE: 'response',
REQUEST: 'request',
ERROR: 'error'
ERROR: 'error',
SNIFF: 'sniff'
}
module.exports = {
@ -103,5 +105,6 @@ module.exports = {
ConnectionPool,
Connection,
Serializer,
events
events,
errors
}

View File

@ -77,6 +77,15 @@ class Connection {
}
})
// updates the ended state
request.on('abort', () => {
debug('Request aborted', params)
if (ended === false) {
ended = true
this._openRequests--
}
})
// Disables the Nagle algorithm
request.setNoDelay(true)

View File

@ -274,6 +274,7 @@ class ConnectionPool {
/**
* Transforms the nodes objects to a host object.
* TODO: handle ssl and agent options
*
* @param {object} nodes
* @returns {array} hosts

View File

@ -44,6 +44,7 @@ class Serializer {
qserialize (object) {
debug('qserialize', object)
if (object == null) return ''
if (typeof object === 'string') return object
// arrays should be serialized as comma separated list
const keys = Object.keys(object)
for (var i = 0, len = keys.length; i < len; i++) {

View File

@ -11,7 +11,6 @@ const {
} = require('./errors')
const noop = () => {}
const kRemainingAttempts = Symbol('elasticsearch-remaining-attempts')
class Transport {
constructor (opts = {}) {
@ -36,78 +35,99 @@ class Transport {
request (params, callback) {
callback = once(callback)
const result = { body: null, statusCode: null, headers: null, warnings: null }
const attempts = params[kRemainingAttempts] || params.maxRetries || this.maxRetries
const connection = this.getConnection()
if (connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'), result)
const meta = {
connection: null,
request: null,
response: null,
attempts: 0,
aborted: false
}
const result = {
body: null,
statusCode: null,
headers: null,
warnings: null
}
const maxRetries = params.maxRetries || this.maxRetries
var request = { abort: noop }
params.headers = params.headers || {}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
return callback(err, result)
const makeRequest = () => {
if (meta.aborted === true) return
meta.connection = this.getConnection()
if (meta.connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'), result)
}
params.headers = params.headers || {}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
return callback(err, result)
}
}
params.headers['Content-Type'] = 'application/json'
if (isStream(params.body) === false) {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
return callback(err, result)
}
} else {
params.body = params.bulkBody
}
params.headers['Content-Type'] = 'application/x-ndjson'
if (isStream(params.body) === false) {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
}
params.headers['Content-Type'] = 'application/json'
if (isStream(params.body) === false) {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
return callback(err, result)
}
} else {
params.body = params.bulkBody
}
params.headers['Content-Type'] = 'application/x-ndjson'
if (isStream(params.body) === false) {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
if (this.suggestCompression === true) {
params.headers['Accept-Encoding'] = 'gzip,deflate'
}
// serializes the querystring
params.querystring = this.serializer.qserialize(params.querystring)
// handles request timeout
params.timeout = toMs(params.requestTimeout || this.requestTimeout)
meta.request = params
this.emit('request', meta)
// perform the actual http request
return meta.connection.request(params, onResponse)
}
if (this.suggestCompression === true) {
params.headers['Accept-Encoding'] = 'gzip,deflate'
}
// serializes the querystring
params.querystring = this.serializer.qserialize(params.querystring)
// handles request timeout
params.timeout = toMs(params.requestTimeout || this.requestTimeout)
this.emit('request', connection, params)
// perform the actual http request
const request = connection.request(params, (err, response) => {
if (err != null) {
const onResponse = (err, response) => {
if (err !== null) {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(connection)
this.connectionPool.markDead(meta.connection)
if (this.sniffOnConnectionFault === true) {
this.sniff()
}
// retry logic
if (attempts > 0) {
debug(`Retrying request, there are still ${attempts} attempts`, params)
params[kRemainingAttempts] = attempts - 1
return this.request(params, callback)
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback)
return
}
const error = err instanceof TimeoutError
? err
: new ConnectionError(err.message, params)
this.emit('error', error, connection, params)
this.emit('error', error, meta)
return callback(error, result)
}
@ -121,7 +141,8 @@ class Transport {
if (params.asStream === true) {
result.body = response
this.emit('response', connection, params, result)
meta.response = result
this.emit('response', meta)
callback(null, result)
return
}
@ -145,7 +166,7 @@ class Transport {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('error', err, connection, params)
this.emit('error', err, meta)
return callback(err, result)
}
} else {
@ -162,19 +183,22 @@ class Transport {
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(connection)
if (attempts > 0) {
debug(`Retrying request, there are still ${attempts} attempts`, params)
params[kRemainingAttempts] = attempts - 1
return this.request(params, callback)
this.connectionPool.markDead(meta.connection)
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback)
return
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(connection)
this.connectionPool.markAlive(meta.connection)
}
this.emit('response', connection, params, result)
meta.response = result
this.emit('response', meta)
if (ignoreStatusCode === false && statusCode >= 400) {
callback(new ResponseError(result), result)
} else {
@ -185,12 +209,15 @@ class Transport {
callback(null, result)
}
})
})
}
request = makeRequest()
return {
abort: () => {
meta.aborted = true
request.abort()
debug('Request aborted', params)
debug('Aborting request', params)
}
}
}
@ -204,6 +231,8 @@ class Transport {
return this.connectionPool.getConnection()
}
// TODO: add sniff reason
// 'connection-fault', 'interval', 'start', ...
sniff (callback = noop) {
if (this._isSniffing === true) return
this._isSniffing = true
@ -221,8 +250,8 @@ class Transport {
}
if (err != null) {
this.emit('error', err, null, request)
debug('Sniffing errored', err)
this.emit('sniff', err, null)
return callback(err)
}
@ -230,6 +259,7 @@ class Transport {
const hosts = this.connectionPool.nodesToHost(result.body.nodes)
this.connectionPool.update(hosts)
this.emit('sniff', null, hosts)
callback(null, hosts)
})
}