Arrow Flight integration¶
This document describes the Arrow Flight contract used for system-table scans and how to:
- implement a producer, and
- implement an ingester/consumer.
For the cross-transport overview, see system-scans.md. For gRPC details, see system-scans-grpc.md.
Scope¶
Floecat uses a two-phase Flight flow for system tables:
- GetFlightInfo: validate command + auth, resolve schema, return an opaque ticket.
- GetStream: redeem ticket, build scan plan, stream Arrow batches.
The command/ticket schema is defined in system_scan_flight.proto (SystemTableFlightCommand and
SystemTableFlightTicket).
Transport flow details¶
Phase 1: GetFlightInfo¶
- Decode SystemTableFlightCommand from descriptor command bytes.
- Validate query identity (query_id vs x-query-id if both are present).
- Resolve table handle (target.name or target.id) and project schema.
- Return a FlightInfo with projected schema and a versioned opaque ticket.
- No data scan happens in this phase.
Phase 2: GetStream¶
- Decode ticket (SystemTableFlightTicket) and validate version.
- Resolve/authorize table access again.
- Run plan on worker executor thread (not event-loop thread).
- Stream Arrow schema/batches through FlightArrowBatchSink.
- Ensure sink/allocator cleanup on complete, cancel, or error.
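The stream-side lifecycle above (run off the event loop, always clean up) can be sketched with plain java.util.concurrent. This is only an illustration under assumptions: StreamLifecycleSketch, its Sink interface, and runStream are hypothetical stand-ins, not Floecat's FlightArrowBatchSink or base-class API, and batches are modelled as strings.

```java
import java.util.Iterator;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.BooleanSupplier;

final class StreamLifecycleSketch {

  /** Hypothetical stand-in for a batch sink; not Floecat's FlightArrowBatchSink API. */
  interface Sink {
    void emit(String batch);

    void close();
  }

  /**
   * Drains the plan on the given worker executor and closes the sink whether
   * the stream completes, is cancelled, or fails. The future yields the number
   * of batches emitted.
   */
  static CompletableFuture<Integer> runStream(
      Executor worker, Iterator<String> plan, Sink sink, BooleanSupplier cancelled) {
    return CompletableFuture.supplyAsync(() -> {
      int emitted = 0;
      try {
        // Check cancellation before every batch so the stream stops promptly.
        while (!cancelled.getAsBoolean() && plan.hasNext()) {
          sink.emit(plan.next());
          emitted++;
        }
        return emitted;
      } finally {
        sink.close(); // single cleanup point for complete, cancel, and error
      }
    }, worker);
  }
}
```

Putting the cleanup in one finally block keeps the three exit paths from drifting apart; a real sink would also release its allocator there.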
Producer guide¶
Use SystemTableFlightProducerBase from floecat-flight.
1) Extend the base class¶
Implement:
- resolveCallContext(...)
- tableNames(...) (canonical lowercase names, for example sys.session)
- schemaColumns(...)
- buildPlan(...)
- selfLocation()
Optional:
- authorize(...) for permission checks.
- resolveSystemTableId(...) / resolveSystemTableName(...) if ID routing is needed.
2) Constructor wiring¶
Use one of the base constructors:
- super(allocator, flightExecutor) when allocator is provided directly by host runtime.
- super(allocatorProvider, flightExecutor) when allocator is managed by a provider bean.
3) List flights correctly¶
Use descriptorForTable(tableName, callCtx) so descriptors carry protocol-correct command bytes.
4) Build plans with cancellation¶
buildPlan(...) receives BooleanSupplier cancelled; pass it into paged iterators and stop quickly
when cancelled.
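As a rough sketch of what passing the cancellation flag into a paged iterator could look like, the class below checks the BooleanSupplier before every page fetch and before handing out buffered rows. CancellablePagedIterator and its pageFetcher contract (page index in, rows out, empty list means end of data) are assumptions made for illustration, not part of the Floecat API.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.BooleanSupplier;
import java.util.function.IntFunction;

final class CancellablePagedIterator<T> implements Iterator<T> {
  // Hypothetical paging contract: page index -> rows; an empty list signals the end.
  private final IntFunction<List<T>> pageFetcher;
  private final BooleanSupplier cancelled;
  private final Deque<T> buffer = new ArrayDeque<>();
  private int nextPage = 0;
  private boolean exhausted = false;

  CancellablePagedIterator(IntFunction<List<T>> pageFetcher, BooleanSupplier cancelled) {
    this.pageFetcher = pageFetcher;
    this.cancelled = cancelled;
  }

  @Override
  public boolean hasNext() {
    // Re-check cancellation on every loop turn so no further page is fetched
    // (and no buffered row is returned) once the caller has cancelled.
    while (!cancelled.getAsBoolean()) {
      if (!buffer.isEmpty()) {
        return true;
      }
      if (exhausted) {
        return false;
      }
      List<T> page = pageFetcher.apply(nextPage++);
      if (page.isEmpty()) {
        exhausted = true;
      } else {
        buffer.addAll(page);
      }
    }
    return false;
  }

  @Override
  public T next() {
    if (!hasNext()) {
      throw new NoSuchElementException();
    }
    return buffer.removeFirst();
  }
}
```

Treating cancellation as end-of-data keeps the consuming loop simple; whether already-buffered rows should still be drained after a cancel is a design choice this sketch answers with "no".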
5) Memory model¶
The base allocates a per-stream child allocator from the parent allocator and closes it after the stream ends or fails. Producer code should not close the parent allocator.
Ingester / consumer guide¶
Use SystemTableCommands + standard Flight client calls.
1) Build command descriptor¶
Use canonical table name and query id:
FlightDescriptor d = SystemTableCommands.descriptor("sys.query", queryId);
2) Send required headers/context¶
At minimum, provide query identity either:
- in command query_id (default via SystemTableCommands), or
- via x-query-id header.
If both are present, they must match.
For authenticated services, propagate the same auth/session headers used on gRPC paths.
Common headers:
| Header | Purpose |
|---|---|
| authorization or session header | OIDC bearer token or session token |
| x-engine-kind | Engine kind (optional) |
| x-engine-version | Engine version (optional) |
| x-query-id | Query ID (optional if command carries query_id) |
| x-correlation-id | Correlation ID (generated if absent; propagated in call context) |
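The query-identity rule in this section (either source is sufficient; if both are present they must match) can be illustrated with a small helper. QueryIdResolver is a hypothetical name, blank values are treated as absent by assumption, and IllegalArgumentException stands in for the INVALID_ARGUMENT status a server would return.

```java
final class QueryIdResolver {

  /**
   * Reconciles the query id carried in the command with the x-query-id header.
   * Assumption: null or blank values count as "not provided".
   */
  static String resolve(String commandQueryId, String headerQueryId) {
    boolean hasCommand = commandQueryId != null && !commandQueryId.isBlank();
    boolean hasHeader = headerQueryId != null && !headerQueryId.isBlank();

    if (!hasCommand && !hasHeader) {
      throw new IllegalArgumentException("query_id is required");
    }
    if (hasCommand && hasHeader && !commandQueryId.equals(headerQueryId)) {
      throw new IllegalArgumentException("query_id mismatch");
    }
    // Prefer the command value when present; otherwise fall back to the header.
    return hasCommand ? commandQueryId : headerQueryId;
  }
}
```

For example, QueryIdResolver.resolve("q-1", null) and QueryIdResolver.resolve(null, "q-1") both yield "q-1", while resolve("q-1", "q-2") is rejected.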
3) Two-phase call flow¶
- FlightInfo info = client.getInfo(d, callOptions...)
- Ticket ticket = info.getEndpoints().get(0).getTicket()
- FlightStream stream = client.getStream(ticket, callOptions...)
- Read schema + batches from stream.
4) Retry guidance¶
Retry only transient failures (UNAVAILABLE), with bounded timeout/retry budget. Do not retry:
- INVALID_ARGUMENT (bad command/query id/ticket version)
- NOT_FOUND (unknown table/route)
- UNAUTHENTICATED / UNAUTHORIZED
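One way to apply this guidance is a bounded retry loop that retries only UNAVAILABLE and rethrows everything else immediately. The sketch below is self-contained and uses hypothetical types (RetrySketch, Code, StatusException) in place of the Flight client's own status and exception classes; the attempt limit and doubling backoff are illustrative choices, not values taken from Floecat.

```java
import java.util.function.Supplier;

final class RetrySketch {

  /** Hypothetical status codes mirroring the ones named in this section. */
  enum Code { UNAVAILABLE, INVALID_ARGUMENT, NOT_FOUND, UNAUTHENTICATED, UNAUTHORIZED }

  /** Hypothetical exception carrying a status code. */
  static final class StatusException extends RuntimeException {
    final Code code;

    StatusException(Code code) {
      super(code.name());
      this.code = code;
    }
  }

  /** Retries only UNAVAILABLE, at most maxAttempts times, doubling the backoff each time. */
  static <T> T callWithRetry(Supplier<T> call, int maxAttempts, long initialBackoffMillis) {
    long backoff = initialBackoffMillis;
    for (int attempt = 1; ; attempt++) {
      try {
        return call.get();
      } catch (StatusException e) {
        // Non-transient codes and an exhausted retry budget both surface immediately.
        if (e.code != Code.UNAVAILABLE || attempt >= maxAttempts) {
          throw e;
        }
      }
      try {
        Thread.sleep(backoff);
      } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("interrupted during backoff", ie);
      }
      backoff *= 2;
    }
  }
}
```

In a real client the same check would read the status code from the exception thrown by the Flight call, and an overall deadline would typically sit alongside the attempt limit.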
Protocol notes¶
- Routing is by canonical table name ownership (tableNames()), not opaque payload heuristics.
- Ticket format is versioned (SystemTableFlightTicket.version); mismatched versions return INVALID_ARGUMENT.
- Producers may accept name-only targets (no ID) for external endpoints.
- required_columns projection is applied to both GetFlightInfo schema and stream data.
- Arrow schema generation uses Floecat SchemaColumn.logical_type semantics via ArrowSchemaUtil:
  - use Floecat canonical logical types (or supported aliases),
  - integer aliases collapse to Arrow Int64,
  - JSON maps to Arrow Utf8; BINARY maps to Arrow Binary; UUID maps to FixedSizeBinary(16),
  - INTERVAL and complex container kinds (ARRAY, MAP, STRUCT, VARIANT) are not supported and must be omitted or cast to STRING/BINARY,
  - unknown/null/blank logical types fail fast instead of defaulting to Utf8.
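The mapping rules above can be pictured as a small lookup that fails fast. This is a sketch, not ArrowSchemaUtil itself: LogicalTypeMappingSketch is a hypothetical class, the integer alias names and the STRING to Utf8 rule are assumptions, and Arrow types are returned as plain strings so the example needs no Arrow dependency.

```java
import java.util.Locale;
import java.util.Set;

final class LogicalTypeMappingSketch {
  // Assumed alias names; the real set of supported aliases is defined by Floecat.
  private static final Set<String> INTEGER_ALIASES = Set.of("INT", "INTEGER", "BIGINT", "INT64");
  private static final Set<String> UNSUPPORTED =
      Set.of("INTERVAL", "ARRAY", "MAP", "STRUCT", "VARIANT");

  /** Returns the Arrow type name for a logical type, or throws for anything unmapped. */
  static String arrowTypeFor(String logicalType) {
    if (logicalType == null || logicalType.isBlank()) {
      throw new IllegalArgumentException("logical type is required");
    }
    String t = logicalType.trim().toUpperCase(Locale.ROOT);
    if (INTEGER_ALIASES.contains(t)) {
      return "Int64"; // all integer aliases collapse to one Arrow type
    }
    if (UNSUPPORTED.contains(t)) {
      throw new IllegalArgumentException("unsupported logical type: " + t);
    }
    switch (t) {
      case "STRING": // assumption: STRING maps to Utf8
      case "JSON":
        return "Utf8";
      case "BINARY":
        return "Binary";
      case "UUID":
        return "FixedSizeBinary(16)";
      default:
        // Fail fast instead of silently defaulting to Utf8.
        throw new IllegalArgumentException("unknown logical type: " + t);
    }
  }
}
```

Rejecting unknown types up front means a schema typo shows up as a clear error during GetFlightInfo rather than as string-typed columns downstream.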
Auth/authorization behavior:
- catalog.read is enforced on both GetFlightInfo and GetStream.
- Failures map to typed statuses (UNAUTHENTICATED, UNAUTHORIZED, NOT_FOUND, INVALID_ARGUMENT) rather than generic internal errors.
Configuration¶
| Property | Default | Purpose |
|---|---|---|
| floecat.flight.advertised-host | localhost | Host returned in FlightEndpointRef |
| floecat.flight.advertised-port | 80 | Port returned in FlightEndpointRef and FlightInfo |
| quarkus.grpc.server.port | ${quarkus.http.port} | Actual shared gRPC and Flight listener port |
| quarkus.grpc.server.plain-text | true | Controls whether advertised Flight locations use insecure gRPC or TLS |
| floecat.flight.memory.max-bytes | 0 | Parent allocator cap (0 = unbounded) |
| ai.floedb.floecat.arrow.max-bytes | 1073741824 | Per-stream allocator cap |
The configured floecat.flight.advertised-host is published in catalog bundles as
FlightEndpointRef. floecat.flight.advertised-port controls the advertised external port
separately from the actual shared gRPC listener. In non-dev/test profiles, avoid loopback values
(localhost / 127.0.0.1) for distributed clients.
Flight now runs on Quarkus's shared gRPC server instead of a separate standalone listener, but the
advertised host/port can still be overridden to match an ingress or load balancer.
Scanner and engine-context resolution¶
Flight and gRPC share scanner resolution logic:
- SystemScannerResolver.resolve(correlationId, tableId, engineContext)
- Includes cross-engine table-id translation (translateToDefault) when engine-specific IDs are presented.
Discovery model¶
When catalog bundles include flight_endpoint, workers can route to the declared endpoint and use
this same command/ticket flow independent of backing service implementation.
Troubleshooting quick checks¶
- INVALID_ARGUMENT: query_id is required: missing both command query_id and x-query-id.
- INVALID_ARGUMENT: query_id mismatch: header and command query ids differ.
- NOT_FOUND on supported table: producer tableNames() does not include canonical name used by descriptor.
- Empty batches: verify writer calls root.setRowCount(...).