All checks were successful
Run Check Script / check (pull_request) Successful in 2m36s
Fixes the 30s redeploy loop and adds graceful deployment upgrades — both share the same root cause: how the agent identifies a running container vs. its desired spec.

Redeploy-loop fix (container identity by id/name, not spec compare):
- `ensure_service_running` is now liveness-only — a running container is a NOOP, no spec comparison (podman ps can't read env/volumes, which made the old matches_spec recreate env-bearing services every tick). The periodic tick adopts-or-restarts by name and never recreates a healthy container.
- Spec changes are detected by the reconciler's existing byte-compare of the desired-state JSON, not by the runtime. The agent records each container's id (from start_service) in DeploymentState.container_ids.
- Deleted the matches_spec FIXME and its always-drift hack.

Graceful roll-forward upgrade:
- PodmanV0Score.lifecycle: Option<LifecyclePolicy> (SIGTERM, 30s grace, SIGKILL fallback). stop_signal is baked into the container at create; grace drives `podman stop --time`.
- On a changed score the reconciler stops the exact old container by its recorded id (graceful), then starts the new one; dropped services are stopped. Roll-forward only — a failure reports Phase::Failed, never reverts.

New ContainerRuntime methods: start_service (→id), container_status, stop_service (graceful). Reconciler is now generic over `dyn ContainerRuntime`, unit-tested against a FakeRuntime: tick-idempotency (loop killed), graceful-replace-by-id, roll-forward-no-revert, unchanged-noop.

Architecture and the flagged VM v1->v2->v3 e2e are documented in ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md.
442 lines
16 KiB
Rust
442 lines
16 KiB
Rust
//! Operator restart + aggregator recovery regression tests (v0.3 Ch2).
|
|
//!
|
|
//! Scenarios from `docs/fleet-operator-recovery-scenarios.md` are tested here
//! under their doc numbers (1, 2, 3 and 5; scenario 4 has no test in this
//! file). They drive `fleet_aggregator::run` in-process against the shared
//! NATS + k3d stack, aborting and respawning it to simulate operator
//! restarts. `run` owns its watchers in a `JoinSet`, so a cancelled
//! aggregator leaves no orphan tasks racing the next one.
|
|
//!
|
|
//! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH).
|
|
|
|
use std::collections::BTreeMap;
|
|
use std::time::{Duration, Instant};
|
|
|
|
use async_nats::jetstream;
|
|
use chrono::Utc;
|
|
use harmony::modules::fleet::operator::crd::{
|
|
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
|
|
Rollout, RolloutStrategy,
|
|
};
|
|
use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack};
|
|
use harmony_fleet_operator::fleet_aggregator;
|
|
use harmony_fleet_operator::liveness::OperatorLiveness;
|
|
use harmony_reconciler_contracts::{
|
|
BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key,
|
|
};
|
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
|
use kube::Client;
|
|
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
|
|
use tokio::task::JoinHandle;
|
|
|
|
/// Environment variable that opts into these e2e tests (`1` or `true`).
const E2E_ENV: &str = "HARMONY_FLEET_E2E";

