- grunt watch will now abort mid task

- connection's ping method now accepts requestTimeout, path, and method params like
  all the grown-up API calls
- ConnectionPool now managed connection timeouts. When a connection dies a
  timeout object is created to track when the timeout is scheduled and the function
  to call when it does. It also tracks how many times it has run to allow the timeout
  to grow
- Timeouts now grow with use of `config.calcDeadTimeout` which is set to 'exponential'
  by default, but can also be set to flat in order to always use the standard
  deadTimeout. Exponential growth of the deadTimeout is stopped at config.maxDeadTimeout
  which is set to 30 minutes by default.
- Connections no longer have a resuscitate method (too hard to spell). Now the
  method is created dynamically as a part of the timeout object as it just calls
  the connection's ping method and needed to access variables like revive attempts.
- Timeouts were moved to the transport layer, meaning that you need to capture the
  abort method and abort the request yourself if you are handling connections
  directly, ConnectionsAbstract's ping method does this.
This commit is contained in:
Spencer Alger
2013-12-16 02:35:28 -07:00
parent 54129ac2b4
commit cf3be01c57
12 changed files with 445 additions and 198 deletions

View File

@ -232,14 +232,13 @@ function exec(transport, spec, params, cb) {
if (params.hasOwnProperty(key) && params[key] != null) {
switch (key) {
case 'body':
request.body = params[key];
case 'requestTimeout':
case 'maxRetries':
request[key] = params[key];
break;
case 'ignore':
request.ignore = _.isArray(params[key]) ? params[key] : [params[key]];
break;
case 'requestTimeout':
request.requestTimeout = params[key];
break;
case 'method':
request.method = _.toUpperString(params[key]);
break;

View File

@ -4,11 +4,7 @@ var _ = require('./utils');
var EventEmitter = require('events').EventEmitter;
var Log = require('./log');
var Host = require('./host');
var defaults = {
deadTimeout: 30000,
requestTimeout: 10000
};
var errors = require('./errors');
/**
* Abstract class used for Connection classes
@ -16,14 +12,10 @@ var defaults = {
* @constructor
*/
function ConnectionAbstract(host, config) {
config = config || {};
EventEmitter.call(this);
config = _.defaults(config || {}, defaults);
this.deadTimeout = config.deadTimeout;
this.requestTimeout = config.requestTimeout;
this.requestCount = 0;
this.requestTimeout = config.hasOwnProperty('requestTimeout') ? config.requestTimeout : 30000;
this.log = config.log || new Log();
if (!host) {
@ -52,49 +44,56 @@ ConnectionAbstract.prototype.request = function () {
throw new Error('Connection#request must be overwritten by the Connector');
};
ConnectionAbstract.prototype.ping = function (cb) {
if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
ConnectionAbstract.prototype.ping = function (params, cb) {
if (typeof params === 'function') {
cb = params;
params = null;
} else {
cb = typeof cb === 'function' ? cb : null;
}
return this.request({
var requestTimeout = 100;
var requestTimeoutId;
var aborted;
var abort;
if (params && params.hasOwnProperty('requestTimeout')) {
requestTimeout = params.requestTimeout;
}
abort = this.request(_.defaults(params || {}, {
path: '/',
method: 'HEAD',
requestTimeout: 100
}, cb);
method: 'HEAD'
}), function (err) {
if (aborted) {
return;
}
clearTimeout(requestTimeoutId);
if (cb) {
cb(err);
}
});
if (requestTimeout) {
requestTimeoutId = setTimeout(function () {
if (abort) {
abort();
}
aborted = true;
if (cb) {
cb(new errors.RequestTimeout('Ping Timeout after ' + requestTimeout + 'ms'));
}
}, requestTimeout);
}
};
ConnectionAbstract.prototype.setStatus = function (status) {
var origStatus = this.status;
this.status = status;
if (this._deadTimeoutId) {
clearTimeout(this._deadTimeoutId);
this._deadTimeoutId = null;
}
if (status === 'dead') {
this._deadTimeoutId = setTimeout(this.bound.resuscitate, this.deadTimeout);
}
this.emit('status set', status, origStatus, this);
if (status === 'closed') {
this.removeAllListeners();
}
};
ConnectionAbstract.prototype.resuscitate = _.scheduled(function () {
var self = this;
if (self.status === 'dead') {
self.ping(function (err) {
if (!err) {
self.setStatus('alive');
} else {
self.setStatus('dead');
}
});
}
});
};

View File

@ -13,13 +13,17 @@ var _ = require('./utils');
var Log = require('./log');
function ConnectionPool(config) {
config = config || {};
_.makeBoundMethods(this);
this.log = config.log;
if (!this.log) {
if (!config.log) {
this.log = new Log();
config.log = this.log;
} else {
this.log = config.log;
}
// we will need this when we create connections down the road
this._config = config;
// get the selector config var
@ -29,6 +33,11 @@ function ConnectionPool(config) {
this.Connection = _.funcEnum(config, 'connectionClass', ConnectionPool.connectionClasses,
ConnectionPool.defaultConnectionClass);
// time that connections will wait before being revived
this.deadTimeout = config.hasOwnProperty('deadTimeout') ? config.deadTimeout : 60000;
this.maxDeadTimeout = config.hasOwnProperty('maxDeadTimeout') ? config.maxDeadTimeout : 18e5;
this.calcDeadTimeout = _.funcEnum(config, 'calcDeadTimeout', ConnectionPool.calcDeadTimeoutOptions, 'exponential');
// a map of connections to their "id" property, used when sniffing
this.index = {};
@ -36,6 +45,9 @@ function ConnectionPool(config) {
alive: [],
dead: []
};
// information about timeouts for dead connections
this._timeouts = [];
}
// selector options
@ -47,6 +59,16 @@ ConnectionPool.connectionClasses = require('./connectors');
ConnectionPool.defaultConnectionClass = ConnectionPool.connectionClasses._default;
delete ConnectionPool.connectionClasses._default;
// the function that calculates timeouts based on attempts
ConnectionPool.calcDeadTimeoutOptions = {
flat: function (attempt, baseTimeout) {
return baseTimeout;
},
exponential: function (attempt, baseTimeout) {
return Math.min(baseTimeout * 2 * Math.pow(2, (attempt * 0.5 - 1)), this.maxDeadTimeout);
}
};
/**
* Selects a connection from the list using the this.selector
* Features:
@ -69,73 +91,177 @@ ConnectionPool.prototype.select = function (cb) {
cb(e);
}
}
} else if (this._timeouts.length) {
this._selectDeadConnection(cb);
} else {
_.nextTick(cb, null, this.getConnection());
_.nextTick(cb, null);
}
};
/**
* Handler for the "set status" event emitted but the connections. It will move
* the connection to it's proper connection list (unless it was closed).
*
* @param {String} status - the connection's new status
* @param {String} oldStatus - the connection's old status
* @param {ConnectionAbstract} connection - the connection object itself
*/
ConnectionPool.prototype.onStatusSet = _.handler(function (status, oldStatus, connection) {
var from, to, index;
var index;
if (oldStatus === status) {
if (status === 'dead') {
// we want to remove the connection from it's current possition and move it to the end
status = 'redead';
} else {
return true;
var died = (status === 'dead');
var wasAlreadyDead = (died && oldStatus === 'dead');
var revived = (!died && oldStatus === 'dead');
var noChange = (oldStatus === status);
var from = this._conns[oldStatus];
var to = this._conns[status];
if (noChange && !died) {
return true;
}
if (from !== to) {
if (_.isArray(from)) {
index = from.indexOf(connection);
if (index !== -1) {
from.splice(index, 1);
}
}
if (_.isArray(to)) {
index = to.indexOf(connection);
if (index === -1) {
to.push(connection);
}
}
}
switch (status) {
case 'alive':
from = this._conns.dead;
to = this._conns.alive;
break;
case 'dead':
from = this._conns.alive;
to = this._conns.dead;
break;
case 'redead':
from = this._conns.dead;
to = this._conns.dead;
break;
case 'closed':
from = this._conns[oldStatus];
break;
if (died) {
this._onConnectionDied(connection, wasAlreadyDead);
}
if (from && from.indexOf) {
index = from.indexOf(connection);
if (~index) {
from.splice(index, 1);
}
}
if (to && to.indexOf) {
index = to.indexOf(connection);
if (!~index) {
to.push(connection);
}
if (revived) {
this._onConnectionRevived(connection);
}
});
/**
* Fetches the first active connection, falls back to dead connections
* This is really only here for testing purposes
*
* @private
* @return {Connection} - Some connection
* Handler used to clear the times created when a connection dies
* @param {ConnectionAbstract} connection
*/
ConnectionPool.prototype.getConnection = function () {
if (this._conns.alive.length) {
return this._conns.alive[0];
}
if (this._conns.dead.length) {
return this._conns.dead[0];
ConnectionPool.prototype._onConnectionRevived = function (connection) {
var timeout;
for (var i = 0; i < this._timeouts.length; i++) {
if (this._timeouts[i].conn === connection) {
timeout = this._timeouts[i];
if (timeout.id) {
clearTimeout(timeout.id);
}
this._timeouts.splice(i, 1);
break;
}
}
};
/**
* Handler used to update or create a timeout for the connection which has died
* @param {ConnectionAbstract} connection
* @param {Boolean} alreadyWasDead - If the connection was preivously dead this must be set to true
*/
ConnectionPool.prototype._onConnectionDied = function (connection, alreadyWasDead) {
var timeout;
if (alreadyWasDead) {
for (var i = 0; i < this._timeouts.length; i++) {
if (this._timeouts[i].conn === connection) {
timeout = this._timeouts[i];
break;
}
}
} else {
timeout = {
conn: connection,
attempt: 0,
revive: function (cb) {
timeout.attempt++;
connection.ping(function (err) {
connection.setStatus(err ? 'dead' : 'alive');
if (cb && typeof cb === 'function') {
cb(err);
}
});
}
};
this._timeouts.push(timeout);
}
if (timeout.id) {
clearTimeout(timeout.id);
}
var ms = this.calcDeadTimeout(timeout.attempt, this.deadTimeout);
timeout.id = setTimeout(timeout.revive, ms);
timeout.runAt = Date.now() + ms;
};
ConnectionPool.prototype._selectDeadConnection = function (cb) {
var orderedTimeouts = _.sortBy(this._timeouts, 'runAt');
var log = this.log;
process.nextTick(function next() {
var timeout = orderedTimeouts.shift();
if (!timeout) {
cb(null);
return;
}
if (!timeout.conn) {
next();
return;
}
if (timeout.conn.status === 'dead') {
timeout.revive(function (err) {
if (err) {
log.warning('Unable to revive connection: ' + timeout.conn.id);
process.nextTick(next);
} else {
cb(null, timeout.conn);
}
});
} else {
cb(null, timeout.conn);
}
});
};
/**
* Returns a random list of nodes from the living connections up to the limit.
* If there are no living connections it will fall back to the dead connections.
* If there are no dead connections it will return nothing.
*
* This is used for testing (when we just want the one existing node)
* and sniffing, where using the selector to get all of the living connections
* is not reasonable.
*
* @param {Number} limit - Max number to return
*/
ConnectionPool.prototype.getConnections = function (status, limit) {
var list;
if (status) {
list = this._conns[status];
} else {
list = this._conns[this._conns.alive.length ? 'alive' : 'dead'];
}
return _.shuffle(list).slice(0, typeof limit === 'undefined' ? list.length : limit);
};
/**
* Add a single connection to the pool and change it's status to "alive".
* The connection should inherit from ConnectionAbstract
*
* @param {ConnectionAbstract} connection - The connection to add
*/
ConnectionPool.prototype.addConnection = function (connection) {
if (!connection.id) {
connection.id = connection.host.toString();
@ -149,6 +275,11 @@ ConnectionPool.prototype.addConnection = function (connection) {
}
};
/**
* Remove a connection from the pool, and set it's status to "closed".
*
* @param {ConnectionAbstract} connection - The connection to remove/close
*/
ConnectionPool.prototype.removeConnection = function (connection) {
if (!connection.id) {
connection.id = connection.host.toString();
@ -161,6 +292,12 @@ ConnectionPool.prototype.removeConnection = function (connection) {
}
};
/**
* Override the internal node list. All connections that are not in the new host
* list are closed and removed. Non-unique hosts are ignored.
*
* @param {Host[]} hosts - An array of Host instances.
*/
ConnectionPool.prototype.setHosts = function (hosts) {
var connection;
var i;
@ -186,7 +323,10 @@ ConnectionPool.prototype.setHosts = function (hosts) {
}
};
/**
* Close the conncetion pool, as well as all of it's connections
*/
ConnectionPool.prototype.close = function () {
this.setHosts([]);
};
ConnectionPool.prototype.empty = ConnectionPool.prototype.close;
ConnectionPool.prototype.empty = ConnectionPool.prototype.close;

View File

@ -137,7 +137,6 @@ HttpConnector.prototype.request = function (params, cb) {
} else {
request.end();
}
this.requestCount++;
return function () {
request.abort();

View File

@ -255,10 +255,13 @@ Transport.prototype.request = function (params, cb) {
requestTimeout = params.hasOwnProperty('requestTimeout') ? params.requestTimeout : this.requestTimeout;
if (requestTimeout && requestTimeout !== Infinity) {
requestTimeoutId = setTimeout(function () {
respond(new errors.RequestTimeout());
abortRequest();
}, requestTimeout);
requestTimeout = parseInt(requestTimeout, 10);
if (!isNaN(requestTimeout)) {
requestTimeoutId = setTimeout(function () {
respond(new errors.RequestTimeout('Request Timeout after ' + requestTimeout + 'ms'));
abortRequest();
}, requestTimeout);
}
}
// determine the response based on the presense of a callback
@ -272,7 +275,12 @@ Transport.prototype.request = function (params, cb) {
request.abort = abortRequest;
}
self.connectionPool.select(sendReqWithConnection);
if (connection) {
sendReqWithConnection(null, connection);
} else {
self.connectionPool.select(sendReqWithConnection);
}
return request;
};