[Backport 8.8] Allow document to be overwritten in onDocument iteratee of bulk helper (#1732)Co-authored-by: Josh Mock <joshua.mock@elastic.co> (#1952)
Co-authored-by: Robert Da Silva <mail@robdasilva.com>
This commit is contained in:
@ -74,11 +74,11 @@ export interface BulkStats {
|
|||||||
aborted: boolean
|
aborted: boolean
|
||||||
}
|
}
|
||||||
|
|
||||||
interface IndexAction {
|
interface IndexActionOperation {
|
||||||
index: T.BulkIndexOperation
|
index: T.BulkIndexOperation
|
||||||
}
|
}
|
||||||
|
|
||||||
interface CreateAction {
|
interface CreateActionOperation {
|
||||||
create: T.BulkCreateOperation
|
create: T.BulkCreateOperation
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,7 +90,9 @@ interface DeleteAction {
|
|||||||
delete: T.BulkDeleteOperation
|
delete: T.BulkDeleteOperation
|
||||||
}
|
}
|
||||||
|
|
||||||
type UpdateAction = [UpdateActionOperation, Record<string, any>]
|
type CreateAction = CreateActionOperation | [CreateActionOperation, unknown]
|
||||||
|
type IndexAction = IndexActionOperation | [IndexActionOperation, unknown]
|
||||||
|
type UpdateAction = [UpdateActionOperation, T.BulkUpdateAction]
|
||||||
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
|
type Action = IndexAction | CreateAction | UpdateAction | DeleteAction
|
||||||
|
|
||||||
export interface OnDropDocument<TDocument = unknown> {
|
export interface OnDropDocument<TDocument = unknown> {
|
||||||
@ -618,22 +620,21 @@ export default class Helpers {
|
|||||||
for await (const chunk of datasource) {
|
for await (const chunk of datasource) {
|
||||||
if (shouldAbort) break
|
if (shouldAbort) break
|
||||||
timeoutRef.refresh()
|
timeoutRef.refresh()
|
||||||
const action = onDocument(chunk)
|
const result = onDocument(chunk)
|
||||||
const operation = Array.isArray(action)
|
const [action, payload] = Array.isArray(result) ? result : [result, chunk]
|
||||||
? Object.keys(action[0])[0]
|
const operation = Object.keys(action)[0]
|
||||||
: Object.keys(action)[0]
|
|
||||||
if (operation === 'index' || operation === 'create') {
|
if (operation === 'index' || operation === 'create') {
|
||||||
actionBody = serializer.serialize(action)
|
actionBody = serializer.serialize(action)
|
||||||
payloadBody = typeof chunk === 'string' ? chunk : serializer.serialize(chunk)
|
payloadBody = typeof payload === 'string'
|
||||||
|
? payload
|
||||||
|
: serializer.serialize(payload)
|
||||||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
|
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
|
||||||
bulkBody.push(actionBody, payloadBody)
|
bulkBody.push(actionBody, payloadBody)
|
||||||
} else if (operation === 'update') {
|
} else if (operation === 'update') {
|
||||||
// @ts-expect-error in case of update action is an array
|
actionBody = serializer.serialize(action)
|
||||||
actionBody = serializer.serialize(action[0])
|
|
||||||
payloadBody = typeof chunk === 'string'
|
payloadBody = typeof chunk === 'string'
|
||||||
? `{"doc":${chunk}}`
|
? `{"doc":${chunk}}`
|
||||||
// @ts-expect-error in case of update action is an array
|
: serializer.serialize({ doc: chunk, ...payload })
|
||||||
: serializer.serialize({ doc: chunk, ...action[1] })
|
|
||||||
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
|
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
|
||||||
bulkBody.push(actionBody, payloadBody)
|
bulkBody.push(actionBody, payloadBody)
|
||||||
} else if (operation === 'delete') {
|
} else if (operation === 'delete') {
|
||||||
|
|||||||
@ -17,11 +17,11 @@
|
|||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
import * as http from 'http'
|
import FakeTimers from '@sinonjs/fake-timers'
|
||||||
import { createReadStream } from 'fs'
|
import { createReadStream } from 'fs'
|
||||||
|
import * as http from 'http'
|
||||||
import { join } from 'path'
|
import { join } from 'path'
|
||||||
import split from 'split2'
|
import split from 'split2'
|
||||||
import FakeTimers from '@sinonjs/fake-timers'
|
|
||||||
import { test } from 'tap'
|
import { test } from 'tap'
|
||||||
import { Client, errors } from '../../../'
|
import { Client, errors } from '../../../'
|
||||||
import { buildServer, connection } from '../../utils'
|
import { buildServer, connection } from '../../utils'
|
||||||
@ -785,6 +785,59 @@ test('bulk index', t => {
|
|||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.test('Should use payload returned by `onDocument`', async t => {
|
||||||
|
let count = 0
|
||||||
|
const updatedAt = '1970-01-01T12:00:00.000Z'
|
||||||
|
const MockConnection = connection.buildMockConnection({
|
||||||
|
onRequest (params) {
|
||||||
|
t.equal(params.path, '/_bulk')
|
||||||
|
t.match(params.headers, {
|
||||||
|
'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8',
|
||||||
|
'x-elastic-client-meta': `es=${clientVersion},js=${nodeVersion},t=${transportVersion},hc=${nodeVersion},h=bp`
|
||||||
|
})
|
||||||
|
// @ts-expect-error
|
||||||
|
const [action, payload] = params.body.split('\n')
|
||||||
|
t.same(JSON.parse(action), { index: { _index: 'test' } })
|
||||||
|
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
|
||||||
|
return { body: { errors: false, items: [{}] } }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
Connection: MockConnection
|
||||||
|
})
|
||||||
|
const result = await client.helpers.bulk<Document>({
|
||||||
|
datasource: dataset.slice(),
|
||||||
|
flushBytes: 1,
|
||||||
|
concurrency: 1,
|
||||||
|
onDocument (doc) {
|
||||||
|
t.type(doc.user, 'string') // testing that doc is type of Document
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
index: {
|
||||||
|
_index: 'test'
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ ...doc, updatedAt }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
onDrop (doc) {
|
||||||
|
t.fail('This should never be called')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.type(result.time, 'number')
|
||||||
|
t.type(result.bytes, 'number')
|
||||||
|
t.match(result, {
|
||||||
|
total: 3,
|
||||||
|
successful: 3,
|
||||||
|
retry: 0,
|
||||||
|
failed: 0,
|
||||||
|
aborted: false
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -835,6 +888,58 @@ test('bulk create', t => {
|
|||||||
aborted: false
|
aborted: false
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.test('Should use payload returned by `onDocument`', async t => {
|
||||||
|
let count = 0
|
||||||
|
const updatedAt = '1970-01-01T12:00:00.000Z'
|
||||||
|
const MockConnection = connection.buildMockConnection({
|
||||||
|
onRequest (params) {
|
||||||
|
t.equal(params.path, '/_bulk')
|
||||||
|
t.match(params.headers, { 'content-type': 'application/vnd.elasticsearch+x-ndjson; compatible-with=8' })
|
||||||
|
// @ts-expect-error
|
||||||
|
const [action, payload] = params.body.split('\n')
|
||||||
|
t.same(JSON.parse(action), { create: { _index: 'test', _id: count } })
|
||||||
|
t.same(JSON.parse(payload), { ...dataset[count++], updatedAt })
|
||||||
|
return { body: { errors: false, items: [{}] } }
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: 'http://localhost:9200',
|
||||||
|
Connection: MockConnection
|
||||||
|
})
|
||||||
|
let id = 0
|
||||||
|
const result = await client.helpers.bulk({
|
||||||
|
datasource: dataset.slice(),
|
||||||
|
flushBytes: 1,
|
||||||
|
concurrency: 1,
|
||||||
|
onDocument (doc) {
|
||||||
|
return [
|
||||||
|
{
|
||||||
|
create: {
|
||||||
|
_index: 'test',
|
||||||
|
_id: String(id++)
|
||||||
|
}
|
||||||
|
},
|
||||||
|
{ ...doc, updatedAt }
|
||||||
|
]
|
||||||
|
},
|
||||||
|
onDrop (doc) {
|
||||||
|
t.fail('This should never be called')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.type(result.time, 'number')
|
||||||
|
t.type(result.bytes, 'number')
|
||||||
|
t.match(result, {
|
||||||
|
total: 3,
|
||||||
|
successful: 3,
|
||||||
|
retry: 0,
|
||||||
|
failed: 0,
|
||||||
|
aborted: false
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user