Files
elasticsearch-js/lib/Connection.js
Tomas Della Vedova 1d61cba014 Inspect Connection (#784)
Handles `console.log` and `util.inspect` invocations for a better debugging experience.

`agent` and `ssl` are hidden since they made the logs very hard to read.
The user can still access them with `instance.agent` and `instance.ssl`.
2019-03-19 09:53:43 +01:00

288 lines
7.8 KiB
JavaScript

/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict'
const assert = require('assert')
const { inspect } = require('util')
const http = require('http')
const https = require('https')
const debug = require('debug')('elasticsearch')
const decompressResponse = require('decompress-response')
const pump = require('pump')
// Matches any character outside the U+0021-U+00FF range. `Connection#request`
// tests the final request path against it and reports an
// ERR_UNESCAPED_CHARACTERS TypeError through the callback instead of
// creating the HTTP request.
const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/
const { TimeoutError, ConfigurationError } = require('./errors')
/**
 * A connection to a single node, backed by its own `http.Agent` or
 * `https.Agent` (chosen from the protocol of `opts.url`).
 *
 * The instance keeps track of how many requests are still in flight
 * (`_openRequests`) so that `close()` can wait for them before destroying
 * the agent.
 */
class Connection {
  /**
   * @param {object} [opts]
   * @param {URL} opts.url - URL-like object of the node; `href`, `protocol`,
   *   `hostname`, `port`, `pathname`, `search`, `hash`, `origin`, `username`
   *   and `password` are read from it. Only `http:` and `https:` are accepted.
   * @param {object} [opts.ssl] - extra options merged into the https agent options.
   * @param {string} [opts.id] - identifier; defaults to the url href without credentials.
   * @param {object} [opts.headers] - headers sent with every request.
   * @param {string} [opts.status] - initial status; defaults to `Connection.statuses.ALIVE`.
   * @param {object} [opts.roles] - overrides for the default roles.
   * @param {object} [opts.agent] - overrides for the default agent options.
   * @throws {ConfigurationError} if the url protocol is neither `http:` nor `https:`.
   */
  constructor (opts = {}) {
    this.url = opts.url
    this.ssl = opts.ssl || null
    this.id = opts.id || stripAuth(opts.url.href)
    this.headers = opts.headers || null
    this.deadCount = 0
    this.resurrectTimeout = 0

    this._openRequests = 0
    this._status = opts.status || Connection.statuses.ALIVE
    this.roles = Object.assign({}, defaultRoles, opts.roles)

    if (!['http:', 'https:'].includes(this.url.protocol)) {
      throw new ConfigurationError(`Invalid protocol: '${this.url.protocol}'`)
    }

    // Probably there is a bug in Node Core
    // see https://github.com/nodejs/node/issues/26357
    const keepAliveFalse = opts.agent && opts.agent.keepAlive === false
    const agentOptions = Object.assign({}, {
      keepAlive: true,
      keepAliveMsecs: 1000,
      maxSockets: keepAliveFalse ? Infinity : 256,
      maxFreeSockets: 256
    }, opts.agent)

    const isPlainHttp = this.url.protocol === 'http:'
    this.agent = isPlainHttp
      ? new http.Agent(agentOptions)
      : new https.Agent(Object.assign({}, agentOptions, this.ssl))
    this.makeRequest = isPlainHttp
      ? http.request
      : https.request
  }

  /**
   * Sends a request through this connection.
   *
   * @param {object} params - request parameters, see `buildRequestObject`.
   *   `params.body` may be a stream (anything with a `pipe` function), in
   *   which case it is piped into the request. When `params.asStream` is
   *   `true` the raw response is handed to the callback, otherwise the
   *   response is first passed through `decompressResponse`.
   * @param {function} callback - called as `(err, response)` at most once.
   * @returns {object} the request created by `http(s).request`, or an object
   *   with a no-op `abort` method when the path is rejected up front.
   */
  request (params, callback) {
    const requestParams = this.buildRequestObject(params)

    // https://github.com/nodejs/node/commit/b961d9fd83
    if (INVALID_PATH_REGEX.test(requestParams.path) === true) {
      // Nothing has been sent, so this must not count as an open request:
      // otherwise `close()` would wait forever for it to finish.
      callback(new TypeError(`ERR_UNESCAPED_CHARACTERS: ${requestParams.path}`), null)
      return { abort: () => {} }
    }

    debug('Starting a new request', params)
    const request = this.makeRequest(requestParams)

    // Count the request only once it exists, so that a synchronous failure
    // above cannot leave the counter permanently above zero.
    this._openRequests++
    let ended = false

    // Marks the request as finished and releases its slot in the open
    // requests counter. Returns `true` only for the first caller, so each
    // request is counted down (and reported) exactly once.
    const settle = () => {
      if (ended === true) return false
      ended = true
      this._openRequests--
      return true
    }

    // listen for the response event
    // TODO: handle redirects?
    request.on('response', response => {
      if (settle() === false) return
      if (params.asStream === true) {
        callback(null, response)
      } else {
        callback(null, decompressResponse(response))
      }
    })

    // handles request timeout
    request.on('timeout', () => {
      if (settle() === false) return
      // The request is already settled here, so the 'abort' listener
      // below will not touch the counter a second time.
      request.abort()
      callback(new TimeoutError('Request timed out', params), null)
    })

    // handles request error
    request.on('error', err => {
      if (settle() === false) return
      callback(err, null)
    })

    // updates the ended state
    request.on('abort', () => {
      debug('Request aborted', params)
      settle()
    })

    // Disables the Nagle algorithm
    request.setNoDelay(true)

    // starts the request
    if (isStream(params.body) === true) {
      pump(params.body, request, err => {
        /* istanbul ignore if */
        if (err != null && settle() === true) {
          callback(err, null)
        }
      })
    } else {
      request.end(params.body)
    }

    return request
  }

  // TODO: write a better closing logic
  /**
   * Destroys the agent once no request is in flight. While requests are
   * still open it checks again every second.
   *
   * @param {function} [callback] - called with no arguments after the agent
   *   has been destroyed.
   */
  close (callback = () => {}) {
    debug('Closing connection', this.id)
    if (this._openRequests > 0) {
      setTimeout(() => this.close(callback), 1000)
    } else {
      this.agent.destroy()
      callback()
    }
  }

  /**
   * Enables or disables a role on this connection.
   *
   * @param {string} role - one of the values of `Connection.roles`.
   * @param {boolean} enabled
   * @returns {Connection} this instance, to allow chaining.
   * @throws {ConfigurationError} on an unknown role or a non-boolean `enabled`.
   */
  setRole (role, enabled) {
    if (!validRoles.includes(role)) {
      throw new ConfigurationError(`Unsupported role: '${role}'`)
    }
    if (typeof enabled !== 'boolean') {
      throw new ConfigurationError('enabled should be a boolean')
    }

    this.roles[role] = enabled
    return this
  }

  get status () {
    return this._status
  }

  // Only the values listed in `Connection.statuses` are accepted;
  // anything else fails the assertion.
  set status (status) {
    assert(
      validStatuses.includes(status),
      `Unsupported status: '${status}'`
    )
    this._status = status
  }

  /**
   * Builds the options object handed to `http(s).request`.
   *
   * It starts from the connection url, default headers and agent, then
   * applies `params` key by key:
   * - `path` is joined to the url pathname,
   * - a truthy `querystring` is appended to the url search string,
   * - `headers` are merged over the connection headers,
   * - every other key is copied as is.
   *
   * @param {object} params
   * @returns {object} request options; `path` is `pathname + search`.
   */
  buildRequestObject (params) {
    const url = this.url
    const hasCredentials = Boolean(url.username) || Boolean(url.password)
    const request = {
      protocol: url.protocol,
      // IPv6 hostnames come wrapped in square brackets, strip them
      hostname: url.hostname[0] === '['
        ? url.hostname.slice(1, -1)
        : url.hostname,
      hash: url.hash,
      search: url.search,
      pathname: url.pathname,
      path: '',
      href: url.href,
      origin: url.origin,
      port: url.port,
      headers: this.headers,
      auth: hasCredentials
        ? `${url.username}:${url.password}`
        : undefined,
      agent: this.agent
    }

    for (const key of Object.keys(params)) {
      const value = params[key]
      if (key === 'path') {
        request.pathname = resolve(request.pathname, value)
      } else if (key === 'querystring' && Boolean(value) === true) {
        request.search += (request.search === '' ? '?' : '&') + value
      } else if (key === 'headers') {
        request.headers = Object.assign({}, request.headers, params.headers)
      } else {
        request[key] = value
      }
    }

    request.path = request.pathname + request.search

    return request
  }

  // Handles console.log and util.inspect invocations.
  // We want to hide `agent` and `ssl` since they made
  // the logs very hard to read. The user can still
  // access them with `instance.agent` and `instance.ssl`.
  [inspect.custom] (depth, options) {
    return {
      url: this.url,
      id: this.id,
      headers: this.headers,
      deadCount: this.deadCount,
      resurrectTimeout: this.resurrectTimeout,
      _openRequests: this._openRequests,
      status: this.status,
      roles: this.roles
    }
  }
}
// Values accepted by the `status` setter of a connection.
Connection.statuses = { ALIVE: 'alive', DEAD: 'dead' }

// Role names accepted by `setRole`.
Connection.roles = { MASTER: 'master', DATA: 'data', INGEST: 'ingest', ML: 'ml' }

// Roles given to every new connection before `opts.roles` is applied:
// everything is enabled except machine learning.
const defaultRoles = {}
defaultRoles[Connection.roles.MASTER] = true
defaultRoles[Connection.roles.DATA] = true
defaultRoles[Connection.roles.INGEST] = true
defaultRoles[Connection.roles.ML] = false

// Flat lists of the values above, used for validation.
const validStatuses = Object.keys(Connection.statuses)
  .map(name => Connection.statuses[name])
const validRoles = Object.keys(Connection.roles)
  .map(name => Connection.roles[name])
/**
 * Removes the credentials (`user:password@`) from a URL string.
 *
 * Only the authority component (what sits between `//` and the first
 * `/`, `?` or `#`) is inspected, so an `@` that appears in the path,
 * query or fragment is left untouched.
 *
 * @param {string} url
 * @returns {string} the url without credentials, or the url unchanged
 *   when it has none.
 */
function stripAuth (url) {
  const schemeSeparator = url.indexOf('//')
  const authorityStart = schemeSeparator === -1 ? 0 : schemeSeparator + 2

  const afterScheme = url.slice(authorityStart)
  const authorityLength = afterScheme.search(/[/?#]/)
  const authority = authorityLength === -1
    ? afterScheme
    : afterScheme.slice(0, authorityLength)

  // The last '@' of the authority separates the credentials from the host
  const separator = authority.lastIndexOf('@')
  if (separator === -1) return url

  return url.slice(0, authorityStart) + url.slice(authorityStart + separator + 1)
}
/**
 * Tells whether a value can be piped, i.e. it is neither `null` nor
 * `undefined` and exposes a `pipe` function.
 *
 * @param {*} obj
 * @returns {boolean}
 */
function isStream (obj) {
  if (obj === null || obj === undefined) {
    return false
  }
  return typeof obj.pipe === 'function'
}
/**
 * Joins two path fragments so that exactly one slash separates them.
 *
 * @param {string} host - left-hand fragment (the base pathname).
 * @param {string} path - right-hand fragment.
 * @returns {string} the joined path.
 */
function resolve (host, path) {
  const trailingSlash = host[host.length - 1] === '/'
  const leadingSlash = path[0] === '/'

  // Both sides bring a slash: drop the one from `path`
  if (trailingSlash && leadingSlash) {
    return host + path.slice(1)
  }
  // Exactly one side brings a slash: plain concatenation is enough
  if (trailingSlash || leadingSlash) {
    return host + path
  }
  // Neither side brings a slash: add it
  return host + '/' + path
}
module.exports = Connection