Support for Elasticsearch 7.7 (#1192)
This commit is contained in:
committed by
GitHub
parent
be6257380e
commit
51169d5efa
@ -21,7 +21,8 @@ const {
|
||||
TimeoutError,
|
||||
ResponseError,
|
||||
ConnectionError,
|
||||
ConfigurationError
|
||||
ConfigurationError,
|
||||
RequestAbortedError
|
||||
} = require('../../lib/errors')
|
||||
|
||||
const ConnectionPool = require('../../lib/pool/ConnectionPool')
|
||||
@ -88,6 +89,32 @@ test('Basic (promises support)', t => {
|
||||
.catch(t.fail)
|
||||
})
|
||||
|
||||
test('Basic - failing (promises support)', t => {
|
||||
t.plan(1)
|
||||
|
||||
const pool = new ConnectionPool({ Connection: MockConnectionTimeout })
|
||||
pool.addConnection('http://localhost:9200')
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
transport
|
||||
.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
})
|
||||
.catch(err => {
|
||||
t.ok(err instanceof TimeoutError)
|
||||
})
|
||||
})
|
||||
|
||||
test('Basic (options + promises support)', t => {
|
||||
t.plan(1)
|
||||
|
||||
@ -335,9 +362,10 @@ test('Not JSON payload from server', t => {
|
||||
})
|
||||
})
|
||||
|
||||
test('NoLivingConnectionsError', t => {
|
||||
t.plan(1)
|
||||
test('NoLivingConnectionsError (null connection)', t => {
|
||||
t.plan(3)
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection('http://localhost:9200')
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
@ -346,7 +374,40 @@ test('NoLivingConnectionsError', t => {
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
sniffOnStart: false,
|
||||
nodeSelector (connections) {
|
||||
t.is(connections.length, 1)
|
||||
t.true(connections[0] instanceof Connection)
|
||||
return null
|
||||
}
|
||||
})
|
||||
|
||||
transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
}, (err, { body }) => {
|
||||
t.ok(err instanceof NoLivingConnectionsError)
|
||||
})
|
||||
})
|
||||
|
||||
test('NoLivingConnectionsError (undefined connection)', t => {
|
||||
t.plan(3)
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection('http://localhost:9200')
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false,
|
||||
nodeSelector (connections) {
|
||||
t.is(connections.length, 1)
|
||||
t.true(connections[0] instanceof Connection)
|
||||
return undefined
|
||||
}
|
||||
})
|
||||
|
||||
transport.request({
|
||||
@ -517,9 +578,8 @@ test('Retry mechanism', t => {
|
||||
if (count > 0) {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}, 1000)
|
||||
res.statusCode = 504
|
||||
res.end(JSON.stringify({ error: true }))
|
||||
}
|
||||
count++
|
||||
}
|
||||
@ -542,7 +602,6 @@ test('Retry mechanism', t => {
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 1,
|
||||
requestTimeout: 250,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
@ -558,6 +617,51 @@ test('Retry mechanism', t => {
|
||||
})
|
||||
})
|
||||
|
||||
test('Should not retry if the body is a stream', t => {
|
||||
t.plan(2)
|
||||
|
||||
var count = 0
|
||||
function handler (req, res) {
|
||||
count++
|
||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
||||
res.statusCode = 504
|
||||
res.end(JSON.stringify({ error: true }))
|
||||
}
|
||||
|
||||
buildServer(handler, ({ port }, server) => {
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection([{
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node1'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node2'
|
||||
}, {
|
||||
url: new URL(`http://localhost:${port}`),
|
||||
id: 'node3'
|
||||
}])
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 1,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
transport.request({
|
||||
method: 'POST',
|
||||
path: '/hello',
|
||||
body: intoStream(JSON.stringify({ hello: 'world' }))
|
||||
}, (err, { body }) => {
|
||||
t.ok(err instanceof ResponseError)
|
||||
t.strictEqual(count, 1)
|
||||
server.stop()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Custom retry mechanism', t => {
|
||||
t.plan(2)
|
||||
|
||||
@ -567,9 +671,8 @@ test('Custom retry mechanism', t => {
|
||||
if (count > 0) {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
res.end(JSON.stringify({ hello: 'world' }))
|
||||
}, 1000)
|
||||
res.statusCode = 504
|
||||
res.end(JSON.stringify({ error: true }))
|
||||
}
|
||||
count++
|
||||
}
|
||||
@ -592,7 +695,6 @@ test('Custom retry mechanism', t => {
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 0,
|
||||
requestTimeout: 250,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
@ -730,7 +832,7 @@ test('Should call resurrect on every request', t => {
|
||||
test('Should return a request aborter utility', t => {
|
||||
t.plan(1)
|
||||
|
||||
const pool = new ConnectionPool({ Connection, MockConnection })
|
||||
const pool = new ConnectionPool({ Connection: MockConnection })
|
||||
pool.addConnection({
|
||||
url: new URL('http://localhost:9200'),
|
||||
id: 'node1'
|
||||
@ -749,12 +851,11 @@ test('Should return a request aborter utility', t => {
|
||||
const request = transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
}, (_err, body) => {
|
||||
t.fail('Should not be called')
|
||||
}, (err, result) => {
|
||||
t.ok(err instanceof RequestAbortedError)
|
||||
})
|
||||
|
||||
request.abort()
|
||||
t.pass('ok')
|
||||
})
|
||||
|
||||
test('Retry mechanism and abort', t => {
|
||||
@ -785,8 +886,6 @@ test('Retry mechanism and abort', t => {
|
||||
emit: event => {
|
||||
if (event === 'request' && count++ > 0) {
|
||||
request.abort()
|
||||
server.stop()
|
||||
t.pass('ok')
|
||||
}
|
||||
},
|
||||
connectionPool: pool,
|
||||
@ -800,12 +899,48 @@ test('Retry mechanism and abort', t => {
|
||||
const request = transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
}, (e, { body }) => {
|
||||
t.fail('Should not be called')
|
||||
}, (err, result) => {
|
||||
t.ok(err instanceof RequestAbortedError)
|
||||
server.stop()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
test('Abort a request with the promise API', t => {
|
||||
t.plan(1)
|
||||
|
||||
const pool = new ConnectionPool({ Connection: MockConnection })
|
||||
pool.addConnection({
|
||||
url: new URL('http://localhost:9200'),
|
||||
id: 'node1'
|
||||
})
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 3,
|
||||
requestTimeout: 30000,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
const request = transport.request({
|
||||
method: 'GET',
|
||||
path: '/hello'
|
||||
})
|
||||
|
||||
request
|
||||
.then(() => {
|
||||
t.fail('Should not be called')
|
||||
})
|
||||
.catch(err => {
|
||||
t.ok(err instanceof RequestAbortedError)
|
||||
})
|
||||
|
||||
request.abort()
|
||||
})
|
||||
|
||||
test('ResponseError', t => {
|
||||
t.plan(3)
|
||||
|
||||
@ -1862,6 +1997,62 @@ test('Compress request', t => {
|
||||
})
|
||||
})
|
||||
|
||||
t.test('Retry a gzipped body', t => {
|
||||
t.plan(7)
|
||||
|
||||
var count = 0
|
||||
function handler (req, res) {
|
||||
t.match(req.headers, {
|
||||
'content-type': 'application/json',
|
||||
'content-encoding': 'gzip'
|
||||
})
|
||||
var json = ''
|
||||
req
|
||||
.pipe(createGunzip())
|
||||
.on('data', chunk => { json += chunk })
|
||||
.on('error', err => t.fail(err))
|
||||
.on('end', () => {
|
||||
t.deepEqual(JSON.parse(json), { you_know: 'for search' })
|
||||
res.setHeader('Content-Type', 'application/json;utf=8')
|
||||
if (count++ > 0) {
|
||||
res.end(JSON.stringify({ you_know: 'for search' }))
|
||||
} else {
|
||||
setTimeout(() => {
|
||||
res.end(JSON.stringify({ you_know: 'for search' }))
|
||||
}, 1000)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
buildServer(handler, ({ port }, server) => {
|
||||
const pool = new ConnectionPool({ Connection })
|
||||
pool.addConnection(`http://localhost:${port}`)
|
||||
|
||||
const transport = new Transport({
|
||||
emit: () => {},
|
||||
connectionPool: pool,
|
||||
serializer: new Serializer(),
|
||||
maxRetries: 3,
|
||||
requestTimeout: 250,
|
||||
sniffInterval: false,
|
||||
sniffOnStart: false
|
||||
})
|
||||
|
||||
transport.request({
|
||||
method: 'POST',
|
||||
path: '/hello',
|
||||
body: { you_know: 'for search' }
|
||||
}, {
|
||||
compression: 'gzip'
|
||||
}, (err, { body, meta }) => {
|
||||
t.error(err)
|
||||
t.deepEqual(body, { you_know: 'for search' })
|
||||
t.strictEqual(count, 2)
|
||||
server.stop()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user