WIP: initial prototype

- Added error parameter to request and response event
- Dropped error event
- Updated typescript indentation
This commit is contained in:
delvedor
2018-12-05 22:16:16 +01:00
parent 7e35f6a903
commit aa5977b153
8 changed files with 202 additions and 204 deletions

9
index.d.ts vendored
View File

@ -8,7 +8,7 @@ import ConnectionPool, { nodeSelectorFn, nodeFilterFn } from './lib/ConnectionPo
import Serializer from './lib/Serializer'; import Serializer from './lib/Serializer';
declare type anyObject = { declare type anyObject = {
[key: string]: any; [key: string]: any;
}; };
declare type callbackFn = (err: Error | null, result: ApiResponse) => void; declare type callbackFn = (err: Error | null, result: ApiResponse) => void;
declare type apiMethod = (params?: anyObject | callbackFn, callback?: callbackFn) => any; declare type apiMethod = (params?: anyObject | callbackFn, callback?: callbackFn) => any;
@ -464,10 +464,9 @@ declare class Client extends EventEmitter {
} }
declare const events: { declare const events: {
RESPONSE: string; RESPONSE: string;
REQUEST: string; REQUEST: string;
ERROR: string; SNIFF: string;
SNIFF: string;
}; };
export { export {

View File

@ -23,12 +23,6 @@ class Client extends EventEmitter {
this.on('error', console.log) this.on('error', console.log)
} }
// The logging is exposed via events, which the user can
// listen to and log the message its preferred way
// we add a fake listener to the error event to avoid
// the "unhandled error event" error.
this.on('error', () => {})
const options = Object.assign({}, { const options = Object.assign({}, {
Connection, Connection,
ConnectionPool, ConnectionPool,
@ -95,7 +89,6 @@ class Client extends EventEmitter {
const events = { const events = {
RESPONSE: 'response', RESPONSE: 'response',
REQUEST: 'request', REQUEST: 'request',
ERROR: 'error',
SNIFF: 'sniff' SNIFF: 'sniff'
} }

80
lib/Connection.d.ts vendored
View File

@ -5,52 +5,52 @@ import * as http from 'http';
import { SecureContextOptions } from 'tls'; import { SecureContextOptions } from 'tls';
interface ConnectionOptions { interface ConnectionOptions {
url: URL; url: URL;
ssl?: SecureContextOptions; ssl?: SecureContextOptions;
id?: string; id?: string;
headers?: any; headers?: any;
agent?: AgentOptions; agent?: AgentOptions;
status?: string; status?: string;
roles?: any; roles?: any;
} }
export interface AgentOptions { export interface AgentOptions {
keepAlive: boolean; keepAlive: boolean;
keepAliveMsecs: number; keepAliveMsecs: number;
maxSockets: number; maxSockets: number;
maxFreeSockets: number; maxFreeSockets: number;
} }
export default class Connection { export default class Connection {
static statuses: { static statuses: {
ALIVE: string; ALIVE: string;
DEAD: string; DEAD: string;
}; };
static roles: { static roles: {
MASTER: string; MASTER: string;
DATA: string; DATA: string;
INGEST: string; INGEST: string;
COORDINATING: string; COORDINATING: string;
MACHINE_LEARNING: string; MACHINE_LEARNING: string;
}; };
url: URL; url: URL;
ssl: SecureContextOptions | null; ssl: SecureContextOptions | null;
id: string; id: string;
headers: any; headers: any;
deadCount: number; deadCount: number;
resurrectTimeout: number; resurrectTimeout: number;
statuses: any; statuses: any;
roles: any; roles: any;
makeRequest: any; makeRequest: any;
_openRequests: number; _openRequests: number;
_status: string; _status: string;
_agent: http.Agent; _agent: http.Agent;
constructor(opts?: ConnectionOptions); constructor(opts?: ConnectionOptions);
request(params: any, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest; request(params: any, callback: (err: Error | null, response: http.IncomingMessage | null) => void): http.ClientRequest;
close(): Connection; close(): Connection;
setRole(role: string, enabled: boolean): Connection; setRole(role: string, enabled: boolean): Connection;
status: string; status: string;
buildRequestObject(params: any): http.ClientRequestArgs; buildRequestObject(params: any): http.ClientRequestArgs;
} }
export {}; export {};

View File

@ -4,124 +4,124 @@ import { SecureContextOptions } from 'tls';
import Connection, { AgentOptions } from './Connection'; import Connection, { AgentOptions } from './Connection';
export interface nodeSelectorFn { export interface nodeSelectorFn {
(connections: Connection[]): Connection; (connections: Connection[]): Connection;
} }
export interface nodeFilterFn { export interface nodeFilterFn {
(connection: Connection): boolean; (connection: Connection): boolean;
} }
interface ConnectionPoolOptions { interface ConnectionPoolOptions {
ssl?: SecureContextOptions; ssl?: SecureContextOptions;
agent?: AgentOptions; agent?: AgentOptions;
pingTimeout?: number; pingTimeout?: number;
randomizeHost?: boolean; randomizeHost?: boolean;
Connection: typeof Connection; Connection: typeof Connection;
resurrectStrategy?: string; resurrectStrategy?: string;
nodeFilter?: nodeFilterFn; nodeFilter?: nodeFilterFn;
nodeSelector?: string | nodeSelectorFn; nodeSelector?: string | nodeSelectorFn;
} }
export interface getConnectionOptions { export interface getConnectionOptions {
filter?: nodeFilterFn; filter?: nodeFilterFn;
selector?: nodeSelectorFn; selector?: nodeSelectorFn;
} }
export default class ConnectionPool { export default class ConnectionPool {
static resurrectStrategies: { static resurrectStrategies: {
none: number; none: number;
ping: number; ping: number;
optimistic: number; optimistic: number;
}; };
connections: any; connections: any;
dead: string[]; dead: string[];
_ssl: SecureContextOptions | null; _ssl: SecureContextOptions | null;
_agent: AgentOptions | null; _agent: AgentOptions | null;
resurrectTimeout: number; resurrectTimeout: number;
resurrectTimeoutCutoff: number; resurrectTimeoutCutoff: number;
pingTimeout: number; pingTimeout: number;
randomizeHost: boolean; randomizeHost: boolean;
nodeFilter: nodeFilterFn; nodeFilter: nodeFilterFn;
nodeSelector: nodeSelectorFn; nodeSelector: nodeSelectorFn;
Connection: typeof Connection; Connection: typeof Connection;
resurrectStrategy: number; resurrectStrategy: number;
constructor(opts?: ConnectionPoolOptions); constructor(opts?: ConnectionPoolOptions);
/** /**
* Marks a connection as 'alive'. * Marks a connection as 'alive'.
* If needed removes the connection from the dead list * If needed removes the connection from the dead list
* and then resets the `deadCount`. * and then resets the `deadCount`.
* *
* @param {object} connection * @param {object} connection
*/ */
markAlive(connection: Connection): void; markAlive(connection: Connection): void;
/** /**
* Marks a connection as 'dead'. * Marks a connection as 'dead'.
* If needed adds the connection to the dead list * If needed adds the connection to the dead list
* and then increments the `deadCount`. * and then increments the `deadCount`.
* *
* @param {object} connection * @param {object} connection
*/ */
markDead(connection: Connection): void; markDead(connection: Connection): void;
/** /**
* If enabled, tries to resurrect a connection with the given * If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none'). * resurrect strategy ('ping', 'optimistic', 'none').
* *
* @param {number} epoch * @param {number} epoch
* @param {function} callback (isAlive, connection) * @param {function} callback (isAlive, connection)
*/ */
resurrect(now?: number, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void; resurrect(now?: number, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
/** /**
* Returns an alive connection if present, * Returns an alive connection if present,
* otherwise returns null. * otherwise returns null.
* By default it filters the `master` only nodes. * By default it filters the `master` only nodes.
* It uses the selector to choose which * It uses the selector to choose which
* connection return. * connection return.
* *
* @param {object} options (filter and selector) * @param {object} options (filter and selector)
* @returns {object|null} connection * @returns {object|null} connection
*/ */
getConnection(opts?: getConnectionOptions): Connection | null; getConnection(opts?: getConnectionOptions): Connection | null;
/** /**
* Adds a new connection to the pool. * Adds a new connection to the pool.
* *
* @param {object|string} host * @param {object|string} host
* @returns {ConnectionPool} * @returns {ConnectionPool}
*/ */
addConnection(opts: any): Connection | void; addConnection(opts: any): Connection | void;
/** /**
* Removes a new connection to the pool. * Removes a new connection to the pool.
* *
* @param {object} connection * @param {object} connection
* @returns {ConnectionPool} * @returns {ConnectionPool}
*/ */
removeConnection(connection: Connection): ConnectionPool; removeConnection(connection: Connection): ConnectionPool;
/** /**
* Empties the connection pool. * Empties the connection pool.
* *
* @returns {ConnectionPool} * @returns {ConnectionPool}
*/ */
empty(): ConnectionPool; empty(): ConnectionPool;
/** /**
* Update the ConnectionPool with new connections. * Update the ConnectionPool with new connections.
* *
* @param {array} array of connections * @param {array} array of connections
* @returns {ConnectionPool} * @returns {ConnectionPool}
*/ */
update(connections: Connection[]): ConnectionPool; update(connections: Connection[]): ConnectionPool;
/** /**
* Transforms the nodes objects to a host object. * Transforms the nodes objects to a host object.
* *
* @param {object} nodes * @param {object} nodes
* @returns {array} hosts * @returns {array} hosts
*/ */
nodesToHost(nodes: any): any[]; nodesToHost(nodes: any): any[];
/** /**
* Transforms an url string to a host object * Transforms an url string to a host object
* *
* @param {string} url * @param {string} url
* @returns {object} host * @returns {object} host
*/ */
urlToHost(url: string): any; urlToHost(url: string): any;
} }
declare function defaultNodeFilter(node: Connection): boolean; declare function defaultNodeFilter(node: Connection): boolean;
@ -129,9 +129,9 @@ declare function roundRobinSelector(): (connections: Connection[]) => Connection
declare function randomSelector(connections: Connection[]): Connection; declare function randomSelector(connections: Connection[]): Connection;
export declare const internals: { export declare const internals: {
defaultNodeFilter: typeof defaultNodeFilter; defaultNodeFilter: typeof defaultNodeFilter;
roundRobinSelector: typeof roundRobinSelector; roundRobinSelector: typeof roundRobinSelector;
randomSelector: typeof randomSelector; randomSelector: typeof randomSelector;
}; };
export {}; export {};

8
lib/Serializer.d.ts vendored
View File

@ -1,6 +1,6 @@
export default class Serializer { export default class Serializer {
serialize(object: any): string; serialize(object: any): string;
deserialize(json: string): any; deserialize(json: string): any;
ndserialize(array: any[]): string; ndserialize(array: any[]): string;
qserialize(object: any): string; qserialize(object: any): string;
} }

8
lib/Transport.d.ts vendored
View File

@ -40,10 +40,10 @@ export interface SniffMeta {
export default class Transport { export default class Transport {
static sniffReasons: { static sniffReasons: {
SNIFF_ON_START: string; SNIFF_ON_START: string;
SNIFF_INTERVAL: string; SNIFF_INTERVAL: string;
SNIFF_ON_CONNECTION_FAULT: string; SNIFF_ON_CONNECTION_FAULT: string;
DEFAULT: string; DEFAULT: string;
}; };
emit: emitFn & noopFn; emit: emitFn & noopFn;
connectionPool: ConnectionPool; connectionPool: ConnectionPool;

View File

@ -99,7 +99,7 @@ class Transport {
params.timeout = toMs(params.requestTimeout || this.requestTimeout) params.timeout = toMs(params.requestTimeout || this.requestTimeout)
meta.request = params meta.request = params
this.emit('request', meta) this.emit('request', null, meta)
// perform the actual http request // perform the actual http request
return meta.connection.request(params, onResponse) return meta.connection.request(params, onResponse)
@ -127,7 +127,7 @@ class Transport {
? err ? err
: new ConnectionError(err.message, params) : new ConnectionError(err.message, params)
this.emit('error', error, meta) this.emit('response', error, meta)
return callback(error, result) return callback(error, result)
} }
@ -142,7 +142,7 @@ class Transport {
if (params.asStream === true) { if (params.asStream === true) {
result.body = response result.body = response
meta.response = result meta.response = result
this.emit('response', meta) this.emit('response', null, meta)
callback(null, result) callback(null, result)
return return
} }
@ -151,7 +151,11 @@ class Transport {
// collect the payload // collect the payload
response.setEncoding('utf8') response.setEncoding('utf8')
response.on('data', chunk => { payload += chunk }) response.on('data', chunk => { payload += chunk })
response.on('error', err => callback(new ConnectionError(err.message, params), result)) response.on('error', err => {
const error = new ConnectionError(err.message, params)
this.emit('response', error, meta)
callback(error, result)
})
response.on('end', () => { response.on('end', () => {
const isHead = params.method === 'HEAD' const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if: // we should attempt the payload deserialization only if:
@ -166,7 +170,7 @@ class Transport {
try { try {
result.body = this.serializer.deserialize(payload) result.body = this.serializer.deserialize(payload)
} catch (err) { } catch (err) {
this.emit('error', err, meta) this.emit('response', err, meta)
return callback(err, result) return callback(err, result)
} }
} else { } else {
@ -198,14 +202,16 @@ class Transport {
} }
meta.response = result meta.response = result
this.emit('response', meta)
if (ignoreStatusCode === false && statusCode >= 400) { if (ignoreStatusCode === false && statusCode >= 400) {
callback(new ResponseError(result), result) const error = new ResponseError(result)
this.emit('response', error, meta)
callback(error, result)
} else { } else {
// cast to boolean if the request method was HEAD // cast to boolean if the request method was HEAD
if (isHead === true && statusCode === 404) { if (isHead === true && statusCode === 404) {
result.body = false result.body = false
} }
this.emit('response', null, meta)
callback(null, result) callback(null, result)
} }
}) })

56
lib/errors.d.ts vendored
View File

@ -1,48 +1,48 @@
export declare class TimeoutError extends Error { export declare class TimeoutError extends Error {
name: string; name: string;
message: string; message: string;
request: any; request: any;
constructor(message: string, request: any); constructor(message: string, request: any);
} }
export declare class ConnectionError extends Error { export declare class ConnectionError extends Error {
name: string; name: string;
message: string; message: string;
request: any; request: any;
constructor(message: string, request: any); constructor(message: string, request: any);
} }
export declare class NoLivingConnectionsError extends Error { export declare class NoLivingConnectionsError extends Error {
name: string; name: string;
message: string; message: string;
constructor(message: string); constructor(message: string);
} }
export declare class SerializationError extends Error { export declare class SerializationError extends Error {
name: string; name: string;
message: string; message: string;
constructor(message: string); constructor(message: string);
} }
export declare class DeserializationError extends Error { export declare class DeserializationError extends Error {
name: string; name: string;
message: string; message: string;
constructor(message: string); constructor(message: string);
} }
export declare class ConfigurationError extends Error { export declare class ConfigurationError extends Error {
name: string; name: string;
message: string; message: string;
constructor(message: string); constructor(message: string);
} }
export declare class ResponseError extends Error { export declare class ResponseError extends Error {
name: string; name: string;
message: string; message: string;
body: any; body: any;
statusCode: number; statusCode: number;
headers: any; headers: any;
constructor({ body, statusCode, headers }: { constructor({ body, statusCode, headers }: {
[key: string]: any; [key: string]: any;
}); });
} }