diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc index cea848156..37f747f82 100644 --- a/docs/helpers.asciidoc +++ b/docs/helpers.asciidoc @@ -96,6 +96,16 @@ const b = client.helpers.bulk({ }) ---- +|`flushInterval` +a|How long (in milliseconds) the helper waits, after the last document was read, before flushing the accumulated body. + +_Default:_ `30000` +[source,js] +---- +const b = client.helpers.bulk({ + flushInterval: 30000 +}) +---- + |`concurrency` a|How many request will be executed at the same time. + _Default:_ `5` @@ -254,7 +264,7 @@ To create a new instance of the Msearch helper, you should access it as shown in |=== |`operations` a|How many search operations should be sent in a single msearch request. + -_Default:_ `20` +_Default:_ `5` [source,js] ---- const b = client.helpers.msearch({ @@ -262,6 +272,16 @@ const b = client.helpers.msearch({ }) ---- +|`flushInterval` +a|How long (in milliseconds) the helper waits, after the last operation was read, before flushing the accumulated operations. + +_Default:_ `500` +[source,js] +---- +const b = client.helpers.msearch({ + flushInterval: 500 +}) +---- + |`concurrency` a|How many request will be executed at the same time. 
+ _Default:_ `5` diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index e1d9d521a..e31839644 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -69,6 +69,7 @@ export interface BulkHelperOptions extends Omit onDocument: (doc: TDocument) => Action flushBytes?: number + flushInterval?: number concurrency?: number retries?: number wait?: number @@ -92,6 +93,7 @@ export interface OnDropDocument { export interface MsearchHelperOptions extends Omit { operations?: number + flushInterval?: number concurrency?: number retries?: number wait?: number @@ -102,4 +104,4 @@ export interface MsearchHelper extends Promise { stop(error?: Error): void search, TRequestBody extends RequestBody = Record, TContext = unknown>(header: Omit, body: TRequestBody): Promise> search, TRequestBody extends RequestBody = Record, TContext = unknown>(header: Omit, body: TRequestBody, callback: callbackFn): void -} \ No newline at end of file +} diff --git a/lib/Helpers.js b/lib/Helpers.js index 8d0a4b6c4..a8f33c6c0 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -143,12 +143,11 @@ class Helpers { * @return {object} The possible operations to run. */ msearch (options = {}) { - // TODO: add an interval to force flush the body - // to handle the slow producer problem const client = this[kClient] const { - operations = 20, + operations = 5, concurrency = 5, + flushInterval = 500, retries = this.maxRetries, wait = 5000, ...msearchOptions @@ -156,14 +155,14 @@ class Helpers { let stopReading = false let stopError = null + let timeoutId = null const operationsStream = new Readable({ objectMode: true, read (size) {} }) const p = iterate() - - return { + const helper = { then (onFulfilled, onRejected) { return p.then(onFulfilled, onRejected) }, @@ -171,11 +170,14 @@ class Helpers { return p.catch(onRejected) }, stop (error = null) { + if (stopReading === true) return stopReading = true stopError = error operationsStream.push(null) }, // TODO: support abort a single search? 
+ // NOTE: the validation checks are synchronous and the callback/promise will + // be resolved in the same tick. We might want to fix this in the future. search (header, body, callback) { if (stopReading === true) { const error = stopError === null @@ -215,6 +217,8 @@ class Helpers { } } + return helper + async function iterate () { const { semaphore, finish } = buildSemaphore() const msearchBody = [] @@ -222,6 +226,7 @@ class Helpers { let loadedOperations = 0 for await (const operation of operationsStream) { + clearTimeout(timeoutId) loadedOperations += 1 msearchBody.push(operation[0], operation[1]) callbacks.push(operation[2]) @@ -231,9 +236,12 @@ class Helpers { msearchBody.length = 0 callbacks.length = 0 loadedOperations = 0 + } else { + timeoutId = setTimeout(onFlushTimeout, flushInterval) } } + clearTimeout(timeoutId) // In some cases the previos http call does not have finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (loadedOperations > 0) { @@ -246,6 +254,21 @@ class Helpers { if (stopError !== null) { throw stopError } + + async function onFlushTimeout () { + const msearchBodyCopy = msearchBody.slice() + const callbacksCopy = callbacks.slice() + msearchBody.length = 0 + callbacks.length = 0 + loadedOperations = 0 + try { + const send = await semaphore() + send(msearchBodyCopy, callbacksCopy) + } catch (err) { + /* istanbul ignore next */ + helper.stop(err) + } + } } // This function builds a semaphore using the concurrency @@ -365,14 +388,13 @@ class Helpers { * @return {object} The possible operations to run with the datasource. 
*/ bulk (options) { - // TODO: add an interval to force flush the body - // to handle the slow producer problem const client = this[kClient] const { serialize, deserialize } = client.serializer const { datasource, onDocument, flushBytes = 5000000, + flushInterval = 30000, concurrency = 5, retries = this.maxRetries, wait = 5000, @@ -392,6 +414,7 @@ class Helpers { } let shouldAbort = false + let timeoutId = null const stats = { total: 0, failed: 0, @@ -403,8 +426,7 @@ class Helpers { } const p = iterate() - - return { + const helper = { then (onFulfilled, onRejected) { return p.then(onFulfilled, onRejected) }, @@ -412,12 +434,15 @@ class Helpers { return p.catch(onRejected) }, abort () { + clearTimeout(timeoutId) shouldAbort = true stats.aborted = true return this } } + return helper + /** * Function that iterates over the given datasource and start a bulk operation as soon * as it reaches the configured bulk size. It's designed to use the Node.js asynchronous @@ -437,6 +462,7 @@ class Helpers { for await (const chunk of datasource) { if (shouldAbort === true) break + clearTimeout(timeoutId) const action = onDocument(chunk) const operation = Array.isArray(action) ? Object.keys(action[0])[0] @@ -445,16 +471,14 @@ class Helpers { actionBody = serialize(action) payloadBody = typeof chunk === 'string' ? chunk : serialize(chunk) chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) - bulkBody.push(actionBody) - bulkBody.push(payloadBody) + bulkBody.push(actionBody, payloadBody) } else if (operation === 'update') { actionBody = serialize(action[0]) payloadBody = typeof chunk === 'string' ? 
`{doc:${chunk}}` : serialize({ doc: chunk, ...action[1] }) chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody) - bulkBody.push(actionBody) - bulkBody.push(payloadBody) + bulkBody.push(actionBody, payloadBody) } else if (operation === 'delete') { actionBody = serialize(action) chunkBytes += Buffer.byteLength(actionBody) @@ -469,9 +493,12 @@ class Helpers { send(bulkBody.slice()) bulkBody.length = 0 chunkBytes = 0 + } else { + timeoutId = setTimeout(onFlushTimeout, flushInterval) } } + clearTimeout(timeoutId) // In some cases the previos http call does not have finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (shouldAbort === false && chunkBytes > 0) { @@ -494,6 +521,20 @@ class Helpers { stats.total = stats.successful + stats.failed return stats + + async function onFlushTimeout () { + stats.bytes += chunkBytes + const bulkBodyCopy = bulkBody.slice() + bulkBody.length = 0 + chunkBytes = 0 + try { + const send = await semaphore() + send(bulkBodyCopy) + } catch (err) { + /* istanbul ignore next */ + helper.abort() + } + } } // This function builds a semaphore using the concurrency diff --git a/package.json b/package.json index 1ed0610a3..c4db60189 100644 --- a/package.json +++ b/package.json @@ -41,6 +41,7 @@ "company": "Elasticsearch BV" }, "devDependencies": { + "@sinonjs/fake-timers": "^6.0.1", "@types/node": "^12.6.2", "convert-hrtime": "^3.0.0", "dedent": "^0.7.0", diff --git a/test/types/helpers.test-d.ts b/test/types/helpers.test-d.ts index 137ab5505..339f07e2f 100644 --- a/test/types/helpers.test-d.ts +++ b/test/types/helpers.test-d.ts @@ -27,6 +27,7 @@ const b = client.helpers.bulk>({ return { index: { _index: 'test' } } }, flushBytes: 5000000, + flushInterval: 30000, concurrency: 5, retries: 3, wait: 5000, @@ -58,7 +59,7 @@ expectError( const options = { datasource: [], onDocument (doc: Record) { - return { index: { _index: 'test' } } + return { index: { _index: 'test' } } } } 
expectAssignable>>(options) @@ -139,20 +140,20 @@ expectError( } // with type defs -{ +{ interface ShardsResponse { total: number; successful: number; failed: number; skipped: number; } - + interface Explanation { value: number; description: string; details: Explanation[]; } - + interface SearchResponse { took: number; timed_out: boolean; @@ -178,7 +179,7 @@ expectError( }; aggregations?: any; } - + interface Source { foo: string } @@ -208,20 +209,20 @@ expectError( match: { foo: string } } } - + interface ShardsResponse { total: number; successful: number; failed: number; skipped: number; } - + interface Explanation { value: number; description: string; details: Explanation[]; } - + interface SearchResponse { took: number; timed_out: boolean; @@ -247,7 +248,7 @@ expectError( }; aggregations?: any; } - + interface Source { foo: string } @@ -310,7 +311,7 @@ expectError( } // with type defs -{ +{ interface Source { foo: string } @@ -337,7 +338,7 @@ expectError( match: { foo: string } } } - + interface Source { foo: string } @@ -415,7 +416,7 @@ expectError( match: { foo: string } } } - + interface Source { foo: string } @@ -436,7 +437,8 @@ expectError( /// .helpers.msearch const s = client.helpers.msearch({ - operations: 20, + operations: 5, + flushInterval: 500, concurrency: 5, retries: 5, wait: 5000 @@ -456,4 +458,4 @@ expectType(s.search({ index: 'foo'}, { query: {} }, (err, result) => { expectType(s.search, string>({ index: 'foo'}, { query: {} }, (err, result) => { expectType(err) expectType>(result) -})) \ No newline at end of file +})) diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index cb9ab8ed7..0eb8d94fe 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -7,6 +7,7 @@ const { createReadStream } = require('fs') const { join } = require('path') const split = require('split2') +const FakeTimers = require('@sinonjs/fake-timers') const semver = require('semver') const { test } = require('tap') const { 
Client, errors } = require('../../../') @@ -987,3 +988,118 @@ test('errors', t => { t.end() }) + +test('Flush interval', t => { + t.test('Slow producer', async t => { + const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] }) + t.teardown(() => clock.uninstall()) + + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.strictEqual(params.path, '/_bulk') + t.match(params.headers, { 'content-type': 'application/x-ndjson' }) + const [action, payload] = params.body.split('\n') + t.deepEqual(JSON.parse(action), { index: { _index: 'test' } }) + t.deepEqual(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const result = await client.helpers.bulk({ + datasource: (async function * generator () { + for (const chunk of dataset) { + await clock.nextAsync() + yield chunk + } + })(), + flushBytes: 5000000, + concurrency: 1, + onDocument (doc) { + return { + index: { _index: 'test' } + } + }, + onDrop (doc) { + t.fail('This should never be called') + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 3, + retry: 0, + failed: 0, + aborted: false + }) + }) + + t.test('Abort operation', async t => { + const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] }) + t.teardown(() => clock.uninstall()) + + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + t.true(count < 2) + t.strictEqual(params.path, '/_bulk') + t.match(params.headers, { 'content-type': 'application/x-ndjson' }) + const [action, payload] = params.body.split('\n') + t.deepEqual(JSON.parse(action), { index: { _index: 'test' } }) + t.deepEqual(JSON.parse(payload), dataset[count++]) + return { body: { errors: false, items: [{}] } } + } + }) + + const client = new Client({ + node: 
'http://localhost:9200', + Connection: MockConnection + }) + + const b = client.helpers.bulk({ + datasource: (async function * generator () { + for (const chunk of dataset) { + await clock.nextAsync() + if (chunk.user === 'tyrion') { + // Needed otherwise in Node.js 10 + // the second request will never be sent + await Promise.resolve() + b.abort() + } + yield chunk + } + })(), + flushBytes: 5000000, + concurrency: 1, + onDocument (doc) { + return { + index: { _index: 'test' } + } + }, + onDrop (doc) { + t.fail('This should never be called') + } + }) + + const result = await b + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 2, + successful: 2, + retry: 0, + failed: 0, + aborted: true + }) + }) + + t.end() +}) diff --git a/test/unit/helpers/msearch.test.js b/test/unit/helpers/msearch.test.js index 8c1c64fd6..64e79dcf1 100644 --- a/test/unit/helpers/msearch.test.js +++ b/test/unit/helpers/msearch.test.js @@ -7,6 +7,7 @@ const { test } = require('tap') const { Client, errors } = require('../../../') const { connection } = require('../../utils') +const FakeTimers = require('@sinonjs/fake-timers') test('Basic', async t => { const MockConnection = connection.buildMockConnection({ @@ -578,3 +579,158 @@ test('Multiple searches (concurrency = 1)', t => { t.teardown(() => s.stop()) }) + +test('Flush interval', t => { + t.plan(4) + const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] }) + t.teardown(() => clock.uninstall()) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }, { + status: 200, + hits: { + hits: [ + { _source: { four: 'four' } }, + { _source: { five: 'five' } }, + { _source: { six: 'six' } } + ] + } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + 
Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.error(err) + t.is(result.documents.length, 3) + }) + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.error(err) + t.is(result.documents.length, 3) + }) + + setImmediate(clock.next) + + t.teardown(() => s.stop()) +}) + +test('Flush interval - early stop', t => { + t.plan(3) + const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] }) + t.teardown(() => clock.uninstall()) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.error(err) + t.is(result.documents.length, 3) + }) + + setImmediate(() => { + clock.next() + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.ok(err instanceof errors.ConfigurationError) + }) + }) + + s.stop() +}) + +test('Stop should resolve the helper', t => { + t.plan(1) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + setImmediate(s.stop) + + s.then(() => t.pass('Called')) + .catch(() => t.fail('Should not fail')) +}) + +test('Stop should resolve the helper (error)', t => { + t.plan(3) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [] + } + 
} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + setImmediate(s.stop, new Error('kaboom')) + + s.then(() => t.fail('Should not fail')) + .catch(err => t.is(err.message, 'kaboom')) + + s.catch(err => t.is(err.message, 'kaboom')) + + s.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom')) +})