From e97f66b0c3d987d93eba9e6c65916c700c23270e Mon Sep 17 00:00:00 2001 From: delvedor Date: Thu, 8 Nov 2018 19:34:12 +0100 Subject: [PATCH] WIP: initial prototype - Updated ConnectionPool internals - Added ConnectionPool.update api - Experimental: return http response in transform --- lib/ConnectionPool.js | 89 ++++++++++++++++++++++++------------------- lib/Transport.js | 12 ++++-- 2 files changed, 59 insertions(+), 42 deletions(-) diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index 2ed3cc9f0..fdf48e96e 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -9,9 +9,6 @@ const noop = () => {} class ConnectionPool { constructor (opts = {}) { this.connections = new Map() - // TODO: do we need those queue? (or find a better use) - // can we use just the connections map? - this.alive = [] this.dead = [] this.selector = opts.selector this._ssl = opts.ssl @@ -28,7 +25,6 @@ class ConnectionPool { this.pingTimeout = opts.pingTimeout this.randomizeHost = opts.randomizeHost === true this.nodeFilter = opts.nodeFilter || defaultNodeFilter - this.nodeWeighter = opts.nodeWeighter || noop if (typeof opts.nodeSelector === 'function') { this.nodeSelector = opts.nodeSelector @@ -50,29 +46,25 @@ class ConnectionPool { /** * Marks a connection as 'alive'. - * If needed moves the connection from the dead list - * to the alive list and then resets the `deadCount`. + * If needed removes the connection from the dead list + * and then resets the `deadCount`. * * @param {object} connection */ markAlive (connection) { const { id } = connection debug(`Marking as 'alive' connection '${id}'`) - if (this.alive.indexOf(id) === -1) { - this.alive.push(id) - const index = this.dead.indexOf(id) - if (index > -1) this.dead.splice(index, 1) - } + const index = this.dead.indexOf(id) + if (index > -1) this.dead.splice(index, 1) connection.status = Connection.statuses.ALIVE connection.deadCount = 0 connection.resurrectTimeout = 0 - this.connections.set(id, connection) } /** * Marks a connection as 'dead'. - * If needed moves the connection from the alive list - * to the dead list and then increments the `deadCount`. + * If needed adds the connection to the dead list + * and then increments the `deadCount`. * * @param {object} connection */ @@ -81,8 +73,6 @@ class ConnectionPool { debug(`Marking as 'dead' connection '${id}'`) if (this.dead.indexOf(id) === -1) { this.dead.push(id) - const index = this.alive.indexOf(id) - if (index > -1) this.alive.splice(index, 1) } connection.status = Connection.statuses.DEAD connection.deadCount++ @@ -97,7 +87,6 @@ class ConnectionPool { this.resurrectTimeoutCutoff ) ) - this.connections.set(id, connection) // sort the dead list in ascending order // based on the resurrectTimeout @@ -152,10 +141,8 @@ class ConnectionPool { // optimistic strategy } else { debug(`Resurrect: optimistic resurrection for connection '${id}'`) - this.alive.push(id) this.dead.splice(this.dead.indexOf(id), 1) connection.status = Connection.statuses.ALIVE - this.connections.set(id, connection) // eslint-disable-next-line standard/no-callback-literal callback(true, connection) } @@ -164,21 +151,15 @@ class ConnectionPool { /** * Returns an alive connection if present, * otherwise returns null. - * By default it does not apply a weighter to the node list, - * and filters the `master` only nodes + * By default it filters the `master` only nodes. * It uses the selector to choose which * connection return. * - * @param {object} options (weighter, filter and selector) + * @param {object} options (filter and selector) * @returns {object|null} connection */ getConnection (opts = {}) { - if (this.alive.length === 0) { - return null - } - const filter = opts.filter || this.nodeFilter - const weighter = opts.weighter || this.nodeWeighter const selector = opts.selector || this.nodeSelector // TODO: can we cache this? @@ -190,7 +171,8 @@ class ConnectionPool { } } } - connections.sort(weighter) + + if (connections.length === 0) return null return selector(connections) } @@ -219,10 +201,6 @@ class ConnectionPool { throw new Error(`Connection with id '${connection.id}' is already present`) } this.connections.set(connection.id, connection) - this.alive.push(connection.id) - if (this.randomizeHost === true) { - this.alive = shuffleArray(this.alive) - } return connection } @@ -239,8 +217,6 @@ class ConnectionPool { this.connections.delete(id) var index = this.dead.indexOf(id) if (index > -1) this.dead.splice(index, 1) - index = this.alive.indexOf(id) - if (index > -1) this.alive.splice(index, 1) return this } @@ -255,11 +231,46 @@ class ConnectionPool { connection.close() }) this.connections = new Map() - this.alive = [] this.dead = [] return this } + /** + * Update the ConnectionPool with new connections. + * + * @param {array} array of connections + * @returns {ConnectionPool} + */ + update (connections) { + debug('Updating the connection pool') + for (var i = 0; i < connections.length; i++) { + const connection = connections[i] + // if we already have a given connection in the pool + // we check its status, if is 'alive', we do nothing, + // if 'dead' we mark it as alive, we do not close the old + // one to avoid socket issues + if (this.connections.has(connection.id) === true) { + debug(`The connection with id '${connection.id}' is already present`) + const oldConnection = this.connections.get(connection.id) + if (oldConnection.status === Connection.statuses.DEAD) { + this.markAlive(oldConnection) + } + } else { + this.addConnection(connection) + } + } + + const ids = connections.map(c => c.id) + // remove all the dead connections and old connections + for (const connection of this.connections.values()) { + if (ids.indexOf(connection.id) === -1) { + this.removeConnection(connection) + } + } + + return this + } + /** * Transforms the nodes objects to a host object. * @@ -311,10 +322,10 @@ ConnectionPool.resurrectStrategies = { } // https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4 -const shuffleArray = arr => arr - .map(a => [Math.random(), a]) - .sort((a, b) => a[0] - b[0]) - .map(a => a[1]) +// const shuffleArray = arr => arr +// .map(a => [Math.random(), a]) +// .sort((a, b) => a[0] - b[0]) +// .map(a => a[1]) function defaultNodeFilter (node) { // avoid master only nodes diff --git a/lib/Transport.js b/lib/Transport.js index 0673df7e5..e957e94dc 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -34,6 +34,7 @@ class Transport { } } + // TODO: should be able to send a stream of json data request (params, callback) { callback = once(callback) const result = { body: null, statusCode: null, headers: null, warnings: null } @@ -115,6 +116,13 @@ class Transport { result.warnings = headers['warning'].split(/(?!\B"[^"]*),(?![^"]*"\B)/) } + // TODO: expose `asStream` option for returning the + // body already parsed? + if (params.asHttpResponse === true) { + callback(null, response) + return + } + var payload = '' // collect the payload response.setEncoding('utf8') @@ -215,9 +223,7 @@ class Transport { debug('Sniffing ended successfully', body) const hosts = this.connectionPool.nodesToHost(body.nodes) - this.connectionPool - .empty() - .addConnection(hosts) + this.connectionPool.update(hosts) callback(null, hosts) })