Files
harmony/fleet/harmony-fleet-e2e/tests/operator.rs
Jean-Gabriel Gill-Couture dedfa19380
All checks were successful
Run Check Script / check (pull_request) Successful in 2m36s
feat(fleet): graceful roll-forward upgrade + container-ID identity (Ch5)
Fixes the 30s redeploy loop and adds graceful deployment upgrades — same root:
how the agent identifies a running container vs. its desired spec.

Redeploy-loop fix (container identity by id/name, not spec compare):
- `ensure_service_running` is now liveness-only — a running container is a NOOP,
  no spec comparison (podman ps can't read env/volumes, which made the old
  matches_spec recreate env-bearing services every tick). The periodic tick
  adopts-or-restarts by name and never recreates a healthy container.
- Spec changes are detected by the reconciler's existing byte-compare of the
  desired-state JSON, not by the runtime. The agent records each container's id
  (from start_service) in DeploymentState.container_ids.
- Deleted the matches_spec FIXME and its always-drift hack.

Graceful roll-forward upgrade:
- PodmanV0Score.lifecycle: Option<LifecyclePolicy> (SIGTERM, 30s grace, SIGKILL
  fallback). stop_signal baked into the container at create; grace drives
  `podman stop --time`.
- On a changed score the reconciler stops the exact old container by its
  recorded id (graceful), then starts the new one; dropped services are stopped.
  Roll-forward only — a failure reports Phase::Failed, never reverts.

New ContainerRuntime methods: start_service (→id), container_status, stop_service
(graceful). Reconciler is now generic over `dyn ContainerRuntime`, unit-tested
against a FakeRuntime: tick-idempotency (loop killed), graceful-replace-by-id,
roll-forward-no-revert, unchanged-noop.

Architecture notes and the flagged VM v1->v2->v3 e2e test are documented in
ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md.
2026-06-05 15:26:57 -04:00

215 lines
6.9 KiB
Rust

use harmony::modules::fleet::operator::crd::{
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
Rollout, RolloutStrategy,
};
use harmony_fleet_e2e::{StackOptions, shared_stack};
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, DeploymentName, desired_state_key};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
use std::sync::Arc;
use std::time::{Duration, Instant};
/// Environment variable that opts in to the k3d/podman-backed e2e tests.
const E2E_ENV: &str = "HARMONY_FLEET_E2E";

