Jean-Gabriel Gill-Couture 37e69b36cf feat(iot-operator): aggregate agent-status into DeploymentStatus.aggregate
The operator watches the `agent-status` bucket, keeps a per-device
snapshot in memory, and folds it into each Deployment CR's
`.status.aggregate` subtree every 5 seconds. This answers the user's
stated requirement ("CRD .status reflect-back: per-device
succeeded/failed counts + recent log lines"): the data now lives in
the CR itself, observable via `kubectl get -o jsonpath` or any UI
that speaks k8s status subresources.

**Shape (in iot/iot-operator-v0/src/crd.rs)**

  DeploymentStatus {
    observed_score_string,   // unchanged; the controller uses it for change detection
    aggregate: Option<{
      succeeded: u32,        // devices with Phase::Running
      failed: u32,           // devices with Phase::Failed
      pending: u32,          // devices with Phase::Pending or
                             // reported-but-no-phase-entry-yet
      unreported: u32,       // target devices that never heartbeated
      last_error: Option<{   // most recent failing device + short msg
        device_id, message, at
      }>,
      recent_events: Vec<{   // last-N events across the fleet, newest first
        at, severity, device_id, message, deployment
      }>,
      last_heartbeat_at,     // freshness signal for the whole fleet
    }>
  }
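
  Roughly, in Rust (an illustrative sketch: the derives follow the
  usual kube/schemars CRD pattern, struct names are guesses, and
  timestamps are shown as plain RFC 3339 strings; the real
  definitions live in crd.rs):

    use schemars::JsonSchema;
    use serde::{Deserialize, Serialize};

    #[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct DeploymentStatus {
        pub observed_score_string: Option<String>,
        pub aggregate: Option<DeploymentAggregate>,
    }

    #[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct DeploymentAggregate {
        pub succeeded: u32,
        pub failed: u32,
        pub pending: u32,
        pub unreported: u32,
        pub last_error: Option<LastError>,
        pub recent_events: Vec<FleetEvent>,    // newest first, capped at N
        pub last_heartbeat_at: Option<String>, // RFC 3339; fleet freshness
    }

    #[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct LastError {
        pub device_id: String,
        pub message: String,
        pub at: String,
    }

    #[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
    #[serde(rename_all = "camelCase")]
    pub struct FleetEvent {
        pub at: String,
        pub severity: String,
        pub device_id: String,
        pub message: String,
        pub deployment: String,
    }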

**New module** `iot/iot-operator-v0/src/aggregate.rs`

  - `watch_status_bucket`: subscribes to `status.>` on the
    agent-status bucket and maintains a `BTreeMap<device_id, AgentStatus>`
    in memory. Malformed payloads and malformed keys are logged and
    skipped, so the snapshot map always holds the latest good shape.
  - `aggregate_loop`: 5 s ticker. Per tick: list Deployment CRs,
    clone the snapshot (no lock held across network calls), compute
    each CR's aggregate, and JSON-Merge-Patch `.status.aggregate`.
    The merge patch composes cleanly with the controller's
    `observedScoreString` patch; neither clobbers the other.
  - `compute_aggregate` pure fn: the classification logic lives in
    one place, and four unit tests pin its behaviour (counts +
    unreported, reported-but-no-phase-entry = pending, event filter
    matches the deployment name only, status-key parser). Sketches of
    both fns follow below.
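
  Condensed sketch of the watcher (assumes the async-nats JetStream
  KV API; `AgentStatus` below is an illustrative stand-in, not the
  contract type):

    use std::collections::BTreeMap;
    use std::sync::{Arc, Mutex};

    use async_nats::jetstream::kv::Store;
    use futures_util::StreamExt;

    /// Illustrative stand-in for the real contract type.
    #[derive(Clone, serde::Deserialize)]
    pub struct AgentStatus {
        pub phase: Option<String>,
    }

    /// Shared snapshot: device_id -> latest good AgentStatus.
    pub type Snapshot = Arc<Mutex<BTreeMap<String, AgentStatus>>>;

    pub async fn watch_status_bucket(bucket: Store, snapshot: Snapshot) -> anyhow::Result<()> {
        // Keys look like `status.<device_id>`; watch the whole subtree.
        let mut watch = bucket.watch("status.>").await?;
        while let Some(entry) = watch.next().await {
            let entry = entry?;
            // Malformed key: log and skip, keep the last good snapshot.
            let Some(device_id) = entry.key.strip_prefix("status.") else {
                tracing::warn!(key = %entry.key, "unparseable status key, skipping");
                continue;
            };
            // Malformed payload: same log-and-skip policy.
            match serde_json::from_slice::<AgentStatus>(&entry.value) {
                Ok(status) => {
                    snapshot.lock().unwrap().insert(device_id.to_owned(), status);
                }
                Err(err) => tracing::warn!(%device_id, %err, "malformed payload, skipping"),
            }
        }
        Ok(())
    }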
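
  And the pure classification fn, reusing the illustrative types
  above (the real signature in aggregate.rs may differ, e.g. in how
  the CR's target device list arrives):

    pub fn compute_aggregate(
        targets: &[String],                       // devices this CR targets
        snapshot: &BTreeMap<String, AgentStatus>, // latest per-device reports
    ) -> DeploymentAggregate {
        let mut agg = DeploymentAggregate::default();
        for device_id in targets {
            match snapshot.get(device_id) {
                // Target device that never heartbeated.
                None => agg.unreported += 1,
                Some(status) => match status.phase.as_deref() {
                    Some("Running") => agg.succeeded += 1,
                    Some("Failed") => agg.failed += 1,
                    // Phase::Pending, or reported with no phase entry yet.
                    _ => agg.pending += 1,
                },
            }
        }
        agg
    }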

**Operator wiring** (`main.rs`)

  `run()` now opens *both* KV buckets at startup, spawns the
  controller and the aggregator concurrently via
  `tokio::select!`. Either returning an error tears the process
  down; kube-rs's Controller already absorbs transient reconcile
  errors internally, so anything that escapes is genuinely fatal.
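
  Sketch of that wiring (bucket names are assumptions, and
  `run_controller` is a made-up helper; `aggregate_loop` is the
  entry point described above):

    use async_nats::jetstream;

    async fn run() -> anyhow::Result<()> {
        let client = kube::Client::try_default().await?;
        let js = jetstream::new(async_nats::connect("nats://...").await?);

        // Open both buckets up front so a missing bucket fails fast.
        let scores = js.get_key_value("deployment-scores").await?; // assumed name
        let statuses = js.get_key_value("agent-status").await?;
        let snapshot = Snapshot::default();

        // First branch to return an error tears the whole process down.
        tokio::select! {
            res = run_controller(client.clone(), scores) => res,
            res = aggregate_loop(client, statuses, snapshot) => res,
        }
    }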

**Controller tweak**

  The apply path's `patch_status` was rebuilding the whole
  `DeploymentStatus` struct, which would clobber the aggregator's
  writes. Switched to raw JSON-Merge-Patch for the
  `observedScoreString` field only. Behaviour preserved, aggregate
  subtree left intact.
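
  With kube's typed API the field-scoped patch stays small (a
  sketch; `Deployment` is the CRD type from crd.rs and the fn name
  is made up):

    use kube::api::{Api, Patch, PatchParams};

    async fn patch_observed_score(
        api: &Api<Deployment>,
        name: &str,
        score: &str,
    ) -> kube::Result<()> {
        // A JSON merge patch only replaces the keys it names, so
        // `.status.aggregate` (owned by the aggregator) survives.
        let patch = serde_json::json!({
            "status": { "observedScoreString": score }
        });
        api.patch_status(name, &PatchParams::default(), &Patch::Merge(&patch))
            .await?;
        Ok(())
    }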

**Smoke assertion** (smoke-a4.sh --auto)

  After apply + curl succeeds, the --auto path now asserts that
  `kubectl get deployment.iot.nationtech.io ... -o
  jsonpath='{.status.aggregate.succeeded}'` reaches 1 within
  60 s. This proves the full agent → status bucket → operator
  aggregate → CRD status loop, end to end.
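
  In the script, the assertion might look like this poll loop
  (`demo` is a placeholder CR name):

    # Poll .status.aggregate.succeeded until it reaches 1 or 60 s elapse.
    deadline=$((SECONDS + 60))
    until [ "$(kubectl get deployment.iot.nationtech.io demo \
          -o jsonpath='{.status.aggregate.succeeded}')" = "1" ]; do
      if [ "$SECONDS" -ge "$deadline" ]; then
        echo "aggregate.succeeded never reached 1 within 60 s" >&2
        exit 1
      fi
      sleep 2
    done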

Verified locally: `cargo test -p iot-operator-v0 --lib` 4/4 green,
`cargo check --all-targets --all-features` clean.
2026-04-21 21:50:00 -04:00

**Changed file** `harmony/iot/iot-operator-v0/Cargo.toml`
[package]
name = "iot-operator-v0"
version = "0.1.0"
edition = "2024"
rust-version = "1.85"

[dependencies]
harmony = { path = "../../harmony" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
chrono = { workspace = true, features = ["serde"] }
kube = { workspace = true, features = ["runtime", "derive"] }
k8s-openapi.workspace = true
async-nats = { workspace = true }
serde.workspace = true
serde_json.workspace = true
schemars = "0.8.22"
tokio.workspace = true
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
anyhow.workspace = true
clap.workspace = true
futures-util = { workspace = true }
thiserror.workspace = true