271 lines
8.2 KiB
JavaScript
271 lines
8.2 KiB
JavaScript
// Licensed to Elasticsearch B.V under one or more agreements.
|
|
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
|
|
// See the LICENSE file in the project root for more information
|
|
|
|
'use strict'
|
|
|
|
const nodeMajor = Number(process.versions.node.split('.')[0])
|
|
|
|
const { EventEmitter } = require('events')
|
|
const { URL } = require('url')
|
|
const debug = require('debug')('elasticsearch')
|
|
const Transport = require('./lib/Transport')
|
|
const Connection = require('./lib/Connection')
|
|
const { ConnectionPool, CloudConnectionPool } = require('./lib/pool')
|
|
// Helpers works only in Node.js >= 10
|
|
const Helpers = nodeMajor < 10 ? null : require('./lib/Helpers')
|
|
const Serializer = require('./lib/Serializer')
|
|
const errors = require('./lib/errors')
|
|
const { ConfigurationError } = errors
|
|
|
|
const kInitialOptions = Symbol('elasticsearchjs-initial-options')
|
|
const kChild = Symbol('elasticsearchjs-child')
|
|
const kExtensions = Symbol('elasticsearchjs-extensions')
|
|
|
|
const buildApi = require('./api')
|
|
|
|
class Client extends EventEmitter {
|
|
constructor (opts = {}) {
|
|
super()
|
|
if (opts.cloud) {
|
|
const { id, username, password } = opts.cloud
|
|
// the cloud id is `cluster-name:base64encodedurl`
|
|
// the url is a string divided by two '$', the first is the cloud url
|
|
// the second the elasticsearch instance, the third the kibana instance
|
|
const cloudUrls = Buffer.from(id.split(':')[1], 'base64').toString().split('$')
|
|
|
|
// TODO: remove username and password here in 8
|
|
if (username && password) {
|
|
opts.auth = Object.assign({}, opts.auth, { username, password })
|
|
}
|
|
opts.node = `https://${cloudUrls[1]}.${cloudUrls[0]}`
|
|
|
|
// Cloud has better performances with compression enabled
|
|
// see https://github.com/elastic/elasticsearch-py/pull/704.
|
|
// So unless the user specifies otherwise, we enable compression.
|
|
if (opts.compression == null) opts.compression = 'gzip'
|
|
if (opts.suggestCompression == null) opts.suggestCompression = true
|
|
if (opts.ssl == null ||
|
|
(opts.ssl && opts.ssl.secureProtocol == null)) {
|
|
opts.ssl = opts.ssl || {}
|
|
opts.ssl.secureProtocol = 'TLSv1_2_method'
|
|
}
|
|
}
|
|
|
|
if (!opts.node && !opts.nodes) {
|
|
throw new ConfigurationError('Missing node(s) option')
|
|
}
|
|
|
|
const checkAuth = getAuth(opts.node || opts.nodes)
|
|
if (checkAuth && checkAuth.username && checkAuth.password) {
|
|
opts.auth = Object.assign({}, opts.auth, { username: checkAuth.username, password: checkAuth.password })
|
|
}
|
|
|
|
const options = Object.assign({}, {
|
|
Connection,
|
|
Transport,
|
|
Serializer,
|
|
ConnectionPool: opts.cloud ? CloudConnectionPool : ConnectionPool,
|
|
maxRetries: 3,
|
|
requestTimeout: 30000,
|
|
pingTimeout: 3000,
|
|
sniffInterval: false,
|
|
sniffOnStart: false,
|
|
sniffEndpoint: '_nodes/_all/http',
|
|
sniffOnConnectionFault: false,
|
|
resurrectStrategy: 'ping',
|
|
suggestCompression: false,
|
|
compression: false,
|
|
ssl: null,
|
|
agent: null,
|
|
headers: {},
|
|
nodeFilter: null,
|
|
nodeSelector: 'round-robin',
|
|
generateRequestId: null,
|
|
name: 'elasticsearch-js',
|
|
auth: null,
|
|
opaqueIdPrefix: null
|
|
}, opts)
|
|
|
|
this[kInitialOptions] = options
|
|
this[kExtensions] = []
|
|
|
|
this.name = options.name
|
|
this.serializer = new options.Serializer()
|
|
this.connectionPool = new options.ConnectionPool({
|
|
pingTimeout: options.pingTimeout,
|
|
resurrectStrategy: options.resurrectStrategy,
|
|
ssl: options.ssl,
|
|
agent: options.agent,
|
|
Connection: options.Connection,
|
|
auth: options.auth,
|
|
emit: this.emit.bind(this),
|
|
sniffEnabled: options.sniffInterval !== false ||
|
|
options.sniffOnStart !== false ||
|
|
options.sniffOnConnectionFault !== false
|
|
})
|
|
|
|
// Add the connections before initialize the Transport
|
|
if (opts[kChild] !== true) {
|
|
this.connectionPool.addConnection(options.node || options.nodes)
|
|
}
|
|
|
|
this.transport = new options.Transport({
|
|
emit: this.emit.bind(this),
|
|
connectionPool: this.connectionPool,
|
|
serializer: this.serializer,
|
|
maxRetries: options.maxRetries,
|
|
requestTimeout: options.requestTimeout,
|
|
sniffInterval: options.sniffInterval,
|
|
sniffOnStart: options.sniffOnStart,
|
|
sniffOnConnectionFault: options.sniffOnConnectionFault,
|
|
sniffEndpoint: options.sniffEndpoint,
|
|
suggestCompression: options.suggestCompression,
|
|
compression: options.compression,
|
|
headers: options.headers,
|
|
nodeFilter: options.nodeFilter,
|
|
nodeSelector: options.nodeSelector,
|
|
generateRequestId: options.generateRequestId,
|
|
name: options.name,
|
|
opaqueIdPrefix: options.opaqueIdPrefix
|
|
})
|
|
|
|
if (Helpers !== null) {
|
|
this.helpers = new Helpers({ client: this, maxRetries: options.maxRetries })
|
|
}
|
|
|
|
const apis = buildApi({
|
|
makeRequest: this.transport.request.bind(this.transport),
|
|
result: { body: null, statusCode: null, headers: null, warnings: null },
|
|
ConfigurationError
|
|
})
|
|
|
|
Object.keys(apis).forEach(api => {
|
|
this[api] = apis[api]
|
|
})
|
|
}
|
|
|
|
extend (name, opts, fn) {
|
|
if (typeof opts === 'function') {
|
|
fn = opts
|
|
opts = {}
|
|
}
|
|
|
|
var [namespace, method] = name.split('.')
|
|
if (method == null) {
|
|
method = namespace
|
|
namespace = null
|
|
}
|
|
|
|
if (namespace != null) {
|
|
if (this[namespace] != null && this[namespace][method] != null && opts.force !== true) {
|
|
throw new Error(`The method "${method}" already exists on namespace "${namespace}"`)
|
|
}
|
|
|
|
this[namespace] = this[namespace] || {}
|
|
this[namespace][method] = fn({
|
|
makeRequest: this.transport.request.bind(this.transport),
|
|
result: { body: null, statusCode: null, headers: null, warnings: null },
|
|
ConfigurationError
|
|
})
|
|
} else {
|
|
if (this[method] != null && opts.force !== true) {
|
|
throw new Error(`The method "${method}" already exists`)
|
|
}
|
|
|
|
this[method] = fn({
|
|
makeRequest: this.transport.request.bind(this.transport),
|
|
result: { body: null, statusCode: null, headers: null, warnings: null },
|
|
ConfigurationError
|
|
})
|
|
}
|
|
|
|
this[kExtensions].push({ name, opts, fn })
|
|
}
|
|
|
|
child (opts) {
|
|
// Merge the new options with the initial ones
|
|
const initialOptions = Object.assign({}, this[kInitialOptions], opts)
|
|
// Tell to the client that we are creating a child client
|
|
initialOptions[kChild] = true
|
|
|
|
const client = new Client(initialOptions)
|
|
// Reuse the same connection pool
|
|
client.connectionPool = this.connectionPool
|
|
client.transport.connectionPool = this.connectionPool
|
|
// Share event listener
|
|
const emitter = this.emit.bind(this)
|
|
client.emit = emitter
|
|
client.connectionPool.emit = emitter
|
|
client.transport.emit = emitter
|
|
client.on = this.on.bind(this)
|
|
// Add parent extensions
|
|
this[kExtensions].forEach(({ name, opts, fn }) => {
|
|
client.extend(name, opts, fn)
|
|
})
|
|
return client
|
|
}
|
|
|
|
close (callback) {
|
|
if (callback == null) {
|
|
return new Promise((resolve, reject) => {
|
|
this.close(resolve)
|
|
})
|
|
}
|
|
debug('Closing the client')
|
|
this.connectionPool.empty(callback)
|
|
}
|
|
}
|
|
|
|
function getAuth (node) {
|
|
if (Array.isArray(node)) {
|
|
for (const url of node) {
|
|
const auth = getUsernameAndPassword(url)
|
|
if (auth.username !== '' && auth.password !== '') {
|
|
return auth
|
|
}
|
|
}
|
|
|
|
return null
|
|
}
|
|
|
|
const auth = getUsernameAndPassword(node)
|
|
if (auth.username !== '' && auth.password !== '') {
|
|
return auth
|
|
}
|
|
|
|
return null
|
|
|
|
function getUsernameAndPassword (node) {
|
|
if (typeof node === 'string') {
|
|
const { username, password } = new URL(node)
|
|
return {
|
|
username: decodeURIComponent(username),
|
|
password: decodeURIComponent(password)
|
|
}
|
|
} else if (node.url instanceof URL) {
|
|
return {
|
|
username: decodeURIComponent(node.url.username),
|
|
password: decodeURIComponent(node.url.password)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
const events = {
|
|
RESPONSE: 'response',
|
|
REQUEST: 'request',
|
|
SNIFF: 'sniff',
|
|
RESURRECT: 'resurrect'
|
|
}
|
|
|
|
module.exports = {
|
|
Client,
|
|
Transport,
|
|
ConnectionPool,
|
|
Connection,
|
|
Serializer,
|
|
events,
|
|
errors
|
|
}
|