Do not retry a request if the body is a stream (#1143)

* Do not retry a request if the body is a stream

Refactored the trnasport.request method to not use stream for gzipping
the body, but use the callback API instead. The maxRetries will be 0 in
case of a stream body and cached the Accept-Encoding header.

* Updated dependencies

* Updated test
This commit is contained in:
Tomas Della Vedova
2020-04-06 12:16:21 +02:00
committed by GitHub
parent d10e8bb9f3
commit e67b55d163
4 changed files with 209 additions and 99 deletions

View File

@ -6,9 +6,7 @@
const debug = require('debug')('elasticsearch') const debug = require('debug')('elasticsearch')
const os = require('os') const os = require('os')
const once = require('once') const { gzip, createGzip } = require('zlib')
const { createGzip } = require('zlib')
const intoStream = require('into-stream')
const ms = require('ms') const ms = require('ms')
const { const {
ConnectionError, ConnectionError,
@ -35,7 +33,11 @@ class Transport {
this.requestTimeout = toMs(opts.requestTimeout) this.requestTimeout = toMs(opts.requestTimeout)
this.suggestCompression = opts.suggestCompression === true this.suggestCompression = opts.suggestCompression === true
this.compression = opts.compression || false this.compression = opts.compression || false
this.headers = Object.assign({}, { 'User-Agent': userAgent }, opts.headers) this.headers = Object.assign({},
{ 'User-Agent': userAgent },
opts.suggestCompression === true ? { 'Accept-Encoding': 'gzip,deflate' } : null,
opts.headers
)
this.sniffInterval = opts.sniffInterval this.sniffInterval = opts.sniffInterval
this.sniffOnConnectionFault = opts.sniffOnConnectionFault this.sniffOnConnectionFault = opts.sniffOnConnectionFault
this.sniffEndpoint = opts.sniffEndpoint this.sniffEndpoint = opts.sniffEndpoint
@ -85,7 +87,6 @@ class Transport {
} }
} }
callback = once(callback)
const meta = { const meta = {
context: options.context || null, context: options.context || null,
request: { request: {
@ -107,8 +108,12 @@ class Transport {
meta meta
} }
const maxRetries = options.maxRetries || this.maxRetries // We should not retry if we are sending a stream body, because we should store in memory
const compression = options.compression || this.compression // a copy of the stream to be able to send it again, but since we don't know in advance
// the size of the stream, we risk to take too much memory.
// Furthermore, copying everytime the stream is very a expensive operation.
const maxRetries = isStream(params.body) ? 0 : options.maxRetries || this.maxRetries
const compression = options.compression !== undefined ? options.compression : this.compression
var request = { abort: noop } var request = { abort: noop }
const makeRequest = () => { const makeRequest = () => {
@ -119,80 +124,9 @@ class Transport {
if (meta.connection == null) { if (meta.connection == null) {
return callback(new NoLivingConnectionsError(), result) return callback(new NoLivingConnectionsError(), result)
} }
// TODO: make this assignment FAST
const headers = Object.assign({}, this.headers, options.headers)
if (options.opaqueId !== undefined) {
headers['X-Opaque-Id'] = this.opaqueIdPrefix !== null
? this.opaqueIdPrefix + options.opaqueId
: options.opaqueId
}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
return callback(err, result)
}
}
if (params.body !== '') {
headers['Content-Type'] = headers['Content-Type'] || 'application/json'
if (compression === 'gzip') {
if (isStream(params.body) === false) {
params.body = intoStream(params.body).pipe(createGzip())
} else {
params.body = params.body.pipe(createGzip())
}
headers['Content-Encoding'] = compression
}
}
if (isStream(params.body) === false) {
headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
return callback(err, result)
}
} else {
params.body = params.bulkBody
}
headers['Content-Type'] = headers['Content-Type'] || 'application/x-ndjson'
if (isStream(params.body) === false) {
headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
}
if (this.suggestCompression === true) {
headers['Accept-Encoding'] = 'gzip,deflate'
}
params.headers = headers
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring)
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
meta.request.params = params
meta.request.options = options
this.emit('request', null, result) this.emit('request', null, result)
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
// perform the actual http request // perform the actual http request
return meta.connection.request(params, onResponse) request = meta.connection.request(params, onResponse)
} }
const onResponse = (err, response) => { const onResponse = (err, response) => {
@ -213,7 +147,7 @@ class Transport {
if (meta.attempts < maxRetries) { if (meta.attempts < maxRetries) {
meta.attempts++ meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback) makeRequest()
return return
} }
} }
@ -226,7 +160,7 @@ class Transport {
const { statusCode, headers } = response const { statusCode, headers } = response
result.statusCode = statusCode result.statusCode = statusCode
result.headers = headers result.headers = headers
if (headers['warning'] != null) { if (headers['warning'] !== undefined) {
result.warnings = result.warnings || [] result.warnings = result.warnings || []
// split the string over the commas not inside quotes // split the string over the commas not inside quotes
result.warnings.push.apply(result.warnings, headers['warning'].split(/(?!\B"[^"]*),(?![^"]*"\B)/)) result.warnings.push.apply(result.warnings, headers['warning'].split(/(?!\B"[^"]*),(?![^"]*"\B)/))
@ -255,7 +189,7 @@ class Transport {
// - a `content-type` is defined and is equal to `application/json` // - a `content-type` is defined and is equal to `application/json`
// - the request is not a HEAD request // - the request is not a HEAD request
// - the payload is not an empty string // - the payload is not an empty string
if (headers['content-type'] != null && if (headers['content-type'] !== undefined &&
headers['content-type'].indexOf('application/json') > -1 && headers['content-type'].indexOf('application/json') > -1 &&
isHead === false && isHead === false &&
payload !== '' payload !== ''
@ -285,7 +219,7 @@ class Transport {
if (meta.attempts < maxRetries && statusCode !== 429) { if (meta.attempts < maxRetries && statusCode !== 429) {
meta.attempts++ meta.attempts++
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params) debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
request = makeRequest(params, callback) makeRequest()
return return
} }
} else { } else {
@ -309,7 +243,86 @@ class Transport {
}) })
} }
request = makeRequest() const headers = Object.assign({}, this.headers, options.headers)
if (options.opaqueId !== undefined) {
headers['X-Opaque-Id'] = this.opaqueIdPrefix !== null
? this.opaqueIdPrefix + options.opaqueId
: options.opaqueId
}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
return callback(err, result)
}
}
if (params.body !== '') {
headers['Content-Type'] = headers['Content-Type'] || 'application/json'
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
return callback(err, result)
}
} else {
params.body = params.bulkBody
}
if (params.body !== '') {
headers['Content-Type'] = headers['Content-Type'] || 'application/x-ndjson'
}
}
params.headers = headers
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring)
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
meta.request.params = params
meta.request.options = options
// handle compression
if (params.body !== '' && params.body != null) {
if (isStream(params.body) === true) {
if (compression === 'gzip') {
params.headers['Content-Encoding'] = compression
params.body = params.body.pipe(createGzip())
}
makeRequest()
} else if (compression === 'gzip') {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
return callback(err, result)
}
params.headers['Content-Encoding'] = compression
params.headers['Content-Length'] = '' + Buffer.byteLength(buffer)
params.body = buffer
makeRequest()
})
} else {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
makeRequest()
}
} else {
makeRequest()
}
return { return {
then (onFulfilled, onRejected) { then (onFulfilled, onRejected) {
@ -405,7 +418,7 @@ function shouldSerialize (obj) {
} }
function isStream (obj) { function isStream (obj) {
return typeof obj.pipe === 'function' return obj != null && typeof obj.pipe === 'function'
} }
function defaultNodeFilter (node) { function defaultNodeFilter (node) {

View File

@ -46,6 +46,7 @@
"deepmerge": "^4.0.0", "deepmerge": "^4.0.0",
"dezalgo": "^1.0.3", "dezalgo": "^1.0.3",
"fast-deep-equal": "^3.1.1", "fast-deep-equal": "^3.1.1",
"into-stream": "^5.1.1",
"js-yaml": "^3.13.1", "js-yaml": "^3.13.1",
"license-checker": "^25.0.1", "license-checker": "^25.0.1",
"lolex": "^4.0.1", "lolex": "^4.0.1",
@ -66,9 +67,7 @@
"dependencies": { "dependencies": {
"debug": "^4.1.1", "debug": "^4.1.1",
"decompress-response": "^4.2.0", "decompress-response": "^4.2.0",
"into-stream": "^5.1.0",
"ms": "^2.1.1", "ms": "^2.1.1",
"once": "^1.4.0",
"pump": "^3.0.0", "pump": "^3.0.0",
"secure-json-parse": "^2.1.0" "secure-json-parse": "^2.1.0"
}, },

View File

@ -32,10 +32,7 @@ test('Should emit a request event when a request is performed', t => {
method: 'GET', method: 'GET',
path: '/test/_search', path: '/test/_search',
body: '', body: '',
querystring: 'q=foo%3Abar', querystring: 'q=foo%3Abar'
headers: {
'Content-Length': '0'
}
}, },
options: {}, options: {},
id: 1 id: 1
@ -83,10 +80,7 @@ test('Should emit a response event in case of a successful response', t => {
method: 'GET', method: 'GET',
path: '/test/_search', path: '/test/_search',
body: '', body: '',
querystring: 'q=foo%3Abar', querystring: 'q=foo%3Abar'
headers: {
'Content-Length': '0'
}
}, },
options: {}, options: {},
id: 1 id: 1
@ -132,10 +126,7 @@ test('Should emit a response event with the error set', t => {
method: 'GET', method: 'GET',
path: '/test/_search', path: '/test/_search',
body: '', body: '',
querystring: 'q=foo%3Abar', querystring: 'q=foo%3Abar'
headers: {
'Content-Length': '0'
}
}, },
options: { options: {
requestTimeout: 500 requestTimeout: 500

View File

@ -619,6 +619,57 @@ test('Retry mechanism', t => {
}) })
}) })
test('Should not retry if the body is a stream', t => {
t.plan(2)
var count = 0
function handler (req, res) {
res.setHeader('Content-Type', 'application/json;utf=8')
if (count > 0) {
res.end(JSON.stringify({ hello: 'world' }))
} else {
setTimeout(() => {
res.end(JSON.stringify({ hello: 'world' }))
}, 1000)
}
count++
}
buildServer(handler, ({ port }, server) => {
const pool = new ConnectionPool({ Connection })
pool.addConnection([{
url: new URL(`http://localhost:${port}`),
id: 'node1'
}, {
url: new URL(`http://localhost:${port}`),
id: 'node2'
}, {
url: new URL(`http://localhost:${port}`),
id: 'node3'
}])
const transport = new Transport({
emit: () => {},
connectionPool: pool,
serializer: new Serializer(),
maxRetries: 1,
requestTimeout: 10,
sniffInterval: false,
sniffOnStart: false
})
transport.request({
method: 'POST',
path: '/hello',
body: intoStream(JSON.stringify({ hello: 'world' }))
}, (err, { body }) => {
t.ok(err instanceof TimeoutError)
t.strictEqual(count, 1)
server.stop()
})
})
})
test('Custom retry mechanism', t => { test('Custom retry mechanism', t => {
t.plan(2) t.plan(2)
@ -1956,6 +2007,62 @@ test('Compress request', t => {
}) })
}) })
t.test('Retry a gzipped body', t => {
t.plan(7)
var count = 0
function handler (req, res) {
t.match(req.headers, {
'content-type': 'application/json',
'content-encoding': 'gzip'
})
var json = ''
req
.pipe(createGunzip())
.on('data', chunk => { json += chunk })
.on('error', err => t.fail(err))
.on('end', () => {
t.deepEqual(JSON.parse(json), { you_know: 'for search' })
res.setHeader('Content-Type', 'application/json;utf=8')
if (count++ > 0) {
res.end(JSON.stringify({ you_know: 'for search' }))
} else {
setTimeout(() => {
res.end(JSON.stringify({ you_know: 'for search' }))
}, 1000)
}
})
}
buildServer(handler, ({ port }, server) => {
const pool = new ConnectionPool({ Connection })
pool.addConnection(`http://localhost:${port}`)
const transport = new Transport({
emit: () => {},
connectionPool: pool,
serializer: new Serializer(),
maxRetries: 3,
requestTimeout: 250,
sniffInterval: false,
sniffOnStart: false
})
transport.request({
method: 'POST',
path: '/hello',
body: { you_know: 'for search' }
}, {
compression: 'gzip'
}, (err, { body, meta }) => {
t.error(err)
t.deepEqual(body, { you_know: 'for search' })
t.strictEqual(count, 2)
server.stop()
})
})
})
t.end() t.end()
}) })