diff --git a/index.d.ts b/index.d.ts index acb52b595..e5daecd42 100644 --- a/index.d.ts +++ b/index.d.ts @@ -4,7 +4,7 @@ import { EventEmitter } from 'events'; import { SecureContextOptions } from 'tls'; import Transport, { ApiResponse, EventMeta, SniffMeta } from './lib/Transport'; import Connection, { AgentOptions } from './lib/Connection'; -import ConnectionPool, { nodeSelectorFn, nodeFilterFn } from './lib/ConnectionPool'; +import ConnectionPool, { nodeSelectorFn, nodeFilterFn, ResurrectMeta } from './lib/ConnectionPool'; import Serializer from './lib/Serializer'; declare type anyObject = { @@ -461,12 +461,14 @@ declare class Client extends EventEmitter { } } constructor(opts?: ClientOptions); + close(callback?: Function): Promise | void; } declare const events: { RESPONSE: string; REQUEST: string; SNIFF: string; + RESURRECT: string; }; export { @@ -478,5 +480,6 @@ export { events, ApiResponse, EventMeta, - SniffMeta + SniffMeta, + ResurrectMeta }; diff --git a/index.js b/index.js index a54b93bfa..a889aa2f6 100644 --- a/index.js +++ b/index.js @@ -55,7 +55,8 @@ class Client extends EventEmitter { nodeFilter: options.nodeFilter, nodeWeighter: options.nodeWeighter, nodeSelector: options.nodeSelector, - Connection: options.Connection + Connection: options.Connection, + emit: this.emit.bind(this) }) // Add the connections before initialize the Transport @@ -84,12 +85,23 @@ class Client extends EventEmitter { this[api] = apis[api] }) } + + close (callback) { + if (callback == null) { + return new Promise((resolve, reject) => { + this.close(resolve) + }) + } + this.connectionPool.empty() + callback() + } } const events = { RESPONSE: 'response', REQUEST: 'request', - SNIFF: 'sniff' + SNIFF: 'sniff', + RESURRECT: 'resurrect' } module.exports = { diff --git a/lib/Connection.d.ts b/lib/Connection.d.ts index 80953fd40..0bf386563 100644 --- a/lib/Connection.d.ts +++ b/lib/Connection.d.ts @@ -46,7 +46,7 @@ export default class Connection { _status: string; _agent: http.Agent; constructor(opts?: ConnectionOptions); - request(params: any, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest; + request(params: http.ClientRequestArgs, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest; close(): Connection; setRole(role: string, enabled: boolean): Connection; status: string; diff --git a/lib/Connection.js b/lib/Connection.js index 12beb7666..678e7f1e5 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -64,7 +64,7 @@ class Connection { ended = true this._openRequests-- request.abort() - callback(new TimeoutError('Request timed out', params)) + callback(new TimeoutError('Request timed out', params), null) } }) @@ -73,7 +73,7 @@ class Connection { if (ended === false) { ended = true this._openRequests-- - callback(err) + callback(err, null) } }) @@ -95,7 +95,7 @@ class Connection { if (err != null && ended === false) { ended = true this._openRequests-- - callback(err) + callback(err, null) } }) } else { @@ -152,6 +152,7 @@ class Connection { pathname: url.pathname, path: '', href: url.href, + origin: url.origin, port: url.port, headers: this.headers, auth: !!url.username === true || !!url.password === true diff --git a/lib/ConnectionPool.d.ts b/lib/ConnectionPool.d.ts index 94ffaae34..83cddbb84 100644 --- a/lib/ConnectionPool.d.ts +++ b/lib/ConnectionPool.d.ts @@ -27,6 +27,12 @@ export interface getConnectionOptions { selector?: nodeSelectorFn; } +export interface ResurrectMeta { + strategy: string; + isAlive: boolean; + connection: Connection; +} + export default class ConnectionPool { static resurrectStrategies: { none: number; diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index 39905a313..40d6f48cf 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -26,6 +26,7 @@ class ConnectionPool { this.randomizeHost = opts.randomizeHost === true this.nodeFilter = opts.nodeFilter || defaultNodeFilter this.Connection = opts.Connection + this.emit = opts.emit || noop if (typeof opts.nodeSelector === 'function') { this.nodeSelector = opts.nodeSelector @@ -107,6 +108,7 @@ class ConnectionPool { */ resurrect (now = Date.now(), callback = noop) { if (this.resurrectStrategy === 0 || this.dead.length === 0) { + debug('Nothing to resurrect') callback(null, null) return } @@ -116,6 +118,7 @@ class ConnectionPool { const connection = this.connections.get(this.dead[0]) if (now < connection.resurrectTimeout) { debug('Nothing to resurrect') + callback(null, null) return } @@ -127,9 +130,11 @@ class ConnectionPool { method: 'HEAD', path: '/', timeout: this.pingTimeout - }, (err, res) => { + }, (err, response) => { var isAlive = true - if (err != null) { + const statusCode = response !== null ? response.statusCode : 0 + if (err != null || + (statusCode === 502 || statusCode === 503 || statusCode === 504)) { debug(`Resurrect: connection '${id}' is still dead`) this.markDead(connection) isAlive = false @@ -137,6 +142,7 @@ class ConnectionPool { debug(`Resurrect: connection '${id}' is now alive`) this.markAlive(connection) } + this.emit('resurrect', null, { strategy: 'ping', isAlive, connection }) callback(isAlive, connection) }) // optimistic strategy @@ -144,6 +150,7 @@ class ConnectionPool { debug(`Resurrect: optimistic resurrection for connection '${id}'`) this.dead.splice(this.dead.indexOf(id), 1) connection.status = Connection.statuses.ALIVE + this.emit('resurrect', null, { strategy: 'optimistic', isAlive: true, connection }) // eslint-disable-next-line standard/no-callback-literal callback(true, connection) }