Ensure new connections inherit client's set defaults (#2159)
* Add test confirming the issue See https://github.com/elastic/elasticsearch-js/issues/1791 * fix: ensure new connections inherit the client instance's defaults for https://github.com/elastic/elasticsearch-js/issues/1791
This commit is contained in:
@ -44,7 +44,7 @@ import {
|
|||||||
Context
|
Context
|
||||||
} from '@elastic/transport/lib/types'
|
} from '@elastic/transport/lib/types'
|
||||||
import { RedactionOptions } from '@elastic/transport/lib/Transport'
|
import { RedactionOptions } from '@elastic/transport/lib/Transport'
|
||||||
import BaseConnection, { prepareHeaders } from '@elastic/transport/lib/connection/BaseConnection'
|
import BaseConnection, { prepareHeaders, ConnectionOptions } from '@elastic/transport/lib/connection/BaseConnection'
|
||||||
import SniffingTransport from './sniffingTransport'
|
import SniffingTransport from './sniffingTransport'
|
||||||
import Helpers from './helpers'
|
import Helpers from './helpers'
|
||||||
import API from './api'
|
import API from './api'
|
||||||
@ -237,7 +237,35 @@ export default class Client extends API {
|
|||||||
diagnostic: this.diagnostic,
|
diagnostic: this.diagnostic,
|
||||||
caFingerprint: options.caFingerprint
|
caFingerprint: options.caFingerprint
|
||||||
})
|
})
|
||||||
this.connectionPool.addConnection(options.node ?? options.nodes)
|
|
||||||
|
// ensure default connection values are inherited when creating new connections
|
||||||
|
// see https://github.com/elastic/elasticsearch-js/issues/1791
|
||||||
|
const nodes = options.node ?? options.nodes
|
||||||
|
let nodeOptions: Array<string | ConnectionOptions> = Array.isArray(nodes) ? nodes : [nodes]
|
||||||
|
type ConnectionDefaults = Record<string, any>
|
||||||
|
nodeOptions = nodeOptions.map(opt => {
|
||||||
|
const { tls, headers, auth, requestTimeout: timeout, agent, proxy, caFingerprint } = options
|
||||||
|
let defaults: ConnectionDefaults = { tls, headers, auth, timeout, agent, proxy, caFingerprint }
|
||||||
|
|
||||||
|
// strip undefined values from defaults
|
||||||
|
defaults = Object.keys(defaults).reduce((acc: ConnectionDefaults, key) => {
|
||||||
|
const val = defaults[key]
|
||||||
|
if (val !== undefined) acc[key] = val
|
||||||
|
return acc
|
||||||
|
}, {})
|
||||||
|
|
||||||
|
let newOpts
|
||||||
|
if (typeof opt === 'string') {
|
||||||
|
newOpts = {
|
||||||
|
url: new URL(opt)
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
newOpts = opt
|
||||||
|
}
|
||||||
|
|
||||||
|
return { ...defaults, ...newOpts }
|
||||||
|
})
|
||||||
|
this.connectionPool.addConnection(nodeOptions)
|
||||||
}
|
}
|
||||||
|
|
||||||
this.transport = new options.Transport({
|
this.transport = new options.Transport({
|
||||||
@ -282,7 +310,7 @@ export default class Client extends API {
|
|||||||
// Merge the new options with the initial ones
|
// Merge the new options with the initial ones
|
||||||
// @ts-expect-error kChild symbol is for internal use only
|
// @ts-expect-error kChild symbol is for internal use only
|
||||||
const options: ClientOptions = Object.assign({}, this[kInitialOptions], opts)
|
const options: ClientOptions = Object.assign({}, this[kInitialOptions], opts)
|
||||||
// Pass to the child client the parent instances that cannot be overriden
|
// Pass to the child client the parent instances that cannot be overridden
|
||||||
// @ts-expect-error kInitialOptions symbol is for internal use only
|
// @ts-expect-error kInitialOptions symbol is for internal use only
|
||||||
options[kChild] = {
|
options[kChild] = {
|
||||||
connectionPool: this.connectionPool,
|
connectionPool: this.connectionPool,
|
||||||
|
|||||||
@ -17,9 +17,11 @@
|
|||||||
* under the License.
|
* under the License.
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
import * as http from 'http'
|
||||||
import { test } from 'tap'
|
import { test } from 'tap'
|
||||||
import { URL } from 'url'
|
import { URL } from 'url'
|
||||||
import { connection } from '../utils'
|
import FakeTimers from '@sinonjs/fake-timers'
|
||||||
|
import { buildServer, connection } from '../utils'
|
||||||
import { Client, errors } from '../..'
|
import { Client, errors } from '../..'
|
||||||
import * as symbols from '@elastic/transport/lib/symbols'
|
import * as symbols from '@elastic/transport/lib/symbols'
|
||||||
import { BaseConnectionPool, CloudConnectionPool, WeightedConnectionPool } from '@elastic/transport'
|
import { BaseConnectionPool, CloudConnectionPool, WeightedConnectionPool } from '@elastic/transport'
|
||||||
@ -441,3 +443,42 @@ test('user agent is in the correct format', t => {
|
|||||||
t.ok(/^\d+\.\d+\.\d+/.test(agentSplit[0].split('/')[1]))
|
t.ok(/^\d+\.\d+\.\d+/.test(agentSplit[0].split('/')[1]))
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('Ensure new client instance stores requestTimeout for each connection', t => {
|
||||||
|
const client = new Client({
|
||||||
|
node: { url: new URL('http://localhost:9200') },
|
||||||
|
requestTimeout: 60000,
|
||||||
|
})
|
||||||
|
t.equal(client.connectionPool.connections[0].timeout, 60000)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
test('Ensure new client does not time out at default (30s) when client sets requestTimeout', async t => {
|
||||||
|
const clock = FakeTimers.install({ toFake: ['setTimeout', 'clearTimeout'] })
|
||||||
|
t.teardown(() => clock.uninstall())
|
||||||
|
|
||||||
|
function handler (_req: http.IncomingMessage, res: http.ServerResponse) {
|
||||||
|
setTimeout(() => {
|
||||||
|
t.pass('timeout ended')
|
||||||
|
res.setHeader('content-type', 'application/json')
|
||||||
|
res.end(JSON.stringify({ success: true }))
|
||||||
|
}, 31000) // default is 30000
|
||||||
|
clock.runToLast()
|
||||||
|
}
|
||||||
|
|
||||||
|
const [{ port }, server] = await buildServer(handler)
|
||||||
|
|
||||||
|
const client = new Client({
|
||||||
|
node: `http://localhost:${port}`,
|
||||||
|
requestTimeout: 60000
|
||||||
|
})
|
||||||
|
|
||||||
|
try {
|
||||||
|
await client.transport.request({ method: 'GET', path: '/' })
|
||||||
|
} catch (error) {
|
||||||
|
t.fail('timeout error hit')
|
||||||
|
} finally {
|
||||||
|
server.stop()
|
||||||
|
t.end()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|||||||
Reference in New Issue
Block a user