feat(fleet): graceful roll-forward upgrade + container-ID identity (Ch5) #331
66
ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md
Normal file
66
ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md
Normal file
@@ -0,0 +1,66 @@
|
|||||||
|
# Ch5 — Graceful deployment upgrade (roll-forward) + redeploy-loop fix: status
|
||||||
|
|
||||||
|
Two things shipped together because they share the same root: **how the agent
|
||||||
|
identifies a running container vs. its desired spec**.
|
||||||
|
|
||||||
|
## The redeploy-loop fix (the bug JG flagged)
|
||||||
|
|
||||||
|
`podman ps` doesn't surface env/volumes, so the old `matches_spec` declared
|
||||||
|
*any* service with env or volumes "drifted" and re-created it **every 30s tick** —
|
||||||
|
flapping container ids, connectivity blips, log noise.
|
||||||
|
|
||||||
|
Fix (per JG: "use container IDs, store them in NATS"): identity is the
|
||||||
|
**container, by name+id**, not a spec comparison.
|
||||||
|
- `ensure_service_running` is now **liveness-only**: a running container is a
|
||||||
|
NOOP, no spec compare. The periodic tick adopts-or-(re)starts by name and
|
||||||
|
never recreates a healthy container. The loop is gone — proven by
|
||||||
|
`tick_is_idempotent_no_recreate` (a service *with env*, the old loop trigger).
|
||||||
|
- Spec changes are detected where they always were — the reconciler's
|
||||||
|
byte-compare of the desired-state JSON on a KV Put — not by the runtime.
|
||||||
|
- The agent records each container's **id** (from `start_service`) in the cache
|
||||||
|
and in `DeploymentState.container_ids` (surfaced for the dashboard).
|
||||||
|
|
||||||
|
## Graceful roll-forward upgrade
|
||||||
|
|
||||||
|
- `PodmanV0Score.lifecycle: Option<LifecyclePolicy>` (`stop_signal=SIGTERM`,
|
||||||
|
`grace_period_secs=30`). The stop signal is baked into the container at
|
||||||
|
create; the grace period drives `podman stop --time`, after which podman
|
||||||
|
always SIGKILLs — so there's no `sigkill_fallback` knob to pretend otherwise
|
||||||
|
(a config field nothing honors is a defect per AGENTS.md).
|
||||||
|
- On a changed score the reconciler `converge`s: for each service, **stop the
|
||||||
|
exact old container by its recorded id** (SIGTERM → grace → SIGKILL), then
|
||||||
|
start the new one, record the new id. Services dropped from the spec are
|
||||||
|
stopped. **Roll-forward only**: a failure reports `Phase::Failed` and is never
|
||||||
|
reverted (the customer edits the spec) — proven by
|
||||||
|
`roll_forward_failure_reports_failed_without_revert`.
|
||||||
|
- The transient cutover window reports `Phase::Pending` (its documented "in
|
||||||
|
flight" meaning) → `Running`, so the dashboard reflects the step without a new
|
||||||
|
Phase variant.
|
||||||
|
|
||||||
|
## Architecture note
|
||||||
|
|
||||||
|
The graceful-replace orchestration lives in the **agent reconciler** (which
|
||||||
|
holds the cross-reconcile state: cached score + container ids), driving the
|
||||||
|
`ContainerRuntime` capability directly. The `PodmanV0Score` interpret stays the
|
||||||
|
"ensure running" path for non-agent/initial converge. `container_spec` is the
|
||||||
|
one place both share the `PodmanService → ContainerSpec` mapping. The reconciler
|
||||||
|
is now generic over `dyn ContainerRuntime`, so the convergence logic is unit-
|
||||||
|
tested against a `FakeRuntime` (no podman socket).
|
||||||
|
|
||||||
|
## Tested
|
||||||
|
|
||||||
|
- `tick_is_idempotent_no_recreate` — the loop-fix.
|
||||||
|
- `changed_score_gracefully_replaces_by_id` — stop-old-by-id then start-new.
|
||||||
|
- `unchanged_score_is_a_noop`.
|
||||||
|
- `roll_forward_failure_reports_failed_without_revert`.
|
||||||
|
- Plus the existing phase-transition tests, all green.
|
||||||
|
|
||||||
|
## Flagged for a supervised run
|
||||||
|
|
||||||
|
- **VM e2e v1→v2→v3 with controlled failures** (`harmony-fleet-e2e/src/vm`).
|
||||||
|
Needs multiple image tags reachable from the KVM guest, like Ch4's upgrade
|
||||||
|
e2e. The convergence logic is unit-proven against the fake runtime; this is
|
||||||
|
the on-hardware integration proof. The existing `vm_deploy_lifecycle.rs`
|
||||||
|
covers single-deploy.
|
||||||
|
- Surfacing `container_ids` in the dashboard device view (data is already on
|
||||||
|
`DeploymentState`).
|
||||||
@@ -423,6 +423,7 @@ async fn apply_one_cr(
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: Default::default(),
|
restart_policy: Default::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
}),
|
}),
|
||||||
rollout: Rollout {
|
rollout: Rollout {
|
||||||
strategy: RolloutStrategy::Immediate,
|
strategy: RolloutStrategy::Immediate,
|
||||||
@@ -496,6 +497,7 @@ async fn simulate_state_loop(
|
|||||||
last_event_at: Utc::now(),
|
last_event_at: Utc::now(),
|
||||||
last_error: matches!(phase, Phase::Failed)
|
last_error: matches!(phase, Phase::Failed)
|
||||||
.then(|| format!("synthetic failure @{}", device.device_id)),
|
.then(|| format!("synthetic failure @{}", device.device_id)),
|
||||||
|
container_ids: vec![],
|
||||||
};
|
};
|
||||||
match serde_json::to_vec(&ds) {
|
match serde_json::to_vec(&ds) {
|
||||||
Ok(payload) => match bucket.put(&state_key, payload.into()).await {
|
Ok(payload) => match bucket.put(&state_key, payload.into()).await {
|
||||||
|
|||||||
@@ -207,6 +207,7 @@ fn build_cr(cli: &Cli) -> Deployment {
|
|||||||
volumes,
|
volumes,
|
||||||
restart_policy: cli.restart.into(),
|
restart_policy: cli.restart.into(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
};
|
};
|
||||||
|
|
||||||
let payload = ReconcileScore::PodmanV0(score);
|
let payload = ReconcileScore::PodmanV0(score);
|
||||||
|
|||||||
@@ -298,7 +298,6 @@ async fn main() -> Result<()> {
|
|||||||
Arc::new(Reconciler::new(
|
Arc::new(Reconciler::new(
|
||||||
device_id.clone(),
|
device_id.clone(),
|
||||||
t.clone(),
|
t.clone(),
|
||||||
inventory.clone(),
|
|
||||||
Some(fleet.clone()),
|
Some(fleet.clone()),
|
||||||
))
|
))
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -7,76 +7,69 @@ use chrono::Utc;
|
|||||||
use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase};
|
use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase};
|
||||||
use tokio::sync::Mutex;
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
use harmony::inventory::Inventory;
|
use harmony::modules::podman::{PodmanV0Score, ReconcileScore};
|
||||||
use harmony::modules::podman::{PodmanTopology, PodmanV0Score, ReconcileScore};
|
use harmony::topology::{ContainerRuntime, ContainerSpec};
|
||||||
use harmony::score::Score;
|
|
||||||
|
|
||||||
use crate::fleet_publisher::FleetPublisher;
|
use crate::fleet_publisher::FleetPublisher;
|
||||||
|
|
||||||
/// Cache key → last-seen state, populated by `apply` and consulted by the
|
/// Cache key → last-seen state, populated by `apply` and consulted by the
|
||||||
/// 30-second periodic tick and the delete path.
|
/// 30-second periodic tick and the delete path.
|
||||||
struct CachedEntry {
|
struct CachedEntry {
|
||||||
/// Serialized score JSON. Used for string-compare idempotency per
|
/// Serialized score JSON. Byte-compared for idempotency: a re-delivered
|
||||||
/// ROADMAP §5.5 — cheaper and more deterministic than a hash.
|
/// identical desired-state is a NOOP; a changed one drives a graceful
|
||||||
|
/// roll-forward upgrade.
|
||||||
serialized: String,
|
serialized: String,
|
||||||
/// Parsed score. Cached so the periodic reconcile tick and delete
|
/// Parsed score, so the tick / delete paths don't re-parse.
|
||||||
/// handlers don't have to re-parse the JSON.
|
|
||||||
score: PodmanV0Score,
|
score: PodmanV0Score,
|
||||||
|
/// Runtime container id per service name (v0.3 Ch5). The identity handle:
|
||||||
|
/// the tick checks these are alive (not the spec — which caused the old 30s
|
||||||
|
/// redeploy loop), and an upgrade stops the exact old container by id.
|
||||||
|
container_ids: HashMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Reconciler {
|
pub struct Reconciler {
|
||||||
device_id: Id,
|
device_id: Id,
|
||||||
topology: Arc<PodmanTopology>,
|
topology: Arc<dyn ContainerRuntime>,
|
||||||
inventory: Arc<Inventory>,
|
/// Keyed by NATS KV key (`<device>.<deployment>`). One entry per key.
|
||||||
/// Keyed by NATS KV key (`<device>.<deployment>`). A single entry per
|
|
||||||
/// KV key — in v0 there is no fan-out from one key to many scores.
|
|
||||||
state: Mutex<HashMap<String, CachedEntry>>,
|
state: Mutex<HashMap<String, CachedEntry>>,
|
||||||
/// Current phase per deployment, used to decide whether a new
|
/// Last (phase, container_ids) published per deployment, to skip
|
||||||
/// write to the `device-state` KV is needed.
|
/// re-publishing unchanged device-state.
|
||||||
///
|
deployments: Mutex<HashMap<DeploymentName, (Phase, Vec<String>)>>,
|
||||||
/// NOTE : this feels dangerous, conflict on deployment name could be a problem
|
/// Publish surface. Optional so unit tests without a live NATS client work.
|
||||||
/// We must explore this and clarify it in the design and decide if it is a constraint
|
|
||||||
deployments: Mutex<HashMap<DeploymentName, Phase>>,
|
|
||||||
/// Publish surface. Optional so unit tests without a live NATS
|
|
||||||
/// client still work; always populated in the real agent runtime.
|
|
||||||
fleet: Option<Arc<FleetPublisher>>,
|
fleet: Option<Arc<FleetPublisher>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Reconciler {
|
impl Reconciler {
|
||||||
pub fn new(
|
pub fn new(
|
||||||
device_id: Id,
|
device_id: Id,
|
||||||
topology: Arc<PodmanTopology>,
|
topology: Arc<dyn ContainerRuntime>,
|
||||||
inventory: Arc<Inventory>,
|
|
||||||
fleet: Option<Arc<FleetPublisher>>,
|
fleet: Option<Arc<FleetPublisher>>,
|
||||||
) -> Self {
|
) -> Self {
|
||||||
Self {
|
Self {
|
||||||
device_id,
|
device_id,
|
||||||
topology,
|
topology,
|
||||||
inventory,
|
|
||||||
state: Mutex::new(HashMap::new()),
|
state: Mutex::new(HashMap::new()),
|
||||||
deployments: Mutex::new(HashMap::new()),
|
deployments: Mutex::new(HashMap::new()),
|
||||||
fleet,
|
fleet,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Record a new phase for a deployment and, if it changed, write
|
/// Record a phase (+ backing container ids) for a deployment and publish the
|
||||||
/// the updated [`DeploymentState`] to the KV. Same-phase
|
/// [`DeploymentState`] when it changed. Same (phase, ids) is a no-op so the
|
||||||
/// re-confirmations are no-ops so the periodic reconcile tick
|
/// periodic tick doesn't churn the bucket.
|
||||||
/// doesn't churn the bucket.
|
|
||||||
async fn apply_phase(
|
async fn apply_phase(
|
||||||
&self,
|
&self,
|
||||||
deployment: &DeploymentName,
|
deployment: &DeploymentName,
|
||||||
phase: Phase,
|
phase: Phase,
|
||||||
last_error: Option<String>,
|
last_error: Option<String>,
|
||||||
|
container_ids: Vec<String>,
|
||||||
) {
|
) {
|
||||||
{
|
{
|
||||||
let mut phases = self.deployments.lock().await;
|
let mut phases = self.deployments.lock().await;
|
||||||
// performance nitpick : we don't need a write lock here, we could check before acquiring the write
|
if phases.get(deployment) == Some(&(phase, container_ids.clone())) {
|
||||||
// lock
|
|
||||||
if phases.get(deployment).copied() == Some(phase) {
|
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
phases.insert(deployment.clone(), phase);
|
phases.insert(deployment.clone(), (phase, container_ids.clone()));
|
||||||
}
|
}
|
||||||
|
|
||||||
if let Some(publisher) = &self.fleet {
|
if let Some(publisher) = &self.fleet {
|
||||||
@@ -86,14 +79,12 @@ impl Reconciler {
|
|||||||
phase,
|
phase,
|
||||||
last_event_at: Utc::now(),
|
last_event_at: Utc::now(),
|
||||||
last_error,
|
last_error,
|
||||||
|
container_ids,
|
||||||
};
|
};
|
||||||
publisher.write_deployment_state(&state).await;
|
publisher.write_deployment_state(&state).await;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Clear the in-memory phase for a deployment and delete its KV
|
|
||||||
/// entry. Idempotent: a delete for a never-applied deployment is
|
|
||||||
/// a no-op in memory and a harmless tombstone write on the wire.
|
|
||||||
async fn drop_phase(&self, deployment: &DeploymentName) {
|
async fn drop_phase(&self, deployment: &DeploymentName) {
|
||||||
let was_known = {
|
let was_known = {
|
||||||
let mut phases = self.deployments.lock().await;
|
let mut phases = self.deployments.lock().await;
|
||||||
@@ -107,9 +98,26 @@ impl Reconciler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Handle a Put event (new or updated score on NATS KV). No-ops if the
|
/// Adopt-or-start: if a container with this name is already running, return
|
||||||
/// serialized score is byte-identical to the last-seen value for this
|
/// its id (no recreate — survives an agent restart without flapping the
|
||||||
/// key.
|
/// workload); otherwise (re)create it from `spec` and return the new id.
|
||||||
|
async fn ensure_started(&self, spec: &ContainerSpec) -> Result<String> {
|
||||||
|
match self
|
||||||
|
.topology
|
||||||
|
.container_status(&spec.name)
|
||||||
|
.await
|
||||||
|
.map_err(rt)?
|
||||||
|
{
|
||||||
|
Some(obs) if obs.running => Ok(obs.id),
|
||||||
|
_ => self.topology.start_service(spec).await.map_err(rt),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a Put: NOOP if byte-identical to the last-seen score; a new key
|
||||||
|
/// starts its services; a changed key **gracefully rolls forward** — stop
|
||||||
|
/// each old container (SIGTERM → grace → SIGKILL per the score's
|
||||||
|
/// `LifecyclePolicy`), then start the new one. Roll-forward only: a failure
|
||||||
|
/// is reported as `Failed`, never reverted (the customer edits the spec).
|
||||||
pub async fn apply(&self, key: &str, value: &[u8]) -> Result<()> {
|
pub async fn apply(&self, key: &str, value: &[u8]) -> Result<()> {
|
||||||
let deployment = deployment_from_key(key);
|
let deployment = deployment_from_key(key);
|
||||||
let incoming = match serde_json::from_slice::<ReconcileScore>(value) {
|
let incoming = match serde_json::from_slice::<ReconcileScore>(value) {
|
||||||
@@ -117,7 +125,12 @@ impl Reconciler {
|
|||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!(key, error = %e, "failed to deserialize score");
|
tracing::warn!(key, error = %e, "failed to deserialize score");
|
||||||
if let Some(name) = &deployment {
|
if let Some(name) = &deployment {
|
||||||
self.apply_phase(name, Phase::Failed, Some(format!("bad payload: {e}")))
|
self.apply_phase(
|
||||||
|
name,
|
||||||
|
Phase::Failed,
|
||||||
|
Some(format!("bad payload: {e}")),
|
||||||
|
vec![],
|
||||||
|
)
|
||||||
.await;
|
.await;
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
@@ -125,71 +138,122 @@ impl Reconciler {
|
|||||||
};
|
};
|
||||||
let serialized = String::from_utf8_lossy(value).into_owned();
|
let serialized = String::from_utf8_lossy(value).into_owned();
|
||||||
|
|
||||||
{
|
let previous = {
|
||||||
let state = self.state.lock().await;
|
let state = self.state.lock().await;
|
||||||
if let Some(existing) = state.get(key) {
|
match state.get(key) {
|
||||||
if existing.serialized == serialized {
|
Some(existing) if existing.serialized == serialized => {
|
||||||
tracing::debug!(key, "score unchanged — noop");
|
tracing::debug!(key, "score unchanged — noop");
|
||||||
return Ok(());
|
return Ok(());
|
||||||
}
|
}
|
||||||
|
Some(existing) => Some(existing.container_ids.clone()),
|
||||||
|
None => None,
|
||||||
}
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
// Upgrade in flight → Pending (the transient cutover window).
|
||||||
|
if let Some(name) = &deployment {
|
||||||
|
self.apply_phase(name, Phase::Pending, None, vec![]).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
match self.converge(&incoming, previous.as_ref()).await {
|
||||||
|
Ok(ids) => {
|
||||||
|
let id_list: Vec<String> = incoming
|
||||||
|
.services
|
||||||
|
.iter()
|
||||||
|
.filter_map(|s| ids.get(&s.name).cloned())
|
||||||
|
.collect();
|
||||||
if let Some(name) = &deployment {
|
if let Some(name) = &deployment {
|
||||||
self.apply_phase(name, Phase::Pending, None).await;
|
self.apply_phase(name, Phase::Running, None, id_list).await;
|
||||||
}
|
}
|
||||||
|
|
||||||
match self.run_score(key, &incoming).await {
|
|
||||||
Ok(()) => {
|
|
||||||
if let Some(name) = &deployment {
|
|
||||||
self.apply_phase(name, Phase::Running, None).await;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
if let Some(name) = &deployment {
|
|
||||||
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
|
|
||||||
.await;
|
|
||||||
}
|
|
||||||
return Err(e);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut state = self.state.lock().await;
|
let mut state = self.state.lock().await;
|
||||||
state.insert(
|
state.insert(
|
||||||
key.to_string(),
|
key.to_string(),
|
||||||
CachedEntry {
|
CachedEntry {
|
||||||
serialized,
|
serialized,
|
||||||
score: incoming,
|
score: incoming,
|
||||||
|
container_ids: ids,
|
||||||
},
|
},
|
||||||
);
|
);
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
Err(e) => {
|
||||||
|
if let Some(name) = &deployment {
|
||||||
|
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())), vec![])
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
Err(e)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Handle a Delete/Purge event. Stops and removes every container
|
/// Drive actual → desired for one score. New deployment: start each service.
|
||||||
/// referenced by the last cached score for this key. Idempotent: if we
|
/// Changed: stop the old container by id (graceful) then start the new one,
|
||||||
/// never saw a Put for this key (agent restart after delete), logs and
|
/// and remove services dropped from the spec. Returns service → new id.
|
||||||
/// returns ok.
|
async fn converge(
|
||||||
|
&self,
|
||||||
|
score: &PodmanV0Score,
|
||||||
|
previous: Option<&HashMap<String, String>>,
|
||||||
|
) -> Result<HashMap<String, String>> {
|
||||||
|
let lifecycle = score.lifecycle_policy();
|
||||||
|
let mut ids = HashMap::new();
|
||||||
|
for service in &score.services {
|
||||||
|
let spec = score.container_spec(service);
|
||||||
|
match previous.and_then(|p| p.get(&service.name)) {
|
||||||
|
// Existing service whose spec changed → graceful replace by id.
|
||||||
|
Some(old_id) => {
|
||||||
|
self.topology
|
||||||
|
.stop_service(old_id, lifecycle.grace_period_secs)
|
||||||
|
.await
|
||||||
|
.map_err(rt)?;
|
||||||
|
let id = self.topology.start_service(&spec).await.map_err(rt)?;
|
||||||
|
ids.insert(service.name.clone(), id);
|
||||||
|
}
|
||||||
|
// New (or first-seen) service → adopt-or-start.
|
||||||
|
None => {
|
||||||
|
ids.insert(service.name.clone(), self.ensure_started(&spec).await?);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Services removed from the spec: gracefully stop their old containers.
|
||||||
|
if let Some(prev) = previous {
|
||||||
|
for (old_name, old_id) in prev {
|
||||||
|
if !score.services.iter().any(|s| &s.name == old_name) {
|
||||||
|
if let Err(e) = self
|
||||||
|
.topology
|
||||||
|
.stop_service(old_id, lifecycle.grace_period_secs)
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(service = %old_name, error = %e, "failed to stop removed service");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Ok(ids)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Handle a Delete/Purge: gracefully stop+remove every container for the
|
||||||
|
/// last cached score. Idempotent.
|
||||||
pub async fn remove(&self, key: &str) -> Result<()> {
|
pub async fn remove(&self, key: &str) -> Result<()> {
|
||||||
let deployment = deployment_from_key(key);
|
let deployment = deployment_from_key(key);
|
||||||
let mut state = self.state.lock().await;
|
let entry = self.state.lock().await.remove(key);
|
||||||
let Some(entry) = state.remove(key) else {
|
let Some(entry) = entry else {
|
||||||
tracing::info!(key, "delete for unknown key — nothing to remove");
|
tracing::info!(key, "delete for unknown key — nothing to remove");
|
||||||
if let Some(name) = &deployment {
|
if let Some(name) = &deployment {
|
||||||
self.drop_phase(name).await;
|
self.drop_phase(name).await;
|
||||||
}
|
}
|
||||||
return Ok(());
|
return Ok(());
|
||||||
};
|
};
|
||||||
drop(state);
|
|
||||||
|
|
||||||
use harmony::topology::ContainerRuntime;
|
let grace = entry.score.lifecycle_policy().grace_period_secs;
|
||||||
for service in &entry.score.services {
|
for service in &entry.score.services {
|
||||||
if let Err(e) = self.topology.remove_service(&service.name).await {
|
// Prefer the recorded id; fall back to the service name.
|
||||||
tracing::warn!(
|
let target = entry
|
||||||
key,
|
.container_ids
|
||||||
service = %service.name,
|
.get(&service.name)
|
||||||
error = %e,
|
.cloned()
|
||||||
"failed to remove container"
|
.unwrap_or_else(|| service.name.clone());
|
||||||
);
|
if let Err(e) = self.topology.stop_service(&target, grace).await {
|
||||||
|
tracing::warn!(key, service = %service.name, error = %e, "failed to remove container");
|
||||||
} else {
|
} else {
|
||||||
tracing::info!(key, service = %service.name, "removed container");
|
tracing::info!(key, service = %service.name, "removed container");
|
||||||
}
|
}
|
||||||
@@ -200,11 +264,10 @@ impl Reconciler {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Periodic ground-truth reconcile. ROADMAP §5.6 — "polling instead of
|
/// Periodic ground-truth reconcile. **Liveness only**: verify each cached
|
||||||
/// event-driven PLEG. Agent polls podman every 30s as ground truth;
|
/// deployment's containers are up (adopt-or-start), never compare the spec
|
||||||
/// KV watch events are accelerators." Re-runs each cached score against
|
/// — that's what killed the old 30s redeploy loop. Spec changes arrive as a
|
||||||
/// podman-api; the underlying `ensure_service_running` is idempotent
|
/// KV Put and go through `apply`'s graceful path instead.
|
||||||
/// so a converged state produces no log noise.
|
|
||||||
pub async fn tick(&self) -> Result<()> {
|
pub async fn tick(&self) -> Result<()> {
|
||||||
let snapshot: Vec<(String, PodmanV0Score)> = {
|
let snapshot: Vec<(String, PodmanV0Score)> = {
|
||||||
let state = self.state.lock().await;
|
let state = self.state.lock().await;
|
||||||
@@ -215,20 +278,39 @@ impl Reconciler {
|
|||||||
};
|
};
|
||||||
for (key, score) in snapshot {
|
for (key, score) in snapshot {
|
||||||
let deployment = deployment_from_key(&key);
|
let deployment = deployment_from_key(&key);
|
||||||
match self.run_score(&key, &score).await {
|
let mut ids = HashMap::new();
|
||||||
Ok(()) => {
|
let mut error: Option<String> = None;
|
||||||
if let Some(name) = &deployment {
|
for service in &score.services {
|
||||||
self.apply_phase(name, Phase::Running, None).await;
|
match self.ensure_started(&score.container_spec(service)).await {
|
||||||
}
|
Ok(id) => {
|
||||||
|
ids.insert(service.name.clone(), id);
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Err(e) => {
|
||||||
tracing::warn!(key, error = %e, "periodic reconcile failed");
|
tracing::warn!(key, error = %e, "periodic reconcile failed");
|
||||||
if let Some(name) = &deployment {
|
error = Some(short(&e.to_string()));
|
||||||
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
|
break;
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
// Refresh the cached ids so a recreated container's id is recorded.
|
||||||
|
if error.is_none() {
|
||||||
|
if let Some(entry) = self.state.lock().await.get_mut(&key) {
|
||||||
|
entry.container_ids = ids.clone();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if let Some(name) = &deployment {
|
||||||
|
match error {
|
||||||
|
None => {
|
||||||
|
let id_list = score
|
||||||
|
.services
|
||||||
|
.iter()
|
||||||
|
.filter_map(|s| ids.get(&s.name).cloned())
|
||||||
|
.collect();
|
||||||
|
self.apply_phase(name, Phase::Running, None, id_list).await;
|
||||||
|
}
|
||||||
|
Some(e) => self.apply_phase(name, Phase::Failed, Some(e), vec![]).await,
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
@@ -243,16 +325,11 @@ impl Reconciler {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
|
||||||
async fn run_score(&self, key: &str, score: &PodmanV0Score) -> Result<()> {
|
/// Map a runtime `ExecutorError` into `anyhow` at the agent boundary.
|
||||||
let interpret = Score::<PodmanTopology>::create_interpret(score);
|
fn rt(e: harmony::executors::ExecutorError) -> anyhow::Error {
|
||||||
let outcome = interpret
|
anyhow::anyhow!("container runtime: {e}")
|
||||||
.execute(&self.inventory, &self.topology)
|
|
||||||
.await
|
|
||||||
.map_err(|e| anyhow::anyhow!("PodmanV0Score interpret failed for {key}: {e}"))?;
|
|
||||||
tracing::info!(key, outcome = ?outcome, "reconciled");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Extract the deployment name from a NATS KV key of the form
|
/// Extract the deployment name from a NATS KV key of the form
|
||||||
@@ -277,25 +354,168 @@ fn short(s: &str) -> String {
|
|||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
//! Focused tests for transition detection. Drive `apply_phase` /
|
//! Drives the reconciler against an in-memory [`FakeRuntime`] (no podman
|
||||||
//! `drop_phase` directly with an inert topology (no real podman
|
//! socket) and a `None` FleetPublisher, so the convergence logic — the
|
||||||
//! socket) and a `None` FleetPublisher.
|
//! loop-fix and the graceful roll-forward — is tested deterministically.
|
||||||
use super::*;
|
use super::*;
|
||||||
use harmony::inventory::Inventory;
|
use harmony::executors::ExecutorError;
|
||||||
use harmony::modules::podman::PodmanTopology;
|
use harmony::modules::podman::{PodmanService, ReconcileScore};
|
||||||
use std::path::PathBuf;
|
use harmony::topology::{ContainerObservation, ContainerState};
|
||||||
|
use std::sync::Mutex as StdMutex;
|
||||||
|
|
||||||
fn reconciler() -> Reconciler {
|
#[derive(Default)]
|
||||||
let topology = Arc::new(
|
struct FakeState {
|
||||||
PodmanTopology::from_unix_socket(PathBuf::from("/nonexistent/for-tests")).unwrap(),
|
/// name → (id, running).
|
||||||
);
|
containers: HashMap<String, (String, bool)>,
|
||||||
let inventory = Arc::new(Inventory::empty());
|
next_id: usize,
|
||||||
Reconciler::new(
|
started: Vec<String>,
|
||||||
|
stopped: Vec<String>,
|
||||||
|
fail_next_start: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct FakeRuntime {
|
||||||
|
inner: StdMutex<FakeState>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl FakeRuntime {
|
||||||
|
fn started(&self) -> Vec<String> {
|
||||||
|
self.inner.lock().unwrap().started.clone()
|
||||||
|
}
|
||||||
|
fn stopped(&self) -> Vec<String> {
|
||||||
|
self.inner.lock().unwrap().stopped.clone()
|
||||||
|
}
|
||||||
|
fn fail_next_start(&self) {
|
||||||
|
self.inner.lock().unwrap().fail_next_start = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait::async_trait]
|
||||||
|
impl ContainerRuntime for FakeRuntime {
|
||||||
|
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
|
||||||
|
self.start_service(spec).await.map(|_| ())
|
||||||
|
}
|
||||||
|
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
|
||||||
|
let mut s = self.inner.lock().unwrap();
|
||||||
|
if s.fail_next_start {
|
||||||
|
s.fail_next_start = false;
|
||||||
|
return Err(ExecutorError::UnexpectedError("start failed".into()));
|
||||||
|
}
|
||||||
|
s.next_id += 1;
|
||||||
|
let id = format!("id{}", s.next_id);
|
||||||
|
s.containers.insert(spec.name.clone(), (id.clone(), true));
|
||||||
|
s.started.push(id.clone());
|
||||||
|
Ok(id)
|
||||||
|
}
|
||||||
|
async fn container_status(
|
||||||
|
&self,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<Option<ContainerObservation>, ExecutorError> {
|
||||||
|
let s = self.inner.lock().unwrap();
|
||||||
|
Ok(s.containers
|
||||||
|
.get(name)
|
||||||
|
.map(|(id, running)| ContainerObservation {
|
||||||
|
id: id.clone(),
|
||||||
|
running: *running,
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
async fn stop_service(&self, id_or_name: &str, _grace: u64) -> Result<(), ExecutorError> {
|
||||||
|
let mut s = self.inner.lock().unwrap();
|
||||||
|
s.stopped.push(id_or_name.to_string());
|
||||||
|
s.containers
|
||||||
|
.retain(|name, (id, _)| name != id_or_name && id != id_or_name);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
|
||||||
|
self.stop_service(name, 0).await
|
||||||
|
}
|
||||||
|
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
|
||||||
|
Ok(vec![])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn reconciler() -> (Reconciler, Arc<FakeRuntime>) {
|
||||||
|
let fake = Arc::new(FakeRuntime::default());
|
||||||
|
let r = Reconciler::new(
|
||||||
Id::from("test-device".to_string()),
|
Id::from("test-device".to_string()),
|
||||||
topology,
|
fake.clone() as Arc<dyn ContainerRuntime>,
|
||||||
inventory,
|
|
||||||
None,
|
None,
|
||||||
)
|
);
|
||||||
|
(r, fake)
|
||||||
|
}
|
||||||
|
|
||||||
|
fn score_bytes(image: &str) -> Vec<u8> {
|
||||||
|
let score = PodmanV0Score {
|
||||||
|
services: vec![PodmanService {
|
||||||
|
name: "web".to_string(),
|
||||||
|
image: image.to_string(),
|
||||||
|
ports: vec![],
|
||||||
|
env: vec![EnvVar::new("LOG", "info")],
|
||||||
|
volumes: vec![],
|
||||||
|
restart_policy: Default::default(),
|
||||||
|
}],
|
||||||
|
lifecycle: None,
|
||||||
|
};
|
||||||
|
serde_json::to_vec(&ReconcileScore::PodmanV0(score)).unwrap()
|
||||||
|
}
|
||||||
|
|
||||||
|
use harmony::topology::EnvVar;
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn tick_is_idempotent_no_recreate() {
|
||||||
|
// The loop-fix: a service WITH env (which the old matches_spec declared
|
||||||
|
// "always drifted") must not be recreated by the periodic tick.
|
||||||
|
let (r, fake) = reconciler();
|
||||||
|
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||||
|
assert_eq!(fake.started().len(), 1, "initial start");
|
||||||
|
for _ in 0..3 {
|
||||||
|
r.tick().await.unwrap();
|
||||||
|
}
|
||||||
|
assert_eq!(
|
||||||
|
fake.started().len(),
|
||||||
|
1,
|
||||||
|
"tick must not recreate a running container"
|
||||||
|
);
|
||||||
|
assert!(
|
||||||
|
fake.stopped().is_empty(),
|
||||||
|
"tick must not stop a healthy container"
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn changed_score_gracefully_replaces_by_id() {
|
||||||
|
let (r, fake) = reconciler();
|
||||||
|
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||||
|
r.apply("dev.web", &score_bytes("nginx:2")).await.unwrap();
|
||||||
|
// Old container (id1) stopped gracefully, new one (id2) started.
|
||||||
|
assert_eq!(fake.started(), vec!["id1", "id2"]);
|
||||||
|
assert_eq!(fake.stopped(), vec!["id1"]);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn unchanged_score_is_a_noop() {
|
||||||
|
let (r, fake) = reconciler();
|
||||||
|
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||||
|
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||||
|
assert_eq!(fake.started().len(), 1, "byte-identical re-apply is a noop");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn roll_forward_failure_reports_failed_without_revert() {
|
||||||
|
let (r, fake) = reconciler();
|
||||||
|
r.apply("dev.web", &score_bytes("nginx:1")).await.unwrap();
|
||||||
|
fake.fail_next_start();
|
||||||
|
let res = r.apply("dev.web", &score_bytes("nginx:2")).await;
|
||||||
|
assert!(res.is_err(), "upgrade failure surfaces");
|
||||||
|
// Old was stopped; new start failed; NO revert (roll-forward only).
|
||||||
|
assert_eq!(fake.stopped(), vec!["id1"]);
|
||||||
|
assert_eq!(
|
||||||
|
fake.started(),
|
||||||
|
vec!["id1"],
|
||||||
|
"no revert restart of the old version"
|
||||||
|
);
|
||||||
|
let phases = r.deployments.lock().await;
|
||||||
|
assert_eq!(phases.get(&dn("web")).map(|(p, _)| *p), Some(Phase::Failed));
|
||||||
}
|
}
|
||||||
|
|
||||||
fn dn(s: &str) -> DeploymentName {
|
fn dn(s: &str) -> DeploymentName {
|
||||||
@@ -304,36 +524,63 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn apply_phase_records_new_phase() {
|
async fn apply_phase_records_new_phase() {
|
||||||
let r = reconciler();
|
let (r, _) = reconciler();
|
||||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
|
||||||
|
.await;
|
||||||
let phases = r.deployments.lock().await;
|
let phases = r.deployments.lock().await;
|
||||||
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Running));
|
assert_eq!(
|
||||||
|
phases.get(&dn("hello")).map(|(p, _)| *p),
|
||||||
|
Some(Phase::Running)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn apply_phase_idempotent_for_same_phase() {
|
async fn apply_phase_idempotent_for_same_phase_and_ids() {
|
||||||
let r = reconciler();
|
let (r, _) = reconciler();
|
||||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()])
|
||||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
.await;
|
||||||
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["abc".into()])
|
||||||
|
.await;
|
||||||
let phases = r.deployments.lock().await;
|
let phases = r.deployments.lock().await;
|
||||||
assert_eq!(phases.len(), 1);
|
assert_eq!(phases.len(), 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn apply_phase_transitions_update_phase() {
|
async fn apply_phase_republishes_when_container_id_changes() {
|
||||||
let r = reconciler();
|
let (r, _) = reconciler();
|
||||||
r.apply_phase(&dn("hello"), Phase::Pending, None).await;
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["old".into()])
|
||||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
.await;
|
||||||
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()))
|
// Same phase, new id (after an upgrade) → recorded.
|
||||||
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec!["new".into()])
|
||||||
.await;
|
.await;
|
||||||
let phases = r.deployments.lock().await;
|
let phases = r.deployments.lock().await;
|
||||||
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Failed));
|
assert_eq!(
|
||||||
|
phases.get(&dn("hello")),
|
||||||
|
Some(&(Phase::Running, vec!["new".to_string()]))
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[tokio::test]
|
||||||
|
async fn apply_phase_transitions_update_phase() {
|
||||||
|
let (r, _) = reconciler();
|
||||||
|
r.apply_phase(&dn("hello"), Phase::Pending, None, vec![])
|
||||||
|
.await;
|
||||||
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
|
||||||
|
.await;
|
||||||
|
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()), vec![])
|
||||||
|
.await;
|
||||||
|
let phases = r.deployments.lock().await;
|
||||||
|
assert_eq!(
|
||||||
|
phases.get(&dn("hello")).map(|(p, _)| *p),
|
||||||
|
Some(Phase::Failed)
|
||||||
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn drop_phase_clears_known_deployment() {
|
async fn drop_phase_clears_known_deployment() {
|
||||||
let r = reconciler();
|
let (r, _) = reconciler();
|
||||||
r.apply_phase(&dn("hello"), Phase::Running, None).await;
|
r.apply_phase(&dn("hello"), Phase::Running, None, vec![])
|
||||||
|
.await;
|
||||||
r.drop_phase(&dn("hello")).await;
|
r.drop_phase(&dn("hello")).await;
|
||||||
let phases = r.deployments.lock().await;
|
let phases = r.deployments.lock().await;
|
||||||
assert!(!phases.contains_key(&dn("hello")));
|
assert!(!phases.contains_key(&dn("hello")));
|
||||||
@@ -341,7 +588,7 @@ mod tests {
|
|||||||
|
|
||||||
#[tokio::test]
|
#[tokio::test]
|
||||||
async fn drop_phase_on_unknown_deployment_is_noop() {
|
async fn drop_phase_on_unknown_deployment_is_noop() {
|
||||||
let r = reconciler();
|
let (r, _) = reconciler();
|
||||||
r.drop_phase(&dn("never-existed")).await;
|
r.drop_phase(&dn("never-existed")).await;
|
||||||
let phases = r.deployments.lock().await;
|
let phases = r.deployments.lock().await;
|
||||||
assert!(phases.is_empty());
|
assert!(phases.is_empty());
|
||||||
|
|||||||
@@ -137,6 +137,7 @@ async fn create_fleet_deployment(deployments: &Api<Deployment>, name: &str) -> a
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: Default::default(),
|
restart_policy: Default::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
}),
|
}),
|
||||||
rollout: Rollout {
|
rollout: Rollout {
|
||||||
strategy: RolloutStrategy::Immediate,
|
strategy: RolloutStrategy::Immediate,
|
||||||
|
|||||||
@@ -125,6 +125,7 @@ fn podman_score() -> PodmanV0Score {
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: Default::default(),
|
restart_policy: Default::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -248,6 +249,7 @@ async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
|
|||||||
phase: Phase::Running,
|
phase: Phase::Running,
|
||||||
last_event_at: Utc::now(),
|
last_event_at: Utc::now(),
|
||||||
last_error: None,
|
last_error: None,
|
||||||
|
container_ids: vec![],
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
kill(h1).await;
|
kill(h1).await;
|
||||||
|
|||||||
@@ -185,5 +185,6 @@ fn podman_score(image_tag: &str) -> PodmanV0Score {
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: RestartPolicy::default(),
|
restart_policy: RestartPolicy::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -92,6 +92,7 @@ async fn agent_ignores_other_devices_keys() -> anyhow::Result<()> {
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: RestartPolicy::default(),
|
restart_policy: RestartPolicy::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
};
|
};
|
||||||
admin.put_podman_at_key(&foreign_key, &score).await?;
|
admin.put_podman_at_key(&foreign_key, &score).await?;
|
||||||
|
|
||||||
|
|||||||
@@ -818,6 +818,7 @@ mod tests {
|
|||||||
phase,
|
phase,
|
||||||
last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(),
|
last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(),
|
||||||
last_error: None,
|
last_error: None,
|
||||||
|
container_ids: vec![],
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -652,6 +652,7 @@ mod tests {
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: Default::default(),
|
restart_policy: Default::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
}),
|
}),
|
||||||
rollout: Rollout {
|
rollout: Rollout {
|
||||||
strategy: RolloutStrategy::Immediate,
|
strategy: RolloutStrategy::Immediate,
|
||||||
@@ -705,6 +706,7 @@ mod tests {
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: Default::default(),
|
restart_policy: Default::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
});
|
});
|
||||||
assert_eq!(first_service_name(&score).as_deref(), Some("web"));
|
assert_eq!(first_service_name(&score).as_deref(), Some("web"));
|
||||||
}
|
}
|
||||||
@@ -720,6 +722,7 @@ mod tests {
|
|||||||
volumes: vec![],
|
volumes: vec![],
|
||||||
restart_policy: Default::default(),
|
restart_policy: Default::default(),
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
});
|
});
|
||||||
assert_eq!(deployment_version(&score), "1.25");
|
assert_eq!(deployment_version(&score), "1.25");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -131,6 +131,12 @@ pub struct DeploymentState {
|
|||||||
pub last_event_at: DateTime<Utc>,
|
pub last_event_at: DateTime<Utc>,
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub last_error: Option<String>,
|
pub last_error: Option<String>,
|
||||||
|
/// Runtime container ids backing this deployment on the device, one per
|
||||||
|
/// service (v0.3 Ch5). The agent's identity handle for graceful replacement
|
||||||
|
/// and liveness; surfaced for the dashboard. `#[serde(default)]` for
|
||||||
|
/// wire-compat with pre-Ch5 agents that don't report it.
|
||||||
|
#[serde(default)]
|
||||||
|
pub container_ids: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tiny liveness ping. Written to KV key `heartbeat.<device_id>` in
|
/// Tiny liveness ping. Written to KV key `heartbeat.<device_id>` in
|
||||||
@@ -233,6 +239,7 @@ mod tests {
|
|||||||
phase: Phase::Failed,
|
phase: Phase::Failed,
|
||||||
last_event_at: ts("2026-04-22T10:05:00Z"),
|
last_event_at: ts("2026-04-22T10:05:00Z"),
|
||||||
last_error: Some("image pull 429".to_string()),
|
last_error: Some("image pull 429".to_string()),
|
||||||
|
container_ids: vec![],
|
||||||
};
|
};
|
||||||
let json = serde_json::to_string(&original).unwrap();
|
let json = serde_json::to_string(&original).unwrap();
|
||||||
let back: DeploymentState = serde_json::from_str(&json).unwrap();
|
let back: DeploymentState = serde_json::from_str(&json).unwrap();
|
||||||
|
|||||||
@@ -21,13 +21,36 @@ use crate::executors::ExecutorError;
|
|||||||
/// capability will be clearer than stretching this one.
|
/// capability will be clearer than stretching this one.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait ContainerRuntime: Send + Sync {
|
pub trait ContainerRuntime: Send + Sync {
|
||||||
/// Ensure a container matching `spec` is running. Idempotent: running the
|
/// Ensure a container matching `spec` is running. Idempotent and
|
||||||
/// same spec twice must converge to the same state with no observable
|
/// **liveness-only**: if a container with this `name` is already running,
|
||||||
/// side effects beyond logs. If a container with the same `name` already
|
/// it's a NOOP — the runtime does **not** compare the spec (a polling
|
||||||
/// exists with the same image and port mappings, this is a NOOP.
|
/// reconciler would otherwise re-create on every tick for fields it can't
|
||||||
/// Otherwise it is stopped, removed, and recreated.
|
/// read back). Spec changes are driven through [`Self::replace_service`] by
|
||||||
|
/// a stateful caller that knows the desired state changed. If no container
|
||||||
|
/// is running, one is (re)created from `spec`.
|
||||||
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError>;
|
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError>;
|
||||||
|
|
||||||
|
/// Create + start a container from `spec`, returning its runtime id. Any
|
||||||
|
/// existing container with the same name is force-removed first. The id is
|
||||||
|
/// the caller's handle for liveness checks and targeted replacement.
|
||||||
|
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError>;
|
||||||
|
|
||||||
|
/// Observe the container with this name: its id + whether it's running.
|
||||||
|
/// `None` if no such container exists.
|
||||||
|
async fn container_status(
|
||||||
|
&self,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<Option<ContainerObservation>, ExecutorError>;
|
||||||
|
|
||||||
|
/// Gracefully stop and remove a container by id or name: send the
|
||||||
|
/// container's stop signal, wait `grace_period_secs`, then SIGKILL
|
||||||
|
/// (podman's stop semantics), then remove. Idempotent — no error if gone.
|
||||||
|
async fn stop_service(
|
||||||
|
&self,
|
||||||
|
id_or_name: &str,
|
||||||
|
grace_period_secs: u64,
|
||||||
|
) -> Result<(), ExecutorError>;
|
||||||
|
|
||||||
/// Stop and remove the container with this name if it exists. No error
|
/// Stop and remove the container with this name if it exists. No error
|
||||||
/// if the container does not exist — idempotent.
|
/// if the container does not exist — idempotent.
|
||||||
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError>;
|
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError>;
|
||||||
@@ -38,6 +61,37 @@ pub trait ContainerRuntime: Send + Sync {
|
|||||||
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError>;
|
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Graceful-shutdown policy for a deployment's container during a roll-forward
|
||||||
|
/// upgrade (v0.3 Ch5). Mirrors K8s/`docker stop` semantics: send `stop_signal`,
|
||||||
|
/// wait `grace_period_secs`, then SIGKILL (podman's stop always force-kills
|
||||||
|
/// after the timeout — there's no "never kill" knob, so we don't pretend one).
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct LifecyclePolicy {
|
||||||
|
/// Signal sent first to ask the process to exit cleanly (e.g. `SIGTERM`).
|
||||||
|
pub stop_signal: String,
|
||||||
|
/// Seconds to wait for a clean exit before SIGKILL.
|
||||||
|
pub grace_period_secs: u64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for LifecyclePolicy {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
stop_signal: "SIGTERM".to_string(),
|
||||||
|
grace_period_secs: 30,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A container's identity + liveness, as observed on the runtime.
|
||||||
|
#[derive(Debug, Clone, PartialEq, Eq)]
|
||||||
|
pub struct ContainerObservation {
|
||||||
|
/// Runtime container id.
|
||||||
|
pub id: String,
|
||||||
|
/// Whether it's currently running.
|
||||||
|
pub running: bool,
|
||||||
|
}
|
||||||
|
|
||||||
/// A single environment variable, name/value pair.
|
/// A single environment variable, name/value pair.
|
||||||
///
|
///
|
||||||
/// Modeled as a struct rather than `(String, String)` because k8s CRD
|
/// Modeled as a struct rather than `(String, String)` because k8s CRD
|
||||||
@@ -98,6 +152,11 @@ pub struct ContainerSpec {
|
|||||||
/// Restart policy on container exit. Mirrors podman/docker semantics.
|
/// Restart policy on container exit. Mirrors podman/docker semantics.
|
||||||
#[serde(default)]
|
#[serde(default)]
|
||||||
pub restart_policy: RestartPolicy,
|
pub restart_policy: RestartPolicy,
|
||||||
|
/// Signal sent on graceful stop (from the deployment's `LifecyclePolicy`).
|
||||||
|
/// Baked into the container at create time so `podman stop` uses it. `None`
|
||||||
|
/// = the image/podman default (`SIGTERM`).
|
||||||
|
#[serde(default)]
|
||||||
|
pub stop_signal: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl ContainerSpec {
|
impl ContainerSpec {
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ use crate::{
|
|||||||
data::Version,
|
data::Version,
|
||||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||||
inventory::Inventory,
|
inventory::Inventory,
|
||||||
topology::{ContainerRuntime, ContainerSpec, Topology},
|
topology::{ContainerRuntime, Topology},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::score::PodmanV0Score;
|
use super::score::PodmanV0Score;
|
||||||
@@ -54,15 +54,7 @@ impl<T: Topology + ContainerRuntime> Interpret<T> for PodmanV0Interpret {
|
|||||||
topology: &T,
|
topology: &T,
|
||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
for service in &self.score.services {
|
for service in &self.score.services {
|
||||||
let spec = ContainerSpec {
|
let spec = self.score.container_spec(service);
|
||||||
name: service.name.clone(),
|
|
||||||
image: service.image.clone(),
|
|
||||||
ports: service.ports.clone(),
|
|
||||||
labels: vec![(DEPLOYMENT_LABEL.to_string(), self.score.deployment_label())],
|
|
||||||
env: service.env.clone(),
|
|
||||||
volumes: service.volumes.clone(),
|
|
||||||
restart_policy: service.restart_policy,
|
|
||||||
};
|
|
||||||
topology.ensure_service_running(&spec).await.map_err(|e| {
|
topology.ensure_service_running(&spec).await.map_err(|e| {
|
||||||
InterpretError::new(format!(
|
InterpretError::new(format!(
|
||||||
"PodmanV0: ensure_service_running({}) failed: {e}",
|
"PodmanV0: ensure_service_running({}) failed: {e}",
|
||||||
|
|||||||
@@ -13,10 +13,13 @@ use serde::{Deserialize, Serialize};
|
|||||||
use crate::{
|
use crate::{
|
||||||
interpret::Interpret,
|
interpret::Interpret,
|
||||||
score::Score,
|
score::Score,
|
||||||
topology::{ContainerRuntime, EnvVar, RestartPolicy, Topology, VolumeMount},
|
topology::{
|
||||||
|
ContainerRuntime, ContainerSpec, EnvVar, LifecyclePolicy, RestartPolicy, Topology,
|
||||||
|
VolumeMount,
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
use super::interpret::PodmanV0Interpret;
|
use super::interpret::{DEPLOYMENT_LABEL, PodmanV0Interpret};
|
||||||
|
|
||||||
/// A single container managed by podman on the target host.
|
/// A single container managed by podman on the target host.
|
||||||
///
|
///
|
||||||
@@ -46,9 +49,36 @@ pub struct PodmanService {
|
|||||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq, Eq)]
|
||||||
pub struct PodmanV0Score {
|
pub struct PodmanV0Score {
|
||||||
pub services: Vec<PodmanService>,
|
pub services: Vec<PodmanService>,
|
||||||
|
/// Graceful-shutdown policy used when a changed Score replaces a running
|
||||||
|
/// container (v0.3 Ch5 roll-forward upgrade). `None` = the default
|
||||||
|
/// (SIGTERM, 30s grace, SIGKILL fallback). Defaults so older Deployment CRs
|
||||||
|
/// without the field deserialize unchanged.
|
||||||
|
#[serde(default)]
|
||||||
|
pub lifecycle: Option<LifecyclePolicy>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PodmanV0Score {
|
impl PodmanV0Score {
|
||||||
|
/// The effective lifecycle policy (configured, or the default).
|
||||||
|
pub fn lifecycle_policy(&self) -> LifecyclePolicy {
|
||||||
|
self.lifecycle.clone().unwrap_or_default()
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Build the [`ContainerSpec`] for one service: image/ports/env/volumes plus
|
||||||
|
/// the deployment label and the lifecycle stop signal. One place so the
|
||||||
|
/// interpret and the agent reconciler agree on the mapping.
|
||||||
|
pub fn container_spec(&self, service: &PodmanService) -> ContainerSpec {
|
||||||
|
ContainerSpec {
|
||||||
|
name: service.name.clone(),
|
||||||
|
image: service.image.clone(),
|
||||||
|
ports: service.ports.clone(),
|
||||||
|
labels: vec![(DEPLOYMENT_LABEL.to_string(), self.deployment_label())],
|
||||||
|
env: service.env.clone(),
|
||||||
|
volumes: service.volumes.clone(),
|
||||||
|
restart_policy: service.restart_policy,
|
||||||
|
stop_signal: self.lifecycle.as_ref().map(|l| l.stop_signal.clone()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/// Label value applied to every container this Score creates. The
|
/// Label value applied to every container this Score creates. The
|
||||||
/// caller uses the label to enumerate the set of containers that
|
/// caller uses the label to enumerate the set of containers that
|
||||||
/// belong to a given Score instance (e.g. to tear them down when
|
/// belong to a given Score instance (e.g. to tear them down when
|
||||||
@@ -122,6 +152,7 @@ mod tests {
|
|||||||
ports: vec!["8080:80".to_string()],
|
ports: vec!["8080:80".to_string()],
|
||||||
..svc("web", "nginx:latest")
|
..svc("web", "nginx:latest")
|
||||||
}],
|
}],
|
||||||
|
lifecycle: None,
|
||||||
});
|
});
|
||||||
let json = serde_json::to_string(&score).unwrap();
|
let json = serde_json::to_string(&score).unwrap();
|
||||||
assert!(json.contains("\"type\":\"PodmanV0\""));
|
assert!(json.contains("\"type\":\"PodmanV0\""));
|
||||||
@@ -148,6 +179,7 @@ mod tests {
|
|||||||
..svc("api", "myapp:1.0")
|
..svc("api", "myapp:1.0")
|
||||||
},
|
},
|
||||||
],
|
],
|
||||||
|
lifecycle: None,
|
||||||
});
|
});
|
||||||
let serialized = serde_json::to_string(&score).unwrap();
|
let serialized = serde_json::to_string(&score).unwrap();
|
||||||
let deserialized: ReconcileScore = serde_json::from_str(&serialized).unwrap();
|
let deserialized: ReconcileScore = serde_json::from_str(&serialized).unwrap();
|
||||||
@@ -229,6 +261,7 @@ mod tests {
|
|||||||
fn deployment_label_joins_service_names() {
|
fn deployment_label_joins_service_names() {
|
||||||
let score = PodmanV0Score {
|
let score = PodmanV0Score {
|
||||||
services: vec![svc("web", "nginx"), svc("api", "myapp")],
|
services: vec![svc("web", "nginx"), svc("api", "myapp")],
|
||||||
|
lifecycle: None,
|
||||||
};
|
};
|
||||||
assert_eq!(score.deployment_label(), "web,api");
|
assert_eq!(score.deployment_label(), "web,api");
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -12,8 +12,8 @@ use podman_api::opts::{
|
|||||||
};
|
};
|
||||||
|
|
||||||
use crate::domain::topology::{
|
use crate::domain::topology::{
|
||||||
ContainerRuntime, ContainerSpec, ContainerState, PreparationError, PreparationOutcome,
|
ContainerObservation, ContainerRuntime, ContainerSpec, ContainerState, PreparationError,
|
||||||
RestartPolicy, Topology, VolumeMount,
|
PreparationOutcome, RestartPolicy, Topology, VolumeMount,
|
||||||
};
|
};
|
||||||
use crate::executors::ExecutorError;
|
use crate::executors::ExecutorError;
|
||||||
|
|
||||||
@@ -97,49 +97,11 @@ impl PodmanTopology {
|
|||||||
Err(e) => Err(to_exec_error(e)),
|
Err(e) => Err(to_exec_error(e)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl Topology for PodmanTopology {
|
|
||||||
fn name(&self) -> &str {
|
|
||||||
&self.name
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
|
||||||
// A quick ping — calling info() is the cheapest endpoint that
|
|
||||||
// exercises the socket end-to-end.
|
|
||||||
self.podman
|
|
||||||
.info()
|
|
||||||
.await
|
|
||||||
.map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?;
|
|
||||||
Ok(PreparationOutcome::Noop)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl ContainerRuntime for PodmanTopology {
|
|
||||||
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
|
|
||||||
let existing = self.get_by_name(&spec.name).await?;
|
|
||||||
|
|
||||||
if let Some(existing) = existing.as_ref() {
|
|
||||||
if matches_spec(existing, spec) {
|
|
||||||
let running = existing.state.as_deref() == Some("running");
|
|
||||||
if running {
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
// Same spec, just not running — start it rather than recreate.
|
|
||||||
let id = existing.id.clone().unwrap_or_else(|| spec.name.clone());
|
|
||||||
self.containers()
|
|
||||||
.get(id)
|
|
||||||
.start(None)
|
|
||||||
.await
|
|
||||||
.map_err(to_exec_error)?;
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
// Drift — remove and recreate below.
|
|
||||||
self.remove_container(&spec.name).await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
|
/// Create + start a container from `spec`, returning its id. The container's
|
||||||
|
/// stop signal + stop timeout are baked in here so a later `podman stop`
|
||||||
|
/// honors the deployment's lifecycle policy.
|
||||||
|
async fn create_and_start(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
|
||||||
self.ensure_image_present(&spec.image).await?;
|
self.ensure_image_present(&spec.image).await?;
|
||||||
|
|
||||||
let mut labels = HashMap::new();
|
let mut labels = HashMap::new();
|
||||||
@@ -171,34 +133,94 @@ impl ContainerRuntime for PodmanTopology {
|
|||||||
.portmappings(port_mappings)
|
.portmappings(port_mappings)
|
||||||
.env(env_map)
|
.env(env_map)
|
||||||
.restart_policy(map_restart_policy(spec.restart_policy));
|
.restart_policy(map_restart_policy(spec.restart_policy));
|
||||||
|
if let Some(sig) = spec.stop_signal.as_deref() {
|
||||||
|
builder = builder.stop_signal(signal_number(sig));
|
||||||
|
}
|
||||||
if !mounts.is_empty() {
|
if !mounts.is_empty() {
|
||||||
builder = builder.mounts(mounts);
|
builder = builder.mounts(mounts);
|
||||||
}
|
}
|
||||||
let opts = builder.build();
|
|
||||||
|
|
||||||
let created = self
|
let created = self
|
||||||
.containers()
|
.containers()
|
||||||
.create(&opts)
|
.create(&builder.build())
|
||||||
.await
|
.await
|
||||||
.map_err(to_exec_error)?;
|
.map_err(to_exec_error)?;
|
||||||
|
let id = created.id;
|
||||||
self.containers()
|
self.containers()
|
||||||
.get(created.id.clone())
|
.get(id.clone())
|
||||||
.start(None)
|
.start(None)
|
||||||
.await
|
.await
|
||||||
.map_err(to_exec_error)?;
|
.map_err(to_exec_error)?;
|
||||||
Ok(())
|
Ok(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Topology for PodmanTopology {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
&self.name
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
|
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||||
// First try a graceful stop; ignore failures so removal still happens.
|
// A quick ping — calling info() is the cheapest endpoint that
|
||||||
|
// exercises the socket end-to-end.
|
||||||
|
self.podman
|
||||||
|
.info()
|
||||||
|
.await
|
||||||
|
.map_err(|e| PreparationError::new(format!("podman socket unreachable: {e}")))?;
|
||||||
|
Ok(PreparationOutcome::Noop)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl ContainerRuntime for PodmanTopology {
|
||||||
|
async fn ensure_service_running(&self, spec: &ContainerSpec) -> Result<(), ExecutorError> {
|
||||||
|
// Liveness-only: never compare the spec (that's the caller's job via
|
||||||
|
// byte-compare of the desired state). A container that's already up
|
||||||
|
// stays up — no per-tick recreate, no flap.
|
||||||
|
match self.container_status(&spec.name).await? {
|
||||||
|
Some(obs) if obs.running => Ok(()),
|
||||||
|
_ => self.start_service(spec).await.map(|_| ()),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn start_service(&self, spec: &ContainerSpec) -> Result<String, ExecutorError> {
|
||||||
|
// Force-remove any stale container by this name so create can't collide.
|
||||||
|
self.remove_container(&spec.name).await?;
|
||||||
|
self.create_and_start(spec).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn container_status(
|
||||||
|
&self,
|
||||||
|
name: &str,
|
||||||
|
) -> Result<Option<ContainerObservation>, ExecutorError> {
|
||||||
|
let Some(c) = self.get_by_name(name).await? else {
|
||||||
|
return Ok(None);
|
||||||
|
};
|
||||||
|
Ok(Some(ContainerObservation {
|
||||||
|
id: c.id.clone().unwrap_or_default(),
|
||||||
|
running: c.state.as_deref() == Some("running"),
|
||||||
|
}))
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn stop_service(
|
||||||
|
&self,
|
||||||
|
id_or_name: &str,
|
||||||
|
grace_period_secs: u64,
|
||||||
|
) -> Result<(), ExecutorError> {
|
||||||
|
// Graceful: send the container's stop signal, wait `grace`, then podman
|
||||||
|
// SIGKILLs. Ignore stop failures so removal still happens.
|
||||||
let stop_opts = ContainerStopOpts::builder()
|
let stop_opts = ContainerStopOpts::builder()
|
||||||
.timeout(STOP_TIMEOUT.as_secs() as usize)
|
.timeout(grace_period_secs as usize)
|
||||||
.build();
|
.build();
|
||||||
let container = self.containers().get(name);
|
let container = self.containers().get(id_or_name);
|
||||||
if container.exists().await.unwrap_or(false) {
|
if container.exists().await.unwrap_or(false) {
|
||||||
let _ = container.stop(&stop_opts).await;
|
let _ = container.stop(&stop_opts).await;
|
||||||
}
|
}
|
||||||
self.remove_container(name).await
|
self.remove_container(id_or_name).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn remove_service(&self, name: &str) -> Result<(), ExecutorError> {
|
||||||
|
self.stop_service(name, STOP_TIMEOUT.as_secs()).await
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
|
async fn list_managed_services(&self) -> Result<Vec<ContainerState>, ExecutorError> {
|
||||||
@@ -267,71 +289,19 @@ fn parse_port_mapping(raw: &str) -> Result<PortMapping, ExecutorError> {
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
fn matches_spec(observed: &podman_api::models::ListContainer, spec: &ContainerSpec) -> bool {
|
/// Map a signal name (`SIGTERM`) to its number for podman's `stop_signal`
|
||||||
let same_image = observed
|
/// create option. Unknown names fall back to SIGTERM (15) — the safe default.
|
||||||
.image
|
fn signal_number(name: &str) -> i64 {
|
||||||
.as_deref()
|
match name.trim().to_uppercase().trim_start_matches("SIG") {
|
||||||
.map(|i| i == spec.image)
|
"HUP" => 1,
|
||||||
.unwrap_or(false);
|
"INT" => 2,
|
||||||
if !same_image {
|
"QUIT" => 3,
|
||||||
return false;
|
"KILL" => 9,
|
||||||
|
"USR1" => 10,
|
||||||
|
"USR2" => 12,
|
||||||
|
"TERM" => 15,
|
||||||
|
other => other.parse().unwrap_or(15),
|
||||||
}
|
}
|
||||||
let observed_ports = observed.ports.as_deref().unwrap_or(&[]);
|
|
||||||
if observed_ports.len() != spec.ports.len() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
for raw in &spec.ports {
|
|
||||||
let Ok(expected) = parse_port_mapping(raw) else {
|
|
||||||
return false;
|
|
||||||
};
|
|
||||||
let present = observed_ports.iter().any(|p| {
|
|
||||||
p.host_port == expected.host_port && p.container_port == expected.container_port
|
|
||||||
});
|
|
||||||
if !present {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
// FIXME(redeploy-loop): this branch makes the agent's periodic
|
|
||||||
// reconcile non-idempotent for any non-trivial Deployment.
|
|
||||||
// Symptom: a service with env or volumes is destroyed and
|
|
||||||
// recreated every 30s tick (RECONCILE_INTERVAL), even when the
|
|
||||||
// observed container is already correct — operators see flapping
|
|
||||||
// container IDs, intermittent connectivity blips, log noise.
|
|
||||||
//
|
|
||||||
// Root cause: `podman list` (v5.x) doesn't surface env or mounts,
|
|
||||||
// so we can't compare them; the original author chose to declare
|
|
||||||
// "any spec with env/volumes is drifted" as a fail-safe. That's
|
|
||||||
// the wrong default for a polling reconciler — it weaponizes the
|
|
||||||
// poll into a re-creation loop.
|
|
||||||
//
|
|
||||||
// Right fix (out of scope for the demo, in scope for delivery):
|
|
||||||
// 1. Switch this code path to `containers.get(name).inspect()`
|
|
||||||
// which DOES return env + mounts. Compare structurally.
|
|
||||||
// 2. Treat absent fields on the inspect response as "unchanged",
|
|
||||||
// not "drifted".
|
|
||||||
// 3. Add an integration test that runs ensure_service_running
|
|
||||||
// twice on the same spec and asserts the container ID is
|
|
||||||
// unchanged.
|
|
||||||
//
|
|
||||||
// Layered next: the upcoming health-check addition to
|
|
||||||
// ContainerSpec gives the agent a separate signal to decide
|
|
||||||
// when to recreate (failed health checks → unhealthy → recreate)
|
|
||||||
// independent of the spec-drift check.
|
|
||||||
//
|
|
||||||
// Until fixed: avoid env / volumes in demo-time deployments to
|
|
||||||
// dodge the loop. The hello-web nginx demo doesn't have either,
|
|
||||||
// which is why it's stable.
|
|
||||||
if !spec.env.is_empty() || !spec.volumes.is_empty() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
// Restart policy: ListContainer doesn't surface it directly. We
|
|
||||||
// only force a recreate when the spec explicitly asks for something
|
|
||||||
// other than the default — so unchanged podman-default behaviour
|
|
||||||
// stays a NOOP, and explicit policy changes converge on next apply.
|
|
||||||
if spec.restart_policy != RestartPolicy::default() {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn volume_to_mount(v: &VolumeMount) -> ContainerMount {
|
fn volume_to_mount(v: &VolumeMount) -> ContainerMount {
|
||||||
|
|||||||
Reference in New Issue
Block a user