WIP: initial prototype

- Added support for different format of requestTimemout
- Changed api method result
- Now we are always returning the result in case of error
- Improved body deserialization
- Added cast to boolen for HEAD requests
- Added support for already serialized strings in the ndserializer
- Fixed qserializer in case of null object
- Updated Errors
This commit is contained in:
delvedor
2018-10-30 16:32:10 +01:00
parent 961b8224ef
commit c9635c4a71
5 changed files with 64 additions and 39 deletions

View File

@ -2,6 +2,7 @@
const debug = require('debug')('elasticsearch')
const once = require('once')
const ms = require('ms')
const {
ConnectionError,
TimeoutError,
@ -18,7 +19,7 @@ class Transport {
this.connectionPool = opts.connectionPool
this.serializer = opts.serializer
this.maxRetries = opts.maxRetries
this.requestTimeout = opts.requestTimeout
this.requestTimeout = toMs(opts.requestTimeout)
this.sniffInterval = opts.sniffInterval
this.sniffOnConnectionFault = opts.sniffOnConnectionFault
this.sniffEndpoint = opts.sniffEndpoint
@ -34,10 +35,11 @@ class Transport {
request (params, callback) {
callback = once(callback)
const result = { body: null, statusCode: null, headers: null }
const attempts = params[kRemainingAttempts] || params.maxRetries || this.maxRetries
const connection = this.getConnection()
if (connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'))
return callback(new NoLivingConnectionsError('There are not living connections'), result)
}
// handle json body
@ -46,7 +48,7 @@ class Transport {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
return callback(err)
return callback(err, result)
}
}
params.headers = params.headers || {}
@ -58,7 +60,7 @@ class Transport {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
return callback(err)
return callback(err, result)
}
} else {
params.body = params.bulkBody
@ -70,7 +72,9 @@ class Transport {
// serializes the querystring
params.querystring = this.serializer.qserialize(params.querystring)
params.timeout = params.timeout || this.requestTimeout
// handles request timeout
params.timeout = toMs(params.requestTimeout || this.requestTimeout)
this.emit('request', params)
const request = connection.request(params, (err, response) => {
@ -91,30 +95,33 @@ class Transport {
: new ConnectionError(err.message, params)
this.emit('error', error, params)
return callback(error)
return callback(error, result)
}
var json = ''
response.setEncoding('utf8')
response.on('data', chunk => { json += chunk })
response.on('error', err => callback(new ConnectionError(err.message, params)))
response.on('end', () => {
debug('JSON response', params, json)
const { statusCode, headers } = response
result.statusCode = statusCode
result.headers = headers
const contentType = response.headers['content-type']
if (contentType != null && contentType.indexOf('application/json') > -1) {
var payload = ''
response.setEncoding('utf8')
response.on('data', chunk => { payload += chunk })
response.on('error', err => callback(new ConnectionError(err.message, params), result))
response.on('end', () => {
const isHead = params.method === 'HEAD'
const shouldDeserialize = headers['content-type'] != null && isHead === false && payload !== ''
if (shouldDeserialize === true && headers['content-type'].indexOf('application/json') > -1) {
try {
var payload = this.serializer.deserialize(json)
result.body = this.serializer.deserialize(payload)
} catch (err) {
this.emit('error', err)
return callback(err)
return callback(err, result)
}
} else {
payload = json
result.body = isHead === true ? true : payload
}
const { statusCode, headers } = response
const ignoreStatusCode = Array.isArray(params.ignore) && params.ignore.indexOf(statusCode) > -1
const ignoreStatusCode = (Array.isArray(params.ignore) && params.ignore.indexOf(statusCode) > -1) ||
(isHead === true && statusCode === 404)
if (ignoreStatusCode === false &&
(statusCode === 502 || statusCode === 503 || statusCode === 504)) {
@ -128,11 +135,14 @@ class Transport {
this.connectionPool.markAlive(connection)
}
this.emit('response', params, { statusCode, payload, headers })
this.emit('response', params, result)
if (ignoreStatusCode === false && statusCode >= 400) {
callback(new ResponseError(payload, statusCode, headers))
callback(new ResponseError(result), result)
} else {
callback(null, payload)
if (isHead === true && statusCode === 404) {
result.body = false
}
callback(null, result)
}
})
})
@ -184,4 +194,11 @@ class Transport {
}
}
function toMs (time) {
if (typeof time === 'string') {
return ms(time)
}
return time
}
module.exports = Transport