diff --git a/Cargo.lock b/Cargo.lock index 16f386f1..c5cd3715 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3726,6 +3726,16 @@ dependencies = [ "tower", ] +[[package]] +name = "harmony-reconciler-contracts" +version = "0.1.0" +dependencies = [ + "chrono", + "harmony_types", + "serde", + "serde_json", +] + [[package]] name = "harmony_agent" version = "0.1.0" @@ -4710,6 +4720,7 @@ dependencies = [ "clap", "futures-util", "harmony", + "harmony-reconciler-contracts", "serde", "serde_json", "tokio", @@ -4726,6 +4737,7 @@ dependencies = [ "async-nats", "clap", "futures-util", + "harmony-reconciler-contracts", "k8s-openapi", "kube", "schemars 0.8.22", diff --git a/Cargo.toml b/Cargo.toml index 929354b0..1e9eeaf8 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -30,6 +30,7 @@ members = [ "harmony_assets", "opnsense-codegen", "opnsense-api", "iot/iot-operator-v0", "iot/iot-agent-v0", + "harmony-reconciler-contracts", ] [workspace.package] diff --git a/harmony-reconciler-contracts/Cargo.toml b/harmony-reconciler-contracts/Cargo.toml new file mode 100644 index 00000000..fc52cdb7 --- /dev/null +++ b/harmony-reconciler-contracts/Cargo.toml @@ -0,0 +1,20 @@ +[package] +name = "harmony-reconciler-contracts" +version = "0.1.0" +edition = "2024" +license.workspace = true + +# Cross-boundary types shared between a harmony operator (central, +# writing desired state to NATS JetStream KV) and a harmony agent +# (on-host, watching KV and reconciling). Deliberately lean: pure +# serde data types, bucket/key constants, small helpers. No tokio, +# no async-nats, no harmony. The on-device agent build pulls this +# in alongside a minimal async-nats client; the operator pulls it +# alongside kube-rs; harmony itself treats it as just another +# module. None of those consumers should pay for the others' deps. 
+ +[dependencies] +chrono = { workspace = true, features = ["serde"] } +harmony_types = { path = "../harmony_types" } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } diff --git a/harmony-reconciler-contracts/src/kv.rs b/harmony-reconciler-contracts/src/kv.rs new file mode 100644 index 00000000..c773eba4 --- /dev/null +++ b/harmony-reconciler-contracts/src/kv.rs @@ -0,0 +1,53 @@ +//! NATS JetStream KV bucket names and key formats used by the +//! harmony reconciler pattern. +//! +//! Hard-coded literals previously lived in five places (agent and +//! operator main.rs, operator deploy YAML, two smoke scripts). +//! Changing any of them meant hunting for the others. They now live +//! here; agent + operator consume the constants directly, and smoke +//! scripts grep for the literal values locked in the tests below. + +/// Operator-written bucket. One entry per `(device, deployment)` pair. +/// Values are the JSON-serialized Score envelope — today +/// `harmony::modules::podman::IotScore`, tomorrow any variant of +/// a polymorphic `Score` enum the framework ships. +pub const BUCKET_DESIRED_STATE: &str = "desired-state"; + +/// Agent-written bucket. One entry per device at `status.<device_id>`. +/// Values are JSON-serialized [`crate::AgentStatus`]. +pub const BUCKET_AGENT_STATUS: &str = "agent-status"; + +/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`]. +/// Format: `<device_id>.<deployment_name>`. +pub fn desired_state_key(device_id: &str, deployment_name: &str) -> String { + format!("{device_id}.{deployment_name}") +} + +/// KV key for a device's last-known status in [`BUCKET_AGENT_STATUS`]. +/// Format: `status.<device_id>`. 
+pub fn status_key(device_id: &str) -> String { + format!("status.{device_id}") +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn desired_state_key_format() { + assert_eq!(desired_state_key("pi-01", "hello-web"), "pi-01.hello-web"); + } + + #[test] + fn status_key_format() { + assert_eq!(status_key("pi-01"), "status.pi-01"); + } + + #[test] + fn bucket_names_match_smoke_scripts() { + // These strings are also grepped by iot/scripts/smoke-*.sh — + // flipping them here must be paired with a script update. + assert_eq!(BUCKET_DESIRED_STATE, "desired-state"); + assert_eq!(BUCKET_AGENT_STATUS, "agent-status"); + } +} diff --git a/harmony-reconciler-contracts/src/lib.rs b/harmony-reconciler-contracts/src/lib.rs new file mode 100644 index 00000000..24aeeae4 --- /dev/null +++ b/harmony-reconciler-contracts/src/lib.rs @@ -0,0 +1,31 @@ +//! Cross-boundary types for harmony's reconciler pattern. +//! +//! Harmony's "reconciler" pattern is: a central **operator** writes +//! desired state into NATS JetStream KV; a remote **agent** watches +//! the KV, deserializes each entry as a Score, and drives the host +//! toward that state. This split lets one operator orchestrate a +//! fleet of agents across network boundaries it can't reach +//! directly — IoT devices today, OKD cluster agents or edge-compute +//! reconcilers tomorrow. +//! +//! This crate holds the wire-format bits both sides must agree on: +//! NATS bucket names, KV key formats, and the `AgentStatus` +//! heartbeat payload. The Score types themselves (`PodmanV0Score`, +//! future variants) live in their respective harmony modules — +//! consumers import them from there and serialize them over the +//! transport this crate describes. +//! +//! **Deliberately lean** — no tokio, no async-nats, no harmony. +//! The on-device agent build pulls it in alongside a minimal +//! async-nats client; the operator pulls it alongside kube-rs. +//! Neither should pay for the other's dependencies. 
+ +pub mod kv; +pub mod status; + +pub use kv::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, desired_state_key, status_key}; +pub use status::AgentStatus; + +// Re-exports so consumers (agent, operator) don't need a direct +// harmony_types dependency purely to name the cross-boundary types. +pub use harmony_types::id::Id; diff --git a/harmony-reconciler-contracts/src/status.rs b/harmony-reconciler-contracts/src/status.rs new file mode 100644 index 00000000..e57a1e53 --- /dev/null +++ b/harmony-reconciler-contracts/src/status.rs @@ -0,0 +1,74 @@ +//! Agent → NATS KV status payload. +//! +//! The agent publishes a heartbeat + rollup status to the +//! `agent-status` bucket every 30 s (see +//! [`crate::BUCKET_AGENT_STATUS`]). Today the payload is intentionally +//! minimal — a single `"running"` state + a timestamp — so the +//! operator can implement §12 v0.1 "Status aggregation in operator" +//! without waiting on richer per-workload reporting. +//! +//! When the agent grows richer status (per-container state, rollout +//! progress) this struct gains fields with `#[serde(default)]`; old +//! operators keep working against newer agents. + +use chrono::{DateTime, Utc}; +use harmony_types::id::Id; +use serde::{Deserialize, Serialize}; + +/// A single heartbeat published by the agent at +/// `status.<device_id>` in the `agent-status` bucket. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct AgentStatus { + /// Echoed from the agent's own config so the operator can + /// cross-check which device it came from if the KV key is ever + /// ambiguous. Serializes transparently as a plain string. + pub device_id: Id, + /// Coarse rollup state. v0 only ever writes `"running"`; richer + /// variants are a v0.1+ concern. A String (not an enum) so old + /// operators parsing this payload don't fail on a new variant. + pub status: String, + /// RFC 3339 UTC timestamp. 
Used by the smoke test's reboot- + /// detection gate — any timestamp strictly greater than the gate + /// is evidence of a post-reboot write. `chrono::DateTime<Utc>` + /// serde-serializes as RFC 3339, so the wire format stays + /// lex-comparable (the smoke's string `>` still works). + pub timestamp: DateTime, +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn status_roundtrip() { + let s = AgentStatus { + device_id: Id::from("pi-01".to_string()), + status: "running".to_string(), + timestamp: DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z") + .unwrap() + .with_timezone(&Utc), + }; + let json = serde_json::to_string(&s).unwrap(); + let back: AgentStatus = serde_json::from_str(&json).unwrap(); + assert_eq!(s, back); + } + + #[test] + fn status_has_expected_wire_keys() { + let s = AgentStatus { + device_id: Id::from("pi-01".to_string()), + status: "running".to_string(), + timestamp: DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z") + .unwrap() + .with_timezone(&Utc), + }; + let json = serde_json::to_string(&s).unwrap(); + // device_id must serialize as a flat string (not {"value": …}). + // Relies on `#[serde(transparent)]` on `harmony_types::id::Id`. + assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}"); + assert!(json.contains("\"status\":\"running\"")); + // RFC 3339 output — the smoke script greps a `"timestamp":"<rfc3339>"` + // literal and compares lexicographically against a gate. 
+ assert!(json.contains("\"timestamp\":\"2026-04-21T18:15:42Z\"")); + } +} diff --git a/harmony/src/modules/iot/assets.rs b/harmony/src/modules/iot/assets.rs index 2e1f405e..dcbe1bf9 100644 --- a/harmony/src/modules/iot/assets.rs +++ b/harmony/src/modules/iot/assets.rs @@ -242,14 +242,16 @@ async fn provision_ssh_keypair() -> Result { info!("generating ed25519 ssh keypair at {priv_path:?} (one-time)"); let status = Command::new("ssh-keygen") - .arg("-t") - .arg("ed25519") - .arg("-N") - .arg("") // no passphrase - .arg("-C") - .arg("harmony-iot-smoke") - .arg("-f") - .arg(&priv_path) + .args([ + "-t", + "ed25519", + "-N", + "", // no passphrase + "-C", + "harmony-iot-smoke", + "-f", + ]) + .arg(&priv_path) // PathBuf — kept separate so we don't force &str conversion .stdout(Stdio::null()) .stderr(Stdio::piped()) .output() diff --git a/harmony/src/modules/podman/score.rs b/harmony/src/modules/podman/score.rs index a8ee309f..e795cf0c 100644 --- a/harmony/src/modules/podman/score.rs +++ b/harmony/src/modules/podman/score.rs @@ -1,3 +1,12 @@ +//! Podman Score types and their `Score` / `Interpret` bindings. +//! +//! `ContainerRuntime` is a first-class harmony capability (see +//! [`crate::topology::ContainerRuntime`]); the types that describe a +//! set of containers a caller wants running live here next to the +//! trait impls that route them to interpretation. These types are +//! independent of any particular product (IoT fleet, OKD fleet, etc.) +//! — callers serialize them over whatever transport they like. + use serde::{Deserialize, Serialize}; use crate::{ @@ -8,6 +17,7 @@ use crate::{ use super::interpret::PodmanV0Interpret; +/// A single container managed by podman on the target host. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct PodmanService { pub name: String, @@ -15,28 +25,34 @@ pub struct PodmanService { pub ports: Vec, } +/// v0 Score for podman-based workloads. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct PodmanV0Score { pub services: Vec, } impl PodmanV0Score { - /// Label value applied to every container this score creates, used by - /// [`ContainerRuntime::list_managed_services`] to enumerate the set of - /// containers that belong to a specific Score (e.g. so the agent can - /// remove them when the corresponding KV entry is deleted). The label - /// key is [`crate::modules::podman::DEPLOYMENT_LABEL`]. + /// Label value applied to every container this Score creates. The + /// caller uses the label to enumerate the set of containers that + /// belong to a given Score instance (e.g. to tear them down when + /// the corresponding desired-state entry is deleted). /// - /// For v0 there is a single PodmanV0Score per KV key and no multi-score - /// scheduling on the device, so a stable hash over the service names - /// is an adequate identifier. When v0.1 introduces multiple scores per - /// device the caller will pass an explicit deployment id. + /// For v0 there is a single PodmanV0Score per desired-state key + /// and no multi-score scheduling on the target host, so a stable + /// join over the service names is an adequate identifier. When + /// v0.1 introduces multiple scores per host the caller will pass + /// an explicit deployment id. pub fn deployment_label(&self) -> String { let names: Vec<&str> = self.services.iter().map(|s| s.name.as_str()).collect(); names.join(",") } } +/// Wire envelope for a reconciler-style Score payload: adjacently +/// tagged so the JSON is `{"type": "PodmanV0", "data": { ... }}`. +/// Adding a new variant is additive — emitters stay opaque routers, +/// consumers learn the new variant, older consumers harmlessly +/// log-and-skip the unknown tag. 
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] #[serde(tag = "type", content = "data")] pub enum IotScore { @@ -105,4 +121,23 @@ mod tests { let deserialized: IotScore = serde_json::from_str(&serialized).unwrap(); assert_eq!(score, deserialized); } + + #[test] + fn deployment_label_joins_service_names() { + let score = PodmanV0Score { + services: vec![ + PodmanService { + name: "web".to_string(), + image: "nginx".to_string(), + ports: vec![], + }, + PodmanService { + name: "api".to_string(), + image: "myapp".to_string(), + ports: vec![], + }, + ], + }; + assert_eq!(score.deployment_label(), "web,api"); + } } diff --git a/harmony_types/src/id.rs b/harmony_types/src/id.rs index 748c1050..8411aece 100644 --- a/harmony_types/src/id.rs +++ b/harmony_types/src/id.rs @@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize}; /// **It is not meant to be very secure or unique**, it is suitable to generate up to 10 000 items per /// second with a reasonable collision rate of 0,000014 % as calculated by this calculator : https://kevingal.com/apps/collision.html #[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)] +#[serde(transparent)] pub struct Id { value: String, } diff --git a/iot/iot-agent-v0/Cargo.toml b/iot/iot-agent-v0/Cargo.toml index c56d847d..f90e9e65 100644 --- a/iot/iot-agent-v0/Cargo.toml +++ b/iot/iot-agent-v0/Cargo.toml @@ -5,6 +5,7 @@ edition = "2024" rust-version = "1.85" [dependencies] +harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } harmony = { path = "../../harmony", default-features = false, features = ["podman"] } async-nats = { workspace = true } chrono = { workspace = true } diff --git a/iot/iot-agent-v0/src/config.rs b/iot/iot-agent-v0/src/config.rs index 7dd4ae73..e0c8291f 100644 --- a/iot/iot-agent-v0/src/config.rs +++ b/iot/iot-agent-v0/src/config.rs @@ -1,3 +1,4 @@ +use harmony_reconciler_contracts::Id; use serde::Deserialize; use std::path::Path; @@ -10,7 +11,9 @@ pub struct 
AgentConfig { #[derive(Debug, Clone, Deserialize)] pub struct AgentSection { - pub device_id: String, + /// Cross-boundary device identity. TOML deserializes the field + /// as a bare string thanks to `#[serde(transparent)]` on `Id`. + pub device_id: Id, } #[derive(Debug, Clone, Deserialize)] diff --git a/iot/iot-agent-v0/src/main.rs b/iot/iot-agent-v0/src/main.rs index c75151ba..2d386aab 100644 --- a/iot/iot-agent-v0/src/main.rs +++ b/iot/iot-agent-v0/src/main.rs @@ -8,6 +8,9 @@ use anyhow::{Context, Result}; use clap::Parser; use config::{AgentConfig, CredentialSource, TomlFileCredentialSource}; use futures_util::StreamExt; +use harmony_reconciler_contracts::{ + AgentStatus, BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, Id, status_key, +}; use harmony::inventory::Inventory; use harmony::modules::podman::PodmanTopology; @@ -42,13 +45,13 @@ async fn connect_nats(cfg: &AgentConfig) -> Result { async fn watch_desired_state( client: async_nats::Client, - device_id: String, + device_id: Id, reconciler: Arc, ) -> Result<()> { let jetstream = async_nats::jetstream::new(client); let bucket = jetstream .create_key_value(async_nats::jetstream::kv::Config { - bucket: "desired-state".to_string(), + bucket: BUCKET_DESIRED_STATE.to_string(), ..Default::default() }) .await?; @@ -82,26 +85,27 @@ async fn watch_desired_state( Ok(()) } -async fn report_status(client: async_nats::Client, device_id: String) -> Result<()> { +async fn report_status(client: async_nats::Client, device_id: Id) -> Result<()> { let jetstream = async_nats::jetstream::new(client); let bucket = jetstream .create_key_value(async_nats::jetstream::kv::Config { - bucket: "agent-status".to_string(), + bucket: BUCKET_AGENT_STATUS.to_string(), ..Default::default() }) .await?; - let key = format!("status.{}", device_id); + let key = status_key(&device_id.to_string()); let mut interval = tokio::time::interval(Duration::from_secs(30)); loop { interval.tick().await; - let status = serde_json::json!({ - "device_id": 
device_id, - "status": "running", - "timestamp": chrono::Utc::now().to_rfc3339(), - }); - bucket.put(&key, status.to_string().into()).await?; + let status = AgentStatus { + device_id: device_id.clone(), + status: "running".to_string(), + timestamp: chrono::Utc::now(), + }; + let payload = serde_json::to_vec(&status)?; + bucket.put(&key, payload.into()).await?; tracing::debug!(key = %key, "reported status"); } } diff --git a/iot/iot-operator-v0/Cargo.toml b/iot/iot-operator-v0/Cargo.toml index c96e06e3..0ebafed7 100644 --- a/iot/iot-operator-v0/Cargo.toml +++ b/iot/iot-operator-v0/Cargo.toml @@ -5,6 +5,7 @@ edition = "2024" rust-version = "1.85" [dependencies] +harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } kube = { workspace = true, features = ["runtime", "derive"] } k8s-openapi.workspace = true async-nats = { workspace = true } diff --git a/iot/iot-operator-v0/src/controller.rs b/iot/iot-operator-v0/src/controller.rs index 5c0eb09b..54cc37a2 100644 --- a/iot/iot-operator-v0/src/controller.rs +++ b/iot/iot-operator-v0/src/controller.rs @@ -3,6 +3,7 @@ use std::time::Duration; use async_nats::jetstream::kv::Store; use futures_util::StreamExt; +use harmony_reconciler_contracts::desired_state_key; use kube::api::{Patch, PatchParams}; use kube::runtime::Controller; use kube::runtime::controller::Action; @@ -127,7 +128,7 @@ fn serialize_score(score: &ScorePayload) -> Result { } fn kv_key(device_id: &str, deployment_name: &str) -> String { - format!("{device_id}.{deployment_name}") + desired_state_key(device_id, deployment_name) } fn error_policy(_obj: Arc, err: &Error, _ctx: Arc) -> Action { diff --git a/iot/iot-operator-v0/src/main.rs b/iot/iot-operator-v0/src/main.rs index 6ee5a787..230a639f 100644 --- a/iot/iot-operator-v0/src/main.rs +++ b/iot/iot-operator-v0/src/main.rs @@ -4,6 +4,7 @@ mod crd; use anyhow::Result; use async_nats::jetstream; use clap::{Parser, Subcommand}; +use harmony_reconciler_contracts::BUCKET_DESIRED_STATE; use 
kube::{Client, CustomResourceExt}; use crate::crd::Deployment; @@ -28,7 +29,7 @@ struct Cli { #[arg( long, env = "KV_BUCKET", - default_value = "desired-state", + default_value = BUCKET_DESIRED_STATE, global = true )] kv_bucket: String,