diff --git a/lib/Helpers.js b/lib/Helpers.js index e920e8f96..3d7263957 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -155,7 +155,7 @@ class Helpers { let stopReading = false let stopError = null - let timeoutId = null + let timeoutRef = null const operationsStream = new Readable({ objectMode: true, read (size) {} @@ -224,9 +224,10 @@ class Helpers { const msearchBody = [] const callbacks = [] let loadedOperations = 0 + timeoutRef = setTimeout(onFlushTimeout, flushInterval) for await (const operation of operationsStream) { - clearTimeout(timeoutId) + timeoutRef.refresh() loadedOperations += 1 msearchBody.push(operation[0], operation[1]) callbacks.push(operation[2]) @@ -236,12 +237,10 @@ class Helpers { msearchBody.length = 0 callbacks.length = 0 loadedOperations = 0 - } else { - timeoutId = setTimeout(onFlushTimeout, flushInterval) } } - clearTimeout(timeoutId) + clearTimeout(timeoutRef) // In some cases the previos http call does not have finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (loadedOperations > 0) { @@ -256,6 +255,7 @@ class Helpers { } async function onFlushTimeout () { + if (loadedOperations === 0) return const msearchBodyCopy = msearchBody.slice() const callbacksCopy = callbacks.slice() msearchBody.length = 0 @@ -415,7 +415,7 @@ class Helpers { } let shouldAbort = false - let timeoutId = null + let timeoutRef = null const stats = { total: 0, failed: 0, @@ -435,7 +435,7 @@ class Helpers { return p.catch(onRejected) }, abort () { - clearTimeout(timeoutId) + clearTimeout(timeoutRef) shouldAbort = true stats.aborted = true return this @@ -460,10 +460,11 @@ class Helpers { let actionBody = '' let payloadBody = '' let chunkBytes = 0 + timeoutRef = setTimeout(onFlushTimeout, flushInterval) for await (const chunk of datasource) { if (shouldAbort === true) break - clearTimeout(timeoutId) + timeoutRef.refresh() const action = onDocument(chunk) const operation = Array.isArray(action) ? Object.keys(action[0])[0] @@ -485,6 +486,7 @@ class Helpers { chunkBytes += Buffer.byteLength(actionBody) bulkBody.push(actionBody) } else { + clearTimeout(timeoutRef) throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`) } @@ -494,12 +496,10 @@ class Helpers { send(bulkBody.slice()) bulkBody.length = 0 chunkBytes = 0 - } else { - timeoutId = setTimeout(onFlushTimeout, flushInterval) } } - clearTimeout(timeoutId) + clearTimeout(timeoutRef) // In some cases the previos http call does not have finished, // or we didn't reach the flush bytes threshold, so we force one last operation. if (shouldAbort === false && chunkBytes > 0) { @@ -524,6 +524,7 @@ class Helpers { return stats async function onFlushTimeout () { + if (chunkBytes === 0) return stats.bytes += chunkBytes const bulkBodyCopy = bulkBody.slice() bulkBody.length = 0 diff --git a/package.json b/package.json index bfdf26eaa..ab5517c46 100644 --- a/package.json +++ b/package.json @@ -41,7 +41,7 @@ "company": "Elasticsearch BV" }, "devDependencies": { - "@sinonjs/fake-timers": "^6.0.1", + "@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1", "@types/node": "^12.6.2", "convert-hrtime": "^3.0.0", "dedent": "^0.7.0", diff --git a/test/unit/helpers/msearch.test.js b/test/unit/helpers/msearch.test.js index 6f6c94b4e..cfd5415c0 100644 --- a/test/unit/helpers/msearch.test.js +++ b/test/unit/helpers/msearch.test.js @@ -637,8 +637,6 @@ test('Flush interval', t => { test('Flush interval - early stop', t => { t.plan(3) - const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] }) - t.teardown(() => clock.uninstall()) const MockConnection = connection.buildMockConnection({ onRequest (params) { @@ -672,7 +670,6 @@ test('Flush interval - early stop', t => { }) setImmediate(() => { - clock.next() m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.ok(err instanceof errors.ConfigurationError) })