diff --git a/ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md b/ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md new file mode 100644 index 00000000..3d9117ff --- /dev/null +++ b/ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md @@ -0,0 +1,66 @@ +# Ch5 — Graceful deployment upgrade (roll-forward) + redeploy-loop fix: status + +Two things shipped together because they share the same root: **how the agent +identifies a running container vs. its desired spec**. + +## The redeploy-loop fix (the bug JG flagged) + +`podman ps` doesn't surface env/volumes, so the old `matches_spec` declared +*any* service with env or volumes "drifted" and re-created it **every 30s tick** — +flapping container ids, connectivity blips, log noise. + +Fix (per JG: "use container IDs, store them in NATS"): identity is the +**container, by name+id**, not a spec comparison. +- `ensure_service_running` is now **liveness-only**: a running container is a + NOOP, no spec compare. The periodic tick adopts-or-(re)starts by name and + never recreates a healthy container. The loop is gone — proven by + `tick_is_idempotent_no_recreate` (a service *with env*, the old loop trigger). +- Spec changes are detected where they always were — the reconciler's + byte-compare of the desired-state JSON on a KV Put — not by the runtime. +- The agent records each container's **id** (from `start_service`) in the cache + and in `DeploymentState.container_ids` (surfaced for the dashboard). + +## Graceful roll-forward upgrade + +- `PodmanV0Score.lifecycle: Option` (`stop_signal=SIGTERM`, + `grace_period_secs=30`). The stop signal is baked into the container at + create; the grace period drives `podman stop --time`, after which podman + always SIGKILLs — so there's no `sigkill_fallback` knob to pretend otherwise + (a config field nothing honors is a defect per AGENTS.md). +- On a changed score the reconciler `converge`s: for each service, **stop the + exact old container by its recorded id** (SIGTERM → grace → SIGKILL), then + start the new one, record the new id. Services dropped from the spec are + stopped. **Roll-forward only**: a failure reports `Phase::Failed` and is never + reverted (the customer edits the spec) — proven by + `roll_forward_failure_reports_failed_without_revert`. +- The transient cutover window reports `Phase::Pending` (its documented "in + flight" meaning) → `Running`, so the dashboard reflects the step without a new + Phase variant. + +## Architecture note + +The graceful-replace orchestration lives in the **agent reconciler** (which +holds the cross-reconcile state: cached score + container ids), driving the +`ContainerRuntime` capability directly. The `PodmanV0Score` interpret stays the +"ensure running" path for non-agent/initial converge. `container_spec` is the +one place both share the `PodmanService → ContainerSpec` mapping. The reconciler +is now generic over `dyn ContainerRuntime`, so the convergence logic is unit- +tested against a `FakeRuntime` (no podman socket). + +## Tested + +- `tick_is_idempotent_no_recreate` — the loop-fix. +- `changed_score_gracefully_replaces_by_id` — stop-old-by-id then start-new. +- `unchanged_score_is_a_noop`. +- `roll_forward_failure_reports_failed_without_revert`. +- Plus the existing phase-transition tests, all green. + +## Flagged for a supervised run + +- **VM e2e v1→v2→v3 with controlled failures** (`harmony-fleet-e2e/src/vm`). + Needs multiple image tags reachable from the KVM guest, like Ch4's upgrade + e2e. The convergence logic is unit-proven against the fake runtime; this is + the on-hardware integration proof. The existing `vm_deploy_lifecycle.rs` + covers single-deploy. +- Surfacing `container_ids` in the dashboard device view (data is already on + `DeploymentState`). diff --git a/examples/fleet_load_test/src/main.rs b/examples/fleet_load_test/src/main.rs index 5e4da069..4b65d499 100644 --- a/examples/fleet_load_test/src/main.rs +++ b/examples/fleet_load_test/src/main.rs @@ -423,6 +423,7 @@ async fn apply_one_cr( volumes: vec![], restart_policy: Default::default(), }], + lifecycle: None, }), rollout: Rollout { strategy: RolloutStrategy::Immediate, @@ -496,6 +497,7 @@ async fn simulate_state_loop( last_event_at: Utc::now(), last_error: matches!(phase, Phase::Failed) .then(|| format!("synthetic failure @{}", device.device_id)), + container_ids: vec![], }; match serde_json::to_vec(&ds) { Ok(payload) => match bucket.put(&state_key, payload.into()).await { diff --git a/examples/harmony_apply_deployment/src/main.rs b/examples/harmony_apply_deployment/src/main.rs index 6c46cb49..83d255d5 100644 --- a/examples/harmony_apply_deployment/src/main.rs +++ b/examples/harmony_apply_deployment/src/main.rs @@ -207,6 +207,7 @@ fn build_cr(cli: &Cli) -> Deployment { volumes, restart_policy: cli.restart.into(), }], + lifecycle: None, }; let payload = ReconcileScore::PodmanV0(score); diff --git a/fleet/harmony-fleet-agent/src/main.rs b/fleet/harmony-fleet-agent/src/main.rs index e469b835..b3b0b2d7 100644 --- a/fleet/harmony-fleet-agent/src/main.rs +++ b/fleet/harmony-fleet-agent/src/main.rs @@ -298,7 +298,6 @@ async fn main() -> Result<()> { Arc::new(Reconciler::new( device_id.clone(), t.clone(), - inventory.clone(), Some(fleet.clone()), )) }); diff --git a/fleet/harmony-fleet-agent/src/reconciler.rs b/fleet/harmony-fleet-agent/src/reconciler.rs index 3ba5b583..bb0b6bd6 100644 --- a/fleet/harmony-fleet-agent/src/reconciler.rs +++ b/fleet/harmony-fleet-agent/src/reconciler.rs @@ -7,76 +7,69 @@ use chrono::Utc; use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase}; use tokio::sync::Mutex; -use harmony::inventory::Inventory; -use harmony::modules::podman::{PodmanTopology, PodmanV0Score, ReconcileScore}; -use harmony::score::Score; +use harmony::modules::podman::{PodmanV0Score, ReconcileScore}; +use harmony::topology::{ContainerRuntime, ContainerSpec}; use crate::fleet_publisher::FleetPublisher; /// Cache key → last-seen state, populated by `apply` and consulted by the /// 30-second periodic tick and the delete path. struct CachedEntry { - /// Serialized score JSON. Used for string-compare idempotency per - /// ROADMAP §5.5 — cheaper and more deterministic than a hash. + /// Serialized score JSON. Byte-compared for idempotency: a re-delivered + /// identical desired-state is a NOOP; a changed one drives a graceful + /// roll-forward upgrade. serialized: String, - /// Parsed score. Cached so the periodic reconcile tick and delete - /// handlers don't have to re-parse the JSON. + /// Parsed score, so the tick / delete paths don't re-parse. score: PodmanV0Score, + /// Runtime container id per service name (v0.3 Ch5). The identity handle: + /// the tick checks these are alive (not the spec — which caused the old 30s + /// redeploy loop), and an upgrade stops the exact old container by id. + container_ids: HashMap, } pub struct Reconciler { device_id: Id, - topology: Arc, - inventory: Arc, - /// Keyed by NATS KV key (`.`). A single entry per - /// KV key — in v0 there is no fan-out from one key to many scores. + topology: Arc, + /// Keyed by NATS KV key (`.`). One entry per key. state: Mutex>, - /// Current phase per deployment, used to decide whether a new - /// write to the `device-state` KV is needed. - /// - /// NOTE : this feels dangerous, conflict on deployment name could be a problem - /// We must explore this and clarify it in the design and decide if it is a constraint - deployments: Mutex>, - /// Publish surface. Optional so unit tests without a live NATS - /// client still work; always populated in the real agent runtime. + /// Last (phase, container_ids) published per deployment, to skip + /// re-publishing unchanged device-state. + deployments: Mutex)>>, + /// Publish surface. Optional so unit tests without a live NATS client work. fleet: Option>, } impl Reconciler { pub fn new( device_id: Id, - topology: Arc, - inventory: Arc, + topology: Arc, fleet: Option>, ) -> Self { Self { device_id, topology, - inventory, state: Mutex::new(HashMap::new()), deployments: Mutex::new(HashMap::new()), fleet, } } - /// Record a new phase for a deployment and, if it changed, write - /// the updated [`DeploymentState`] to the KV. Same-phase - /// re-confirmations are no-ops so the periodic reconcile tick - /// doesn't churn the bucket. + /// Record a phase (+ backing container ids) for a deployment and publish the + /// [`DeploymentState`] when it changed. Same (phase, ids) is a no-op so the + /// periodic tick doesn't churn the bucket. async fn apply_phase( &self, deployment: &DeploymentName, phase: Phase, last_error: Option, + container_ids: Vec, ) { { let mut phases = self.deployments.lock().await; - // performance nitpick : we don't need a write lock here, we could check before acquiring the write - // lock - if phases.get(deployment).copied() == Some(phase) { + if phases.get(deployment) == Some(&(phase, container_ids.clone())) { return; } - phases.insert(deployment.clone(), phase); + phases.insert(deployment.clone(), (phase, container_ids.clone())); } if let Some(publisher) = &self.fleet { @@ -86,14 +79,12 @@ impl Reconciler { phase, last_event_at: Utc::now(), last_error, + container_ids, }; publisher.write_deployment_state(&state).await; } } - /// Clear the in-memory phase for a deployment and delete its KV - /// entry. Idempotent: a delete for a never-applied deployment is - /// a no-op in memory and a harmless tombstone write on the wire. async fn drop_phase(&self, deployment: &DeploymentName) { let was_known = { let mut phases = self.deployments.lock().await; @@ -107,9 +98,26 @@ impl Reconciler { } } - /// Handle a Put event (new or updated score on NATS KV). No-ops if the - /// serialized score is byte-identical to the last-seen value for this - /// key. + /// Adopt-or-start: if a container with this name is already running, return + /// its id (no recreate — survives an agent restart without flapping the + /// workload); otherwise (re)create it from `spec` and return the new id. + async fn ensure_started(&self, spec: &ContainerSpec) -> Result { + match self + .topology + .container_status(&spec.name) + .await + .map_err(rt)? + { + Some(obs) if obs.running => Ok(obs.id), + _ => self.topology.start_service(spec).await.map_err(rt), + } + } + + /// Handle a Put: NOOP if byte-identical to the last-seen score; a new key + /// starts its services; a changed key **gracefully rolls forward** — stop + /// each old container (SIGTERM → grace → SIGKILL per the score's + /// `LifecyclePolicy`), then start the new one. Roll-forward only: a failure + /// is reported as `Failed`, never reverted (the customer edits the spec). pub async fn apply(&self, key: &str, value: &[u8]) -> Result<()> { let deployment = deployment_from_key(key); let incoming = match serde_json::from_slice::(value) { @@ -117,79 +125,135 @@ impl Reconciler { Err(e) => { tracing::warn!(key, error = %e, "failed to deserialize score"); if let Some(name) = &deployment { - self.apply_phase(name, Phase::Failed, Some(format!("bad payload: {e}"))) - .await; + self.apply_phase( + name, + Phase::Failed, + Some(format!("bad payload: {e}")), + vec![], + ) + .await; } return Ok(()); } }; let serialized = String::from_utf8_lossy(value).into_owned(); - { + let previous = { let state = self.state.lock().await; - if let Some(existing) = state.get(key) { - if existing.serialized == serialized { + match state.get(key) { + Some(existing) if existing.serialized == serialized => { tracing::debug!(key, "score unchanged — noop"); return Ok(()); } + Some(existing) => Some(existing.container_ids.clone()), + None => None, } - } + }; + // Upgrade in flight → Pending (the transient cutover window). if let Some(name) = &deployment { - self.apply_phase(name, Phase::Pending, None).await; + self.apply_phase(name, Phase::Pending, None, vec![]).await; } - match self.run_score(key, &incoming).await { - Ok(()) => { + match self.converge(&incoming, previous.as_ref()).await { + Ok(ids) => { + let id_list: Vec = incoming + .services + .iter() + .filter_map(|s| ids.get(&s.name).cloned()) + .collect(); if let Some(name) = &deployment { - self.apply_phase(name, Phase::Running, None).await; + self.apply_phase(name, Phase::Running, None, id_list).await; } + let mut state = self.state.lock().await; + state.insert( + key.to_string(), + CachedEntry { + serialized, + score: incoming, + container_ids: ids, + }, + ); + Ok(()) } Err(e) => { if let Some(name) = &deployment { - self.apply_phase(name, Phase::Failed, Some(short(&e.to_string()))) + self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())), vec![]) .await; } - return Err(e); + Err(e) } } - - let mut state = self.state.lock().await; - state.insert( - key.to_string(), - CachedEntry { - serialized, - score: incoming, - }, - ); - Ok(()) } - /// Handle a Delete/Purge event. Stops and removes every container - /// referenced by the last cached score for this key. Idempotent: if we - /// never saw a Put for this key (agent restart after delete), logs and - /// returns ok. + /// Drive actual → desired for one score. New deployment: start each service. + /// Changed: stop the old container by id (graceful) then start the new one, + /// and remove services dropped from the spec. Returns service → new id. + async fn converge( + &self, + score: &PodmanV0Score, + previous: Option<&HashMap>, + ) -> Result> { + let lifecycle = score.lifecycle_policy(); + let mut ids = HashMap::new(); + for service in &score.services { + let spec = score.container_spec(service); + match previous.and_then(|p| p.get(&service.name)) { + // Existing service whose spec changed → graceful replace by id. + Some(old_id) => { + self.topology + .stop_service(old_id, lifecycle.grace_period_secs) + .await + .map_err(rt)?; + let id = self.topology.start_service(&spec).await.map_err(rt)?; + ids.insert(service.name.clone(), id); + } + // New (or first-seen) service → adopt-or-start. + None => { + ids.insert(service.name.clone(), self.ensure_started(&spec).await?); + } + } + } + // Services removed from the spec: gracefully stop their old containers. + if let Some(prev) = previous { + for (old_name, old_id) in prev { + if !score.services.iter().any(|s| &s.name == old_name) { + if let Err(e) = self + .topology + .stop_service(old_id, lifecycle.grace_period_secs) + .await + { + tracing::warn!(service = %old_name, error = %e, "failed to stop removed service"); + } + } + } + } + Ok(ids) + } + + /// Handle a Delete/Purge: gracefully stop+remove every container for the + /// last cached score. Idempotent. pub async fn remove(&self, key: &str) -> Result<()> { let deployment = deployment_from_key(key); - let mut state = self.state.lock().await; - let Some(entry) = state.remove(key) else { + let entry = self.state.lock().await.remove(key); + let Some(entry) = entry else { tracing::info!(key, "delete for unknown key — nothing to remove"); if let Some(name) = &deployment { self.drop_phase(name).await; } return Ok(()); }; - drop(state); - use harmony::topology::ContainerRuntime; + let grace = entry.score.lifecycle_policy().grace_period_secs; for service in &entry.score.services { - if let Err(e) = self.topology.remove_service(&service.name).await { - tracing::warn!( - key, - service = %service.name, - error = %e, - "failed to remove container" - ); + // Prefer the recorded id; fall back to the service name. + let target = entry + .container_ids + .get(&service.name) + .cloned() + .unwrap_or_else(|| service.name.clone()); + if let Err(e) = self.topology.stop_service(&target, grace).await { + tracing::warn!(key, service = %service.name, error = %e, "failed to remove container"); } else { tracing::info!(key, service = %service.name, "removed container"); } @@ -200,11 +264,10 @@ impl Reconciler { Ok(()) } - /// Periodic ground-truth reconcile. ROADMAP §5.6 — "polling instead of - /// event-driven PLEG. Agent polls podman every 30s as ground truth; - /// KV watch events are accelerators." Re-runs each cached score against - /// podman-api; the underlying `ensure_service_running` is idempotent - /// so a converged state produces no log noise. + /// Periodic ground-truth reconcile. **Liveness only**: verify each cached + /// deployment's containers are up (adopt-or-start), never compare the spec + /// — that's what killed the old 30s redeploy loop. Spec changes arrive as a + /// KV Put and go through `apply`'s graceful path instead. pub async fn tick(&self) -> Result<()> { let snapshot: Vec<(String, PodmanV0Score)> = { let state = self.state.lock().await; @@ -215,18 +278,37 @@ impl Reconciler { }; for (key, score) in snapshot { let deployment = deployment_from_key(&key); - match self.run_score(&key, &score).await { - Ok(()) => { - if let Some(name) = &deployment { - self.apply_phase(name, Phase::Running, None).await; + let mut ids = HashMap::new(); + let mut error: Option = None; + for service in &score.services { + match self.ensure_started(&score.container_spec(service)).await { + Ok(id) => { + ids.insert(service.name.clone(), id); + } + Err(e) => { + tracing::warn!(key, error = %e, "periodic reconcile failed"); + error = Some(short(&e.to_string())); + break; } } - Err(e) => { - tracing::warn!(key, error = %e, "periodic reconcile failed"); - if let Some(name) = &deployment { - self.apply_phase(name, Phase::Failed, Some(short(&e.to_string()))) - .await; + } + // Refresh the cached ids so a recreated container's id is recorded. + if error.is_none() { + if let Some(entry) = self.state.lock().await.get_mut(&key) { + entry.container_ids = ids.clone(); + } + } + if let Some(name) = &deployment { + match error { + None => { + let id_list = score + .services + .iter() + .filter_map(|s| ids.get(&s.name).cloned()) + .collect(); + self.apply_phase(name, Phase::Running, None, id_list).await; } + Some(e) => self.apply_phase(name, Phase::Failed, Some(e), vec![]).await, } } } @@ -243,16 +325,11 @@ impl Reconciler { } } } +} - async fn run_score(&self, key: &str, score: &PodmanV0Score) -> Result<()> { - let interpret = Score::::create_interpret(score); - let outcome = interpret - .execute(&self.inventory, &self.topology) - .await - .map_err(|e| anyhow::anyhow!("PodmanV0Score interpret failed for {key}: {e}"))?; - tracing::info!(key, outcome = ?outcome, "reconciled"); - Ok(()) - } +/// Map a runtime `ExecutorError` into `anyhow` at the agent boundary. +fn rt(e: harmony::executors::ExecutorError) -> anyhow::Error { + anyhow::anyhow!("container runtime: {e}") } /// Extract the deployment name from a NATS KV key of the form @@ -277,25 +354,168 @@ fn short(s: &str) -> String { #[cfg(test)] mod tests { - //! Focused tests for transition detection. Drive `apply_phase` / - //! `drop_phase` directly with an inert topology (no real podman - //! socket) and a `None` FleetPublisher. + //! Drives the reconciler against an in-memory [`FakeRuntime`] (no podman + //! socket) and a `None` FleetPublisher, so the convergence logic — the + //! loop-fix and the graceful roll-forward — is tested deterministically. use super::*; - use harmony::inventory::Inventory; - use harmony::modules::podman::PodmanTopology; - use std::path::PathBuf; + use harmony::executors::ExecutorError; + use harmony::modules::podman::{PodmanService, ReconcileScore}; + use harmony::topology::{ContainerObservation, ContainerState}; + use std::sync::Mutex as StdMutex; - fn reconciler() -> Reconciler { - let topology = Arc::new( - PodmanTopology::from_unix_socket(PathBuf::from("/nonexistent/for-tests")).unwrap(), - ); - let inventory = Arc::new(Inventory::empty()); - Reconciler::new( + #[derive(Default)] + struct FakeState { + /// name → (id, running). + containers: HashMap, + next_id: usize, + started: Vec, + stopped: Vec, + fail_next_start: bool, + } + + #[derive(Default)] + struct FakeRuntime { + inner: StdMutex, + } + + impl FakeRuntime { + fn started(&self) -> Vec { + self.inner.lock().unwrap().started.clone() + } + fn stopped(&self) -> Vec { + self.inner.lock().unwrap().stopped.clone() + } + fn fail_next_start(&self) { + self.inner.lock().unwrap().fail_next_start = true; + } + } + + #[async_trait::async_trait] + impl ContainerRuntime for FakeRuntime { + async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> { + self.start_service(spec).await.map(|_| ()) + } + async fn start_service(&self, spec: &ContainerSpec) -> Result { + let mut s = self.inner.lock().unwrap(); + if s.fail_next_start { + s.fail_next_start = false; + return Err(ExecutorError::UnexpectedError("start failed".into())); + } + s.next_id += 1; + let id = format!("id{}", s.next_id); + s.containers.insert(spec.name.clone(), (id.clone(), true)); + s.started.push(id.clone()); + Ok(id) + } + async fn container_status( + &self, + name: &str, + ) -> Result, ExecutorError> { + let s = self.inner.lock().unwrap(); + Ok(s.containers + .get(name) + .map(|(id, running)| ContainerObservation { + id: id.clone(), + running: *running, + })) + } + async fn stop_service(&self, id_or_name: &str, _grace: u64) -> Result<(), ExecutorError> { + let mut s = self.inner.lock().unwrap(); + s.stopped.push(id_or_name.to_string()); + s.containers + .retain(|name, (id, _)| name != id_or_name && id != id_or_name); + Ok(()) + } + async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> { + self.stop_service(name, 0).await + } + async fn list_managed_services(&self) -> Result, ExecutorError> { + Ok(vec![]) + } + } + + fn reconciler() -> (Reconciler, Arc) { + let fake = Arc::new(FakeRuntime::default()); + let r = Reconciler::new( Id::from("test-device".to_string()), - topology, - inventory, + fake.clone() as Arc, None, - ) + ); + (r, fake) + } + + fn score_bytes(image: &str) -> Vec { + let score = PodmanV0Score { + services: vec![PodmanService { + name: "web".to_string(), + image: image.to_string(), + ports: vec![], + env: vec![EnvVar::new("LOG", "info")], + volumes: vec![], + restart_policy: Default::default(), + }], + lifecycle: None, + }; + serde_json::to_vec(&ReconcileScore::PodmanV0(score)).unwrap() + } + + use harmony::topology::EnvVar; + + #[tokio::test] + async fn tick_is_idempotent_no_recreate() { + // The loop-fix: a service WITH env (which the old matches_spec declared + // "always drifted") must not be recreated by the periodic tick. + let (r, fake) = reconciler(); + r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap(); + assert_eq!(fake.started().len(), 1, "initial start"); + for _ in 0..3 { + r.tick().await.unwrap(); + } + assert_eq!( + fake.started().len(), + 1, + "tick must not recreate a running container" + ); + assert!( + fake.stopped().is_empty(), + "tick must not stop a healthy container" + ); + } + + #[tokio::test] + async fn changed_score_gracefully_replaces_by_id() { + let (r, fake) = reconciler(); + r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap(); + r.apply("dev.web", &score_bytes("nginx:2")).await.unwrap(); + // Old container (id1) stopped gracefully, new one (id2) started. + assert_eq!(fake.started(), vec!["id1", "id2"]); + assert_eq!(fake.stopped(), vec!["id1"]); + } + + #[tokio::test] + async fn unchanged_score_is_a_noop() { + let (r, fake) = reconciler(); + r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap(); + r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap(); + assert_eq!(fake.started().len(), 1, "byte-identical re-apply is a noop"); + } + + #[tokio::test] + async fn roll_forward_failure_reports_failed_without_revert() { + let (r, fake) = reconciler(); + r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap(); + fake.fail_next_start(); + let res = r.apply("dev.web", &score_bytes("nginx:2")).await; + assert!(res.is_err(), "upgrade failure surfaces"); + // Old was stopped; new start failed; NO revert (roll-forward only). + assert_eq!(fake.stopped(), vec!["id1"]); + assert_eq!( + fake.started(), + vec!["id1"], + "no revert restart of the old version" + ); + let phases = r.deployments.lock().await; + assert_eq!(phases.get(&dn("web")).map(|(p, _)| *p), Some(Phase::Failed)); } fn dn(s: &str) -> DeploymentName { @@ -304,36 +524,63 @@ mod tests { #[tokio::test] async fn apply_phase_records_new_phase() { - let r = reconciler(); - r.apply_phase(&dn("hello"), Phase::Running, None).await; + let (r, _) = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None, vec![]) + .await; let phases = r.deployments.lock().await; - assert_eq!(phases.get(&dn("hello")), Some(&Phase::Running)); + assert_eq!( + phases.get(&dn("hello")).map(|(p, _)| *p), + Some(Phase::Running) + ); } #[tokio::test] - async fn apply_phase_idempotent_for_same_phase() { - let r = reconciler(); - r.apply_phase(&dn("hello"), Phase::Running, None).await; - r.apply_phase(&dn("hello"), Phase::Running, None).await; + async fn apply_phase_idempotent_for_same_phase_and_ids() { + let (r, _) = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()]) + .await; + r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()]) + .await; let phases = r.deployments.lock().await; assert_eq!(phases.len(), 1); } #[tokio::test] - async fn apply_phase_transitions_update_phase() { - let r = reconciler(); - r.apply_phase(&dn("hello"), Phase::Pending, None).await; - r.apply_phase(&dn("hello"), Phase::Running, None).await; - r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string())) + async fn apply_phase_republishes_when_container_id_changes() { + let (r, _) = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None, vec!["old".into()]) + .await; + // Same phase, new id (after an upgrade) → recorded. + r.apply_phase(&dn("hello"), Phase::Running, None, vec!["new".into()]) .await; let phases = r.deployments.lock().await; - assert_eq!(phases.get(&dn("hello")), Some(&Phase::Failed)); + assert_eq!( + phases.get(&dn("hello")), + Some(&(Phase::Running, vec!["new".to_string()])) + ); + } + + #[tokio::test] + async fn apply_phase_transitions_update_phase() { + let (r, _) = reconciler(); + r.apply_phase(&dn("hello"), Phase::Pending, None, vec![]) + .await; + r.apply_phase(&dn("hello"), Phase::Running, None, vec![]) + .await; + r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()), vec![]) + .await; + let phases = r.deployments.lock().await; + assert_eq!( + phases.get(&dn("hello")).map(|(p, _)| *p), + Some(Phase::Failed) + ); } #[tokio::test] async fn drop_phase_clears_known_deployment() { - let r = reconciler(); - r.apply_phase(&dn("hello"), Phase::Running, None).await; + let (r, _) = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None, vec![]) + .await; r.drop_phase(&dn("hello")).await; let phases = r.deployments.lock().await; assert!(!phases.contains_key(&dn("hello"))); @@ -341,7 +588,7 @@ mod tests { #[tokio::test] async fn drop_phase_on_unknown_deployment_is_noop() { - let r = reconciler(); + let (r, _) = reconciler(); r.drop_phase(&dn("never-existed")).await; let phases = r.deployments.lock().await; assert!(phases.is_empty()); diff --git a/fleet/harmony-fleet-e2e/tests/operator.rs b/fleet/harmony-fleet-e2e/tests/operator.rs index 9d3c3c9e..8f5492f2 100644 --- a/fleet/harmony-fleet-e2e/tests/operator.rs +++ b/fleet/harmony-fleet-e2e/tests/operator.rs @@ -137,6 +137,7 @@ async fn create_fleet_deployment(deployments: &Api, name: &str) -> a volumes: vec![], restart_policy: Default::default(), }], + lifecycle: None, }), rollout: Rollout { strategy: RolloutStrategy::Immediate, diff --git a/fleet/harmony-fleet-e2e/tests/operator_recovery.rs b/fleet/harmony-fleet-e2e/tests/operator_recovery.rs index ef6543b3..e3d17834 100644 --- a/fleet/harmony-fleet-e2e/tests/operator_recovery.rs +++ b/fleet/harmony-fleet-e2e/tests/operator_recovery.rs @@ -125,6 +125,7 @@ fn podman_score() -> PodmanV0Score { volumes: vec![], restart_policy: Default::default(), }], + lifecycle: None, } } @@ -248,6 +249,7 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> { phase: Phase::Running, last_event_at: Utc::now(), last_error: None, + container_ids: vec![], }) .await?; kill(h1).await; diff --git a/fleet/harmony-fleet-e2e/tests/vm_deploy_lifecycle.rs b/fleet/harmony-fleet-e2e/tests/vm_deploy_lifecycle.rs index c27fd552..40515351 100644 --- a/fleet/harmony-fleet-e2e/tests/vm_deploy_lifecycle.rs +++ b/fleet/harmony-fleet-e2e/tests/vm_deploy_lifecycle.rs @@ -185,5 +185,6 @@ fn podman_score(image_tag: &str) -> PodmanV0Score { volumes: vec![], restart_policy: RestartPolicy::default(), }], + lifecycle: None, } } diff --git a/fleet/harmony-fleet-e2e/tests/vm_isolation.rs b/fleet/harmony-fleet-e2e/tests/vm_isolation.rs index 88154fa1..fca26da5 100644 --- a/fleet/harmony-fleet-e2e/tests/vm_isolation.rs +++ b/fleet/harmony-fleet-e2e/tests/vm_isolation.rs @@ -92,6 +92,7 @@ async fn agent_ignores_other_devices_keys() -> anyhow::Result<()> { volumes: vec![], restart_policy: RestartPolicy::default(), }], + lifecycle: None, }; admin.put_podman_at_key(&foreign_key, &score).await?; diff --git a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs index 3cede511..aeff9bbd 100644 --- a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs +++ b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs @@ -818,6 +818,7 @@ mod tests { phase, last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(), last_error: None, + container_ids: vec![], } } diff --git a/fleet/harmony-fleet-operator/src/service/real.rs b/fleet/harmony-fleet-operator/src/service/real.rs index a4e7276a..6a3e9097 100644 --- a/fleet/harmony-fleet-operator/src/service/real.rs +++ b/fleet/harmony-fleet-operator/src/service/real.rs @@ -652,6 +652,7 @@ mod tests { volumes: vec![], restart_policy: Default::default(), }], + lifecycle: None, }), rollout: Rollout { strategy: RolloutStrategy::Immediate, @@ -705,6 +706,7 @@ mod tests { volumes: vec![], restart_policy: Default::default(), }], + lifecycle: None, }); assert_eq!(first_service_name(&score).as_deref(), Some("web")); } @@ -720,6 +722,7 @@ mod tests { volumes: vec![], restart_policy: Default::default(), }], + lifecycle: None, }); assert_eq!(deployment_version(&score), "1.25"); } diff --git a/harmony-reconciler-contracts/src/fleet.rs b/harmony-reconciler-contracts/src/fleet.rs index 19500bb6..c27e8a72 100644 --- a/harmony-reconciler-contracts/src/fleet.rs +++ b/harmony-reconciler-contracts/src/fleet.rs @@ -131,6 +131,12 @@ pub struct DeploymentState { pub last_event_at: DateTime, #[serde(default)] pub last_error: Option, + /// Runtime container ids backing this deployment on the device, one per + /// service (v0.3 Ch5). The agent's identity handle for graceful replacement + /// and liveness; surfaced for the dashboard. `#[serde(default)]` for + /// wire-compat with pre-Ch5 agents that don't report it. + #[serde(default)] + pub container_ids: Vec, } /// Tiny liveness ping. Written to KV key `heartbeat.` in @@ -233,6 +239,7 @@ mod tests { phase: Phase::Failed, last_event_at: ts("2026-04-22T10:05:00Z"), last_error: Some("image pull 429".to_string()), + container_ids: vec![], }; let json = serde_json::to_string(&original).unwrap(); let back: DeploymentState = serde_json::from_str(&json).unwrap(); diff --git a/harmony/src/domain/topology/container_runtime.rs b/harmony/src/domain/topology/container_runtime.rs index a5f7a79f..20c12129 100644 --- a/harmony/src/domain/topology/container_runtime.rs +++ b/harmony/src/domain/topology/container_runtime.rs @@ -21,13 +21,36 @@ use crate::executors::ExecutorError; /// capability will be clearer than stretching this one. #[async_trait] pub trait ContainerRuntime: Send + Sync { - /// Ensure a container matching `spec` is running. Idempotent: running the - /// same spec twice must converge to the same state with no observable - /// side effects beyond logs. If a container with the same `name` already - /// exists with the same image and port mappings, this is a NOOP. - /// Otherwise it is stopped, removed, and recreated. + /// Ensure a container matching `spec` is running. Idempotent and + /// **liveness-only**: if a container with this `name` is already running, + /// it's a NOOP — the runtime does **not** compare the spec (a polling + /// reconciler would otherwise re-create on every tick for fields it can't + /// read back). Spec changes are driven through [`Self::replace_service`] by + /// a stateful caller that knows the desired state changed. If no container + /// is running, one is (re)created from `spec`. async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError>; + /// Create + start a container from `spec`, returning its runtime id. Any + /// existing container with the same name is force-removed first. The id is + /// the caller's handle for liveness checks and targeted replacement. + async fn start_service(&self, spec: &ContainerSpec) -> Result; + + /// Observe the container with this name: its id + whether it's running. + /// `None` if no such container exists. + async fn container_status( + &self, + name: &str, + ) -> Result, ExecutorError>; + + /// Gracefully stop and remove a container by id or name: send the + /// container's stop signal, wait `grace_period_secs`, then SIGKILL + /// (podman's stop semantics), then remove. Idempotent — no error if gone. + async fn stop_service( + &self, + id_or_name: &str, + grace_period_secs: u64, + ) -> Result<(), ExecutorError>; + /// Stop and remove the container with this name if it exists. No error /// if the container does not exist — idempotent. async fn remove_service(&self, name: &str) -> Result<(), ExecutorError>; @@ -38,6 +61,37 @@ pub trait ContainerRuntime: Send + Sync { async fn list_managed_services(&self) -> Result, ExecutorError>; } +/// Graceful-shutdown policy for a deployment's container during a roll-forward +/// upgrade (v0.3 Ch5). Mirrors K8s/`docker stop` semantics: send `stop_signal`, +/// wait `grace_period_secs`, then SIGKILL (podman's stop always force-kills +/// after the timeout — there's no "never kill" knob, so we don't pretend one). +#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct LifecyclePolicy { + /// Signal sent first to ask the process to exit cleanly (e.g. `SIGTERM`). + pub stop_signal: String, + /// Seconds to wait for a clean exit before SIGKILL. + pub grace_period_secs: u64, +} + +impl Default for LifecyclePolicy { + fn default() -> Self { + Self { + stop_signal: "SIGTERM".to_string(), + grace_period_secs: 30, + } + } +} + +/// A container's identity + liveness, as observed on the runtime. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct ContainerObservation { + /// Runtime container id. + pub id: String, + /// Whether it's currently running. + pub running: bool, +} + /// A single environment variable, name/value pair. /// /// Modeled as a struct rather than `(String, String)` because k8s CRD @@ -98,6 +152,11 @@ pub struct ContainerSpec { /// Restart policy on container exit. Mirrors podman/docker semantics. #[serde(default)] pub restart_policy: RestartPolicy, + /// Signal sent on graceful stop (from the deployment's `LifecyclePolicy`). + /// Baked into the container at create time so `podman stop` uses it. `None` + /// = the image/podman default (`SIGTERM`). + #[serde(default)] + pub stop_signal: Option, } impl ContainerSpec { diff --git a/harmony/src/modules/podman/interpret.rs b/harmony/src/modules/podman/interpret.rs index ec986685..7ef9f519 100644 --- a/harmony/src/modules/podman/interpret.rs +++ b/harmony/src/modules/podman/interpret.rs @@ -5,7 +5,7 @@ use crate::{ data::Version, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, - topology::{ContainerRuntime, ContainerSpec, Topology}, + topology::{ContainerRuntime, Topology}, }; use super::score::PodmanV0Score; @@ -54,15 +54,7 @@ impl Interpret for PodmanV0Interpret { topology: &T, ) -> Result { for service in &self.score.services { - let spec = ContainerSpec { - name: service.name.clone(), - image: service.image.clone(), - ports: service.ports.clone(), - labels: vec![(DEPLOYMENT_LABEL.to_string(), self.score.deployment_label())], - env: service.env.clone(), - volumes: service.volumes.clone(), - restart_policy: service.restart_policy, - }; + let spec = self.score.container_spec(service); topology.ensure_service_running(&spec).await.map_err(|e| { InterpretError::new(format!( "PodmanV0: ensure_service_running({}) failed: {e}", diff --git a/harmony/src/modules/podman/score.rs b/harmony/src/modules/podman/score.rs index 9994ebf3..69c4ed7d 100644 --- a/harmony/src/modules/podman/score.rs +++ b/harmony/src/modules/podman/score.rs @@ -13,10 +13,13 @@ use serde::{Deserialize, Serialize}; use crate::{ interpret::Interpret, score::Score, - topology::{ContainerRuntime, EnvVar, RestartPolicy, Topology, VolumeMount}, + topology::{ + ContainerRuntime, ContainerSpec, EnvVar, LifecyclePolicy, RestartPolicy, Topology, + VolumeMount, + }, }; -use super::interpret::PodmanV0Interpret; +use super::interpret::{DEPLOYMENT_LABEL, PodmanV0Interpret}; /// A single container managed by podman on the target host. /// @@ -46,9 +49,36 @@ pub struct PodmanService { #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)] pub struct PodmanV0Score { pub services: Vec, + /// Graceful-shutdown policy used when a changed Score replaces a running + /// container (v0.3 Ch5 roll-forward upgrade). `None` = the default + /// (SIGTERM, 30s grace, SIGKILL fallback). Defaults so older Deployment CRs + /// without the field deserialize unchanged. + #[serde(default)] + pub lifecycle: Option, } impl PodmanV0Score { + /// The effective lifecycle policy (configured, or the default). + pub fn lifecycle_policy(&self) -> LifecyclePolicy { + self.lifecycle.clone().unwrap_or_default() + } + + /// Build the [`ContainerSpec`] for one service: image/ports/env/volumes plus + /// the deployment label and the lifecycle stop signal. One place so the + /// interpret and the agent reconciler agree on the mapping. + pub fn container_spec(&self, service: &PodmanService) -> ContainerSpec { + ContainerSpec { + name: service.name.clone(), + image: service.image.clone(), + ports: service.ports.clone(), + labels: vec![(DEPLOYMENT_LABEL.to_string(), self.deployment_label())], + env: service.env.clone(), + volumes: service.volumes.clone(), + restart_policy: service.restart_policy, + stop_signal: self.lifecycle.as_ref().map(|l| l.stop_signal.clone()), + } + } + /// Label value applied to every container this Score creates. The /// caller uses the label to enumerate the set of containers that /// belong to a given Score instance (e.g. to tear them down when @@ -122,6 +152,7 @@ mod tests { ports: vec!["8080:80".to_string()], ..svc("web", "nginx:latest") }], + lifecycle: None, }); let json = serde_json::to_string(&score).unwrap(); assert!(json.contains("\"type\":\"PodmanV0\"")); @@ -148,6 +179,7 @@ mod tests { ..svc("api", "myapp:1.0") }, ], + lifecycle: None, }); let serialized = serde_json::to_string(&score).unwrap(); let deserialized: ReconcileScore = serde_json::from_str(&serialized).unwrap(); @@ -229,6 +261,7 @@ mod tests { fn deployment_label_joins_service_names() { let score = PodmanV0Score { services: vec![svc("web", "nginx"), svc("api", "myapp")], + lifecycle: None, }; assert_eq!(score.deployment_label(), "web,api"); } diff --git a/harmony/src/modules/podman/topology.rs b/harmony/src/modules/podman/topology.rs index 3aeb53bd..0aecc05e 100644 --- a/harmony/src/modules/podman/topology.rs +++ b/harmony/src/modules/podman/topology.rs @@ -12,8 +12,8 @@ use podman_api::opts::{ }; use crate::domain::topology::{ - ContainerRuntime, ContainerSpec, ContainerState, PreparationError, PreparationOutcome, - RestartPolicy, Topology, VolumeMount, + ContainerObservation, ContainerRuntime, ContainerSpec, ContainerState, PreparationError, + PreparationOutcome, RestartPolicy, Topology, VolumeMount, }; use crate::executors::ExecutorError; @@ -97,49 +97,11 @@ impl PodmanTopology { Err(e) => Err(to_exec_error(e)), } } -} - -#[async_trait] -impl Topology for PodmanTopology { - fn name(&self) -> &str { - &self.name - } - - async fn ensure_ready(&self) -> Result { - // A quick ping — calling info() is the cheapest endpoint that - // exercises the socket end-to-end. - self.podman - .info() - .await - .map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?; - Ok(PreparationOutcome::Noop) - } -} - -#[async_trait] -impl ContainerRuntime for PodmanTopology { - async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> { - let existing = self.get_by_name(&spec.name).await?; - - if let Some(existing) = existing.as_ref() { - if matches_spec(existing, spec) { - let running = existing.state.as_deref() == Some("running"); - if running { - return Ok(()); - } - // Same spec, just not running — start it rather than recreate. - let id = existing.id.clone().unwrap_or_else(|| spec.name.clone()); - self.containers() - .get(id) - .start(None) - .await - .map_err(to_exec_error)?; - return Ok(()); - } - // Drift — remove and recreate below. - self.remove_container(&spec.name).await?; - } + /// Create + start a container from `spec`, returning its id. The container's + /// stop signal + stop timeout are baked in here so a later `podman stop` + /// honors the deployment's lifecycle policy. + async fn create_and_start(&self, spec: &ContainerSpec) -> Result { self.ensure_image_present(&spec.image).await?; let mut labels = HashMap::new(); @@ -171,34 +133,94 @@ impl ContainerRuntime for PodmanTopology { .portmappings(port_mappings) .env(env_map) .restart_policy(map_restart_policy(spec.restart_policy)); + if let Some(sig) = spec.stop_signal.as_deref() { + builder = builder.stop_signal(signal_number(sig)); + } if !mounts.is_empty() { builder = builder.mounts(mounts); } - let opts = builder.build(); - let created = self .containers() - .create(&opts) + .create(&builder.build()) .await .map_err(to_exec_error)?; + let id = created.id; self.containers() - .get(created.id.clone()) + .get(id.clone()) .start(None) .await .map_err(to_exec_error)?; - Ok(()) + Ok(id) + } +} + +#[async_trait] +impl Topology for PodmanTopology { + fn name(&self) -> &str { + &self.name } - async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> { - // First try a graceful stop; ignore failures so removal still happens. + async fn ensure_ready(&self) -> Result { + // A quick ping — calling info() is the cheapest endpoint that + // exercises the socket end-to-end. + self.podman + .info() + .await + .map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?; + Ok(PreparationOutcome::Noop) + } +} + +#[async_trait] +impl ContainerRuntime for PodmanTopology { + async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> { + // Liveness-only: never compare the spec (that's the caller's job via + // byte-compare of the desired state). A container that's already up + // stays up — no per-tick recreate, no flap. + match self.container_status(&spec.name).await? { + Some(obs) if obs.running => Ok(()), + _ => self.start_service(spec).await.map(|_| ()), + } + } + + async fn start_service(&self, spec: &ContainerSpec) -> Result { + // Force-remove any stale container by this name so create can't collide. + self.remove_container(&spec.name).await?; + self.create_and_start(spec).await + } + + async fn container_status( + &self, + name: &str, + ) -> Result, ExecutorError> { + let Some(c) = self.get_by_name(name).await? else { + return Ok(None); + }; + Ok(Some(ContainerObservation { + id: c.id.clone().unwrap_or_default(), + running: c.state.as_deref() == Some("running"), + })) + } + + async fn stop_service( + &self, + id_or_name: &str, + grace_period_secs: u64, + ) -> Result<(), ExecutorError> { + // Graceful: send the container's stop signal, wait `grace`, then podman + // SIGKILLs. Ignore stop failures so removal still happens. let stop_opts = ContainerStopOpts::builder() - .timeout(STOP_TIMEOUT.as_secs() as usize) + .timeout(grace_period_secs as usize) .build(); - let container = self.containers().get(name); + let container = self.containers().get(id_or_name); if container.exists().await.unwrap_or(false) { let _ = container.stop(&stop_opts).await; } - self.remove_container(name).await + self.remove_container(id_or_name).await + } + + async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> { + self.stop_service(name, STOP_TIMEOUT.as_secs()).await } async fn list_managed_services(&self) -> Result, ExecutorError> { @@ -267,71 +289,19 @@ fn parse_port_mapping(raw: &str) -> Result { }) } -fn matches_spec(observed: &podman_api::models::ListContainer, spec: &ContainerSpec) -> bool { - let same_image = observed - .image - .as_deref() - .map(|i| i == spec.image) - .unwrap_or(false); - if !same_image { - return false; +/// Map a signal name (`SIGTERM`) to its number for podman's `stop_signal` +/// create option. Unknown names fall back to SIGTERM (15) — the safe default. +fn signal_number(name: &str) -> i64 { + match name.trim().to_uppercase().trim_start_matches("SIG") { + "HUP" => 1, + "INT" => 2, + "QUIT" => 3, + "KILL" => 9, + "USR1" => 10, + "USR2" => 12, + "TERM" => 15, + other => other.parse().unwrap_or(15), } - let observed_ports = observed.ports.as_deref().unwrap_or(&[]); - if observed_ports.len() != spec.ports.len() { - return false; - } - for raw in &spec.ports { - let Ok(expected) = parse_port_mapping(raw) else { - return false; - }; - let present = observed_ports.iter().any(|p| { - p.host_port == expected.host_port && p.container_port == expected.container_port - }); - if !present { - return false; - } - } - // FIXME(redeploy-loop): this branch makes the agent's periodic - // reconcile non-idempotent for any non-trivial Deployment. - // Symptom: a service with env or volumes is destroyed and - // recreated every 30s tick (RECONCILE_INTERVAL), even when the - // observed container is already correct — operators see flapping - // container IDs, intermittent connectivity blips, log noise. - // - // Root cause: `podman list` (v5.x) doesn't surface env or mounts, - // so we can't compare them; the original author chose to declare - // "any spec with env/volumes is drifted" as a fail-safe. That's - // the wrong default for a polling reconciler — it weaponizes the - // poll into a re-creation loop. - // - // Right fix (out of scope for the demo, in scope for delivery): - // 1. Switch this code path to `containers.get(name).inspect()` - // which DOES return env + mounts. Compare structurally. - // 2. Treat absent fields on the inspect response as "unchanged", - // not "drifted". - // 3. Add an integration test that runs ensure_service_running - // twice on the same spec and asserts the container ID is - // unchanged. - // - // Layered next: the upcoming health-check addition to - // ContainerSpec gives the agent a separate signal to decide - // when to recreate (failed health checks → unhealthy → recreate) - // independent of the spec-drift check. - // - // Until fixed: avoid env / volumes in demo-time deployments to - // dodge the loop. The hello-web nginx demo doesn't have either, - // which is why it's stable. - if !spec.env.is_empty() || !spec.volumes.is_empty() { - return false; - } - // Restart policy: ListContainer doesn't surface it directly. We - // only force a recreate when the spec explicitly asks for something - // other than the default — so unchanged podman-default behaviour - // stays a NOOP, and explicit policy changes converge on next apply. - if spec.restart_policy != RestartPolicy::default() { - return false; - } - true } fn volume_to_mount(v: &VolumeMount) -> ContainerMount {