feat(fleet): graceful roll-forward upgrade + container-ID identity (Ch5) #331

Open
johnride wants to merge 1 commit from feat/fleet-ch5-graceful-deploy-upgrade into feat/fleet-ch4-agent-upgrade
16 changed files with 648 additions and 263 deletions


@@ -0,0 +1,66 @@
# Ch5 — Graceful deployment upgrade (roll-forward) + redeploy-loop fix: status
Two things shipped together because they share the same root: **how the agent
identifies a running container vs. its desired spec**.
## The redeploy-loop fix (the bug JG flagged)
`podman ps` doesn't surface env/volumes, so the old `matches_spec` declared
*any* service with env or volumes "drifted" and re-created it on **every 30s
tick**, causing flapping container ids, connectivity blips, and log noise.
Fix (per JG: "use container IDs, store them in NATS"): identity is the
**container, by name+id**, not a spec comparison.
- `ensure_service_running` is now **liveness-only**: a running container is a
NOOP, no spec compare. The periodic tick adopts-or-(re)starts by name and
never recreates a healthy container. The loop is gone — proven by
`tick_is_idempotent_no_recreate` (a service *with env*, the old loop trigger).
- Spec changes are detected where they always were — the reconciler's
byte-compare of the desired-state JSON on a KV Put — not by the runtime.
- The agent records each container's **id** (from `start_service`) in the cache
and in `DeploymentState.container_ids` (surfaced for the dashboard).
## Graceful roll-forward upgrade
- `PodmanV0Score.lifecycle: Option<LifecyclePolicy>` (`stop_signal=SIGTERM`,
`grace_period_secs=30`). The stop signal is baked into the container at
create; the grace period drives `podman stop --time`, after which podman
always SIGKILLs — so there's no `sigkill_fallback` knob to pretend otherwise
(a config field nothing honors is a defect per AGENTS.md).
- On a changed score the reconciler `converge`s: for each service, **stop the
exact old container by its recorded id** (SIGTERM → grace → SIGKILL), then
start the new one, record the new id. Services dropped from the spec are
stopped. **Roll-forward only**: a failure reports `Phase::Failed` and is never
reverted (the customer edits the spec) — proven by
`roll_forward_failure_reports_failed_without_revert`.
- During the transient cutover window the deployment reports `Phase::Pending`
(its documented "in flight" meaning) and then `Running`, so the dashboard
reflects the step without a new Phase variant.
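
The roll-forward steps above can be sketched roughly as follows. This is a simplified, synchronous illustration, not the agent's code: the `FakeRuntime` here and the `converge` signature (service names instead of a full score) are assumptions, and first-seen services are started directly rather than going through adopt-or-start.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory runtime that records every start and stop.
#[derive(Default)]
struct FakeRuntime {
    next_id: usize,
    started: Vec<String>,
    stopped: Vec<String>,
    fail_next_start: bool,
}

impl FakeRuntime {
    fn start(&mut self, _name: &str) -> Result<String, String> {
        if self.fail_next_start {
            self.fail_next_start = false;
            return Err("start failed".to_string());
        }
        self.next_id += 1;
        let id = format!("id{}", self.next_id);
        self.started.push(id.clone());
        Ok(id)
    }

    /// Stands in for "stop signal, wait for the grace period, then SIGKILL".
    fn stop(&mut self, id: &str, _grace_period_secs: u64) {
        self.stopped.push(id.to_string());
    }
}

/// Roll forward to `desired`: for each service, stop the recorded old id
/// (if any) and then start the replacement; finally stop services that were
/// dropped. A start error is returned as-is and nothing is reverted.
fn converge(
    rt: &mut FakeRuntime,
    desired: &[&str],
    previous: &HashMap<String, String>,
    grace_period_secs: u64,
) -> Result<HashMap<String, String>, String> {
    let mut ids = HashMap::new();
    for name in desired {
        if let Some(old_id) = previous.get(*name) {
            rt.stop(old_id, grace_period_secs);
        }
        ids.insert(name.to_string(), rt.start(name)?);
    }
    for (old_name, old_id) in previous {
        if !desired.iter().any(|d| *d == old_name.as_str()) {
            rt.stop(old_id, grace_period_secs);
        }
    }
    Ok(ids)
}
```

With this shape, a failed start leaves the old container stopped and the new one absent, which is the state the caller would report as `Failed` until the spec is edited.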
## Architecture note
The graceful-replace orchestration lives in the **agent reconciler** (which
holds the cross-reconcile state: cached score + container ids), driving the
`ContainerRuntime` capability directly. The `PodmanV0Score` interpret stays the
"ensure running" path for non-agent/initial converge. `container_spec` is the
one place both share the `PodmanService → ContainerSpec` mapping. The reconciler
is now generic over `dyn ContainerRuntime`, so the convergence logic is unit-
tested against a `FakeRuntime` (no podman socket).
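
To make the "cross-reconcile state" concrete, here is a small sketch of how a cached desired-state can decide what a Put triggers. `ScoreCache`, `Decision`, and their methods are hypothetical names for illustration; the reconciler in this PR keeps the serialized score, the parsed score, and the container ids together in one cache entry.

```rust
use std::collections::HashMap;

/// What a Put should trigger, decided only from the cached bytes.
#[derive(Debug, PartialEq)]
enum Decision {
    /// Byte-identical to the last applied desired state.
    Noop,
    /// Key never seen before: start its services.
    FirstDeploy,
    /// Key seen with different bytes: graceful replace.
    RollForward,
}

/// Hypothetical cache: KV key -> last applied desired-state bytes.
#[derive(Default)]
struct ScoreCache {
    last_seen: HashMap<String, Vec<u8>>,
}

impl ScoreCache {
    /// Classify an incoming Put without touching the container runtime.
    fn classify(&self, key: &str, incoming: &[u8]) -> Decision {
        match self.last_seen.get(key) {
            Some(prev) if prev.as_slice() == incoming => Decision::Noop,
            Some(_) => Decision::RollForward,
            None => Decision::FirstDeploy,
        }
    }

    /// Remember the bytes once they have been applied successfully.
    fn record(&mut self, key: &str, applied: &[u8]) {
        self.last_seen.insert(key.to_string(), applied.to_vec());
    }
}
```

One consequence of comparing bytes rather than parsed values is that a semantically equal payload with different formatting or key order counts as a change, so whatever writes the desired state should serialize it deterministically.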
## Tested
- `tick_is_idempotent_no_recreate` — the loop-fix.
- `changed_score_gracefully_replaces_by_id` — stop-old-by-id then start-new.
- `unchanged_score_is_a_noop`.
- `roll_forward_failure_reports_failed_without_revert`.
- Plus the existing phase-transition tests, all green.
## Flagged for a supervised run
- **VM e2e v1→v2→v3 with controlled failures** (`harmony-fleet-e2e/src/vm`).
Needs multiple image tags reachable from the KVM guest, like Ch4's upgrade
e2e. The convergence logic is unit-proven against the fake runtime; this is
the on-hardware integration proof. The existing `vm_deploy_lifecycle.rs`
covers single-deploy.
- Surfacing `container_ids` in the dashboard device view (data is already on
`DeploymentState`).


@@ -423,6 +423,7 @@ async fn apply_one_cr(
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
}),
rollout: Rollout {
strategy: RolloutStrategy::Immediate,
@@ -496,6 +497,7 @@ async fn simulate_state_loop(
last_event_at: Utc::now(),
last_error: matches!(phase, Phase::Failed)
.then(|| format!("synthetic failure @{}", device.device_id)),
container_ids: vec![],
};
match serde_json::to_vec(&ds) {
Ok(payload) => match bucket.put(&state_key, payload.into()).await {


@@ -207,6 +207,7 @@ fn build_cr(cli: &Cli) -> Deployment {
volumes,
restart_policy: cli.restart.into(),
}],
lifecycle: None,
};
let payload = ReconcileScore::PodmanV0(score);


@@ -298,7 +298,6 @@ async fn main() -> Result<()> {
Arc::new(Reconciler::new(
device_id.clone(),
t.clone(),
inventory.clone(),
Some(fleet.clone()),
))
});


@@ -7,76 +7,69 @@ use chrono::Utc;
use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase};
use tokio::sync::Mutex;
use harmony::inventory::Inventory;
use harmony::modules::podman::{PodmanTopology, PodmanV0Score, ReconcileScore};
use harmony::score::Score;
use harmony::modules::podman::{PodmanV0Score, ReconcileScore};
use harmony::topology::{ContainerRuntime, ContainerSpec};
use crate::fleet_publisher::FleetPublisher;
/// Cache key → last-seen state, populated by `apply` and consulted by the
/// 30-second periodic tick and the delete path.
struct CachedEntry {
/// Serialized score JSON. Used for string-compare idempotency per
/// ROADMAP §5.5 — cheaper and more deterministic than a hash.
/// Serialized score JSON. Byte-compared for idempotency: a re-delivered
/// identical desired-state is a NOOP; a changed one drives a graceful
/// roll-forward upgrade.
serialized: String,
/// Parsed score. Cached so the periodic reconcile tick and delete
/// handlers don't have to re-parse the JSON.
/// Parsed score, so the tick / delete paths don't re-parse.
score: PodmanV0Score,
/// Runtime container id per service name (v0.3 Ch5). The identity handle:
/// the tick checks these are alive (not the spec — which caused the old 30s
/// redeploy loop), and an upgrade stops the exact old container by id.
container_ids: HashMap<String, String>,
}
pub struct Reconciler {
device_id: Id,
topology: Arc<PodmanTopology>,
inventory: Arc<Inventory>,
/// Keyed by NATS KV key (`<device>.<deployment>`). A single entry per
/// KV key — in v0 there is no fan-out from one key to many scores.
topology: Arc<dyn ContainerRuntime>,
/// Keyed by NATS KV key (`<device>.<deployment>`). One entry per key.
state: Mutex<HashMap<String, CachedEntry>>,
/// Current phase per deployment, used to decide whether a new
/// write to the `device-state` KV is needed.
///
/// NOTE : this feels dangerous, conflict on deployment name could be a problem
/// We must explore this and clarify it in the design and decide if it is a constraint
deployments: Mutex<HashMap<DeploymentName, Phase>>,
/// Publish surface. Optional so unit tests without a live NATS
/// client still work; always populated in the real agent runtime.
/// Last (phase, container_ids) published per deployment, to skip
/// re-publishing unchanged device-state.
deployments: Mutex<HashMap<DeploymentName, (Phase, Vec<String>)>>,
/// Publish surface. Optional so unit tests without a live NATS client work.
fleet: Option<Arc<FleetPublisher>>,
}
impl Reconciler {
pub fn new(
device_id: Id,
topology: Arc<PodmanTopology>,
inventory: Arc<Inventory>,
topology: Arc<dyn ContainerRuntime>,
fleet: Option<Arc<FleetPublisher>>,
) -> Self {
Self {
device_id,
topology,
inventory,
state: Mutex::new(HashMap::new()),
deployments: Mutex::new(HashMap::new()),
fleet,
}
}
/// Record a new phase for a deployment and, if it changed, write
/// the updated [`DeploymentState`] to the KV. Same-phase
/// re-confirmations are no-ops so the periodic reconcile tick
/// doesn't churn the bucket.
/// Record a phase (+ backing container ids) for a deployment and publish the
/// [`DeploymentState`] when it changed. Same (phase, ids) is a no-op so the
/// periodic tick doesn't churn the bucket.
async fn apply_phase(
&self,
deployment: &DeploymentName,
phase: Phase,
last_error: Option<String>,
container_ids: Vec<String>,
) {
{
let mut phases = self.deployments.lock().await;
// performance nitpick : we don't need a write lock here, we could check before acquiring the write
// lock
if phases.get(deployment).copied() == Some(phase) {
if phases.get(deployment) == Some(&(phase, container_ids.clone())) {
return;
}
phases.insert(deployment.clone(), phase);
phases.insert(deployment.clone(), (phase, container_ids.clone()));
}
if let Some(publisher) = &self.fleet {
@@ -86,14 +79,12 @@ impl Reconciler {
phase,
last_event_at: Utc::now(),
last_error,
container_ids,
};
publisher.write_deployment_state(&state).await;
}
}
/// Clear the in-memory phase for a deployment and delete its KV
/// entry. Idempotent: a delete for a never-applied deployment is
/// a no-op in memory and a harmless tombstone write on the wire.
async fn drop_phase(&self, deployment: &DeploymentName) {
let was_known = {
let mut phases = self.deployments.lock().await;
@@ -107,9 +98,26 @@ impl Reconciler {
}
}
/// Handle a Put event (new or updated score on NATS KV). No-ops if the
/// serialized score is byte-identical to the last-seen value for this
/// key.
/// Adopt-or-start: if a container with this name is already running, return
/// its id (no recreate — survives an agent restart without flapping the
/// workload); otherwise (re)create it from `spec` and return the new id.
async fn ensure_started(&self, spec: &ContainerSpec) -> Result<String> {
match self
.topology
.container_status(&spec.name)
.await
.map_err(rt)?
{
Some(obs) if obs.running => Ok(obs.id),
_ => self.topology.start_service(spec).await.map_err(rt),
}
}
/// Handle a Put: NOOP if byte-identical to the last-seen score; a new key
/// starts its services; a changed key **gracefully rolls forward** — stop
/// each old container (SIGTERM → grace → SIGKILL per the score's
/// `LifecyclePolicy`), then start the new one. Roll-forward only: a failure
/// is reported as `Failed`, never reverted (the customer edits the spec).
pub async fn apply(&self, key: &str, value: &[u8]) -> Result<()> {
let deployment = deployment_from_key(key);
let incoming = match serde_json::from_slice::<ReconcileScore>(value) {
@@ -117,79 +125,135 @@ impl Reconciler {
Err(e) => {
tracing::warn!(key, error = %e, "failed to deserialize score");
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Failed, Some(format!("bad payload: {e}")))
.await;
self.apply_phase(
name,
Phase::Failed,
Some(format!("bad payload: {e}")),
vec![],
)
.await;
}
return Ok(());
}
};
let serialized = String::from_utf8_lossy(value).into_owned();
{
let previous = {
let state = self.state.lock().await;
if let Some(existing) = state.get(key) {
if existing.serialized == serialized {
match state.get(key) {
Some(existing) if existing.serialized == serialized => {
tracing::debug!(key, "score unchanged — noop");
return Ok(());
}
Some(existing) => Some(existing.container_ids.clone()),
None => None,
}
}
};
// Upgrade in flight → Pending (the transient cutover window).
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Pending, None).await;
self.apply_phase(name, Phase::Pending, None, vec![]).await;
}
match self.run_score(key, &incoming).await {
Ok(()) => {
match self.converge(&incoming, previous.as_ref()).await {
Ok(ids) => {
let id_list: Vec<String> = incoming
.services
.iter()
.filter_map(|s| ids.get(&s.name).cloned())
.collect();
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Running, None).await;
self.apply_phase(name, Phase::Running, None, id_list).await;
}
let mut state = self.state.lock().await;
state.insert(
key.to_string(),
CachedEntry {
serialized,
score: incoming,
container_ids: ids,
},
);
Ok(())
}
Err(e) => {
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())), vec![])
.await;
}
return Err(e);
Err(e)
}
}
let mut state = self.state.lock().await;
state.insert(
key.to_string(),
CachedEntry {
serialized,
score: incoming,
},
);
Ok(())
}
/// Handle a Delete/Purge event. Stops and removes every container
/// referenced by the last cached score for this key. Idempotent: if we
/// never saw a Put for this key (agent restart after delete), logs and
/// returns ok.
/// Drive actual → desired for one score. New deployment: start each service.
/// Changed: stop the old container by id (graceful) then start the new one,
/// and remove services dropped from the spec. Returns service → new id.
async fn converge(
&self,
score: &PodmanV0Score,
previous: Option<&HashMap<String, String>>,
) -> Result<HashMap<String, String>> {
let lifecycle = score.lifecycle_policy();
let mut ids = HashMap::new();
for service in &score.services {
let spec = score.container_spec(service);
match previous.and_then(|p| p.get(&service.name)) {
// Existing service whose spec changed → graceful replace by id.
Some(old_id) => {
self.topology
.stop_service(old_id, lifecycle.grace_period_secs)
.await
.map_err(rt)?;
let id = self.topology.start_service(&spec).await.map_err(rt)?;
ids.insert(service.name.clone(), id);
}
// New (or first-seen) service → adopt-or-start.
None => {
ids.insert(service.name.clone(), self.ensure_started(&spec).await?);
}
}
}
// Services removed from the spec: gracefully stop their old containers.
if let Some(prev) = previous {
for (old_name, old_id) in prev {
if !score.services.iter().any(|s| &s.name == old_name) {
if let Err(e) = self
.topology
.stop_service(old_id, lifecycle.grace_period_secs)
.await
{
tracing::warn!(service = %old_name, error = %e, "failed to stop removed service");
}
}
}
}
Ok(ids)
}
/// Handle a Delete/Purge: gracefully stop+remove every container for the
/// last cached score. Idempotent.
pub async fn remove(&self, key: &str) -> Result<()> {
let deployment = deployment_from_key(key);
let mut state = self.state.lock().await;
let Some(entry) = state.remove(key) else {
let entry = self.state.lock().await.remove(key);
let Some(entry) = entry else {
tracing::info!(key, "delete for unknown key — nothing to remove");
if let Some(name) = &deployment {
self.drop_phase(name).await;
}
return Ok(());
};
drop(state);
use harmony::topology::ContainerRuntime;
let grace = entry.score.lifecycle_policy().grace_period_secs;
for service in &entry.score.services {
if let Err(e) = self.topology.remove_service(&service.name).await {
tracing::warn!(
key,
service = %service.name,
error = %e,
"failed to remove container"
);
// Prefer the recorded id; fall back to the service name.
let target = entry
.container_ids
.get(&service.name)
.cloned()
.unwrap_or_else(|| service.name.clone());
if let Err(e) = self.topology.stop_service(&target, grace).await {
tracing::warn!(key, service = %service.name, error = %e, "failed to remove container");
} else {
tracing::info!(key, service = %service.name, "removed container");
}
@@ -200,11 +264,10 @@ impl Reconciler {
Ok(())
}
/// Periodic ground-truth reconcile. ROADMAP §5.6 — "polling instead of
/// event-driven PLEG. Agent polls podman every 30s as ground truth;
/// KV watch events are accelerators." Re-runs each cached score against
/// podman-api; the underlying `ensure_service_running` is idempotent
/// so a converged state produces no log noise.
/// Periodic ground-truth reconcile. **Liveness only**: verify each cached
/// deployment's containers are up (adopt-or-start), never compare the spec
/// — that's what killed the old 30s redeploy loop. Spec changes arrive as a
/// KV Put and go through `apply`'s graceful path instead.
pub async fn tick(&self) -> Result<()> {
let snapshot: Vec<(String, PodmanV0Score)> = {
let state = self.state.lock().await;
@@ -215,18 +278,37 @@ impl Reconciler {
};
for (key, score) in snapshot {
let deployment = deployment_from_key(&key);
match self.run_score(&key, &score).await {
Ok(()) => {
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Running, None).await;
let mut ids = HashMap::new();
let mut error: Option<String> = None;
for service in &score.services {
match self.ensure_started(&score.container_spec(service)).await {
Ok(id) => {
ids.insert(service.name.clone(), id);
}
Err(e) => {
tracing::warn!(key, error = %e, "periodic reconcile failed");
error = Some(short(&e.to_string()));
break;
}
}
Err(e) => {
tracing::warn!(key, error = %e, "periodic reconcile failed");
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
.await;
}
// Refresh the cached ids so a recreated container's id is recorded.
if error.is_none() {
if let Some(entry) = self.state.lock().await.get_mut(&key) {
entry.container_ids = ids.clone();
}
}
if let Some(name) = &deployment {
match error {
None => {
let id_list = score
.services
.iter()
.filter_map(|s| ids.get(&s.name).cloned())
.collect();
self.apply_phase(name, Phase::Running, None, id_list).await;
}
Some(e) => self.apply_phase(name, Phase::Failed, Some(e), vec![]).await,
}
}
}
@@ -243,16 +325,11 @@ impl Reconciler {
}
}
}
}
async fn run_score(&self, key: &str, score: &PodmanV0Score) -> Result<()> {
let interpret = Score::<PodmanTopology>::create_interpret(score);
let outcome = interpret
.execute(&self.inventory, &self.topology)
.await
.map_err(|e| anyhow::anyhow!("PodmanV0Score interpret failed for {key}: {e}"))?;
tracing::info!(key, outcome = ?outcome, "reconciled");
Ok(())
}
/// Map a runtime `ExecutorError` into `anyhow` at the agent boundary.
fn rt(e: harmony::executors::ExecutorError) -> anyhow::Error {
anyhow::anyhow!("container runtime: {e}")
}
/// Extract the deployment name from a NATS KV key of the form
@@ -277,25 +354,168 @@ fn short(s: &str) -> String {
#[cfg(test)]
mod tests {
//! Focused tests for transition detection. Drive `apply_phase` /
//! `drop_phase` directly with an inert topology (no real podman
//! socket) and a `None` FleetPublisher.
//! Drives the reconciler against an in-memory [`FakeRuntime`] (no podman
//! socket) and a `None` FleetPublisher, so the convergence logic — the
//! loop-fix and the graceful roll-forward — is tested deterministically.
use super::*;
use harmony::inventory::Inventory;
use harmony::modules::podman::PodmanTopology;
use std::path::PathBuf;
use harmony::executors::ExecutorError;
use harmony::modules::podman::{PodmanService, ReconcileScore};
use harmony::topology::{ContainerObservation, ContainerState};
use std::sync::Mutex as StdMutex;
fn reconciler() -> Reconciler {
let topology = Arc::new(
PodmanTopology::from_unix_socket(PathBuf::from("/nonexistent/for-tests")).unwrap(),
);
let inventory = Arc::new(Inventory::empty());
Reconciler::new(
#[derive(Default)]
struct FakeState {
/// name → (id, running).
containers: HashMap<String, (String, bool)>,
next_id: usize,
started: Vec<String>,
stopped: Vec<String>,
fail_next_start: bool,
}
#[derive(Default)]
struct FakeRuntime {
inner: StdMutex<FakeState>,
}
impl FakeRuntime {
fn started(&self) -> Vec<String> {
self.inner.lock().unwrap().started.clone()
}
fn stopped(&self) -> Vec<String> {
self.inner.lock().unwrap().stopped.clone()
}
fn fail_next_start(&self) {
self.inner.lock().unwrap().fail_next_start = true;
}
}
#[async_trait::async_trait]
impl ContainerRuntime for FakeRuntime {
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
self.start_service(spec).await.map(|_| ())
}
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
let mut s = self.inner.lock().unwrap();
if s.fail_next_start {
s.fail_next_start = false;
return Err(ExecutorError::UnexpectedError("start failed".into()));
}
s.next_id += 1;
let id = format!("id{}", s.next_id);
s.containers.insert(spec.name.clone(), (id.clone(), true));
s.started.push(id.clone());
Ok(id)
}
async fn container_status(
&self,
name: &str,
) -> Result<Option<ContainerObservation>, ExecutorError> {
let s = self.inner.lock().unwrap();
Ok(s.containers
.get(name)
.map(|(id, running)| ContainerObservation {
id: id.clone(),
running: *running,
}))
}
async fn stop_service(&self, id_or_name: &str, _grace: u64) -> Result<(), ExecutorError> {
let mut s = self.inner.lock().unwrap();
s.stopped.push(id_or_name.to_string());
s.containers
.retain(|name, (id, _)| name != id_or_name && id != id_or_name);
Ok(())
}
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
self.stop_service(name, 0).await
}
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
Ok(vec![])
}
}
fn reconciler() -> (Reconciler, Arc<FakeRuntime>) {
let fake = Arc::new(FakeRuntime::default());
let r = Reconciler::new(
Id::from("test-device".to_string()),
topology,
inventory,
fake.clone() as Arc<dyn ContainerRuntime>,
None,
)
);
(r, fake)
}
fn score_bytes(image: &str) -> Vec<u8> {
let score = PodmanV0Score {
services: vec![PodmanService {
name: "web".to_string(),
image: image.to_string(),
ports: vec![],
env: vec![EnvVar::new("LOG", "info")],
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
};
serde_json::to_vec(&ReconcileScore::PodmanV0(score)).unwrap()
}
use harmony::topology::EnvVar;
#[tokio::test]
async fn tick_is_idempotent_no_recreate() {
// The loop-fix: a service WITH env (which the old matches_spec declared
// "always drifted") must not be recreated by the periodic tick.
let (r, fake) = reconciler();
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
assert_eq!(fake.started().len(), 1, "initial start");
for _ in 0..3 {
r.tick().await.unwrap();
}
assert_eq!(
fake.started().len(),
1,
"tick must not recreate a running container"
);
assert!(
fake.stopped().is_empty(),
"tick must not stop a healthy container"
);
}
#[tokio::test]
async fn changed_score_gracefully_replaces_by_id() {
let (r, fake) = reconciler();
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
r.apply("dev.web", &score_bytes("nginx:2")).await.unwrap();
// Old container (id1) stopped gracefully, new one (id2) started.
assert_eq!(fake.started(), vec!["id1", "id2"]);
assert_eq!(fake.stopped(), vec!["id1"]);
}
#[tokio::test]
async fn unchanged_score_is_a_noop() {
let (r, fake) = reconciler();
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
assert_eq!(fake.started().len(), 1, "byte-identical re-apply is a noop");
}
#[tokio::test]
async fn roll_forward_failure_reports_failed_without_revert() {
let (r, fake) = reconciler();
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
fake.fail_next_start();
let res = r.apply("dev.web", &score_bytes("nginx:2")).await;
assert!(res.is_err(), "upgrade failure surfaces");
// Old was stopped; new start failed; NO revert (roll-forward only).
assert_eq!(fake.stopped(), vec!["id1"]);
assert_eq!(
fake.started(),
vec!["id1"],
"no revert restart of the old version"
);
let phases = r.deployments.lock().await;
assert_eq!(phases.get(&dn("web")).map(|(p, _)| *p), Some(Phase::Failed));
}
fn dn(s: &str) -> DeploymentName {
@@ -304,36 +524,63 @@ mod tests {
#[tokio::test]
async fn apply_phase_records_new_phase() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None).await;
let (r, _) = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
.await;
let phases = r.deployments.lock().await;
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Running));
assert_eq!(
phases.get(&dn("hello")).map(|(p, _)| *p),
Some(Phase::Running)
);
}
#[tokio::test]
async fn apply_phase_idempotent_for_same_phase() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None).await;
r.apply_phase(&dn("hello"), Phase::Running, None).await;
async fn apply_phase_idempotent_for_same_phase_and_ids() {
let (r, _) = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()])
.await;
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()])
.await;
let phases = r.deployments.lock().await;
assert_eq!(phases.len(), 1);
}
#[tokio::test]
async fn apply_phase_transitions_update_phase() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Pending, None).await;
r.apply_phase(&dn("hello"), Phase::Running, None).await;
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()))
async fn apply_phase_republishes_when_container_id_changes() {
let (r, _) = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["old".into()])
.await;
// Same phase, new id (after an upgrade) → recorded.
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["new".into()])
.await;
let phases = r.deployments.lock().await;
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Failed));
assert_eq!(
phases.get(&dn("hello")),
Some(&(Phase::Running, vec!["new".to_string()]))
);
}
#[tokio::test]
async fn apply_phase_transitions_update_phase() {
let (r, _) = reconciler();
r.apply_phase(&dn("hello"), Phase::Pending, None, vec![])
.await;
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
.await;
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()), vec![])
.await;
let phases = r.deployments.lock().await;
assert_eq!(
phases.get(&dn("hello")).map(|(p, _)| *p),
Some(Phase::Failed)
);
}
#[tokio::test]
async fn drop_phase_clears_known_deployment() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None).await;
let (r, _) = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
.await;
r.drop_phase(&dn("hello")).await;
let phases = r.deployments.lock().await;
assert!(!phases.contains_key(&dn("hello")));
@@ -341,7 +588,7 @@ mod tests {
#[tokio::test]
async fn drop_phase_on_unknown_deployment_is_noop() {
let r = reconciler();
let (r, _) = reconciler();
r.drop_phase(&dn("never-existed")).await;
let phases = r.deployments.lock().await;
assert!(phases.is_empty());


@@ -137,6 +137,7 @@ async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> a
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
}),
rollout: Rollout {
strategy: RolloutStrategy::Immediate,


@@ -125,6 +125,7 @@ fn podman_score() -> PodmanV0Score {
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
}
}
@@ -248,6 +249,7 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
phase: Phase::Running,
last_event_at: Utc::now(),
last_error: None,
container_ids: vec![],
})
.await?;
kill(h1).await;


@@ -185,5 +185,6 @@ fn podman_score(image_tag: &str) -> PodmanV0Score {
volumes: vec![],
restart_policy: RestartPolicy::default(),
}],
lifecycle: None,
}
}


@@ -92,6 +92,7 @@ async fn agent_ignores_other_devices_keys() -> anyhow::Result<()> {
volumes: vec![],
restart_policy: RestartPolicy::default(),
}],
lifecycle: None,
};
admin.put_podman_at_key(&foreign_key, &score).await?;


@@ -818,6 +818,7 @@ mod tests {
phase,
last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(),
last_error: None,
container_ids: vec![],
}
}


@@ -652,6 +652,7 @@ mod tests {
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
}),
rollout: Rollout {
strategy: RolloutStrategy::Immediate,
@@ -705,6 +706,7 @@ mod tests {
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
});
assert_eq!(first_service_name(&score).as_deref(), Some("web"));
}
@@ -720,6 +722,7 @@ mod tests {
volumes: vec![],
restart_policy: Default::default(),
}],
lifecycle: None,
});
assert_eq!(deployment_version(&score), "1.25");
}


@@ -131,6 +131,12 @@ pub struct DeploymentState {
pub last_event_at: DateTime<Utc>,
#[serde(default)]
pub last_error: Option<String>,
/// Runtime container ids backing this deployment on the device, one per
/// service (v0.3 Ch5). The agent's identity handle for graceful replacement
/// and liveness; surfaced for the dashboard. `#[serde(default)]` for
/// wire-compat with pre-Ch5 agents that don't report it.
#[serde(default)]
pub container_ids: Vec<String>,
}
/// Tiny liveness ping. Written to KV key `heartbeat.<device_id>` in
@@ -233,6 +239,7 @@ mod tests {
phase: Phase::Failed,
last_event_at: ts("2026-04-22T10:05:00Z"),
last_error: Some("image pull 429".to_string()),
container_ids: vec![],
};
let json = serde_json::to_string(&original).unwrap();
let back: DeploymentState = serde_json::from_str(&json).unwrap();


@@ -21,13 +21,36 @@ use crate::executors::ExecutorError;
/// capability will be clearer than stretching this one.
#[async_trait]
pub trait ContainerRuntime: Send + Sync {
/// Ensure a container matching `spec` is running. Idempotent: running the
/// same spec twice must converge to the same state with no observable
/// side effects beyond logs. If a container with the same `name` already
/// exists with the same image and port mappings, this is a NOOP.
/// Otherwise it is stopped, removed, and recreated.
/// Ensure a container matching `spec` is running. Idempotent and
/// **liveness-only**: if a container with this `name` is already running,
/// it's a NOOP — the runtime does **not** compare the spec (a polling
/// reconciler would otherwise re-create on every tick for fields it can't
/// read back). Spec changes are driven through [`Self::stop_service`] and [`Self::start_service`] by
/// a stateful caller that knows the desired state changed. If no container
/// is running, one is (re)created from `spec`.
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError>;
/// Create + start a container from `spec`, returning its runtime id. Any
/// existing container with the same name is force-removed first. The id is
/// the caller's handle for liveness checks and targeted replacement.
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError>;
/// Observe the container with this name: its id + whether it's running.
/// `None` if no such container exists.
async fn container_status(
&self,
name: &str,
) -> Result<Option<ContainerObservation>, ExecutorError>;
/// Gracefully stop and remove a container by id or name: send the
/// container's stop signal, wait `grace_period_secs`, then SIGKILL
/// (podman's stop semantics), then remove. Idempotent — no error if gone.
async fn stop_service(
&self,
id_or_name: &str,
grace_period_secs: u64,
) -> Result<(), ExecutorError>;
/// Stop and remove the container with this name if it exists. No error
/// if the container does not exist — idempotent.
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError>;
@@ -38,6 +61,37 @@ pub trait ContainerRuntime: Send + Sync {
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError>;
}
/// Graceful-shutdown policy for a deployment's container during a roll-forward
/// upgrade (v0.3 Ch5). Mirrors K8s/`docker stop` semantics: send `stop_signal`,
/// wait `grace_period_secs`, then SIGKILL (podman's stop always force-kills
/// after the timeout — there's no "never kill" knob, so we don't pretend one).
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LifecyclePolicy {
/// Signal sent first to ask the process to exit cleanly (e.g. `SIGTERM`).
pub stop_signal: String,
/// Seconds to wait for a clean exit before SIGKILL.
pub grace_period_secs: u64,
}
impl Default for LifecyclePolicy {
fn default() -> Self {
Self {
stop_signal: "SIGTERM".to_string(),
grace_period_secs: 30,
}
}
}
/// A container's identity + liveness, as observed on the runtime.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ContainerObservation {
/// Runtime container id.
pub id: String,
/// Whether it's currently running.
pub running: bool,
}
/// A single environment variable, name/value pair.
///
/// Modeled as a struct rather than `(String, String)` because k8s CRD
@@ -98,6 +152,11 @@ pub struct ContainerSpec {
/// Restart policy on container exit. Mirrors podman/docker semantics.
#[serde(default)]
pub restart_policy: RestartPolicy,
/// Signal sent on graceful stop (from the deployment's `LifecyclePolicy`).
/// Baked into the container at create time so `podman stop` uses it. `None`
/// = the image/podman default (`SIGTERM`).
#[serde(default)]
pub stop_signal: Option<String>,
}
impl ContainerSpec {


@@ -5,7 +5,7 @@ use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
topology::{ContainerRuntime, ContainerSpec, Topology},
topology::{ContainerRuntime, Topology},
};
use super::score::PodmanV0Score;
@@ -54,15 +54,7 @@ impl<T: Topology + ContainerRuntime> Interpret<T> for PodmanV0Interpret {
topology: &T,
) -> Result<Outcome, InterpretError> {
for service in &self.score.services {
let spec = ContainerSpec {
name: service.name.clone(),
image: service.image.clone(),
ports: service.ports.clone(),
labels: vec![(DEPLOYMENT_LABEL.to_string(), self.score.deployment_label())],
env: service.env.clone(),
volumes: service.volumes.clone(),
restart_policy: service.restart_policy,
};
let spec = self.score.container_spec(service);
topology.ensure_service_running(&spec).await.map_err(|e| {
InterpretError::new(format!(
"PodmanV0: ensure_service_running({}) failed: {e}",

@@ -13,10 +13,13 @@ use serde::{Deserialize, Serialize};
use crate::{
interpret::Interpret,
score::Score,
topology::{ContainerRuntime, EnvVar, RestartPolicy, Topology, VolumeMount},
topology::{
ContainerRuntime, ContainerSpec, EnvVar, LifecyclePolicy, RestartPolicy, Topology,
VolumeMount,
},
};
use super::interpret::PodmanV0Interpret;
use super::interpret::{DEPLOYMENT_LABEL, PodmanV0Interpret};
/// A single container managed by podman on the target host.
///
@@ -46,9 +49,36 @@ pub struct PodmanService {
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
pub struct PodmanV0Score {
pub services: Vec<PodmanService>,
/// Graceful-shutdown policy used when a changed Score replaces a running
/// container (v0.3 Ch5 roll-forward upgrade). `None` means the default
/// policy (SIGTERM, 30s grace, then podman's SIGKILL). `#[serde(default)]`
/// lets older Deployment CRs without the field deserialize unchanged.
#[serde(default)]
pub lifecycle: Option<LifecyclePolicy>,
}
impl PodmanV0Score {
/// The effective lifecycle policy (configured, or the default).
pub fn lifecycle_policy(&self) -> LifecyclePolicy {
self.lifecycle.clone().unwrap_or_default()
}
/// Build the [`ContainerSpec`] for one service: image/ports/env/volumes plus
/// the deployment label and the lifecycle stop signal. One place so the
/// interpret and the agent reconciler agree on the mapping.
pub fn container_spec(&self, service: &PodmanService) -> ContainerSpec {
ContainerSpec {
name: service.name.clone(),
image: service.image.clone(),
ports: service.ports.clone(),
labels: vec![(DEPLOYMENT_LABEL.to_string(), self.deployment_label())],
env: service.env.clone(),
volumes: service.volumes.clone(),
restart_policy: service.restart_policy,
stop_signal: self.lifecycle.as_ref().map(|l| l.stop_signal.clone()),
}
}
/// Label value applied to every container this Score creates. The
/// caller uses the label to enumerate the set of containers that
/// belong to a given Score instance (e.g. to tear them down when
@@ -122,6 +152,7 @@ mod tests {
ports: vec!["8080:80".to_string()],
..svc("web", "nginx:latest")
}],
lifecycle: None,
});
let json = serde_json::to_string(&score).unwrap();
assert!(json.contains("\"type\":\"PodmanV0\""));
@@ -148,6 +179,7 @@ mod tests {
..svc("api", "myapp:1.0")
},
],
lifecycle: None,
});
let serialized = serde_json::to_string(&score).unwrap();
let deserialized: ReconcileScore = serde_json::from_str(&serialized).unwrap();
@@ -229,6 +261,7 @@ mod tests {
fn deployment_label_joins_service_names() {
let score = PodmanV0Score {
services: vec![svc("web", "nginx"), svc("api", "myapp")],
lifecycle: None,
};
assert_eq!(score.deployment_label(), "web,api");
}
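
Reviewer note: `lifecycle_policy()` and `container_spec()` treat an unset `lifecycle` differently, which is easy to miss when reading the diff. The sketch below is a std-only reduction of that behavior. `Score` is a hypothetical stand-in for `PodmanV0Score` with every other field dropped, and `stop_signal()` is a hypothetical helper that mirrors the `stop_signal:` expression in `container_spec`.

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct LifecyclePolicy {
    stop_signal: String,
    grace_period_secs: u64,
}

impl Default for LifecyclePolicy {
    fn default() -> Self {
        Self {
            stop_signal: "SIGTERM".to_string(),
            grace_period_secs: 30,
        }
    }
}

/// Hypothetical stand-in for `PodmanV0Score`, reduced to the one field
/// under discussion.
struct Score {
    lifecycle: Option<LifecyclePolicy>,
}

impl Score {
    /// Effective policy: the configured one, or the default.
    fn lifecycle_policy(&self) -> LifecyclePolicy {
        self.lifecycle.clone().unwrap_or_default()
    }

    /// Hypothetical helper mirroring the `stop_signal:` expression in
    /// `container_spec`: `None` when no policy is configured.
    fn stop_signal(&self) -> Option<String> {
        self.lifecycle.as_ref().map(|l| l.stop_signal.clone())
    }
}

fn main() {
    let unset = Score { lifecycle: None };
    // The effective policy falls back to SIGTERM / 30s.
    assert_eq!(unset.lifecycle_policy(), LifecyclePolicy::default());
    // The spec's stop signal stays unset, leaving the choice to the image/podman.
    assert_eq!(unset.stop_signal(), None);

    let custom = Score {
        lifecycle: Some(LifecyclePolicy {
            stop_signal: "SIGINT".to_string(),
            grace_period_secs: 5,
        }),
    };
    assert_eq!(custom.lifecycle_policy().grace_period_secs, 5);
    assert_eq!(custom.stop_signal(), Some("SIGINT".to_string()));
}
```

With no policy configured, the grace period still defaults to 30 seconds, but no stop signal is written into the container, so an image that declares its own stop signal keeps it.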

@@ -12,8 +12,8 @@ use podman_api::opts::{
};
use crate::domain::topology::{
ContainerRuntime, ContainerSpec, ContainerState, PreparationError, PreparationOutcome,
RestartPolicy, Topology, VolumeMount,
ContainerObservation, ContainerRuntime, ContainerSpec, ContainerState, PreparationError,
PreparationOutcome, RestartPolicy, Topology, VolumeMount,
};
use crate::executors::ExecutorError;
@@ -97,49 +97,11 @@ impl PodmanTopology {
Err(e) => Err(to_exec_error(e)),
}
}
}
#[async_trait]
impl Topology for PodmanTopology {
fn name(&self) -> &str {
&self.name
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
// A quick ping — calling info() is the cheapest endpoint that
// exercises the socket end-to-end.
self.podman
.info()
.await
.map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?;
Ok(PreparationOutcome::Noop)
}
}
#[async_trait]
impl ContainerRuntime for PodmanTopology {
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
let existing = self.get_by_name(&spec.name).await?;
if let Some(existing) = existing.as_ref() {
if matches_spec(existing, spec) {
let running = existing.state.as_deref() == Some("running");
if running {
return Ok(());
}
// Same spec, just not running — start it rather than recreate.
let id = existing.id.clone().unwrap_or_else(|| spec.name.clone());
self.containers()
.get(id)
.start(None)
.await
.map_err(to_exec_error)?;
return Ok(());
}
// Drift — remove and recreate below.
self.remove_container(&spec.name).await?;
}
/// Create + start a container from `spec`, returning its id. The container's
/// stop signal is baked in here so a later `podman stop` honors the
/// deployment's lifecycle policy; the grace period is supplied at stop time.
async fn create_and_start(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
self.ensure_image_present(&spec.image).await?;
let mut labels = HashMap::new();
@@ -171,34 +133,94 @@ impl ContainerRuntime for PodmanTopology {
.portmappings(port_mappings)
.env(env_map)
.restart_policy(map_restart_policy(spec.restart_policy));
if let Some(sig) = spec.stop_signal.as_deref() {
builder = builder.stop_signal(signal_number(sig));
}
if !mounts.is_empty() {
builder = builder.mounts(mounts);
}
let opts = builder.build();
let created = self
.containers()
.create(&opts)
.create(&builder.build())
.await
.map_err(to_exec_error)?;
let id = created.id;
self.containers()
.get(created.id.clone())
.get(id.clone())
.start(None)
.await
.map_err(to_exec_error)?;
Ok(())
Ok(id)
}
}
#[async_trait]
impl Topology for PodmanTopology {
fn name(&self) -> &str {
&self.name
}
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
// First try a graceful stop; ignore failures so removal still happens.
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
// A quick ping — calling info() is the cheapest endpoint that
// exercises the socket end-to-end.
self.podman
.info()
.await
.map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?;
Ok(PreparationOutcome::Noop)
}
}
#[async_trait]
impl ContainerRuntime for PodmanTopology {
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
// Liveness-only: never compare the spec (that's the caller's job via
// byte-compare of the desired state). A container that's already up
// stays up — no per-tick recreate, no flap.
match self.container_status(&spec.name).await? {
Some(obs) if obs.running => Ok(()),
_ => self.start_service(spec).await.map(|_| ()),
}
}
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
// Force-remove any stale container by this name so create can't collide.
self.remove_container(&spec.name).await?;
self.create_and_start(spec).await
}
async fn container_status(
&self,
name: &str,
) -> Result<Option<ContainerObservation>, ExecutorError> {
let Some(c) = self.get_by_name(name).await? else {
return Ok(None);
};
Ok(Some(ContainerObservation {
id: c.id.clone().unwrap_or_default(),
running: c.state.as_deref() == Some("running"),
}))
}
async fn stop_service(
&self,
id_or_name: &str,
grace_period_secs: u64,
) -> Result<(), ExecutorError> {
// Graceful: podman sends the container's stop signal, waits
// `grace_period_secs`, then SIGKILLs. Stop failures are ignored so the
// removal below still happens.
let stop_opts = ContainerStopOpts::builder()
.timeout(STOP_TIMEOUT.as_secs() as usize)
.timeout(grace_period_secs as usize)
.build();
let container = self.containers().get(name);
let container = self.containers().get(id_or_name);
if container.exists().await.unwrap_or(false) {
let _ = container.stop(&stop_opts).await;
}
self.remove_container(name).await
self.remove_container(id_or_name).await
}
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
self.stop_service(name, STOP_TIMEOUT.as_secs()).await
}
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
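
Reviewer note: the liveness-only contract of `ensure_service_running` can be sketched without podman. `FakeRuntime` below is a hypothetical, synchronous in-memory stand-in, not the `PodmanTopology` implementation, and its `crash` helper is invented for the illustration. It only aims to show the property the new code targets: repeated ticks leave a running container's id unchanged, while a stopped container is replaced through `start_service`.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, PartialEq, Eq)]
struct ContainerObservation {
    id: String,
    running: bool,
}

/// Hypothetical in-memory runtime keyed by container name.
#[derive(Default)]
struct FakeRuntime {
    containers: HashMap<String, ContainerObservation>,
    next_id: u64,
}

impl FakeRuntime {
    fn container_status(&self, name: &str) -> Option<ContainerObservation> {
        self.containers.get(name).cloned()
    }

    /// Remove any stale container by this name, then create a fresh one
    /// and return its id.
    fn start_service(&mut self, name: &str) -> String {
        self.containers.remove(name);
        self.next_id += 1;
        let id = format!("c{}", self.next_id);
        self.containers.insert(
            name.to_string(),
            ContainerObservation { id: id.clone(), running: true },
        );
        id
    }

    /// Liveness-only: a running container is a no-op; anything else is
    /// (re)started. The spec is never compared here.
    fn ensure_service_running(&mut self, name: &str) {
        match self.container_status(name) {
            Some(obs) if obs.running => {}
            _ => {
                self.start_service(name);
            }
        }
    }

    /// Test helper (hypothetical): mark a container as exited.
    fn crash(&mut self, name: &str) {
        if let Some(c) = self.containers.get_mut(name) {
            c.running = false;
        }
    }
}

fn main() {
    let mut rt = FakeRuntime::default();
    rt.ensure_service_running("web");
    let first = rt.container_status("web").unwrap().id;

    // A second tick on a healthy container must not recreate it.
    rt.ensure_service_running("web");
    assert_eq!(rt.container_status("web").unwrap().id, first);

    // An exited container is replaced, so the id changes.
    rt.crash("web");
    rt.ensure_service_running("web");
    assert_ne!(rt.container_status("web").unwrap().id, first);
}
```

This follows the diff in that an exited container goes through `start_service` and therefore gets a new id, rather than being restarted in place as the removed code did.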
@@ -267,71 +289,19 @@ fn parse_port_mapping(raw: &str) -> Result<PortMapping, ExecutorError> {
})
}
fn matches_spec(observed: &podman_api::models::ListContainer, spec: &ContainerSpec) -> bool {
let same_image = observed
.image
.as_deref()
.map(|i| i == spec.image)
.unwrap_or(false);
if !same_image {
return false;
/// Map a signal name (`SIGTERM`) to its number for podman's `stop_signal`
/// create option. Unknown names fall back to SIGTERM (15) — the safe default.
fn signal_number(name: &str) -> i64 {
match name.trim().to_uppercase().trim_start_matches("SIG") {
"HUP" => 1,
"INT" => 2,
"QUIT" => 3,
"KILL" => 9,
"USR1" => 10,
"USR2" => 12,
"TERM" => 15,
other => other.parse().unwrap_or(15),
}
let observed_ports = observed.ports.as_deref().unwrap_or(&[]);
if observed_ports.len() != spec.ports.len() {
return false;
}
for raw in &spec.ports {
let Ok(expected) = parse_port_mapping(raw) else {
return false;
};
let present = observed_ports.iter().any(|p| {
p.host_port == expected.host_port && p.container_port == expected.container_port
});
if !present {
return false;
}
}
// FIXME(redeploy-loop): this branch makes the agent's periodic
// reconcile non-idempotent for any non-trivial Deployment.
// Symptom: a service with env or volumes is destroyed and
// recreated every 30s tick (RECONCILE_INTERVAL), even when the
// observed container is already correct — operators see flapping
// container IDs, intermittent connectivity blips, log noise.
//
// Root cause: `podman list` (v5.x) doesn't surface env or mounts,
// so we can't compare them; the original author chose to declare
// "any spec with env/volumes is drifted" as a fail-safe. That's
// the wrong default for a polling reconciler — it weaponizes the
// poll into a re-creation loop.
//
// Right fix (out of scope for the demo, in scope for delivery):
// 1. Switch this code path to `containers.get(name).inspect()`
// which DOES return env + mounts. Compare structurally.
// 2. Treat absent fields on the inspect response as "unchanged",
// not "drifted".
// 3. Add an integration test that runs ensure_service_running
// twice on the same spec and asserts the container ID is
// unchanged.
//
// Layered next: the upcoming health-check addition to
// ContainerSpec gives the agent a separate signal to decide
// when to recreate (failed health checks → unhealthy → recreate)
// independent of the spec-drift check.
//
// Until fixed: avoid env / volumes in demo-time deployments to
// dodge the loop. The hello-web nginx demo doesn't have either,
// which is why it's stable.
if !spec.env.is_empty() || !spec.volumes.is_empty() {
return false;
}
// Restart policy: ListContainer doesn't surface it directly. We
// only force a recreate when the spec explicitly asks for something
// other than the default — so unchanged podman-default behaviour
// stays a NOOP, and explicit policy changes converge on next apply.
if spec.restart_policy != RestartPolicy::default() {
return false;
}
true
}
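
Reviewer note: the fallback in `signal_number` (added earlier in this hunk) is worth seeing on concrete inputs. The block below copies the function as it appears in the diff and exercises it in a `main` that is illustrative only. The numeric values for `USR1` and `USR2` are the ones used on Linux x86 and ARM.

```rust
/// Copy of `signal_number` from the diff: map a signal name to its number,
/// falling back to SIGTERM (15) for anything unrecognized.
fn signal_number(name: &str) -> i64 {
    match name.trim().to_uppercase().trim_start_matches("SIG") {
        "HUP" => 1,
        "INT" => 2,
        "QUIT" => 3,
        "KILL" => 9,
        "USR1" => 10,
        "USR2" => 12,
        "TERM" => 15,
        // Bare numbers pass through; everything else becomes 15.
        other => other.parse().unwrap_or(15),
    }
}

fn main() {
    // Prefix, case, and surrounding whitespace are all tolerated.
    assert_eq!(signal_number("SIGTERM"), 15);
    assert_eq!(signal_number("term"), 15);
    assert_eq!(signal_number(" sigint "), 2);
    // A numeric string is used as-is.
    assert_eq!(signal_number("9"), 9);
    // A name outside the table silently maps to SIGTERM.
    assert_eq!(signal_number("SIGWINCH"), 15);
}
```

One consequence of the silent fallback is that a typo in `stop_signal` (or a valid signal missing from the table) is indistinguishable from `SIGTERM` at create time. If that matters, a stricter variant could return a `Result` and reject unknown names; that would be a design change, not something this PR does.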
fn volume_to_mount(v: &VolumeMount) -> ContainerMount {