The operator watches the `agent-status` bucket, keeps a per-device
snapshot in memory, and folds it into each Deployment CR's
`.status.aggregate` subtree every 5 seconds. The user's stated
requirement, "CRD .status reflect-back: per-device succeeded/failed
counts + recent log lines", now lives in the CR itself, observable via
`kubectl get -o jsonpath` or any UI that speaks k8s status
subresources.
**Shape** (in `iot/iot-operator-v0/src/crd.rs`)
```
DeploymentStatus {
    observed_score_string,     // unchanged; controller change-detect
    aggregate: Option<{
        succeeded: u32,        // devices with Phase::Running
        failed: u32,           // devices with Phase::Failed
        pending: u32,          // devices with Phase::Pending or
                               // reported-but-no-phase-entry-yet
        unreported: u32,       // target devices that never heartbeated
        last_error: Option<{   // most recent failing device + short msg
            device_id, message, at
        }>,
        recent_events: Vec<{   // last-N events across the fleet, newest first
            at, severity, device_id, message, deployment
        }>,
        last_heartbeat_at,     // freshness signal for the whole fleet
    }>
}
```
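A minimal Rust sketch of how these types might look in `crd.rs`. Field names follow the shape above; the derives, the camelCase serde renames, and the string timestamps are assumptions (the real code likely uses chrono types):

```rust
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

// Hypothetical sketch; the real definitions in crd.rs may differ.
#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DeploymentStatus {
    /// Unchanged; the controller uses this for change detection.
    pub observed_score_string: Option<String>,
    /// Written only by the aggregator loop.
    pub aggregate: Option<DeploymentAggregate>,
}

#[derive(Clone, Debug, Default, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DeploymentAggregate {
    pub succeeded: u32,  // devices with Phase::Running
    pub failed: u32,     // devices with Phase::Failed
    pub pending: u32,    // Phase::Pending, or reported but no phase entry yet
    pub unreported: u32, // target devices that never heartbeated
    pub last_error: Option<AggregateError>,
    pub recent_events: Vec<AggregateEvent>, // last-N fleet events, newest first
    pub last_heartbeat_at: Option<String>,  // RFC 3339; fleet freshness signal
}

#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AggregateError {
    pub device_id: String,
    pub message: String,
    pub at: String, // RFC 3339
}

#[derive(Clone, Debug, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AggregateEvent {
    pub at: String, // RFC 3339
    pub severity: String,
    pub device_id: String,
    pub message: String,
    pub deployment: String,
}
```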
**New module** `iot/iot-operator-v0/src/aggregate.rs`
- `watch_status_bucket`: subscribes to `status.>` on the agent-status
  bucket and maintains a `BTreeMap<device_id, AgentStatus>` in memory.
  Malformed payloads and malformed keys are logged and skipped, so the
  snapshot map always holds the latest good shape.
- `aggregate_loop`: 5 s ticker. Per tick: list the Deployment CRs,
  clone the snapshot (no lock held across network calls), compute each
  CR's aggregate, and JSON-Merge-Patch `.status.aggregate`. The merge
  patch composes cleanly with the controller's `observedScoreString`
  patch, so neither clobbers the other.
- `compute_aggregate` pure fn: the classification logic lives in one
  place and four unit tests pin its behaviour (counts + unreported,
  reported-but-no-phase-entry = pending, event filter matches the
  deployment name only, status-key parser). A sketch follows this list.
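As a rough illustration, here is what the pure classification function and the status-key parser could look like; the `AgentStatus` shape, the `Phase` enum, and the `status.<device_id>` key format are assumptions based on the description above:

```rust
use std::collections::BTreeMap;

// Hypothetical in-memory shape kept by watch_status_bucket; real fields may differ.
#[derive(Clone, Debug)]
pub struct AgentStatus {
    pub phase: Option<Phase>, // None = reported, but no phase entry yet
}

#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum Phase {
    Running,
    Failed,
    Pending,
}

#[derive(Debug, Default, PartialEq, Eq)]
pub struct Counts {
    pub succeeded: u32,
    pub failed: u32,
    pub pending: u32,
    pub unreported: u32,
}

/// Pure classification: targets come from the Deployment spec, the snapshot
/// from the status bucket. No I/O, so unit tests can pin it cheaply.
pub fn compute_aggregate(
    targets: &[String],
    snapshot: &BTreeMap<String, AgentStatus>,
) -> Counts {
    let mut c = Counts::default();
    for device_id in targets {
        match snapshot.get(device_id) {
            None => c.unreported += 1, // target device that never heartbeated
            Some(s) => match s.phase {
                Some(Phase::Running) => c.succeeded += 1,
                Some(Phase::Failed) => c.failed += 1,
                // Phase::Pending, or reported-but-no-phase-entry-yet
                Some(Phase::Pending) | None => c.pending += 1,
            },
        }
    }
    c
}

/// Parses a status-bucket key like "status.<device_id>"; malformed keys yield
/// None so the watcher can log-and-skip.
pub fn parse_status_key(key: &str) -> Option<&str> {
    key.strip_prefix("status.").filter(|id| !id.is_empty())
}
```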
**Operator wiring** (`main.rs`)
`run()` now opens *both* KV buckets at startup and runs the controller
and the aggregator concurrently via `tokio::select!`. Either returning
an error tears the process down; kube-rs's Controller already absorbs
transient reconcile errors internally, so anything that escapes is
genuinely fatal.
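A sketch of that wiring under stated assumptions: the second bucket name and the `run_controller`/`run_aggregator` helpers are hypothetical placeholders, only the `agent-status` bucket name comes from the text above.

```rust
use async_nats::jetstream::{self, kv::Store};

// Placeholder signatures standing in for the real controller / aggregator tasks.
async fn run_controller(_client: kube::Client, _scores: Store) -> anyhow::Result<()> { Ok(()) }
async fn run_aggregator(_client: kube::Client, _status: Store) -> anyhow::Result<()> { Ok(()) }

async fn run() -> anyhow::Result<()> {
    let client = kube::Client::try_default().await?;
    let nats = async_nats::connect("nats://localhost:4222").await?;
    let js = jetstream::new(nats);

    // Open both KV buckets up front so a missing bucket fails fast at startup.
    let score_bucket = js.get_key_value("deployment-scores").await?; // hypothetical name
    let status_bucket = js.get_key_value("agent-status").await?;

    // Drive controller and aggregator concurrently; if either returns an error,
    // select! resolves and run() propagates it, tearing the process down.
    tokio::select! {
        res = run_controller(client.clone(), score_bucket) => res,
        res = run_aggregator(client, status_bucket) => res,
    }
}
```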
**Controller tweak**
The apply path's `patch_status` was rebuilding the whole
`DeploymentStatus` struct, which would clobber the aggregator's writes.
It now sends a raw JSON-Merge-Patch for the `observedScoreString` field
only. Behaviour is preserved and the aggregate subtree is left intact.
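A sketch of that narrower patch using kube's merge-patch support; the function name is hypothetical and `K` stands in for the Deployment CRD type from `crd.rs`:

```rust
use kube::api::{Api, Patch, PatchParams};
use serde::de::DeserializeOwned;

/// Merge-patch only `.status.observedScoreString`. Because a JSON merge patch
/// replaces just the keys it mentions, `.status.aggregate` (written by the
/// aggregator) is left intact.
async fn patch_observed_score<K>(
    api: &Api<K>,
    name: &str,
    score_string: &str,
) -> kube::Result<K>
where
    K: Clone + std::fmt::Debug + DeserializeOwned,
{
    let patch = serde_json::json!({
        "status": { "observedScoreString": score_string }
    });
    api.patch_status(name, &PatchParams::default(), &Patch::Merge(patch))
        .await
}
```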
**Smoke assertion** (`smoke-a4.sh --auto`)
After apply + curl succeeds, the --auto path now asserts that
`kubectl get deployment.iot.nationtech.io ... -o
jsonpath='{.status.aggregate.succeeded}'` reaches 1 within 60 s. This
proves the full agent → status bucket → operator aggregate → CRD status
loop, end to end.
Verified locally: `cargo test -p iot-operator-v0 --lib` 4/4 green,
`cargo check --all-targets --all-features` clean.
**Crate manifest** (`iot/iot-operator-v0/Cargo.toml`)
```toml
[package]
name = "iot-operator-v0"
version = "0.1.0"
edition = "2024"
rust-version = "1.85"

[dependencies]
harmony = { path = "../../harmony" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
chrono = { workspace = true, features = ["serde"] }
kube = { workspace = true, features = ["runtime", "derive"] }
k8s-openapi.workspace = true
async-nats = { workspace = true }
serde.workspace = true
serde_json.workspace = true
schemars = "0.8.22"
tokio.workspace = true
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
anyhow.workspace = true
clap.workspace = true
futures-util = { workspace = true }
thiserror.workspace = true
```