Compare commits
6 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 5d9bfd6492 | |||
| d43884cc5c | |||
| f53b82f7d5 | |||
| be2a7ab544 | |||
| 6ff1db20c4 | |||
| a9e68110b5 |
@ -2,7 +2,8 @@
|
||||
|
||||
source /usr/local/bin/bash_standard_lib.sh
|
||||
|
||||
DOCKER_IMAGES="node:12-alpine
|
||||
DOCKER_IMAGES="node:14-alpine
|
||||
node:12-alpine
|
||||
node:10-alpine
|
||||
node:8-alpine
|
||||
"
|
||||
|
||||
@ -3,6 +3,7 @@ STACK_VERSION:
|
||||
- 6.8.4
|
||||
|
||||
NODE_JS_VERSION:
|
||||
- 14
|
||||
- 12
|
||||
- 10
|
||||
- 8
|
||||
|
||||
4
.github/workflows/nodejs.yml
vendored
4
.github/workflows/nodejs.yml
vendored
@ -1,6 +1,6 @@
|
||||
name: Node CI
|
||||
|
||||
on: [push]
|
||||
on: [push, pull_request]
|
||||
|
||||
jobs:
|
||||
test:
|
||||
@ -9,7 +9,7 @@ jobs:
|
||||
|
||||
strategy:
|
||||
matrix:
|
||||
node-version: [10.x, 12.x, 13.x]
|
||||
node-version: [10.x, 12.x, 14.x]
|
||||
os: [ubuntu-latest, windows-latest, macOS-latest]
|
||||
|
||||
steps:
|
||||
|
||||
@ -66,7 +66,17 @@ class ConnectionPool extends BaseConnectionPool {
|
||||
const { id } = connection
|
||||
debug(`Marking as 'dead' connection '${id}'`)
|
||||
if (this.dead.indexOf(id) === -1) {
|
||||
this.dead.push(id)
|
||||
// It might happen that `markDead` is called jsut after
|
||||
// a pool update, and in such case we will add to the dead
|
||||
// list a node that no longer exist. The following check verify
|
||||
// that the connection is still part of the pool before
|
||||
// marking it as dead.
|
||||
for (var i = 0; i < this.size; i++) {
|
||||
if (this.connections[i].id === id) {
|
||||
this.dead.push(id)
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
connection.status = Connection.statuses.DEAD
|
||||
connection.deadCount++
|
||||
|
||||
@ -4,7 +4,7 @@
|
||||
"main": "index.js",
|
||||
"types": "index.d.ts",
|
||||
"homepage": "http://www.elastic.co/guide/en/elasticsearch/client/javascript-api/current/index.html",
|
||||
"version": "6.8.6",
|
||||
"version": "6.8.7",
|
||||
"keywords": [
|
||||
"elasticsearch",
|
||||
"elastic",
|
||||
|
||||
@ -49,8 +49,7 @@ test('Should execute the recurrect API with the ping strategy', t => {
|
||||
})
|
||||
|
||||
q.add((q, done) => {
|
||||
cluster.kill('node0')
|
||||
setTimeout(done, 100)
|
||||
cluster.kill('node0', done)
|
||||
})
|
||||
|
||||
q.add((q, done) => {
|
||||
@ -173,8 +172,7 @@ test('Should execute the recurrect API with the optimistic strategy', t => {
|
||||
})
|
||||
|
||||
q.add((q, done) => {
|
||||
cluster.kill('node0')
|
||||
setTimeout(done, 100)
|
||||
cluster.kill('node0', done)
|
||||
})
|
||||
|
||||
q.add((q, done) => {
|
||||
|
||||
@ -6,6 +6,8 @@
|
||||
|
||||
const { test } = require('tap')
|
||||
const { URL } = require('url')
|
||||
const lolex = require('lolex')
|
||||
const workq = require('workq')
|
||||
const { buildCluster } = require('../utils')
|
||||
const { Client, Connection, Transport, events, errors } = require('../../index')
|
||||
|
||||
@ -111,8 +113,10 @@ test('Should handle hostnames in publish_address', t => {
|
||||
})
|
||||
})
|
||||
|
||||
test('Sniff interval', { skip: 'Flaky on CI' }, t => {
|
||||
t.plan(10)
|
||||
test('Sniff interval', t => {
|
||||
t.plan(11)
|
||||
const clock = lolex.install({ toFake: ['Date'] })
|
||||
const q = workq()
|
||||
|
||||
buildCluster(({ nodes, shutdown, kill }) => {
|
||||
const client = new Client({
|
||||
@ -132,19 +136,47 @@ test('Sniff interval', { skip: 'Flaky on CI' }, t => {
|
||||
})
|
||||
|
||||
t.strictEqual(client.connectionPool.size, 1)
|
||||
setTimeout(() => client.info(t.error), 60)
|
||||
|
||||
setTimeout(() => {
|
||||
// let's kill a node
|
||||
kill('node1')
|
||||
client.info(t.error)
|
||||
}, 150)
|
||||
q.add((q, done) => {
|
||||
clock.tick(51)
|
||||
client.info(err => {
|
||||
t.error(err)
|
||||
waitSniffEnd(() => {
|
||||
t.strictEqual(client.connectionPool.size, 4)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
setTimeout(() => {
|
||||
t.strictEqual(client.connectionPool.size, 3)
|
||||
}, 200)
|
||||
q.add((q, done) => {
|
||||
kill('node1', done)
|
||||
})
|
||||
|
||||
q.add((q, done) => {
|
||||
clock.tick(51)
|
||||
client.info(err => {
|
||||
t.error(err)
|
||||
waitSniffEnd(() => {
|
||||
t.strictEqual(client.connectionPool.size, 3)
|
||||
done()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.teardown(shutdown)
|
||||
|
||||
// it can happen that the sniff operation resolves
|
||||
// after the API call that trioggered it, so to
|
||||
// be sure that we are checking the connectionPool size
|
||||
// at the right moment, we verify that the transport
|
||||
// is no longer sniffing
|
||||
function waitSniffEnd (callback) {
|
||||
if (client.transport._isSniffing) {
|
||||
setTimeout(waitSniffEnd, 500, callback)
|
||||
} else {
|
||||
callback()
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@ -74,6 +74,14 @@ test('API', t => {
|
||||
}, 10)
|
||||
})
|
||||
|
||||
t.test('markDead should ignore connections that no longer exists', t => {
|
||||
const pool = new ConnectionPool({ Connection, sniffEnabled: true })
|
||||
pool.addConnection('http://localhost:9200/')
|
||||
pool.markDead({ id: 'foo-bar' })
|
||||
t.deepEqual(pool.dead, [])
|
||||
t.end()
|
||||
})
|
||||
|
||||
t.test('markAlive', t => {
|
||||
const pool = new ConnectionPool({ Connection, sniffEnabled: true })
|
||||
const href = 'http://localhost:9200/'
|
||||
|
||||
@ -517,9 +517,8 @@ test('Retry mechanism', t => {
|
||||
if (count > 0) {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}, 1000)
|
||||
res.statusCode = 504
|
||||
res.end(JSON.stringify({ error: true }))
|
||||
}
|
||||
count++
|
||||
}
|
||||
@ -542,7 +541,6 @@ test('Retry mechanism', t => {
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 1,
|
||||
requestTimeout: 250,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
@ -558,6 +556,51 @@ test('Retry mechanism', t => {
|
||||
})
|
||||
})
|
||||
|
||||
test('Should not retry if the body is a stream', { skip: 'https://github.com/elastic/elasticsearch-js/pull/1143 has not been backported' }, t => {
|
||||
t.plan(2)
|
||||
|
||||
var count = 0
|
||||
function handler (req, res) {
|
||||
count++
|
||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
||||
res.statusCode = 504
|
||||
res.end(JSON.stringify({ error: true }))
|
||||
}
|
||||
|
||||
buildServer(handler, ({ port }, server) => {
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection([{
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node1'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node2'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node3'
|
||||
}])
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 1,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
transport.request({
|
||||
method: 'POST',
|
||||
path: '/hello',
|
||||
body: intoStream(JSON.stringify({ hello: 'world' }))
|
||||
}, (err, { body }) => {
|
||||
t.ok(err instanceof ResponseError)
|
||||
t.strictEqual(count, 1)
|
||||
server.stop()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Custom retry mechanism', t => {
|
||||
t.plan(2)
|
||||
|
||||
@ -567,9 +610,8 @@ test('Custom retry mechanism', t => {
|
||||
if (count > 0) {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}, 1000)
|
||||
res.statusCode = 504
|
||||
res.end(JSON.stringify({ error: true }))
|
||||
}
|
||||
count++
|
||||
}
|
||||
@ -592,7 +634,6 @@ test('Custom retry mechanism', t => {
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 0,
|
||||
requestTimeout: 250,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
@ -56,14 +56,17 @@ function buildCluster (options, callback) {
|
||||
|
||||
function shutdown () {
|
||||
debug(`Shutting down cluster '${clusterId}'`)
|
||||
Object.keys(nodes).forEach(kill)
|
||||
for (const id in nodes) {
|
||||
kill(id)
|
||||
}
|
||||
}
|
||||
|
||||
function kill (id) {
|
||||
function kill (id, callback) {
|
||||
debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
|
||||
nodes[id].server.stop()
|
||||
const node = nodes[id]
|
||||
delete nodes[id]
|
||||
delete sniffResult.nodes[id]
|
||||
node.server.stop(callback)
|
||||
}
|
||||
|
||||
function spawn (id, callback) {
|
||||
|
||||
Reference in New Issue
Block a user