Elasticsearch v8 (#1558)

This commit is contained in:
Tomas Della Vedova
2021-09-30 09:45:04 +02:00
committed by GitHub
parent 4c72b981cd
commit 1a227459f0
255 changed files with 36905 additions and 42018 deletions

View File

@ -1,180 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict'
const assert = require('assert')
const { Connection } = require('../../index')
const {
ConnectionError,
RequestAbortedError,
TimeoutError
} = require('../../lib/errors')
const intoStream = require('into-stream')
class MockConnection extends Connection {
request (params, callback) {
let aborted = false
const stream = intoStream(JSON.stringify({ hello: 'world' }))
stream.statusCode = setStatusCode(params.path)
stream.headers = {
'content-type': 'application/json;utf=8',
date: new Date().toISOString(),
connection: 'keep-alive',
'content-length': '17'
}
process.nextTick(() => {
if (!aborted) {
callback(null, stream)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
abort: () => { aborted = true }
}
}
}
class MockConnectionTimeout extends Connection {
request (params, callback) {
let aborted = false
process.nextTick(() => {
if (!aborted) {
callback(new TimeoutError('Request timed out', params), null)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
abort: () => { aborted = true }
}
}
}
class MockConnectionError extends Connection {
request (params, callback) {
let aborted = false
process.nextTick(() => {
if (!aborted) {
callback(new ConnectionError('Kaboom'), null)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
abort: () => { aborted = true }
}
}
}
class MockConnectionSniff extends Connection {
request (params, callback) {
let aborted = false
const sniffResult = {
nodes: {
'node-1': {
http: {
publish_address: 'localhost:9200'
},
roles: ['master', 'data', 'ingest']
},
'node-2': {
http: {
publish_address: 'localhost:9201'
},
roles: ['master', 'data', 'ingest']
}
}
}
const stream = intoStream(JSON.stringify(sniffResult))
stream.statusCode = setStatusCode(params.path)
stream.headers = {
'content-type': 'application/json;utf=8',
date: new Date().toISOString(),
connection: 'keep-alive',
'content-length': '191'
}
process.nextTick(() => {
if (!aborted) {
if (params.headers.timeout) {
callback(new TimeoutError('Request timed out', params), null)
} else {
callback(null, stream)
}
} else {
callback(new RequestAbortedError(), null)
}
})
return {
abort: () => { aborted = true }
}
}
}
function buildMockConnection (opts) {
assert(opts.onRequest, 'Missing required onRequest option')
class MockConnection extends Connection {
request (params, callback) {
let { body, statusCode, headers } = opts.onRequest(params)
if (typeof body !== 'string') {
body = JSON.stringify(body)
}
let aborted = false
const stream = intoStream(body)
stream.statusCode = statusCode || 200
stream.headers = {
'content-type': 'application/json;utf=8',
date: new Date().toISOString(),
connection: 'keep-alive',
'content-length': Buffer.byteLength(body),
...headers
}
process.nextTick(() => {
if (!aborted) {
callback(null, stream)
} else {
callback(new RequestAbortedError(), null)
}
})
return {
abort: () => { aborted = true }
}
}
}
return MockConnection
}
function setStatusCode (path) {
const statusCode = Number(path.slice(1))
if (Number.isInteger(statusCode)) {
return statusCode
}
return 200
}
module.exports = {
MockConnection,
MockConnectionTimeout,
MockConnectionError,
MockConnectionSniff,
buildMockConnection
}

View File

@ -0,0 +1,139 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import assert from 'assert'
import * as http from 'http'
import {
BaseConnection,
ConnectionRequestParams,
ConnectionRequestOptions,
ConnectionRequestResponse,
errors
} from '@elastic/transport'
const {
ConnectionError,
TimeoutError
} = errors
export class MockConnection extends BaseConnection {
request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
return new Promise((resolve, reject) => {
const body = JSON.stringify({ hello: 'world' })
const statusCode = setStatusCode(params.path)
const headers = {
'content-type': 'application/json;utf=8',
date: new Date().toISOString(),
connection: 'keep-alive',
'content-length': '17',
'x-elastic-product': 'Elasticsearch'
}
process.nextTick(resolve, { body, statusCode, headers })
})
}
}
export class MockConnectionTimeout extends BaseConnection {
request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
return new Promise((resolve, reject) => {
process.nextTick(reject, new TimeoutError('Request timed out'))
})
}
}
export class MockConnectionError extends BaseConnection {
request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
return new Promise((resolve, reject) => {
process.nextTick(reject, new ConnectionError('kaboom'))
})
}
}
export class MockConnectionSniff extends BaseConnection {
request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
return new Promise((resolve, reject) => {
const sniffResult = {
nodes: {
'node-1': {
http: {
publish_address: 'localhost:9200'
}
},
'node-2': {
http: {
publish_address: 'localhost:9201'
}
}
}
}
const body = JSON.stringify(sniffResult)
const statusCode = setStatusCode(params.path)
const headers = {
'content-type': 'application/json;utf=8',
date: new Date().toISOString(),
connection: 'keep-alive',
'content-length': '191',
'x-elastic-product': 'Elasticsearch'
}
if (params.headers?.timeout != null) {
process.nextTick(reject, new TimeoutError('Request timed out'))
} else {
process.nextTick(resolve, { body, statusCode, headers })
}
})
}
}
interface onRequestMock {
onRequest(opts: ConnectionRequestParams): { body: any, statusCode?: number, headers?: http.IncomingHttpHeaders }
}
export function buildMockConnection (opts: onRequestMock) {
assert(opts.onRequest, 'Missing required onRequest option')
class MockConnection extends BaseConnection {
request (params: ConnectionRequestParams, options: ConnectionRequestOptions): Promise<ConnectionRequestResponse> {
return new Promise((resolve, reject) => {
params.headers = { ...this.headers, ...params.headers }
let { body, statusCode, headers } = opts.onRequest(params)
if (typeof body !== 'string' && !(body instanceof Buffer)) {
body = JSON.stringify(body)
}
statusCode = statusCode || 200
headers = {
'content-type': 'application/json;utf=8',
date: new Date().toISOString(),
connection: 'keep-alive',
'content-length': Buffer.byteLength(body) + '',
'x-elastic-product': 'Elasticsearch',
...headers
}
process.nextTick(resolve, { body, statusCode, headers })
})
}
}
return MockConnection
}
function setStatusCode (path: string): number {
const statusCode = Number(path.slice(1))
if (Number.isInteger(statusCode)) {
return statusCode
}
return 200
}

View File

@ -1,110 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict'
const debug = require('debug')('elasticsearch-test')
const workq = require('workq')
const buildServer = require('./buildServer')
let id = 0
function buildCluster (options, callback) {
const clusterId = id++
debug(`Booting cluster '${clusterId}'`)
if (typeof options === 'function') {
callback = options
options = {}
}
const q = workq()
const nodes = {}
const sniffResult = { nodes: {} }
options.numberOfNodes = options.numberOfNodes || 4
for (let i = 0; i < options.numberOfNodes; i++) {
q.add(bootNode, { id: `node${i}` })
}
function bootNode (q, opts, done) {
function handler (req, res) {
res.setHeader('content-type', 'application/json')
if (req.url === '/_nodes/_all/http') {
res.end(JSON.stringify(sniffResult))
} else {
res.end(JSON.stringify({ hello: 'world' }))
}
}
buildServer(options.handler || handler, ({ port }, server) => {
nodes[opts.id] = {
url: `http://127.0.0.1:${port}`,
server
}
sniffResult.nodes[opts.id] = {
http: {
publish_address: options.hostPublishAddress
? `localhost/127.0.0.1:${port}`
: `127.0.0.1:${port}`
},
roles: ['master', 'data', 'ingest']
}
debug(`Booted cluster node '${opts.id}' on port ${port} (cluster id: '${clusterId}')`)
done()
})
}
function shutdown () {
debug(`Shutting down cluster '${clusterId}'`)
for (const id in nodes) {
kill(id)
}
}
function kill (id, callback) {
debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
const node = nodes[id]
delete nodes[id]
delete sniffResult.nodes[id]
node.server.stop(callback)
}
function spawn (id, callback) {
debug(`Spawning cluster node '${id}' (cluster id: '${clusterId}')`)
q.add(bootNode, { id })
q.add((q, done) => {
callback()
done()
})
}
const cluster = {
nodes,
shutdown,
kill,
spawn
}
q.drain(done => {
debug(`Cluster '${clusterId}' booted with ${options.numberOfNodes} nodes`)
callback(cluster)
done()
})
}
module.exports = buildCluster

119
test/utils/buildCluster.ts Normal file
View File

@ -0,0 +1,119 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import Debug from 'debug'
import * as http from 'http'
import buildServer, { ServerHandler } from './buildServer'
import { StoppableServer } from 'stoppable'
interface BuildClusterOptions {
numberOfNodes?: number
handler?: ServerHandler
hostPublishAddress?: boolean
}
interface Node {
url: string
server: StoppableServer
}
interface Cluster {
nodes: Record<string, Node>,
shutdown(): Promise<void>,
kill(id: string): Promise<void>,
spawn(id: string): Promise<void>
}
interface SniffNode {
http: {
publish_address: string
},
roles: string[]
}
type SniffResult = Record<string, SniffNode>
const debug = Debug('elasticsearch-test')
let id = 0
export default async function buildCluster (options: BuildClusterOptions): Promise<Cluster> {
const clusterId = id++
debug(`Booting cluster '${clusterId}'`)
const cluster: Cluster = {
nodes: {},
shutdown,
kill,
spawn
}
options.numberOfNodes = options.numberOfNodes || 4
for (let i = 0; i < options.numberOfNodes; i++) {
await bootNode(`node${i}`)
}
async function bootNode (id: string): Promise<void> {
const [{ port }, server] = await buildServer(options.handler ?? handler)
cluster.nodes[id] = {
url: `http://127.0.0.1:${port}`,
server
}
}
function handler (req: http.IncomingMessage, res: http.ServerResponse): void {
res.setHeader('content-type', 'application/json')
if (req.url === '/_nodes/_all/http') {
const sniffResult: SniffResult = Object.keys(cluster.nodes).reduce((acc: SniffResult, val: string) => {
const node = cluster.nodes[val]
acc[val] = {
http: {
publish_address: options.hostPublishAddress
? `localhost/${node.url}`
: node.url
},
roles: ['master', 'data', 'ingest']
}
return acc
}, {})
res.end(JSON.stringify(sniffResult))
} else {
res.end(JSON.stringify({ hello: 'world' }))
}
}
async function shutdown (): Promise<void> {
debug(`Shutting down cluster '${clusterId}'`)
for (const id in cluster.nodes) {
await kill(id)
}
}
async function kill (id: string): Promise<void> {
debug(`Shutting down cluster node '${id}' (cluster id: '${clusterId}')`)
const node = cluster.nodes[id]
delete cluster.nodes[id]
node.server.stop()
}
async function spawn (id: string): Promise<void> {
debug(`Spawning cluster node '${id}' (cluster id: '${clusterId}')`)
await bootNode(id)
}
return cluster
}

View File

@ -1,60 +0,0 @@
// Licensed to Elasticsearch B.V under one or more agreements.
// Elasticsearch B.V licenses this file to you under the Apache 2.0 License.
// See the LICENSE file in the project root for more information
'use strict'
const proxy = require('proxy')
const { readFileSync } = require('fs')
const { join } = require('path')
const http = require('http')
const https = require('https')
const ssl = {
key: readFileSync(join(__dirname, '..', 'fixtures', 'https.key')),
cert: readFileSync(join(__dirname, '..', 'fixtures', 'https.cert'))
}
function createProxy () {
return new Promise((resolve, reject) => {
const server = proxy(http.createServer())
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
function createSecureProxy () {
return new Promise((resolve, reject) => {
const server = proxy(https.createServer(ssl))
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
function createServer (handler, callback) {
return new Promise((resolve, reject) => {
const server = http.createServer()
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
function createSecureServer (handler, callback) {
return new Promise((resolve, reject) => {
const server = https.createServer(ssl)
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
module.exports = {
ssl,
createProxy,
createSecureProxy,
createServer,
createSecureServer
}

71
test/utils/buildProxy.ts Normal file
View File

@ -0,0 +1,71 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
// @ts-ignore
import proxy from 'proxy'
import { readFileSync } from 'fs'
import { join } from 'path'
import * as http from 'http'
import * as https from 'https'
export const ssl = {
key: readFileSync(join(__dirname, '..', 'fixtures', 'https.key')),
cert: readFileSync(join(__dirname, '..', 'fixtures', 'https.cert'))
}
type AuthenticateFn = (err: Error | null, valid: boolean) => void
interface ProxyServer extends http.Server {
authenticate?(req: http.IncomingMessage, fn: AuthenticateFn): void
}
export function createProxy (): Promise<ProxyServer> {
return new Promise((resolve, reject) => {
const server = proxy(http.createServer())
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
export function createSecureProxy (): Promise<ProxyServer> {
return new Promise((resolve, reject) => {
const server = proxy(https.createServer(ssl))
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
export function createServer (): Promise<http.Server> {
return new Promise((resolve, reject) => {
const server = http.createServer()
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}
export function createSecureServer (): Promise<http.Server> {
return new Promise((resolve, reject) => {
const server = https.createServer(ssl)
server.listen(0, '127.0.0.1', () => {
resolve(server)
})
})
}

View File

@ -17,19 +17,18 @@
* under the License.
*/
'use strict'
import { readFileSync } from 'fs'
import crypto from 'crypto'
import { join } from 'path'
import https from 'https'
import http from 'http'
import Debug from 'debug'
import stoppable, { StoppableServer } from 'stoppable'
const crypto = require('crypto')
const debug = require('debug')('elasticsearch-test')
const stoppable = require('stoppable')
const debug = Debug('elasticsearch-test')
// allow self signed certificates for testing purposes
process.env.NODE_TLS_REJECT_UNAUTHORIZED = 0
const { readFileSync } = require('fs')
const { join } = require('path')
const https = require('https')
const http = require('http')
process.env.NODE_TLS_REJECT_UNAUTHORIZED = '0'
const secureOpts = {
key: readFileSync(join(__dirname, '..', 'fixtures', 'https.key'), 'utf8'),
@ -43,46 +42,48 @@ const caFingerprint = getFingerprint(secureOpts.cert
.join('')
)
export type ServerHandler = (req: http.IncomingMessage, res: http.ServerResponse) => void
interface Options { secure?: boolean }
type Server = [{ key: string, cert: string, port: number, caFingerprint: string }, StoppableServer]
let id = 0
function buildServer (handler, opts, cb) {
export default function buildServer (handler: ServerHandler, opts: Options = {}): Promise<Server> {
const serverId = id++
debug(`Booting server '${serverId}'`)
if (cb == null) {
cb = opts
opts = {}
}
const server = opts.secure
? stoppable(https.createServer(secureOpts))
: stoppable(http.createServer())
server.on('request', handler)
server.on('request', (req, res) => {
res.setHeader('x-elastic-product', 'Elasticsearch')
handler(req, res)
})
server.on('error', err => {
console.log('http server error', err)
process.exit(1)
})
if (cb === undefined) {
return new Promise((resolve, reject) => {
server.listen(0, () => {
const port = server.address().port
debug(`Server '${serverId}' booted on port ${port}`)
resolve([Object.assign({}, secureOpts, { port, caFingerprint }), server])
})
})
} else {
return new Promise((resolve, reject) => {
server.listen(0, () => {
// @ts-expect-error
const port = server.address().port
debug(`Server '${serverId}' booted on port ${port}`)
cb(Object.assign({}, secureOpts, { port }), server)
resolve([Object.assign({}, secureOpts, { port, caFingerprint }), server])
})
}
})
}
function getFingerprint (content, inputEncoding = 'base64', outputEncoding = 'hex') {
function getFingerprint (content: string, inputEncoding = 'base64', outputEncoding = 'hex'): string {
const shasum = crypto.createHash('sha256')
// @ts-expect-error
shasum.update(content, inputEncoding)
// @ts-expect-error
const res = shasum.digest(outputEncoding)
return res.toUpperCase().match(/.{1,2}/g).join(':')
const arr = res.toUpperCase().match(/.{1,2}/g)
if (arr == null) {
throw new Error('Should produce a match')
}
return arr.join(':')
}
module.exports = buildServer

View File

@ -1,66 +0,0 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
'use strict'
const { promisify } = require('util')
const sleep = promisify(setTimeout)
const buildServer = require('./buildServer')
const buildCluster = require('./buildCluster')
const buildProxy = require('./buildProxy')
const connection = require('./MockConnection')
const { Client } = require('../../')
async function waitCluster (client, waitForStatus = 'green', timeout = '50s', times = 0) {
if (!client) {
throw new Error('waitCluster helper: missing client instance')
}
try {
await client.cluster.health({ waitForStatus, timeout })
} catch (err) {
if (++times < 10) {
await sleep(5000)
return waitCluster(client, waitForStatus, timeout, times)
}
throw err
}
}
function skipProductCheck (client) {
const tSymbol = Object.getOwnPropertySymbols(client.transport || client)
.filter(symbol => symbol.description === 'product check')[0]
;(client.transport || client)[tSymbol] = 2
}
class NoProductCheckClient extends Client {
constructor (opts) {
super(opts)
skipProductCheck(this)
}
}
module.exports = {
buildServer,
buildCluster,
buildProxy,
connection,
waitCluster,
skipProductCheck,
Client: NoProductCheckClient
}

30
test/utils/index.ts Normal file
View File

@ -0,0 +1,30 @@
/*
* Licensed to Elasticsearch B.V. under one or more contributor
* license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright
* ownership. Elasticsearch B.V. licenses this file to you under
* the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
import buildServer from './buildServer'
import * as connection from './MockConnection'
import buildCluster from './buildCluster'
import * as buildProxy from './buildProxy'
export {
buildServer,
connection,
buildCluster,
buildProxy
}