Verify connection to Elasticsearch (#1487) (#1497)

Co-authored-by: Tomas Della Vedova <delvedor@users.noreply.github.com>
This commit is contained in:
github-actions[bot]
2021-07-19 16:42:53 +02:00
committed by GitHub
parent 7358fd0c83
commit adc5c2b146
24 changed files with 1456 additions and 106 deletions

View File

@ -10,6 +10,7 @@ This page contains the information you need to connect and use the Client with
* <<client-usage, Using the client>> * <<client-usage, Using the client>>
* <<client-connect-proxy, Connecting through a proxy>> * <<client-connect-proxy, Connecting through a proxy>>
* <<client-error-handling, Handling errors>> * <<client-error-handling, Handling errors>>
* <<product-check, Automatic product check>>
[discrete] [discrete]
[[authentication]] [[authentication]]
@ -517,3 +518,15 @@ a|* `name` - `string`
* `statusCode` - `object`, the response headers * `statusCode` - `object`, the response headers
* `headers` - `object`, the response status code * `headers` - `object`, the response status code
|=== |===
[discrete]
[[product-check]]
=== Automatic product check
Since v7.14.0, the client performs a required product check before the first call.
This pre-flight product check allows the client to establish the version of Elasticsearch
that it is communicating with. The product check requires one additional HTTP request to
be sent to the server as part of the request pipeline before the main API call is sent.
In most cases, this will succeed during the very first API call that the client sends.
Once the product check completes, no further product check HTTP requests are sent for
subsequent API calls.

View File

@ -255,6 +255,10 @@ class Client extends ESAPI {
} }
const client = new Client(options) const client = new Client(options)
// sync product check
const tSymbol = Object.getOwnPropertySymbols(this.transport)
.filter(symbol => symbol.description === 'product check')[0]
client.transport[tSymbol] = this.transport[tSymbol]
// Add parent extensions // Add parent extensions
if (this[kExtensions].length > 0) { if (this[kExtensions].length > 0) {
this[kExtensions].forEach(({ name, opts, fn }) => { this[kExtensions].forEach(({ name, opts, fn }) => {

3
lib/Transport.d.ts vendored
View File

@ -26,7 +26,8 @@ import * as errors from './errors';
export type ApiError = errors.ConfigurationError | errors.ConnectionError | export type ApiError = errors.ConfigurationError | errors.ConnectionError |
errors.DeserializationError | errors.SerializationError | errors.DeserializationError | errors.SerializationError |
errors.NoLivingConnectionsError | errors.ResponseError | errors.NoLivingConnectionsError | errors.ResponseError |
errors.TimeoutError | errors.RequestAbortedError errors.TimeoutError | errors.RequestAbortedError |
errors.ProductNotSupportedError
export type Context = unknown export type Context = unknown

View File

@ -24,20 +24,24 @@ const os = require('os')
const { gzip, unzip, createGzip } = require('zlib') const { gzip, unzip, createGzip } = require('zlib')
const buffer = require('buffer') const buffer = require('buffer')
const ms = require('ms') const ms = require('ms')
const { EventEmitter } = require('events')
const { const {
ConnectionError, ConnectionError,
RequestAbortedError, RequestAbortedError,
NoLivingConnectionsError, NoLivingConnectionsError,
ResponseError, ResponseError,
ConfigurationError ConfigurationError,
ProductNotSupportedError
} = require('./errors') } = require('./errors')
const noop = () => {} const noop = () => {}
const productCheckEmitter = new EventEmitter()
const clientVersion = require('../package.json').version const clientVersion = require('../package.json').version
const userAgent = `elasticsearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})` const userAgent = `elasticsearch-js/${clientVersion} (${os.platform()} ${os.release()}-${os.arch()}; Node.js ${process.version})`
const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH const MAX_BUFFER_LENGTH = buffer.constants.MAX_LENGTH
const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH const MAX_STRING_LENGTH = buffer.constants.MAX_STRING_LENGTH
const kProductCheck = Symbol('product check')
const kApiVersioning = Symbol('api versioning') const kApiVersioning = Symbol('api versioning')
class Transport { class Transport {
@ -65,6 +69,7 @@ class Transport {
this.generateRequestId = opts.generateRequestId || generateRequestId() this.generateRequestId = opts.generateRequestId || generateRequestId()
this.name = opts.name this.name = opts.name
this.opaqueIdPrefix = opts.opaqueIdPrefix this.opaqueIdPrefix = opts.opaqueIdPrefix
this[kProductCheck] = 0 // 0 = to be checked, 1 = checking, 2 = checked-ok, 3 checked-notok
this[kApiVersioning] = process.env.ELASTIC_CLIENT_APIVERSIONING === 'true' this[kApiVersioning] = process.env.ELASTIC_CLIENT_APIVERSIONING === 'true'
this.nodeFilter = opts.nodeFilter || defaultNodeFilter this.nodeFilter = opts.nodeFilter || defaultNodeFilter
@ -83,7 +88,11 @@ class Transport {
this._isSniffing = false this._isSniffing = false
if (opts.sniffOnStart === true) { if (opts.sniffOnStart === true) {
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START }) // timer needed otherwise it will clash
// with the product check testing
setTimeout(() => {
this.sniff({ reason: Transport.sniffReasons.SNIFF_ON_START })
}, 10)
} }
} }
@ -350,91 +359,124 @@ class Transport {
} }
} }
this.emit('serialization', null, result) const prepareRequest = () => {
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers)) this.emit('serialization', null, result)
const headers = Object.assign({}, this.headers, lowerCaseHeaders(options.headers))
if (options.opaqueId !== undefined) { if (options.opaqueId !== undefined) {
headers['x-opaque-id'] = this.opaqueIdPrefix !== null headers['x-opaque-id'] = this.opaqueIdPrefix !== null
? this.opaqueIdPrefix + options.opaqueId ? this.opaqueIdPrefix + options.opaqueId
: options.opaqueId : options.opaqueId
}
// handle json body
if (params.body != null) {
if (shouldSerialize(params.body) === true) {
try {
params.body = this.serializer.serialize(params.body)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
} }
if (params.body !== '') { // handle json body
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+json; compatible-with=7' : 'application/json') if (params.body != null) {
} if (shouldSerialize(params.body) === true) {
try {
// handle ndjson body params.body = this.serializer.serialize(params.body)
} else if (params.bulkBody != null) { } catch (err) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
} else {
params.body = params.bulkBody
}
if (params.body !== '') {
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+x-ndjson; compatible-with=7' : 'application/x-ndjson')
}
}
params.headers = headers
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring)
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
meta.request.params = params
meta.request.options = options
// handle compression
if (params.body !== '' && params.body != null) {
if (isStream(params.body) === true) {
if (compression === 'gzip') {
params.headers['content-encoding'] = compression
params.body = params.body.pipe(createGzip())
}
makeRequest()
} else if (compression === 'gzip') {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
this.emit('request', err, result) this.emit('request', err, result)
return callback(err, result) process.nextTick(callback, err, result)
return transportReturn
} }
params.headers['content-encoding'] = compression }
params.headers['content-length'] = '' + Buffer.byteLength(buffer)
params.body = buffer if (params.body !== '') {
makeRequest() headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+json; compatible-with=7' : 'application/json')
}) }
// handle ndjson body
} else if (params.bulkBody != null) {
if (shouldSerialize(params.bulkBody) === true) {
try {
params.body = this.serializer.ndserialize(params.bulkBody)
} catch (err) {
this.emit('request', err, result)
process.nextTick(callback, err, result)
return transportReturn
}
} else {
params.body = params.bulkBody
}
if (params.body !== '') {
headers['content-type'] = headers['content-type'] || (this[kApiVersioning] ? 'application/vnd.elasticsearch+x-ndjson; compatible-with=7' : 'application/x-ndjson')
}
}
params.headers = headers
// serializes the querystring
if (options.querystring == null) {
params.querystring = this.serializer.qserialize(params.querystring)
} else {
params.querystring = this.serializer.qserialize(
Object.assign({}, params.querystring, options.querystring)
)
}
// handles request timeout
params.timeout = toMs(options.requestTimeout || this.requestTimeout)
if (options.asStream === true) params.asStream = true
meta.request.params = params
meta.request.options = options
// handle compression
if (params.body !== '' && params.body != null) {
if (isStream(params.body) === true) {
if (compression === 'gzip') {
params.headers['content-encoding'] = compression
params.body = params.body.pipe(createGzip())
}
makeRequest()
} else if (compression === 'gzip') {
gzip(params.body, (err, buffer) => {
/* istanbul ignore next */
if (err) {
this.emit('request', err, result)
return callback(err, result)
}
params.headers['content-encoding'] = compression
params.headers['content-length'] = '' + Buffer.byteLength(buffer)
params.body = buffer
makeRequest()
})
} else {
params.headers['content-length'] = '' + Buffer.byteLength(params.body)
makeRequest()
}
} else { } else {
params.headers['content-length'] = '' + Buffer.byteLength(params.body)
makeRequest() makeRequest()
} }
}
// still need to check the product or waiting for the check to finish
if (this[kProductCheck] === 0 || this[kProductCheck] === 1) {
// let pass info requests
if (params.method === 'GET' && params.path === '/') {
prepareRequest()
} else {
// wait for product check to finish
productCheckEmitter.once('product-check', status => {
if (status === false) {
const err = new ProductNotSupportedError(result)
this.emit('request', err, result)
process.nextTick(callback, err, result)
} else {
prepareRequest()
}
})
// the very first request triggers the product check
if (this[kProductCheck] === 0) {
this.productCheck()
}
}
// the product check is finished and it's not Elasticsearch
} else if (this[kProductCheck] === 3) {
const err = new ProductNotSupportedError(result)
this.emit('request', err, result)
process.nextTick(callback, err, result)
// the product check finished and it's Elasticsearch
} else { } else {
makeRequest() prepareRequest()
} }
return transportReturn return transportReturn
@ -494,6 +536,59 @@ class Transport {
callback(null, hosts) callback(null, hosts)
}) })
} }
productCheck () {
debug('Start product check')
this[kProductCheck] = 1
this.request({
method: 'GET',
path: '/'
}, (err, result) => {
this[kProductCheck] = 3
if (err) {
debug('Product check failed', err)
if (err.statusCode === 401 || err.statusCode === 403) {
this[kProductCheck] = 2
process.emitWarning('The client is unable to verify that the server is Elasticsearch due to security privileges on the server side. Some functionality may not be compatible if the server is running an unsupported product.')
productCheckEmitter.emit('product-check', true)
} else {
this[kProductCheck] = 0
productCheckEmitter.emit('product-check', false)
}
} else {
debug('Checking elasticsearch version', result.body, result.headers)
if (result.body.version == null || typeof result.body.version.number !== 'string') {
debug('Can\'t access Elasticsearch version')
return productCheckEmitter.emit('product-check', false)
}
const tagline = result.body.tagline
const version = result.body.version.number.split('.')
const major = Number(version[0])
const minor = Number(version[1])
if (major < 6) {
return productCheckEmitter.emit('product-check', false)
} else if (major >= 6 && major < 7) {
if (tagline !== 'You Know, for Search') {
debug('Bad tagline')
return productCheckEmitter.emit('product-check', false)
}
} else if (major === 7 && minor < 14) {
if (tagline !== 'You Know, for Search' || result.body.version.build_flavor !== 'default') {
debug('Bad tagline or build_flavor')
return productCheckEmitter.emit('product-check', false)
}
} else {
if (result.headers['x-elastic-product'] !== 'Elasticsearch') {
debug('x-elastic-product not recognized')
return productCheckEmitter.emit('product-check', false)
}
}
debug('Valid Elasticsearch distribution')
this[kProductCheck] = 2
productCheckEmitter.emit('product-check', true)
}
})
}
} }
Transport.sniffReasons = { Transport.sniffReasons = {

7
lib/errors.d.ts vendored
View File

@ -81,3 +81,10 @@ export declare class RequestAbortedError<TResponse = Record<string, any>, TConte
meta: ApiResponse<TResponse, TContext>; meta: ApiResponse<TResponse, TContext>;
constructor(message: string, meta: ApiResponse); constructor(message: string, meta: ApiResponse);
} }
export declare class ProductNotSupportedError<TResponse = Record<string, any>, TContext = Context> extends ElasticsearchClientError {
name: string;
message: string;
meta: ApiResponse<TResponse, TContext>;
constructor(meta: ApiResponse);
}

