Updated Connections handling (#1127)
* Updated Connections handling - The ConnectionPool.getConnection method now always returns a connection - The ConnectionPool.update mehtod now cleans the dead list - Deprecated the NoLivingConnectionsError * Updated test * Updated docs * The NoLivingConnectionsError can still happen if the filter/selector returns no nodes * Updated test * Updated docs * Catch undefined connections as well * Updated test * Updated ApiError type def
This commit is contained in:
committed by
GitHub
parent
85616b07ef
commit
f913f7d2d2
@ -204,7 +204,7 @@ You can find the errors exported by the client in the table below.
|
||||
|
||||
[cols=2*]
|
||||
|===
|
||||
|`ElasticsearchClientErrors`
|
||||
|`ElasticsearchClientError`
|
||||
|Every error inherits from this class, it is the basic error generated by the client.
|
||||
|
||||
|`TimeoutError`
|
||||
@ -214,7 +214,7 @@ You can find the errors exported by the client in the table below.
|
||||
|Generated when an error occurs during the request, it can be a connection error or a malformed stream of data.
|
||||
|
||||
|`NoLivingConnectionsError`
|
||||
|Generated in case of all connections present in the connection pool are dead.
|
||||
|Given the configuration, the ConnectionPool was not able to find a usable Connection for this request.
|
||||
|
||||
|`SerializationError`
|
||||
|Generated if the serialization fails.
|
||||
|
||||
@ -109,10 +109,9 @@ class Transport {
|
||||
const makeRequest = () => {
|
||||
if (meta.aborted === true) return
|
||||
meta.connection = this.getConnection({ requestId: meta.request.id })
|
||||
if (meta.connection === null) {
|
||||
return callback(new NoLivingConnectionsError('There are no living connections'), result)
|
||||
if (meta.connection == null) {
|
||||
return callback(new NoLivingConnectionsError(), result)
|
||||
}
|
||||
|
||||
// TODO: make this assignment FAST
|
||||
const headers = Object.assign({}, this.headers, options.headers)
|
||||
|
||||
|
||||
@ -36,7 +36,7 @@ class NoLivingConnectionsError extends ElasticsearchClientError {
|
||||
super(message)
|
||||
Error.captureStackTrace(this, NoLivingConnectionsError)
|
||||
this.name = 'NoLivingConnectionsError'
|
||||
this.message = message || 'No Living Connections Error'
|
||||
this.message = message || 'Given the configuration, the ConnectionPool was not able to find a usable Connection for this request.'
|
||||
this.meta = meta
|
||||
}
|
||||
}
|
||||
|
||||
@ -35,13 +35,10 @@ class ConnectionPool extends BaseConnectionPool {
|
||||
* Marks a connection as 'alive'.
|
||||
* If needed removes the connection from the dead list
|
||||
* and then resets the `deadCount`.
|
||||
* If sniffing is not enabled and there is only
|
||||
* one node, this method is a noop.
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markAlive (connection) {
|
||||
if (this._sniffEnabled === false && this.size === 1) return this
|
||||
const { id } = connection
|
||||
debug(`Marking as 'alive' connection '${id}'`)
|
||||
const index = this.dead.indexOf(id)
|
||||
@ -56,13 +53,10 @@ class ConnectionPool extends BaseConnectionPool {
|
||||
* Marks a connection as 'dead'.
|
||||
* If needed adds the connection to the dead list
|
||||
* and then increments the `deadCount`.
|
||||
* If sniffing is not enabled and there is only
|
||||
* one node, this method is a noop.
|
||||
*
|
||||
* @param {object} connection
|
||||
*/
|
||||
markDead (connection) {
|
||||
if (this._sniffEnabled === false && this.size === 1) return this
|
||||
const { id } = connection
|
||||
debug(`Marking as 'dead' connection '${id}'`)
|
||||
if (this.dead.indexOf(id) === -1) {
|
||||
@ -158,7 +152,7 @@ class ConnectionPool extends BaseConnectionPool {
|
||||
|
||||
/**
|
||||
* Returns an alive connection if present,
|
||||
* otherwise returns null.
|
||||
* otherwise returns a dead connection.
|
||||
* By default it filters the `master` only nodes.
|
||||
* It uses the selector to choose which
|
||||
* connection return.
|
||||
@ -176,11 +170,13 @@ class ConnectionPool extends BaseConnectionPool {
|
||||
name: opts.name
|
||||
})
|
||||
|
||||
const noAliveConnections = this.size === this.dead.length
|
||||
|
||||
// TODO: can we cache this?
|
||||
const connections = []
|
||||
for (var i = 0; i < this.size; i++) {
|
||||
const connection = this.connections[i]
|
||||
if (connection.status === Connection.statuses.ALIVE) {
|
||||
if (noAliveConnections || connection.status === Connection.statuses.ALIVE) {
|
||||
if (filter(connection) === true) {
|
||||
connections.push(connection)
|
||||
}
|
||||
@ -212,13 +208,7 @@ class ConnectionPool extends BaseConnectionPool {
|
||||
*/
|
||||
update (connections) {
|
||||
super.update(connections)
|
||||
|
||||
for (var i = 0; i < this.dead.length; i++) {
|
||||
if (this.connections.find(c => c.id === this.dead[i]) === undefined) {
|
||||
this.dead.splice(i, 1)
|
||||
}
|
||||
}
|
||||
|
||||
this.dead = []
|
||||
return this
|
||||
}
|
||||
}
|
||||
|
||||
2
lib/pool/index.d.ts
vendored
2
lib/pool/index.d.ts
vendored
@ -89,7 +89,7 @@ declare class BaseConnectionPool {
|
||||
markDead(connection: Connection): this;
|
||||
/**
|
||||
* Returns an alive connection if present,
|
||||
* otherwise returns null.
|
||||
* otherwise returns a dead connection.
|
||||
* By default it filters the `master` only nodes.
|
||||
* It uses the selector to choose which
|
||||
* connection return.
|
||||
|
||||
@ -249,6 +249,20 @@ test('API', t => {
|
||||
pool.getConnection({ filter })
|
||||
})
|
||||
|
||||
t.test('If all connections are marked as dead, getConnection should return a dead connection', t => {
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
const href1 = 'http://localhost:9200/'
|
||||
const href2 = 'http://localhost:9200/other'
|
||||
const conn1 = pool.addConnection(href1)
|
||||
const conn2 = pool.addConnection(href2)
|
||||
pool.markDead(conn1)
|
||||
pool.markDead(conn2)
|
||||
const conn = pool.getConnection()
|
||||
t.ok(conn instanceof Connection)
|
||||
t.is(conn.status, 'dead')
|
||||
t.end()
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
@ -762,27 +776,3 @@ test('Node filter', t => {
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
test('Single node behavior', t => {
|
||||
t.test('sniffing disabled (markDead and markAlive should be noop)', t => {
|
||||
t.plan(4)
|
||||
const pool = new ConnectionPool({ Connection, sniffEnabled: false })
|
||||
const conn = pool.addConnection('http://localhost:9200/')
|
||||
t.true(pool.markDead(conn) instanceof ConnectionPool)
|
||||
t.strictEqual(pool.dead.length, 0)
|
||||
t.true(pool.markAlive(conn) instanceof ConnectionPool)
|
||||
t.strictEqual(pool.dead.length, 0)
|
||||
})
|
||||
|
||||
t.test('sniffing enabled (markDead and markAlive should work)', t => {
|
||||
t.plan(4)
|
||||
const pool = new ConnectionPool({ Connection, sniffEnabled: true })
|
||||
const conn = pool.addConnection('http://localhost:9200/')
|
||||
t.true(pool.markDead(conn) instanceof ConnectionPool)
|
||||
t.strictEqual(pool.dead.length, 1)
|
||||
t.true(pool.markAlive(conn) instanceof ConnectionPool)
|
||||
t.strictEqual(pool.dead.length, 0)
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
@ -335,9 +335,10 @@ test('Not JSON payload from server', t => {
|
||||
})
|
||||
})
|
||||
|
||||
test('NoLivingConnectionsError', t => {
|
||||
t.plan(1)
|
||||
test('NoLivingConnectionsError (null connection)', t => {
|
||||
t.plan(3)
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection('http://localhost:9200')
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
@ -346,7 +347,40 @@ test('NoLivingConnectionsError', t => {
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
sniffOnStart: false,
|
||||
nodeSelector (connections) {
|
||||
t.is(connections.length, 1)
|
||||
t.true(connections[0] instanceof Connection)
|
||||
return null
|
||||
}
|
||||
})
|
||||
|
||||
transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
}, (err, { body }) => {
|
||||
t.ok(err instanceof NoLivingConnectionsError)
|
||||
})
|
||||
})
|
||||
|
||||
test('NoLivingConnectionsError (undefined connection)', t => {
|
||||
t.plan(3)
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection('http://localhost:9200')
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false,
|
||||
nodeSelector (connections) {
|
||||
t.is(connections.length, 1)
|
||||
t.true(connections[0] instanceof Connection)
|
||||
return undefined
|
||||
}
|
||||
})
|
||||
|
||||
transport.request({
|
||||
|
||||
Reference in New Issue
Block a user