From 56602b505c0930dd217f72b5ee0570ad20f6f089 Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Wed, 3 Jun 2026 23:21:30 -0400 Subject: [PATCH 1/3] feat(fleet-operator): aggregator recovery signal + orphan GC + recovery e2e (Ch2) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Operator restart + aggregator recovery (v0.3 plan Ch2). The aggregator already cold-rebuilds from NATS KV + CR watches; this makes recovery observable, closes an orphan gap, and pins each failure shape with a regression test. - OperatorLiveness: a shared in-process latch (Recovering → Converged) the aggregator sets once all three cold-start sources replay (Deployment/Device watcher InitDone, device-state KV seen_current; empty-bucket short-circuit). The in-process dashboard reads it and shows a self-clearing banner via an HTMX self-poll (/__recovery), so the customer sees progress, not a blank. - gc_orphaned_desired_state: at convergence, purge desired-state whose Deployment CR no longer exists (force-deleted while the operator was down, finalizer bypassed). Belt-and-suspenders with the controller finalizer. - run() now owns its watchers in a JoinSet, so cancelling the aggregator aborts its children — no orphan tasks outliving a restart (matters for the restart-simulation tests and clean process teardown). Also made run() Send (hoisted a .await out of a tracing macro) so it can be spawned. - docs/fleet-operator-recovery-scenarios.md enumerates the failure shapes and maps each to its test. - harmony-fleet-e2e/tests/operator_recovery.rs: regression test per scenario (cold restart converges from KV; orphan GC; two operators write identical bytes; chaos kill under write load converges <30s) + AdminKv::put_device_state. Writes stay idempotent + byte-deterministic, so two operators racing agree without leader election (operator HA = D3, deferred). --- Cargo.lock | 1 + docs/fleet-operator-recovery-scenarios.md | 61 +++ fleet/harmony-fleet-e2e/Cargo.toml | 1 + fleet/harmony-fleet-e2e/src/kv_admin.rs | 16 + .../tests/operator_recovery.rs | 439 ++++++++++++++++++ .../src/fleet_aggregator.rs | 190 ++++++-- .../src/frontend/layout.rs | 27 ++ .../src/frontend/server.rs | 9 + fleet/harmony-fleet-operator/src/lib.rs | 1 + fleet/harmony-fleet-operator/src/liveness.rs | 103 ++++ fleet/harmony-fleet-operator/src/main.rs | 20 +- .../harmony-fleet-operator/src/service/mod.rs | 6 + .../src/service/real.rs | 19 +- 13 files changed, 840 insertions(+), 53 deletions(-) create mode 100644 docs/fleet-operator-recovery-scenarios.md create mode 100644 fleet/harmony-fleet-e2e/tests/operator_recovery.rs create mode 100644 fleet/harmony-fleet-operator/src/liveness.rs diff --git a/Cargo.lock b/Cargo.lock index bd7bd815..ef60028b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4131,6 +4131,7 @@ dependencies = [ "anyhow", "async-nats", "async-trait", + "chrono", "directories", "futures-util", "harmony", diff --git a/docs/fleet-operator-recovery-scenarios.md b/docs/fleet-operator-recovery-scenarios.md new file mode 100644 index 00000000..2c002ca2 --- /dev/null +++ b/docs/fleet-operator-recovery-scenarios.md @@ -0,0 +1,61 @@ +# Fleet operator recovery scenarios + +The operator can be killed, upgraded, or rescheduled at any time. When it comes +back it must **converge from NATS KV + the CRs alone** — no customer-visible +"unknown state" window. This doc enumerates the failure shapes, what correct +recovery looks like, and the regression test that pins each one. + +## How recovery works + +The operator is stateless across restarts. Everything it needs is durable: + +- **Deployment / Device CRs** live in etcd (kube). On restart the aggregator's + `watcher` replays the full list (`Event::InitApply` …→ `Event::InitDone`). +- **`desired-state` KV** is what the operator previously wrote per + `(device, deployment)`. On restart `seed_owned_targets` reloads it so the + first reconcile diffs correctly instead of orphaning entries. +- **`device-state` KV** is per-device phase reported by agents. On restart the + state watcher replays it (`watch_with_history`), rebuilding health counts. + +All writes are **idempotent and byte-deterministic**: the desired-state payload +is the same serialized score for a given CR, and status patches are computed +from the rebuilt caches. So a second operator racing the first writes identical +bytes — no leader election needed at current fleet size (operator HA is D3, +deferred). See [`fleet_aggregator.rs`]. + +### Convergence signal + +Until the cold replay finishes, health counts can be stale. The aggregator +latches an [`OperatorLiveness`] from `Recovering` → `Converged` once all three +sources have replayed (Deployment `InitDone`, Device `InitDone`, device-state KV +`seen_current`). The in-process dashboard reads it and shows a banner, so the +customer sees "recovering", never a silent stale view. Convergence is typically +sub-second; the banner self-clears. + +## Scenarios + +| # | Shape | Correct recovery | Regression test | +|---|-------|------------------|-----------------| +| 1 | **Cold restart, healthy fleet.** Operator killed and restarted; devices kept reporting state to KV. | Rebuild desired-state + health counts from KV alone. Desired-state entries are **re-written identically, not churned** (agents byte-compare and no-op). Liveness reaches `Converged`. | `aggregator_converges_from_kv_after_restart` | +| 2 | **Stale KV: CR deleted while operator down.** A Deployment CR is force-deleted (finalizer bypassed — namespace teardown, `--force`) while the operator is down, leaving orphan desired-state. | At convergence, GC any desired-state whose Deployment CR no longer exists (`gc_orphaned_desired_state`). Agents stop running the dead deployment. | `aggregator_gcs_desired_state_for_deleted_cr` | +| 3 | **Two operators racing.** A rolling deploy briefly runs two operator replicas. | Both write **identical** desired-state bytes; the KV value is stable; no orphan, no flapping. Idempotent multi-writer, so no leader election required. | `two_aggregators_produce_identical_desired_state` | +| 4 | **Partial KV: device offline during reset.** A device hasn't reported (no `device-state` entry) when the operator restarts. | The deployment still converges; the missing device counts as `pending`, not lost. Recovers to `Running` when the device reports. | covered by #1 (one target has no state entry) + unit `apply_state` dedup | +| 5 | **Chaos: kill under write load.** Operator repeatedly killed/restarted while deployments are being created. | Final state converges to the full desired-state set in < 30 s once a replica stays up. | `chaos_kill_under_write_load_converges` | + +Out-of-order / replayed `device-state` (an older event arriving after a newer +one) is handled by `apply_state`'s `last_event_at` dedup — unit-tested in +`fleet_aggregator.rs`, exercised on every replay. + +## Running the regression tests + +```bash +HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e --test operator_recovery +``` + +Needs k3d + podman on PATH (the shared harness brings up NATS in a fresh +namespace). The tests drive `fleet_aggregator::run` in-process against the real +NATS + k3d, aborting and respawning it to simulate restarts — `run` owns its +watchers in a `JoinSet`, so a cancelled aggregator leaves no orphan tasks. + +[`fleet_aggregator.rs`]: ../fleet/harmony-fleet-operator/src/fleet_aggregator.rs +[`OperatorLiveness`]: ../fleet/harmony-fleet-operator/src/liveness.rs diff --git a/fleet/harmony-fleet-e2e/Cargo.toml b/fleet/harmony-fleet-e2e/Cargo.toml index f11246e0..a97dcb8d 100644 --- a/fleet/harmony-fleet-e2e/Cargo.toml +++ b/fleet/harmony-fleet-e2e/Cargo.toml @@ -60,3 +60,4 @@ uuid = { version = "1", features = ["v4"] } [dev-dependencies] tokio = { workspace = true, features = ["full"] } +chrono = { workspace = true } diff --git a/fleet/harmony-fleet-e2e/src/kv_admin.rs b/fleet/harmony-fleet-e2e/src/kv_admin.rs index eca29dd6..fdcfbe6b 100644 --- a/fleet/harmony-fleet-e2e/src/kv_admin.rs +++ b/fleet/harmony-fleet-e2e/src/kv_admin.rs @@ -172,6 +172,22 @@ impl AdminKv { Ok(()) } + /// Publish a `device-state` entry directly, simulating an agent report. + /// Recovery tests use this to assert the operator rebuilds health counts + /// from KV alone after a restart. + pub async fn put_device_state(&self, state: &DeploymentState) -> Result<(), AdminKvError> { + let key = device_state_key(&state.device_id.to_string(), &state.deployment); + let payload = serde_json::to_vec(state).map_err(AdminKvError::Serialize)?; + self.device_state + .put(&key, payload.into()) + .await + .map_err(|e| AdminKvError::Put { + key, + source: Box::new(e), + })?; + Ok(()) + } + /// Delete a desired-state entry. Triggers the agent's tombstone /// reconcile path: containers for that deployment are removed /// and the corresponding device-state entry is dropped. diff --git a/fleet/harmony-fleet-e2e/tests/operator_recovery.rs b/fleet/harmony-fleet-e2e/tests/operator_recovery.rs new file mode 100644 index 00000000..ef6543b3 --- /dev/null +++ b/fleet/harmony-fleet-e2e/tests/operator_recovery.rs @@ -0,0 +1,439 @@ +//! Operator restart + aggregator recovery regression tests (v0.3 Ch2). +//! +//! Each scenario in `docs/fleet-operator-recovery-scenarios.md` has a test +//! here. They drive `fleet_aggregator::run` in-process against the shared +//! NATS + k3d stack, aborting and respawning it to simulate operator +//! restarts. `run` owns its watchers in a `JoinSet`, so a cancelled +//! aggregator leaves no orphan tasks racing the next one. +//! +//! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH). + +use std::collections::BTreeMap; +use std::time::{Duration, Instant}; + +use async_nats::jetstream; +use chrono::Utc; +use harmony::modules::fleet::operator::crd::{ + Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore, + Rollout, RolloutStrategy, +}; +use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack}; +use harmony_fleet_operator::fleet_aggregator; +use harmony_fleet_operator::liveness::OperatorLiveness; +use harmony_reconciler_contracts::{ + BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key, +}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; +use kube::Client; +use kube::api::{Api, DeleteParams, ObjectMeta, PostParams}; +use tokio::task::JoinHandle; + +const E2E_ENV: &str = "HARMONY_FLEET_E2E"; +const LABEL_KEY: &str = "recovery-grp"; + +fn e2e_enabled() -> bool { + matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true")) +} + +fn skip() { + eprintln!("skipping {E2E_ENV}-gated test (set {E2E_ENV}=1; needs k3d + podman)"); +} + +// ── Fixtures ──────────────────────────────────────────────────────────── + +/// Shared stack with no in-cluster operator: the tests own the aggregator +/// lifecycle. CRDs are applied directly so the watchers have something to read. +async fn recovery_stack() -> anyhow::Result> { + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")), + ) + .try_init(); + + let stack = shared_stack(StackOptions { + deploy_agent: false, + deploy_operator: false, + ..StackOptions::default() + }) + .await?; + + harmony_fleet_deploy::operator::install_crds().await?; + wait_crds_ready().await?; + Ok(stack) +} + +/// CRD apply doesn't block on `Established`; poll until a list succeeds. +async fn wait_crds_ready() -> anyhow::Result<()> { + let client = Client::try_default().await?; + let deployments: Api = Api::all(client.clone()); + let devices: Api = Api::all(client); + let deadline = Instant::now() + Duration::from_secs(30); + loop { + let ok = deployments.list(&Default::default()).await.is_ok() + && devices.list(&Default::default()).await.is_ok(); + if ok { + return Ok(()); + } + if Instant::now() >= deadline { + anyhow::bail!("CRDs not Established within 30s"); + } + tokio::time::sleep(Duration::from_millis(500)).await; + } +} + +async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result { + Ok(async_nats::ConnectOptions::new() + .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone()) + .connect(&stack.nats_url) + .await?) +} + +/// Spawn an aggregator instance. Returns its handle (abort to "kill the +/// operator") and the liveness it latches on convergence. +async fn spawn_aggregator( + stack: &harmony_fleet_e2e::Stack, +) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> { + let kube = Client::try_default().await?; + let js = jetstream::new(admin_nats(stack).await?); + let liveness = OperatorLiveness::new(); + let handle = { + let liveness = liveness.clone(); + tokio::spawn(async move { + if let Err(e) = fleet_aggregator::run(kube, js, liveness).await { + tracing::warn!(error = %e, "test aggregator exited"); + } + }) + }; + Ok((handle, liveness)) +} + +/// Abort an aggregator and wait for it (and its `JoinSet` children) to tear down +/// before the next one starts. +async fn kill(handle: JoinHandle<()>) { + handle.abort(); + let _ = handle.await; +} + +fn podman_score() -> PodmanV0Score { + PodmanV0Score { + services: vec![PodmanService { + name: "hello".to_string(), + image: "docker.io/library/hello-world:latest".to_string(), + ports: vec![], + env: vec![], + volumes: vec![], + restart_policy: Default::default(), + }], + } +} + +fn device_with_label(name: &str, group: &str) -> Device { + let mut labels = BTreeMap::new(); + labels.insert(LABEL_KEY.to_string(), group.to_string()); + Device { + metadata: ObjectMeta { + name: Some(name.to_string()), + labels: Some(labels), + ..Default::default() + }, + spec: DeviceSpec { inventory: None }, + status: None, + } +} + +fn deployment_matching(name: &str, group: &str) -> Deployment { + let mut ml = BTreeMap::new(); + ml.insert(LABEL_KEY.to_string(), group.to_string()); + Deployment { + metadata: ObjectMeta { + name: Some(name.to_string()), + ..Default::default() + }, + spec: DeploymentSpec { + target_selector: LabelSelector { + match_labels: Some(ml), + match_expressions: None, + }, + score: ReconcileScore::PodmanV0(podman_score()), + rollout: Rollout { + strategy: RolloutStrategy::Immediate, + }, + }, + status: None, + } +} + +/// Poll until `f` resolves true or the budget elapses. +async fn wait_until(budget: Duration, mut f: F) -> anyhow::Result<()> +where + F: FnMut() -> Fut, + Fut: std::future::Future, +{ + let deadline = Instant::now() + budget; + loop { + if f().await { + return Ok(()); + } + if Instant::now() >= deadline { + anyhow::bail!("condition not met within {budget:?}"); + } + tokio::time::sleep(Duration::from_millis(300)).await; + } +} + +async fn desired_state_present( + stack: &harmony_fleet_e2e::Stack, + device: &str, + deployment: &str, +) -> anyhow::Result { + let js = jetstream::new(admin_nats(stack).await?); + let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?; + let key = desired_state_key(device, &DeploymentName::try_new(deployment)?); + Ok(bucket.get(&key).await?.is_some()) +} + +async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) { + if let Ok(client) = Client::try_default().await { + let dev_api: Api = Api::all(client.clone()); + let dep_api: Api = Api::namespaced(client, &stack.namespace); + for d in devices { + let _ = dev_api.delete(d, &DeleteParams::default()).await; + } + for d in deployments { + let _ = dep_api.delete(d, &DeleteParams::default()).await; + } + } +} + +// ── Scenario 1: cold restart, healthy fleet ───────────────────────────── + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { + if !e2e_enabled() { + skip(); + return Ok(()); + } + let stack = recovery_stack().await?; + let client = Client::try_default().await?; + let devices: Api = Api::all(client.clone()); + let deployments: Api = Api::namespaced(client, &stack.namespace); + + let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1"); + let _ = devices.delete(dev, &DeleteParams::default()).await; + let _ = deployments.delete(depl, &DeleteParams::default()).await; + devices + .create(&PostParams::default(), &device_with_label(dev, grp)) + .await?; + deployments + .create(&PostParams::default(), &deployment_matching(depl, grp)) + .await?; + + // First operator: converges and writes desired-state for the match. + let (h1, l1) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { + l1.is_converged() + && desired_state_present(&stack, dev, depl) + .await + .unwrap_or(false) + }) + .await?; + + // Simulate the agent having reported Running, then kill the operator. + let kv = AdminKv::connect(&admin_nats(&stack).await?).await?; + let depl_name = DeploymentName::try_new(depl)?; + kv.put_device_state(&DeploymentState { + device_id: Id::from(dev), + deployment: depl_name.clone(), + phase: Phase::Running, + last_event_at: Utc::now(), + last_error: None, + }) + .await?; + kill(h1).await; + + // Second operator: rebuilds from KV alone. + let (h2, l2) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?; + + // Desired-state survives the restart (idempotent re-write, not churn). + assert!( + desired_state_present(&stack, dev, depl).await?, + "desired-state must persist across restart" + ); + // Health count is rebuilt from device-state KV alone. + wait_until(Duration::from_secs(30), || async { + deployments + .get(depl) + .await + .ok() + .and_then(|cr| cr.status?.aggregate) + .map(|a| a.succeeded == 1 && a.matched_device_count == 1) + .unwrap_or(false) + }) + .await?; + + kill(h2).await; + cleanup(&stack, &[dev], &[depl]).await; + Ok(()) +} + +// ── Scenario 2: CR deleted while operator down ────────────────────────── + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> { + if !e2e_enabled() { + skip(); + return Ok(()); + } + let stack = recovery_stack().await?; + let client = Client::try_default().await?; + let devices: Api = Api::all(client.clone()); + let deployments: Api = Api::namespaced(client, &stack.namespace); + + let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2"); + let _ = devices.delete(dev, &DeleteParams::default()).await; + let _ = deployments.delete(depl, &DeleteParams::default()).await; + devices + .create(&PostParams::default(), &device_with_label(dev, grp)) + .await?; + deployments + .create(&PostParams::default(), &deployment_matching(depl, grp)) + .await?; + + let (h1, l1) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { + l1.is_converged() + && desired_state_present(&stack, dev, depl) + .await + .unwrap_or(false) + }) + .await?; + kill(h1).await; + + // CR force-deleted while down — no controller running, so no finalizer + // blocks it. The desired-state entry is now an orphan. + deployments.delete(depl, &DeleteParams::default()).await?; + assert!( + desired_state_present(&stack, dev, depl).await?, + "orphan desired-state should still be present before recovery" + ); + + // Recovery GCs the orphan once the CR list is replayed. + let (h2, _l2) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { + !desired_state_present(&stack, dev, depl) + .await + .unwrap_or(true) + }) + .await?; + + kill(h2).await; + cleanup(&stack, &[dev], &[]).await; + Ok(()) +} + +// ── Scenario 3: two operators racing ──────────────────────────────────── + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> { + if !e2e_enabled() { + skip(); + return Ok(()); + } + let stack = recovery_stack().await?; + let client = Client::try_default().await?; + let devices: Api = Api::all(client.clone()); + let deployments: Api = Api::namespaced(client, &stack.namespace); + + let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3"); + let _ = devices.delete(dev, &DeleteParams::default()).await; + let _ = deployments.delete(depl, &DeleteParams::default()).await; + devices + .create(&PostParams::default(), &device_with_label(dev, grp)) + .await?; + deployments + .create(&PostParams::default(), &deployment_matching(depl, grp)) + .await?; + + // Two operators at once (rolling-deploy overlap). + let (h_a, l_a) = spawn_aggregator(&stack).await?; + let (h_b, l_b) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { + l_a.is_converged() && l_b.is_converged() + }) + .await?; + + // The KV value is the deterministic serialized score regardless of which + // operator wrote it — idempotent multi-writer, no leader election. + let js = jetstream::new(admin_nats(&stack).await?); + let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?; + let key = desired_state_key(dev, &DeploymentName::try_new(depl)?); + let value = bucket.get(&key).await?.expect("desired-state present"); + let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?; + assert_eq!( + value.to_vec(), + expected, + "racing writers must agree byte-for-byte" + ); + + kill(h_a).await; + kill(h_b).await; + cleanup(&stack, &[dev], &[depl]).await; + Ok(()) +} + +// ── Scenario 5: chaos kill under write load ───────────────────────────── + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> { + if !e2e_enabled() { + skip(); + return Ok(()); + } + let stack = recovery_stack().await?; + let client = Client::try_default().await?; + let devices: Api = Api::all(client.clone()); + let deployments: Api = Api::namespaced(client, &stack.namespace); + + let (dev, grp) = ("recov5-dev", "g5"); + let names: Vec = (0..5).map(|i| format!("recov5-depl-{i}")).collect(); + let _ = devices.delete(dev, &DeleteParams::default()).await; + for n in &names { + let _ = deployments.delete(n, &DeleteParams::default()).await; + } + devices + .create(&PostParams::default(), &device_with_label(dev, grp)) + .await?; + + // Create deployments while killing/restarting the operator mid-stream. + let (mut handle, _) = spawn_aggregator(&stack).await?; + for (i, n) in names.iter().enumerate() { + deployments + .create(&PostParams::default(), &deployment_matching(n, grp)) + .await?; + if i == 2 { + kill(handle).await; + (handle, _) = spawn_aggregator(&stack).await?; + } + } + kill(handle).await; + + // A replica that stays up must converge the full set within 30s. + let (h_final, l_final) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?; + wait_until(Duration::from_secs(30), || async { + for n in &names { + if !desired_state_present(&stack, dev, n).await.unwrap_or(false) { + return false; + } + } + true + }) + .await?; + + kill(h_final).await; + let dep_refs: Vec<&str> = names.iter().map(String::as_str).collect(); + cleanup(&stack, &[dev], &dep_refs).await; + Ok(()) +} diff --git a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs index 9eec10df..3cede511 100644 --- a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs +++ b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs @@ -38,6 +38,8 @@ use harmony::modules::fleet::operator::{ }; use tracing::warn; +use crate::liveness::OperatorLiveness; + const PATCH_TICK: Duration = Duration::from_secs(1); // --------------------------------------------------------------------------- @@ -149,7 +151,11 @@ fn matched_devices( // Top-level run // --------------------------------------------------------------------------- -pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> { +pub async fn run( + client: Client, + js: async_nats::jetstream::Context, + liveness: OperatorLiveness, +) -> anyhow::Result<()> { let state_bucket = js .create_key_value(async_nats::jetstream::kv::Config { bucket: BUCKET_DEVICE_STATE.to_string(), @@ -173,71 +179,89 @@ pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow:: let devices_api: Api = Api::all(client.clone()); let patch_api: Api = Api::all(client); - tracing::info!( - owned = state - .lock() - .await - .owned_targets - .values() - .map(|s| s.len()) - .sum::(), - "aggregator: startup complete" - ); + // Compute before logging: a `.await` inside the macro would hold the + // event's non-Send format temporaries across it, making `run` non-Send. + let owned_count: usize = { + let guard = state.lock().await; + guard.owned_targets.values().map(|s| s.len()).sum() + }; + tracing::info!(owned = owned_count, "aggregator: startup complete"); - let state_watcher_handle = { + // A JoinSet so cancelling `run` (its future dropped) aborts every child + // task — no orphan watchers outliving the aggregator. The whole point of + // recovery is a clean stop/start, so this cancellation property matters + // both in tests (restart simulation) and in process teardown. + let mut tasks = tokio::task::JoinSet::new(); + + { let state = state.clone(); let bucket = state_bucket.clone(); - tokio::spawn(async move { - if let Err(e) = run_state_kv_watcher(bucket, state).await { + let liveness = liveness.clone(); + tasks.spawn(async move { + if let Err(e) = run_state_kv_watcher(bucket, state, liveness).await { tracing::warn!(error = %e, "aggregator: state watcher exited"); } - }) - }; - - let deployment_watcher_handle = { + }); + } + { let state = state.clone(); let desired = desired_bucket.clone(); - tokio::spawn(async move { - if let Err(e) = run_deployment_watcher(deployments_api.clone(), state, desired).await { + let liveness = liveness.clone(); + tasks.spawn(async move { + if let Err(e) = + run_deployment_watcher(deployments_api.clone(), state, desired, liveness).await + { tracing::warn!(error = %e, "aggregator: deployment watcher exited"); } - }) - }; - - let device_watcher_handle = { + }); + } + { let state = state.clone(); let desired = desired_bucket.clone(); - tokio::spawn(async move { - if let Err(e) = run_device_watcher(devices_api, state, desired).await { + let liveness = liveness.clone(); + tasks.spawn(async move { + if let Err(e) = run_device_watcher(devices_api, state, desired, liveness).await { tracing::warn!(error = %e, "aggregator: device watcher exited"); } - }) - }; - - let patch_state = state.clone(); - let patch_loop = async move { - let mut ticker = tokio::time::interval(PATCH_TICK); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - loop { - ticker.tick().await; - if let Err(e) = patch_tick(&patch_api, &patch_state).await { - tracing::warn!(error = %e, "aggregator: patch tick failed"); - } - } - }; - - tokio::select! { - _ = patch_loop => Ok(()), - _ = state_watcher_handle => Ok(()), - _ = deployment_watcher_handle => Ok(()), - _ = device_watcher_handle => Ok(()), + }); } + { + let patch_state = state.clone(); + tasks.spawn(async move { + let mut ticker = tokio::time::interval(PATCH_TICK); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + ticker.tick().await; + if let Err(e) = patch_tick(&patch_api, &patch_state).await { + tracing::warn!(error = %e, "aggregator: patch tick failed"); + } + } + }); + } + + // Return when the first task exits (mirrors the old `select!`); dropping + // `tasks` here aborts the rest. + tasks.join_next().await; + Ok(()) } // --------------------------------------------------------------------------- // Device-state KV watcher (unchanged path) // --------------------------------------------------------------------------- +/// True when the bucket currently holds no keys. Used to short-circuit the +/// caught-up signal: an empty bucket's watch yields no entry to carry +/// `seen_current`. +async fn bucket_is_empty(bucket: &Store) -> bool { + match bucket.keys().await { + Ok(mut keys) => keys.next().await.is_none(), + Err(e) => { + tracing::warn!(error = %e, "aggregator: keys() probe failed; assuming non-empty"); + false + } + } +} + fn parse_state_key(key: &str) -> Option { let rest = key.strip_prefix("state.")?; let (device, deployment) = rest.split_once('.')?; @@ -247,7 +271,18 @@ fn parse_state_key(key: &str) -> Option { }) } -async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> { +async fn run_state_kv_watcher( + bucket: Store, + state: SharedFleetState, + liveness: OperatorLiveness, +) -> anyhow::Result<()> { + // An empty bucket replays nothing, so `seen_current` never surfaces on an + // entry — mark the state source caught-up up front when there's nothing to + // replay. (A write landing just after this is replayed by the watch below + // and re-marks; the mark is an idempotent latch.) + if bucket_is_empty(&bucket).await { + liveness.mark_states_ready(); + } let mut watch = bucket.watch_with_history(">").await?; while let Some(entry_res) = watch.next().await { let entry = match entry_res { @@ -257,6 +292,11 @@ async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow: continue; } }; + // First entry that has caught up to the bucket head means the cold + // replay is done. + if entry.seen_current { + liveness.mark_states_ready(); + } let Some(pair) = parse_state_key(&entry.key) else { continue; }; @@ -359,6 +399,7 @@ async fn run_deployment_watcher( api: Api, state: SharedFleetState, desired: Store, + liveness: OperatorLiveness, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { @@ -369,12 +410,60 @@ async fn run_deployment_watcher( Event::Delete(cr) => { on_deployment_delete(&state, &desired, cr).await; } - Event::Init | Event::InitDone => {} + Event::Init => {} + Event::InitDone => { + // The initial CR list is fully replayed; the deployment cache is + // now authoritative. GC desired-state for any deployment that + // was seeded from KV but has no surviving CR (deleted while we + // were down — finalizer bypassed, e.g. namespace teardown), + // then declare this source converged. + gc_orphaned_desired_state(&state, &desired).await; + liveness.mark_deployments_ready(); + } } } Ok(()) } +/// Purge desired-state entries whose Deployment CR no longer exists. Runs once, +/// after the initial CR list is loaded: `owned_targets` was seeded from the KV +/// (so it includes deployments the previous operator wrote), and any name with +/// no CR in the now-complete cache is an orphan. Belt-and-suspenders with the +/// controller finalizer, which covers deletes that happen while we're *up*. +async fn gc_orphaned_desired_state(state: &SharedFleetState, desired: &Store) { + let orphans: Vec<(DeploymentName, HashSet)> = { + let mut guard = state.lock().await; + let live: HashSet = guard + .deployments + .values() + .map(|d| d.deployment_name.clone()) + .collect(); + let orphan_names: Vec = guard + .owned_targets + .keys() + .filter(|n| !live.contains(*n)) + .cloned() + .collect(); + orphan_names + .into_iter() + .filter_map(|n| guard.owned_targets.remove(&n).map(|set| (n, set))) + .collect() + }; + for (deployment_name, devices) in orphans { + for device in &devices { + let k = desired_state_key(device, &deployment_name); + if let Err(e) = desired.delete(&k).await { + tracing::debug!(key = %k, error = %e, "aggregator: orphan desired-state delete failed"); + } + } + tracing::info!( + deployment = ?deployment_name, + count = devices.len(), + "aggregator: GC'd orphaned desired-state for deleted deployment" + ); + } +} + async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) { let Some(key) = DeploymentKey::from_cr(&cr) else { return; @@ -467,6 +556,7 @@ async fn run_device_watcher( api: Api, state: SharedFleetState, desired: Store, + liveness: OperatorLiveness, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { @@ -477,7 +567,9 @@ async fn run_device_watcher( Event::Delete(dev) => { on_device_delete(&state, &desired, dev).await; } - Event::Init | Event::InitDone => {} + Event::Init => {} + // Initial CR list fully replayed into the cache. + Event::InitDone => liveness.mark_devices_ready(), } } Ok(()) diff --git a/fleet/harmony-fleet-operator/src/frontend/layout.rs b/fleet/harmony-fleet-operator/src/frontend/layout.rs index 7e039d58..b77faa76 100644 --- a/fleet/harmony-fleet-operator/src/frontend/layout.rs +++ b/fleet/harmony-fleet-operator/src/frontend/layout.rs @@ -1,5 +1,7 @@ use maud::{DOCTYPE, Markup, PreEscaped, html}; +use harmony_fleet_operator::liveness::RecoveryPhase; + use crate::frontend::auth::DashboardSession; // ── Inline SVG icons ──────────────────────────────────────────────────── @@ -41,6 +43,9 @@ pub fn page( (sidebar(current_path, session, unacked_alerts)) main class="flex-1 min-w-0 flex flex-col overflow-hidden" { (topbar(title, unacked_alerts)) + // Bootstrap the recovery banner; the fragment self-polls + // while recovering and goes inert once converged. + div id="recovery-banner" hx-get="/__recovery" hx-trigger="load" hx-swap="outerHTML" {} div class="flex-1 overflow-y-auto grid-bg" { (content) } } } @@ -50,6 +55,28 @@ pub fn page( } } +/// Recovery-state banner fragment swapped into `#recovery-banner`. +/// +/// `Recovering` → a visible banner that re-requests itself every 2s. +/// `Converged` → an inert element with no trigger, so polling stops and the +/// banner disappears. Same element id either way so HTMX `outerHTML` swaps cleanly. +pub fn recovery_banner(phase: RecoveryPhase) -> Markup { + match phase { + RecoveryPhase::Recovering => html! { + div id="recovery-banner" + hx-get="/__recovery" hx-trigger="load delay:2s" hx-swap="outerHTML" + class="flex items-center gap-2 px-6 py-2 text-[12px] border-b" + style="background:rgba(234,179,8,0.10); border-color:rgba(234,179,8,0.25); color:#fde68a" { + span class="inline-block w-2 h-2 rounded-full animate-pulse" style="background:#eab308" {} + "Operator recovering — rebuilding fleet state from NATS. Counts may be briefly stale." + } + }, + RecoveryPhase::Converged => html! { + div id="recovery-banner" {} + }, + } +} + fn sidebar( current_path: &str, session: Option<&DashboardSession>, diff --git a/fleet/harmony-fleet-operator/src/frontend/server.rs b/fleet/harmony-fleet-operator/src/frontend/server.rs index dea07f34..6d26b6cb 100644 --- a/fleet/harmony-fleet-operator/src/frontend/server.rs +++ b/fleet/harmony-fleet-operator/src/frontend/server.rs @@ -121,6 +121,8 @@ pub fn router(state: AppState) -> Router { // Settings .route("/settings", get(settings_handler)) .route("/settings/toggle/{key}", post(settings_toggle_handler)) + // Operator recovery banner (HTMX self-poll while recovering) + .route("/__recovery", get(recovery_banner_handler)) // Logout .route("/logout", get(auth::logout_handler)) .route_layer(middleware::from_fn_with_state(state.clone(), csrf_protect)) @@ -615,6 +617,13 @@ async fn device_logs_handler( Ok(devices_view::logs_modal(&id, q.deployment.as_deref())) } +/// Recovery banner fragment. While the aggregator replays KV/CR history this +/// returns a banner that re-fetches itself every 2s; once converged it returns +/// an inert empty element, so the poll naturally stops. +async fn recovery_banner_handler(State(s): State) -> Markup { + crate::frontend::layout::recovery_banner(s.fleet.recovery_phase()) +} + /// One-shot `podman logs` tail of the device's deployment container, /// fetched over the NATS command channel. Rendered as the log panel /// body (HTMX swaps it in on `load` and on Refresh). diff --git a/fleet/harmony-fleet-operator/src/lib.rs b/fleet/harmony-fleet-operator/src/lib.rs index da2f81d6..fd1820fe 100644 --- a/fleet/harmony-fleet-operator/src/lib.rs +++ b/fleet/harmony-fleet-operator/src/lib.rs @@ -16,3 +16,4 @@ pub mod commands; pub mod device_reconciler; pub mod device_status; pub mod fleet_aggregator; +pub mod liveness; diff --git a/fleet/harmony-fleet-operator/src/liveness.rs b/fleet/harmony-fleet-operator/src/liveness.rs new file mode 100644 index 00000000..72385d6a --- /dev/null +++ b/fleet/harmony-fleet-operator/src/liveness.rs @@ -0,0 +1,103 @@ +//! Operator recovery liveness. +//! +//! After a restart the aggregator cold-rebuilds its caches from NATS KV + +//! kube CR watches. Until that replay completes, the dashboard's counts can +//! be stale — so we expose a coarse [`RecoveryPhase`] the UI shows as a banner +//! ("recovering" → "converged"), and the customer sees progress instead of a +//! silent stale view. +//! +//! Convergence = all three cold-start sources have replayed their current +//! contents at least once: Deployment CRs, Device CRs, and the device-state KV. +//! Each is a one-way latch (`Recovering` → `Converged`, never back), set from +//! the natural caught-up signals (`watcher::Event::InitDone`, KV +//! `Entry::seen_current`). + +use std::sync::Arc; +use std::sync::atomic::{AtomicBool, Ordering}; + +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum RecoveryPhase { + /// Still replaying KV / CR history; counts may be stale. + Recovering, + /// All cold-start sources replayed; the picture is current. + Converged, +} + +/// Cheap-to-clone handle shared between the aggregator (writer) and the +/// in-process dashboard (reader). Both live in one operator process, so a +/// shared atomic is all the coordination needed. +#[derive(Clone, Default)] +pub struct OperatorLiveness { + inner: Arc, +} + +#[derive(Default)] +struct Inner { + deployments_ready: AtomicBool, + devices_ready: AtomicBool, + states_ready: AtomicBool, +} + +impl OperatorLiveness { + pub fn new() -> Self { + Self::default() + } + + pub fn mark_deployments_ready(&self) { + self.inner.deployments_ready.store(true, Ordering::Relaxed); + } + + pub fn mark_devices_ready(&self) { + self.inner.devices_ready.store(true, Ordering::Relaxed); + } + + pub fn mark_states_ready(&self) { + self.inner.states_ready.store(true, Ordering::Relaxed); + } + + pub fn phase(&self) -> RecoveryPhase { + let i = &self.inner; + if i.deployments_ready.load(Ordering::Relaxed) + && i.devices_ready.load(Ordering::Relaxed) + && i.states_ready.load(Ordering::Relaxed) + { + RecoveryPhase::Converged + } else { + RecoveryPhase::Recovering + } + } + + pub fn is_converged(&self) -> bool { + self.phase() == RecoveryPhase::Converged + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn starts_recovering_and_needs_all_three_sources() { + let l = OperatorLiveness::new(); + assert_eq!(l.phase(), RecoveryPhase::Recovering); + l.mark_deployments_ready(); + l.mark_devices_ready(); + assert_eq!(l.phase(), RecoveryPhase::Recovering, "states still pending"); + l.mark_states_ready(); + assert_eq!(l.phase(), RecoveryPhase::Converged); + } + + #[test] + fn converged_is_a_one_way_latch_and_clones_share_state() { + let a = OperatorLiveness::new(); + let b = a.clone(); + a.mark_deployments_ready(); + a.mark_devices_ready(); + a.mark_states_ready(); + // The reader clone observes the writer's marks. + assert!(b.is_converged()); + // Marking again is idempotent — no way back to Recovering. + b.mark_states_ready(); + assert!(a.is_converged()); + } +} diff --git a/fleet/harmony-fleet-operator/src/main.rs b/fleet/harmony-fleet-operator/src/main.rs index 8b8d56ba..66d04c1f 100644 --- a/fleet/harmony-fleet-operator/src/main.rs +++ b/fleet/harmony-fleet-operator/src/main.rs @@ -129,7 +129,11 @@ async fn serve_web( // Standalone serve-web has no NATS, so device exec/log tailing is // unavailable here — those features need the deployed operator // (`run`), which threads in the command channel. - Arc::new(RealFleetService::new(Client::try_default().await?, None)) + Arc::new(RealFleetService::new( + Client::try_default().await?, + None, + None, + )) }; serve_dashboard(fleet, addr, css_from, live_reload).await } @@ -192,6 +196,7 @@ async fn serve_dashboard( fn spawn_dashboard( client: Client, commands: harmony_fleet_operator::commands::FleetCommandsClient, + liveness: harmony_fleet_operator::liveness::OperatorLiveness, ) { use std::net::SocketAddr; use std::sync::Arc; @@ -200,7 +205,11 @@ fn spawn_dashboard( let addr = SocketAddr::from(([0, 0, 0, 0], frontend::server::DEFAULT_PORT)); tokio::spawn(async move { - let fleet = Arc::new(RealFleetService::new(client, Some(commands))); + let fleet = Arc::new(RealFleetService::new( + client, + Some(commands), + Some(liveness), + )); if let Err(e) = serve_dashboard(fleet, addr, None, false).await { tracing::error!(error = %e, "dashboard server exited; reconcile continues"); } @@ -221,6 +230,10 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> let client = Client::try_default().await?; + // Shared recovery signal: the aggregator latches it to Converged once it + // has replayed KV + CR history; the dashboard reads it for the banner. + let liveness = harmony_fleet_operator::liveness::OperatorLiveness::new(); + // Serve the read-only dashboard in the same process (best-effort). // It reads CRs for state and uses the NATS command channel for // device exec / log tailing. Built only with the web-frontend @@ -229,6 +242,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> spawn_dashboard( client.clone(), harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone()), + liveness.clone(), ); // Concurrent tasks: @@ -248,7 +262,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> r = controller::run(ctl_client, desired_state_kv) => r, r = device_reconciler::run(dr_client, dr_js) => r, r = device_status::run(ds_client, ds_js) => r, - r = fleet_aggregator::run(client, js) => r, + r = fleet_aggregator::run(client, js, liveness) => r, } } diff --git a/fleet/harmony-fleet-operator/src/service/mod.rs b/fleet/harmony-fleet-operator/src/service/mod.rs index 5602b8f5..3a3f7380 100644 --- a/fleet/harmony-fleet-operator/src/service/mod.rs +++ b/fleet/harmony-fleet-operator/src/service/mod.rs @@ -45,6 +45,12 @@ pub trait FleetService: Send + Sync + 'static { device_id: &str, deployment: Option<&str>, ) -> anyhow::Result; + /// Coarse operator recovery state for the dashboard banner. Sync — a cheap + /// in-memory read, not a CR/NATS round-trip. Defaults to `Converged`, which + /// the mock and the standalone `serve-web` path (no aggregator) inherit. + fn recovery_phase(&self) -> harmony_fleet_operator::liveness::RecoveryPhase { + harmony_fleet_operator::liveness::RecoveryPhase::Converged + } } // ── Device ───────────────────────────────────────────────────────────── diff --git a/fleet/harmony-fleet-operator/src/service/real.rs b/fleet/harmony-fleet-operator/src/service/real.rs index fe5acfee..252d6bed 100644 --- a/fleet/harmony-fleet-operator/src/service/real.rs +++ b/fleet/harmony-fleet-operator/src/service/real.rs @@ -25,6 +25,7 @@ use harmony::modules::fleet::operator::{ use harmony::modules::podman::ReconcileScore; use harmony_fleet_operator::commands::FleetCommandsClient; use harmony_fleet_operator::fleet_aggregator::selector_matches; +use harmony_fleet_operator::liveness::{OperatorLiveness, RecoveryPhase}; use harmony_reconciler_contracts::ExecReply; use super::{ @@ -46,14 +47,23 @@ pub struct RealFleetService { /// In-memory ack set. Alerts are derived from live CR state and /// have no store of their own, so acks don't survive a restart. acked_alerts: Mutex>, + /// Aggregator recovery signal, shared in-process. `None` in the + /// standalone `serve-web` path (no aggregator running) → reported as + /// `Converged` so the banner never shows there. + liveness: Option, } impl RealFleetService { - pub fn new(kube: Client, commands: Option) -> Self { + pub fn new( + kube: Client, + commands: Option, + liveness: Option, + ) -> Self { Self { kube, commands, acked_alerts: Mutex::new(HashSet::new()), + liveness, } } @@ -360,6 +370,13 @@ fn derive_alerts( #[async_trait] impl FleetService for RealFleetService { + fn recovery_phase(&self) -> RecoveryPhase { + self.liveness + .as_ref() + .map(|l| l.phase()) + .unwrap_or(RecoveryPhase::Converged) + } + async fn dashboard_detail(&self) -> anyhow::Result { let devices = self.devices().await?; let mut deployments = self.deployments().await?; -- 2.39.5 From 81b0f79f555fde6ba13f860a66ea3442d9be41cb Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Tue, 9 Jun 2026 16:45:35 -0400 Subject: [PATCH 2/3] docs(fleet): rewrite operator recovery scenarios with diagrams ASCII diagrams per scenario replace dense paragraphs. Architecture overview shows the three watchers, FleetState, and convergence latches. Cold-start sequence and key invariants in scannable tables. --- docs/fleet-operator-recovery-scenarios.md | 240 ++++++++++++++++++---- 1 file changed, 199 insertions(+), 41 deletions(-) diff --git a/docs/fleet-operator-recovery-scenarios.md b/docs/fleet-operator-recovery-scenarios.md index 2c002ca2..2acf463c 100644 --- a/docs/fleet-operator-recovery-scenarios.md +++ b/docs/fleet-operator-recovery-scenarios.md @@ -1,50 +1,208 @@ # Fleet operator recovery scenarios -The operator can be killed, upgraded, or rescheduled at any time. When it comes -back it must **converge from NATS KV + the CRs alone** — no customer-visible -"unknown state" window. This doc enumerates the failure shapes, what correct -recovery looks like, and the regression test that pins each one. +The operator is **stateless across restarts**. It can be killed, upgraded, or +rescheduled at any time. On restart it cold-rebuilds from two durable sources — +kube CRs and NATS KV — with no customer-visible "unknown state" window. -## How recovery works +## Architecture -The operator is stateless across restarts. Everything it needs is durable: +``` +┌─────────────────────────────────────────────────────────────────┐ +│ Operator process │ +│ │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │ +│ │ Deployment │ │ Device │ │ Device-state KV │ │ +│ │ CR watcher │ │ CR watcher │ │ watcher (history) │ │ +│ └──────┬───────┘ └──────┬───────┘ └──────────┬───────────┘ │ +│ │ │ │ │ +│ ▼ ▼ ▼ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ FleetState (mutex) │ │ +│ │ deployments: {key → CachedDeployment} │ │ +│ │ devices: {name → labels} │ │ +│ │ states: {(device,deployment) → DeploymentState} │ │ +│ │ owned_targets: {deployment → {device…}} │ │ +│ │ dirty: {key…} ──► patch_tick (1 Hz) │ │ +│ └──────────────────────────────────────────────────────────┘ │ +│ │ │ +│ ▼ │ +│ ┌──────────────────┐ ┌────────────────────────────────┐ │ +│ │ desired-state KV │ │ Deployment.status.aggregate │ │ +│ │ put / delete │ │ (patched to kube at 1 Hz) │ │ +│ └──────────────────┘ └────────────────────────────────┘ │ +│ │ +│ ┌──────────────────────────────────────────────────────────┐ │ +│ │ OperatorLiveness (3 atomic latches, shared w/ dashboard) │ │ +│ │ deployments_ready ─┐ │ │ +│ │ devices_ready ─────┼──► all three → Converged │ │ +│ │ states_ready ──────┘ (else: Recovering → banner) │ │ +│ └──────────────────────────────────────────────────────────┘ │ +└─────────────────────────────────────────────────────────────────┘ +``` -- **Deployment / Device CRs** live in etcd (kube). On restart the aggregator's - `watcher` replays the full list (`Event::InitApply` …→ `Event::InitDone`). -- **`desired-state` KV** is what the operator previously wrote per - `(device, deployment)`. On restart `seed_owned_targets` reloads it so the - first reconcile diffs correctly instead of orphaning entries. -- **`device-state` KV** is per-device phase reported by agents. On restart the - state watcher replays it (`watch_with_history`), rebuilding health counts. +### Cold-start sequence -All writes are **idempotent and byte-deterministic**: the desired-state payload -is the same serialized score for a given CR, and status patches are computed -from the rebuilt caches. So a second operator racing the first writes identical -bytes — no leader election needed at current fleet size (operator HA is D3, -deferred). See [`fleet_aggregator.rs`]. +``` +Operator starts + │ + ├─ seed_owned_targets() reads desired-state KV keys + │ → populates owned_targets + │ (so first reconcile diffs correctly) + │ + ├─ spawn 3 watchers (JoinSet) │ + │ ├─ Deployment CR watcher │ replay → InitDone → mark_deployments_ready + │ │ │ + gc_orphaned_desired_state + │ ├─ Device CR watcher │ replay → InitDone → mark_devices_ready + │ └─ Device-state KV watch │ replay → seen_current → mark_states_ready + │ │ (or: empty bucket → mark immediately) + │ + └─ spawn patch_tick (1 Hz) flushes dirty CR status patches +``` -### Convergence signal +### Convergence banner -Until the cold replay finishes, health counts can be stale. The aggregator -latches an [`OperatorLiveness`] from `Recovering` → `Converged` once all three -sources have replayed (Deployment `InitDone`, Device `InitDone`, device-state KV -`seen_current`). The in-process dashboard reads it and shows a banner, so the -customer sees "recovering", never a silent stale view. Convergence is typically -sub-second; the banner self-clears. +``` +Dashboard reads OperatorLiveness.phase(): + + Recovering ──────► Converged + ┌──────────────┐ ┌─────┐ + │ ⚠ rebuilding │ │ │ (empty div, polling stops) + │ polls /2s │ │ │ + └──────────────┘ └─────┘ +``` + +The banner self-clears when all three latches trip. Typically sub-second. + +### Key invariants + +| Invariant | Mechanism | +|---|---| +| Writes are byte-deterministic | Same CR → same serialized score JSON. Two operators write identical bytes. | +| No leader election needed | Idempotent multi-writer at current fleet size. (HA = D3, deferred.) | +| Cancel = clean teardown | `JoinSet` owns all watchers; dropping `run`'s future aborts children. | +| `owned_targets` seeded before first reconcile | `seed_owned_targets()` reads KV keys so diff doesn't orphan entries. | +| Orphan GC on `InitDone` | Deployment CRs deleted while operator was down → purged at convergence. | +| Out-of-order state dedup | `apply_state` rejects entries with older `last_event_at`. | + +--- ## Scenarios -| # | Shape | Correct recovery | Regression test | -|---|-------|------------------|-----------------| -| 1 | **Cold restart, healthy fleet.** Operator killed and restarted; devices kept reporting state to KV. | Rebuild desired-state + health counts from KV alone. Desired-state entries are **re-written identically, not churned** (agents byte-compare and no-op). Liveness reaches `Converged`. | `aggregator_converges_from_kv_after_restart` | -| 2 | **Stale KV: CR deleted while operator down.** A Deployment CR is force-deleted (finalizer bypassed — namespace teardown, `--force`) while the operator is down, leaving orphan desired-state. | At convergence, GC any desired-state whose Deployment CR no longer exists (`gc_orphaned_desired_state`). Agents stop running the dead deployment. | `aggregator_gcs_desired_state_for_deleted_cr` | -| 3 | **Two operators racing.** A rolling deploy briefly runs two operator replicas. | Both write **identical** desired-state bytes; the KV value is stable; no orphan, no flapping. Idempotent multi-writer, so no leader election required. | `two_aggregators_produce_identical_desired_state` | -| 4 | **Partial KV: device offline during reset.** A device hasn't reported (no `device-state` entry) when the operator restarts. | The deployment still converges; the missing device counts as `pending`, not lost. Recovers to `Running` when the device reports. | covered by #1 (one target has no state entry) + unit `apply_state` dedup | -| 5 | **Chaos: kill under write load.** Operator repeatedly killed/restarted while deployments are being created. | Final state converges to the full desired-state set in < 30 s once a replica stays up. | `chaos_kill_under_write_load_converges` | +### 1. Cold restart, healthy fleet -Out-of-order / replayed `device-state` (an older event arriving after a newer -one) is handled by `apply_state`'s `last_event_at` dedup — unit-tested in -`fleet_aggregator.rs`, exercised on every replay. +Operator killed and restarted. Devices kept reporting to KV throughout. + +``` +Time ──────────────────────────────────────────────────► + +Operator: [running]─── KILL ───[restart]──────────────────► + │ + ├─ seed_owned_targets (from KV keys) + ├─ replay CRs → InitDone + ├─ replay device-state → seen_current + └─ Converged + +KV: [desired-state ✓] [device-state ✓] (untouched) + +Expected: desired-state re-written identically (agents byte-compare → no-op) + health counts rebuilt from device-state KV + liveness → Converged +``` + +**Test:** `aggregator_converges_from_kv_after_restart` + +### 2. Stale KV — CR deleted while operator down + +A Deployment CR is force-deleted (finalizer bypassed: namespace teardown, +`--force`) while no operator is running. The desired-state KV entry is orphaned. + +``` +Time ──────────────────────────────────────────────────► + +Operator: [running]─── KILL ──────────────[restart]──────────► + │ │ + │ ├─ seed_owned_targets + │ │ (picks up orphan key) + │ │ + │ ├─ CR replay: InitDone + │ │ deployment NOT in cache + │ │ + │ └─ gc_orphaned_desired_state + │ deletes orphan from KV + │ +CR: [exists]─── force-delete ──[gone]─────────────────► +KV: [desired-state ✓]──────────[orphan]───[purged]────► + +Expected: orphan desired-state deleted at convergence + agents stop running the dead deployment +``` + +**Test:** `aggregator_gcs_desired_state_for_deleted_cr` + +### 3. Two operators racing (rolling deploy overlap) + +Brief period where two operator replicas run simultaneously. + +``` +Time ──────────────────────────────────────────────────► + +Operator A: [running]────────────────────────────────────────► +Operator B: [start]───────────────────────────────────► + │ + ├─ both seed_owned_targets (same KV keys) + ├─ both replay same CRs + ├─ both compute same matched_devices + └─ both put identical score_json to same KV keys + +KV: [desired-state: bytes_A] == [desired-state: bytes_B] + +Expected: KV value stable regardless of write order + no orphan, no flapping, no leader election needed +``` + +**Test:** `two_aggregators_produce_identical_desired_state` + +### 4. Partial KV — device offline during restart + +A device hasn't reported state (no `device-state` entry) when the operator +restarts. + +``` +Time ──────────────────────────────────────────────────► + +Operator: [running]─── KILL ───[restart]──────────────────► + +Device A: [reporting ✓]────────────────────────────────────► +Device B: [offline]────────────────────────────────────────► + (no device-state entry in KV) + +Expected: Device A → Running (rebuilt from KV) + Device B → Pending (no state entry = pending in aggregate) + Device B recovers to Running when it reports +``` + +**Test:** `device_offline_during_restart_counts_as_pending` + +### 5. Chaos — kill under write load + +Operator repeatedly killed and restarted while deployments are being created. + +``` +Time ──────────────────────────────────────────────────► + +Deployments: [create d0] [create d1] [create d2] [create d3] [create d4] + ▲ +Operator: [running]──────── KILL ──────┘──[restart]──── KILL ──[final]──► + │ + └─ converge + all 5 + +Expected: final operator converges full desired-state set within 30s +``` + +**Test:** `chaos_kill_under_write_load_converges` + +--- ## Running the regression tests @@ -52,10 +210,10 @@ one) is handled by `apply_state`'s `last_event_at` dedup — unit-tested in HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e --test operator_recovery ``` -Needs k3d + podman on PATH (the shared harness brings up NATS in a fresh -namespace). The tests drive `fleet_aggregator::run` in-process against the real -NATS + k3d, aborting and respawning it to simulate restarts — `run` owns its -watchers in a `JoinSet`, so a cancelled aggregator leaves no orphan tasks. +**Prerequisites:** k3d + podman on PATH. The shared harness brings up NATS in +a fresh namespace. -[`fleet_aggregator.rs`]: ../fleet/harmony-fleet-operator/src/fleet_aggregator.rs -[`OperatorLiveness`]: ../fleet/harmony-fleet-operator/src/liveness.rs +**How it works:** tests drive `fleet_aggregator::run` in-process against the +real NATS + k3d. `spawn_aggregator` returns a `JoinHandle`; `kill` aborts it +and awaits (ensuring the `JoinSet` children tear down before the next instance +starts). -- 2.39.5 From 086d9055863e2c663d1ef2c0bbc6f6393818bdbe Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Tue, 9 Jun 2026 16:49:52 -0400 Subject: [PATCH 3/3] fix(fleet): clarify recovery tests + add missing scenario 4 + dedup test MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Rewrite e2e tests with explicit SETUP/ACTION/ASSERT structure per scenario. Each test header documents devices, deployments, and expected end state. - Add scenario 4 test: device offline during restart counts as pending (2 devices, 1 deployment, only 1 reports → succeeded=1, pending=1). - Add apply_state_rejects_older_timestamp unit test (previously claimed in docs but missing). - Fix split_once → rsplit_once in parse_state_key and seed_owned_targets (device IDs with dots would silently drop entries). --- .../tests/operator_recovery.rs | 183 +++++++++++++++--- .../src/fleet_aggregator.rs | 32 ++- 2 files changed, 183 insertions(+), 32 deletions(-) diff --git a/fleet/harmony-fleet-e2e/tests/operator_recovery.rs b/fleet/harmony-fleet-e2e/tests/operator_recovery.rs index ef6543b3..e145344c 100644 --- a/fleet/harmony-fleet-e2e/tests/operator_recovery.rs +++ b/fleet/harmony-fleet-e2e/tests/operator_recovery.rs @@ -1,10 +1,11 @@ //! Operator restart + aggregator recovery regression tests (v0.3 Ch2). //! -//! Each scenario in `docs/fleet-operator-recovery-scenarios.md` has a test -//! here. They drive `fleet_aggregator::run` in-process against the shared -//! NATS + k3d stack, aborting and respawning it to simulate operator -//! restarts. `run` owns its watchers in a `JoinSet`, so a cancelled -//! aggregator leaves no orphan tasks racing the next one. +//! Each test maps to a scenario in `docs/fleet-operator-recovery-scenarios.md`. +//! Structure per test: +//! +//! 1. SETUP — create devices + deployments, start first aggregator +//! 2. ACTION — kill/restart/delete (the failure being tested) +//! 3. ASSERT — verify convergence + expected end state //! //! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH). @@ -41,8 +42,6 @@ fn skip() { // ── Fixtures ──────────────────────────────────────────────────────────── -/// Shared stack with no in-cluster operator: the tests own the aggregator -/// lifecycle. CRDs are applied directly so the watchers have something to read. async fn recovery_stack() -> anyhow::Result> { let _ = tracing_subscriber::fmt() .with_env_filter( @@ -63,7 +62,6 @@ async fn recovery_stack() -> anyhow::Result anyhow::Result<()> { let client = Client::try_default().await?; let deployments: Api = Api::all(client.clone()); @@ -89,8 +87,6 @@ async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result anyhow::Result<(JoinHandle<()>, OperatorLiveness)> { @@ -108,8 +104,6 @@ async fn spawn_aggregator( Ok((handle, liveness)) } -/// Abort an aggregator and wait for it (and its `JoinSet` children) to tear down -/// before the next one starts. async fn kill(handle: JoinHandle<()>) { handle.abort(); let _ = handle.await; @@ -164,7 +158,6 @@ fn deployment_matching(name: &str, group: &str) -> Deployment { } } -/// Poll until `f` resolves true or the budget elapses. async fn wait_until(budget: Duration, mut f: F) -> anyhow::Result<()> where F: FnMut() -> Fut, @@ -207,6 +200,16 @@ async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments } // ── Scenario 1: cold restart, healthy fleet ───────────────────────────── +// +// SETUP: 1 device (recov1-dev, group=g1) +// 1 deployment (recov1-depl, selector: g1) +// aggregator converges, agent reports Running +// +// ACTION: kill aggregator, start a new one +// +// ASSERT: desired-state survives restart (idempotent re-write) +// health counts rebuilt: succeeded=1, matched=1 +// liveness → Converged #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { @@ -222,6 +225,8 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1"); let _ = devices.delete(dev, &DeleteParams::default()).await; let _ = deployments.delete(depl, &DeleteParams::default()).await; + + // SETUP: create device + deployment, run first aggregator to convergence devices .create(&PostParams::default(), &device_with_label(dev, grp)) .await?; @@ -229,7 +234,6 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { .create(&PostParams::default(), &deployment_matching(depl, grp)) .await?; - // First operator: converges and writes desired-state for the match. let (h1, l1) = spawn_aggregator(&stack).await?; wait_until(Duration::from_secs(30), || async { l1.is_converged() @@ -239,7 +243,7 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { }) .await?; - // Simulate the agent having reported Running, then kill the operator. + // Simulate agent having reported Running let kv = AdminKv::connect(&admin_nats(&stack).await?).await?; let depl_name = DeploymentName::try_new(depl)?; kv.put_device_state(&DeploymentState { @@ -250,18 +254,17 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { last_error: None, }) .await?; - kill(h1).await; - // Second operator: rebuilds from KV alone. + // ACTION: kill and restart aggregator + kill(h1).await; let (h2, l2) = spawn_aggregator(&stack).await?; wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?; - // Desired-state survives the restart (idempotent re-write, not churn). + // ASSERT: desired-state survives, health counts rebuilt from KV assert!( desired_state_present(&stack, dev, depl).await?, "desired-state must persist across restart" ); - // Health count is rebuilt from device-state KV alone. wait_until(Duration::from_secs(30), || async { deployments .get(depl) @@ -279,6 +282,17 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { } // ── Scenario 2: CR deleted while operator down ────────────────────────── +// +// SETUP: 1 device (recov2-dev, group=g2) +// 1 deployment (recov2-depl, selector: g2) +// aggregator converges → desired-state written +// +// ACTION: kill aggregator +// force-delete the deployment CR (bypasses finalizer) +// start new aggregator +// +// ASSERT: orphan desired-state is GC'd at convergence +// (deployment no longer exists → no targets → entries purged) #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> { @@ -294,6 +308,8 @@ async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> { let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2"); let _ = devices.delete(dev, &DeleteParams::default()).await; let _ = deployments.delete(depl, &DeleteParams::default()).await; + + // SETUP devices .create(&PostParams::default(), &device_with_label(dev, grp)) .await?; @@ -311,16 +327,16 @@ async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> { .await?; kill(h1).await; - // CR force-deleted while down — no controller running, so no finalizer - // blocks it. The desired-state entry is now an orphan. + // ACTION: delete CR while operator is down, then restart deployments.delete(depl, &DeleteParams::default()).await?; assert!( desired_state_present(&stack, dev, depl).await?, "orphan desired-state should still be present before recovery" ); - // Recovery GCs the orphan once the CR list is replayed. let (h2, _l2) = spawn_aggregator(&stack).await?; + + // ASSERT: orphan is GC'd wait_until(Duration::from_secs(30), || async { !desired_state_present(&stack, dev, depl) .await @@ -334,6 +350,15 @@ async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> { } // ── Scenario 3: two operators racing ──────────────────────────────────── +// +// SETUP: 1 device (recov3-dev, group=g3) +// 1 deployment (recov3-depl, selector: g3) +// +// ACTION: start TWO aggregators simultaneously (rolling-deploy overlap) +// +// ASSERT: both converge +// desired-state KV value is byte-identical to the serialized score +// (idempotent multi-writer, no leader election needed) #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> { @@ -349,6 +374,8 @@ async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3"); let _ = devices.delete(dev, &DeleteParams::default()).await; let _ = deployments.delete(depl, &DeleteParams::default()).await; + + // SETUP devices .create(&PostParams::default(), &device_with_label(dev, grp)) .await?; @@ -356,7 +383,7 @@ async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> .create(&PostParams::default(), &deployment_matching(depl, grp)) .await?; - // Two operators at once (rolling-deploy overlap). + // ACTION: two operators at once let (h_a, l_a) = spawn_aggregator(&stack).await?; let (h_b, l_b) = spawn_aggregator(&stack).await?; wait_until(Duration::from_secs(30), || async { @@ -364,8 +391,7 @@ async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> }) .await?; - // The KV value is the deterministic serialized score regardless of which - // operator wrote it — idempotent multi-writer, no leader election. + // ASSERT: KV value is deterministic regardless of writer let js = jetstream::new(admin_nats(&stack).await?); let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?; let key = desired_state_key(dev, &DeploymentName::try_new(depl)?); @@ -383,7 +409,106 @@ async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> Ok(()) } +// ── Scenario 4: device offline during restart ─────────────────────────── +// +// SETUP: 2 devices (recov4-dev-a reporting, recov4-dev-b silent) +// 1 deployment (recov4-depl, selector: g4, matches both) +// aggregator converges +// dev-a reports Running; dev-b never reports +// +// ACTION: kill aggregator, start a new one +// +// ASSERT: dev-a → succeeded (rebuilt from device-state KV) +// dev-b → pending (no state entry = pending) +// matched_device_count = 2 + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn device_offline_during_restart_counts_as_pending() -> anyhow::Result<()> { + if !e2e_enabled() { + skip(); + return Ok(()); + } + let stack = recovery_stack().await?; + let client = Client::try_default().await?; + let devices: Api = Api::all(client.clone()); + let deployments: Api = Api::namespaced(client, &stack.namespace); + + let (dev_a, dev_b, depl, grp) = ("recov4-dev-a", "recov4-dev-b", "recov4-depl", "g4"); + for d in [dev_a, dev_b] { + let _ = devices.delete(d, &DeleteParams::default()).await; + } + let _ = deployments.delete(depl, &DeleteParams::default()).await; + + // SETUP: 2 devices, 1 deployment matching both + devices + .create(&PostParams::default(), &device_with_label(dev_a, grp)) + .await?; + devices + .create(&PostParams::default(), &device_with_label(dev_b, grp)) + .await?; + deployments + .create(&PostParams::default(), &deployment_matching(depl, grp)) + .await?; + + let (h1, l1) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { + l1.is_converged() + && desired_state_present(&stack, dev_a, depl) + .await + .unwrap_or(false) + && desired_state_present(&stack, dev_b, depl) + .await + .unwrap_or(false) + }) + .await?; + + // Only dev-a reports Running; dev-b stays silent + let kv = AdminKv::connect(&admin_nats(&stack).await?).await?; + let depl_name = DeploymentName::try_new(depl)?; + kv.put_device_state(&DeploymentState { + device_id: Id::from(dev_a), + deployment: depl_name.clone(), + phase: Phase::Running, + last_event_at: Utc::now(), + last_error: None, + }) + .await?; + + // ACTION: kill and restart aggregator + kill(h1).await; + let (h2, l2) = spawn_aggregator(&stack).await?; + wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?; + + // ASSERT: dev-a = succeeded, dev-b = pending, matched = 2 + wait_until(Duration::from_secs(30), || async { + deployments + .get(depl) + .await + .ok() + .and_then(|cr| cr.status?.aggregate) + .map(|a| { + a.matched_device_count == 2 && a.succeeded == 1 && a.pending == 1 + }) + .unwrap_or(false) + }) + .await?; + + kill(h2).await; + cleanup(&stack, &[dev_a, dev_b], &[depl]).await; + Ok(()) +} + // ── Scenario 5: chaos kill under write load ───────────────────────────── +// +// SETUP: 1 device (recov5-dev, group=g5) +// 5 deployments created incrementally (recov5-depl-0..4) +// +// ACTION: create deployments one by one +// kill + restart aggregator mid-stream (after depl-2) +// kill again after all created +// start final aggregator +// +// ASSERT: final aggregator converges all 5 desired-state entries within 30s #[tokio::test(flavor = "multi_thread", worker_threads = 4)] async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> { @@ -402,11 +527,13 @@ async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> { for n in &names { let _ = deployments.delete(n, &DeleteParams::default()).await; } + + // SETUP: 1 device devices .create(&PostParams::default(), &device_with_label(dev, grp)) .await?; - // Create deployments while killing/restarting the operator mid-stream. + // ACTION: create deployments while killing/restarting the operator let (mut handle, _) = spawn_aggregator(&stack).await?; for (i, n) in names.iter().enumerate() { deployments @@ -419,9 +546,11 @@ async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> { } kill(handle).await; - // A replica that stays up must converge the full set within 30s. + // Final aggregator must converge all 5 let (h_final, l_final) = spawn_aggregator(&stack).await?; wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?; + + // ASSERT: all 5 desired-state entries present wait_until(Duration::from_secs(30), || async { for n in &names { if !desired_state_present(&stack, dev, n).await.unwrap_or(false) { diff --git a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs index 3cede511..08b23d6c 100644 --- a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs +++ b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs @@ -264,7 +264,7 @@ async fn bucket_is_empty(bucket: &Store) -> bool { fn parse_state_key(key: &str) -> Option { let rest = key.strip_prefix("state.")?; - let (device, deployment) = rest.split_once('.')?; + let (device, deployment) = rest.rsplit_once('.')?; Some(DevicePair { device_id: device.to_string(), deployment: DeploymentName::try_new(deployment).ok()?, @@ -705,10 +705,10 @@ async fn seed_owned_targets(bucket: &Store, state: &SharedFleetState) -> anyhow: let mut keys = bucket.keys().await?; while let Some(key_res) = keys.next().await { let key = key_res?; - // Keys are `.`. The KV key space carries - // no namespace — names are globally unique at this layer — - // which is exactly why `owned_targets` keys by DeploymentName. - let Some((device, deployment)) = key.split_once('.') else { + // Keys are `.`. DeploymentName is dot-free + // (RFC1123), so rsplit_once is unambiguous even if device ids + // contain dots. + let Some((device, deployment)) = key.rsplit_once('.') else { warn!("Could not read device.deployment for key {key}"); continue; }; @@ -930,4 +930,26 @@ mod tests { assert_eq!(matched.len(), 1); assert!(matched.contains("pi-01")); } + + #[test] + fn apply_state_rejects_older_timestamp() { + let cached = cached("ns", "hello", "group", "edge"); + let mut s = FleetState::default(); + s.deployments.insert(cached.key.clone(), cached.clone()); + s.owned_targets.insert( + cached.deployment_name.clone(), + ["pi-01"].iter().map(|s| s.to_string()).collect(), + ); + + let p = pair("pi-01", "hello"); + apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Running, 10)); + assert_eq!(s.states[&p].phase, Phase::Running); + + apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Failed, 5)); + assert_eq!( + s.states[&p].phase, + Phase::Running, + "older event must not overwrite newer state" + ); + } } -- 2.39.5