updated the cases for files that git was ignoreing

This commit is contained in:
Spencer Alger
2013-10-23 08:34:48 -07:00
parent 7fef00d9ce
commit 0e817aac36
9 changed files with 852 additions and 117 deletions

218
src/lib/client.js Normal file
View File

@ -0,0 +1,218 @@
/**
* A client that makes requests to Elasticsearch via a {{#crossLink "Transport"}}Transport{{/crossLink}}
*
* Initializing a client might look something like:
*
* ```
* var client = new es.Client({
* hosts: [
* 'es1.net:9200',
* {
* host: 'es2.net',
* port: 9200
* }
* ],
* sniffOnStart: true,
* log: {
* type: 'file',
* level: 'warning'
* }
* });
* ```
*
* @class Client
* @constructor
* @param {Object} [config={}] - Configuration for the transport
* @param {Object} [config.transport] - Transport settings passed to {{#crossLink "Transport"}}Transport Constructor{{/crossLink}}
* @param {String|Array<String>} [config.log] - Log output settings {{#crossLink "Log"}}Log Constructor{{/crossLink}}
* @param {Object} [config.trace=false] - Create a log output to stdio that only tracks trace logs
*/
module.exports = Client;
var _ = require('./utils');
var ClientConfig = require('./client_config');
// var api = _.reKey(_.requireDir(module, '../api'), _.camelCase);
var q = require('q');
var errors = require('./errors');
function Client(config) {
this.client = this;
// setup the config.. this config will be passed EVERYWHERE so for good measure it is locked down
Object.defineProperty(this, 'config', {
configurable: false,
enumerable: false,
writable: false,
value: !config || _.isPlainObject(config) ? new ClientConfig(config) : config,
});
this.config.client = this;
for (var i = 0; i < _namespaces.length; i++) {
this[_namespaces[i]] = new this[_namespaces[i]](this);
}
}
/**
* Perform a request with the client's transport
*
* @method request
* @todo async body writing
* @todo abort
* @todo access to custom headers, modifying of request in general
* @param {object} params
* @param {String} params.url - The url for the request
* @param {String} params.method - The HTTP method for the request
* @param {String} params.body - The body of the HTTP request
* @param {Function} cb - A function to call back with (error, responseBody, responseStatus)
*/
Client.prototype.request = function (params, cb) {
var serializer = this.config.serializer;
// in cb isn't a function make it one
cb = typeof cb === 'function' ? cb : _.noop;
var connectionPool = this.config.connectionPool;
var log = this.config.log;
var remainingRetries = this.config.maxRetries;
var connection;
log.debug('starting request', params);
// get ignore and ensure that it's an array
var ignore = params.ignore;
if (ignore && !_.isArray(ignore)) {
ignore = [ignore];
}
// serialize the body
if (params.body) {
params.body = params.bulkBody ? serializer.bulkBody(params.body) : serializer.serialize(params.body);
}
if (params.body && params.method === 'GET') {
_.nextTick(cb, new TypeError('Body can not be sent with method "GET"'));
return;
}
function sendRequestWithConnection(err, _connection) {
if (err) {
log.error(err);
respond(err);
} else if (_connection) {
connection = _connection;
log.info('Selected', _connection.status, 'Connection, making request');
connection.request(params, checkRespForFailure);
} else {
log.warning('No living connections');
respond(new errors.ConnectionFault('No living connections.'));
}
}
function checkRespForFailure(err, reqParams, body, status) {
connection.setStatus(err ? 'dead' : 'alive');
if (err) {
log.error(err);
}
if (err && remainingRetries) {
remainingRetries--;
log.info('Connection error, retrying');
connectionPool.select(sendRequestWithConnection);
} else {
log.info('Request complete');
respond(err, reqParams, body, status);
}
}
function respond(err, reqParams, body, status) {
var parsedBody = null;
if (reqParams) {
log.trace(reqParams.method, reqParams, params.body, body, status);
}
if (!err) {
if (body) {
parsedBody = serializer.unserialize(body);
if (!parsedBody) {
err = new errors.ParseError();
}
} else if (reqParams.method === 'HEAD') {
parsedBody = (status === 200);
}
}
if (err) {
cb(err, parsedBody, status);
} else if ((status >= 200 && status < 300) || ignore && _.contains(ignore, status)) {
cb(void 0, parsedBody, status);
} else {
if (errors[status]) {
cb(new errors[status](parsedBody.error), parsedBody, status);
} else {
cb(new errors.Generic('unknown error'), parsedBody, status);
}
}
}
connectionPool.select(sendRequestWithConnection);
};
/**
* Ping some node to ensure that the cluster is available in some respect
*
* @param {Object} params - Currently just a placeholder, no params used at this time
* @param {Function} cb - callback
*/
Client.prototype.ping = function (params, cb) {
this.request({
method: 'HEAD',
path: '/'
}, cb);
};
/**
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
* pool as appropriate
*
* @param {Function} cb - Function to call back once complete
*/
Client.prototype.sniff = function (cb) {
var config = this.config;
// make cb a function if it isn't
cb = typeof cb === 'function' ? cb : _.noop;
this.request({
path: '/_cluster/nodes',
method: 'GET'
}, function (err, resp) {
if (!err && resp && resp.nodes) {
var nodes = config.nodesToHostCallback(resp.nodes);
config.connectionPool.setNodes(nodes);
}
cb(err, resp);
});
}
var _namespaces = [];
/**
* These names of the properties that hold namespace objects in the Client prototype
* @type {Array}
*/
Client.namespace = function (namespace) {
var steps = namespace.split('.');
var path = [];
var on = Client;
var i;
for (i = 0; i < steps.length; i ++) {
path.push(steps[i]);
_namespaces.push(path.join('.'));
on.prototype[steps[i]] = function ClientActionNamespace(client) {
this.client = client;
};
}
};
require('./api.js').attach(Client);

