diff --git a/src/lib/client.js b/src/lib/client.js new file mode 100755 index 000000000..9822cdea4 --- /dev/null +++ b/src/lib/client.js @@ -0,0 +1,209 @@ +/** + * A client that makes requests to Elasticsearch via a {{#crossLink "Transport"}}Transport{{/crossLink}} + * + * Initializing a client might look something like: + * + * ``` + * var client = new es.Client({ + * hosts: [ + * 'es1.net:9200', + * { + * host: 'es2.net', + * port: 9200 + * } + * ], + * sniffOnStart: true, + * log: { + * type: 'file', + * level: 'warning' + * } + * }); + * ``` + * + * @class Client + * @constructor + * @param {Object} [config={}] - Configuration for the transport + * @param {Object} [config.transport] - Transport settings passed to {{#crossLink "Transport"}}Transport Constructor{{/crossLink}} + * @param {String|Array} [config.log] - Log output settings {{#crossLink "Log"}}Log Constructor{{/crossLink}} + * @param {Object} [config.trace=false] - Create a log output to stdio that only tracks trace logs + */ + +module.exports = Client; + +var _ = require('./utils'); +var ClientConfig = require('./client_config'); +var ca = require('./client_action').create; +var errors = require('./errors'); +var api = require('./api.js'); + +function Client(config) { + this.client = this; + + // setup the config.. this config will be passed EVERYWHERE so for good measure it is locked down + Object.defineProperty(this, 'config', { + configurable: false, + enumerable: false, + writable: false, + value: !config || _.isPlainObject(config) ? new ClientConfig(config) : config, + }); + this.config.client = this; + + for (var i = 0; i < this._namespaces.length; i++) { + this[this._namespaces[i]] = new this[this._namespaces[i]](this); + } +} + +Client.prototype = api; + +/** + * Perform a request with the client's transport + * + * @method request + * @todo async body writing + * @todo abort + * @todo access to custom headers, modifying of request in general + * @param {object} params + * @param {String} params.url - The url for the request + * @param {String} params.method - The HTTP method for the request + * @param {String} params.body - The body of the HTTP request + * @param {Function} cb - A function to call back with (error, responseBody, responseStatus) + */ +Client.prototype.request = function (params, cb) { + var serializer = this.config.serializer; + + // in cb isn't a function make it one + cb = typeof cb === 'function' ? cb : _.noop; + + var connectionPool = this.config.connectionPool; + var log = this.config.log; + var remainingRetries = this.config.maxRetries; + var connection; + + log.debug('starting request', params); + + // get ignore and ensure that it's an array + var ignore = params.ignore; + if (ignore && !_.isArray(ignore)) { + ignore = [ignore]; + } + + // serialize the body + if (params.body) { + params.body = params.bulkBody ? serializer.bulkBody(params.body) : serializer.serialize(params.body); + } + + if (params.body && params.method === 'GET') { + respond(new TypeError('Body can not be sent with method "GET"')); + return; + } + + function sendRequestWithConnection(err, _connection) { + if (err) { + log.error(err); + respond(err); + } else if (_connection) { + connection = _connection; + log.info('Selected', _connection.status, 'Connection, making request'); + connection.request(params, checkRespForFailure); + } else { + log.warning('No living connections'); + respond(new errors.ConnectionFault('No living connections.')); + } + } + + function checkRespForFailure(err, reqParams, body, status) { + connection.setStatus(err ? 'dead' : 'alive'); + + if (err) { + log.error(err); + } + + if (err && remainingRetries) { + remainingRetries--; + log.info('Connection error, retrying'); + connectionPool.select(sendRequestWithConnection); + } else { + log.info('Request complete'); + respond(err, reqParams, body, status); + } + } + + function respond(err, reqParams, body, status) { + var parsedBody = null; + if (reqParams) { + log.trace(reqParams.method, reqParams, params.body, body, status); + } + if (!err) { + if (body) { + parsedBody = serializer.unserialize(body); + if (!parsedBody) { + err = new errors.ParseError(); + } + } else if (reqParams.method === 'HEAD') { + parsedBody = (status === 200); + } + } + + if (err) { + cb(err, parsedBody, status); + } else if ((status >= 200 && status < 300) || ignore && _.contains(ignore, status)) { + cb(void 0, parsedBody, status); + } else { + if (errors[status]) { + cb(new errors[status](parsedBody.error), parsedBody, status); + } else { + cb(new errors.Generic('unknown error'), parsedBody, status); + } + } + } + + connectionPool.select(sendRequestWithConnection); +}; + +/** + * Ping some node to ensure that the cluster is available in some respect + * + * @param {Object} params - Currently just a placeholder, no params used at this time + * @param {Function} cb - callback + */ +Client.prototype.ping = ca({ + methods: ['HEAD'], + params: {}, + url: { + fmt: '/' + } + }); + +/** + * Ask an ES node for a list of all the nodes, add/remove nodes from the connection + * pool as appropriate + * + * @param {Function} cb - Function to call back once complete + */ +Client.prototype.sniff = function (cb) { + var config = this.config; + + // make cb a function if it isn't + cb = typeof cb === 'function' ? cb : _.noop; + + this.request({ + path: '/_cluster/nodes', + method: 'GET' + }, function (err, resp) { + if (!err && resp && resp.nodes) { + var nodes = config.nodesToHostCallback(resp.nodes); + config.connectionPool.setNodes(nodes); + } + cb(err, resp); + }); +}; + + +/** + * Shutdown the connections, log outputs, and clear timers + */ +Client.prototype.close = function () { + this.config.log.close(); + this.config.connectionPool.close(); +}; + diff --git a/src/lib/log.js b/src/lib/log.js new file mode 100755 index 000000000..26a6974c3 --- /dev/null +++ b/src/lib/log.js @@ -0,0 +1,259 @@ +var _ = require('./utils'), + url = require('url'), + EventEmitter = require('events').EventEmitter; + +/** + * Log bridge, which is an [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) + * that sends events to one or more outputs/loggers. Setup these loggers by + * specifying their config as the first argument, or by passing it to addOutput(). + * + * @class Log + * @uses Loggers.Stdio + * @constructor + * @param {string|Object|ArrayOfStrings|ArrayOfObjects} output - Either the level + * to setup a single logger, a full config object for alogger, or an array of + * config objects to use for creating log outputs. + * @param {string} output.level - One of the keys in Log.levels (error, warning, etc.) + * @param {string} output.type - The name of the logger to use for this output + */ +function Log(config) { + this.config = config; + + var i; + var output = config.log || 2; + + if (_.isString(output) || _.isFinite(output)) { + output = [ + { + level: output + } + ]; + } else if (_.isPlainObject(output)) { + output = [output]; + } else if (_.isArray(output)) { + for (i = 0; i < output.length; i++) { + if (_.isString(output[i])) { + output[i] = { + level: output[i] + }; + } + } + } + + if (!_.isArrayOfPlainObjects(output)) { + throw new TypeError('Invalid Logging output config'); + } + + for (i = 0; i < output.length; i++) { + this.addOutput(output[i]); + } + +} +_.inherits(Log, EventEmitter); + + +Log.prototype.close = function () { + this.emit('closing'); + if (EventEmitter.listenerCount(this)) { + console.error('Something is still listening for log events, but the logger is closing.'); + this.clearAllListeners(); + } +}; + +/** + * Levels observed by the loggers, ordered by rank + * + * @property levels + * @type Array + * @static + */ +Log.levels = [ + /** + * Event fired for error level log entries + * @event error + * @param {Error} error - The error object to log + */ + 'error', + /** + * Event fired for "warning" level log entries, which usually represent things + * like correctly formatted error responses from ES (400, ...) and recoverable + * errors (one node unresponsive) + * + * @event warning + * @param {String} message - A message to be logged + */ + 'warning', + /** + * Event fired for "info" level log entries, which usually describe what a + * client is doing (sniffing etc) + * + * @event info + * @param {String} message - A message to be logged + */ + 'info', + /** + * Event fired for "debug" level log entries, which will describe requests sent, + * including their url (no data, response codes, or exec times) + * + * @event debug + * @param {String} message - A message to be logged + */ + 'debug', + /** + * Event fired for "trace" level log entries, which provide detailed information + * about each request made from a client, including reponse codes, execution times, + * and a full curl command that can be copied and pasted into a terminal + * + * @event trace + * @param {String} method method, , body, responseStatus, responseBody + * @param {String} url - The url the request was made to + * @param {String} body - The body of the request + * @param {Integer} responseStatus - The status code returned from the response + * @param {String} responseBody - The body of the response + */ + 'trace' +]; + +/** + * Converts a log config value (string or array) to an array of level names which + * it represents + * + * @method parseLevels + * @static + * @private + * @param {String|ArrayOfStrings} input - Cound be a string to specify the max + * level, or an array of exact levels + * @return {Array} - + */ +Log.parseLevels = function (input) { + if (_.isString(input)) { + return Log.levels.slice(0, _.indexOf(Log.levels, input) + 1); + } + else if (_.isArray(input)) { + return _.intersection(input, Log.levels); + } +}; + +/** + * Combine the array-like param into a simple string + * + * @method join + * @static + * @private + * @param {*} arrayish - An array like object that can be itterated by _.each + * @return {String} - The final string. + */ +Log.join = function (arrayish) { + return _.map(arrayish, function (item) { + if (_.isPlainObject(item)) { + return _.inspect(item) + '\n'; + } else { + return item.toString(); + } + }).join(' '); +}; + +/** + * Create a new logger, based on the config. + * + * @method addOutput + * @param {object} config - An object with config options for the logger. + * @param {String} [config.type=stdio] - The name of an output/logger. Options + * can be found in the `src/loggers` directory. + * @param {String|ArrayOfStrings} [config.levels=warning] - The levels to output + * to this logger, when an array is specified no levels other than the ones + * specified will be listened to. When a string is specified, that and all lower + * levels will be logged. + * @return {Logger} + */ +Log.prototype.addOutput = function (config) { + var levels = Log.parseLevels(config.levels || config.level || 'warning'); + + _.defaults(config || {}, { + type: 'stdio', + }); + + // force the levels config + delete config.level; + config.levels = levels; + + var Logger = require('./loggers/' + config.type); + return new Logger(config, this); +}; + +/** + * Log an error + * + * @method error + * @param {Error|String} error The Error to log + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.error = function (e) { + if (EventEmitter.listenerCount(this, 'error')) { + return this.emit('error', e instanceof Error ? e : new Error(e)); + } +}; + + +/** + * Log a warning message + * + * @method warning + * @param {*} msg* - Any amount of messages that will be joined before logged + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.warning = function (/* ...msg */) { + if (EventEmitter.listenerCount(this, 'warning')) { + return this.emit('warning', Log.join(arguments)); + } +}; + + +/** + * Log useful info about what's going on + * + * @method info + * @param {*} msg* - Any amount of messages that will be joined before logged + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.info = function (/* ...msg */) { + if (EventEmitter.listenerCount(this, 'info')) { + return this.emit('info', Log.join(arguments)); + } +}; + +/** + * Log a debug level message + * + * @method debug + * @param {*} msg* - Any amount of messages that will be joined before logged + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.debug = function (/* ...msg */) { + if (EventEmitter.listenerCount(this, 'debug')) { + return this.emit('debug', Log.join(arguments) + _.getStackTrace(Log.prototype.debug)); + } +}; + +/** + * Log a trace level message + * + * @method trace + * @param {String} method - HTTP request method + * @param {String|Object} requestUrl - URL requested. If the value is an object, + * it is expected to be the return value of Node's url.parse() + * @param {String} body - The request's body + * @param {String} responseBody - body returned from ES + * @param {String} responseStatus - HTTP status code + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.trace = function (method, requestUrl, body, responseBody, responseStatus) { + if (EventEmitter.listenerCount(this, 'trace')) { + if (typeof requestUrl === 'object') { + requestUrl = _.formatUrl(requestUrl); + } + return this.emit('trace', method, requestUrl, body, responseBody, responseStatus); + } +}; + +module.exports = Log; diff --git a/src/lib/loggers/file.js b/src/lib/loggers/file.js new file mode 100755 index 000000000..6601a3fdc --- /dev/null +++ b/src/lib/loggers/file.js @@ -0,0 +1,40 @@ +/** + * Logger that writes to a file + * + * @class Loggers.File + * @extends StreamLogger + * @constructor + * @param {Object} config - The configuration for the Logger (See LoggerAbstract for generic options) + * @param {String} config.path - The location to write + * @param {Log} bridge - The object that triggers logging events, which we will record + */ + +module.exports = File; + +var StreamLogger = require('./stream'), + _ = require('../utils'), + fs = require('fs'); + +function File(config, bridge) { + this.path = config.path; + + config.stream = fs.createWriteStream(config.path, { + flags: 'a', + encoding: 'utf8' + }); + + File.callSuper(this, arguments); +} +_.inherits(File, StreamLogger); + +File.prototype.onProcessExit = _.handler(function () { + // flush the write buffer to disk + var writeBuffer = this.stream._writableState.buffer; + var out = ''; + if (writeBuffer) { + writeBuffer.forEach(function (buffered) { + out += buffered.chunk.toString(); + }); + fs.appendFileSync(this.path, out); + } +}); diff --git a/src/lib/loggers/stdio.js b/src/lib/loggers/stdio.js new file mode 100755 index 000000000..ee092d7c7 --- /dev/null +++ b/src/lib/loggers/stdio.js @@ -0,0 +1,119 @@ +/** + * Special version of the Stream logger, which logs errors and warnings to stderr and all other + * levels to stdout. + * + * @class Loggers.Stdio + * @extends LoggerAbstract + * @constructor + * @param {Object} config - The configuration for the Logger + * @param {string} config.level - The highest log level for this logger to output. + * @param {Log} bridge - The object that triggers logging events, which we will record + */ + +module.exports = Stdio; + +var clc = require('cli-color'), + LoggerAbstract = require('../logger'), + _ = require('../utils'); + +function Stdio(config, bridge) { + Stdio.callSuper(this, arguments); + + // config/state + this.color = _.has(config, 'color') ? !!config.color : true; +} + +_.inherits(Stdio, LoggerAbstract); + +/** + * Sends output to a stream, does some formatting first + * + * @method write + * @private + * @param {WritableStream} to - The stream that should receive this message + * @param {String} label - The text that should be used at the beginning the message + * @param {function} colorize - A function that receives a string and returned a colored version of it + * @param {*} what - The message to log + * @return {undefined} + */ +Stdio.prototype.write = function (to, label, colorize, message) { + if (this.color) { + label = colorize(label); + } + to.write(this.format(label, message)); +}; + +/** + * Handler for the bridges "error" event + * + * @method onError + * @private + * @param {Error} e - The Error object to log + * @return {undefined} + */ +Stdio.prototype.onError = _.handler(function (e) { + this.write(process.stderr, e.name === 'Error' ? 'ERROR' : e.name, clc.red.bold, e.stack); +}); + +/** + * Handler for the bridges "warning" event + * + * @method onWarning + * @private + * @param {String} msg - The message to be logged + * @return {undefined} + */ +Stdio.prototype.onWarning = _.handler(function (msg) { + this.write(process.stderr, 'WARNING', clc.yellow.bold, msg); +}); + +/** + * Handler for the bridges "info" event + * + * @method onInfo + * @private + * @param {String} msg - The message to be logged + * @return {undefined} + */ +Stdio.prototype.onInfo = _.handler(function (msg) { + this.write(process.stdout, 'INFO', clc.cyan.bold, msg); +}); + +/** + * Handler for the bridges "debug" event + * + * @method onDebug + * @private + * @param {String} msg - The message to be logged + * @return {undefined} + */ +Stdio.prototype.onDebug = _.handler(function (msg) { + this.write(process.stdout, 'DEBUG', clc.magentaBright.bold, msg); +}); + +/** + * Handler for the bridges "trace" event + * + * @method onTrace + * @private + * @return {undefined} + */ +Stdio.prototype.onTrace = _.handler(function (method, url, body, responseBody, responseStatus) { + var message = 'curl "' + url.replace(/"/g, '\\"') + '" -X' + method.toUpperCase(); + if (body) { + message += ' -d "' + body.replace(/"/g, '\\"') + '"'; + } + message += '\n<- '; + if (this.color) { + if (responseStatus >= 200 && responseStatus < 300) { + message += clc.green.bold(responseStatus); + } else { + message += clc.red.bold(responseStatus); + } + } else { + message += responseStatus; + } + message += '\n' + responseBody; + + this.write(process.stdout, 'TRACE', clc.cyanBright.bold, message); +}); diff --git a/src/lib/loggers/stream.js b/src/lib/loggers/stream.js new file mode 100755 index 000000000..dae6375ff --- /dev/null +++ b/src/lib/loggers/stream.js @@ -0,0 +1,51 @@ +/** + * Logger that writes to a file + * + * @class Loggers.File + * @extends LoggerAbstract + * @constructor + * @see LoggerAbstract + * @param {Object} config - The configuration for the Logger (See LoggerAbstract for generic options) + * @param {String} config.path - The location to write + * @param {Log} bridge - The object that triggers logging events, which we will record + */ + +module.exports = Stream; + +var LoggerAbstract = require('../logger'), + nodeStreams = require('stream'), + _ = require('../utils'), + fs = require('fs'); + +function Stream(config, bridge) { + Stream.callSuper(this, arguments); + _.makeBoundMethods(this); + + if (config.stream instanceof nodeStreams.Writable) { + this.stream = config.stream; + } else { + throw new TypeError('Invalid stream, use an instance of stream.Writeable'); + } + + process.on('exit', this.bound.onProcessExit); +} +_.inherits(Stream, LoggerAbstract); + +// flush the write buffer to stderr synchronously +Stream.prototype.onProcessExit = _.handler(function () { + var writeBuffer = this.stream._writableState.buffer; + if (writeBuffer && writeBuffer.length) { + console.error('Log stream did not get to finish writing. Flushing to stderr'); + writeBuffer.forEach(function (buffered) { + console.error(buffered.chunk.toString()); + }); + } +}); + +Stream.prototype.write = function (label, message) { + this.stream.write(this.format(label, message), 'utf8'); +}; + +Stream.prototype.close = function () { + this.stream.end(); +}; diff --git a/src/lib/selectors/random.js b/src/lib/selectors/random.js new file mode 100755 index 000000000..88e565ab5 --- /dev/null +++ b/src/lib/selectors/random.js @@ -0,0 +1,5 @@ +module.exports = RandomSelect; + +function RandomSelect(connections) { + return connections[Math.floor(Math.random() * connections.length)]; +} diff --git a/src/lib/serializers/json.js b/src/lib/serializers/json.js new file mode 100755 index 000000000..12d3d1929 --- /dev/null +++ b/src/lib/serializers/json.js @@ -0,0 +1,52 @@ +/** + * Simple JSON serializer + * @type {[type]} + */ +module.exports = Json; + +var _ = require('../utils'); + +function Json(client) { + this.client = client; +} + +Json.prototype.serialize = function (val, replacer, spaces) { + if (val == null) { + return null; + } + else if (typeof val === 'string') { + return val; + } else { + return JSON.stringify(val, replacer, spaces); + } +}; + +Json.prototype.unserialize = function (str) { + if (typeof str === 'string') { + try { + return JSON.parse(str); + } catch (e) { + this.client.log.error(new Error('unable to parse', str)); + return null; + } + } else { + return str; + } +}; + +Json.prototype.bulkBody = function (val) { + var body = '', i; + + if (_.isArray(val)) { + for (i = 0; i < val.length; i++) { + body += this.serialize(val[i]) + '\n'; + } + } else if (typeof val === 'string') { + // make sure the string ends in a new line + body = val + (val[val.length - 1] === '\n' ? '' : '\n'); + } else { + throw new TypeError('Bulk body should either be an Array of commands/string, or a String'); + } + + return body; +}; diff --git a/test/serializers/json.test.js! b/test/serializers/json.test.js! new file mode 100755 index 000000000..0261b1c69 --- /dev/null +++ b/test/serializers/json.test.js! @@ -0,0 +1,40 @@ +/* JSON Serializer tests */ + +var JsonSerializer = require('../../src/lib/serializers/Json'); + +describe('json serializer', function () { + + var json; + + beforeEach(function () { + json = new JsonSerializer(); + }); + + it('creates simple json strings', function () { + json.serialize({foo: true}).should.eql('{"foo":true}'); + }); + + it('creates pretty json strings', function () { + json.serialize({foo: true, bake: 'cake', 'with': ['bacon']}, null, ' ') + .should.eql(['{', + ' "foo": true,', + ' "bake": "cake",', + ' "with": [', + ' "bacon"', + ' ]', + '}'].join('\n')); + }); + + it('reads simple json strings', function () { + json.unserialize('{"foo":true}').should.eql({ foo: true }); + }); + + it('does not create date objects', function () { + json + .unserialize('{"date":"2012-04-23T18:25:43.511Z"}') + .should.eql({ + date: '2012-04-23T18:25:43.511Z' + }); + }); + +}); \ No newline at end of file