Added timeout support in bulk and msearch helpers (#1206)

Tomas Della Vedova
2020-06-03 10:33:08 +02:00
committed by GitHub
parent dbbee273d9
commit 80817a00f9
7 changed files with 367 additions and 29 deletions
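In short, both helpers gain a `flushInterval` option that bounds how long buffered items may wait before being sent, fixing the slow-producer problem noted in the TODOs removed below. A minimal usage sketch, assuming a local node; the URL, dataset, and index name are placeholders:

[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })

// Bulk: buffered documents are flushed after 30s even when
// the flushBytes threshold has not been reached.
const b = client.helpers.bulk({
  datasource: [{ user: 'arya' }, { user: 'tyrion' }],
  flushBytes: 5000000,
  flushInterval: 30000,
  onDocument (doc) {
    return { index: { _index: 'test' } }
  }
})

// Msearch: queued searches are flushed after 500ms even when
// fewer than `operations` searches have been queued.
// Remember to call m.stop() once done queueing searches.
const m = client.helpers.msearch({ operations: 5, flushInterval: 500 })
----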


@@ -96,6 +96,16 @@ const b = client.helpers.bulk({
})
----
|`flushInterval`
a|How much time (in milliseconds) the helper will wait since the last document was read before flushing the body. +
_Default:_ `30000`
[source,js]
----
const b = client.helpers.bulk({
flushInterval: 30000
})
----
|`concurrency`
a|How many requests will be executed at the same time. +
_Default:_ `5`
@@ -254,7 +264,7 @@ To create a new instance of the Msearch helper, you should access it as shown in
|===
|`operations`
a|How many search operations should be sent in a single msearch request. +
_Default:_ `20`
_Default:_ `5`
[source,js]
----
const b = client.helpers.msearch({
@@ -262,6 +272,16 @@ const b = client.helpers.msearch({
})
----
|`flushInterval`
a|How much time (in milliseconds) the helper will wait since the last operation was read before flushing the queued operations. +
_Default:_ `500`
[source,js]
----
const b = client.helpers.msearch({
flushInterval: 500
})
----
|`concurrency`
a|How many requests will be executed at the same time. +
_Default:_ `5`
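As the implementation below shows, the msearch helper flushes on whichever triggers first: the `operations` threshold or the `flushInterval` timer. A short sketch of the effect, with a placeholder index and query:

[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })

const m = client.helpers.msearch({ operations: 5, flushInterval: 500 })

// A single queued search never reaches the operations threshold (5),
// so the request goes out when the 500ms flush timer fires instead.
m.search({ index: 'test' }, { query: { match_all: {} } })
  .then(result => console.log(result.documents))
  .catch(err => console.error(err))
  .finally(() => m.stop()) // let the helper and the process wind down
----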

lib/Helpers.d.ts

@@ -69,6 +69,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body
datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
flushBytes?: number
flushInterval?: number
concurrency?: number
retries?: number
wait?: number
@@ -92,6 +93,7 @@ export interface OnDropDocument<TDocument = unknown> {
export interface MsearchHelperOptions extends Omit<Msearch, 'body'> {
operations?: number
flushInterval?: number
concurrency?: number
retries?: number
wait?: number
@@ -102,4 +104,4 @@ export interface MsearchHelper extends Promise<void> {
stop(error?: Error): void
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>>
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void
}
}
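These typings also pin down the msearch helper's surface: the helper is itself a thenable that settles after `stop()`, and `search` returns a promise or accepts a node-style callback. A small usage sketch; the index names and queries are placeholders:

[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })

async function run () {
  const s = client.helpers.msearch()

  // Promise style: resolves with the response for this single search.
  const result = await s.search({ index: 'foo' }, { query: { match_all: {} } })
  console.log(result.documents)

  // Callback style: the overload returning void.
  s.search({ index: 'foo' }, { query: { match_all: {} } }, (err, res) => {
    if (err) console.error(err)
  })

  // stop() ends the helper; pass an Error to reject instead of resolve.
  s.stop()
  await s
}

run().catch(console.error)
----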


@@ -143,12 +143,11 @@ class Helpers {
* @return {object} The possible operations to run.
*/
msearch (options = {}) {
// TODO: add an interval to force flush the body
// to handle the slow producer problem
const client = this[kClient]
const {
operations = 20,
operations = 5,
concurrency = 5,
flushInterval = 500,
retries = this.maxRetries,
wait = 5000,
...msearchOptions
@@ -156,14 +155,14 @@ class Helpers {
let stopReading = false
let stopError = null
let timeoutId = null
const operationsStream = new Readable({
objectMode: true,
read (size) {}
})
const p = iterate()
return {
const helper = {
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
@@ -171,11 +170,14 @@
return p.catch(onRejected)
},
stop (error = null) {
if (stopReading === true) return
stopReading = true
stopError = error
operationsStream.push(null)
},
// TODO: support aborting a single search?
// NOTE: the validation checks are synchronous and the callback/promise will
// be resolved in the same tick. We might want to fix this in the future.
search (header, body, callback) {
if (stopReading === true) {
const error = stopError === null
@@ -215,6 +217,8 @@
}
}
return helper
async function iterate () {
const { semaphore, finish } = buildSemaphore()
const msearchBody = []
@@ -222,6 +226,7 @@
let loadedOperations = 0
for await (const operation of operationsStream) {
clearTimeout(timeoutId)
loadedOperations += 1
msearchBody.push(operation[0], operation[1])
callbacks.push(operation[2])
@@ -231,9 +236,12 @@
msearchBody.length = 0
callbacks.length = 0
loadedOperations = 0
} else {
timeoutId = setTimeout(onFlushTimeout, flushInterval)
}
}
clearTimeout(timeoutId)
// In some cases the previous http call has not finished yet,
// or we never reached the operations threshold, so we force one last operation.
if (loadedOperations > 0) {
@@ -246,6 +254,21 @@
if (stopError !== null) {
throw stopError
}
async function onFlushTimeout () {
const msearchBodyCopy = msearchBody.slice()
const callbacksCopy = callbacks.slice()
msearchBody.length = 0
callbacks.length = 0
loadedOperations = 0
try {
const send = await semaphore()
send(msearchBodyCopy, callbacksCopy)
} catch (err) {
/* istanbul ignore next */
helper.stop(err)
}
}
}
// This function builds a semaphore using the concurrency
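The TODO removed above is exactly what this hunk implements: without a timer, a slow producer could leave queued operations waiting indefinitely for the `operations` threshold. The pattern (clear the timer on every read, re-arm it while below the threshold) distils to the following standalone sketch; the names are illustrative, not the helper's internals:

[source,js]
----
function makeBuffer ({ threshold, flushInterval, onFlush }) {
  const items = []
  let timeoutId = null

  return function push (item) {
    clearTimeout(timeoutId) // every new item resets the flush timer
    items.push(item)
    if (items.length >= threshold) {
      onFlush(items.splice(0)) // flush: threshold reached
    } else {
      // flush: the producer went quiet before reaching the threshold
      timeoutId = setTimeout(() => onFlush(items.splice(0)), flushInterval)
    }
  }
}

// With threshold 5 and a 500ms interval, a single item from a slow
// producer is still flushed after 500ms instead of waiting forever.
const push = makeBuffer({
  threshold: 5,
  flushInterval: 500,
  onFlush (batch) { console.log('flushing', batch.length, 'item(s)') }
})
push('only-item')
----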
@@ -365,14 +388,13 @@ class Helpers {
* @return {object} The possible operations to run with the datasource.
*/
bulk (options) {
// TODO: add an interval to force flush the body
// to handle the slow producer problem
const client = this[kClient]
const { serialize, deserialize } = client.serializer
const {
datasource,
onDocument,
flushBytes = 5000000,
flushInterval = 30000,
concurrency = 5,
retries = this.maxRetries,
wait = 5000,
@@ -392,6 +414,7 @@
}
let shouldAbort = false
let timeoutId = null
const stats = {
total: 0,
failed: 0,
@@ -403,8 +426,7 @@
}
const p = iterate()
return {
const helper = {
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
@@ -412,12 +434,15 @@
return p.catch(onRejected)
},
abort () {
clearTimeout(timeoutId)
shouldAbort = true
stats.aborted = true
return this
}
}
return helper
/**
* Function that iterates over the given datasource and start a bulk operation as soon
* as it reaches the configured bulk size. It's designed to use the Node.js asynchronous
@@ -437,6 +462,7 @@
for await (const chunk of datasource) {
if (shouldAbort === true) break
clearTimeout(timeoutId)
const action = onDocument(chunk)
const operation = Array.isArray(action)
? Object.keys(action[0])[0]
@ -445,16 +471,14 @@ class Helpers {
actionBody = serialize(action)
payloadBody = typeof chunk === 'string' ? chunk : serialize(chunk)
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody)
bulkBody.push(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'update') {
actionBody = serialize(action[0])
payloadBody = typeof chunk === 'string'
? `{doc:${chunk}}`
: serialize({ doc: chunk, ...action[1] })
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody)
bulkBody.push(payloadBody)
bulkBody.push(actionBody, payloadBody)
} else if (operation === 'delete') {
actionBody = serialize(action)
chunkBytes += Buffer.byteLength(actionBody)
@@ -469,9 +493,12 @@
send(bulkBody.slice())
bulkBody.length = 0
chunkBytes = 0
} else {
timeoutId = setTimeout(onFlushTimeout, flushInterval)
}
}
clearTimeout(timeoutId)
// In some cases the previous http call has not finished yet,
// or we never reached the flush bytes threshold, so we force one last operation.
if (shouldAbort === false && chunkBytes > 0) {
@@ -494,6 +521,20 @@
stats.total = stats.successful + stats.failed
return stats
async function onFlushTimeout () {
stats.bytes += chunkBytes
const bulkBodyCopy = bulkBody.slice()
bulkBody.length = 0
chunkBytes = 0
try {
const send = await semaphore()
send(bulkBodyCopy)
} catch (err) {
/* istanbul ignore next */
helper.abort()
}
}
}
// This function builds a semaphore using the concurrency
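The diff truncates `buildSemaphore`, but the call sites above show its contract: `await semaphore()` resolves with a `send` function once a concurrency slot is free, and `finish()` resolves once all in-flight requests have completed. A minimal sketch of that general pattern, not the helper's exact implementation:

[source,js]
----
function buildSemaphoreSketch (concurrency, doRequest) {
  let running = 0
  const queue = []
  let onIdle = null

  function send (payload) {
    doRequest(payload).then(release, release)
  }

  function release () {
    if (queue.length > 0) {
      queue.shift()(send) // hand the freed slot to the next waiter
    } else {
      running -= 1
      if (running === 0 && onIdle !== null) onIdle()
    }
  }

  return {
    semaphore () {
      if (running < concurrency) {
        running += 1
        return Promise.resolve(send)
      }
      return new Promise(resolve => queue.push(resolve))
    },
    finish () {
      return running === 0
        ? Promise.resolve()
        : new Promise(resolve => { onIdle = resolve })
    }
  }
}

// e.g. const { semaphore, finish } = buildSemaphoreSketch(5, sendMsearch)
----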


@@ -41,6 +41,7 @@
"company": "Elasticsearch BV"
},
"devDependencies": {
"@sinonjs/fake-timers": "^6.0.1",
"@types/node": "^12.6.2",
"convert-hrtime": "^3.0.0",
"dedent": "^0.7.0",


@@ -27,6 +27,7 @@ const b = client.helpers.bulk<Record<string, any>>({
return { index: { _index: 'test' } }
},
flushBytes: 5000000,
flushInterval: 30000,
concurrency: 5,
retries: 3,
wait: 5000,
@@ -58,7 +59,7 @@ expectError(
const options = {
datasource: [],
onDocument (doc: Record<string, any>) {
return { index: { _index: 'test' } }
return { index: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
@@ -139,20 +140,20 @@ expectError(
}
// with type defs
{
{
interface ShardsResponse {
total: number;
successful: number;
failed: number;
skipped: number;
}
interface Explanation {
value: number;
description: string;
details: Explanation[];
}
interface SearchResponse<T> {
took: number;
timed_out: boolean;
@@ -178,7 +179,7 @@ expectError(
};
aggregations?: any;
}
interface Source {
foo: string
}
@@ -208,20 +209,20 @@ expectError(
match: { foo: string }
}
}
interface ShardsResponse {
total: number;
successful: number;
failed: number;
skipped: number;
}
interface Explanation {
value: number;
description: string;
details: Explanation[];
}
interface SearchResponse<T> {
took: number;
timed_out: boolean;
@@ -247,7 +248,7 @@ expectError(
};
aggregations?: any;
}
interface Source {
foo: string
}
@@ -310,7 +311,7 @@ expectError(
}
// with type defs
{
{
interface Source {
foo: string
}
@@ -337,7 +338,7 @@ expectError(
match: { foo: string }
}
}
interface Source {
foo: string
}
@@ -415,7 +416,7 @@ expectError(
match: { foo: string }
}
}
interface Source {
foo: string
}
@@ -436,7 +437,8 @@ expectError(
/// .helpers.msearch
const s = client.helpers.msearch({
operations: 20,
operations: 5,
flushInterval: 500,
concurrency: 5,
retries: 5,
wait: 5000
@@ -456,4 +458,4 @@ expectType<void>(s.search({ index: 'foo'}, { query: {} }, (err, result) => {
expectType<void>(s.search<string, Record<string, any>, string>({ index: 'foo'}, { query: {} }, (err, result) => {
expectType<ApiError>(err)
expectType<ApiResponse<string, string>>(result)
}))
}))


@@ -7,6 +7,7 @@
const { createReadStream } = require('fs')
const { join } = require('path')
const split = require('split2')
const FakeTimers = require('@sinonjs/fake-timers')
const semver = require('semver')
const { test } = require('tap')
const { Client, errors } = require('../../../')
@@ -987,3 +988,118 @@ test('errors', t => {
t.end()
})
test('Flush interval', t => {
t.test('Slow producer', async t => {
const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] })
t.teardown(() => clock.uninstall())
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.strictEqual(params.path, '/_bulk')
t.match(params.headers, { 'content-type': 'application/x-ndjson' })
const [action, payload] = params.body.split('\n')
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
t.deepEqual(JSON.parse(payload), dataset[count++])
return { body: { errors: false, items: [{}] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const result = await client.helpers.bulk({
datasource: (async function * generator () {
for (const chunk of dataset) {
await clock.nextAsync()
yield chunk
}
})(),
flushBytes: 5000000,
concurrency: 1,
onDocument (doc) {
return {
index: { _index: 'test' }
}
},
onDrop (doc) {
t.fail('This should never be called')
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})
t.test('Abort operation', async t => {
const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] })
t.teardown(() => clock.uninstall())
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.true(count < 2)
t.strictEqual(params.path, '/_bulk')
t.match(params.headers, { 'content-type': 'application/x-ndjson' })
const [action, payload] = params.body.split('\n')
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
t.deepEqual(JSON.parse(payload), dataset[count++])
return { body: { errors: false, items: [{}] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const b = client.helpers.bulk({
datasource: (async function * generator () {
for (const chunk of dataset) {
await clock.nextAsync()
if (chunk.user === 'tyrion') {
// Needed, otherwise in Node.js 10
// the second request will never be sent
await Promise.resolve()
b.abort()
}
yield chunk
}
})(),
flushBytes: 5000000,
concurrency: 1,
onDocument (doc) {
return {
index: { _index: 'test' }
}
},
onDrop (doc) {
t.fail('This should never be called')
}
})
const result = await b
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 2,
successful: 2,
retry: 0,
failed: 0,
aborted: true
})
})
t.end()
})


@@ -7,6 +7,7 @@
const { test } = require('tap')
const { Client, errors } = require('../../../')
const { connection } = require('../../utils')
const FakeTimers = require('@sinonjs/fake-timers')
test('Basic', async t => {
const MockConnection = connection.buildMockConnection({
@@ -578,3 +579,158 @@ test('Multiple searches (concurrency = 1)', t => {
t.teardown(() => s.stop())
})
test('Flush interval', t => {
t.plan(4)
const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] })
t.teardown(() => clock.uninstall())
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}, {
status: 200,
hits: {
hits: [
{ _source: { four: 'four' } },
{ _source: { five: 'five' } },
{ _source: { six: 'six' } }
]
}
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.error(err)
t.is(result.documents.length, 3)
})
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.error(err)
t.is(result.documents.length, 3)
})
setImmediate(clock.next)
t.teardown(() => s.stop())
})
test('Flush interval - early stop', t => {
t.plan(3)
const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] })
t.teardown(() => clock.uninstall())
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.error(err)
t.is(result.documents.length, 3)
})
setImmediate(() => {
clock.next()
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.ok(err instanceof errors.ConfigurationError)
})
})
s.stop()
})
test('Stop should resolve the helper', t => {
t.plan(1)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: []
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
setImmediate(s.stop)
s.then(() => t.pass('Called'))
.catch(() => t.fail('Should not fail'))
})
test('Stop should resolve the helper (error)', t => {
t.plan(3)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: []
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
setImmediate(s.stop, new Error('kaboom'))
s.then(() => t.fail('Should not fail'))
.catch(err => t.is(err.message, 'kaboom'))
s.catch(err => t.is(err.message, 'kaboom'))
s.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
})