All checks were successful
Run Check Script / check (pull_request) Successful in 2m36s
Fixes the 30s redeploy loop and adds graceful deployment upgrades — same root: how the agent identifies a running container vs. its desired spec. Redeploy-loop fix (container identity by id/name, not spec compare): - `ensure_service_running` is now liveness-only — a running container is a NOOP, no spec comparison (podman ps can't read env/volumes, which made the old matches_spec recreate env-bearing services every tick). The periodic tick adopts-or-restarts by name and never recreates a healthy container. - Spec changes are detected by the reconciler's existing byte-compare of the desired-state JSON, not by the runtime. The agent records each container's id (from start_service) in DeploymentState.container_ids. - Deleted the matches_spec FIXME and its always-drift hack. Graceful roll-forward upgrade: - PodmanV0Score.lifecycle: Option<LifecyclePolicy> (SIGTERM, 30s grace, SIGKILL fallback). stop_signal baked into the container at create; grace drives `podman stop --time`. - On a changed score the reconciler stops the exact old container by its recorded id (graceful), then starts the new one; dropped services are stopped. Roll-forward only — a failure reports Phase::Failed, never reverts. New ContainerRuntime methods: start_service (→id), container_status, stop_service (graceful). Reconciler is now generic over `dyn ContainerRuntime`, unit-tested against a FakeRuntime: tick-idempotency (loop killed), graceful-replace-by-id, roll-forward-no-revert, unchanged-noop. Architecture + flagged VM v1->v2->v3 e2e in ROADMAP/fleet_platform/ch5-graceful-upgrade-status.md.
372 lines
15 KiB
Rust
372 lines
15 KiB
Rust
mod command_server;
|
|
mod config;
|
|
mod fleet_publisher;
|
|
mod reconciler;
|
|
mod upgrade;
|
|
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use anyhow::{Context, Error, Result};
|
|
use clap::Parser;
|
|
use config::AgentConfig;
|
|
use harmony_fleet_auth::{
|
|
CredentialSource, connect_options_with_credentials, credential_source_from_config,
|
|
};
|
|
// Type alias to keep function signatures readable. The auth callback
|
|
// captures one `Arc<CredentialSource>` and clones it per invocation.
|
|
type Creds = Arc<CredentialSource>;
|
|
use futures_util::StreamExt;
|
|
use harmony_reconciler_contracts::{
|
|
BUCKET_DESIRED_STATE, Id, InventorySnapshot, desired_state_watch_filter,
|
|
};
|
|
|
|
use harmony::inventory::Inventory;
|
|
use harmony::modules::podman::PodmanTopology;
|
|
use harmony::topology::Topology;
|
|
|
|
use crate::command_server::CommandServer;
|
|
use crate::fleet_publisher::FleetPublisher;
|
|
use crate::reconciler::Reconciler;
|
|
|
|
/// ROADMAP §5.6 — agent polls podman every 30s as ground truth; KV watch
/// events are accelerators.
const RECONCILE_INTERVAL: Duration = Duration::from_secs(30);
|
|
|
|
#[derive(Parser)]
|
|
#[command(name = "fleet-agent-v0", about = "IoT agent for Raspberry Pi devices")]
|
|
struct Cli {
|
|
#[arg(
|
|
long,
|
|
env = "FLEET_AGENT_CONFIG",
|
|
// FIXME this should be a constant from a config, not just hardcoded here as we need the
|
|
// installation scripts and other bits to know about this file location.
|
|
default_value = "/etc/fleet-agent/config.toml"
|
|
)]
|
|
config: std::path::PathBuf,
|
|
|
|
/// ADR-022 self-test: load config, connect NATS (validates JWT), print
|
|
/// version + "ok", exit 0 — or exit non-zero on any failure. The upgrade
|
|
/// state machine runs this on a staged binary before cutover.
|
|
#[arg(long)]
|
|
self_test: bool,
|
|
}
|
|
|
|
/// ADR-022 `--self-test`: prove this binary can parse its config and reach NATS
|
|
/// with a valid JWT, then exit. No state mutation, no long-lived loops.
|
|
async fn self_test(cfg: &AgentConfig) -> Result<()> {
|
|
let creds = credential_source_from_config(&cfg.credentials)
|
|
.context("self-test: building NATS credential source")?;
|
|
connect_nats(cfg, creds)
|
|
.await
|
|
.context("self-test: NATS connect / JWT validation")?;
|
|
println!("fleet-agent v{} self-test ok", env!("CARGO_PKG_VERSION"));
|
|
Ok(())
|
|
}
|
|
|
|
async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result<async_nats::Client> {
|
|
let urls = &cfg.nats.urls;
|
|
tracing::info!(device_id = %cfg.agent.device_id, "connecting to NATS {urls:?}");
|
|
// The auth callback is invoked on every (re)connect, so a fresh
|
|
// Zitadel access token is minted automatically when the cached one
|
|
// is near-expiry — that's how we hold the "never lose connectivity"
|
|
// guarantee even across token rollovers and NATS pod restarts.
|
|
let client = connect_options_with_credentials(creds)
|
|
.ping_interval(Duration::from_secs(10))
|
|
// Surface async-nats's connection lifecycle in our logs. This
|
|
// is load-bearing for ops: a device that quietly disconnects
|
|
// is exactly the failure mode we promise won't happen, and
|
|
// operators need to see the reconnect attempts to debug.
|
|
.event_callback(|event| async move {
|
|
use async_nats::Event;
|
|
match event {
|
|
Event::Connected => tracing::info!("NATS connected"),
|
|
Event::Disconnected => tracing::warn!("NATS disconnected, will reconnect"),
|
|
Event::LameDuckMode => tracing::warn!("NATS server entered lame-duck mode"),
|
|
Event::SlowConsumer(sid) => {
|
|
tracing::warn!(sid = %sid, "NATS slow consumer")
|
|
}
|
|
Event::ServerError(e) => tracing::error!(error = %e, "NATS server error"),
|
|
Event::ClientError(e) => tracing::error!(error = %e, "NATS client error"),
|
|
Event::Closed => tracing::error!("NATS connection closed"),
|
|
other => tracing::debug!(?other, "NATS event"),
|
|
}
|
|
})
|
|
.connect(cfg.nats.urls.as_slice())
|
|
.await?;
|
|
tracing::info!(urls = ?cfg.nats.urls, "connected to NATS");
|
|
Ok(client)
|
|
}
|
|
|
|
async fn watch_desired_state(
|
|
client: async_nats::Client,
|
|
device_id: Id,
|
|
reconciler: Arc<Reconciler>,
|
|
) -> Result<()> {
|
|
let jetstream = async_nats::jetstream::new(client);
|
|
let bucket = jetstream
|
|
.create_key_value(async_nats::jetstream::kv::Config {
|
|
bucket: BUCKET_DESIRED_STATE.to_string(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
|
|
let key_filter = desired_state_watch_filter(&device_id.to_string());
|
|
tracing::info!(filter = %key_filter, "watching KV keys");
|
|
|
|
// `watch_with_history` (DeliverPolicy::LastPerSubject), not `watch`
|
|
// (DeliverPolicy::New): on every agent (re)start — including a device
|
|
// reboot — we must replay the current desired-state for this device so
|
|
// the reconciler re-learns its deployments and restarts their (now
|
|
// stopped) containers. Plain `watch` delivers only future Puts, so a
|
|
// rebooted device would never bring its containers back until the
|
|
// operator happened to rewrite the KV. The apply path is idempotent,
|
|
// so replaying already-converged state is a NOOP.
|
|
let mut watch = bucket.watch_with_history(&key_filter).await?;
|
|
while let Some(result) = watch.next().await {
|
|
let entry = match result {
|
|
Ok(e) => e,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "watch error");
|
|
continue;
|
|
}
|
|
};
|
|
|
|
tracing::debug!(key = %entry.key, "bucket watch new value {entry:?}");
|
|
|
|
match entry.operation {
|
|
async_nats::jetstream::kv::Operation::Put => {
|
|
if let Err(e) = reconciler.apply(&entry.key, &entry.value).await {
|
|
tracing::warn!(key = %entry.key, error = %e, "apply failed");
|
|
}
|
|
}
|
|
async_nats::jetstream::kv::Operation::Delete
|
|
| async_nats::jetstream::kv::Operation::Purge => {
|
|
if let Err(e) = reconciler.remove(&entry.key).await {
|
|
tracing::warn!(key = %entry.key, error = %e, "remove failed");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Tiny liveness-only loop: push a `HeartbeatPayload` into the
|
|
/// `device-heartbeat` bucket every N seconds, and fan out the same
|
|
/// pulse on `device-state.<device_id>` for live (non-JetStream)
|
|
/// observers. Stays separate from per-deployment state writes so
|
|
/// routine pings don't churn the device-state bucket or its watch
|
|
/// subscribers — but the direct-subject pulse uses ordinary core
|
|
/// NATS pub/sub and doesn't accumulate state anywhere.
|
|
async fn publish_heartbeat_loop(fleet: Arc<FleetPublisher>) {
|
|
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
|
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
|
loop {
|
|
interval.tick().await;
|
|
fleet.publish_heartbeat().await;
|
|
fleet.publish_state_pulse().await;
|
|
}
|
|
}
|
|
|
|
/// Build a one-shot inventory snapshot at agent startup. Cheap,
|
|
/// published alongside every heartbeat until the agent restarts.
|
|
/// NOTE: I don't see why this is *published* with every heartbeat, it feels like noise.
|
|
/// It shoulf be published on heartbeat only when something changed. It is ok to *check* the state
|
|
/// on heartbeat but not always send it over the wire
|
|
fn local_inventory(inventory: &Inventory) -> InventorySnapshot {
|
|
InventorySnapshot {
|
|
hostname: inventory.location.name.clone(),
|
|
arch: std::env::consts::ARCH.to_string(),
|
|
os: std::env::consts::OS.to_string(),
|
|
kernel: std::fs::read_to_string("/proc/sys/kernel/osrelease")
|
|
.map(|s| s.trim().to_string())
|
|
.unwrap_or_default(),
|
|
cpu_cores: std::thread::available_parallelism()
|
|
.map(|n| n.get() as u32)
|
|
.unwrap_or(0),
|
|
memory_mb: sys_memory_total_mb().unwrap_or(0),
|
|
agent_version: env!("CARGO_PKG_VERSION").to_string(),
|
|
}
|
|
}
|
|
|
|
/// Read total RAM from /proc/meminfo, in megabytes (the `MemTotal` value
/// divided by 1024, rounded down).
///
/// Returns `None` on non-Linux hosts or if /proc isn't mounted (the file
/// cannot be read), and when the first `MemTotal:` line has no parsable
/// number. Small, avoids a sys-info crate dep for a single field.
fn sys_memory_total_mb() -> Option<u64> {
    let meminfo = std::fs::read_to_string("/proc/meminfo").ok()?;
    mem_total_mb_from_meminfo(&meminfo)
}

/// Pull `MemTotal` out of `/proc/meminfo`-formatted text and convert it
/// from the file's kB figure to megabytes. Only the first `MemTotal:` line
/// is considered.
fn mem_total_mb_from_meminfo(meminfo: &str) -> Option<u64> {
    let rest = meminfo
        .lines()
        .find_map(|line| line.strip_prefix("MemTotal:"))?;
    let kb: u64 = rest.split_whitespace().next()?.parse().ok()?;
    Some(kb / 1024)
}
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<()> {
|
|
// Default to `info` so the agent produces useful output without
|
|
// requiring `RUST_LOG` to be set anywhere — the systemd unit
|
|
// installed by `FleetDeviceSetupScore` does set it, but a
|
|
// hand-launched binary or a user who's overridden the unit
|
|
// shouldn't have to know that. `RUST_LOG` still overrides
|
|
// when set (e.g. `RUST_LOG=debug` for troubleshooting).
|
|
let filter = tracing_subscriber::EnvFilter::try_from_default_env()
|
|
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
|
|
tracing_subscriber::fmt().with_env_filter(filter).init();
|
|
|
|
let cli = Cli::parse();
|
|
let cfg = config::load_config(&cli.config)?;
|
|
|
|
// ADR-022: the upgrade state machine invokes the staged binary with
|
|
// `--self-test` before any symlink swap. Exit code is the verdict.
|
|
if cli.self_test {
|
|
return self_test(&cfg).await;
|
|
}
|
|
|
|
tracing::info!(
|
|
device_id = %cfg.agent.device_id,
|
|
runtime_enabled = cfg.agent.runtime_enabled,
|
|
"fleet-agent-v0 starting",
|
|
);
|
|
|
|
let device_id = cfg.agent.device_id.clone();
|
|
|
|
// Podman is the agent's runtime backend for deploying workloads.
|
|
// When `runtime_enabled = false`, skip the socket entirely so the
|
|
// agent can run on hosts that don't ship podman (the in-cluster
|
|
// e2e harness deploys the agent as a Pod on containerd-only k3d
|
|
// nodes). The command server + heartbeat still run; only the
|
|
// reconciler depends on the topology.
|
|
let topology = if cfg.agent.runtime_enabled {
|
|
let t = Arc::new(
|
|
PodmanTopology::from_default_socket()
|
|
.map_err(|e| anyhow::anyhow!("failed to open podman socket: {e}"))?,
|
|
);
|
|
t.ensure_ready().await.context("podman socket not ready")?;
|
|
tracing::info!("podman socket ready");
|
|
Some(t)
|
|
} else {
|
|
tracing::warn!(
|
|
"runtime_enabled=false; skipping podman + reconciler. \
|
|
Desired-state KV deliveries will be logged and dropped."
|
|
);
|
|
None
|
|
};
|
|
|
|
let inventory = Arc::new(Inventory::from_localhost());
|
|
tracing::info!(hostname = %inventory.location.name, "inventory loaded");
|
|
let inventory_snapshot = local_inventory(&inventory);
|
|
|
|
let creds = credential_source_from_config(&cfg.credentials)
|
|
.context("building NATS credential source")?;
|
|
|
|
let client = connect_nats(&cfg, creds).await.map_err(|e| {
|
|
let msg = format!("Nats connection FAILED : {e}");
|
|
tracing::error!(msg);
|
|
Error::msg(msg)
|
|
})?;
|
|
|
|
// Publish surface. Opens the three KV buckets (idempotent
|
|
// creates). Must be live before the reconciler starts so
|
|
// writes on the first desired-state KV watch land on the wire.
|
|
let fleet = Arc::new(
|
|
FleetPublisher::connect(client.clone(), device_id.clone())
|
|
.await
|
|
.context("fleet publisher connect")?,
|
|
);
|
|
tracing::info!("fleet publisher ready");
|
|
|
|
// Publish DeviceInfo once at startup. Merge the config-declared
|
|
// labels with an always-on `device-id=<id>` default so every
|
|
// device is targetable by id even without explicit labels.
|
|
// Config labels win on key conflicts — operators can override
|
|
// `device-id` if they really want to (unusual but legal).
|
|
let mut startup_labels = cfg.labels.clone();
|
|
startup_labels
|
|
.entry("device-id".to_string())
|
|
.or_insert_with(|| device_id.to_string());
|
|
fleet
|
|
.publish_device_info(startup_labels, Some(inventory_snapshot.clone()))
|
|
.await;
|
|
|
|
// Reconciler exists only when a podman topology is available.
|
|
// Without it, the desired-state watch + periodic reconcile arms
|
|
// are replaced by pending-forever futures so `select!` only sees
|
|
// heartbeat + command server.
|
|
let reconciler: Option<Arc<Reconciler>> = topology.as_ref().map(|t| {
|
|
Arc::new(Reconciler::new(
|
|
device_id.clone(),
|
|
t.clone(),
|
|
Some(fleet.clone()),
|
|
))
|
|
});
|
|
|
|
// Fired by the command server on `Verb::UpgradeStop`; awaited by the
|
|
// upgrade manager, which then exits the process for the new version.
|
|
let upgrade_stop = Arc::new(upgrade::UpgradeStopSignal::default());
|
|
let command_server = Arc::new(CommandServer::new(
|
|
device_id.clone(),
|
|
client.clone(),
|
|
upgrade_stop.clone(),
|
|
));
|
|
|
|
let ctrlc = async {
|
|
tokio::signal::ctrl_c().await.ok();
|
|
tracing::info!("received SIGINT, shutting down");
|
|
};
|
|
let sigterm = async {
|
|
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())?
|
|
.recv()
|
|
.await;
|
|
tracing::info!("received SIGTERM, shutting down");
|
|
Ok::<(), anyhow::Error>(())
|
|
};
|
|
|
|
let _ = inventory_snapshot; // consumed by the DeviceInfo publish above
|
|
|
|
let watch: std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send>> =
|
|
match reconciler.as_ref() {
|
|
Some(r) => Box::pin(watch_desired_state(
|
|
client.clone(),
|
|
device_id.clone(),
|
|
r.clone(),
|
|
)),
|
|
None => Box::pin(async {
|
|
std::future::pending::<()>().await;
|
|
Ok(())
|
|
}),
|
|
};
|
|
let reconcile: std::pin::Pin<Box<dyn std::future::Future<Output = ()> + Send>> =
|
|
match reconciler.as_ref() {
|
|
Some(r) => Box::pin(r.clone().run_periodic(RECONCILE_INTERVAL)),
|
|
None => Box::pin(std::future::pending::<()>()),
|
|
};
|
|
let heartbeat = publish_heartbeat_loop(fleet);
|
|
let commands = command_server.run();
|
|
// ADR-022 self-upgrade manager. Returns only when the operator commits a
|
|
// cutover (it sent `UpgradeStop`) — that completes the select and the
|
|
// process exits 0, so the already-started new version takes over and
|
|
// systemd (Restart=on-failure) does not bring the old one back.
|
|
let upgrade = upgrade::run(
|
|
client.clone(),
|
|
device_id.clone(),
|
|
env!("CARGO_PKG_VERSION").to_string(),
|
|
upgrade_stop.clone(),
|
|
);
|
|
|
|
tokio::select! {
|
|
// Waiting on ctrlc in a select will automatically terminate other branches when
|
|
// ctrlc happens.
|
|
_ = ctrlc => {},
|
|
r = sigterm => { r?; }
|
|
r = watch => { r?; }
|
|
_ = reconcile => {}
|
|
_ = heartbeat => {}
|
|
r = commands => { r?; }
|
|
r = upgrade => { r?; tracing::info!("agent exiting for upgrade cutover"); }
|
|
}
|
|
|
|
Ok(())
|
|
}
|