committed by
delvedor
parent
a9e68110b5
commit
6ff1db20c4
@ -49,8 +49,7 @@ test('Should execute the recurrect API with the ping strategy', t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
q.add((q, done) => {
|
q.add((q, done) => {
|
||||||
cluster.kill('node0')
|
cluster.kill('node0', done)
|
||||||
setTimeout(done, 100)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
q.add((q, done) => {
|
q.add((q, done) => {
|
||||||
@ -173,8 +172,7 @@ test('Should execute the recurrect API with the optimistic strategy', t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
q.add((q, done) => {
|
q.add((q, done) => {
|
||||||
cluster.kill('node0')
|
cluster.kill('node0', done)
|
||||||
setTimeout(done, 100)
|
|
||||||
})
|
})
|
||||||
|
|
||||||
q.add((q, done) => {
|
q.add((q, done) => {
|
||||||
|
|||||||
@ -6,6 +6,8 @@
|
|||||||
|
|
||||||
const { test } = require('tap')
|
const { test } = require('tap')
|
||||||
const { URL } = require('url')
|
const { URL } = require('url')
|
||||||
|
const lolex = require('lolex')
|
||||||
|
const workq = require('workq')
|
||||||
const { buildCluster } = require('../utils')
|
const { buildCluster } = require('../utils')
|
||||||
const { Client, Connection, Transport, events, errors } = require('../../index')
|
const { Client, Connection, Transport, events, errors } = require('../../index')
|
||||||
|
|
||||||
@ -111,8 +113,10 @@ test('Should handle hostnames in publish_address', t => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
test('Sniff interval', { skip: 'Flaky on CI' }, t => {
|
test('Sniff interval', t => {
|
||||||
t.plan(10)
|
t.plan(11)
|
||||||
|
const clock = lolex.install({ toFake: ['Date'] })
|
||||||
|
const q = workq()
|
||||||
|
|
||||||
buildCluster(({ nodes, shutdown, kill }) => {
|
buildCluster(({ nodes, shutdown, kill }) => {
|
||||||
const client = new Client({
|
const client = new Client({
|
||||||
@ -132,19 +136,47 @@ test('Sniff interval', { skip: 'Flaky on CI' }, t => {
|
|||||||
})
|
})
|
||||||
|
|
||||||
t.strictEqual(client.connectionPool.size, 1)
|
t.strictEqual(client.connectionPool.size, 1)
|
||||||
setTimeout(() => client.info(t.error), 60)
|
|
||||||
|
|
||||||
setTimeout(() => {
|
q.add((q, done) => {
|
||||||
// let's kill a node
|
clock.tick(51)
|
||||||
kill('node1')
|
client.info(err => {
|
||||||
client.info(t.error)
|
t.error(err)
|
||||||
}, 150)
|
waitSniffEnd(() => {
|
||||||
|
t.strictEqual(client.connectionPool.size, 4)
|
||||||
|
done()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
setTimeout(() => {
|
q.add((q, done) => {
|
||||||
t.strictEqual(client.connectionPool.size, 3)
|
kill('node1', done)
|
||||||
}, 200)
|
})
|
||||||
|
|
||||||
|
q.add((q, done) => {
|
||||||
|
clock.tick(51)
|
||||||
|
client.info(err => {
|
||||||
|
t.error(err)
|
||||||
|
waitSniffEnd(() => {
|
||||||
|
t.strictEqual(client.connectionPool.size, 3)
|
||||||
|
done()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
t.teardown(shutdown)
|
t.teardown(shutdown)
|
||||||
|
|
||||||
|
// it can happen that the sniff operation resolves
|
||||||
|
// after the API call that trioggered it, so to
|
||||||
|
// be sure that we are checking the connectionPool size
|
||||||
|
// at the right moment, we verify that the transport
|
||||||
|
// is no longer sniffing
|
||||||
|
function waitSniffEnd (callback) {
|
||||||
|
if (client.transport._isSniffing) {
|
||||||
|
setTimeout(waitSniffEnd, 500, callback)
|
||||||
|
} else {
|
||||||
|
callback()
|
||||||
|
}
|
||||||
|
}
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|||||||
@ -517,9 +517,8 @@ test('Retry mechanism', t => {
|
|||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
res.end(JSON.stringify({ hello: 'world' }))
|
||||||
} else {
|
} else {
|
||||||
setTimeout(() => {
|
res.statusCode = 504
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
res.end(JSON.stringify({ error: true }))
|
||||||
}, 1000)
|
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
@ -542,7 +541,6 @@ test('Retry mechanism', t => {
|
|||||||
connectionPool: pool,
|
connectionPool: pool,
|
||||||
serializer: new Serializer(),
|
serializer: new Serializer(),
|
||||||
maxRetries: 1,
|
maxRetries: 1,
|
||||||
requestTimeout: 250,
|
|
||||||
sniffInterval: false,
|
sniffInterval: false,
|
||||||
sniffOnStart: false
|
sniffOnStart: false
|
||||||
})
|
})
|
||||||
@ -558,6 +556,51 @@ test('Retry mechanism', t => {
|
|||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
test('Should not retry if the body is a stream', t => {
|
||||||
|
t.plan(2)
|
||||||
|
|
||||||
|
var count = 0
|
||||||
|
function handler (req, res) {
|
||||||
|
count++
|
||||||
|
res.setHeader('Content-Type', 'application/json;utf=8')
|
||||||
|
res.statusCode = 504
|
||||||
|
res.end(JSON.stringify({ error: true }))
|
||||||
|
}
|
||||||
|
|
||||||
|
buildServer(handler, ({ port }, server) => {
|
||||||
|
const pool = new ConnectionPool({ Connection })
|
||||||
|
pool.addConnection([{
|
||||||
|
url: new URL(`http://localhost:${port}`),
|
||||||
|
id: 'node1'
|
||||||
|
}, {
|
||||||
|
url: new URL(`http://localhost:${port}`),
|
||||||
|
id: 'node2'
|
||||||
|
}, {
|
||||||
|
url: new URL(`http://localhost:${port}`),
|
||||||
|
id: 'node3'
|
||||||
|
}])
|
||||||
|
|
||||||
|
const transport = new Transport({
|
||||||
|
emit: () => {},
|
||||||
|
connectionPool: pool,
|
||||||
|
serializer: new Serializer(),
|
||||||
|
maxRetries: 1,
|
||||||
|
sniffInterval: false,
|
||||||
|
sniffOnStart: false
|
||||||
|
})
|
||||||
|
|
||||||
|
transport.request({
|
||||||
|
method: 'POST',
|
||||||
|
path: '/hello',
|
||||||
|
body: intoStream(JSON.stringify({ hello: 'world' }))
|
||||||
|
}, (err, { body }) => {
|
||||||
|
t.ok(err instanceof ResponseError)
|
||||||
|
t.strictEqual(count, 1)
|
||||||
|
server.stop()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
test('Custom retry mechanism', t => {
|
test('Custom retry mechanism', t => {
|
||||||
t.plan(2)
|
t.plan(2)
|
||||||
|
|
||||||
@ -567,9 +610,8 @@ test('Custom retry mechanism', t => {
|
|||||||
if (count > 0) {
|
if (count > 0) {
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
res.end(JSON.stringify({ hello: 'world' }))
|
||||||
} else {
|
} else {
|
||||||
setTimeout(() => {
|
res.statusCode = 504
|
||||||
res.end(JSON.stringify({ hello: 'world' }))
|
res.end(JSON.stringify({ error: true }))
|
||||||
}, 1000)
|
|
||||||
}
|
}
|
||||||
count++
|
count++
|
||||||
}
|
}
|
||||||
@ -592,7 +634,6 @@ test('Custom retry mechanism', t => {
|
|||||||
connectionPool: pool,
|
connectionPool: pool,
|
||||||
serializer: new Serializer(),
|
serializer: new Serializer(),
|
||||||
maxRetries: 0,
|
maxRetries: 0,
|
||||||
requestTimeout: 250,
|
|
||||||
sniffInterval: false,
|
sniffInterval: false,
|
||||||
sniffOnStart: false
|
sniffOnStart: false
|
||||||
})
|
})
|
||||||
|
|||||||
@ -56,14 +56,17 @@ function buildCluster (options, callback) {
|
|||||||
|
|
||||||
function shutdown () {
|
function shutdown () {
|
||||||
debug(`Shutting down cluster '${clusterId}'`)
|
debug(`Shutting down cluster '${clusterId}'`)
|
||||||
Object.keys(nodes).forEach(kill)
|
for (const id in nodes) {
|
||||||
|
kill(id)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function kill (id) {
|
function kill (id, callback) {
|
||||||
debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
|
debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
|
||||||
nodes[id].server.stop()
|
const node = nodes[id]
|
||||||
delete nodes[id]
|
delete nodes[id]
|
||||||
delete sniffResult.nodes[id]
|
delete sniffResult.nodes[id]
|
||||||
|
node.server.stop(callback)
|
||||||
}
|
}
|
||||||
|
|
||||||
function spawn (id, callback) {
|
function spawn (id, callback) {
|
||||||
|
|||||||
Reference in New Issue
Block a user