diff --git a/Cargo.lock b/Cargo.lock index ef60028b..041c5bb7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4066,6 +4066,7 @@ dependencies = [ "harmony", "harmony-fleet-auth", "harmony-reconciler-contracts", + "harmony_downloadable_asset", "serde", "serde_json", "thiserror 2.0.18", @@ -4073,6 +4074,7 @@ dependencies = [ "toml", "tracing", "tracing-subscriber", + "url", ] [[package]] @@ -4390,6 +4392,20 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "harmony_downloadable_asset" +version = "0.1.0" +dependencies = [ + "futures-util", + "httptest", + "reqwest 0.12.28", + "sha2 0.10.9", + "thiserror 2.0.18", + "tokio", + "tracing", + "url", +] + [[package]] name = "harmony_execution" version = "0.1.0" @@ -5507,15 +5523,13 @@ version = "0.1.0" dependencies = [ "async-trait", "env_logger", - "futures-util", - "httptest", + "harmony_downloadable_asset", "kube", "log", "octocrab", "pretty_assertions", "regex", "reqwest 0.12.28", - "sha2 0.10.9", "tokio", "url", ] diff --git a/Cargo.toml b/Cargo.toml index 93946321..5b935439 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -5,6 +5,7 @@ members = [ "private_repos/*", "harmony", "harmony_zitadel_auth", + "harmony_downloadable_asset", "harmony_types", "harmony_macros", "harmony_tui", diff --git a/ROADMAP/fleet_platform/ch4-agent-upgrade-status.md b/ROADMAP/fleet_platform/ch4-agent-upgrade-status.md new file mode 100644 index 00000000..5f13581c --- /dev/null +++ b/ROADMAP/fleet_platform/ch4-agent-upgrade-status.md @@ -0,0 +1,48 @@ +# Ch4 — Agent self-upgrade + auto-rollback (ADR-022): status + +Built the full ADR-022 protocol end to end. The state-machine "brain" and the +operator's commit decision are exhaustively unit-tested; the OS side-effects sit +behind a seam so they're faked in tests and real on-device. + +## Shipped + +| Piece | Where | Tested | +|---|---|---| +| Wire types: marker, phase, status, `agent_version` on heartbeat, `Verb::UpgradeStop` | `harmony-reconciler-contracts/src/upgrade.rs`, `kv.rs`, `commands.rs`, `fleet.rs` | unit | +| Shared download+SHA-256 verify (lifted from k3d) | new crate `harmony_downloadable_asset` | unit (httptest) | +| Agent state machine `drive` (Staging→Verifying→CutoverReady→stop/revert) | `harmony-fleet-agent/src/upgrade.rs` | **6 unit tests** incl. timeout-revert, stage/self-test/cutover failure | +| `UpgradeExecutor` seam + real `SystemdUpgradeExecutor` (download, `--self-test`, atomic symlink swap, `systemd-run` transient unit, revert) | same | seam fake-tested; real impl self-heals layout | +| `--self-test` flag | `harmony-fleet-agent/src/main.rs` | — | +| `Verb::UpgradeStop` handling + armed `UpgradeStopSignal` (only the cutover-waiting old agent acts) | `command_server.rs`, `upgrade.rs` | — | +| Operator coordinator: send stop **only after** observing the new version's heartbeat; reflect version + phase to the `Device` CR | `harmony-fleet-operator/src/upgrade_coordinator.rs` | **2 unit tests** on the commit decision | +| `FleetCommandsClient::upgrade_stop` | `commands.rs` | — | +| `Device.status.{currentVersion, upgrade}` | `crd.rs` | — | + +Load-bearing properties from the ADR are intact: old verifies new +(`--self-test`); operator commits the stop (single source of truth, never the +agent); rollback is the same code path (revert symlink + stop transient unit on +self-test failure / heartbeat-timeout); no version is GC'd. + +## Deviations (deliberate) + +- **Marker + status ride NATS KV** (`agent-upgrade` / `agent-upgrade-status`), + not a fire-and-forget subject, so they survive an operator restart — same + ethos as Ch2. The ADR's `device-cmd.*`/`device-state.*.upgrade` subjects map + onto: the existing command protocol (`Verb::UpgradeStop`) and the status KV. +- **First-upgrade rollback without M1.** The real executor `capture_revert_target` + preserves the running binary at its versioned path on first cutover even when + the initial install put a plain file at `/usr/local/bin/fleet-agent`. This + makes M1 a clean-install nicety, not a rollback-correctness prerequisite. + +## Flagged for a supervised run (not done tonight) + +1. **M1 clean install layout** — `FleetDeviceSetupScore` should install to + `/usr/bin/fleet-agent-v` + symlink `/usr/local/bin/fleet-agent` from the + start. Needs a new `agent_version` config field (≈9 construction sites) and a + `FileSource::Symlink` delivery primitive (ansible `state: link`). The executor + self-heal above covers correctness in the meantime. +2. **libvirt vX→vX+1 e2e + corrupt-binary auto-revert** — needs two built agent + binaries, a served URL reachable from the VM, and a KVM run. The VM harness + exists (`harmony-fleet-e2e/src/vm`); the protocol brain is unit-green, so this + is an integration proof to run on real hardware. The corrupt-binary path is + already unit-proven via `stage_failure_*` / `heartbeat_timeout_reverts_*`. diff --git a/examples/fleet_load_test/src/main.rs b/examples/fleet_load_test/src/main.rs index 1cfd09e7..5e4da069 100644 --- a/examples/fleet_load_test/src/main.rs +++ b/examples/fleet_load_test/src/main.rs @@ -527,6 +527,7 @@ async fn simulate_heartbeat_loop( let hb = HeartbeatPayload { device_id: Id::from(device.device_id.clone()), at: Utc::now(), + agent_version: "load-test".to_string(), }; if let Ok(payload) = serde_json::to_vec(&hb) { if bucket.put(&hb_key, payload.into()).await.is_ok() { diff --git a/fleet/harmony-fleet-agent/Cargo.toml b/fleet/harmony-fleet-agent/Cargo.toml index f1ae2d5a..a8a7c544 100644 --- a/fleet/harmony-fleet-agent/Cargo.toml +++ b/fleet/harmony-fleet-agent/Cargo.toml @@ -7,7 +7,9 @@ rust-version = "1.85" [dependencies] harmony-fleet-auth = { path = "../harmony-fleet-auth" } harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +harmony_downloadable_asset = { path = "../../harmony_downloadable_asset" } harmony = { path = "../../harmony", default-features = false, features = ["podman"] } +url = { workspace = true } async-nats = { workspace = true } async-trait = { workspace = true } chrono = { workspace = true } diff --git a/fleet/harmony-fleet-agent/src/command_server.rs b/fleet/harmony-fleet-agent/src/command_server.rs index 0af9dbbe..e28699e1 100644 --- a/fleet/harmony-fleet-agent/src/command_server.rs +++ b/fleet/harmony-fleet-agent/src/command_server.rs @@ -25,6 +25,8 @@ use harmony_reconciler_contracts::{ use serde::Serialize; use thiserror::Error; +use crate::upgrade::UpgradeStopSignal; + /// Hard cap on a single exec's combined output. Keeps the JSON reply /// comfortably under NATS's default 1 MiB message limit. const EXEC_MAX_OUTPUT: usize = 256 * 1024; @@ -38,15 +40,20 @@ pub struct CommandServer { client: Client, agent_version: &'static str, started_at: Instant, + /// Fired when the operator sends `Verb::UpgradeStop` (ADR-022 cutover). The + /// upgrade manager awaits it and then exits the process so the new version + /// takes over. The agent never self-stops on its own observation. + upgrade_stop: Arc, } impl CommandServer { - pub fn new(device_id: Id, client: Client) -> Self { + pub fn new(device_id: Id, client: Client, upgrade_stop: Arc) -> Self { Self { device_id, client, agent_version: env!("CARGO_PKG_VERSION"), started_at: Instant::now(), + upgrade_stop, } } @@ -115,6 +122,29 @@ impl CommandServer { Err(e) => return Err(CommandError::BadRequestBody(e.to_string())), }; self.reply_exec(reply_to, &command).await + } else if verb_token == Verb::UpgradeStop.as_subject_token() { + let reason = match serde_json::from_slice::(&msg.payload) { + Ok(CommandRequest::UpgradeStop { reason }) => reason, + Ok(_) => "upgrade-stop".to_string(), + // Tolerate a bodyless stop — the operator's intent is clear. + Err(_) => "upgrade-stop".to_string(), + }; + tracing::info!(%reason, "received upgrade-stop; signalling cutover exit"); + // Ack first so the operator knows we got it, then fire the signal. + let ack = serde_json::to_vec(&PingReply { + device_id: self.device_id.clone(), + agent_version: self.agent_version.to_string(), + uptime_s: self.started_at.elapsed().as_secs(), + }) + .map_err(CommandError::SerializeReply)?; + self.client + .publish(reply_to, ack.into()) + .await + .map_err(|e| CommandError::PublishReply(e.to_string()))?; + if !self.upgrade_stop.fire() { + tracing::warn!("upgrade-stop received but no upgrade is in cutover; ignoring"); + } + Ok(()) } else { tracing::warn!(verb = %verb_token, "unknown command verb"); Err(CommandError::UnknownVerb(verb_token.to_string())) diff --git a/fleet/harmony-fleet-agent/src/fleet_publisher.rs b/fleet/harmony-fleet-agent/src/fleet_publisher.rs index f0e82d81..3e45e5f0 100644 --- a/fleet/harmony-fleet-agent/src/fleet_publisher.rs +++ b/fleet/harmony-fleet-agent/src/fleet_publisher.rs @@ -97,6 +97,7 @@ impl FleetPublisher { let hb = HeartbeatPayload { device_id: self.device_id.clone(), at: chrono::Utc::now(), + agent_version: env!("CARGO_PKG_VERSION").to_string(), }; let key = device_heartbeat_key(&self.device_id.to_string()); match serde_json::to_vec(&hb) { diff --git a/fleet/harmony-fleet-agent/src/main.rs b/fleet/harmony-fleet-agent/src/main.rs index 183ba6c6..e469b835 100644 --- a/fleet/harmony-fleet-agent/src/main.rs +++ b/fleet/harmony-fleet-agent/src/main.rs @@ -2,6 +2,7 @@ mod command_server; mod config; mod fleet_publisher; mod reconciler; +mod upgrade; use std::sync::Arc; use std::time::Duration; @@ -43,6 +44,24 @@ struct Cli { default_value = "/etc/fleet-agent/config.toml" )] config: std::path::PathBuf, + + /// ADR-022 self-test: load config, connect NATS (validates JWT), print + /// version + "ok", exit 0 — or exit non-zero on any failure. The upgrade + /// state machine runs this on a staged binary before cutover. + #[arg(long)] + self_test: bool, +} + +/// ADR-022 `--self-test`: prove this binary can parse its config and reach NATS +/// with a valid JWT, then exit. No state mutation, no long-lived loops. +async fn self_test(cfg: &AgentConfig) -> Result<()> { + let creds = credential_source_from_config(&cfg.credentials) + .context("self-test: building NATS credential source")?; + connect_nats(cfg, creds) + .await + .context("self-test: NATS connect / JWT validation")?; + println!("fleet-agent v{} self-test ok", env!("CARGO_PKG_VERSION")); + Ok(()) } async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result { @@ -198,6 +217,13 @@ async fn main() -> Result<()> { let cli = Cli::parse(); let cfg = config::load_config(&cli.config)?; + + // ADR-022: the upgrade state machine invokes the staged binary with + // `--self-test` before any symlink swap. Exit code is the verdict. + if cli.self_test { + return self_test(&cfg).await; + } + tracing::info!( device_id = %cfg.agent.device_id, runtime_enabled = cfg.agent.runtime_enabled, @@ -277,7 +303,14 @@ async fn main() -> Result<()> { )) }); - let command_server = Arc::new(CommandServer::new(device_id.clone(), client.clone())); + // Fired by the command server on `Verb::UpgradeStop`; awaited by the + // upgrade manager, which then exits the process for the new version. + let upgrade_stop = Arc::new(upgrade::UpgradeStopSignal::default()); + let command_server = Arc::new(CommandServer::new( + device_id.clone(), + client.clone(), + upgrade_stop.clone(), + )); let ctrlc = async { tokio::signal::ctrl_c().await.ok(); @@ -312,6 +345,16 @@ async fn main() -> Result<()> { }; let heartbeat = publish_heartbeat_loop(fleet); let commands = command_server.run(); + // ADR-022 self-upgrade manager. Returns only when the operator commits a + // cutover (it sent `UpgradeStop`) — that completes the select and the + // process exits 0, so the already-started new version takes over and + // systemd (Restart=on-failure) does not bring the old one back. + let upgrade = upgrade::run( + client.clone(), + device_id.clone(), + env!("CARGO_PKG_VERSION").to_string(), + upgrade_stop.clone(), + ); tokio::select! { // Waiting on ctrlc in a select will automatically terminate other branches when @@ -322,6 +365,7 @@ async fn main() -> Result<()> { _ = reconcile => {} _ = heartbeat => {} r = commands => { r?; } + r = upgrade => { r?; tracing::info!("agent exiting for upgrade cutover"); } } Ok(()) diff --git a/fleet/harmony-fleet-agent/src/upgrade.rs b/fleet/harmony-fleet-agent/src/upgrade.rs new file mode 100644 index 00000000..c3970526 --- /dev/null +++ b/fleet/harmony-fleet-agent/src/upgrade.rs @@ -0,0 +1,663 @@ +//! Agent self-upgrade state machine (ADR-022). +//! +//! The operator publishes an [`AgentUpgradeMarker`] (desired version + signed +//! URL + SHA-256). The agent stages the new binary, verifies it with +//! `--self-test`, atomically swaps the `/usr/local/bin/fleet-agent` symlink, and +//! starts the new version alongside itself. It then **waits for the operator's +//! stop signal** — the agent never self-stops (the operator is the single +//! source of truth, and only commits the stop after it has seen the new +//! version's heartbeat). If the new version never reports healthy within +//! [`HEARTBEAT_TIMEOUT`], the agent reverts the symlink and stays on its +//! current version. Rollback is the same code path as upgrade. +//! +//! The OS side-effects (download, `--self-test`, symlink swap, systemd) sit +//! behind [`UpgradeExecutor`] so [`drive`] — the state machine "brain" — is +//! unit-tested with a fake; [`SystemdUpgradeExecutor`] does the real work. + +use std::path::{Path, PathBuf}; +use std::time::Duration; + +use harmony_reconciler_contracts::AgentUpgradeMarker; +use harmony_reconciler_contracts::AgentUpgradePhase::{ + self, CutoverReady, Done, Failed, Staging, Verifying, +}; + +/// Directory holding the immutable versioned binaries (`fleet-agent-v`). +pub const VERSIONED_BIN_DIR: &str = "/usr/bin"; +/// The stable path the systemd unit's `ExecStart` references. Symlink swap of +/// this path is the cutover primitive (POSIX-atomic rename). +pub const ACTIVE_SYMLINK: &str = "/usr/local/bin/fleet-agent"; +/// How long the old agent waits for the new version's heartbeat after cutover +/// before reverting (ADR-022 suggests 60s). +pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60); + +#[derive(Debug, thiserror::Error)] +pub enum UpgradeError { + #[error("invalid binary URL: {0}")] + Url(String), + #[error(transparent)] + Download(#[from] harmony_downloadable_asset::DownloadError), + #[error("filesystem error: {0}")] + Io(#[from] std::io::Error), + #[error("self-test exited with {0}")] + SelfTest(String), + #[error("systemd error: {0}")] + Systemd(String), + #[error("no previous symlink target recorded to revert to")] + NothingToRevert, +} + +/// The OS-level operations the upgrade needs. Behind a trait so [`drive`] is +/// unit-testable without touching disk or systemd. +pub trait UpgradeExecutor { + /// Download + verify the binary and place it at its versioned path. Returns + /// the staged path. + async fn stage(&self, marker: &AgentUpgradeMarker) -> Result; + /// Run the staged binary's `--self-test`. Err if it doesn't exit 0. + async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError>; + /// Atomically point [`ACTIVE_SYMLINK`] at `binary` (recording the previous + /// target for revert) and start the new version alongside the old. + async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError>; + /// Restore the pre-cutover symlink target and stop the new version. + async fn revert(&self) -> Result<(), UpgradeError>; +} + +/// Sink for upgrade phase transitions (→ operator via KV). +pub trait StatusSink { + async fn publish(&self, phase: AgentUpgradePhase, last_error: Option); +} + +/// What the agent process should do after an upgrade attempt. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum UpgradeAction { + /// Cutover succeeded and the operator signalled stop — exit(0) so the new + /// version (already running) takes over cleanly. + Exit, + /// The upgrade failed or was reverted — stay running on the current version. + StayedOnCurrent, +} + +/// The state machine. Stage → verify → cutover → wait for the operator's stop +/// (with a heartbeat-timeout revert). Pure orchestration over the injected +/// executor, status sink, and post-cutover signal futures. +/// +/// `new_version_healthy` resolves when a heartbeat from the new version is +/// observed; `operator_stop` resolves when the operator sends the stop signal. +pub async fn drive( + marker: &AgentUpgradeMarker, + executor: &E, + status: &S, + new_version_healthy: impl std::future::Future, + operator_stop: impl std::future::Future, + heartbeat_timeout: Duration, +) -> UpgradeAction +where + E: UpgradeExecutor, + S: StatusSink, +{ + status.publish(Staging, None).await; + let staged = match executor.stage(marker).await { + Ok(p) => p, + Err(e) => { + status.publish(Failed, Some(e.to_string())).await; + return UpgradeAction::StayedOnCurrent; + } + }; + + status.publish(Verifying, None).await; + if let Err(e) = executor.self_test(&staged).await { + status + .publish(Failed, Some(format!("self-test failed: {e}"))) + .await; + return UpgradeAction::StayedOnCurrent; + } + + status.publish(CutoverReady, None).await; + if let Err(e) = executor.cutover(&staged, &marker.desired_version).await { + let _ = executor.revert().await; + status + .publish(Failed, Some(format!("cutover failed: {e}"))) + .await; + return UpgradeAction::StayedOnCurrent; + } + + // Post-cutover: race the operator's stop against the new version becoming + // healthy and the timeout. Stop ⇒ exit. Healthy ⇒ wait indefinitely for + // stop (operator-outage-safe — two agents is wasteful but never blind). + // Timeout with no healthy heartbeat ⇒ revert and stay. + tokio::pin!(new_version_healthy); + tokio::pin!(operator_stop); + tokio::select! { + _ = &mut operator_stop => { + status.publish(Done, None).await; + return UpgradeAction::Exit; + } + _ = &mut new_version_healthy => {} + _ = tokio::time::sleep(heartbeat_timeout) => { + let _ = executor.revert().await; + status + .publish(Failed, Some("new version did not report healthy in time; reverted".into())) + .await; + return UpgradeAction::StayedOnCurrent; + } + } + + // Healthy observed — commit is now the operator's; wait for its stop. + operator_stop.await; + status.publish(Done, None).await; + UpgradeAction::Exit +} + +// ───────────────────────── real executor ───────────────────────── + +/// Production [`UpgradeExecutor`]: download+verify, `--self-test`, atomic +/// symlink swap, and a systemd transient unit for the new version. +pub struct SystemdUpgradeExecutor { + /// The version this (old) agent is running — used to preserve its binary at + /// a versioned path so `revert` always has a target, even on a pre-M1 plain + /// install where `ACTIVE_SYMLINK` is a regular file. + current_version: String, + /// Previous symlink target, captured at cutover so `revert` can restore it. + previous_target: tokio::sync::Mutex>, + /// Transient systemd unit name for the started new version, for `revert`. + started_unit: tokio::sync::Mutex>, +} + +impl SystemdUpgradeExecutor { + pub fn new(current_version: String) -> Self { + Self { + current_version, + previous_target: tokio::sync::Mutex::new(None), + started_unit: tokio::sync::Mutex::new(None), + } + } + + fn transient_unit(version: &str) -> String { + format!("fleet-agent-v{version}") + } + + /// The revert target: the existing symlink target if `ACTIVE_SYMLINK` is a + /// symlink, else the running binary preserved at its versioned path (so a + /// plain-file initial install can still be rolled back to). + fn capture_revert_target(&self) -> PathBuf { + if let Ok(target) = std::fs::read_link(ACTIVE_SYMLINK) { + return target; + } + let preserved = + PathBuf::from(VERSIONED_BIN_DIR).join(format!("fleet-agent-v{}", self.current_version)); + if !preserved.exists() { + if let Ok(exe) = std::env::current_exe() { + if let Err(e) = std::fs::copy(&exe, &preserved) { + tracing::warn!(error = %e, "upgrade: could not preserve current binary"); + } + } + } + preserved + } +} + +impl UpgradeExecutor for SystemdUpgradeExecutor { + async fn stage(&self, marker: &AgentUpgradeMarker) -> Result { + use harmony_downloadable_asset::DownloadableAsset; + let asset = DownloadableAsset { + url: url::Url::parse(&marker.url).map_err(|e| UpgradeError::Url(e.to_string()))?, + file_name: format!("fleet-agent-v{}", marker.desired_version), + checksum: marker.sha256.clone(), + }; + let path = asset + .download_to_path(PathBuf::from(VERSIONED_BIN_DIR)) + .await?; + // Make it executable (0755). + let mut perms = std::fs::metadata(&path)?.permissions(); + use std::os::unix::fs::PermissionsExt; + perms.set_mode(0o755); + std::fs::set_permissions(&path, perms)?; + Ok(path) + } + + async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError> { + let status = tokio::process::Command::new(binary) + .arg("--self-test") + .status() + .await + .map_err(|e| UpgradeError::SelfTest(e.to_string()))?; + if status.success() { + Ok(()) + } else { + Err(UpgradeError::SelfTest(format!("exit {status}"))) + } + } + + async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError> { + // Record (and, if needed, preserve) the current binary so revert can + // restore it. + *self.previous_target.lock().await = Some(self.capture_revert_target()); + + // Atomic swap: symlink to a temp name, rename over the active path. + let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.new")); + let _ = std::fs::remove_file(&tmp); + std::os::unix::fs::symlink(binary, &tmp)?; + std::fs::rename(&tmp, ACTIVE_SYMLINK)?; + + // Start the new version alongside us as a transient systemd unit, so the + // operator can observe its heartbeat before committing the stop. + let unit = Self::transient_unit(version); + let out = tokio::process::Command::new("systemd-run") + .args(["--unit", &unit, "--collect"]) + .arg(binary) + .output() + .await + .map_err(|e| UpgradeError::Systemd(e.to_string()))?; + if !out.status.success() { + return Err(UpgradeError::Systemd( + String::from_utf8_lossy(&out.stderr).to_string(), + )); + } + *self.started_unit.lock().await = Some(unit); + Ok(()) + } + + async fn revert(&self) -> Result<(), UpgradeError> { + // Stop the transient unit if we started one. + if let Some(unit) = self.started_unit.lock().await.take() { + let _ = tokio::process::Command::new("systemctl") + .args(["stop", &unit]) + .status() + .await; + } + // Restore the previous symlink target. + let previous = self + .previous_target + .lock() + .await + .clone() + .ok_or(UpgradeError::NothingToRevert)?; + let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.revert")); + let _ = std::fs::remove_file(&tmp); + std::os::unix::fs::symlink(&previous, &tmp)?; + std::fs::rename(&tmp, ACTIVE_SYMLINK)?; + Ok(()) + } +} + +// ───────────────────────── NATS wiring ───────────────────────── + +use std::sync::Arc; + +use async_nats::jetstream::{self, kv::Operation}; +use futures_util::StreamExt; +use harmony_reconciler_contracts::{ + AgentUpgradeStatus, BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT, + HeartbeatPayload, Id, agent_upgrade_key, device_heartbeat_key, +}; +use std::sync::atomic::{AtomicBool, Ordering}; +use tokio::sync::Notify; + +/// The operator's cutover stop signal, shared between the command server (which +/// receives `Verb::UpgradeStop`) and the upgrade manager (which awaits it). +/// +/// Both old and new agents subscribe to the device command subject, so both +/// *receive* the stop — but it's only **armed** on the old agent during its +/// post-cutover wait. Outside that window `fire` is a no-op, so a stray stop +/// (or one meant for a previous upgrade) can't make an agent exit, and no stale +/// permit accumulates to trip the next upgrade. +pub struct UpgradeStopSignal { + notify: Notify, + armed: AtomicBool, +} + +impl Default for UpgradeStopSignal { + fn default() -> Self { + Self { + notify: Notify::new(), + armed: AtomicBool::new(false), + } + } +} + +impl UpgradeStopSignal { + /// Fire the signal if armed. Returns whether it was (for logging). + pub fn fire(&self) -> bool { + if self.armed.load(Ordering::SeqCst) { + self.notify.notify_one(); + true + } else { + false + } + } + async fn notified(&self) { + self.notify.notified().await; + } + fn arm(&self) { + self.armed.store(true, Ordering::SeqCst); + } + fn disarm(&self) { + self.armed.store(false, Ordering::SeqCst); + } +} + +/// Writes [`AgentUpgradeStatus`] to the status KV so the operator can reflect +/// the live phase onto the `Device` CR. +struct KvStatusSink { + bucket: jetstream::kv::Store, + device_id: Id, + current_version: String, + target: String, +} + +impl StatusSink for KvStatusSink { + async fn publish(&self, phase: AgentUpgradePhase, last_error: Option) { + let status = AgentUpgradeStatus { + device_id: self.device_id.clone(), + phase, + current_version: self.current_version.clone(), + target_version: Some(self.target.clone()), + last_error, + at: chrono::Utc::now(), + }; + let key = agent_upgrade_key(&self.device_id.to_string()); + match serde_json::to_vec(&status) { + Ok(payload) => { + if let Err(e) = self.bucket.put(&key, payload.into()).await { + tracing::warn!(error = %e, "upgrade: status publish failed"); + } + } + Err(e) => tracing::warn!(error = %e, "upgrade: status serialize failed"), + } + } +} + +/// Watch the device's upgrade marker; run the state machine when a marker asks +/// for a different version. Returns `Ok(())` only when an upgrade succeeds and +/// the operator signals stop — the caller (main) treats that as "exit so the +/// new version takes over". Failures/reverts keep watching for a fixed marker. +pub async fn run( + client: async_nats::Client, + device_id: Id, + current_version: String, + operator_stop: Arc, +) -> anyhow::Result<()> { + let js = jetstream::new(client); + let marker_bucket = js + .create_key_value(jetstream::kv::Config { + bucket: BUCKET_AGENT_UPGRADE.to_string(), + ..Default::default() + }) + .await?; + let status_bucket = js + .create_key_value(jetstream::kv::Config { + bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(), + ..Default::default() + }) + .await?; + let heartbeat_bucket = js + .create_key_value(jetstream::kv::Config { + bucket: BUCKET_DEVICE_HEARTBEAT.to_string(), + ..Default::default() + }) + .await?; + + let my_key = agent_upgrade_key(&device_id.to_string()); + tracing::info!(version = %current_version, key = %my_key, "upgrade: watching marker"); + // Outer loop re-establishes the watch after a NATS blip — only an + // operator-committed cutover (`UpgradeAction::Exit`) returns from here. + loop { + let mut watch = marker_bucket.watch_with_history(&my_key).await?; + while let Some(entry) = watch.next().await { + let entry = match entry { + Ok(e) => e, + Err(e) => { + tracing::warn!(error = %e, "upgrade: marker watch error"); + continue; + } + }; + if entry.operation != Operation::Put { + continue; + } + let marker: AgentUpgradeMarker = match serde_json::from_slice(&entry.value) { + Ok(m) => m, + Err(e) => { + tracing::warn!(error = %e, "upgrade: bad marker payload"); + continue; + } + }; + if marker.desired_version == current_version { + tracing::debug!(version = %current_version, "upgrade: already at desired version"); + continue; + } + + tracing::info!( + from = %current_version, + to = %marker.desired_version, + "upgrade: marker requests new version" + ); + let executor = SystemdUpgradeExecutor::new(current_version.clone()); + let sink = KvStatusSink { + bucket: status_bucket.clone(), + device_id: device_id.clone(), + current_version: current_version.clone(), + target: marker.desired_version.clone(), + }; + let healthy = + wait_new_version_healthy(&heartbeat_bucket, &device_id, &marker.desired_version); + // Arm the stop signal for the duration of this attempt; the command + // server only forwards an operator stop while armed. + operator_stop.arm(); + let stop = { + let signal = operator_stop.clone(); + async move { signal.notified().await } + }; + let action = drive(&marker, &executor, &sink, healthy, stop, HEARTBEAT_TIMEOUT).await; + operator_stop.disarm(); + match action { + UpgradeAction::Exit => { + tracing::info!( + "upgrade: cutover committed by operator; exiting for new version" + ); + return Ok(()); + } + UpgradeAction::StayedOnCurrent => { + tracing::warn!("upgrade: stayed on current version; awaiting a new marker"); + } + } + } + tracing::warn!("upgrade: marker watch ended; re-establishing"); + tokio::time::sleep(Duration::from_secs(5)).await; + } +} + +/// Resolve once a heartbeat from `target` version is observed on this device's +/// heartbeat key (old and new both write it; we wait for the new one). +async fn wait_new_version_healthy(bucket: &jetstream::kv::Store, device_id: &Id, target: &str) { + let key = device_heartbeat_key(&device_id.to_string()); + let mut watch = match bucket.watch_with_history(&key).await { + Ok(w) => w, + // If we can't watch, never report healthy — the drive's timeout reverts. + Err(e) => { + tracing::warn!(error = %e, "upgrade: heartbeat watch failed"); + return std::future::pending::<()>().await; + } + }; + while let Some(entry) = watch.next().await { + if let Ok(e) = entry { + if e.operation == Operation::Put { + if let Ok(hb) = serde_json::from_slice::(&e.value) { + if hb.agent_version == target { + return; + } + } + } + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::Mutex; + use std::sync::atomic::{AtomicUsize, Ordering}; + + /// Records the phase sequence and lets a test fail any step. + #[derive(Default)] + struct FakeExecutor { + fail_stage: bool, + fail_self_test: bool, + fail_cutover: bool, + reverted: AtomicUsize, + cutover_calls: AtomicUsize, + } + + impl UpgradeExecutor for FakeExecutor { + async fn stage(&self, _m: &AgentUpgradeMarker) -> Result { + if self.fail_stage { + return Err(UpgradeError::Url("bad".into())); + } + Ok(PathBuf::from("/usr/bin/fleet-agent-v9.9.9")) + } + async fn self_test(&self, _b: &Path) -> Result<(), UpgradeError> { + if self.fail_self_test { + return Err(UpgradeError::SelfTest("rc 1".into())); + } + Ok(()) + } + async fn cutover(&self, _b: &Path, _v: &str) -> Result<(), UpgradeError> { + self.cutover_calls.fetch_add(1, Ordering::SeqCst); + if self.fail_cutover { + return Err(UpgradeError::Systemd("nope".into())); + } + Ok(()) + } + async fn revert(&self) -> Result<(), UpgradeError> { + self.reverted.fetch_add(1, Ordering::SeqCst); + Ok(()) + } + } + + #[derive(Default)] + struct RecordingSink(Mutex>); + impl StatusSink for RecordingSink { + async fn publish(&self, phase: AgentUpgradePhase, _e: Option) { + self.0.lock().unwrap().push(phase); + } + } + impl RecordingSink { + fn phases(&self) -> Vec { + self.0.lock().unwrap().clone() + } + } + + fn marker() -> AgentUpgradeMarker { + AgentUpgradeMarker { + desired_version: "9.9.9".into(), + url: "https://example/agent".into(), + sha256: "deadbeef".into(), + } + } + + use std::future::{Future, pending, ready}; + + fn never() -> impl Future { + pending::<()>() + } + + #[tokio::test] + async fn happy_path_cutover_then_stop_exits() { + let exec = FakeExecutor::default(); + let sink = RecordingSink::default(); + // Operator stop fires immediately (it saw the new heartbeat). + let action = drive( + &marker(), + &exec, + &sink, + never(), + ready(()), + HEARTBEAT_TIMEOUT, + ) + .await; + assert_eq!(action, UpgradeAction::Exit); + assert_eq!(exec.reverted.load(Ordering::SeqCst), 0); + assert_eq!(sink.phases(), vec![Staging, Verifying, CutoverReady, Done]); + } + + #[tokio::test] + async fn healthy_then_stop_exits_without_revert() { + let exec = FakeExecutor::default(); + let sink = RecordingSink::default(); + // Healthy observed first; then (still) operator stop → exit, no revert. + let action = drive( + &marker(), + &exec, + &sink, + ready(()), + ready(()), + HEARTBEAT_TIMEOUT, + ) + .await; + assert_eq!(action, UpgradeAction::Exit); + assert_eq!(exec.reverted.load(Ordering::SeqCst), 0); + } + + #[tokio::test] + async fn stage_failure_stays_and_does_not_cutover() { + let exec = FakeExecutor { + fail_stage: true, + ..Default::default() + }; + let sink = RecordingSink::default(); + let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await; + assert_eq!(action, UpgradeAction::StayedOnCurrent); + assert_eq!(exec.cutover_calls.load(Ordering::SeqCst), 0); + assert_eq!(sink.phases(), vec![Staging, Failed]); + } + + #[tokio::test] + async fn self_test_failure_stays_and_does_not_cutover() { + let exec = FakeExecutor { + fail_self_test: true, + ..Default::default() + }; + let sink = RecordingSink::default(); + let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await; + assert_eq!(action, UpgradeAction::StayedOnCurrent); + assert_eq!(exec.cutover_calls.load(Ordering::SeqCst), 0); + assert_eq!(sink.phases(), vec![Staging, Verifying, Failed]); + } + + #[tokio::test] + async fn cutover_failure_reverts_and_stays() { + let exec = FakeExecutor { + fail_cutover: true, + ..Default::default() + }; + let sink = RecordingSink::default(); + let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await; + assert_eq!(action, UpgradeAction::StayedOnCurrent); + assert_eq!(exec.reverted.load(Ordering::SeqCst), 1); + assert_eq!( + sink.phases(), + vec![Staging, Verifying, CutoverReady, Failed] + ); + } + + #[tokio::test(start_paused = true)] + async fn heartbeat_timeout_reverts_and_stays() { + let exec = FakeExecutor::default(); + let sink = RecordingSink::default(); + // Never healthy, never stopped → the timeout fires → revert. + let action = drive( + &marker(), + &exec, + &sink, + never(), + never(), + Duration::from_secs(60), + ) + .await; + assert_eq!(action, UpgradeAction::StayedOnCurrent); + assert_eq!(exec.reverted.load(Ordering::SeqCst), 1); + assert_eq!( + sink.phases(), + vec![Staging, Verifying, CutoverReady, Failed] + ); + } +} diff --git a/fleet/harmony-fleet-operator/src/commands.rs b/fleet/harmony-fleet-operator/src/commands.rs index 2ce476ef..a0d6611e 100644 --- a/fleet/harmony-fleet-operator/src/commands.rs +++ b/fleet/harmony-fleet-operator/src/commands.rs @@ -102,6 +102,24 @@ impl FleetCommandsClient { }; Ok(serde_json::from_slice(&resp.payload)?) } + + /// Send `Verb::UpgradeStop` at upgrade cutover (ADR-022). The operator is the + /// single source of truth for the stop — it calls this only after observing + /// the new version's heartbeat. The old agent acks, then exits; the new + /// (already running) version takes over. + pub async fn upgrade_stop(&self, device_id: &str, reason: &str) -> Result<(), CommandError> { + let subject = device_command_subject(device_id, Verb::UpgradeStop); + let body = serde_json::to_vec(&CommandRequest::UpgradeStop { + reason: reason.to_string(), + }) + .expect("CommandRequest serializes"); + let fut = self.nc.request(subject, body.into()); + match tokio::time::timeout(self.timeout, fut).await { + Ok(Ok(_)) => Ok(()), + Ok(Err(err)) => Err(map_request_error(err, self.timeout)), + Err(_) => Err(CommandError::Timeout(self.timeout)), + } + } } /// Map an async-nats `RequestError` to our typed surface. The `kind` diff --git a/fleet/harmony-fleet-operator/src/lib.rs b/fleet/harmony-fleet-operator/src/lib.rs index fd1820fe..51bb09b2 100644 --- a/fleet/harmony-fleet-operator/src/lib.rs +++ b/fleet/harmony-fleet-operator/src/lib.rs @@ -17,3 +17,4 @@ pub mod device_reconciler; pub mod device_status; pub mod fleet_aggregator; pub mod liveness; +pub mod upgrade_coordinator; diff --git a/fleet/harmony-fleet-operator/src/main.rs b/fleet/harmony-fleet-operator/src/main.rs index 66d04c1f..337638e9 100644 --- a/fleet/harmony-fleet-operator/src/main.rs +++ b/fleet/harmony-fleet-operator/src/main.rs @@ -5,7 +5,9 @@ mod frontend; #[cfg(feature = "web-frontend")] mod service; -use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator}; +use harmony_fleet_operator::{ + device_reconciler, device_status, fleet_aggregator, upgrade_coordinator, +}; use anyhow::{Context, Result}; use async_nats::jetstream; @@ -258,10 +260,15 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> let dr_js = js.clone(); let ds_client = client.clone(); let ds_js = js.clone(); + // upgrade_coordinator — agent-upgrade status/heartbeat → cutover stop + CR + let uc_client = client.clone(); + let uc_js = js.clone(); + let uc_commands = harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone()); tokio::select! { r = controller::run(ctl_client, desired_state_kv) => r, r = device_reconciler::run(dr_client, dr_js) => r, r = device_status::run(ds_client, ds_js) => r, + r = upgrade_coordinator::run(uc_client, uc_js, uc_commands) => r, r = fleet_aggregator::run(client, js, liveness) => r, } } diff --git a/fleet/harmony-fleet-operator/src/service/real.rs b/fleet/harmony-fleet-operator/src/service/real.rs index 252d6bed..a4e7276a 100644 --- a/fleet/harmony-fleet-operator/src/service/real.rs +++ b/fleet/harmony-fleet-operator/src/service/real.rs @@ -592,6 +592,8 @@ mod tests { DeviceLiveness { last_heartbeat: None, reachability: r, + current_version: None, + upgrade: None, } } diff --git a/fleet/harmony-fleet-operator/src/upgrade_coordinator.rs b/fleet/harmony-fleet-operator/src/upgrade_coordinator.rs new file mode 100644 index 00000000..e46741a0 --- /dev/null +++ b/fleet/harmony-fleet-operator/src/upgrade_coordinator.rs @@ -0,0 +1,238 @@ +//! Operator-side agent-upgrade coordinator (ADR-022). +//! +//! The operator is the single source of truth for the cutover stop. This loop: +//! 1. watches the agent **upgrade-status** KV (phase per device) and the +//! **heartbeat** KV (running version per device); +//! 2. when a device is `CutoverReady` **and** the operator has independently +//! observed a heartbeat from the new version, sends `Verb::UpgradeStop` to +//! the old agent — never before; +//! 3. reflects the current version + upgrade phase onto the `Device` CR. +//! +//! The agent never self-stops; only this operator-observed handoff advances the +//! upgrade. See [`should_send_stop`] for the load-bearing decision. + +use std::collections::{HashMap, HashSet}; + +use anyhow::Result; +use async_nats::jetstream::kv::Operation; +use futures_util::StreamExt; +use harmony_reconciler_contracts::{ + AgentUpgradePhase, AgentUpgradeStatus, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT, + HeartbeatPayload, +}; +use kube::Client; +use kube::api::{Api, Patch, PatchParams}; +use serde_json::json; +use tokio::sync::Mutex; + +use harmony::modules::fleet::operator::{Device, DeviceUpgradeStatus}; + +use crate::commands::FleetCommandsClient; + +/// The handoff decision: commit the stop only when the agent is `CutoverReady` +/// **and** the operator has seen a heartbeat from the target version. Pure so +/// it can be unit-tested without NATS. +pub fn should_send_stop(status: &AgentUpgradeStatus, observed_version: Option<&str>) -> bool { + if status.phase != AgentUpgradePhase::CutoverReady { + return false; + } + match (&status.target_version, observed_version) { + (Some(target), Some(seen)) => seen == target, + _ => false, + } +} + +#[derive(Default)] +struct Coordinator { + /// Latest upgrade status per device. + statuses: HashMap, + /// Latest heartbeat version per device. + heartbeat_versions: HashMap, + /// Devices we've already sent the stop to (don't resend every tick). + stopped: HashSet, +} + +pub async fn run( + kube: Client, + js: async_nats::jetstream::Context, + commands: FleetCommandsClient, +) -> Result<()> { + let status_bucket = js + .create_key_value(async_nats::jetstream::kv::Config { + bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(), + ..Default::default() + }) + .await?; + let heartbeat_bucket = js + .create_key_value(async_nats::jetstream::kv::Config { + bucket: BUCKET_DEVICE_HEARTBEAT.to_string(), + ..Default::default() + }) + .await?; + + let state = Mutex::new(Coordinator::default()); + let devices: Api = Api::all(kube); + + let status_watch = async { + let mut w = status_bucket.watch_with_history(">").await?; + while let Some(entry) = w.next().await { + let Ok(entry) = entry else { continue }; + if entry.operation != Operation::Put { + continue; + } + let Ok(status) = serde_json::from_slice::(&entry.value) else { + continue; + }; + let device = status.device_id.to_string(); + { + let mut g = state.lock().await; + // A fresh non-terminal phase means a new attempt — allow a stop again. + if status.phase != AgentUpgradePhase::CutoverReady { + g.stopped.remove(&device); + } + g.statuses.insert(device.clone(), status); + } + reconcile_device(&state, &devices, &commands, &device).await; + } + Ok::<(), anyhow::Error>(()) + }; + + let heartbeat_watch = async { + let mut w = heartbeat_bucket.watch_with_history(">").await?; + while let Some(entry) = w.next().await { + let Ok(entry) = entry else { continue }; + if entry.operation != Operation::Put { + continue; + } + let Ok(hb) = serde_json::from_slice::(&entry.value) else { + continue; + }; + if hb.agent_version.is_empty() { + continue; + } + let device = hb.device_id.to_string(); + state + .lock() + .await + .heartbeat_versions + .insert(device.clone(), hb.agent_version); + reconcile_device(&state, &devices, &commands, &device).await; + } + Ok::<(), anyhow::Error>(()) + }; + + tokio::try_join!(status_watch, heartbeat_watch)?; + Ok(()) +} + +/// For one device: decide whether to commit the stop, then reflect version + +/// upgrade phase onto its CR. +async fn reconcile_device( + state: &Mutex, + devices: &Api, + commands: &FleetCommandsClient, + device: &str, +) { + // Snapshot the decision inputs under the lock. + let (send_stop, status, version) = { + let g = state.lock().await; + let status = g.statuses.get(device).cloned(); + let version = g.heartbeat_versions.get(device).cloned(); + let send_stop = status + .as_ref() + .map(|s| should_send_stop(s, version.as_deref()) && !g.stopped.contains(device)) + .unwrap_or(false); + (send_stop, status, version) + }; + + if send_stop { + // Mark before sending so a concurrent reconcile doesn't double-send. + state.lock().await.stopped.insert(device.to_string()); + tracing::info!(device, "upgrade: new version healthy; sending cutover stop"); + if let Err(e) = commands + .upgrade_stop(device, "upgrade cutover committed") + .await + { + tracing::warn!(device, error = %e, "upgrade: stop send failed; will retry on next event"); + state.lock().await.stopped.remove(device); + } + } + + patch_device(devices, device, status.as_ref(), version.as_deref()).await; +} + +async fn patch_device( + devices: &Api, + device: &str, + status: Option<&AgentUpgradeStatus>, + version: Option<&str>, +) { + // Surface the upgrade only while it's in flight (drop it on Done/Running). + let upgrade = status.and_then(|s| match s.phase { + AgentUpgradePhase::Done | AgentUpgradePhase::Running => None, + phase => Some(DeviceUpgradeStatus { + phase, + target_version: s.target_version.clone(), + last_error: s.last_error.clone(), + }), + }); + let patch = json!({ + "status": { + "currentVersion": version, + "upgrade": upgrade, + } + }); + if let Err(e) = devices + .patch_status(device, &PatchParams::default(), &Patch::Merge(&patch)) + .await + { + // A heartbeat/status can outrace the Device CR's creation; retry later. + if !matches!(&e, kube::Error::Api(ae) if ae.code == 404) { + tracing::warn!(device, error = %e, "upgrade: device status patch failed"); + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::Utc; + use harmony_reconciler_contracts::Id; + + fn status(phase: AgentUpgradePhase, target: Option<&str>) -> AgentUpgradeStatus { + AgentUpgradeStatus { + device_id: Id::from("dev-1"), + phase, + current_version: "0.1.0".into(), + target_version: target.map(String::from), + last_error: None, + at: Utc::now(), + } + } + + #[test] + fn commits_stop_only_when_cutover_ready_and_new_version_seen() { + let s = status(AgentUpgradePhase::CutoverReady, Some("0.2.0")); + assert!(should_send_stop(&s, Some("0.2.0"))); + // New version not yet observed (still old) → hold. + assert!(!should_send_stop(&s, Some("0.1.0"))); + // No heartbeat seen → hold. + assert!(!should_send_stop(&s, None)); + } + + #[test] + fn never_commits_before_cutover_ready() { + for phase in [ + AgentUpgradePhase::Staging, + AgentUpgradePhase::Verifying, + AgentUpgradePhase::Failed, + AgentUpgradePhase::Running, + ] { + let s = status(phase, Some("0.2.0")); + assert!( + !should_send_stop(&s, Some("0.2.0")), + "{phase:?} must not commit" + ); + } + } +} diff --git a/harmony-reconciler-contracts/src/commands.rs b/harmony-reconciler-contracts/src/commands.rs index 85356149..28e99660 100644 --- a/harmony-reconciler-contracts/src/commands.rs +++ b/harmony-reconciler-contracts/src/commands.rs @@ -45,6 +45,12 @@ pub enum Verb { /// Run a shell command on the device and return its captured /// output. Single-shot (no streaming); the agent runs `sh -c`. Exec, + /// Operator → old agent: gracefully exit during an upgrade cutover + /// (ADR-022). The operator is the single source of truth for the stop — + /// the agent never self-stops. Reusing the command protocol means no new + /// subject or callout-permission change. + #[serde(rename = "upgrade-stop")] + UpgradeStop, } impl Verb { @@ -55,6 +61,7 @@ impl Verb { match self { Verb::Ping => "ping", Verb::Exec => "exec", + Verb::UpgradeStop => "upgrade-stop", } } } @@ -77,7 +84,14 @@ pub fn device_command_subscription(device_id: &str) -> String { #[serde(tag = "verb", rename_all = "lowercase")] pub enum CommandRequest { Ping, - Exec { command: String }, + Exec { + command: String, + }, + /// Stop the old agent at upgrade cutover. `reason` is logged. + #[serde(rename = "upgrade-stop")] + UpgradeStop { + reason: String, + }, } /// JSON body of a `Verb::Ping` reply. diff --git a/harmony-reconciler-contracts/src/fleet.rs b/harmony-reconciler-contracts/src/fleet.rs index 92ef773f..19500bb6 100644 --- a/harmony-reconciler-contracts/src/fleet.rs +++ b/harmony-reconciler-contracts/src/fleet.rs @@ -139,6 +139,11 @@ pub struct DeploymentState { pub struct HeartbeatPayload { pub device_id: Id, pub at: DateTime, + /// Agent crate version the device is running (ADR-022). `#[serde(default)]` + /// so a pre-upgrade agent's heartbeat (no field) decodes as `""` = + /// "unknown" rather than failing — operators in the field must tolerate it. + #[serde(default)] + pub agent_version: String, } #[cfg(test)] @@ -239,10 +244,11 @@ mod tests { let hb = HeartbeatPayload { device_id: Id::from("pi-01".to_string()), at: ts("2026-04-22T10:00:30Z"), + agent_version: "0.1.0".to_string(), }; let bytes = serde_json::to_vec(&hb).unwrap(); assert!( - bytes.len() < 96, + bytes.len() < 128, "heartbeat payload grew to {} bytes: {}", bytes.len(), String::from_utf8_lossy(&bytes), diff --git a/harmony-reconciler-contracts/src/kv.rs b/harmony-reconciler-contracts/src/kv.rs index 8ea10302..350e1c0a 100644 --- a/harmony-reconciler-contracts/src/kv.rs +++ b/harmony-reconciler-contracts/src/kv.rs @@ -32,6 +32,20 @@ pub const BUCKET_DEVICE_STATE: &str = "device-state"; /// the state bucket. Key format: `heartbeat.`. pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat"; +/// Operator → agent upgrade marker (ADR-022): desired agent version + signed +/// binary URL + SHA-256, one per device. Key format: `agent-upgrade.`. +pub const BUCKET_AGENT_UPGRADE: &str = "agent-upgrade"; + +/// Agent → operator upgrade status: the live phase of the upgrade state +/// machine. Key format: `agent-upgrade.`. +pub const BUCKET_AGENT_UPGRADE_STATUS: &str = "agent-upgrade-status"; + +/// KV key for a device's upgrade marker / status. Format: +/// `agent-upgrade.` (shared shape across both upgrade buckets). +pub fn agent_upgrade_key(device_id: &str) -> String { + format!("agent-upgrade.{device_id}") +} + /// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`]. /// Format: `.`. pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String { diff --git a/harmony-reconciler-contracts/src/lib.rs b/harmony-reconciler-contracts/src/lib.rs index 6c9afd12..3fb02015 100644 --- a/harmony-reconciler-contracts/src/lib.rs +++ b/harmony-reconciler-contracts/src/lib.rs @@ -20,6 +20,7 @@ pub mod commands; pub mod fleet; pub mod kv; pub mod status; +pub mod upgrade; pub use commands::{ CommandRequest, ErrorKind, ErrorReply, ExecReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB, @@ -30,11 +31,13 @@ pub use fleet::{ DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName, }; pub use kv::{ - BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, + BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DESIRED_STATE, + BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, agent_upgrade_key, desired_state_key, desired_state_watch_filter, device_heartbeat_key, device_info_key, device_state_key, }; pub use status::{InventorySnapshot, Phase}; +pub use upgrade::{AgentUpgradeMarker, AgentUpgradePhase, AgentUpgradeStatus}; // Re-exports so consumers (agent, operator) don't need a direct // harmony_types dependency purely to name the cross-boundary types. diff --git a/harmony-reconciler-contracts/src/upgrade.rs b/harmony-reconciler-contracts/src/upgrade.rs new file mode 100644 index 00000000..d6fce6ee --- /dev/null +++ b/harmony-reconciler-contracts/src/upgrade.rs @@ -0,0 +1,98 @@ +//! Agent self-upgrade wire types (ADR-022). +//! +//! The operator publishes an [`AgentUpgradeMarker`] per device (desired version +//! + signed binary URL + SHA-256). The agent runs the upgrade state machine and +//! publishes [`AgentUpgradeStatus`] back so the operator can reflect progress +//! onto the `Device` CR and decide when to send the stop signal. Both ride NATS +//! KV (not a fire-and-forget subject) so they survive an operator restart. + +use chrono::{DateTime, Utc}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::Id; + +/// Operator → agent: the version this device should run, and where to fetch it. +/// Written to KV key `agent-upgrade.` in +/// [`crate::BUCKET_AGENT_UPGRADE`]. The agent acts only when +/// `desired_version` differs from what it's running; **any** change re-triggers +/// the state machine (so re-publishing the same version after a fix retries). +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AgentUpgradeMarker { + /// Target crate version, `MAJOR.MINOR.PATCH` (no `v` prefix, no build + /// metadata — the on-disk path is `/usr/bin/fleet-agent-v`). + pub desired_version: String, + /// HTTPS URL of the versioned binary (Gitea/Harbor release asset, CDN, …). + pub url: String, + /// Lowercase hex SHA-256 of the binary. Verified before the binary is + /// staged; a mismatch fails the upgrade and the agent stays on its version. + pub sha256: String, +} + +/// Phases of the agent upgrade state machine (ADR-022). `Running` is steady +/// state; the rest are the upgrade in flight. `Failed` and `Done` are terminal +/// for a given marker. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "kebab-case")] +pub enum AgentUpgradePhase { + /// Normal reconcile loop; no upgrade in flight. + Running, + /// Refusing to start new services; in-flight reconciles drain. + Draining, + /// Fetching + verifying the new binary onto disk. + Staging, + /// Running the staged binary's `--self-test` before any swap. + Verifying, + /// Symlink swapped, new version started alongside; awaiting the operator's + /// stop signal (the operator only sends it after seeing the new version's + /// heartbeat — the agent never self-stops). + CutoverReady, + /// The upgrade did not complete; `last_error` says why. The agent stayed on + /// (or reverted to) its previous version. + Failed, + /// The new version is the active one; this marker is satisfied. + Done, +} + +/// Agent → operator: current upgrade phase + the versions involved. Written to +/// KV key `agent-upgrade.` in [`crate::BUCKET_AGENT_UPGRADE_STATUS`]. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AgentUpgradeStatus { + pub device_id: Id, + pub phase: AgentUpgradePhase, + /// Version the agent is currently running. + pub current_version: String, + /// Version the agent is upgrading toward (the marker's `desired_version`). + /// `None` when phase is `Running` with no marker. + #[serde(default)] + pub target_version: Option, + #[serde(default)] + pub last_error: Option, + pub at: DateTime, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn phase_wire_form_is_stable() { + // The serialized form lands in NATS KV + the Device CR; the operator + // and dashboard key off it. Pin it. + let s = |p: AgentUpgradePhase| serde_json::to_value(p).unwrap(); + assert_eq!(s(AgentUpgradePhase::CutoverReady), "cutover-ready"); + assert_eq!(s(AgentUpgradePhase::Running), "running"); + assert_eq!(s(AgentUpgradePhase::Failed), "failed"); + } + + #[test] + fn marker_roundtrips() { + let m = AgentUpgradeMarker { + desired_version: "0.2.0".into(), + url: "https://git.nationtech.io/fleet/agent/v0.2.0".into(), + sha256: "abc123".into(), + }; + let j = serde_json::to_string(&m).unwrap(); + assert_eq!(serde_json::from_str::(&j).unwrap(), m); + } +} diff --git a/harmony/src/modules/fleet/operator/crd.rs b/harmony/src/modules/fleet/operator/crd.rs index bdf027cd..6e78d201 100644 --- a/harmony/src/modules/fleet/operator/crd.rs +++ b/harmony/src/modules/fleet/operator/crd.rs @@ -1,4 +1,4 @@ -use harmony_reconciler_contracts::InventorySnapshot; +use harmony_reconciler_contracts::{AgentUpgradePhase, InventorySnapshot}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; use kube::CustomResource; use schemars::JsonSchema; @@ -118,6 +118,28 @@ pub struct DeviceStatus { #[serde(skip_serializing_if = "Option::is_none")] pub last_heartbeat: Option, pub reachability: Reachability, + /// Agent version the device is currently running, from its heartbeat + /// (ADR-022). `None` for a pre-upgrade agent that doesn't report it. + #[serde(skip_serializing_if = "Option::is_none")] + pub current_version: Option, + /// In-flight agent upgrade, reflected from the agent's upgrade status. + /// `None` when no upgrade is active. + #[serde(skip_serializing_if = "Option::is_none")] + pub upgrade: Option, +} + +/// Reflection of the agent's upgrade state machine onto the CR, so +/// `kubectl get device -o yaml` and the dashboard see upgrade progress. +#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct DeviceUpgradeStatus { + /// Live phase of the agent's upgrade state machine. The wire enum itself — + /// not a stringified copy — so the set of values can't drift. + pub phase: AgentUpgradePhase, + #[serde(skip_serializing_if = "Option::is_none")] + pub target_version: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub last_error: Option, } /// Coarse liveness derived from heartbeat freshness. Failing/Pending diff --git a/harmony/src/modules/fleet/operator/mod.rs b/harmony/src/modules/fleet/operator/mod.rs index 0c89a456..c0926b0d 100644 --- a/harmony/src/modules/fleet/operator/mod.rs +++ b/harmony/src/modules/fleet/operator/mod.rs @@ -13,5 +13,5 @@ pub mod crd; pub use crd::{ AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device, - DeviceSpec, DeviceStatus, Reachability, Rollout, RolloutStrategy, + DeviceSpec, DeviceStatus, DeviceUpgradeStatus, Reachability, Rollout, RolloutStrategy, }; diff --git a/harmony_downloadable_asset/Cargo.toml b/harmony_downloadable_asset/Cargo.toml new file mode 100644 index 00000000..9a914350 --- /dev/null +++ b/harmony_downloadable_asset/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "harmony_downloadable_asset" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +description = "Download a file from a URL with SHA-256 checksum verification (shared by k3d binary install and the fleet agent self-upgrade)" + +[dependencies] +reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false } +sha2 = "0.10" +tokio = { workspace = true, features = ["fs", "io-util"] } +futures-util = "0.3" +url.workspace = true +thiserror.workspace = true +tracing.workspace = true + +[dev-dependencies] +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } +httptest = "0.16" diff --git a/harmony_downloadable_asset/src/lib.rs b/harmony_downloadable_asset/src/lib.rs new file mode 100644 index 00000000..e56d96ac --- /dev/null +++ b/harmony_downloadable_asset/src/lib.rs @@ -0,0 +1,213 @@ +//! Download a file from a URL and verify its SHA-256 before trusting it. +//! +//! Shared by k3d's binary install and the fleet agent's self-upgrade +//! (ADR-022): both fetch a versioned binary from a release host and must refuse +//! anything whose hash doesn't match the pin. Re-downloads are skipped when the +//! target already exists with the right checksum. + +use std::io::Read; +use std::path::{Path, PathBuf}; + +use futures_util::StreamExt; +use sha2::{Digest, Sha256}; +use tokio::fs; +use tokio::fs::File; +use tokio::io::AsyncWriteExt; +use url::Url; + +#[derive(Debug, thiserror::Error)] +pub enum DownloadError { + #[error("failed to create download directory {0}: {1}")] + CreateDir(PathBuf, #[source] std::io::Error), + #[error("request to {0} failed: {1}")] + Request(Url, #[source] reqwest::Error), + #[error("download of {url} failed with status {status}")] + Status { + url: Url, + status: reqwest::StatusCode, + }, + #[error("writing {0} failed: {1}")] + Write(PathBuf, #[source] std::io::Error), + #[error("error streaming download body: {0}")] + Stream(#[source] reqwest::Error), + #[error("checksum mismatch for {path}: expected {expected}, got {actual}")] + Checksum { + path: PathBuf, + expected: String, + actual: String, + }, +} + +/// A file to fetch from `url` into `file_name`, gated on `checksum` (lowercase +/// hex SHA-256). +#[derive(Debug, Clone)] +pub struct DownloadableAsset { + pub url: Url, + pub file_name: String, + pub checksum: String, +} + +impl DownloadableAsset { + /// SHA-256 of `file` as lowercase hex, or `None` if it can't be read. + fn sha256_hex(file: &Path) -> Option { + let mut f = std::fs::File::open(file).ok()?; + let mut hasher = Sha256::new(); + let mut buf = [0u8; 1024 * 1024]; + loop { + match f.read(&mut buf) { + Ok(0) => break, + Ok(n) => hasher.update(&buf[..n]), + Err(e) => { + tracing::warn!(error = %e, "error reading file for checksum"); + return None; + } + } + } + Some(format!("{:x}", hasher.finalize())) + } + + fn matches_checksum(&self, file: &Path) -> bool { + Self::sha256_hex(file).is_some_and(|h| h == self.checksum) + } + + /// Download into `folder/`, verifying the checksum. Skips the + /// download when the file already exists with the right checksum. Returns + /// the path on success. + pub async fn download_to_path(&self, folder: PathBuf) -> Result { + if !folder.exists() { + fs::create_dir_all(&folder) + .await + .map_err(|e| DownloadError::CreateDir(folder.clone(), e))?; + } + let target = folder.join(&self.file_name); + + if self.matches_checksum(&target) { + tracing::debug!( + ?target, + "already present with correct checksum; skipping download" + ); + return Ok(target); + } + + tracing::debug!(url = %self.url, "downloading"); + let response = reqwest::Client::new() + .get(self.url.clone()) + .send() + .await + .map_err(|e| DownloadError::Request(self.url.clone(), e))?; + if !response.status().is_success() { + return Err(DownloadError::Status { + url: self.url.clone(), + status: response.status(), + }); + } + + let mut file = File::create(&target) + .await + .map_err(|e| DownloadError::Write(target.clone(), e))?; + let mut stream = response.bytes_stream(); + while let Some(chunk) = stream.next().await { + let chunk = chunk.map_err(DownloadError::Stream)?; + file.write_all(&chunk) + .await + .map_err(|e| DownloadError::Write(target.clone(), e))?; + } + file.flush() + .await + .map_err(|e| DownloadError::Write(target.clone(), e))?; + drop(file); + + match Self::sha256_hex(&target) { + Some(actual) if actual == self.checksum => Ok(target), + actual => Err(DownloadError::Checksum { + path: target, + expected: self.checksum.clone(), + actual: actual.unwrap_or_else(|| "".to_string()), + }), + } + } +} + +#[cfg(test)] +mod tests { + use super::*; + use httptest::{Expectation, Server, matchers, responders}; + + const CONTENT: &str = "This is a test file."; + const CONTENT_SHA256: &str = "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de"; + + fn tmp() -> PathBuf { + let id = std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .unwrap() + .as_nanos(); + let p = std::env::temp_dir().join(format!("harmony-dl-{id}")); + std::fs::create_dir_all(&p).unwrap(); + p + } + + #[tokio::test] + async fn downloads_and_verifies() { + let server = Server::run(); + server.expect( + Expectation::matching(httptest::matchers::request::method_path("GET", "/f.bin")) + .respond_with(responders::status_code(200).body(CONTENT)), + ); + let asset = DownloadableAsset { + url: Url::parse(&server.url("/f.bin").to_string()).unwrap(), + file_name: "f.bin".to_string(), + checksum: CONTENT_SHA256.to_string(), + }; + let path = asset.download_to_path(tmp()).await.unwrap(); + assert_eq!(std::fs::read_to_string(path).unwrap(), CONTENT); + } + + #[tokio::test] + async fn skips_when_already_present_with_matching_checksum() { + let server = Server::run(); + server.expect( + Expectation::matching(matchers::any()) + .times(0) + .respond_with(responders::status_code(200).body(CONTENT)), + ); + let folder = tmp(); + std::fs::write(folder.join("f.bin"), CONTENT).unwrap(); + let asset = DownloadableAsset { + url: Url::parse(&server.url("/f.bin").to_string()).unwrap(), + file_name: "f.bin".to_string(), + checksum: CONTENT_SHA256.to_string(), + }; + asset.download_to_path(folder).await.unwrap(); + } + + #[tokio::test] + async fn rejects_bad_checksum() { + let server = Server::run(); + server.expect( + Expectation::matching(matchers::any()) + .respond_with(responders::status_code(200).body("not the expected content")), + ); + let asset = DownloadableAsset { + url: Url::parse(&server.url("/f.bin").to_string()).unwrap(), + file_name: "f.bin".to_string(), + checksum: CONTENT_SHA256.to_string(), + }; + let err = asset.download_to_path(tmp()).await.unwrap_err(); + assert!(matches!(err, DownloadError::Checksum { .. })); + } + + #[tokio::test] + async fn surfaces_http_error() { + let server = Server::run(); + server.expect( + Expectation::matching(matchers::any()).respond_with(responders::status_code(404)), + ); + let asset = DownloadableAsset { + url: Url::parse(&server.url("/f.bin").to_string()).unwrap(), + file_name: "f.bin".to_string(), + checksum: CONTENT_SHA256.to_string(), + }; + let err = asset.download_to_path(tmp()).await.unwrap_err(); + assert!(matches!(err, DownloadError::Status { .. })); + } +} diff --git a/k3d/Cargo.toml b/k3d/Cargo.toml index 23f1a4df..4973316a 100644 --- a/k3d/Cargo.toml +++ b/k3d/Cargo.toml @@ -13,11 +13,9 @@ octocrab = "0.44.0" regex = "1.11.1" reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false } url.workspace = true -sha2 = "0.10.8" -futures-util = "0.3.31" kube.workspace = true +harmony_downloadable_asset = { path = "../harmony_downloadable_asset" } [dev-dependencies] env_logger = { workspace = true } -httptest = "0.16.3" pretty_assertions = "1.4.1" diff --git a/k3d/src/downloadable_asset.rs b/k3d/src/downloadable_asset.rs deleted file mode 100644 index 085d3828..00000000 --- a/k3d/src/downloadable_asset.rs +++ /dev/null @@ -1,303 +0,0 @@ -use futures_util::StreamExt; -use log::{debug, warn}; -use sha2::{Digest, Sha256}; -use std::io::Read; -use std::path::PathBuf; -use tokio::fs; -use tokio::fs::File; -use tokio::io::AsyncWriteExt; -use url::Url; - -const CHECKSUM_FAILED_MSG: &str = "Downloaded file failed checksum verification"; - -/// Represents an asset that can be downloaded from a URL with checksum verification. -/// -/// This struct facilitates secure downloading of files from remote URLs by -/// verifying the integrity of the downloaded content using SHA-256 checksums. -/// It handles downloading the file, saving it to disk, and verifying the checksum matches -/// the expected value. -/// -/// # Examples -/// -/// ```compile_fail -/// # use url::Url; -/// # use std::path::PathBuf; -/// -/// # async fn example() -> Result<(), String> { -/// let asset = DownloadableAsset { -/// url: Url::parse("https://example.com/file.zip").unwrap(), -/// file_name: "file.zip".to_string(), -/// checksum: "a1b2c3d4e5f6...".to_string(), -/// }; -/// -/// let download_dir = PathBuf::from("/tmp/downloads"); -/// let file_path = asset.download_to_path(download_dir).await?; -/// # Ok(()) -/// # } -/// ``` -#[derive(Debug)] -pub(crate) struct DownloadableAsset { - pub(crate) url: Url, - pub(crate) file_name: String, - pub(crate) checksum: String, -} - -impl DownloadableAsset { - fn verify_checksum(&self, file: PathBuf) -> bool { - if !file.exists() { - debug!("File does not exist: {:?}", file); - return false; - } - - let mut file = match std::fs::File::open(&file) { - Ok(file) => file, - Err(e) => { - warn!("Failed to open file for checksum verification: {:?}", e); - return false; - } - }; - - let mut hasher = Sha256::new(); - let mut buffer = [0; 1024 * 1024]; // 1MB buffer - - loop { - let bytes_read = match file.read(&mut buffer) { - Ok(0) => break, - Ok(n) => n, - Err(e) => { - warn!("Error reading file for checksum: {:?}", e); - return false; - } - }; - - hasher.update(&buffer[..bytes_read]); - } - - let result = hasher.finalize(); - let calculated_hash = format!("{:x}", result); - - debug!("Expected checksum: {}", self.checksum); - debug!("Calculated checksum: {}", calculated_hash); - - calculated_hash == self.checksum - } - - /// Downloads the asset to the specified directory, verifying its checksum. - /// - /// This function will: - /// 1. Create the target directory if it doesn't exist - /// 2. Check if the file already exists with the correct checksum - /// 3. If not, download the file from the URL - /// 4. Verify the downloaded file's checksum matches the expected value - /// - /// # Arguments - /// - /// * `folder` - The directory path where the file should be saved - /// - /// # Returns - /// - /// * `Ok(PathBuf)` - The path to the downloaded file on success - /// * `Err(String)` - A descriptive error message if the download or verification fails - /// - /// # Errors - /// - /// This function will return an error if: - /// - The network request fails - /// - The server responds with a non-success status code - /// - Writing to disk fails - /// - The checksum verification fails - pub(crate) async fn download_to_path(&self, folder: PathBuf) -> Result { - if !folder.exists() { - fs::create_dir_all(&folder) - .await - .expect("Failed to create download directory"); - } - - let target_file_path = folder.join(&self.file_name); - debug!("Downloading to path: {:?}", target_file_path); - - if self.verify_checksum(target_file_path.clone()) { - debug!("File already exists with correct checksum, skipping download"); - return Ok(target_file_path); - } - - debug!("Downloading from URL: {}", self.url); - let client = reqwest::Client::new(); - let response = client - .get(self.url.clone()) - .send() - .await - .map_err(|e| format!("Failed to download file: {e}"))?; - - if !response.status().is_success() { - return Err(format!( - "Failed to download file, status: {}", - response.status() - )); - } - - let mut file = File::create(&target_file_path) - .await - .expect("Failed to create target file"); - - let mut stream = response.bytes_stream(); - while let Some(chunk_result) = stream.next().await { - let chunk = chunk_result.expect("Error while downloading file"); - file.write_all(&chunk) - .await - .expect("Failed to write data to file"); - } - - file.flush().await.expect("Failed to flush file"); - drop(file); - - if !self.verify_checksum(target_file_path.clone()) { - return Err(CHECKSUM_FAILED_MSG.to_string()); - } - - debug!( - "File downloaded and verified successfully: {}", - target_file_path.to_string_lossy() - ); - Ok(target_file_path) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use httptest::{ - matchers::{self, request}, - responders, Expectation, Server, - }; - - const BASE_TEST_PATH: &str = "/tmp/harmony-test-k3d-download"; - const TEST_CONTENT: &str = "This is a test file."; - const TEST_CONTENT_HASH: &str = - "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de"; - - fn setup_test() -> (PathBuf, Server) { - let _ = env_logger::builder().try_init(); - - // Create unique test directory - let test_id = std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .unwrap() - .as_millis(); - let download_path = format!("{}/test_{}", BASE_TEST_PATH, test_id); - std::fs::create_dir_all(&download_path).unwrap(); - - (PathBuf::from(download_path), Server::run()) - } - - #[tokio::test] - async fn test_download_to_path_success() { - let (folder, server) = setup_test(); - - server.expect( - Expectation::matching(request::method_path("GET", "/test.txt")) - .respond_with(responders::status_code(200).body(TEST_CONTENT)), - ); - - let asset = DownloadableAsset { - url: Url::parse(&server.url("/test.txt").to_string()).unwrap(), - file_name: "test.txt".to_string(), - checksum: TEST_CONTENT_HASH.to_string(), - }; - - let result = asset - .download_to_path(folder.join("success")) - .await - .unwrap(); - let downloaded_content = std::fs::read_to_string(result).unwrap(); - assert_eq!(downloaded_content, TEST_CONTENT); - } - - #[tokio::test] - async fn test_download_to_path_already_exists() { - let (folder, server) = setup_test(); - - server.expect( - Expectation::matching(matchers::any()) - .times(0) - .respond_with(responders::status_code(200).body(TEST_CONTENT)), - ); - - let asset = DownloadableAsset { - url: Url::parse(&server.url("/test.txt").to_string()).unwrap(), - file_name: "test.txt".to_string(), - checksum: TEST_CONTENT_HASH.to_string(), - }; - - let target_file_path = folder.join(&asset.file_name); - std::fs::write(&target_file_path, TEST_CONTENT).unwrap(); - - let result = asset.download_to_path(folder).await.unwrap(); - let content = std::fs::read_to_string(result).unwrap(); - assert_eq!(content, TEST_CONTENT); - } - - #[tokio::test] - async fn test_download_to_path_server_error() { - let (folder, server) = setup_test(); - - server.expect( - Expectation::matching(matchers::any()).respond_with(responders::status_code(404)), - ); - - let asset = DownloadableAsset { - url: Url::parse(&server.url("/test.txt").to_string()).unwrap(), - file_name: "test.txt".to_string(), - checksum: TEST_CONTENT_HASH.to_string(), - }; - - let result = asset.download_to_path(folder.join("error")).await; - assert!(result.is_err()); - assert!(result.unwrap_err().contains("status: 404")); - } - - #[tokio::test] - async fn test_download_to_path_checksum_failure() { - let (folder, server) = setup_test(); - - let invalid_content = "This is NOT the expected content"; - server.expect( - Expectation::matching(matchers::any()) - .respond_with(responders::status_code(200).body(invalid_content)), - ); - - let asset = DownloadableAsset { - url: Url::parse(&server.url("/test.txt").to_string()).unwrap(), - file_name: "test.txt".to_string(), - checksum: TEST_CONTENT_HASH.to_string(), - }; - - let join_handle = - tokio::spawn(async move { asset.download_to_path(folder.join("failure")).await }); - - assert_eq!( - join_handle.await.unwrap().err().unwrap(), - CHECKSUM_FAILED_MSG - ); - } - - #[tokio::test] - async fn test_download_with_specific_path_matcher() { - let (folder, server) = setup_test(); - - server.expect( - Expectation::matching(matchers::request::path("/specific/path.txt")) - .respond_with(responders::status_code(200).body(TEST_CONTENT)), - ); - - let asset = DownloadableAsset { - url: Url::parse(&server.url("/specific/path.txt").to_string()).unwrap(), - file_name: "path.txt".to_string(), - checksum: TEST_CONTENT_HASH.to_string(), - }; - - let result = asset.download_to_path(folder).await.unwrap(); - let downloaded_content = std::fs::read_to_string(result).unwrap(); - assert_eq!(downloaded_content, TEST_CONTENT); - } -} diff --git a/k3d/src/lib.rs b/k3d/src/lib.rs index 193a9eb4..50459696 100644 --- a/k3d/src/lib.rs +++ b/k3d/src/lib.rs @@ -1,5 +1,4 @@ -mod downloadable_asset; -use downloadable_asset::*; +use harmony_downloadable_asset::DownloadableAsset; use kube::Client; use log::{debug, info}; @@ -134,7 +133,10 @@ impl K3d { let release_binary = self.get_binary_for_current_platform(latest_release).await; debug!("Foudn K3d binary to install : {release_binary:#?}"); - release_binary.download_to_path(self.base_dir.clone()).await + release_binary + .download_to_path(self.base_dir.clone()) + .await + .map_err(|e| e.to_string()) } // TODO : Make sure this will only find actual released versions, no prereleases or test