/// Label key that ties a test `Device` to the `Deployment` selecting it.
const LABEL_KEY: &str = "recovery-grp";
|
|
|
|
fn e2e_enabled() -> bool {
|
|
matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
|
|
}
|
|
|
|
fn skip() {
|
|
eprintln!("skipping {E2E_ENV}-gated test (set {E2E_ENV}=1; needs k3d + podman)");
|
|
}
|
|
|
|
// ── Fixtures ────────────────────────────────────────────────────────────
|
|
|
|
/// Shared stack with no in-cluster operator: the tests own the aggregator
|
|
/// lifecycle. CRDs are applied directly so the watchers have something to read.
|
|
async fn recovery_stack() -> anyhow::Result<std::sync::Arc<harmony_fleet_e2e::Stack>> {
|
|
let _ = tracing_subscriber::fmt()
|
|
.with_env_filter(
|
|
tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
|
|
)
|
|
.try_init();
|
|
|
|
let stack = shared_stack(StackOptions {
|
|
deploy_agent: false,
|
|
deploy_operator: false,
|
|
..StackOptions::default()
|
|
})
|
|
.await?;
|
|
|
|
harmony_fleet_deploy::operator::install_crds().await?;
|
|
wait_crds_ready().await?;
|
|
Ok(stack)
|
|
}
|
|
|
|
/// CRD apply doesn't block on `Established`; poll until a list succeeds.
|
|
async fn wait_crds_ready() -> anyhow::Result<()> {
|
|
let client = Client::try_default().await?;
|
|
let deployments: Api<Deployment> = Api::all(client.clone());
|
|
let devices: Api<Device> = Api::all(client);
|
|
let deadline = Instant::now() + Duration::from_secs(30);
|
|
loop {
|
|
let ok = deployments.list(&Default::default()).await.is_ok()
|
|
&& devices.list(&Default::default()).await.is_ok();
|
|
if ok {
|
|
return Ok(());
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!("CRDs not Established within 30s");
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
}
|
|
}
|
|
|
|
/// Open a NATS connection authenticated as the stack's admin user.
///
/// # Errors
/// Propagates any connection failure.
async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result<async_nats::Client> {
    let options = async_nats::ConnectOptions::new()
        .user_and_password(stack.admin_user.clone(), stack.admin_pass.clone());
    let client = options.connect(&stack.nats_url).await?;
    Ok(client)
}
|
|
|
|
/// Spawn an aggregator instance. Returns its handle (abort to "kill the
|
|
/// operator") and the liveness it latches on convergence.
|
|
async fn spawn_aggregator(
|
|
stack: &harmony_fleet_e2e::Stack,
|
|
) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> {
|
|
let kube = Client::try_default().await?;
|
|
let js = jetstream::new(admin_nats(stack).await?);
|
|
let liveness = OperatorLiveness::new();
|
|
let handle = {
|
|
let liveness = liveness.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = fleet_aggregator::run(kube, js, liveness).await {
|
|
tracing::warn!(error = %e, "test aggregator exited");
|
|
}
|
|
})
|
|
};
|
|
Ok((handle, liveness))
|
|
}
|
|
|
|
/// Abort an aggregator and wait for it (and its `JoinSet` children) to tear down
|
|
/// before the next one starts.
|
|
async fn kill(handle: JoinHandle<()>) {
|
|
handle.abort();
|
|
let _ = handle.await;
|
|
}
|
|
|
|
fn podman_score() -> PodmanV0Score {
|
|
PodmanV0Score {
|
|
services: vec![PodmanService {
|
|
name: "hello".to_string(),
|
|
image: "docker.io/library/hello-world:latest".to_string(),
|
|
ports: vec![],
|
|
env: vec![],
|
|
volumes: vec![],
|
|
restart_policy: Default::default(),
|
|
}],
|
|
lifecycle: None,
|
|
}
|
|
}
|
|
|
|
fn device_with_label(name: &str, group: &str) -> Device {
|
|
let mut labels = BTreeMap::new();
|
|
labels.insert(LABEL_KEY.to_string(), group.to_string());
|
|
Device {
|
|
metadata: ObjectMeta {
|
|
name: Some(name.to_string()),
|
|
labels: Some(labels),
|
|
..Default::default()
|
|
},
|
|
spec: DeviceSpec { inventory: None },
|
|
status: None,
|
|
}
|
|
}
|
|
|
|
fn deployment_matching(name: &str, group: &str) -> Deployment {
|
|
let mut ml = BTreeMap::new();
|
|
ml.insert(LABEL_KEY.to_string(), group.to_string());
|
|
Deployment {
|
|
metadata: ObjectMeta {
|
|
name: Some(name.to_string()),
|
|
..Default::default()
|
|
},
|
|
spec: DeploymentSpec {
|
|
target_selector: LabelSelector {
|
|
match_labels: Some(ml),
|
|
match_expressions: None,
|
|
},
|
|
score: ReconcileScore::PodmanV0(podman_score()),
|
|
rollout: Rollout {
|
|
strategy: RolloutStrategy::Immediate,
|
|
},
|
|
},
|
|
status: None,
|
|
}
|
|
}
|
|
|
|
/// Poll until `f` resolves true or the budget elapses.
|
|
async fn wait_until<F, Fut>(budget: Duration, mut f: F) -> anyhow::Result<()>
|
|
where
|
|
F: FnMut() -> Fut,
|
|
Fut: std::future::Future<Output = bool>,
|
|
{
|
|
let deadline = Instant::now() + budget;
|
|
loop {
|
|
if f().await {
|
|
return Ok(());
|
|
}
|
|
if Instant::now() >= deadline {
|
|
anyhow::bail!("condition not met within {budget:?}");
|
|
}
|
|
tokio::time::sleep(Duration::from_millis(300)).await;
|
|
}
|
|
}
|
|
|
|
async fn desired_state_present(
|
|
stack: &harmony_fleet_e2e::Stack,
|
|
device: &str,
|
|
deployment: &str,
|
|
) -> anyhow::Result<bool> {
|
|
let js = jetstream::new(admin_nats(stack).await?);
|
|
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
|
let key = desired_state_key(device, &DeploymentName::try_new(deployment)?);
|
|
Ok(bucket.get(&key).await?.is_some())
|
|
}
|
|
|
|
async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) {
|
|
if let Ok(client) = Client::try_default().await {
|
|
let dev_api: Api<Device> = Api::all(client.clone());
|
|
let dep_api: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
for d in devices {
|
|
let _ = dev_api.delete(d, &DeleteParams::default()).await;
|
|
}
|
|
for d in deployments {
|
|
let _ = dep_api.delete(d, &DeleteParams::default()).await;
|
|
}
|
|
}
|
|
}
|
|
|
|
// ── Scenario 1: cold restart, healthy fleet ─────────────────────────────
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1");
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
// First operator: converges and writes desired-state for the match.
|
|
let (h1, l1) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l1.is_converged()
|
|
&& desired_state_present(&stack, dev, depl)
|
|
.await
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
|
|
// Simulate the agent having reported Running, then kill the operator.
|
|
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
|
|
let depl_name = DeploymentName::try_new(depl)?;
|
|
kv.put_device_state(&DeploymentState {
|
|
device_id: Id::from(dev),
|
|
deployment: depl_name.clone(),
|
|
phase: Phase::Running,
|
|
last_event_at: Utc::now(),
|
|
last_error: None,
|
|
container_ids: vec![],
|
|
})
|
|
.await?;
|
|
kill(h1).await;
|
|
|
|
// Second operator: rebuilds from KV alone.
|
|
let (h2, l2) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
|
|
|
|
// Desired-state survives the restart (idempotent re-write, not churn).
|
|
assert!(
|
|
desired_state_present(&stack, dev, depl).await?,
|
|
"desired-state must persist across restart"
|
|
);
|
|
// Health count is rebuilt from device-state KV alone.
|
|
wait_until(Duration::from_secs(30), || async {
|
|
deployments
|
|
.get(depl)
|
|
.await
|
|
.ok()
|
|
.and_then(|cr| cr.status?.aggregate)
|
|
.map(|a| a.succeeded == 1 && a.matched_device_count == 1)
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
|
|
kill(h2).await;
|
|
cleanup(&stack, &[dev], &[depl]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 2: CR deleted while operator down ──────────────────────────
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2");
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
let (h1, l1) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l1.is_converged()
|
|
&& desired_state_present(&stack, dev, depl)
|
|
.await
|
|
.unwrap_or(false)
|
|
})
|
|
.await?;
|
|
kill(h1).await;
|
|
|
|
// CR force-deleted while down — no controller running, so no finalizer
|
|
// blocks it. The desired-state entry is now an orphan.
|
|
deployments.delete(depl, &DeleteParams::default()).await?;
|
|
assert!(
|
|
desired_state_present(&stack, dev, depl).await?,
|
|
"orphan desired-state should still be present before recovery"
|
|
);
|
|
|
|
// Recovery GCs the orphan once the CR list is replayed.
|
|
let (h2, _l2) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
!desired_state_present(&stack, dev, depl)
|
|
.await
|
|
.unwrap_or(true)
|
|
})
|
|
.await?;
|
|
|
|
kill(h2).await;
|
|
cleanup(&stack, &[dev], &[]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 3: two operators racing ────────────────────────────────────
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3");
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
|
.await?;
|
|
|
|
// Two operators at once (rolling-deploy overlap).
|
|
let (h_a, l_a) = spawn_aggregator(&stack).await?;
|
|
let (h_b, l_b) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
l_a.is_converged() && l_b.is_converged()
|
|
})
|
|
.await?;
|
|
|
|
// The KV value is the deterministic serialized score regardless of which
|
|
// operator wrote it — idempotent multi-writer, no leader election.
|
|
let js = jetstream::new(admin_nats(&stack).await?);
|
|
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
|
let key = desired_state_key(dev, &DeploymentName::try_new(depl)?);
|
|
let value = bucket.get(&key).await?.expect("desired-state present");
|
|
let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?;
|
|
assert_eq!(
|
|
value.to_vec(),
|
|
expected,
|
|
"racing writers must agree byte-for-byte"
|
|
);
|
|
|
|
kill(h_a).await;
|
|
kill(h_b).await;
|
|
cleanup(&stack, &[dev], &[depl]).await;
|
|
Ok(())
|
|
}
|
|
|
|
// ── Scenario 5: chaos kill under write load ─────────────────────────────
|
|
|
|
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
|
async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> {
|
|
if !e2e_enabled() {
|
|
skip();
|
|
return Ok(());
|
|
}
|
|
let stack = recovery_stack().await?;
|
|
let client = Client::try_default().await?;
|
|
let devices: Api<Device> = Api::all(client.clone());
|
|
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
|
|
|
let (dev, grp) = ("recov5-dev", "g5");
|
|
let names: Vec<String> = (0..5).map(|i| format!("recov5-depl-{i}")).collect();
|
|
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
|
for n in &names {
|
|
let _ = deployments.delete(n, &DeleteParams::default()).await;
|
|
}
|
|
devices
|
|
.create(&PostParams::default(), &device_with_label(dev, grp))
|
|
.await?;
|
|
|
|
// Create deployments while killing/restarting the operator mid-stream.
|
|
let (mut handle, _) = spawn_aggregator(&stack).await?;
|
|
for (i, n) in names.iter().enumerate() {
|
|
deployments
|
|
.create(&PostParams::default(), &deployment_matching(n, grp))
|
|
.await?;
|
|
if i == 2 {
|
|
kill(handle).await;
|
|
(handle, _) = spawn_aggregator(&stack).await?;
|
|
}
|
|
}
|
|
kill(handle).await;
|
|
|
|
// A replica that stays up must converge the full set within 30s.
|
|
let (h_final, l_final) = spawn_aggregator(&stack).await?;
|
|
wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?;
|
|
wait_until(Duration::from_secs(30), || async {
|
|
for n in &names {
|
|
if !desired_state_present(&stack, dev, n).await.unwrap_or(false) {
|
|
return false;
|
|
}
|
|
}
|
|
true
|
|
})
|
|
.await?;
|
|
|
|
kill(h_final).await;
|
|
let dep_refs: Vec<&str> = names.iter().map(String::as_str).collect();
|
|
cleanup(&stack, &[dev], &dep_refs).await;
|
|
Ok(())
|
|
}
|