Improve code coverage (#1124)

* Helpers: Fixed stats counting

* Improve code coverage
This commit is contained in:
Tomas Della Vedova
2020-03-24 12:27:05 +01:00
committed by GitHub
parent f99fe71b67
commit 85616b07ef
5 changed files with 200 additions and 4 deletions

View File

@ -229,10 +229,12 @@ class Helpers {
chunkBytes += Buffer.byteLength(actionBody) + Buffer.byteLength(payloadBody)
bulkBody.push(actionBody)
bulkBody.push(payloadBody)
} else { // delete
} else if (operation === 'delete') {
actionBody = serialize(action)
chunkBytes += Buffer.byteLength(actionBody)
bulkBody.push(actionBody)
} else {
throw new ConfigurationError(`Bulk helper invalid action: '${operation}'`)
}
if (chunkBytes >= flushBytes) {
@ -315,6 +317,7 @@ class Helpers {
}
function send (bulkBody) {
/* istanbul ignore if */
if (running >= concurrency) {
throw new Error('Max concurrency reached')
}
@ -351,11 +354,12 @@ class Helpers {
function retryDocuments (err, bulkBody) {
if (err) return callback(err)
if (shouldAbort === true) return callback()
isRetrying = true
if (bulkBody.length > 0) {
if (retryCount > 0) {
isRetrying = true
retryCount -= 1
stats.retry += bulkBody.length
setTimeout(tryBulk, wait, bulkBody, retryDocuments)
return
}
@ -397,7 +401,6 @@ class Helpers {
// a document, because it was not an error in the document itself,
// but the ES node were handling too many operations.
if (status === 429) {
stats.retry += 1
retry.push(bulkBody[indexSlice])
if (operation !== 'delete') {
retry.push(bulkBody[indexSlice + 1])

View File

@ -49,6 +49,7 @@ class Transport {
} else if (opts.nodeSelector === 'round-robin') {
this.nodeSelector = roundRobinSelector()
} else if (opts.nodeSelector === 'random') {
/* istanbul ignore next */
this.nodeSelector = randomSelector
} else {
this.nodeSelector = roundRobinSelector()

View File

@ -143,6 +143,51 @@ test('bulk index', t => {
})
})
t.test('refreshOnCompletion', async t => {
let count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
if (params.method === 'GET') {
t.strictEqual(params.path, '/_all/_refresh')
return { body: { acknowledged: true } }
} else {
t.strictEqual(params.path, '/_bulk')
t.match(params.headers, { 'Content-Type': 'application/x-ndjson' })
const [action, payload] = params.body.split('\n')
t.deepEqual(JSON.parse(action), { index: { _index: 'test' } })
t.deepEqual(JSON.parse(payload), dataset[count++])
return { body: { errors: false, items: [{}] } }
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const result = await client.helpers.bulk({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
refreshOnCompletion: true,
onDocument (doc) {
return {
index: { _index: 'test' }
}
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 3,
retry: 0,
failed: 0,
aborted: false
})
})
t.test('Should perform a bulk request (custom action)', async t => {
let count = 0
const MockConnection = connection.buildMockConnection({
@ -262,6 +307,55 @@ test('bulk index', t => {
server.stop()
})
t.test('Should perform a bulk request (retry a single document from batch)', async t => {
function handler (req, res) {
res.setHeader('content-type', 'application/json')
res.end(JSON.stringify({
took: 0,
errors: true,
items: [
{ index: { status: 200 } },
{ index: { status: 429 } },
{ index: { status: 200 } }
]
}))
}
const [{ port }, server] = await buildServer(handler)
const client = new Client({ node: `http://localhost:${port}` })
const result = await client.helpers.bulk({
datasource: dataset.slice(),
concurrency: 1,
wait: 10,
retries: 0,
onDocument (doc) {
return {
index: { _index: 'test' }
}
},
onDrop (doc) {
t.deepEqual(doc, {
status: 429,
error: null,
operation: { index: { _index: 'test' } },
document: { user: 'arya', age: 18 },
retried: false
})
}
})
t.type(result.time, 'number')
t.type(result.bytes, 'number')
t.match(result, {
total: 3,
successful: 2,
retry: 0,
failed: 1,
aborted: false
})
server.stop()
})
t.test('Should perform a bulk request (failure)', async t => {
if (semver.lt(process.versions.node, '10.0.0')) {
t.skip('This test will not pass on Node v8')
@ -475,6 +569,35 @@ test('bulk index', t => {
server.stop()
})
t.test('Invalid operation', t => {
t.plan(2)
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return { body: { errors: false, items: [{}] } }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
client.helpers
.bulk({
datasource: dataset.slice(),
flushBytes: 1,
concurrency: 1,
onDocument (doc) {
return {
foo: { _index: 'test' }
}
}
})
.catch(err => {
t.true(err instanceof errors.ConfigurationError)
t.is(err.message, `Bulk helper invalid action: 'foo'`)
})
})
t.end()
})

View File

@ -164,7 +164,8 @@ test('Scroll search (retry throws and maxRetries)', async t => {
index: 'test',
body: { foo: 'bar' }
}, {
wait: 10
wait: 10,
ignore: [404]
})
try {
@ -178,6 +179,55 @@ test('Scroll search (retry throws and maxRetries)', async t => {
}
})
test('Scroll search (retry throws later)', async t => {
var count = 0
const MockConnection = connection.buildMockConnection({
onRequest (params) {
if (count > 1) {
count += 1
return { body: {}, statusCode: 429 }
}
return {
statusCode: 200,
body: {
_scroll_id: count === 4 ? undefined : 'id',
count,
hits: {
hits: [
{ _source: { one: 'one' } },
{ _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
}
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const scrollSearch = client.helpers.scrollSearch({
index: 'test',
body: { foo: 'bar' }
}, {
wait: 10
})
try {
for await (const result of scrollSearch) { // eslint-disable-line
t.strictEqual(result.body.count, count)
count += 1
}
} catch (err) {
t.true(err instanceof errors.ResponseError)
t.strictEqual(err.statusCode, 429)
t.strictEqual(count, 5)
}
})
test('Scroll search documents', async t => {
var count = 0
const MockConnection = connection.buildMockConnection({

View File

@ -40,3 +40,22 @@ test('Search should have an additional documents property', async t => {
{ three: 'three' }
])
})
test('kGetHits fallback', async t => {
const MockConnection = connection.buildMockConnection({
onRequest (params) {
return { body: {} }
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const result = await client.helpers.search({
index: 'test',
body: { foo: 'bar' }
})
t.deepEqual(result, [])
})