Reconciler

Overview

The reconciler/ module automates ingestion from upstream connectors. It manages a durable queue of reconciliation jobs, leases work to workers, instantiates connectors via the SPI, and calls the service's gRPC APIs to create or update tables, views, snapshots, statistics, and index artifacts.

This component decouples connector execution from the main service so that long-running metadata and file-scoped execution work does not block public gRPC threads. The service submits capture jobs via ReconcileControl.StartCapture, which creates jobs in the reconciler's store.

The current job model is split by responsibility:

  • PLAN_CONNECTOR: top-level connector discovery job. Plans table and view work and enqueues child planning jobs.
  • PLAN_TABLE: child table planning job. Ensures destination table metadata exists and enqueues snapshot planning work for snapshots that need processing.
  • PLAN_VIEW: child view planning job. Creates or updates exactly one destination view.
  • PLAN_SNAPSHOT: child snapshot planning job. Freezes the immutable snapshot plan on the parent job payload, records explicit file-group coverage metadata, and enqueues EXEC_FILE_GROUP plus FINALIZE_SNAPSHOT_CAPTURE children.
  • EXEC_FILE_GROUP: child execution job. Reads the planned source parquet files through FloecatConnector, captures file-target stats, generates parquet sidecar index artifacts, and persists per-file execution results. It does not commit snapshot-wide aggregate outputs; those are deferred to FINALIZE_SNAPSHOT_CAPTURE.
  • FINALIZE_SNAPSHOT_CAPTURE: child finalization job. Validates the persisted snapshot coverage, waits for all planned EXEC_FILE_GROUP children to finish with persisted success results, verifies that the stats store contains exactly the expected file-target records for the snapshot, and then writes snapshot-wide aggregate outputs such as table/column stats.
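The job model above forms a fixed tree: each kind may enqueue only certain child kinds, and PLAN_VIEW, EXEC_FILE_GROUP, and FINALIZE_SNAPSHOT_CAPTURE are leaves. The sketch below encodes that shape as a guard on enqueue. JobKind and JobTree are hypothetical names chosen for illustration; this is not the reconciler's actual class layout.

```java
import java.util.EnumSet;
import java.util.Set;

// Illustrative model of the job tree; JobKind and JobTree are hypothetical
// names, not the reconciler's actual classes.
enum JobKind {
    PLAN_CONNECTOR,
    PLAN_TABLE,
    PLAN_VIEW,
    PLAN_SNAPSHOT,
    EXEC_FILE_GROUP,
    FINALIZE_SNAPSHOT_CAPTURE;

    /** Child job kinds this kind may enqueue, per the job model. */
    Set<JobKind> childKinds() {
        switch (this) {
            case PLAN_CONNECTOR:
                return EnumSet.of(PLAN_TABLE, PLAN_VIEW);
            case PLAN_TABLE:
                return EnumSet.of(PLAN_SNAPSHOT);
            case PLAN_SNAPSHOT:
                return EnumSet.of(EXEC_FILE_GROUP, FINALIZE_SNAPSHOT_CAPTURE);
            default:
                // PLAN_VIEW, EXEC_FILE_GROUP, FINALIZE_SNAPSHOT_CAPTURE do their work directly.
                return EnumSet.noneOf(JobKind.class);
        }
    }

    boolean isLeaf() {
        return childKinds().isEmpty();
    }
}

final class JobTree {
    private JobTree() {}

    /** Rejects an enqueue that the job model does not allow. */
    static void checkEnqueue(JobKind parent, JobKind child) {
        if (!parent.childKinds().contains(child)) {
            throw new IllegalStateException(parent + " may not enqueue " + child);
        }
    }
}
```

For example, JobTree.checkEnqueue(JobKind.PLAN_TABLE, JobKind.EXEC_FILE_GROUP) throws, because file-group work is only ever planned by PLAN_SNAPSHOT.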

Architecture & Responsibilities

  • ReconcileJobStore: interface abstracting job persistence and leasing. In service runtime, the default is the durable implementation (DurableReconcileJobStore) selected by floecat.reconciler.job-store=durable; in-memory (InMemoryReconcileJobStore) remains available for lightweight/local usage when floecat.reconciler.job-store=memory.
  • RemoteReconcileExecutorPoller: Quarkus scheduled bean that leases reconcile jobs through the ReconcileExecutorControl gRPC control plane, starts heartbeats, invokes the matching local executor implementation, reports progress/completion, and repolls while worker capacity is available. In worker.mode=local, the poller talks to the colocated service over gRPC; in worker.mode=remote, executor-only nodes use the same lease protocol against a separate control plane.
  • ReconcilerService: core planning and orchestration:
    • Resolves connector metadata via the service's Connectors RPC.
    • Plans connector-scoped table and view work while preserving destination namespace/table overrides.
    • Ensures destination catalogs/namespaces/tables exist and updates connector metadata with resolved destination IDs.
    • Instantiates the connector via ConnectorFactory.
    • Handles incremental vs. full-rescan logic.
    • Supports explicit capture modes:
      • METADATA_ONLY: advances catalog, table, view, and snapshot metadata without capture.
      • CAPTURE_ONLY: captures stats / index artifacts for explicitly scoped destination tables without reconciling view metadata.
      • METADATA_AND_CAPTURE: ingests metadata and runs capture for matching table work in the same job tree.
  • Lease executors:
    • RemotePlannerReconcileExecutor handles PLAN_CONNECTOR.
    • RemoteDefaultReconcileExecutor handles PLAN_TABLE and PLAN_VIEW.
    • RemoteSnapshotPlanningReconcileExecutor handles PLAN_SNAPSHOT.
    • RemoteFileGroupReconcileExecutor handles EXEC_FILE_GROUP.
    • SnapshotFinalizeReconcileExecutor handles FINALIZE_SNAPSHOT_CAPTURE.
  • GrpcClients: provides blocking stubs for all service RPCs (Catalog, Namespace, Table, Snapshot, Statistics, Directory, Connectors, ReconcileExecutorControl).
  • FloecatConnector: remains the only component allowed to touch upstream catalogs, table metadata, and object storage. Reconcile planning and file-group execution both go through the connector instance; reconcile does not use ScanBundleService.
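The lease-executor list amounts to a routing table from job kind to executor. A minimal sketch, assuming a hypothetical JobKind enum, plain strings for executor names, and a hypothetical disabledExecutors set standing in for the floecat.reconciler.executor.*.enabled toggles; treating a disabled executor as "do not run this kind here" is an assumption, not documented behavior.

```java
import java.util.EnumMap;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-ins: JobKind and the String executor names model the
// routing table; they are not the reconciler's actual registry API.
enum JobKind {
    PLAN_CONNECTOR, PLAN_TABLE, PLAN_VIEW, PLAN_SNAPSHOT, EXEC_FILE_GROUP, FINALIZE_SNAPSHOT_CAPTURE
}

final class ExecutorRouting {
    private static final Map<JobKind, String> ROUTES = new EnumMap<>(JobKind.class);

    static {
        ROUTES.put(JobKind.PLAN_CONNECTOR, "RemotePlannerReconcileExecutor");
        ROUTES.put(JobKind.PLAN_TABLE, "RemoteDefaultReconcileExecutor");
        ROUTES.put(JobKind.PLAN_VIEW, "RemoteDefaultReconcileExecutor");
        ROUTES.put(JobKind.PLAN_SNAPSHOT, "RemoteSnapshotPlanningReconcileExecutor");
        ROUTES.put(JobKind.EXEC_FILE_GROUP, "RemoteFileGroupReconcileExecutor");
        ROUTES.put(JobKind.FINALIZE_SNAPSHOT_CAPTURE, "SnapshotFinalizeReconcileExecutor");
    }

    private ExecutorRouting() {}

    /** Executor that should run a leased job, or empty if that executor is disabled. */
    static Optional<String> executorFor(JobKind kind, Set<String> disabledExecutors) {
        String name = ROUTES.get(kind);
        if (name == null || disabledExecutors.contains(name)) {
            return Optional.empty();
        }
        return Optional.of(name);
    }
}
```

Keeping the mapping in one EnumMap makes it obvious that PLAN_TABLE and PLAN_VIEW share an executor while every other kind has its own.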

Public API / Surface Area

While the reconciler itself runs as an internal Quarkus app, it exposes behavior through the reconcile control RPCs:

  • ReconcileControl.StartCapture(scope, mode, full_rescan, execution_policy): enqueues a top-level PLAN_CONNECTOR job via ReconcileJobStore.
  • ReconcileControl.CaptureNow(...): uses the same split path, but waits for the aggregated outcome of the top-level plan job plus any child planning/execution jobs.
  • ReconcileControl.GetReconcileJob(job_id) / ListReconcileJobs(...): expose both top-level and child jobs. Parent-capable jobs (PLAN_CONNECTOR, PLAN_TABLE, PLAN_SNAPSHOT) surface aggregate child status through eventually consistent projection/root-summary read models rather than synchronous parent-canonical rollups.
  • ReconcileControl.GetFinalizedSnapshotStatus(table_id, snapshot_id): returns whether one destination snapshot has completed reconcile finalization. FSS_PENDING means no finalized snapshot record exists yet for that table/snapshot pair. FSS_FINALIZED includes the finalized_at timestamp and finalizer_job_id for the job that committed finalization.
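StartCapture takes a scope and a mode, and the mode-aware rules listed under Important Internal Details constrain which combinations are accepted: CAPTURE_ONLY rejects view scope, and METADATA_AND_CAPTURE requires PLAN_TABLE and/or PLAN_VIEW handling depending on scope. The sketch below expresses those rules as a hypothetical pre-flight check. ScopeKind, PlannerKind, and CapturePreflight are illustrative names, and collapsing scope into TABLE/VIEW/BROAD is a simplification.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical types; mode names match the document, everything else is illustrative.
enum CaptureMode { METADATA_ONLY, CAPTURE_ONLY, METADATA_AND_CAPTURE }

enum ScopeKind { TABLE, VIEW, BROAD }

enum PlannerKind { PLAN_TABLE, PLAN_VIEW }

final class CapturePreflight {
    private CapturePreflight() {}

    /** Throws if a StartCapture-style request breaks a mode rule; returns normally otherwise. */
    static void validate(CaptureMode mode, ScopeKind scope, Set<PlannerKind> enabledPlanners) {
        if (mode == CaptureMode.CAPTURE_ONLY && scope == ScopeKind.VIEW) {
            // CAPTURE_ONLY is table-scoped, so a view scope is rejected outright.
            throw new IllegalArgumentException("CAPTURE_ONLY does not accept a view scope");
        }
        if (mode == CaptureMode.METADATA_AND_CAPTURE) {
            // Required planners depend on scope and are checked before any job is enqueued.
            Set<PlannerKind> required = switch (scope) {
                case TABLE -> EnumSet.of(PlannerKind.PLAN_TABLE);
                case VIEW -> EnumSet.of(PlannerKind.PLAN_VIEW);
                case BROAD -> EnumSet.of(PlannerKind.PLAN_TABLE, PlannerKind.PLAN_VIEW);
            };
            if (!enabledPlanners.containsAll(required)) {
                throw new IllegalStateException("missing planner/executor, need " + required);
            }
        }
    }
}
```

Failing up front keeps an unservable request from leaving a half-planned job tree behind.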

Internally, the worker poller exposes pollEvery via @Scheduled (default every second).

Important Internal Details

  • Destination binding: when reconciling, the service ensures the connector's declared destination catalog/namespace/table IDs align with actual resources. Any mismatch triggers a ConnectorState update or raises conflicts.
  • Statistics ingestion: stats persistence is centralized behind the stats control plane and the reconcile executor control plane. CAPTURE_ONLY routes capture planning through the same reconcile job tree without metadata reconciliation. METADATA_AND_CAPTURE performs metadata reconciliation and capture within the same planner/executor job tree. Remote file-group workers submit file-target stats and staged index artifacts back through SubmitLeasedFileGroupExecutionResult, and the service persists those results before FINALIZE_SNAPSHOT_CAPTURE writes any snapshot-wide aggregate stats.
  • Snapshot planning persistence: the immutable snapshot plan is stored on the parent PLAN_SNAPSHOT job payload rather than in a separate plan repository. That payload includes the explicit file-group coverage metadata required by FINALIZE_SNAPSHOT_CAPTURE. Child EXEC_FILE_GROUP and FINALIZE_SNAPSHOT_CAPTURE jobs reference the parent plan by parentJobId.
  • File-group execution:
    • EXEC_FILE_GROUP resolves the parent snapshot plan, captures file-target stats, and records per-file execution results on the child job payload.
    • Snapshot-wide aggregate outputs are intentionally deferred to FINALIZE_SNAPSHOT_CAPTURE, which acts as the barrier for complete snapshot capture.
    • Sidecar generation and artifact registration happen per source parquet file.
    • Service-side result submission persists only file-target stats from file-group workers; aggregate table/column outputs are rejected from file-group completion and recomputed once at snapshot finalization time.
    • SubmitLeasedFileGroupExecutionResult requires result_id. The service records top-level idempotency for the whole submit payload and per-item idempotency for individual stats/artifact writes so worker retries can safely replay the same result.
    • Current snapshot reads surface file_groups_total, file_groups_completed, file_groups_failed, files_total, files_completed, and files_failed.
  • Index artifacts:
    • Sidecars are parquet artifacts written by execution workers and registered through IndexArtifactRecord.
    • Service-side lookup/list/read is exposed by TableIndexService.
  • Connector security boundary: all upstream I/O remains inside FloecatConnector. ScanBundleService stays query-plane only; reconcile snapshot planning uses connector-native snapshot file planning.
  • Mode-aware behavior:
    • In CAPTURE_ONLY, destination-table misses are treated as skip/no-op rather than job-fatal errors.
    • CAPTURE_ONLY is table-scoped: view scope is rejected, and scoped capture requests must resolve to explicit destination table IDs.
    • In METADATA_AND_CAPTURE, planner/executor availability is validated up front based on scope: table scope requires PLAN_TABLE, view scope requires PLAN_VIEW, and broad metadata reconcile requires both.
  • Plan failure behavior: if a parent plan job fails or is cancelled after enqueuing child jobs, the control plane cancels those children.
  • View reconciliation semantics: reconcile is current-state, not history-preserving. When an upstream view already exists in Floecat, reconcile updates the stored canonical definition in place rather than appending a backend version history.
  • Durable queue ownership model: DurableReconcileJobStore is split into explicit state domains with native durable-store boundaries:
    • Canonical job state transactionally owns the job-index domain (lookup, state, dedupe, parent, connector, job-local counters, payload references, and related canonical indexes).
    • Ready-queue state is a separate due-ordered domain used for leasing eligibility and queue slicing.
    • Lease coordination separately owns runtime worker-ownership state (lease, lease-expiry, lane lease, snapshot lease).
    • Payload blobs are canonical task artifacts referenced from canonical rows (snapshot plans, file-group results, direct stats, per-file-group stats).
    • Projection/root-summary state owns only eventually consistent observability views (parent rollups, root-job list summaries, tree/list aggregate counters).
  • Job leasing: workers lease from persisted ready pointers, mark jobs running/succeeded/failed through transactional state transitions, and reclaim expired leases on a configured interval. Failed jobs are retried with backoff up to configured attempt limits before terminal failure.
  • Execution vs. observability split:
    • Queue correctness depends only on canonical state plus execution payload references.
    • Projection refresh is best-effort and may lag without affecting leasing, retries, cancellation, reclaim, or completion.
    • Detailed read APIs use best-effort payload hydration and may return less detail when payload blobs are missing.
  • No queue self-healing: read paths, lease scans, and maintenance do not rebuild missing or stale job indexes. Canonical job-index pointers are expected to stay correct because the owning job-state transitions update them together. Lease maintenance reclaims expired leases and projection maintenance repairs dirty parent/root summaries, but neither path is part of queue correctness.
  • Canonical vs. projection-owned state:
    • Canonical parent records carry only scheduling and ownership state (state, message, timestamps, executor ownership, waiting/finalization metadata, retry scheduling).
    • Rolled-up aggregate counters (tables*, views*, snapshotsProcessed, statsProcessed, file-group/file counters, index counters) are owned by projections and root summaries, not by canonical parent records.
    • Transactional child completion updates canonical ancestor scheduling state immediately, while aggregate rollups are refreshed through projections.
    • List/get/tree read paths may recompute fresh projections for the returned response, but they do not persist projection or root-summary repairs.
  • Backend shape:
    • In floecat.kv=dynamodb, the durable store's hot paths use native Dynamo-style partition/sort-key layouts for job indexes, ready slices, and lease rows/expiry scans.
    • In floecat.kv=memory, the same domain model is preserved, but the physical implementation is in-memory rather than a literal Dynamo simulator.
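The result_id rule for SubmitLeasedFileGroupExecutionResult combines two levels of idempotency: a whole-payload check and a per-item check. The per-item level matters when an earlier attempt persisted some items and then failed before the payload was recorded. A minimal in-memory sketch, assuming a hypothetical ResultLedger class, string item keys, and a per-item key formed from result_id plus the item key; the real service persists these records durably.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical in-memory ledger; names and key format are illustrative only.
final class ResultLedger {
    private final Set<String> committedResults = new HashSet<>(); // top-level: result_id
    private final Set<String> committedItems = new HashSet<>();   // per-item: result_id#item

    /** Persists each not-yet-written item via write; returns how many items were newly written. */
    synchronized int submit(String resultId, List<String> itemKeys, Consumer<String> write) {
        if (resultId == null || resultId.isEmpty()) {
            throw new IllegalArgumentException("result_id is required");
        }
        if (committedResults.contains(resultId)) {
            return 0; // whole payload already persisted: a replay is a no-op
        }
        int written = 0;
        for (String item : itemKeys) {
            String itemKey = resultId + "#" + item;
            if (committedItems.contains(itemKey)) {
                continue; // persisted by an earlier, partially failed attempt
            }
            write.accept(item); // may throw; the item is then not recorded
            committedItems.add(itemKey);
            written++;
        }
        committedResults.add(resultId); // recorded only after every item is persisted
        return written;
    }
}
```

If the first attempt fails on its third item, a retry with the same result_id writes only that item, and any further replay writes nothing.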

gRPC auth

  • Reconcile workers use the gRPC control plane for leasing, progress, and standalone worker payload/result exchange.
  • Worker auth is attached explicitly by the reconcile executor client. The global outbound gRPC interceptor is not responsible for minting or attaching worker tokens.
  • In OIDC mode, background workers obtain a machine token via client credentials using floecat.reconciler.oidc.issuer, client-id, client-secret, token-refresh-skew-seconds, and connect-timeout.
  • If floecat.interceptor.session.header is configured, worker RPCs attach the token there (typically x-floe-session). Otherwise they fall back to floecat.reconciler.authorization.header.
  • ReconcileExecutorControl accepts only the dedicated internal worker permission carried by the reconciler service principal.
  • Other internal gRPC fanout paths may still propagate request-scoped user/session headers where that is the actual call contract, but that is separate from reconcile worker auth.
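Two of the worker auth rules reduce to small decisions: which header carries the machine token, and when a cached token should be refreshed. The sketch below is hypothetical; WorkerAuth is an illustrative name, and the refresh rule (refresh once the current time is within the configured skew of expiry) is an assumption inferred from the name token-refresh-skew-seconds.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.Optional;

// Hypothetical helper mirroring the header-selection and refresh-skew rules;
// not the reconcile executor client's actual code.
final class WorkerAuth {
    private WorkerAuth() {}

    /** The session header wins when configured; otherwise fall back to the authorization header. */
    static String headerName(Optional<String> sessionHeader, String authorizationHeader) {
        return sessionHeader.filter(h -> !h.isBlank()).orElse(authorizationHeader);
    }

    /** True once the cached token is within the skew window of its expiry (assumed rule). */
    static boolean needsRefresh(Instant expiresAt, Instant now, Duration skew) {
        return !now.isBefore(expiresAt.minus(skew));
    }
}
```

Refreshing slightly early avoids sending a token that expires while the RPC is in flight.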

Data Flow & Lifecycle

Connector StartCapture / CaptureNow
  → ReconcileJobStore.enqueue(PLAN_CONNECTOR)
  → RemoteReconcileExecutorPoller.pollOnce
      → leaseNext
      → markRunning
      → if PLAN_CONNECTOR:
          → ReconcilerService.planTableTasks / planViewTasks
          → enqueue PLAN_TABLE / PLAN_VIEW children
      → if PLAN_TABLE:
          → ensure destination table metadata
          → enumerate snapshots via FloecatConnector
          → enqueue PLAN_SNAPSHOT children
      → if PLAN_VIEW:
          → describeView
          → ensure destination namespace exists
          → create or update the destination view
      → if PLAN_SNAPSHOT:
          → ask FloecatConnector for planned parquet file membership
          → persist grouped file plan on parent job payload
          → enqueue EXEC_FILE_GROUP children
          → enqueue FINALIZE_SNAPSHOT_CAPTURE child
      → if EXEC_FILE_GROUP:
          → resolve parent PLAN_SNAPSHOT payload
          → instantiate FloecatConnector
          → capture file-target stats for planned files
          → generate parquet sidecar index artifacts
          → persist per-file execution results and artifact registrations
      → if FINALIZE_SNAPSHOT_CAPTURE:
          → validate explicit planned coverage metadata
          → wait for all planned EXEC_FILE_GROUP children to succeed with persisted results
          → verify exact file-target coverage in the stats store
          → roll up snapshot-wide aggregate outputs
      → markSucceeded or markFailed
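The FINALIZE_SNAPSHOT_CAPTURE branch of the flow above is a barrier: it validates the planned coverage, waits for every planned file group to succeed with a persisted result, and checks exact file-target coverage before rolling up aggregates. The sketch below models that decision with hypothetical types (FileGroup, ChildStatus, FinalizeDecision). Treating overlapping file groups and a coverage mismatch as FAIL rather than WAIT is an assumption.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical shapes standing in for the PLAN_SNAPSHOT payload, child job
// states, and the stats store; none of these are the reconciler's real types.
enum ChildState { QUEUED, RUNNING, SUCCEEDED, FAILED, CANCELLED }

record FileGroup(String groupId, Set<String> files) {}

record ChildStatus(ChildState state, boolean resultPersisted) {}

enum FinalizeDecision { WAIT, FAIL, COMMIT_AGGREGATES }

final class FinalizeBarrier {
    private FinalizeBarrier() {}

    static FinalizeDecision decide(List<FileGroup> plannedGroups,
                                   Map<String, ChildStatus> childByGroupId,
                                   Set<String> fileTargetsInStatsStore) {
        // 1. Validate coverage: each planned file belongs to exactly one group.
        Set<String> expectedFiles = new HashSet<>();
        for (FileGroup group : plannedGroups) {
            for (String file : group.files()) {
                if (!expectedFiles.add(file)) {
                    return FinalizeDecision.FAIL;
                }
            }
        }
        // 2. Barrier: every planned group must succeed with a persisted result.
        boolean waiting = false;
        for (FileGroup group : plannedGroups) {
            ChildStatus child = childByGroupId.get(group.groupId());
            if (child == null) {
                waiting = true; // child not visible yet
                continue;
            }
            switch (child.state()) {
                case FAILED, CANCELLED -> { return FinalizeDecision.FAIL; }
                case QUEUED, RUNNING -> waiting = true;
                case SUCCEEDED -> waiting |= !child.resultPersisted();
            }
        }
        if (waiting) {
            return FinalizeDecision.WAIT;
        }
        // 3. Exact coverage: the stats store must hold exactly the planned file targets.
        return fileTargetsInStatsStore.equals(expectedFiles)
                ? FinalizeDecision.COMMIT_AGGREGATES
                : FinalizeDecision.FAIL;
    }
}
```

Failures are checked across all groups before waiting is decided, so one failed child fails the barrier even while siblings are still running.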

Jobs carry fullRescan, executionPolicy, jobKind, and optional task payloads. Snapshot plan jobs, file-group jobs, and snapshot finalization jobs surface file-group/file counters in projected public views; parent canonical records do not store rolled-up aggregate counters. RemoteReconcileExecutorPoller uses AtomicBoolean and in-flight counters to avoid over-leasing within the same instance while continuing to repoll until worker slots are full. For handled remote completions, workers stop heartbeats before sending the success RPC, and they do not perform a final lease confirmation once that RPC has durably completed the job.
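The over-leasing guard described above can be sketched with one AtomicBoolean and one AtomicInteger: the boolean keeps overlapping scheduled ticks from polling concurrently, and the counter reserves a worker slot before each lease so the instance never exceeds its capacity. CapacityGuard, leaseNext, and start are hypothetical names; how the real poller uses its AtomicBoolean is an assumption here.

```java
import java.util.Optional;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical sketch; not RemoteReconcileExecutorPoller's actual API.
final class CapacityGuard {
    private final AtomicBoolean polling = new AtomicBoolean(false); // one poll loop at a time
    private final AtomicInteger inFlight = new AtomicInteger(0);    // jobs currently executing
    private final int maxParallelism;

    CapacityGuard(int maxParallelism) {
        this.maxParallelism = maxParallelism;
    }

    /** Leases until worker slots are full or the queue is empty; returns jobs started. */
    int pollOnce(Supplier<Optional<String>> leaseNext, Consumer<String> start) {
        if (!polling.compareAndSet(false, true)) {
            return 0; // an overlapping tick is already polling in this instance
        }
        int started = 0;
        try {
            while (true) {
                int current = inFlight.get();
                if (current >= maxParallelism) {
                    break; // slots full: stop leasing
                }
                if (!inFlight.compareAndSet(current, current + 1)) {
                    continue; // lost a race with release(); re-read the counter
                }
                Optional<String> job;
                try {
                    job = leaseNext.get();
                } catch (RuntimeException e) {
                    inFlight.decrementAndGet(); // give back the reserved slot
                    throw e;
                }
                if (job.isEmpty()) {
                    inFlight.decrementAndGet(); // nothing to lease: release the slot
                    break;
                }
                started++;
                start.accept(job.get()); // the executor is expected to call release() when done
            }
        } finally {
            polling.set(false);
        }
        return started;
    }

    /** Called when a job finishes, freeing its slot. */
    void release() {
        inFlight.decrementAndGet();
    }

    int inFlight() {
        return inFlight.get();
    }
}
```

With maxParallelism set to 0, as recommended for control-plane-only nodes, the loop exits before leasing anything.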

Configuration & Extensibility

  • Scheduling cadence via reconciler.pollEvery (defaults to 1s).
  • Worker mode via floecat.reconciler.worker.mode:
    • local runs the lease poller in the same JVM as the control plane.
    • remote keeps the same gRPC lease protocol but is intended for executor-only nodes. Set reconciler.max-parallelism=0 on control-plane-only nodes.
  • Worker capacity via reconciler.max-parallelism.
  • Job store selection:
    • floecat.reconciler.job-store=durable (service default) uses persisted queue records, with retry/lease tuning via floecat.reconciler.job-store.max-attempts, base-backoff-ms, max-backoff-ms, lease-ms, reclaim-interval-ms, and ready-scan-limit.
    • floecat.reconciler.job-store=memory uses the in-memory queue implementation.
  • Executor toggles:
    • floecat.reconciler.executor.remote-default.enabled
    • floecat.reconciler.executor.remote-planner.enabled
    • floecat.reconciler.executor.remote-snapshot-planner.enabled
    • floecat.reconciler.executor.remote-file-group.enabled
    • floecat.reconciler.executor.snapshot-finalize.enabled
    • FINALIZE_SNAPSHOT_CAPTURE is handled by the service-local SnapshotFinalizeReconcileExecutor and can be disabled independently, but it is not a separate remote worker toggle.
  • Swap out ReconcileJobStore for additional backends by providing a CDI alternative (job ID references must remain stable for GetReconcileJob).
  • Extend FloecatConnector to add richer snapshot planning or file execution behavior. Query scan planning remains separate behind ScanBundleService.
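The durable job store retries failed jobs with backoff up to max-attempts before terminal failure. The exact schedule is not documented here; the sketch below assumes plain exponential doubling from base-backoff-ms, capped at max-backoff-ms. RetryPolicy and nextDelay are hypothetical names, and the real store may add jitter or count attempts differently.

```java
import java.time.Duration;
import java.util.Optional;

// Hypothetical policy named after the job-store knobs; the doubling schedule is an assumption.
record RetryPolicy(int maxAttempts, long baseBackoffMs, long maxBackoffMs) {

    /** Delay before the next attempt, or empty when the job should fail terminally. */
    Optional<Duration> nextDelay(int failedAttempts) {
        if (failedAttempts >= maxAttempts) {
            return Optional.empty();
        }
        long delay = baseBackoffMs;
        // Double once per earlier failure, stopping as soon as the cap is reached.
        for (int i = 1; i < failedAttempts && delay < maxBackoffMs; i++) {
            delay *= 2;
        }
        return Optional.of(Duration.ofMillis(Math.min(delay, maxBackoffMs)));
    }
}
```

With maxAttempts=5, baseBackoffMs=100, and maxBackoffMs=1000, the delays after the first four failures are 100, 200, 400, and 800 ms, and the fifth failure is terminal.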

Examples & Scenarios

  • Full metadata rescan: an operator runs connector trigger demo-glue --full --all --mode metadata-only. The job store enqueues a full PLAN_CONNECTOR job, the worker poller leases it, and the reconciler walks connector discovery and metadata planning across the full upstream history.
  • Incremental capture run: an operator runs connector trigger demo-glue --incremental --current --mode metadata-and-capture --capture stats. The reconcile path captures table/file/column stats for matching table work while still allowing metadata mutation.
  • Incremental run: --incremental restricts work to snapshots not already ingested, and the explicit snapshot scope (--current, --latest-n, --snapshot, or --all) controls which upstream snapshots are eligible for planning.
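The snapshot-selection rule in the last scenario combines two filters: the explicit scope picks candidate snapshots, and --incremental then drops those already ingested. A sketch under stated assumptions: ScopeKind, Scope, and SnapshotSelection are hypothetical names, upstream snapshot IDs are assumed to be ordered newest first, and applying the scope before the incremental filter is an assumed ordering.

```java
import java.util.List;
import java.util.Set;

// Hypothetical model: ScopeKind mirrors --current/--latest-n/--snapshot/--all;
// the types and the scope-then-incremental ordering are assumptions.
enum ScopeKind { CURRENT, LATEST_N, SNAPSHOT, ALL }

record Scope(ScopeKind kind, long arg) {} // arg = n for LATEST_N, snapshot id for SNAPSHOT

final class SnapshotSelection {
    private SnapshotSelection() {}

    static List<Long> eligible(List<Long> upstreamNewestFirst, long currentSnapshotId,
                               Scope scope, boolean incremental, Set<Long> alreadyIngested) {
        // 1. The explicit scope decides which upstream snapshots are candidates.
        List<Long> scoped = switch (scope.kind()) {
            case CURRENT -> upstreamNewestFirst.stream().filter(id -> id == currentSnapshotId).toList();
            case LATEST_N -> upstreamNewestFirst.stream().limit(scope.arg()).toList();
            case SNAPSHOT -> upstreamNewestFirst.stream().filter(id -> id == scope.arg()).toList();
            case ALL -> upstreamNewestFirst;
        };
        // 2. --incremental drops snapshots that were already ingested; a full rescan keeps them.
        if (!incremental) {
            return scoped;
        }
        return scoped.stream().filter(id -> !alreadyIngested.contains(id)).toList();
    }
}
```

For upstream snapshots [30, 20, 10] with 20 already ingested, --incremental --latest-n 2 yields [30], while a full rescan with the same scope yields [30, 20].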

Cross-References