Files
harmony/fleet/harmony-fleet-operator/src/device_reconciler.rs
Sylvain Tremblay 3a3e4a2312 feat(fleet): FleetOperatorScore + FleetServerScore for k8s server-side install
Collapses the load-test harness's chart-gen + helm-install dance into
first-class Harmony Scores. Customer-facing path:

  let score = FleetServerScore::new(nats, operator);
  score.create_interpret().execute(&Inventory::empty(), &topology).await?;

FleetOperatorScore renders the operator chart (CRDs + RBAC + ServiceAccount
+ Deployment) into a tempdir and delegates to HelmChartScore. FleetServerScore
composes it with NatsBasicScore via fail-fast `?` chaining; Zitadel + Argo
hang off the same chain when their Scores land.

Structural change: CRD type definitions and chart-builder moved from
fleet/harmony-fleet-operator/src/{crd,chart}.rs into
harmony/src/modules/fleet/operator/. Harmony can't depend on the operator
crate (cycle), so the score-side code lives in harmony and the operator
binary imports the types right back via
`harmony::modules::fleet::operator::*`. Considered keeping CRDs in the
operator crate with the score either there or in a sibling crate, but
putting customer-facing scores in harmony/src/modules/fleet/ matches the
existing convention (FleetDeviceSetupScore, ProvisionVmScore) and keeps
the CRDs reachable from future harmony scores (e.g. an inventory aggregator
reading Device CRs) without dragging in the operator binary.

The operator's `chart` subcommand stays as a developer convenience
(routes through harmony::modules::fleet::operator::build_chart) so
`cargo run -p harmony-fleet-operator -- chart` still produces an
identical chart on disk for inspection. Existing examples
(fleet_load_test, harmony_apply_deployment) updated to import CRD types
from harmony directly.

load-test.sh phase 3c collapses to a single
`cargo run -p example_fleet_server_install` invocation; phase 2b's NATS
install still runs separately so the host-side NATS reachability probe
sits where it always did. Idempotency: re-running short-circuits via
HelmChartScore::find_installed_release on both inner installs.

Verified: cargo fmt --check, cargo clippy, cargo test all pass; the
4 fleet operator unit tests (2 migrated from operator crate, 2 new on
FleetOperatorScore defaults/builders) pass under `cargo test -p harmony`;
operator chart subcommand produces an identical chart structure
post-refactor.

Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
2026-05-04 15:06:28 -04:00

166 lines
5.5 KiB
Rust

