Initial prototype

This commit is contained in:
delvedor
2018-10-18 17:27:31 +02:00
parent 7dbb9214d2
commit e1b80882af
8 changed files with 653 additions and 0 deletions

79
index.js Normal file
View File

@ -0,0 +1,79 @@
'use strict'
const Transport = require('./lib/Transport')
const Connection = require('./lib/Connection')
const ConnectionPool = require('./lib/ConnectionPool')
const Serializer = require('./lib/Serializer')
const selectors = require('./lib/Selectors')
const symbols = require('./lib/symbols')
const { BadConfigurationError } = require('./lib/errors')
// const buildApi = require('../monorepo/packages/es-api-6')
const {
kTransport,
kConnectionPool,
kSerializer,
kSelector
} = symbols
class Client {
constructor (opts = {}) {
if (!opts.host) {
throw new BadConfigurationError('Missing host option')
}
// if (opts.log) {
// this.on('response', console.log)
// this.on('error', console.log)
// }
const Selector = selectors.RoundRobinSelector
const options = Object.assign({}, {
Connection,
ConnectionPool,
Transport,
Serializer,
Selector,
maxRetries: 3,
requestTimeout: 30000,
sniffInterval: false,
sniffOnStart: false
}, opts)
this[kSelector] = new options.Selector()
this[kSerializer] = new options.Serializer()
this[kConnectionPool] = new options.ConnectionPool({
selector: this[kSelector]
})
this[kTransport] = new options.Transport({
connectionPool: this[kConnectionPool],
serializer: this[kSerializer],
maxRetries: options.maxRetries,
requestTimeout: options.requestTimeout,
sniffInterval: options.sniffInterval,
sniffOnStart: options.sniffOnStart
})
this.request = this[kTransport].request.bind(this[kTransport])
// const apis = buildApi({
// makeRequest: this[kTransport].request.bind(this[kTransport])
// })
// Object.keys(apis).forEach(api => {
// this[api] = apis[api]
// })
this[kConnectionPool].addConnection(options.host)
}
}
module.exports = {
Client,
Transport,
ConnectionPool,
Serializer,
selectors,
symbols
}

83
lib/Connection.js Normal file
View File

@ -0,0 +1,83 @@
'use strict'
const assert = require('assert')
const debug = require('debug')('elasticsearch')
const { resolve } = require('url')
const makeRequest = require('simple-get')
class Connection {
constructor (opts = {}) {
assert(opts.url, 'Missing url')
this.url = opts.url
this.id = opts.id || opts.url
this.deadCount = 0
this.resurrectTimeout = 0
this._status = opts.status || Connection.statuses.ALIVE
this.roles = opts.roles || defaultRoles
}
request (params, callback) {
params.url = resolve(this.url, params.path)
debug('Starting a new request', params)
return makeRequest(params, callback)
}
close () {
debug('Closing connection')
}
setRole (role, enabled) {
assert(
~validRoles.indexOf(role),
`Unsupported role: '${role}'`
)
assert(
typeof enabled === 'boolean',
'enabled should be a boolean'
)
this.roles[role] = enabled
return this
}
get status () {
return this._status
}
set status (status) {
assert(
~validStatuses.indexOf(status),
`Unsupported status: '${status}'`
)
this._status = status
}
}
Connection.statuses = {
ALIVE: 'alive',
DEAD: 'dead'
}
Connection.roles = {
MASTER: 'master',
DATA: 'data',
INGEST: 'ingest',
COORDINATING: 'coordinating',
MACHINE_LEARNING: 'machine_learning'
}
const defaultRoles = {
[Connection.roles.MASTER]: true,
[Connection.roles.DATA]: true,
[Connection.roles.INGEST]: true,
[Connection.roles.COORDINATING]: true,
[Connection.roles.MACHINE_LEARNING]: true
}
const validStatuses = Object.keys(Connection.statuses)
.map(k => Connection.statuses[k])
const validRoles = Object.keys(Connection.roles)
.map(k => Connection.roles[k])
module.exports = Connection

