WIP: initial prototype
- Added more sniffing options - Added support for different resurrection strategies - Fixed url resolving
This commit is contained in:
10
index.js
10
index.js
@ -46,8 +46,12 @@ class Client extends EventEmitter {
|
||||
Selector,
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
pingTimeout: 3000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false,
|
||||
sniffEndpoint: '_nodes/_all/http',
|
||||
sniffOnConnectionFault: false,
|
||||
resurrectStrategy: 'ping',
|
||||
ssl: null,
|
||||
agent: null
|
||||
}, opts)
|
||||
@ -55,6 +59,8 @@ class Client extends EventEmitter {
|
||||
this[kSelector] = new options.Selector()
|
||||
this[kSerializer] = new options.Serializer()
|
||||
this[kConnectionPool] = new options.ConnectionPool({
|
||||
pingTimeout: opts.pingTimeout,
|
||||
resurrectStrategy: opts.resurrectStrategy,
|
||||
selector: this[kSelector],
|
||||
ssl: options.ssl,
|
||||
agent: null
|
||||
@ -70,7 +76,9 @@ class Client extends EventEmitter {
|
||||
maxRetries: options.maxRetries,
|
||||
requestTimeout: options.requestTimeout,
|
||||
sniffInterval: options.sniffInterval,
|
||||
sniffOnStart: options.sniffOnStart
|
||||
sniffOnStart: options.sniffOnStart,
|
||||
sniffOnConnectionFault: options.sniffOnConnectionFault,
|
||||
sniffEndpoint: options.sniffEndpoint
|
||||
})
|
||||
|
||||
this.request = this[kTransport].request.bind(this[kTransport])
|
||||
|
||||
@ -3,7 +3,6 @@
|
||||
const assert = require('assert')
|
||||
const { Agent: HttpAgent } = require('http')
|
||||
const { Agent: HttpsAgent } = require('https')
|
||||
const { resolve } = require('url')
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const makeRequest = require('simple-get')
|
||||
|
||||
@ -95,4 +94,17 @@ const validStatuses = Object.keys(Connection.statuses)
|
||||
const validRoles = Object.keys(Connection.roles)
|
||||
.map(k => Connection.roles[k])
|
||||
|
||||
function resolve (host, path) {
|
||||
const hostEndWithSlash = host[host.length - 1] === '/'
|
||||
const pathStartsWithSlash = path[0] === '/'
|
||||
|
||||
if (hostEndWithSlash === true && pathStartsWithSlash === true) {
|
||||
return host + path.slice(1)
|
||||
} else if (hostEndWithSlash !== pathStartsWithSlash) {
|
||||
return host + path
|
||||
} else {
|
||||
return host + '/' + path
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Connection
|
||||
|
||||
@ -1,8 +1,10 @@
|
||||
'use strict'
|
||||
|
||||
const assert = require('assert')
|
||||
const { URL } = require('url')
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const Connection = require('./Connection')
|
||||
const noop = () => {}
|
||||
|
||||
class ConnectionPool {
|
||||
constructor (opts = {}) {
|
||||
@ -21,6 +23,14 @@ class ConnectionPool {
|
||||
// number of consecutive failures after which
|
||||
// the timeout doesn't increase
|
||||
this.resurrectTimeoutCutoff = 5
|
||||
this.pingTimeout = opts.pingTimeout
|
||||
|
||||
const resurrectStrategy = opts.resurrectStrategy || 'ping'
|
||||
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
|
||||
assert(
|
||||
this.resurrectStrategy != null,
|
||||
`Invalid resurrection strategy: '${resurrectStrategy}'`
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -84,18 +94,20 @@ class ConnectionPool {
|
||||
}
|
||||
|
||||
/**
|
||||
* Tries to resurrect a connection if the `resurrectTimeout`
|
||||
* has been reached, if so, it moves the connection to the
|
||||
* alive list without resetting the `deadCount` or the `resurrectTimeout`
|
||||
* If enabled, tries to resurrect a connection with the given
|
||||
* resurrect strategy ('ping', 'optimistic', 'none').
|
||||
*
|
||||
* @param {number} epoch
|
||||
* @returns {object} connection
|
||||
* @param {function} callback (isAlive, connection)
|
||||
*/
|
||||
resurrect (now = Date.now()) {
|
||||
if (this.dead.length === 0) return
|
||||
resurrect (now = Date.now(), callback = noop) {
|
||||
if (this.resurrectStrategy === 0 || this.dead.length === 0) {
|
||||
callback(null, null)
|
||||
return
|
||||
}
|
||||
|
||||
// the dead list is sorted in ascending order based on the timeout
|
||||
// so the first element will always be the one with the smalles timeout
|
||||
// so the first element will always be the one with the smaller timeout
|
||||
const connection = this.connections.get(this.dead[0])
|
||||
if (now < connection.resurrectTimeout) {
|
||||
debug('Nothing to resurrect')
|
||||
@ -103,13 +115,34 @@ class ConnectionPool {
|
||||
}
|
||||
|
||||
const { id } = connection
|
||||
debug(`Trying resurrect connection '${id}'`)
|
||||
this.alive.push(id)
|
||||
this.dead.splice(this.dead.indexOf(id), 1)
|
||||
|
||||
connection.status = Connection.statuses.ALIVE
|
||||
this.connections.set(id, connection)
|
||||
return connection
|
||||
// ping strategy
|
||||
if (this.resurrectStrategy === 1) {
|
||||
connection.request({
|
||||
method: 'HEAD',
|
||||
path: '/',
|
||||
timeout: this.pingTimeout
|
||||
}, (err, res) => {
|
||||
var isAlive = true
|
||||
if (err != null) {
|
||||
debug(`Resurrect: connection '${id}' is still dead`)
|
||||
this.markDead(connection)
|
||||
isAlive = false
|
||||
} else {
|
||||
debug(`Resurrect: connection '${id}' is now alive`)
|
||||
this.markAlive(connection)
|
||||
}
|
||||
callback(isAlive, connection)
|
||||
})
|
||||
// optimistic strategy
|
||||
} else {
|
||||
debug(`Resurrect: optimistic resurrection for connection '${id}'`)
|
||||
this.alive.push(id)
|
||||
this.dead.splice(this.dead.indexOf(id), 1)
|
||||
connection.status = Connection.statuses.ALIVE
|
||||
this.connections.set(id, connection)
|
||||
callback(null, connection)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
@ -150,7 +183,7 @@ class ConnectionPool {
|
||||
const connection = new Connection(opts)
|
||||
debug('Adding a new connection', connection)
|
||||
if (this.connections.has(connection.id)) {
|
||||
throw new Error(`Connection with id '${connection.id} is already present`)
|
||||
throw new Error(`Connection with id '${connection.id}' is already present`)
|
||||
}
|
||||
this.connections.set(connection.id, connection)
|
||||
this.alive.push(connection.id)
|
||||
@ -235,4 +268,10 @@ class ConnectionPool {
|
||||
}
|
||||
}
|
||||
|
||||
ConnectionPool.resurrectStrategies = {
|
||||
none: 0,
|
||||
ping: 1,
|
||||
optimistic: 2
|
||||
}
|
||||
|
||||
module.exports = ConnectionPool
|
||||
|
||||
@ -20,6 +20,8 @@ class Transport {
|
||||
this.maxRetries = opts.maxRetries
|
||||
this.requestTimeout = opts.requestTimeout
|
||||
this.sniffInterval = opts.sniffInterval
|
||||
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
|
||||
this.sniffEndpoint = opts.sniffEndpoint
|
||||
|
||||
this._sniffEnabled = typeof this.sniffInterval === 'number'
|
||||
this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0
|
||||
@ -54,6 +56,10 @@ class Transport {
|
||||
const request = connection.request(params, (err, response) => {
|
||||
if (err != null) {
|
||||
this.connectionPool.markDead(connection)
|
||||
if (this.sniffOnConnectionFault === true) {
|
||||
this.sniff()
|
||||
}
|
||||
|
||||
if (attempts > 0) {
|
||||
debug(`Retrying request, there are still ${attempts} attempts`, params)
|
||||
params[kRemainingAttempts] = attempts - 1
|
||||
@ -73,6 +79,7 @@ class Transport {
|
||||
response.on('data', chunk => { json += chunk })
|
||||
response.on('error', err => callback(new ConnectionError(err.message)))
|
||||
response.on('end', () => {
|
||||
debug('JSON response', params, json)
|
||||
this.connectionPool.markAlive(connection)
|
||||
const contentType = response.headers['content-type']
|
||||
if (contentType != null && contentType.indexOf('application/json') > -1) {
|
||||
@ -106,32 +113,38 @@ class Transport {
|
||||
getConnection () {
|
||||
const now = Date.now()
|
||||
if (this._sniffEnabled === true && now > this._nextSniff) {
|
||||
this.sniff(now)
|
||||
this.sniff()
|
||||
}
|
||||
this.connectionPool.resurrect(now)
|
||||
return this.connectionPool.getConnection()
|
||||
}
|
||||
|
||||
sniff (now = Date.now(), callback = noop) {
|
||||
sniff (callback = noop) {
|
||||
if (this._isSniffing === true) return
|
||||
this._isSniffing = true
|
||||
debug('Started sniffing request')
|
||||
|
||||
this.request({
|
||||
method: 'GET',
|
||||
path: '_nodes/_all/http'
|
||||
path: this.sniffEndpoint
|
||||
}, (err, body) => {
|
||||
this._isSniffing = false
|
||||
if (this._sniffEnabled === true) {
|
||||
this._nextSniff = now + this.sniffInterval
|
||||
this._nextSniff = Date.now() + this.sniffInterval
|
||||
}
|
||||
if (err) {
|
||||
debug('Siffing errored', err)
|
||||
|
||||
if (err != null) {
|
||||
debug('Sniffing errored', err)
|
||||
return callback(err)
|
||||
}
|
||||
debug('Siffing ended successfully', body)
|
||||
|
||||
debug('Sniffing ended successfully', body)
|
||||
const hosts = this.connectionPool.nodesToHost(body.nodes)
|
||||
this.connectionPool
|
||||
.empty()
|
||||
.addConnection(hosts)
|
||||
callback()
|
||||
|
||||
callback(null, hosts)
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user