Reconciler
Overview
The reconciler/ module automates ingestion from upstream connectors. It manages a durable queue of
reconciliation jobs, leases work to workers, instantiates connectors via the SPI, and calls the
service's gRPC APIs to create or update tables, views, snapshots, statistics, and index artifacts.
This component decouples connector execution from the main service so long-running metadata and
file-scoped execution work do not block public gRPC threads. The service submits capture jobs via
ReconcileControl.StartCapture, which creates jobs in the reconciler's store.
The current job model is split by responsibility:
- PLAN_CONNECTOR: top-level connector discovery job. Plans table and view work and enqueues child planning jobs.
- PLAN_TABLE: child table planning job. Ensures destination table metadata exists and enqueues snapshot planning work for snapshots that need processing.
- PLAN_VIEW: child view planning job. Creates or updates exactly one destination view.
- PLAN_SNAPSHOT: child snapshot planning job. Freezes the immutable snapshot plan on the parent job payload, records explicit file-group coverage metadata, and enqueues EXEC_FILE_GROUP plus FINALIZE_SNAPSHOT_CAPTURE children.
- EXEC_FILE_GROUP: child execution job. Reads the planned source parquet files through FloecatConnector, captures file-target stats, generates parquet sidecar index artifacts, and persists per-file execution results. It does not commit snapshot-wide aggregate outputs.
- FINALIZE_SNAPSHOT_CAPTURE: child finalization job. Validates the persisted snapshot coverage, waits for all planned EXEC_FILE_GROUP children to finish with persisted success results, verifies that the stats store contains exactly the expected file-target records for the snapshot, and then writes snapshot-wide aggregate outputs such as table/column stats.
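The parent/child relationships in the list above can be modeled as a small enum. This is a minimal sketch for orientation, not the reconciler's actual type: the enum name JobKind and the helpers childKinds() and isParentCapable() are hypothetical; only the job-kind names and which kinds enqueue which children come from this document.

```java
import java.util.EnumSet;
import java.util.Set;

/** Hypothetical model of the job kinds and the child kinds each one enqueues. */
enum JobKind {
    PLAN_CONNECTOR,
    PLAN_TABLE,
    PLAN_VIEW,
    PLAN_SNAPSHOT,
    EXEC_FILE_GROUP,
    FINALIZE_SNAPSHOT_CAPTURE;

    /** Child kinds this job kind may enqueue, following the list above. */
    Set<JobKind> childKinds() {
        switch (this) {
            case PLAN_CONNECTOR:
                return EnumSet.of(PLAN_TABLE, PLAN_VIEW);
            case PLAN_TABLE:
                return EnumSet.of(PLAN_SNAPSHOT);
            case PLAN_SNAPSHOT:
                return EnumSet.of(EXEC_FILE_GROUP, FINALIZE_SNAPSHOT_CAPTURE);
            default:
                // PLAN_VIEW, EXEC_FILE_GROUP and FINALIZE_SNAPSHOT_CAPTURE are leaves.
                return EnumSet.noneOf(JobKind.class);
        }
    }

    /** A kind is parent-capable when it can enqueue at least one child kind. */
    boolean isParentCapable() {
        return !childKinds().isEmpty();
    }
}
```

Under this model, the parent-capable kinds are exactly PLAN_CONNECTOR, PLAN_TABLE, and PLAN_SNAPSHOT.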
Architecture & Responsibilities
- ReconcileJobStore: interface abstracting job persistence and leasing. In the service runtime, the default is the durable implementation (DurableReconcileJobStore), selected by floecat.reconciler.job-store=durable; the in-memory implementation (InMemoryReconcileJobStore) remains available for lightweight/local usage when floecat.reconciler.job-store=memory.
- RemoteReconcileExecutorPoller: Quarkus scheduled bean that leases reconcile jobs through the ReconcileExecutorControl gRPC control plane, starts heartbeats, invokes the matching local executor implementation, reports progress/completion, and repolls while worker capacity is available. In worker.mode=local, the poller talks to the colocated service over gRPC; in worker.mode=remote, executor-only nodes use the same lease protocol against a separate control plane.
- ReconcilerService: core planning/orchestration:
  - Resolves connector metadata via the service's Connectors RPC.
  - Plans connector-scoped table and view work while preserving destination namespace/table overrides.
  - Ensures destination catalogs/namespaces/tables exist and updates connector metadata with resolved destination IDs.
  - Instantiates the connector via ConnectorFactory.
  - Handles incremental vs full-rescan logic.
  - Supports explicit capture modes:
    - METADATA_ONLY: advances catalog, table, view, and snapshot metadata without capture.
    - CAPTURE_ONLY: captures stats / index artifacts for explicitly scoped destination tables without reconciling view metadata.
    - METADATA_AND_CAPTURE: ingests metadata and runs capture for matching table work in the same job tree.
- Lease executors:
  - RemotePlannerReconcileExecutor handles PLAN_CONNECTOR.
  - RemoteDefaultReconcileExecutor handles PLAN_TABLE and PLAN_VIEW.
  - RemoteSnapshotPlanningReconcileExecutor handles PLAN_SNAPSHOT.
  - RemoteFileGroupReconcileExecutor handles EXEC_FILE_GROUP.
  - SnapshotFinalizeReconcileExecutor handles FINALIZE_SNAPSHOT_CAPTURE.
- GrpcClients: provides blocking stubs for all service RPCs (Catalog, Namespace, Table, Snapshot, Statistics, Directory, Connectors, ReconcileExecutorControl).
- FloecatConnector: remains the only component allowed to touch upstream catalogs, table metadata, and object storage. Reconcile planning and file-group execution both go through the connector instance; reconcile does not use ScanBundleService.
Public API / Surface Area
While the reconciler itself runs as an internal Quarkus app, it exposes behavior through the reconcile control RPCs:
- ReconcileControl.StartCapture(scope, mode, full_rescan, execution_policy): enqueues a top-level PLAN_CONNECTOR job via ReconcileJobStore.
- ReconcileControl.CaptureNow(...): uses the same split path, but waits for the aggregated outcome of the top-level plan job plus any child planning/execution jobs.
- ReconcileControl.GetReconcileJob(job_id) / ListReconcileJobs(...): expose both top-level and child jobs. Parent-capable jobs (PLAN_CONNECTOR, PLAN_TABLE, PLAN_SNAPSHOT) surface aggregate child status through eventually consistent projection/root-summary read models rather than synchronous parent-canonical rollups.
- ReconcileControl.GetFinalizedSnapshotStatus(table_id, snapshot_id): returns whether one destination snapshot has completed reconcile finalization.
  - FSS_PENDING means no finalized snapshot record exists yet for that table/snapshot pair.
  - FSS_FINALIZED includes the finalized_at timestamp and finalizer_job_id for the job that committed finalization.
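The GetFinalizedSnapshotStatus semantics can be illustrated with a tiny in-memory model (Java 17+ for records). This is a sketch under stated assumptions, not the service implementation: the class FinalizedSnapshotIndex, the long snapshot ID type, and the keep-the-first-record behavior of markFinalized are all hypothetical. Only the FSS_PENDING / FSS_FINALIZED states and the finalized_at and finalizer_job_id fields come from this document.

```java
import java.time.Instant;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory model of finalized-snapshot lookups. */
final class FinalizedSnapshotIndex {
    enum State { FSS_PENDING, FSS_FINALIZED }

    /** finalizedAt and finalizerJobId are null while the state is FSS_PENDING. */
    record Status(State state, Instant finalizedAt, String finalizerJobId) {}

    private record Key(String tableId, long snapshotId) {}

    private final Map<Key, Status> finalized = new ConcurrentHashMap<>();

    /** Records finalization; in this sketch a repeated call keeps the first record. */
    void markFinalized(String tableId, long snapshotId, String finalizerJobId, Instant at) {
        finalized.putIfAbsent(
                new Key(tableId, snapshotId),
                new Status(State.FSS_FINALIZED, at, finalizerJobId));
    }

    /** Absence of a record for the table/snapshot pair is reported as FSS_PENDING. */
    Status get(String tableId, long snapshotId) {
        Status status = finalized.get(new Key(tableId, snapshotId));
        return status != null ? status : new Status(State.FSS_PENDING, null, null);
    }
}
```

The point of the model is that FSS_PENDING is derived from a missing record rather than stored, which matches the description that pending means no finalized snapshot record exists yet.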
Internally, the worker poller exposes pollEvery via @Scheduled (default every second).
Important Internal Details
- Destination binding: when reconciling, the service ensures the connector's declared destination catalog/namespace/table IDs align with actual resources. Any mismatch triggers a ConnectorState update or raises conflicts.
- Statistics ingestion: stats persistence is centralized behind the stats control plane and the reconcile executor control plane.
  - CAPTURE_ONLY routes capture planning through the same reconcile job tree without metadata reconciliation.
  - METADATA_AND_CAPTURE performs metadata reconciliation and capture within the same planner/executor job tree. Remote file-group workers submit file-target stats and staged index artifacts back through SubmitLeasedFileGroupExecutionResult, and the service persists those results before FINALIZE_SNAPSHOT_CAPTURE writes any snapshot-wide aggregate stats.
- Snapshot planning persistence: the immutable snapshot plan is stored on the parent PLAN_SNAPSHOT job payload rather than in a separate plan repository. That payload includes the explicit file-group coverage metadata required by FINALIZE_SNAPSHOT_CAPTURE. Child EXEC_FILE_GROUP and FINALIZE_SNAPSHOT_CAPTURE jobs reference the parent plan by parentJobId.
- File-group execution:
  - EXEC_FILE_GROUP resolves the parent snapshot plan, captures file-target stats, and records per-file execution results on the child job payload.
  - Snapshot-wide aggregate outputs are intentionally deferred to FINALIZE_SNAPSHOT_CAPTURE, which acts as the barrier for complete snapshot capture.
  - Sidecar generation and artifact registration happen per source parquet file.
  - Service-side result submission persists only file-target stats from file-group workers; aggregate table/column outputs are rejected from file-group completion and recomputed once at snapshot finalization time.
  - SubmitLeasedFileGroupExecutionResult requires result_id. The service records top-level idempotency for the whole submit payload and per-item idempotency for individual stats/artifact writes so worker retries can safely replay the same result.
  - Current snapshot reads surface file_groups_total, file_groups_completed, file_groups_failed, files_total, files_completed, and files_failed.
- Index artifacts:
  - sidecars are parquet artifacts written by execution workers and registered through IndexArtifactRecord.
  - service-side lookup/list/read is exposed by TableIndexService.
- Connector security boundary: all upstream I/O remains inside FloecatConnector. ScanBundleService stays query-plane only; reconcile snapshot planning uses connector-native snapshot file planning.
- Mode-aware behavior:
  - in CAPTURE_ONLY, destination-table misses are treated as skip/no-op rather than job-fatal errors.
  - CAPTURE_ONLY is table-scoped: view scope is rejected, and scoped capture requests must resolve to explicit destination table IDs.
  - in METADATA_AND_CAPTURE, planner/executor availability is validated up front based on scope: table scope requires PLAN_TABLE, view scope requires PLAN_VIEW, and broad metadata reconcile requires both.
- Plan failure behavior: if a parent plan job fails or is cancelled after enqueuing child jobs, the control plane cancels those children.
- View reconciliation semantics: reconcile is current-state, not history-preserving. When an upstream view already exists in Floecat, reconcile updates the stored canonical definition in place rather than appending a backend version history.
- Durable queue ownership model: DurableReconcileJobStore is split into explicit state domains with native durable-store boundaries:
  - canonical job state owns the job-index domain transactionally (lookup, state, dedupe, parent, connector, job-local counters, payload references, and related canonical indexes)
  - ready-queue state is a separate due-ordered domain used for leasing eligibility and queue slicing
  - lease coordination owns runtime worker-ownership state separately (lease, lease-expiry, lane lease, snapshot lease)
  - payload blobs are canonical task artifacts referenced from canonical rows (snapshot plans, file-group results, direct stats, per-file-group stats)
  - projection/root-summary state owns eventually consistent observability views only (parent rollups, root-job list summaries, tree/list aggregate counters)
- Job leasing: workers lease from persisted ready pointers, mark jobs running/succeeded/failed through transactional state transitions, and reclaim expired leases on a configured interval. Failed jobs are retried with backoff up to configured attempt limits before terminal failure.
- Execution vs observability split:
  - queue correctness depends only on canonical state plus execution payload references
  - projection refresh is best-effort and may lag without affecting leasing, retries, cancellation, reclaim, or completion
  - detailed read APIs use best-effort payload hydration and may degrade detail when payload blobs are missing
- No queue self-healing: read paths, lease scans, and maintenance do not rebuild missing or stale job indexes. Canonical job-index pointers are expected to stay correct because the owning job-state transitions update them together. Lease maintenance reclaims expired leases and projection maintenance repairs dirty parent/root summaries, but neither path is part of queue correctness.
- Canonical vs projection-owned state:
  - canonical parent records carry scheduling and ownership state only (state, message, timestamps, executor ownership, waiting/finalization metadata, retry scheduling)
  - rolled-up aggregate counters (tables*, views*, snapshotsProcessed, statsProcessed, file-group/file counters, index counters) are projection-owned and root-summary-owned rather than canonical-parent-owned
  - transactional child completion updates canonical ancestor scheduling state immediately, while aggregate rollups are refreshed through projections
  - list/get/tree read paths may recompute fresh projections for the returned response, but they do not persist projection or root-summary repair
- Backend shape:
  - in floecat.kv=dynamodb, the durable store hot paths use native Dynamo-style partition/sort-key layouts for job indexes, ready slices, and lease rows/expiry scans
  - in floecat.kv=memory, the same domain model is preserved, but the physical implementation is in-memory rather than a literal Dynamo simulator
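The two-level idempotency described for SubmitLeasedFileGroupExecutionResult (one key for the whole payload, one key per item) can be sketched as follows (Java 17+). This is an illustrative model only: the class IdempotentResultSink, the Item record, the itemKey field, and the injected writer are hypothetical names, and the real service persists this state durably rather than in memory.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/** Hypothetical sketch of replay-safe result submission; not the service's code. */
final class IdempotentResultSink {
    /** One file-target write; itemKey is an assumed per-item idempotency key. */
    record Item(String itemKey, String payload) {}

    private final Set<String> completedResultIds = new HashSet<>();
    private final Set<String> writtenItemKeys = new HashSet<>();
    private final Consumer<Item> writer;

    IdempotentResultSink(Consumer<Item> writer) {
        this.writer = writer;
    }

    /** Applies the payload and returns how many item writes this call performed. */
    synchronized int submit(String resultId, List<Item> items) {
        if (resultId == null || resultId.isBlank()) {
            throw new IllegalArgumentException("result_id is required");
        }
        if (completedResultIds.contains(resultId)) {
            return 0; // top-level idempotency: the whole payload was already applied
        }
        int performed = 0;
        for (Item item : items) {
            String key = resultId + "/" + item.itemKey();
            if (writtenItemKeys.contains(key)) {
                continue; // per-item idempotency: written by an earlier partial attempt
            }
            writer.accept(item); // may throw; the item is then retried on replay
            writtenItemKeys.add(key);
            performed++;
        }
        completedResultIds.add(resultId);
        return performed;
    }
}
```

If a submit fails halfway, a worker retry with the same result_id writes only the items that were not yet recorded, and a further replay after success performs no writes at all.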
gRPC auth
- Reconcile workers use the gRPC control plane for leasing, progress, and standalone worker payload/result exchange.
- Worker auth is attached explicitly by the reconcile executor client. The global outbound gRPC interceptor is not responsible for minting or attaching worker tokens.
- In OIDC mode, background workers obtain a machine token via client credentials using floecat.reconciler.oidc.issuer, client-id, client-secret, token-refresh-skew-seconds, and connect-timeout.
- If floecat.interceptor.session.header is configured, worker RPCs attach the token there (typically x-floe-session). Otherwise they fall back to floecat.reconciler.authorization.header.
- ReconcileExecutorControl accepts only the dedicated internal worker permission carried by the reconciler service principal.
- Other internal gRPC fanout paths may still propagate request-scoped user/session headers where that is the actual call contract, but that is separate from reconcile worker auth.
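The header-selection rule above, plus one plausible reading of the refresh skew, can be sketched as two pure functions. Everything here is hypothetical: the class WorkerAuthHeader and both method names are invented for illustration, treating a blank session header as "not configured" is an assumption, and interpreting token-refresh-skew-seconds as "refresh this long before expiry" is also an assumption rather than documented behavior.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

/** Hypothetical helpers mirroring the worker auth rules described above. */
final class WorkerAuthHeader {
    private WorkerAuthHeader() {}

    /**
     * Picks the metadata header name that carries the worker token.
     *
     * @param sessionHeader value of floecat.interceptor.session.header, if configured
     * @param authorizationHeader value of floecat.reconciler.authorization.header
     */
    static String select(Optional<String> sessionHeader, String authorizationHeader) {
        return sessionHeader
                .map(String::trim)
                .filter(h -> !h.isEmpty()) // assumption: blank counts as unset
                .orElse(authorizationHeader);
    }

    /** Assumed skew semantics: refresh once "now" is within the skew of expiry. */
    static boolean needsRefresh(Instant now, Instant expiresAt, Duration skew) {
        return !now.isBefore(expiresAt.minus(skew));
    }
}
```

For example, with a session header of x-floe-session configured, select returns x-floe-session; with none configured it returns whatever header name floecat.reconciler.authorization.header holds.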
Data Flow & Lifecycle
Connector StartCapture / CaptureNow
  → ReconcileJobStore.enqueue(PLAN_CONNECTOR)
  → RemoteReconcileExecutorPoller.pollOnce
    → leaseNext
    → markRunning
    → if PLAN_CONNECTOR:
      → ReconcilerService.planTableTasks / planViewTasks
      → enqueue PLAN_TABLE / PLAN_VIEW children
    → if PLAN_TABLE:
      → ensure destination table metadata
      → enumerate snapshots via FloecatConnector
      → enqueue PLAN_SNAPSHOT children
    → if PLAN_VIEW:
      → describeView
      → ensure destination namespace exists
      → create or update the destination view
    → if PLAN_SNAPSHOT:
      → ask FloecatConnector for planned parquet file membership
      → persist grouped file plan on parent job payload
      → enqueue EXEC_FILE_GROUP children
      → enqueue FINALIZE_SNAPSHOT_CAPTURE child
    → if EXEC_FILE_GROUP:
      → resolve parent PLAN_SNAPSHOT payload
      → instantiate FloecatConnector
      → capture file-target stats for planned files
      → generate parquet sidecar index artifacts
      → persist per-file execution results and artifact registrations
    → if FINALIZE_SNAPSHOT_CAPTURE:
      → validate explicit planned coverage metadata
      → wait for all planned EXEC_FILE_GROUP children to succeed with persisted results
      → verify exact file-target coverage in the stats store
      → roll up snapshot-wide aggregate outputs
    → markSucceeded or markFailed
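The FINALIZE_SNAPSHOT_CAPTURE branch of the flow above is essentially a barrier followed by an exact-coverage check. The sketch below shows that decision as a pure function. It is a simplified model with hypothetical names (FinalizeBarrier, ChildState, Decision), and it assumes that a failed file-group child fails finalization outright, which this document does not state explicitly.

```java
import java.util.Map;
import java.util.Set;

/** Hypothetical decision function for the snapshot finalization barrier. */
final class FinalizeBarrier {
    enum ChildState { PENDING, SUCCEEDED, FAILED }
    enum Decision { WAIT, FAIL, FINALIZE }

    private FinalizeBarrier() {}

    /**
     * @param plannedGroups  file-group IDs frozen in the snapshot plan
     * @param childStates    latest known state per file-group child job
     * @param plannedFiles   files the plan expects file-target stats for
     * @param filesWithStats files that have file-target stats persisted
     */
    static Decision decide(Set<String> plannedGroups,
                           Map<String, ChildState> childStates,
                           Set<String> plannedFiles,
                           Set<String> filesWithStats) {
        boolean waiting = false;
        for (String group : plannedGroups) {
            // A planned group with no recorded state is treated as still pending.
            ChildState state = childStates.getOrDefault(group, ChildState.PENDING);
            if (state == ChildState.FAILED) {
                return Decision.FAIL; // assumption: one failed child fails the snapshot
            }
            if (state != ChildState.SUCCEEDED) {
                waiting = true;
            }
        }
        if (waiting) {
            return Decision.WAIT;
        }
        // Exact coverage: no missing and no unexpected file-target records.
        return plannedFiles.equals(filesWithStats) ? Decision.FINALIZE : Decision.FAIL;
    }
}
```

Only a FINALIZE decision would be followed by the snapshot-wide aggregate rollup; WAIT corresponds to the job staying in its waiting state until more children complete.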
Jobs include fullRescan, executionPolicy, jobKind, and optional task payloads. Snapshot plan
jobs, file-group jobs, and snapshot finalization jobs surface file-group/file counters in projected
public views; parent canonical records do not store rolled-up aggregate counters.
RemoteReconcileExecutorPoller uses AtomicBoolean and in-flight counters to avoid over-leasing
within the same instance while continuing to repoll until worker slots are full.
For handled remote completions, workers stop heartbeats before the handled success RPC and do not
perform a post-completion final lease confirmation after that RPC has durably completed the job.
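The combination of an AtomicBoolean guard and an in-flight counter mentioned for RemoteReconcileExecutorPoller can be sketched like this. It is a minimal illustration, not the poller's actual code: CapacityGatedPoller, its methods, and the Supplier-based lease source are hypothetical, and heartbeats, gRPC calls, and executor dispatch are omitted.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical capacity-gated poll loop; all names are illustrative. */
final class CapacityGatedPoller {
    private final AtomicBoolean polling = new AtomicBoolean(false);
    private final AtomicInteger inFlight = new AtomicInteger(0);
    private final int maxParallelism;

    CapacityGatedPoller(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /**
     * Leases jobs until worker slots are full or nothing is ready.
     *
     * @param leaseNext returns a leased job ID, or null when no job is ready
     * @return number of jobs leased by this call
     */
    int pollOnce(Supplier<String> leaseNext) {
        if (!polling.compareAndSet(false, true)) {
            return 0; // another poll on this instance is already running
        }
        int leased = 0;
        try {
            while (inFlight.get() < maxParallelism) {
                String jobId = leaseNext.get();
                if (jobId == null) {
                    break;
                }
                inFlight.incrementAndGet(); // slot stays held until complete()
                leased++;
            }
        } finally {
            polling.set(false);
        }
        return leased;
    }

    /** Called when an executor finishes a job, freeing one slot. */
    void complete() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }
}
```

Because only the guarded loop increments the counter, two overlapping scheduler ticks cannot lease past the limit, and a limit of 0 means the instance never leases.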
Configuration & Extensibility
- Scheduling cadence via reconciler.pollEvery (defaults to 1s).
- Worker mode via floecat.reconciler.worker.mode:
  - local runs the lease poller in the same JVM as the control plane.
  - remote keeps the same gRPC lease protocol but is intended for executor-only nodes. Set reconciler.max-parallelism=0 on control-plane-only nodes.
- Worker capacity via reconciler.max-parallelism.
- Job store selection:
  - floecat.reconciler.job-store=durable (service default) uses persisted queue records plus retry/lease tuning via floecat.reconciler.job-store.max-attempts, base-backoff-ms, max-backoff-ms, lease-ms, reclaim-interval-ms, and ready-scan-limit.
  - floecat.reconciler.job-store=memory uses the in-memory queue implementation.
- Executor toggles:
  - floecat.reconciler.executor.remote-default.enabled
  - floecat.reconciler.executor.remote-planner.enabled
  - floecat.reconciler.executor.remote-snapshot-planner.enabled
  - floecat.reconciler.executor.remote-file-group.enabled
  - floecat.reconciler.executor.snapshot-finalize.enabled
  - FINALIZE_SNAPSHOT_CAPTURE is handled by the service-local SnapshotFinalizeReconcileExecutor and can be disabled independently, but it is not a separate remote worker toggle.
- Swap out ReconcileJobStore for additional backends by providing a CDI alternative (job ID references must remain stable for GetReconcileJob).
- Extend FloecatConnector to add richer snapshot planning or file execution behavior. Query scan planning remains separate behind ScanBundleService.
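To show how max-attempts, base-backoff-ms, and max-backoff-ms could interact, here is a capped exponential backoff sketch. This document only says that failed jobs are retried with backoff up to the configured attempt limit; the doubling schedule, the class RetryBackoff, and the method nextDelayMs are assumptions made for illustration and may not match the durable store's real formula.

```java
import java.util.OptionalLong;

/** Hypothetical capped exponential backoff; the real schedule is not documented here. */
final class RetryBackoff {
    private final int maxAttempts;
    private final long baseBackoffMs;
    private final long maxBackoffMs;

    RetryBackoff(int maxAttempts, long baseBackoffMs, long maxBackoffMs) {
        this.maxAttempts = maxAttempts;
        this.baseBackoffMs = baseBackoffMs;
        this.maxBackoffMs = maxBackoffMs;
    }

    /**
     * @param failedAttempts number of attempts that have failed so far (1-based)
     * @return delay before the next attempt, or empty when the failure is terminal
     */
    OptionalLong nextDelayMs(int failedAttempts) {
        if (failedAttempts >= maxAttempts) {
            return OptionalLong.empty(); // attempt limit reached: terminal failure
        }
        long delay = baseBackoffMs;
        // Double once per earlier failure, stopping as soon as the cap is reached.
        for (int i = 1; i < failedAttempts && delay < maxBackoffMs; i++) {
            delay *= 2;
        }
        return OptionalLong.of(Math.min(delay, maxBackoffMs));
    }
}
```

With illustrative values of 5 attempts, a 1000 ms base, and an 8000 ms cap, the delays after the first four failures are 1000, 2000, 4000, and 8000 ms, and the fifth failure is terminal.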
Examples & Scenarios
- Full metadata rescan: the operator runs connector trigger demo-glue --full --all --mode metadata-only. The job store enqueues a full PLAN_CONNECTOR job, the worker poller leases it, and the reconciler walks connector discovery and metadata planning across the full upstream history.
- Incremental capture run: the operator runs connector trigger demo-glue --incremental --current --mode metadata-and-capture --capture stats. The reconcile path captures table/file/column stats for matching table work while still allowing metadata mutation.
- Incremental run: --incremental restricts work to snapshots not already ingested, and the explicit snapshot scope (--current, --latest-n, --snapshot, or --all) controls which upstream snapshots are eligible for planning.
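The interaction between the snapshot scope and --incremental can be pictured as two filters applied in sequence: the scope picks the eligible snapshots, and incremental mode then drops the ones already ingested. The sketch below illustrates this for a latest-n style scope. The class SnapshotScopeSketch, its methods, the long snapshot IDs, and the oldest-first ordering are assumptions for illustration, not the CLI's or the planner's actual implementation.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical model of scope selection followed by the incremental filter. */
final class SnapshotScopeSketch {
    private SnapshotScopeSketch() {}

    /** Keeps the newest n snapshots from a list ordered oldest first. */
    static List<Long> latestN(List<Long> upstreamOldestFirst, int n) {
        int size = upstreamOldestFirst.size();
        int from = Math.max(0, size - Math.max(n, 0));
        return upstreamOldestFirst.subList(from, size);
    }

    /** Drops already ingested snapshots when incremental; otherwise keeps the scope as-is. */
    static List<Long> plan(List<Long> scoped, Set<Long> alreadyIngested, boolean incremental) {
        if (!incremental) {
            return List.copyOf(scoped); // full rescan: every scoped snapshot is planned
        }
        return scoped.stream()
                .filter(id -> !alreadyIngested.contains(id))
                .collect(Collectors.toList());
    }
}
```

With upstream snapshots 1 through 4, a latest-2 scope, and snapshot 3 already ingested, an incremental run would plan only snapshot 4, while a full rescan would plan both 3 and 4.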
Cross-References
- Connector SPI details: docs/connectors-spi.md
- Service connector/query RPCs: docs/service.md
- Rust file-group worker implementation guide: docs/rust-remote-capture-executor.md
- Concrete connectors: docs/connectors-iceberg.md, docs/connectors-delta.md