View File

@ -13,116 +13,6 @@ var errors = require('./errors');
var _ = require('./utils');
var urlParamRE = /\{(\w+)\}/g;
function exec(client, spec, params, cb) {
if (typeof params === 'function') {
cb = params;
params = {};
} else {
params = params || {};
cb = typeof cb === 'function' ? cb : _.noop;
}
var request = {
ignore: params.ignore
};
var parts = {};
var query = {};
var i;
if (spec.needsBody && !params.body) {
return _.nextTick(cb, new TyperError(spec.name + ' requires a request body.'));
}
if (params.body) {
request.body = params.body;
request.bulkBody = spec.bulkBody;
}
if (spec.methods.length === 1) {
request.method = spec.methods[0];
} else {
// if set, uppercase the user's choice, other wise returns ""
request.method = _.toUpperString(params.method);
if (request.method) {
// use the one specified as long as it's a valid option
if (!_.contains(spec.methods, request.method)) {
return _.nextTick(cb, new TypeError('Invalid method: should be one of ' + spec.methods.join(', ')));
}
} else {
// pick a method
if (request.body) {
// first method that isn't "GET"
request.method = spec.methodWithBody || (
spec.methodWithBody = _.find(spec.methods, function (m) { return m !== 'GET'; })
);
} else {
// just use the first option
request.method = spec.methods[0];
}
}
}
if (spec.url) {
// only one url option
request.path = resolveUrl(spec.url, params);
} else {
for (i = 0; i < spec.urls.length; i++) {
if (request.path = resolveUrl(spec.urls[i], params)) {
break;
}
}
}
if (!request.path) {
// there must have been some mimimun requirements that were not met
return _.nextTick(
cb,
new TypeError(
'Unable to build a path with those params. Supply at least ' +
_.keys(spec.urls[spec.urls.length - 1].req).join(', ')
)
);
}
// build the query string
if (!spec.paramKeys) {
// build a key list on demand
spec.paramKeys = _.keys(spec.params);
}
var key, param;
for (i = 0; i < spec.paramKeys.length; i++) {
key = spec.paramKeys[i];
param = spec.params[key];
try {
if (params[key] != null) {
query[key] = castType[param.type] ? castType[param.type](param, params[key], key) : params[key];
if (param['default'] && query[key] === param['default']) {
delete query[key];
}
} else if (param.required) {
throw new TypeError('Missing required parameter ' + key);
}
} catch (e) {
return _.nextTick(cb, e);
}
}
request.path = request.path + _.makeQueryString(query);
if (spec.castNotFound) {
client.request(request, function (err, response) {
if (err instanceof errors.NotFound) {
cb(null, false);
} else {
cb(err, !err);
}
});
} else {
client.request(request, cb);
}
};
var castType = {
enum: function (param, val, name) {
if (_.contains(param.options, val)) {
@ -239,3 +129,113 @@ function resolveUrl (url, params) {
delete params[name];
}, {}));
};
function exec(client, spec, params, cb) {
if (typeof params === 'function') {
cb = params;
params = {};
} else {
params = params || {};
cb = typeof cb === 'function' ? cb : _.noop;
}
var request = {
ignore: params.ignore
};
var parts = {};
var query = {};
var i;
if (spec.needsBody && !params.body) {
return _.nextTick(cb, new TyperError(spec.name + ' requires a request body.'));
}
if (params.body) {
request.body = params.body;
request.bulkBody = spec.bulkBody;
}
if (spec.methods.length === 1) {
request.method = spec.methods[0];
} else {
// if set, uppercase the user's choice, other wise returns ""
request.method = _.toUpperString(params.method);
if (request.method) {
// use the one specified as long as it's a valid option
if (!_.contains(spec.methods, request.method)) {
return _.nextTick(cb, new TypeError('Invalid method: should be one of ' + spec.methods.join(', ')));
}
} else {
// pick a method
if (request.body) {
// first method that isn't "GET"
request.method = spec.methodWithBody || (
spec.methodWithBody = _.find(spec.methods, function (m) { return m !== 'GET'; })
);
} else {
// just use the first option
request.method = spec.methods[0];
}
}
}
if (spec.url) {
// only one url option
request.path = resolveUrl(spec.url, params);
} else {
for (i = 0; i < spec.urls.length; i++) {
if (request.path = resolveUrl(spec.urls[i], params)) {
break;
}
}
}
if (!request.path) {
// there must have been some mimimun requirements that were not met
return _.nextTick(
cb,
new TypeError(
'Unable to build a path with those params. Supply at least ' +
_.keys(spec.urls[spec.urls.length - 1].req).join(', ')
)
);
}
// build the query string
if (!spec.paramKeys) {
// build a key list on demand
spec.paramKeys = _.keys(spec.params);
}
var key, param;
for (i = 0; i < spec.paramKeys.length; i++) {
key = spec.paramKeys[i];
param = spec.params[key];
try {
if (params[key] != null) {
query[key] = castType[param.type] ? castType[param.type](param, params[key], key) : params[key];
if (param['default'] && query[key] === param['default']) {
delete query[key];
}
} else if (param.required) {
throw new TypeError('Missing required parameter ' + key);
}
} catch (e) {
return _.nextTick(cb, e);
}
}
request.path = request.path + _.makeQueryString(query);
if (spec.castNotFound) {
client.request(request, function (err, response) {
if (err instanceof errors.NotFound) {
cb(null, false);
} else {
cb(err, !err);
}
});
} else {
client.request(request, cb);
}
};

