All checks were successful
Run Check Script / check (pull_request) Successful in 2m22s
Adds the operator_restart integration test that gates scenario #1 in docs/fleet-operator-recovery-scenarios.md: when the operator Pod is killed and replaced, the new instance must rebuild Deployment status + desired-state KV from NATS alone, byte-for-byte matching the pre-kill snapshot within 30 s. Pattern: deploy via FleetOperatorScore (no handrolled manifests), seed a Device + Deployment CR, wait for first patch, snapshot the aggregate counts + desired-state bytes, delete the operator pod, wait for the replacement Ready, then poll the snapshot until it matches or the budget elapses. Gated behind HARMONY_FLEET_E2E=1 so cargo test --workspace stays cheap; runs in its own test binary to isolate the pod-kill blast radius from the existing operator suite. Step 2 of v0.3 Chapter 2. Steps 3-5 deferred.
346 lines
14 KiB
Rust
346 lines
14 KiB
Rust
//! Cold-restart regression test for the fleet operator's aggregator.
|
|
//!
|
|
//! Gates scenario #1 from `docs/fleet-operator-recovery-scenarios.md`:
|
|
//! when the operator Pod is killed and replaced, the new instance must
|
|
//! rebuild the aggregate state (Deployment status + `desired-state`
|
|
//! KV) from NATS alone, byte-for-byte matching the pre-kill snapshot,
|
|
//! within 30 seconds.
|
|
//!
|
|
//! Why a regression test rather than a unit test on `fleet_aggregator`:
|
|
//! the failure mode this guards against is the *integration* between
|
|
//! `seed_owned_targets` and the kube CR watcher init — the former
|
|
//! reads NATS, the latter delivers `Event::InitApply` for every CR.
|
|
//! Both feed `owned_targets` and the dirty set; a regression in either
|
|
//! seed shows up only when the operator restarts against a populated
|
|
//! KV plus live CRs.
|
|
//!
|
|
//! Shape (matches `operator.rs`): per-test-binary `OnceCell` stack with
|
|
//! `deploy_operator = true`, env-gated on `HARMONY_FLEET_E2E=1` so
|
|
//! `cargo test --workspace` stays cheap. Lives in its own test binary
|
|
//! so the operator-pod kill below doesn't interact with other tests.
|
|
//!
|
|
//! Run explicitly:
|
|
//!
|
|
//! ```bash
|
|
//! HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e \
|
|
//! --test operator_restart -- --nocapture
|
|
//! ```
|
|
|
|
use harmony::modules::fleet::operator::crd::{
|
|
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
|
|
Rollout, RolloutStrategy,
|
|
};
|
|
use harmony_fleet_deploy::operator::RELEASE_NAME as OPERATOR_RELEASE_NAME;
|
|
use harmony_fleet_e2e::{StackOptions, shared_stack};
|
|
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key};
|
|
use k8s_openapi::api::apps::v1::Deployment as K8sDeployment;
|
|
use k8s_openapi::api::core::v1::Pod;
|
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
|
use kube::Client;
|
|
use kube::api::{Api, DeleteParams, ListParams, ObjectMeta, PostParams};
|
|
use std::sync::Arc;
|
|
use std::time::{Duration, Instant};
|
|
|
|
/// Name of the environment variable that opts this test binary into the
/// end-to-end run.
const E2E_ENV: &str = "HARMONY_FLEET_E2E";

/// Name of the `Device` CR the test seeds.
const DEVICE_NAME: &str = "restart-device";
/// Name of the fleet `Deployment` CR the test seeds.
const DEPLOYMENT_NAME: &str = "restart-deployment";
/// Time the restarted operator is given to reproduce the pre-kill snapshot.
const CONVERGE_BUDGET: Duration = Duration::from_secs(30);