223
lib/ConnectionPool.js Normal file
View File

@ -0,0 +1,223 @@
'use strict'
const debug = require('debug')('elasticsearch')
const Connection = require('./Connection')
class ConnectionPool {
constructor (opts = {}) {
this.connections = new Map()
this.alive = []
this.dead = []
this.selector = opts.selector
// the resurrect timeout is 60s
// we multiply it by 2 because the resurrect formula is
// `Math.pow(resurrectTimeout * 2, deadCount -1)`
// and we don't need to multiply by 2
// the resurrectTimeout every time
this.resurrectTimeout = 1000 * 60 * 2
// number of consecutive failures after which
// the timeout doesn't increase
this.resurrectTimeoutCutoff = 5
}
/**
* Marks a connection as 'alive'.
* If needed moves the connection from the dead list
* to the alive list and then resets the `deadCount`.
*
* @param {object} connection
*/
markAlive (connection) {
const { id } = connection
debug(`Marking as 'alive' connection '${id}'`)
if (this.alive.indexOf(id) === -1) {
this.alive.push(id)
const index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
}
connection.status = Connection.statuses.ALIVE
connection.deadCount = 0
connection.resurrectTimeout = 0
this.connections.set(id, connection)
}
/**
* Marks a connection as 'dead'.
* If needed moves the connection from the alive list
* to the dead list and then increments the `deadCount`.
*
* @param {object} connection
*/
markDead (connection) {
const { id } = connection
debug(`Marking as 'dead' connection '${id}'`)
if (this.dead.indexOf(id) === -1) {
this.dead.push(id)
const index = this.alive.indexOf(id)
if (index > -1) this.alive.splice(index, 1)
}
connection.status = Connection.statuses.DEAD
connection.deadCount++
// resurrectTimeout formula:
// `Math.pow(resurrectTimeout * 2, deadCount -1)`
// we don't need to multiply the resurrectTimeout by 2
// every time, it is cached during the initialization
connection.resurrectTimeout = Date.now() + Math.pow(
this.resurrectTimeout,
Math.min(
connection.deadCount - 1,
this.resurrectTimeoutCutoff
)
)
this.connections.set(id, connection)
// sort the dead list in ascending order
// based on the resurrectTimeout
this.dead.sort((a, b) => {
const conn1 = this.connections.get(a)
const conn2 = this.connections.get(b)
return conn1.resurrectTimeout - conn2.resurrectTimeout
})
}
/**
* Tries to resurrect a connection if the `resurrectTimeout`
* has been reached, if so, it moves the connection to the
* alive list without resetting the `deadCount` or the `resurrectTimeout`
*
* @param {number} epoch
* @returns {object} connection
*/
resurrect (now = Date.now()) {
if (this.dead.length === 0) return
// the dead list is sorted in ascending order based on the timeout
// so the first element will always be the one with the smalles timeout
const connection = this.connections.get(this.dead[0])
if (now < connection.resurrectTimeout) {
debug('Nothing to resurrect')
return
}
const { id } = connection
debug(`Trying resurrect connection '${id}'`)
this.alive.push(id)
this.dead.splice(this.dead.indexOf(id), 1)
connection.status = Connection.statuses.ALIVE
this.connections.set(id, connection)
return connection
}
/**
* Returns an alive connection if present,
* otherwise returns null.
* It uses the selector to choose which
* connection return.
*
* @returns {object|null} connection
*/
getConnection () {
if (this.alive.length === 0) {
return null
}
const id = this.selector.select(this.alive)
return this.connections.get(id)
}
/**
* Adds a new connection to the pool.
*
* @param {object|string} host
* @returns {ConnectionPool}
*/
addConnection (host) {
if (Array.isArray(host)) {
host.forEach(h => this.addConnection(h))
return
}
if (typeof host === 'string') {
host = this.urlToHost(host)
}
const connection = new Connection(host)
debug('Adding a new connection', connection)
this.connections.set(connection.id, connection)
this.alive.push(connection.id)
return this
}
/**
* Removes a new connection to the pool.
*
* @param {object} connection
* @returns {ConnectionPool}
*/
removeConnection (connection) {
debug('Removing connection', connection)
const { id } = connection
this.connections.delete(id)
var index = this.dead.indexOf(id)
if (index > -1) this.dead.splice(index, 1)
index = this.alive.indexOf(id)
if (index > -1) this.alive.splice(index, 1)
return this
}
/**
* Empties the connection pool.
*
* @returns {ConnectionPool}
*/
empty () {
debug('Emptying the connection pool')
this.connections.forEach(connection => {
connection.close()
})
this.connections = new Map()
this.alive = []
this.dead = []
return this
}
/**
* Transforms the nodes objects to a host object.
*
* @param {object} nodes
* @returns {array} hosts
*/
nodesToHost (nodes) {
const ids = Object.keys(nodes)
const hosts = []
for (var i = 0, len = ids.length; i < len; i++) {
const node = nodes[ids[i]]
hosts.push({
url: node.http.publish_address,
id: ids[i],
roles: node.roles.reduce((acc, role) => {
acc[role] = true
return acc
}, {})
})
}
return hosts
}
/**
* Transforms an url string to a host object
*
* @param {string} url
* @returns {object} host
*/
urlToHost (url) {
return {
id: url,
url
}
}
}
module.exports = ConnectionPool

