From 56e28766c82d5733990e31f382c8434450400560 Mon Sep 17 00:00:00 2001 From: Josh Mock Date: Thu, 15 Jun 2023 11:14:56 -0500 Subject: [PATCH] Backport bulk helper index drift bugfix to 7.x (#1915) --- lib/Helpers.js | 3 +- package.json | 2 +- test/unit/helpers/bulk.test.js | 64 ++++++++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/lib/Helpers.js b/lib/Helpers.js index cd78c392d..8302050ce 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -705,11 +705,11 @@ class Helpers { } const retry = [] const { items } = body + let indexSlice = 0 for (let i = 0, len = items.length; i < len; i++) { const action = items[i] const operation = Object.keys(action)[0] const { status } = action[operation] - const indexSlice = operation !== 'delete' ? i * 2 : i if (status >= 400) { // 429 is the only staus code where we might want to retry @@ -736,6 +736,7 @@ class Helpers { } else { stats.successful += 1 } + operation === 'delete' ? indexSlice += 1 : indexSlice += 2 } callback(null, retry) }) diff --git a/package.json b/package.json index 151181b67..d796e264e 100644 --- a/package.json +++ b/package.json @@ -12,7 +12,7 @@ }, "homepage": "http://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/index.html", "version": "7.17.11", - "versionCanary": "7.17.11-canary.0", + "versionCanary": "7.17.11-canary.1", "keywords": [ "elasticsearch", "elastic", diff --git a/test/unit/helpers/bulk.test.js b/test/unit/helpers/bulk.test.js index b8a00a1f0..33ca8b570 100644 --- a/test/unit/helpers/bulk.test.js +++ b/test/unit/helpers/bulk.test.js @@ -1082,6 +1082,70 @@ test('bulk delete', t => { server.stop() }) + t.test('Should call onDrop on the correct document when doing a mix of operations that includes deletes', async t => { + // checks to ensure onDrop doesn't provide the wrong document when some operations are deletes + // see https://github.com/elastic/elasticsearch-js/issues/1751 + async function handler (req, res) { + res.setHeader('content-type', 'application/json') + res.end(JSON.stringify({ + took: 0, + errors: true, + items: [ + { delete: { status: 200 } }, + { index: { status: 429 } }, + { index: { status: 200 } } + ] + })) + } + + const [{ port }, server] = await buildServer(handler) + const client = new Client({ node: `http://localhost:${port}` }) + let counter = 0 + const result = await client.helpers.bulk({ + datasource: dataset.slice(), + concurrency: 1, + wait: 10, + retries: 0, + onDocument (doc) { + counter++ + if (counter === 1) { + return { + delete: { + _index: 'test', + _id: String(counter) + } + } + } else { + return { + index: { + _index: 'test' + } + } + } + }, + onDrop (doc) { + t.same(doc, { + status: 429, + error: null, + operation: { index: { _index: 'test' } }, + document: { user: 'arya', age: 18 }, + retried: false + }) + } + }) + + t.type(result.time, 'number') + t.type(result.bytes, 'number') + t.match(result, { + total: 3, + successful: 2, + retry: 0, + failed: 1, + aborted: false + }) + server.stop() + }) + t.end() })