Added new observability events (#1365)

This commit is contained in:
Tomas Della Vedova
2020-12-09 11:43:46 +01:00
committed by GitHub
parent 9fea1aedc0
commit 33035901cf
6 changed files with 522 additions and 4 deletions

View File

@ -49,6 +49,15 @@ client.on('response', (err, result) => {
The client emits the following events:
[cols=2*]
|===
|`serialization`
a|Emitted before starting serialization and compression. If you want to measure this phase duration, you should measure the time elapsed between this event and `request`.
[source,js]
----
client.on('serialization', (err, result) => {
console.log(err, result)
})
----
|`request`
a|Emitted before sending the actual request to {es} _(emitted multiple times in case of retries)_.
[source,js]
@ -58,6 +67,15 @@ client.on('request', (err, result) => {
})
----
|`deserialization`
a|Emitted before starting deserialization and decompression. If you want to measure this phase duration, you should measure the time elapsed between this event and `response`. _(This event might not be emitted in certain situations)_.
[source,js]
----
client.on('deserialization', (err, result) => {
console.log(err, result)
})
----
|`response`
a|Emitted once {es} response has been received and parsed.
[source,js]
@ -87,7 +105,7 @@ client.on('resurrect', (err, result) => {
|===
The values of `result` in `request`, `response` and `sniff` will be:
The values of `result` in `serialization`, `request`, `deserialization`, `response` and `sniff` will be:
[source,ts]
----
@ -127,6 +145,29 @@ request: {
};
----
[discrete]
==== Events order
The event order is described in the following graph, in some edge cases, the order is not guaranteed.
You can find in https://github.com/elastic/elasticsearch-js/blob/master/test/acceptance/events-order.test.js[`test/acceptance/events-order.test.js`] how the order changes based on the situation.
[source]
----
serialization
│ (serialization and compression happens between those two events)
└─▶ request
│ (actual time spent over the wire)
└─▶ deserialization
│ (deserialization and decompression happens between those two events)
└─▶ response
----
[discrete]
=== Correlation id

4
index.d.ts vendored
View File

@ -2573,8 +2573,10 @@ declare class Client {
}
declare const events: {
RESPONSE: string;
SERIALIZATION: string;
REQUEST: string;
DESERIALIZATION: string;
RESPONSE: string;
SNIFF: string;
RESURRECT: string;
};

View File

@ -314,7 +314,9 @@ const events = {
RESPONSE: 'response',
REQUEST: 'request',
SNIFF: 'sniff',
RESURRECT: 'resurrect'
RESURRECT: 'resurrect',
SERIALIZATION: 'serialization',
DESERIALIZATION: 'deserialization'
}
module.exports = {

View File

@ -268,6 +268,8 @@ class Transport {
if (!isCompressed) {
response.setEncoding('utf8')
}
this.emit('deserialization', null, result)
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
@ -340,6 +342,7 @@ class Transport {
}
}
this.emit('serialization', null, result)
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
if (options.opaqueId !== undefined) {
@ -354,6 +357,7 @@ class Transport {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
@ -369,6 +373,7 @@ class Transport {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
@ -408,6 +413,7 @@ class Transport {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
this.emit('request', err, result)
return callback(err, result)
}
params.headers['content-encoding'] = compression

View File

@ -0,0 +1,462 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict'
const { test } = require('tap')
const intoStream = require('into-stream')
const { Client, Connection, events } = require('../../index')
const {
TimeoutError,
ConnectionError,
ResponseError,
RequestAbortedError,
SerializationError,
DeserializationError
} = require('../../lib/errors')
const {
buildServer,
connection: {
MockConnection,
MockConnectionError,
MockConnectionTimeout,
buildMockConnection
}
} = require('../utils')
test('No errors', t => {
t.plan(10)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.DESERIALIZATION)
})
client.on(events.RESPONSE, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.error(err)
t.strictEqual(order.length, 0)
})
})
test('Connection error', t => {
t.plan(10)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnectionError,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.REQUEST,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (_err, request) => {
t.fail('Should not be called')
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof ConnectionError)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.ok(err instanceof ConnectionError)
t.strictEqual(order.length, 0)
})
})
test('TimeoutError error', t => {
t.plan(10)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnectionTimeout,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.REQUEST,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (_err, request) => {
t.fail('Should not be called')
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof TimeoutError)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.ok(err instanceof TimeoutError)
t.strictEqual(order.length, 0)
})
})
test('RequestAbortedError error', t => {
t.plan(8)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnectionTimeout,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (_err, request) => {
t.fail('Should not be called')
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof RequestAbortedError)
t.strictEqual(order.shift(), events.RESPONSE)
})
const request = client.info((err, result) => {
t.ok(err instanceof RequestAbortedError)
t.strictEqual(order.length, 0)
})
request.abort()
})
test('ResponseError error (no retry)', t => {
t.plan(10)
const MockConnection = buildMockConnection({
onRequest (params) {
return {
statusCode: 400,
body: { hello: 'world' }
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.DESERIALIZATION)
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof ResponseError)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.ok(err instanceof ResponseError)
t.strictEqual(order.length, 0)
})
})
test('ResponseError error (with retry)', t => {
t.plan(14)
const MockConnection = buildMockConnection({
onRequest (params) {
return {
statusCode: 504,
body: { hello: 'world' }
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.DESERIALIZATION)
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof ResponseError)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.ok(err instanceof ResponseError)
t.strictEqual(order.length, 0)
})
})
test('Serialization Error', t => {
t.plan(6)
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.ok(err instanceof SerializationError)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (_err, request) => {
t.fail('Should not be called')
})
client.on(events.RESPONSE, (_err, request) => {
t.fail('Should not be called')
})
const body = {}
body.o = body
client.index({ index: 'test', body }, (err, result) => {
t.ok(err instanceof SerializationError)
t.strictEqual(order.length, 0)
})
})
test('Deserialization Error', t => {
t.plan(10)
class MockConnection extends Connection {
request (params, callback) {
const body = '{"hello":"wor'
const stream = intoStream(body)
stream.statusCode = 200
stream.headers = {
'content-type': 'application/json;utf=8',
'content-length': body.length,
connection: 'keep-alive',
date: new Date().toISOString()
}
stream.on('close', () => t.pass('Stream destroyed'))
process.nextTick(callback, null, stream)
return { abort () {} }
}
}
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection,
maxRetries: 1
})
const order = [
events.SERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.DESERIALIZATION)
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof DeserializationError)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.ok(err instanceof DeserializationError)
t.strictEqual(order.length, 0)
})
})
test('Socket destroyed while reading the body', t => {
t.plan(14)
function handler (req, res) {
const body = JSON.stringify({ hello: 'world' })
res.setHeader('Content-Type', 'application/json;utf=8')
res.setHeader('Content-Length', body.length + '')
res.write(body.slice(0, -5))
setTimeout(() => {
res.socket.destroy()
}, 500)
}
buildServer(handler, ({ port }, server) => {
const client = new Client({ node: `http://localhost:${port}`, maxRetries: 1 })
const order = [
events.SERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.REQUEST,
events.DESERIALIZATION,
events.RESPONSE
]
client.on(events.SERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.SERIALIZATION)
})
client.on(events.REQUEST, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.REQUEST)
})
client.on(events.DESERIALIZATION, (err, request) => {
t.error(err)
t.strictEqual(order.shift(), events.DESERIALIZATION)
})
client.on(events.RESPONSE, (err, request) => {
t.ok(err instanceof ConnectionError)
t.strictEqual(order.shift(), events.RESPONSE)
})
client.info((err, result) => {
t.ok(err instanceof ConnectionError)
t.strictEqual(order.length, 0)
server.stop()
})
})
})

View File

@ -23,7 +23,12 @@ const { test } = require('tap')
const semver = require('semver')
const { Client, events } = require('../../index')
const { TimeoutError } = require('../../lib/errors')
const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils')
const {
connection: {
MockConnection,
MockConnectionTimeout
}
} = require('../utils')
test('Should emit a request event when a request is performed', t => {
t.plan(3)