Files
harmony/fleet/harmony-fleet-agent/src/main.rs
Jean-Gabriel Gill-Couture dedfa19380
All checks were successful
Run Check Script / check (pull_request) Successful in 2m36s
feat(fleet): graceful roll-forward upgrade + container-ID identity (Ch5)
Fixes the 30s redeploy loop and adds graceful deployment upgrades — same root:
how the agent identifies a running container vs. its desired spec.

Redeploy-loop fix (container identity by id/name, not spec compare):
- `ensure_service_running` is now liveness-only — a running container is a NOOP,
  no spec comparison (podman ps can't read env/volumes, which made the old
  matches_spec recreate env-bearing services every tick). The periodic tick
  adopts-or-restarts by name and never recreates a healthy container.
- Spec changes are detected by the reconciler's existing byte-compare of the
  desired-state JSON, not by the runtime. The agent records each container's id
  (from start_service) in DeploymentState.container_ids.
- Deleted the matches_spec FIXME and its always-drift hack.

Graceful roll-forward upgrade:
- PodmanV0Score.lifecycle: Option<LifecyclePolicy> (SIGTERM, 30s grace, SIGKILL
  fallback). stop_signal baked into the container at create; grace drives
  `podman stop --time`.
- On a changed score the reconciler stops the exact old container by its
  recorded id (graceful), then starts the new one; dropped services are stopped.
  Roll-forward only — a failure reports Phase::Failed, never reverts.

New ContainerRuntime methods: start_service (→id), container_status, stop_service
(graceful). Reconciler is now generic over `dyn ContainerRuntime`, unit-tested
against a FakeRuntime: tick-idempotency (loop killed), graceful-replace-by-id,
roll-forward-no-revert, unchanged-noop.

Architecture + flagged VM v1->v2->v3 e2e in
ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md.
2026-06-05 15:26:57 -04:00

372 lines
15 KiB
Rust

