diff --git a/src/lib/client.js b/src/lib/client.js new file mode 100644 index 000000000..0b2402da3 --- /dev/null +++ b/src/lib/client.js @@ -0,0 +1,218 @@ +/** + * A client that makes requests to Elasticsearch via a {{#crossLink "Transport"}}Transport{{/crossLink}} + * + * Initializing a client might look something like: + * + * ``` + * var client = new es.Client({ + * hosts: [ + * 'es1.net:9200', + * { + * host: 'es2.net', + * port: 9200 + * } + * ], + * sniffOnStart: true, + * log: { + * type: 'file', + * level: 'warning' + * } + * }); + * ``` + * + * @class Client + * @constructor + * @param {Object} [config={}] - Configuration for the transport + * @param {Object} [config.transport] - Transport settings passed to {{#crossLink "Transport"}}Transport Constructor{{/crossLink}} + * @param {String|Array} [config.log] - Log output settings {{#crossLink "Log"}}Log Constructor{{/crossLink}} + * @param {Object} [config.trace=false] - Create a log output to stdio that only tracks trace logs + */ + +module.exports = Client; + +var _ = require('./utils'); +var ClientConfig = require('./client_config'); +// var api = _.reKey(_.requireDir(module, '../api'), _.camelCase); +var q = require('q'); +var errors = require('./errors'); + +function Client(config) { + this.client = this; + + // setup the config.. this config will be passed EVERYWHERE so for good measure it is locked down + Object.defineProperty(this, 'config', { + configurable: false, + enumerable: false, + writable: false, + value: !config || _.isPlainObject(config) ? new ClientConfig(config) : config, + }); + this.config.client = this; + + for (var i = 0; i < _namespaces.length; i++) { + this[_namespaces[i]] = new this[_namespaces[i]](this); + } +} + +/** + * Perform a request with the client's transport + * + * @method request + * @todo async body writing + * @todo abort + * @todo access to custom headers, modifying of request in general + * @param {object} params + * @param {String} params.url - The url for the request + * @param {String} params.method - The HTTP method for the request + * @param {String} params.body - The body of the HTTP request + * @param {Function} cb - A function to call back with (error, responseBody, responseStatus) + */ +Client.prototype.request = function (params, cb) { + var serializer = this.config.serializer; + + // in cb isn't a function make it one + cb = typeof cb === 'function' ? cb : _.noop; + + var connectionPool = this.config.connectionPool; + var log = this.config.log; + var remainingRetries = this.config.maxRetries; + var connection; + + log.debug('starting request', params); + + // get ignore and ensure that it's an array + var ignore = params.ignore; + if (ignore && !_.isArray(ignore)) { + ignore = [ignore]; + } + + // serialize the body + if (params.body) { + params.body = params.bulkBody ? serializer.bulkBody(params.body) : serializer.serialize(params.body); + } + + if (params.body && params.method === 'GET') { + _.nextTick(cb, new TypeError('Body can not be sent with method "GET"')); + return; + } + + function sendRequestWithConnection(err, _connection) { + if (err) { + log.error(err); + respond(err); + } else if (_connection) { + connection = _connection; + log.info('Selected', _connection.status, 'Connection, making request'); + connection.request(params, checkRespForFailure); + } else { + log.warning('No living connections'); + respond(new errors.ConnectionFault('No living connections.')); + } + } + + function checkRespForFailure(err, reqParams, body, status) { + connection.setStatus(err ? 'dead' : 'alive'); + + if (err) { + log.error(err); + } + + if (err && remainingRetries) { + remainingRetries--; + log.info('Connection error, retrying'); + connectionPool.select(sendRequestWithConnection); + } else { + log.info('Request complete'); + respond(err, reqParams, body, status); + } + } + + function respond(err, reqParams, body, status) { + var parsedBody = null; + if (reqParams) { + log.trace(reqParams.method, reqParams, params.body, body, status); + } + if (!err) { + if (body) { + parsedBody = serializer.unserialize(body); + if (!parsedBody) { + err = new errors.ParseError(); + } + } else if (reqParams.method === 'HEAD') { + parsedBody = (status === 200); + } + } + + if (err) { + cb(err, parsedBody, status); + } else if ((status >= 200 && status < 300) || ignore && _.contains(ignore, status)) { + cb(void 0, parsedBody, status); + } else { + if (errors[status]) { + cb(new errors[status](parsedBody.error), parsedBody, status); + } else { + cb(new errors.Generic('unknown error'), parsedBody, status); + } + } + } + + connectionPool.select(sendRequestWithConnection); +}; + +/** + * Ping some node to ensure that the cluster is available in some respect + * + * @param {Object} params - Currently just a placeholder, no params used at this time + * @param {Function} cb - callback + */ +Client.prototype.ping = function (params, cb) { + this.request({ + method: 'HEAD', + path: '/' + }, cb); +}; + +/** + * Ask an ES node for a list of all the nodes, add/remove nodes from the connection + * pool as appropriate + * + * @param {Function} cb - Function to call back once complete + */ +Client.prototype.sniff = function (cb) { + var config = this.config; + + // make cb a function if it isn't + cb = typeof cb === 'function' ? cb : _.noop; + + this.request({ + path: '/_cluster/nodes', + method: 'GET' + }, function (err, resp) { + if (!err && resp && resp.nodes) { + var nodes = config.nodesToHostCallback(resp.nodes); + config.connectionPool.setNodes(nodes); + } + cb(err, resp); + }); +} + +var _namespaces = []; + +/** + * These names of the properties that hold namespace objects in the Client prototype + * @type {Array} + */ +Client.namespace = function (namespace) { + var steps = namespace.split('.'); + var path = []; + var on = Client; + var i; + for (i = 0; i < steps.length; i ++) { + path.push(steps[i]); + _namespaces.push(path.join('.')); + on.prototype[steps[i]] = function ClientActionNamespace(client) { + this.client = client; + }; + } +}; + +require('./api.js').attach(Client); diff --git a/src/lib/client_action.js b/src/lib/client_action.js index a8d90b896..78c2f9ab1 100644 --- a/src/lib/client_action.js +++ b/src/lib/client_action.js @@ -13,116 +13,6 @@ var errors = require('./errors'); var _ = require('./utils'); var urlParamRE = /\{(\w+)\}/g; -function exec(client, spec, params, cb) { - if (typeof params === 'function') { - cb = params; - params = {}; - } else { - params = params || {}; - cb = typeof cb === 'function' ? cb : _.noop; - } - - var request = { - ignore: params.ignore - }; - var parts = {}; - var query = {}; - var i; - - if (spec.needsBody && !params.body) { - return _.nextTick(cb, new TyperError(spec.name + ' requires a request body.')); - } - - if (params.body) { - request.body = params.body; - request.bulkBody = spec.bulkBody; - } - - if (spec.methods.length === 1) { - request.method = spec.methods[0]; - } else { - // if set, uppercase the user's choice, other wise returns "" - request.method = _.toUpperString(params.method); - - if (request.method) { - // use the one specified as long as it's a valid option - if (!_.contains(spec.methods, request.method)) { - return _.nextTick(cb, new TypeError('Invalid method: should be one of ' + spec.methods.join(', '))); - } - } else { - // pick a method - if (request.body) { - // first method that isn't "GET" - request.method = spec.methodWithBody || ( - spec.methodWithBody = _.find(spec.methods, function (m) { return m !== 'GET'; }) - ); - } else { - // just use the first option - request.method = spec.methods[0]; - } - } - } - - if (spec.url) { - // only one url option - request.path = resolveUrl(spec.url, params); - } else { - for (i = 0; i < spec.urls.length; i++) { - if (request.path = resolveUrl(spec.urls[i], params)) { - break; - } - } - } - - if (!request.path) { - // there must have been some mimimun requirements that were not met - return _.nextTick( - cb, - new TypeError( - 'Unable to build a path with those params. Supply at least ' + - _.keys(spec.urls[spec.urls.length - 1].req).join(', ') - ) - ); - } - - // build the query string - if (!spec.paramKeys) { - // build a key list on demand - spec.paramKeys = _.keys(spec.params); - } - var key, param; - for (i = 0; i < spec.paramKeys.length; i++) { - key = spec.paramKeys[i]; - param = spec.params[key]; - try { - if (params[key] != null) { - query[key] = castType[param.type] ? castType[param.type](param, params[key], key) : params[key]; - if (param['default'] && query[key] === param['default']) { - delete query[key]; - } - } else if (param.required) { - throw new TypeError('Missing required parameter ' + key); - } - } catch (e) { - return _.nextTick(cb, e); - } - } - - request.path = request.path + _.makeQueryString(query); - - if (spec.castNotFound) { - client.request(request, function (err, response) { - if (err instanceof errors.NotFound) { - cb(null, false); - } else { - cb(err, !err); - } - }); - } else { - client.request(request, cb); - } -}; - var castType = { enum: function (param, val, name) { if (_.contains(param.options, val)) { @@ -239,3 +129,113 @@ function resolveUrl (url, params) { delete params[name]; }, {})); }; + +function exec(client, spec, params, cb) { + if (typeof params === 'function') { + cb = params; + params = {}; + } else { + params = params || {}; + cb = typeof cb === 'function' ? cb : _.noop; + } + + var request = { + ignore: params.ignore + }; + var parts = {}; + var query = {}; + var i; + + if (spec.needsBody && !params.body) { + return _.nextTick(cb, new TyperError(spec.name + ' requires a request body.')); + } + + if (params.body) { + request.body = params.body; + request.bulkBody = spec.bulkBody; + } + + if (spec.methods.length === 1) { + request.method = spec.methods[0]; + } else { + // if set, uppercase the user's choice, other wise returns "" + request.method = _.toUpperString(params.method); + + if (request.method) { + // use the one specified as long as it's a valid option + if (!_.contains(spec.methods, request.method)) { + return _.nextTick(cb, new TypeError('Invalid method: should be one of ' + spec.methods.join(', '))); + } + } else { + // pick a method + if (request.body) { + // first method that isn't "GET" + request.method = spec.methodWithBody || ( + spec.methodWithBody = _.find(spec.methods, function (m) { return m !== 'GET'; }) + ); + } else { + // just use the first option + request.method = spec.methods[0]; + } + } + } + + if (spec.url) { + // only one url option + request.path = resolveUrl(spec.url, params); + } else { + for (i = 0; i < spec.urls.length; i++) { + if (request.path = resolveUrl(spec.urls[i], params)) { + break; + } + } + } + + if (!request.path) { + // there must have been some mimimun requirements that were not met + return _.nextTick( + cb, + new TypeError( + 'Unable to build a path with those params. Supply at least ' + + _.keys(spec.urls[spec.urls.length - 1].req).join(', ') + ) + ); + } + + // build the query string + if (!spec.paramKeys) { + // build a key list on demand + spec.paramKeys = _.keys(spec.params); + } + var key, param; + for (i = 0; i < spec.paramKeys.length; i++) { + key = spec.paramKeys[i]; + param = spec.params[key]; + try { + if (params[key] != null) { + query[key] = castType[param.type] ? castType[param.type](param, params[key], key) : params[key]; + if (param['default'] && query[key] === param['default']) { + delete query[key]; + } + } else if (param.required) { + throw new TypeError('Missing required parameter ' + key); + } + } catch (e) { + return _.nextTick(cb, e); + } + } + + request.path = request.path + _.makeQueryString(query); + + if (spec.castNotFound) { + client.request(request, function (err, response) { + if (err instanceof errors.NotFound) { + cb(null, false); + } else { + cb(err, !err); + } + }); + } else { + client.request(request, cb); + } +}; diff --git a/src/lib/client_config.js b/src/lib/client_config.js index 5260796b1..09eea6fc3 100644 --- a/src/lib/client_config.js +++ b/src/lib/client_config.js @@ -27,6 +27,12 @@ var defaultConfig = { ], connectionConstructor: 'Http', selector: selectors.roundRobin, + sniffOnStart: false, + sniffAfterRequests: null, + sniffOnConnectionFail: false, + maxRetries: 3, + timeout: 10000, + deadTimeout: 60000, nodesToHostCallback: function (nodes) { var hosts = []; _.each(nodes, function (node, id) { @@ -41,13 +47,7 @@ var defaultConfig = { }); }); return hosts; - }, - sniffOnStart: false, - sniffAfterRequests: null, - sniffOnConnectionFail: false, - maxRetries: 3, - timeout: 10000, - deadTimeout: 60000 + } }; function ClientConfig(config) { diff --git a/src/lib/log.js b/src/lib/log.js new file mode 100644 index 000000000..207b4f692 --- /dev/null +++ b/src/lib/log.js @@ -0,0 +1,250 @@ +var _ = require('./utils'), + url = require('url'), + EventEmitter = require('events').EventEmitter; + +/** + * Log bridge, which is an [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter) + * that sends events to one or more outputs/loggers. Setup these loggers by + * specifying their config as the first argument, or by passing it to addOutput(). + * + * @class Log + * @uses Loggers.Stdio + * @constructor + * @param {string|Object|ArrayOfStrings|ArrayOfObjects} output - Either the level + * to setup a single logger, a full config object for alogger, or an array of + * config objects to use for creating log outputs. + * @param {string} output.level - One of the keys in Log.levels (error, warning, etc.) + * @param {string} output.type - The name of the logger to use for this output + */ +function Log(config) { + this.config = config; + + var i; + var output = config.log || 2; + + if (_.isString(output) || _.isFinite(output)) { + output = [ + { + level: output + } + ]; + } else if (_.isPlainObject(output)) { + output = [output]; + } else if (_.isArray(output)) { + for (i = 0; i < output.length; i++) { + if (_.isString(output[i])) { + output[i] = { + level: output[i] + }; + } + } + } + + if (!_.isArrayOfPlainObjects(output)) { + throw new TypeError('Invalid Logging output config'); + } + + for (i = 0; i < output.length; i++) { + this.addOutput(output[i]); + } + +} +_.inherits(Log, EventEmitter); + +/** + * Levels observed by the loggers, ordered by rank + * + * @property levels + * @type Array + * @static + */ +Log.levels = [ + /** + * Event fired for error level log entries + * @event error + * @param {Error} error - The error object to log + */ + 'error', + /** + * Event fired for "warning" level log entries, which usually represent things + * like correctly formatted error responses from ES (400, ...) and recoverable + * errors (one node unresponsive) + * + * @event warning + * @param {String} message - A message to be logged + */ + 'warning', + /** + * Event fired for "info" level log entries, which usually describe what a + * client is doing (sniffing etc) + * + * @event info + * @param {String} message - A message to be logged + */ + 'info', + /** + * Event fired for "debug" level log entries, which will describe requests sent, + * including their url (no data, response codes, or exec times) + * + * @event debug + * @param {String} message - A message to be logged + */ + 'debug', + /** + * Event fired for "trace" level log entries, which provide detailed information + * about each request made from a client, including reponse codes, execution times, + * and a full curl command that can be copied and pasted into a terminal + * + * @event trace + * @param {String} method method, , body, responseStatus, responseBody + * @param {String} url - The url the request was made to + * @param {String} body - The body of the request + * @param {Integer} responseStatus - The status code returned from the response + * @param {String} responseBody - The body of the response + */ + 'trace' +]; + +/** + * Converts a log config value (string or array) to an array of level names which + * it represents + * + * @method parseLevels + * @static + * @private + * @param {String|ArrayOfStrings} input - Cound be a string to specify the max + * level, or an array of exact levels + * @return {Array} - + */ +Log.parseLevels = function (input) { + if (_.isString(input)) { + return Log.levels.slice(0, _.indexOf(Log.levels, input) + 1); + } + else if (_.isArray(input)) { + return _.intersection(input, Log.levels); + } +}; + +/** + * Combine the array-like param into a simple string + * + * @method join + * @static + * @private + * @param {*} arrayish - An array like object that can be itterated by _.each + * @return {String} - The final string. + */ +Log.join = function (arrayish) { + return _.map(arrayish, function (item) { + if (_.isPlainObject(item)) { + return _.inspect(item) + '\n'; + } else { + return item.toString(); + } + }).join(' '); +}; + +/** + * Create a new logger, based on the config. + * + * @method addOutput + * @param {object} config - An object with config options for the logger. + * @param {String} [config.type=stdio] - The name of an output/logger. Options + * can be found in the `src/loggers` directory. + * @param {String|ArrayOfStrings} [config.levels=warning] - The levels to output + * to this logger, when an array is specified no levels other than the ones + * specified will be listened to. When a string is specified, that and all lower + * levels will be logged. + * @return {Logger} + */ +Log.prototype.addOutput = function (config) { + var levels = Log.parseLevels(config.levels || config.level || 'warning'); + + _.defaults(config || {}, { + type: 'stdio', + }); + + // force the levels config + delete config.level; + config.levels = levels; + + var Logger = require('./loggers/' + config.type); + return new Logger(config, this); +}; + +/** + * Log an error + * + * @method error + * @param {Error|String} error The Error to log + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.error = function (e) { + if (EventEmitter.listenerCount(this, 'error')) { + return this.emit('error', e instanceof Error ? e : new Error(e)); + } +}; + + +/** + * Log a warning message + * + * @method warning + * @param {*} msg* - Any amount of messages that will be joined before logged + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.warning = function (/* ...msg */) { + if (EventEmitter.listenerCount(this, 'warning')) { + return this.emit('warning', Log.join(arguments)); + } +}; + + +/** + * Log useful info about what's going on + * + * @method info + * @param {*} msg* - Any amount of messages that will be joined before logged + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.info = function (/* ...msg */) { + if (EventEmitter.listenerCount(this, 'info')) { + return this.emit('info', Log.join(arguments)); + } +}; + +/** + * Log a debug level message + * + * @method debug + * @param {*} msg* - Any amount of messages that will be joined before logged + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.debug = function (/* ...msg */) { + if (EventEmitter.listenerCount(this, 'debug')) { + return this.emit('debug', Log.join(arguments) + _.getStackTrace(Log.prototype.debug)); + } +}; + +/** + * Log a trace level message + * + * @method trace + * @param {String} method - HTTP request method + * @param {String|Object} requestUrl - URL requested. If the value is an object, + * it is expected to be the return value of Node's url.parse() + * @param {String} body - The request's body + * @param {String} responseBody - body returned from ES + * @param {String} responseStatus - HTTP status code + * @return {Boolean} - True if any outputs accepted the message + */ +Log.prototype.trace = function (method, requestUrl, body, responseBody, responseStatus) { + if (EventEmitter.listenerCount(this, 'trace')) { + if (typeof requestUrl === 'object') { + requestUrl = _.formatUrl(requestUrl); + } + return this.emit('trace', method, requestUrl, body, responseBody, responseStatus); + } +}; + +module.exports = Log; diff --git a/src/lib/loggers/file.js b/src/lib/loggers/file.js new file mode 100644 index 000000000..6601a3fdc --- /dev/null +++ b/src/lib/loggers/file.js @@ -0,0 +1,40 @@ +/** + * Logger that writes to a file + * + * @class Loggers.File + * @extends StreamLogger + * @constructor + * @param {Object} config - The configuration for the Logger (See LoggerAbstract for generic options) + * @param {String} config.path - The location to write + * @param {Log} bridge - The object that triggers logging events, which we will record + */ + +module.exports = File; + +var StreamLogger = require('./stream'), + _ = require('../utils'), + fs = require('fs'); + +function File(config, bridge) { + this.path = config.path; + + config.stream = fs.createWriteStream(config.path, { + flags: 'a', + encoding: 'utf8' + }); + + File.callSuper(this, arguments); +} +_.inherits(File, StreamLogger); + +File.prototype.onProcessExit = _.handler(function () { + // flush the write buffer to disk + var writeBuffer = this.stream._writableState.buffer; + var out = ''; + if (writeBuffer) { + writeBuffer.forEach(function (buffered) { + out += buffered.chunk.toString(); + }); + fs.appendFileSync(this.path, out); + } +}); diff --git a/src/lib/loggers/stdio.js b/src/lib/loggers/stdio.js new file mode 100644 index 000000000..ee092d7c7 --- /dev/null +++ b/src/lib/loggers/stdio.js @@ -0,0 +1,119 @@ +/** + * Special version of the Stream logger, which logs errors and warnings to stderr and all other + * levels to stdout. + * + * @class Loggers.Stdio + * @extends LoggerAbstract + * @constructor + * @param {Object} config - The configuration for the Logger + * @param {string} config.level - The highest log level for this logger to output. + * @param {Log} bridge - The object that triggers logging events, which we will record + */ + +module.exports = Stdio; + +var clc = require('cli-color'), + LoggerAbstract = require('../logger'), + _ = require('../utils'); + +function Stdio(config, bridge) { + Stdio.callSuper(this, arguments); + + // config/state + this.color = _.has(config, 'color') ? !!config.color : true; +} + +_.inherits(Stdio, LoggerAbstract); + +/** + * Sends output to a stream, does some formatting first + * + * @method write + * @private + * @param {WritableStream} to - The stream that should receive this message + * @param {String} label - The text that should be used at the beginning the message + * @param {function} colorize - A function that receives a string and returned a colored version of it + * @param {*} what - The message to log + * @return {undefined} + */ +Stdio.prototype.write = function (to, label, colorize, message) { + if (this.color) { + label = colorize(label); + } + to.write(this.format(label, message)); +}; + +/** + * Handler for the bridges "error" event + * + * @method onError + * @private + * @param {Error} e - The Error object to log + * @return {undefined} + */ +Stdio.prototype.onError = _.handler(function (e) { + this.write(process.stderr, e.name === 'Error' ? 'ERROR' : e.name, clc.red.bold, e.stack); +}); + +/** + * Handler for the bridges "warning" event + * + * @method onWarning + * @private + * @param {String} msg - The message to be logged + * @return {undefined} + */ +Stdio.prototype.onWarning = _.handler(function (msg) { + this.write(process.stderr, 'WARNING', clc.yellow.bold, msg); +}); + +/** + * Handler for the bridges "info" event + * + * @method onInfo + * @private + * @param {String} msg - The message to be logged + * @return {undefined} + */ +Stdio.prototype.onInfo = _.handler(function (msg) { + this.write(process.stdout, 'INFO', clc.cyan.bold, msg); +}); + +/** + * Handler for the bridges "debug" event + * + * @method onDebug + * @private + * @param {String} msg - The message to be logged + * @return {undefined} + */ +Stdio.prototype.onDebug = _.handler(function (msg) { + this.write(process.stdout, 'DEBUG', clc.magentaBright.bold, msg); +}); + +/** + * Handler for the bridges "trace" event + * + * @method onTrace + * @private + * @return {undefined} + */ +Stdio.prototype.onTrace = _.handler(function (method, url, body, responseBody, responseStatus) { + var message = 'curl "' + url.replace(/"/g, '\\"') + '" -X' + method.toUpperCase(); + if (body) { + message += ' -d "' + body.replace(/"/g, '\\"') + '"'; + } + message += '\n<- '; + if (this.color) { + if (responseStatus >= 200 && responseStatus < 300) { + message += clc.green.bold(responseStatus); + } else { + message += clc.red.bold(responseStatus); + } + } else { + message += responseStatus; + } + message += '\n' + responseBody; + + this.write(process.stdout, 'TRACE', clc.cyanBright.bold, message); +}); diff --git a/src/lib/loggers/stream.js b/src/lib/loggers/stream.js new file mode 100644 index 000000000..149cfb8d9 --- /dev/null +++ b/src/lib/loggers/stream.js @@ -0,0 +1,51 @@ +/** + * Logger that writes to a file + * + * @class Loggers.File + * @extends LoggerAbstract + * @constructor + * @see LoggerAbstract + * @param {Object} config - The configuration for the Logger (See LoggerAbstract for generic options) + * @param {String} config.path - The location to write + * @param {Log} bridge - The object that triggers logging events, which we will record + */ + +module.exports = Stream; + +var LoggerAbstract = require('../Logger'), + nodeStreams = require('stream'), + _ = require('../utils'), + fs = require('fs'); + +function Stream(config, bridge) { + Stream.callSuper(this, arguments); + _.makeBoundMethods(this); + + if (config.stream instanceof nodeStreams.Writable) { + this.stream = config.stream; + } else { + throw new TypeError('Invalid stream, use an instance of stream.Writeable'); + } + + process.on('exit', this.bound.onProcessExit); +} +_.inherits(Stream, LoggerAbstract); + +// flush the write buffer to stderr synchronously +Stream.prototype.onProcessExit = _.handler(function () { + var writeBuffer = this.stream._writableState.buffer; + if (writeBuffer && writeBuffer.length) { + console.error('Log stream did not get to finish writing. Flushing to stderr'); + writeBuffer.forEach(function (buffered) { + console.error(buffered.chunk.toString()); + }); + } +}); + +Stream.prototype.write = function (label, message) { + this.stream.write(this.format(label, message), 'utf8'); +}; + +Stream.prototype.close = function () { + this.stream.end(); +}; diff --git a/src/lib/selectors/random.js b/src/lib/selectors/random.js new file mode 100644 index 000000000..88e565ab5 --- /dev/null +++ b/src/lib/selectors/random.js @@ -0,0 +1,5 @@ +module.exports = RandomSelect; + +function RandomSelect(connections) { + return connections[Math.floor(Math.random() * connections.length)]; +} diff --git a/src/lib/serializers/json.js b/src/lib/serializers/json.js new file mode 100644 index 000000000..12d3d1929 --- /dev/null +++ b/src/lib/serializers/json.js @@ -0,0 +1,52 @@ +/** + * Simple JSON serializer + * @type {[type]} + */ +module.exports = Json; + +var _ = require('../utils'); + +function Json(client) { + this.client = client; +} + +Json.prototype.serialize = function (val, replacer, spaces) { + if (val == null) { + return null; + } + else if (typeof val === 'string') { + return val; + } else { + return JSON.stringify(val, replacer, spaces); + } +}; + +Json.prototype.unserialize = function (str) { + if (typeof str === 'string') { + try { + return JSON.parse(str); + } catch (e) { + this.client.log.error(new Error('unable to parse', str)); + return null; + } + } else { + return str; + } +}; + +Json.prototype.bulkBody = function (val) { + var body = '', i; + + if (_.isArray(val)) { + for (i = 0; i < val.length; i++) { + body += this.serialize(val[i]) + '\n'; + } + } else if (typeof val === 'string') { + // make sure the string ends in a new line + body = val + (val[val.length - 1] === '\n' ? '' : '\n'); + } else { + throw new TypeError('Bulk body should either be an Array of commands/string, or a String'); + } + + return body; +};