Fixes the sniffOnConnectionFault config parameter
This commit is contained in:
@ -46,7 +46,7 @@
|
||||
"mocha-lcov-reporter": "0.0.1",
|
||||
"blanket": "~1.1.5",
|
||||
"sinon": "~1.7.3",
|
||||
"nock": "0.27.0",
|
||||
"nock": "git://github.com/spenceralger/nock#emit_stream_errors",
|
||||
"open": "0.0.4",
|
||||
"load-grunt-tasks": "~0.2.0",
|
||||
"load-grunt-config": "~0.7.0",
|
||||
|
||||
@ -200,7 +200,7 @@ ConnectionPool.prototype._onConnectionDied = function (connection, alreadyWasDea
|
||||
|
||||
var ms = this.calcDeadTimeout(timeout.attempt, this.deadTimeout);
|
||||
timeout.id = setTimeout(timeout.revive, ms);
|
||||
timeout.runAt = Date.now() + ms;
|
||||
timeout.runAt = _.now() + ms;
|
||||
};
|
||||
|
||||
ConnectionPool.prototype._selectDeadConnection = function (cb) {
|
||||
|
||||
@ -168,7 +168,7 @@ LoggerAbstract.prototype._formatTraceMessage = function (req) {
|
||||
|
||||
LoggerAbstract.prototype._prettyJson = function (body) {
|
||||
try {
|
||||
if (typeof object === 'string') {
|
||||
if (typeof body === 'string') {
|
||||
body = JSON.parse(body);
|
||||
}
|
||||
return JSON.stringify(body, null, ' ').replace(/'/g, '\\u0027');
|
||||
|
||||
@ -8,6 +8,7 @@ var _ = require('./utils');
|
||||
var errors = require('./errors');
|
||||
var Host = require('./host');
|
||||
var Promise = require('bluebird');
|
||||
var patchSniffOnConnectionFault = require('./transport/sniff_on_connection_fault');
|
||||
|
||||
function Transport(config) {
|
||||
var self = this;
|
||||
@ -80,7 +81,9 @@ function Transport(config) {
|
||||
}, config.sniffInterval);
|
||||
}
|
||||
|
||||
this.sniffAfterConnectionFault = config.sniffAfterConnectionFault;
|
||||
if (config.sniffOnConnectionFault) {
|
||||
patchSniffOnConnectionFault(this);
|
||||
}
|
||||
}
|
||||
|
||||
Transport.connectionPools = {
|
||||
@ -116,7 +119,6 @@ Transport.prototype.defer = function () {
|
||||
* @param {Function} cb - A function to call back with (error, responseBody, responseStatus)
|
||||
*/
|
||||
Transport.prototype.request = function (params, cb) {
|
||||
|
||||
var self = this;
|
||||
var remainingRetries = this.maxRetries;
|
||||
var requestTimeout = this.requestTimeout;
|
||||
@ -366,4 +368,4 @@ Transport.prototype.close = function () {
|
||||
this.log.close();
|
||||
_.each(this._timers, clearTimeout);
|
||||
this.connectionPool.close();
|
||||
};
|
||||
};
|
||||
59
src/lib/transport/sniff_on_connection_fault.js
Normal file
59
src/lib/transport/sniff_on_connection_fault.js
Normal file
@ -0,0 +1,59 @@
|
||||
var _ = require('../utils');
|
||||
|
||||
|
||||
/**
|
||||
* Patch the transport's connection pool to schedule a sniff after a connection fails.
|
||||
* When a connection fails for the first time it will schedule a sniff 1 second in the
|
||||
* future, and increase the timeout based on the deadTimeout algorithm chosen by the
|
||||
* connectionPool, and the number of times the sniff has failed.
|
||||
*
|
||||
* @param {Transport} transport - the transport that will be using this behavior
|
||||
* @return {undefined}
|
||||
*/
|
||||
module.exports = function setupSniffOnConnectionFault(transport) {
|
||||
var failures = 0;
|
||||
var pool = transport.connectionPool;
|
||||
var originalOnDied = pool._onConnectionDied;
|
||||
|
||||
// do the actual sniff, if the sniff is unable to
|
||||
// connect to a node this function will be called again by the connectionPool
|
||||
var work = function () {
|
||||
work.timerId = transport._timeout(work.timerId);
|
||||
transport.sniff();
|
||||
};
|
||||
|
||||
// create a function that will count down to a
|
||||
// point n milliseconds into the future
|
||||
var countdownTo = function (ms) {
|
||||
var start = _.now();
|
||||
return function () {
|
||||
return start - ms;
|
||||
};
|
||||
};
|
||||
|
||||
// overwrite the function, but still call it
|
||||
pool._onConnectionDied = function (connection, wasAlreadyDead) {
|
||||
var ret = originalOnDied.call(pool, connection, wasAlreadyDead);
|
||||
|
||||
// clear the failures if this is the first failure we have seen
|
||||
failures = work.timerId ? failures + 1 : 0;
|
||||
|
||||
var ms = pool.calcDeadTimeout(failures, 1000);
|
||||
|
||||
if (work.timerId && ms < work.timerId && work.countdown()) {
|
||||
// clear the timer
|
||||
work.timerId = transport._timeout(work.timerId);
|
||||
}
|
||||
|
||||
if (!work.timerId) {
|
||||
work.timerId = transport._timeout(work, ms);
|
||||
work.countdown = countdownTo(ms);
|
||||
}
|
||||
|
||||
return ret;
|
||||
};
|
||||
|
||||
pool._onConnectionDied.restore = function () {
|
||||
pool._onConnectionDied = originalOnDied;
|
||||
};
|
||||
};
|
||||
@ -418,4 +418,11 @@ _.getUnwrittenFromStream = function (stream) {
|
||||
}
|
||||
};
|
||||
|
||||
/**
|
||||
* return the current time in milliseconds since epoch
|
||||
*/
|
||||
_.now = function () {
|
||||
return (typeof Date.now === 'function') ? Date.now() : (new Date()).getTime();
|
||||
};
|
||||
|
||||
module.exports = utils;
|
||||
|
||||
@ -1,10 +1,12 @@
|
||||
var Transport = require('../../../src/lib/transport');
|
||||
var ConnectionPool = require('../../../src/lib/connection_pool');
|
||||
var Host = require('../../../src/lib/host');
|
||||
var errors = require('../../../src/lib/errors');
|
||||
var expect = require('expect.js');
|
||||
|
||||
var sinon = require('sinon');
|
||||
var nock = require('../../mocks/server.js');
|
||||
var estr = require('event-stream');
|
||||
var _ = require('lodash-node');
|
||||
var nodeList = require('../../fixtures/short_node_list.json');
|
||||
var stub = require('../../utils/auto_release_stub').make();
|
||||
@ -302,5 +304,46 @@ describe('Transport + Mock server', function () {
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
describe('sniffOnConnectionFault', function () {
|
||||
it('schedules a sniff when sniffOnConnectionFault is set and a connection failes', function (done) {
|
||||
var clock = sinon.useFakeTimers('setTimeout');
|
||||
var serverMock = nock('http://esbox.1.com')
|
||||
.get('/')
|
||||
.reply(200, function () {
|
||||
var str = estr.readable(function (count, cb) {
|
||||
cb(new Error('force error'));
|
||||
});
|
||||
str.setEncoding = function () {}; // force nock's isStream detection
|
||||
return str;
|
||||
});
|
||||
|
||||
stub(ConnectionPool.prototype, '_onConnectionDied');
|
||||
stub(Transport.prototype, 'sniff');
|
||||
var tran = new Transport({
|
||||
hosts: 'http://esbox.1.com',
|
||||
sniffOnConnectionFault: true,
|
||||
maxRetries: 0
|
||||
});
|
||||
|
||||
expect(tran.connectionPool._onConnectionDied).to.not.be(ConnectionPool.prototype._onConnectionDied);
|
||||
|
||||
tran.request({
|
||||
requestTimeout: Infinity
|
||||
}).then(
|
||||
_.partial(done, new Error('expected the request to fail')),
|
||||
function (err) {
|
||||
expect(ConnectionPool.prototype._onConnectionDied.callCount).to.eql(1);
|
||||
expect(tran.sniff.callCount).to.eql(0);
|
||||
expect(_.size(clock.timeouts)).to.eql(1);
|
||||
var timeout = _.values(clock.timeouts).pop();
|
||||
timeout.func();
|
||||
expect(tran.sniff.callCount).to.eql(1);
|
||||
clock.restore();
|
||||
done();
|
||||
}
|
||||
);
|
||||
});
|
||||
});
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user