/// Returns `true` only when [`E2E_ENV`] is set to exactly `"1"` or `"true"`.
///
/// Any other value, an unset variable, or a non-UTF-8 value disables the tests.
fn e2e_enabled() -> bool {
    std::env::var(E2E_ENV)
        .map(|value| value == "1" || value == "true")
        .unwrap_or(false)
}
/// Creating a fleet `Deployment` should make the operator attach its fleet finalizer.
///
/// No-op (apart from a skip notice) unless the e2e opt-in variable is set.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn operator_adds_finalizer_to_fleet_deployment() -> anyhow::Result<()> {
    const DEPLOYMENT: &str = "finalizer-test";

    if e2e_enabled() {
        let stack = operator_stack().await?;
        let api = fleet_deployments(&stack.namespace).await?;
        create_fleet_deployment(&api, DEPLOYMENT).await?;
        wait_for_finalizer(&api, DEPLOYMENT).await?;
    } else {
        skip_e2e();
    }
    Ok(())
}
/// A `Device` plus a fleet `Deployment` whose selector matches it should cause the
/// operator to publish a desired-state entry for that (device, deployment) pair.
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn operator_writes_desired_state_for_matching_device() -> anyhow::Result<()> {
    if !e2e_enabled() {
        skip_e2e();
        return Ok(());
    }
    const DEVICE: &str = "desired-state-device";
    const DEPLOYMENT: &str = "desired-state-test";

    let stack = operator_stack().await?;
    let kube = Client::try_default().await?;
    // Device CRs are cluster-scoped; Deployment CRs are namespaced.
    let devices = Api::<Device>::all(kube.clone());
    let deployments = Api::<Deployment>::namespaced(kube, &stack.namespace);

    create_device(&devices, DEVICE).await?;
    create_fleet_deployment(&deployments, DEPLOYMENT).await?;
    wait_for_desired_state_entry(&stack, DEVICE, DEPLOYMENT, true).await
}
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn operator_deletes_desired_state_when_deployment_is_deleted() -> anyhow::Result<()> {
if !e2e_enabled() {
skip_e2e();
return Ok(());
}
let stack = operator_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
create_device(&devices, "cleanup-device").await?;
create_fleet_deployment(&deployments, "cleanup-test").await?;
wait_for_finalizer(&deployments, "cleanup-test").await?;
wait_for_desired_state_entry(&stack, "cleanup-device", "cleanup-test", true).await?;
deployments
.delete("cleanup-test", &DeleteParams::default())
.await?;
wait_for_desired_state_entry(&stack, "cleanup-device", "cleanup-test", false).await?;
Ok(())
}
/// Obtains the shared e2e stack via `shared_stack`, with the fleet operator deployed
/// and no agent.
///
/// Also installs a `tracing` subscriber (env filter from the default env var, falling
/// back to `info`). `try_init` is used and its error ignored because every test in
/// this file calls this helper, and only the first installation can succeed.
///
/// # Errors
/// Fails if the stack cannot be brought up, or if it reports any agent devices.
/// These tests create their own `Device` CRs, presumably relying on no agent
/// registering devices alongside them.
async fn operator_stack() -> anyhow::Result<Arc<harmony_fleet_e2e::Stack>> {
    let _ = tracing_subscriber::fmt()
        .with_env_filter(
            tracing_subscriber::EnvFilter::try_from_default_env()
                .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
        )
        .try_init();
    let stack = shared_stack(StackOptions {
        deploy_agent: false,
        deploy_operator: true,
        ..StackOptions::default()
    })
    .await?;
    stack.print_debug_info();
    // Report through the Result instead of panicking, so the failure carries a
    // message like the other helpers in this file.
    anyhow::ensure!(
        stack.device_ids.is_empty(),
        "operator-only stack unexpectedly reports {} agent device(s); expected none with deploy_agent = false",
        stack.device_ids.len()
    );
    Ok(stack)
}
/// Prints to stderr why a gated e2e test returned early without exercising anything.
fn skip_e2e() {
    let notice = format!(
        "skipping {E2E_ENV}-gated e2e test (set {E2E_ENV}=1 to run; \
         requires k3d + podman on PATH)"
    );
    eprintln!("{notice}");
}
async fn fleet_deployments(namespace: &str) -> anyhow::Result<Api<Deployment>> {
let client = Client::try_default().await?;
Ok(Api::namespaced(client, namespace))
}
/// Creates a `Device` CR called `name` with no inventory.
///
/// # Errors
/// Propagates the API server's rejection (e.g. if the device already exists).
async fn create_device(devices: &Api<Device>, name: &str) -> anyhow::Result<()> {
    let spec = DeviceSpec { inventory: None };
    devices
        .create(&PostParams::default(), &Device::new(name, spec))
        .await
        .map(|_created| ())
        .map_err(Into::into)
}
/// Creates a fleet `Deployment` called `name` that runs a single `hello-world` Podman
/// service with an immediate rollout.
///
/// The `target_selector` is left empty. The tests in this file rely on that matching
/// the unlabeled devices they create.
///
/// # Errors
/// Propagates the API server's rejection (e.g. if the deployment already exists).
async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
    let hello = PodmanService {
        name: "hello".to_string(),
        image: "docker.io/library/hello-world:latest".to_string(),
        ports: vec![],
        env: vec![],
        volumes: vec![],
        restart_policy: Default::default(),
    };
    let spec = DeploymentSpec {
        target_selector: LabelSelector::default(),
        score: ReconcileScore::PodmanV0(PodmanV0Score {
            services: vec![hello],
            lifecycle: None,
        }),
        rollout: Rollout {
            strategy: RolloutStrategy::Immediate,
        },
    };
    let metadata = ObjectMeta {
        name: Some(name.to_string()),
        ..Default::default()
    };
    let deployment = Deployment {
        metadata,
        spec,
        status: None,
    };
    deployments
        .create(&PostParams::default(), &deployment)
        .await?;
    Ok(())
}
/// Polls once per second, for up to 30 s, until the `Deployment` called `name`
/// carries the fleet operator's finalizer.
///
/// # Errors
/// Fails on any API error, or on timeout with the finalizers observed on the last poll.
async fn wait_for_finalizer(deployments: &Api<Deployment>, name: &str) -> anyhow::Result<()> {
    const FLEET_FINALIZER: &str = "fleet.nationtech.io/finalizer";
    let deadline = Instant::now() + Duration::from_secs(30);
    loop {
        let current = deployments
            .get(name)
            .await?
            .metadata
            .finalizers
            .unwrap_or_default();
        if current.iter().any(|entry| entry == FLEET_FINALIZER) {
            return Ok(());
        }
        if Instant::now() >= deadline {
            anyhow::bail!(
                "timed out waiting for fleet finalizer; current finalizers: {:?}",
                current
            );
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
}
/// Polls the desired-state KV bucket until the entry for (`device_id`, `deployment`)
/// exists (`expect_present == true`) or is absent (`expect_present == false`).
///
/// Checks once per second for up to 30 s.
///
/// # Errors
/// Fails if the NATS connection or bucket lookup fails, if `deployment` is not a valid
/// [`DeploymentName`], if a KV read fails, or if the expected state is not reached
/// before the deadline.
async fn wait_for_desired_state_entry(
    stack: &harmony_fleet_e2e::Stack,
    device_id: &str,
    deployment: &str,
    expect_present: bool,
) -> anyhow::Result<()> {
    let jetstream = async_nats::jetstream::new(connect_admin_nats(stack).await?);
    let bucket = jetstream.get_key_value(BUCKET_DESIRED_STATE).await?;
    let deployment_name = DeploymentName::try_new(deployment)?;
    let key = desired_state_key(device_id, &deployment_name);
    let deadline = Instant::now() + Duration::from_secs(30);
    while bucket.get(&key).await?.is_some() != expect_present {
        if Instant::now() >= deadline {
            let state = if expect_present { "present" } else { "deleted" };
            anyhow::bail!("timed out waiting for desired-state key {key} to be {state}");
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
    }
    Ok(())
}
async fn connect_admin_nats(
stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<async_nats::Client> {
async_nats::ConnectOptions::new()
.user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
.connect(&stack.nats_url)
.await
.map_err(Into::into)
}