WIP: initial prototype

- Hide auth data from Node ID
- Added support for sending streams
This commit is contained in:
delvedor
2018-11-20 18:51:23 +01:00
parent 4ad6c255d6
commit 79b4187f30
3 changed files with 42 additions and 9 deletions

View File

@ -5,14 +5,14 @@ const http = require('http')
const https = require('https')
const debug = require('debug')('elasticsearch')
const decompressResponse = require('decompress-response')
const pump = require('pump')
const { TimeoutError } = require('./errors')
class Connection {
constructor (opts = {}) {
this.url = opts.url
this.ssl = opts.ssl || null
// TODO: clean user:password from id
this.id = opts.id || opts.url.href
this.id = opts.id || stripAuth(opts.url.href)
this.headers = opts.headers || null
this.deadCount = 0
this.resurrectTimeout = 0
@ -81,7 +81,17 @@ class Connection {
request.setNoDelay(true)
// starts the request
if (isStream(params.body) === true) {
pump(params.body, request, err => {
if (err != null && ended === false) {
ended = true
this._openRequests--
callback(err)
}
})
} else {
request.end(params.body)
}
return request
}
@ -191,6 +201,15 @@ const validStatuses = Object.keys(Connection.statuses)
const validRoles = Object.keys(Connection.roles)
.map(k => Connection.roles[k])
function stripAuth (url) {
if (url.indexOf('@') === -1) return url
return url.slice(0, url.indexOf('//') + 2) + url.slice(url.indexOf('@') + 1)
}
function isStream (obj) {
return obj != null && typeof obj.pipe === 'function'
}
function resolve (host, path) {
const hostEndWithSlash = host[host.length - 1] === '/'
const pathStartsWithSlash = path[0] === '/'

View File

@ -34,7 +34,6 @@ class Transport {
}
}
// TODO: should be able to send a stream of json data
request (params, callback) {
callback = once(callback)
const result = { body: null, statusCode: null, headers: null, warnings: null }
@ -47,7 +46,7 @@ class Transport {
params.headers = params.headers || {}
// handle json body
if (params.body != null) {
if (typeof params.body !== 'string') {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
@ -55,10 +54,12 @@ class Transport {
}
}
params.headers['Content-Type'] = 'application/json'
if (isStream(params.body) === false) {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (typeof params.bulkBody !== 'string') {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
@ -68,8 +69,10 @@ class Transport {
params.body = params.bulkBody
}
params.headers['Content-Type'] = 'application/x-ndjson'
if (isStream(params.body) === false) {
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
}
if (this.suggestCompression === true) {
params.headers['Accept-Encoding'] = 'gzip,deflate'
@ -239,4 +242,14 @@ function toMs (time) {
return time
}
function shouldSerialize (obj) {
return typeof obj !== 'string' &&
typeof obj.pipe !== 'function' &&
Buffer.isBuffer(obj) === false
}
function isStream (obj) {
return typeof obj.pipe === 'function'
}
module.exports = Transport

View File

@ -51,7 +51,8 @@
"debug": "^4.1.0",
"decompress-response": "^3.3.0",
"ms": "^2.1.1",
"once": "^1.4.0"
"once": "^1.4.0",
"pump": "^3.0.0"
},
"license": "Apache-2.0",
"repository": {