WIP: initial prototype

- Use plain node http.request
- WIP: connection closing procedure
- Updated ConnectionPool.getConnection
- Removed Selector class and expose a single configuration parameter
- Added compression handling
- Improved code documentation
This commit is contained in:
delvedor
2018-11-05 19:19:39 +01:00
parent 2d3701d7e9
commit d77d97d301
5 changed files with 230 additions and 58 deletions

View File

@ -1,21 +1,23 @@
'use strict'
const assert = require('assert')
const { Agent: HttpAgent } = require('http')
const { Agent: HttpsAgent } = require('https')
const http = require('http')
const https = require('https')
const debug = require('debug')('elasticsearch')
const makeRequest = require('simple-get')
const decompressResponse = require('decompress-response')
const { TimeoutError } = require('./errors')
class Connection {
constructor (opts = {}) {
assert(opts.host, 'Missing host data')
this.host = opts.host
this.host = urlToOptions(opts.host)
this.ssl = opts.host.ssl || opts.ssl || null
this.id = opts.id || opts.host.href
this.deadCount = 0
this.resurrectTimeout = 0
this._openRequests = 0
this._status = opts.status || Connection.statuses.ALIVE
this.roles = opts.roles || defaultRoles
@ -26,23 +28,67 @@ class Connection {
maxFreeSockets: 256
}, opts.host.agent || opts.agent)
this._agent = this.host.protocol === 'http:'
? new HttpAgent(agentOptions)
: new HttpsAgent(Object.assign({}, agentOptions, this.ssl))
? new http.Agent(agentOptions)
: new https.Agent(Object.assign({}, agentOptions, this.ssl))
this.makeRequest = this.host.protocol === 'http:'
? http.request
: https.request
}
request (params, callback) {
params.url = resolve(this.host.href, params.path)
if (params.querystring != null && params.querystring.length > 0) {
params.url += '?' + params.querystring
}
this._openRequests++
var ended = false
params.agent = this._agent
debug('Starting a new request', params)
return makeRequest(params, callback)
const request = this.makeRequest(buildRequestObject(this.host, params))
// listen for the response event
// TODO: handle redirects?
request.on('response', response => {
if (ended === false) {
ended = true
this._openRequests--
callback(null, decompressResponse(response))
}
})
// handles request timeout
request.on('timeout', () => {
if (ended === false) {
ended = true
this._openRequests--
request.abort()
callback(new TimeoutError('Request timed out', params))
}
})
// handles request error
request.on('error', err => {
if (ended === false) {
ended = true
this._openRequests--
callback(err)
}
})
// Disables the Nagle algorithm
request.setNoDelay(true)
// starts the request
request.end(params.body)
return request
}
close () {
debug('Closing connection', this.id)
this._agent.destroy()
if (this._openRequests > 0) {
setTimeout(() => this.close(), 1000)
} else {
this._agent.destroy()
}
}
setRole (role, enabled) {
@ -110,4 +156,58 @@ function resolve (host, path) {
}
}
function buildRequestObject (host, request) {
var merged = {}
var hostKeys = Object.keys(host)
var requestKeys = Object.keys(request)
for (var i = 0, len = hostKeys.length; i < len; i++) {
var key = hostKeys[i]
merged[key] = host[key]
}
for (i = 0, len = requestKeys.length; i < len; i++) {
key = requestKeys[i]
if (key === 'path') {
merged.pathname = resolve(merged.pathname, request[key])
} else if (key === 'querystring' && !!request[key] === true) {
if (merged.search === '') {
merged.search = '?' + request[key]
} else {
merged.search += '&' + request[key]
}
} else {
merged[key] = request[key]
}
}
merged.path = merged.pathname + merged.search
return merged
}
// Utility function that converts a URL object into an ordinary
// options object as expected by the http.request and https.request APIs.
// https://github.com/nodejs/node/blob/v11.0.0/lib/internal/url.js#L1324
function urlToOptions (url) {
var options = {
protocol: url.protocol,
hostname: url.hostname.startsWith('[')
? url.hostname.slice(1, -1)
: url.hostname,
hash: url.hash,
search: url.search,
pathname: url.pathname,
path: `${url.pathname}${url.search}`,
href: url.href
}
if (url.port !== '') {
options.port = Number(url.port)
}
if (url.username || url.password) {
options.auth = `${url.username}:${url.password}`
}
return options
}
module.exports = Connection

