[Backport 7.x] Add multi search helper (#1202)
Co-authored-by: Tomas Della Vedova <delvedor@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
35e587663c
commit
57fbbd0a8f
23
lib/Helpers.d.ts
vendored
23
lib/Helpers.d.ts
vendored
@ -3,13 +3,14 @@
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
import { Readable as ReadableStream } from 'stream'
|
||||
import { TransportRequestOptions, ApiResponse, RequestBody } from './Transport'
|
||||
import { Search, Bulk } from '../api/requestParams'
|
||||
import { TransportRequestOptions, ApiError, ApiResponse, RequestBody } from './Transport'
|
||||
import { Search, Msearch, Bulk } from '../api/requestParams'
|
||||
|
||||
export default class Helpers {
  /**
   * Runs a search and returns only the hit sources (`hits.hits._source`),
   * not the full Elasticsearch response.
   */
  search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
  /**
   * Runs a scroll search, yielding each page as a response object enriched
   * with a lazy `documents` getter and a `clear` method.
   */
  scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
  /**
   * Runs a scroll search, yielding each hit source one at a time.
   */
  scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
  /**
   * Creates a msearch helper that batches queued searches into multi-search
   * requests. The returned helper is also a promise that settles once the
   * helper has been stopped and every in-flight request has finished.
   */
  msearch(options?: MsearchHelperOptions): MsearchHelper
  /**
   * Creates a bulk helper for running bulk operations (index, create,
   * update, delete) over a datasource.
   */
  bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
}
|
||||
|
||||
@ -70,8 +71,8 @@ export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body
|
||||
flushBytes?: number
|
||||
concurrency?: number
|
||||
retries?: number
|
||||
wait?: number,
|
||||
onDrop?: (doc: OnDropDocument<TDocument>) => void,
|
||||
wait?: number
|
||||
onDrop?: (doc: OnDropDocument<TDocument>) => void
|
||||
refreshOnCompletion?: boolean | string
|
||||
}
|
||||
|
||||
@ -87,4 +88,18 @@ export interface OnDropDocument<TDocument = unknown> {
|
||||
}
|
||||
document: TDocument
|
||||
retried: boolean
|
||||
}
|
||||
|
||||
/**
 * Configuration for the msearch helper. All remaining properties are
 * forwarded to the underlying msearch API call (minus `body`).
 */
export interface MsearchHelperOptions extends Omit<Msearch, 'body'> {
  // How many search operations are buffered before a msearch request is flushed (default: 20).
  operations?: number
  // Maximum number of msearch requests allowed in flight at the same time (default: 5).
  concurrency?: number
  // How many times an operation is retried on a 429 response (default: the client's maxRetries).
  retries?: number
  // Milliseconds to wait before retrying an operation (default: 5000).
  wait?: number
}
|
||||
|
||||
// Node-style callback: invoked with either an API error or the search response.
declare type callbackFn<Response, Context> = (err: ApiError, result: ApiResponse<Response, Context>) => void;
/**
 * Handle returned by `helpers.msearch()`. It is a thenable that settles once
 * the helper is stopped and all in-flight msearch requests have completed.
 */
