/* * Licensed to Elasticsearch B.V. under one or more contributor * license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright * ownership. Elasticsearch B.V. licenses this file to you under * the Apache License, Version 2.0 (the "License"); you may * not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ 'use strict' const assert = require('assert') const { inspect } = require('util') const http = require('http') const https = require('https') const debug = require('debug')('elasticsearch') const decompressResponse = require('decompress-response') const pump = require('pump') const INVALID_PATH_REGEX = /[^\u0021-\u00ff]/ const { TimeoutError, ConfigurationError } = require('./errors') class Connection { constructor (opts = {}) { this.url = opts.url this.ssl = opts.ssl || null this.id = opts.id || stripAuth(opts.url.href) this.headers = opts.headers || null this.deadCount = 0 this.resurrectTimeout = 0 this._openRequests = 0 this._status = opts.status || Connection.statuses.ALIVE this.roles = Object.assign({}, defaultRoles, opts.roles) if (!['http:', 'https:'].includes(this.url.protocol)) { throw new ConfigurationError(`Invalid protocol: '${this.url.protocol}'`) } if (typeof opts.agent === 'function') { this.agent = opts.agent() } else { const keepAliveFalse = opts.agent && opts.agent.keepAlive === false const agentOptions = Object.assign({}, { keepAlive: true, keepAliveMsecs: 1000, maxSockets: keepAliveFalse ? Infinity : 256, maxFreeSockets: 256 }, opts.agent) this.agent = this.url.protocol === 'http:' ? new http.Agent(agentOptions) : new https.Agent(Object.assign({}, agentOptions, this.ssl)) } this.makeRequest = this.url.protocol === 'http:' ? http.request : https.request } request (params, callback) { this._openRequests++ var ended = false const requestParams = this.buildRequestObject(params) // https://github.com/nodejs/node/commit/b961d9fd83 if (INVALID_PATH_REGEX.test(requestParams.path) === true) { callback(new TypeError(`ERR_UNESCAPED_CHARACTERS: ${requestParams.path}`), null) return { abort: () => {} } } debug('Starting a new request', params) const request = this.makeRequest(requestParams) // listen for the response event // TODO: handle redirects? request.on('response', response => { if (ended === false) { ended = true this._openRequests-- if (params.asStream === true) { callback(null, response) } else { callback(null, decompressResponse(response)) } } }) // handles request timeout request.on('timeout', () => { if (ended === false) { ended = true this._openRequests-- request.abort() callback(new TimeoutError('Request timed out', params), null) } }) // handles request error request.on('error', err => { if (ended === false) { ended = true this._openRequests-- callback(err, null) } }) // updates the ended state request.on('abort', () => { debug('Request aborted', params) if (ended === false) { ended = true this._openRequests-- } }) // Disables the Nagle algorithm request.setNoDelay(true) // starts the request if (isStream(params.body) === true) { pump(params.body, request, err => { /* istanbul ignore if */ if (err != null && ended === false) { ended = true this._openRequests-- callback(err, null) } }) } else { request.end(params.body) } return request } // TODO: write a better closing logic close (callback = () => {}) { debug('Closing connection', this.id) if (this._openRequests > 0) { setTimeout(() => this.close(callback), 1000) } else { this.agent.destroy() callback() } } setRole (role, enabled) { if (validRoles.indexOf(role) === -1) { throw new ConfigurationError(`Unsupported role: '${role}'`) } if (typeof enabled !== 'boolean') { throw new ConfigurationError('enabled should be a boolean') } this.roles[role] = enabled return this } get status () { return this._status } set status (status) { assert( ~validStatuses.indexOf(status), `Unsupported status: '${status}'` ) this._status = status } buildRequestObject (params) { const url = this.url const request = { protocol: url.protocol, hostname: url.hostname[0] === '[' ? url.hostname.slice(1, -1) : url.hostname, hash: url.hash, search: url.search, pathname: url.pathname, path: '', href: url.href, origin: url.origin, port: url.port, headers: this.headers, auth: !!url.username === true || !!url.password === true ? `${url.username}:${url.password}` : undefined, agent: this.agent } const paramsKeys = Object.keys(params) for (var i = 0, len = paramsKeys.length; i < len; i++) { var key = paramsKeys[i] if (key === 'path') { request.pathname = resolve(request.pathname, params[key]) } else if (key === 'querystring' && !!params[key] === true) { if (request.search === '') { request.search = '?' + params[key] } else { request.search += '&' + params[key] } } else if (key === 'headers') { request.headers = Object.assign({}, request.headers, params.headers) } else { request[key] = params[key] } } request.path = request.pathname + request.search return request } // Handles console.log and utils.inspect invocations. // We want to hide `agent` and `ssl` since they made // the logs very hard to read. The user can still // access them with `instance.agent` and `instance.ssl`. [inspect.custom] (depth, options) { return { url: this.url, id: this.id, headers: this.headers, deadCount: this.deadCount, resurrectTimeout: this.resurrectTimeout, _openRequests: this._openRequests, status: this.status, roles: this.roles } } } Connection.statuses = { ALIVE: 'alive', DEAD: 'dead' } Connection.roles = { MASTER: 'master', DATA: 'data', INGEST: 'ingest', ML: 'ml' } const defaultRoles = { [Connection.roles.MASTER]: true, [Connection.roles.DATA]: true, [Connection.roles.INGEST]: true, [Connection.roles.ML]: false } const validStatuses = Object.keys(Connection.statuses) .map(k => Connection.statuses[k]) const validRoles = Object.keys(Connection.roles) .map(k => Connection.roles[k]) function stripAuth (url) { if (url.indexOf('@') === -1) return url return url.slice(0, url.indexOf('//') + 2) + url.slice(url.indexOf('@') + 1) } function isStream (obj) { return obj != null && typeof obj.pipe === 'function' } function resolve (host, path) { const hostEndWithSlash = host[host.length - 1] === '/' const pathStartsWithSlash = path[0] === '/' if (hostEndWithSlash === true && pathStartsWithSlash === true) { return host + path.slice(1) } else if (hostEndWithSlash !== pathStartsWithSlash) { return host + path } else { return host + '/' + path } } module.exports = Connection