Some checks failed
Run Check Script / check (pull_request) Failing after 52s
- Rewrite e2e tests with explicit SETUP/ACTION/ASSERT structure per scenario. Each test header documents devices, deployments, and expected end state.
- Add scenario 4 test: device offline during restart counts as pending (2 devices, 1 deployment, only 1 reports → succeeded=1, pending=1).
- Add `apply_state_rejects_older_timestamp` unit test (previously claimed in docs but missing).
- Fix `split_once` → `rsplit_once` in `parse_state_key` and `seed_owned_targets` (device IDs with dots would silently drop entries).
569 lines
19 KiB
Rust
569 lines
19 KiB
Rust
//! Operator restart + aggregator recovery regression tests (v0.3 Ch2).
//!
//! Each test maps to a scenario in `docs/fleet-operator-recovery-scenarios.md`.
//! Structure per test:
//!
//!   1. SETUP  — create devices + deployments, start first aggregator
//!   2. ACTION — kill/restart/delete (the failure being tested)
//!   3. ASSERT — verify convergence + expected end state
//!
//! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH).
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use async_nats::jetstream;
|
|
use chrono::Utc;
|
|
use harmony::modules::fleet::operator::crd::{
|
|
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
|
|
Rollout, RolloutStrategy,
|
|
};
|
|
use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack};
|
|
use harmony_fleet_operator::fleet_aggregator;
|
|
use harmony_fleet_operator::liveness::OperatorLiveness;
|
|
use harmony_reconciler_contracts::{
|
|
BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key,
|
|
};
|
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
|
use kube::Client;
|
|
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
|
|
use tokio::task::JoinHandle;
|
|
|
|
/// Name of the environment variable that gates every test in this file.
const E2E_ENV: &str = "HARMONY_FLEET_E2E";

