From 020165168c5904afcab10fcbc7bd9a0bc0894aee Mon Sep 17 00:00:00 2001 From: delvedor Date: Mon, 19 Nov 2018 11:33:40 +0100 Subject: [PATCH] WIP: initial prototype - Expose connection info inside events - Fixed minor bugs --- index.js | 21 ++++++++++++--------- lib/Connection.js | 1 + lib/ConnectionPool.js | 3 ++- lib/Transport.js | 22 ++++++++++++---------- 4 files changed, 27 insertions(+), 20 deletions(-) diff --git a/index.js b/index.js index 644f32ece..cc5055da6 100644 --- a/index.js +++ b/index.js @@ -59,14 +59,15 @@ class Client extends EventEmitter { this[kSerializer] = new options.Serializer() this[kConnectionPool] = new options.ConnectionPool({ - pingTimeout: opts.pingTimeout, - resurrectStrategy: opts.resurrectStrategy, - randomizeHost: opts.randomizeHost, + pingTimeout: options.pingTimeout, + resurrectStrategy: options.resurrectStrategy, + randomizeHost: options.randomizeHost, ssl: options.ssl, - agent: null, - nodeFilter: opts.nodeFilter, - nodeWeighter: opts.nodeWeighter, - nodeSelector: opts.nodeSelector + agent: options.agent, + nodeFilter: options.nodeFilter, + nodeWeighter: options.nodeWeighter, + nodeSelector: options.nodeSelector, + Connection: options.Connection }) // Add the connections before initialize the Transport @@ -99,7 +100,7 @@ class Client extends EventEmitter { } } -Client.events = { +const events = { RESPONSE: 'response', REQUEST: 'request', ERROR: 'error' @@ -109,6 +110,8 @@ module.exports = { Client, Transport, ConnectionPool, + Connection, Serializer, - symbols + symbols, + events } diff --git a/lib/Connection.js b/lib/Connection.js index 0fe0b0997..028cfdf51 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -86,6 +86,7 @@ class Connection { return request } + // TODO: write a better closing logic close () { debug('Closing connection', this.id) if (this._openRequests > 0) { diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index a42832ca0..47e461751 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -25,6 +25,7 @@ class ConnectionPool { this.pingTimeout = opts.pingTimeout this.randomizeHost = opts.randomizeHost === true this.nodeFilter = opts.nodeFilter || defaultNodeFilter + this.Connection = opts.Connection if (typeof opts.nodeSelector === 'function') { this.nodeSelector = opts.nodeSelector @@ -195,7 +196,7 @@ class ConnectionPool { if (opts.ssl == null) opts.ssl = this._ssl if (opts.agent == null) opts.agent = this._agent - const connection = new Connection(opts) + const connection = new this.Connection(opts) debug('Adding a new connection', connection) if (this.connections.has(connection.id)) { throw new Error(`Connection with id '${connection.id}' is already present`) diff --git a/lib/Transport.js b/lib/Transport.js index c37048ed0..83e4445c3 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -80,8 +80,7 @@ class Transport { // handles request timeout params.timeout = toMs(params.requestTimeout || this.requestTimeout) - // TODO: expose nicely the node metadata (also in response an error) - this.emit('request', params, connection) + this.emit('request', connection, params) // perform the actual http request const request = connection.request(params, (err, response) => { @@ -105,7 +104,7 @@ class Transport { ? err : new ConnectionError(err.message, params) - this.emit('error', error, params) + this.emit('error', error, connection, params) return callback(error, result) } @@ -119,6 +118,7 @@ class Transport { if (params.asStream === true) { result.body = response + this.emit('response', connection, params, result) callback(null, result) return } @@ -142,7 +142,7 @@ class Transport { try { result.body = this.serializer.deserialize(payload) } catch (err) { - this.emit('error', err, params) + this.emit('error', err, connection, params) return callback(err, result) } } else { @@ -171,7 +171,7 @@ class Transport { this.connectionPool.markAlive(connection) } - this.emit('response', params, result) + this.emit('response', connection, params, result) if (ignoreStatusCode === false && statusCode >= 400) { callback(new ResponseError(result), result) } else { @@ -206,23 +206,25 @@ class Transport { this._isSniffing = true debug('Started sniffing request') - this.request({ + const request = { method: 'GET', path: this.sniffEndpoint - }, (err, body) => { + } + + this.request(request, (err, result) => { this._isSniffing = false if (this._sniffEnabled === true) { this._nextSniff = Date.now() + this.sniffInterval } if (err != null) { - this.emit('error', err) + this.emit('error', err, null, request) debug('Sniffing errored', err) return callback(err) } - debug('Sniffing ended successfully', body) - const hosts = this.connectionPool.nodesToHost(body.nodes) + debug('Sniffing ended successfully', result.body) + const hosts = this.connectionPool.nodesToHost(result.body.nodes) this.connectionPool.update(hosts) callback(null, hosts)