From ea009da3b6ff97914b17b3a3fbf63f630a71ed8c Mon Sep 17 00:00:00 2001 From: Tomas Della Vedova Date: Mon, 12 Oct 2020 10:46:51 +0200 Subject: [PATCH] Scroll search should clear the scroll at the end (#1331) --- lib/Helpers.js | 8 ++- test/unit/helpers/scroll.test.js | 103 +++++++++++++++++-------------- 2 files changed, 65 insertions(+), 46 deletions(-) diff --git a/lib/Helpers.js b/lib/Helpers.js index ac8509764..6a3524b5f 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -104,13 +104,15 @@ class Helpers { } while (response.body.hits && response.body.hits.hits.length > 0) { + // scroll id is always present in the response, but it might + // change over time based on the number of shards scroll_id = response.body._scroll_id response.clear = clear addDocumentsGetter(response) yield response - if (!scroll_id || stop === true) { + if (stop === true) { break } @@ -127,6 +129,10 @@ class Helpers { throw new ResponseError(response) } } + + if (stop === false) { + await clear() + } } /** diff --git a/test/unit/helpers/scroll.test.js b/test/unit/helpers/scroll.test.js index cfa26c969..ca50957ee 100644 --- a/test/unit/helpers/scroll.test.js +++ b/test/unit/helpers/scroll.test.js @@ -27,17 +27,26 @@ test('Scroll search', async t => { var count = 0 const MockConnection = connection.buildMockConnection({ onRequest (params) { - t.strictEqual(params.querystring, 'scroll=1m') + count += 1 + if (params.method === 'POST') { + t.strictEqual(params.querystring, 'scroll=1m') + } + if (count === 4) { + // final automated clear + t.strictEqual(params.method, 'DELETE') + } return { body: { - _scroll_id: count === 3 ? undefined : 'id', + _scroll_id: 'id', count, hits: { - hits: [ - { _source: { one: 'one' } }, - { _source: { two: 'two' } }, - { _source: { three: 'three' } } - ] + hits: count === 3 + ? [] + : [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] } } } @@ -56,12 +65,7 @@ test('Scroll search', async t => { for await (const result of scrollSearch) { t.strictEqual(result.body.count, count) - if (count < 3) { - t.strictEqual(result.body._scroll_id, 'id') - } else { - t.strictEqual(result.body._scroll_id, undefined) - } - count += 1 + t.strictEqual(result.body._scroll_id, 'id') } }) @@ -115,21 +119,27 @@ test('Scroll search (retry)', async t => { var count = 0 const MockConnection = connection.buildMockConnection({ onRequest (params) { + count += 1 if (count === 1) { - count += 1 return { body: {}, statusCode: 429 } } + if (count === 5) { + // final automated clear + t.strictEqual(params.method, 'DELETE') + } return { statusCode: 200, body: { - _scroll_id: count === 4 ? undefined : 'id', + _scroll_id: 'id', count, hits: { - hits: [ - { _source: { one: 'one' } }, - { _source: { two: 'two' } }, - { _source: { three: 'three' } } - ] + hits: count === 4 + ? [] + : [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] } } } @@ -151,12 +161,7 @@ test('Scroll search (retry)', async t => { for await (const result of scrollSearch) { t.strictEqual(result.body.count, count) t.notStrictEqual(result.body.count, 1) - if (count < 4) { - t.strictEqual(result.body._scroll_id, 'id') - } else { - t.strictEqual(result.body._scroll_id, undefined) - } - count += 1 + t.strictEqual(result.body._scroll_id, 'id') } }) @@ -198,20 +203,20 @@ test('Scroll search (retry throws and maxRetries)', async t => { test('Scroll search (retry throws later)', async t => { const maxRetries = 5 - const expectedAttempts = maxRetries + 1 + const expectedAttempts = maxRetries + 2 var count = 0 const MockConnection = connection.buildMockConnection({ onRequest (params) { - // filter_path should not be added if is not already present + count += 1 + // filter_path should not be added if is not already present t.strictEqual(params.querystring, 'scroll=1m') if (count > 1) { - count += 1 return { body: {}, statusCode: 429 } } return { statusCode: 200, body: { - _scroll_id: count === 4 ? undefined : 'id', + _scroll_id: 'id', count, hits: { hits: [ @@ -227,7 +232,8 @@ test('Scroll search (retry throws later)', async t => { const client = new Client({ node: 'http://localhost:9200', - Connection: MockConnection + Connection: MockConnection, + maxRetries }) const scrollSearch = client.helpers.scrollSearch({ @@ -240,7 +246,6 @@ test('Scroll search (retry throws later)', async t => { try { for await (const result of scrollSearch) { // eslint-disable-line t.strictEqual(result.body.count, count) - count += 1 } } catch (err) { t.true(err instanceof errors.ResponseError) @@ -256,19 +261,23 @@ test('Scroll search documents', async t => { if (count === 0) { t.strictEqual(params.querystring, 'filter_path=hits.hits._source%2C_scroll_id&scroll=1m') } else { - t.strictEqual(params.querystring, 'scroll=1m') - t.strictEqual(params.body, '{"scroll_id":"id"}') + if (params.method !== 'DELETE') { + t.strictEqual(params.querystring, 'scroll=1m') + t.strictEqual(params.body, '{"scroll_id":"id"}') + } } return { body: { - _scroll_id: count === 3 ? undefined : 'id', + _scroll_id: 'id', count, hits: { - hits: [ - { _source: { val: 1 * count } }, - { _source: { val: 2 * count } }, - { _source: { val: 3 * count } } - ] + hits: count === 3 + ? [] + : [ + { _source: { val: 1 * count } }, + { _source: { val: 2 * count } }, + { _source: { val: 3 * count } } + ] } } } @@ -339,15 +348,19 @@ test('Fix querystring for scroll search', async t => { if (count === 0) { t.strictEqual(params.querystring, 'size=1&scroll=1m') } else { - t.strictEqual(params.querystring, 'scroll=1m') + if (params.method !== 'DELETE') { + t.strictEqual(params.querystring, 'scroll=1m') + } } return { body: { - _scroll_id: count === 3 ? undefined : 'id', + _scroll_id: 'id', hits: { - hits: [ - { _source: { val: count } } - ] + hits: count === 3 + ? [] + : [ + { _source: { val: count } } + ] } } }