Files
elasticsearch-js/index.js
Tomas Della Vedova b1458e3511 Improve observability (#834)
* API generation

* Added correlation id support

* Updated docs

* Updated test

* Updated code generation

* API generation

* Updated code generation

* Added support for client name and custom context object

* Updated docs

* Updated test

* Fix docs

* Updated docs

* Added id support also for sniffing

* Updated test

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Apply suggestions

* Update docs/configuration.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/configuration.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Update docs/observability.asciidoc

Co-Authored-By: delvedor <delvedor@users.noreply.github.com>

* Apply suggestions

* Updated README.md

* Fixed test

* Addressed suggestions
2019-05-03 17:25:25 +02:00

228 lines
7.1 KiB
JavaScript

/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict'
const { EventEmitter } = require('events')
const debug = require('debug')('elasticsearch')
const Transport = require('./lib/Transport')
const Connection = require('./lib/Connection')
const ConnectionPool = require('./lib/ConnectionPool')
const Serializer = require('./lib/Serializer')
const errors = require('./lib/errors')
const { ConfigurationError } = errors
const kInitialOptions = Symbol('elasticsearchjs-initial-options')
const kChild = Symbol('elasticsearchjs-child')
const kExtensions = Symbol('elasticsearchjs-extensions')
const buildApi = require('./api')
class Client extends EventEmitter {
constructor (opts = {}) {
super()
if (opts.cloud) {
const { id, username, password } = opts.cloud
// the cloud id is `cluster-name:base64encodedurl`
// the url is a string divided by two '$', the first is the cloud url
// the second the elasticsearch instance, the third the kibana instance
const cloudUrls = Buffer.from(id.split(':')[1], 'base64').toString().split('$')
opts.node = `https://${username}:${password}@${cloudUrls[1]}.${cloudUrls[0]}`
// Cloud has better performances with compression enabled
// see https://github.com/elastic/elasticsearch-py/pull/704.
// So unless the user specifies otherwise, we enable compression.
if (opts.compression == null) opts.compression = 'gzip'
if (opts.suggestCompression == null) opts.suggestCompression = true
if (opts.ssl == null ||
(opts.ssl && opts.ssl.secureProtocol == null)) {
opts.ssl = opts.ssl || {}
opts.ssl.secureProtocol = 'TLSv1_2_method'
}
}
if (!opts.node && !opts.nodes) {
throw new ConfigurationError('Missing node(s) option')
}
const options = Object.assign({}, {
Connection,
ConnectionPool,
Transport,
Serializer,
maxRetries: 3,
requestTimeout: 30000,
pingTimeout: 3000,
sniffInterval: false,
sniffOnStart: false,
sniffEndpoint: '_nodes/_all/http',
sniffOnConnectionFault: false,
resurrectStrategy: 'ping',
suggestCompression: false,
compression: false,
ssl: null,
agent: null,
headers: {},
nodeFilter: null,
nodeSelector: 'round-robin',
generateRequestId: null,
name: 'elasticsearch-js'
}, opts)
this[kInitialOptions] = options
this[kExtensions] = []
this.name = options.name
this.serializer = new options.Serializer()
this.connectionPool = new options.ConnectionPool({
pingTimeout: options.pingTimeout,
resurrectStrategy: options.resurrectStrategy,
ssl: options.ssl,
agent: options.agent,
Connection: options.Connection,
emit: this.emit.bind(this),
sniffEnabled: options.sniffInterval !== false ||
options.sniffOnStart !== false ||
options.sniffOnConnectionFault !== false
})
// Add the connections before initialize the Transport
if (opts[kChild] !== true) {
this.connectionPool.addConnection(options.node || options.nodes)
}
this.transport = new options.Transport({
emit: this.emit.bind(this),
connectionPool: this.connectionPool,
serializer: this.serializer,
maxRetries: options.maxRetries,
requestTimeout: options.requestTimeout,
sniffInterval: options.sniffInterval,
sniffOnStart: options.sniffOnStart,
sniffOnConnectionFault: options.sniffOnConnectionFault,
sniffEndpoint: options.sniffEndpoint,
suggestCompression: options.suggestCompression,
compression: options.compression,
headers: options.headers,
nodeFilter: options.nodeFilter,
nodeSelector: options.nodeSelector,
generateRequestId: options.generateRequestId,
name: options.name
})
const apis = buildApi({
makeRequest: this.transport.request.bind(this.transport),
result: { body: null, statusCode: null, headers: null, warnings: null },
ConfigurationError
})
Object.keys(apis).forEach(api => {
this[api] = apis[api]
})
}
extend (name, opts, fn) {
if (typeof opts === 'function') {
fn = opts
opts = {}
}
var [namespace, method] = name.split('.')
if (method == null) {
method = namespace
namespace = null
}
if (namespace != null) {
if (this[namespace] != null && this[namespace][method] != null && opts.force !== true) {
throw new Error(`The method "${method}" already exists on namespace "${namespace}"`)
}
this[namespace] = this[namespace] || {}
this[namespace][method] = fn({
makeRequest: this.transport.request.bind(this.transport),
result: { body: null, statusCode: null, headers: null, warnings: null },
ConfigurationError
})
} else {
if (this[method] != null && opts.force !== true) {
throw new Error(`The method "${method}" already exists`)
}
this[method] = fn({
makeRequest: this.transport.request.bind(this.transport),
result: { body: null, statusCode: null, headers: null, warnings: null },
ConfigurationError
})
}
this[kExtensions].push({ name, opts, fn })
}
child (opts) {
// Merge the new options with the initial ones
const initialOptions = Object.assign({}, this[kInitialOptions], opts)
// Tell to the client that we are creating a child client
initialOptions[kChild] = true
const client = new Client(initialOptions)
// Reuse the same connection pool
client.connectionPool = this.connectionPool
client.transport.connectionPool = this.connectionPool
// Share event listener
const emitter = this.emit.bind(this)
client.emit = emitter
client.connectionPool.emit = emitter
client.transport.emit = emitter
client.on = this.on.bind(this)
// Add parent extensions
this[kExtensions].forEach(({ name, opts, fn }) => {
client.extend(name, opts, fn)
})
return client
}
close (callback) {
if (callback == null) {
return new Promise((resolve, reject) => {
this.close(resolve)
})
}
debug('Closing the client')
this.connectionPool.empty(callback)
}
}
const events = {
RESPONSE: 'response',
REQUEST: 'request',
SNIFF: 'sniff',
RESURRECT: 'resurrect'
}
module.exports = {
Client,
Transport,
ConnectionPool,
Connection,
Serializer,
events,
errors
}