Add multi search helper (#1186)

This commit is contained in:
Tomas Della Vedova
2020-05-25 15:37:01 +02:00
committed by GitHub
parent 84217fc737
commit 1a25b623b0
6 changed files with 1102 additions and 19 deletions

View File

@ -7,6 +7,9 @@ CAUTION: The client helpers are experimental, and the API may change in the next
The helpers will not work in any Node.js version lower than 10. The helpers will not work in any Node.js version lower than 10.
=== Bulk Helper === Bulk Helper
~Added~ ~in~ ~`v7.7.0`~
Running Bulk requests can be complex due to the shape of the API, this helper aims to provide a nicer developer experience around the Bulk API. Running Bulk requests can be complex due to the shape of the API, this helper aims to provide a nicer developer experience around the Bulk API.
==== Usage ==== Usage
@ -114,7 +117,7 @@ const b = client.helpers.bulk({
---- ----
|`wait` |`wait`
a|How much time to wait before retries in milliseconds.+ a|How much time to wait before retries in milliseconds. +
_Default:_ 5000. _Default:_ 5000.
[source,js] [source,js]
---- ----
@ -211,7 +214,123 @@ const result = await client.helpers.bulk({
console.log(result) console.log(result)
---- ----
=== Multi Search Helper
~Added~ ~in~ ~`v7.8.0`~
If you are sending search request at a high rate, this helper might be useful for you.
It will use the mutli search API under the hood to batch the requests and improve the overall performances of your application. +
The `result` exposes a `documents` property as well, which allows you to access directly the hits sources.
==== Usage
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const s = client.helpers.msearch()
// promise style API
s.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'javascript' } } }
)
.then(result => console.log(result.body)) // or result.documents
.catch(err => console.error(err))
// callback style API
s.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'ruby' } } },
(err, result) => {
if (err) console.error(err)
console.log(result.body)) // or result.documents
}
)
----
To create a new instance of the Msearch helper, you should access it as shown in the example above, the configuration options are:
[cols=2*]
|===
|`operations`
a|How many search operations should be sent in a single msearch request. +
_Default:_ `20`
[source,js]
----
const b = client.helpers.msearch({
operations: 10
})
----
|`concurrency`
a|How many request will be executed at the same time. +
_Default:_ `5`
[source,js]
----
const b = client.helpers.msearch({
concurrency: 10
})
----
|`retries`
a|How many times an operation will be retried before to resolve the request. An operation will be retried only in case of a 429 error. +
_Default:_ Client max retries.
[source,js]
----
const b = client.helpers.msearch({
retries: 3
})
----
|`wait`
a|How much time to wait before retries in milliseconds. +
_Default:_ 5000.
[source,js]
----
const b = client.helpers.msearch({
wait: 3000
})
----
|===
==== Stopping the Msearch Helper
If needed, you can stop a msearch processor at any time. The msearch helper returns a https://promisesaplus.com/[thenable], which has an `stop` method.
If you are creating multiple msearch helpers instances and using them for a limitied period of time, remember to always use the `stop` method once you have finished using them, otherwise your application will start leaking memory.
The `stop` method accepts an optional error, that will be dispatched every subsequent search request.
NOTE: The stop method will stop the execution of the msearch processor, but if you are using a concurrency higher than one, the operations that are already running will not be stopped.
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const s = client.helpers.msearch()
s.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'javascript' } } }
)
.then(result => console.log(result.body))
.catch(err => console.error(err))
s.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'ruby' } } }
)
.then(result => console.log(result.body))
.catch(err => console.error(err))
setImmediate(() => s.stop())
----
=== Search Helper === Search Helper
~Added~ ~in~ ~`v7.7.0`~
A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents source. A simple wrapper around the search API. Instead of returning the entire `result` object it will return only the search documents source.
For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the querystring. For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the querystring.
@ -234,6 +353,9 @@ for (const doc of documents) {
---- ----
=== Scroll Search Helper === Scroll Search Helper
~Added~ ~in~ ~`v7.7.0`~
This helpers offers a simple and intuitive way to use the scroll search API. Once called, it returns an https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of[async iterator] which can be used in conjuction with a for-await...of. + This helpers offers a simple and intuitive way to use the scroll search API. Once called, it returns an https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of[async iterator] which can be used in conjuction with a for-await...of. +
It handles automatically the `429` error and uses the client's `maxRetries` option. It handles automatically the `429` error and uses the client's `maxRetries` option.
@ -281,6 +403,8 @@ for await (const result of scrollSearch) {
=== Scroll Documents Helper === Scroll Documents Helper
~Added~ ~in~ ~`v7.7.0`~
It works in the same way as the scroll search helper, but it returns only the documents instead. Note, every loop cycle will return you a single document, and you can't use the `clear` method. It works in the same way as the scroll search helper, but it returns only the documents instead. Note, every loop cycle will return you a single document, and you can't use the `clear` method.
For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the querystring. For improving the performances, this helper automatically adds `filter_path=hits.hits._source` to the querystring.

23
lib/Helpers.d.ts vendored
View File

@ -3,13 +3,14 @@
// See the LICENSE file in the project root for more information // See the LICENSE file in the project root for more information
import { Readable as ReadableStream } from 'stream' import { Readable as ReadableStream } from 'stream'
import { TransportRequestOptions, ApiResponse, RequestBody } from './Transport' import { TransportRequestOptions, ApiError, ApiResponse, RequestBody } from './Transport'
import { Search, Bulk } from '../api/requestParams' import { Search, Msearch, Bulk } from '../api/requestParams'
export default class Helpers { export default class Helpers {
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]> search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>> scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument> scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
msearch(options?: MsearchHelperOptions): MsearchHelper
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats> bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
} }
@ -70,8 +71,8 @@ export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body
flushBytes?: number flushBytes?: number
concurrency?: number concurrency?: number
retries?: number retries?: number
wait?: number, wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void, onDrop?: (doc: OnDropDocument<TDocument>) => void
refreshOnCompletion?: boolean | string refreshOnCompletion?: boolean | string
} }
@ -87,4 +88,18 @@ export interface OnDropDocument<TDocument = unknown> {
} }
document: TDocument document: TDocument
retried: boolean retried: boolean
}
export interface MsearchHelperOptions extends Omit<Msearch, 'body'> {
operations?: number
concurrency?: number
retries?: number
wait?: number
}
declare type callbackFn<Response, Context> = (err: ApiError, result: ApiResponse<Response, Context>) => void;
export interface MsearchHelper extends Promise<void> {
stop(error?: Error): void
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>>
search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void
} }

