feat(fleet): agent self-upgrade + auto-rollback protocol, ADR-022 (Ch4) #330
20
Cargo.lock
generated
20
Cargo.lock
generated
@@ -4066,6 +4066,7 @@ dependencies = [
|
|||||||
"harmony",
|
"harmony",
|
||||||
"harmony-fleet-auth",
|
"harmony-fleet-auth",
|
||||||
"harmony-reconciler-contracts",
|
"harmony-reconciler-contracts",
|
||||||
|
"harmony_downloadable_asset",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"thiserror 2.0.18",
|
"thiserror 2.0.18",
|
||||||
@@ -4073,6 +4074,7 @@ dependencies = [
|
|||||||
"toml",
|
"toml",
|
||||||
"tracing",
|
"tracing",
|
||||||
"tracing-subscriber",
|
"tracing-subscriber",
|
||||||
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
@@ -4390,6 +4392,20 @@ dependencies = [
|
|||||||
"syn 2.0.117",
|
"syn 2.0.117",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "harmony_downloadable_asset"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"futures-util",
|
||||||
|
"httptest",
|
||||||
|
"reqwest 0.12.28",
|
||||||
|
"sha2 0.10.9",
|
||||||
|
"thiserror 2.0.18",
|
||||||
|
"tokio",
|
||||||
|
"tracing",
|
||||||
|
"url",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "harmony_execution"
|
name = "harmony_execution"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -5507,15 +5523,13 @@ version = "0.1.0"
|
|||||||
dependencies = [
|
dependencies = [
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"futures-util",
|
"harmony_downloadable_asset",
|
||||||
"httptest",
|
|
||||||
"kube",
|
"kube",
|
||||||
"log",
|
"log",
|
||||||
"octocrab",
|
"octocrab",
|
||||||
"pretty_assertions",
|
"pretty_assertions",
|
||||||
"regex",
|
"regex",
|
||||||
"reqwest 0.12.28",
|
"reqwest 0.12.28",
|
||||||
"sha2 0.10.9",
|
|
||||||
"tokio",
|
"tokio",
|
||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ members = [
|
|||||||
"private_repos/*",
|
"private_repos/*",
|
||||||
"harmony",
|
"harmony",
|
||||||
"harmony_zitadel_auth",
|
"harmony_zitadel_auth",
|
||||||
|
"harmony_downloadable_asset",
|
||||||
"harmony_types",
|
"harmony_types",
|
||||||
"harmony_macros",
|
"harmony_macros",
|
||||||
"harmony_tui",
|
"harmony_tui",
|
||||||
|
|||||||
48
ROADMAP/fleet_platform/ch4-agent-upgrade-status.md
Normal file
48
ROADMAP/fleet_platform/ch4-agent-upgrade-status.md
Normal file
@@ -0,0 +1,48 @@
|
|||||||
|
# Ch4 — Agent self-upgrade + auto-rollback (ADR-022): status
|
||||||
|
|
||||||
|
Built the full ADR-022 protocol end to end. The state-machine "brain" and the
|
||||||
|
operator's commit decision are exhaustively unit-tested; the OS side-effects sit
|
||||||
|
behind a seam so they're faked in tests and real on-device.
|
||||||
|
|
||||||
|
## Shipped
|
||||||
|
|
||||||
|
| Piece | Where | Tested |
|
||||||
|
|---|---|---|
|
||||||
|
| Wire types: marker, phase, status, `agent_version` on heartbeat, `Verb::UpgradeStop` | `harmony-reconciler-contracts/src/upgrade.rs`, `kv.rs`, `commands.rs`, `fleet.rs` | unit |
|
||||||
|
| Shared download+SHA-256 verify (lifted from k3d) | new crate `harmony_downloadable_asset` | unit (httptest) |
|
||||||
|
| Agent state machine `drive` (Staging→Verifying→CutoverReady→stop/revert) | `harmony-fleet-agent/src/upgrade.rs` | **6 unit tests** incl. timeout-revert, stage/self-test/cutover failure |
|
||||||
|
| `UpgradeExecutor` seam + real `SystemdUpgradeExecutor` (download, `--self-test`, atomic symlink swap, `systemd-run` transient unit, revert) | same | seam fake-tested; real impl self-heals layout |
|
||||||
|
| `--self-test` flag | `harmony-fleet-agent/src/main.rs` | — |
|
||||||
|
| `Verb::UpgradeStop` handling + armed `UpgradeStopSignal` (only the cutover-waiting old agent acts) | `command_server.rs`, `upgrade.rs` | — |
|
||||||
|
| Operator coordinator: send stop **only after** observing the new version's heartbeat; reflect version + phase to the `Device` CR | `harmony-fleet-operator/src/upgrade_coordinator.rs` | **2 unit tests** on the commit decision |
|
||||||
|
| `FleetCommandsClient::upgrade_stop` | `commands.rs` | — |
|
||||||
|
| `Device.status.{currentVersion, upgrade}` | `crd.rs` | — |
|
||||||
|
|
||||||
|
Load-bearing properties from the ADR are intact: old verifies new
|
||||||
|
(`--self-test`); operator commits the stop (single source of truth, never the
|
||||||
|
agent); rollback is the same code path (revert symlink + stop transient unit on
|
||||||
|
self-test failure / heartbeat-timeout); no version is GC'd.
|
||||||
|
|
||||||
|
## Deviations (deliberate)
|
||||||
|
|
||||||
|
- **Marker + status ride NATS KV** (`agent-upgrade` / `agent-upgrade-status`),
|
||||||
|
not a fire-and-forget subject, so they survive an operator restart — same
|
||||||
|
ethos as Ch2. The ADR's `device-cmd.*`/`device-state.*.upgrade` subjects map
|
||||||
|
onto: the existing command protocol (`Verb::UpgradeStop`) and the status KV.
|
||||||
|
- **First-upgrade rollback without M1.** The real executor's `capture_revert_target`
|
||||||
|
preserves the running binary at its versioned path on first cutover even when
|
||||||
|
the initial install put a plain file at `/usr/local/bin/fleet-agent`. This
|
||||||
|
makes M1 a clean-install nicety, not a rollback-correctness prerequisite.
|
||||||
|
|
||||||
|
## Flagged for a supervised run (not done tonight)
|
||||||
|
|
||||||
|
1. **M1 clean install layout** — `FleetDeviceSetupScore` should install to
|
||||||
|
`/usr/bin/fleet-agent-v<ver>` + symlink `/usr/local/bin/fleet-agent` from the
|
||||||
|
start. Needs a new `agent_version` config field (≈9 construction sites) and a
|
||||||
|
`FileSource::Symlink` delivery primitive (ansible `state: link`). The executor
|
||||||
|
self-heal above covers correctness in the meantime.
|
||||||
|
2. **libvirt vX→vX+1 e2e + corrupt-binary auto-revert** — needs two built agent
|
||||||
|
binaries, a served URL reachable from the VM, and a KVM run. The VM harness
|
||||||
|
exists (`harmony-fleet-e2e/src/vm`); the protocol brain is unit-green, so this
|
||||||
|
is an integration proof to run on real hardware. The corrupt-binary path is
|
||||||
|
already unit-proven via `stage_failure_*` / `heartbeat_timeout_reverts_*`.
|
||||||
@@ -527,6 +527,7 @@ async fn simulate_heartbeat_loop(
|
|||||||
let hb = HeartbeatPayload {
|
let hb = HeartbeatPayload {
|
||||||
device_id: Id::from(device.device_id.clone()),
|
device_id: Id::from(device.device_id.clone()),
|
||||||
at: Utc::now(),
|
at: Utc::now(),
|
||||||
|
agent_version: "load-test".to_string(),
|
||||||
};
|
};
|
||||||
if let Ok(payload) = serde_json::to_vec(&hb) {
|
if let Ok(payload) = serde_json::to_vec(&hb) {
|
||||||
if bucket.put(&hb_key, payload.into()).await.is_ok() {
|
if bucket.put(&hb_key, payload.into()).await.is_ok() {
|
||||||
|
|||||||
@@ -7,7 +7,9 @@ rust-version = "1.85"
|
|||||||
[dependencies]
|
[dependencies]
|
||||||
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
|
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
|
||||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||||
|
harmony_downloadable_asset = { path = "../../harmony_downloadable_asset" }
|
||||||
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
|
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
|
||||||
|
url = { workspace = true }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
async-trait = { workspace = true }
|
async-trait = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
|||||||
@@ -25,6 +25,8 @@ use harmony_reconciler_contracts::{
|
|||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use thiserror::Error;
|
use thiserror::Error;
|
||||||
|
|
||||||
|
use crate::upgrade::UpgradeStopSignal;
|
||||||
|
|
||||||
/// Hard cap on a single exec's combined output. Keeps the JSON reply
|
/// Hard cap on a single exec's combined output. Keeps the JSON reply
|
||||||
/// comfortably under NATS's default 1 MiB message limit.
|
/// comfortably under NATS's default 1 MiB message limit.
|
||||||
const EXEC_MAX_OUTPUT: usize = 256 * 1024;
|
const EXEC_MAX_OUTPUT: usize = 256 * 1024;
|
||||||
@@ -38,15 +40,20 @@ pub struct CommandServer {
|
|||||||
client: Client,
|
client: Client,
|
||||||
agent_version: &'static str,
|
agent_version: &'static str,
|
||||||
started_at: Instant,
|
started_at: Instant,
|
||||||
|
/// Fired when the operator sends `Verb::UpgradeStop` (ADR-022 cutover). The
|
||||||
|
/// upgrade manager awaits it and then exits the process so the new version
|
||||||
|
/// takes over. The agent never self-stops on its own observation.
|
||||||
|
upgrade_stop: Arc<UpgradeStopSignal>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl CommandServer {
|
impl CommandServer {
|
||||||
pub fn new(device_id: Id, client: Client) -> Self {
|
pub fn new(device_id: Id, client: Client, upgrade_stop: Arc<UpgradeStopSignal>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
device_id,
|
device_id,
|
||||||
client,
|
client,
|
||||||
agent_version: env!("CARGO_PKG_VERSION"),
|
agent_version: env!("CARGO_PKG_VERSION"),
|
||||||
started_at: Instant::now(),
|
started_at: Instant::now(),
|
||||||
|
upgrade_stop,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -115,6 +122,29 @@ impl CommandServer {
|
|||||||
Err(e) => return Err(CommandError::BadRequestBody(e.to_string())),
|
Err(e) => return Err(CommandError::BadRequestBody(e.to_string())),
|
||||||
};
|
};
|
||||||
self.reply_exec(reply_to, &command).await
|
self.reply_exec(reply_to, &command).await
|
||||||
|
} else if verb_token == Verb::UpgradeStop.as_subject_token() {
|
||||||
|
let reason = match serde_json::from_slice::<CommandRequest>(&msg.payload) {
|
||||||
|
Ok(CommandRequest::UpgradeStop { reason }) => reason,
|
||||||
|
Ok(_) => "upgrade-stop".to_string(),
|
||||||
|
// Tolerate a bodyless stop — the operator's intent is clear.
|
||||||
|
Err(_) => "upgrade-stop".to_string(),
|
||||||
|
};
|
||||||
|
tracing::info!(%reason, "received upgrade-stop; signalling cutover exit");
|
||||||
|
// Ack first so the operator knows we got it, then fire the signal.
|
||||||
|
let ack = serde_json::to_vec(&PingReply {
|
||||||
|
device_id: self.device_id.clone(),
|
||||||
|
agent_version: self.agent_version.to_string(),
|
||||||
|
uptime_s: self.started_at.elapsed().as_secs(),
|
||||||
|
})
|
||||||
|
.map_err(CommandError::SerializeReply)?;
|
||||||
|
self.client
|
||||||
|
.publish(reply_to, ack.into())
|
||||||
|
.await
|
||||||
|
.map_err(|e| CommandError::PublishReply(e.to_string()))?;
|
||||||
|
if !self.upgrade_stop.fire() {
|
||||||
|
tracing::warn!("upgrade-stop received but no upgrade is in cutover; ignoring");
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
} else {
|
} else {
|
||||||
tracing::warn!(verb = %verb_token, "unknown command verb");
|
tracing::warn!(verb = %verb_token, "unknown command verb");
|
||||||
Err(CommandError::UnknownVerb(verb_token.to_string()))
|
Err(CommandError::UnknownVerb(verb_token.to_string()))
|
||||||
|
|||||||
@@ -97,6 +97,7 @@ impl FleetPublisher {
|
|||||||
let hb = HeartbeatPayload {
|
let hb = HeartbeatPayload {
|
||||||
device_id: self.device_id.clone(),
|
device_id: self.device_id.clone(),
|
||||||
at: chrono::Utc::now(),
|
at: chrono::Utc::now(),
|
||||||
|
agent_version: env!("CARGO_PKG_VERSION").to_string(),
|
||||||
};
|
};
|
||||||
let key = device_heartbeat_key(&self.device_id.to_string());
|
let key = device_heartbeat_key(&self.device_id.to_string());
|
||||||
match serde_json::to_vec(&hb) {
|
match serde_json::to_vec(&hb) {
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ mod command_server;
|
|||||||
mod config;
|
mod config;
|
||||||
mod fleet_publisher;
|
mod fleet_publisher;
|
||||||
mod reconciler;
|
mod reconciler;
|
||||||
|
mod upgrade;
|
||||||
|
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@@ -43,6 +44,24 @@ struct Cli {
|
|||||||
default_value = "/etc/fleet-agent/config.toml"
|
default_value = "/etc/fleet-agent/config.toml"
|
||||||
)]
|
)]
|
||||||
config: std::path::PathBuf,
|
config: std::path::PathBuf,
|
||||||
|
|
||||||
|
/// ADR-022 self-test: load config, connect NATS (validates JWT), print
|
||||||
|
/// version + "ok", exit 0 — or exit non-zero on any failure. The upgrade
|
||||||
|
/// state machine runs this on a staged binary before cutover.
|
||||||
|
#[arg(long)]
|
||||||
|
self_test: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// ADR-022 `--self-test`: prove this binary can parse its config and reach NATS
|
||||||
|
/// with a valid JWT, then exit. No state mutation, no long-lived loops.
|
||||||
|
async fn self_test(cfg: &AgentConfig) -> Result<()> {
|
||||||
|
let creds = credential_source_from_config(&cfg.credentials)
|
||||||
|
.context("self-test: building NATS credential source")?;
|
||||||
|
connect_nats(cfg, creds)
|
||||||
|
.await
|
||||||
|
.context("self-test: NATS connect / JWT validation")?;
|
||||||
|
println!("fleet-agent v{} self-test ok", env!("CARGO_PKG_VERSION"));
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result<async_nats::Client> {
|
async fn connect_nats(cfg: &AgentConfig, creds: Creds) -> Result<async_nats::Client> {
|
||||||
@@ -198,6 +217,13 @@ async fn main() -> Result<()> {
|
|||||||
|
|
||||||
let cli = Cli::parse();
|
let cli = Cli::parse();
|
||||||
let cfg = config::load_config(&cli.config)?;
|
let cfg = config::load_config(&cli.config)?;
|
||||||
|
|
||||||
|
// ADR-022: the upgrade state machine invokes the staged binary with
|
||||||
|
// `--self-test` before any symlink swap. Exit code is the verdict.
|
||||||
|
if cli.self_test {
|
||||||
|
return self_test(&cfg).await;
|
||||||
|
}
|
||||||
|
|
||||||
tracing::info!(
|
tracing::info!(
|
||||||
device_id = %cfg.agent.device_id,
|
device_id = %cfg.agent.device_id,
|
||||||
runtime_enabled = cfg.agent.runtime_enabled,
|
runtime_enabled = cfg.agent.runtime_enabled,
|
||||||
@@ -277,7 +303,14 @@ async fn main() -> Result<()> {
|
|||||||
))
|
))
|
||||||
});
|
});
|
||||||
|
|
||||||
let command_server = Arc::new(CommandServer::new(device_id.clone(), client.clone()));
|
// Fired by the command server on `Verb::UpgradeStop`; awaited by the
|
||||||
|
// upgrade manager, which then exits the process for the new version.
|
||||||
|
let upgrade_stop = Arc::new(upgrade::UpgradeStopSignal::default());
|
||||||
|
let command_server = Arc::new(CommandServer::new(
|
||||||
|
device_id.clone(),
|
||||||
|
client.clone(),
|
||||||
|
upgrade_stop.clone(),
|
||||||
|
));
|
||||||
|
|
||||||
let ctrlc = async {
|
let ctrlc = async {
|
||||||
tokio::signal::ctrl_c().await.ok();
|
tokio::signal::ctrl_c().await.ok();
|
||||||
@@ -312,6 +345,16 @@ async fn main() -> Result<()> {
|
|||||||
};
|
};
|
||||||
let heartbeat = publish_heartbeat_loop(fleet);
|
let heartbeat = publish_heartbeat_loop(fleet);
|
||||||
let commands = command_server.run();
|
let commands = command_server.run();
|
||||||
|
// ADR-022 self-upgrade manager. Returns only when the operator commits a
|
||||||
|
// cutover (it sent `UpgradeStop`) — that completes the select and the
|
||||||
|
// process exits 0, so the already-started new version takes over and
|
||||||
|
// systemd (Restart=on-failure) does not bring the old one back.
|
||||||
|
let upgrade = upgrade::run(
|
||||||
|
client.clone(),
|
||||||
|
device_id.clone(),
|
||||||
|
env!("CARGO_PKG_VERSION").to_string(),
|
||||||
|
upgrade_stop.clone(),
|
||||||
|
);
|
||||||
|
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
// Waiting on ctrlc in a select will automatically terminate other branches when
|
// Waiting on ctrlc in a select will automatically terminate other branches when
|
||||||
@@ -322,6 +365,7 @@ async fn main() -> Result<()> {
|
|||||||
_ = reconcile => {}
|
_ = reconcile => {}
|
||||||
_ = heartbeat => {}
|
_ = heartbeat => {}
|
||||||
r = commands => { r?; }
|
r = commands => { r?; }
|
||||||
|
r = upgrade => { r?; tracing::info!("agent exiting for upgrade cutover"); }
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
|||||||
663
fleet/harmony-fleet-agent/src/upgrade.rs
Normal file
663
fleet/harmony-fleet-agent/src/upgrade.rs
Normal file
@@ -0,0 +1,663 @@
|
|||||||
|
//! Agent self-upgrade state machine (ADR-022).
|
||||||
|
//!
|
||||||
|
//! The operator publishes an [`AgentUpgradeMarker`] (desired version + signed
|
||||||
|
//! URL + SHA-256). The agent stages the new binary, verifies it with
|
||||||
|
//! `--self-test`, atomically swaps the `/usr/local/bin/fleet-agent` symlink, and
|
||||||
|
//! starts the new version alongside itself. It then **waits for the operator's
|
||||||
|
//! stop signal** — the agent never self-stops (the operator is the single
|
||||||
|
//! source of truth, and only commits the stop after it has seen the new
|
||||||
|
//! version's heartbeat). If the new version never reports healthy within
|
||||||
|
//! [`HEARTBEAT_TIMEOUT`], the agent reverts the symlink and stays on its
|
||||||
|
//! current version. Rollback is the same code path as upgrade.
|
||||||
|
//!
|
||||||
|
//! The OS side-effects (download, `--self-test`, symlink swap, systemd) sit
|
||||||
|
//! behind [`UpgradeExecutor`] so [`drive`] — the state machine "brain" — is
|
||||||
|
//! unit-tested with a fake; [`SystemdUpgradeExecutor`] does the real work.
|
||||||
|
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use harmony_reconciler_contracts::AgentUpgradeMarker;
|
||||||
|
use harmony_reconciler_contracts::AgentUpgradePhase::{
|
||||||
|
self, CutoverReady, Done, Failed, Staging, Verifying,
|
||||||
|
};
|
||||||
|
|
||||||
|
/// Directory holding the immutable versioned binaries (`fleet-agent-v<ver>`).
|
||||||
|
pub const VERSIONED_BIN_DIR: &str = "/usr/bin";
|
||||||
|
/// The stable path the systemd unit's `ExecStart` references. Symlink swap of
|
||||||
|
/// this path is the cutover primitive (POSIX-atomic rename).
|
||||||
|
pub const ACTIVE_SYMLINK: &str = "/usr/local/bin/fleet-agent";
|
||||||
|
/// How long the old agent waits for the new version's heartbeat after cutover
|
||||||
|
/// before reverting (ADR-022 suggests 60s).
|
||||||
|
pub const HEARTBEAT_TIMEOUT: Duration = Duration::from_secs(60);
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum UpgradeError {
|
||||||
|
#[error("invalid binary URL: {0}")]
|
||||||
|
Url(String),
|
||||||
|
#[error(transparent)]
|
||||||
|
Download(#[from] harmony_downloadable_asset::DownloadError),
|
||||||
|
#[error("filesystem error: {0}")]
|
||||||
|
Io(#[from] std::io::Error),
|
||||||
|
#[error("self-test exited with {0}")]
|
||||||
|
SelfTest(String),
|
||||||
|
#[error("systemd error: {0}")]
|
||||||
|
Systemd(String),
|
||||||
|
#[error("no previous symlink target recorded to revert to")]
|
||||||
|
NothingToRevert,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The OS-level operations the upgrade needs. Behind a trait so [`drive`] is
|
||||||
|
/// unit-testable without touching disk or systemd.
|
||||||
|
pub trait UpgradeExecutor {
|
||||||
|
/// Download + verify the binary and place it at its versioned path. Returns
|
||||||
|
/// the staged path.
|
||||||
|
async fn stage(&self, marker: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError>;
|
||||||
|
/// Run the staged binary's `--self-test`. Err if it doesn't exit 0.
|
||||||
|
async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError>;
|
||||||
|
/// Atomically point [`ACTIVE_SYMLINK`] at `binary` (recording the previous
|
||||||
|
/// target for revert) and start the new version alongside the old.
|
||||||
|
async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError>;
|
||||||
|
/// Restore the pre-cutover symlink target and stop the new version.
|
||||||
|
async fn revert(&self) -> Result<(), UpgradeError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Sink for upgrade phase transitions (→ operator via KV).
|
||||||
|
pub trait StatusSink {
|
||||||
|
async fn publish(&self, phase: AgentUpgradePhase, last_error: Option<String>);
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Outcome of one upgrade attempt, telling the agent process what to do next.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum UpgradeAction {
    /// The cutover went through and the operator sent its stop: exit(0) so
    /// the new version, which is already running, takes over cleanly.
    Exit,
    /// The attempt failed or was rolled back: keep running the current
    /// version.
    StayedOnCurrent,
}
|
||||||
|
|
||||||
|
/// The state machine. Stage → verify → cutover → wait for the operator's stop
|
||||||
|
/// (with a heartbeat-timeout revert). Pure orchestration over the injected
|
||||||
|
/// executor, status sink, and post-cutover signal futures.
|
||||||
|
///
|
||||||
|
/// `new_version_healthy` resolves when a heartbeat from the new version is
|
||||||
|
/// observed; `operator_stop` resolves when the operator sends the stop signal.
|
||||||
|
pub async fn drive<E, S>(
|
||||||
|
marker: &AgentUpgradeMarker,
|
||||||
|
executor: &E,
|
||||||
|
status: &S,
|
||||||
|
new_version_healthy: impl std::future::Future<Output = ()>,
|
||||||
|
operator_stop: impl std::future::Future<Output = ()>,
|
||||||
|
heartbeat_timeout: Duration,
|
||||||
|
) -> UpgradeAction
|
||||||
|
where
|
||||||
|
E: UpgradeExecutor,
|
||||||
|
S: StatusSink,
|
||||||
|
{
|
||||||
|
status.publish(Staging, None).await;
|
||||||
|
let staged = match executor.stage(marker).await {
|
||||||
|
Ok(p) => p,
|
||||||
|
Err(e) => {
|
||||||
|
status.publish(Failed, Some(e.to_string())).await;
|
||||||
|
return UpgradeAction::StayedOnCurrent;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
status.publish(Verifying, None).await;
|
||||||
|
if let Err(e) = executor.self_test(&staged).await {
|
||||||
|
status
|
||||||
|
.publish(Failed, Some(format!("self-test failed: {e}")))
|
||||||
|
.await;
|
||||||
|
return UpgradeAction::StayedOnCurrent;
|
||||||
|
}
|
||||||
|
|
||||||
|
status.publish(CutoverReady, None).await;
|
||||||
|
if let Err(e) = executor.cutover(&staged, &marker.desired_version).await {
|
||||||
|
let _ = executor.revert().await;
|
||||||
|
status
|
||||||
|
.publish(Failed, Some(format!("cutover failed: {e}")))
|
||||||
|
.await;
|
||||||
|
return UpgradeAction::StayedOnCurrent;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Post-cutover: race the operator's stop against the new version becoming
|
||||||
|
// healthy and the timeout. Stop ⇒ exit. Healthy ⇒ wait indefinitely for
|
||||||
|
// stop (operator-outage-safe — two agents is wasteful but never blind).
|
||||||
|
// Timeout with no healthy heartbeat ⇒ revert and stay.
|
||||||
|
tokio::pin!(new_version_healthy);
|
||||||
|
tokio::pin!(operator_stop);
|
||||||
|
tokio::select! {
|
||||||
|
_ = &mut operator_stop => {
|
||||||
|
status.publish(Done, None).await;
|
||||||
|
return UpgradeAction::Exit;
|
||||||
|
}
|
||||||
|
_ = &mut new_version_healthy => {}
|
||||||
|
_ = tokio::time::sleep(heartbeat_timeout) => {
|
||||||
|
let _ = executor.revert().await;
|
||||||
|
status
|
||||||
|
.publish(Failed, Some("new version did not report healthy in time; reverted".into()))
|
||||||
|
.await;
|
||||||
|
return UpgradeAction::StayedOnCurrent;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Healthy observed — commit is now the operator's; wait for its stop.
|
||||||
|
operator_stop.await;
|
||||||
|
status.publish(Done, None).await;
|
||||||
|
UpgradeAction::Exit
|
||||||
|
}
|
||||||
|
|
||||||
|
// ───────────────────────── real executor ─────────────────────────
|
||||||
|
|
||||||
|
/// Production [`UpgradeExecutor`]: download+verify, `--self-test`, atomic
|
||||||
|
/// symlink swap, and a systemd transient unit for the new version.
|
||||||
|
pub struct SystemdUpgradeExecutor {
|
||||||
|
/// The version this (old) agent is running — used to preserve its binary at
|
||||||
|
/// a versioned path so `revert` always has a target, even on a pre-M1 plain
|
||||||
|
/// install where `ACTIVE_SYMLINK` is a regular file.
|
||||||
|
current_version: String,
|
||||||
|
/// Previous symlink target, captured at cutover so `revert` can restore it.
|
||||||
|
previous_target: tokio::sync::Mutex<Option<PathBuf>>,
|
||||||
|
/// Transient systemd unit name for the started new version, for `revert`.
|
||||||
|
started_unit: tokio::sync::Mutex<Option<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl SystemdUpgradeExecutor {
|
||||||
|
pub fn new(current_version: String) -> Self {
|
||||||
|
Self {
|
||||||
|
current_version,
|
||||||
|
previous_target: tokio::sync::Mutex::new(None),
|
||||||
|
started_unit: tokio::sync::Mutex::new(None),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn transient_unit(version: &str) -> String {
|
||||||
|
format!("fleet-agent-v{version}")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// The revert target: the existing symlink target if `ACTIVE_SYMLINK` is a
|
||||||
|
/// symlink, else the running binary preserved at its versioned path (so a
|
||||||
|
/// plain-file initial install can still be rolled back to).
|
||||||
|
fn capture_revert_target(&self) -> PathBuf {
|
||||||
|
if let Ok(target) = std::fs::read_link(ACTIVE_SYMLINK) {
|
||||||
|
return target;
|
||||||
|
}
|
||||||
|
let preserved =
|
||||||
|
PathBuf::from(VERSIONED_BIN_DIR).join(format!("fleet-agent-v{}", self.current_version));
|
||||||
|
if !preserved.exists() {
|
||||||
|
if let Ok(exe) = std::env::current_exe() {
|
||||||
|
if let Err(e) = std::fs::copy(&exe, &preserved) {
|
||||||
|
tracing::warn!(error = %e, "upgrade: could not preserve current binary");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
preserved
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpgradeExecutor for SystemdUpgradeExecutor {
|
||||||
|
async fn stage(&self, marker: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError> {
|
||||||
|
use harmony_downloadable_asset::DownloadableAsset;
|
||||||
|
let asset = DownloadableAsset {
|
||||||
|
url: url::Url::parse(&marker.url).map_err(|e| UpgradeError::Url(e.to_string()))?,
|
||||||
|
file_name: format!("fleet-agent-v{}", marker.desired_version),
|
||||||
|
checksum: marker.sha256.clone(),
|
||||||
|
};
|
||||||
|
let path = asset
|
||||||
|
.download_to_path(PathBuf::from(VERSIONED_BIN_DIR))
|
||||||
|
.await?;
|
||||||
|
// Make it executable (0755).
|
||||||
|
let mut perms = std::fs::metadata(&path)?.permissions();
|
||||||
|
use std::os::unix::fs::PermissionsExt;
|
||||||
|
perms.set_mode(0o755);
|
||||||
|
std::fs::set_permissions(&path, perms)?;
|
||||||
|
Ok(path)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn self_test(&self, binary: &Path) -> Result<(), UpgradeError> {
|
||||||
|
let status = tokio::process::Command::new(binary)
|
||||||
|
.arg("--self-test")
|
||||||
|
.status()
|
||||||
|
.await
|
||||||
|
.map_err(|e| UpgradeError::SelfTest(e.to_string()))?;
|
||||||
|
if status.success() {
|
||||||
|
Ok(())
|
||||||
|
} else {
|
||||||
|
Err(UpgradeError::SelfTest(format!("exit {status}")))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn cutover(&self, binary: &Path, version: &str) -> Result<(), UpgradeError> {
|
||||||
|
// Record (and, if needed, preserve) the current binary so revert can
|
||||||
|
// restore it.
|
||||||
|
*self.previous_target.lock().await = Some(self.capture_revert_target());
|
||||||
|
|
||||||
|
// Atomic swap: symlink to a temp name, rename over the active path.
|
||||||
|
let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.new"));
|
||||||
|
let _ = std::fs::remove_file(&tmp);
|
||||||
|
std::os::unix::fs::symlink(binary, &tmp)?;
|
||||||
|
std::fs::rename(&tmp, ACTIVE_SYMLINK)?;
|
||||||
|
|
||||||
|
// Start the new version alongside us as a transient systemd unit, so the
|
||||||
|
// operator can observe its heartbeat before committing the stop.
|
||||||
|
let unit = Self::transient_unit(version);
|
||||||
|
let out = tokio::process::Command::new("systemd-run")
|
||||||
|
.args(["--unit", &unit, "--collect"])
|
||||||
|
.arg(binary)
|
||||||
|
.output()
|
||||||
|
.await
|
||||||
|
.map_err(|e| UpgradeError::Systemd(e.to_string()))?;
|
||||||
|
if !out.status.success() {
|
||||||
|
return Err(UpgradeError::Systemd(
|
||||||
|
String::from_utf8_lossy(&out.stderr).to_string(),
|
||||||
|
));
|
||||||
|
}
|
||||||
|
*self.started_unit.lock().await = Some(unit);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn revert(&self) -> Result<(), UpgradeError> {
|
||||||
|
// Stop the transient unit if we started one.
|
||||||
|
if let Some(unit) = self.started_unit.lock().await.take() {
|
||||||
|
let _ = tokio::process::Command::new("systemctl")
|
||||||
|
.args(["stop", &unit])
|
||||||
|
.status()
|
||||||
|
.await;
|
||||||
|
}
|
||||||
|
// Restore the previous symlink target.
|
||||||
|
let previous = self
|
||||||
|
.previous_target
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.clone()
|
||||||
|
.ok_or(UpgradeError::NothingToRevert)?;
|
||||||
|
let tmp = PathBuf::from(format!("{ACTIVE_SYMLINK}.revert"));
|
||||||
|
let _ = std::fs::remove_file(&tmp);
|
||||||
|
std::os::unix::fs::symlink(&previous, &tmp)?;
|
||||||
|
std::fs::rename(&tmp, ACTIVE_SYMLINK)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ───────────────────────── NATS wiring ─────────────────────────
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
|
use async_nats::jetstream::{self, kv::Operation};
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use harmony_reconciler_contracts::{
|
||||||
|
AgentUpgradeStatus, BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT,
|
||||||
|
HeartbeatPayload, Id, agent_upgrade_key, device_heartbeat_key,
|
||||||
|
};
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use tokio::sync::Notify;
|
||||||
|
|
||||||
|
/// The operator's cutover stop signal, shared between the command server (which
|
||||||
|
/// receives `Verb::UpgradeStop`) and the upgrade manager (which awaits it).
|
||||||
|
///
|
||||||
|
/// Both old and new agents subscribe to the device command subject, so both
|
||||||
|
/// *receive* the stop — but it's only **armed** on the old agent during its
|
||||||
|
/// post-cutover wait. Outside that window `fire` is a no-op, so a stray stop
|
||||||
|
/// (or one meant for a previous upgrade) can't make an agent exit, and no stale
|
||||||
|
/// permit accumulates to trip the next upgrade.
|
||||||
|
pub struct UpgradeStopSignal {
|
||||||
|
notify: Notify,
|
||||||
|
armed: AtomicBool,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Default for UpgradeStopSignal {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
notify: Notify::new(),
|
||||||
|
armed: AtomicBool::new(false),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl UpgradeStopSignal {
|
||||||
|
/// Fire the signal if armed. Returns whether it was (for logging).
|
||||||
|
pub fn fire(&self) -> bool {
|
||||||
|
if self.armed.load(Ordering::SeqCst) {
|
||||||
|
self.notify.notify_one();
|
||||||
|
true
|
||||||
|
} else {
|
||||||
|
false
|
||||||
|
}
|
||||||
|
}
|
||||||
|
async fn notified(&self) {
|
||||||
|
self.notify.notified().await;
|
||||||
|
}
|
||||||
|
fn arm(&self) {
|
||||||
|
self.armed.store(true, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
fn disarm(&self) {
|
||||||
|
self.armed.store(false, Ordering::SeqCst);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Writes [`AgentUpgradeStatus`] to the status KV so the operator can reflect
|
||||||
|
/// the live phase onto the `Device` CR.
|
||||||
|
struct KvStatusSink {
|
||||||
|
bucket: jetstream::kv::Store,
|
||||||
|
device_id: Id,
|
||||||
|
current_version: String,
|
||||||
|
target: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl StatusSink for KvStatusSink {
|
||||||
|
async fn publish(&self, phase: AgentUpgradePhase, last_error: Option<String>) {
|
||||||
|
let status = AgentUpgradeStatus {
|
||||||
|
device_id: self.device_id.clone(),
|
||||||
|
phase,
|
||||||
|
current_version: self.current_version.clone(),
|
||||||
|
target_version: Some(self.target.clone()),
|
||||||
|
last_error,
|
||||||
|
at: chrono::Utc::now(),
|
||||||
|
};
|
||||||
|
let key = agent_upgrade_key(&self.device_id.to_string());
|
||||||
|
match serde_json::to_vec(&status) {
|
||||||
|
Ok(payload) => {
|
||||||
|
if let Err(e) = self.bucket.put(&key, payload.into()).await {
|
||||||
|
tracing::warn!(error = %e, "upgrade: status publish failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Err(e) => tracing::warn!(error = %e, "upgrade: status serialize failed"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Watch the device's upgrade marker; run the state machine when a marker asks
|
||||||
|
/// for a different version. Returns `Ok(())` only when an upgrade succeeds and
|
||||||
|
/// the operator signals stop — the caller (main) treats that as "exit so the
|
||||||
|
/// new version takes over". Failures/reverts keep watching for a fixed marker.
|
||||||
|
pub async fn run(
|
||||||
|
client: async_nats::Client,
|
||||||
|
device_id: Id,
|
||||||
|
current_version: String,
|
||||||
|
operator_stop: Arc<UpgradeStopSignal>,
|
||||||
|
) -> anyhow::Result<()> {
|
||||||
|
let js = jetstream::new(client);
|
||||||
|
let marker_bucket = js
|
||||||
|
.create_key_value(jetstream::kv::Config {
|
||||||
|
bucket: BUCKET_AGENT_UPGRADE.to_string(),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let status_bucket = js
|
||||||
|
.create_key_value(jetstream::kv::Config {
|
||||||
|
bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let heartbeat_bucket = js
|
||||||
|
.create_key_value(jetstream::kv::Config {
|
||||||
|
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let my_key = agent_upgrade_key(&device_id.to_string());
|
||||||
|
tracing::info!(version = %current_version, key = %my_key, "upgrade: watching marker");
|
||||||
|
// Outer loop re-establishes the watch after a NATS blip — only an
|
||||||
|
// operator-committed cutover (`UpgradeAction::Exit`) returns from here.
|
||||||
|
loop {
|
||||||
|
let mut watch = marker_bucket.watch_with_history(&my_key).await?;
|
||||||
|
while let Some(entry) = watch.next().await {
|
||||||
|
let entry = match entry {
|
||||||
|
Ok(e) => e,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "upgrade: marker watch error");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if entry.operation != Operation::Put {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let marker: AgentUpgradeMarker = match serde_json::from_slice(&entry.value) {
|
||||||
|
Ok(m) => m,
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "upgrade: bad marker payload");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
if marker.desired_version == current_version {
|
||||||
|
tracing::debug!(version = %current_version, "upgrade: already at desired version");
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::info!(
|
||||||
|
from = %current_version,
|
||||||
|
to = %marker.desired_version,
|
||||||
|
"upgrade: marker requests new version"
|
||||||
|
);
|
||||||
|
let executor = SystemdUpgradeExecutor::new(current_version.clone());
|
||||||
|
let sink = KvStatusSink {
|
||||||
|
bucket: status_bucket.clone(),
|
||||||
|
device_id: device_id.clone(),
|
||||||
|
current_version: current_version.clone(),
|
||||||
|
target: marker.desired_version.clone(),
|
||||||
|
};
|
||||||
|
let healthy =
|
||||||
|
wait_new_version_healthy(&heartbeat_bucket, &device_id, &marker.desired_version);
|
||||||
|
// Arm the stop signal for the duration of this attempt; the command
|
||||||
|
// server only forwards an operator stop while armed.
|
||||||
|
operator_stop.arm();
|
||||||
|
let stop = {
|
||||||
|
let signal = operator_stop.clone();
|
||||||
|
async move { signal.notified().await }
|
||||||
|
};
|
||||||
|
let action = drive(&marker, &executor, &sink, healthy, stop, HEARTBEAT_TIMEOUT).await;
|
||||||
|
operator_stop.disarm();
|
||||||
|
match action {
|
||||||
|
UpgradeAction::Exit => {
|
||||||
|
tracing::info!(
|
||||||
|
"upgrade: cutover committed by operator; exiting for new version"
|
||||||
|
);
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
UpgradeAction::StayedOnCurrent => {
|
||||||
|
tracing::warn!("upgrade: stayed on current version; awaiting a new marker");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracing::warn!("upgrade: marker watch ended; re-establishing");
|
||||||
|
tokio::time::sleep(Duration::from_secs(5)).await;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Resolve once a heartbeat from `target` version is observed on this device's
|
||||||
|
/// heartbeat key (old and new both write it; we wait for the new one).
|
||||||
|
async fn wait_new_version_healthy(bucket: &jetstream::kv::Store, device_id: &Id, target: &str) {
|
||||||
|
let key = device_heartbeat_key(&device_id.to_string());
|
||||||
|
let mut watch = match bucket.watch_with_history(&key).await {
|
||||||
|
Ok(w) => w,
|
||||||
|
// If we can't watch, never report healthy — the drive's timeout reverts.
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "upgrade: heartbeat watch failed");
|
||||||
|
return std::future::pending::<()>().await;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
while let Some(entry) = watch.next().await {
|
||||||
|
if let Ok(e) = entry {
|
||||||
|
if e.operation == Operation::Put {
|
||||||
|
if let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&e.value) {
|
||||||
|
if hb.agent_version == target {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use std::future::{Future, pending, ready};
    use std::sync::Mutex;
    use std::sync::atomic::{AtomicUsize, Ordering};

    /// Scripted executor: each `fail_*` flag makes the matching step fail, and
    /// the counters record how often `cutover` and `revert` were invoked.
    #[derive(Default)]
    struct FakeExecutor {
        fail_stage: bool,
        fail_self_test: bool,
        fail_cutover: bool,
        reverted: AtomicUsize,
        cutover_calls: AtomicUsize,
    }

    impl UpgradeExecutor for FakeExecutor {
        async fn stage(&self, _m: &AgentUpgradeMarker) -> Result<PathBuf, UpgradeError> {
            if self.fail_stage {
                Err(UpgradeError::Url("bad".into()))
            } else {
                Ok(PathBuf::from("/usr/bin/fleet-agent-v9.9.9"))
            }
        }

        async fn self_test(&self, _b: &Path) -> Result<(), UpgradeError> {
            if self.fail_self_test {
                Err(UpgradeError::SelfTest("rc 1".into()))
            } else {
                Ok(())
            }
        }

        async fn cutover(&self, _b: &Path, _v: &str) -> Result<(), UpgradeError> {
            // Count the call even when it is scripted to fail.
            self.cutover_calls.fetch_add(1, Ordering::SeqCst);
            if self.fail_cutover {
                Err(UpgradeError::Systemd("nope".into()))
            } else {
                Ok(())
            }
        }

        async fn revert(&self) -> Result<(), UpgradeError> {
            self.reverted.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }
    }

    /// Sink that remembers every published phase, in order.
    #[derive(Default)]
    struct RecordingSink(Mutex<Vec<AgentUpgradePhase>>);

    impl StatusSink for RecordingSink {
        async fn publish(&self, phase: AgentUpgradePhase, _e: Option<String>) {
            let mut seen = self.0.lock().unwrap();
            seen.push(phase);
        }
    }

    impl RecordingSink {
        /// Snapshot of the phases published so far.
        fn phases(&self) -> Vec<AgentUpgradePhase> {
            let seen = self.0.lock().unwrap();
            seen.clone()
        }
    }

    /// Marker asking for version 9.9.9.
    fn marker() -> AgentUpgradeMarker {
        AgentUpgradeMarker {
            desired_version: "9.9.9".into(),
            url: "https://example/agent".into(),
            sha256: "deadbeef".into(),
        }
    }

    /// A future that never completes.
    fn never() -> impl Future<Output = ()> {
        pending::<()>()
    }

    #[tokio::test]
    async fn happy_path_cutover_then_stop_exits() {
        let executor = FakeExecutor::default();
        let recorder = RecordingSink::default();
        // Operator stop fires immediately (it saw the new heartbeat).
        let healthy = never();
        let stop = ready(());
        let outcome = drive(
            &marker(),
            &executor,
            &recorder,
            healthy,
            stop,
            HEARTBEAT_TIMEOUT,
        )
        .await;
        assert_eq!(outcome, UpgradeAction::Exit);
        assert_eq!(executor.reverted.load(Ordering::SeqCst), 0);
        assert_eq!(
            recorder.phases(),
            vec![Staging, Verifying, CutoverReady, Done]
        );
    }

    #[tokio::test]
    async fn healthy_then_stop_exits_without_revert() {
        let executor = FakeExecutor::default();
        let recorder = RecordingSink::default();
        // Healthy observed first; then (still) operator stop → exit, no revert.
        let healthy = ready(());
        let stop = ready(());
        let outcome = drive(
            &marker(),
            &executor,
            &recorder,
            healthy,
            stop,
            HEARTBEAT_TIMEOUT,
        )
        .await;
        assert_eq!(outcome, UpgradeAction::Exit);
        assert_eq!(executor.reverted.load(Ordering::SeqCst), 0);
    }

    #[tokio::test]
    async fn stage_failure_stays_and_does_not_cutover() {
        let mut executor = FakeExecutor::default();
        executor.fail_stage = true;
        let recorder = RecordingSink::default();
        let outcome = drive(
            &marker(),
            &executor,
            &recorder,
            never(),
            never(),
            HEARTBEAT_TIMEOUT,
        )
        .await;
        assert_eq!(outcome, UpgradeAction::StayedOnCurrent);
        assert_eq!(executor.cutover_calls.load(Ordering::SeqCst), 0);
        assert_eq!(recorder.phases(), vec![Staging, Failed]);
    }

    #[tokio::test]
    async fn self_test_failure_stays_and_does_not_cutover() {
        let mut executor = FakeExecutor::default();
        executor.fail_self_test = true;
        let recorder = RecordingSink::default();
        let outcome = drive(
            &marker(),
            &executor,
            &recorder,
            never(),
            never(),
            HEARTBEAT_TIMEOUT,
        )
        .await;
        assert_eq!(outcome, UpgradeAction::StayedOnCurrent);
        assert_eq!(executor.cutover_calls.load(Ordering::SeqCst), 0);
        assert_eq!(recorder.phases(), vec![Staging, Verifying, Failed]);
    }

    #[tokio::test]
    async fn cutover_failure_reverts_and_stays() {
        let mut executor = FakeExecutor::default();
        executor.fail_cutover = true;
        let recorder = RecordingSink::default();
        let outcome = drive(
            &marker(),
            &executor,
            &recorder,
            never(),
            never(),
            HEARTBEAT_TIMEOUT,
        )
        .await;
        assert_eq!(outcome, UpgradeAction::StayedOnCurrent);
        assert_eq!(executor.reverted.load(Ordering::SeqCst), 1);
        assert_eq!(
            recorder.phases(),
            vec![Staging, Verifying, CutoverReady, Failed]
        );
    }

    #[tokio::test(start_paused = true)]
    async fn heartbeat_timeout_reverts_and_stays() {
        let executor = FakeExecutor::default();
        let recorder = RecordingSink::default();
        // Never healthy, never stopped → the timeout fires → revert.
        let timeout = Duration::from_secs(60);
        let outcome = drive(&marker(), &executor, &recorder, never(), never(), timeout).await;
        assert_eq!(outcome, UpgradeAction::StayedOnCurrent);
        assert_eq!(executor.reverted.load(Ordering::SeqCst), 1);
        assert_eq!(
            recorder.phases(),
            vec![Staging, Verifying, CutoverReady, Failed]
        );
    }
}
|
||||||
@@ -102,6 +102,24 @@ impl FleetCommandsClient {
|
|||||||
};
|
};
|
||||||
Ok(serde_json::from_slice(&resp.payload)?)
|
Ok(serde_json::from_slice(&resp.payload)?)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Send `Verb::UpgradeStop` at upgrade cutover (ADR-022). The operator is the
|
||||||
|
/// single source of truth for the stop — it calls this only after observing
|
||||||
|
/// the new version's heartbeat. The old agent acks, then exits; the new
|
||||||
|
/// (already running) version takes over.
|
||||||
|
pub async fn upgrade_stop(&self, device_id: &str, reason: &str) -> Result<(), CommandError> {
|
||||||
|
let subject = device_command_subject(device_id, Verb::UpgradeStop);
|
||||||
|
let body = serde_json::to_vec(&CommandRequest::UpgradeStop {
|
||||||
|
reason: reason.to_string(),
|
||||||
|
})
|
||||||
|
.expect("CommandRequest serializes");
|
||||||
|
let fut = self.nc.request(subject, body.into());
|
||||||
|
match tokio::time::timeout(self.timeout, fut).await {
|
||||||
|
Ok(Ok(_)) => Ok(()),
|
||||||
|
Ok(Err(err)) => Err(map_request_error(err, self.timeout)),
|
||||||
|
Err(_) => Err(CommandError::Timeout(self.timeout)),
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Map an async-nats `RequestError` to our typed surface. The `kind`
|
/// Map an async-nats `RequestError` to our typed surface. The `kind`
|
||||||
|
|||||||
@@ -17,3 +17,4 @@ pub mod device_reconciler;
|
|||||||
pub mod device_status;
|
pub mod device_status;
|
||||||
pub mod fleet_aggregator;
|
pub mod fleet_aggregator;
|
||||||
pub mod liveness;
|
pub mod liveness;
|
||||||
|
pub mod upgrade_coordinator;
|
||||||
|
|||||||
@@ -5,7 +5,9 @@ mod frontend;
|
|||||||
#[cfg(feature = "web-frontend")]
|
#[cfg(feature = "web-frontend")]
|
||||||
mod service;
|
mod service;
|
||||||
|
|
||||||
use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator};
|
use harmony_fleet_operator::{
|
||||||
|
device_reconciler, device_status, fleet_aggregator, upgrade_coordinator,
|
||||||
|
};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
use async_nats::jetstream;
|
use async_nats::jetstream;
|
||||||
@@ -258,10 +260,15 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
|||||||
let dr_js = js.clone();
|
let dr_js = js.clone();
|
||||||
let ds_client = client.clone();
|
let ds_client = client.clone();
|
||||||
let ds_js = js.clone();
|
let ds_js = js.clone();
|
||||||
|
// upgrade_coordinator — agent-upgrade status/heartbeat → cutover stop + CR
|
||||||
|
let uc_client = client.clone();
|
||||||
|
let uc_js = js.clone();
|
||||||
|
let uc_commands = harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone());
|
||||||
tokio::select! {
|
tokio::select! {
|
||||||
r = controller::run(ctl_client, desired_state_kv) => r,
|
r = controller::run(ctl_client, desired_state_kv) => r,
|
||||||
r = device_reconciler::run(dr_client, dr_js) => r,
|
r = device_reconciler::run(dr_client, dr_js) => r,
|
||||||
r = device_status::run(ds_client, ds_js) => r,
|
r = device_status::run(ds_client, ds_js) => r,
|
||||||
|
r = upgrade_coordinator::run(uc_client, uc_js, uc_commands) => r,
|
||||||
r = fleet_aggregator::run(client, js, liveness) => r,
|
r = fleet_aggregator::run(client, js, liveness) => r,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -592,6 +592,8 @@ mod tests {
|
|||||||
DeviceLiveness {
|
DeviceLiveness {
|
||||||
last_heartbeat: None,
|
last_heartbeat: None,
|
||||||
reachability: r,
|
reachability: r,
|
||||||
|
current_version: None,
|
||||||
|
upgrade: None,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
238
fleet/harmony-fleet-operator/src/upgrade_coordinator.rs
Normal file
238
fleet/harmony-fleet-operator/src/upgrade_coordinator.rs
Normal file
@@ -0,0 +1,238 @@
|
|||||||
|
//! Operator-side agent-upgrade coordinator (ADR-022).
|
||||||
|
//!
|
||||||
|
//! The operator is the single source of truth for the cutover stop. This loop:
|
||||||
|
//! 1. watches the agent **upgrade-status** KV (phase per device) and the
|
||||||
|
//! **heartbeat** KV (running version per device);
|
||||||
|
//! 2. when a device is `CutoverReady` **and** the operator has independently
|
||||||
|
//! observed a heartbeat from the new version, sends `Verb::UpgradeStop` to
|
||||||
|
//! the old agent — never before;
|
||||||
|
//! 3. reflects the current version + upgrade phase onto the `Device` CR.
|
||||||
|
//!
|
||||||
|
//! The agent never self-stops; only this operator-observed handoff advances the
|
||||||
|
//! upgrade. See [`should_send_stop`] for the load-bearing decision.
|
||||||
|
|
||||||
|
use std::collections::{HashMap, HashSet};
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use async_nats::jetstream::kv::Operation;
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use harmony_reconciler_contracts::{
|
||||||
|
AgentUpgradePhase, AgentUpgradeStatus, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DEVICE_HEARTBEAT,
|
||||||
|
HeartbeatPayload,
|
||||||
|
};
|
||||||
|
use kube::Client;
|
||||||
|
use kube::api::{Api, Patch, PatchParams};
|
||||||
|
use serde_json::json;
|
||||||
|
use tokio::sync::Mutex;
|
||||||
|
|
||||||
|
use harmony::modules::fleet::operator::{Device, DeviceUpgradeStatus};
|
||||||
|
|
||||||
|
use crate::commands::FleetCommandsClient;
|
||||||
|
|
||||||
|
/// The handoff decision: commit the stop only when the agent is `CutoverReady`
|
||||||
|
/// **and** the operator has seen a heartbeat from the target version. Pure so
|
||||||
|
/// it can be unit-tested without NATS.
|
||||||
|
pub fn should_send_stop(status: &AgentUpgradeStatus, observed_version: Option<&str>) -> bool {
|
||||||
|
if status.phase != AgentUpgradePhase::CutoverReady {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
match (&status.target_version, observed_version) {
|
||||||
|
(Some(target), Some(seen)) => seen == target,
|
||||||
|
_ => false,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Default)]
|
||||||
|
struct Coordinator {
|
||||||
|
/// Latest upgrade status per device.
|
||||||
|
statuses: HashMap<String, AgentUpgradeStatus>,
|
||||||
|
/// Latest heartbeat version per device.
|
||||||
|
heartbeat_versions: HashMap<String, String>,
|
||||||
|
/// Devices we've already sent the stop to (don't resend every tick).
|
||||||
|
stopped: HashSet<String>,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn run(
|
||||||
|
kube: Client,
|
||||||
|
js: async_nats::jetstream::Context,
|
||||||
|
commands: FleetCommandsClient,
|
||||||
|
) -> Result<()> {
|
||||||
|
let status_bucket = js
|
||||||
|
.create_key_value(async_nats::jetstream::kv::Config {
|
||||||
|
bucket: BUCKET_AGENT_UPGRADE_STATUS.to_string(),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
let heartbeat_bucket = js
|
||||||
|
.create_key_value(async_nats::jetstream::kv::Config {
|
||||||
|
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
|
||||||
|
..Default::default()
|
||||||
|
})
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let state = Mutex::new(Coordinator::default());
|
||||||
|
let devices: Api<Device> = Api::all(kube);
|
||||||
|
|
||||||
|
let status_watch = async {
|
||||||
|
let mut w = status_bucket.watch_with_history(">").await?;
|
||||||
|
while let Some(entry) = w.next().await {
|
||||||
|
let Ok(entry) = entry else { continue };
|
||||||
|
if entry.operation != Operation::Put {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Ok(status) = serde_json::from_slice::<AgentUpgradeStatus>(&entry.value) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
let device = status.device_id.to_string();
|
||||||
|
{
|
||||||
|
let mut g = state.lock().await;
|
||||||
|
// A fresh non-terminal phase means a new attempt — allow a stop again.
|
||||||
|
if status.phase != AgentUpgradePhase::CutoverReady {
|
||||||
|
g.stopped.remove(&device);
|
||||||
|
}
|
||||||
|
g.statuses.insert(device.clone(), status);
|
||||||
|
}
|
||||||
|
reconcile_device(&state, &devices, &commands, &device).await;
|
||||||
|
}
|
||||||
|
Ok::<(), anyhow::Error>(())
|
||||||
|
};
|
||||||
|
|
||||||
|
let heartbeat_watch = async {
|
||||||
|
let mut w = heartbeat_bucket.watch_with_history(">").await?;
|
||||||
|
while let Some(entry) = w.next().await {
|
||||||
|
let Ok(entry) = entry else { continue };
|
||||||
|
if entry.operation != Operation::Put {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&entry.value) else {
|
||||||
|
continue;
|
||||||
|
};
|
||||||
|
if hb.agent_version.is_empty() {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
let device = hb.device_id.to_string();
|
||||||
|
state
|
||||||
|
.lock()
|
||||||
|
.await
|
||||||
|
.heartbeat_versions
|
||||||
|
.insert(device.clone(), hb.agent_version);
|
||||||
|
reconcile_device(&state, &devices, &commands, &device).await;
|
||||||
|
}
|
||||||
|
Ok::<(), anyhow::Error>(())
|
||||||
|
};
|
||||||
|
|
||||||
|
tokio::try_join!(status_watch, heartbeat_watch)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// For one device: decide whether to commit the stop, then reflect version +
|
||||||
|
/// upgrade phase onto its CR.
|
||||||
|
async fn reconcile_device(
|
||||||
|
state: &Mutex<Coordinator>,
|
||||||
|
devices: &Api<Device>,
|
||||||
|
commands: &FleetCommandsClient,
|
||||||
|
device: &str,
|
||||||
|
) {
|
||||||
|
// Snapshot the decision inputs under the lock.
|
||||||
|
let (send_stop, status, version) = {
|
||||||
|
let g = state.lock().await;
|
||||||
|
let status = g.statuses.get(device).cloned();
|
||||||
|
let version = g.heartbeat_versions.get(device).cloned();
|
||||||
|
let send_stop = status
|
||||||
|
.as_ref()
|
||||||
|
.map(|s| should_send_stop(s, version.as_deref()) && !g.stopped.contains(device))
|
||||||
|
.unwrap_or(false);
|
||||||
|
(send_stop, status, version)
|
||||||
|
};
|
||||||
|
|
||||||
|
if send_stop {
|
||||||
|
// Mark before sending so a concurrent reconcile doesn't double-send.
|
||||||
|
state.lock().await.stopped.insert(device.to_string());
|
||||||
|
tracing::info!(device, "upgrade: new version healthy; sending cutover stop");
|
||||||
|
if let Err(e) = commands
|
||||||
|
.upgrade_stop(device, "upgrade cutover committed")
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
tracing::warn!(device, error = %e, "upgrade: stop send failed; will retry on next event");
|
||||||
|
state.lock().await.stopped.remove(device);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
patch_device(devices, device, status.as_ref(), version.as_deref()).await;
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn patch_device(
|
||||||
|
devices: &Api<Device>,
|
||||||
|
device: &str,
|
||||||
|
status: Option<&AgentUpgradeStatus>,
|
||||||
|
version: Option<&str>,
|
||||||
|
) {
|
||||||
|
// Surface the upgrade only while it's in flight (drop it on Done/Running).
|
||||||
|
let upgrade = status.and_then(|s| match s.phase {
|
||||||
|
AgentUpgradePhase::Done | AgentUpgradePhase::Running => None,
|
||||||
|
phase => Some(DeviceUpgradeStatus {
|
||||||
|
phase,
|
||||||
|
target_version: s.target_version.clone(),
|
||||||
|
last_error: s.last_error.clone(),
|
||||||
|
}),
|
||||||
|
});
|
||||||
|
let patch = json!({
|
||||||
|
"status": {
|
||||||
|
"currentVersion": version,
|
||||||
|
"upgrade": upgrade,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
if let Err(e) = devices
|
||||||
|
.patch_status(device, &PatchParams::default(), &Patch::Merge(&patch))
|
||||||
|
.await
|
||||||
|
{
|
||||||
|
// A heartbeat/status can outrace the Device CR's creation; retry later.
|
||||||
|
if !matches!(&e, kube::Error::Api(ae) if ae.code == 404) {
|
||||||
|
tracing::warn!(device, error = %e, "upgrade: device status patch failed");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::Utc;
    use harmony_reconciler_contracts::Id;

    /// Status for device `dev-1`, running 0.1.0, in `phase` with the given
    /// target.
    fn status(phase: AgentUpgradePhase, target: Option<&str>) -> AgentUpgradeStatus {
        let target_version = target.map(String::from);
        AgentUpgradeStatus {
            device_id: Id::from("dev-1"),
            phase,
            current_version: "0.1.0".into(),
            target_version,
            last_error: None,
            at: Utc::now(),
        }
    }

    #[test]
    fn commits_stop_only_when_cutover_ready_and_new_version_seen() {
        let ready = status(AgentUpgradePhase::CutoverReady, Some("0.2.0"));
        assert!(should_send_stop(&ready, Some("0.2.0")));
        // New version not yet observed (still old) → hold.
        assert!(!should_send_stop(&ready, Some("0.1.0")));
        // No heartbeat seen → hold.
        assert!(!should_send_stop(&ready, None));
    }

    #[test]
    fn never_commits_before_cutover_ready() {
        let not_ready = [
            AgentUpgradePhase::Staging,
            AgentUpgradePhase::Verifying,
            AgentUpgradePhase::Failed,
            AgentUpgradePhase::Running,
        ];
        for phase in not_ready {
            let candidate = status(phase, Some("0.2.0"));
            assert!(
                !should_send_stop(&candidate, Some("0.2.0")),
                "{phase:?} must not commit"
            );
        }
    }
}
|
||||||
@@ -45,6 +45,12 @@ pub enum Verb {
|
|||||||
/// Run a shell command on the device and return its captured
|
/// Run a shell command on the device and return its captured
|
||||||
/// output. Single-shot (no streaming); the agent runs `sh -c`.
|
/// output. Single-shot (no streaming); the agent runs `sh -c`.
|
||||||
Exec,
|
Exec,
|
||||||
|
/// Operator → old agent: gracefully exit during an upgrade cutover
|
||||||
|
/// (ADR-022). The operator is the single source of truth for the stop —
|
||||||
|
/// the agent never self-stops. Reusing the command protocol means no new
|
||||||
|
/// subject or callout-permission change.
|
||||||
|
#[serde(rename = "upgrade-stop")]
|
||||||
|
UpgradeStop,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Verb {
|
impl Verb {
|
||||||
@@ -55,6 +61,7 @@ impl Verb {
|
|||||||
match self {
|
match self {
|
||||||
Verb::Ping => "ping",
|
Verb::Ping => "ping",
|
||||||
Verb::Exec => "exec",
|
Verb::Exec => "exec",
|
||||||
|
Verb::UpgradeStop => "upgrade-stop",
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -77,7 +84,14 @@ pub fn device_command_subscription(device_id: &str) -> String {
|
|||||||
#[serde(tag = "verb", rename_all = "lowercase")]
|
#[serde(tag = "verb", rename_all = "lowercase")]
|
||||||
pub enum CommandRequest {
|
pub enum CommandRequest {
|
||||||
Ping,
|
Ping,
|
||||||
Exec { command: String },
|
Exec {
|
||||||
|
command: String,
|
||||||
|
},
|
||||||
|
/// Stop the old agent at upgrade cutover. `reason` is logged.
|
||||||
|
#[serde(rename = "upgrade-stop")]
|
||||||
|
UpgradeStop {
|
||||||
|
reason: String,
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
/// JSON body of a `Verb::Ping` reply.
|
/// JSON body of a `Verb::Ping` reply.
|
||||||
|
|||||||
@@ -139,6 +139,11 @@ pub struct DeploymentState {
|
|||||||
pub struct HeartbeatPayload {
|
pub struct HeartbeatPayload {
|
||||||
pub device_id: Id,
|
pub device_id: Id,
|
||||||
pub at: DateTime<Utc>,
|
pub at: DateTime<Utc>,
|
||||||
|
/// Agent crate version the device is running (ADR-022). `#[serde(default)]`
|
||||||
|
/// so a pre-upgrade agent's heartbeat (no field) decodes as `""` =
|
||||||
|
/// "unknown" rather than failing — operators in the field must tolerate it.
|
||||||
|
#[serde(default)]
|
||||||
|
pub agent_version: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
@@ -239,10 +244,11 @@ mod tests {
|
|||||||
let hb = HeartbeatPayload {
|
let hb = HeartbeatPayload {
|
||||||
device_id: Id::from("pi-01".to_string()),
|
device_id: Id::from("pi-01".to_string()),
|
||||||
at: ts("2026-04-22T10:00:30Z"),
|
at: ts("2026-04-22T10:00:30Z"),
|
||||||
|
agent_version: "0.1.0".to_string(),
|
||||||
};
|
};
|
||||||
let bytes = serde_json::to_vec(&hb).unwrap();
|
let bytes = serde_json::to_vec(&hb).unwrap();
|
||||||
assert!(
|
assert!(
|
||||||
bytes.len() < 96,
|
bytes.len() < 128,
|
||||||
"heartbeat payload grew to {} bytes: {}",
|
"heartbeat payload grew to {} bytes: {}",
|
||||||
bytes.len(),
|
bytes.len(),
|
||||||
String::from_utf8_lossy(&bytes),
|
String::from_utf8_lossy(&bytes),
|
||||||
|
|||||||
@@ -32,6 +32,20 @@ pub const BUCKET_DEVICE_STATE: &str = "device-state";
|
|||||||
/// the state bucket. Key format: `heartbeat.<device_id>`.
|
/// the state bucket. Key format: `heartbeat.<device_id>`.
|
||||||
pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat";
|
pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat";
|
||||||
|
|
||||||
|
/// Operator → agent upgrade marker (ADR-022): desired agent version + signed
|
||||||
|
/// binary URL + SHA-256, one per device. Key format: `agent-upgrade.<device_id>`.
|
||||||
|
pub const BUCKET_AGENT_UPGRADE: &str = "agent-upgrade";
|
||||||
|
|
||||||
|
/// Agent → operator upgrade status: the live phase of the upgrade state
|
||||||
|
/// machine. Key format: `agent-upgrade.<device_id>`.
|
||||||
|
pub const BUCKET_AGENT_UPGRADE_STATUS: &str = "agent-upgrade-status";
|
||||||
|
|
||||||
|
/// Build the KV key under which a device's upgrade marker and upgrade status
/// are stored: `agent-upgrade.<device_id>`. The same key shape is used in both
/// upgrade buckets.
pub fn agent_upgrade_key(device_id: &str) -> String {
    const PREFIX: &str = "agent-upgrade.";
    let mut key = String::with_capacity(PREFIX.len() + device_id.len());
    key.push_str(PREFIX);
    key.push_str(device_id);
    key
}
|
||||||
|
|
||||||
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
|
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
|
||||||
/// Format: `<device>.<deployment>`.
|
/// Format: `<device>.<deployment>`.
|
||||||
pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String {
|
pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String {
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ pub mod commands;
|
|||||||
pub mod fleet;
|
pub mod fleet;
|
||||||
pub mod kv;
|
pub mod kv;
|
||||||
pub mod status;
|
pub mod status;
|
||||||
|
pub mod upgrade;
|
||||||
|
|
||||||
pub use commands::{
|
pub use commands::{
|
||||||
CommandRequest, ErrorKind, ErrorReply, ExecReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB,
|
CommandRequest, ErrorKind, ErrorReply, ExecReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB,
|
||||||
@@ -30,11 +31,13 @@ pub use fleet::{
|
|||||||
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,
|
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,
|
||||||
};
|
};
|
||||||
pub use kv::{
|
pub use kv::{
|
||||||
BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE,
|
BUCKET_AGENT_UPGRADE, BUCKET_AGENT_UPGRADE_STATUS, BUCKET_DESIRED_STATE,
|
||||||
|
BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, agent_upgrade_key,
|
||||||
desired_state_key, desired_state_watch_filter, device_heartbeat_key, device_info_key,
|
desired_state_key, desired_state_watch_filter, device_heartbeat_key, device_info_key,
|
||||||
device_state_key,
|
device_state_key,
|
||||||
};
|
};
|
||||||
pub use status::{InventorySnapshot, Phase};
|
pub use status::{InventorySnapshot, Phase};
|
||||||
|
pub use upgrade::{AgentUpgradeMarker, AgentUpgradePhase, AgentUpgradeStatus};
|
||||||
|
|
||||||
// Re-exports so consumers (agent, operator) don't need a direct
|
// Re-exports so consumers (agent, operator) don't need a direct
|
||||||
// harmony_types dependency purely to name the cross-boundary types.
|
// harmony_types dependency purely to name the cross-boundary types.
|
||||||
|
|||||||
98
harmony-reconciler-contracts/src/upgrade.rs
Normal file
98
harmony-reconciler-contracts/src/upgrade.rs
Normal file
@@ -0,0 +1,98 @@
|
|||||||
|
//! Agent self-upgrade wire types (ADR-022).
|
||||||
|
//!
|
||||||
|
//! The operator publishes an [`AgentUpgradeMarker`] per device (desired version
|
||||||
|
//! + signed binary URL + SHA-256). The agent runs the upgrade state machine and
|
||||||
|
//! publishes [`AgentUpgradeStatus`] back so the operator can reflect progress
|
||||||
|
//! onto the `Device` CR and decide when to send the stop signal. Both ride NATS
|
||||||
|
//! KV (not a fire-and-forget subject) so they survive an operator restart.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use schemars::JsonSchema;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::Id;
|
||||||
|
|
||||||
|
/// Operator → agent: the version this device should run, and where to fetch it.
|
||||||
|
/// Written to KV key `agent-upgrade.<device_id>` in
|
||||||
|
/// [`crate::BUCKET_AGENT_UPGRADE`]. The agent acts only when
|
||||||
|
/// `desired_version` differs from what it's running; **any** change re-triggers
|
||||||
|
/// the state machine (so re-publishing the same version after a fix retries).
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct AgentUpgradeMarker {
|
||||||
|
/// Target crate version, `MAJOR.MINOR.PATCH` (no `v` prefix, no build
|
||||||
|
/// metadata — the on-disk path is `/usr/bin/fleet-agent-v<version>`).
|
||||||
|
pub desired_version: String,
|
||||||
|
/// HTTPS URL of the versioned binary (Gitea/Harbor release asset, CDN, …).
|
||||||
|
pub url: String,
|
||||||
|
/// Lowercase hex SHA-256 of the binary. Verified before the binary is
|
||||||
|
/// staged; a mismatch fails the upgrade and the agent stays on its version.
|
||||||
|
pub sha256: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Phases of the agent upgrade state machine (ADR-022). `Running` is steady
|
||||||
|
/// state; the rest are the upgrade in flight. `Failed` and `Done` are terminal
|
||||||
|
/// for a given marker.
|
||||||
|
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize, JsonSchema)]
|
||||||
|
#[serde(rename_all = "kebab-case")]
|
||||||
|
pub enum AgentUpgradePhase {
|
||||||
|
/// Normal reconcile loop; no upgrade in flight.
|
||||||
|
Running,
|
||||||
|
/// Refusing to start new services; in-flight reconciles drain.
|
||||||
|
Draining,
|
||||||
|
/// Fetching + verifying the new binary onto disk.
|
||||||
|
Staging,
|
||||||
|
/// Running the staged binary's `--self-test` before any swap.
|
||||||
|
Verifying,
|
||||||
|
/// Symlink swapped, new version started alongside; awaiting the operator's
|
||||||
|
/// stop signal (the operator only sends it after seeing the new version's
|
||||||
|
/// heartbeat — the agent never self-stops).
|
||||||
|
CutoverReady,
|
||||||
|
/// The upgrade did not complete; `last_error` says why. The agent stayed on
|
||||||
|
/// (or reverted to) its previous version.
|
||||||
|
Failed,
|
||||||
|
/// The new version is the active one; this marker is satisfied.
|
||||||
|
Done,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Agent → operator: current upgrade phase + the versions involved. Written to
|
||||||
|
/// KV key `agent-upgrade.<device_id>` in [`crate::BUCKET_AGENT_UPGRADE_STATUS`].
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct AgentUpgradeStatus {
|
||||||
|
pub device_id: Id,
|
||||||
|
pub phase: AgentUpgradePhase,
|
||||||
|
/// Version the agent is currently running.
|
||||||
|
pub current_version: String,
|
||||||
|
/// Version the agent is upgrading toward (the marker's `desired_version`).
|
||||||
|
/// `None` when phase is `Running` with no marker.
|
||||||
|
#[serde(default)]
|
||||||
|
pub target_version: Option<String>,
|
||||||
|
#[serde(default)]
|
||||||
|
pub last_error: Option<String>,
|
||||||
|
pub at: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the serialized form of *every* phase, in both directions.
    ///
    /// The wire form lands in NATS KV and the `Device` CR, and the operator
    /// and dashboard key off it, so renaming a variant or changing the
    /// `rename_all` rule must fail here rather than in the field.
    #[test]
    fn phase_wire_form_is_stable() {
        let pinned = [
            (AgentUpgradePhase::Running, "running"),
            (AgentUpgradePhase::Draining, "draining"),
            (AgentUpgradePhase::Staging, "staging"),
            (AgentUpgradePhase::Verifying, "verifying"),
            (AgentUpgradePhase::CutoverReady, "cutover-ready"),
            (AgentUpgradePhase::Failed, "failed"),
            (AgentUpgradePhase::Done, "done"),
        ];
        for (phase, wire) in pinned {
            assert_eq!(serde_json::to_value(phase).unwrap(), wire, "{phase:?}");
            let parsed: AgentUpgradePhase =
                serde_json::from_value(serde_json::Value::from(wire)).unwrap();
            assert_eq!(parsed, phase, "{wire}");
        }
    }

    /// A marker survives a JSON serialize/deserialize round trip unchanged.
    #[test]
    fn marker_roundtrips() {
        let m = AgentUpgradeMarker {
            desired_version: "0.2.0".into(),
            url: "https://git.nationtech.io/fleet/agent/v0.2.0".into(),
            sha256: "abc123".into(),
        };
        let j = serde_json::to_string(&m).unwrap();
        assert_eq!(serde_json::from_str::<AgentUpgradeMarker>(&j).unwrap(), m);
    }
}
|
||||||
@@ -1,4 +1,4 @@
|
|||||||
use harmony_reconciler_contracts::InventorySnapshot;
|
use harmony_reconciler_contracts::{AgentUpgradePhase, InventorySnapshot};
|
||||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||||
use kube::CustomResource;
|
use kube::CustomResource;
|
||||||
use schemars::JsonSchema;
|
use schemars::JsonSchema;
|
||||||
@@ -118,6 +118,28 @@ pub struct DeviceStatus {
|
|||||||
#[serde(skip_serializing_if = "Option::is_none")]
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
pub last_heartbeat: Option<String>,
|
pub last_heartbeat: Option<String>,
|
||||||
pub reachability: Reachability,
|
pub reachability: Reachability,
|
||||||
|
/// Agent version the device is currently running, from its heartbeat
|
||||||
|
/// (ADR-022). `None` for a pre-upgrade agent that doesn't report it.
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub current_version: Option<String>,
|
||||||
|
/// In-flight agent upgrade, reflected from the agent's upgrade status.
|
||||||
|
/// `None` when no upgrade is active.
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub upgrade: Option<DeviceUpgradeStatus>,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Reflection of the agent's upgrade state machine onto the CR, so
|
||||||
|
/// `kubectl get device -o yaml` and the dashboard see upgrade progress.
|
||||||
|
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
|
||||||
|
#[serde(rename_all = "camelCase")]
|
||||||
|
pub struct DeviceUpgradeStatus {
|
||||||
|
/// Live phase of the agent's upgrade state machine. The wire enum itself —
|
||||||
|
/// not a stringified copy — so the set of values can't drift.
|
||||||
|
pub phase: AgentUpgradePhase,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub target_version: Option<String>,
|
||||||
|
#[serde(skip_serializing_if = "Option::is_none")]
|
||||||
|
pub last_error: Option<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Coarse liveness derived from heartbeat freshness. Failing/Pending
|
/// Coarse liveness derived from heartbeat freshness. Failing/Pending
|
||||||
|
|||||||
@@ -13,5 +13,5 @@ pub mod crd;
|
|||||||
|
|
||||||
pub use crd::{
|
pub use crd::{
|
||||||
AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device,
|
AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device,
|
||||||
DeviceSpec, DeviceStatus, Reachability, Rollout, RolloutStrategy,
|
DeviceSpec, DeviceStatus, DeviceUpgradeStatus, Reachability, Rollout, RolloutStrategy,
|
||||||
};
|
};
|
||||||
|
|||||||
20
harmony_downloadable_asset/Cargo.toml
Normal file
20
harmony_downloadable_asset/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
[package]
|
||||||
|
name = "harmony_downloadable_asset"
|
||||||
|
edition = "2024"
|
||||||
|
version.workspace = true
|
||||||
|
readme.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
description = "Download a file from a URL with SHA-256 checksum verification (shared by k3d binary install and the fleet agent self-upgrade)"
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
|
||||||
|
sha2 = "0.10"
|
||||||
|
tokio = { workspace = true, features = ["fs", "io-util"] }
|
||||||
|
futures-util = "0.3"
|
||||||
|
url.workspace = true
|
||||||
|
thiserror.workspace = true
|
||||||
|
tracing.workspace = true
|
||||||
|
|
||||||
|
[dev-dependencies]
|
||||||
|
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
|
||||||
|
httptest = "0.16"
|
||||||
213
harmony_downloadable_asset/src/lib.rs
Normal file
213
harmony_downloadable_asset/src/lib.rs
Normal file
@@ -0,0 +1,213 @@
|
|||||||
|
//! Download a file from a URL and verify its SHA-256 before trusting it.
|
||||||
|
//!
|
||||||
|
//! Shared by k3d's binary install and the fleet agent's self-upgrade
|
||||||
|
//! (ADR-022): both fetch a versioned binary from a release host and must refuse
|
||||||
|
//! anything whose hash doesn't match the pin. Re-downloads are skipped when the
|
||||||
|
//! target already exists with the right checksum.
|
||||||
|
|
||||||
|
use std::io::Read;
|
||||||
|
use std::path::{Path, PathBuf};
|
||||||
|
|
||||||
|
use futures_util::StreamExt;
|
||||||
|
use sha2::{Digest, Sha256};
|
||||||
|
use tokio::fs;
|
||||||
|
use tokio::fs::File;
|
||||||
|
use tokio::io::AsyncWriteExt;
|
||||||
|
use url::Url;
|
||||||
|
|
||||||
|
#[derive(Debug, thiserror::Error)]
|
||||||
|
pub enum DownloadError {
|
||||||
|
#[error("failed to create download directory {0}: {1}")]
|
||||||
|
CreateDir(PathBuf, #[source] std::io::Error),
|
||||||
|
#[error("request to {0} failed: {1}")]
|
||||||
|
Request(Url, #[source] reqwest::Error),
|
||||||
|
#[error("download of {url} failed with status {status}")]
|
||||||
|
Status {
|
||||||
|
url: Url,
|
||||||
|
status: reqwest::StatusCode,
|
||||||
|
},
|
||||||
|
#[error("writing {0} failed: {1}")]
|
||||||
|
Write(PathBuf, #[source] std::io::Error),
|
||||||
|
#[error("error streaming download body: {0}")]
|
||||||
|
Stream(#[source] reqwest::Error),
|
||||||
|
#[error("checksum mismatch for {path}: expected {expected}, got {actual}")]
|
||||||
|
Checksum {
|
||||||
|
path: PathBuf,
|
||||||
|
expected: String,
|
||||||
|
actual: String,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
/// A file to fetch from `url` into `file_name`, gated on `checksum` (lowercase
|
||||||
|
/// hex SHA-256).
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct DownloadableAsset {
|
||||||
|
pub url: Url,
|
||||||
|
pub file_name: String,
|
||||||
|
pub checksum: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DownloadableAsset {
|
||||||
|
/// SHA-256 of `file` as lowercase hex, or `None` if it can't be read.
|
||||||
|
fn sha256_hex(file: &Path) -> Option<String> {
|
||||||
|
let mut f = std::fs::File::open(file).ok()?;
|
||||||
|
let mut hasher = Sha256::new();
|
||||||
|
let mut buf = [0u8; 1024 * 1024];
|
||||||
|
loop {
|
||||||
|
match f.read(&mut buf) {
|
||||||
|
Ok(0) => break,
|
||||||
|
Ok(n) => hasher.update(&buf[..n]),
|
||||||
|
Err(e) => {
|
||||||
|
tracing::warn!(error = %e, "error reading file for checksum");
|
||||||
|
return None;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Some(format!("{:x}", hasher.finalize()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn matches_checksum(&self, file: &Path) -> bool {
|
||||||
|
Self::sha256_hex(file).is_some_and(|h| h == self.checksum)
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Download into `folder/<file_name>`, verifying the checksum. Skips the
|
||||||
|
/// download when the file already exists with the right checksum. Returns
|
||||||
|
/// the path on success.
|
||||||
|
pub async fn download_to_path(&self, folder: PathBuf) -> Result<PathBuf, DownloadError> {
|
||||||
|
if !folder.exists() {
|
||||||
|
fs::create_dir_all(&folder)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DownloadError::CreateDir(folder.clone(), e))?;
|
||||||
|
}
|
||||||
|
let target = folder.join(&self.file_name);
|
||||||
|
|
||||||
|
if self.matches_checksum(&target) {
|
||||||
|
tracing::debug!(
|
||||||
|
?target,
|
||||||
|
"already present with correct checksum; skipping download"
|
||||||
|
);
|
||||||
|
return Ok(target);
|
||||||
|
}
|
||||||
|
|
||||||
|
tracing::debug!(url = %self.url, "downloading");
|
||||||
|
let response = reqwest::Client::new()
|
||||||
|
.get(self.url.clone())
|
||||||
|
.send()
|
||||||
|
.await
|
||||||
|
.map_err(|e| DownloadError::Request(self.url.clone(), e))?;
|
||||||
|
if !response.status().is_success() {
|
||||||
|
return Err(DownloadError::Status {
|
||||||
|
url: self.url.clone(),
|
||||||
|
status: response.status(),
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
let mut file = File::create(&target)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DownloadError::Write(target.clone(), e))?;
|
||||||
|
let mut stream = response.bytes_stream();
|
||||||
|
while let Some(chunk) = stream.next().await {
|
||||||
|
let chunk = chunk.map_err(DownloadError::Stream)?;
|
||||||
|
file.write_all(&chunk)
|
||||||
|
.await
|
||||||
|
.map_err(|e| DownloadError::Write(target.clone(), e))?;
|
||||||
|
}
|
||||||
|
file.flush()
|
||||||
|
.await
|
||||||
|
.map_err(|e| DownloadError::Write(target.clone(), e))?;
|
||||||
|
drop(file);
|
||||||
|
|
||||||
|
match Self::sha256_hex(&target) {
|
||||||
|
Some(actual) if actual == self.checksum => Ok(target),
|
||||||
|
actual => Err(DownloadError::Checksum {
|
||||||
|
path: target,
|
||||||
|
expected: self.checksum.clone(),
|
||||||
|
actual: actual.unwrap_or_else(|| "<unreadable>".to_string()),
|
||||||
|
}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
mod tests {
    use super::*;
    use httptest::{Expectation, Server, matchers, responders};
    use std::sync::atomic::{AtomicUsize, Ordering};

    const CONTENT: &str = "This is a test file.";
    const CONTENT_SHA256: &str = "f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";

    /// Creates a fresh scratch directory under the system temp dir.
    ///
    /// The name combines the process id, a timestamp and a per-process
    /// counter: tests run in parallel, and a timestamp alone can repeat on
    /// platforms with a coarse clock.
    fn tmp() -> PathBuf {
        static SEQ: AtomicUsize = AtomicUsize::new(0);
        let nanos = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap()
            .as_nanos();
        let p = std::env::temp_dir().join(format!(
            "harmony-dl-{}-{nanos}-{}",
            std::process::id(),
            SEQ.fetch_add(1, Ordering::Relaxed)
        ));
        std::fs::create_dir_all(&p).unwrap();
        p
    }

    /// Asset pointing at `/f.bin` on `server`, pinned to `CONTENT`'s hash.
    fn asset_for(server: &Server) -> DownloadableAsset {
        DownloadableAsset {
            url: Url::parse(&server.url("/f.bin").to_string()).unwrap(),
            file_name: "f.bin".to_string(),
            checksum: CONTENT_SHA256.to_string(),
        }
    }

    #[tokio::test]
    async fn downloads_and_verifies() {
        let server = Server::run();
        server.expect(
            Expectation::matching(matchers::request::method_path("GET", "/f.bin"))
                .respond_with(responders::status_code(200).body(CONTENT)),
        );
        let path = asset_for(&server).download_to_path(tmp()).await.unwrap();
        assert_eq!(std::fs::read_to_string(path).unwrap(), CONTENT);
    }

    #[tokio::test]
    async fn skips_when_already_present_with_matching_checksum() {
        let server = Server::run();
        // `times(0)`: any request at all fails the test.
        server.expect(
            Expectation::matching(matchers::any())
                .times(0)
                .respond_with(responders::status_code(200).body(CONTENT)),
        );
        let folder = tmp();
        std::fs::write(folder.join("f.bin"), CONTENT).unwrap();
        asset_for(&server).download_to_path(folder).await.unwrap();
    }

    #[tokio::test]
    async fn rejects_bad_checksum() {
        let server = Server::run();
        server.expect(
            Expectation::matching(matchers::any())
                .respond_with(responders::status_code(200).body("not the expected content")),
        );
        let err = asset_for(&server)
            .download_to_path(tmp())
            .await
            .unwrap_err();
        assert!(matches!(err, DownloadError::Checksum { .. }));
    }

    #[tokio::test]
    async fn surfaces_http_error() {
        let server = Server::run();
        server.expect(
            Expectation::matching(matchers::any()).respond_with(responders::status_code(404)),
        );
        let err = asset_for(&server)
            .download_to_path(tmp())
            .await
            .unwrap_err();
        assert!(matches!(err, DownloadError::Status { .. }));
    }
}
|
||||||
@@ -13,11 +13,9 @@ octocrab = "0.44.0"
|
|||||||
regex = "1.11.1"
|
regex = "1.11.1"
|
||||||
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
|
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
|
||||||
url.workspace = true
|
url.workspace = true
|
||||||
sha2 = "0.10.8"
|
|
||||||
futures-util = "0.3.31"
|
|
||||||
kube.workspace = true
|
kube.workspace = true
|
||||||
|
harmony_downloadable_asset = { path = "../harmony_downloadable_asset" }
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
env_logger = { workspace = true }
|
env_logger = { workspace = true }
|
||||||
httptest = "0.16.3"
|
|
||||||
pretty_assertions = "1.4.1"
|
pretty_assertions = "1.4.1"
|
||||||
|
|||||||
@@ -1,303 +0,0 @@
|
|||||||
use futures_util::StreamExt;
|
|
||||||
use log::{debug, warn};
|
|
||||||
use sha2::{Digest, Sha256};
|
|
||||||
use std::io::Read;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use tokio::fs;
|
|
||||||
use tokio::fs::File;
|
|
||||||
use tokio::io::AsyncWriteExt;
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
const CHECKSUM_FAILED_MSG: &str = "Downloaded file failed checksum verification";
|
|
||||||
|
|
||||||
/// Represents an asset that can be downloaded from a URL with checksum verification.
|
|
||||||
///
|
|
||||||
/// This struct facilitates secure downloading of files from remote URLs by
|
|
||||||
/// verifying the integrity of the downloaded content using SHA-256 checksums.
|
|
||||||
/// It handles downloading the file, saving it to disk, and verifying the checksum matches
|
|
||||||
/// the expected value.
|
|
||||||
///
|
|
||||||
/// # Examples
|
|
||||||
///
|
|
||||||
/// ```compile_fail
|
|
||||||
/// # use url::Url;
|
|
||||||
/// # use std::path::PathBuf;
|
|
||||||
///
|
|
||||||
/// # async fn example() -> Result<(), String> {
|
|
||||||
/// let asset = DownloadableAsset {
|
|
||||||
/// url: Url::parse("https://example.com/file.zip").unwrap(),
|
|
||||||
/// file_name: "file.zip".to_string(),
|
|
||||||
/// checksum: "a1b2c3d4e5f6...".to_string(),
|
|
||||||
/// };
|
|
||||||
///
|
|
||||||
/// let download_dir = PathBuf::from("/tmp/downloads");
|
|
||||||
/// let file_path = asset.download_to_path(download_dir).await?;
|
|
||||||
/// # Ok(())
|
|
||||||
/// # }
|
|
||||||
/// ```
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub(crate) struct DownloadableAsset {
|
|
||||||
pub(crate) url: Url,
|
|
||||||
pub(crate) file_name: String,
|
|
||||||
pub(crate) checksum: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl DownloadableAsset {
|
|
||||||
fn verify_checksum(&self, file: PathBuf) -> bool {
|
|
||||||
if !file.exists() {
|
|
||||||
debug!("File does not exist: {:?}", file);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut file = match std::fs::File::open(&file) {
|
|
||||||
Ok(file) => file,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Failed to open file for checksum verification: {:?}", e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
let mut hasher = Sha256::new();
|
|
||||||
let mut buffer = [0; 1024 * 1024]; // 1MB buffer
|
|
||||||
|
|
||||||
loop {
|
|
||||||
let bytes_read = match file.read(&mut buffer) {
|
|
||||||
Ok(0) => break,
|
|
||||||
Ok(n) => n,
|
|
||||||
Err(e) => {
|
|
||||||
warn!("Error reading file for checksum: {:?}", e);
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
hasher.update(&buffer[..bytes_read]);
|
|
||||||
}
|
|
||||||
|
|
||||||
let result = hasher.finalize();
|
|
||||||
let calculated_hash = format!("{:x}", result);
|
|
||||||
|
|
||||||
debug!("Expected checksum: {}", self.checksum);
|
|
||||||
debug!("Calculated checksum: {}", calculated_hash);
|
|
||||||
|
|
||||||
calculated_hash == self.checksum
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Downloads the asset to the specified directory, verifying its checksum.
|
|
||||||
///
|
|
||||||
/// This function will:
|
|
||||||
/// 1. Create the target directory if it doesn't exist
|
|
||||||
/// 2. Check if the file already exists with the correct checksum
|
|
||||||
/// 3. If not, download the file from the URL
|
|
||||||
/// 4. Verify the downloaded file's checksum matches the expected value
|
|
||||||
///
|
|
||||||
/// # Arguments
|
|
||||||
///
|
|
||||||
/// * `folder` - The directory path where the file should be saved
|
|
||||||
///
|
|
||||||
/// # Returns
|
|
||||||
///
|
|
||||||
/// * `Ok(PathBuf)` - The path to the downloaded file on success
|
|
||||||
/// * `Err(String)` - A descriptive error message if the download or verification fails
|
|
||||||
///
|
|
||||||
/// # Errors
|
|
||||||
///
|
|
||||||
/// This function will return an error if:
|
|
||||||
/// - The network request fails
|
|
||||||
/// - The server responds with a non-success status code
|
|
||||||
/// - Writing to disk fails
|
|
||||||
/// - The checksum verification fails
|
|
||||||
pub(crate) async fn download_to_path(&self, folder: PathBuf) -> Result<PathBuf, String> {
|
|
||||||
if !folder.exists() {
|
|
||||||
fs::create_dir_all(&folder)
|
|
||||||
.await
|
|
||||||
.expect("Failed to create download directory");
|
|
||||||
}
|
|
||||||
|
|
||||||
let target_file_path = folder.join(&self.file_name);
|
|
||||||
debug!("Downloading to path: {:?}", target_file_path);
|
|
||||||
|
|
||||||
if self.verify_checksum(target_file_path.clone()) {
|
|
||||||
debug!("File already exists with correct checksum, skipping download");
|
|
||||||
return Ok(target_file_path);
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("Downloading from URL: {}", self.url);
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let response = client
|
|
||||||
.get(self.url.clone())
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to download file: {e}"))?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
|
||||||
return Err(format!(
|
|
||||||
"Failed to download file, status: {}",
|
|
||||||
response.status()
|
|
||||||
));
|
|
||||||
}
|
|
||||||
|
|
||||||
let mut file = File::create(&target_file_path)
|
|
||||||
.await
|
|
||||||
.expect("Failed to create target file");
|
|
||||||
|
|
||||||
let mut stream = response.bytes_stream();
|
|
||||||
while let Some(chunk_result) = stream.next().await {
|
|
||||||
let chunk = chunk_result.expect("Error while downloading file");
|
|
||||||
file.write_all(&chunk)
|
|
||||||
.await
|
|
||||||
.expect("Failed to write data to file");
|
|
||||||
}
|
|
||||||
|
|
||||||
file.flush().await.expect("Failed to flush file");
|
|
||||||
drop(file);
|
|
||||||
|
|
||||||
if !self.verify_checksum(target_file_path.clone()) {
|
|
||||||
return Err(CHECKSUM_FAILED_MSG.to_string());
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"File downloaded and verified successfully: {}",
|
|
||||||
target_file_path.to_string_lossy()
|
|
||||||
);
|
|
||||||
Ok(target_file_path)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use httptest::{
|
|
||||||
matchers::{self, request},
|
|
||||||
responders, Expectation, Server,
|
|
||||||
};
|
|
||||||
|
|
||||||
const BASE_TEST_PATH: &str = "/tmp/harmony-test-k3d-download";
|
|
||||||
const TEST_CONTENT: &str = "This is a test file.";
|
|
||||||
const TEST_CONTENT_HASH: &str =
|
|
||||||
"f29bc64a9d3732b4b9035125fdb3285f5b6455778edca72414671e0ca3b2e0de";
|
|
||||||
|
|
||||||
fn setup_test() -> (PathBuf, Server) {
|
|
||||||
let _ = env_logger::builder().try_init();
|
|
||||||
|
|
||||||
// Create unique test directory
|
|
||||||
let test_id = std::time::SystemTime::now()
|
|
||||||
.duration_since(std::time::UNIX_EPOCH)
|
|
||||||
.unwrap()
|
|
||||||
.as_millis();
|
|
||||||
let download_path = format!("{}/test_{}", BASE_TEST_PATH, test_id);
|
|
||||||
std::fs::create_dir_all(&download_path).unwrap();
|
|
||||||
|
|
||||||
(PathBuf::from(download_path), Server::run())
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_download_to_path_success() {
|
|
||||||
let (folder, server) = setup_test();
|
|
||||||
|
|
||||||
server.expect(
|
|
||||||
Expectation::matching(request::method_path("GET", "/test.txt"))
|
|
||||||
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let asset = DownloadableAsset {
|
|
||||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
|
||||||
file_name: "test.txt".to_string(),
|
|
||||||
checksum: TEST_CONTENT_HASH.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = asset
|
|
||||||
.download_to_path(folder.join("success"))
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let downloaded_content = std::fs::read_to_string(result).unwrap();
|
|
||||||
assert_eq!(downloaded_content, TEST_CONTENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_download_to_path_already_exists() {
|
|
||||||
let (folder, server) = setup_test();
|
|
||||||
|
|
||||||
server.expect(
|
|
||||||
Expectation::matching(matchers::any())
|
|
||||||
.times(0)
|
|
||||||
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let asset = DownloadableAsset {
|
|
||||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
|
||||||
file_name: "test.txt".to_string(),
|
|
||||||
checksum: TEST_CONTENT_HASH.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let target_file_path = folder.join(&asset.file_name);
|
|
||||||
std::fs::write(&target_file_path, TEST_CONTENT).unwrap();
|
|
||||||
|
|
||||||
let result = asset.download_to_path(folder).await.unwrap();
|
|
||||||
let content = std::fs::read_to_string(result).unwrap();
|
|
||||||
assert_eq!(content, TEST_CONTENT);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_download_to_path_server_error() {
|
|
||||||
let (folder, server) = setup_test();
|
|
||||||
|
|
||||||
server.expect(
|
|
||||||
Expectation::matching(matchers::any()).respond_with(responders::status_code(404)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let asset = DownloadableAsset {
|
|
||||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
|
||||||
file_name: "test.txt".to_string(),
|
|
||||||
checksum: TEST_CONTENT_HASH.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = asset.download_to_path(folder.join("error")).await;
|
|
||||||
assert!(result.is_err());
|
|
||||||
assert!(result.unwrap_err().contains("status: 404"));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_download_to_path_checksum_failure() {
|
|
||||||
let (folder, server) = setup_test();
|
|
||||||
|
|
||||||
let invalid_content = "This is NOT the expected content";
|
|
||||||
server.expect(
|
|
||||||
Expectation::matching(matchers::any())
|
|
||||||
.respond_with(responders::status_code(200).body(invalid_content)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let asset = DownloadableAsset {
|
|
||||||
url: Url::parse(&server.url("/test.txt").to_string()).unwrap(),
|
|
||||||
file_name: "test.txt".to_string(),
|
|
||||||
checksum: TEST_CONTENT_HASH.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let join_handle =
|
|
||||||
tokio::spawn(async move { asset.download_to_path(folder.join("failure")).await });
|
|
||||||
|
|
||||||
assert_eq!(
|
|
||||||
join_handle.await.unwrap().err().unwrap(),
|
|
||||||
CHECKSUM_FAILED_MSG
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_download_with_specific_path_matcher() {
|
|
||||||
let (folder, server) = setup_test();
|
|
||||||
|
|
||||||
server.expect(
|
|
||||||
Expectation::matching(matchers::request::path("/specific/path.txt"))
|
|
||||||
.respond_with(responders::status_code(200).body(TEST_CONTENT)),
|
|
||||||
);
|
|
||||||
|
|
||||||
let asset = DownloadableAsset {
|
|
||||||
url: Url::parse(&server.url("/specific/path.txt").to_string()).unwrap(),
|
|
||||||
file_name: "path.txt".to_string(),
|
|
||||||
checksum: TEST_CONTENT_HASH.to_string(),
|
|
||||||
};
|
|
||||||
|
|
||||||
let result = asset.download_to_path(folder).await.unwrap();
|
|
||||||
let downloaded_content = std::fs::read_to_string(result).unwrap();
|
|
||||||
assert_eq!(downloaded_content, TEST_CONTENT);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,5 +1,4 @@
|
|||||||
mod downloadable_asset;
|
use harmony_downloadable_asset::DownloadableAsset;
|
||||||
use downloadable_asset::*;
|
|
||||||
|
|
||||||
use kube::Client;
|
use kube::Client;
|
||||||
use log::{debug, info};
|
use log::{debug, info};
|
||||||
@@ -134,7 +133,10 @@ impl K3d {
|
|||||||
|
|
||||||
let release_binary = self.get_binary_for_current_platform(latest_release).await;
|
let release_binary = self.get_binary_for_current_platform(latest_release).await;
|
||||||
debug!("Foudn K3d binary to install : {release_binary:#?}");
|
debug!("Foudn K3d binary to install : {release_binary:#?}");
|
||||||
release_binary.download_to_path(self.base_dir.clone()).await
|
release_binary
|
||||||
|
.download_to_path(self.base_dir.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO : Make sure this will only find actual released versions, no prereleases or test
|
// TODO : Make sure this will only find actual released versions, no prereleases or test
|
||||||
|
|||||||
Reference in New Issue
Block a user