/// Reports whether the e2e gate is open.
///
/// The gate is open only when [`E2E_ENV`] is set to exactly `"1"` or
/// `"true"`. An unset variable, a non-Unicode value, or any other string
/// keeps it closed.
fn e2e_enabled() -> bool {
    std::env::var(E2E_ENV).is_ok_and(|value| value == "1" || value == "true")
}
|
|
|
|
/// Point-in-time view of the operator-owned state this test cares about.
///
/// One value is captured before the operator Pod is killed; further
/// values are captured after the restart. The derived `PartialEq` is the
/// convergence oracle: the test passes once a post-restart value compares
/// equal to the pre-kill one, so both fields must match exactly.
///
/// Only the aggregate counters and the single `desired-state` entry for
/// the seeded (device, deployment) pair are captured. Drift or loss of
/// either shows up as an inequality; other keys in the bucket are not
/// inspected.
#[derive(Debug, Clone, PartialEq, Eq)]
struct OperatorState {
    /// `(matched, succeeded, failed, pending)` taken from
    /// `Deployment.status.aggregate`.
    ///
    /// The full `DeploymentAggregate` is deliberately not compared: its
    /// `last_error.at` field is an RFC 3339 timestamp that the rebuilt
    /// operator may rewrite even when nothing material changed. The
    /// counters are the load-bearing part of the assertion.
    aggregate: (u32, u32, u32, u32),
    /// Raw `desired-state` KV value for the (device, deployment) pair
    /// created by this test; `None` when the key does not exist.
    desired_state_bytes: Option<Vec<u8>>,
}
|
|
|
|
// Behavioral assertion: cold-restart preserves the aggregate.
|
|
//
|
|
// The test's contract is "operator pod can die at any moment and the
|
|
// next pod converges back to byte-identical state within 30 s." If
|
|
// this fails, scenario #1 in the recovery doc has regressed; check
|
|
// `seed_owned_targets` and the kube CR watcher init paths first.
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn cold_restart_preserves_deployment_aggregate() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip_e2e();
|
|
return Ok(());
|
|
}
|
|
|
|
let stack = operator_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client.clone(), &stack.namespace);
|
|
|
|
// Steady-state setup: one device, one deployment that matches it.
|
|
// We deliberately use an empty selector — every Device matches
|
|
// every Deployment — because the assertion we care about is the
|
|
// matched-count survival, not selector semantics (covered by
|
|
// `operator.rs`).
|
|
create_device(&devices, DEVICE_NAME).await?;
|
|
create_fleet_deployment(&deployments, DEPLOYMENT_NAME).await?;
|
|
|
|
// Wait for the operator to have done one full reconcile cycle
|
|
// before we kill it. Without this the pre-kill snapshot can be
|
|
// empty, making the post-restart equality vacuous.
|
|
wait_for_first_patch(&deployments, DEPLOYMENT_NAME).await?;
|
|
let pre_kill = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
|
|
eprintln!("[operator_restart] pre-kill snapshot: {pre_kill:?}");
|
|
|
|
// Force a cold restart. Deleting the underlying Pod is more
|
|
// realistic than `delete deployment`: it mirrors what k8s does on
|
|
// a node drain or OOM kill — the Deployment controller spins up
|
|
// a fresh Pod with no state carry-over.
|
|
delete_operator_pod(&client, &stack.namespace).await?;
|
|
wait_for_operator_ready(&client, &stack.namespace).await?;
|
|
|
|
// Convergence wait. The aggregator rebuilds asynchronously; we
|
|
// re-snapshot until it matches or the budget elapses. The
|
|
// tolerance is binary: aggregate counts + KV bytes must match
|
|
// exactly. A relaxed assertion would mask the very regressions
|
|
// this test exists to catch.
|
|
let deadline = Instant::now() + CONVERGE_BUDGET;
|
|
let mut post_restart;
|
|
loop {
|
|
post_restart = snapshot_state(&stack, &deployments, DEVICE_NAME, DEPLOYMENT_NAME).await?;
|
|
if post_restart == pre_kill {
|
|
break;
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!(
|
|
"operator failed to converge within {:?}; \
|
|
pre-kill={:?}, post-restart={:?}",
|
|
CONVERGE_BUDGET,
|
|
pre_kill,
|
|
post_restart,
|
|
);
|
|
}
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
}
|
|
eprintln!("[operator_restart] converged: {post_restart:?}");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
async fn operator_stack() -> anyhow::Result<Arc<harmony_fleet_e2e::Stack>> {
|
|
let _ = tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
|
|
)
|
|
.try_init();
|
|
|
|
// `deploy_agent: false` — the aggregator's cold-rebuild correctness
|
|
// does not depend on any agent reporting `device-state`. The CR
|
|
// watch path alone seeds `matched_device_count` from a Device CR
|
|
// we create directly. Adding a real agent would slow the test
|
|
// and add a second independent failure surface.
|
|
let stack = shared_stack(StackOptions {
|
|
deploy_agent: false,
|
|
deploy_operator: true,
|
|
..StackOptions::default()
|
|
})
|
|
.await?;
|
|
|
|
stack.print_debug_info();
|
|
Ok(stack)
|
|
}
|
|
|
|
fn skip_e2e() {
|
|
eprintln!(
|
|
"skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \
|
|
requires k3d + podman on PATH)"
|
|
);
|
|
}
|
|
|
|
async fn create_device(devices: &Api<Device>, name: &str) -> anyhow::Result<()> {
|
|
let device = Device::new(name, DeviceSpec { inventory: None });
|
|
devices.create(&PostParams::default(), &device).await?;
|
|
Ok(())
|
|
}
|
|
|
|
async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
|
|
let deployment = Deployment {
|
|
metadata: ObjectMeta {
|
|
name: Some(name.to_string()),
|
|
..Default::default()
|
|
},
|
|
spec: DeploymentSpec {
|
|
target_selector: LabelSelector::default(),
|
|
score: ReconcileScore::PodmanV0(PodmanV0Score {
|
|
services: vec![PodmanService {
|
|
name: "hello".to_string(),
|
|
image: "docker.io/library/hello-world:latest".to_string(),
|
|
ports: vec![],
|
|
env: vec![],
|
|
volumes: vec![],
|
|
restart_policy: Default::default(),
|
|
}],
|
|
}),
|
|
rollout: Rollout {
|
|
strategy: RolloutStrategy::Immediate,
|
|
},
|
|
},
|
|
status: None,
|
|
};
|
|
|
|
deployments
|
|
.create(&PostParams::default(), &deployment)
|
|
.await?;
|
|
Ok(())
|
|
}
|
|
|
|
/// Block until the operator's `patch_tick` has written a non-empty
|
|
/// `aggregate` at least once. The aggregator only patches CRs whose
|
|
/// dirty set picked them up; the first patch lands within one
|
|
/// `PATCH_TICK` (1 s) after the kube watcher emits `Event::InitApply`
|
|
/// for the new Deployment + Device.
|
|
async fn wait_for_first_patch(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
|
|
let deadline = Instant::now() + Duration::from_secs(30);
|
|
loop {
|
|
let cr = deployments.get(name).await?;
|
|
if let Some(status) = cr.status
|
|
&& let Some(agg) = status.aggregate
|
|
&& agg.matched_device_count >= 1
|
|
{
|
|
return Ok(());
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!("timed out waiting for first aggregate patch on {name}");
|
|
}
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
}
|
|
}
|
|
|
|
async fn snapshot_state(
|
|
stack: &harmony_fleet_e2e::Stack,
|
|
deployments: &Api<Deployment>,
|
|
device_id: &str,
|
|
deployment_name: &str,
|
|
) -> anyhow::Result<OperatorState> {
|
|
let cr = deployments.get(deployment_name).await?;
|
|
let aggregate = cr
|
|
.status
|
|
.and_then(|s| s.aggregate)
|
|
.map(|a| (a.matched_device_count, a.succeeded, a.failed, a.pending))
|
|
.unwrap_or((0, 0, 0, 0));
|
|
|
|
let nats_client = connect_admin_nats(stack).await?;
|
|
let js = async_nats::jetstream::new(nats_client);
|
|
let desired_state = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
|
let dn = DeploymentName::try_new(deployment_name)?;
|
|
let key = desired_state_key(device_id, &dn);
|
|
let desired_state_bytes = desired_state.get(&key).await?.map(|b| b.to_vec());
|
|
|
|
Ok(OperatorState {
|
|
aggregate,
|
|
desired_state_bytes,
|
|
})
|
|
}
|
|
|
|
/// Delete every Pod owned by the operator Deployment. The Deployment
|
|
/// controller restarts them; we wait for the replacement Ready in the
|
|
/// next helper.
|
|
///
|
|
/// We avoid `delete deployment` because it would also tear down the
|
|
/// Service + RBAC the harness set up, and the shared stack would lose
|
|
/// those across the test boundary.
|
|
async fn delete_operator_pod(client: &Client, namespace: &str) -> anyhow::Result<()> {
|
|
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
|
|
let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
|
|
let list = pods.list(&ListParams::default().labels(&selector)).await?;
|
|
if list.items.is_empty() {
|
|
anyhow::bail!("no operator pods found in namespace {namespace} with selector {selector}");
|
|
}
|
|
for pod in list.items {
|
|
if let Some(name) = pod.metadata.name {
|
|
pods.delete(&name, &DeleteParams::default()).await?;
|
|
eprintln!("[operator_restart] deleted operator pod {namespace}/{name}");
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn wait_for_operator_ready(client: &Client, namespace: &str) -> anyhow::Result<()> {
|
|
let api: Api<K8sDeployment> = Api::namespaced(client.clone(), namespace);
|
|
let deadline = Instant::now() + Duration::from_secs(120);
|
|
loop {
|
|
// Both `ready_replicas` AND `observed_generation == generation`
|
|
// — a stale `ready_replicas=1` may linger from before the
|
|
// pod-delete propagated to status. Comparing generations
|
|
// pins us on the *post-delete* state.
|
|
let d = api.get(OPERATOR_RELEASE_NAME).await?;
|
|
let observed_ok = d
|
|
.status
|
|
.as_ref()
|
|
.and_then(|s| s.observed_generation)
|
|
.zip(d.metadata.generation)
|
|
.is_some_and(|(observed, current)| observed >= current);
|
|
let pods_ok = d
|
|
.status
|
|
.as_ref()
|
|
.and_then(|s| s.ready_replicas)
|
|
.unwrap_or(0)
|
|
>= 1;
|
|
if observed_ok && pods_ok {
|
|
// Belt-and-braces: confirm at least one new Pod is
|
|
// actually Running, not just `ready_replicas=1` leftover
|
|
// from the stale pod's grace period.
|
|
if running_operator_pod_count(client, namespace).await? >= 1 {
|
|
return Ok(());
|
|
}
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!("operator Deployment did not become Ready within 120s after pod delete");
|
|
}
|
|
tokio::time::sleep(Duration::from_secs(1)).await;
|
|
}
|
|
}
|
|
|
|
async fn running_operator_pod_count(client: &Client, namespace: &str) -> anyhow::Result<usize> {
|
|
let pods: Api<Pod> = Api::namespaced(client.clone(), namespace);
|
|
let selector = format!("app.kubernetes.io/name={OPERATOR_RELEASE_NAME}");
|
|
let list = pods.list(&ListParams::default().labels(&selector)).await?;
|
|
Ok(list
|
|
.items
|
|
.iter()
|
|
.filter(|p| {
|
|
p.status
|
|
.as_ref()
|
|
.and_then(|s| s.phase.as_deref())
|
|
.map(|phase| phase == "Running")
|
|
.unwrap_or(false)
|
|
})
|
|
.count())
|
|
}
|
|
|
|
async fn connect_admin_nats(
|
|
stack: &harmony_fleet_e2e::Stack,
|
|
) -> anyhow::Result<async_nats::Client> {
|
|
async_nats::ConnectOptions::new()
|
|
.user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
|
|
.connect(&stack.nats_url)
|
|
.await
|
|
.map_err(Into::into)
|
|
}
|