mod command_server;
mod config;
mod fleet_publisher;
mod reconciler;
mod upgrade;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Error, Result};
use clap::Parser;
use config::AgentConfig;
use harmony_fleet_auth::{
CredentialSource, connect_options_with_credentials, credential_source_from_config,
};
/// Shared, reference-counted handle to the NATS credential source.
///
/// Type alias to keep function signatures readable. The auth callback
/// captures one `Arc<CredentialSource>` and clones it per invocation.
/// NOTE(review): that callback lives in `harmony_fleet_auth`, outside this
/// file — confirm the clone-per-invocation behaviour there.
type Creds = Arc<CredentialSource>;
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
BUCKET_DESIRED_STATE, Id, InventorySnapshot, desired_state_watch_filter,
};
use harmony::inventory::Inventory;
use harmony::modules::podman::PodmanTopology;
use harmony::topology::Topology;
use crate::command_server::CommandServer;
use crate::fleet_publisher::FleetPublisher;
use crate::reconciler::Reconciler;
/// ROADMAP §5.6 — agent polls podman every 30s as ground truth; KV watch
/// events are accelerators.
///
/// Handed to `Reconciler::run_periodic` in `main`; only used when the
/// runtime (and therefore the reconciler) is enabled.
const RECONCILE_INTERVAL: Duration = Duration::from_secs(30);
// Command-line surface of the agent, parsed with clap's derive API at the
// top of `main`.
//
// NOTE: plain `//` comments are used for these additions on purpose — clap
// turns `///` doc comments on the struct and its fields into `--help` text,
// so adding or editing `///` lines here would change runtime output.
#[derive(Parser)]
#[command(name = "fleet-agent-v0", about = "IoT agent for Raspberry Pi devices")]
struct Cli {
// Path of the agent config file handed to `config::load_config`. Can be set
// with `--config` or the `FLEET_AGENT_CONFIG` environment variable; falls
// back to the default below when neither is given.
#[arg(
long,
env = "FLEET_AGENT_CONFIG",
// FIXME this should be a constant from a config, not just hardcoded here as we need the
// installation scripts and other bits to know about this file location.
default_value = "/etc/fleet-agent/config.toml"
)]
config: std::path::PathBuf,
/// ADR-022 self-test: load config, connect NATS (validates JWT), print
/// version + "ok", exit 0 — or exit non-zero on any failure. The upgrade
/// state machine runs this on a staged binary before cutover.
// When set, `main` returns the result of `self_test` right after the config
// is loaded, before podman or any long-lived loop is touched.
#[arg(long)]
self_test: bool,
}
/// Implements the ADR-022 `--self-test` mode.
///
/// Builds the credential source from `cfg.credentials`, opens a NATS
/// connection with it (which is where the JWT gets validated), prints a
/// one-line "ok" banner containing the crate version, and returns. Nothing
/// is published and no background loop is started.
///
/// # Errors
/// Returns the underlying error, annotated with a `self-test: …` context,
/// when either the credential source cannot be built or the NATS connection
/// fails.
async fn self_test(cfg: &AgentConfig) -> Result<()> {
    let credential_source = credential_source_from_config(&cfg.credentials)
        .context("self-test: building NATS credential source")?;

    // Only the success of the handshake matters; the client itself is
    // released immediately, before the banner is printed.
    drop(
        connect_nats(cfg, credential_source)
            .await
            .context("self-test: NATS connect / JWT validation")?,
    );

    let version = env!("CARGO_PKG_VERSION");
    println!("fleet-agent v{} self-test ok", version);
    Ok(())
}
/// Open the agent's NATS connection using the configured server URLs.
///
/// The connection is built from `connect_options_with_credentials(creds)`,
/// pings the server every 10 seconds, and logs every connection lifecycle
/// event through `tracing`.
///
/// # Errors
/// Propagates the connect error when none of `cfg.nats.urls` can be reached
/// or the server rejects the credentials.
async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result<async_nats::Client> {
    /// Surface async-nats's connection lifecycle in our logs. This is
    /// load-bearing for ops: a device that quietly disconnects is exactly
    /// the failure mode we promise won't happen, and operators need to see
    /// the reconnect attempts to debug.
    async fn log_connection_event(event: async_nats::Event) {
        use async_nats::Event;
        match event {
            Event::Connected => tracing::info!("NATS connected"),
            Event::Disconnected => tracing::warn!("NATS disconnected, will reconnect"),
            Event::LameDuckMode => tracing::warn!("NATS server entered lame-duck mode"),
            Event::SlowConsumer(sid) => {
                tracing::warn!(sid = %sid, "NATS slow consumer")
            }
            Event::ServerError(e) => tracing::error!(error = %e, "NATS server error"),
            Event::ClientError(e) => tracing::error!(error = %e, "NATS client error"),
            Event::Closed => tracing::error!("NATS connection closed"),
            other => tracing::debug!(?other, "NATS event"),
        }
    }

    let urls = &cfg.nats.urls;
    tracing::info!(device_id = %cfg.agent.device_id, "connecting to NATS {urls:?}");

    // The auth callback is invoked on every (re)connect, so a fresh
    // Zitadel access token is minted automatically when the cached one
    // is near-expiry — that's how we hold the "never lose connectivity"
    // guarantee even across token rollovers and NATS pod restarts.
    let options = connect_options_with_credentials(creds)
        .ping_interval(Duration::from_secs(10))
        .event_callback(log_connection_event);

    let client = options.connect(urls.as_slice()).await?;
    tracing::info!(urls = ?urls, "connected to NATS");
    Ok(client)
}
async fn watch_desired_state(
client: async_nats::Client,
device_id: Id,
reconciler: Arc<Reconciler>,
) -> Result<()> {
let jetstream = async_nats::jetstream::new(client);
let bucket = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_DESIRED_STATE.to_string(),
..Default::default()
})
.await?;
let key_filter = desired_state_watch_filter(&device_id.to_string());
tracing::info!(filter = %key_filter, "watching KV keys");
// `watch_with_history` (DeliverPolicy::LastPerSubject), not `watch`
// (DeliverPolicy::New): on every agent (re)start — including a device
// reboot — we must replay the current desired-state for this device so
// the reconciler re-learns its deployments and restarts their (now
// stopped) containers. Plain `watch` delivers only future Puts, so a
// rebooted device would never bring its containers back until the
// operator happened to rewrite the KV. The apply path is idempotent,
// so replaying already-converged state is a NOOP.
let mut watch = bucket.watch_with_history(&key_filter).await?;
while let Some(result) = watch.next().await {
let entry = match result {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "watch error");
continue;
}
};
tracing::debug!(key = %entry.key, "bucket watch new value {entry:?}");
match entry.operation {
async_nats::jetstream::kv::Operation::Put => {
if let Err(e) = reconciler.apply(&entry.key, &entry.value).await {
tracing::warn!(key = %entry.key, error = %e, "apply failed");
}
}
async_nats::jetstream::kv::Operation::Delete
| async_nats::jetstream::kv::Operation::Purge => {
if let Err(e) = reconciler.remove(&entry.key).await {
tracing::warn!(key = %entry.key, error = %e, "remove failed");
}
}
}
}
Ok(())
}
/// Liveness loop: every 30 seconds, ask the publisher to emit a heartbeat
/// and then a state pulse. Never returns.
///
/// Per the publisher's contract, the heartbeat is a `HeartbeatPayload`
/// written to the `device-heartbeat` bucket and the pulse is the same signal
/// fanned out on `device-state.<device_id>` for live (non-JetStream)
/// observers. Keeping this apart from per-deployment state writes means
/// routine pings don't churn the device-state bucket or its watch
/// subscribers, while the direct-subject pulse is ordinary core NATS pub/sub
/// and accumulates no state anywhere.
///
/// A tick that fires late pushes the following ticks back by the same
/// amount (`MissedTickBehavior::Delay`) rather than bursting to catch up.
async fn publish_heartbeat_loop(fleet: Arc<FleetPublisher>) {
    use tokio::time::{MissedTickBehavior, interval};

    const HEARTBEAT_PERIOD: Duration = Duration::from_secs(30);

    let mut ticker = interval(HEARTBEAT_PERIOD);
    ticker.set_missed_tick_behavior(MissedTickBehavior::Delay);

    loop {
        ticker.tick().await;
        fleet.publish_heartbeat().await;
        fleet.publish_state_pulse().await;
    }
}
/// Build a one-shot inventory snapshot of the local host at agent startup.
///
/// Cheap to compute: the hostname comes from `inventory`, architecture and OS
/// from compile-time constants, the kernel release from
/// `/proc/sys/kernel/osrelease`, the core count from
/// `std::thread::available_parallelism`, and total memory from
/// `sys_memory_total_mb`. Any probe that fails falls back to an empty string
/// or `0` instead of failing the whole snapshot.
///
/// In this file the snapshot is attached to the `DeviceInfo` published once
/// at startup.
/// NOTE(review): an earlier version of this comment said the snapshot is
/// published alongside every heartbeat. If the publisher still does that, it
/// is noise: the snapshot should only go over the wire when something
/// changed. Checking the state on each heartbeat is fine; always sending it
/// is not.
fn local_inventory(inventory: &Inventory) -> InventorySnapshot {
    let kernel = match std::fs::read_to_string("/proc/sys/kernel/osrelease") {
        Ok(release) => release.trim().to_string(),
        Err(_) => String::new(),
    };
    let cpu_cores = std::thread::available_parallelism().map_or(0, |count| count.get() as u32);
    let memory_mb = sys_memory_total_mb().unwrap_or_default();

    InventorySnapshot {
        hostname: inventory.location.name.clone(),
        arch: std::env::consts::ARCH.to_string(),
        os: std::env::consts::OS.to_string(),
        kernel,
        cpu_cores,
        memory_mb,
        agent_version: env!("CARGO_PKG_VERSION").to_string(),
    }
}
/// Total system RAM in MiB, taken from the `MemTotal:` line of
/// `/proc/meminfo`.
///
/// Returns `None` when the file cannot be read (non-Linux, or `/proc` not
/// mounted), when no `MemTotal:` line exists, or when the first token after
/// the prefix is not an unsigned integer. The value in the file is in KiB
/// and is divided by 1024, truncating.
///
/// Deliberately hand-rolled to avoid pulling in a sys-info crate for a
/// single field.
fn sys_memory_total_mb() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    // Only the first `MemTotal:` line is considered; a malformed value on
    // that line yields `None` rather than scanning further.
    let total_field = meminfo
        .lines()
        .find_map(|entry| entry.strip_prefix("MemTotal:"))?;
    let kib = total_field
        .split_whitespace()
        .next()?
        .parse::<u64>()
        .ok()?;
    Some(kib / 1024)
}
#[tokio::main]
async fn main() -> Result<()> {
// Default to `info` so the agent produces useful output without
// requiring `RUST_LOG` to be set anywhere — the systemd unit
// installed by `FleetDeviceSetupScore` does set it, but a
// hand-launched binary or a user who's overridden the unit
// shouldn't have to know that. `RUST_LOG` still overrides
// when set (e.g. `RUST_LOG=debug` for troubleshooting).
let filter = tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
tracing_subscriber::fmt().with_env_filter(filter).init();
let cli = Cli::parse();
let cfg = config::load_config(&cli.config)?;
// ADR-022: the upgrade state machine invokes the staged binary with
// `--self-test` before any symlink swap. Exit code is the verdict.
if cli.self_test {
return self_test(&cfg).await;
}
tracing::info!(
device_id = %cfg.agent.device_id,
runtime_enabled = cfg.agent.runtime_enabled,
"fleet-agent-v0 starting",
);
let device_id = cfg.agent.device_id.clone();
// Podman is the agent's runtime backend for deploying workloads.
// When `runtime_enabled = false`, skip the socket entirely so the
// agent can run on hosts that don't ship podman (the in-cluster
// e2e harness deploys the agent as a Pod on containerd-only k3d
// nodes). The command server + heartbeat still run; only the
// reconciler depends on the topology.
let topology = if cfg.agent.runtime_enabled {
let t = Arc::new(
PodmanTopology::from_default_socket()
.map_err(|e| anyhow::anyhow!("failed to open podman socket: {e}"))?,
);
t.ensure_ready().await.context("podman socket not ready")?;
tracing::info!("podman socket ready");
Some(t)
} else {
tracing::warn!(
"runtime_enabled=false; skipping podman + reconciler. \
Desired-state KV deliveries will be logged and dropped."
);
None
};
let inventory = Arc::new(Inventory::from_localhost());
tracing::info!(hostname = %inventory.location.name, "inventory loaded");
let inventory_snapshot = local_inventory(&inventory);
let creds = credential_source_from_config(&cfg.credentials)
.context("building NATS credential source")?;
let client = connect_nats(&cfg, creds).await.map_err(|e| {
let msg = format!("Nats connection FAILED : {e}");
tracing::error!(msg);
Error::msg(msg)
})?;
// Publish surface. Opens the three KV buckets (idempotent
// creates). Must be live before the reconciler starts so
// writes on the first desired-state KV watch land on the wire.
let fleet = Arc::new(
FleetPublisher::connect(client.clone(), device_id.clone())
.await
.context("fleet publisher connect")?,
);
tracing::info!("fleet publisher ready");
// Publish DeviceInfo once at startup. Merge the config-declared
// labels with an always-on `device-id=<id>` default so every
// device is targetable by id even without explicit labels.
// Config labels win on key conflicts — operators can override
// `device-id` if they really want to (unusual but legal).
let mut startup_labels = cfg.labels.clone();
startup_labels
.entry("device-id".to_string())
.or_insert_with(|| device_id.to_string());
fleet
.publish_device_info(startup_labels, Some(inventory_snapshot.clone()))
.await;
// Reconciler exists only when a podman topology is available.
// Without it, the desired-state watch + periodic reconcile arms
// are replaced by pending-forever futures so `select!` only sees
// heartbeat + command server.
let reconciler: Option<Arc<Reconciler>> = topology.as_ref().map(|t| {
Arc::new(Reconciler::new(
device_id.clone(),
t.clone(),
Some(fleet.clone()),
))
});
// Fired by the command server on `Verb::UpgradeStop`; awaited by the
// upgrade manager, which then exits the process for the new version.
let upgrade_stop = Arc::new(upgrade::UpgradeStopSignal::default());
let command_server = Arc::new(CommandServer::new(
device_id.clone(),
client.clone(),
upgrade_stop.clone(),
));
let ctrlc = async {
tokio::signal::ctrl_c().await.ok();
tracing::info!("received SIGINT, shutting down");
};
let sigterm = async {
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?
.recv()
.await;
tracing::info!("received SIGTERM, shutting down");
Ok::<(), anyhow::Error>(())
};
let _ = inventory_snapshot; // consumed by the DeviceInfo publish above
let watch: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> =
match reconciler.as_ref() {
Some(r) => Box::pin(watch_desired_state(
client.clone(),
device_id.clone(),
r.clone(),
)),
None => Box::pin(async {
std::future::pending::<()>().await;
Ok(())
}),
};
let reconcile: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> =
match reconciler.as_ref() {
Some(r) => Box::pin(r.clone().run_periodic(RECONCILE_INTERVAL)),
None => Box::pin(std::future::pending::<()>()),
};
let heartbeat = publish_heartbeat_loop(fleet);
let commands = command_server.run();
// ADR-022 self-upgrade manager. Returns only when the operator commits a
// cutover (it sent `UpgradeStop`) — that completes the select and the
// process exits 0, so the already-started new version takes over and
// systemd (Restart=on-failure) does not bring the old one back.
let upgrade = upgrade::run(
client.clone(),
device_id.clone(),
env!("CARGO_PKG_VERSION").to_string(),
upgrade_stop.clone(),
);
tokio::select! {
// Waiting on ctrlc in a select will automatically terminate other branches when
// ctrlc happens.
_ = ctrlc => {},
r = sigterm => { r?; }
r = watch => { r?; }
_ = reconcile => {}
_ = heartbeat => {}
r = commands => { r?; }
r = upgrade => { r?; tracing::info!("agent exiting for upgrade cutover"); }
}
Ok(())
}