WIP: initial prototype

- Added sniff reason
- Improved types
This commit is contained in:
delvedor
2018-12-04 14:29:40 +01:00
parent bf4adf0546
commit b60a716e00
3 changed files with 80 additions and 41 deletions

15
index.d.ts vendored
View File

@ -2,11 +2,10 @@
import { EventEmitter } from 'events';
import { SecureContextOptions } from 'tls';
import Transport from './lib/Transport';
import Transport, { ApiResponse, EventMeta, SniffMeta } from './lib/Transport';
import Connection, { AgentOptions } from './lib/Connection';
import ConnectionPool, { nodeSelectorFn, nodeFilterFn } from './lib/ConnectionPool';
import Serializer from './lib/Serializer';
import { ApiResponse } from './lib/Transport';
declare type anyObject = {
[key: string]: any;
@ -471,4 +470,14 @@ declare const events: {
SNIFF: string;
};
export { Client, Transport, ConnectionPool, Connection, Serializer, events, ApiResponse };
export {
Client,
Transport,
ConnectionPool,
Connection,
Serializer,
events,
ApiResponse,
EventMeta,
SniffMeta
};

79
lib/Transport.d.ts vendored
View File

@ -6,42 +6,61 @@ declare type noopFn = (...args: any[]) => void;
declare type emitFn = (event: string | symbol, ...args: any[]) => boolean;
interface TransportOptions {
emit: emitFn & noopFn;
connectionPool: ConnectionPool;
serializer: Serializer;
maxRetries: number;
requestTimeout: number | string;
suggestCompression: boolean;
sniffInterval: number;
sniffOnConnectionFault: boolean;
sniffEndpoint: string;
sniffOnStart: boolean;
emit: emitFn & noopFn;
connectionPool: ConnectionPool;
serializer: Serializer;
maxRetries: number;
requestTimeout: number | string;
suggestCompression: boolean;
sniffInterval: number;
sniffOnConnectionFault: boolean;
sniffEndpoint: string;
sniffOnStart: boolean;
}
export interface ApiResponse {
body: any;
statusCode: number | null;
headers: any;
warnings: any[] | null;
body: any;
statusCode: number | null;
headers: any;
warnings: any[] | null;
}
export interface EventMeta {
connection: Connection;
request: any;
response: ApiResponse;
attempts: number;
aborted: boolean;
}
export interface SniffMeta {
hosts: any[];
reason: string;
}
export default class Transport {
emit: emitFn & noopFn;
connectionPool: ConnectionPool;
serializer: Serializer;
maxRetries: number;
requestTimeout: number;
suggestCompression: boolean;
sniffInterval: number;
sniffOnConnectionFault: boolean;
sniffEndpoint: string;
_sniffEnabled: boolean;
_nextSniff: number;
_isSniffing: boolean;
constructor(opts: TransportOptions);
request(params: any, callback: (err: Error | null, result: ApiResponse) => void): any;
getConnection(): Connection | null;
sniff(callback?: (...args: any[]) => void): void;
static sniffReasons: {
SNIFF_ON_START: string;
SNIFF_INTERVAL: string;
SNIFF_ON_CONNECTION_FAULT: string;
DEFAULT: string;
};
emit: emitFn & noopFn;
connectionPool: ConnectionPool;
serializer: Serializer;
maxRetries: number;
requestTimeout: number;
suggestCompression: boolean;
sniffInterval: number;
sniffOnConnectionFault: boolean;
sniffEndpoint: string;
_sniffEnabled: boolean;
_nextSniff: number;
_isSniffing: boolean;
constructor(opts: TransportOptions);
request(params: any, callback: (err: Error | null, result: ApiResponse) => void): any;
getConnection(): Connection | null;
sniff(callback?: (...args: any[]) => void): void;
}
export {};

View File

@ -29,7 +29,7 @@ class Transport {
this._isSniffing = false
if (opts.sniffOnStart === true) {
this.sniff()
this.sniff(Transport.sniffReasons.SNIFF_ON_START)
}
}
@ -112,7 +112,7 @@ class Transport {
this.connectionPool.markDead(meta.connection)
if (this.sniffOnConnectionFault === true) {
this.sniff()
this.sniff(Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
}
// retry logic
@ -225,19 +225,22 @@ class Transport {
getConnection () {
const now = Date.now()
if (this._sniffEnabled === true && now > this._nextSniff) {
this.sniff()
this.sniff(Transport.sniffReasons.SNIFF_INTERVAL)
}
this.connectionPool.resurrect(now)
return this.connectionPool.getConnection()
}
// TODO: add sniff reason
// 'connection-fault', 'interval', 'start', ...
sniff (callback = noop) {
sniff (reason = Transport.sniffReasons.DEFAULT, callback = noop) {
if (this._isSniffing === true) return
this._isSniffing = true
debug('Started sniffing request')
if (typeof reason === 'function') {
callback = reason
reason = Transport.sniffReasons.DEFAULT
}
const request = {
method: 'GET',
path: this.sniffEndpoint
@ -251,7 +254,7 @@ class Transport {
if (err != null) {
debug('Sniffing errored', err)
this.emit('sniff', err, null)
this.emit('sniff', err, { hosts: [], reason })
return callback(err)
}
@ -259,12 +262,20 @@ class Transport {
const hosts = this.connectionPool.nodesToHost(result.body.nodes)
this.connectionPool.update(hosts)
this.emit('sniff', null, hosts)
this.emit('sniff', null, { hosts, reason })
callback(null, hosts)
})
}
}
Transport.sniffReasons = {
SNIFF_ON_START: 'sniff-on-start',
SNIFF_INTERVAL: 'sniff-interval',
SNIFF_ON_CONNECTION_FAULT: 'sniff-on-connection-fault',
// TODO: find a better name
DEFAULT: 'default'
}
function toMs (time) {
if (typeof time === 'string') {
return ms(time)