fixed and added a test for keepalive connection pool forced shutdown

This commit is contained in:
Spencer Alger
2014-02-06 22:30:29 -07:00
parent 7bb19ac357
commit 327f191056
6 changed files with 112 additions and 12 deletions

View File

@ -2,8 +2,8 @@
## 1.5 (Feb 6 2014) ## 1.5 (Feb 6 2014)
- Switched out `keepaliveagent` dependency with `forever-agent`, which is used in the ever popular `request` module, and is much simpler. - Switched out `keepaliveagent` dependency with `forever-agent`, which is used in the ever popular `request` module, and is much simpler.
- The option to use keep-alive is now all or nothing. `maxKeepAliveTime` and `maxKeepAliveRequests` config parameters have been replaced by `forever`, which will keep connections open "forever". See: http://www.elasticsearch.org/guide/en/elasticsearch/client/javascript-api/current/configuration.html#configuration - The option to use keep-alive is now all or nothing. `maxKeepAliveTime` and `maxKeepAliveRequests` config parameters have been replaced by `keepAlive`, which will keeps at least `minSockets` connections open forever. See: http://www.elasticsearch.org/guide/en/elasticsearch/client/javascript-api/current/configuration.html#configuration
- Closing the client with `forever` turned on will allow the process to exit. #40 - Closing the client with `keepAlive` turned on will allow the process to exit. https://github.com/elasticsearch/elasticsearch-js/issues/40
## 1.4 (Jan 30 2014) ## 1.4 (Jan 30 2014)
- The trace log messages will now diaplay the actual host connected to (without auth info) unless they are being written to a bash script - The trace log messages will now diaplay the actual host connected to (without auth info) unless they are being written to a bash script

View File

@ -102,18 +102,21 @@ Default::: `30000`
`keepAlive`:: `Boolean` -- Should the connections to the node be kept open forever? This behavior is recommended when you are connecting directly to Elasticsearch.
Default::: `true`
`maxSockets`:: `Number` -- Number of sockets each connection should keep to it's corresponding node. This will also be the maximum number of concurrent requests that could be made to that node.
`maxSockets`:: `Number` -- Maximum number of concurrent requests that can be made to any node.
Default::: `10` Default::: `10`
`minSockets`:: `Number` -- Minimum number of sockets to keep connected to a node, only applies when `keepAlive` is true
`forever`:: `Boolean` -- Should the connections to the node be kept open forever? This behavior is recommended when you are connecting directly to Elasticsearch. Default::: `10`
Default::: `true`

View File

