[Backport 7.x] Added timeout support in bulk and msearch helpers (#1211)
Co-authored-by: Tomas Della Vedova <delvedor@users.noreply.github.com>
Committed by GitHub
parent 4121e1e7ff
commit a4093a7338
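For context, not part of the diff itself: a minimal usage sketch of the two helpers with the new flushInterval option. The option names and defaults match the diff below; the node URL, index name, and sample documents are hypothetical.

const { Client } = require('@elastic/elasticsearch')
const client = new Client({ node: 'http://localhost:9200' }) // hypothetical node URL

async function run () {
  // Bulk helper: flushInterval forces a flush of the buffered operations
  // after the interval elapses, even if flushBytes was never reached
  // (the "slow producer" case the removed TODO comments refer to).
  const stats = await client.helpers.bulk({
    datasource: [{ user: 'foo' }, { user: 'bar' }], // hypothetical documents
    flushBytes: 5000000,
    flushInterval: 30000,
    onDocument (doc) {
      return { index: { _index: 'my-index' } } // hypothetical index name
    }
  })
  console.log(stats) // { total, failed, successful, time, bytes, aborted, ... }

  // Msearch helper: buffered searches are flushed either when `operations`
  // searches have accumulated or when flushInterval elapses.
  const m = client.helpers.msearch({ operations: 5, flushInterval: 500 })
  const { body } = await m.search(
    { index: 'my-index' },                  // hypothetical index name
    { query: { match: { user: 'foo' } } }
  )
  console.log(body.hits.hits)
  m.stop()
}

run().catch(console.log)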
lib/Helpers.d.ts (vendored): 4 changes
@@ -69,6 +69,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends Omit<Bulk, 'body
  datasource: TDocument[] | Buffer | ReadableStream | AsyncIterator<TDocument>
  onDocument: (doc: TDocument) => Action
  flushBytes?: number
  flushInterval?: number
  concurrency?: number
  retries?: number
  wait?: number
@@ -92,6 +93,7 @@ export interface OnDropDocument<TDocument = unknown> {

export interface MsearchHelperOptions extends Omit<Msearch, 'body'> {
  operations?: number
  flushInterval?: number
  concurrency?: number
  retries?: number
  wait?: number
@@ -102,4 +104,4 @@ export interface MsearchHelper extends Promise<void> {
  stop(error?: Error): void
  search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody): Promise<ApiResponse<TResponse, TContext>>
  search<TResponse = Record<string, any>, TRequestBody extends RequestBody = Record<string, any>, TContext = unknown>(header: Omit<Search, 'body'>, body: TRequestBody, callback: callbackFn<TResponse, TContext>): void
}
}
@@ -143,12 +143,11 @@ class Helpers {
   * @return {object} The possible operations to run.
   */
  msearch (options = {}) {
    // TODO: add an interval to force flush the body
    // to handle the slow producer problem
    const client = this[kClient]
    const {
      operations = 20,
      operations = 5,
      concurrency = 5,
      flushInterval = 500,
      retries = this.maxRetries,
      wait = 5000,
      ...msearchOptions
@@ -156,14 +155,14 @@ class Helpers {

    let stopReading = false
    let stopError = null
    let timeoutId = null
    const operationsStream = new Readable({
      objectMode: true,
      read (size) {}
    })

    const p = iterate()

    return {
    const helper = {
      then (onFulfilled, onRejected) {
        return p.then(onFulfilled, onRejected)
      },
@@ -171,11 +170,14 @@ class Helpers {
        return p.catch(onRejected)
      },
      stop (error = null) {
        if (stopReading === true) return
        stopReading = true
        stopError = error
        operationsStream.push(null)
      },
      // TODO: support abort a single search?
      // NOTE: the validation checks are synchronous and the callback/promise will
      // be resolved in the same tick. We might want to fix this in the future.
      search (header, body, callback) {
        if (stopReading === true) {
          const error = stopError === null
@@ -215,6 +217,8 @@ class Helpers {
      }
    }

    return helper

    async function iterate () {
      const { semaphore, finish } = buildSemaphore()
      const msearchBody = []
@@ -222,6 +226,7 @@ class Helpers {
      let loadedOperations = 0

      for await (const operation of operationsStream) {
        clearTimeout(timeoutId)
        loadedOperations += 1
        msearchBody.push(operation[0], operation[1])
        callbacks.push(operation[2])
@@ -231,9 +236,12 @@ class Helpers {
          msearchBody.length = 0
          callbacks.length = 0
          loadedOperations = 0
        } else {
          timeoutId = setTimeout(onFlushTimeout, flushInterval)
        }
      }

      clearTimeout(timeoutId)
      // In some cases the previous http call has not finished,
      // or we didn't reach the flush bytes threshold, so we force one last operation.
      if (loadedOperations > 0) {
@@ -246,6 +254,21 @@ class Helpers {
      if (stopError !== null) {
        throw stopError
      }

      async function onFlushTimeout () {
        const msearchBodyCopy = msearchBody.slice()
        const callbacksCopy = callbacks.slice()
        msearchBody.length = 0
        callbacks.length = 0
        loadedOperations = 0
        try {
          const send = await semaphore()
          send(msearchBodyCopy, callbacksCopy)
        } catch (err) {
          /* istanbul ignore next */
          helper.stop(err)
        }
      }
    }

    // This function builds a semaphore using the concurrency
@@ -365,14 +388,13 @@ class Helpers {
   * @return {object} The possible operations to run with the datasource.
   */
  bulk (options) {
    // TODO: add an interval to force flush the body
    // to handle the slow producer problem
    const client = this[kClient]
    const { serialize, deserialize } = client.serializer
    const {
      datasource,
      onDocument,
      flushBytes = 5000000,
      flushInterval = 30000,
      concurrency = 5,
      retries = this.maxRetries,
      wait = 5000,
@@ -392,6 +414,7 @@ class Helpers {
    }

    let shouldAbort = false
    let timeoutId = null
    const stats = {
      total: 0,
      failed: 0,
@@ -403,8 +426,7 @@ class Helpers {
    }

    const p = iterate()

    return {
    const helper = {
      then (onFulfilled, onRejected) {
        return p.then(onFulfilled, onRejected)
      },
@@ -412,12 +434,15 @@ class Helpers {
        return p.catch(onRejected)
      },
      abort () {
        clearTimeout(timeoutId)
        shouldAbort = true
        stats.aborted = true
        return this
      }
    }

    return helper

    /**
     * Function that iterates over the given datasource and start a bulk operation as soon
     * as it reaches the configured bulk size. It's designed to use the Node.js asynchronous
@@ -437,6 +462,7 @@ class Helpers {

      for await (const chunk of datasource) {
        if (shouldAbort === true) break
        clearTimeout(timeoutId)
        const action = onDocument(chunk)
        const operation = Array.isArray(action)
          ? Object.keys(action[0])[0]
@@ -445,16 +471,14 @@ class Helpers {
          actionBody = serialize(action)
          payloadBody = typeof chunk === 'string' ? chunk : serialize(chunk)
          chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
          bulkBody.push(actionBody)
          bulkBody.push(payloadBody)
          bulkBody.push(actionBody, payloadBody)
        } else if (operation === 'update') {
          actionBody = serialize(action[0])
          payloadBody = typeof chunk === 'string'
            ? `{doc:${chunk}}`
            : serialize({ doc: chunk, ...action[1] })
          chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
          bulkBody.push(actionBody)
          bulkBody.push(payloadBody)
          bulkBody.push(actionBody, payloadBody)
        } else if (operation === 'delete') {
          actionBody = serialize(action)
          chunkBytes += Buffer.byteLength(actionBody)
@@ -469,9 +493,12 @@ class Helpers {
          send(bulkBody.slice())
          bulkBody.length = 0
          chunkBytes = 0
        } else {
          timeoutId = setTimeout(onFlushTimeout, flushInterval)
        }
      }

      clearTimeout(timeoutId)
      // In some cases the previous http call has not finished,
      // or we didn't reach the flush bytes threshold, so we force one last operation.
      if (shouldAbort === false && chunkBytes > 0) {
@@ -494,6 +521,20 @@ class Helpers {
      stats.total = stats.successful + stats.failed

      return stats

      async function onFlushTimeout () {
        stats.bytes += chunkBytes
        const bulkBodyCopy = bulkBody.slice()
        bulkBody.length = 0
        chunkBytes = 0
        try {
          const send = await semaphore()
          send(bulkBodyCopy)
        } catch (err) {
          /* istanbul ignore next */
          helper.abort()
        }
      }
    }

    // This function builds a semaphore using the concurrency
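The buildSemaphore function itself is outside the hunks shown here. The sketch below is illustrative only, not the library's implementation, assuming a hypothetical worker callback that performs the actual msearch/bulk HTTP call: semaphore() resolves with a send function once one of the concurrency slots is free, and finish() resolves after every in-flight request has settled.

// Illustrative sketch of a concurrency semaphore in the spirit of the
// buildSemaphore helper referenced above. `worker` is a hypothetical
// async function standing in for the real HTTP call.
function buildSemaphoreSketch (concurrency, worker) {
  let running = 0
  const queue = []      // callers waiting for a free slot
  let onFinish = null   // resolver for finish()

  function semaphore () {
    // Hand out a send function as soon as a slot is free,
    // otherwise queue the caller until one is released.
    if (running < concurrency) {
      running += 1
      return Promise.resolve(send)
    }
    return new Promise(resolve => queue.push(resolve))
  }

  function send (payload) {
    // Run the worker, then release the slot when it settles.
    worker(payload).finally(() => {
      running -= 1
      if (queue.length > 0) {
        running += 1
        queue.shift()(send)
      } else if (running === 0 && onFinish !== null) {
        onFinish()
      }
    })
  }

  function finish () {
    // Resolves once every in-flight worker has completed.
    if (running === 0 && queue.length === 0) return Promise.resolve()
    return new Promise(resolve => { onFinish = resolve })
  }

  return { semaphore, finish }
}

In the hunks above, both iterate() loops and both onFlushTimeout() functions acquire a slot the same way: const send = await semaphore(), followed by send(...).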