Refactored connection pool (#913)
* Refactored ConnectionPool - Created BaseConnectionPool class - Created CloudConnectionPool - connection pool updates are immutable - resurrect now happens inside getConnection() * Rewritten connection pool(s) type definitions * Updated test * Fixed test * Fix if check * Removed old files * Improve code coverage * Updated license header * Fix if check * Improve code coverage * Updated coverage script
This commit is contained in:
committed by
delvedor
parent
90be646658
commit
8e86450aeb
2
index.d.ts
vendored
2
index.d.ts
vendored
@ -18,7 +18,7 @@ import Transport, {
|
|||||||
} from './lib/Transport';
|
} from './lib/Transport';
|
||||||
import { URL } from 'url';
|
import { URL } from 'url';
|
||||||
import Connection, { AgentOptions, agentFn } from './lib/Connection';
|
import Connection, { AgentOptions, agentFn } from './lib/Connection';
|
||||||
import ConnectionPool, { ResurrectEvent, BasicAuth, ApiKeyAuth } from './lib/ConnectionPool';
|
import { ConnectionPool, ResurrectEvent, BasicAuth, ApiKeyAuth } from './lib/pool';
|
||||||
import Serializer from './lib/Serializer';
|
import Serializer from './lib/Serializer';
|
||||||
import * as RequestParams from './api/requestParams';
|
import * as RequestParams from './api/requestParams';
|
||||||
import * as errors from './lib/errors';
|
import * as errors from './lib/errors';
|
||||||
|
|||||||
4
index.js
4
index.js
@ -9,7 +9,7 @@ const { URL } = require('url')
|
|||||||
const debug = require('debug')('elasticsearch')
|
const debug = require('debug')('elasticsearch')
|
||||||
const Transport = require('./lib/Transport')
|
const Transport = require('./lib/Transport')
|
||||||
const Connection = require('./lib/Connection')
|
const Connection = require('./lib/Connection')
|
||||||
const ConnectionPool = require('./lib/ConnectionPool')
|
const { ConnectionPool, CloudConnectionPool } = require('./lib/pool')
|
||||||
const Serializer = require('./lib/Serializer')
|
const Serializer = require('./lib/Serializer')
|
||||||
const errors = require('./lib/errors')
|
const errors = require('./lib/errors')
|
||||||
const { ConfigurationError } = errors
|
const { ConfigurationError } = errors
|
||||||
@ -59,9 +59,9 @@ class Client extends EventEmitter {
|
|||||||
|
|
||||||
const options = Object.assign({}, {
|
const options = Object.assign({}, {
|
||||||
Connection,
|
Connection,
|
||||||
ConnectionPool,
|
|
||||||
Transport,
|
Transport,
|
||||||
Serializer,
|
Serializer,
|
||||||
|
ConnectionPool: opts.cloud ? CloudConnectionPool : ConnectionPool,
|
||||||
maxRetries: 3,
|
maxRetries: 3,
|
||||||
requestTimeout: 30000,
|
requestTimeout: 30000,
|
||||||
pingTimeout: 3000,
|
pingTimeout: 3000,
|
||||||
|
|||||||
2
lib/Connection.d.ts
vendored
2
lib/Connection.d.ts
vendored
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
import { URL } from 'url';
|
import { URL } from 'url';
|
||||||
import { inspect, InspectOptions } from 'util';
|
import { inspect, InspectOptions } from 'util';
|
||||||
import { ApiKeyAuth, BasicAuth } from './ConnectionPool'
|
import { ApiKeyAuth, BasicAuth } from './pool'
|
||||||
import * as http from 'http';
|
import * as http from 'http';
|
||||||
import { ConnectionOptions as TlsConnectionOptions } from 'tls';
|
import { ConnectionOptions as TlsConnectionOptions } from 'tls';
|
||||||
|
|
||||||
|
|||||||
@ -1,386 +0,0 @@
|
|||||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
|
||||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
|
||||||
// See the LICENSE file in the project root for more information
|
|
||||||
|
|
||||||
'use strict'
|
|
||||||
|
|
||||||
const assert = require('assert')
|
|
||||||
const { URL } = require('url')
|
|
||||||
const debug = require('debug')('elasticsearch')
|
|
||||||
const Connection = require('./Connection')
|
|
||||||
const noop = () => {}
|
|
||||||
|
|
||||||
class ConnectionPool {
|
|
||||||
constructor (opts = {}) {
|
|
||||||
this.connections = new Map()
|
|
||||||
this.dead = []
|
|
||||||
this.selector = opts.selector
|
|
||||||
this.auth = opts.auth || null
|
|
||||||
this._ssl = opts.ssl
|
|
||||||
this._agent = opts.agent
|
|
||||||
// the resurrect timeout is 60s
|
|
||||||
this.resurrectTimeout = 1000 * 60
|
|
||||||
// number of consecutive failures after which
|
|
||||||
// the timeout doesn't increase
|
|
||||||
this.resurrectTimeoutCutoff = 5
|
|
||||||
this.pingTimeout = opts.pingTimeout
|
|
||||||
this.Connection = opts.Connection
|
|
||||||
this.emit = opts.emit || noop
|
|
||||||
this._sniffEnabled = opts.sniffEnabled || false
|
|
||||||
|
|
||||||
const resurrectStrategy = opts.resurrectStrategy || 'ping'
|
|
||||||
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
|
|
||||||
assert(
|
|
||||||
this.resurrectStrategy != null,
|
|
||||||
`Invalid resurrection strategy: '${resurrectStrategy}'`
|
|
||||||
)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Marks a connection as 'alive'.
|
|
||||||
* If needed removes the connection from the dead list
|
|
||||||
* and then resets the `deadCount`.
|
|
||||||
* If sniffing is not enabled and there is only
|
|
||||||
* one node, this method is a noop.
|
|
||||||
*
|
|
||||||
* @param {object} connection
|
|
||||||
*/
|
|
||||||
markAlive (connection) {
|
|
||||||
if (this._sniffEnabled === false && this.connections.size === 1) return
|
|
||||||
const { id } = connection
|
|
||||||
debug(`Marking as 'alive' connection '${id}'`)
|
|
||||||
const index = this.dead.indexOf(id)
|
|
||||||
if (index > -1) this.dead.splice(index, 1)
|
|
||||||
connection.status = Connection.statuses.ALIVE
|
|
||||||
connection.deadCount = 0
|
|
||||||
connection.resurrectTimeout = 0
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Marks a connection as 'dead'.
|
|
||||||
* If needed adds the connection to the dead list
|
|
||||||
* and then increments the `deadCount`.
|
|
||||||
* If sniffing is not enabled and there is only
|
|
||||||
* one node, this method is a noop.
|
|
||||||
*
|
|
||||||
* @param {object} connection
|
|
||||||
*/
|
|
||||||
markDead (connection) {
|
|
||||||
if (this._sniffEnabled === false && this.connections.size === 1) return
|
|
||||||
const { id } = connection
|
|
||||||
debug(`Marking as 'dead' connection '${id}'`)
|
|
||||||
if (this.dead.indexOf(id) === -1) {
|
|
||||||
this.dead.push(id)
|
|
||||||
}
|
|
||||||
connection.status = Connection.statuses.DEAD
|
|
||||||
connection.deadCount++
|
|
||||||
// resurrectTimeout formula:
|
|
||||||
// `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
|
|
||||||
connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
|
|
||||||
2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff)
|
|
||||||
)
|
|
||||||
|
|
||||||
// sort the dead list in ascending order
|
|
||||||
// based on the resurrectTimeout
|
|
||||||
this.dead.sort((a, b) => {
|
|
||||||
const conn1 = this.connections.get(a)
|
|
||||||
const conn2 = this.connections.get(b)
|
|
||||||
return conn1.resurrectTimeout - conn2.resurrectTimeout
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* If enabled, tries to resurrect a connection with the given
|
|
||||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
|
||||||
*
|
|
||||||
* @param {object} { now, requestId }
|
|
||||||
* @param {function} callback (isAlive, connection)
|
|
||||||
*/
|
|
||||||
resurrect (opts, callback = noop) {
|
|
||||||
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
|
|
||||||
debug('Nothing to resurrect')
|
|
||||||
callback(null, null)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// the dead list is sorted in ascending order based on the timeout
|
|
||||||
// so the first element will always be the one with the smaller timeout
|
|
||||||
const connection = this.connections.get(this.dead[0])
|
|
||||||
if ((opts.now || Date.now()) < connection.resurrectTimeout) {
|
|
||||||
debug('Nothing to resurrect')
|
|
||||||
callback(null, null)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
const { id } = connection
|
|
||||||
|
|
||||||
// ping strategy
|
|
||||||
if (this.resurrectStrategy === 1) {
|
|
||||||
connection.request({
|
|
||||||
method: 'HEAD',
|
|
||||||
path: '/',
|
|
||||||
timeout: this.pingTimeout
|
|
||||||
}, (err, response) => {
|
|
||||||
var isAlive = true
|
|
||||||
const statusCode = response !== null ? response.statusCode : 0
|
|
||||||
if (err != null ||
|
|
||||||
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
|
|
||||||
debug(`Resurrect: connection '${id}' is still dead`)
|
|
||||||
this.markDead(connection)
|
|
||||||
isAlive = false
|
|
||||||
} else {
|
|
||||||
debug(`Resurrect: connection '${id}' is now alive`)
|
|
||||||
this.markAlive(connection)
|
|
||||||
}
|
|
||||||
this.emit('resurrect', null, {
|
|
||||||
strategy: 'ping',
|
|
||||||
name: opts.name,
|
|
||||||
request: { id: opts.requestId },
|
|
||||||
isAlive,
|
|
||||||
connection
|
|
||||||
})
|
|
||||||
callback(isAlive, connection)
|
|
||||||
})
|
|
||||||
// optimistic strategy
|
|
||||||
} else {
|
|
||||||
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
|
|
||||||
this.dead.splice(this.dead.indexOf(id), 1)
|
|
||||||
connection.status = Connection.statuses.ALIVE
|
|
||||||
this.emit('resurrect', null, {
|
|
||||||
strategy: 'optimistic',
|
|
||||||
name: opts.name,
|
|
||||||
request: { id: opts.requestId },
|
|
||||||
isAlive: true,
|
|
||||||
connection
|
|
||||||
})
|
|
||||||
// eslint-disable-next-line standard/no-callback-literal
|
|
||||||
callback(true, connection)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Returns an alive connection if present,
|
|
||||||
* otherwise returns null.
|
|
||||||
* By default it filters the `master` only nodes.
|
|
||||||
* It uses the selector to choose which
|
|
||||||
* connection return.
|
|
||||||
*
|
|
||||||
* @param {object} options (filter and selector)
|
|
||||||
* @returns {object|null} connection
|
|
||||||
*/
|
|
||||||
getConnection (opts = {}) {
|
|
||||||
const filter = opts.filter || (() => true)
|
|
||||||
const selector = opts.selector || (c => c[0])
|
|
||||||
|
|
||||||
// TODO: can we cache this?
|
|
||||||
const connections = []
|
|
||||||
for (var connection of this.connections.values()) {
|
|
||||||
if (connection.status === Connection.statuses.ALIVE) {
|
|
||||||
if (filter(connection) === true) {
|
|
||||||
connections.push(connection)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if (connections.length === 0) return null
|
|
||||||
|
|
||||||
return selector(connections)
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Adds a new connection to the pool.
|
|
||||||
*
|
|
||||||
* @param {object|string} host
|
|
||||||
* @returns {ConnectionPool}
|
|
||||||
*/
|
|
||||||
addConnection (opts) {
|
|
||||||
if (Array.isArray(opts)) {
|
|
||||||
opts.forEach(o => this.addConnection(o))
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
if (typeof opts === 'string') {
|
|
||||||
opts = this.urlToHost(opts)
|
|
||||||
}
|
|
||||||
|
|
||||||
if (opts.url.username !== '' && opts.url.password !== '') {
|
|
||||||
opts.auth = {
|
|
||||||
username: decodeURIComponent(opts.url.username),
|
|
||||||
password: decodeURIComponent(opts.url.password)
|
|
||||||
}
|
|
||||||
} else if (this.auth !== null) {
|
|
||||||
opts.auth = this.auth
|
|
||||||
}
|
|
||||||
|
|
||||||
if (opts.ssl == null) opts.ssl = this._ssl
|
|
||||||
if (opts.agent == null) opts.agent = this._agent
|
|
||||||
|
|
||||||
const connection = new this.Connection(opts)
|
|
||||||
debug('Adding a new connection', connection)
|
|
||||||
if (this.connections.has(connection.id)) {
|
|
||||||
throw new Error(`Connection with id '${connection.id}' is already present`)
|
|
||||||
}
|
|
||||||
this.connections.set(connection.id, connection)
|
|
||||||
return connection
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Removes a new connection to the pool.
|
|
||||||
*
|
|
||||||
* @param {object} connection
|
|
||||||
* @returns {ConnectionPool}
|
|
||||||
*/
|
|
||||||
removeConnection (connection) {
|
|
||||||
debug('Removing connection', connection)
|
|
||||||
connection.close(noop)
|
|
||||||
const { id } = connection
|
|
||||||
this.connections.delete(id)
|
|
||||||
var index = this.dead.indexOf(id)
|
|
||||||
if (index > -1) this.dead.splice(index, 1)
|
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Empties the connection pool.
|
|
||||||
*
|
|
||||||
* @returns {ConnectionPool}
|
|
||||||
*/
|
|
||||||
empty (callback) {
|
|
||||||
debug('Emptying the connection pool')
|
|
||||||
var openConnections = this.connections.size
|
|
||||||
this.connections.forEach(connection => {
|
|
||||||
connection.close(() => {
|
|
||||||
if (--openConnections === 0) {
|
|
||||||
this.connections = new Map()
|
|
||||||
this.dead = []
|
|
||||||
callback()
|
|
||||||
}
|
|
||||||
})
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Update the ConnectionPool with new connections.
|
|
||||||
*
|
|
||||||
* @param {array} array of connections
|
|
||||||
* @returns {ConnectionPool}
|
|
||||||
*/
|
|
||||||
update (connections) {
|
|
||||||
debug('Updating the connection pool')
|
|
||||||
for (var i = 0; i < connections.length; i++) {
|
|
||||||
const connection = connections[i]
|
|
||||||
// if we already have a given connection in the pool
|
|
||||||
// we check its status, if is 'alive', we do nothing,
|
|
||||||
// if 'dead' we mark it as alive, we do not close the old
|
|
||||||
// one to avoid socket issues
|
|
||||||
if (this.connections.has(connection.id) === true) {
|
|
||||||
debug(`The connection with id '${connection.id}' is already present`)
|
|
||||||
const oldConnection = this.connections.get(connection.id)
|
|
||||||
if (oldConnection.status === Connection.statuses.DEAD) {
|
|
||||||
this.markAlive(oldConnection)
|
|
||||||
}
|
|
||||||
// in case the user has passed a single url (or an array of urls),
|
|
||||||
// the connection id will be the full href; to avoid closing valid connections
|
|
||||||
// because are not present in the pool, we check also the node url,
|
|
||||||
// and if is already present we update its id with the ES provided one.
|
|
||||||
} else if (this.connections.has(connection.url.href) === true) {
|
|
||||||
const oldConnection = this.connections.get(connection.url.href)
|
|
||||||
this.connections.delete(connection.url.href)
|
|
||||||
oldConnection.id = connection.id
|
|
||||||
this.connections.set(connection.id, oldConnection)
|
|
||||||
if (oldConnection.status === Connection.statuses.DEAD) {
|
|
||||||
this.markAlive(oldConnection)
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
this.addConnection(connection)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
const ids = connections.map(c => c.id)
|
|
||||||
// remove all the dead connections and old connections
|
|
||||||
for (const connection of this.connections.values()) {
|
|
||||||
if (ids.indexOf(connection.id) === -1) {
|
|
||||||
this.removeConnection(connection)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Transforms the nodes objects to a host object.
|
|
||||||
*
|
|
||||||
* @param {object} nodes
|
|
||||||
* @returns {array} hosts
|
|
||||||
*/
|
|
||||||
nodesToHost (nodes, protocol) {
|
|
||||||
const ids = Object.keys(nodes)
|
|
||||||
const hosts = []
|
|
||||||
|
|
||||||
for (var i = 0, len = ids.length; i < len; i++) {
|
|
||||||
const node = nodes[ids[i]]
|
|
||||||
// If there is no protocol in
|
|
||||||
// the `publish_address` new URL will throw
|
|
||||||
// the publish_address can have two forms:
|
|
||||||
// - ip:port
|
|
||||||
// - hostname/ip:port
|
|
||||||
// if we encounter the second case, we should
|
|
||||||
// use the hostname instead of the ip
|
|
||||||
var address = node.http.publish_address
|
|
||||||
const parts = address.split('/')
|
|
||||||
// the url is in the form of hostname/ip:port
|
|
||||||
if (parts.length > 1) {
|
|
||||||
const hostname = parts[0]
|
|
||||||
const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1)
|
|
||||||
address = `${hostname}:${port}`
|
|
||||||
}
|
|
||||||
|
|
||||||
address = address.slice(0, 4) === 'http'
|
|
||||||
? address
|
|
||||||
: `${protocol}//${address}`
|
|
||||||
const roles = node.roles.reduce((acc, role) => {
|
|
||||||
acc[role] = true
|
|
||||||
return acc
|
|
||||||
}, {})
|
|
||||||
|
|
||||||
hosts.push({
|
|
||||||
url: new URL(address),
|
|
||||||
id: ids[i],
|
|
||||||
roles: Object.assign({
|
|
||||||
[Connection.roles.MASTER]: true,
|
|
||||||
[Connection.roles.DATA]: true,
|
|
||||||
[Connection.roles.INGEST]: true,
|
|
||||||
[Connection.roles.ML]: false
|
|
||||||
}, roles)
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
return hosts
|
|
||||||
}
|
|
||||||
|
|
||||||
/**
|
|
||||||
* Transforms an url string to a host object
|
|
||||||
*
|
|
||||||
* @param {string} url
|
|
||||||
* @returns {object} host
|
|
||||||
*/
|
|
||||||
urlToHost (url) {
|
|
||||||
return {
|
|
||||||
url: new URL(url)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
ConnectionPool.resurrectStrategies = {
|
|
||||||
none: 0,
|
|
||||||
ping: 1,
|
|
||||||
optimistic: 2
|
|
||||||
}
|
|
||||||
|
|
||||||
// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4
|
|
||||||
// const shuffleArray = arr => arr
|
|
||||||
// .map(a => [Math.random(), a])
|
|
||||||
// .sort((a, b) => a[0] - b[0])
|
|
||||||
// .map(a => a[1])
|
|
||||||
|
|
||||||
module.exports = ConnectionPool
|
|
||||||
6
lib/Transport.d.ts
vendored
6
lib/Transport.d.ts
vendored
@ -2,7 +2,7 @@
|
|||||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
// See the LICENSE file in the project root for more information
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
import ConnectionPool from './ConnectionPool';
|
import { ConnectionPool, CloudConnectionPool } from './pool';
|
||||||
import Connection from './Connection';
|
import Connection from './Connection';
|
||||||
import Serializer from './Serializer';
|
import Serializer from './Serializer';
|
||||||
|
|
||||||
@ -23,7 +23,7 @@ declare type emitFn = (event: string | symbol, ...args: any[]) => boolean;
|
|||||||
|
|
||||||
interface TransportOptions {
|
interface TransportOptions {
|
||||||
emit: emitFn & noopFn;
|
emit: emitFn & noopFn;
|
||||||
connectionPool: ConnectionPool;
|
connectionPool: ConnectionPool | CloudConnectionPool;
|
||||||
serializer: Serializer;
|
serializer: Serializer;
|
||||||
maxRetries: number;
|
maxRetries: number;
|
||||||
requestTimeout: number | string;
|
requestTimeout: number | string;
|
||||||
@ -113,7 +113,7 @@ export default class Transport {
|
|||||||
DEFAULT: string;
|
DEFAULT: string;
|
||||||
};
|
};
|
||||||
emit: emitFn & noopFn;
|
emit: emitFn & noopFn;
|
||||||
connectionPool: ConnectionPool;
|
connectionPool: ConnectionPool | CloudConnectionPool;
|
||||||
serializer: Serializer;
|
serializer: Serializer;
|
||||||
maxRetries: number;
|
maxRetries: number;
|
||||||
requestTimeout: number;
|
requestTimeout: number;
|
||||||
|
|||||||
@ -314,10 +314,12 @@ class Transport {
|
|||||||
if (this._sniffEnabled === true && now > this._nextSniff) {
|
if (this._sniffEnabled === true && now > this._nextSniff) {
|
||||||
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
|
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
|
||||||
}
|
}
|
||||||
this.connectionPool.resurrect({ now, requestId: opts.requestId, name: this.name })
|
|
||||||
return this.connectionPool.getConnection({
|
return this.connectionPool.getConnection({
|
||||||
filter: this.nodeFilter,
|
filter: this.nodeFilter,
|
||||||
selector: this.nodeSelector
|
selector: this.nodeSelector,
|
||||||
|
requestId: opts.requestId,
|
||||||
|
name: this.name,
|
||||||
|
now
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
239
lib/pool/BaseConnectionPool.js
Normal file
239
lib/pool/BaseConnectionPool.js
Normal file
@ -0,0 +1,239 @@
|
|||||||
|
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||||
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { URL } = require('url')
|
||||||
|
const debug = require('debug')('elasticsearch')
|
||||||
|
const Connection = require('../Connection')
|
||||||
|
const noop = () => {}
|
||||||
|
|
||||||
|
class BaseConnectionPool {
|
||||||
|
constructor (opts) {
|
||||||
|
// list of nodes and weights
|
||||||
|
this.connections = []
|
||||||
|
// how many nodes we have in our scheduler
|
||||||
|
this.size = this.connections.length
|
||||||
|
this.Connection = opts.Connection
|
||||||
|
this.emit = opts.emit || noop
|
||||||
|
this.auth = opts.auth || null
|
||||||
|
this._ssl = opts.ssl
|
||||||
|
this._agent = opts.agent
|
||||||
|
}
|
||||||
|
|
||||||
|
getConnection () {
|
||||||
|
throw new Error('getConnection must be implemented')
|
||||||
|
}
|
||||||
|
|
||||||
|
markAlive () {
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
markDead () {
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Creates a new connection instance.
|
||||||
|
*/
|
||||||
|
createConnection (opts) {
|
||||||
|
if (typeof opts === 'string') {
|
||||||
|
opts = this.urlToHost(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opts.url.username !== '' && opts.url.password !== '') {
|
||||||
|
opts.auth = {
|
||||||
|
username: decodeURIComponent(opts.url.username),
|
||||||
|
password: decodeURIComponent(opts.url.password)
|
||||||
|
}
|
||||||
|
} else if (this.auth !== null) {
|
||||||
|
opts.auth = this.auth
|
||||||
|
}
|
||||||
|
|
||||||
|
if (opts.ssl == null) opts.ssl = this._ssl
|
||||||
|
if (opts.agent == null) opts.agent = this._agent
|
||||||
|
|
||||||
|
const connection = new this.Connection(opts)
|
||||||
|
|
||||||
|
for (const conn of this.connections) {
|
||||||
|
if (conn.id === connection.id) {
|
||||||
|
throw new Error(`Connection with id '${connection.id}' is already present`)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return connection
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Adds a new connection to the pool.
|
||||||
|
*
|
||||||
|
* @param {object|string} host
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
addConnection (opts) {
|
||||||
|
if (Array.isArray(opts)) {
|
||||||
|
return opts.forEach(o => this.addConnection(o))
|
||||||
|
}
|
||||||
|
|
||||||
|
if (typeof opts === 'string') {
|
||||||
|
opts = this.urlToHost(opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
const connectionById = this.connections.find(c => c.id === opts.id)
|
||||||
|
const connectionByUrl = this.connections.find(c => c.id === opts.url.href)
|
||||||
|
|
||||||
|
if (connectionById || connectionByUrl) {
|
||||||
|
throw new Error(`Connection with id '${opts.id || opts.url.href}' is already present`)
|
||||||
|
}
|
||||||
|
|
||||||
|
this.update([...this.connections, opts])
|
||||||
|
return this.connections[this.size - 1]
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes a new connection to the pool.
|
||||||
|
*
|
||||||
|
* @param {object} connection
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
removeConnection (connection) {
|
||||||
|
debug('Removing connection', connection)
|
||||||
|
return this.update(this.connections.filter(c => c.id !== connection.id))
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empties the connection pool.
|
||||||
|
*
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
empty (callback) {
|
||||||
|
debug('Emptying the connection pool')
|
||||||
|
var openConnections = this.size
|
||||||
|
this.connections.forEach(connection => {
|
||||||
|
connection.close(() => {
|
||||||
|
if (--openConnections === 0) {
|
||||||
|
this.connections = []
|
||||||
|
this.size = this.connections.length
|
||||||
|
callback()
|
||||||
|
}
|
||||||
|
})
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the ConnectionPool with new connections.
|
||||||
|
*
|
||||||
|
* @param {array} array of connections
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
update (nodes) {
|
||||||
|
debug('Updating the connection pool')
|
||||||
|
const newConnections = []
|
||||||
|
const oldConnections = []
|
||||||
|
|
||||||
|
for (const node of nodes) {
|
||||||
|
// if we already have a given connection in the pool
|
||||||
|
// we mark it as alive and we do not close the connection
|
||||||
|
// to avoid socket issues
|
||||||
|
const connectionById = this.connections.find(c => c.id === node.id)
|
||||||
|
const connectionByUrl = this.connections.find(c => c.id === node.url.href)
|
||||||
|
if (connectionById) {
|
||||||
|
debug(`The connection with id '${node.id}' is already present`)
|
||||||
|
this.markAlive(connectionById)
|
||||||
|
newConnections.push(connectionById)
|
||||||
|
// in case the user has passed a single url (or an array of urls),
|
||||||
|
// the connection id will be the full href; to avoid closing valid connections
|
||||||
|
// because are not present in the pool, we check also the node url,
|
||||||
|
// and if is already present we update its id with the ES provided one.
|
||||||
|
} else if (connectionByUrl) {
|
||||||
|
connectionByUrl.id = node.id
|
||||||
|
this.markAlive(connectionByUrl)
|
||||||
|
newConnections.push(connectionByUrl)
|
||||||
|
} else {
|
||||||
|
newConnections.push(this.createConnection(node))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
const ids = nodes.map(c => c.id)
|
||||||
|
// remove all the dead connections and old connections
|
||||||
|
for (const connection of this.connections) {
|
||||||
|
if (ids.indexOf(connection.id) === -1) {
|
||||||
|
oldConnections.push(connection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// close old connections
|
||||||
|
oldConnections.forEach(connection => connection.close())
|
||||||
|
|
||||||
|
this.connections = newConnections
|
||||||
|
this.size = this.connections.length
|
||||||
|
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms the nodes objects to a host object.
|
||||||
|
*
|
||||||
|
* @param {object} nodes
|
||||||
|
* @returns {array} hosts
|
||||||
|
*/
|
||||||
|
nodesToHost (nodes, protocol) {
|
||||||
|
const ids = Object.keys(nodes)
|
||||||
|
const hosts = []
|
||||||
|
|
||||||
|
for (var i = 0, len = ids.length; i < len; i++) {
|
||||||
|
const node = nodes[ids[i]]
|
||||||
|
// If there is no protocol in
|
||||||
|
// the `publish_address` new URL will throw
|
||||||
|
// the publish_address can have two forms:
|
||||||
|
// - ip:port
|
||||||
|
// - hostname/ip:port
|
||||||
|
// if we encounter the second case, we should
|
||||||
|
// use the hostname instead of the ip
|
||||||
|
var address = node.http.publish_address
|
||||||
|
const parts = address.split('/')
|
||||||
|
// the url is in the form of hostname/ip:port
|
||||||
|
if (parts.length > 1) {
|
||||||
|
const hostname = parts[0]
|
||||||
|
const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1)
|
||||||
|
address = `${hostname}:${port}`
|
||||||
|
}
|
||||||
|
|
||||||
|
address = address.slice(0, 4) === 'http'
|
||||||
|
? address
|
||||||
|
: `${protocol}//${address}`
|
||||||
|
const roles = node.roles.reduce((acc, role) => {
|
||||||
|
acc[role] = true
|
||||||
|
return acc
|
||||||
|
}, {})
|
||||||
|
|
||||||
|
hosts.push({
|
||||||
|
url: new URL(address),
|
||||||
|
id: ids[i],
|
||||||
|
roles: Object.assign({
|
||||||
|
[Connection.roles.MASTER]: true,
|
||||||
|
[Connection.roles.DATA]: true,
|
||||||
|
[Connection.roles.INGEST]: true,
|
||||||
|
[Connection.roles.ML]: false
|
||||||
|
}, roles)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return hosts
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Transforms an url string to a host object
|
||||||
|
*
|
||||||
|
* @param {string} url
|
||||||
|
* @returns {object} host
|
||||||
|
*/
|
||||||
|
urlToHost (url) {
|
||||||
|
return {
|
||||||
|
url: new URL(url)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = BaseConnectionPool
|
||||||
49
lib/pool/CloudConnectionPool.js
Normal file
49
lib/pool/CloudConnectionPool.js
Normal file
@ -0,0 +1,49 @@
|
|||||||
|
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||||
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const BaseConnectionPool = require('./BaseConnectionPool')
|
||||||
|
|
||||||
|
class CloudConnectionPool extends BaseConnectionPool {
|
||||||
|
constructor (opts = {}) {
|
||||||
|
super(opts)
|
||||||
|
this.cloudConnection = null
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns the only cloud connection.
|
||||||
|
*
|
||||||
|
* @returns {object} connection
|
||||||
|
*/
|
||||||
|
getConnection () {
|
||||||
|
return this.cloudConnection
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empties the connection pool.
|
||||||
|
*
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
empty (callback) {
|
||||||
|
super.empty(() => {
|
||||||
|
this.cloudConnection = null
|
||||||
|
callback()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the ConnectionPool with new connections.
|
||||||
|
*
|
||||||
|
* @param {array} array of connections
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
update (connections) {
|
||||||
|
super.update(connections)
|
||||||
|
this.cloudConnection = this.connections[0]
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = CloudConnectionPool
|
||||||
232
lib/pool/ConnectionPool.js
Normal file
232
lib/pool/ConnectionPool.js
Normal file
@ -0,0 +1,232 @@
|
|||||||
|
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||||
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const BaseConnectionPool = require('./BaseConnectionPool')
|
||||||
|
const assert = require('assert')
|
||||||
|
const debug = require('debug')('elasticsearch')
|
||||||
|
const Connection = require('../Connection')
|
||||||
|
const noop = () => {}
|
||||||
|
|
||||||
|
class ConnectionPool extends BaseConnectionPool {
|
||||||
|
constructor (opts = {}) {
|
||||||
|
super(opts)
|
||||||
|
|
||||||
|
this.dead = []
|
||||||
|
// the resurrect timeout is 60s
|
||||||
|
this.resurrectTimeout = 1000 * 60
|
||||||
|
// number of consecutive failures after which
|
||||||
|
// the timeout doesn't increase
|
||||||
|
this.resurrectTimeoutCutoff = 5
|
||||||
|
this.pingTimeout = opts.pingTimeout
|
||||||
|
this._sniffEnabled = opts.sniffEnabled || false
|
||||||
|
|
||||||
|
const resurrectStrategy = opts.resurrectStrategy || 'ping'
|
||||||
|
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
|
||||||
|
assert(
|
||||||
|
this.resurrectStrategy != null,
|
||||||
|
`Invalid resurrection strategy: '${resurrectStrategy}'`
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks a connection as 'alive'.
|
||||||
|
* If needed removes the connection from the dead list
|
||||||
|
* and then resets the `deadCount`.
|
||||||
|
* If sniffing is not enabled and there is only
|
||||||
|
* one node, this method is a noop.
|
||||||
|
*
|
||||||
|
* @param {object} connection
|
||||||
|
*/
|
||||||
|
markAlive (connection) {
|
||||||
|
if (this._sniffEnabled === false && this.size === 1) return this
|
||||||
|
const { id } = connection
|
||||||
|
debug(`Marking as 'alive' connection '${id}'`)
|
||||||
|
const index = this.dead.indexOf(id)
|
||||||
|
if (index > -1) this.dead.splice(index, 1)
|
||||||
|
connection.status = Connection.statuses.ALIVE
|
||||||
|
connection.deadCount = 0
|
||||||
|
connection.resurrectTimeout = 0
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Marks a connection as 'dead'.
|
||||||
|
* If needed adds the connection to the dead list
|
||||||
|
* and then increments the `deadCount`.
|
||||||
|
* If sniffing is not enabled and there is only
|
||||||
|
* one node, this method is a noop.
|
||||||
|
*
|
||||||
|
* @param {object} connection
|
||||||
|
*/
|
||||||
|
markDead (connection) {
|
||||||
|
if (this._sniffEnabled === false && this.size === 1) return this
|
||||||
|
const { id } = connection
|
||||||
|
debug(`Marking as 'dead' connection '${id}'`)
|
||||||
|
if (this.dead.indexOf(id) === -1) {
|
||||||
|
this.dead.push(id)
|
||||||
|
}
|
||||||
|
connection.status = Connection.statuses.DEAD
|
||||||
|
connection.deadCount++
|
||||||
|
// resurrectTimeout formula:
|
||||||
|
// `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
|
||||||
|
connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
|
||||||
|
2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff)
|
||||||
|
)
|
||||||
|
|
||||||
|
// sort the dead list in ascending order
|
||||||
|
// based on the resurrectTimeout
|
||||||
|
this.dead.sort((a, b) => {
|
||||||
|
const conn1 = this.connections.find(c => c.id === a)
|
||||||
|
const conn2 = this.connections.find(c => c.id === b)
|
||||||
|
return conn1.resurrectTimeout - conn2.resurrectTimeout
|
||||||
|
})
|
||||||
|
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If enabled, tries to resurrect a connection with the given
|
||||||
|
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||||
|
*
|
||||||
|
* @param {object} { now, requestId }
|
||||||
|
* @param {function} callback (isAlive, connection)
|
||||||
|
*/
|
||||||
|
resurrect (opts, callback = noop) {
|
||||||
|
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
|
||||||
|
debug('Nothing to resurrect')
|
||||||
|
callback(null, null)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// the dead list is sorted in ascending order based on the timeout
|
||||||
|
// so the first element will always be the one with the smaller timeout
|
||||||
|
const connection = this.connections.find(c => c.id === this.dead[0])
|
||||||
|
if ((opts.now || Date.now()) < connection.resurrectTimeout) {
|
||||||
|
debug('Nothing to resurrect')
|
||||||
|
callback(null, null)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const { id } = connection
|
||||||
|
|
||||||
|
// ping strategy
|
||||||
|
if (this.resurrectStrategy === 1) {
|
||||||
|
connection.request({
|
||||||
|
method: 'HEAD',
|
||||||
|
path: '/',
|
||||||
|
timeout: this.pingTimeout
|
||||||
|
}, (err, response) => {
|
||||||
|
var isAlive = true
|
||||||
|
const statusCode = response !== null ? response.statusCode : 0
|
||||||
|
if (err != null ||
|
||||||
|
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
|
||||||
|
debug(`Resurrect: connection '${id}' is still dead`)
|
||||||
|
this.markDead(connection)
|
||||||
|
isAlive = false
|
||||||
|
} else {
|
||||||
|
debug(`Resurrect: connection '${id}' is now alive`)
|
||||||
|
this.markAlive(connection)
|
||||||
|
}
|
||||||
|
this.emit('resurrect', null, {
|
||||||
|
strategy: 'ping',
|
||||||
|
name: opts.name,
|
||||||
|
request: { id: opts.requestId },
|
||||||
|
isAlive,
|
||||||
|
connection
|
||||||
|
})
|
||||||
|
callback(isAlive, connection)
|
||||||
|
})
|
||||||
|
// optimistic strategy
|
||||||
|
} else {
|
||||||
|
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
|
||||||
|
this.dead.splice(this.dead.indexOf(id), 1)
|
||||||
|
connection.status = Connection.statuses.ALIVE
|
||||||
|
this.emit('resurrect', null, {
|
||||||
|
strategy: 'optimistic',
|
||||||
|
name: opts.name,
|
||||||
|
request: { id: opts.requestId },
|
||||||
|
isAlive: true,
|
||||||
|
connection
|
||||||
|
})
|
||||||
|
// eslint-disable-next-line standard/no-callback-literal
|
||||||
|
callback(true, connection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Returns an alive connection if present,
|
||||||
|
* otherwise returns null.
|
||||||
|
* By default it filters the `master` only nodes.
|
||||||
|
* It uses the selector to choose which
|
||||||
|
* connection return.
|
||||||
|
*
|
||||||
|
* @param {object} options (filter and selector)
|
||||||
|
* @returns {object|null} connection
|
||||||
|
*/
|
||||||
|
getConnection (opts = {}) {
|
||||||
|
const filter = opts.filter || (() => true)
|
||||||
|
const selector = opts.selector || (c => c[0])
|
||||||
|
|
||||||
|
this.resurrect({
|
||||||
|
now: opts.now,
|
||||||
|
requestId: opts.requestId,
|
||||||
|
name: opts.name
|
||||||
|
})
|
||||||
|
|
||||||
|
// TODO: can we cache this?
|
||||||
|
const connections = []
|
||||||
|
for (var i = 0; i < this.size; i++) {
|
||||||
|
const connection = this.connections[i]
|
||||||
|
if (connection.status === Connection.statuses.ALIVE) {
|
||||||
|
if (filter(connection) === true) {
|
||||||
|
connections.push(connection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (connections.length === 0) return null
|
||||||
|
|
||||||
|
return selector(connections)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Empties the connection pool.
|
||||||
|
*
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
empty (callback) {
|
||||||
|
super.empty(() => {
|
||||||
|
this.dead = []
|
||||||
|
callback()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Update the ConnectionPool with new connections.
|
||||||
|
*
|
||||||
|
* @param {array} array of connections
|
||||||
|
* @returns {ConnectionPool}
|
||||||
|
*/
|
||||||
|
update (connections) {
|
||||||
|
super.update(connections)
|
||||||
|
|
||||||
|
for (var i = 0; i < this.dead.length; i++) {
|
||||||
|
if (this.connections.find(c => c.id === this.dead[i]) === undefined) {
|
||||||
|
this.dead.splice(i, 1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return this
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
ConnectionPool.resurrectStrategies = {
|
||||||
|
none: 0,
|
||||||
|
ping: 1,
|
||||||
|
optimistic: 2
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = ConnectionPool
|
||||||
117
lib/ConnectionPool.d.ts → lib/pool/index.d.ts
vendored
117
lib/ConnectionPool.d.ts → lib/pool/index.d.ts
vendored
@ -5,24 +5,34 @@
|
|||||||
/// <reference types="node" />
|
/// <reference types="node" />
|
||||||
|
|
||||||
import { SecureContextOptions } from 'tls';
|
import { SecureContextOptions } from 'tls';
|
||||||
import Connection, { AgentOptions } from './Connection';
|
import Connection, { AgentOptions } from '../Connection';
|
||||||
import { nodeFilterFn, nodeSelectorFn } from './Transport';
|
import { nodeFilterFn, nodeSelectorFn } from '../Transport';
|
||||||
|
|
||||||
interface ConnectionPoolOptions {
|
interface BaseConnectionPoolOptions {
|
||||||
ssl?: SecureContextOptions;
|
ssl?: SecureContextOptions;
|
||||||
agent?: AgentOptions;
|
agent?: AgentOptions;
|
||||||
auth: BasicAuth | ApiKeyAuth;
|
auth?: BasicAuth | ApiKeyAuth;
|
||||||
|
emit: (event: string | symbol, ...args: any[]) => boolean;
|
||||||
pingTimeout?: number;
|
pingTimeout?: number;
|
||||||
Connection: typeof Connection;
|
Connection: typeof Connection;
|
||||||
resurrectStrategy?: string;
|
resurrectStrategy?: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface getConnectionOptions {
|
interface ConnectionPoolOptions extends BaseConnectionPoolOptions {
|
||||||
filter?: nodeFilterFn;
|
pingTimeout?: number;
|
||||||
selector?: nodeSelectorFn;
|
resurrectStrategy?: string;
|
||||||
|
sniffEnabled?: boolean;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ApiKeyAuth {
|
interface getConnectionOptions {
|
||||||
|
filter?: nodeFilterFn;
|
||||||
|
selector?: nodeSelectorFn;
|
||||||
|
requestId?: string | number;
|
||||||
|
name?: string;
|
||||||
|
now?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
interface ApiKeyAuth {
|
||||||
apiKey:
|
apiKey:
|
||||||
| string
|
| string
|
||||||
| {
|
| {
|
||||||
@ -31,18 +41,18 @@ export interface ApiKeyAuth {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface BasicAuth {
|
interface BasicAuth {
|
||||||
username: string;
|
username: string;
|
||||||
password: string;
|
password: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface resurrectOptions {
|
interface resurrectOptions {
|
||||||
now?: number;
|
now?: number;
|
||||||
requestId: string;
|
requestId: string;
|
||||||
name: string;
|
name: string;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface ResurrectEvent {
|
interface ResurrectEvent {
|
||||||
strategy: string;
|
strategy: string;
|
||||||
isAlive: boolean;
|
isAlive: boolean;
|
||||||
connection: Connection;
|
connection: Connection;
|
||||||
@ -52,24 +62,14 @@ export interface ResurrectEvent {
|
|||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
export default class ConnectionPool {
|
|
||||||
static resurrectStrategies: {
|
declare class BaseConnectionPool {
|
||||||
none: number;
|
connections: Connection[];
|
||||||
ping: number;
|
|
||||||
optimistic: number;
|
|
||||||
};
|
|
||||||
connections: any;
|
|
||||||
dead: string[];
|
|
||||||
_ssl: SecureContextOptions | null;
|
_ssl: SecureContextOptions | null;
|
||||||
_agent: AgentOptions | null;
|
_agent: AgentOptions | null;
|
||||||
_sniffEnabled: boolean;
|
|
||||||
resurrectTimeout: number;
|
|
||||||
resurrectTimeoutCutoff: number;
|
|
||||||
pingTimeout: number;
|
|
||||||
auth: BasicAuth | ApiKeyAuth;
|
auth: BasicAuth | ApiKeyAuth;
|
||||||
Connection: typeof Connection;
|
Connection: typeof Connection;
|
||||||
resurrectStrategy: number;
|
constructor(opts?: BaseConnectionPoolOptions);
|
||||||
constructor(opts?: ConnectionPoolOptions);
|
|
||||||
/**
|
/**
|
||||||
* Marks a connection as 'alive'.
|
* Marks a connection as 'alive'.
|
||||||
* If needed removes the connection from the dead list
|
* If needed removes the connection from the dead list
|
||||||
@ -77,7 +77,7 @@ export default class ConnectionPool {
|
|||||||
*
|
*
|
||||||
* @param {object} connection
|
* @param {object} connection
|
||||||
*/
|
*/
|
||||||
markAlive(connection: Connection): void;
|
markAlive(connection: Connection): this;
|
||||||
/**
|
/**
|
||||||
* Marks a connection as 'dead'.
|
* Marks a connection as 'dead'.
|
||||||
* If needed adds the connection to the dead list
|
* If needed adds the connection to the dead list
|
||||||
@ -85,15 +85,7 @@ export default class ConnectionPool {
|
|||||||
*
|
*
|
||||||
* @param {object} connection
|
* @param {object} connection
|
||||||
*/
|
*/
|
||||||
markDead(connection: Connection): void;
|
markDead(connection: Connection): this;
|
||||||
/**
|
|
||||||
* If enabled, tries to resurrect a connection with the given
|
|
||||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
|
||||||
*
|
|
||||||
* @param {object} { now, requestId, name }
|
|
||||||
* @param {function} callback (isAlive, connection)
|
|
||||||
*/
|
|
||||||
resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
|
|
||||||
/**
|
/**
|
||||||
* Returns an alive connection if present,
|
* Returns an alive connection if present,
|
||||||
* otherwise returns null.
|
* otherwise returns null.
|
||||||
@ -111,27 +103,27 @@ export default class ConnectionPool {
|
|||||||
* @param {object|string} host
|
* @param {object|string} host
|
||||||
* @returns {ConnectionPool}
|
* @returns {ConnectionPool}
|
||||||
*/
|
*/
|
||||||
addConnection(opts: any): Connection | void;
|
addConnection(opts: any): Connection;
|
||||||
/**
|
/**
|
||||||
* Removes a new connection to the pool.
|
* Removes a new connection to the pool.
|
||||||
*
|
*
|
||||||
* @param {object} connection
|
* @param {object} connection
|
||||||
* @returns {ConnectionPool}
|
* @returns {ConnectionPool}
|
||||||
*/
|
*/
|
||||||
removeConnection(connection: Connection): ConnectionPool;
|
removeConnection(connection: Connection): this;
|
||||||
/**
|
/**
|
||||||
* Empties the connection pool.
|
* Empties the connection pool.
|
||||||
*
|
*
|
||||||
* @returns {ConnectionPool}
|
* @returns {ConnectionPool}
|
||||||
*/
|
*/
|
||||||
empty(): ConnectionPool;
|
empty(): this;
|
||||||
/**
|
/**
|
||||||
* Update the ConnectionPool with new connections.
|
* Update the ConnectionPool with new connections.
|
||||||
*
|
*
|
||||||
* @param {array} array of connections
|
* @param {array} array of connections
|
||||||
* @returns {ConnectionPool}
|
* @returns {ConnectionPool}
|
||||||
*/
|
*/
|
||||||
update(connections: Connection[]): ConnectionPool;
|
update(connections: any[]): this;
|
||||||
/**
|
/**
|
||||||
* Transforms the nodes objects to a host object.
|
* Transforms the nodes objects to a host object.
|
||||||
*
|
*
|
||||||
@ -148,14 +140,57 @@ export default class ConnectionPool {
|
|||||||
urlToHost(url: string): any;
|
urlToHost(url: string): any;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
declare class ConnectionPool extends BaseConnectionPool {
|
||||||
|
static resurrectStrategies: {
|
||||||
|
none: number;
|
||||||
|
ping: number;
|
||||||
|
optimistic: number;
|
||||||
|
};
|
||||||
|
dead: string[];
|
||||||
|
_sniffEnabled: boolean;
|
||||||
|
resurrectTimeout: number;
|
||||||
|
resurrectTimeoutCutoff: number;
|
||||||
|
pingTimeout: number;
|
||||||
|
resurrectStrategy: number;
|
||||||
|
constructor(opts?: ConnectionPoolOptions);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If enabled, tries to resurrect a connection with the given
|
||||||
|
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||||
|
*
|
||||||
|
* @param {object} { now, requestId, name }
|
||||||
|
* @param {function} callback (isAlive, connection)
|
||||||
|
*/
|
||||||
|
resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
|
||||||
|
}
|
||||||
|
|
||||||
|
declare class CloudConnectionPool extends BaseConnectionPool {
|
||||||
|
cloudConnection: Connection | null
|
||||||
|
constructor(opts?: BaseConnectionPoolOptions);
|
||||||
|
getConnection(): Connection;
|
||||||
|
}
|
||||||
|
|
||||||
declare function defaultNodeFilter(node: Connection): boolean;
|
declare function defaultNodeFilter(node: Connection): boolean;
|
||||||
declare function roundRobinSelector(): (connections: Connection[]) => Connection;
|
declare function roundRobinSelector(): (connections: Connection[]) => Connection;
|
||||||
declare function randomSelector(connections: Connection[]): Connection;
|
declare function randomSelector(connections: Connection[]): Connection;
|
||||||
|
|
||||||
export declare const internals: {
|
declare const internals: {
|
||||||
defaultNodeFilter: typeof defaultNodeFilter;
|
defaultNodeFilter: typeof defaultNodeFilter;
|
||||||
roundRobinSelector: typeof roundRobinSelector;
|
roundRobinSelector: typeof roundRobinSelector;
|
||||||
randomSelector: typeof randomSelector;
|
randomSelector: typeof randomSelector;
|
||||||
};
|
};
|
||||||
|
|
||||||
export {};
|
export {
|
||||||
|
// Interfaces
|
||||||
|
ConnectionPoolOptions,
|
||||||
|
getConnectionOptions,
|
||||||
|
ApiKeyAuth,
|
||||||
|
BasicAuth,
|
||||||
|
internals,
|
||||||
|
resurrectOptions,
|
||||||
|
ResurrectEvent,
|
||||||
|
// Classes
|
||||||
|
BaseConnectionPool,
|
||||||
|
ConnectionPool,
|
||||||
|
CloudConnectionPool
|
||||||
|
};
|
||||||
15
lib/pool/index.js
Normal file
15
lib/pool/index.js
Normal file
@ -0,0 +1,15 @@
|
|||||||
|
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||||
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const BaseConnectionPool = require('./BaseConnectionPool')
|
||||||
|
const ConnectionPool = require('./ConnectionPool')
|
||||||
|
const CloudConnectionPool = require('./CloudConnectionPool')
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
BaseConnectionPool,
|
||||||
|
ConnectionPool,
|
||||||
|
CloudConnectionPool
|
||||||
|
}
|
||||||
@ -22,7 +22,7 @@
|
|||||||
"test:integration": "tap test/integration/index.js -T --no-coverage",
|
"test:integration": "tap test/integration/index.js -T --no-coverage",
|
||||||
"test:integration:report": "npm run test:integration | tap-mocha-reporter xunit > $WORKSPACE/test-report-junit.xml",
|
"test:integration:report": "npm run test:integration | tap-mocha-reporter xunit > $WORKSPACE/test-report-junit.xml",
|
||||||
"test:types": "tsc --project ./test/types/tsconfig.json",
|
"test:types": "tsc --project ./test/types/tsconfig.json",
|
||||||
"test:coverage": "nyc npm run test:unit && nyc report --reporter=text-lcov > coverage.lcov && codecov",
|
"test:coverage": "nyc tap test/unit/*.test.js test/behavior/*.test.js -t 300 && nyc report --reporter=text-lcov > coverage.lcov && codecov",
|
||||||
"lint": "standard",
|
"lint": "standard",
|
||||||
"lint:fix": "standard --fix",
|
"lint:fix": "standard --fix",
|
||||||
"ci": "npm run license-checker && npm test && npm run test:integration && npm run test:coverage",
|
"ci": "npm run license-checker && npm test && npm run test:integration && npm run test:coverage",
|
||||||
|
|||||||
@ -26,7 +26,7 @@ test('Should update the connection pool', t => {
|
|||||||
const client = new Client({
|
const client = new Client({
|
||||||
node: nodes[Object.keys(nodes)[0]].url
|
node: nodes[Object.keys(nodes)[0]].url
|
||||||
})
|
})
|
||||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
t.strictEqual(client.connectionPool.size, 1)
|
||||||
|
|
||||||
client.on(events.SNIFF, (err, request) => {
|
client.on(events.SNIFF, (err, request) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
@ -72,7 +72,7 @@ test('Should update the connection pool', t => {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.connections.size, 4)
|
t.strictEqual(client.connectionPool.size, 4)
|
||||||
})
|
})
|
||||||
t.teardown(shutdown)
|
t.teardown(shutdown)
|
||||||
})
|
})
|
||||||
@ -85,7 +85,7 @@ test('Should handle hostnames in publish_address', t => {
|
|||||||
const client = new Client({
|
const client = new Client({
|
||||||
node: nodes[Object.keys(nodes)[0]].url
|
node: nodes[Object.keys(nodes)[0]].url
|
||||||
})
|
})
|
||||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
t.strictEqual(client.connectionPool.size, 1)
|
||||||
|
|
||||||
client.on(events.SNIFF, (err, request) => {
|
client.on(events.SNIFF, (err, request) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
@ -105,7 +105,7 @@ test('Should handle hostnames in publish_address', t => {
|
|||||||
t.strictEqual(hosts[i].url.hostname, 'localhost')
|
t.strictEqual(hosts[i].url.hostname, 'localhost')
|
||||||
}
|
}
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.connections.size, 4)
|
t.strictEqual(client.connectionPool.size, 4)
|
||||||
})
|
})
|
||||||
t.teardown(shutdown)
|
t.teardown(shutdown)
|
||||||
})
|
})
|
||||||
@ -125,13 +125,13 @@ test('Sniff interval', t => {
|
|||||||
t.error(err)
|
t.error(err)
|
||||||
const { hosts, reason } = request.meta.sniff
|
const { hosts, reason } = request.meta.sniff
|
||||||
t.strictEqual(
|
t.strictEqual(
|
||||||
client.connectionPool.connections.size,
|
client.connectionPool.size,
|
||||||
hosts.length
|
hosts.length
|
||||||
)
|
)
|
||||||
t.strictEqual(reason, Transport.sniffReasons.SNIFF_INTERVAL)
|
t.strictEqual(reason, Transport.sniffReasons.SNIFF_INTERVAL)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
t.strictEqual(client.connectionPool.size, 1)
|
||||||
setTimeout(() => client.info(t.error), 60)
|
setTimeout(() => client.info(t.error), 60)
|
||||||
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
@ -141,7 +141,7 @@ test('Sniff interval', t => {
|
|||||||
}, 150)
|
}, 150)
|
||||||
|
|
||||||
setTimeout(() => {
|
setTimeout(() => {
|
||||||
t.strictEqual(client.connectionPool.connections.size, 3)
|
t.strictEqual(client.connectionPool.size, 3)
|
||||||
}, 200)
|
}, 200)
|
||||||
|
|
||||||
t.teardown(shutdown)
|
t.teardown(shutdown)
|
||||||
@ -161,13 +161,13 @@ test('Sniff on start', t => {
|
|||||||
t.error(err)
|
t.error(err)
|
||||||
const { hosts, reason } = request.meta.sniff
|
const { hosts, reason } = request.meta.sniff
|
||||||
t.strictEqual(
|
t.strictEqual(
|
||||||
client.connectionPool.connections.size,
|
client.connectionPool.size,
|
||||||
hosts.length
|
hosts.length
|
||||||
)
|
)
|
||||||
t.strictEqual(reason, Transport.sniffReasons.SNIFF_ON_START)
|
t.strictEqual(reason, Transport.sniffReasons.SNIFF_ON_START)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
t.strictEqual(client.connectionPool.size, 1)
|
||||||
t.teardown(shutdown)
|
t.teardown(shutdown)
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
@ -190,11 +190,11 @@ test('Should not close living connections', t => {
|
|||||||
Connection: MyConnection
|
Connection: MyConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
t.strictEqual(client.connectionPool.size, 1)
|
||||||
client.transport.sniff((err, hosts) => {
|
client.transport.sniff((err, hosts) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
t.strictEqual(
|
t.strictEqual(
|
||||||
client.connectionPool.connections.size,
|
client.connectionPool.size,
|
||||||
hosts.length
|
hosts.length
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
@ -228,13 +228,13 @@ test('Sniff on connection fault', t => {
|
|||||||
Connection: MyConnection
|
Connection: MyConnection
|
||||||
})
|
})
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.connections.size, 2)
|
t.strictEqual(client.connectionPool.size, 2)
|
||||||
// this event will be triggered by the connection fault
|
// this event will be triggered by the connection fault
|
||||||
client.on(events.SNIFF, (err, request) => {
|
client.on(events.SNIFF, (err, request) => {
|
||||||
t.error(err)
|
t.error(err)
|
||||||
const { hosts, reason } = request.meta.sniff
|
const { hosts, reason } = request.meta.sniff
|
||||||
t.strictEqual(
|
t.strictEqual(
|
||||||
client.connectionPool.connections.size,
|
client.connectionPool.size,
|
||||||
hosts.length
|
hosts.length
|
||||||
)
|
)
|
||||||
t.strictEqual(reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
|
t.strictEqual(reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
|
||||||
|
|||||||
490
test/unit/base-connection-pool.test.js
Normal file
490
test/unit/base-connection-pool.test.js
Normal file
@ -0,0 +1,490 @@
|
|||||||
|
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||||
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { test } = require('tap')
|
||||||
|
const { URL } = require('url')
|
||||||
|
const BaseConnectionPool = require('../../lib/pool/BaseConnectionPool')
|
||||||
|
const Connection = require('../../lib/Connection')
|
||||||
|
|
||||||
|
test('API', t => {
|
||||||
|
t.test('addConnection', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const href = 'http://localhost:9200/'
|
||||||
|
pool.addConnection(href)
|
||||||
|
t.ok(pool.connections.find(c => c.id === href) instanceof Connection)
|
||||||
|
t.strictEqual(pool.connections.find(c => c.id === href).status, Connection.statuses.ALIVE)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('addConnection should throw with two connections with the same id', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const href = 'http://localhost:9200/'
|
||||||
|
pool.addConnection(href)
|
||||||
|
try {
|
||||||
|
pool.addConnection(href)
|
||||||
|
t.fail('Should throw')
|
||||||
|
} catch (err) {
|
||||||
|
t.is(err.message, `Connection with id '${href}' is already present`)
|
||||||
|
}
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('addConnection should handle not-friendly url parameters for user and password', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const href = 'http://us"er:p@assword@localhost:9200/'
|
||||||
|
pool.addConnection(href)
|
||||||
|
const conn = pool.connections[0]
|
||||||
|
t.strictEqual(conn.url.username, 'us%22er')
|
||||||
|
t.strictEqual(conn.url.password, 'p%40assword')
|
||||||
|
t.match(conn.headers, {
|
||||||
|
authorization: 'Basic ' + Buffer.from('us"er:p@assword').toString('base64')
|
||||||
|
})
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('markDead', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection, sniffEnabled: true })
|
||||||
|
const href = 'http://localhost:9200/'
|
||||||
|
var connection = pool.addConnection(href)
|
||||||
|
t.same(pool.markDead(connection), pool)
|
||||||
|
connection = pool.connections.find(c => c.id === href)
|
||||||
|
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('markAlive', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection, sniffEnabled: true })
|
||||||
|
const href = 'http://localhost:9200/'
|
||||||
|
var connection = pool.addConnection(href)
|
||||||
|
t.same(pool.markAlive(connection), pool)
|
||||||
|
connection = pool.connections.find(c => c.id === href)
|
||||||
|
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('getConnection should throw', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const href = 'http://localhost:9200/'
|
||||||
|
pool.addConnection(href)
|
||||||
|
try {
|
||||||
|
pool.getConnection()
|
||||||
|
t.fail('Should fail')
|
||||||
|
} catch (err) {
|
||||||
|
t.is(err.message, 'getConnection must be implemented')
|
||||||
|
}
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('removeConnection', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const href = 'http://localhost:9200/'
|
||||||
|
var connection = pool.addConnection(href)
|
||||||
|
pool.removeConnection(connection)
|
||||||
|
t.strictEqual(pool.size, 0)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('empty', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
pool.addConnection('http://localhost:9200/')
|
||||||
|
pool.addConnection('http://localhost:9201/')
|
||||||
|
pool.empty(() => {
|
||||||
|
t.strictEqual(pool.size, 0)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('urlToHost', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const url = 'http://localhost:9200'
|
||||||
|
t.deepEqual(
|
||||||
|
pool.urlToHost(url),
|
||||||
|
{ url: new URL(url) }
|
||||||
|
)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('nodesToHost', t => {
|
||||||
|
t.test('publish_address as ip address (IPv4)', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const nodes = {
|
||||||
|
a1: {
|
||||||
|
http: {
|
||||||
|
publish_address: '127.0.0.1:9200'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
},
|
||||||
|
a2: {
|
||||||
|
http: {
|
||||||
|
publish_address: '127.0.0.1:9201'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, '127.0.0.1:9200')
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, '127.0.0.1:9201')
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('publish_address as ip address (IPv6)', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const nodes = {
|
||||||
|
a1: {
|
||||||
|
http: {
|
||||||
|
publish_address: '[::1]:9200'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
},
|
||||||
|
a2: {
|
||||||
|
http: {
|
||||||
|
publish_address: '[::1]:9201'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{
|
||||||
|
url: new URL('http://[::1]:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
url: new URL('http://[::1]:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, '[::1]:9200')
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, '[::1]:9201')
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('publish_address as host/ip (IPv4)', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const nodes = {
|
||||||
|
a1: {
|
||||||
|
http: {
|
||||||
|
publish_address: 'example.com/127.0.0.1:9200'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
},
|
||||||
|
a2: {
|
||||||
|
http: {
|
||||||
|
publish_address: 'example.com/127.0.0.1:9201'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{
|
||||||
|
url: new URL('http://example.com:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
url: new URL('http://example.com:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, 'example.com:9200')
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, 'example.com:9201')
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('publish_address as host/ip (IPv6)', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const nodes = {
|
||||||
|
a1: {
|
||||||
|
http: {
|
||||||
|
publish_address: 'example.com/[::1]:9200'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
},
|
||||||
|
a2: {
|
||||||
|
http: {
|
||||||
|
publish_address: 'example.com/[::1]:9201'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.deepEqual(pool.nodesToHost(nodes, 'http:'), [{
|
||||||
|
url: new URL('http://example.com:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
url: new URL('http://example.com:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
}
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[0].url.host, 'example.com:9200')
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.host, 'example.com:9201')
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('Should use the configure protocol', t => {
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const nodes = {
|
||||||
|
a1: {
|
||||||
|
http: {
|
||||||
|
publish_address: 'example.com/127.0.0.1:9200'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
},
|
||||||
|
a2: {
|
||||||
|
http: {
|
||||||
|
publish_address: 'example.com/127.0.0.1:9201'
|
||||||
|
},
|
||||||
|
roles: ['master', 'data', 'ingest']
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'https:')[0].url.protocol, 'https:')
|
||||||
|
t.strictEqual(pool.nodesToHost(nodes, 'http:')[1].url.protocol, 'http:')
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('update', t => {
|
||||||
|
t.test('Should not update existing connections', t => {
|
||||||
|
t.plan(2)
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
pool.addConnection([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true
|
||||||
|
}
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true
|
||||||
|
}
|
||||||
|
}])
|
||||||
|
|
||||||
|
pool.update([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: null
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: null
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.ok(pool.connections.find(c => c.id === 'a1').roles !== null)
|
||||||
|
t.ok(pool.connections.find(c => c.id === 'a2').roles !== null)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('Should not update existing connections (mark alive)', t => {
|
||||||
|
t.plan(5)
|
||||||
|
class CustomBaseConnectionPool extends BaseConnectionPool {
|
||||||
|
markAlive (connection) {
|
||||||
|
t.ok('called')
|
||||||
|
super.markAlive(connection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const pool = new CustomBaseConnectionPool({ Connection })
|
||||||
|
const conn1 = pool.addConnection({
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
const conn2 = pool.addConnection({
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.markDead(conn1)
|
||||||
|
pool.markDead(conn2)
|
||||||
|
|
||||||
|
pool.update([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: null
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: null
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.ok(pool.connections.find(c => c.id === 'a1').roles !== null)
|
||||||
|
t.ok(pool.connections.find(c => c.id === 'a2').roles !== null)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('Should not update existing connections (same url, different id)', t => {
|
||||||
|
t.plan(3)
|
||||||
|
class CustomBaseConnectionPool extends BaseConnectionPool {
|
||||||
|
markAlive (connection) {
|
||||||
|
t.ok('called')
|
||||||
|
super.markAlive(connection)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
const pool = new CustomBaseConnectionPool({ Connection })
|
||||||
|
pool.addConnection([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'http://127.0.0.1:9200/',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true
|
||||||
|
}
|
||||||
|
}])
|
||||||
|
|
||||||
|
pool.update([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: true
|
||||||
|
}])
|
||||||
|
|
||||||
|
// roles will never be updated, we only use it to do
|
||||||
|
// a dummy check to see if the connection has been updated
|
||||||
|
t.deepEqual(pool.connections.find(c => c.id === 'a1').roles, {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true,
|
||||||
|
ml: false
|
||||||
|
})
|
||||||
|
t.strictEqual(pool.connections.find(c => c.id === 'http://127.0.0.1:9200/'), undefined)
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('Add a new connection', t => {
|
||||||
|
t.plan(2)
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
pool.addConnection({
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: {
|
||||||
|
master: true,
|
||||||
|
data: true,
|
||||||
|
ingest: true
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.update([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: null
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: null
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.ok(pool.connections.find(c => c.id === 'a1').roles !== null)
|
||||||
|
t.ok(pool.connections.find(c => c.id === 'a2'))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('Remove old connections', t => {
|
||||||
|
t.plan(3)
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
pool.addConnection({
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: null
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.update([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: null
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a3',
|
||||||
|
roles: null
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.false(pool.connections.find(c => c.id === 'a1'))
|
||||||
|
t.true(pool.connections.find(c => c.id === 'a2'))
|
||||||
|
t.true(pool.connections.find(c => c.id === 'a3'))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('CreateConnection', t => {
|
||||||
|
t.plan(1)
|
||||||
|
const pool = new BaseConnectionPool({ Connection })
|
||||||
|
const conn = pool.createConnection('http://localhost:9200')
|
||||||
|
pool.connections.push(conn)
|
||||||
|
try {
|
||||||
|
pool.createConnection('http://localhost:9200')
|
||||||
|
t.fail('Should throw')
|
||||||
|
} catch (err) {
|
||||||
|
t.is(err.message, 'Connection with id \'http://localhost:9200/\' is already present')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
@ -7,6 +7,7 @@
|
|||||||
const { test } = require('tap')
|
const { test } = require('tap')
|
||||||
const { URL } = require('url')
|
const { URL } = require('url')
|
||||||
const { Client, ConnectionPool, Transport } = require('../../index')
|
const { Client, ConnectionPool, Transport } = require('../../index')
|
||||||
|
const { CloudConnectionPool } = require('../../lib/pool')
|
||||||
const { buildServer } = require('../utils')
|
const { buildServer } = require('../utils')
|
||||||
|
|
||||||
test('Configure host', t => {
|
test('Configure host', t => {
|
||||||
@ -15,7 +16,7 @@ test('Configure host', t => {
|
|||||||
node: 'http://localhost:9200'
|
node: 'http://localhost:9200'
|
||||||
})
|
})
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('http://localhost:9200/'), {
|
t.match(pool.connections.find(c => c.id === 'http://localhost:9200/'), {
|
||||||
url: new URL('http://localhost:9200'),
|
url: new URL('http://localhost:9200'),
|
||||||
id: 'http://localhost:9200/',
|
id: 'http://localhost:9200/',
|
||||||
ssl: null,
|
ssl: null,
|
||||||
@ -36,7 +37,7 @@ test('Configure host', t => {
|
|||||||
nodes: ['http://localhost:9200', 'http://localhost:9201']
|
nodes: ['http://localhost:9200', 'http://localhost:9201']
|
||||||
})
|
})
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('http://localhost:9200/'), {
|
t.match(pool.connections.find(c => c.id === 'http://localhost:9200/'), {
|
||||||
url: new URL('http://localhost:9200'),
|
url: new URL('http://localhost:9200'),
|
||||||
id: 'http://localhost:9200/',
|
id: 'http://localhost:9200/',
|
||||||
ssl: null,
|
ssl: null,
|
||||||
@ -49,7 +50,7 @@ test('Configure host', t => {
|
|||||||
ml: false
|
ml: false
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
t.match(pool.connections.get('http://localhost:9201/'), {
|
t.match(pool.connections.find(c => c.id === 'http://localhost:9201/'), {
|
||||||
url: new URL('http://localhost:9201'),
|
url: new URL('http://localhost:9201'),
|
||||||
id: 'http://localhost:9201/',
|
id: 'http://localhost:9201/',
|
||||||
ssl: null,
|
ssl: null,
|
||||||
@ -80,7 +81,7 @@ test('Configure host', t => {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('node'), {
|
t.match(pool.connections.find(c => c.id === 'node'), {
|
||||||
url: new URL('http://localhost:9200'),
|
url: new URL('http://localhost:9200'),
|
||||||
id: 'node',
|
id: 'node',
|
||||||
ssl: 'ssl',
|
ssl: 'ssl',
|
||||||
@ -88,7 +89,7 @@ test('Configure host', t => {
|
|||||||
resurrectTimeout: 0
|
resurrectTimeout: 0
|
||||||
})
|
})
|
||||||
|
|
||||||
t.deepEqual(pool.connections.get('node').roles, {
|
t.deepEqual(pool.connections.find(c => c.id === 'node').roles, {
|
||||||
master: true,
|
master: true,
|
||||||
data: false,
|
data: false,
|
||||||
ingest: false,
|
ingest: false,
|
||||||
@ -121,7 +122,7 @@ test('Configure host', t => {
|
|||||||
}]
|
}]
|
||||||
})
|
})
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('node1'), {
|
t.match(pool.connections.find(c => c.id === 'node1'), {
|
||||||
url: new URL('http://localhost:9200'),
|
url: new URL('http://localhost:9200'),
|
||||||
id: 'node1',
|
id: 'node1',
|
||||||
ssl: 'ssl',
|
ssl: 'ssl',
|
||||||
@ -129,14 +130,14 @@ test('Configure host', t => {
|
|||||||
resurrectTimeout: 0
|
resurrectTimeout: 0
|
||||||
})
|
})
|
||||||
|
|
||||||
t.deepEqual(pool.connections.get('node1').roles, {
|
t.deepEqual(pool.connections.find(c => c.id === 'node1').roles, {
|
||||||
master: true,
|
master: true,
|
||||||
data: false,
|
data: false,
|
||||||
ingest: false,
|
ingest: false,
|
||||||
ml: false
|
ml: false
|
||||||
})
|
})
|
||||||
|
|
||||||
t.match(pool.connections.get('node2'), {
|
t.match(pool.connections.find(c => c.id === 'node2'), {
|
||||||
url: new URL('http://localhost:9200'),
|
url: new URL('http://localhost:9200'),
|
||||||
id: 'node2',
|
id: 'node2',
|
||||||
ssl: 'ssl',
|
ssl: 'ssl',
|
||||||
@ -144,7 +145,7 @@ test('Configure host', t => {
|
|||||||
resurrectTimeout: 0
|
resurrectTimeout: 0
|
||||||
})
|
})
|
||||||
|
|
||||||
t.deepEqual(pool.connections.get('node2').roles, {
|
t.deepEqual(pool.connections.find(c => c.id === 'node2').roles, {
|
||||||
master: false,
|
master: false,
|
||||||
data: true,
|
data: true,
|
||||||
ingest: false,
|
ingest: false,
|
||||||
@ -163,7 +164,7 @@ test('Configure host', t => {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('node'), {
|
t.match(pool.connections.find(c => c.id === 'node'), {
|
||||||
url: new URL('http://localhost:9200'),
|
url: new URL('http://localhost:9200'),
|
||||||
headers: { 'x-foo': 'bar' }
|
headers: { 'x-foo': 'bar' }
|
||||||
})
|
})
|
||||||
@ -755,7 +756,7 @@ test('Extend client APIs', t => {
|
|||||||
|
|
||||||
test('Elastic cloud config', t => {
|
test('Elastic cloud config', t => {
|
||||||
t.test('Basic', t => {
|
t.test('Basic', t => {
|
||||||
t.plan(4)
|
t.plan(5)
|
||||||
const client = new Client({
|
const client = new Client({
|
||||||
cloud: {
|
cloud: {
|
||||||
// 'localhost$abcd$efgh'
|
// 'localhost$abcd$efgh'
|
||||||
@ -766,7 +767,8 @@ test('Elastic cloud config', t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('https://abcd.localhost/'), {
|
t.ok(pool instanceof CloudConnectionPool)
|
||||||
|
t.match(pool.connections.find(c => c.id === 'https://abcd.localhost/'), {
|
||||||
url: new URL('https://elastic:changeme@abcd.localhost'),
|
url: new URL('https://elastic:changeme@abcd.localhost'),
|
||||||
id: 'https://abcd.localhost/',
|
id: 'https://abcd.localhost/',
|
||||||
headers: {
|
headers: {
|
||||||
@ -789,7 +791,7 @@ test('Elastic cloud config', t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.test('Auth as separate option', t => {
|
t.test('Auth as separate option', t => {
|
||||||
t.plan(4)
|
t.plan(5)
|
||||||
const client = new Client({
|
const client = new Client({
|
||||||
cloud: {
|
cloud: {
|
||||||
// 'localhost$abcd$efgh'
|
// 'localhost$abcd$efgh'
|
||||||
@ -802,7 +804,8 @@ test('Elastic cloud config', t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
const pool = client.connectionPool
|
const pool = client.connectionPool
|
||||||
t.match(pool.connections.get('https://abcd.localhost/'), {
|
t.ok(pool instanceof CloudConnectionPool)
|
||||||
|
t.match(pool.connections.find(c => c.id === 'https://abcd.localhost/'), {
|
||||||
url: new URL('https://elastic:changeme@abcd.localhost'),
|
url: new URL('https://elastic:changeme@abcd.localhost'),
|
||||||
id: 'https://abcd.localhost/',
|
id: 'https://abcd.localhost/',
|
||||||
headers: {
|
headers: {
|
||||||
@ -825,7 +828,7 @@ test('Elastic cloud config', t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.test('Override default options', t => {
|
t.test('Override default options', t => {
|
||||||
t.plan(3)
|
t.plan(4)
|
||||||
const client = new Client({
|
const client = new Client({
|
||||||
cloud: {
|
cloud: {
|
||||||
// 'localhost$abcd$efgh'
|
// 'localhost$abcd$efgh'
|
||||||
@ -840,6 +843,7 @@ test('Elastic cloud config', t => {
|
|||||||
}
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
|
t.ok(client.connectionPool instanceof CloudConnectionPool)
|
||||||
t.strictEqual(client.transport.compression, false)
|
t.strictEqual(client.transport.compression, false)
|
||||||
t.strictEqual(client.transport.suggestCompression, false)
|
t.strictEqual(client.transport.suggestCompression, false)
|
||||||
t.deepEqual(client.connectionPool._ssl, { secureProtocol: 'TLSv1_1_method' })
|
t.deepEqual(client.connectionPool._ssl, { secureProtocol: 'TLSv1_1_method' })
|
||||||
|
|||||||
33
test/unit/cloud-connection-pool.test.js
Normal file
33
test/unit/cloud-connection-pool.test.js
Normal file
@ -0,0 +1,33 @@
|
|||||||
|
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||||
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||||
|
// See the LICENSE file in the project root for more information
|
||||||
|
|
||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { test } = require('tap')
|
||||||
|
const { CloudConnectionPool } = require('../../lib/pool')
|
||||||
|
const Connection = require('../../lib/Connection')
|
||||||
|
|
||||||
|
test('Should expose a cloudConnection property', t => {
|
||||||
|
const pool = new CloudConnectionPool({ Connection })
|
||||||
|
pool.addConnection('http://localhost:9200/')
|
||||||
|
t.ok(pool.cloudConnection instanceof Connection)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
test('Get connection should always return cloudConnection', t => {
|
||||||
|
const pool = new CloudConnectionPool({ Connection })
|
||||||
|
const conn = pool.addConnection('http://localhost:9200/')
|
||||||
|
t.deepEqual(pool.getConnection(), conn)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
|
||||||
|
test('pool.empty should reset cloudConnection', t => {
|
||||||
|
const pool = new CloudConnectionPool({ Connection })
|
||||||
|
pool.addConnection('http://localhost:9200/')
|
||||||
|
t.ok(pool.cloudConnection instanceof Connection)
|
||||||
|
pool.empty(() => {
|
||||||
|
t.strictEqual(pool.cloudConnection, null)
|
||||||
|
t.end()
|
||||||
|
})
|
||||||
|
})
|
||||||
@ -6,7 +6,7 @@
|
|||||||
|
|
||||||
const { test } = require('tap')
|
const { test } = require('tap')
|
||||||
const { URL } = require('url')
|
const { URL } = require('url')
|
||||||
const ConnectionPool = require('../../lib/ConnectionPool')
|
const ConnectionPool = require('../../lib/pool/ConnectionPool')
|
||||||
const Connection = require('../../lib/Connection')
|
const Connection = require('../../lib/Connection')
|
||||||
const { defaultNodeFilter, roundRobinSelector } = require('../../lib/Transport').internals
|
const { defaultNodeFilter, roundRobinSelector } = require('../../lib/Transport').internals
|
||||||
const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils')
|
const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils')
|
||||||
@ -16,8 +16,8 @@ test('API', t => {
|
|||||||
const pool = new ConnectionPool({ Connection })
|
const pool = new ConnectionPool({ Connection })
|
||||||
const href = 'http://localhost:9200/'
|
const href = 'http://localhost:9200/'
|
||||||
pool.addConnection(href)
|
pool.addConnection(href)
|
||||||
t.ok(pool.connections.get(href) instanceof Connection)
|
t.ok(pool.connections.find(c => c.id === href) instanceof Connection)
|
||||||
t.strictEqual(pool.connections.get(href).status, Connection.statuses.ALIVE)
|
t.strictEqual(pool.connections.find(c => c.id === href).status, Connection.statuses.ALIVE)
|
||||||
t.deepEqual(pool.dead, [])
|
t.deepEqual(pool.dead, [])
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
@ -53,7 +53,7 @@ test('API', t => {
|
|||||||
const href = 'http://localhost:9200/'
|
const href = 'http://localhost:9200/'
|
||||||
var connection = pool.addConnection(href)
|
var connection = pool.addConnection(href)
|
||||||
pool.markDead(connection)
|
pool.markDead(connection)
|
||||||
connection = pool.connections.get(href)
|
connection = pool.connections.find(c => c.id === href)
|
||||||
t.strictEqual(connection.deadCount, 1)
|
t.strictEqual(connection.deadCount, 1)
|
||||||
t.true(connection.resurrectTimeout > 0)
|
t.true(connection.resurrectTimeout > 0)
|
||||||
t.deepEqual(pool.dead, [href])
|
t.deepEqual(pool.dead, [href])
|
||||||
@ -80,7 +80,7 @@ test('API', t => {
|
|||||||
var connection = pool.addConnection(href)
|
var connection = pool.addConnection(href)
|
||||||
pool.markDead(connection)
|
pool.markDead(connection)
|
||||||
pool.markAlive(connection)
|
pool.markAlive(connection)
|
||||||
connection = pool.connections.get(href)
|
connection = pool.connections.find(c => c.id === href)
|
||||||
t.strictEqual(connection.deadCount, 0)
|
t.strictEqual(connection.deadCount, 0)
|
||||||
t.strictEqual(connection.resurrectTimeout, 0)
|
t.strictEqual(connection.resurrectTimeout, 0)
|
||||||
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
||||||
@ -107,7 +107,7 @@ test('API', t => {
|
|||||||
}
|
}
|
||||||
pool.resurrect(opts, (isAlive, connection) => {
|
pool.resurrect(opts, (isAlive, connection) => {
|
||||||
t.true(isAlive)
|
t.true(isAlive)
|
||||||
connection = pool.connections.get(connection.id)
|
connection = pool.connections.find(c => c.id === connection.id)
|
||||||
t.strictEqual(connection.deadCount, 0)
|
t.strictEqual(connection.deadCount, 0)
|
||||||
t.strictEqual(connection.resurrectTimeout, 0)
|
t.strictEqual(connection.resurrectTimeout, 0)
|
||||||
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
||||||
@ -133,7 +133,7 @@ test('API', t => {
|
|||||||
}
|
}
|
||||||
pool.resurrect(opts, (isAlive, connection) => {
|
pool.resurrect(opts, (isAlive, connection) => {
|
||||||
t.false(isAlive)
|
t.false(isAlive)
|
||||||
connection = pool.connections.get(connection.id)
|
connection = pool.connections.find(c => c.id === connection.id)
|
||||||
t.strictEqual(connection.deadCount, 2)
|
t.strictEqual(connection.deadCount, 2)
|
||||||
t.true(connection.resurrectTimeout > 0)
|
t.true(connection.resurrectTimeout > 0)
|
||||||
t.strictEqual(connection.status, Connection.statuses.DEAD)
|
t.strictEqual(connection.status, Connection.statuses.DEAD)
|
||||||
@ -161,7 +161,7 @@ test('API', t => {
|
|||||||
}
|
}
|
||||||
pool.resurrect(opts, (isAlive, connection) => {
|
pool.resurrect(opts, (isAlive, connection) => {
|
||||||
t.true(isAlive)
|
t.true(isAlive)
|
||||||
connection = pool.connections.get(connection.id)
|
connection = pool.connections.find(c => c.id === connection.id)
|
||||||
t.strictEqual(connection.deadCount, 1)
|
t.strictEqual(connection.deadCount, 1)
|
||||||
t.true(connection.resurrectTimeout > 0)
|
t.true(connection.resurrectTimeout > 0)
|
||||||
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
t.strictEqual(connection.status, Connection.statuses.ALIVE)
|
||||||
@ -187,7 +187,7 @@ test('API', t => {
|
|||||||
pool.resurrect(opts, (isAlive, connection) => {
|
pool.resurrect(opts, (isAlive, connection) => {
|
||||||
t.ok(isAlive === null)
|
t.ok(isAlive === null)
|
||||||
t.ok(connection === null)
|
t.ok(connection === null)
|
||||||
connection = pool.connections.get(href)
|
connection = pool.connections.find(c => c.id === href)
|
||||||
t.strictEqual(connection.deadCount, 1)
|
t.strictEqual(connection.deadCount, 1)
|
||||||
t.true(connection.resurrectTimeout > 0)
|
t.true(connection.resurrectTimeout > 0)
|
||||||
t.strictEqual(connection.status, Connection.statuses.DEAD)
|
t.strictEqual(connection.status, Connection.statuses.DEAD)
|
||||||
@ -267,7 +267,7 @@ test('API', t => {
|
|||||||
pool.addConnection('http://localhost:9200/')
|
pool.addConnection('http://localhost:9200/')
|
||||||
pool.addConnection('http://localhost:9201/')
|
pool.addConnection('http://localhost:9201/')
|
||||||
pool.empty(() => {
|
pool.empty(() => {
|
||||||
t.strictEqual(pool.connections.size, 0)
|
t.strictEqual(pool.size, 0)
|
||||||
t.deepEqual(pool.dead, [])
|
t.deepEqual(pool.dead, [])
|
||||||
t.end()
|
t.end()
|
||||||
})
|
})
|
||||||
@ -480,12 +480,7 @@ test('API', t => {
|
|||||||
t.test('update', t => {
|
t.test('update', t => {
|
||||||
t.test('Should not update existing connections', t => {
|
t.test('Should not update existing connections', t => {
|
||||||
t.plan(2)
|
t.plan(2)
|
||||||
class CustomConnectionPool extends ConnectionPool {
|
const pool = new ConnectionPool({ Connection })
|
||||||
markAlive () {
|
|
||||||
t.fail('Should not be called')
|
|
||||||
}
|
|
||||||
}
|
|
||||||
const pool = new CustomConnectionPool({ Connection })
|
|
||||||
pool.addConnection([{
|
pool.addConnection([{
|
||||||
url: new URL('http://127.0.0.1:9200'),
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
id: 'a1',
|
id: 'a1',
|
||||||
@ -514,12 +509,12 @@ test('API', t => {
|
|||||||
roles: null
|
roles: null
|
||||||
}])
|
}])
|
||||||
|
|
||||||
t.ok(pool.connections.get('a1').roles !== null)
|
t.ok(pool.connections.find(c => c.id === 'a1').roles !== null)
|
||||||
t.ok(pool.connections.get('a2').roles !== null)
|
t.ok(pool.connections.find(c => c.id === 'a2').roles !== null)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('Should not update existing connections (mark alive)', t => {
|
t.test('Should not update existing connections (mark alive)', t => {
|
||||||
t.plan(4)
|
t.plan(5)
|
||||||
class CustomConnectionPool extends ConnectionPool {
|
class CustomConnectionPool extends ConnectionPool {
|
||||||
markAlive (connection) {
|
markAlive (connection) {
|
||||||
t.ok('called')
|
t.ok('called')
|
||||||
@ -560,15 +555,16 @@ test('API', t => {
|
|||||||
roles: null
|
roles: null
|
||||||
}])
|
}])
|
||||||
|
|
||||||
t.ok(pool.connections.get('a1').roles !== null)
|
t.ok(pool.connections.find(c => c.id === 'a1').roles !== null)
|
||||||
t.ok(pool.connections.get('a2').roles !== null)
|
t.ok(pool.connections.find(c => c.id === 'a2').roles !== null)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('Should not update existing connections (same url, different id)', t => {
|
t.test('Should not update existing connections (same url, different id)', t => {
|
||||||
t.plan(2)
|
t.plan(3)
|
||||||
class CustomConnectionPool extends ConnectionPool {
|
class CustomConnectionPool extends ConnectionPool {
|
||||||
markAlive () {
|
markAlive (connection) {
|
||||||
t.fail('Should not be called')
|
t.ok('called')
|
||||||
|
super.markAlive(connection)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
const pool = new CustomConnectionPool({ Connection })
|
const pool = new CustomConnectionPool({ Connection })
|
||||||
@ -590,13 +586,13 @@ test('API', t => {
|
|||||||
|
|
||||||
// roles will never be updated, we only use it to do
|
// roles will never be updated, we only use it to do
|
||||||
// a dummy check to see if the connection has been updated
|
// a dummy check to see if the connection has been updated
|
||||||
t.deepEqual(pool.connections.get('a1').roles, {
|
t.deepEqual(pool.connections.find(c => c.id === 'a1').roles, {
|
||||||
master: true,
|
master: true,
|
||||||
data: true,
|
data: true,
|
||||||
ingest: true,
|
ingest: true,
|
||||||
ml: false
|
ml: false
|
||||||
})
|
})
|
||||||
t.strictEqual(pool.connections.get('http://127.0.0.1:9200/'), undefined)
|
t.strictEqual(pool.connections.find(c => c.id === 'http://127.0.0.1:9200/'), undefined)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('Add a new connection', t => {
|
t.test('Add a new connection', t => {
|
||||||
@ -622,8 +618,8 @@ test('API', t => {
|
|||||||
roles: null
|
roles: null
|
||||||
}])
|
}])
|
||||||
|
|
||||||
t.ok(pool.connections.get('a1').roles !== null)
|
t.ok(pool.connections.find(c => c.id === 'a1').roles !== null)
|
||||||
t.true(pool.connections.has('a2'))
|
t.ok(pool.connections.find(c => c.id === 'a2'))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('Remove old connections', t => {
|
t.test('Remove old connections', t => {
|
||||||
@ -645,9 +641,37 @@ test('API', t => {
|
|||||||
roles: null
|
roles: null
|
||||||
}])
|
}])
|
||||||
|
|
||||||
t.false(pool.connections.has('a1'))
|
t.false(pool.connections.find(c => c.id === 'a1'))
|
||||||
t.true(pool.connections.has('a2'))
|
t.true(pool.connections.find(c => c.id === 'a2'))
|
||||||
t.true(pool.connections.has('a3'))
|
t.true(pool.connections.find(c => c.id === 'a3'))
|
||||||
|
})
|
||||||
|
|
||||||
|
t.test('Remove old connections (markDead)', t => {
|
||||||
|
t.plan(5)
|
||||||
|
const pool = new ConnectionPool({ Connection, sniffEnabled: true })
|
||||||
|
const conn = pool.addConnection({
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a1',
|
||||||
|
roles: null
|
||||||
|
})
|
||||||
|
|
||||||
|
pool.markDead(conn)
|
||||||
|
t.deepEqual(pool.dead, ['a1'])
|
||||||
|
|
||||||
|
pool.update([{
|
||||||
|
url: new URL('http://127.0.0.1:9200'),
|
||||||
|
id: 'a2',
|
||||||
|
roles: null
|
||||||
|
}, {
|
||||||
|
url: new URL('http://127.0.0.1:9201'),
|
||||||
|
id: 'a3',
|
||||||
|
roles: null
|
||||||
|
}])
|
||||||
|
|
||||||
|
t.deepEqual(pool.dead, [])
|
||||||
|
t.false(pool.connections.find(c => c.id === 'a1'))
|
||||||
|
t.true(pool.connections.find(c => c.id === 'a2'))
|
||||||
|
t.true(pool.connections.find(c => c.id === 'a3'))
|
||||||
})
|
})
|
||||||
|
|
||||||
t.end()
|
t.end()
|
||||||
@ -702,22 +726,22 @@ test('Node filter', t => {
|
|||||||
|
|
||||||
test('Single node behavior', t => {
|
test('Single node behavior', t => {
|
||||||
t.test('sniffing disabled (markDead and markAlive should be noop)', t => {
|
t.test('sniffing disabled (markDead and markAlive should be noop)', t => {
|
||||||
t.plan(2)
|
t.plan(4)
|
||||||
const pool = new ConnectionPool({ Connection, sniffEnabled: false })
|
const pool = new ConnectionPool({ Connection, sniffEnabled: false })
|
||||||
const conn = pool.addConnection('http://localhost:9200/')
|
const conn = pool.addConnection('http://localhost:9200/')
|
||||||
pool.markDead(conn)
|
t.true(pool.markDead(conn) instanceof ConnectionPool)
|
||||||
t.strictEqual(pool.dead.length, 0)
|
t.strictEqual(pool.dead.length, 0)
|
||||||
pool.markAlive(conn)
|
t.true(pool.markAlive(conn) instanceof ConnectionPool)
|
||||||
t.strictEqual(pool.dead.length, 0)
|
t.strictEqual(pool.dead.length, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('sniffing enabled (markDead and markAlive should work)', t => {
|
t.test('sniffing enabled (markDead and markAlive should work)', t => {
|
||||||
t.plan(2)
|
t.plan(4)
|
||||||
const pool = new ConnectionPool({ Connection, sniffEnabled: true })
|
const pool = new ConnectionPool({ Connection, sniffEnabled: true })
|
||||||
const conn = pool.addConnection('http://localhost:9200/')
|
const conn = pool.addConnection('http://localhost:9200/')
|
||||||
pool.markDead(conn)
|
t.true(pool.markDead(conn) instanceof ConnectionPool)
|
||||||
t.strictEqual(pool.dead.length, 1)
|
t.strictEqual(pool.dead.length, 1)
|
||||||
pool.markAlive(conn)
|
t.true(pool.markAlive(conn) instanceof ConnectionPool)
|
||||||
t.strictEqual(pool.dead.length, 0)
|
t.strictEqual(pool.dead.length, 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@ -733,7 +733,7 @@ test('Util.inspect Connection class should hide agent, ssl and auth', t => {
|
|||||||
resurrectTimeout: 0,
|
resurrectTimeout: 0,
|
||||||
_openRequests: 0,
|
_openRequests: 0,
|
||||||
status: 'alive',
|
status: 'alive',
|
||||||
roles: { master: true, data: true, ingest: true, ml: false } }`)
|
roles: { master: true, data: true, ingest: true, ml: false }}`)
|
||||||
)
|
)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@ -6,6 +6,7 @@
|
|||||||
|
|
||||||
const { test } = require('tap')
|
const { test } = require('tap')
|
||||||
const { URL } = require('url')
|
const { URL } = require('url')
|
||||||
|
const lolex = require('lolex')
|
||||||
const { createGunzip } = require('zlib')
|
const { createGunzip } = require('zlib')
|
||||||
const os = require('os')
|
const os = require('os')
|
||||||
const intoStream = require('into-stream')
|
const intoStream = require('into-stream')
|
||||||
@ -23,7 +24,7 @@ const {
|
|||||||
ConfigurationError
|
ConfigurationError
|
||||||
} = require('../../lib/errors')
|
} = require('../../lib/errors')
|
||||||
|
|
||||||
const ConnectionPool = require('../../lib/ConnectionPool')
|
const ConnectionPool = require('../../lib/pool/ConnectionPool')
|
||||||
const Connection = require('../../lib/Connection')
|
const Connection = require('../../lib/Connection')
|
||||||
const Serializer = require('../../lib/Serializer')
|
const Serializer = require('../../lib/Serializer')
|
||||||
const Transport = require('../../lib/Transport')
|
const Transport = require('../../lib/Transport')
|
||||||
@ -878,148 +879,95 @@ test('Override requestTimeout', t => {
|
|||||||
|
|
||||||
test('sniff', t => {
|
test('sniff', t => {
|
||||||
t.test('sniffOnStart', t => {
|
t.test('sniffOnStart', t => {
|
||||||
t.plan(3)
|
t.plan(1)
|
||||||
|
|
||||||
class CustomConnectionPool extends ConnectionPool {
|
class MyTransport extends Transport {
|
||||||
update () {
|
sniff (opts) {
|
||||||
t.ok('called')
|
t.strictEqual(opts.reason, Transport.sniffReasons.SNIFF_ON_START)
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
nodesToHost (nodes) {
|
|
||||||
t.ok('called')
|
|
||||||
return []
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function handler (req, res) {
|
const pool = new ConnectionPool({ Connection })
|
||||||
t.strictEqual(req.url, '/sniff')
|
pool.addConnection('http://localhost:9200')
|
||||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
|
||||||
}
|
|
||||||
|
|
||||||
buildServer(handler, ({ port }, server) => {
|
// eslint-disable-next-line
|
||||||
const pool = new CustomConnectionPool({ Connection })
|
new MyTransport({
|
||||||
pool.addConnection(`http://localhost:${port}`)
|
emit: () => {},
|
||||||
|
connectionPool: pool,
|
||||||
// eslint-disable-next-line
|
serializer: new Serializer(),
|
||||||
new Transport({
|
maxRetries: 3,
|
||||||
emit: () => {},
|
requestTimeout: 30000,
|
||||||
connectionPool: pool,
|
sniffInterval: false,
|
||||||
serializer: new Serializer(),
|
sniffOnStart: true,
|
||||||
maxRetries: 3,
|
sniffEndpoint: '/sniff'
|
||||||
requestTimeout: 30000,
|
|
||||||
sniffInterval: false,
|
|
||||||
sniffOnStart: true,
|
|
||||||
sniffEndpoint: '/sniff'
|
|
||||||
})
|
|
||||||
|
|
||||||
setTimeout(() => server.stop(), 100)
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('sniffOnConnectionFault', t => {
|
t.test('sniffOnConnectionFault', t => {
|
||||||
t.plan(3)
|
t.plan(2)
|
||||||
|
|
||||||
class CustomConnectionPool extends ConnectionPool {
|
class MyTransport extends Transport {
|
||||||
update () {
|
sniff (opts) {
|
||||||
t.ok('called')
|
t.strictEqual(opts.reason, Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT)
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
nodesToHost (nodes) {
|
|
||||||
t.ok('called')
|
|
||||||
return []
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function handler (req, res) {
|
const pool = new ConnectionPool({ Connection: MockConnectionTimeout })
|
||||||
if (req.url === '/other/sniff') {
|
pool.addConnection('http://localhost:9200')
|
||||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
|
||||||
} else {
|
|
||||||
setTimeout(() => res.end(), 1000)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
buildServer(handler, ({ port }, server) => {
|
const transport = new MyTransport({
|
||||||
const pool = new CustomConnectionPool({ Connection })
|
emit: () => {},
|
||||||
pool.addConnection(`http://localhost:${port}`)
|
connectionPool: pool,
|
||||||
pool.addConnection(`http://localhost:${port}/other`)
|
serializer: new Serializer(),
|
||||||
|
maxRetries: 0,
|
||||||
|
requestTimeout: 500,
|
||||||
|
sniffInterval: false,
|
||||||
|
sniffOnConnectionFault: true,
|
||||||
|
sniffEndpoint: '/sniff'
|
||||||
|
})
|
||||||
|
|
||||||
const transport = new Transport({
|
transport.request({
|
||||||
emit: () => {},
|
method: 'GET',
|
||||||
connectionPool: pool,
|
path: '/'
|
||||||
serializer: new Serializer(),
|
}, (err, { body }) => {
|
||||||
maxRetries: 0,
|
t.ok(err instanceof TimeoutError)
|
||||||
requestTimeout: 500,
|
|
||||||
sniffInterval: false,
|
|
||||||
sniffOnConnectionFault: true,
|
|
||||||
sniffEndpoint: '/sniff'
|
|
||||||
})
|
|
||||||
|
|
||||||
transport.request({
|
|
||||||
method: 'GET',
|
|
||||||
path: '/'
|
|
||||||
}, (err, { body }) => {
|
|
||||||
t.ok(err instanceof TimeoutError)
|
|
||||||
})
|
|
||||||
|
|
||||||
setTimeout(() => server.stop(), 1100)
|
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('sniffInterval', t => {
|
t.test('sniffInterval', t => {
|
||||||
t.plan(9)
|
t.plan(6)
|
||||||
|
|
||||||
class CustomConnectionPool extends ConnectionPool {
|
const clock = lolex.install({ toFake: ['Date'] })
|
||||||
update () {
|
t.teardown(() => clock.uninstall())
|
||||||
return this
|
|
||||||
}
|
|
||||||
|
|
||||||
nodesToHost (nodes) {
|
class MyTransport extends Transport {
|
||||||
return []
|
sniff (opts) {
|
||||||
|
t.strictEqual(opts.reason, Transport.sniffReasons.SNIFF_INTERVAL)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function handler (req, res) {
|
const pool = new ConnectionPool({ Connection: MockConnection })
|
||||||
// this should be called 6 times
|
pool.addConnection('http://localhost:9200')
|
||||||
t.ok('called')
|
|
||||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
|
||||||
}
|
|
||||||
|
|
||||||
buildServer(handler, ({ port }, server) => {
|
const transport = new MyTransport({
|
||||||
const pool = new CustomConnectionPool({ Connection })
|
emit: () => {},
|
||||||
pool.addConnection(`http://localhost:${port}`)
|
connectionPool: pool,
|
||||||
|
serializer: new Serializer(),
|
||||||
const transport = new Transport({
|
maxRetries: 3,
|
||||||
emit: () => {},
|
requestTimeout: 3000,
|
||||||
connectionPool: pool,
|
sniffInterval: 1,
|
||||||
serializer: new Serializer(),
|
sniffEndpoint: '/sniff'
|
||||||
maxRetries: 3,
|
|
||||||
requestTimeout: 3000,
|
|
||||||
sniffInterval: 1,
|
|
||||||
sniffEndpoint: '/sniff'
|
|
||||||
})
|
|
||||||
|
|
||||||
const params = { method: 'GET', path: '/' }
|
|
||||||
setTimeout(() => {
|
|
||||||
transport.request(params, t.error)
|
|
||||||
}, 100)
|
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
transport.request(params, t.error)
|
|
||||||
}, 200)
|
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
transport.request(params, t.error)
|
|
||||||
}, 300)
|
|
||||||
|
|
||||||
setTimeout(() => {
|
|
||||||
server.stop()
|
|
||||||
}, 400)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
|
const params = { method: 'GET', path: '/' }
|
||||||
|
clock.tick(100)
|
||||||
|
transport.request(params, t.error)
|
||||||
|
|
||||||
|
clock.tick(200)
|
||||||
|
transport.request(params, t.error)
|
||||||
|
|
||||||
|
clock.tick(300)
|
||||||
|
transport.request(params, t.error)
|
||||||
})
|
})
|
||||||
|
|
||||||
t.test('errored', t => {
|
t.test('errored', t => {
|
||||||
|
|||||||
Reference in New Issue
Block a user