View File

@ -6,12 +6,12 @@
/* eslint camelcase: 0 */ /* eslint camelcase: 0 */
const { Readable } = require('stream')
const { promisify } = require('util') const { promisify } = require('util')
const { ResponseError, ConfigurationError } = require('./errors') const { ResponseError, ConfigurationError } = require('./errors')
const pImmediate = promisify(setImmediate) const pImmediate = promisify(setImmediate)
const sleep = promisify(setTimeout) const sleep = promisify(setTimeout)
const kGetHits = Symbol('elasticsearch-get-hits')
const kClient = Symbol('elasticsearch-client') const kClient = Symbol('elasticsearch-client')
const noop = () => {} const noop = () => {}
@ -21,13 +21,6 @@ class Helpers {
this.maxRetries = opts.maxRetries this.maxRetries = opts.maxRetries
} }
[kGetHits] (body) {
if (body.hits && body.hits.hits) {
return body.hits.hits.map(d => d._source)
}
return []
}
/** /**
* Runs a search operation. The only difference between client.search and this utility, * Runs a search operation. The only difference between client.search and this utility,
* is that we are only returning the hits to the user and not the full ES response. * is that we are only returning the hits to the user and not the full ES response.
@ -39,8 +32,11 @@ class Helpers {
*/ */
async search (params, options) { async search (params, options) {
appendFilterPath('hits.hits._source', params, true) appendFilterPath('hits.hits._source', params, true)
const response = await this[kClient].search(params, options) const { body } = await this[kClient].search(params, options)
return this[kGetHits](response.body) if (body.hits && body.hits.hits) {
return body.hits.hits.map(d => d._source)
}
return []
} }
/** /**
@ -94,7 +90,7 @@ class Helpers {
while (response.body.hits && response.body.hits.hits.length > 0) { while (response.body.hits && response.body.hits.hits.length > 0) {
scroll_id = response.body._scroll_id scroll_id = response.body._scroll_id
response.clear = clear response.clear = clear
response.documents = this[kGetHits](response.body) addDocumentsGetter(response)
yield response yield response
@ -140,11 +136,233 @@ class Helpers {
} }
} }
/**
* Creates a msearch helper instance. Once you configure it, you can use the provided
* `search` method to add new searches in the queue.
* @param {object} options - The configuration of the msearch operations.
* @return {object} The possible operations to run.
*/
msearch (options = {}) {
// TODO: add an interval to force flush the body
// to handle the slow producer problem
const client = this[kClient]
const {
operations = 20,
concurrency = 5,
retries = this.maxRetries,
wait = 5000,
...msearchOptions
} = options
let stopReading = false
let stopError = null
const operationsStream = new Readable({
objectMode: true,
read (size) {}
})
const p = iterate()
return {
then (onFulfilled, onRejected) {
return p.then(onFulfilled, onRejected)
},
catch (onRejected) {
return p.catch(onRejected)
},
stop (error = null) {
stopReading = true
stopError = error
operationsStream.push(null)
},
// TODO: support abort a single search?
search (header, body, callback) {
if (stopReading === true) {
const error = stopError === null
? new ConfigurationError('The msearch processor has been stopped')
: stopError
return callback ? callback(error, {}) : Promise.reject(error)
}
if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) {
const error = new ConfigurationError('The header should be an object')
return callback ? callback(error, {}) : Promise.reject(error)
}
if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) {
const error = new ConfigurationError('The body should be an object')
return callback ? callback(error, {}) : Promise.reject(error)
}
let promise = null
if (callback === undefined) {
let onFulfilled = null
let onRejected = null
promise = new Promise((resolve, reject) => {
onFulfilled = resolve
onRejected = reject
})
callback = function callback (err, result) {
err ? onRejected(err) : onFulfilled(result)
}
}
operationsStream.push([header, body, callback])
if (promise !== null) {
return promise
}
}
}
async function iterate () {
const { semaphore, finish } = buildSemaphore()
const msearchBody = []
const callbacks = []
let loadedOperations = 0
for await (const operation of operationsStream) {
loadedOperations += 1
msearchBody.push(operation[0], operation[1])
callbacks.push(operation[2])
if (loadedOperations >= operations) {
const send = await semaphore()
send(msearchBody.slice(), callbacks.slice())
msearchBody.length = 0
callbacks.length = 0
loadedOperations = 0
}
}
// In some cases the previos http call does not have finished,
// or we didn't reach the flush bytes threshold, so we force one last operation.
if (loadedOperations > 0) {
const send = await semaphore()
send(msearchBody, callbacks)
}
await finish()
if (stopError !== null) {
throw stopError
}
}
// This function builds a semaphore using the concurrency
// options of the msearch helper. It is used inside the iterator
// to guarantee that no more than the number of operations
// allowed to run at the same time are executed.
// It returns a semaphore function which resolves in the next tick
// if we didn't reach the maximim concurrency yet, otherwise it returns
// a promise that resolves as soon as one of the running request has finshed.
// The semaphore function resolves a send function, which will be used
// to send the actual msearch request.
// It also returns a finish function, which returns a promise that is resolved
// when there are no longer request running.
function buildSemaphore () {
let resolveSemaphore = null
let resolveFinish = null
let running = 0
return { semaphore, finish }
function finish () {
return new Promise((resolve, reject) => {
if (running === 0) {
resolve()
} else {
resolveFinish = resolve
}
})
}
function semaphore () {
if (running < concurrency) {
return pImmediate(send)
} else {
return new Promise((resolve, reject) => {
resolveSemaphore = resolve
})
}
}
function send (msearchBody, callbacks) {
/* istanbul ignore if */
if (running >= concurrency) {
throw new Error('Max concurrency reached')
}
running += 1
msearchOperation(msearchBody, callbacks, () => {
running -= 1
if (resolveSemaphore) {
resolveSemaphore(send)
resolveSemaphore = null
} else if (resolveFinish && running === 0) {
resolveFinish()
}
})
}
}
function msearchOperation (msearchBody, callbacks, done) {
let retryCount = retries
// Instead of going full on async-await, which would make the code easier to read,
// we have decided to use callback style instead.
// This because every time we use async await, V8 will create multiple promises
// behind the scenes, making the code slightly slower.
tryMsearch(msearchBody, callbacks, retrySearch)
function retrySearch (msearchBody, callbacks) {
if (msearchBody.length > 0 && retryCount > 0) {
retryCount -= 1
setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch)
return
}
done()
}
// This function never returns an error, if the msearch operation fails,
// the error is dispatched to all search executors.
function tryMsearch (msearchBody, callbacks, done) {
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => {
const retryBody = []
const retryCallbacks = []
if (err) {
addDocumentsGetter(results)
for (const callback of callbacks) {
callback(err, results)
}
return done(retryBody, retryCallbacks)
}
const { responses } = results.body
for (let i = 0, len = responses.length; i < len; i++) {
const response = responses[i]
if (response.status === 429 && retryCount > 0) {
retryBody.push(msearchBody[i * 2])
retryBody.push(msearchBody[(i * 2) + 1])
retryCallbacks.push(callbacks[i])
continue
}
const result = { ...results, body: response }
addDocumentsGetter(result)
if (response.status >= 400) {
callbacks[i](new ResponseError(result), result)
} else {
callbacks[i](null, result)
}
}
done(retryBody, retryCallbacks)
})
}
}
}
/** /**
* Creates a bulk helper instance. Once you configure it, you can pick which operation * Creates a bulk helper instance. Once you configure it, you can pick which operation
* to execute with the given dataset, index, create, update, and delete. * to execute with the given dataset, index, create, update, and delete.
* @param {object} options - The configuration of the bulk operation. * @param {object} options - The configuration of the bulk operation.
* @return {object} The possible orations to run with the datasource. * @return {object} The possible operations to run with the datasource.
*/ */
bulk (options) { bulk (options) {
// TODO: add an interval to force flush the body // TODO: add an interval to force flush the body
@ -436,6 +654,19 @@ class Helpers {
} }
} }
// Using a getter will improve the overall performances of the code,
// as we will reed the documents only if needed.
function addDocumentsGetter (result) {
Object.defineProperty(result, 'documents', {
get () {
if (this.body.hits && this.body.hits.hits) {
return this.body.hits.hits.map(d => d._source)
}
return []
}
})
}
function appendFilterPath (filter, params, force) { function appendFilterPath (filter, params, force) {
if (params.filter_path !== undefined) { if (params.filter_path !== undefined) {
params.filter_path += ',' + filter params.filter_path += ',' + filter

View File

@ -0,0 +1,106 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const { createReadStream } = require('fs')
const { join } = require('path')
const split = require('split2')
const { test, beforeEach, afterEach } = require('tap')
const { waitCluster } = require('../../utils')
const { Client, errors } = require('../../../')
const INDEX = `test-helpers-${process.pid}`
const client = new Client({
node: process.env.TEST_ES_SERVER || 'http://localhost:9200'
})
beforeEach(async () => {
await waitCluster(client)
await client.indices.create({ index: INDEX })
const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'stackoverflow.ndjson'))
const result = await client.helpers.bulk({
datasource: stream.pipe(split()),
refreshOnCompletion: true,
onDocument (doc) {
return {
index: { _index: INDEX }
}
}
})
if (result.failed > 0) {
throw new Error('Failed bulk indexing docs')
}
})
afterEach(async () => {
await client.indices.delete({ index: INDEX }, { ignore: 404 })
})
test('Basic', t => {
t.plan(4)
const s = client.helpers.msearch({ operations: 1 })
s.search(
{ index: INDEX },
{ query: { match: { title: 'javascript' } } },
(err, result) => {
t.error(err)
t.strictEqual(result.body.hits.total.value, 106)
}
)
s.search(
{ index: INDEX },
{ query: { match: { title: 'ruby' } } },
(err, result) => {
t.error(err)
t.strictEqual(result.body.hits.total.value, 29)
}
)
t.teardown(() => s.stop())
})
test('Bad request', t => {
t.plan(3)
const s = client.helpers.msearch({ operations: 1 })
s.search(
{ index: INDEX },
{ query: { match: { title: 'javascript' } } },
(err, result) => {
t.error(err)
t.strictEqual(result.body.hits.total.value, 106)
}
)
s.search(
{ index: INDEX },
{ query: { foo: { title: 'ruby' } } },
(err, result) => {
t.true(err instanceof errors.ResponseError)
}
)
t.teardown(() => s.stop())
})
test('Send multiple request concurrently over the concurrency limit', t => {
t.plan(20)
const s = client.helpers.msearch({ operations: 1 })
for (let i = 0; i < 10; i++) {
s.search(
{ index: INDEX },
{ query: { match: { title: 'javascript' } } },
(err, result) => {
t.error(err)
t.strictEqual(result.body.hits.total.value, 106)
}
)
}
t.teardown(() => s.stop())
})

View File

@ -9,8 +9,10 @@ import {
BulkStats, BulkStats,
BulkHelperOptions, BulkHelperOptions,
ScrollSearchResponse, ScrollSearchResponse,
OnDropDocument OnDropDocument,
MsearchHelper
} from '../../lib/Helpers' } from '../../lib/Helpers'
import { ApiResponse, ApiError } from '../../lib/Transport'
const client = new Client({ const client = new Client({
node: 'http://localhost:9200' node: 'http://localhost:9200'
@ -429,4 +431,29 @@ expectError(
expectType<Promise<Source[]>>(p) expectType<Promise<Source[]>>(p)
expectType<Source[]>(await p) expectType<Source[]>(await p)
} }
/// .helpers.msearch
const s = client.helpers.msearch({
operations: 20,
concurrency: 5,
retries: 5,
wait: 5000
})
expectType<MsearchHelper>(s)
expectType<void>(s.stop())
expectType<void>(s.stop(new Error('kaboom')))
expectType<Promise<ApiResponse<Record<string, any>, unknown>>>(s.search({ index: 'foo'}, { query: {} }))
expectType<Promise<ApiResponse<string, string>>>(s.search<string, Record<string, any>, string>({ index: 'foo'}, { query: {} }))
expectType<void>(s.search({ index: 'foo'}, { query: {} }, (err, result) => {
expectType<ApiError>(err)
expectType<ApiResponse>(result)
}))
expectType<void>(s.search<string, Record<string, any>, string>({ index: 'foo'}, { query: {} }, (err, result) => {
expectType<ApiError>(err)
expectType<ApiResponse<string, string>>(result)
}))

View File

@ -0,0 +1,580 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const { test } = require('tap')
const { Client, errors } = require('../../../')
const { connection } = require('../../utils')
test('Basic', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 1 })
const result = await s.search(
{ index: 'test' },
{ query: { match: { foo: 'bar' } } }
)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
})
t.deepEqual(result.documents, [
{ one: 'one' },
{ two: 'two' },
{ three: 'three' }
])
t.teardown(() => s.stop())
})
test('Multiple searches (inside async iterator)', t => {
t.plan(6)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}, {
status: 200,
hits: {
hits: [
{ _source: { four: 'four' } },
{ _source: { five: 'five' } },
{ _source: { six: 'six' } }
]
}
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 2 })
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.error(err)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
})
t.deepEqual(result.documents, [
{ one: 'one' },
{ two: 'two' },
{ three: 'three' }
])
})
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.error(err)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { four: 'four' } },
{ _source: { five: 'five' } },
{ _source: { six: 'six' } }
]
}
})
t.deepEqual(result.documents, [
{ four: 'four' },
{ five: 'five' },
{ six: 'six' }
])
})
t.teardown(() => s.stop())
})
test('Multiple searches (async iterator exits)', t => {
t.plan(6)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}, {
status: 200,
hits: {
hits: [
{ _source: { four: 'four' } },
{ _source: { five: 'five' } },
{ _source: { six: 'six' } }
]
}
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.search({ index: 'test' }, { query: {} }, (err, result) => {
t.error(err)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
})
t.deepEqual(result.documents, [
{ one: 'one' },
{ two: 'two' },
{ three: 'three' }
])
})
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.error(err)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { four: 'four' } },
{ _source: { five: 'five' } },
{ _source: { six: 'six' } }
]
}
})
t.deepEqual(result.documents, [
{ four: 'four' },
{ five: 'five' },
{ six: 'six' }
])
})
setImmediate(() => s.stop())
})
test('Stop a msearch processor (promises)', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 1 })
s.stop()
try {
await s.search(
{ index: 'test' },
{ query: { match: { foo: 'bar' } } }
)
} catch (err) {
t.strictEqual(err.message, 'The msearch processor has been stopped')
}
t.teardown(() => s.stop())
})
test('Stop a msearch processor (callbacks)', t => {
t.plan(1)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.stop()
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.strictEqual(err.message, 'The msearch processor has been stopped')
})
})
test('Bad header', t => {
t.plan(1)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => {
t.strictEqual(err.message, 'The header should be an object')
})
t.teardown(() => s.stop())
})
test('Bad body', t => {
t.plan(1)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.search({ index: 'test' }, null, (err, result) => {
t.strictEqual(err.message, 'The body should be an object')
})
t.teardown(() => s.stop())
})
test('Retry on 429', async t => {
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
if (count++ === 0) {
return {
body: {
responses: [{
status: 429,
error: {}
}]
}
}
} else {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}]
}
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 1, wait: 10 })
const result = await s.search(
{ index: 'test' },
{ query: { match: { foo: 'bar' } } }
)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
})
t.deepEqual(result.documents, [
{ one: 'one' },
{ two: 'two' },
{ three: 'three' }
])
t.teardown(() => s.stop())
})
test('Single search errors', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 400,
error: { foo: 'bar' }
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 1 })
try {
await s.search(
{ index: 'test' },
{ query: { match: { foo: 'bar' } } }
)
} catch (err) {
t.true(err instanceof errors.ResponseError)
}
t.teardown(() => s.stop())
})
test('Entire msearch fails', t => {
t.plan(4)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
statusCode: 500,
body: {
status: 500,
error: { foo: 'bar' }
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 1 })
s.search({ index: 'test' }, { query: {} }, (err, result) => {
t.true(err instanceof errors.ResponseError)
t.deepEqual(result.documents, [])
})
s.search({ index: 'test' }, { query: {} }, (err, result) => {
t.true(err instanceof errors.ResponseError)
t.deepEqual(result.documents, [])
})
t.teardown(() => s.stop())
})
test('Resolves the msearch helper', t => {
t.plan(1)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.stop()
s.then(
() => t.pass('called'),
e => t.fail('Should not fail')
)
s.catch(e => t.fail('Should not fail'))
})
test('Stop the msearch helper with an error', t => {
t.plan(3)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch()
s.stop(new Error('kaboom'))
s.then(
() => t.fail('Should fail'),
err => t.is(err.message, 'kaboom')
)
s.catch(err => t.is(err.message, 'kaboom'))
s.search({ index: 'test' }, { query: {} }, (err, result) => {
t.is(err.message, 'kaboom')
})
})
test('Multiple searches (concurrency = 1)', t => {
t.plan(6)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return {
body: {
responses: [{
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}]
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const s = client.helpers.msearch({ operations: 1, concurrency: 1 })
s.search({ index: 'test' }, { query: {} }, (err, result) => {
t.error(err)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
})
t.deepEqual(result.documents, [
{ one: 'one' },
{ two: 'two' },
{ three: 'three' }
])
})
s.search({ index: 'test' }, { query: {} }, (err, result) => {
t.error(err)
t.deepEqual(result.body, {
status: 200,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
})
t.deepEqual(result.documents, [
{ one: 'one' },
{ two: 'two' },
{ three: 'three' }
])
})
t.teardown(() => s.stop())
})