Bulk update improvements (#1428)
This commit is contained in:
2
lib/Helpers.d.ts
vendored
2
lib/Helpers.d.ts
vendored
@ -36,6 +36,7 @@ export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<st
|
||||
|
||||
export interface BulkHelper<T> extends Promise<T> {
|
||||
abort: () => BulkHelper<T>
|
||||
readonly stats: BulkStats
|
||||
}
|
||||
|
||||
export interface BulkStats {
|
||||
@ -43,6 +44,7 @@ export interface BulkStats {
|
||||
failed: number
|
||||
retry: number
|
||||
successful: number
|
||||
noop: number
|
||||
time: number
|
||||
bytes: number
|
||||
aborted: boolean
|
||||
|
||||
@ -456,6 +456,7 @@ class Helpers {
|
||||
failed: 0,
|
||||
retry: 0,
|
||||
successful: 0,
|
||||
noop: 0,
|
||||
time: 0,
|
||||
bytes: 0,
|
||||
aborted: false
|
||||
@ -463,6 +464,9 @@ class Helpers {
|
||||
|
||||
const p = iterate()
|
||||
const helper = {
|
||||
get stats () {
|
||||
return stats
|
||||
},
|
||||
then (onFulfilled, onRejected) {
|
||||
return p.then(onFulfilled, onRejected)
|
||||
},
|
||||
@ -692,6 +696,11 @@ class Helpers {
|
||||
if (err) return callback(err, null)
|
||||
if (body.errors === false) {
|
||||
stats.successful += body.items.length
|
||||
for (const item of body.items) {
|
||||
if (item.update && item.update.result === 'noop') {
|
||||
stats.noop++
|
||||
}
|
||||
}
|
||||
return callback(null, [])
|
||||
}
|
||||
const retry = []
|
||||
|
||||
@ -913,6 +913,55 @@ test('bulk update', t => {
|
||||
})
|
||||
})
|
||||
|
||||
t.test('Should track the number of noop results', async t => {
|
||||
let count = 0
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (params) {
|
||||
t.strictEqual(params.path, '/_bulk')
|
||||
t.match(params.headers, { 'content-type': 'application/x-ndjson' })
|
||||
const [action, payload] = params.body.split('\n')
|
||||
t.deepEqual(JSON.parse(action), { update: { _index: 'test', _id: count } })
|
||||
t.deepEqual(JSON.parse(payload), { doc: dataset[count++], doc_as_upsert: true })
|
||||
return { body: { errors: false, items: [{ update: { result: 'noop' } }] } }
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
let id = 0
|
||||
const result = await client.helpers.bulk({
|
||||
datasource: dataset.slice(),
|
||||
flushBytes: 1,
|
||||
concurrency: 1,
|
||||
onDocument (doc) {
|
||||
return [{
|
||||
update: {
|
||||
_index: 'test',
|
||||
_id: id++
|
||||
}
|
||||
}, {
|
||||
doc_as_upsert: true
|
||||
}]
|
||||
},
|
||||
onDrop (doc) {
|
||||
t.fail('This should never be called')
|
||||
}
|
||||
})
|
||||
|
||||
t.type(result.time, 'number')
|
||||
t.type(result.bytes, 'number')
|
||||
t.match(result, {
|
||||
total: 3,
|
||||
successful: 3,
|
||||
noop: 3,
|
||||
retry: 0,
|
||||
failed: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
@ -1263,5 +1312,52 @@ test('Flush interval', t => {
|
||||
})
|
||||
})
|
||||
|
||||
t.test('Operation stats', async t => {
|
||||
let count = 0
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (params) {
|
||||
t.strictEqual(params.path, '/_bulk')
|
||||
t.match(params.headers, {
|
||||
'content-type': 'application/x-ndjson',
|
||||
'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${clientVersion},hc=${nodeVersion},h=bp`
|
||||
})
|
||||
const [action, payload] = params.body.split('\n')
|
||||
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
|
||||
t.deepEqual(JSON.parse(payload), dataset[count++])
|
||||
return { body: { errors: false, items: [{}] } }
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
const b = client.helpers.bulk({
|
||||
datasource: dataset.slice(),
|
||||
flushBytes: 1,
|
||||
concurrency: 1,
|
||||
onDocument (doc) {
|
||||
return {
|
||||
index: { _index: 'test' }
|
||||
}
|
||||
},
|
||||
onDrop (doc) {
|
||||
t.fail('This should never be called')
|
||||
}
|
||||
})
|
||||
const result = await b
|
||||
|
||||
t.type(result.time, 'number')
|
||||
t.type(result.bytes, 'number')
|
||||
t.match(result, b.stats)
|
||||
t.match(result, {
|
||||
total: 3,
|
||||
successful: 3,
|
||||
retry: 0,
|
||||
failed: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user