Bulk update improvements (#1428)
This commit is contained in:
2
lib/Helpers.d.ts
vendored
2
lib/Helpers.d.ts
vendored
@ -36,6 +36,7 @@ export interface ScrollSearchResponse<TDocument = unknown, TResponse = Record<st
|
|||||||
|
|
||||||
export interface BulkHelper<T> extends Promise<T> {
|
export interface BulkHelper<T> extends Promise<T> {
|
||||||
abort: () => BulkHelper<T>
|
abort: () => BulkHelper<T>
|
||||||
|
readonly stats: BulkStats
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface BulkStats {
|
export interface BulkStats {
|
||||||
@ -43,6 +44,7 @@ export interface BulkStats {
|
|||||||
failed: number
|
failed: number
|
||||||
retry: number
|
retry: number
|
||||||
successful: number
|
successful: number
|
||||||
|
noop: number
|
||||||
time: number
|
time: number
|
||||||
bytes: number
|
bytes: number
|
||||||
aborted: boolean
|
aborted: boolean
|
||||||
|
|||||||
@ -456,6 +456,7 @@ class Helpers {
|
|||||||
failed: 0,
|
failed: 0,
|
||||||
retry: 0,
|
retry: 0,
|
||||||
successful: 0,
|
successful: 0,
|
||||||
|
noop: 0,
|
||||||
time: 0,
|
time: 0,
|
||||||
bytes: 0,
|
bytes: 0,
|
||||||
aborted: false
|
aborted: false
|
||||||
@ -463,6 +464,9 @@ class Helpers {
|
|||||||
|
|
||||||
const p = iterate()
|
const p = iterate()
|
||||||
const helper = {
|
const helper = {
|
||||||
|
get stats () {
|
||||||
|
return stats
|
||||||
|
},
|
||||||
then (onFulfilled, onRejected) {
|
then (onFulfilled, onRejected) {
|
||||||
return p.then(onFulfilled, onRejected)
|
return p.then(onFulfilled, onRejected)
|
||||||
},
|
},
|
||||||
@ -692,6 +696,11 @@ class Helpers {
|
|||||||
if (err) return callback(err, null)
|
if (err) return callback(err, null)
|
||||||
if (body.errors === false) {
|
if (body.errors === false) {
|
||||||
stats.successful += body.items.length
|
stats.successful += body.items.length
|
||||||
|
for (const item of body.items) {
|
||||||
|
if (item.update && item.update.result === 'noop') {
|
||||||
|
stats.noop++
|
||||||
|
}
|
||||||
|
}
|
||||||
return callback(null, [])
|
return callback(null, [])
|
||||||
}
|
}
|
||||||
const retry = []
|
const retry = []
|
||||||
|
|||||||
@ -913,6 +913,55 @@ test('bulk update', t => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.test('Should track the number of noop results', async t => {
|
||||||
|
let count = 0
|
||||||
|
const MockConnection = connection.buildMockConnection({
|
||||||
|
onRequest (params) {
|
||||||
|
t.strictEqual(params.path, '/_bulk')
|
||||||
|
t.match(params.headers, { 'content-type': 'application/x-ndjson' })
|
||||||
|
const [action, payload] = params.body.split('\n')
|
||||||
|
t.deepEqual(JSON.parse(action), { update: { _index: 'test', _id: count } })
|
||||||
|
t.deepEqual(JSON.parse(payload), { doc: dataset[count++], doc_as_upsert: true })
|
||||||
|
return { body: { errors: false, items: [{ update: { result: 'noop' } }] } }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
Connection: MockConnection
|
||||||
|
})
|
||||||
|
let id = 0
|
||||||
|
const result = await client.helpers.bulk({
|
||||||
|
datasource: dataset.slice(),
|
||||||
|
flushBytes: 1,
|
||||||
|
concurrency: 1,
|
||||||
|
onDocument (doc) {
|
||||||
|
return [{
|
||||||
|
update: {
|
||||||
|
_index: 'test',
|
||||||
|
_id: id++
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
doc_as_upsert: true
|
||||||
|
}]
|
||||||
|
},
|
||||||
|
onDrop (doc) {
|
||||||
|
t.fail('This should never be called')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.type(result.time, 'number')
|
||||||
|
t.type(result.bytes, 'number')
|
||||||
|
t.match(result, {
|
||||||
|
total: 3,
|
||||||
|
successful: 3,
|
||||||
|
noop: 3,
|
||||||
|
retry: 0,
|
||||||
|
failed: 0,
|
||||||
|
aborted: false
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -1263,5 +1312,52 @@ test('Flush interval', t => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.test('Operation stats', async t => {
|
||||||
|
let count = 0
|
||||||
|
const MockConnection = connection.buildMockConnection({
|
||||||
|
onRequest (params) {
|
||||||
|
t.strictEqual(params.path, '/_bulk')
|
||||||
|
t.match(params.headers, {
|
||||||
|
'content-type': 'application/x-ndjson',
|
||||||
|
'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${clientVersion},hc=${nodeVersion},h=bp`
|
||||||
|
})
|
||||||
|
const [action, payload] = params.body.split('\n')
|
||||||
|
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
|
||||||
|
t.deepEqual(JSON.parse(payload), dataset[count++])
|
||||||
|
return { body: { errors: false, items: [{}] } }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
Connection: MockConnection
|
||||||
|
})
|
||||||
|
const b = client.helpers.bulk({
|
||||||
|
datasource: dataset.slice(),
|
||||||
|
flushBytes: 1,
|
||||||
|
concurrency: 1,
|
||||||
|
onDocument (doc) {
|
||||||
|
return {
|
||||||
|
index: { _index: 'test' }
|
||||||
|
}
|
||||||
|
},
|
||||||
|
onDrop (doc) {
|
||||||
|
t.fail('This should never be called')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
const result = await b
|
||||||
|
|
||||||
|
t.type(result.time, 'number')
|
||||||
|
t.type(result.bytes, 'number')
|
||||||
|
t.match(result, b.stats)
|
||||||
|
t.match(result, {
|
||||||
|
total: 3,
|
||||||
|
successful: 3,
|
||||||
|
retry: 0,
|
||||||
|
failed: 0,
|
||||||
|
aborted: false
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user