[[client-helpers]]
== Client Helpers
The client comes with a handy collection of helpers to give you a more comfortable experience with some APIs.

CAUTION: The client helpers are experimental, and the API may change in the next minor releases.
The helpers will not work in any Node.js version lower than 10.
=== Bulk Helper
~Added~ ~in~ ~`v7.7.0`~
Running bulk requests can be complex due to the shape of the API. This helper aims to provide a nicer developer experience around the Bulk API.
==== Usage
[source,js]
----
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
datasource: createReadStream('./dataset.ndjson').pipe(split()),
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
}
})
console.log(result)
// {
// total: number,
// failed: number,
// retry: number,
// successful: number,
// time: number,
// bytes: number,
// aborted: boolean
// }
----
To create a new instance of the Bulk helper, access it as shown in the example above. The configuration options are listed below; a combined example follows the table.
[cols=2*]
|===
|`datasource`
a|An array, async generator or a readable stream with the data you need to index/create/update/delete.
It can be an array of strings or objects, but also a stream of JSON strings or JavaScript objects. +
If it is a stream, we recommend using the https://www.npmjs.com/package/split2[`split2`] package, which splits the stream on newline delimiters. +
This parameter is mandatory.
[source,js]
----
const { createReadStream } = require('fs')
const split = require('split2')
const b = client.helpers.bulk({
// if you just use split(), the data will be used as an array of strings
datasource: createReadStream('./dataset.ndjson').pipe(split())
// if you need to manipulate the data, you can pass JSON.parse to split
datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse))
})
----
|`onDocument`
a|A function that will be called for each document of the datasource. Inside this function you can manipulate the document and you must return the operation you want to execute with the document. Look at the link:{ref}/docs-bulk.html[Bulk API documentation] to see the supported operations. +
This parameter is mandatory.
[source,js]
----
const b = client.helpers.bulk({
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
}
})
----
|`onDrop`
a|A function that is called every time a document can't be indexed and has reached the maximum number of retries.
[source,js]
----
const b = client.helpers.bulk({
onDrop (doc) {
console.log(doc)
}
})
----
|`flushBytes`
a|The size of the bulk body, in bytes, to reach before sending it. +
_Default:_ `5000000` (5MB)
[source,js]
----
const b = client.helpers.bulk({
flushBytes: 1000000
})
----
|`flushInterval`
a|How much time (in milliseconds) the helper waits, after the last document is read, before flushing the body. +
_Default:_ `30000`
[source,js]
----
const b = client.helpers.bulk({
flushInterval: 30000
})
----
|`concurrency`
a|How many requests will be executed at the same time. +
_Default:_ `5`
[source,js]
----
const b = client.helpers.bulk({
concurrency: 10
})
----
|`retries`
a|How many times a document will be retried before calling the `onDrop` callback. +
_Default:_ Client max retries.
[source,js]
----
const b = client.helpers.bulk({
retries: 3
})
----
|`wait`
a|How much time to wait before retrying, in milliseconds. +
_Default:_ `5000`
[source,js]
----
const b = client.helpers.bulk({
wait: 3000
})
----
|`refreshOnCompletion`
a|If `true`, at the end of the bulk operation the helper runs a refresh on all indices, or on the specified indices if a string is passed. +
_Default:_ `false`
[source,js]
----
const b = client.helpers.bulk({
refreshOnCompletion: true
// or
refreshOnCompletion: 'index-name'
})
----
|===
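For reference, here is a sketch that combines several of the options above into a single configuration. The values are illustrative, not recommendations, and assume that `client` has already been created as in the usage example.

[source,js]
----
const { createReadStream } = require('fs')
const split = require('split2')

const b = client.helpers.bulk({
  // a stream of JavaScript objects, thanks to JSON.parse
  datasource: createReadStream('./dataset.ndjson').pipe(split(JSON.parse)),
  onDocument (doc) {
    return {
      index: { _index: 'my-index' }
    }
  },
  // called for every document that exhausted its retries
  onDrop (doc) {
    console.log(doc)
  },
  flushBytes: 1000000, // flush the body once it reaches 1MB...
  flushInterval: 10000, // ...or 10 seconds after the last document read
  concurrency: 10,
  retries: 3,
  wait: 3000,
  refreshOnCompletion: true
})
----
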
==== Supported operations
===== Index
[source,js]
----
client.helpers.bulk({
datasource: myDatasource,
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
}
})
----
===== Create
[source,js]
----
client.helpers.bulk({
datasource: myDatasource,
onDocument (doc) {
return {
create: { _index: 'my-index', _id: doc.id }
}
}
})
----
===== Update
[source,js]
----
client.helpers.bulk({
datasource: myDatasource,
onDocument (doc) {
// Note that the update operation requires you to return
// an array, where the first element is the action and
// the second is the document options
return [
{ update: { _index: 'my-index', _id: doc.id } },
{ doc_as_upsert: true }
]
}
})
----
===== Delete
[source,js]
----
client.helpers.bulk({
datasource: myDatasource,
onDocument (doc) {
return {
delete: { _index: 'my-index', _id: doc.id }
}
}
})
----
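Since `onDocument` is invoked once per document, you can also choose the operation dynamically. A minimal sketch, assuming each document carries a hypothetical `deleted` flag:

[source,js]
----
client.helpers.bulk({
  datasource: myDatasource,
  onDocument (doc) {
    // the `deleted` flag is a hypothetical field of your documents
    if (doc.deleted) {
      return {
        delete: { _index: 'my-index', _id: doc.id }
      }
    }
    return {
      index: { _index: 'my-index' }
    }
  }
})
----
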
==== Abort a bulk operation
If needed, you can abort a bulk operation at any time. The bulk helper returns a https://promisesaplus.com/[thenable], which has an `abort` method.
NOTE: The abort method will stop the execution of the bulk operation, but if you are using a concurrency higher than one, the operations that are already running will not be stopped.
[source,js]
----
const { createReadStream } = require('fs')
const split = require('split2')
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const b = client.helpers.bulk({
datasource: createReadStream('./dataset.ndjson').pipe(split()),
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
},
onDrop (doc) {
b.abort()
}
})
console.log(await b)
----
==== Passing custom options to the Bulk API
You can pass any option supported by the link:{ref}/docs-bulk.html#docs-bulk-api-query-params[Bulk API] to the helper, and the helper will use those options in conjunction with the Bulk API call.
[source,js]
----
const result = await client.helpers.bulk({
datasource: [...],
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
},
pipeline: 'my-pipeline'
})
----
==== Usage with an async generator
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
async function * generator () {
const dataset = [
{ user: 'jon', age: 23 },
{ user: 'arya', age: 18 },
{ user: 'tyrion', age: 39 }
]
for (const doc of dataset) {
yield doc
}
}
const client = new Client({ node: 'http://localhost:9200' })
const result = await client.helpers.bulk({
datasource: generator(),
onDocument (doc) {
return {
index: { _index: 'my-index' }
}
}
})
console.log(result)
----
=== Multi Search Helper
~Added~ ~in~ ~`v7.8.0`~
If you are sending search requests at a high rate, this helper might be useful for you.
It uses the multi search API under the hood to batch the requests and improve the overall performance of your application. +
The `result` exposes a `documents` property as well, which allows you to access the hits sources directly.
==== Usage
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const m = client.helpers.msearch()
// promise style API
m.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'javascript' } } }
)
.then(result => console.log(result.body)) // or result.documents
.catch(err => console.error(err))
// callback style API
m.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'ruby' } } },
(err, result) => {
if (err) console.error(err)
console.log(result.body) // or result.documents
}
)
----
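Since the promise style API returns a regular promise, it can also be used with `async/await`; a minimal sketch:

[source,js]
----
// async/await style API
async function run () {
  try {
    const result = await m.search(
      { index: 'stackoverflow' },
      { query: { match: { title: 'javascript' } } }
    )
    console.log(result.body) // or result.documents
  } catch (err) {
    console.error(err)
  }
}

run()
----
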
To create a new instance of the Msearch helper, access it as shown in the example above. The configuration options are listed below; a combined example follows the table.
[cols=2*]
|===
|`operations`
a|How many search operations should be sent in a single msearch request. +
_Default:_ `5`
[source,js]
----
const m = client.helpers.msearch({
operations: 10
})
----
|`flushInterval`
a|How much time (in milliseconds) the helper waits, after the last operation is read, before flushing the batch of operations. +
_Default:_ `500`
[source,js]
----
const m = client.helpers.msearch({
flushInterval: 500
})
----
|`concurrency`
a|How many requests will be executed at the same time. +
_Default:_ `5`
[source,js]
----
const m = client.helpers.msearch({
concurrency: 10
})
----
|`retries`
a|How many times an operation will be retried before resolving the request. An operation will be retried only in case of a 429 error. +
_Default:_ Client max retries.
[source,js]
----
const m = client.helpers.msearch({
retries: 3
})
----
|`wait`
a|How much time to wait before retrying, in milliseconds. +
_Default:_ `5000`
[source,js]
----
const m = client.helpers.msearch({
wait: 3000
})
----
|===
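As with the Bulk helper, here is a sketch combining the options above in a single configuration; the values are illustrative.

[source,js]
----
const m = client.helpers.msearch({
  operations: 10, // batch up to 10 searches per msearch request
  flushInterval: 500,
  concurrency: 10,
  retries: 3,
  wait: 3000
})
----
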
==== Stopping the Msearch Helper
If needed, you can stop an msearch processor at any time. The msearch helper returns a https://promisesaplus.com/[thenable], which has a `stop` method.
If you are creating multiple msearch helper instances and using them for a limited period of time, remember to always call the `stop` method once you have finished using them, otherwise your application will start leaking memory.
The `stop` method accepts an optional error, which will be dispatched to every subsequent search request.
NOTE: The stop method will stop the execution of the msearch processor, but if you are using a concurrency higher than one, the operations that are already running will not be stopped.
[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const m = client.helpers.msearch()
m.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'javascript' } } }
)
.then(result => console.log(result.body))
.catch(err => console.error(err))
m.search(
{ index: 'stackoverflow' },
{ query: { match: { title: 'ruby' } } }
)
.then(result => console.log(result.body))
.catch(err => console.error(err))
setImmediate(() => m.stop())
----
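If you pass an error to `stop`, every subsequent search request will receive it. A minimal sketch:

[source,js]
----
const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' })
const m = client.helpers.msearch()

m.stop(new Error('The msearch processor has been stopped'))

// this search runs after the stop, so it will be
// rejected with the error passed to `stop`
m.search(
  { index: 'stackoverflow' },
  { query: { match: { title: 'javascript' } } }
)
  .catch(err => console.error(err.message))
----
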
=== Search Helper
~Added~ ~in~ ~`v7.7.0`~
A simple wrapper around the search API. Instead of returning the entire `result` object it returns only the source of the search documents.
To improve performance, this helper automatically adds `filter_path=hits.hits._source` to the query string.
[source,js]
----
const documents = await client.helpers.search({
index: 'stackoverflow',
body: {
query: {
match: {
title: 'javascript'
}
}
}
})
for (const doc of documents) {
console.log(doc)
}
----
=== Scroll Search Helper
~Added~ ~in~ ~`v7.7.0`~
This helper offers a simple and intuitive way to use the scroll search API. Once called, it returns an https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/for-await...of[async iterator], which can be used in conjunction with a `for await...of` loop. +
It automatically handles `429` errors and uses the client's `maxRetries` option.
[source,js]
----
const scrollSearch = client.helpers.scrollSearch({
index: 'stackoverflow',
body: {
query: {
match: {
title: 'javascript'
}
}
}
})
for await (const result of scrollSearch) {
console.log(result)
}
----
==== Clear a scroll search
If needed, you can clear a scroll search by calling `result.clear()`:
[source,js]
----
for await (const result of scrollSearch) {
if (condition) {
await result.clear()
}
}
----
==== Quickly getting the documents
If you only need the documents from the result of a scroll search, you can access them via `result.documents`:
[source,js]
----
for await (const result of scrollSearch) {
console.log(result.documents)
}
----
=== Scroll Documents Helper
~Added~ ~in~ ~`v7.7.0`~
It works in the same way as the scroll search helper, but it returns only the documents. Note that every loop cycle returns a single document, and you can't use the `clear` method.
To improve performance, this helper automatically adds `filter_path=hits.hits._source` to the query string.
[source,js]
----
const scrollSearch = client.helpers.scrollDocuments({
index: 'stackoverflow',
body: {
query: {
match: {
title: 'javascript'
}
}
}
})
for await (const doc of scrollSearch) {
console.log(doc)
}
----