WIP: initial prototype

- Added support for request timeout
- Improved deserializer
- Fixed minor bugs
- Fixed typos
This commit is contained in:
delvedor
2018-10-22 16:50:17 +02:00
parent 68cca6069b
commit cef4e2dfc1
4 changed files with 32 additions and 17 deletions

View File

@ -24,8 +24,7 @@ class Connection {
keepAlive: true,
keepAliveMsecs: 1000,
maxSockets: Infinity,
maxFreeSockets: 256,
timeout: 60000
maxFreeSockets: 256
}, opts.host.agent || opts.agent)
this._agent = this.host.protocol === 'http:'
? new HttpAgent(agentOptions)

View File

@ -10,7 +10,8 @@ class ConnectionPool {
this.alive = []
this.dead = []
this.selector = opts.selector
this.sll = opts.sll
this._ssl = opts.ssl
this._agent = opts.agent
// the resurrect timeout is 60s
// we multiply it by 2 because the resurrect formula is
// `Math.pow(resurrectTimeout * 2, deadCount -1)`
@ -143,12 +144,17 @@ class ConnectionPool {
if (typeof opts === 'string') {
opts = this.urlToHost(opts)
}
Object.assign(opts, this.ssl)
if (opts.ssl == null) opts.ssl = this._ssl
if (opts.agent == null) opts.agent = this._agent
const connection = new Connection(opts)
debug('Adding a new connection', connection)
if (this.connections.has(connection.id)) {
throw new Error(`Connection with id '${connection.id} is already present`)
}
this.connections.set(connection.id, connection)
this.alive.push(connection.id)
return this
return connection
}
/**

View File

@ -49,6 +49,7 @@ class Transport {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
params.timeout = params.timeout || this.requestTimeout
this.emit('request', params)
const request = connection.request(params, (err, response) => {
if (err != null) {
@ -61,7 +62,7 @@ class Transport {
const error = err.message === 'Request timed out'
? new TimeoutError(err.message)
: ConnectionError(err.message)
: new ConnectionError(err.message)
this.emit('error', error, params)
return callback(error)
@ -70,14 +71,19 @@ class Transport {
var json = ''
response.setEncoding('utf8')
response.on('data', chunk => { json += chunk })
response.on('error', err => callback(err))
response.on('error', err => callback(new ConnectionError(err.message)))
response.on('end', () => {
this.connectionPool.markAlive(connection)
try {
var payload = this.serializer.deserialize(json)
} catch (err) {
this.emit('error', err)
return callback(err)
const contentType = response.headers['content-type']
if (contentType != null && contentType.indexOf('application/json') > -1) {
try {
var payload = this.serializer.deserialize(json)
} catch (err) {
this.emit('error', err)
return callback(err)
}
} else {
payload = json
}
const { statusCode } = response
this.emit('response', params, { statusCode, payload })
@ -89,9 +95,11 @@ class Transport {
})
})
return function requestAborter () {
request.abort()
debug('Request aborted', params)
return {
abort: () => {
request.abort()
debug('Request aborted', params)
}
}
}