Added more unit tests, up to 98% coverage. Fixed the Gruntfile so it's not a cluster-f**k anymore.

This commit is contained in:
Spencer Alger
2013-12-06 18:39:48 -07:00
parent 086636c8a4
commit 270763e0a7
37 changed files with 1361 additions and 433 deletions

View File

@ -14,6 +14,7 @@ angular.module('elasticsearch.client', [])
.factory('esFactory', ['$http', '$q', function ($http, $q) {
AngularConnector.prototype.$http = $http;
AngularConnector.prototype.$q = $q;
var factory = function (config) {
config = config || {};

View File

@ -812,9 +812,10 @@ api.exists = ca({
* @param {String} params.preference - Specify the node or shard the operation should be performed on (default: random)
* @param {String} params.q - Query in the Lucene query string syntax
* @param {String} params.routing - Specific routing value
* @param {String, String[], Boolean} params.source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params.sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params.sourceInclude - A list of fields to extract and return from the _source field
* @param {String} params.source - The URL-encoded query definition (instead of using the request body)
* @param {String, String[], Boolean} params._source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params._sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params._sourceInclude - A list of fields to extract and return from the _source field
* @param {String} params.id - The document ID
* @param {String} params.index - The name of the index
* @param {String} params.type - The type of the document
@ -863,14 +864,16 @@ api.explain = ca({
type: 'string'
},
source: {
type: 'list',
name: '_source'
type: 'string'
},
sourceExclude: {
_source: {
type: 'list'
},
_sourceExclude: {
type: 'list',
name: '_source_exclude'
},
sourceInclude: {
_sourceInclude: {
type: 'list',
name: '_source_include'
}
@ -904,9 +907,9 @@ api.explain = ca({
* @param {Boolean} params.realtime - Specify whether to perform the operation in realtime or search mode
* @param {Boolean} params.refresh - Refresh the shard containing the document before performing the operation
* @param {String} params.routing - Specific routing value
* @param {String, String[], Boolean} params.source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params.sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params.sourceInclude - A list of fields to extract and return from the _source field
* @param {String, String[], Boolean} params._source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params._sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params._sourceInclude - A list of fields to extract and return from the _source field
* @param {String} params.id - The document ID
* @param {String} params.index - The name of the index
* @param {String} [params.type=_all] - The type of the document (use `_all` to fetch the first document matching the ID across all types)
@ -931,15 +934,14 @@ api.get = ca({
routing: {
type: 'string'
},
source: {
type: 'list',
name: '_source'
_source: {
type: 'list'
},
sourceExclude: {
_sourceExclude: {
type: 'list',
name: '_source_exclude'
},
sourceInclude: {
_sourceInclude: {
type: 'list',
name: '_source_include'
}
@ -2629,9 +2631,9 @@ api.info = ca({
* @param {String} params.preference - Specify the node or shard the operation should be performed on (default: random)
* @param {Boolean} params.realtime - Specify whether to perform the operation in realtime or search mode
* @param {Boolean} params.refresh - Refresh the shard containing the document before performing the operation
* @param {String, String[], Boolean} params.source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params.sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params.sourceInclude - A list of fields to extract and return from the _source field
* @param {String, String[], Boolean} params._source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params._sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params._sourceInclude - A list of fields to extract and return from the _source field
* @param {String} params.index - The name of the index
* @param {String} params.type - The type of the document
*/
@ -2649,15 +2651,14 @@ api.mget = ca({
refresh: {
type: 'boolean'
},
source: {
type: 'list',
name: '_source'
_source: {
type: 'list'
},
sourceExclude: {
_sourceExclude: {
type: 'list',
name: '_source_exclude'
},
sourceInclude: {
_sourceInclude: {
type: 'list',
name: '_source_include'
}
@ -2953,9 +2954,10 @@ api.scroll = ca({
* @param {String} params.searchType - Search operation type
* @param {Number} params.size - Number of hits to return (default: 10)
* @param {String, String[], Boolean} params.sort - A comma-separated list of <field>:<direction> pairs
* @param {String, String[], Boolean} params.source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params.sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params.sourceInclude - A list of fields to extract and return from the _source field
* @param {String} params.source - The URL-encoded request definition using the Query DSL (instead of using request body)
* @param {String, String[], Boolean} params._source - True or false to return the _source field or not, or a list of fields to return
* @param {String, String[], Boolean} params._sourceExclude - A list of fields to exclude from the returned _source field
* @param {String, String[], Boolean} params._sourceInclude - A list of fields to extract and return from the _source field
* @param {String, String[], Boolean} params.stats - Specific 'tag' of the request for logging and statistical purposes
* @param {String} params.suggestField - Specify which field to use for suggestions
* @param {String} [params.suggestMode=missing] - Specify suggest mode
@ -3047,14 +3049,16 @@ api.search = ca({
type: 'list'
},
source: {
type: 'list',
name: '_source'
type: 'string'
},
sourceExclude: {
_source: {
type: 'list'
},
_sourceExclude: {
type: 'list',
name: '_source_exclude'
},
sourceInclude: {
_sourceInclude: {
type: 'list',
name: '_source_include'
},

View File

@ -32,7 +32,7 @@ function ConnectionPool(config) {
// a map of connections to their "id" property, used when sniffing
this.index = {};
this.connections = {
this._conns = {
alive: [],
dead: []
};
@ -59,18 +59,18 @@ delete ConnectionPool.connectionClasses._default;
* @return {[type]} [description]
*/
ConnectionPool.prototype.select = function (cb) {
if (this.connections.alive.length) {
if (this._conns.alive.length) {
if (this.selector.length > 1) {
this.selector(this.connections.alive, cb);
this.selector(this._conns.alive, cb);
} else {
try {
_.nextTick(cb, null, this.selector(this.connections.alive));
_.nextTick(cb, null, this.selector(this._conns.alive));
} catch (e) {
cb(e);
}
}
} else {
_.nextTick(cb, null, this.connections.dead[0]);
_.nextTick(cb, null, this.getConnection());
}
};
@ -88,19 +88,19 @@ ConnectionPool.prototype.onStatusSet = _.handler(function (status, oldStatus, co
switch (status) {
case 'alive':
from = this.connections.dead;
to = this.connections.alive;
from = this._conns.dead;
to = this._conns.alive;
break;
case 'dead':
from = this.connections.alive;
to = this.connections.dead;
from = this._conns.alive;
to = this._conns.dead;
break;
case 'redead':
from = this.connections.dead;
to = this.connections.dead;
from = this._conns.dead;
to = this._conns.dead;
break;
case 'closed':
from = this.connections[oldStatus];
from = this._conns[oldStatus];
break;
}
@ -119,6 +119,23 @@ ConnectionPool.prototype.onStatusSet = _.handler(function (status, oldStatus, co
}
});
/**
* Fetches the first active connection, falls back to dead connections
* This is really only here for testing purposes
*
* @private
* @return {Connection} - Some connection
*/
ConnectionPool.prototype.getConnection = function () {
if (this._conns.alive.length) {
return this._conns.alive[0];
}
if (this._conns.dead.length) {
return this._conns.dead[0];
}
};
ConnectionPool.prototype.addConnection = function (connection) {
if (!connection.id) {
connection.id = connection.host.toString();

View File

@ -16,18 +16,24 @@ function AngularConnector(host, config) {
_.inherits(AngularConnector, ConnectionAbstract);
AngularConnector.prototype.request = function (params, cb) {
var abort = this.$q.defer();
this.$http({
method: params.method,
url: this.host.makeUrl(params),
data: params.body,
cache: false,
timeout: _.has(params, 'requestTimeout') ? this.requestTimeout : 10000
timeout: abort.promise
}).then(function (response) {
cb(null, response.data, response.status);
}, function (err) {
cb(new ConnectionFault(err.message));
});
return function () {
abort.resolve();
};
};
// must be overwritten before this connection can be used
AngularConnector.prototype.$http = null;
AngularConnector.prototype.$q = null;

View File

@ -12,7 +12,7 @@ _.each(opts, function (conn, name) {
}
});
// custom __default specification
// custom _default specification
if (opts.xhr) {
opts._default = 'xhr';
} else if (opts.angular) {

View File

@ -13,7 +13,6 @@ var handles = {
https: require('https')
};
var _ = require('../utils');
var errors = require('../errors');
var qs = require('querystring');
var KeepAliveAgent = require('agentkeepalive');
var ConnectionAbstract = require('../connection');
@ -88,7 +87,6 @@ HttpConnector.prototype.request = function (params, cb) {
var request;
var response;
var status = 0;
var requestTimeout = _.has(params, 'requestTimeout') ? this.requestTimeout : 10000;
var log = this.log;
var reqParams = this.makeReqParams(params);
@ -105,7 +103,6 @@ HttpConnector.prototype.request = function (params, cb) {
err = void 0;
} else {
log.error(err);
this.setStatus('dead');
}
log.trace(params.method, reqParams, params.body, response, status);
@ -132,14 +129,6 @@ HttpConnector.prototype.request = function (params, cb) {
request.on('error', cleanUp);
if (requestTimeout) {
// timeout for the entire request.
timeoutId = setTimeout(function () {
request.abort();
request.emit('error', new errors.RequestTimeout('Request timed out at ' + requestTimeout + 'ms'));
}, requestTimeout);
}
request.setNoDelay(true);
request.setSocketKeepAlive(true);
request.chunkedEncoding = false;
@ -151,4 +140,8 @@ HttpConnector.prototype.request = function (params, cb) {
request.end();
}
this.requestCount++;
return function () {
request.abort();
};
};

View File

@ -14,7 +14,6 @@ JqueryConnector.prototype.request = function (params, cb) {
data: params.body,
dataType: 'json',
headers: params.headers,
timeout: params.requestTimeout,
done: cb
};
@ -24,7 +23,11 @@ JqueryConnector.prototype.request = function (params, cb) {
ajax.password = auths[1];
}
return jQuery.ajax(ajax);
var jqXhr = jQuery.ajax(ajax);
return function () {
jqXhr.abort();
};
};

View File

@ -10,7 +10,6 @@ module.exports = XhrConnector;
var _ = require('../utils');
var ConnectionAbstract = require('../connection');
var ConnectionFault = require('../errors').ConnectionFault;
var TimeoutError = require('../errors').RequestTimeout;
var asyncDefault = !(navigator && /PhantomJS/i.test(navigator.userAgent));
function XhrConnector(host, config) {
@ -50,7 +49,6 @@ if (!getXhr) {
XhrConnector.prototype.request = function (params, cb) {
var xhr = getXhr();
var requestTimeout = _.has(params, 'requestTimeout') ? this.requestTimeout : 10000;
var timeoutId;
var url = this.host.makeUrl(params);
var log = this.log;
@ -71,13 +69,9 @@ XhrConnector.prototype.request = function (params, cb) {
}
};
if (requestTimeout) {
timeoutId = setTimeout(function () {
xhr.onreadystatechange = _.noop;
xhr.abort();
cb(new TimeoutError());
}, requestTimeout);
}
xhr.send(params.body || void 0);
return function () {
xhr.abort();
};
};

View File

@ -22,6 +22,10 @@ function Log(config) {
var i;
var outputs;
if (config.loggers) {
config.log = config.loggers;
}
if (config.log) {
if (_.isArrayOfStrings(config.log)) {
outputs = [{

View File

@ -34,9 +34,8 @@ Console.prototype.setupListeners = function (levels) {
};
Console.prototype.write = function (label, message, to) {
/* jshint browser:true */
if (window.console && window.console[to]) {
window.console[to](this.format(label, message));
if (console[to]) {
console[to](this.format(label, message));
}
};

View File

@ -12,7 +12,7 @@ var when = require('when');
function Transport(config) {
config = config || {};
var LogClass = _.funcEnum(config, 'logClass', Transport.logs, 'main');
var LogClass = (typeof config.log === 'function') ? config.log : require('./log');
config.log = this.log = new LogClass(config);
// overwrite the createDefer method if a new implementation is provided
@ -34,22 +34,32 @@ function Transport(config) {
// setup max retries
this.maxRetries = config.hasOwnProperty('maxRetries') ? config.maxRetries : 3;
// setup requestTimeout default
this.requestTimeout = config.hasOwnProperty('requestTimeout') ? config.requestTimeout : 10000;
// randomizeHosts option
var randomizeHosts = config.hasOwnProperty('randomizeHosts') ? !!config.randomizeHosts : true;
if (config.host) {
config.hosts = config.host;
}
if (config.hosts) {
var hostsConfig = _.createArray(config.hosts, function (val) {
if (_.isPlainObject(val) || _.isString(val)) {
if (_.isPlainObject(val) || _.isString(val) || val instanceof Host) {
return val;
}
});
if (!hostsConfig) {
throw new Error('Invalid hosts config. Expected a URL, an array of urls, a host config object, or an array of ' +
'host config objects.');
throw new TypeError('Invalid hosts config. Expected a URL, an array of urls, a host config object, ' +
'or an array of host config objects.');
}
var hosts = _.map(hostsConfig, function (conf) {
return new Host(conf);
return (conf instanceof Host) ? conf : new Host(conf);
});
if (config.randomizeHosts) {
if (randomizeHosts) {
hosts = _.shuffle(hosts);
}
@ -65,10 +75,6 @@ Transport.serializers = {
json: require('./serializers/json')
};
Transport.logs = {
main: require('./log')
};
Transport.nodesToHostCallbacks = {
main: require('./nodes_to_host')
};
@ -91,8 +97,12 @@ Transport.prototype.request = function (params, cb) {
var self = this;
var remainingRetries = this.maxRetries;
var connection; // set in sendReqWithConnection
var connectionReq; // an object with an abort method, set in sendReqWithConnection
var request; // the object returned to the user, might be a deferred
var aborted = false; // several connector will respond with an error when the request is aborted
var requestAbort; // an abort function, returned by connection#request()
var requestTimeout; // the general timeout for the total request (inculding all retries)
var requestTimeoutId; // the id of the ^timeout
var request; // the object returned to the user, might be a promise
var defer; // the defer object, will be set when we are using promises.
self.log.debug('starting request', params);
@ -107,26 +117,22 @@ Transport.prototype.request = function (params, cb) {
}
params.req = {
requestTimeout: params.requestTimeout,
method: params.method,
path: params.path,
query: params.query,
body: params.body,
};
self.connectionPool.select(sendReqWithConnection);
function abortRequest() {
remainingRetries = 0;
connectionReq.abort();
}
function sendReqWithConnection(err, _connection) {
if (aborted) {
return;
}
if (err) {
respond(err);
} else if (_connection) {
connection = _connection;
connectionReq = connection.request(params.req, checkRespForFailure);
requestAbort = connection.request(params.req, checkRespForFailure);
} else {
self.log.warning('No living connections');
respond(new errors.NoConnections());
@ -134,17 +140,31 @@ Transport.prototype.request = function (params, cb) {
}
function checkRespForFailure(err, body, status) {
if (err && remainingRetries) {
remainingRetries--;
self.log.error(err.message, '-- retrying');
self.connectionPool.select(sendReqWithConnection);
if (aborted) {
return;
}
if (err) {
connection.setStatus('dead');
if (remainingRetries) {
remainingRetries--;
self.log.error('Request error, retrying --', err.message);
self.connectionPool.select(sendReqWithConnection);
} else {
self.log.error('Request complete with error --', err.message);
respond(new errors.ConnectionFault(err));
}
} else {
self.log.info('Request complete');
respond(err ? new errors.ConnectionFault() : void 0, body, status);
respond(void 0, body, status);
}
}
function respond(err, body, status) {
if (aborted) {
return;
}
var parsedBody;
if (!err && body) {
@ -154,18 +174,20 @@ Transport.prototype.request = function (params, cb) {
}
}
if (!err) {
if ((status < 200 || status >= 300)
&& (!params.ignore || !_.contains(params.ignore, status))
) {
if (errors[status]) {
err = new errors[status](parsedBody && parsedBody.error);
} else {
err = new errors.Generic('unknown error');
}
// does the response represent an error?
if (
(!err || err instanceof errors.Serialization)
&& (status < 200 || status >= 300)
&& (!params.ignore || !_.contains(params.ignore, status))
) {
if (errors[status]) {
err = new errors[status](parsedBody && parsedBody.error);
} else {
err = new errors.Generic('unknown error');
}
}
// how do we parse the body?
if (params.castExists) {
if (err && err instanceof errors.NotFound) {
parsedBody = false;
@ -175,29 +197,59 @@ Transport.prototype.request = function (params, cb) {
}
}
// how do we send the response?
if (typeof cb === 'function') {
cb(err, parsedBody, status);
if (err) {
cb(err);
} else {
cb(void 0, parsedBody, status);
}
} else if (err) {
request.reject(err);
defer.reject(err);
} else {
request.resolve({
defer.resolve({
body: parsedBody,
status: status
});
}
}
// determine the API based on the presense of a callback
function abortRequest() {
if (aborted) {
return;
}
aborted = true;
remainingRetries = 0;
clearTimeout(requestTimeoutId);
if (typeof requestAbort === 'function') {
requestAbort();
}
}
// set the requestTimeout
requestTimeout = params.hasOwnProperty('requestTimeout') ? params.requestTimeout : this.requestTimeout;
if (requestTimeout && requestTimeout !== Infinity) {
requestTimeoutId = setTimeout(function () {
respond(new errors.RequestTimeout());
abortRequest();
}, requestTimeout);
}
// determine the response based on the presense of a callback
if (typeof cb === 'function') {
request = {
abort: abortRequest
};
} else {
var defer = this.createDefer();
defer.promise.abort = abortRequest;
defer = this.createDefer();
request = defer.promise;
request.abort = abortRequest;
}
self.connectionPool.select(sendReqWithConnection);
return request;
};
@ -232,6 +284,10 @@ Transport.prototype.sniff = function (cb) {
});
};
/**
* Close the Transport, which closes the logs and connection pool
* @return {[type]} [description]
*/
Transport.prototype.close = function () {
this.log.close();
this.connectionPool.close();