This commit is contained in:
delvedor
2020-10-21 11:07:37 +02:00
8 changed files with 124 additions and 79 deletions

View File

@ -149,7 +149,9 @@ const client = new Client({
const client = new Client({ const client = new Client({
node: 'http://localhost:9200', node: 'http://localhost:9200',
agent: () => new CustomAgent() // the function takes as parameter the option
// object passed to the Connection constructor
agent: (opts) => new CustomAgent()
}) })
const client = new Client({ const client = new Client({

4
lib/Connection.d.ts vendored
View File

@ -28,9 +28,9 @@ import * as https from 'https'
import * as hpagent from 'hpagent' import * as hpagent from 'hpagent'
import { ConnectionOptions as TlsConnectionOptions } from 'tls' import { ConnectionOptions as TlsConnectionOptions } from 'tls'
export declare type agentFn = () => any; export declare type agentFn = (opts: ConnectionOptions) => any;
interface ConnectionOptions { export interface ConnectionOptions {
url: URL; url: URL;
ssl?: TlsConnectionOptions; ssl?: TlsConnectionOptions;
id?: string; id?: string;

View File

@ -53,7 +53,7 @@ class Connection {
} }
if (typeof opts.agent === 'function') { if (typeof opts.agent === 'function') {
this.agent = opts.agent() this.agent = opts.agent(opts)
} else if (opts.agent === false) { } else if (opts.agent === false) {
this.agent = undefined this.agent = undefined
} else { } else {

View File

@ -104,13 +104,15 @@ class Helpers {
} }
while (response.body.hits && response.body.hits.hits.length > 0) { while (response.body.hits && response.body.hits.hits.length > 0) {
// scroll id is always present in the response, but it might
// change over time based on the number of shards
scroll_id = response.body._scroll_id scroll_id = response.body._scroll_id
response.clear = clear response.clear = clear
addDocumentsGetter(response) addDocumentsGetter(response)
yield response yield response
if (!scroll_id || stop === true) { if (stop === true) {
break break
} }
@ -127,6 +129,10 @@ class Helpers {
throw new ResponseError(response) throw new ResponseError(response)
} }
} }
if (stop === false) {
await clear()
}
} }
/** /**

View File

@ -279,20 +279,26 @@ function build (opts = {}) {
// eg: 'Basic ${auth}' we search the stahed value 'auth' // eg: 'Basic ${auth}' we search the stahed value 'auth'
// and the resulting value will be 'Basic valueOfAuth' // and the resulting value will be 'Basic valueOfAuth'
if (typeof val === 'string' && val.includes('${')) { if (typeof val === 'string' && val.includes('${')) {
const start = val.indexOf('${') while (obj[key].includes('${')) {
const end = val.indexOf('}', val.indexOf('${')) const val = obj[key]
const stashedKey = val.slice(start + 2, end) const start = val.indexOf('${')
const stashed = stash.get(stashedKey) const end = val.indexOf('}', val.indexOf('${'))
obj[key] = val.slice(0, start) + stashed + val.slice(end + 1) const stashedKey = val.slice(start + 2, end)
const stashed = stash.get(stashedKey)
obj[key] = val.slice(0, start) + stashed + val.slice(end + 1)
}
continue continue
} }
// handle json strings, eg: '{"hello":"$world"}' // handle json strings, eg: '{"hello":"$world"}'
if (typeof val === 'string' && val.includes('"$')) { if (typeof val === 'string' && val.includes('"$')) {
const start = val.indexOf('"$') while (obj[key].includes('"$')) {
const end = val.indexOf('"', start + 1) const val = obj[key]
const stashedKey = val.slice(start + 2, end) const start = val.indexOf('"$')
const stashed = '"' + stash.get(stashedKey) + '"' const end = val.indexOf('"', start + 1)
obj[key] = val.slice(0, start) + stashed + val.slice(end + 1) const stashedKey = val.slice(start + 2, end)
const stashed = '"' + stash.get(stashedKey) + '"'
obj[key] = val.slice(0, start) + stashed + val.slice(end + 1)
}
continue continue
} }
// if the key value is a string, and the string includes '$' // if the key value is a string, and the string includes '$'

View File

@ -20,22 +20,35 @@
import { expectType } from 'tsd' import { expectType } from 'tsd'
import { URL } from 'url' import { URL } from 'url'
import { Connection } from '../../' import { Connection } from '../../'
import { ConnectionOptions } from '../../lib/Connection'
const conn = new Connection({ {
url: new URL('http://localhost:9200'), const conn = new Connection({
ssl: { ca: 'string' }, url: new URL('http://localhost:9200'),
id: 'id', ssl: { ca: 'string' },
headers: {}, id: 'id',
agent: { keepAlive: false }, headers: {},
status: 'alive', agent: { keepAlive: false },
roles: { master: true }, status: 'alive',
auth: { username: 'username', password: 'password' } roles: { master: true },
}) auth: { username: 'username', password: 'password' }
})
expectType<Connection>(conn) expectType<Connection>(conn)
expectType<URL>(conn.url) expectType<URL>(conn.url)
expectType<string>(conn.id) expectType<string>(conn.id)
expectType<Record<string, any>>(conn.headers) expectType<Record<string, any>>(conn.headers)
expectType<number>(conn.deadCount) expectType<number>(conn.deadCount)
expectType<number>(conn.resurrectTimeout) expectType<number>(conn.resurrectTimeout)
expectType<string>(conn.status) expectType<string>(conn.status)
}
{
const conn = new Connection({
url: new URL('http://localhost:9200'),
agent (opts) {
expectType<ConnectionOptions>(opts)
return 'the agent'
}
})
}

View File

@ -152,7 +152,7 @@ test('Basic (https with ssl agent)', t => {
}) })
test('Custom http agent', t => { test('Custom http agent', t => {
t.plan(5) t.plan(6)
function handler (req, res) { function handler (req, res) {
t.match(req.headers, { t.match(req.headers, {
@ -172,7 +172,12 @@ test('Custom http agent', t => {
agent.custom = true agent.custom = true
const connection = new Connection({ const connection = new Connection({
url: new URL(`http://localhost:${port}`), url: new URL(`http://localhost:${port}`),
agent: () => agent agent: opts => {
t.match(opts, {
url: new URL(`http://localhost:${port}`)
})
return agent
}
}) })
t.true(connection.agent.custom) t.true(connection.agent.custom)
connection.request({ connection.request({

View File

@ -27,17 +27,26 @@ test('Scroll search', async t => {
var count = 0 var count = 0
const MockConnection = connection.buildMockConnection({ const MockConnection = connection.buildMockConnection({
onRequest (params) { onRequest (params) {
t.strictEqual(params.querystring, 'scroll=1m') count += 1
if (params.method === 'POST') {
t.strictEqual(params.querystring, 'scroll=1m')
}
if (count === 4) {
// final automated clear
t.strictEqual(params.method, 'DELETE')
}
return { return {
body: { body: {
_scroll_id: count === 3 ? undefined : 'id', _scroll_id: 'id',
count, count,
hits: { hits: {
hits: [ hits: count === 3
{ _source: { one: 'one' } }, ? []
{ _source: { two: 'two' } }, : [
{ _source: { three: 'three' } } { _source: { one: 'one' } },
] { _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
} }
} }
} }
@ -56,12 +65,7 @@ test('Scroll search', async t => {
for await (const result of scrollSearch) { for await (const result of scrollSearch) {
t.strictEqual(result.body.count, count) t.strictEqual(result.body.count, count)
if (count < 3) { t.strictEqual(result.body._scroll_id, 'id')
t.strictEqual(result.body._scroll_id, 'id')
} else {
t.strictEqual(result.body._scroll_id, undefined)
}
count += 1
} }
}) })
@ -115,21 +119,27 @@ test('Scroll search (retry)', async t => {
var count = 0 var count = 0
const MockConnection = connection.buildMockConnection({ const MockConnection = connection.buildMockConnection({
onRequest (params) { onRequest (params) {
count += 1
if (count === 1) { if (count === 1) {
count += 1
return { body: {}, statusCode: 429 } return { body: {}, statusCode: 429 }
} }
if (count === 5) {
// final automated clear
t.strictEqual(params.method, 'DELETE')
}
return { return {
statusCode: 200, statusCode: 200,
body: { body: {
_scroll_id: count === 4 ? undefined : 'id', _scroll_id: 'id',
count, count,
hits: { hits: {
hits: [ hits: count === 4
{ _source: { one: 'one' } }, ? []
{ _source: { two: 'two' } }, : [
{ _source: { three: 'three' } } { _source: { one: 'one' } },
] { _source: { two: 'two' } },
{ _source: { three: 'three' } }
]
} }
} }
} }
@ -151,12 +161,7 @@ test('Scroll search (retry)', async t => {
for await (const result of scrollSearch) { for await (const result of scrollSearch) {
t.strictEqual(result.body.count, count) t.strictEqual(result.body.count, count)
t.notStrictEqual(result.body.count, 1) t.notStrictEqual(result.body.count, 1)
if (count < 4) { t.strictEqual(result.body._scroll_id, 'id')
t.strictEqual(result.body._scroll_id, 'id')
} else {
t.strictEqual(result.body._scroll_id, undefined)
}
count += 1
} }
}) })
@ -198,20 +203,20 @@ test('Scroll search (retry throws and maxRetries)', async t => {
test('Scroll search (retry throws later)', async t => { test('Scroll search (retry throws later)', async t => {
const maxRetries = 5 const maxRetries = 5
const expectedAttempts = maxRetries + 1 const expectedAttempts = maxRetries + 2
var count = 0 var count = 0
const MockConnection = connection.buildMockConnection({ const MockConnection = connection.buildMockConnection({
onRequest (params) { onRequest (params) {
// filter_path should not be added if is not already present count += 1
// filter_path should not be added if is not already present
t.strictEqual(params.querystring, 'scroll=1m') t.strictEqual(params.querystring, 'scroll=1m')
if (count > 1) { if (count > 1) {
count += 1
return { body: {}, statusCode: 429 } return { body: {}, statusCode: 429 }
} }
return { return {
statusCode: 200, statusCode: 200,
body: { body: {
_scroll_id: count === 4 ? undefined : 'id', _scroll_id: 'id',
count, count,
hits: { hits: {
hits: [ hits: [
@ -227,7 +232,8 @@ test('Scroll search (retry throws later)', async t => {
const client = new Client({ const client = new Client({
node: 'http://localhost:9200', node: 'http://localhost:9200',
Connection: MockConnection Connection: MockConnection,
maxRetries
}) })
const scrollSearch = client.helpers.scrollSearch({ const scrollSearch = client.helpers.scrollSearch({
@ -240,7 +246,6 @@ test('Scroll search (retry throws later)', async t => {
try { try {
for await (const result of scrollSearch) { // eslint-disable-line for await (const result of scrollSearch) { // eslint-disable-line
t.strictEqual(result.body.count, count) t.strictEqual(result.body.count, count)
count += 1
} }
} catch (err) { } catch (err) {
t.true(err instanceof errors.ResponseError) t.true(err instanceof errors.ResponseError)
@ -256,19 +261,23 @@ test('Scroll search documents', async t => {
if (count === 0) { if (count === 0) {
t.strictEqual(params.querystring, 'filter_path=hits.hits._source%2C_scroll_id&scroll=1m') t.strictEqual(params.querystring, 'filter_path=hits.hits._source%2C_scroll_id&scroll=1m')
} else { } else {
t.strictEqual(params.querystring, 'scroll=1m') if (params.method !== 'DELETE') {
t.strictEqual(params.body, '{"scroll_id":"id"}') t.strictEqual(params.querystring, 'scroll=1m')
t.strictEqual(params.body, '{"scroll_id":"id"}')
}
} }
return { return {
body: { body: {
_scroll_id: count === 3 ? undefined : 'id', _scroll_id: 'id',
count, count,
hits: { hits: {
hits: [ hits: count === 3
{ _source: { val: 1 * count } }, ? []
{ _source: { val: 2 * count } }, : [
{ _source: { val: 3 * count } } { _source: { val: 1 * count } },
] { _source: { val: 2 * count } },
{ _source: { val: 3 * count } }
]
} }
} }
} }
@ -339,15 +348,19 @@ test('Fix querystring for scroll search', async t => {
if (count === 0) { if (count === 0) {
t.strictEqual(params.querystring, 'size=1&scroll=1m') t.strictEqual(params.querystring, 'size=1&scroll=1m')
} else { } else {
t.strictEqual(params.querystring, 'scroll=1m') if (params.method !== 'DELETE') {
t.strictEqual(params.querystring, 'scroll=1m')
}
} }
return { return {
body: { body: {
_scroll_id: count === 3 ? undefined : 'id', _scroll_id: 'id',
hits: { hits: {
hits: [ hits: count === 3
{ _source: { val: count } } ? []
] : [
{ _source: { val: count } }
]
} }
} }
} }