Updated abort behavior (#1141)

* Updated abort behavior

- Support for aborting a request with the promise api
- Aborting a request will cause a RequestAbortedError
- Normalized Connection class errors, now every error returned is
wrapped by the client errors constructors

* Updated test

* Updated docs

* Updated code generation script

* Renamed test

* Code coverage

* Avoid calling twice transport.request
This commit is contained in:
Tomas Della Vedova
2020-04-06 11:21:19 +02:00
committed by GitHub
parent 953a8033ab
commit 27a8e2a9bf
16 changed files with 824 additions and 648 deletions

View File

@ -67,45 +67,34 @@ client.search({
=== Aborting a request
When using the callback style API, the function also returns an object that
allows you to abort the API request.
If needed, you can abort a running request by calling the `request.abort()` method returned by the API.
CAUTION: If you abort a request, the request will fail with a `RequestAbortedError`.
[source,js]
----
// calback API
const request = client.search({
index: 'my-index',
body: { foo: 'bar' }
}, {
ignore: [404],
maxRetries: 3
}, (err, { body }) => {
if (err) console.log(err)
}, (err, result) => {
if (err) {
console.log(err) // RequestAbortedError
} else {
console.log(result)
}
})
request.abort()
----
Aborting a request with the promise style API is not supported, but you can
achieve that with convenience wrapper.
The same behavior is valid for the promise style API as well.
[source,js]
----
function abortableRequest (params, options) {
var request = null
const promise = new Promise((resolve, reject) => {
request = client.search(params, options, (err, result) => {
err ? reject(err) : resolve(res)
})
})
return {
promise,
abort: () => request.abort()
}
}
const request = abortableRequest({
const request = client.search({
index: 'my-index',
body: { foo: 'bar' }
}, {
@ -113,8 +102,11 @@ const request = abortableRequest({
maxRetries: 3
})
request
.then(result => console.log(result))
.catch(err => console.log(err)) // RequestAbortedError
request.abort()
// access the promise with `request.promise.[method]`
----
@ -213,6 +205,9 @@ You can find the errors exported by the client in the table below.
|`ConnectionError`
|Generated when an error occurs during the request, it can be a connection error or a malformed stream of data.
|`RequestAbortedError`
|Generated if the user calls the `request.abort()` method.
|`NoLivingConnectionsError`
|Given the configuration, the ConnectionPool was not able to find a usable Connection for this request.

1123
index.d.ts vendored

File diff suppressed because it is too large Load Diff

View File

@ -12,7 +12,12 @@ const debug = require('debug')('elasticsearch')
const decompressResponse = require('decompress-response')
const pump = require('pump')
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
const { TimeoutError, ConfigurationError } = require('./errors')
const {
ConnectionError,
RequestAbortedError,
TimeoutError,
ConfigurationError
} = require('./errors')
class Connection {
constructor (opts = {}) {
@ -95,7 +100,7 @@ class Connection {
if (ended === false) {
ended = true
this._openRequests--
callback(err, null)
callback(new ConnectionError(err.message), null)
}
})
@ -105,6 +110,7 @@ class Connection {
if (ended === false) {
ended = true
this._openRequests--
callback(new RequestAbortedError(), null)
}
})

6
lib/Transport.d.ts vendored
View File

@ -11,7 +11,7 @@ import * as errors from './errors';
export type ApiError = errors.ConfigurationError | errors.ConnectionError |
errors.DeserializationError | errors.SerializationError |
errors.NoLivingConnectionsError | errors.ResponseError |
errors.TimeoutError
errors.TimeoutError | errors.RequestAbortedError
export interface nodeSelectorFn {
(connections: Connection[]): Connection;
@ -102,6 +102,10 @@ export interface TransportRequestCallback {
abort: () => void;
}
export interface TransportRequestPromise<T> extends Promise<T> {
abort: () => void;
}
export interface TransportGetConnectionOptions {
requestId: string;
}

View File

@ -12,7 +12,7 @@ const intoStream = require('into-stream')
const ms = require('ms')
const {
ConnectionError,
TimeoutError,
RequestAbortedError,
NoLivingConnectionsError,
ResponseError,
ConfigurationError
@ -70,14 +70,19 @@ class Transport {
callback = options
options = {}
}
var p = null
// promises support
if (callback == null) {
return new Promise((resolve, reject) => {
this.request(params, options, (err, result) => {
err ? reject(err) : resolve(result)
})
if (callback === undefined) {
let onFulfilled = null
let onRejected = null
p = new Promise((resolve, reject) => {
onFulfilled = resolve
onRejected = reject
})
callback = function callback (err, result) {
err ? onRejected(err) : onFulfilled(result)
}
}
callback = once(callback)
@ -107,7 +112,9 @@ class Transport {
var request = { abort: noop }
const makeRequest = () => {
if (meta.aborted === true) return
if (meta.aborted === true) {
return callback(new RequestAbortedError(), result)
}
meta.connection = this.getConnection({ requestId: meta.request.id })
if (meta.connection == null) {
return callback(new NoLivingConnectionsError(), result)
@ -190,35 +197,30 @@ class Transport {
const onResponse = (err, response) => {
if (err !== null) {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection)
if (err.name !== 'RequestAbortedError') {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(meta.connection)
if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
if (this.sniffOnConnectionFault === true) {
this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id
})
}
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback)
return
}
}
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback)
return
}
const error = err instanceof TimeoutError
? err
: new ConnectionError(err.message, result)
if (err.name === 'TimeoutError') {
err.meta = result
}
this.emit('response', error, result)
return callback(error, result)
err.meta = result
this.emit('response', err, result)
return callback(err, result)
}
const { statusCode, headers } = response
@ -310,10 +312,17 @@ class Transport {
request = makeRequest()
return {
abort: () => {
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
catch (onRejected) {
return p.catch(onRejected)
},
abort () {
meta.aborted = true
request.abort()
debug('Aborting request', params)
return this
}
}
}

7
lib/errors.d.ts vendored
View File

@ -59,3 +59,10 @@ export declare class ResponseError extends ElasticsearchClientError {
headers: Record<string, any>;
constructor(meta: ApiResponse);
}
export declare class RequestAbortedError extends ElasticsearchClientError {
name: string;
message: string;
meta: ApiResponse;
constructor(message: string, meta: ApiResponse);
}

View File

@ -95,6 +95,16 @@ class ResponseError extends ElasticsearchClientError {
}
}
class RequestAbortedError extends ElasticsearchClientError {
constructor (message, meta) {
super(message)
Error.captureStackTrace(this, RequestAbortedError)
this.name = 'RequestAbortedError'
this.message = message || 'Request aborted'
this.meta = meta
}
}
module.exports = {
ElasticsearchClientError,
TimeoutError,
@ -103,5 +113,6 @@ module.exports = {
SerializationError,
DeserializationError,
ConfigurationError,
ResponseError
ResponseError,
RequestAbortedError
}

View File

@ -183,14 +183,14 @@ function buildMethodDefinition (api, name, hasBody) {
if (hasBody) {
let methods = [
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
]
if (isSnakeCased(api)) {
methods = methods.concat([
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}<TRequestBody>, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TRequestBody extends ${bodyType}, TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}<TRequestBody>, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
@ -199,14 +199,14 @@ function buildMethodDefinition (api, name, hasBody) {
return methods
} else {
let methods = [
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${api}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }
]
if (isSnakeCased(api)) {
methods = methods.concat([
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `Promise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params?: RequestParams.${Name}, options?: TransportRequestOptions)`, val: `TransportRequestPromise<ApiResponse<TResponse, TContext>>` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` },
{ key: `${camelify(api)}<TResponse = ResponseBody, TContext = unknown>(params: RequestParams.${Name}, options: TransportRequestOptions, callback: callbackFn<TResponse, TContext>)`, val: `TransportRequestCallback` }

View File

@ -210,7 +210,7 @@ test('Sniff on connection fault', t => {
class MyConnection extends Connection {
request (params, callback) {
if (this.id === 'http://localhost:9200/') {
callback(new Error('kaboom'), null)
callback(new errors.ConnectionError('kaboom'), null)
return {}
} else {
return super.request(params, callback)

View File

@ -4,7 +4,7 @@
import { expectType } from 'tsd'
import { Client, ApiError, ApiResponse, RequestEvent, ResurrectEvent } from '../../'
import { TransportRequestCallback } from '../..//lib/Transport';
import { TransportRequestCallback, TransportRequestPromise } from '../..//lib/Transport';
const client = new Client({
node: 'http://localhost:9200'
@ -39,6 +39,7 @@ client.on('resurrect', (err, meta) => {
expectType<ApiResponse>(result)
})
expectType<TransportRequestCallback>(result)
expectType<void>(result.abort())
}
{
@ -47,6 +48,7 @@ client.on('resurrect', (err, meta) => {
expectType<ApiResponse>(result)
})
expectType<TransportRequestCallback>(result)
expectType<void>(result.abort())
}
{
@ -55,37 +57,42 @@ client.on('resurrect', (err, meta) => {
expectType<ApiResponse>(result)
})
expectType<TransportRequestCallback>(result)
expectType<void>(result.abort())
}
// Promise style
{
const promise = client.info()
expectType<Promise<ApiResponse>>(promise)
expectType<TransportRequestPromise<ApiResponse>>(promise)
promise
.then(result => expectType<ApiResponse>(result))
.catch((err: ApiError) => expectType<ApiError>(err))
expectType<void>(promise.abort())
}
{
const promise = client.info({ pretty: true })
expectType<Promise<ApiResponse>>(promise)
expectType<TransportRequestPromise<ApiResponse>>(promise)
promise
.then(result => expectType<ApiResponse>(result))
.catch((err: ApiError) => expectType<ApiError>(err))
expectType<void>(promise.abort())
}
{
const promise = client.info({ pretty: true }, { ignore: [404] })
expectType<Promise<ApiResponse>>(promise)
expectType<TransportRequestPromise<ApiResponse>>(promise)
promise
.then(result => expectType<ApiResponse>(result))
.catch((err: ApiError) => expectType<ApiError>(err))
expectType<void>(promise.abort())
}
// Promise style with async await
{
const promise = client.info()
expectType<Promise<ApiResponse>>(promise)
expectType<TransportRequestPromise<ApiResponse>>(promise)
expectType<void>(promise.abort())
try {
expectType<ApiResponse>(await promise)
} catch (err) {
@ -95,7 +102,8 @@ client.on('resurrect', (err, meta) => {
{
const promise = client.info({ pretty: true })
expectType<Promise<ApiResponse>>(promise)
expectType<TransportRequestPromise<ApiResponse>>(promise)
expectType<void>(promise.abort())
try {
expectType<ApiResponse>(await promise)
} catch (err) {
@ -105,7 +113,8 @@ client.on('resurrect', (err, meta) => {
{
const promise = client.info({ pretty: true }, { ignore: [404] })
expectType<Promise<ApiResponse>>(promise)
expectType<TransportRequestPromise<ApiResponse>>(promise)
expectType<void>(promise.abort())
try {
expectType<ApiResponse>(await promise)
} catch (err) {

View File

@ -81,3 +81,10 @@ const response = {
expectType<number>(err.statusCode)
expectType<Record<string, any>>(err.headers)
}
{
const err = new errors.RequestAbortedError('message', response)
expectType<string>(err.name)
expectType<string>(err.message)
expectType<ApiResponse>(err.meta)
}

View File

@ -135,7 +135,7 @@ test('Abort method (callback)', t => {
})
})
test('Abort is not supported in promises', t => {
test('Abort method (promises)', t => {
t.plan(2)
function handler (req, res) {
@ -160,7 +160,7 @@ test('Abort is not supported in promises', t => {
})
.catch(t.fail)
t.type(request.abort, 'undefined')
t.type(request.abort, 'function')
})
})

View File

@ -12,7 +12,7 @@ const { Agent } = require('http')
const intoStream = require('into-stream')
const { buildServer } = require('../utils')
const Connection = require('../../lib/Connection')
const { TimeoutError, ConfigurationError } = require('../../lib/errors')
const { TimeoutError, ConfigurationError, RequestAbortedError } = require('../../lib/errors')
test('Basic (http)', t => {
t.plan(4)
@ -855,3 +855,48 @@ test('Should not add agent and ssl to the serialized connection', t => {
t.end()
})
test('Abort a request syncronously', t => {
t.plan(1)
function handler (req, res) {
t.fail('The server should not be contacted')
}
buildServer(handler, ({ port }, server) => {
const connection = new Connection({
url: new URL(`http://localhost:${port}`)
})
const request = connection.request({
path: '/hello',
method: 'GET'
}, (err, res) => {
t.ok(err instanceof RequestAbortedError)
server.stop()
})
request.abort()
})
})
test('Abort a request asyncronously', t => {
t.plan(1)
function handler (req, res) {
// might be called or not
res.end('ok')
}
buildServer(handler, ({ port }, server) => {
const connection = new Connection({
url: new URL(`http://localhost:${port}`)
})
const request = connection.request({
path: '/hello',
method: 'GET'
}, (err, res) => {
t.ok(err instanceof RequestAbortedError)
server.stop()
})
setImmediate(() => request.abort())
})
})

View File

@ -80,3 +80,11 @@ test('ResponseError', t => {
t.ok(err.headers)
t.end()
})
test('RequestAbortedError', t => {
const err = new errors.RequestAbortedError()
t.true(err instanceof Error)
t.true(err instanceof errors.ElasticsearchClientError)
t.true(err.hasOwnProperty('meta'))
t.end()
})

View File

@ -21,7 +21,8 @@ const {
TimeoutError,
ResponseError,
ConnectionError,
ConfigurationError
ConfigurationError,
RequestAbortedError
} = require('../../lib/errors')
const ConnectionPool = require('../../lib/pool/ConnectionPool')
@ -88,6 +89,32 @@ test('Basic (promises support)', t => {
.catch(t.fail)
})
test('Basic - failing (promises support)', t => {
t.plan(1)
const pool = new ConnectionPool({ Connection: MockConnectionTimeout })
pool.addConnection('http://localhost:9200')
const transport = new Transport({
emit: () => {},
connectionPool: pool,
serializer: new Serializer(),
maxRetries: 3,
requestTimeout: 30000,
sniffInterval: false,
sniffOnStart: false
})
transport
.request({
method: 'GET',
path: '/hello'
})
.catch(err => {
t.ok(err instanceof TimeoutError)
})
})
test('Basic (options + promises support)', t => {
t.plan(1)
@ -764,7 +791,7 @@ test('Should call resurrect on every request', t => {
test('Should return a request aborter utility', t => {
t.plan(1)
const pool = new ConnectionPool({ Connection, MockConnection })
const pool = new ConnectionPool({ Connection: MockConnection })
pool.addConnection({
url: new URL('http://localhost:9200'),
id: 'node1'
@ -783,12 +810,11 @@ test('Should return a request aborter utility', t => {
const request = transport.request({
method: 'GET',
path: '/hello'
}, (_err, body) => {
t.fail('Should not be called')
}, (err, result) => {
t.ok(err instanceof RequestAbortedError)
})
request.abort()
t.pass('ok')
})
test('Retry mechanism and abort', t => {
@ -819,8 +845,6 @@ test('Retry mechanism and abort', t => {
emit: event => {
if (event === 'request' && count++ > 0) {
request.abort()
server.stop()
t.pass('ok')
}
},
connectionPool: pool,
@ -834,12 +858,48 @@ test('Retry mechanism and abort', t => {
const request = transport.request({
method: 'GET',
path: '/hello'
}, (e, { body }) => {
t.fail('Should not be called')
}, (err, result) => {
t.ok(err instanceof RequestAbortedError)
server.stop()
})
})
})
test('Abort a request with the promise API', t => {
t.plan(1)
const pool = new ConnectionPool({ Connection: MockConnection })
pool.addConnection({
url: new URL('http://localhost:9200'),
id: 'node1'
})
const transport = new Transport({
emit: () => {},
connectionPool: pool,
serializer: new Serializer(),
maxRetries: 3,
requestTimeout: 30000,
sniffInterval: false,
sniffOnStart: false
})
const request = transport.request({
method: 'GET',
path: '/hello'
})
request
.then(() => {
t.fail('Should not be called')
})
.catch(err => {
t.ok(err instanceof RequestAbortedError)
})
request.abort()
})
test('ResponseError', t => {
t.plan(3)

View File

@ -6,7 +6,11 @@
const assert = require('assert')
const { Connection } = require('../../index')
const { TimeoutError } = require('../../lib/errors')
const {
ConnectionError,
RequestAbortedError,
TimeoutError
} = require('../../lib/errors')
const intoStream = require('into-stream')
class MockConnection extends Connection {
@ -23,6 +27,8 @@ class MockConnection extends Connection {
process.nextTick(() => {
if (!aborted) {
callback(null, stream)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
@ -37,6 +43,8 @@ class MockConnectionTimeout extends Connection {
process.nextTick(() => {
if (!aborted) {
callback(new TimeoutError('Request timed out', params), null)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
@ -50,7 +58,9 @@ class MockConnectionError extends Connection {
var aborted = false
process.nextTick(() => {
if (!aborted) {
callback(new Error('Kaboom'), null)
callback(new ConnectionError('Kaboom'), null)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
@ -93,6 +103,8 @@ class MockConnectionSniff extends Connection {
} else {
callback(null, stream)
}
} else {
callback(new RequestAbortedError(), null)
}
})
return {
@ -122,6 +134,8 @@ function buildMockConnection (opts) {
process.nextTick(() => {
if (!aborted) {
callback(null, stream)
} else {
callback(new RequestAbortedError(), null)
}
})
return {