//! DeviceInfo (NATS `device-info` KV) → Device CR (kube).
//!
//! Agents publish a `DeviceInfo` payload to NATS on startup + on
//! label/inventory change. This reconciler watches that bucket and
//! materializes each entry as a cluster-scoped `Device` custom
//! resource, so label selectors and `kubectl get devices -l …`
//! work the way they do for K8s Nodes.
//!
//! Failure mode: idempotent server-side apply with a fixed field
//! manager, so repeated writes don't accumulate revisions and
//! concurrent edits from other sources stay merged safely.
use anyhow::Result;
use async_nats::jetstream::kv::{Operation, Store};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{BUCKET_DEVICE_INFO, DeviceInfo};
use kube::Client;
use kube::api::{Api, DeleteParams, Patch, PatchParams};
use std::collections::BTreeMap;
use harmony::modules::fleet::operator::{Device, DeviceSpec};
/// Field-manager name this reconciler passes to `PatchParams::apply` when it
/// writes `Device` resources via server-side apply.
const FIELD_MANAGER: &str = "harmony-fleet-operator-device-reconciler";
/// Entry point of the device reconciler.
///
/// Requests the `BUCKET_DEVICE_INFO` key-value bucket from JetStream with an
/// otherwise default configuration, then hands the resulting store to
/// [`run_loop`], which does the actual watching.
///
/// # Errors
///
/// Returns the error from `create_key_value` if the bucket cannot be
/// obtained, or whatever [`run_loop`] returns.
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> Result<()> {
    let kv_config = async_nats::jetstream::kv::Config {
        bucket: BUCKET_DEVICE_INFO.to_string(),
        ..Default::default()
    };
    let store = js.create_key_value(kv_config).await?;
    run_loop(client, store).await
}
async fn run_loop(client: Client, bucket: Store) -> Result<()> {
let devices: Api<Device> = Api::all(client);
// `watch_with_history` replays every current entry then streams
// live updates. Matches the aggregator's pattern and means we
// don't need a separate cold-start KV scan here.
let mut watch = bucket.watch_with_history(">").await?;
tracing::info!("device-reconciler: watching device-info KV");
while let Some(entry_res) = watch.next().await {
let entry = match entry_res {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "device-reconciler: watch delivery error");
continue;
}
};
match entry.operation {
Operation::Put => {
let info: DeviceInfo = match serde_json::from_slice(&entry.value) {
Ok(d) => d,
Err(e) => {
tracing::warn!(key = %entry.key, error = %e, "device-reconciler: bad DeviceInfo payload");
continue;
}
};
if let Err(e) = upsert_device(&devices, &info).await {
tracing::warn!(
device = %info.device_id,
error = %e,
"device-reconciler: upsert failed"
);
}
}
Operation::Delete | Operation::Purge => {
let Some(device_id) = entry.key.strip_prefix("info.") else {
continue;
};
if let Err(e) = delete_device(&devices, device_id).await {
tracing::warn!(%device_id, error = %e, "device-reconciler: delete failed");
}
}
}
}
Ok(())
}
/// Creates or updates the `Device` resource for one [`DeviceInfo`] payload.
///
/// The resource is named after `info.device_id`, carries the reported
/// inventory as its spec, and gets the subset of `info.labels` that survives
/// [`clean_labels`]. It is written with a forced server-side apply under
/// [`FIELD_MANAGER`].
///
/// # Errors
///
/// Propagates any error returned by the patch request.
async fn upsert_device(api: &Api<Device>, info: &DeviceInfo) -> Result<()> {
    let name = info.device_id.to_string();

    let spec = DeviceSpec {
        inventory: info.inventory.clone(),
    };
    let mut desired = Device::new(&name, spec);
    desired.metadata.labels = Some(clean_labels(&info.labels));

    let params = PatchParams::apply(FIELD_MANAGER).force();
    api.patch(&name, &params, &Patch::Apply(&desired)).await?;

    tracing::debug!(%name, "device-reconciler: upserted");
    Ok(())
}
/// Deletes the `Device` resource called `name`.
///
/// An API error with code 404 is treated as success, since the resource is
/// already absent.
///
/// # Errors
///
/// Propagates every other error returned by the delete request.
async fn delete_device(api: &Api<Device>, name: &str) -> Result<()> {
    let outcome = api.delete(name, &DeleteParams::default()).await;
    match outcome {
        // Already gone: nothing left to do.
        Err(kube::Error::Api(ae)) if ae.code == 404 => Ok(()),
        Err(other) => Err(other.into()),
        Ok(_) => {
            tracing::debug!(%name, "device-reconciler: deleted");
            Ok(())
        }
    }
}
/// Returns a copy of `raw` containing only the entries whose key passes
/// [`is_label_key`] and whose value passes [`is_label_value`].
///
/// Agents could in theory publish arbitrary strings; kube will reject
/// a whole apply if even one label is malformed, which would take out that
/// device's registration. Dropping the offending entries beats blocking
/// everything. Note that entries are dropped silently: this function does
/// not log what it removed.
fn clean_labels(raw: &BTreeMap<String, String>) -> BTreeMap<String, String> {
    let mut kept = BTreeMap::new();
    for (key, value) in raw {
        if !is_label_key(key) || !is_label_value(value) {
            continue;
        }
        kept.insert(key.clone(), value.clone());
    }
    kept
}
/// Returns `true` when `s` is a syntactically valid Kubernetes label key.
///
/// A key is `name` or `prefix/name`:
///
/// * `name` is required, at most 63 bytes, starts and ends with an ASCII
///   alphanumeric, and otherwise contains only alphanumerics, `-`, `_`, `.`.
/// * `prefix`, when present, must be a non-empty DNS-1123 subdomain of at
///   most 253 bytes.
///
/// The 253-byte limit applies to the prefix alone, so a full key may be up
/// to 253 + 1 + 63 bytes long. Keys with more than one `/` are rejected.
fn is_label_key(s: &str) -> bool {
    // Split on the FIRST slash: any further slash lands in the name segment,
    // where it fails the character check.
    match s.split_once('/') {
        Some((prefix, name)) => is_dns1123_subdomain(prefix) && is_label_name_segment(name),
        None => is_label_name_segment(s),
    }
}

