diff --git a/index.js b/index.js index f71ecbf6a..8454bce5d 100644 --- a/index.js +++ b/index.js @@ -48,14 +48,16 @@ class Client extends EventEmitter { requestTimeout: 30000, sniffInterval: false, sniffOnStart: false, - ssl: null + ssl: null, + agent: null }, opts) this[kSelector] = new options.Selector() this[kSerializer] = new options.Serializer() this[kConnectionPool] = new options.ConnectionPool({ selector: this[kSelector], - ssl: options.ssl + ssl: options.ssl, + agent: null }) // Add the connections before initialize the Transport diff --git a/lib/Connection.js b/lib/Connection.js index a08765d8a..99d9f7be9 100644 --- a/lib/Connection.js +++ b/lib/Connection.js @@ -24,8 +24,7 @@ class Connection { keepAlive: true, keepAliveMsecs: 1000, maxSockets: Infinity, - maxFreeSockets: 256, - timeout: 60000 + maxFreeSockets: 256 }, opts.host.agent || opts.agent) this._agent = this.host.protocol === 'http:' ? new HttpAgent(agentOptions) diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js index 7e16b5269..e6fbd446b 100644 --- a/lib/ConnectionPool.js +++ b/lib/ConnectionPool.js @@ -10,7 +10,8 @@ class ConnectionPool { this.alive = [] this.dead = [] this.selector = opts.selector - this.sll = opts.sll + this._ssl = opts.ssl + this._agent = opts.agent // the resurrect timeout is 60s // we multiply it by 2 because the resurrect formula is // `Math.pow(resurrectTimeout * 2, deadCount -1)` @@ -143,12 +144,17 @@ class ConnectionPool { if (typeof opts === 'string') { opts = this.urlToHost(opts) } - Object.assign(opts, this.ssl) + if (opts.ssl == null) opts.ssl = this._ssl + if (opts.agent == null) opts.agent = this._agent + const connection = new Connection(opts) debug('Adding a new connection', connection) + if (this.connections.has(connection.id)) { + throw new Error(`Connection with id '${connection.id} is already present`) + } this.connections.set(connection.id, connection) this.alive.push(connection.id) - return this + return connection } /** diff --git a/lib/Transport.js b/lib/Transport.js index 3c962f2df..6f30dc7c0 100644 --- a/lib/Transport.js +++ b/lib/Transport.js @@ -49,6 +49,7 @@ class Transport { params.headers['Content-Length'] = '' + Buffer.byteLength(params.body) } + params.timeout = params.timeout || this.requestTimeout this.emit('request', params) const request = connection.request(params, (err, response) => { if (err != null) { @@ -61,7 +62,7 @@ class Transport { const error = err.message === 'Request timed out' ? new TimeoutError(err.message) - : ConnectionError(err.message) + : new ConnectionError(err.message) this.emit('error', error, params) return callback(error) @@ -70,14 +71,19 @@ class Transport { var json = '' response.setEncoding('utf8') response.on('data', chunk => { json += chunk }) - response.on('error', err => callback(err)) + response.on('error', err => callback(new ConnectionError(err.message))) response.on('end', () => { this.connectionPool.markAlive(connection) - try { - var payload = this.serializer.deserialize(json) - } catch (err) { - this.emit('error', err) - return callback(err) + const contentType = response.headers['content-type'] + if (contentType != null && contentType.indexOf('application/json') > -1) { + try { + var payload = this.serializer.deserialize(json) + } catch (err) { + this.emit('error', err) + return callback(err) + } + } else { + payload = json } const { statusCode } = response this.emit('response', params, { statusCode, payload }) @@ -89,9 +95,11 @@ class Transport { }) }) - return function requestAborter () { - request.abort() - debug('Request aborted', params) + return { + abort: () => { + request.abort() + debug('Request aborted', params) + } } }