From 3c8aaaecd118e162721582425e50504930e1f16d Mon Sep 17 00:00:00 2001 From: delvedor Date: Wed, 24 Oct 2018 15:47:53 +0200 Subject: [PATCH] WIP: initial prototype - Added more sniffing options - Added support for different resurrection strategies - Fixed url resolving --- index.js | 10 ++++++- lib/Connection.js | 14 ++++++++- lib/ConnectionPool.js | 67 ++++++++++++++++++++++++++++++++++--------- lib/Transport.js | 29 +++++++++++++------ 4 files changed, 96 insertions(+), 24 deletions(-) diff --git a/index.js b/index.js index 8454bce5d..0d62f44ad 100644 --- a/index.js +++ b/index.js @@ -46,8 +46,12 @@ class Client extends EventEmitter { Selector, maxRetries: 3, requestTimeout: 30000, + pingTimeout: 3000, sniffInterval: false, sniffOnStart: false, + sniffEndpoint: '_nodes/_all/http', + sniffOnConnectionFault: false, + resurrectStrategy: 'ping', ssl: null, agent: null }, opts) @@ -55,6 +59,8 @@ class Client extends EventEmitter { this[kSelector] = new options.Selector() this[kSerializer] = new options.Serializer() this[kConnectionPool] = new options.ConnectionPool({ + pingTimeout: opts.pingTimeout, + resurrectStrategy: opts.resurrectStrategy, selector: this[kSelector], ssl: options.ssl, agent: null @@ -70,7 +76,9 @@ class Client extends EventEmitter { maxRetries: options.maxRetries, requestTimeout: options.requestTimeout, sniffInterval: options.sniffInterval, - sniffOnStart: options.sniffOnStart + sniffOnStart: options.sniffOnStart, + sniffOnConnectionFault: options.sniffOnConnectionFault, + sniffEndpoint: options.sniffEndpoint }) this.request = this[kTransport].request.bind(this[kTransport]) diff --git a/lib/Connection.js b/lib/Connection.js index 99d9f7be9..af470eb8c 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -3,7 +3,6 @@ const assert = require('assert') const { Agent: HttpAgent } = require('http') const { Agent: HttpsAgent } = require('https') -const { resolve } = require('url') const debug = require('debug')('elasticsearch') const makeRequest = require('simple-get') @@ -95,4 +94,17 @@ const validStatuses = Object.keys(Connection.statuses) const validRoles = Object.keys(Connection.roles) .map(k => Connection.roles[k]) +function resolve (host, path) { + const hostEndWithSlash = host[host.length - 1] === '/' + const pathStartsWithSlash = path[0] === '/' + + if (hostEndWithSlash === true && pathStartsWithSlash === true) { + return host + path.slice(1) + } else if (hostEndWithSlash !== pathStartsWithSlash) { + return host + path + } else { + return host + '/' + path + } +} + module.exports = Connection diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index e6fbd446b..37bed21e0 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -1,8 +1,10 @@ 'use strict' +const assert = require('assert') const { URL } = require('url') const debug = require('debug')('elasticsearch') const Connection = require('./Connection') +const noop = () => {} class ConnectionPool { constructor (opts = {}) { @@ -21,6 +23,14 @@ class ConnectionPool { // number of consecutive failures after which // the timeout doesn't increase this.resurrectTimeoutCutoff = 5 + this.pingTimeout = opts.pingTimeout + + const resurrectStrategy = opts.resurrectStrategy || 'ping' + this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy] + assert( + this.resurrectStrategy != null, + `Invalid resurrection strategy: '${resurrectStrategy}'` + ) } /** @@ -84,18 +94,20 @@ class ConnectionPool { } /** - * Tries to resurrect a connection if the `resurrectTimeout` - * has been reached, if so, it moves the connection to the - * alive list without resetting the `deadCount` or the `resurrectTimeout` + * If enabled, tries to resurrect a connection with the given + * resurrect strategy ('ping', 'optimistic', 'none'). * * @param {number} epoch - * @returns {object} connection + * @param {function} callback (isAlive, connection) */ - resurrect (now = Date.now()) { - if (this.dead.length === 0) return + resurrect (now = Date.now(), callback = noop) { + if (this.resurrectStrategy === 0 || this.dead.length === 0) { + callback(null, null) + return + } // the dead list is sorted in ascending order based on the timeout - // so the first element will always be the one with the smalles timeout + // so the first element will always be the one with the smaller timeout const connection = this.connections.get(this.dead[0]) if (now < connection.resurrectTimeout) { debug('Nothing to resurrect') @@ -103,13 +115,34 @@ class ConnectionPool { } const { id } = connection - debug(`Trying resurrect connection '${id}'`) - this.alive.push(id) - this.dead.splice(this.dead.indexOf(id), 1) - connection.status = Connection.statuses.ALIVE - this.connections.set(id, connection) - return connection + // ping strategy + if (this.resurrectStrategy === 1) { + connection.request({ + method: 'HEAD', + path: '/', + timeout: this.pingTimeout + }, (err, res) => { + var isAlive = true + if (err != null) { + debug(`Resurrect: connection '${id}' is still dead`) + this.markDead(connection) + isAlive = false + } else { + debug(`Resurrect: connection '${id}' is now alive`) + this.markAlive(connection) + } + callback(isAlive, connection) + }) + // optimistic strategy + } else { + debug(`Resurrect: optimistic resurrection for connection '${id}'`) + this.alive.push(id) + this.dead.splice(this.dead.indexOf(id), 1) + connection.status = Connection.statuses.ALIVE + this.connections.set(id, connection) + callback(null, connection) + } } /** @@ -150,7 +183,7 @@ class ConnectionPool { const connection = new Connection(opts) debug('Adding a new connection', connection) if (this.connections.has(connection.id)) { - throw new Error(`Connection with id '${connection.id} is already present`) + throw new Error(`Connection with id '${connection.id}' is already present`) } this.connections.set(connection.id, connection) this.alive.push(connection.id) @@ -235,4 +268,10 @@ class ConnectionPool { } } +ConnectionPool.resurrectStrategies = { + none: 0, + ping: 1, + optimistic: 2 +} + module.exports = ConnectionPool diff --git a/lib/Transport.js b/lib/Transport.js index 6f30dc7c0..eaa9a1ca2 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -20,6 +20,8 @@ class Transport { this.maxRetries = opts.maxRetries this.requestTimeout = opts.requestTimeout this.sniffInterval = opts.sniffInterval + this.sniffOnConnectionFault = opts.sniffOnConnectionFault + this.sniffEndpoint = opts.sniffEndpoint this._sniffEnabled = typeof this.sniffInterval === 'number' this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0 @@ -54,6 +56,10 @@ class Transport { const request = connection.request(params, (err, response) => { if (err != null) { this.connectionPool.markDead(connection) + if (this.sniffOnConnectionFault === true) { + this.sniff() + } + if (attempts > 0) { debug(`Retrying request, there are still ${attempts} attempts`, params) params[kRemainingAttempts] = attempts - 1 @@ -73,6 +79,7 @@ class Transport { response.on('data', chunk => { json += chunk }) response.on('error', err => callback(new ConnectionError(err.message))) response.on('end', () => { + debug('JSON response', params, json) this.connectionPool.markAlive(connection) const contentType = response.headers['content-type'] if (contentType != null && contentType.indexOf('application/json') > -1) { @@ -106,32 +113,38 @@ class Transport { getConnection () { const now = Date.now() if (this._sniffEnabled === true && now > this._nextSniff) { - this.sniff(now) + this.sniff() } this.connectionPool.resurrect(now) return this.connectionPool.getConnection() } - sniff (now = Date.now(), callback = noop) { + sniff (callback = noop) { if (this._isSniffing === true) return + this._isSniffing = true debug('Started sniffing request') + this.request({ method: 'GET', - path: '_nodes/_all/http' + path: this.sniffEndpoint }, (err, body) => { + this._isSniffing = false if (this._sniffEnabled === true) { - this._nextSniff = now + this.sniffInterval + this._nextSniff = Date.now() + this.sniffInterval } - if (err) { - debug('Siffing errored', err) + + if (err != null) { + debug('Sniffing errored', err) return callback(err) } - debug('Siffing ended successfully', body) + + debug('Sniffing ended successfully', body) const hosts = this.connectionPool.nodesToHost(body.nodes) this.connectionPool .empty() .addConnection(hosts) - callback() + + callback(null, hosts) }) } }