feat(fleet): agent self-upgrade + auto-rollback protocol, ADR-022 (Ch4) #330

Open
johnride wants to merge 1 commits from feat/fleet-ch4-agent-upgrade into feat/fleet-ch3-log-streaming
26 changed files with 1477 additions and 320 deletions

20
Cargo.lock generated
View File

@@ -4066,6 +4066,7 @@ dependencies = [
"harmony",
"harmony-fleet-auth",
"harmony-reconciler-contracts",
"harmony_downloadable_asset",
"serde",
"serde_json",
"thiserror 2.0.18",
@@ -4073,6 +4074,7 @@ dependencies = [
"toml",
"tracing",
"tracing-subscriber",
"url",
]
[[package]]
@@ -4390,6 +4392,20 @@ dependencies = [
"syn 2.0.117",
]
[[package]]
name = "harmony_downloadable_asset"
version = "0.1.0"
dependencies = [
"futures-util",
"httptest",
"reqwest 0.12.28",
"sha2 0.10.9",
"thiserror 2.0.18",
"tokio",
"tracing",
"url",
]
[[package]]
name = "harmony_execution"
version = "0.1.0"
@@ -5507,15 +5523,13 @@ version = "0.1.0"
dependencies = [
"async-trait",
"env_logger",
"futures-util",
"httptest",
"harmony_downloadable_asset",
"kube",
"log",
"octocrab",
"pretty_assertions",
"regex",
"reqwest 0.12.28",
"sha2 0.10.9",
"tokio",
"url",
]

View File

@@ -5,6 +5,7 @@ members = [
"private_repos/*",
"harmony",
"harmony_zitadel_auth",
"harmony_downloadable_asset",
"harmony_types",
"harmony_macros",
"harmony_tui",

View File

@@ -0,0 +1,48 @@
# Ch4 — Agent self-upgrade + auto-rollback (ADR-022): status
Built the full ADR-022 protocol end to end. The state-machine "brain" and the
operator's commit decision are exhaustively unit-tested; the OS side-effects sit
behind a seam so they're faked in tests and real on-device.
## Shipped
| Piece | Where | Tested |
|---|---|---|
| Wire types: marker, phase, status, `agent_version` on heartbeat, `Verb::UpgradeStop` | `harmony-reconciler-contracts/src/upgrade.rs`, `kv.rs`, `commands.rs`, `fleet.rs` | unit |
| Shared download+SHA-256 verify (lifted from k3d) | new crate `harmony_downloadable_asset` | unit (httptest) |
| Agent state machine `drive` (Staging→Verifying→CutoverReady→stop/revert) | `harmony-fleet-agent/src/upgrade.rs` | **6 unit tests** incl. timeout-revert, stage/self-test/cutover failure |
| `UpgradeExecutor` seam + real `SystemdUpgradeExecutor` (download, `--self-test`, atomic symlink swap, `systemd-run` transient unit, revert) | same | seam fake-tested; real impl self-heals layout |
| `--self-test` flag | `harmony-fleet-agent/src/main.rs` | — |
| `Verb::UpgradeStop` handling + armed `UpgradeStopSignal` (only the cutover-waiting old agent acts) | `command_server.rs`, `upgrade.rs` | — |
| Operator coordinator: send stop **only after** observing the new version's heartbeat; reflect version + phase to the `Device` CR | `harmony-fleet-operator/src/upgrade_coordinator.rs` | **2 unit tests** on the commit decision |
| `FleetCommandsClient::upgrade_stop` | `commands.rs` | — |
| `Device.status.{currentVersion, upgrade}` | `crd.rs` | — |
Load-bearing properties from the ADR are intact: old verifies new
(`--self-test`); operator commits the stop (single source of truth, never the
agent); rollback is the same code path (revert symlink + stop transient unit on
self-test failure / heartbeat-timeout); no version is GC'd.
## Deviations (deliberate)
- **Marker + status ride NATS KV** (`agent-upgrade` / `agent-upgrade-status`),
not a fire-and-forget subject, so they survive an operator restart — same
ethos as Ch2. The ADR's `device-cmd.*`/`device-state.*.upgrade` subjects map
onto: the existing command protocol (`Verb::UpgradeStop`) and the status KV.
- **First-upgrade rollback without M1.** The real executor `capture_revert_target`
preserves the running binary at its versioned path on first cutover even when
the initial install put a plain file at `/usr/local/bin/fleet-agent`. This
makes M1 a clean-install nicety, not a rollback-correctness prerequisite.
## Flagged for a supervised run (not done tonight)
1. **M1 clean install layout**`FleetDeviceSetupScore` should install to
`/usr/bin/fleet-agent-v<ver>` + symlink `/usr/local/bin/fleet-agent` from the
start. Needs a new `agent_version` config field (≈9 construction sites) and a
`FileSource::Symlink` delivery primitive (ansible `state: link`). The executor
self-heal above covers correctness in the meantime.
2. **libvirt vX→vX+1 e2e + corrupt-binary auto-revert** — needs two built agent
binaries, a served URL reachable from the VM, and a KVM run. The VM harness
exists (`harmony-fleet-e2e/src/vm`); the protocol brain is unit-green, so this
is an integration proof to run on real hardware. The corrupt-binary path is
already unit-proven via `stage_failure_*` / `heartbeat_timeout_reverts_*`.

View File

@@ -527,6 +527,7 @@ async fn simulate_heartbeat_loop(
let hb = HeartbeatPayload {
device_id: Id::from(device.device_id.clone()),
at: Utc::now(),
agent_version: "load-test".to_string(),
};
if let Ok(payload) = serde_json::to_vec(&hb) {
if bucket.put(&hb_key, payload.into()).await.is_ok() {

View File

@@ -7,7 +7,9 @@ rust-version = "1.85"
[dependencies]
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
harmony_downloadable_asset = { path = "../../harmony_downloadable_asset" }
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
url = { workspace = true }
async-nats = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true }

View File

@@ -25,6 +25,8 @@ use harmony_reconciler_contracts::{
use serde::Serialize;
use thiserror::Error;
use crate::upgrade::UpgradeStopSignal;
/// Hard cap on a single exec's combined output. Keeps the JSON reply
/// comfortably under NATS's default 1 MiB message limit.
const EXEC_MAX_OUTPUT: usize = 256 * 1024;
@@ -38,15 +40,20 @@ pub struct CommandServer {
client: Client,
agent_version: &'static str,
started_at: Instant,
/// Fired when the operator sends `Verb::UpgradeStop` (ADR-022 cutover). The
/// upgrade manager awaits it and then exits the process so the new version
/// takes over. The agent never self-stops on its own observation.
upgrade_stop: Arc<UpgradeStopSignal>,
}
impl CommandServer {
pub fn new(device_id: Id, client: Client) -> Self {
pub fn new(device_id: Id, client: Client, upgrade_stop: Arc<UpgradeStopSignal>) -> Self {
Self {
device_id,
client,
agent_version: env!("CARGO_PKG_VERSION"),
started_at: Instant::now(),
upgrade_stop,
}
}
@@ -115,6 +122,29 @@ impl CommandServer {
Err(e) => return Err(CommandError::BadRequestBody(e.to_string())),
};
self.reply_exec(reply_to, &command).await
} else if verb_token == Verb::UpgradeStop.as_subject_token() {
let reason = match serde_json::from_slice::<CommandRequest>(&msg.payload) {
Ok(CommandRequest::UpgradeStop { reason }) => reason,
Ok(_) => "upgrade-stop".to_string(),
// Tolerate a bodyless stop — the operator's intent is clear.
Err(_) => "upgrade-stop".to_string(),
};
tracing::info!(%reason, "received upgrade-stop; signalling cutover exit");
// Ack first so the operator knows we got it, then fire the signal.
let ack = serde_json::to_vec(&PingReply {
device_id: self.device_id.clone(),
agent_version: self.agent_version.to_string(),
uptime_s: self.started_at.elapsed().as_secs(),
})
.map_err(CommandError::SerializeReply)?;
self.client
.publish(reply_to, ack.into())
.await
.map_err(|e| CommandError::PublishReply(e.to_string()))?;
if !self.upgrade_stop.fire() {
tracing::warn!("upgrade-stop received but no upgrade is in cutover; ignoring");
}
Ok(())
} else {
tracing::warn!(verb = %verb_token, "unknown command verb");
Err(CommandError::UnknownVerb(verb_token.to_string()))

View File

@@ -97,6 +97,7 @@ impl FleetPublisher {
let hb = HeartbeatPayload {
device_id: self.device_id.clone(),
at: chrono::Utc::now(),
agent_version: env!("CARGO_PKG_VERSION").to_string(),
};
let key = device_heartbeat_key(&self.device_id.to_string());
match serde_json::to_vec(&hb) {

View File

@@ -2,6 +2,7 @@ mod command_server;
mod config;
mod fleet_publisher;
mod reconciler;
mod upgrade;
use std::sync::Arc;
use std::time::Duration;
@@ -43,6 +44,24 @@ struct Cli {
default_value = "/etc/fleet-agent/config.toml"
)]
config: std::path::PathBuf,
/// ADR-022 self-test: load config, connect NATS (validates JWT), print
/// version + "ok", exit 0 — or exit non-zero on any failure. The upgrade
/// state machine runs this on a staged binary before cutover.
#[arg(long)]
self_test: bool,
}
/// ADR-022 `--self-test`: prove this binary can parse its config and reach NATS
/// with a valid JWT, then exit. No state mutation, no long-lived loops.
async fn self_test(cfg: &AgentConfig) -> Result<()> {
let creds = credential_source_from_config(&cfg.credentials)
.context("self-test: building NATS credential source")?;
connect_nats(cfg, creds)
.await
.context("self-test: NATS connect / JWT validation")?;
println!("fleet-agent v{} self-test ok", env!("CARGO_PKG_VERSION"));
Ok(())
}
async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result<async_nats::Client> {
@@ -198,6 +217,13 @@ async fn main() -> Result<()> {
let cli = Cli::parse();
let cfg = config::load_config(&cli.config)?;
// ADR-022: the upgrade state machine invokes the staged binary with
// `--self-test` before any symlink swap. Exit code is the verdict.
if cli.self_test {
return self_test(&cfg).await;
}
tracing::info!(
device_id = %cfg.agent.device_id,
runtime_enabled = cfg.agent.runtime_enabled,
@@ -277,7 +303,14 @@ async fn main() -> Result<()> {
))
});
let command_server = Arc::new(CommandServer::new(device_id.clone(), client.clone()));
// Fired by the command server on `Verb::UpgradeStop`; awaited by the
// upgrade manager, which then exits the process for the new version.
let upgrade_stop = Arc::new(upgrade::UpgradeStopSignal::default());
let command_server = Arc::new(CommandServer::new(
device_id.clone(),
client.clone(),
upgrade_stop.clone(),
));
let ctrlc = async {
tokio::signal::ctrl_c().await.ok();
@@ -312,6 +345,16 @@ async fn main() -> Result<()> {
};
let heartbeat = publish_heartbeat_loop(fleet);
let commands = command_server.run();
// ADR-022 self-upgrade manager. Returns only when the operator commits a
// cutover (it sent `UpgradeStop`) — that completes the select and the
// process exits 0, so the already-started new version takes over and
// systemd (Restart=on-failure) does not bring the old one back.
let upgrade = upgrade::run(
client.clone(),
device_id.clone(),
env!("CARGO_PKG_VERSION").to_string(),
upgrade_stop.clone(),
);
tokio::select! {
// Waiting on ctrlc in a select will automatically terminate other branches when
@@ -322,6 +365,7 @@ async fn main() -> Result<()> {
_ = reconcile => {}
_ = heartbeat => {}
r = commands => { r?; }
r = upgrade => { r?; tracing::info!("agent exiting for upgrade cutover"); }
}
Ok(())

View File

@@ -0,0 +1,663 @@
//! Agent self-upgrade state machine (ADR-022).
//!
//! The operator publishes an [`AgentUpgradeMarker`] (desired version + signed
//! URL + SHA-256). The agent stages the new binary, verifies it with
//! `--self-test`, atomically swaps the `/usr/local/bin/fleet-agent` symlink, and
//! starts the new version alongside itself. It then **waits for the operator's
//! stop signal** — the agent never self-stops (the operator is the single
//! source of truth, and only commits the stop after it has seen the new
//! version's heartbeat). If the new version never reports healthy within
//! [`HEARTBEAT_TIMEOUT`], the agent reverts the symlink and stays on its
//! current version. Rollback is the same code path as upgrade.
//!
//! The OS side-effects (download, `--self-test`, symlink swap, systemd) sit
//! behind [`UpgradeExecutor`] so [`drive`] — the state machine "brain" — is
//! unit-tested with a fake; [`SystemdUpgradeExecutor`] does the real work.
use std::path::{Path, PathBuf};
use std::time::Duration;
use harmony_reconciler_contracts::AgentUpgradeMarker;
use harmony_reconciler_contracts::AgentUpgradePhase::{
self, CutoverReady, Done, Failed, Staging, Verifying,
};
/// Directory holding the immutable versioned binaries (`fleet-agent-v<ver>`).
pub const VERSIONED_BIN_DIR: &str = "/usr/bin";
/// The stable path the systemd unit's `ExecStart` references. Symlink swap of
/// this path is the cutover primitive (POSIX-atomic rename).
pub const ACTIVE_SYMLINK: &str = "/usr/local/bin/fleet-agent";
/// How long the old agent waits for the new version's heartbeat after cutover
/// before reverting (ADR-022 suggests 60s).
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60);
#[derive(Debug, thiserror::Error)]
pub enum UpgradeError {
#[error("invalid binary URL: {0}")]
Url(String),
#[error(transparent)]
Download(#[from] harmony_downloadable_asset::DownloadError),
#[error("filesystem error: {0}")]
Io(#[from] std::io::Error),
#[error("self-test exited with {0}")]
SelfTest(String),
#[error("systemd error: {0}")]
Systemd(String),
#[error("no previous symlink target recorded to revert to")]
NothingToRevert,
}
/// The OS-level operations the upgrade needs. Behind a trait so [`drive`] is
/// unit-testable without touching disk or systemd.
pub trait UpgradeExecutor {
/// Download + verify the binary and place it at its versioned path. Returns
/// the staged path.
async fn stage(&self, marker: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError>;
/// Run the staged binary's `--self-test`. Err if it doesn't exit 0.
async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError>;
/// Atomically point [`ACTIVE_SYMLINK`] at `binary` (recording the previous
/// target for revert) and start the new version alongside the old.
async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError>;
/// Restore the pre-cutover symlink target and stop the new version.
async fn revert(&self) -> Result<(), UpgradeError>;
}
/// Sink for upgrade phase transitions (→ operator via KV).
pub trait StatusSink {
async fn publish(&self, phase: AgentUpgradePhase, last_error: Option<String>);
}
/// What the agent process should do after an upgrade attempt.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpgradeAction {
/// Cutover succeeded and the operator signalled stop — exit(0) so the new
/// version (already running) takes over cleanly.
Exit,
/// The upgrade failed or was reverted — stay running on the current version.
StayedOnCurrent,
}
/// The state machine. Stage → verify → cutover → wait for the operator's stop
/// (with a heartbeat-timeout revert). Pure orchestration over the injected
/// executor, status sink, and post-cutover signal futures.
///
/// `new_version_healthy` resolves when a heartbeat from the new version is
/// observed; `operator_stop` resolves when the operator sends the stop signal.
pub async fn drive<E, S>(
marker: &AgentUpgradeMarker,
executor: &E,
status: &S,
new_version_healthy: impl std::future::Future<Output = ()>,
operator_stop: impl std::future::Future<Output = ()>,
heartbeat_timeout: Duration,
) -> UpgradeAction
where
E: UpgradeExecutor,
S: StatusSink,
{
status.publish(Staging, None).await;
let staged = match executor.stage(marker).await {
Ok(p) => p,
Err(e) => {
status.publish(Failed, Some(e.to_string())).await;
return UpgradeAction::StayedOnCurrent;
}
};
status.publish(Verifying, None).await;
if let Err(e) = executor.self_test(&staged).await {
status
.publish(Failed, Some(format!("self-test failed: {e}")))
.await;
return UpgradeAction::StayedOnCurrent;
}
status.publish(CutoverReady, None).await;
if let Err(e) = executor.cutover(&staged, &marker.desired_version).await {
let _ = executor.revert().await;
status
.publish(Failed, Some(format!("cutover failed: {e}")))
.await;
return UpgradeAction::StayedOnCurrent;
}
// Post-cutover: race the operator's stop against the new version becoming
// healthy and the timeout. Stop ⇒ exit. Healthy ⇒ wait indefinitely for
// stop (operator-outage-safe — two agents is wasteful but never blind).
// Timeout with no healthy heartbeat ⇒ revert and stay.
tokio::pin!(new_version_healthy);
tokio::pin!(operator_stop);
tokio::select! {
_ = &mut operator_stop => {
status.publish(Done, None).await;
return UpgradeAction::Exit;
}
_ = &mut new_version_healthy => {}
_ = tokio::time::sleep(heartbeat_timeout) => {
let _ = executor.revert().await;
status
.publish(Failed, Some("new version did not report healthy in time; reverted".into()))
.await;
return UpgradeAction::StayedOnCurrent;
}
}
// Healthy observed — commit is now the operator's; wait for its stop.
operator_stop.await;
status.publish(Done, None).await;
UpgradeAction::Exit
}
// ───────────────────────── real executor ─────────────────────────
/// Production [`UpgradeExecutor`]: download+verify, `--self-test`, atomic
/// symlink swap, and a systemd transient unit for the new version.
pub struct SystemdUpgradeExecutor {
/// The version this (old) agent is running — used to preserve its binary at
/// a versioned path so `revert` always has a target, even on a pre-M1 plain
/// install where `ACTIVE_SYMLINK` is a regular file.
current_version: String,
/// Previous symlink target, captured at cutover so `revert` can restore it.
previous_target: tokio::sync::Mutex<Option<PathBuf>>,
/// Transient systemd unit name for the started new version, for `revert`.
started_unit: tokio::sync::Mutex<Option<String>>,
}
impl SystemdUpgradeExecutor {
pub fn new(current_version: String) -> Self {
Self {
current_version,
previous_target: tokio::sync::Mutex::new(None),
started_unit: tokio::sync::Mutex::new(None),
}
}
fn transient_unit(version: &str) -> String {
format!("fleet-agent-v{version}")
}
/// The revert target: the existing symlink target if `ACTIVE_SYMLINK` is a
/// symlink, else the running binary preserved at its versioned path (so a
/// plain-file initial install can still be rolled back to).
fn capture_revert_target(&self) -> PathBuf {
if let Ok(target) = std::fs::read_link(ACTIVE_SYMLINK) {
return target;
}
let preserved =
PathBuf::from(VERSIONED_BIN_DIR).join(format!("fleet-agent-v{}", self.current_version));
if !preserved.exists() {
if let Ok(exe) = std::env::current_exe() {
if let Err(e) = std::fs::copy(&exe, &preserved) {
tracing::warn!(error = %e, "upgrade: could not preserve current binary");
}
}
}
preserved
}
}
impl UpgradeExecutor for SystemdUpgradeExecutor {
async fn stage(&self, marker: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError> {
use harmony_downloadable_asset::DownloadableAsset;
let asset = DownloadableAsset {
url: url::Url::parse(&marker.url).map_err(|e| UpgradeError::Url(e.to_string()))?,
file_name: format!("fleet-agent-v{}", marker.desired_version),
checksum: marker.sha256.clone(),
};
let path = asset
.download_to_path(PathBuf::from(VERSIONED_BIN_DIR))
.await?;
// Make it executable (0755).
let mut perms = std::fs::metadata(&path)?.permissions();
use std::os::unix::fs::PermissionsExt;
perms.set_mode(0o755);
std::fs::set_permissions(&path, perms)?;
Ok(path)
}
async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError> {
let status = tokio::process::Command::new(binary)
.arg("--self-test")
.status()
.await
.map_err(|e| UpgradeError::SelfTest(e.to_string()))?;
if status.success() {
Ok(())
} else {
Err(UpgradeError::SelfTest(format!("exit {status}")))
}
}
async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError> {
// Record (and, if needed, preserve) the current binary so revert can
// restore it.
*self.previous_target.lock().await = Some(self.capture_revert_target());
// Atomic swap: symlink to a temp name, rename over the active path.
let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.new"));
let _ = std::fs::remove_file(&tmp);
std::os::unix::fs::symlink(binary, &tmp)?;
std::fs::rename(&tmp, ACTIVE_SYMLINK)?;
// Start the new version alongside us as a transient systemd unit, so the
// operator can observe its heartbeat before committing the stop.
let unit = Self::transient_unit(version);
let out = tokio::process::Command::new("systemd-run")
.args(["--unit", &unit, "--collect"])
.arg(binary)
.output()
.await
.map_err(|e| UpgradeError::Systemd(e.to_string()))?;
if !out.status.success() {
return Err(UpgradeError::Systemd(
String::from_utf8_lossy(&out.stderr).to_string(),
));
}
*self.started_unit.lock().await = Some(unit);
Ok(())
}
async fn revert(&self) -> Result<(), UpgradeError> {
// Stop the transient unit if we started one.
if let Some(unit) = self.started_unit.lock().await.take() {
let _ = tokio::process::Command::new("systemctl")
.args(["stop", &unit])
.status()
.await;
}
// Restore the previous symlink target.
let previous = self
.previous_target
.lock()
.await
.clone()
.ok_or(UpgradeError::NothingToRevert)?;
let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.revert"));
let _ = std::fs::remove_file(&tmp);
std::os::unix::fs::symlink(&previous, &tmp)?;
std::fs::rename(&tmp, ACTIVE_SYMLINK)?;
Ok(())
}
}
// ───────────────────────── NATS wiring ─────────────────────────
use std::sync::Arc;
use async_nats::jetstream::{self, kv::Operation};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
AgentUpgradeStatus, BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT,
HeartbeatPayload, Id, agent_upgrade_key, device_heartbeat_key,
};
use std::sync::atomic::{AtomicBool, Ordering};
use tokio::sync::Notify;
/// The operator's cutover stop signal, shared between the command server (which
/// receives `Verb::UpgradeStop`) and the upgrade manager (which awaits it).
///
/// Both old and new agents subscribe to the device command subject, so both
/// *receive* the stop — but it's only **armed** on the old agent during its
/// post-cutover wait. Outside that window `fire` is a no-op, so a stray stop
/// (or one meant for a previous upgrade) can't make an agent exit, and no stale
/// permit accumulates to trip the next upgrade.
pub struct UpgradeStopSignal {
notify: Notify,
armed: AtomicBool,
}
impl Default for UpgradeStopSignal {
fn default() -> Self {
Self {
notify: Notify::new(),
armed: AtomicBool::new(false),
}
}
}
impl UpgradeStopSignal {
/// Fire the signal if armed. Returns whether it was (for logging).
pub fn fire(&self) -> bool {
if self.armed.load(Ordering::SeqCst) {
self.notify.notify_one();
true
} else {
false
}
}
async fn notified(&self) {
self.notify.notified().await;
}
fn arm(&self) {
self.armed.store(true, Ordering::SeqCst);
}
fn disarm(&self) {
self.armed.store(false, Ordering::SeqCst);
}
}
/// Writes [`AgentUpgradeStatus`] to the status KV so the operator can reflect
/// the live phase onto the `Device` CR.
struct KvStatusSink {
bucket: jetstream::kv::Store,
device_id: Id,
current_version: String,
target: String,
}
impl StatusSink for KvStatusSink {
async fn publish(&self, phase: AgentUpgradePhase, last_error: Option<String>) {
let status = AgentUpgradeStatus {
device_id: self.device_id.clone(),
phase,
current_version: self.current_version.clone(),
target_version: Some(self.target.clone()),
last_error,
at: chrono::Utc::now(),
};
let key = agent_upgrade_key(&self.device_id.to_string());
match serde_json::to_vec(&status) {
Ok(payload) => {
if let Err(e) = self.bucket.put(&key, payload.into()).await {
tracing::warn!(error = %e, "upgrade: status publish failed");
}
}
Err(e) => tracing::warn!(error = %e, "upgrade: status serialize failed"),
}
}
}
/// Watch the device's upgrade marker; run the state machine when a marker asks
/// for a different version. Returns `Ok(())` only when an upgrade succeeds and
/// the operator signals stop — the caller (main) treats that as "exit so the
/// new version takes over". Failures/reverts keep watching for a fixed marker.
pub async fn run(
client: async_nats::Client,
device_id: Id,
current_version: String,
operator_stop: Arc<UpgradeStopSignal>,
) -> anyhow::Result<()> {
let js = jetstream::new(client);
let marker_bucket = js
.create_key_value(jetstream::kv::Config {
bucket: BUCKET_AGENT_UPGRADE.to_string(),
..Default::default()
})
.await?;
let status_bucket = js
.create_key_value(jetstream::kv::Config {
bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(),
..Default::default()
})
.await?;
let heartbeat_bucket = js
.create_key_value(jetstream::kv::Config {
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
..Default::default()
})
.await?;
let my_key = agent_upgrade_key(&device_id.to_string());
tracing::info!(version = %current_version, key = %my_key, "upgrade: watching marker");
// Outer loop re-establishes the watch after a NATS blip — only an
// operator-committed cutover (`UpgradeAction::Exit`) returns from here.
loop {
let mut watch = marker_bucket.watch_with_history(&my_key).await?;
while let Some(entry) = watch.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "upgrade: marker watch error");
continue;
}
};
if entry.operation != Operation::Put {
continue;
}
let marker: AgentUpgradeMarker = match serde_json::from_slice(&entry.value) {
Ok(m) => m,
Err(e) => {
tracing::warn!(error = %e, "upgrade: bad marker payload");
continue;
}
};
if marker.desired_version == current_version {
tracing::debug!(version = %current_version, "upgrade: already at desired version");
continue;
}
tracing::info!(
from = %current_version,
to = %marker.desired_version,
"upgrade: marker requests new version"
);
let executor = SystemdUpgradeExecutor::new(current_version.clone());
let sink = KvStatusSink {
bucket: status_bucket.clone(),
device_id: device_id.clone(),
current_version: current_version.clone(),
target: marker.desired_version.clone(),
};
let healthy =
wait_new_version_healthy(&heartbeat_bucket, &device_id, &marker.desired_version);
// Arm the stop signal for the duration of this attempt; the command
// server only forwards an operator stop while armed.
operator_stop.arm();
let stop = {
let signal = operator_stop.clone();
async move { signal.notified().await }
};
let action = drive(&marker, &executor, &sink, healthy, stop, HEARTBEAT_TIMEOUT).await;
operator_stop.disarm();
match action {
UpgradeAction::Exit => {
tracing::info!(
"upgrade: cutover committed by operator; exiting for new version"
);
return Ok(());
}
UpgradeAction::StayedOnCurrent => {
tracing::warn!("upgrade: stayed on current version; awaiting a new marker");
}
}
}
tracing::warn!("upgrade: marker watch ended; re-establishing");
tokio::time::sleep(Duration::from_secs(5)).await;
}
}
/// Resolve once a heartbeat from `target` version is observed on this device's
/// heartbeat key (old and new both write it; we wait for the new one).
async fn wait_new_version_healthy(bucket: &jetstream::kv::Store, device_id: &Id, target: &str) {
let key = device_heartbeat_key(&device_id.to_string());
let mut watch = match bucket.watch_with_history(&key).await {
Ok(w) => w,
// If we can't watch, never report healthy — the drive's timeout reverts.
Err(e) => {
tracing::warn!(error = %e, "upgrade: heartbeat watch failed");
return std::future::pending::<()>().await;
}
};
while let Some(entry) = watch.next().await {
if let Ok(e) = entry {
if e.operation == Operation::Put {
if let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&e.value) {
if hb.agent_version == target {
return;
}
}
}
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
/// Records the phase sequence and lets a test fail any step.
#[derive(Default)]
struct FakeExecutor {
fail_stage: bool,
fail_self_test: bool,
fail_cutover: bool,
reverted: AtomicUsize,
cutover_calls: AtomicUsize,
}
impl UpgradeExecutor for FakeExecutor {
async fn stage(&self, _m: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError> {
if self.fail_stage {
return Err(UpgradeError::Url("bad".into()));
}
Ok(PathBuf::from("/usr/bin/fleet-agent-v9.9.9"))
}
async fn self_test(&self, _b: &Path) -> Result<(), UpgradeError> {
if self.fail_self_test {
return Err(UpgradeError::SelfTest("rc 1".into()));
}
Ok(())
}
async fn cutover(&self, _b: &Path, _v: &str) -> Result<(), UpgradeError> {
self.cutover_calls.fetch_add(1, Ordering::SeqCst);
if self.fail_cutover {
return Err(UpgradeError::Systemd("nope".into()));
}
Ok(())
}
async fn revert(&self) -> Result<(), UpgradeError> {
self.reverted.fetch_add(1, Ordering::SeqCst);
Ok(())
}
}
#[derive(Default)]
struct RecordingSink(Mutex<Vec<AgentUpgradePhase>>);
impl StatusSink for RecordingSink {
async fn publish(&self, phase: AgentUpgradePhase, _e: Option<String>) {
self.0.lock().unwrap().push(phase);
}
}
impl RecordingSink {
fn phases(&self) -> Vec<AgentUpgradePhase> {
self.0.lock().unwrap().clone()
}
}
fn marker() -> AgentUpgradeMarker {
AgentUpgradeMarker {
desired_version: "9.9.9".into(),
url: "https://example/agent".into(),
sha256: "deadbeef".into(),
}
}
use std::future::{Future, pending, ready};
fn never() -> impl Future<Output = ()> {
pending::<()>()
}
#[tokio::test]
async fn happy_path_cutover_then_stop_exits() {
let exec = FakeExecutor::default();
let sink = RecordingSink::default();
// Operator stop fires immediately (it saw the new heartbeat).
let action = drive(
&marker(),
&exec,
&sink,
never(),
ready(()),
HEARTBEAT_TIMEOUT,
)
.await;
assert_eq!(action, UpgradeAction::Exit);
assert_eq!(exec.reverted.load(Ordering::SeqCst), 0);
assert_eq!(sink.phases(), vec![Staging, Verifying, CutoverReady, Done]);
}
#[tokio::test]
async fn healthy_then_stop_exits_without_revert() {
let exec = FakeExecutor::default();
let sink = RecordingSink::default();
// Healthy observed first; then (still) operator stop → exit, no revert.
let action = drive(
&marker(),
&exec,
&sink,
ready(()),
ready(()),
HEARTBEAT_TIMEOUT,
)
.await;
assert_eq!(action, UpgradeAction::Exit);
assert_eq!(exec.reverted.load(Ordering::SeqCst), 0);
}
#[tokio::test]
async fn stage_failure_stays_and_does_not_cutover() {
let exec = FakeExecutor {
fail_stage: true,
..Default::default()
};
let sink = RecordingSink::default();
let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await;
assert_eq!(action, UpgradeAction::StayedOnCurrent);
assert_eq!(exec.cutover_calls.load(Ordering::SeqCst), 0);
assert_eq!(sink.phases(), vec![Staging, Failed]);
}
#[tokio::test]
async fn self_test_failure_stays_and_does_not_cutover() {
let exec = FakeExecutor {
fail_self_test: true,
..Default::default()
};
let sink = RecordingSink::default();
let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await;
assert_eq!(action, UpgradeAction::StayedOnCurrent);
assert_eq!(exec.cutover_calls.load(Ordering::SeqCst), 0);
assert_eq!(sink.phases(), vec![Staging, Verifying, Failed]);
}
#[tokio::test]
async fn cutover_failure_reverts_and_stays() {
let exec = FakeExecutor {
fail_cutover: true,
..Default::default()
};
let sink = RecordingSink::default();
let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await;
assert_eq!(action, UpgradeAction::StayedOnCurrent);
assert_eq!(exec.reverted.load(Ordering::SeqCst), 1);
assert_eq!(
sink.phases(),
vec![Staging, Verifying, CutoverReady, Failed]
);
}
#[tokio::test(start_paused = true)]
async fn heartbeat_timeout_reverts_and_stays() {
let exec = FakeExecutor::default();
let sink = RecordingSink::default();
// Never healthy, never stopped → the timeout fires → revert.
let action = drive(
&marker(),
&exec,
&sink,
never(),
never(),
Duration::from_secs(60),
)
.await;
assert_eq!(action, UpgradeAction::StayedOnCurrent);
assert_eq!(exec.reverted.load(Ordering::SeqCst), 1);
assert_eq!(
sink.phases(),
vec![Staging, Verifying, CutoverReady, Failed]
);
}
}

View File

@@ -102,6 +102,24 @@ impl FleetCommandsClient {
};
Ok(serde_json::from_slice(&resp.payload)?)
}
/// Send `Verb::UpgradeStop` at upgrade cutover (ADR-022). The operator is the
/// single source of truth for the stop — it calls this only after observing
/// the new version's heartbeat. The old agent acks, then exits; the new
/// (already running) version takes over.
pub async fn upgrade_stop(&self, device_id: &str, reason: &str) -> Result<(), CommandError> {
let subject = device_command_subject(device_id, Verb::UpgradeStop);
let body = serde_json::to_vec(&CommandRequest::UpgradeStop {
reason: reason.to_string(),
})
.expect("CommandRequest serializes");
let fut = self.nc.request(subject, body.into());
match tokio::time::timeout(self.timeout, fut).await {
Ok(Ok(_)) => Ok(()),
Ok(Err(err)) => Err(map_request_error(err, self.timeout)),
Err(_) => Err(CommandError::Timeout(self.timeout)),
}
}
}
/// Map an async-nats `RequestError` to our typed surface. The `kind`

View File

@@ -17,3 +17,4 @@ pub mod device_reconciler;
pub mod device_status;
pub mod fleet_aggregator;
pub mod liveness;
pub mod upgrade_coordinator;

View File

@@ -5,7 +5,9 @@ mod frontend;
#[cfg(feature = "web-frontend")]
mod service;
use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator};
use harmony_fleet_operator::{
device_reconciler, device_status, fleet_aggregator, upgrade_coordinator,
};
use anyhow::{Context, Result};
use async_nats::jetstream;
@@ -258,10 +260,15 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
let dr_js = js.clone();
let ds_client = client.clone();
let ds_js = js.clone();
// upgrade_coordinator — agent-upgrade status/heartbeat → cutover stop + CR
let uc_client = client.clone();
let uc_js = js.clone();
let uc_commands = harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone());
tokio::select! {
r = controller::run(ctl_client, desired_state_kv) => r,
r = device_reconciler::run(dr_client, dr_js) => r,
r = device_status::run(ds_client, ds_js) => r,
r = upgrade_coordinator::run(uc_client, uc_js, uc_commands) => r,
r = fleet_aggregator::run(client, js, liveness) => r,
}
}

View File

@@ -592,6 +592,8 @@ mod tests {
DeviceLiveness {
last_heartbeat: None,
reachability: r,
current_version: None,
upgrade: None,
}
}

View File

@@ -0,0 +1,238 @@
//! Operator-side agent-upgrade coordinator (ADR-022).
//!
//! The operator is the single source of truth for the cutover stop. This loop:
//! 1. watches the agent **upgrade-status** KV (phase per device) and the
//! **heartbeat** KV (running version per device);
//! 2. when a device is `CutoverReady` **and** the operator has independently
//! observed a heartbeat from the new version, sends `Verb::UpgradeStop` to
//! the old agent — never before;
//! 3. reflects the current version + upgrade phase onto the `Device` CR.
//!
//! The agent never self-stops; only this operator-observed handoff advances the
//! upgrade. See [`should_send_stop`] for the load-bearing decision.
use std::collections::{HashMap, HashSet};
use anyhow::Result;
use async_nats::jetstream::kv::Operation;
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
AgentUpgradePhase, AgentUpgradeStatus, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT,
HeartbeatPayload,
};
use kube::Client;
use kube::api::{Api, Patch, PatchParams};
use serde_json::json;
use tokio::sync::Mutex;
use harmony::modules::fleet::operator::{Device, DeviceUpgradeStatus};
use crate::commands::FleetCommandsClient;
/// The handoff decision: commit the stop only when the agent is `CutoverReady`
/// **and** the operator has seen a heartbeat from the target version. Pure so
/// it can be unit-tested without NATS.
pub fn should_send_stop(status: &AgentUpgradeStatus, observed_version: Option<&str>) -> bool {
if status.phase != AgentUpgradePhase::CutoverReady {
return false;
}
match (&status.target_version, observed_version) {
(Some(target), Some(seen)) => seen == target,
_ => false,
}
}
#[derive(Default)]
struct Coordinator {
/// Latest upgrade status per device.
statuses: HashMap<String, AgentUpgradeStatus>,
/// Latest heartbeat version per device.
heartbeat_versions: HashMap<String, String>,
/// Devices we've already sent the stop to (don't resend every tick).
stopped: HashSet<String>,
}
pub async fn run(
kube: Client,
js: async_nats::jetstream::Context,
commands: FleetCommandsClient,
) -> Result<()> {
let status_bucket = js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(),
..Default::default()
})
.await?;
let heartbeat_bucket = js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
..Default::default()
})
.await?;
let state = Mutex::new(Coordinator::default());
let devices: Api<Device> = Api::all(kube);
let status_watch = async {
let mut w = status_bucket.watch_with_history(">").await?;
while let Some(entry) = w.next().await {
let Ok(entry) = entry else { continue };
if entry.operation != Operation::Put {
continue;
}
let Ok(status) = serde_json::from_slice::<AgentUpgradeStatus>(&entry.value) else {
continue;
};
let device = status.device_id.to_string();
{
let mut g = state.lock().await;
// A fresh non-terminal phase means a new attempt — allow a stop again.
if status.phase != AgentUpgradePhase::CutoverReady {
g.stopped.remove(&device);
}
g.statuses.insert(device.clone(), status);
}
reconcile_device(&state, &devices, &commands, &device).await;
}
Ok::<(), anyhow::Error>(())
};
let heartbeat_watch = async {
let mut w = heartbeat_bucket.watch_with_history(">").await?;
while let Some(entry) = w.next().await {
let Ok(entry) = entry else { continue };
if entry.operation != Operation::Put {
continue;
}
let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&entry.value) else {
continue;
};
if hb.agent_version.is_empty() {
continue;
}
let device = hb.device_id.to_string();
state
.lock()
.await
.heartbeat_versions
.insert(device.clone(), hb.agent_version);
reconcile_device(&state, &devices, &commands, &device).await;
}
Ok::<(), anyhow::Error>(())
};
tokio::try_join!(status_watch, heartbeat_watch)?;
Ok(())
}
/// For one device: decide whether to commit the stop, then reflect version +
/// upgrade phase onto its CR.
async fn reconcile_device(
state: &Mutex<Coordinator>,
devices: &Api<Device>,
commands: &FleetCommandsClient,
device: &str,
) {
// Snapshot the decision inputs under the lock.
let (send_stop, status, version) = {
let g = state.lock().await;
let status = g.statuses.get(device).cloned();
let version = g.heartbeat_versions.get(device).cloned();
let send_stop = status
.as_ref()
.map(|s| should_send_stop(s, version.as_deref()) && !g.stopped.contains(device))
.unwrap_or(false);
(send_stop, status, version)
};
if send_stop {
// Mark before sending so a concurrent reconcile doesn't double-send.
state.lock().await.stopped.insert(device.to_string());
tracing::info!(device, "upgrade: new version healthy; sending cutover stop");
if let Err(e) = commands
.upgrade_stop(device, "upgrade cutover committed")
.await
{
tracing::warn!(device, error = %e, "upgrade: stop send failed; will retry on next event");
state.lock().await.stopped.remove(device);
}
}
patch_device(devices, device, status.as_ref(), version.as_deref()).await;
}
async fn patch_device(
devices: &Api<Device>,
device: &str,
status: Option<&AgentUpgradeStatus>,
version: Option<&str>,
) {
// Surface the upgrade only while it's in flight (drop it on Done/Running).
let upgrade = status.and_then(|s| match s.phase {
AgentUpgradePhase::Done | AgentUpgradePhase::Running => None,
phase => Some(DeviceUpgradeStatus {
phase,
target_version: s.target_version.clone(),
last_error: s.last_error.clone(),
}),
});
let patch = json!({
"status": {
"currentVersion": version,
"upgrade": upgrade,
}
});
if let Err(e) = devices
.patch_status(device, &PatchParams::default(), &Patch::Merge(&patch))
.await
{
// A heartbeat/status can outrace the Device CR's creation; retry later.
if !matches!(&e, kube::Error::Api(ae) if ae.code == 404) {
tracing::warn!(device, error = %e, "upgrade: device status patch failed");
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use harmony_reconciler_contracts::Id;
fn status(phase: AgentUpgradePhase, target: Option<&str>) -> AgentUpgradeStatus {
AgentUpgradeStatus {
device_id: Id::from("dev-1"),
phase,
current_version: "0.1.0".into(),
target_version: target.map(String::from),
last_error: None,
at: Utc::now(),
}
}
#[test]
fn commits_stop_only_when_cutover_ready_and_new_version_seen() {
let s = status(AgentUpgradePhase::CutoverReady, Some("0.2.0"));
assert!(should_send_stop(&s, Some("0.2.0")));
// New version not yet observed (still old) → hold.
assert!(!should_send_stop(&s, Some("0.1.0")));
// No heartbeat seen → hold.
assert!(!should_send_stop(&s, None));
}
#[test]
fn never_commits_before_cutover_ready() {
for phase in [
AgentUpgradePhase::Staging,
AgentUpgradePhase::Verifying,
AgentUpgradePhase::Failed,
AgentUpgradePhase::Running,
] {
let s = status(phase, Some("0.2.0"));
assert!(
!should_send_stop(&s, Some("0.2.0")),
"{phase:?} must not commit"
);
}
}
}

View File

@@ -45,6 +45,12 @@ pub enum Verb {
/// Run a shell command on the device and return its captured
/// output. Single-shot (no streaming); the agent runs `sh -c`.
Exec,
/// Operator → old agent: gracefully exit during an upgrade cutover
/// (ADR-022). The operator is the single source of truth for the stop —
/// the agent never self-stops. Reusing the command protocol means no new
/// subject or callout-permission change.
#[serde(rename = "upgrade-stop")]
UpgradeStop,
}
impl Verb {
@@ -55,6 +61,7 @@ impl Verb {
match self {
Verb::Ping => "ping",
Verb::Exec => "exec",
Verb::UpgradeStop => "upgrade-stop",
}
}
}
@@ -77,7 +84,14 @@ pub fn device_command_subscription(device_id: &str) -> String {
#[serde(tag = "verb", rename_all = "lowercase")]
pub enum CommandRequest {
Ping,
Exec { command: String },
Exec {
command: String,
},
/// Stop the old agent at upgrade cutover. `reason` is logged.
#[serde(rename = "upgrade-stop")]
UpgradeStop {
reason: String,
},
}
/// JSON body of a `Verb::Ping` reply.

View File

@@ -139,6 +139,11 @@ pub struct DeploymentState {
pub struct HeartbeatPayload {
pub device_id: Id,
pub at: DateTime<Utc>,
/// Agent crate version the device is running (ADR-022). `#[serde(default)]`
/// so a pre-upgrade agent's heartbeat (no field) decodes as `""` =
/// "unknown" rather than failing — operators in the field must tolerate it.
#[serde(default)]
pub agent_version: String,
}
#[cfg(test)]
@@ -239,10 +244,11 @@ mod tests {
let hb = HeartbeatPayload {
device_id: Id::from("pi-01".to_string()),
at: ts("2026-04-22T10:00:30Z"),
agent_version: "0.1.0".to_string(),
};
let bytes = serde_json::to_vec(&hb).unwrap();
assert!(
bytes.len() < 96,
bytes.len() < 128,
"heartbeat payload grew to {} bytes: {}",
bytes.len(),
String::from_utf8_lossy(&bytes),

View File

@@ -32,6 +32,20 @@ pub const BUCKET_DEVICE_STATE: &str = "device-state";
/// the state bucket. Key format: `heartbeat.<device_id>`.
pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat";
/// Operator → agent upgrade marker (ADR-022): desired agent version + signed
/// binary URL + SHA-256, one per device. Key format: `agent-upgrade.<device_id>`.
pub const BUCKET_AGENT_UPGRADE: &str = "agent-upgrade";
/// Agent → operator upgrade status: the live phase of the upgrade state
/// machine. Key format: `agent-upgrade.<device_id>`.
pub const BUCKET_AGENT_UPGRADE_STATUS: &str = "agent-upgrade-status";
/// KV key for a device's upgrade marker / status. Format:
/// `agent-upgrade.<device_id>` (shared shape across both upgrade buckets).
pub fn agent_upgrade_key(device_id: &str) -> String {
format!("agent-upgrade.{device_id}")
}
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
/// Format: `<device>.<deployment>`.
pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String {

View File

@@ -20,6 +20,7 @@ pub mod commands;
pub mod fleet;
pub mod kv;
pub mod status;
pub mod upgrade;
pub use commands::{
CommandRequest, ErrorKind, ErrorReply, ExecReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB,
@@ -30,11 +31,13 @@ pub use fleet::{
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,
};
pub use kv::{
BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE,
BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DESIRED_STATE,
BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, agent_upgrade_key,
desired_state_key, desired_state_watch_filter, device_heartbeat_key, device_info_key,
device_state_key,
};
pub use status::{InventorySnapshot, Phase};
pub use upgrade::{AgentUpgradeMarker, AgentUpgradePhase, AgentUpgradeStatus};
// Re-exports so consumers (agent, operator) don't need a direct
// harmony_types dependency purely to name the cross-boundary types.

View File

@@ -0,0 +1,98 @@
//! Agent self-upgrade wire types (ADR-022).
//!
//! The operator publishes an [`AgentUpgradeMarker`] per device (desired version
//! + signed binary URL + SHA-256). The agent runs the upgrade state machine and
//! publishes [`AgentUpgradeStatus`] back so the operator can reflect progress
//! onto the `Device` CR and decide when to send the stop signal. Both ride NATS
//! KV (not a fire-and-forget subject) so they survive an operator restart.
use chrono::{DateTime, Utc};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::Id;
/// Operator → agent: the version this device should run, and where to fetch it.
/// Written to KV key `agent-upgrade.<device_id>` in
/// [`crate::BUCKET_AGENT_UPGRADE`]. The agent acts only when
/// `desired_version` differs from what it's running; **any** change re-triggers
/// the state machine (so re-publishing the same version after a fix retries).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentUpgradeMarker {
/// Target crate version, `MAJOR.MINOR.PATCH` (no `v` prefix, no build
/// metadata — the on-disk path is `/usr/bin/fleet-agent-v<version>`).
pub desired_version: String,
/// HTTPS URL of the versioned binary (Gitea/Harbor release asset, CDN, …).
pub url: String,
/// Lowercase hex SHA-256 of the binary. Verified before the binary is
/// staged; a mismatch fails the upgrade and the agent stays on its version.
pub sha256: String,
}
/// Phases of the agent upgrade state machine (ADR-022). `Running` is steady
/// state; the rest are the upgrade in flight. `Failed` and `Done` are terminal
/// for a given marker.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub enum AgentUpgradePhase {
/// Normal reconcile loop; no upgrade in flight.
Running,
/// Refusing to start new services; in-flight reconciles drain.
Draining,
/// Fetching + verifying the new binary onto disk.
Staging,
/// Running the staged binary's `--self-test` before any swap.
Verifying,
/// Symlink swapped, new version started alongside; awaiting the operator's
/// stop signal (the operator only sends it after seeing the new version's
/// heartbeat — the agent never self-stops).
CutoverReady,
/// The upgrade did not complete; `last_error` says why. The agent stayed on
/// (or reverted to) its previous version.
Failed,
/// The new version is the active one; this marker is satisfied.
Done,
}
/// Agent → operator: current upgrade phase + the versions involved. Written to
/// KV key `agent-upgrade.<device_id>` in [`crate::BUCKET_AGENT_UPGRADE_STATUS`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentUpgradeStatus {
pub device_id: Id,
pub phase: AgentUpgradePhase,
/// Version the agent is currently running.
pub current_version: String,
/// Version the agent is upgrading toward (the marker's `desired_version`).
/// `None` when phase is `Running` with no marker.
#[serde(default)]
pub target_version: Option<String>,
#[serde(default)]
pub last_error: Option<String>,
pub at: DateTime<Utc>,
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn phase_wire_form_is_stable() {
// The serialized form lands in NATS KV + the Device CR; the operator
// and dashboard key off it. Pin it.
let s = |p: AgentUpgradePhase| serde_json::to_value(p).unwrap();
assert_eq!(s(AgentUpgradePhase::CutoverReady), "cutover-ready");
assert_eq!(s(AgentUpgradePhase::Running), "running");
assert_eq!(s(AgentUpgradePhase::Failed), "failed");
}
#[test]
fn marker_roundtrips() {
let m = AgentUpgradeMarker {
desired_version: "0.2.0".into(),
url: "https://git.nationtech.io/fleet/agent/v0.2.0".into(),
sha256: "abc123".into(),
};
let j = serde_json::to_string(&m).unwrap();
assert_eq!(serde_json::from_str::<AgentUpgradeMarker>(&j).unwrap(), m);
}
}

View File

@@ -1,4 +1,4 @@
use harmony_reconciler_contracts::InventorySnapshot;
use harmony_reconciler_contracts::{AgentUpgradePhase, InventorySnapshot};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::CustomResource;
use schemars::JsonSchema;
@@ -118,6 +118,28 @@ pub struct DeviceStatus {
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat: Option<String>,
pub reachability: Reachability,
/// Agent version the device is currently running, from its heartbeat
/// (ADR-022). `None` for a pre-upgrade agent that doesn't report it.
#[serde(skip_serializing_if = "Option::is_none")]
pub current_version: Option<String>,
/// In-flight agent upgrade, reflected from the agent's upgrade status.
/// `None` when no upgrade is active.
#[serde(skip_serializing_if = "Option::is_none")]
pub upgrade: Option<DeviceUpgradeStatus>,
}
/// Reflection of the agent's upgrade state machine onto the CR, so
/// `kubectl get device -o yaml` and the dashboard see upgrade progress.
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DeviceUpgradeStatus {
/// Live phase of the agent's upgrade state machine. The wire enum itself —
/// not a stringified copy — so the set of values can't drift.
pub phase: AgentUpgradePhase,
#[serde(skip_serializing_if = "Option::is_none")]
pub target_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<String>,
}
/// Coarse liveness derived from heartbeat freshness. Failing/Pending

View File

@@ -13,5 +13,5 @@ pub mod crd;
pub use crd::{
AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device,
DeviceSpec, DeviceStatus, Reachability, Rollout, RolloutStrategy,
DeviceSpec, DeviceStatus, DeviceUpgradeStatus, Reachability, Rollout, RolloutStrategy,
};

View File

@@ -0,0 +1,20 @@
[package]
name = "harmony_downloadable_asset"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
description = "Download a file from a URL with SHA-256 checksum verification (shared by k3d binary install and the fleet agent self-upgrade)"
[dependencies]
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
sha2 = "0.10"
tokio = { workspace = true, features = ["fs", "io-util"] }
futures-util = "0.3"
url.workspace = true
thiserror.workspace = true
tracing.workspace = true
[dev-dependencies]
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
httptest = "0.16"

View File

@@ -0,0 +1,213 @@
//! Download a file from a URL and verify its SHA-256 before trusting it.
//!
//! Shared by k3d's binary install and the fleet agent's self-upgrade
//! (ADR-022): both fetch a versioned binary from a release host and must refuse
//! anything whose hash doesn't match the pin. Re-downloads are skipped when the
//! target already exists with the right checksum.
use std::io::Read;
use std::path::{Path, PathBuf};
use futures_util::StreamExt;
use sha2::{Digest, Sha256};
use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use url::Url;
#[derive(Debug, thiserror::Error)]
pub enum DownloadError {
#[error("failed to create download directory {0}: {1}")]
CreateDir(PathBuf, #[source] std::io::Error),
#[error("request to {0} failed: {1}")]
Request(Url, #[source] reqwest::Error),
#[error("download of {url} failed with status {status}")]
Status {
url: Url,
status: reqwest::StatusCode,
},
#[error("writing {0} failed: {1}")]
Write(PathBuf, #[source] std::io::Error),
#[error("error streaming download body: {0}")]
Stream(#[source] reqwest::Error),
#[error("checksum mismatch for {path}: expected {expected}, got {actual}")]
Checksum {
path: PathBuf,
expected: String,
actual: String,
},
}
/// A file to fetch from `url` into `file_name`, gated on `checksum` (lowercase
/// hex SHA-256).
#[derive(Debug, Clone)]
pub struct DownloadableAsset {
pub url: Url,
pub file_name: String,
pub checksum: String,
}
impl DownloadableAsset {
/// SHA-256 of `file` as lowercase hex, or `None` if it can't be read.
fn sha256_hex(file: &Path) -> Option<String> {
let mut f = std::fs::File::open(file).ok()?;
let mut hasher = Sha256::new();
let mut buf = [0u8; 1024 * 1024];
loop {
match f.read(&mut buf) {
Ok(0) => break,
Ok(n) => hasher.update(&buf[..n]),
Err(e) => {
tracing::warn!(error = %e, "error reading file for checksum");
return None;
}
}
}
Some(format!("{:x}", hasher.finalize()))
}
fn matches_checksum(&self, file: &Path) -> bool {
Self::sha256_hex(file).is_some_and(|h| h == self.checksum)
}
/// Download into `folder/<file_name>`, verifying the checksum. Skips the
/// download when the file already exists with the right checksum. Returns
/// the path on success.
pub async fn download_to_path(&self, folder: PathBuf) -> Result<PathBuf, DownloadError> {
if !folder.exists() {
fs::create_dir_all(&folder)
.await
.map_err(|e| DownloadError::CreateDir(folder.clone(), e))?;
}
let target = folder.join(&self.file_name);
if self.matches_checksum(&target) {
tracing::debug!(
?target,
"already present with correct checksum; skipping download"
);
return Ok(target);
}
tracing::debug!(url = %self.url, "downloading");
let response = reqwest::Client::new()
.get(self.url.clone())
.send()
.await
.map_err(|e| DownloadError::Request(self.url.clone(), e))?;
if !response.status().is_success() {
return Err(DownloadError::Status {
url: self.url.clone(),
status: response.status(),
});
}
let mut file = File::create(&target)
.await
.map_err(|e| DownloadError::Write(target.clone(), e))?;
let mut stream = response.bytes_stream();
while let Some(chunk) = stream.next().await {
let chunk = chunk.map_err(DownloadError::Stream)?;
file.write_all(&chunk)
.await
.map_err(|e| DownloadError::Write(target.clone(), e))?;
}
file.flush()
.await
.map_err(|e| DownloadError::Write(target.clone(), e))?;
drop(file);
match Self::sha256_hex(&target) {
Some(actual) if actual == self.checksum => Ok(target),
actual => Err(DownloadError::Checksum {
path: target,
expected: self.checksum.clone(),
actual: actual.unwrap_or_else(|| "<unreadable>".to_string()),
}),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use httptest::{Expectation, Server, matchers, responders};
const CONTENT: &str = "This is a test file.";
const CONTENT_SHA256: &str = "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
fn tmp() -> PathBuf {
let id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_nanos();
let p = std::env::temp_dir().join(format!("harmony-dl-{id}"));
std::fs::create_dir_all(&p).unwrap();
p
}
#[tokio::test]
async fn downloads_and_verifies() {
let server = Server::run();
server.expect(
Expectation::matching(httptest::matchers::request::method_path("GET", "/f.bin"))
.respond_with(responders::status_code(200).body(CONTENT)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
file_name: "f.bin".to_string(),
checksum: CONTENT_SHA256.to_string(),
};
let path = asset.download_to_path(tmp()).await.unwrap();
assert_eq!(std::fs::read_to_string(path).unwrap(), CONTENT);
}
#[tokio::test]
async fn skips_when_already_present_with_matching_checksum() {
let server = Server::run();
server.expect(
Expectation::matching(matchers::any())
.times(0)
.respond_with(responders::status_code(200).body(CONTENT)),
);
let folder = tmp();
std::fs::write(folder.join("f.bin"), CONTENT).unwrap();
let asset = DownloadableAsset {
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
file_name: "f.bin".to_string(),
checksum: CONTENT_SHA256.to_string(),
};
asset.download_to_path(folder).await.unwrap();
}
#[tokio::test]
async fn rejects_bad_checksum() {
let server = Server::run();
server.expect(
Expectation::matching(matchers::any())
.respond_with(responders::status_code(200).body("not the expected content")),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
file_name: "f.bin".to_string(),
checksum: CONTENT_SHA256.to_string(),
};
let err = asset.download_to_path(tmp()).await.unwrap_err();
assert!(matches!(err, DownloadError::Checksum { .. }));
}
#[tokio::test]
async fn surfaces_http_error() {
let server = Server::run();
server.expect(
Expectation::matching(matchers::any()).respond_with(responders::status_code(404)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
file_name: "f.bin".to_string(),
checksum: CONTENT_SHA256.to_string(),
};
let err = asset.download_to_path(tmp()).await.unwrap_err();
assert!(matches!(err, DownloadError::Status { .. }));
}
}

View File

@@ -13,11 +13,9 @@ octocrab = "0.44.0"
regex = "1.11.1"
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
url.workspace = true
sha2 = "0.10.8"
futures-util = "0.3.31"
kube.workspace = true
harmony_downloadable_asset = { path = "../harmony_downloadable_asset" }
[dev-dependencies]
env_logger = { workspace = true }
httptest = "0.16.3"
pretty_assertions = "1.4.1"

View File

@@ -1,303 +0,0 @@
use futures_util::StreamExt;
use log::{debug, warn};
use sha2::{Digest, Sha256};
use std::io::Read;
use std::path::PathBuf;
use tokio::fs;
use tokio::fs::File;
use tokio::io::AsyncWriteExt;
use url::Url;
const CHECKSUM_FAILED_MSG: &str = "Downloaded file failed checksum verification";
/// Represents an asset that can be downloaded from a URL with checksum verification.
///
/// This struct facilitates secure downloading of files from remote URLs by
/// verifying the integrity of the downloaded content using SHA-256 checksums.
/// It handles downloading the file, saving it to disk, and verifying the checksum matches
/// the expected value.
///
/// # Examples
///
/// ```compile_fail
/// # use url::Url;
/// # use std::path::PathBuf;
///
/// # async fn example() -> Result<(), String> {
/// let asset = DownloadableAsset {
/// url: Url::parse("https://example.com/file.zip").unwrap(),
/// file_name: "file.zip".to_string(),
/// checksum: "a1b2c3d4e5f6...".to_string(),
/// };
///
/// let download_dir = PathBuf::from("/tmp/downloads");
/// let file_path = asset.download_to_path(download_dir).await?;
/// # Ok(())
/// # }
/// ```
#[derive(Debug)]
pub(crate) struct DownloadableAsset {
pub(crate) url: Url,
pub(crate) file_name: String,
pub(crate) checksum: String,
}
impl DownloadableAsset {
fn verify_checksum(&self, file: PathBuf) -> bool {
if !file.exists() {
debug!("File does not exist: {:?}", file);
return false;
}
let mut file = match std::fs::File::open(&file) {
Ok(file) => file,
Err(e) => {
warn!("Failed to open file for checksum verification: {:?}", e);
return false;
}
};
let mut hasher = Sha256::new();
let mut buffer = [0; 1024 * 1024]; // 1MB buffer
loop {
let bytes_read = match file.read(&mut buffer) {
Ok(0) => break,
Ok(n) => n,
Err(e) => {
warn!("Error reading file for checksum: {:?}", e);
return false;
}
};
hasher.update(&buffer[..bytes_read]);
}
let result = hasher.finalize();
let calculated_hash = format!("{:x}", result);
debug!("Expected checksum: {}", self.checksum);
debug!("Calculated checksum: {}", calculated_hash);
calculated_hash == self.checksum
}
/// Downloads the asset to the specified directory, verifying its checksum.
///
/// This function will:
/// 1. Create the target directory if it doesn't exist
/// 2. Check if the file already exists with the correct checksum
/// 3. If not, download the file from the URL
/// 4. Verify the downloaded file's checksum matches the expected value
///
/// # Arguments
///
/// * `folder` - The directory path where the file should be saved
///
/// # Returns
///
/// * `Ok(PathBuf)` - The path to the downloaded file on success
/// * `Err(String)` - A descriptive error message if the download or verification fails
///
/// # Errors
///
/// This function will return an error if:
/// - The network request fails
/// - The server responds with a non-success status code
/// - Writing to disk fails
/// - The checksum verification fails
pub(crate) async fn download_to_path(&self, folder: PathBuf) -> Result<PathBuf, String> {
if !folder.exists() {
fs::create_dir_all(&folder)
.await
.expect("Failed to create download directory");
}
let target_file_path = folder.join(&self.file_name);
debug!("Downloading to path: {:?}", target_file_path);
if self.verify_checksum(target_file_path.clone()) {
debug!("File already exists with correct checksum, skipping download");
return Ok(target_file_path);
}
debug!("Downloading from URL: {}", self.url);
let client = reqwest::Client::new();
let response = client
.get(self.url.clone())
.send()
.await
.map_err(|e| format!("Failed to download file: {e}"))?;
if !response.status().is_success() {
return Err(format!(
"Failed to download file, status: {}",
response.status()
));
}
let mut file = File::create(&target_file_path)
.await
.expect("Failed to create target file");
let mut stream = response.bytes_stream();
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.expect("Error while downloading file");
file.write_all(&chunk)
.await
.expect("Failed to write data to file");
}
file.flush().await.expect("Failed to flush file");
drop(file);
if !self.verify_checksum(target_file_path.clone()) {
return Err(CHECKSUM_FAILED_MSG.to_string());
}
debug!(
"File downloaded and verified successfully: {}",
target_file_path.to_string_lossy()
);
Ok(target_file_path)
}
}
#[cfg(test)]
mod tests {
use super::*;
use httptest::{
matchers::{self, request},
responders, Expectation, Server,
};
const BASE_TEST_PATH: &str = "/tmp/harmony-test-k3d-download";
const TEST_CONTENT: &str = "This is a test file.";
const TEST_CONTENT_HASH: &str =
"f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
fn setup_test() -> (PathBuf, Server) {
let _ = env_logger::builder().try_init();
// Create unique test directory
let test_id = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_millis();
let download_path = format!("{}/test_{}", BASE_TEST_PATH, test_id);
std::fs::create_dir_all(&download_path).unwrap();
(PathBuf::from(download_path), Server::run())
}
#[tokio::test]
async fn test_download_to_path_success() {
let (folder, server) = setup_test();
server.expect(
Expectation::matching(request::method_path("GET", "/test.txt"))
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
file_name: "test.txt".to_string(),
checksum: TEST_CONTENT_HASH.to_string(),
};
let result = asset
.download_to_path(folder.join("success"))
.await
.unwrap();
let downloaded_content = std::fs::read_to_string(result).unwrap();
assert_eq!(downloaded_content, TEST_CONTENT);
}
#[tokio::test]
async fn test_download_to_path_already_exists() {
let (folder, server) = setup_test();
server.expect(
Expectation::matching(matchers::any())
.times(0)
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
file_name: "test.txt".to_string(),
checksum: TEST_CONTENT_HASH.to_string(),
};
let target_file_path = folder.join(&asset.file_name);
std::fs::write(&target_file_path, TEST_CONTENT).unwrap();
let result = asset.download_to_path(folder).await.unwrap();
let content = std::fs::read_to_string(result).unwrap();
assert_eq!(content, TEST_CONTENT);
}
#[tokio::test]
async fn test_download_to_path_server_error() {
let (folder, server) = setup_test();
server.expect(
Expectation::matching(matchers::any()).respond_with(responders::status_code(404)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
file_name: "test.txt".to_string(),
checksum: TEST_CONTENT_HASH.to_string(),
};
let result = asset.download_to_path(folder.join("error")).await;
assert!(result.is_err());
assert!(result.unwrap_err().contains("status: 404"));
}
#[tokio::test]
async fn test_download_to_path_checksum_failure() {
let (folder, server) = setup_test();
let invalid_content = "This is NOT the expected content";
server.expect(
Expectation::matching(matchers::any())
.respond_with(responders::status_code(200).body(invalid_content)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
file_name: "test.txt".to_string(),
checksum: TEST_CONTENT_HASH.to_string(),
};
let join_handle =
tokio::spawn(async move { asset.download_to_path(folder.join("failure")).await });
assert_eq!(
join_handle.await.unwrap().err().unwrap(),
CHECKSUM_FAILED_MSG
);
}
#[tokio::test]
async fn test_download_with_specific_path_matcher() {
let (folder, server) = setup_test();
server.expect(
Expectation::matching(matchers::request::path("/specific/path.txt"))
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
);
let asset = DownloadableAsset {
url: Url::parse(&server.url("/specific/path.txt").to_string()).unwrap(),
file_name: "path.txt".to_string(),
checksum: TEST_CONTENT_HASH.to_string(),
};
let result = asset.download_to_path(folder).await.unwrap();
let downloaded_content = std::fs::read_to_string(result).unwrap();
assert_eq!(downloaded_content, TEST_CONTENT);
}
}

View File

@@ -1,5 +1,4 @@
mod downloadable_asset;
use downloadable_asset::*;
use harmony_downloadable_asset::DownloadableAsset;
use kube::Client;
use log::{debug, info};
@@ -134,7 +133,10 @@ impl K3d {
let release_binary = self.get_binary_for_current_platform(latest_release).await;
debug!("Foudn K3d binary to install : {release_binary:#?}");
release_binary.download_to_path(self.base_dir.clone()).await
release_binary
.download_to_path(self.base_dir.clone())
.await
.map_err(|e| e.to_string())
}
// TODO : Make sure this will only find actual released versions, no prereleases or test