moved the transport request back into the transport, added when.js promises

This commit is contained in:
Spencer Alger
2013-11-14 14:30:24 -07:00
parent ffac604a25
commit 20804bb5ab
16 changed files with 386 additions and 32938 deletions

3
.gitignore vendored
View File

@ -1,8 +1,9 @@
dist
node_modules
scripts/scratch*
test/integration/yaml_suite/log
## generated files
scripts/last_rest_spec_update.sha
test/browser_integration/*.xml
test/browser_integration/yaml_tests.js
test/**/test-output-*.xml

View File

@ -7,6 +7,7 @@ module.exports = function (grunt) {
var child_process = require('child_process');
var sharedBrowserfyExclusions = [
'when',
'src/lib/connectors/http.js',
'src/lib/loggers/file.js',
'src/lib/loggers/stdio.js',
@ -32,18 +33,18 @@ module.exports = function (grunt) {
}
},
mochaTest: {
unit: [
'test/unit/**/*.test.js'
],
yaml_suite: [
'test/integration/yaml_suite/index.js'
],
unit: 'test/unit/**/*.test.js',
yaml_suite: {
src: 'test/integration/yaml_suite/index.js',
options: {
reporter: require('./test/integration/yaml_suite/reporter')
}
},
options: {
colors: true,
require: 'should',
reporter: 'dot',
bail: true,
timeout: 11000
timeout: 11e3
}
},
jshint: {

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

16301
dist/elasticsearch.js vendored

File diff suppressed because it is too large Load Diff

File diff suppressed because one or more lines are too long

View File

@ -5,8 +5,8 @@
module.exports = Transport;
var _ = require('./utils');
var TransportRequest = require('./transport_request');
var errors = require('./errors');
var when = require('when');
function Transport(config) {
this.config = config;
@ -26,7 +26,125 @@ function Transport(config) {
* @param {Function} cb - A function to call back with (error, responseBody, responseStatus)
*/
Transport.prototype.request = function (params, cb) {
return new TransportRequest(this.config, params, cb);
var log = this.config.log;
var serializer = this.config.serializer;
var connectionPool = this.config.connectionPool;
var remainingRetries = this.config.maxRetries;
var connection; // set in sendReqWithConnection
var connectionReq; // an object with an abort method, set in sendReqWithConnection
var request; // the object returned to the user, might be a deferred
log.debug('starting request', params);
if (params.body && params.method === 'GET') {
_.nextTick(respond, new TypeError('Body can not be sent with method "GET"'));
return;
}
// serialize the body
if (params.body) {
params.body = serializer[params.bulkBody ? 'bulkBody' : 'serialize'](params.body);
}
params.req = {
timeout: params.timeout,
method: params.method,
path: params.path,
query: params.query,
body: params.body,
};
connectionPool.select(sendReqWithConnection);
function abortRequest() {
remainingRetries = 0;
connectionReq.abort();
}
function sendReqWithConnection(err, _connection) {
if (err) {
respond(err);
} else if (_connection) {
connection = _connection;
connectionReq = connection.request(params.req, checkRespForFailure);
} else {
log.warning('No living connections');
respond(new errors.NoConnections());
}
}
function checkRespForFailure(err, body, status) {
if (err && remainingRetries) {
remainingRetries--;
log.error(err.message, '-- retrying');
connectionPool.select(sendReqWithConnection);
} else {
log.info('Request complete');
respond(err, body, status);
}
}
function respond(err, body, status) {
var parsedBody;
if (!err && body) {
parsedBody = serializer.unserialize(body);
if (parsedBody == null) {
err = new errors.Serialization();
}
}
if (!err) {
// get ignore and ensure that it's an array
var ignore = params.ignore;
if (ignore && !_.isArray(ignore)) {
ignore = [ignore];
}
if ((status < 200 || status >= 300)
&& (!ignore || !_.contains(ignore, status))
) {
if (errors[status]) {
err = new errors[status](parsedBody && parsedBody.error);
} else {
err = new errors.Generic('unknown error');
}
}
}
if (params.castExists) {
if (err && err instanceof errors.NotFound) {
parsedBody = false;
err = void 0;
} else {
parsedBody = !err;
}
}
if (typeof cb === 'function') {
cb(err, parsedBody, status);
} else if (err) {
request.reject(err);
} else {
request.resolve({
body: parsedBody,
status: status
});
}
}
// determine the API based on the presense of a callback
if (typeof cb === 'function') {
request = {
abort: abortRequest
};
} else {
request = when.defer();
request.abort = abortRequest;
}
return request;
};
/**

View File

@ -1,148 +0,0 @@
/**
* Constructs a function that can be called to make a request to ES
* @type {[type]}
*/
module.exports = TransportRequest;
var _ = require('./utils');
var EventEmitter = require('events').EventEmitter;
var errors = require('./errors');
function TransportRequest(config, params, cb) {
// setup event emitter
EventEmitter.call(this);
// copy cetain methods into the bound object
_.makeBoundMethods(this);
this._params = params;
this._log = config.log;
this._serializer = config.serializer;
this._connectionPool = config.connectionPool;
this._remainingRetries = config.maxRetries;
// in cb isn't a function make it one
if (typeof cb === 'function') {
this.once('done', cb);
}
this._startRequest();
}
_.inherits(TransportRequest, EventEmitter);
TransportRequest.prototype._startRequest = function () {
var params = this._params;
this._log.debug('starting request', params);
if (params.body && params.method === 'GET') {
process.nextTick(_.bindKey(this, 'respond', new TypeError('Body can not be sent with method "GET"')));
return;
}
// serialize the body
if (params.body) {
params.body = this._serializer[params.bulkBody ? 'bulkBody' : 'serialize'](params.body);
}
params.req = {
timeout: params.timeout,
path: params.path,
query: params.query,
method: params.method,
body: params.body,
};
this._connectionPool.select(this.bound._sendReqWithCon);
};
TransportRequest.prototype._sendReqWithCon = _.handler(function (err, con) {
if (err) {
this._respond(err);
} else if (con) {
this._connection = con;
this._log.info('Selected', con.status, 'Connection, making request');
this._request = con.request(this._params.req, this.bound._checkRespForFail);
} else {
this._log.warning('No living connections');
this._respond(new errors.NoConnections());
}
});
TransportRequest.prototype._checkRespForFail = _.handler(function (err, body, status) {
if (err && this._remainingRetries) {
this._remainingRetries--;
this._log.error(err.message, '-- retrying');
this._connectionPool.select(this.bound._sendReqWithCon);
} else {
this._log.info('Request complete');
this._respond(err, body, status);
}
});
TransportRequest.prototype._respond = _.handler(function (err, body, status) {
if (this._response) {
throw new Error('Request responded twice');
}
var parsedBody;
var serializer = this._serializer;
// get ignore and ensure that it's an array
var ignore = this._params.ignore;
if (ignore && !_.isArray(ignore)) {
ignore = [ignore];
}
if (!err && body) {
parsedBody = serializer.unserialize(body);
if (parsedBody == null) {
err = new errors.Serialization();
}
}
if (!err) {
if ((status < 200 || status >= 300) && !_.contains(ignore, status)) {
if (errors[status]) {
err = new errors[status](parsedBody && parsedBody.error);
} else {
err = new errors.Generic('unknown error');
}
}
}
if (this._params.castExists) {
if (err && err instanceof errors.NotFound) {
parsedBody = false;
err = void 0;
} else {
parsedBody = !err;
}
}
this._error = err;
this._response = {
body: parsedBody,
status: status
};
this.emit('done', this._error, this._response.body, this._response.status);
});
TransportRequest.prototype.abort = function () {
this.aborted = true;
if (this.__request) {
this.__request.abort();
return true;
}
return false;
};
TransportRequest.prototype.then = function (callback, errback) {
if (this._error) {
errback(this._error);
} else if (this._response) {
callback(this._response);
} else {
this.once('done', _.bindKey(this, 'then', callback, errback));
}
};

View File

@ -129,13 +129,10 @@
});
$test.appendTo(stack[0].$results);
}
if (test.type === 'hook' || stats.tests === this.total) {
allTestsDone();
}
}
});
function allTestsDone() {
runner.on('end', function () {
var testResults = {
stats: stats,
suites: $.map(rootSuite.suites, function removeElements(suite) {
@ -158,7 +155,7 @@
$.post('/tests-complete?browser=' + BROWSER_NAME, JSON.stringify(testResults), function () {
window.close();
});
}
});
/** override console to force all output to go to log and err, then we have all the output **/
global.console = (function () {

View File

@ -7,6 +7,7 @@ var _ = require('lodash');
var util = require('util');
var chalk = require('chalk');
var moment = require('moment');
var makeJUnitXml = require('../make_j_unit_xml');
chalk.enabled = true;
var browserify = require('browserify');
@ -49,7 +50,7 @@ function sendBundle(req, resp, files, opts, extend) {
function collectTestResults(req, resp) {
var body = '';
var browser = req.query.browser;
var logFilename = path.join(__dirname, 'test-output-' + browser + '.xml');
var logFilename = path.join(__dirname, '../test-output-' + browser + '.xml');
req.on('data', function (chunk) {
body += chunk;
@ -73,83 +74,8 @@ function collectTestResults(req, resp) {
resp.writeHead(200);
resp.end('good work');
/**
* The JUnit xml output desired by Jenkins essentially looks like this:
*
* testsuites:
* - testsuite: (name, timestamp, hostname, tests, failures, errors, time)
* - testcase: (error or failure, name, classname, time)
*
* Full XSD avaliable [here](http://windyroad.com.au/dl/Open%20Source/JUnit.xsd)
*
* from
*
* {
* stats: {
*
* }
* suite: [
* {
* name:
* results: []
* suites: [] // optional
* }
* ]
* }
*/
var testXml = require('xmlbuilder');
var suites = testXml.create('testsuites');
var suiteCount = 0;
_.each(testDetails.suites, function serializeSuite(suiteInfo) {
var suite = suites.ele('testsuite', {
package: 'elasticsearch-js:yaml_tests',
id: suiteCount++,
name: suiteInfo.name,
timestamp: moment(suiteInfo.start).toJSON(),
hostname: browser,
tests: (suiteInfo.results && suiteInfo.results.length) || 0,
failures: _.where(suiteInfo.results, {pass: false}).length,
errors: 0,
time: suiteInfo.time / 1000
});
_.each(suiteInfo.results, function (testInfo) {
var parts = suiteInfo.name.replace(/\.yaml$/, '').replace(/\./g, '_').split(/\//);
var section = parts.shift();
var behavior = parts.join('/');
var testcase = suite.ele('testcase', {
name: behavior + ' - ' + testInfo.name,
time: (testInfo.time || 0) / 1000,
classname: browser + '.' + section
});
if (testInfo.errMsg) {
testcase.ele('failure', {
message: testInfo.errMsg,
type: 'AssertError'
});
} else if (!testInfo.pass) {
testcase.ele('error', {
message: 'Unknown Error',
type: 'TestError'
});
}
});
if (suiteInfo.suites) {
_.each(suiteInfo.suites, serializeSuite);
}
suite.ele('system-out', {}).cdata(suiteInfo.stdout);
suite.ele('system-err', {}).cdata(suiteInfo.stderr);
});
fs.writeFile(logFilename, suites.toString({ pretty: true}), function (err) {
var xml = makeJUnitXml(browser, testDetails);
fs.writeFile(logFilename, xml, function (err) {
if (err) {
console.log('unable to save test-output to', err.message);
console.trace();

View File

@ -1,61 +0,0 @@
var EsServer = require('../../mocks/es_server');
var HttpConnection = require('../../../src/lib/connections/http');
var errors = require('../../../src/lib/errors');
describe('overall timeout for the network connections', function () {
var server;
var connection;
before(function (done) {
server = new EsServer();
server.routes.get['/timeout'] = function (req, res) {
// wait for 10 seconds before responding, or the value in the timeout param
var timeout = parseInt(req.parsedUrl.query.timeout, 10);
if (isNaN(timeout)) {
timeout = 10000;
}
res.writeHead(200);
res.on('close', function () {
clearInterval(dataInterval);
clearTimeout(finTimeout);
});
var dataInterval = setInterval(function () {
res.write('.');
}, 100);
var finTimeout = setTimeout(function () {
clearInterval(dataInterval);
res.end('good bye');
}, timeout);
};
server.on('online', function (port) {
connection = new HttpConnection({
hostname: 'localhost',
port: port
});
done();
});
});
it('should bail quickly', function (done) {
this.timeout(1000);
connection.request({
path: '/timeout?timeout=1000',
timeout: 100
}, function (err, resp, status) {
err.should.be.an.instanceof(errors.RequestTimeout);
done();
});
});
});

View File

@ -10,9 +10,6 @@ var path = require('path');
var fs = require('fs');
var _ = require('../../../src/lib/utils');
// location that the logger will write to
var logFile = path.resolve(__dirname, './log');
// current client
var client = null;
@ -58,17 +55,6 @@ module.exports = {
client.close();
}
if (!process.browser) {
// delete existing log file
try {
fs.unlinkSync(logFile);
} catch (e) {
if (!~e.message.indexOf('ENOENT')) {
return _.nextTick(cb, e);
}
}
}
client = new es.Client({
hosts: [
{
@ -77,9 +63,9 @@ module.exports = {
}
],
log: {
type: process.browser ? 'console' : 'file',
type: process.browser ? 'console' : 'stdio',
level: 'trace',
path: logFile
color: false
}
});

View File

@ -0,0 +1,166 @@
/**
* ESJS reporter for running and collecting mocha test results.
*
* @param {Runner} runner
* @api public
*/
module.exports = EsjsReporter;
var Base = require('mocha/lib/reporters/base');
var _ = require('lodash');
var chalk = require('chalk');
var clientManager = require('./client_manager');
var makeJUnitXml = require('../../make_j_unit_xml');
var fs = require('fs');
var path = require('path');
function EsjsReporter(runner) {
Base.call(this, runner);
clientManager.reporter = this;
var stats = this.stats;
var rootSuite = {
results: [],
suites: []
};
var stack = [rootSuite];
function indt() {
return (new Array(stack.length + 1)).join(' ');
}
runner.on('suite', function (suite) {
if (suite.root) {
return;
}
// suite
suite = {
name: suite.title,
results: [],
start: Date.now(),
stdout: '',
stderr: ''
};
// append to the previous stack leader
if (!stack[0].suites) {
stack[0].suites = [];
}
stack[0].suites.push(suite);
// push the suite onto the top of the stack
stack.unshift(suite);
});
runner.on('suite end', function (suite) {
if (suite.root) {
return;
}
stack[0].time = Date.now() - stack[0].start;
stack.shift();
});
runner.on('fail', function (test, err) {
if ('hook' === test.type) {
runner.emit('test end', test);
}
});
runner.on('test end', function (test) {
// test
var color = chalk[test.state === 'passed' ? 'green' : 'red'];
log(color('.'));
var errMsg = void 0;
if (test.err) {
errMsg = test.err.stack || test.err.toString();
// FF / Opera do not add the message
if (!~errMsg.indexOf(test.err.message)) {
errMsg = test.err.message + '\n' + errMsg;
}
// <=IE7 stringifies to [Object Error]. Since it can be overloaded, we
// check for the result of the stringifying.
if ('[object Error]' === errMsg) {
errMsg = test.err.message;
}
// Safari doesn't give you a stack. Let's at least provide a source line.
if (!test.err.stack && test.err.sourceURL && test.err.line !== undefined) {
errMsg += '\n(' + test.err.sourceURL + ':' + test.err.line + ')';
}
console.error(_.map(errMsg.split('\n'), function (line) {
return indt() + ' ' + line;
}).join('\n'));
}
if (!test.pending) {
if (stack[0]) {
stack[0].results.push({
name: test.title,
time: test.duration,
pass: test.state === 'passed',
test: test
});
}
}
});
runner.on('end', function () {
restoreStdio();
var outputFilename = path.join(__dirname, '../../test-output-node-yaml.xml');
var xml = makeJUnitXml('node ' + process.version + ' yaml tests', {
stats: stats,
suites: _.map(rootSuite.suites, function removeElements(suite) {
var s = {
name: suite.name,
start: suite.start,
time: suite.time || 0,
results: suite.results,
stdout: suite.stdout,
stderr: suite.stderr
};
if (suite.suites) {
s.suites = _.map(suite.suites, removeElements);
}
return s;
})
});
fs.writeFileSync(outputFilename, xml);
console.log('\nwrote log to', outputFilename);
});
var log = (function () {
var locked = _.bind(process.stdout.write, process.stdout);
return function (str) {
locked(str);
};
})();
// overload the write methods on stdout and stderr
['stdout', 'stderr'].forEach(function (name) {
var obj = process[name];
var orig = obj.write;
obj.write = function (chunk) {
if (stack[0]) {
stack[0][name] += chunk;
} else {
orig.apply(obj, arguments);
}
};
obj.__restore = function () {
this.write = orig;
};
});
function restoreStdio() {
process.stdout.__restore();
process.stderr.__restore();
}
}

81
test/make_j_unit_xml.js Normal file
View File

@ -0,0 +1,81 @@
/**
* The JUnit xml output desired by Jenkins essentially looks like this:
*
* testsuites:
* - testsuite: (name, timestamp, hostname, tests, failures, errors, time)
* - testcase: (error or failure, name, classname, time)
*
* Full XSD avaliable [here](http://windyroad.com.au/dl/Open%20Source/JUnit.xsd)
*
* from
*
* {
* stats: {
*
* }
* suite: [
* {
* name:
* results: []
* suites: [] // optional
* }
* ]
* }
*/
module.exports = makeJUnitXml;
var testXml = require('xmlbuilder');
var suites = testXml.create('testsuites');
var suiteCount = 0;
var moment = require('moment');
var _ = require('lodash');
function makeJUnitXml(runnerName, testDetails) {
_.each(testDetails.suites, function serializeSuite(suiteInfo) {
var suite = suites.ele('testsuite', {
package: 'elasticsearch-js:yaml_tests',
id: suiteCount++,
name: suiteInfo.name,
timestamp: moment(suiteInfo.start).toJSON(),
hostname: 'localhost',
tests: (suiteInfo.results && suiteInfo.results.length) || 0,
failures: _.where(suiteInfo.results, {pass: false}).length,
errors: 0,
time: suiteInfo.time / 1000
});
_.each(suiteInfo.results, function (testInfo) {
var parts = suiteInfo.name.replace(/\.yaml$/, '').replace(/\./g, '_').split(/\//);
var section = parts.shift();
var behavior = parts.join('/');
var testcase = suite.ele('testcase', {
name: behavior + ' - ' + testInfo.name,
time: (testInfo.time || 0) / 1000,
classname: runnerName + '.' + section
});
if (testInfo.errMsg) {
testcase.ele('failure', {
message: testInfo.errMsg,
type: 'AssertError'
});
} else if (!testInfo.pass) {
testcase.ele('error', {
message: 'Unknown Error',
type: 'TestError'
});
}
});
if (suiteInfo.suites) {
_.each(suiteInfo.suites, serializeSuite);
}
suite.ele('system-out', {}).cdata(suiteInfo.stdout);
suite.ele('system-err', {}).cdata(suiteInfo.stderr);
});
return suites.toString({ pretty: true});
}

View File

@ -1,3 +0,0 @@
--require should
--reporter dot
--timeout 11000