Helpers: avoid allocating new timers (#1224)
This commit is contained in:
committed by
GitHub
parent
8e077b46a7
commit
04d082cb49
@ -155,7 +155,7 @@ class Helpers {
|
||||
|
||||
let stopReading = false
|
||||
let stopError = null
|
||||
let timeoutId = null
|
||||
let timeoutRef = null
|
||||
const operationsStream = new Readable({
|
||||
objectMode: true,
|
||||
read (size) {}
|
||||
@ -224,9 +224,10 @@ class Helpers {
|
||||
const msearchBody = []
|
||||
const callbacks = []
|
||||
let loadedOperations = 0
|
||||
timeoutRef = setTimeout(onFlushTimeout, flushInterval)
|
||||
|
||||
for await (const operation of operationsStream) {
|
||||
clearTimeout(timeoutId)
|
||||
timeoutRef.refresh()
|
||||
loadedOperations += 1
|
||||
msearchBody.push(operation[0], operation[1])
|
||||
callbacks.push(operation[2])
|
||||
@ -236,12 +237,10 @@ class Helpers {
|
||||
msearchBody.length = 0
|
||||
callbacks.length = 0
|
||||
loadedOperations = 0
|
||||
} else {
|
||||
timeoutId = setTimeout(onFlushTimeout, flushInterval)
|
||||
}
|
||||
}
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout(timeoutRef)
|
||||
// In some cases the previos http call does not have finished,
|
||||
// or we didn't reach the flush bytes threshold, so we force one last operation.
|
||||
if (loadedOperations > 0) {
|
||||
@ -256,6 +255,7 @@ class Helpers {
|
||||
}
|
||||
|
||||
async function onFlushTimeout () {
|
||||
if (loadedOperations === 0) return
|
||||
const msearchBodyCopy = msearchBody.slice()
|
||||
const callbacksCopy = callbacks.slice()
|
||||
msearchBody.length = 0
|
||||
@ -415,7 +415,7 @@ class Helpers {
|
||||
}
|
||||
|
||||
let shouldAbort = false
|
||||
let timeoutId = null
|
||||
let timeoutRef = null
|
||||
const stats = {
|
||||
total: 0,
|
||||
failed: 0,
|
||||
@ -435,7 +435,7 @@ class Helpers {
|
||||
return p.catch(onRejected)
|
||||
},
|
||||
abort () {
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout(timeoutRef)
|
||||
shouldAbort = true
|
||||
stats.aborted = true
|
||||
return this
|
||||
@ -460,10 +460,11 @@ class Helpers {
|
||||
let actionBody = ''
|
||||
let payloadBody = ''
|
||||
let chunkBytes = 0
|
||||
timeoutRef = setTimeout(onFlushTimeout, flushInterval)
|
||||
|
||||
for await (const chunk of datasource) {
|
||||
if (shouldAbort === true) break
|
||||
clearTimeout(timeoutId)
|
||||
timeoutRef.refresh()
|
||||
const action = onDocument(chunk)
|
||||
const operation = Array.isArray(action)
|
||||
? Object.keys(action[0])[0]
|
||||
@ -485,6 +486,7 @@ class Helpers {
|
||||
chunkBytes += Buffer.byteLength(actionBody)
|
||||
bulkBody.push(actionBody)
|
||||
} else {
|
||||
clearTimeout(timeoutRef)
|
||||
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
|
||||
}
|
||||
|
||||
@ -494,12 +496,10 @@ class Helpers {
|
||||
send(bulkBody.slice())
|
||||
bulkBody.length = 0
|
||||
chunkBytes = 0
|
||||
} else {
|
||||
timeoutId = setTimeout(onFlushTimeout, flushInterval)
|
||||
}
|
||||
}
|
||||
|
||||
clearTimeout(timeoutId)
|
||||
clearTimeout(timeoutRef)
|
||||
// In some cases the previos http call does not have finished,
|
||||
// or we didn't reach the flush bytes threshold, so we force one last operation.
|
||||
if (shouldAbort === false && chunkBytes > 0) {
|
||||
@ -524,6 +524,7 @@ class Helpers {
|
||||
return stats
|
||||
|
||||
async function onFlushTimeout () {
|
||||
if (chunkBytes === 0) return
|
||||
stats.bytes += chunkBytes
|
||||
const bulkBodyCopy = bulkBody.slice()
|
||||
bulkBody.length = 0
|
||||
|
||||
@ -41,7 +41,7 @@
|
||||
"company": "Elasticsearch BV"
|
||||
},
|
||||
"devDependencies": {
|
||||
"@sinonjs/fake-timers": "^6.0.1",
|
||||
"@sinonjs/fake-timers": "github:sinonjs/fake-timers#0bfffc1",
|
||||
"@types/node": "^12.6.2",
|
||||
"convert-hrtime": "^3.0.0",
|
||||
"dedent": "^0.7.0",
|
||||
|
||||
@ -637,8 +637,6 @@ test('Flush interval', t => {
|
||||
|
||||
test('Flush interval - early stop', t => {
|
||||
t.plan(3)
|
||||
const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] })
|
||||
t.teardown(() => clock.uninstall())
|
||||
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (params) {
|
||||
@ -672,7 +670,6 @@ test('Flush interval - early stop', t => {
|
||||
})
|
||||
|
||||
setImmediate(() => {
|
||||
clock.next()
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.ok(err instanceof errors.ConfigurationError)
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user