WIP: initial prototype

- Added client.close API
- Added resurrect event
- Improved resurrect ping strategy
- Updated types
This commit is contained in:
delvedor
2018-12-10 20:12:25 +01:00
parent 4c8df2d4c0
commit a9e621721e
6 changed files with 39 additions and 10 deletions

7
index.d.ts vendored
View File

@ -4,7 +4,7 @@ import { EventEmitter } from 'events';
import { SecureContextOptions } from 'tls'; import { SecureContextOptions } from 'tls';
import Transport, { ApiResponse, EventMeta, SniffMeta } from './lib/Transport'; import Transport, { ApiResponse, EventMeta, SniffMeta } from './lib/Transport';
import Connection, { AgentOptions } from './lib/Connection'; import Connection, { AgentOptions } from './lib/Connection';
import ConnectionPool, { nodeSelectorFn, nodeFilterFn } from './lib/ConnectionPool'; import ConnectionPool, { nodeSelectorFn, nodeFilterFn, ResurrectMeta } from './lib/ConnectionPool';
import Serializer from './lib/Serializer'; import Serializer from './lib/Serializer';
declare type anyObject = { declare type anyObject = {
@ -461,12 +461,14 @@ declare class Client extends EventEmitter {
} }
} }
constructor(opts?: ClientOptions); constructor(opts?: ClientOptions);
close(callback?: Function): Promise<void> | void;
} }
declare const events: { declare const events: {
RESPONSE: string; RESPONSE: string;
REQUEST: string; REQUEST: string;
SNIFF: string; SNIFF: string;
RESURRECT: string;
}; };
export { export {
@ -478,5 +480,6 @@ export {
events, events,
ApiResponse, ApiResponse,
EventMeta, EventMeta,
SniffMeta SniffMeta,
ResurrectMeta
}; };

View File

@ -55,7 +55,8 @@ class Client extends EventEmitter {
nodeFilter: options.nodeFilter, nodeFilter: options.nodeFilter,
nodeWeighter: options.nodeWeighter, nodeWeighter: options.nodeWeighter,
nodeSelector: options.nodeSelector, nodeSelector: options.nodeSelector,
Connection: options.Connection Connection: options.Connection,
emit: this.emit.bind(this)
}) })
// Add the connections before initialize the Transport // Add the connections before initialize the Transport
@ -84,12 +85,23 @@ class Client extends EventEmitter {
this[api] = apis[api] this[api] = apis[api]
}) })
} }
close (callback) {
if (callback == null) {
return new Promise((resolve, reject) => {
this.close(resolve)
})
}
this.connectionPool.empty()
callback()
}
} }
const events = { const events = {
RESPONSE: 'response', RESPONSE: 'response',
REQUEST: 'request', REQUEST: 'request',
SNIFF: 'sniff' SNIFF: 'sniff',
RESURRECT: 'resurrect'
} }
module.exports = { module.exports = {

2
lib/Connection.d.ts vendored
View File

@ -46,7 +46,7 @@ export default class Connection {
_status: string; _status: string;
_agent: http.Agent; _agent: http.Agent;
constructor(opts?: ConnectionOptions); constructor(opts?: ConnectionOptions);
request(params: any, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest; request(params: http.ClientRequestArgs, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest;
close(): Connection; close(): Connection;
setRole(role: string, enabled: boolean): Connection; setRole(role: string, enabled: boolean): Connection;
status: string; status: string;

View File

@ -64,7 +64,7 @@ class Connection {
ended = true ended = true
this._openRequests-- this._openRequests--
request.abort() request.abort()
callback(new TimeoutError('Request timed out', params)) callback(new TimeoutError('Request timed out', params), null)
} }
}) })
@ -73,7 +73,7 @@ class Connection {
if (ended === false) { if (ended === false) {
ended = true ended = true
this._openRequests-- this._openRequests--
callback(err) callback(err, null)
} }
}) })
@ -95,7 +95,7 @@ class Connection {
if (err != null && ended === false) { if (err != null && ended === false) {
ended = true ended = true
this._openRequests-- this._openRequests--
callback(err) callback(err, null)
} }
}) })
} else { } else {
@ -152,6 +152,7 @@ class Connection {
pathname: url.pathname, pathname: url.pathname,
path: '', path: '',
href: url.href, href: url.href,
origin: url.origin,
port: url.port, port: url.port,
headers: this.headers, headers: this.headers,
auth: !!url.username === true || !!url.password === true auth: !!url.username === true || !!url.password === true

View File

@ -27,6 +27,12 @@ export interface getConnectionOptions {
selector?: nodeSelectorFn; selector?: nodeSelectorFn;
} }
export interface ResurrectMeta {
strategy: string;
isAlive: boolean;
connection: Connection;
}
export default class ConnectionPool { export default class ConnectionPool {
static resurrectStrategies: { static resurrectStrategies: {
none: number; none: number;

View File

@ -26,6 +26,7 @@ class ConnectionPool {
this.randomizeHost = opts.randomizeHost === true this.randomizeHost = opts.randomizeHost === true
this.nodeFilter = opts.nodeFilter || defaultNodeFilter this.nodeFilter = opts.nodeFilter || defaultNodeFilter
this.Connection = opts.Connection this.Connection = opts.Connection
this.emit = opts.emit || noop
if (typeof opts.nodeSelector === 'function') { if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector this.nodeSelector = opts.nodeSelector
@ -107,6 +108,7 @@ class ConnectionPool {
*/ */
resurrect (now = Date.now(), callback = noop) { resurrect (now = Date.now(), callback = noop) {
if (this.resurrectStrategy === 0 || this.dead.length === 0) { if (this.resurrectStrategy === 0 || this.dead.length === 0) {
debug('Nothing to resurrect')
callback(null, null) callback(null, null)
return return
} }
@ -116,6 +118,7 @@ class ConnectionPool {
const connection = this.connections.get(this.dead[0]) const connection = this.connections.get(this.dead[0])
if (now < connection.resurrectTimeout) { if (now < connection.resurrectTimeout) {
debug('Nothing to resurrect') debug('Nothing to resurrect')
callback(null, null)
return return
} }
@ -127,9 +130,11 @@ class ConnectionPool {
method: 'HEAD', method: 'HEAD',
path: '/', path: '/',
timeout: this.pingTimeout timeout: this.pingTimeout
}, (err, res) => { }, (err, response) => {
var isAlive = true var isAlive = true
if (err != null) { const statusCode = response !== null ? response.statusCode : 0
if (err != null ||
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
debug(`Resurrect: connection '${id}' is still dead`) debug(`Resurrect: connection '${id}' is still dead`)
this.markDead(connection) this.markDead(connection)
isAlive = false isAlive = false
@ -137,6 +142,7 @@ class ConnectionPool {
debug(`Resurrect: connection '${id}' is now alive`) debug(`Resurrect: connection '${id}' is now alive`)
this.markAlive(connection) this.markAlive(connection)
} }
this.emit('resurrect', null, { strategy: 'ping', isAlive, connection })
callback(isAlive, connection) callback(isAlive, connection)
}) })
// optimistic strategy // optimistic strategy
@ -144,6 +150,7 @@ class ConnectionPool {
debug(`Resurrect: optimistic resurrection for connection '${id}'`) debug(`Resurrect: optimistic resurrection for connection '${id}'`)
this.dead.splice(this.dead.indexOf(id), 1) this.dead.splice(this.dead.indexOf(id), 1)
connection.status = Connection.statuses.ALIVE connection.status = Connection.statuses.ALIVE
this.emit('resurrect', null, { strategy: 'optimistic', isAlive: true, connection })
// eslint-disable-next-line standard/no-callback-literal // eslint-disable-next-line standard/no-callback-literal
callback(true, connection) callback(true, connection)
} }