From 99e661ce4ddc5f7b046a3f45100ab740edd974a3 Mon Sep 17 00:00:00 2001
From: Jean-Gabriel Gill-Couture
Date: Tue, 21 Apr 2026 23:33:06 -0400
Subject: [PATCH 1/2] feat(iot-operator): helm chart + gen-chart-crd subcommand
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Chapter 3 scaffolding. Chart layout mirrors the CloudNativePG
convention after reviewing the CRD-in-chart vs CRD-as-hook tradeoff:
CRDs live inside templates/ (so helm upgrade re-applies schema
changes) with helm.sh/resource-policy: keep so helm uninstall never
deletes them. Chart publication target is hub.nationtech.io.

CRD yaml is generated at chart-release time by a new `iot-operator-v0
gen-chart-crd` subcommand reading Deployment::crd() — the runtime
install path remains the typed Score; only the chart deliverable uses
generated yaml. templates/crds.yaml wraps the generated yaml with the
helm conditional + annotations via .Files.Get, so the generated file
itself stays pure.

Install / upgrade / uninstall-preserves-CRD validated against a
scratch k3d cluster; the operator pod naturally stays pending because
the hub.nationtech.io image hasn't been published yet.
---
 Cargo.lock                                    |   1 +
 iot/iot-operator-v0/Cargo.toml                |   1 +
 iot/iot-operator-v0/chart/.helmignore         |   7 +
 iot/iot-operator-v0/chart/Chart.yaml          |  17 +++
 .../deployments.iot.nationtech.io.yaml        | 141 ++++++++++++++++++
 iot/iot-operator-v0/chart/regen-crd.sh        |  17 +++
 .../chart/templates/_helpers.tpl              |  58 +++++++
 .../chart/templates/clusterrole.yaml          |  21 +++
 .../chart/templates/clusterrolebinding.yaml   |  16 ++
 iot/iot-operator-v0/chart/templates/crds.yaml |  21 +++
 .../chart/templates/deployment.yaml           |  63 ++++++++
 .../chart/templates/serviceaccount.yaml       |  13 ++
 iot/iot-operator-v0/chart/values.schema.json  |  58 +++++++
 iot/iot-operator-v0/chart/values.yaml         |  79 ++++++++++
 iot/iot-operator-v0/src/main.rs               |  14 ++
 15 files changed, 527 insertions(+)
 create mode 100644 iot/iot-operator-v0/chart/.helmignore
 create mode 100644 iot/iot-operator-v0/chart/Chart.yaml
 create mode 100644 iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml
 create mode 100755 iot/iot-operator-v0/chart/regen-crd.sh
 create mode 100644 iot/iot-operator-v0/chart/templates/_helpers.tpl
 create mode 100644 iot/iot-operator-v0/chart/templates/clusterrole.yaml
 create mode 100644 iot/iot-operator-v0/chart/templates/clusterrolebinding.yaml
 create mode 100644 iot/iot-operator-v0/chart/templates/crds.yaml
 create mode 100644 iot/iot-operator-v0/chart/templates/deployment.yaml
 create mode 100644 iot/iot-operator-v0/chart/templates/serviceaccount.yaml
 create mode 100644 iot/iot-operator-v0/chart/values.schema.json
 create mode 100644 iot/iot-operator-v0/chart/values.yaml

diff --git a/Cargo.lock b/Cargo.lock
index e2154e7a..0ec7695a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4769,6 +4769,7 @@ dependencies = [
 "schemars 0.8.22",
 "serde",
 "serde_json",
+ "serde_yaml",
 "thiserror 2.0.18",
 "tokio",
 "tracing",
diff --git a/iot/iot-operator-v0/Cargo.toml b/iot/iot-operator-v0/Cargo.toml
index dafc5fbe..9bc6e908 100644
--- a/iot/iot-operator-v0/Cargo.toml
+++ b/iot/iot-operator-v0/Cargo.toml
@@ -13,6 +13,7 @@ k8s-openapi.workspace = true
 async-nats = { workspace = true }
 serde.workspace = true
 serde_json.workspace = true
+serde_yaml.workspace = true
 schemars = "0.8.22"
 tokio.workspace = true
 tracing = { workspace = true }
diff --git a/iot/iot-operator-v0/chart/.helmignore b/iot/iot-operator-v0/chart/.helmignore
new file mode 100644
index 00000000..7a0167cb
--- /dev/null
+++
b/iot/iot-operator-v0/chart/.helmignore @@ -0,0 +1,7 @@ +.DS_Store +.git/ +.gitignore +*.swp +*.bak +*.tmp +*.orig diff --git a/iot/iot-operator-v0/chart/Chart.yaml b/iot/iot-operator-v0/chart/Chart.yaml new file mode 100644 index 00000000..dff47b01 --- /dev/null +++ b/iot/iot-operator-v0/chart/Chart.yaml @@ -0,0 +1,17 @@ +apiVersion: v2 +name: iot-operator-v0 +description: NationTech IoT operator — Deployment CRD → NATS JetStream KV fleet reconciler. +type: application +version: 0.1.0 +appVersion: "0.1.0" +home: https://nationtech.io +sources: + - https://git.nationtech.io/Nationtech/harmony +keywords: + - iot + - nats + - operator + - fleet +maintainers: + - name: NationTech + email: dev@nationtech.io diff --git a/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml b/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml new file mode 100644 index 00000000..e8431e39 --- /dev/null +++ b/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml @@ -0,0 +1,141 @@ +apiVersion: apiextensions.k8s.io/v1 +kind: CustomResourceDefinition +metadata: + name: deployments.iot.nationtech.io +spec: + group: iot.nationtech.io + names: + categories: [] + kind: Deployment + plural: deployments + shortNames: + - iotdep + singular: deployment + scope: Namespaced + versions: + - additionalPrinterColumns: [] + name: v1alpha1 + schema: + openAPIV3Schema: + description: Auto-generated derived type for DeploymentSpec via `CustomResource` + properties: + spec: + properties: + rollout: + properties: + strategy: + enum: + - Immediate + type: string + required: + - strategy + type: object + score: + properties: + data: + x-kubernetes-preserve-unknown-fields: true + type: + minLength: 1 + type: string + required: + - data + - type + type: object + x-kubernetes-validations: + - message: score.type must be a valid Rust identifier matching the struct name of the score variant (e.g. PodmanV0) + rule: self.type.matches('^[A-Za-z_][A-Za-z0-9_]*$') + targetDevices: + items: + type: string + type: array + required: + - rollout + - score + - targetDevices + type: object + status: + nullable: true + properties: + aggregate: + description: Per-deployment rollup aggregated from the `agent-status` bucket. Present once at least one targeted agent has heartbeated; absent on a freshly-created CR. + nullable: true + properties: + failed: + format: uint32 + minimum: 0.0 + type: integer + lastError: + description: Device id of the most recent device reporting a failure, with its short error message. Surfaces the top failure to the CR's status without needing per-device subresource lookups. + nullable: true + properties: + at: + type: string + deviceId: + type: string + message: + type: string + required: + - at + - deviceId + - message + type: object + lastHeartbeatAt: + description: Timestamp of the most recent agent heartbeat counted into this aggregate. "Freshness" signal — a CR whose aggregate hasn't advanced in minutes is evidence the whole fleet has gone dark. + nullable: true + type: string + pending: + format: uint32 + minimum: 0.0 + type: integer + recentEvents: + default: [] + description: Last-N events aggregated across all target devices, most recent first. Operator caps at a handful (see operator controller). 
+                      items:
+                        properties:
+                          at:
+                            type: string
+                          deployment:
+                            nullable: true
+                            type: string
+                          deviceId:
+                            type: string
+                          message:
+                            type: string
+                          severity:
+                            type: string
+                        required:
+                          - at
+                          - deviceId
+                          - message
+                          - severity
+                        type: object
+                      type: array
+                    succeeded:
+                      description: Count of devices where the deployment is in each phase. Always populated (zeros are valid) so the operator can patch the whole subtree atomically.
+                      format: uint32
+                      minimum: 0.0
+                      type: integer
+                    unreported:
+                      description: Count of target devices that haven't yet heartbeated at all. "failed to join fleet" vs. "failed to reconcile" — different signals, different remedies.
+                      format: uint32
+                      minimum: 0.0
+                      type: integer
+                  required:
+                    - failed
+                    - pending
+                    - succeeded
+                    - unreported
+                  type: object
+                observedScoreString:
+                  description: Last serialized score the operator pushed to NATS. Used by the operator itself for change-detection on the hot path (skip KV write + status patch when the CR is unchanged).
+                  nullable: true
+                  type: string
+              type: object
+          required:
+            - spec
+          title: Deployment
+          type: object
+      served: true
+      storage: true
+      subresources:
+        status: {}
diff --git a/iot/iot-operator-v0/chart/regen-crd.sh b/iot/iot-operator-v0/chart/regen-crd.sh
new file mode 100755
index 00000000..8678f333
--- /dev/null
+++ b/iot/iot-operator-v0/chart/regen-crd.sh
@@ -0,0 +1,17 @@
+#!/usr/bin/env bash
+# Regenerate the chart's CRD yaml from the typed-Rust Deployment::crd().
+# Runs at chart-release time, not at runtime. The runtime install path
+# is `iot-operator-v0 install`, which uses the same Rust source of
+# truth via a harmony Score — no yaml.
+#
+# Usage: iot/iot-operator-v0/chart/regen-crd.sh
+set -euo pipefail
+
+HERE="$(cd "$(dirname "$0")" && pwd)"
+REPO="$(cd "$HERE/../../.." && pwd)"
+OUT="$HERE/crd-source/deployments.iot.nationtech.io.yaml"
+
+cd "$REPO"
+cargo build -q -p iot-operator-v0
+./target/debug/iot-operator-v0 gen-chart-crd >"$OUT"
+echo "regenerated $OUT ($(wc -l <"$OUT") lines)"
diff --git a/iot/iot-operator-v0/chart/templates/_helpers.tpl b/iot/iot-operator-v0/chart/templates/_helpers.tpl
new file mode 100644
index 00000000..2b975d73
--- /dev/null
+++ b/iot/iot-operator-v0/chart/templates/_helpers.tpl
@@ -0,0 +1,58 @@
+{{/*
+Expand the name of the chart.
+*/}}
+{{- define "iot-operator-v0.name" -}}
+{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
+{{/*
+Fully qualified app name — defaults to <release>-<chart name>. Truncated to
+the 63-char k8s label limit.
+*/}}
+{{- define "iot-operator-v0.fullname" -}}
+{{- $name := default .Chart.Name .Values.nameOverride -}}
+{{- if contains $name .Release.Name -}}
+{{- .Release.Name | trunc 63 | trimSuffix "-" -}}
+{{- else -}}
+{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+{{- end -}}
+
+{{- define "iot-operator-v0.chart" -}}
+{{- printf "%s-%s" .Chart.Name .Chart.Version | replace "+" "_" | trunc 63 | trimSuffix "-" -}}
+{{- end -}}
+
+{{/*
+Common labels applied to every chart-owned resource.
+*/}}
+{{- define "iot-operator-v0.labels" -}}
+helm.sh/chart: {{ include "iot-operator-v0.chart" . }}
+{{ include "iot-operator-v0.selectorLabels" . }}
+app.kubernetes.io/version: {{ .Chart.AppVersion | quote }}
+app.kubernetes.io/managed-by: {{ .Release.Service }}
+{{- end -}}
+
+{{- define "iot-operator-v0.selectorLabels" -}}
+app.kubernetes.io/name: {{ include "iot-operator-v0.name" .
}} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end -}} + +{{/* +ServiceAccount name — honors .Values.serviceAccount.name when set, +falls back to the fullname. +*/}} +{{- define "iot-operator-v0.serviceAccountName" -}} +{{- if .Values.serviceAccount.create -}} +{{- default (include "iot-operator-v0.fullname" .) .Values.serviceAccount.name -}} +{{- else -}} +{{- default "default" .Values.serviceAccount.name -}} +{{- end -}} +{{- end -}} + +{{/* +Image ref. Defaults to .Chart.AppVersion when tag is empty. +*/}} +{{- define "iot-operator-v0.image" -}} +{{- $tag := default .Chart.AppVersion .Values.image.tag -}} +{{- printf "%s:%s" .Values.image.repository $tag -}} +{{- end -}} diff --git a/iot/iot-operator-v0/chart/templates/clusterrole.yaml b/iot/iot-operator-v0/chart/templates/clusterrole.yaml new file mode 100644 index 00000000..578be36c --- /dev/null +++ b/iot/iot-operator-v0/chart/templates/clusterrole.yaml @@ -0,0 +1,21 @@ +{{- if .Values.rbac.create -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRole +metadata: + name: {{ include "iot-operator-v0.fullname" . }} + labels: + {{- include "iot-operator-v0.labels" . | nindent 4 }} +rules: + # Reconcile loop: watch the Deployment CRs and update their + # status subresource. + - apiGroups: ["iot.nationtech.io"] + resources: ["deployments"] + verbs: ["get", "list", "watch", "patch", "update"] + - apiGroups: ["iot.nationtech.io"] + resources: ["deployments/status"] + verbs: ["get", "patch", "update"] + # Finalizer add/remove — kube-rs's finalizer() helper patches the + # main CR, covered by the deployments rule above. Kept explicit + # here as a forward-compat note: if we add additional custom + # finalizer-guarded resources, extend this block. +{{- end -}} diff --git a/iot/iot-operator-v0/chart/templates/clusterrolebinding.yaml b/iot/iot-operator-v0/chart/templates/clusterrolebinding.yaml new file mode 100644 index 00000000..46da6290 --- /dev/null +++ b/iot/iot-operator-v0/chart/templates/clusterrolebinding.yaml @@ -0,0 +1,16 @@ +{{- if .Values.rbac.create -}} +apiVersion: rbac.authorization.k8s.io/v1 +kind: ClusterRoleBinding +metadata: + name: {{ include "iot-operator-v0.fullname" . }} + labels: + {{- include "iot-operator-v0.labels" . | nindent 4 }} +roleRef: + apiGroup: rbac.authorization.k8s.io + kind: ClusterRole + name: {{ include "iot-operator-v0.fullname" . }} +subjects: + - kind: ServiceAccount + name: {{ include "iot-operator-v0.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} +{{- end -}} diff --git a/iot/iot-operator-v0/chart/templates/crds.yaml b/iot/iot-operator-v0/chart/templates/crds.yaml new file mode 100644 index 00000000..006bede9 --- /dev/null +++ b/iot/iot-operator-v0/chart/templates/crds.yaml @@ -0,0 +1,21 @@ +{{/* +CRD installation. Cnpg-aligned pattern: + + - CRDs sit inside templates/ (not Helm's native top-level crds/ dir) + so `helm upgrade` re-applies schema changes transparently. + - resource-policy: keep stops `helm uninstall` from deleting them + (and with them every customer CR). + - Gated by .Values.crds.create so GitOps setups that manage CRDs + out-of-band (ArgoCD, Flux) can flip it off. + +The actual CRD YAML lives at chart/crd-source/ — regenerated from +the typed-Rust Deployment::crd() by `iot-operator-v0 gen-chart-crd` +at release time, never hand-edited. 
+*/}} +{{- if .Values.crds.create }} +{{- $crd := .Files.Get "crd-source/deployments.iot.nationtech.io.yaml" | fromYaml }} +{{- $_ := set $crd "metadata" (merge (dict "annotations" (dict "helm.sh/resource-policy" "keep")) ($crd.metadata | default dict)) }} +{{- $_ = set $crd.metadata "annotations" (merge (dict "helm.sh/resource-policy" "keep") ($crd.metadata.annotations | default dict)) }} +{{- $_ = set $crd.metadata "labels" (merge (dict "app.kubernetes.io/managed-by" .Release.Service "app.kubernetes.io/part-of" (include "iot-operator-v0.name" .)) ($crd.metadata.labels | default dict)) }} +{{ toYaml $crd }} +{{- end }} diff --git a/iot/iot-operator-v0/chart/templates/deployment.yaml b/iot/iot-operator-v0/chart/templates/deployment.yaml new file mode 100644 index 00000000..5fff8db3 --- /dev/null +++ b/iot/iot-operator-v0/chart/templates/deployment.yaml @@ -0,0 +1,63 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "iot-operator-v0.fullname" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "iot-operator-v0.labels" . | nindent 4 }} +spec: + replicas: {{ .Values.replicaCount }} + strategy: + type: Recreate + selector: + matchLabels: + {{- include "iot-operator-v0.selectorLabels" . | nindent 6 }} + template: + metadata: + labels: + {{- include "iot-operator-v0.selectorLabels" . | nindent 8 }} + {{- with .Values.podLabels }} + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.podAnnotations }} + annotations: + {{- toYaml . | nindent 8 }} + {{- end }} + spec: + serviceAccountName: {{ include "iot-operator-v0.serviceAccountName" . }} + {{- with .Values.imagePullSecrets }} + imagePullSecrets: + {{- toYaml . | nindent 8 }} + {{- end }} + securityContext: + {{- toYaml .Values.podSecurityContext | nindent 8 }} + containers: + - name: operator + image: {{ include "iot-operator-v0.image" . | quote }} + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: ["run"] + env: + - name: NATS_URL + value: {{ .Values.nats.url | quote }} + {{- if .Values.kvBucket }} + - name: KV_BUCKET + value: {{ .Values.kvBucket | quote }} + {{- end }} + - name: RUST_LOG + value: {{ .Values.logLevel | quote }} + securityContext: + {{- toYaml .Values.containerSecurityContext | nindent 12 }} + resources: + {{- toYaml .Values.resources | nindent 12 }} + {{- with .Values.nodeSelector }} + nodeSelector: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.tolerations }} + tolerations: + {{- toYaml . | nindent 8 }} + {{- end }} + {{- with .Values.affinity }} + affinity: + {{- toYaml . | nindent 8 }} + {{- end }} diff --git a/iot/iot-operator-v0/chart/templates/serviceaccount.yaml b/iot/iot-operator-v0/chart/templates/serviceaccount.yaml new file mode 100644 index 00000000..4e769dfd --- /dev/null +++ b/iot/iot-operator-v0/chart/templates/serviceaccount.yaml @@ -0,0 +1,13 @@ +{{- if .Values.serviceAccount.create -}} +apiVersion: v1 +kind: ServiceAccount +metadata: + name: {{ include "iot-operator-v0.serviceAccountName" . }} + namespace: {{ .Release.Namespace }} + labels: + {{- include "iot-operator-v0.labels" . | nindent 4 }} + {{- with .Values.serviceAccount.annotations }} + annotations: + {{- toYaml . 
| nindent 4 }} + {{- end }} +{{- end -}} diff --git a/iot/iot-operator-v0/chart/values.schema.json b/iot/iot-operator-v0/chart/values.schema.json new file mode 100644 index 00000000..83001d99 --- /dev/null +++ b/iot/iot-operator-v0/chart/values.schema.json @@ -0,0 +1,58 @@ +{ + "$schema": "https://json-schema.org/draft-07/schema#", + "type": "object", + "properties": { + "image": { + "type": "object", + "required": ["repository", "pullPolicy"], + "properties": { + "repository": { "type": "string", "minLength": 1 }, + "tag": { "type": "string" }, + "pullPolicy": { "enum": ["Always", "IfNotPresent", "Never"] } + } + }, + "imagePullSecrets": { + "type": "array", + "items": { + "type": "object", + "required": ["name"], + "properties": { "name": { "type": "string" } } + } + }, + "replicaCount": { "type": "integer", "minimum": 1 }, + "nats": { + "type": "object", + "required": ["url"], + "properties": { "url": { "type": "string", "minLength": 1 } } + }, + "kvBucket": { "type": "string" }, + "logLevel": { "type": "string" }, + "crds": { + "type": "object", + "required": ["create"], + "properties": { "create": { "type": "boolean" } } + }, + "serviceAccount": { + "type": "object", + "required": ["create"], + "properties": { + "create": { "type": "boolean" }, + "name": { "type": "string" }, + "annotations": { "type": "object" } + } + }, + "rbac": { + "type": "object", + "required": ["create"], + "properties": { "create": { "type": "boolean" } } + }, + "resources": { "type": "object" }, + "podSecurityContext": { "type": "object" }, + "containerSecurityContext": { "type": "object" }, + "nodeSelector": { "type": "object" }, + "tolerations": { "type": "array" }, + "affinity": { "type": "object" }, + "podAnnotations": { "type": "object" }, + "podLabels": { "type": "object" } + } +} diff --git a/iot/iot-operator-v0/chart/values.yaml b/iot/iot-operator-v0/chart/values.yaml new file mode 100644 index 00000000..f1f41350 --- /dev/null +++ b/iot/iot-operator-v0/chart/values.yaml @@ -0,0 +1,79 @@ +## iot-operator-v0 helm chart values. +## Cnpg-aligned conventions: image.* shape, crds.create toggle, +## non-root podSecurityContext, resources{} stub. + +image: + repository: hub.nationtech.io/iot-operator-v0 + # Defaults to .Chart.AppVersion when unset. + tag: "" + pullPolicy: IfNotPresent + +imagePullSecrets: [] + +## Controller is singleton today — kube-rs Controller handles +## reconcile-level concurrency within the process. Multi-replica +## HA would need leader election wired up first (see ROADMAP +## for future work). +replicaCount: 1 + +## NATS connection. The operator connects to this URL on startup +## and stays connected; a flaky NATS link surfaces as pod restart. +nats: + url: nats://nats.iot-system.svc.cluster.local:4222 + +## Override the KV bucket name the operator writes desired-state +## into. Defaults to harmony-reconciler-contracts' BUCKET_DESIRED_STATE +## constant, so this is only useful for multi-tenant testbeds. +kvBucket: "" + +## RUST_LOG directive passed through to the operator pod. +logLevel: info + +## CRDs. Cnpg pattern: install by default, gated by this toggle so +## GitOps/cluster-admin teams managing CRDs out-of-band can flip +## it off. Resource-policy: keep annotation on the CRDs themselves +## means `helm uninstall` never deletes them (and therefore never +## deletes customer CRs). +crds: + create: true + +serviceAccount: + create: true + # Leave empty to default to the release name. + name: "" + annotations: {} + +rbac: + create: true + +## Standard k8s resource requests/limits. 
Left unset by default; +## uncomment and tune before running in production. +resources: {} + # limits: + # cpu: 200m + # memory: 256Mi + # requests: + # cpu: 50m + # memory: 128Mi + +podSecurityContext: + runAsNonRoot: true + runAsUser: 10001 + runAsGroup: 10001 + fsGroup: 10001 + seccompProfile: + type: RuntimeDefault + +containerSecurityContext: + allowPrivilegeEscalation: false + readOnlyRootFilesystem: true + capabilities: + drop: + - ALL + +nodeSelector: {} +tolerations: [] +affinity: {} + +podAnnotations: {} +podLabels: {} diff --git a/iot/iot-operator-v0/src/main.rs b/iot/iot-operator-v0/src/main.rs index 8c686216..79a7370b 100644 --- a/iot/iot-operator-v0/src/main.rs +++ b/iot/iot-operator-v0/src/main.rs @@ -45,6 +45,11 @@ enum Command { /// Apply the operator's CRD to the cluster `KUBECONFIG` points /// at. Uses harmony's typed k8s client — no yaml, no kubectl. Install, + /// Print the Deployment CRD as YAML on stdout. Used by the + /// helm chart's release-time regeneration step — the runtime + /// `install` path is still the typed-Rust Score. Never invoked + /// in framework code paths, only by release tooling. + GenChartCrd, } #[tokio::main] @@ -57,9 +62,18 @@ async fn main() -> Result<()> { match cli.command.unwrap_or(Command::Run) { Command::Install => install::install_crds().await, Command::Run => run(&cli.nats_url, &cli.kv_bucket).await, + Command::GenChartCrd => gen_chart_crd(), } } +fn gen_chart_crd() -> Result<()> { + use kube::CustomResourceExt; + let crd = crd::Deployment::crd(); + let yaml = serde_yaml::to_string(&crd)?; + print!("{yaml}"); + Ok(()) +} + async fn run(nats_url: &str, bucket: &str) -> Result<()> { let nats = async_nats::connect(nats_url).await?; tracing::info!(url = %nats_url, "connected to NATS"); -- 2.39.5 From 92150da12aa62c030d5f5f82fca579be45680f14 Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Wed, 22 Apr 2026 11:13:42 -0400 Subject: [PATCH 2/2] feat(iot): label-selector targeting (replace target_devices with targetSelector) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit DeploymentSpec.target_devices (flat string list) is gone. In its place, DeploymentSpec.target_selector is a minimal LabelSelector-shaped struct (matchLabels only for now, matchExpressions deferred until there's a real need). Devices publish a labels map in every AgentStatus heartbeat; operator resolves the selector against the current fleet snapshot on each reconcile + aggregator tick. No legacy shim — the CRD is v1alpha1 and not yet deployed in the wild. Aggregator consequences: - controller and aggregator now share a StatusSnapshots map so selector resolution sees the same data on both sides. - unreported is dropped: a device that has never heartbeated is invisible to the selector machinery, so the field no longer has clean semantics. "device went dark" can come back as a staleness metric later if needed. - controller's MissingTargets error is gone: zero matches is a legitimate state (devices may not have joined yet). The controller logs and fast-requeues (15s/30s) so a just-joining device picks the deployment up without needing a cross-task subscription. Agent + setup Score: - Agent config grows a [labels] section (BTreeMap); the flat [agent].group field is gone. group becomes just one label. - IotDeviceSetupConfig takes a BTreeMap instead of a String group. TOML render iterates the BTreeMap (ordered) so idempotent change detection still works cleanly. 
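For the record, the agent TOML that render() now produces has this
shape (values here are illustrative, not from a real device):

    [agent]
    device_id = "iot-smoke-vm"

    [labels]
    device = "iot-smoke-vm"
    group = "group-a"

    [credentials]
    type = "toml-shared"

Key order inside [labels] is BTreeMap order, which is what keeps the
re-render comparison stable.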
CLI-facing: - example_iot_apply_deployment: --target-device -> --to, accepts comma-separated key=value pairs. - example_iot_vm_setup: --group -> --labels, same grammar. - smoke-a4.sh: VM publishes group=$GROUP,device=$DEVICE_ID; deploys target --to device=$DEVICE_ID so single-device smoke behavior is preserved while exercising the selector path. CRD regenerated via chart/regen-crd.sh. 7 contract tests + 6 operator tests pass. --- examples/iot_apply_deployment/src/main.rs | 65 +++-- examples/iot_vm_setup/src/main.rs | 45 +++- harmony-reconciler-contracts/src/status.rs | 13 + harmony/src/modules/iot/setup_score.rs | 34 ++- iot/iot-agent-v0/src/config.rs | 6 + iot/iot-agent-v0/src/main.rs | 3 + .../deployments.iot.nationtech.io.yaml | 26 +- iot/iot-operator-v0/src/aggregate.rs | 246 +++++++++++++----- iot/iot-operator-v0/src/controller.rs | 93 +++++-- iot/iot-operator-v0/src/crd.rs | 47 +++- iot/iot-operator-v0/src/main.rs | 8 +- iot/scripts/smoke-a4.sh | 19 +- 12 files changed, 447 insertions(+), 158 deletions(-) diff --git a/examples/iot_apply_deployment/src/main.rs b/examples/iot_apply_deployment/src/main.rs index 2fe6b0eb..94a98676 100644 --- a/examples/iot_apply_deployment/src/main.rs +++ b/examples/iot_apply_deployment/src/main.rs @@ -13,28 +13,37 @@ //! //! Typical demo-driver usage: //! -//! # apply an nginx deployment +//! # apply an nginx deployment to every device in the site-a group //! cargo run -q -p example_iot_apply_deployment -- \ -//! --target-device iot-smoke-vm-arm \ +//! --to group=site-a \ //! --image nginx:latest //! //! # print the CR JSON (lets the user kubectl-apply it manually) //! cargo run -q -p example_iot_apply_deployment -- \ -//! --target-device iot-smoke-vm-arm \ +//! --to group=site-a \ //! --image nginx:latest --print | kubectl apply -f - //! -//! # upgrade the same deployment to a newer image +//! # target a single device for a dev loop //! cargo run -q -p example_iot_apply_deployment -- \ -//! --target-device iot-smoke-vm-arm \ +//! --to device=iot-smoke-vm-arm \ +//! --image nginx:latest +//! +//! # upgrade the same deployment to a newer image (same selector) +//! cargo run -q -p example_iot_apply_deployment -- \ +//! --to group=site-a \ //! --image nginx:1.26 //! //! # delete the deployment //! cargo run -q -p example_iot_apply_deployment -- --delete -use anyhow::{Context, Result}; +use std::collections::BTreeMap; + +use anyhow::{Context, Result, anyhow}; use clap::Parser; use harmony::modules::podman::{PodmanService, PodmanV0Score}; -use iot_operator_v0::crd::{Deployment, DeploymentSpec, Rollout, RolloutStrategy, ScorePayload}; +use iot_operator_v0::crd::{ + Deployment, DeploymentSpec, Rollout, RolloutStrategy, ScorePayload, TargetSelector, +}; use kube::Client; use kube::api::{Api, DeleteParams, Patch, PatchParams}; @@ -51,10 +60,13 @@ struct Cli { /// podman container name on the device. #[arg(long, default_value = "hello-world")] name: String, - /// Device id that should run the container. Must match the - /// agent's `device_id` config. - #[arg(long, default_value = "iot-smoke-vm")] - target_device: String, + /// Label selector picking which devices run the deployment. + /// Comma-separated `key=value` pairs (conjunctive), e.g. + /// `--to group=site-a` or `--to device=iot-smoke-vm,arch=aarch64`. + /// A device matches when its published labels are a superset of + /// these pairs. Applies except in `--delete` mode. + #[arg(long, default_value = "device=iot-smoke-vm")] + to: String, /// Container image to run. 
    #[arg(long, default_value = "docker.io/library/nginx:latest")]
    image: String,
@@ -70,10 +82,29 @@
     print: bool,
 }
 
+fn parse_selector(raw: &str) -> Result<TargetSelector> {
+    let mut match_labels = BTreeMap::new();
+    for piece in raw.split(',').map(str::trim).filter(|p| !p.is_empty()) {
+        let (k, v) = piece
+            .split_once('=')
+            .ok_or_else(|| anyhow!("selector chunk '{piece}' missing '='"))?;
+        let k = k.trim();
+        let v = v.trim();
+        if k.is_empty() || v.is_empty() {
+            return Err(anyhow!("selector chunk '{piece}' has empty key or value"));
+        }
+        match_labels.insert(k.to_string(), v.to_string());
+    }
+    if match_labels.is_empty() {
+        return Err(anyhow!("selector is empty — pass e.g. `--to group=site-a`"));
+    }
+    Ok(TargetSelector { match_labels })
+}
+
 #[tokio::main]
 async fn main() -> Result<()> {
     let cli = Cli::parse();
-    let cr = build_cr(&cli);
+    let cr = build_cr(&cli)?;
 
     if cli.print {
         println!("{}", serde_json::to_string_pretty(&cr)?);
@@ -117,7 +148,7 @@ async fn main() -> Result<()> {
     Ok(())
 }
 
-fn build_cr(cli: &Cli) -> Deployment {
+fn build_cr(cli: &Cli) -> Result<Deployment> {
     let score = PodmanV0Score {
         services: vec![PodmanService {
             name: cli.name.clone(),
@@ -135,14 +166,16 @@ fn build_cr(cli: &Cli) -> Deployment {
         data: serde_json::to_value(&score).expect("PodmanV0Score is JSON-clean"),
     };
 
-    Deployment::new(
+    let target_selector = parse_selector(&cli.to)?;
+
+    Ok(Deployment::new(
         &cli.name,
         DeploymentSpec {
-            target_devices: vec![cli.target_device.clone()],
+            target_selector,
             score: payload,
             rollout: Rollout {
                 strategy: RolloutStrategy::Immediate,
             },
         },
-    )
+    ))
 }
diff --git a/examples/iot_vm_setup/src/main.rs b/examples/iot_vm_setup/src/main.rs
index 308a65cb..e86e640c 100644
--- a/examples/iot_vm_setup/src/main.rs
+++ b/examples/iot_vm_setup/src/main.rs
@@ -57,9 +57,14 @@ struct Cli {
     /// fresh `Id` (hex timestamp + random suffix).
     #[arg(long)]
     device_id: Option,
-    /// Fleet group label to write into the agent's TOML config.
-    #[arg(long, default_value = "group-a")]
-    group: String,
+    /// Routing labels to write into the agent's TOML config. Accepts
+    /// a comma-separated list of `key=value` pairs, e.g.
+    /// `--labels group=site-a,role=sensor-gateway`. Published in
+    /// every heartbeat; the operator resolves Deployment selectors
+    /// against this map. At least one label is required so the
+    /// device is targetable.
+    #[arg(long, default_value = "group=group-a")]
+    labels: String,
     /// libvirt network name to attach the VM to.
    #[arg(long, default_value = "default")]
    network: String,
@@ -186,9 +191,15 @@ async fn main() -> Result<()> {
         },
     );
 
+    let labels = parse_labels(&cli.labels)?;
+    let labels_display = labels
+        .iter()
+        .map(|(k, v)| format!("{k}={v}"))
+        .collect::<Vec<_>>()
+        .join(",");
     let setup_score = IotDeviceSetupScore::new(IotDeviceSetupConfig {
         device_id: device_id.clone(),
-        group: cli.group.clone(),
+        labels,
         nats_urls: vec![cli.nats_url.clone()],
         nats_user: cli.nats_user.clone(),
         nats_pass: cli.nats_pass.clone(),
@@ -196,13 +207,31 @@ async fn main() -> Result<()> {
     });
     run_setup_score(&setup_score, &linux_topology).await?;
 
-    println!(
-        "device '{device_id}' (group '{}') onboarded via {vm_ip}",
-        cli.group
-    );
+    println!("device '{device_id}' ({labels_display}) onboarded via {vm_ip}");
     Ok(())
 }
 
+fn parse_labels(raw: &str) -> Result<std::collections::BTreeMap<String, String>> {
+    let mut out = std::collections::BTreeMap::new();
+    for piece in raw.split(',').map(str::trim).filter(|p| !p.is_empty()) {
+        let (k, v) = piece
+            .split_once('=')
+            .ok_or_else(|| anyhow::anyhow!("label chunk '{piece}' missing '='"))?;
+        let k = k.trim();
+        let v = v.trim();
+        if k.is_empty() || v.is_empty() {
+            anyhow::bail!("label chunk '{piece}' has empty key or value");
+        }
+        out.insert(k.to_string(), v.to_string());
+    }
+    if out.is_empty() {
+        anyhow::bail!(
+            "--labels must have at least one `key=value` pair (selectors need something to match on)"
+        );
+    }
+    Ok(out)
+}
+
 async fn run_vm_score(
     score: &ProvisionVmScore,
     topology: &KvmVirtualMachineHost,
diff --git a/harmony-reconciler-contracts/src/status.rs b/harmony-reconciler-contracts/src/status.rs
index bbe39b79..7bb28101 100644
--- a/harmony-reconciler-contracts/src/status.rs
+++ b/harmony-reconciler-contracts/src/status.rs
@@ -58,6 +58,15 @@ pub struct AgentStatus {
     /// disk."
     #[serde(default)]
     pub inventory: Option,
+    /// Routing labels. The agent echoes the label map from its local
+    /// config so the operator can resolve a Deployment's
+    /// `targetSelector` against the fleet. Keys + values are
+    /// user-defined; typical entries: `group=site-a`, `arch=aarch64`,
+    /// `role=sensor-gateway`. Devices with an empty map are
+    /// effectively un-targetable — a deliberate choice, since every
+    /// device in a real fleet will have at least a `group` label.
+    #[serde(default)]
+    pub labels: BTreeMap<String, String>,
 }
 
 /// Reconcile phase for a single deployment on one device.
@@ -151,6 +160,7 @@ mod tests {
             deployments: BTreeMap::new(),
             recent_events: vec![],
             inventory: None,
+            labels: BTreeMap::new(),
         };
         let json = serde_json::to_string(&s).unwrap();
         let back: AgentStatus = serde_json::from_str(&json).unwrap();
@@ -205,6 +215,7 @@
                 memory_mb: 8192,
                 agent_version: "0.1.0".to_string(),
             }),
+            labels: BTreeMap::from([("group".to_string(), "site-a".to_string())]),
         };
         let json = serde_json::to_string(&s).unwrap();
         let back: AgentStatus = serde_json::from_str(&json).unwrap();
@@ -225,6 +236,7 @@
         assert!(s.deployments.is_empty());
         assert!(s.recent_events.is_empty());
         assert!(s.inventory.is_none());
+        assert!(s.labels.is_empty());
     }
 
     #[test]
@@ -236,6 +248,7 @@
             deployments: BTreeMap::new(),
             recent_events: vec![],
             inventory: None,
+            labels: BTreeMap::new(),
         };
         let json = serde_json::to_string(&s).unwrap();
         assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}");
diff --git a/harmony/src/modules/iot/setup_score.rs b/harmony/src/modules/iot/setup_score.rs
index 76bfe71c..54aa86ea 100644
--- a/harmony/src/modules/iot/setup_score.rs
+++ b/harmony/src/modules/iot/setup_score.rs
@@ -1,8 +1,9 @@
 //! [`IotDeviceSetupScore`] — install podman + the iot-agent, wire the
 //! agent's TOML config, enable the systemd unit. Idempotent: re-running
-//! with a changed config (e.g. a different `group`) updates only what
+//! with a changed config (different labels, etc.) updates only what
 //! differs and restarts the agent once.
 
+use std::collections::BTreeMap;
 use std::path::PathBuf;
 
 use async_trait::async_trait;
@@ -25,13 +26,12 @@ use crate::score::Score;
 /// User-visible configuration for the setup Score. Everything a customer
 /// needs to tell us to bring a device into the fleet.
 ///
-/// **On `group`.** For v0 the group is a *label*, written into the
-/// agent's TOML config and reported back via the status bucket. It does
-/// not yet drive deployment routing — `Deployment.spec.targetDevices`
-/// still takes explicit device IDs. `targetGroups` is a v0.1+ item
-/// (ROADMAP §6.5). Running this Score twice against the same device
-/// with different `group` values is how a device is moved between
-/// fleet partitions once group routing lands.
+/// **On `labels`.** The label map is published verbatim in every
+/// agent heartbeat so the operator can resolve a Deployment's
+/// `targetSelector` against this device. `group` is the conventional
+/// primary label, but any key/value pair the user wants can go in.
+/// Re-running this Score with a changed label map is how a device is
+/// moved between fleet partitions.
 #[derive(Debug, Clone, Serialize, Deserialize)]
 pub struct IotDeviceSetupConfig {
     /// Stable device identifier. Written into the agent's TOML and
     /// at up to ~10k devices/sec, which matches the feel of a fleet
     /// registry.
     pub device_id: Id,
-    /// Fleet partition this device belongs to.
-    pub group: String,
+    /// Routing labels for selector-based targeting. Typical keys:
+    /// `group`, `arch`, `role`. Published in every heartbeat.
+    pub labels: BTreeMap<String, String>,
     /// NATS URLs the agent should connect to. Typically one entry.
     pub nats_urls: Vec<String>,
     /// Shared v0 credentials (Zitadel-issued per-device tokens in v0.2).
@@ -61,7 +62,6 @@ impl IotDeviceSetupConfig {
         // double-quoted strings are just `\` and `"`, handled by
         // [`toml_escape`].
         let device_id = toml_escape(&self.device_id.to_string());
-        let group = toml_escape(&self.group);
         let nats_user = toml_escape(&self.nats_user);
         let nats_pass = toml_escape(&self.nats_pass);
         let urls = self
             .nats_urls
             .iter()
             .map(|u| format!("\"{}\"", toml_escape(u)))
             .collect::<Vec<_>>()
             .join(", ");
+        // BTreeMap iteration is ordered — stable TOML output, so
+        // idempotent change detection compares cleanly.
+        let labels = self
+            .labels
+            .iter()
+            .map(|(k, v)| format!("{} = \"{}\"", toml_escape(k), toml_escape(v)))
+            .collect::<Vec<_>>()
+            .join("\n");
         format!(
             r#"[agent]
 device_id = "{device_id}"
-group = "{group}"
+
+[labels]
+{labels}
 
 [credentials]
 type = "toml-shared"
diff --git a/iot/iot-agent-v0/src/config.rs b/iot/iot-agent-v0/src/config.rs
index e0c8291f..4ae3175b 100644
--- a/iot/iot-agent-v0/src/config.rs
+++ b/iot/iot-agent-v0/src/config.rs
@@ -1,5 +1,6 @@
 use harmony_reconciler_contracts::Id;
 use serde::Deserialize;
+use std::collections::BTreeMap;
 use std::path::Path;
 
 #[derive(Debug, Clone, Deserialize)]
@@ -7,6 +8,11 @@ pub struct AgentConfig {
     pub agent: AgentSection,
     pub nats: NatsSection,
     pub credentials: CredentialsSection,
+    /// Routing labels. Published verbatim in every heartbeat so the
+    /// operator can resolve a Deployment's `targetSelector` against
+    /// this device. Typical keys: `group`, `arch`, `role`.
+    #[serde(default)]
+    pub labels: BTreeMap<String, String>,
 }
 
 #[derive(Debug, Clone, Deserialize)]
diff --git a/iot/iot-agent-v0/src/main.rs b/iot/iot-agent-v0/src/main.rs
index dfa236ba..b01544f4 100644
--- a/iot/iot-agent-v0/src/main.rs
+++ b/iot/iot-agent-v0/src/main.rs
@@ -90,6 +90,7 @@ async fn report_status(
     device_id: Id,
     reconciler: Arc,
     inventory: Option,
+    labels: std::collections::BTreeMap<String, String>,
 ) -> Result<()> {
     let jetstream = async_nats::jetstream::new(client);
     let bucket = jetstream
@@ -112,6 +113,7 @@
             deployments,
             recent_events,
             inventory: inventory.clone(),
+            labels: labels.clone(),
         };
         let payload = serde_json::to_vec(&status)?;
         bucket.put(&key, payload.into()).await?;
@@ -199,6 +201,7 @@ async fn main() -> Result<()> {
         device_id,
         reconciler.clone(),
         Some(inventory_snapshot),
+        cfg.labels.clone(),
     );
 
     let reconcile = reconciler.clone().run_periodic(RECONCILE_INTERVAL);
diff --git a/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml b/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml
index e8431e39..263badda 100644
--- a/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml
+++ b/iot/iot-operator-v0/chart/crd-source/deployments.iot.nationtech.io.yaml
@@ -44,14 +44,22 @@ spec:
               x-kubernetes-validations:
                 - message: score.type must be a valid Rust identifier matching the struct name of the score variant (e.g. PodmanV0)
                   rule: self.type.matches('^[A-Za-z_][A-Za-z0-9_]*$')
-              targetDevices:
-                items:
-                  type: string
-                type: array
+              targetSelector:
+                description: |-
+                  Label selector that picks which devices in the fleet run this deployment. Devices publish their labels via `AgentStatus`; the operator resolves the selector against the current fleet snapshot on every reconcile.
+
+                  Matches the Kubernetes `LabelSelector.matchLabels` wire format so the CLI, dashboards, and kubectl tooling all speak the same selector grammar. Expressions (`In`, `NotIn`, etc.) are deferred until there's a concrete need.
+                properties:
+                  matchLabels:
+                    additionalProperties:
+                      type: string
+                    default: {}
+                    type: object
+                type: object
             required:
               - rollout
               - score
-              - targetDevices
+              - targetSelector
             type: object
           status:
             nullable: true
@@ -111,12 +119,7 @@ spec:
                       type: object
                     type: array
                   succeeded:
-                    description: Count of devices where the deployment is in each phase. Always populated (zeros are valid) so the operator can patch the whole subtree atomically.
-                    format: uint32
-                    minimum: 0.0
-                    type: integer
-                  unreported:
-                    description: Count of target devices that haven't yet heartbeated at all. "failed to join fleet" vs. "failed to reconcile" — different signals, different remedies.
+                    description: Count of devices where the deployment is in each phase. Always populated (zeros are valid) so the operator can patch the whole subtree atomically. With selector-based targeting there is no "unreported" counterpart — a device that has never heartbeated is invisible to the selector machinery.
                     format: uint32
                     minimum: 0.0
                     type: integer
@@ -124,7 +127,6 @@
                   - failed
                   - pending
                   - succeeded
-                  - unreported
                   type: object
                 observedScoreString:
                   description: Last serialized score the operator pushed to NATS. Used by the operator itself for change-detection on the hot path (skip KV write + status patch when the CR is unchanged).
diff --git a/iot/iot-operator-v0/src/aggregate.rs b/iot/iot-operator-v0/src/aggregate.rs
index c6ca9c83..cf63f47e 100644
--- a/iot/iot-operator-v0/src/aggregate.rs
+++ b/iot/iot-operator-v0/src/aggregate.rs
@@ -2,28 +2,23 @@
 //!
 //! Watches the `agent-status` NATS KV bucket, keeps a per-device
 //! snapshot in memory, and periodically recomputes each Deployment
-//! CR's `.status.aggregate` subtree from the intersection of its
-//! `spec.targetDevices` list and the known device statuses.
+//! CR's `.status.aggregate` subtree from the devices whose labels
+//! match the CR's `spec.targetSelector`.
 //!
 //! Runs as a background task alongside the controller. Keeping the
 //! controller free of NATS-KV subscription state lets its reconcile
-//! loop stay reactive and cheap (just publishing desired state +
-//! managing finalizers), while this task handles the slower
+//! loop stay reactive and cheap (publishing desired state, managing
+//! finalizers), while this task handles the slower
 //! many-devices-to-one-CR fan-in.
 //!
 //! Design choices:
 //! - **In-memory snapshot map** (device_id → AgentStatus). Rebuilt
 //!   from JetStream on startup via the watch's initial replay; kept
 //!   current by watching thereafter. No persistence — the bucket is
-//!   the source of truth.
-//! - **Periodic aggregation tick** (5 s). Cheap (a few BTreeMap
-//!   lookups + one `patch_status` per CR) and gives predictable
-//!   operator behaviour for the smoke harness. A push-based
-//!   "recompute on every Put" would be tighter but adds complexity
-//!   this v0.1 doesn't need.
-//! - **JSON-Merge Patch.** Writes only the `aggregate` subtree, so
-//!   it composes cleanly with the controller's
-//!   `observedScoreString` patch.
+//!   the source of truth. The same map is shared with the
+//!   controller so it can resolve selectors for KV writes.
+//! - **Periodic aggregation tick** (5 s).
+//! - **JSON-Merge Patch.** Writes only the `aggregate` subtree.
 
 use std::collections::BTreeMap;
 use std::sync::Arc;
@@ -37,24 +32,35 @@ use kube::{Client, ResourceExt};
 use serde_json::json;
 use tokio::sync::Mutex;
 
-use crate::crd::{AggregateEvent, AggregateLastError, Deployment, DeploymentAggregate};
+use crate::crd::{
+    AggregateEvent, AggregateLastError, Deployment, DeploymentAggregate, TargetSelector,
+};
 
 /// Cap on how many events we surface in `DeploymentAggregate.recent_events`.
-/// Small enough to keep the CR status compact.
 const AGGREGATE_EVENT_CAP: usize = 10;
 
 /// How often the aggregator recomputes + patches.
 const AGGREGATE_TICK: Duration = Duration::from_secs(5);
 
-/// Per-device status snapshot keyed by device id string.
+/// Per-device status snapshot keyed by device id string. Shared
+/// between the aggregator (writer) and the controller (reader) so
+/// selector resolution sees the same data on both sides.
 pub type StatusSnapshots = Arc<Mutex<BTreeMap<String, AgentStatus>>>;
 
-/// Spawn the aggregator: watch the agent-status bucket into an
-/// in-memory map, and periodically fold that map into every
-/// Deployment CR's `.status.aggregate`.
-pub async fn run(client: Client, status_bucket: Store) -> anyhow::Result<()> {
-    let snapshots: StatusSnapshots = Arc::new(Mutex::new(BTreeMap::new()));
+/// Build a fresh empty snapshot map. Call once from `main`, hand
+/// clones to both controller and aggregator.
+pub fn new_snapshots() -> StatusSnapshots {
+    Arc::new(Mutex::new(BTreeMap::new()))
+}
 
+/// Spawn the aggregator: watch the agent-status bucket into the
+/// shared snapshot map, and periodically fold it into every
+/// Deployment CR's `.status.aggregate`.
+pub async fn run(
+    client: Client,
+    status_bucket: Store,
+    snapshots: StatusSnapshots,
+) -> anyhow::Result<()> {
     let watcher = tokio::spawn(watch_status_bucket(status_bucket, snapshots.clone()));
     let aggregator = tokio::spawn(aggregate_loop(client, snapshots));
@@ -130,7 +136,7 @@ async fn tick_once(
             None => continue,
         };
         let name = cr.name_any();
-        let aggregate = compute_aggregate(&cr.spec.target_devices, &name, &snapshot);
+        let aggregate = compute_aggregate(&cr.spec.target_selector, &name, &snapshot);
         let status = json!({ "status": { "aggregate": aggregate } });
         let api: Api<Deployment> = Api::namespaced(deployments.clone().into_client(), &ns);
         if let Err(e) = api
@@ -143,10 +149,25 @@
     Ok(())
 }
 
+/// Resolve the selector against the snapshot → sorted list of
+/// matching device ids. Stable ordering gives deterministic KV
+/// write ordering + readable logs.
+pub fn resolve_targets(
+    selector: &TargetSelector,
+    snapshots: &BTreeMap<String, AgentStatus>,
+) -> Vec<String> {
+    let mut hits: Vec<String> = snapshots
+        .iter()
+        .filter(|(_, status)| selector.matches(&status.labels))
+        .map(|(id, _)| id.clone())
+        .collect();
+    hits.sort();
+    hits
+}
+
 /// Compute the aggregate for one CR from the current snapshot map.
-/// Exposed (crate-visible) for unit testing.
 pub(crate) fn compute_aggregate(
-    target_devices: &[String],
+    selector: &TargetSelector,
     deployment_name: &str,
     snapshots: &BTreeMap<String, AgentStatus>,
 ) -> DeploymentAggregate {
@@ -155,13 +176,10 @@
     let mut last_heartbeat: Option<DateTime<Utc>> = None;
     let mut events: Vec<AggregateEvent> = Vec::new();
 
-    for device in target_devices {
-        let status = match snapshots.get(device) {
+    for device in resolve_targets(selector, snapshots) {
+        let status = match snapshots.get(&device) {
             Some(s) => s,
-            None => {
-                agg.unreported += 1;
-                continue;
-            }
+            None => continue,
         };
         if last_heartbeat.is_none_or(|t| status.timestamp > t) {
             last_heartbeat = Some(status.timestamp);
@@ -190,8 +208,8 @@
                 Phase::Pending => agg.pending += 1,
             },
             None => {
-                // Device reported but hasn't acknowledged this
-                // deployment yet.
+                // Device matches the selector but hasn't acknowledged
+                // this deployment yet — likely mid-reconcile.
                 agg.pending += 1;
             }
         }
@@ -214,7 +232,6 @@
         }
     }
 
-    // Most recent first; cap.
     events.sort_by(|a, b| b.at.cmp(&a.at));
     events.truncate(AGGREGATE_EVENT_CAP);
@@ -241,6 +258,7 @@ mod tests {
 
     fn snapshot_with(
         device: &str,
+        labels: &[(&str, &str)],
         deployment: &str,
         phase: Phase,
         err: Option<&str>,
@@ -261,59 +279,150 @@
             deployments,
             recent_events: vec![],
             inventory: None,
+            labels: labels
+                .iter()
+                .map(|(k, v)| (k.to_string(), v.to_string()))
+                .collect(),
+        }
+    }
+
+    fn selector_for(pairs: &[(&str, &str)]) -> TargetSelector {
+        TargetSelector {
+            match_labels: pairs
+                .iter()
+                .map(|(k, v)| (k.to_string(), v.to_string()))
+                .collect(),
         }
     }
 
     #[test]
-    fn aggregate_counts_and_unreported() {
+    fn selector_matches_only_superset_label_maps() {
+        let sel = selector_for(&[("group", "site-a")]);
+        let mut labels = BTreeMap::new();
+        labels.insert("group".to_string(), "site-a".to_string());
+        labels.insert("arch".to_string(), "aarch64".to_string());
+        assert!(sel.matches(&labels));
+
+        let mut other = BTreeMap::new();
+        other.insert("group".to_string(), "site-b".to_string());
+        assert!(!sel.matches(&other));
+
+        let empty: BTreeMap<String, String> = BTreeMap::new();
+        assert!(!sel.matches(&empty));
+    }
+
+    #[test]
+    fn empty_selector_matches_nothing() {
+        let sel = TargetSelector::default();
+        let mut labels = BTreeMap::new();
+        labels.insert("group".to_string(), "site-a".to_string());
+        assert!(!sel.matches(&labels));
+    }
+
+    #[test]
+    fn resolve_targets_is_sorted_and_filtered() {
+        let mut map = BTreeMap::new();
+        map.insert(
+            "pi-03".to_string(),
+            snapshot_with(
+                "pi-03",
+                &[("group", "site-a")],
+                "hello",
+                Phase::Running,
+                None,
+            ),
+        );
+        map.insert(
+            "pi-01".to_string(),
+            snapshot_with(
+                "pi-01",
+                &[("group", "site-a")],
+                "hello",
+                Phase::Running,
+                None,
+            ),
+        );
+        map.insert(
+            "pi-02".to_string(),
+            snapshot_with(
+                "pi-02",
+                &[("group", "site-b")],
+                "hello",
+                Phase::Running,
+                None,
+            ),
+        );
+        let sel = selector_for(&[("group", "site-a")]);
+        let hits = resolve_targets(&sel, &map);
+        assert_eq!(hits, vec!["pi-01".to_string(), "pi-03".to_string()]);
+    }
+
+    #[test]
+    fn aggregate_counts_across_matching_devices() {
         let mut map = BTreeMap::new();
         map.insert(
             "pi-01".to_string(),
-            snapshot_with("pi-01", "hello", Phase::Running, None),
+            snapshot_with(
+                "pi-01",
+                &[("group", "site-a")],
+                "hello",
+                Phase::Running,
+                None,
+            ),
         );
         map.insert(
             "pi-02".to_string(),
-            snapshot_with("pi-02", "hello", Phase::Failed, Some("pull err")),
+            snapshot_with(
+                "pi-02",
+                &[("group", "site-a")],
+                "hello",
+                Phase::Failed,
+                Some("pull err"),
+            ),
         );
-        // pi-03 is a target but never reported.
-        let targets = vec![
-            "pi-01".to_string(),
-            "pi-02".to_string(),
-            "pi-03".to_string(),
-        ];
-        let agg = compute_aggregate(&targets, "hello", &map);
+        // pi-03 matches too but hasn't acknowledged this deployment.
+        let mut pi03 = AgentStatus {
+            device_id: Id::from("pi-03".to_string()),
+            status: "running".to_string(),
+            timestamp: ts("2026-04-22T01:00:00Z"),
+            deployments: BTreeMap::new(),
+            recent_events: vec![],
+            inventory: None,
+            labels: BTreeMap::new(),
+        };
+        pi03.labels
+            .insert("group".to_string(), "site-a".to_string());
+        map.insert("pi-03".to_string(), pi03);
+        // pi-04 is in a different group — should be ignored entirely.
+        map.insert(
+            "pi-04".to_string(),
+            snapshot_with(
+                "pi-04",
+                &[("group", "site-b")],
+                "hello",
+                Phase::Running,
+                None,
+            ),
+        );
+
+        let sel = selector_for(&[("group", "site-a")]);
+        let agg = compute_aggregate(&sel, "hello", &map);
         assert_eq!(agg.succeeded, 1);
         assert_eq!(agg.failed, 1);
-        assert_eq!(agg.pending, 0);
-        assert_eq!(agg.unreported, 1);
+        assert_eq!(agg.pending, 1);
         assert_eq!(agg.last_error.as_ref().unwrap().device_id, "pi-02");
         assert_eq!(agg.last_error.as_ref().unwrap().message, "pull err");
     }
 
-    #[test]
-    fn device_reported_but_no_deployment_entry_is_pending() {
-        // Agent heartbeated (device known to operator) but hasn't
-        // acknowledged this specific deployment yet.
-        let mut map = BTreeMap::new();
-        map.insert(
-            "pi-01".to_string(),
-            AgentStatus {
-                device_id: Id::from("pi-01".to_string()),
-                status: "running".to_string(),
-                timestamp: ts("2026-04-22T01:00:00Z"),
-                deployments: BTreeMap::new(),
-                recent_events: vec![],
-                inventory: None,
-            },
-        );
-        let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map);
-        assert_eq!(agg.pending, 1);
-        assert_eq!(agg.unreported, 0);
-    }
-
     #[test]
     fn events_filtered_to_matching_deployment_only() {
-        let mut status = snapshot_with("pi-01", "hello", Phase::Running, None);
+        let mut status = snapshot_with(
+            "pi-01",
+            &[("group", "site-a")],
+            "hello",
+            Phase::Running,
+            None,
+        );
         status.recent_events = vec![
             EventEntry {
                 at: ts("2026-04-22T01:00:05Z"),
@@ -336,7 +445,8 @@
         ];
         let mut map = BTreeMap::new();
         map.insert("pi-01".to_string(), status);
-        let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map);
+        let sel = selector_for(&[("group", "site-a")]);
+        let agg = compute_aggregate(&sel, "hello", &map);
         assert_eq!(agg.recent_events.len(), 1);
         assert_eq!(agg.recent_events[0].message, "hello reconciled");
     }
diff --git a/iot/iot-operator-v0/src/controller.rs b/iot/iot-operator-v0/src/controller.rs
index 2d402a4b..a9f963c7 100644
--- a/iot/iot-operator-v0/src/controller.rs
+++ b/iot/iot-operator-v0/src/controller.rs
@@ -12,6 +12,7 @@ use kube::runtime::watcher::Config as WatcherConfig;
 use kube::{Api, Client, ResourceExt};
 use serde_json::json;
 
+use crate::aggregate::{StatusSnapshots, resolve_targets};
 use crate::crd::{Deployment, ScorePayload};
 
 const FINALIZER: &str = "iot.nationtech.io/finalizer";
@@ -26,18 +27,21 @@ pub enum Error {
     Serde(#[from] serde_json::Error),
     #[error("missing namespace on resource")]
     MissingNamespace,
-    #[error("missing target devices")]
-    MissingTargets,
 }
 
 pub struct Context {
     pub client: Client,
     pub kv: Store,
+    pub snapshots: StatusSnapshots,
 }
 
-pub async fn run(client: Client, kv: Store) -> anyhow::Result<()> {
+pub async fn run(client: Client, kv: Store, snapshots: StatusSnapshots) -> anyhow::Result<()> {
     let api: Api<Deployment> = Api::all(client.clone());
-    let ctx = Arc::new(Context { client, kv });
+    let ctx = Arc::new(Context {
+        client,
+        kv,
+        snapshots,
+    });
 
     tracing::info!("starting Deployment controller");
     Controller::new(api, WatcherConfig::default())
@@ -58,10 +62,11 @@ async fn reconcile(obj: Arc<Deployment>, ctx: Arc<Context>) -> Result<Action, Error> {
     let api: Api<Deployment> = Api::namespaced(ctx.client.clone(), &ns);
+    let ctx = ctx.clone();
     finalizer(&api, FINALIZER, obj, |event| async {
         match event {
-            FinalizerEvent::Apply(d) => apply(d, &api, &ctx.kv).await,
-            FinalizerEvent::Cleanup(d) => cleanup(d, &ctx.kv).await,
+            FinalizerEvent::Apply(d) => apply(d, &api, &ctx).await,
+            FinalizerEvent::Cleanup(d) => cleanup(d, &ctx).await,
         }
     })
     .await
@@ -75,47 +80,81 @@ async fn reconcile(obj: Arc<Deployment>, ctx: Arc<Context>) -> Result<Action, Error> {
-async fn apply(obj: Arc<Deployment>, api: &Api<Deployment>, kv: &Store) -> Result<Action, Error> {
+async fn apply(
+    obj: Arc<Deployment>,
+    api: &Api<Deployment>,
+    ctx: &Context,
+) -> Result<Action, Error> {
     let name = obj.name_any();
-    if obj.spec.target_devices.is_empty() {
-        return Err(Error::MissingTargets);
-    }
     let score_json = serialize_score(&obj.spec.score)?;
 
+    // Resolve the selector against the current fleet snapshot.
+    // Zero matches is a legitimate state (devices may not have
+    // heartbeated yet) — we log and requeue rather than erroring.
+    // Drift (new devices showing up, old devices relabeled away) is
+    // handled by the fast requeue cadence below; no need for a
+    // cross-task subscription.
+    let snapshot = { ctx.snapshots.lock().await.clone() };
+    let targets = resolve_targets(&obj.spec.target_selector, &snapshot);
+
+    if targets.is_empty() {
+        tracing::info!(
+            %name,
+            selector = ?obj.spec.target_selector,
+            "selector matches no devices yet; will retry"
+        );
+        // Requeue fast so a just-joined device picks the deployment
+        // up within seconds, not minutes.
+        return Ok(Action::requeue(Duration::from_secs(15)));
+    }
+
     let already_observed = obj
         .status
         .as_ref()
         .and_then(|s| s.observed_score_string.as_deref())
         == Some(score_json.as_str());
-    if already_observed {
-        tracing::debug!(%name, "score unchanged; skipping KV write and status patch");
-        return Ok(Action::requeue(Duration::from_secs(300)));
+
+    if !already_observed {
+        // JSON-Merge Patch: leaves `aggregate` (populated by the
+        // aggregator task) intact.
+        let status = json!({ "status": { "observedScoreString": score_json } });
+        api.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status))
+            .await?;
     }
 
-    for device_id in &obj.spec.target_devices {
+    for device_id in &targets {
         let key = kv_key(device_id, &name);
-        kv.put(key.clone(), score_json.clone().into_bytes().into())
+        // `put` is idempotent by byte-equality on NATS JetStream
+        // KV — re-writing the same payload costs one store op but
+        // no listener wakeup. Safe to run every reconcile.
+        ctx.kv
+            .put(key.clone(), score_json.clone().into_bytes().into())
            .await
            .map_err(|e| Error::Kv(e.to_string()))?;
         tracing::info!(%key, "wrote desired state");
     }
 
-    // JSON-Merge Patch: this leaves other status fields
-    // (notably `aggregate`, populated by the aggregator task) intact.
-    let status = json!({
-        "status": { "observedScoreString": score_json }
-    });
-    api.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status))
-        .await?;
-
-    Ok(Action::requeue(Duration::from_secs(300)))
+    // Keep the cadence fast enough that selector drift (a device
+    // relabeled mid-flight, or one joining the fleet) converges
+    // within 30 s without needing a snapshots-change subscription.
+    Ok(Action::requeue(Duration::from_secs(30)))
 }
 
-async fn cleanup(obj: Arc<Deployment>, kv: &Store) -> Result<Action, Error> {
+async fn cleanup(obj: Arc<Deployment>, ctx: &Context) -> Result<Action, Error> {
     let name = obj.name_any();
-    for device_id in &obj.spec.target_devices {
+    // Resolve against current snapshot to find KV entries to clean.
+    // A device that left the fleet between apply and delete may
+    // have a stale KV entry we never clean here — accepted: the
+    // NATS bucket eventually hits its per-key history limit and the
+    // orphan ages out. If that becomes a real problem, record the
+    // resolved list in CR status and use it here instead.
+    let snapshot = { ctx.snapshots.lock().await.clone() };
+    let targets = resolve_targets(&obj.spec.target_selector, &snapshot);
+
+    for device_id in &targets {
         let key = kv_key(device_id, &name);
-        kv.delete(&key)
+        ctx.kv
+            .delete(&key)
             .await
             .map_err(|e| Error::Kv(e.to_string()))?;
         tracing::info!(%key, "deleted desired state");
diff --git a/iot/iot-operator-v0/src/crd.rs b/iot/iot-operator-v0/src/crd.rs
index 95bda4f2..1603fece 100644
--- a/iot/iot-operator-v0/src/crd.rs
+++ b/iot/iot-operator-v0/src/crd.rs
@@ -1,3 +1,5 @@
+use std::collections::BTreeMap;
+
 use kube::CustomResource;
 use schemars::JsonSchema;
 use schemars::schema::{
@@ -17,12 +19,47 @@ use serde::{Deserialize, Serialize};
 )]
 #[serde(rename_all = "camelCase")]
 pub struct DeploymentSpec {
-    pub target_devices: Vec<String>,
+    /// Label selector that picks which devices in the fleet run this
+    /// deployment. Devices publish their labels via `AgentStatus`;
+    /// the operator resolves the selector against the current fleet
+    /// snapshot on every reconcile.
+    ///
+    /// Matches the Kubernetes `LabelSelector.matchLabels` wire format
+    /// so the CLI, dashboards, and kubectl tooling all speak the
+    /// same selector grammar. Expressions (`In`, `NotIn`, etc.) are
+    /// deferred until there's a concrete need.
+    pub target_selector: TargetSelector,
     #[schemars(schema_with = "score_payload_schema")]
     pub score: ScorePayload,
     pub rollout: Rollout,
 }
 
+/// Subset of Kubernetes `LabelSelector` — only `matchLabels` for
+/// now. A device matches iff its label map is a superset of
+/// `matchLabels`. An empty `matchLabels` matches nothing on
+/// purpose (a deployment targeting "all devices" is almost always a
+/// user error; if it's genuinely intended, pick a label every
+/// device has, e.g. `managed-by=iot-agent`).
+#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema, PartialEq, Eq)]
+#[serde(rename_all = "camelCase")]
+pub struct TargetSelector {
+    #[serde(default)]
+    pub match_labels: BTreeMap<String, String>,
+}
+
+impl TargetSelector {
+    /// True iff every (k, v) in `match_labels` is present in `labels`.
+    /// Empty selector matches nothing (see struct docs).
+    pub fn matches(&self, labels: &BTreeMap<String, String>) -> bool {
+        if self.match_labels.is_empty() {
+            return false;
+        }
+        self.match_labels
+            .iter()
+            .all(|(k, v)| labels.get(k).is_some_and(|got| got == v))
+    }
+}
+
 #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
 pub struct ScorePayload {
     #[serde(rename = "type")]
@@ -119,14 +156,12 @@ pub struct DeploymentStatus {
 pub struct DeploymentAggregate {
     /// Count of devices where the deployment is in each phase.
     /// Always populated (zeros are valid) so the operator can patch
-    /// the whole subtree atomically.
+    /// the whole subtree atomically. With selector-based targeting
+    /// there is no "unreported" counterpart — a device that has
+    /// never heartbeated is invisible to the selector machinery.
     pub succeeded: u32,
     pub failed: u32,
     pub pending: u32,
-    /// Count of target devices that haven't yet heartbeated at all.
-    /// "failed to join fleet" vs. "failed to reconcile" — different
-    /// signals, different remedies.
-    pub unreported: u32,
     /// Device id of the most recent device reporting a failure,
     /// with its short error message. Surfaces the top failure to
     /// the CR's status without needing per-device subresource
diff --git a/iot/iot-operator-v0/src/main.rs b/iot/iot-operator-v0/src/main.rs
index 79a7370b..bd4c20e2 100644
--- a/iot/iot-operator-v0/src/main.rs
+++ b/iot/iot-operator-v0/src/main.rs
@@ -95,12 +95,16 @@ async fn run(nats_url: &str, bucket: &str) -> Result<()> {
 
     let client = Client::try_default().await?;
 
+    // Shared fleet snapshot: aggregator writes, controller reads.
+    // One Arc<Mutex<BTreeMap<String, AgentStatus>>> lives through both tasks.
+    let snapshots = aggregate::new_snapshots();
+
     // Controller + aggregator run concurrently. If either returns
     // an error, tear down the whole process — kube-rs's Controller
     // already handles transient reconcile failures internally.
     let ctl_client = client.clone();
     tokio::select! {
-        r = controller::run(ctl_client, desired_state_kv) => r,
-        r = aggregate::run(client, status_kv) => r,
+        r = controller::run(ctl_client, desired_state_kv, snapshots.clone()) => r,
+        r = aggregate::run(client, status_kv, snapshots) => r,
     }
 }
diff --git a/iot/scripts/smoke-a4.sh b/iot/scripts/smoke-a4.sh
index c7fe913a..4e1402fd 100755
--- a/iot/scripts/smoke-a4.sh
+++ b/iot/scripts/smoke-a4.sh
@@ -42,6 +42,11 @@ ARCH="${ARCH:-x86-64}"
 VM_NAME="${VM_NAME:-iot-demo-vm}"
 DEVICE_ID="${DEVICE_ID:-$VM_NAME}"
 GROUP="${GROUP:-group-a}"
+# The VM publishes these labels; the deployment selector below
+# targets `device=$DEVICE_ID` so a single-device smoke run picks
+# exactly this agent.
+VM_LABELS="${VM_LABELS:-group=$GROUP,device=$DEVICE_ID}"
+DEPLOY_SELECTOR="${DEPLOY_SELECTOR:-device=$DEVICE_ID}"
 LIBVIRT_URI="${LIBVIRT_URI:-qemu:///system}"
 NATS_NAMESPACE="${NATS_NAMESPACE:-iot-system}"
@@ -299,7 +304,7 @@ fi
     --arch "$EXAMPLE_ARCH" \
     --vm-name "$VM_NAME" \
     --device-id "$DEVICE_ID" \
-    --group "$GROUP" \
+    --labels "$VM_LABELS" \
     --agent-binary "$AGENT_BINARY" \
     --nats-url "nats://$NAT_GW:$NATS_NODE_PORT"
 )
@@ -372,7 +377,7 @@ if [[ "$AUTO" == "1" ]]; then
     cargo run -q -p example_iot_apply_deployment -- \
       --namespace "$DEPLOY_NS" \
       --name "$DEPLOY_NAME" \
-      --target-device "$DEVICE_ID" \
+      --to "$DEPLOY_SELECTOR" \
       --image "$V1_IMAGE" \
       --port "$DEPLOY_PORT"
   )
@@ -419,7 +424,7 @@ if [[ "$AUTO" == "1" ]]; then
     cargo run -q -p example_iot_apply_deployment -- \
       --namespace "$DEPLOY_NS" \
       --name "$DEPLOY_NAME" \
-      --target-device "$DEVICE_ID" \
+      --to "$DEPLOY_SELECTOR" \
       --image "$V2_IMAGE" \
       --port "$DEPLOY_PORT"
   )
@@ -445,7 +450,7 @@ if [[ "$AUTO" == "1" ]]; then
     cargo run -q -p example_iot_apply_deployment -- \
       --namespace "$DEPLOY_NS" \
       --name "$DEPLOY_NAME" \
-      --target-device "$DEVICE_ID" \
+      --to "$DEPLOY_SELECTOR" \
       --delete
   )
   for _ in $(seq 1 60); do
@@ -485,17 +490,17 @@
$(printf '\033[1mApply an nginx deployment (typed Rust):\033[0m\n')
   cargo run -q -p example_iot_apply_deployment -- \\
     --namespace $DEPLOY_NS \\
     --name $DEPLOY_NAME \\
-    --target-device $DEVICE_ID \\
+    --to $DEPLOY_SELECTOR \\
     --image docker.io/library/nginx:latest
 
$(printf '\033[1mUpgrade it:\033[0m\n')
   cargo run -q -p example_iot_apply_deployment -- \\
-    --namespace $DEPLOY_NS --name $DEPLOY_NAME --target-device $DEVICE_ID \\
+    --namespace $DEPLOY_NS --name $DEPLOY_NAME --to $DEPLOY_SELECTOR \\
     --image docker.io/library/nginx:1.26
 
$(printf '\033[1mPreview the CR as JSON (and apply via kubectl):\033[0m\n')
   cargo run -q -p example_iot_apply_deployment -- \\
-    --name $DEPLOY_NAME --target-device $DEVICE_ID \\
+    --name $DEPLOY_NAME --to $DEPLOY_SELECTOR \\
     --image docker.io/library/nginx:latest --print | kubectl apply -f -
 
$(printf '\033[1mConnect to the
device:\033[0m\n') -- 2.39.5
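For reference, a CR exercising the new selector path end to end,
shaped after the regenerated CRD above (namespace, names and image
are illustrative; the `data` subtree is just the serialized
PodmanV0Score payload, whose shape is owned by the Rust type):

    apiVersion: iot.nationtech.io/v1alpha1
    kind: Deployment
    metadata:
      name: hello-world
      namespace: iot-system
    spec:
      targetSelector:
        matchLabels:
          group: site-a
      rollout:
        strategy: Immediate
      score:
        type: PodmanV0
        data:
          services:
            - name: hello-world
              image: docker.io/library/nginx:latest

Any device whose heartbeat labels are a superset of matchLabels
(e.g. group=site-a plus anything else) picks this up on the next
reconcile; the --print output of example_iot_apply_deployment is the
JSON form of the same CR.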