refactor(iot): extract iot-contracts crate for cross-boundary types #270

Merged
johnride merged 5 commits from feat/iot-contracts into feat/iot-walking-skeleton 2026-04-21 20:13:17 +00:00
15 changed files with 271 additions and 31 deletions

12
Cargo.lock generated
View File

@@ -3726,6 +3726,16 @@ dependencies = [
"tower",
]
[[package]]
name = "harmony-reconciler-contracts"
version = "0.1.0"
dependencies = [
"chrono",
"harmony_types",
"serde",
"serde_json",
]
[[package]]
name = "harmony_agent"
version = "0.1.0"
@@ -4710,6 +4720,7 @@ dependencies = [
"clap",
"futures-util",
"harmony",
"harmony-reconciler-contracts",
"serde",
"serde_json",
"tokio",
@@ -4726,6 +4737,7 @@ dependencies = [
"async-nats",
"clap",
"futures-util",
"harmony-reconciler-contracts",
"k8s-openapi",
"kube",
"schemars 0.8.22",

View File

@@ -30,6 +30,7 @@ members = [
"harmony_assets", "opnsense-codegen", "opnsense-api",
"iot/iot-operator-v0",
"iot/iot-agent-v0",
"harmony-reconciler-contracts",
]
[workspace.package]

View File

@@ -0,0 +1,20 @@
[package]
name = "harmony-reconciler-contracts"
version = "0.1.0"
edition = "2024"
license.workspace = true
# Cross-boundary types shared between a harmony operator (central,
# writing desired state to NATS JetStream KV) and a harmony agent
# (on-host, watching KV and reconciling). Deliberately lean: pure
# serde data types, bucket/key constants, small helpers. No tokio,
# no async-nats, no harmony. The on-device agent build pulls this
# in alongside a minimal async-nats client; the operator pulls it
# alongside kube-rs; harmony itself treats it as just another
# module. None of those consumers should pay for the others' deps.
[dependencies]
chrono = { workspace = true, features = ["serde"] }
harmony_types = { path = "../harmony_types" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }

View File

@@ -0,0 +1,53 @@
//! NATS JetStream KV bucket names and key formats used by the
//! harmony reconciler pattern.
//!
//! Hard-coded literals previously lived in five places (agent and
//! operator main.rs, operator deploy YAML, two smoke scripts).
//! Changing any of them meant hunting for the others. They now live
//! here; agent + operator consume the constants directly, and smoke
//! scripts grep for the literal values locked in the tests below.
/// Operator-written bucket. One entry per `(device, deployment)` pair,
/// keyed by [`desired_state_key`].
/// Values are the JSON-serialized Score envelope — today
/// `harmony::modules::podman::IotScore`, tomorrow any variant of
/// a polymorphic `Score` enum the framework ships.
///
/// The literal is pinned by the `bucket_names_match_smoke_scripts`
/// test in this module.
pub const BUCKET_DESIRED_STATE: &str = "desired-state";
/// Agent-written bucket. One entry per device at `status.<device_id>`
/// (the key built by [`status_key`]).
/// Values are JSON-serialized [`crate::AgentStatus`].
///
/// The literal is pinned by the `bucket_names_match_smoke_scripts`
/// test in this module.
pub const BUCKET_AGENT_STATUS: &str = "agent-status";
/// Builds the [`BUCKET_DESIRED_STATE`] key for one `(device, deployment)`
/// pair.
///
/// The two parts are joined with a single `.`, giving
/// `<device>.<deployment>`. Neither part is validated or escaped.
pub fn desired_state_key(device_id: &str, deployment_name: &str) -> String {
    [device_id, deployment_name].join(".")
}
/// Builds the [`BUCKET_AGENT_STATUS`] key under which a device's
/// last-known status is stored.
///
/// The result is the device id prefixed with `status.`, i.e.
/// `status.<device_id>`. The id is not validated or escaped.
pub fn status_key(device_id: &str) -> String {
    "status.".to_owned() + device_id
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the `<device>.<deployment>` layout of desired-state keys.
    #[test]
    fn desired_state_key_format() {
        let key = desired_state_key("pi-01", "hello-web");
        assert_eq!(key, "pi-01.hello-web");
    }

    /// Pins the `status.<device_id>` layout of agent-status keys.
    #[test]
    fn status_key_format() {
        let key = status_key("pi-01");
        assert_eq!(key, "status.pi-01");
    }

    /// Locks the bucket literals in place.
    #[test]
    fn bucket_names_match_smoke_scripts() {
        // These strings are also grepped by iot/scripts/smoke-*.sh —
        // flipping them here must be paired with a script update.
        let pinned = [
            (BUCKET_DESIRED_STATE, "desired-state"),
            (BUCKET_AGENT_STATUS, "agent-status"),
        ];
        for (actual, expected) in pinned {
            assert_eq!(actual, expected);
        }
    }
}

View File

@@ -0,0 +1,31 @@
//! Cross-boundary types for harmony's reconciler pattern.
//!
//! Harmony's "reconciler" pattern is: a central **operator** writes
//! desired state into NATS JetStream KV; a remote **agent** watches
//! the KV, deserializes each entry as a Score, and drives the host
//! toward that state. This split lets one operator orchestrate a
//! fleet of agents across network boundaries it can't reach
//! directly — IoT devices today, OKD cluster agents or edge-compute
//! reconcilers tomorrow.
//!
//! This crate holds the wire-format bits both sides must agree on:
//! NATS bucket names, KV key formats, and the `AgentStatus`
//! heartbeat payload. The Score types themselves (`PodmanV0Score`,
//! future variants) live in their respective harmony modules —
//! consumers import them from there and serialize them over the
//! transport this crate describes.
//!
//! **Deliberately lean** — no tokio, no async-nats, no harmony.
//! The on-device agent build pulls it in alongside a minimal
//! async-nats client; the operator pulls it alongside kube-rs.
//! Neither should pay for the other's dependencies.
pub mod kv;
pub mod status;
pub use kv::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, desired_state_key, status_key};
pub use status::AgentStatus;
// Re-exports so consumers (agent, operator) don't need a direct
// harmony_types dependency purely to name the cross-boundary types.
pub use harmony_types::id::Id;

View File

@@ -0,0 +1,74 @@
//! Agent → NATS KV status payload.
//!
//! The agent publishes a heartbeat + rollup status to the
//! `agent-status` bucket every 30 s (see
//! [`crate::BUCKET_AGENT_STATUS`]). Today the payload is intentionally
//! minimal — a single `"running"` state + a timestamp — so the
//! operator can implement §12 v0.1 "Status aggregation in operator"
//! without waiting on richer per-workload reporting.
//!
//! When the agent grows richer status (per-container state, rollout
//! progress) this struct gains fields with `#[serde(default)]`; old
//! operators keep working against newer agents.
use chrono::{DateTime, Utc};
use harmony_types::id::Id;
use serde::{Deserialize, Serialize};
/// A single heartbeat published by the agent at
/// `status.<device_id>` in the `agent-status` bucket.
///
/// Serialized with serde; the tests in this module pin the JSON wire
/// keys `device_id`, `status` and `timestamp`, so renaming a field is
/// a wire-format change.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentStatus {
/// Echoed from the agent's own config so the operator can
/// cross-check which device it came from if the KV key is ever
/// ambiguous. Serializes transparently as a plain string.
pub device_id: Id,
/// Coarse rollup state. v0 only ever writes `"running"`; richer
/// variants are a v0.1+ concern. A String (not an enum) so old
/// operators parsing this payload don't fail on a new variant.
pub status: String,
/// RFC 3339 UTC timestamp. Used by the smoke test's reboot-
/// detection gate — any timestamp strictly greater than the gate
/// is evidence of a post-reboot write. `chrono::DateTime<Utc>`
/// serde-serializes as RFC 3339, so the wire format stays
/// lex-comparable (the smoke's string `>` still works).
pub timestamp: DateTime<Utc>,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Fixed heartbeat shared by the tests below: device `pi-01`,
    /// state `running`, stamped 2026-04-21T18:15:42Z.
    fn sample_status() -> AgentStatus {
        let timestamp = DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z")
            .unwrap()
            .with_timezone(&Utc);
        AgentStatus {
            device_id: Id::from("pi-01".to_string()),
            status: "running".to_string(),
            timestamp,
        }
    }

    /// Serializing and then deserializing must give back an equal value.
    #[test]
    fn status_roundtrip() {
        let original = sample_status();
        let encoded = serde_json::to_string(&original).unwrap();
        let decoded: AgentStatus = serde_json::from_str(&encoded).unwrap();
        assert_eq!(original, decoded);
    }

    /// Pins the exact JSON fragments external tooling relies on.
    #[test]
    fn status_has_expected_wire_keys() {
        let json = serde_json::to_string(&sample_status()).unwrap();

        // device_id must serialize as a flat string (not {"value": …}).
        // Relies on `#[serde(transparent)]` on `harmony_types::id::Id`.
        assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}");

        assert!(json.contains("\"status\":\"running\""));

        // RFC 3339 output — the smoke script greps a `"timestamp":"<rfc3339>"`
        // literal and compares lexicographically against a gate.
        assert!(json.contains("\"timestamp\":\"2026-04-21T18:15:42Z\""));
    }
}

