From e1b80882af37d24fd9eac125d3dc7ae28aa2d931 Mon Sep 17 00:00:00 2001 From: delvedor Date: Thu, 18 Oct 2018 17:27:31 +0200 Subject: [PATCH] Initial prototype --- index.js | 79 +++++++++++++++ lib/Connection.js | 83 ++++++++++++++++ lib/ConnectionPool.js | 223 ++++++++++++++++++++++++++++++++++++++++++ lib/Selectors.js | 23 +++++ lib/Serializer.js | 28 ++++++ lib/Transport.js | 126 ++++++++++++++++++++++++ lib/errors.js | 76 ++++++++++++++ lib/symbols.js | 15 +++ 8 files changed, 653 insertions(+) create mode 100644 index.js create mode 100644 lib/Connection.js create mode 100644 lib/ConnectionPool.js create mode 100644 lib/Selectors.js create mode 100644 lib/Serializer.js create mode 100644 lib/Transport.js create mode 100644 lib/errors.js create mode 100644 lib/symbols.js diff --git a/index.js b/index.js new file mode 100644 index 000000000..5691a6638 --- /dev/null +++ b/index.js @@ -0,0 +1,79 @@ +'use strict' + +const Transport = require('./lib/Transport') +const Connection = require('./lib/Connection') +const ConnectionPool = require('./lib/ConnectionPool') +const Serializer = require('./lib/Serializer') +const selectors = require('./lib/Selectors') +const symbols = require('./lib/symbols') +const { BadConfigurationError } = require('./lib/errors') + +// const buildApi = require('../monorepo/packages/es-api-6') + +const { + kTransport, + kConnectionPool, + kSerializer, + kSelector +} = symbols + +class Client { + constructor (opts = {}) { + if (!opts.host) { + throw new BadConfigurationError('Missing host option') + } + + // if (opts.log) { + // this.on('response', console.log) + // this.on('error', console.log) + // } + + const Selector = selectors.RoundRobinSelector + const options = Object.assign({}, { + Connection, + ConnectionPool, + Transport, + Serializer, + Selector, + maxRetries: 3, + requestTimeout: 30000, + sniffInterval: false, + sniffOnStart: false + }, opts) + + this[kSelector] = new options.Selector() + this[kSerializer] = new options.Serializer() + this[kConnectionPool] = new options.ConnectionPool({ + selector: this[kSelector] + }) + this[kTransport] = new options.Transport({ + connectionPool: this[kConnectionPool], + serializer: this[kSerializer], + maxRetries: options.maxRetries, + requestTimeout: options.requestTimeout, + sniffInterval: options.sniffInterval, + sniffOnStart: options.sniffOnStart + }) + + this.request = this[kTransport].request.bind(this[kTransport]) + + // const apis = buildApi({ + // makeRequest: this[kTransport].request.bind(this[kTransport]) + // }) + + // Object.keys(apis).forEach(api => { + // this[api] = apis[api] + // }) + + this[kConnectionPool].addConnection(options.host) + } +} + +module.exports = { + Client, + Transport, + ConnectionPool, + Serializer, + selectors, + symbols +} diff --git a/lib/Connection.js b/lib/Connection.js new file mode 100644 index 000000000..8e9d74386 --- /dev/null +++ b/lib/Connection.js @@ -0,0 +1,83 @@ +'use strict' + +const assert = require('assert') +const debug = require('debug')('elasticsearch') +const { resolve } = require('url') +const makeRequest = require('simple-get') + +class Connection { + constructor (opts = {}) { + assert(opts.url, 'Missing url') + + this.url = opts.url + this.id = opts.id || opts.url + this.deadCount = 0 + this.resurrectTimeout = 0 + + this._status = opts.status || Connection.statuses.ALIVE + this.roles = opts.roles || defaultRoles + } + + request (params, callback) { + params.url = resolve(this.url, params.path) + debug('Starting a new request', params) + return makeRequest(params, callback) + } + + close () { + debug('Closing connection') + } + + setRole (role, enabled) { + assert( + ~validRoles.indexOf(role), + `Unsupported role: '${role}'` + ) + assert( + typeof enabled === 'boolean', + 'enabled should be a boolean' + ) + this.roles[role] = enabled + return this + } + + get status () { + return this._status + } + + set status (status) { + assert( + ~validStatuses.indexOf(status), + `Unsupported status: '${status}'` + ) + this._status = status + } +} + +Connection.statuses = { + ALIVE: 'alive', + DEAD: 'dead' +} + +Connection.roles = { + MASTER: 'master', + DATA: 'data', + INGEST: 'ingest', + COORDINATING: 'coordinating', + MACHINE_LEARNING: 'machine_learning' +} + +const defaultRoles = { + [Connection.roles.MASTER]: true, + [Connection.roles.DATA]: true, + [Connection.roles.INGEST]: true, + [Connection.roles.COORDINATING]: true, + [Connection.roles.MACHINE_LEARNING]: true +} + +const validStatuses = Object.keys(Connection.statuses) + .map(k => Connection.statuses[k]) +const validRoles = Object.keys(Connection.roles) + .map(k => Connection.roles[k]) + +module.exports = Connection diff --git a/lib/ConnectionPool.js b/lib/ConnectionPool.js new file mode 100644 index 000000000..a1a1c3637 --- /dev/null +++ b/lib/ConnectionPool.js @@ -0,0 +1,223 @@ +'use strict' + +const debug = require('debug')('elasticsearch') +const Connection = require('./Connection') + +class ConnectionPool { + constructor (opts = {}) { + this.connections = new Map() + this.alive = [] + this.dead = [] + this.selector = opts.selector + // the resurrect timeout is 60s + // we multiply it by 2 because the resurrect formula is + // `Math.pow(resurrectTimeout * 2, deadCount -1)` + // and we don't need to multiply by 2 + // the resurrectTimeout every time + this.resurrectTimeout = 1000 * 60 * 2 + // number of consecutive failures after which + // the timeout doesn't increase + this.resurrectTimeoutCutoff = 5 + } + + /** + * Marks a connection as 'alive'. + * If needed moves the connection from the dead list + * to the alive list and then resets the `deadCount`. + * + * @param {object} connection + */ + markAlive (connection) { + const { id } = connection + debug(`Marking as 'alive' connection '${id}'`) + if (this.alive.indexOf(id) === -1) { + this.alive.push(id) + const index = this.dead.indexOf(id) + if (index > -1) this.dead.splice(index, 1) + } + connection.status = Connection.statuses.ALIVE + connection.deadCount = 0 + connection.resurrectTimeout = 0 + this.connections.set(id, connection) + } + + /** + * Marks a connection as 'dead'. + * If needed moves the connection from the alive list + * to the dead list and then increments the `deadCount`. + * + * @param {object} connection + */ + markDead (connection) { + const { id } = connection + debug(`Marking as 'dead' connection '${id}'`) + if (this.dead.indexOf(id) === -1) { + this.dead.push(id) + const index = this.alive.indexOf(id) + if (index > -1) this.alive.splice(index, 1) + } + connection.status = Connection.statuses.DEAD + connection.deadCount++ + // resurrectTimeout formula: + // `Math.pow(resurrectTimeout * 2, deadCount -1)` + // we don't need to multiply the resurrectTimeout by 2 + // every time, it is cached during the initialization + connection.resurrectTimeout = Date.now() + Math.pow( + this.resurrectTimeout, + Math.min( + connection.deadCount - 1, + this.resurrectTimeoutCutoff + ) + ) + this.connections.set(id, connection) + + // sort the dead list in ascending order + // based on the resurrectTimeout + this.dead.sort((a, b) => { + const conn1 = this.connections.get(a) + const conn2 = this.connections.get(b) + return conn1.resurrectTimeout - conn2.resurrectTimeout + }) + } + + /** + * Tries to resurrect a connection if the `resurrectTimeout` + * has been reached, if so, it moves the connection to the + * alive list without resetting the `deadCount` or the `resurrectTimeout` + * + * @param {number} epoch + * @returns {object} connection + */ + resurrect (now = Date.now()) { + if (this.dead.length === 0) return + + // the dead list is sorted in ascending order based on the timeout + // so the first element will always be the one with the smalles timeout + const connection = this.connections.get(this.dead[0]) + if (now < connection.resurrectTimeout) { + debug('Nothing to resurrect') + return + } + + const { id } = connection + debug(`Trying resurrect connection '${id}'`) + this.alive.push(id) + this.dead.splice(this.dead.indexOf(id), 1) + + connection.status = Connection.statuses.ALIVE + this.connections.set(id, connection) + return connection + } + + /** + * Returns an alive connection if present, + * otherwise returns null. + * It uses the selector to choose which + * connection return. + * + * @returns {object|null} connection + */ + getConnection () { + if (this.alive.length === 0) { + return null + } + + const id = this.selector.select(this.alive) + return this.connections.get(id) + } + + /** + * Adds a new connection to the pool. + * + * @param {object|string} host + * @returns {ConnectionPool} + */ + addConnection (host) { + if (Array.isArray(host)) { + host.forEach(h => this.addConnection(h)) + return + } + + if (typeof host === 'string') { + host = this.urlToHost(host) + } + const connection = new Connection(host) + debug('Adding a new connection', connection) + this.connections.set(connection.id, connection) + this.alive.push(connection.id) + return this + } + + /** + * Removes a new connection to the pool. + * + * @param {object} connection + * @returns {ConnectionPool} + */ + removeConnection (connection) { + debug('Removing connection', connection) + const { id } = connection + this.connections.delete(id) + var index = this.dead.indexOf(id) + if (index > -1) this.dead.splice(index, 1) + index = this.alive.indexOf(id) + if (index > -1) this.alive.splice(index, 1) + return this + } + + /** + * Empties the connection pool. + * + * @returns {ConnectionPool} + */ + empty () { + debug('Emptying the connection pool') + this.connections.forEach(connection => { + connection.close() + }) + this.connections = new Map() + this.alive = [] + this.dead = [] + return this + } + + /** + * Transforms the nodes objects to a host object. + * + * @param {object} nodes + * @returns {array} hosts + */ + nodesToHost (nodes) { + const ids = Object.keys(nodes) + const hosts = [] + + for (var i = 0, len = ids.length; i < len; i++) { + const node = nodes[ids[i]] + hosts.push({ + url: node.http.publish_address, + id: ids[i], + roles: node.roles.reduce((acc, role) => { + acc[role] = true + return acc + }, {}) + }) + } + + return hosts + } + + /** + * Transforms an url string to a host object + * + * @param {string} url + * @returns {object} host + */ + urlToHost (url) { + return { + id: url, + url + } + } +} + +module.exports = ConnectionPool diff --git a/lib/Selectors.js b/lib/Selectors.js new file mode 100644 index 000000000..3da41a6de --- /dev/null +++ b/lib/Selectors.js @@ -0,0 +1,23 @@ +'use strict' + +class RoundRobinSelector { + constructor () { + this.current = -1 + } + + select (connections) { + if (++this.current >= connections.length) { + this.current = 0 + } + return connections[this.current] + } +} + +class RandomSelector { + select (connections) { + const index = Math.floor(Math.random() * connections.length) + return connections[index] + } +} + +module.exports = { RoundRobinSelector, RandomSelector } diff --git a/lib/Serializer.js b/lib/Serializer.js new file mode 100644 index 000000000..ea37ed513 --- /dev/null +++ b/lib/Serializer.js @@ -0,0 +1,28 @@ +'use strict' + +const debug = require('debug')('elasticsearch') +const { SerializationError, DeserializationError } = require('./errors') + +class Serializer { + serialize (object) { + debug('Serializing', object) + try { + var json = JSON.stringify(object) + } catch (err) { + throw new SerializationError(err.message) + } + return json + } + + deserialize (json) { + debug('Deserializing', json) + try { + var object = JSON.parse(json) + } catch (err) { + throw new DeserializationError(err.message) + } + return object + } +} + +module.exports = Serializer diff --git a/lib/Transport.js b/lib/Transport.js new file mode 100644 index 000000000..eb8531c3f --- /dev/null +++ b/lib/Transport.js @@ -0,0 +1,126 @@ +'use strict' + +const debug = require('debug')('elasticsearch') +const once = require('once') +const { + ConnectionError, + TimeoutError, + NoLivingConnectionsError, + ResponseError +} = require('./errors') + +const noop = () => {} +const kRemainingAttempts = Symbol('elasticsearch-remaining-attempts') + +class Transport { + constructor (opts = {}) { + this.connectionPool = opts.connectionPool + this.serializer = opts.serializer + this.maxRetries = opts.maxRetries + this.requestTimeout = opts.requestTimeout + this.sniffInterval = opts.sniffInterval + + this._sniffEnabled = typeof this.sniffInterval === 'number' + this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0 + this._isSniffing = false + + if (opts.sniffOnStart === true) { + this.sniff() + } + } + + request (params, callback) { + callback = once(callback) + const attempts = params[kRemainingAttempts] || params.maxRetries || this.maxRetries + const connection = this.getConnection() + if (connection === null) { + return callback(new NoLivingConnectionsError('There are not living connections')) + } + + if (params.body !== null) { + try { + params.body = this.serializer.serialize(params.body) + } catch (err) { + return callback(err) + } + params.headers = params.headers || {} + params.headers['Content-Type'] = 'application/json' + params.headers['Content-Length'] = '' + Buffer.byteLength(params.body) + } + + const request = connection.request(params, (err, response) => { + if (err != null) { + this.connectionPool.markDead(connection) + if (attempts > 0) { + debug(`Retrying request, there are still ${attempts} attempts`, params) + params[kRemainingAttempts] = attempts - 1 + return this.request(params, callback) + } + + if (err.message === 'Request timed out') { + return callback(new TimeoutError(err.message)) + } else { + return callback(new ConnectionError(err.message)) + } + } + + var json = '' + response.setEncoding('utf8') + response.on('data', chunk => { json += chunk }) + response.on('error', err => callback(err)) + response.on('end', () => { + this.connectionPool.markAlive(connection) + try { + var payload = this.serializer.deserialize(json) + } catch (err) { + return callback(err) + } + const { statusCode } = response + if (statusCode >= 400) { + callback(new ResponseError(payload)) + } else { + callback(null, payload) + } + }) + }) + + return function requestAborter () { + request.abort() + debug('Request aborted', params) + } + } + + getConnection () { + const now = Date.now() + if (this._sniffEnabled === true && now > this._nextSniff) { + this.sniff(now) + } + this.connectionPool.resurrect(now) + return this.connectionPool.getConnection() + } + + sniff (now = Date.now(), callback = noop) { + if (this._isSniffing === true) return + debug('Started sniffing request') + this.request({ + method: 'GET', + path: '_nodes/_all/http' + }, (err, body) => { + if (this._sniffEnabled === true) { + this._nextSniff = now + this.sniffInterval + } + if (err) { + debug('Siffing errored', err) + return callback(err) + } + debug('Siffing ended successfully', body) + const hosts = this.connectionPool.nodesToHost(body.nodes) + this.connectionPool + .empty() + .addConnection(hosts) + callback() + }) + } +} + +module.exports = Transport diff --git a/lib/errors.js b/lib/errors.js new file mode 100644 index 000000000..38c82eb67 --- /dev/null +++ b/lib/errors.js @@ -0,0 +1,76 @@ +'use strict' + +class BadConfigurationError extends Error { + constructor (message) { + super() + Error.captureStackTrace(this, BadConfigurationError) + this.name = 'BadConfigurationError' + this.message = message || 'Bad Configuration Error' + } +} + +class TimeoutError extends Error { + constructor (message) { + super() + Error.captureStackTrace(this, TimeoutError) + this.name = 'TimeoutError' + this.message = message || 'Timeout Error' + } +} + +class ConnectionError extends Error { + constructor (message) { + super() + Error.captureStackTrace(this, ConnectionError) + this.name = 'ConnectionError' + this.message = message || 'Connection Error' + } +} + +class NoLivingConnectionsError extends Error { + constructor (message) { + super() + Error.captureStackTrace(this, NoLivingConnectionsError) + this.name = 'NoLivingConnectionsError' + this.message = message || 'No Living Connections Error' + } +} + +class SerializationError extends Error { + constructor (message) { + super() + Error.captureStackTrace(this, SerializationError) + this.name = 'SerializationError' + this.message = message || 'Serialization Error' + } +} + +class DeserializationError extends Error { + constructor (message) { + super() + Error.captureStackTrace(this, DeserializationError) + this.name = 'DeserializationError' + this.message = message || 'Deserialization Error' + } +} + +class ResponseError extends Error { + constructor (err) { + super() + Error.captureStackTrace(this, ResponseError) + this.name = 'ResponseError' + this.message = (err && err.error && err.error.type) || 'Response Error' + this.response = err + this.statusCode = err && err.status + } +} + +module.exports = { + BadConfigurationError, + TimeoutError, + ConnectionError, + NoLivingConnectionsError, + SerializationError, + DeserializationError, + ResponseError +} diff --git a/lib/symbols.js b/lib/symbols.js new file mode 100644 index 000000000..313b6d4ce --- /dev/null +++ b/lib/symbols.js @@ -0,0 +1,15 @@ +'use strict' + +const kTransport = Symbol('elasticsearch-transport') +const kConnection = Symbol('elasticsearch-connection') +const kConnectionPool = Symbol('elasticsearch-connection-pool') +const kSerializer = Symbol('elasticsearch-serializer') +const kSelector = Symbol('elasticsearch-selector') + +module.exports = { + kTransport, + kConnection, + kConnectionPool, + kSerializer, + kSelector +}