[helpers] add support for transport options to all helpers (#1400)
Co-authored-by: spalger <spalger@users.noreply.github.com>
This commit is contained in:
4
lib/Helpers.d.ts
vendored
4
lib/Helpers.d.ts
vendored
@ -25,8 +25,8 @@ export default class Helpers {
|
|||||||
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
|
search<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): Promise<TDocument[]>
|
||||||
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
|
scrollSearch<TDocument = unknown, TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = Context>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<ScrollSearchResponse<TDocument, TResponse, TContext>>
|
||||||
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
|
scrollDocuments<TDocument = unknown, TRequestBody extends RequestBody = Record<string, any>>(params: Search<TRequestBody>, options?: TransportRequestOptions): AsyncIterable<TDocument>
|
||||||
msearch(options?: MsearchHelperOptions): MsearchHelper
|
msearch(options?: MsearchHelperOptions, reqOptions?: TransportRequestOptions): MsearchHelper
|
||||||
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>): BulkHelper<BulkStats>
|
bulk<TDocument = unknown>(options: BulkHelperOptions<TDocument>, reqOptions?: TransportRequestOptions): BulkHelper<BulkStats>
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> {
|
export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<string, any>, TContext = Context> extends ApiResponse<TResponse, TContext> {
|
||||||
|
|||||||
@ -158,7 +158,7 @@ class Helpers {
|
|||||||
*/
|
*/
|
||||||
async * scrollDocuments (params, options) {
|
async * scrollDocuments (params, options) {
|
||||||
appendFilterPath('hits.hits._source', params, true)
|
appendFilterPath('hits.hits._source', params, true)
|
||||||
for await (const { documents } of this.scrollSearch(params)) {
|
for await (const { documents } of this.scrollSearch(params, options)) {
|
||||||
for (const document of documents) {
|
for (const document of documents) {
|
||||||
yield document
|
yield document
|
||||||
}
|
}
|
||||||
@ -169,9 +169,10 @@ class Helpers {
|
|||||||
* Creates a msearch helper instance. Once you configure it, you can use the provided
|
* Creates a msearch helper instance. Once you configure it, you can use the provided
|
||||||
* `search` method to add new searches in the queue.
|
* `search` method to add new searches in the queue.
|
||||||
* @param {object} options - The configuration of the msearch operations.
|
* @param {object} options - The configuration of the msearch operations.
|
||||||
|
* @param {object} reqOptions - The client optional configuration for this request.
|
||||||
* @return {object} The possible operations to run.
|
* @return {object} The possible operations to run.
|
||||||
*/
|
*/
|
||||||
msearch (options = {}) {
|
msearch (options = {}, reqOptions = {}) {
|
||||||
const client = this[kClient]
|
const client = this[kClient]
|
||||||
const {
|
const {
|
||||||
operations = 5,
|
operations = 5,
|
||||||
@ -378,7 +379,7 @@ class Helpers {
|
|||||||
// This function never returns an error, if the msearch operation fails,
|
// This function never returns an error, if the msearch operation fails,
|
||||||
// the error is dispatched to all search executors.
|
// the error is dispatched to all search executors.
|
||||||
function tryMsearch (msearchBody, callbacks, done) {
|
function tryMsearch (msearchBody, callbacks, done) {
|
||||||
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), (err, results) => {
|
client.msearch(Object.assign({}, msearchOptions, { body: msearchBody }), reqOptions, (err, results) => {
|
||||||
const retryBody = []
|
const retryBody = []
|
||||||
const retryCallbacks = []
|
const retryCallbacks = []
|
||||||
if (err) {
|
if (err) {
|
||||||
@ -415,12 +416,16 @@ class Helpers {
|
|||||||
* Creates a bulk helper instance. Once you configure it, you can pick which operation
|
* Creates a bulk helper instance. Once you configure it, you can pick which operation
|
||||||
* to execute with the given dataset, index, create, update, and delete.
|
* to execute with the given dataset, index, create, update, and delete.
|
||||||
* @param {object} options - The configuration of the bulk operation.
|
* @param {object} options - The configuration of the bulk operation.
|
||||||
|
* @param {object} reqOptions - The client optional configuration for this request.
|
||||||
* @return {object} The possible operations to run with the datasource.
|
* @return {object} The possible operations to run with the datasource.
|
||||||
*/
|
*/
|
||||||
bulk (options) {
|
bulk (options, reqOptions = {}) {
|
||||||
const client = this[kClient]
|
const client = this[kClient]
|
||||||
const { serialize, deserialize } = client.serializer
|
const { serialize, deserialize } = client.serializer
|
||||||
const reqOptions = this[kMetaHeader] !== null ? { headers: { 'x-elastic-client-meta': this[kMetaHeader] + ',h=bp' } } : {}
|
if (this[kMetaHeader] !== null) {
|
||||||
|
reqOptions.headers = reqOptions.headers || {}
|
||||||
|
reqOptions.headers['x-elastic-client-meta'] = this[kMetaHeader] + ',h=bp'
|
||||||
|
}
|
||||||
const {
|
const {
|
||||||
datasource,
|
datasource,
|
||||||
onDocument,
|
onDocument,
|
||||||
@ -545,7 +550,7 @@ class Helpers {
|
|||||||
index: typeof refreshOnCompletion === 'string'
|
index: typeof refreshOnCompletion === 'string'
|
||||||
? refreshOnCompletion
|
? refreshOnCompletion
|
||||||
: '_all'
|
: '_all'
|
||||||
})
|
}, reqOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
stats.time = Date.now() - startTime
|
stats.time = Date.now() - startTime
|
||||||
|
|||||||
@ -1049,6 +1049,66 @@ test('bulk delete', t => {
|
|||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('transport options', t => {
|
||||||
|
t.test('Should pass transport options in request', async t => {
|
||||||
|
let count = 0
|
||||||
|
const MockConnection = connection.buildMockConnection({
|
||||||
|
onRequest (params) {
|
||||||
|
count++
|
||||||
|
|
||||||
|
if (params.path === '/_bulk') {
|
||||||
|
t.match(params.headers, {
|
||||||
|
'content-type': 'application/x-ndjson',
|
||||||
|
foo: 'bar'
|
||||||
|
})
|
||||||
|
return { body: { errors: false, items: [{}] } }
|
||||||
|
}
|
||||||
|
|
||||||
|
t.strictEqual(params.path, '/_all/_refresh')
|
||||||
|
t.match(params.headers, {
|
||||||
|
foo: 'bar'
|
||||||
|
})
|
||||||
|
return { body: {} }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
Connection: MockConnection
|
||||||
|
})
|
||||||
|
|
||||||
|
const result = await client.helpers.bulk({
|
||||||
|
datasource: dataset.slice(),
|
||||||
|
flushBytes: 1,
|
||||||
|
concurrency: 1,
|
||||||
|
onDocument (doc) {
|
||||||
|
return { index: { _index: 'test' } }
|
||||||
|
},
|
||||||
|
onDrop (doc) {
|
||||||
|
t.fail('This should never be called')
|
||||||
|
},
|
||||||
|
refreshOnCompletion: true
|
||||||
|
}, {
|
||||||
|
headers: {
|
||||||
|
foo: 'bar'
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.strictEqual(count, 4) // three bulk requests, one refresh
|
||||||
|
t.type(result.time, 'number')
|
||||||
|
t.type(result.bytes, 'number')
|
||||||
|
t.match(result, {
|
||||||
|
total: 3,
|
||||||
|
successful: 3,
|
||||||
|
retry: 0,
|
||||||
|
failed: 0,
|
||||||
|
aborted: false
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
test('errors', t => {
|
test('errors', t => {
|
||||||
t.test('datasource type', async t => {
|
t.test('datasource type', async t => {
|
||||||
const client = new Client({
|
const client = new Client({
|
||||||
|
|||||||
@ -756,3 +756,42 @@ test('Stop should resolve the helper (error)', t => {
|
|||||||
|
|
||||||
m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
|
m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('Should use req options', async t => {
|
||||||
|
t.plan(1)
|
||||||
|
|
||||||
|
const MockConnection = connection.buildMockConnection({
|
||||||
|
onRequest (params) {
|
||||||
|
t.match(params.headers, {
|
||||||
|
foo: 'bar'
|
||||||
|
})
|
||||||
|
|
||||||
|
return {
|
||||||
|
body: {
|
||||||
|
responses: [{
|
||||||
|
status: 200,
|
||||||
|
hits: { hits: [] }
|
||||||
|
}]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
Connection: MockConnection
|
||||||
|
})
|
||||||
|
|
||||||
|
const m = client.helpers.msearch({ operations: 1 }, {
|
||||||
|
headers: {
|
||||||
|
foo: 'bar'
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
await m.search(
|
||||||
|
{ index: 'test' },
|
||||||
|
{ query: { match: { foo: 'bar' } } }
|
||||||
|
)
|
||||||
|
|
||||||
|
t.teardown(() => m.stop())
|
||||||
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user