Updated the method for creating the defer object in the transport, it is now a property of the Transport class itself. Fixed the error.message logging in the Transport. Added sniffOn... related stuff
This commit is contained in:
@ -10,16 +10,12 @@ var Host = require('./host');
|
|||||||
var when = require('when');
|
var when = require('when');
|
||||||
|
|
||||||
function Transport(config) {
|
function Transport(config) {
|
||||||
|
var self = this;
|
||||||
config = config || {};
|
config = config || {};
|
||||||
|
|
||||||
var LogClass = (typeof config.log === 'function') ? config.log : require('./log');
|
var LogClass = (typeof config.log === 'function') ? config.log : require('./log');
|
||||||
config.log = this.log = new LogClass(config);
|
config.log = this.log = new LogClass(config);
|
||||||
|
|
||||||
// overwrite the createDefer method if a new implementation is provided
|
|
||||||
if (typeof config.createDefer === 'function') {
|
|
||||||
this.createDefer = config.createDefer;
|
|
||||||
}
|
|
||||||
|
|
||||||
// setup the connection pool
|
// setup the connection pool
|
||||||
var ConnectionPool = _.funcEnum(config, 'connectionPool', Transport.connectionPools, 'main');
|
var ConnectionPool = _.funcEnum(config, 'connectionPool', Transport.connectionPools, 'main');
|
||||||
this.connectionPool = new ConnectionPool(config);
|
this.connectionPool = new ConnectionPool(config);
|
||||||
@ -65,6 +61,19 @@ function Transport(config) {
|
|||||||
|
|
||||||
this.connectionPool.setHosts(hosts);
|
this.connectionPool.setHosts(hosts);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (config.sniffOnStart) {
|
||||||
|
this.sniff();
|
||||||
|
}
|
||||||
|
|
||||||
|
if (config.sniffInterval) {
|
||||||
|
this._sniffTimeout = setTimeout(function doSniff() {
|
||||||
|
self.sniff();
|
||||||
|
self._sniffTimeout = setTimeout(doSniff, config.sniffInterval);
|
||||||
|
}, config.sniffInterval);
|
||||||
|
}
|
||||||
|
|
||||||
|
this.sniffAfterConnectionFault = config.sniffAfterConnectionFault;
|
||||||
}
|
}
|
||||||
|
|
||||||
Transport.connectionPools = {
|
Transport.connectionPools = {
|
||||||
@ -79,6 +88,10 @@ Transport.nodesToHostCallbacks = {
|
|||||||
main: require('./nodes_to_host')
|
main: require('./nodes_to_host')
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Transport.createDefer = function () {
|
||||||
|
return when.defer();
|
||||||
|
};
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Perform a request with the client's transport
|
* Perform a request with the client's transport
|
||||||
*
|
*
|
||||||
@ -150,10 +163,10 @@ Transport.prototype.request = function (params, cb) {
|
|||||||
connection.setStatus('dead');
|
connection.setStatus('dead');
|
||||||
if (remainingRetries) {
|
if (remainingRetries) {
|
||||||
remainingRetries--;
|
remainingRetries--;
|
||||||
self.log.error('Request error, retrying --', err.message);
|
self.log.error('Request error, retrying' + (err.message ? ' -- ' + err.message : ''));
|
||||||
self.connectionPool.select(sendReqWithConnection);
|
self.connectionPool.select(sendReqWithConnection);
|
||||||
} else {
|
} else {
|
||||||
self.log.error('Request complete with error --', err.message);
|
self.log.error('Request complete with error' + (err.message ? ' -- ' + err.message : ''));
|
||||||
respond(new errors.ConnectionFault(err));
|
respond(new errors.ConnectionFault(err));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
@ -169,12 +182,14 @@ Transport.prototype.request = function (params, cb) {
|
|||||||
|
|
||||||
clearTimeout(requestTimeoutId);
|
clearTimeout(requestTimeoutId);
|
||||||
var parsedBody;
|
var parsedBody;
|
||||||
|
var isJson = !headers || (headers['content-type'] && ~headers['content-type'].indexOf('application/json'));
|
||||||
|
|
||||||
if (!err && body) {
|
if (!err && body) {
|
||||||
if (!headers || headers['content-type'] === 'application/json') {
|
if (isJson) {
|
||||||
parsedBody = self.serializer.deserialize(body);
|
parsedBody = self.serializer.deserialize(body);
|
||||||
if (parsedBody == null) {
|
if (parsedBody == null) {
|
||||||
err = new errors.Serialization();
|
err = new errors.Serialization();
|
||||||
|
parsedBody = body;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
parsedBody = body;
|
parsedBody = body;
|
||||||
@ -194,7 +209,7 @@ Transport.prototype.request = function (params, cb) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// how do we parse the body?
|
// can we cast notfound to false?
|
||||||
if (params.castExists) {
|
if (params.castExists) {
|
||||||
if (err && err instanceof errors.NotFound) {
|
if (err && err instanceof errors.NotFound) {
|
||||||
parsedBody = false;
|
parsedBody = false;
|
||||||
@ -207,11 +222,13 @@ Transport.prototype.request = function (params, cb) {
|
|||||||
// how do we send the response?
|
// how do we send the response?
|
||||||
if (typeof cb === 'function') {
|
if (typeof cb === 'function') {
|
||||||
if (err) {
|
if (err) {
|
||||||
cb(err);
|
cb(err, parsedBody, status);
|
||||||
} else {
|
} else {
|
||||||
cb(void 0, parsedBody, status);
|
cb(void 0, parsedBody, status);
|
||||||
}
|
}
|
||||||
} else if (err) {
|
} else if (err) {
|
||||||
|
err.body = parsedBody;
|
||||||
|
err.status = status;
|
||||||
defer.reject(err);
|
defer.reject(err);
|
||||||
} else {
|
} else {
|
||||||
defer.resolve({
|
defer.resolve({
|
||||||
@ -250,7 +267,7 @@ Transport.prototype.request = function (params, cb) {
|
|||||||
abort: abortRequest
|
abort: abortRequest
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
defer = this.createDefer();
|
defer = Transport.createDefer();
|
||||||
request = defer.promise;
|
request = defer.promise;
|
||||||
request.abort = abortRequest;
|
request.abort = abortRequest;
|
||||||
}
|
}
|
||||||
@ -260,9 +277,7 @@ Transport.prototype.request = function (params, cb) {
|
|||||||
return request;
|
return request;
|
||||||
};
|
};
|
||||||
|
|
||||||
Transport.prototype.createDefer = function () {
|
|
||||||
return when.defer();
|
|
||||||
};
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
|
* Ask an ES node for a list of all the nodes, add/remove nodes from the connection
|
||||||
|
|||||||
Reference in New Issue
Block a user