Files
harmony/fleet/harmony-fleet-e2e/tests/operator_recovery.rs
Jean-Gabriel Gill-Couture dedfa19380
All checks were successful
Run Check Script / check (pull_request) Successful in 2m36s
feat(fleet): graceful roll-forward upgrade + container-ID identity (Ch5)
Fixes the 30s redeploy loop and adds graceful deployment upgrades — same root:
how the agent identifies a running container vs. its desired spec.

Redeploy-loop fix (container identity by id/name, not spec compare):
- `ensure_service_running` is now liveness-only — a running container is a NOOP,
  no spec comparison (podman ps can't read env/volumes, which made the old
  matches_spec recreate env-bearing services every tick). The periodic tick
  adopts-or-restarts by name and never recreates a healthy container.
- Spec changes are detected by the reconciler's existing byte-compare of the
  desired-state JSON, not by the runtime. The agent records each container's id
  (from start_service) in DeploymentState.container_ids.
- Deleted the matches_spec FIXME and its always-drift hack.

Graceful roll-forward upgrade:
- PodmanV0Score.lifecycle: Option<LifecyclePolicy> (SIGTERM, 30s grace, SIGKILL
  fallback). stop_signal baked into the container at create; grace drives
  `podman stop --time`.
- On a changed score the reconciler stops the exact old container by its
  recorded id (graceful), then starts the new one; dropped services are stopped.
  Roll-forward only — a failure reports Phase::Failed, never reverts.

New ContainerRuntime methods: start_service (→id), container_status, stop_service
(graceful). Reconciler is now generic over `dyn ContainerRuntime`, unit-tested
against a FakeRuntime: tick-idempotency (loop killed), graceful-replace-by-id,
roll-forward-no-revert, unchanged-noop.

Architecture + flagged VM v1->v2->v3 e2e in
ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md.
2026-06-05 15:26:57 -04:00

442 lines
16 KiB
Rust

