Cleaned up the generation script, fixing the doc-blocks above the client actions.

Replaced the transport, giving it all of the functionality that was brought over to the client and making the client simply a place for the API to live. Essentially a shell that can easily be removed.

spec'd out the TransportRequest which will eventually inherit from one of server possible promise implementations and will be plugable. It will also implement the "abort" functionality needed in an environment like node.js
This commit is contained in:
Spencer Alger
2013-10-29 08:48:29 -07:00
parent cb35524096
commit 984a55f6c0
25 changed files with 1128 additions and 1052 deletions

View File

@ -0,0 +1,123 @@
var _ = require('../../../src/lib/utils');
var EventEmitter = require('events').EventEmitter;
var aliases = require('./aliases');
var castExistsRE = /exists/;
var usesBulkBodyRE = /^(bulk|msearch)$/;
var urlParamRE = /\{(\w+)\}/g;
var specCount = 0;
var actions = [];
var doneParsing = false;
require('../../get_spec')
.get('api/*.json')
.on('entry', transformFile)
.on('end', function () {
doneParsing = true;
if (actions.length === specCount) {
module.exports.emit('ready', actions);
}
});
function transformFile(entry) {
specCount++;
// itterate all of the specs within the file, should only be one
_.each(JSON.parse(entry.data), function (def, name) {
var steps = name.split('.');
var allParams = _.extend({}, def.url.params, def.url.parts);
var spec = {
name: name,
methods: _.map(def.methods, function (m) { return m.toUpperCase(); }),
params: def.url.params,
body: def.body || null,
path2lib: _.repeat('../', steps.length + 1) + 'lib/'
};
if (def.body && def.body.requires) {
spec.needBody = true;
}
if (usesBulkBodyRE.test(name)) {
spec.bulkBody = true;
}
if (castExistsRE.test(name)) {
spec.castExists = true;
}
var urls = _.difference(def.url.paths, aliases[name]);
urls = _.map(urls, function (url, i) {
var optionalVars = {};
var requiredVars = {};
var param;
var target;
var match;
if (url.charAt(0) !== '/') {
url = '/' + url;
}
while (match = urlParamRE.exec(url)) {
param = def.url.parts[match[1]] || {};
target = (param.required || !param.default) ? requiredVars : optionalVars;
target[match[1]] = _.omit(param, 'required');
}
[requiredVars, optionalVars].forEach(function (vars) {
_.each(vars, function (v, name) {
vars[name] = _.omit(v, 'description');
});
});
return _.omit({
fmt: url.replace(urlParamRE, '<%=$1%>'),
opt: _.size(optionalVars) ? optionalVars : null,
req: _.size(requiredVars) ? requiredVars : null,
sortOrder: _.size(requiredVars) * -1
}, function (v) {
return !v;
});
});
spec.urls = _.map(_.sortBy(urls, 'sortOrder'), function (url) {
return _.omit(url, 'sortOrder');
});
spec.params = _.transform(spec.params, function (note, param, name) {
param.name = name;
note[name] = _.pick(param, [
'type', 'default', 'options', 'required'
]);
}, {});
// escape method names with "special" keywords
var location = _.map(spec.name.split('.'), _.camelCase)
.join('.prototype.')
.replace(/(^|\.)(delete|default)(\.|$)/g, '[\'$2\']');
var action = {
spec: _.pick(spec, [
'methods',
'params',
'urls',
'needBody',
'bulkBody',
'castExists',
'castNotFound'
]),
location: location,
docUrl: def.documentation,
name: spec.name,
allParams: allParams
};
if (actions.push(action) === specCount && doneParsing) {
module.exports.emit('ready', action);
}
});
}
module.exports = new EventEmitter();

View File