View File

@ -133,6 +133,16 @@ class RequestAbortedError extends ElasticsearchClientError {
} }
} }
class ProductNotSupportedError extends ElasticsearchClientError {
constructor (meta) {
super('Product Not Supported Error')
Error.captureStackTrace(this, ProductNotSupportedError)
this.name = 'ProductNotSupportedError'
this.message = 'The client noticed that the server is not Elasticsearch and we do not support this unknown product.'
this.meta = meta
}
}
module.exports = { module.exports = {
ElasticsearchClientError, ElasticsearchClientError,
TimeoutError, TimeoutError,
@ -142,5 +152,6 @@ module.exports = {
DeserializationError, DeserializationError,
ConfigurationError, ConfigurationError,
ResponseError, ResponseError,
RequestAbortedError RequestAbortedError,
ProductNotSupportedError
} }

View File

@ -21,7 +21,7 @@
const { test } = require('tap') const { test } = require('tap')
const intoStream = require('into-stream') const intoStream = require('into-stream')
const { Client, Connection, events } = require('../../index') const { Connection, events } = require('../../index')
const { const {
TimeoutError, TimeoutError,
ConnectionError, ConnectionError,
@ -31,6 +31,7 @@ const {
DeserializationError DeserializationError
} = require('../../lib/errors') } = require('../../lib/errors')
const { const {
Client,
buildServer, buildServer,
connection: { connection: {
MockConnection, MockConnection,

View File

@ -2,8 +2,9 @@
const { test } = require('tap') const { test } = require('tap')
const FakeTimers = require('@sinonjs/fake-timers') const FakeTimers = require('@sinonjs/fake-timers')
const { Client, Transport } = require('../../index') const { Transport } = require('../../index')
const { const {
Client,
connection: { MockConnection, MockConnectionSniff } connection: { MockConnection, MockConnectionSniff }
} = require('../utils') } = require('../utils')
const noop = () => {} const noop = () => {}

File diff suppressed because it is too large Load Diff

View File

@ -4,8 +4,8 @@
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0 process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0
const { test } = require('tap') const { test } = require('tap')
const { Client } = require('../../index')
const { const {
Client,
buildProxy: { buildProxy: {
createProxy, createProxy,
createSecureProxy, createSecureProxy,

View File

@ -23,8 +23,8 @@ const { test } = require('tap')
const { URL } = require('url') const { URL } = require('url')
const FakeTimers = require('@sinonjs/fake-timers') const FakeTimers = require('@sinonjs/fake-timers')
const workq = require('workq') const workq = require('workq')
const { buildCluster } = require('../utils') const { Client, buildCluster } = require('../utils')
const { Client, events } = require('../../index') const { events } = require('../../index')
/** /**
* The aim of this test is to verify how the resurrect logic behaves * The aim of this test is to verify how the resurrect logic behaves

View File

@ -23,8 +23,8 @@ const { test } = require('tap')
const { URL } = require('url') const { URL } = require('url')
const FakeTimers = require('@sinonjs/fake-timers') const FakeTimers = require('@sinonjs/fake-timers')
const workq = require('workq') const workq = require('workq')
const { buildCluster } = require('../utils') const { Client, buildCluster } = require('../utils')
const { Client, Connection, Transport, events, errors } = require('../../index') const { Connection, Transport, events, errors } = require('../../index')
/** /**
* The aim of this test is to verify how the sniffer behaves * The aim of this test is to verify how the sniffer behaves

View File

@ -19,8 +19,8 @@
'use strict' 'use strict'
const { Client, errors } = require('../../index') const { errors } = require('../../index')
const { buildServer } = require('../utils') const { Client, buildServer } = require('../utils')
function runAsyncTest (test) { function runAsyncTest (test) {
test('async await (search)', t => { test('async await (search)', t => {

View File

@ -20,8 +20,8 @@
'use strict' 'use strict'
const { test } = require('tap') const { test } = require('tap')
const { Client, errors } = require('../../index') const { errors } = require('../../index')
const { buildServer } = require('../utils') const { Client, buildServer } = require('../utils')
test('Basic (callback)', t => { test('Basic (callback)', t => {
t.plan(2) t.plan(2)

View File

@ -20,8 +20,9 @@
'use strict' 'use strict'
const { test } = require('tap') const { test } = require('tap')
const { Client, errors } = require('../../index') const { errors } = require('../../index')
const { const {
Client,
buildServer, buildServer,
connection: { MockConnection } connection: { MockConnection }
} = require('../utils') } = require('../utils')

View File

@ -23,9 +23,9 @@ const { test } = require('tap')
const { URL } = require('url') const { URL } = require('url')
const buffer = require('buffer') const buffer = require('buffer')
const intoStream = require('into-stream') const intoStream = require('into-stream')
const { Client, ConnectionPool, Transport, Connection, errors } = require('../../index') const { ConnectionPool, Transport, Connection, errors } = require('../../index')
const { CloudConnectionPool } = require('../../lib/pool') const { CloudConnectionPool } = require('../../lib/pool')
const { buildServer } = require('../utils') const { Client, buildServer } = require('../utils')
let clientVersion = require('../../package.json').version let clientVersion = require('../../package.json').version
if (clientVersion.includes('-')) { if (clientVersion.includes('-')) {
clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p' clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p'

View File

@ -20,9 +20,10 @@
'use strict' 'use strict'
const { test } = require('tap') const { test } = require('tap')
const { Client, events } = require('../../index') const { events } = require('../../index')
const { TimeoutError } = require('../../lib/errors') const { TimeoutError } = require('../../lib/errors')
const { const {
Client,
connection: { connection: {
MockConnection, MockConnection,
MockConnectionTimeout MockConnectionTimeout

View File

@ -24,8 +24,8 @@ const { join } = require('path')
const split = require('split2') const split = require('split2')
const FakeTimers = require('@sinonjs/fake-timers') const FakeTimers = require('@sinonjs/fake-timers')
const { test } = require('tap') const { test } = require('tap')
const { Client, errors } = require('../../../') const { errors } = require('../../../')
const { buildServer, connection } = require('../../utils') const { Client, buildServer, connection } = require('../../utils')
let clientVersion = require('../../../package.json').version let clientVersion = require('../../../package.json').version
if (clientVersion.includes('-')) { if (clientVersion.includes('-')) {
clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p' clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p'

View File

@ -20,8 +20,8 @@
'use strict' 'use strict'
const { test } = require('tap') const { test } = require('tap')
const { Client, errors } = require('../../../') const { errors } = require('../../../')
const { connection } = require('../../utils') const { Client, connection } = require('../../utils')
const FakeTimers = require('@sinonjs/fake-timers') const FakeTimers = require('@sinonjs/fake-timers')
test('Basic', async t => { test('Basic', async t => {

View File

@ -20,8 +20,8 @@
'use strict' 'use strict'
const { test } = require('tap') const { test } = require('tap')
const { Client, errors } = require('../../../') const { errors } = require('../../../')
const { connection } = require('../../utils') const { Client, connection } = require('../../utils')
let clientVersion = require('../../../package.json').version let clientVersion = require('../../../package.json').version
if (clientVersion.includes('-')) { if (clientVersion.includes('-')) {
clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p' clientVersion = clientVersion.slice(0, clientVersion.indexOf('-')) + 'p'

View File

@ -20,8 +20,7 @@
'use strict' 'use strict'
const { test } = require('tap') const { test } = require('tap')
const { Client } = require('../../../') const { Client, connection } = require('../../utils')
const { connection } = require('../../utils')
test('Search should have an additional documents property', async t => { test('Search should have an additional documents property', async t => {
const MockConnection = connection.buildMockConnection({ const MockConnection = connection.buildMockConnection({

View File

@ -27,6 +27,7 @@ const os = require('os')
const intoStream = require('into-stream') const intoStream = require('into-stream')
const { const {
buildServer, buildServer,
skipProductCheck,
connection: { MockConnection, MockConnectionTimeout, MockConnectionError } connection: { MockConnection, MockConnectionTimeout, MockConnectionError }
} = require('../utils') } = require('../utils')
const { const {
@ -65,6 +66,7 @@ test('Basic', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -92,6 +94,7 @@ test('Basic (promises support)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport transport
.request({ .request({
@ -119,6 +122,7 @@ test('Basic - failing (promises support)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport transport
.request({ .request({
@ -145,6 +149,7 @@ test('Basic (options + promises support)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport transport
.request({ .request({
@ -190,6 +195,7 @@ test('Send POST', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -246,6 +252,7 @@ test('Send POST (ndjson)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -289,6 +296,7 @@ test('Send stream', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -332,6 +340,7 @@ test('Send stream (bulkBody)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -365,6 +374,7 @@ test('Not JSON payload from server', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -396,6 +406,7 @@ test('NoLivingConnectionsError (null connection)', t => {
return null return null
} }
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -424,6 +435,7 @@ test('NoLivingConnectionsError (undefined connection)', t => {
return undefined return undefined
} }
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -447,6 +459,7 @@ test('SerializationError', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const body = { hello: 'world' } const body = { hello: 'world' }
body.o = body body.o = body
@ -473,6 +486,7 @@ test('SerializationError (bulk)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const bulkBody = { hello: 'world' } const bulkBody = { hello: 'world' }
bulkBody.o = bulkBody bulkBody.o = bulkBody
@ -505,6 +519,7 @@ test('DeserializationError', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -541,6 +556,7 @@ test('TimeoutError (should call markDead on the failing connection)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -575,6 +591,7 @@ test('ConnectionError (should call markDead on the failing connection)', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -620,6 +637,7 @@ test('Retry mechanism', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -664,6 +682,7 @@ test('Should not retry if the body is a stream', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -709,6 +728,7 @@ test('Should not retry if the bulkBody is a stream', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -754,6 +774,7 @@ test('No retry', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -805,6 +826,7 @@ test('Custom retry mechanism', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -852,6 +874,7 @@ test('Should not retry on 429', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -889,6 +912,7 @@ test('Should call markAlive with a successful response', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -926,6 +950,7 @@ test('Should call resurrect on every request', t => {
sniffOnStart: false, sniffOnStart: false,
name: 'elasticsearch-js' name: 'elasticsearch-js'
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -954,6 +979,7 @@ test('Should return a request aborter utility', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const request = transport.request({ const request = transport.request({
method: 'GET', method: 'GET',
@ -1002,6 +1028,7 @@ test('Retry mechanism and abort', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const request = transport.request({ const request = transport.request({
method: 'GET', method: 'GET',
@ -1031,6 +1058,7 @@ test('Abort a request with the promise API', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const request = transport.request({ const request = transport.request({
method: 'GET', method: 'GET',
@ -1070,6 +1098,7 @@ test('ResponseError', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1105,6 +1134,7 @@ test('Override requestTimeout', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1167,6 +1197,7 @@ test('sniff', t => {
sniffOnConnectionFault: true, sniffOnConnectionFault: true,
sniffEndpoint: '/sniff' sniffEndpoint: '/sniff'
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1200,6 +1231,7 @@ test('sniff', t => {
sniffInterval: 1, sniffInterval: 1,
sniffEndpoint: '/sniff' sniffEndpoint: '/sniff'
}) })
skipProductCheck(transport)
const params = { method: 'GET', path: '/' } const params = { method: 'GET', path: '/' }
clock.tick(100) clock.tick(100)
@ -1233,6 +1265,7 @@ test('sniff', t => {
sniffInterval: false, sniffInterval: false,
sniffEndpoint: '/sniff' sniffEndpoint: '/sniff'
}) })
skipProductCheck(transport)
transport.sniff((err, hosts) => { transport.sniff((err, hosts) => {
t.ok(err instanceof ConnectionError) t.ok(err instanceof ConnectionError)
@ -1269,6 +1302,7 @@ test(`Should mark as dead connections where the statusCode is 502/3/4
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1323,6 +1357,7 @@ test('Should retry the request if the statusCode is 502/3/4', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1354,6 +1389,7 @@ test('Ignore status code', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1403,6 +1439,7 @@ test('Should serialize the querystring', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1446,6 +1483,7 @@ test('timeout option', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1476,6 +1514,7 @@ test('timeout option', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1512,6 +1551,7 @@ test('timeout option', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1542,6 +1582,7 @@ test('timeout option', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1576,6 +1617,7 @@ test('Should cast to boolean HEAD request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'HEAD', method: 'HEAD',
@ -1601,6 +1643,7 @@ test('Should cast to boolean HEAD request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'HEAD', method: 'HEAD',
@ -1627,6 +1670,7 @@ test('Should cast to boolean HEAD request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'HEAD', method: 'HEAD',
@ -1652,6 +1696,7 @@ test('Should cast to boolean HEAD request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'HEAD', method: 'HEAD',
@ -1694,6 +1739,7 @@ test('Suggest compression', t => {
sniffOnStart: false, sniffOnStart: false,
suggestCompression: true suggestCompression: true
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1734,6 +1780,7 @@ test('Broken compression', t => {
sniffOnStart: false, sniffOnStart: false,
suggestCompression: true suggestCompression: true
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1769,6 +1816,7 @@ test('Warning header', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1806,6 +1854,7 @@ test('Warning header', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1840,6 +1889,7 @@ test('Warning header', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1875,6 +1925,7 @@ test('asStream set to true', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -1933,6 +1984,7 @@ test('Compress request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -1981,6 +2033,7 @@ test('Compress request', t => {
sniffOnStart: false, sniffOnStart: false,
compression: 'gzip' compression: 'gzip'
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -2026,6 +2079,7 @@ test('Compress request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -2085,6 +2139,7 @@ test('Compress request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'DELETE', method: 'DELETE',
@ -2151,6 +2206,7 @@ test('Compress request', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'POST', method: 'POST',
@ -2195,6 +2251,7 @@ test('Headers configuration', t => {
'x-foo': 'bar' 'x-foo': 'bar'
} }
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2234,6 +2291,7 @@ test('Headers configuration', t => {
'x-foo': 'bar' 'x-foo': 'bar'
} }
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2272,6 +2330,7 @@ test('Headers configuration', t => {
'x-foo': 'bar' 'x-foo': 'bar'
} }
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2312,6 +2371,7 @@ test('nodeFilter and nodeSelector', t => {
return conns[0] return conns[0]
} }
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2345,6 +2405,7 @@ test('Should accept custom querystring in the optons object', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2381,6 +2442,7 @@ test('Should accept custom querystring in the optons object', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2425,6 +2487,7 @@ test('Should add an User-Agent header', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2459,6 +2522,7 @@ test('Should pass request params and options to generateRequestId', t => {
return 'id' return 'id'
} }
}) })
skipProductCheck(transport)
transport.request(params, options, t.error) transport.request(params, options, t.error)
}) })
@ -2484,6 +2548,7 @@ test('Secure json parsing', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2516,6 +2581,7 @@ test('Secure json parsing', t => {
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
transport.request({ transport.request({
method: 'GET', method: 'GET',
@ -2574,6 +2640,7 @@ test('The callback with a sync error should be called in the next tick - json',
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const body = { a: true } const body = { a: true }
body.o = body body.o = body
@ -2605,6 +2672,7 @@ test('The callback with a sync error should be called in the next tick - ndjson'
sniffInterval: false, sniffInterval: false,
sniffOnStart: false sniffOnStart: false
}) })
skipProductCheck(transport)
const field = { a: true } const field = { a: true }
field.o = field field.o = field

View File

@ -133,7 +133,7 @@ function buildMockConnection (opts) {
class MockConnection extends Connection { class MockConnection extends Connection {
request (params, callback) { request (params, callback) {
let { body, statusCode } = opts.onRequest(params) let { body, statusCode, headers } = opts.onRequest(params)
if (typeof body !== 'string') { if (typeof body !== 'string') {
body = JSON.stringify(body) body = JSON.stringify(body)
} }
@ -144,7 +144,8 @@ function buildMockConnection (opts) {
'content-type': 'application/json;utf=8', 'content-type': 'application/json;utf=8',
date: new Date().toISOString(), date: new Date().toISOString(),
connection: 'keep-alive', connection: 'keep-alive',
'content-length': Buffer.byteLength(body) 'content-length': Buffer.byteLength(body),
...headers
} }
process.nextTick(() => { process.nextTick(() => {
if (!aborted) { if (!aborted) {

View File

@ -25,6 +25,7 @@ const buildServer = require('./buildServer')
const buildCluster = require('./buildCluster') const buildCluster = require('./buildCluster')
const buildProxy = require('./buildProxy') const buildProxy = require('./buildProxy')
const connection = require('./MockConnection') const connection = require('./MockConnection')
const { Client } = require('../../')
async function waitCluster (client, waitForStatus = 'green', timeout = '50s', times = 0) { async function waitCluster (client, waitForStatus = 'green', timeout = '50s', times = 0) {
if (!client) { if (!client) {
@ -41,10 +42,25 @@ async function waitCluster (client, waitForStatus = 'green', timeout = '50s', ti
} }
} }
function skipProductCheck (client) {
const tSymbol = Object.getOwnPropertySymbols(client.transport || client)
.filter(symbol => symbol.description === 'product check')[0]
;(client.transport || client)[tSymbol] = 2
}
class NoProductCheckClient extends Client {
constructor (opts) {
super(opts)
skipProductCheck(this)
}
}
module.exports = { module.exports = {
buildServer, buildServer,
buildCluster, buildCluster,
buildProxy, buildProxy,
connection, connection,
waitCluster waitCluster,
skipProductCheck,
Client: NoProductCheckClient
} }