feat/v0-3-operator-restart-baseline #294

Open
johnride wants to merge 2 commits from feat/v0-3-operator-restart-baseline into feat/smoke-test-contract
3 changed files with 694 additions and 0 deletions

View File

@@ -0,0 +1,345 @@
# Fleet operator recovery scenarios
Inventory of every failure shape the IoT operator pod must survive on restart,
re-schedule, or upgrade. Companion to ROADMAP `v0_3_plan.md` **Chapter 2 —
Operator restart + aggregator recovery**.
The operator's in-memory aggregate (`Phase`, `DeploymentAggregate`, per-device
state) is rebuilt from scratch on every startup by watching the four NATS KV
buckets:
- `desired-state` — operator-written, `<device>.<deployment>` keys
- `device-info` — agent-written, static-ish facts
- `device-state` — agent-written, per `(device, deployment)` phase
- `device-heartbeat` — agent liveness pings
The aggregator entry point is `harmony_fleet_operator::fleet_aggregator::run`
(see `fleet/harmony-fleet-operator/src/fleet_aggregator.rs`).
## Convention used in this document
- **Code path** cites the most relevant file and line range in
`fleet/harmony-fleet-operator/src/fleet_aggregator.rs` (call site `FA:Lxxx`).
Other crates use `crate:path:Lxxx`.
- **Coverage** points at a smoke script under `fleet/scripts/` or a test under
`fleet/harmony-fleet-e2e/tests/`. `none` means we currently rely on it
working by inspection.
- **Risk** is impact if mishandled, not likelihood. `high` means a customer
could see stale or wrong data on the dashboard; `medium` means the operator
self-heals but logs a noisy error; `low` means transient correctness with
no customer-visible effect.
## Scenarios
| # | Name | Risk | Coverage |
|---|-------------------------------------------------|---------|----------|
| 1 | Cold restart with full KV | high | this PR (`tests/operator_restart.rs`) |
| 2 | Cold restart with desired-state seed only | medium | none |
| 3 | Partial KV — device offline during restart | high | none |
| 4 | Stale KV — Deployment CR deleted while down | high | none — Phase 2 |
| 5 | Stale KV — Device CR deleted while down | medium | none — Phase 2 |
| 6 | Selector change while operator is down | high | none |
| 7 | Two operator pods racing on rolling deploy | high | none — Phase 2 (leader election) |
| 8 | NATS reconnect mid-rebuild | medium | partial (`async_nats` retries; no test) |
| 9 | NATS stream loss after rebuild | high | none |
| 10 | KV revision wraparound | low | none |
| 11 | Malformed `device-state` payload | medium | `FA:L266-271` swallow path |
| 12 | Malformed `desired-state` payload | low | `FA:L619-625` swallow path |
| 13 | Invalid `DeploymentName` in KV key | low | `FA:L623-625` swallow path |
| 14 | High write load during rebuild (slow rebuild) | medium | none |
| 15 | Missing KV bucket on startup | low | created on first run (`FA:L154-164`) |
| 16 | Concurrent CR mutation during rebuild | medium | none |
| 17 | Heartbeat-only liveness (no state) on restart | low | none |
| 18 | Aggregator panics on the kube patch path | medium | none — Phase 2 (liveness signal) |
| 19 | Kube apiserver unreachable mid-rebuild | high | none |
| 20 | Operator killed mid-`patch_tick` | medium | none |
---
### 1. Cold restart with full KV
- **Trigger.** Operator pod is killed (OOM, node reschedule, `kubectl rollout
restart`) when every device has previously published `device-info` and
`device-state`, and every desired-state KV entry the previous operator
wrote is still present.
- **Expected behavior.** New operator replays `device-state` via
`bucket.watch_with_history(">")` (`FA:L251`) and seeds `owned_targets` from
`desired-state` via `seed_owned_targets` (`FA:L611-633`). After both kube
watchers fire `Event::InitDone`, the aggregate converges to byte-identical
status patches.
- **Code path.** `run` (`FA:L152-235`); `seed_owned_targets` (`FA:L611-633`);
`run_state_kv_watcher` (`FA:L250-282`); `run_deployment_watcher`
(`FA:L358-376`); `run_device_watcher` (`FA:L466-484`); `patch_tick`
(`FA:L640-679`).
- **Coverage.** `fleet/harmony-fleet-e2e/tests/operator_restart.rs` (this PR).
- **Risk.** **High.** The customer-facing happy path. If this regresses,
every dashboard reads stale or empty status after an upgrade.
### 2. Cold restart with desired-state seed only
- **Trigger.** Operator restart at a moment when `desired-state` has entries
from a previous run but `device-state` is empty (all agents asleep or
freshly provisioned).
- **Expected behavior.** `owned_targets` is seeded correctly. Aggregate
reports `matched_device_count = N`, `pending = N`, no false-positive
`Running` counts.
- **Code path.** `seed_owned_targets` (`FA:L611-633`); `compute_aggregate`
(`FA:L684-710`).
- **Coverage.** none.
- **Risk.** **Medium.** Surfaces as transient over-`pending` until agents
re-publish. Self-heals on the next state watch delivery.
### 3. Partial KV — device offline during restart
- **Trigger.** A device was running and reporting before the operator went
down; it has now lost power or NATS connectivity. Its `device-state` entry
is still present (KV is persistent), but it can no longer republish.
- **Expected behavior.** Operator replays the cached state and renders the
device as `Running` (or whatever the last phase was) until the agent comes
back. Dashboard does not show the device as "missing" unless the heartbeat
bucket says so. Phase 2 will surface staleness via the heartbeat watcher.
- **Code path.** `apply_state` (`FA:L286-323`); no heartbeat watcher yet.
- **Coverage.** none.
- **Risk.** **High.** A long-offline device that the operator believes is
`Running` could mask a real incident. Mitigation deferred to Chapter 2
liveness signalling.
### 4. Stale KV — Deployment CR deleted while operator was down
- **Trigger.** While the operator pod is down, a customer deletes a
`Deployment` CR. The CR finalizer never gets to run because no controller
is alive to process it (the apiserver waits). When the operator restarts,
the CR is in `Terminating` with a finalizer; the corresponding
`desired-state.<device>.<name>` keys are still in NATS.
- **Expected behavior.** Operator processes the `Event::Delete` for the CR
(`FA:L369`), drops `owned_targets`, deletes desired-state entries
(`FA:L450-455`), removes the finalizer. Agents observe the KV delete and
reconcile-tear-down.
- **Code path.** `on_deployment_delete` (`FA:L428-456`). Controller-side
finalizer in `controller.rs` does a belt-and-braces scan of the same
prefix.
- **Coverage.** none — **Phase 2 work** (stale-KV reconciliation rule, step
3 of Chapter 2). Not in this PR.
- **Risk.** **High.** Orphaned KV entries make agents reconcile a long-dead
Deployment forever. Customer-visible as "I deleted it but it's still
running."
### 5. Stale KV — Device CR deleted while operator was down
- **Trigger.** Device CR is deleted (admin removes a Pi from the fleet) while
the operator is down. `device-info` entry may also have been deleted
separately; if so, the operator never rebuilds a Device CR for it.
- **Expected behavior.** Stale `desired-state` entries keyed on the deleted
device should be cleaned up. Today they're not — the operator only deletes
them on a live `Event::Delete` for the Device (`FA:L552-576`). Phase 2
reconciliation must walk `owned_targets` after init and prune entries with
no matching device.
- **Code path.** `on_device_delete` (`FA:L552-576`). No init-time prune.
- **Coverage.** none — **Phase 2 work**.
- **Risk.** **Medium.** Smaller blast radius than #4 because the device is
also gone; nothing reconciles against the orphan key.
### 6. Selector change while operator is down
- **Trigger.** Customer edits `spec.targetSelector` on a CR while the
operator pod is down. On restart, watcher delivers a single
`Event::Apply` for the updated CR (kube collapses the history).
- **Expected behavior.** Operator computes new matched set, diffs against
seeded `owned_targets`, writes new desired-state entries, deletes
newly-orphaned ones. `reconcile_kv` (`FA:L582-604`) is responsible.
- **Code path.** `on_deployment_upsert` (`FA:L378-426`); `reconcile_kv`
(`FA:L582-604`).
- **Coverage.** none. Critical because the seed step is what makes the diff
correct on a cold restart — without `seed_owned_targets`, a selector
reduction would leak orphan entries.
- **Risk.** **High.** Orphan keys reach agents that no longer match the
selector, causing them to run a deployment they shouldn't.
### 7. Two operator pods racing on rolling deploy
- **Trigger.** A `kubectl rollout restart deploy/harmony-fleet-operator`
briefly runs the old and new pods in parallel. Both watch the same KV
and CRs, both write desired-state entries.
- **Expected behavior.** Writes are idempotent and byte-deterministic
(`reconcile_kv` is a put-or-delete on the same content). Status patches
via `Patch::Merge` collide harmlessly. **However**, the loser's
`owned_targets` snapshot can lag and re-delete a key the winner just
wrote, causing flap.
- **Code path.** `reconcile_kv` (`FA:L582-604`); kube patch (`FA:L655-666`).
- **Coverage.** none — **Phase 2 work** (leader election decision, step 4
of Chapter 2). Not in this PR.
- **Risk.** **High.** Customer sees the dashboard flicker during a rolling
upgrade. Self-heals once the old pod terminates.
### 8. NATS reconnect mid-rebuild
- **Trigger.** Operator's NATS connection drops during cold rebuild.
`async_nats` reconnects transparently. The KV `watch_with_history(">")`
call has already returned a `Stream`; the underlying connection drop
surfaces as a delivery error on the stream.
- **Expected behavior.** Stream loop logs the error and continues
(`FA:L256-258`). The history replay may be incomplete — a follow-up watch
refresh would be needed to guarantee correctness.
- **Code path.** `run_state_kv_watcher` (`FA:L250-282`).
- **Coverage.** partial — `async_nats` reconnect is covered by its own
tests; no operator-level test asserts post-reconnect convergence.
- **Risk.** **Medium.** The watch stream may silently miss messages until a
manual restart.
### 9. NATS stream loss after rebuild
- **Trigger.** NATS server crashes or the JetStream stream is deleted
out-of-band after the operator has finished its cold rebuild.
- **Expected behavior.** Bucket re-creation on first reconnect
(`create_key_value` is idempotent, `FA:L154-164`). Operator should detect
the empty stream, clear in-memory state, and rebuild. Today the watcher
loop exits silently and `select!` cancels the process.
- **Code path.** `run` (`FA:L229-234`).
- **Coverage.** none.
- **Risk.** **High.** Possible silent data loss on a NATS incident.
### 10. KV revision wraparound
- **Trigger.** NATS JetStream KV uses a `u64` revision counter. At ~10
Hz it would take ~58 billion years to wrap. Included for completeness;
practical only with a corrupted stream.
- **Expected behavior.** No special handling needed.
- **Code path.** N/A.
- **Coverage.** none.
- **Risk.** **Low.** Theoretical.
### 11. Malformed `device-state` payload
- **Trigger.** A buggy agent (or a manual `nats kv put`) writes a value to
`state.<device>.<deployment>` that doesn't deserialize as
`DeploymentState`.
- **Expected behavior.** Operator logs `aggregator: bad device_state
payload` and skips the entry.
- **Code path.** `run_state_kv_watcher` deserialize arm (`FA:L266-271`).
- **Coverage.** none. The error path is exercised at compile time only.
- **Risk.** **Medium.** A single bad entry shouldn't poison the whole
rebuild; today's swallow-and-log handles this. Should be unit-tested.
### 12. Malformed `desired-state` payload
- **Trigger.** A previous operator wrote a value that no longer matches
the current `ReconcileScore` shape (older version, manual mutation).
- **Expected behavior.** `seed_owned_targets` doesn't deserialize the
value — it only reads keys. The next CR upsert from kube rewrites it.
- **Code path.** `seed_owned_targets` (`FA:L611-633`).
- **Coverage.** none.
- **Risk.** **Low.** Score format evolution is covered by CR validation;
the KV is a derived projection.
### 13. Invalid `DeploymentName` in KV key
- **Trigger.** A key like `pi-01.hello.world` snuck into the bucket
(multiple dots) — manual mutation or an older operator version that
didn't validate names.
- **Expected behavior.** `seed_owned_targets` logs `Invalid deployment
name for key …` and skips it (`FA:L623-625`).
- **Code path.** `seed_owned_targets` (`FA:L619-632`).
- **Coverage.** none.
- **Risk.** **Low.** Belt-and-braces, the CR layer already enforces this
via `DeploymentName::try_new`.
### 14. High write load during rebuild
- **Trigger.** Hundreds of devices publishing `device-state` updates per
second while the operator is rebuilding. The watch history replay races
the live stream.
- **Expected behavior.** Deliveries are ordered last-writer-wins; the
per-pair `last_event_at` dedup in `apply_state` (`FA:L287-291`) prevents
out-of-order entries from clobbering newer ones.
- **Code path.** `apply_state` (`FA:L286-323`).
- **Coverage.** none. No load test exists.
- **Risk.** **Medium.** Likely fine in practice given the dedup, but
unverified at scale.
### 15. Missing KV bucket on startup
- **Trigger.** First-ever operator start on a fresh NATS cluster, or after
someone wiped JetStream state.
- **Expected behavior.** `create_key_value` is idempotent — creates the
bucket if absent, no-ops if present.
- **Code path.** `run` (`FA:L153-164`).
- **Coverage.** implicit in every smoke run that starts from a clean NATS.
`smoke-a1.sh:182-195` asserts `KV bucket ready` log.
- **Risk.** **Low.** Idiomatic NATS pattern.
### 16. Concurrent CR mutation during rebuild
- **Trigger.** User applies a new Deployment CR while the operator is still
replaying KV history.
- **Expected behavior.** Kube watcher delivers the `Event::Apply` after
`Event::InitDone`; the upsert handler runs against the partially-seeded
state and correctly diffs against any matching seeded targets.
- **Code path.** `on_deployment_upsert` (`FA:L378-426`).
- **Coverage.** none.
- **Risk.** **Medium.** Possible race between KV seed and CR init; today
the locking in `state.lock().await` serializes both, but the order in
which they observe state is not asserted.
### 17. Heartbeat-only liveness (no state) on restart
- **Trigger.** Device has been publishing heartbeats but has no deployments
assigned. Operator restart finds heartbeats but no `device-state` or
`desired-state` entries for it.
- **Expected behavior.** Device is recognized via its `Device` CR
(rebuilt from `device-info` in `device_reconciler.rs`) and shown idle.
No phase counts. The heartbeat bucket isn't watched by the aggregator.
- **Code path.** `device_reconciler` (separate from this module).
- **Coverage.** none.
- **Risk.** **Low.** Expected dashboard rendering.
### 18. Aggregator panics on the kube patch path
- **Trigger.** A bug or upstream change makes `patch_status` panic. Tokio
unwinds the spawned task; the process keeps running because of `select!`
— but the patcher silently stops.
- **Expected behavior.** Process should exit-and-restart on any subsystem
failure. The dashboard should also surface "operator unhealthy" so a
customer doesn't trust stale status.
- **Code path.** `patch_tick` (`FA:L640-679`); `run` select (`FA:L229-234`).
- **Coverage.** none — **Phase 2 work** (liveness signalling, step 5 of
Chapter 2). Not in this PR.
- **Risk.** **Medium.** Status freezes silently; depends on dashboard
noticing the lack of updates.
### 19. Kube apiserver unreachable mid-rebuild
- **Trigger.** Apiserver hiccup during operator startup. `Api::list` or
the initial `watcher::watcher` invocation fails.
- **Expected behavior.** Watcher loop logs and exits; `select!` cancels
the process; k8s restarts the pod with exponential backoff.
- **Code path.** `run_deployment_watcher` (`FA:L362-374`);
`run_device_watcher` (`FA:L470-482`).
- **Coverage.** none.
- **Risk.** **High.** A flapping apiserver can keep the operator from
ever reaching steady state.
### 20. Operator killed mid-`patch_tick`
- **Trigger.** Pod terminated between draining the `dirty` set
(`FA:L643`) and the actual `patch_status` calls (`FA:L656-666`).
- **Expected behavior.** Lost dirty entries are re-marked on the next KV
watch delivery. Worst case is a one-tick lag in `.status.aggregate` —
the patch tick runs at 1 Hz.
- **Code path.** `patch_tick` (`FA:L640-679`).
- **Coverage.** none.
- **Risk.** **Medium.** Self-heals on next event, but unverified.
## Phase 2 follow-ups (out of scope for this PR)
The Chapter 2 roadmap lists five steps. This PR ships only steps 1 and 2.
| Step | What it does | Scenarios it closes |
|------|--------------|---------------------|
| 1. Scenario inventory (this doc) | — | covers all 20 above by enumeration |
| 2. Cold-restart regression test | gates #1 in CI | #1 |
| 3. Stale-KV reconciliation rule | init-time prune of orphan keys | #4, #5 |
| 4. Leader election decision | single-writer or idempotent multi-writer | #7 |
| 5. Liveness signalling | dashboard "operator converging" banner | #18, parts of #19/#20 |
Each Phase 2 step is its own PR. The scenarios above tagged "Phase 2 work"
are the entry points.