View File

@ -9,6 +9,8 @@ const noop = () => {}
class ConnectionPool {
constructor (opts = {}) {
this.connections = new Map()
// TODO: do we need those queue? (or find a better use)
// can we use just the connections map?
this.alive = []
this.dead = []
this.selector = opts.selector
@ -24,7 +26,19 @@ class ConnectionPool {
// the timeout doesn't increase
this.resurrectTimeoutCutoff = 5
this.pingTimeout = opts.pingTimeout
this.randomizeHost = opts.randomizeHost
this.randomizeHost = opts.randomizeHost === true
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
this.nodeWeighter = opts.nodeWeighter || noop
if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector
} else if (opts.nodeSelector === 'round-robin') {
this.nodeSelector = roundRobinSelector()
} else if (opts.nodeSelector === 'random') {
this.nodeSelector = randomSelector
} else {
this.nodeSelector = roundRobinSelector()
}
const resurrectStrategy = opts.resurrectStrategy || 'ping'
this.resurrectStrategy = ConnectionPool.resurrectStrategies[resurrectStrategy]
@ -150,18 +164,35 @@ class ConnectionPool {
/**
* Returns an alive connection if present,
* otherwise returns null.
* By default it does not apply a weighter to the node list,
* and filters the `master` only nodes
* It uses the selector to choose which
* connection return.
*
* @param {object} options (weighter, filter and selector)
* @returns {object|null} connection
*/
getConnection () {
getConnection (opts = {}) {
if (this.alive.length === 0) {
return null
}
const id = this.selector.select(this.alive)
return this.connections.get(id)
const filter = opts.filter || this.nodeFilter
const weighter = opts.weighter || this.nodeWeighter
const selector = opts.selector || this.nodeSelector
// TODO: can we cache this?
const connections = []
for (var connection of this.connections.values()) {
if (connection.status === Connection.statuses.ALIVE) {
if (filter(connection) === true) {
connections.push(connection)
}
}
}
connections.sort(weighter)
return selector(connections)
}
/**
@ -285,4 +316,30 @@ const shuffleArray = arr => arr
.sort((a, b) => a[0] - b[0])
.map(a => a[1])
function defaultNodeFilter (node) {
// avoid master only nodes
if (!!node.master === true &&
!!node.data === false &&
!!node.ingest === false) {
return false
}
return true
}
function roundRobinSelector () {
var current = -1
return function _roundRobinSelector (connections) {
if (++current >= connections.length) {
current = 0
}
return connections[current]
}
}
function randomSelector (connections) {
const index = Math.floor(Math.random() * connections.length)
return connections[index]
}
module.exports = ConnectionPool
module.exports.internals = { defaultNodeFilter, roundRobinSelector, randomSelector }

View File

@ -1,23 +0,0 @@
'use strict'
class RoundRobinSelector {
constructor () {
this.current = -1
}
select (connections) {
if (++this.current >= connections.length) {
this.current = 0
}
return connections[this.current]
}
}
class RandomSelector {
select (connections) {
const index = Math.floor(Math.random() * connections.length)
return connections[index]
}
}
module.exports = { RoundRobinSelector, RandomSelector }

View File

@ -20,6 +20,7 @@ class Transport {
this.serializer = opts.serializer
this.maxRetries = opts.maxRetries
this.requestTimeout = toMs(opts.requestTimeout)
this.suggestCompression = opts.suggestCompression === true
this.sniffInterval = opts.sniffInterval
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
this.sniffEndpoint = opts.sniffEndpoint
@ -35,13 +36,14 @@ class Transport {
request (params, callback) {
callback = once(callback)
const result = { body: null, statusCode: null, headers: null }
const result = { body: null, statusCode: null, headers: null, warnings: null }
const attempts = params[kRemainingAttempts] || params.maxRetries || this.maxRetries
const connection = this.getConnection()
if (connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'), result)
}
params.headers = params.headers || {}
// handle json body
if (params.body != null) {
if (typeof params.body !== 'string') {
@ -51,7 +53,6 @@ class Transport {
return callback(err, result)
}
}
params.headers = params.headers || {}
params.headers['Content-Type'] = 'application/json'
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
// handle ndjson body
@ -65,11 +66,14 @@ class Transport {
} else {
params.body = params.bulkBody
}
params.headers = params.headers || {}
params.headers['Content-Type'] = 'application/x-ndjson'
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
if (this.suggestCompression === true) {
params.headers['Accept-Encoding'] = 'gzip,deflate'
}
// serializes the querystring
params.querystring = this.serializer.qserialize(params.querystring)
// handles request timeout
@ -77,21 +81,26 @@ class Transport {
this.emit('request', params)
// perform the actual http request
const request = connection.request(params, (err, response) => {
if (err != null) {
// if there is an error in the connection
// let's mark the connection as dead
this.connectionPool.markDead(connection)
if (this.sniffOnConnectionFault === true) {
this.sniff()
}
// retry logic
if (attempts > 0) {
debug(`Retrying request, there are still ${attempts} attempts`, params)
params[kRemainingAttempts] = attempts - 1
return this.request(params, callback)
}
const error = err.message === 'Request timed out'
? new TimeoutError(err.message, params)
const error = err instanceof TimeoutError
? err
: new ConnectionError(err.message, params)
this.emit('error', error, params)
@ -101,30 +110,47 @@ class Transport {
const { statusCode, headers } = response
result.statusCode = statusCode
result.headers = headers
if (headers['warning'] != null) {
// split the string over the commas not inside quotes
result.warnings = headers['warning'].split(/(?!\B"[^"]*),(?![^"]*"\B)/)
}
var payload = ''
// collect the payload
response.setEncoding('utf8')
response.on('data', chunk => { payload += chunk })
response.on('error', err => callback(new ConnectionError(err.message, params), result))
response.on('end', () => {
const isHead = params.method === 'HEAD'
const shouldDeserialize = headers['content-type'] != null && isHead === false && payload !== ''
if (shouldDeserialize === true && headers['content-type'].indexOf('application/json') > -1) {
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (headers['content-type'] != null &&
headers['content-type'].indexOf('application/json') > -1 &&
isHead === false &&
payload !== ''
) {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('error', err)
this.emit('error', err, params)
return callback(err, result)
}
} else {
// cast to boolean if the request method was HEAD
result.body = isHead === true ? true : payload
}
// we should ignore the statusCode if the user has configured the `ignore` field with
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
const ignoreStatusCode = (Array.isArray(params.ignore) && params.ignore.indexOf(statusCode) > -1) ||
(isHead === true && statusCode === 404)
if (ignoreStatusCode === false &&
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(connection)
if (attempts > 0) {
debug(`Retrying request, there are still ${attempts} attempts`, params)
@ -132,6 +158,8 @@ class Transport {
return this.request(params, callback)
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(connection)
}
@ -139,6 +167,7 @@ class Transport {
if (ignoreStatusCode === false && statusCode >= 400) {
callback(new ResponseError(result), result)
} else {
// cast to boolean if the request method was HEAD
if (isHead === true && statusCode === 404) {
result.body = false
}
@ -179,6 +208,7 @@ class Transport {
}
if (err != null) {
this.emit('error', err)
debug('Sniffing errored', err)
return callback(err)
}