diff --git a/index.js b/index.js index 5691a6638..f71ecbf6a 100644 --- a/index.js +++ b/index.js @@ -1,5 +1,6 @@ 'use strict' +const { EventEmitter } = require('events') const Transport = require('./lib/Transport') const Connection = require('./lib/Connection') const ConnectionPool = require('./lib/ConnectionPool') @@ -17,16 +18,24 @@ const { kSelector } = symbols -class Client { +class Client extends EventEmitter { constructor (opts = {}) { + super() if (!opts.host) { throw new BadConfigurationError('Missing host option') } - // if (opts.log) { - // this.on('response', console.log) - // this.on('error', console.log) - // } + if (opts.log === true) { + this.on('request', console.log) + this.on('response', console.log) + this.on('error', console.log) + } + + // The logging is exposed via events, which the user can + // listen to and log the message its preferred way + // we add a fake listener to the error event to avoid + // the "unhandled error event" error. + this.on('error', () => {}) const Selector = selectors.RoundRobinSelector const options = Object.assign({}, { @@ -38,15 +47,22 @@ class Client { maxRetries: 3, requestTimeout: 30000, sniffInterval: false, - sniffOnStart: false + sniffOnStart: false, + ssl: null }, opts) this[kSelector] = new options.Selector() this[kSerializer] = new options.Serializer() this[kConnectionPool] = new options.ConnectionPool({ - selector: this[kSelector] + selector: this[kSelector], + ssl: options.ssl }) + + // Add the connections before initialize the Transport + this[kConnectionPool].addConnection(options.host) + this[kTransport] = new options.Transport({ + emit: this.emit.bind(this), connectionPool: this[kConnectionPool], serializer: this[kSerializer], maxRetries: options.maxRetries, @@ -65,7 +81,6 @@ class Client { // this[api] = apis[api] // }) - this[kConnectionPool].addConnection(options.host) } } diff --git a/lib/Connection.js b/lib/Connection.js index 8e9d74386..a08765d8a 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -1,31 +1,47 @@ 'use strict' const assert = require('assert') -const debug = require('debug')('elasticsearch') +const { Agent: HttpAgent } = require('http') +const { Agent: HttpsAgent } = require('https') const { resolve } = require('url') +const debug = require('debug')('elasticsearch') const makeRequest = require('simple-get') class Connection { constructor (opts = {}) { - assert(opts.url, 'Missing url') + assert(opts.host, 'Missing host data') - this.url = opts.url - this.id = opts.id || opts.url + this.host = opts.host + this.ssl = opts.host.ssl || opts.ssl || null + this.id = opts.id || opts.host.href this.deadCount = 0 this.resurrectTimeout = 0 this._status = opts.status || Connection.statuses.ALIVE this.roles = opts.roles || defaultRoles + + const agentOptions = Object.assign({}, { + keepAlive: true, + keepAliveMsecs: 1000, + maxSockets: Infinity, + maxFreeSockets: 256, + timeout: 60000 + }, opts.host.agent || opts.agent) + this._agent = this.host.protocol === 'http:' + ? new HttpAgent(agentOptions) + : new HttpsAgent(Object.assign({}, agentOptions, this.ssl)) } request (params, callback) { - params.url = resolve(this.url, params.path) + params.url = resolve(this.host.href, params.path) + params.agent = this._agent debug('Starting a new request', params) return makeRequest(params, callback) } close () { - debug('Closing connection') + debug('Closing connection', this.id) + this._agent.destroy() } setRole (role, enabled) { diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index a1a1c3637..7e16b5269 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -1,5 +1,6 @@ 'use strict' +const { URL } = require('url') const debug = require('debug')('elasticsearch') const Connection = require('./Connection') @@ -9,6 +10,7 @@ class ConnectionPool { this.alive = [] this.dead = [] this.selector = opts.selector + this.sll = opts.sll // the resurrect timeout is 60s // we multiply it by 2 because the resurrect formula is // `Math.pow(resurrectTimeout * 2, deadCount -1)` @@ -132,16 +134,17 @@ class ConnectionPool { * @param {object|string} host * @returns {ConnectionPool} */ - addConnection (host) { - if (Array.isArray(host)) { - host.forEach(h => this.addConnection(h)) + addConnection (opts) { + if (Array.isArray(opts)) { + opts.forEach(o => this.addConnection(o)) return } - if (typeof host === 'string') { - host = this.urlToHost(host) + if (typeof opts === 'string') { + opts = this.urlToHost(opts) } - const connection = new Connection(host) + Object.assign(opts, this.ssl) + const connection = new Connection(opts) debug('Adding a new connection', connection) this.connections.set(connection.id, connection) this.alive.push(connection.id) @@ -156,6 +159,7 @@ class ConnectionPool { */ removeConnection (connection) { debug('Removing connection', connection) + connection.close() const { id } = connection this.connections.delete(id) var index = this.dead.indexOf(id) @@ -193,8 +197,14 @@ class ConnectionPool { for (var i = 0, len = ids.length; i < len; i++) { const node = nodes[ids[i]] + // If there is no protocol in + // the `publish_address` new URL wil throw + var address = node.http.publish_address + address = address.slice(0, 4) === 'http' + ? address + : 'http://' + address hosts.push({ - url: node.http.publish_address, + host: new URL(address), id: ids[i], roles: node.roles.reduce((acc, role) => { acc[role] = true @@ -214,8 +224,7 @@ class ConnectionPool { */ urlToHost (url) { return { - id: url, - url + host: new URL(url) } } } diff --git a/lib/Transport.js b/lib/Transport.js index eb8531c3f..3c962f2df 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -14,6 +14,7 @@ const kRemainingAttempts = Symbol('elasticsearch-remaining-attempts') class Transport { constructor (opts = {}) { + this.emit = opts.emit this.connectionPool = opts.connectionPool this.serializer = opts.serializer this.maxRetries = opts.maxRetries @@ -37,7 +38,7 @@ class Transport { return callback(new NoLivingConnectionsError('There are not living connections')) } - if (params.body !== null) { + if (params.body != null) { try { params.body = this.serializer.serialize(params.body) } catch (err) { @@ -48,6 +49,7 @@ class Transport { params.headers['Content-Length'] = '' + Buffer.byteLength(params.body) } + this.emit('request', params) const request = connection.request(params, (err, response) => { if (err != null) { this.connectionPool.markDead(connection) @@ -57,11 +59,12 @@ class Transport { return this.request(params, callback) } - if (err.message === 'Request timed out') { - return callback(new TimeoutError(err.message)) - } else { - return callback(new ConnectionError(err.message)) - } + const error = err.message === 'Request timed out' + ? new TimeoutError(err.message) + : ConnectionError(err.message) + + this.emit('error', error, params) + return callback(error) } var json = '' @@ -73,9 +76,11 @@ class Transport { try { var payload = this.serializer.deserialize(json) } catch (err) { + this.emit('error', err) return callback(err) } const { statusCode } = response + this.emit('response', params, { statusCode, payload }) if (statusCode >= 400) { callback(new ResponseError(payload)) } else {