feat/iot-aggregation-scale #274

Closed
johnride wants to merge 18 commits from feat/iot-aggregation-scale into feat/iot-operator-helm-chart
21 changed files with 2525 additions and 825 deletions

Cargo.lock generated
View File

@@ -3179,6 +3179,25 @@ dependencies = [
"tokio",
]
[[package]]
name = "example_iot_load_test"
version = "0.1.0"
dependencies = [
"anyhow",
"async-nats",
"chrono",
"clap",
"harmony-reconciler-contracts",
"iot-operator-v0",
"k8s-openapi",
"kube",
"rand 0.9.2",
"serde_json",
"tokio",
"tracing",
"tracing-subscriber",
]
[[package]]
name = "example_iot_nats_install"
version = "0.1.0"
@@ -3758,6 +3777,7 @@ dependencies = [
"harmony_types",
"serde",
"serde_json",
"thiserror 2.0.18",
]
[[package]]

View File

@@ -0,0 +1,406 @@
# Chapter 4 — Aggregation architecture at IoT scale
> **Status: design draft (2026-04-22)**
>
> Design document for the Chapter 4 aggregation rework. Review first,
> implement after. Supersedes the Chapter 2 aggregator's O(deployments × devices)
> per-tick recompute, which works for a 10-device smoke but breaks
> the moment a real fleet lands.
## 1. Why now
We have no real deployment in the field yet. That's a liability for
the business (no users, no revenue) but a gift when designing: we can move
the data model before customers depend on it. After a partner fleet
lands, changing the aggregation substrate is a multi-quarter
migration. Doing it now is days of work.
Chapter 2's aggregator was the right "make it work" design for a
walking-skeleton proof. It's the wrong "make it scale" design for a
partner deployment of even a few hundred devices, let alone the
fleet sizes the product thesis targets. This chapter replaces it.
## 2. What's wrong today
**Per-tick cost, current design.** Every 5 seconds, for each
Deployment CR, resolve the selector against the full device snapshot
and fold into an aggregate:
```
O(deployments × devices) per tick
+ 1 kube patch per CR per tick
```
At 10k deployments × 1M devices, that's 10^10 selector evaluations
and 10k apiserver patches every 5 s. Nothing about that is viable.
**What else goes wrong at scale.**
- The operator holds the full fleet snapshot in memory. 1M `AgentStatus`
payloads × a few kB each = GB of heap, dominated by `recent_events`
rings.
- Agent heartbeats publish the whole `AgentStatus` every 30 s — a lot
of bytes on the wire whose only incremental content is usually a
timestamp update.
- `agent-status` is a KV bucket. KV is designed for "latest value per
key," not "stream of state changes." We've been using it for both
roles and paying the worst of each.
- Logs are nowhere yet (good — this is the moment to put them in the
right place before we're committed).
## 3. Design overview
Shift to a **CQRS-style architecture** where devices write their
authoritative state, and the operator maintains incrementally-updated
aggregates driven by state-change events.
```
device (N× agents) operator
────────────────── ────────
current state keys ───reads─▶ on cold-start:
(authoritative) walk keys → rebuild counters
then: stream consumer
state-change events ═ JS stream═▶ ± counters per event
(delta stream) ± update reverse index
on tick (1 Hz):
device_info keys ───reads─▶ patch .status for dirty deployments
(labels, inventory)
logs ───at-most-once NATS subj─────▶ not stored centrally
(streamed on query)
```
Three substrates, each chosen for its fit:
- **JetStream KV, per-device keys** — device-authoritative state.
Cheap to read when needed, never scanned globally at scale.
- **JetStream stream, per-device events** — ordered delta feed.
Operator consumers replay on restart, consume incrementally during
steady state.
- **Plain NATS subjects, logs** — at-most-once pub/sub, device-side
buffering (~10k lines), streamed on query.
## 4. Data model
### 4.1 NATS KV buckets
**`device-info`** — static-ish facts per device, infrequent updates.
| Key | Value | Written by | Read by |
|-----|-------|------------|---------|
| `info.<device_id>` | `DeviceInfo` (labels, inventory, agent_version) | agent on startup + label change | operator (selector resolution, inventory display) |
**`device-state`** — current phase per deployment per device.
Authoritative source of truth for "what's running where."
| Key | Value | Written by | Read by |
|-----|-------|------------|---------|
| `state.<device_id>.<deployment_name>` | `DeploymentState` (phase, last_event_at, last_error) | agent on reconcile transition | operator on cold-start only |
One key per (device, deployment) pair. Natural TTL via JetStream KV
per-key history — lets us cap the keyspace.
**`device-heartbeat`** — liveness only. Tiny payload, frequent
updates.
| Key | Value | Written by | Read by |
|-----|-------|------------|---------|
| `heartbeat.<device_id>` | `{ timestamp }` (32 bytes) | agent every 30s | operator (stale detection) |
Separate from `device-state` so routine heartbeats don't churn the
state keys or emit spurious state-change events.
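The bucket layout above maps to simple key builders. As a sketch (the real helpers live in `harmony-reconciler-contracts` and their exact signatures may differ):

```rust
// Illustrative key builders for the three KV buckets; names mirror
// the helpers used elsewhere in this PR, but signatures are assumed.
fn device_info_key(device_id: &str) -> String {
    format!("info.{device_id}")
}

fn device_state_key(device_id: &str, deployment: &str) -> String {
    format!("state.{device_id}.{deployment}")
}

fn device_heartbeat_key(device_id: &str) -> String {
    format!("heartbeat.{device_id}")
}
```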
### 4.2 NATS JetStream stream
**`device-events`** — ordered delta feed for operator aggregation.
- Subject: `events.state.<device_id>.<deployment_name>`
- Payload: `StateChangeEvent { from: Phase, to: Phase, at, last_error }`
- Retention: time-based (e.g. 24h) — consumers that fall behind by
more than the retention window rebuild from `device-state` KV on recovery.
- Agents emit one event per phase transition, **not** per heartbeat.
Separate stream for **event log** (user-facing reconcile log events):
- Subject: `events.log.<device_id>`
- Payload: `LogEvent { at, severity, message, deployment? }`
- Retention: time-based (1h, enough for "show me what happened the
last few minutes" queries; the device's in-memory ring holds the
rest).
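A sketch of the state-change payload and its subject, under the field names used in this chapter (the concrete types in the contracts crate are authoritative and may differ):

```rust
// Assumed shape of the state-change payload; illustrative only.
#[allow(dead_code)]
#[derive(Debug, PartialEq)]
enum Phase { Pending, Running, Succeeded, Failed }

#[allow(dead_code)]
struct StateChangeEvent {
    from: Option<Phase>, // None on a device's first event (§5.3)
    to: Phase,
    at_unix_ms: i64,
    last_error: Option<String>,
}

fn state_event_subject(device_id: &str, deployment: &str) -> String {
    format!("events.state.{device_id}.{deployment}")
}
```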
### 4.3 Log transport (NOT JetStream)
- Subject: `logs.<device_id>` — plain pub/sub, at-most-once
- Not persisted by NATS
- Device buffers last ~10k lines in a ring buffer
- Query protocol: request-reply on `logs.<device_id>.query`
- Device responds with buffer contents, then streams live tail
until the query closes
This is a dedicated transport because structured logs at fleet scale
(1M devices × 1k lines/h = 1B messages/h) would crush JetStream's
per-subject storage without adding operator-visible value. Operators
only look at logs on-demand, per-device; device-side buffering
matches the access pattern.
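A minimal sketch of the device-side ring buffer (capacity and types illustrative; the real agent implementation may differ):

```rust
use std::collections::VecDeque;

// Bounded log ring: holds at most `cap` lines, evicting the oldest.
// A query drains a snapshot of current contents, then the live tail
// is streamed until the query closes.
struct LogRing {
    cap: usize,
    lines: VecDeque<String>,
}

impl LogRing {
    fn new(cap: usize) -> Self {
        Self { cap, lines: VecDeque::with_capacity(cap) }
    }

    fn push(&mut self, line: impl Into<String>) {
        if self.lines.len() == self.cap {
            self.lines.pop_front(); // drop oldest
        }
        self.lines.push_back(line.into());
    }

    fn snapshot(&self) -> Vec<String> {
        self.lines.iter().cloned().collect()
    }
}
```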
### 4.4 CRD fields
Minimal change from Chapter 2:
- `.status.aggregate.succeeded | failed | pending` — now sourced
from counters, not per-tick fold.
- `.status.aggregate.last_error` — updated on `to: Failed` events.
- `.status.aggregate.last_heartbeat_at` — from the per-deployment
latest event.
- `.status.aggregate.recent_events` — bounded per-deployment ring,
updated on event arrival.
- **Drop** `.status.aggregate.unreported` (no meaningful definition
under selector-based targeting — already removed in the pre-chapter
cleanup).
- **Add** `.status.aggregate.stale: u32` — count of devices matching
the selector whose last heartbeat is older than a threshold
(default 5 min). This is the replacement for "unreported" that
makes sense at scale. Computed on tick from the operator's
reverse-indexed view, not per-device query.
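The `stale` computation reduces to a timestamp comparison per device. A sketch, assuming unix-second timestamps and the 5 min default threshold mentioned above:

```rust
// A device counts as stale when its last heartbeat is older than the
// threshold (default 300 s per the field description above).
// saturating_sub keeps a future-dated heartbeat (clock skew) from
// underflowing.
fn is_stale(last_heartbeat_unix_s: u64, now_unix_s: u64, threshold_s: u64) -> bool {
    now_unix_s.saturating_sub(last_heartbeat_unix_s) > threshold_s
}
```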
### 4.5 Operator in-memory state
- **Counters** — `HashMap<DeploymentKey, PhaseCounters>`, one entry
per CR, updated atomically on event arrival.
- **Reverse index** — `HashMap<DeviceId, HashSet<DeploymentKey>>`,
updated when a device's labels change or when a CR's selector
changes. Lets a state-change event find affected deployments in
O(deployments-matching-this-device) rather than O(all-deployments).
- **Last-error rollup** — per deployment, the most recent error
keyed by timestamp.
- **Recent-events ring** — per deployment, bounded by N (e.g. 10).
- **Dirty set** — deployments whose aggregate has changed since last
patch. Tick reads + clears this set; only dirty deployments get
patched.
Operator heap is bounded by fleet + deployment count, not their
product.
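Taken together, the operator state might be shaped like this (names follow the list above; the key and field types are assumptions):

```rust
use std::collections::{HashMap, HashSet};

type DeviceId = String;
type DeploymentKey = String; // e.g. "namespace/name"

#[allow(dead_code)]
#[derive(Default, Debug)]
struct PhaseCounters {
    succeeded: u32,
    failed: u32,
    pending: u32,
}

#[allow(dead_code)]
#[derive(Default)]
struct AggregatorState {
    counters: HashMap<DeploymentKey, PhaseCounters>,
    reverse_index: HashMap<DeviceId, HashSet<DeploymentKey>>,
    dirty: HashSet<DeploymentKey>,
}

impl AggregatorState {
    // Deployments affected by an event from `device`: a lookup that
    // costs O(deployments matching this device), not O(all CRs).
    fn affected(&self, device: &str) -> Vec<DeploymentKey> {
        self.reverse_index
            .get(device)
            .map(|s| s.iter().cloned().collect())
            .unwrap_or_default()
    }
}
```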
## 5. Counter invariants (the contract)
Correctness rests on two rules:
### 5.1 Device publishes exactly one transition per reconcile outcome
Every reconcile results in a state. If the state differs from the
last published state for `(device, deployment)`, the agent:
1. Writes the new state to `state.<device>.<deployment>` KV (CAS
against expected-revision for multi-writer safety — only one
agent process per device, so contention is theoretical).
2. Publishes a `StateChangeEvent` to
`events.state.<device>.<deployment>`.
These two writes must be atomic from the agent's perspective — if
(1) succeeds and (2) fails (or vice versa), the agent retries until
both reach NATS. Worst case: a duplicate event on the stream; the
counter logic handles duplicates via the `from → to` structure (see 5.2).
### 5.2 Counters are driven by transitions, not snapshots
Each event carries `from: Phase, to: Phase`. Counter update is a
single atomic action:
```rust
counters[(deployment, from)] -= 1;
counters[(deployment, to)] += 1;
```
Duplicates (the same `from → to` replayed) are handled by a
cross-check: the operator applies an event only when its `from`
matches its current view of that (device, deployment)'s phase.
A replayed event no longer matches, since the current phase is
already `to`, so it is ignored and the counters stay consistent.
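The cross-check and counter update can be sketched together for one (device, deployment). This is a simplification, not the operator's actual code; the real version keys counters per deployment:

```rust
use std::collections::HashMap;

#[allow(dead_code)]
#[derive(Clone, Copy, PartialEq, Eq, Hash, Debug)]
enum Phase { Pending, Running, Succeeded, Failed }

// Apply a from→to transition only when `from` matches the operator's
// current view; replayed or stale events become no-ops. `from: None`
// is the bootstrap transition of §5.3 (increment only).
fn apply_transition(
    counters: &mut HashMap<Phase, i64>,
    current: &mut Option<Phase>,
    from: Option<Phase>,
    to: Phase,
) -> bool {
    if *current != from {
        return false; // duplicate or stale event: ignore
    }
    if let Some(f) = from {
        *counters.entry(f).or_insert(0) -= 1;
    }
    *counters.entry(to).or_insert(0) += 1;
    *current = Some(to);
    true
}
```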
### 5.3 The bootstrap transition
A device's first-ever event for a deployment has `from: None` (or a
sentinel `Unassigned` variant): counter update is just `to`
increment.
### 5.4 Device leaves fleet
When a device's heartbeat goes stale past threshold + grace, OR
when its labels no longer match the deployment's selector:
- Counters are decremented for every deployment the device was
previously contributing to (via the reverse index).
- The device's state keys aren't touched — they're the authoritative
record; a device re-joining resumes from them.
### 5.5 CR created / selector changed
The reverse index + counters are rebuilt for the affected CR by
walking `device-info` + `device-state` once (O(devices + states)
local NATS KV reads). Cheap for a single CR; happens at CR-apply
time, not on every tick.
## 6. Cold-start protocol
On operator process start:
1. **Load CRs** — list `Deployment` CRs via kube API. Build the
reverse index skeleton (deployment → selector).
2. **Load device labels** — iterate `device-info` KV keys once.
Resolve each device against every CR's selector, populate the
reverse index device-side entries. O(devices × CRs), one-time,
in-memory. For 1M devices × 10k CRs this is 10^10 operations, but
they're purely local lookups (BTreeMap matches on label maps); the
back-of-envelope puts it at a few seconds to a minute on a modern CPU.
3. **Rebuild counters** — iterate `device-state` KV keys once.
For each `state.<device>.<deployment>`, look up the matching
deployments from the reverse index and increment counters.
4. **Attach stream consumer** — durable consumer on
`events.state.>`, starting from the newest sequence at cold-start
moment. The KV walk was the "past"; the stream is the "future."
5. **Begin tick loop** — patch dirty CRs on a 1 Hz schedule.
Cold-start time is dominated by step 2, not step 3. An ArgoCD-style
"pause all reconciles during leader election / startup" envelope
keeps the CR patches from competing with the cold-start scans.
**What if the operator falls behind the stream's retention window?**
Reset to step 3 (re-walk `device-state`). The KV is authoritative;
the stream is an accelerator.
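Step 2's per-device inner loop is a label match. Assuming Kubernetes-style equality selectors (an assumption; the CRD's selector semantics are defined elsewhere), it's just:

```rust
use std::collections::BTreeMap;

// Equality-based selector match, mirroring k8s matchLabels semantics
// (assumed): every selector pair must appear in the device's labels.
fn selector_matches(
    selector: &BTreeMap<String, String>,
    device_labels: &BTreeMap<String, String>,
) -> bool {
    selector.iter().all(|(k, v)| device_labels.get(k) == Some(v))
}
```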
## 7. CR status patch cadence
- Counter updates happen in memory, instantly.
- The **dirty set** captures which deployments' aggregates changed
since the last patch.
- A 1 Hz ticker reads + clears the dirty set, patches those CRs.
- Individual CR patches are debounced to at most once per second
— avoids hammering the apiserver when a deployment is mid-rollout
and devices are transitioning in a burst.
Steady-state operator → apiserver traffic is proportional to the
rate of *interesting* changes, not to fleet size.
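A sketch of the tick's drain with per-CR debounce (names and signature are illustrative, not the operator's actual API):

```rust
use std::collections::{HashMap, HashSet};
use std::time::{Duration, Instant};

// Drain the dirty set, returning only deployments whose last patch
// is older than `min_interval`. Still-debounced entries stay dirty,
// so they're picked up on a later tick.
fn drain_dirty(
    dirty: &mut HashSet<String>,
    last_patched: &mut HashMap<String, Instant>,
    min_interval: Duration,
    now: Instant,
) -> Vec<String> {
    let ready: Vec<String> = dirty
        .iter()
        .filter(|k| {
            last_patched
                .get(k.as_str())
                .map_or(true, |t| now.duration_since(*t) >= min_interval)
        })
        .cloned()
        .collect();
    for k in &ready {
        dirty.remove(k);
        last_patched.insert(k.clone(), now);
    }
    ready
}
```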
## 8. Failure modes
| Scenario | Detection | Recovery |
|---|---|---|
| Operator crash | k8s restarts the pod | Cold-start protocol §6 |
| Stream consumer falls behind retention | Stream API returns out-of-range | Re-run §6 step 3 (re-walk KV) |
| Agent publishes event but KV write fails | Agent-side local retry; event is replayed | Counter is idempotent per §5.2 |
| Agent writes KV but event publish fails | Agent-side local retry | Operator never sees the transition until retry succeeds; stale threshold catches the device if agent is permanently broken |
| Device's label change lost | Heartbeat carries current labels; stale entry aged out | Periodic sync (e.g. 1/h) re-scans `device-info` to catch drift |
| Duplicate event (retry) | `from == current` in reverse index | No-op (§5.2) |
| Out-of-order event (retry ordering) | Sequence number on event | Consumer tracks per-(device, deployment) last-applied sequence; old events ignored |
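The out-of-order row's per-(device, deployment) sequence tracking can be sketched as a watermark map (a simplification of whatever the consumer actually stores):

```rust
use std::collections::HashMap;

// Drop events at or below the last-applied sequence for the same
// (device, deployment); newer events advance the watermark.
fn should_apply(
    last_seq: &mut HashMap<(String, String), u64>,
    key: (String, String),
    seq: u64,
) -> bool {
    match last_seq.get(&key) {
        Some(&s) if seq <= s => false,
        _ => {
            last_seq.insert(key, seq);
            true
        }
    }
}
```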
## 9. Scale back-of-envelope
**Target:** 1M devices, 10k deployments, p50 reconcile rate 1 event
per device per hour.
- **Event volume.** 1M × (1/3600s) = 278 events/s.
- **Operator event-processing cost.** Each event touches a bounded
number of in-memory counters (via the reverse index) at roughly 1 µs
of CPU each; 278 eps is negligible, with ~0 network cost (JetStream
is local to the operator).
- **Operator → apiserver patches.** Deployments change at a rate
far below event rate; debounced dirty-set drains limit patches to
a few per second even during bursty rollouts.
- **Operator memory.** Reverse index entries (device_id + set of
deployment keys) ≈ 200 bytes × 1M = 200 MB. Counters ≈ 10k × few
fields = negligible. Last-error + recent-events rings ≈ 10k × 10
entries × 512 bytes = 50 MB. Total ~250 MB — fine.
- **Cold-start time.** 1M KV reads × amortized 0.1 ms (JetStream KV
is fast for key iteration) = 100 s. Acceptable for a
several-minute-once-per-release recovery window. If it becomes a
problem, chunk the walk and resume-from-checkpoint.
- **Stale device sweep.** On each tick, O(dirty set × reverse index
lookups). Stale detection itself is O(devices-whose-heartbeat-is-old);
a second, slower ticker (e.g. 30 s) scans the heartbeat KV for
entries older than threshold and emits synthetic "device went
stale" events that drive the same counter-decrement path.
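The envelope above, restated as checkable arithmetic (the constants are this section's assumptions, not measurements):

```rust
// Fleet-scale assumptions from §9.
const DEVICES: u64 = 1_000_000;
const EVENTS_PER_DEVICE_PER_HOUR: f64 = 1.0;
const REVERSE_INDEX_BYTES_PER_DEVICE: u64 = 200;
const KV_READ_MS: f64 = 0.1;

// 1M devices at one event per hour each.
fn events_per_sec() -> f64 {
    DEVICES as f64 * EVENTS_PER_DEVICE_PER_HOUR / 3600.0
}

// Reverse-index heap estimate in MB.
fn reverse_index_mb() -> u64 {
    DEVICES * REVERSE_INDEX_BYTES_PER_DEVICE / 1_000_000
}

// Cold-start KV walk at amortized 0.1 ms per read.
fn cold_start_secs() -> f64 {
    DEVICES as f64 * KV_READ_MS / 1000.0
}
```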
## 10. Schema migration
`Deployment` CRD is still `v1alpha1`, not deployed anywhere, so no
migration machinery is needed for the CRD itself — we just change
the aggregate subtree definition.
`harmony-reconciler-contracts::AgentStatus` is deprecated by this
chapter. Replaced by narrower wire types:
- `DeviceInfo` — what `info.<device_id>` stores
- `DeploymentState` — what `state.<device>.<dep>` stores
- `HeartbeatPayload` — what `heartbeat.<device_id>` stores
- `StateChangeEvent` — what events stream emits
- `LogEvent` — what event-log stream emits
The old `AgentStatus` type goes away when the old aggregator
goes away. Clean break, same CRD version.
## 11. Implementation milestones
Landing order, each a reviewable increment:
1. **M1: new contracts crate shapes** — `DeviceInfo`,
`DeploymentState`, `HeartbeatPayload`, `StateChangeEvent`,
`LogEvent`. Round-trip serde tests. No runtime code changes yet.
2. **M2: agent-side rewrite** — agent writes the new KV shapes +
publishes state-change events + heartbeats. Old `AgentStatus`
publish path stays in parallel for the smoke to keep passing.
3. **M3: operator-side cold-start protocol** — new operator task
that walks the new KV buckets and builds in-memory counters.
Runs alongside the old aggregator; logs counter parity checks
against the legacy aggregator's output so we can verify
correctness before switching over.
4. **M4: operator-side event consumer** — attach the durable stream
consumer, drive counters incrementally. Parity checks still on.
5. **M5: flip CR patch source** — the new counter-backed aggregator
patches `.status.aggregate`, the legacy one goes read-only, then
deleted in the next commit.
6. **M6: logs subject + query protocol** — device-side ring buffer,
query API, a first CLI surface (`natiq logs device=X` or
equivalent) that drives it.
7. **M7: synthetic-scale test harness** — spin up 1k (then 10k) mock
agents in-process, drive a realistic event load through the
operator, measure + publish numbers.
8. **M8: delete legacy `AgentStatus`** — `harmony-reconciler-contracts`
cleanup, smoke-a4 updates.
M1-M5 can land on one branch; M6 is adjacent work; M7-M8 close out.
## 12. Open questions
- **Multi-operator HA.** The design assumes one operator at a time.
Adding HA means either (a) one active + one standby operator with
NATS-based leader election, or (b) shared counter state in KV
instead of in-memory. (a) is simpler; (b) scales better.
Defer until a specific availability target demands it.
- **Counter-KV snapshots.** Should we periodically snapshot the
in-memory counter state to a `counters` KV bucket so cold-start
can resume from a recent snapshot + a short stream tail, instead
of always re-walking `device-state`? Probably yes once cold-start
time becomes an operational concern, but not in the initial cut.
- **Stream retention tuning.** 24h for `events.state.>` is a guess.
Real number depends on observed operator downtime p99. Initial
setting, tune from operational data.
- **Compaction policy for `device-state` KV.** JetStream KV
per-key history can grow unbounded if phases churn. Set
`max_history_per_key = 1` (keep only latest value) unless there's
a reason to keep transition history (there isn't — that's what
the events stream is for).
- **Agent crash before publishing state-change event.** Transition
is durably captured in the agent's local podman state; on agent
restart the reconcile loop re-observes the phase and either
re-publishes (if it differs from `state.<device>.<dep>`) or stays
silent. Correctness preserved at the cost of event-stream ordering
ambiguity during the crash window — acceptable.
## 13. What this chapter deliberately does *not* change
- CRD `.spec.target_selector` semantics — stays exactly as shipped.
- Operator's kube-rs controller loop for CR reconcile — stays as is.
- Helm chart structure (Chapter 3) — orthogonal.
- Authentication (Chapter Auth) — orthogonal. When that chapter
lands, every subject + KV bucket above will be re-scoped under
device-specific NATS credentials; the topology above doesn't need
to change for that to slot in.

View File

@@ -0,0 +1,24 @@
[package]
name = "example_iot_load_test"
version.workspace = true
edition = "2024"
license.workspace = true
[[bin]]
name = "iot_load_test"
path = "src/main.rs"
[dependencies]
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
iot-operator-v0 = { path = "../../iot/iot-operator-v0" }
async-nats = { workspace = true }
chrono = { workspace = true }
kube = { workspace = true, features = ["runtime", "derive"] }
k8s-openapi.workspace = true
serde_json = { workspace = true }
tokio = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
anyhow = { workspace = true }
clap = { workspace = true }
rand = { workspace = true }

View File

@@ -0,0 +1,536 @@
//! Load test for the IoT operator's `fleet_aggregator`.
//!
//! Simulates N devices across M Deployment CRs, each device pushing
//! a `DeploymentState` update to NATS every `--tick-ms`. Measures
//! throughput on both sides (devices → NATS and operator → kube
//! apiserver) and, at the end of the run, verifies each CR's
//! `.status.aggregate` counters sum to its `target_devices.len()`.
//!
//! Assumes an already-running stack:
//! - NATS reachable at `--nats-url`
//! - k8s cluster with the operator's CRD installed (KUBECONFIG)
//! - the operator process running against the same NATS + cluster
//!
//! The `iot/scripts/smoke-a4.sh` script brings all three up — pass
//! `--hold` to leave them running, then run this binary.
//!
//! Typical invocation:
//!
//! cargo run -q -p example_iot_load_test -- \
//! --namespace iot-load \
//! --groups 55,5,5,5,5,5,5,5,5,5 \
//! --tick-ms 1000 \
//! --duration-s 60
use anyhow::{Context, Result};
use async_nats::jetstream::{self, kv};
use chrono::Utc;
use clap::Parser;
use harmony_reconciler_contracts::{
BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, DeploymentName,
DeploymentState, DeviceInfo, HeartbeatPayload, Id, Phase, device_heartbeat_key,
device_info_key, device_state_key,
};
use iot_operator_v0::crd::{
Deployment, DeploymentSpec, Rollout, RolloutStrategy, ScorePayload,
};
use k8s_openapi::api::core::v1::Namespace;
use kube::api::{Api, DeleteParams, Patch, PatchParams, PostParams};
use kube::Client;
use rand::Rng;
use std::collections::BTreeMap;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
#[derive(Parser, Debug, Clone)]
#[command(
name = "iot_load_test",
about = "Synthetic load for the IoT operator's fleet_aggregator"
)]
struct Cli {
/// NATS URL (same one the operator connects to).
#[arg(long, default_value = "nats://localhost:4222")]
nats_url: String,
/// k8s namespace for the load-test Deployment CRs. Created if
/// missing.
#[arg(long, default_value = "iot-load")]
namespace: String,
/// Group shape — comma-separated device counts, one per CR.
/// Default: 100 devices over 10 groups (1 × 55 + 9 × 5).
#[arg(long, default_value = "55,5,5,5,5,5,5,5,5,5")]
groups: String,
/// Per-device tick in ms. Each tick publishes one DeploymentState.
#[arg(long, default_value_t = 1000)]
tick_ms: u64,
/// Heartbeat cadence in seconds (separate from the state tick).
#[arg(long, default_value_t = 30)]
heartbeat_s: u64,
/// Total run duration in seconds before tearing down.
#[arg(long, default_value_t = 60)]
duration_s: u64,
/// Report throughput every N seconds.
#[arg(long, default_value_t = 5)]
report_s: u64,
/// Keep the CRs + KV entries in place after the run instead of
/// deleting them. Useful with HOLD=1 to inspect the steady-state
/// aggregate after the load finishes.
#[arg(long)]
keep: bool,
}
/// Metrics collected across all device tasks.
#[derive(Default)]
struct Counters {
state_writes: AtomicU64,
heartbeat_writes: AtomicU64,
errors: AtomicU64,
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(tracing_subscriber::EnvFilter::from_default_env())
.init();
let cli = Cli::parse();
let group_sizes = parse_groups(&cli.groups)?;
let total: usize = group_sizes.iter().sum();
tracing::info!(
devices = total,
groups = group_sizes.len(),
shape = ?group_sizes,
tick_ms = cli.tick_ms,
duration_s = cli.duration_s,
"iot_load_test starting"
);
// --- NATS setup ----------------------------------------------------------
let nc = async_nats::connect(&cli.nats_url)
.await
.with_context(|| format!("connecting to NATS at {}", cli.nats_url))?;
let js = jetstream::new(nc);
let info_bucket = open_bucket(&js, BUCKET_DEVICE_INFO).await?;
let state_bucket = open_bucket(&js, BUCKET_DEVICE_STATE).await?;
let heartbeat_bucket = open_bucket(&js, BUCKET_DEVICE_HEARTBEAT).await?;
// --- kube setup ----------------------------------------------------------
let client = Client::try_default().await.context("kube client")?;
ensure_namespace(&client, &cli.namespace).await?;
let deployments: Api<Deployment> = Api::namespaced(client.clone(), &cli.namespace);
// --- plan groups + device ids --------------------------------------------
let plan = build_plan(&group_sizes);
apply_crs(&deployments, &plan).await?;
publish_device_infos(&info_bucket, &plan).await?;
// --- spawn simulators ----------------------------------------------------
let counters = Arc::new(Counters::default());
let mut sims = JoinSet::new();
let tick = Duration::from_millis(cli.tick_ms);
let hb_tick = Duration::from_secs(cli.heartbeat_s);
for device in &plan.devices {
let device = Arc::new(device.clone());
sims.spawn(simulate_state_loop(
device.clone(),
state_bucket.clone(),
counters.clone(),
tick,
));
sims.spawn(simulate_heartbeat_loop(
device.clone(),
heartbeat_bucket.clone(),
counters.clone(),
hb_tick,
));
}
// --- metrics reporter ----------------------------------------------------
let report_tick = Duration::from_secs(cli.report_s);
let reporter_counters = counters.clone();
let reporter = tokio::spawn(async move {
let mut ticker = tokio::time::interval(report_tick);
ticker.tick().await; // skip immediate fire
let mut prev_state = 0u64;
let mut prev_hb = 0u64;
loop {
ticker.tick().await;
let s = reporter_counters.state_writes.load(Ordering::Relaxed);
let h = reporter_counters.heartbeat_writes.load(Ordering::Relaxed);
let e = reporter_counters.errors.load(Ordering::Relaxed);
let dt = report_tick.as_secs_f64();
let ss = (s - prev_state) as f64 / dt;
let hh = (h - prev_hb) as f64 / dt;
tracing::info!(
state_writes_total = s,
state_writes_per_s = format!("{ss:.1}"),
heartbeats_total = h,
heartbeats_per_s = format!("{hh:.1}"),
errors = e,
"load"
);
prev_state = s;
prev_hb = h;
}
});
// --- run for duration ----------------------------------------------------
let started = Instant::now();
tokio::time::sleep(Duration::from_secs(cli.duration_s)).await;
reporter.abort();
sims.shutdown().await;
let elapsed = started.elapsed();
let s = counters.state_writes.load(Ordering::Relaxed);
let h = counters.heartbeat_writes.load(Ordering::Relaxed);
let e = counters.errors.load(Ordering::Relaxed);
tracing::info!(
elapsed_s = format!("{:.1}", elapsed.as_secs_f64()),
state_writes_total = s,
state_writes_per_s = format!("{:.1}", s as f64 / elapsed.as_secs_f64()),
heartbeats_total = h,
errors = e,
"run complete"
);
// --- give the aggregator a second to drain --------------------------------
tokio::time::sleep(Duration::from_secs(2)).await;
// --- verify CR status aggregates -----------------------------------------
let mut all_ok = true;
for group in &plan.groups {
let cr = deployments.get(&group.cr_name).await?;
let Some(status) = cr.status.as_ref().and_then(|s| s.aggregate.as_ref()) else {
tracing::warn!(cr = %group.cr_name, "aggregate missing on CR status");
all_ok = false;
continue;
};
let total_reported = status.succeeded + status.failed + status.pending;
let expected = group.devices.len() as u32;
let ok = total_reported == expected;
if !ok {
all_ok = false;
}
tracing::info!(
cr = %group.cr_name,
expected_devices = expected,
succeeded = status.succeeded,
failed = status.failed,
pending = status.pending,
total = total_reported,
ok,
"cr status"
);
}
if !cli.keep {
tracing::info!("cleanup: deleting CRs + KV entries");
for group in &plan.groups {
let _ = deployments
.delete(&group.cr_name, &DeleteParams::default())
.await;
}
for device in &plan.devices {
let _ = state_bucket
.delete(&device_state_key(
&device.device_id,
&DeploymentName::try_new(&device.cr_name).unwrap(),
))
.await;
let _ = info_bucket.delete(&device_info_key(&device.device_id)).await;
let _ = heartbeat_bucket
.delete(&device_heartbeat_key(&device.device_id))
.await;
}
}
if all_ok {
tracing::info!("PASS — all CR aggregates match device counts");
Ok(())
} else {
anyhow::bail!("FAIL — at least one CR aggregate did not sum to its target device count")
}
}
fn parse_groups(s: &str) -> Result<Vec<usize>> {
let out: Vec<usize> = s
.split(',')
.map(|t| t.trim().parse::<usize>())
.collect::<Result<_, _>>()
.context("parsing --groups")?;
if out.is_empty() {
anyhow::bail!("--groups must have at least one size");
}
Ok(out)
}
/// A single simulated device and the CR it belongs to.
#[derive(Clone)]
struct DevicePlan {
device_id: String,
cr_name: String,
}
#[derive(Clone)]
struct GroupPlan {
cr_name: String,
devices: Vec<String>,
}
struct Plan {
devices: Vec<DevicePlan>,
groups: Vec<GroupPlan>,
}
fn build_plan(group_sizes: &[usize]) -> Plan {
// CR-name + device-id width scale with group count so large runs
// get zero-padded ids that sort sensibly in kubectl.
let cr_width = group_sizes.len().to_string().len().max(2);
let total: usize = group_sizes.iter().sum();
let dev_width = total.to_string().len().max(5);
let mut devices = Vec::new();
let mut groups = Vec::new();
let mut next_id = 1usize;
for (i, size) in group_sizes.iter().enumerate() {
let cr_name = format!("load-group-{i:0cr_width$}");
let mut ids = Vec::with_capacity(*size);
for _ in 0..*size {
let id = format!("load-dev-{next_id:0dev_width$}");
next_id += 1;
devices.push(DevicePlan {
device_id: id.clone(),
cr_name: cr_name.clone(),
});
ids.push(id);
}
groups.push(GroupPlan {
cr_name,
devices: ids,
});
}
Plan { devices, groups }
}
async fn open_bucket(
js: &jetstream::Context,
bucket: &'static str,
) -> Result<kv::Store> {
Ok(js
.create_key_value(kv::Config {
bucket: bucket.to_string(),
history: 1,
..Default::default()
})
.await?)
}
async fn ensure_namespace(client: &Client, name: &str) -> Result<()> {
let api: Api<Namespace> = Api::all(client.clone());
if api.get_opt(name).await?.is_some() {
return Ok(());
}
let ns = Namespace {
metadata: kube::api::ObjectMeta {
name: Some(name.to_string()),
..Default::default()
},
..Default::default()
};
match api.create(&PostParams::default(), &ns).await {
Ok(_) => Ok(()),
Err(kube::Error::Api(ae)) if ae.code == 409 => Ok(()),
Err(e) => Err(e.into()),
}
}
async fn apply_crs(api: &Api<Deployment>, plan: &Plan) -> Result<()> {
let params = PatchParams::apply("iot-load-test").force();
let started = Instant::now();
// Cap concurrency so we don't overwhelm the apiserver on large
// fleets. 32 in-flight applies is well under typical apiserver
// QPS limits and keeps the startup latency predictable.
const CONCURRENCY: usize = 32;
let mut in_flight: JoinSet<Result<String>> = JoinSet::new();
let mut iter = plan.groups.iter();
for _ in 0..CONCURRENCY {
if let Some(group) = iter.next() {
in_flight.spawn(apply_one_cr(api.clone(), group.clone(), params.clone()));
}
}
while let Some(res) = in_flight.join_next().await {
res??;
if let Some(group) = iter.next() {
in_flight.spawn(apply_one_cr(api.clone(), group.clone(), params.clone()));
}
}
tracing::info!(
crs = plan.groups.len(),
elapsed_ms = started.elapsed().as_millis() as u64,
"applied Deployment CRs"
);
Ok(())
}
async fn apply_one_cr(
api: Api<Deployment>,
group: GroupPlan,
params: PatchParams,
) -> Result<String> {
let cr = Deployment::new(
&group.cr_name,
DeploymentSpec {
target_devices: group.devices.clone(),
// Score content doesn't matter — we're not running real
// agents against these CRs. The controller still writes
// to desired-state KV for each target device; that's
// wire noise we tolerate for realism.
score: ScorePayload {
type_: "PodmanV0".to_string(),
data: serde_json::json!({
"services": [{
"name": group.cr_name,
"image": "docker.io/library/nginx:alpine",
"ports": ["8080:80"],
}],
}),
},
rollout: Rollout {
strategy: RolloutStrategy::Immediate,
},
},
);
api.patch(&group.cr_name, &params, &Patch::Apply(&cr))
.await
.with_context(|| format!("applying CR {}", group.cr_name))?;
Ok(group.cr_name)
}
async fn publish_device_infos(bucket: &kv::Store, plan: &Plan) -> Result<()> {
let started = Instant::now();
const CONCURRENCY: usize = 64;
let mut in_flight: JoinSet<Result<()>> = JoinSet::new();
let mut iter = plan.devices.iter();
for _ in 0..CONCURRENCY {
if let Some(device) = iter.next() {
in_flight.spawn(publish_one_info(bucket.clone(), device.clone()));
}
}
while let Some(res) = in_flight.join_next().await {
res??;
if let Some(device) = iter.next() {
in_flight.spawn(publish_one_info(bucket.clone(), device.clone()));
}
}
tracing::info!(
devices = plan.devices.len(),
elapsed_ms = started.elapsed().as_millis() as u64,
"seeded DeviceInfo"
);
Ok(())
}
async fn publish_one_info(bucket: kv::Store, device: DevicePlan) -> Result<()> {
let info = DeviceInfo {
device_id: Id::from(device.device_id.clone()),
labels: BTreeMap::from([("group".to_string(), device.cr_name.clone())]),
inventory: None,
updated_at: Utc::now(),
};
let key = device_info_key(&device.device_id);
let payload = serde_json::to_vec(&info)?;
bucket.put(&key, payload.into()).await?;
Ok(())
}
async fn simulate_state_loop(
device: Arc<DevicePlan>,
bucket: kv::Store,
counters: Arc<Counters>,
tick: Duration,
) {
let Ok(deployment) = DeploymentName::try_new(&device.cr_name) else {
return;
};
let state_key = device_state_key(&device.device_id, &deployment);
let mut ticker = tokio::time::interval(tick);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
let phase = pick_phase();
let ds = DeploymentState {
device_id: Id::from(device.device_id.clone()),
deployment: deployment.clone(),
phase,
last_event_at: Utc::now(),
last_error: matches!(phase, Phase::Failed)
.then(|| format!("synthetic failure @{}", device.device_id)),
};
match serde_json::to_vec(&ds) {
Ok(payload) => match bucket.put(&state_key, payload.into()).await {
Ok(_) => {
counters.state_writes.fetch_add(1, Ordering::Relaxed);
}
Err(_) => {
counters.errors.fetch_add(1, Ordering::Relaxed);
}
},
Err(_) => {
counters.errors.fetch_add(1, Ordering::Relaxed);
}
}
}
}
async fn simulate_heartbeat_loop(
device: Arc<DevicePlan>,
bucket: kv::Store,
counters: Arc<Counters>,
tick: Duration,
) {
let hb_key = device_heartbeat_key(&device.device_id);
let mut ticker = tokio::time::interval(tick);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
let hb = HeartbeatPayload {
device_id: Id::from(device.device_id.clone()),
at: Utc::now(),
};
if let Ok(payload) = serde_json::to_vec(&hb) {
if bucket.put(&hb_key, payload.into()).await.is_ok() {
counters.heartbeat_writes.fetch_add(1, Ordering::Relaxed);
} else {
counters.errors.fetch_add(1, Ordering::Relaxed);
}
}
}
}
/// Phase distribution mirroring a healthy-ish fleet: mostly Running,
/// a sprinkle of Failed + Pending to exercise the aggregator's
/// transition-handling + last_error logic.
fn pick_phase() -> Phase {
let n: u32 = rand::rng().random_range(0..100);
match n {
0..80 => Phase::Running,
80..90 => Phase::Failed,
_ => Phase::Pending,
}
}
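The 80/10/10 split above is easy to lock down by separating the draw from the match. A standalone sketch (local `Phase` stub and a hypothetical `pick_phase_from` helper, using inclusive range patterns equivalent to the `0..80` / `80..90` arms):

```rust
// Local stub of Phase so the sketch compiles on its own.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Phase {
    Running,
    Failed,
    Pending,
}

// Deterministic core of pick_phase(): same arms, injected draw.
fn pick_phase_from(n: u32) -> Phase {
    match n {
        0..=79 => Phase::Running,
        80..=89 => Phase::Failed,
        _ => Phase::Pending,
    }
}

fn main() {
    let mut counts = [0usize; 3];
    for n in 0..100 {
        match pick_phase_from(n) {
            Phase::Running => counts[0] += 1,
            Phase::Failed => counts[1] += 1,
            Phase::Pending => counts[2] += 1,
        }
    }
    // Exactly the "mostly Running" distribution the doc comment claims.
    assert_eq!(counts, [80, 10, 10]);
}
```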

View File

@@ -18,3 +18,4 @@ chrono = { workspace = true, features = ["serde"] }
harmony_types = { path = "../harmony_types" }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
thiserror = { workspace = true }

View File

@@ -0,0 +1,272 @@
//! Fleet-scale wire-format types.
//!
//! Per-concern payloads on dedicated NATS KV buckets:
//!
//! | Type | Bucket | Cadence |
//! |------|--------|---------|
//! | [`DeviceInfo`] | KV `device-info` | on startup + label/inventory change |
//! | [`DeploymentState`] | KV `device-state` | on reconcile phase transition |
//! | [`HeartbeatPayload`] | KV `device-heartbeat` | every 30 s |
//!
//! The operator watches `device-state` directly — KV watch deliveries
//! are ordered and last-writer-wins, so there's no separate event
//! stream or per-write revision to track.
use std::collections::BTreeMap;
use std::fmt;
use chrono::{DateTime, Utc};
use harmony_types::id::Id;
use serde::{Deserialize, Deserializer, Serialize};
use crate::status::{InventorySnapshot, Phase};
/// Deployment CR `metadata.name`, validated for NATS-subject safety.
///
/// Scope: what identifies a Deployment to the agent. Appears in KV
/// keys (`state.<device>.<deployment>`) and every in-memory map
/// keyed by "which deployment." A raw `String` here would let an
/// invalid name (containing a `.`, splitting into extra subject
/// tokens) break routing at runtime.
///
/// Validation:
/// - Not empty.
/// - No `.` (would alias an extra subject token).
/// - No `*` / `>` (NATS wildcards).
/// - No ASCII whitespace.
/// - ≤ 253 bytes (RFC 1123 max, matches Kubernetes name limit).
///
/// The constructor is fallible; deserialization runs the same
/// validation so malformed payloads are rejected at the wire.
#[derive(Debug, Clone, Hash, PartialEq, Eq, Ord, PartialOrd, Serialize)]
#[serde(transparent)]
pub struct DeploymentName(String);
#[derive(Debug, thiserror::Error, PartialEq, Eq)]
pub enum InvalidDeploymentName {
#[error("deployment name must not be empty")]
Empty,
#[error("deployment name must not exceed 253 bytes")]
TooLong,
#[error("deployment name must not contain '.' (would alias an extra NATS subject token)")]
ContainsDot,
#[error("deployment name must not contain NATS wildcards '*' or '>'")]
ContainsWildcard,
#[error("deployment name must not contain whitespace")]
ContainsWhitespace,
}
impl DeploymentName {
pub fn try_new(s: impl Into<String>) -> Result<Self, InvalidDeploymentName> {
let s = s.into();
if s.is_empty() {
return Err(InvalidDeploymentName::Empty);
}
if s.len() > 253 {
return Err(InvalidDeploymentName::TooLong);
}
if s.contains('.') {
return Err(InvalidDeploymentName::ContainsDot);
}
if s.contains('*') || s.contains('>') {
return Err(InvalidDeploymentName::ContainsWildcard);
}
if s.chars().any(|c| c.is_ascii_whitespace()) {
return Err(InvalidDeploymentName::ContainsWhitespace);
}
Ok(Self(s))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for DeploymentName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
impl<'de> Deserialize<'de> for DeploymentName {
fn deserialize<D: Deserializer<'de>>(de: D) -> Result<Self, D::Error> {
let s = String::deserialize(de)?;
Self::try_new(s).map_err(serde::de::Error::custom)
}
}
/// Static-ish per-device facts: routing labels, hardware, agent
/// version. Written to KV key `info.<device_id>` in
/// [`crate::BUCKET_DEVICE_INFO`]. Rewritten by the agent on startup
/// and whenever its labels change — **not** on every heartbeat.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeviceInfo {
pub device_id: Id,
/// Routing labels. Operator resolves Deployment
/// `targetSelector.matchLabels` against this map.
#[serde(default)]
pub labels: BTreeMap<String, String>,
/// Hardware / OS snapshot. `None` until the first post-startup
/// publish.
#[serde(default)]
pub inventory: Option<InventorySnapshot>,
/// RFC 3339 UTC timestamp of this publish.
pub updated_at: DateTime<Utc>,
}
/// Authoritative current phase for one `(device, deployment)` pair.
/// Written to KV key `state.<device_id>.<deployment>` in
/// [`crate::BUCKET_DEVICE_STATE`]. Deleted when the deployment is
/// removed from the device.
///
/// The operator's KV watch sees every write + delete in order, so
/// this value alone — plus the operator's in-memory belief about
/// the last phase for the pair — is enough to drive the aggregate
/// counters. No separate event stream, no per-write revision.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeploymentState {
pub device_id: Id,
pub deployment: DeploymentName,
pub phase: Phase,
pub last_event_at: DateTime<Utc>,
#[serde(default)]
pub last_error: Option<String>,
}
/// Tiny liveness ping. Written to KV key `heartbeat.<device_id>` in
/// [`crate::BUCKET_DEVICE_HEARTBEAT`].
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct HeartbeatPayload {
pub device_id: Id,
pub at: DateTime<Utc>,
}
#[cfg(test)]
mod tests {
use super::*;
fn ts(s: &str) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
}
fn dn(s: &str) -> DeploymentName {
DeploymentName::try_new(s).expect("valid")
}
#[test]
fn deployment_name_accepts_rfc1123() {
assert!(DeploymentName::try_new("hello-world").is_ok());
assert!(DeploymentName::try_new("a").is_ok());
assert!(DeploymentName::try_new("a-b-c-1-2-3").is_ok());
}
#[test]
fn deployment_name_rejects_dot() {
assert_eq!(
DeploymentName::try_new("hello.world"),
Err(InvalidDeploymentName::ContainsDot)
);
}
#[test]
fn deployment_name_rejects_nats_wildcards() {
assert_eq!(
DeploymentName::try_new("hello*"),
Err(InvalidDeploymentName::ContainsWildcard)
);
assert_eq!(
DeploymentName::try_new("hello>"),
Err(InvalidDeploymentName::ContainsWildcard)
);
}
#[test]
fn deployment_name_rejects_empty_and_too_long() {
assert_eq!(
DeploymentName::try_new(""),
Err(InvalidDeploymentName::Empty)
);
assert_eq!(
DeploymentName::try_new("x".repeat(254)),
Err(InvalidDeploymentName::TooLong)
);
}
#[test]
fn deployment_name_rejects_whitespace() {
assert_eq!(
DeploymentName::try_new("hello world"),
Err(InvalidDeploymentName::ContainsWhitespace)
);
assert_eq!(
DeploymentName::try_new("hello\tworld"),
Err(InvalidDeploymentName::ContainsWhitespace)
);
}
#[test]
fn deployment_name_deserialization_validates() {
let json = r#""bad.name""#;
let result: Result<DeploymentName, _> = serde_json::from_str(json);
assert!(result.is_err());
}
#[test]
fn deployment_name_roundtrip() {
let name = dn("hello-world");
let json = serde_json::to_string(&name).unwrap();
assert_eq!(json, r#""hello-world""#);
let back: DeploymentName = serde_json::from_str(&json).unwrap();
assert_eq!(name, back);
}
#[test]
fn deployment_state_roundtrip() {
let original = DeploymentState {
device_id: Id::from("pi-01".to_string()),
deployment: dn("hello-web"),
phase: Phase::Failed,
last_event_at: ts("2026-04-22T10:05:00Z"),
last_error: Some("image pull 429".to_string()),
};
let json = serde_json::to_string(&original).unwrap();
let back: DeploymentState = serde_json::from_str(&json).unwrap();
assert_eq!(original, back);
}
#[test]
fn heartbeat_is_tiny() {
let hb = HeartbeatPayload {
device_id: Id::from("pi-01".to_string()),
at: ts("2026-04-22T10:00:30Z"),
};
let bytes = serde_json::to_vec(&hb).unwrap();
assert!(
bytes.len() < 96,
"heartbeat payload grew to {} bytes: {}",
bytes.len(),
String::from_utf8_lossy(&bytes),
);
}
#[test]
fn device_info_roundtrip() {
let original = DeviceInfo {
device_id: Id::from("pi-01".to_string()),
labels: BTreeMap::from([("group".to_string(), "site-a".to_string())]),
inventory: Some(InventorySnapshot {
hostname: "pi-01".to_string(),
arch: "aarch64".to_string(),
os: "Ubuntu 24.04".to_string(),
kernel: "6.8.0".to_string(),
cpu_cores: 4,
memory_mb: 8192,
agent_version: "0.1.0".to_string(),
}),
updated_at: ts("2026-04-22T10:00:00Z"),
};
let json = serde_json::to_string(&original).unwrap();
let back: DeviceInfo = serde_json::from_str(&json).unwrap();
assert_eq!(original, back);
}
}
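The `heartbeat_is_tiny` bound matters precisely at fleet scale. A back-of-envelope check of steady-state heartbeat load (the million-device fleet size is an assumed figure for illustration, not a measurement from this PR):

```rust
fn main() {
    let devices: u64 = 1_000_000; // assumed fleet size
    let payload_bytes: u64 = 96;  // upper bound locked by heartbeat_is_tiny
    let cadence_secs: u64 = 30;   // cadence from the bucket table

    let writes_per_sec = devices / cadence_secs;
    let bytes_per_sec = writes_per_sec * payload_bytes;

    // ~33k KV puts/s and ~3.2 MB/s worst case — well within what a
    // single JetStream KV bucket is expected to absorb.
    assert_eq!(writes_per_sec, 33_333);
    assert_eq!(bytes_per_sec, 3_199_968);
}
```

Keeping heartbeats in their own bucket means this steady churn never invalidates watches on `device-state`, where writes happen only on phase transitions.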

View File

@@ -7,47 +7,88 @@
//! here; agent + operator consume the constants directly, and smoke
//! scripts grep for the literal values locked in the tests below.
use crate::fleet::DeploymentName;
/// Operator-written bucket. One entry per `(device, deployment)` pair.
/// Values are the JSON-serialized Score envelope — today
/// `harmony::modules::podman::IotScore`, tomorrow any variant of
/// a polymorphic `Score` enum the framework ships.
pub const BUCKET_DESIRED_STATE: &str = "desired-state";
/// Agent-written bucket. One entry per device at `status.<device_id>`.
/// Values are JSON-serialized `AgentStatus` payloads (legacy wire
/// type, removed from this crate's public exports).
pub const BUCKET_AGENT_STATUS: &str = "agent-status";
/// Static-ish per-device facts: routing labels, inventory, agent
/// version. Agent rewrites the entry on startup and whenever its
/// labels change. Key format: `info.<device_id>`.
pub const BUCKET_DEVICE_INFO: &str = "device-info";
/// Current reconcile phase for each `(device, deployment)` pair.
/// Agent writes on phase transition; operator watches this bucket
/// to drive CR `.status.aggregate`. Authoritative source of truth
/// for "what's running where." Key format:
/// `state.<device_id>.<deployment>`.
pub const BUCKET_DEVICE_STATE: &str = "device-state";
/// Tiny liveness ping from each device every N seconds. Separate
/// from [`BUCKET_DEVICE_STATE`] so routine heartbeats don't churn
/// the state bucket. Key format: `heartbeat.<device_id>`.
pub const BUCKET_DEVICE_HEARTBEAT: &str = "device-heartbeat";
/// KV key for a `(device, deployment)` pair in [`BUCKET_DESIRED_STATE`].
/// Format: `<device>.<deployment>`.
pub fn desired_state_key(device_id: &str, deployment_name: &str) -> String {
format!("{device_id}.{deployment_name}")
pub fn desired_state_key(device_id: &str, deployment_name: &DeploymentName) -> String {
format!("{device_id}.{}", deployment_name.as_str())
}
/// KV key for a device's last-known status in [`BUCKET_AGENT_STATUS`].
/// Format: `status.<device_id>`.
pub fn status_key(device_id: &str) -> String {
format!("status.{device_id}")
/// KV key for a device's `DeviceInfo` entry in [`BUCKET_DEVICE_INFO`].
/// Format: `info.<device_id>`.
pub fn device_info_key(device_id: &str) -> String {
format!("info.{device_id}")
}
/// KV key for a `(device, deployment)` state entry in
/// [`BUCKET_DEVICE_STATE`]. Format: `state.<device_id>.<deployment>`.
pub fn device_state_key(device_id: &str, deployment_name: &DeploymentName) -> String {
format!("state.{device_id}.{}", deployment_name.as_str())
}
/// KV key for a device's liveness entry in
/// [`BUCKET_DEVICE_HEARTBEAT`]. Format: `heartbeat.<device_id>`.
pub fn device_heartbeat_key(device_id: &str) -> String {
format!("heartbeat.{device_id}")
}
#[cfg(test)]
mod tests {
use super::*;
fn dn(s: &str) -> crate::DeploymentName {
crate::DeploymentName::try_new(s).expect("valid")
}
#[test]
fn desired_state_key_format() {
assert_eq!(desired_state_key("pi-01", "hello-web"), "pi-01.hello-web");
assert_eq!(
desired_state_key("pi-01", &dn("hello-web")),
"pi-01.hello-web"
);
}
#[test]
fn status_key_format() {
assert_eq!(status_key("pi-01"), "status.pi-01");
}
#[test]
fn bucket_names_match_smoke_scripts() {
// These strings are also grepped by iot/scripts/smoke-*.sh —
// flipping them here must be paired with a script update.
fn bucket_names_stable() {
// Flipping these is a cross-component break — operator,
// agent, and smoke scripts all grep for the literal values.
assert_eq!(BUCKET_DESIRED_STATE, "desired-state");
assert_eq!(BUCKET_AGENT_STATUS, "agent-status");
assert_eq!(BUCKET_DEVICE_INFO, "device-info");
assert_eq!(BUCKET_DEVICE_STATE, "device-state");
assert_eq!(BUCKET_DEVICE_HEARTBEAT, "device-heartbeat");
}
#[test]
fn key_formats() {
assert_eq!(device_info_key("pi-01"), "info.pi-01");
assert_eq!(
device_state_key("pi-01", &dn("hello-web")),
"state.pi-01.hello-web"
);
assert_eq!(device_heartbeat_key("pi-01"), "heartbeat.pi-01");
}
}
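The `ContainsDot` rule in `DeploymentName` exists to protect this key scheme: a validated state key is always exactly three dot-separated tokens. A standalone sketch of the invariant (key helper re-implemented locally so the example runs alone):

```rust
// Local copy of the state-key format: `state.<device_id>.<deployment>`.
fn device_state_key(device_id: &str, deployment: &str) -> String {
    format!("state.{device_id}.{deployment}")
}

fn main() {
    // With a dot-free deployment name the key is exactly three tokens,
    // so routing on '.' boundaries stays unambiguous.
    let ok = device_state_key("pi-01", "hello-web");
    assert_eq!(
        ok.split('.').collect::<Vec<_>>(),
        ["state", "pi-01", "hello-web"]
    );

    // An unvalidated name containing '.' silently grows a fourth
    // token — the failure mode DeploymentName::try_new rejects.
    let bad = device_state_key("pi-01", "hello.web");
    assert_eq!(bad.split('.').count(), 4);
}
```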

View File

@@ -3,30 +3,31 @@
//! Harmony's "reconciler" pattern is: a central **operator** writes
//! desired state into NATS JetStream KV; a remote **agent** watches
//! the KV, deserializes each entry as a Score, and drives the host
//! toward that state. This split lets one operator orchestrate a
//! fleet of agents across network boundaries it can't reach
//! directly — IoT devices today, OKD cluster agents or edge-compute
//! reconcilers tomorrow.
//! toward that state. The agent writes back per-device info and
//! per-deployment state into separate KV buckets; the operator reads
//! those to aggregate `.status.aggregate` onto the CR.
//!
//! This crate holds the wire-format bits both sides must agree on:
//! NATS bucket names, KV key formats, and the `AgentStatus`
//! heartbeat payload. The Score types themselves (`PodmanV0Score`,
//! future variants) live in their respective harmony modules
//! consumers import them from there and serialize them over the
//! transport this crate describes.
//! NATS bucket names, KV key formats, and the typed payloads
//! (`DeviceInfo`, `DeploymentState`, `HeartbeatPayload`). The Score
//! types themselves live in their respective harmony modules.
//!
//! **Deliberately lean** — no tokio, no async-nats, no harmony.
//! The on-device agent build pulls it in alongside a minimal
//! async-nats client; the operator pulls it alongside kube-rs.
//! Neither should pay for the other's dependencies.
pub mod fleet;
pub mod kv;
pub mod status;
pub use kv::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, desired_state_key, status_key};
pub use status::{
AgentStatus, DeploymentPhase, EventEntry, EventSeverity, InventorySnapshot, Phase,
pub use fleet::{
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,
};
pub use kv::{
BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE,
desired_state_key, device_heartbeat_key, device_info_key, device_state_key,
};
pub use status::{InventorySnapshot, Phase};
// Re-exports so consumers (agent, operator) don't need a direct
// harmony_types dependency purely to name the cross-boundary types.

View File

@@ -1,79 +1,7 @@
//! Agent → NATS KV status payload.
//!
//! The agent publishes a rolling status snapshot to the
//! `agent-status` bucket every 30 s (see
//! [`crate::BUCKET_AGENT_STATUS`]). The payload is cumulative and
//! self-contained: every publish is a full picture, so the operator
//! doesn't have to replay history from JetStream to reconstruct
//! current state.
//!
//! Wire-format evolution rule: new fields must be `#[serde(default)]`
//! so older operators keep parsing newer agent payloads, and newer
//! operators keep parsing older ones. Every field below respects
//! that.
//! Shared status primitives reused across the fleet wire format.
use std::collections::BTreeMap;
use chrono::{DateTime, Utc};
use harmony_types::id::Id;
use serde::{Deserialize, Serialize};
/// Rolling heartbeat / status snapshot from a single agent.
///
/// Published at `status.<device_id>` in [`crate::BUCKET_AGENT_STATUS`]
/// on a regular cadence (30 s) and after significant state changes
/// (reconcile success, reconcile failure, image pull start/end).
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct AgentStatus {
/// Echoed from the agent's own config so the operator can
/// cross-check which device it came from if the KV key is ever
/// ambiguous. Serializes transparently as a plain string.
pub device_id: Id,
/// Coarse rollup state. v0 only ever writes `"running"`; richer
/// variants are a v0.1+ concern. A String (not an enum) so old
/// operators parsing this payload don't fail on a new variant.
pub status: String,
/// RFC 3339 UTC timestamp of this publish. Lexicographically
/// comparable against other agent timestamps for freshness
/// checks.
pub timestamp: DateTime<Utc>,
/// Per-deployment reconcile state. Keyed by deployment name
/// (the CR's `metadata.name`). When the agent has no
/// deployments, this is an empty map.
#[serde(default)]
pub deployments: BTreeMap<String, DeploymentPhase>,
/// Bounded ring-buffer of the most recent reconcile events on
/// this device. Used by the operator to surface "what did the
/// agent actually do" in the CR's status without the operator
/// having to replay per-message JetStream streams.
///
/// Agents cap this to the last N entries (typical: 20); operator
/// aggregation shows the first M across the fleet (typical: 10).
#[serde(default)]
pub recent_events: Vec<EventEntry>,
/// Hardware / OS inventory. Published once on startup and on
/// change. `None` means "not yet reported" (fresh agent before
/// first publish). Keeping this optional (rather than a zeroed
/// struct) makes "absence" distinguishable from "zero bytes of
/// disk."
#[serde(default)]
pub inventory: Option<InventorySnapshot>,
}
/// Reconcile phase for a single deployment on one device.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct DeploymentPhase {
/// Current phase of this deployment on this device.
pub phase: Phase,
/// Timestamp of the last phase transition or retry.
pub last_event_at: DateTime<Utc>,
/// Short human-readable error message from the most recent
/// failure, if any. Cleared when the deployment transitions
/// back to `Running`.
#[serde(default)]
pub last_error: Option<String>,
}
/// Coarse state of a single reconcile on one device.
///
/// Deliberately coarse — richer granularity (ImagePulling,
@@ -83,7 +11,7 @@ pub struct DeploymentPhase {
pub enum Phase {
/// Agent has applied the Score and the container is up.
Running,
/// Reconcile hit an error. See `last_error` for the message.
/// Reconcile hit an error. See the paired `last_error` field for the message.
Failed,
/// Reconcile is in flight or waiting on an external dependency
/// (image pull, network, etc.). Agents may also report this
@@ -91,36 +19,8 @@ pub enum Phase {
Pending,
}
/// One agent-side event worth surfacing to the operator.
///
/// "Event" in the Kubernetes sense: a timestamped short log-like
/// observation, not a structured metric. Used for the
/// `.status.aggregate.recent_events` rollup so an operator seeing
/// `failed: 3` can click through to see the last three error
/// messages.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct EventEntry {
pub at: DateTime<Utc>,
pub severity: EventSeverity,
/// Short human-readable message. Agents should cap this at a
/// reasonable length (~512 chars) to keep the payload under
/// NATS JetStream's per-message limit.
pub message: String,
/// Optional deployment this event relates to. `None` for
/// device-wide events (podman socket bounce, NATS reconnect).
#[serde(default)]
pub deployment: Option<String>,
}
#[derive(Debug, Clone, Copy, Serialize, Deserialize, PartialEq, Eq)]
pub enum EventSeverity {
Info,
Warn,
Error,
}
/// Static-ish facts about the device. Published once per agent
/// lifetime (startup) and republished on change.
/// Static-ish facts about the device. Embedded in
/// [`crate::DeviceInfo`]; republished on change.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct InventorySnapshot {
pub hostname: String,
@@ -133,113 +33,3 @@ pub struct InventorySnapshot {
/// agents that are behind the current release.
pub agent_version: String,
}
#[cfg(test)]
mod tests {
use super::*;
fn ts(s: &str) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
}
#[test]
fn minimal_status_roundtrip() {
let s = AgentStatus {
device_id: Id::from("pi-01".to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-21T18:15:42Z"),
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
};
let json = serde_json::to_string(&s).unwrap();
let back: AgentStatus = serde_json::from_str(&json).unwrap();
assert_eq!(s, back);
}
#[test]
fn enriched_status_roundtrip() {
let mut deployments = BTreeMap::new();
deployments.insert(
"hello-world".to_string(),
DeploymentPhase {
phase: Phase::Running,
last_event_at: ts("2026-04-21T18:15:42Z"),
last_error: None,
},
);
deployments.insert(
"broken-app".to_string(),
DeploymentPhase {
phase: Phase::Failed,
last_event_at: ts("2026-04-21T18:16:00Z"),
last_error: Some("podman pull: 429 Too Many Requests".to_string()),
},
);
let s = AgentStatus {
device_id: Id::from("pi-01".to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-21T18:15:42Z"),
deployments,
recent_events: vec![
EventEntry {
at: ts("2026-04-21T18:14:00Z"),
severity: EventSeverity::Info,
message: "started hello-world".to_string(),
deployment: Some("hello-world".to_string()),
},
EventEntry {
at: ts("2026-04-21T18:16:00Z"),
severity: EventSeverity::Error,
message: "pull failed".to_string(),
deployment: Some("broken-app".to_string()),
},
],
inventory: Some(InventorySnapshot {
hostname: "pi-01".to_string(),
arch: "aarch64".to_string(),
os: "Ubuntu 24.04".to_string(),
kernel: "6.8.0-1004-raspi".to_string(),
cpu_cores: 4,
memory_mb: 8192,
agent_version: "0.1.0".to_string(),
}),
};
let json = serde_json::to_string(&s).unwrap();
let back: AgentStatus = serde_json::from_str(&json).unwrap();
assert_eq!(s, back);
}
#[test]
fn old_wire_format_parses_into_enriched_struct() {
// Payload shape produced by a pre-Chapter-2 agent. Must
// still deserialize so operators doing a mixed-fleet upgrade
// don't explode.
let json = r#"{
"device_id": "pi-01",
"status": "running",
"timestamp": "2026-04-21T18:15:42Z"
}"#;
let s: AgentStatus = serde_json::from_str(json).unwrap();
assert!(s.deployments.is_empty());
assert!(s.recent_events.is_empty());
assert!(s.inventory.is_none());
}
#[test]
fn wire_keys_present() {
let s = AgentStatus {
device_id: Id::from("pi-01".to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-21T18:15:42Z"),
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
};
let json = serde_json::to_string(&s).unwrap();
assert!(json.contains("\"device_id\":\"pi-01\""), "got {json}");
assert!(json.contains("\"status\":\"running\""));
assert!(json.contains("\"timestamp\":\"2026-04-21T18:15:42Z\""));
}
}

View File

@@ -0,0 +1,126 @@
//! Agent-side publish surface.
//!
//! Thin wrapper around three KV buckets: [`BUCKET_DEVICE_INFO`],
//! [`BUCKET_DEVICE_STATE`], [`BUCKET_DEVICE_HEARTBEAT`].
//!
//! Failure mode: log and swallow. Every key is last-writer-wins, so
//! a dropped put is corrected by the next write for the same key —
//! the next reconcile transition or heartbeat tick.
use async_nats::jetstream::{self, kv};
use harmony_reconciler_contracts::{
BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, DeploymentName,
DeploymentState, DeviceInfo, HeartbeatPayload, Id, InventorySnapshot, device_heartbeat_key,
device_info_key, device_state_key,
};
use std::collections::BTreeMap;
pub struct FleetPublisher {
device_id: Id,
info_bucket: kv::Store,
state_bucket: kv::Store,
heartbeat_bucket: kv::Store,
}
impl FleetPublisher {
/// Open every bucket the agent needs, creating those that don't
/// exist yet. Idempotent with operator-side creation.
pub async fn connect(client: async_nats::Client, device_id: Id) -> anyhow::Result<Self> {
let jetstream = jetstream::new(client);
let info_bucket = jetstream
.create_key_value(kv::Config {
bucket: BUCKET_DEVICE_INFO.to_string(),
history: 1,
..Default::default()
})
.await?;
let state_bucket = jetstream
.create_key_value(kv::Config {
bucket: BUCKET_DEVICE_STATE.to_string(),
history: 1,
..Default::default()
})
.await?;
let heartbeat_bucket = jetstream
.create_key_value(kv::Config {
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
history: 1,
..Default::default()
})
.await?;
Ok(Self {
device_id,
info_bucket,
state_bucket,
heartbeat_bucket,
})
}
/// Publish the agent's static-ish facts. Called at startup and
/// on label change.
pub async fn publish_device_info(
&self,
labels: BTreeMap<String, String>,
inventory: Option<InventorySnapshot>,
) {
let info = DeviceInfo {
device_id: self.device_id.clone(),
labels,
inventory,
updated_at: chrono::Utc::now(),
};
let key = device_info_key(&self.device_id.to_string());
match serde_json::to_vec(&info) {
Ok(payload) => {
if let Err(e) = self.info_bucket.put(&key, payload.into()).await {
tracing::warn!(%key, error = %e, "publish_device_info: kv put failed");
}
}
Err(e) => tracing::warn!(error = %e, "publish_device_info: serialize failed"),
}
}
/// Tiny liveness ping. Called every 30s.
pub async fn publish_heartbeat(&self) {
let hb = HeartbeatPayload {
device_id: self.device_id.clone(),
at: chrono::Utc::now(),
};
let key = device_heartbeat_key(&self.device_id.to_string());
match serde_json::to_vec(&hb) {
Ok(payload) => {
if let Err(e) = self.heartbeat_bucket.put(&key, payload.into()).await {
tracing::debug!(%key, error = %e, "publish_heartbeat: kv put failed");
}
}
Err(e) => tracing::warn!(error = %e, "publish_heartbeat: serialize failed"),
}
}
/// Persist the authoritative current phase for a `(device,
/// deployment)` pair. The operator's watch on the `device-state`
/// bucket picks up this put and updates CR status counters.
pub async fn write_deployment_state(&self, state: &DeploymentState) {
let key = device_state_key(&self.device_id.to_string(), &state.deployment);
match serde_json::to_vec(state) {
Ok(payload) => {
if let Err(e) = self.state_bucket.put(&key, payload.into()).await {
tracing::warn!(%key, error = %e, "write_deployment_state: kv put failed");
}
}
Err(e) => tracing::warn!(error = %e, "write_deployment_state: serialize failed"),
}
}
/// Delete the authoritative current-phase entry, e.g. when the
/// Deployment CR is removed and the agent has torn down the
/// container.
pub async fn delete_deployment_state(&self, deployment: &DeploymentName) {
let key = device_state_key(&self.device_id.to_string(), deployment);
if let Err(e) = self.state_bucket.delete(&key).await {
tracing::debug!(%key, error = %e, "delete_deployment_state: kv delete failed");
}
}
}
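The log-and-swallow policy only works because every key is last-writer-wins: a dropped put is repaired by the next write for the same key rather than retried. A stub sketch of that property (no NATS involved; `FlakyKv` and `write_state` are invented for illustration):

```rust
use std::cell::Cell;

// Invented stand-in for a KV store whose next put fails once.
struct FlakyKv {
    fail_next: Cell<bool>,
    last: Cell<Option<&'static str>>,
}

impl FlakyKv {
    fn put(&self, value: &'static str) -> Result<(), &'static str> {
        if self.fail_next.replace(false) {
            return Err("kv put failed");
        }
        self.last.set(Some(value));
        Ok(())
    }
}

// Mirrors write_deployment_state's shape: log the error, don't bubble it.
fn write_state(kv: &FlakyKv, phase: &'static str) {
    if let Err(e) = kv.put(phase) {
        eprintln!("write_deployment_state: {e} (next transition repairs the key)");
    }
}

fn main() {
    let kv = FlakyKv {
        fail_next: Cell::new(true),
        last: Cell::new(None),
    };
    write_state(&kv, "Pending"); // dropped and swallowed
    write_state(&kv, "Running"); // last-writer-wins repair
    assert_eq!(kv.last.get(), Some("Running"));
}
```

If a key were append-only or revision-sensitive, swallowing a failed put would lose data permanently; the policy and the key scheme have to be designed together.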

View File

@@ -1,4 +1,5 @@
mod config;
mod fleet_publisher;
mod reconciler;
use std::sync::Arc;
@@ -8,14 +9,13 @@ use anyhow::{Context, Result};
use clap::Parser;
use config::{AgentConfig, CredentialSource, TomlFileCredentialSource};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
AgentStatus, BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE, Id, InventorySnapshot, status_key,
};
use harmony_reconciler_contracts::{BUCKET_DESIRED_STATE, Id, InventorySnapshot};
use harmony::inventory::Inventory;
use harmony::modules::podman::PodmanTopology;
use harmony::topology::Topology;
use crate::fleet_publisher::FleetPublisher;
use crate::reconciler::Reconciler;
/// ROADMAP §5.6 — agent polls podman every 30s as ground truth; KV watch
@@ -85,37 +85,16 @@ async fn watch_desired_state(
Ok(())
}
async fn report_status(
client: async_nats::Client,
device_id: Id,
reconciler: Arc<Reconciler>,
inventory: Option<InventorySnapshot>,
) -> Result<()> {
let jetstream = async_nats::jetstream::new(client);
let bucket = jetstream
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_AGENT_STATUS.to_string(),
..Default::default()
})
.await?;
let key = status_key(&device_id.to_string());
/// Tiny liveness-only loop: push a `HeartbeatPayload` into the
/// `device-heartbeat` bucket every N seconds. Stays separate from
/// per-deployment state writes so routine pings don't churn the
/// device-state bucket or its watch subscribers.
async fn publish_heartbeat_loop(fleet: Arc<FleetPublisher>) {
let mut interval = tokio::time::interval(Duration::from_secs(30));
interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
interval.tick().await;
let (deployments, recent_events) = reconciler.status_snapshot().await;
let status = AgentStatus {
device_id: device_id.clone(),
status: "running".to_string(),
timestamp: chrono::Utc::now(),
deployments,
recent_events,
inventory: inventory.clone(),
};
let payload = serde_json::to_vec(&status)?;
bucket.put(&key, payload.into()).await?;
tracing::debug!(key = %key, "reported status");
fleet.publish_heartbeat().await;
}
}
@@ -177,10 +156,36 @@ async fn main() -> Result<()> {
tracing::info!(hostname = %inventory.location.name, "inventory loaded");
let inventory_snapshot = local_inventory(&inventory);
let reconciler = Arc::new(Reconciler::new(topology, inventory));
let client = connect_nats(&cfg).await?;
// Publish surface. Opens the three KV buckets (idempotent
// creates). Must be live before the reconciler starts so
// writes on the first desired-state KV watch land on the wire.
let fleet = Arc::new(
FleetPublisher::connect(client.clone(), device_id.clone())
.await
.context("fleet publisher connect")?,
);
tracing::info!("fleet publisher ready");
// Publish DeviceInfo once at startup. Labels are empty on this
// branch — the agent config's `[labels]` section is added in
// the selector-targeting work and flows here once that branch
// merges. Until then, operators will see a DeviceInfo payload
// with an empty label map (matches no deployment selector, which
// is the correct fail-safe behavior for an unconfigured device).
let startup_labels = std::collections::BTreeMap::new();
fleet
.publish_device_info(startup_labels, Some(inventory_snapshot.clone()))
.await;
let reconciler = Arc::new(Reconciler::new(
device_id.clone(),
topology,
inventory,
Some(fleet.clone()),
));
let ctrlc = async {
tokio::signal::ctrl_c().await.ok();
tracing::info!("received SIGINT, shutting down");
@@ -193,21 +198,17 @@ async fn main() -> Result<()> {
Ok::<(), anyhow::Error>(())
};
let watch = watch_desired_state(client.clone(), device_id.clone(), reconciler.clone());
let status = report_status(
client,
device_id,
reconciler.clone(),
Some(inventory_snapshot),
);
let _ = inventory_snapshot; // already cloned into the DeviceInfo publish above
let watch = watch_desired_state(client, device_id, reconciler.clone());
let reconcile = reconciler.clone().run_periodic(RECONCILE_INTERVAL);
let heartbeat = publish_heartbeat_loop(fleet);
tokio::select! {
_ = ctrlc => {},
r = sigterm => { r?; }
r = watch => { r?; }
r = status => { r?; }
_ = reconcile => {}
_ = heartbeat => {}
}
Ok(())

View File

@@ -1,18 +1,18 @@
use std::collections::{BTreeMap, HashMap, VecDeque};
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use anyhow::Result;
use chrono::Utc;
use harmony_reconciler_contracts::{
DeploymentPhase as ReportedPhase, EventEntry, EventSeverity, Phase,
};
use harmony_reconciler_contracts::{DeploymentName, DeploymentState, Id, Phase};
use tokio::sync::Mutex;
use harmony::inventory::Inventory;
use harmony::modules::podman::{IotScore, PodmanTopology, PodmanV0Score};
use harmony::score::Score;
use crate::fleet_publisher::FleetPublisher;
/// Cache key → last-seen state, populated by `apply` and consulted by the
/// 30-second periodic tick and the delete path.
struct CachedEntry {
@@ -24,83 +24,81 @@ struct CachedEntry {
score: PodmanV0Score,
}
/// Per-device reconcile status, separate from the desired-state cache
/// so the status reporter can snapshot it without racing the apply
/// path.
#[derive(Default)]
struct StatusState {
deployments: BTreeMap<String, ReportedPhase>,
recent_events: VecDeque<EventEntry>,
}
/// Cap on the ring buffer of recent events. Large enough for the
/// operator's "last 5-10 events" rollup; small enough that the whole
/// AgentStatus payload stays well under the NATS JetStream per-message
/// limit.
const EVENT_RING_CAP: usize = 32;
pub struct Reconciler {
device_id: Id,
topology: Arc<PodmanTopology>,
inventory: Arc<Inventory>,
/// Keyed by NATS KV key (`<device>.<deployment>`). A single entry per
/// KV key — in v0 there is no fan-out from one key to many scores.
state: Mutex<HashMap<String, CachedEntry>>,
status: Mutex<StatusState>,
/// Current phase per deployment, used to decide whether a new
/// write to the `device-state` KV is needed.
phases: Mutex<HashMap<DeploymentName, Phase>>,
/// Publish surface. Optional so unit tests without a live NATS
/// client still work; always populated in the real agent runtime.
fleet: Option<Arc<FleetPublisher>>,
}
impl Reconciler {
pub fn new(topology: Arc<PodmanTopology>, inventory: Arc<Inventory>) -> Self {
pub fn new(
device_id: Id,
topology: Arc<PodmanTopology>,
inventory: Arc<Inventory>,
fleet: Option<Arc<FleetPublisher>>,
) -> Self {
Self {
device_id,
topology,
inventory,
state: Mutex::new(HashMap::new()),
status: Mutex::new(StatusState::default()),
phases: Mutex::new(HashMap::new()),
fleet,
}
}
/// Snapshot of everything the status reporter needs to publish.
/// Returns clones so the caller can serialize without holding
/// locks.
pub async fn status_snapshot(&self) -> (BTreeMap<String, ReportedPhase>, Vec<EventEntry>) {
let status = self.status.lock().await;
(
status.deployments.clone(),
status.recent_events.iter().cloned().collect(),
)
}
/// Record a new phase for a deployment and, if it changed, write
/// the updated [`DeploymentState`] to the KV. Same-phase
/// re-confirmations are no-ops so the periodic reconcile tick
/// doesn't churn the bucket.
async fn apply_phase(
&self,
deployment: &DeploymentName,
phase: Phase,
last_error: Option<String>,
) {
{
let mut phases = self.phases.lock().await;
if phases.get(deployment).copied() == Some(phase) {
return;
}
phases.insert(deployment.clone(), phase);
}
async fn set_phase(&self, deployment: &str, phase: Phase, last_error: Option<String>) {
let mut status = self.status.lock().await;
status.deployments.insert(
deployment.to_string(),
ReportedPhase {
if let Some(publisher) = &self.fleet {
let state = DeploymentState {
device_id: self.device_id.clone(),
deployment: deployment.clone(),
phase,
last_event_at: Utc::now(),
last_error,
},
);
};
publisher.write_deployment_state(&state).await;
}
}
async fn drop_phase(&self, deployment: &str) {
let mut status = self.status.lock().await;
status.deployments.remove(deployment);
}
async fn push_event(
&self,
severity: EventSeverity,
message: String,
deployment: Option<String>,
) {
let mut status = self.status.lock().await;
status.recent_events.push_back(EventEntry {
at: Utc::now(),
severity,
message,
deployment,
});
while status.recent_events.len() > EVENT_RING_CAP {
status.recent_events.pop_front();
/// Clear the in-memory phase for a deployment and delete its KV
/// entry. Idempotent: a delete for a never-applied deployment is
/// a pure no-op; we return before issuing any KV write.
async fn drop_phase(&self, deployment: &DeploymentName) {
let was_known = {
let mut phases = self.phases.lock().await;
phases.remove(deployment).is_some()
};
if !was_known {
return;
}
if let Some(publisher) = &self.fleet {
publisher.delete_deployment_state(deployment).await;
}
}
@@ -113,15 +111,9 @@ impl Reconciler {
Ok(IotScore::PodmanV0(s)) => s,
Err(e) => {
tracing::warn!(key, error = %e, "failed to deserialize score");
if let Some(name) = deployment.as_deref() {
self.set_phase(name, Phase::Failed, Some(format!("bad payload: {e}")))
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Failed, Some(format!("bad payload: {e}")))
.await;
self.push_event(
EventSeverity::Error,
format!("deserialize failure: {e}"),
Some(name.to_string()),
)
.await;
}
return Ok(());
}
@@ -138,32 +130,20 @@ impl Reconciler {
}
}
if let Some(name) = deployment.as_deref() {
self.set_phase(name, Phase::Pending, None).await;
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Pending, None).await;
}
match self.run_score(key, &incoming).await {
Ok(()) => {
if let Some(name) = deployment.as_deref() {
self.set_phase(name, Phase::Running, None).await;
self.push_event(
EventSeverity::Info,
"reconciled".to_string(),
Some(name.to_string()),
)
.await;
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Running, None).await;
}
}
Err(e) => {
if let Some(name) = deployment.as_deref() {
self.set_phase(name, Phase::Failed, Some(short(&e.to_string())))
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
.await;
self.push_event(
EventSeverity::Error,
short(&e.to_string()),
Some(name.to_string()),
)
.await;
}
return Err(e);
}
@@ -189,7 +169,7 @@ impl Reconciler {
let mut state = self.state.lock().await;
let Some(entry) = state.remove(key) else {
tracing::info!(key, "delete for unknown key — nothing to remove");
if let Some(name) = deployment.as_deref() {
if let Some(name) = &deployment {
self.drop_phase(name).await;
}
return Ok(());
@@ -209,14 +189,8 @@ impl Reconciler {
tracing::info!(key, service = %service.name, "removed container");
}
}
if let Some(name) = deployment.as_deref() {
if let Some(name) = &deployment {
self.drop_phase(name).await;
self.push_event(
EventSeverity::Info,
"deployment deleted".to_string(),
Some(name.to_string()),
)
.await;
}
Ok(())
}
@@ -238,24 +212,15 @@ impl Reconciler {
let deployment = deployment_from_key(&key);
match self.run_score(&key, &score).await {
Ok(()) => {
// Keep the phase Running (no-op if already).
// Don't emit an event on idempotent no-change
// ticks — the 30 s cadence would drown the ring.
if let Some(name) = deployment.as_deref() {
self.set_phase(name, Phase::Running, None).await;
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Running, None).await;
}
}
Err(e) => {
tracing::warn!(key, error = %e, "periodic reconcile failed");
if let Some(name) = deployment.as_deref() {
self.set_phase(name, Phase::Failed, Some(short(&e.to_string())))
if let Some(name) = &deployment {
self.apply_phase(name, Phase::Failed, Some(short(&e.to_string())))
.await;
self.push_event(
EventSeverity::Error,
short(&e.to_string()),
Some(name.to_string()),
)
.await;
}
}
}
@@ -286,15 +251,13 @@ impl Reconciler {
}
/// Extract the deployment name from a NATS KV key of the form
/// `<device>.<deployment>`. Returns `None` for keys that don't match
/// that shape (defensive — the agent only ever subscribes to
/// `<device>.>` filters so this should always succeed, but we don't
/// want to crash on a malformed key).
fn deployment_from_key(key: &str) -> Option<String> {
key.split_once('.').map(|(_, rest)| rest.to_string())
/// `<device>.<deployment>`.
fn deployment_from_key(key: &str) -> Option<DeploymentName> {
let (_, rest) = key.split_once('.')?;
DeploymentName::try_new(rest).ok()
}
/// Truncate a long error message so the AgentStatus payload stays
/// Truncate a long error message so the DeploymentState payload stays
/// comfortably below NATS JetStream's per-message limit.
fn short(s: &str) -> String {
const MAX: usize = 512;
@@ -306,3 +269,76 @@ fn short(s: &str) -> String {
cut
}
}
#[cfg(test)]
mod tests {
//! Focused tests for transition detection. Drive `apply_phase` /
//! `drop_phase` directly with an inert topology (no real podman
//! socket) and a `None` FleetPublisher.
use super::*;
use harmony::inventory::Inventory;
use harmony::modules::podman::PodmanTopology;
use std::path::PathBuf;
fn reconciler() -> Reconciler {
let topology = Arc::new(
PodmanTopology::from_unix_socket(PathBuf::from("/nonexistent/for-tests")).unwrap(),
);
let inventory = Arc::new(Inventory::empty());
Reconciler::new(
Id::from("test-device".to_string()),
topology,
inventory,
None,
)
}
fn dn(s: &str) -> DeploymentName {
DeploymentName::try_new(s).expect("valid test name")
}
#[tokio::test]
async fn apply_phase_records_new_phase() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None).await;
let phases = r.phases.lock().await;
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Running));
}
#[tokio::test]
async fn apply_phase_idempotent_for_same_phase() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None).await;
r.apply_phase(&dn("hello"), Phase::Running, None).await;
let phases = r.phases.lock().await;
assert_eq!(phases.len(), 1);
}
#[tokio::test]
async fn apply_phase_transitions_update_phase() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Pending, None).await;
r.apply_phase(&dn("hello"), Phase::Running, None).await;
r.apply_phase(&dn("hello"), Phase::Failed, Some("oom".to_string()))
.await;
let phases = r.phases.lock().await;
assert_eq!(phases.get(&dn("hello")), Some(&Phase::Failed));
}
#[tokio::test]
async fn drop_phase_clears_known_deployment() {
let r = reconciler();
r.apply_phase(&dn("hello"), Phase::Running, None).await;
r.drop_phase(&dn("hello")).await;
let phases = r.phases.lock().await;
assert!(!phases.contains_key(&dn("hello")));
}
#[tokio::test]
async fn drop_phase_on_unknown_deployment_is_noop() {
let r = reconciler();
r.drop_phase(&dn("never-existed")).await;
let phases = r.phases.lock().await;
assert!(phases.is_empty());
}
}
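The transition-dedup contract those tests exercise boils down to a compare-and-record on a phase map: only a real transition triggers a KV write. A std-only sketch of that pattern, detached from the NATS publisher (the `PhaseCache` name and the `bool` return are illustrative, not from this crate):

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Debug)]
enum Phase {
    Pending,
    Running,
    Failed,
}

/// Minimal stand-in for the reconciler's `phases` map: records a
/// phase and reports whether it changed, so the caller only writes
/// to the KV on real transitions.
struct PhaseCache {
    phases: HashMap<String, Phase>,
}

impl PhaseCache {
    fn new() -> Self {
        Self { phases: HashMap::new() }
    }

    /// Returns true when the phase actually changed (i.e. a KV
    /// write would be issued); same-phase re-confirmations are
    /// no-ops, mirroring `apply_phase`.
    fn apply(&mut self, deployment: &str, phase: Phase) -> bool {
        if self.phases.get(deployment).copied() == Some(phase) {
            return false;
        }
        self.phases.insert(deployment.to_string(), phase);
        true
    }
}
```

This is why the 30-second periodic tick can re-confirm `Running` on every pass without churning the bucket: the cache swallows the repeat before it reaches the wire.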

View File

@@ -1,352 +0,0 @@
//! Agent-status → CR-status aggregator.
//!
//! Watches the `agent-status` NATS KV bucket, keeps a per-device
//! snapshot in memory, and periodically recomputes each Deployment
//! CR's `.status.aggregate` subtree from the intersection of its
//! `spec.targetDevices` list and the known device statuses.
//!
//! Runs as a background task alongside the controller. Keeping the
//! controller free of NATS-KV subscription state lets its reconcile
//! loop stay reactive and cheap (just publishing desired state +
//! managing finalizers), while this task handles the slower
//! many-devices-to-one-CR fan-in.
//!
//! Design choices:
//! - **In-memory snapshot map** (device_id → AgentStatus). Rebuilt
//! from JetStream on startup via the watch's initial replay; kept
//! current by watching thereafter. No persistence — the bucket is
//! the source of truth.
//! - **Periodic aggregation tick** (5 s). Cheap (a few BTreeMap
//! lookups + one `patch_status` per CR) and gives predictable
//! operator behaviour for the smoke harness. A push-based
//! "recompute on every Put" would be tighter but adds complexity
//! this v0.1 doesn't need.
//! - **JSON-Merge Patch.** Writes only the `aggregate` subtree, so
//! it composes cleanly with the controller's
//! `observedScoreString` patch.
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::Duration;
use async_nats::jetstream::kv::{Operation, Store};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{AgentStatus, Phase};
use kube::api::{Api, Patch, PatchParams};
use kube::{Client, ResourceExt};
use serde_json::json;
use tokio::sync::Mutex;
use crate::crd::{AggregateEvent, AggregateLastError, Deployment, DeploymentAggregate};
/// Cap on how many events we surface in `DeploymentAggregate.recent_events`.
/// Small enough to keep the CR status compact.
const AGGREGATE_EVENT_CAP: usize = 10;
/// How often the aggregator recomputes + patches.
const AGGREGATE_TICK: Duration = Duration::from_secs(5);
/// Per-device status snapshot keyed by device id string.
pub type StatusSnapshots = Arc<Mutex<BTreeMap<String, AgentStatus>>>;
/// Spawn the aggregator: watch the agent-status bucket into an
/// in-memory map, and periodically fold that map into every
/// Deployment CR's `.status.aggregate`.
pub async fn run(client: Client, status_bucket: Store) -> anyhow::Result<()> {
let snapshots: StatusSnapshots = Arc::new(Mutex::new(BTreeMap::new()));
let watcher = tokio::spawn(watch_status_bucket(status_bucket, snapshots.clone()));
let aggregator = tokio::spawn(aggregate_loop(client, snapshots));
tokio::select! {
r = watcher => r??,
r = aggregator => r??,
}
Ok(())
}
async fn watch_status_bucket(bucket: Store, snapshots: StatusSnapshots) -> anyhow::Result<()> {
tracing::info!("aggregator: watching agent-status bucket");
let mut watch = bucket.watch("status.>").await?;
while let Some(entry) = watch.next().await {
let entry = match entry {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "aggregator: watch error");
continue;
}
};
let device_id = match device_id_from_status_key(&entry.key) {
Some(id) => id,
None => {
tracing::warn!(key = %entry.key, "aggregator: skipping malformed key");
continue;
}
};
match entry.operation {
Operation::Put => match serde_json::from_slice::<AgentStatus>(&entry.value) {
Ok(status) => {
let mut map = snapshots.lock().await;
map.insert(device_id, status);
}
Err(e) => {
tracing::warn!(key = %entry.key, error = %e, "aggregator: bad status payload");
}
},
Operation::Delete | Operation::Purge => {
let mut map = snapshots.lock().await;
map.remove(&device_id);
}
}
}
Ok(())
}
async fn aggregate_loop(client: Client, snapshots: StatusSnapshots) -> anyhow::Result<()> {
let deployments: Api<Deployment> = Api::all(client.clone());
let mut ticker = tokio::time::interval(AGGREGATE_TICK);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if let Err(e) = tick_once(&deployments, &snapshots).await {
tracing::warn!(error = %e, "aggregator: tick failed");
}
}
}
async fn tick_once(
deployments: &Api<Deployment>,
snapshots: &StatusSnapshots,
) -> anyhow::Result<()> {
let crs = deployments.list(&Default::default()).await?;
// Clone the snapshot once per tick so we don't hold the lock
// across network calls.
let snapshot = { snapshots.lock().await.clone() };
for cr in &crs {
let ns = match cr.namespace() {
Some(ns) => ns,
None => continue,
};
let name = cr.name_any();
let aggregate = compute_aggregate(&cr.spec.target_devices, &name, &snapshot);
let status = json!({ "status": { "aggregate": aggregate } });
let api: Api<Deployment> = Api::namespaced(deployments.clone().into_client(), &ns);
if let Err(e) = api
.patch_status(&name, &PatchParams::default(), &Patch::Merge(&status))
.await
{
tracing::warn!(%ns, %name, error = %e, "aggregator: patch failed");
}
}
Ok(())
}
/// Compute the aggregate for one CR from the current snapshot map.
/// Exposed (crate-visible) for unit testing.
pub(crate) fn compute_aggregate(
target_devices: &[String],
deployment_name: &str,
snapshots: &BTreeMap<String, AgentStatus>,
) -> DeploymentAggregate {
let mut agg = DeploymentAggregate::default();
let mut last_error: Option<AggregateLastError> = None;
let mut last_heartbeat: Option<chrono::DateTime<chrono::Utc>> = None;
let mut events: Vec<AggregateEvent> = Vec::new();
for device in target_devices {
let status = match snapshots.get(device) {
Some(s) => s,
None => {
agg.unreported += 1;
continue;
}
};
if last_heartbeat.is_none_or(|t| status.timestamp > t) {
last_heartbeat = Some(status.timestamp);
}
match status.deployments.get(deployment_name) {
Some(phase) => match phase.phase {
Phase::Running => agg.succeeded += 1,
Phase::Failed => {
agg.failed += 1;
let error_at = phase.last_event_at;
let error_msg = phase
.last_error
.clone()
.unwrap_or_else(|| "failed".to_string());
let candidate = AggregateLastError {
device_id: device.clone(),
message: error_msg,
at: error_at.to_rfc3339(),
};
match &last_error {
Some(cur) if cur.at >= candidate.at => {}
_ => last_error = Some(candidate),
}
}
Phase::Pending => agg.pending += 1,
},
None => {
// Device reported but hasn't acknowledged this
// deployment yet.
agg.pending += 1;
}
}
// Collect per-deployment events for the fleet-wide ring.
for ev in &status.recent_events {
if ev.deployment.as_deref() == Some(deployment_name) {
events.push(AggregateEvent {
at: ev.at.to_rfc3339(),
severity: match ev.severity {
harmony_reconciler_contracts::EventSeverity::Info => "Info".to_string(),
harmony_reconciler_contracts::EventSeverity::Warn => "Warn".to_string(),
harmony_reconciler_contracts::EventSeverity::Error => "Error".to_string(),
},
device_id: device.clone(),
message: ev.message.clone(),
deployment: ev.deployment.clone(),
});
}
}
}
// Most recent first; cap.
events.sort_by(|a, b| b.at.cmp(&a.at));
events.truncate(AGGREGATE_EVENT_CAP);
agg.last_error = last_error;
agg.recent_events = events;
agg.last_heartbeat_at = last_heartbeat.map(|t| t.to_rfc3339());
agg
}
/// `status.<device_id>` → `<device_id>`.
fn device_id_from_status_key(key: &str) -> Option<String> {
key.strip_prefix("status.").map(|s| s.to_string())
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, Utc};
use harmony_reconciler_contracts::{DeploymentPhase, EventEntry, EventSeverity, Id};
fn ts(s: &str) -> DateTime<Utc> {
DateTime::parse_from_rfc3339(s).unwrap().with_timezone(&Utc)
}
fn snapshot_with(
device: &str,
deployment: &str,
phase: Phase,
err: Option<&str>,
) -> AgentStatus {
let mut deployments = BTreeMap::new();
deployments.insert(
deployment.to_string(),
DeploymentPhase {
phase,
last_event_at: ts("2026-04-22T01:00:00Z"),
last_error: err.map(|s| s.to_string()),
},
);
AgentStatus {
device_id: Id::from(device.to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-22T01:00:00Z"),
deployments,
recent_events: vec![],
inventory: None,
}
}
#[test]
fn aggregate_counts_and_unreported() {
let mut map = BTreeMap::new();
map.insert(
"pi-01".to_string(),
snapshot_with("pi-01", "hello", Phase::Running, None),
);
map.insert(
"pi-02".to_string(),
snapshot_with("pi-02", "hello", Phase::Failed, Some("pull err")),
);
// pi-03 is a target but never reported.
let targets = vec![
"pi-01".to_string(),
"pi-02".to_string(),
"pi-03".to_string(),
];
let agg = compute_aggregate(&targets, "hello", &map);
assert_eq!(agg.succeeded, 1);
assert_eq!(agg.failed, 1);
assert_eq!(agg.pending, 0);
assert_eq!(agg.unreported, 1);
assert_eq!(agg.last_error.as_ref().unwrap().device_id, "pi-02");
assert_eq!(agg.last_error.as_ref().unwrap().message, "pull err");
}
#[test]
fn device_reported_but_no_deployment_entry_is_pending() {
// Agent heartbeated (device known to operator) but hasn't
// acknowledged this specific deployment yet.
let mut map = BTreeMap::new();
map.insert(
"pi-01".to_string(),
AgentStatus {
device_id: Id::from("pi-01".to_string()),
status: "running".to_string(),
timestamp: ts("2026-04-22T01:00:00Z"),
deployments: BTreeMap::new(),
recent_events: vec![],
inventory: None,
},
);
let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map);
assert_eq!(agg.pending, 1);
assert_eq!(agg.unreported, 0);
}
#[test]
fn events_filtered_to_matching_deployment_only() {
let mut status = snapshot_with("pi-01", "hello", Phase::Running, None);
status.recent_events = vec![
EventEntry {
at: ts("2026-04-22T01:00:05Z"),
severity: EventSeverity::Info,
message: "hello reconciled".to_string(),
deployment: Some("hello".to_string()),
},
EventEntry {
at: ts("2026-04-22T01:00:06Z"),
severity: EventSeverity::Info,
message: "other reconciled".to_string(),
deployment: Some("other".to_string()),
},
EventEntry {
at: ts("2026-04-22T01:00:07Z"),
severity: EventSeverity::Info,
message: "generic device event".to_string(),
deployment: None,
},
];
let mut map = BTreeMap::new();
map.insert("pi-01".to_string(), status);
let agg = compute_aggregate(&["pi-01".to_string()], "hello", &map);
assert_eq!(agg.recent_events.len(), 1);
assert_eq!(agg.recent_events[0].message, "hello reconciled");
}
#[test]
fn device_id_from_status_key_happy_and_malformed() {
assert_eq!(
device_id_from_status_key("status.pi-01"),
Some("pi-01".into())
);
assert_eq!(device_id_from_status_key("desired-state.pi-01.x"), None);
}
}

View File

@@ -3,7 +3,7 @@ use std::time::Duration;
use async_nats::jetstream::kv::Store;
use futures_util::StreamExt;
use harmony_reconciler_contracts::desired_state_key;
use harmony_reconciler_contracts::{DeploymentName, desired_state_key};
use kube::api::{Patch, PatchParams};
use kube::runtime::Controller;
use kube::runtime::controller::Action;
@@ -92,8 +92,19 @@ async fn apply(obj: Arc<Deployment>, api: &Api<Deployment>, kv: &Store) -> Resul
return Ok(Action::requeue(Duration::from_secs(300)));
}
// The controller trusts its input: `name` came from a k8s CR's
// metadata.name, which the apiserver already validated to RFC
// 1123. A name that doesn't parse as a `DeploymentName` here
// would mean the operator is running against a cluster with a
// CR name containing a `.` or NATS wildcard — a real bug, but
// one we'd rather surface as a clear error than silently skip.
let deployment_name = DeploymentName::try_new(&name).map_err(|e| {
Error::Kv(format!(
"CR name '{name}' is not a valid DeploymentName: {e}"
))
})?;
for device_id in &obj.spec.target_devices {
let key = kv_key(device_id, &name);
let key = kv_key(device_id, &deployment_name);
kv.put(key.clone(), score_json.clone().into_bytes().into())
.await
.map_err(|e| Error::Kv(e.to_string()))?;
@@ -113,8 +124,13 @@ async fn apply(obj: Arc<Deployment>, api: &Api<Deployment>, kv: &Store) -> Resul
async fn cleanup(obj: Arc<Deployment>, kv: &Store) -> Result<Action, Error> {
let name = obj.name_any();
let deployment_name = DeploymentName::try_new(&name).map_err(|e| {
Error::Kv(format!(
"CR name '{name}' is not a valid DeploymentName: {e}"
))
})?;
for device_id in &obj.spec.target_devices {
let key = kv_key(device_id, &name);
let key = kv_key(device_id, &deployment_name);
kv.delete(&key)
.await
.map_err(|e| Error::Kv(e.to_string()))?;
@@ -127,7 +143,7 @@ fn serialize_score(score: &ScorePayload) -> Result<String, Error> {
Ok(serde_json::to_string(score)?)
}
fn kv_key(device_id: &str, deployment_name: &str) -> String {
fn kv_key(device_id: &str, deployment_name: &DeploymentName) -> String {
desired_state_key(device_id, deployment_name)
}

View File

@@ -105,45 +105,29 @@ pub struct DeploymentStatus {
/// (skip KV write + status patch when the CR is unchanged).
#[serde(skip_serializing_if = "Option::is_none")]
pub observed_score_string: Option<String>,
/// Per-deployment rollup aggregated from the `agent-status`
/// bucket. Present once at least one targeted agent has
/// heartbeated; absent on a freshly-created CR.
/// Per-deployment rollup aggregated from the `device-state` KV
/// bucket. Present once at least one targeted agent has reported;
/// absent on a freshly-created CR.
#[serde(skip_serializing_if = "Option::is_none")]
pub aggregate: Option<DeploymentAggregate>,
}
/// Rollup of per-device `AgentStatus.deployments` entries for this
/// Deployment CR.
/// Rollup of per-device deployment phases for this Deployment CR.
#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DeploymentAggregate {
/// Count of devices where the deployment is in each phase.
/// Count of target devices where the deployment is in each phase.
/// Targeted-but-unreported devices are folded into `pending`.
/// Always populated (zeros are valid) so the operator can patch
/// the whole subtree atomically.
pub succeeded: u32,
pub failed: u32,
pub pending: u32,
/// Count of target devices that haven't yet heartbeated at all.
/// "failed to join fleet" vs. "failed to reconcile" — different
/// signals, different remedies.
pub unreported: u32,
/// Device id of the most recent device reporting a failure,
/// with its short error message. Surfaces the top failure to
/// the CR's status without needing per-device subresource
/// lookups.
/// Device id of the most recent device reporting a failure, with
/// its short error message. Cleared when that device transitions
/// back to Running.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_error: Option<AggregateLastError>,
/// Last-N events aggregated across all target devices, most
/// recent first. Operator caps at a handful (see operator
/// controller).
#[serde(default)]
pub recent_events: Vec<AggregateEvent>,
/// Timestamp of the most recent agent heartbeat counted into
/// this aggregate. "Freshness" signal — a CR whose aggregate
/// hasn't advanced in minutes is evidence the whole fleet has
/// gone dark.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat_at: Option<String>,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
@@ -153,14 +137,3 @@ pub struct AggregateLastError {
pub message: String,
pub at: String,
}
#[derive(Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AggregateEvent {
pub at: String,
pub severity: String,
pub device_id: String,
pub message: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub deployment: Option<String>,
}

View File

@@ -0,0 +1,542 @@
//! Operator-side aggregator.
//!
//! Watches the `device-state` KV bucket, maintains an in-memory
//! snapshot of every `(device, deployment)` phase, and patches each
//! Deployment CR's `.status.aggregate` as reports arrive.
//!
//! Everything flows through the KV: the watcher delivers historical
//! entries on startup to seed the snapshot, then live Put/Delete
//! events to keep it current. Counters are recomputed per-CR from
//! the snapshot at 1 Hz, for CRs marked dirty since the last tick.
//! No separate event stream, no revision dedup — the KV is ordered
//! last-writer-wins and that's enough.
use std::collections::{HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use async_nats::jetstream::kv::{Operation, Store};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE, DeploymentName, DeploymentState, DeviceInfo, Phase,
};
use kube::api::{Api, Patch, PatchParams};
use kube::{Client, ResourceExt};
use serde_json::json;
use tokio::sync::Mutex;
use crate::crd::{AggregateLastError, Deployment, DeploymentAggregate};
const PATCH_TICK: Duration = Duration::from_secs(1);
/// (namespace, name) identifying a Deployment CR.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DeploymentKey {
pub namespace: String,
pub name: String,
}
impl DeploymentKey {
pub fn from_cr(cr: &Deployment) -> Option<Self> {
Some(Self {
namespace: cr.namespace()?,
name: cr.name_any(),
})
}
}
/// One `(device, deployment)` pair — the natural key into the states
/// snapshot. Strongly typed so the two fields can't be swapped by
/// accident.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct DevicePair {
pub device_id: String,
pub deployment: DeploymentName,
}
#[derive(Debug, Default)]
pub struct FleetState {
/// Authoritative per-pair phase snapshot, driven by the KV watch.
pub states: HashMap<DevicePair, DeploymentState>,
/// Routing facts per device. Populated on cold-start + updated
/// by a future device-info watch; labels here feed selector
/// matching.
pub infos: HashMap<String, DeviceInfo>,
/// CR index by deployment name. The KV keys carry the device id
/// and deployment name but no namespace, so we need a
/// name → CR-key lookup to surface every namespace that uses
/// that name. Refreshed at the top of each patch tick from the
/// CR list.
pub crs_by_name: HashMap<DeploymentName, Vec<DeploymentKey>>,
/// Most-recent failure surfaced per deployment CR.
pub last_error: HashMap<DeploymentKey, AggregateLastError>,
/// CR keys whose aggregate needs re-patching on the next tick.
pub dirty: HashSet<DeploymentKey>,
}
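The `dirty` set is what keeps per-tick work bounded by distinct CRs rather than by report volume: each incoming state marks its CRs (a cheap set insert), and the 1 Hz patch loop drains the set. A minimal std-only sketch of that pattern, assuming a plain string CR key for brevity:

```rust
use std::collections::HashSet;

/// Accumulates CR keys touched since the last tick; the patch loop
/// drains it so each tick only re-patches CRs whose inputs changed.
struct DirtySet {
    dirty: HashSet<String>,
}

impl DirtySet {
    fn new() -> Self {
        Self { dirty: HashSet::new() }
    }

    /// Cheap per-report bookkeeping: duplicate marks between ticks
    /// collapse into a single entry.
    fn mark(&mut self, cr_key: &str) {
        self.dirty.insert(cr_key.to_string());
    }

    /// Take everything marked since the last drain. However many
    /// device reports landed, each dirty CR is patched once.
    fn drain(&mut self) -> Vec<String> {
        self.dirty.drain().collect()
    }
}
```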
pub type SharedFleetState = Arc<Mutex<FleetState>>;
/// Does this CR target this device?
///
/// Today: CR lists device ids explicitly. After the selector branch
/// merges: `cr.spec.target_selector.matches(&info.labels)`.
fn cr_targets_device(cr: &Deployment, device_id: &str) -> bool {
cr.spec.target_devices.iter().any(|d| d == device_id)
}
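The selector-based targeting this function anticipates could reduce to equality matching over the device's label map. A hedged sketch only: the `target_selector` shape is not on this branch, so the `Selector` type below is hypothetical, modeled on Kubernetes `matchLabels` semantics:

```rust
use std::collections::BTreeMap;

/// Hypothetical equality-based selector: every (key, value) pair
/// must be present in the device's labels, `matchLabels` style.
/// A device with an empty label map therefore matches no
/// non-empty selector, which is the fail-safe the agent's
/// startup DeviceInfo publish relies on.
struct Selector {
    match_labels: BTreeMap<String, String>,
}

impl Selector {
    fn matches(&self, device_labels: &BTreeMap<String, String>) -> bool {
        self.match_labels
            .iter()
            .all(|(k, v)| device_labels.get(k) == Some(v))
    }
}
```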
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> {
let info_bucket = js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_DEVICE_INFO.to_string(),
..Default::default()
})
.await?;
let state_bucket = js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_DEVICE_STATE.to_string(),
..Default::default()
})
.await?;
let deployments: Api<Deployment> = Api::all(client);
// Seed infos once so label-based targeting has data to match
// against on the first patch tick. (A future change can replace
// this with a device-info watch.)
let infos = read_device_info(&info_bucket).await?;
let state: SharedFleetState = Arc::new(Mutex::new(FleetState {
infos,
..Default::default()
}));
tracing::info!(
devices = state.lock().await.infos.len(),
"aggregator: startup complete — watching device-state"
);
let watcher_state = state.clone();
let watcher = tokio::spawn(async move {
if let Err(e) = run_state_watcher(state_bucket, watcher_state).await {
tracing::warn!(error = %e, "aggregator: state watcher exited");
}
});
let patch_state = state.clone();
let patch_loop = async move {
let mut ticker = tokio::time::interval(PATCH_TICK);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if let Err(e) = patch_tick(&deployments, &patch_state).await {
tracing::warn!(error = %e, "aggregator: patch tick failed");
}
}
};
tokio::select! {
_ = patch_loop => Ok(()),
_ = watcher => Ok(()),
}
}
/// Parse a `device-state` KV key (`state.<device>.<deployment>`) into
/// its component pair.
fn parse_state_key(key: &str) -> Option<DevicePair> {
let rest = key.strip_prefix("state.")?;
let (device, deployment) = rest.split_once('.')?;
Some(DevicePair {
device_id: device.to_string(),
deployment: DeploymentName::try_new(deployment).ok()?,
})
}
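The `state.<device>.<deployment>` key scheme can be sketched std-only. Validation here is simplified to "non-empty, no further dots" as a stand-in for the real `DeploymentName::try_new` rules (which also enforce the NATS-safe character set), and the return type is a plain string pair rather than `DevicePair`:

```rust
/// Split `state.<device>.<deployment>` into its components.
/// Returns None for keys outside the `state.` prefix, keys
/// missing a deployment segment, or deployment names that would
/// collide with the `.`-delimited key space.
fn parse_state_key(key: &str) -> Option<(String, String)> {
    let rest = key.strip_prefix("state.")?;
    let (device, deployment) = rest.split_once('.')?;
    if deployment.is_empty() || deployment.contains('.') {
        return None;
    }
    Some((device.to_string(), deployment.to_string()))
}
```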
async fn run_state_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> {
// LastPerSubject delivery replays the current value of every key
// first, then streams live updates. Gives us cold-start + steady
// state in a single subscription — no separate KV scan.
let mut watch = bucket.watch_with_history(">").await?;
while let Some(entry_res) = watch.next().await {
let entry = match entry_res {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "aggregator: watch delivery error");
continue;
}
};
let Some(pair) = parse_state_key(&entry.key) else {
continue;
};
match entry.operation {
Operation::Put => {
let ds: DeploymentState = match serde_json::from_slice(&entry.value) {
Ok(d) => d,
Err(e) => {
tracing::warn!(key = %entry.key, error = %e, "aggregator: bad device_state payload");
continue;
}
};
let mut guard = state.lock().await;
apply_state(&mut guard, pair, ds);
}
Operation::Delete | Operation::Purge => {
let mut guard = state.lock().await;
drop_state(&mut guard, &pair);
}
}
}
Ok(())
}
/// Record a device's latest state. Drops stale writes via the
/// `last_event_at` timestamp, updates `last_error`, and marks every
/// CR whose name matches as dirty.
pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) {
if let Some(prev) = state.states.get(&pair) {
if prev.last_event_at > ds.last_event_at {
return;
}
}
let phase = ds.phase;
let device_id = ds.device_id.to_string();
let last_error_msg = ds.last_error.clone();
let at = ds.last_event_at.to_rfc3339();
state.states.insert(pair.clone(), ds);
for key in matching_cr_keys(state, &pair.deployment) {
match phase {
Phase::Failed => {
if let Some(msg) = last_error_msg.as_deref() {
state.last_error.insert(
key.clone(),
AggregateLastError {
device_id: device_id.clone(),
message: msg.to_string(),
at: at.clone(),
},
);
}
}
Phase::Running => {
if let Some(existing) = state.last_error.get(&key) {
if existing.device_id == device_id {
state.last_error.remove(&key);
}
}
}
Phase::Pending => {}
}
state.dirty.insert(key);
}
}
pub fn drop_state(state: &mut FleetState, pair: &DevicePair) {
let Some(removed) = state.states.remove(pair) else {
return;
};
let device_id = removed.device_id.to_string();
for key in matching_cr_keys(state, &pair.deployment) {
if let Some(existing) = state.last_error.get(&key) {
if existing.device_id == device_id {
state.last_error.remove(&key);
}
}
state.dirty.insert(key);
}
}
/// CR keys matching a deployment name, via the index refreshed by
/// [`patch_tick`]. The CR index may be empty for names whose CR
/// hasn't been seen yet — those updates land in `states` and get
/// picked up on the next tick that finds the CR in the kube list.
fn matching_cr_keys(state: &FleetState, deployment: &DeploymentName) -> Vec<DeploymentKey> {
state
.crs_by_name
.get(deployment)
.cloned()
.unwrap_or_default()
}
async fn patch_tick(deployments: &Api<Deployment>, state: &SharedFleetState) -> anyhow::Result<()> {
let crs = deployments.list(&Default::default()).await?.items;
let aggregates = {
let mut guard = state.lock().await;
// Refresh the CR-name index. A CR we haven't seen before is
// automatically marked dirty so the first tick after its
// creation patches an initial aggregate (even all-zero).
let mut next_index: HashMap<DeploymentName, Vec<DeploymentKey>> = HashMap::new();
for cr in &crs {
let Some(cr_key) = DeploymentKey::from_cr(cr) else {
continue;
};
let Ok(deployment_name) = DeploymentName::try_new(&cr_key.name) else {
continue;
};
let was_known = guard
.crs_by_name
.get(&deployment_name)
.map(|v| v.contains(&cr_key))
.unwrap_or(false);
if !was_known {
guard.dirty.insert(cr_key.clone());
}
next_index.entry(deployment_name).or_default().push(cr_key);
}
guard.crs_by_name = next_index;
let dirty_keys: Vec<DeploymentKey> = guard.dirty.drain().collect();
let mut aggs = Vec::with_capacity(dirty_keys.len());
for key in &dirty_keys {
let Some(cr) = crs.iter().find(|c| {
c.namespace().as_deref() == Some(key.namespace.as_str()) && c.name_any() == key.name
}) else {
continue;
};
let agg = compute_aggregate(&guard, cr);
aggs.push((key.clone(), agg));
}
aggs
};
for (key, aggregate) in aggregates {
let api: Api<Deployment> =
Api::namespaced(deployments.clone().into_client(), &key.namespace);
let status = json!({ "status": { "aggregate": aggregate } });
if let Err(e) = api
.patch_status(&key.name, &PatchParams::default(), &Patch::Merge(&status))
.await
{
tracing::warn!(
namespace = %key.namespace,
name = %key.name,
error = %e,
"aggregator: status patch failed"
);
} else {
tracing::debug!(
namespace = %key.namespace,
name = %key.name,
succeeded = aggregate.succeeded,
failed = aggregate.failed,
pending = aggregate.pending,
"aggregator: status patched"
);
}
}
Ok(())
}
/// Build the aggregate for one CR from the current snapshot. Target
/// devices with no state entry count as `pending` — "we asked, they
/// haven't reported yet" folds into the same bucket as "reconcile in
/// flight" so operators see one pending count.
pub fn compute_aggregate(state: &FleetState, cr: &Deployment) -> DeploymentAggregate {
let mut agg = DeploymentAggregate::default();
let Ok(deployment_name) = DeploymentName::try_new(cr.name_any()) else {
return agg;
};
for device_id in &cr.spec.target_devices {
if !cr_targets_device(cr, device_id) {
continue;
}
let pair = DevicePair {
device_id: device_id.clone(),
deployment: deployment_name.clone(),
};
match state.states.get(&pair).map(|s| s.phase) {
Some(Phase::Running) => agg.succeeded += 1,
Some(Phase::Failed) => agg.failed += 1,
Some(Phase::Pending) | None => agg.pending += 1,
}
}
if let Some(cr_key) = DeploymentKey::from_cr(cr) {
agg.last_error = state.last_error.get(&cr_key).cloned();
}
agg
}
async fn read_device_info(bucket: &Store) -> anyhow::Result<HashMap<String, DeviceInfo>> {
let mut out = HashMap::new();
let mut keys = bucket.keys().await?;
while let Some(key_res) = keys.next().await {
let key = key_res?;
let Some(entry) = bucket.entry(&key).await? else {
continue;
};
let Some(device_id) = key.strip_prefix("info.") else {
continue;
};
match serde_json::from_slice::<DeviceInfo>(&entry.value) {
Ok(info) => {
out.insert(device_id.to_string(), info);
}
Err(e) => {
tracing::warn!(%key, error = %e, "aggregator: bad device_info payload");
}
}
}
Ok(out)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{TimeZone, Utc};
use harmony_reconciler_contracts::Id;
use kube::api::ObjectMeta;
fn dn(s: &str) -> DeploymentName {
DeploymentName::try_new(s).expect("valid test name")
}
fn state(device: &str, deployment: &str, phase: Phase, seconds: i64) -> DeploymentState {
DeploymentState {
device_id: Id::from(device.to_string()),
deployment: dn(deployment),
phase,
last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(),
last_error: None,
}
}
fn cr(namespace: &str, name: &str, devices: &[&str]) -> Deployment {
Deployment {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: crate::crd::DeploymentSpec {
target_devices: devices.iter().map(|s| s.to_string()).collect(),
score: crate::crd::ScorePayload {
type_: "PodmanV0".to_string(),
data: serde_json::json!({}),
},
rollout: crate::crd::Rollout {
strategy: crate::crd::RolloutStrategy::Immediate,
},
},
status: None,
}
}
fn demo_cr() -> Deployment {
cr("iot-demo", "hello", &["pi-01", "pi-02", "pi-03"])
}
fn demo_key() -> DeploymentKey {
DeploymentKey {
namespace: "iot-demo".to_string(),
name: "hello".to_string(),
}
}
fn pair(device: &str, deployment: &str) -> DevicePair {
DevicePair {
device_id: device.to_string(),
deployment: dn(deployment),
}
}
#[test]
fn compute_aggregate_counts_target_devices() {
let mut s = FleetState::default();
s.states.insert(
pair("pi-01", "hello"),
state("pi-01", "hello", Phase::Running, 0),
);
s.states.insert(
pair("pi-02", "hello"),
state("pi-02", "hello", Phase::Failed, 0),
);
// pi-03 unreported → counted as pending
let agg = compute_aggregate(&s, &demo_cr());
assert_eq!(agg.succeeded, 1);
assert_eq!(agg.failed, 1);
assert_eq!(agg.pending, 1);
}
fn seeded_state() -> FleetState {
let mut s = FleetState::default();
s.crs_by_name.insert(dn("hello"), vec![demo_key()]);
s
}
#[test]
fn apply_state_marks_cr_dirty_and_captures_last_error() {
let mut s = seeded_state();
let ds = DeploymentState {
last_error: Some("pull err".to_string()),
..state("pi-01", "hello", Phase::Failed, 0)
};
apply_state(&mut s, pair("pi-01", "hello"), ds);
assert!(s.dirty.contains(&demo_key()));
assert_eq!(s.last_error[&demo_key()].device_id, "pi-01");
assert_eq!(s.last_error[&demo_key()].message, "pull err");
}
#[test]
fn apply_state_clears_last_error_on_return_to_running() {
let mut s = seeded_state();
s.last_error.insert(
demo_key(),
AggregateLastError {
device_id: "pi-01".to_string(),
message: "pull err".to_string(),
at: "".to_string(),
},
);
apply_state(
&mut s,
pair("pi-01", "hello"),
state("pi-01", "hello", Phase::Running, 0),
);
assert!(!s.last_error.contains_key(&demo_key()));
}
#[test]
fn apply_state_ignores_stale_timestamp() {
let mut s = FleetState::default();
apply_state(
&mut s,
pair("pi-01", "hello"),
state("pi-01", "hello", Phase::Running, 10),
);
apply_state(
&mut s,
pair("pi-01", "hello"),
state("pi-01", "hello", Phase::Failed, 5),
);
assert_eq!(s.states[&pair("pi-01", "hello")].phase, Phase::Running);
}
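    // Sketch of an additional test (an assumption, not part of the PR):
    // the `matching_cr_keys` doc comment promises that an update for a
    // deployment whose CR hasn't been indexed yet still lands in
    // `states`, without marking anything dirty — the next tick that
    // lists the CR picks it up. Assumes `FleetState::dirty` is a set
    // type exposing `is_empty`.
    #[test]
    fn apply_state_without_cr_index_stores_state_only() {
        let mut s = FleetState::default();
        apply_state(
            &mut s,
            pair("pi-01", "hello"),
            state("pi-01", "hello", Phase::Running, 0),
        );
        assert!(s.states.contains_key(&pair("pi-01", "hello")));
        assert!(s.dirty.is_empty());
    }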
#[test]
fn drop_state_removes_entry_and_clears_last_error() {
let mut s = seeded_state();
s.states.insert(
pair("pi-01", "hello"),
state("pi-01", "hello", Phase::Running, 0),
);
s.last_error.insert(
demo_key(),
AggregateLastError {
device_id: "pi-01".to_string(),
message: "old".to_string(),
at: "".to_string(),
},
);
drop_state(&mut s, &pair("pi-01", "hello"));
assert!(!s.states.contains_key(&pair("pi-01", "hello")));
assert!(!s.last_error.contains_key(&demo_key()));
}
#[test]
fn parse_state_key_roundtrip() {
assert_eq!(
parse_state_key("state.pi-01.hello"),
Some(pair("pi-01", "hello"))
);
assert_eq!(parse_state_key("nope"), None);
assert_eq!(parse_state_key("state.missing-deployment"), None);
}
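    // Sketch of an additional test (an assumption, not part of the PR):
    // `compute_aggregate` should carry the CR's recorded last_error
    // into the aggregate it builds. Field shapes are taken from the
    // helpers above.
    #[test]
    fn compute_aggregate_attaches_last_error() {
        let mut s = FleetState::default();
        s.last_error.insert(
            demo_key(),
            AggregateLastError {
                device_id: "pi-02".to_string(),
                message: "pull err".to_string(),
                at: "".to_string(),
            },
        );
        let agg = compute_aggregate(&s, &demo_cr());
        assert_eq!(
            agg.last_error.as_ref().map(|e| e.device_id.as_str()),
            Some("pi-02")
        );
    }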
}


@@ -6,5 +6,5 @@
//! — can import the typed `Deployment`, `DeploymentSpec`,
//! `ScorePayload`, etc. without duplicating them.
pub mod aggregate;
pub mod crd;
pub mod fleet_aggregator;


@@ -1,15 +1,15 @@
mod controller;
mod install;
// `crd` + `aggregate` modules are owned by the library target (see
// `lib.rs`); the binary imports from there so the types aren't
// `crd` + `fleet_aggregator` modules are owned by the library target
// (see `lib.rs`); the binary imports from there so the types aren't
// compiled twice.
use iot_operator_v0::{aggregate, crd};
use iot_operator_v0::{crd, fleet_aggregator};
use anyhow::Result;
use async_nats::jetstream;
use clap::{Parser, Subcommand};
use harmony_reconciler_contracts::{BUCKET_AGENT_STATUS, BUCKET_DESIRED_STATE};
use harmony_reconciler_contracts::BUCKET_DESIRED_STATE;
use kube::Client;
#[derive(Parser)]
@@ -61,7 +61,11 @@ async fn main() -> Result<()> {
}
async fn run(nats_url: &str, bucket: &str) -> Result<()> {
let nats = async_nats::connect(nats_url).await?;
// Short retry loop on the initial connect. Startup races against
// the NATS server becoming ready (k3d loadbalancer accepting TCP
// before the NATS pod answers the protocol handshake), and a
// hard-fail on the very first attempt produces no useful signal.
let nats = connect_with_retry(nats_url).await?;
tracing::info!(url = %nats_url, "connected to NATS");
let js = jetstream::new(nats);
let desired_state_kv = js
@@ -71,22 +75,32 @@ async fn run(nats_url: &str, bucket: &str) -> Result<()> {
})
.await?;
tracing::info!(bucket = %bucket, "KV bucket ready");
let status_kv = js
.create_key_value(jetstream::kv::Config {
bucket: BUCKET_AGENT_STATUS.to_string(),
..Default::default()
})
.await?;
tracing::info!(bucket = %BUCKET_AGENT_STATUS, "agent-status bucket ready");
let client = Client::try_default().await?;
// Controller + aggregator run concurrently. If either returns
// an error, tear down the whole process — kube-rs's Controller
// already handles transient reconcile failures internally.
// Controller (CR → desired-state KV) + aggregator (device-info
// + device-state → CR status). Either failing tears the whole
// process down; kube-rs's Controller already handles transient
// reconcile errors internally.
let ctl_client = client.clone();
tokio::select! {
r = controller::run(ctl_client, desired_state_kv) => r,
r = aggregate::run(client, status_kv) => r,
r = fleet_aggregator::run(client, js) => r,
}
}
async fn connect_with_retry(nats_url: &str) -> Result<async_nats::Client> {
use std::time::Duration;
let mut last_err: Option<anyhow::Error> = None;
for attempt in 0..15 {
match async_nats::connect(nats_url).await {
Ok(c) => return Ok(c),
Err(e) => {
tracing::warn!(attempt, error = %e, "NATS connect failed; retrying");
last_err = Some(e.into());
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}
Err(last_err.unwrap_or_else(|| anyhow::anyhow!("NATS connect failed after retries")))
}

iot/scripts/load-test.sh Executable file

@@ -0,0 +1,252 @@
#!/usr/bin/env bash
# Load-test harness for the IoT operator's fleet_aggregator.
#
# Brings up the minimum stack (k3d + in-cluster NATS + CRD + operator)
# with no VM or real agent, then runs the `iot_load_test` binary
# which simulates N devices pushing DeploymentState to NATS.
#
# All stable paths under $WORK_DIR (default /tmp/iot-load-test) so you
# can point kubectl / tail at them while the test is running.
#
# Quick usage:
# iot/scripts/load-test.sh # 100-device default (55 + 9×5)
# HOLD=1 iot/scripts/load-test.sh # leave stack running for exploration
# DEVICES=10000 GROUP_SIZES=5500,500,500,500,500,500,500,500,500,500 \
# DURATION=90 iot/scripts/load-test.sh
#
# While it's running, in another terminal:
# export KUBECONFIG=/tmp/iot-load-test/kubeconfig
# kubectl get deployments.iot.nationtech.io -A -w
# kubectl get deployments.iot.nationtech.io -A \
# -o custom-columns=NAME:.metadata.name,RUN:.status.aggregate.succeeded,FAIL:.status.aggregate.failed,PEND:.status.aggregate.pending
# tail -f /tmp/iot-load-test/operator.log
#
# Set DEBUG=1 to bump RUST_LOG so the operator logs every status patch.
set -euo pipefail
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
OPERATOR_DIR="$REPO_ROOT/iot/iot-operator-v0"
# ---- config -----------------------------------------------------------------
K3D_BIN="${K3D_BIN:-$HOME/.local/share/harmony/k3d/k3d}"
CLUSTER_NAME="${CLUSTER_NAME:-iot-load}"
NATS_NAMESPACE="${NATS_NAMESPACE:-iot-system}"
NATS_NAME="${NATS_NAME:-iot-nats}"
NATS_NODE_PORT="${NATS_NODE_PORT:-4222}"
NATS_IMAGE="${NATS_IMAGE:-docker.io/library/nats:2.10-alpine}"
DEVICES="${DEVICES:-100}"
GROUP_SIZES="${GROUP_SIZES:-55,5,5,5,5,5,5,5,5,5}"
TICK_MS="${TICK_MS:-1000}"
DURATION="${DURATION:-60}"
NAMESPACE="${NAMESPACE:-iot-load}"
# Keep the stack alive after the test completes so the user can poke
# at CRs + NATS interactively. Ctrl-C to tear everything down.
HOLD="${HOLD:-0}"
# Stable working dir so kubectl + tail targets are predictable.
WORK_DIR="${WORK_DIR:-/tmp/iot-load-test}"
mkdir -p "$WORK_DIR"
KUBECONFIG_FILE="$WORK_DIR/kubeconfig"
OPERATOR_LOG="$WORK_DIR/operator.log"
OPERATOR_PID=""
log() { printf '\033[1;34m[load-test]\033[0m %s\n' "$*"; }
fail() { printf '\033[1;31m[load-test FAIL]\033[0m %s\n' "$*" >&2; exit 1; }
cleanup() {
local rc=$?
log "cleanup…"
if [[ -n "$OPERATOR_PID" ]] && kill -0 "$OPERATOR_PID" 2>/dev/null; then
kill "$OPERATOR_PID" 2>/dev/null || true
wait "$OPERATOR_PID" 2>/dev/null || true
fi
"$K3D_BIN" cluster delete "$CLUSTER_NAME" >/dev/null 2>&1 || true
if [[ $rc -ne 0 && -s "$OPERATOR_LOG" ]]; then
log "operator log at $OPERATOR_LOG (kept for inspection)"
echo "----- operator log tail -----"
tail -n 60 "$OPERATOR_LOG" 2>/dev/null || true
else
# Leave the operator log on success too — cheap, often useful.
log "operator log at $OPERATOR_LOG"
fi
exit $rc
}
trap cleanup EXIT INT TERM
require() { command -v "$1" >/dev/null 2>&1 || fail "missing required tool: $1"; }
require cargo
require kubectl
require podman
require docker
[[ -x "$K3D_BIN" ]] || fail "k3d binary not executable at $K3D_BIN"
# ---- phase 1: k3d cluster ---------------------------------------------------
log "phase 1: create k3d cluster '$CLUSTER_NAME' (host port $NATS_NODE_PORT → loadbalancer)"
"$K3D_BIN" cluster delete "$CLUSTER_NAME" >/dev/null 2>&1 || true
"$K3D_BIN" cluster create "$CLUSTER_NAME" \
--wait --timeout 90s \
-p "${NATS_NODE_PORT}:${NATS_NODE_PORT}@loadbalancer" \
>/dev/null
"$K3D_BIN" kubeconfig get "$CLUSTER_NAME" > "$KUBECONFIG_FILE"
export KUBECONFIG="$KUBECONFIG_FILE"
# ---- phase 2: NATS in-cluster ------------------------------------------------
log "phase 2a: sideload NATS image ($NATS_IMAGE)"
if ! docker image inspect "$NATS_IMAGE" >/dev/null 2>&1; then
if ! podman image inspect "$NATS_IMAGE" >/dev/null 2>&1; then
podman pull "$NATS_IMAGE" >/dev/null || fail "podman pull $NATS_IMAGE failed"
fi
tmptar="$(mktemp -t nats-image.XXXXXX.tar)"
podman save "$NATS_IMAGE" -o "$tmptar" >/dev/null
docker load -i "$tmptar" >/dev/null
rm -f "$tmptar"
fi
"$K3D_BIN" image import "$NATS_IMAGE" -c "$CLUSTER_NAME" >/dev/null
log "phase 2b: install NATS via NatsBasicScore"
(
cd "$REPO_ROOT"
cargo run -q --release -p example_iot_nats_install -- \
--namespace "$NATS_NAMESPACE" \
--name "$NATS_NAME" \
--expose load-balancer
)
kubectl -n "$NATS_NAMESPACE" wait --for=condition=Available \
"deployment/$NATS_NAME" --timeout=120s >/dev/null
log "probing TCP reachability of localhost:$NATS_NODE_PORT"
for _ in $(seq 1 60); do
(echo >"/dev/tcp/127.0.0.1/$NATS_NODE_PORT") 2>/dev/null && break
sleep 1
done
(echo >"/dev/tcp/127.0.0.1/$NATS_NODE_PORT") 2>/dev/null \
|| fail "TCP localhost:$NATS_NODE_PORT never came up"
# ---- phase 3: CRD + operator ------------------------------------------------
log "phase 3: install CRD"
(
cd "$OPERATOR_DIR"
cargo run -q -- install
)
kubectl wait --for=condition=Established \
"crd/deployments.iot.nationtech.io" --timeout=30s >/dev/null
log "phase 4: start operator"
(
cd "$OPERATOR_DIR"
cargo build -q --release
)
# Default log level exposes the CR patch loop + watch attach; DEBUG=1
# bumps it so every status patch + transition is printed.
if [[ "${DEBUG:-0}" == "1" ]]; then
OPERATOR_RUST_LOG="debug,async_nats=warn,hyper=warn,rustls=warn,kube=info"
else
OPERATOR_RUST_LOG="info,kube_runtime=warn"
fi
NATS_URL="nats://localhost:$NATS_NODE_PORT" \
KV_BUCKET="desired-state" \
RUST_LOG="$OPERATOR_RUST_LOG" \
"$REPO_ROOT/target/release/iot-operator-v0" \
>"$OPERATOR_LOG" 2>&1 &
OPERATOR_PID=$!
log "operator pid=$OPERATOR_PID"
for _ in $(seq 1 30); do
if grep -q "starting Deployment controller" "$OPERATOR_LOG"; then break; fi
if ! kill -0 "$OPERATOR_PID" 2>/dev/null; then fail "operator exited early"; fi
sleep 0.5
done
grep -q "starting Deployment controller" "$OPERATOR_LOG" \
|| fail "operator never logged controller startup"
# ---- explore banner (before the load run so the user can start watching) ----
print_banner() {
cat <<EOF
$(printf '\033[1;32m[load-test]\033[0m stack ready. In another terminal:')
$(printf '\033[1mPoint kubectl at the k3d cluster:\033[0m')
export KUBECONFIG=$KUBECONFIG_FILE
$(printf '\033[1mWatch CRs as they update:\033[0m')
kubectl -n $NAMESPACE get deployments.iot.nationtech.io -w
$(printf '\033[1mSnapshot aggregate columns:\033[0m')
kubectl -n $NAMESPACE get deployments.iot.nationtech.io \\
-o custom-columns=NAME:.metadata.name,SUCCEEDED:.status.aggregate.succeeded,FAILED:.status.aggregate.failed,PENDING:.status.aggregate.pending,LAST_ERR:.status.aggregate.lastError.message
$(printf '\033[1mFull CR status JSON for one CR (first group):\033[0m')
kubectl -n $NAMESPACE get deployments.iot.nationtech.io/load-group-00 -o jsonpath='{.status.aggregate}' | jq
$(printf '\033[1mOperator log:\033[0m')
tail -F $OPERATOR_LOG
$(printf '\033[1mPeek at NATS KV directly (natsbox):\033[0m')
alias natsbox='podman run --rm docker.io/natsio/nats-box:latest nats --server nats://host.containers.internal:$NATS_NODE_PORT'
natsbox kv ls device-state
natsbox kv get device-state 'state.load-dev-00001.load-group-00' --raw
natsbox kv ls device-heartbeat
natsbox kv get device-heartbeat 'heartbeat.load-dev-00001' --raw
EOF
}
print_banner
# ---- phase 5: load test ------------------------------------------------------
log "phase 5: run iot_load_test (devices=$DEVICES, tick=${TICK_MS}ms, duration=${DURATION}s)"
(
cd "$REPO_ROOT"
cargo build -q --release -p example_iot_load_test
)
# `--keep` leaves the CRs + KV entries around after the run so you
# can inspect steady-state aggregate numbers after the duration elapses.
LOAD_ARGS=(
--nats-url "nats://localhost:$NATS_NODE_PORT"
--namespace "$NAMESPACE"
--groups "$GROUP_SIZES"
--tick-ms "$TICK_MS"
--duration-s "$DURATION"
)
if [[ "$HOLD" == "1" ]]; then
LOAD_ARGS+=(--keep)
fi
RUST_LOG="info" "$REPO_ROOT/target/release/iot_load_test" "${LOAD_ARGS[@]}"
# ---- phase 6: operator log stats --------------------------------------------
log "phase 6: operator log summary"
# grep -c prints "0" but still exits non-zero on no matches, so fall
# back on the assignment's exit status rather than appending a second
# "0" into the variable.
patches="$(grep -c "aggregator: status patched" "$OPERATOR_LOG" 2>/dev/null)" || patches=0
warnings="$(grep -c " WARN " "$OPERATOR_LOG" 2>/dev/null)" || warnings=0
errors="$(grep -c " ERROR " "$OPERATOR_LOG" 2>/dev/null)" || errors=0
log " CR status patches logged (DEBUG-level; use DEBUG=1 to surface): $patches"
log " operator warnings: $warnings errors: $errors"
if [[ "$errors" -gt 0 ]]; then
echo "----- operator error lines -----"
grep " ERROR " "$OPERATOR_LOG" | tail -20
fi
# ---- hold open (optional) ---------------------------------------------------
if [[ "$HOLD" == "1" ]]; then
print_banner
log "HOLD=1 — stack is still running. Ctrl-C to tear down."
# Block until user interrupts; cleanup trap does the teardown.
while true; do sleep 60; done
fi
log "PASS"


@@ -136,34 +136,34 @@ case "$ARCH" in
aarch64|arm64) STATUS_TIMEOUT=300 ;;
*) STATUS_TIMEOUT=60 ;;
esac
log "phase 4: wait for agent to report status to NATS (timeout=${STATUS_TIMEOUT}s)"
log "phase 4: wait for agent to report heartbeat to NATS (timeout=${STATUS_TIMEOUT}s)"
wait_for_status() {
local timeout=$1
for _ in $(seq 1 "$timeout"); do
if podman run --rm --network "$NATS_NET_NAME" \
docker.io/natsio/nats-box:latest \
nats --server "nats://$NATS_CONTAINER:4222" kv get agent-status \
"status.$DEVICE_ID" --raw >/dev/null 2>&1; then
nats --server "nats://$NATS_CONTAINER:4222" kv get device-heartbeat \
"heartbeat.$DEVICE_ID" --raw >/dev/null 2>&1; then
return 0
fi
sleep 1
done
return 1
}
wait_for_status "$STATUS_TIMEOUT" || fail "agent-status never appeared for $DEVICE_ID"
log "agent status present on NATS"
wait_for_status "$STATUS_TIMEOUT" || fail "device-heartbeat never appeared for $DEVICE_ID"
log "agent heartbeat present on NATS"
# ---------------------------- phase 5: hard power-cycle, expect recovery ----------------------------
log "phase 5: power-cycle VM (virsh destroy + start) → agent must reconnect to NATS"
nats_status_timestamp() {
# Prints the "timestamp" field of the status.<device> entry, or "".
# Prints the "at" field of the heartbeat.<device> entry, or "".
# Never errors (for `set -e` safety).
podman run --rm --network "$NATS_NET_NAME" \
docker.io/natsio/nats-box:latest \
nats --server "nats://$NATS_CONTAINER:4222" kv get agent-status \
"status.$DEVICE_ID" --raw 2>/dev/null \
| grep -oE '"timestamp":"[^"]+"' \
nats --server "nats://$NATS_CONTAINER:4222" kv get device-heartbeat \
"heartbeat.$DEVICE_ID" --raw 2>/dev/null \
| grep -oE '"at":"[^"]+"' \
| head -1 | cut -d'"' -f4 || true
}


@@ -349,17 +349,17 @@ done
NATSBOX_HOST="podman run --rm docker.io/natsio/nats-box:latest \
nats --server nats://host.containers.internal:$NATS_NODE_PORT"
log "checking agent heartbeat in NATS KV (agent-status bucket)"
log "checking agent heartbeat in NATS KV (device-heartbeat bucket)"
for _ in $(seq 1 30); do
if $NATSBOX_HOST kv get agent-status "status.$DEVICE_ID" --raw \
if $NATSBOX_HOST kv get device-heartbeat "heartbeat.$DEVICE_ID" --raw \
>/dev/null 2>&1; then
break
fi
sleep 2
done
$NATSBOX_HOST kv get agent-status "status.$DEVICE_ID" --raw >/dev/null \
|| fail "agent never published status to NATS"
log "agent heartbeat present: status.$DEVICE_ID"
$NATSBOX_HOST kv get device-heartbeat "heartbeat.$DEVICE_ID" --raw >/dev/null \
|| fail "agent never published heartbeat to NATS"
log "agent heartbeat present: heartbeat.$DEVICE_ID"
# ---- phase 7: either hand off to user, or drive regression ------------------
@@ -508,8 +508,9 @@ $(printf '\033[1mInspect NATS KV (natsbox):\033[0m\n')
alias natsbox='podman run --rm docker.io/natsio/nats-box:latest nats --server nats://host.containers.internal:$NATS_NODE_PORT'
natsbox kv ls desired-state
natsbox kv get desired-state '$DEVICE_ID.$DEPLOY_NAME' --raw
natsbox kv ls agent-status
natsbox kv get agent-status 'status.$DEVICE_ID' --raw
natsbox kv ls device-state
natsbox kv ls device-heartbeat
natsbox kv get device-heartbeat 'heartbeat.$DEVICE_ID' --raw
$(printf '\033[1mHit the deployed nginx:\033[0m\n')
curl http://$VM_IP:${DEPLOY_PORT%%:*}/