feat(fleet): graceful roll-forward upgrade + container-ID identity (Ch5) #331
66
ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md
Normal file
@@ -0,0 +1,66 @@
# Ch5: Graceful deployment upgrade (roll-forward) + redeploy-loop fix: status

Two things shipped together because they share the same root: **how the agent
identifies a running container vs. its desired spec**.

## The redeploy-loop fix (the bug JG flagged)

`podman ps` doesn't surface env/volumes, so the old `matches_spec` declared
*any* service with env or volumes "drifted" and re-created it **every 30s tick**:
flapping container ids, connectivity blips, log noise.

Fix (per JG: "use container IDs, store them in NATS"): identity is the
**container, by name+id**, not a spec comparison.

- `ensure_service_running` is now **liveness-only**: a running container is a
  NOOP, no spec compare. The periodic tick adopts-or-(re)starts by name and
  never recreates a healthy container. The loop is gone, as proven by
  `tick_is_idempotent_no_recreate` (a service *with env*, the old loop trigger).
- Spec changes are detected where they always were (the reconciler's
  byte-compare of the desired-state JSON on a KV Put), not by the runtime.
- The agent records each container's **id** (from `start_service`) in the cache
  and in `DeploymentState.container_ids` (surfaced for the dashboard).

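The liveness-only rule above can be sketched in a few lines. This is a simplified, synchronous illustration and not the agent's code: the `Runtime` trait, `Observation`, `ensure_started` signature, and `FakeRuntime` here are stand-ins invented for the example (the real capability is async and takes a full container spec).

```rust
use std::collections::HashMap;

/// What the runtime reports for a container looked up by name.
#[derive(Clone, Debug, PartialEq)]
pub struct Observation {
    pub id: String,
    pub running: bool,
}

/// Hypothetical, synchronous stand-in for the runtime capability.
pub trait Runtime {
    fn status(&self, name: &str) -> Option<Observation>;
    fn start(&mut self, name: &str) -> String;
}

/// Adopt-or-start: a running container is adopted as-is (no spec compare);
/// a missing or stopped one is (re)started. Returns the container id.
pub fn ensure_started(rt: &mut dyn Runtime, name: &str) -> String {
    match rt.status(name) {
        Some(obs) if obs.running => obs.id,
        _ => rt.start(name),
    }
}

/// In-memory fake that counts how many containers were started.
#[derive(Default)]
pub struct FakeRuntime {
    containers: HashMap<String, Observation>,
    pub starts: usize,
}

impl FakeRuntime {
    /// Simulate a container dying between ticks.
    pub fn kill(&mut self, name: &str) {
        if let Some(obs) = self.containers.get_mut(name) {
            obs.running = false;
        }
    }
}

impl Runtime for FakeRuntime {
    fn status(&self, name: &str) -> Option<Observation> {
        self.containers.get(name).cloned()
    }

    fn start(&mut self, name: &str) -> String {
        self.starts += 1;
        let id = format!("id{}", self.starts);
        let obs = Observation { id: id.clone(), running: true };
        self.containers.insert(name.to_string(), obs);
        id
    }
}
```

Calling `ensure_started` repeatedly on a healthy container keeps returning the same id and never increments `starts`; only a container that is missing or no longer running gets a new id.
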
## Graceful roll-forward upgrade

- `PodmanV0Score.lifecycle: Option<LifecyclePolicy>` (`stop_signal=SIGTERM`,
  `grace_period_secs=30`). The stop signal is baked into the container at
  create; the grace period drives `podman stop --time`, after which podman
  always SIGKILLs, so there's no `sigkill_fallback` knob to pretend otherwise
  (a config field nothing honors is a defect per AGENTS.md).
- On a changed score the reconciler `converge`s: for each service, **stop the
  exact old container by its recorded id** (SIGTERM → grace → SIGKILL), then
  start the new one and record the new id. Services dropped from the spec are
  stopped. **Roll-forward only**: a failure reports `Phase::Failed` and is never
  reverted (the customer edits the spec), as proven by
  `roll_forward_failure_reports_failed_without_revert`.
- The transient cutover window reports `Phase::Pending` (its documented "in
  flight" meaning) → `Running`, so the dashboard reflects the step without a new
  Phase variant.

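As a rough illustration of the lifecycle policy described above, the sketch below resolves an optional policy to the stated defaults and shows the CLI-equivalent arguments. The field names and defaults come from this note; `resolve`, `create_flags`, and `stop_argv` are hypothetical helpers, and the argv form is only an analogy, since the agent may talk to podman through its API rather than the CLI.

```rust
/// Graceful-stop settings; field names and defaults follow the status note.
#[derive(Clone, Debug, PartialEq)]
pub struct LifecyclePolicy {
    pub stop_signal: String,
    pub grace_period_secs: u64,
}

impl Default for LifecyclePolicy {
    fn default() -> Self {
        Self {
            stop_signal: "SIGTERM".to_string(),
            grace_period_secs: 30,
        }
    }
}

/// Hypothetical helper: an absent policy falls back to the defaults.
pub fn resolve(lifecycle: Option<&LifecyclePolicy>) -> LifecyclePolicy {
    lifecycle.cloned().unwrap_or_default()
}

/// Hypothetical helper: the stop signal is fixed when the container is created.
pub fn create_flags(policy: &LifecyclePolicy) -> Vec<String> {
    vec![format!("--stop-signal={}", policy.stop_signal)]
}

/// Hypothetical helper: the grace period is passed at stop time. Once it
/// elapses podman sends SIGKILL, so there is no separate fallback setting.
pub fn stop_argv(policy: &LifecyclePolicy, container_id: &str) -> Vec<String> {
    vec![
        "podman".to_string(),
        "stop".to_string(),
        "--time".to_string(),
        policy.grace_period_secs.to_string(),
        container_id.to_string(),
    ]
}
```

Keeping the policy to two fields mirrors the point made in the list: every field maps to something the runtime actually honors.
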
## Architecture note

The graceful-replace orchestration lives in the **agent reconciler** (which
holds the cross-reconcile state: cached score + container ids), driving the
`ContainerRuntime` capability directly. The `PodmanV0Score` interpret stays the
"ensure running" path for non-agent/initial converge. `container_spec` is the
one place both share the `PodmanService → ContainerSpec` mapping. The reconciler
now holds an `Arc<dyn ContainerRuntime>`, so the convergence logic is
unit-tested against a `FakeRuntime` (no podman socket).

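The arrangement described here, a reconciler that owns the cache and drives a runtime trait object, can be sketched as follows. This is a deliberately reduced, synchronous model with one container per key. The `Runtime` trait, the `Phase` enum with two variants, the raw image-tag payload, and the fixed 30-second grace are all assumptions made for the example, not the agent's actual types.

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

#[derive(Clone, Copy, Debug, PartialEq)]
pub enum Phase {
    Running,
    Failed,
}

/// Hypothetical synchronous stand-in for the `ContainerRuntime` capability.
pub trait Runtime {
    fn start(&self, name: &str, image: &str) -> Result<String, String>;
    fn stop(&self, id: &str, grace_secs: u64) -> Result<(), String>;
}

/// Last applied payload plus the id of the container backing it.
struct Cached {
    payload: Vec<u8>,
    id: String,
}

pub struct Reconciler {
    runtime: Arc<dyn Runtime>,
    cache: HashMap<String, Cached>,
}

impl Reconciler {
    pub fn new(runtime: Arc<dyn Runtime>) -> Self {
        Self { runtime, cache: HashMap::new() }
    }

    /// Sketch of a Put: here the payload is just the image tag bytes.
    pub fn apply(&mut self, name: &str, payload: &[u8]) -> Phase {
        let old_id = match self.cache.get(name) {
            // Byte-identical desired state: nothing to do.
            Some(c) if c.payload.as_slice() == payload => return Phase::Running,
            Some(c) => Some(c.id.clone()),
            None => None,
        };
        // Changed: stop the exact old container by id (grace assumed to be 30s).
        if let Some(id) = &old_id {
            if self.runtime.stop(id, 30).is_err() {
                return Phase::Failed;
            }
        }
        let image = String::from_utf8_lossy(payload);
        match self.runtime.start(name, &image) {
            Ok(id) => {
                let entry = Cached { payload: payload.to_vec(), id };
                self.cache.insert(name.to_string(), entry);
                Phase::Running
            }
            // Roll-forward only: the old version is not restarted.
            Err(_) => Phase::Failed,
        }
    }
}

/// In-memory fake that records calls, so no podman socket is needed.
#[derive(Default)]
pub struct FakeRuntime {
    pub started: Mutex<Vec<String>>,
    pub stopped: Mutex<Vec<String>>,
    pub fail_next_start: Mutex<bool>,
}

impl Runtime for FakeRuntime {
    fn start(&self, _name: &str, _image: &str) -> Result<String, String> {
        let mut fail = self.fail_next_start.lock().unwrap();
        if *fail {
            *fail = false;
            return Err("start failed".to_string());
        }
        let mut started = self.started.lock().unwrap();
        let id = format!("id{}", started.len() + 1);
        started.push(id.clone());
        Ok(id)
    }

    fn stop(&self, id: &str, _grace_secs: u64) -> Result<(), String> {
        self.stopped.lock().unwrap().push(id.to_string());
        Ok(())
    }
}
```

Because the reconciler only sees the trait object, a test can hand it the fake, apply two different payloads, and assert on the recorded `started` and `stopped` ids, which is the shape of the unit tests listed in this note.
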
## Tested

- `tick_is_idempotent_no_recreate`: the loop-fix.
- `changed_score_gracefully_replaces_by_id`: stop-old-by-id then start-new.
- `unchanged_score_is_a_noop`.
- `roll_forward_failure_reports_failed_without_revert`.
- Plus the existing phase-transition tests, all green.

## Flagged for a supervised run

- **VM e2e v1→v2→v3 with controlled failures** (`harmony-fleet-e2e/src/vm`).
  Needs multiple image tags reachable from the KVM guest, like Ch4's upgrade
  e2e. The convergence logic is unit-proven against the fake runtime; this is
  the on-hardware integration proof. The existing `vm_deploy_lifecycle.rs`
  covers single-deploy.
- Surfacing `container_ids` in the dashboard device view (data is already on
  `DeploymentState`).

@@ -423,6 +423,7 @@ async fn apply_one_cr(
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
}),
|
||||
rollout: Rollout {
|
||||
strategy: RolloutStrategy::Immediate,
|
||||
@@ -496,6 +497,7 @@ async fn simulate_state_loop(
|
||||
last_event_at: Utc::now(),
|
||||
last_error: matches!(phase, Phase::Failed)
|
||||
.then(|| format!("synthetic failure @{}", device.device_id)),
|
||||
container_ids: vec![],
|
||||
};
|
||||
match serde_json::to_vec(&ds) {
|
||||
Ok(payload) => match bucket.put(&state_key, payload.into()).await {
|
||||
|
||||
@@ -207,6 +207,7 @@ fn build_cr(cli: &Cli) -> Deployment {
|
||||
volumes,
|
||||
restart_policy: cli.restart.into(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
};
|
||||
|
||||
let payload = ReconcileScore::PodmanV0(score);
|
||||
|
||||
@@ -298,7 +298,6 @@ async fn main() -> Result<()> {
|
||||
Arc::new(Reconciler::new(
|
||||
device_id.clone(),
|
||||
t.clone(),
|
||||
inventory.clone(),
|
||||
Some(fleet.clone()),
|
||||
))
|
||||
});
|
||||
|
||||
@@ -7,76 +7,69 @@ use chrono::Utc;
|
||||
use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase};
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use harmony::inventory::Inventory;
|
||||
use harmony::modules::podman::{PodmanTopology, PodmanV0Score, ReconcileScore};
|
||||
use harmony::score::Score;
|
||||
use harmony::modules::podman::{PodmanV0Score, ReconcileScore};
|
||||
use harmony::topology::{ContainerRuntime, ContainerSpec};
|
||||
|
||||
use crate::fleet_publisher::FleetPublisher;
|
||||
|
||||
/// Cache key → last-seen state, populated by `apply` and consulted by the
|
||||
/// 30-second periodic tick and the delete path.
|
||||
struct CachedEntry {
|
||||
/// Serialized score JSON. Used for string-compare idempotency per
|
||||
/// ROADMAP §5.5 — cheaper and more deterministic than a hash.
|
||||
/// Serialized score JSON. Byte-compared for idempotency: a re-delivered
|
||||
/// identical desired-state is a NOOP; a changed one drives a graceful
|
||||
/// roll-forward upgrade.
|
||||
serialized: String,
|
||||
/// Parsed score. Cached so the periodic reconcile tick and delete
|
||||
/// handlers don't have to re-parse the JSON.
|
||||
/// Parsed score, so the tick / delete paths don't re-parse.
|
||||
score: PodmanV0Score,
|
||||
/// Runtime container id per service name (v0.3 Ch5). The identity handle:
|
||||
/// the tick checks these are alive (not the spec — which caused the old 30s
|
||||
/// redeploy loop), and an upgrade stops the exact old container by id.
|
||||
container_ids: HashMap<String, String>,
|
||||
}
|
||||
|
||||
pub struct Reconciler {
|
||||
device_id: Id,
|
||||
topology: Arc<PodmanTopology>,
|
||||
inventory: Arc<Inventory>,
|
||||
/// Keyed by NATS KV key (`<device>.<deployment>`). A single entry per
|
||||
/// KV key — in v0 there is no fan-out from one key to many scores.
|
||||
topology: Arc<dyn ContainerRuntime>,
|
||||
/// Keyed by NATS KV key (`<device>.<deployment>`). One entry per key.
|
||||
state: Mutex<HashMap<String, CachedEntry>>,
|
||||
/// Current phase per deployment, used to decide whether a new
|
||||
/// write to the `device-state` KV is needed.
|
||||
///
|
||||
/// NOTE : this feels dangerous, conflict on deployment name could be a problem
|
||||
/// We must explore this and clarify it in the design and decide if it is a constraint
|
||||
deployments: Mutex<HashMap<DeploymentName, Phase>>,
|
||||
/// Publish surface. Optional so unit tests without a live NATS
|
||||
/// client still work; always populated in the real agent runtime.
|
||||
/// Last (phase, container_ids) published per deployment, to skip
|
||||
/// re-publishing unchanged device-state.
|
||||
deployments: Mutex<HashMap<DeploymentName, (Phase, Vec<String>)>>,
|
||||
/// Publish surface. Optional so unit tests without a live NATS client work.
|
||||
fleet: Option<Arc<FleetPublisher>>,
|
||||
}
|
||||
|
||||
impl Reconciler {
|
||||
pub fn new(
|
||||
device_id: Id,
|
||||
topology: Arc<PodmanTopology>,
|
||||
inventory: Arc<Inventory>,
|
||||
topology: Arc<dyn ContainerRuntime>,
|
||||
fleet: Option<Arc<FleetPublisher>>,
|
||||
) -> Self {
|
||||
Self {
|
||||
device_id,
|
||||
topology,
|
||||
inventory,
|
||||
state: Mutex::new(HashMap::new()),
|
||||
deployments: Mutex::new(HashMap::new()),
|
||||
fleet,
|
||||
}
|
||||
}
|
||||
|
||||
/// Record a new phase for a deployment and, if it changed, write
|
||||
/// the updated [`DeploymentState`] to the KV. Same-phase
|
||||
/// re-confirmations are no-ops so the periodic reconcile tick
|
||||
/// doesn't churn the bucket.
|
||||
/// Record a phase (+ backing container ids) for a deployment and publish the
|
||||
/// [`DeploymentState`] when it changed. Same (phase, ids) is a no-op so the
|
||||
/// periodic tick doesn't churn the bucket.
|
||||
async fn apply_phase(
|
||||
&self,
|
||||
deployment: &DeploymentName,
|
||||
phase: Phase,
|
||||
last_error: Option<String>,
|
||||
container_ids: Vec<String>,
|
||||
) {
|
||||
{
|
||||
let mut phases = self.deployments.lock().await;
|
||||
// performance nitpick : we don't need a write lock here, we could check before acquiring the write
|
||||
// lock
|
||||
if phases.get(deployment).copied() == Some(phase) {
|
||||
if phases.get(deployment) == Some(&(phase, container_ids.clone())) {
|
||||
return;
|
||||
}
|
||||
phases.insert(deployment.clone(), phase);
|
||||
phases.insert(deployment.clone(), (phase, container_ids.clone()));
|
||||
}
|
||||
|
||||
if let Some(publisher) = &self.fleet {
|
||||
@@ -86,14 +79,12 @@ impl Reconciler {
|
||||
phase,
|
||||
last_event_at: Utc::now(),
|
||||
last_error,
|
||||
container_ids,
|
||||
};
|
||||
publisher.write_deployment_state(&state).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Clear the in-memory phase for a deployment and delete its KV
|
||||
/// entry. Idempotent: a delete for a never-applied deployment is
|
||||
/// a no-op in memory and a harmless tombstone write on the wire.
|
||||
async fn drop_phase(&self, deployment: &DeploymentName) {
|
||||
let was_known = {
|
||||
let mut phases = self.deployments.lock().await;
|
||||
@@ -107,9 +98,26 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a Put event (new or updated score on NATS KV). No-ops if the
|
||||
/// serialized score is byte-identical to the last-seen value for this
|
||||
/// key.
|
||||
/// Adopt-or-start: if a container with this name is already running, return
|
||||
/// its id (no recreate — survives an agent restart without flapping the
|
||||
/// workload); otherwise (re)create it from `spec` and return the new id.
|
||||
async fn ensure_started(&self, spec: &ContainerSpec) -> Result<String> {
|
||||
match self
|
||||
.topology
|
||||
.container_status(&spec.name)
|
||||
.await
|
||||
.map_err(rt)?
|
||||
{
|
||||
Some(obs) if obs.running => Ok(obs.id),
|
||||
_ => self.topology.start_service(spec).await.map_err(rt),
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a Put: NOOP if byte-identical to the last-seen score; a new key
|
||||
/// starts its services; a changed key **gracefully rolls forward** — stop
|
||||
/// each old container (SIGTERM → grace → SIGKILL per the score's
|
||||
/// `LifecyclePolicy`), then start the new one. Roll-forward only: a failure
|
||||
/// is reported as `Failed`, never reverted (the customer edits the spec).
|
||||
pub async fn apply(&self, key: &str, value: &[u8]) -> Result<()> {
|
||||
let deployment = deployment_from_key(key);
|
||||
let incoming = match serde_json::from_slice::<ReconcileScore>(value) {
|
||||
@@ -117,7 +125,12 @@ impl Reconciler {
|
||||
Err(e) => {
|
||||
tracing::warn!(key, error = %e, "failed to deserialize score");
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Failed, Some(format!("bad payload: {e}")))
|
||||
self.apply_phase(
|
||||
name,
|
||||
Phase::Failed,
|
||||
Some(format!("bad payload: {e}")),
|
||||
vec![],
|
||||
)
|
||||
.await;
|
||||
}
|
||||
return Ok(());
|
||||
@@ -125,71 +138,122 @@ impl Reconciler {
|
||||
};
|
||||
let serialized = String::from_utf8_lossy(value).into_owned();
|
||||
|
||||
{
|
||||
let previous = {
|
||||
let state = self.state.lock().await;
|
||||
if let Some(existing) = state.get(key) {
|
||||
if existing.serialized == serialized {
|
||||
match state.get(key) {
|
||||
Some(existing) if existing.serialized == serialized => {
|
||||
tracing::debug!(key, "score unchanged — noop");
|
||||
return Ok(());
|
||||
}
|
||||
Some(existing) => Some(existing.container_ids.clone()),
|
||||
None => None,
|
||||
}
|
||||
};
|
||||
|
||||
// Upgrade in flight → Pending (the transient cutover window).
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Pending, None, vec![]).await;
|
||||
}
|
||||
|
||||
match self.converge(&incoming, previous.as_ref()).await {
|
||||
Ok(ids) => {
|
||||
let id_list: Vec<String> = incoming
|
||||
.services
|
||||
.iter()
|
||||
.filter_map(|s| ids.get(&s.name).cloned())
|
||||
.collect();
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Pending, None).await;
|
||||
self.apply_phase(name, Phase::Running, None, id_list).await;
|
||||
}
|
||||
|
||||
match self.run_score(key, &incoming).await {
|
||||
Ok(()) => {
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Running, None).await;
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
|
||||
.await;
|
||||
}
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
|
||||
let mut state = self.state.lock().await;
|
||||
state.insert(
|
||||
key.to_string(),
|
||||
CachedEntry {
|
||||
serialized,
|
||||
score: incoming,
|
||||
container_ids: ids,
|
||||
},
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())), vec![])
|
||||
.await;
|
||||
}
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Handle a Delete/Purge event. Stops and removes every container
|
||||
/// referenced by the last cached score for this key. Idempotent: if we
|
||||
/// never saw a Put for this key (agent restart after delete), logs and
|
||||
/// returns ok.
|
||||
/// Drive actual → desired for one score. New deployment: start each service.
|
||||
/// Changed: stop the old container by id (graceful) then start the new one,
|
||||
/// and remove services dropped from the spec. Returns service → new id.
|
||||
async fn converge(
|
||||
&self,
|
||||
score: &PodmanV0Score,
|
||||
previous: Option<&HashMap<String, String>>,
|
||||
) -> Result<HashMap<String, String>> {
|
||||
let lifecycle = score.lifecycle_policy();
|
||||
let mut ids = HashMap::new();
|
||||
for service in &score.services {
|
||||
let spec = score.container_spec(service);
|
||||
match previous.and_then(|p| p.get(&service.name)) {
|
||||
// Existing service whose spec changed → graceful replace by id.
|
||||
Some(old_id) => {
|
||||
self.topology
|
||||
.stop_service(old_id, lifecycle.grace_period_secs)
|
||||
.await
|
||||
.map_err(rt)?;
|
||||
let id = self.topology.start_service(&spec).await.map_err(rt)?;
|
||||
ids.insert(service.name.clone(), id);
|
||||
}
|
||||
// New (or first-seen) service → adopt-or-start.
|
||||
None => {
|
||||
ids.insert(service.name.clone(), self.ensure_started(&spec).await?);
|
||||
}
|
||||
}
|
||||
}
|
||||
// Services removed from the spec: gracefully stop their old containers.
|
||||
if let Some(prev) = previous {
|
||||
for (old_name, old_id) in prev {
|
||||
if !score.services.iter().any(|s| &s.name == old_name) {
|
||||
if let Err(e) = self
|
||||
.topology
|
||||
.stop_service(old_id, lifecycle.grace_period_secs)
|
||||
.await
|
||||
{
|
||||
tracing::warn!(service = %old_name, error = %e, "failed to stop removed service");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(ids)
|
||||
}
|
||||
|
||||
/// Handle a Delete/Purge: gracefully stop+remove every container for the
|
||||
/// last cached score. Idempotent.
|
||||
pub async fn remove(&self, key: &str) -> Result<()> {
|
||||
let deployment = deployment_from_key(key);
|
||||
let mut state = self.state.lock().await;
|
||||
let Some(entry) = state.remove(key) else {
|
||||
let entry = self.state.lock().await.remove(key);
|
||||
let Some(entry) = entry else {
|
||||
tracing::info!(key, "delete for unknown key — nothing to remove");
|
||||
if let Some(name) = &deployment {
|
||||
self.drop_phase(name).await;
|
||||
}
|
||||
return Ok(());
|
||||
};
|
||||
drop(state);
|
||||
|
||||
use harmony::topology::ContainerRuntime;
|
||||
let grace = entry.score.lifecycle_policy().grace_period_secs;
|
||||
for service in &entry.score.services {
|
||||
if let Err(e) = self.topology.remove_service(&service.name).await {
|
||||
tracing::warn!(
|
||||
key,
|
||||
service = %service.name,
|
||||
error = %e,
|
||||
"failed to remove container"
|
||||
);
|
||||
// Prefer the recorded id; fall back to the service name.
|
||||
let target = entry
|
||||
.container_ids
|
||||
.get(&service.name)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| service.name.clone());
|
||||
if let Err(e) = self.topology.stop_service(&target, grace).await {
|
||||
tracing::warn!(key, service = %service.name, error = %e, "failed to remove container");
|
||||
} else {
|
||||
tracing::info!(key, service = %service.name, "removed container");
|
||||
}
|
||||
@@ -200,11 +264,10 @@ impl Reconciler {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Periodic ground-truth reconcile. ROADMAP §5.6 — "polling instead of
|
||||
/// event-driven PLEG. Agent polls podman every 30s as ground truth;
|
||||
/// KV watch events are accelerators." Re-runs each cached score against
|
||||
/// podman-api; the underlying `ensure_service_running` is idempotent
|
||||
/// so a converged state produces no log noise.
|
||||
/// Periodic ground-truth reconcile. **Liveness only**: verify each cached
|
||||
/// deployment's containers are up (adopt-or-start), never compare the spec
|
||||
/// — that's what killed the old 30s redeploy loop. Spec changes arrive as a
|
||||
/// KV Put and go through `apply`'s graceful path instead.
|
||||
pub async fn tick(&self) -> Result<()> {
|
||||
let snapshot: Vec<(String, PodmanV0Score)> = {
|
||||
let state = self.state.lock().await;
|
||||
@@ -215,20 +278,39 @@ impl Reconciler {
|
||||
};
|
||||
for (key, score) in snapshot {
|
||||
let deployment = deployment_from_key(&key);
|
||||
match self.run_score(&key, &score).await {
|
||||
Ok(()) => {
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Running, None).await;
|
||||
}
|
||||
let mut ids = HashMap::new();
|
||||
let mut error: Option<String> = None;
|
||||
for service in &score.services {
|
||||
match self.ensure_started(&score.container_spec(service)).await {
|
||||
Ok(id) => {
|
||||
ids.insert(service.name.clone(), id);
|
||||
}
|
||||
Err(e) => {
|
||||
tracing::warn!(key, error = %e, "periodic reconcile failed");
|
||||
if let Some(name) = &deployment {
|
||||
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
|
||||
.await;
|
||||
error = Some(short(&e.to_string()));
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
// Refresh the cached ids so a recreated container's id is recorded.
|
||||
if error.is_none() {
|
||||
if let Some(entry) = self.state.lock().await.get_mut(&key) {
|
||||
entry.container_ids = ids.clone();
|
||||
}
|
||||
}
|
||||
if let Some(name) = &deployment {
|
||||
match error {
|
||||
None => {
|
||||
let id_list = score
|
||||
.services
|
||||
.iter()
|
||||
.filter_map(|s| ids.get(&s.name).cloned())
|
||||
.collect();
|
||||
self.apply_phase(name, Phase::Running, None, id_list).await;
|
||||
}
|
||||
Some(e) => self.apply_phase(name, Phase::Failed, Some(e), vec![]).await,
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
@@ -243,16 +325,11 @@ impl Reconciler {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn run_score(&self, key: &str, score: &PodmanV0Score) -> Result<()> {
|
||||
let interpret = Score::<PodmanTopology>::create_interpret(score);
|
||||
let outcome = interpret
|
||||
.execute(&self.inventory, &self.topology)
|
||||
.await
|
||||
.map_err(|e| anyhow::anyhow!("PodmanV0Score interpret failed for {key}: {e}"))?;
|
||||
tracing::info!(key, outcome = ?outcome, "reconciled");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Map a runtime `ExecutorError` into `anyhow` at the agent boundary.
|
||||
fn rt(e: harmony::executors::ExecutorError) -> anyhow::Error {
|
||||
anyhow::anyhow!("container runtime: {e}")
|
||||
}
|
||||
|
||||
/// Extract the deployment name from a NATS KV key of the form
|
||||
@@ -277,25 +354,168 @@ fn short(s: &str) -> String {
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
//! Focused tests for transition detection. Drive `apply_phase` /
|
||||
//! `drop_phase` directly with an inert topology (no real podman
|
||||
//! socket) and a `None` FleetPublisher.
|
||||
//! Drives the reconciler against an in-memory [`FakeRuntime`] (no podman
|
||||
//! socket) and a `None` FleetPublisher, so the convergence logic — the
|
||||
//! loop-fix and the graceful roll-forward — is tested deterministically.
|
||||
use super::*;
|
||||
use harmony::inventory::Inventory;
|
||||
use harmony::modules::podman::PodmanTopology;
|
||||
use std::path::PathBuf;
|
||||
use harmony::executors::ExecutorError;
|
||||
use harmony::modules::podman::{PodmanService, ReconcileScore};
|
||||
use harmony::topology::{ContainerObservation, ContainerState};
|
||||
use std::sync::Mutex as StdMutex;
|
||||
|
||||
fn reconciler() -> Reconciler {
|
||||
let topology = Arc::new(
|
||||
PodmanTopology::from_unix_socket(PathBuf::from("/nonexistent/for-tests")).unwrap(),
|
||||
);
|
||||
let inventory = Arc::new(Inventory::empty());
|
||||
Reconciler::new(
|
||||
#[derive(Default)]
|
||||
struct FakeState {
|
||||
/// name → (id, running).
|
||||
containers: HashMap<String, (String, bool)>,
|
||||
next_id: usize,
|
||||
started: Vec<String>,
|
||||
stopped: Vec<String>,
|
||||
fail_next_start: bool,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct FakeRuntime {
|
||||
inner: StdMutex<FakeState>,
|
||||
}
|
||||
|
||||
impl FakeRuntime {
|
||||
fn started(&self) -> Vec<String> {
|
||||
self.inner.lock().unwrap().started.clone()
|
||||
}
|
||||
fn stopped(&self) -> Vec<String> {
|
||||
self.inner.lock().unwrap().stopped.clone()
|
||||
}
|
||||
fn fail_next_start(&self) {
|
||||
self.inner.lock().unwrap().fail_next_start = true;
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait::async_trait]
|
||||
impl ContainerRuntime for FakeRuntime {
|
||||
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
|
||||
self.start_service(spec).await.map(|_| ())
|
||||
}
|
||||
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
|
||||
let mut s = self.inner.lock().unwrap();
|
||||
if s.fail_next_start {
|
||||
s.fail_next_start = false;
|
||||
return Err(ExecutorError::UnexpectedError("start failed".into()));
|
||||
}
|
||||
s.next_id += 1;
|
||||
let id = format!("id{}", s.next_id);
|
||||
s.containers.insert(spec.name.clone(), (id.clone(), true));
|
||||
s.started.push(id.clone());
|
||||
Ok(id)
|
||||
}
|
||||
async fn container_status(
|
||||
&self,
|
||||
name: &str,
|
||||
) -> Result<Option<ContainerObservation>, ExecutorError> {
|
||||
let s = self.inner.lock().unwrap();
|
||||
Ok(s.containers
|
||||
.get(name)
|
||||
.map(|(id, running)| ContainerObservation {
|
||||
id: id.clone(),
|
||||
running: *running,
|
||||
}))
|
||||
}
|
||||
async fn stop_service(&self, id_or_name: &str, _grace: u64) -> Result<(), ExecutorError> {
|
||||
let mut s = self.inner.lock().unwrap();
|
||||
s.stopped.push(id_or_name.to_string());
|
||||
s.containers
|
||||
.retain(|name, (id, _)| name != id_or_name && id != id_or_name);
|
||||
Ok(())
|
||||
}
|
||||
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
|
||||
self.stop_service(name, 0).await
|
||||
}
|
||||
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
|
||||
Ok(vec![])
|
||||
}
|
||||
}
|
||||
|
||||
fn reconciler() -> (Reconciler, Arc<FakeRuntime>) {
|
||||
let fake = Arc::new(FakeRuntime::default());
|
||||
let r = Reconciler::new(
|
||||
Id::from("test-device".to_string()),
|
||||
topology,
|
||||
inventory,
|
||||
fake.clone() as Arc<dyn ContainerRuntime>,
|
||||
None,
|
||||
)
|
||||
);
|
||||
(r, fake)
|
||||
}
|
||||
|
||||
fn score_bytes(image: &str) -> Vec<u8> {
|
||||
let score = PodmanV0Score {
|
||||
services: vec![PodmanService {
|
||||
name: "web".to_string(),
|
||||
image: image.to_string(),
|
||||
ports: vec![],
|
||||
env: vec![EnvVar::new("LOG", "info")],
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
};
|
||||
serde_json::to_vec(&ReconcileScore::PodmanV0(score)).unwrap()
|
||||
}
|
||||
|
||||
use harmony::topology::EnvVar;
|
||||
|
||||
#[tokio::test]
|
||||
async fn tick_is_idempotent_no_recreate() {
|
||||
// The loop-fix: a service WITH env (which the old matches_spec declared
|
||||
// "always drifted") must not be recreated by the periodic tick.
|
||||
let (r, fake) = reconciler();
|
||||
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||
assert_eq!(fake.started().len(), 1, "initial start");
|
||||
for _ in 0..3 {
|
||||
r.tick().await.unwrap();
|
||||
}
|
||||
assert_eq!(
|
||||
fake.started().len(),
|
||||
1,
|
||||
"tick must not recreate a running container"
|
||||
);
|
||||
assert!(
|
||||
fake.stopped().is_empty(),
|
||||
"tick must not stop a healthy container"
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn changed_score_gracefully_replaces_by_id() {
|
||||
let (r, fake) = reconciler();
|
||||
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||
r.apply("dev.web", &score_bytes("nginx:2")).await.unwrap();
|
||||
// Old container (id1) stopped gracefully, new one (id2) started.
|
||||
assert_eq!(fake.started(), vec!["id1", "id2"]);
|
||||
assert_eq!(fake.stopped(), vec!["id1"]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn unchanged_score_is_a_noop() {
|
||||
let (r, fake) = reconciler();
|
||||
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||
assert_eq!(fake.started().len(), 1, "byte-identical re-apply is a noop");
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn roll_forward_failure_reports_failed_without_revert() {
|
||||
let (r, fake) = reconciler();
|
||||
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||
fake.fail_next_start();
|
||||
let res = r.apply("dev.web", &score_bytes("nginx:2")).await;
|
||||
assert!(res.is_err(), "upgrade failure surfaces");
|
||||
// Old was stopped; new start failed; NO revert (roll-forward only).
|
||||
assert_eq!(fake.stopped(), vec!["id1"]);
|
||||
assert_eq!(
|
||||
fake.started(),
|
||||
vec!["id1"],
|
||||
"no revert restart of the old version"
|
||||
);
|
||||
let phases = r.deployments.lock().await;
|
||||
assert_eq!(phases.get(&dn("web")).map(|(p, _)| *p), Some(Phase::Failed));
|
||||
}
|
||||
|
||||
fn dn(s: &str) -> DeploymentName {
|
||||
@@ -304,36 +524,63 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_phase_records_new_phase() {
|
||||
let r = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
||||
let (r, _) = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
|
||||
.await;
|
||||
let phases = r.deployments.lock().await;
|
||||
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Running));
|
||||
assert_eq!(
|
||||
phases.get(&dn("hello")).map(|(p, _)| *p),
|
||||
Some(Phase::Running)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_phase_idempotent_for_same_phase() {
|
||||
let r = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
||||
async fn apply_phase_idempotent_for_same_phase_and_ids() {
|
||||
let (r, _) = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()])
|
||||
.await;
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()])
|
||||
.await;
|
||||
let phases = r.deployments.lock().await;
|
||||
assert_eq!(phases.len(), 1);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_phase_transitions_update_phase() {
|
||||
let r = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Pending, None).await;
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
||||
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()))
|
||||
async fn apply_phase_republishes_when_container_id_changes() {
|
||||
let (r, _) = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["old".into()])
|
||||
.await;
|
||||
// Same phase, new id (after an upgrade) → recorded.
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["new".into()])
|
||||
.await;
|
||||
let phases = r.deployments.lock().await;
|
||||
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Failed));
|
||||
assert_eq!(
|
||||
phases.get(&dn("hello")),
|
||||
Some(&(Phase::Running, vec!["new".to_string()]))
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn apply_phase_transitions_update_phase() {
|
||||
let (r, _) = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Pending, None, vec![])
|
||||
.await;
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
|
||||
.await;
|
||||
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()), vec![])
|
||||
.await;
|
||||
let phases = r.deployments.lock().await;
|
||||
assert_eq!(
|
||||
phases.get(&dn("hello")).map(|(p, _)| *p),
|
||||
Some(Phase::Failed)
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn drop_phase_clears_known_deployment() {
|
||||
let r = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
||||
let (r, _) = reconciler();
|
||||
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
|
||||
.await;
|
||||
r.drop_phase(&dn("hello")).await;
|
||||
let phases = r.deployments.lock().await;
|
||||
assert!(!phases.contains_key(&dn("hello")));
|
||||
@@ -341,7 +588,7 @@ mod tests {
|
||||
|
||||
#[tokio::test]
|
||||
async fn drop_phase_on_unknown_deployment_is_noop() {
|
||||
let r = reconciler();
|
||||
let (r, _) = reconciler();
|
||||
r.drop_phase(&dn("never-existed")).await;
|
||||
let phases = r.deployments.lock().await;
|
||||
assert!(phases.is_empty());
|
||||
|
||||
@@ -137,6 +137,7 @@ async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> a
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
}),
|
||||
rollout: Rollout {
|
||||
strategy: RolloutStrategy::Immediate,
|
||||
|
||||
@@ -125,6 +125,7 @@ fn podman_score() -> PodmanV0Score {
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -248,6 +249,7 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
|
||||
phase: Phase::Running,
|
||||
last_event_at: Utc::now(),
|
||||
last_error: None,
|
||||
container_ids: vec![],
|
||||
})
|
||||
.await?;
|
||||
kill(h1).await;
|
||||
|
||||
@@ -185,5 +185,6 @@ fn podman_score(image_tag: &str) -> PodmanV0Score {
|
||||
volumes: vec![],
|
||||
restart_policy: RestartPolicy::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -92,6 +92,7 @@ async fn agent_ignores_other_devices_keys() -> anyhow::Result<()> {
|
||||
volumes: vec![],
|
||||
restart_policy: RestartPolicy::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
};
|
||||
admin.put_podman_at_key(&foreign_key, &score).await?;
|
||||
|
||||
|
||||
@@ -818,6 +818,7 @@ mod tests {
|
||||
phase,
|
||||
last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(),
|
||||
last_error: None,
|
||||
container_ids: vec![],
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -652,6 +652,7 @@ mod tests {
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
}),
|
||||
rollout: Rollout {
|
||||
strategy: RolloutStrategy::Immediate,
|
||||
@@ -705,6 +706,7 @@ mod tests {
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
});
|
||||
assert_eq!(first_service_name(&score).as_deref(), Some("web"));
|
||||
}
|
||||
@@ -720,6 +722,7 @@ mod tests {
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
lifecycle: None,
|
||||
});
|
||||
assert_eq!(deployment_version(&score), "1.25");
|
||||
}
|
||||
|
||||
@@ -131,6 +131,12 @@ pub struct DeploymentState {
|
||||
pub last_event_at: DateTime<Utc>,
|
||||
#[serde(default)]
|
||||
pub last_error: Option<String>,
|
||||
/// Runtime container ids backing this deployment on the device, one per
|
||||
/// service (v0.3 Ch5). The agent's identity handle for graceful replacement
|
||||
/// and liveness; surfaced for the dashboard. `#[serde(default)]` for
|
||||
/// wire-compat with pre-Ch5 agents that don't report it.
|
||||
#[serde(default)]
|
||||
pub container_ids: Vec<String>,
|
||||
}
|
||||
|
||||
/// Tiny liveness ping. Written to KV key `heartbeat.<device_id>` in
|
||||
@@ -233,6 +239,7 @@ mod tests {
|
||||
phase: Phase::Failed,
|
||||
last_event_at: ts("2026-04-22T10:05:00Z"),
|
||||
last_error: Some("image pull 429".to_string()),
|
||||
container_ids: vec![],
|
||||
};
|
||||
let json = serde_json::to_string(&original).unwrap();
|
||||
let back: DeploymentState = serde_json::from_str(&json).unwrap();
|
||||
|
||||
@@ -21,13 +21,36 @@ use crate::executors::ExecutorError;
|
||||
/// capability will be clearer than stretching this one.
|
||||
#[async_trait]
|
||||
pub trait ContainerRuntime: Send + Sync {
|
||||
/// Ensure a container matching `spec` is running. Idempotent: running the
|
||||
/// same spec twice must converge to the same state with no observable
|
||||
/// side effects beyond logs. If a container with the same `name` already
|
||||
/// exists with the same image and port mappings, this is a NOOP.
|
||||
/// Otherwise it is stopped, removed, and recreated.
|
||||
/// Ensure a container matching `spec` is running. Idempotent and
|
||||
/// **liveness-only**: if a container with this `name` is already running,
|
||||
/// it's a NOOP — the runtime does **not** compare the spec (a polling
|
||||
/// reconciler would otherwise re-create on every tick for fields it can't
|
||||
/// read back). Spec changes are driven through [`Self::replace_service`] by
|
||||
/// a stateful caller that knows the desired state changed. If no container
|
||||
/// is running, one is (re)created from `spec`.
|
||||
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError>;
|
||||
|
||||
/// Create + start a container from `spec`, returning its runtime id. Any
|
||||
/// existing container with the same name is force-removed first. The id is
|
||||
/// the caller's handle for liveness checks and targeted replacement.
|
||||
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError>;
|
||||
|
||||
/// Observe the container with this name: its id + whether it's running.
|
||||
/// `None` if no such container exists.
|
||||
async fn container_status(
|
||||
&self,
|
||||
name: &str,
|
||||
) -> Result<Option<ContainerObservation>, ExecutorError>;
|
||||
|
||||
/// Gracefully stop and remove a container by id or name: send the
|
||||
/// container's stop signal, wait `grace_period_secs`, then SIGKILL
|
||||
/// (podman's stop semantics), then remove. Idempotent — no error if gone.
|
||||
async fn stop_service(
|
||||
&self,
|
||||
id_or_name: &str,
|
||||
grace_period_secs: u64,
|
||||
) -> Result<(), ExecutorError>;
|
||||
|
||||
/// Stop and remove the container with this name if it exists. No error
|
||||
/// if the container does not exist — idempotent.
|
||||
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError>;
|
||||
@@ -38,6 +61,37 @@ pub trait ContainerRuntime: Send + Sync {
|
||||
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError>;
|
||||
}
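Review note: the liveness-only contract documented on `ensure_service_running` can be exercised without podman. The sketch below is a hypothetical, synchronous in-memory stand-in (the real trait is async and takes a `ContainerSpec`); `FakeRuntime` and its `ctr-N` ids are invented for illustration only.

```rust
use std::collections::HashMap;

/// Hypothetical in-memory stand-in for the async `ContainerRuntime` trait.
#[derive(Default)]
struct FakeRuntime {
    /// name -> (container id, running?)
    containers: HashMap<String, (String, bool)>,
    next_id: u64,
}

impl FakeRuntime {
    /// Force-remove any container with this name, then create a fresh one
    /// and return its id (mirrors the documented `start_service` contract).
    fn start_service(&mut self, name: &str) -> String {
        self.containers.remove(name);
        self.next_id += 1;
        let id = format!("ctr-{}", self.next_id);
        self.containers.insert(name.to_string(), (id.clone(), true));
        id
    }

    /// Observe a container by name: `(id, running)`, or `None` if absent.
    fn container_status(&self, name: &str) -> Option<(String, bool)> {
        self.containers.get(name).cloned()
    }

    /// Liveness-only: a running container is a no-op, whatever its spec.
    /// Anything else (absent or stopped) is (re)created.
    fn ensure_service_running(&mut self, name: &str) {
        match self.container_status(name) {
            Some((_, true)) => {}
            _ => {
                self.start_service(name);
            }
        }
    }
}

/// Two consecutive ticks on a healthy container must keep the same id.
fn second_tick_keeps_id() -> bool {
    let mut rt = FakeRuntime::default();
    rt.ensure_service_running("web");
    let first = rt.container_status("web").map(|(id, _)| id);
    rt.ensure_service_running("web");
    rt.container_status("web").map(|(id, _)| id) == first
}
```

Because the fake never looks at a spec, env or volumes cannot trigger a recreate; only a missing or stopped container does.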
|
||||
|
||||
/// Graceful-shutdown policy for a deployment's container during a roll-forward
|
||||
/// upgrade (v0.3 Ch5). Mirrors K8s/`docker stop` semantics: send `stop_signal`,
|
||||
/// wait `grace_period_secs`, then SIGKILL (podman's stop always force-kills
|
||||
/// after the timeout — there's no "never kill" knob, so we don't pretend one).
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LifecyclePolicy {
|
||||
/// Signal sent first to ask the process to exit cleanly (e.g. `SIGTERM`).
|
||||
pub stop_signal: String,
|
||||
/// Seconds to wait for a clean exit before SIGKILL.
|
||||
pub grace_period_secs: u64,
|
||||
}
|
||||
|
||||
impl Default for LifecyclePolicy {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
stop_signal: "SIGTERM".to_string(),
|
||||
grace_period_secs: 30,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A container's identity + liveness, as observed on the runtime.
|
||||
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||
pub struct ContainerObservation {
|
||||
/// Runtime container id.
|
||||
pub id: String,
|
||||
/// Whether it's currently running.
|
||||
pub running: bool,
|
||||
}
|
||||
|
||||
/// A single environment variable, name/value pair.
|
||||
///
|
||||
/// Modeled as a struct rather than `(String, String)` because k8s CRD
|
||||
@@ -98,6 +152,11 @@ pub struct ContainerSpec {
|
||||
/// Restart policy on container exit. Mirrors podman/docker semantics.
|
||||
#[serde(default)]
|
||||
pub restart_policy: RestartPolicy,
|
||||
/// Signal sent on graceful stop (from the deployment's `LifecyclePolicy`).
|
||||
/// Baked into the container at create time so `podman stop` uses it. `None`
|
||||
/// = the image/podman default (`SIGTERM`).
|
||||
#[serde(default)]
|
||||
pub stop_signal: Option<String>,
|
||||
}
|
||||
|
||||
impl ContainerSpec {
|
||||
|
||||
@@ -5,7 +5,7 @@ use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
topology::{ContainerRuntime, ContainerSpec, Topology},
|
||||
topology::{ContainerRuntime, Topology},
|
||||
};
|
||||
|
||||
use super::score::PodmanV0Score;
|
||||
@@ -54,15 +54,7 @@ impl<T: Topology + ContainerRuntime> Interpret<T> for PodmanV0Interpret {
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
for service in &self.score.services {
|
||||
let spec = ContainerSpec {
|
||||
name: service.name.clone(),
|
||||
image: service.image.clone(),
|
||||
ports: service.ports.clone(),
|
||||
labels: vec![(DEPLOYMENT_LABEL.to_string(), self.score.deployment_label())],
|
||||
env: service.env.clone(),
|
||||
volumes: service.volumes.clone(),
|
||||
restart_policy: service.restart_policy,
|
||||
};
|
||||
let spec = self.score.container_spec(service);
|
||||
topology.ensure_service_running(&spec).await.map_err(|e| {
|
||||
InterpretError::new(format!(
|
||||
"PodmanV0: ensure_service_running({}) failed: {e}",
|
||||
|
||||
@@ -13,10 +13,13 @@ use serde::{Deserialize, Serialize};
|
||||
use crate::{
|
||||
interpret::Interpret,
|
||||
score::Score,
|
||||
topology::{ContainerRuntime, EnvVar, RestartPolicy, Topology, VolumeMount},
|
||||
topology::{
|
||||
ContainerRuntime, ContainerSpec, EnvVar, LifecyclePolicy, RestartPolicy, Topology,
|
||||
VolumeMount,
|
||||
},
|
||||
};
|
||||
|
||||
use super::interpret::PodmanV0Interpret;
|
||||
use super::interpret::{DEPLOYMENT_LABEL, PodmanV0Interpret};
|
||||
|
||||
/// A single container managed by podman on the target host.
|
||||
///
|
||||
@@ -46,9 +49,36 @@ pub struct PodmanService {
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
|
||||
pub struct PodmanV0Score {
|
||||
pub services: Vec<PodmanService>,
|
||||
/// Graceful-shutdown policy used when a changed Score replaces a running
|
||||
/// container (v0.3 Ch5 roll-forward upgrade). `None` = the default
|
||||
/// (SIGTERM, 30s grace, then SIGKILL). `#[serde(default)]` lets older
|
||||
/// Deployment CRs that lack the field deserialize unchanged.
|
||||
#[serde(default)]
|
||||
pub lifecycle: Option<LifecyclePolicy>,
|
||||
}
|
||||
|
||||
impl PodmanV0Score {
|
||||
/// The effective lifecycle policy (configured, or the default).
|
||||
pub fn lifecycle_policy(&self) -> LifecyclePolicy {
|
||||
self.lifecycle.clone().unwrap_or_default()
|
||||
}
|
||||
|
||||
/// Build the [`ContainerSpec`] for one service: image/ports/env/volumes plus
|
||||
/// the deployment label and the lifecycle stop signal. One place so the
|
||||
/// interpret and the agent reconciler agree on the mapping.
|
||||
pub fn container_spec(&self, service: &PodmanService) -> ContainerSpec {
|
||||
ContainerSpec {
|
||||
name: service.name.clone(),
|
||||
image: service.image.clone(),
|
||||
ports: service.ports.clone(),
|
||||
labels: vec![(DEPLOYMENT_LABEL.to_string(), self.deployment_label())],
|
||||
env: service.env.clone(),
|
||||
volumes: service.volumes.clone(),
|
||||
restart_policy: service.restart_policy,
|
||||
stop_signal: self.lifecycle.as_ref().map(|l| l.stop_signal.clone()),
|
||||
}
|
||||
}
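Review note: `lifecycle_policy()` and `container_spec()` treat an unset `lifecycle` differently, which is easy to miss. A minimal sketch, assuming a trimmed stand-in `Score` type (hypothetical, services omitted) and a hypothetical helper `spec_stop_signal` that isolates the one field of interest:

```rust
#[derive(Debug, Clone, PartialEq, Eq)]
struct LifecyclePolicy {
    stop_signal: String,
    grace_period_secs: u64,
}

impl Default for LifecyclePolicy {
    fn default() -> Self {
        Self {
            stop_signal: "SIGTERM".to_string(),
            grace_period_secs: 30,
        }
    }
}

/// Trimmed, hypothetical stand-in for `PodmanV0Score`.
struct Score {
    lifecycle: Option<LifecyclePolicy>,
}

impl Score {
    /// Effective policy: the configured one, or the default.
    fn lifecycle_policy(&self) -> LifecyclePolicy {
        self.lifecycle.clone().unwrap_or_default()
    }

    /// Stop signal that would be baked into the container at create time.
    /// `None` leaves the choice to the image/podman default rather than
    /// forcing the policy default.
    fn spec_stop_signal(&self) -> Option<String> {
        self.lifecycle.as_ref().map(|l| l.stop_signal.clone())
    }
}
```

With `lifecycle: None`, the grace period resolves to 30 seconds while the spec's stop signal stays unset, so an image that declares its own stop signal keeps it.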
|
||||
|
||||
/// Label value applied to every container this Score creates. The
|
||||
/// caller uses the label to enumerate the set of containers that
|
||||
/// belong to a given Score instance (e.g. to tear them down when
|
||||
@@ -122,6 +152,7 @@ mod tests {
|
||||
ports: vec!["8080:80".to_string()],
|
||||
..svc("web", "nginx:latest")
|
||||
}],
|
||||
lifecycle: None,
|
||||
});
|
||||
let json = serde_json::to_string(&score).unwrap();
|
||||
assert!(json.contains("\"type\":\"PodmanV0\""));
|
||||
@@ -148,6 +179,7 @@ mod tests {
|
||||
..svc("api", "myapp:1.0")
|
||||
},
|
||||
],
|
||||
lifecycle: None,
|
||||
});
|
||||
let serialized = serde_json::to_string(&score).unwrap();
|
||||
let deserialized: ReconcileScore = serde_json::from_str(&serialized).unwrap();
|
||||
@@ -229,6 +261,7 @@ mod tests {
|
||||
fn deployment_label_joins_service_names() {
|
||||
let score = PodmanV0Score {
|
||||
services: vec![svc("web", "nginx"), svc("api", "myapp")],
|
||||
lifecycle: None,
|
||||
};
|
||||
assert_eq!(score.deployment_label(), "web,api");
|
||||
}
|
||||
|
||||
@@ -12,8 +12,8 @@ use podman_api::opts::{
|
||||
};
|
||||
|
||||
use crate::domain::topology::{
|
||||
ContainerRuntime, ContainerSpec, ContainerState, PreparationError, PreparationOutcome,
|
||||
RestartPolicy, Topology, VolumeMount,
|
||||
ContainerObservation, ContainerRuntime, ContainerSpec, ContainerState, PreparationError,
|
||||
PreparationOutcome, RestartPolicy, Topology, VolumeMount,
|
||||
};
|
||||
use crate::executors::ExecutorError;
|
||||
|
||||
@@ -97,49 +97,11 @@ impl PodmanTopology {
|
||||
Err(e) => Err(to_exec_error(e)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Topology for PodmanTopology {
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||
// A quick ping — calling info() is the cheapest endpoint that
|
||||
// exercises the socket end-to-end.
|
||||
self.podman
|
||||
.info()
|
||||
.await
|
||||
.map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?;
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ContainerRuntime for PodmanTopology {
|
||||
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
|
||||
let existing = self.get_by_name(&spec.name).await?;
|
||||
|
||||
if let Some(existing) = existing.as_ref() {
|
||||
if matches_spec(existing, spec) {
|
||||
let running = existing.state.as_deref() == Some("running");
|
||||
if running {
|
||||
return Ok(());
|
||||
}
|
||||
// Same spec, just not running — start it rather than recreate.
|
||||
let id = existing.id.clone().unwrap_or_else(|| spec.name.clone());
|
||||
self.containers()
|
||||
.get(id)
|
||||
.start(None)
|
||||
.await
|
||||
.map_err(to_exec_error)?;
|
||||
return Ok(());
|
||||
}
|
||||
// Drift — remove and recreate below.
|
||||
self.remove_container(&spec.name).await?;
|
||||
}
|
||||
|
||||
/// Create + start a container from `spec`, returning its id. The container's
|
||||
/// stop signal is baked in here so a later `podman stop` uses it; the grace
|
||||
/// period is not stored on the container, it is passed at stop time.
|
||||
async fn create_and_start(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
|
||||
self.ensure_image_present(&spec.image).await?;
|
||||
|
||||
let mut labels = HashMap::new();
|
||||
@@ -171,34 +133,94 @@ impl ContainerRuntime for PodmanTopology {
|
||||
.portmappings(port_mappings)
|
||||
.env(env_map)
|
||||
.restart_policy(map_restart_policy(spec.restart_policy));
|
||||
if let Some(sig) = spec.stop_signal.as_deref() {
|
||||
builder = builder.stop_signal(signal_number(sig));
|
||||
}
|
||||
if !mounts.is_empty() {
|
||||
builder = builder.mounts(mounts);
|
||||
}
|
||||
let opts = builder.build();
|
||||
|
||||
let created = self
|
||||
.containers()
|
||||
.create(&opts)
|
||||
.create(&builder.build())
|
||||
.await
|
||||
.map_err(to_exec_error)?;
|
||||
let id = created.id;
|
||||
self.containers()
|
||||
.get(created.id.clone())
|
||||
.get(id.clone())
|
||||
.start(None)
|
||||
.await
|
||||
.map_err(to_exec_error)?;
|
||||
Ok(())
|
||||
Ok(id)
|
||||
}
|
||||
}
|
||||
|
||||
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
|
||||
// First try a graceful stop; ignore failures so removal still happens.
|
||||
#[async_trait]
|
||||
impl Topology for PodmanTopology {
|
||||
fn name(&self) -> &str {
|
||||
&self.name
|
||||
}
|
||||
|
||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||
// A quick ping — calling info() is the cheapest endpoint that
|
||||
// exercises the socket end-to-end.
|
||||
self.podman
|
||||
.info()
|
||||
.await
|
||||
.map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?;
|
||||
Ok(PreparationOutcome::Noop)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl ContainerRuntime for PodmanTopology {
|
||||
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
|
||||
// Liveness-only: never compare the spec (that's the caller's job via
|
||||
// byte-compare of the desired state). A container that's already up
|
||||
// stays up — no per-tick recreate, no flap.
|
||||
match self.container_status(&spec.name).await? {
|
||||
Some(obs) if obs.running => Ok(()),
|
||||
_ => self.start_service(spec).await.map(|_| ()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
|
||||
// Force-remove any stale container by this name so create can't collide.
|
||||
self.remove_container(&spec.name).await?;
|
||||
self.create_and_start(spec).await
|
||||
}
|
||||
|
||||
async fn container_status(
|
||||
&self,
|
||||
name: &str,
|
||||
) -> Result<Option<ContainerObservation>, ExecutorError> {
|
||||
let Some(c) = self.get_by_name(name).await? else {
|
||||
return Ok(None);
|
||||
};
|
||||
Ok(Some(ContainerObservation {
|
||||
id: c.id.clone().unwrap_or_default(),
|
||||
running: c.state.as_deref() == Some("running"),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn stop_service(
|
||||
&self,
|
||||
id_or_name: &str,
|
||||
grace_period_secs: u64,
|
||||
) -> Result<(), ExecutorError> {
|
||||
// Graceful: send the container's stop signal, wait `grace`, then podman
|
||||
// SIGKILLs. Ignore stop failures so removal still happens.
|
||||
let stop_opts = ContainerStopOpts::builder()
|
||||
.timeout(STOP_TIMEOUT.as_secs() as usize)
|
||||
.timeout(grace_period_secs as usize)
|
||||
.build();
|
||||
let container = self.containers().get(name);
|
||||
let container = self.containers().get(id_or_name);
|
||||
if container.exists().await.unwrap_or(false) {
|
||||
let _ = container.stop(&stop_opts).await;
|
||||
}
|
||||
self.remove_container(name).await
|
||||
self.remove_container(id_or_name).await
|
||||
}
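Review note: `grace_period_secs as usize` is lossless on 64-bit targets but silently truncates on 32-bit ones. If the agent may run on 32-bit devices, a saturating conversion is one option. This is a sketch only; `grace_to_timeout` is a hypothetical helper, not part of the PR.

```rust
/// Convert a grace period in seconds to the `usize` a stop-timeout builder
/// expects, saturating instead of wrapping when `usize` is narrower than u64.
fn grace_to_timeout(grace_period_secs: u64) -> usize {
    usize::try_from(grace_period_secs).unwrap_or(usize::MAX)
}
```

For realistic grace periods the result is identical to the `as` cast; the difference only shows for values above `usize::MAX` on narrow targets.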
|
||||
|
||||
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
|
||||
self.stop_service(name, STOP_TIMEOUT.as_secs()).await
|
||||
}
|
||||
|
||||
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
|
||||
@@ -267,71 +289,19 @@ fn parse_port_mapping(raw: &str) -> Result<PortMapping, ExecutorError> {
|
||||
})
|
||||
}
|
||||
|
||||
fn matches_spec(observed: &podman_api::models::ListContainer, spec: &ContainerSpec) -> bool {
|
||||
let same_image = observed
|
||||
.image
|
||||
.as_deref()
|
||||
.map(|i| i == spec.image)
|
||||
.unwrap_or(false);
|
||||
if !same_image {
|
||||
return false;
|
||||
/// Map a signal name (`SIGTERM`, case-insensitive, `SIG` prefix optional) or a
|
||||
/// numeric string to its number for podman's `stop_signal` create option. Anything else falls back to SIGTERM (15), the safe default.
|
||||
fn signal_number(name: &str) -> i64 {
|
||||
match name.trim().to_uppercase().trim_start_matches("SIG") {
|
||||
"HUP" => 1,
|
||||
"INT" => 2,
|
||||
"QUIT" => 3,
|
||||
"KILL" => 9,
|
||||
"USR1" => 10,
|
||||
"USR2" => 12,
|
||||
"TERM" => 15,
|
||||
other => other.parse().unwrap_or(15),
|
||||
}
|
||||
let observed_ports = observed.ports.as_deref().unwrap_or(&[]);
|
||||
if observed_ports.len() != spec.ports.len() {
|
||||
return false;
|
||||
}
|
||||
for raw in &spec.ports {
|
||||
let Ok(expected) = parse_port_mapping(raw) else {
|
||||
return false;
|
||||
};
|
||||
let present = observed_ports.iter().any(|p| {
|
||||
p.host_port == expected.host_port && p.container_port == expected.container_port
|
||||
});
|
||||
if !present {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
// FIXME(redeploy-loop): this branch makes the agent's periodic
|
||||
// reconcile non-idempotent for any non-trivial Deployment.
|
||||
// Symptom: a service with env or volumes is destroyed and
|
||||
// recreated every 30s tick (RECONCILE_INTERVAL), even when the
|
||||
// observed container is already correct — operators see flapping
|
||||
// container IDs, intermittent connectivity blips, log noise.
|
||||
//
|
||||
// Root cause: `podman list` (v5.x) doesn't surface env or mounts,
|
||||
// so we can't compare them; the original author chose to declare
|
||||
// "any spec with env/volumes is drifted" as a fail-safe. That's
|
||||
// the wrong default for a polling reconciler — it weaponizes the
|
||||
// poll into a re-creation loop.
|
||||
//
|
||||
// Right fix (out of scope for the demo, in scope for delivery):
|
||||
// 1. Switch this code path to `containers.get(name).inspect()`
|
||||
// which DOES return env + mounts. Compare structurally.
|
||||
// 2. Treat absent fields on the inspect response as "unchanged",
|
||||
// not "drifted".
|
||||
// 3. Add an integration test that runs ensure_service_running
|
||||
// twice on the same spec and asserts the container ID is
|
||||
// unchanged.
|
||||
//
|
||||
// Layered next: the upcoming health-check addition to
|
||||
// ContainerSpec gives the agent a separate signal to decide
|
||||
// when to recreate (failed health checks → unhealthy → recreate)
|
||||
// independent of the spec-drift check.
|
||||
//
|
||||
// Until fixed: avoid env / volumes in demo-time deployments to
|
||||
// dodge the loop. The hello-web nginx demo doesn't have either,
|
||||
// which is why it's stable.
|
||||
if !spec.env.is_empty() || !spec.volumes.is_empty() {
|
||||
return false;
|
||||
}
|
||||
// Restart policy: ListContainer doesn't surface it directly. We
|
||||
// only force a recreate when the spec explicitly asks for something
|
||||
// other than the default — so unchanged podman-default behaviour
|
||||
// stays a NOOP, and explicit policy changes converge on next apply.
|
||||
if spec.restart_policy != RestartPolicy::default() {
|
||||
return false;
|
||||
}
|
||||
true
|
||||
}
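Review note: the normalization in `signal_number` accepts more inputs than the name suggests. The function below is copied from the hunk above so its behavior can be checked in isolation; the numbers for `USR1`/`USR2` are the Linux x86/ARM values and differ on some other platforms.

```rust
/// Map a signal name or numeric string to a signal number.
/// Input is trimmed, upper-cased, and stripped of a leading "SIG".
fn signal_number(name: &str) -> i64 {
    match name.trim().to_uppercase().trim_start_matches("SIG") {
        "HUP" => 1,
        "INT" => 2,
        "QUIT" => 3,
        "KILL" => 9,
        "USR1" => 10,
        "USR2" => 12,
        "TERM" => 15,
        // Numeric strings pass through; anything else becomes SIGTERM.
        other => other.parse().unwrap_or(15),
    }
}
```

So `"SIGTERM"`, `"term"`, and `"15"` all resolve to 15, and a misspelled name degrades to SIGTERM rather than failing the create.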
|
||||
|
||||
fn volume_to_mount(v: &VolumeMount) -> ContainerMount {
|
||||
|
||||