Support for Elasticsearch 7.3 (#928)
This commit is contained in:
committed by
GitHub
parent
823c209c32
commit
8c78f47ac3
23
lib/Connection.d.ts
vendored
23
lib/Connection.d.ts
vendored
@ -1,26 +1,12 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
/// <reference types="node" />
|
||||
|
||||
import { URL } from 'url';
|
||||
import { inspect, InspectOptions } from 'util';
|
||||
import { ApiKeyAuth, BasicAuth } from './pool'
|
||||
import * as http from 'http';
|
||||
import { ConnectionOptions as TlsConnectionOptions } from 'tls';
|
||||
|
||||
@ -34,6 +20,7 @@ interface ConnectionOptions {
|
||||
agent?: AgentOptions | agentFn;
|
||||
status?: string;
|
||||
roles?: any;
|
||||
auth?: BasicAuth | ApiKeyAuth;
|
||||
}
|
||||
|
||||
interface RequestOptions extends http.ClientRequestArgs {
|
||||
|
||||
@ -1,21 +1,6 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
@ -34,8 +19,7 @@ class Connection {
|
||||
this.url = opts.url
|
||||
this.ssl = opts.ssl || null
|
||||
this.id = opts.id || stripAuth(opts.url.href)
|
||||
this.headers = opts.headers || null
|
||||
this.auth = opts.auth || { username: null, password: null }
|
||||
this.headers = prepareHeaders(opts.headers, opts.auth)
|
||||
this.deadCount = 0
|
||||
this.resurrectTimeout = 0
|
||||
|
||||
@ -181,7 +165,6 @@ class Connection {
|
||||
|
||||
buildRequestObject (params) {
|
||||
const url = this.url
|
||||
const { username, password } = this.auth
|
||||
const request = {
|
||||
protocol: url.protocol,
|
||||
hostname: url.hostname[0] === '['
|
||||
@ -196,9 +179,6 @@ class Connection {
|
||||
// https://github.com/elastic/elasticsearch-js/issues/843
|
||||
port: url.port !== '' ? url.port : undefined,
|
||||
headers: this.headers,
|
||||
auth: username != null && password != null
|
||||
? `${username}:${password}`
|
||||
: undefined,
|
||||
agent: this.agent
|
||||
}
|
||||
|
||||
@ -230,10 +210,15 @@ class Connection {
|
||||
// the logs very hard to read. The user can still
|
||||
// access them with `instance.agent` and `instance.ssl`.
|
||||
[inspect.custom] (depth, options) {
|
||||
const {
|
||||
authorization,
|
||||
...headers
|
||||
} = this.headers
|
||||
|
||||
return {
|
||||
url: stripAuth(this.url.toString()),
|
||||
id: this.id,
|
||||
headers: this.headers,
|
||||
headers,
|
||||
deadCount: this.deadCount,
|
||||
resurrectTimeout: this.resurrectTimeout,
|
||||
_openRequests: this._openRequests,
|
||||
@ -243,10 +228,15 @@ class Connection {
|
||||
}
|
||||
|
||||
toJSON () {
|
||||
const {
|
||||
authorization,
|
||||
...headers
|
||||
} = this.headers
|
||||
|
||||
return {
|
||||
url: stripAuth(this.url.toString()),
|
||||
id: this.id,
|
||||
headers: this.headers,
|
||||
headers,
|
||||
deadCount: this.deadCount,
|
||||
resurrectTimeout: this.resurrectTimeout,
|
||||
_openRequests: this._openRequests,
|
||||
@ -302,4 +292,21 @@ function resolve (host, path) {
|
||||
}
|
||||
}
|
||||
|
||||
function prepareHeaders (headers = {}, auth) {
|
||||
if (auth != null && headers.authorization == null) {
|
||||
if (auth.username && auth.password) {
|
||||
headers.authorization = 'Basic ' + Buffer.from(`${auth.username}:${auth.password}`).toString('base64')
|
||||
}
|
||||
|
||||
if (auth.apiKey) {
|
||||
if (typeof auth.apiKey === 'object') {
|
||||
headers.authorization = 'ApiKey ' + Buffer.from(`${auth.apiKey.id}:${auth.apiKey.api_key}`).toString('base64')
|
||||
} else {
|
||||
headers.authorization = `ApiKey ${auth.apiKey}`
|
||||
}
|
||||
}
|
||||
}
|
||||
return headers
|
||||
}
|
||||
|
||||
module.exports = Connection
|
||||
|
||||
@ -1,410 +0,0 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
const { URL } = require('url')
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const Connection = require('./Connection')
|
||||
const noop = () => {}
|
||||
|
||||
class ConnectionPool {
|
||||
constructor (opts = {}) {
|
||||
this.connections = new Map()
|
||||
this.dead = []
|
||||
this.selector = opts.selector
|
||||
this._auth = null
|
||||
this._ssl = opts.ssl
|
||||
this._agent = opts.agent
|
||||
// the resurrect timeout is 60s
|
||||
this.resurrectTimeout = 1000 * 60
|
||||
// number of consecutive failures after which
|
||||
// the timeout doesn't increase
|
||||
this.resurrectTimeoutCutoff = 5
|
||||
this.pingTimeout = opts.pingTimeout
|
||||
this.Connection = opts.Connection
|
||||
this.emit = opts.emit || noop
|
||||
this._sniffEnabled = opts.sniffEnabled || false
|
||||
|
||||
const resurrectStrategy = opts.resurrectStrategy || 'ping'
|
||||
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
|
||||
assert(
|
||||
this.resurrectStrategy != null,
|
||||
`Invalid resurrection strategy: '${resurrectStrategy}'`
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a connection as 'alive'.
|
||||
* If needed removes the connection from the dead list
|
||||
* and then resets the `deadCount`.
|
||||
* If sniffing is not enabled and there is only
|
||||
* one node, this method is a noop.
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markAlive (connection) {
|
||||
if (this._sniffEnabled === false && this.connections.size === 1) return
|
||||
const { id } = connection
|
||||
debug(`Marking as 'alive' connection '${id}'`)
|
||||
const index = this.dead.indexOf(id)
|
||||
if (index > -1) this.dead.splice(index, 1)
|
||||
connection.status = Connection.statuses.ALIVE
|
||||
connection.deadCount = 0
|
||||
connection.resurrectTimeout = 0
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a connection as 'dead'.
|
||||
* If needed adds the connection to the dead list
|
||||
* and then increments the `deadCount`.
|
||||
* If sniffing is not enabled and there is only
|
||||
* one node, this method is a noop.
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markDead (connection) {
|
||||
if (this._sniffEnabled === false && this.connections.size === 1) return
|
||||
const { id } = connection
|
||||
debug(`Marking as 'dead' connection '${id}'`)
|
||||
if (this.dead.indexOf(id) === -1) {
|
||||
this.dead.push(id)
|
||||
}
|
||||
connection.status = Connection.statuses.DEAD
|
||||
connection.deadCount++
|
||||
// resurrectTimeout formula:
|
||||
// `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
|
||||
connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
|
||||
2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff)
|
||||
)
|
||||
|
||||
// sort the dead list in ascending order
|
||||
// based on the resurrectTimeout
|
||||
this.dead.sort((a, b) => {
|
||||
const conn1 = this.connections.get(a)
|
||||
const conn2 = this.connections.get(b)
|
||||
return conn1.resurrectTimeout - conn2.resurrectTimeout
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* If enabled, tries to resurrect a connection with the given
|
||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||
*
|
||||
* @param {object} { now, requestId }
|
||||
* @param {function} callback (isAlive, connection)
|
||||
*/
|
||||
resurrect (opts, callback = noop) {
|
||||
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
|
||||
debug('Nothing to resurrect')
|
||||
callback(null, null)
|
||||
return
|
||||
}
|
||||
|
||||
// the dead list is sorted in ascending order based on the timeout
|
||||
// so the first element will always be the one with the smaller timeout
|
||||
const connection = this.connections.get(this.dead[0])
|
||||
if ((opts.now || Date.now()) < connection.resurrectTimeout) {
|
||||
debug('Nothing to resurrect')
|
||||
callback(null, null)
|
||||
return
|
||||
}
|
||||
|
||||
const { id } = connection
|
||||
|
||||
// ping strategy
|
||||
if (this.resurrectStrategy === 1) {
|
||||
connection.request({
|
||||
method: 'HEAD',
|
||||
path: '/',
|
||||
timeout: this.pingTimeout
|
||||
}, (err, response) => {
|
||||
var isAlive = true
|
||||
const statusCode = response !== null ? response.statusCode : 0
|
||||
if (err != null ||
|
||||
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
|
||||
debug(`Resurrect: connection '${id}' is still dead`)
|
||||
this.markDead(connection)
|
||||
isAlive = false
|
||||
} else {
|
||||
debug(`Resurrect: connection '${id}' is now alive`)
|
||||
this.markAlive(connection)
|
||||
}
|
||||
this.emit('resurrect', null, {
|
||||
strategy: 'ping',
|
||||
name: opts.name,
|
||||
request: { id: opts.requestId },
|
||||
isAlive,
|
||||
connection
|
||||
})
|
||||
callback(isAlive, connection)
|
||||
})
|
||||
// optimistic strategy
|
||||
} else {
|
||||
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
|
||||
this.dead.splice(this.dead.indexOf(id), 1)
|
||||
connection.status = Connection.statuses.ALIVE
|
||||
this.emit('resurrect', null, {
|
||||
strategy: 'optimistic',
|
||||
name: opts.name,
|
||||
request: { id: opts.requestId },
|
||||
isAlive: true,
|
||||
connection
|
||||
})
|
||||
// eslint-disable-next-line standard/no-callback-literal
|
||||
callback(true, connection)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an alive connection if present,
|
||||
* otherwise returns null.
|
||||
* By default it filters the `master` only nodes.
|
||||
* It uses the selector to choose which
|
||||
* connection return.
|
||||
*
|
||||
* @param {object} options (filter and selector)
|
||||
* @returns {object|null} connection
|
||||
*/
|
||||
getConnection (opts = {}) {
|
||||
const filter = opts.filter || (() => true)
|
||||
const selector = opts.selector || (c => c[0])
|
||||
|
||||
// TODO: can we cache this?
|
||||
const connections = []
|
||||
for (var connection of this.connections.values()) {
|
||||
if (connection.status === Connection.statuses.ALIVE) {
|
||||
if (filter(connection) === true) {
|
||||
connections.push(connection)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (connections.length === 0) return null
|
||||
|
||||
return selector(connections)
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a new connection to the pool.
|
||||
*
|
||||
* @param {object|string} host
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
addConnection (opts) {
|
||||
if (Array.isArray(opts)) {
|
||||
opts.forEach(o => this.addConnection(o))
|
||||
return
|
||||
}
|
||||
|
||||
if (typeof opts === 'string') {
|
||||
opts = this.urlToHost(opts)
|
||||
}
|
||||
// if a given node has auth data we store it in the connection pool,
|
||||
// so if we add new nodes without auth data (after a sniff for example)
|
||||
// we can add it to them once the connection instance has been created
|
||||
if (opts.url.username !== '' && opts.url.password !== '') {
|
||||
this._auth = {
|
||||
username: decodeURIComponent(opts.url.username),
|
||||
password: decodeURIComponent(opts.url.password)
|
||||
}
|
||||
opts.auth = this._auth
|
||||
}
|
||||
|
||||
if (this._auth != null) {
|
||||
if (opts.auth == null || (opts.auth.username == null && opts.auth.password == null)) {
|
||||
opts.auth = this._auth
|
||||
opts.url.username = this._auth.username
|
||||
opts.url.password = this._auth.password
|
||||
}
|
||||
}
|
||||
|
||||
if (opts.ssl == null) opts.ssl = this._ssl
|
||||
if (opts.agent == null) opts.agent = this._agent
|
||||
|
||||
const connection = new this.Connection(opts)
|
||||
debug('Adding a new connection', connection)
|
||||
if (this.connections.has(connection.id)) {
|
||||
throw new Error(`Connection with id '${connection.id}' is already present`)
|
||||
}
|
||||
this.connections.set(connection.id, connection)
|
||||
return connection
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a new connection to the pool.
|
||||
*
|
||||
* @param {object} connection
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
removeConnection (connection) {
|
||||
debug('Removing connection', connection)
|
||||
connection.close(noop)
|
||||
const { id } = connection
|
||||
this.connections.delete(id)
|
||||
var index = this.dead.indexOf(id)
|
||||
if (index > -1) this.dead.splice(index, 1)
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Empties the connection pool.
|
||||
*
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
empty (callback) {
|
||||
debug('Emptying the connection pool')
|
||||
var openConnections = this.connections.size
|
||||
this.connections.forEach(connection => {
|
||||
connection.close(() => {
|
||||
if (--openConnections === 0) {
|
||||
this.connections = new Map()
|
||||
this.dead = []
|
||||
callback()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the ConnectionPool with new connections.
|
||||
*
|
||||
* @param {array} array of connections
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
update (connections) {
|
||||
debug('Updating the connection pool')
|
||||
for (var i = 0; i < connections.length; i++) {
|
||||
const connection = connections[i]
|
||||
// if we already have a given connection in the pool
|
||||
// we check its status, if is 'alive', we do nothing,
|
||||
// if 'dead' we mark it as alive, we do not close the old
|
||||
// one to avoid socket issues
|
||||
if (this.connections.has(connection.id) === true) {
|
||||
debug(`The connection with id '${connection.id}' is already present`)
|
||||
const oldConnection = this.connections.get(connection.id)
|
||||
if (oldConnection.status === Connection.statuses.DEAD) {
|
||||
this.markAlive(oldConnection)
|
||||
}
|
||||
// in case the user has passed a single url (or an array of urls),
|
||||
// the connection id will be the full href; to avoid closing valid connections
|
||||
// because are not present in the pool, we check also the node url,
|
||||
// and if is already present we update its id with the ES provided one.
|
||||
} else if (this.connections.has(connection.url.href) === true) {
|
||||
const oldConnection = this.connections.get(connection.url.href)
|
||||
this.connections.delete(connection.url.href)
|
||||
oldConnection.id = connection.id
|
||||
this.connections.set(connection.id, oldConnection)
|
||||
if (oldConnection.status === Connection.statuses.DEAD) {
|
||||
this.markAlive(oldConnection)
|
||||
}
|
||||
} else {
|
||||
this.addConnection(connection)
|
||||
}
|
||||
}
|
||||
|
||||
const ids = connections.map(c => c.id)
|
||||
// remove all the dead connections and old connections
|
||||
for (const connection of this.connections.values()) {
|
||||
if (ids.indexOf(connection.id) === -1) {
|
||||
this.removeConnection(connection)
|
||||
}
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the nodes objects to a host object.
|
||||
*
|
||||
* @param {object} nodes
|
||||
* @returns {array} hosts
|
||||
*/
|
||||
nodesToHost (nodes, protocol) {
|
||||
const ids = Object.keys(nodes)
|
||||
const hosts = []
|
||||
|
||||
for (var i = 0, len = ids.length; i < len; i++) {
|
||||
const node = nodes[ids[i]]
|
||||
// If there is no protocol in
|
||||
// the `publish_address` new URL will throw
|
||||
// the publish_address can have two forms:
|
||||
// - ip:port
|
||||
// - hostname/ip:port
|
||||
// if we encounter the second case, we should
|
||||
// use the hostname instead of the ip
|
||||
var address = node.http.publish_address
|
||||
const parts = address.split('/')
|
||||
// the url is in the form of hostname/ip:port
|
||||
if (parts.length > 1) {
|
||||
const hostname = parts[0]
|
||||
const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1)
|
||||
address = `${hostname}:${port}`
|
||||
}
|
||||
|
||||
address = address.slice(0, 4) === 'http'
|
||||
? address
|
||||
: `${protocol}//${address}`
|
||||
const roles = node.roles.reduce((acc, role) => {
|
||||
acc[role] = true
|
||||
return acc
|
||||
}, {})
|
||||
|
||||
hosts.push({
|
||||
url: new URL(address),
|
||||
id: ids[i],
|
||||
roles: Object.assign({
|
||||
[Connection.roles.MASTER]: true,
|
||||
[Connection.roles.DATA]: true,
|
||||
[Connection.roles.INGEST]: true,
|
||||
[Connection.roles.ML]: false
|
||||
}, roles)
|
||||
})
|
||||
}
|
||||
|
||||
return hosts
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms an url string to a host object
|
||||
*
|
||||
* @param {string} url
|
||||
* @returns {object} host
|
||||
*/
|
||||
urlToHost (url) {
|
||||
return {
|
||||
url: new URL(url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionPool.resurrectStrategies = {
|
||||
none: 0,
|
||||
ping: 1,
|
||||
optimistic: 2
|
||||
}
|
||||
|
||||
// https://gist.github.com/guilhermepontes/17ae0cc71fa2b13ea8c20c94c5c35dc4
|
||||
// const shuffleArray = arr => arr
|
||||
// .map(a => [Math.random(), a])
|
||||
// .sort((a, b) => a[0] - b[0])
|
||||
// .map(a => a[1])
|
||||
|
||||
module.exports = ConnectionPool
|
||||
21
lib/Serializer.d.ts
vendored
21
lib/Serializer.d.ts
vendored
@ -1,21 +1,6 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
export default class Serializer {
|
||||
serialize(object: any): string;
|
||||
|
||||
@ -1,21 +1,6 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
|
||||
27
lib/Transport.d.ts
vendored
27
lib/Transport.d.ts
vendored
@ -1,23 +1,8 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
import ConnectionPool from './ConnectionPool';
|
||||
import { ConnectionPool, CloudConnectionPool } from './pool';
|
||||
import Connection from './Connection';
|
||||
import Serializer from './Serializer';
|
||||
|
||||
@ -38,7 +23,7 @@ declare type emitFn = (event: string | symbol, ...args: any[]) => boolean;
|
||||
|
||||
interface TransportOptions {
|
||||
emit: emitFn & noopFn;
|
||||
connectionPool: ConnectionPool;
|
||||
connectionPool: ConnectionPool | CloudConnectionPool;
|
||||
serializer: Serializer;
|
||||
maxRetries: number;
|
||||
requestTimeout: number | string;
|
||||
@ -128,7 +113,7 @@ export default class Transport {
|
||||
DEFAULT: string;
|
||||
};
|
||||
emit: emitFn & noopFn;
|
||||
connectionPool: ConnectionPool;
|
||||
connectionPool: ConnectionPool | CloudConnectionPool;
|
||||
serializer: Serializer;
|
||||
maxRetries: number;
|
||||
requestTimeout: number;
|
||||
|
||||
@ -1,21 +1,6 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
@ -329,10 +314,12 @@ class Transport {
|
||||
if (this._sniffEnabled === true && now > this._nextSniff) {
|
||||
this.sniff({ reason: Transport.sniffReasons.SNIFF_INTERVAL, requestId: opts.requestId })
|
||||
}
|
||||
this.connectionPool.resurrect({ now, requestId: opts.requestId, name: this.name })
|
||||
return this.connectionPool.getConnection({
|
||||
filter: this.nodeFilter,
|
||||
selector: this.nodeSelector
|
||||
selector: this.nodeSelector,
|
||||
requestId: opts.requestId,
|
||||
name: this.name,
|
||||
now
|
||||
})
|
||||
}
|
||||
|
||||
|
||||
21
lib/errors.d.ts
vendored
21
lib/errors.d.ts
vendored
@ -1,21 +1,6 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
import { ApiResponse } from './Transport'
|
||||
|
||||
|
||||
@ -1,21 +1,6 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
|
||||
239
lib/pool/BaseConnectionPool.js
Normal file
239
lib/pool/BaseConnectionPool.js
Normal file
@ -0,0 +1,239 @@
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
const { URL } = require('url')
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const Connection = require('../Connection')
|
||||
const noop = () => {}
|
||||
|
||||
class BaseConnectionPool {
|
||||
constructor (opts) {
|
||||
// list of nodes and weights
|
||||
this.connections = []
|
||||
// how many nodes we have in our scheduler
|
||||
this.size = this.connections.length
|
||||
this.Connection = opts.Connection
|
||||
this.emit = opts.emit || noop
|
||||
this.auth = opts.auth || null
|
||||
this._ssl = opts.ssl
|
||||
this._agent = opts.agent
|
||||
}
|
||||
|
||||
getConnection () {
|
||||
throw new Error('getConnection must be implemented')
|
||||
}
|
||||
|
||||
markAlive () {
|
||||
return this
|
||||
}
|
||||
|
||||
markDead () {
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new connection instance.
|
||||
*/
|
||||
createConnection (opts) {
|
||||
if (typeof opts === 'string') {
|
||||
opts = this.urlToHost(opts)
|
||||
}
|
||||
|
||||
if (opts.url.username !== '' && opts.url.password !== '') {
|
||||
opts.auth = {
|
||||
username: decodeURIComponent(opts.url.username),
|
||||
password: decodeURIComponent(opts.url.password)
|
||||
}
|
||||
} else if (this.auth !== null) {
|
||||
opts.auth = this.auth
|
||||
}
|
||||
|
||||
if (opts.ssl == null) opts.ssl = this._ssl
|
||||
if (opts.agent == null) opts.agent = this._agent
|
||||
|
||||
const connection = new this.Connection(opts)
|
||||
|
||||
for (const conn of this.connections) {
|
||||
if (conn.id === connection.id) {
|
||||
throw new Error(`Connection with id '${connection.id}' is already present`)
|
||||
}
|
||||
}
|
||||
|
||||
return connection
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds a new connection to the pool.
|
||||
*
|
||||
* @param {object|string} host
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
addConnection (opts) {
|
||||
if (Array.isArray(opts)) {
|
||||
return opts.forEach(o => this.addConnection(o))
|
||||
}
|
||||
|
||||
if (typeof opts === 'string') {
|
||||
opts = this.urlToHost(opts)
|
||||
}
|
||||
|
||||
const connectionById = this.connections.find(c => c.id === opts.id)
|
||||
const connectionByUrl = this.connections.find(c => c.id === opts.url.href)
|
||||
|
||||
if (connectionById || connectionByUrl) {
|
||||
throw new Error(`Connection with id '${opts.id || opts.url.href}' is already present`)
|
||||
}
|
||||
|
||||
this.update([...this.connections, opts])
|
||||
return this.connections[this.size - 1]
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a new connection to the pool.
|
||||
*
|
||||
* @param {object} connection
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
removeConnection (connection) {
|
||||
debug('Removing connection', connection)
|
||||
return this.update(this.connections.filter(c => c.id !== connection.id))
|
||||
}
|
||||
|
||||
/**
|
||||
* Empties the connection pool.
|
||||
*
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
empty (callback) {
|
||||
debug('Emptying the connection pool')
|
||||
var openConnections = this.size
|
||||
this.connections.forEach(connection => {
|
||||
connection.close(() => {
|
||||
if (--openConnections === 0) {
|
||||
this.connections = []
|
||||
this.size = this.connections.length
|
||||
callback()
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the ConnectionPool with new connections.
|
||||
*
|
||||
* @param {array} array of connections
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
update (nodes) {
|
||||
debug('Updating the connection pool')
|
||||
const newConnections = []
|
||||
const oldConnections = []
|
||||
|
||||
for (const node of nodes) {
|
||||
// if we already have a given connection in the pool
|
||||
// we mark it as alive and we do not close the connection
|
||||
// to avoid socket issues
|
||||
const connectionById = this.connections.find(c => c.id === node.id)
|
||||
const connectionByUrl = this.connections.find(c => c.id === node.url.href)
|
||||
if (connectionById) {
|
||||
debug(`The connection with id '${node.id}' is already present`)
|
||||
this.markAlive(connectionById)
|
||||
newConnections.push(connectionById)
|
||||
// in case the user has passed a single url (or an array of urls),
|
||||
// the connection id will be the full href; to avoid closing valid connections
|
||||
// because are not present in the pool, we check also the node url,
|
||||
// and if is already present we update its id with the ES provided one.
|
||||
} else if (connectionByUrl) {
|
||||
connectionByUrl.id = node.id
|
||||
this.markAlive(connectionByUrl)
|
||||
newConnections.push(connectionByUrl)
|
||||
} else {
|
||||
newConnections.push(this.createConnection(node))
|
||||
}
|
||||
}
|
||||
|
||||
const ids = nodes.map(c => c.id)
|
||||
// remove all the dead connections and old connections
|
||||
for (const connection of this.connections) {
|
||||
if (ids.indexOf(connection.id) === -1) {
|
||||
oldConnections.push(connection)
|
||||
}
|
||||
}
|
||||
|
||||
// close old connections
|
||||
oldConnections.forEach(connection => connection.close())
|
||||
|
||||
this.connections = newConnections
|
||||
this.size = this.connections.length
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms the nodes objects to a host object.
|
||||
*
|
||||
* @param {object} nodes
|
||||
* @returns {array} hosts
|
||||
*/
|
||||
nodesToHost (nodes, protocol) {
|
||||
const ids = Object.keys(nodes)
|
||||
const hosts = []
|
||||
|
||||
for (var i = 0, len = ids.length; i < len; i++) {
|
||||
const node = nodes[ids[i]]
|
||||
// If there is no protocol in
|
||||
// the `publish_address` new URL will throw
|
||||
// the publish_address can have two forms:
|
||||
// - ip:port
|
||||
// - hostname/ip:port
|
||||
// if we encounter the second case, we should
|
||||
// use the hostname instead of the ip
|
||||
var address = node.http.publish_address
|
||||
const parts = address.split('/')
|
||||
// the url is in the form of hostname/ip:port
|
||||
if (parts.length > 1) {
|
||||
const hostname = parts[0]
|
||||
const port = parts[1].match(/((?::))(?:[0-9]+)$/g)[0].slice(1)
|
||||
address = `${hostname}:${port}`
|
||||
}
|
||||
|
||||
address = address.slice(0, 4) === 'http'
|
||||
? address
|
||||
: `${protocol}//${address}`
|
||||
const roles = node.roles.reduce((acc, role) => {
|
||||
acc[role] = true
|
||||
return acc
|
||||
}, {})
|
||||
|
||||
hosts.push({
|
||||
url: new URL(address),
|
||||
id: ids[i],
|
||||
roles: Object.assign({
|
||||
[Connection.roles.MASTER]: true,
|
||||
[Connection.roles.DATA]: true,
|
||||
[Connection.roles.INGEST]: true,
|
||||
[Connection.roles.ML]: false
|
||||
}, roles)
|
||||
})
|
||||
}
|
||||
|
||||
return hosts
|
||||
}
|
||||
|
||||
/**
|
||||
* Transforms an url string to a host object
|
||||
*
|
||||
* @param {string} url
|
||||
* @returns {object} host
|
||||
*/
|
||||
urlToHost (url) {
|
||||
return {
|
||||
url: new URL(url)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = BaseConnectionPool
|
||||
49
lib/pool/CloudConnectionPool.js
Normal file
49
lib/pool/CloudConnectionPool.js
Normal file
@ -0,0 +1,49 @@
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
const BaseConnectionPool = require('./BaseConnectionPool')
|
||||
|
||||
class CloudConnectionPool extends BaseConnectionPool {
|
||||
constructor (opts = {}) {
|
||||
super(opts)
|
||||
this.cloudConnection = null
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the only cloud connection.
|
||||
*
|
||||
* @returns {object} connection
|
||||
*/
|
||||
getConnection () {
|
||||
return this.cloudConnection
|
||||
}
|
||||
|
||||
/**
|
||||
* Empties the connection pool.
|
||||
*
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
empty (callback) {
|
||||
super.empty(() => {
|
||||
this.cloudConnection = null
|
||||
callback()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the ConnectionPool with new connections.
|
||||
*
|
||||
* @param {array} array of connections
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
update (connections) {
|
||||
super.update(connections)
|
||||
this.cloudConnection = this.connections[0]
|
||||
return this
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = CloudConnectionPool
|
||||
232
lib/pool/ConnectionPool.js
Normal file
232
lib/pool/ConnectionPool.js
Normal file
@ -0,0 +1,232 @@
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
const BaseConnectionPool = require('./BaseConnectionPool')
|
||||
const assert = require('assert')
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const Connection = require('../Connection')
|
||||
const noop = () => {}
|
||||
|
||||
class ConnectionPool extends BaseConnectionPool {
|
||||
constructor (opts = {}) {
|
||||
super(opts)
|
||||
|
||||
this.dead = []
|
||||
// the resurrect timeout is 60s
|
||||
this.resurrectTimeout = 1000 * 60
|
||||
// number of consecutive failures after which
|
||||
// the timeout doesn't increase
|
||||
this.resurrectTimeoutCutoff = 5
|
||||
this.pingTimeout = opts.pingTimeout
|
||||
this._sniffEnabled = opts.sniffEnabled || false
|
||||
|
||||
const resurrectStrategy = opts.resurrectStrategy || 'ping'
|
||||
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
|
||||
assert(
|
||||
this.resurrectStrategy != null,
|
||||
`Invalid resurrection strategy: '${resurrectStrategy}'`
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a connection as 'alive'.
|
||||
* If needed removes the connection from the dead list
|
||||
* and then resets the `deadCount`.
|
||||
* If sniffing is not enabled and there is only
|
||||
* one node, this method is a noop.
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markAlive (connection) {
|
||||
if (this._sniffEnabled === false && this.size === 1) return this
|
||||
const { id } = connection
|
||||
debug(`Marking as 'alive' connection '${id}'`)
|
||||
const index = this.dead.indexOf(id)
|
||||
if (index > -1) this.dead.splice(index, 1)
|
||||
connection.status = Connection.statuses.ALIVE
|
||||
connection.deadCount = 0
|
||||
connection.resurrectTimeout = 0
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Marks a connection as 'dead'.
|
||||
* If needed adds the connection to the dead list
|
||||
* and then increments the `deadCount`.
|
||||
* If sniffing is not enabled and there is only
|
||||
* one node, this method is a noop.
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markDead (connection) {
|
||||
if (this._sniffEnabled === false && this.size === 1) return this
|
||||
const { id } = connection
|
||||
debug(`Marking as 'dead' connection '${id}'`)
|
||||
if (this.dead.indexOf(id) === -1) {
|
||||
this.dead.push(id)
|
||||
}
|
||||
connection.status = Connection.statuses.DEAD
|
||||
connection.deadCount++
|
||||
// resurrectTimeout formula:
|
||||
// `resurrectTimeout * 2 ** min(deadCount - 1, resurrectTimeoutCutoff)`
|
||||
connection.resurrectTimeout = Date.now() + this.resurrectTimeout * Math.pow(
|
||||
2, Math.min(connection.deadCount - 1, this.resurrectTimeoutCutoff)
|
||||
)
|
||||
|
||||
// sort the dead list in ascending order
|
||||
// based on the resurrectTimeout
|
||||
this.dead.sort((a, b) => {
|
||||
const conn1 = this.connections.find(c => c.id === a)
|
||||
const conn2 = this.connections.find(c => c.id === b)
|
||||
return conn1.resurrectTimeout - conn2.resurrectTimeout
|
||||
})
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* If enabled, tries to resurrect a connection with the given
|
||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||
*
|
||||
* @param {object} { now, requestId }
|
||||
* @param {function} callback (isAlive, connection)
|
||||
*/
|
||||
resurrect (opts, callback = noop) {
|
||||
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
|
||||
debug('Nothing to resurrect')
|
||||
callback(null, null)
|
||||
return
|
||||
}
|
||||
|
||||
// the dead list is sorted in ascending order based on the timeout
|
||||
// so the first element will always be the one with the smaller timeout
|
||||
const connection = this.connections.find(c => c.id === this.dead[0])
|
||||
if ((opts.now || Date.now()) < connection.resurrectTimeout) {
|
||||
debug('Nothing to resurrect')
|
||||
callback(null, null)
|
||||
return
|
||||
}
|
||||
|
||||
const { id } = connection
|
||||
|
||||
// ping strategy
|
||||
if (this.resurrectStrategy === 1) {
|
||||
connection.request({
|
||||
method: 'HEAD',
|
||||
path: '/',
|
||||
timeout: this.pingTimeout
|
||||
}, (err, response) => {
|
||||
var isAlive = true
|
||||
const statusCode = response !== null ? response.statusCode : 0
|
||||
if (err != null ||
|
||||
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
|
||||
debug(`Resurrect: connection '${id}' is still dead`)
|
||||
this.markDead(connection)
|
||||
isAlive = false
|
||||
} else {
|
||||
debug(`Resurrect: connection '${id}' is now alive`)
|
||||
this.markAlive(connection)
|
||||
}
|
||||
this.emit('resurrect', null, {
|
||||
strategy: 'ping',
|
||||
name: opts.name,
|
||||
request: { id: opts.requestId },
|
||||
isAlive,
|
||||
connection
|
||||
})
|
||||
callback(isAlive, connection)
|
||||
})
|
||||
// optimistic strategy
|
||||
} else {
|
||||
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
|
||||
this.dead.splice(this.dead.indexOf(id), 1)
|
||||
connection.status = Connection.statuses.ALIVE
|
||||
this.emit('resurrect', null, {
|
||||
strategy: 'optimistic',
|
||||
name: opts.name,
|
||||
request: { id: opts.requestId },
|
||||
isAlive: true,
|
||||
connection
|
||||
})
|
||||
// eslint-disable-next-line standard/no-callback-literal
|
||||
callback(true, connection)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns an alive connection if present,
|
||||
* otherwise returns null.
|
||||
* By default it filters the `master` only nodes.
|
||||
* It uses the selector to choose which
|
||||
* connection return.
|
||||
*
|
||||
* @param {object} options (filter and selector)
|
||||
* @returns {object|null} connection
|
||||
*/
|
||||
getConnection (opts = {}) {
|
||||
const filter = opts.filter || (() => true)
|
||||
const selector = opts.selector || (c => c[0])
|
||||
|
||||
this.resurrect({
|
||||
now: opts.now,
|
||||
requestId: opts.requestId,
|
||||
name: opts.name
|
||||
})
|
||||
|
||||
// TODO: can we cache this?
|
||||
const connections = []
|
||||
for (var i = 0; i < this.size; i++) {
|
||||
const connection = this.connections[i]
|
||||
if (connection.status === Connection.statuses.ALIVE) {
|
||||
if (filter(connection) === true) {
|
||||
connections.push(connection)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (connections.length === 0) return null
|
||||
|
||||
return selector(connections)
|
||||
}
|
||||
|
||||
/**
|
||||
* Empties the connection pool.
|
||||
*
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
empty (callback) {
|
||||
super.empty(() => {
|
||||
this.dead = []
|
||||
callback()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Update the ConnectionPool with new connections.
|
||||
*
|
||||
* @param {array} array of connections
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
update (connections) {
|
||||
super.update(connections)
|
||||
|
||||
for (var i = 0; i < this.dead.length; i++) {
|
||||
if (this.connections.find(c => c.id === this.dead[i]) === undefined) {
|
||||
this.dead.splice(i, 1)
|
||||
}
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionPool.resurrectStrategies = {
|
||||
none: 0,
|
||||
ping: 1,
|
||||
optimistic: 2
|
||||
}
|
||||
|
||||
module.exports = ConnectionPool
|
||||
148
lib/ConnectionPool.d.ts → lib/pool/index.d.ts
vendored
148
lib/ConnectionPool.d.ts → lib/pool/index.d.ts
vendored
@ -1,48 +1,58 @@
|
||||
/*
|
||||
* Licensed to Elasticsearch B.V. under one or more contributor
|
||||
* license agreements. See the NOTICE file distributed with
|
||||
* this work for additional information regarding copyright
|
||||
* ownership. Elasticsearch B.V. licenses this file to you under
|
||||
* the Apache License, Version 2.0 (the "License"); you may
|
||||
* not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing,
|
||||
* software distributed under the License is distributed on an
|
||||
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
* KIND, either express or implied. See the License for the
|
||||
* specific language governing permissions and limitations
|
||||
* under the License.
|
||||
*/
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
/// <reference types="node" />
|
||||
|
||||
import { SecureContextOptions } from 'tls';
|
||||
import Connection, { AgentOptions } from './Connection';
|
||||
import { nodeFilterFn, nodeSelectorFn } from './Transport';
|
||||
import Connection, { AgentOptions } from '../Connection';
|
||||
import { nodeFilterFn, nodeSelectorFn } from '../Transport';
|
||||
|
||||
interface ConnectionPoolOptions {
|
||||
interface BaseConnectionPoolOptions {
|
||||
ssl?: SecureContextOptions;
|
||||
agent?: AgentOptions;
|
||||
auth?: BasicAuth | ApiKeyAuth;
|
||||
emit: (event: string | symbol, ...args: any[]) => boolean;
|
||||
pingTimeout?: number;
|
||||
Connection: typeof Connection;
|
||||
resurrectStrategy?: string;
|
||||
}
|
||||
|
||||
export interface getConnectionOptions {
|
||||
filter?: nodeFilterFn;
|
||||
selector?: nodeSelectorFn;
|
||||
interface ConnectionPoolOptions extends BaseConnectionPoolOptions {
|
||||
pingTimeout?: number;
|
||||
resurrectStrategy?: string;
|
||||
sniffEnabled?: boolean;
|
||||
}
|
||||
|
||||
export interface resurrectOptions {
|
||||
interface getConnectionOptions {
|
||||
filter?: nodeFilterFn;
|
||||
selector?: nodeSelectorFn;
|
||||
requestId?: string | number;
|
||||
name?: string;
|
||||
now?: number;
|
||||
}
|
||||
|
||||
interface ApiKeyAuth {
|
||||
apiKey:
|
||||
| string
|
||||
| {
|
||||
id: string;
|
||||
api_key: string;
|
||||
}
|
||||
}
|
||||
|
||||
interface BasicAuth {
|
||||
username: string;
|
||||
password: string;
|
||||
}
|
||||
|
||||
interface resurrectOptions {
|
||||
now?: number;
|
||||
requestId: string;
|
||||
name: string;
|
||||
}
|
||||
|
||||
export interface ResurrectEvent {
|
||||
interface ResurrectEvent {
|
||||
strategy: string;
|
||||
isAlive: boolean;
|
||||
connection: Connection;
|
||||
@ -52,23 +62,14 @@ export interface ResurrectEvent {
|
||||
};
|
||||
}
|
||||
|
||||
export default class ConnectionPool {
|
||||
static resurrectStrategies: {
|
||||
none: number;
|
||||
ping: number;
|
||||
optimistic: number;
|
||||
};
|
||||
connections: any;
|
||||
dead: string[];
|
||||
|
||||
declare class BaseConnectionPool {
|
||||
connections: Connection[];
|
||||
_ssl: SecureContextOptions | null;
|
||||
_agent: AgentOptions | null;
|
||||
_sniffEnabled: boolean;
|
||||
resurrectTimeout: number;
|
||||
resurrectTimeoutCutoff: number;
|
||||
pingTimeout: number;
|
||||
auth: BasicAuth | ApiKeyAuth;
|
||||
Connection: typeof Connection;
|
||||
resurrectStrategy: number;
|
||||
constructor(opts?: ConnectionPoolOptions);
|
||||
constructor(opts?: BaseConnectionPoolOptions);
|
||||
/**
|
||||
* Marks a connection as 'alive'.
|
||||
* If needed removes the connection from the dead list
|
||||
@ -76,7 +77,7 @@ export default class ConnectionPool {
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markAlive(connection: Connection): void;
|
||||
markAlive(connection: Connection): this;
|
||||
/**
|
||||
* Marks a connection as 'dead'.
|
||||
* If needed adds the connection to the dead list
|
||||
@ -84,15 +85,7 @@ export default class ConnectionPool {
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markDead(connection: Connection): void;
|
||||
/**
|
||||
* If enabled, tries to resurrect a connection with the given
|
||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||
*
|
||||
* @param {object} { now, requestId, name }
|
||||
* @param {function} callback (isAlive, connection)
|
||||
*/
|
||||
resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
|
||||
markDead(connection: Connection): this;
|
||||
/**
|
||||
* Returns an alive connection if present,
|
||||
* otherwise returns null.
|
||||
@ -110,27 +103,27 @@ export default class ConnectionPool {
|
||||
* @param {object|string} host
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
addConnection(opts: any): Connection | void;
|
||||
addConnection(opts: any): Connection;
|
||||
/**
|
||||
* Removes a new connection to the pool.
|
||||
*
|
||||
* @param {object} connection
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
removeConnection(connection: Connection): ConnectionPool;
|
||||
removeConnection(connection: Connection): this;
|
||||
/**
|
||||
* Empties the connection pool.
|
||||
*
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
empty(): ConnectionPool;
|
||||
empty(): this;
|
||||
/**
|
||||
* Update the ConnectionPool with new connections.
|
||||
*
|
||||
* @param {array} array of connections
|
||||
* @returns {ConnectionPool}
|
||||
*/
|
||||
update(connections: Connection[]): ConnectionPool;
|
||||
update(connections: any[]): this;
|
||||
/**
|
||||
* Transforms the nodes objects to a host object.
|
||||
*
|
||||
@ -147,14 +140,57 @@ export default class ConnectionPool {
|
||||
urlToHost(url: string): any;
|
||||
}
|
||||
|
||||
declare class ConnectionPool extends BaseConnectionPool {
|
||||
static resurrectStrategies: {
|
||||
none: number;
|
||||
ping: number;
|
||||
optimistic: number;
|
||||
};
|
||||
dead: string[];
|
||||
_sniffEnabled: boolean;
|
||||
resurrectTimeout: number;
|
||||
resurrectTimeoutCutoff: number;
|
||||
pingTimeout: number;
|
||||
resurrectStrategy: number;
|
||||
constructor(opts?: ConnectionPoolOptions);
|
||||
|
||||
/**
|
||||
* If enabled, tries to resurrect a connection with the given
|
||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||
*
|
||||
* @param {object} { now, requestId, name }
|
||||
* @param {function} callback (isAlive, connection)
|
||||
*/
|
||||
resurrect(opts: resurrectOptions, callback?: (isAlive: boolean | null, connection: Connection | null) => void): void;
|
||||
}
|
||||
|
||||
declare class CloudConnectionPool extends BaseConnectionPool {
|
||||
cloudConnection: Connection | null
|
||||
constructor(opts?: BaseConnectionPoolOptions);
|
||||
getConnection(): Connection;
|
||||
}
|
||||
|
||||
declare function defaultNodeFilter(node: Connection): boolean;
|
||||
declare function roundRobinSelector(): (connections: Connection[]) => Connection;
|
||||
declare function randomSelector(connections: Connection[]): Connection;
|
||||
|
||||
export declare const internals: {
|
||||
declare const internals: {
|
||||
defaultNodeFilter: typeof defaultNodeFilter;
|
||||
roundRobinSelector: typeof roundRobinSelector;
|
||||
randomSelector: typeof randomSelector;
|
||||
};
|
||||
|
||||
export {};
|
||||
export {
|
||||
// Interfaces
|
||||
ConnectionPoolOptions,
|
||||
getConnectionOptions,
|
||||
ApiKeyAuth,
|
||||
BasicAuth,
|
||||
internals,
|
||||
resurrectOptions,
|
||||
ResurrectEvent,
|
||||
// Classes
|
||||
BaseConnectionPool,
|
||||
ConnectionPool,
|
||||
CloudConnectionPool
|
||||
};
|
||||
15
lib/pool/index.js
Normal file
15
lib/pool/index.js
Normal file
@ -0,0 +1,15 @@
|
||||
// Licensed to Elasticsearch B.V under one or more agreements.
|
||||
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
||||
// See the LICENSE file in the project root for more information
|
||||
|
||||
'use strict'
|
||||
|
||||
const BaseConnectionPool = require('./BaseConnectionPool')
|
||||
const ConnectionPool = require('./ConnectionPool')
|
||||
const CloudConnectionPool = require('./CloudConnectionPool')
|
||||
|
||||
module.exports = {
|
||||
BaseConnectionPool,
|
||||
ConnectionPool,
|
||||
CloudConnectionPool
|
||||
}
|
||||
Reference in New Issue
Block a user