From 8648d05ff7a8c38d975a11303438b6a1acee6ece Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Sun, 24 May 2026 15:27:38 -0400 Subject: [PATCH 1/2] docs(fleet): enumerate operator recovery scenarios Tabletop inventory of every failure mode the fleet operator must survive on restart, re-schedule, or upgrade. Companion to v0.3 roadmap Chapter 2; each scenario lists trigger, expected behavior, code-path citation, current test coverage, and risk classification. Step 1 of Chapter 2. Steps 3-5 (stale-KV reconciliation, leader election, liveness signalling) deferred to follow-up PRs and tagged "Phase 2 work" in the table. --- docs/fleet-operator-recovery-scenarios.md | 345 ++++++++++++++++++++++ 1 file changed, 345 insertions(+) create mode 100644 docs/fleet-operator-recovery-scenarios.md diff --git a/docs/fleet-operator-recovery-scenarios.md b/docs/fleet-operator-recovery-scenarios.md new file mode 100644 index 00000000..6498e9a8 --- /dev/null +++ b/docs/fleet-operator-recovery-scenarios.md @@ -0,0 +1,345 @@ +# Fleet operator recovery scenarios + +Inventory of every failure shape the IoT operator pod must survive on restart, +re-schedule, or upgrade. Companion to ROADMAP `v0_3_plan.md` **Chapter 2 — +Operator restart + aggregator recovery**. + +The operator's in-memory aggregate (`Phase`, `DeploymentAggregate`, per-device +state) is rebuilt from scratch on every startup by watching the four NATS KV +buckets: + +- `desired-state` — operator-written, `<device>.<deployment>` keys +- `device-info` — agent-written, static-ish facts +- `device-state` — agent-written, per `(device, deployment)` phase +- `device-heartbeat` — agent liveness pings + +The aggregator entry point is `harmony_fleet_operator::fleet_aggregator::run` +(see `fleet/harmony-fleet-operator/src/fleet_aggregator.rs`). + +## Convention used in this document + +- **Code path** cites the most relevant file and line range in + `fleet/harmony-fleet-operator/src/fleet_aggregator.rs` (call site `FA:Lxxx`). 
+ Other crates use `crate:path:Lxxx`. +- **Coverage** points at a smoke script under `fleet/scripts/` or a test under + `fleet/harmony-fleet-e2e/tests/`. `none` means we currently rely on it + working by inspection. +- **Risk** is impact if mishandled, not likelihood. `high` means a customer + could see stale or wrong data on the dashboard; `medium` means the operator + self-heals but logs a noisy error; `low` means transient correctness with + no customer-visible effect. + +## Scenarios + +| # | Name | Risk | Coverage | +|---|-------------------------------------------------|---------|----------| +| 1 | Cold restart with full KV | high | this PR (`tests/operator_restart.rs`) | +| 2 | Cold restart with desired-state seed only | medium | none | +| 3 | Partial KV — device offline during restart | high | none | +| 4 | Stale KV — Deployment CR deleted while down | high | none — Phase 2 | +| 5 | Stale KV — Device CR deleted while down | medium | none — Phase 2 | +| 6 | Selector change while operator is down | high | none | +| 7 | Two operator pods racing on rolling deploy | high | none — Phase 2 (leader election) | +| 8 | NATS reconnect mid-rebuild | medium | partial (`async_nats` retries; no test) | +| 9 | NATS stream loss after rebuild | high | none | +| 10 | KV revision wraparound | low | none | +| 11 | Malformed `device-state` payload | medium | none (`FA:L266-271` swallow path, untested) | +| 12 | Malformed `desired-state` payload | low | none (`FA:L619-625` swallow path, untested) | +| 13 | Invalid `DeploymentName` in KV key | low | none (`FA:L623-625` swallow path, untested) | +| 14 | High write load during rebuild (slow rebuild) | medium | none | +| 15 | Missing KV bucket on startup | low | created on first run (`FA:L154-164`) | +| 16 | Concurrent CR mutation during rebuild | medium | none | +| 17 | Heartbeat-only liveness (no state) on restart | low | none | +| 18 | Aggregator panics on the kube patch path | medium | none — Phase 2 (liveness signal) | +| 19 | Kube apiserver unreachable mid-rebuild | high | none 
| +| 20 | Operator killed mid-`patch_tick` | medium | none | + +--- + +### 1. Cold restart with full KV + +- **Trigger.** Operator pod is killed (OOM, node reschedule, `kubectl rollout + restart`) when every device has previously published `device-info` and + `device-state`, and every desired-state KV entry the previous operator + wrote is still present. +- **Expected behavior.** New operator replays `device-state` via + `bucket.watch_with_history(">")` (`FA:L251`) and seeds `owned_targets` from + `desired-state` via `seed_owned_targets` (`FA:L611-633`). After both kube + watchers fire `Event::InitDone`, the aggregate converges to byte-identical + status patches. +- **Code path.** `run` (`FA:L152-235`); `seed_owned_targets` (`FA:L611-633`); + `run_state_kv_watcher` (`FA:L250-282`); `run_deployment_watcher` + (`FA:L358-376`); `run_device_watcher` (`FA:L466-484`); `patch_tick` + (`FA:L640-679`). +- **Coverage.** `fleet/harmony-fleet-e2e/tests/operator_restart.rs` (this PR). +- **Risk.** **High.** The customer-facing happy path. If this regresses, + every dashboard reads stale or empty status after an upgrade. + +### 2. Cold restart with desired-state seed only + +- **Trigger.** Operator restart at a moment when `desired-state` has entries + from a previous run but `device-state` is empty (all agents asleep or + freshly provisioned). +- **Expected behavior.** `owned_targets` is seeded correctly. Aggregate + reports `matched_device_count = N`, `pending = N`, no false-positive + `Running` counts. +- **Code path.** `seed_owned_targets` (`FA:L611-633`); `compute_aggregate` + (`FA:L684-710`). +- **Coverage.** none. +- **Risk.** **Medium.** Surfaces as transient over-`pending` until agents + re-publish. Self-heals on the next state watch delivery. + +### 3. Partial KV — device offline during restart + +- **Trigger.** A device was running and reporting before the operator went + down; it has now lost power or NATS connectivity. 
Its `device-state` entry + is still present (KV is persistent), but it can no longer republish. +- **Expected behavior.** Operator replays the cached state and renders the + device as `Running` (or whatever the last phase was) until the agent comes + back. Dashboard does not show the device as "missing" unless the heartbeat + bucket says so. Phase 2 will surface staleness via the heartbeat watcher. +- **Code path.** `apply_state` (`FA:L286-323`); no heartbeat watcher yet. +- **Coverage.** none. +- **Risk.** **High.** A long-offline device that the operator believes is + `Running` could mask a real incident. Mitigation deferred to Chapter 2 + liveness signalling. + +### 4. Stale KV — Deployment CR deleted while operator was down + +- **Trigger.** While the operator pod is down, a customer deletes a + `Deployment` CR. The CR finalizer never gets to run because no controller + is alive to process it (the apiserver waits). When the operator restarts, + the CR is in `Terminating` with a finalizer; the corresponding + `desired-state.<device>.<deployment>` keys are still in NATS. +- **Expected behavior.** Operator processes the `Event::Delete` for the CR + (`FA:L369`), drops `owned_targets`, deletes desired-state entries + (`FA:L450-455`), removes the finalizer. Agents observe the KV delete and + reconcile-tear-down. +- **Code path.** `on_deployment_delete` (`FA:L428-456`). Controller-side + finalizer in `controller.rs` does a belt-and-braces scan of the same + prefix. +- **Coverage.** none — **Phase 2 work** (stale-KV reconciliation rule, step + 3 of Chapter 2). Not in this PR. +- **Risk.** **High.** Orphaned KV entries make agents reconcile a long-dead + Deployment forever. Customer-visible as "I deleted it but it's still + running." + +### 5. Stale KV — Device CR deleted while operator was down + +- **Trigger.** Device CR is deleted (admin removes a Pi from the fleet) while + the operator is down. 
`device-info` entry may also have been deleted + separately; if so, the operator never rebuilds a Device CR for it. +- **Expected behavior.** Stale `desired-state` entries keyed on the deleted + device should be cleaned up. Today they're not — the operator only deletes + them on a live `Event::Delete` for the Device (`FA:L552-576`). Phase 2 + reconciliation must walk `owned_targets` after init and prune entries with + no matching device. +- **Code path.** `on_device_delete` (`FA:L552-576`). No init-time prune. +- **Coverage.** none — **Phase 2 work**. +- **Risk.** **Medium.** Smaller blast radius than #4 because the device is + also gone; nothing reconciles against the orphan key. + +### 6. Selector change while operator is down + +- **Trigger.** Customer edits `spec.targetSelector` on a CR while the + operator pod is down. On restart, watcher delivers a single + `Event::Apply` for the updated CR (kube collapses the history). +- **Expected behavior.** Operator computes new matched set, diffs against + seeded `owned_targets`, writes new desired-state entries, deletes + newly-orphaned ones. `reconcile_kv` (`FA:L582-604`) is responsible. +- **Code path.** `on_deployment_upsert` (`FA:L378-426`); `reconcile_kv` + (`FA:L582-604`). +- **Coverage.** none. Critical because the seed step is what makes the diff + correct on a cold restart — without `seed_owned_targets`, a selector + reduction would leak orphan entries. +- **Risk.** **High.** Orphan keys reach agents that no longer match the + selector, causing them to run a deployment they shouldn't. + +### 7. Two operator pods racing on rolling deploy + +- **Trigger.** A `kubectl rollout restart deploy/harmony-fleet-operator` + briefly runs the old and new pods in parallel. Both watch the same KV + and CRs, both write desired-state entries. +- **Expected behavior.** Writes are idempotent and byte-deterministic + (`reconcile_kv` is a put-or-delete on the same content). Status patches + via `Patch::Merge` collide harmlessly. 
**However**, the loser's + `owned_targets` snapshot can lag and re-delete a key the winner just + wrote, causing flap. +- **Code path.** `reconcile_kv` (`FA:L582-604`); kube patch (`FA:L655-666`). +- **Coverage.** none — **Phase 2 work** (leader election decision, step 4 + of Chapter 2). Not in this PR. +- **Risk.** **High.** Customer sees the dashboard flicker during a rolling + upgrade. Self-heals once the old pod terminates. + +### 8. NATS reconnect mid-rebuild + +- **Trigger.** Operator's NATS connection drops during cold rebuild. + `async_nats` reconnects transparently. The KV `watch_with_history(">")` + call has already returned a `Stream`; the underlying connection drop + surfaces as a delivery error on the stream. +- **Expected behavior.** Stream loop logs the error and continues + (`FA:L256-258`). The history replay may be incomplete — a follow-up watch + refresh would be needed to guarantee correctness. +- **Code path.** `run_state_kv_watcher` (`FA:L250-282`). +- **Coverage.** partial — `async_nats` reconnect is covered by its own + tests; no operator-level test asserts post-reconnect convergence. +- **Risk.** **Medium.** The watch stream may silently miss messages until a + manual restart. + +### 9. NATS stream loss after rebuild + +- **Trigger.** NATS server crashes or the JetStream stream is deleted + out-of-band after the operator has finished its cold rebuild. +- **Expected behavior.** Bucket re-creation on first reconnect + (`create_key_value` is idempotent, `FA:L154-164`). Operator should detect + the empty stream, clear in-memory state, and rebuild. Today the watcher + loop exits silently and `select!` cancels the process. +- **Code path.** `run` (`FA:L229-234`). +- **Coverage.** none. +- **Risk.** **High.** Possible silent data loss on a NATS incident. + +### 10. KV revision wraparound + +- **Trigger.** NATS JetStream KV uses a `u64` revision counter. At ~10 + Hz it would take ~58 billion years to wrap. 
Included for completeness; + practical only with a corrupted stream. +- **Expected behavior.** No special handling needed. +- **Code path.** N/A. +- **Coverage.** none. +- **Risk.** **Low.** Theoretical. + +### 11. Malformed `device-state` payload + +- **Trigger.** A buggy agent (or a manual `nats kv put`) writes a value to + `state.<device>.<deployment>` that doesn't deserialize as + `DeploymentState`. +- **Expected behavior.** Operator logs `aggregator: bad device_state + payload` and skips the entry. +- **Code path.** `run_state_kv_watcher` deserialize arm (`FA:L266-271`). +- **Coverage.** none. The error path is exercised at compile time only. +- **Risk.** **Medium.** A single bad entry shouldn't poison the whole + rebuild; today's swallow-and-log handles this. Should be unit-tested. + +### 12. Malformed `desired-state` payload + +- **Trigger.** A previous operator wrote a value that no longer matches + the current `ReconcileScore` shape (older version, manual mutation). +- **Expected behavior.** `seed_owned_targets` doesn't deserialize the + value — it only reads keys. The next CR upsert from kube rewrites it. +- **Code path.** `seed_owned_targets` (`FA:L611-633`). +- **Coverage.** none. +- **Risk.** **Low.** Score format evolution is covered by CR validation; + the KV is a derived projection. + +### 13. Invalid `DeploymentName` in KV key + +- **Trigger.** A key like `pi-01.hello.world` snuck into the bucket + (multiple dots) — manual mutation or an older operator version that + didn't validate names. +- **Expected behavior.** `seed_owned_targets` logs `Invalid deployment + name for key …` and skips it (`FA:L623-625`). +- **Code path.** `seed_owned_targets` (`FA:L619-632`). +- **Coverage.** none. +- **Risk.** **Low.** Belt-and-braces, the CR layer already enforces this + via `DeploymentName::try_new`. + +### 14. High write load during rebuild + +- **Trigger.** Hundreds of devices publishing `device-state` updates per + second while the operator is rebuilding. 
The watch history replay races + the live stream. +- **Expected behavior.** Deliveries are ordered last-writer-wins; the + per-pair `last_event_at` dedup in `apply_state` (`FA:L287-291`) prevents + out-of-order entries from clobbering newer ones. +- **Code path.** `apply_state` (`FA:L286-323`). +- **Coverage.** none. No load test exists. +- **Risk.** **Medium.** Likely fine in practice given the dedup, but + unverified at scale. + +### 15. Missing KV bucket on startup + +- **Trigger.** First-ever operator start on a fresh NATS cluster, or after + someone wiped JetStream state. +- **Expected behavior.** `create_key_value` is idempotent — creates the + bucket if absent, no-ops if present. +- **Code path.** `run` (`FA:L153-164`). +- **Coverage.** implicit in every smoke run that starts from a clean NATS. + `smoke-a1.sh:182-195` asserts `KV bucket ready` log. +- **Risk.** **Low.** Idiomatic NATS pattern. + +### 16. Concurrent CR mutation during rebuild + +- **Trigger.** User applies a new Deployment CR while the operator is still + replaying KV history. +- **Expected behavior.** Kube watcher delivers the `Event::Apply` after + `Event::InitDone`; the upsert handler runs against the partially-seeded + state and correctly diffs against any matching seeded targets. +- **Code path.** `on_deployment_upsert` (`FA:L378-426`). +- **Coverage.** none. +- **Risk.** **Medium.** Possible race between KV seed and CR init; today + the locking in `state.lock().await` serializes both, but the order in + which they observe state is not asserted. + +### 17. Heartbeat-only liveness (no state) on restart + +- **Trigger.** Device has been publishing heartbeats but has no deployments + assigned. Operator restart finds heartbeats but no `device-state` or + `desired-state` entries for it. +- **Expected behavior.** Device is recognized via its `Device` CR + (rebuilt from `device-info` in `device_reconciler.rs`) and shown idle. + No phase counts. 
The heartbeat bucket isn't watched by the aggregator. +- **Code path.** `device_reconciler` (separate from this module). +- **Coverage.** none. +- **Risk.** **Low.** Expected dashboard rendering. + +### 18. Aggregator panics on the kube patch path + +- **Trigger.** A bug or upstream change makes `patch_status` panic. Tokio + unwinds the spawned task; the process keeps running because of `select!` + — but the patcher silently stops. +- **Expected behavior.** Process should exit-and-restart on any subsystem + failure. The dashboard should also surface "operator unhealthy" so a + customer doesn't trust stale status. +- **Code path.** `patch_tick` (`FA:L640-679`); `run` select (`FA:L229-234`). +- **Coverage.** none — **Phase 2 work** (liveness signalling, step 5 of + Chapter 2). Not in this PR. +- **Risk.** **Medium.** Status freezes silently; depends on dashboard + noticing the lack of updates. + +### 19. Kube apiserver unreachable mid-rebuild + +- **Trigger.** Apiserver hiccup during operator startup. `Api::list` or + the initial `watcher::watcher` invocation fails. +- **Expected behavior.** Watcher loop logs and exits; `select!` cancels + the process; k8s restarts the pod with exponential backoff. +- **Code path.** `run_deployment_watcher` (`FA:L362-374`); + `run_device_watcher` (`FA:L470-482`). +- **Coverage.** none. +- **Risk.** **High.** A flapping apiserver can keep the operator from + ever reaching steady state. + +### 20. Operator killed mid-`patch_tick` + +- **Trigger.** Pod terminated between draining the `dirty` set + (`FA:L643`) and the actual `patch_status` calls (`FA:L656-666`). +- **Expected behavior.** Lost dirty entries are re-marked on the next KV + watch delivery. Worst case is a one-tick lag in `.status.aggregate` — + the patch tick runs at 1 Hz. +- **Code path.** `patch_tick` (`FA:L640-679`). +- **Coverage.** none. +- **Risk.** **Medium.** Self-heals on next event, but unverified. 
+ +## Phase 2 follow-ups (out of scope for this PR) + +The Chapter 2 roadmap lists five steps. This PR ships only steps 1 and 2. + +| Step | What it does | Scenarios it closes | +|------|--------------|---------------------| +| 1. Scenario inventory (this doc) | — | covers all 20 above by enumeration | +| 2. Cold-restart regression test | gates #1 in CI | #1 | +| 3. Stale-KV reconciliation rule | init-time prune of orphan keys | #4, #5 | +| 4. Leader election decision | single-writer or idempotent multi-writer | #7 | +| 5. Liveness signalling | dashboard "operator converging" banner | #18, parts of #19/#20 | + +Each Phase 2 step is its own PR. The scenarios above tagged "Phase 2 work" +are the entry points. -- 2.39.5 From 13e5549d6b73cb26b55b9268fc82beacb14eb33f Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Sun, 24 May 2026 15:27:49 -0400 Subject: [PATCH 2/2] test(fleet-e2e): cold-restart regression baseline for operator Adds the operator_restart integration test that gates scenario #1 in docs/fleet-operator-recovery-scenarios.md: when the operator Pod is killed and replaced, the new instance must rebuild Deployment status + desired-state KV from NATS alone, byte-for-byte matching the pre-kill snapshot within 30 s. Pattern: deploy via FleetOperatorScore (no handrolled manifests), seed a Device + Deployment CR, wait for first patch, snapshot the aggregate counts + desired-state bytes, delete the operator pod, wait for the replacement Ready, then poll the snapshot until it matches or the budget elapses. Gated behind HARMONY_FLEET_E2E=1 so cargo test --workspace stays cheap; runs in its own test binary to isolate the pod-kill blast radius from the existing operator suite. Step 2 of v0.3 Chapter 2. Steps 3-5 deferred. 
--- fleet/harmony-fleet-e2e/Cargo.toml | 4 + .../tests/operator_restart.rs | 345 ++++++++++++++++++ 2 files changed, 349 insertions(+) create mode 100644 fleet/harmony-fleet-e2e/tests/operator_restart.rs diff --git a/fleet/harmony-fleet-e2e/Cargo.toml b/fleet/harmony-fleet-e2e/Cargo.toml index f11246e0..0ba495d0 100644 --- a/fleet/harmony-fleet-e2e/Cargo.toml +++ b/fleet/harmony-fleet-e2e/Cargo.toml @@ -18,6 +18,10 @@ path = "tests/ping.rs" name = "operator" path = "tests/operator.rs" +[[test]] +name = "operator_restart" +path = "tests/operator_restart.rs" + [[test]] name = "vm_ping" path = "tests/vm_ping.rs" diff --git a/fleet/harmony-fleet-e2e/tests/operator_restart.rs b/fleet/harmony-fleet-e2e/tests/operator_restart.rs new file mode 100644 index 00000000..3e28feaa --- /dev/null +++ b/fleet/harmony-fleet-e2e/tests/operator_restart.rs @@ -0,0 +1,345 @@ +//! Cold-restart regression test for the fleet operator's aggregator. +//! +//! Gates scenario #1 from `docs/fleet-operator-recovery-scenarios.md`: +//! when the operator Pod is killed and replaced, the new instance must +//! rebuild the aggregate state (Deployment status + `desired-state` +//! KV) from NATS alone, byte-for-byte matching the pre-kill snapshot, +//! within 30 seconds. +//! +//! Why a regression test rather than a unit test on `fleet_aggregator`: +//! the failure mode this guards against is the *integration* between +//! `seed_owned_targets` and the kube CR watcher init — the former +//! reads NATS, the latter delivers `Event::InitApply` for every CR. +//! Both feed `owned_targets` and the dirty set; a regression in either +//! seed shows up only when the operator restarts against a populated +//! KV plus live CRs. +//! +//! Shape (matches `operator.rs`): per-test-binary `OnceCell` stack with +//! `deploy_operator = true`, env-gated on `HARMONY_FLEET_E2E=1` so +//! `cargo test --workspace` stays cheap. Lives in its own test binary +//! so the operator-pod kill below doesn't interact with other tests. 
+//! +//! Run explicitly: +//! +//! ```bash +//! HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e \ +//! --test operator_restart -- --nocapture +//! ``` + +use harmony::modules::fleet::operator::crd::{ + Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore, + Rollout, RolloutStrategy, +}; +use harmony_fleet_deploy::operator::RELEASE_NAME as OPERATOR_RELEASE_NAME; +use harmony_fleet_e2e::{StackOptions, shared_stack}; +use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key}; +use k8s_openapi::api::apps::v1::Deployment as K8sDeployment; +use k8s_openapi::api::core::v1::Pod; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; +use kube::Client; +use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, PostParams}; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +const E2E_ENV: &str = "HARMONY_FLEET_E2E"; + +const DEVICE_NAME: &str = "restart-device"; +const DEPLOYMENT_NAME: &str = "restart-deployment"; +const CONVERGE_BUDGET: Duration = Duration::from_secs(30); + +fn e2e_enabled() -> bool { + matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true")) +} + +/// Snapshot of operator-owned state at a point in time. Captured pre- +/// kill, re-captured post-restart, and asserted equal. +/// +/// Equality on this struct is the test's convergence oracle. The +/// fields are chosen so that any aggregator regression — desired-state +/// drift, lost status patch, orphan KV entry — surfaces here rather +/// than in a vague timing flake. +#[derive(Debug, Clone, PartialEq, Eq)] +struct OperatorState { + /// `(matched, succeeded, failed, pending)` from + /// `Deployment.status.aggregate`. We compare the tuple rather + /// than the full `DeploymentAggregate` struct because + /// `last_error.at` is an RFC 3339 string that can churn on + /// re-patch even when nothing material changed; the rebuilt + /// operator may write a fresh timestamp. 
The counters are the + /// load-bearing assertion. + aggregate: (u32, u32, u32, u32), + /// The `desired-state` KV value for the (device, deployment) pair + /// we created in this test. `Some(bytes)` if the key exists. + desired_state_bytes: Option>, +} + +// Behavioral assertion: cold-restart preserves the aggregate. +// +// The test's contract is "operator pod can die at any moment and the +// next pod converges back to byte-identical state within 30 s." If +// this fails, scenario #1 in the recovery doc has regressed; check +// `seed_owned_targets` and the kube CR watcher init paths first. +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn cold_restart_preserves_deployment_aggregate() -> anyhow::Result<()> { + if !e2e_enabled() { + skip_e2e(); + return Ok(()); + } + + let stack = operator_stack().await?; + let client = Client::try_default().await?; + + let devices: Api = Api::all(client.clone()); + let deployments: Api = Api::namespaced(client.clone(), &stack.namespace); + + // Steady-state setup: one device, one deployment that matches it. + // We deliberately use an empty selector — every Device matches + // every Deployment — because the assertion we care about is the + // matched-count survival, not selector semantics (covered by + // `operator.rs`). + create_device(&devices, DEVICE_NAME).await?; + create_fleet_deployment(&deployments, DEPLOYMENT_NAME).await?; + + // Wait for the operator to have done one full reconcile cycle + // before we kill it. Without this the pre-kill snapshot can be + // empty, making the post-restart equality vacuous. + wait_for_first_patch(&deployments, DEPLOYMENT_NAME).await?; + let pre_kill = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?; + eprintln!("[operator_restart] pre-kill snapshot: {pre_kill:?}"); + + // Force a cold restart. 
Deleting the underlying Pod is more + // realistic than `delete deployment`: it mirrors what k8s does on + // a node drain or OOM kill — the Deployment controller spins up + // a fresh Pod with no state carry-over. + delete_operator_pod(&client, &stack.namespace).await?; + wait_for_operator_ready(&client, &stack.namespace).await?; + + // Convergence wait. The aggregator rebuilds asynchronously; we + // re-snapshot until it matches or the budget elapses. The + // tolerance is binary: aggregate counts + KV bytes must match + // exactly. A relaxed assertion would mask the very regressions + // this test exists to catch. + let deadline = Instant::now() + CONVERGE_BUDGET; + let mut post_restart; + loop { + post_restart = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?; + if post_restart == pre_kill { + break; + } + if Instant::now() >= deadline { + anyhow::bail!( + "operator failed to converge within {:?}; \ + pre-kill={:?}, post-restart={:?}", + CONVERGE_BUDGET, + pre_kill, + post_restart, + ); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } + eprintln!("[operator_restart] converged: {post_restart:?}"); + + Ok(()) +} + +async fn operator_stack() -> anyhow::Result> { + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .try_init(); + + // `deploy_agent: false` — the aggregator's cold-rebuild correctness + // does not depend on any agent reporting `device-state`. The CR + // watch path alone seeds `matched_device_count` from a Device CR + // we create directly. Adding a real agent would slow the test + // and add a second independent failure surface. 
+ let stack = shared_stack(StackOptions { + deploy_agent: false, + deploy_operator: true, + ..StackOptions::default() + }) + .await?; + + stack.print_debug_info(); + Ok(stack) +} + +fn skip_e2e() { + eprintln!( + "skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \ + requires k3d + podman on PATH)" + ); +} + +async fn create_device(devices: &Api, name: &str) -> anyhow::Result<()> { + let device = Device::new(name, DeviceSpec { inventory: None }); + devices.create(&PostParams::default(), &device).await?; + Ok(()) +} + +async fn create_fleet_deployment(deployments: &Api, name: &str) -> anyhow::Result<()> { + let deployment = Deployment { + metadata: ObjectMeta { + name: Some(name.to_string()), + ..Default::default() + }, + spec: DeploymentSpec { + target_selector: LabelSelector::default(), + score: ReconcileScore::PodmanV0(PodmanV0Score { + services: vec![PodmanService { + name: "hello".to_string(), + image: "docker.io/library/hello-world:latest".to_string(), + ports: vec![], + env: vec![], + volumes: vec![], + restart_policy: Default::default(), + }], + }), + rollout: Rollout { + strategy: RolloutStrategy::Immediate, + }, + }, + status: None, + }; + + deployments + .create(&PostParams::default(), &deployment) + .await?; + Ok(()) +} + +/// Block until the operator's `patch_tick` has written a non-empty +/// `aggregate` at least once. The aggregator only patches CRs whose +/// dirty set picked them up; the first patch lands within one +/// `PATCH_TICK` (1 s) after the kube watcher emits `Event::InitApply` +/// for the new Deployment + Device. 
+async fn wait_for_first_patch(deployments: &Api, name: &str) -> anyhow::Result<()> { + let deadline = Instant::now() + Duration::from_secs(30); + loop { + let cr = deployments.get(name).await?; + if let Some(status) = cr.status + && let Some(agg) = status.aggregate + && agg.matched_device_count >= 1 + { + return Ok(()); + } + if Instant::now() >= deadline { + anyhow::bail!("timed out waiting for first aggregate patch on {name}"); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn snapshot_state( + stack: &harmony_fleet_e2e::Stack, + deployments: &Api, + device_id: &str, + deployment_name: &str, +) -> anyhow::Result { + let cr = deployments.get(deployment_name).await?; + let aggregate = cr + .status + .and_then(|s| s.aggregate) + .map(|a| (a.matched_device_count, a.succeeded, a.failed, a.pending)) + .unwrap_or((0, 0, 0, 0)); + + let nats_client = connect_admin_nats(stack).await?; + let js = async_nats::jetstream::new(nats_client); + let desired_state = js.get_key_value(BUCKET_DESIRED_STATE).await?; + let dn = DeploymentName::try_new(deployment_name)?; + let key = desired_state_key(device_id, &dn); + let desired_state_bytes = desired_state.get(&key).await?.map(|b| b.to_vec()); + + Ok(OperatorState { + aggregate, + desired_state_bytes, + }) +} + +/// Delete every Pod owned by the operator Deployment. The Deployment +/// controller restarts them; we wait for the replacement Ready in the +/// next helper. +/// +/// We avoid `delete deployment` because it would also tear down the +/// Service + RBAC the harness set up, and the shared stack would lose +/// those across the test boundary. 
+async fn delete_operator_pod(client: &Client, namespace: &str) -> anyhow::Result<()> { + let pods: Api = Api::namespaced(client.clone(), namespace); + let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}"); + let list = pods.list(&ListParams::default().labels(&selector)).await?; + if list.items.is_empty() { + anyhow::bail!("no operator pods found in namespace {namespace} with selector {selector}"); + } + for pod in list.items { + if let Some(name) = pod.metadata.name { + pods.delete(&name, &DeleteParams::default()).await?; + eprintln!("[operator_restart] deleted operator pod {namespace}/{name}"); + } + } + Ok(()) +} + +async fn wait_for_operator_ready(client: &Client, namespace: &str) -> anyhow::Result<()> { + let api: Api = Api::namespaced(client.clone(), namespace); + let deadline = Instant::now() + Duration::from_secs(120); + loop { + // Both `ready_replicas` AND `observed_generation == generation` + // — a stale `ready_replicas=1` may linger from before the + // pod-delete propagated to status. Comparing generations + // pins us on the *post-delete* state. + let d = api.get(OPERATOR_RELEASE_NAME).await?; + let observed_ok = d + .status + .as_ref() + .and_then(|s| s.observed_generation) + .zip(d.metadata.generation) + .is_some_and(|(observed, current)| observed >= current); + let pods_ok = d + .status + .as_ref() + .and_then(|s| s.ready_replicas) + .unwrap_or(0) + >= 1; + if observed_ok && pods_ok { + // Belt-and-braces: confirm at least one new Pod is + // actually Running, not just `ready_replicas=1` leftover + // from the stale pod's grace period. + if running_operator_pod_count(client, namespace).await? 
>= 1 { + return Ok(()); + } + } + if Instant::now() >= deadline { + anyhow::bail!("operator Deployment did not become Ready within 120s after pod delete"); + } + tokio::time::sleep(Duration::from_secs(1)).await; + } +} + +async fn running_operator_pod_count(client: &Client, namespace: &str) -> anyhow::Result { + let pods: Api = Api::namespaced(client.clone(), namespace); + let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}"); + let list = pods.list(&ListParams::default().labels(&selector)).await?; + Ok(list + .items + .iter() + .filter(|p| { + p.status + .as_ref() + .and_then(|s| s.phase.as_deref()) + .map(|phase| phase == "Running") + .unwrap_or(false) + }) + .count()) +} + +async fn connect_admin_nats( + stack: &harmony_fleet_e2e::Stack, +) -> anyhow::Result { + async_nats::ConnectOptions::new() + .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone()) + .connect(&stack.nats_url) + .await + .map_err(Into::into) +} -- 2.39.5