View File

@@ -242,14 +242,16 @@ async fn provision_ssh_keypair() -> Result<IotSshKeypair, ExecutorError> {
info!("generating ed25519 ssh keypair at {priv_path:?} (one-time)");
let status = Command::new("ssh-keygen")
.arg("-t")
.arg("ed25519")
.arg("-N")
.arg("") // no passphrase
.arg("-C")
.arg("harmony-iot-smoke")
.arg("-f")
.arg(&priv_path)
.args([
"-t",
"ed25519",
"-N",
"", // no passphrase
"-C",
"harmony-iot-smoke",
"-f",
])
.arg(&priv_path) // PathBuf — kept separate so we don't force &str conversion
.stdout(Stdio::null())
.stderr(Stdio::piped())
.output()

View File

@@ -1,3 +1,12 @@
//! Podman Score types and their `Score<T>` / `Interpret<T>` bindings.
//!
//! `ContainerRuntime` is a first-class harmony capability (see
//! [`crate::topology::ContainerRuntime`]); the types that describe a
//! set of containers a caller wants running live here next to the
//! trait impls that route them to interpretation. These types are
//! independent of any particular product (IoT fleet, OKD fleet, etc.)
//! — callers serialize them over whatever transport they like.
use serde::{Deserialize, Serialize};
use crate::{
@@ -8,6 +17,7 @@ use crate::{
use super::interpret::PodmanV0Interpret;
/// A single container managed by podman on the target host.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PodmanService {
pub name: String,
@@ -15,28 +25,34 @@ pub struct PodmanService {
pub ports: Vec<String>,
}
/// v0 Score for podman-based workloads.
///
/// Describes the set of containers a caller wants running, as a list
/// of [`PodmanService`] entries.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct PodmanV0Score {
/// One entry per container (see [`PodmanService`]).
pub services: Vec<PodmanService>,
}
impl PodmanV0Score {
/// Label value applied to every container this score creates, used by
/// [`ContainerRuntime::list_managed_services`] to enumerate the set of
/// containers that belong to a specific Score (e.g. so the agent can
/// remove them when the corresponding KV entry is deleted). The label
/// key is [`crate::modules::podman::DEPLOYMENT_LABEL`].
/// Label value applied to every container this Score creates. The
/// caller uses the label to enumerate the set of containers that
/// belong to a given Score instance (e.g. to tear them down when
/// the corresponding desired-state entry is deleted).
///
/// For v0 there is a single PodmanV0Score per KV key and no multi-score
/// scheduling on the device, so a stable hash over the service names
/// is an adequate identifier. When v0.1 introduces multiple scores per
/// device the caller will pass an explicit deployment id.
/// For v0 there is a single PodmanV0Score per desired-state key
/// and no multi-score scheduling on the target host, so a stable
/// join over the service names is an adequate identifier. When
/// v0.1 introduces multiple scores per host the caller will pass
/// an explicit deployment id.
pub fn deployment_label(&self) -> String {
    // Service names, in declaration order, joined with commas
    // (e.g. "web,api"); an empty service list yields "".
    self.services
        .iter()
        .map(|service| service.name.as_str())
        .collect::<Vec<_>>()
        .join(",")
}
}
/// Wire envelope for a reconciler-style Score payload: adjacently
/// tagged so the JSON is `{"type": "PodmanV0", "data": { ... }}`.
/// Adding a new variant is additive — emitters stay opaque routers,
/// consumers learn the new variant, older consumers harmlessly
/// log-and-skip the unknown tag.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
#[serde(tag = "type", content = "data")]
pub enum IotScore {
@@ -105,4 +121,23 @@ mod tests {
let deserialized: IotScore = serde_json::from_str(&serialized).unwrap();
assert_eq!(score, deserialized);
}
/// `deployment_label` must be the service names joined with `,` in
/// declaration order.
#[test]
fn deployment_label_joins_service_names() {
    // Small fixture builder: only name and image vary between services.
    let service = |name: &str, image: &str| PodmanService {
        name: name.to_string(),
        image: image.to_string(),
        ports: vec![],
    };
    let score = PodmanV0Score {
        services: vec![service("web", "nginx"), service("api", "myapp")],
    };
    assert_eq!(score.deployment_label(), "web,api");
}
}

View File

@@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize};
/// **It is not meant to be very secure or unique**, it is suitable to generate up to 10 000 items per
/// second with a reasonable collision rate of 0,000014 % as calculated by this calculator : https://kevingal.com/apps/collision.html
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
#[serde(transparent)]
pub struct Id {
value: String,
}

View File

@@ -5,6 +5,7 @@ edition = "2024"
rust-version = "1.85"
[dependencies]
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
async-nats = { workspace = true }
chrono = { workspace = true }

View File

@@ -1,3 +1,4 @@
use harmony_reconciler_contracts::Id;
use serde::Deserialize;
use std::path::Path;
@@ -10,7 +11,9 @@ pub struct AgentConfig {
#[derive(Debug, Clone, Deserialize)]
pub struct AgentSection {
pub device_id: String,
/// Cross-boundary device identity. TOML deserializes the field
/// as a bare string thanks to `#[serde(transparent)]` on `Id`.
pub device_id: Id,
}
#[derive(Debug, Clone, Deserialize)]

View File

@@ -8,6 +8,9 @@ use anyhow::{Context, Result};
use clap::Parser;
use config::{AgentConfig, CredentialSource, TomlFileCredentialSource};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
AgentStatus, BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, Id, status_key,
};
use harmony::inventory::Inventory;
use harmony::modules::podman::PodmanTopology;
@@ -42,13 +45,13 @@ async fn connect_nats(cfg: &AgentConfig) -> Result<async_nats::Client> {
async fn watch_desired_state(
client: async_nats::Client,
device_id: String,
device_id: Id,
reconciler: Arc<Reconciler>,
) -> Result<()> {
let jetstream = async_nats::jetstream::new(client);
let bucket = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "desired-state".to_string(),
bucket: BUCKET_DESIRED_STATE.to_string(),
..Default::default()
})
.await?;
@@ -82,26 +85,27 @@ async fn watch_desired_state(
Ok(())
}
async fn report_status(client: async_nats::Client, device_id: String) -> Result<()> {
async fn report_status(client: async_nats::Client, device_id: Id) -> Result<()> {
let jetstream = async_nats::jetstream::new(client);
let bucket = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: "agent-status".to_string(),
bucket: BUCKET_AGENT_STATUS.to_string(),
..Default::default()
})
.await?;
let key = format!("status.{}", device_id);
let key = status_key(&device_id.to_string());
let mut interval = tokio::time::interval(Duration::from_secs(30));
loop {
interval.tick().await;
let status = serde_json::json!({
"device_id": device_id,
"status": "running",
"timestamp": chrono::Utc::now().to_rfc3339(),
});
bucket.put(&key, status.to_string().into()).await?;
let status = AgentStatus {
device_id: device_id.clone(),
status: "running".to_string(),
timestamp: chrono::Utc::now(),
};
let payload = serde_json::to_vec(&status)?;
bucket.put(&key, payload.into()).await?;
tracing::debug!(key = %key, "reported status");
}
}

