Added "extends" key to the jshint config files, so there is less repetition.

Mocha now runs from grunt, just run "grunt"

Copied es-php's README.md, will modify later

More logging around sending a request, including stack traces for debug messages

Connections now manage their own state, and emit a "status changed" event which
the connection pool listens for

Fixed the custom errors

Stream loggers will dump their buffered output to stderr when the process exits
so that log messages will be sort of saved, File logger overrides this and
writes to the file syncronously

Added _.handler(), _.scheduled(), and _.makeBoundMethods() to the utils
This commit is contained in:
Spencer Alger
2013-10-21 10:09:12 -07:00
parent 8cc87637e2
commit b063dfdca7
24 changed files with 371 additions and 159 deletions

View File

@ -1,28 +0,0 @@
{
"node": true,
"white": true,
"bitwise": false,
"curly": true,
"eqnull": true,
"eqeqeq": true,
"forin": true,
"immed": true,
"expr": false,
"indent": 2,
"latedef": "nofunc",
"newcap": true,
"noarg": true,
"noempty": true,
"undef": true,
"quotmark": "single",
"plusplus": false,
"boss": true,
"trailing": true,
"laxbreak": true,
"laxcomma": true,
"validthis": true,
"sub": true,
"maxlen": 140
}

4
src/api/.jshintrc Normal file
View File

@ -0,0 +1,4 @@
{
"extends": "../../.jshintrc",
"maxlen": 0
}

View File

