WIP: initial prototype
- Added body compression - Removed old options
This commit is contained in:
@ -2,24 +2,31 @@
|
||||
|
||||
const debug = require('debug')('elasticsearch')
|
||||
const once = require('once')
|
||||
const { createGzip } = require('zlib')
|
||||
const intoStream = require('into-stream')
|
||||
const ms = require('ms')
|
||||
const {
|
||||
ConnectionError,
|
||||
TimeoutError,
|
||||
NoLivingConnectionsError,
|
||||
ResponseError
|
||||
ResponseError,
|
||||
ConfigurationError
|
||||
} = require('./errors')
|
||||
|
||||
const noop = () => {}
|
||||
|
||||
class Transport {
|
||||
constructor (opts = {}) {
|
||||
if (typeof opts.compression === 'string' && opts.compression !== 'gzip') {
|
||||
throw new ConfigurationError(`Invalid compression: '${opts.compression}'`)
|
||||
}
|
||||
this.emit = opts.emit
|
||||
this.connectionPool = opts.connectionPool
|
||||
this.serializer = opts.serializer
|
||||
this.maxRetries = opts.maxRetries
|
||||
this.requestTimeout = toMs(opts.requestTimeout)
|
||||
this.suggestCompression = opts.suggestCompression === true
|
||||
this.compression = opts.compression || false
|
||||
this.sniffInterval = opts.sniffInterval
|
||||
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
|
||||
this.sniffEndpoint = opts.sniffEndpoint
|
||||
@ -65,6 +72,7 @@ class Transport {
|
||||
warnings: options.warnings || null
|
||||
}
|
||||
const maxRetries = options.maxRetries || this.maxRetries
|
||||
const compression = options.compression || this.compression
|
||||
var request = { abort: noop }
|
||||
|
||||
const makeRequest = () => {
|
||||
@ -75,6 +83,7 @@ class Transport {
|
||||
}
|
||||
|
||||
const headers = options.headers || {}
|
||||
|
||||
// handle json body
|
||||
if (params.body != null) {
|
||||
if (shouldSerialize(params.body) === true) {
|
||||
@ -85,6 +94,12 @@ class Transport {
|
||||
}
|
||||
}
|
||||
headers['Content-Type'] = 'application/json'
|
||||
|
||||
if (compression === 'gzip') {
|
||||
params.body = intoStream(params.body).pipe(createGzip())
|
||||
headers['Content-Encoding'] = compression
|
||||
}
|
||||
|
||||
if (isStream(params.body) === false) {
|
||||
headers['Content-Length'] = '' + Buffer.byteLength(params.body)
|
||||
}
|
||||
@ -207,8 +222,8 @@ class Transport {
|
||||
// if the statusCode is 502/3/4 we should run our retry strategy
|
||||
// and mark the connection as dead
|
||||
this.connectionPool.markDead(meta.connection)
|
||||
// retry logic
|
||||
if (meta.attempts < maxRetries) {
|
||||
// retry logic (we shoukd not retry on "429 - Too Many Requests")
|
||||
if (meta.attempts < maxRetries && statusCode !== 429) {
|
||||
meta.attempts++
|
||||
debug(`Retrying request, there are still ${maxRetries - meta.attempts} attempts`, params)
|
||||
request = makeRequest(params, callback)
|
||||
|
||||
Reference in New Issue
Block a user