View File

@@ -5,6 +5,7 @@ edition = "2024"
rust-version = "1.85"
[dependencies]
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
kube = { workspace = true, features = ["runtime", "derive"] }
k8s-openapi.workspace = true
async-nats = { workspace = true }

View File

@@ -3,6 +3,7 @@ use std::time::Duration;
use async_nats::jetstream::kv::Store;
use futures_util::StreamExt;
use harmony_reconciler_contracts::desired_state_key;
use kube::api::{Patch, PatchParams};
use kube::runtime::Controller;
use kube::runtime::controller::Action;
@@ -127,7 +128,7 @@ fn serialize_score(score: &ScorePayload) -> Result<String, Error> {
}
fn kv_key(device_id: &str, deployment_name: &str) -> String {
format!("{device_id}.{deployment_name}")
desired_state_key(device_id, deployment_name)
}
fn error_policy(_obj: Arc<Deployment>, err: &Error, _ctx: Arc<Context>) -> Action {

View File

@@ -4,6 +4,7 @@ mod crd;
use anyhow::Result;
use async_nats::jetstream;
use clap::{Parser, Subcommand};
use harmony_reconciler_contracts::BUCKET_DESIRED_STATE;
use kube::{Client, CustomResourceExt};
use crate::crd::Deployment;
@@ -28,7 +29,7 @@ struct Cli {
#[arg(
long,
env = "KV_BUCKET",
default_value = "desired-state",
default_value = BUCKET_DESIRED_STATE,
global = true
)]
kv_bucket: String,