refactor(iot): extract iot-contracts crate for cross-boundary types #270
12
Cargo.lock
generated
12
Cargo.lock
generated
@@ -3726,6 +3726,16 @@ dependencies = [
|
|||||||
"tower",
|
"tower",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "harmony-reconciler-contracts"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
|
"harmony_types",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "harmony_agent"
|
name = "harmony_agent"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -4710,6 +4720,7 @@ dependencies = [
|
|||||||
"clap",
|
"clap",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
"harmony",
|
"harmony",
|
||||||
|
"harmony-reconciler-contracts",
|
||||||
"serde",
|
"serde",
|
||||||
"serde_json",
|
"serde_json",
|
||||||
"tokio",
|
"tokio",
|
||||||
@@ -4726,6 +4737,7 @@ dependencies = [
|
|||||||
"async-nats",
|
"async-nats",
|
||||||
"clap",
|
"clap",
|
||||||
"futures-util",
|
"futures-util",
|
||||||
|
"harmony-reconciler-contracts",
|
||||||
"k8s-openapi",
|
"k8s-openapi",
|
||||||
"kube",
|
"kube",
|
||||||
"schemars 0.8.22",
|
"schemars 0.8.22",
|
||||||
|
|||||||
@@ -30,6 +30,7 @@ members = [
|
|||||||
"harmony_assets", "opnsense-codegen", "opnsense-api",
|
"harmony_assets", "opnsense-codegen", "opnsense-api",
|
||||||
"iot/iot-operator-v0",
|
"iot/iot-operator-v0",
|
||||||
"iot/iot-agent-v0",
|
"iot/iot-agent-v0",
|
||||||
|
"harmony-reconciler-contracts",
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|||||||
20
harmony-reconciler-contracts/Cargo.toml
Normal file
20
harmony-reconciler-contracts/Cargo.toml
Normal file
@@ -0,0 +1,20 @@
|
|||||||
|
[package]
|
||||||
|
name = "harmony-reconciler-contracts"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2024"
|
||||||
|
license.workspace = true
|
||||||
|
|
||||||
|
# Cross-boundary types shared between a harmony operator (central,
|
||||||
|
# writing desired state to NATS JetStream KV) and a harmony agent
|
||||||
|
# (on-host, watching KV and reconciling). Deliberately lean: pure
|
||||||
|
# serde data types, bucket/key constants, small helpers. No tokio,
|
||||||
|
# no async-nats, no harmony. The on-device agent build pulls this
|
||||||
|
# in alongside a minimal async-nats client; the operator pulls it
|
||||||
|
# alongside kube-rs; harmony itself treats it as just another
|
||||||
|
# module. None of those consumers should pay for the others' deps.
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
chrono = { workspace = true, features = ["serde"] }
|
||||||
|
harmony_types = { path = "../harmony_types" }
|
||||||
|
serde = { workspace = true, features = ["derive"] }
|
||||||
|
serde_json = { workspace = true }
|
||||||
53
harmony-reconciler-contracts/src/kv.rs
Normal file
53
harmony-reconciler-contracts/src/kv.rs
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
//! NATS JetStream KV bucket names and key formats used by the
|
||||||
|
//! harmony reconciler pattern.
|
||||||
|
//!
|
||||||
|
//! Hard-coded literals previously lived in five places (agent and
|
||||||
|
//! operator main.rs, operator deploy YAML, two smoke scripts).
|
||||||
|
//! Changing any of them meant hunting for the others. They now live
|
||||||
|
//! here; agent + operator consume the constants directly, and smoke
|
||||||
|
//! scripts grep for the literal values locked in the tests below.
|
||||||
|
|
||||||
|
/// Operator-written bucket. One entry per `(device, deployment)` pair.
|
||||||
|
/// Values are the JSON-serialized Score envelope — today
|
||||||
|
/// `harmony::modules::podman::IotScore`, tomorrow any variant of
|
||||||
|
/// a polymorphic `Score` enum the framework ships.
|
||||||
|
pub const BUCKET_DESIRED_STATE: &str = "desired-state";
|
||||||
|
|
||||||
|
/// Agent-written bucket. One entry per device at `status.<device_id>`.
|
||||||
|
/// Values are JSON-serialized [`crate::AgentStatus`].
|
||||||
|
pub const BUCKET_AGENT_STATUS: &str = "agent-status";
|
||||||
|
|
||||||
|
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
|
||||||
|
/// Format: `<device>.<deployment>`.
|
||||||
|
pub fn desired_state_key(device_id: &str, deployment_name: &str) -> String {
|
||||||
|
format!("{device_id}.{deployment_name}")
|
||||||
|
}
|
||||||
|
|
||||||
|
/// KV key for a device's last-known status in [`BUCKET_AGENT_STATUS`].
|
||||||
|
/// Format: `status.<device_id>`.
|
||||||
|
pub fn status_key(device_id: &str) -> String {
|
||||||
|
format!("status.{device_id}")
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn desired_state_key_format() {
|
||||||
|
assert_eq!(desired_state_key("pi-01", "hello-web"), "pi-01.hello-web");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn status_key_format() {
|
||||||
|
assert_eq!(status_key("pi-01"), "status.pi-01");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn bucket_names_match_smoke_scripts() {
|
||||||
|
// These strings are also grepped by iot/scripts/smoke-*.sh —
|
||||||
|
// flipping them here must be paired with a script update.
|
||||||
|
assert_eq!(BUCKET_DESIRED_STATE, "desired-state");
|
||||||
|
assert_eq!(BUCKET_AGENT_STATUS, "agent-status");
|
||||||
|
}
|
||||||
|
}
|
||||||
31
harmony-reconciler-contracts/src/lib.rs
Normal file
31
harmony-reconciler-contracts/src/lib.rs
Normal file
@@ -0,0 +1,31 @@
|
|||||||
|
//! Cross-boundary types for harmony's reconciler pattern.
|
||||||
|
//!
|
||||||
|
//! Harmony's "reconciler" pattern is: a central **operator** writes
|
||||||
|
//! desired state into NATS JetStream KV; a remote **agent** watches
|
||||||
|
//! the KV, deserializes each entry as a Score, and drives the host
|
||||||
|
//! toward that state. This split lets one operator orchestrate a
|
||||||
|
//! fleet of agents across network boundaries it can't reach
|
||||||
|
//! directly — IoT devices today, OKD cluster agents or edge-compute
|
||||||
|
//! reconcilers tomorrow.
|
||||||
|
//!
|
||||||
|
//! This crate holds the wire-format bits both sides must agree on:
|
||||||
|
//! NATS bucket names, KV key formats, and the `AgentStatus`
|
||||||
|
//! heartbeat payload. The Score types themselves (`PodmanV0Score`,
|
||||||
|
//! future variants) live in their respective harmony modules —
|
||||||
|
//! consumers import them from there and serialize them over the
|
||||||
|
//! transport this crate describes.
|
||||||
|
//!
|
||||||
|
//! **Deliberately lean** — no tokio, no async-nats, no harmony.
|
||||||
|
//! The on-device agent build pulls it in alongside a minimal
|
||||||
|
//! async-nats client; the operator pulls it alongside kube-rs.
|
||||||
|
//! Neither should pay for the other's dependencies.
|
||||||
|
|
||||||
|
pub mod kv;
|
||||||
|
pub mod status;
|
||||||
|
|
||||||
|
pub use kv::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, desired_state_key, status_key};
|
||||||
|
pub use status::AgentStatus;
|
||||||
|
|
||||||
|
// Re-exports so consumers (agent, operator) don't need a direct
|
||||||
|
// harmony_types dependency purely to name the cross-boundary types.
|
||||||
|
pub use harmony_types::id::Id;
|
||||||
74
harmony-reconciler-contracts/src/status.rs
Normal file
74
harmony-reconciler-contracts/src/status.rs
Normal file
@@ -0,0 +1,74 @@
|
|||||||
|
//! Agent → NATS KV status payload.
|
||||||
|
//!
|
||||||
|
//! The agent publishes a heartbeat + rollup status to the
|
||||||
|
//! `agent-status` bucket every 30 s (see
|
||||||
|
//! [`crate::BUCKET_AGENT_STATUS`]). Today the payload is intentionally
|
||||||
|
//! minimal — a single `"running"` state + a timestamp — so the
|
||||||
|
//! operator can implement §12 v0.1 "Status aggregation in operator"
|
||||||
|
//! without waiting on richer per-workload reporting.
|
||||||
|
//!
|
||||||
|
//! When the agent grows richer status (per-container state, rollout
|
||||||
|
//! progress) this struct gains fields with `#[serde(default)]`; old
|
||||||
|
//! operators keep working against newer agents.
|
||||||
|
|
||||||
|
use chrono::{DateTime, Utc};
|
||||||
|
use harmony_types::id::Id;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
/// A single heartbeat published by the agent at
|
||||||
|
/// `status.<device_id>` in the `agent-status` bucket.
|
||||||
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
|
pub struct AgentStatus {
|
||||||
|
/// Echoed from the agent's own config so the operator can
|
||||||
|
/// cross-check which device it came from if the KV key is ever
|
||||||
|
/// ambiguous. Serializes transparently as a plain string.
|
||||||
|
pub device_id: Id,
|
||||||
|
/// Coarse rollup state. v0 only ever writes `"running"`; richer
|
||||||
|
/// variants are a v0.1+ concern. A String (not an enum) so old
|
||||||
|
/// operators parsing this payload don't fail on a new variant.
|
||||||
|
pub status: String,
|
||||||
|
/// RFC 3339 UTC timestamp. Used by the smoke test's reboot-
|
||||||
|
/// detection gate — any timestamp strictly greater than the gate
|
||||||
|
/// is evidence of a post-reboot write. `chrono::DateTime<Utc>`
|
||||||
|
/// serde-serializes as RFC 3339, so the wire format stays
|
||||||
|
/// lex-comparable (the smoke's string `>` still works).
|
||||||
|
pub timestamp: DateTime<Utc>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn status_roundtrip() {
|
||||||
|
let s = AgentStatus {
|
||||||
|
device_id: Id::from("pi-01".to_string()),
|
||||||
|
status: "running".to_string(),
|
||||||
|
timestamp: DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z")
|
||||||
|
.unwrap()
|
||||||
|
.with_timezone(&Utc),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&s).unwrap();
|
||||||
|
let back: AgentStatus = serde_json::from_str(&json).unwrap();
|
||||||
|
assert_eq!(s, back);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn status_has_expected_wire_keys() {
|
||||||
|
let s = AgentStatus {
|
||||||
|
device_id: Id::from("pi-01".to_string()),
|
||||||
|
status: "running".to_string(),
|
||||||
|
timestamp: DateTime::parse_from_rfc3339("2026-04-21T18:15:42Z")
|
||||||
|
.unwrap()
|
||||||
|
.with_timezone(&Utc),
|
||||||
|
};
|
||||||
|
let json = serde_json::to_string(&s).unwrap();
|
||||||
|
// device_id must serialize as a flat string (not {"value": …}).
|
||||||
|
// Relies on `#[serde(transparent)]` on `harmony_types::id::Id`.
|
||||||
|
assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}");
|
||||||
|
assert!(json.contains("\"status\":\"running\""));
|
||||||
|
// RFC 3339 output — the smoke script greps a `"timestamp":"<rfc3339>"`
|
||||||
|
// literal and compares lexicographically against a gate.
|
||||||
|
assert!(json.contains("\"timestamp\":\"2026-04-21T18:15:42Z\""));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -242,14 +242,16 @@ async fn provision_ssh_keypair() -> Result<IotSshKeypair, ExecutorError> {
|
|||||||
|
|
||||||
info!("generating ed25519 ssh keypair at {priv_path:?} (one-time)");
|
info!("generating ed25519 ssh keypair at {priv_path:?} (one-time)");
|
||||||
let status = Command::new("ssh-keygen")
|
let status = Command::new("ssh-keygen")
|
||||||
.arg("-t")
|
.args([
|
||||||
.arg("ed25519")
|
"-t",
|
||||||
.arg("-N")
|
"ed25519",
|
||||||
.arg("") // no passphrase
|
"-N",
|
||||||
.arg("-C")
|
"", // no passphrase
|
||||||
.arg("harmony-iot-smoke")
|
"-C",
|
||||||
.arg("-f")
|
"harmony-iot-smoke",
|
||||||
.arg(&priv_path)
|
"-f",
|
||||||
|
])
|
||||||
|
.arg(&priv_path) // PathBuf — kept separate so we don't force &str conversion
|
||||||
.stdout(Stdio::null())
|
.stdout(Stdio::null())
|
||||||
.stderr(Stdio::piped())
|
.stderr(Stdio::piped())
|
||||||
.output()
|
.output()
|
||||||
|
|||||||
@@ -1,3 +1,12 @@
|
|||||||
|
//! Podman Score types and their `Score<T>` / `Interpret<T>` bindings.
|
||||||
|
//!
|
||||||
|
//! `ContainerRuntime` is a first-class harmony capability (see
|
||||||
|
//! [`crate::topology::ContainerRuntime`]); the types that describe a
|
||||||
|
//! set of containers a caller wants running live here next to the
|
||||||
|
//! trait impls that route them to interpretation. These types are
|
||||||
|
//! independent of any particular product (IoT fleet, OKD fleet, etc.)
|
||||||
|
//! — callers serialize them over whatever transport they like.
|
||||||
|
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
@@ -8,6 +17,7 @@ use crate::{
|
|||||||
|
|
||||||
use super::interpret::PodmanV0Interpret;
|
use super::interpret::PodmanV0Interpret;
|
||||||
|
|
||||||
|
/// A single container managed by podman on the target host.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub struct PodmanService {
|
pub struct PodmanService {
|
||||||
pub name: String,
|
pub name: String,
|
||||||
@@ -15,28 +25,34 @@ pub struct PodmanService {
|
|||||||
pub ports: Vec<String>,
|
pub ports: Vec<String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// v0 Score for podman-based workloads.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||||
pub struct PodmanV0Score {
|
pub struct PodmanV0Score {
|
||||||
pub services: Vec<PodmanService>,
|
pub services: Vec<PodmanService>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl PodmanV0Score {
|
impl PodmanV0Score {
|
||||||
/// Label value applied to every container this score creates, used by
|
/// Label value applied to every container this Score creates. The
|
||||||
/// [`ContainerRuntime::list_managed_services`] to enumerate the set of
|
/// caller uses the label to enumerate the set of containers that
|
||||||
/// containers that belong to a specific Score (e.g. so the agent can
|
/// belong to a given Score instance (e.g. to tear them down when
|
||||||
/// remove them when the corresponding KV entry is deleted). The label
|
/// the corresponding desired-state entry is deleted).
|
||||||
/// key is [`crate::modules::podman::DEPLOYMENT_LABEL`].
|
|
||||||
///
|
///
|
||||||
/// For v0 there is a single PodmanV0Score per KV key and no multi-score
|
/// For v0 there is a single PodmanV0Score per desired-state key
|
||||||
/// scheduling on the device, so a stable hash over the service names
|
/// and no multi-score scheduling on the target host, so a stable
|
||||||
/// is an adequate identifier. When v0.1 introduces multiple scores per
|
/// join over the service names is an adequate identifier. When
|
||||||
/// device the caller will pass an explicit deployment id.
|
/// v0.1 introduces multiple scores per host the caller will pass
|
||||||
|
/// an explicit deployment id.
|
||||||
pub fn deployment_label(&self) -> String {
|
pub fn deployment_label(&self) -> String {
|
||||||
let names: Vec<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
|
let names: Vec<&str> = self.services.iter().map(|s| s.name.as_str()).collect();
|
||||||
names.join(",")
|
names.join(",")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/// Wire envelope for a reconciler-style Score payload: externally
|
||||||
|
/// tagged so the JSON is `{"type": "PodmanV0", "data": { ... }}`.
|
||||||
|
/// Adding a new variant is additive — emitters stay opaque routers,
|
||||||
|
/// consumers learn the new variant, older consumers harmlessly
|
||||||
|
/// log-and-skip the unknown tag.
|
||||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
|
||||||
#[serde(tag = "type", content = "data")]
|
#[serde(tag = "type", content = "data")]
|
||||||
pub enum IotScore {
|
pub enum IotScore {
|
||||||
@@ -105,4 +121,23 @@ mod tests {
|
|||||||
let deserialized: IotScore = serde_json::from_str(&serialized).unwrap();
|
let deserialized: IotScore = serde_json::from_str(&serialized).unwrap();
|
||||||
assert_eq!(score, deserialized);
|
assert_eq!(score, deserialized);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn deployment_label_joins_service_names() {
|
||||||
|
let score = PodmanV0Score {
|
||||||
|
services: vec![
|
||||||
|
PodmanService {
|
||||||
|
name: "web".to_string(),
|
||||||
|
image: "nginx".to_string(),
|
||||||
|
ports: vec![],
|
||||||
|
},
|
||||||
|
PodmanService {
|
||||||
|
name: "api".to_string(),
|
||||||
|
image: "myapp".to_string(),
|
||||||
|
ports: vec![],
|
||||||
|
},
|
||||||
|
],
|
||||||
|
};
|
||||||
|
assert_eq!(score.deployment_label(), "web,api");
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -20,6 +20,7 @@ use serde::{Deserialize, Serialize};
|
|||||||
/// **It is not meant to be very secure or unique**, it is suitable to generate up to 10 000 items per
|
/// **It is not meant to be very secure or unique**, it is suitable to generate up to 10 000 items per
|
||||||
/// second with a reasonable collision rate of 0,000014 % as calculated by this calculator : https://kevingal.com/apps/collision.html
|
/// second with a reasonable collision rate of 0,000014 % as calculated by this calculator : https://kevingal.com/apps/collision.html
|
||||||
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
|
#[derive(Debug, Clone, Eq, PartialEq, PartialOrd, Ord, Serialize, Deserialize)]
|
||||||
|
#[serde(transparent)]
|
||||||
pub struct Id {
|
pub struct Id {
|
||||||
value: String,
|
value: String,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ edition = "2024"
|
|||||||
rust-version = "1.85"
|
rust-version = "1.85"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||||
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
|
harmony = { path = "../../harmony", default-features = false, features = ["podman"] }
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
chrono = { workspace = true }
|
chrono = { workspace = true }
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
|
use harmony_reconciler_contracts::Id;
|
||||||
use serde::Deserialize;
|
use serde::Deserialize;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
@@ -10,7 +11,9 @@ pub struct AgentConfig {
|
|||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
pub struct AgentSection {
|
pub struct AgentSection {
|
||||||
pub device_id: String,
|
/// Cross-boundary device identity. TOML deserializes the field
|
||||||
|
/// as a bare string thanks to `#[serde(transparent)]` on `Id`.
|
||||||
|
pub device_id: Id,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Deserialize)]
|
#[derive(Debug, Clone, Deserialize)]
|
||||||
|
|||||||
@@ -8,6 +8,9 @@ use anyhow::{Context, Result};
|
|||||||
use clap::Parser;
|
use clap::Parser;
|
||||||
use config::{AgentConfig, CredentialSource, TomlFileCredentialSource};
|
use config::{AgentConfig, CredentialSource, TomlFileCredentialSource};
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
use harmony_reconciler_contracts::{
|
||||||
|
AgentStatus, BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, Id, status_key,
|
||||||
|
};
|
||||||
|
|
||||||
use harmony::inventory::Inventory;
|
use harmony::inventory::Inventory;
|
||||||
use harmony::modules::podman::PodmanTopology;
|
use harmony::modules::podman::PodmanTopology;
|
||||||
@@ -42,13 +45,13 @@ async fn connect_nats(cfg: &AgentConfig) -> Result<async_nats::Client> {
|
|||||||
|
|
||||||
async fn watch_desired_state(
|
async fn watch_desired_state(
|
||||||
client: async_nats::Client,
|
client: async_nats::Client,
|
||||||
device_id: String,
|
device_id: Id,
|
||||||
reconciler: Arc<Reconciler>,
|
reconciler: Arc<Reconciler>,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let jetstream = async_nats::jetstream::new(client);
|
let jetstream = async_nats::jetstream::new(client);
|
||||||
let bucket = jetstream
|
let bucket = jetstream
|
||||||
.create_key_value(async_nats::jetstream::kv::Config {
|
.create_key_value(async_nats::jetstream::kv::Config {
|
||||||
bucket: "desired-state".to_string(),
|
bucket: BUCKET_DESIRED_STATE.to_string(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
@@ -82,26 +85,27 @@ async fn watch_desired_state(
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn report_status(client: async_nats::Client, device_id: String) -> Result<()> {
|
async fn report_status(client: async_nats::Client, device_id: Id) -> Result<()> {
|
||||||
let jetstream = async_nats::jetstream::new(client);
|
let jetstream = async_nats::jetstream::new(client);
|
||||||
let bucket = jetstream
|
let bucket = jetstream
|
||||||
.create_key_value(async_nats::jetstream::kv::Config {
|
.create_key_value(async_nats::jetstream::kv::Config {
|
||||||
bucket: "agent-status".to_string(),
|
bucket: BUCKET_AGENT_STATUS.to_string(),
|
||||||
..Default::default()
|
..Default::default()
|
||||||
})
|
})
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
let key = format!("status.{}", device_id);
|
let key = status_key(&device_id.to_string());
|
||||||
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
let mut interval = tokio::time::interval(Duration::from_secs(30));
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
interval.tick().await;
|
interval.tick().await;
|
||||||
let status = serde_json::json!({
|
let status = AgentStatus {
|
||||||
"device_id": device_id,
|
device_id: device_id.clone(),
|
||||||
"status": "running",
|
status: "running".to_string(),
|
||||||
"timestamp": chrono::Utc::now().to_rfc3339(),
|
timestamp: chrono::Utc::now(),
|
||||||
});
|
};
|
||||||
bucket.put(&key, status.to_string().into()).await?;
|
let payload = serde_json::to_vec(&status)?;
|
||||||
|
bucket.put(&key, payload.into()).await?;
|
||||||
tracing::debug!(key = %key, "reported status");
|
tracing::debug!(key = %key, "reported status");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ edition = "2024"
|
|||||||
rust-version = "1.85"
|
rust-version = "1.85"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||||
kube = { workspace = true, features = ["runtime", "derive"] }
|
kube = { workspace = true, features = ["runtime", "derive"] }
|
||||||
k8s-openapi.workspace = true
|
k8s-openapi.workspace = true
|
||||||
async-nats = { workspace = true }
|
async-nats = { workspace = true }
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ use std::time::Duration;
|
|||||||
|
|
||||||
use async_nats::jetstream::kv::Store;
|
use async_nats::jetstream::kv::Store;
|
||||||
use futures_util::StreamExt;
|
use futures_util::StreamExt;
|
||||||
|
use harmony_reconciler_contracts::desired_state_key;
|
||||||
use kube::api::{Patch, PatchParams};
|
use kube::api::{Patch, PatchParams};
|
||||||
use kube::runtime::Controller;
|
use kube::runtime::Controller;
|
||||||
use kube::runtime::controller::Action;
|
use kube::runtime::controller::Action;
|
||||||
@@ -127,7 +128,7 @@ fn serialize_score(score: &ScorePayload) -> Result<String, Error> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
fn kv_key(device_id: &str, deployment_name: &str) -> String {
|
fn kv_key(device_id: &str, deployment_name: &str) -> String {
|
||||||
format!("{device_id}.{deployment_name}")
|
desired_state_key(device_id, deployment_name)
|
||||||
}
|
}
|
||||||
|
|
||||||
fn error_policy(_obj: Arc<Deployment>, err: &Error, _ctx: Arc<Context>) -> Action {
|
fn error_policy(_obj: Arc<Deployment>, err: &Error, _ctx: Arc<Context>) -> Action {
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ mod crd;
|
|||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use async_nats::jetstream;
|
use async_nats::jetstream;
|
||||||
use clap::{Parser, Subcommand};
|
use clap::{Parser, Subcommand};
|
||||||
|
use harmony_reconciler_contracts::BUCKET_DESIRED_STATE;
|
||||||
use kube::{Client, CustomResourceExt};
|
use kube::{Client, CustomResourceExt};
|
||||||
|
|
||||||
use crate::crd::Deployment;
|
use crate::crd::Deployment;
|
||||||
@@ -28,7 +29,7 @@ struct Cli {
|
|||||||
#[arg(
|
#[arg(
|
||||||
long,
|
long,
|
||||||
env = "KV_BUCKET",
|
env = "KV_BUCKET",
|
||||||
default_value = "desired-state",
|
default_value = BUCKET_DESIRED_STATE,
|
||||||
global = true
|
global = true
|
||||||
)]
|
)]
|
||||||
kv_bucket: String,
|
kv_bucket: String,
|
||||||
|
|||||||
Reference in New Issue
Block a user