23
lib/Selectors.js Normal file
View File

@ -0,0 +1,23 @@
'use strict'
class RoundRobinSelector {
constructor () {
this.current = -1
}
select (connections) {
if (++this.current >= connections.length) {
this.current = 0
}
return connections[this.current]
}
}
class RandomSelector {
select (connections) {
const index = Math.floor(Math.random() * connections.length)
return connections[index]
}
}
module.exports = { RoundRobinSelector, RandomSelector }

28
lib/Serializer.js Normal file
View File

@ -0,0 +1,28 @@
'use strict'
const debug = require('debug')('elasticsearch')
const { SerializationError, DeserializationError } = require('./errors')
class Serializer {
serialize (object) {
debug('Serializing', object)
try {
var json = JSON.stringify(object)
} catch (err) {
throw new SerializationError(err.message)
}
return json
}
deserialize (json) {
debug('Deserializing', json)
try {
var object = JSON.parse(json)
} catch (err) {
throw new DeserializationError(err.message)
}
return object
}
}
module.exports = Serializer

126
lib/Transport.js Normal file
View File

@ -0,0 +1,126 @@
'use strict'
const debug = require('debug')('elasticsearch')
const once = require('once')
const {
ConnectionError,
TimeoutError,
NoLivingConnectionsError,
ResponseError
} = require('./errors')
const noop = () => {}
const kRemainingAttempts = Symbol('elasticsearch-remaining-attempts')
class Transport {
constructor (opts = {}) {
this.connectionPool = opts.connectionPool
this.serializer = opts.serializer
this.maxRetries = opts.maxRetries
this.requestTimeout = opts.requestTimeout
this.sniffInterval = opts.sniffInterval
this._sniffEnabled = typeof this.sniffInterval === 'number'
this._nextSniff = this._sniffEnabled ? (Date.now() + this.sniffInterval) : 0
this._isSniffing = false
if (opts.sniffOnStart === true) {
this.sniff()
}
}
request (params, callback) {
callback = once(callback)
const attempts = params[kRemainingAttempts] || params.maxRetries || this.maxRetries
const connection = this.getConnection()
if (connection === null) {
return callback(new NoLivingConnectionsError('There are not living connections'))
}
if (params.body !== null) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
return callback(err)
}
params.headers = params.headers || {}
params.headers['Content-Type'] = 'application/json'
params.headers['Content-Length'] = '' + Buffer.byteLength(params.body)
}
const request = connection.request(params, (err, response) => {
if (err != null) {
this.connectionPool.markDead(connection)
if (attempts > 0) {
debug(`Retrying request, there are still ${attempts} attempts`, params)
params[kRemainingAttempts] = attempts - 1
return this.request(params, callback)
}
if (err.message === 'Request timed out') {
return callback(new TimeoutError(err.message))
} else {
return callback(new ConnectionError(err.message))
}
}
var json = ''
response.setEncoding('utf8')
response.on('data', chunk => { json += chunk })
response.on('error', err => callback(err))
response.on('end', () => {
this.connectionPool.markAlive(connection)
try {
var payload = this.serializer.deserialize(json)
} catch (err) {
return callback(err)
}
const { statusCode } = response
if (statusCode >= 400) {
callback(new ResponseError(payload))
} else {
callback(null, payload)
}
})
})
return function requestAborter () {
request.abort()
debug('Request aborted', params)
}
}
getConnection () {
const now = Date.now()
if (this._sniffEnabled === true && now > this._nextSniff) {
this.sniff(now)
}
this.connectionPool.resurrect(now)
return this.connectionPool.getConnection()
}
sniff (now = Date.now(), callback = noop) {
if (this._isSniffing === true) return
debug('Started sniffing request')
this.request({
method: 'GET',
path: '_nodes/_all/http'
}, (err, body) => {
if (this._sniffEnabled === true) {
this._nextSniff = now + this.sniffInterval
}
if (err) {
debug('Siffing errored', err)
return callback(err)
}
debug('Siffing ended successfully', body)
const hosts = this.connectionPool.nodesToHost(body.nodes)
this.connectionPool
.empty()
.addConnection(hosts)
callback()
})
}
}
module.exports = Transport

