diff --git a/Cargo.lock b/Cargo.lock index e2154e7a..11d14ad7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3179,6 +3179,25 @@ dependencies = [ "tokio", ] +[[package]] +name = "example_iot_load_test" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-nats", + "chrono", + "clap", + "harmony-reconciler-contracts", + "iot-operator-v0", + "k8s-openapi", + "kube", + "rand 0.9.2", + "serde_json", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "example_iot_nats_install" version = "0.1.0" @@ -3758,6 +3777,7 @@ dependencies = [ "harmony_types", "serde", "serde_json", + "thiserror 2.0.18", ] [[package]] diff --git a/ROADMAP/iot_platform/chapter_4_aggregation_scale.md b/ROADMAP/iot_platform/chapter_4_aggregation_scale.md new file mode 100644 index 00000000..d6fe82f3 --- /dev/null +++ b/ROADMAP/iot_platform/chapter_4_aggregation_scale.md @@ -0,0 +1,406 @@ +# Chapter 4 — Aggregation architecture at IoT scale + +> **Status: design draft (2026-04-22)** +> +> Design document for the Chapter 4 aggregation rework. Review first, +> implement after. Supersedes the Chapter 2 aggregator's O(deployments × devices) +> per-tick recompute, which works for a 10-device smoke but breaks +> the moment a real fleet lands. + +## 1. Why now + +We have no real deployment in the field yet. That's a liability when +shipping (no user, no revenue) but a gift when designing: we can move +the data model before customers depend on it. After a partner fleet +lands, changing the aggregation substrate is a multi-quarter +migration. Doing it now is days of work. + +Chapter 2's aggregator was the right "make it work" design for a +walking-skeleton proof. It's the wrong "make it scale" design for a +partner deployment of even a few hundred devices, let alone the +fleet sizes the product thesis targets. This chapter replaces it. + +## 2. What's wrong today + +**Per-tick cost, current design.** Every 5 seconds, for each +Deployment CR, resolve the selector against the full device snapshot +and fold into an aggregate: + +``` +O(deployments × devices) per tick ++ 1 kube patch per CR per tick +``` + +At 10k deployments × 1M devices, that's 10^10 selector evaluations +and 10k apiserver patches every 5 s. Nothing resembles viable there. + +**What else goes wrong at scale.** + +- The operator holds the full fleet snapshot in memory. 1M `AgentStatus` + payloads × a few kB each = GB of heap, dominated by `recent_events` + rings. +- Agent heartbeats publish the whole `AgentStatus` every 30 s — a lot + of bytes on the wire whose only incremental content is usually a + timestamp update. +- `agent-status` is a KV bucket. KV is designed for "latest value per + key," not "stream of state changes." We've been using it for both + roles and paying the worst of each. +- Logs are nowhere yet (good — this is the moment to put them in the + right place before we're committed). + +## 3. Design overview + +Shift to a **CQRS-style architecture** where devices write their +authoritative state, and the operator maintains incrementally-updated +aggregates driven by state-change events. 
+ +``` + device (N× agents) operator + ────────────────── ──────── + current state keys ───reads─▶ on cold-start: + (authoritative) walk keys → rebuild counters + then: stream consumer + state-change events ═ JS stream═▶ ± counters per event + (delta stream) ± update reverse index + on tick (1 Hz): + device_info keys ───reads─▶ patch .status for dirty deployments + (labels, inventory) + + logs ───at-least-once NATS subj────▶ not stored centrally + (streamed on query) +``` + +Three substrates, each chosen for its fit: + +- **JetStream KV, per-device keys** — device-authoritative state. + Cheap to read when needed, never scanned globally at scale. +- **JetStream stream, per-device events** — ordered delta feed. + Operator consumers replay on restart, consume incrementally during + steady state. +- **Plain NATS subjects, logs** — at-least-once pub/sub, device-side + buffering (~10k lines), streamed on query. + +## 4. Data model + +### 4.1 NATS KV buckets + +**`device-info`** — static-ish facts per device, infrequent updates. + +| Key | Value | Written by | Read by | +|-----|-------|------------|---------| +| `info.` | `DeviceInfo` (labels, inventory, agent_version) | agent on startup + label change | operator (selector resolution, inventory display) | + +**`device-state`** — current phase per deployment per device. +Authoritative source of truth for "what's running where." + +| Key | Value | Written by | Read by | +|-----|-------|------------|---------| +| `state..` | `DeploymentState` (phase, last_event_at, last_error) | agent on reconcile transition | operator on cold-start only | + +One key per (device, deployment) pair. Natural TTL via JetStream KV +per-key history — lets us cap the keyspace. + +**`device-heartbeat`** — liveness only. Tiny payload, frequent +updates. + +| Key | Value | Written by | Read by | +|-----|-------|------------|---------| +| `heartbeat.` | `{ timestamp }` (32 bytes) | agent every 30s | operator (stale detection) | + +Separate from `device-state` so routine heartbeats don't churn the +state keys or emit spurious state-change events. + +### 4.2 NATS JetStream stream + +**`device-events`** — ordered delta feed for operator aggregation. + +- Subject: `events.state..` +- Payload: `StateChangeEvent { from: Phase, to: Phase, at, last_error }` +- Retention: time-based (e.g. 24h) — consumers that fall further + behind than retention rebuild from `device-state` KV on recovery. +- Agents emit one event per phase transition, **not** per heartbeat. + +Separate stream for **event log** (user-facing reconcile log events): + +- Subject: `events.log.` +- Payload: `LogEvent { at, severity, message, deployment? }` +- Retention: time-based (1h, enough for "show me what happened the + last few minutes" queries; the device's in-memory ring holds the + rest). + +### 4.3 Log transport (NOT JetStream) + +- Subject: `logs.` — plain pub/sub, at-least-once +- Not persisted by NATS +- Device buffers last ~10k lines in a ring buffer +- Query protocol: request-reply on `logs..query` + - Device responds with buffer contents, then streams live tail + until the query closes + +This is a dedicated transport because structured logs at fleet scale +(1M devices × 1k lines/h = 1B messages/h) would crush JetStream's +per-subject storage without adding operator-visible value. Operators +only look at logs on-demand, per-device; device-side buffering +matches the access pattern. 
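A minimal sketch of the device-side responder for that query protocol, to make the request-reply shape concrete. The subject layout follows this section; the `LogRing` type, its capacity constant, the `serve_log_queries` name, and the single-shot reply (no live-tail phase) are illustrative assumptions, not the agreed implementation:

```rust
use std::collections::VecDeque;
use std::sync::{Arc, Mutex};

use futures_util::StreamExt;

const RING_CAPACITY: usize = 10_000;

/// In-memory log ring shared between the agent's logging path and the
/// query responder. Oldest lines fall off the front.
#[derive(Default)]
struct LogRing {
    lines: VecDeque<String>,
}

impl LogRing {
    fn push(&mut self, line: String) {
        if self.lines.len() == RING_CAPACITY {
            self.lines.pop_front();
        }
        self.lines.push_back(line);
    }
}

/// Answer `logs.<device_id>.query` over plain NATS request-reply:
/// dump the current ring contents to whoever asked. Nothing is
/// persisted server-side; a dropped reply just means re-running the
/// query.
async fn serve_log_queries(
    client: async_nats::Client,
    device_id: String,
    ring: Arc<Mutex<LogRing>>,
) -> anyhow::Result<()> {
    let mut sub = client.subscribe(format!("logs.{device_id}.query")).await?;
    while let Some(msg) = sub.next().await {
        // Only answer requests that carry a reply subject.
        let Some(reply) = msg.reply else { continue };
        let dump = {
            let ring = ring.lock().expect("log ring poisoned");
            ring.lines.iter().cloned().collect::<Vec<_>>().join("\n")
        };
        client.publish(reply, dump.into()).await?;
    }
    Ok(())
}
```

The requester side (the M6 CLI) sits on the same subject: send a request to `logs.<device_id>.query`, print the dump, then keep the reply inbox subscribed for the live tail the device streams until the query closes.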
+ +### 4.4 CRD fields + +Minimal change from Chapter 2: + +- `.status.aggregate.succeeded | failed | pending` — now sourced + from counters, not per-tick fold. +- `.status.aggregate.last_error` — updated on `to: Failed` events. +- `.status.aggregate.last_heartbeat_at` — from the per-deployment + latest event. +- `.status.aggregate.recent_events` — bounded per-deployment ring, + updated on event arrival. +- **Drop** `.status.aggregate.unreported` (no meaningful definition + under selector-based targeting — already removed in the pre-chapter + cleanup). +- **Add** `.status.aggregate.stale: u32` — count of devices matching + the selector whose last heartbeat is older than a threshold + (default 5 min). This is the replacement for "unreported" that + makes sense at scale. Computed on tick from the operator's + reverse-indexed view, not per-device query. + +### 4.5 Operator in-memory state + +- **Counters** — `HashMap`, one entry + per CR, updated atomically on event arrival. +- **Reverse index** — `HashMap>`, + updated when a device's labels change or when a CR's selector + changes. Lets a state-change event find affected deployments in + O(deployments-matching-this-device) rather than O(all-deployments). +- **Last-error rollup** — per deployment, the most recent error + keyed by timestamp. +- **Recent-events ring** — per deployment, bounded by N (e.g. 10). +- **Dirty set** — deployments whose aggregate has changed since last + patch. Tick reads + clears this set; only dirty deployments get + patched. + +Operator heap is bounded by fleet + deployment count, not their +product. + +## 5. Counter invariants (the contract) + +Correctness rests on two rules: + +### 5.1 Device publishes exactly one transition per reconcile outcome + +Every reconcile results in a state. If the state differs from the +last published state for `(device, deployment)`, the agent: + +1. Writes the new state to `state..` KV (CAS + against expected-revision for multi-writer safety — only one + agent process per device, so contention is theoretical). +2. Publishes a `StateChangeEvent` to + `events.state..`. + +These two writes must be atomic from the agent's perspective — if +(1) succeeds and (2) fails (or vice versa), the agent retries until +both reach NATS. Worst case: a duplicate event on the stream; +counter handles duplicates via `from → to` structure (see 5.2). + +### 5.2 Counters are driven by transitions, not snapshots + +Each event carries `from: Phase, to: Phase`. Counter update is a +single atomic action: + +```rust +counters[(deployment, from)] -= 1; +counters[(deployment, to)] += 1; +``` + +Duplicates (same `from → to` replayed) are a no-op if `from` == +current phase for that (device, deployment) — the operator +cross-checks the device's current state in the reverse index before +applying. A duplicate past event is detected and ignored; a duplicate +current event is idempotent anyway (counters converge). + +### 5.3 The bootstrap transition + +A device's first-ever event for a deployment has `from: None` (or a +sentinel `Unassigned` variant): counter update is just `to` +increment. + +### 5.4 Device leaves fleet + +When a device's heartbeat goes stale past threshold + grace, OR +when its labels no longer match the deployment's selector: + +- Counters are decremented for every deployment the device was + previously contributing to (via the reverse index). +- The device's state keys aren't touched — they're the authoritative + record; a device re-joining resumes from them. 
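Pulling §5.1–5.4 together, the operator-side event application is one small routine. A minimal sketch with simplified stand-in types (bare `String` keys instead of the real CR/device identities, no reverse index): an event is applied only when its `from` matches the operator's current belief for the pair, which is what makes replayed past events no-ops and keeps the counters convergent:

```rust
use std::collections::{HashMap, HashSet};

#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)]
enum Phase {
    Running,
    Failed,
    Pending,
}

/// One entry from the `events.state.<device>.<deployment>` stream.
struct StateChangeEvent {
    device: String,
    deployment: String,
    from: Option<Phase>, // None = bootstrap transition (§5.3)
    to: Phase,
}

#[derive(Default)]
struct Aggregator {
    /// counters[(deployment, phase)] — driven by transitions only (§5.2).
    counters: HashMap<(String, Phase), i64>,
    /// Operator's belief about the current phase per (device, deployment).
    current: HashMap<(String, String), Phase>,
    /// Deployments whose aggregate changed since the last patch tick (§7).
    dirty: HashSet<String>,
}

impl Aggregator {
    fn apply(&mut self, ev: StateChangeEvent) {
        let pair = (ev.device.clone(), ev.deployment.clone());
        if let Some(from) = ev.from {
            // Replayed or stale event: its `from` no longer matches what
            // we believe this device is in for this deployment — ignore.
            if self.current.get(&pair) != Some(&from) {
                return;
            }
            *self.counters.entry((ev.deployment.clone(), from)).or_default() -= 1;
        } else if self.current.contains_key(&pair) {
            // Duplicate bootstrap event for a pair we already track.
            return;
        }
        *self.counters.entry((ev.deployment.clone(), ev.to)).or_default() += 1;
        self.current.insert(pair, ev.to);
        self.dirty.insert(ev.deployment);
    }
}

fn main() {
    let mut agg = Aggregator::default();
    let (dev, dep) = ("pi-01".to_string(), "hello-web".to_string());
    // Bootstrap, a normal transition, then a replay of that transition.
    agg.apply(StateChangeEvent {
        device: dev.clone(),
        deployment: dep.clone(),
        from: None,
        to: Phase::Pending,
    });
    agg.apply(StateChangeEvent {
        device: dev.clone(),
        deployment: dep.clone(),
        from: Some(Phase::Pending),
        to: Phase::Running,
    });
    agg.apply(StateChangeEvent {
        device: dev,
        deployment: dep.clone(),
        from: Some(Phase::Pending),
        to: Phase::Running,
    });
    assert_eq!(agg.counters[&(dep.clone(), Phase::Running)], 1);
    assert_eq!(agg.counters[&(dep, Phase::Pending)], 0);
}
```

The real aggregator routes `ev.deployment` through the reverse index to find the affected CRs and feeds the dirty set into the 1 Hz patch tick (§7).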
+ +### 5.5 CR created / selector changed + +The reverse index + counters are rebuilt for the affected CR by +walking `device-info` + `device-state` once (O(devices + states) +local NATS KV reads). Cheap for a single CR; happens at CR-apply +time, not on every tick. + +## 6. Cold-start protocol + +On operator process start: + +1. **Load CRs** — list `Deployment` CRs via kube API. Build the + reverse index skeleton (deployment → selector). +2. **Load device labels** — iterate `device-info` KV keys once. + Resolve each device against every CR's selector, populate the + reverse index device-side entries. O(devices × CRs), one-time, + in-memory. For 1M devices × 10k CRs this is 10^10 op but purely + local lookups (BTreeMap matches on label maps); back-of-envelope + has it at a few seconds to a minute on a modern CPU. +3. **Rebuild counters** — iterate `device-state` KV keys once. + For each `state..`, look up the matching + deployments from the reverse index and increment counters. +4. **Attach stream consumer** — durable consumer on + `events.state.>`, starting from the newest sequence at cold-start + moment. The KV walk was the "past"; the stream is the "future." +5. **Begin tick loop** — patch dirty CRs on a 1 Hz schedule. + +Cold-start time dominated by step 2, not step 3. An ArgoCD-style +"pause all reconciles during leader election / startup" envelope +keeps the CR patches from competing with the cold-start scans. + +**What if the operator falls behind the stream's retention window?** +Reset to step 3 (re-walk `device-state`). The KV is authoritative; +the stream is an accelerator. + +## 7. CR status patch cadence + +- Counter updates happen in memory, instantly. +- The **dirty set** captures which deployments' aggregates changed + since the last patch. +- A 1 Hz ticker reads + clears the dirty set, patches those CRs. +- Individual CR patches are debounced to at most once per second + — avoids hammering the apiserver when a deployment is mid-rollout + and devices are transitioning in a burst. + +Steady-state operator → apiserver traffic is proportional to the +rate of *interesting* changes, not to fleet size. + +## 8. Failure modes + +| Scenario | Detection | Recovery | +|---|---|---| +| Operator crash | k8s restarts the pod | Cold-start protocol §6 | +| Stream consumer falls behind retention | Stream API returns out-of-range | Re-run §6 step 3 (re-walk KV) | +| Agent publishes event but KV write fails | Agent-side local retry; event is replayed | Counter is idempotent per §5.2 | +| Agent writes KV but event publish fails | Agent-side local retry | Operator never sees the transition until retry succeeds; stale threshold catches the device if agent is permanently broken | +| Device's label change lost | Heartbeat carries current labels; stale entry aged out | Periodic sync (e.g. 1/h) re-scans `device-info` to catch drift | +| Duplicate event (retry) | `from == current` in reverse index | No-op (§5.2) | +| Out-of-order event (retry ordering) | Sequence number on event | Consumer tracks per-(device, deployment) last-applied sequence; old events ignored | + +## 9. Scale back-of-envelope + +**Target:** 1M devices, 10k deployments, p50 reconcile rate 1 event +per device per hour. + +- **Event volume.** 1M × (1/3600s) = 278 events/s. +- **Operator event-processing cost.** Each event touches a bounded + number of in-memory counters (via reverse index). At 278 eps, this + is ~1 µs-equivalent of CPU, ~0 network (JetStream local to operator). 
+- **Operator → apiserver patches.** Deployments change at a rate + far below event rate; debounced dirty-set drains limit patches to + a few per second even during bursty rollouts. +- **Operator memory.** Reverse index entries (device_id + set of + deployment keys) ≈ 200 bytes × 1M = 200 MB. Counters ≈ 10k × few + fields = negligible. Last-error + recent-events rings ≈ 10k × 10 + entries × 512 bytes = 50 MB. Total ~250 MB — fine. +- **Cold-start time.** 1M KV reads × amortized 0.1 ms (JetStream KV + is fast for key iteration) = 100 s. Acceptable for a + several-minute-once-per-release recovery window. If it becomes a + problem, chunk the walk and resume-from-checkpoint. +- **Stale device sweep.** On each tick, O(dirty set × reverse index + lookups). Stale detection itself is O(devices-whose-heartbeat-is-old); + a second, slower ticker (e.g. 30 s) scans the heartbeat KV for + entries older than threshold and emits synthetic "device went + stale" events that drive the same counter-decrement path. + +## 10. Schema migration + +`Deployment` CRD is still `v1alpha1`, not deployed anywhere, so no +migration machinery is needed for the CRD itself — we just change +the aggregate subtree definition. + +`harmony-reconciler-contracts::AgentStatus` is deprecated by this +chapter. Replaced by narrower wire types: + +- `DeviceInfo` — what `info.` stores +- `DeploymentState` — what `state..` stores +- `HeartbeatPayload` — what `heartbeat.` stores +- `StateChangeEvent` — what events stream emits +- `LogEvent` — what event-log stream emits + +The old `AgentStatus` type goes away when the old aggregator +goes away. Clean break, same CRD version. + +## 11. Implementation milestones + +Landing order, each a reviewable increment: + +1. **M1: new contracts crate shapes** — `DeviceInfo`, + `DeploymentState`, `HeartbeatPayload`, `StateChangeEvent`, + `LogEvent`. Round-trip serde tests. No runtime code changes yet. +2. **M2: agent-side rewrite** — agent writes the new KV shapes + + publishes state-change events + heartbeats. Old `AgentStatus` + publish path stays in parallel for the smoke to keep passing. +3. **M3: operator-side cold-start protocol** — new operator task + that walks the new KV buckets and builds in-memory counters. + Runs alongside the old aggregator; logs counter parity checks + against the legacy aggregator's output so we can verify + correctness before switching over. +4. **M4: operator-side event consumer** — attach the durable stream + consumer, drive counters incrementally. Parity checks still on. +5. **M5: flip CR patch source** — the new counter-backed aggregator + patches `.status.aggregate`, the legacy one goes read-only, then + deleted in the next commit. +6. **M6: logs subject + query protocol** — device-side ring buffer, + query API, a first CLI surface (`natiq logs device=X` or + equivalent) that drives it. +7. **M7: synthetic-scale test harness** — spin up 1k (then 10k) mock + agents in-process, drive a realistic event load through the + operator, measure + publish numbers. +8. **M8: delete legacy `AgentStatus`** — `harmony-reconciler-contracts` + cleanup, smoke-a4 updates. + +M1-M5 can land on one branch; M6 is adjacent work; M7-M8 close out. + +## 12. Open questions + +- **Multi-operator HA.** The design assumes one operator at a time. + Adding HA means either (a) one active + one standby operator with + NATS-based leader election, or (b) shared counter state in KV + instead of in-memory. (a) is simpler; (b) scales better. + Defer until a specific availability target demands it. 
+- **Counter-KV snapshots.** Should we periodically snapshot the + in-memory counter state to a `counters` KV bucket so cold-start + can resume from a recent snapshot + a short stream tail, instead + of always re-walking `device-state`? Probably yes once cold-start + time becomes an operational concern, but not in the initial cut. +- **Stream retention tuning.** 24h for `events.state.>` is a guess. + Real number depends on observed operator downtime p99. Initial + setting, tune from operational data. +- **Compaction policy for `device-state` KV.** JetStream KV + per-key history can grow unbounded if phases churn. Set + `max_history_per_key = 1` (keep only latest value) unless there's + a reason to keep transition history (there isn't — that's what + the events stream is for). +- **Agent crash before publishing state-change event.** Transition + is durably captured in the agent's local podman state; on agent + restart the reconcile loop re-observes the phase and either + re-publishes (if it differs from `state..`) or stays + silent. Correctness preserved at the cost of event-stream ordering + ambiguity during the crash window — acceptable. + +## 13. What this chapter deliberately does *not* change + +- CRD `.spec.target_selector` semantics — stays exactly as shipped. +- Operator's kube-rs controller loop for CR reconcile — stays as is. +- Helm chart structure (Chapter 3) — orthogonal. +- Authentication (Chapter Auth) — orthogonal. When that chapter + lands, every subject + KV bucket above will be re-scoped under + device-specific NATS credentials; the topology above doesn't need + to change for that to slot in. diff --git a/examples/iot_load_test/Cargo.toml b/examples/iot_load_test/Cargo.toml new file mode 100644 index 00000000..e83db8da --- /dev/null +++ b/examples/iot_load_test/Cargo.toml @@ -0,0 +1,24 @@ +[package] +name = "example_iot_load_test" +version.workspace = true +edition = "2024" +license.workspace = true + +[[bin]] +name = "iot_load_test" +path = "src/main.rs" + +[dependencies] +harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +iot-operator-v0 = { path = "../../iot/iot-operator-v0" } +async-nats = { workspace = true } +chrono = { workspace = true } +kube = { workspace = true, features = ["runtime", "derive"] } +k8s-openapi.workspace = true +serde_json = { workspace = true } +tokio = { workspace = true } +tracing = { workspace = true } +tracing-subscriber = { workspace = true } +anyhow = { workspace = true } +clap = { workspace = true } +rand = { workspace = true } diff --git a/examples/iot_load_test/src/main.rs b/examples/iot_load_test/src/main.rs new file mode 100644 index 00000000..61e37e3c --- /dev/null +++ b/examples/iot_load_test/src/main.rs @@ -0,0 +1,536 @@ +//! Load test for the IoT operator's `fleet_aggregator`. +//! +//! Simulates N devices across M Deployment CRs, each device pushing +//! a `DeploymentState` update to NATS every `--tick-ms`. Measures +//! throughput on both sides (devices → NATS and operator → kube +//! apiserver) and, at the end of the run, verifies each CR's +//! `.status.aggregate` counters sum to its `target_devices.len()`. +//! +//! Assumes an already-running stack: +//! - NATS reachable at `--nats-url` +//! - k8s cluster with the operator's CRD installed (KUBECONFIG) +//! - the operator process running against the same NATS + cluster +//! +//! The `iot/scripts/smoke-a4.sh` script brings all three up — pass +//! `--hold` to leave them running, then run this binary. +//! +//! Typical invocation: +//! +//! 
cargo run -q -p example_iot_load_test -- \ +//! --namespace iot-load \ +//! --groups 55,5,5,5,5,5,5,5,5,5 \ +//! --tick-ms 1000 \ +//! --duration-s 60 + +use anyhow::{Context, Result}; +use async_nats::jetstream::{self, kv}; +use chrono::Utc; +use clap::Parser; +use harmony_reconciler_contracts::{ + BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, DeploymentName, + DeploymentState, DeviceInfo, HeartbeatPayload, Id, Phase, device_heartbeat_key, + device_info_key, device_state_key, +}; +use iot_operator_v0::crd::{ + Deployment, DeploymentSpec, Rollout, RolloutStrategy, ScorePayload, +}; +use k8s_openapi::api::core::v1::Namespace; +use kube::api::{Api, DeleteParams, Patch, PatchParams, PostParams}; +use kube::Client; +use rand::Rng; +use std::collections::BTreeMap; +use std::sync::Arc; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::time::{Duration, Instant}; +use tokio::task::JoinSet; + +#[derive(Parser, Debug, Clone)] +#[command( + name = "iot_load_test", + about = "Synthetic load for the IoT operator's fleet_aggregator" +)] +struct Cli { + /// NATS URL (same one the operator connects to). + #[arg(long, default_value = "nats://localhost:4222")] + nats_url: String, + + /// k8s namespace for the load-test Deployment CRs. Created if + /// missing. + #[arg(long, default_value = "iot-load")] + namespace: String, + + /// Group shape — comma-separated device counts, one per CR. + /// Default: 100 devices over 10 groups (1 × 55 + 9 × 5). + #[arg(long, default_value = "55,5,5,5,5,5,5,5,5,5")] + groups: String, + + /// Per-device tick in ms. Each tick publishes one DeploymentState. + #[arg(long, default_value_t = 1000)] + tick_ms: u64, + + /// Heartbeat cadence in seconds (separate from the state tick). + #[arg(long, default_value_t = 30)] + heartbeat_s: u64, + + /// Total run duration in seconds before tearing down. + #[arg(long, default_value_t = 60)] + duration_s: u64, + + /// Report throughput every N seconds. + #[arg(long, default_value_t = 5)] + report_s: u64, + + /// Keep the CRs + KV entries in place after the run instead of + /// deleting them. Useful with HOLD=1 to inspect the steady-state + /// aggregate after the load finishes. + #[arg(long)] + keep: bool, +} + +/// Metrics collected across all device tasks. 
+#[derive(Default)] +struct Counters { + state_writes: AtomicU64, + heartbeat_writes: AtomicU64, + errors: AtomicU64, +} + +#[tokio::main] +async fn main() -> Result<()> { + tracing_subscriber::fmt() + .with_env_filter(tracing_subscriber::EnvFilter::from_default_env()) + .init(); + + let cli = Cli::parse(); + let group_sizes = parse_groups(&cli.groups)?; + let total: usize = group_sizes.iter().sum(); + + tracing::info!( + devices = total, + groups = group_sizes.len(), + shape = ?group_sizes, + tick_ms = cli.tick_ms, + duration_s = cli.duration_s, + "iot_load_test starting" + ); + + // --- NATS setup ---------------------------------------------------------- + let nc = async_nats::connect(&cli.nats_url) + .await + .with_context(|| format!("connecting to NATS at {}", cli.nats_url))?; + let js = jetstream::new(nc); + let info_bucket = open_bucket(&js, BUCKET_DEVICE_INFO).await?; + let state_bucket = open_bucket(&js, BUCKET_DEVICE_STATE).await?; + let heartbeat_bucket = open_bucket(&js, BUCKET_DEVICE_HEARTBEAT).await?; + + // --- kube setup ---------------------------------------------------------- + let client = Client::try_default().await.context("kube client")?; + ensure_namespace(&client, &cli.namespace).await?; + let deployments: Api = Api::namespaced(client.clone(), &cli.namespace); + + // --- plan groups + device ids -------------------------------------------- + let plan = build_plan(&group_sizes); + apply_crs(&deployments, &plan).await?; + publish_device_infos(&info_bucket, &plan).await?; + + // --- spawn simulators ---------------------------------------------------- + let counters = Arc::new(Counters::default()); + let mut sims = JoinSet::new(); + + let tick = Duration::from_millis(cli.tick_ms); + let hb_tick = Duration::from_secs(cli.heartbeat_s); + for device in &plan.devices { + let device = Arc::new(device.clone()); + sims.spawn(simulate_state_loop( + device.clone(), + state_bucket.clone(), + counters.clone(), + tick, + )); + sims.spawn(simulate_heartbeat_loop( + device.clone(), + heartbeat_bucket.clone(), + counters.clone(), + hb_tick, + )); + } + + // --- metrics reporter ---------------------------------------------------- + let report_tick = Duration::from_secs(cli.report_s); + let reporter_counters = counters.clone(); + let reporter = tokio::spawn(async move { + let mut ticker = tokio::time::interval(report_tick); + ticker.tick().await; // skip immediate fire + let mut prev_state = 0u64; + let mut prev_hb = 0u64; + loop { + ticker.tick().await; + let s = reporter_counters.state_writes.load(Ordering::Relaxed); + let h = reporter_counters.heartbeat_writes.load(Ordering::Relaxed); + let e = reporter_counters.errors.load(Ordering::Relaxed); + let dt = report_tick.as_secs_f64(); + let ss = (s - prev_state) as f64 / dt; + let hh = (h - prev_hb) as f64 / dt; + tracing::info!( + state_writes_total = s, + state_writes_per_s = format!("{ss:.1}"), + heartbeats_total = h, + heartbeats_per_s = format!("{hh:.1}"), + errors = e, + "load" + ); + prev_state = s; + prev_hb = h; + } + }); + + // --- run for duration ---------------------------------------------------- + let started = Instant::now(); + tokio::time::sleep(Duration::from_secs(cli.duration_s)).await; + reporter.abort(); + sims.shutdown().await; + let elapsed = started.elapsed(); + + let s = counters.state_writes.load(Ordering::Relaxed); + let h = counters.heartbeat_writes.load(Ordering::Relaxed); + let e = counters.errors.load(Ordering::Relaxed); + tracing::info!( + elapsed_s = format!("{:.1}", elapsed.as_secs_f64()), + 
state_writes_total = s, + state_writes_per_s = format!("{:.1}", s as f64 / elapsed.as_secs_f64()), + heartbeats_total = h, + errors = e, + "run complete" + ); + + // --- give the aggregator a second to drain -------------------------------- + tokio::time::sleep(Duration::from_secs(2)).await; + + // --- verify CR status aggregates ----------------------------------------- + let mut all_ok = true; + for group in &plan.groups { + let cr = deployments.get(&group.cr_name).await?; + let Some(status) = cr.status.as_ref().and_then(|s| s.aggregate.as_ref()) else { + tracing::warn!(cr = %group.cr_name, "aggregate missing on CR status"); + all_ok = false; + continue; + }; + let total_reported = status.succeeded + status.failed + status.pending; + let expected = group.devices.len() as u32; + let ok = total_reported == expected; + if !ok { + all_ok = false; + } + tracing::info!( + cr = %group.cr_name, + expected_devices = expected, + succeeded = status.succeeded, + failed = status.failed, + pending = status.pending, + total = total_reported, + ok, + "cr status" + ); + } + + if !cli.keep { + tracing::info!("cleanup: deleting CRs + KV entries"); + for group in &plan.groups { + let _ = deployments + .delete(&group.cr_name, &DeleteParams::default()) + .await; + } + for device in &plan.devices { + let _ = state_bucket + .delete(&device_state_key( + &device.device_id, + &DeploymentName::try_new(&device.cr_name).unwrap(), + )) + .await; + let _ = info_bucket.delete(&device_info_key(&device.device_id)).await; + let _ = heartbeat_bucket + .delete(&device_heartbeat_key(&device.device_id)) + .await; + } + } + + if all_ok { + tracing::info!("PASS — all CR aggregates match device counts"); + Ok(()) + } else { + anyhow::bail!("FAIL — at least one CR aggregate did not sum to its target device count") + } +} + +fn parse_groups(s: &str) -> Result> { + let out: Vec = s + .split(',') + .map(|t| t.trim().parse::()) + .collect::>() + .context("parsing --groups")?; + if out.is_empty() { + anyhow::bail!("--groups must have at least one size"); + } + Ok(out) +} + +/// A single simulated device and the CR it belongs to. +#[derive(Clone)] +struct DevicePlan { + device_id: String, + cr_name: String, +} + +#[derive(Clone)] +struct GroupPlan { + cr_name: String, + devices: Vec, +} + +struct Plan { + devices: Vec, + groups: Vec, +} + +fn build_plan(group_sizes: &[usize]) -> Plan { + // CR-name + device-id width scale with group count so large runs + // get zero-padded ids that sort sensibly in kubectl. + let cr_width = group_sizes.len().to_string().len().max(2); + let total: usize = group_sizes.iter().sum(); + let dev_width = total.to_string().len().max(5); + + let mut devices = Vec::new(); + let mut groups = Vec::new(); + let mut next_id = 1usize; + for (i, size) in group_sizes.iter().enumerate() { + let cr_name = format!("load-group-{i:0cr_width$}"); + let mut ids = Vec::with_capacity(*size); + for _ in 0..*size { + let id = format!("load-dev-{next_id:0dev_width$}"); + next_id += 1; + devices.push(DevicePlan { + device_id: id.clone(), + cr_name: cr_name.clone(), + }); + ids.push(id); + } + groups.push(GroupPlan { + cr_name, + devices: ids, + }); + } + Plan { devices, groups } +} + +async fn open_bucket( + js: &jetstream::Context, + bucket: &'static str, +) -> Result { + Ok(js + .create_key_value(kv::Config { + bucket: bucket.to_string(), + history: 1, + ..Default::default() + }) + .await?) 
+} + +async fn ensure_namespace(client: &Client, name: &str) -> Result<()> { + let api: Api = Api::all(client.clone()); + if api.get_opt(name).await?.is_some() { + return Ok(()); + } + let ns = Namespace { + metadata: kube::api::ObjectMeta { + name: Some(name.to_string()), + ..Default::default() + }, + ..Default::default() + }; + match api.create(&PostParams::default(), &ns).await { + Ok(_) => Ok(()), + Err(kube::Error::Api(ae)) if ae.code == 409 => Ok(()), + Err(e) => Err(e.into()), + } +} + +async fn apply_crs(api: &Api, plan: &Plan) -> Result<()> { + let params = PatchParams::apply("iot-load-test").force(); + let started = Instant::now(); + + // Cap concurrency so we don't overwhelm the apiserver on large + // fleets. 32 in-flight applies is well under typical apiserver + // QPS limits and keeps the startup latency predictable. + const CONCURRENCY: usize = 32; + let mut in_flight: JoinSet> = JoinSet::new(); + let mut iter = plan.groups.iter(); + + for _ in 0..CONCURRENCY { + if let Some(group) = iter.next() { + in_flight.spawn(apply_one_cr(api.clone(), group.clone(), params.clone())); + } + } + while let Some(res) = in_flight.join_next().await { + res??; + if let Some(group) = iter.next() { + in_flight.spawn(apply_one_cr(api.clone(), group.clone(), params.clone())); + } + } + + tracing::info!( + crs = plan.groups.len(), + elapsed_ms = started.elapsed().as_millis() as u64, + "applied Deployment CRs" + ); + Ok(()) +} + +async fn apply_one_cr( + api: Api, + group: GroupPlan, + params: PatchParams, +) -> Result { + let cr = Deployment::new( + &group.cr_name, + DeploymentSpec { + target_devices: group.devices.clone(), + // Score content doesn't matter — we're not running real + // agents against these CRs. The controller still writes + // to desired-state KV for each target device; that's + // wire noise we tolerate for realism. 
+ score: ScorePayload { + type_: "PodmanV0".to_string(), + data: serde_json::json!({ + "services": [{ + "name": group.cr_name, + "image": "docker.io/library/nginx:alpine", + "ports": ["8080:80"], + }], + }), + }, + rollout: Rollout { + strategy: RolloutStrategy::Immediate, + }, + }, + ); + api.patch(&group.cr_name, ¶ms, &Patch::Apply(&cr)) + .await + .with_context(|| format!("applying CR {}", group.cr_name))?; + Ok(group.cr_name) +} + +async fn publish_device_infos(bucket: &kv::Store, plan: &Plan) -> Result<()> { + let started = Instant::now(); + const CONCURRENCY: usize = 64; + let mut in_flight: JoinSet> = JoinSet::new(); + let mut iter = plan.devices.iter(); + + for _ in 0..CONCURRENCY { + if let Some(device) = iter.next() { + in_flight.spawn(publish_one_info(bucket.clone(), device.clone())); + } + } + while let Some(res) = in_flight.join_next().await { + res??; + if let Some(device) = iter.next() { + in_flight.spawn(publish_one_info(bucket.clone(), device.clone())); + } + } + + tracing::info!( + devices = plan.devices.len(), + elapsed_ms = started.elapsed().as_millis() as u64, + "seeded DeviceInfo" + ); + Ok(()) +} + +async fn publish_one_info(bucket: kv::Store, device: DevicePlan) -> Result<()> { + let info = DeviceInfo { + device_id: Id::from(device.device_id.clone()), + labels: BTreeMap::from([("group".to_string(), device.cr_name.clone())]), + inventory: None, + updated_at: Utc::now(), + }; + let key = device_info_key(&device.device_id); + let payload = serde_json::to_vec(&info)?; + bucket.put(&key, payload.into()).await?; + Ok(()) +} + +async fn simulate_state_loop( + device: Arc, + bucket: kv::Store, + counters: Arc, + tick: Duration, +) { + let Ok(deployment) = DeploymentName::try_new(&device.cr_name) else { + return; + }; + let state_key = device_state_key(&device.device_id, &deployment); + let mut ticker = tokio::time::interval(tick); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + ticker.tick().await; + let phase = pick_phase(); + let ds = DeploymentState { + device_id: Id::from(device.device_id.clone()), + deployment: deployment.clone(), + phase, + last_event_at: Utc::now(), + last_error: matches!(phase, Phase::Failed) + .then(|| format!("synthetic failure @{}", device.device_id)), + }; + match serde_json::to_vec(&ds) { + Ok(payload) => match bucket.put(&state_key, payload.into()).await { + Ok(_) => { + counters.state_writes.fetch_add(1, Ordering::Relaxed); + } + Err(_) => { + counters.errors.fetch_add(1, Ordering::Relaxed); + } + }, + Err(_) => { + counters.errors.fetch_add(1, Ordering::Relaxed); + } + } + } +} + +async fn simulate_heartbeat_loop( + device: Arc, + bucket: kv::Store, + counters: Arc, + tick: Duration, +) { + let hb_key = device_heartbeat_key(&device.device_id); + let mut ticker = tokio::time::interval(tick); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + ticker.tick().await; + let hb = HeartbeatPayload { + device_id: Id::from(device.device_id.clone()), + at: Utc::now(), + }; + if let Ok(payload) = serde_json::to_vec(&hb) { + if bucket.put(&hb_key, payload.into()).await.is_ok() { + counters.heartbeat_writes.fetch_add(1, Ordering::Relaxed); + } else { + counters.errors.fetch_add(1, Ordering::Relaxed); + } + } + } +} + +/// Phase distribution mirroring a healthy-ish fleet: mostly Running, +/// a sprinkle of Failed + Pending to exercise the aggregator's +/// transition-handling + last_error logic. 
+fn pick_phase() -> Phase { + let n: u32 = rand::rng().random_range(0..100); + match n { + 0..80 => Phase::Running, + 80..90 => Phase::Failed, + _ => Phase::Pending, + } +} + diff --git a/harmony-reconciler-contracts/Cargo.toml b/harmony-reconciler-contracts/Cargo.toml index fc52cdb7..a3c5a1ca 100644 --- a/harmony-reconciler-contracts/Cargo.toml +++ b/harmony-reconciler-contracts/Cargo.toml @@ -18,3 +18,4 @@ chrono = { workspace = true, features = ["serde"] } harmony_types = { path = "../harmony_types" } serde = { workspace = true, features = ["derive"] } serde_json = { workspace = true } +thiserror = { workspace = true } diff --git a/harmony-reconciler-contracts/src/fleet.rs b/harmony-reconciler-contracts/src/fleet.rs new file mode 100644 index 00000000..92ef773f --- /dev/null +++ b/harmony-reconciler-contracts/src/fleet.rs @@ -0,0 +1,272 @@ +//! Fleet-scale wire-format types. +//! +//! Per-concern payloads on dedicated NATS KV buckets: +//! +//! | Type | Bucket | Cadence | +//! |------|--------|---------| +//! | [`DeviceInfo`] | KV `device-info` | on startup + label/inventory change | +//! | [`DeploymentState`] | KV `device-state` | on reconcile phase transition | +//! | [`HeartbeatPayload`] | KV `device-heartbeat` | every 30 s | +//! +//! The operator watches `device-state` directly — KV watch deliveries +//! are ordered and last-writer-wins, so there's no separate event +//! stream or per-write revision to track. + +use std::collections::BTreeMap; +use std::fmt; + +use chrono::{DateTime, Utc}; +use harmony_types::id::Id; +use serde::{Deserialize, Deserializer, Serialize}; + +use crate::status::{InventorySnapshot, Phase}; + +/// Deployment CR `metadata.name`, validated for NATS-subject safety. +/// +/// Scope: what identifies a Deployment to the agent. Appears in KV +/// keys (`state..`) and every in-memory map +/// keyed by "which deployment." A raw `String` here would let an +/// invalid name (containing a `.`, splitting into extra subject +/// tokens) break routing at runtime. +/// +/// Validation: +/// - Not empty. +/// - No `.` (would alias an extra subject token). +/// - No `*` / `>` (NATS wildcards). +/// - No ASCII whitespace. +/// - ≤ 253 bytes (RFC 1123 max, matches Kubernetes name limit). +/// +/// The constructor is fallible; deserialization runs the same +/// validation so malformed payloads are rejected at the wire. +#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize)] +#[serde(transparent)] +pub struct DeploymentName(String); + +#[derive(Debug, thiserror::Error, PartialEq, Eq)] +pub enum InvalidDeploymentName { + #[error("deployment name must not be empty")] + Empty, + #[error("deployment name must not exceed 253 bytes")] + TooLong, + #[error("deployment name must not contain '.' 
(would alias an extra NATS subject token)")] + ContainsDot, + #[error("deployment name must not contain NATS wildcards '*' or '>'")] + ContainsWildcard, + #[error("deployment name must not contain whitespace")] + ContainsWhitespace, +} + +impl DeploymentName { + pub fn try_new(s: impl Into) -> Result { + let s = s.into(); + if s.is_empty() { + return Err(InvalidDeploymentName::Empty); + } + if s.len() > 253 { + return Err(InvalidDeploymentName::TooLong); + } + if s.contains('.') { + return Err(InvalidDeploymentName::ContainsDot); + } + if s.contains('*') || s.contains('>') { + return Err(InvalidDeploymentName::ContainsWildcard); + } + if s.chars().any(|c| c.is_ascii_whitespace()) { + return Err(InvalidDeploymentName::ContainsWhitespace); + } + Ok(Self(s)) + } + + pub fn as_str(&self) -> &str { + &self.0 + } +} + +impl fmt::Display for DeploymentName { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.0) + } +} + +impl<'de> Deserialize<'de> for DeploymentName { + fn deserialize>(de: D) -> Result { + let s = String::deserialize(de)?; + Self::try_new(s).map_err(serde::de::Error::custom) + } +} + +/// Static-ish per-device facts: routing labels, hardware, agent +/// version. Written to KV key `info.` in +/// [`crate::BUCKET_DEVICE_INFO`]. Rewritten by the agent on startup +/// and whenever its labels change — **not** on every heartbeat. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DeviceInfo { + pub device_id: Id, + /// Routing labels. Operator resolves Deployment + /// `targetSelector.matchLabels` against this map. + #[serde(default)] + pub labels: BTreeMap, + /// Hardware / OS snapshot. `None` until the first post-startup + /// publish. + #[serde(default)] + pub inventory: Option, + /// RFC 3339 UTC timestamp of this publish. + pub updated_at: DateTime, +} + +/// Authoritative current phase for one `(device, deployment)` pair. +/// Written to KV key `state..` in +/// [`crate::BUCKET_DEVICE_STATE`]. Deleted when the deployment is +/// removed from the device. +/// +/// The operator's KV watch sees every write + delete in order, so +/// this value alone — plus the operator's in-memory belief about +/// the last phase for the pair — is enough to drive the aggregate +/// counters. No separate event stream, no per-write revision. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct DeploymentState { + pub device_id: Id, + pub deployment: DeploymentName, + pub phase: Phase, + pub last_event_at: DateTime, + #[serde(default)] + pub last_error: Option, +} + +/// Tiny liveness ping. Written to KV key `heartbeat.` in +/// [`crate::BUCKET_DEVICE_HEARTBEAT`]. 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +pub struct HeartbeatPayload { + pub device_id: Id, + pub at: DateTime, +} + +#[cfg(test)] +mod tests { + use super::*; + + fn ts(s: &str) -> DateTime { + DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc) + } + + fn dn(s: &str) -> DeploymentName { + DeploymentName::try_new(s).expect("valid") + } + + #[test] + fn deployment_name_accepts_rfc1123() { + assert!(DeploymentName::try_new("hello-world").is_ok()); + assert!(DeploymentName::try_new("a").is_ok()); + assert!(DeploymentName::try_new("a-b-c-1-2-3").is_ok()); + } + + #[test] + fn deployment_name_rejects_dot() { + assert_eq!( + DeploymentName::try_new("hello.world"), + Err(InvalidDeploymentName::ContainsDot) + ); + } + + #[test] + fn deployment_name_rejects_nats_wildcards() { + assert_eq!( + DeploymentName::try_new("hello*"), + Err(InvalidDeploymentName::ContainsWildcard) + ); + assert_eq!( + DeploymentName::try_new("hello>"), + Err(InvalidDeploymentName::ContainsWildcard) + ); + } + + #[test] + fn deployment_name_rejects_empty_and_too_long() { + assert_eq!( + DeploymentName::try_new(""), + Err(InvalidDeploymentName::Empty) + ); + assert_eq!( + DeploymentName::try_new("x".repeat(254)), + Err(InvalidDeploymentName::TooLong) + ); + } + + #[test] + fn deployment_name_rejects_whitespace() { + assert_eq!( + DeploymentName::try_new("hello world"), + Err(InvalidDeploymentName::ContainsWhitespace) + ); + assert_eq!( + DeploymentName::try_new("hello\tworld"), + Err(InvalidDeploymentName::ContainsWhitespace) + ); + } + + #[test] + fn deployment_name_deserialization_validates() { + let json = r#""bad.name""#; + let result: Result = serde_json::from_str(json); + assert!(result.is_err()); + } + + #[test] + fn deployment_name_roundtrip() { + let name = dn("hello-world"); + let json = serde_json::to_string(&name).unwrap(); + assert_eq!(json, r#""hello-world""#); + let back: DeploymentName = serde_json::from_str(&json).unwrap(); + assert_eq!(name, back); + } + + #[test] + fn deployment_state_roundtrip() { + let original = DeploymentState { + device_id: Id::from("pi-01".to_string()), + deployment: dn("hello-web"), + phase: Phase::Failed, + last_event_at: ts("2026-04-22T10:05:00Z"), + last_error: Some("image pull 429".to_string()), + }; + let json = serde_json::to_string(&original).unwrap(); + let back: DeploymentState = serde_json::from_str(&json).unwrap(); + assert_eq!(original, back); + } + + #[test] + fn heartbeat_is_tiny() { + let hb = HeartbeatPayload { + device_id: Id::from("pi-01".to_string()), + at: ts("2026-04-22T10:00:30Z"), + }; + let bytes = serde_json::to_vec(&hb).unwrap(); + assert!( + bytes.len() < 96, + "heartbeat payload grew to {} bytes: {}", + bytes.len(), + String::from_utf8_lossy(&bytes), + ); + } + + #[test] + fn device_info_roundtrip() { + let original = DeviceInfo { + device_id: Id::from("pi-01".to_string()), + labels: BTreeMap::from([("group".to_string(), "site-a".to_string())]), + inventory: Some(InventorySnapshot { + hostname: "pi-01".to_string(), + arch: "aarch64".to_string(), + os: "Ubuntu 24.04".to_string(), + kernel: "6.8.0".to_string(), + cpu_cores: 4, + memory_mb: 8192, + agent_version: "0.1.0".to_string(), + }), + updated_at: ts("2026-04-22T10:00:00Z"), + }; + let json = serde_json::to_string(&original).unwrap(); + let back: DeviceInfo = serde_json::from_str(&json).unwrap(); + assert_eq!(original, back); + } +} diff --git a/harmony-reconciler-contracts/src/kv.rs b/harmony-reconciler-contracts/src/kv.rs index c773eba4..e5ae6371 100644 --- 
a/harmony-reconciler-contracts/src/kv.rs +++ b/harmony-reconciler-contracts/src/kv.rs @@ -7,47 +7,88 @@ //! here; agent + operator consume the constants directly, and smoke //! scripts grep for the literal values locked in the tests below. +use crate::fleet::DeploymentName; + /// Operator-written bucket. One entry per `(device, deployment)` pair. /// Values are the JSON-serialized Score envelope — today /// `harmony::modules::podman::IotScore`, tomorrow any variant of /// a polymorphic `Score` enum the framework ships. pub const BUCKET_DESIRED_STATE: &str = "desired-state"; -/// Agent-written bucket. One entry per device at `status.`. -/// Values are JSON-serialized [`crate::AgentStatus`]. -pub const BUCKET_AGENT_STATUS: &str = "agent-status"; +/// Static-ish per-device facts: routing labels, inventory, agent +/// version. Agent rewrites the entry on startup and whenever its +/// labels change. Key format: `info.`. +pub const BUCKET_DEVICE_INFO: &str = "device-info"; + +/// Current reconcile phase for each `(device, deployment)` pair. +/// Agent writes on phase transition; operator watches this bucket +/// to drive CR `.status.aggregate`. Authoritative source of truth +/// for "what's running where." Key format: +/// `state..`. +pub const BUCKET_DEVICE_STATE: &str = "device-state"; + +/// Tiny liveness ping from each device every N seconds. Separate +/// from [`BUCKET_DEVICE_STATE`] so routine heartbeats don't churn +/// the state bucket. Key format: `heartbeat.`. +pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat"; /// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`]. /// Format: `.`. -pub fn desired_state_key(device_id: &str, deployment_name: &str) -> String { - format!("{device_id}.{deployment_name}") +pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String { + format!("{device_id}.{}", deployment_name.as_str()) } -/// KV key for a device's last-known status in [`BUCKET_AGENT_STATUS`]. -/// Format: `status.`. -pub fn status_key(device_id: &str) -> String { - format!("status.{device_id}") +/// KV key for a device's `DeviceInfo` entry in [`BUCKET_DEVICE_INFO`]. +/// Format: `info.`. +pub fn device_info_key(device_id: &str) -> String { + format!("info.{device_id}") +} + +/// KV key for a `(device, deployment)` state entry in +/// [`BUCKET_DEVICE_STATE`]. Format: `state..`. +pub fn device_state_key(device_id: &str, deployment_name: &DeploymentName) -> String { + format!("state.{device_id}.{}", deployment_name.as_str()) +} + +/// KV key for a device's liveness entry in +/// [`BUCKET_DEVICE_HEARTBEAT`]. Format: `heartbeat.`. +pub fn device_heartbeat_key(device_id: &str) -> String { + format!("heartbeat.{device_id}") } #[cfg(test)] mod tests { use super::*; + fn dn(s: &str) -> crate::DeploymentName { + crate::DeploymentName::try_new(s).expect("valid") + } + #[test] fn desired_state_key_format() { - assert_eq!(desired_state_key("pi-01", "hello-web"), "pi-01.hello-web"); + assert_eq!( + desired_state_key("pi-01", &dn("hello-web")), + "pi-01.hello-web" + ); } #[test] - fn status_key_format() { - assert_eq!(status_key("pi-01"), "status.pi-01"); - } - - #[test] - fn bucket_names_match_smoke_scripts() { - // These strings are also grepped by iot/scripts/smoke-*.sh — - // flipping them here must be paired with a script update. + fn bucket_names_stable() { + // Flipping these is a cross-component break — operator, + // agent, and smoke scripts all grep for the literal values. 
assert_eq!(BUCKET_DESIRED_STATE, "desired-state"); - assert_eq!(BUCKET_AGENT_STATUS, "agent-status"); + assert_eq!(BUCKET_DEVICE_INFO, "device-info"); + assert_eq!(BUCKET_DEVICE_STATE, "device-state"); + assert_eq!(BUCKET_DEVICE_HEARTBEAT, "device-heartbeat"); + } + + #[test] + fn key_formats() { + assert_eq!(device_info_key("pi-01"), "info.pi-01"); + assert_eq!( + device_state_key("pi-01", &dn("hello-web")), + "state.pi-01.hello-web" + ); + assert_eq!(device_heartbeat_key("pi-01"), "heartbeat.pi-01"); } } diff --git a/harmony-reconciler-contracts/src/lib.rs b/harmony-reconciler-contracts/src/lib.rs index 472ee4e4..5127d0a8 100644 --- a/harmony-reconciler-contracts/src/lib.rs +++ b/harmony-reconciler-contracts/src/lib.rs @@ -3,30 +3,31 @@ //! Harmony's "reconciler" pattern is: a central **operator** writes //! desired state into NATS JetStream KV; a remote **agent** watches //! the KV, deserializes each entry as a Score, and drives the host -//! toward that state. This split lets one operator orchestrate a -//! fleet of agents across network boundaries it can't reach -//! directly — IoT devices today, OKD cluster agents or edge-compute -//! reconcilers tomorrow. +//! toward that state. The agent writes back per-device info and +//! per-deployment state into separate KV buckets; the operator reads +//! those to aggregate `.status.aggregate` onto the CR. //! //! This crate holds the wire-format bits both sides must agree on: -//! NATS bucket names, KV key formats, and the `AgentStatus` -//! heartbeat payload. The Score types themselves (`PodmanV0Score`, -//! future variants) live in their respective harmony modules — -//! consumers import them from there and serialize them over the -//! transport this crate describes. +//! NATS bucket names, KV key formats, and the typed payloads +//! (`DeviceInfo`, `DeploymentState`, `HeartbeatPayload`). The Score +//! types themselves live in their respective harmony modules. //! //! **Deliberately lean** — no tokio, no async-nats, no harmony. //! The on-device agent build pulls it in alongside a minimal //! async-nats client; the operator pulls it alongside kube-rs. -//! Neither should pay for the other's dependencies. +pub mod fleet; pub mod kv; pub mod status; -pub use kv::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, desired_state_key, status_key}; -pub use status::{ - AgentStatus, DeploymentPhase, EventEntry, EventSeverity, InventorySnapshot, Phase, +pub use fleet::{ + DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName, }; +pub use kv::{ + BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, + desired_state_key, device_heartbeat_key, device_info_key, device_state_key, +}; +pub use status::{InventorySnapshot, Phase}; // Re-exports so consumers (agent, operator) don't need a direct // harmony_types dependency purely to name the cross-boundary types. diff --git a/harmony-reconciler-contracts/src/status.rs b/harmony-reconciler-contracts/src/status.rs index bbe39b79..5162797f 100644 --- a/harmony-reconciler-contracts/src/status.rs +++ b/harmony-reconciler-contracts/src/status.rs @@ -1,79 +1,7 @@ -//! Agent → NATS KV status payload. -//! -//! The agent publishes a rolling status snapshot to the -//! `agent-status` bucket every 30 s (see -//! [`crate::BUCKET_AGENT_STATUS`]). The payload is cumulative and -//! self-contained: every publish is a full picture, so the operator -//! doesn't have to replay history from JetStream to reconstruct -//! current state. -//! -//! 
Wire-format evolution rule: new fields must be `#[serde(default)]` -//! so older operators keep parsing newer agent payloads, and newer -//! operators keep parsing older ones. Every field below respects -//! that. +//! Shared status primitives reused across the fleet wire format. -use std::collections::BTreeMap; - -use chrono::{DateTime, Utc}; -use harmony_types::id::Id; use serde::{Deserialize, Serialize}; -/// Rolling heartbeat / status snapshot from a single agent. -/// -/// Published at `status.` in [`crate::BUCKET_AGENT_STATUS`] -/// on a regular cadence (30 s) and after significant state changes -/// (reconcile success, reconcile failure, image pull start/end). -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct AgentStatus { - /// Echoed from the agent's own config so the operator can - /// cross-check which device it came from if the KV key is ever - /// ambiguous. Serializes transparently as a plain string. - pub device_id: Id, - /// Coarse rollup state. v0 only ever writes `"running"`; richer - /// variants are a v0.1+ concern. A String (not an enum) so old - /// operators parsing this payload don't fail on a new variant. - pub status: String, - /// RFC 3339 UTC timestamp of this publish. Lexicographically - /// comparable against other agent timestamps for freshness - /// checks. - pub timestamp: DateTime, - /// Per-deployment reconcile state. Keyed by deployment name - /// (the CR's `metadata.name`). When the agent has no - /// deployments, this is an empty map. - #[serde(default)] - pub deployments: BTreeMap, - /// Bounded ring-buffer of the most recent reconcile events on - /// this device. Used by the operator to surface "what did the - /// agent actually do" in the CR's status without the operator - /// having to replay per-message JetStream streams. - /// - /// Agents cap this to the last N entries (typical: 20); operator - /// aggregation shows the first M across the fleet (typical: 10). - #[serde(default)] - pub recent_events: Vec, - /// Hardware / OS inventory. Published once on startup and on - /// change. `None` means "not yet reported" (fresh agent before - /// first publish). Keeping this optional (rather than a zeroed - /// struct) makes "absence" distinguishable from "zero bytes of - /// disk." - #[serde(default)] - pub inventory: Option, -} - -/// Reconcile phase for a single deployment on one device. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct DeploymentPhase { - /// Current phase of this deployment on this device. - pub phase: Phase, - /// Timestamp of the last phase transition or retry. - pub last_event_at: DateTime, - /// Short human-readable error message from the most recent - /// failure, if any. Cleared when the deployment transitions - /// back to `Running`. - #[serde(default)] - pub last_error: Option, -} - /// Coarse state of a single reconcile on one device. /// /// Deliberately coarse — richer granularity (ImagePulling, @@ -83,7 +11,7 @@ pub struct DeploymentPhase { pub enum Phase { /// Agent has applied the Score and the container is up. Running, - /// Reconcile hit an error. See `last_error` for the message. + /// Reconcile hit an error. See paired `last_error` for the message. Failed, /// Reconcile is in flight or waiting on an external dependency /// (image pull, network, etc.). Agents may also report this @@ -91,36 +19,8 @@ pub enum Phase { Pending, } -/// One agent-side event worth surfacing to the operator. 
-/// -/// "Event" in the Kubernetes sense: a timestamped short log-like -/// observation, not a structured metric. Used for the -/// `.status.aggregate.recent_events` rollup so an operator seeing -/// `failed: 3` can click through to see the last three error -/// messages. -#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] -pub struct EventEntry { - pub at: DateTime, - pub severity: EventSeverity, - /// Short human-readable message. Agents should cap this at a - /// reasonable length (~512 chars) to keep the payload under - /// NATS JetStream's per-message limit. - pub message: String, - /// Optional deployment this event relates to. `None` for - /// device-wide events (podman socket bounce, NATS reconnect). - #[serde(default)] - pub deployment: Option, -} - -#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)] -pub enum EventSeverity { - Info, - Warn, - Error, -} - -/// Static-ish facts about the device. Published once per agent -/// lifetime (startup) and republished on change. +/// Static-ish facts about the device. Embedded in +/// [`crate::DeviceInfo`]; republished on change. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] pub struct InventorySnapshot { pub hostname: String, @@ -133,113 +33,3 @@ pub struct InventorySnapshot { /// agents that are behind the current release. pub agent_version: String, } - -#[cfg(test)] -mod tests { - use super::*; - - fn ts(s: &str) -> DateTime { - DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc) - } - - #[test] - fn minimal_status_roundtrip() { - let s = AgentStatus { - device_id: Id::from("pi-01".to_string()), - status: "running".to_string(), - timestamp: ts("2026-04-21T18:15:42Z"), - deployments: BTreeMap::new(), - recent_events: vec![], - inventory: None, - }; - let json = serde_json::to_string(&s).unwrap(); - let back: AgentStatus = serde_json::from_str(&json).unwrap(); - assert_eq!(s, back); - } - - #[test] - fn enriched_status_roundtrip() { - let mut deployments = BTreeMap::new(); - deployments.insert( - "hello-world".to_string(), - DeploymentPhase { - phase: Phase::Running, - last_event_at: ts("2026-04-21T18:15:42Z"), - last_error: None, - }, - ); - deployments.insert( - "broken-app".to_string(), - DeploymentPhase { - phase: Phase::Failed, - last_event_at: ts("2026-04-21T18:16:00Z"), - last_error: Some("podman pull: 429 Too Many Requests".to_string()), - }, - ); - - let s = AgentStatus { - device_id: Id::from("pi-01".to_string()), - status: "running".to_string(), - timestamp: ts("2026-04-21T18:15:42Z"), - deployments, - recent_events: vec![ - EventEntry { - at: ts("2026-04-21T18:14:00Z"), - severity: EventSeverity::Info, - message: "started hello-world".to_string(), - deployment: Some("hello-world".to_string()), - }, - EventEntry { - at: ts("2026-04-21T18:16:00Z"), - severity: EventSeverity::Error, - message: "pull failed".to_string(), - deployment: Some("broken-app".to_string()), - }, - ], - inventory: Some(InventorySnapshot { - hostname: "pi-01".to_string(), - arch: "aarch64".to_string(), - os: "Ubuntu 24.04".to_string(), - kernel: "6.8.0-1004-raspi".to_string(), - cpu_cores: 4, - memory_mb: 8192, - agent_version: "0.1.0".to_string(), - }), - }; - let json = serde_json::to_string(&s).unwrap(); - let back: AgentStatus = serde_json::from_str(&json).unwrap(); - assert_eq!(s, back); - } - - #[test] - fn old_wire_format_parses_into_enriched_struct() { - // Payload shape produced by a pre-Chapter-2 agent. Must - // still deserialize so operators doing a mixed-fleet upgrade - // don't explode. 
- let json = r#"{ - "device_id": "pi-01", - "status": "running", - "timestamp": "2026-04-21T18:15:42Z" - }"#; - let s: AgentStatus = serde_json::from_str(json).unwrap(); - assert!(s.deployments.is_empty()); - assert!(s.recent_events.is_empty()); - assert!(s.inventory.is_none()); - } - - #[test] - fn wire_keys_present() { - let s = AgentStatus { - device_id: Id::from("pi-01".to_string()), - status: "running".to_string(), - timestamp: ts("2026-04-21T18:15:42Z"), - deployments: BTreeMap::new(), - recent_events: vec![], - inventory: None, - }; - let json = serde_json::to_string(&s).unwrap(); - assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}"); - assert!(json.contains("\"status\":\"running\"")); - assert!(json.contains("\"timestamp\":\"2026-04-21T18:15:42Z\"")); - } -} diff --git a/iot/iot-agent-v0/src/fleet_publisher.rs b/iot/iot-agent-v0/src/fleet_publisher.rs new file mode 100644 index 00000000..0c334d6e --- /dev/null +++ b/iot/iot-agent-v0/src/fleet_publisher.rs @@ -0,0 +1,126 @@ +//! Agent-side publish surface. +//! +//! Thin wrapper around three KV buckets: [`BUCKET_DEVICE_INFO`], +//! [`BUCKET_DEVICE_STATE`], [`BUCKET_DEVICE_HEARTBEAT`]. +//! +//! Failure mode: log and swallow. The KV is the source of truth — +//! a dropped put gets corrected on the next reconcile transition +//! or operator watch reconnection. + +use async_nats::jetstream::{self, kv}; +use harmony_reconciler_contracts::{ + BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, DeploymentName, + DeploymentState, DeviceInfo, HeartbeatPayload, Id, InventorySnapshot, device_heartbeat_key, + device_info_key, device_state_key, +}; +use std::collections::BTreeMap; + +pub struct FleetPublisher { + device_id: Id, + info_bucket: kv::Store, + state_bucket: kv::Store, + heartbeat_bucket: kv::Store, +} + +impl FleetPublisher { + /// Open every bucket the agent needs, creating those that don't + /// exist yet. Idempotent with operator-side creation. + pub async fn connect(client: async_nats::Client, device_id: Id) -> anyhow::Result { + let jetstream = jetstream::new(client); + + let info_bucket = jetstream + .create_key_value(kv::Config { + bucket: BUCKET_DEVICE_INFO.to_string(), + history: 1, + ..Default::default() + }) + .await?; + let state_bucket = jetstream + .create_key_value(kv::Config { + bucket: BUCKET_DEVICE_STATE.to_string(), + history: 1, + ..Default::default() + }) + .await?; + let heartbeat_bucket = jetstream + .create_key_value(kv::Config { + bucket: BUCKET_DEVICE_HEARTBEAT.to_string(), + history: 1, + ..Default::default() + }) + .await?; + + Ok(Self { + device_id, + info_bucket, + state_bucket, + heartbeat_bucket, + }) + } + + /// Publish the agent's static-ish facts. Called at startup and + /// on label change. + pub async fn publish_device_info( + &self, + labels: BTreeMap, + inventory: Option, + ) { + let info = DeviceInfo { + device_id: self.device_id.clone(), + labels, + inventory, + updated_at: chrono::Utc::now(), + }; + let key = device_info_key(&self.device_id.to_string()); + match serde_json::to_vec(&info) { + Ok(payload) => { + if let Err(e) = self.info_bucket.put(&key, payload.into()).await { + tracing::warn!(%key, error = %e, "publish_device_info: kv put failed"); + } + } + Err(e) => tracing::warn!(error = %e, "publish_device_info: serialize failed"), + } + } + + /// Tiny liveness ping. Called every 30s. 
+ pub async fn publish_heartbeat(&self) { + let hb = HeartbeatPayload { + device_id: self.device_id.clone(), + at: chrono::Utc::now(), + }; + let key = device_heartbeat_key(&self.device_id.to_string()); + match serde_json::to_vec(&hb) { + Ok(payload) => { + if let Err(e) = self.heartbeat_bucket.put(&key, payload.into()).await { + tracing::debug!(%key, error = %e, "publish_heartbeat: kv put failed"); + } + } + Err(e) => tracing::warn!(error = %e, "publish_heartbeat: serialize failed"), + } + } + + /// Persist the authoritative current phase for a `(device, + /// deployment)` pair. The operator's watch on the `device-state` + /// bucket picks up this put and updates CR status counters. + pub async fn write_deployment_state(&self, state: &DeploymentState) { + let key = device_state_key(&self.device_id.to_string(), &state.deployment); + match serde_json::to_vec(state) { + Ok(payload) => { + if let Err(e) = self.state_bucket.put(&key, payload.into()).await { + tracing::warn!(%key, error = %e, "write_deployment_state: kv put failed"); + } + } + Err(e) => tracing::warn!(error = %e, "write_deployment_state: serialize failed"), + } + } + + /// Delete the authoritative current-phase entry, e.g. when the + /// Deployment CR is removed and the agent has torn down the + /// container. + pub async fn delete_deployment_state(&self, deployment: &DeploymentName) { + let key = device_state_key(&self.device_id.to_string(), deployment); + if let Err(e) = self.state_bucket.delete(&key).await { + tracing::debug!(%key, error = %e, "delete_deployment_state: kv delete failed"); + } + } +} diff --git a/iot/iot-agent-v0/src/main.rs b/iot/iot-agent-v0/src/main.rs index dfa236ba..b0b71c45 100644 --- a/iot/iot-agent-v0/src/main.rs +++ b/iot/iot-agent-v0/src/main.rs @@ -1,4 +1,5 @@ mod config; +mod fleet_publisher; mod reconciler; use std::sync::Arc; @@ -8,14 +9,13 @@ use anyhow::{Context, Result}; use clap::Parser; use config::{AgentConfig, CredentialSource, TomlFileCredentialSource}; use futures_util::StreamExt; -use harmony_reconciler_contracts::{ - AgentStatus, BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, Id, InventorySnapshot, status_key, -}; +use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, Id, InventorySnapshot}; use harmony::inventory::Inventory; use harmony::modules::podman::PodmanTopology; use harmony::topology::Topology; +use crate::fleet_publisher::FleetPublisher; use crate::reconciler::Reconciler; /// ROADMAP §5.6 — agent polls podman every 30s as ground truth; KV watch @@ -85,37 +85,16 @@ async fn watch_desired_state( Ok(()) } -async fn report_status( - client: async_nats::Client, - device_id: Id, - reconciler: Arc, - inventory: Option, -) -> Result<()> { - let jetstream = async_nats::jetstream::new(client); - let bucket = jetstream - .create_key_value(async_nats::jetstream::kv::Config { - bucket: BUCKET_AGENT_STATUS.to_string(), - ..Default::default() - }) - .await?; - - let key = status_key(&device_id.to_string()); +/// Tiny liveness-only loop: push a `HeartbeatPayload` into the +/// `device-heartbeat` bucket every N seconds. Stays separate from +/// per-deployment state writes so routine pings don't churn the +/// device-state bucket or its watch subscribers. 
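+/// Missed ticks are delayed rather than bursted
+/// (`MissedTickBehavior::Delay`), so a briefly stalled runtime sends
+/// late heartbeats instead of a burst of catch-up pings.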
+async fn publish_heartbeat_loop(fleet: Arc) { let mut interval = tokio::time::interval(Duration::from_secs(30)); - + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); loop { interval.tick().await; - let (deployments, recent_events) = reconciler.status_snapshot().await; - let status = AgentStatus { - device_id: device_id.clone(), - status: "running".to_string(), - timestamp: chrono::Utc::now(), - deployments, - recent_events, - inventory: inventory.clone(), - }; - let payload = serde_json::to_vec(&status)?; - bucket.put(&key, payload.into()).await?; - tracing::debug!(key = %key, "reported status"); + fleet.publish_heartbeat().await; } } @@ -177,10 +156,36 @@ async fn main() -> Result<()> { tracing::info!(hostname = %inventory.location.name, "inventory loaded"); let inventory_snapshot = local_inventory(&inventory); - let reconciler = Arc::new(Reconciler::new(topology, inventory)); - let client = connect_nats(&cfg).await?; + // Publish surface. Opens the three KV buckets (idempotent + // creates). Must be live before the reconciler starts so + // writes on the first desired-state KV watch land on the wire. + let fleet = Arc::new( + FleetPublisher::connect(client.clone(), device_id.clone()) + .await + .context("fleet publisher connect")?, + ); + tracing::info!("fleet publisher ready"); + + // Publish DeviceInfo once at startup. Labels are empty on this + // branch — the agent config's `[labels]` section is added in + // the selector-targeting work and flows here once that branch + // merges. Until then, operators will see a DeviceInfo payload + // with an empty label map (matches no deployment selector, which + // is the correct fail-safe behavior for an unconfigured device). + let startup_labels = std::collections::BTreeMap::new(); + fleet + .publish_device_info(startup_labels, Some(inventory_snapshot.clone())) + .await; + + let reconciler = Arc::new(Reconciler::new( + device_id.clone(), + topology, + inventory, + Some(fleet.clone()), + )); + let ctrlc = async { tokio::signal::ctrl_c().await.ok(); tracing::info!("received SIGINT, shutting down"); @@ -193,21 +198,17 @@ async fn main() -> Result<()> { Ok::<(), anyhow::Error>(()) }; - let watch = watch_desired_state(client.clone(), device_id.clone(), reconciler.clone()); - let status = report_status( - client, - device_id, - reconciler.clone(), - Some(inventory_snapshot), - ); + let _ = inventory_snapshot; // consumed by the DeviceInfo publish above + let watch = watch_desired_state(client, device_id, reconciler.clone()); let reconcile = reconciler.clone().run_periodic(RECONCILE_INTERVAL); + let heartbeat = publish_heartbeat_loop(fleet); tokio::select! 
{ _ = ctrlc => {}, r = sigterm => { r?; } r = watch => { r?; } - r = status => { r?; } _ = reconcile => {} + _ = heartbeat => {} } Ok(()) diff --git a/iot/iot-agent-v0/src/reconciler.rs b/iot/iot-agent-v0/src/reconciler.rs index dd54d7c4..c46d862a 100644 --- a/iot/iot-agent-v0/src/reconciler.rs +++ b/iot/iot-agent-v0/src/reconciler.rs @@ -1,18 +1,18 @@ -use std::collections::{BTreeMap, HashMap, VecDeque}; +use std::collections::HashMap; use std::sync::Arc; use std::time::Duration; use anyhow::Result; use chrono::Utc; -use harmony_reconciler_contracts::{ - DeploymentPhase as ReportedPhase, EventEntry, EventSeverity, Phase, -}; +use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase}; use tokio::sync::Mutex; use harmony::inventory::Inventory; use harmony::modules::podman::{IotScore, PodmanTopology, PodmanV0Score}; use harmony::score::Score; +use crate::fleet_publisher::FleetPublisher; + /// Cache key → last-seen state, populated by `apply` and consulted by the /// 30-second periodic tick and the delete path. struct CachedEntry { @@ -24,83 +24,81 @@ struct CachedEntry { score: PodmanV0Score, } -/// Per-device reconcile status, separate from the desired-state cache -/// so the status reporter can snapshot it without racing the apply -/// path. -#[derive(Default)] -struct StatusState { - deployments: BTreeMap, - recent_events: VecDeque, -} - -/// Cap on the ring buffer of recent events. Large enough for the -/// operator's "last 5-10 events" rollup; small enough that the whole -/// AgentStatus payload stays well under the NATS JetStream per-message -/// limit. -const EVENT_RING_CAP: usize = 32; - pub struct Reconciler { + device_id: Id, topology: Arc, inventory: Arc, /// Keyed by NATS KV key (`.`). A single entry per /// KV key — in v0 there is no fan-out from one key to many scores. state: Mutex>, - status: Mutex, + /// Current phase per deployment, used to decide whether a new + /// write to the `device-state` KV is needed. + phases: Mutex>, + /// Publish surface. Optional so unit tests without a live NATS + /// client still work; always populated in the real agent runtime. + fleet: Option>, } impl Reconciler { - pub fn new(topology: Arc, inventory: Arc) -> Self { + pub fn new( + device_id: Id, + topology: Arc, + inventory: Arc, + fleet: Option>, + ) -> Self { Self { + device_id, topology, inventory, state: Mutex::new(HashMap::new()), - status: Mutex::new(StatusState::default()), + phases: Mutex::new(HashMap::new()), + fleet, } } - /// Snapshot of everything the status reporter needs to publish. - /// Returns clones so the caller can serialize without holding - /// locks. - pub async fn status_snapshot(&self) -> (BTreeMap, Vec) { - let status = self.status.lock().await; - ( - status.deployments.clone(), - status.recent_events.iter().cloned().collect(), - ) - } + /// Record a new phase for a deployment and, if it changed, write + /// the updated [`DeploymentState`] to the KV. Same-phase + /// re-confirmations are no-ops so the periodic reconcile tick + /// doesn't churn the bucket. 
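+    /// Each accepted transition results in at most one `device-state`
+    /// KV put, which is the write the operator's watch folds into its
+    /// CR status counters.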
+ async fn apply_phase( + &self, + deployment: &DeploymentName, + phase: Phase, + last_error: Option, + ) { + { + let mut phases = self.phases.lock().await; + if phases.get(deployment).copied() == Some(phase) { + return; + } + phases.insert(deployment.clone(), phase); + } - async fn set_phase(&self, deployment: &str, phase: Phase, last_error: Option) { - let mut status = self.status.lock().await; - status.deployments.insert( - deployment.to_string(), - ReportedPhase { + if let Some(publisher) = &self.fleet { + let state = DeploymentState { + device_id: self.device_id.clone(), + deployment: deployment.clone(), phase, last_event_at: Utc::now(), last_error, - }, - ); + }; + publisher.write_deployment_state(&state).await; + } } - async fn drop_phase(&self, deployment: &str) { - let mut status = self.status.lock().await; - status.deployments.remove(deployment); - } - - async fn push_event( - &self, - severity: EventSeverity, - message: String, - deployment: Option, - ) { - let mut status = self.status.lock().await; - status.recent_events.push_back(EventEntry { - at: Utc::now(), - severity, - message, - deployment, - }); - while status.recent_events.len() > EVENT_RING_CAP { - status.recent_events.pop_front(); + /// Clear the in-memory phase for a deployment and delete its KV + /// entry. Idempotent: a delete for a never-applied deployment is + /// a no-op in memory and a harmless tombstone write on the wire. + async fn drop_phase(&self, deployment: &DeploymentName) { + let was_known = { + let mut phases = self.phases.lock().await; + phases.remove(deployment).is_some() + }; + if !was_known { + return; + } + if let Some(publisher) = &self.fleet { + publisher.delete_deployment_state(deployment).await; } } @@ -113,15 +111,9 @@ impl Reconciler { Ok(IotScore::PodmanV0(s)) => s, Err(e) => { tracing::warn!(key, error = %e, "failed to deserialize score"); - if let Some(name) = deployment.as_deref() { - self.set_phase(name, Phase::Failed, Some(format!("bad payload: {e}"))) + if let Some(name) = &deployment { + self.apply_phase(name, Phase::Failed, Some(format!("bad payload: {e}"))) .await; - self.push_event( - EventSeverity::Error, - format!("deserialize failure: {e}"), - Some(name.to_string()), - ) - .await; } return Ok(()); } @@ -138,32 +130,20 @@ impl Reconciler { } } - if let Some(name) = deployment.as_deref() { - self.set_phase(name, Phase::Pending, None).await; + if let Some(name) = &deployment { + self.apply_phase(name, Phase::Pending, None).await; } match self.run_score(key, &incoming).await { Ok(()) => { - if let Some(name) = deployment.as_deref() { - self.set_phase(name, Phase::Running, None).await; - self.push_event( - EventSeverity::Info, - "reconciled".to_string(), - Some(name.to_string()), - ) - .await; + if let Some(name) = &deployment { + self.apply_phase(name, Phase::Running, None).await; } } Err(e) => { - if let Some(name) = deployment.as_deref() { - self.set_phase(name, Phase::Failed, Some(short(&e.to_string()))) + if let Some(name) = &deployment { + self.apply_phase(name, Phase::Failed, Some(short(&e.to_string()))) .await; - self.push_event( - EventSeverity::Error, - short(&e.to_string()), - Some(name.to_string()), - ) - .await; } return Err(e); } @@ -189,7 +169,7 @@ impl Reconciler { let mut state = self.state.lock().await; let Some(entry) = state.remove(key) else { tracing::info!(key, "delete for unknown key — nothing to remove"); - if let Some(name) = deployment.as_deref() { + if let Some(name) = &deployment { self.drop_phase(name).await; } return Ok(()); @@ -209,14 +189,8 @@ impl 
Reconciler { tracing::info!(key, service = %service.name, "removed container"); } } - if let Some(name) = deployment.as_deref() { + if let Some(name) = &deployment { self.drop_phase(name).await; - self.push_event( - EventSeverity::Info, - "deployment deleted".to_string(), - Some(name.to_string()), - ) - .await; } Ok(()) } @@ -238,24 +212,15 @@ impl Reconciler { let deployment = deployment_from_key(&key); match self.run_score(&key, &score).await { Ok(()) => { - // Keep the phase Running (no-op if already). - // Don't emit an event on idempotent no-change - // ticks — the 30 s cadence would drown the ring. - if let Some(name) = deployment.as_deref() { - self.set_phase(name, Phase::Running, None).await; + if let Some(name) = &deployment { + self.apply_phase(name, Phase::Running, None).await; } } Err(e) => { tracing::warn!(key, error = %e, "periodic reconcile failed"); - if let Some(name) = deployment.as_deref() { - self.set_phase(name, Phase::Failed, Some(short(&e.to_string()))) + if let Some(name) = &deployment { + self.apply_phase(name, Phase::Failed, Some(short(&e.to_string()))) .await; - self.push_event( - EventSeverity::Error, - short(&e.to_string()), - Some(name.to_string()), - ) - .await; } } } @@ -286,15 +251,13 @@ impl Reconciler { } /// Extract the deployment name from a NATS KV key of the form -/// `.`. Returns `None` for keys that don't match -/// that shape (defensive — the agent only ever subscribes to -/// `.>` filters so this should always succeed, but we don't -/// want to crash on a malformed key). -fn deployment_from_key(key: &str) -> Option { - key.split_once('.').map(|(_, rest)| rest.to_string()) +/// `.`. +fn deployment_from_key(key: &str) -> Option { + let (_, rest) = key.split_once('.')?; + DeploymentName::try_new(rest).ok() } -/// Truncate a long error message so the AgentStatus payload stays +/// Truncate a long error message so the DeploymentState payload stays /// comfortably below NATS JetStream's per-message limit. fn short(s: &str) -> String { const MAX: usize = 512; @@ -306,3 +269,76 @@ fn short(s: &str) -> String { cut } } + +#[cfg(test)] +mod tests { + //! Focused tests for transition detection. Drive `apply_phase` / + //! `drop_phase` directly with an inert topology (no real podman + //! socket) and a `None` FleetPublisher. 
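+    //! With `fleet: None` no KV puts are attempted, so these tests
+    //! exercise only the in-memory `phases` map.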
+ use super::*; + use harmony::inventory::Inventory; + use harmony::modules::podman::PodmanTopology; + use std::path::PathBuf; + + fn reconciler() -> Reconciler { + let topology = Arc::new( + PodmanTopology::from_unix_socket(PathBuf::from("/nonexistent/for-tests")).unwrap(), + ); + let inventory = Arc::new(Inventory::empty()); + Reconciler::new( + Id::from("test-device".to_string()), + topology, + inventory, + None, + ) + } + + fn dn(s: &str) -> DeploymentName { + DeploymentName::try_new(s).expect("valid test name") + } + + #[tokio::test] + async fn apply_phase_records_new_phase() { + let r = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None).await; + let phases = r.phases.lock().await; + assert_eq!(phases.get(&dn("hello")), Some(&Phase::Running)); + } + + #[tokio::test] + async fn apply_phase_idempotent_for_same_phase() { + let r = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None).await; + r.apply_phase(&dn("hello"), Phase::Running, None).await; + let phases = r.phases.lock().await; + assert_eq!(phases.len(), 1); + } + + #[tokio::test] + async fn apply_phase_transitions_update_phase() { + let r = reconciler(); + r.apply_phase(&dn("hello"), Phase::Pending, None).await; + r.apply_phase(&dn("hello"), Phase::Running, None).await; + r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string())) + .await; + let phases = r.phases.lock().await; + assert_eq!(phases.get(&dn("hello")), Some(&Phase::Failed)); + } + + #[tokio::test] + async fn drop_phase_clears_known_deployment() { + let r = reconciler(); + r.apply_phase(&dn("hello"), Phase::Running, None).await; + r.drop_phase(&dn("hello")).await; + let phases = r.phases.lock().await; + assert!(!phases.contains_key(&dn("hello"))); + } + + #[tokio::test] + async fn drop_phase_on_unknown_deployment_is_noop() { + let r = reconciler(); + r.drop_phase(&dn("never-existed")).await; + let phases = r.phases.lock().await; + assert!(phases.is_empty()); + } +} diff --git a/iot/iot-operator-v0/src/aggregate.rs b/iot/iot-operator-v0/src/aggregate.rs deleted file mode 100644 index c6ca9c83..00000000 --- a/iot/iot-operator-v0/src/aggregate.rs +++ /dev/null @@ -1,352 +0,0 @@ -//! Agent-status → CR-status aggregator. -//! -//! Watches the `agent-status` NATS KV bucket, keeps a per-device -//! snapshot in memory, and periodically recomputes each Deployment -//! CR's `.status.aggregate` subtree from the intersection of its -//! `spec.targetDevices` list and the known device statuses. -//! -//! Runs as a background task alongside the controller. Keeping the -//! controller free of NATS-KV subscription state lets its reconcile -//! loop stay reactive and cheap (just publishing desired state + -//! managing finalizers), while this task handles the slower -//! many-devices-to-one-CR fan-in. -//! -//! Design choices: -//! - **In-memory snapshot map** (device_id → AgentStatus). Rebuilt -//! from JetStream on startup via the watch's initial replay; kept -//! current by watching thereafter. No persistence — the bucket is -//! the source of truth. -//! - **Periodic aggregation tick** (5 s). Cheap (a few BTreeMap -//! lookups + one `patch_status` per CR) and gives predictable -//! operator behaviour for the smoke harness. A push-based -//! "recompute on every Put" would be tighter but adds complexity -//! this v0.1 doesn't need. -//! - **JSON-Merge Patch.** Writes only the `aggregate` subtree, so -//! it composes cleanly with the controller's -//! `observedScoreString` patch. 
- -use std::collections::BTreeMap; -use std::sync::Arc; -use std::time::Duration; - -use async_nats::jetstream::kv::{Operation, Store}; -use futures_util::StreamExt; -use harmony_reconciler_contracts::{AgentStatus, Phase}; -use kube::api::{Api, Patch, PatchParams}; -use kube::{Client, ResourceExt}; -use serde_json::json; -use tokio::sync::Mutex; - -use crate::crd::{AggregateEvent, AggregateLastError, Deployment, DeploymentAggregate}; - -/// Cap on how many events we surface in `DeploymentAggregate.recent_events`. -/// Small enough to keep the CR status compact. -const AGGREGATE_EVENT_CAP: usize = 10; - -/// How often the aggregator recomputes + patches. -const AGGREGATE_TICK: Duration = Duration::from_secs(5); - -/// Per-device status snapshot keyed by device id string. -pub type StatusSnapshots = Arc>>; - -/// Spawn the aggregator: watch the agent-status bucket into an -/// in-memory map, and periodically fold that map into every -/// Deployment CR's `.status.aggregate`. -pub async fn run(client: Client, status_bucket: Store) -> anyhow::Result<()> { - let snapshots: StatusSnapshots = Arc::new(Mutex::new(BTreeMap::new())); - - let watcher = tokio::spawn(watch_status_bucket(status_bucket, snapshots.clone())); - let aggregator = tokio::spawn(aggregate_loop(client, snapshots)); - - tokio::select! { - r = watcher => r??, - r = aggregator => r??, - } - Ok(()) -} - -async fn watch_status_bucket(bucket: Store, snapshots: StatusSnapshots) -> anyhow::Result<()> { - tracing::info!("aggregator: watching agent-status bucket"); - let mut watch = bucket.watch("status.>").await?; - while let Some(entry) = watch.next().await { - let entry = match entry { - Ok(e) => e, - Err(e) => { - tracing::warn!(error = %e, "aggregator: watch error"); - continue; - } - }; - let device_id = match device_id_from_status_key(&entry.key) { - Some(id) => id, - None => { - tracing::warn!(key = %entry.key, "aggregator: skipping malformed key"); - continue; - } - }; - match entry.operation { - Operation::Put => match serde_json::from_slice::(&entry.value) { - Ok(status) => { - let mut map = snapshots.lock().await; - map.insert(device_id, status); - } - Err(e) => { - tracing::warn!(key = %entry.key, error = %e, "aggregator: bad status payload"); - } - }, - Operation::Delete | Operation::Purge => { - let mut map = snapshots.lock().await; - map.remove(&device_id); - } - } - } - Ok(()) -} - -async fn aggregate_loop(client: Client, snapshots: StatusSnapshots) -> anyhow::Result<()> { - let deployments: Api = Api::all(client.clone()); - let mut ticker = tokio::time::interval(AGGREGATE_TICK); - ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); - - loop { - ticker.tick().await; - if let Err(e) = tick_once(&deployments, &snapshots).await { - tracing::warn!(error = %e, "aggregator: tick failed"); - } - } -} - -async fn tick_once( - deployments: &Api, - snapshots: &StatusSnapshots, -) -> anyhow::Result<()> { - let crs = deployments.list(&Default::default()).await?; - // Clone the snapshot once per tick so we don't hold the lock - // across network calls. 
- let snapshot = { snapshots.lock().await.clone() }; - - for cr in &crs { - let ns = match cr.namespace() { - Some(ns) => ns, - None => continue, - }; - let name = cr.name_any(); - let aggregate = compute_aggregate(&cr.spec.target_devices, &name, &snapshot); - let status = json!({ "status": { "aggregate": aggregate } }); - let api: Api = Api::namespaced(deployments.clone().into_client(), &ns); - if let Err(e) = api - .patch_status(&name, &PatchParams::default(), &Patch::Merge(&status)) - .await - { - tracing::warn!(%ns, %name, error = %e, "aggregator: patch failed"); - } - } - Ok(()) -} - -/// Compute the aggregate for one CR from the current snapshot map. -/// Exposed (crate-visible) for unit testing. -pub(crate) fn compute_aggregate( - target_devices: &[String], - deployment_name: &str, - snapshots: &BTreeMap, -) -> DeploymentAggregate { - let mut agg = DeploymentAggregate::default(); - let mut last_error: Option = None; - let mut last_heartbeat: Option> = None; - let mut events: Vec = Vec::new(); - - for device in target_devices { - let status = match snapshots.get(device) { - Some(s) => s, - None => { - agg.unreported += 1; - continue; - } - }; - if last_heartbeat.is_none_or(|t| status.timestamp > t) { - last_heartbeat = Some(status.timestamp); - } - - match status.deployments.get(deployment_name) { - Some(phase) => match phase.phase { - Phase::Running => agg.succeeded += 1, - Phase::Failed => { - agg.failed += 1; - let error_at = phase.last_event_at; - let error_msg = phase - .last_error - .clone() - .unwrap_or_else(|| "failed".to_string()); - let candidate = AggregateLastError { - device_id: device.clone(), - message: error_msg, - at: error_at.to_rfc3339(), - }; - match &last_error { - Some(cur) if cur.at >= candidate.at => {} - _ => last_error = Some(candidate), - } - } - Phase::Pending => agg.pending += 1, - }, - None => { - // Device reported but hasn't acknowledged this - // deployment yet. - agg.pending += 1; - } - } - - // Collect per-deployment events for the fleet-wide ring. - for ev in &status.recent_events { - if ev.deployment.as_deref() == Some(deployment_name) { - events.push(AggregateEvent { - at: ev.at.to_rfc3339(), - severity: match ev.severity { - harmony_reconciler_contracts::EventSeverity::Info => "Info".to_string(), - harmony_reconciler_contracts::EventSeverity::Warn => "Warn".to_string(), - harmony_reconciler_contracts::EventSeverity::Error => "Error".to_string(), - }, - device_id: device.clone(), - message: ev.message.clone(), - deployment: ev.deployment.clone(), - }); - } - } - } - - // Most recent first; cap. - events.sort_by(|a, b| b.at.cmp(&a.at)); - events.truncate(AGGREGATE_EVENT_CAP); - - agg.last_error = last_error; - agg.recent_events = events; - agg.last_heartbeat_at = last_heartbeat.map(|t| t.to_rfc3339()); - agg -} - -/// `status.` → ``. 
-fn device_id_from_status_key(key: &str) -> Option { - key.strip_prefix("status.").map(|s| s.to_string()) -} - -#[cfg(test)] -mod tests { - use super::*; - use chrono::{DateTime, Utc}; - use harmony_reconciler_contracts::{DeploymentPhase, EventEntry, EventSeverity, Id}; - - fn ts(s: &str) -> DateTime { - DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc) - } - - fn snapshot_with( - device: &str, - deployment: &str, - phase: Phase, - err: Option<&str>, - ) -> AgentStatus { - let mut deployments = BTreeMap::new(); - deployments.insert( - deployment.to_string(), - DeploymentPhase { - phase, - last_event_at: ts("2026-04-22T01:00:00Z"), - last_error: err.map(|s| s.to_string()), - }, - ); - AgentStatus { - device_id: Id::from(device.to_string()), - status: "running".to_string(), - timestamp: ts("2026-04-22T01:00:00Z"), - deployments, - recent_events: vec![], - inventory: None, - } - } - - #[test] - fn aggregate_counts_and_unreported() { - let mut map = BTreeMap::new(); - map.insert( - "pi-01".to_string(), - snapshot_with("pi-01", "hello", Phase::Running, None), - ); - map.insert( - "pi-02".to_string(), - snapshot_with("pi-02", "hello", Phase::Failed, Some("pull err")), - ); - // pi-03 is a target but never reported. - let targets = vec![ - "pi-01".to_string(), - "pi-02".to_string(), - "pi-03".to_string(), - ]; - let agg = compute_aggregate(&targets, "hello", &map); - assert_eq!(agg.succeeded, 1); - assert_eq!(agg.failed, 1); - assert_eq!(agg.pending, 0); - assert_eq!(agg.unreported, 1); - assert_eq!(agg.last_error.as_ref().unwrap().device_id, "pi-02"); - assert_eq!(agg.last_error.as_ref().unwrap().message, "pull err"); - } - - #[test] - fn device_reported_but_no_deployment_entry_is_pending() { - // Agent heartbeated (device known to operator) but hasn't - // acknowledged this specific deployment yet. 
- let mut map = BTreeMap::new(); - map.insert( - "pi-01".to_string(), - AgentStatus { - device_id: Id::from("pi-01".to_string()), - status: "running".to_string(), - timestamp: ts("2026-04-22T01:00:00Z"), - deployments: BTreeMap::new(), - recent_events: vec![], - inventory: None, - }, - ); - let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map); - assert_eq!(agg.pending, 1); - assert_eq!(agg.unreported, 0); - } - - #[test] - fn events_filtered_to_matching_deployment_only() { - let mut status = snapshot_with("pi-01", "hello", Phase::Running, None); - status.recent_events = vec![ - EventEntry { - at: ts("2026-04-22T01:00:05Z"), - severity: EventSeverity::Info, - message: "hello reconciled".to_string(), - deployment: Some("hello".to_string()), - }, - EventEntry { - at: ts("2026-04-22T01:00:06Z"), - severity: EventSeverity::Info, - message: "other reconciled".to_string(), - deployment: Some("other".to_string()), - }, - EventEntry { - at: ts("2026-04-22T01:00:07Z"), - severity: EventSeverity::Info, - message: "generic device event".to_string(), - deployment: None, - }, - ]; - let mut map = BTreeMap::new(); - map.insert("pi-01".to_string(), status); - let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map); - assert_eq!(agg.recent_events.len(), 1); - assert_eq!(agg.recent_events[0].message, "hello reconciled"); - } - - #[test] - fn device_id_from_status_key_happy_and_malformed() { - assert_eq!( - device_id_from_status_key("status.pi-01"), - Some("pi-01".into()) - ); - assert_eq!(device_id_from_status_key("desired-state.pi-01.x"), None); - } -} diff --git a/iot/iot-operator-v0/src/controller.rs b/iot/iot-operator-v0/src/controller.rs index 2d402a4b..6d3ca7c6 100644 --- a/iot/iot-operator-v0/src/controller.rs +++ b/iot/iot-operator-v0/src/controller.rs @@ -3,7 +3,7 @@ use std::time::Duration; use async_nats::jetstream::kv::Store; use futures_util::StreamExt; -use harmony_reconciler_contracts::desired_state_key; +use harmony_reconciler_contracts::{DeploymentName, desired_state_key}; use kube::api::{Patch, PatchParams}; use kube::runtime::Controller; use kube::runtime::controller::Action; @@ -92,8 +92,19 @@ async fn apply(obj: Arc, api: &Api, kv: &Store) -> Resul return Ok(Action::requeue(Duration::from_secs(300))); } + // The controller trusts its input: `name` came from a k8s CR's + // metadata.name, which the apiserver already validated to RFC + // 1123. A name that doesn't parse as a `DeploymentName` here + // would mean the operator is running against a cluster with a + // CR name containing a `.` or NATS wildcard — a real bug, but + // one we'd rather surface as a clear error than silently skip. 
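+    // The same validation is repeated in `cleanup` below, so the
+    // finalizer path fails just as loudly as the apply path.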
+ let deployment_name = DeploymentName::try_new(&name).map_err(|e| { + Error::Kv(format!( + "CR name '{name}' is not a valid DeploymentName: {e}" + )) + })?; for device_id in &obj.spec.target_devices { - let key = kv_key(device_id, &name); + let key = kv_key(device_id, &deployment_name); kv.put(key.clone(), score_json.clone().into_bytes().into()) .await .map_err(|e| Error::Kv(e.to_string()))?; @@ -113,8 +124,13 @@ async fn apply(obj: Arc, api: &Api, kv: &Store) -> Resul async fn cleanup(obj: Arc, kv: &Store) -> Result { let name = obj.name_any(); + let deployment_name = DeploymentName::try_new(&name).map_err(|e| { + Error::Kv(format!( + "CR name '{name}' is not a valid DeploymentName: {e}" + )) + })?; for device_id in &obj.spec.target_devices { - let key = kv_key(device_id, &name); + let key = kv_key(device_id, &deployment_name); kv.delete(&key) .await .map_err(|e| Error::Kv(e.to_string()))?; @@ -127,7 +143,7 @@ fn serialize_score(score: &ScorePayload) -> Result { Ok(serde_json::to_string(score)?) } -fn kv_key(device_id: &str, deployment_name: &str) -> String { +fn kv_key(device_id: &str, deployment_name: &DeploymentName) -> String { desired_state_key(device_id, deployment_name) } diff --git a/iot/iot-operator-v0/src/crd.rs b/iot/iot-operator-v0/src/crd.rs index 95bda4f2..a19a7416 100644 --- a/iot/iot-operator-v0/src/crd.rs +++ b/iot/iot-operator-v0/src/crd.rs @@ -105,45 +105,29 @@ pub struct DeploymentStatus { /// (skip KV write + status patch when the CR is unchanged). #[serde(skip_serializing_if = "Option::is_none")] pub observed_score_string: Option, - /// Per-deployment rollup aggregated from the `agent-status` - /// bucket. Present once at least one targeted agent has - /// heartbeated; absent on a freshly-created CR. + /// Per-deployment rollup aggregated from the `device-state` KV + /// bucket. Present once at least one targeted agent has reported; + /// absent on a freshly-created CR. #[serde(skip_serializing_if = "Option::is_none")] pub aggregate: Option, } -/// Rollup of per-device `AgentStatus.deployments` entries for this -/// Deployment CR. +/// Rollup of per-device deployment phases for this Deployment CR. #[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct DeploymentAggregate { - /// Count of devices where the deployment is in each phase. + /// Count of target devices where the deployment is in each phase. + /// Targeted-but-unreported devices are folded into `pending`. /// Always populated (zeros are valid) so the operator can patch /// the whole subtree atomically. pub succeeded: u32, pub failed: u32, pub pending: u32, - /// Count of target devices that haven't yet heartbeated at all. - /// "failed to join fleet" vs. "failed to reconcile" — different - /// signals, different remedies. - pub unreported: u32, - /// Device id of the most recent device reporting a failure, - /// with its short error message. Surfaces the top failure to - /// the CR's status without needing per-device subresource - /// lookups. + /// Device id of the most recent device reporting a failure, with + /// its short error message. Cleared when that device transitions + /// back to Running. #[serde(skip_serializing_if = "Option::is_none")] pub last_error: Option, - /// Last-N events aggregated across all target devices, most - /// recent first. Operator caps at a handful (see operator - /// controller). - #[serde(default)] - pub recent_events: Vec, - /// Timestamp of the most recent agent heartbeat counted into - /// this aggregate. 
"Freshness" signal — a CR whose aggregate - /// hasn't advanced in minutes is evidence the whole fleet has - /// gone dark. - #[serde(skip_serializing_if = "Option::is_none")] - pub last_heartbeat_at: Option, } #[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] @@ -153,14 +137,3 @@ pub struct AggregateLastError { pub message: String, pub at: String, } - -#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)] -#[serde(rename_all = "camelCase")] -pub struct AggregateEvent { - pub at: String, - pub severity: String, - pub device_id: String, - pub message: String, - #[serde(skip_serializing_if = "Option::is_none")] - pub deployment: Option, -} diff --git a/iot/iot-operator-v0/src/fleet_aggregator.rs b/iot/iot-operator-v0/src/fleet_aggregator.rs new file mode 100644 index 00000000..d7946356 --- /dev/null +++ b/iot/iot-operator-v0/src/fleet_aggregator.rs @@ -0,0 +1,542 @@ +//! Operator-side aggregator. +//! +//! Watches the `device-state` KV bucket, maintains an in-memory +//! snapshot of every `(device, deployment)` phase, and patches each +//! Deployment CR's `.status.aggregate` as reports arrive. +//! +//! Everything flows through the KV: the watcher delivers historical +//! entries on startup to seed the snapshot, then live Put/Delete +//! events to keep it current. Counters are recomputed per-CR from +//! the snapshot at 1 Hz, for CRs marked dirty since the last tick. +//! No separate event stream, no revision dedup — the KV is ordered +//! last-writer-wins and that's enough. + +use std::collections::{HashMap, HashSet}; +use std::sync::Arc; +use std::time::Duration; + +use async_nats::jetstream::kv::{Operation, Store}; +use futures_util::StreamExt; +use harmony_reconciler_contracts::{ + BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, DeploymentName, DeploymentState, DeviceInfo, Phase, +}; +use kube::api::{Api, Patch, PatchParams}; +use kube::{Client, ResourceExt}; +use serde_json::json; +use tokio::sync::Mutex; + +use crate::crd::{AggregateLastError, Deployment, DeploymentAggregate}; + +const PATCH_TICK: Duration = Duration::from_secs(1); + +/// (namespace, name) identifying a Deployment CR. +#[derive(Debug, Clone, PartialEq, Eq, Hash)] +pub struct DeploymentKey { + pub namespace: String, + pub name: String, +} + +impl DeploymentKey { + pub fn from_cr(cr: &Deployment) -> Option { + Some(Self { + namespace: cr.namespace()?, + name: cr.name_any(), + }) + } +} + +/// One `(device, deployment)` pair — the natural key into the states +/// snapshot. Strong-typed so the two fields can't be swapped by +/// accident. +#[derive(Debug, Clone, Hash, PartialEq, Eq)] +pub struct DevicePair { + pub device_id: String, + pub deployment: DeploymentName, +} + +#[derive(Debug, Default)] +pub struct FleetState { + /// Authoritative per-pair phase snapshot, driven by the KV watch. + pub states: HashMap, + /// Routing facts per device. Populated on cold-start + updated + /// by a future device-info watch; labels here feed selector + /// matching. + pub infos: HashMap, + /// CR index by deployment name. The KV key space encodes only + /// the deployment name, so we need a name → CR key lookup to + /// surface every namespace that uses that name. Refreshed at + /// the top of each patch tick from the CR list. + pub crs_by_name: HashMap>, + /// Most-recent failure surfaced per deployment CR. + pub last_error: HashMap, + /// CR keys whose aggregate needs re-patching on the next tick. + pub dirty: HashSet, +} + +pub type SharedFleetState = Arc>; + +/// Does this CR target this device? 
+/// +/// Today: CR lists device ids explicitly. After the selector branch +/// merges: `cr.spec.target_selector.matches(&info.labels)`. +fn cr_targets_device(cr: &Deployment, device_id: &str) -> bool { + cr.spec.target_devices.iter().any(|d| d == device_id) +} + +pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> { + let info_bucket = js + .create_key_value(async_nats::jetstream::kv::Config { + bucket: BUCKET_DEVICE_INFO.to_string(), + ..Default::default() + }) + .await?; + let state_bucket = js + .create_key_value(async_nats::jetstream::kv::Config { + bucket: BUCKET_DEVICE_STATE.to_string(), + ..Default::default() + }) + .await?; + + let deployments: Api = Api::all(client); + + // Seed infos once so label-based targeting has data to match + // against on the first patch tick. (A future change can replace + // this with a device-info watch.) + let infos = read_device_info(&info_bucket).await?; + let state: SharedFleetState = Arc::new(Mutex::new(FleetState { + infos, + ..Default::default() + })); + + tracing::info!( + devices = state.lock().await.infos.len(), + "aggregator: startup complete — watching device-state" + ); + + let watcher_state = state.clone(); + let watcher = tokio::spawn(async move { + if let Err(e) = run_state_watcher(state_bucket, watcher_state).await { + tracing::warn!(error = %e, "aggregator: state watcher exited"); + } + }); + + let patch_state = state.clone(); + let patch_loop = async move { + let mut ticker = tokio::time::interval(PATCH_TICK); + ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay); + loop { + ticker.tick().await; + if let Err(e) = patch_tick(&deployments, &patch_state).await { + tracing::warn!(error = %e, "aggregator: patch tick failed"); + } + } + }; + + tokio::select! { + _ = patch_loop => Ok(()), + _ = watcher => Ok(()), + } +} + +/// Parse a `device-state` KV key (`state..`) into +/// its component pair. +fn parse_state_key(key: &str) -> Option { + let rest = key.strip_prefix("state.")?; + let (device, deployment) = rest.split_once('.')?; + Some(DevicePair { + device_id: device.to_string(), + deployment: DeploymentName::try_new(deployment).ok()?, + }) +} + +async fn run_state_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> { + // LastPerSubject delivery replays the current value of every key + // first, then streams live updates. Gives us cold-start + steady + // state in a single subscription — no separate KV scan. + let mut watch = bucket.watch_with_history(">").await?; + while let Some(entry_res) = watch.next().await { + let entry = match entry_res { + Ok(e) => e, + Err(e) => { + tracing::warn!(error = %e, "aggregator: watch delivery error"); + continue; + } + }; + let Some(pair) = parse_state_key(&entry.key) else { + continue; + }; + match entry.operation { + Operation::Put => { + let ds: DeploymentState = match serde_json::from_slice(&entry.value) { + Ok(d) => d, + Err(e) => { + tracing::warn!(key = %entry.key, error = %e, "aggregator: bad device_state payload"); + continue; + } + }; + let mut guard = state.lock().await; + apply_state(&mut guard, pair, ds); + } + Operation::Delete | Operation::Purge => { + let mut guard = state.lock().await; + drop_state(&mut guard, &pair); + } + } + } + Ok(()) +} + +/// Record a device's latest state. Drops stale writes via the +/// `last_event_at` timestamp, updates `last_error`, and marks every +/// CR whose name matches as dirty. 
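+/// Equal timestamps are accepted, so re-delivery of the same report
+/// is applied idempotently rather than dropped.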
+pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) { + if let Some(prev) = state.states.get(&pair) { + if prev.last_event_at > ds.last_event_at { + return; + } + } + let phase = ds.phase; + let device_id = ds.device_id.to_string(); + let last_error_msg = ds.last_error.clone(); + let at = ds.last_event_at.to_rfc3339(); + state.states.insert(pair.clone(), ds); + + for key in matching_cr_keys(state, &pair.deployment) { + match phase { + Phase::Failed => { + if let Some(msg) = last_error_msg.as_deref() { + state.last_error.insert( + key.clone(), + AggregateLastError { + device_id: device_id.clone(), + message: msg.to_string(), + at: at.clone(), + }, + ); + } + } + Phase::Running => { + if let Some(existing) = state.last_error.get(&key) { + if existing.device_id == device_id { + state.last_error.remove(&key); + } + } + } + Phase::Pending => {} + } + state.dirty.insert(key); + } +} + +pub fn drop_state(state: &mut FleetState, pair: &DevicePair) { + let Some(removed) = state.states.remove(pair) else { + return; + }; + let device_id = removed.device_id.to_string(); + for key in matching_cr_keys(state, &pair.deployment) { + if let Some(existing) = state.last_error.get(&key) { + if existing.device_id == device_id { + state.last_error.remove(&key); + } + } + state.dirty.insert(key); + } +} + +/// CR keys matching a deployment name, via the index refreshed by +/// [`patch_tick`]. The CR index may be empty for names whose CR +/// hasn't been seen yet — those updates land in `states` and get +/// picked up on the next tick that finds the CR in the kube list. +fn matching_cr_keys(state: &FleetState, deployment: &DeploymentName) -> Vec { + state + .crs_by_name + .get(deployment) + .cloned() + .unwrap_or_default() +} + +async fn patch_tick(deployments: &Api, state: &SharedFleetState) -> anyhow::Result<()> { + let crs = deployments.list(&Default::default()).await?.items; + + let aggregates = { + let mut guard = state.lock().await; + + // Refresh the CR-name index. A CR we haven't seen before is + // automatically marked dirty so the first tick after its + // creation patches an initial aggregate (even all-zero). 
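+        // The index is rebuilt wholesale each tick, so CRs deleted
+        // from the cluster fall out of `crs_by_name` without explicit
+        // invalidation.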
+ let mut next_index: HashMap> = HashMap::new(); + for cr in &crs { + let Some(cr_key) = DeploymentKey::from_cr(cr) else { + continue; + }; + let Ok(deployment_name) = DeploymentName::try_new(&cr_key.name) else { + continue; + }; + let was_known = guard + .crs_by_name + .get(&deployment_name) + .map(|v| v.contains(&cr_key)) + .unwrap_or(false); + if !was_known { + guard.dirty.insert(cr_key.clone()); + } + next_index.entry(deployment_name).or_default().push(cr_key); + } + guard.crs_by_name = next_index; + + let dirty_keys: Vec = guard.dirty.drain().collect(); + let mut aggs = Vec::with_capacity(dirty_keys.len()); + for key in &dirty_keys { + let Some(cr) = crs.iter().find(|c| { + c.namespace().as_deref() == Some(key.namespace.as_str()) && c.name_any() == key.name + }) else { + continue; + }; + let agg = compute_aggregate(&guard, cr); + aggs.push((key.clone(), agg)); + } + aggs + }; + + for (key, aggregate) in aggregates { + let api: Api = + Api::namespaced(deployments.clone().into_client(), &key.namespace); + let status = json!({ "status": { "aggregate": aggregate } }); + if let Err(e) = api + .patch_status(&key.name, &PatchParams::default(), &Patch::Merge(&status)) + .await + { + tracing::warn!( + namespace = %key.namespace, + name = %key.name, + error = %e, + "aggregator: status patch failed" + ); + } else { + tracing::debug!( + namespace = %key.namespace, + name = %key.name, + succeeded = aggregate.succeeded, + failed = aggregate.failed, + pending = aggregate.pending, + "aggregator: status patched" + ); + } + } + Ok(()) +} + +/// Build the aggregate for one CR from the current snapshot. Target +/// devices with no state entry count as `pending` — "we asked, they +/// haven't reported yet" folds into the same bucket as "reconcile in +/// flight" so operators see one pending count. +pub fn compute_aggregate(state: &FleetState, cr: &Deployment) -> DeploymentAggregate { + let mut agg = DeploymentAggregate::default(); + let Ok(deployment_name) = DeploymentName::try_new(cr.name_any()) else { + return agg; + }; + for device_id in &cr.spec.target_devices { + if !cr_targets_device(cr, device_id) { + continue; + } + let pair = DevicePair { + device_id: device_id.clone(), + deployment: deployment_name.clone(), + }; + match state.states.get(&pair).map(|s| s.phase) { + Some(Phase::Running) => agg.succeeded += 1, + Some(Phase::Failed) => agg.failed += 1, + Some(Phase::Pending) | None => agg.pending += 1, + } + } + if let Some(cr_key) = DeploymentKey::from_cr(cr) { + agg.last_error = state.last_error.get(&cr_key).cloned(); + } + agg +} + +async fn read_device_info(bucket: &Store) -> anyhow::Result> { + let mut out = HashMap::new(); + let mut keys = bucket.keys().await?; + while let Some(key_res) = keys.next().await { + let key = key_res?; + let Some(entry) = bucket.entry(&key).await? 
else { + continue; + }; + let Some(device_id) = key.strip_prefix("info.") else { + continue; + }; + match serde_json::from_slice::(&entry.value) { + Ok(info) => { + out.insert(device_id.to_string(), info); + } + Err(e) => { + tracing::warn!(%key, error = %e, "aggregator: bad device_info payload"); + } + } + } + Ok(out) +} + +#[cfg(test)] +mod tests { + use super::*; + use chrono::{TimeZone, Utc}; + use harmony_reconciler_contracts::Id; + use kube::api::ObjectMeta; + + fn dn(s: &str) -> DeploymentName { + DeploymentName::try_new(s).expect("valid test name") + } + + fn state(device: &str, deployment: &str, phase: Phase, seconds: i64) -> DeploymentState { + DeploymentState { + device_id: Id::from(device.to_string()), + deployment: dn(deployment), + phase, + last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(), + last_error: None, + } + } + + fn cr(namespace: &str, name: &str, devices: &[&str]) -> Deployment { + Deployment { + metadata: ObjectMeta { + name: Some(name.to_string()), + namespace: Some(namespace.to_string()), + ..Default::default() + }, + spec: crate::crd::DeploymentSpec { + target_devices: devices.iter().map(|s| s.to_string()).collect(), + score: crate::crd::ScorePayload { + type_: "PodmanV0".to_string(), + data: serde_json::json!({}), + }, + rollout: crate::crd::Rollout { + strategy: crate::crd::RolloutStrategy::Immediate, + }, + }, + status: None, + } + } + + fn demo_cr() -> Deployment { + cr("iot-demo", "hello", &["pi-01", "pi-02", "pi-03"]) + } + + fn demo_key() -> DeploymentKey { + DeploymentKey { + namespace: "iot-demo".to_string(), + name: "hello".to_string(), + } + } + + fn pair(device: &str, deployment: &str) -> DevicePair { + DevicePair { + device_id: device.to_string(), + deployment: dn(deployment), + } + } + + #[test] + fn compute_aggregate_counts_target_devices() { + let mut s = FleetState::default(); + s.states.insert( + pair("pi-01", "hello"), + state("pi-01", "hello", Phase::Running, 0), + ); + s.states.insert( + pair("pi-02", "hello"), + state("pi-02", "hello", Phase::Failed, 0), + ); + // pi-03 unreported → counted as pending + let agg = compute_aggregate(&s, &demo_cr()); + assert_eq!(agg.succeeded, 1); + assert_eq!(agg.failed, 1); + assert_eq!(agg.pending, 1); + } + + fn seeded_state() -> FleetState { + let mut s = FleetState::default(); + s.crs_by_name.insert(dn("hello"), vec![demo_key()]); + s + } + + #[test] + fn apply_state_marks_cr_dirty_and_captures_last_error() { + let mut s = seeded_state(); + let ds = DeploymentState { + last_error: Some("pull err".to_string()), + ..state("pi-01", "hello", Phase::Failed, 0) + }; + apply_state(&mut s, pair("pi-01", "hello"), ds); + assert!(s.dirty.contains(&demo_key())); + assert_eq!(s.last_error[&demo_key()].device_id, "pi-01"); + assert_eq!(s.last_error[&demo_key()].message, "pull err"); + } + + #[test] + fn apply_state_clears_last_error_on_return_to_running() { + let mut s = seeded_state(); + s.last_error.insert( + demo_key(), + AggregateLastError { + device_id: "pi-01".to_string(), + message: "pull err".to_string(), + at: "".to_string(), + }, + ); + apply_state( + &mut s, + pair("pi-01", "hello"), + state("pi-01", "hello", Phase::Running, 0), + ); + assert!(!s.last_error.contains_key(&demo_key())); + } + + #[test] + fn apply_state_ignores_stale_timestamp() { + let mut s = FleetState::default(); + apply_state( + &mut s, + pair("pi-01", "hello"), + state("pi-01", "hello", Phase::Running, 10), + ); + apply_state( + &mut s, + pair("pi-01", "hello"), + state("pi-01", "hello", Phase::Failed, 5), + ); + 
assert_eq!(s.states[&pair("pi-01", "hello")].phase, Phase::Running); + } + + #[test] + fn drop_state_removes_entry_and_clears_last_error() { + let mut s = seeded_state(); + s.states.insert( + pair("pi-01", "hello"), + state("pi-01", "hello", Phase::Running, 0), + ); + s.last_error.insert( + demo_key(), + AggregateLastError { + device_id: "pi-01".to_string(), + message: "old".to_string(), + at: "".to_string(), + }, + ); + drop_state(&mut s, &pair("pi-01", "hello")); + assert!(!s.states.contains_key(&pair("pi-01", "hello"))); + assert!(!s.last_error.contains_key(&demo_key())); + } + + #[test] + fn parse_state_key_roundtrip() { + assert_eq!( + parse_state_key("state.pi-01.hello"), + Some(pair("pi-01", "hello")) + ); + assert_eq!(parse_state_key("nope"), None); + assert_eq!(parse_state_key("state.missing-deployment"), None); + } +} diff --git a/iot/iot-operator-v0/src/lib.rs b/iot/iot-operator-v0/src/lib.rs index 8ae640a4..b1214fc4 100644 --- a/iot/iot-operator-v0/src/lib.rs +++ b/iot/iot-operator-v0/src/lib.rs @@ -6,5 +6,5 @@ //! — can import the typed `Deployment`, `DeploymentSpec`, //! `ScorePayload`, etc. without duplicating them. -pub mod aggregate; pub mod crd; +pub mod fleet_aggregator; diff --git a/iot/iot-operator-v0/src/main.rs b/iot/iot-operator-v0/src/main.rs index 8c686216..f314db6d 100644 --- a/iot/iot-operator-v0/src/main.rs +++ b/iot/iot-operator-v0/src/main.rs @@ -1,15 +1,15 @@ mod controller; mod install; -// `crd` + `aggregate` modules are owned by the library target (see -// `lib.rs`); the binary imports from there so the types aren't +// `crd` + `fleet_aggregator` modules are owned by the library target +// (see `lib.rs`); the binary imports from there so the types aren't // compiled twice. -use iot_operator_v0::{aggregate, crd}; +use iot_operator_v0::{crd, fleet_aggregator}; use anyhow::Result; use async_nats::jetstream; use clap::{Parser, Subcommand}; -use harmony_reconciler_contracts::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE}; +use harmony_reconciler_contracts::BUCKET_DESIRED_STATE; use kube::Client; #[derive(Parser)] @@ -61,7 +61,11 @@ async fn main() -> Result<()> { } async fn run(nats_url: &str, bucket: &str) -> Result<()> { - let nats = async_nats::connect(nats_url).await?; + // Short retry loop on the initial connect. Startup races against + // the NATS server becoming ready (k3d loadbalancer accepting TCP + // before the NATS pod answers the protocol handshake), and a + // hard-fail on the very first attempt produces no useful signal. + let nats = connect_with_retry(nats_url).await?; tracing::info!(url = %nats_url, "connected to NATS"); let js = jetstream::new(nats); let desired_state_kv = js @@ -71,22 +75,32 @@ async fn run(nats_url: &str, bucket: &str) -> Result<()> { }) .await?; tracing::info!(bucket = %bucket, "KV bucket ready"); - let status_kv = js - .create_key_value(jetstream::kv::Config { - bucket: BUCKET_AGENT_STATUS.to_string(), - ..Default::default() - }) - .await?; - tracing::info!(bucket = %BUCKET_AGENT_STATUS, "agent-status bucket ready"); let client = Client::try_default().await?; - // Controller + aggregator run concurrently. If either returns - // an error, tear down the whole process — kube-rs's Controller - // already handles transient reconcile failures internally. + // Controller (CR → desired-state KV) + aggregator (device-info + // + device-state → CR status). Either failing tears the whole + // process down; kube-rs's Controller already handles transient + // reconcile errors internally. let ctl_client = client.clone(); tokio::select! 
{ r = controller::run(ctl_client, desired_state_kv) => r, - r = aggregate::run(client, status_kv) => r, + r = fleet_aggregator::run(client, js) => r, } } + +async fn connect_with_retry(nats_url: &str) -> Result { + use std::time::Duration; + let mut last_err: Option = None; + for attempt in 0..15 { + match async_nats::connect(nats_url).await { + Ok(c) => return Ok(c), + Err(e) => { + tracing::warn!(attempt, error = %e, "NATS connect failed; retrying"); + last_err = Some(e.into()); + tokio::time::sleep(Duration::from_secs(2)).await; + } + } + } + Err(last_err.unwrap_or_else(|| anyhow::anyhow!("NATS connect failed after retries"))) +} diff --git a/iot/scripts/load-test.sh b/iot/scripts/load-test.sh new file mode 100755 index 00000000..a7cf8023 --- /dev/null +++ b/iot/scripts/load-test.sh @@ -0,0 +1,252 @@ +#!/usr/bin/env bash +# Load-test harness for the IoT operator's fleet_aggregator. +# +# Brings up the minimum stack (k3d + in-cluster NATS + CRD + operator) +# with no VM or real agent, then runs the `iot_load_test` binary +# which simulates N devices pushing DeploymentState to NATS. +# +# All stable paths under $WORK_DIR (default /tmp/iot-load-test) so you +# can point kubectl / tail at them while the test is running. +# +# Quick usage: +# iot/scripts/load-test.sh # 100-device default (55 + 9×5) +# HOLD=1 iot/scripts/load-test.sh # leave stack running for exploration +# DEVICES=10000 GROUP_SIZES=5500,500,500,500,500,500,500,500,500,500 \ +# DURATION=90 iot/scripts/load-test.sh +# +# While it's running, in another terminal: +# export KUBECONFIG=/tmp/iot-load-test/kubeconfig +# kubectl get deployments.iot.nationtech.io -A -w +# kubectl get deployments.iot.nationtech.io -A \ +# -o custom-columns=NAME:.metadata.name,RUN:.status.aggregate.succeeded,FAIL:.status.aggregate.failed,PEND:.status.aggregate.pending +# tail -f /tmp/iot-load-test/operator.log +# +# Set DEBUG=1 to bump RUST_LOG so the operator logs every status patch. + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" +OPERATOR_DIR="$REPO_ROOT/iot/iot-operator-v0" + +# ---- config ----------------------------------------------------------------- + +K3D_BIN="${K3D_BIN:-$HOME/.local/share/harmony/k3d/k3d}" +CLUSTER_NAME="${CLUSTER_NAME:-iot-load}" +NATS_NAMESPACE="${NATS_NAMESPACE:-iot-system}" +NATS_NAME="${NATS_NAME:-iot-nats}" +NATS_NODE_PORT="${NATS_NODE_PORT:-4222}" +NATS_IMAGE="${NATS_IMAGE:-docker.io/library/nats:2.10-alpine}" + +DEVICES="${DEVICES:-100}" +GROUP_SIZES="${GROUP_SIZES:-55,5,5,5,5,5,5,5,5,5}" +TICK_MS="${TICK_MS:-1000}" +DURATION="${DURATION:-60}" +NAMESPACE="${NAMESPACE:-iot-load}" + +# Keep the stack alive after the test completes so the user can poke +# at CRs + NATS interactively. Ctrl-C to tear everything down. +HOLD="${HOLD:-0}" + +# Stable working dir so kubectl + tail targets are predictable. +WORK_DIR="${WORK_DIR:-/tmp/iot-load-test}" +mkdir -p "$WORK_DIR" + +KUBECONFIG_FILE="$WORK_DIR/kubeconfig" +OPERATOR_LOG="$WORK_DIR/operator.log" +OPERATOR_PID="" + +log() { printf '\033[1;34m[load-test]\033[0m %s\n' "$*"; } +fail() { printf '\033[1;31m[load-test FAIL]\033[0m %s\n' "$*" >&2; exit 1; } + +cleanup() { + local rc=$? 
+ log "cleanup…" + if [[ -n "$OPERATOR_PID" ]] && kill -0 "$OPERATOR_PID" 2>/dev/null; then + kill "$OPERATOR_PID" 2>/dev/null || true + wait "$OPERATOR_PID" 2>/dev/null || true + fi + "$K3D_BIN" cluster delete "$CLUSTER_NAME" >/dev/null 2>&1 || true + if [[ $rc -ne 0 && -s "$OPERATOR_LOG" ]]; then + log "operator log at $OPERATOR_LOG (kept for inspection)" + echo "----- operator log tail -----" + tail -n 60 "$OPERATOR_LOG" 2>/dev/null || true + else + # Leave the operator log on success too — cheap, often useful. + log "operator log at $OPERATOR_LOG" + fi + exit $rc +} +trap cleanup EXIT INT TERM + +require() { command -v "$1" >/dev/null 2>&1 || fail "missing required tool: $1"; } +require cargo +require kubectl +require podman +require docker +[[ -x "$K3D_BIN" ]] || fail "k3d binary not executable at $K3D_BIN" + +# ---- phase 1: k3d cluster --------------------------------------------------- + +log "phase 1: create k3d cluster '$CLUSTER_NAME' (host port $NATS_NODE_PORT → loadbalancer)" +"$K3D_BIN" cluster delete "$CLUSTER_NAME" >/dev/null 2>&1 || true +"$K3D_BIN" cluster create "$CLUSTER_NAME" \ + --wait --timeout 90s \ + -p "${NATS_NODE_PORT}:${NATS_NODE_PORT}@loadbalancer" \ + >/dev/null +"$K3D_BIN" kubeconfig get "$CLUSTER_NAME" > "$KUBECONFIG_FILE" +export KUBECONFIG="$KUBECONFIG_FILE" + +# ---- phase 2: NATS in-cluster ------------------------------------------------ + +log "phase 2a: sideload NATS image ($NATS_IMAGE)" +if ! docker image inspect "$NATS_IMAGE" >/dev/null 2>&1; then + if ! podman image inspect "$NATS_IMAGE" >/dev/null 2>&1; then + podman pull "$NATS_IMAGE" >/dev/null || fail "podman pull $NATS_IMAGE failed" + fi + tmptar="$(mktemp -t nats-image.XXXXXX.tar)" + podman save "$NATS_IMAGE" -o "$tmptar" >/dev/null + docker load -i "$tmptar" >/dev/null + rm -f "$tmptar" +fi +"$K3D_BIN" image import "$NATS_IMAGE" -c "$CLUSTER_NAME" >/dev/null + +log "phase 2b: install NATS via NatsBasicScore" +( + cd "$REPO_ROOT" + cargo run -q --release -p example_iot_nats_install -- \ + --namespace "$NATS_NAMESPACE" \ + --name "$NATS_NAME" \ + --expose load-balancer +) +kubectl -n "$NATS_NAMESPACE" wait --for=condition=Available \ + "deployment/$NATS_NAME" --timeout=120s >/dev/null + +log "probing nats://localhost:$NATS_NODE_PORT end-to-end" +for _ in $(seq 1 60); do + (echo >"/dev/tcp/127.0.0.1/$NATS_NODE_PORT") 2>/dev/null && break + sleep 1 +done +(echo >"/dev/tcp/127.0.0.1/$NATS_NODE_PORT") 2>/dev/null \ + || fail "TCP localhost:$NATS_NODE_PORT never came up" + +# ---- phase 3: CRD + operator ------------------------------------------------ + +log "phase 3: install CRD" +( + cd "$OPERATOR_DIR" + cargo run -q -- install +) +kubectl wait --for=condition=Established \ + "crd/deployments.iot.nationtech.io" --timeout=30s >/dev/null + +log "phase 4: start operator" +( + cd "$OPERATOR_DIR" + cargo build -q --release +) + +# Default log level exposes the CR patch loop + watch attach; DEBUG=1 +# bumps it so every status patch + transition is printed. +if [[ "${DEBUG:-0}" == "1" ]]; then + OPERATOR_RUST_LOG="debug,async_nats=warn,hyper=warn,rustls=warn,kube=info" +else + OPERATOR_RUST_LOG="info,kube_runtime=warn" +fi + +NATS_URL="nats://localhost:$NATS_NODE_PORT" \ +KV_BUCKET="desired-state" \ +RUST_LOG="$OPERATOR_RUST_LOG" \ + "$REPO_ROOT/target/release/iot-operator-v0" \ + >"$OPERATOR_LOG" 2>&1 & +OPERATOR_PID=$! +log "operator pid=$OPERATOR_PID" +for _ in $(seq 1 30); do + if grep -q "starting Deployment controller" "$OPERATOR_LOG"; then break; fi + if ! 
kill -0 "$OPERATOR_PID" 2>/dev/null; then fail "operator exited early"; fi + sleep 0.5 +done +grep -q "starting Deployment controller" "$OPERATOR_LOG" \ + || fail "operator never logged controller startup" + +# ---- explore banner (before the load run so the user can start watching) ---- + +print_banner() { + cat </dev/null || echo 0)" +warnings="$(grep -c " WARN " "$OPERATOR_LOG" 2>/dev/null || echo 0)" +errors="$(grep -c " ERROR " "$OPERATOR_LOG" 2>/dev/null || echo 0)" +log " CR status patches logged (DEBUG-level; use DEBUG=1 to surface): $patches" +log " operator warnings: $warnings errors: $errors" +if [[ "$errors" -gt 0 ]]; then + echo "----- operator error lines -----" + grep " ERROR " "$OPERATOR_LOG" | tail -20 +fi + +# ---- hold open (optional) --------------------------------------------------- + +if [[ "$HOLD" == "1" ]]; then + print_banner + log "HOLD=1 — stack is still running. Ctrl-C to tear down." + # Block until user interrupts; cleanup trap does the teardown. + while true; do sleep 60; done +fi + +log "PASS" diff --git a/iot/scripts/smoke-a3.sh b/iot/scripts/smoke-a3.sh index 8bb8d5a5..2565bfda 100755 --- a/iot/scripts/smoke-a3.sh +++ b/iot/scripts/smoke-a3.sh @@ -136,34 +136,34 @@ case "$ARCH" in aarch64|arm64) STATUS_TIMEOUT=300 ;; *) STATUS_TIMEOUT=60 ;; esac -log "phase 4: wait for agent to report status to NATS (timeout=${STATUS_TIMEOUT}s)" +log "phase 4: wait for agent to report heartbeat to NATS (timeout=${STATUS_TIMEOUT}s)" wait_for_status() { local timeout=$1 for _ in $(seq 1 "$timeout"); do if podman run --rm --network "$NATS_NET_NAME" \ docker.io/natsio/nats-box:latest \ - nats --server "nats://$NATS_CONTAINER:4222" kv get agent-status \ - "status.$DEVICE_ID" --raw >/dev/null 2>&1; then + nats --server "nats://$NATS_CONTAINER:4222" kv get device-heartbeat \ + "heartbeat.$DEVICE_ID" --raw >/dev/null 2>&1; then return 0 fi sleep 1 done return 1 } -wait_for_status "$STATUS_TIMEOUT" || fail "agent-status never appeared for $DEVICE_ID" -log "agent status present on NATS" +wait_for_status "$STATUS_TIMEOUT" || fail "device-heartbeat never appeared for $DEVICE_ID" +log "agent heartbeat present on NATS" # ---------------------------- phase 5: hard power-cycle, expect recovery ---------------------------- log "phase 5: power-cycle VM (virsh destroy + start) → agent must reconnect to NATS" nats_status_timestamp() { - # Prints the "timestamp" field of the status. entry, or "". + # Prints the "at" field of the heartbeat. entry, or "". # Never errors (for `set -e` safety). 
podman run --rm --network "$NATS_NET_NAME" \ docker.io/natsio/nats-box:latest \ - nats --server "nats://$NATS_CONTAINER:4222" kv get agent-status \ - "status.$DEVICE_ID" --raw 2>/dev/null \ - | grep -oE '"timestamp":"[^"]+"' \ + nats --server "nats://$NATS_CONTAINER:4222" kv get device-heartbeat \ + "heartbeat.$DEVICE_ID" --raw 2>/dev/null \ + | grep -oE '"at":"[^"]+"' \ | head -1 | cut -d'"' -f4 || true } diff --git a/iot/scripts/smoke-a4.sh b/iot/scripts/smoke-a4.sh index c7fe913a..2f0741d4 100755 --- a/iot/scripts/smoke-a4.sh +++ b/iot/scripts/smoke-a4.sh @@ -349,17 +349,17 @@ done NATSBOX_HOST="podman run --rm docker.io/natsio/nats-box:latest \ nats --server nats://host.containers.internal:$NATS_NODE_PORT" -log "checking agent heartbeat in NATS KV (agent-status bucket)" +log "checking agent heartbeat in NATS KV (device-heartbeat bucket)" for _ in $(seq 1 30); do - if $NATSBOX_HOST kv get agent-status "status.$DEVICE_ID" --raw \ + if $NATSBOX_HOST kv get device-heartbeat "heartbeat.$DEVICE_ID" --raw \ >/dev/null 2>&1; then break fi sleep 2 done -$NATSBOX_HOST kv get agent-status "status.$DEVICE_ID" --raw >/dev/null \ - || fail "agent never published status to NATS" -log "agent heartbeat present: status.$DEVICE_ID" +$NATSBOX_HOST kv get device-heartbeat "heartbeat.$DEVICE_ID" --raw >/dev/null \ + || fail "agent never published heartbeat to NATS" +log "agent heartbeat present: heartbeat.$DEVICE_ID" # ---- phase 7: either hand off to user, or drive regression ------------------ @@ -508,8 +508,9 @@ $(printf '\033[1mInspect NATS KV (natsbox):\033[0m\n') alias natsbox='podman run --rm docker.io/natsio/nats-box:latest nats --server nats://host.containers.internal:$NATS_NODE_PORT' natsbox kv ls desired-state natsbox kv get desired-state '$DEVICE_ID.$DEPLOY_NAME' --raw - natsbox kv ls agent-status - natsbox kv get agent-status 'status.$DEVICE_ID' --raw + natsbox kv ls device-state + natsbox kv ls device-heartbeat + natsbox kv get device-heartbeat 'heartbeat.$DEVICE_ID' --raw $(printf '\033[1mHit the deployed nginx:\033[0m\n') curl http://$VM_IP:${DEPLOY_PORT%%:*}/
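
A quick way to poke the renamed buckets by hand, following the smoke-a4 banner above. This is a sketch only: the `natsbox` function stands in for the banner's alias so it also works inside scripts, `pi-01` is a made-up device id, the heartbeat value shape is inferred from the smoke-a3 `grep` (the real contract type lives in harmony-reconciler-contracts), and the 90 s staleness cutoff is an illustrative assumption, not the operator's threshold.

```
# Sketch: manual inspection of the heartbeat bucket from the host.
natsbox() {
  podman run --rm docker.io/natsio/nats-box:latest \
    nats --server "nats://host.containers.internal:${NATS_NODE_PORT:-4222}" "$@"
}

DEVICE_ID="${DEVICE_ID:-pi-01}"   # illustrative id; the smoke scripts export the real one

natsbox kv ls device-heartbeat
natsbox kv get device-heartbeat "heartbeat.$DEVICE_ID" --raw
# Expected: a tiny JSON value with an "at" timestamp, e.g. {"at":"2026-04-22T10:15:03Z"}
# (assumed shape, inferred from the grep in smoke-a3 above).

# Rough staleness check: flag a device whose last heartbeat is older than ~90 s
# (three missed 30 s beats). Cutoff chosen for illustration only.
at="$(natsbox kv get device-heartbeat "heartbeat.$DEVICE_ID" --raw \
      | grep -oE '"at":"[^"]+"' | head -1 | cut -d'"' -f4)"
age=$(( $(date -u +%s) - $(date -u -d "$at" +%s) ))   # GNU date
(( age > 90 )) && echo "$DEVICE_ID heartbeat is ${age}s old (stale?)"
```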