Improve helper concurrency (#1214)
This commit is contained in:
committed by
GitHub
parent
3751f50774
commit
7dfaa6c5b4
@ -238,10 +238,10 @@ The `result` exposes a `documents` property as well, which allows you to access
|
|||||||
const { Client } = require('@elastic/elasticsearch')
|
const { Client } = require('@elastic/elasticsearch')
|
||||||
|
|
||||||
const client = new Client({ node: 'http://localhost:9200' })
|
const client = new Client({ node: 'http://localhost:9200' })
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
// promise style API
|
// promise style API
|
||||||
s.search(
|
m.search(
|
||||||
{ index: 'stackoverflow' },
|
{ index: 'stackoverflow' },
|
||||||
{ query: { match: { title: 'javascript' } } }
|
{ query: { match: { title: 'javascript' } } }
|
||||||
)
|
)
|
||||||
@ -249,7 +249,7 @@ s.search(
|
|||||||
.catch(err => console.error(err))
|
.catch(err => console.error(err))
|
||||||
|
|
||||||
// callback style API
|
// callback style API
|
||||||
s.search(
|
m.search(
|
||||||
{ index: 'stackoverflow' },
|
{ index: 'stackoverflow' },
|
||||||
{ query: { match: { title: 'ruby' } } },
|
{ query: { match: { title: 'ruby' } } },
|
||||||
(err, result) => {
|
(err, result) => {
|
||||||
@ -267,7 +267,7 @@ a|How many search operations should be sent in a single msearch request. +
|
|||||||
_Default:_ `5`
|
_Default:_ `5`
|
||||||
[source,js]
|
[source,js]
|
||||||
----
|
----
|
||||||
const b = client.helpers.msearch({
|
const m = client.helpers.msearch({
|
||||||
operations: 10
|
operations: 10
|
||||||
})
|
})
|
||||||
----
|
----
|
||||||
@ -277,7 +277,7 @@ a|How much time (in milliseconds) the helper will wait before flushing the opera
|
|||||||
_Default:_ `500`
|
_Default:_ `500`
|
||||||
[source,js]
|
[source,js]
|
||||||
----
|
----
|
||||||
const b = client.helpers.msearch({
|
const m = client.helpers.msearch({
|
||||||
flushInterval: 500
|
flushInterval: 500
|
||||||
})
|
})
|
||||||
----
|
----
|
||||||
@ -287,7 +287,7 @@ a|How many request will be executed at the same time. +
|
|||||||
_Default:_ `5`
|
_Default:_ `5`
|
||||||
[source,js]
|
[source,js]
|
||||||
----
|
----
|
||||||
const b = client.helpers.msearch({
|
const m = client.helpers.msearch({
|
||||||
concurrency: 10
|
concurrency: 10
|
||||||
})
|
})
|
||||||
----
|
----
|
||||||
@ -297,7 +297,7 @@ a|How many times an operation will be retried before to resolve the request. An
|
|||||||
_Default:_ Client max retries.
|
_Default:_ Client max retries.
|
||||||
[source,js]
|
[source,js]
|
||||||
----
|
----
|
||||||
const b = client.helpers.msearch({
|
const m = client.helpers.msearch({
|
||||||
retries: 3
|
retries: 3
|
||||||
})
|
})
|
||||||
----
|
----
|
||||||
@ -307,7 +307,7 @@ a|How much time to wait before retries in milliseconds. +
|
|||||||
_Default:_ 5000.
|
_Default:_ 5000.
|
||||||
[source,js]
|
[source,js]
|
||||||
----
|
----
|
||||||
const b = client.helpers.msearch({
|
const m = client.helpers.msearch({
|
||||||
wait: 3000
|
wait: 3000
|
||||||
})
|
})
|
||||||
----
|
----
|
||||||
@ -328,23 +328,23 @@ NOTE: The stop method will stop the execution of the msearch processor, but if y
|
|||||||
const { Client } = require('@elastic/elasticsearch')
|
const { Client } = require('@elastic/elasticsearch')
|
||||||
|
|
||||||
const client = new Client({ node: 'http://localhost:9200' })
|
const client = new Client({ node: 'http://localhost:9200' })
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.search(
|
m.search(
|
||||||
{ index: 'stackoverflow' },
|
{ index: 'stackoverflow' },
|
||||||
{ query: { match: { title: 'javascript' } } }
|
{ query: { match: { title: 'javascript' } } }
|
||||||
)
|
)
|
||||||
.then(result => console.log(result.body))
|
.then(result => console.log(result.body))
|
||||||
.catch(err => console.error(err))
|
.catch(err => console.error(err))
|
||||||
|
|
||||||
s.search(
|
m.search(
|
||||||
{ index: 'stackoverflow' },
|
{ index: 'stackoverflow' },
|
||||||
{ query: { match: { title: 'ruby' } } }
|
{ query: { match: { title: 'ruby' } } }
|
||||||
)
|
)
|
||||||
.then(result => console.log(result.body))
|
.then(result => console.log(result.body))
|
||||||
.catch(err => console.error(err))
|
.catch(err => console.error(err))
|
||||||
|
|
||||||
setImmediate(() => s.stop())
|
setImmediate(() => m.stop())
|
||||||
----
|
----
|
||||||
|
|
||||||
=== Search Helper
|
=== Search Helper
|
||||||
|
|||||||
@ -301,6 +301,7 @@ class Helpers {
|
|||||||
|
|
||||||
function semaphore () {
|
function semaphore () {
|
||||||
if (running < concurrency) {
|
if (running < concurrency) {
|
||||||
|
running += 1
|
||||||
return pImmediate(send)
|
return pImmediate(send)
|
||||||
} else {
|
} else {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
@ -311,13 +312,13 @@ class Helpers {
|
|||||||
|
|
||||||
function send (msearchBody, callbacks) {
|
function send (msearchBody, callbacks) {
|
||||||
/* istanbul ignore if */
|
/* istanbul ignore if */
|
||||||
if (running >= concurrency) {
|
if (running > concurrency) {
|
||||||
throw new Error('Max concurrency reached')
|
throw new Error('Max concurrency reached')
|
||||||
}
|
}
|
||||||
running += 1
|
|
||||||
msearchOperation(msearchBody, callbacks, () => {
|
msearchOperation(msearchBody, callbacks, () => {
|
||||||
running -= 1
|
running -= 1
|
||||||
if (resolveSemaphore) {
|
if (resolveSemaphore) {
|
||||||
|
running += 1
|
||||||
resolveSemaphore(send)
|
resolveSemaphore(send)
|
||||||
resolveSemaphore = null
|
resolveSemaphore = null
|
||||||
} else if (resolveFinish && running === 0) {
|
} else if (resolveFinish && running === 0) {
|
||||||
@ -575,6 +576,7 @@ class Helpers {
|
|||||||
|
|
||||||
function semaphore () {
|
function semaphore () {
|
||||||
if (running < concurrency) {
|
if (running < concurrency) {
|
||||||
|
running += 1
|
||||||
return pImmediate(send)
|
return pImmediate(send)
|
||||||
} else {
|
} else {
|
||||||
return new Promise((resolve, reject) => {
|
return new Promise((resolve, reject) => {
|
||||||
@ -585,10 +587,9 @@ class Helpers {
|
|||||||
|
|
||||||
function send (bulkBody) {
|
function send (bulkBody) {
|
||||||
/* istanbul ignore if */
|
/* istanbul ignore if */
|
||||||
if (running >= concurrency) {
|
if (running > concurrency) {
|
||||||
throw new Error('Max concurrency reached')
|
throw new Error('Max concurrency reached')
|
||||||
}
|
}
|
||||||
running += 1
|
|
||||||
bulkOperation(bulkBody, err => {
|
bulkOperation(bulkBody, err => {
|
||||||
running -= 1
|
running -= 1
|
||||||
if (err) {
|
if (err) {
|
||||||
@ -596,6 +597,7 @@ class Helpers {
|
|||||||
error = err
|
error = err
|
||||||
}
|
}
|
||||||
if (resolveSemaphore) {
|
if (resolveSemaphore) {
|
||||||
|
running += 1
|
||||||
resolveSemaphore(send)
|
resolveSemaphore(send)
|
||||||
resolveSemaphore = null
|
resolveSemaphore = null
|
||||||
} else if (resolveFinish && running === 0) {
|
} else if (resolveFinish && running === 0) {
|
||||||
|
|||||||
@ -40,9 +40,9 @@ afterEach(async () => {
|
|||||||
|
|
||||||
test('Basic', t => {
|
test('Basic', t => {
|
||||||
t.plan(4)
|
t.plan(4)
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
s.search(
|
m.search(
|
||||||
{ index: INDEX },
|
{ index: INDEX },
|
||||||
{ query: { match: { title: 'javascript' } } },
|
{ query: { match: { title: 'javascript' } } },
|
||||||
(err, result) => {
|
(err, result) => {
|
||||||
@ -51,7 +51,7 @@ test('Basic', t => {
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
s.search(
|
m.search(
|
||||||
{ index: INDEX },
|
{ index: INDEX },
|
||||||
{ query: { match: { title: 'ruby' } } },
|
{ query: { match: { title: 'ruby' } } },
|
||||||
(err, result) => {
|
(err, result) => {
|
||||||
@ -60,14 +60,14 @@ test('Basic', t => {
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Bad request', t => {
|
test('Bad request', t => {
|
||||||
t.plan(3)
|
t.plan(3)
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
s.search(
|
m.search(
|
||||||
{ index: INDEX },
|
{ index: INDEX },
|
||||||
{ query: { match: { title: 'javascript' } } },
|
{ query: { match: { title: 'javascript' } } },
|
||||||
(err, result) => {
|
(err, result) => {
|
||||||
@ -76,7 +76,7 @@ test('Bad request', t => {
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
s.search(
|
m.search(
|
||||||
{ index: INDEX },
|
{ index: INDEX },
|
||||||
{ query: { foo: { title: 'ruby' } } },
|
{ query: { foo: { title: 'ruby' } } },
|
||||||
(err, result) => {
|
(err, result) => {
|
||||||
@ -84,15 +84,15 @@ test('Bad request', t => {
|
|||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Send multiple request concurrently over the concurrency limit', t => {
|
test('Send multiple request concurrently over the concurrency limit', t => {
|
||||||
t.plan(20)
|
t.plan(20)
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
for (let i = 0; i < 10; i++) {
|
for (let i = 0; i < 10; i++) {
|
||||||
s.search(
|
m.search(
|
||||||
{ index: INDEX },
|
{ index: INDEX },
|
||||||
{ query: { match: { title: 'javascript' } } },
|
{ query: { match: { title: 'javascript' } } },
|
||||||
(err, result) => {
|
(err, result) => {
|
||||||
@ -102,5 +102,5 @@ test('Send multiple request concurrently over the concurrency limit', t => {
|
|||||||
)
|
)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|||||||
@ -34,9 +34,9 @@ test('Basic', async t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
const result = await s.search(
|
const result = await m.search(
|
||||||
{ index: 'test' },
|
{ index: 'test' },
|
||||||
{ query: { match: { foo: 'bar' } } }
|
{ query: { match: { foo: 'bar' } } }
|
||||||
)
|
)
|
||||||
@ -58,7 +58,7 @@ test('Basic', async t => {
|
|||||||
{ three: 'three' }
|
{ three: 'three' }
|
||||||
])
|
])
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Multiple searches (inside async iterator)', t => {
|
test('Multiple searches (inside async iterator)', t => {
|
||||||
@ -97,9 +97,9 @@ test('Multiple searches (inside async iterator)', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 2 })
|
const m = client.helpers.msearch({ operations: 2 })
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.deepEqual(result.body, {
|
t.deepEqual(result.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
@ -119,7 +119,7 @@ test('Multiple searches (inside async iterator)', t => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.deepEqual(result.body, {
|
t.deepEqual(result.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
@ -139,7 +139,7 @@ test('Multiple searches (inside async iterator)', t => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Multiple searches (async iterator exits)', t => {
|
test('Multiple searches (async iterator exits)', t => {
|
||||||
@ -178,9 +178,9 @@ test('Multiple searches (async iterator exits)', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.deepEqual(result.body, {
|
t.deepEqual(result.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
@ -200,7 +200,7 @@ test('Multiple searches (async iterator exits)', t => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.deepEqual(result.body, {
|
t.deepEqual(result.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
@ -220,7 +220,7 @@ test('Multiple searches (async iterator exits)', t => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
setImmediate(() => s.stop())
|
setImmediate(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Stop a msearch processor (promises)', async t => {
|
test('Stop a msearch processor (promises)', async t => {
|
||||||
@ -235,12 +235,12 @@ test('Stop a msearch processor (promises)', async t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
s.stop()
|
m.stop()
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await s.search(
|
await m.search(
|
||||||
{ index: 'test' },
|
{ index: 'test' },
|
||||||
{ query: { match: { foo: 'bar' } } }
|
{ query: { match: { foo: 'bar' } } }
|
||||||
)
|
)
|
||||||
@ -248,7 +248,7 @@ test('Stop a msearch processor (promises)', async t => {
|
|||||||
t.strictEqual(err.message, 'The msearch processor has been stopped')
|
t.strictEqual(err.message, 'The msearch processor has been stopped')
|
||||||
}
|
}
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Stop a msearch processor (callbacks)', t => {
|
test('Stop a msearch processor (callbacks)', t => {
|
||||||
@ -265,11 +265,11 @@ test('Stop a msearch processor (callbacks)', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.stop()
|
m.stop()
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.strictEqual(err.message, 'The msearch processor has been stopped')
|
t.strictEqual(err.message, 'The msearch processor has been stopped')
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -288,13 +288,13 @@ test('Bad header', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search(null, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.strictEqual(err.message, 'The header should be an object')
|
t.strictEqual(err.message, 'The header should be an object')
|
||||||
})
|
})
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Bad body', t => {
|
test('Bad body', t => {
|
||||||
@ -311,13 +311,13 @@ test('Bad body', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.search({ index: 'test' }, null, (err, result) => {
|
m.search({ index: 'test' }, null, (err, result) => {
|
||||||
t.strictEqual(err.message, 'The body should be an object')
|
t.strictEqual(err.message, 'The body should be an object')
|
||||||
})
|
})
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Retry on 429', async t => {
|
test('Retry on 429', async t => {
|
||||||
@ -357,9 +357,9 @@ test('Retry on 429', async t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 1, wait: 10 })
|
const m = client.helpers.msearch({ operations: 1, wait: 10 })
|
||||||
|
|
||||||
const result = await s.search(
|
const result = await m.search(
|
||||||
{ index: 'test' },
|
{ index: 'test' },
|
||||||
{ query: { match: { foo: 'bar' } } }
|
{ query: { match: { foo: 'bar' } } }
|
||||||
)
|
)
|
||||||
@ -381,7 +381,7 @@ test('Retry on 429', async t => {
|
|||||||
{ three: 'three' }
|
{ three: 'three' }
|
||||||
])
|
])
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Single search errors', async t => {
|
test('Single search errors', async t => {
|
||||||
@ -403,10 +403,10 @@ test('Single search errors', async t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
try {
|
try {
|
||||||
await s.search(
|
await m.search(
|
||||||
{ index: 'test' },
|
{ index: 'test' },
|
||||||
{ query: { match: { foo: 'bar' } } }
|
{ query: { match: { foo: 'bar' } } }
|
||||||
)
|
)
|
||||||
@ -414,7 +414,7 @@ test('Single search errors', async t => {
|
|||||||
t.true(err instanceof errors.ResponseError)
|
t.true(err instanceof errors.ResponseError)
|
||||||
}
|
}
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Entire msearch fails', t => {
|
test('Entire msearch fails', t => {
|
||||||
@ -437,19 +437,19 @@ test('Entire msearch fails', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 1 })
|
const m = client.helpers.msearch({ operations: 1 })
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||||
t.true(err instanceof errors.ResponseError)
|
t.true(err instanceof errors.ResponseError)
|
||||||
t.deepEqual(result.documents, [])
|
t.deepEqual(result.documents, [])
|
||||||
})
|
})
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||||
t.true(err instanceof errors.ResponseError)
|
t.true(err instanceof errors.ResponseError)
|
||||||
t.deepEqual(result.documents, [])
|
t.deepEqual(result.documents, [])
|
||||||
})
|
})
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Resolves the msearch helper', t => {
|
test('Resolves the msearch helper', t => {
|
||||||
@ -466,16 +466,16 @@ test('Resolves the msearch helper', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.stop()
|
m.stop()
|
||||||
|
|
||||||
s.then(
|
m.then(
|
||||||
() => t.pass('called'),
|
() => t.pass('called'),
|
||||||
e => t.fail('Should not fail')
|
e => t.fail('Should not fail')
|
||||||
)
|
)
|
||||||
|
|
||||||
s.catch(e => t.fail('Should not fail'))
|
m.catch(e => t.fail('Should not fail'))
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Stop the msearch helper with an error', t => {
|
test('Stop the msearch helper with an error', t => {
|
||||||
@ -492,18 +492,18 @@ test('Stop the msearch helper with an error', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.stop(new Error('kaboom'))
|
m.stop(new Error('kaboom'))
|
||||||
|
|
||||||
s.then(
|
m.then(
|
||||||
() => t.fail('Should fail'),
|
() => t.fail('Should fail'),
|
||||||
err => t.is(err.message, 'kaboom')
|
err => t.is(err.message, 'kaboom')
|
||||||
)
|
)
|
||||||
|
|
||||||
s.catch(err => t.is(err.message, 'kaboom'))
|
m.catch(err => t.is(err.message, 'kaboom'))
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||||
t.is(err.message, 'kaboom')
|
t.is(err.message, 'kaboom')
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -535,9 +535,9 @@ test('Multiple searches (concurrency = 1)', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch({ operations: 1, concurrency: 1 })
|
const m = client.helpers.msearch({ operations: 1, concurrency: 1 })
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.deepEqual(result.body, {
|
t.deepEqual(result.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
@ -557,7 +557,7 @@ test('Multiple searches (concurrency = 1)', t => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: {} }, (err, result) => {
|
m.search({ index: 'test' }, { query: {} }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.deepEqual(result.body, {
|
t.deepEqual(result.body, {
|
||||||
status: 200,
|
status: 200,
|
||||||
@ -577,7 +577,7 @@ test('Multiple searches (concurrency = 1)', t => {
|
|||||||
])
|
])
|
||||||
})
|
})
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Flush interval', t => {
|
test('Flush interval', t => {
|
||||||
@ -618,21 +618,21 @@ test('Flush interval', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.is(result.documents.length, 3)
|
t.is(result.documents.length, 3)
|
||||||
})
|
})
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.is(result.documents.length, 3)
|
t.is(result.documents.length, 3)
|
||||||
})
|
})
|
||||||
|
|
||||||
setImmediate(clock.next)
|
setImmediate(clock.next)
|
||||||
|
|
||||||
t.teardown(() => s.stop())
|
t.teardown(() => m.stop())
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Flush interval - early stop', t => {
|
test('Flush interval - early stop', t => {
|
||||||
@ -664,21 +664,21 @@ test('Flush interval - early stop', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
|
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.is(result.documents.length, 3)
|
t.is(result.documents.length, 3)
|
||||||
})
|
})
|
||||||
|
|
||||||
setImmediate(() => {
|
setImmediate(() => {
|
||||||
clock.next()
|
clock.next()
|
||||||
s.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
m.search({ index: 'test' }, { query: { match: { foo: 'bar' } } }, (err, result) => {
|
||||||
t.ok(err instanceof errors.ConfigurationError)
|
t.ok(err instanceof errors.ConfigurationError)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
s.stop()
|
m.stop()
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Stop should resolve the helper', t => {
|
test('Stop should resolve the helper', t => {
|
||||||
@ -699,10 +699,10 @@ test('Stop should resolve the helper', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
setImmediate(s.stop)
|
setImmediate(m.stop)
|
||||||
|
|
||||||
s.then(() => t.pass('Called'))
|
m.then(() => t.pass('Called'))
|
||||||
.catch(() => t.fail('Should not fail'))
|
.catch(() => t.fail('Should not fail'))
|
||||||
})
|
})
|
||||||
|
|
||||||
@ -724,13 +724,13 @@ test('Stop should resolve the helper (error)', t => {
|
|||||||
Connection: MockConnection
|
Connection: MockConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
const s = client.helpers.msearch()
|
const m = client.helpers.msearch()
|
||||||
setImmediate(s.stop, new Error('kaboom'))
|
setImmediate(m.stop, new Error('kaboom'))
|
||||||
|
|
||||||
s.then(() => t.fail('Should not fail'))
|
m.then(() => t.fail('Should not fail'))
|
||||||
.catch(err => t.is(err.message, 'kaboom'))
|
.catch(err => t.is(err.message, 'kaboom'))
|
||||||
|
|
||||||
s.catch(err => t.is(err.message, 'kaboom'))
|
m.catch(err => t.is(err.message, 'kaboom'))
|
||||||
|
|
||||||
s.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
|
m.then(() => t.fail('Should not fail'), err => t.is(err.message, 'kaboom'))
|
||||||
})
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user