Verify connection to Elasticsearch (#1487)
This commit is contained in:
committed by
GitHub
parent
76f5845ac1
commit
17c744ed80
3
lib/Transport.d.ts
vendored
3
lib/Transport.d.ts
vendored
@ -26,7 +26,8 @@ import * as errors from './errors';
|
||||
export type ApiError = errors.ConfigurationError | errors.ConnectionError |
|
||||
errors.DeserializationError | errors.SerializationError |
|
||||
errors.NoLivingConnectionsError | errors.ResponseError |
|
||||
errors.TimeoutError | errors.RequestAbortedError
|
||||
errors.TimeoutError | errors.RequestAbortedError |
|
||||
errors.ProductNotSupportedError
|
||||
|
||||
export type Context = unknown
|
||||
|
||||
|
||||
251
lib/Transport.js
251
lib/Transport.js
@ -24,20 +24,24 @@ const os = require('os')
|
||||
const { gzip, unzip, createGzip } = require('zlib')
|
||||
const buffer = require('buffer')
|
||||
const ms = require('ms')
|
||||
const { EventEmitter } = require('events')
|
||||
const {
|
||||
ConnectionError,
|
||||
RequestAbortedError,
|
||||
NoLivingConnectionsError,
|
||||
ResponseError,
|
||||
ConfigurationError
|
||||
ConfigurationError,
|
||||
ProductNotSupportedError
|
||||
} = require('./errors')
|
||||
|
||||
const noop = () => {}
|
||||
|
||||
const productCheckEmitter = new EventEmitter()
|
||||
const clientVersion = require('../package.json').version
|
||||
const userAgent = `elasticsearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})`
|
||||
const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH
|
||||
const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH
|
||||
const kProductCheck = Symbol('product check')
|
||||
const kApiVersioning = Symbol('api versioning')
|
||||
|
||||
class Transport {
|
||||
@ -65,6 +69,7 @@ class Transport {
|
||||
this.generateRequestId = opts.generateRequestId || generateRequestId()
|
||||
this.name = opts.name
|
||||
this.opaqueIdPrefix = opts.opaqueIdPrefix
|
||||
this[kProductCheck] = 0 // 0 = to be checked, 1 = checking, 2 = checked-ok, 3 checked-notok
|
||||
this[kApiVersioning] = process.env.ELASTIC_CLIENT_APIVERSIONING === 'true'
|
||||
|
||||
this.nodeFilter = opts.nodeFilter || defaultNodeFilter
|
||||
@ -83,7 +88,11 @@ class Transport {
|
||||
this._isSniffing = false
|
||||
|
||||
if (opts.sniffOnStart === true) {
|
||||
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
|
||||
// timer needed otherwise it will clash
|
||||
// with the product check testing
|
||||
setTimeout(() => {
|
||||
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
|
||||
}, 10)
|
||||
}
|
||||
}
|
||||
|
||||
@ -350,91 +359,124 @@ class Transport {
|
||||
}
|
||||
}
|
||||
|
||||
this.emit('serialization', null, result)
|
||||
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
|
||||
const prepareRequest = () => {
|
||||
this.emit('serialization', null, result)
|
||||
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
|
||||
|
||||
if (options.opaqueId !== undefined) {
|
||||
headers['x-opaque-id'] = this.opaqueIdPrefix !== null
|
||||
? this.opaqueIdPrefix + options.opaqueId
|
||||
: options.opaqueId
|
||||
}
|
||||
|
||||
// handle json body
|
||||
if (params.body != null) {
|
||||
if (shouldSerialize(params.body) === true) {
|
||||
try {
|
||||
params.body = this.serializer.serialize(params.body)
|
||||
} catch (err) {
|
||||
this.emit('request', err, result)
|
||||
process.nextTick(callback, err, result)
|
||||
return transportReturn
|
||||
}
|
||||
if (options.opaqueId !== undefined) {
|
||||
headers['x-opaque-id'] = this.opaqueIdPrefix !== null
|
||||
? this.opaqueIdPrefix + options.opaqueId
|
||||
: options.opaqueId
|
||||
}
|
||||
|
||||
if (params.body !== '') {
|
||||
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+json; compatible-with=7' : 'application/json')
|
||||
}
|
||||
|
||||
// handle ndjson body
|
||||
} else if (params.bulkBody != null) {
|
||||
if (shouldSerialize(params.bulkBody) === true) {
|
||||
try {
|
||||
params.body = this.serializer.ndserialize(params.bulkBody)
|
||||
} catch (err) {
|
||||
this.emit('request', err, result)
|
||||
process.nextTick(callback, err, result)
|
||||
return transportReturn
|
||||
}
|
||||
} else {
|
||||
params.body = params.bulkBody
|
||||
}
|
||||
if (params.body !== '') {
|
||||
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+x-ndjson; compatible-with=7' : 'application/x-ndjson')
|
||||
}
|
||||
}
|
||||
|
||||
params.headers = headers
|
||||
// serializes the querystring
|
||||
if (options.querystring == null) {
|
||||
params.querystring = this.serializer.qserialize(params.querystring)
|
||||
} else {
|
||||
params.querystring = this.serializer.qserialize(
|
||||
Object.assign({}, params.querystring, options.querystring)
|
||||
)
|
||||
}
|
||||
|
||||
// handles request timeout
|
||||
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
|
||||
if (options.asStream === true) params.asStream = true
|
||||
meta.request.params = params
|
||||
meta.request.options = options
|
||||
|
||||
// handle compression
|
||||
if (params.body !== '' && params.body != null) {
|
||||
if (isStream(params.body) === true) {
|
||||
if (compression === 'gzip') {
|
||||
params.headers['content-encoding'] = compression
|
||||
params.body = params.body.pipe(createGzip())
|
||||
}
|
||||
makeRequest()
|
||||
} else if (compression === 'gzip') {
|
||||
gzip(params.body, (err, buffer) => {
|
||||
/* istanbul ignore next */
|
||||
if (err) {
|
||||
// handle json body
|
||||
if (params.body != null) {
|
||||
if (shouldSerialize(params.body) === true) {
|
||||
try {
|
||||
params.body = this.serializer.serialize(params.body)
|
||||
} catch (err) {
|
||||
this.emit('request', err, result)
|
||||
return callback(err, result)
|
||||
process.nextTick(callback, err, result)
|
||||
return transportReturn
|
||||
}
|
||||
params.headers['content-encoding'] = compression
|
||||
params.headers['content-length'] = '' + Buffer.byteLength(buffer)
|
||||
params.body = buffer
|
||||
makeRequest()
|
||||
})
|
||||
}
|
||||
|
||||
if (params.body !== '') {
|
||||
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+json; compatible-with=7' : 'application/json')
|
||||
}
|
||||
|
||||
// handle ndjson body
|
||||
} else if (params.bulkBody != null) {
|
||||
if (shouldSerialize(params.bulkBody) === true) {
|
||||
try {
|
||||
params.body = this.serializer.ndserialize(params.bulkBody)
|
||||
} catch (err) {
|
||||
this.emit('request', err, result)
|
||||
process.nextTick(callback, err, result)
|
||||
return transportReturn
|
||||
}
|
||||
} else {
|
||||
params.body = params.bulkBody
|
||||
}
|
||||
if (params.body !== '') {
|
||||
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+x-ndjson; compatible-with=7' : 'application/x-ndjson')
|
||||
}
|
||||
}
|
||||
|
||||
params.headers = headers
|
||||
// serializes the querystring
|
||||
if (options.querystring == null) {
|
||||
params.querystring = this.serializer.qserialize(params.querystring)
|
||||
} else {
|
||||
params.querystring = this.serializer.qserialize(
|
||||
Object.assign({}, params.querystring, options.querystring)
|
||||
)
|
||||
}
|
||||
|
||||
// handles request timeout
|
||||
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
|
||||
if (options.asStream === true) params.asStream = true
|
||||
meta.request.params = params
|
||||
meta.request.options = options
|
||||
|
||||
// handle compression
|
||||
if (params.body !== '' && params.body != null) {
|
||||
if (isStream(params.body) === true) {
|
||||
if (compression === 'gzip') {
|
||||
params.headers['content-encoding'] = compression
|
||||
params.body = params.body.pipe(createGzip())
|
||||
}
|
||||
makeRequest()
|
||||
} else if (compression === 'gzip') {
|
||||
gzip(params.body, (err, buffer) => {
|
||||
/* istanbul ignore next */
|
||||
if (err) {
|
||||
this.emit('request', err, result)
|
||||
return callback(err, result)
|
||||
}
|
||||
params.headers['content-encoding'] = compression
|
||||
params.headers['content-length'] = '' + Buffer.byteLength(buffer)
|
||||
params.body = buffer
|
||||
makeRequest()
|
||||
})
|
||||
} else {
|
||||
params.headers['content-length'] = '' + Buffer.byteLength(params.body)
|
||||
makeRequest()
|
||||
}
|
||||
} else {
|
||||
params.headers['content-length'] = '' + Buffer.byteLength(params.body)
|
||||
makeRequest()
|
||||
}
|
||||
}
|
||||
|
||||
// still need to check the product or waiting for the check to finish
|
||||
if (this[kProductCheck] === 0 || this[kProductCheck] === 1) {
|
||||
// let pass info requests
|
||||
if (params.method === 'GET' && params.path === '/') {
|
||||
prepareRequest()
|
||||
} else {
|
||||
// wait for product check to finish
|
||||
productCheckEmitter.once('product-check', status => {
|
||||
if (status === false) {
|
||||
const err = new ProductNotSupportedError(result)
|
||||
this.emit('request', err, result)
|
||||
process.nextTick(callback, err, result)
|
||||
} else {
|
||||
prepareRequest()
|
||||
}
|
||||
})
|
||||
// the very first request triggers the product check
|
||||
if (this[kProductCheck] === 0) {
|
||||
this.productCheck()
|
||||
}
|
||||
}
|
||||
// the product check is finished and it's not Elasticsearch
|
||||
} else if (this[kProductCheck] === 3) {
|
||||
const err = new ProductNotSupportedError(result)
|
||||
this.emit('request', err, result)
|
||||
process.nextTick(callback, err, result)
|
||||
// the product check finished and it's Elasticsearch
|
||||
} else {
|
||||
makeRequest()
|
||||
prepareRequest()
|
||||
}
|
||||
|
||||
return transportReturn
|
||||
@ -494,6 +536,59 @@ class Transport {
|
||||
callback(null, hosts)
|
||||
})
|
||||
}
|
||||
|
||||
productCheck () {
|
||||
debug('Start product check')
|
||||
this[kProductCheck] = 1
|
||||
this.request({
|
||||
method: 'GET',
|
||||
path: '/'
|
||||
}, (err, result) => {
|
||||
this[kProductCheck] = 3
|
||||
if (err) {
|
||||
debug('Product check failed', err)
|
||||
if (err.statusCode === 401 || err.statusCode === 403) {
|
||||
this[kProductCheck] = 2
|
||||
process.emitWarning('The client is unable to verify that the server is Elasticsearch due to security privileges on the server side. Some functionality may not be compatible if the server is running an unsupported product.')
|
||||
productCheckEmitter.emit('product-check', true)
|
||||
} else {
|
||||
this[kProductCheck] = 0
|
||||
productCheckEmitter.emit('product-check', false)
|
||||
}
|
||||
} else {
|
||||
debug('Checking elasticsearch version', result.body, result.headers)
|
||||
if (result.body.version == null || typeof result.body.version.number !== 'string') {
|
||||
debug('Can\'t access Elasticsearch version')
|
||||
return productCheckEmitter.emit('product-check', false)
|
||||
}
|
||||
const tagline = result.body.tagline
|
||||
const version = result.body.version.number.split('.')
|
||||
const major = Number(version[0])
|
||||
const minor = Number(version[1])
|
||||
if (major < 6) {
|
||||
return productCheckEmitter.emit('product-check', false)
|
||||
} else if (major >= 6 && major < 7) {
|
||||
if (tagline !== 'You Know, for Search') {
|
||||
debug('Bad tagline')
|
||||
return productCheckEmitter.emit('product-check', false)
|
||||
}
|
||||
} else if (major === 7 && minor < 14) {
|
||||
if (tagline !== 'You Know, for Search' || result.body.version.build_flavor !== 'default') {
|
||||
debug('Bad tagline or build_flavor')
|
||||
return productCheckEmitter.emit('product-check', false)
|
||||
}
|
||||
} else {
|
||||
if (result.headers['x-elastic-product'] !== 'Elasticsearch') {
|
||||
debug('x-elastic-product not recognized')
|
||||
return productCheckEmitter.emit('product-check', false)
|
||||
}
|
||||
}
|
||||
debug('Valid Elasticsearch distribution')
|
||||
this[kProductCheck] = 2
|
||||
productCheckEmitter.emit('product-check', true)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
Transport.sniffReasons = {
|
||||
|
||||
7
lib/errors.d.ts
vendored
7
lib/errors.d.ts
vendored
@ -81,3 +81,10 @@ export declare class RequestAbortedError<TResponse = Record<string, any>, TConte
|
||||
meta: ApiResponse<TResponse, TContext>;
|
||||
constructor(message: string, meta: ApiResponse);
|
||||
}
|
||||
|
||||
export declare class ProductNotSupportedError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
|
||||
name: string;
|
||||
message: string;
|
||||
meta: ApiResponse<TResponse, TContext>;
|
||||
constructor(meta: ApiResponse);
|
||||
}
|
||||
|
||||
@ -133,6 +133,16 @@ class RequestAbortedError extends ElasticsearchClientError {
|
||||
}
|
||||
}
|
||||
|
||||
class ProductNotSupportedError extends ElasticsearchClientError {
|
||||
constructor (meta) {
|
||||
super('Product Not Supported Error')
|
||||
Error.captureStackTrace(this, ProductNotSupportedError)
|
||||
this.name = 'ProductNotSupportedError'
|
||||
this.message = 'The client noticed that the server is not Elasticsearch and we do not support this unknown product.'
|
||||
this.meta = meta
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
ElasticsearchClientError,
|
||||
TimeoutError,
|
||||
@ -142,5 +152,6 @@ module.exports = {
|
||||
DeserializationError,
|
||||
ConfigurationError,
|
||||
ResponseError,
|
||||
RequestAbortedError
|
||||
RequestAbortedError,
|
||||
ProductNotSupportedError
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user