/// Checks the name segment of a label key: 1..=63 bytes, alphanumeric at both
/// ends, and only alphanumerics, `-`, `_`, `.` in between.
fn is_label_name_segment(name: &str) -> bool {
    let bytes = name.as_bytes();
    let (Some(first), Some(last)) = (bytes.first(), bytes.last()) else {
        // Empty name segment.
        return false;
    };
    bytes.len() <= 63
        && first.is_ascii_alphanumeric()
        && last.is_ascii_alphanumeric()
        && bytes
            .iter()
            .all(|b| b.is_ascii_alphanumeric() || matches!(*b, b'-' | b'.' | b'_'))
}

/// Checks a label-key prefix: a non-empty, at most 253-byte sequence of
/// dot-separated labels, each made of lowercase alphanumerics and `-` and
/// starting and ending with a lowercase alphanumeric.
fn is_dns1123_subdomain(prefix: &str) -> bool {
    let is_lower_alnum = |b: &u8| b.is_ascii_lowercase() || b.is_ascii_digit();
    !prefix.is_empty()
        && prefix.len() <= 253
        && prefix.split('.').all(|label| {
            let bytes = label.as_bytes();
            // `first()` is `None` for an empty label, which rejects `a..b`.
            bytes.first().is_some_and(is_lower_alnum)
                && bytes.last().is_some_and(is_lower_alnum)
                && bytes.iter().all(|b| is_lower_alnum(b) || *b == b'-')
        })
}
/// Returns `true` when `s` is a syntactically valid Kubernetes label value.
///
/// A value is at most 63 bytes. It may be empty; otherwise it must start and
/// end with an ASCII alphanumeric and contain only alphanumerics, `-`, `_`
/// and `.` in between.
fn is_label_value(s: &str) -> bool {
    if s.len() > 63 {
        return false;
    }
    let bytes = s.as_bytes();
    let (Some(first), Some(last)) = (bytes.first(), bytes.last()) else {
        // The empty string is an explicitly allowed label value.
        return true;
    };
    // Checking the edges separately rejects values such as `-x` or `x.`,
    // which pass a plain character-set check but are refused by the API server.
    first.is_ascii_alphanumeric()
        && last.is_ascii_alphanumeric()
        && bytes
            .iter()
            .all(|b| b.is_ascii_alphanumeric() || matches!(*b, b'-' | b'.' | b'_'))
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Typical keys (bare and prefixed) and values must pass validation.
    #[test]
    fn label_cleaner_accepts_common_cases() {
        for key in ["group", "arch", "fleet.nationtech.io/region"] {
            assert!(is_label_key(key), "key {key:?} should be accepted");
        }
        for value in ["aarch64", "site-01"] {
            assert!(is_label_value(value), "value {value:?} should be accepted");
        }
    }

    /// Empty keys, embedded whitespace and over-long values must be refused.
    #[test]
    fn label_cleaner_rejects_bad_cases() {
        for key in ["", "has space"] {
            assert!(!is_label_key(key), "key {key:?} should be rejected");
        }
        let too_long = "x".repeat(64);
        for value in ["has space", too_long.as_str()] {
            assert!(!is_label_value(value), "value {value:?} should be rejected");
        }
    }
}