Basic helper for ES|QL's Apache Arrow output format (#2391)

This commit is contained in:
Josh Mock
2024-10-22 15:00:18 -05:00
committed by GitHub
parent e9fdcb0647
commit c3247d0c66
4 changed files with 89 additions and 8 deletions

View File

@ -87,7 +87,8 @@
"zx": "^7.2.2" "zx": "^7.2.2"
}, },
"dependencies": { "dependencies": {
"@elastic/transport": "^8.8.1", "@elastic/transport": "^8.9.0",
"@apache-arrow/esnext-cjs": "^17.0.0",
"tslib": "^2.4.0" "tslib": "^2.4.0"
}, },
"tap": { "tap": {

View File

@ -25,6 +25,7 @@ import assert from 'node:assert'
import * as timersPromises from 'node:timers/promises' import * as timersPromises from 'node:timers/promises'
import { Readable } from 'node:stream' import { Readable } from 'node:stream'
import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport' import { errors, TransportResult, TransportRequestOptions, TransportRequestOptionsWithMeta } from '@elastic/transport'
import { Table, TypeMap, tableFromIPC } from '@apache-arrow/esnext-cjs'
import Client from './client' import Client from './client'
import * as T from './api/types' import * as T from './api/types'
@ -155,6 +156,7 @@ export interface EsqlResponse {
export interface EsqlHelper { export interface EsqlHelper {
toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>> toRecords: <TDocument>() => Promise<EsqlToRecords<TDocument>>
toArrow: () => Promise<Table<TypeMap>>
} }
export interface EsqlToRecords<TDocument> { export interface EsqlToRecords<TDocument> {
@ -965,11 +967,6 @@ export default class Helpers {
* @returns {object} EsqlHelper instance * @returns {object} EsqlHelper instance
*/ */
esql (params: T.EsqlQueryRequest, reqOptions: TransportRequestOptions = {}): EsqlHelper { esql (params: T.EsqlQueryRequest, reqOptions: TransportRequestOptions = {}): EsqlHelper {
if (this[kMetaHeader] !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${this[kMetaHeader] as string},h=qo`
}
const client = this[kClient] const client = this[kClient]
function toRecords<TDocument> (response: EsqlResponse): TDocument[] { function toRecords<TDocument> (response: EsqlResponse): TDocument[] {
@ -985,17 +982,37 @@ export default class Helpers {
}) })
} }
const metaHeader = this[kMetaHeader]
const helper: EsqlHelper = { const helper: EsqlHelper = {
/** /**
* Pivots ES|QL query results into an array of row objects, rather than the default format where each row is an array of values. * Pivots ES|QL query results into an array of row objects, rather than the default format where each row is an array of values.
*/ */
async toRecords<TDocument>(): Promise<EsqlToRecords<TDocument>> { async toRecords<TDocument>(): Promise<EsqlToRecords<TDocument>> {
if (metaHeader !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qo`
}
params.format = 'json' params.format = 'json'
params.columnar = false
// @ts-expect-error it's typed as ArrayBuffer but we know it will be JSON // @ts-expect-error it's typed as ArrayBuffer but we know it will be JSON
const response: EsqlResponse = await client.esql.query(params, reqOptions) const response: EsqlResponse = await client.esql.query(params, reqOptions)
const records: TDocument[] = toRecords(response) const records: TDocument[] = toRecords(response)
const { columns } = response const { columns } = response
return { records, columns } return { records, columns }
},
async toArrow (): Promise<Table<TypeMap>> {
if (metaHeader !== null) {
reqOptions.headers = reqOptions.headers ?? {}
reqOptions.headers['x-elastic-client-meta'] = `${metaHeader as string},h=qa`
}
params.format = 'arrow'
const response = await client.esql.query(params, reqOptions)
return tableFromIPC(response)
} }
} }

View File

@ -18,6 +18,7 @@
*/ */
import { test } from 'tap' import { test } from 'tap'
import { Table } from '@apache-arrow/esnext-cjs'
import { connection } from '../../utils' import { connection } from '../../utils'
import { Client } from '../../../' import { Client } from '../../../'
@ -109,5 +110,66 @@ test('ES|QL helper', t => {
t.end() t.end()
}) })
test('toArrow', t => {
t.test('Parses a binary response into an Arrow table', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
const MockConnection = connection.buildMockConnection({
onRequest (_params) {
return {
body: Buffer.from(binaryContent, 'base64'),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrow()
t.ok(result instanceof Table)
const table = [...result]
t.same(table[0], [
["amount", 4.900000095367432],
["date", 1729532586965],
])
t.end()
})
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
const MockConnection = connection.buildMockConnection({
onRequest (params) {
const header = params.headers?.['x-elastic-client-meta'] ?? ''
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
return {
body: Buffer.from(binaryContent, 'base64'),
statusCode: 200,
headers: {
'content-type': 'application/vnd.elasticsearch+arrow+stream'
}
}
}
})
const client = new Client({
node: 'http://localhost:9200',
Connection: MockConnection
})
await client.helpers.esql({ query: 'FROM sample_data' }).toArrow()
t.end()
})
t.end()
})
t.end() t.end()
}) })

View File

@ -1,6 +1,6 @@
{ {
"compilerOptions": { "compilerOptions": {
"target": "es2019", "target": "ES2019",
"module": "commonjs", "module": "commonjs",
"moduleResolution": "node", "moduleResolution": "node",
"declaration": true, "declaration": true,
@ -21,7 +21,8 @@
"importHelpers": true, "importHelpers": true,
"outDir": "lib", "outDir": "lib",
"lib": [ "lib": [
"esnext" "ES2019",
"dom"
] ]
}, },
"formatCodeOptions": { "formatCodeOptions": {