Improve observability (#834)
* API generation * Added correlation id support * Updated docs * Updated test * Updated code generation * API generation * Updated code generation * Added support for client name and custom context object * Updated docs * Updated test * Fix docs * Updated docs * Added id support also for sniffing * Updated test * Update docs/observability.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Update docs/observability.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Apply suggestions * Update docs/configuration.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Update docs/configuration.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Update docs/observability.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Update docs/observability.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Update docs/observability.asciidoc Co-Authored-By: delvedor <delvedor@users.noreply.github.com> * Apply suggestions * Updated README.md * Fixed test * Addressed suggestions
This commit is contained in:
committed by
GitHub
parent
1261e60d41
commit
269c0fc96a
@ -50,6 +50,8 @@ class Transport {
|
||||
this.sniffInterval = opts.sniffInterval
|
||||
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
|
||||
this.sniffEndpoint = opts.sniffEndpoint
|
||||
this.generateRequestId = opts.generateRequestId || generateRequestId()
|
||||
this.name = opts.name
|
||||
|
||||
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
|
||||
if (typeof opts.nodeSelector === 'function') {
|
||||
@ -67,7 +69,7 @@ class Transport {
|
||||
this._isSniffing = false
|
||||
|
||||
if (opts.sniffOnStart === true) {
|
||||
this.sniff(Transport.sniffReasons.SNIFF_ON_START)
|
||||
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
|
||||
}
|
||||
}
|
||||
|
||||
@ -89,10 +91,13 @@ class Transport {
|
||||
|
||||
callback = once(callback)
|
||||
const meta = {
|
||||
context: options.context || null,
|
||||
request: {
|
||||
params: null,
|
||||
options: null
|
||||
options: null,
|
||||
id: options.id || this.generateRequestId(params, options)
|
||||
},
|
||||
name: this.name,
|
||||
connection: null,
|
||||
attempts: 0,
|
||||
aborted: false
|
||||
@ -112,7 +117,7 @@ class Transport {
|
||||
|
||||
const makeRequest = () => {
|
||||
if (meta.aborted === true) return
|
||||
meta.connection = this.getConnection()
|
||||
meta.connection = this.getConnection({ requestId: meta.request.id })
|
||||
if (meta.connection === null) {
|
||||
return callback(new NoLivingConnectionsError('There are not living connections'), result)
|
||||
}
|
||||
@ -192,7 +197,10 @@ class Transport {
|
||||
this.connectionPool.markDead(meta.connection)
|
||||
|
||||
if (this.sniffOnConnectionFault === true) {
|
||||
this.sniff(Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
|
||||
this.sniff({
|
||||
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
|
||||
requestId: meta.request.id
|
||||
})
|
||||
}
|
||||
|
||||
// retry logic
|
||||
@ -312,34 +320,36 @@ class Transport {
|
||||
}
|
||||
}
|
||||
|
||||
getConnection () {
|
||||
getConnection (opts) {
|
||||
const now = Date.now()
|
||||
if (this._sniffEnabled === true && now > this._nextSniff) {
|
||||
this.sniff(Transport.sniffReasons.SNIFF_INTERVAL)
|
||||
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
|
||||
}
|
||||
this.connectionPool.resurrect(now)
|
||||
this.connectionPool.resurrect({ now, requestId: opts.requestId, name: this.name })
|
||||
return this.connectionPool.getConnection({
|
||||
filter: this.nodeFilter,
|
||||
selector: this.nodeSelector
|
||||
})
|
||||
}
|
||||
|
||||
sniff (reason = Transport.sniffReasons.DEFAULT, callback = noop) {
|
||||
sniff (opts, callback = noop) {
|
||||
if (this._isSniffing === true) return
|
||||
this._isSniffing = true
|
||||
debug('Started sniffing request')
|
||||
|
||||
if (typeof reason === 'function') {
|
||||
callback = reason
|
||||
reason = Transport.sniffReasons.DEFAULT
|
||||
if (typeof opts === 'function') {
|
||||
callback = opts
|
||||
opts = { reason: Transport.sniffReasons.DEFAULT }
|
||||
}
|
||||
|
||||
const { reason } = opts
|
||||
|
||||
const request = {
|
||||
method: 'GET',
|
||||
path: this.sniffEndpoint
|
||||
}
|
||||
|
||||
this.request(request, (err, result) => {
|
||||
this.request(request, { id: opts.requestId }, (err, result) => {
|
||||
this._isSniffing = false
|
||||
if (this._sniffEnabled === true) {
|
||||
this._nextSniff = Date.now() + this.sniffInterval
|
||||
@ -414,5 +424,12 @@ function randomSelector (connections) {
|
||||
return connections[index]
|
||||
}
|
||||
|
||||
function generateRequestId () {
|
||||
var maxInt = 2147483647
|
||||
var nextReqId = 0
|
||||
return function genReqId (params, options) {
|
||||
return (nextReqId = (nextReqId + 1) & maxInt)
|
||||
}
|
||||
}
|
||||
module.exports = Transport
|
||||
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector }
|
||||
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector, generateRequestId }
|
||||
|
||||
Reference in New Issue
Block a user