Allow document to be overwritten in onDocument iteratee of bulk helper (#1732)
Co-authored-by: Josh Mock <joshua.mock@elastic.co>
This commit is contained in:
@ -74,11 +74,11 @@ export interface BulkStats {
|
||||
aborted: boolean
|
||||
}
|
||||
|
||||
interface IndexAction {
|
||||
interface IndexActionOperation {
|
||||
index: T.BulkIndexOperation
|
||||
}
|
||||
|
||||
interface CreateAction {
|
||||
interface CreateActionOperation {
|
||||
create: T.BulkCreateOperation
|
||||
}
|
||||
|
||||
@ -90,7 +90,9 @@ interface DeleteAction {
|
||||
delete: T.BulkDeleteOperation
|
||||
}
|
||||
|
||||
type UpdateAction = [UpdateActionOperation, Record<string, any>]
|
||||
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
|
||||
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
|
||||
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
|
||||
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
|
||||
|
||||
export interface OnDropDocument<TDocument = unknown> {
|
||||
@ -618,22 +620,21 @@ export default class Helpers {
|
||||
for await (const chunk of datasource) {
|
||||
if (shouldAbort) break
|
||||
timeoutRef.refresh()
|
||||
const action = onDocument(chunk)
|
||||
const operation = Array.isArray(action)
|
||||
? Object.keys(action[0])[0]
|
||||
: Object.keys(action)[0]
|
||||
const result = onDocument(chunk)
|
||||
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
|
||||
const operation = Object.keys(action)[0]
|
||||
if (operation === 'index' || operation === 'create') {
|
||||
actionBody = serializer.serialize(action)
|
||||
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
|
||||
payloadBody = typeof payload === 'string'
|
||||
? payload
|
||||
: serializer.serialize(payload)
|
||||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
|
||||
bulkBody.push(actionBody, payloadBody)
|
||||
} else if (operation === 'update') {
|
||||
// @ts-expect-error in case of update action is an array
|
||||
actionBody = serializer.serialize(action[0])
|
||||
actionBody = serializer.serialize(action)
|
||||
payloadBody = typeof chunk === 'string'
|
||||
? `{"doc":${chunk}}`
|
||||
// @ts-expect-error in case of update action is an array
|
||||
: serializer.serialize({ doc: chunk, ...action[1] })
|
||||
: serializer.serialize({ doc: chunk, ...payload })
|
||||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
|
||||
bulkBody.push(actionBody, payloadBody)
|
||||
} else if (operation === 'delete') {
|
||||
|
||||
@ -17,11 +17,11 @@
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
import * as http from 'http'
|
||||
import FakeTimers from '@sinonjs/fake-timers'
|
||||
import { createReadStream } from 'fs'
|
||||
import * as http from 'http'
|
||||
import { join } from 'path'
|
||||
import split from 'split2'
|
||||
import FakeTimers from '@sinonjs/fake-timers'
|
||||
import { test } from 'tap'
|
||||
import { Client, errors } from '../../../'
|
||||
import { buildServer, connection } from '../../utils'
|
||||
@ -785,6 +785,59 @@ test('bulk index', t => {
|
||||
t.end()
|
||||
})
|
||||
|
||||
t.test('Should use payload returned by `onDocument`', async t => {
|
||||
let count = 0
|
||||
const updatedAt = '1970-01-01T12:00:00.000Z'
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (params) {
|
||||
t.equal(params.path, '/_bulk')
|
||||
t.match(params.headers, {
|
||||
'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8',
|
||||
'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${transportVersion},hc=${nodeVersion},h=bp`
|
||||
})
|
||||
// @ts-expect-error
|
||||
const [action, payload] = params.body.split('\n')
|
||||
t.same(JSON.parse(action), { index: { _index: 'test' } })
|
||||
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
|
||||
return { body: { errors: false, items: [{}] } }
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
const result = await client.helpers.bulk<Document>({
|
||||
datasource: dataset.slice(),
|
||||
flushBytes: 1,
|
||||
concurrency: 1,
|
||||
onDocument (doc) {
|
||||
t.type(doc.user, 'string') // testing that doc is type of Document
|
||||
return [
|
||||
{
|
||||
index: {
|
||||
_index: 'test'
|
||||
}
|
||||
},
|
||||
{ ...doc, updatedAt }
|
||||
]
|
||||
},
|
||||
onDrop (doc) {
|
||||
t.fail('This should never be called')
|
||||
}
|
||||
})
|
||||
|
||||
t.type(result.time, 'number')
|
||||
t.type(result.bytes, 'number')
|
||||
t.match(result, {
|
||||
total: 3,
|
||||
successful: 3,
|
||||
retry: 0,
|
||||
failed: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
@ -835,6 +888,58 @@ test('bulk create', t => {
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
t.test('Should use payload returned by `onDocument`', async t => {
|
||||
let count = 0
|
||||
const updatedAt = '1970-01-01T12:00:00.000Z'
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (params) {
|
||||
t.equal(params.path, '/_bulk')
|
||||
t.match(params.headers, { 'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8' })
|
||||
// @ts-expect-error
|
||||
const [action, payload] = params.body.split('\n')
|
||||
t.same(JSON.parse(action), { create: { _index: 'test', _id: count } })
|
||||
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
|
||||
return { body: { errors: false, items: [{}] } }
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
let id = 0
|
||||
const result = await client.helpers.bulk({
|
||||
datasource: dataset.slice(),
|
||||
flushBytes: 1,
|
||||
concurrency: 1,
|
||||
onDocument (doc) {
|
||||
return [
|
||||
{
|
||||
create: {
|
||||
_index: 'test',
|
||||
_id: String(id++)
|
||||
}
|
||||
},
|
||||
{ ...doc, updatedAt }
|
||||
]
|
||||
},
|
||||
onDrop (doc) {
|
||||
t.fail('This should never be called')
|
||||
}
|
||||
})
|
||||
|
||||
t.type(result.time, 'number')
|
||||
t.type(result.bytes, 'number')
|
||||
t.match(result, {
|
||||
total: 3,
|
||||
successful: 3,
|
||||
retry: 0,
|
||||
failed: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user