@ -9,83 +9,18 @@ var urlParamRE = /\{(\w+)\}/g;
var outputPath = _.joinPath(__dirname, '../../../src/lib/api.js');
require('./spec').on('ready', function (specs) {
require('./actions').on('ready', function (actions) {
var defs = [];
var namespaces = [];
var namespaces = _.filter(_.map(actions, function (action) {
if (~action.location.indexOf('.')) {
var path = action.location.split('.').slice(0, -1);
_.pull(path, 'prototype');
return path.join('.');
}
}));
clean(outputPath);
var actions = _.map(specs, function (spec) {
spec.urls = _.map(
_.sortBy(
_.transform(spec.urls, function (note, url, i) {
var optionalVars = {};
var requiredVars = {};
var param;
var target;
var match;
if (url.charAt(0) !== '/') {
url = '/' + url;
}
while (match = urlParamRE.exec(url)) {
param = spec.urlParts[match[1]] || {};
target = (param.required || !param.default) ? requiredVars : optionalVars;
target[match[1]] = _.omit(param, 'required');
}
[requiredVars, optionalVars].forEach(function (vars) {
_.each(vars, function (v, name) {
vars[name] = _.omit(v, 'description');
});
});
note.push(_.omit({
fmt: url.replace(urlParamRE, '<%=$1%>'),
opt: _.size(optionalVars) ? optionalVars : null,
req: _.size(requiredVars) ? requiredVars : null,
sortOrder: _.size(requiredVars) * -1
}, function (v) { return !v; }));
}, [])
, 'sortOrder')
, function (url) {
return _.omit(url, 'sortOrder');
});
var docUrl = spec.docUrl;
var location = _.map(spec.name.split('.'), _.camelCase).join('.');
spec = _.pick(spec, [
'methods',
'params',
'urls',
'needBody',
'bulkBody',
'castNotFound'
]);
spec.params = _.transform(spec.params, function (note, param, name) {
param.name = name;
note[name] = _.pick(param, [
'type', 'default', 'options', 'required'
]);
}, {});
if (~location.indexOf('.')) {
var steps = location.split('.');
namespaces.push(steps.slice(0, -1).join('.'));
location = steps.join('.prototype.');
}
// escape method names with "special" keywords
location = location.replace(/(^|\.)(delete|default)(\.|$)/g, '[\'$2\']');
return {
spec: spec,
location: location,
docUrl: docUrl
};
});
console.log('writing', actions.length, 'api actions to', outputPath);
fs.writeFileSync(outputPath, templates.apiFile({
actions: actions,

View File

@ -1,58 +0,0 @@
var _ = require('../../../src/lib/utils');
var EventEmitter = require('events').EventEmitter;
var aliases = require('./aliases');
var castNotFoundRE = /exists/;
var usesBulkBodyRE = /^(bulk|msearch)$/;
var specCount = 0;
var completedSpecs = [];
var doneParsing = false;
require('../../get_spec')
.get('api/*.json')
.on('entry', transformFile)
.on('end', function () {
doneParsing = true;
if (completedSpecs.length === specCount) {
module.exports.emit('ready', completedSpecs);
}
});
function transformFile(entry) {
specCount++;
var file = entry.data;
// itterate all of the specs within the file, should only be one
_.each(JSON.parse(file), function (def, name) {
var steps = name.split('.');
var spec = {
name: name,
methods: _.map(def.methods, function (m) { return m.toUpperCase(); }),
docUrl: def.documentation,
urlParts: def.url.parts,
params: def.url.params,
urls: _.difference(def.url.paths, aliases[name]),
body: def.body || null,
path2lib: _.repeat('../', steps.length + 1) + 'lib/'
};
if (def.body && def.body.requires) {
spec.needBody = true;
}
if (usesBulkBodyRE.test(name)) {
spec.bulkBody = true;
}
if (castNotFoundRE.test(name)) {
spec.castNotFound = true;
}
if (completedSpecs.push(spec) === specCount && doneParsing) {
module.exports.emit('ready', completedSpecs);
}
});
}
module.exports = new EventEmitter();

View File

@ -1,4 +1,4 @@
var ca = require('./client_action').create;
var ca = require('./client_action');
var errors = require('./errors');
var api = module.exports = {};

View File

@ -1,8 +1,8 @@
/**
* Perform a [<%= spec.name %>](<%= docUrl %>) request
* Perform a [<%= name %>](<%= docUrl %>) request
*
* @param {Object} params - An object with parameters used to carry out this action<%
_.each(spec.params, function(param, paramName) { %>
_.each(allParams, function(param, paramName) { %>
* @param {<%= paramType(param.type) %>} <%= paramWithDefault('params.' + paramName, param.default) %><%
if (param.description) {
%> - <%= param.description %><%

View File

@ -1,8 +1,7 @@
var _ = require('../../../../src/lib/utils')
, fs = require('fs')
, path = require('path')
, urlParamRE = /\{(\w+)\}/g;
var _ = require('../../../../src/lib/utils');
var fs = require('fs');
var path = require('path');
/**
@ -16,7 +15,7 @@ function lines(i) {
if (line === '') {
// no indent on empty lines
l.lines.push('');
} else if (typeof line !== 'undefined') {
} else if (line === void 0) {
l.lines.push(_.repeat(' ', l.indent) + line);
}
return l;
@ -81,152 +80,6 @@ var templates = {};
*/
var templateGlobals = {
writeParams: function (indent, params, namespace) {
var l = lines(indent);
_.each(params, function (param, name) {
if (!param.required) {
l('if (typeof params.' + name + ' !== \'undefined\') {').in();
}
l.split(templates[param.type || 'any']({
get: 'params.' + name,
set: namespace + name,
name: name
}));
if (!param.required) {
l.out();
l('}');
}
l('');
});
return l.toString();
},
writeBrowserParams: function (indent, params, namespace) {
var l = lines(indent);
_.each(params, function (param, name) {
if (!param.required) {
l('if (_.has(params, ' + stringify(name) + ')) {').in();
}
switch (param.type) {
case 'enum':
l(
namespace + name + ' = _.' +
(param.type || 'any') + 'Param(params.' + name + ', ' + stringify(param.options) +
');'
);
break;
default:
l(namespace + name + ' = _.' + (param.type || 'any') + 'Param(params.' + name + ');');
break;
}
if (!param.required) {
l.out('}');
}
l('');
});
return l.toString();
},
writeUrls: function (indent, urls, urlParams, queryStringParams) {
var l = lines(indent);
function urlVarIsRequired(varDetails) {
varDetails = typeof varDetails === 'string' ? urlParams[varDetails] : varDetails;
return varDetails && (varDetails.required || !varDetails.default);
}
// turn a url string into an object describing the url, then sort them in decending order by how many args they have
urls = _.sortBy(urls, function (url) {
var requiredVars = _.filter(_.collectMatches(url, urlParamRE), function (match) {
return urlVarIsRequired(urlParams[match[1]]);
});
return requiredVars ? requiredVars.length * -1 : 0;
});
_.each(urls, function (url, urlIndex) {
// collect the vars from the url and replace them to form the js that will build the url
var makeL = lines(), vars = [];
makeL('request.path = \'' + url.replace(urlParamRE, function (match, varName) {
var varDetails = urlParams[varName];
varDetails.name = varName;
vars.push(varDetails);
if (urlVarIsRequired(varDetails)) {
return '\' + encodeURIComponent(parts.' + varName + ') + \'';
} else {
return '\' + encodeURIComponent(parts.' + varName + ' || ' + stringify(varDetails.default) + ') + \'';
}
}) + '\';');
makeL(_.filter(_.map(vars, function (v, i) {
if (_.has(queryStringParams, v.name)) {
// delete the param so that it's not used later on in the queryString
return 'delete params.' + v.name + ';';
}
})).join(' '));
if (vars.length || urlIndex) {
var requiredVars = _.filter(vars, urlVarIsRequired);
var condition = _.map(requiredVars, function (v) {
return 'parts.' + v.name + ')';
}).join(' && ');
l((urlIndex > 0 ? 'else ' : '') + (condition ? 'if (' + condition + ') ' : '') + '{')
.in()
.split(makeL.toString())
.out('}');
if (urlIndex === urls.length - 1 && condition) {
l('else {')
.in('throw new TypeError(\'Unable to build a path with those params. Supply at least ' +
vars.join(', ') + '\');'
)
.out('}');
}
} else {
l.split(makeL.toString());
}
});
l('');
return l.toString();
},
writeRequestObjectBody: function (indent, name, body, methods) {
var parts = [], l = lines(indent);
if (~name.indexOf('exists')) {
parts.push('ignore: _.union([404], params.ignore)');
} else {
parts.push('ignore: params.ignore');
}
if (body) {
if (_.contains(['bulk', 'msearch'], name)) {
parts.push('body: this.client.config.serializer.bulkBody(params.body || null)');
} else {
parts.push('body: params.body || null');
}
}
if (methods.length === 1) {
parts.push('method: ' + stringify(methods[0]));
}
_.each(parts, function (part, i) {
l(part + (i < parts.length - 1 ? ',' : ''));
});
return l.toString();
},
stringify: stringify,
_: _,
@ -254,22 +107,6 @@ var templateGlobals = {
}
},
returnStatement: function (indent, name) {
var l = lines(indent);
if (name.match(/(^|\.)exists/)) {
l('this.client.request(request, function (err, response) {')
.in('if (err instanceof errors.NotFound) {')
.in('cb(err, false);')
.out('} else {')
.in('cb(err, true);')
.out('}')
.out('});');
} else {
l('this.client.request(request, cb);');
}
return l.toString();
},
partials: templates
};
@ -290,6 +127,5 @@ fs.readdirSync(path.resolve(__dirname)).forEach(function (filename) {
templates.text = templates.string;
module.exports = {
apiFile: templates.api_file,
urlParamRE: urlParamRE
apiFile: templates.api_file
};

View File

@ -12,16 +12,16 @@ var client = new es.Client({
maxSockets: 100
});
console.log('clearing existing "test-docs" indices');
async.series([
function (done) {
console.log('clearing existing "test-docs" indices');
client.indices.delete({
index: 'test-docs',
ignore: 404
}, done);
},
function (done) {
console.log('waiting for cluster');
client.cluster.health({
wait_for_status: 'yellow'
}, done);
@ -43,4 +43,6 @@ async.series([
}
});
}
]);
], function (err) {
if (err) console.error(err);
});

File diff suppressed because it is too large Load Diff

View File

@ -32,8 +32,6 @@ module.exports = Client;
var _ = require('./utils');
var ClientConfig = require('./client_config');
var ca = require('./client_action').create;
var errors = require('./errors');
var api = require('./api.js');
function Client(config) {
@ -48,117 +46,13 @@ function Client(config) {
});
this.config.client = this;
// instansiate the api's namespaces
for (var i = 0; i < this._namespaces.length; i++) {
this[this._namespaces[i]] = new this[this._namespaces[i]](this);
}
}
Client.prototype = api;
/**
* Perform a request with the client's transport
*
* @method request
* @todo async body writing
* @todo abort
* @todo access to custom headers, modifying of request in general
* @param {object} params
* @param {String} params.url - The url for the request
* @param {String} params.method - The HTTP method for the request
* @param {String} params.body - The body of the HTTP request
* @param {Function} cb - A function to call back with (error, responseBody, responseStatus)
*/
Client.prototype.request = function (params, cb) {
var serializer = this.config.serializer;
// in cb isn't a function make it one
cb = typeof cb === 'function' ? cb : _.noop;
var connectionPool = this.config.connectionPool;
var log = this.config.log;
var remainingRetries = this.config.maxRetries;
var connection;
log.debug('starting request', params);
// get ignore and ensure that it's an array
var ignore = params.ignore;
if (ignore && !_.isArray(ignore)) {
ignore = [ignore];
}
// serialize the body
if (params.body) {
params.body = params.bulkBody ? serializer.bulkBody(params.body) : serializer.serialize(params.body);
}
if (params.body && params.method === 'GET') {
respond(new TypeError('Body can not be sent with method "GET"'));
return;
}
function sendRequestWithConnection(err, _connection) {
if (err) {
log.error(err);
respond(err);
} else if (_connection) {
connection = _connection;
log.info('Selected', _connection.status, 'Connection, making request');
connection.request(params, checkRespForFailure);
} else {
log.warning('No living connections');
respond(new errors.ConnectionFault('No living connections.'));
}
}
function checkRespForFailure(err, reqParams, body, status) {
connection.setStatus(err ? 'dead' : 'alive');
if (err) {
log.error(err);
}
if (err && remainingRetries) {
remainingRetries--;
log.info('Connection error, retrying');
connectionPool.select(sendRequestWithConnection);
} else {
log.info('Request complete');
respond(err, reqParams, body, status);
}
}
function respond(err, reqParams, body, status) {
var parsedBody = null;
if (reqParams) {
log.trace(reqParams.method, reqParams, params.body, body, status);
}
if (!err) {
if (body) {
parsedBody = serializer.unserialize(body);
if (!parsedBody) {
err = new errors.ParseError();
}
} else if (reqParams.method === 'HEAD') {
parsedBody = (status === 200);
}
}
if (err) {
cb(err, parsedBody, status);
} else if ((status >= 200 && status < 300) || ignore && _.contains(ignore, status)) {
cb(void 0, parsedBody, status);
} else {
if (errors[status]) {
cb(new errors[status](parsedBody.error), parsedBody, status);
} else {
cb(new errors.Generic('unknown error'), parsedBody, status);
}
}
}
connectionPool.select(sendRequestWithConnection);
};
Client.prototype = _.clone(api);
/**
* Ping some node to ensure that the cluster is available in some respect
@ -166,44 +60,18 @@ Client.prototype.request = function (params, cb) {
* @param {Object} params - Currently just a placeholder, no params used at this time
* @param {Function} cb - callback
*/
Client.prototype.ping = ca({
methods: ['HEAD'],
params: {},
url: {
fmt: '/'
}
});
Client.prototype.ping = function (params, cb) {
if (typeof params === 'function') {
cb = params;
params = {};
}
/**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
* pool as appropriate
*
* @param {Function} cb - Function to call back once complete
*/
Client.prototype.sniff = function (cb) {
var config = this.config;
// make cb a function if it isn't
cb = typeof cb === 'function' ? cb : _.noop;
this.request({
path: '/_cluster/nodes',
method: 'GET'
}, function (err, resp) {
if (!err && resp && resp.nodes) {
var nodes = config.nodesToHostCallback(resp.nodes);
config.connectionPool.setNodes(nodes);
}
cb(err, resp);
});
this.config.transport.request({
method: 'HEAD',
path: '/'
}, cb);
};
/**
* Shutdown the connections, log outputs, and clear timers
*/
Client.prototype.close = function () {
this.config.log.close();
this.config.connectionPool.close();
this.config.close();
};

View File

@ -2,14 +2,13 @@
* Constructs a function that can be called to make a request to ES
* @type {[type]}
*/
exports.create = function clientAction(spec) {
module.exports = function ClientAction(spec, client) {
return function (params, cb) {
return exec(this.client, spec, params, cb);
return exec((client || this.client).config.transport, spec, params, cb);
};
};
var errors = require('./errors');
var _ = require('./utils');
var urlParamRE = /\{(\w+)\}/g;
@ -130,7 +129,7 @@ function resolveUrl(url, params) {
}, {}));
}
function exec(client, spec, params, cb) {
function exec(transport, spec, params, cb) {
if (typeof params === 'function') {
cb = params;
params = {};
@ -148,17 +147,13 @@ function exec(client, spec, params, cb) {
return _.nextTick(cb, new TypeError('A request body is required.'));
}
if (params.body) {
request.body = params.body;
}
params.body && (request.body = params.body);
params.ignore && (request.ignore = _.isArray(params.ignore) ? params.ignore : [params.ignore]);
params.timeout && (request.ignore = _.isArray(params.ignore) ? params.ignore : [params.ignore]);
if (spec.bulkBody) {
request.bulkBody = true;
}
if (params.ignore) {
request.ignore = _.isArray(params.ignore) ? params.ignore : [params.ignore];
}
// copy over some properties from the spec
spec.bulkBody && (request.bulkBody = true);
spec.castExists && (request.castExists = true);
if (spec.methods.length === 1) {
request.method = spec.methods[0];
@ -230,17 +225,7 @@ function exec(client, spec, params, cb) {
}
}
request.path = request.path + _.makeQueryString(query);
request.query = query;
if (spec.castNotFound) {
client.request(request, function (err, response) {
if (err instanceof errors.NotFound) {
cb(null, false);
} else {
cb(err, !err);
}
});
} else {
client.request(request, cb);
}
transport.request(request, cb);
}

View File

@ -8,24 +8,30 @@ module.exports = ClientConfig;
var url = require('url');
var _ = require('./utils');
var Host = require('./host');
var selectors = _.reKey(_.requireDir(module, './selectors'), _.camelCase);
var connections = _.requireClasses(module, './connections');
var serializers = _.requireClasses(module, './serializers');
var ConnectionPool = require('./connection_pool');
var Log = require('./log');
var extractHostPartsRE = /\[([^:]+):(\d+)]/;
var hostProtocolRE = /^([a-z]+:)?\/\//;
var defaultClasses = {
log: require('./log'),
serializer: serializers.Json,
connectionPool: require('./connection_pool'),
transport: require('./transport'),
};
var defaultConfig = {
hosts: [
{
protocol: 'http:',
hostname: 'localhost',
port: 9200
host: 'localhost',
port: 9200,
protocol: 'http'
}
],
connectionConstructor: 'Http',
connectionClass: connections.Http,
selector: selectors.roundRobin,
sniffOnStart: false,
sniffAfterRequests: null,
@ -37,14 +43,16 @@ var defaultConfig = {
nodesToHostCallback: function (nodes) {
var hosts = [];
_.each(nodes, function (node, id) {
var hostnameMatches = extractHostPartsRE.exec(node.hostname);
var hostnameMatches = extractHostPartsRE.exec(node.host);
hosts.push({
hostname: hostnameMatches[1],
host: hostnameMatches[1],
port: hostnameMatches[2],
id: id,
name: node.name,
servername: node.hostname,
version: node.version
_meta: {
id: id,
name: node.name,
servername: node.host,
version: node.version
}
});
});
return hosts;
@ -54,13 +62,13 @@ var defaultConfig = {
function ClientConfig(config) {
_.extend(this, defaultConfig, config);
// validate connectionConstructor
if (typeof this.connectionConstructor !== 'function') {
if (_.has(connections, this.connectionConstructor)) {
this.connectionConstructor = connections[this.connectionConstructor];
// validate connectionClass
if (typeof this.connectionClass !== 'function') {
if (typeof connections[this.connectionClass] === 'function') {
this.connectionClass = connections[this.connectionClass];
} else {
throw new TypeError('Invalid connectionConstructor ' + this.connectionConstructor +
', specify a function or one of ' + _.keys(connections).join(', '));
throw new TypeError('Invalid connectionClass ' + this.connectionClass + '. ' +
'Expected a constructor or one of ' + _.keys(connections).join(', '));
}
}
@ -74,10 +82,9 @@ function ClientConfig(config) {
}
}
// currently not configurable because!
this.log = new Log(this);
this.connectionPool = new ConnectionPool(this);
this.serializer = new serializers.Json(this);
_.each(defaultClasses, function (DefaultClass, prop) {
this[prop] = typeof this[prop] === 'function' ? new this[prop](this) : new DefaultClass(this);
}, this);
// populate the connection pool
this.connectionPool.setNodes(this.prepareHosts(this.hosts));
@ -95,49 +102,15 @@ ClientConfig.prototype.prepareHosts = function (hosts) {
hosts = [hosts];
}
for (i = 0; i < hosts.length; i++) {
host = hosts[i];
if (typeof host === 'object') {
if (host.protocol) {
// the protocol must end in a color
if (host.protocol[host.protocol.length - 1] !== ':') {
host.protocol = host.protocol + ':';
}
} else {
host.protocol = 'http:';
}
if (host.host && !host.hostname) {
// utl.format && url.parse uses "hostname" to represent just the name of the host, "host" is "hostname + port"
host.hostname = host.host;
delete host.host;
}
if (!host.hostname) {
host.hostname = 'localhost';
}
if (!host.port) {
host.port = 9200;
}
} else {
// assume it is a string.
if (!hostProtocolRE.test(host)) {
// add a defaul protocol
host = 'http://' + host;
}
// parse the url please, node
var urlInfo = url.parse(host, false, true);
// override the host value
hosts[i] = {
protocol: urlInfo.protocol || 'http:',
hostname: urlInfo.hostname || 'localhost',
port: urlInfo.port || 9200
};
}
}
return hosts;
return _.map(hosts, function (host) {
return new Host(host);
});
};
/**
* Shutdown the connections, log outputs, and clear timers
*/
ClientConfig.prototype.close = function () {
this.log.close();
this.connectionPool.close();
};

View File

@ -5,18 +5,18 @@ var _ = require('./utils'),
/**
* Abstract class used for Connection classes
* @param client {Client} - The client that this connection belongs to
* @param config {Object} - a map of configuration details for this connection
* @param [config.hostname=localhost] {String} - The hostname for the node this connection connects to
* @param [config.port=9200] {Integer} - The port on the server that ES is listening to
* @class ConnectionAbstract
* @constructor
*/
function ConnectionAbstract(config, nodeInfo) {
function ConnectionAbstract(host, config) {
EventEmitter.call(this);
this.config = config;
this.hostname = nodeInfo.hostname || 'localhost';
this.port = nodeInfo.port || 9200;
this.host = host;
this.requestCount = 0;
if (!this.host) {
throw new Error('Missing host config');
}
_.makeBoundMethods(this);
}

View File

@ -8,11 +8,12 @@
module.exports = ConnectionPool;
var _ = require('./utils'),
selectors = _.reKey(_.requireDir(module, './selectors'), _.camelCase),
connectors = _.reKey(_.requireDir(module, './connections'), _.studlyCase),
EventEmitter = require('events').EventEmitter,
errors = require('./errors');
var _ = require('./utils');
var selectors = _.reKey(_.requireDir(module, './selectors'), _.camelCase);
var connectors = _.reKey(_.requireDir(module, './connections'), _.studlyCase);
var EventEmitter = require('events').EventEmitter;
var errors = require('./errors');
var Host = require('./host');
function ConnectionPool(config) {
_.makeBoundMethods(this);
@ -32,6 +33,7 @@ ConnectionPool.prototype.select = function (cb) {
try {
cb(null, this.config.selector(this.connections.alive));
} catch (e) {
this.config.log.error(e);
cb(e);
}
}
@ -46,7 +48,7 @@ ConnectionPool.prototype.onStatusChanged = _.handler(function (status, oldStatus
if (oldStatus === status) {
return true;
} else {
this.config.log.info('connection to', _.formatUrl(connection), 'is', status);
this.config.log.info('connection id:', connection.__id, 'is', status);
}
switch (status) {
@ -103,12 +105,12 @@ ConnectionPool.prototype.setNodes = function (nodeConfigs) {
for (i = 0; i < nodeConfigs.length; i++) {
node = nodeConfigs[i];
if (node.hostname && node.port) {
id = node.hostname + ':' + node.port;
if (node instanceof Host) {
id = node.toString();
if (this.index[id]) {
delete toRemove[id];
} else {
connection = new this.config.connectionConstructor(this.config, nodeConfigs[i]);
connection = new this.config.connectionClass(node, this.config);
connection.__id = id;
this._add(connection);
}

View File

@ -8,22 +8,19 @@
*/
module.exports = HttpConnection;
var http = require('http'),
_ = require('../utils'),
errors = require('../errors'),
ConnectionAbstract = require('../connection'),
defaultHeaders = {
'connection': 'keep-alive'
};
var http = require('http');
var https = require('https');
var _ = require('../utils');
var errors = require('../errors');
var qs = require('querystring');
var ConnectionAbstract = require('../connection');
var defaultHeaders = {
'connection': 'keep-alive'
};
function HttpConnection(config, nodeInfo) {
ConnectionAbstract.call(this, config, nodeInfo);
this.protocol = nodeInfo.protocol || 'http:';
if (this.protocol[this.protocol.length - 1] !== ':') {
this.protocol = this.protocol + ':';
}
function HttpConnection(host, config) {
ConnectionAbstract.call(this, host, config);
this.agent = new http.Agent({
keepAlive: true,
@ -35,7 +32,6 @@ function HttpConnection(config, nodeInfo) {
this.on('closed', this.bound.onClosed);
this.once('alive', this.bound.onAlive);
this.requestCount = 0;
}
_.inherits(HttpConnection, ConnectionAbstract);
@ -49,41 +45,63 @@ HttpConnection.prototype.onAlive = _.handler(function () {
this.agent.maxSockets = this.config.maxSockets;
});
HttpConnection.prototype.makeReqParams = function (params) {
var reqParams = {
method: params.method,
protocol: this.host.protocol + ':',
auth: this.host.auth,
hostname: this.host.host,
port: this.host.port,
path: this.host.path + params.path,
headers: this.host.headers,
agent: this.agent
};
var query = qs.stringify(this.host.query ? _.defaults(params.query, this.host.query) : params.query);
reqParams.path += query ? '?' + query : '';
return reqParams;
};
HttpConnection.prototype.request = function (params, cb) {
var incoming;
var timeoutId;
var log = this.config.log;
var request;
var requestId = this.requestCount;
var response;
var responseStarted = false;
var status = 0;
var timeout = params.timeout || this.config.timeout;
var log = this.config.log;
var reqParams = _.defaults({
protocol: this.protocol,
hostname: this.hostname,
port: this.port,
path: params.path,
method: _.toUpperString(params.method) || (params.body ? 'POST' : 'GET'),
headers: _.defaults(params.headers || {}, defaultHeaders)
});
var reqParams = this.makeReqParams(params);
// general clean-up procedure to run after the request, can only run once
// general clean-up procedure to run after the request
// completes, has an error, or is aborted.
var cleanUp = function (err) {
clearTimeout(timeoutId);
request && request.removeAllListeners();
incoming && incoming.removeAllListeners();
log.debug('calling back request', requestId, err ? 'with error "' + err.message + '"' : '');
cb(err, reqParams, response, status);
if ((err instanceof Error) === false) {
err = void 0;
} else {
log.error(err);
// override so this doesn't get called again
cleanUp = _.noop;
if (err instanceof errors.RequestTimeout) {
request.on('error', function catchAbortError() {
request.removeListener('error', catchAbortError);
});
} else {
this.setStatus('dead');
}
}
log.trace(params.method, reqParams, params.body, response, status);
cb(err, response, status);
};
reqParams.agent = this.agent;
request = http.request(reqParams, function (_incoming) {
incoming = _incoming;
@ -95,20 +113,19 @@ HttpConnection.prototype.request = function (params, cb) {
response += d;
});
incoming.on('end', function requestComplete() {
cleanUp();
});
incoming.on('error', cleanUp);
incoming.on('end', cleanUp);
});
request.on('error', function (err) {
request.abort();
cleanUp(err);
});
request.on('error', cleanUp);
// timeout for the entire request.
timeoutId = setTimeout(function () {
request.emit('error', new errors.RequestTimeout('Request timed out at ' + timeout + 'ms'));
}, timeout);
if (timeout !== Infinity) {
// timeout for the entire request.
timeoutId = setTimeout(function () {
request.abort();
request.emit('error', new errors.RequestTimeout('Request timed out at ' + timeout + 'ms'));
}, timeout);
}
request.setNoDelay(true);
request.setSocketKeepAlive(true);

View File

@ -1,9 +1,16 @@
/* jshint browser: true, jquery: true */
/**
* Simple connection class for using the XHR object in browsers
*
* @class {XhrConnection}
*/
module.exports = JqueryConnection;
var _ = require('../utils'),
ConnectionAbstract = require('../connection');
function JqueryConnection() {}
JqueryConnection.prototype.request = function (params, cb) {
var $xhr = jQuery.ajax(params).done(cb);
};
function JqueryConnection() {
}
_.inherits(JqueryConnection, ConnectionAbstract);

View File

@ -3,13 +3,73 @@
*
* @class connections.Xhr
*/
module.exports = XhrConnection;
var _ = require('../utils'),
ConnectionAbstract = require('../connection');
/* jshint browser:true */
function XhrConnection() {
var _ = require('../utils');
var ConnectionAbstract = require('../connection');
var ConnectionError = require('../errors').ConnectionError;
var TimeoutError = require('../errors').TimeoutError;
function XhrConnection(config, nodeInfo) {
ConnectionAbstract.call(this, config, nodeInfo);
}
_.inherits(XhrConnection, ConnectionAbstract);
/**
* Simply returns an XHR object cross browser
* @type {Function}
*/
var getXhr = _.noop;
if (typeof XMLHttpRequest !== 'undefined') {
// rewrite the getXhr method to always return the native implementation
getXhr = function () {
return new XMLHttpRequest();
};
} else {
// find the first MS implementation available
getXhr = _.first(['Msxml2.XMLHTTP', 'Microsoft.XMLHTTP', 'Msxml2.XMLHTTP.4.0'], function (appName) {
try {
var test = new window.ActiveXObject(appName);
return function () {
return new window.ActiveXObject(appName);
};
} catch (e) {
return null;
}
});
}
if (!getXhr) {
throw new Error('getXhr(): XMLHttpRequest not available');
}
XhrConnection.prototype.request = function (params, cb) {
var xhr = getXhr();
var timeoutId;
if (params.auth) {
xhr.open(params.method, params.url, true, params.auth.user, params.auth.pass);
} else {
xhr.open(params.method, params.url, true);
}
xhr.onreadystatechange = function (e) {
if (xhr.readyState === 4) {
clearTimeout(timeoutId);
cb(xhr.status ? null : new ConnectionError(), xhr.responseText, xhr.status);
}
};
if (params.timeout !== Infinity) {
timeoutId = setTimeout(function () {
xhr.onreadystatechange = _.noop;
xhr.abort();
cb(new TimeoutError());
}, params.timeout);
}
xhr.send(params.body || null);
};

View File

@ -36,14 +36,15 @@ errors.RequestTimeout = function RequestTimeout(msg) {
};
_.inherits(errors.RequestTimeout, ErrorAbstract);
/**
* Request Body could not be parsed
* @param {String} [msg] - An error message that will probably end up in a log.
*/
errors.Serialization = function RequestTimeout(msg) {
ErrorAbstract.call(this, msg || 'Unable to parse response body', errors.RequestTimeout);
errors.Serialization = function Serialization(msg) {
ErrorAbstract.call(this, msg || 'Unable to parse/serialize body', errors.Serialization);
};
_.inherits(errors.RequestTimeout, ErrorAbstract);
_.inherits(errors.Serialization, ErrorAbstract);
var statusCodes = {

65
src/lib/host.js Normal file
View File

@ -0,0 +1,65 @@
/**
* Class to wrap URLS, formatting them and maintaining their seperate details
* @type {[type]}
*/
module.exports = Host;
var url = require('url');
var qs = require('querystring');
var _ = require('./utils');
var startsWithProtocolRE = /^([a-z]+:)?\/\//;
// simple reference used when formatting as a url
var defaultPort = {
http: 80,
https: 443
};
function Host(config) {
if (this instanceof Host) {
if (typeof config === 'string') {
return Host.fromString(config);
} else {
_.extend(this, config || {});
}
} else {
return new Host(config);
}
}
Host.fromString = function (urlString) {
if (!startsWithProtocolRE.test(urlString)) {
urlString = 'http://' + urlString;
}
var u = url.parse(urlString, true, true);
return new Host({
protocol: u.protocol ? u.protocol.substring(0, u.protocol.length - 1) : 'http',
host: u.hostname || 'localhost',
port: u.port || 9200,
auth: u.auth || '',
path: u.pathname,
query: u.query,
});
};
Host.prototype = {
protocol: 'http',
host: 'localhost',
port: 9200,
auth: '',
path: '',
query: {}
};
Host.prototype.toUrl = function (path, query) {
if (query) {
query = '?' + qs.stringify(_.defaults(typeof query === 'string' ? qs.parse(query) : query, this.query));
} else {
query = '';
}
return this.protocol + '://' +
this.host + (this.port !== defaultPort[this.protocol] ? ':' + this.port : '') +
'/' + this.path + (path || '') + query;
};

View File

@ -17,7 +17,7 @@ var _ = require('./utils'),
* @param {string} output.type - The name of the logger to use for this output
*/
function Log(config) {
this.config = config;
this.config = config || {};
var i;
var output = config.log || 2;
@ -250,7 +250,7 @@ Log.prototype.debug = function (/* ...msg */) {
Log.prototype.trace = function (method, requestUrl, body, responseBody, responseStatus) {
if (EventEmitter.listenerCount(this, 'trace')) {
if (typeof requestUrl === 'object') {
requestUrl = _.formatUrl(requestUrl);
requestUrl = url.format(requestUrl);
}
return this.emit('trace', method, requestUrl, body, responseBody, responseStatus);
}

54
src/lib/transport.js Normal file
View File

@ -0,0 +1,54 @@
/**
* Class that manages making request, called by all of the API methods.
* @type {[type]}
*/
module.exports = Transport;
var _ = require('./utils');
var TransportRequest = require('./transport_request');
var errors = require('./errors');
function Transport(config) {
this.config = config;
}
/**
* Perform a request with the client's transport
*
* @method request
* @todo async body writing
* @todo abort
* @todo access to custom headers, modifying of request in general
* @param {object} params
* @param {String} params.url - The url for the request
* @param {String} params.method - The HTTP method for the request
* @param {String} params.body - The body of the HTTP request
* @param {Function} cb - A function to call back with (error, responseBody, responseStatus)
*/
Transport.prototype.request = function (params, cb) {
return new TransportRequest(this.config, params, cb);
};
/**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
* pool as appropriate
*
* @param {Function} cb - Function to call back once complete
*/
Transport.prototype.sniff = function (cb) {
var config = this.config;
// make cb a function if it isn't
cb = typeof cb === 'function' ? cb : _.noop;
this.request({
path: '/_cluster/nodes',
method: 'GET'
}, function (err, resp) {
if (!err && resp && resp.nodes) {
var nodes = config.nodesToHostCallback(resp.nodes);
config.connectionPool.setNodes(nodes);
}
cb(err, resp);
});
};

View File

@ -0,0 +1,147 @@
/**
* Constructs a function that can be called to make a request to ES
* @type {[type]}
*/
module.exports = TransportRequest;
var _ = require('./utils');
var EventEmitter = require('events').EventEmitter;
var errors = require('./errors');
function TransportRequest(config, params, cb) {
// setup event emitter
EventEmitter.call(this);
// copy cetain methods into the bound object
_.makeBoundMethods(this);
this._params = params;
this._log = config.log;
this._serializer = config.serializer;
this._connectionPool = config.connectionPool;
this._remainingRetries = config.maxRetries;
// in cb isn't a function make it one
if (typeof cb === 'function') {
this.once('done', cb);
}
this._startRequest();
}
_.inherits(TransportRequest, EventEmitter);
TransportRequest.prototype._startRequest = function () {
var params = this._params;
this._log.debug('starting request', params);
if (params.body && params.method === 'GET') {
process.nextTick(_.bindKey(this, 'respond', new TypeError('Body can not be sent with method "GET"')));
return;
}
// serialize the body
if (params.body) {
params.body = this._serializer[params.bulkBody ? 'bulkBody' : 'serialize'](params.body);
}
params.req = {
path: params.path,
query: params.query,
method: params.method,
body: params.body,
};
this._connectionPool.select(this.bound._sendReqWithCon);
};
TransportRequest.prototype._sendReqWithCon = _.handler(function (err, con) {
if (err) {
this._respond(err);
} else if (con) {
this._connection = con;
this._log.info('Selected', con.status, 'Connection, making request');
this._request = con.request(this._params.req, this.bound._checkRespForFail);
} else {
this._log.warning('No living connections');
this._respond(new errors.ConnectionFault('No living connections.'));
}
});
TransportRequest.prototype._checkRespForFail = _.handler(function (err, body, status) {
if (err && this._remainingRetries) {
this._remainingRetries--;
this._log.info('Connection error, retrying');
this._connectionPool.select(this.bound._sendReqWithCon);
} else {
this._log.info('Request complete');
this._respond(err, body, status);
}
});
TransportRequest.prototype._respond = _.handler(function (err, body, status) {
if (this._response) {
throw new Error('Request responded twice');
}
var parsedBody;
var serializer = this._serializer;
// get ignore and ensure that it's an array
var ignore = this._params.ignore;
if (ignore && !_.isArray(ignore)) {
ignore = [ignore];
}
if (!err && body) {
parsedBody = serializer.unserialize(body);
if (parsedBody == null) {
err = new errors.Serialization();
}
}
if (!err) {
if ((status < 200 || status >= 300) && !_.contains(ignore, status)) {
if (errors[status]) {
err = new errors[status](parsedBody && parsedBody.error);
} else {
err = new errors.Generic('unknown error');
}
}
}
if (this._params.castExists) {
if (err && err instanceof errors.NotFound) {
parsedBody = false;
err = void 0;
} else {
parsedBody = !err;
}
}
this._error = err;
this._response = {
body: parsedBody,
status: status
};
this.emit('done', this._error, this._response.body, this._response.status);
});
TransportRequest.prototype.abort = function () {
this.aborted = true;
if (this.__request) {
this.__request.abort();
return true;
}
return false;
};
TransportRequest.prototype.then = function (callback, errback) {
if (this._error) {
errback(this._error);
} else if (this._response) {
callback(this._response);
} else {
this.once('done', _.bindKey(this, 'then', callback, errback));
}
};

View File

@ -2,8 +2,6 @@ var path = require('path'),
_ = require('lodash'),
fs = require('fs'),
requireDir = require('require-directory'),
qs = require('querystring'),
url = require('url'),
nodeUtils = require('util');
/**
@ -70,7 +68,7 @@ utils.requireClasses = function (module, dirPath) {
*/
utils.reKey = function (obj, transform, recursive) {
// defaults
if (typeof recursive === 'undefined') { recursive = true; }
if (recursive === void 0) { recursive = true; }
if (typeof transform !== 'function') { throw new TypeError('invalid transform function'); }
var out = {};
@ -252,19 +250,6 @@ utils.repeat = function (what, times) {
return (new Array(times + 1)).join(what);
};
/**
* Convert an object into a query string
*
* @method makeQueryString
* @param {Object} obj - The object to convert
* @param {Boolean} [start=true] - Should the query string start with a '?'
* @return {String}
*/
utils.makeQueryString = function (obj, start) {
var str = qs.stringify(obj);
return (start === false || str === '') ? str : '?' + str;
};
/**
* Override node's util.inherits function to also supply a callSuper function on the child class that can be called
* with the instance and the arguments passed to the child's constructor. This should only be called from within the
@ -310,45 +295,6 @@ utils.collectMatches = function (text, regExp) {
return matches;
};
var startsWithProtocolRE = /^([a-z]+:)?\/\//;
/**
* Runs a string through node's url.parse, removing the return value's host property and insuring the text has a
* protocol first
*
* @todo Tests
* @param urlString {String} - a url of some sort
* @returns {Object} - an object containing 'hostname', 'port', 'protocol', 'path', and a few other keys
*/
utils.parseUrl = function (urlString) {
if (!startsWithProtocolRE.text(urlString)) {
urlString = 'http://' + urlString;
}
var info = url.parse(urlString);
return info;
};
/**
* Formats a urlinfo object, sort of juggling the 'host' and 'hostname' keys based on the presense of the port and
* including http: as the default protocol.
*
* @todo Tests,
* @todo add checking for ':' at the end of the protocol
* @param urlInfo {Object} - An object, similar to that returned from _.parseUrl
* @returns {String}
*/
utils.formatUrl = function (urlInfo) {
var info = _.pick(urlInfo, ['protocol', 'hostname', 'port']);
if (info.port && urlInfo.host && !info.hostname) {
info.hostname = urlInfo.host;
delete info.host;
}
if (!info.protocol) {
info.protocol = 'http:';
}
return url.format(info);
};
/**
* Call a function, applying the arguments object to it in an optimized way, rather than always turning it into an array
*
@ -425,7 +371,7 @@ _.scheduled = _.handler;
* ```
*
* @param {Object} obj - The object to bind the methods to
* @param {Array} [methods] - The methods to bind, false values === bind them all
* @param {Array} [methods] - The methods to bind, false values === bind all flagged with _provideBound
*/
_.makeBoundMethods = function (obj, methods) {
obj.bound = {};

View File

@ -12,7 +12,7 @@ var argv = require('optimist')
.default('executable', process.env.ES_HOME ? path.join(process.env.ES_HOME, './bin/elasticsearch') : null)
.default('clusterName', 'yaml-test-runner')
.default('dataPath', '/tmp/yaml-test-runner')
.default('hostname', 'localhost')
.default('host', 'localhost')
.default('port', '9200')
.default('match', '**')
.boolean('createServer')
@ -49,7 +49,7 @@ function createClient() {
client = new es.Client({
hosts: [
{
hostname: esServer ? esServer.__hostname : argv.hostname,
host: esServer ? esServer.__hostname : argv.host,
port: esServer ? esServer.__port : argv.port
}
],
@ -343,7 +343,7 @@ ActionRunner.prototype = {
, remainingSteps;
for (i = 0; from != null && i < steps.length; i++) {
if (typeof from[steps[i]] === 'undefined') {
if (from[steps[i]] === void 0) {
remainingSteps = steps.slice(i).join('.').replace(/\\\./g, '.');
from = from[remainingSteps];
break;
@ -429,7 +429,7 @@ ActionRunner.prototype = {
catcher = null;
}
clientAction.call(client, params, _.bind(function (error, body, status) {
var cb = _.bind(function (error, body, status) {
this._last_requests_response = body;
if (error) {
@ -451,7 +451,23 @@ ActionRunner.prototype = {
}
done(error);
}, this));
}, this);
// switch (Math.round(Math.random() * 100) % 3) {
// case 0:
// clientAction.call(client, params).then(function (resp) {
// cb(void 0, resp.body, resp.status);
// }, function (err) {
// cb(err);
// });
// break;
// case 1:
// clientAction.call(client, params).once('done', cb);
// break;
// case 2:
clientAction.call(client, params, cb);
// break;
// }
} else {
done(new Error('stepped in do_do, did not find a function'));
}

View File

@ -21,7 +21,7 @@ exports.start = function (params, cb) {
'-Des.discovery.zen.ping.multicast.enabled=false',
],
{
cwd: undefined,
cwd: void 0,
env: process.env,
stdio: [
'ignore',

View File

@ -19,7 +19,7 @@ describe('EsServer Mock', function () {
function makeRequest(opts, respCb) {
opts = _.defaults(opts || {}, {
hostname: 'localhost',
host: 'localhost',
port: port
});