@ -222,7 +222,7 @@ Log.prototype.info = function (/* ...msg */) {
*/
Log.prototype.debug = function (/* ...msg */) {
if (EventEmitter.listenerCount(this, 'debug')) {
return this.emit('debug', Log.join(arguments));
return this.emit('debug', Log.join(arguments) + _.getStackTrace(Log.prototype.debug));
}
};

View File

@ -63,12 +63,15 @@ Transport.prototype.request = function (params, cb) {
function sendRequestWithConnection(err, _connection) {
if (err) {
log.error(err);
cb(err);
} else if (_connection) {
connection = _connection;
log.info('Selected', _connection.status, 'Connection, making request');
connection.request(params, checkRespForFailure);
} else {
cb(new errors.ConnectionFault('No active connections.'));
log.warning('No living connections');
cb(new errors.ConnectionFault('No living connections.'));
}
}
@ -81,9 +84,10 @@ Transport.prototype.request = function (params, cb) {
if (err && remainingRetries) {
remainingRetries--;
log.info('Retrying request after connection error');
log.info('connection error, retrying');
connectionPool.select(sendRequestWithConnection);
} else {
log.info('Request complete');
cb(err, reqParams, body, status);
}
}

View File

@ -46,7 +46,9 @@ var defaultConfig = {
sniffOnStart: false,
sniffAfterRequests: null,
sniffOnConnectionFail: false,
maxRetries: 3
maxRetries: 3,
timeout: 10000,
deadTimeout: 60000
};
function ClientConfig(config) {

View File

@ -12,12 +12,11 @@ var _ = require('./utils'),
* @class ConnectionAbstract
* @constructor
*/
function ConnectionAbstract(config) {
function ConnectionAbstract(config, nodeInfo) {
EventEmitter.call(this);
this.config = config;
this.hostname = config.hostname || 'localhost';
this.port = config.port || 9200;
this.timeout = config.timeout || 10000;
this.hostname = nodeInfo.hostname || 'localhost';
this.port = nodeInfo.port || 9200;
_.makeBoundMethods(this);
}
@ -37,12 +36,17 @@ ConnectionAbstract.prototype.request = function () {
throw new Error('Connection#request must be overwritten by the Connector');
};
ConnectionAbstract.prototype.ping = function () {
ConnectionAbstract.prototype.ping = function (params, cb) {
if (typeof params === 'function') {
cb = params;
} else if (typeof cb !== 'function') {
throw new TypeError('Callback must be a function');
}
return this.request({
path: '/',
method: 'HEAD',
timeout: '100'
});
}, cb);
};
ConnectionAbstract.prototype.setStatus = function (status) {

View File

@ -30,10 +30,14 @@ ConnectionPool.prototype.select = function (cb) {
if (this.config.selector.length > 1) {
this.config.selector(this.connections.alive, cb);
} else {
cb(null, this.config.selector(this.connections.alive));
try {
cb(null, this.config.selector(this.connections.alive));
} catch (e) {
cb(e);
}
}
} else {
cb(new errors.ConnectionFault('No active connections'));
cb();
}
};
@ -46,10 +50,10 @@ ConnectionPool.prototype.empty = function () {
});
};
ConnectionPool.prototype.setStatus = _.handler(function (status, oldStatus, connection) {
var origStatus = connection.status, from, to, index;
ConnectionPool.prototype.onStatusChanged = _.handler(function (status, oldStatus, connection) {
var from, to, index;
if (origStatus === status) {
if (oldStatus === status) {
return true;
} else {
this.config.log.info('connection to', _.formatUrl(connection), 'is', status);
@ -65,8 +69,8 @@ ConnectionPool.prototype.setStatus = _.handler(function (status, oldStatus, conn
to = this.connections.dead;
break;
case 'closed':
from = this.connections[origStatus];
connection.removeListener('status changed', this.bound.setStatus);
from = this.connections[oldStatus];
connection.removeListener('status changed', this.bound.onStatusChanged);
break;
}
@ -88,7 +92,7 @@ ConnectionPool.prototype.setStatus = _.handler(function (status, oldStatus, conn
ConnectionPool.prototype.add = function (connection) {
if (!~this.connections.alive.indexOf(connection) && !~this.connections.dead.indexOf(connection)) {
connection.status = 'alive';
connection.on('status changed', this.bound.setStatus);
connection.on('status changed', this.bound.onStatusChanged);
this.connections.alive.push(connection);
}
};

View File

@ -18,10 +18,10 @@ var http = require('http'),
};
function HttpConnection(config) {
ConnectionAbstract.call(this, config);
function HttpConnection(config, nodeInfo) {
ConnectionAbstract.call(this, config, nodeInfo);
this.protocol = config.protocol || 'http:';
this.protocol = nodeInfo.protocol || 'http:';
if (this.protocol[this.protocol.length - 1] !== ':') {
this.protocol = this.protocol + ':';
}
@ -36,7 +36,7 @@ function HttpConnection(config) {
this.on('closed', this.bound.onClosed);
this.once('alive', this.bound.onAlive);
this.requestCount = 0;
}
_.inherits(HttpConnection, ConnectionAbstract);
@ -51,12 +51,15 @@ HttpConnection.prototype.onAlive = _.handler(function () {
});
HttpConnection.prototype.request = function (params, cb) {
var request,
response,
status = 0,
timeout = params.timeout || this.timeout,
timeoutId,
log = this.config.log;
var incoming;
var timeoutId;
var log = this.config.log;
var request;
var requestId = this.requestCount;
var response;
var responseStarted = false;
var status = 0;
var timeout = params.timeout || this.config.timeout;
var reqParams = _.defaults({
protocol: this.protocol,
@ -67,15 +70,20 @@ HttpConnection.prototype.request = function (params, cb) {
headers: _.defaults(params.headers || {}, defaultHeaders)
});
log.debug('starting request', requestId);
// general clean-up procedure to run after the request, can only run once
var cleanUp = function (err) {
cleanUp = _.noop;
clearTimeout(timeoutId);
if (request) {
request.removeAllListeners();
}
request && request.removeAllListeners();
incoming && incoming.removeAllListeners();
log.debug('calling back request', requestId, err ? 'with error "' + err.message + '"' : '');
_.nextTick(cb, err, reqParams, response, status);
// override so this doesn't get called again
cleanUp = _.noop;
};
// ensure that "get" isn't being used with a request body
@ -88,7 +96,8 @@ HttpConnection.prototype.request = function (params, cb) {
request = http.request(reqParams);
request.on('response', function (incoming) {
request.on('response', function (_incoming) {
incoming = _incoming;
status = incoming.statusCode;
incoming.setEncoding('utf8');
response = '';
@ -98,7 +107,6 @@ HttpConnection.prototype.request = function (params, cb) {
});
incoming.on('end', function requestComplete() {
incoming.removeAllListeners();
cleanUp();
});
});
@ -113,5 +121,9 @@ HttpConnection.prototype.request = function (params, cb) {
request.emit('error', new errors.RequestTimeout('Request timed out at ' + timeout + 'ms'));
}, timeout);
request.setNoDelay(true);
request.setSocketKeepAlive(true);
request.end(params.body);
this.requestCount++;
};

View File

@ -1,11 +1,11 @@
var _ = require('./utils'),
errors = module.exports;
function ErrorAbstract(msg) {
Error.call(this, msg);
Error.captureStackTrace(this, this.constructor);
function ErrorAbstract(msg, constructor) {
this.message = msg;
Error.call(this, this.message);
Error.captureStackTrace(this, constructor);
}
_.inherits(ErrorAbstract, Error);
@ -14,8 +14,7 @@ _.inherits(ErrorAbstract, Error);
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.ConnectionFault = function ConnectionFault(msg) {
return new Error(msg || 'Connection Failure');
ErrorAbstract.call(this, msg || 'Connection Failure');
ErrorAbstract.call(this, msg || 'Connection Failure', errors.ConnectionFault);
};
_.inherits(errors.ConnectionFault, ErrorAbstract);
@ -24,8 +23,7 @@ _.inherits(errors.ConnectionFault, ErrorAbstract);
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.Generic = function Generic(msg) {
return new Error(msg || 'Generic Error');
ErrorAbstract.call(this, msg || 'Generic Error');
ErrorAbstract.call(this, msg || 'Generic Error', errors.Generic);
};
_.inherits(errors.Generic, ErrorAbstract);
@ -34,8 +32,7 @@ _.inherits(errors.Generic, ErrorAbstract);
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.RequestTimeout = function RequestTimeout(msg) {
return new Error(msg || 'RequestTimeout');
ErrorAbstract.call(this, msg || 'Request Timeout');
ErrorAbstract.call(this, msg || 'Request Timeout', errors.RequestTimeout);
};
_.inherits(errors.RequestTimeout, ErrorAbstract);
@ -44,8 +41,7 @@ _.inherits(errors.RequestTimeout, ErrorAbstract);
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.Serialization = function RequestTimeout(msg) {
return new Error(msg || 'ParseError');
ErrorAbstract.call(this, msg || 'Unable to parse response body');
ErrorAbstract.call(this, msg || 'Unable to parse response body', errors.RequestTimeout);
};
_.inherits(errors.RequestTimeout, ErrorAbstract);
@ -105,8 +101,7 @@ _.each(statusCodes, function (name, status) {
var className = _.studlyCase(name);
function StatusCodeError(msg) {
return new Error(msg || name);
ErrorAbstract.call(this, msg || name);
ErrorAbstract.call(this, msg || name, errors.StatusCodeError);
}
_.inherits(StatusCodeError, ErrorAbstract);

View File

@ -13,8 +13,6 @@ function LoggerAbstract(config, bridge) {
_.makeBoundMethods(this);
console.log(this.bound);
// when the bridge closes, remove our event listeners
this.bridge.on('closing', this.bound.cleanUpListeners);

View File

@ -16,6 +16,8 @@ var StreamLogger = require('./stream'),
fs = require('fs');
function File(config, bridge) {
this.path = config.path;
config.stream = fs.createWriteStream(config.path, {
flags: 'a',
encoding: 'utf8'
@ -24,3 +26,15 @@ function File(config, bridge) {
File.callSuper(this, arguments);
}
_.inherits(File, StreamLogger);
File.prototype.onProcessExit = _.handler(function () {
// flush the write buffer to disk
var writeBuffer = this.stream._writableState.buffer;
var out = '';
if (writeBuffer) {
writeBuffer.forEach(function (buffered) {
out += buffered.chunk.toString();
});
fs.appendFileSync(this.path, out);
}
});

View File

@ -51,9 +51,9 @@ Stdio.prototype.write = function (to, label, colorize, message) {
* @param {Error} e - The Error object to log
* @return {undefined}
*/
Stdio.prototype.onError = function (e) {
Stdio.prototype.onError = _.handler(function (e) {
this.write(process.stderr, e.name === 'Error' ? 'ERROR' : e.name, clc.red.bold, e.stack);
};
});
/**
* Handler for the bridges "warning" event
@ -63,9 +63,9 @@ Stdio.prototype.onError = function (e) {
* @param {String} msg - The message to be logged
* @return {undefined}
*/
Stdio.prototype.onWarning = function (msg) {
Stdio.prototype.onWarning = _.handler(function (msg) {
this.write(process.stderr, 'WARNING', clc.yellow.bold, msg);
};
});
/**
* Handler for the bridges "info" event
@ -75,9 +75,9 @@ Stdio.prototype.onWarning = function (msg) {
* @param {String} msg - The message to be logged
* @return {undefined}
*/
Stdio.prototype.onInfo = function (msg) {
Stdio.prototype.onInfo = _.handler(function (msg) {
this.write(process.stdout, 'INFO', clc.cyan.bold, msg);
};
});
/**
* Handler for the bridges "debug" event
@ -87,9 +87,9 @@ Stdio.prototype.onInfo = function (msg) {
* @param {String} msg - The message to be logged
* @return {undefined}
*/
Stdio.prototype.onDebug = function (msg) {
Stdio.prototype.onDebug = _.handler(function (msg) {
this.write(process.stdout, 'DEBUG', clc.magentaBright.bold, msg);
};
});
/**
* Handler for the bridges "trace" event
@ -98,7 +98,7 @@ Stdio.prototype.onDebug = function (msg) {
* @private
* @return {undefined}
*/
Stdio.prototype.onTrace = function (method, url, body, responseBody, responseStatus) {
Stdio.prototype.onTrace = _.handler(function (method, url, body, responseBody, responseStatus) {
var message = 'curl "' + url.replace(/"/g, '\\"') + '" -X' + method.toUpperCase();
if (body) {
message += ' -d "' + body.replace(/"/g, '\\"') + '"';
@ -116,4 +116,4 @@ Stdio.prototype.onTrace = function (method, url, body, responseBody, responseSta
message += '\n' + responseBody;
this.write(process.stdout, 'TRACE', clc.cyanBright.bold, message);
};
});

View File

@ -19,15 +19,29 @@ var LoggerAbstract = require('../Logger'),
function Stream(config, bridge) {
Stream.callSuper(this, arguments);
_.makeBoundMethods(this);
if (config.stream instanceof nodeStreams.Writable) {
this.stream = config.stream;
} else {
throw new TypeError('Invalid stream, use an instance of stream.Writeable');
}
process.on('exit', this.bound.onProcessExit);
}
_.inherits(Stream, LoggerAbstract);
// flush the write buffer to stderr synchronously
Stream.prototype.onProcessExit = _.handler(function () {
var writeBuffer = this.stream._writableState.buffer;
if (writeBuffer && writeBuffer.length) {
console.error('Log stream did not get to finish writing. Flushing to stderr');
writeBuffer.forEach(function (buffered) {
console.error(buffered.chunk.toString());
});
}
});
Stream.prototype.write = function (label, message) {
this.stream.write(this.format(label, message), 'utf8');
};

View File

@ -450,7 +450,6 @@ utils.makeBoundMethods = function (obj, methods) {
for (var prop in obj) {
// dearest maintainer, we want to look through the prototype
if (typeof obj[prop] === 'function' && obj[prop]._provideBound === true) {
console.log('binding', prop);
obj.bound[prop] = utils.bind(obj[prop], obj);
}
}
@ -463,4 +462,10 @@ utils.makeBoundMethods = function (obj, methods) {
utils.noop = function () {};
utils.getStackTrace = function (callee) {
var e = {};
Error.captureStackTrace(e, callee || utils.getStackTrace);
return '\n' + e.stack.split('\n').slice(1).join('\n');
};
module.exports = utils;