Slight refactor to the api module, so it will simply extend the client like it did

This commit is contained in:
Spencer Alger
2013-10-23 14:49:00 -07:00
parent 35209ea61a
commit c14d37aa42
39 changed files with 25589 additions and 3627 deletions

View File

@ -32,9 +32,10 @@ module.exports = Client;
var _ = require('./utils');
var ClientConfig = require('./client_config');
// var api = _.reKey(_.requireDir(module, '../api'), _.camelCase);
var ca = require('./client_action').create;
var q = require('q');
var errors = require('./errors');
var api = require('./api.js');
function Client(config) {
this.client = this;
@ -48,11 +49,13 @@ function Client(config) {
});
this.config.client = this;
for (var i = 0; i < _namespaces.length; i++) {
this[_namespaces[i]] = new this[_namespaces[i]](this);
for (var i = 0; i < this._namespaces.length; i++) {
this[this._namespaces[i]] = new this[this._namespaces[i]](this);
}
}
Client.prototype = api;
/**
* Perform a request with the client's transport
*
@ -91,7 +94,7 @@ Client.prototype.request = function (params, cb) {
}
if (params.body && params.method === 'GET') {
_.nextTick(cb, new TypeError('Body can not be sent with method "GET"'));
respond(new TypeError('Body can not be sent with method "GET"'));
return;
}
@ -164,12 +167,13 @@ Client.prototype.request = function (params, cb) {
* @param {Object} params - Currently just a placeholder, no params used at this time
* @param {Function} cb - callback
*/
Client.prototype.ping = function (params, cb) {
this.request({
method: 'HEAD',
path: '/'
}, cb);
};
Client.prototype.ping = ca({
methods: ['HEAD'],
params: {},
url: {
fmt: '/'
}
});
/**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
@ -193,26 +197,14 @@ Client.prototype.sniff = function (cb) {
}
cb(err, resp);
});
}
var _namespaces = [];
/**
* These names of the properties that hold namespace objects in the Client prototype
* @type {Array}
*/
Client.namespace = function (namespace) {
var steps = namespace.split('.');
var path = [];
var on = Client;
var i;
for (i = 0; i < steps.length; i ++) {
path.push(steps[i]);
_namespaces.push(path.join('.'));
on.prototype[steps[i]] = function ClientActionNamespace(client) {
this.client = client;
};
}
};
require('./api.js').attach(Client);
/**
* Shutdown the connections, log outputs, and clear timers
*/
Client.prototype.close = function () {
this.config.log.close();
this.config.connectionPool.close();
};

View File

@ -51,6 +51,15 @@ function Log(config) {
}
_.inherits(Log, EventEmitter);
Log.prototype.close = function () {
this.emit('closing');
if (EventEmitter.listenerCount(this)) {
console.error('Something is still listening for log events, but the logger is closing.');
this.clearAllListeners();
}
}
/**
* Levels observed by the loggers, ordered by rank
*

File diff suppressed because it is too large Load Diff

View File

@ -32,9 +32,10 @@ module.exports = Client;
var _ = require('./utils');
var ClientConfig = require('./client_config');
// var api = _.reKey(_.requireDir(module, '../api'), _.camelCase);
var ca = require('./client_action').create;
var q = require('q');
var errors = require('./errors');
var api = require('./api.js');
function Client(config) {
this.client = this;
@ -48,11 +49,13 @@ function Client(config) {
});
this.config.client = this;
for (var i = 0; i < _namespaces.length; i++) {
this[_namespaces[i]] = new this[_namespaces[i]](this);
for (var i = 0; i < this._namespaces.length; i++) {
this[this._namespaces[i]] = new this[this._namespaces[i]](this);
}
}
Client.prototype = api;
/**
* Perform a request with the client's transport
*
@ -91,7 +94,7 @@ Client.prototype.request = function (params, cb) {
}
if (params.body && params.method === 'GET') {
_.nextTick(cb, new TypeError('Body can not be sent with method "GET"'));
respond(new TypeError('Body can not be sent with method "GET"'));
return;
}
@ -164,12 +167,13 @@ Client.prototype.request = function (params, cb) {
* @param {Object} params - Currently just a placeholder, no params used at this time
* @param {Function} cb - callback
*/
Client.prototype.ping = function (params, cb) {
this.request({
method: 'HEAD',
path: '/'
}, cb);
};
Client.prototype.ping = ca({
methods: ['HEAD'],
params: {},
url: {
fmt: '/'
}
});
/**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
@ -193,26 +197,14 @@ Client.prototype.sniff = function (cb) {
}
cb(err, resp);
});
}
var _namespaces = [];
/**
* These names of the properties that hold namespace objects in the Client prototype
* @type {Array}
*/
Client.namespace = function (namespace) {
var steps = namespace.split('.');
var path = [];
var on = Client;
var i;
for (i = 0; i < steps.length; i ++) {
path.push(steps[i]);
_namespaces.push(path.join('.'));
on.prototype[steps[i]] = function ClientActionNamespace(client) {
this.client = client;
};
}
};
require('./api.js').attach(Client);
/**
* Shutdown the connections, log outputs, and clear timers
*/
Client.prototype.close = function () {
this.config.log.close();
this.config.connectionPool.close();
};

View File

