feat/v0-3-operator-restart-baseline #294
345
docs/fleet-operator-recovery-scenarios.md
Normal file
345
docs/fleet-operator-recovery-scenarios.md
Normal file
@@ -0,0 +1,345 @@
|
||||
# Fleet operator recovery scenarios
|
||||
|
||||
Inventory of every failure shape the IoT operator pod must survive on restart,
|
||||
re-schedule, or upgrade. Companion to ROADMAP `v0_3_plan.md` **Chapter 2 —
|
||||
Operator restart + aggregator recovery**.
|
||||
|
||||
The operator's in-memory aggregate (`Phase`, `DeploymentAggregate`, per-device
|
||||
state) is rebuilt from scratch on every startup by watching the four NATS KV
|
||||
buckets:
|
||||
|
||||
- `desired-state` — operator-written, `<device>.<deployment>` keys
|
||||
- `device-info` — agent-written, static-ish facts
|
||||
- `device-state` — agent-written, per `(device, deployment)` phase
|
||||
- `device-heartbeat` — agent liveness pings
|
||||
|
||||
The aggregator entry point is `harmony_fleet_operator::fleet_aggregator::run`
|
||||
(see `fleet/harmony-fleet-operator/src/fleet_aggregator.rs`).
|
||||
|
||||
## Convention used in this document
|
||||
|
||||
- **Code path** cites the most relevant file and line range in
|
||||
`fleet/harmony-fleet-operator/src/fleet_aggregator.rs` (call site `FA:Lxxx`).
|
||||
Other crates use `crate:path:Lxxx`.
|
||||
- **Coverage** points at a smoke script under `fleet/scripts/` or a test under
|
||||
`fleet/harmony-fleet-e2e/tests/`. `none` means we currently rely on it
|
||||
working by inspection.
|
||||
- **Risk** is impact if mishandled, not likelihood. `high` means a customer
|
||||
could see stale or wrong data on the dashboard; `medium` means the operator
|
||||
self-heals but logs a noisy error; `low` means transient correctness with
|
||||
no customer-visible effect.
|
||||
|
||||
## Scenarios
|
||||
|
||||
| # | Name | Risk | Coverage |
|
||||
|---|-------------------------------------------------|---------|----------|
|
||||
| 1 | Cold restart with full KV | high | this PR (`tests/operator_restart.rs`) |
|
||||
| 2 | Cold restart with desired-state seed only | medium | none |
|
||||
| 3 | Partial KV — device offline during restart | high | none |
|
||||
| 4 | Stale KV — Deployment CR deleted while down | high | none — Phase 2 |
|
||||
| 5 | Stale KV — Device CR deleted while down | medium | none — Phase 2 |
|
||||
| 6 | Selector change while operator is down | high | none |
|
||||
| 7 | Two operator pods racing on rolling deploy | high | none — Phase 2 (leader election) |
|
||||
| 8 | NATS reconnect mid-rebuild | medium | partial (`async_nats` retries; no test) |
|
||||
| 9 | NATS stream loss after rebuild | high | none |
|
||||
| 10 | KV revision wraparound | low | none |
|
||||
| 11 | Malformed `device-state` payload | medium | `FA:L266-271` swallow path |
|
||||
| 12 | Malformed `desired-state` payload | low | `FA:L619-625` swallow path |
|
||||
| 13 | Invalid `DeploymentName` in KV key | low | `FA:L623-625` swallow path |
|
||||
| 14 | High write load during rebuild (slow rebuild) | medium | none |
|
||||
| 15 | Missing KV bucket on startup | low | created on first run (`FA:L154-164`) |
|
||||
| 16 | Concurrent CR mutation during rebuild | medium | none |
|
||||
| 17 | Heartbeat-only liveness (no state) on restart | low | none |
|
||||
| 18 | Aggregator panics on the kube patch path | medium | none — Phase 2 (liveness signal) |
|
||||
| 19 | Kube apiserver unreachable mid-rebuild | high | none |
|
||||
| 20 | Operator killed mid-`patch_tick` | medium | none |
|
||||
|
||||
---
|
||||
|
||||
### 1. Cold restart with full KV
|
||||
|
||||
- **Trigger.** Operator pod is killed (OOM, node reschedule, `kubectl rollout
|
||||
restart`) when every device has previously published `device-info` and
|
||||
`device-state`, and every desired-state KV entry the previous operator
|
||||
wrote is still present.
|
||||
- **Expected behavior.** New operator replays `device-state` via
|
||||
`bucket.watch_with_history(">")` (`FA:L251`) and seeds `owned_targets` from
|
||||
`desired-state` via `seed_owned_targets` (`FA:L611-633`). After both kube
|
||||
watchers fire `Event::InitDone`, the aggregate converges to byte-identical
|
||||
status patches.
|
||||
- **Code path.** `run` (`FA:L152-235`); `seed_owned_targets` (`FA:L611-633`);
|
||||
`run_state_kv_watcher` (`FA:L250-282`); `run_deployment_watcher`
|
||||
(`FA:L358-376`); `run_device_watcher` (`FA:L466-484`); `patch_tick`
|
||||
(`FA:L640-679`).
|
||||
- **Coverage.** `fleet/harmony-fleet-e2e/tests/operator_restart.rs` (this PR).
|
||||
- **Risk.** **High.** The customer-facing happy path. If this regresses,
|
||||
every dashboard reads stale or empty status after an upgrade.
|
||||
|
||||
### 2. Cold restart with desired-state seed only
|
||||
|
||||
- **Trigger.** Operator restart at a moment when `desired-state` has entries
|
||||
from a previous run but `device-state` is empty (all agents asleep or
|
||||
freshly provisioned).
|
||||
- **Expected behavior.** `owned_targets` is seeded correctly. Aggregate
|
||||
reports `matched_device_count = N`, `pending = N`, no false-positive
|
||||
`Running` counts.
|
||||
- **Code path.** `seed_owned_targets` (`FA:L611-633`); `compute_aggregate`
|
||||
(`FA:L684-710`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Medium.** Surfaces as transient over-`pending` until agents
|
||||
re-publish. Self-heals on the next state watch delivery.
|
||||
|
||||
### 3. Partial KV — device offline during restart
|
||||
|
||||
- **Trigger.** A device was running and reporting before the operator went
|
||||
down; it has now lost power or NATS connectivity. Its `device-state` entry
|
||||
is still present (KV is persistent), but it can no longer republish.
|
||||
- **Expected behavior.** Operator replays the cached state and renders the
|
||||
device as `Running` (or whatever the last phase was) until the agent comes
|
||||
back. Dashboard does not show the device as "missing" unless the heartbeat
|
||||
bucket says so. Phase 2 will surface staleness via the heartbeat watcher.
|
||||
- **Code path.** `apply_state` (`FA:L286-323`); no heartbeat watcher yet.
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **High.** A long-offline device that the operator believes is
|
||||
`Running` could mask a real incident. Mitigation deferred to Chapter 2
|
||||
liveness signalling.
|
||||
|
||||
### 4. Stale KV — Deployment CR deleted while operator was down
|
||||
|
||||
- **Trigger.** While the operator pod is down, a customer deletes a
|
||||
`Deployment` CR. The CR finalizer never gets to run because no controller
|
||||
is alive to process it (the apiserver waits). When the operator restarts,
|
||||
the CR is in `Terminating` with a finalizer; the corresponding
|
||||
`desired-state.<device>.<name>` keys are still in NATS.
|
||||
- **Expected behavior.** Operator processes the `Event::Delete` for the CR
|
||||
(`FA:L369`), drops `owned_targets`, deletes desired-state entries
|
||||
(`FA:L450-455`), removes the finalizer. Agents observe the KV delete and
|
||||
reconcile-tear-down.
|
||||
- **Code path.** `on_deployment_delete` (`FA:L428-456`). Controller-side
|
||||
finalizer in `controller.rs` does a belt-and-braces scan of the same
|
||||
prefix.
|
||||
- **Coverage.** none — **Phase 2 work** (stale-KV reconciliation rule, step
|
||||
3 of Chapter 2). Not in this PR.
|
||||
- **Risk.** **High.** Orphaned KV entries make agents reconcile a long-dead
|
||||
Deployment forever. Customer-visible as "I deleted it but it's still
|
||||
running."
|
||||
|
||||
### 5. Stale KV — Device CR deleted while operator was down
|
||||
|
||||
- **Trigger.** Device CR is deleted (admin removes a Pi from the fleet) while
|
||||
the operator is down. `device-info` entry may also have been deleted
|
||||
separately; if so, the operator never rebuilds a Device CR for it.
|
||||
- **Expected behavior.** Stale `desired-state` entries keyed on the deleted
|
||||
device should be cleaned up. Today they're not — the operator only deletes
|
||||
them on a live `Event::Delete` for the Device (`FA:L552-576`). Phase 2
|
||||
reconciliation must walk `owned_targets` after init and prune entries with
|
||||
no matching device.
|
||||
- **Code path.** `on_device_delete` (`FA:L552-576`). No init-time prune.
|
||||
- **Coverage.** none — **Phase 2 work**.
|
||||
- **Risk.** **Medium.** Smaller blast radius than #4 because the device is
|
||||
also gone; nothing reconciles against the orphan key.
|
||||
|
||||
### 6. Selector change while operator is down
|
||||
|
||||
- **Trigger.** Customer edits `spec.targetSelector` on a CR while the
|
||||
operator pod is down. On restart, watcher delivers a single
|
||||
`Event::Apply` for the updated CR (kube collapses the history).
|
||||
- **Expected behavior.** Operator computes new matched set, diffs against
|
||||
seeded `owned_targets`, writes new desired-state entries, deletes
|
||||
newly-orphaned ones. `reconcile_kv` (`FA:L582-604`) is responsible.
|
||||
- **Code path.** `on_deployment_upsert` (`FA:L378-426`); `reconcile_kv`
|
||||
(`FA:L582-604`).
|
||||
- **Coverage.** none. Critical because the seed step is what makes the diff
|
||||
correct on a cold restart — without `seed_owned_targets`, a selector
|
||||
reduction would leak orphan entries.
|
||||
- **Risk.** **High.** Orphan keys reach agents that no longer match the
|
||||
selector, causing them to run a deployment they shouldn't.
|
||||
|
||||
### 7. Two operator pods racing on rolling deploy
|
||||
|
||||
- **Trigger.** A `kubectl rollout restart deploy/harmony-fleet-operator`
|
||||
briefly runs the old and new pods in parallel. Both watch the same KV
|
||||
and CRs, both write desired-state entries.
|
||||
- **Expected behavior.** Writes are idempotent and byte-deterministic
|
||||
(`reconcile_kv` is a put-or-delete on the same content). Status patches
|
||||
via `Patch::Merge` collide harmlessly. **However**, the loser's
|
||||
`owned_targets` snapshot can lag and re-delete a key the winner just
|
||||
wrote, causing flap.
|
||||
- **Code path.** `reconcile_kv` (`FA:L582-604`); kube patch (`FA:L655-666`).
|
||||
- **Coverage.** none — **Phase 2 work** (leader election decision, step 4
|
||||
of Chapter 2). Not in this PR.
|
||||
- **Risk.** **High.** Customer sees the dashboard flicker during a rolling
|
||||
upgrade. Self-heals once the old pod terminates.
|
||||
|
||||
### 8. NATS reconnect mid-rebuild
|
||||
|
||||
- **Trigger.** Operator's NATS connection drops during cold rebuild.
|
||||
`async_nats` reconnects transparently. The KV `watch_with_history(">")`
|
||||
call has already returned a `Stream`; the underlying connection drop
|
||||
surfaces as a delivery error on the stream.
|
||||
- **Expected behavior.** Stream loop logs the error and continues
|
||||
(`FA:L256-258`). The history replay may be incomplete — a follow-up watch
|
||||
refresh would be needed to guarantee correctness.
|
||||
- **Code path.** `run_state_kv_watcher` (`FA:L250-282`).
|
||||
- **Coverage.** partial — `async_nats` reconnect is covered by its own
|
||||
tests; no operator-level test asserts post-reconnect convergence.
|
||||
- **Risk.** **Medium.** The watch stream may silently miss messages until a
|
||||
manual restart.
|
||||
|
||||
### 9. NATS stream loss after rebuild
|
||||
|
||||
- **Trigger.** NATS server crashes or the JetStream stream is deleted
|
||||
out-of-band after the operator has finished its cold rebuild.
|
||||
- **Expected behavior.** Bucket re-creation on first reconnect
|
||||
(`create_key_value` is idempotent, `FA:L154-164`). Operator should detect
|
||||
the empty stream, clear in-memory state, and rebuild. Today the watcher
|
||||
loop exits silently and `select!` cancels the process.
|
||||
- **Code path.** `run` (`FA:L229-234`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **High.** Possible silent data loss on a NATS incident.
|
||||
|
||||
### 10. KV revision wraparound
|
||||
|
||||
- **Trigger.** NATS JetStream KV uses a `u64` revision counter. At ~10
|
||||
Hz it would take ~58 billion years to wrap. Included for completeness;
|
||||
practical only with a corrupted stream.
|
||||
- **Expected behavior.** No special handling needed.
|
||||
- **Code path.** N/A.
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Low.** Theoretical.
|
||||
|
||||
### 11. Malformed `device-state` payload
|
||||
|
||||
- **Trigger.** A buggy agent (or a manual `nats kv put`) writes a value to
|
||||
`state.<device>.<deployment>` that doesn't deserialize as
|
||||
`DeploymentState`.
|
||||
- **Expected behavior.** Operator logs `aggregator: bad device_state
|
||||
payload` and skips the entry.
|
||||
- **Code path.** `run_state_kv_watcher` deserialize arm (`FA:L266-271`).
|
||||
- **Coverage.** none. The error path is exercised at compile time only.
|
||||
- **Risk.** **Medium.** A single bad entry shouldn't poison the whole
|
||||
rebuild; today's swallow-and-log handles this. Should be unit-tested.
|
||||
|
||||
### 12. Malformed `desired-state` payload
|
||||
|
||||
- **Trigger.** A previous operator wrote a value that no longer matches
|
||||
the current `ReconcileScore` shape (older version, manual mutation).
|
||||
- **Expected behavior.** `seed_owned_targets` doesn't deserialize the
|
||||
value — it only reads keys. The next CR upsert from kube rewrites it.
|
||||
- **Code path.** `seed_owned_targets` (`FA:L611-633`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Low.** Score format evolution is covered by CR validation;
|
||||
the KV is a derived projection.
|
||||
|
||||
### 13. Invalid `DeploymentName` in KV key
|
||||
|
||||
- **Trigger.** A key like `pi-01.hello.world` snuck into the bucket
|
||||
(multiple dots) — manual mutation or an older operator version that
|
||||
didn't validate names.
|
||||
- **Expected behavior.** `seed_owned_targets` logs `Invalid deployment
|
||||
name for key …` and skips it (`FA:L623-625`).
|
||||
- **Code path.** `seed_owned_targets` (`FA:L619-632`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Low.** Belt-and-braces, the CR layer already enforces this
|
||||
via `DeploymentName::try_new`.
|
||||
|
||||
### 14. High write load during rebuild
|
||||
|
||||
- **Trigger.** Hundreds of devices publishing `device-state` updates per
|
||||
second while the operator is rebuilding. The watch history replay races
|
||||
the live stream.
|
||||
- **Expected behavior.** Deliveries are ordered last-writer-wins; the
|
||||
per-pair `last_event_at` dedup in `apply_state` (`FA:L287-291`) prevents
|
||||
out-of-order entries from clobbering newer ones.
|
||||
- **Code path.** `apply_state` (`FA:L286-323`).
|
||||
- **Coverage.** none. No load test exists.
|
||||
- **Risk.** **Medium.** Likely fine in practice given the dedup, but
|
||||
unverified at scale.
|
||||
|
||||
### 15. Missing KV bucket on startup
|
||||
|
||||
- **Trigger.** First-ever operator start on a fresh NATS cluster, or after
|
||||
someone wiped JetStream state.
|
||||
- **Expected behavior.** `create_key_value` is idempotent — creates the
|
||||
bucket if absent, no-ops if present.
|
||||
- **Code path.** `run` (`FA:L153-164`).
|
||||
- **Coverage.** implicit in every smoke run that starts from a clean NATS.
|
||||
`smoke-a1.sh:182-195` asserts `KV bucket ready` log.
|
||||
- **Risk.** **Low.** Idiomatic NATS pattern.
|
||||
|
||||
### 16. Concurrent CR mutation during rebuild
|
||||
|
||||
- **Trigger.** User applies a new Deployment CR while the operator is still
|
||||
replaying KV history.
|
||||
- **Expected behavior.** Kube watcher delivers the `Event::Apply` after
|
||||
`Event::InitDone`; the upsert handler runs against the partially-seeded
|
||||
state and correctly diffs against any matching seeded targets.
|
||||
- **Code path.** `on_deployment_upsert` (`FA:L378-426`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Medium.** Possible race between KV seed and CR init; today
|
||||
the locking in `state.lock().await` serializes both, but the order in
|
||||
which they observe state is not asserted.
|
||||
|
||||
### 17. Heartbeat-only liveness (no state) on restart
|
||||
|
||||
- **Trigger.** Device has been publishing heartbeats but has no deployments
|
||||
assigned. Operator restart finds heartbeats but no `device-state` or
|
||||
`desired-state` entries for it.
|
||||
- **Expected behavior.** Device is recognized via its `Device` CR
|
||||
(rebuilt from `device-info` in `device_reconciler.rs`) and shown idle.
|
||||
No phase counts. The heartbeat bucket isn't watched by the aggregator.
|
||||
- **Code path.** `device_reconciler` (separate from this module).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Low.** Expected dashboard rendering.
|
||||
|
||||
### 18. Aggregator panics on the kube patch path
|
||||
|
||||
- **Trigger.** A bug or upstream change makes `patch_status` panic. Tokio
|
||||
unwinds the spawned task; the process keeps running because of `select!`
|
||||
— but the patcher silently stops.
|
||||
- **Expected behavior.** Process should exit-and-restart on any subsystem
|
||||
failure. The dashboard should also surface "operator unhealthy" so a
|
||||
customer doesn't trust stale status.
|
||||
- **Code path.** `patch_tick` (`FA:L640-679`); `run` select (`FA:L229-234`).
|
||||
- **Coverage.** none — **Phase 2 work** (liveness signalling, step 5 of
|
||||
Chapter 2). Not in this PR.
|
||||
- **Risk.** **Medium.** Status freezes silently; depends on dashboard
|
||||
noticing the lack of updates.
|
||||
|
||||
### 19. Kube apiserver unreachable mid-rebuild
|
||||
|
||||
- **Trigger.** Apiserver hiccup during operator startup. `Api::list` or
|
||||
the initial `watcher::watcher` invocation fails.
|
||||
- **Expected behavior.** Watcher loop logs and exits; `select!` cancels
|
||||
the process; k8s restarts the pod with exponential backoff.
|
||||
- **Code path.** `run_deployment_watcher` (`FA:L362-374`);
|
||||
`run_device_watcher` (`FA:L470-482`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **High.** A flapping apiserver can keep the operator from
|
||||
ever reaching steady state.
|
||||
|
||||
### 20. Operator killed mid-`patch_tick`
|
||||
|
||||
- **Trigger.** Pod terminated between draining the `dirty` set
|
||||
(`FA:L643`) and the actual `patch_status` calls (`FA:L656-666`).
|
||||
- **Expected behavior.** Lost dirty entries are re-marked on the next KV
|
||||
watch delivery. Worst case is a one-tick lag in `.status.aggregate` —
|
||||
the patch tick runs at 1 Hz.
|
||||
- **Code path.** `patch_tick` (`FA:L640-679`).
|
||||
- **Coverage.** none.
|
||||
- **Risk.** **Medium.** Self-heals on next event, but unverified.
|
||||
|
||||
## Phase 2 follow-ups (out of scope for this PR)
|
||||
|
||||
The Chapter 2 roadmap lists five steps. This PR ships only steps 1 and 2.
|
||||
|
||||
| Step | What it does | Scenarios it closes |
|
||||
|------|--------------|---------------------|
|
||||
| 1. Scenario inventory (this doc) | — | covers all 20 above by enumeration |
|
||||
| 2. Cold-restart regression test | gates #1 in CI | #1 |
|
||||
| 3. Stale-KV reconciliation rule | init-time prune of orphan keys | #4, #5 |
|
||||
| 4. Leader election decision | single-writer or idempotent multi-writer | #7 |
|
||||
| 5. Liveness signalling | dashboard "operator converging" banner | #18, parts of #19/#20 |
|
||||
|
||||
Each Phase 2 step is its own PR. The scenarios above tagged "Phase 2 work"
|
||||
are the entry points.
|
||||
@@ -18,6 +18,10 @@ path = "tests/ping.rs"
|
||||
name = "operator"
|
||||
path = "tests/operator.rs"
|
||||
|
||||
[[test]]
|
||||
name = "operator_restart"
|
||||
path = "tests/operator_restart.rs"
|
||||
|
||||
[[test]]
|
||||
name = "vm_ping"
|
||||
path = "tests/vm_ping.rs"
|
||||
|
||||
345
fleet/harmony-fleet-e2e/tests/operator_restart.rs
Normal file
345
fleet/harmony-fleet-e2e/tests/operator_restart.rs
Normal file
@@ -0,0 +1,345 @@
|
||||
//! Cold-restart regression test for the fleet operator's aggregator.
|
||||
//!
|
||||
//! Gates scenario #1 from `docs/fleet-operator-recovery-scenarios.md`:
|
||||
//! when the operator Pod is killed and replaced, the new instance must
|
||||
//! rebuild the aggregate state (Deployment status + `desired-state`
|
||||
//! KV) from NATS alone, byte-for-byte matching the pre-kill snapshot,
|
||||
//! within 30 seconds.
|
||||
//!
|
||||
//! Why a regression test rather than a unit test on `fleet_aggregator`:
|
||||
//! the failure mode this guards against is the *integration* between
|
||||
//! `seed_owned_targets` and the kube CR watcher init — the former
|
||||
//! reads NATS, the latter delivers `Event::InitApply` for every CR.
|
||||
//! Both feed `owned_targets` and the dirty set; a regression in either
|
||||
//! seed shows up only when the operator restarts against a populated
|
||||
//! KV plus live CRs.
|
||||
//!
|
||||
//! Shape (matches `operator.rs`): per-test-binary `OnceCell` stack with
|
||||
//! `deploy_operator = true`, env-gated on `HARMONY_FLEET_E2E=1` so
|
||||
//! `cargo test --workspace` stays cheap. Lives in its own test binary
|
||||
//! so the operator-pod kill below doesn't interact with other tests.
|
||||
//!
|
||||
//! Run explicitly:
|
||||
//!
|
||||
//! ```bash
|
||||
//! HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e \
|
||||
//! --test operator_restart -- --nocapture
|
||||
//! ```
|
||||
|
||||
use harmony::modules::fleet::operator::crd::{
|
||||
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
|
||||
Rollout, RolloutStrategy,
|
||||
};
|
||||
use harmony_fleet_deploy::operator::RELEASE_NAME as OPERATOR_RELEASE_NAME;
|
||||
use harmony_fleet_e2e::{StackOptions, shared_stack};
|
||||
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key};
|
||||
use k8s_openapi::api::apps::v1::Deployment as K8sDeployment;
|
||||
use k8s_openapi::api::core::v1::Pod;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use kube::Client;
|
||||
use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, PostParams};
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
|
||||
|
||||
const DEVICE_NAME: &str = "restart-device";
|
||||
const DEPLOYMENT_NAME: &str = "restart-deployment";
|
||||
const CONVERGE_BUDGET: Duration = Duration::from_secs(30);
|
||||
|
||||
fn e2e_enabled() -> bool {
|
||||
matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
|
||||
}
|
||||
|
||||
/// Snapshot of operator-owned state at a point in time. Captured pre-
|
||||
/// kill, re-captured post-restart, and asserted equal.
|
||||
///
|
||||
/// Equality on this struct is the test's convergence oracle. The
|
||||
/// fields are chosen so that any aggregator regression — desired-state
|
||||
/// drift, lost status patch, orphan KV entry — surfaces here rather
|
||||
/// than in a vague timing flake.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
struct OperatorState {
|
||||
/// `(matched, succeeded, failed, pending)` from
|
||||
/// `Deployment.status.aggregate`. We compare the tuple rather
|
||||
/// than the full `DeploymentAggregate` struct because
|
||||
/// `last_error.at` is an RFC 3339 string that can churn on
|
||||
/// re-patch even when nothing material changed; the rebuilt
|
||||
/// operator may write a fresh timestamp. The counters are the
|
||||
/// load-bearing assertion.
|
||||
aggregate: (u32, u32, u32, u32),
|
||||
/// The `desired-state` KV value for the (device, deployment) pair
|
||||
/// we created in this test. `Some(bytes)` if the key exists.
|
||||
desired_state_bytes: Option<Vec<u8>>,
|
||||
}
|
||||
|
||||
// Behavioral assertion: cold-restart preserves the aggregate.
|
||||
//
|
||||
// The test's contract is "operator pod can die at any moment and the
|
||||
// next pod converges back to byte-identical state within 30 s." If
|
||||
// this fails, scenario #1 in the recovery doc has regressed; check
|
||||
// `seed_owned_targets` and the kube CR watcher init paths first.
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn cold_restart_preserves_deployment_aggregate() -> anyhow::Result<()> {
|
||||
if !e2e_enabled() {
|
||||
skip_e2e();
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let stack = operator_stack().await?;
|
||||
let client = Client::try_default().await?;
|
||||
|
||||
let devices: Api<Device> = Api::all(client.clone());
|
||||
let deployments: Api<Deployment> = Api::namespaced(client.clone(), &stack.namespace);
|
||||
|
||||
// Steady-state setup: one device, one deployment that matches it.
|
||||
// We deliberately use an empty selector — every Device matches
|
||||
// every Deployment — because the assertion we care about is the
|
||||
// matched-count survival, not selector semantics (covered by
|
||||
// `operator.rs`).
|
||||
create_device(&devices, DEVICE_NAME).await?;
|
||||
create_fleet_deployment(&deployments, DEPLOYMENT_NAME).await?;
|
||||
|
||||
// Wait for the operator to have done one full reconcile cycle
|
||||
// before we kill it. Without this the pre-kill snapshot can be
|
||||
// empty, making the post-restart equality vacuous.
|
||||
wait_for_first_patch(&deployments, DEPLOYMENT_NAME).await?;
|
||||
let pre_kill = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
|
||||
eprintln!("[operator_restart] pre-kill snapshot: {pre_kill:?}");
|
||||
|
||||
// Force a cold restart. Deleting the underlying Pod is more
|
||||
// realistic than `delete deployment`: it mirrors what k8s does on
|
||||
// a node drain or OOM kill — the Deployment controller spins up
|
||||
// a fresh Pod with no state carry-over.
|
||||
delete_operator_pod(&client, &stack.namespace).await?;
|
||||
wait_for_operator_ready(&client, &stack.namespace).await?;
|
||||
|
||||
// Convergence wait. The aggregator rebuilds asynchronously; we
|
||||
// re-snapshot until it matches or the budget elapses. The
|
||||
// tolerance is binary: aggregate counts + KV bytes must match
|
||||
// exactly. A relaxed assertion would mask the very regressions
|
||||
// this test exists to catch.
|
||||
let deadline = Instant::now() + CONVERGE_BUDGET;
|
||||
let mut post_restart;
|
||||
loop {
|
||||
post_restart = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
|
||||
if post_restart == pre_kill {
|
||||
break;
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!(
|
||||
"operator failed to converge within {:?}; \
|
||||
pre-kill={:?}, post-restart={:?}",
|
||||
CONVERGE_BUDGET,
|
||||
pre_kill,
|
||||
post_restart,
|
||||
);
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
eprintln!("[operator_restart] converged: {post_restart:?}");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn operator_stack() -> anyhow::Result<Arc<harmony_fleet_e2e::Stack>> {
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
||||
)
|
||||
.try_init();
|
||||
|
||||
// `deploy_agent: false` — the aggregator's cold-rebuild correctness
|
||||
// does not depend on any agent reporting `device-state`. The CR
|
||||
// watch path alone seeds `matched_device_count` from a Device CR
|
||||
// we create directly. Adding a real agent would slow the test
|
||||
// and add a second independent failure surface.
|
||||
let stack = shared_stack(StackOptions {
|
||||
deploy_agent: false,
|
||||
deploy_operator: true,
|
||||
..StackOptions::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
stack.print_debug_info();
|
||||
Ok(stack)
|
||||
}
|
||||
|
||||
fn skip_e2e() {
|
||||
eprintln!(
|
||||
"skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \
|
||||
requires k3d + podman on PATH)"
|
||||
);
|
||||
}
|
||||
|
||||
async fn create_device(devices: &Api<Device>, name: &str) -> anyhow::Result<()> {
|
||||
let device = Device::new(name, DeviceSpec { inventory: None });
|
||||
devices.create(&PostParams::default(), &device).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
|
||||
let deployment = Deployment {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: DeploymentSpec {
|
||||
target_selector: LabelSelector::default(),
|
||||
score: ReconcileScore::PodmanV0(PodmanV0Score {
|
||||
services: vec![PodmanService {
|
||||
name: "hello".to_string(),
|
||||
image: "docker.io/library/hello-world:latest".to_string(),
|
||||
ports: vec![],
|
||||
env: vec![],
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
}),
|
||||
rollout: Rollout {
|
||||
strategy: RolloutStrategy::Immediate,
|
||||
},
|
||||
},
|
||||
status: None,
|
||||
};
|
||||
|
||||
deployments
|
||||
.create(&PostParams::default(), &deployment)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Block until the operator's `patch_tick` has written a non-empty
|
||||
/// `aggregate` at least once. The aggregator only patches CRs whose
|
||||
/// dirty set picked them up; the first patch lands within one
|
||||
/// `PATCH_TICK` (1 s) after the kube watcher emits `Event::InitApply`
|
||||
/// for the new Deployment + Device.
|
||||
async fn wait_for_first_patch(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
|
||||
let deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
let cr = deployments.get(name).await?;
|
||||
if let Some(status) = cr.status
|
||||
&& let Some(agg) = status.aggregate
|
||||
&& agg.matched_device_count >= 1
|
||||
{
|
||||
return Ok(());
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("timed out waiting for first aggregate patch on {name}");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn snapshot_state(
|
||||
stack: &harmony_fleet_e2e::Stack,
|
||||
deployments: &Api<Deployment>,
|
||||
device_id: &str,
|
||||
deployment_name: &str,
|
||||
) -> anyhow::Result<OperatorState> {
|
||||
let cr = deployments.get(deployment_name).await?;
|
||||
let aggregate = cr
|
||||
.status
|
||||
.and_then(|s| s.aggregate)
|
||||
.map(|a| (a.matched_device_count, a.succeeded, a.failed, a.pending))
|
||||
.unwrap_or((0, 0, 0, 0));
|
||||
|
||||
let nats_client = connect_admin_nats(stack).await?;
|
||||
let js = async_nats::jetstream::new(nats_client);
|
||||
let desired_state = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
||||
let dn = DeploymentName::try_new(deployment_name)?;
|
||||
let key = desired_state_key(device_id, &dn);
|
||||
let desired_state_bytes = desired_state.get(&key).await?.map(|b| b.to_vec());
|
||||
|
||||
Ok(OperatorState {
|
||||
aggregate,
|
||||
desired_state_bytes,
|
||||
})
|
||||
}
|
||||
|
||||
/// Delete every Pod owned by the operator Deployment. The Deployment
|
||||
/// controller restarts them; we wait for the replacement Ready in the
|
||||
/// next helper.
|
||||
///
|
||||
/// We avoid `delete deployment` because it would also tear down the
|
||||
/// Service + RBAC the harness set up, and the shared stack would lose
|
||||
/// those across the test boundary.
|
||||
async fn delete_operator_pod(client: &Client, namespace: &str) -> anyhow::Result<()> {
|
||||
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
|
||||
let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
|
||||
let list = pods.list(&ListParams::default().labels(&selector)).await?;
|
||||
if list.items.is_empty() {
|
||||
anyhow::bail!("no operator pods found in namespace {namespace} with selector {selector}");
|
||||
}
|
||||
for pod in list.items {
|
||||
if let Some(name) = pod.metadata.name {
|
||||
pods.delete(&name, &DeleteParams::default()).await?;
|
||||
eprintln!("[operator_restart] deleted operator pod {namespace}/{name}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn wait_for_operator_ready(client: &Client, namespace: &str) -> anyhow::Result<()> {
|
||||
let api: Api<K8sDeployment> = Api::namespaced(client.clone(), namespace);
|
||||
let deadline = Instant::now() + Duration::from_secs(120);
|
||||
loop {
|
||||
// Both `ready_replicas` AND `observed_generation == generation`
|
||||
// — a stale `ready_replicas=1` may linger from before the
|
||||
// pod-delete propagated to status. Comparing generations
|
||||
// pins us on the *post-delete* state.
|
||||
let d = api.get(OPERATOR_RELEASE_NAME).await?;
|
||||
let observed_ok = d
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.observed_generation)
|
||||
.zip(d.metadata.generation)
|
||||
.is_some_and(|(observed, current)| observed >= current);
|
||||
let pods_ok = d
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.ready_replicas)
|
||||
.unwrap_or(0)
|
||||
>= 1;
|
||||
if observed_ok && pods_ok {
|
||||
// Belt-and-braces: confirm at least one new Pod is
|
||||
// actually Running, not just `ready_replicas=1` leftover
|
||||
// from the stale pod's grace period.
|
||||
if running_operator_pod_count(client, namespace).await? >= 1 {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("operator Deployment did not become Ready within 120s after pod delete");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_secs(1)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn running_operator_pod_count(client: &Client, namespace: &str) -> anyhow::Result<usize> {
|
||||
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
|
||||
let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
|
||||
let list = pods.list(&ListParams::default().labels(&selector)).await?;
|
||||
Ok(list
|
||||
.items
|
||||
.iter()
|
||||
.filter(|p| {
|
||||
p.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.phase.as_deref())
|
||||
.map(|phase| phase == "Running")
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.count())
|
||||
}
|
||||
|
||||
async fn connect_admin_nats(
|
||||
stack: &harmony_fleet_e2e::Stack,
|
||||
) -> anyhow::Result<async_nats::Client> {
|
||||
async_nats::ConnectOptions::new()
|
||||
.user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
|
||||
.connect(&stack.nats_url)
|
||||
.await
|
||||
.map_err(Into::into)
|
||||
}
|
||||
Reference in New Issue
Block a user