Improve observability (#834)

* API generation

* Added correlation id support

* Updated docs

* Updated test

* Updated code generation

* API generation

* Updated code generation

* Added support for client name and custom context object

* Updated docs

* Updated test

* Fix docs

* Updated docs

* Added id support also for sniffing

* Updated test

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Apply suggestions

* Update docs/configuration.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/configuration.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Apply suggestions

* Updated README.md

* Fixed test

* Addressed suggestions
This commit is contained in:
Tomas Della Vedova
2019-05-03 17:23:40 +02:00
committed by delvedor
parent c47725d401
commit ed9db61d1f
279 changed files with 18696 additions and 142 deletions

View File

@ -36,10 +36,20 @@ export interface getConnectionOptions {
selector?: nodeSelectorFn;
}
export interface resurrectOptions {
now?: number;
requestId: string;
name: string;
}
export interface ResurrectEvent {
strategy: string;
isAlive: boolean;
connection: Connection;
name: string;
request: {
id: any;
};
}
export default class ConnectionPool {
@ -79,10 +89,10 @@ export default class ConnectionPool {
* If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none').
*
* @param {number} epoch
* @param {object} { now, requestId, name }
* @param {function} callback (isAlive, connection)
*/
resurrect(now?: number, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
/**
* Returns an alive connection if present,
* otherwise returns null.

View File

@ -108,10 +108,10 @@ class ConnectionPool {
* If enabled, tries to resurrect a connection with the given
* resurrect strategy ('ping', 'optimistic', 'none').
*
* @param {number} epoch
* @param {object} { now, requestId }
* @param {function} callback (isAlive, connection)
*/
resurrect (now = Date.now(), callback = noop) {
resurrect (opts, callback = noop) {
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
debug('Nothing to resurrect')
callback(null, null)
@ -121,7 +121,7 @@ class ConnectionPool {
// the dead list is sorted in ascending order based on the timeout
// so the first element will always be the one with the smaller timeout
const connection = this.connections.get(this.dead[0])
if (now < connection.resurrectTimeout) {
if ((opts.now || Date.now()) < connection.resurrectTimeout) {
debug('Nothing to resurrect')
callback(null, null)
return
@ -147,7 +147,13 @@ class ConnectionPool {
debug(`Resurrect: connection '${id}' is now alive`)
this.markAlive(connection)
}
this.emit('resurrect', null, { strategy: 'ping', isAlive, connection })
this.emit('resurrect', null, {
strategy: 'ping',
name: opts.name,
request: { id: opts.requestId },
isAlive,
connection
})
callback(isAlive, connection)
})
// optimistic strategy
@ -155,7 +161,13 @@ class ConnectionPool {
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
this.dead.splice(this.dead.indexOf(id), 1)
connection.status = Connection.statuses.ALIVE
this.emit('resurrect', null, { strategy: 'optimistic', isAlive: true, connection })
this.emit('resurrect', null, {
strategy: 'optimistic',
name: opts.name,
request: { id: opts.requestId },
isAlive: true,
connection
})
// eslint-disable-next-line standard/no-callback-literal
callback(true, connection)
}

28
lib/Transport.d.ts vendored
View File

@ -29,6 +29,10 @@ export interface nodeFilterFn {
(connection: Connection): boolean;
}
export interface generateRequestIdFn {
(params: TransportRequestParams, options: TransportRequestOptions): any;
}
declare type noopFn = (...args: any[]) => void;
declare type emitFn = (event: string | symbol, ...args: any[]) => boolean;
@ -47,17 +51,22 @@ interface TransportOptions {
nodeFilter?: nodeFilterFn;
nodeSelector?: string | nodeSelectorFn;
headers?: anyObject;
generateRequestId?: generateRequestIdFn;
name: string;
}
export interface RequestEvent<T = any> {
export interface RequestEvent<T = any, C = any> {
body: T;
statusCode: number | null;
headers: anyObject | null;
warnings: string[] | null;
meta: {
context: C;
name: string;
request: {
params: TransportRequestParams;
options: TransportRequestOptions;
id: any;
};
connection: Connection;
attempts: number;
@ -71,7 +80,7 @@ export interface RequestEvent<T = any> {
// ApiResponse and RequestEvent are the same thing
// we are doing this for have more clear names
export interface ApiResponse<T = any> extends RequestEvent<T> {}
export interface ApiResponse<T = any, C = any> extends RequestEvent<T, C> {}
declare type anyObject = {
[key: string]: any;
@ -93,6 +102,8 @@ export interface TransportRequestOptions {
headers?: anyObject;
querystring?: anyObject;
compression?: string;
id?: any;
context?: any;
warnings?: [string];
}
@ -100,6 +111,15 @@ export interface TransportRequestCallback {
abort: () => void;
}
export interface TransportGetConnectionOptions {
requestId: string;
}
export interface TransportSniffOptions {
reason: string;
requestId?: string;
}
export default class Transport {
static sniffReasons: {
SNIFF_ON_START: string;
@ -123,8 +143,8 @@ export default class Transport {
constructor(opts: TransportOptions);
request(params: TransportRequestParams, options?: TransportRequestOptions): Promise<ApiResponse>;
request(params: TransportRequestParams, options?: TransportRequestOptions, callback?: (err: Error | null, result: ApiResponse) => void): TransportRequestCallback;
getConnection(): Connection | null;
sniff(callback?: (...args: any[]) => void): void;
getConnection(opts: TransportGetConnectionOptions): Connection | null;
sniff(opts?: TransportSniffOptions, callback?: (...args: any[]) => void): void;
}
export {};

View File

@ -50,6 +50,8 @@ class Transport {
this.sniffInterval = opts.sniffInterval
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
this.sniffEndpoint = opts.sniffEndpoint
this.generateRequestId = opts.generateRequestId || generateRequestId()
this.name = opts.name
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
if (typeof opts.nodeSelector === 'function') {
@ -67,7 +69,7 @@ class Transport {
this._isSniffing = false
if (opts.sniffOnStart === true) {
this.sniff(Transport.sniffReasons.SNIFF_ON_START)
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
}
}
@ -89,10 +91,13 @@ class Transport {
callback = once(callback)
const meta = {
context: options.context || null,
request: {
params: null,
options: null
options: null,
id: options.id || this.generateRequestId(params, options)
},
name: this.name,
connection: null,
attempts: 0,
aborted: false
@ -112,7 +117,7 @@ class Transport {
const makeRequest = () => {
if (meta.aborted === true) return
meta.connection = this.getConnection()
meta.connection = this.getConnection({ requestId: meta.request.id })
if (meta.connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'), result)
}
@ -192,7 +197,10 @@ class Transport {
this.connectionPool.markDead(meta.connection)
if (this.sniffOnConnectionFault === true) {
this.sniff(Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
}
// retry logic
@ -312,34 +320,36 @@ class Transport {
}
}
getConnection () {
getConnection (opts) {
const now = Date.now()
if (this._sniffEnabled === true && now > this._nextSniff) {
this.sniff(Transport.sniffReasons.SNIFF_INTERVAL)
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
}
this.connectionPool.resurrect(now)
this.connectionPool.resurrect({ now, requestId: opts.requestId, name: this.name })
return this.connectionPool.getConnection({
filter: this.nodeFilter,
selector: this.nodeSelector
})
}
sniff (reason = Transport.sniffReasons.DEFAULT, callback = noop) {
sniff (opts, callback = noop) {
if (this._isSniffing === true) return
this._isSniffing = true
debug('Started sniffing request')
if (typeof reason === 'function') {
callback = reason
reason = Transport.sniffReasons.DEFAULT
if (typeof opts === 'function') {
callback = opts
opts = { reason: Transport.sniffReasons.DEFAULT }
}
const { reason } = opts
const request = {
method: 'GET',
path: this.sniffEndpoint
}
this.request(request, (err, result) => {
this.request(request, { id: opts.requestId }, (err, result) => {
this._isSniffing = false
if (this._sniffEnabled === true) {
this._nextSniff = Date.now() + this.sniffInterval
@ -414,5 +424,12 @@ function randomSelector (connections) {
return connections[index]
}
function generateRequestId () {
var maxInt = 2147483647
var nextReqId = 0
return function genReqId (params, options) {
return (nextReqId = (nextReqId + 1) & maxInt)
}
}
module.exports = Transport
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector }
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector, generateRequestId }