From 327f191056219388a69e39bc82ae3e54db36687d Mon Sep 17 00:00:00 2001 From: Spencer Alger Date: Thu, 6 Feb 2014 22:30:29 -0700 Subject: [PATCH] fixed and added a test for keepalive connection pool forced shutdown --- CHANGELOG.md | 4 ++-- docs/configuration.asciidoc | 11 ++++++---- src/lib/connectors/http.js | 26 ++++++++++++++++++++++- src/lib/transport.js | 35 ++++++++++++++++++++++++++----- test/fixtures/keepalive.js | 26 +++++++++++++++++++++++ test/unit/specs/http_connector.js | 22 +++++++++++++++++++ 6 files changed, 112 insertions(+), 12 deletions(-) create mode 100644 test/fixtures/keepalive.js diff --git a/CHANGELOG.md b/CHANGELOG.md index 206849094..af6c25b92 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,8 +2,8 @@ ## 1.5 (Feb 6 2014) - Switched out `keepaliveagent` dependency with `forever-agent`, which is used in the ever popular `request` module, and is much simpler. -- The option to use keep-alive is now all or nothing. `maxKeepAliveTime` and `maxKeepAliveRequests` config parameters have been replaced by `forever`, which will keep connections open "forever". See: http://www.elasticsearch.org/guide/en/elasticsearch/client/javascript-api/current/configuration.html#configuration -- Closing the client with `forever` turned on will allow the process to exit. #40 +- The option to use keep-alive is now all or nothing. `maxKeepAliveTime` and `maxKeepAliveRequests` config parameters have been replaced by `keepAlive`, which will keeps at least `minSockets` connections open forever. See: http://www.elasticsearch.org/guide/en/elasticsearch/client/javascript-api/current/configuration.html#configuration +- Closing the client with `keepAlive` turned on will allow the process to exit. https://github.com/elasticsearch/elasticsearch-js/issues/40 ## 1.4 (Jan 30 2014) - The trace log messages will now diaplay the actual host connected to (without auth info) unless they are being written to a bash script diff --git a/docs/configuration.asciidoc b/docs/configuration.asciidoc index 529994bfb..08adf1f91 100644 --- a/docs/configuration.asciidoc +++ b/docs/configuration.asciidoc @@ -102,18 +102,21 @@ Default::: `30000` +`keepAlive`:: `Boolean` -- Should the connections to the node be kept open forever? This behavior is recommended when you are connecting directly to Elasticsearch. + +Default::: `true` -`maxSockets`:: `Number` -- Number of sockets each connection should keep to it's corresponding node. This will also be the maximum number of concurrent requests that could be made to that node. + +`maxSockets`:: `Number` -- Maximum number of concurrent requests that can be made to any node. Default::: `10` +`minSockets`:: `Number` -- Minimum number of sockets to keep connected to a node, only applies when `keepAlive` is true -`forever`:: `Boolean` -- Should the connections to the node be kept open forever? This behavior is recommended when you are connecting directly to Elasticsearch. - -Default::: `true` +Default::: `10` diff --git a/src/lib/connectors/http.js b/src/lib/connectors/http.js index ef392d076..2e7660cf5 100644 --- a/src/lib/connectors/http.js +++ b/src/lib/connectors/http.js @@ -34,13 +34,19 @@ function HttpConnector(host, config) { } config = _.defaults(config || {}, { - forever: true, + keepAlive: true, + minSockets: 10, maxSockets: 10 }); var Agent = this.hand.Agent; // the class if (config.forever) { + config.keepAlive = config.forever; + } + + if (config.keepAlive) { Agent = this.host.protocol === 'https' ? ForeverAgent.SSL : ForeverAgent; + this.on('status set', this.bound.onStatusSet); } this.agent = new Agent({ @@ -49,6 +55,24 @@ function HttpConnector(host, config) { } _.inherits(HttpConnector, ConnectionAbstract); +HttpConnector.prototype.onStatusSet = _.handler(function (status) { + if (status === 'closed') { + this.agent.minSockets = this.agent.maxSockets = 0; + + _.each(this.agent.sockets, function (sockets) { + _.each(sockets, function (s) { + s.destroy(); + }); + }); + + _.each(this.agent.freeSockets, function (sockets) { + _.each(sockets, function (s) { + s.destroy(); + }); + }); + } +}); + HttpConnector.prototype.makeReqParams = function (params) { params = params || {}; var host = this.host; diff --git a/src/lib/transport.js b/src/lib/transport.js index 99fdd2d69..fab411a14 100644 --- a/src/lib/transport.js +++ b/src/lib/transport.js @@ -71,9 +71,9 @@ function Transport(config) { } if (config.sniffInterval) { - this._sniffTimeout = setTimeout(function doSniff() { + this._timeout(function doSniff() { self.sniff(); - self._sniffTimeout = setTimeout(doSniff, config.sniffInterval); + self._timeout(doSniff, config.sniffInterval); }, config.sniffInterval); } @@ -196,7 +196,7 @@ Transport.prototype.request = function (params, cb) { return; } - clearTimeout(requestTimeoutId); + self._timeout(requestTimeoutId); var parsedBody; var isJson = !headers || (headers['content-type'] && ~headers['content-type'].indexOf('application/json')); @@ -258,14 +258,14 @@ Transport.prototype.request = function (params, cb) { aborted = true; remainingRetries = 0; - clearTimeout(requestTimeoutId); + self._timeout(requestTimeoutId); if (typeof requestAborter === 'function') { requestAborter(); } } if (requestTimeout && requestTimeout !== Infinity) { - requestTimeoutId = setTimeout(function () { + requestTimeoutId = this._timeout(function () { respond(new errors.RequestTimeout('Request Timeout after ' + requestTimeout + 'ms')); abortRequest(); }, requestTimeout); @@ -292,7 +292,31 @@ Transport.prototype.request = function (params, cb) { return ret; }; +Transport.prototype._timeout = function (cb, delay) { + this._timers = this._timers || []; + var id; + if ('function' !== typeof cb) { + id = cb; + cb = void 0; + } + + if (cb) { + // set the timer + id = setTimeout(cb, delay); + this._timers.push(id); + return id; + } + + if (id) { + clearTimeout(id); + + var i = this._timers.indexOf(id); + if (i !== -1) { + this._timers.splice(i, 1); + } + } +}; /** * Ask an ES node for a list of all the nodes, add/remove nodes from the connection @@ -327,5 +351,6 @@ Transport.prototype.sniff = function (cb) { */ Transport.prototype.close = function () { this.log.close(); + _.each(this._timers, clearTimeout); this.connectionPool.close(); }; diff --git a/test/fixtures/keepalive.js b/test/fixtures/keepalive.js new file mode 100644 index 000000000..39affd68f --- /dev/null +++ b/test/fixtures/keepalive.js @@ -0,0 +1,26 @@ +var elasticsearch = require('../../src/elasticsearch'); +var _ = require('lodash'); +var es = elasticsearch.Client({ + host: 'localhost:5555', + log: false +}); + +es.search({ + index: '_all', + type: '_all', + body: { + query: { + match_all: {} + } + } +}, function (err, resp) { + var conn = _.union(es.transport.connectionPool._conns.dead, es.transport.connectionPool._conns.alive).pop(); + es.close(); + var destroyedSockets = 0; + function countDestroyed(sockets) { + destroyedSockets += _.where(sockets, { destroyed: true}).length; + } + _.each(conn.agent.sockets, countDestroyed); + _.each(conn.agent.freeSockets, countDestroyed); + console.log(destroyedSockets); +}); \ No newline at end of file diff --git a/test/unit/specs/http_connector.js b/test/unit/specs/http_connector.js index a1e63740e..12fe8d076 100644 --- a/test/unit/specs/http_connector.js +++ b/test/unit/specs/http_connector.js @@ -369,4 +369,26 @@ describe('Http Connector', function () { }); }); + describe('Connection cleanup', function () { + it('destroys any connections created', function (done) { + this.timeout(1000); + var cp = require('child_process'); + var path = require('path'); + var es = require('event-stream'); + + var proc = cp.fork(path.join(__dirname, '../../fixtures/keepalive.js'), { + silent: true + }); + + es.merge( + proc.stdout, + proc.stderr + ).pipe(es.wait(function (err, output) { + expect(err).to.eql(null); + expect(output.trim()).to.eql('1'); + done(); + })); + }); + }); + });