76
lib/errors.js Normal file
View File

@ -0,0 +1,76 @@
'use strict'
class BadConfigurationError extends Error {
constructor (message) {
super()
Error.captureStackTrace(this, BadConfigurationError)
this.name = 'BadConfigurationError'
this.message = message || 'Bad Configuration Error'
}
}
class TimeoutError extends Error {
constructor (message) {
super()
Error.captureStackTrace(this, TimeoutError)
this.name = 'TimeoutError'
this.message = message || 'Timeout Error'
}
}
class ConnectionError extends Error {
constructor (message) {
super()
Error.captureStackTrace(this, ConnectionError)
this.name = 'ConnectionError'
this.message = message || 'Connection Error'
}
}
class NoLivingConnectionsError extends Error {
constructor (message) {
super()
Error.captureStackTrace(this, NoLivingConnectionsError)
this.name = 'NoLivingConnectionsError'
this.message = message || 'No Living Connections Error'
}
}
class SerializationError extends Error {
constructor (message) {
super()
Error.captureStackTrace(this, SerializationError)
this.name = 'SerializationError'
this.message = message || 'Serialization Error'
}
}
class DeserializationError extends Error {
constructor (message) {
super()
Error.captureStackTrace(this, DeserializationError)
this.name = 'DeserializationError'
this.message = message || 'Deserialization Error'
}
}
class ResponseError extends Error {
constructor (err) {
super()
Error.captureStackTrace(this, ResponseError)
this.name = 'ResponseError'
this.message = (err && err.error && err.error.type) || 'Response Error'
this.response = err
this.statusCode = err && err.status
}
}
module.exports = {
BadConfigurationError,
TimeoutError,
ConnectionError,
NoLivingConnectionsError,
SerializationError,
DeserializationError,
ResponseError
}

15
lib/symbols.js Normal file
View File

@ -0,0 +1,15 @@
'use strict'
const kTransport = Symbol('elasticsearch-transport')
const kConnection = Symbol('elasticsearch-connection')
const kConnectionPool = Symbol('elasticsearch-connection-pool')
const kSerializer = Symbol('elasticsearch-serializer')
const kSelector = Symbol('elasticsearch-selector')
module.exports = {
kTransport,
kConnection,
kConnectionPool,
kSerializer,
kSelector
}