[Backport 8.14] onSuccess function for bulk helper (#2204)

* Bulk helper onSuccess callback

For https://github.com/elastic/elasticsearch-js/issues/2090

Includes a refactor of the tryBulk result-processing code to make
iterating over bulk response data easier to understand.

* Add onSuccess tests for each datasource type

* Cleanup, additional comments

* Add documentation for onSuccess callback

* Update changelog

* Drop link to 8.14 release notes.

Page not yet published, breaking docs build.

(cherry picked from commit 4aa00e03e1)

Co-authored-by: Josh Mock <joshua.mock@elastic.co>
Authored by github-actions[bot] on 2024-04-02 19:56:23 +00:00, committed by GitHub.
parent 6900bfe990
commit 196bcf5091
4 changed files with 429 additions and 173 deletions

docs/changelog.asciidoc

@@ -1,6 +1,14 @@
[[changelog-client]]
== Release notes
[discrete]
=== 8.14.0
[discrete]
===== `onSuccess` callback added to bulk helper
The bulk helper now supports an `onSuccess` callback that will be called for each successful operation. https://github.com/elastic/elasticsearch-js/pull/2199[#2199]
[discrete]
=== 8.13.0

docs/helpers.asciidoc

@@ -98,6 +98,17 @@ const b = client.helpers.bulk({
})
----
|`onSuccess`
a|A function that is called for each successful operation in the bulk request. It receives the result from Elasticsearch for that operation, along with the original document that was sent, or `null` for delete operations.
[source,js]
----
const b = client.helpers.bulk({
onSuccess ({ result, document }) {
console.log(`SUCCESS: Document ${result.index._id} indexed to ${result.index._index}`)
}
})
----
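For delete operations there is no document body, so `document` will be `null`. A sketch of a callback that handles mixed operations by branching on the action type:
[source,js]
----
const b = client.helpers.bulk({
  onSuccess ({ result, document }) {
    if (result.delete != null) {
      console.log(`SUCCESS: Document ${result.delete._id} deleted`)
    } else {
      console.log('SUCCESS: Document indexed:', document)
    }
  }
})
----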
|`flushBytes`
a|The size in bytes that the bulk body must reach before it is sent. Defaults to 5MB. +
_Default:_ `5000000`

src/helpers.ts

@@ -103,6 +103,24 @@ export interface OnDropDocument<TDocument = unknown> {
retried: boolean
}
type BulkResponseItem = Partial<Record<T.BulkOperationType, T.BulkResponseItem>>
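// Argument passed to the onSuccess callback: the action-keyed response item
// returned by Elasticsearch, plus the original document that was sent
// (absent for delete operations, which carry no source body).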
export interface OnSuccessDocument<TDocument = unknown> {
result: BulkResponseItem
document?: TDocument
}
interface ZippedResult<TDocument = unknown> {
result: BulkResponseItem
raw: {
action: string
document?: string
}
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document?: () => TDocument
}
export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
datasource: TDocument[] | Buffer | Readable | AsyncIterator<TDocument>
onDocument: (doc: TDocument) => Action
@@ -112,6 +130,7 @@ export interface BulkHelperOptions<TDocument = unknown> extends T.BulkRequest {
retries?: number
wait?: number
onDrop?: (doc: OnDropDocument<TDocument>) => void
onSuccess?: (doc: OnSuccessDocument) => void
refreshOnCompletion?: boolean | string
}
@@ -551,6 +570,9 @@ export default class Helpers {
retries = this[kMaxRetries],
wait = 5000,
onDrop = noop,
// onSuccess does not default to noop, to avoid the performance hit
// of deserializing every document in the bulk request
onSuccess,
refreshOnCompletion = false,
...bulkOptions
} = options
@@ -817,57 +839,93 @@
callback()
}
/**
* Zips bulk response items (the action's result) with the original document body.
* The raw string version of action and document lines are also included.
*/
function zipBulkResults (responseItems: BulkResponseItem[], bulkBody: string[]): ZippedResult[] {
const zipped = []
let indexSlice = 0
for (let i = 0, len = responseItems.length; i < len; i++) {
const result = responseItems[i]
const operation = Object.keys(result)[0]
let zipResult
if (operation === 'delete') {
zipResult = {
result,
raw: { action: bulkBody[indexSlice] }
}
indexSlice += 1
} else {
const document = bulkBody[indexSlice + 1]
zipResult = {
result,
raw: { action: bulkBody[indexSlice], document },
// this is a function so that deserialization is only done when needed
// to avoid a performance hit
document: () => serializer.deserialize(document)
}
indexSlice += 2
}
zipped.push(zipResult as ZippedResult)
}
return zipped
}
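// Illustrative example (not part of the change): for a bulk body like
//   ['{"index":{}}', '{"user":"a"}', '{"delete":{"_id":"1"}}']
// and matching response items, this produces:
//   [
//     { result: { index: {...} }, raw: { action: '{"index":{}}', document: '{"user":"a"}' }, document: [Function] },
//     { result: { delete: {...} }, raw: { action: '{"delete":{"_id":"1"}}' } }
//   ]
// so callers can retry raw lines verbatim and deserialize documents lazily.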
function tryBulk (bulkBody: string[], callback: (err: Error | null, bulkBody: string[]) => void): void {
if (shouldAbort) return callback(null, [])
client.bulk(Object.assign({}, bulkOptions, { body: bulkBody }), reqOptions as TransportRequestOptionsWithMeta)
.then(response => {
const result = response.body
const results = zipBulkResults(result.items, bulkBody)
if (!result.errors) {
stats.successful += result.items.length
for (const item of results) {
const { result, document = noop } = item
if (result.update?.result === 'noop') {
stats.noop++
}
if (onSuccess != null) onSuccess({ result, document: document() })
}
return callback(null, [])
}
const retry = []
for (const item of results) {
const { result, raw, document = noop } = item
const operation = Object.keys(result)[0]
// @ts-expect-error
const responseItem = result[operation as keyof T.BulkResponseItemContainer]
assert(responseItem !== undefined, 'The responseItem is undefined, please file a bug report')
if (responseItem.status >= 400) {
// 429 is the only status code where we might want to retry
// a document, because it was not an error in the document itself,
// but the ES node was handling too many operations.
if (responseItem.status === 429) {
retry.push(raw.action)
/* istanbul ignore next */
if (operation !== 'delete') {
retry.push(raw.document ?? '')
}
} else {
onDrop({
status: responseItem.status,
error: responseItem.error ?? null,
operation: serializer.deserialize(raw.action),
// @ts-expect-error
document: document(),
retried: isRetrying
})
stats.failed += 1
}
} else {
stats.successful += 1
if (onSuccess != null) onSuccess({ result, document: document() })
}
}
callback(null, retry)
})
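Taken together, the new callback is used like any other bulk helper option. A minimal sketch (hypothetical index name and documents, assuming a reachable Elasticsearch node and top-level `await`):
[source,ts]
----
import { Client } from '@elastic/elasticsearch'

const client = new Client({ node: 'http://localhost:9200' })

// hypothetical documents to index
const docs = [{ user: 'a' }, { user: 'b' }, { user: 'c' }]

const stats = await client.helpers.bulk({
  datasource: docs,
  onDocument () {
    // index every document into a hypothetical index
    return { index: { _index: 'my-index' } }
  },
  // called once per successful operation; `document` is the original
  // document (deserialized lazily), or null for delete operations
  onSuccess ({ result, document }) {
    console.log('indexed', result.index?._id, document)
  },
  onDrop (doc) {
    console.error('dropped', doc.error)
  }
})

console.log(`${stats.successful} successful operations`)
----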

test/unit/helpers/bulk.test.ts

@@ -514,7 +514,7 @@ test('bulk index', t => {
t.test('Server error', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
statusCode: 500,
body: { something: 'went wrong' }
@@ -530,12 +530,12 @@
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (_doc) {
return {
index: { _index: 'test' }
}
},
onDrop (_doc) {
t.fail('This should never be called')
}
})
@@ -550,7 +550,7 @@
t.test('Server error (high flush size, to trigger the finish error)', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
statusCode: 500,
body: { something: 'went wrong' }
@@ -566,12 +566,12 @@
datasource: dataset.slice(),
flushBytes: 5000000,
concurrency: 1,
onDocument (_doc) {
return {
index: { _index: 'test' }
}
},
onDrop (_doc) {
t.fail('This should never be called')
}
})
@@ -625,12 +625,12 @@
flushBytes: 1,
concurrency: 1,
wait: 10,
onDocument (_doc) {
return {
index: { _index: 'test' }
}
},
onDrop (_doc) {
b.abort()
}
})
@@ -651,7 +651,7 @@
t.test('Invalid operation', t => {
t.plan(2)
const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return { body: { errors: false, items: [{}] } }
}
})
@@ -666,7 +666,7 @@
flushBytes: 1,
concurrency: 1,
// @ts-expect-error
onDocument (_doc) {
return {
foo: { _index: 'test' }
}
@@ -678,6 +678,43 @@
})
})
t.test('should call onSuccess callback for each indexed document', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
// @ts-expect-error
let [action] = params.body.split('\n')
action = JSON.parse(action)
return { body: { errors: false, items: [action] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
let count = 0
await client.helpers.bulk<Document>({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (_doc) {
return {
index: { _index: 'test' }
}
},
onSuccess ({ result, document }) {
t.same(result, { index: { _index: 'test' }})
t.same(document, dataset[count++])
},
onDrop (_doc) {
t.fail('This should never be called')
}
})
t.equal(count, 3)
t.end()
})
t.end()
})
@@ -731,6 +768,44 @@
})
})
t.test('onSuccess is called for each indexed document', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
// @ts-expect-error
let [action] = params.body.split('\n')
action = JSON.parse(action)
return { body: { errors: false, items: [action] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const stream = createReadStream(join(__dirname, '..', '..', 'fixtures', 'small-dataset.ndjson'), 'utf8')
let count = 0
await client.helpers.bulk<Document>({
datasource: stream.pipe(split()),
flushBytes: 1,
concurrency: 1,
onDocument (_doc) {
return {
index: { _index: 'test' }
}
},
onSuccess ({ result, document }) {
t.same(result, { index: { _index: 'test' }})
t.same(document, dataset[count++])
},
onDrop (_doc) {
t.fail('This should never be called')
}
})
t.equal(count, 3)
t.end()
})
t.end()
})
@@ -785,6 +860,50 @@
aborted: false
})
})
t.test('onSuccess is called for each indexed document', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
// @ts-expect-error
let [action] = params.body.split('\n')
action = JSON.parse(action)
return { body: { errors: false, items: [action] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
async function * generator () {
const data = dataset.slice()
for (const doc of data) {
yield doc
}
}
let count = 0
await client.helpers.bulk<Document>({
datasource: generator(),
flushBytes: 1,
concurrency: 1,
onDocument (_doc) {
return {
index: { _index: 'test' }
}
},
onSuccess ({ result, document }) {
t.same(result, { index: { _index: 'test' }})
t.same(document, dataset[count++])
},
onDrop (_doc) {
t.fail('This should never be called')
}
})
t.equal(count, 3)
t.end()
})
t.end()
})
@@ -943,6 +1062,8 @@ test('bulk create', t => {
})
})
t.end()
})
@@ -1279,6 +1400,63 @@ test('bulk delete', t => {
server.stop()
})
t.test('should call onSuccess callback with delete action object', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
// @ts-expect-error
let [action] = params.body.split('\n')
action = JSON.parse(action)
return { body: { errors: false, items: [action] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
let docCount = 0
let successCount = 0
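// delete the second document and index the rest; onSuccess should see a
// delete result (with no original document) or an index result accordingly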
await client.helpers.bulk<Document>({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (_doc) {
if (docCount++ === 1) {
return {
delete: {
_index: 'test',
_id: String(docCount)
}
}
} else {
return {
index: { _index: 'test' }
}
}
},
onSuccess ({ result, document }) {
const item = dataset[successCount]
if (successCount++ === 1) {
t.same(result, {
delete: {
_index: 'test',
_id: String(successCount)
}
})
} else {
t.same(result, { index: { _index: 'test' }})
t.same(document, item)
}
},
onDrop (_doc) {
t.fail('This should never be called')
}
})
t.end()
})
t.end()
})
@@ -1594,152 +1772,153 @@ test('Flush interval', t => {
})
})
t.end()
})
test(`flush timeout does not lock process when flushInterval is less than server timeout`, async t => {
const flushInterval = 500
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
setTimeout(() => {
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ errors: false, items: [{}] }))
}, 1000)
}
const [{ port }, server] = await buildServer(handler)
const client = new Client({ node: `http://localhost:${port}` })
async function * generator () {
const data = dataset.slice()
for (const doc of data) {
await sleep(flushInterval)
yield doc
}
}
const result = await client.helpers.bulk({
datasource: Readable.from(generator()),
flushBytes: 1,
flushInterval: flushInterval,
concurrency: 1,
onDocument (_) {
return {
index: { _index: 'test' }
}
},
onDrop (_) {
t.fail('This should never be called')
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
server.stop()
})
test(`flush timeout does not lock process when flushInterval is greater than server timeout`, async t => {
const flushInterval = 500
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
setTimeout(() => {
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ errors: false, items: [{}] }))
}, 250)
}
const [{ port }, server] = await buildServer(handler)
const client = new Client({ node: `http://localhost:${port}` })
async function * generator () {
const data = dataset.slice()
for (const doc of data) {
await sleep(flushInterval)
yield doc
}
}
const result = await client.helpers.bulk({
datasource: Readable.from(generator()),
flushBytes: 1,
flushInterval: flushInterval,
concurrency: 1,
onDocument (_) {
return {
index: { _index: 'test' }
}
},
onDrop (_) {
t.fail('This should never be called')
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
server.stop()
})
test(`flush timeout does not lock process when flushInterval is equal to server timeout`, async t => {
const flushInterval = 500
async function handler (req: http.IncomingMessage, res: http.ServerResponse) {
setTimeout(() => {
res.writeHead(200, { 'content-type': 'application/json' })
res.end(JSON.stringify({ errors: false, items: [{}] }))
}, flushInterval)
}
const [{ port }, server] = await buildServer(handler)
const client = new Client({ node: `http://localhost:${port}` })
async function * generator () {
const data = dataset.slice()
for (const doc of data) {
await sleep(flushInterval)
yield doc
}
}
const result = await client.helpers.bulk({
datasource: Readable.from(generator()),
flushBytes: 1,
flushInterval: flushInterval,
concurrency: 1,
onDocument (_) {
return {
index: { _index: 'test' }
}
},
onDrop (_) {
t.fail('This should never be called')
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
server.stop()
})