Files
elasticsearch-js/lib/ConnectionPool.js
Tomas Della Vedova 5b856cd4c2 Child client support (#768)
With this PR, we introduce the `client.child` API, which returns a new client instance that shares the connection pool with the parent client.
This feature can be handy if you need to have multiple client instances with different configurations, but with a shared connection pool.

Example:

```js
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const child = client.child({
  headers: { 'x-foo': 'bar' },
  requestTimeout: 1000
})

client.info(console.log)
child.info(console.log)
```

**Open questions:**

* Currently, the event emitter is shared between the parent and the child(ren). Is that OK?
* Currently, if you extend the parent client, the child client will have the same extensions, whereas if the child client adds an extension, the parent client will not be extended. Is that OK?

**Caveats:**

* You can override _any_ option except for the connection pool specific options (`ssl`, `agent`, `pingTimeout`, `Connection`, and `resurrectStrategy`).
* You can't specify a new `Connection` class.
* If you call `close` in any of the parent/child clients, every client will be closed.

_Note: the `nodeFilter` and `nodeSelector` options are now `Transport` options and no longer `ConnectionPool` options._
2019-02-21 12:48:49 +01:00

374 lines
11 KiB
JavaScript

'use strict'
const assert = require('assert')
const { URL } = require('url')
const debug = require('debug')('elasticsearch')
const Connection = require('./Connection')
/**
 * Shared do-nothing function: accepts any arguments, ignores them
 * and returns `undefined`. Used wherever a callback is optional.
 */
function noop () {}
/**
 * Keeps track of the connections to the cluster nodes, of which ones are
 * currently considered dead, and of when a dead one may be retried.
 *
 * - `connections` is a Map of connection id -> connection instance.
 * - `dead` is an array of connection ids, kept sorted in ascending order
 *   of `connection.resurrectTimeout` (the next one to retry comes first).
 */
class ConnectionPool {
  /**
   * @param {object} [opts]
   * @param {function} [opts.selector] stored as `this.selector`
   * @param {object} [opts.ssl] default `ssl` value for added connections
   * @param {object} [opts.agent] default `agent` value for added connections
   * @param {number} [opts.pingTimeout] timeout passed to the resurrect ping
   * @param {function} opts.Connection class instantiated by `addConnection`
   * @param {function} [opts.emit] event emitter function, defaults to a noop
   * @param {boolean} [opts.sniffEnabled=false]
   * @param {string} [opts.resurrectStrategy='ping'] one of the keys of
   *   `ConnectionPool.resurrectStrategies`
   * @throws {AssertionError} if the resurrect strategy is unknown
   */
  constructor (opts = {}) {
    this.connections = new Map()
    this.dead = []
    this.selector = opts.selector
    this._auth = null
    this._ssl = opts.ssl
    this._agent = opts.agent
    // base resurrect timeout (60s); the actual delay is computed as
    // `resurrectTimeout * 2 ^ min(deadCount - 1, resurrectTimeoutCutoff)`
    this.resurrectTimeout = 1000 * 60
    // number of consecutive failures after which
    // the timeout doesn't increase
    this.resurrectTimeoutCutoff = 5
    this.pingTimeout = opts.pingTimeout
    this.Connection = opts.Connection
    this.emit = opts.emit || noop
    this._sniffEnabled = opts.sniffEnabled || false

    const resurrectStrategy = opts.resurrectStrategy || 'ping'
    this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
    assert(
      this.resurrectStrategy != null,
      `Invalid resurrection strategy: '${resurrectStrategy}'`
    )
  }

  /**
   * Marks a connection as 'alive'.
   * If needed removes the connection from the dead list
   * and then resets the `deadCount`.
   * If sniffing is not enabled and there is only
   * one node, this method is a noop.
   *
   * @param {object} connection
   */
  markAlive (connection) {
    if (this._sniffEnabled === false && this.connections.size === 1) return
    const { id } = connection
    debug(`Marking as 'alive' connection '${id}'`)
    const index = this.dead.indexOf(id)
    if (index > -1) this.dead.splice(index, 1)
    connection.status = Connection.statuses.ALIVE
    connection.deadCount = 0
    connection.resurrectTimeout = 0
  }

  /**
   * Marks a connection as 'dead'.
   * If needed adds the connection to the dead list
   * and then increments the `deadCount`.
   * If sniffing is not enabled and there is only
   * one node, this method is a noop.
   *
   * @param {object} connection
   */
  markDead (connection) {
    if (this._sniffEnabled === false && this.connections.size === 1) return
    const { id } = connection
    debug(`Marking as 'dead' connection '${id}'`)
    // `markDead` can be called for a connection that has just been removed
    // from the pool (e.g. a request failing right after an update).
    // Such an id must not enter the dead list, otherwise the lookups
    // done by the sort below and by `resurrect` would return undefined.
    if (this.dead.indexOf(id) === -1 && this.connections.has(id) === true) {
      this.dead.push(id)
    }
    connection.status = Connection.statuses.DEAD
    connection.deadCount++
    // exponential backoff:
    // `resurrectTimeout * 2 ^ min(deadCount - 1, resurrectTimeoutCutoff)`
    // i.e. 1m, 2m, 4m, ... capped after `resurrectTimeoutCutoff` doublings
    connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
      2,
      Math.min(
        connection.deadCount - 1,
        this.resurrectTimeoutCutoff
      )
    )

    // sort the dead list in ascending order
    // based on the resurrectTimeout
    this.dead.sort((a, b) => {
      const conn1 = this.connections.get(a)
      const conn2 = this.connections.get(b)
      return conn1.resurrectTimeout - conn2.resurrectTimeout
    })
  }

  /**
   * If enabled, tries to resurrect a connection with the given
   * resurrect strategy ('ping', 'optimistic', 'none').
   * Only the first connection of the dead list is considered, and only
   * if its `resurrectTimeout` has already elapsed.
   *
   * @param {number} [now] epoch in milliseconds, defaults to `Date.now()`
   * @param {function} [callback] called with `(isAlive, connection)`,
   *   or with `(null, null)` when there is nothing to resurrect
   */
  resurrect (now = Date.now(), callback = noop) {
    const strategies = ConnectionPool.resurrectStrategies
    if (this.resurrectStrategy === strategies.none || this.dead.length === 0) {
      debug('Nothing to resurrect')
      callback(null, null)
      return
    }

    // the dead list is sorted in ascending order based on the timeout
    // so the first element will always be the one with the smaller timeout
    const connection = this.connections.get(this.dead[0])
    if (now < connection.resurrectTimeout) {
      debug('Nothing to resurrect')
      callback(null, null)
      return
    }

    const { id } = connection

    // ping strategy
    if (this.resurrectStrategy === strategies.ping) {
      connection.request({
        method: 'HEAD',
        path: '/',
        timeout: this.pingTimeout
      }, (err, response) => {
        let isAlive = true
        // on error the response may be either null or undefined
        const statusCode = response != null ? response.statusCode : 0
        if (err != null ||
            (statusCode === 502 || statusCode === 503 || statusCode === 504)) {
          debug(`Resurrect: connection '${id}' is still dead`)
          this.markDead(connection)
          isAlive = false
        } else {
          debug(`Resurrect: connection '${id}' is now alive`)
          this.markAlive(connection)
        }
        this.emit('resurrect', null, { strategy: 'ping', isAlive, connection })
        callback(isAlive, connection)
      })
    // optimistic strategy
    } else {
      debug(`Resurrect: optimistic resurrection for connection '${id}'`)
      // the `deadCount` is left untouched on purpose here: only the status
      // and the dead list are updated
      this.dead.splice(this.dead.indexOf(id), 1)
      connection.status = Connection.statuses.ALIVE
      this.emit('resurrect', null, { strategy: 'optimistic', isAlive: true, connection })
      // eslint-disable-next-line standard/no-callback-literal
      callback(true, connection)
    }
  }

  /**
   * Returns an alive connection if present,
   * otherwise returns null.
   * Alive connections are first narrowed down with `opts.filter`
   * (by default every connection is accepted) and then `opts.selector`
   * picks one of them (by default the first one).
   *
   * @param {object} [opts]
   * @param {function} [opts.filter] `(connection) => boolean`; a connection
   *   is kept only when this returns exactly `true`
   * @param {function} [opts.selector] `(connections) => connection`
   * @returns {object|null} connection
   */
  getConnection (opts = {}) {
    const filter = opts.filter || (() => true)
    const selector = opts.selector || (c => c[0])

    // TODO: can we cache this?
    const connections = []
    for (const connection of this.connections.values()) {
      if (connection.status === Connection.statuses.ALIVE) {
        if (filter(connection) === true) {
          connections.push(connection)
        }
      }
    }

    if (connections.length === 0) return null

    return selector(connections)
  }

  /**
   * Adds a new connection to the pool.
   * Accepts a url string, a host object (`{ url, id, ... }`)
   * or an array of those.
   * Note that the `ssl` and `agent` defaults of the pool are written
   * into the given host object when they are missing.
   *
   * @param {object|string|array} opts
   * @returns {object|undefined} the new connection instance,
   *   or undefined when an array is given
   * @throws {Error} if a connection with the same id is already present
   */
  addConnection (opts) {
    if (Array.isArray(opts)) {
      opts.forEach(o => this.addConnection(o))
      return
    }

    if (typeof opts === 'string') {
      opts = this.urlToHost(opts)
    }
    // if a given node has auth data we store it in the connection pool,
    // so if we add new nodes without auth data (after a sniff for example)
    // we can add it to them once the connection instance has been created
    if (opts.url.username !== '' && opts.url.password !== '') {
      this._auth = {
        username: opts.url.username,
        password: opts.url.password
      }
    }

    if (opts.ssl == null) opts.ssl = this._ssl
    if (opts.agent == null) opts.agent = this._agent

    const connection = new this.Connection(opts)
    if (connection.url.username === '' &&
        connection.url.password === '' &&
        this._auth != null) {
      connection.url.username = this._auth.username
      connection.url.password = this._auth.password
    }

    debug('Adding a new connection', connection)
    // the id is computed by the Connection class,
    // so the duplicate check can only happen at this point
    if (this.connections.has(connection.id)) {
      throw new Error(`Connection with id '${connection.id}' is already present`)
    }
    this.connections.set(connection.id, connection)
    return connection
  }

  /**
   * Closes a connection and removes it from the pool
   * (and from the dead list, if present).
   *
   * @param {object} connection
   * @returns {ConnectionPool}
   */
  removeConnection (connection) {
    debug('Removing connection', connection)
    connection.close(noop)
    const { id } = connection
    this.connections.delete(id)
    const index = this.dead.indexOf(id)
    if (index > -1) this.dead.splice(index, 1)
    return this
  }

  /**
   * Empties the connection pool: closes every connection and, once all
   * of them have been closed, resets the pool and calls the callback.
   * If the pool has no connections the callback is called right away.
   *
   * @param {function} [callback] called without arguments
   */
  empty (callback = noop) {
    debug('Emptying the connection pool')
    let openConnections = this.connections.size
    // without this guard the callback would never be
    // called, since there is no connection to close
    if (openConnections === 0) {
      this.dead = []
      callback()
      return
    }

    this.connections.forEach(connection => {
      connection.close(() => {
        if (--openConnections === 0) {
          this.connections = new Map()
          this.dead = []
          callback()
        }
      })
    })
  }

  /**
   * Update the ConnectionPool with new connections.
   * Hosts that are already in the pool are kept (and marked as alive
   * if they were dead), unknown hosts are added, and pool connections
   * that are not part of the given list are closed and removed.
   *
   * @param {array} connections array of host objects (`{ id, url, ... }`)
   * @returns {ConnectionPool}
   */
  update (connections) {
    debug('Updating the connection pool')
    for (let i = 0; i < connections.length; i++) {
      const connection = connections[i]
      // if we already have a given connection in the pool
      // we check its status, if is 'alive', we do nothing,
      // if 'dead' we mark it as alive, we do not close the old
      // one to avoid socket issues
      if (this.connections.has(connection.id) === true) {
        debug(`The connection with id '${connection.id}' is already present`)
        const oldConnection = this.connections.get(connection.id)
        if (oldConnection.status === Connection.statuses.DEAD) {
          this.markAlive(oldConnection)
        }
      // in case the user has passed a single url (or an array of urls),
      // the connection id will be the full href; to avoid closing valid connections
      // because are not present in the pool, we check also the node url,
      // and if is already present we update its id with the ES provided one.
      } else if (this.connections.has(connection.url.href) === true) {
        const href = connection.url.href
        const oldConnection = this.connections.get(href)
        this.connections.delete(href)
        // the dead list stores ids: rename the entry as well, otherwise
        // the old id would stay there forever pointing to no connection
        const deadIndex = this.dead.indexOf(oldConnection.id)
        oldConnection.id = connection.id
        if (deadIndex > -1) this.dead[deadIndex] = connection.id
        this.connections.set(connection.id, oldConnection)
        if (oldConnection.status === Connection.statuses.DEAD) {
          this.markAlive(oldConnection)
        }
      } else {
        this.addConnection(connection)
      }
    }

    const ids = new Set(connections.map(c => c.id))
    // remove all the dead connections and old connections
    for (const connection of this.connections.values()) {
      if (ids.has(connection.id) === false) {
        this.removeConnection(connection)
      }
    }

    return this
  }

  /**
   * Transforms the nodes objects to a host object.
   * Every host gets the `master`, `data` and `ingest` roles enabled and
   * the `ml` role disabled, unless the node's `roles` list says otherwise.
   *
   * @param {object} nodes map of node id -> node info; each node is read
   *   through `node.http.publish_address` and `node.roles`
   * @returns {array} hosts
   */
  nodesToHost (nodes) {
    const ids = Object.keys(nodes)
    const hosts = []

    for (let i = 0, len = ids.length; i < len; i++) {
      const node = nodes[ids[i]]
      // If there is no protocol in
      // the `publish_address` new URL will throw
      let address = node.http.publish_address
      address = address.slice(0, 4) === 'http'
        ? address
        : 'http://' + address
      const roles = node.roles.reduce((acc, role) => {
        acc[role] = true
        return acc
      }, {})

      hosts.push({
        url: new URL(address),
        id: ids[i],
        roles: Object.assign({
          [Connection.roles.MASTER]: true,
          [Connection.roles.DATA]: true,
          [Connection.roles.INGEST]: true,
          [Connection.roles.ML]: false
        }, roles)
      })
    }

    return hosts
  }

  /**
   * Transforms an url string to a host object
   *
   * @param {string} url
   * @returns {object} host
   * @throws {TypeError} if the url cannot be parsed by `new URL`
   */
  urlToHost (url) {
    return {
      url: new URL(url)
    }
  }
}

// numeric codes of the supported resurrect strategies,
// the constructor maps the `resurrectStrategy` option onto them
ConnectionPool.resurrectStrategies = {
  none: 0,
  ping: 1,
  optimistic: 2
}
// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4
// const shuffleArray = arr => arr
// .map(a => [Math.random(), a])
// .sort((a, b) => a[0] - b[0])
// .map(a => a[1])
// The ConnectionPool class is the only export of this module.
module.exports = ConnectionPool