diff --git a/docs/observability.asciidoc b/docs/observability.asciidoc index 26a3be83a..2d4a36668 100644 --- a/docs/observability.asciidoc +++ b/docs/observability.asciidoc @@ -49,6 +49,15 @@ client.on('response', (err, result) => { The client emits the following events: [cols=2*] |=== +|`serialization` +a|Emitted before starting serialization and compression. If you want to measure this phase duration, you should measure the time elapsed between this event and `request`. +[source,js] +---- +client.on('serialization', (err, result) => { + console.log(err, result) +}) +---- + |`request` a|Emitted before sending the actual request to {es} _(emitted multiple times in case of retries)_. [source,js] @@ -58,6 +67,15 @@ client.on('request', (err, result) => { }) ---- +|`deserialization` +a|Emitted before starting deserialization and decompression. If you want to measure this phase duration, you should measure the time elapsed between this event and `response`. _(This event might not be emitted in certain situations)_. +[source,js] +---- +client.on('deserialization', (err, result) => { + console.log(err, result) +}) +---- + |`response` a|Emitted once {es} response has been received and parsed. [source,js] @@ -87,7 +105,7 @@ client.on('resurrect', (err, result) => { |=== -The values of `result` in `request`, `response` and `sniff` will be: +The values of `result` in `serialization`, `request`, `deserialization`, `response` and `sniff` will be: [source,ts] ---- @@ -127,6 +145,29 @@ request: { }; ---- +[discrete] +==== Events order + +The event order is described in the following graph, in some edge cases, the order is not guaranteed. +You can find in https://github.com/elastic/elasticsearch-js/blob/master/test/acceptance/events-order.test.js[`test/acceptance/events-order.test.js`] how the order changes based on the situation. + +[source] +---- +serialization + │ + │ (serialization and compression happens between those two events) + │ + └─▶ request + │ + │ (actual time spent over the wire) + │ + └─▶ deserialization + │ + │ (deserialization and decompression happens between those two events) + │ + └─▶ response +---- + [discrete] === Correlation id diff --git a/index.d.ts b/index.d.ts index 47155a004..4f309d6aa 100644 --- a/index.d.ts +++ b/index.d.ts @@ -2573,8 +2573,10 @@ declare class Client { } declare const events: { - RESPONSE: string; + SERIALIZATION: string; REQUEST: string; + DESERIALIZATION: string; + RESPONSE: string; SNIFF: string; RESURRECT: string; }; diff --git a/index.js b/index.js index c3c278efc..e4d1ae704 100644 --- a/index.js +++ b/index.js @@ -314,7 +314,9 @@ const events = { RESPONSE: 'response', REQUEST: 'request', SNIFF: 'sniff', - RESURRECT: 'resurrect' + RESURRECT: 'resurrect', + SERIALIZATION: 'serialization', + DESERIALIZATION: 'deserialization' } module.exports = { diff --git a/lib/Transport.js b/lib/Transport.js index 6b0e17efc..50b75ccd5 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -268,6 +268,8 @@ class Transport { if (!isCompressed) { response.setEncoding('utf8') } + + this.emit('deserialization', null, result) response.on('data', onData) response.on('error', onEnd) response.on('end', onEnd) @@ -340,6 +342,7 @@ class Transport { } } + this.emit('serialization', null, result) const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers)) if (options.opaqueId !== undefined) { @@ -354,6 +357,7 @@ class Transport { try { params.body = this.serializer.serialize(params.body) } catch (err) { + this.emit('request', err, result) process.nextTick(callback, err, result) return transportReturn } @@ -369,6 +373,7 @@ class Transport { try { params.body = this.serializer.ndserialize(params.bulkBody) } catch (err) { + this.emit('request', err, result) process.nextTick(callback, err, result) return transportReturn } @@ -408,6 +413,7 @@ class Transport { gzip(params.body, (err, buffer) => { /* istanbul ignore next */ if (err) { + this.emit('request', err, result) return callback(err, result) } params.headers['content-encoding'] = compression diff --git a/test/acceptance/events-order.test.js b/test/acceptance/events-order.test.js new file mode 100644 index 000000000..b4a9ff6b3 --- /dev/null +++ b/test/acceptance/events-order.test.js @@ -0,0 +1,462 @@ +/* + * Licensed to Elasticsearch B.V. under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch B.V. licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +'use strict' + +const { test } = require('tap') +const intoStream = require('into-stream') +const { Client, Connection, events } = require('../../index') +const { + TimeoutError, + ConnectionError, + ResponseError, + RequestAbortedError, + SerializationError, + DeserializationError +} = require('../../lib/errors') +const { + buildServer, + connection: { + MockConnection, + MockConnectionError, + MockConnectionTimeout, + buildMockConnection + } +} = require('../utils') + +test('No errors', t => { + t.plan(10) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.DESERIALIZATION) + }) + + client.on(events.RESPONSE, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.error(err) + t.strictEqual(order.length, 0) + }) +}) + +test('Connection error', t => { + t.plan(10) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnectionError, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.REQUEST, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (_err, request) => { + t.fail('Should not be called') + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof ConnectionError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.ok(err instanceof ConnectionError) + t.strictEqual(order.length, 0) + }) +}) + +test('TimeoutError error', t => { + t.plan(10) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnectionTimeout, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.REQUEST, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (_err, request) => { + t.fail('Should not be called') + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof TimeoutError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.ok(err instanceof TimeoutError) + t.strictEqual(order.length, 0) + }) +}) + +test('RequestAbortedError error', t => { + t.plan(8) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnectionTimeout, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (_err, request) => { + t.fail('Should not be called') + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof RequestAbortedError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + const request = client.info((err, result) => { + t.ok(err instanceof RequestAbortedError) + t.strictEqual(order.length, 0) + }) + + request.abort() +}) + +test('ResponseError error (no retry)', t => { + t.plan(10) + + const MockConnection = buildMockConnection({ + onRequest (params) { + return { + statusCode: 400, + body: { hello: 'world' } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.DESERIALIZATION) + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof ResponseError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.ok(err instanceof ResponseError) + t.strictEqual(order.length, 0) + }) +}) + +test('ResponseError error (with retry)', t => { + t.plan(14) + + const MockConnection = buildMockConnection({ + onRequest (params) { + return { + statusCode: 504, + body: { hello: 'world' } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.DESERIALIZATION) + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof ResponseError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.ok(err instanceof ResponseError) + t.strictEqual(order.length, 0) + }) +}) + +test('Serialization Error', t => { + t.plan(6) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.ok(err instanceof SerializationError) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (_err, request) => { + t.fail('Should not be called') + }) + + client.on(events.RESPONSE, (_err, request) => { + t.fail('Should not be called') + }) + + const body = {} + body.o = body + client.index({ index: 'test', body }, (err, result) => { + t.ok(err instanceof SerializationError) + t.strictEqual(order.length, 0) + }) +}) + +test('Deserialization Error', t => { + t.plan(10) + + class MockConnection extends Connection { + request (params, callback) { + const body = '{"hello":"wor' + const stream = intoStream(body) + stream.statusCode = 200 + stream.headers = { + 'content-type': 'application/json;utf=8', + 'content-length': body.length, + connection: 'keep-alive', + date: new Date().toISOString() + } + stream.on('close', () => t.pass('Stream destroyed')) + process.nextTick(callback, null, stream) + return { abort () {} } + } + } + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection, + maxRetries: 1 + }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.DESERIALIZATION) + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof DeserializationError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.ok(err instanceof DeserializationError) + t.strictEqual(order.length, 0) + }) +}) + +test('Socket destroyed while reading the body', t => { + t.plan(14) + + function handler (req, res) { + const body = JSON.stringify({ hello: 'world' }) + res.setHeader('Content-Type', 'application/json;utf=8') + res.setHeader('Content-Length', body.length + '') + res.write(body.slice(0, -5)) + setTimeout(() => { + res.socket.destroy() + }, 500) + } + + buildServer(handler, ({ port }, server) => { + const client = new Client({ node: `http://localhost:${port}`, maxRetries: 1 }) + + const order = [ + events.SERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.REQUEST, + events.DESERIALIZATION, + events.RESPONSE + ] + + client.on(events.SERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.SERIALIZATION) + }) + + client.on(events.REQUEST, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.REQUEST) + }) + + client.on(events.DESERIALIZATION, (err, request) => { + t.error(err) + t.strictEqual(order.shift(), events.DESERIALIZATION) + }) + + client.on(events.RESPONSE, (err, request) => { + t.ok(err instanceof ConnectionError) + t.strictEqual(order.shift(), events.RESPONSE) + }) + + client.info((err, result) => { + t.ok(err instanceof ConnectionError) + t.strictEqual(order.length, 0) + server.stop() + }) + }) +}) diff --git a/test/unit/events.test.js b/test/unit/events.test.js index 4602c0ed7..2842e7e46 100644 --- a/test/unit/events.test.js +++ b/test/unit/events.test.js @@ -23,7 +23,12 @@ const { test } = require('tap') const semver = require('semver') const { Client, events } = require('../../index') const { TimeoutError } = require('../../lib/errors') -const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils') +const { + connection: { + MockConnection, + MockConnectionTimeout + } +} = require('../utils') test('Should emit a request event when a request is performed', t => { t.plan(3)