Child client support (#768)

With this pr we introduce the `client.child` API, which returns a new client instance that shares the connection pool with the parent client.
This feature can be handy if you need to have multiple client instances with different configurations, but with a shared connection pool.

Example:

```js
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const child = client.child({
  headers: { 'x-foo': 'bar' },
  requestTimeout: 1000
})

client.info(console.log)
child.info(console.log)
```

**Open questions:**

* Currently, the event emitter is shared between the parent and the child(ren), is it ok?
* Currently, if you extend the parent client, the child client will have the same extensions, while if the child client adds an extension, the parent client will not be extended. Is it ok?

**Caveats:**

* You can override _any_ option except for the connection pool specific options (`ssl`, `agent`, `pingTimeout`, `Connection`, and `resurrectStrategy`).
* You can't specify a new `Connection` class.
* If you call `close` in any of the parent/child clients, every client will be closed.

_Note: the `nodeFilter` and `nodeSelector` options are now `Transport` options and no longer `ConnectionPool` options._
This commit is contained in:
Tomas Della Vedova
2019-02-21 12:48:49 +01:00
committed by GitHub
parent 66e8d61476
commit 5b856cd4c2
11 changed files with 357 additions and 116 deletions

23
docs/child.asciidoc Normal file
View File

@ -0,0 +1,23 @@
= Creating a child client
There are some use cases where you may need multiple instances of the client. You can easily do that by calling `new Client()` as many times as you need, but you will lose all the benefits of using one single client, such as the long living connections and the connection pool handling. +
To avoid this problem the client offers a `child` API, which returns a new client instance that shares the connection pool with the parent client. +
NOTE: The event emitter is shared between the parent and the child(ren), and if you extend the parent client, the child client will have the same extensions, while if the child client adds an extension, the parent client will not be extended.
You can pass to the `child` every client option you would pass to a normal client, but the connection pool specific options (`ssl`, `agent`, `pingTimeout`, `Connection`, and `resurrectStrategy`).
CAUTION: If you call `close` in any of the parent/child clients, every client will be closed.
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const child = client.child({
headers: { 'x-foo': 'bar' },
requestTimeout: 1000
})
client.info(console.log)
child.info(console.log)
----

8
index.d.ts vendored
View File

@ -7,10 +7,12 @@ import Transport, {
EventMeta,
SniffMeta,
TransportRequestParams,
TransportRequestOptions
TransportRequestOptions,
nodeFilterFn,
nodeSelectorFn
} from './lib/Transport';
import Connection, { AgentOptions } from './lib/Connection';
import ConnectionPool, { nodeSelectorFn, nodeFilterFn, ResurrectMeta } from './lib/ConnectionPool';
import ConnectionPool, { ResurrectMeta } from './lib/ConnectionPool';
import Serializer from './lib/Serializer';
import * as RequestParams from './api/requestParams';
import * as errors from './lib/errors';
@ -65,6 +67,7 @@ interface ClientOptions {
agent?: AgentOptions;
nodeFilter?: nodeFilterFn;
nodeSelector?: nodeSelectorFn | string;
headers?: anyObject;
cloud?: {
id: string;
username: string;
@ -78,6 +81,7 @@ declare class Client extends EventEmitter {
transport: Transport;
serializer: Serializer;
extend: ClientExtends;
child(opts?: ClientOptions): Client;
close(callback?: Function): Promise<void> | void;
bulk: ApiMethod<RequestParams.Bulk>
cat: {

View File

@ -9,6 +9,10 @@ const Serializer = require('./lib/Serializer')
const errors = require('./lib/errors')
const { ConfigurationError } = errors
const kInitialOptions = Symbol('elasticsearchjs-initial-options')
const kChild = Symbol('elasticsearchjs-child')
const kExtensions = Symbol('elasticsearchjs-extensions')
const buildApi = require('./api')
class Client extends EventEmitter {
@ -52,19 +56,18 @@ class Client extends EventEmitter {
agent: null,
headers: {},
nodeFilter: null,
nodeWeighter: null,
nodeSelector: 'round-robin'
}, opts)
this[kInitialOptions] = options
this[kExtensions] = []
this.serializer = new options.Serializer()
this.connectionPool = new options.ConnectionPool({
pingTimeout: options.pingTimeout,
resurrectStrategy: options.resurrectStrategy,
ssl: options.ssl,
agent: options.agent,
nodeFilter: options.nodeFilter,
nodeWeighter: options.nodeWeighter,
nodeSelector: options.nodeSelector,
Connection: options.Connection,
emit: this.emit.bind(this),
sniffEnabled: options.sniffInterval !== false ||
@ -73,7 +76,9 @@ class Client extends EventEmitter {
})
// Add the connections before initialize the Transport
this.connectionPool.addConnection(options.node || options.nodes)
if (opts[kChild] !== true) {
this.connectionPool.addConnection(options.node || options.nodes)
}
this.transport = new options.Transport({
emit: this.emit.bind(this),
@ -87,7 +92,9 @@ class Client extends EventEmitter {
sniffEndpoint: options.sniffEndpoint,
suggestCompression: options.suggestCompression,
compression: options.compression,
headers: options.headers
headers: options.headers,
nodeFilter: options.nodeFilter,
nodeSelector: options.nodeSelector
})
const apis = buildApi({
@ -135,6 +142,31 @@ class Client extends EventEmitter {
ConfigurationError
})
}
this[kExtensions].push({ name, opts, fn })
}
child (opts) {
// Merge the new options with the initial ones
const initialOptions = Object.assign({}, this[kInitialOptions], opts)
// Tell to the client that we are creating a child client
initialOptions[kChild] = true
const client = new Client(initialOptions)
// Reuse the same connection pool
client.connectionPool = this.connectionPool
client.transport.connectionPool = this.connectionPool
// Share event listener
const emitter = this.emit.bind(this)
client.emit = emitter
client.connectionPool.emit = emitter
client.transport.emit = emitter
client.on = this.on.bind(this)
// Add parent extensions
this[kExtensions].forEach(({ name, opts, fn }) => {
client.extend(name, opts, fn)
})
return client
}
close (callback) {

View File

@ -2,14 +2,7 @@
import { SecureContextOptions } from 'tls';
import Connection, { AgentOptions } from './Connection';
export interface nodeSelectorFn {
(connections: Connection[]): Connection;
}
export interface nodeFilterFn {
(connection: Connection): boolean;
}
import { nodeFilterFn, nodeSelectorFn } from './Transport';
interface ConnectionPoolOptions {
ssl?: SecureContextOptions;
@ -17,8 +10,6 @@ interface ConnectionPoolOptions {
pingTimeout?: number;
Connection: typeof Connection;
resurrectStrategy?: string;
nodeFilter?: nodeFilterFn;
nodeSelector?: string | nodeSelectorFn;
}
export interface getConnectionOptions {
@ -46,8 +37,6 @@ export default class ConnectionPool {
resurrectTimeout: number;
resurrectTimeoutCutoff: number;
pingTimeout: number;
nodeFilter: nodeFilterFn;
nodeSelector: nodeSelectorFn;
Connection: typeof Connection;
resurrectStrategy: number;
constructor(opts?: ConnectionPoolOptions);

View File

@ -24,21 +24,10 @@ class ConnectionPool {
// the timeout doesn't increase
this.resurrectTimeoutCutoff = 5
this.pingTimeout = opts.pingTimeout
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
this.Connection = opts.Connection
this.emit = opts.emit || noop
this._sniffEnabled = opts.sniffEnabled || false
if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector
} else if (opts.nodeSelector === 'round-robin') {
this.nodeSelector = roundRobinSelector()
} else if (opts.nodeSelector === 'random') {
this.nodeSelector = randomSelector
} else {
this.nodeSelector = roundRobinSelector()
}
const resurrectStrategy = opts.resurrectStrategy || 'ping'
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
assert(
@ -174,8 +163,8 @@ class ConnectionPool {
* @returns {object|null} connection
*/
getConnection (opts = {}) {
const filter = opts.filter || this.nodeFilter
const selector = opts.selector || this.nodeSelector
const filter = opts.filter || (() => true)
const selector = opts.selector || (c => c[0])
// TODO: can we cache this?
const connections = []
@ -381,30 +370,4 @@ ConnectionPool.resurrectStrategies = {
// .sort((a, b) => a[0] - b[0])
// .map(a => a[1])
function defaultNodeFilter (node) {
// avoid master only nodes
if (node.roles.master === true &&
node.roles.data === false &&
node.roles.ingest === false) {
return false
}
return true
}
function roundRobinSelector () {
var current = -1
return function _roundRobinSelector (connections) {
if (++current >= connections.length) {
current = 0
}
return connections[current]
}
}
function randomSelector (connections) {
const index = Math.floor(Math.random() * connections.length)
return connections[index]
}
module.exports = ConnectionPool
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector }

11
lib/Transport.d.ts vendored
View File

@ -2,6 +2,14 @@ import ConnectionPool from './ConnectionPool';
import Connection from './Connection';
import Serializer from './Serializer';
export interface nodeSelectorFn {
(connections: Connection[]): Connection;
}
export interface nodeFilterFn {
(connection: Connection): boolean;
}
declare type noopFn = (...args: any[]) => void;
declare type emitFn = (event: string | symbol, ...args: any[]) => boolean;
@ -17,6 +25,9 @@ interface TransportOptions {
sniffOnConnectionFault: boolean;
sniffEndpoint: string;
sniffOnStart: boolean;
nodeFilter?: nodeFilterFn;
nodeSelector?: string | nodeSelectorFn;
headers?: anyObject;
}
export interface ApiResponse {

View File

@ -32,6 +32,17 @@ class Transport {
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
this.sniffEndpoint = opts.sniffEndpoint
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector
} else if (opts.nodeSelector === 'round-robin') {
this.nodeSelector = roundRobinSelector()
} else if (opts.nodeSelector === 'random') {
this.nodeSelector = randomSelector
} else {
this.nodeSelector = roundRobinSelector()
}
this._sniffEnabled = typeof this.sniffInterval === 'number'
this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0
this._isSniffing = false
@ -275,7 +286,10 @@ class Transport {
this.sniff(Transport.sniffReasons.SNIFF_INTERVAL)
}
this.connectionPool.resurrect(now)
return this.connectionPool.getConnection()
return this.connectionPool.getConnection({
filter: this.nodeFilter,
selector: this.nodeSelector
})
}
sniff (reason = Transport.sniffReasons.DEFAULT, callback = noop) {
@ -340,4 +354,30 @@ function isStream (obj) {
return typeof obj.pipe === 'function'
}
function defaultNodeFilter (node) {
// avoid master only nodes
if (node.roles.master === true &&
node.roles.data === false &&
node.roles.ingest === false) {
return false
}
return true
}
function roundRobinSelector () {
var current = -1
return function _roundRobinSelector (connections) {
if (++current >= connections.length) {
current = 0
}
return connections[current]
}
}
function randomSelector (connections) {
const index = Math.floor(Math.random() * connections.length)
return connections[index]
}
module.exports = Transport
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector }

194
test/unit/child.test.js Normal file
View File

@ -0,0 +1,194 @@
'use strict'
const { test } = require('tap')
const { Client, errors } = require('../../index')
const {
buildServer,
connection: { MockConnection }
} = require('../utils')
test('Should create a child client (headers check)', t => {
t.plan(4)
var count = 0
function handler (req, res) {
if (count++ === 0) {
t.match(req.headers, { 'x-foo': 'bar' })
} else {
t.match(req.headers, { 'x-baz': 'faz' })
}
res.setHeader('Content-Type', 'application/json;utf=8')
res.end(JSON.stringify({ hello: 'world' }))
}
buildServer(handler, ({ port }, server) => {
const client = new Client({
node: `http://localhost:${port}`,
headers: { 'x-foo': 'bar' }
})
const child = client.child({
headers: { 'x-baz': 'faz' }
})
client.info((err, res) => {
t.error(err)
child.info((err, res) => {
t.error(err)
server.stop()
})
})
})
})
test('Should create a child client (timeout check)', t => {
t.plan(2)
function handler (req, res) {
setTimeout(() => {
res.setHeader('Content-Type', 'application/json;utf=8')
res.end(JSON.stringify({ hello: 'world' }))
}, 50)
}
buildServer(handler, ({ port }, server) => {
const client = new Client({ node: `http://localhost:${port}` })
const child = client.child({ requestTimeout: 25, maxRetries: 0 })
client.info((err, res) => {
t.error(err)
child.info((err, res) => {
t.true(err instanceof errors.TimeoutError)
server.stop()
})
})
})
})
test('Client extensions', t => {
t.test('One level', t => {
t.plan(1)
const client = new Client({ node: 'http://localhost:9200' })
client.extend('utility.index', () => {
return () => t.ok('called')
})
const child = client.child()
child.utility.index()
})
t.test('Two levels', t => {
t.plan(2)
const client = new Client({ node: 'http://localhost:9200' })
client.extend('utility.index', () => {
return () => t.ok('called')
})
const child = client.child()
child.extend('utility.search', () => {
return () => t.ok('called')
})
const grandchild = child.child()
grandchild.utility.index()
grandchild.utility.search()
})
t.test('The child should not extend the parent', t => {
t.plan(1)
const client = new Client({ node: 'http://localhost:9200' })
const child = client.child()
child.extend('utility.index', () => {
return () => t.fail('Should not be called')
})
try {
client.utility.index()
} catch (err) {
t.ok(err)
}
})
t.end()
})
test('Should share the event emitter', t => {
t.test('One level', t => {
t.plan(2)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const child = client.child()
client.on('response', (err, meta) => {
t.error(err)
})
child.info((err, res) => {
t.error(err)
})
})
t.test('Two levels', t => {
t.plan(2)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const child = client.child()
const grandchild = child.child()
client.on('response', (err, meta) => {
t.error(err)
})
grandchild.info((err, res) => {
t.error(err)
})
})
t.test('Child listener - one level', t => {
t.plan(2)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const child = client.child()
child.on('response', (err, meta) => {
t.error(err)
})
child.info((err, res) => {
t.error(err)
})
})
t.test('Child listener - two levels', t => {
t.plan(2)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const child = client.child()
const grandchild = child.child()
child.on('response', (err, meta) => {
t.error(err)
})
grandchild.info((err, res) => {
t.error(err)
})
})
t.end()
})

View File

@ -4,6 +4,7 @@ const { test } = require('tap')
const { URL } = require('url')
const ConnectionPool = require('../../lib/ConnectionPool')
const Connection = require('../../lib/Connection')
const { defaultNodeFilter, roundRobinSelector } = require('../../lib/Transport').internals
const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils')
test('API', t => {
@ -227,22 +228,6 @@ test('API', t => {
pool.getConnection({ filter })
})
t.test('filter as ConnectionPool option', t => {
t.plan(3)
const href1 = 'http://localhost:9200/'
const href2 = 'http://localhost:9200/other'
const pool = new ConnectionPool({
Connection,
nodeFilter: node => {
t.ok('called')
return true
}
})
pool.addConnection([href1, href2])
t.strictEqual(pool.getConnection().id, href1)
})
t.end()
})
@ -498,27 +483,16 @@ test('API', t => {
test('Node selector', t => {
t.test('round-robin', t => {
t.plan(1)
const pool = new ConnectionPool({ Connection, nodeSelector: 'round-robin' })
const pool = new ConnectionPool({ Connection })
pool.addConnection('http://localhost:9200/')
t.true(pool.getConnection() instanceof Connection)
t.true(pool.getConnection({ selector: roundRobinSelector() }) instanceof Connection)
})
t.test('random', t => {
t.plan(1)
const pool = new ConnectionPool({ Connection, nodeSelector: 'random' })
const pool = new ConnectionPool({ Connection })
pool.addConnection('http://localhost:9200/')
t.true(pool.getConnection() instanceof Connection)
})
t.test('custom function', t => {
t.plan(2)
const nodeSelector = connections => {
t.ok('called')
return connections[0]
}
const pool = new ConnectionPool({ Connection, nodeSelector })
pool.addConnection('http://localhost:9200/')
t.true(pool.getConnection() instanceof Connection)
t.true(pool.getConnection({ selector: roundRobinSelector() }) instanceof Connection)
})
t.end()
@ -529,7 +503,7 @@ test('Node filter', t => {
t.plan(1)
const pool = new ConnectionPool({ Connection })
pool.addConnection({ url: new URL('http://localhost:9200/') })
t.true(pool.getConnection() instanceof Connection)
t.true(pool.getConnection({ filter: defaultNodeFilter }) instanceof Connection)
})
t.test('Should filter master only nodes', t => {
@ -544,29 +518,7 @@ test('Node filter', t => {
ml: false
}
})
t.strictEqual(pool.getConnection(), null)
})
t.test('custom', t => {
t.plan(2)
const nodeFilter = node => {
t.ok('called')
return true
}
const pool = new ConnectionPool({ Connection, nodeFilter })
pool.addConnection({ url: new URL('http://localhost:9200/') })
t.true(pool.getConnection() instanceof Connection)
})
t.test('custom (filter)', t => {
t.plan(2)
const nodeFilter = node => {
t.ok('called')
return false
}
const pool = new ConnectionPool({ Connection, nodeFilter })
pool.addConnection({ url: new URL('http://localhost:9200/') })
t.strictEqual(pool.getConnection(), null)
t.strictEqual(pool.getConnection({ filter: defaultNodeFilter }), null)
})
t.end()

View File

@ -1,7 +1,7 @@
'use strict'
const { test } = require('tap')
const { roundRobinSelector, randomSelector } = require('../../lib/ConnectionPool').internals
const { roundRobinSelector, randomSelector } = require('../../lib/Transport').internals
test('RoundRobinSelector', t => {
const selector = roundRobinSelector()

View File

@ -1978,3 +1978,36 @@ test('Headers configuration', t => {
t.end()
})
test('nodeFilter and nodeSelector', t => {
t.plan(4)
const pool = new ConnectionPool({ Connection: MockConnection })
pool.addConnection('http://localhost:9200')
const transport = new Transport({
emit: () => {},
connectionPool: pool,
serializer: new Serializer(),
maxRetries: 3,
requestTimeout: 30000,
sniffInterval: false,
sniffOnStart: false,
nodeFilter: () => {
t.ok('called')
return true
},
nodeSelector: conns => {
t.ok('called')
return conns[0]
}
})
transport.request({
method: 'GET',
path: '/hello'
}, (err, { body }) => {
t.error(err)
t.deepEqual(body, { hello: 'world' })
})
})