Improve child API (#1245)

This commit is contained in:
Tomas Della Vedova
2020-07-06 11:39:08 +02:00
committed by GitHub
parent 8c4042d913
commit 8d7859d2e2
3 changed files with 189 additions and 65 deletions

40
index.d.ts vendored
View File

@ -102,7 +102,7 @@ interface ClientOptions {
}
}
declare class Client extends EventEmitter {
declare class Client {
constructor(opts?: ClientOptions);
connectionPool: ConnectionPool;
transport: Transport;
@ -111,6 +111,16 @@ declare class Client extends EventEmitter {
helpers: Helpers;
child(opts?: ClientOptions): Client;
close(callback?: Function): Promise<void> | void;
emit(event: string | symbol, ...args: any[]): boolean;
on(event: 'request', listener: (err: ApiError, meta: RequestEvent) => void): this;
on(event: 'response', listener: (err: ApiError, meta: RequestEvent) => void): this;
on(event: 'sniff', listener: (err: ApiError, meta: RequestEvent) => void): this;
on(event: 'resurrect', listener: (err: null, meta: ResurrectEvent) => void): this;
once(event: 'request', listener: (err: ApiError, meta: RequestEvent) => void): this;
once(event: 'response', listener: (err: ApiError, meta: RequestEvent) => void): this;
once(event: 'sniff', listener: (err: ApiError, meta: RequestEvent) => void): this;
once(event: 'resurrect', listener: (err: null, meta: ResurrectEvent) => void): this;
off(event: string | symbol, listener: (...args: any[]) => void): this;
/* GENERATED */
async_search: {
delete<TResponse = Record<string, any>, TContext = unknown>(params?: RequestParams.AsyncSearchDelete, options?: TransportRequestOptions): TransportRequestPromise<ApiResponse<TResponse, TContext>>
@ -2609,34 +2619,6 @@ declare class Client extends EventEmitter {
/* /GENERATED */
}
// We must redeclare the EventEmitter class so we can provide
// better type definitions for our events, otherwise the default
// signature is `(event: string | symbol, listener: (...args: any[]) => void): this;`
declare class EventEmitter {
addListener(event: string | symbol, listener: (...args: any[]) => void): this;
on(event: 'request', listener: (err: ApiError, meta: RequestEvent) => void): this;
on(event: 'response', listener: (err: ApiError, meta: RequestEvent) => void): this;
on(event: 'sniff', listener: (err: ApiError, meta: RequestEvent) => void): this;
on(event: 'resurrect', listener: (err: null, meta: ResurrectEvent) => void): this;
once(event: 'request', listener: (err: ApiError, meta: RequestEvent) => void): this;
once(event: 'response', listener: (err: ApiError, meta: RequestEvent) => void): this;
once(event: 'sniff', listener: (err: ApiError, meta: RequestEvent) => void): this;
once(event: 'resurrect', listener: (err: null, meta: ResurrectEvent) => void): this;
removeListener(event: string | symbol, listener: (...args: any[]) => void): this;
off(event: string | symbol, listener: (...args: any[]) => void): this;
removeAllListeners(event?: string | symbol): this;
setMaxListeners(n: number): this;
getMaxListeners(): number;
listeners(event: string | symbol): Function[];
rawListeners(event: string | symbol): Function[];
emit(event: string | symbol, ...args: any[]): boolean;
listenerCount(type: string | symbol): number;
// Added in Node 6...
prependListener(event: string | symbol, listener: (...args: any[]) => void): this;
prependOnceListener(event: string | symbol, listener: (...args: any[]) => void): this;
eventNames(): Array<string | symbol>;
}
declare const events: {
RESPONSE: string;
REQUEST: string;

View File

@ -21,12 +21,12 @@ const { ConfigurationError } = errors
const kInitialOptions = Symbol('elasticsearchjs-initial-options')
const kChild = Symbol('elasticsearchjs-child')
const kExtensions = Symbol('elasticsearchjs-extensions')
const kEventEmitter = Symbol('elasticsearchjs-event-emitter')
const buildApi = require('./api')
class Client extends EventEmitter {
class Client {
constructor (opts = {}) {
super()
if (opts.cloud) {
const { id, username, password } = opts.cloud
// the cloud id is `cluster-name:base64encodedurl`
@ -89,29 +89,33 @@ class Client extends EventEmitter {
this[kInitialOptions] = options
this[kExtensions] = []
this.name = options.name
this.serializer = new options.Serializer()
this.connectionPool = new options.ConnectionPool({
pingTimeout: options.pingTimeout,
resurrectStrategy: options.resurrectStrategy,
ssl: options.ssl,
agent: options.agent,
Connection: options.Connection,
auth: options.auth,
emit: this.emit.bind(this),
sniffEnabled: options.sniffInterval !== false ||
options.sniffOnStart !== false ||
options.sniffOnConnectionFault !== false
})
// Add the connections before initialize the Transport
if (opts[kChild] !== true) {
if (opts[kChild] !== undefined) {
this.serializer = options[kChild].serializer
this.connectionPool = options[kChild].connectionPool
this[kEventEmitter] = options[kChild].eventEmitter
} else {
this[kEventEmitter] = new EventEmitter()
this.serializer = new options.Serializer()
this.connectionPool = new options.ConnectionPool({
pingTimeout: options.pingTimeout,
resurrectStrategy: options.resurrectStrategy,
ssl: options.ssl,
agent: options.agent,
Connection: options.Connection,
auth: options.auth,
emit: this[kEventEmitter].emit.bind(this[kEventEmitter]),
sniffEnabled: options.sniffInterval !== false ||
options.sniffOnStart !== false ||
options.sniffOnConnectionFault !== false
})
// Add the connections before initialize the Transport
this.connectionPool.addConnection(options.node || options.nodes)
}
this.transport = new options.Transport({
emit: this.emit.bind(this),
emit: this[kEventEmitter].emit.bind(this[kEventEmitter]),
connectionPool: this.connectionPool,
serializer: this.serializer,
maxRetries: options.maxRetries,
@ -141,9 +145,26 @@ class Client extends EventEmitter {
ConfigurationError
})
Object.keys(apis).forEach(api => {
this[api] = apis[api]
})
const apiNames = Object.keys(apis)
for (var i = 0, len = apiNames.length; i < len; i++) {
this[apiNames[i]] = apis[apiNames[i]]
}
}
get emit () {
return this[kEventEmitter].emit.bind(this[kEventEmitter])
}
get on () {
return this[kEventEmitter].on.bind(this[kEventEmitter])
}
get once () {
return this[kEventEmitter].once.bind(this[kEventEmitter])
}
get off () {
return this[kEventEmitter].off.bind(this[kEventEmitter])
}
extend (name, opts, fn) {
@ -187,23 +208,20 @@ class Client extends EventEmitter {
child (opts) {
// Merge the new options with the initial ones
const initialOptions = Object.assign({}, this[kInitialOptions], opts)
// Tell to the client that we are creating a child client
initialOptions[kChild] = true
// Pass to the child client the parent instances that cannot be overriden
initialOptions[kChild] = {
connectionPool: this.connectionPool,
serializer: this.serializer,
eventEmitter: this[kEventEmitter]
}
const client = new Client(initialOptions)
// Reuse the same connection pool
client.connectionPool = this.connectionPool
client.transport.connectionPool = this.connectionPool
// Share event listener
const emitter = this.emit.bind(this)
client.emit = emitter
client.connectionPool.emit = emitter
client.transport.emit = emitter
client.on = this.on.bind(this)
// Add parent extensions
this[kExtensions].forEach(({ name, opts, fn }) => {
client.extend(name, opts, fn)
})
if (this[kExtensions].length > 0) {
this[kExtensions].forEach(({ name, opts, fn }) => {
client.extend(name, opts, fn)
})
}
return client
}

View File

@ -5,6 +5,7 @@
'use strict'
const { test } = require('tap')
const semver = require('semver')
const { Client, events } = require('../../index')
const { TimeoutError } = require('../../lib/errors')
const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils')
@ -54,6 +55,113 @@ test('Should emit a request event when a request is performed', t => {
})
})
test('Should emit a request event once when a request is performed', t => {
t.plan(4)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
client.once(events.REQUEST, (err, request) => {
t.error(err)
t.match(request, {
body: null,
statusCode: null,
headers: null,
warnings: null,
meta: {
context: null,
name: 'elasticsearch-js',
request: {
params: {
method: 'GET',
path: '/test/_search',
body: '',
querystring: 'q=foo%3Abar'
},
options: {},
id: 1
},
connection: {
id: 'http://localhost:9200'
},
attempts: 0,
aborted: false
}
})
})
client.search({
index: 'test',
q: 'foo:bar'
}, (err, result) => {
t.error(err)
})
client.search({
index: 'test',
q: 'foo:bar'
}, (err, result) => {
t.error(err)
})
})
test('Remove an event', { skip: semver.lt(process.versions.node, '10.0.0') }, t => {
t.plan(4)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
client.on(events.REQUEST, onRequest)
function onRequest (err, request) {
t.error(err)
t.match(request, {
body: null,
statusCode: null,
headers: null,
warnings: null,
meta: {
context: null,
name: 'elasticsearch-js',
request: {
params: {
method: 'GET',
path: '/test/_search',
body: '',
querystring: 'q=foo%3Abar'
},
options: {},
id: 1
},
connection: {
id: 'http://localhost:9200'
},
attempts: 0,
aborted: false
}
})
client.off('request', onRequest)
}
client.search({
index: 'test',
q: 'foo:bar'
}, (err, result) => {
t.error(err)
})
client.search({
index: 'test',
q: 'foo:bar'
}, (err, result) => {
t.error(err)
})
})
test('Should emit a response event in case of a successful response', t => {
t.plan(3)
@ -151,3 +259,19 @@ test('Should emit a response event with the error set', t => {
t.ok(err instanceof TimeoutError)
})
})
test('Emit event', t => {
t.plan(2)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.deepEqual(request, { hello: 'world' })
})
client.emit(events.REQUEST, null, { hello: 'world' })
})