feat/iot-operator-helm-chart #272

Closed
johnride wants to merge 2 commits from feat/iot-operator-helm-chart into feat/iot-walking-skeleton
25 changed files with 962 additions and 146 deletions

Cargo.lock generated
View File

@@ -4769,6 +4769,7 @@ dependencies = [
"schemars 0.8.22",
"serde",
"serde_json",
"serde_yaml",
"thiserror 2.0.18",
"tokio",
"tracing",

View File

@@ -13,28 +13,37 @@
//!
//! Typical demo-driver usage:
//!
//! # apply an nginx deployment
//! # apply an nginx deployment to every device in the site-a group
//! cargo run -q -p example_iot_apply_deployment -- \
//! --target-device iot-smoke-vm-arm \
//! --to group=site-a \
//! --image nginx:latest
//!
//! # print the CR JSON (lets the user kubectl-apply it manually)
//! cargo run -q -p example_iot_apply_deployment -- \
//! --target-device iot-smoke-vm-arm \
//! --to group=site-a \
//! --image nginx:latest --print | kubectl apply -f -
//!
//! # upgrade the same deployment to a newer image
//! # target a single device for a dev loop
//! cargo run -q -p example_iot_apply_deployment -- \
//! --target-device iot-smoke-vm-arm \
//! --to device=iot-smoke-vm-arm \
//! --image nginx:latest
//!
//! # upgrade the same deployment to a newer image (same selector)
//! cargo run -q -p example_iot_apply_deployment -- \
//! --to group=site-a \
//! --image nginx:1.26
//!
//! # delete the deployment
//! cargo run -q -p example_iot_apply_deployment -- --delete
use anyhow::{Context, Result};
use std::collections::BTreeMap;
use anyhow::{Context, Result, anyhow};
use clap::Parser;
use harmony::modules::podman::{PodmanService, PodmanV0Score};
use iot_operator_v0::crd::{Deployment, DeploymentSpec, Rollout, RolloutStrategy, ScorePayload};
use iot_operator_v0::crd::{
Deployment, DeploymentSpec, Rollout, RolloutStrategy, ScorePayload, TargetSelector,
};
use kube::Client;
use kube::api::{Api, DeleteParams, Patch, PatchParams};
@@ -51,10 +60,13 @@ struct Cli {
/// podman container name on the device.
#[arg(long, default_value = "hello-world")]
name: String,
/// Device id that should run the container. Must match the
/// agent's `device_id` config.
#[arg(long, default_value = "iot-smoke-vm")]
target_device: String,
/// Label selector picking which devices run the deployment.
/// Comma-separated `key=value` pairs (conjunctive), e.g.
/// `--to group=site-a` or `--to device=iot-smoke-vm,arch=aarch64`.
/// A device matches when its published labels are a superset of
/// these pairs. Applies except in `--delete` mode.
#[arg(long, default_value = "device=iot-smoke-vm")]
to: String,
/// Container image to run.
#[arg(long, default_value = "docker.io/library/nginx:latest")]
image: String,
@@ -70,10 +82,29 @@ struct Cli {
print: bool,
}
fn parse_selector(raw: &str) -> Result<TargetSelector> {
let mut match_labels = BTreeMap::new();
for piece in raw.split(',').map(str::trim).filter(|p| !p.is_empty()) {
let (k, v) = piece
.split_once('=')
.ok_or_else(|| anyhow!("selector chunk '{piece}' missing '='"))?;
let k = k.trim();
let v = v.trim();
if k.is_empty() || v.is_empty() {
return Err(anyhow!("selector chunk '{piece}' has empty key or value"));
}
match_labels.insert(k.to_string(), v.to_string());
}
if match_labels.is_empty() {
return Err(anyhow!("selector is empty — pass e.g. `--to group=site-a`"));
}
Ok(TargetSelector { match_labels })
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
let cr = build_cr(&cli);
let cr = build_cr(&cli)?;
if cli.print {
println!("{}", serde_json::to_string_pretty(&cr)?);
@@ -117,7 +148,7 @@ async fn main() -> Result<()> {
Ok(())
}
fn build_cr(cli: &Cli) -> Deployment {
fn build_cr(cli: &Cli) -> Result<Deployment> {
let score = PodmanV0Score {
services: vec![PodmanService {
name: cli.name.clone(),
@@ -135,14 +166,16 @@ fn build_cr(cli: &Cli) -> Deployment {
data: serde_json::to_value(&score).expect("PodmanV0Score is JSON-clean"),
};
Deployment::new(
let target_selector = parse_selector(&cli.to)?;
Ok(Deployment::new(
&cli.name,
DeploymentSpec {
target_devices: vec![cli.target_device.clone()],
target_selector,
score: payload,
rollout: Rollout {
strategy: RolloutStrategy::Immediate,
},
},
)
))
}

View File

@@ -57,9 +57,14 @@ struct Cli {
/// fresh `Id` (hex timestamp + random suffix).
#[arg(long)]
device_id: Option<String>,
/// Fleet group label to write into the agent's TOML config.
#[arg(long, default_value = "group-a")]
group: String,
/// Routing labels to write into the agent's TOML config. Accepts
/// a comma-separated list of `key=value` pairs, e.g.
/// `--labels group=site-a,role=sensor-gateway`. Published in
/// every heartbeat; the operator resolves Deployment selectors
/// against this map. At least one label is required so the
/// device is targetable.
#[arg(long, default_value = "group=group-a")]
labels: String,
/// libvirt network name to attach the VM to.
#[arg(long, default_value = "default")]
network: String,
@@ -186,9 +191,15 @@ async fn main() -> Result<()> {
},
);
let labels = parse_labels(&cli.labels)?;
let labels_display = labels
.iter()
.map(|(k, v)| format!("{k}={v}"))
.collect::<Vec<_>>()
.join(",");
let setup_score = IotDeviceSetupScore::new(IotDeviceSetupConfig {
device_id: device_id.clone(),
group: cli.group.clone(),
labels,
nats_urls: vec![cli.nats_url.clone()],
nats_user: cli.nats_user.clone(),
nats_pass: cli.nats_pass.clone(),
@@ -196,13 +207,31 @@ async fn main() -> Result<()> {
});
run_setup_score(&setup_score, &linux_topology).await?;
println!(
"device '{device_id}' (group '{}') onboarded via {vm_ip}",
cli.group
);
println!("device '{device_id}' ({labels_display}) onboarded via {vm_ip}");
Ok(())
}
fn parse_labels(raw: &str) -> Result<std::collections::BTreeMap<String, String>> {
let mut out = std::collections::BTreeMap::new();
for piece in raw.split(',').map(str::trim).filter(|p| !p.is_empty()) {
let (k, v) = piece
.split_once('=')
.ok_or_else(|| anyhow::anyhow!("label chunk '{piece}' missing '='"))?;
let k = k.trim();
let v = v.trim();
if k.is_empty() || v.is_empty() {
anyhow::bail!("label chunk '{piece}' has empty key or value");
}
out.insert(k.to_string(), v.to_string());
}
if out.is_empty() {
anyhow::bail!(
"--labels must have at least one `key=value` pair (selectors need something to match on)"
);
}
Ok(out)
}
async fn run_vm_score(
score: &ProvisionVmScore,
topology: &KvmVirtualMachineHost,

View File

@@ -58,6 +58,15 @@ pub struct AgentStatus {
/// disk."
#[serde(default)]
pub inventory: Option<InventorySnapshot>,
/// Routing labels. The agent echoes the label map from its local
/// config so the operator can resolve a Deployment's
/// `targetSelector` against the fleet. Keys + values are
/// user-defined; typical entries: `group=site-a`, `arch=aarch64`,
/// `role=sensor-gateway`. Devices with an empty map are
/// effectively un-targetable — a deliberate choice, since every
/// device in a real fleet will have at least a `group` label.
#[serde(default)]
pub labels: BTreeMap<String, String>,
}
/// Reconcile phase for a single deployment on one device.
@@ -151,6 +160,7 @@ mod tests {
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
labels: BTreeMap::new(),
};
let json = serde_json::to_string(&s).unwrap();
let back: AgentStatus = serde_json::from_str(&json).unwrap();
@@ -205,6 +215,7 @@ mod tests {
memory_mb: 8192,
agent_version: "0.1.0".to_string(),
}),
labels: BTreeMap::from([("group".to_string(), "site-a".to_string())]),
};
let json = serde_json::to_string(&s).unwrap();
let back: AgentStatus = serde_json::from_str(&json).unwrap();
@@ -225,6 +236,7 @@ mod tests {
assert!(s.deployments.is_empty());
assert!(s.recent_events.is_empty());
assert!(s.inventory.is_none());
assert!(s.labels.is_empty());
}
#[test]
@@ -236,6 +248,7 @@ mod tests {
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
labels: BTreeMap::new(),
};
let json = serde_json::to_string(&s).unwrap();
assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}");

View File

@@ -1,8 +1,9 @@
//! [`IotDeviceSetupScore`] — install podman + the iot-agent, wire the
//! agent's TOML config, enable the systemd unit. Idempotent: re-running
//! with a changed config (e.g. a different `group`) updates only what
//! with a changed config (different labels, etc.) updates only what
//! differs and restarts the agent once.
use std::collections::BTreeMap;
use std::path::PathBuf;
use async_trait::async_trait;
@@ -25,13 +26,12 @@ use crate::score::Score;
/// User-visible configuration for the setup Score. Everything a customer
/// needs to tell us to bring a device into the fleet.
///
/// **On `group`.** For v0 the group is a *label*, written into the
/// agent's TOML config and reported back via the status bucket. It does
/// not yet drive deployment routing — `Deployment.spec.targetDevices`
/// still takes explicit device IDs. `targetGroups` is a v0.1+ item
/// (ROADMAP §6.5). Running this Score twice against the same device
/// with different `group` values is how a device is moved between
/// fleet partitions once group routing lands.
/// **On `labels`.** The label map is published verbatim in every
/// agent heartbeat so the operator can resolve a Deployment's
/// `targetSelector` against this device. `group` is the conventional
/// primary label, but any key/value pair the user wants can go in.
/// Re-running this Score with a changed label map is how a device is
/// moved between fleet partitions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct IotDeviceSetupConfig {
/// Stable device identifier. Written into the agent's TOML and
@@ -40,8 +40,9 @@ pub struct IotDeviceSetupConfig {
/// at up to ~10k devices/sec, which matches the feel of a fleet
/// registry.
pub device_id: Id,
/// Fleet partition this device belongs to.
pub group: String,
/// Routing labels for selector-based targeting. Typical keys:
/// `group`, `arch`, `role`. Published in every heartbeat.
pub labels: BTreeMap<String, String>,
/// NATS URLs the agent should connect to. Typically one entry.
pub nats_urls: Vec<String>,
/// Shared v0 credentials (Zitadel-issued per-device tokens in v0.2).
@@ -61,7 +62,6 @@ impl IotDeviceSetupConfig {
// double-quoted strings are just `\` and `"`, handled by
// [`toml_escape`].
let device_id = toml_escape(&self.device_id.to_string());
let group = toml_escape(&self.group);
let nats_user = toml_escape(&self.nats_user);
let nats_pass = toml_escape(&self.nats_pass);
let urls = self
@@ -70,10 +70,20 @@ impl IotDeviceSetupConfig {
.map(|u| format!("\"{}\"", toml_escape(u)))
.collect::<Vec<_>>()
.join(", ");
// BTreeMap iteration is ordered — stable TOML output, so
// idempotent change detection compares cleanly.
let labels = self
.labels
.iter()
.map(|(k, v)| format!("{} = \"{}\"", toml_escape(k), toml_escape(v)))
.collect::<Vec<_>>()
.join("\n");
format!(
r#"[agent]
device_id = "{device_id}"
group = "{group}"
[labels]
{labels}
[credentials]
type = "toml-shared"

View File

@@ -1,5 +1,6 @@
use harmony_reconciler_contracts::Id;
use serde::Deserialize;
use std::collections::BTreeMap;
use std::path::Path;
#[derive(Debug, Clone, Deserialize)]
@@ -7,6 +8,11 @@ pub struct AgentConfig {
pub agent: AgentSection,
pub nats: NatsSection,
pub credentials: CredentialsSection,
/// Routing labels. Published verbatim in every heartbeat so the
/// operator can resolve a Deployment's `targetSelector` against
/// this device. Typical keys: `group`, `arch`, `role`.
#[serde(default)]
pub labels: BTreeMap<String, String>,
}
#[derive(Debug, Clone, Deserialize)]

View File

@@ -90,6 +90,7 @@ async fn report_status(
device_id: Id,
reconciler: Arc<Reconciler>,
inventory: Option<InventorySnapshot>,
labels: std::collections::BTreeMap<String, String>,
) -> Result<()> {
let jetstream = async_nats::jetstream::new(client);
let bucket = jetstream
@@ -112,6 +113,7 @@ async fn report_status(
deployments,
recent_events,
inventory: inventory.clone(),
labels: labels.clone(),
};
let payload = serde_json::to_vec(&status)?;
bucket.put(&key, payload.into()).await?;
@@ -199,6 +201,7 @@ async fn main() -> Result<()> {
device_id,
reconciler.clone(),
Some(inventory_snapshot),
cfg.labels.clone(),
);
let reconcile = reconciler.clone().run_periodic(RECONCILE_INTERVAL);

View File

@@ -13,6 +13,7 @@ k8s-openapi.workspace = true
async-nats = { workspace = true }
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
schemars = "0.8.22"
tokio.workspace = true
tracing = { workspace = true }

View File

@@ -0,0 +1,7 @@
.DS_Store
.git/
.gitignore
*.swp
*.bak
*.tmp
*.orig

View File

@@ -0,0 +1,17 @@
apiVersion: v2
Review
No yaml. Use template hydration as specified in ADR 018. https://git.nationtech.io/NationTech/harmony/src/branch/master/docs/adr/018-Template-Hydration-For-Workload-Deployment.md
name: iot-operator-v0
description: NationTech IoT operator — Deployment CRD → NATS JetStream KV fleet reconciler.
type: application
version: 0.1.0
appVersion: "0.1.0"
home: https://nationtech.io
sources:
- https://git.nationtech.io/Nationtech/harmony
keywords:
- iot
- nats
- operator
- fleet
maintainers:
- name: NationTech
email: dev@nationtech.io
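
The ADR 018 review above asks for template hydration rather than hand-maintained yaml. A minimal sketch of what that could mean for the chart metadata, assuming the operator binary emits a fully hydrated Chart.yaml at packaging time via serde_yaml (already a dependency of this PR); the struct and field names below are illustrative, not part of the PR.

use serde::Serialize;

// Hypothetical hydration sketch: the chart metadata is a typed struct owned by
// the Rust binary and serialized once at release time, so nothing in the chart
// directory is hand-edited.
#[derive(Serialize)]
struct ChartMeta<'a> {
    #[serde(rename = "apiVersion")]
    api_version: &'a str,
    name: &'a str,
    description: &'a str,
    #[serde(rename = "type")]
    kind: &'a str,
    version: &'a str,
    #[serde(rename = "appVersion")]
    app_version: &'a str,
}

fn chart_yaml() -> Result<String, serde_yaml::Error> {
    serde_yaml::to_string(&ChartMeta {
        api_version: "v2",
        name: "iot-operator-v0",
        description: "NationTech IoT operator",
        kind: "application",
        version: "0.1.0",
        app_version: "0.1.0",
    })
}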

View File

@@ -0,0 +1,143 @@
apiVersion: apiextensions.k8s.io/v1
Review

Don't use yaml. Use Rust structs and apply them directly. Even helm generation is fully hydrated; we only use helm as a packaging and versioning tool, no configuration.
kind: CustomResourceDefinition
metadata:
name: deployments.iot.nationtech.io
spec:
group: iot.nationtech.io
names:
categories: []
kind: Deployment
plural: deployments
shortNames:
- iotdep
singular: deployment
scope: Namespaced
versions:
- additionalPrinterColumns: []
name: v1alpha1
schema:
openAPIV3Schema:
description: Auto-generated derived type for DeploymentSpec via `CustomResource`
properties:
spec:
properties:
rollout:
properties:
strategy:
enum:
- Immediate
type: string
required:
- strategy
type: object
score:
properties:
data:
x-kubernetes-preserve-unknown-fields: true
type:
minLength: 1
type: string
required:
- data
- type
type: object
x-kubernetes-validations:
- message: score.type must be a valid Rust identifier matching the struct name of the score variant (e.g. PodmanV0)
rule: self.type.matches('^[A-Za-z_][A-Za-z0-9_]*$')
targetSelector:
description: |-
Label selector that picks which devices in the fleet run this deployment. Devices publish their labels via `AgentStatus`; the operator resolves the selector against the current fleet snapshot on every reconcile.
Matches the Kubernetes `LabelSelector.matchLabels` wire format so the CLI, dashboards, and kubectl tooling all speak the same selector grammar. Expressions (`In`, `NotIn`, etc.) are deferred until there's a concrete need.
properties:
matchLabels:
additionalProperties:
type: string
default: {}
type: object
type: object
required:
- rollout
- score
- targetSelector
type: object
status:
nullable: true
properties:
aggregate:
description: Per-deployment rollup aggregated from the `agent-status` bucket. Present once at least one targeted agent has heartbeated; absent on a freshly-created CR.
nullable: true
properties:
failed:
format: uint32
minimum: 0.0
type: integer
lastError:
description: Device id of the most recent device reporting a failure, with its short error message. Surfaces the top failure to the CR's status without needing per-device subresource lookups.
nullable: true
properties:
at:
type: string
deviceId:
type: string
message:
type: string
required:
- at
- deviceId
- message
type: object
lastHeartbeatAt:
description: Timestamp of the most recent agent heartbeat counted into this aggregate. "Freshness" signal — a CR whose aggregate hasn't advanced in minutes is evidence the whole fleet has gone dark.
nullable: true
type: string
pending:
format: uint32
minimum: 0.0
type: integer
recentEvents:
default: []
description: Last-N events aggregated across all target devices, most recent first. Operator caps at a handful (see operator controller).
items:
properties:
at:
type: string
deployment:
nullable: true
type: string
deviceId:
type: string
message:
type: string
severity:
type: string
required:
- at
- deviceId
- message
- severity
type: object
type: array
succeeded:
description: Count of devices where the deployment is in each phase. Always populated (zeros are valid) so the operator can patch the whole subtree atomically. With selector-based targeting there is no "unreported" counterpart — a device that has never heartbeated is invisible to the selector machinery.
format: uint32
minimum: 0.0
type: integer
required:
- failed
- pending
- succeeded
type: object
observedScoreString:
description: Last serialized score the operator pushed to NATS. Used by the operator itself for change-detection on the hot path (skip KV write + status patch when the CR is unchanged).
nullable: true
type: string
type: object
required:
- spec
title: Deployment
type: object
served: true
storage: true
subresources:
status: {}
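
The review at the top of this file asks for Rust structs applied directly instead of shipped yaml. The typed source of truth already exists in this PR (Deployment::crd() behind the install subcommand); below is a minimal sketch of the chart-free apply path using kube-rs server-side apply. The function name and field-manager string are assumptions, not code from this PR.

use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::{Api, Patch, PatchParams};
use kube::{Client, CustomResourceExt};

// Hypothetical sketch: apply the CRD from the typed Rust definition.
// Server-side apply is idempotent, so this can run on every operator start.
async fn apply_crd(client: Client) -> anyhow::Result<()> {
    let crd = iot_operator_v0::crd::Deployment::crd();
    let api: Api<CustomResourceDefinition> = Api::all(client);
    api.patch(
        "deployments.iot.nationtech.io",
        &PatchParams::apply("iot-operator-v0").force(),
        &Patch::Apply(&crd),
    )
    .await?;
    Ok(())
}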

View File

@@ -0,0 +1,17 @@
#!/usr/bin/env bash
Review

No bash script to generate yaml; that is a crime against harmony.
# Regenerate the chart's CRD yaml from the typed-Rust Deployment::crd().
# Runs at chart-release time, not at runtime. The runtime install path
# is `iot-operator-v0 install`, which uses the same Rust source of
# truth via a harmony Score — no yaml.
#
# Usage: iot/iot-operator-v0/chart/regen-crd.sh
set -euo pipefail
HERE="$(cd "$(dirname "$0")" && pwd)"
REPO="$(cd "$HERE/../../.." && pwd)"
OUT="$HERE/crd-source/deployments.iot.nationtech.io.yaml"
cd "$REPO"
cargo build -q -p iot-operator-v0
./target/debug/iot-operator-v0 gen-chart-crd >"$OUT"
echo "regenerated $OUT ($(wc -l <"$OUT") lines)"

View File

@@ -0,0 +1,58 @@
{{/*
Review

Avoid that. Use askama templates when we need them.
Expand the name of the chart.
*/}}
{{- define "iot-operator-v0.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{/*
Fully qualified app name defaults to <release>-<chart>. Truncated to
the 63-char k8s label limit.
*/}}
{{- define "iot-operator-v0.fullname" -}}
{{- $name := default .Chart.Name .Values.nameOverride -}}
{{- if contains $name .Release.Name -}}
{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
{{- else -}}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{- end -}}
{{- define "iot-operator-v0.chart" -}}
{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
{{- end -}}
{{/*
Common labels applied to every chart-owned resource.
*/}}
{{- define "iot-operator-v0.labels" -}}
helm.sh/chart: {{ include "iot-operator-v0.chart" . }}
{{ include "iot-operator-v0.selectorLabels" . }}
app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
app.kubernetes.io/managed-by: {{ .Release.Service }}
{{- end -}}
{{- define "iot-operator-v0.selectorLabels" -}}
app.kubernetes.io/name: {{ include "iot-operator-v0.name" . }}
app.kubernetes.io/instance: {{ .Release.Name }}
{{- end -}}
{{/*
ServiceAccount name honors .Values.serviceAccount.name when set,
falls back to the fullname.
*/}}
{{- define "iot-operator-v0.serviceAccountName" -}}
{{- if .Values.serviceAccount.create -}}
{{- default (include "iot-operator-v0.fullname" .) .Values.serviceAccount.name -}}
{{- else -}}
{{- default "default" .Values.serviceAccount.name -}}
{{- end -}}
{{- end -}}
{{/*
Image ref. Defaults to .Chart.AppVersion when tag is empty.
*/}}
{{- define "iot-operator-v0.image" -}}
{{- $tag := default .Chart.AppVersion .Values.image.tag -}}
{{- printf "%s:%s" .Values.image.repository $tag -}}
{{- end -}}
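
The review above prefers askama templates over Go-template helpers when templating is needed at all. A minimal sketch under that assumption; the inline template, struct name, and fields are hypothetical, and askama is not currently a dependency of this PR.

use askama::Template;

// Hypothetical askama equivalent of a helper-driven manifest: every value is a
// typed Rust field, so a missing or misspelled key is a compile error instead
// of a silently-empty Go-template expansion.
#[derive(Template)]
#[template(
    source = r#"apiVersion: v1
kind: ServiceAccount
metadata:
  name: {{ name }}
  namespace: {{ namespace }}
"#,
    ext = "txt"
)]
struct ServiceAccountManifest<'a> {
    name: &'a str,
    namespace: &'a str,
}

fn render_service_account() -> askama::Result<String> {
    ServiceAccountManifest {
        name: "iot-operator-v0",
        namespace: "iot-system",
    }
    .render()
}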

View File

@@ -0,0 +1,21 @@
{{- if .Values.rbac.create -}}
Review

No yaml; a ClusterRole is a fully typed Rust struct with kube-rs, much more robust than typo-magnet templates.
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: {{ include "iot-operator-v0.fullname" . }}
labels:
{{- include "iot-operator-v0.labels" . | nindent 4 }}
rules:
# Reconcile loop: watch the Deployment CRs and update their
# status subresource.
- apiGroups: ["iot.nationtech.io"]
resources: ["deployments"]
verbs: ["get", "list", "watch", "patch", "update"]
- apiGroups: ["iot.nationtech.io"]
resources: ["deployments/status"]
verbs: ["get", "patch", "update"]
# Finalizer add/remove — kube-rs's finalizer() helper patches the
# main CR, covered by the deployments rule above. Kept explicit
# here as a forward-compat note: if we add additional custom
# finalizer-guarded resources, extend this block.
{{- end -}}
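
As the review above notes, the same ClusterRole can be a fully typed struct. A minimal sketch with k8s-openapi types, mirroring the two rules in the template above; the helper name and closure are assumptions, not code from this PR.

use k8s_openapi::api::rbac::v1::{ClusterRole, PolicyRule};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;

// Hypothetical typed counterpart of clusterrole.yaml: misspell a verb or a
// field name here and it fails at compile time, not at install time.
fn operator_cluster_role(name: &str) -> ClusterRole {
    let rule = |resources: &[&str], verbs: &[&str]| PolicyRule {
        api_groups: Some(vec!["iot.nationtech.io".into()]),
        resources: Some(resources.iter().map(|r| r.to_string()).collect()),
        verbs: verbs.iter().map(|v| v.to_string()).collect(),
        ..Default::default()
    };
    ClusterRole {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            ..Default::default()
        },
        rules: Some(vec![
            rule(&["deployments"], &["get", "list", "watch", "patch", "update"]),
            rule(&["deployments/status"], &["get", "patch", "update"]),
        ]),
        ..Default::default()
    }
}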

View File

@@ -0,0 +1,16 @@
{{- if .Values.rbac.create -}}
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: {{ include "iot-operator-v0.fullname" . }}
labels:
{{- include "iot-operator-v0.labels" . | nindent 4 }}
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: ClusterRole
name: {{ include "iot-operator-v0.fullname" . }}
subjects:
- kind: ServiceAccount
name: {{ include "iot-operator-v0.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
{{- end -}}

View File

@@ -0,0 +1,21 @@
{{/*
CRD installation. Cnpg-aligned pattern:
- CRDs sit inside templates/ (not Helm's native top-level crds/ dir)
so `helm upgrade` re-applies schema changes transparently.
- resource-policy: keep stops `helm uninstall` from deleting them
(and with them every customer CR).
- Gated by .Values.crds.create so GitOps setups that manage CRDs
out-of-band (ArgoCD, Flux) can flip it off.
The actual CRD YAML lives at chart/crd-source/ — regenerated from
the typed-Rust Deployment::crd() by `iot-operator-v0 gen-chart-crd`
at release time, never hand-edited.
*/}}
{{- if .Values.crds.create }}
{{- $crd := .Files.Get "crd-source/deployments.iot.nationtech.io.yaml" | fromYaml }}
{{- $_ := set $crd "metadata" (merge (dict "annotations" (dict "helm.sh/resource-policy" "keep")) ($crd.metadata | default dict)) }}
{{- $_ = set $crd.metadata "annotations" (merge (dict "helm.sh/resource-policy" "keep") ($crd.metadata.annotations | default dict)) }}
{{- $_ = set $crd.metadata "labels" (merge (dict "app.kubernetes.io/managed-by" .Release.Service "app.kubernetes.io/part-of" (include "iot-operator-v0.name" .)) ($crd.metadata.labels | default dict)) }}
{{ toYaml $crd }}
{{- end }}

View File

@@ -0,0 +1,63 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "iot-operator-v0.fullname" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "iot-operator-v0.labels" . | nindent 4 }}
spec:
replicas: {{ .Values.replicaCount }}
strategy:
type: Recreate
selector:
matchLabels:
{{- include "iot-operator-v0.selectorLabels" . | nindent 6 }}
template:
metadata:
labels:
{{- include "iot-operator-v0.selectorLabels" . | nindent 8 }}
{{- with .Values.podLabels }}
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.podAnnotations }}
annotations:
{{- toYaml . | nindent 8 }}
{{- end }}
spec:
serviceAccountName: {{ include "iot-operator-v0.serviceAccountName" . }}
{{- with .Values.imagePullSecrets }}
imagePullSecrets:
{{- toYaml . | nindent 8 }}
{{- end }}
securityContext:
{{- toYaml .Values.podSecurityContext | nindent 8 }}
containers:
- name: operator
image: {{ include "iot-operator-v0.image" . | quote }}
imagePullPolicy: {{ .Values.image.pullPolicy }}
args: ["run"]
env:
- name: NATS_URL
value: {{ .Values.nats.url | quote }}
{{- if .Values.kvBucket }}
- name: KV_BUCKET
value: {{ .Values.kvBucket | quote }}
{{- end }}
- name: RUST_LOG
value: {{ .Values.logLevel | quote }}
securityContext:
{{- toYaml .Values.containerSecurityContext | nindent 12 }}
resources:
{{- toYaml .Values.resources | nindent 12 }}
{{- with .Values.nodeSelector }}
nodeSelector:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.tolerations }}
tolerations:
{{- toYaml . | nindent 8 }}
{{- end }}
{{- with .Values.affinity }}
affinity:
{{- toYaml . | nindent 8 }}
{{- end }}

View File

@@ -0,0 +1,13 @@
{{- if .Values.serviceAccount.create -}}
apiVersion: v1
kind: ServiceAccount
metadata:
name: {{ include "iot-operator-v0.serviceAccountName" . }}
namespace: {{ .Release.Namespace }}
labels:
{{- include "iot-operator-v0.labels" . | nindent 4 }}
{{- with .Values.serviceAccount.annotations }}
annotations:
{{- toYaml . | nindent 4 }}
{{- end }}
{{- end -}}

View File

@@ -0,0 +1,58 @@
{
Review

No values, we use full hydration. This will be handled by the Rust binary generating a fully hydrated template, not typo magnets.
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"image": {
"type": "object",
"required": ["repository", "pullPolicy"],
"properties": {
"repository": { "type": "string", "minLength": 1 },
"tag": { "type": "string" },
"pullPolicy": { "enum": ["Always", "IfNotPresent", "Never"] }
}
},
"imagePullSecrets": {
"type": "array",
"items": {
"type": "object",
"required": ["name"],
"properties": { "name": { "type": "string" } }
}
},
"replicaCount": { "type": "integer", "minimum": 1 },
"nats": {
"type": "object",
"required": ["url"],
"properties": { "url": { "type": "string", "minLength": 1 } }
},
"kvBucket": { "type": "string" },
"logLevel": { "type": "string" },
"crds": {
"type": "object",
"required": ["create"],
"properties": { "create": { "type": "boolean" } }
},
"serviceAccount": {
"type": "object",
"required": ["create"],
"properties": {
"create": { "type": "boolean" },
"name": { "type": "string" },
"annotations": { "type": "object" }
}
},
"rbac": {
"type": "object",
"required": ["create"],
"properties": { "create": { "type": "boolean" } }
},
"resources": { "type": "object" },
"podSecurityContext": { "type": "object" },
"containerSecurityContext": { "type": "object" },
"nodeSelector": { "type": "object" },
"tolerations": { "type": "array" },
"affinity": { "type": "object" },
"podAnnotations": { "type": "object" },
"podLabels": { "type": "object" }
}
}
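
The review above rejects values.yaml plus a JSON schema in favour of full hydration from the Rust binary. A minimal sketch of the typed release config that could replace them; the struct name and fields are assumptions drawn from the values file that follows, not part of this PR.

use serde::Serialize;

// Hypothetical fully-typed release config: the packaging binary owns this
// struct and renders final, fully hydrated manifests from it, so there is no
// values.yaml surface for typos to hide in.
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct OperatorReleaseConfig {
    /// Full image ref, e.g. "hub.nationtech.io/iot-operator-v0:0.1.0".
    image: String,
    /// Singleton today; more than one replica needs leader election first.
    replica_count: u32,
    /// Cluster-internal NATS endpoint the operator connects to on startup.
    nats_url: String,
    /// Override for the desired-state KV bucket; None keeps the contract default.
    kv_bucket: Option<String>,
    /// Passed through as RUST_LOG on the operator pod.
    log_level: String,
}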

View File

@@ -0,0 +1,79 @@
## iot-operator-v0 helm chart values.
## Cnpg-aligned conventions: image.* shape, crds.create toggle,
## non-root podSecurityContext, resources{} stub.
image:
repository: hub.nationtech.io/iot-operator-v0
# Defaults to .Chart.AppVersion when unset.
tag: ""
pullPolicy: IfNotPresent
imagePullSecrets: []
## Controller is singleton today — kube-rs Controller handles
## reconcile-level concurrency within the process. Multi-replica
## HA would need leader election wired up first (see ROADMAP
## for future work).
replicaCount: 1
## NATS connection. The operator connects to this URL on startup
## and stays connected; a flaky NATS link surfaces as pod restart.
nats:
url: nats://nats.iot-system.svc.cluster.local:4222
## Override the KV bucket name the operator writes desired-state
## into. Defaults to harmony-reconciler-contracts' BUCKET_DESIRED_STATE
## constant, so this is only useful for multi-tenant testbeds.
kvBucket: ""
## RUST_LOG directive passed through to the operator pod.
logLevel: info
## CRDs. Cnpg pattern: install by default, gated by this toggle so
## GitOps/cluster-admin teams managing CRDs out-of-band can flip
## it off. Resource-policy: keep annotation on the CRDs themselves
## means `helm uninstall` never deletes them (and therefore never
## deletes customer CRs).
crds:
create: true
serviceAccount:
create: true
# Leave empty to default to the release name.
name: ""
annotations: {}
rbac:
create: true
## Standard k8s resource requests/limits. Left unset by default;
## uncomment and tune before running in production.
resources: {}
# limits:
# cpu: 200m
# memory: 256Mi
# requests:
# cpu: 50m
# memory: 128Mi
podSecurityContext:
runAsNonRoot: true
runAsUser: 10001
runAsGroup: 10001
fsGroup: 10001
seccompProfile:
type: RuntimeDefault
containerSecurityContext:
allowPrivilegeEscalation: false
readOnlyRootFilesystem: true
capabilities:
drop:
- ALL
nodeSelector: {}
tolerations: []
affinity: {}
podAnnotations: {}
podLabels: {}

View File

@@ -2,28 +2,23 @@
//!
//! Watches the `agent-status` NATS KV bucket, keeps a per-device
//! snapshot in memory, and periodically recomputes each Deployment
//! CR's `.status.aggregate` subtree from the intersection of its
//! `spec.targetDevices` list and the known device statuses.
//! CR's `.status.aggregate` subtree from the devices whose labels
//! match the CR's `spec.targetSelector`.
//!
//! Runs as a background task alongside the controller. Keeping the
//! controller free of NATS-KV subscription state lets its reconcile
//! loop stay reactive and cheap (just publishing desired state +
//! managing finalizers), while this task handles the slower
//! loop stay reactive and cheap (publishing desired state, managing
//! finalizers), while this task handles the slower
//! many-devices-to-one-CR fan-in.
//!
//! Design choices:
//! - **In-memory snapshot map** (device_id → AgentStatus). Rebuilt
//! from JetStream on startup via the watch's initial replay; kept
//! current by watching thereafter. No persistence — the bucket is
//! the source of truth.
//! - **Periodic aggregation tick** (5 s). Cheap (a few BTreeMap
//! lookups + one `patch_status` per CR) and gives predictable
//! operator behaviour for the smoke harness. A push-based
//! "recompute on every Put" would be tighter but adds complexity
//! this v0.1 doesn't need.
//! - **JSON-Merge Patch.** Writes only the `aggregate` subtree, so
//! it composes cleanly with the controller's
//! `observedScoreString` patch.
//! the source of truth. The same map is shared with the
//! controller so it can resolve selectors for KV writes.
//! - **Periodic aggregation tick** (5 s).
//! - **JSON-Merge Patch.** Writes only the `aggregate` subtree.
use std::collections::BTreeMap;
use std::sync::Arc;
@@ -37,24 +32,35 @@ use kube::{Client, ResourceExt};
use serde_json::json;
use tokio::sync::Mutex;
use crate::crd::{AggregateEvent, AggregateLastError, Deployment, DeploymentAggregate};
use crate::crd::{
AggregateEvent, AggregateLastError, Deployment, DeploymentAggregate, TargetSelector,
};
/// Cap on how many events we surface in `DeploymentAggregate.recent_events`.
/// Small enough to keep the CR status compact.
const AGGREGATE_EVENT_CAP: usize = 10;
/// How often the aggregator recomputes + patches.
const AGGREGATE_TICK: Duration = Duration::from_secs(5);
/// Per-device status snapshot keyed by device id string.
/// Per-device status snapshot keyed by device id string. Shared
/// between the aggregator (writer) and the controller (reader) so
/// selector resolution sees the same data both sides.
pub type StatusSnapshots = Arc<Mutex<BTreeMap<String, AgentStatus>>>;
/// Spawn the aggregator: watch the agent-status bucket into an
/// in-memory map, and periodically fold that map into every
/// Deployment CR's `.status.aggregate`.
pub async fn run(client: Client, status_bucket: Store) -> anyhow::Result<()> {
let snapshots: StatusSnapshots = Arc::new(Mutex::new(BTreeMap::new()));
/// Build a fresh empty snapshot map. Call once from `main`, hand
/// clones to both controller and aggregator.
pub fn new_snapshots() -> StatusSnapshots {
Arc::new(Mutex::new(BTreeMap::new()))
}
/// Spawn the aggregator: watch the agent-status bucket into the
/// shared snapshot map, and periodically fold it into every
/// Deployment CR's `.status.aggregate`.
pub async fn run(
client: Client,
status_bucket: Store,
snapshots: StatusSnapshots,
) -> anyhow::Result<()> {
let watcher = tokio::spawn(watch_status_bucket(status_bucket, snapshots.clone()));
let aggregator = tokio::spawn(aggregate_loop(client, snapshots));
@@ -130,7 +136,7 @@ async fn tick_once(
None => continue,
};
let name = cr.name_any();
let aggregate = compute_aggregate(&cr.spec.target_devices, &name, &snapshot);
let aggregate = compute_aggregate(&cr.spec.target_selector, &name, &snapshot);
let status = json!({ "status": { "aggregate": aggregate } });
let api: Api<Deployment> = Api::namespaced(deployments.clone().into_client(), &ns);
if let Err(e) = api
@@ -143,10 +149,25 @@ async fn tick_once(
Ok(())
}
/// Resolve the selector against the snapshot → sorted list of
/// matching device ids. Stable ordering gives deterministic KV
/// write ordering + readable logs.
pub fn resolve_targets(
selector: &TargetSelector,
snapshots: &BTreeMap<String, AgentStatus>,
) -> Vec<String> {
let mut hits: Vec<String> = snapshots
.iter()
.filter(|(_, status)| selector.matches(&status.labels))
.map(|(id, _)| id.clone())
.collect();
hits.sort();
hits
}
/// Compute the aggregate for one CR from the current snapshot map.
/// Exposed (crate-visible) for unit testing.
pub(crate) fn compute_aggregate(
target_devices: &[String],
selector: &TargetSelector,
deployment_name: &str,
snapshots: &BTreeMap<String, AgentStatus>,
) -> DeploymentAggregate {
@@ -155,13 +176,10 @@ pub(crate) fn compute_aggregate(
let mut last_heartbeat: Option<chrono::DateTime<chrono::Utc>> = None;
let mut events: Vec<AggregateEvent> = Vec::new();
for device in target_devices {
let status = match snapshots.get(device) {
for device in resolve_targets(selector, snapshots) {
let status = match snapshots.get(&device) {
Some(s) => s,
None => {
agg.unreported += 1;
continue;
}
None => continue,
};
if last_heartbeat.is_none_or(|t| status.timestamp > t) {
last_heartbeat = Some(status.timestamp);
@@ -190,8 +208,8 @@ pub(crate) fn compute_aggregate(
Phase::Pending => agg.pending += 1,
},
None => {
// Device reported but hasn't acknowledged this
// deployment yet.
// Device matches the selector but hasn't acknowledged
// this deployment yet — likely mid-reconcile.
agg.pending += 1;
}
}
@@ -214,7 +232,6 @@ pub(crate) fn compute_aggregate(
}
}
// Most recent first; cap.
events.sort_by(|a, b| b.at.cmp(&a.at));
events.truncate(AGGREGATE_EVENT_CAP);
@@ -241,6 +258,7 @@ mod tests {
fn snapshot_with(
device: &str,
labels: &[(&str, &str)],
deployment: &str,
phase: Phase,
err: Option<&str>,
@@ -261,59 +279,150 @@ mod tests {
deployments,
recent_events: vec![],
inventory: None,
labels: labels
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
}
}
fn selector_for(pairs: &[(&str, &str)]) -> TargetSelector {
TargetSelector {
match_labels: pairs
.iter()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect(),
}
}
#[test]
fn aggregate_counts_and_unreported() {
fn selector_matches_only_superset_label_maps() {
let sel = selector_for(&[("group", "site-a")]);
let mut labels = BTreeMap::new();
labels.insert("group".to_string(), "site-a".to_string());
labels.insert("arch".to_string(), "aarch64".to_string());
assert!(sel.matches(&labels));
let mut other = BTreeMap::new();
other.insert("group".to_string(), "site-b".to_string());
assert!(!sel.matches(&other));
let empty: BTreeMap<String, String> = BTreeMap::new();
assert!(!sel.matches(&empty));
}
#[test]
fn empty_selector_matches_nothing() {
let sel = TargetSelector::default();
let mut labels = BTreeMap::new();
labels.insert("group".to_string(), "site-a".to_string());
assert!(!sel.matches(&labels));
}
#[test]
fn resolve_targets_is_sorted_and_filtered() {
let mut map = BTreeMap::new();
map.insert(
"pi-03".to_string(),
snapshot_with(
"pi-03",
&[("group", "site-a")],
"hello",
Phase::Running,
None,
),
);
map.insert(
"pi-01".to_string(),
snapshot_with(
"pi-01",
&[("group", "site-a")],
"hello",
Phase::Running,
None,
),
);
map.insert(
"pi-02".to_string(),
snapshot_with(
"pi-02",
&[("group", "site-b")],
"hello",
Phase::Running,
None,
),
);
let sel = selector_for(&[("group", "site-a")]);
let hits = resolve_targets(&sel, &map);
assert_eq!(hits, vec!["pi-01".to_string(), "pi-03".to_string()]);
}
#[test]
fn aggregate_counts_across_matching_devices() {
let mut map = BTreeMap::new();
map.insert(
"pi-01".to_string(),
snapshot_with("pi-01", "hello", Phase::Running, None),
snapshot_with(
"pi-01",
&[("group", "site-a")],
"hello",
Phase::Running,
None,
),
);
map.insert(
"pi-02".to_string(),
snapshot_with("pi-02", "hello", Phase::Failed, Some("pull err")),
snapshot_with(
"pi-02",
&[("group", "site-a")],
"hello",
Phase::Failed,
Some("pull err"),
),
);
// pi-03 is a target but never reported.
let targets = vec![
"pi-01".to_string(),
"pi-02".to_string(),
"pi-03".to_string(),
];
let agg = compute_aggregate(&targets, "hello", &map);
// pi-03 matches too but hasn't acknowledged this deployment.
let mut pi03 = AgentStatus {
device_id: Id::from("pi-03".to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-22T01:00:00Z"),
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
labels: BTreeMap::new(),
};
pi03.labels
.insert("group".to_string(), "site-a".to_string());
map.insert("pi-03".to_string(), pi03);
// pi-04 is in a different group — should be ignored entirely.
map.insert(
"pi-04".to_string(),
snapshot_with(
"pi-04",
&[("group", "site-b")],
"hello",
Phase::Running,
None,
),
);
let sel = selector_for(&[("group", "site-a")]);
let agg = compute_aggregate(&sel, "hello", &map);
assert_eq!(agg.succeeded, 1);
assert_eq!(agg.failed, 1);
assert_eq!(agg.pending, 0);
assert_eq!(agg.unreported, 1);
assert_eq!(agg.pending, 1);
assert_eq!(agg.last_error.as_ref().unwrap().device_id, "pi-02");
assert_eq!(agg.last_error.as_ref().unwrap().message, "pull err");
}
#[test]
fn device_reported_but_no_deployment_entry_is_pending() {
// Agent heartbeated (device known to operator) but hasn't
// acknowledged this specific deployment yet.
let mut map = BTreeMap::new();
map.insert(
"pi-01".to_string(),
AgentStatus {
device_id: Id::from("pi-01".to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-22T01:00:00Z"),
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
},
);
let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map);
assert_eq!(agg.pending, 1);
assert_eq!(agg.unreported, 0);
}
#[test]
fn events_filtered_to_matching_deployment_only() {
let mut status = snapshot_with("pi-01", "hello", Phase::Running, None);
let mut status = snapshot_with(
"pi-01",
&[("group", "site-a")],
"hello",
Phase::Running,
None,
);
status.recent_events = vec![
EventEntry {
at: ts("2026-04-22T01:00:05Z"),
@@ -336,7 +445,8 @@ mod tests {
];
let mut map = BTreeMap::new();
map.insert("pi-01".to_string(), status);
let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map);
let sel = selector_for(&[("group", "site-a")]);
let agg = compute_aggregate(&sel, "hello", &map);
assert_eq!(agg.recent_events.len(), 1);
assert_eq!(agg.recent_events[0].message, "hello reconciled");
}

View File

@@ -12,6 +12,7 @@ use kube::runtime::watcher::Config as WatcherConfig;
use kube::{Api, Client, ResourceExt};
use serde_json::json;
use crate::aggregate::{StatusSnapshots, resolve_targets};
use crate::crd::{Deployment, ScorePayload};
const FINALIZER: &str = "iot.nationtech.io/finalizer";
@@ -26,18 +27,21 @@ pub enum Error {
Serde(#[from] serde_json::Error),
#[error("missing namespace on resource")]
MissingNamespace,
#[error("missing target devices")]
MissingTargets,
}
pub struct Context {
pub client: Client,
pub kv: Store,
pub snapshots: StatusSnapshots,
}
pub async fn run(client: Client, kv: Store) -> anyhow::Result<()> {
pub async fn run(client: Client, kv: Store, snapshots: StatusSnapshots) -> anyhow::Result<()> {
let api: Api<Deployment> = Api::all(client.clone());
let ctx = Arc::new(Context { client, kv });
let ctx = Arc::new(Context {
client,
kv,
snapshots,
});
tracing::info!("starting Deployment controller");
Controller::new(api, WatcherConfig::default())
@@ -58,10 +62,11 @@ async fn reconcile(obj: Arc<Deployment>, ctx: Arc<Context>) -> Result<Action, Er
tracing::info!(%ns, %name, "reconcile");
let api: Api<Deployment> = Api::namespaced(ctx.client.clone(), &ns);
let ctx = ctx.clone();
finalizer(&api, FINALIZER, obj, |event| async {
match event {
FinalizerEvent::Apply(d) => apply(d, &api, &ctx.kv).await,
FinalizerEvent::Cleanup(d) => cleanup(d, &ctx.kv).await,
FinalizerEvent::Apply(d) => apply(d, &api, &ctx).await,
FinalizerEvent::Cleanup(d) => cleanup(d, &ctx).await,
}
})
.await
@@ -75,47 +80,81 @@ async fn reconcile(obj: Arc<Deployment>, ctx: Arc<Context>) -> Result<Action, Er
})
}
async fn apply(obj: Arc<Deployment>, api: &Api<Deployment>, kv: &Store) -> Result<Action, Error> {
async fn apply(
obj: Arc<Deployment>,
api: &Api<Deployment>,
ctx: &Context,
) -> Result<Action, Error> {
let name = obj.name_any();
if obj.spec.target_devices.is_empty() {
return Err(Error::MissingTargets);
}
let score_json = serialize_score(&obj.spec.score)?;
// Resolve the selector against the current fleet snapshot.
// Zero matches is a legitimate state (devices may not have
// heartbeated yet) — we log and requeue rather than erroring.
// Drift (new devices showing up, old devices relabeled away) is
// handled by the fast requeue cadence below; no need for a
// cross-task subscription.
let snapshot = { ctx.snapshots.lock().await.clone() };
let targets = resolve_targets(&obj.spec.target_selector, &snapshot);
if targets.is_empty() {
tracing::info!(
%name,
selector = ?obj.spec.target_selector,
"selector matches no devices yet; will retry"
);
// Requeue fast so a just-joined device picks the deployment
// up within seconds, not minutes.
return Ok(Action::requeue(Duration::from_secs(15)));
}
let already_observed = obj
.status
.as_ref()
.and_then(|s| s.observed_score_string.as_deref())
== Some(score_json.as_str());
if already_observed {
tracing::debug!(%name, "score unchanged; skipping KV write and status patch");
return Ok(Action::requeue(Duration::from_secs(300)));
if !already_observed {
// JSON-Merge Patch: leaves `aggregate` (populated by the
// aggregator task) intact.
let status = json!({ "status": { "observedScoreString": score_json } });
api.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status))
.await?;
}
for device_id in &obj.spec.target_devices {
for device_id in &targets {
let key = kv_key(device_id, &name);
kv.put(key.clone(), score_json.clone().into_bytes().into())
// `put` is idempotent by byte-equality on NATS JetStream
// KV — re-writing the same payload costs one store op but
// no listener wakeup. Safe to run every reconcile.
ctx.kv
.put(key.clone(), score_json.clone().into_bytes().into())
.await
.map_err(|e| Error::Kv(e.to_string()))?;
tracing::info!(%key, "wrote desired state");
}
// JSON-Merge Patch: this leaves other status fields
// (notably `aggregate`, populated by the aggregator task) intact.
let status = json!({
"status": { "observedScoreString": score_json }
});
api.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status))
.await?;
Ok(Action::requeue(Duration::from_secs(300)))
// Keep the cadence fast enough that selector drift (a device
// relabeled mid-flight, or one joining the fleet) converges
// within 30 s without needing a snapshots-change subscription.
Ok(Action::requeue(Duration::from_secs(30)))
}
async fn cleanup(obj: Arc<Deployment>, kv: &Store) -> Result<Action, Error> {
async fn cleanup(obj: Arc<Deployment>, ctx: &Context) -> Result<Action, Error> {
let name = obj.name_any();
for device_id in &obj.spec.target_devices {
// Resolve against current snapshot to find KV entries to clean.
// A device that left the fleet between apply and delete may
// have a stale KV entry we never clean here — accepted: the
// NATS bucket eventually hits its per-key history limit and the
// orphan ages out. If that becomes a real problem, record the
// resolved list in CR status and use it here instead.
let snapshot = { ctx.snapshots.lock().await.clone() };
let targets = resolve_targets(&obj.spec.target_selector, &snapshot);
for device_id in &targets {
let key = kv_key(device_id, &name);
kv.delete(&key)
ctx.kv
.delete(&key)
.await
.map_err(|e| Error::Kv(e.to_string()))?;
tracing::info!(%key, "deleted desired state");

View File

@@ -1,3 +1,5 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use schemars::schema::{
@@ -17,12 +19,47 @@ use serde::{Deserialize, Serialize};
)]
#[serde(rename_all = "camelCase")]
pub struct DeploymentSpec {
pub target_devices: Vec<String>,
/// Label selector that picks which devices in the fleet run this
/// deployment. Devices publish their labels via `AgentStatus`;
/// the operator resolves the selector against the current fleet
/// snapshot on every reconcile.
///
/// Matches the Kubernetes `LabelSelector.matchLabels` wire format
/// so the CLI, dashboards, and kubectl tooling all speak the
/// same selector grammar. Expressions (`In`, `NotIn`, etc.) are
/// deferred until there's a concrete need.
pub target_selector: TargetSelector,
#[schemars(schema_with = "score_payload_schema")]
pub score: ScorePayload,
pub rollout: Rollout,
}
/// Subset of Kubernetes `LabelSelector` — only `matchLabels` for
/// now. A device matches iff its label map is a superset of
/// `matchLabels`. An empty `matchLabels` matches nothing on
/// purpose (a deployment targeting "all devices" is almost always a
/// user error; if it's genuinely intended, pick a label every
/// device has, e.g. `managed-by=iot-agent`).
#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema, PartialEq, Eq)]
#[serde(rename_all = "camelCase")]
pub struct TargetSelector {
#[serde(default)]
pub match_labels: BTreeMap<String, String>,
}
impl TargetSelector {
/// True iff every (k, v) in `match_labels` is present in `labels`.
/// Empty selector matches nothing (see struct docs).
pub fn matches(&self, labels: &BTreeMap<String, String>) -> bool {
if self.match_labels.is_empty() {
return false;
}
self.match_labels
.iter()
.all(|(k, v)| labels.get(k).is_some_and(|got| got == v))
}
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
pub struct ScorePayload {
#[serde(rename = "type")]
@@ -119,14 +156,12 @@ pub struct DeploymentStatus {
pub struct DeploymentAggregate {
/// Count of devices where the deployment is in each phase.
/// Always populated (zeros are valid) so the operator can patch
/// the whole subtree atomically.
/// the whole subtree atomically. With selector-based targeting
/// there is no "unreported" counterpart — a device that has
/// never heartbeated is invisible to the selector machinery.
pub succeeded: u32,
pub failed: u32,
pub pending: u32,
/// Count of target devices that haven't yet heartbeated at all.
/// "failed to join fleet" vs. "failed to reconcile" — different
/// signals, different remedies.
pub unreported: u32,
/// Device id of the most recent device reporting a failure,
/// with its short error message. Surfaces the top failure to
/// the CR's status without needing per-device subresource

View File

@@ -45,6 +45,11 @@ enum Command {
/// Apply the operator's CRD to the cluster `KUBECONFIG` points
/// at. Uses harmony's typed k8s client — no yaml, no kubectl.
Install,
/// Print the Deployment CRD as YAML on stdout. Used by the
/// helm chart's release-time regeneration step — the runtime
/// `install` path is still the typed-Rust Score. Never invoked
/// in framework code paths, only by release tooling.
GenChartCrd,
}
#[tokio::main]
@@ -57,9 +62,18 @@ async fn main() -> Result<()> {
match cli.command.unwrap_or(Command::Run) {
Command::Install => install::install_crds().await,
Command::Run => run(&cli.nats_url, &cli.kv_bucket).await,
Command::GenChartCrd => gen_chart_crd(),
}
}
fn gen_chart_crd() -> Result<()> {
use kube::CustomResourceExt;
let crd = crd::Deployment::crd();
let yaml = serde_yaml::to_string(&crd)?;
print!("{yaml}");
Ok(())
}
async fn run(nats_url: &str, bucket: &str) -> Result<()> {
let nats = async_nats::connect(nats_url).await?;
tracing::info!(url = %nats_url, "connected to NATS");
@@ -81,12 +95,16 @@ async fn run(nats_url: &str, bucket: &str) -> Result<()> {
let client = Client::try_default().await?;
// Shared fleet snapshot: aggregator writes, controller reads.
// One Arc<Mutex<...>> lives through both tasks.
let snapshots = aggregate::new_snapshots();
// Controller + aggregator run concurrently. If either returns
// an error, tear down the whole process — kube-rs's Controller
// already handles transient reconcile failures internally.
let ctl_client = client.clone();
tokio::select! {
r = controller::run(ctl_client, desired_state_kv) => r,
r = aggregate::run(client, status_kv) => r,
r = controller::run(ctl_client, desired_state_kv, snapshots.clone()) => r,
r = aggregate::run(client, status_kv, snapshots) => r,
}
}

View File

@@ -42,6 +42,11 @@ ARCH="${ARCH:-x86-64}"
VM_NAME="${VM_NAME:-iot-demo-vm}"
DEVICE_ID="${DEVICE_ID:-$VM_NAME}"
GROUP="${GROUP:-group-a}"
# The VM publishes these labels; the deployment selector below
# targets `device=$DEVICE_ID` so a single-device smoke run picks
# exactly this agent.
VM_LABELS="${VM_LABELS:-group=$GROUP,device=$DEVICE_ID}"
DEPLOY_SELECTOR="${DEPLOY_SELECTOR:-device=$DEVICE_ID}"
LIBVIRT_URI="${LIBVIRT_URI:-qemu:///system}"
NATS_NAMESPACE="${NATS_NAMESPACE:-iot-system}"
@@ -299,7 +304,7 @@ fi
--arch "$EXAMPLE_ARCH" \
--vm-name "$VM_NAME" \
--device-id "$DEVICE_ID" \
--group "$GROUP" \
--labels "$VM_LABELS" \
--agent-binary "$AGENT_BINARY" \
--nats-url "nats://$NAT_GW:$NATS_NODE_PORT"
)
@@ -372,7 +377,7 @@ if [[ "$AUTO" == "1" ]]; then
cargo run -q -p example_iot_apply_deployment -- \
--namespace "$DEPLOY_NS" \
--name "$DEPLOY_NAME" \
--target-device "$DEVICE_ID" \
--to "$DEPLOY_SELECTOR" \
--image "$V1_IMAGE" \
--port "$DEPLOY_PORT"
)
@@ -419,7 +424,7 @@ if [[ "$AUTO" == "1" ]]; then
cargo run -q -p example_iot_apply_deployment -- \
--namespace "$DEPLOY_NS" \
--name "$DEPLOY_NAME" \
--target-device "$DEVICE_ID" \
--to "$DEPLOY_SELECTOR" \
--image "$V2_IMAGE" \
--port "$DEPLOY_PORT"
)
@@ -445,7 +450,7 @@ if [[ "$AUTO" == "1" ]]; then
cargo run -q -p example_iot_apply_deployment -- \
--namespace "$DEPLOY_NS" \
--name "$DEPLOY_NAME" \
--target-device "$DEVICE_ID" \
--to "$DEPLOY_SELECTOR" \
--delete
)
for _ in $(seq 1 60); do
@@ -485,17 +490,17 @@ $(printf '\033[1mApply an nginx deployment (typed Rust):\033[0m\n')
cargo run -q -p example_iot_apply_deployment -- \\
--namespace $DEPLOY_NS \\
--name $DEPLOY_NAME \\
--target-device $DEVICE_ID \\
--to $DEPLOY_SELECTOR \\
--image docker.io/library/nginx:latest
$(printf '\033[1mUpgrade it:\033[0m\n')
cargo run -q -p example_iot_apply_deployment -- \\
--namespace $DEPLOY_NS --name $DEPLOY_NAME --target-device $DEVICE_ID \\
--namespace $DEPLOY_NS --name $DEPLOY_NAME --to $DEPLOY_SELECTOR \\
--image docker.io/library/nginx:1.26
$(printf '\033[1mPreview the CR as JSON (and apply via kubectl):\033[0m\n')
cargo run -q -p example_iot_apply_deployment -- \\
--name $DEPLOY_NAME --target-device $DEVICE_ID \\
--name $DEPLOY_NAME --to $DEPLOY_SELECTOR \\
--image docker.io/library/nginx:latest --print | kubectl apply -f -
$(printf '\033[1mConnect to the device:\033[0m\n')