Files
harmony/fleet/harmony-fleet-e2e/tests/operator_recovery.rs
Jean-Gabriel Gill-Couture 086d905586
Some checks failed
Run Check Script / check (pull_request) Failing after 52s
fix(fleet): clarify recovery tests + add missing scenario 4 + dedup test
- Rewrite e2e tests with explicit SETUP/ACTION/ASSERT structure per
  scenario. Each test header documents devices, deployments, and
  expected end state.
- Add scenario 4 test: device offline during restart counts as pending
  (2 devices, 1 deployment, only 1 reports → succeeded=1, pending=1).
- Add apply_state_rejects_older_timestamp unit test (previously claimed
  in docs but missing).
- Fix split_once → rsplit_once in parse_state_key and seed_owned_targets
  (device IDs with dots would silently drop entries).
2026-06-09 16:49:52 -04:00

569 lines
19 KiB
Rust

//! Operator restart + aggregator recovery regression tests (v0.3 Ch2).
//!
//! Each test maps to a scenario in `docs/fleet-operator-recovery-scenarios.md`.
//! Structure per test:
//!
//! 1. SETUP — create devices + deployments, start first aggregator
//! 2. ACTION — kill/restart/delete (the failure being tested)
//! 3. ASSERT — verify convergence + expected end state
//!
//! Gated behind `HARMONY_FLEET_E2E=1` (or `=true`); needs k3d + podman on `PATH`.
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use async_nats::jetstream;
use chrono::Utc;
use harmony::modules::fleet::operator::crd::{
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
Rollout, RolloutStrategy,
};
use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack};
use harmony_fleet_operator::fleet_aggregator;
use harmony_fleet_operator::liveness::OperatorLiveness;
use harmony_reconciler_contracts::{
BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
use tokio::task::JoinHandle;
/// Environment variable that opts in to these e2e tests; `e2e_enabled` accepts `1` or `true`.
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
/// Label key shared by the test `Device` labels and the Deployments' `match_labels` selector.
const LABEL_KEY: &str = "recovery-grp";
/// Returns `true` only when the gate variable is exactly `1` or `true` (case-sensitive).
///
/// An unset or non-UTF-8 value counts as disabled.
fn e2e_enabled() -> bool {
    match std::env::var(E2E_ENV) {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
/// Tells the developer on stderr why an e2e test returned early and how to enable it.
fn skip() {
    eprintln!(
        "skipping {0}-gated test (set {0}=1; needs k3d + podman)",
        E2E_ENV
    );
}
// ── Fixtures ────────────────────────────────────────────────────────────
/// Obtains the stack via `shared_stack` with neither the agent nor the operator deployed.
/// It then installs the fleet CRDs and waits until their APIs answer list calls.
///
/// The tests run `fleet_aggregator::run` in-process instead of the deployed operator, so they
/// can kill and restart it at will.
async fn recovery_stack() -> anyhow::Result<std::sync::Arc<harmony_fleet_e2e::Stack>> {
    // Filter from `RUST_LOG` when set, otherwise `warn`. `try_init` tolerates a subscriber
    // that is already installed, so its error is ignored.
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn"));
    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();

    let options = StackOptions {
        deploy_agent: false,
        deploy_operator: false,
        ..StackOptions::default()
    };
    let stack = shared_stack(options).await?;

    harmony_fleet_deploy::operator::install_crds().await?;
    wait_crds_ready().await?;
    Ok(stack)
}
/// Polls every 500ms until both the `Deployment` and `Device` APIs accept a cluster-wide
/// list. Gives up after 30s.
///
/// Any list error counts as "not ready yet". The `Device` list is only attempted once the
/// `Deployment` list succeeds.
async fn wait_crds_ready() -> anyhow::Result<()> {
    let kube = Client::try_default().await?;
    let deployment_api: Api<Deployment> = Api::all(kube.clone());
    let device_api: Api<Device> = Api::all(kube);
    let give_up_at = Instant::now() + Duration::from_secs(30);
    while !(deployment_api.list(&Default::default()).await.is_ok()
        && device_api.list(&Default::default()).await.is_ok())
    {
        if Instant::now() >= give_up_at {
            anyhow::bail!("CRDs not Established within 30s");
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }
    Ok(())
}
/// Opens a new NATS connection to `stack.nats_url` using the stack's admin credentials.
async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result<async_nats::Client> {
    let options = async_nats::ConnectOptions::new()
        .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone());
    let client = options.connect(&stack.nats_url).await?;
    Ok(client)
}
/// Starts an in-process aggregator on a fresh kube client and a fresh admin JetStream context.
///
/// Returns the task handle (stop it with `kill`) and the liveness handle that tests poll for
/// convergence. If `fleet_aggregator::run` returns an error, it is logged at `warn` and not
/// propagated.
async fn spawn_aggregator(
    stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> {
    let kube = Client::try_default().await?;
    let nats = admin_nats(stack).await?;
    let js = jetstream::new(nats);
    let liveness = OperatorLiveness::new();
    let task_liveness = liveness.clone();
    let handle = tokio::spawn(async move {
        let outcome = fleet_aggregator::run(kube, js, task_liveness).await;
        if let Err(e) = outcome {
            tracing::warn!(error = %e, "test aggregator exited");
        }
    });
    Ok((handle, liveness))
}
async fn kill(handle: JoinHandle<()>) {
handle.abort();
let _ = handle.await;
}
/// Score used by every test deployment: a single `hello` service running the hello-world
/// image, with no ports, env, or volumes and the default restart policy.
fn podman_score() -> PodmanV0Score {
    let hello = PodmanService {
        name: String::from("hello"),
        image: String::from("docker.io/library/hello-world:latest"),
        ports: Vec::new(),
        env: Vec::new(),
        volumes: Vec::new(),
        restart_policy: Default::default(),
    };
    PodmanV0Score {
        services: vec![hello],
    }
}
/// Builds a `Device` CR named `name` and labelled `LABEL_KEY=group`, so that
/// `deployment_matching(_, group)` selects it. It has no inventory and no status.
fn device_with_label(name: &str, group: &str) -> Device {
    let labels = BTreeMap::from([(LABEL_KEY.to_owned(), group.to_owned())]);
    let metadata = ObjectMeta {
        name: Some(name.to_owned()),
        labels: Some(labels),
        ..Default::default()
    };
    Device {
        metadata,
        spec: DeviceSpec { inventory: None },
        status: None,
    }
}
/// Builds a `Deployment` CR named `name` whose target selector matches devices labelled
/// `LABEL_KEY=group`. It runs `podman_score()` with an immediate rollout.
///
/// No namespace is set here; the `Api` used to create it determines the namespace.
fn deployment_matching(name: &str, group: &str) -> Deployment {
    let selector = LabelSelector {
        match_labels: Some(BTreeMap::from([(LABEL_KEY.to_owned(), group.to_owned())])),
        match_expressions: None,
    };
    let spec = DeploymentSpec {
        target_selector: selector,
        score: ReconcileScore::PodmanV0(podman_score()),
        rollout: Rollout {
            strategy: RolloutStrategy::Immediate,
        },
    };
    Deployment {
        metadata: ObjectMeta {
            name: Some(name.to_owned()),
            ..Default::default()
        },
        spec,
        status: None,
    }
}
/// Re-evaluates `f` every 300ms until it yields `true`, failing once `budget` has elapsed.
///
/// The condition is checked before each deadline test. It therefore always runs at least
/// once, and runs once more after the sleep in which the deadline passes.
async fn wait_until<F, Fut>(budget: Duration, mut f: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let poll_interval = Duration::from_millis(300);
    let give_up_at = Instant::now() + budget;
    while !f().await {
        if Instant::now() >= give_up_at {
            anyhow::bail!("condition not met within {budget:?}");
        }
        tokio::time::sleep(poll_interval).await;
    }
    Ok(())
}
/// Reports whether the desired-state KV bucket holds an entry for (`device`, `deployment`).
///
/// Each call opens a fresh admin NATS connection. Failures are returned as `Err` rather than
/// mapped to `false`: connecting, opening the bucket, an invalid deployment name, or the read
/// itself. Callers decide how to treat them.
async fn desired_state_present(
    stack: &harmony_fleet_e2e::Stack,
    device: &str,
    deployment: &str,
) -> anyhow::Result<bool> {
    let jetstream_ctx = jetstream::new(admin_nats(stack).await?);
    let store = jetstream_ctx.get_key_value(BUCKET_DESIRED_STATE).await?;
    let kv_key = desired_state_key(device, &DeploymentName::try_new(deployment)?);
    let found = store.get(&kv_key).await?;
    Ok(found.is_some())
}
async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) {
if let Ok(client) = Client::try_default().await {
let dev_api: Api<Device> = Api::all(client.clone());
let dep_api: Api<Deployment> = Api::namespaced(client, &stack.namespace);
for d in devices {
let _ = dev_api.delete(d, &DeleteParams::default()).await;
}
for d in deployments {
let _ = dep_api.delete(d, &DeleteParams::default()).await;
}
}
}
// ── Scenario 1: cold restart, healthy fleet ─────────────────────────────
//
// SETUP:  1 device (recov1-dev, group=g1)
//         1 deployment (recov1-depl, selector: g1)
//         aggregator converges; a Running device-state is then written to KV
//         on the agent's behalf (no real agent is deployed in this stack)
//
// ACTION: kill aggregator, start a new one
//
// ASSERT: desired-state survives restart (idempotent re-write)
//         health counts rebuilt: succeeded=1, matched=1
//         liveness → Converged
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip();
        return Ok(());
    }
    let stack = recovery_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
    let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1");
    // Best-effort removal of leftovers from an earlier (possibly failed) run;
    // any error, e.g. the object not existing, is ignored.
    let _ = devices.delete(dev, &DeleteParams::default()).await;
    let _ = deployments.delete(depl, &DeleteParams::default()).await;
    // SETUP: create device + deployment, run first aggregator to convergence
    devices
        .create(&PostParams::default(), &device_with_label(dev, grp))
        .await?;
    deployments
        .create(&PostParams::default(), &deployment_matching(depl, grp))
        .await?;
    let (h1, l1) = spawn_aggregator(&stack).await?;
    // Wait for liveness AND the (dev, depl) desired-state entry; a KV read
    // error counts as "not yet".
    wait_until(Duration::from_secs(30), || async {
        l1.is_converged()
            && desired_state_present(&stack, dev, depl)
                .await
                .unwrap_or(false)
    })
    .await?;
    // Simulate agent having reported Running: the device-state entry is written
    // directly with admin credentials, since no agent runs in this stack.
    let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
    let depl_name = DeploymentName::try_new(depl)?;
    kv.put_device_state(&DeploymentState {
        device_id: Id::from(dev),
        deployment: depl_name.clone(),
        phase: Phase::Running,
        last_event_at: Utc::now(),
        last_error: None,
    })
    .await?;
    // ACTION: kill and restart aggregator
    kill(h1).await;
    let (h2, l2) = spawn_aggregator(&stack).await?;
    wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
    // ASSERT: desired-state survives, health counts rebuilt from KV
    assert!(
        desired_state_present(&stack, dev, depl).await?,
        "desired-state must persist across restart"
    );
    // Poll the CR's aggregate status rather than reading it once, since it may
    // be written after liveness reports converged.
    wait_until(Duration::from_secs(30), || async {
        deployments
            .get(depl)
            .await
            .ok()
            .and_then(|cr| cr.status?.aggregate)
            .map(|a| a.succeeded == 1 && a.matched_device_count == 1)
            .unwrap_or(false)
    })
    .await?;
    kill(h2).await;
    // Reached only when every step above succeeded; the pre-delete at the top
    // covers leftovers from failed runs.
    cleanup(&stack, &[dev], &[depl]).await;
    Ok(())
}
// ── Scenario 2: CR deleted while operator down ──────────────────────────
//
// SETUP:  1 device (recov2-dev, group=g2)
//         1 deployment (recov2-depl, selector: g2)
//         aggregator converges → desired-state written
//
// ACTION: kill aggregator
//         delete the deployment CR while no aggregator is running
//         (plain `DeleteParams::default()`: no grace-period override and no
//         finalizer stripping)
//         start new aggregator
//
// ASSERT: orphan desired-state is GC'd after the restart
//         (deployment no longer exists → no targets → entries purged).
//         Only the KV entry's disappearance is polled; the new aggregator's
//         liveness (`_l2`) is not checked.
//
// NOTE(review): this scenario was described as a force-delete that bypasses a
// finalizer, but default DeleteParams do neither. If Deployment CRs carry a
// finalizer in this setup, the CR would stay Terminating rather than vanish.
// Confirm against docs/fleet-operator-recovery-scenarios.md.
// NOTE(review): libtest runs tests in parallel by default, and every scenario
// in this file spawns its own aggregator against the shared stack. If another
// test's aggregator reacts to this deletion, the "still present before
// recovery" assertion below could race. Consider `--test-threads=1` or a
// shared lock.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip();
        return Ok(());
    }
    let stack = recovery_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
    let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2");
    // Best-effort removal of leftovers from an earlier run; errors are ignored.
    let _ = devices.delete(dev, &DeleteParams::default()).await;
    let _ = deployments.delete(depl, &DeleteParams::default()).await;
    // SETUP
    devices
        .create(&PostParams::default(), &device_with_label(dev, grp))
        .await?;
    deployments
        .create(&PostParams::default(), &deployment_matching(depl, grp))
        .await?;
    let (h1, l1) = spawn_aggregator(&stack).await?;
    wait_until(Duration::from_secs(30), || async {
        l1.is_converged()
            && desired_state_present(&stack, dev, depl)
                .await
                .unwrap_or(false)
    })
    .await?;
    kill(h1).await;
    // ACTION: delete CR while operator is down, then restart
    deployments.delete(depl, &DeleteParams::default()).await?;
    // Sanity check: with no aggregator running, nothing has removed the entry yet.
    assert!(
        desired_state_present(&stack, dev, depl).await?,
        "orphan desired-state should still be present before recovery"
    );
    let (h2, _l2) = spawn_aggregator(&stack).await?;
    // ASSERT: orphan is GC'd. A KV read error maps to `true` ("still present"),
    // so errors keep the loop waiting instead of passing the test.
    wait_until(Duration::from_secs(30), || async {
        !desired_state_present(&stack, dev, depl)
            .await
            .unwrap_or(true)
    })
    .await?;
    kill(h2).await;
    // The deployment CR was already deleted above; only the device remains.
    cleanup(&stack, &[dev], &[]).await;
    Ok(())
}
// ── Scenario 3: two operators racing ────────────────────────────────────
//
// SETUP: 1 device (recov3-dev, group=g3)
// 1 deployment (recov3-depl, selector: g3)
//
// ACTION: start TWO aggregators simultaneously (rolling-deploy overlap)
//
// ASSERT: both converge
// desired-state KV value is byte-identical to the serialized score
// (idempotent multi-writer, no leader election needed)
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3");
let _ = devices.delete(dev, &DeleteParams::default()).await;
let _ = deployments.delete(depl, &DeleteParams::default()).await;
// SETUP
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
// ACTION: two operators at once
let (h_a, l_a) = spawn_aggregator(&stack).await?;
let (h_b, l_b) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l_a.is_converged() && l_b.is_converged()
})
.await?;
// ASSERT: KV value is deterministic regardless of writer
let js = jetstream::new(admin_nats(&stack).await?);
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
let key = desired_state_key(dev, &DeploymentName::try_new(depl)?);
let value = bucket.get(&key).await?.expect("desired-state present");
let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?;
assert_eq!(
value.to_vec(),
expected,
"racing writers must agree byte-for-byte"
);
kill(h_a).await;
kill(h_b).await;
cleanup(&stack, &[dev], &[depl]).await;
Ok(())
}
// ── Scenario 4: device offline during restart ───────────────────────────
//
// SETUP:  2 devices (recov4-dev-a reporting, recov4-dev-b silent)
//         1 deployment (recov4-depl, selector: g4, matches both)
//         aggregator converges
//         dev-a reports Running; dev-b never reports
//
// ACTION: kill aggregator, start a new one
//
// ASSERT: dev-a → succeeded (rebuilt from device-state KV)
//         dev-b → pending (no state entry = pending)
//         matched_device_count = 2
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn device_offline_during_restart_counts_as_pending() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip();
        return Ok(());
    }
    let stack = recovery_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
    let (dev_a, dev_b, depl, grp) = ("recov4-dev-a", "recov4-dev-b", "recov4-depl", "g4");
    // Best-effort removal of leftovers from an earlier run; errors are ignored.
    for d in [dev_a, dev_b] {
        let _ = devices.delete(d, &DeleteParams::default()).await;
    }
    let _ = deployments.delete(depl, &DeleteParams::default()).await;
    // SETUP: 2 devices, 1 deployment matching both
    devices
        .create(&PostParams::default(), &device_with_label(dev_a, grp))
        .await?;
    devices
        .create(&PostParams::default(), &device_with_label(dev_b, grp))
        .await?;
    deployments
        .create(&PostParams::default(), &deployment_matching(depl, grp))
        .await?;
    let (h1, l1) = spawn_aggregator(&stack).await?;
    // Require a desired-state entry for BOTH devices. This establishes that
    // dev-b is targeted even though it will never report.
    wait_until(Duration::from_secs(30), || async {
        l1.is_converged()
            && desired_state_present(&stack, dev_a, depl)
                .await
                .unwrap_or(false)
            && desired_state_present(&stack, dev_b, depl)
                .await
                .unwrap_or(false)
    })
    .await?;
    // Only dev-a reports Running (written straight to the device-state KV with
    // admin credentials); dev-b stays silent and gets no entry.
    let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
    let depl_name = DeploymentName::try_new(depl)?;
    kv.put_device_state(&DeploymentState {
        device_id: Id::from(dev_a),
        deployment: depl_name.clone(),
        phase: Phase::Running,
        last_event_at: Utc::now(),
        last_error: None,
    })
    .await?;
    // ACTION: kill and restart aggregator
    kill(h1).await;
    let (h2, l2) = spawn_aggregator(&stack).await?;
    wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
    // ASSERT: dev-a = succeeded, dev-b = pending, matched = 2.
    // Polled because the CR's aggregate status may be written after convergence.
    wait_until(Duration::from_secs(30), || async {
        deployments
            .get(depl)
            .await
            .ok()
            .and_then(|cr| cr.status?.aggregate)
            .map(|a| {
                a.matched_device_count == 2 && a.succeeded == 1 && a.pending == 1
            })
            .unwrap_or(false)
    })
    .await?;
    kill(h2).await;
    cleanup(&stack, &[dev_a, dev_b], &[depl]).await;
    Ok(())
}
// ── Scenario 5: chaos kill under write load ─────────────────────────────
//
// SETUP:  1 device (recov5-dev, group=g5)
//         5 deployments created incrementally (recov5-depl-0..4)
//
// ACTION: create deployments one by one
//         kill + restart aggregator mid-stream (right after depl-2 is created)
//         kill again after all created
//         start final aggregator
//
// ASSERT: final aggregator reports converged within 30s, and all 5
//         desired-state entries then appear within a further 30s
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip();
        return Ok(());
    }
    let stack = recovery_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
    let (dev, grp) = ("recov5-dev", "g5");
    let names: Vec<String> = (0..5).map(|i| format!("recov5-depl-{i}")).collect();
    // Best-effort removal of leftovers from an earlier run; errors are ignored.
    let _ = devices.delete(dev, &DeleteParams::default()).await;
    for n in &names {
        let _ = deployments.delete(n, &DeleteParams::default()).await;
    }
    // SETUP: 1 device
    devices
        .create(&PostParams::default(), &device_with_label(dev, grp))
        .await?;
    // ACTION: create deployments while killing/restarting the operator
    let (mut handle, _) = spawn_aggregator(&stack).await?;
    for (i, n) in names.iter().enumerate() {
        deployments
            .create(&PostParams::default(), &deployment_matching(n, grp))
            .await?;
        // After the third deployment: replace the running aggregator. The
        // liveness handles of the intermediate aggregators are never polled.
        if i == 2 {
            kill(handle).await;
            (handle, _) = spawn_aggregator(&stack).await?;
        }
    }
    kill(handle).await;
    // Final aggregator must converge all 5
    let (h_final, l_final) = spawn_aggregator(&stack).await?;
    wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?;
    // ASSERT: all 5 desired-state entries present (a KV read error counts as
    // "not yet present")
    wait_until(Duration::from_secs(30), || async {
        for n in &names {
            if !desired_state_present(&stack, dev, n).await.unwrap_or(false) {
                return false;
            }
        }
        true
    })
    .await?;
    kill(h_final).await;
    let dep_refs: Vec<&str> = names.iter().map(String::as_str).collect();
    cleanup(&stack, &[dev], &dep_refs).await;
    Ok(())
}