Handle connectivity issues while reading the body (#1343)

This commit is contained in:
Tomas Della Vedova
2020-11-10 18:16:25 +01:00
committed by delvedor
parent 2d51ef429f
commit 3d728bcad7
8 changed files with 269 additions and 231 deletions

View File

@ -9,7 +9,7 @@ jobs:
strategy: strategy:
matrix: matrix:
node-version: [10.x, 12.x, 14.x] node-version: [10.x, 12.x, 14.x, 15.x]
os: [ubuntu-latest, windows-latest, macOS-latest] os: [ubuntu-latest, windows-latest, macOS-latest]
steps: steps:

View File

@ -25,7 +25,6 @@ const hpagent = require('hpagent')
const http = require('http') const http = require('http')
const https = require('https') const https = require('https')
const debug = require('debug')('elasticsearch') const debug = require('debug')('elasticsearch')
const decompressResponse = require('decompress-response')
const pump = require('pump') const pump = require('pump')
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/ const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
const { const {
@ -83,7 +82,6 @@ class Connection {
request (params, callback) { request (params, callback) {
this._openRequests++ this._openRequests++
var ended = false
const requestParams = this.buildRequestObject(params) const requestParams = this.buildRequestObject(params)
// https://github.com/nodejs/node/commit/b961d9fd83 // https://github.com/nodejs/node/commit/b961d9fd83
@ -96,53 +94,38 @@ class Connection {
debug('Starting a new request', params) debug('Starting a new request', params)
const request = this.makeRequest(requestParams) const request = this.makeRequest(requestParams)
// listen for the response event const onResponse = response => {
// TODO: handle redirects? cleanListeners()
request.on('response', response => { this._openRequests--
/* istanbul ignore else */ callback(null, response)
if (ended === false) { }
ended = true
this._openRequests--
if (params.asStream === true) { const onTimeout = () => {
callback(null, response) cleanListeners()
} else { this._openRequests--
callback(null, decompressResponse(response)) request.once('error', () => {}) // we need to catch the request aborted error
} request.abort()
} callback(new TimeoutError('Request timed out', params), null)
}) }
// handles request timeout const onError = err => {
request.on('timeout', () => { cleanListeners()
/* istanbul ignore else */ this._openRequests--
if (ended === false) { callback(new ConnectionError(err.message), null)
ended = true }
this._openRequests--
request.abort()
callback(new TimeoutError('Request timed out', params), null)
}
})
// handles request error const onAbort = () => {
request.on('error', err => { cleanListeners()
/* istanbul ignore else */ request.once('error', () => {}) // we need to catch the request aborted error
if (ended === false) {
ended = true
this._openRequests--
callback(new ConnectionError(err.message), null)
}
})
// updates the ended state
request.on('abort', () => {
debug('Request aborted', params) debug('Request aborted', params)
/* istanbul ignore else */ this._openRequests--
if (ended === false) { callback(new RequestAbortedError(), null)
ended = true }
this._openRequests--
callback(new RequestAbortedError(), null) request.on('response', onResponse)
} request.on('timeout', onTimeout)
}) request.on('error', onError)
request.on('abort', onAbort)
// Disables the Nagle algorithm // Disables the Nagle algorithm
request.setNoDelay(true) request.setNoDelay(true)
@ -151,8 +134,8 @@ class Connection {
if (isStream(params.body) === true) { if (isStream(params.body) === true) {
pump(params.body, request, err => { pump(params.body, request, err => {
/* istanbul ignore if */ /* istanbul ignore if */
if (err != null && /* istanbul ignore next */ ended === false) { if (err != null) {
ended = true cleanListeners()
this._openRequests-- this._openRequests--
callback(err, null) callback(err, null)
} }
@ -162,6 +145,13 @@ class Connection {
} }
return request return request
function cleanListeners () {
request.removeListener('response', onResponse)
request.removeListener('timeout', onTimeout)
request.removeListener('error', onError)
request.removeListener('abort', onAbort)
}
} }
// TODO: write a better closing logic // TODO: write a better closing logic

View File

@ -21,7 +21,7 @@
const debug = require('debug')('elasticsearch') const debug = require('debug')('elasticsearch')
const os = require('os') const os = require('os')
const { gzip, createGzip } = require('zlib') const { gzip, unzip, createGzip } = require('zlib')
const ms = require('ms') const ms = require('ms')
const { const {
ConnectionError, ConnectionError,
@ -174,37 +174,40 @@ class Transport {
request = meta.connection.request(params, onResponse) request = meta.connection.request(params, onResponse)
} }
const onResponse = (err, response) => { const onConnectionError = (err) => {
if (err !== null) { if (err.name !== 'RequestAbortedError') {
if (err.name !== 'RequestAbortedError') { // if there is an error in the connection
// if there is an error in the connection // let's mark the connection as dead
// let's mark the connection as dead this.connectionPool.markDead(meta.connection)
this.connectionPool.markDead(meta.connection)
if (this.sniffOnConnectionFault === true) { if (this.sniffOnConnectionFault === true) {
this.sniff({ this.sniff({
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT, reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
requestId: meta.request.id requestId: meta.request.id
}) })
}
// retry logic
if (meta.attempts < maxRetries) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
} }
err.meta = result // retry logic
this.emit('response', err, result) if (meta.attempts < maxRetries) {
return callback(err, result) meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
} }
const { statusCode, headers } = response err.meta = result
result.statusCode = statusCode this.emit('response', err, result)
result.headers = headers return callback(err, result)
}
const onResponse = (err, response) => {
if (err !== null) {
return onConnectionError(err)
}
result.statusCode = response.statusCode
result.headers = response.headers
if (options.asStream === true) { if (options.asStream === true) {
result.body = response result.body = response
@ -213,74 +216,109 @@ class Transport {
return return
} }
var payload = '' const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase()
// collect the payload const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1
response.setEncoding('utf8') // if the response is compressed, we must handle it
response.on('data', chunk => { payload += chunk }) // as buffer for allowing decompression later
/* istanbul ignore next */ let payload = isCompressed ? [] : ''
response.on('error', err => { const onData = isCompressed
const error = new ConnectionError(err.message, result) ? chunk => { payload.push(chunk) }
: chunk => { payload += chunk }
const onEnd = err => {
response.removeListener('data', onData)
response.removeListener('end', onEnd)
response.removeListener('error', onEnd)
response.removeListener('aborted', onAbort)
if (err) {
return onConnectionError(new ConnectionError(err.message))
}
if (isCompressed) {
unzip(Buffer.concat(payload), onBody)
} else {
onBody(null, payload)
}
}
const onAbort = () => {
response.destroy()
onEnd(new Error('Response aborted while reading the body'))
}
if (!isCompressed) {
response.setEncoding('utf8')
}
response.on('data', onData)
response.on('error', onEnd)
response.on('end', onEnd)
response.on('aborted', onAbort)
}
const onBody = (err, payload) => {
if (err) {
this.emit('response', err, result)
return callback(err, result)
}
if (Buffer.isBuffer(payload)) {
payload = payload.toString()
}
const isHead = params.method === 'HEAD'
// we should attempt the payload deserialization only if:
// - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (result.headers['content-type'] !== undefined &&
result.headers['content-type'].indexOf('application/json') > -1 &&
isHead === false &&
payload !== ''
) {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('response', err, result)
return callback(err, result)
}
} else {
// cast to boolean if the request method was HEAD
result.body = isHead === true ? true : payload
}
// we should ignore the statusCode if the user has configured the `ignore` field with
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) ||
(isHead === true && result.statusCode === 404)
if (ignoreStatusCode === false &&
(result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(meta.connection)
// retry logic (we shoukd not retry on "429 - Too Many Requests")
if (meta.attempts < maxRetries && result.statusCode !== 429) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(meta.connection)
}
if (ignoreStatusCode === false && result.statusCode >= 400) {
const error = new ResponseError(result)
this.emit('response', error, result) this.emit('response', error, result)
callback(error, result) callback(error, result)
}) } else {
response.on('end', () => { // cast to boolean if the request method was HEAD
const isHead = params.method === 'HEAD' if (isHead === true && result.statusCode === 404) {
// we should attempt the payload deserialization only if: result.body = false
// - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request
// - the payload is not an empty string
if (headers['content-type'] !== undefined &&
headers['content-type'].indexOf('application/json') > -1 &&
isHead === false &&
payload !== ''
) {
try {
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('response', err, result)
return callback(err, result)
}
} else {
// cast to boolean if the request method was HEAD
result.body = isHead === true ? true : payload
} }
this.emit('response', null, result)
// we should ignore the statusCode if the user has configured the `ignore` field with callback(null, result)
// the statusCode we just got or if the request method is HEAD and the statusCode is 404 }
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(statusCode) > -1) ||
(isHead === true && statusCode === 404)
if (ignoreStatusCode === false &&
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
// if the statusCode is 502/3/4 we should run our retry strategy
// and mark the connection as dead
this.connectionPool.markDead(meta.connection)
// retry logic (we shoukd not retry on "429 - Too Many Requests")
if (meta.attempts < maxRetries && statusCode !== 429) {
meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
makeRequest()
return
}
} else {
// everything has worked as expected, let's mark
// the connection as alive (or confirm it)
this.connectionPool.markAlive(meta.connection)
}
if (ignoreStatusCode === false && statusCode >= 400) {
const error = new ResponseError(result)
this.emit('response', error, result)
callback(error, result)
} else {
// cast to boolean if the request method was HEAD
if (isHead === true && statusCode === 404) {
result.body = false
}
this.emit('response', null, result)
callback(null, result)
}
})
} }
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers)) const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))

View File

@ -75,7 +75,6 @@
}, },
"dependencies": { "dependencies": {
"debug": "^4.1.1", "debug": "^4.1.1",
"decompress-response": "^4.2.0",
"hpagent": "^0.1.1", "hpagent": "^0.1.1",
"ms": "^2.1.1", "ms": "^2.1.1",
"pump": "^3.0.0", "pump": "^3.0.0",

View File

@ -21,7 +21,7 @@
const { test } = require('tap') const { test } = require('tap')
const { URL } = require('url') const { URL } = require('url')
const { Client, ConnectionPool, Transport } = require('../../index') const { Client, ConnectionPool, Transport, errors } = require('../../index')
const { CloudConnectionPool } = require('../../lib/pool') const { CloudConnectionPool } = require('../../lib/pool')
const { buildServer } = require('../utils') const { buildServer } = require('../utils')
@ -1191,3 +1191,55 @@ test('name property as symbol', t => {
t.strictEqual(client.name, symbol) t.strictEqual(client.name, symbol)
}) })
// The nodejs http agent will try to wait for the whole
// body to arrive before closing the request, so this
// test might take some time.
test('Bad content length', t => {
t.plan(3)
let count = 0
function handler (req, res) {
count += 1
const body = JSON.stringify({ hello: 'world' })
res.setHeader('Content-Type', 'application/json;utf=8')
res.setHeader('Content-Length', body.length + '')
res.end(body.slice(0, -5))
}
buildServer(handler, ({ port }, server) => {
const client = new Client({ node: `http://localhost:${port}`, maxRetries: 1 })
client.info((err, { body }) => {
t.ok(err instanceof errors.ConnectionError)
t.is(err.message, 'Response aborted while reading the body')
t.strictEqual(count, 2)
server.stop()
})
})
})
test('Socket destryed while reading the body', t => {
t.plan(3)
let count = 0
function handler (req, res) {
count += 1
const body = JSON.stringify({ hello: 'world' })
res.setHeader('Content-Type', 'application/json;utf=8')
res.setHeader('Content-Length', body.length + '')
res.write(body.slice(0, -5))
setTimeout(() => {
res.socket.destroy()
}, 500)
}
buildServer(handler, ({ port }, server) => {
const client = new Client({ node: `http://localhost:${port}`, maxRetries: 1 })
client.info((err, { body }) => {
t.ok(err instanceof errors.ConnectionError)
t.is(err.message, 'Response aborted while reading the body')
t.strictEqual(count, 2)
server.stop()
})
})
})

View File

@ -21,7 +21,6 @@
const { test } = require('tap') const { test } = require('tap')
const { inspect } = require('util') const { inspect } = require('util')
const { createGzip, createDeflate } = require('zlib')
const { URL } = require('url') const { URL } = require('url')
const { Agent } = require('http') const { Agent } = require('http')
const hpagent = require('hpagent') const hpagent = require('hpagent')
@ -400,90 +399,6 @@ test('Send body as stream', t => {
}) })
}) })
test('Should handle compression', t => {
t.test('gzip', t => {
t.plan(3)
function handler (req, res) {
res.writeHead(200, {
'Content-Type': 'application/json;utf=8',
'Content-Encoding': 'gzip'
})
intoStream(JSON.stringify({ hello: 'world' }))
.pipe(createGzip())
.pipe(res)
}
buildServer(handler, ({ port }, server) => {
const connection = new Connection({
url: new URL(`http://localhost:${port}`)
})
connection.request({
path: '/hello',
method: 'GET'
}, (err, res) => {
t.error(err)
t.match(res.headers, {
'content-type': 'application/json;utf=8',
'content-encoding': 'gzip'
})
var payload = ''
res.setEncoding('utf8')
res.on('data', chunk => { payload += chunk })
res.on('error', err => t.fail(err))
res.on('end', () => {
t.deepEqual(JSON.parse(payload), { hello: 'world' })
server.stop()
})
})
})
})
t.test('deflate', t => {
t.plan(3)
function handler (req, res) {
res.writeHead(200, {
'Content-Type': 'application/json;utf=8',
'Content-Encoding': 'deflate'
})
intoStream(JSON.stringify({ hello: 'world' }))
.pipe(createDeflate())
.pipe(res)
}
buildServer(handler, ({ port }, server) => {
const connection = new Connection({
url: new URL(`http://localhost:${port}`)
})
connection.request({
path: '/hello',
method: 'GET'
}, (err, res) => {
t.error(err)
t.match(res.headers, {
'content-type': 'application/json;utf=8',
'content-encoding': 'deflate'
})
var payload = ''
res.setEncoding('utf8')
res.on('data', chunk => { payload += chunk })
res.on('error', err => t.fail(err))
res.on('end', () => {
t.deepEqual(JSON.parse(payload), { hello: 'world' })
server.stop()
})
})
})
})
t.end()
})
test('Should not close a connection if there are open requests', t => { test('Should not close a connection if there are open requests', t => {
t.plan(4) t.plan(4)

View File

@ -22,7 +22,7 @@
const { test } = require('tap') const { test } = require('tap')
const { URL } = require('url') const { URL } = require('url')
const FakeTimers = require('@sinonjs/fake-timers') const FakeTimers = require('@sinonjs/fake-timers')
const { createGunzip } = require('zlib') const { createGunzip, gzipSync } = require('zlib')
const os = require('os') const os = require('os')
const intoStream = require('into-stream') const intoStream = require('into-stream')
const { const {
@ -1665,13 +1665,17 @@ test('Should cast to boolean HEAD request', t => {
}) })
test('Suggest compression', t => { test('Suggest compression', t => {
t.plan(2) t.plan(3)
function handler (req, res) { function handler (req, res) {
t.match(req.headers, { t.match(req.headers, {
'accept-encoding': 'gzip,deflate' 'accept-encoding': 'gzip,deflate'
}) })
const body = gzipSync(JSON.stringify({ hello: 'world' }))
res.setHeader('Content-Type', 'application/json;utf=8') res.setHeader('Content-Type', 'application/json;utf=8')
res.end(JSON.stringify({ hello: 'world' })) res.setHeader('Content-Encoding', 'gzip')
res.setHeader('Content-Length', Buffer.byteLength(body))
res.end(body)
} }
buildServer(handler, ({ port }, server) => { buildServer(handler, ({ port }, server) => {
@ -1694,6 +1698,46 @@ test('Suggest compression', t => {
path: '/hello' path: '/hello'
}, (err, { body }) => { }, (err, { body }) => {
t.error(err) t.error(err)
t.deepEqual(body, { hello: 'world' })
server.stop()
})
})
})
test('Broken compression', t => {
t.plan(2)
function handler (req, res) {
t.match(req.headers, {
'accept-encoding': 'gzip,deflate'
})
const body = gzipSync(JSON.stringify({ hello: 'world' }))
res.setHeader('Content-Type', 'application/json;utf=8')
res.setHeader('Content-Encoding', 'gzip')
// we are not setting the content length on purpose
res.end(body.slice(0, -5))
}
buildServer(handler, ({ port }, server) => {
const pool = new ConnectionPool({ Connection })
pool.addConnection(`http://localhost:${port}`)
const transport = new Transport({
emit: () => {},
connectionPool: pool,
serializer: new Serializer(),
maxRetries: 3,
requestTimeout: 30000,
sniffInterval: false,
sniffOnStart: false,
suggestCompression: true
})
transport.request({
method: 'GET',
path: '/hello'
}, (err, { body }) => {
t.ok(err)
server.stop() server.stop()
}) })
}) })

View File

@ -109,7 +109,7 @@ class MockConnectionSniff extends Connection {
'content-type': 'application/json;utf=8', 'content-type': 'application/json;utf=8',
date: new Date().toISOString(), date: new Date().toISOString(),
connection: 'keep-alive', connection: 'keep-alive',
'content-length': '205' 'content-length': '191'
} }
process.nextTick(() => { process.nextTick(() => {
if (!aborted) { if (!aborted) {