save point durring huge unorganized refactor

This commit is contained in:
Spencer Alger
2013-11-22 16:48:30 -07:00
parent 5bb70fbe58
commit 97ba084795
80 changed files with 46126 additions and 2410 deletions

View File

@ -6,12 +6,66 @@ module.exports = Transport;
var _ = require('./utils');
var errors = require('./errors');
var Host = require('./host');
var Log = require('./log');
var when = require('when');
function Transport(config) {
this.config = config;
config = config || {};
var LogClass;
// setup the log
switch (typeof config.log) {
case 'function':
LogClass = config.log;
break;
case 'undefined':
config.log = 'warning';
/* fall through */
default:
LogClass = Log;
}
config.log = this.log = new LogClass(config);
// overwrite the createDefer method if a new implementation is provided
if (typeof config.createDefer === 'function') {
this.createDefer = config.createDefer;
}
// setup the connection pool
var ConnectionPool = _.funcEnum(config, 'connectionPool', Transport.connectionPools, 'main');
this.connectionPool = new ConnectionPool(config);
if (config.hosts) {
var hosts = _.createArray(config.hosts, function (val) {
if (_.isPlainObject(val) || _.isString(val)) {
return val;
}
});
if (!hosts) {
throw new Error('Invalid hosts config. Expected a URL, an array of urls, a host config object, or an array of ' +
'host config objects.');
}
this.connectionPool.setHosts(_.map(hosts, function (conf) {
return new Host(conf);
}));
}
// setup the serializer
var Serializer = _.funcEnum(config, 'serializer', Transport.serializers, 'json');
this.serializer = new Serializer(config);
}
Transport.connectionPools = {
main: require('./connection_pool')
};
Transport.serializers = {
json: require('./serializers/json')
};
/**
* Perform a request with the client's transport
*
@ -27,15 +81,13 @@ function Transport(config) {
*/
Transport.prototype.request = function (params, cb) {
var log = this.config.log;
var serializer = this.config.serializer;
var connectionPool = this.config.connectionPool;
var remainingRetries = this.config.maxRetries;
var self = this;
var remainingRetries = this.maxRetries;
var connection; // set in sendReqWithConnection
var connectionReq; // an object with an abort method, set in sendReqWithConnection
var request; // the object returned to the user, might be a deferred
log.debug('starting request', params);
self.log.debug('starting request', params);
if (params.body && params.method === 'GET') {
_.nextTick(respond, new TypeError('Body can not be sent with method "GET"'));
@ -44,7 +96,7 @@ Transport.prototype.request = function (params, cb) {
// serialize the body
if (params.body) {
params.body = serializer[params.bulkBody ? 'bulkBody' : 'serialize'](params.body);
params.body = self.serializer[params.bulkBody ? 'bulkBody' : 'serialize'](params.body);
}
params.req = {
@ -55,7 +107,7 @@ Transport.prototype.request = function (params, cb) {
body: params.body,
};
connectionPool.select(sendReqWithConnection);
self.connectionPool.select(sendReqWithConnection);
function abortRequest() {
remainingRetries = 0;
@ -69,7 +121,7 @@ Transport.prototype.request = function (params, cb) {
connection = _connection;
connectionReq = connection.request(params.req, checkRespForFailure);
} else {
log.warning('No living connections');
self.log.warning('No living connections');
respond(new errors.NoConnections());
}
}
@ -77,10 +129,10 @@ Transport.prototype.request = function (params, cb) {
function checkRespForFailure(err, body, status) {
if (err && remainingRetries) {
remainingRetries--;
log.error(err.message, '-- retrying');
connectionPool.select(sendReqWithConnection);
self.log.error(err.message, '-- retrying');
self.connectionPool.select(sendReqWithConnection);
} else {
log.info('Request complete');
self.log.info('Request complete');
respond(err, body, status);
}
}
@ -89,21 +141,15 @@ Transport.prototype.request = function (params, cb) {
var parsedBody;
if (!err && body) {
parsedBody = serializer.unserialize(body);
parsedBody = self.serializer.unserialize(body);
if (parsedBody == null) {
err = new errors.Serialization();
}
}
if (!err) {
// get ignore and ensure that it's an array
var ignore = params.ignore;
if (ignore && !_.isArray(ignore)) {
ignore = [ignore];
}
if ((status < 200 || status >= 300)
&& (!ignore || !_.contains(ignore, status))
&& (!params.ignore || !_.contains(params.ignore, status))
) {
if (errors[status]) {
err = new errors[status](parsedBody && parsedBody.error);
@ -140,7 +186,7 @@ Transport.prototype.request = function (params, cb) {
abort: abortRequest
};
} else {
var defer = when.defer();
var defer = this.createDefer();
defer.promise.abort = abortRequest;
request = defer.promise;
}
@ -148,6 +194,10 @@ Transport.prototype.request = function (params, cb) {
return request;
};
Transport.prototype.createDefer = function () {
return when.defer();
};
/**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
* pool as appropriate
@ -155,7 +205,7 @@ Transport.prototype.request = function (params, cb) {
* @param {Function} cb - Function to call back once complete
*/
Transport.prototype.sniff = function (cb) {
var config = this.config;
var self = this;
// make cb a function if it isn't
cb = typeof cb === 'function' ? cb : _.noop;
@ -165,9 +215,16 @@ Transport.prototype.sniff = function (cb) {
method: 'GET'
}, function (err, resp) {
if (!err && resp && resp.nodes) {
var nodes = config.nodesToHostCallback(resp.nodes);
config.connectionPool.setNodes(nodes);
var hosts = _.map(self.nodesToHostCallback(resp.nodes), function (hostConfig) {
return new Host(hostConfig);
});
this.connectionPool.setHosts(hosts);
}
cb(err, resp);
});
};
Transport.prototype.close = function () {
this.log.close();
this.connectionPool.close();
};