From 5b856cd4c283bf7b17690841650aa97542b7d04f Mon Sep 17 00:00:00 2001 From: Tomas Della Vedova Date: Thu, 21 Feb 2019 12:48:49 +0100 Subject: [PATCH] Child client support (#768) With this pr we introduce the `client.child` API, which returns a new client instance that shares the connection pool with the parent client. This feature can be handy if you need to have multiple client instances with different configurations, but with a shared connection pool. Example: ```js const { Client } = require('@elastic/elasticsearch') const client = new Client({ node: 'http://localhost:9200' }) const child = client.child({ headers: { 'x-foo': 'bar' }, requestTimeout: 1000 }) client.info(console.log) child.info(console.log) ``` **Open questions:** * Currently, the event emitter is shared between the parent and the child(ren), is it ok? * Currently, if you extend the parent client, the child client will have the same extensions, while if the child client adds an extension, the parent client will not be extended. Is it ok? **Caveats:** * You can override _any_ option except for the connection pool specific options (`ssl`, `agent`, `pingTimeout`, `Connection`, and `resurrectStrategy`). * You can't specify a new `Connection` class. * If you call `close` in any of the parent/child clients, every client will be closed. _Note: the `nodeFilter` and `nodeSelector` options are now `Transport` options and no longer `ConnectionPool` options._ --- docs/child.asciidoc | 23 ++++ index.d.ts | 8 +- index.js | 44 ++++++- lib/ConnectionPool.d.ts | 13 +- lib/ConnectionPool.js | 41 +------ lib/Transport.d.ts | 11 ++ lib/Transport.js | 42 ++++++- test/unit/child.test.js | 194 ++++++++++++++++++++++++++++++ test/unit/connection-pool.test.js | 62 ++-------- test/unit/selectors.test.js | 2 +- test/unit/transport.test.js | 33 +++++ 11 files changed, 357 insertions(+), 116 deletions(-) create mode 100644 docs/child.asciidoc create mode 100644 test/unit/child.test.js diff --git a/docs/child.asciidoc b/docs/child.asciidoc new file mode 100644 index 000000000..74ba23185 --- /dev/null +++ b/docs/child.asciidoc @@ -0,0 +1,23 @@ += Creating a child client + +There are some use cases where you may need multiple instances of the client. You can easily do that by calling `new Client()` as many times as you need, but you will lose all the benefits of using one single client, such as the long living connections and the connection pool handling. + +To avoid this problem the client offers a `child` API, which returns a new client instance that shares the connection pool with the parent client. + + +NOTE: The event emitter is shared between the parent and the child(ren), and if you extend the parent client, the child client will have the same extensions, while if the child client adds an extension, the parent client will not be extended. + +You can pass to the `child` every client option you would pass to a normal client, but the connection pool specific options (`ssl`, `agent`, `pingTimeout`, `Connection`, and `resurrectStrategy`). + +CAUTION: If you call `close` in any of the parent/child clients, every client will be closed. + +[source,js] +---- +const { Client } = require('@elastic/elasticsearch') +const client = new Client({ node: 'http://localhost:9200' }) +const child = client.child({ + headers: { 'x-foo': 'bar' }, + requestTimeout: 1000 +}) + +client.info(console.log) +child.info(console.log) +---- diff --git a/index.d.ts b/index.d.ts index 40926e816..4b31d159a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -7,10 +7,12 @@ import Transport, { EventMeta, SniffMeta, TransportRequestParams, - TransportRequestOptions + TransportRequestOptions, + nodeFilterFn, + nodeSelectorFn } from './lib/Transport'; import Connection, { AgentOptions } from './lib/Connection'; -import ConnectionPool, { nodeSelectorFn, nodeFilterFn, ResurrectMeta } from './lib/ConnectionPool'; +import ConnectionPool, { ResurrectMeta } from './lib/ConnectionPool'; import Serializer from './lib/Serializer'; import * as RequestParams from './api/requestParams'; import * as errors from './lib/errors'; @@ -65,6 +67,7 @@ interface ClientOptions { agent?: AgentOptions; nodeFilter?: nodeFilterFn; nodeSelector?: nodeSelectorFn | string; + headers?: anyObject; cloud?: { id: string; username: string; @@ -78,6 +81,7 @@ declare class Client extends EventEmitter { transport: Transport; serializer: Serializer; extend: ClientExtends; + child(opts?: ClientOptions): Client; close(callback?: Function): Promise | void; bulk: ApiMethod cat: { diff --git a/index.js b/index.js index 783ceeafc..4769ff93c 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,10 @@ const Serializer = require('./lib/Serializer') const errors = require('./lib/errors') const { ConfigurationError } = errors +const kInitialOptions = Symbol('elasticsearchjs-initial-options') +const kChild = Symbol('elasticsearchjs-child') +const kExtensions = Symbol('elasticsearchjs-extensions') + const buildApi = require('./api') class Client extends EventEmitter { @@ -52,19 +56,18 @@ class Client extends EventEmitter { agent: null, headers: {}, nodeFilter: null, - nodeWeighter: null, nodeSelector: 'round-robin' }, opts) + this[kInitialOptions] = options + this[kExtensions] = [] + this.serializer = new options.Serializer() this.connectionPool = new options.ConnectionPool({ pingTimeout: options.pingTimeout, resurrectStrategy: options.resurrectStrategy, ssl: options.ssl, agent: options.agent, - nodeFilter: options.nodeFilter, - nodeWeighter: options.nodeWeighter, - nodeSelector: options.nodeSelector, Connection: options.Connection, emit: this.emit.bind(this), sniffEnabled: options.sniffInterval !== false || @@ -73,7 +76,9 @@ class Client extends EventEmitter { }) // Add the connections before initialize the Transport - this.connectionPool.addConnection(options.node || options.nodes) + if (opts[kChild] !== true) { + this.connectionPool.addConnection(options.node || options.nodes) + } this.transport = new options.Transport({ emit: this.emit.bind(this), @@ -87,7 +92,9 @@ class Client extends EventEmitter { sniffEndpoint: options.sniffEndpoint, suggestCompression: options.suggestCompression, compression: options.compression, - headers: options.headers + headers: options.headers, + nodeFilter: options.nodeFilter, + nodeSelector: options.nodeSelector }) const apis = buildApi({ @@ -135,6 +142,31 @@ class Client extends EventEmitter { ConfigurationError }) } + + this[kExtensions].push({ name, opts, fn }) + } + + child (opts) { + // Merge the new options with the initial ones + const initialOptions = Object.assign({}, this[kInitialOptions], opts) + // Tell to the client that we are creating a child client + initialOptions[kChild] = true + + const client = new Client(initialOptions) + // Reuse the same connection pool + client.connectionPool = this.connectionPool + client.transport.connectionPool = this.connectionPool + // Share event listener + const emitter = this.emit.bind(this) + client.emit = emitter + client.connectionPool.emit = emitter + client.transport.emit = emitter + client.on = this.on.bind(this) + // Add parent extensions + this[kExtensions].forEach(({ name, opts, fn }) => { + client.extend(name, opts, fn) + }) + return client } close (callback) { diff --git a/lib/ConnectionPool.d.ts b/lib/ConnectionPool.d.ts index 8b14ced59..a98cb6677 100644 --- a/lib/ConnectionPool.d.ts +++ b/lib/ConnectionPool.d.ts @@ -2,14 +2,7 @@ import { SecureContextOptions } from 'tls'; import Connection, { AgentOptions } from './Connection'; - -export interface nodeSelectorFn { - (connections: Connection[]): Connection; -} - -export interface nodeFilterFn { - (connection: Connection): boolean; -} +import { nodeFilterFn, nodeSelectorFn } from './Transport'; interface ConnectionPoolOptions { ssl?: SecureContextOptions; @@ -17,8 +10,6 @@ interface ConnectionPoolOptions { pingTimeout?: number; Connection: typeof Connection; resurrectStrategy?: string; - nodeFilter?: nodeFilterFn; - nodeSelector?: string | nodeSelectorFn; } export interface getConnectionOptions { @@ -46,8 +37,6 @@ export default class ConnectionPool { resurrectTimeout: number; resurrectTimeoutCutoff: number; pingTimeout: number; - nodeFilter: nodeFilterFn; - nodeSelector: nodeSelectorFn; Connection: typeof Connection; resurrectStrategy: number; constructor(opts?: ConnectionPoolOptions); diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index f5e82f401..7cdb89b5c 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -24,21 +24,10 @@ class ConnectionPool { // the timeout doesn't increase this.resurrectTimeoutCutoff = 5 this.pingTimeout = opts.pingTimeout - this.nodeFilter = opts.nodeFilter || defaultNodeFilter this.Connection = opts.Connection this.emit = opts.emit || noop this._sniffEnabled = opts.sniffEnabled || false - if (typeof opts.nodeSelector === 'function') { - this.nodeSelector = opts.nodeSelector - } else if (opts.nodeSelector === 'round-robin') { - this.nodeSelector = roundRobinSelector() - } else if (opts.nodeSelector === 'random') { - this.nodeSelector = randomSelector - } else { - this.nodeSelector = roundRobinSelector() - } - const resurrectStrategy = opts.resurrectStrategy || 'ping' this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy] assert( @@ -174,8 +163,8 @@ class ConnectionPool { * @returns {object|null} connection */ getConnection (opts = {}) { - const filter = opts.filter || this.nodeFilter - const selector = opts.selector || this.nodeSelector + const filter = opts.filter || (() => true) + const selector = opts.selector || (c => c[0]) // TODO: can we cache this? const connections = [] @@ -381,30 +370,4 @@ ConnectionPool.resurrectStrategies = { // .sort((a, b) => a[0] - b[0]) // .map(a => a[1]) -function defaultNodeFilter (node) { - // avoid master only nodes - if (node.roles.master === true && - node.roles.data === false && - node.roles.ingest === false) { - return false - } - return true -} - -function roundRobinSelector () { - var current = -1 - return function _roundRobinSelector (connections) { - if (++current >= connections.length) { - current = 0 - } - return connections[current] - } -} - -function randomSelector (connections) { - const index = Math.floor(Math.random() * connections.length) - return connections[index] -} - module.exports = ConnectionPool -module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector } diff --git a/lib/Transport.d.ts b/lib/Transport.d.ts index 718377fc1..9a9730127 100644 --- a/lib/Transport.d.ts +++ b/lib/Transport.d.ts @@ -2,6 +2,14 @@ import ConnectionPool from './ConnectionPool'; import Connection from './Connection'; import Serializer from './Serializer'; +export interface nodeSelectorFn { + (connections: Connection[]): Connection; +} + +export interface nodeFilterFn { + (connection: Connection): boolean; +} + declare type noopFn = (...args: any[]) => void; declare type emitFn = (event: string | symbol, ...args: any[]) => boolean; @@ -17,6 +25,9 @@ interface TransportOptions { sniffOnConnectionFault: boolean; sniffEndpoint: string; sniffOnStart: boolean; + nodeFilter?: nodeFilterFn; + nodeSelector?: string | nodeSelectorFn; + headers?: anyObject; } export interface ApiResponse { diff --git a/lib/Transport.js b/lib/Transport.js index 6dafb3574..f16907bec 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -32,6 +32,17 @@ class Transport { this.sniffOnConnectionFault = opts.sniffOnConnectionFault this.sniffEndpoint = opts.sniffEndpoint + this.nodeFilter = opts.nodeFilter || defaultNodeFilter + if (typeof opts.nodeSelector === 'function') { + this.nodeSelector = opts.nodeSelector + } else if (opts.nodeSelector === 'round-robin') { + this.nodeSelector = roundRobinSelector() + } else if (opts.nodeSelector === 'random') { + this.nodeSelector = randomSelector + } else { + this.nodeSelector = roundRobinSelector() + } + this._sniffEnabled = typeof this.sniffInterval === 'number' this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0 this._isSniffing = false @@ -275,7 +286,10 @@ class Transport { this.sniff(Transport.sniffReasons.SNIFF_INTERVAL) } this.connectionPool.resurrect(now) - return this.connectionPool.getConnection() + return this.connectionPool.getConnection({ + filter: this.nodeFilter, + selector: this.nodeSelector + }) } sniff (reason = Transport.sniffReasons.DEFAULT, callback = noop) { @@ -340,4 +354,30 @@ function isStream (obj) { return typeof obj.pipe === 'function' } +function defaultNodeFilter (node) { + // avoid master only nodes + if (node.roles.master === true && + node.roles.data === false && + node.roles.ingest === false) { + return false + } + return true +} + +function roundRobinSelector () { + var current = -1 + return function _roundRobinSelector (connections) { + if (++current >= connections.length) { + current = 0 + } + return connections[current] + } +} + +function randomSelector (connections) { + const index = Math.floor(Math.random() * connections.length) + return connections[index] +} + module.exports = Transport +module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector } diff --git a/test/unit/child.test.js b/test/unit/child.test.js new file mode 100644 index 000000000..5e8405de3 --- /dev/null +++ b/test/unit/child.test.js @@ -0,0 +1,194 @@ +'use strict' + +const { test } = require('tap') +const { Client, errors } = require('../../index') +const { + buildServer, + connection: { MockConnection } +} = require('../utils') + +test('Should create a child client (headers check)', t => { + t.plan(4) + + var count = 0 + function handler (req, res) { + if (count++ === 0) { + t.match(req.headers, { 'x-foo': 'bar' }) + } else { + t.match(req.headers, { 'x-baz': 'faz' }) + } + res.setHeader('Content-Type', 'application/json;utf=8') + res.end(JSON.stringify({ hello: 'world' })) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ + node: `http://localhost:${port}`, + headers: { 'x-foo': 'bar' } + }) + const child = client.child({ + headers: { 'x-baz': 'faz' } + }) + + client.info((err, res) => { + t.error(err) + child.info((err, res) => { + t.error(err) + server.stop() + }) + }) + }) +}) + +test('Should create a child client (timeout check)', t => { + t.plan(2) + + function handler (req, res) { + setTimeout(() => { + res.setHeader('Content-Type', 'application/json;utf=8') + res.end(JSON.stringify({ hello: 'world' })) + }, 50) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ node: `http://localhost:${port}` }) + const child = client.child({ requestTimeout: 25, maxRetries: 0 }) + + client.info((err, res) => { + t.error(err) + child.info((err, res) => { + t.true(err instanceof errors.TimeoutError) + server.stop() + }) + }) + }) +}) + +test('Client extensions', t => { + t.test('One level', t => { + t.plan(1) + + const client = new Client({ node: 'http://localhost:9200' }) + client.extend('utility.index', () => { + return () => t.ok('called') + }) + + const child = client.child() + child.utility.index() + }) + + t.test('Two levels', t => { + t.plan(2) + + const client = new Client({ node: 'http://localhost:9200' }) + client.extend('utility.index', () => { + return () => t.ok('called') + }) + + const child = client.child() + child.extend('utility.search', () => { + return () => t.ok('called') + }) + + const grandchild = child.child() + grandchild.utility.index() + grandchild.utility.search() + }) + + t.test('The child should not extend the parent', t => { + t.plan(1) + + const client = new Client({ node: 'http://localhost:9200' }) + const child = client.child() + + child.extend('utility.index', () => { + return () => t.fail('Should not be called') + }) + + try { + client.utility.index() + } catch (err) { + t.ok(err) + } + }) + + t.end() +}) + +test('Should share the event emitter', t => { + t.test('One level', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + + client.on('response', (err, meta) => { + t.error(err) + }) + + child.info((err, res) => { + t.error(err) + }) + }) + + t.test('Two levels', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + const grandchild = child.child() + + client.on('response', (err, meta) => { + t.error(err) + }) + + grandchild.info((err, res) => { + t.error(err) + }) + }) + + t.test('Child listener - one level', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + + child.on('response', (err, meta) => { + t.error(err) + }) + + child.info((err, res) => { + t.error(err) + }) + }) + + t.test('Child listener - two levels', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + const grandchild = child.child() + + child.on('response', (err, meta) => { + t.error(err) + }) + + grandchild.info((err, res) => { + t.error(err) + }) + }) + + t.end() +}) diff --git a/test/unit/connection-pool.test.js b/test/unit/connection-pool.test.js index 0969e21a0..534d9fc5e 100644 --- a/test/unit/connection-pool.test.js +++ b/test/unit/connection-pool.test.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { URL } = require('url') const ConnectionPool = require('../../lib/ConnectionPool') const Connection = require('../../lib/Connection') +const { defaultNodeFilter, roundRobinSelector } = require('../../lib/Transport').internals const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils') test('API', t => { @@ -227,22 +228,6 @@ test('API', t => { pool.getConnection({ filter }) }) - t.test('filter as ConnectionPool option', t => { - t.plan(3) - - const href1 = 'http://localhost:9200/' - const href2 = 'http://localhost:9200/other' - const pool = new ConnectionPool({ - Connection, - nodeFilter: node => { - t.ok('called') - return true - } - }) - pool.addConnection([href1, href2]) - t.strictEqual(pool.getConnection().id, href1) - }) - t.end() }) @@ -498,27 +483,16 @@ test('API', t => { test('Node selector', t => { t.test('round-robin', t => { t.plan(1) - const pool = new ConnectionPool({ Connection, nodeSelector: 'round-robin' }) + const pool = new ConnectionPool({ Connection }) pool.addConnection('http://localhost:9200/') - t.true(pool.getConnection() instanceof Connection) + t.true(pool.getConnection({ selector: roundRobinSelector() }) instanceof Connection) }) t.test('random', t => { t.plan(1) - const pool = new ConnectionPool({ Connection, nodeSelector: 'random' }) + const pool = new ConnectionPool({ Connection }) pool.addConnection('http://localhost:9200/') - t.true(pool.getConnection() instanceof Connection) - }) - - t.test('custom function', t => { - t.plan(2) - const nodeSelector = connections => { - t.ok('called') - return connections[0] - } - const pool = new ConnectionPool({ Connection, nodeSelector }) - pool.addConnection('http://localhost:9200/') - t.true(pool.getConnection() instanceof Connection) + t.true(pool.getConnection({ selector: roundRobinSelector() }) instanceof Connection) }) t.end() @@ -529,7 +503,7 @@ test('Node filter', t => { t.plan(1) const pool = new ConnectionPool({ Connection }) pool.addConnection({ url: new URL('http://localhost:9200/') }) - t.true(pool.getConnection() instanceof Connection) + t.true(pool.getConnection({ filter: defaultNodeFilter }) instanceof Connection) }) t.test('Should filter master only nodes', t => { @@ -544,29 +518,7 @@ test('Node filter', t => { ml: false } }) - t.strictEqual(pool.getConnection(), null) - }) - - t.test('custom', t => { - t.plan(2) - const nodeFilter = node => { - t.ok('called') - return true - } - const pool = new ConnectionPool({ Connection, nodeFilter }) - pool.addConnection({ url: new URL('http://localhost:9200/') }) - t.true(pool.getConnection() instanceof Connection) - }) - - t.test('custom (filter)', t => { - t.plan(2) - const nodeFilter = node => { - t.ok('called') - return false - } - const pool = new ConnectionPool({ Connection, nodeFilter }) - pool.addConnection({ url: new URL('http://localhost:9200/') }) - t.strictEqual(pool.getConnection(), null) + t.strictEqual(pool.getConnection({ filter: defaultNodeFilter }), null) }) t.end() diff --git a/test/unit/selectors.test.js b/test/unit/selectors.test.js index 4ce57b581..cde2d52a5 100644 --- a/test/unit/selectors.test.js +++ b/test/unit/selectors.test.js @@ -1,7 +1,7 @@ 'use strict' const { test } = require('tap') -const { roundRobinSelector, randomSelector } = require('../../lib/ConnectionPool').internals +const { roundRobinSelector, randomSelector } = require('../../lib/Transport').internals test('RoundRobinSelector', t => { const selector = roundRobinSelector() diff --git a/test/unit/transport.test.js b/test/unit/transport.test.js index 8fba614d1..6fe920c49 100644 --- a/test/unit/transport.test.js +++ b/test/unit/transport.test.js @@ -1978,3 +1978,36 @@ test('Headers configuration', t => { t.end() }) + +test('nodeFilter and nodeSelector', t => { + t.plan(4) + + const pool = new ConnectionPool({ Connection: MockConnection }) + pool.addConnection('http://localhost:9200') + + const transport = new Transport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: false, + nodeFilter: () => { + t.ok('called') + return true + }, + nodeSelector: conns => { + t.ok('called') + return conns[0] + } + }) + + transport.request({ + method: 'GET', + path: '/hello' + }, (err, { body }) => { + t.error(err) + t.deepEqual(body, { hello: 'world' }) + }) +})