@ -2,10 +2,10 @@
* Constructs a function that can be called to make a request to ES
* @type {[type]}
*/
module.exports = function clientAction(spec) {
exports.create = function clientAction(spec) {
return function (params, cb) {
return exec(this.client, spec, params, cb);
}
};
};
var errors = require('./errors');
@ -75,8 +75,8 @@ var castType = {
}
};
function resolveUrl (url, params) {
var vars = {}, name, i, key;
function resolveUrl(url, params) {
var vars = {}, i, key;
if (url.req) {
// url has required params
@ -128,7 +128,7 @@ function resolveUrl (url, params) {
// remove it from the params so that it isn't sent to the final request
delete params[name];
}, {}));
};
}
function exec(client, spec, params, cb) {
if (typeof params === 'function') {
@ -139,20 +139,25 @@ function exec(client, spec, params, cb) {
cb = typeof cb === 'function' ? cb : _.noop;
}
var request = {
ignore: params.ignore
};
var request = {};
var parts = {};
var query = {};
var i;
if (spec.needsBody && !params.body) {
return _.nextTick(cb, new TyperError(spec.name + ' requires a request body.'));
return _.nextTick(cb, new TypeError('A request body is required.'));
}
if (params.body) {
request.body = params.body;
request.bulkBody = spec.bulkBody;
}
if (spec.bulkBody) {
request.bulkBody = true;
}
if (params.ignore) {
request.ignore = _.isArray(params.ignore) ? params.ignore : [params.ignore];
}
if (spec.methods.length === 1) {
@ -238,4 +243,4 @@ function exec(client, spec, params, cb) {
} else {
client.request(request, cb);
}
};
}

View File

@ -94,7 +94,7 @@ ClientConfig.prototype.prepareHosts = function (hosts) {
hosts = [hosts];
}
for(i = 0; i < hosts.length; i++) {
for (i = 0; i < hosts.length; i++) {
host = hosts[i];
if (typeof host === 'object') {
if (host.protocol) {

View File

@ -54,11 +54,13 @@ ConnectionAbstract.prototype.setStatus = function (status) {
this.status = status;
if (status === 'dead') {
if (status === 'dead' || status === 'closed') {
if (this.__deadTimeout) {
clearTimeout(this.__deadTimeout);
}
this.__deadTimeout = setTimeout(this.bound.resuscitate, this.config.deadTimeout);
if (status === 'dead') {
this.__deadTimeout = setTimeout(this.bound.resuscitate, this.config.deadTimeout);
}
}
this.emit('status changed', status, origStatus, this);

View File

@ -93,13 +93,15 @@ ConnectionPool.prototype._remove = function (connection) {
connection.setStatus('closed');
connection.removeListener('status changed', this.bound.onStatusChanged);
}
}
};
ConnectionPool.prototype.setNodes = function (nodeConfigs) {
var i;
var connection;
var i;
var id;
var node;
var toRemove = _.clone(this.index);
for (i = 0; i < nodeConfigs.length; i++) {
node = nodeConfigs[i];
if (node.hostname && node.port) {
@ -115,9 +117,9 @@ ConnectionPool.prototype.setNodes = function (nodeConfigs) {
}
_.each(toRemove, this._remove, this);
}
};
ConnectionPool.prototype.empty = function () {
ConnectionPool.prototype.close = function () {
this.setNodes([]);
};
ConnectionPool.prototype.empty = ConnectionPool.prototype.close;

View File

@ -70,8 +70,6 @@ HttpConnection.prototype.request = function (params, cb) {
headers: _.defaults(params.headers || {}, defaultHeaders)
});
log.debug('starting request', requestId);
// general clean-up procedure to run after the request, can only run once
var cleanUp = function (err) {
clearTimeout(timeoutId);
@ -80,7 +78,7 @@ HttpConnection.prototype.request = function (params, cb) {
incoming && incoming.removeAllListeners();
log.debug('calling back request', requestId, err ? 'with error "' + err.message + '"' : '');
_.nextTick(cb, err, reqParams, response, status);
cb(err, reqParams, response, status);
// override so this doesn't get called again
cleanUp = _.noop;
@ -88,9 +86,7 @@ HttpConnection.prototype.request = function (params, cb) {
reqParams.agent = this.agent;
request = http.request(reqParams);
request.on('response', function (_incoming) {
request = http.request(reqParams, function (_incoming) {
incoming = _incoming;
status = incoming.statusCode;
incoming.setEncoding('utf8');

View File

@ -51,6 +51,15 @@ function Log(config) {
}
_.inherits(Log, EventEmitter);
Log.prototype.close = function () {
this.emit('closing');
if (EventEmitter.listenerCount(this)) {
console.error('Something is still listening for log events, but the logger is closing.');
this.clearAllListeners();
}
}
/**
* Levels observed by the loggers, ordered by rank
*

View File

@ -85,7 +85,7 @@ LoggerAbstract.prototype.setupListeners = function (levels) {
*/
LoggerAbstract.prototype.cleanUpListeners = _.handler(function () {
_.each(this.listeningLevels, function (level) {
this.bridge.removeListener(level, this.handlers[level]);
this.bridge.removeListener(level, this.bound['on' + _.ucfirst(level)]);
}, this);
});

View File

@ -295,7 +295,7 @@ utils.inherits = function (constructor, superConstructor) {
* @returns {String}
*/
utils.trim = function (str) {
return typeof str !== 'string' ? str.replace(/^\s+|\s+$/g, '') : '';
return typeof str === 'string' ? str.replace(/^\s+|\s+$/g, '') : '';
};
utils.collectMatches = function (text, regExp) {
@ -385,8 +385,13 @@ utils.applyArgs = function (func, context, args, sliceIndex) {
* @return {[type]} [description]
*/
_.nextTick = function (cb) {
console.log('tick');
var args = Array.prototype.slice.call(arguments, 1);
// bind the function and schedule it
process.nextTick(_.bindKey(_, 'applyArgs', cb, null, arguments, 1));
process.nextTick(function () {
console.log('tock');
cb.apply(null, args);
});
};
/**