WIP: initial prototype

- Added keep-alive Agent
- Added support for HTTPS
- Added log events
This commit is contained in:
delvedor
2018-10-19 15:04:07 +02:00
parent d0ac6b1f52
commit 68cca6069b
4 changed files with 74 additions and 29 deletions

View File

@ -1,5 +1,6 @@
'use strict'
const { EventEmitter } = require('events')
const Transport = require('./lib/Transport')
const Connection = require('./lib/Connection')
const ConnectionPool = require('./lib/ConnectionPool')
@ -17,16 +18,24 @@ const {
kSelector
} = symbols
class Client {
class Client extends EventEmitter {
constructor (opts = {}) {
super()
if (!opts.host) {
throw new BadConfigurationError('Missing host option')
}
// if (opts.log) {
// this.on('response', console.log)
// this.on('error', console.log)
// }
if (opts.log === true) {
this.on('request', console.log)
this.on('response', console.log)
this.on('error', console.log)
}
// The logging is exposed via events, which the user can
// listen to and log the message its preferred way
// we add a fake listener to the error event to avoid
// the "unhandled error event" error.
this.on('error', () => {})
const Selector = selectors.RoundRobinSelector
const options = Object.assign({}, {
@ -38,15 +47,22 @@ class Client {
maxRetries: 3,
requestTimeout: 30000,
sniffInterval: false,
sniffOnStart: false
sniffOnStart: false,
ssl: null
}, opts)
this[kSelector] = new options.Selector()
this[kSerializer] = new options.Serializer()
this[kConnectionPool] = new options.ConnectionPool({
selector: this[kSelector]
selector: this[kSelector],
ssl: options.ssl
})
// Add the connections before initialize the Transport
this[kConnectionPool].addConnection(options.host)
this[kTransport] = new options.Transport({
emit: this.emit.bind(this),
connectionPool: this[kConnectionPool],
serializer: this[kSerializer],
maxRetries: options.maxRetries,
@ -65,7 +81,6 @@ class Client {
// this[api] = apis[api]
// })
this[kConnectionPool].addConnection(options.host)
}
}

View File

@ -1,31 +1,47 @@
'use strict'
const assert = require('assert')
const debug = require('debug')('elasticsearch')
const { Agent: HttpAgent } = require('http')
const { Agent: HttpsAgent } = require('https')
const { resolve } = require('url')
const debug = require('debug')('elasticsearch')
const makeRequest = require('simple-get')
class Connection {
constructor (opts = {}) {
assert(opts.url, 'Missing url')
assert(opts.host, 'Missing host data')
this.url = opts.url
this.id = opts.id || opts.url
this.host = opts.host
this.ssl = opts.host.ssl || opts.ssl || null
this.id = opts.id || opts.host.href
this.deadCount = 0
this.resurrectTimeout = 0
this._status = opts.status || Connection.statuses.ALIVE
this.roles = opts.roles || defaultRoles
const agentOptions = Object.assign({}, {
keepAlive: true,
keepAliveMsecs: 1000,
maxSockets: Infinity,
maxFreeSockets: 256,
timeout: 60000
}, opts.host.agent || opts.agent)
this._agent = this.host.protocol === 'http:'
? new HttpAgent(agentOptions)
: new HttpsAgent(Object.assign({}, agentOptions, this.ssl))
}
request (params, callback) {
params.url = resolve(this.url, params.path)
params.url = resolve(this.host.href, params.path)
params.agent = this._agent
debug('Starting a new request', params)
return makeRequest(params, callback)
}
close () {
debug('Closing connection')
debug('Closing connection', this.id)
this._agent.destroy()
}
setRole (role, enabled) {

View File

@ -1,5 +1,6 @@
'use strict'
const { URL } = require('url')
const debug = require('debug')('elasticsearch')
const Connection = require('./Connection')
@ -9,6 +10,7 @@ class ConnectionPool {
this.alive = []
this.dead = []
this.selector = opts.selector
this.sll = opts.sll
// the resurrect timeout is 60s
// we multiply it by 2 because the resurrect formula is
// `Math.pow(resurrectTimeout * 2, deadCount -1)`
@ -132,16 +134,17 @@ class ConnectionPool {
* @param {object|string} host
* @returns {ConnectionPool}
*/
addConnection (host) {
if (Array.isArray(host)) {
host.forEach(h => this.addConnection(h))
addConnection (opts) {
if (Array.isArray(opts)) {
opts.forEach(o => this.addConnection(o))
return
}
if (typeof host === 'string') {
host = this.urlToHost(host)
if (typeof opts === 'string') {
opts = this.urlToHost(opts)
}
const connection = new Connection(host)
Object.assign(opts, this.ssl)
const connection = new Connection(opts)
debug('Adding a new connection', connection)
this.connections.set(connection.id, connection)
this.alive.push(connection.id)
@ -156,6 +159,7 @@ class ConnectionPool {
*/
removeConnection (connection) {
debug('Removing connection', connection)
connection.close()
const { id } = connection
this.connections.delete(id)
var index = this.dead.indexOf(id)
@ -193,8 +197,14 @@ class ConnectionPool {
for (var i = 0, len = ids.length; i < len; i++) {
const node = nodes[ids[i]]
// If there is no protocol in
// the `publish_address` new URL wil throw
var address = node.http.publish_address
address = address.slice(0, 4) === 'http'
? address
: 'http://' + address
hosts.push({
url: node.http.publish_address,
host: new URL(address),
id: ids[i],
roles: node.roles.reduce((acc, role) => {
acc[role] = true
@ -214,8 +224,7 @@ class ConnectionPool {
*/
urlToHost (url) {
return {
id: url,
url
host: new URL(url)
}
}
}

View File

@ -14,6 +14,7 @@ const kRemainingAttempts = Symbol('elasticsearch-remaining-attempts')
class Transport {
constructor (opts = {}) {
this.emit = opts.emit
this.connectionPool = opts.connectionPool
this.serializer = opts.serializer
this.maxRetries = opts.maxRetries
@ -37,7 +38,7 @@ class Transport {
return callback(new NoLivingConnectionsError('There are not living connections'))
}
if (params.body !== null) {
if (params.body != null) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
@ -48,6 +49,7 @@ class Transport {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
this.emit('request', params)
const request = connection.request(params, (err, response) => {
if (err != null) {
this.connectionPool.markDead(connection)
@ -57,11 +59,12 @@ class Transport {
return this.request(params, callback)
}
if (err.message === 'Request timed out') {
return callback(new TimeoutError(err.message))
} else {
return callback(new ConnectionError(err.message))
}
const error = err.message === 'Request timed out'
? new TimeoutError(err.message)
: ConnectionError(err.message)
this.emit('error', error, params)
return callback(error)
}
var json = ''
@ -73,9 +76,11 @@ class Transport {
try {
var payload = this.serializer.deserialize(json)
} catch (err) {
this.emit('error', err)
return callback(err)
}
const { statusCode } = response
this.emit('response', params, { statusCode, payload })
if (statusCode >= 400) {
callback(new ResponseError(payload))
} else {