View File

@@ -18,6 +18,10 @@ path = "tests/ping.rs"
name = "operator"
path = "tests/operator.rs"
[[test]]
name = "operator_restart"
path = "tests/operator_restart.rs"
[[test]]
name = "vm_ping"
path = "tests/vm_ping.rs"

View File

@@ -0,0 +1,345 @@
//! Cold-restart regression test for the fleet operator's aggregator.
//!
//! Gates scenario #1 from `docs/fleet-operator-recovery-scenarios.md`:
//! when the operator Pod is killed and replaced, the new instance must
//! rebuild the aggregate state (Deployment status + `desired-state`
//! KV) from NATS alone, byte-for-byte matching the pre-kill snapshot,
//! within 30 seconds.
//!
//! Why a regression test rather than a unit test on `fleet_aggregator`:
//! the failure mode this guards against is the *integration* between
//! `seed_owned_targets` and the kube CR watcher init — the former
//! reads NATS, the latter delivers `Event::InitApply` for every CR.
//! Both feed `owned_targets` and the dirty set; a regression in either
//! seed shows up only when the operator restarts against a populated
//! KV plus live CRs.
//!
//! Shape (matches `operator.rs`): per-test-binary `OnceCell` stack with
//! `deploy_operator = true`, env-gated on `HARMONY_FLEET_E2E=1` so
//! `cargo test --workspace` stays cheap. Lives in its own test binary
//! so the operator-pod kill below doesn't interact with other tests.
//!
//! Run explicitly:
//!
//! ```bash
//! HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e \
//! --test operator_restart -- --nocapture
//! ```
use harmony::modules::fleet::operator::crd::{
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
Rollout, RolloutStrategy,
};
use harmony_fleet_deploy::operator::RELEASE_NAME as OPERATOR_RELEASE_NAME;
use harmony_fleet_e2e::{StackOptions, shared_stack};
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key};
use k8s_openapi::api::apps::v1::Deployment as K8sDeployment;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, PostParams};
use std::sync::Arc;
use std::time::{Duration, Instant};
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
const DEVICE_NAME: &str = "restart-device";
const DEPLOYMENT_NAME: &str = "restart-deployment";
const CONVERGE_BUDGET: Duration = Duration::from_secs(30);
fn e2e_enabled() -> bool {
matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
}
/// Snapshot of operator-owned state at a point in time. Captured pre-
/// kill, re-captured post-restart, and asserted equal.
///
/// Equality on this struct is the test's convergence oracle. The
/// fields are chosen so that any aggregator regression — desired-state
/// drift, lost status patch, orphan KV entry — surfaces here rather
/// than in a vague timing flake.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OperatorState {
/// `(matched, succeeded, failed, pending)` from
/// `Deployment.status.aggregate`. We compare the tuple rather
/// than the full `DeploymentAggregate` struct because
/// `last_error.at` is an RFC 3339 string that can churn on
/// re-patch even when nothing material changed; the rebuilt
/// operator may write a fresh timestamp. The counters are the
/// load-bearing assertion.
aggregate: (u32, u32, u32, u32),
/// The `desired-state` KV value for the (device, deployment) pair
/// we created in this test. `Some(bytes)` if the key exists.
desired_state_bytes: Option<Vec<u8>>,
}
// Behavioral assertion: cold-restart preserves the aggregate.
//
// The test's contract is "operator pod can die at any moment and the
// next pod converges back to byte-identical state within 30 s." If
// this fails, scenario #1 in the recovery doc has regressed; check
// `seed_owned_targets` and the kube CR watcher init paths first.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn cold_restart_preserves_deployment_aggregate() -> anyhow::Result<()> {
if !e2e_enabled() {
skip_e2e();
return Ok(());
}
let stack = operator_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client.clone(), &stack.namespace);
// Steady-state setup: one device, one deployment that matches it.
// We deliberately use an empty selector — every Device matches
// every Deployment — because the assertion we care about is the
// matched-count survival, not selector semantics (covered by
// `operator.rs`).
create_device(&devices, DEVICE_NAME).await?;
create_fleet_deployment(&deployments, DEPLOYMENT_NAME).await?;
// Wait for the operator to have done one full reconcile cycle
// before we kill it. Without this the pre-kill snapshot can be
// empty, making the post-restart equality vacuous.
wait_for_first_patch(&deployments, DEPLOYMENT_NAME).await?;
let pre_kill = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
eprintln!("[operator_restart] pre-kill snapshot: {pre_kill:?}");
// Force a cold restart. Deleting the underlying Pod is more
// realistic than `delete deployment`: it mirrors what k8s does on
// a node drain or OOM kill — the Deployment controller spins up
// a fresh Pod with no state carry-over.
delete_operator_pod(&client, &stack.namespace).await?;
wait_for_operator_ready(&client, &stack.namespace).await?;
// Convergence wait. The aggregator rebuilds asynchronously; we
// re-snapshot until it matches or the budget elapses. The
// tolerance is binary: aggregate counts + KV bytes must match
// exactly. A relaxed assertion would mask the very regressions
// this test exists to catch.
let deadline = Instant::now() + CONVERGE_BUDGET;
let mut post_restart;
loop {
post_restart = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
if post_restart == pre_kill {
break;
}
if Instant::now() >= deadline {
anyhow::bail!(
"operator failed to converge within {:?}; \
pre-kill={:?}, post-restart={:?}",
CONVERGE_BUDGET,
pre_kill,
post_restart,
);
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
eprintln!("[operator_restart] converged: {post_restart:?}");
Ok(())
}
async fn operator_stack() -> anyhow::Result<Arc<harmony_fleet_e2e::Stack>> {
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.try_init();
// `deploy_agent: false` — the aggregator's cold-rebuild correctness
// does not depend on any agent reporting `device-state`. The CR
// watch path alone seeds `matched_device_count` from a Device CR
// we create directly. Adding a real agent would slow the test
// and add a second independent failure surface.
let stack = shared_stack(StackOptions {
deploy_agent: false,
deploy_operator: true,
..StackOptions::default()
})
.await?;
stack.print_debug_info();
Ok(stack)
}
fn skip_e2e() {
eprintln!(
"skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \
requires k3d + podman on PATH)"
);
}
async fn create_device(devices: &Api<Device>, name: &str) -> anyhow::Result<()> {
let device = Device::new(name, DeviceSpec { inventory: None });
devices.create(&PostParams::default(), &device).await?;
Ok(())
}
async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
let deployment = Deployment {
metadata: ObjectMeta {
name: Some(name.to_string()),
..Default::default()
},
spec: DeploymentSpec {
target_selector: LabelSelector::default(),
score: ReconcileScore::PodmanV0(PodmanV0Score {
services: vec![PodmanService {
name: "hello".to_string(),
image: "docker.io/library/hello-world:latest".to_string(),
ports: vec![],
env: vec![],
volumes: vec![],
restart_policy: Default::default(),
}],
}),
rollout: Rollout {
strategy: RolloutStrategy::Immediate,
},
},
status: None,
};
deployments
.create(&PostParams::default(), &deployment)
.await?;
Ok(())
}
/// Block until the operator's `patch_tick` has written a non-empty
/// `aggregate` at least once. The aggregator only patches CRs whose
/// dirty set picked them up; the first patch lands within one
/// `PATCH_TICK` (1 s) after the kube watcher emits `Event::InitApply`
/// for the new Deployment + Device.
async fn wait_for_first_patch(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
let deadline = Instant::now() + Duration::from_secs(30);
loop {
let cr = deployments.get(name).await?;
if let Some(status) = cr.status
&& let Some(agg) = status.aggregate
&& agg.matched_device_count >= 1
{
return Ok(());
}
if Instant::now() >= deadline {
anyhow::bail!("timed out waiting for first aggregate patch on {name}");
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn snapshot_state(
stack: &harmony_fleet_e2e::Stack,
deployments: &Api<Deployment>,
device_id: &str,
deployment_name: &str,
) -> anyhow::Result<OperatorState> {
let cr = deployments.get(deployment_name).await?;
let aggregate = cr
.status
.and_then(|s| s.aggregate)
.map(|a| (a.matched_device_count, a.succeeded, a.failed, a.pending))
.unwrap_or((0, 0, 0, 0));
let nats_client = connect_admin_nats(stack).await?;
let js = async_nats::jetstream::new(nats_client);
let desired_state = js.get_key_value(BUCKET_DESIRED_STATE).await?;
let dn = DeploymentName::try_new(deployment_name)?;
let key = desired_state_key(device_id, &dn);
let desired_state_bytes = desired_state.get(&key).await?.map(|b| b.to_vec());
Ok(OperatorState {
aggregate,
desired_state_bytes,
})
}
/// Delete every Pod owned by the operator Deployment. The Deployment
/// controller restarts them; we wait for the replacement Ready in the
/// next helper.
///
/// We avoid `delete deployment` because it would also tear down the
/// Service + RBAC the harness set up, and the shared stack would lose
/// those across the test boundary.
async fn delete_operator_pod(client: &Client, namespace: &str) -> anyhow::Result<()> {
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
let list = pods.list(&ListParams::default().labels(&selector)).await?;
if list.items.is_empty() {
anyhow::bail!("no operator pods found in namespace {namespace} with selector {selector}");
}
for pod in list.items {
if let Some(name) = pod.metadata.name {
pods.delete(&name, &DeleteParams::default()).await?;
eprintln!("[operator_restart] deleted operator pod {namespace}/{name}");
}
}
Ok(())
}
async fn wait_for_operator_ready(client: &Client, namespace: &str) -> anyhow::Result<()> {
let api: Api<K8sDeployment> = Api::namespaced(client.clone(), namespace);
let deadline = Instant::now() + Duration::from_secs(120);
loop {
// Both `ready_replicas` AND `observed_generation == generation`
// — a stale `ready_replicas=1` may linger from before the
// pod-delete propagated to status. Comparing generations
// pins us on the *post-delete* state.
let d = api.get(OPERATOR_RELEASE_NAME).await?;
let observed_ok = d
.status
.as_ref()
.and_then(|s| s.observed_generation)
.zip(d.metadata.generation)
.is_some_and(|(observed, current)| observed >= current);
let pods_ok = d
.status
.as_ref()
.and_then(|s| s.ready_replicas)
.unwrap_or(0)
>= 1;
if observed_ok && pods_ok {
// Belt-and-braces: confirm at least one new Pod is
// actually Running, not just `ready_replicas=1` leftover
// from the stale pod's grace period.
if running_operator_pod_count(client, namespace).await? >= 1 {
return Ok(());
}
}
if Instant::now() >= deadline {
anyhow::bail!("operator Deployment did not become Ready within 120s after pod delete");
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
async fn running_operator_pod_count(client: &Client, namespace: &str) -> anyhow::Result<usize> {
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
let list = pods.list(&ListParams::default().labels(&selector)).await?;
Ok(list
.items
.iter()
.filter(|p| {
p.status
.as_ref()
.and_then(|s| s.phase.as_deref())
.map(|phase| phase == "Running")
.unwrap_or(false)
})
.count())
}
async fn connect_admin_nats(
stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<async_nats::Client> {
async_nats::ConnectOptions::new()
.user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
.connect(&stack.nats_url)
.await
.map_err(Into::into)
}