diff --git a/docs/child.asciidoc b/docs/child.asciidoc new file mode 100644 index 000000000..74ba23185 --- /dev/null +++ b/docs/child.asciidoc @@ -0,0 +1,23 @@ += Creating a child client + +There are some use cases where you may need multiple instances of the client. You can easily do that by calling `new Client()` as many times as you need, but you will lose all the benefits of using one single client, such as the long living connections and the connection pool handling. + +To avoid this problem the client offers a `child` API, which returns a new client instance that shares the connection pool with the parent client. + + +NOTE: The event emitter is shared between the parent and the child(ren), and if you extend the parent client, the child client will have the same extensions, while if the child client adds an extension, the parent client will not be extended. + +You can pass to the `child` every client option you would pass to a normal client, but the connection pool specific options (`ssl`, `agent`, `pingTimeout`, `Connection`, and `resurrectStrategy`). + +CAUTION: If you call `close` in any of the parent/child clients, every client will be closed. + +[source,js] +---- +const { Client } = require('@elastic/elasticsearch') +const client = new Client({ node: 'http://localhost:9200' }) +const child = client.child({ + headers: { 'x-foo': 'bar' }, + requestTimeout: 1000 +}) + +client.info(console.log) +child.info(console.log) +---- diff --git a/index.d.ts b/index.d.ts index 40926e816..4b31d159a 100644 --- a/index.d.ts +++ b/index.d.ts @@ -7,10 +7,12 @@ import Transport, { EventMeta, SniffMeta, TransportRequestParams, - TransportRequestOptions + TransportRequestOptions, + nodeFilterFn, + nodeSelectorFn } from './lib/Transport'; import Connection, { AgentOptions } from './lib/Connection'; -import ConnectionPool, { nodeSelectorFn, nodeFilterFn, ResurrectMeta } from './lib/ConnectionPool'; +import ConnectionPool, { ResurrectMeta } from './lib/ConnectionPool'; import Serializer from './lib/Serializer'; import * as RequestParams from './api/requestParams'; import * as errors from './lib/errors'; @@ -65,6 +67,7 @@ interface ClientOptions { agent?: AgentOptions; nodeFilter?: nodeFilterFn; nodeSelector?: nodeSelectorFn | string; + headers?: anyObject; cloud?: { id: string; username: string; @@ -78,6 +81,7 @@ declare class Client extends EventEmitter { transport: Transport; serializer: Serializer; extend: ClientExtends; + child(opts?: ClientOptions): Client; close(callback?: Function): Promise | void; bulk: ApiMethod cat: { diff --git a/index.js b/index.js index 783ceeafc..4769ff93c 100644 --- a/index.js +++ b/index.js @@ -9,6 +9,10 @@ const Serializer = require('./lib/Serializer') const errors = require('./lib/errors') const { ConfigurationError } = errors +const kInitialOptions = Symbol('elasticsearchjs-initial-options') +const kChild = Symbol('elasticsearchjs-child') +const kExtensions = Symbol('elasticsearchjs-extensions') + const buildApi = require('./api') class Client extends EventEmitter { @@ -52,19 +56,18 @@ class Client extends EventEmitter { agent: null, headers: {}, nodeFilter: null, - nodeWeighter: null, nodeSelector: 'round-robin' }, opts) + this[kInitialOptions] = options + this[kExtensions] = [] + this.serializer = new options.Serializer() this.connectionPool = new options.ConnectionPool({ pingTimeout: options.pingTimeout, resurrectStrategy: options.resurrectStrategy, ssl: options.ssl, agent: options.agent, - nodeFilter: options.nodeFilter, - nodeWeighter: options.nodeWeighter, - nodeSelector: options.nodeSelector, Connection: options.Connection, emit: this.emit.bind(this), sniffEnabled: options.sniffInterval !== false || @@ -73,7 +76,9 @@ class Client extends EventEmitter { }) // Add the connections before initialize the Transport - this.connectionPool.addConnection(options.node || options.nodes) + if (opts[kChild] !== true) { + this.connectionPool.addConnection(options.node || options.nodes) + } this.transport = new options.Transport({ emit: this.emit.bind(this), @@ -87,7 +92,9 @@ class Client extends EventEmitter { sniffEndpoint: options.sniffEndpoint, suggestCompression: options.suggestCompression, compression: options.compression, - headers: options.headers + headers: options.headers, + nodeFilter: options.nodeFilter, + nodeSelector: options.nodeSelector }) const apis = buildApi({ @@ -135,6 +142,31 @@ class Client extends EventEmitter { ConfigurationError }) } + + this[kExtensions].push({ name, opts, fn }) + } + + child (opts) { + // Merge the new options with the initial ones + const initialOptions = Object.assign({}, this[kInitialOptions], opts) + // Tell to the client that we are creating a child client + initialOptions[kChild] = true + + const client = new Client(initialOptions) + // Reuse the same connection pool + client.connectionPool = this.connectionPool + client.transport.connectionPool = this.connectionPool + // Share event listener + const emitter = this.emit.bind(this) + client.emit = emitter + client.connectionPool.emit = emitter + client.transport.emit = emitter + client.on = this.on.bind(this) + // Add parent extensions + this[kExtensions].forEach(({ name, opts, fn }) => { + client.extend(name, opts, fn) + }) + return client } close (callback) { diff --git a/lib/ConnectionPool.d.ts b/lib/ConnectionPool.d.ts index 8b14ced59..a98cb6677 100644 --- a/lib/ConnectionPool.d.ts +++ b/lib/ConnectionPool.d.ts @@ -2,14 +2,7 @@ import { SecureContextOptions } from 'tls'; import Connection, { AgentOptions } from './Connection'; - -export interface nodeSelectorFn { - (connections: Connection[]): Connection; -} - -export interface nodeFilterFn { - (connection: Connection): boolean; -} +import { nodeFilterFn, nodeSelectorFn } from './Transport'; interface ConnectionPoolOptions { ssl?: SecureContextOptions; @@ -17,8 +10,6 @@ interface ConnectionPoolOptions { pingTimeout?: number; Connection: typeof Connection; resurrectStrategy?: string; - nodeFilter?: nodeFilterFn; - nodeSelector?: string | nodeSelectorFn; } export interface getConnectionOptions { @@ -46,8 +37,6 @@ export default class ConnectionPool { resurrectTimeout: number; resurrectTimeoutCutoff: number; pingTimeout: number; - nodeFilter: nodeFilterFn; - nodeSelector: nodeSelectorFn; Connection: typeof Connection; resurrectStrategy: number; constructor(opts?: ConnectionPoolOptions); diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index f5e82f401..7cdb89b5c 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -24,21 +24,10 @@ class ConnectionPool { // the timeout doesn't increase this.resurrectTimeoutCutoff = 5 this.pingTimeout = opts.pingTimeout - this.nodeFilter = opts.nodeFilter || defaultNodeFilter this.Connection = opts.Connection this.emit = opts.emit || noop this._sniffEnabled = opts.sniffEnabled || false - if (typeof opts.nodeSelector === 'function') { - this.nodeSelector = opts.nodeSelector - } else if (opts.nodeSelector === 'round-robin') { - this.nodeSelector = roundRobinSelector() - } else if (opts.nodeSelector === 'random') { - this.nodeSelector = randomSelector - } else { - this.nodeSelector = roundRobinSelector() - } - const resurrectStrategy = opts.resurrectStrategy || 'ping' this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy] assert( @@ -174,8 +163,8 @@ class ConnectionPool { * @returns {object|null} connection */ getConnection (opts = {}) { - const filter = opts.filter || this.nodeFilter - const selector = opts.selector || this.nodeSelector + const filter = opts.filter || (() => true) + const selector = opts.selector || (c => c[0]) // TODO: can we cache this? const connections = [] @@ -381,30 +370,4 @@ ConnectionPool.resurrectStrategies = { // .sort((a, b) => a[0] - b[0]) // .map(a => a[1]) -function defaultNodeFilter (node) { - // avoid master only nodes - if (node.roles.master === true && - node.roles.data === false && - node.roles.ingest === false) { - return false - } - return true -} - -function roundRobinSelector () { - var current = -1 - return function _roundRobinSelector (connections) { - if (++current >= connections.length) { - current = 0 - } - return connections[current] - } -} - -function randomSelector (connections) { - const index = Math.floor(Math.random() * connections.length) - return connections[index] -} - module.exports = ConnectionPool -module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector } diff --git a/lib/Transport.d.ts b/lib/Transport.d.ts index 718377fc1..9a9730127 100644 --- a/lib/Transport.d.ts +++ b/lib/Transport.d.ts @@ -2,6 +2,14 @@ import ConnectionPool from './ConnectionPool'; import Connection from './Connection'; import Serializer from './Serializer'; +export interface nodeSelectorFn { + (connections: Connection[]): Connection; +} + +export interface nodeFilterFn { + (connection: Connection): boolean; +} + declare type noopFn = (...args: any[]) => void; declare type emitFn = (event: string | symbol, ...args: any[]) => boolean; @@ -17,6 +25,9 @@ interface TransportOptions { sniffOnConnectionFault: boolean; sniffEndpoint: string; sniffOnStart: boolean; + nodeFilter?: nodeFilterFn; + nodeSelector?: string | nodeSelectorFn; + headers?: anyObject; } export interface ApiResponse { diff --git a/lib/Transport.js b/lib/Transport.js index 6dafb3574..f16907bec 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -32,6 +32,17 @@ class Transport { this.sniffOnConnectionFault = opts.sniffOnConnectionFault this.sniffEndpoint = opts.sniffEndpoint + this.nodeFilter = opts.nodeFilter || defaultNodeFilter + if (typeof opts.nodeSelector === 'function') { + this.nodeSelector = opts.nodeSelector + } else if (opts.nodeSelector === 'round-robin') { + this.nodeSelector = roundRobinSelector() + } else if (opts.nodeSelector === 'random') { + this.nodeSelector = randomSelector + } else { + this.nodeSelector = roundRobinSelector() + } + this._sniffEnabled = typeof this.sniffInterval === 'number' this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0 this._isSniffing = false @@ -275,7 +286,10 @@ class Transport { this.sniff(Transport.sniffReasons.SNIFF_INTERVAL) } this.connectionPool.resurrect(now) - return this.connectionPool.getConnection() + return this.connectionPool.getConnection({ + filter: this.nodeFilter, + selector: this.nodeSelector + }) } sniff (reason = Transport.sniffReasons.DEFAULT, callback = noop) { @@ -340,4 +354,30 @@ function isStream (obj) { return typeof obj.pipe === 'function' } +function defaultNodeFilter (node) { + // avoid master only nodes + if (node.roles.master === true && + node.roles.data === false && + node.roles.ingest === false) { + return false + } + return true +} + +function roundRobinSelector () { + var current = -1 + return function _roundRobinSelector (connections) { + if (++current >= connections.length) { + current = 0 + } + return connections[current] + } +} + +function randomSelector (connections) { + const index = Math.floor(Math.random() * connections.length) + return connections[index] +} + module.exports = Transport +module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector } diff --git a/test/unit/child.test.js b/test/unit/child.test.js new file mode 100644 index 000000000..5e8405de3 --- /dev/null +++ b/test/unit/child.test.js @@ -0,0 +1,194 @@ +'use strict' + +const { test } = require('tap') +const { Client, errors } = require('../../index') +const { + buildServer, + connection: { MockConnection } +} = require('../utils') + +test('Should create a child client (headers check)', t => { + t.plan(4) + + var count = 0 + function handler (req, res) { + if (count++ === 0) { + t.match(req.headers, { 'x-foo': 'bar' }) + } else { + t.match(req.headers, { 'x-baz': 'faz' }) + } + res.setHeader('Content-Type', 'application/json;utf=8') + res.end(JSON.stringify({ hello: 'world' })) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ + node: `http://localhost:${port}`, + headers: { 'x-foo': 'bar' } + }) + const child = client.child({ + headers: { 'x-baz': 'faz' } + }) + + client.info((err, res) => { + t.error(err) + child.info((err, res) => { + t.error(err) + server.stop() + }) + }) + }) +}) + +test('Should create a child client (timeout check)', t => { + t.plan(2) + + function handler (req, res) { + setTimeout(() => { + res.setHeader('Content-Type', 'application/json;utf=8') + res.end(JSON.stringify({ hello: 'world' })) + }, 50) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ node: `http://localhost:${port}` }) + const child = client.child({ requestTimeout: 25, maxRetries: 0 }) + + client.info((err, res) => { + t.error(err) + child.info((err, res) => { + t.true(err instanceof errors.TimeoutError) + server.stop() + }) + }) + }) +}) + +test('Client extensions', t => { + t.test('One level', t => { + t.plan(1) + + const client = new Client({ node: 'http://localhost:9200' }) + client.extend('utility.index', () => { + return () => t.ok('called') + }) + + const child = client.child() + child.utility.index() + }) + + t.test('Two levels', t => { + t.plan(2) + + const client = new Client({ node: 'http://localhost:9200' }) + client.extend('utility.index', () => { + return () => t.ok('called') + }) + + const child = client.child() + child.extend('utility.search', () => { + return () => t.ok('called') + }) + + const grandchild = child.child() + grandchild.utility.index() + grandchild.utility.search() + }) + + t.test('The child should not extend the parent', t => { + t.plan(1) + + const client = new Client({ node: 'http://localhost:9200' }) + const child = client.child() + + child.extend('utility.index', () => { + return () => t.fail('Should not be called') + }) + + try { + client.utility.index() + } catch (err) { + t.ok(err) + } + }) + + t.end() +}) + +test('Should share the event emitter', t => { + t.test('One level', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + + client.on('response', (err, meta) => { + t.error(err) + }) + + child.info((err, res) => { + t.error(err) + }) + }) + + t.test('Two levels', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + const grandchild = child.child() + + client.on('response', (err, meta) => { + t.error(err) + }) + + grandchild.info((err, res) => { + t.error(err) + }) + }) + + t.test('Child listener - one level', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + + child.on('response', (err, meta) => { + t.error(err) + }) + + child.info((err, res) => { + t.error(err) + }) + }) + + t.test('Child listener - two levels', t => { + t.plan(2) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + const child = client.child() + const grandchild = child.child() + + child.on('response', (err, meta) => { + t.error(err) + }) + + grandchild.info((err, res) => { + t.error(err) + }) + }) + + t.end() +}) diff --git a/test/unit/connection-pool.test.js b/test/unit/connection-pool.test.js index 0969e21a0..534d9fc5e 100644 --- a/test/unit/connection-pool.test.js +++ b/test/unit/connection-pool.test.js @@ -4,6 +4,7 @@ const { test } = require('tap') const { URL } = require('url') const ConnectionPool = require('../../lib/ConnectionPool') const Connection = require('../../lib/Connection') +const { defaultNodeFilter, roundRobinSelector } = require('../../lib/Transport').internals const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils') test('API', t => { @@ -227,22 +228,6 @@ test('API', t => { pool.getConnection({ filter }) }) - t.test('filter as ConnectionPool option', t => { - t.plan(3) - - const href1 = 'http://localhost:9200/' - const href2 = 'http://localhost:9200/other' - const pool = new ConnectionPool({ - Connection, - nodeFilter: node => { - t.ok('called') - return true - } - }) - pool.addConnection([href1, href2]) - t.strictEqual(pool.getConnection().id, href1) - }) - t.end() }) @@ -498,27 +483,16 @@ test('API', t => { test('Node selector', t => { t.test('round-robin', t => { t.plan(1) - const pool = new ConnectionPool({ Connection, nodeSelector: 'round-robin' }) + const pool = new ConnectionPool({ Connection }) pool.addConnection('http://localhost:9200/') - t.true(pool.getConnection() instanceof Connection) + t.true(pool.getConnection({ selector: roundRobinSelector() }) instanceof Connection) }) t.test('random', t => { t.plan(1) - const pool = new ConnectionPool({ Connection, nodeSelector: 'random' }) + const pool = new ConnectionPool({ Connection }) pool.addConnection('http://localhost:9200/') - t.true(pool.getConnection() instanceof Connection) - }) - - t.test('custom function', t => { - t.plan(2) - const nodeSelector = connections => { - t.ok('called') - return connections[0] - } - const pool = new ConnectionPool({ Connection, nodeSelector }) - pool.addConnection('http://localhost:9200/') - t.true(pool.getConnection() instanceof Connection) + t.true(pool.getConnection({ selector: roundRobinSelector() }) instanceof Connection) }) t.end() @@ -529,7 +503,7 @@ test('Node filter', t => { t.plan(1) const pool = new ConnectionPool({ Connection }) pool.addConnection({ url: new URL('http://localhost:9200/') }) - t.true(pool.getConnection() instanceof Connection) + t.true(pool.getConnection({ filter: defaultNodeFilter }) instanceof Connection) }) t.test('Should filter master only nodes', t => { @@ -544,29 +518,7 @@ test('Node filter', t => { ml: false } }) - t.strictEqual(pool.getConnection(), null) - }) - - t.test('custom', t => { - t.plan(2) - const nodeFilter = node => { - t.ok('called') - return true - } - const pool = new ConnectionPool({ Connection, nodeFilter }) - pool.addConnection({ url: new URL('http://localhost:9200/') }) - t.true(pool.getConnection() instanceof Connection) - }) - - t.test('custom (filter)', t => { - t.plan(2) - const nodeFilter = node => { - t.ok('called') - return false - } - const pool = new ConnectionPool({ Connection, nodeFilter }) - pool.addConnection({ url: new URL('http://localhost:9200/') }) - t.strictEqual(pool.getConnection(), null) + t.strictEqual(pool.getConnection({ filter: defaultNodeFilter }), null) }) t.end() diff --git a/test/unit/selectors.test.js b/test/unit/selectors.test.js index 4ce57b581..cde2d52a5 100644 --- a/test/unit/selectors.test.js +++ b/test/unit/selectors.test.js @@ -1,7 +1,7 @@ 'use strict' const { test } = require('tap') -const { roundRobinSelector, randomSelector } = require('../../lib/ConnectionPool').internals +const { roundRobinSelector, randomSelector } = require('../../lib/Transport').internals test('RoundRobinSelector', t => { const selector = roundRobinSelector() diff --git a/test/unit/transport.test.js b/test/unit/transport.test.js index 8fba614d1..6fe920c49 100644 --- a/test/unit/transport.test.js +++ b/test/unit/transport.test.js @@ -1978,3 +1978,36 @@ test('Headers configuration', t => { t.end() }) + +test('nodeFilter and nodeSelector', t => { + t.plan(4) + + const pool = new ConnectionPool({ Connection: MockConnection }) + pool.addConnection('http://localhost:9200') + + const transport = new Transport({ + emit: () => {}, + connectionPool: pool, + serializer: new Serializer(), + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: false, + nodeFilter: () => { + t.ok('called') + return true + }, + nodeSelector: conns => { + t.ok('called') + return conns[0] + } + }) + + transport.request({ + method: 'GET', + path: '/hello' + }, (err, { body }) => { + t.error(err) + t.deepEqual(body, { hello: 'world' }) + }) +})