Add streaming support to Arrow helper (#2407)
This commit is contained in:
@ -18,7 +18,7 @@
|
||||
*/
|
||||
|
||||
import { test } from 'tap'
|
||||
import { Table } from '@apache-arrow/esnext-cjs'
|
||||
import * as arrow from 'apache-arrow'
|
||||
import { connection } from '../../utils'
|
||||
import { Client } from '../../../'
|
||||
|
||||
@ -111,7 +111,7 @@ test('ES|QL helper', t => {
|
||||
t.end()
|
||||
})
|
||||
|
||||
test('toArrow', t => {
|
||||
test('toArrowTable', t => {
|
||||
t.test('Parses a binary response into an Arrow table', async t => {
|
||||
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
|
||||
|
||||
@ -132,8 +132,8 @@ test('ES|QL helper', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrow()
|
||||
t.ok(result instanceof Table)
|
||||
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
|
||||
t.ok(result instanceof arrow.Table)
|
||||
|
||||
const table = [...result]
|
||||
t.same(table[0], [
|
||||
@ -165,7 +165,125 @@ test('ES|QL helper', t => {
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
await client.helpers.esql({ query: 'FROM sample_data' }).toArrow()
|
||||
await client.helpers.esql({ query: 'FROM sample_data' }).toArrowTable()
|
||||
t.end()
|
||||
})
|
||||
|
||||
t.end()
|
||||
})
|
||||
|
||||
test('toArrowReader', t => {
|
||||
t.test('Parses a binary response into an Arrow stream reader', async t => {
|
||||
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
|
||||
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (_params) {
|
||||
return {
|
||||
body: Buffer.from(binaryContent, 'base64'),
|
||||
statusCode: 200,
|
||||
headers: {
|
||||
'content-type': 'application/vnd.elasticsearch+arrow+stream'
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
|
||||
t.ok(result.isStream())
|
||||
|
||||
const recordBatch = result.next().value
|
||||
t.same(recordBatch.get(0)?.toJSON(), {
|
||||
amount: 4.900000095367432,
|
||||
date: 1729532586965,
|
||||
})
|
||||
t.end()
|
||||
})
|
||||
|
||||
t.test('ESQL helper uses correct x-elastic-client-meta helper value', async t => {
|
||||
const binaryContent = '/////zABAAAQAAAAAAAKAA4ABgANAAgACgAAAAAABAAQAAAAAAEKAAwAAAAIAAQACgAAAAgAAAAIAAAAAAAAAAIAAAB8AAAABAAAAJ7///8UAAAARAAAAEQAAAAAAAoBRAAAAAEAAAAEAAAAjP///wgAAAAQAAAABAAAAGRhdGUAAAAADAAAAGVsYXN0aWM6dHlwZQAAAAAAAAAAgv///wAAAQAEAAAAZGF0ZQAAEgAYABQAEwASAAwAAAAIAAQAEgAAABQAAABMAAAAVAAAAAAAAwFUAAAAAQAAAAwAAAAIAAwACAAEAAgAAAAIAAAAEAAAAAYAAABkb3VibGUAAAwAAABlbGFzdGljOnR5cGUAAAAAAAAAAAAABgAIAAYABgAAAAAAAgAGAAAAYW1vdW50AAAAAAAA/////7gAAAAUAAAAAAAAAAwAFgAOABUAEAAEAAwAAABgAAAAAAAAAAAABAAQAAAAAAMKABgADAAIAAQACgAAABQAAABYAAAABQAAAAAAAAAAAAAABAAAAAAAAAAAAAAAAQAAAAAAAAAIAAAAAAAAACgAAAAAAAAAMAAAAAAAAAABAAAAAAAAADgAAAAAAAAAKAAAAAAAAAAAAAAAAgAAAAUAAAAAAAAAAAAAAAAAAAAFAAAAAAAAAAAAAAAAAAAAHwAAAAAAAAAAAACgmZkTQAAAAGBmZiBAAAAAAAAAL0AAAADAzMwjQAAAAMDMzCtAHwAAAAAAAADV6yywkgEAANWPBquSAQAA1TPgpZIBAADV17mgkgEAANV7k5uSAQAA/////wAAAAA='
|
||||
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (params) {
|
||||
const header = params.headers?.['x-elastic-client-meta'] ?? ''
|
||||
t.ok(header.includes('h=qa'), `Client meta header does not include ESQL helper value: ${header}`)
|
||||
return {
|
||||
body: Buffer.from(binaryContent, 'base64'),
|
||||
statusCode: 200,
|
||||
headers: {
|
||||
'content-type': 'application/vnd.elasticsearch+arrow+stream'
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
|
||||
t.end()
|
||||
})
|
||||
|
||||
t.test('multi-batch support', async t => {
|
||||
const intType = new arrow.Uint32
|
||||
const floatType = new arrow.Float32
|
||||
const schema = new arrow.Schema([
|
||||
arrow.Field.new('id', intType),
|
||||
arrow.Field.new('val', floatType)
|
||||
])
|
||||
|
||||
function getBatch(ids: number[], vals: number[]) {
|
||||
const id = arrow.makeData({ type: intType, data: ids })
|
||||
const val = arrow.makeData({ type: floatType, data: vals })
|
||||
return new arrow.RecordBatch({ id, val })
|
||||
}
|
||||
|
||||
const batch1 = getBatch([1, 2, 3], [0.1, 0.2, 0.3])
|
||||
const batch2 = getBatch([4, 5, 6], [0.4, 0.5, 0.6])
|
||||
const batch3 = getBatch([7, 8, 9], [0.7, 0.8, 0.9])
|
||||
|
||||
const table = new arrow.Table(schema, [
|
||||
new arrow.RecordBatch(schema, batch1.data),
|
||||
new arrow.RecordBatch(schema, batch2.data),
|
||||
new arrow.RecordBatch(schema, batch3.data),
|
||||
])
|
||||
|
||||
const MockConnection = connection.buildMockConnection({
|
||||
onRequest (_params) {
|
||||
return {
|
||||
body: Buffer.from(arrow.tableToIPC(table, "stream")),
|
||||
statusCode: 200,
|
||||
headers: {
|
||||
'content-type': 'application/vnd.elasticsearch+arrow+stream'
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
const client = new Client({
|
||||
node: 'http://localhost:9200',
|
||||
Connection: MockConnection
|
||||
})
|
||||
|
||||
const result = await client.helpers.esql({ query: 'FROM sample_data' }).toArrowReader()
|
||||
t.ok(result.isStream())
|
||||
|
||||
let counter = 0
|
||||
for (const batch of result) {
|
||||
for (const row of batch) {
|
||||
counter++
|
||||
const { id, val } = row.toJSON()
|
||||
t.equal(id, counter)
|
||||
// floating points are hard in JS
|
||||
t.equal((Math.round(val * 10) / 10).toFixed(1), (counter * 0.1).toFixed(1))
|
||||
}
|
||||
}
|
||||
t.end()
|
||||
})
|
||||
|
||||
|
||||
Reference in New Issue
Block a user