many tests

This commit is contained in:
Spencer Alger
2013-12-03 19:01:04 -07:00
parent 2ddde47972
commit 4e5f08a29c
19 changed files with 1011 additions and 201 deletions

View File

@ -109,7 +109,11 @@ HttpConnector.prototype.request = function (params, cb) {
}
log.trace(params.method, reqParams, params.body, response, status);
cb(err, response, status);
if (err) {
cb(err);
} else {
cb(err, response, status);
}
}, this);
request = this.hand.request(reqParams, function (_incoming) {

View File

@ -12,7 +12,7 @@ function LoggerAbstract(log, config) {
_.makeBoundMethods(this);
// when the log closes, remove our event listeners
this.log.on('closing', this.bound.cleanUpListeners);
this.log.once('closing', this.bound.cleanUpListeners);
this.setupListeners(config.levels);
}
@ -62,14 +62,15 @@ LoggerAbstract.prototype.write = function () {
LoggerAbstract.prototype.setupListeners = function (levels) {
this.cleanUpListeners();
this.listeningLevels = levels;
this.listeningLevels = [];
_.each(this.listeningLevels, function (level) {
_.each(levels, function (level) {
var fnName = 'on' + _.ucfirst(level);
if (this.bound[fnName]) {
this.listeningLevels.push(level);
this.log.on(level, this.bound[fnName]);
} else {
throw new Error(fnName + ' is not a function');
throw new Error('Unable to listen for level "' + level + '"');
}
}, this);
};
@ -144,7 +145,7 @@ LoggerAbstract.prototype.onDebug = _.handler(function (msg) {
* @return {undefined}
*/
LoggerAbstract.prototype.onTrace = _.handler(function (message, curlCall) {
this.write('TRACE', message + '\n' + curlCall);
this.write('TRACE', curlCall + '\n' + message);
});

View File

@ -20,24 +20,11 @@ var LoggerAbstract = require('../logger');
var _ = require('../utils');
var defaultColors = {
error: function (txt) {
return chalk.red.bold(txt);
},
warning: function (txt) {
return chalk.yellow.bold(txt);
},
info: function (txt) {
return chalk.cyan.bold(txt);
},
debug: function (txt) {
return chalk.magenta.bold(txt);
},
trace: function (txt) {
return chalk.white.bold(txt);
},
traceStatus: function (status) {
return chalk[status >= 200 && status < 300 ? 'green' : 'red'].bold(status);
}
error: chalk.red.bold,
warning: chalk.yellow.bold,
info: chalk.cyan.bold,
debug: chalk.magenta.bold,
trace: chalk.white.bold
};
function Stdio(log, config) {

View File

@ -1,7 +1,7 @@
/**
* Logger that writes to a file
*
* @class Loggers.File
* @class Loggers.Stream
* @extends LoggerAbstract
* @constructor
* @see LoggerAbstract
@ -16,7 +16,6 @@ var LoggerAbstract = require('../logger');
var _ = require('../utils');
function Stream(log, config) {
// call my super
LoggerAbstract.call(this, log, config);
if (config.stream && config.stream.write && config.stream.end) {
@ -25,22 +24,21 @@ function Stream(log, config) {
throw new TypeError('Invalid stream, use an instance of stream.Writeable');
}
if (this.stream._writableState && this.stream._writableState.buffer) {
process.on('exit', this.bound.onProcessExit);
}
// else you should probably flush your stream
process.once('exit', this.bound.onProcessExit);
}
_.inherits(Stream, LoggerAbstract);
// flush the write buffer to stderr synchronously
Stream.prototype.onProcessExit = _.handler(function () {
// process is dying, lets manually flush the buffer synchronously to stderr.
var writeBuffer = this.stream._writableState.buffer;
if (writeBuffer && writeBuffer.length) {
console.error('Log stream did not get to finish writing. Flushing to stderr');
writeBuffer.forEach(function (buffered) {
console.error(buffered.chunk.toString());
});
if (this.stream._writableState && this.stream._writableState.buffer) {
var writeBuffer = this.stream._writableState.buffer;
if (writeBuffer.length) {
console.error('Log stream did not get to finish writing. Flushing to stderr');
writeBuffer.forEach(function (buffered) {
console.error(buffered.chunk.toString());
});
}
}
});

View File

@ -1,20 +1,25 @@
var _ = require('./utils');
var extractHostPartsRE = /\[([^:]+):(\d+)\]/;
var extractHostPartsRE = /\[\/*([^:]+):(\d+)\]/;
module.exports = function (nodes) {
var hosts = [];
_.each(nodes, function (node, id) {
var hostnameMatches = extractHostPartsRE.exec(node.http_address);
hosts.push({
host: hostnameMatches[1],
port: hostnameMatches[2],
_meta: {
id: id,
name: node.name,
hostname: node.hostname,
version: node.version
}
function makeNodeParser(hostProp) {
return function (nodes) {
var hosts = [];
_.each(nodes, function (node, id) {
var hostnameMatches = extractHostPartsRE.exec(node[hostProp]);
hosts.push({
host: hostnameMatches[1],
port: parseInt(hostnameMatches[2], 10),
_meta: {
id: id,
name: node.name,
hostname: node.hostname,
version: node.version
}
});
});
});
return hosts;
};
return hosts;
};
}
module.exports = makeNodeParser('http_address');
module.exports.thrift = makeNodeParser('transport_address');

View File

@ -28,6 +28,9 @@ function Transport(config) {
var Serializer = _.funcEnum(config, 'serializer', Transport.serializers, 'json');
this.serializer = new Serializer(config);
// setup the nodesToHostCallback
this.nodesToHostCallback = _.funcEnum(config, 'nodesToHostCallback', Transport.nodesToHostCallbacks, 'main');
// setup max retries
this.maxRetries = config.hasOwnProperty('maxRetries') ? config.maxRetries : 3;
@ -209,7 +212,8 @@ Transport.prototype.createDefer = function () {
* @param {Function} cb - Function to call back once complete
*/
Transport.prototype.sniff = function (cb) {
var self = this;
var connectionPool = this.connectionPool;
var nodesToHostCallback = this.nodesToHostCallback;
// make cb a function if it isn't
cb = typeof cb === 'function' ? cb : _.noop;
@ -217,14 +221,14 @@ Transport.prototype.sniff = function (cb) {
this.request({
path: '/_cluster/nodes',
method: 'GET'
}, function (err, resp) {
}, function (err, resp, status) {
if (!err && resp && resp.nodes) {
var hosts = _.map(self.nodesToHostCallback(resp.nodes), function (hostConfig) {
var hosts = _.map(nodesToHostCallback(resp.nodes), function (hostConfig) {
return new Host(hostConfig);
});
this.connectionPool.setHosts(hosts);
connectionPool.setHosts(hosts);
}
cb(err, resp);
cb(err, resp, status);
});
};