Files
harmony/fleet/harmony-fleet-e2e/tests/operator_restart.rs
Jean-Gabriel Gill-Couture 13e5549d6b
All checks were successful
Run Check Script / check (pull_request) Successful in 2m22s
test(fleet-e2e): cold-restart regression baseline for operator
Adds the operator_restart integration test that gates scenario #1
in docs/fleet-operator-recovery-scenarios.md: when the operator Pod
is killed and replaced, the new instance must rebuild Deployment
status + desired-state KV from NATS alone, byte-for-byte matching
the pre-kill snapshot within 30 s.

Pattern: deploy via FleetOperatorScore (no handrolled manifests),
seed a Device + Deployment CR, wait for first patch, snapshot the
aggregate counts + desired-state bytes, delete the operator pod,
wait for the replacement Ready, then poll the snapshot until it
matches or the budget elapses.

Gated behind HARMONY_FLEET_E2E=1 so cargo test --workspace stays
cheap; runs in its own test binary to isolate the pod-kill blast
radius from the existing operator suite.

Step 2 of v0.3 Chapter 2. Steps 3-5 deferred.
2026-05-24 15:27:49 -04:00

346 lines
14 KiB
Rust

//! Cold-restart regression test for the fleet operator's aggregator.
//!
//! Gates scenario #1 from `docs/fleet-operator-recovery-scenarios.md`:
//! when the operator Pod is killed and replaced, the new instance must
//! rebuild the aggregate state (Deployment status + `desired-state`
//! KV) from NATS alone, byte-for-byte matching the pre-kill snapshot,
//! within 30 seconds.
//!
//! Why a regression test rather than a unit test on `fleet_aggregator`:
//! the failure mode this guards against is the *integration* between
//! `seed_owned_targets` and the kube CR watcher init — the former
//! reads NATS, the latter delivers `Event::InitApply` for every CR.
//! Both feed `owned_targets` and the dirty set; a regression in either
//! seed shows up only when the operator restarts against a populated
//! KV plus live CRs.
//!
//! Shape (matches `operator.rs`): per-test-binary `OnceCell` stack with
//! `deploy_operator = true`, env-gated on `HARMONY_FLEET_E2E=1` so
//! `cargo test --workspace` stays cheap. Lives in its own test binary
//! so the operator-pod kill below doesn't interact with other tests.
//!
//! Run explicitly:
//!
//! ```bash
//! HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e \
//! --test operator_restart -- --nocapture
//! ```
use harmony::modules::fleet::operator::crd::{
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
Rollout, RolloutStrategy,
};
use harmony_fleet_deploy::operator::RELEASE_NAME as OPERATOR_RELEASE_NAME;
use harmony_fleet_e2e::{StackOptions, shared_stack};
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key};
use k8s_openapi::api::apps::v1::Deployment as K8sDeployment;
use k8s_openapi::api::core::v1::Pod;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, PostParams};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Name of the environment variable that opts this test binary in to
/// running; read by `e2e_enabled` and quoted in the skip notice.
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
/// Name of the `Device` CR the test creates; also passed as the device
/// id when the `desired-state` KV key is looked up.
const DEVICE_NAME: &str = "restart-device";
/// Name of the fleet `Deployment` CR the test creates and snapshots.
const DEPLOYMENT_NAME: &str = "restart-deployment";
/// Time the restarted operator is given to get back to the pre-kill
/// snapshot before the test fails.
const CONVERGE_BUDGET: Duration = Duration::from_secs(30);
/// Reports whether the e2e gate is switched on.
///
/// Returns `true` only when the variable named by `E2E_ENV` is set to
/// exactly `"1"` or `"true"`. An unset variable, a value that is not
/// valid Unicode, or any other value yields `false`.
fn e2e_enabled() -> bool {
    match std::env::var(E2E_ENV) {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
/// Point-in-time view of the state the operator owns.
///
/// The test captures one value before killing the operator pod and
/// further values after the restart; `==` between them is the
/// convergence oracle. The fields are picked so that an aggregator
/// regression (desired-state drift, a lost status patch, an orphan KV
/// entry) shows up as an inequality here instead of as a vague timing
/// flake.
#[derive(Clone, Debug, Eq, PartialEq)]
struct OperatorState {
    /// Counters taken from `Deployment.status.aggregate`, in the order
    /// `(matched, succeeded, failed, pending)`.
    ///
    /// Only the counters are compared, not the whole
    /// `DeploymentAggregate`: its `last_error.at` is an RFC 3339
    /// timestamp that a rebuilt operator may rewrite on re-patch even
    /// when nothing material changed. The counters are the
    /// load-bearing part of the assertion.
    aggregate: (u32, u32, u32, u32),
    /// Raw `desired-state` KV value for the (device, deployment) pair
    /// this test created: `Some(bytes)` when the key exists, `None`
    /// otherwise.
    desired_state_bytes: Option<Vec<u8>>,
}
// Behavioral assertion: cold-restart preserves the aggregate.
//
// The test's contract is "operator pod can die at any moment and the
// next pod converges back to byte-identical state within 30 s." If
// this fails, scenario #1 in the recovery doc has regressed; check
// `seed_owned_targets` and the kube CR watcher init paths first.
//
// Flow: skip unless the e2e gate is on, bring up the shared stack,
// create one Device and one Deployment, wait for the first aggregate
// patch, snapshot, delete the operator pod(s), wait for the
// replacement, then poll until the snapshot is back to the pre-kill
// value and stays there.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn cold_restart_preserves_deployment_aggregate() -> anyhow::Result<()> {
    // Number of back-to-back matching snapshots required before the
    // state counts as converged. The CR status and the KV entry are
    // stored outside the operator pod, so the very first post-restart
    // snapshot can equal the pre-kill one before the replacement
    // operator has written anything. Requiring the match to hold across
    // several polls gives the new instance time to run its seed and
    // patch cycles, so a late clobber is caught instead of missed.
    // NOTE(review): 5 polls assumes the ~1 s patch cadence mentioned in
    // this file's comments; raise it if the operator's startup is slower.
    const STABLE_POLLS: u32 = 5;
    const POLL_INTERVAL: Duration = Duration::from_secs(1);

    if !e2e_enabled() {
        skip_e2e();
        return Ok(());
    }
    let stack = operator_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client.clone(), &stack.namespace);

    // Steady-state setup: one device, one deployment that matches it.
    // We deliberately use an empty selector — every Device matches
    // every Deployment — because the assertion we care about is the
    // matched-count survival, not selector semantics (covered by
    // `operator.rs`).
    create_device(&devices, DEVICE_NAME).await?;
    create_fleet_deployment(&deployments, DEPLOYMENT_NAME).await?;

    // Wait for the operator to have done one full reconcile cycle
    // before we kill it. Without this the pre-kill snapshot can be
    // empty, making the post-restart equality vacuous.
    wait_for_first_patch(&deployments, DEPLOYMENT_NAME).await?;
    let pre_kill = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
    eprintln!("[operator_restart] pre-kill snapshot: {pre_kill:?}");

    // Force a cold restart. Deleting the underlying Pod is more
    // realistic than `delete deployment`: it mirrors what k8s does on
    // a node drain or OOM kill — the Deployment controller spins up
    // a fresh Pod with no state carry-over.
    delete_operator_pod(&client, &stack.namespace).await?;
    wait_for_operator_ready(&client, &stack.namespace).await?;

    // Convergence wait. The aggregator rebuilds asynchronously; we
    // re-snapshot until it matches `STABLE_POLLS` times in a row or the
    // budget elapses. The tolerance is binary: aggregate counts + KV
    // bytes must match exactly. A relaxed assertion would mask the very
    // regressions this test exists to catch.
    let deadline = Instant::now() + CONVERGE_BUDGET;
    let mut matching_streak = 0u32;
    let post_restart = loop {
        let current = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
        if current == pre_kill {
            matching_streak += 1;
            if matching_streak >= STABLE_POLLS {
                break current;
            }
        } else {
            // Any divergence restarts the settle window; only a
            // divergence observed past the deadline fails the test.
            matching_streak = 0;
            if Instant::now() >= deadline {
                anyhow::bail!(
                    "operator failed to converge within {:?}; \
                     pre-kill={:?}, post-restart={:?}",
                    CONVERGE_BUDGET,
                    pre_kill,
                    current,
                );
            }
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    };
    eprintln!("[operator_restart] converged: {post_restart:?}");
    Ok(())
}
/// Installs a tracing subscriber (best effort) and returns the shared
/// e2e stack with the operator deployed and no agent.
///
/// The log filter comes from the default environment variable when it
/// parses, and falls back to `info` otherwise.
async fn operator_stack() -> anyhow::Result<Arc<harmony_fleet_e2e::Stack>> {
    let log_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
    // The outcome of `try_init` is discarded on purpose — presumably a
    // subscriber may already be installed in this process.
    let _ = tracing_subscriber::fmt()
        .with_env_filter(log_filter)
        .try_init();

    // `deploy_agent: false` — the aggregator's cold-rebuild correctness
    // does not depend on any agent reporting `device-state`. The CR
    // watch path alone seeds `matched_device_count` from a Device CR
    // the test creates directly. A real agent would slow the test down
    // and add a second, independent failure surface.
    let options = StackOptions {
        deploy_agent: false,
        deploy_operator: true,
        ..StackOptions::default()
    };
    let stack = shared_stack(options).await?;
    stack.print_debug_info();
    Ok(stack)
}
/// Writes a one-line notice to stderr explaining that the test was
/// skipped and how to enable it.
fn skip_e2e() {
    let notice = format!(
        "skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \
         requires k3d + podman on PATH)"
    );
    eprintln!("{notice}");
}
/// Creates a `Device` CR called `name` with no inventory attached.
///
/// Any error returned by the API call is propagated to the caller.
async fn create_device(devices: &Api<Device>, name: &str) -> anyhow::Result<()> {
    let spec = DeviceSpec { inventory: None };
    let params = PostParams::default();
    devices.create(&params, &Device::new(name, spec)).await?;
    Ok(())
}
/// Creates the fleet `Deployment` CR called `name`.
///
/// The manifest uses a default `LabelSelector`, a `PodmanV0` score with
/// a single `hello` service running the `hello-world` image, and an
/// `Immediate` rollout. `status` is left unset. Any error returned by
/// the API call is propagated to the caller.
async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
    let hello_service = PodmanService {
        name: String::from("hello"),
        image: String::from("docker.io/library/hello-world:latest"),
        ports: Vec::new(),
        env: Vec::new(),
        volumes: Vec::new(),
        restart_policy: Default::default(),
    };
    let spec = DeploymentSpec {
        target_selector: LabelSelector::default(),
        score: ReconcileScore::PodmanV0(PodmanV0Score {
            services: vec![hello_service],
        }),
        rollout: Rollout {
            strategy: RolloutStrategy::Immediate,
        },
    };
    let metadata = ObjectMeta {
        name: Some(name.to_string()),
        ..Default::default()
    };
    let manifest = Deployment {
        metadata,
        spec,
        status: None,
    };
    deployments
        .create(&PostParams::default(), &manifest)
        .await?;
    Ok(())
}
/// Polls the Deployment CR `name` once per second until its
/// `status.aggregate` reports `matched_device_count >= 1`.
///
/// This is the signal that the operator's `patch_tick` has written the
/// aggregate at least once. Per the operator's design notes, the first
/// patch is expected within one `PATCH_TICK` (1 s) of the kube watcher
/// emitting `Event::InitApply` for the new Deployment + Device.
///
/// Fails if the condition is still unmet once 30 seconds have elapsed,
/// or as soon as a `get` on the CR fails.
async fn wait_for_first_patch(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
    let give_up_at = Instant::now() + Duration::from_secs(30);
    loop {
        let patched = deployments
            .get(name)
            .await?
            .status
            .and_then(|status| status.aggregate)
            .is_some_and(|aggregate| aggregate.matched_device_count >= 1);
        if patched {
            return Ok(());
        }
        if Instant::now() >= give_up_at {
            anyhow::bail!("timed out waiting for first aggregate patch on {name}");
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/// Captures the current `OperatorState` for one (device, deployment)
/// pair.
///
/// Reads the aggregate counters from the Deployment CR's status
/// (all zeros when the status or its aggregate is absent), then opens
/// an admin NATS connection and reads the matching key from the
/// `BUCKET_DESIRED_STATE` KV bucket. Each call opens its own
/// connection. Any API, NATS or name-validation error is propagated.
async fn snapshot_state(
    stack: &harmony_fleet_e2e::Stack,
    deployments: &Api<Deployment>,
    device_id: &str,
    deployment_name: &str,
) -> anyhow::Result<OperatorState> {
    let status = deployments.get(deployment_name).await?.status;
    let aggregate = match status.and_then(|s| s.aggregate) {
        Some(agg) => (
            agg.matched_device_count,
            agg.succeeded,
            agg.failed,
            agg.pending,
        ),
        None => (0, 0, 0, 0),
    };

    let jetstream = async_nats::jetstream::new(connect_admin_nats(stack).await?);
    let bucket = jetstream.get_key_value(BUCKET_DESIRED_STATE).await?;
    let validated_name = DeploymentName::try_new(deployment_name)?;
    let key = desired_state_key(device_id, &validated_name);
    let desired_state_bytes = bucket.get(&key).await?.map(|bytes| bytes.to_vec());

    Ok(OperatorState {
        aggregate,
        desired_state_bytes,
    })
}
/// Deletes every Pod in `namespace` carrying the operator's
/// `app.kubernetes.io/name` label, logging each deletion to stderr.
///
/// The Deployment controller is expected to start replacements;
/// waiting for them is the job of `wait_for_operator_ready`.
///
/// Pods are deleted rather than the Deployment itself because removing
/// the Deployment would also tear down the Service + RBAC the harness
/// set up, and the shared stack would lose those across the test
/// boundary.
///
/// Fails when the selector matches no Pods, or when a list/delete call
/// fails.
async fn delete_operator_pod(client: &Client, namespace: &str) -> anyhow::Result<()> {
    let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
    let operator_pods = pods
        .list(&ListParams::default().labels(&selector))
        .await?
        .items;
    if operator_pods.is_empty() {
        anyhow::bail!("no operator pods found in namespace {namespace} with selector {selector}");
    }

    let delete_params = DeleteParams::default();
    // Pods without a name in their metadata are skipped.
    for name in operator_pods.into_iter().filter_map(|pod| pod.metadata.name) {
        pods.delete(&name, &delete_params).await?;
        eprintln!("[operator_restart] deleted operator pod {namespace}/{name}");
    }
    Ok(())
}
/// Polls the operator's Kubernetes Deployment once per second until it
/// looks ready again after the pod delete.
///
/// "Ready" here means all three of:
/// 1. `status.observed_generation >= metadata.generation` (both set),
/// 2. `status.ready_replicas >= 1`,
/// 3. `running_operator_pod_count` reports at least one Pod.
///
/// Fails once 120 seconds have passed without all three holding, or as
/// soon as an API call fails.
async fn wait_for_operator_ready(client: &Client, namespace: &str) -> anyhow::Result<()> {
    let api: Api<K8sDeployment> = Api::namespaced(client.clone(), namespace);
    let give_up_at = Instant::now() + Duration::from_secs(120);
    loop {
        let deployment = api.get(OPERATOR_RELEASE_NAME).await?;
        let status = deployment.status.as_ref();

        // The controller must have observed the current spec generation.
        // NOTE(review): a pod delete does not change the Deployment
        // spec, so `metadata.generation` is presumably not bumped by
        // it; this check alone likely cannot tell pre-delete status
        // from post-delete status — confirm.
        let generation_observed = match (
            status.and_then(|s| s.observed_generation),
            deployment.metadata.generation,
        ) {
            (Some(observed), Some(current)) => observed >= current,
            _ => false,
        };
        let has_ready_replica = status
            .and_then(|s| s.ready_replicas)
            .is_some_and(|ready| ready >= 1);

        // Belt-and-braces: `ready_replicas=1` may be left over from the
        // stale pod's grace period, so also ask for the live pod count.
        // The pod list is only fetched when the cheaper Deployment
        // checks pass.
        if generation_observed
            && has_ready_replica
            && running_operator_pod_count(client, namespace).await? >= 1
        {
            return Ok(());
        }
        if Instant::now() >= give_up_at {
            anyhow::bail!("operator Deployment did not become Ready within 120s after pod delete");
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/// Counts the operator Pods in `namespace` that are running and not
/// being torn down.
///
/// A Pod is counted when it carries the operator's
/// `app.kubernetes.io/name` label, has no `metadata.deletion_timestamp`,
/// and reports `status.phase == "Running"`.
///
/// The deletion-timestamp filter matters to callers waiting for a
/// replacement pod. A deleted Pod keeps reporting phase `Running` for
/// its termination grace period ("Terminating" is not a pod phase), so
/// counting on phase alone would let the just-deleted pod pass for its
/// replacement.
///
/// Returns an error if the list call fails.
async fn running_operator_pod_count(client: &Client, namespace: &str) -> anyhow::Result<usize> {
    let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
    let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
    let list = pods.list(&ListParams::default().labels(&selector)).await?;
    Ok(list
        .items
        .iter()
        // Skip pods already marked for deletion.
        .filter(|pod| pod.metadata.deletion_timestamp.is_none())
        .filter(|pod| {
            pod.status
                .as_ref()
                .and_then(|status| status.phase.as_deref())
                .is_some_and(|phase| phase == "Running")
        })
        .count())
}
/// Opens a NATS connection to `stack.nats_url`, authenticating with the
/// stack's admin user and password.
///
/// A connection failure is converted into `anyhow::Error` and returned.
async fn connect_admin_nats(
    stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<async_nats::Client> {
    let user = stack.admin_user.clone();
    let password = stack.admin_pass.clone();
    let options = async_nats::ConnectOptions::new().user_and_password(user, password);
    let client = options.connect(&stack.nats_url).await?;
    Ok(client)
}