Updated test
This commit is contained in:
@ -6,21 +6,26 @@ const { TimeoutError } = require('../../lib/errors')
|
||||
const { connection: { MockConnection, MockConnectionTimeout } } = require('../utils')
|
||||
|
||||
test('Should emit a request event when a request is performed', t => {
|
||||
t.plan(3)
|
||||
t.plan(2)
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
client.on(events.REQUEST, (connection, request) => {
|
||||
t.match(connection, {
|
||||
id: 'http://localhost:9200'
|
||||
})
|
||||
t.match(request, {
|
||||
method: 'GET',
|
||||
path: '/test/doc/_search',
|
||||
querystring: 'q=foo%3Abar'
|
||||
client.on(events.REQUEST, meta => {
|
||||
t.match(meta, {
|
||||
connection: {
|
||||
id: 'http://localhost:9200'
|
||||
},
|
||||
request: {
|
||||
method: 'GET',
|
||||
path: '/test/doc/_search',
|
||||
querystring: 'q=foo%3Abar'
|
||||
},
|
||||
response: null,
|
||||
attempts: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
@ -34,30 +39,34 @@ test('Should emit a request event when a request is performed', t => {
|
||||
})
|
||||
|
||||
test('Should emit a response event in case of a successful response', t => {
|
||||
t.plan(4)
|
||||
t.plan(2)
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
client.on(events.RESPONSE, (connection, request, response) => {
|
||||
t.match(connection, {
|
||||
id: 'http://localhost:9200'
|
||||
})
|
||||
t.match(request, {
|
||||
method: 'GET',
|
||||
path: '/test/doc/_search',
|
||||
querystring: 'q=foo%3Abar'
|
||||
})
|
||||
t.match(response, {
|
||||
body: { hello: 'world' },
|
||||
statusCode: 200,
|
||||
headers: {
|
||||
'content-type': 'application/json;utf=8',
|
||||
'connection': 'keep-alive'
|
||||
client.on(events.RESPONSE, meta => {
|
||||
t.match(meta, {
|
||||
connection: {
|
||||
id: 'http://localhost:9200'
|
||||
},
|
||||
warnings: null
|
||||
request: {
|
||||
method: 'GET',
|
||||
path: '/test/doc/_search',
|
||||
querystring: 'q=foo%3Abar'
|
||||
},
|
||||
response: {
|
||||
body: { hello: 'world' },
|
||||
statusCode: 200,
|
||||
headers: {
|
||||
'content-type': 'application/json;utf=8',
|
||||
'connection': 'keep-alive'
|
||||
},
|
||||
warnings: null
|
||||
},
|
||||
attempts: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
@ -71,7 +80,7 @@ test('Should emit a response event in case of a successful response', t => {
|
||||
})
|
||||
|
||||
test('Should emit an error event in case of a failing response', t => {
|
||||
t.plan(4)
|
||||
t.plan(3)
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
@ -79,19 +88,24 @@ test('Should emit an error event in case of a failing response', t => {
|
||||
maxRetries: 0
|
||||
})
|
||||
|
||||
client.on(events.RESPONSE, (connection, request, response) => {
|
||||
client.on(events.RESPONSE, ({ connection, request, response }) => {
|
||||
t.fail('This should not be called')
|
||||
})
|
||||
|
||||
client.on(events.ERROR, (error, connection, request) => {
|
||||
client.on(events.ERROR, (error, meta) => {
|
||||
t.ok(error instanceof TimeoutError)
|
||||
t.match(connection, {
|
||||
id: 'http://localhost:9200'
|
||||
})
|
||||
t.match(request, {
|
||||
method: 'GET',
|
||||
path: '/test/doc/_search',
|
||||
querystring: 'q=foo%3Abar'
|
||||
t.match(meta, {
|
||||
connection: {
|
||||
id: 'http://localhost:9200'
|
||||
},
|
||||
request: {
|
||||
method: 'GET',
|
||||
path: '/test/doc/_search',
|
||||
querystring: 'q=foo%3Abar'
|
||||
},
|
||||
response: null,
|
||||
attempts: 0,
|
||||
aborted: false
|
||||
})
|
||||
})
|
||||
|
||||
|
||||
@ -74,6 +74,20 @@ test('qserialize (array)', t => {
|
||||
)
|
||||
})
|
||||
|
||||
test('qserialize (string)', t => {
|
||||
t.plan(1)
|
||||
const s = new Serializer()
|
||||
const obj = {
|
||||
hello: 'world',
|
||||
you_know: 'for search'
|
||||
}
|
||||
|
||||
t.strictEqual(
|
||||
s.qserialize(stringify(obj)),
|
||||
stringify(obj)
|
||||
)
|
||||
})
|
||||
|
||||
test('SerializationError', t => {
|
||||
t.plan(1)
|
||||
const s = new Serializer()
|
||||
|
||||
137
test/unit/sniff.test.js
Normal file
137
test/unit/sniff.test.js
Normal file
@ -0,0 +1,137 @@
|
||||
'use strict'
|
||||
|
||||
const { test } = require('tap')
|
||||
const { URL } = require('url')
|
||||
const { buildCluster } = require('../utils')
|
||||
const { Client, Connection, events, errors } = require('../../index')
|
||||
|
||||
test('Should update the connection pool', t => {
|
||||
t.plan(8)
|
||||
|
||||
buildCluster(({ nodes, shutdown }) => {
|
||||
const client = new Client({
|
||||
node: nodes[Object.keys(nodes)[0]].url
|
||||
})
|
||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
||||
|
||||
// run the sniffer
|
||||
client.transport.sniff((err, hosts) => {
|
||||
t.error(err)
|
||||
t.strictEqual(hosts.length, 4)
|
||||
|
||||
const ids = Object.keys(nodes)
|
||||
for (var i = 0; i < hosts.length; i++) {
|
||||
const id = ids[i]
|
||||
t.deepEqual(hosts[i], {
|
||||
url: new URL(nodes[id].url),
|
||||
id: id,
|
||||
roles: {
|
||||
master: true,
|
||||
data: true,
|
||||
ingest: true
|
||||
},
|
||||
ssl: null,
|
||||
agent: null
|
||||
})
|
||||
}
|
||||
|
||||
t.strictEqual(client.connectionPool.connections.size, 4)
|
||||
shutdown()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Sniff interval', t => {
|
||||
t.plan(8)
|
||||
|
||||
buildCluster(({ nodes, shutdown, kill }) => {
|
||||
const client = new Client({
|
||||
node: nodes[Object.keys(nodes)[0]].url,
|
||||
sniffInterval: 50
|
||||
})
|
||||
|
||||
// this event will be triggered by api calls
|
||||
client.on(events.SNIFF, (err, hosts) => {
|
||||
t.error(err)
|
||||
t.strictEqual(
|
||||
client.connectionPool.connections.size,
|
||||
hosts.length
|
||||
)
|
||||
})
|
||||
|
||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
||||
setTimeout(() => client.info(t.error), 60)
|
||||
|
||||
setTimeout(() => {
|
||||
// let's kill a node
|
||||
kill('node1')
|
||||
client.info(t.error)
|
||||
}, 150)
|
||||
|
||||
setTimeout(() => {
|
||||
t.strictEqual(client.connectionPool.connections.size, 3)
|
||||
shutdown()
|
||||
}, 200)
|
||||
})
|
||||
})
|
||||
|
||||
test('Should not close living connections', t => {
|
||||
t.plan(3)
|
||||
|
||||
buildCluster(({ nodes, shutdown, kill }) => {
|
||||
class MyConnection extends Connection {
|
||||
close () {
|
||||
t.fail('Should not be called')
|
||||
}
|
||||
}
|
||||
|
||||
const client = new Client({
|
||||
node: {
|
||||
url: new URL(nodes[Object.keys(nodes)[0]].url),
|
||||
id: 'node1'
|
||||
},
|
||||
Connection: MyConnection
|
||||
})
|
||||
|
||||
t.strictEqual(client.connectionPool.connections.size, 1)
|
||||
client.transport.sniff((err, hosts) => {
|
||||
t.error(err)
|
||||
t.strictEqual(
|
||||
client.connectionPool.connections.size,
|
||||
hosts.length
|
||||
)
|
||||
shutdown()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Sniff on connection fault', t => {
|
||||
t.plan(4)
|
||||
|
||||
buildCluster(({ nodes, shutdown }) => {
|
||||
const client = new Client({
|
||||
nodes: [
|
||||
// TODO: this url may cause a flaky test
|
||||
'http://localhost:9200',
|
||||
nodes[Object.keys(nodes)[0]].url
|
||||
],
|
||||
maxRetries: 0,
|
||||
sniffOnConnectionFault: true
|
||||
})
|
||||
t.strictEqual(client.connectionPool.connections.size, 2)
|
||||
|
||||
// this event will be triggered by the connection fault
|
||||
client.on(events.SNIFF, (err, hosts) => {
|
||||
t.error(err)
|
||||
t.strictEqual(
|
||||
client.connectionPool.connections.size,
|
||||
hosts.length
|
||||
)
|
||||
shutdown()
|
||||
})
|
||||
|
||||
client.info((err, result) => {
|
||||
t.ok(err instanceof errors.ConnectionError)
|
||||
})
|
||||
})
|
||||
})
|
||||
@ -468,6 +468,57 @@ test('Retry mechanism', t => {
|
||||
})
|
||||
})
|
||||
|
||||
test('Custom retry mechanism', t => {
|
||||
t.plan(2)
|
||||
|
||||
var count = 0
|
||||
function handler (req, res) {
|
||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
||||
if (count > 0) {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}, 1000)
|
||||
}
|
||||
count++
|
||||
}
|
||||
|
||||
buildServer(handler, ({ port }, server) => {
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection([{
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node1'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node2'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node3'
|
||||
}])
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 0,
|
||||
requestTimeout: 250,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello',
|
||||
maxRetries: 1
|
||||
}, (err, { body }) => {
|
||||
t.error(err)
|
||||
t.deepEqual(body, { hello: 'world' })
|
||||
server.stop()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Should call markAlive with a successful response', t => {
|
||||
t.plan(3)
|
||||
|
||||
@ -567,6 +618,55 @@ test('Should return a request aborter utility', t => {
|
||||
t.pass('ok')
|
||||
})
|
||||
|
||||
test('Retry mechanism and abort', t => {
|
||||
t.plan(1)
|
||||
|
||||
function handler (req, res) {
|
||||
setTimeout(() => {
|
||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}, 1000)
|
||||
}
|
||||
|
||||
buildServer(handler, ({ port }, server) => {
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection([{
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node1'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node2'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node3'
|
||||
}])
|
||||
|
||||
var count = 0
|
||||
const transport = new Transport({
|
||||
emit: event => {
|
||||
if (event === 'request' && count++ > 0) {
|
||||
request.abort()
|
||||
server.stop()
|
||||
t.pass('ok')
|
||||
}
|
||||
},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 2,
|
||||
requestTimeout: 100,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
const request = transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
}, (e, { body }) => {
|
||||
t.fail('Should not be called')
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('ResponseError', t => {
|
||||
t.plan(3)
|
||||
|
||||
|
||||
79
test/utils/buildCluster.js
Normal file
79
test/utils/buildCluster.js
Normal file
@ -0,0 +1,79 @@
|
||||
'use strict'
|
||||
|
||||
const workq = require('workq')
|
||||
const buildServer = require('./buildServer')
|
||||
|
||||
function buildCluster (opts, callback) {
|
||||
if (typeof opts === 'function') {
|
||||
callback = opts
|
||||
opts = {}
|
||||
}
|
||||
|
||||
const q = workq()
|
||||
const nodes = {}
|
||||
const sniffResult = { nodes: {} }
|
||||
|
||||
opts.numberOfNodes = opts.numberOfNodes || 4
|
||||
for (var i = 0; i < opts.numberOfNodes; i++) {
|
||||
q.add(bootNode, { id: `node${i}` })
|
||||
}
|
||||
|
||||
function bootNode (q, opts, done) {
|
||||
function handler (req, res) {
|
||||
res.setHeader('content-type', 'application/json')
|
||||
if (req.url === '/_nodes/_all/http') {
|
||||
res.end(JSON.stringify(sniffResult))
|
||||
} else {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}
|
||||
}
|
||||
|
||||
buildServer(handler, ({ port }, server) => {
|
||||
nodes[opts.id] = {
|
||||
url: `http://localhost:${port}`,
|
||||
server
|
||||
}
|
||||
sniffResult.nodes[opts.id] = {
|
||||
http: {
|
||||
publish_address: `http://localhost:${port}`
|
||||
},
|
||||
roles: ['master', 'data', 'ingest']
|
||||
}
|
||||
done()
|
||||
})
|
||||
}
|
||||
|
||||
function shutdown () {
|
||||
Object.keys(nodes).forEach(id => {
|
||||
nodes[id].server.stop()
|
||||
})
|
||||
}
|
||||
|
||||
function kill (id) {
|
||||
nodes[id].server.stop()
|
||||
delete nodes[id]
|
||||
delete sniffResult.nodes[id]
|
||||
}
|
||||
|
||||
function spawn (id, callback) {
|
||||
q.add(bootNode, { id })
|
||||
q.add((q, done) => {
|
||||
callback()
|
||||
done()
|
||||
})
|
||||
}
|
||||
|
||||
const cluster = {
|
||||
nodes,
|
||||
shutdown,
|
||||
kill,
|
||||
spawn
|
||||
}
|
||||
|
||||
q.drain(done => {
|
||||
callback(cluster)
|
||||
done()
|
||||
})
|
||||
}
|
||||
|
||||
module.exports = buildCluster
|
||||
@ -1,9 +1,11 @@
|
||||
'use strict'
|
||||
|
||||
const buildServer = require('./buildServer')
|
||||
const buildCluster = require('./buildCluster')
|
||||
const connection = require('./MockConnection')
|
||||
|
||||
module.exports = {
|
||||
buildServer,
|
||||
buildCluster,
|
||||
connection
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user