Collapses the load-test harness's chart-gen + helm-install dance into
first-class Harmony Scores. Customer-facing path:
let score = FleetServerScore::new(nats, operator);
score.create_interpret().execute(&Inventory::empty(), &topology).await?;
FleetOperatorScore renders the operator chart (CRDs + RBAC + ServiceAccount
+ Deployment) into a tempdir and delegates to HelmChartScore. FleetServerScore
composes it with NatsBasicScore via fail-fast `?` chaining; Zitadel + Argo
hang off the same chain when their Scores land.
Structural change: CRD type definitions and chart-builder moved from
fleet/harmony-fleet-operator/src/{crd,chart}.rs into
harmony/src/modules/fleet/operator/. Harmony can't depend on the operator
crate (cycle), so the score-side code lives in harmony and the operator
binary imports the types right back via
`harmony::modules::fleet::operator::*`. Considered keeping CRDs in the
operator crate with the score either there or in a sibling crate, but
putting customer-facing scores in harmony/src/modules/fleet/ matches the
existing convention (FleetDeviceSetupScore, ProvisionVmScore) and keeps
the CRDs reachable from future harmony scores (e.g. an inventory aggregator
reading Device CRs) without dragging in the operator binary.
The operator's `chart` subcommand stays as a developer convenience
(routes through harmony::modules::fleet::operator::build_chart) so
`cargo run -p harmony-fleet-operator -- chart` still produces an
identical chart on disk for inspection. Existing examples
(fleet_load_test, harmony_apply_deployment) updated to import CRD types
from harmony directly.
load-test.sh phase 3c collapses to a single
`cargo run -p example_fleet_server_install` invocation; phase 2b's NATS
install still runs separately so the host-side NATS reachability probe
sits where it always did. Idempotency: re-running short-circuits via
HelmChartScore::find_installed_release on both inner installs.
Verified: cargo fmt --check, cargo clippy, cargo test all pass; the
4 fleet operator unit tests (2 migrated from operator crate, 2 new on
FleetOperatorScore defaults/builders) pass under `cargo test -p harmony`;
operator chart subcommand produces an identical chart structure
post-refactor.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
166 lines
5.5 KiB
Rust
166 lines
5.5 KiB
Rust
//! DeviceInfo (NATS `device-info` KV) → Device CR (kube).
|
|
//!
|
|
//! Agents publish a `DeviceInfo` payload to NATS on startup + on
|
|
//! label/inventory change. This reconciler watches that bucket and
|
|
//! materializes each entry as a cluster-scoped `Device` custom
|
|
//! resource, so label selectors and `kubectl get devices -l …`
|
|
//! work the way they do for K8s Nodes.
|
|
//!
|
|
//! Failure mode: idempotent server-side apply with a fixed field
|
|
//! manager, so repeated writes don't accumulate revisions and
|
|
//! concurrent edits from other sources stay merged safely.
|
|
|
|
use anyhow::Result;
|
|
use async_nats::jetstream::kv::{Operation, Store};
|
|
use futures_util::StreamExt;
|
|
use harmony_reconciler_contracts::{BUCKET_DEVICE_INFO, DeviceInfo};
|
|
use kube::Client;
|
|
use kube::api::{Api, DeleteParams, Patch, PatchParams};
|
|
use std::collections::BTreeMap;
|
|
|
|
use harmony::modules::fleet::operator::{Device, DeviceSpec};
|
|
|
|
const FIELD_MANAGER: &str = "harmony-fleet-operator-device-reconciler";
|
|
|
|
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> Result<()> {
|
|
let bucket = js
|
|
.create_key_value(async_nats::jetstream::kv::Config {
|
|
bucket: BUCKET_DEVICE_INFO.to_string(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
|
|
run_loop(client, bucket).await
|
|
}
|
|
|
|
async fn run_loop(client: Client, bucket: Store) -> Result<()> {
|
|
let devices: Api<Device> = Api::all(client);
|
|
// `watch_with_history` replays every current entry then streams
|
|
// live updates. Matches the aggregator's pattern and means we
|
|
// don't need a separate cold-start KV scan here.
|
|
let mut watch = bucket.watch_with_history(">").await?;
|
|
tracing::info!("device-reconciler: watching device-info KV");
|
|
|
|
while let Some(entry_res) = watch.next().await {
|
|
let entry = match entry_res {
|
|
Ok(e) => e,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "device-reconciler: watch delivery error");
|
|
continue;
|
|
}
|
|
};
|
|
match entry.operation {
|
|
Operation::Put => {
|
|
let info: DeviceInfo = match serde_json::from_slice(&entry.value) {
|
|
Ok(d) => d,
|
|
Err(e) => {
|
|
tracing::warn!(key = %entry.key, error = %e, "device-reconciler: bad DeviceInfo payload");
|
|
continue;
|
|
}
|
|
};
|
|
if let Err(e) = upsert_device(&devices, &info).await {
|
|
tracing::warn!(
|
|
device = %info.device_id,
|
|
error = %e,
|
|
"device-reconciler: upsert failed"
|
|
);
|
|
}
|
|
}
|
|
Operation::Delete | Operation::Purge => {
|
|
let Some(device_id) = entry.key.strip_prefix("info.") else {
|
|
continue;
|
|
};
|
|
if let Err(e) = delete_device(&devices, device_id).await {
|
|
tracing::warn!(%device_id, error = %e, "device-reconciler: delete failed");
|
|
}
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn upsert_device(api: &Api<Device>, info: &DeviceInfo) -> Result<()> {
|
|
let name = info.device_id.to_string();
|
|
let mut device = Device::new(
|
|
&name,
|
|
DeviceSpec {
|
|
inventory: info.inventory.clone(),
|
|
},
|
|
);
|
|
device.metadata.labels = Some(clean_labels(&info.labels));
|
|
|
|
api.patch(
|
|
&name,
|
|
&PatchParams::apply(FIELD_MANAGER).force(),
|
|
&Patch::Apply(&device),
|
|
)
|
|
.await?;
|
|
tracing::debug!(%name, "device-reconciler: upserted");
|
|
Ok(())
|
|
}
|
|
|
|
async fn delete_device(api: &Api<Device>, name: &str) -> Result<()> {
|
|
match api.delete(name, &DeleteParams::default()).await {
|
|
Ok(_) => {
|
|
tracing::debug!(%name, "device-reconciler: deleted");
|
|
Ok(())
|
|
}
|
|
Err(kube::Error::Api(ae)) if ae.code == 404 => Ok(()),
|
|
Err(e) => Err(e.into()),
|
|
}
|
|
}
|
|
|
|
/// Drop labels whose keys or values violate k8s label-syntax rules.
|
|
/// Agents could in theory publish arbitrary strings; kube will reject
|
|
/// a whole apply if even one is malformed, which would take out that
|
|
/// device's registration. Skip-and-log beats block-everything.
|
|
fn clean_labels(raw: &BTreeMap<String, String>) -> BTreeMap<String, String> {
|
|
raw.iter()
|
|
.filter(|(k, v)| is_label_key(k) && is_label_value(v))
|
|
.map(|(k, v)| (k.clone(), v.clone()))
|
|
.collect()
|
|
}
|
|
|
|
/// Returns `true` when `s` is a syntactically valid Kubernetes label key.
///
/// A key is `name` or `prefix/name`:
///
/// * `name` is 1..=63 characters, starts and ends with an ASCII
///   alphanumeric, and otherwise contains only ASCII alphanumerics,
///   `-`, `_` and `.`.
/// * `prefix`, when present, is a DNS subdomain of at most 253
///   characters: dot-separated labels made of lowercase ASCII
///   alphanumerics and `-`, each starting and ending with an
///   alphanumeric.
///
/// Only one `/` is allowed; an empty prefix or an empty name is invalid.
fn is_label_key(s: &str) -> bool {
    fn is_lower_alnum(c: char) -> bool {
        c.is_ascii_lowercase() || c.is_ascii_digit()
    }

    fn is_alnum(c: char) -> bool {
        c.is_ascii_alphanumeric()
    }

    // One dot-separated component of the DNS-subdomain prefix.
    // `starts_with` is false on the empty string, so empty components
    // (leading/trailing/double dots, empty prefix) are rejected.
    fn is_dns_label(label: &str) -> bool {
        label.starts_with(is_lower_alnum)
            && label.ends_with(is_lower_alnum)
            && label.chars().all(|c| is_lower_alnum(c) || c == '-')
    }

    fn is_dns_subdomain(prefix: &str) -> bool {
        prefix.len() <= 253 && prefix.split('.').all(is_dns_label)
    }

    fn is_name(name: &str) -> bool {
        name.len() <= 63
            && name.starts_with(is_alnum)
            && name.ends_with(is_alnum)
            && name
                .chars()
                .all(|c| is_alnum(c) || matches!(c, '-' | '_' | '.'))
    }

    // Split on the FIRST slash: any further slash ends up in the name
    // part, where it is rejected by the character check.
    match s.split_once('/') {
        Some((prefix, name)) => is_dns_subdomain(prefix) && is_name(name),
        None => is_name(s),
    }
}
|
|
|
|
/// Returns `true` when `s` is a syntactically valid Kubernetes label value.
///
/// A value is either empty, or at most 63 characters that start and end
/// with an ASCII alphanumeric and otherwise contain only ASCII
/// alphanumerics, `-`, `_` and `.`.
fn is_label_value(s: &str) -> bool {
    fn is_alnum(c: char) -> bool {
        c.is_ascii_alphanumeric()
    }

    // The empty string is explicitly allowed as a label value.
    if s.is_empty() {
        return true;
    }

    s.len() <= 63
        && s.starts_with(is_alnum)
        && s.ends_with(is_alnum)
        && s.chars()
            .all(|c| is_alnum(c) || matches!(c, '-' | '_' | '.'))
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;

    /// Everyday keys and values must survive the cleaner.
    #[test]
    fn label_cleaner_accepts_common_cases() {
        for key in ["group", "arch", "fleet.nationtech.io/region"] {
            assert!(is_label_key(key), "key should be accepted: {key:?}");
        }
        for value in ["aarch64", "site-01"] {
            assert!(is_label_value(value), "value should be accepted: {value:?}");
        }
    }

    /// Empty keys, embedded whitespace and over-long values are refused.
    #[test]
    fn label_cleaner_rejects_bad_cases() {
        for key in ["", "has space"] {
            assert!(!is_label_key(key), "key should be rejected: {key:?}");
        }

        // One character past the 63-character value limit.
        let too_long = "x".repeat(64);
        for value in ["has space", too_long.as_str()] {
            assert!(!is_label_value(value), "value should be rejected: {value:?}");
        }
    }
}
|