WIP: initial prototype

- Updated ConnectionPool internals
- Added ConnectionPool.update api
- Experimental: return http response in transform
This commit is contained in:
delvedor
2018-11-08 19:34:12 +01:00
parent c6b8665edd
commit e97f66b0c3
2 changed files with 59 additions and 42 deletions

View File

@ -9,9 +9,6 @@ const noop = () => {}
class ConnectionPool {
constructor (opts = {}) {
this.connections = new Map()
// TODO: do we need those queue? (or find a better use)
// can we use just the connections map?
this.alive = []
this.dead = []
this.selector = opts.selector
this._ssl = opts.ssl
@ -28,7 +25,6 @@ class ConnectionPool {
this.pingTimeout = opts.pingTimeout
this.randomizeHost = opts.randomizeHost === true
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
this.nodeWeighter = opts.nodeWeighter || noop
if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector
@ -50,29 +46,25 @@ class ConnectionPool {
/**
* Marks a connection as 'alive'.
* If needed moves the connection from the dead list
* to the alive list and then resets the `deadCount`.
* If needed removes the connection from the dead list
* and then resets the `deadCount`.
*
* @param {object} connection
*/
markAlive (connection) {
const { id } = connection
debug(`Marking as 'alive' connection '${id}'`)
if (this.alive.indexOf(id) === -1) {
this.alive.push(id)
const index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
}
const index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
connection.status = Connection.statuses.ALIVE
connection.deadCount = 0
connection.resurrectTimeout = 0
this.connections.set(id, connection)
}
/**
* Marks a connection as 'dead'.
* If needed moves the connection from the alive list
* to the dead list and then increments the `deadCount`.
* If needed adds the connection to the dead list
* and then increments the `deadCount`.
*
* @param {object} connection
*/
@ -81,8 +73,6 @@ class ConnectionPool {
debug(`Marking as 'dead' connection '${id}'`)
if (this.dead.indexOf(id) === -1) {
this.dead.push(id)
const index = this.alive.indexOf(id)
if (index > -1) this.alive.splice(index, 1)
}
connection.status = Connection.statuses.DEAD
connection.deadCount++
@ -97,7 +87,6 @@ class ConnectionPool {
this.resurrectTimeoutCutoff
)
)
this.connections.set(id, connection)
// sort the dead list in ascending order
// based on the resurrectTimeout
@ -152,10 +141,8 @@ class ConnectionPool {
// optimistic strategy
} else {
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
this.alive.push(id)
this.dead.splice(this.dead.indexOf(id), 1)
connection.status = Connection.statuses.ALIVE
this.connections.set(id, connection)
// eslint-disable-next-line standard/no-callback-literal
callback(true, connection)
}
@ -164,21 +151,15 @@ class ConnectionPool {
/**
* Returns an alive connection if present,
* otherwise returns null.
* By default it does not apply a weighter to the node list,
* and filters the `master` only nodes
* By default it filters the `master` only nodes.
* It uses the selector to choose which
* connection return.
*
* @param {object} options (weighter, filter and selector)
* @param {object} options (filter and selector)
* @returns {object|null} connection
*/
getConnection (opts = {}) {
if (this.alive.length === 0) {
return null
}
const filter = opts.filter || this.nodeFilter
const weighter = opts.weighter || this.nodeWeighter
const selector = opts.selector || this.nodeSelector
// TODO: can we cache this?
@ -190,7 +171,8 @@ class ConnectionPool {
}
}
}
connections.sort(weighter)
if (connections.length === 0) return null
return selector(connections)
}
@ -219,10 +201,6 @@ class ConnectionPool {
throw new Error(`Connection with id '${connection.id}' is already present`)
}
this.connections.set(connection.id, connection)
this.alive.push(connection.id)
if (this.randomizeHost === true) {
this.alive = shuffleArray(this.alive)
}
return connection
}
@ -239,8 +217,6 @@ class ConnectionPool {
this.connections.delete(id)
var index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
index = this.alive.indexOf(id)
if (index > -1) this.alive.splice(index, 1)
return this
}
@ -255,11 +231,46 @@ class ConnectionPool {
connection.close()
})
this.connections = new Map()
this.alive = []
this.dead = []
return this
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update (connections) {
debug('Updating the connection pool')
for (var i = 0; i < connections.length; i++) {
const connection = connections[i]
// if we already have a given connection in the pool
// we check its status, if is 'alive', we do nothing,
// if 'dead' we mark it as alive, we do not close the old
// one to avoid socket issues
if (this.connections.has(connection.id) === true) {
debug(`The connection with id '${connection.id}' is already present`)
const oldConnection = this.connections.get(connection.id)
if (oldConnection.status === Connection.statuses.DEAD) {
this.markAlive(oldConnection)
}
} else {
this.addConnection(connection)
}
}
const ids = connections.map(c => c.id)
// remove all the dead connections and old connections
for (const connection of this.connections.values()) {
if (ids.indexOf(connection.id) === -1) {
this.removeConnection(connection)
}
}
return this
}
/**
* Transforms the nodes objects to a host object.
*
@ -311,10 +322,10 @@ ConnectionPool.resurrectStrategies = {
}
// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4
const shuffleArray = arr => arr
.map(a => [Math.random(), a])
.sort((a, b) => a[0] - b[0])
.map(a => a[1])
// const shuffleArray = arr => arr
// .map(a => [Math.random(), a])
// .sort((a, b) => a[0] - b[0])
// .map(a => a[1])
function defaultNodeFilter (node) {
// avoid master only nodes

View File

@ -34,6 +34,7 @@ class Transport {
}
}
// TODO: should be able to send a stream of json data
request (params, callback) {
callback = once(callback)
const result = { body: null, statusCode: null, headers: null, warnings: null }
@ -115,6 +116,13 @@ class Transport {
result.warnings = headers['warning'].split(/(?!\B"[^"]*),(?![^"]*"\B)/)
}
// TODO: expose `asStream` option for returning the
// body already parsed?
if (params.asHttpResponse === true) {
callback(null, response)
return
}
var payload = ''
// collect the payload
response.setEncoding('utf8')
@ -215,9 +223,7 @@ class Transport {
debug('Sniffing ended successfully', body)
const hosts = this.connectionPool.nodesToHost(body.nodes)
this.connectionPool
.empty()
.addConnection(hosts)
this.connectionPool.update(hosts)
callback(null, hosts)
})