From 0087c4998797681dff538ae41be14881bfa6e41a Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 4 Jun 2020 13:35:38 +0200 Subject: [PATCH] [Backport 7.x] Improve helper concurrency (#1216) Co-authored-by: Tomas Della Vedova --- docs/helpers.asciidoc | 24 ++--- lib/Helpers.js | 10 +- test/integration/helpers/msearch.test.js | 22 ++-- test/unit/helpers/msearch.test.js | 126 +++++++++++------------ 4 files changed, 92 insertions(+), 90 deletions(-) diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc index 37f747f82..8435ffa3e 100644 --- a/docs/helpers.asciidoc +++ b/docs/helpers.asciidoc @@ -238,10 +238,10 @@ The `result` exposes a `documents` property as well, which allows you to access const { Client } = require('@elastic/elasticsearch') const client = new Client({ node: 'http://localhost:9200' }) -const s = client.helpers.msearch() +const m = client.helpers.msearch() // promise style API -s.search( +m.search( { index: 'stackoverflow' }, { query: { match: { title: 'javascript' } } } ) @@ -249,7 +249,7 @@ s.search( .catch(err => console.error(err)) // callback style API -s.search( +m.search( { index: 'stackoverflow' }, { query: { match: { title: 'ruby' } } }, (err, result) => { @@ -267,7 +267,7 @@ a|How many search operations should be sent in a single msearch request. + _Default:_ `5` [source,js] ---- -const b = client.helpers.msearch({ +const m = client.helpers.msearch({ operations: 10 }) ---- @@ -277,7 +277,7 @@ a|How much time (in milliseconds) the helper will wait before flushing the opera _Default:_ `500` [source,js] ---- -const b = client.helpers.msearch({ +const m = client.helpers.msearch({ flushInterval: 500 }) ---- @@ -287,7 +287,7 @@ a|How many request will be executed at the same time. + _Default:_ `5` [source,js] ---- -const b = client.helpers.msearch({ +const m = client.helpers.msearch({ concurrency: 10 }) ---- @@ -297,7 +297,7 @@ a|How many times an operation will be retried before to resolve the request. An _Default:_ Client max retries. [source,js] ---- -const b = client.helpers.msearch({ +const m = client.helpers.msearch({ retries: 3 }) ---- @@ -307,7 +307,7 @@ a|How much time to wait before retries in milliseconds. + _Default:_ 5000. [source,js] ---- -const b = client.helpers.msearch({ +const m = client.helpers.msearch({ wait: 3000 }) ---- @@ -328,23 +328,23 @@ NOTE: The stop method will stop the execution of the msearch processor, but if y const { Client } = require('@elastic/elasticsearch') const client = new Client({ node: 'http://localhost:9200' }) -const s = client.helpers.msearch() +const m = client.helpers.msearch() -s.search( +m.search( { index: 'stackoverflow' }, { query: { match: { title: 'javascript' } } } ) .then(result => console.log(result.body)) .catch(err => console.error(err)) -s.search( +m.search( { index: 'stackoverflow' }, { query: { match: { title: 'ruby' } } } ) .then(result => console.log(result.body)) .catch(err => console.error(err)) -setImmediate(() => s.stop()) +setImmediate(() => m.stop()) ---- === Search Helper diff --git a/lib/Helpers.js b/lib/Helpers.js index a8f33c6c0..e920e8f96 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -301,6 +301,7 @@ class Helpers { function semaphore () { if (running < concurrency) { + running += 1 return pImmediate(send) } else { return new Promise((resolve, reject) => { @@ -311,13 +312,13 @@ class Helpers { function send (msearchBody, callbacks) { /* istanbul ignore if */ - if (running >= concurrency) { + if (running > concurrency) { throw new Error('Max concurrency reached') } - running += 1 msearchOperation(msearchBody, callbacks, () => { running -= 1 if (resolveSemaphore) { + running += 1 resolveSemaphore(send) resolveSemaphore = null } else if (resolveFinish && running === 0) { @@ -575,6 +576,7 @@ class Helpers { function semaphore () { if (running < concurrency) { + running += 1 return pImmediate(send) } else { return new Promise((resolve, reject) => { @@ -585,10 +587,9 @@ class Helpers { function send (bulkBody) { /* istanbul ignore if */ - if (running >= concurrency) { + if (running > concurrency) { throw new Error('Max concurrency reached') } - running += 1 bulkOperation(bulkBody, err => { running -= 1 if (err) { @@ -596,6 +597,7 @@ class Helpers { error = err } if (resolveSemaphore) { + running += 1 resolveSemaphore(send) resolveSemaphore = null } else if (resolveFinish && running === 0) { diff --git a/test/integration/helpers/msearch.test.js b/test/integration/helpers/msearch.test.js index e95be5745..d3a4f3ba8 100644 --- a/test/integration/helpers/msearch.test.js +++ b/test/integration/helpers/msearch.test.js @@ -40,9 +40,9 @@ afterEach(async () => { test('Basic', t => { t.plan(4) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) - s.search( + m.search( { index: INDEX }, { query: { match: { title: 'javascript' } } }, (err, result) => { @@ -51,7 +51,7 @@ test('Basic', t => { } ) - s.search( + m.search( { index: INDEX }, { query: { match: { title: 'ruby' } } }, (err, result) => { @@ -60,14 +60,14 @@ test('Basic', t => { } ) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Bad request', t => { t.plan(3) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) - s.search( + m.search( { index: INDEX }, { query: { match: { title: 'javascript' } } }, (err, result) => { @@ -76,7 +76,7 @@ test('Bad request', t => { } ) - s.search( + m.search( { index: INDEX }, { query: { foo: { title: 'ruby' } } }, (err, result) => { @@ -84,15 +84,15 @@ test('Bad request', t => { } ) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Send multiple request concurrently over the concurrency limit', t => { t.plan(20) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) for (let i = 0; i < 10; i++) { - s.search( + m.search( { index: INDEX }, { query: { match: { title: 'javascript' } } }, (err, result) => { @@ -102,5 +102,5 @@ test('Send multiple request concurrently over the concurrency limit', t => { ) } - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) diff --git a/test/unit/helpers/msearch.test.js b/test/unit/helpers/msearch.test.js index 64e79dcf1..6f6c94b4e 100644 --- a/test/unit/helpers/msearch.test.js +++ b/test/unit/helpers/msearch.test.js @@ -34,9 +34,9 @@ test('Basic', async t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) - const result = await s.search( + const result = await m.search( { index: 'test' }, { query: { match: { foo: 'bar' } } } ) @@ -58,7 +58,7 @@ test('Basic', async t => { { three: 'three' } ]) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Multiple searches (inside async iterator)', t => { @@ -97,9 +97,9 @@ test('Multiple searches (inside async iterator)', t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 2 }) + const m = client.helpers.msearch({ operations: 2 }) - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.error(err) t.deepEqual(result.body, { status: 200, @@ -119,7 +119,7 @@ test('Multiple searches (inside async iterator)', t => { ]) }) - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.error(err) t.deepEqual(result.body, { status: 200, @@ -139,7 +139,7 @@ test('Multiple searches (inside async iterator)', t => { ]) }) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Multiple searches (async iterator exits)', t => { @@ -178,9 +178,9 @@ test('Multiple searches (async iterator exits)', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.search({ index: 'test' }, { query: {} }, (err, result) => { + m.search({ index: 'test' }, { query: {} }, (err, result) => { t.error(err) t.deepEqual(result.body, { status: 200, @@ -200,7 +200,7 @@ test('Multiple searches (async iterator exits)', t => { ]) }) - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.error(err) t.deepEqual(result.body, { status: 200, @@ -220,7 +220,7 @@ test('Multiple searches (async iterator exits)', t => { ]) }) - setImmediate(() => s.stop()) + setImmediate(() => m.stop()) }) test('Stop a msearch processor (promises)', async t => { @@ -235,12 +235,12 @@ test('Stop a msearch processor (promises)', async t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) - s.stop() + m.stop() try { - await s.search( + await m.search( { index: 'test' }, { query: { match: { foo: 'bar' } } } ) @@ -248,7 +248,7 @@ test('Stop a msearch processor (promises)', async t => { t.strictEqual(err.message, 'The msearch processor has been stopped') } - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Stop a msearch processor (callbacks)', t => { @@ -265,11 +265,11 @@ test('Stop a msearch processor (callbacks)', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.stop() + m.stop() - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.strictEqual(err.message, 'The msearch processor has been stopped') }) }) @@ -288,13 +288,13 @@ test('Bad header', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => { t.strictEqual(err.message, 'The header should be an object') }) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Bad body', t => { @@ -311,13 +311,13 @@ test('Bad body', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.search({ index: 'test' }, null, (err, result) => { + m.search({ index: 'test' }, null, (err, result) => { t.strictEqual(err.message, 'The body should be an object') }) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Retry on 429', async t => { @@ -357,9 +357,9 @@ test('Retry on 429', async t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 1, wait: 10 }) + const m = client.helpers.msearch({ operations: 1, wait: 10 }) - const result = await s.search( + const result = await m.search( { index: 'test' }, { query: { match: { foo: 'bar' } } } ) @@ -381,7 +381,7 @@ test('Retry on 429', async t => { { three: 'three' } ]) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Single search errors', async t => { @@ -403,10 +403,10 @@ test('Single search errors', async t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) try { - await s.search( + await m.search( { index: 'test' }, { query: { match: { foo: 'bar' } } } ) @@ -414,7 +414,7 @@ test('Single search errors', async t => { t.true(err instanceof errors.ResponseError) } - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Entire msearch fails', t => { @@ -437,19 +437,19 @@ test('Entire msearch fails', t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 1 }) + const m = client.helpers.msearch({ operations: 1 }) - s.search({ index: 'test' }, { query: {} }, (err, result) => { + m.search({ index: 'test' }, { query: {} }, (err, result) => { t.true(err instanceof errors.ResponseError) t.deepEqual(result.documents, []) }) - s.search({ index: 'test' }, { query: {} }, (err, result) => { + m.search({ index: 'test' }, { query: {} }, (err, result) => { t.true(err instanceof errors.ResponseError) t.deepEqual(result.documents, []) }) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Resolves the msearch helper', t => { @@ -466,16 +466,16 @@ test('Resolves the msearch helper', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.stop() + m.stop() - s.then( + m.then( () => t.pass('called'), e => t.fail('Should not fail') ) - s.catch(e => t.fail('Should not fail')) + m.catch(e => t.fail('Should not fail')) }) test('Stop the msearch helper with an error', t => { @@ -492,18 +492,18 @@ test('Stop the msearch helper with an error', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.stop(new Error('kaboom')) + m.stop(new Error('kaboom')) - s.then( + m.then( () => t.fail('Should fail'), err => t.is(err.message, 'kaboom') ) - s.catch(err => t.is(err.message, 'kaboom')) + m.catch(err => t.is(err.message, 'kaboom')) - s.search({ index: 'test' }, { query: {} }, (err, result) => { + m.search({ index: 'test' }, { query: {} }, (err, result) => { t.is(err.message, 'kaboom') }) }) @@ -535,9 +535,9 @@ test('Multiple searches (concurrency = 1)', t => { Connection: MockConnection }) - const s = client.helpers.msearch({ operations: 1, concurrency: 1 }) + const m = client.helpers.msearch({ operations: 1, concurrency: 1 }) - s.search({ index: 'test' }, { query: {} }, (err, result) => { + m.search({ index: 'test' }, { query: {} }, (err, result) => { t.error(err) t.deepEqual(result.body, { status: 200, @@ -557,7 +557,7 @@ test('Multiple searches (concurrency = 1)', t => { ]) }) - s.search({ index: 'test' }, { query: {} }, (err, result) => { + m.search({ index: 'test' }, { query: {} }, (err, result) => { t.error(err) t.deepEqual(result.body, { status: 200, @@ -577,7 +577,7 @@ test('Multiple searches (concurrency = 1)', t => { ]) }) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Flush interval', t => { @@ -618,21 +618,21 @@ test('Flush interval', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.error(err) t.is(result.documents.length, 3) }) - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.error(err) t.is(result.documents.length, 3) }) setImmediate(clock.next) - t.teardown(() => s.stop()) + t.teardown(() => m.stop()) }) test('Flush interval - early stop', t => { @@ -664,21 +664,21 @@ test('Flush interval - early stop', t => { Connection: MockConnection }) - const s = client.helpers.msearch() + const m = client.helpers.msearch() - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.error(err) t.is(result.documents.length, 3) }) setImmediate(() => { clock.next() - s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { t.ok(err instanceof errors.ConfigurationError) }) }) - s.stop() + m.stop() }) test('Stop should resolve the helper', t => { @@ -699,10 +699,10 @@ test('Stop should resolve the helper', t => { Connection: MockConnection }) - const s = client.helpers.msearch() - setImmediate(s.stop) + const m = client.helpers.msearch() + setImmediate(m.stop) - s.then(() => t.pass('Called')) + m.then(() => t.pass('Called')) .catch(() => t.fail('Should not fail')) }) @@ -724,13 +724,13 @@ test('Stop should resolve the helper (error)', t => { Connection: MockConnection }) - const s = client.helpers.msearch() - setImmediate(s.stop, new Error('kaboom')) + const m = client.helpers.msearch() + setImmediate(m.stop, new Error('kaboom')) - s.then(() => t.fail('Should not fail')) + m.then(() => t.fail('Should not fail')) .catch(err => t.is(err.message, 'kaboom')) - s.catch(err => t.is(err.message, 'kaboom')) + m.catch(err => t.is(err.message, 'kaboom')) - s.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom')) + m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom')) })