//! Operator restart + aggregator recovery regression tests (v0.3 Ch2).
//!
//! Each scenario in `docs/fleet-operator-recovery-scenarios.md` has a test
//! here. They drive `fleet_aggregator::run` in-process against the shared
//! NATS + k3d stack, aborting and respawning it to simulate operator
//! restarts. `run` owns its watchers in a `JoinSet`, so a cancelled
//! aggregator leaves no orphan tasks racing the next one.
//!
//! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH).
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use async_nats::jetstream;
use chrono::Utc;
use harmony::modules::fleet::operator::crd::{
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
Rollout, RolloutStrategy,
};
use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack};
use harmony_fleet_operator::fleet_aggregator;
use harmony_fleet_operator::liveness::OperatorLiveness;
use harmony_reconciler_contracts::{
BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
use tokio::task::JoinHandle;
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
const LABEL_KEY: &str = "recovery-grp";
/// Reports whether the e2e gate is switched on.
///
/// Returns `true` only when the `E2E_ENV` environment variable is set to
/// exactly `"1"` or `"true"`; an unset, non-UTF-8, or any other value yields
/// `false`.
fn e2e_enabled() -> bool {
    match std::env::var(E2E_ENV) {
        Ok(value) => value == "1" || value == "true",
        Err(_) => false,
    }
}
/// Prints a one-line notice to stderr saying the test is gated behind
/// `E2E_ENV` and how to enable it. Callers return `Ok(())` right after, so a
/// gated test is reported as passed rather than failed.
fn skip() {
eprintln!("skipping {E2E_ENV}-gated test (set {E2E_ENV}=1; needs k3d + podman)");
}
// ── Fixtures ────────────────────────────────────────────────────────────
/// Brings up the shared stack with neither the in-cluster operator nor the
/// agent deployed, because these tests drive the aggregator lifecycle
/// themselves. The CRDs are installed directly and polled until they can be
/// listed, so the aggregator's watchers have something to read.
async fn recovery_stack() -> anyhow::Result<std::sync::Arc<harmony_fleet_e2e::Stack>> {
    // Honour the environment's log filter when present, otherwise default to `warn`.
    let filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn"));
    // The init result is deliberately discarded: every test calls this
    // fixture, so a failed (repeated) initialisation must not fail the test.
    let _ = tracing_subscriber::fmt().with_env_filter(filter).try_init();

    let options = StackOptions {
        deploy_agent: false,
        deploy_operator: false,
        ..StackOptions::default()
    };
    let stack = shared_stack(options).await?;

    harmony_fleet_deploy::operator::install_crds().await?;
    wait_crds_ready().await?;
    Ok(stack)
}
/// CRD apply doesn't block on `Established`; poll until both the `Deployment`
/// and `Device` kinds can be listed.
///
/// # Errors
///
/// Fails if no kube client can be built, or if either list call is still
/// failing once the 30s budget has elapsed. The timeout error carries the
/// most recent API error so the cause (CRD missing, RBAC, API server
/// unreachable, ...) is visible in the test output instead of being dropped.
async fn wait_crds_ready() -> anyhow::Result<()> {
    const BUDGET: Duration = Duration::from_secs(30);
    const POLL_INTERVAL: Duration = Duration::from_millis(500);

    let client = Client::try_default().await?;
    let deployments: Api<Deployment> = Api::all(client.clone());
    let devices: Api<Device> = Api::all(client);
    let deadline = Instant::now() + BUDGET;
    loop {
        // Deployments are probed first; devices are only probed once the
        // deployment list succeeds. Whichever call fails supplies the error.
        let last_err = match deployments.list(&Default::default()).await {
            Err(e) => e,
            Ok(_) => match devices.list(&Default::default()).await {
                Err(e) => e,
                Ok(_) => return Ok(()),
            },
        };
        if Instant::now() >= deadline {
            anyhow::bail!("CRDs not Established within {BUDGET:?}: {last_err}");
        }
        tokio::time::sleep(POLL_INTERVAL).await;
    }
}
/// Opens a new NATS connection to the stack's `nats_url`, authenticating with
/// the stack's admin user and password. Every call creates a fresh connection.
async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result<async_nats::Client> {
    let options = async_nats::ConnectOptions::new()
        .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone());
    let client = options.connect(&stack.nats_url).await?;
    Ok(client)
}
/// Starts one in-process aggregator on its own tokio task.
///
/// Returns the task handle (abort it to "kill the operator") together with
/// the `OperatorLiveness` that the aggregator latches once it has converged.
/// If `fleet_aggregator::run` returns an error, the task logs it at `warn`
/// and ends.
async fn spawn_aggregator(
    stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> {
    let kube = Client::try_default().await?;
    let nats = admin_nats(stack).await?;
    let js = jetstream::new(nats);

    let liveness = OperatorLiveness::new();
    // The task owns its own clone; the caller keeps `liveness` to observe convergence.
    let task_liveness = liveness.clone();
    let handle = tokio::spawn(async move {
        let outcome = fleet_aggregator::run(kube, js, task_liveness).await;
        if let Err(e) = outcome {
            tracing::warn!(error = %e, "test aggregator exited");
        }
    });

    Ok((handle, liveness))
}
/// Aborts an aggregator task and then awaits its handle, so the task (and the
/// `JoinSet` children it owns) has been torn down before the caller starts
/// the next aggregator.
async fn kill(handle: JoinHandle<()>) {
    handle.abort();
    // Only completion matters here; the join outcome itself is discarded.
    drop(handle.await);
}
/// Builds the fixture score shared by the deployment fixtures: a single
/// `hello` service running the `hello-world` image with no ports, env or
/// volumes, the default restart policy, and no lifecycle policy.
fn podman_score() -> PodmanV0Score {
    let hello = PodmanService {
        name: String::from("hello"),
        image: String::from("docker.io/library/hello-world:latest"),
        ports: Vec::new(),
        env: Vec::new(),
        volumes: Vec::new(),
        restart_policy: Default::default(),
    };
    PodmanV0Score {
        services: vec![hello],
        lifecycle: None,
    }
}
/// Builds a `Device` called `name` that carries the single label
/// `LABEL_KEY = group`, with no inventory and no status.
fn device_with_label(name: &str, group: &str) -> Device {
    let labels = BTreeMap::from([(LABEL_KEY.to_string(), group.to_string())]);
    let metadata = ObjectMeta {
        name: Some(name.to_string()),
        labels: Some(labels),
        ..Default::default()
    };
    Device {
        metadata,
        spec: DeviceSpec { inventory: None },
        status: None,
    }
}
/// Builds a `Deployment` called `name` whose target selector matches devices
/// labelled `LABEL_KEY = group`. It carries the fixture podman score and an
/// `Immediate` rollout strategy, and has no status.
fn deployment_matching(name: &str, group: &str) -> Deployment {
    let target_selector = LabelSelector {
        match_labels: Some(BTreeMap::from([(
            LABEL_KEY.to_string(),
            group.to_string(),
        )])),
        match_expressions: None,
    };
    let spec = DeploymentSpec {
        target_selector,
        score: ReconcileScore::PodmanV0(podman_score()),
        rollout: Rollout {
            strategy: RolloutStrategy::Immediate,
        },
    };
    let metadata = ObjectMeta {
        name: Some(name.to_string()),
        ..Default::default()
    };
    Deployment {
        metadata,
        spec,
        status: None,
    }
}
/// Repeatedly evaluates `f` until it yields `true`, pausing 300ms between
/// attempts.
///
/// The condition is always evaluated at least once. The deadline is only
/// checked after a failed attempt, so an attempt that is already running is
/// never cut short.
///
/// # Errors
///
/// Returns an error naming `budget` if the condition is still false once the
/// budget has elapsed.
async fn wait_until<F, Fut>(budget: Duration, mut f: F) -> anyhow::Result<()>
where
    F: FnMut() -> Fut,
    Fut: std::future::Future<Output = bool>,
{
    let give_up_at = Instant::now() + budget;
    while !f().await {
        if Instant::now() >= give_up_at {
            anyhow::bail!("condition not met within {budget:?}");
        }
        tokio::time::sleep(Duration::from_millis(300)).await;
    }
    Ok(())
}
/// Checks whether the desired-state KV bucket holds an entry for the given
/// `device` / `deployment` pair.
///
/// Opens a fresh admin NATS connection on every call.
///
/// # Errors
///
/// Fails if the connection, the bucket lookup or the key read fails, or if
/// `deployment` is rejected by `DeploymentName::try_new`.
async fn desired_state_present(
    stack: &harmony_fleet_e2e::Stack,
    device: &str,
    deployment: &str,
) -> anyhow::Result<bool> {
    let nats = admin_nats(stack).await?;
    let js = jetstream::new(nats);
    let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;

    let deployment_name = DeploymentName::try_new(deployment)?;
    let key = desired_state_key(device, &deployment_name);

    let entry = bucket.get(&key).await?;
    Ok(entry.is_some())
}
/// Best-effort teardown: deletes the named `Device`s (cluster-wide API) and
/// the named `Deployment`s (in the stack's namespace).
///
/// Every failure is ignored, including failing to build a kube client, so
/// cleanup never turns a passing test into a failing one.
async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) {
    let client = match Client::try_default().await {
        Ok(client) => client,
        Err(_) => return,
    };
    let device_api: Api<Device> = Api::all(client.clone());
    let deployment_api: Api<Deployment> = Api::namespaced(client, &stack.namespace);

    for &name in devices {
        let _ = device_api.delete(name, &DeleteParams::default()).await;
    }
    for &name in deployments {
        let _ = deployment_api
            .delete(name, &DeleteParams::default())
            .await;
    }
}
// ── Scenario 1: cold restart, healthy fleet ─────────────────────────────
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1");
let _ = devices.delete(dev, &DeleteParams::default()).await;
let _ = deployments.delete(depl, &DeleteParams::default()).await;
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
// First operator: converges and writes desired-state for the match.
let (h1, l1) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l1.is_converged()
&& desired_state_present(&stack, dev, depl)
.await
.unwrap_or(false)
})
.await?;
// Simulate the agent having reported Running, then kill the operator.
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
let depl_name = DeploymentName::try_new(depl)?;
kv.put_device_state(&DeploymentState {
device_id: Id::from(dev),
deployment: depl_name.clone(),
phase: Phase::Running,
last_event_at: Utc::now(),
last_error: None,
container_ids: vec![],
})
.await?;
kill(h1).await;
// Second operator: rebuilds from KV alone.
let (h2, l2) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
// Desired-state survives the restart (idempotent re-write, not churn).
assert!(
desired_state_present(&stack, dev, depl).await?,
"desired-state must persist across restart"
);
// Health count is rebuilt from device-state KV alone.
wait_until(Duration::from_secs(30), || async {
deployments
.get(depl)
.await
.ok()
.and_then(|cr| cr.status?.aggregate)
.map(|a| a.succeeded == 1 && a.matched_device_count == 1)
.unwrap_or(false)
})
.await?;
kill(h2).await;
cleanup(&stack, &[dev], &[depl]).await;
Ok(())
}
// ── Scenario 2: CR deleted while operator down ──────────────────────────
/// A Deployment CR deleted while no operator is running leaves an orphan
/// desired-state entry; the next aggregator must garbage-collect it.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip();
        return Ok(());
    }
    let budget = Duration::from_secs(30);
    let dev = "recov2-dev";
    let depl = "recov2-depl";
    let grp = "g2";

    let stack = recovery_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);

    // Clear leftovers from an earlier run (errors ignored), then create fresh CRs.
    let _ = devices.delete(dev, &DeleteParams::default()).await;
    let _ = deployments.delete(depl, &DeleteParams::default()).await;
    let device_cr = device_with_label(dev, grp);
    devices.create(&PostParams::default(), &device_cr).await?;
    let deployment_cr = deployment_matching(depl, grp);
    deployments
        .create(&PostParams::default(), &deployment_cr)
        .await?;

    // First operator writes the desired-state entry, then is killed.
    let (first, first_liveness) = spawn_aggregator(&stack).await?;
    wait_until(budget, || async {
        if !first_liveness.is_converged() {
            return false;
        }
        desired_state_present(&stack, dev, depl)
            .await
            .unwrap_or(false)
    })
    .await?;
    kill(first).await;

    // CR force-deleted while down — no controller running, so no finalizer
    // blocks it. The desired-state entry is now an orphan.
    deployments.delete(depl, &DeleteParams::default()).await?;
    let orphan_present = desired_state_present(&stack, dev, depl).await?;
    assert!(
        orphan_present,
        "orphan desired-state should still be present before recovery"
    );

    // Recovery GCs the orphan once the CR list is replayed. A failed lookup
    // counts as "not gone yet", so the poll simply retries.
    let (second, _second_liveness) = spawn_aggregator(&stack).await?;
    wait_until(budget, || async {
        match desired_state_present(&stack, dev, depl).await {
            Ok(present) => !present,
            Err(_) => false,
        }
    })
    .await?;

    kill(second).await;
    cleanup(&stack, &[dev], &[]).await;
    Ok(())
}
// ── Scenario 3: two operators racing ────────────────────────────────────
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3");
let _ = devices.delete(dev, &DeleteParams::default()).await;
let _ = deployments.delete(depl, &DeleteParams::default()).await;
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
// Two operators at once (rolling-deploy overlap).
let (h_a, l_a) = spawn_aggregator(&stack).await?;
let (h_b, l_b) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l_a.is_converged() && l_b.is_converged()
})
.await?;
// The KV value is the deterministic serialized score regardless of which
// operator wrote it — idempotent multi-writer, no leader election.
let js = jetstream::new(admin_nats(&stack).await?);
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
let key = desired_state_key(dev, &DeploymentName::try_new(depl)?);
let value = bucket.get(&key).await?.expect("desired-state present");
let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?;
assert_eq!(
value.to_vec(),
expected,
"racing writers must agree byte-for-byte"
);
kill(h_a).await;
kill(h_b).await;
cleanup(&stack, &[dev], &[depl]).await;
Ok(())
}
// ── Scenario 5: chaos kill under write load ─────────────────────────────
/// Chaos kill under write load: deployments are created while the operator is
/// killed and respawned mid-stream; a final aggregator that stays up must
/// still produce a desired-state entry for every deployment.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip();
        return Ok(());
    }
    let budget = Duration::from_secs(30);

    let stack = recovery_stack().await?;
    let client = Client::try_default().await?;
    let devices: Api<Device> = Api::all(client.clone());
    let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);

    let dev = "recov5-dev";
    let grp = "g5";
    let names: Vec<String> = (0..5).map(|i| format!("recov5-depl-{i}")).collect();

    // Clear leftovers from an earlier run (errors ignored), then create the device.
    let _ = devices.delete(dev, &DeleteParams::default()).await;
    for stale in &names {
        let _ = deployments.delete(stale, &DeleteParams::default()).await;
    }
    let device_cr = device_with_label(dev, grp);
    devices.create(&PostParams::default(), &device_cr).await?;

    // Create deployments while killing/restarting the operator mid-stream:
    // the restart happens right after the third deployment (index 2).
    let (mut handle, _) = spawn_aggregator(&stack).await?;
    for (idx, name) in names.iter().enumerate() {
        let deployment_cr = deployment_matching(name, grp);
        deployments
            .create(&PostParams::default(), &deployment_cr)
            .await?;
        if idx == 2 {
            kill(handle).await;
            let (respawned, _) = spawn_aggregator(&stack).await?;
            handle = respawned;
        }
    }
    kill(handle).await;

    // A replica that stays up must converge the full set within 30s.
    let (survivor, survivor_liveness) = spawn_aggregator(&stack).await?;
    wait_until(budget, || async { survivor_liveness.is_converged() }).await?;
    wait_until(budget, || async {
        // Stop at the first missing entry; a failed lookup counts as missing.
        let mut all_present = true;
        for name in &names {
            let present = desired_state_present(&stack, dev, name)
                .await
                .unwrap_or(false);
            if !present {
                all_present = false;
                break;
            }
        }
        all_present
    })
    .await?;

    kill(survivor).await;
    let deployment_names: Vec<&str> = names.iter().map(String::as_str).collect();
    cleanup(&stack, &[dev], &deployment_names).await;
    Ok(())
}