From 57fbbd0a8f7f2716583b403966ed61f04164420d Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Mon, 25 May 2020 17:19:44 +0200 Subject: [PATCH] [Backport 7.x] Add multi search helper (#1202) Co-authored-by: Tomas Della Vedova --- docs/helpers.asciidoc | 126 ++++- lib/Helpers.d.ts | 23 +- lib/Helpers.js | 255 +++++++++- test/integration/helpers/msearch.test.js | 106 +++++ test/types/helpers.test-d.ts | 31 +- test/unit/helpers/msearch.test.js | 580 +++++++++++++++++++++++ 6 files changed, 1102 insertions(+), 19 deletions(-) create mode 100644 test/integration/helpers/msearch.test.js create mode 100644 test/unit/helpers/msearch.test.js diff --git a/docs/helpers.asciidoc b/docs/helpers.asciidoc index b8aa72399..cea848156 100644 --- a/docs/helpers.asciidoc +++ b/docs/helpers.asciidoc @@ -7,6 +7,9 @@ CAUTION: The client helpers are experimental, and the API may change in the next The helpers will not work in any Node.js version lower than 10. === Bulk Helper + +~Added~ ~in~ ~`v7.7.0`~ + Running Bulk requests can be complex due to the shape of the API, this helper aims to provide a nicer developer experience around the Bulk API. ==== Usage @@ -114,7 +117,7 @@ const b = client.helpers.bulk({ ---- |`wait` -a|How much time to wait before retries in milliseconds.+ +a|How much time to wait before retries in milliseconds. + _Default:_ 5000. [source,js] ---- @@ -211,7 +214,123 @@ const result = await client.helpers.bulk({ console.log(result) ---- +=== Multi Search Helper + +~Added~ ~in~ ~`v7.8.0`~ + +If you are sending search request at a high rate, this helper might be useful for you. +It will use the mutli search API under the hood to batch the requests and improve the overall performances of your application. + +The `result` exposes a `documents` property as well, which allows you to access directly the hits sources. + +==== Usage +[source,js] +---- +const { Client } = require('@elastic/elasticsearch') + +const client = new Client({ node: 'http://localhost:9200' }) +const s = client.helpers.msearch() + +// promise style API +s.search( + { index: 'stackoverflow' }, + { query: { match: { title: 'javascript' } } } + ) + .then(result => console.log(result.body)) // or result.documents + .catch(err => console.error(err)) + +// callback style API +s.search( + { index: 'stackoverflow' }, + { query: { match: { title: 'ruby' } } }, + (err, result) => { + if (err) console.error(err) + console.log(result.body)) // or result.documents + } +) +---- + +To create a new instance of the Msearch helper, you should access it as shown in the example above, the configuration options are: +[cols=2*] +|=== +|`operations` +a|How many search operations should be sent in a single msearch request. + +_Default:_ `20` +[source,js] +---- +const b = client.helpers.msearch({ + operations: 10 +}) +---- + +|`concurrency` +a|How many request will be executed at the same time. + +_Default:_ `5` +[source,js] +---- +const b = client.helpers.msearch({ + concurrency: 10 +}) +---- + +|`retries` +a|How many times an operation will be retried before to resolve the request. An operation will be retried only in case of a 429 error. + +_Default:_ Client max retries. +[source,js] +---- +const b = client.helpers.msearch({ + retries: 3 +}) +---- + +|`wait` +a|How much time to wait before retries in milliseconds. + +_Default:_ 5000. +[source,js] +---- +const b = client.helpers.msearch({ + wait: 3000 +}) +---- + +|=== + +==== Stopping the Msearch Helper +If needed, you can stop a msearch processor at any time. The msearch helper returns a https://promisesaplus.com/[thenable], which has an `stop` method. + +If you are creating multiple msearch helpers instances and using them for a limitied period of time, remember to always use the `stop` method once you have finished using them, otherwise your application will start leaking memory. + +The `stop` method accepts an optional error, that will be dispatched every subsequent search request. + +NOTE: The stop method will stop the execution of the msearch processor, but if you are using a concurrency higher than one, the operations that are already running will not be stopped. + +[source,js] +---- +const { Client } = require('@elastic/elasticsearch') + +const client = new Client({ node: 'http://localhost:9200' }) +const s = client.helpers.msearch() + +s.search( + { index: 'stackoverflow' }, + { query: { match: { title: 'javascript' } } } + ) + .then(result => console.log(result.body)) + .catch(err => console.error(err)) + +s.search( + { index: 'stackoverflow' }, + { query: { match: { title: 'ruby' } } } + ) + .then(result => console.log(result.body)) + .catch(err => console.error(err)) + +setImmediate(() => s.stop()) +---- + === Search Helper + +~Added~ ~in~ ~`v7.7.0`~ + A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents source. For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the querystring. @@ -234,6 +353,9 @@ for (const doc of documents) { ---- === Scroll Search Helper + +~Added~ ~in~ ~`v7.7.0`~ + This helpers offers a simple and intuitive way to use the scroll search API. Once called, it returns an https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of[async iterator] which can be used in conjuction with a for-await...of. + It handles automatically the `429` error and uses the client's `maxRetries` option. @@ -281,6 +403,8 @@ for await (const result of scrollSearch) { === Scroll Documents Helper +~Added~ ~in~ ~`v7.7.0`~ + It works in the same way as the scroll search helper, but it returns only the documents instead. Note, every loop cycle will return you a single document, and you can't use the `clear` method. For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the querystring. diff --git a/lib/Helpers.d.ts b/lib/Helpers.d.ts index 00328f999..e1d9d521a 100644 --- a/lib/Helpers.d.ts +++ b/lib/Helpers.d.ts @@ -3,13 +3,14 @@ // See the LICENSE file in the project root for more information import { Readable as ReadableStream } from 'stream' -import { TransportRequestOptions, ApiResponse, RequestBody } from './Transport' -import { Search, Bulk } from '../api/requestParams' +import { TransportRequestOptions, ApiError, ApiResponse, RequestBody } from './Transport' +import { Search, Msearch, Bulk } from '../api/requestParams' export default class Helpers { search>(params: Search, options?: TransportRequestOptions): Promise scrollSearch, TRequestBody extends RequestBody = Record, TContext = unknown>(params: Search, options?: TransportRequestOptions): AsyncIterable> scrollDocuments>(params: Search, options?: TransportRequestOptions): AsyncIterable + msearch(options?: MsearchHelperOptions): MsearchHelper bulk(options: BulkHelperOptions): BulkHelper } @@ -70,8 +71,8 @@ export interface BulkHelperOptions extends Omit) => void, + wait?: number + onDrop?: (doc: OnDropDocument) => void refreshOnCompletion?: boolean | string } @@ -87,4 +88,18 @@ export interface OnDropDocument { } document: TDocument retried: boolean +} + +export interface MsearchHelperOptions extends Omit { + operations?: number + concurrency?: number + retries?: number + wait?: number +} + +declare type callbackFn = (err: ApiError, result: ApiResponse) => void; +export interface MsearchHelper extends Promise { + stop(error?: Error): void + search, TRequestBody extends RequestBody = Record, TContext = unknown>(header: Omit, body: TRequestBody): Promise> + search, TRequestBody extends RequestBody = Record, TContext = unknown>(header: Omit, body: TRequestBody, callback: callbackFn): void } \ No newline at end of file diff --git a/lib/Helpers.js b/lib/Helpers.js index 2c00f88d4..8d0a4b6c4 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -6,12 +6,12 @@ /* eslint camelcase: 0 */ +const { Readable } = require('stream') const { promisify } = require('util') const { ResponseError, ConfigurationError } = require('./errors') const pImmediate = promisify(setImmediate) const sleep = promisify(setTimeout) -const kGetHits = Symbol('elasticsearch-get-hits') const kClient = Symbol('elasticsearch-client') const noop = () => {} @@ -21,13 +21,6 @@ class Helpers { this.maxRetries = opts.maxRetries } - [kGetHits] (body) { - if (body.hits && body.hits.hits) { - return body.hits.hits.map(d => d._source) - } - return [] - } - /** * Runs a search operation. The only difference between client.search and this utility, * is that we are only returning the hits to the user and not the full ES response. @@ -39,8 +32,11 @@ class Helpers { */ async search (params, options) { appendFilterPath('hits.hits._source', params, true) - const response = await this[kClient].search(params, options) - return this[kGetHits](response.body) + const { body } = await this[kClient].search(params, options) + if (body.hits && body.hits.hits) { + return body.hits.hits.map(d => d._source) + } + return [] } /** @@ -94,7 +90,7 @@ class Helpers { while (response.body.hits && response.body.hits.hits.length > 0) { scroll_id = response.body._scroll_id response.clear = clear - response.documents = this[kGetHits](response.body) + addDocumentsGetter(response) yield response @@ -140,11 +136,233 @@ class Helpers { } } + /** + * Creates a msearch helper instance. Once you configure it, you can use the provided + * `search` method to add new searches in the queue. + * @param {object} options - The configuration of the msearch operations. + * @return {object} The possible operations to run. + */ + msearch (options = {}) { + // TODO: add an interval to force flush the body + // to handle the slow producer problem + const client = this[kClient] + const { + operations = 20, + concurrency = 5, + retries = this.maxRetries, + wait = 5000, + ...msearchOptions + } = options + + let stopReading = false + let stopError = null + const operationsStream = new Readable({ + objectMode: true, + read (size) {} + }) + + const p = iterate() + + return { + then (onFulfilled, onRejected) { + return p.then(onFulfilled, onRejected) + }, + catch (onRejected) { + return p.catch(onRejected) + }, + stop (error = null) { + stopReading = true + stopError = error + operationsStream.push(null) + }, + // TODO: support abort a single search? + search (header, body, callback) { + if (stopReading === true) { + const error = stopError === null + ? new ConfigurationError('The msearch processor has been stopped') + : stopError + return callback ? callback(error, {}) : Promise.reject(error) + } + + if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) { + const error = new ConfigurationError('The header should be an object') + return callback ? callback(error, {}) : Promise.reject(error) + } + + if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) { + const error = new ConfigurationError('The body should be an object') + return callback ? callback(error, {}) : Promise.reject(error) + } + + let promise = null + if (callback === undefined) { + let onFulfilled = null + let onRejected = null + promise = new Promise((resolve, reject) => { + onFulfilled = resolve + onRejected = reject + }) + callback = function callback (err, result) { + err ? onRejected(err) : onFulfilled(result) + } + } + + operationsStream.push([header, body, callback]) + + if (promise !== null) { + return promise + } + } + } + + async function iterate () { + const { semaphore, finish } = buildSemaphore() + const msearchBody = [] + const callbacks = [] + let loadedOperations = 0 + + for await (const operation of operationsStream) { + loadedOperations += 1 + msearchBody.push(operation[0], operation[1]) + callbacks.push(operation[2]) + if (loadedOperations >= operations) { + const send = await semaphore() + send(msearchBody.slice(), callbacks.slice()) + msearchBody.length = 0 + callbacks.length = 0 + loadedOperations = 0 + } + } + + // In some cases the previos http call does not have finished, + // or we didn't reach the flush bytes threshold, so we force one last operation. + if (loadedOperations > 0) { + const send = await semaphore() + send(msearchBody, callbacks) + } + + await finish() + + if (stopError !== null) { + throw stopError + } + } + + // This function builds a semaphore using the concurrency + // options of the msearch helper. It is used inside the iterator + // to guarantee that no more than the number of operations + // allowed to run at the same time are executed. + // It returns a semaphore function which resolves in the next tick + // if we didn't reach the maximim concurrency yet, otherwise it returns + // a promise that resolves as soon as one of the running request has finshed. + // The semaphore function resolves a send function, which will be used + // to send the actual msearch request. + // It also returns a finish function, which returns a promise that is resolved + // when there are no longer request running. + function buildSemaphore () { + let resolveSemaphore = null + let resolveFinish = null + let running = 0 + + return { semaphore, finish } + + function finish () { + return new Promise((resolve, reject) => { + if (running === 0) { + resolve() + } else { + resolveFinish = resolve + } + }) + } + + function semaphore () { + if (running < concurrency) { + return pImmediate(send) + } else { + return new Promise((resolve, reject) => { + resolveSemaphore = resolve + }) + } + } + + function send (msearchBody, callbacks) { + /* istanbul ignore if */ + if (running >= concurrency) { + throw new Error('Max concurrency reached') + } + running += 1 + msearchOperation(msearchBody, callbacks, () => { + running -= 1 + if (resolveSemaphore) { + resolveSemaphore(send) + resolveSemaphore = null + } else if (resolveFinish && running === 0) { + resolveFinish() + } + }) + } + } + + function msearchOperation (msearchBody, callbacks, done) { + let retryCount = retries + + // Instead of going full on async-await, which would make the code easier to read, + // we have decided to use callback style instead. + // This because every time we use async await, V8 will create multiple promises + // behind the scenes, making the code slightly slower. + tryMsearch(msearchBody, callbacks, retrySearch) + function retrySearch (msearchBody, callbacks) { + if (msearchBody.length > 0 && retryCount > 0) { + retryCount -= 1 + setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch) + return + } + + done() + } + + // This function never returns an error, if the msearch operation fails, + // the error is dispatched to all search executors. + function tryMsearch (msearchBody, callbacks, done) { + client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => { + const retryBody = [] + const retryCallbacks = [] + if (err) { + addDocumentsGetter(results) + for (const callback of callbacks) { + callback(err, results) + } + return done(retryBody, retryCallbacks) + } + const { responses } = results.body + for (let i = 0, len = responses.length; i < len; i++) { + const response = responses[i] + if (response.status === 429 && retryCount > 0) { + retryBody.push(msearchBody[i * 2]) + retryBody.push(msearchBody[(i * 2) + 1]) + retryCallbacks.push(callbacks[i]) + continue + } + const result = { ...results, body: response } + addDocumentsGetter(result) + if (response.status >= 400) { + callbacks[i](new ResponseError(result), result) + } else { + callbacks[i](null, result) + } + } + done(retryBody, retryCallbacks) + }) + } + } + } + /** * Creates a bulk helper instance. Once you configure it, you can pick which operation * to execute with the given dataset, index, create, update, and delete. * @param {object} options - The configuration of the bulk operation. - * @return {object} The possible orations to run with the datasource. + * @return {object} The possible operations to run with the datasource. */ bulk (options) { // TODO: add an interval to force flush the body @@ -436,6 +654,19 @@ class Helpers { } } +// Using a getter will improve the overall performances of the code, +// as we will reed the documents only if needed. +function addDocumentsGetter (result) { + Object.defineProperty(result, 'documents', { + get () { + if (this.body.hits && this.body.hits.hits) { + return this.body.hits.hits.map(d => d._source) + } + return [] + } + }) +} + function appendFilterPath (filter, params, force) { if (params.filter_path !== undefined) { params.filter_path += ',' + filter diff --git a/test/integration/helpers/msearch.test.js b/test/integration/helpers/msearch.test.js new file mode 100644 index 000000000..e95be5745 --- /dev/null +++ b/test/integration/helpers/msearch.test.js @@ -0,0 +1,106 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const { createReadStream } = require('fs') +const { join } = require('path') +const split = require('split2') +const { test, beforeEach, afterEach } = require('tap') +const { waitCluster } = require('../../utils') +const { Client, errors } = require('../../../') + +const INDEX = `test-helpers-${process.pid}` +const client = new Client({ + node: process.env.TEST_ES_SERVER || 'http://localhost:9200' +}) + +beforeEach(async () => { + await waitCluster(client) + await client.indices.create({ index: INDEX }) + const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'stackoverflow.ndjson')) + const result = await client.helpers.bulk({ + datasource: stream.pipe(split()), + refreshOnCompletion: true, + onDocument (doc) { + return { + index: { _index: INDEX } + } + } + }) + if (result.failed > 0) { + throw new Error('Failed bulk indexing docs') + } +}) + +afterEach(async () => { + await client.indices.delete({ index: INDEX }, { ignore: 404 }) +}) + +test('Basic', t => { + t.plan(4) + const s = client.helpers.msearch({ operations: 1 }) + + s.search( + { index: INDEX }, + { query: { match: { title: 'javascript' } } }, + (err, result) => { + t.error(err) + t.strictEqual(result.body.hits.total.value, 106) + } + ) + + s.search( + { index: INDEX }, + { query: { match: { title: 'ruby' } } }, + (err, result) => { + t.error(err) + t.strictEqual(result.body.hits.total.value, 29) + } + ) + + t.teardown(() => s.stop()) +}) + +test('Bad request', t => { + t.plan(3) + const s = client.helpers.msearch({ operations: 1 }) + + s.search( + { index: INDEX }, + { query: { match: { title: 'javascript' } } }, + (err, result) => { + t.error(err) + t.strictEqual(result.body.hits.total.value, 106) + } + ) + + s.search( + { index: INDEX }, + { query: { foo: { title: 'ruby' } } }, + (err, result) => { + t.true(err instanceof errors.ResponseError) + } + ) + + t.teardown(() => s.stop()) +}) + +test('Send multiple request concurrently over the concurrency limit', t => { + t.plan(20) + const s = client.helpers.msearch({ operations: 1 }) + + for (let i = 0; i < 10; i++) { + s.search( + { index: INDEX }, + { query: { match: { title: 'javascript' } } }, + (err, result) => { + t.error(err) + t.strictEqual(result.body.hits.total.value, 106) + } + ) + } + + t.teardown(() => s.stop()) +}) diff --git a/test/types/helpers.test-d.ts b/test/types/helpers.test-d.ts index b42090b3a..137ab5505 100644 --- a/test/types/helpers.test-d.ts +++ b/test/types/helpers.test-d.ts @@ -9,8 +9,10 @@ import { BulkStats, BulkHelperOptions, ScrollSearchResponse, - OnDropDocument + OnDropDocument, + MsearchHelper } from '../../lib/Helpers' +import { ApiResponse, ApiError } from '../../lib/Transport' const client = new Client({ node: 'http://localhost:9200' @@ -429,4 +431,29 @@ expectError( expectType>(p) expectType(await p) -} \ No newline at end of file +} + +/// .helpers.msearch + +const s = client.helpers.msearch({ + operations: 20, + concurrency: 5, + retries: 5, + wait: 5000 +}) + +expectType(s) +expectType(s.stop()) +expectType(s.stop(new Error('kaboom'))) + +expectType, unknown>>>(s.search({ index: 'foo'}, { query: {} })) +expectType>>(s.search, string>({ index: 'foo'}, { query: {} })) + +expectType(s.search({ index: 'foo'}, { query: {} }, (err, result) => { + expectType(err) + expectType(result) +})) +expectType(s.search, string>({ index: 'foo'}, { query: {} }, (err, result) => { + expectType(err) + expectType>(result) +})) \ No newline at end of file diff --git a/test/unit/helpers/msearch.test.js b/test/unit/helpers/msearch.test.js new file mode 100644 index 000000000..8c1c64fd6 --- /dev/null +++ b/test/unit/helpers/msearch.test.js @@ -0,0 +1,580 @@ +// Licensed to Elasticsearch B.V under one or more agreements. +// Elasticsearch B.V licenses this file to you under the Apache 2.0 License. +// See the LICENSE file in the project root for more information + +'use strict' + +const { test } = require('tap') +const { Client, errors } = require('../../../') +const { connection } = require('../../utils') + +test('Basic', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 1 }) + + const result = await s.search( + { index: 'test' }, + { query: { match: { foo: 'bar' } } } + ) + + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { one: 'one' }, + { two: 'two' }, + { three: 'three' } + ]) + + t.teardown(() => s.stop()) +}) + +test('Multiple searches (inside async iterator)', t => { + t.plan(6) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }, { + status: 200, + hits: { + hits: [ + { _source: { four: 'four' } }, + { _source: { five: 'five' } }, + { _source: { six: 'six' } } + ] + } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 2 }) + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.error(err) + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { one: 'one' }, + { two: 'two' }, + { three: 'three' } + ]) + }) + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.error(err) + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { four: 'four' } }, + { _source: { five: 'five' } }, + { _source: { six: 'six' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { four: 'four' }, + { five: 'five' }, + { six: 'six' } + ]) + }) + + t.teardown(() => s.stop()) +}) + +test('Multiple searches (async iterator exits)', t => { + t.plan(6) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }, { + status: 200, + hits: { + hits: [ + { _source: { four: 'four' } }, + { _source: { five: 'five' } }, + { _source: { six: 'six' } } + ] + } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.search({ index: 'test' }, { query: {} }, (err, result) => { + t.error(err) + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { one: 'one' }, + { two: 'two' }, + { three: 'three' } + ]) + }) + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.error(err) + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { four: 'four' } }, + { _source: { five: 'five' } }, + { _source: { six: 'six' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { four: 'four' }, + { five: 'five' }, + { six: 'six' } + ]) + }) + + setImmediate(() => s.stop()) +}) + +test('Stop a msearch processor (promises)', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return {} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 1 }) + + s.stop() + + try { + await s.search( + { index: 'test' }, + { query: { match: { foo: 'bar' } } } + ) + } catch (err) { + t.strictEqual(err.message, 'The msearch processor has been stopped') + } + + t.teardown(() => s.stop()) +}) + +test('Stop a msearch processor (callbacks)', t => { + t.plan(1) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return {} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.stop() + + s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.strictEqual(err.message, 'The msearch processor has been stopped') + }) +}) + +test('Bad header', t => { + t.plan(1) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return {} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => { + t.strictEqual(err.message, 'The header should be an object') + }) + + t.teardown(() => s.stop()) +}) + +test('Bad body', t => { + t.plan(1) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return {} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.search({ index: 'test' }, null, (err, result) => { + t.strictEqual(err.message, 'The body should be an object') + }) + + t.teardown(() => s.stop()) +}) + +test('Retry on 429', async t => { + let count = 0 + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + if (count++ === 0) { + return { + body: { + responses: [{ + status: 429, + error: {} + }] + } + } + } else { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }] + } + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 1, wait: 10 }) + + const result = await s.search( + { index: 'test' }, + { query: { match: { foo: 'bar' } } } + ) + + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { one: 'one' }, + { two: 'two' }, + { three: 'three' } + ]) + + t.teardown(() => s.stop()) +}) + +test('Single search errors', async t => { + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 400, + error: { foo: 'bar' } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 1 }) + + try { + await s.search( + { index: 'test' }, + { query: { match: { foo: 'bar' } } } + ) + } catch (err) { + t.true(err instanceof errors.ResponseError) + } + + t.teardown(() => s.stop()) +}) + +test('Entire msearch fails', t => { + t.plan(4) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + statusCode: 500, + body: { + status: 500, + error: { foo: 'bar' } + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 1 }) + + s.search({ index: 'test' }, { query: {} }, (err, result) => { + t.true(err instanceof errors.ResponseError) + t.deepEqual(result.documents, []) + }) + + s.search({ index: 'test' }, { query: {} }, (err, result) => { + t.true(err instanceof errors.ResponseError) + t.deepEqual(result.documents, []) + }) + + t.teardown(() => s.stop()) +}) + +test('Resolves the msearch helper', t => { + t.plan(1) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return {} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.stop() + + s.then( + () => t.pass('called'), + e => t.fail('Should not fail') + ) + + s.catch(e => t.fail('Should not fail')) +}) + +test('Stop the msearch helper with an error', t => { + t.plan(3) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return {} + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch() + + s.stop(new Error('kaboom')) + + s.then( + () => t.fail('Should fail'), + err => t.is(err.message, 'kaboom') + ) + + s.catch(err => t.is(err.message, 'kaboom')) + + s.search({ index: 'test' }, { query: {} }, (err, result) => { + t.is(err.message, 'kaboom') + }) +}) + +test('Multiple searches (concurrency = 1)', t => { + t.plan(6) + + const MockConnection = connection.buildMockConnection({ + onRequest (params) { + return { + body: { + responses: [{ + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }] + } + } + } + }) + + const client = new Client({ + node: 'http://localhost:9200', + Connection: MockConnection + }) + + const s = client.helpers.msearch({ operations: 1, concurrency: 1 }) + + s.search({ index: 'test' }, { query: {} }, (err, result) => { + t.error(err) + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { one: 'one' }, + { two: 'two' }, + { three: 'three' } + ]) + }) + + s.search({ index: 'test' }, { query: {} }, (err, result) => { + t.error(err) + t.deepEqual(result.body, { + status: 200, + hits: { + hits: [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] + } + }) + + t.deepEqual(result.documents, [ + { one: 'one' }, + { two: 'two' }, + { three: 'three' } + ]) + }) + + t.teardown(() => s.stop()) +})