diff --git a/index.d.ts b/index.d.ts index 65e7023c3..1357bb2ae 100644 --- a/index.d.ts +++ b/index.d.ts @@ -18,7 +18,7 @@ import Transport, { } from './lib/Transport'; import { URL } from 'url'; import Connection, { AgentOptions, agentFn } from './lib/Connection'; -import ConnectionPool, { ResurrectEvent, BasicAuth, ApiKeyAuth } from './lib/ConnectionPool'; +import { ConnectionPool, ResurrectEvent, BasicAuth, ApiKeyAuth } from './lib/pool'; import Serializer from './lib/Serializer'; import * as RequestParams from './api/requestParams'; import * as errors from './lib/errors'; diff --git a/index.js b/index.js index e3d6dcbdb..6fc3e7708 100644 --- a/index.js +++ b/index.js @@ -9,7 +9,7 @@ const { URL } = require('url') const debug = require('debug')('elasticsearch') const Transport = require('./lib/Transport') const Connection = require('./lib/Connection') -const ConnectionPool = require('./lib/ConnectionPool') +const { ConnectionPool, CloudConnectionPool } = require('./lib/pool') const Serializer = require('./lib/Serializer') const errors = require('./lib/errors') const { ConfigurationError } = errors @@ -59,9 +59,9 @@ class Client extends EventEmitter { const options = Object.assign({}, { Connection, - ConnectionPool, Transport, Serializer, + ConnectionPool: opts.cloud ? CloudConnectionPool : ConnectionPool, maxRetries: 3, requestTimeout: 30000, pingTimeout: 3000, diff --git a/lib/Connection.d.ts b/lib/Connection.d.ts index 9f3e6d9cf..527b71fb7 100644 --- a/lib/Connection.d.ts +++ b/lib/Connection.d.ts @@ -6,7 +6,7 @@ import { URL } from 'url'; import { inspect, InspectOptions } from 'util'; -import { ApiKeyAuth, BasicAuth } from './ConnectionPool' +import { ApiKeyAuth, BasicAuth } from './pool' import * as http from 'http'; import { ConnectionOptions as TlsConnectionOptions } from 'tls'; diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js deleted file mode 100644 index c1f2d6720..000000000 --- a/lib/ConnectionPool.js +++ /dev/null @@ -1,386 +0,0 @@ -// Licensed to Elasticsearch B.V under one or more agreements. -// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. -// See the LICENSE file in the project root for more information - -'use strict' - -const assert = require('assert') -const { URL } = require('url') -const debug = require('debug')('elasticsearch') -const Connection = require('./Connection') -const noop = () => {} - -class ConnectionPool { - constructor (opts = {}) { - this.connections = new Map() - this.dead = [] - this.selector = opts.selector - this.auth = opts.auth || null - this._ssl = opts.ssl - this._agent = opts.agent - // the resurrect timeout is 60s - this.resurrectTimeout = 1000 * 60 - // number of consecutive failures after which - // the timeout doesn't increase - this.resurrectTimeoutCutoff = 5 - this.pingTimeout = opts.pingTimeout - this.Connection = opts.Connection - this.emit = opts.emit || noop - this._sniffEnabled = opts.sniffEnabled || false - - const resurrectStrategy = opts.resurrectStrategy || 'ping' - this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy] - assert( - this.resurrectStrategy != null, - `Invalid resurrection strategy: '${resurrectStrategy}'` - ) - } - - /** - * Marks a connection as 'alive'. - * If needed removes the connection from the dead list - * and then resets the `deadCount`. - * If sniffing is not enabled and there is only - * one node, this method is a noop. - * - * @param {object} connection - */ - markAlive (connection) { - if (this._sniffEnabled === false && this.connections.size === 1) return - const { id } = connection - debug(`Marking as 'alive' connection '${id}'`) - const index = this.dead.indexOf(id) - if (index > -1) this.dead.splice(index, 1) - connection.status = Connection.statuses.ALIVE - connection.deadCount = 0 - connection.resurrectTimeout = 0 - } - - /** - * Marks a connection as 'dead'. - * If needed adds the connection to the dead list - * and then increments the `deadCount`. - * If sniffing is not enabled and there is only - * one node, this method is a noop. - * - * @param {object} connection - */ - markDead (connection) { - if (this._sniffEnabled === false && this.connections.size === 1) return - const { id } = connection - debug(`Marking as 'dead' connection '${id}'`) - if (this.dead.indexOf(id) === -1) { - this.dead.push(id) - } - connection.status = Connection.statuses.DEAD - connection.deadCount++ - // resurrectTimeout formula: - // `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)` - connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow( - 2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff) - ) - - // sort the dead list in ascending order - // based on the resurrectTimeout - this.dead.sort((a, b) => { - const conn1 = this.connections.get(a) - const conn2 = this.connections.get(b) - return conn1.resurrectTimeout - conn2.resurrectTimeout - }) - } - - /** - * If enabled, tries to resurrect a connection with the given - * resurrect strategy ('ping', 'optimistic', 'none'). - * - * @param {object} { now, requestId } - * @param {function} callback (isAlive, connection) - */ - resurrect (opts, callback = noop) { - if (this.resurrectStrategy === 0 || this.dead.length === 0) { - debug('Nothing to resurrect') - callback(null, null) - return - } - - // the dead list is sorted in ascending order based on the timeout - // so the first element will always be the one with the smaller timeout - const connection = this.connections.get(this.dead[0]) - if ((opts.now || Date.now()) < connection.resurrectTimeout) { - debug('Nothing to resurrect') - callback(null, null) - return - } - - const { id } = connection - - // ping strategy - if (this.resurrectStrategy === 1) { - connection.request({ - method: 'HEAD', - path: '/', - timeout: this.pingTimeout - }, (err, response) => { - var isAlive = true - const statusCode = response !== null ? response.statusCode : 0 - if (err != null || - (statusCode === 502 || statusCode === 503 || statusCode === 504)) { - debug(`Resurrect: connection '${id}' is still dead`) - this.markDead(connection) - isAlive = false - } else { - debug(`Resurrect: connection '${id}' is now alive`) - this.markAlive(connection) - } - this.emit('resurrect', null, { - strategy: 'ping', - name: opts.name, - request: { id: opts.requestId }, - isAlive, - connection - }) - callback(isAlive, connection) - }) - // optimistic strategy - } else { - debug(`Resurrect: optimistic resurrection for connection '${id}'`) - this.dead.splice(this.dead.indexOf(id), 1) - connection.status = Connection.statuses.ALIVE - this.emit('resurrect', null, { - strategy: 'optimistic', - name: opts.name, - request: { id: opts.requestId }, - isAlive: true, - connection - }) - // eslint-disable-next-line standard/no-callback-literal - callback(true, connection) - } - } - - /** - * Returns an alive connection if present, - * otherwise returns null. - * By default it filters the `master` only nodes. - * It uses the selector to choose which - * connection return. - * - * @param {object} options (filter and selector) - * @returns {object|null} connection - */ - getConnection (opts = {}) { - const filter = opts.filter || (() => true) - const selector = opts.selector || (c => c[0]) - - // TODO: can we cache this? - const connections = [] - for (var connection of this.connections.values()) { - if (connection.status === Connection.statuses.ALIVE) { - if (filter(connection) === true) { - connections.push(connection) - } - } - } - - if (connections.length === 0) return null - - return selector(connections) - } - - /** - * Adds a new connection to the pool. - * - * @param {object|string} host - * @returns {ConnectionPool} - */ - addConnection (opts) { - if (Array.isArray(opts)) { - opts.forEach(o => this.addConnection(o)) - return - } - - if (typeof opts === 'string') { - opts = this.urlToHost(opts) - } - - if (opts.url.username !== '' && opts.url.password !== '') { - opts.auth = { - username: decodeURIComponent(opts.url.username), - password: decodeURIComponent(opts.url.password) - } - } else if (this.auth !== null) { - opts.auth = this.auth - } - - if (opts.ssl == null) opts.ssl = this._ssl - if (opts.agent == null) opts.agent = this._agent - - const connection = new this.Connection(opts) - debug('Adding a new connection', connection) - if (this.connections.has(connection.id)) { - throw new Error(`Connection with id '${connection.id}' is already present`) - } - this.connections.set(connection.id, connection) - return connection - } - - /** - * Removes a new connection to the pool. - * - * @param {object} connection - * @returns {ConnectionPool} - */ - removeConnection (connection) { - debug('Removing connection', connection) - connection.close(noop) - const { id } = connection - this.connections.delete(id) - var index = this.dead.indexOf(id) - if (index > -1) this.dead.splice(index, 1) - return this - } - - /** - * Empties the connection pool. - * - * @returns {ConnectionPool} - */ - empty (callback) { - debug('Emptying the connection pool') - var openConnections = this.connections.size - this.connections.forEach(connection => { - connection.close(() => { - if (--openConnections === 0) { - this.connections = new Map() - this.dead = [] - callback() - } - }) - }) - } - - /** - * Update the ConnectionPool with new connections. - * - * @param {array} array of connections - * @returns {ConnectionPool} - */ - update (connections) { - debug('Updating the connection pool') - for (var i = 0; i < connections.length; i++) { - const connection = connections[i] - // if we already have a given connection in the pool - // we check its status, if is 'alive', we do nothing, - // if 'dead' we mark it as alive, we do not close the old - // one to avoid socket issues - if (this.connections.has(connection.id) === true) { - debug(`The connection with id '${connection.id}' is already present`) - const oldConnection = this.connections.get(connection.id) - if (oldConnection.status === Connection.statuses.DEAD) { - this.markAlive(oldConnection) - } - // in case the user has passed a single url (or an array of urls), - // the connection id will be the full href; to avoid closing valid connections - // because are not present in the pool, we check also the node url, - // and if is already present we update its id with the ES provided one. - } else if (this.connections.has(connection.url.href) === true) { - const oldConnection = this.connections.get(connection.url.href) - this.connections.delete(connection.url.href) - oldConnection.id = connection.id - this.connections.set(connection.id, oldConnection) - if (oldConnection.status === Connection.statuses.DEAD) { - this.markAlive(oldConnection) - } - } else { - this.addConnection(connection) - } - } - - const ids = connections.map(c => c.id) - // remove all the dead connections and old connections - for (const connection of this.connections.values()) { - if (ids.indexOf(connection.id) === -1) { - this.removeConnection(connection) - } - } - - return this - } - - /** - * Transforms the nodes objects to a host object. - * - * @param {object} nodes - * @returns {array} hosts - */ - nodesToHost (nodes, protocol) { - const ids = Object.keys(nodes) - const hosts = [] - - for (var i = 0, len = ids.length; i < len; i++) { - const node = nodes[ids[i]] - // If there is no protocol in - // the `publish_address` new URL will throw - // the publish_address can have two forms: - // - ip:port - // - hostname/ip:port - // if we encounter the second case, we should - // use the hostname instead of the ip - var address = node.http.publish_address - const parts = address.split('/') - // the url is in the form of hostname/ip:port - if (parts.length > 1) { - const hostname = parts[0] - const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1) - address = `${hostname}:${port}` - } - - address = address.slice(0, 4) === 'http' - ? address - : `${protocol}//${address}` - const roles = node.roles.reduce((acc, role) => { - acc[role] = true - return acc - }, {}) - - hosts.push({ - url: new URL(address), - id: ids[i], - roles: Object.assign({ - [Connection.roles.MASTER]: true, - [Connection.roles.DATA]: true, - [Connection.roles.INGEST]: true, - [Connection.roles.ML]: false - }, roles) - }) - } - - return hosts - } - - /** - * Transforms an url string to a host object - * - * @param {string} url - * @returns {object} host - */ - urlToHost (url) { - return { - url: new URL(url) - } - } -} - -ConnectionPool.resurrectStrategies = { - none: 0, - ping: 1, - optimistic: 2 -} - -// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4 -// const shuffleArray = arr => arr -// .map(a => [Math.random(), a]) -// .sort((a, b) => a[0] - b[0]) -// .map(a => a[1]) - -module.exports = ConnectionPool diff --git a/lib/Transport.d.ts b/lib/Transport.d.ts index 98ab2d43a..8099de2ea 100644 --- a/lib/Transport.d.ts +++ b/lib/Transport.d.ts @@ -2,7 +2,7 @@ // Elasticsearch B.V licenses this file to you under the Apache 2.0 License. // See the LICENSE file in the project root for more information -import ConnectionPool from './ConnectionPool'; +import { ConnectionPool, CloudConnectionPool } from './pool'; import Connection from './Connection'; import Serializer from './Serializer'; @@ -23,7 +23,7 @@ declare type emitFn = (event: string | symbol, ...args: any[]) => boolean; interface TransportOptions { emit: emitFn & noopFn; - connectionPool: ConnectionPool; + connectionPool: ConnectionPool | CloudConnectionPool; serializer: Serializer; maxRetries: number; requestTimeout: number | string; @@ -113,7 +113,7 @@ export default class Transport { DEFAULT: string; }; emit: emitFn & noopFn; - connectionPool: ConnectionPool; + connectionPool: ConnectionPool | CloudConnectionPool; serializer: Serializer; maxRetries: number; requestTimeout: number; diff --git a/lib/Transport.js b/lib/Transport.js index 76862b98c..7e5b83681 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -314,10 +314,12 @@ class Transport { if (this._sniffEnabled === true && now > this._nextSniff) { this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId }) } - this.connectionPool.resurrect({ now, requestId: opts.requestId, name: this.name }) return this.connectionPool.getConnection({ filter: this.nodeFilter, - selector: this.nodeSelector + selector: this.nodeSelector, + requestId: opts.requestId, + name: this.name, + now }) } diff --git a/lib/pool/BaseConnectionPool.js b/lib/pool/BaseConnectionPool.js new file mode 100644 index 000000000..6241ace5f --- /dev/null +++ b/lib/pool/BaseConnectionPool.js @@ -0,0 +1,239 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const { URL } = require('url') +const debug = require('debug')('elasticsearch') +const Connection = require('../Connection') +const noop = () => {} + +class BaseConnectionPool { + constructor (opts) { + // list of nodes and weights + this.connections = [] + // how many nodes we have in our scheduler + this.size = this.connections.length + this.Connection = opts.Connection + this.emit = opts.emit || noop + this.auth = opts.auth || null + this._ssl = opts.ssl + this._agent = opts.agent + } + + getConnection () { + throw new Error('getConnection must be implemented') + } + + markAlive () { + return this + } + + markDead () { + return this + } + + /** + * Creates a new connection instance. + */ + createConnection (opts) { + if (typeof opts === 'string') { + opts = this.urlToHost(opts) + } + + if (opts.url.username !== '' && opts.url.password !== '') { + opts.auth = { + username: decodeURIComponent(opts.url.username), + password: decodeURIComponent(opts.url.password) + } + } else if (this.auth !== null) { + opts.auth = this.auth + } + + if (opts.ssl == null) opts.ssl = this._ssl + if (opts.agent == null) opts.agent = this._agent + + const connection = new this.Connection(opts) + + for (const conn of this.connections) { + if (conn.id === connection.id) { + throw new Error(`Connection with id '${connection.id}' is already present`) + } + } + + return connection + } + + /** + * Adds a new connection to the pool. + * + * @param {object|string} host + * @returns {ConnectionPool} + */ + addConnection (opts) { + if (Array.isArray(opts)) { + return opts.forEach(o => this.addConnection(o)) + } + + if (typeof opts === 'string') { + opts = this.urlToHost(opts) + } + + const connectionById = this.connections.find(c => c.id === opts.id) + const connectionByUrl = this.connections.find(c => c.id === opts.url.href) + + if (connectionById || connectionByUrl) { + throw new Error(`Connection with id '${opts.id || opts.url.href}' is already present`) + } + + this.update([...this.connections, opts]) + return this.connections[this.size - 1] + } + + /** + * Removes a new connection to the pool. + * + * @param {object} connection + * @returns {ConnectionPool} + */ + removeConnection (connection) { + debug('Removing connection', connection) + return this.update(this.connections.filter(c => c.id !== connection.id)) + } + + /** + * Empties the connection pool. + * + * @returns {ConnectionPool} + */ + empty (callback) { + debug('Emptying the connection pool') + var openConnections = this.size + this.connections.forEach(connection => { + connection.close(() => { + if (--openConnections === 0) { + this.connections = [] + this.size = this.connections.length + callback() + } + }) + }) + } + + /** + * Update the ConnectionPool with new connections. + * + * @param {array} array of connections + * @returns {ConnectionPool} + */ + update (nodes) { + debug('Updating the connection pool') + const newConnections = [] + const oldConnections = [] + + for (const node of nodes) { + // if we already have a given connection in the pool + // we mark it as alive and we do not close the connection + // to avoid socket issues + const connectionById = this.connections.find(c => c.id === node.id) + const connectionByUrl = this.connections.find(c => c.id === node.url.href) + if (connectionById) { + debug(`The connection with id '${node.id}' is already present`) + this.markAlive(connectionById) + newConnections.push(connectionById) + // in case the user has passed a single url (or an array of urls), + // the connection id will be the full href; to avoid closing valid connections + // because are not present in the pool, we check also the node url, + // and if is already present we update its id with the ES provided one. + } else if (connectionByUrl) { + connectionByUrl.id = node.id + this.markAlive(connectionByUrl) + newConnections.push(connectionByUrl) + } else { + newConnections.push(this.createConnection(node)) + } + } + + const ids = nodes.map(c => c.id) + // remove all the dead connections and old connections + for (const connection of this.connections) { + if (ids.indexOf(connection.id) === -1) { + oldConnections.push(connection) + } + } + + // close old connections + oldConnections.forEach(connection => connection.close()) + + this.connections = newConnections + this.size = this.connections.length + + return this + } + + /** + * Transforms the nodes objects to a host object. + * + * @param {object} nodes + * @returns {array} hosts + */ + nodesToHost (nodes, protocol) { + const ids = Object.keys(nodes) + const hosts = [] + + for (var i = 0, len = ids.length; i < len; i++) { + const node = nodes[ids[i]] + // If there is no protocol in + // the `publish_address` new URL will throw + // the publish_address can have two forms: + // - ip:port + // - hostname/ip:port + // if we encounter the second case, we should + // use the hostname instead of the ip + var address = node.http.publish_address + const parts = address.split('/') + // the url is in the form of hostname/ip:port + if (parts.length > 1) { + const hostname = parts[0] + const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1) + address = `${hostname}:${port}` + } + + address = address.slice(0, 4) === 'http' + ? address + : `${protocol}//${address}` + const roles = node.roles.reduce((acc, role) => { + acc[role] = true + return acc + }, {}) + + hosts.push({ + url: new URL(address), + id: ids[i], + roles: Object.assign({ + [Connection.roles.MASTER]: true, + [Connection.roles.DATA]: true, + [Connection.roles.INGEST]: true, + [Connection.roles.ML]: false + }, roles) + }) + } + + return hosts + } + + /** + * Transforms an url string to a host object + * + * @param {string} url + * @returns {object} host + */ + urlToHost (url) { + return { + url: new URL(url) + } + } +} + +module.exports = BaseConnectionPool diff --git a/lib/pool/CloudConnectionPool.js b/lib/pool/CloudConnectionPool.js new file mode 100644 index 000000000..0ff5a4da2 --- /dev/null +++ b/lib/pool/CloudConnectionPool.js @@ -0,0 +1,49 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const BaseConnectionPool = require('./BaseConnectionPool') + +class CloudConnectionPool extends BaseConnectionPool { + constructor (opts = {}) { + super(opts) + this.cloudConnection = null + } + + /** + * Returns the only cloud connection. + * + * @returns {object} connection + */ + getConnection () { + return this.cloudConnection + } + + /** + * Empties the connection pool. + * + * @returns {ConnectionPool} + */ + empty (callback) { + super.empty(() => { + this.cloudConnection = null + callback() + }) + } + + /** + * Update the ConnectionPool with new connections. + * + * @param {array} array of connections + * @returns {ConnectionPool} + */ + update (connections) { + super.update(connections) + this.cloudConnection = this.connections[0] + return this + } +} + +module.exports = CloudConnectionPool diff --git a/lib/pool/ConnectionPool.js b/lib/pool/ConnectionPool.js new file mode 100644 index 000000000..8114ced3e --- /dev/null +++ b/lib/pool/ConnectionPool.js @@ -0,0 +1,232 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const BaseConnectionPool = require('./BaseConnectionPool') +const assert = require('assert') +const debug = require('debug')('elasticsearch') +const Connection = require('../Connection') +const noop = () => {} + +class ConnectionPool extends BaseConnectionPool { + constructor (opts = {}) { + super(opts) + + this.dead = [] + // the resurrect timeout is 60s + this.resurrectTimeout = 1000 * 60 + // number of consecutive failures after which + // the timeout doesn't increase + this.resurrectTimeoutCutoff = 5 + this.pingTimeout = opts.pingTimeout + this._sniffEnabled = opts.sniffEnabled || false + + const resurrectStrategy = opts.resurrectStrategy || 'ping' + this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy] + assert( + this.resurrectStrategy != null, + `Invalid resurrection strategy: '${resurrectStrategy}'` + ) + } + + /** + * Marks a connection as 'alive'. + * If needed removes the connection from the dead list + * and then resets the `deadCount`. + * If sniffing is not enabled and there is only + * one node, this method is a noop. + * + * @param {object} connection + */ + markAlive (connection) { + if (this._sniffEnabled === false && this.size === 1) return this + const { id } = connection + debug(`Marking as 'alive' connection '${id}'`) + const index = this.dead.indexOf(id) + if (index > -1) this.dead.splice(index, 1) + connection.status = Connection.statuses.ALIVE + connection.deadCount = 0 + connection.resurrectTimeout = 0 + return this + } + + /** + * Marks a connection as 'dead'. + * If needed adds the connection to the dead list + * and then increments the `deadCount`. + * If sniffing is not enabled and there is only + * one node, this method is a noop. + * + * @param {object} connection + */ + markDead (connection) { + if (this._sniffEnabled === false && this.size === 1) return this + const { id } = connection + debug(`Marking as 'dead' connection '${id}'`) + if (this.dead.indexOf(id) === -1) { + this.dead.push(id) + } + connection.status = Connection.statuses.DEAD + connection.deadCount++ + // resurrectTimeout formula: + // `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)` + connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow( + 2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff) + ) + + // sort the dead list in ascending order + // based on the resurrectTimeout + this.dead.sort((a, b) => { + const conn1 = this.connections.find(c => c.id === a) + const conn2 = this.connections.find(c => c.id === b) + return conn1.resurrectTimeout - conn2.resurrectTimeout + }) + + return this + } + + /** + * If enabled, tries to resurrect a connection with the given + * resurrect strategy ('ping', 'optimistic', 'none'). + * + * @param {object} { now, requestId } + * @param {function} callback (isAlive, connection) + */ + resurrect (opts, callback = noop) { + if (this.resurrectStrategy === 0 || this.dead.length === 0) { + debug('Nothing to resurrect') + callback(null, null) + return + } + + // the dead list is sorted in ascending order based on the timeout + // so the first element will always be the one with the smaller timeout + const connection = this.connections.find(c => c.id === this.dead[0]) + if ((opts.now || Date.now()) < connection.resurrectTimeout) { + debug('Nothing to resurrect') + callback(null, null) + return + } + + const { id } = connection + + // ping strategy + if (this.resurrectStrategy === 1) { + connection.request({ + method: 'HEAD', + path: '/', + timeout: this.pingTimeout + }, (err, response) => { + var isAlive = true + const statusCode = response !== null ? response.statusCode : 0 + if (err != null || + (statusCode === 502 || statusCode === 503 || statusCode === 504)) { + debug(`Resurrect: connection '${id}' is still dead`) + this.markDead(connection) + isAlive = false + } else { + debug(`Resurrect: connection '${id}' is now alive`) + this.markAlive(connection) + } + this.emit('resurrect', null, { + strategy: 'ping', + name: opts.name, + request: { id: opts.requestId }, + isAlive, + connection + }) + callback(isAlive, connection) + }) + // optimistic strategy + } else { + debug(`Resurrect: optimistic resurrection for connection '${id}'`) + this.dead.splice(this.dead.indexOf(id), 1) + connection.status = Connection.statuses.ALIVE + this.emit('resurrect', null, { + strategy: 'optimistic', + name: opts.name, + request: { id: opts.requestId }, + isAlive: true, + connection + }) + // eslint-disable-next-line standard/no-callback-literal + callback(true, connection) + } + } + + /** + * Returns an alive connection if present, + * otherwise returns null. + * By default it filters the `master` only nodes. + * It uses the selector to choose which + * connection return. + * + * @param {object} options (filter and selector) + * @returns {object|null} connection + */ + getConnection (opts = {}) { + const filter = opts.filter || (() => true) + const selector = opts.selector || (c => c[0]) + + this.resurrect({ + now: opts.now, + requestId: opts.requestId, + name: opts.name + }) + + // TODO: can we cache this? + const connections = [] + for (var i = 0; i < this.size; i++) { + const connection = this.connections[i] + if (connection.status === Connection.statuses.ALIVE) { + if (filter(connection) === true) { + connections.push(connection) + } + } + } + + if (connections.length === 0) return null + + return selector(connections) + } + + /** + * Empties the connection pool. + * + * @returns {ConnectionPool} + */ + empty (callback) { + super.empty(() => { + this.dead = [] + callback() + }) + } + + /** + * Update the ConnectionPool with new connections. + * + * @param {array} array of connections + * @returns {ConnectionPool} + */ + update (connections) { + super.update(connections) + + for (var i = 0; i < this.dead.length; i++) { + if (this.connections.find(c => c.id === this.dead[i]) === undefined) { + this.dead.splice(i, 1) + } + } + + return this + } +} + +ConnectionPool.resurrectStrategies = { + none: 0, + ping: 1, + optimistic: 2 +} + +module.exports = ConnectionPool diff --git a/lib/ConnectionPool.d.ts b/lib/pool/index.d.ts similarity index 70% rename from lib/ConnectionPool.d.ts rename to lib/pool/index.d.ts index b1e77f0fc..16894eec7 100644 --- a/lib/ConnectionPool.d.ts +++ b/lib/pool/index.d.ts @@ -5,24 +5,34 @@ /// import { SecureContextOptions } from 'tls'; -import Connection, { AgentOptions } from './Connection'; -import { nodeFilterFn, nodeSelectorFn } from './Transport'; +import Connection, { AgentOptions } from '../Connection'; +import { nodeFilterFn, nodeSelectorFn } from '../Transport'; -interface ConnectionPoolOptions { +interface BaseConnectionPoolOptions { ssl?: SecureContextOptions; agent?: AgentOptions; - auth: BasicAuth | ApiKeyAuth; + auth?: BasicAuth | ApiKeyAuth; + emit: (event: string | symbol, ...args: any[]) => boolean; pingTimeout?: number; Connection: typeof Connection; resurrectStrategy?: string; } -export interface getConnectionOptions { - filter?: nodeFilterFn; - selector?: nodeSelectorFn; +interface ConnectionPoolOptions extends BaseConnectionPoolOptions { + pingTimeout?: number; + resurrectStrategy?: string; + sniffEnabled?: boolean; } -export interface ApiKeyAuth { +interface getConnectionOptions { + filter?: nodeFilterFn; + selector?: nodeSelectorFn; + requestId?: string | number; + name?: string; + now?: number; +} + +interface ApiKeyAuth { apiKey: | string | { @@ -31,18 +41,18 @@ export interface ApiKeyAuth { } } -export interface BasicAuth { +interface BasicAuth { username: string; password: string; } -export interface resurrectOptions { +interface resurrectOptions { now?: number; requestId: string; name: string; } -export interface ResurrectEvent { +interface ResurrectEvent { strategy: string; isAlive: boolean; connection: Connection; @@ -52,24 +62,14 @@ export interface ResurrectEvent { }; } -export default class ConnectionPool { - static resurrectStrategies: { - none: number; - ping: number; - optimistic: number; - }; - connections: any; - dead: string[]; + +declare class BaseConnectionPool { + connections: Connection[]; _ssl: SecureContextOptions | null; _agent: AgentOptions | null; - _sniffEnabled: boolean; - resurrectTimeout: number; - resurrectTimeoutCutoff: number; - pingTimeout: number; auth: BasicAuth | ApiKeyAuth; Connection: typeof Connection; - resurrectStrategy: number; - constructor(opts?: ConnectionPoolOptions); + constructor(opts?: BaseConnectionPoolOptions); /** * Marks a connection as 'alive'. * If needed removes the connection from the dead list @@ -77,7 +77,7 @@ export default class ConnectionPool { * * @param {object} connection */ - markAlive(connection: Connection): void; + markAlive(connection: Connection): this; /** * Marks a connection as 'dead'. * If needed adds the connection to the dead list @@ -85,15 +85,7 @@ export default class ConnectionPool { * * @param {object} connection */ - markDead(connection: Connection): void; - /** - * If enabled, tries to resurrect a connection with the given - * resurrect strategy ('ping', 'optimistic', 'none'). - * - * @param {object} { now, requestId, name } - * @param {function} callback (isAlive, connection) - */ - resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void; + markDead(connection: Connection): this; /** * Returns an alive connection if present, * otherwise returns null. @@ -111,27 +103,27 @@ export default class ConnectionPool { * @param {object|string} host * @returns {ConnectionPool} */ - addConnection(opts: any): Connection | void; + addConnection(opts: any): Connection; /** * Removes a new connection to the pool. * * @param {object} connection * @returns {ConnectionPool} */ - removeConnection(connection: Connection): ConnectionPool; + removeConnection(connection: Connection): this; /** * Empties the connection pool. * * @returns {ConnectionPool} */ - empty(): ConnectionPool; + empty(): this; /** * Update the ConnectionPool with new connections. * * @param {array} array of connections * @returns {ConnectionPool} */ - update(connections: Connection[]): ConnectionPool; + update(connections: any[]): this; /** * Transforms the nodes objects to a host object. * @@ -148,14 +140,57 @@ export default class ConnectionPool { urlToHost(url: string): any; } +declare class ConnectionPool extends BaseConnectionPool { + static resurrectStrategies: { + none: number; + ping: number; + optimistic: number; + }; + dead: string[]; + _sniffEnabled: boolean; + resurrectTimeout: number; + resurrectTimeoutCutoff: number; + pingTimeout: number; + resurrectStrategy: number; + constructor(opts?: ConnectionPoolOptions); + + /** + * If enabled, tries to resurrect a connection with the given + * resurrect strategy ('ping', 'optimistic', 'none'). + * + * @param {object} { now, requestId, name } + * @param {function} callback (isAlive, connection) + */ + resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void; +} + +declare class CloudConnectionPool extends BaseConnectionPool { + cloudConnection: Connection | null + constructor(opts?: BaseConnectionPoolOptions); + getConnection(): Connection; +} + declare function defaultNodeFilter(node: Connection): boolean; declare function roundRobinSelector(): (connections: Connection[]) => Connection; declare function randomSelector(connections: Connection[]): Connection; -export declare const internals: { +declare const internals: { defaultNodeFilter: typeof defaultNodeFilter; roundRobinSelector: typeof roundRobinSelector; randomSelector: typeof randomSelector; }; -export {}; +export { + // Interfaces + ConnectionPoolOptions, + getConnectionOptions, + ApiKeyAuth, + BasicAuth, + internals, + resurrectOptions, + ResurrectEvent, + // Classes + BaseConnectionPool, + ConnectionPool, + CloudConnectionPool +}; diff --git a/lib/pool/index.js b/lib/pool/index.js new file mode 100644 index 000000000..1cdf1ed4c --- /dev/null +++ b/lib/pool/index.js @@ -0,0 +1,15 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const BaseConnectionPool = require('./BaseConnectionPool') +const ConnectionPool = require('./ConnectionPool') +const CloudConnectionPool = require('./CloudConnectionPool') + +module.exports = { + BaseConnectionPool, + ConnectionPool, + CloudConnectionPool +} diff --git a/package.json b/package.json index 07de66a04..38126cdf7 100644 --- a/package.json +++ b/package.json @@ -22,7 +22,7 @@ "test:integration": "tap test/integration/index.js -T --no-coverage", "test:integration:report": "npm run test:integration | tap-mocha-reporter xunit > $WORKSPACE/test-report-junit.xml", "test:types": "tsc --project ./test/types/tsconfig.json", - "test:coverage": "nyc npm run test:unit && nyc report --reporter=text-lcov > coverage.lcov && codecov", + "test:coverage": "nyc tap test/unit/*.test.js test/behavior/*.test.js -t 300 && nyc report --reporter=text-lcov > coverage.lcov && codecov", "lint": "standard", "lint:fix": "standard --fix", "ci": "npm run license-checker && npm test && npm run test:integration && npm run test:coverage", diff --git a/test/behavior/sniff.test.js b/test/behavior/sniff.test.js index 9377496b1..e2d6638f3 100644 --- a/test/behavior/sniff.test.js +++ b/test/behavior/sniff.test.js @@ -26,7 +26,7 @@ test('Should update the connection pool', t => { const client = new Client({ node: nodes[Object.keys(nodes)[0]].url }) - t.strictEqual(client.connectionPool.connections.size, 1) + t.strictEqual(client.connectionPool.size, 1) client.on(events.SNIFF, (err, request) => { t.error(err) @@ -72,7 +72,7 @@ test('Should update the connection pool', t => { } } - t.strictEqual(client.connectionPool.connections.size, 4) + t.strictEqual(client.connectionPool.size, 4) }) t.teardown(shutdown) }) @@ -85,7 +85,7 @@ test('Should handle hostnames in publish_address', t => { const client = new Client({ node: nodes[Object.keys(nodes)[0]].url }) - t.strictEqual(client.connectionPool.connections.size, 1) + t.strictEqual(client.connectionPool.size, 1) client.on(events.SNIFF, (err, request) => { t.error(err) @@ -105,7 +105,7 @@ test('Should handle hostnames in publish_address', t => { t.strictEqual(hosts[i].url.hostname, 'localhost') } - t.strictEqual(client.connectionPool.connections.size, 4) + t.strictEqual(client.connectionPool.size, 4) }) t.teardown(shutdown) }) @@ -125,13 +125,13 @@ test('Sniff interval', t => { t.error(err) const { hosts, reason } = request.meta.sniff t.strictEqual( - client.connectionPool.connections.size, + client.connectionPool.size, hosts.length ) t.strictEqual(reason, Transport.sniffReasons.SNIFF_INTERVAL) }) - t.strictEqual(client.connectionPool.connections.size, 1) + t.strictEqual(client.connectionPool.size, 1) setTimeout(() => client.info(t.error), 60) setTimeout(() => { @@ -141,7 +141,7 @@ test('Sniff interval', t => { }, 150) setTimeout(() => { - t.strictEqual(client.connectionPool.connections.size, 3) + t.strictEqual(client.connectionPool.size, 3) }, 200) t.teardown(shutdown) @@ -161,13 +161,13 @@ test('Sniff on start', t => { t.error(err) const { hosts, reason } = request.meta.sniff t.strictEqual( - client.connectionPool.connections.size, + client.connectionPool.size, hosts.length ) t.strictEqual(reason, Transport.sniffReasons.SNIFF_ON_START) }) - t.strictEqual(client.connectionPool.connections.size, 1) + t.strictEqual(client.connectionPool.size, 1) t.teardown(shutdown) }) }) @@ -190,11 +190,11 @@ test('Should not close living connections', t => { Connection: MyConnection }) - t.strictEqual(client.connectionPool.connections.size, 1) + t.strictEqual(client.connectionPool.size, 1) client.transport.sniff((err, hosts) => { t.error(err) t.strictEqual( - client.connectionPool.connections.size, + client.connectionPool.size, hosts.length ) }) @@ -228,13 +228,13 @@ test('Sniff on connection fault', t => { Connection: MyConnection }) - t.strictEqual(client.connectionPool.connections.size, 2) + t.strictEqual(client.connectionPool.size, 2) // this event will be triggered by the connection fault client.on(events.SNIFF, (err, request) => { t.error(err) const { hosts, reason } = request.meta.sniff t.strictEqual( - client.connectionPool.connections.size, + client.connectionPool.size, hosts.length ) t.strictEqual(reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT) diff --git a/test/unit/base-connection-pool.test.js b/test/unit/base-connection-pool.test.js new file mode 100644 index 000000000..74d4ff3b7 --- /dev/null +++ b/test/unit/base-connection-pool.test.js @@ -0,0 +1,490 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const { test } = require('tap') +const { URL } = require('url') +const BaseConnectionPool = require('../../lib/pool/BaseConnectionPool') +const Connection = require('../../lib/Connection') + +test('API', t => { + t.test('addConnection', t => { + const pool = new BaseConnectionPool({ Connection }) + const href = 'http://localhost:9200/' + pool.addConnection(href) + t.ok(pool.connections.find(c => c.id === href) instanceof Connection) + t.strictEqual(pool.connections.find(c => c.id === href).status, Connection.statuses.ALIVE) + t.end() + }) + + t.test('addConnection should throw with two connections with the same id', t => { + const pool = new BaseConnectionPool({ Connection }) + const href = 'http://localhost:9200/' + pool.addConnection(href) + try { + pool.addConnection(href) + t.fail('Should throw') + } catch (err) { + t.is(err.message, `Connection with id '${href}' is already present`) + } + t.end() + }) + + t.test('addConnection should handle not-friendly url parameters for user and password', t => { + const pool = new BaseConnectionPool({ Connection }) + const href = 'http://us"er:p@assword@localhost:9200/' + pool.addConnection(href) + const conn = pool.connections[0] + t.strictEqual(conn.url.username, 'us%22er') + t.strictEqual(conn.url.password, 'p%40assword') + t.match(conn.headers, { + authorization: 'Basic ' + Buffer.from('us"er:p@assword').toString('base64') + }) + t.end() + }) + + t.test('markDead', t => { + const pool = new BaseConnectionPool({ Connection, sniffEnabled: true }) + const href = 'http://localhost:9200/' + var connection = pool.addConnection(href) + t.same(pool.markDead(connection), pool) + connection = pool.connections.find(c => c.id === href) + t.strictEqual(connection.status, Connection.statuses.ALIVE) + t.end() + }) + + t.test('markAlive', t => { + const pool = new BaseConnectionPool({ Connection, sniffEnabled: true }) + const href = 'http://localhost:9200/' + var connection = pool.addConnection(href) + t.same(pool.markAlive(connection), pool) + connection = pool.connections.find(c => c.id === href) + t.strictEqual(connection.status, Connection.statuses.ALIVE) + t.end() + }) + + t.test('getConnection should throw', t => { + const pool = new BaseConnectionPool({ Connection }) + const href = 'http://localhost:9200/' + pool.addConnection(href) + try { + pool.getConnection() + t.fail('Should fail') + } catch (err) { + t.is(err.message, 'getConnection must be implemented') + } + t.end() + }) + + t.test('removeConnection', t => { + const pool = new BaseConnectionPool({ Connection }) + const href = 'http://localhost:9200/' + var connection = pool.addConnection(href) + pool.removeConnection(connection) + t.strictEqual(pool.size, 0) + t.end() + }) + + t.test('empty', t => { + const pool = new BaseConnectionPool({ Connection }) + pool.addConnection('http://localhost:9200/') + pool.addConnection('http://localhost:9201/') + pool.empty(() => { + t.strictEqual(pool.size, 0) + t.end() + }) + }) + + t.test('urlToHost', t => { + const pool = new BaseConnectionPool({ Connection }) + const url = 'http://localhost:9200' + t.deepEqual( + pool.urlToHost(url), + { url: new URL(url) } + ) + t.end() + }) + + t.test('nodesToHost', t => { + t.test('publish_address as ip address (IPv4)', t => { + const pool = new BaseConnectionPool({ Connection }) + const nodes = { + a1: { + http: { + publish_address: '127.0.0.1:9200' + }, + roles: ['master', 'data', 'ingest'] + }, + a2: { + http: { + publish_address: '127.0.0.1:9201' + }, + roles: ['master', 'data', 'ingest'] + } + } + + t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a2', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }]) + + t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, '127.0.0.1:9200') + t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, '127.0.0.1:9201') + t.end() + }) + + t.test('publish_address as ip address (IPv6)', t => { + const pool = new BaseConnectionPool({ Connection }) + const nodes = { + a1: { + http: { + publish_address: '[::1]:9200' + }, + roles: ['master', 'data', 'ingest'] + }, + a2: { + http: { + publish_address: '[::1]:9201' + }, + roles: ['master', 'data', 'ingest'] + } + } + + t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{ + url: new URL('http://[::1]:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }, { + url: new URL('http://[::1]:9201'), + id: 'a2', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }]) + + t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, '[::1]:9200') + t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, '[::1]:9201') + t.end() + }) + + t.test('publish_address as host/ip (IPv4)', t => { + const pool = new BaseConnectionPool({ Connection }) + const nodes = { + a1: { + http: { + publish_address: 'example.com/127.0.0.1:9200' + }, + roles: ['master', 'data', 'ingest'] + }, + a2: { + http: { + publish_address: 'example.com/127.0.0.1:9201' + }, + roles: ['master', 'data', 'ingest'] + } + } + + t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{ + url: new URL('http://example.com:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }, { + url: new URL('http://example.com:9201'), + id: 'a2', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }]) + + t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, 'example.com:9200') + t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, 'example.com:9201') + t.end() + }) + + t.test('publish_address as host/ip (IPv6)', t => { + const pool = new BaseConnectionPool({ Connection }) + const nodes = { + a1: { + http: { + publish_address: 'example.com/[::1]:9200' + }, + roles: ['master', 'data', 'ingest'] + }, + a2: { + http: { + publish_address: 'example.com/[::1]:9201' + }, + roles: ['master', 'data', 'ingest'] + } + } + + t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{ + url: new URL('http://example.com:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }, { + url: new URL('http://example.com:9201'), + id: 'a2', + roles: { + master: true, + data: true, + ingest: true, + ml: false + } + }]) + + t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, 'example.com:9200') + t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, 'example.com:9201') + t.end() + }) + + t.test('Should use the configure protocol', t => { + const pool = new BaseConnectionPool({ Connection }) + const nodes = { + a1: { + http: { + publish_address: 'example.com/127.0.0.1:9200' + }, + roles: ['master', 'data', 'ingest'] + }, + a2: { + http: { + publish_address: 'example.com/127.0.0.1:9201' + }, + roles: ['master', 'data', 'ingest'] + } + } + + t.strictEqual(pool.nodesToHost(nodes, 'https:')[0].url.protocol, 'https:') + t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.protocol, 'http:') + t.end() + }) + + t.end() + }) + + t.test('update', t => { + t.test('Should not update existing connections', t => { + t.plan(2) + const pool = new BaseConnectionPool({ Connection }) + pool.addConnection([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true + } + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a2', + roles: { + master: true, + data: true, + ingest: true + } + }]) + + pool.update([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: null + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a2', + roles: null + }]) + + t.ok(pool.connections.find(c => c.id === 'a1').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a2').roles !== null) + }) + + t.test('Should not update existing connections (mark alive)', t => { + t.plan(5) + class CustomBaseConnectionPool extends BaseConnectionPool { + markAlive (connection) { + t.ok('called') + super.markAlive(connection) + } + } + const pool = new CustomBaseConnectionPool({ Connection }) + const conn1 = pool.addConnection({ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true + } + }) + + const conn2 = pool.addConnection({ + url: new URL('http://127.0.0.1:9201'), + id: 'a2', + roles: { + master: true, + data: true, + ingest: true + } + }) + + pool.markDead(conn1) + pool.markDead(conn2) + + pool.update([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: null + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a2', + roles: null + }]) + + t.ok(pool.connections.find(c => c.id === 'a1').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a2').roles !== null) + }) + + t.test('Should not update existing connections (same url, different id)', t => { + t.plan(3) + class CustomBaseConnectionPool extends BaseConnectionPool { + markAlive (connection) { + t.ok('called') + super.markAlive(connection) + } + } + const pool = new CustomBaseConnectionPool({ Connection }) + pool.addConnection([{ + url: new URL('http://127.0.0.1:9200'), + id: 'http://127.0.0.1:9200/', + roles: { + master: true, + data: true, + ingest: true + } + }]) + + pool.update([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: true + }]) + + // roles will never be updated, we only use it to do + // a dummy check to see if the connection has been updated + t.deepEqual(pool.connections.find(c => c.id === 'a1').roles, { + master: true, + data: true, + ingest: true, + ml: false + }) + t.strictEqual(pool.connections.find(c => c.id === 'http://127.0.0.1:9200/'), undefined) + }) + + t.test('Add a new connection', t => { + t.plan(2) + const pool = new BaseConnectionPool({ Connection }) + pool.addConnection({ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: { + master: true, + data: true, + ingest: true + } + }) + + pool.update([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: null + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a2', + roles: null + }]) + + t.ok(pool.connections.find(c => c.id === 'a1').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a2')) + }) + + t.test('Remove old connections', t => { + t.plan(3) + const pool = new BaseConnectionPool({ Connection }) + pool.addConnection({ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: null + }) + + pool.update([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a2', + roles: null + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a3', + roles: null + }]) + + t.false(pool.connections.find(c => c.id === 'a1')) + t.true(pool.connections.find(c => c.id === 'a2')) + t.true(pool.connections.find(c => c.id === 'a3')) + }) + + t.end() + }) + + t.test('CreateConnection', t => { + t.plan(1) + const pool = new BaseConnectionPool({ Connection }) + const conn = pool.createConnection('http://localhost:9200') + pool.connections.push(conn) + try { + pool.createConnection('http://localhost:9200') + t.fail('Should throw') + } catch (err) { + t.is(err.message, 'Connection with id \'http://localhost:9200/\' is already present') + } + }) + + t.end() +}) diff --git a/test/unit/client.test.js b/test/unit/client.test.js index 2c7bcc683..f6a86a985 100644 --- a/test/unit/client.test.js +++ b/test/unit/client.test.js @@ -7,6 +7,7 @@ const { test } = require('tap') const { URL } = require('url') const { Client, ConnectionPool, Transport } = require('../../index') +const { CloudConnectionPool } = require('../../lib/pool') const { buildServer } = require('../utils') test('Configure host', t => { @@ -15,7 +16,7 @@ test('Configure host', t => { node: 'http://localhost:9200' }) const pool = client.connectionPool - t.match(pool.connections.get('http://localhost:9200/'), { + t.match(pool.connections.find(c => c.id === 'http://localhost:9200/'), { url: new URL('http://localhost:9200'), id: 'http://localhost:9200/', ssl: null, @@ -36,7 +37,7 @@ test('Configure host', t => { nodes: ['http://localhost:9200', 'http://localhost:9201'] }) const pool = client.connectionPool - t.match(pool.connections.get('http://localhost:9200/'), { + t.match(pool.connections.find(c => c.id === 'http://localhost:9200/'), { url: new URL('http://localhost:9200'), id: 'http://localhost:9200/', ssl: null, @@ -49,7 +50,7 @@ test('Configure host', t => { ml: false } }) - t.match(pool.connections.get('http://localhost:9201/'), { + t.match(pool.connections.find(c => c.id === 'http://localhost:9201/'), { url: new URL('http://localhost:9201'), id: 'http://localhost:9201/', ssl: null, @@ -80,7 +81,7 @@ test('Configure host', t => { } }) const pool = client.connectionPool - t.match(pool.connections.get('node'), { + t.match(pool.connections.find(c => c.id === 'node'), { url: new URL('http://localhost:9200'), id: 'node', ssl: 'ssl', @@ -88,7 +89,7 @@ test('Configure host', t => { resurrectTimeout: 0 }) - t.deepEqual(pool.connections.get('node').roles, { + t.deepEqual(pool.connections.find(c => c.id === 'node').roles, { master: true, data: false, ingest: false, @@ -121,7 +122,7 @@ test('Configure host', t => { }] }) const pool = client.connectionPool - t.match(pool.connections.get('node1'), { + t.match(pool.connections.find(c => c.id === 'node1'), { url: new URL('http://localhost:9200'), id: 'node1', ssl: 'ssl', @@ -129,14 +130,14 @@ test('Configure host', t => { resurrectTimeout: 0 }) - t.deepEqual(pool.connections.get('node1').roles, { + t.deepEqual(pool.connections.find(c => c.id === 'node1').roles, { master: true, data: false, ingest: false, ml: false }) - t.match(pool.connections.get('node2'), { + t.match(pool.connections.find(c => c.id === 'node2'), { url: new URL('http://localhost:9200'), id: 'node2', ssl: 'ssl', @@ -144,7 +145,7 @@ test('Configure host', t => { resurrectTimeout: 0 }) - t.deepEqual(pool.connections.get('node2').roles, { + t.deepEqual(pool.connections.find(c => c.id === 'node2').roles, { master: false, data: true, ingest: false, @@ -163,7 +164,7 @@ test('Configure host', t => { } }) const pool = client.connectionPool - t.match(pool.connections.get('node'), { + t.match(pool.connections.find(c => c.id === 'node'), { url: new URL('http://localhost:9200'), headers: { 'x-foo': 'bar' } }) @@ -755,7 +756,7 @@ test('Extend client APIs', t => { test('Elastic cloud config', t => { t.test('Basic', t => { - t.plan(4) + t.plan(5) const client = new Client({ cloud: { // 'localhost$abcd$efgh' @@ -766,7 +767,8 @@ test('Elastic cloud config', t => { }) const pool = client.connectionPool - t.match(pool.connections.get('https://abcd.localhost/'), { + t.ok(pool instanceof CloudConnectionPool) + t.match(pool.connections.find(c => c.id === 'https://abcd.localhost/'), { url: new URL('https://elastic:changeme@abcd.localhost'), id: 'https://abcd.localhost/', headers: { @@ -789,7 +791,7 @@ test('Elastic cloud config', t => { }) t.test('Auth as separate option', t => { - t.plan(4) + t.plan(5) const client = new Client({ cloud: { // 'localhost$abcd$efgh' @@ -802,7 +804,8 @@ test('Elastic cloud config', t => { }) const pool = client.connectionPool - t.match(pool.connections.get('https://abcd.localhost/'), { + t.ok(pool instanceof CloudConnectionPool) + t.match(pool.connections.find(c => c.id === 'https://abcd.localhost/'), { url: new URL('https://elastic:changeme@abcd.localhost'), id: 'https://abcd.localhost/', headers: { @@ -825,7 +828,7 @@ test('Elastic cloud config', t => { }) t.test('Override default options', t => { - t.plan(3) + t.plan(4) const client = new Client({ cloud: { // 'localhost$abcd$efgh' @@ -840,6 +843,7 @@ test('Elastic cloud config', t => { } }) + t.ok(client.connectionPool instanceof CloudConnectionPool) t.strictEqual(client.transport.compression, false) t.strictEqual(client.transport.suggestCompression, false) t.deepEqual(client.connectionPool._ssl, { secureProtocol: 'TLSv1_1_method' }) diff --git a/test/unit/cloud-connection-pool.test.js b/test/unit/cloud-connection-pool.test.js new file mode 100644 index 000000000..e0cb1a499 --- /dev/null +++ b/test/unit/cloud-connection-pool.test.js @@ -0,0 +1,33 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const { test } = require('tap') +const { CloudConnectionPool } = require('../../lib/pool') +const Connection = require('../../lib/Connection') + +test('Should expose a cloudConnection property', t => { + const pool = new CloudConnectionPool({ Connection }) + pool.addConnection('http://localhost:9200/') + t.ok(pool.cloudConnection instanceof Connection) + t.end() +}) + +test('Get connection should always return cloudConnection', t => { + const pool = new CloudConnectionPool({ Connection }) + const conn = pool.addConnection('http://localhost:9200/') + t.deepEqual(pool.getConnection(), conn) + t.end() +}) + +test('pool.empty should reset cloudConnection', t => { + const pool = new CloudConnectionPool({ Connection }) + pool.addConnection('http://localhost:9200/') + t.ok(pool.cloudConnection instanceof Connection) + pool.empty(() => { + t.strictEqual(pool.cloudConnection, null) + t.end() + }) +}) diff --git a/test/unit/connection-pool.test.js b/test/unit/connection-pool.test.js index 25bb6deec..53234d901 100644 --- a/test/unit/connection-pool.test.js +++ b/test/unit/connection-pool.test.js @@ -6,7 +6,7 @@ const { test } = require('tap') const { URL } = require('url') -const ConnectionPool = require('../../lib/ConnectionPool') +const ConnectionPool = require('../../lib/pool/ConnectionPool') const Connection = require('../../lib/Connection') const { defaultNodeFilter, roundRobinSelector } = require('../../lib/Transport').internals const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils') @@ -16,8 +16,8 @@ test('API', t => { const pool = new ConnectionPool({ Connection }) const href = 'http://localhost:9200/' pool.addConnection(href) - t.ok(pool.connections.get(href) instanceof Connection) - t.strictEqual(pool.connections.get(href).status, Connection.statuses.ALIVE) + t.ok(pool.connections.find(c => c.id === href) instanceof Connection) + t.strictEqual(pool.connections.find(c => c.id === href).status, Connection.statuses.ALIVE) t.deepEqual(pool.dead, []) t.end() }) @@ -53,7 +53,7 @@ test('API', t => { const href = 'http://localhost:9200/' var connection = pool.addConnection(href) pool.markDead(connection) - connection = pool.connections.get(href) + connection = pool.connections.find(c => c.id === href) t.strictEqual(connection.deadCount, 1) t.true(connection.resurrectTimeout > 0) t.deepEqual(pool.dead, [href]) @@ -80,7 +80,7 @@ test('API', t => { var connection = pool.addConnection(href) pool.markDead(connection) pool.markAlive(connection) - connection = pool.connections.get(href) + connection = pool.connections.find(c => c.id === href) t.strictEqual(connection.deadCount, 0) t.strictEqual(connection.resurrectTimeout, 0) t.strictEqual(connection.status, Connection.statuses.ALIVE) @@ -107,7 +107,7 @@ test('API', t => { } pool.resurrect(opts, (isAlive, connection) => { t.true(isAlive) - connection = pool.connections.get(connection.id) + connection = pool.connections.find(c => c.id === connection.id) t.strictEqual(connection.deadCount, 0) t.strictEqual(connection.resurrectTimeout, 0) t.strictEqual(connection.status, Connection.statuses.ALIVE) @@ -133,7 +133,7 @@ test('API', t => { } pool.resurrect(opts, (isAlive, connection) => { t.false(isAlive) - connection = pool.connections.get(connection.id) + connection = pool.connections.find(c => c.id === connection.id) t.strictEqual(connection.deadCount, 2) t.true(connection.resurrectTimeout > 0) t.strictEqual(connection.status, Connection.statuses.DEAD) @@ -161,7 +161,7 @@ test('API', t => { } pool.resurrect(opts, (isAlive, connection) => { t.true(isAlive) - connection = pool.connections.get(connection.id) + connection = pool.connections.find(c => c.id === connection.id) t.strictEqual(connection.deadCount, 1) t.true(connection.resurrectTimeout > 0) t.strictEqual(connection.status, Connection.statuses.ALIVE) @@ -187,7 +187,7 @@ test('API', t => { pool.resurrect(opts, (isAlive, connection) => { t.ok(isAlive === null) t.ok(connection === null) - connection = pool.connections.get(href) + connection = pool.connections.find(c => c.id === href) t.strictEqual(connection.deadCount, 1) t.true(connection.resurrectTimeout > 0) t.strictEqual(connection.status, Connection.statuses.DEAD) @@ -267,7 +267,7 @@ test('API', t => { pool.addConnection('http://localhost:9200/') pool.addConnection('http://localhost:9201/') pool.empty(() => { - t.strictEqual(pool.connections.size, 0) + t.strictEqual(pool.size, 0) t.deepEqual(pool.dead, []) t.end() }) @@ -480,12 +480,7 @@ test('API', t => { t.test('update', t => { t.test('Should not update existing connections', t => { t.plan(2) - class CustomConnectionPool extends ConnectionPool { - markAlive () { - t.fail('Should not be called') - } - } - const pool = new CustomConnectionPool({ Connection }) + const pool = new ConnectionPool({ Connection }) pool.addConnection([{ url: new URL('http://127.0.0.1:9200'), id: 'a1', @@ -514,12 +509,12 @@ test('API', t => { roles: null }]) - t.ok(pool.connections.get('a1').roles !== null) - t.ok(pool.connections.get('a2').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a1').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a2').roles !== null) }) t.test('Should not update existing connections (mark alive)', t => { - t.plan(4) + t.plan(5) class CustomConnectionPool extends ConnectionPool { markAlive (connection) { t.ok('called') @@ -560,15 +555,16 @@ test('API', t => { roles: null }]) - t.ok(pool.connections.get('a1').roles !== null) - t.ok(pool.connections.get('a2').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a1').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a2').roles !== null) }) t.test('Should not update existing connections (same url, different id)', t => { - t.plan(2) + t.plan(3) class CustomConnectionPool extends ConnectionPool { - markAlive () { - t.fail('Should not be called') + markAlive (connection) { + t.ok('called') + super.markAlive(connection) } } const pool = new CustomConnectionPool({ Connection }) @@ -590,13 +586,13 @@ test('API', t => { // roles will never be updated, we only use it to do // a dummy check to see if the connection has been updated - t.deepEqual(pool.connections.get('a1').roles, { + t.deepEqual(pool.connections.find(c => c.id === 'a1').roles, { master: true, data: true, ingest: true, ml: false }) - t.strictEqual(pool.connections.get('http://127.0.0.1:9200/'), undefined) + t.strictEqual(pool.connections.find(c => c.id === 'http://127.0.0.1:9200/'), undefined) }) t.test('Add a new connection', t => { @@ -622,8 +618,8 @@ test('API', t => { roles: null }]) - t.ok(pool.connections.get('a1').roles !== null) - t.true(pool.connections.has('a2')) + t.ok(pool.connections.find(c => c.id === 'a1').roles !== null) + t.ok(pool.connections.find(c => c.id === 'a2')) }) t.test('Remove old connections', t => { @@ -645,9 +641,37 @@ test('API', t => { roles: null }]) - t.false(pool.connections.has('a1')) - t.true(pool.connections.has('a2')) - t.true(pool.connections.has('a3')) + t.false(pool.connections.find(c => c.id === 'a1')) + t.true(pool.connections.find(c => c.id === 'a2')) + t.true(pool.connections.find(c => c.id === 'a3')) + }) + + t.test('Remove old connections (markDead)', t => { + t.plan(5) + const pool = new ConnectionPool({ Connection, sniffEnabled: true }) + const conn = pool.addConnection({ + url: new URL('http://127.0.0.1:9200'), + id: 'a1', + roles: null + }) + + pool.markDead(conn) + t.deepEqual(pool.dead, ['a1']) + + pool.update([{ + url: new URL('http://127.0.0.1:9200'), + id: 'a2', + roles: null + }, { + url: new URL('http://127.0.0.1:9201'), + id: 'a3', + roles: null + }]) + + t.deepEqual(pool.dead, []) + t.false(pool.connections.find(c => c.id === 'a1')) + t.true(pool.connections.find(c => c.id === 'a2')) + t.true(pool.connections.find(c => c.id === 'a3')) }) t.end() @@ -702,22 +726,22 @@ test('Node filter', t => { test('Single node behavior', t => { t.test('sniffing disabled (markDead and markAlive should be noop)', t => { - t.plan(2) + t.plan(4) const pool = new ConnectionPool({ Connection, sniffEnabled: false }) const conn = pool.addConnection('http://localhost:9200/') - pool.markDead(conn) + t.true(pool.markDead(conn) instanceof ConnectionPool) t.strictEqual(pool.dead.length, 0) - pool.markAlive(conn) + t.true(pool.markAlive(conn) instanceof ConnectionPool) t.strictEqual(pool.dead.length, 0) }) t.test('sniffing enabled (markDead and markAlive should work)', t => { - t.plan(2) + t.plan(4) const pool = new ConnectionPool({ Connection, sniffEnabled: true }) const conn = pool.addConnection('http://localhost:9200/') - pool.markDead(conn) + t.true(pool.markDead(conn) instanceof ConnectionPool) t.strictEqual(pool.dead.length, 1) - pool.markAlive(conn) + t.true(pool.markAlive(conn) instanceof ConnectionPool) t.strictEqual(pool.dead.length, 0) }) diff --git a/test/unit/connection.test.js b/test/unit/connection.test.js index 385dce8ca..cee34db74 100644 --- a/test/unit/connection.test.js +++ b/test/unit/connection.test.js @@ -733,7 +733,7 @@ test('Util.inspect Connection class should hide agent, ssl and auth', t => { resurrectTimeout: 0, _openRequests: 0, status: 'alive', - roles: { master: true, data: true, ingest: true, ml: false } }`) + roles: { master: true, data: true, ingest: true, ml: false }}`) ) }) diff --git a/test/unit/transport.test.js b/test/unit/transport.test.js index 03adcce72..28f95ed45 100644 --- a/test/unit/transport.test.js +++ b/test/unit/transport.test.js @@ -6,6 +6,7 @@ const { test } = require('tap') const { URL } = require('url') +const lolex = require('lolex') const { createGunzip } = require('zlib') const os = require('os') const intoStream = require('into-stream') @@ -23,7 +24,7 @@ const { ConfigurationError } = require('../../lib/errors') -const ConnectionPool = require('../../lib/ConnectionPool') +const ConnectionPool = require('../../lib/pool/ConnectionPool') const Connection = require('../../lib/Connection') const Serializer = require('../../lib/Serializer') const Transport = require('../../lib/Transport') @@ -878,148 +879,95 @@ test('Override requestTimeout', t => { test('sniff', t => { t.test('sniffOnStart', t => { - t.plan(3) + t.plan(1) - class CustomConnectionPool extends ConnectionPool { - update () { - t.ok('called') - return this - } - - nodesToHost (nodes) { - t.ok('called') - return [] + class MyTransport extends Transport { + sniff (opts) { + t.strictEqual(opts.reason, Transport.sniffReasons.SNIFF_ON_START) } } - function handler (req, res) { - t.strictEqual(req.url, '/sniff') - res.setHeader('Content-Type', 'application/json;utf=8') - res.end(JSON.stringify({ hello: 'world' })) - } + const pool = new ConnectionPool({ Connection }) + pool.addConnection('http://localhost:9200') - buildServer(handler, ({ port }, server) => { - const pool = new CustomConnectionPool({ Connection }) - pool.addConnection(`http://localhost:${port}`) - - // eslint-disable-next-line - new Transport({ - emit: () => {}, - connectionPool: pool, - serializer: new Serializer(), - maxRetries: 3, - requestTimeout: 30000, - sniffInterval: false, - sniffOnStart: true, - sniffEndpoint: '/sniff' - }) - - setTimeout(() => server.stop(), 100) + // eslint-disable-next-line + new MyTransport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: true, + sniffEndpoint: '/sniff' }) }) t.test('sniffOnConnectionFault', t => { - t.plan(3) + t.plan(2) - class CustomConnectionPool extends ConnectionPool { - update () { - t.ok('called') - return this - } - - nodesToHost (nodes) { - t.ok('called') - return [] + class MyTransport extends Transport { + sniff (opts) { + t.strictEqual(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT) } } - function handler (req, res) { - if (req.url === '/other/sniff') { - res.setHeader('Content-Type', 'application/json;utf=8') - res.end(JSON.stringify({ hello: 'world' })) - } else { - setTimeout(() => res.end(), 1000) - } - } + const pool = new ConnectionPool({ Connection: MockConnectionTimeout }) + pool.addConnection('http://localhost:9200') - buildServer(handler, ({ port }, server) => { - const pool = new CustomConnectionPool({ Connection }) - pool.addConnection(`http://localhost:${port}`) - pool.addConnection(`http://localhost:${port}/other`) + const transport = new MyTransport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 0, + requestTimeout: 500, + sniffInterval: false, + sniffOnConnectionFault: true, + sniffEndpoint: '/sniff' + }) - const transport = new Transport({ - emit: () => {}, - connectionPool: pool, - serializer: new Serializer(), - maxRetries: 0, - requestTimeout: 500, - sniffInterval: false, - sniffOnConnectionFault: true, - sniffEndpoint: '/sniff' - }) - - transport.request({ - method: 'GET', - path: '/' - }, (err, { body }) => { - t.ok(err instanceof TimeoutError) - }) - - setTimeout(() => server.stop(), 1100) + transport.request({ + method: 'GET', + path: '/' + }, (err, { body }) => { + t.ok(err instanceof TimeoutError) }) }) t.test('sniffInterval', t => { - t.plan(9) + t.plan(6) - class CustomConnectionPool extends ConnectionPool { - update () { - return this - } + const clock = lolex.install({ toFake: ['Date'] }) + t.teardown(() => clock.uninstall()) - nodesToHost (nodes) { - return [] + class MyTransport extends Transport { + sniff (opts) { + t.strictEqual(opts.reason, Transport.sniffReasons.SNIFF_INTERVAL) } } - function handler (req, res) { - // this should be called 6 times - t.ok('called') - res.setHeader('Content-Type', 'application/json;utf=8') - res.end(JSON.stringify({ hello: 'world' })) - } + const pool = new ConnectionPool({ Connection: MockConnection }) + pool.addConnection('http://localhost:9200') - buildServer(handler, ({ port }, server) => { - const pool = new CustomConnectionPool({ Connection }) - pool.addConnection(`http://localhost:${port}`) - - const transport = new Transport({ - emit: () => {}, - connectionPool: pool, - serializer: new Serializer(), - maxRetries: 3, - requestTimeout: 3000, - sniffInterval: 1, - sniffEndpoint: '/sniff' - }) - - const params = { method: 'GET', path: '/' } - setTimeout(() => { - transport.request(params, t.error) - }, 100) - - setTimeout(() => { - transport.request(params, t.error) - }, 200) - - setTimeout(() => { - transport.request(params, t.error) - }, 300) - - setTimeout(() => { - server.stop() - }, 400) + const transport = new MyTransport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 3000, + sniffInterval: 1, + sniffEndpoint: '/sniff' }) + + const params = { method: 'GET', path: '/' } + clock.tick(100) + transport.request(params, t.error) + + clock.tick(200) + transport.request(params, t.error) + + clock.tick(300) + transport.request(params, t.error) }) t.test('errored', t => {