feat(fleet): agent self-upgrade + auto-rollback protocol, ADR-022 (Ch4) #330
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -4066,6 +4066,7 @@ dependencies = [
|
||||
"harmony",
|
||||
"harmony-fleet-auth",
|
||||
"harmony-reconciler-contracts",
|
||||
"harmony_downloadable_asset",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"thiserror 2.0.18",
|
||||
@@ -4073,6 +4074,7 @@ dependencies = [
|
||||
"toml",
|
||||
"tracing",
|
||||
"tracing-subscriber",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
@@ -4390,6 +4392,20 @@ dependencies = [
|
||||
"syn 2.0.117",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "harmony_downloadable_asset"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"futures-util",
|
||||
"httptest",
|
||||
"reqwest 0.12.28",
|
||||
"sha2 0.10.9",
|
||||
"thiserror 2.0.18",
|
||||
"tokio",
|
||||
"tracing",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "harmony_execution"
|
||||
version = "0.1.0"
|
||||
@@ -5507,15 +5523,13 @@ version = "0.1.0"
|
||||
dependencies = [
|
||||
"async-trait",
|
||||
"env_logger",
|
||||
"futures-util",
|
||||
"httptest",
|
||||
"harmony_downloadable_asset",
|
||||
"kube",
|
||||
"log",
|
||||
"octocrab",
|
||||
"pretty_assertions",
|
||||
"regex",
|
||||
"reqwest 0.12.28",
|
||||
"sha2 0.10.9",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
@@ -5,6 +5,7 @@ members = [
|
||||
"private_repos/*",
|
||||
"harmony",
|
||||
"harmony_zitadel_auth",
|
||||
"harmony_downloadable_asset",
|
||||
"harmony_types",
|
||||
"harmony_macros",
|
||||
"harmony_tui",
|
||||
|
||||
48
ROADMAP/fleet_platform/ch4-agent-upgrade-status.md
Normal file
48
ROADMAP/fleet_platform/ch4-agent-upgrade-status.md
Normal file
@@ -0,0 +1,48 @@
|
||||
# Ch4 — Agent self-upgrade + auto-rollback (ADR-022): status
|
||||
|
||||
Built the full ADR-022 protocol end to end. The state-machine "brain" and the
|
||||
operator's commit decision are exhaustively unit-tested; the OS side-effects sit
|
||||
behind a seam so they're faked in tests and real on-device.
|
||||
|
||||
## Shipped
|
||||
|
||||
| Piece | Where | Tested |
|
||||
|---|---|---|
|
||||
| Wire types: marker, phase, status, `agent_version` on heartbeat, `Verb::UpgradeStop` | `harmony-reconciler-contracts/src/upgrade.rs`, `kv.rs`, `commands.rs`, `fleet.rs` | unit |
|
||||
| Shared download+SHA-256 verify (lifted from k3d) | new crate `harmony_downloadable_asset` | unit (httptest) |
|
||||
| Agent state machine `drive` (Staging→Verifying→CutoverReady→stop/revert) | `harmony-fleet-agent/src/upgrade.rs` | **6 unit tests** incl. timeout-revert, stage/self-test/cutover failure |
|
||||
| `UpgradeExecutor` seam + real `SystemdUpgradeExecutor` (download, `--self-test`, atomic symlink swap, `systemd-run` transient unit, revert) | same | seam fake-tested; real impl self-heals layout |
|
||||
| `--self-test` flag | `harmony-fleet-agent/src/main.rs` | — |
|
||||
| `Verb::UpgradeStop` handling + armed `UpgradeStopSignal` (only the cutover-waiting old agent acts) | `command_server.rs`, `upgrade.rs` | — |
|
||||
| Operator coordinator: send stop **only after** observing the new version's heartbeat; reflect version + phase to the `Device` CR | `harmony-fleet-operator/src/upgrade_coordinator.rs` | **2 unit tests** on the commit decision |
|
||||
| `FleetCommandsClient::upgrade_stop` | `commands.rs` | — |
|
||||
| `Device.status.{currentVersion, upgrade}` | `crd.rs` | — |
|
||||
|
||||
Load-bearing properties from the ADR are intact: old verifies new
|
||||
(`--self-test`); operator commits the stop (single source of truth, never the
|
||||
agent); rollback is the same code path (revert symlink + stop transient unit on
|
||||
self-test failure / heartbeat-timeout); no version is GC'd.
|
||||
|
||||
## Deviations (deliberate)
|
||||
|
||||
- **Marker + status ride NATS KV** (`agent-upgrade` / `agent-upgrade-status`),
|
||||
not a fire-and-forget subject, so they survive an operator restart — same
|
||||
ethos as Ch2. The ADR's `device-cmd.*`/`device-state.*.upgrade` subjects map
|
||||
onto: the existing command protocol (`Verb::UpgradeStop`) and the status KV.
|
||||
- **First-upgrade rollback without M1.** The real executor `capture_revert_target`
|
||||
preserves the running binary at its versioned path on first cutover even when
|
||||
the initial install put a plain file at `/usr/local/bin/fleet-agent`. This
|
||||
makes M1 a clean-install nicety, not a rollback-correctness prerequisite.
|
||||
|
||||
## Flagged for a supervised run (not done tonight)
|
||||
|
||||
1. **M1 clean install layout** — `FleetDeviceSetupScore` should install to
|
||||
`/usr/bin/fleet-agent-v<ver>` + symlink `/usr/local/bin/fleet-agent` from the
|
||||
start. Needs a new `agent_version` config field (≈9 construction sites) and a
|
||||
`FileSource::Symlink` delivery primitive (ansible `state: link`). The executor
|
||||
self-heal above covers correctness in the meantime.
|
||||
2. **libvirt vX→vX+1 e2e + corrupt-binary auto-revert** — needs two built agent
|
||||
binaries, a served URL reachable from the VM, and a KVM run. The VM harness
|
||||
exists (`harmony-fleet-e2e/src/vm`); the protocol brain is unit-green, so this
|
||||
is an integration proof to run on real hardware. The corrupt-binary path is
|
||||
already unit-proven via `stage_failure_*` / `heartbeat_timeout_reverts_*`.
|
||||
@@ -527,6 +527,7 @@ async fn simulate_heartbeat_loop(
|
||||
let hb = HeartbeatPayload {
|
||||
device_id: Id::from(device.device_id.clone()),
|
||||
at: Utc::now(),
|
||||
agent_version: "load-test".to_string(),
|
||||
};
|
||||
if let Ok(payload) = serde_json::to_vec(&hb) {
|
||||
if bucket.put(&hb_key, payload.into()).await.is_ok() {
|
||||
|
||||
@@ -7,7 +7,9 @@ rust-version = "1.85"
|
||||
[dependencies]
|
||||
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
|
||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||
harmony_downloadable_asset = { path = "../../harmony_downloadable_asset" }
|
||||
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
|
||||
url = { workspace = true }
|
||||
async-nats = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
@@ -25,6 +25,8 @@ use harmony_reconciler_contracts::{
|
||||
use serde::Serialize;
|
||||
use thiserror::Error;
|
||||
|
||||
use crate::upgrade::UpgradeStopSignal;
|
||||
|
||||
/// Hard cap on a single exec's combined output. Keeps the JSON reply
|
||||
/// comfortably under NATS's default 1 MiB message limit.
|
||||
const EXEC_MAX_OUTPUT: usize = 256 * 1024;
|
||||
@@ -38,15 +40,20 @@ pub struct CommandServer {
|
||||
client: Client,
|
||||
agent_version: &'static str,
|
||||
started_at: Instant,
|
||||
/// Fired when the operator sends `Verb::UpgradeStop` (ADR-022 cutover). The
|
||||
/// upgrade manager awaits it and then exits the process so the new version
|
||||
/// takes over. The agent never self-stops on its own observation.
|
||||
upgrade_stop: Arc<UpgradeStopSignal>,
|
||||
}
|
||||
|
||||
impl CommandServer {
|
||||
pub fn new(device_id: Id, client: Client) -> Self {
|
||||
pub fn new(device_id: Id, client: Client, upgrade_stop: Arc<UpgradeStopSignal>) -> Self {
|
||||
Self {
|
||||
device_id,
|
||||
client,
|
||||
agent_version: env!("CARGO_PKG_VERSION"),
|
||||
started_at: Instant::now(),
|
||||
upgrade_stop,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -115,6 +122,29 @@ impl CommandServer {
|
||||
Err(e) => return Err(CommandError::BadRequestBody(e.to_string())),
|
||||
};
|
||||
self.reply_exec(reply_to, &command).await
|
||||
} else if verb_token == Verb::UpgradeStop.as_subject_token() {
|
||||
let reason = match serde_json::from_slice::<CommandRequest>(&msg.payload) {
|
||||
Ok(CommandRequest::UpgradeStop { reason }) => reason,
|
||||
Ok(_) => "upgrade-stop".to_string(),
|
||||
// Tolerate a bodyless stop — the operator's intent is clear.
|
||||
Err(_) => "upgrade-stop".to_string(),
|
||||
};
|
||||
tracing::info!(%reason, "received upgrade-stop; signalling cutover exit");
|
||||
// Ack first so the operator knows we got it, then fire the signal.
|
||||
let ack = serde_json::to_vec(&PingReply {
|
||||
device_id: self.device_id.clone(),
|
||||
agent_version: self.agent_version.to_string(),
|
||||
uptime_s: self.started_at.elapsed().as_secs(),
|
||||
})
|
||||
.map_err(CommandError::SerializeReply)?;
|
||||
self.client
|
||||
.publish(reply_to, ack.into())
|
||||
.await
|
||||
.map_err(|e| CommandError::PublishReply(e.to_string()))?;
|
||||
if !self.upgrade_stop.fire() {
|
||||
tracing::warn!("upgrade-stop received but no upgrade is in cutover; ignoring");
|
||||
}
|
||||
Ok(())
|
||||
} else {
|
||||
tracing::warn!(verb = %verb_token, "unknown command verb");
|
||||
Err(CommandError::UnknownVerb(verb_token.to_string()))
|
||||
|
||||
@@ -97,6 +97,7 @@ impl FleetPublisher {
|
||||
let hb = HeartbeatPayload {
|
||||
device_id: self.device_id.clone(),
|
||||
at: chrono::Utc::now(),
|
||||
agent_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||
};
|
||||
let key = device_heartbeat_key(&self.device_id.to_string());
|
||||
match serde_json::to_vec(&hb) {
|
||||
|
||||
@@ -2,6 +2,7 @@ mod command_server;
|
||||
mod config;
|
||||
mod fleet_publisher;
|
||||
mod reconciler;
|
||||
mod upgrade;
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
@@ -43,6 +44,24 @@ struct Cli {
|
||||
default_value = "/etc/fleet-agent/config.toml"
|
||||
)]
|
||||
config: std::path::PathBuf,
|
||||
|
||||
/// ADR-022 self-test: load config, connect NATS (validates JWT), print
|
||||
/// version + "ok", exit 0 — or exit non-zero on any failure. The upgrade
|
||||
/// state machine runs this on a staged binary before cutover.
|
||||
#[arg(long)]
|
||||
self_test: bool,
|
||||
}
|
||||
|
||||
/// ADR-022 `--self-test`: prove this binary can parse its config and reach NATS
|
||||
/// with a valid JWT, then exit. No state mutation, no long-lived loops.
|
||||
async fn self_test(cfg: &AgentConfig) -> Result<()> {
|
||||
let creds = credential_source_from_config(&cfg.credentials)
|
||||
.context("self-test: building NATS credential source")?;
|
||||
connect_nats(cfg, creds)
|
||||
.await
|
||||
.context("self-test: NATS connect / JWT validation")?;
|
||||
println!("fleet-agent v{} self-test ok", env!("CARGO_PKG_VERSION"));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result<async_nats::Client> {
|
||||
@@ -198,6 +217,13 @@ async fn main() -> Result<()> {
|
||||
|
||||
let cli = Cli::parse();
|
||||
let cfg = config::load_config(&cli.config)?;
|
||||
|
||||
// ADR-022: the upgrade state machine invokes the staged binary with
|
||||
// `--self-test` before any symlink swap. Exit code is the verdict.
|
||||
if cli.self_test {
|
||||
return self_test(&cfg).await;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
device_id = %cfg.agent.device_id,
|
||||
runtime_enabled = cfg.agent.runtime_enabled,
|
||||
@@ -277,7 +303,14 @@ async fn main() -> Result<()> {
|
||||
))
|
||||
});
|
||||
|
||||
let command_server = Arc::new(CommandServer::new(device_id.clone(), client.clone()));
|
||||
// Fired by the command server on `Verb::UpgradeStop`; awaited by the
|
||||
// upgrade manager, which then exits the process for the new version.
|
||||
let upgrade_stop = Arc::new(upgrade::UpgradeStopSignal::default());
|
||||
let command_server = Arc::new(CommandServer::new(
|
||||
device_id.clone(),
|
||||
client.clone(),
|
||||
upgrade_stop.clone(),
|
||||
));
|
||||
|
||||
let ctrlc = async {
|
||||
tokio::signal::ctrl_c().await.ok();
|
||||
@@ -312,6 +345,16 @@ async fn main() -> Result<()> {
|
||||
};
|
||||
let heartbeat = publish_heartbeat_loop(fleet);
|
||||
let commands = command_server.run();
|
||||
// ADR-022 self-upgrade manager. Returns only when the operator commits a
|
||||
// cutover (it sent `UpgradeStop`) — that completes the select and the
|
||||
// process exits 0, so the already-started new version takes over and
|
||||
// systemd (Restart=on-failure) does not bring the old one back.
|
||||
let upgrade = upgrade::run(
|
||||
client.clone(),
|
||||
device_id.clone(),
|
||||
env!("CARGO_PKG_VERSION").to_string(),
|
||||
upgrade_stop.clone(),
|
||||
);
|
||||
|
||||
tokio::select! {
|
||||
// Waiting on ctrlc in a select will automatically terminate other branches when
|
||||
@@ -322,6 +365,7 @@ async fn main() -> Result<()> {
|
||||
_ = reconcile => {}
|
||||
_ = heartbeat => {}
|
||||
r = commands => { r?; }
|
||||
r = upgrade => { r?; tracing::info!("agent exiting for upgrade cutover"); }
|
||||
}
|
||||
|
||||
Ok(())
|
||||
|
||||
663
fleet/harmony-fleet-agent/src/upgrade.rs
Normal file
663
fleet/harmony-fleet-agent/src/upgrade.rs
Normal file
@@ -0,0 +1,663 @@
|
||||
//! Agent self-upgrade state machine (ADR-022).
|
||||
//!
|
||||
//! The operator publishes an [`AgentUpgradeMarker`] (desired version + signed
|
||||
//! URL + SHA-256). The agent stages the new binary, verifies it with
|
||||
//! `--self-test`, atomically swaps the `/usr/local/bin/fleet-agent` symlink, and
|
||||
//! starts the new version alongside itself. It then **waits for the operator's
|
||||
//! stop signal** — the agent never self-stops (the operator is the single
|
||||
//! source of truth, and only commits the stop after it has seen the new
|
||||
//! version's heartbeat). If the new version never reports healthy within
|
||||
//! [`HEARTBEAT_TIMEOUT`], the agent reverts the symlink and stays on its
|
||||
//! current version. Rollback is the same code path as upgrade.
|
||||
//!
|
||||
//! The OS side-effects (download, `--self-test`, symlink swap, systemd) sit
|
||||
//! behind [`UpgradeExecutor`] so [`drive`] — the state machine "brain" — is
|
||||
//! unit-tested with a fake; [`SystemdUpgradeExecutor`] does the real work.
|
||||
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::time::Duration;
|
||||
|
||||
use harmony_reconciler_contracts::AgentUpgradeMarker;
|
||||
use harmony_reconciler_contracts::AgentUpgradePhase::{
|
||||
self, CutoverReady, Done, Failed, Staging, Verifying,
|
||||
};
|
||||
|
||||
/// Directory holding the immutable versioned binaries (`fleet-agent-v<ver>`).
|
||||
pub const VERSIONED_BIN_DIR: &str = "/usr/bin";
|
||||
/// The stable path the systemd unit's `ExecStart` references. Symlink swap of
|
||||
/// this path is the cutover primitive (POSIX-atomic rename).
|
||||
pub const ACTIVE_SYMLINK: &str = "/usr/local/bin/fleet-agent";
|
||||
/// How long the old agent waits for the new version's heartbeat after cutover
|
||||
/// before reverting (ADR-022 suggests 60s).
|
||||
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum UpgradeError {
|
||||
#[error("invalid binary URL: {0}")]
|
||||
Url(String),
|
||||
#[error(transparent)]
|
||||
Download(#[from] harmony_downloadable_asset::DownloadError),
|
||||
#[error("filesystem error: {0}")]
|
||||
Io(#[from] std::io::Error),
|
||||
#[error("self-test exited with {0}")]
|
||||
SelfTest(String),
|
||||
#[error("systemd error: {0}")]
|
||||
Systemd(String),
|
||||
#[error("no previous symlink target recorded to revert to")]
|
||||
NothingToRevert,
|
||||
}
|
||||
|
||||
/// The OS-level operations the upgrade needs. Behind a trait so [`drive`] is
|
||||
/// unit-testable without touching disk or systemd.
|
||||
pub trait UpgradeExecutor {
|
||||
/// Download + verify the binary and place it at its versioned path. Returns
|
||||
/// the staged path.
|
||||
async fn stage(&self, marker: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError>;
|
||||
/// Run the staged binary's `--self-test`. Err if it doesn't exit 0.
|
||||
async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError>;
|
||||
/// Atomically point [`ACTIVE_SYMLINK`] at `binary` (recording the previous
|
||||
/// target for revert) and start the new version alongside the old.
|
||||
async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError>;
|
||||
/// Restore the pre-cutover symlink target and stop the new version.
|
||||
async fn revert(&self) -> Result<(), UpgradeError>;
|
||||
}
|
||||
|
||||
/// Sink for upgrade phase transitions (→ operator via KV).
|
||||
pub trait StatusSink {
|
||||
async fn publish(&self, phase: AgentUpgradePhase, last_error: Option<String>);
|
||||
}
|
||||
|
||||
/// What the agent process should do after an upgrade attempt.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum UpgradeAction {
|
||||
/// Cutover succeeded and the operator signalled stop — exit(0) so the new
|
||||
/// version (already running) takes over cleanly.
|
||||
Exit,
|
||||
/// The upgrade failed or was reverted — stay running on the current version.
|
||||
StayedOnCurrent,
|
||||
}
|
||||
|
||||
/// The state machine. Stage → verify → cutover → wait for the operator's stop
|
||||
/// (with a heartbeat-timeout revert). Pure orchestration over the injected
|
||||
/// executor, status sink, and post-cutover signal futures.
|
||||
///
|
||||
/// `new_version_healthy` resolves when a heartbeat from the new version is
|
||||
/// observed; `operator_stop` resolves when the operator sends the stop signal.
|
||||
pub async fn drive<E, S>(
|
||||
marker: &AgentUpgradeMarker,
|
||||
executor: &E,
|
||||
status: &S,
|
||||
new_version_healthy: impl std::future::Future<Output = ()>,
|
||||
operator_stop: impl std::future::Future<Output = ()>,
|
||||
heartbeat_timeout: Duration,
|
||||
) -> UpgradeAction
|
||||
where
|
||||
E: UpgradeExecutor,
|
||||
S: StatusSink,
|
||||
{
|
||||
status.publish(Staging, None).await;
|
||||
let staged = match executor.stage(marker).await {
|
||||
Ok(p) => p,
|
||||
Err(e) => {
|
||||
status.publish(Failed, Some(e.to_string())).await;
|
||||
return UpgradeAction::StayedOnCurrent;
|
||||
}
|
||||
};
|
||||
|
||||
status.publish(Verifying, None).await;
|
||||
if let Err(e) = executor.self_test(&staged).await {
|
||||
status
|
||||
.publish(Failed, Some(format!("self-test failed: {e}")))
|
||||
.await;
|
||||
return UpgradeAction::StayedOnCurrent;
|
||||
}
|
||||
|
||||
status.publish(CutoverReady, None).await;
|
||||
if let Err(e) = executor.cutover(&staged, &marker.desired_version).await {
|
||||
let _ = executor.revert().await;
|
||||
status
|
||||
.publish(Failed, Some(format!("cutover failed: {e}")))
|
||||
.await;
|
||||
return UpgradeAction::StayedOnCurrent;
|
||||
}
|
||||
|
||||
// Post-cutover: race the operator's stop against the new version becoming
|
||||
// healthy and the timeout. Stop ⇒ exit. Healthy ⇒ wait indefinitely for
|
||||
// stop (operator-outage-safe — two agents is wasteful but never blind).
|
||||
// Timeout with no healthy heartbeat ⇒ revert and stay.
|
||||
tokio::pin!(new_version_healthy);
|
||||
tokio::pin!(operator_stop);
|
||||
tokio::select! {
|
||||
_ = &mut operator_stop => {
|
||||
status.publish(Done, None).await;
|
||||
return UpgradeAction::Exit;
|
||||
}
|
||||
_ = &mut new_version_healthy => {}
|
||||
_ = tokio::time::sleep(heartbeat_timeout) => {
|
||||
let _ = executor.revert().await;
|
||||
status
|
||||
.publish(Failed, Some("new version did not report healthy in time; reverted".into()))
|
||||
.await;
|
||||
return UpgradeAction::StayedOnCurrent;
|
||||
}
|
||||
}
|
||||
|
||||
// Healthy observed — commit is now the operator's; wait for its stop.
|
||||
operator_stop.await;
|
||||
status.publish(Done, None).await;
|
||||
UpgradeAction::Exit
|
||||
}
|
||||
|
||||
// ───────────────────────── real executor ─────────────────────────
|
||||
|
||||
/// Production [`UpgradeExecutor`]: download+verify, `--self-test`, atomic
|
||||
/// symlink swap, and a systemd transient unit for the new version.
|
||||
pub struct SystemdUpgradeExecutor {
|
||||
/// The version this (old) agent is running — used to preserve its binary at
|
||||
/// a versioned path so `revert` always has a target, even on a pre-M1 plain
|
||||
/// install where `ACTIVE_SYMLINK` is a regular file.
|
||||
current_version: String,
|
||||
/// Previous symlink target, captured at cutover so `revert` can restore it.
|
||||
previous_target: tokio::sync::Mutex<Option<PathBuf>>,
|
||||
/// Transient systemd unit name for the started new version, for `revert`.
|
||||
started_unit: tokio::sync::Mutex<Option<String>>,
|
||||
}
|
||||
|
||||
impl SystemdUpgradeExecutor {
|
||||
pub fn new(current_version: String) -> Self {
|
||||
Self {
|
||||
current_version,
|
||||
previous_target: tokio::sync::Mutex::new(None),
|
||||
started_unit: tokio::sync::Mutex::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
fn transient_unit(version: &str) -> String {
|
||||
format!("fleet-agent-v{version}")
|
||||
}
|
||||
|
||||
/// The revert target: the existing symlink target if `ACTIVE_SYMLINK` is a
|
||||
/// symlink, else the running binary preserved at its versioned path (so a
|
||||
/// plain-file initial install can still be rolled back to).
|
||||
fn capture_revert_target(&self) -> PathBuf {
|
||||
if let Ok(target) = std::fs::read_link(ACTIVE_SYMLINK) {
|
||||
return target;
|
||||
}
|
||||
let preserved =
|
||||
PathBuf::from(VERSIONED_BIN_DIR).join(format!("fleet-agent-v{}", self.current_version));
|
||||
if !preserved.exists() {
|
||||
if let Ok(exe) = std::env::current_exe() {
|
||||
if let Err(e) = std::fs::copy(&exe, &preserved) {
|
||||
tracing::warn!(error = %e, "upgrade: could not preserve current binary");
|
||||
}
|
||||
}
|
||||
}
|
||||
preserved
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeExecutor for SystemdUpgradeExecutor {
|
||||
async fn stage(&self, marker: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError> {
|
||||
use harmony_downloadable_asset::DownloadableAsset;
|
||||
let asset = DownloadableAsset {
|
||||
url: url::Url::parse(&marker.url).map_err(|e| UpgradeError::Url(e.to_string()))?,
|
||||
file_name: format!("fleet-agent-v{}", marker.desired_version),
|
||||
checksum: marker.sha256.clone(),
|
||||
};
|
||||
let path = asset
|
||||
.download_to_path(PathBuf::from(VERSIONED_BIN_DIR))
|
||||
.await?;
|
||||
// Make it executable (0755).
|
||||
let mut perms = std::fs::metadata(&path)?.permissions();
|
||||
use std::os::unix::fs::PermissionsExt;
|
||||
perms.set_mode(0o755);
|
||||
std::fs::set_permissions(&path, perms)?;
|
||||
Ok(path)
|
||||
}
|
||||
|
||||
async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError> {
|
||||
let status = tokio::process::Command::new(binary)
|
||||
.arg("--self-test")
|
||||
.status()
|
||||
.await
|
||||
.map_err(|e| UpgradeError::SelfTest(e.to_string()))?;
|
||||
if status.success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(UpgradeError::SelfTest(format!("exit {status}")))
|
||||
}
|
||||
}
|
||||
|
||||
async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError> {
|
||||
// Record (and, if needed, preserve) the current binary so revert can
|
||||
// restore it.
|
||||
*self.previous_target.lock().await = Some(self.capture_revert_target());
|
||||
|
||||
// Atomic swap: symlink to a temp name, rename over the active path.
|
||||
let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.new"));
|
||||
let _ = std::fs::remove_file(&tmp);
|
||||
std::os::unix::fs::symlink(binary, &tmp)?;
|
||||
std::fs::rename(&tmp, ACTIVE_SYMLINK)?;
|
||||
|
||||
// Start the new version alongside us as a transient systemd unit, so the
|
||||
// operator can observe its heartbeat before committing the stop.
|
||||
let unit = Self::transient_unit(version);
|
||||
let out = tokio::process::Command::new("systemd-run")
|
||||
.args(["--unit", &unit, "--collect"])
|
||||
.arg(binary)
|
||||
.output()
|
||||
.await
|
||||
.map_err(|e| UpgradeError::Systemd(e.to_string()))?;
|
||||
if !out.status.success() {
|
||||
return Err(UpgradeError::Systemd(
|
||||
String::from_utf8_lossy(&out.stderr).to_string(),
|
||||
));
|
||||
}
|
||||
*self.started_unit.lock().await = Some(unit);
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn revert(&self) -> Result<(), UpgradeError> {
|
||||
// Stop the transient unit if we started one.
|
||||
if let Some(unit) = self.started_unit.lock().await.take() {
|
||||
let _ = tokio::process::Command::new("systemctl")
|
||||
.args(["stop", &unit])
|
||||
.status()
|
||||
.await;
|
||||
}
|
||||
// Restore the previous symlink target.
|
||||
let previous = self
|
||||
.previous_target
|
||||
.lock()
|
||||
.await
|
||||
.clone()
|
||||
.ok_or(UpgradeError::NothingToRevert)?;
|
||||
let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.revert"));
|
||||
let _ = std::fs::remove_file(&tmp);
|
||||
std::os::unix::fs::symlink(&previous, &tmp)?;
|
||||
std::fs::rename(&tmp, ACTIVE_SYMLINK)?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
// ───────────────────────── NATS wiring ─────────────────────────
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_nats::jetstream::{self, kv::Operation};
|
||||
use futures_util::StreamExt;
|
||||
use harmony_reconciler_contracts::{
|
||||
AgentUpgradeStatus, BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT,
|
||||
HeartbeatPayload, Id, agent_upgrade_key, device_heartbeat_key,
|
||||
};
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use tokio::sync::Notify;
|
||||
|
||||
/// The operator's cutover stop signal, shared between the command server (which
|
||||
/// receives `Verb::UpgradeStop`) and the upgrade manager (which awaits it).
|
||||
///
|
||||
/// Both old and new agents subscribe to the device command subject, so both
|
||||
/// *receive* the stop — but it's only **armed** on the old agent during its
|
||||
/// post-cutover wait. Outside that window `fire` is a no-op, so a stray stop
|
||||
/// (or one meant for a previous upgrade) can't make an agent exit, and no stale
|
||||
/// permit accumulates to trip the next upgrade.
|
||||
pub struct UpgradeStopSignal {
|
||||
notify: Notify,
|
||||
armed: AtomicBool,
|
||||
}
|
||||
|
||||
impl Default for UpgradeStopSignal {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
notify: Notify::new(),
|
||||
armed: AtomicBool::new(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl UpgradeStopSignal {
|
||||
/// Fire the signal if armed. Returns whether it was (for logging).
|
||||
pub fn fire(&self) -> bool {
|
||||
if self.armed.load(Ordering::SeqCst) {
|
||||
self.notify.notify_one();
|
||||
true
|
||||
} else {
|
||||
false
|
||||
}
|
||||
}
|
||||
async fn notified(&self) {
|
||||
self.notify.notified().await;
|
||||
}
|
||||
fn arm(&self) {
|
||||
self.armed.store(true, Ordering::SeqCst);
|
||||
}
|
||||
fn disarm(&self) {
|
||||
self.armed.store(false, Ordering::SeqCst);
|
||||
}
|
||||
}
|
||||
|
||||
/// Writes [`AgentUpgradeStatus`] to the status KV so the operator can reflect
|
||||
/// the live phase onto the `Device` CR.
|
||||
struct KvStatusSink {
|
||||
bucket: jetstream::kv::Store,
|
||||
device_id: Id,
|
||||
current_version: String,
|
||||
target: String,
|
||||
}
|
||||
|
||||
impl StatusSink for KvStatusSink {
|
||||
async fn publish(&self, phase: AgentUpgradePhase, last_error: Option<String>) {
|
||||
let status = AgentUpgradeStatus {
|
||||
device_id: self.device_id.clone(),
|
||||
phase,
|
||||
current_version: self.current_version.clone(),
|
||||
target_version: Some(self.target.clone()),
|
||||
last_error,
|
||||
at: chrono::Utc::now(),
|
||||
};
|
||||
let key = agent_upgrade_key(&self.device_id.to_string());
|
||||
match serde_json::to_vec(&status) {
|
||||
Ok(payload) => {
|
||||
if let Err(e) = self.bucket.put(&key, payload.into()).await {
|
||||
tracing::warn!(error = %e, "upgrade: status publish failed");
|
||||
}
|
||||
}
|
||||
Err(e) => tracing::warn!(error = %e, "upgrade: status serialize failed"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Watch the device's upgrade marker; run the state machine when a marker asks
|
||||
/// for a different version. Returns `Ok(())` only when an upgrade succeeds and
|
||||
/// the operator signals stop — the caller (main) treats that as "exit so the
|
||||
/// new version takes over". Failures/reverts keep watching for a fixed marker.
|
||||
pub async fn run(
|
||||
client: async_nats::Client,
|
||||
device_id: Id,
|
||||
current_version: String,
|
||||
operator_stop: Arc<UpgradeStopSignal>,
|
||||
) -> anyhow::Result<()> {
|
||||
let js = jetstream::new(client);
|
||||
let marker_bucket = js
|
||||
.create_key_value(jetstream::kv::Config {
|
||||
bucket: BUCKET_AGENT_UPGRADE.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let status_bucket = js
|
||||
.create_key_value(jetstream::kv::Config {
|
||||
bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let heartbeat_bucket = js
|
||||
.create_key_value(jetstream::kv::Config {
|
||||
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let my_key = agent_upgrade_key(&device_id.to_string());
|
||||
tracing::info!(version = %current_version, key = %my_key, "upgrade: watching marker");
|
||||
// Outer loop re-establishes the watch after a NATS blip — only an
|
||||
// operator-committed cutover (`UpgradeAction::Exit`) returns from here.
|
||||
loop {
|
||||
let mut watch = marker_bucket.watch_with_history(&my_key).await?;
|
||||
while let Some(entry) = watch.next().await {
|
||||
let entry = match entry {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "upgrade: marker watch error");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if entry.operation != Operation::Put {
|
||||
continue;
|
||||
}
|
||||
let marker: AgentUpgradeMarker = match serde_json::from_slice(&entry.value) {
|
||||
Ok(m) => m,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "upgrade: bad marker payload");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
if marker.desired_version == current_version {
|
||||
tracing::debug!(version = %current_version, "upgrade: already at desired version");
|
||||
continue;
|
||||
}
|
||||
|
||||
tracing::info!(
|
||||
from = %current_version,
|
||||
to = %marker.desired_version,
|
||||
"upgrade: marker requests new version"
|
||||
);
|
||||
let executor = SystemdUpgradeExecutor::new(current_version.clone());
|
||||
let sink = KvStatusSink {
|
||||
bucket: status_bucket.clone(),
|
||||
device_id: device_id.clone(),
|
||||
current_version: current_version.clone(),
|
||||
target: marker.desired_version.clone(),
|
||||
};
|
||||
let healthy =
|
||||
wait_new_version_healthy(&heartbeat_bucket, &device_id, &marker.desired_version);
|
||||
// Arm the stop signal for the duration of this attempt; the command
|
||||
// server only forwards an operator stop while armed.
|
||||
operator_stop.arm();
|
||||
let stop = {
|
||||
let signal = operator_stop.clone();
|
||||
async move { signal.notified().await }
|
||||
};
|
||||
let action = drive(&marker, &executor, &sink, healthy, stop, HEARTBEAT_TIMEOUT).await;
|
||||
operator_stop.disarm();
|
||||
match action {
|
||||
UpgradeAction::Exit => {
|
||||
tracing::info!(
|
||||
"upgrade: cutover committed by operator; exiting for new version"
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
UpgradeAction::StayedOnCurrent => {
|
||||
tracing::warn!("upgrade: stayed on current version; awaiting a new marker");
|
||||
}
|
||||
}
|
||||
}
|
||||
tracing::warn!("upgrade: marker watch ended; re-establishing");
|
||||
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Resolve once a heartbeat from `target` version is observed on this device's
|
||||
/// heartbeat key (old and new both write it; we wait for the new one).
|
||||
async fn wait_new_version_healthy(bucket: &jetstream::kv::Store, device_id: &Id, target: &str) {
|
||||
let key = device_heartbeat_key(&device_id.to_string());
|
||||
let mut watch = match bucket.watch_with_history(&key).await {
|
||||
Ok(w) => w,
|
||||
// If we can't watch, never report healthy — the drive's timeout reverts.
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "upgrade: heartbeat watch failed");
|
||||
return std::future::pending::<()>().await;
|
||||
}
|
||||
};
|
||||
while let Some(entry) = watch.next().await {
|
||||
if let Ok(e) = entry {
|
||||
if e.operation == Operation::Put {
|
||||
if let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&e.value) {
|
||||
if hb.agent_version == target {
|
||||
return;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use std::sync::Mutex;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
|
||||
/// Records the phase sequence and lets a test fail any step.
|
||||
#[derive(Default)]
|
||||
struct FakeExecutor {
|
||||
fail_stage: bool,
|
||||
fail_self_test: bool,
|
||||
fail_cutover: bool,
|
||||
reverted: AtomicUsize,
|
||||
cutover_calls: AtomicUsize,
|
||||
}
|
||||
|
||||
impl UpgradeExecutor for FakeExecutor {
|
||||
async fn stage(&self, _m: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError> {
|
||||
if self.fail_stage {
|
||||
return Err(UpgradeError::Url("bad".into()));
|
||||
}
|
||||
Ok(PathBuf::from("/usr/bin/fleet-agent-v9.9.9"))
|
||||
}
|
||||
async fn self_test(&self, _b: &Path) -> Result<(), UpgradeError> {
|
||||
if self.fail_self_test {
|
||||
return Err(UpgradeError::SelfTest("rc 1".into()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn cutover(&self, _b: &Path, _v: &str) -> Result<(), UpgradeError> {
|
||||
self.cutover_calls.fetch_add(1, Ordering::SeqCst);
|
||||
if self.fail_cutover {
|
||||
return Err(UpgradeError::Systemd("nope".into()));
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
async fn revert(&self) -> Result<(), UpgradeError> {
|
||||
self.reverted.fetch_add(1, Ordering::SeqCst);
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct RecordingSink(Mutex<Vec<AgentUpgradePhase>>);
|
||||
impl StatusSink for RecordingSink {
|
||||
async fn publish(&self, phase: AgentUpgradePhase, _e: Option<String>) {
|
||||
self.0.lock().unwrap().push(phase);
|
||||
}
|
||||
}
|
||||
impl RecordingSink {
|
||||
fn phases(&self) -> Vec<AgentUpgradePhase> {
|
||||
self.0.lock().unwrap().clone()
|
||||
}
|
||||
}
|
||||
|
||||
fn marker() -> AgentUpgradeMarker {
|
||||
AgentUpgradeMarker {
|
||||
desired_version: "9.9.9".into(),
|
||||
url: "https://example/agent".into(),
|
||||
sha256: "deadbeef".into(),
|
||||
}
|
||||
}
|
||||
|
||||
use std::future::{Future, pending, ready};
|
||||
|
||||
fn never() -> impl Future<Output = ()> {
|
||||
pending::<()>()
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn happy_path_cutover_then_stop_exits() {
|
||||
let exec = FakeExecutor::default();
|
||||
let sink = RecordingSink::default();
|
||||
// Operator stop fires immediately (it saw the new heartbeat).
|
||||
let action = drive(
|
||||
&marker(),
|
||||
&exec,
|
||||
&sink,
|
||||
never(),
|
||||
ready(()),
|
||||
HEARTBEAT_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(action, UpgradeAction::Exit);
|
||||
assert_eq!(exec.reverted.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(sink.phases(), vec![Staging, Verifying, CutoverReady, Done]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn healthy_then_stop_exits_without_revert() {
|
||||
let exec = FakeExecutor::default();
|
||||
let sink = RecordingSink::default();
|
||||
// Healthy observed first; then (still) operator stop → exit, no revert.
|
||||
let action = drive(
|
||||
&marker(),
|
||||
&exec,
|
||||
&sink,
|
||||
ready(()),
|
||||
ready(()),
|
||||
HEARTBEAT_TIMEOUT,
|
||||
)
|
||||
.await;
|
||||
assert_eq!(action, UpgradeAction::Exit);
|
||||
assert_eq!(exec.reverted.load(Ordering::SeqCst), 0);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn stage_failure_stays_and_does_not_cutover() {
|
||||
let exec = FakeExecutor {
|
||||
fail_stage: true,
|
||||
..Default::default()
|
||||
};
|
||||
let sink = RecordingSink::default();
|
||||
let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await;
|
||||
assert_eq!(action, UpgradeAction::StayedOnCurrent);
|
||||
assert_eq!(exec.cutover_calls.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(sink.phases(), vec![Staging, Failed]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn self_test_failure_stays_and_does_not_cutover() {
|
||||
let exec = FakeExecutor {
|
||||
fail_self_test: true,
|
||||
..Default::default()
|
||||
};
|
||||
let sink = RecordingSink::default();
|
||||
let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await;
|
||||
assert_eq!(action, UpgradeAction::StayedOnCurrent);
|
||||
assert_eq!(exec.cutover_calls.load(Ordering::SeqCst), 0);
|
||||
assert_eq!(sink.phases(), vec![Staging, Verifying, Failed]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn cutover_failure_reverts_and_stays() {
|
||||
let exec = FakeExecutor {
|
||||
fail_cutover: true,
|
||||
..Default::default()
|
||||
};
|
||||
let sink = RecordingSink::default();
|
||||
let action = drive(&marker(), &exec, &sink, never(), never(), HEARTBEAT_TIMEOUT).await;
|
||||
assert_eq!(action, UpgradeAction::StayedOnCurrent);
|
||||
assert_eq!(exec.reverted.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(
|
||||
sink.phases(),
|
||||
vec![Staging, Verifying, CutoverReady, Failed]
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test(start_paused = true)]
|
||||
async fn heartbeat_timeout_reverts_and_stays() {
|
||||
let exec = FakeExecutor::default();
|
||||
let sink = RecordingSink::default();
|
||||
// Never healthy, never stopped → the timeout fires → revert.
|
||||
let action = drive(
|
||||
&marker(),
|
||||
&exec,
|
||||
&sink,
|
||||
never(),
|
||||
never(),
|
||||
Duration::from_secs(60),
|
||||
)
|
||||
.await;
|
||||
assert_eq!(action, UpgradeAction::StayedOnCurrent);
|
||||
assert_eq!(exec.reverted.load(Ordering::SeqCst), 1);
|
||||
assert_eq!(
|
||||
sink.phases(),
|
||||
vec![Staging, Verifying, CutoverReady, Failed]
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -102,6 +102,24 @@ impl FleetCommandsClient {
|
||||
};
|
||||
Ok(serde_json::from_slice(&resp.payload)?)
|
||||
}
|
||||
|
||||
/// Send `Verb::UpgradeStop` at upgrade cutover (ADR-022). The operator is the
|
||||
/// single source of truth for the stop — it calls this only after observing
|
||||
/// the new version's heartbeat. The old agent acks, then exits; the new
|
||||
/// (already running) version takes over.
|
||||
pub async fn upgrade_stop(&self, device_id: &str, reason: &str) -> Result<(), CommandError> {
|
||||
let subject = device_command_subject(device_id, Verb::UpgradeStop);
|
||||
let body = serde_json::to_vec(&CommandRequest::UpgradeStop {
|
||||
reason: reason.to_string(),
|
||||
})
|
||||
.expect("CommandRequest serializes");
|
||||
let fut = self.nc.request(subject, body.into());
|
||||
match tokio::time::timeout(self.timeout, fut).await {
|
||||
Ok(Ok(_)) => Ok(()),
|
||||
Ok(Err(err)) => Err(map_request_error(err, self.timeout)),
|
||||
Err(_) => Err(CommandError::Timeout(self.timeout)),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Map an async-nats `RequestError` to our typed surface. The `kind`
|
||||
|
||||
@@ -17,3 +17,4 @@ pub mod device_reconciler;
|
||||
pub mod device_status;
|
||||
pub mod fleet_aggregator;
|
||||
pub mod liveness;
|
||||
pub mod upgrade_coordinator;
|
||||
|
||||
@@ -5,7 +5,9 @@ mod frontend;
|
||||
#[cfg(feature = "web-frontend")]
|
||||
mod service;
|
||||
|
||||
use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator};
|
||||
use harmony_fleet_operator::{
|
||||
device_reconciler, device_status, fleet_aggregator, upgrade_coordinator,
|
||||
};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_nats::jetstream;
|
||||
@@ -258,10 +260,15 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
||||
let dr_js = js.clone();
|
||||
let ds_client = client.clone();
|
||||
let ds_js = js.clone();
|
||||
// upgrade_coordinator — agent-upgrade status/heartbeat → cutover stop + CR
|
||||
let uc_client = client.clone();
|
||||
let uc_js = js.clone();
|
||||
let uc_commands = harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone());
|
||||
tokio::select! {
|
||||
r = controller::run(ctl_client, desired_state_kv) => r,
|
||||
r = device_reconciler::run(dr_client, dr_js) => r,
|
||||
r = device_status::run(ds_client, ds_js) => r,
|
||||
r = upgrade_coordinator::run(uc_client, uc_js, uc_commands) => r,
|
||||
r = fleet_aggregator::run(client, js, liveness) => r,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -592,6 +592,8 @@ mod tests {
|
||||
DeviceLiveness {
|
||||
last_heartbeat: None,
|
||||
reachability: r,
|
||||
current_version: None,
|
||||
upgrade: None,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
238
fleet/harmony-fleet-operator/src/upgrade_coordinator.rs
Normal file
238
fleet/harmony-fleet-operator/src/upgrade_coordinator.rs
Normal file
@@ -0,0 +1,238 @@
|
||||
//! Operator-side agent-upgrade coordinator (ADR-022).
|
||||
//!
|
||||
//! The operator is the single source of truth for the cutover stop. This loop:
|
||||
//! 1. watches the agent **upgrade-status** KV (phase per device) and the
|
||||
//! **heartbeat** KV (running version per device);
|
||||
//! 2. when a device is `CutoverReady` **and** the operator has independently
|
||||
//! observed a heartbeat from the new version, sends `Verb::UpgradeStop` to
|
||||
//! the old agent — never before;
|
||||
//! 3. reflects the current version + upgrade phase onto the `Device` CR.
|
||||
//!
|
||||
//! The agent never self-stops; only this operator-observed handoff advances the
|
||||
//! upgrade. See [`should_send_stop`] for the load-bearing decision.
|
||||
|
||||
use std::collections::{HashMap, HashSet};
|
||||
|
||||
use anyhow::Result;
|
||||
use async_nats::jetstream::kv::Operation;
|
||||
use futures_util::StreamExt;
|
||||
use harmony_reconciler_contracts::{
|
||||
AgentUpgradePhase, AgentUpgradeStatus, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT,
|
||||
HeartbeatPayload,
|
||||
};
|
||||
use kube::Client;
|
||||
use kube::api::{Api, Patch, PatchParams};
|
||||
use serde_json::json;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use harmony::modules::fleet::operator::{Device, DeviceUpgradeStatus};
|
||||
|
||||
use crate::commands::FleetCommandsClient;
|
||||
|
||||
/// The handoff decision: commit the stop only when the agent is `CutoverReady`
|
||||
/// **and** the operator has seen a heartbeat from the target version. Pure so
|
||||
/// it can be unit-tested without NATS.
|
||||
pub fn should_send_stop(status: &AgentUpgradeStatus, observed_version: Option<&str>) -> bool {
|
||||
if status.phase != AgentUpgradePhase::CutoverReady {
|
||||
return false;
|
||||
}
|
||||
match (&status.target_version, observed_version) {
|
||||
(Some(target), Some(seen)) => seen == target,
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Coordinator {
|
||||
/// Latest upgrade status per device.
|
||||
statuses: HashMap<String, AgentUpgradeStatus>,
|
||||
/// Latest heartbeat version per device.
|
||||
heartbeat_versions: HashMap<String, String>,
|
||||
/// Devices we've already sent the stop to (don't resend every tick).
|
||||
stopped: HashSet<String>,
|
||||
}
|
||||
|
||||
pub async fn run(
|
||||
kube: Client,
|
||||
js: async_nats::jetstream::Context,
|
||||
commands: FleetCommandsClient,
|
||||
) -> Result<()> {
|
||||
let status_bucket = js
|
||||
.create_key_value(async_nats::jetstream::kv::Config {
|
||||
bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
let heartbeat_bucket = js
|
||||
.create_key_value(async_nats::jetstream::kv::Config {
|
||||
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let state = Mutex::new(Coordinator::default());
|
||||
let devices: Api<Device> = Api::all(kube);
|
||||
|
||||
let status_watch = async {
|
||||
let mut w = status_bucket.watch_with_history(">").await?;
|
||||
while let Some(entry) = w.next().await {
|
||||
let Ok(entry) = entry else { continue };
|
||||
if entry.operation != Operation::Put {
|
||||
continue;
|
||||
}
|
||||
let Ok(status) = serde_json::from_slice::<AgentUpgradeStatus>(&entry.value) else {
|
||||
continue;
|
||||
};
|
||||
let device = status.device_id.to_string();
|
||||
{
|
||||
let mut g = state.lock().await;
|
||||
// A fresh non-terminal phase means a new attempt — allow a stop again.
|
||||
if status.phase != AgentUpgradePhase::CutoverReady {
|
||||
g.stopped.remove(&device);
|
||||
}
|
||||
g.statuses.insert(device.clone(), status);
|
||||
}
|
||||
reconcile_device(&state, &devices, &commands, &device).await;
|
||||
}
|
||||
Ok::<(), anyhow::Error>(())
|
||||
};
|
||||
|
||||
let heartbeat_watch = async {
|
||||
let mut w = heartbeat_bucket.watch_with_history(">").await?;
|
||||
while let Some(entry) = w.next().await {
|
||||
let Ok(entry) = entry else { continue };
|
||||
if entry.operation != Operation::Put {
|
||||
continue;
|
||||
}
|
||||
let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&entry.value) else {
|
||||
continue;
|
||||
};
|
||||
if hb.agent_version.is_empty() {
|
||||
continue;
|
||||
}
|
||||
let device = hb.device_id.to_string();
|
||||
state
|
||||
.lock()
|
||||
.await
|
||||
.heartbeat_versions
|
||||
.insert(device.clone(), hb.agent_version);
|
||||
reconcile_device(&state, &devices, &commands, &device).await;
|
||||
}
|
||||
Ok::<(), anyhow::Error>(())
|
||||
};
|
||||
|
||||
tokio::try_join!(status_watch, heartbeat_watch)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// For one device: decide whether to commit the stop, then reflect version +
|
||||
/// upgrade phase onto its CR.
|
||||
async fn reconcile_device(
|
||||
state: &Mutex<Coordinator>,
|
||||
devices: &Api<Device>,
|
||||
commands: &FleetCommandsClient,
|
||||
device: &str,
|
||||
) {
|
||||
// Snapshot the decision inputs under the lock.
|
||||
let (send_stop, status, version) = {
|
||||
let g = state.lock().await;
|
||||
let status = g.statuses.get(device).cloned();
|
||||
let version = g.heartbeat_versions.get(device).cloned();
|
||||
let send_stop = status
|
||||
.as_ref()
|
||||
.map(|s| should_send_stop(s, version.as_deref()) && !g.stopped.contains(device))
|
||||
.unwrap_or(false);
|
||||
(send_stop, status, version)
|
||||
};
|
||||
|
||||
if send_stop {
|
||||
// Mark before sending so a concurrent reconcile doesn't double-send.
|
||||
state.lock().await.stopped.insert(device.to_string());
|
||||
tracing::info!(device, "upgrade: new version healthy; sending cutover stop");
|
||||
if let Err(e) = commands
|
||||
.upgrade_stop(device, "upgrade cutover committed")
|
||||
.await
|
||||
{
|
||||
tracing::warn!(device, error = %e, "upgrade: stop send failed; will retry on next event");
|
||||
state.lock().await.stopped.remove(device);
|
||||
}
|
||||
}
|
||||
|
||||
patch_device(devices, device, status.as_ref(), version.as_deref()).await;
|
||||
}
|
||||
|
||||
async fn patch_device(
|
||||
devices: &Api<Device>,
|
||||
device: &str,
|
||||
status: Option<&AgentUpgradeStatus>,
|
||||
version: Option<&str>,
|
||||
) {
|
||||
// Surface the upgrade only while it's in flight (drop it on Done/Running).
|
||||
let upgrade = status.and_then(|s| match s.phase {
|
||||
AgentUpgradePhase::Done | AgentUpgradePhase::Running => None,
|
||||
phase => Some(DeviceUpgradeStatus {
|
||||
phase,
|
||||
target_version: s.target_version.clone(),
|
||||
last_error: s.last_error.clone(),
|
||||
}),
|
||||
});
|
||||
let patch = json!({
|
||||
"status": {
|
||||
"currentVersion": version,
|
||||
"upgrade": upgrade,
|
||||
}
|
||||
});
|
||||
if let Err(e) = devices
|
||||
.patch_status(device, &PatchParams::default(), &Patch::Merge(&patch))
|
||||
.await
|
||||
{
|
||||
// A heartbeat/status can outrace the Device CR's creation; retry later.
|
||||
if !matches!(&e, kube::Error::Api(ae) if ae.code == 404) {
|
||||
tracing::warn!(device, error = %e, "upgrade: device status patch failed");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use chrono::Utc;
|
||||
use harmony_reconciler_contracts::Id;
|
||||
|
||||
fn status(phase: AgentUpgradePhase, target: Option<&str>) -> AgentUpgradeStatus {
|
||||
AgentUpgradeStatus {
|
||||
device_id: Id::from("dev-1"),
|
||||
phase,
|
||||
current_version: "0.1.0".into(),
|
||||
target_version: target.map(String::from),
|
||||
last_error: None,
|
||||
at: Utc::now(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn commits_stop_only_when_cutover_ready_and_new_version_seen() {
|
||||
let s = status(AgentUpgradePhase::CutoverReady, Some("0.2.0"));
|
||||
assert!(should_send_stop(&s, Some("0.2.0")));
|
||||
// New version not yet observed (still old) → hold.
|
||||
assert!(!should_send_stop(&s, Some("0.1.0")));
|
||||
// No heartbeat seen → hold.
|
||||
assert!(!should_send_stop(&s, None));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn never_commits_before_cutover_ready() {
|
||||
for phase in [
|
||||
AgentUpgradePhase::Staging,
|
||||
AgentUpgradePhase::Verifying,
|
||||
AgentUpgradePhase::Failed,
|
||||
AgentUpgradePhase::Running,
|
||||
] {
|
||||
let s = status(phase, Some("0.2.0"));
|
||||
assert!(
|
||||
!should_send_stop(&s, Some("0.2.0")),
|
||||
"{phase:?} must not commit"
|
||||
);
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -45,6 +45,12 @@ pub enum Verb {
|
||||
/// Run a shell command on the device and return its captured
|
||||
/// output. Single-shot (no streaming); the agent runs `sh -c`.
|
||||
Exec,
|
||||
/// Operator → old agent: gracefully exit during an upgrade cutover
|
||||
/// (ADR-022). The operator is the single source of truth for the stop —
|
||||
/// the agent never self-stops. Reusing the command protocol means no new
|
||||
/// subject or callout-permission change.
|
||||
#[serde(rename = "upgrade-stop")]
|
||||
UpgradeStop,
|
||||
}
|
||||
|
||||
impl Verb {
|
||||
@@ -55,6 +61,7 @@ impl Verb {
|
||||
match self {
|
||||
Verb::Ping => "ping",
|
||||
Verb::Exec => "exec",
|
||||
Verb::UpgradeStop => "upgrade-stop",
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -77,7 +84,14 @@ pub fn device_command_subscription(device_id: &str) -> String {
|
||||
#[serde(tag = "verb", rename_all = "lowercase")]
|
||||
pub enum CommandRequest {
|
||||
Ping,
|
||||
Exec { command: String },
|
||||
Exec {
|
||||
command: String,
|
||||
},
|
||||
/// Stop the old agent at upgrade cutover. `reason` is logged.
|
||||
#[serde(rename = "upgrade-stop")]
|
||||
UpgradeStop {
|
||||
reason: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// JSON body of a `Verb::Ping` reply.
|
||||
|
||||
@@ -139,6 +139,11 @@ pub struct DeploymentState {
|
||||
pub struct HeartbeatPayload {
|
||||
pub device_id: Id,
|
||||
pub at: DateTime<Utc>,
|
||||
/// Agent crate version the device is running (ADR-022). `#[serde(default)]`
|
||||
/// so a pre-upgrade agent's heartbeat (no field) decodes as `""` =
|
||||
/// "unknown" rather than failing — operators in the field must tolerate it.
|
||||
#[serde(default)]
|
||||
pub agent_version: String,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
@@ -239,10 +244,11 @@ mod tests {
|
||||
let hb = HeartbeatPayload {
|
||||
device_id: Id::from("pi-01".to_string()),
|
||||
at: ts("2026-04-22T10:00:30Z"),
|
||||
agent_version: "0.1.0".to_string(),
|
||||
};
|
||||
let bytes = serde_json::to_vec(&hb).unwrap();
|
||||
assert!(
|
||||
bytes.len() < 96,
|
||||
bytes.len() < 128,
|
||||
"heartbeat payload grew to {} bytes: {}",
|
||||
bytes.len(),
|
||||
String::from_utf8_lossy(&bytes),
|
||||
|
||||
@@ -32,6 +32,20 @@ pub const BUCKET_DEVICE_STATE: &str = "device-state";
|
||||
/// the state bucket. Key format: `heartbeat.<device_id>`.
|
||||
pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat";
|
||||
|
||||
/// Operator → agent upgrade marker (ADR-022): desired agent version + signed
|
||||
/// binary URL + SHA-256, one per device. Key format: `agent-upgrade.<device_id>`.
|
||||
pub const BUCKET_AGENT_UPGRADE: &str = "agent-upgrade";
|
||||
|
||||
/// Agent → operator upgrade status: the live phase of the upgrade state
|
||||
/// machine. Key format: `agent-upgrade.<device_id>`.
|
||||
pub const BUCKET_AGENT_UPGRADE_STATUS: &str = "agent-upgrade-status";
|
||||
|
||||
/// KV key for a device's upgrade marker / status. Format:
|
||||
/// `agent-upgrade.<device_id>` (shared shape across both upgrade buckets).
|
||||
pub fn agent_upgrade_key(device_id: &str) -> String {
|
||||
format!("agent-upgrade.{device_id}")
|
||||
}
|
||||
|
||||
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
|
||||
/// Format: `<device>.<deployment>`.
|
||||
pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String {
|
||||
|
||||
@@ -20,6 +20,7 @@ pub mod commands;
|
||||
pub mod fleet;
|
||||
pub mod kv;
|
||||
pub mod status;
|
||||
pub mod upgrade;
|
||||
|
||||
pub use commands::{
|
||||
CommandRequest, ErrorKind, ErrorReply, ExecReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB,
|
||||
@@ -30,11 +31,13 @@ pub use fleet::{
|
||||
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,
|
||||
};
|
||||
pub use kv::{
|
||||
BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE,
|
||||
BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DESIRED_STATE,
|
||||
BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, agent_upgrade_key,
|
||||
desired_state_key, desired_state_watch_filter, device_heartbeat_key, device_info_key,
|
||||
device_state_key,
|
||||
};
|
||||
pub use status::{InventorySnapshot, Phase};
|
||||
pub use upgrade::{AgentUpgradeMarker, AgentUpgradePhase, AgentUpgradeStatus};
|
||||
|
||||
// Re-exports so consumers (agent, operator) don't need a direct
|
||||
// harmony_types dependency purely to name the cross-boundary types.
|
||||
|
||||
98
harmony-reconciler-contracts/src/upgrade.rs
Normal file
98
harmony-reconciler-contracts/src/upgrade.rs
Normal file
@@ -0,0 +1,98 @@
|
||||
//! Agent self-upgrade wire types (ADR-022).
|
||||
//!
|
||||
//! The operator publishes an [`AgentUpgradeMarker`] per device (desired version
|
||||
//! + signed binary URL + SHA-256). The agent runs the upgrade state machine and
|
||||
//! publishes [`AgentUpgradeStatus`] back so the operator can reflect progress
|
||||
//! onto the `Device` CR and decide when to send the stop signal. Both ride NATS
|
||||
//! KV (not a fire-and-forget subject) so they survive an operator restart.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::Id;
|
||||
|
||||
/// Operator → agent: the version this device should run, and where to fetch it.
|
||||
/// Written to KV key `agent-upgrade.<device_id>` in
|
||||
/// [`crate::BUCKET_AGENT_UPGRADE`]. The agent acts only when
|
||||
/// `desired_version` differs from what it's running; **any** change re-triggers
|
||||
/// the state machine (so re-publishing the same version after a fix retries).
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct AgentUpgradeMarker {
|
||||
/// Target crate version, `MAJOR.MINOR.PATCH` (no `v` prefix, no build
|
||||
/// metadata — the on-disk path is `/usr/bin/fleet-agent-v<version>`).
|
||||
pub desired_version: String,
|
||||
/// HTTPS URL of the versioned binary (Gitea/Harbor release asset, CDN, …).
|
||||
pub url: String,
|
||||
/// Lowercase hex SHA-256 of the binary. Verified before the binary is
|
||||
/// staged; a mismatch fails the upgrade and the agent stays on its version.
|
||||
pub sha256: String,
|
||||
}
|
||||
|
||||
/// Phases of the agent upgrade state machine (ADR-022). `Running` is steady
|
||||
/// state; the rest are the upgrade in flight. `Failed` and `Done` are terminal
|
||||
/// for a given marker.
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum AgentUpgradePhase {
|
||||
/// Normal reconcile loop; no upgrade in flight.
|
||||
Running,
|
||||
/// Refusing to start new services; in-flight reconciles drain.
|
||||
Draining,
|
||||
/// Fetching + verifying the new binary onto disk.
|
||||
Staging,
|
||||
/// Running the staged binary's `--self-test` before any swap.
|
||||
Verifying,
|
||||
/// Symlink swapped, new version started alongside; awaiting the operator's
|
||||
/// stop signal (the operator only sends it after seeing the new version's
|
||||
/// heartbeat — the agent never self-stops).
|
||||
CutoverReady,
|
||||
/// The upgrade did not complete; `last_error` says why. The agent stayed on
|
||||
/// (or reverted to) its previous version.
|
||||
Failed,
|
||||
/// The new version is the active one; this marker is satisfied.
|
||||
Done,
|
||||
}
|
||||
|
||||
/// Agent → operator: current upgrade phase + the versions involved. Written to
|
||||
/// KV key `agent-upgrade.<device_id>` in [`crate::BUCKET_AGENT_UPGRADE_STATUS`].
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct AgentUpgradeStatus {
|
||||
pub device_id: Id,
|
||||
pub phase: AgentUpgradePhase,
|
||||
/// Version the agent is currently running.
|
||||
pub current_version: String,
|
||||
/// Version the agent is upgrading toward (the marker's `desired_version`).
|
||||
/// `None` when phase is `Running` with no marker.
|
||||
#[serde(default)]
|
||||
pub target_version: Option<String>,
|
||||
#[serde(default)]
|
||||
pub last_error: Option<String>,
|
||||
pub at: DateTime<Utc>,
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn phase_wire_form_is_stable() {
|
||||
// The serialized form lands in NATS KV + the Device CR; the operator
|
||||
// and dashboard key off it. Pin it.
|
||||
let s = |p: AgentUpgradePhase| serde_json::to_value(p).unwrap();
|
||||
assert_eq!(s(AgentUpgradePhase::CutoverReady), "cutover-ready");
|
||||
assert_eq!(s(AgentUpgradePhase::Running), "running");
|
||||
assert_eq!(s(AgentUpgradePhase::Failed), "failed");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn marker_roundtrips() {
|
||||
let m = AgentUpgradeMarker {
|
||||
desired_version: "0.2.0".into(),
|
||||
url: "https://git.nationtech.io/fleet/agent/v0.2.0".into(),
|
||||
sha256: "abc123".into(),
|
||||
};
|
||||
let j = serde_json::to_string(&m).unwrap();
|
||||
assert_eq!(serde_json::from_str::<AgentUpgradeMarker>(&j).unwrap(), m);
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,4 @@
|
||||
use harmony_reconciler_contracts::InventorySnapshot;
|
||||
use harmony_reconciler_contracts::{AgentUpgradePhase, InventorySnapshot};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use kube::CustomResource;
|
||||
use schemars::JsonSchema;
|
||||
@@ -118,6 +118,28 @@ pub struct DeviceStatus {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub last_heartbeat: Option<String>,
|
||||
pub reachability: Reachability,
|
||||
/// Agent version the device is currently running, from its heartbeat
|
||||
/// (ADR-022). `None` for a pre-upgrade agent that doesn't report it.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub current_version: Option<String>,
|
||||
/// In-flight agent upgrade, reflected from the agent's upgrade status.
|
||||
/// `None` when no upgrade is active.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub upgrade: Option<DeviceUpgradeStatus>,
|
||||
}
|
||||
|
||||
/// Reflection of the agent's upgrade state machine onto the CR, so
|
||||
/// `kubectl get device -o yaml` and the dashboard see upgrade progress.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DeviceUpgradeStatus {
|
||||
/// Live phase of the agent's upgrade state machine. The wire enum itself —
|
||||
/// not a stringified copy — so the set of values can't drift.
|
||||
pub phase: AgentUpgradePhase,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub target_version: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub last_error: Option<String>,
|
||||
}
|
||||
|
||||
/// Coarse liveness derived from heartbeat freshness. Failing/Pending
|
||||
|
||||
@@ -13,5 +13,5 @@ pub mod crd;
|
||||
|
||||
pub use crd::{
|
||||
AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device,
|
||||
DeviceSpec, DeviceStatus, Reachability, Rollout, RolloutStrategy,
|
||||
DeviceSpec, DeviceStatus, DeviceUpgradeStatus, Reachability, Rollout, RolloutStrategy,
|
||||
};
|
||||
|
||||
20
harmony_downloadable_asset/Cargo.toml
Normal file
20
harmony_downloadable_asset/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "harmony_downloadable_asset"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
description = "Download a file from a URL with SHA-256 checksum verification (shared by k3d binary install and the fleet agent self-upgrade)"
|
||||
|
||||
[dependencies]
|
||||
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
|
||||
sha2 = "0.10"
|
||||
tokio = { workspace = true, features = ["fs", "io-util"] }
|
||||
futures-util = "0.3"
|
||||
url.workspace = true
|
||||
thiserror.workspace = true
|
||||
tracing.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||
httptest = "0.16"
|
||||
213
harmony_downloadable_asset/src/lib.rs
Normal file
213
harmony_downloadable_asset/src/lib.rs
Normal file
@@ -0,0 +1,213 @@
|
||||
//! Download a file from a URL and verify its SHA-256 before trusting it.
|
||||
//!
|
||||
//! Shared by k3d's binary install and the fleet agent's self-upgrade
|
||||
//! (ADR-022): both fetch a versioned binary from a release host and must refuse
|
||||
//! anything whose hash doesn't match the pin. Re-downloads are skipped when the
|
||||
//! target already exists with the right checksum.
|
||||
|
||||
use std::io::Read;
|
||||
use std::path::{Path, PathBuf};
|
||||
|
||||
use futures_util::StreamExt;
|
||||
use sha2::{Digest, Sha256};
|
||||
use tokio::fs;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, thiserror::Error)]
|
||||
pub enum DownloadError {
|
||||
#[error("failed to create download directory {0}: {1}")]
|
||||
CreateDir(PathBuf, #[source] std::io::Error),
|
||||
#[error("request to {0} failed: {1}")]
|
||||
Request(Url, #[source] reqwest::Error),
|
||||
#[error("download of {url} failed with status {status}")]
|
||||
Status {
|
||||
url: Url,
|
||||
status: reqwest::StatusCode,
|
||||
},
|
||||
#[error("writing {0} failed: {1}")]
|
||||
Write(PathBuf, #[source] std::io::Error),
|
||||
#[error("error streaming download body: {0}")]
|
||||
Stream(#[source] reqwest::Error),
|
||||
#[error("checksum mismatch for {path}: expected {expected}, got {actual}")]
|
||||
Checksum {
|
||||
path: PathBuf,
|
||||
expected: String,
|
||||
actual: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// A file to fetch from `url` into `file_name`, gated on `checksum` (lowercase
|
||||
/// hex SHA-256).
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DownloadableAsset {
|
||||
pub url: Url,
|
||||
pub file_name: String,
|
||||
pub checksum: String,
|
||||
}
|
||||
|
||||
impl DownloadableAsset {
|
||||
/// SHA-256 of `file` as lowercase hex, or `None` if it can't be read.
|
||||
fn sha256_hex(file: &Path) -> Option<String> {
|
||||
let mut f = std::fs::File::open(file).ok()?;
|
||||
let mut hasher = Sha256::new();
|
||||
let mut buf = [0u8; 1024 * 1024];
|
||||
loop {
|
||||
match f.read(&mut buf) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => hasher.update(&buf[..n]),
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "error reading file for checksum");
|
||||
return None;
|
||||
}
|
||||
}
|
||||
}
|
||||
Some(format!("{:x}", hasher.finalize()))
|
||||
}
|
||||
|
||||
fn matches_checksum(&self, file: &Path) -> bool {
|
||||
Self::sha256_hex(file).is_some_and(|h| h == self.checksum)
|
||||
}
|
||||
|
||||
/// Download into `folder/<file_name>`, verifying the checksum. Skips the
|
||||
/// download when the file already exists with the right checksum. Returns
|
||||
/// the path on success.
|
||||
pub async fn download_to_path(&self, folder: PathBuf) -> Result<PathBuf, DownloadError> {
|
||||
if !folder.exists() {
|
||||
fs::create_dir_all(&folder)
|
||||
.await
|
||||
.map_err(|e| DownloadError::CreateDir(folder.clone(), e))?;
|
||||
}
|
||||
let target = folder.join(&self.file_name);
|
||||
|
||||
if self.matches_checksum(&target) {
|
||||
tracing::debug!(
|
||||
?target,
|
||||
"already present with correct checksum; skipping download"
|
||||
);
|
||||
return Ok(target);
|
||||
}
|
||||
|
||||
tracing::debug!(url = %self.url, "downloading");
|
||||
let response = reqwest::Client::new()
|
||||
.get(self.url.clone())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| DownloadError::Request(self.url.clone(), e))?;
|
||||
if !response.status().is_success() {
|
||||
return Err(DownloadError::Status {
|
||||
url: self.url.clone(),
|
||||
status: response.status(),
|
||||
});
|
||||
}
|
||||
|
||||
let mut file = File::create(&target)
|
||||
.await
|
||||
.map_err(|e| DownloadError::Write(target.clone(), e))?;
|
||||
let mut stream = response.bytes_stream();
|
||||
while let Some(chunk) = stream.next().await {
|
||||
let chunk = chunk.map_err(DownloadError::Stream)?;
|
||||
file.write_all(&chunk)
|
||||
.await
|
||||
.map_err(|e| DownloadError::Write(target.clone(), e))?;
|
||||
}
|
||||
file.flush()
|
||||
.await
|
||||
.map_err(|e| DownloadError::Write(target.clone(), e))?;
|
||||
drop(file);
|
||||
|
||||
match Self::sha256_hex(&target) {
|
||||
Some(actual) if actual == self.checksum => Ok(target),
|
||||
actual => Err(DownloadError::Checksum {
|
||||
path: target,
|
||||
expected: self.checksum.clone(),
|
||||
actual: actual.unwrap_or_else(|| "<unreadable>".to_string()),
|
||||
}),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use httptest::{Expectation, Server, matchers, responders};
|
||||
|
||||
const CONTENT: &str = "This is a test file.";
|
||||
const CONTENT_SHA256: &str = "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
|
||||
|
||||
fn tmp() -> PathBuf {
|
||||
let id = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_nanos();
|
||||
let p = std::env::temp_dir().join(format!("harmony-dl-{id}"));
|
||||
std::fs::create_dir_all(&p).unwrap();
|
||||
p
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn downloads_and_verifies() {
|
||||
let server = Server::run();
|
||||
server.expect(
|
||||
Expectation::matching(httptest::matchers::request::method_path("GET", "/f.bin"))
|
||||
.respond_with(responders::status_code(200).body(CONTENT)),
|
||||
);
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
|
||||
file_name: "f.bin".to_string(),
|
||||
checksum: CONTENT_SHA256.to_string(),
|
||||
};
|
||||
let path = asset.download_to_path(tmp()).await.unwrap();
|
||||
assert_eq!(std::fs::read_to_string(path).unwrap(), CONTENT);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn skips_when_already_present_with_matching_checksum() {
|
||||
let server = Server::run();
|
||||
server.expect(
|
||||
Expectation::matching(matchers::any())
|
||||
.times(0)
|
||||
.respond_with(responders::status_code(200).body(CONTENT)),
|
||||
);
|
||||
let folder = tmp();
|
||||
std::fs::write(folder.join("f.bin"), CONTENT).unwrap();
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
|
||||
file_name: "f.bin".to_string(),
|
||||
checksum: CONTENT_SHA256.to_string(),
|
||||
};
|
||||
asset.download_to_path(folder).await.unwrap();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn rejects_bad_checksum() {
|
||||
let server = Server::run();
|
||||
server.expect(
|
||||
Expectation::matching(matchers::any())
|
||||
.respond_with(responders::status_code(200).body("not the expected content")),
|
||||
);
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
|
||||
file_name: "f.bin".to_string(),
|
||||
checksum: CONTENT_SHA256.to_string(),
|
||||
};
|
||||
let err = asset.download_to_path(tmp()).await.unwrap_err();
|
||||
assert!(matches!(err, DownloadError::Checksum { .. }));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn surfaces_http_error() {
|
||||
let server = Server::run();
|
||||
server.expect(
|
||||
Expectation::matching(matchers::any()).respond_with(responders::status_code(404)),
|
||||
);
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
|
||||
file_name: "f.bin".to_string(),
|
||||
checksum: CONTENT_SHA256.to_string(),
|
||||
};
|
||||
let err = asset.download_to_path(tmp()).await.unwrap_err();
|
||||
assert!(matches!(err, DownloadError::Status { .. }));
|
||||
}
|
||||
}
|
||||
@@ -13,11 +13,9 @@ octocrab = "0.44.0"
|
||||
regex = "1.11.1"
|
||||
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
|
||||
url.workspace = true
|
||||
sha2 = "0.10.8"
|
||||
futures-util = "0.3.31"
|
||||
kube.workspace = true
|
||||
harmony_downloadable_asset = { path = "../harmony_downloadable_asset" }
|
||||
|
||||
[dev-dependencies]
|
||||
env_logger = { workspace = true }
|
||||
httptest = "0.16.3"
|
||||
pretty_assertions = "1.4.1"
|
||||
|
||||
@@ -1,303 +0,0 @@
|
||||
use futures_util::StreamExt;
|
||||
use log::{debug, warn};
|
||||
use sha2::{Digest, Sha256};
|
||||
use std::io::Read;
|
||||
use std::path::PathBuf;
|
||||
use tokio::fs;
|
||||
use tokio::fs::File;
|
||||
use tokio::io::AsyncWriteExt;
|
||||
use url::Url;
|
||||
|
||||
const CHECKSUM_FAILED_MSG: &str = "Downloaded file failed checksum verification";
|
||||
|
||||
/// Represents an asset that can be downloaded from a URL with checksum verification.
|
||||
///
|
||||
/// This struct facilitates secure downloading of files from remote URLs by
|
||||
/// verifying the integrity of the downloaded content using SHA-256 checksums.
|
||||
/// It handles downloading the file, saving it to disk, and verifying the checksum matches
|
||||
/// the expected value.
|
||||
///
|
||||
/// # Examples
|
||||
///
|
||||
/// ```compile_fail
|
||||
/// # use url::Url;
|
||||
/// # use std::path::PathBuf;
|
||||
///
|
||||
/// # async fn example() -> Result<(), String> {
|
||||
/// let asset = DownloadableAsset {
|
||||
/// url: Url::parse("https://example.com/file.zip").unwrap(),
|
||||
/// file_name: "file.zip".to_string(),
|
||||
/// checksum: "a1b2c3d4e5f6...".to_string(),
|
||||
/// };
|
||||
///
|
||||
/// let download_dir = PathBuf::from("/tmp/downloads");
|
||||
/// let file_path = asset.download_to_path(download_dir).await?;
|
||||
/// # Ok(())
|
||||
/// # }
|
||||
/// ```
|
||||
#[derive(Debug)]
|
||||
pub(crate) struct DownloadableAsset {
|
||||
pub(crate) url: Url,
|
||||
pub(crate) file_name: String,
|
||||
pub(crate) checksum: String,
|
||||
}
|
||||
|
||||
impl DownloadableAsset {
|
||||
fn verify_checksum(&self, file: PathBuf) -> bool {
|
||||
if !file.exists() {
|
||||
debug!("File does not exist: {:?}", file);
|
||||
return false;
|
||||
}
|
||||
|
||||
let mut file = match std::fs::File::open(&file) {
|
||||
Ok(file) => file,
|
||||
Err(e) => {
|
||||
warn!("Failed to open file for checksum verification: {:?}", e);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
let mut hasher = Sha256::new();
|
||||
let mut buffer = [0; 1024 * 1024]; // 1MB buffer
|
||||
|
||||
loop {
|
||||
let bytes_read = match file.read(&mut buffer) {
|
||||
Ok(0) => break,
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
warn!("Error reading file for checksum: {:?}", e);
|
||||
return false;
|
||||
}
|
||||
};
|
||||
|
||||
hasher.update(&buffer[..bytes_read]);
|
||||
}
|
||||
|
||||
let result = hasher.finalize();
|
||||
let calculated_hash = format!("{:x}", result);
|
||||
|
||||
debug!("Expected checksum: {}", self.checksum);
|
||||
debug!("Calculated checksum: {}", calculated_hash);
|
||||
|
||||
calculated_hash == self.checksum
|
||||
}
|
||||
|
||||
/// Downloads the asset to the specified directory, verifying its checksum.
|
||||
///
|
||||
/// This function will:
|
||||
/// 1. Create the target directory if it doesn't exist
|
||||
/// 2. Check if the file already exists with the correct checksum
|
||||
/// 3. If not, download the file from the URL
|
||||
/// 4. Verify the downloaded file's checksum matches the expected value
|
||||
///
|
||||
/// # Arguments
|
||||
///
|
||||
/// * `folder` - The directory path where the file should be saved
|
||||
///
|
||||
/// # Returns
|
||||
///
|
||||
/// * `Ok(PathBuf)` - The path to the downloaded file on success
|
||||
/// * `Err(String)` - A descriptive error message if the download or verification fails
|
||||
///
|
||||
/// # Errors
|
||||
///
|
||||
/// This function will return an error if:
|
||||
/// - The network request fails
|
||||
/// - The server responds with a non-success status code
|
||||
/// - Writing to disk fails
|
||||
/// - The checksum verification fails
|
||||
pub(crate) async fn download_to_path(&self, folder: PathBuf) -> Result<PathBuf, String> {
|
||||
if !folder.exists() {
|
||||
fs::create_dir_all(&folder)
|
||||
.await
|
||||
.expect("Failed to create download directory");
|
||||
}
|
||||
|
||||
let target_file_path = folder.join(&self.file_name);
|
||||
debug!("Downloading to path: {:?}", target_file_path);
|
||||
|
||||
if self.verify_checksum(target_file_path.clone()) {
|
||||
debug!("File already exists with correct checksum, skipping download");
|
||||
return Ok(target_file_path);
|
||||
}
|
||||
|
||||
debug!("Downloading from URL: {}", self.url);
|
||||
let client = reqwest::Client::new();
|
||||
let response = client
|
||||
.get(self.url.clone())
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to download file: {e}"))?;
|
||||
|
||||
if !response.status().is_success() {
|
||||
return Err(format!(
|
||||
"Failed to download file, status: {}",
|
||||
response.status()
|
||||
));
|
||||
}
|
||||
|
||||
let mut file = File::create(&target_file_path)
|
||||
.await
|
||||
.expect("Failed to create target file");
|
||||
|
||||
let mut stream = response.bytes_stream();
|
||||
while let Some(chunk_result) = stream.next().await {
|
||||
let chunk = chunk_result.expect("Error while downloading file");
|
||||
file.write_all(&chunk)
|
||||
.await
|
||||
.expect("Failed to write data to file");
|
||||
}
|
||||
|
||||
file.flush().await.expect("Failed to flush file");
|
||||
drop(file);
|
||||
|
||||
if !self.verify_checksum(target_file_path.clone()) {
|
||||
return Err(CHECKSUM_FAILED_MSG.to_string());
|
||||
}
|
||||
|
||||
debug!(
|
||||
"File downloaded and verified successfully: {}",
|
||||
target_file_path.to_string_lossy()
|
||||
);
|
||||
Ok(target_file_path)
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use httptest::{
|
||||
matchers::{self, request},
|
||||
responders, Expectation, Server,
|
||||
};
|
||||
|
||||
const BASE_TEST_PATH: &str = "/tmp/harmony-test-k3d-download";
|
||||
const TEST_CONTENT: &str = "This is a test file.";
|
||||
const TEST_CONTENT_HASH: &str =
|
||||
"f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
|
||||
|
||||
fn setup_test() -> (PathBuf, Server) {
|
||||
let _ = env_logger::builder().try_init();
|
||||
|
||||
// Create unique test directory
|
||||
let test_id = std::time::SystemTime::now()
|
||||
.duration_since(std::time::UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
let download_path = format!("{}/test_{}", BASE_TEST_PATH, test_id);
|
||||
std::fs::create_dir_all(&download_path).unwrap();
|
||||
|
||||
(PathBuf::from(download_path), Server::run())
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_download_to_path_success() {
|
||||
let (folder, server) = setup_test();
|
||||
|
||||
server.expect(
|
||||
Expectation::matching(request::method_path("GET", "/test.txt"))
|
||||
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
|
||||
);
|
||||
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
||||
file_name: "test.txt".to_string(),
|
||||
checksum: TEST_CONTENT_HASH.to_string(),
|
||||
};
|
||||
|
||||
let result = asset
|
||||
.download_to_path(folder.join("success"))
|
||||
.await
|
||||
.unwrap();
|
||||
let downloaded_content = std::fs::read_to_string(result).unwrap();
|
||||
assert_eq!(downloaded_content, TEST_CONTENT);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_download_to_path_already_exists() {
|
||||
let (folder, server) = setup_test();
|
||||
|
||||
server.expect(
|
||||
Expectation::matching(matchers::any())
|
||||
.times(0)
|
||||
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
|
||||
);
|
||||
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
||||
file_name: "test.txt".to_string(),
|
||||
checksum: TEST_CONTENT_HASH.to_string(),
|
||||
};
|
||||
|
||||
let target_file_path = folder.join(&asset.file_name);
|
||||
std::fs::write(&target_file_path, TEST_CONTENT).unwrap();
|
||||
|
||||
let result = asset.download_to_path(folder).await.unwrap();
|
||||
let content = std::fs::read_to_string(result).unwrap();
|
||||
assert_eq!(content, TEST_CONTENT);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_download_to_path_server_error() {
|
||||
let (folder, server) = setup_test();
|
||||
|
||||
server.expect(
|
||||
Expectation::matching(matchers::any()).respond_with(responders::status_code(404)),
|
||||
);
|
||||
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
||||
file_name: "test.txt".to_string(),
|
||||
checksum: TEST_CONTENT_HASH.to_string(),
|
||||
};
|
||||
|
||||
let result = asset.download_to_path(folder.join("error")).await;
|
||||
assert!(result.is_err());
|
||||
assert!(result.unwrap_err().contains("status: 404"));
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_download_to_path_checksum_failure() {
|
||||
let (folder, server) = setup_test();
|
||||
|
||||
let invalid_content = "This is NOT the expected content";
|
||||
server.expect(
|
||||
Expectation::matching(matchers::any())
|
||||
.respond_with(responders::status_code(200).body(invalid_content)),
|
||||
);
|
||||
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
||||
file_name: "test.txt".to_string(),
|
||||
checksum: TEST_CONTENT_HASH.to_string(),
|
||||
};
|
||||
|
||||
let join_handle =
|
||||
tokio::spawn(async move { asset.download_to_path(folder.join("failure")).await });
|
||||
|
||||
assert_eq!(
|
||||
join_handle.await.unwrap().err().unwrap(),
|
||||
CHECKSUM_FAILED_MSG
|
||||
);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn test_download_with_specific_path_matcher() {
|
||||
let (folder, server) = setup_test();
|
||||
|
||||
server.expect(
|
||||
Expectation::matching(matchers::request::path("/specific/path.txt"))
|
||||
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
|
||||
);
|
||||
|
||||
let asset = DownloadableAsset {
|
||||
url: Url::parse(&server.url("/specific/path.txt").to_string()).unwrap(),
|
||||
file_name: "path.txt".to_string(),
|
||||
checksum: TEST_CONTENT_HASH.to_string(),
|
||||
};
|
||||
|
||||
let result = asset.download_to_path(folder).await.unwrap();
|
||||
let downloaded_content = std::fs::read_to_string(result).unwrap();
|
||||
assert_eq!(downloaded_content, TEST_CONTENT);
|
||||
}
|
||||
}
|
||||
@@ -1,5 +1,4 @@
|
||||
mod downloadable_asset;
|
||||
use downloadable_asset::*;
|
||||
use harmony_downloadable_asset::DownloadableAsset;
|
||||
|
||||
use kube::Client;
|
||||
use log::{debug, info};
|
||||
@@ -134,7 +133,10 @@ impl K3d {
|
||||
|
||||
let release_binary = self.get_binary_for_current_platform(latest_release).await;
|
||||
debug!("Foudn K3d binary to install : {release_binary:#?}");
|
||||
release_binary.download_to_path(self.base_dir.clone()).await
|
||||
release_binary
|
||||
.download_to_path(self.base_dir.clone())
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
}
|
||||
|
||||
// TODO : Make sure this will only find actual released versions, no prereleases or test
|
||||
|
||||
Reference in New Issue
Block a user