Refactored connection pool (#913)

* Refactored ConnectionPool
- Created BaseConnectionPool class
- Created CloudConnectionPool
- connection pool updates are immutable
- resurrect now happens inside getConnection()

* Rewritten connection pool(s) type definitions

* Updated test

* Fixed test

* Fix if check

* Removed old files

* Improve code coverage

* Updated license header

* Fix if check

* Improve code coverage

* Updated coverage script
This commit is contained in:
Tomas Della Vedova
2019-07-26 11:43:48 +02:00
committed by GitHub
parent fa07de3284
commit a948a98be6
19 changed files with 1305 additions and 620 deletions

View File

@ -0,0 +1,239 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const { URL } = require('url')
const debug = require('debug')('elasticsearch')
const Connection = require('../Connection')
const noop = () => {}
class BaseConnectionPool {
constructor (opts) {
// list of nodes and weights
this.connections = []
// how many nodes we have in our scheduler
this.size = this.connections.length
this.Connection = opts.Connection
this.emit = opts.emit || noop
this.auth = opts.auth || null
this._ssl = opts.ssl
this._agent = opts.agent
}
getConnection () {
throw new Error('getConnection must be implemented')
}
markAlive () {
return this
}
markDead () {
return this
}
/**
* Creates a new connection instance.
*/
createConnection (opts) {
if (typeof opts === 'string') {
opts = this.urlToHost(opts)
}
if (opts.url.username !== '' && opts.url.password !== '') {
opts.auth = {
username: decodeURIComponent(opts.url.username),
password: decodeURIComponent(opts.url.password)
}
} else if (this.auth !== null) {
opts.auth = this.auth
}
if (opts.ssl == null) opts.ssl = this._ssl
if (opts.agent == null) opts.agent = this._agent
const connection = new this.Connection(opts)
for (const conn of this.connections) {
if (conn.id === connection.id) {
throw new Error(`Connection with id '${connection.id}' is already present`)
}
}
return connection
}
/**
* Adds a new connection to the pool.
*
* @param {object|string} host
* @returns {ConnectionPool}
*/
addConnection (opts) {
if (Array.isArray(opts)) {
return opts.forEach(o => this.addConnection(o))
}
if (typeof opts === 'string') {
opts = this.urlToHost(opts)
}
const connectionById = this.connections.find(c => c.id === opts.id)
const connectionByUrl = this.connections.find(c => c.id === opts.url.href)
if (connectionById || connectionByUrl) {
throw new Error(`Connection with id '${opts.id || opts.url.href}' is already present`)
}
this.update([...this.connections, opts])
return this.connections[this.size - 1]
}
/**
* Removes a new connection to the pool.
*
* @param {object} connection
* @returns {ConnectionPool}
*/
removeConnection (connection) {
debug('Removing connection', connection)
return this.update(this.connections.filter(c => c.id !== connection.id))
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty (callback) {
debug('Emptying the connection pool')
var openConnections = this.size
this.connections.forEach(connection => {
connection.close(() => {
if (--openConnections === 0) {
this.connections = []
this.size = this.connections.length
callback()
}
})
})
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update (nodes) {
debug('Updating the connection pool')
const newConnections = []
const oldConnections = []
for (const node of nodes) {
// if we already have a given connection in the pool
// we mark it as alive and we do not close the connection
// to avoid socket issues
const connectionById = this.connections.find(c => c.id === node.id)
const connectionByUrl = this.connections.find(c => c.id === node.url.href)
if (connectionById) {
debug(`The connection with id '${node.id}' is already present`)
this.markAlive(connectionById)
newConnections.push(connectionById)
// in case the user has passed a single url (or an array of urls),
// the connection id will be the full href; to avoid closing valid connections
// because are not present in the pool, we check also the node url,
// and if is already present we update its id with the ES provided one.
} else if (connectionByUrl) {
connectionByUrl.id = node.id
this.markAlive(connectionByUrl)
newConnections.push(connectionByUrl)
} else {
newConnections.push(this.createConnection(node))
}
}
const ids = nodes.map(c => c.id)
// remove all the dead connections and old connections
for (const connection of this.connections) {
if (ids.indexOf(connection.id) === -1) {
oldConnections.push(connection)
}
}
// close old connections
oldConnections.forEach(connection => connection.close())
this.connections = newConnections
this.size = this.connections.length
return this
}
/**
* Transforms the nodes objects to a host object.
*
* @param {object} nodes
* @returns {array} hosts
*/
nodesToHost (nodes, protocol) {
const ids = Object.keys(nodes)
const hosts = []
for (var i = 0, len = ids.length; i < len; i++) {
const node = nodes[ids[i]]
// If there is no protocol in
// the `publish_address` new URL will throw
// the publish_address can have two forms:
// - ip:port
// - hostname/ip:port
// if we encounter the second case, we should
// use the hostname instead of the ip
var address = node.http.publish_address
const parts = address.split('/')
// the url is in the form of hostname/ip:port
if (parts.length > 1) {
const hostname = parts[0]
const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1)
address = `${hostname}:${port}`
}
address = address.slice(0, 4) === 'http'
? address
: `${protocol}//${address}`
const roles = node.roles.reduce((acc, role) => {
acc[role] = true
return acc
}, {})
hosts.push({
url: new URL(address),
id: ids[i],
roles: Object.assign({
[Connection.roles.MASTER]: true,
[Connection.roles.DATA]: true,
[Connection.roles.INGEST]: true,
[Connection.roles.ML]: false
}, roles)
})
}
return hosts
}
/**
* Transforms an url string to a host object
*
* @param {string} url
* @returns {object} host
*/
urlToHost (url) {
return {
url: new URL(url)
}
}
}
module.exports = BaseConnectionPool

View File

@ -0,0 +1,49 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const BaseConnectionPool = require('./BaseConnectionPool')
class CloudConnectionPool extends BaseConnectionPool {
constructor (opts = {}) {
super(opts)
this.cloudConnection = null
}
/**
* Returns the only cloud connection.
*
* @returns {object} connection
*/
getConnection () {
return this.cloudConnection
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty (callback) {
super.empty(() => {
this.cloudConnection = null
callback()
})
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update (connections) {
super.update(connections)
this.cloudConnection = this.connections[0]
return this
}
}
module.exports = CloudConnectionPool

232
lib/pool/ConnectionPool.js Normal file
View File

@ -0,0 +1,232 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const BaseConnectionPool = require('./BaseConnectionPool')
const assert = require('assert')
const debug = require('debug')('elasticsearch')
const Connection = require('../Connection')
const noop = () => {}
class ConnectionPool extends BaseConnectionPool {
constructor (opts = {}) {
super(opts)
this.dead = []
// the resurrect timeout is 60s
this.resurrectTimeout = 1000 * 60
// number of consecutive failures after which
// the timeout doesn't increase
this.resurrectTimeoutCutoff = 5
this.pingTimeout = opts.pingTimeout
this._sniffEnabled = opts.sniffEnabled || false
const resurrectStrategy = opts.resurrectStrategy || 'ping'
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
assert(
this.resurrectStrategy != null,
`Invalid resurrection strategy: '${resurrectStrategy}'`
)
}
/**
* Marks a connection as 'alive'.
* If needed removes the connection from the dead list
* and then resets the `deadCount`.
* If sniffing is not enabled and there is only
* one node, this method is a noop.
*
* @param {object} connection
*/
markAlive (connection) {
if (this._sniffEnabled === false && this.size === 1) return this
const { id } = connection
debug(`Marking as 'alive' connection '${id}'`)
const index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
connection.status = Connection.statuses.ALIVE
connection.deadCount = 0
connection.resurrectTimeout = 0
return this
}
/**
* Marks a connection as 'dead'.
* If needed adds the connection to the dead list
* and then increments the `deadCount`.
* If sniffing is not enabled and there is only
* one node, this method is a noop.
*
* @param {object} connection
*/
markDead (connection) {
if (this._sniffEnabled === false && this.size === 1) return this
const { id } = connection
debug(`Marking as 'dead' connection '${id}'`)
if (this.dead.indexOf(id) === -1) {
this.dead.push(id)
}
connection.status = Connection.statuses.DEAD
connection.deadCount++
// resurrectTimeout formula:
// `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff)
)
// sort the dead list in ascending order
// based on the resurrectTimeout
this.dead.sort((a, b) => {
const conn1 = this.connections.find(c => c.id === a)
const conn2 = this.connections.find(c => c.id === b)
return conn1.resurrectTimeout - conn2.resurrectTimeout
})
return this
}
/**
* If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none').
*
* @param {object} { now, requestId }
* @param {function} callback (isAlive, connection)
*/
resurrect (opts, callback = noop) {
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
debug('Nothing to resurrect')
callback(null, null)
return
}
// the dead list is sorted in ascending order based on the timeout
// so the first element will always be the one with the smaller timeout
const connection = this.connections.find(c => c.id === this.dead[0])
if ((opts.now || Date.now()) < connection.resurrectTimeout) {
debug('Nothing to resurrect')
callback(null, null)
return
}
const { id } = connection
// ping strategy
if (this.resurrectStrategy === 1) {
connection.request({
method: 'HEAD',
path: '/',
timeout: this.pingTimeout
}, (err, response) => {
var isAlive = true
const statusCode = response !== null ? response.statusCode : 0
if (err != null ||
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
debug(`Resurrect: connection '${id}' is still dead`)
this.markDead(connection)
isAlive = false
} else {
debug(`Resurrect: connection '${id}' is now alive`)
this.markAlive(connection)
}
this.emit('resurrect', null, {
strategy: 'ping',
name: opts.name,
request: { id: opts.requestId },
isAlive,
connection
})
callback(isAlive, connection)
})
// optimistic strategy
} else {
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
this.dead.splice(this.dead.indexOf(id), 1)
connection.status = Connection.statuses.ALIVE
this.emit('resurrect', null, {
strategy: 'optimistic',
name: opts.name,
request: { id: opts.requestId },
isAlive: true,
connection
})
// eslint-disable-next-line standard/no-callback-literal
callback(true, connection)
}
}
/**
* Returns an alive connection if present,
* otherwise returns null.
* By default it filters the `master` only nodes.
* It uses the selector to choose which
* connection return.
*
* @param {object} options (filter and selector)
* @returns {object|null} connection
*/
getConnection (opts = {}) {
const filter = opts.filter || (() => true)
const selector = opts.selector || (c => c[0])
this.resurrect({
now: opts.now,
requestId: opts.requestId,
name: opts.name
})
// TODO: can we cache this?
const connections = []
for (var i = 0; i < this.size; i++) {
const connection = this.connections[i]
if (connection.status === Connection.statuses.ALIVE) {
if (filter(connection) === true) {
connections.push(connection)
}
}
}
if (connections.length === 0) return null
return selector(connections)
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty (callback) {
super.empty(() => {
this.dead = []
callback()
})
}
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update (connections) {
super.update(connections)
for (var i = 0; i < this.dead.length; i++) {
if (this.connections.find(c => c.id === this.dead[i]) === undefined) {
this.dead.splice(i, 1)
}
}
return this
}
}
ConnectionPool.resurrectStrategies = {
none: 0,
ping: 1,
optimistic: 2
}
module.exports = ConnectionPool

196
lib/pool/index.d.ts vendored Normal file
View File

@ -0,0 +1,196 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
/// <reference types="node" />
import { SecureContextOptions } from 'tls';
import Connection, { AgentOptions } from '../Connection';
import { nodeFilterFn, nodeSelectorFn } from '../Transport';
interface BaseConnectionPoolOptions {
ssl?: SecureContextOptions;
agent?: AgentOptions;
auth?: BasicAuth | ApiKeyAuth;
emit: (event: string | symbol, ...args: any[]) => boolean;
pingTimeout?: number;
Connection: typeof Connection;
resurrectStrategy?: string;
}
interface ConnectionPoolOptions extends BaseConnectionPoolOptions {
pingTimeout?: number;
resurrectStrategy?: string;
sniffEnabled?: boolean;
}
interface getConnectionOptions {
filter?: nodeFilterFn;
selector?: nodeSelectorFn;
requestId?: string | number;
name?: string;
now?: number;
}
interface ApiKeyAuth {
apiKey:
| string
| {
id: string;
api_key: string;
}
}
interface BasicAuth {
username: string;
password: string;
}
interface resurrectOptions {
now?: number;
requestId: string;
name: string;
}
interface ResurrectEvent {
strategy: string;
isAlive: boolean;
connection: Connection;
name: string;
request: {
id: any;
};
}
declare class BaseConnectionPool {
connections: Connection[];
_ssl: SecureContextOptions | null;
_agent: AgentOptions | null;
auth: BasicAuth | ApiKeyAuth;
Connection: typeof Connection;
constructor(opts?: BaseConnectionPoolOptions);
/**
* Marks a connection as 'alive'.
* If needed removes the connection from the dead list
* and then resets the `deadCount`.
*
* @param {object} connection
*/
markAlive(connection: Connection): this;
/**
* Marks a connection as 'dead'.
* If needed adds the connection to the dead list
* and then increments the `deadCount`.
*
* @param {object} connection
*/
markDead(connection: Connection): this;
/**
* Returns an alive connection if present,
* otherwise returns null.
* By default it filters the `master` only nodes.
* It uses the selector to choose which
* connection return.
*
* @param {object} options (filter and selector)
* @returns {object|null} connection
*/
getConnection(opts?: getConnectionOptions): Connection | null;
/**
* Adds a new connection to the pool.
*
* @param {object|string} host
* @returns {ConnectionPool}
*/
addConnection(opts: any): Connection;
/**
* Removes a new connection to the pool.
*
* @param {object} connection
* @returns {ConnectionPool}
*/
removeConnection(connection: Connection): this;
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty(): this;
/**
* Update the ConnectionPool with new connections.
*
* @param {array} array of connections
* @returns {ConnectionPool}
*/
update(connections: any[]): this;
/**
* Transforms the nodes objects to a host object.
*
* @param {object} nodes
* @returns {array} hosts
*/
nodesToHost(nodes: any, protocol: string): any[];
/**
* Transforms an url string to a host object
*
* @param {string} url
* @returns {object} host
*/
urlToHost(url: string): any;
}
declare class ConnectionPool extends BaseConnectionPool {
static resurrectStrategies: {
none: number;
ping: number;
optimistic: number;
};
dead: string[];
_sniffEnabled: boolean;
resurrectTimeout: number;
resurrectTimeoutCutoff: number;
pingTimeout: number;
resurrectStrategy: number;
constructor(opts?: ConnectionPoolOptions);
/**
* If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none').
*
* @param {object} { now, requestId, name }
* @param {function} callback (isAlive, connection)
*/
resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
}
declare class CloudConnectionPool extends BaseConnectionPool {
cloudConnection: Connection | null
constructor(opts?: BaseConnectionPoolOptions);
getConnection(): Connection;
}
declare function defaultNodeFilter(node: Connection): boolean;
declare function roundRobinSelector(): (connections: Connection[]) => Connection;
declare function randomSelector(connections: Connection[]): Connection;
declare const internals: {
defaultNodeFilter: typeof defaultNodeFilter;
roundRobinSelector: typeof roundRobinSelector;
randomSelector: typeof randomSelector;
};
export {
// Interfaces
ConnectionPoolOptions,
getConnectionOptions,
ApiKeyAuth,
BasicAuth,
internals,
resurrectOptions,
ResurrectEvent,
// Classes
BaseConnectionPool,
ConnectionPool,
CloudConnectionPool
};

15
lib/pool/index.js Normal file
View File

@ -0,0 +1,15 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const BaseConnectionPool = require('./BaseConnectionPool')
const ConnectionPool = require('./ConnectionPool')
const CloudConnectionPool = require('./CloudConnectionPool')
module.exports = {
BaseConnectionPool,
ConnectionPool,
CloudConnectionPool
}