Handle connectivity issues while reading the body (#1343)
This commit is contained in:
committed by
GitHub
parent
a62e26e901
commit
edd4f78bad
@ -25,7 +25,6 @@ const hpagent = require('hpagent')
|
||||
const http = require('http')
|
||||
const https = require('https')
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const decompressResponse = require('decompress-response')
|
||||
const pump = require('pump')
|
||||
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
|
||||
const {
|
||||
@ -83,7 +82,6 @@ class Connection {
|
||||
|
||||
request (params, callback) {
|
||||
this._openRequests++
|
||||
var ended = false
|
||||
|
||||
const requestParams = this.buildRequestObject(params)
|
||||
// https://github.com/nodejs/node/commit/b961d9fd83
|
||||
@ -96,53 +94,38 @@ class Connection {
|
||||
debug('Starting a new request', params)
|
||||
const request = this.makeRequest(requestParams)
|
||||
|
||||
// listen for the response event
|
||||
// TODO: handle redirects?
|
||||
request.on('response', response => {
|
||||
/* istanbul ignore else */
|
||||
if (ended === false) {
|
||||
ended = true
|
||||
this._openRequests--
|
||||
const onResponse = response => {
|
||||
cleanListeners()
|
||||
this._openRequests--
|
||||
callback(null, response)
|
||||
}
|
||||
|
||||
if (params.asStream === true) {
|
||||
callback(null, response)
|
||||
} else {
|
||||
callback(null, decompressResponse(response))
|
||||
}
|
||||
}
|
||||
})
|
||||
const onTimeout = () => {
|
||||
cleanListeners()
|
||||
this._openRequests--
|
||||
request.once('error', () => {}) // we need to catch the request aborted error
|
||||
request.abort()
|
||||
callback(new TimeoutError('Request timed out', params), null)
|
||||
}
|
||||
|
||||
// handles request timeout
|
||||
request.on('timeout', () => {
|
||||
/* istanbul ignore else */
|
||||
if (ended === false) {
|
||||
ended = true
|
||||
this._openRequests--
|
||||
request.abort()
|
||||
callback(new TimeoutError('Request timed out', params), null)
|
||||
}
|
||||
})
|
||||
const onError = err => {
|
||||
cleanListeners()
|
||||
this._openRequests--
|
||||
callback(new ConnectionError(err.message), null)
|
||||
}
|
||||
|
||||
// handles request error
|
||||
request.on('error', err => {
|
||||
/* istanbul ignore else */
|
||||
if (ended === false) {
|
||||
ended = true
|
||||
this._openRequests--
|
||||
callback(new ConnectionError(err.message), null)
|
||||
}
|
||||
})
|
||||
|
||||
// updates the ended state
|
||||
request.on('abort', () => {
|
||||
const onAbort = () => {
|
||||
cleanListeners()
|
||||
request.once('error', () => {}) // we need to catch the request aborted error
|
||||
debug('Request aborted', params)
|
||||
/* istanbul ignore else */
|
||||
if (ended === false) {
|
||||
ended = true
|
||||
this._openRequests--
|
||||
callback(new RequestAbortedError(), null)
|
||||
}
|
||||
})
|
||||
this._openRequests--
|
||||
callback(new RequestAbortedError(), null)
|
||||
}
|
||||
|
||||
request.on('response', onResponse)
|
||||
request.on('timeout', onTimeout)
|
||||
request.on('error', onError)
|
||||
request.on('abort', onAbort)
|
||||
|
||||
// Disables the Nagle algorithm
|
||||
request.setNoDelay(true)
|
||||
@ -151,8 +134,8 @@ class Connection {
|
||||
if (isStream(params.body) === true) {
|
||||
pump(params.body, request, err => {
|
||||
/* istanbul ignore if */
|
||||
if (err != null && /* istanbul ignore next */ ended === false) {
|
||||
ended = true
|
||||
if (err != null) {
|
||||
cleanListeners()
|
||||
this._openRequests--
|
||||
callback(err, null)
|
||||
}
|
||||
@ -162,6 +145,13 @@ class Connection {
|
||||
}
|
||||
|
||||
return request
|
||||
|
||||
function cleanListeners () {
|
||||
request.removeListener('response', onResponse)
|
||||
request.removeListener('timeout', onTimeout)
|
||||
request.removeListener('error', onError)
|
||||
request.removeListener('abort', onAbort)
|
||||
}
|
||||
}
|
||||
|
||||
// TODO: write a better closing logic
|
||||
|
||||
222
lib/Transport.js
222
lib/Transport.js
@ -21,7 +21,7 @@
|
||||
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const os = require('os')
|
||||
const { gzip, createGzip } = require('zlib')
|
||||
const { gzip, unzip, createGzip } = require('zlib')
|
||||
const ms = require('ms')
|
||||
const {
|
||||
ConnectionError,
|
||||
@ -174,37 +174,40 @@ class Transport {
|
||||
request = meta.connection.request(params, onResponse)
|
||||
}
|
||||
|
||||
const onResponse = (err, response) => {
|
||||
if (err !== null) {
|
||||
if (err.name !== 'RequestAbortedError') {
|
||||
// if there is an error in the connection
|
||||
// let's mark the connection as dead
|
||||
this.connectionPool.markDead(meta.connection)
|
||||
const onConnectionError = (err) => {
|
||||
if (err.name !== 'RequestAbortedError') {
|
||||
// if there is an error in the connection
|
||||
// let's mark the connection as dead
|
||||
this.connectionPool.markDead(meta.connection)
|
||||
|
||||
if (this.sniffOnConnectionFault === true) {
|
||||
this.sniff({
|
||||
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
|
||||
requestId: meta.request.id
|
||||
})
|
||||
}
|
||||
|
||||
// retry logic
|
||||
if (meta.attempts < maxRetries) {
|
||||
meta.attempts++
|
||||
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
|
||||
makeRequest()
|
||||
return
|
||||
}
|
||||
if (this.sniffOnConnectionFault === true) {
|
||||
this.sniff({
|
||||
reason: Transport.sniffReasons.SNIFF_ON_CONNECTION_FAULT,
|
||||
requestId: meta.request.id
|
||||
})
|
||||
}
|
||||
|
||||
err.meta = result
|
||||
this.emit('response', err, result)
|
||||
return callback(err, result)
|
||||
// retry logic
|
||||
if (meta.attempts < maxRetries) {
|
||||
meta.attempts++
|
||||
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
|
||||
makeRequest()
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
const { statusCode, headers } = response
|
||||
result.statusCode = statusCode
|
||||
result.headers = headers
|
||||
err.meta = result
|
||||
this.emit('response', err, result)
|
||||
return callback(err, result)
|
||||
}
|
||||
|
||||
const onResponse = (err, response) => {
|
||||
if (err !== null) {
|
||||
return onConnectionError(err)
|
||||
}
|
||||
|
||||
result.statusCode = response.statusCode
|
||||
result.headers = response.headers
|
||||
|
||||
if (options.asStream === true) {
|
||||
result.body = response
|
||||
@ -213,74 +216,109 @@ class Transport {
|
||||
return
|
||||
}
|
||||
|
||||
var payload = ''
|
||||
// collect the payload
|
||||
response.setEncoding('utf8')
|
||||
response.on('data', chunk => { payload += chunk })
|
||||
/* istanbul ignore next */
|
||||
response.on('error', err => {
|
||||
const error = new ConnectionError(err.message, result)
|
||||
const contentEncoding = (result.headers['content-encoding'] || '').toLowerCase()
|
||||
const isCompressed = contentEncoding.indexOf('gzip') > -1 || contentEncoding.indexOf('deflate') > -1
|
||||
// if the response is compressed, we must handle it
|
||||
// as buffer for allowing decompression later
|
||||
let payload = isCompressed ? [] : ''
|
||||
const onData = isCompressed
|
||||
? chunk => { payload.push(chunk) }
|
||||
: chunk => { payload += chunk }
|
||||
const onEnd = err => {
|
||||
response.removeListener('data', onData)
|
||||
response.removeListener('end', onEnd)
|
||||
response.removeListener('error', onEnd)
|
||||
response.removeListener('aborted', onAbort)
|
||||
|
||||
if (err) {
|
||||
return onConnectionError(new ConnectionError(err.message))
|
||||
}
|
||||
|
||||
if (isCompressed) {
|
||||
unzip(Buffer.concat(payload), onBody)
|
||||
} else {
|
||||
onBody(null, payload)
|
||||
}
|
||||
}
|
||||
|
||||
const onAbort = () => {
|
||||
response.destroy()
|
||||
onEnd(new Error('Response aborted while reading the body'))
|
||||
}
|
||||
|
||||
if (!isCompressed) {
|
||||
response.setEncoding('utf8')
|
||||
}
|
||||
response.on('data', onData)
|
||||
response.on('error', onEnd)
|
||||
response.on('end', onEnd)
|
||||
response.on('aborted', onAbort)
|
||||
}
|
||||
|
||||
const onBody = (err, payload) => {
|
||||
if (err) {
|
||||
this.emit('response', err, result)
|
||||
return callback(err, result)
|
||||
}
|
||||
if (Buffer.isBuffer(payload)) {
|
||||
payload = payload.toString()
|
||||
}
|
||||
const isHead = params.method === 'HEAD'
|
||||
// we should attempt the payload deserialization only if:
|
||||
// - a `content-type` is defined and is equal to `application/json`
|
||||
// - the request is not a HEAD request
|
||||
// - the payload is not an empty string
|
||||
if (result.headers['content-type'] !== undefined &&
|
||||
result.headers['content-type'].indexOf('application/json') > -1 &&
|
||||
isHead === false &&
|
||||
payload !== ''
|
||||
) {
|
||||
try {
|
||||
result.body = this.serializer.deserialize(payload)
|
||||
} catch (err) {
|
||||
this.emit('response', err, result)
|
||||
return callback(err, result)
|
||||
}
|
||||
} else {
|
||||
// cast to boolean if the request method was HEAD
|
||||
result.body = isHead === true ? true : payload
|
||||
}
|
||||
|
||||
// we should ignore the statusCode if the user has configured the `ignore` field with
|
||||
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
|
||||
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(result.statusCode) > -1) ||
|
||||
(isHead === true && result.statusCode === 404)
|
||||
|
||||
if (ignoreStatusCode === false &&
|
||||
(result.statusCode === 502 || result.statusCode === 503 || result.statusCode === 504)) {
|
||||
// if the statusCode is 502/3/4 we should run our retry strategy
|
||||
// and mark the connection as dead
|
||||
this.connectionPool.markDead(meta.connection)
|
||||
// retry logic (we shoukd not retry on "429 - Too Many Requests")
|
||||
if (meta.attempts < maxRetries && result.statusCode !== 429) {
|
||||
meta.attempts++
|
||||
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
|
||||
makeRequest()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// everything has worked as expected, let's mark
|
||||
// the connection as alive (or confirm it)
|
||||
this.connectionPool.markAlive(meta.connection)
|
||||
}
|
||||
|
||||
if (ignoreStatusCode === false && result.statusCode >= 400) {
|
||||
const error = new ResponseError(result)
|
||||
this.emit('response', error, result)
|
||||
callback(error, result)
|
||||
})
|
||||
response.on('end', () => {
|
||||
const isHead = params.method === 'HEAD'
|
||||
// we should attempt the payload deserialization only if:
|
||||
// - a `content-type` is defined and is equal to `application/json`
|
||||
// - the request is not a HEAD request
|
||||
// - the payload is not an empty string
|
||||
if (headers['content-type'] !== undefined &&
|
||||
headers['content-type'].indexOf('application/json') > -1 &&
|
||||
isHead === false &&
|
||||
payload !== ''
|
||||
) {
|
||||
try {
|
||||
result.body = this.serializer.deserialize(payload)
|
||||
} catch (err) {
|
||||
this.emit('response', err, result)
|
||||
return callback(err, result)
|
||||
}
|
||||
} else {
|
||||
// cast to boolean if the request method was HEAD
|
||||
result.body = isHead === true ? true : payload
|
||||
} else {
|
||||
// cast to boolean if the request method was HEAD
|
||||
if (isHead === true && result.statusCode === 404) {
|
||||
result.body = false
|
||||
}
|
||||
|
||||
// we should ignore the statusCode if the user has configured the `ignore` field with
|
||||
// the statusCode we just got or if the request method is HEAD and the statusCode is 404
|
||||
const ignoreStatusCode = (Array.isArray(options.ignore) && options.ignore.indexOf(statusCode) > -1) ||
|
||||
(isHead === true && statusCode === 404)
|
||||
|
||||
if (ignoreStatusCode === false &&
|
||||
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
|
||||
// if the statusCode is 502/3/4 we should run our retry strategy
|
||||
// and mark the connection as dead
|
||||
this.connectionPool.markDead(meta.connection)
|
||||
// retry logic (we shoukd not retry on "429 - Too Many Requests")
|
||||
if (meta.attempts < maxRetries && statusCode !== 429) {
|
||||
meta.attempts++
|
||||
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
|
||||
makeRequest()
|
||||
return
|
||||
}
|
||||
} else {
|
||||
// everything has worked as expected, let's mark
|
||||
// the connection as alive (or confirm it)
|
||||
this.connectionPool.markAlive(meta.connection)
|
||||
}
|
||||
|
||||
if (ignoreStatusCode === false && statusCode >= 400) {
|
||||
const error = new ResponseError(result)
|
||||
this.emit('response', error, result)
|
||||
callback(error, result)
|
||||
} else {
|
||||
// cast to boolean if the request method was HEAD
|
||||
if (isHead === true && statusCode === 404) {
|
||||
result.body = false
|
||||
}
|
||||
this.emit('response', null, result)
|
||||
callback(null, result)
|
||||
}
|
||||
})
|
||||
this.emit('response', null, result)
|
||||
callback(null, result)
|
||||
}
|
||||
}
|
||||
|
||||
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
|
||||
|
||||
Reference in New Issue
Block a user