View File

@ -27,6 +27,12 @@ var defaultConfig = {
],
connectionConstructor: 'Http',
selector: selectors.roundRobin,
sniffOnStart: false,
sniffAfterRequests: null,
sniffOnConnectionFail: false,
maxRetries: 3,
timeout: 10000,
deadTimeout: 60000,
nodesToHostCallback: function (nodes) {
var hosts = [];
_.each(nodes, function (node, id) {
@ -41,13 +47,7 @@ var defaultConfig = {
});
});
return hosts;
},
sniffOnStart: false,
sniffAfterRequests: null,
sniffOnConnectionFail: false,
maxRetries: 3,
timeout: 10000,
deadTimeout: 60000
}
};
function ClientConfig(config) {

250
src/lib/log.js Normal file
View File

@ -0,0 +1,250 @@
var _ = require('./utils'),
url = require('url'),
EventEmitter = require('events').EventEmitter;
/**
* Log bridge, which is an [EventEmitter](http://nodejs.org/api/events.html#events_class_events_eventemitter)
* that sends events to one or more outputs/loggers. Setup these loggers by
* specifying their config as the first argument, or by passing it to addOutput().
*
* @class Log
* @uses Loggers.Stdio
* @constructor
* @param {string|Object|ArrayOfStrings|ArrayOfObjects} output - Either the level
* to setup a single logger, a full config object for alogger, or an array of
* config objects to use for creating log outputs.
* @param {string} output.level - One of the keys in Log.levels (error, warning, etc.)
* @param {string} output.type - The name of the logger to use for this output
*/
function Log(config) {
this.config = config;
var i;
var output = config.log || 2;
if (_.isString(output) || _.isFinite(output)) {
output = [
{
level: output
}
];
} else if (_.isPlainObject(output)) {
output = [output];
} else if (_.isArray(output)) {
for (i = 0; i < output.length; i++) {
if (_.isString(output[i])) {
output[i] = {
level: output[i]
};
}
}
}
if (!_.isArrayOfPlainObjects(output)) {
throw new TypeError('Invalid Logging output config');
}
for (i = 0; i < output.length; i++) {
this.addOutput(output[i]);
}
}
_.inherits(Log, EventEmitter);
/**
* Levels observed by the loggers, ordered by rank
*
* @property levels
* @type Array
* @static
*/
Log.levels = [
/**
* Event fired for error level log entries
* @event error
* @param {Error} error - The error object to log
*/
'error',
/**
* Event fired for "warning" level log entries, which usually represent things
* like correctly formatted error responses from ES (400, ...) and recoverable
* errors (one node unresponsive)
*
* @event warning
* @param {String} message - A message to be logged
*/
'warning',
/**
* Event fired for "info" level log entries, which usually describe what a
* client is doing (sniffing etc)
*
* @event info
* @param {String} message - A message to be logged
*/
'info',
/**
* Event fired for "debug" level log entries, which will describe requests sent,
* including their url (no data, response codes, or exec times)
*
* @event debug
* @param {String} message - A message to be logged
*/
'debug',
/**
* Event fired for "trace" level log entries, which provide detailed information
* about each request made from a client, including reponse codes, execution times,
* and a full curl command that can be copied and pasted into a terminal
*
* @event trace
* @param {String} method method, , body, responseStatus, responseBody
* @param {String} url - The url the request was made to
* @param {String} body - The body of the request
* @param {Integer} responseStatus - The status code returned from the response
* @param {String} responseBody - The body of the response
*/
'trace'
];
/**
* Converts a log config value (string or array) to an array of level names which
* it represents
*
* @method parseLevels
* @static
* @private
* @param {String|ArrayOfStrings} input - Cound be a string to specify the max
* level, or an array of exact levels
* @return {Array} -
*/
Log.parseLevels = function (input) {
if (_.isString(input)) {
return Log.levels.slice(0, _.indexOf(Log.levels, input) + 1);
}
else if (_.isArray(input)) {
return _.intersection(input, Log.levels);
}
};
/**
* Combine the array-like param into a simple string
*
* @method join
* @static
* @private
* @param {*} arrayish - An array like object that can be itterated by _.each
* @return {String} - The final string.
*/
Log.join = function (arrayish) {
return _.map(arrayish, function (item) {
if (_.isPlainObject(item)) {
return _.inspect(item) + '\n';
} else {
return item.toString();
}
}).join(' ');
};
/**
* Create a new logger, based on the config.
*
* @method addOutput
* @param {object} config - An object with config options for the logger.
* @param {String} [config.type=stdio] - The name of an output/logger. Options
* can be found in the `src/loggers` directory.
* @param {String|ArrayOfStrings} [config.levels=warning] - The levels to output
* to this logger, when an array is specified no levels other than the ones
* specified will be listened to. When a string is specified, that and all lower
* levels will be logged.
* @return {Logger}
*/
Log.prototype.addOutput = function (config) {
var levels = Log.parseLevels(config.levels || config.level || 'warning');
_.defaults(config || {}, {
type: 'stdio',
});
// force the levels config
delete config.level;
config.levels = levels;
var Logger = require('./loggers/' + config.type);
return new Logger(config, this);
};
/**
* Log an error
*
* @method error
* @param {Error|String} error The Error to log
* @return {Boolean} - True if any outputs accepted the message
*/
Log.prototype.error = function (e) {
if (EventEmitter.listenerCount(this, 'error')) {
return this.emit('error', e instanceof Error ? e : new Error(e));
}
};
/**
* Log a warning message
*
* @method warning
* @param {*} msg* - Any amount of messages that will be joined before logged
* @return {Boolean} - True if any outputs accepted the message
*/
Log.prototype.warning = function (/* ...msg */) {
if (EventEmitter.listenerCount(this, 'warning')) {
return this.emit('warning', Log.join(arguments));
}
};
/**
* Log useful info about what's going on
*
* @method info
* @param {*} msg* - Any amount of messages that will be joined before logged
* @return {Boolean} - True if any outputs accepted the message
*/
Log.prototype.info = function (/* ...msg */) {
if (EventEmitter.listenerCount(this, 'info')) {
return this.emit('info', Log.join(arguments));
}
};
/**
* Log a debug level message
*
* @method debug
* @param {*} msg* - Any amount of messages that will be joined before logged
* @return {Boolean} - True if any outputs accepted the message
*/
Log.prototype.debug = function (/* ...msg */) {
if (EventEmitter.listenerCount(this, 'debug')) {
return this.emit('debug', Log.join(arguments) + _.getStackTrace(Log.prototype.debug));
}
};
/**
* Log a trace level message
*
* @method trace
* @param {String} method - HTTP request method
* @param {String|Object} requestUrl - URL requested. If the value is an object,
* it is expected to be the return value of Node's url.parse()
* @param {String} body - The request's body
* @param {String} responseBody - body returned from ES
* @param {String} responseStatus - HTTP status code
* @return {Boolean} - True if any outputs accepted the message
*/
Log.prototype.trace = function (method, requestUrl, body, responseBody, responseStatus) {
if (EventEmitter.listenerCount(this, 'trace')) {
if (typeof requestUrl === 'object') {
requestUrl = _.formatUrl(requestUrl);
}
return this.emit('trace', method, requestUrl, body, responseBody, responseStatus);
}
};
module.exports = Log;

40
src/lib/loggers/file.js Normal file
View File

@ -0,0 +1,40 @@
/**
* Logger that writes to a file
*
* @class Loggers.File
* @extends StreamLogger
* @constructor
* @param {Object} config - The configuration for the Logger (See LoggerAbstract for generic options)
* @param {String} config.path - The location to write
* @param {Log} bridge - The object that triggers logging events, which we will record
*/
module.exports = File;
var StreamLogger = require('./stream'),
_ = require('../utils'),
fs = require('fs');
function File(config, bridge) {
this.path = config.path;
config.stream = fs.createWriteStream(config.path, {
flags: 'a',
encoding: 'utf8'
});
File.callSuper(this, arguments);
}
_.inherits(File, StreamLogger);
File.prototype.onProcessExit = _.handler(function () {
// flush the write buffer to disk
var writeBuffer = this.stream._writableState.buffer;
var out = '';
if (writeBuffer) {
writeBuffer.forEach(function (buffered) {
out += buffered.chunk.toString();
});
fs.appendFileSync(this.path, out);
}
});

119
src/lib/loggers/stdio.js Normal file
View File

@ -0,0 +1,119 @@
/**
* Special version of the Stream logger, which logs errors and warnings to stderr and all other
* levels to stdout.
*
* @class Loggers.Stdio
* @extends LoggerAbstract
* @constructor
* @param {Object} config - The configuration for the Logger
* @param {string} config.level - The highest log level for this logger to output.
* @param {Log} bridge - The object that triggers logging events, which we will record
*/
module.exports = Stdio;
var clc = require('cli-color'),
LoggerAbstract = require('../logger'),
_ = require('../utils');
function Stdio(config, bridge) {
Stdio.callSuper(this, arguments);
// config/state
this.color = _.has(config, 'color') ? !!config.color : true;
}
_.inherits(Stdio, LoggerAbstract);
/**
* Sends output to a stream, does some formatting first
*
* @method write
* @private
* @param {WritableStream} to - The stream that should receive this message
* @param {String} label - The text that should be used at the beginning the message
* @param {function} colorize - A function that receives a string and returned a colored version of it
* @param {*} what - The message to log
* @return {undefined}
*/
Stdio.prototype.write = function (to, label, colorize, message) {
if (this.color) {
label = colorize(label);
}
to.write(this.format(label, message));
};
/**
* Handler for the bridges "error" event
*
* @method onError
* @private
* @param {Error} e - The Error object to log
* @return {undefined}
*/
Stdio.prototype.onError = _.handler(function (e) {
this.write(process.stderr, e.name === 'Error' ? 'ERROR' : e.name, clc.red.bold, e.stack);
});
/**
* Handler for the bridges "warning" event
*
* @method onWarning
* @private
* @param {String} msg - The message to be logged
* @return {undefined}
*/
Stdio.prototype.onWarning = _.handler(function (msg) {
this.write(process.stderr, 'WARNING', clc.yellow.bold, msg);
});
/**
* Handler for the bridges "info" event
*
* @method onInfo
* @private
* @param {String} msg - The message to be logged
* @return {undefined}
*/
Stdio.prototype.onInfo = _.handler(function (msg) {
this.write(process.stdout, 'INFO', clc.cyan.bold, msg);
});
/**
* Handler for the bridges "debug" event
*
* @method onDebug
* @private
* @param {String} msg - The message to be logged
* @return {undefined}
*/
Stdio.prototype.onDebug = _.handler(function (msg) {
this.write(process.stdout, 'DEBUG', clc.magentaBright.bold, msg);
});
/**
* Handler for the bridges "trace" event
*
* @method onTrace
* @private
* @return {undefined}
*/
Stdio.prototype.onTrace = _.handler(function (method, url, body, responseBody, responseStatus) {
var message = 'curl "' + url.replace(/"/g, '\\"') + '" -X' + method.toUpperCase();
if (body) {
message += ' -d "' + body.replace(/"/g, '\\"') + '"';
}
message += '\n<- ';
if (this.color) {
if (responseStatus >= 200 && responseStatus < 300) {
message += clc.green.bold(responseStatus);
} else {
message += clc.red.bold(responseStatus);
}
} else {
message += responseStatus;
}
message += '\n' + responseBody;
this.write(process.stdout, 'TRACE', clc.cyanBright.bold, message);
});

51
src/lib/loggers/stream.js Normal file
View File

@ -0,0 +1,51 @@
/**
* Logger that writes to a file
*
* @class Loggers.File
* @extends LoggerAbstract
* @constructor
* @see LoggerAbstract
* @param {Object} config - The configuration for the Logger (See LoggerAbstract for generic options)
* @param {String} config.path - The location to write
* @param {Log} bridge - The object that triggers logging events, which we will record
*/
module.exports = Stream;
var LoggerAbstract = require('../Logger'),
nodeStreams = require('stream'),
_ = require('../utils'),
fs = require('fs');
function Stream(config, bridge) {
Stream.callSuper(this, arguments);
_.makeBoundMethods(this);
if (config.stream instanceof nodeStreams.Writable) {
this.stream = config.stream;
} else {
throw new TypeError('Invalid stream, use an instance of stream.Writeable');
}
process.on('exit', this.bound.onProcessExit);
}
_.inherits(Stream, LoggerAbstract);
// flush the write buffer to stderr synchronously
Stream.prototype.onProcessExit = _.handler(function () {
var writeBuffer = this.stream._writableState.buffer;
if (writeBuffer && writeBuffer.length) {
console.error('Log stream did not get to finish writing. Flushing to stderr');
writeBuffer.forEach(function (buffered) {
console.error(buffered.chunk.toString());
});
}
});
Stream.prototype.write = function (label, message) {
this.stream.write(this.format(label, message), 'utf8');
};
Stream.prototype.close = function () {
this.stream.end();
};

View File

@ -0,0 +1,5 @@
module.exports = RandomSelect;
function RandomSelect(connections) {
return connections[Math.floor(Math.random() * connections.length)];
}

View File

@ -0,0 +1,52 @@
/**
* Simple JSON serializer
* @type {[type]}
*/
module.exports = Json;
var _ = require('../utils');
function Json(client) {
this.client = client;
}
Json.prototype.serialize = function (val, replacer, spaces) {
if (val == null) {
return null;
}
else if (typeof val === 'string') {
return val;
} else {
return JSON.stringify(val, replacer, spaces);
}
};
Json.prototype.unserialize = function (str) {
if (typeof str === 'string') {
try {
return JSON.parse(str);
} catch (e) {
this.client.log.error(new Error('unable to parse', str));
return null;
}
} else {
return str;
}
};
Json.prototype.bulkBody = function (val) {
var body = '', i;
if (_.isArray(val)) {
for (i = 0; i < val.length; i++) {
body += this.serialize(val[i]) + '\n';
}
} else if (typeof val === 'string') {
// make sure the string ends in a new line
body = val + (val[val.length - 1] === '\n' ? '' : '\n');
} else {
throw new TypeError('Bulk body should either be an Array of commands/string, or a String');
}
return body;
};