From 0c8bd328fec784768ede81d2b94bbe25207d411a Mon Sep 17 00:00:00 2001 From: Spencer Alger Date: Sat, 14 Dec 2013 14:07:47 -0700 Subject: [PATCH] Updated the method for creating the defer object in the transport, it is now a property of the Transport class itself. Fixed the error.message logging in the Transport. Added sniffOn... related stuff --- src/lib/transport.js | 43 +++++++++++++++++++++++++++++-------------- 1 file changed, 29 insertions(+), 14 deletions(-) diff --git a/src/lib/transport.js b/src/lib/transport.js index f7ac40228..3b5098b3b 100644 --- a/src/lib/transport.js +++ b/src/lib/transport.js @@ -10,16 +10,12 @@ var Host = require('./host'); var when = require('when'); function Transport(config) { + var self = this; config = config || {}; var LogClass = (typeof config.log === 'function') ? config.log : require('./log'); config.log = this.log = new LogClass(config); - // overwrite the createDefer method if a new implementation is provided - if (typeof config.createDefer === 'function') { - this.createDefer = config.createDefer; - } - // setup the connection pool var ConnectionPool = _.funcEnum(config, 'connectionPool', Transport.connectionPools, 'main'); this.connectionPool = new ConnectionPool(config); @@ -65,6 +61,19 @@ function Transport(config) { this.connectionPool.setHosts(hosts); } + + if (config.sniffOnStart) { + this.sniff(); + } + + if (config.sniffInterval) { + this._sniffTimeout = setTimeout(function doSniff() { + self.sniff(); + self._sniffTimeout = setTimeout(doSniff, config.sniffInterval); + }, config.sniffInterval); + } + + this.sniffAfterConnectionFault = config.sniffAfterConnectionFault; } Transport.connectionPools = { @@ -79,6 +88,10 @@ Transport.nodesToHostCallbacks = { main: require('./nodes_to_host') }; +Transport.createDefer = function () { + return when.defer(); +}; + /** * Perform a request with the client's transport * @@ -150,10 +163,10 @@ Transport.prototype.request = function (params, cb) { connection.setStatus('dead'); if (remainingRetries) { remainingRetries--; - self.log.error('Request error, retrying --', err.message); + self.log.error('Request error, retrying' + (err.message ? ' -- ' + err.message : '')); self.connectionPool.select(sendReqWithConnection); } else { - self.log.error('Request complete with error --', err.message); + self.log.error('Request complete with error' + (err.message ? ' -- ' + err.message : '')); respond(new errors.ConnectionFault(err)); } } else { @@ -169,12 +182,14 @@ Transport.prototype.request = function (params, cb) { clearTimeout(requestTimeoutId); var parsedBody; + var isJson = !headers || (headers['content-type'] && ~headers['content-type'].indexOf('application/json')); if (!err && body) { - if (!headers || headers['content-type'] === 'application/json') { + if (isJson) { parsedBody = self.serializer.deserialize(body); if (parsedBody == null) { err = new errors.Serialization(); + parsedBody = body; } } else { parsedBody = body; @@ -194,7 +209,7 @@ Transport.prototype.request = function (params, cb) { } } - // how do we parse the body? + // can we cast notfound to false? if (params.castExists) { if (err && err instanceof errors.NotFound) { parsedBody = false; @@ -207,11 +222,13 @@ Transport.prototype.request = function (params, cb) { // how do we send the response? if (typeof cb === 'function') { if (err) { - cb(err); + cb(err, parsedBody, status); } else { cb(void 0, parsedBody, status); } } else if (err) { + err.body = parsedBody; + err.status = status; defer.reject(err); } else { defer.resolve({ @@ -250,7 +267,7 @@ Transport.prototype.request = function (params, cb) { abort: abortRequest }; } else { - defer = this.createDefer(); + defer = Transport.createDefer(); request = defer.promise; request.abort = abortRequest; } @@ -260,9 +277,7 @@ Transport.prototype.request = function (params, cb) { return request; }; -Transport.prototype.createDefer = function () { - return when.defer(); -}; + /** * Ask an ES node for a list of all the nodes, add/remove nodes from the connection