Added async generator support in bulk helper (#1138)

* Added async generator support in bulk helper

* Updated test

* Updated docs

* Improved type definitions

* Updated onDrop callback type definition
This commit is contained in:
Tomas Della Vedova
2020-04-03 09:46:26 +02:00
committed by GitHub
parent bdd38597d8
commit df17fb99d0
5 changed files with 115 additions and 17 deletions

View File

@ -43,7 +43,7 @@ To create a new instance of the Bulk helper, you should access it as shown in th
[cols=2*]
|===
|`datasource`
a|An array or a readable stream with the data you need to index/create/update/delete.
a|An array, async generator or a readable stream with the data you need to index/create/update/delete.
It can be an array of strings or objects, but also a stream of JSON strings or JavaScript objects. +
If it is a stream, we recommend using the https://www.npmjs.com/package/split2[`split2`] package, which splits the stream on newline delimiters. +
This parameter is mandatory.
@ -182,6 +182,36 @@ const result = await client.helpers.bulk({
})
----
==== Usage with an async generator
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
async function * generator () {
const dataset = [
{ user: 'jon', age: 23 },
{ user: 'arya', age: 18 },
{ user: 'tyrion', age: 39 }
]
for (const doc of dataset) {
yield doc
}
}
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
datasource: generator(),
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
}
})
console.log(result)
----
=== Search Helper
A simple wrapper around the search API. Instead of returning the entire `result` object, it returns only the documents found by the search.

24
lib/Helpers.d.ts vendored
View File

@ -10,7 +10,7 @@ export default class Helpers {
search<TRequestBody extends RequestBody, TDocument = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
scrollSearch<TRequestBody extends RequestBody, TDocument = unknown, TResponse = ResponseBody, TContext = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
scrollDocuments<TRequestBody extends RequestBody, TDocument = unknown>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
bulk(options: BulkHelperOptions): BulkHelper<BulkStats>
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
}
export interface ScrollSearchResponse<TDocument = unknown, TResponse = ResponseBody, TContext = unknown> extends ApiResponse<TResponse, TContext> {
@ -64,13 +64,27 @@ type UpdateAction = [UpdateActionOperation, Record<string, any>]
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
type Omit<T, K extends keyof T> = Pick<T, Exclude<keyof T, K>>
export interface BulkHelperOptions extends Omit<Bulk, 'body'> {
datasource: any[] | Buffer | ReadableStream
onDocument: (doc: Record<string, any>) => Action
export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body'> {
datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
flushBytes?: number
concurrency?: number
retries?: number
wait?: number,
onDrop?: (doc: Record<string, any>) => void,
onDrop?: (doc: OnDropDocument<TDocument>) => void,
refreshOnCompletion?: boolean | string
}
/**
 * Shape of the argument received by the `onDrop` callback of
 * `BulkHelperOptions`.
 *
 * @typeParam TDocument - Type of the value carried in `document`;
 *   falls back to `unknown` when not specified.
 */
export interface OnDropDocument<TDocument = unknown> {
  /** NOTE(review): presumably the datasource entry that was dropped; confirm against the helper implementation. */
  document: TDocument
  /** NOTE(review): presumably the status code reported for the failed operation; confirm. */
  status: number
  /** NOTE(review): presumably tells whether a retry was attempted before dropping; confirm. */
  retried: boolean
  /** Error details: a type/reason pair plus a nested `caused_by` pair of the same shape. */
  error: {
    type: string
    reason: string
    caused_by: { type: string; reason: string }
  }
}

View File

@ -158,8 +158,8 @@ class Helpers {
if (datasource === undefined) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource is required'))
}
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function')) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream'))
if (!(Array.isArray(datasource) || Buffer.isBuffer(datasource) || typeof datasource.pipe === 'function' || datasource[Symbol.asyncIterator])) {
return Promise.reject(new ConfigurationError('bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator'))
}
if (onDocument === undefined) {
return Promise.reject(new ConfigurationError('bulk helper: the onDocument callback is required'))

View File

@ -9,7 +9,8 @@ import {
BulkHelper,
BulkStats,
BulkHelperOptions,
ScrollSearchResponse
ScrollSearchResponse,
OnDropDocument
} from '../../lib/Helpers'
const client = new Client({
@ -18,7 +19,7 @@ const client = new Client({
/// .helpers.bulk
const b = client.helpers.bulk({
const b = client.helpers.bulk<Record<string, any>>({
datasource: [],
onDocument (doc) {
expectType<Record<string, any>>(doc)
@ -29,7 +30,7 @@ const b = client.helpers.bulk({
retries: 3,
wait: 5000,
onDrop (doc) {
expectType<Record<string, any>>(doc)
expectType<OnDropDocument<Record<string, any>>>(doc)
},
refreshOnCompletion: true,
pipeline: 'my-pipeline'
@ -59,7 +60,7 @@ expectError(
return { index: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
// create
{
@ -69,20 +70,20 @@ expectError(
return { create: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
// update
{
// without `:BulkHelperOptions` this test cannot pass
// but if we write these options inline inside
// a `.helper.bulk`, it works as expected
const options: BulkHelperOptions = {
const options: BulkHelperOptions<Record<string, any>> = {
datasource: [],
onDocument (doc: Record<string, any>) {
return [{ update: { _index: 'test' } }, doc]
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
// delete
{
@ -92,7 +93,7 @@ expectError(
return { delete: { _index: 'test' } }
}
}
expectAssignable<BulkHelperOptions>(options)
expectAssignable<BulkHelperOptions<Record<string, any>>>(options)
}
/// .helpers.scrollSearch

View File

@ -653,6 +653,59 @@ test('bulk index', t => {
t.end()
})
// Exercises the bulk helper when `datasource` is an async generator.
t.test('datasource as async generator', t => {
t.test('Should perform a bulk request', async t => {
// Index of the next `dataset` entry the mock connection expects to receive.
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
t.strictEqual(params.path, '/_bulk')
t.match(params.headers, { 'Content-Type': 'application/x-ndjson' })
// Only the first action/payload pair of each request body is inspected and
// compared against dataset[count++], so the test expects one document per
// bulk request, arriving in dataset order.
const [action, payload] = params.body.split('\n')
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
t.deepEqual(JSON.parse(payload), dataset[count++])
// Mocked bulk response: no errors, a single (empty) item.
return { body: { errors: false, items: [{}] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
// Yields the entries of a shallow copy of `dataset`, one document at a time.
async function * generator () {
const data = dataset.slice()
for (const doc of data) {
yield doc
}
}
const result = await client.helpers.bulk({
datasource: generator(),
// NOTE(review): flushBytes: 1 and concurrency: 1 presumably force one
// sequential request per document, which the count-based assertion in
// onRequest relies on; confirm against the helper implementation.
flushBytes: 1,
concurrency: 1,
onDocument (doc) {
return {
index: { _index: 'test' }
}
},
// The mock always answers with errors: false, so no drop is expected.
onDrop (doc) {
t.fail('This should never be called')
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
// total/successful of 3 assume `dataset` (defined outside this hunk) holds
// three documents; TODO confirm.
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})
t.end()
})
t.end()
})
@ -896,7 +949,7 @@ test('errors', t => {
})
} catch (err) {
t.true(err instanceof errors.ConfigurationError)
t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream')
t.is(err.message, 'bulk helper: the datasource must be an array or a buffer or a readable stream or an async generator')
}
})