[Backport 7.x] Improve helper concurrency (#1216)
Co-authored-by: Tomas Della Vedova <delvedor@users.noreply.github.com>
This commit is contained in:
committed by
GitHub
parent
8eacc288c7
commit
c99eac4699
@ -34,9 +34,9 @@ test('Basic', async t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 1 })
|
||||
const m = client.helpers.msearch({ operations: 1 })
|
||||
|
||||
const result = await s.search(
|
||||
const result = await m.search(
|
||||
{ index: 'test' },
|
||||
{ query: { match: { foo: 'bar' } } }
|
||||
)
|
||||
@ -58,7 +58,7 @@ test('Basic', async t => {
|
||||
{ three: 'three' }
|
||||
])
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Multiple searches (inside async iterator)', t => {
|
||||
@ -97,9 +97,9 @@ test('Multiple searches (inside async iterator)', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 2 })
|
||||
const m = client.helpers.msearch({ operations: 2 })
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.error(err)
|
||||
t.deepEqual(result.body, {
|
||||
status: 200,
|
||||
@ -119,7 +119,7 @@ test('Multiple searches (inside async iterator)', t => {
|
||||
])
|
||||
})
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.error(err)
|
||||
t.deepEqual(result.body, {
|
||||
status: 200,
|
||||
@ -139,7 +139,7 @@ test('Multiple searches (inside async iterator)', t => {
|
||||
])
|
||||
})
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Multiple searches (async iterator exits)', t => {
|
||||
@ -178,9 +178,9 @@ test('Multiple searches (async iterator exits)', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
t.error(err)
|
||||
t.deepEqual(result.body, {
|
||||
status: 200,
|
||||
@ -200,7 +200,7 @@ test('Multiple searches (async iterator exits)', t => {
|
||||
])
|
||||
})
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.error(err)
|
||||
t.deepEqual(result.body, {
|
||||
status: 200,
|
||||
@ -220,7 +220,7 @@ test('Multiple searches (async iterator exits)', t => {
|
||||
])
|
||||
})
|
||||
|
||||
setImmediate(() => s.stop())
|
||||
setImmediate(() => m.stop())
|
||||
})
|
||||
|
||||
test('Stop a msearch processor (promises)', async t => {
|
||||
@ -235,12 +235,12 @@ test('Stop a msearch processor (promises)', async t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 1 })
|
||||
const m = client.helpers.msearch({ operations: 1 })
|
||||
|
||||
s.stop()
|
||||
m.stop()
|
||||
|
||||
try {
|
||||
await s.search(
|
||||
await m.search(
|
||||
{ index: 'test' },
|
||||
{ query: { match: { foo: 'bar' } } }
|
||||
)
|
||||
@ -248,7 +248,7 @@ test('Stop a msearch processor (promises)', async t => {
|
||||
t.strictEqual(err.message, 'The msearch processor has been stopped')
|
||||
}
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Stop a msearch processor (callbacks)', t => {
|
||||
@ -265,11 +265,11 @@ test('Stop a msearch processor (callbacks)', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.stop()
|
||||
m.stop()
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.strictEqual(err.message, 'The msearch processor has been stopped')
|
||||
})
|
||||
})
|
||||
@ -288,13 +288,13 @@ test('Bad header', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.strictEqual(err.message, 'The header should be an object')
|
||||
})
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Bad body', t => {
|
||||
@ -311,13 +311,13 @@ test('Bad body', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.search({ index: 'test' }, null, (err, result) => {
|
||||
m.search({ index: 'test' }, null, (err, result) => {
|
||||
t.strictEqual(err.message, 'The body should be an object')
|
||||
})
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Retry on 429', async t => {
|
||||
@ -357,9 +357,9 @@ test('Retry on 429', async t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 1, wait: 10 })
|
||||
const m = client.helpers.msearch({ operations: 1, wait: 10 })
|
||||
|
||||
const result = await s.search(
|
||||
const result = await m.search(
|
||||
{ index: 'test' },
|
||||
{ query: { match: { foo: 'bar' } } }
|
||||
)
|
||||
@ -381,7 +381,7 @@ test('Retry on 429', async t => {
|
||||
{ three: 'three' }
|
||||
])
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Single search errors', async t => {
|
||||
@ -403,10 +403,10 @@ test('Single search errors', async t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 1 })
|
||||
const m = client.helpers.msearch({ operations: 1 })
|
||||
|
||||
try {
|
||||
await s.search(
|
||||
await m.search(
|
||||
{ index: 'test' },
|
||||
{ query: { match: { foo: 'bar' } } }
|
||||
)
|
||||
@ -414,7 +414,7 @@ test('Single search errors', async t => {
|
||||
t.true(err instanceof errors.ResponseError)
|
||||
}
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Entire msearch fails', t => {
|
||||
@ -437,19 +437,19 @@ test('Entire msearch fails', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 1 })
|
||||
const m = client.helpers.msearch({ operations: 1 })
|
||||
|
||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
t.true(err instanceof errors.ResponseError)
|
||||
t.deepEqual(result.documents, [])
|
||||
})
|
||||
|
||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
t.true(err instanceof errors.ResponseError)
|
||||
t.deepEqual(result.documents, [])
|
||||
})
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Resolves the msearch helper', t => {
|
||||
@ -466,16 +466,16 @@ test('Resolves the msearch helper', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.stop()
|
||||
m.stop()
|
||||
|
||||
s.then(
|
||||
m.then(
|
||||
() => t.pass('called'),
|
||||
e => t.fail('Should not fail')
|
||||
)
|
||||
|
||||
s.catch(e => t.fail('Should not fail'))
|
||||
m.catch(e => t.fail('Should not fail'))
|
||||
})
|
||||
|
||||
test('Stop the msearch helper with an error', t => {
|
||||
@ -492,18 +492,18 @@ test('Stop the msearch helper with an error', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.stop(new Error('kaboom'))
|
||||
m.stop(new Error('kaboom'))
|
||||
|
||||
s.then(
|
||||
m.then(
|
||||
() => t.fail('Should fail'),
|
||||
err => t.is(err.message, 'kaboom')
|
||||
)
|
||||
|
||||
s.catch(err => t.is(err.message, 'kaboom'))
|
||||
m.catch(err => t.is(err.message, 'kaboom'))
|
||||
|
||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
t.is(err.message, 'kaboom')
|
||||
})
|
||||
})
|
||||
@ -535,9 +535,9 @@ test('Multiple searches (concurrency = 1)', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch({ operations: 1, concurrency: 1 })
|
||||
const m = client.helpers.msearch({ operations: 1, concurrency: 1 })
|
||||
|
||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
t.error(err)
|
||||
t.deepEqual(result.body, {
|
||||
status: 200,
|
||||
@ -557,7 +557,7 @@ test('Multiple searches (concurrency = 1)', t => {
|
||||
])
|
||||
})
|
||||
|
||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||
t.error(err)
|
||||
t.deepEqual(result.body, {
|
||||
status: 200,
|
||||
@ -577,7 +577,7 @@ test('Multiple searches (concurrency = 1)', t => {
|
||||
])
|
||||
})
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Flush interval', t => {
|
||||
@ -618,21 +618,21 @@ test('Flush interval', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.error(err)
|
||||
t.is(result.documents.length, 3)
|
||||
})
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.error(err)
|
||||
t.is(result.documents.length, 3)
|
||||
})
|
||||
|
||||
setImmediate(clock.next)
|
||||
|
||||
t.teardown(() => s.stop())
|
||||
t.teardown(() => m.stop())
|
||||
})
|
||||
|
||||
test('Flush interval - early stop', t => {
|
||||
@ -664,21 +664,21 @@ test('Flush interval - early stop', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
const m = client.helpers.msearch()
|
||||
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.error(err)
|
||||
t.is(result.documents.length, 3)
|
||||
})
|
||||
|
||||
setImmediate(() => {
|
||||
clock.next()
|
||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||
t.ok(err instanceof errors.ConfigurationError)
|
||||
})
|
||||
})
|
||||
|
||||
s.stop()
|
||||
m.stop()
|
||||
})
|
||||
|
||||
test('Stop should resolve the helper', t => {
|
||||
@ -699,10 +699,10 @@ test('Stop should resolve the helper', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
setImmediate(s.stop)
|
||||
const m = client.helpers.msearch()
|
||||
setImmediate(m.stop)
|
||||
|
||||
s.then(() => t.pass('Called'))
|
||||
m.then(() => t.pass('Called'))
|
||||
.catch(() => t.fail('Should not fail'))
|
||||
})
|
||||
|
||||
@ -724,13 +724,13 @@ test('Stop should resolve the helper (error)', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const s = client.helpers.msearch()
|
||||
setImmediate(s.stop, new Error('kaboom'))
|
||||
const m = client.helpers.msearch()
|
||||
setImmediate(m.stop, new Error('kaboom'))
|
||||
|
||||
s.then(() => t.fail('Should not fail'))
|
||||
m.then(() => t.fail('Should not fail'))
|
||||
.catch(err => t.is(err.message, 'kaboom'))
|
||||
|
||||
s.catch(err => t.is(err.message, 'kaboom'))
|
||||
m.catch(err => t.is(err.message, 'kaboom'))
|
||||
|
||||
s.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
|
||||
m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user