diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index dc2664407..f2ca0f0c2 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -149,7 +149,9 @@ const client = new Client({ const client = new Client({ node: 'http://localhost:9200', - agent: () => new CustomAgent() + // the function takes as parameter the option + // object passed to the Connection constructor + agent: (opts) => new CustomAgent() }) const client = new Client({ diff --git a/lib/Connection.d.ts b/lib/Connection.d.ts index 00d1e676e..933a6a8eb 100644 --- a/lib/Connection.d.ts +++ b/lib/Connection.d.ts @@ -28,9 +28,9 @@ import * as https from 'https' import * as hpagent from 'hpagent' import { ConnectionOptions as TlsConnectionOptions } from 'tls' -export declare type agentFn = () => any; +export declare type agentFn = (opts: ConnectionOptions) => any; -interface ConnectionOptions { +export interface ConnectionOptions { url: URL; ssl?: TlsConnectionOptions; id?: string; diff --git a/lib/Connection.js b/lib/Connection.js index e1945b131..6bf38c495 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -53,7 +53,7 @@ class Connection { } if (typeof opts.agent === 'function') { - this.agent = opts.agent() + this.agent = opts.agent(opts) } else if (opts.agent === false) { this.agent = undefined } else { diff --git a/lib/Helpers.js b/lib/Helpers.js index ac8509764..6a3524b5f 100644 --- a/lib/Helpers.js +++ b/lib/Helpers.js @@ -104,13 +104,15 @@ class Helpers { } while (response.body.hits && response.body.hits.hits.length > 0) { + // scroll id is always present in the response, but it might + // change over time based on the number of shards scroll_id = response.body._scroll_id response.clear = clear addDocumentsGetter(response) yield response - if (!scroll_id || stop === true) { + if (stop === true) { break } @@ -127,6 +129,10 @@ class Helpers { throw new ResponseError(response) } } + + if (stop === false) { + await clear() + } } /** diff --git a/test/types/connection.test-d.ts b/test/types/connection.test-d.ts index df8910da6..66e9d257b 100644 --- a/test/types/connection.test-d.ts +++ b/test/types/connection.test-d.ts @@ -20,22 +20,35 @@ import { expectType } from 'tsd' import { URL } from 'url' import { Connection } from '../../' +import { ConnectionOptions } from '../../lib/Connection' -const conn = new Connection({ - url: new URL('http://localhost:9200'), - ssl: { ca: 'string' }, - id: 'id', - headers: {}, - agent: { keepAlive: false }, - status: 'alive', - roles: { master: true }, - auth: { username: 'username', password: 'password' } -}) +{ + const conn = new Connection({ + url: new URL('http://localhost:9200'), + ssl: { ca: 'string' }, + id: 'id', + headers: {}, + agent: { keepAlive: false }, + status: 'alive', + roles: { master: true }, + auth: { username: 'username', password: 'password' } + }) -expectType(conn) -expectType(conn.url) -expectType(conn.id) -expectType>(conn.headers) -expectType(conn.deadCount) -expectType(conn.resurrectTimeout) -expectType(conn.status) + expectType(conn) + expectType(conn.url) + expectType(conn.id) + expectType>(conn.headers) + expectType(conn.deadCount) + expectType(conn.resurrectTimeout) + expectType(conn.status) +} + +{ + const conn = new Connection({ + url: new URL('http://localhost:9200'), + agent (opts) { + expectType(opts) + return 'the agent' + } + }) +} diff --git a/test/unit/connection.test.js b/test/unit/connection.test.js index 0e3e6173a..c83147c8d 100644 --- a/test/unit/connection.test.js +++ b/test/unit/connection.test.js @@ -152,7 +152,7 @@ test('Basic (https with ssl agent)', t => { }) test('Custom http agent', t => { - t.plan(5) + t.plan(6) function handler (req, res) { t.match(req.headers, { @@ -172,7 +172,12 @@ test('Custom http agent', t => { agent.custom = true const connection = new Connection({ url: new URL(`http://localhost:${port}`), - agent: () => agent + agent: opts => { + t.match(opts, { + url: new URL(`http://localhost:${port}`) + }) + return agent + } }) t.true(connection.agent.custom) connection.request({ diff --git a/test/unit/helpers/scroll.test.js b/test/unit/helpers/scroll.test.js index cfa26c969..ca50957ee 100644 --- a/test/unit/helpers/scroll.test.js +++ b/test/unit/helpers/scroll.test.js @@ -27,17 +27,26 @@ test('Scroll search', async t => { var count = 0 const MockConnection = connection.buildMockConnection({ onRequest (params) { - t.strictEqual(params.querystring, 'scroll=1m') + count += 1 + if (params.method === 'POST') { + t.strictEqual(params.querystring, 'scroll=1m') + } + if (count === 4) { + // final automated clear + t.strictEqual(params.method, 'DELETE') + } return { body: { - _scroll_id: count === 3 ? undefined : 'id', + _scroll_id: 'id', count, hits: { - hits: [ - { _source: { one: 'one' } }, - { _source: { two: 'two' } }, - { _source: { three: 'three' } } - ] + hits: count === 3 + ? [] + : [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] } } } @@ -56,12 +65,7 @@ test('Scroll search', async t => { for await (const result of scrollSearch) { t.strictEqual(result.body.count, count) - if (count < 3) { - t.strictEqual(result.body._scroll_id, 'id') - } else { - t.strictEqual(result.body._scroll_id, undefined) - } - count += 1 + t.strictEqual(result.body._scroll_id, 'id') } }) @@ -115,21 +119,27 @@ test('Scroll search (retry)', async t => { var count = 0 const MockConnection = connection.buildMockConnection({ onRequest (params) { + count += 1 if (count === 1) { - count += 1 return { body: {}, statusCode: 429 } } + if (count === 5) { + // final automated clear + t.strictEqual(params.method, 'DELETE') + } return { statusCode: 200, body: { - _scroll_id: count === 4 ? undefined : 'id', + _scroll_id: 'id', count, hits: { - hits: [ - { _source: { one: 'one' } }, - { _source: { two: 'two' } }, - { _source: { three: 'three' } } - ] + hits: count === 4 + ? [] + : [ + { _source: { one: 'one' } }, + { _source: { two: 'two' } }, + { _source: { three: 'three' } } + ] } } } @@ -151,12 +161,7 @@ test('Scroll search (retry)', async t => { for await (const result of scrollSearch) { t.strictEqual(result.body.count, count) t.notStrictEqual(result.body.count, 1) - if (count < 4) { - t.strictEqual(result.body._scroll_id, 'id') - } else { - t.strictEqual(result.body._scroll_id, undefined) - } - count += 1 + t.strictEqual(result.body._scroll_id, 'id') } }) @@ -198,20 +203,20 @@ test('Scroll search (retry throws and maxRetries)', async t => { test('Scroll search (retry throws later)', async t => { const maxRetries = 5 - const expectedAttempts = maxRetries + 1 + const expectedAttempts = maxRetries + 2 var count = 0 const MockConnection = connection.buildMockConnection({ onRequest (params) { - // filter_path should not be added if is not already present + count += 1 + // filter_path should not be added if is not already present t.strictEqual(params.querystring, 'scroll=1m') if (count > 1) { - count += 1 return { body: {}, statusCode: 429 } } return { statusCode: 200, body: { - _scroll_id: count === 4 ? undefined : 'id', + _scroll_id: 'id', count, hits: { hits: [ @@ -227,7 +232,8 @@ test('Scroll search (retry throws later)', async t => { const client = new Client({ node: 'http://localhost:9200', - Connection: MockConnection + Connection: MockConnection, + maxRetries }) const scrollSearch = client.helpers.scrollSearch({ @@ -240,7 +246,6 @@ test('Scroll search (retry throws later)', async t => { try { for await (const result of scrollSearch) { // eslint-disable-line t.strictEqual(result.body.count, count) - count += 1 } } catch (err) { t.true(err instanceof errors.ResponseError) @@ -256,19 +261,23 @@ test('Scroll search documents', async t => { if (count === 0) { t.strictEqual(params.querystring, 'filter_path=hits.hits._source%2C_scroll_id&scroll=1m') } else { - t.strictEqual(params.querystring, 'scroll=1m') - t.strictEqual(params.body, '{"scroll_id":"id"}') + if (params.method !== 'DELETE') { + t.strictEqual(params.querystring, 'scroll=1m') + t.strictEqual(params.body, '{"scroll_id":"id"}') + } } return { body: { - _scroll_id: count === 3 ? undefined : 'id', + _scroll_id: 'id', count, hits: { - hits: [ - { _source: { val: 1 * count } }, - { _source: { val: 2 * count } }, - { _source: { val: 3 * count } } - ] + hits: count === 3 + ? [] + : [ + { _source: { val: 1 * count } }, + { _source: { val: 2 * count } }, + { _source: { val: 3 * count } } + ] } } } @@ -339,15 +348,19 @@ test('Fix querystring for scroll search', async t => { if (count === 0) { t.strictEqual(params.querystring, 'size=1&scroll=1m') } else { - t.strictEqual(params.querystring, 'scroll=1m') + if (params.method !== 'DELETE') { + t.strictEqual(params.querystring, 'scroll=1m') + } } return { body: { - _scroll_id: count === 3 ? undefined : 'id', + _scroll_id: 'id', hits: { - hits: [ - { _source: { val: count } } - ] + hits: count === 3 + ? [] + : [ + { _source: { val: count } } + ] } } }