refactor(iot): extract iot-contracts crate for cross-boundary types #270
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -3726,6 +3726,16 @@ dependencies = [
|
||||
"tower",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "harmony-reconciler-contracts"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"harmony_types",
|
||||
"serde",
|
||||
"serde_json",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "harmony_agent"
|
||||
version = "0.1.0"
|
||||
@@ -4710,6 +4720,7 @@ dependencies = [
|
||||
"clap",
|
||||
"futures-util",
|
||||
"harmony",
|
||||
"harmony-reconciler-contracts",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"tokio",
|
||||
@@ -4726,6 +4737,7 @@ dependencies = [
|
||||
"async-nats",
|
||||
"clap",
|
||||
"futures-util",
|
||||
"harmony-reconciler-contracts",
|
||||
"k8s-openapi",
|
||||
"kube",
|
||||
"schemars 0.8.22",
|
||||
|
||||
@@ -30,6 +30,7 @@ members = [
|
||||
"harmony_assets", "opnsense-codegen", "opnsense-api",
|
||||
"iot/iot-operator-v0",
|
||||
"iot/iot-agent-v0",
|
||||
"harmony-reconciler-contracts",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
||||
20
harmony-reconciler-contracts/Cargo.toml
Normal file
20
harmony-reconciler-contracts/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
||||
[package]
|
||||
name = "harmony-reconciler-contracts"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
license.workspace = true
|
||||
|
||||
# Cross-boundary types shared between a harmony operator (central,
|
||||
# writing desired state to NATS JetStream KV) and a harmony agent
|
||||
# (on-host, watching KV and reconciling). Deliberately lean: pure
|
||||
# serde data types, bucket/key constants, small helpers. No tokio,
|
||||
# no async-nats, no harmony. The on-device agent build pulls this
|
||||
# in alongside a minimal async-nats client; the operator pulls it
|
||||
# alongside kube-rs; harmony itself treats it as just another
|
||||
# module. None of those consumers should pay for the others' deps.
|
||||
|
||||
[dependencies]
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
harmony_types = { path = "../harmony_types" }
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json = { workspace = true }
|
||||
53
harmony-reconciler-contracts/src/kv.rs
Normal file
53
harmony-reconciler-contracts/src/kv.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
//! NATS JetStream KV bucket names and key formats used by the
|
||||
//! harmony reconciler pattern.
|
||||
//!
|
||||
//! Hard-coded literals previously lived in five places (agent and
|
||||
//! operator main.rs, operator deploy YAML, two smoke scripts).
|
||||
//! Changing any of them meant hunting for the others. They now live
|
||||
//! here; agent + operator consume the constants directly, and smoke
|
||||
//! scripts grep for the literal values locked in the tests below.
|
||||
|
||||
/// Operator-written bucket. One entry per `(device, deployment)` pair.
|
||||
/// Values are the JSON-serialized Score envelope — today
|
||||
/// `harmony::modules::podman::IotScore`, tomorrow any variant of
|
||||
/// a polymorphic `Score` enum the framework ships.
/// Keys in this bucket are built by [`desired_state_key`].
|
||||
pub const BUCKET_DESIRED_STATE: &str = "desired-state";
|
||||
|
||||
/// Agent-written bucket. One entry per device at `status.<device_id>`.
|
||||
/// Values are JSON-serialized [`crate::AgentStatus`].
/// Keys in this bucket are built by [`status_key`].
|
||||
pub const BUCKET_AGENT_STATUS: &str = "agent-status";
|
||||
|
||||
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
|
||||
/// Format: `<device>.<deployment>`.
///
/// Both components are interpolated verbatim: nothing here escapes or
/// validates them, so a `.` inside either one yields a key that cannot
/// be split back into its two parts unambiguously.
|
||||
pub fn desired_state_key(device_id: &str, deployment_name: &str) -> String {
|
||||
format!("{device_id}.{deployment_name}")
|
||||
}
|
||||
|
||||
/// KV key for a device's last-known status in [`BUCKET_AGENT_STATUS`].
|
||||
/// Format: `status.<device_id>`.
///
/// `device_id` is appended verbatim after the literal `status.` prefix;
/// no escaping or validation is performed here.
|
||||
pub fn status_key(device_id: &str) -> String {
|
||||
format!("status.{device_id}")
|
||||
}
|
||||
|
||||
// Regression tests that pin the exact key layouts and bucket names, since
// these strings are part of the wire contract between agent and operator.
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// The desired-state key is `<device>.<deployment>`, joined by a single dot.
#[test]
|
||||
fn desired_state_key_format() {
|
||||
assert_eq!(desired_state_key("pi-01", "hello-web"), "pi-01.hello-web");
|
||||
}
|
||||
|
||||
// The status key is the device id behind a literal `status.` prefix.
#[test]
|
||||
fn status_key_format() {
|
||||
assert_eq!(status_key("pi-01"), "status.pi-01");
|
||||
}
|
||||
|
||||
// Locks the bucket-name literals so a rename fails loudly here first.
#[test]
|
||||
fn bucket_names_match_smoke_scripts() {
|
||||
// These strings are also grepped by iot/scripts/smoke-*.sh —
|
||||
// flipping them here must be paired with a script update.
|
||||
assert_eq!(BUCKET_DESIRED_STATE, "desired-state");
|
||||
assert_eq!(BUCKET_AGENT_STATUS, "agent-status");
|
||||
}
|
||||
}
|
||||
31
harmony-reconciler-contracts/src/lib.rs
Normal file
31
harmony-reconciler-contracts/src/lib.rs
Normal file
@@ -0,0 +1,31 @@
|
||||
//! Cross-boundary types for harmony's reconciler pattern.
|
||||
//!
|
||||
//! Harmony's "reconciler" pattern is: a central **operator** writes
|
||||
//! desired state into NATS JetStream KV; a remote **agent** watches
|
||||
//! the KV, deserializes each entry as a Score, and drives the host
|
||||
//! toward that state. This split lets one operator orchestrate a
|
||||
//! fleet of agents across network boundaries it can't reach
|
||||
//! directly — IoT devices today, OKD cluster agents or edge-compute
|
||||
//! reconcilers tomorrow.
|
||||
//!
|
||||
//! This crate holds the wire-format bits both sides must agree on:
|
||||
//! NATS bucket names, KV key formats, and the `AgentStatus`
|
||||
//! heartbeat payload. The Score types themselves (`PodmanV0Score`,
|
||||
//! future variants) live in their respective harmony modules —
|
||||
//! consumers import them from there and serialize them over the
|
||||
//! transport this crate describes.
|
||||
//!
|
||||
//! **Deliberately lean** — no tokio, no async-nats, no harmony.
|
||||
//! The on-device agent build pulls it in alongside a minimal
|
||||
//! async-nats client; the operator pulls it alongside kube-rs.
|
||||
//! Neither should pay for the other's dependencies.
|
||||
|
||||
pub mod kv;
|
||||
pub mod status;
|
||||
|
||||
pub use kv::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, desired_state_key, status_key};
|
||||
pub use status::AgentStatus;
|
||||
|
||||
// Re-exports so consumers (agent, operator) don't need a direct
|
||||
// harmony_types dependency purely to name the cross-boundary types.
|
||||
pub use harmony_types::id::Id;
|
||||
74
harmony-reconciler-contracts/src/status.rs
Normal file
74
harmony-reconciler-contracts/src/status.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
//! Agent → NATS KV status payload.
|
||||
//!
|
||||
//! The agent publishes a heartbeat + rollup status to the
|
||||
//! `agent-status` bucket every 30 s (see
|
||||
//! [`crate::BUCKET_AGENT_STATUS`]). Today the payload is intentionally
|
||||
//! minimal — a single `"running"` state + a timestamp — so the
|
||||
//! operator can implement §12 v0.1 "Status aggregation in operator"
|
||||
//! without waiting on richer per-workload reporting.
|
||||
//!
|
||||
//! When the agent grows richer status (per-container state, rollout
|
||||
//! progress) this struct gains fields with `#[serde(default)]`; old
|
||||
//! operators keep working against newer agents.
|
||||
|
||||
use chrono::{DateTime, Utc};
|
||||
use harmony_types::id::Id;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// A single heartbeat published by the agent at
|
||||
/// `status.<device_id>` in the `agent-status` bucket.
///
/// Equality (`PartialEq`/`Eq`) is derived field-by-field; the round-trip
/// test in this module relies on it to compare the decoded value with
/// the original.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct AgentStatus {
|
||||
/// Echoed from the agent's own config so the operator can
|
||||
/// cross-check which device it came from if the KV key is ever
|
||||
/// ambiguous. Serializes transparently as a plain string.
|
||||
pub device_id: Id,
|
||||
/// Coarse rollup state. v0 only ever writes `"running"`; richer
|
||||
/// variants are a v0.1+ concern. A String (not an enum) so old
|
||||
/// operators parsing this payload don't fail on a new variant.
|
||||
pub status: String,
|
||||
/// RFC 3339 UTC timestamp. Used by the smoke test's reboot-
|
||||
/// detection gate — any timestamp strictly greater than the gate
|
||||
/// is evidence of a post-reboot write. `chrono::DateTime<Utc>`
|
||||
/// serde-serializes as RFC 3339, so the wire format stays
|
||||
/// lex-comparable (the smoke's string `>` still works).
///
/// NOTE(review): the unit test in this module pins the `Z`-suffixed,
/// whole-second form (`2026-04-21T18:15:42Z`). Lexicographic order
/// presumably only matches chronological order while both sides of
/// the comparison share the same offset suffix and fractional-second
/// width — confirm against the gate format the smoke script produces.
|
||||
pub timestamp: DateTime<Utc>,
|
||||
}
|
||||
|
||||
// Wire-format tests for `AgentStatus`: one JSON round-trip, and one check
// of the exact key/value substrings that appear in the serialized output.
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
// Serializing to JSON and parsing it back must yield an equal value.
#[test]
|
||||
fn status_roundtrip() {
|
||||
let s = AgentStatus {
|
||||
device_id: Id::from("pi-01".to_string()),
|
||||
status: "running".to_string(),
|
||||
timestamp: DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z")
|
||||
.unwrap()
|
||||
.with_timezone(&Utc),
|
||||
};
|
||||
let json = serde_json::to_string(&s).unwrap();
|
||||
let back: AgentStatus = serde_json::from_str(&json).unwrap();
|
||||
assert_eq!(s, back);
|
||||
}
|
||||
|
||||
// Pins the literal JSON fragments for each field, using substring checks
// on the serialized text rather than parsing it back.
#[test]
|
||||
fn status_has_expected_wire_keys() {
|
||||
let s = AgentStatus {
|
||||
device_id: Id::from("pi-01".to_string()),
|
||||
status: "running".to_string(),
|
||||
timestamp: DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z")
|
||||
.unwrap()
|
||||
.with_timezone(&Utc),
|
||||
};
|
||||
let json = serde_json::to_string(&s).unwrap();
|
||||
// device_id must serialize as a flat string (not {"value": …}).
|
||||
// Relies on `#[serde(transparent)]` on `harmony_types::id::Id`.
|
||||
assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}");
|
||||
assert!(json.contains("\"status\":\"running\""));
|
||||
// RFC 3339 output — the smoke script greps a `"timestamp":"<rfc3339>"`
|
||||
// literal and compares lexicographically against a gate.
|
||||
assert!(json.contains("\"timestamp\":\"2026-04-21T18:15:42Z\""));
|
||||
}
|
||||
}
|
||||
@@ -242,14 +242,16 @@ async fn provision_ssh_keypair() -> Result<IotSshKeypair, ExecutorError> {
|
||||
|
||||
info!("generating ed25519 ssh keypair at {priv_path:?} (one-time)");
|
||||
let status = Command::new("ssh-keygen")
|
||||
.arg("-t")
|
||||
.arg("ed25519")
|
||||
.arg("-N")
|
||||
.arg("") // no passphrase
|
||||
.arg("-C")
|
||||
.arg("harmony-iot-smoke")
|
||||
.arg("-f")
|
||||
.arg(&priv_path)
|
||||
.args([
|
||||
"-t",
|
||||
"ed25519",
|
||||
"-N",
|
||||
"", // no passphrase
|
||||
"-C",
|
||||
"harmony-iot-smoke",
|
||||
"-f",
|
||||
])
|
||||
.arg(&priv_path) // PathBuf — kept separate so we don't force &str conversion
|
||||
.stdout(Stdio::null())
|
||||
.stderr(Stdio::piped())
|
||||
.output()
|
||||
|
||||
@@ -1,3 +1,12 @@
|
||||
//! Podman Score types and their `Score<T>` / `Interpret<T>` bindings.
|
||||
//!
|
||||
//! `ContainerRuntime` is a first-class harmony capability (see
|
||||
//! [`crate::topology::ContainerRuntime`]); the types that describe a
|
||||
//! set of containers a caller wants running live here next to the
|
||||
//! trait impls that route them to interpretation. These types are
|
||||
//! independent of any particular product (IoT fleet, OKD fleet, etc.)
|
||||
//! — callers serialize them over whatever transport they like.
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
@@ -8,6 +17,7 @@ use crate::{
|
||||
|
||||
use super::interpret::PodmanV0Interpret;
|
||||
|
||||
/// A single container managed by podman on the target host.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct PodmanService {
|
||||
pub name: String,
|
||||
@@ -15,28 +25,34 @@ pub struct PodmanService {
|
||||
pub ports: Vec<String>,
|
||||
}
|
||||
|
||||
/// v0 Score for podman-based workloads.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub struct PodmanV0Score {
|
||||
pub services: Vec<PodmanService>,
|
||||
}
|
||||
|
||||
impl PodmanV0Score {
|
||||
/// Label value applied to every container this score creates, used by
|
||||
/// [`ContainerRuntime::list_managed_services`] to enumerate the set of
|
||||
/// containers that belong to a specific Score (e.g. so the agent can
|
||||
/// remove them when the corresponding KV entry is deleted). The label
|
||||
/// key is [`crate::modules::podman::DEPLOYMENT_LABEL`].
|
||||
/// Label value applied to every container this Score creates. The
|
||||
/// caller uses the label to enumerate the set of containers that
|
||||
/// belong to a given Score instance (e.g. to tear them down when
|
||||
/// the corresponding desired-state entry is deleted).
|
||||
///
|
||||
/// For v0 there is a single PodmanV0Score per KV key and no multi-score
|
||||
/// scheduling on the device, so a stable hash over the service names
|
||||
/// is an adequate identifier. When v0.1 introduces multiple scores per
|
||||
/// device the caller will pass an explicit deployment id.
|
||||
/// For v0 there is a single PodmanV0Score per desired-state key
|
||||
/// and no multi-score scheduling on the target host, so a stable
|
||||
/// join over the service names is an adequate identifier. When
|
||||
/// v0.1 introduces multiple scores per host the caller will pass
|
||||
/// an explicit deployment id.
|
||||
pub fn deployment_label(&self) -> String {
|
||||
let names: Vec<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
|
||||
names.join(",")
|
||||
}
|
||||
}
|
||||
|
||||
/// Wire envelope for a reconciler-style Score payload: adjacently
|
||||
/// tagged so the JSON is `{"type": "PodmanV0", "data": { ... }}`.
|
||||
/// Adding a new variant is additive — emitters stay opaque routers,
|
||||
/// consumers learn the new variant, older consumers harmlessly
|
||||
/// log-and-skip the unknown tag.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||
#[serde(tag = "type", content = "data")]
|
||||
pub enum IotScore {
|
||||
@@ -105,4 +121,23 @@ mod tests {
|
||||
let deserialized: IotScore = serde_json::from_str(&serialized).unwrap();
|
||||
assert_eq!(score, deserialized);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn deployment_label_joins_service_names() {
|
||||
let score = PodmanV0Score {
|
||||
services: vec![
|
||||
PodmanService {
|
||||
name: "web".to_string(),
|
||||
image: "nginx".to_string(),
|
||||
ports: vec![],
|
||||
},
|
||||
PodmanService {
|
||||
name: "api".to_string(),
|
||||
image: "myapp".to_string(),
|
||||
ports: vec![],
|
||||
},
|
||||
],
|
||||
};
|
||||
assert_eq!(score.deployment_label(), "web,api");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize};
|
||||
/// **It is not meant to be very secure or unique**, it is suitable to generate up to 10 000 items per
|
||||
/// second with a reasonable collision rate of 0,000014 % as calculated by this calculator : https://kevingal.com/apps/collision.html
|
||||
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||
#[serde(transparent)]
|
||||
pub struct Id {
|
||||
value: String,
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ edition = "2024"
|
||||
rust-version = "1.85"
|
||||
|
||||
[dependencies]
|
||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
|
||||
async-nats = { workspace = true }
|
||||
chrono = { workspace = true }
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use harmony_reconciler_contracts::Id;
|
||||
use serde::Deserialize;
|
||||
use std::path::Path;
|
||||
|
||||
@@ -10,7 +11,9 @@ pub struct AgentConfig {
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct AgentSection {
|
||||
pub device_id: String,
|
||||
/// Cross-boundary device identity. TOML deserializes the field
|
||||
/// as a bare string thanks to `#[serde(transparent)]` on `Id`.
|
||||
pub device_id: Id,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
|
||||
@@ -8,6 +8,9 @@ use anyhow::{Context, Result};
|
||||
use clap::Parser;
|
||||
use config::{AgentConfig, CredentialSource, TomlFileCredentialSource};
|
||||
use futures_util::StreamExt;
|
||||
use harmony_reconciler_contracts::{
|
||||
AgentStatus, BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, Id, status_key,
|
||||
};
|
||||
|
||||
use harmony::inventory::Inventory;
|
||||
use harmony::modules::podman::PodmanTopology;
|
||||
@@ -42,13 +45,13 @@ async fn connect_nats(cfg: &AgentConfig) -> Result<async_nats::Client> {
|
||||
|
||||
async fn watch_desired_state(
|
||||
client: async_nats::Client,
|
||||
device_id: String,
|
||||
device_id: Id,
|
||||
reconciler: Arc<Reconciler>,
|
||||
) -> Result<()> {
|
||||
let jetstream = async_nats::jetstream::new(client);
|
||||
let bucket = jetstream
|
||||
.create_key_value(async_nats::jetstream::kv::Config {
|
||||
bucket: "desired-state".to_string(),
|
||||
bucket: BUCKET_DESIRED_STATE.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
@@ -82,26 +85,27 @@ async fn watch_desired_state(
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn report_status(client: async_nats::Client, device_id: String) -> Result<()> {
|
||||
async fn report_status(client: async_nats::Client, device_id: Id) -> Result<()> {
|
||||
let jetstream = async_nats::jetstream::new(client);
|
||||
let bucket = jetstream
|
||||
.create_key_value(async_nats::jetstream::kv::Config {
|
||||
bucket: "agent-status".to_string(),
|
||||
bucket: BUCKET_AGENT_STATUS.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let key = format!("status.{}", device_id);
|
||||
let key = status_key(&device_id.to_string());
|
||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||
|
||||
loop {
|
||||
interval.tick().await;
|
||||
let status = serde_json::json!({
|
||||
"device_id": device_id,
|
||||
"status": "running",
|
||||
"timestamp": chrono::Utc::now().to_rfc3339(),
|
||||
});
|
||||
bucket.put(&key, status.to_string().into()).await?;
|
||||
let status = AgentStatus {
|
||||
device_id: device_id.clone(),
|
||||
status: "running".to_string(),
|
||||
timestamp: chrono::Utc::now(),
|
||||
};
|
||||
let payload = serde_json::to_vec(&status)?;
|
||||
bucket.put(&key, payload.into()).await?;
|
||||
tracing::debug!(key = %key, "reported status");
|
||||
}
|
||||
}
|
||||
|
||||
@@ -5,6 +5,7 @@ edition = "2024"
|
||||
rust-version = "1.85"
|
||||
|
||||
[dependencies]
|
||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||
kube = { workspace = true, features = ["runtime", "derive"] }
|
||||
k8s-openapi.workspace = true
|
||||
async-nats = { workspace = true }
|
||||
|
||||
@@ -3,6 +3,7 @@ use std::time::Duration;
|
||||
|
||||
use async_nats::jetstream::kv::Store;
|
||||
use futures_util::StreamExt;
|
||||
use harmony_reconciler_contracts::desired_state_key;
|
||||
use kube::api::{Patch, PatchParams};
|
||||
use kube::runtime::Controller;
|
||||
use kube::runtime::controller::Action;
|
||||
@@ -127,7 +128,7 @@ fn serialize_score(score: &ScorePayload) -> Result<String, Error> {
|
||||
}
|
||||
|
||||
fn kv_key(device_id: &str, deployment_name: &str) -> String {
|
||||
format!("{device_id}.{deployment_name}")
|
||||
desired_state_key(device_id, deployment_name)
|
||||
}
|
||||
|
||||
fn error_policy(_obj: Arc<Deployment>, err: &Error, _ctx: Arc<Context>) -> Action {
|
||||
|
||||
@@ -4,6 +4,7 @@ mod crd;
|
||||
use anyhow::Result;
|
||||
use async_nats::jetstream;
|
||||
use clap::{Parser, Subcommand};
|
||||
use harmony_reconciler_contracts::BUCKET_DESIRED_STATE;
|
||||
use kube::{Client, CustomResourceExt};
|
||||
|
||||
use crate::crd::Deployment;
|
||||
@@ -28,7 +29,7 @@ struct Cli {
|
||||
#[arg(
|
||||
long,
|
||||
env = "KV_BUCKET",
|
||||
default_value = "desired-state",
|
||||
default_value = BUCKET_DESIRED_STATE,
|
||||
global = true
|
||||
)]
|
||||
kv_bucket: String,
|
||||
|
||||
Reference in New Issue
Block a user