export interface MsearchHelper extends Promise<void> {
  // Stops the helper. Searches added afterwards are rejected with `error`,
  // or with a ConfigurationError when no error is provided.
  stop(error?: Error): void
  // Queues a search; promise form (used when no callback is passed).
  search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>>
  // Queues a search; callback form.
  search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void
}
|
||||
255
lib/Helpers.js
255
lib/Helpers.js
@ -6,12 +6,12 @@
|
||||
|
||||
/* eslint camelcase: 0 */
|
||||
|
||||
const { Readable } = require('stream')
|
||||
const { promisify } = require('util')
|
||||
const { ResponseError, ConfigurationError } = require('./errors')
|
||||
|
||||
const pImmediate = promisify(setImmediate)
|
||||
const sleep = promisify(setTimeout)
|
||||
const kGetHits = Symbol('elasticsearch-get-hits')
|
||||
const kClient = Symbol('elasticsearch-client')
|
||||
const noop = () => {}
|
||||
|
||||
@ -21,13 +21,6 @@ class Helpers {
|
||||
this.maxRetries = opts.maxRetries
|
||||
}
|
||||
|
||||
[kGetHits] (body) {
|
||||
if (body.hits && body.hits.hits) {
|
||||
return body.hits.hits.map(d => d._source)
|
||||
}
|
||||
return []
|
||||
}
|
||||
|
||||
/**
|
||||
* Runs a search operation. The only difference between client.search and this utility,
|
||||
* is that we are only returning the hits to the user and not the full ES response.
|
||||
@ -39,8 +32,11 @@ class Helpers {
|
||||
*/
|
||||
async search (params, options) {
|
||||
appendFilterPath('hits.hits._source', params, true)
|
||||
const response = await this[kClient].search(params, options)
|
||||
return this[kGetHits](response.body)
|
||||
const { body } = await this[kClient].search(params, options)
|
||||
if (body.hits && body.hits.hits) {
|
||||
return body.hits.hits.map(d => d._source)
|
||||
}
|
||||
return []
|
||||
}
|
||||
|
||||
/**
|
||||
@ -94,7 +90,7 @@ class Helpers {
|
||||
while (response.body.hits && response.body.hits.hits.length > 0) {
|
||||
scroll_id = response.body._scroll_id
|
||||
response.clear = clear
|
||||
response.documents = this[kGetHits](response.body)
|
||||
addDocumentsGetter(response)
|
||||
|
||||
yield response
|
||||
|
||||
@ -140,11 +136,233 @@ class Helpers {
|
||||
}
|
||||
}
|
||||
|
||||
  /**
   * Creates a msearch helper instance. Once you configure it, you can use the provided
   * `search` method to add new searches in the queue.
   * @param {object} options - The configuration of the msearch operations.
   * @return {object} The possible operations to run.
   */
  msearch (options = {}) {
    // TODO: add an interval to force flush the body
    // to handle the slow producer problem
    const client = this[kClient]
    const {
      operations = 20,
      concurrency = 5,
      retries = this.maxRetries,
      wait = 5000,
      ...msearchOptions
    } = options

    // Once stopReading is set, new searches are rejected (with stopError when provided).
    let stopReading = false
    let stopError = null
    // Object-mode stream used as the queue of [header, body, callback] operations.
    const operationsStream = new Readable({
      objectMode: true,
      read (size) {}
    })

    const p = iterate()

    return {
      // The helper is thenable: it settles when iterate() completes,
      // i.e. after stop() has been called and all requests have finished.
      then (onFulfilled, onRejected) {
        return p.then(onFulfilled, onRejected)
      },
      catch (onRejected) {
        return p.catch(onRejected)
      },
      // Stops the helper and closes the queue; an optional error is
      // rethrown by the helper promise once everything has drained.
      stop (error = null) {
        stopReading = true
        stopError = error
        operationsStream.push(null)
      },
      // TODO: support abort a single search?
      // Queues a search. Returns a promise when no callback is given.
      search (header, body, callback) {
        if (stopReading === true) {
          const error = stopError === null
            ? new ConfigurationError('The msearch processor has been stopped')
            : stopError
          return callback ? callback(error, {}) : Promise.reject(error)
        }

        if (!(typeof header === 'object' && header !== null && !Array.isArray(header))) {
          const error = new ConfigurationError('The header should be an object')
          return callback ? callback(error, {}) : Promise.reject(error)
        }

        if (!(typeof body === 'object' && body !== null && !Array.isArray(body))) {
          const error = new ConfigurationError('The body should be an object')
          return callback ? callback(error, {}) : Promise.reject(error)
        }

        // Promise form: synthesize a callback that settles the promise.
        let promise = null
        if (callback === undefined) {
          let onFulfilled = null
          let onRejected = null
          promise = new Promise((resolve, reject) => {
            onFulfilled = resolve
            onRejected = reject
          })
          callback = function callback (err, result) {
            err ? onRejected(err) : onFulfilled(result)
          }
        }

        operationsStream.push([header, body, callback])

        if (promise !== null) {
          return promise
        }
      }
    }

    // Consumes the operations queue, flushing a msearch request every time
    // `operations` searches have accumulated, bounded by the semaphore.
    async function iterate () {
      const { semaphore, finish } = buildSemaphore()
      const msearchBody = []
      const callbacks = []
      let loadedOperations = 0

      for await (const operation of operationsStream) {
        loadedOperations += 1
        msearchBody.push(operation[0], operation[1])
        callbacks.push(operation[2])
        if (loadedOperations >= operations) {
          const send = await semaphore()
          send(msearchBody.slice(), callbacks.slice())
          msearchBody.length = 0
          callbacks.length = 0
          loadedOperations = 0
        }
      }

      // In some cases the previous http call has not finished,
      // or we didn't reach the operations threshold, so we force one last operation.
      if (loadedOperations > 0) {
        const send = await semaphore()
        send(msearchBody, callbacks)
      }

      await finish()

      if (stopError !== null) {
        throw stopError
      }
    }

    // This function builds a semaphore using the concurrency
    // options of the msearch helper. It is used inside the iterator
    // to guarantee that no more than the number of operations
    // allowed to run at the same time are executed.
    // It returns a semaphore function which resolves in the next tick
    // if we didn't reach the maximum concurrency yet, otherwise it returns
    // a promise that resolves as soon as one of the running requests has finished.
    // The semaphore function resolves a send function, which will be used
    // to send the actual msearch request.
    // It also returns a finish function, which returns a promise that is resolved
    // when there are no longer requests running.
    function buildSemaphore () {
      let resolveSemaphore = null
      let resolveFinish = null
      let running = 0

      return { semaphore, finish }

      function finish () {
        return new Promise((resolve, reject) => {
          if (running === 0) {
            resolve()
          } else {
            resolveFinish = resolve
          }
        })
      }

      function semaphore () {
        if (running < concurrency) {
          return pImmediate(send)
        } else {
          return new Promise((resolve, reject) => {
            resolveSemaphore = resolve
          })
        }
      }

      function send (msearchBody, callbacks) {
        /* istanbul ignore if */
        if (running >= concurrency) {
          throw new Error('Max concurrency reached')
        }
        running += 1
        msearchOperation(msearchBody, callbacks, () => {
          running -= 1
          // Hand the send slot to a waiter first; only resolve finish()
          // once nothing is running and nobody is waiting.
          if (resolveSemaphore) {
            resolveSemaphore(send)
            resolveSemaphore = null
          } else if (resolveFinish && running === 0) {
            resolveFinish()
          }
        })
      }
    }

    // Sends one msearch request, retrying (up to `retries` times, waiting
    // `wait` ms between attempts) the individual searches that got a 429.
    function msearchOperation (msearchBody, callbacks, done) {
      let retryCount = retries

      // Instead of going full on async-await, which would make the code easier to read,
      // we have decided to use callback style instead.
      // This because every time we use async await, V8 will create multiple promises
      // behind the scenes, making the code slightly slower.
      tryMsearch(msearchBody, callbacks, retrySearch)
      function retrySearch (msearchBody, callbacks) {
        if (msearchBody.length > 0 && retryCount > 0) {
          retryCount -= 1
          setTimeout(tryMsearch, wait, msearchBody, callbacks, retrySearch)
          return
        }

        done()
      }

      // This function never returns an error, if the msearch operation fails,
      // the error is dispatched to all search executors.
      function tryMsearch (msearchBody, callbacks, done) {
        client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => {
          const retryBody = []
          const retryCallbacks = []
          if (err) {
            // Transport-level failure: every queued search gets the same error.
            addDocumentsGetter(results)
            for (const callback of callbacks) {
              callback(err, results)
            }
            return done(retryBody, retryCallbacks)
          }
          const { responses } = results.body
          for (let i = 0, len = responses.length; i < len; i++) {
            const response = responses[i]
            if (response.status === 429 && retryCount > 0) {
              // Too many requests: queue this search (header + body pair) for retry.
              retryBody.push(msearchBody[i * 2])
              retryBody.push(msearchBody[(i * 2) + 1])
              retryCallbacks.push(callbacks[i])
              continue
            }
            // Each caller receives a response shaped like a single search response.
            const result = { ...results, body: response }
            addDocumentsGetter(result)
            if (response.status >= 400) {
              callbacks[i](new ResponseError(result), result)
            } else {
              callbacks[i](null, result)
            }
          }
          done(retryBody, retryCallbacks)
        })
      }
    }
  }
|
||||
|
||||
/**
|
||||
* Creates a bulk helper instance. Once you configure it, you can pick which operation
|
||||
* to execute with the given dataset, index, create, update, and delete.
|
||||
* @param {object} options - The configuration of the bulk operation.
|
||||
* @return {object} The possible orations to run with the datasource.
|
||||
* @return {object} The possible operations to run with the datasource.
|
||||
*/
|
||||
bulk (options) {
|
||||
// TODO: add an interval to force flush the body
|
||||
@ -436,6 +654,19 @@ class Helpers {
|
||||
}
|
||||
}
|
||||
|
||||
// Using a getter will improve the overall performances of the code,
|
||||
// as we will reed the documents only if needed.
|
||||
function addDocumentsGetter (result) {
|
||||
Object.defineProperty(result, 'documents', {
|
||||
get () {
|
||||
if (this.body.hits && this.body.hits.hits) {
|
||||
return this.body.hits.hits.map(d => d._source)
|
||||
}
|
||||
return []
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
function appendFilterPath (filter, params, force) {
|
||||
if (params.filter_path !== undefined) {
|
||||
params.filter_path += ',' + filter
|
||||
|
||||
Reference in New Issue
Block a user