[Backport 7.x] Helpers: avoid allocating new timers (#1225)
Co-authored-by: Tomas Della Vedova <delvedor@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
fd59d66076
commit
acce06c2af
@ -155,7 +155,7 @@ class Helpers {
|
||||
|
||||
let stopReading = false
|
||||
let stopError = null
|
||||
let timeoutId = null
|
||||
let timeoutRef = null
|
||||
const operationsStream = new Readable({
|
||||
objectMode: true,
|
||||
read (size) {}
|
||||
@ -224,9 +224,10 @@ class Helpers {
|
||||
const msearchBody = []
|
||||
const callbacks = []
|
||||
let loadedOperations = 0
|
||||
timeoutRef = setTimeout(onFlushTimeout, flushInterval)
|
||||
|
||||
for await (const operation of operationsStream) {
|
||||
clearTimeout(timeoutId)
|
||||
timeoutRef.refresh()
|
||||
loadedOperations += 1
|
||||
msearchBody.push(operation[0], operation[1])
|
||||
callbacks.push(operation[2])
|
||||
@ -236,12 +237,10 @@ class Helpers {
|
||||
msearchBody.length = 0
|
||||
callbacks.length = 0
|
||||
loadedOperations = 0
|
||||
} else {
|
||||
timeoutId = setTimeout(onFlushTimeout, flushInterval)
|
||||
}
|
||||
}
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout(timeoutRef)
|
||||
// In some cases the previos http call does not have finished,
|
||||
// or we didn't reach the flush bytes threshold, so we force one last operation.
|
||||
if (loadedOperations > 0) {
|
||||
@ -256,6 +255,7 @@ class Helpers {
|
||||
}
|
||||
|
||||
async function onFlushTimeout () {
|
||||
if (loadedOperations === 0) return
|
||||
const msearchBodyCopy = msearchBody.slice()
|
||||
const callbacksCopy = callbacks.slice()
|
||||
msearchBody.length = 0
|
||||
@ -415,7 +415,7 @@ class Helpers {
|
||||
}
|
||||
|
||||
let shouldAbort = false
|
||||
let timeoutId = null
|
||||
let timeoutRef = null
|
||||
const stats = {
|
||||
total: 0,
|
||||
failed: 0,
|
||||
@ -435,7 +435,7 @@ class Helpers {
|
||||
return p.catch(onRejected)
|
||||
},
|
||||
abort () {
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout(timeoutRef)
|
||||
shouldAbort = true
|
||||
stats.aborted = true
|
||||
return this
|
||||
@ -460,10 +460,11 @@ class Helpers {
|
||||
let actionBody = ''
|
||||
let payloadBody = ''
|
||||
let chunkBytes = 0
|
||||
timeoutRef = setTimeout(onFlushTimeout, flushInterval)
|
||||
|
||||
for await (const chunk of datasource) {
|
||||
if (shouldAbort === true) break
|
||||
clearTimeout(timeoutId)
|
||||
timeoutRef.refresh()
|
||||
const action = onDocument(chunk)
|
||||
const operation = Array.isArray(action)
|
||||
? Object.keys(action[0])[0]
|
||||
@ -485,6 +486,7 @@ class Helpers {
|
||||
chunkBytes += Buffer.byteLength(actionBody)
|
||||
bulkBody.push(actionBody)
|
||||
} else {
|
||||
clearTimeout(timeoutRef)
|
||||
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
|
||||
}
|
||||
|
||||
@ -494,12 +496,10 @@ class Helpers {
|
||||
send(bulkBody.slice())
|
||||
bulkBody.length = 0
|
||||
chunkBytes = 0
|
||||
} else {
|
||||
timeoutId = setTimeout(onFlushTimeout, flushInterval)
|
||||
}
|
||||
}
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout(timeoutRef)
|
||||
// In some cases the previos http call does not have finished,
|
||||
// or we didn't reach the flush bytes threshold, so we force one last operation.
|
||||
if (shouldAbort === false && chunkBytes > 0) {
|
||||
@ -524,6 +524,7 @@ class Helpers {
|
||||
return stats
|
||||
|
||||
async function onFlushTimeout () {
|
||||
if (chunkBytes === 0) return
|
||||
stats.bytes += chunkBytes
|
||||
const bulkBodyCopy = bulkBody.slice()
|
||||
bulkBody.length = 0
|
||||
|
||||
Reference in New Issue
Block a user