WIP: initial prototype

- Expose connection info inside events
- Fixed minor bugs
This commit is contained in:
delvedor
2018-11-19 11:33:40 +01:00
parent d259b82763
commit 020165168c
4 changed files with 27 additions and 20 deletions

View File

@ -59,14 +59,15 @@ class Client extends EventEmitter {
this[kSerializer] = new options.Serializer()
this[kConnectionPool] = new options.ConnectionPool({
pingTimeout: opts.pingTimeout,
resurrectStrategy: opts.resurrectStrategy,
randomizeHost: opts.randomizeHost,
pingTimeout: options.pingTimeout,
resurrectStrategy: options.resurrectStrategy,
randomizeHost: options.randomizeHost,
ssl: options.ssl,
agent: null,
nodeFilter: opts.nodeFilter,
nodeWeighter: opts.nodeWeighter,
nodeSelector: opts.nodeSelector
agent: options.agent,
nodeFilter: options.nodeFilter,
nodeWeighter: options.nodeWeighter,
nodeSelector: options.nodeSelector,
Connection: options.Connection
})
// Add the connections before initialize the Transport
@ -99,7 +100,7 @@ class Client extends EventEmitter {
}
}
Client.events = {
const events = {
RESPONSE: 'response',
REQUEST: 'request',
ERROR: 'error'
@ -109,6 +110,8 @@ module.exports = {
Client,
Transport,
ConnectionPool,
Connection,
Serializer,
symbols
symbols,
events
}

View File

@ -86,6 +86,7 @@ class Connection {
return request
}
// TODO: write a better closing logic
close () {
debug('Closing connection', this.id)
if (this._openRequests > 0) {

View File

@ -25,6 +25,7 @@ class ConnectionPool {
this.pingTimeout = opts.pingTimeout
this.randomizeHost = opts.randomizeHost === true
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
this.Connection = opts.Connection
if (typeof opts.nodeSelector === 'function') {
this.nodeSelector = opts.nodeSelector
@ -195,7 +196,7 @@ class ConnectionPool {
if (opts.ssl == null) opts.ssl = this._ssl
if (opts.agent == null) opts.agent = this._agent
const connection = new Connection(opts)
const connection = new this.Connection(opts)
debug('Adding a new connection', connection)
if (this.connections.has(connection.id)) {
throw new Error(`Connection with id '${connection.id}' is already present`)

View File

@ -80,8 +80,7 @@ class Transport {
// handles request timeout
params.timeout = toMs(params.requestTimeout || this.requestTimeout)
// TODO: expose nicely the node metadata (also in response an error)
this.emit('request', params, connection)
this.emit('request', connection, params)
// perform the actual http request
const request = connection.request(params, (err, response) => {
@ -105,7 +104,7 @@ class Transport {
? err
: new ConnectionError(err.message, params)
this.emit('error', error, params)
this.emit('error', error, connection, params)
return callback(error, result)
}
@ -119,6 +118,7 @@ class Transport {
if (params.asStream === true) {
result.body = response
this.emit('response', connection, params, result)
callback(null, result)
return
}
@ -142,7 +142,7 @@ class Transport {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('error', err, params)
this.emit('error', err, connection, params)
return callback(err, result)
}
} else {
@ -171,7 +171,7 @@ class Transport {
this.connectionPool.markAlive(connection)
}
this.emit('response', params, result)
this.emit('response', connection, params, result)
if (ignoreStatusCode === false && statusCode >= 400) {
callback(new ResponseError(result), result)
} else {
@ -206,23 +206,25 @@ class Transport {
this._isSniffing = true
debug('Started sniffing request')
this.request({
const request = {
method: 'GET',
path: this.sniffEndpoint
}, (err, body) => {
}
this.request(request, (err, result) => {
this._isSniffing = false
if (this._sniffEnabled === true) {
this._nextSniff = Date.now() + this.sniffInterval
}
if (err != null) {
this.emit('error', err)
this.emit('error', err, null, request)
debug('Sniffing errored', err)
return callback(err)
}
debug('Sniffing ended successfully', body)
const hosts = this.connectionPool.nodesToHost(body.nodes)
debug('Sniffing ended successfully', result.body)
const hosts = this.connectionPool.nodesToHost(result.body.nodes)
this.connectionPool.update(hosts)
callback(null, hosts)