Verify connection to Elasticsearch (#1487)

This commit is contained in:
Tomas Della Vedova
2021-07-19 16:42:04 +02:00
committed by delvedor
parent 9df5e6f713
commit 12dfc31c8d
24 changed files with 1456 additions and 106 deletions

View File

@ -24,20 +24,24 @@ const os = require('os')
const { gzip, unzip, createGzip } = require('zlib')
const buffer = require('buffer')
const ms = require('ms')
const { EventEmitter } = require('events')
const {
ConnectionError,
RequestAbortedError,
NoLivingConnectionsError,
ResponseError,
ConfigurationError
ConfigurationError,
ProductNotSupportedError
} = require('./errors')
const noop = () => {}
const productCheckEmitter = new EventEmitter()
const clientVersion = require('../package.json').version
const userAgent = `elasticsearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})`
const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH
const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH
const kProductCheck = Symbol('product check')
const kApiVersioning = Symbol('api versioning')
class Transport {
@ -65,6 +69,7 @@ class Transport {
this.generateRequestId = opts.generateRequestId || generateRequestId()
this.name = opts.name
this.opaqueIdPrefix = opts.opaqueIdPrefix
this[kProductCheck] = 0 // 0 = to be checked, 1 = checking, 2 = checked-ok, 3 checked-notok
this[kApiVersioning] = process.env.ELASTIC_CLIENT_APIVERSIONING === 'true'
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
@ -83,7 +88,11 @@ class Transport {
this._isSniffing = false
if (opts.sniffOnStart === true) {
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
// timer needed otherwise it will clash
// with the product check testing
setTimeout(() => {
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
}, 10)
}
}
@ -350,91 +359,124 @@ class Transport {
}
}
this.emit('serialization', null, result)
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
const prepareRequest = () => {
this.emit('serialization', null, result)
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
if (options.opaqueId !== undefined) {
headers['x-opaque-id'] = this.opaqueIdPrefix !== null
? this.opaqueIdPrefix + options.opaqueId
: options.opaqueId
}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
if (options.opaqueId !== undefined) {
headers['x-opaque-id'] = this.opaqueIdPrefix !== null
? this.opaqueIdPrefix + options.opaqueId
: options.opaqueId
}
if (params.body !== '') {
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+json; compatible-with=7' : 'application/json')
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
} else {
params.body = params.bulkBody
}
if (params.body !== '') {
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+x-ndjson; compatible-with=7' : 'application/x-ndjson')
}
}
params.headers = headers
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring)
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
meta.request.params = params
meta.request.options = options
// handle compression
if (params.body !== '' && params.body != null) {
if (isStream(params.body) === true) {
if (compression === 'gzip') {
params.headers['content-encoding'] = compression
params.body = params.body.pipe(createGzip())
}
makeRequest()
} else if (compression === 'gzip') {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
this.emit('request', err, result)
return callback(err, result)
process.nextTick(callback, err, result)
return transportReturn
}
params.headers['content-encoding'] = compression
params.headers['content-length'] = '' + Buffer.byteLength(buffer)
params.body = buffer
makeRequest()
})
}
if (params.body !== '') {
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+json; compatible-with=7' : 'application/json')
}
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
} else {
params.body = params.bulkBody
}
if (params.body !== '') {
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+x-ndjson; compatible-with=7' : 'application/x-ndjson')
}
}
params.headers = headers
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring)
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
meta.request.params = params
meta.request.options = options
// handle compression
if (params.body !== '' && params.body != null) {
if (isStream(params.body) === true) {
if (compression === 'gzip') {
params.headers['content-encoding'] = compression
params.body = params.body.pipe(createGzip())
}
makeRequest()
} else if (compression === 'gzip') {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
this.emit('request', err, result)
return callback(err, result)
}
params.headers['content-encoding'] = compression
params.headers['content-length'] = '' + Buffer.byteLength(buffer)
params.body = buffer
makeRequest()
})
} else {
params.headers['content-length'] = '' + Buffer.byteLength(params.body)
makeRequest()
}
} else {
params.headers['content-length'] = '' + Buffer.byteLength(params.body)
makeRequest()
}
}
// still need to check the product or waiting for the check to finish
if (this[kProductCheck] === 0 || this[kProductCheck] === 1) {
// let pass info requests
if (params.method === 'GET' && params.path === '/') {
prepareRequest()
} else {
// wait for product check to finish
productCheckEmitter.once('product-check', status => {
if (status === false) {
const err = new ProductNotSupportedError(result)
this.emit('request', err, result)
process.nextTick(callback, err, result)
} else {
prepareRequest()
}
})
// the very first request triggers the product check
if (this[kProductCheck] === 0) {
this.productCheck()
}
}
// the product check is finished and it's not Elasticsearch
} else if (this[kProductCheck] === 3) {
const err = new ProductNotSupportedError(result)
this.emit('request', err, result)
process.nextTick(callback, err, result)
// the product check finished and it's Elasticsearch
} else {
makeRequest()
prepareRequest()
}
return transportReturn
@ -494,6 +536,59 @@ class Transport {
callback(null, hosts)
})
}
productCheck () {
debug('Start product check')
this[kProductCheck] = 1
this.request({
method: 'GET',
path: '/'
}, (err, result) => {
this[kProductCheck] = 3
if (err) {
debug('Product check failed', err)
if (err.statusCode === 401 || err.statusCode === 403) {
this[kProductCheck] = 2
process.emitWarning('The client is unable to verify that the server is Elasticsearch due to security privileges on the server side. Some functionality may not be compatible if the server is running an unsupported product.')
productCheckEmitter.emit('product-check', true)
} else {
this[kProductCheck] = 0
productCheckEmitter.emit('product-check', false)
}
} else {
debug('Checking elasticsearch version', result.body, result.headers)
if (result.body.version == null || typeof result.body.version.number !== 'string') {
debug('Can\'t access Elasticsearch version')
return productCheckEmitter.emit('product-check', false)
}
const tagline = result.body.tagline
const version = result.body.version.number.split('.')
const major = Number(version[0])
const minor = Number(version[1])
if (major < 6) {
return productCheckEmitter.emit('product-check', false)
} else if (major >= 6 && major < 7) {
if (tagline !== 'You Know, for Search') {
debug('Bad tagline')
return productCheckEmitter.emit('product-check', false)
}
} else if (major === 7 && minor < 14) {
if (tagline !== 'You Know, for Search' || result.body.version.build_flavor !== 'default') {
debug('Bad tagline or build_flavor')
return productCheckEmitter.emit('product-check', false)
}
} else {
if (result.headers['x-elastic-product'] !== 'Elasticsearch') {
debug('x-elastic-product not recognized')
return productCheckEmitter.emit('product-check', false)
}
}
debug('Valid Elasticsearch distribution')
this[kProductCheck] = 2
productCheckEmitter.emit('product-check', true)
}
})
}
}
Transport.sniffReasons = {