/// Label key shared by the device and deployment fixtures, so that a
/// deployment's selector matches devices created with the same group value.
const LABEL_KEY: &str = "recovery-grp";
|
|
|
|
fn e2e_enabled() -> bool {
|
|
matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
|
|
}
|
|
|
|
fn skip() {
|
|
eprintln!("skipping {E2E_ENV}-gated test (set {E2E_ENV}=1; needs k3d + podman)");
|
|
}
|
|
|
|
// ── Fixtures ────────────────────────────────────────────────────────────
|
|
|
|
async fn recovery_stack() -> anyhow::Result<std::sync::Arc<harmony_fleet_e2e::Stack>> {
|
|
let _ = tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
|
|
)
|
|
.try_init();
|
|
|
|
let stack = shared_stack(StackOptions {
|
|
deploy_agent: false,
|
|
deploy_operator: false,
|
|
..StackOptions::default()
|
|
})
|
|
.await?;
|
|
|
|
harmony_fleet_deploy::operator::install_crds().await?;
|
|
wait_crds_ready().await?;
|
|
Ok(stack)
|
|
}
|
|
|
|
async fn wait_crds_ready() -> anyhow::Result<()> {
|
|
let client = Client::try_default().await?;
|
|
let deployments: Api<Deployment> = Api::all(client.clone());
|
|
let devices: Api<Device> = Api::all(client);
|
|
let deadline = Instant::now() + Duration::from_secs(30);
|
|
loop {
|
|
let ok = deployments.list(&Default::default()).await.is_ok()
|
|
&& devices.list(&Default::default()).await.is_ok();
|
|
if ok {
|
|
return Ok(());
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!("CRDs not Established within 30s");
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
}
|
|
}
|
|
|
|
/// Opens a NATS connection to the stack's server using the stack's admin
/// user and password.
///
/// # Errors
/// Returns the connection error if the server cannot be reached or rejects
/// the credentials.
async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result<async_nats::Client> {
    let options = async_nats::ConnectOptions::new()
        .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone());
    let connection = options.connect(&stack.nats_url).await?;
    Ok(connection)
}
|
|
|
|
async fn spawn_aggregator(
|
|
stack: &harmony_fleet_e2e::Stack,
|
|
) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> {
|
|
let kube = Client::try_default().await?;
|
|
let js = jetstream::new(admin_nats(stack).await?);
|
|
let liveness = OperatorLiveness::new();
|
|
let handle = {
|
|
let liveness = liveness.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = fleet_aggregator::run(kube, js, liveness).await {
|
|
tracing::warn!(error = %e, "test aggregator exited");
|
|
}
|
|
})
|
|
};
|
|
Ok((handle, liveness))
|
|
}
|
|
|
|
async fn kill(handle: JoinHandle<()>) {
|
|
handle.abort();
|
|
let _ = handle.await;
|
|
}
|
|
|
|
fn podman_score() -> PodmanV0Score {
|
|
PodmanV0Score {
|
|
services: vec![PodmanService {
|
|
name: "hello".to_string(),
|
|
image: "docker.io/library/hello-world:latest".to_string(),
|
|
ports: vec![],
|
|
env: vec![],
|
|
volumes: vec![],
|
|
restart_policy: Default::default(),
|
|
}],
|
|
}
|
|
}
|
|
|
|
fn device_with_label(name: &str, group: &str) -> Device {
|
|
let mut labels = BTreeMap::new();
|
|
labels.insert(LABEL_KEY.to_string(), group.to_string());
|
|
Device {
|
|
metadata: ObjectMeta {
|
|
name: Some(name.to_string()),
|
|
labels: Some(labels),
|
|
..Default::default()
|
|
},
|
|
spec: DeviceSpec { inventory: None },
|
|
status: None,
|
|
}
|
|
}
|
|
|
|
fn deployment_matching(name: &str, group: &str) -> Deployment {
|
|
let mut ml = BTreeMap::new();
|
|
ml.insert(LABEL_KEY.to_string(), group.to_string());
|
|
Deployment {
|
|
metadata: ObjectMeta {
|
|
name: Some(name.to_string()),
|
|
..Default::default()
|
|
},
|
|
spec: DeploymentSpec {
|
|
target_selector: LabelSelector {
|
|
match_labels: Some(ml),
|
|
match_expressions: None,
|
|
},
|
|
score: ReconcileScore::PodmanV0(podman_score()),
|
|
rollout: Rollout {
|
|
strategy: RolloutStrategy::Immediate,
|
|
},
|
|
},
|
|
status: None,
|
|
}
|
|
}
|
|
|
|
async fn wait_until<F, Fut>(budget: Duration, mut f: F) -> anyhow::Result<()>
|
|
where
|
|
F: FnMut() -> Fut,
|
|
Fut: std::future::Future<Output = bool>,
|
|
{
|
|
let deadline = Instant::now() + budget;
|
|
loop {
|
|
if f().await {
|
|
return Ok(());
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!("condition not met within {budget:?}");
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
}
|
|
}
|
|
|
|
async fn desired_state_present(
|
|
stack: &harmony_fleet_e2e::Stack,
|
|
device: &str,
|
|
deployment: &str,
|
|
) -> anyhow::Result<bool> {
|
|
let js = jetstream::new(admin_nats(stack).await?);
|
|
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
|
let key = desired_state_key(device, &DeploymentName::try_new(deployment)?);
|
|
Ok(bucket.get(&key).await?.is_some())
|
|
}
|
|
|
|
async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) {
|
|
if let Ok(client) = Client::try_default().await {
|
|
let dev_api: Api<Device> = Api::all(client.clone());
|
|
let dep_api: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
for d in devices {
|
|
let _ = dev_api.delete(d, &DeleteParams::default()).await;
|
|
}
|
|
for d in deployments {
|
|
let _ = dep_api.delete(d, &DeleteParams::default()).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── Scenario 1: cold restart, healthy fleet ─────────────────────────────
|
|
//
|
|
// SETUP: 1 device (recov1-dev, group=g1)
|
|
// 1 deployment (recov1-depl, selector: g1)
|
|
// aggregator converges, agent reports Running
|
|
//
|
|
// ACTION: kill aggregator, start a new one
|
|
//
|
|
// ASSERT: desired-state survives restart (idempotent re-write)
|
|
// health counts rebuilt: succeeded=1, matched=1
|
|
// liveness → Converged
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1");
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
|
|
// SETUP: create device + deployment, run first aggregator to convergence
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
let (h1, l1) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l1.is_converged()
|
|
&& desired_state_present(&stack, dev, depl)
|
|
.await
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
|
|
// Simulate agent having reported Running
|
|
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
|
|
let depl_name = DeploymentName::try_new(depl)?;
|
|
kv.put_device_state(&DeploymentState {
|
|
device_id: Id::from(dev),
|
|
deployment: depl_name.clone(),
|
|
phase: Phase::Running,
|
|
last_event_at: Utc::now(),
|
|
last_error: None,
|
|
})
|
|
.await?;
|
|
|
|
// ACTION: kill and restart aggregator
|
|
kill(h1).await;
|
|
let (h2, l2) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
|
|
|
|
// ASSERT: desired-state survives, health counts rebuilt from KV
|
|
assert!(
|
|
desired_state_present(&stack, dev, depl).await?,
|
|
"desired-state must persist across restart"
|
|
);
|
|
wait_until(Duration::from_secs(30), || async {
|
|
deployments
|
|
.get(depl)
|
|
.await
|
|
.ok()
|
|
.and_then(|cr| cr.status?.aggregate)
|
|
.map(|a| a.succeeded == 1 && a.matched_device_count == 1)
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
|
|
kill(h2).await;
|
|
cleanup(&stack, &[dev], &[depl]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 2: CR deleted while operator down ──────────────────────────
|
|
//
|
|
// SETUP: 1 device (recov2-dev, group=g2)
|
|
// 1 deployment (recov2-depl, selector: g2)
|
|
// aggregator converges → desired-state written
|
|
//
|
|
// ACTION: kill aggregator
|
|
// force-delete the deployment CR (bypasses finalizer)
|
|
// start new aggregator
|
|
//
|
|
// ASSERT: orphan desired-state is GC'd at convergence
|
|
// (deployment no longer exists → no targets → entries purged)
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2");
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
|
|
// SETUP
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
let (h1, l1) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l1.is_converged()
|
|
&& desired_state_present(&stack, dev, depl)
|
|
.await
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
kill(h1).await;
|
|
|
|
// ACTION: delete CR while operator is down, then restart
|
|
deployments.delete(depl, &DeleteParams::default()).await?;
|
|
assert!(
|
|
desired_state_present(&stack, dev, depl).await?,
|
|
"orphan desired-state should still be present before recovery"
|
|
);
|
|
|
|
let (h2, _l2) = spawn_aggregator(&stack).await?;
|
|
|
|
// ASSERT: orphan is GC'd
|
|
wait_until(Duration::from_secs(30), || async {
|
|
!desired_state_present(&stack, dev, depl)
|
|
.await
|
|
.unwrap_or(true)
|
|
})
|
|
.await?;
|
|
|
|
kill(h2).await;
|
|
cleanup(&stack, &[dev], &[]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 3: two operators racing ────────────────────────────────────
|
|
//
|
|
// SETUP: 1 device (recov3-dev, group=g3)
|
|
// 1 deployment (recov3-depl, selector: g3)
|
|
//
|
|
// ACTION: start TWO aggregators simultaneously (rolling-deploy overlap)
|
|
//
|
|
// ASSERT: both converge
|
|
// desired-state KV value is byte-identical to the serialized score
|
|
// (idempotent multi-writer, no leader election needed)
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3");
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
|
|
// SETUP
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
// ACTION: two operators at once
|
|
let (h_a, l_a) = spawn_aggregator(&stack).await?;
|
|
let (h_b, l_b) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l_a.is_converged() && l_b.is_converged()
|
|
})
|
|
.await?;
|
|
|
|
// ASSERT: KV value is deterministic regardless of writer
|
|
let js = jetstream::new(admin_nats(&stack).await?);
|
|
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
|
let key = desired_state_key(dev, &DeploymentName::try_new(depl)?);
|
|
let value = bucket.get(&key).await?.expect("desired-state present");
|
|
let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?;
|
|
assert_eq!(
|
|
value.to_vec(),
|
|
expected,
|
|
"racing writers must agree byte-for-byte"
|
|
);
|
|
|
|
kill(h_a).await;
|
|
kill(h_b).await;
|
|
cleanup(&stack, &[dev], &[depl]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 4: device offline during restart ───────────────────────────
|
|
//
|
|
// SETUP: 2 devices (recov4-dev-a reporting, recov4-dev-b silent)
|
|
// 1 deployment (recov4-depl, selector: g4, matches both)
|
|
// aggregator converges
|
|
// dev-a reports Running; dev-b never reports
|
|
//
|
|
// ACTION: kill aggregator, start a new one
|
|
//
|
|
// ASSERT: dev-a → succeeded (rebuilt from device-state KV)
|
|
// dev-b → pending (no state entry = pending)
|
|
// matched_device_count = 2
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn device_offline_during_restart_counts_as_pending() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev_a, dev_b, depl, grp) = ("recov4-dev-a", "recov4-dev-b", "recov4-depl", "g4");
|
|
for d in [dev_a, dev_b] {
|
|
let _ = devices.delete(d, &DeleteParams::default()).await;
|
|
}
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
|
|
// SETUP: 2 devices, 1 deployment matching both
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev_a, grp))
|
|
.await?;
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev_b, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
let (h1, l1) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l1.is_converged()
|
|
&& desired_state_present(&stack, dev_a, depl)
|
|
.await
|
|
.unwrap_or(false)
|
|
&& desired_state_present(&stack, dev_b, depl)
|
|
.await
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
|
|
// Only dev-a reports Running; dev-b stays silent
|
|
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
|
|
let depl_name = DeploymentName::try_new(depl)?;
|
|
kv.put_device_state(&DeploymentState {
|
|
device_id: Id::from(dev_a),
|
|
deployment: depl_name.clone(),
|
|
phase: Phase::Running,
|
|
last_event_at: Utc::now(),
|
|
last_error: None,
|
|
})
|
|
.await?;
|
|
|
|
// ACTION: kill and restart aggregator
|
|
kill(h1).await;
|
|
let (h2, l2) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
|
|
|
|
// ASSERT: dev-a = succeeded, dev-b = pending, matched = 2
|
|
wait_until(Duration::from_secs(30), || async {
|
|
deployments
|
|
.get(depl)
|
|
.await
|
|
.ok()
|
|
.and_then(|cr| cr.status?.aggregate)
|
|
.map(|a| {
|
|
a.matched_device_count == 2 && a.succeeded == 1 && a.pending == 1
|
|
})
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
|
|
kill(h2).await;
|
|
cleanup(&stack, &[dev_a, dev_b], &[depl]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 5: chaos kill under write load ─────────────────────────────
|
|
//
|
|
// SETUP: 1 device (recov5-dev, group=g5)
|
|
// 5 deployments created incrementally (recov5-depl-0..4)
|
|
//
|
|
// ACTION: create deployments one by one
|
|
// kill + restart aggregator mid-stream (after depl-2)
|
|
// kill again after all created
|
|
// start final aggregator
|
|
//
|
|
// ASSERT: final aggregator converges all 5 desired-state entries within 30s
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, grp) = ("recov5-dev", "g5");
|
|
let names: Vec<String> = (0..5).map(|i| format!("recov5-depl-{i}")).collect();
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
for n in &names {
|
|
let _ = deployments.delete(n, &DeleteParams::default()).await;
|
|
}
|
|
|
|
// SETUP: 1 device
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
|
|
// ACTION: create deployments while killing/restarting the operator
|
|
let (mut handle, _) = spawn_aggregator(&stack).await?;
|
|
for (i, n) in names.iter().enumerate() {
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(n, grp))
|
|
.await?;
|
|
if i == 2 {
|
|
kill(handle).await;
|
|
(handle, _) = spawn_aggregator(&stack).await?;
|
|
}
|
|
}
|
|
kill(handle).await;
|
|
|
|
// Final aggregator must converge all 5
|
|
let (h_final, l_final) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?;
|
|
|
|
// ASSERT: all 5 desired-state entries present
|
|
wait_until(Duration::from_secs(30), || async {
|
|
for n in &names {
|
|
if !desired_state_present(&stack, dev, n).await.unwrap_or(false) {
|
|
return false;
|
|
}
|
|
}
|
|
true
|
|
})
|
|
.await?;
|
|
|
|
kill(h_final).await;
|
|
let dep_refs: Vec<&str> = names.iter().map(String::as_str).collect();
|
|
cleanup(&stack, &[dev], &dep_refs).await;
|
|
Ok(())
|
|
}
|