@ -34,13 +34,19 @@ function HttpConnector(host, config) {
} }
config = _.defaults(config || {}, { config = _.defaults(config || {}, {
forever: true, keepAlive: true,
minSockets: 10,
maxSockets: 10 maxSockets: 10
}); });
var Agent = this.hand.Agent; // the class var Agent = this.hand.Agent; // the class
if (config.forever) { if (config.forever) {
config.keepAlive = config.forever;
}
if (config.keepAlive) {
Agent = this.host.protocol === 'https' ? ForeverAgent.SSL : ForeverAgent; Agent = this.host.protocol === 'https' ? ForeverAgent.SSL : ForeverAgent;
this.on('status set', this.bound.onStatusSet);
} }
this.agent = new Agent({ this.agent = new Agent({
@ -49,6 +55,24 @@ function HttpConnector(host, config) {
} }
_.inherits(HttpConnector, ConnectionAbstract); _.inherits(HttpConnector, ConnectionAbstract);
HttpConnector.prototype.onStatusSet = _.handler(function (status) {
if (status === 'closed') {
this.agent.minSockets = this.agent.maxSockets = 0;
_.each(this.agent.sockets, function (sockets) {
_.each(sockets, function (s) {
s.destroy();
});
});
_.each(this.agent.freeSockets, function (sockets) {
_.each(sockets, function (s) {
s.destroy();
});
});
}
});
HttpConnector.prototype.makeReqParams = function (params) { HttpConnector.prototype.makeReqParams = function (params) {
params = params || {}; params = params || {};
var host = this.host; var host = this.host;

View File

@ -71,9 +71,9 @@ function Transport(config) {
} }
if (config.sniffInterval) { if (config.sniffInterval) {
this._sniffTimeout = setTimeout(function doSniff() { this._timeout(function doSniff() {
self.sniff(); self.sniff();
self._sniffTimeout = setTimeout(doSniff, config.sniffInterval); self._timeout(doSniff, config.sniffInterval);
}, config.sniffInterval); }, config.sniffInterval);
} }
@ -196,7 +196,7 @@ Transport.prototype.request = function (params, cb) {
return; return;
} }
clearTimeout(requestTimeoutId); self._timeout(requestTimeoutId);
var parsedBody; var parsedBody;
var isJson = !headers || (headers['content-type'] && ~headers['content-type'].indexOf('application/json')); var isJson = !headers || (headers['content-type'] && ~headers['content-type'].indexOf('application/json'));
@ -258,14 +258,14 @@ Transport.prototype.request = function (params, cb) {
aborted = true; aborted = true;
remainingRetries = 0; remainingRetries = 0;
clearTimeout(requestTimeoutId); self._timeout(requestTimeoutId);
if (typeof requestAborter === 'function') { if (typeof requestAborter === 'function') {
requestAborter(); requestAborter();
} }
} }
if (requestTimeout && requestTimeout !== Infinity) { if (requestTimeout && requestTimeout !== Infinity) {
requestTimeoutId = setTimeout(function () { requestTimeoutId = this._timeout(function () {
respond(new errors.RequestTimeout('Request Timeout after ' + requestTimeout + 'ms')); respond(new errors.RequestTimeout('Request Timeout after ' + requestTimeout + 'ms'));
abortRequest(); abortRequest();
}, requestTimeout); }, requestTimeout);
@ -292,7 +292,31 @@ Transport.prototype.request = function (params, cb) {
return ret; return ret;
}; };
Transport.prototype._timeout = function (cb, delay) {
this._timers = this._timers || [];
var id;
if ('function' !== typeof cb) {
id = cb;
cb = void 0;
}
if (cb) {
// set the timer
id = setTimeout(cb, delay);
this._timers.push(id);
return id;
}
if (id) {
clearTimeout(id);
var i = this._timers.indexOf(id);
if (i !== -1) {
this._timers.splice(i, 1);
}
}
};
/** /**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection * Ask an ES node for a list of all the nodes, add/remove nodes from the connection
@ -327,5 +351,6 @@ Transport.prototype.sniff = function (cb) {
*/ */
Transport.prototype.close = function () { Transport.prototype.close = function () {
this.log.close(); this.log.close();
_.each(this._timers, clearTimeout);
this.connectionPool.close(); this.connectionPool.close();
}; };

26
test/fixtures/keepalive.js vendored Normal file
View File

@ -0,0 +1,26 @@
var elasticsearch = require('../../src/elasticsearch');
var _ = require('lodash');
var es = elasticsearch.Client({
host: 'localhost:5555',
log: false
});
es.search({
index: '_all',
type: '_all',
body: {
query: {
match_all: {}
}
}
}, function (err, resp) {
var conn = _.union(es.transport.connectionPool._conns.dead, es.transport.connectionPool._conns.alive).pop();
es.close();
var destroyedSockets = 0;
function countDestroyed(sockets) {
destroyedSockets += _.where(sockets, { destroyed: true}).length;
}
_.each(conn.agent.sockets, countDestroyed);
_.each(conn.agent.freeSockets, countDestroyed);
console.log(destroyedSockets);
});

View File

@ -369,4 +369,26 @@ describe('Http Connector', function () {
}); });
}); });
describe('Connection cleanup', function () {
it('destroys any connections created', function (done) {
this.timeout(1000);
var cp = require('child_process');
var path = require('path');
var es = require('event-stream');
var proc = cp.fork(path.join(__dirname, '../../fixtures/keepalive.js'), {
silent: true
});
es.merge(
proc.stdout,
proc.stderr
).pipe(es.wait(function (err, output) {
expect(err).to.eql(null);
expect(output.trim()).to.eql('1');
done();
}));
});
});
}); });