Chapter 4 — Aggregation architecture at IoT scale
Status: design draft (2026-04-22)
Design document for the Chapter 4 aggregation rework. Review first, implement after. Supersedes the Chapter 2 aggregator's O(deployments × devices) per-tick recompute, which works for a 10-device smoke but breaks the moment a real fleet lands.
1. Why now
We have no real deployment in the field yet. That's a liability when shipping (no user, no revenue) but a gift when designing: we can move the data model before customers depend on it. After a partner fleet lands, changing the aggregation substrate is a multi-quarter migration. Doing it now is days of work.
Chapter 2's aggregator was the right "make it work" design for a walking-skeleton proof. It's the wrong "make it scale" design for a partner deployment of even a few hundred devices, let alone the fleet sizes the product thesis targets. This chapter replaces it.
2. What's wrong today
Per-tick cost, current design. Every 5 seconds, for each Deployment CR, resolve the selector against the full device snapshot and fold into an aggregate:
```
O(deployments × devices) per tick
+ 1 kube patch per CR per tick
```
At 10k deployments × 1M devices, that's 10^10 selector evaluations and 10k apiserver patches every 5 s. Nothing about that is viable.
What else goes wrong at scale.
- The operator holds the full fleet snapshot in memory. 1M `AgentStatus` payloads × a few kB each = GBs of heap, dominated by `recent_events` rings.
- Agent heartbeats publish the whole `AgentStatus` every 30 s — a lot of bytes on the wire whose only incremental content is usually a timestamp update.
- `agent-status` is a KV bucket. KV is designed for "latest value per key," not "stream of state changes." We've been using it for both roles and paying the worst of each.
- Logs are nowhere yet (good — this is the moment to put them in the right place before we're committed).
3. Design overview
Shift to a CQRS-style architecture where devices write their authoritative state, and the operator maintains incrementally-updated aggregates driven by state-change events.
```
device (N× agents)                             operator
──────────────────                             ────────
current state keys  ───reads─────────────────▶ on cold-start:
(authoritative)                                  walk keys → rebuild counters,
                                                 then attach stream consumer

state-change events ═══JS stream═════════════▶ ± counters per event
(delta stream)                                 ± update reverse index

device_info keys    ───reads─────────────────▶ on tick (1 Hz):
(labels, inventory)                              patch .status for dirty deployments

logs                ───at-least-once NATS────▶ not stored centrally
                       subject                 (streamed on query)
```
Three substrates, each chosen for its fit:
- JetStream KV, per-device keys — device-authoritative state. Cheap to read when needed, never scanned globally at scale.
- JetStream stream, per-device events — ordered delta feed. Operator consumers replay on restart, consume incrementally during steady state.
- Plain NATS subjects, logs — at-least-once pub/sub, device-side buffering (~10k lines), streamed on query.
4. Data model
4.1 NATS KV buckets
device-info — static-ish facts per device, infrequent updates.
| Key | Value | Written by | Read by |
|---|---|---|---|
| `info.<device_id>` | `DeviceInfo` (labels, inventory, agent_version) | agent on startup + label change | operator (selector resolution, inventory display) |
device-state — current phase per deployment per device.
Authoritative source of truth for "what's running where."
| Key | Value | Written by | Read by |
|---|---|---|---|
| `state.<device_id>.<deployment_name>` | `DeploymentState` (phase, last_event_at, last_error) | agent on reconcile transition | operator on cold-start only |
One key per (device, deployment) pair. JetStream KV per-key history limits cap the revisions retained per key (see §12: `max_history_per_key = 1`), which bounds the bucket's storage.
device-heartbeat — liveness only. Tiny payload, frequent
updates.
| Key | Value | Written by | Read by |
|---|---|---|---|
| `heartbeat.<device_id>` | `{ timestamp }` (32 bytes) | agent every 30s | operator (stale detection) |
Separate from device-state so routine heartbeats don't churn the
state keys or emit spurious state-change events.
4.2 NATS JetStream stream
device-events — ordered delta feed for operator aggregation.
- Subject: `events.state.<device_id>.<deployment_name>`
- Payload: `StateChangeEvent { from: Phase, to: Phase, at, last_error }`
- Retention: time-based (e.g. 24h) — consumers that fall further behind than retention rebuild from `device-state` KV on recovery.
- Agents emit one event per phase transition, not per heartbeat.
Separate stream for event log (user-facing reconcile log events):
- Subject: `events.log.<device_id>`
- Payload: `LogEvent { at, severity, message, deployment? }`
- Retention: time-based (1h, enough for "show me what happened the last few minutes" queries; the device's in-memory ring holds the rest).
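For concreteness, a provisioning sketch using the `async_nats` crate. This is a sketch, not the implementation: connection setup, the other two KV buckets, and the `events.log.>` stream are elided, and the history/retention values are the guesses from this section and §12.

```rust
use std::time::Duration;
use async_nats::jetstream::{self, kv, stream};

async fn provision(
    js: &jetstream::Context,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // device-state: keep only the latest value per (device, deployment)
    // key; transition history belongs to the events stream, not KV (§12).
    js.create_key_value(kv::Config {
        bucket: "device-state".into(),
        history: 1,
        ..Default::default()
    })
    .await?;

    // device-events: ordered delta feed with time-based retention.
    js.create_stream(stream::Config {
        name: "device-events".into(),
        subjects: vec!["events.state.>".into()],
        max_age: Duration::from_secs(24 * 60 * 60), // 24h guess (§12)
        ..Default::default()
    })
    .await?;
    Ok(())
}
```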
4.3 Log transport (NOT JetStream)
- Subject: `logs.<device_id>` — plain pub/sub, at-least-once
- Not persisted by NATS
- Device buffers last ~10k lines in a ring buffer
- Query protocol: request-reply on `logs.<device_id>.query` — the device responds with its buffer contents, then streams live tail until the query closes
This is a dedicated transport because structured logs at fleet scale (1M devices × 1k lines/h = 1B messages/h) would crush JetStream's per-subject storage without adding operator-visible value. Operators only look at logs on-demand, per-device; device-side buffering matches the access pattern.
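A device-side sketch of the first half of the query protocol (reply with the buffered dump; the live-tail continuation is elided). Plain core-NATS request-reply via `async_nats`; function and variable names are illustrative, not the real API surface.

```rust
use std::collections::VecDeque;
use futures::StreamExt;

/// Answer queries on logs.<device_id>.query with the buffered lines.
/// (Streaming the live tail after the dump is elided from this sketch.)
async fn serve_log_queries(
    client: async_nats::Client,
    device_id: &str,
    ring: &VecDeque<String>, // last ~10k lines, maintained by the log hook
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    let mut queries = client
        .subscribe(format!("logs.{device_id}.query"))
        .await?;
    while let Some(msg) = queries.next().await {
        if let Some(reply) = msg.reply {
            let dump = ring.iter().cloned().collect::<Vec<_>>().join("\n");
            client.publish(reply, dump.into()).await?;
        }
    }
    Ok(())
}
```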
4.4 CRD fields
Minimal change from Chapter 2:
- `.status.aggregate.succeeded | failed | pending` — now sourced from counters, not a per-tick fold.
- `.status.aggregate.last_error` — updated on `to: Failed` events.
- `.status.aggregate.last_heartbeat_at` — from the per-deployment latest event.
- `.status.aggregate.recent_events` — bounded per-deployment ring, updated on event arrival.
- Drop `.status.aggregate.unreported` (no meaningful definition under selector-based targeting — already removed in the pre-chapter cleanup).
- Add `.status.aggregate.stale: u32` — count of devices matching the selector whose last heartbeat is older than a threshold (default 5 min). This is the replacement for "unreported" that makes sense at scale. Computed on tick from the operator's reverse-indexed view, not a per-device query. The resulting subtree is sketched below.
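As a sketch, the subtree as serde types; field names follow the list above, while the timestamp and event-entry representations are assumptions:

```rust
use serde::{Deserialize, Serialize};

/// Shape of .status.aggregate after this chapter (sketch).
#[derive(Serialize, Deserialize, Default)]
pub struct AggregateStatus {
    pub succeeded: u32,
    pub failed: u32,
    pub pending: u32,
    /// Replaces the dropped `unreported`: matching devices whose last
    /// heartbeat is older than the staleness threshold.
    pub stale: u32,
    pub last_error: Option<String>,
    pub last_heartbeat_at: Option<String>, // RFC 3339 (an assumption)
    pub recent_events: Vec<String>,        // bounded ring (N ≈ 10)
}
```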
4.5 Operator in-memory state
- Counters — `HashMap<DeploymentKey, PhaseCounters>`, one entry per CR, updated atomically on event arrival.
- Reverse index — `HashMap<DeviceId, HashSet<DeploymentKey>>`, updated when a device's labels change or when a CR's selector changes. Lets a state-change event find affected deployments in O(deployments-matching-this-device) rather than O(all-deployments).
- Last-error rollup — per deployment, the most recent error keyed by timestamp.
- Recent-events ring — per deployment, bounded by N (e.g. 10).
- Dirty set — deployments whose aggregate has changed since the last patch. The tick reads + clears this set; only dirty deployments get patched.
Operator heap is bounded by fleet + deployment count, not their product.
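The same state as one Rust struct, a sketch with hypothetical names (`DeploymentKey` as a namespace/name string is an assumption):

```rust
use std::collections::{HashMap, HashSet, VecDeque};

type DeviceId = String;
type DeploymentKey = String; // e.g. "<namespace>/<name>"

#[derive(Default)]
struct PhaseCounters {
    succeeded: u32,
    failed: u32,
    pending: u32,
    stale: u32,
}

#[derive(Default)]
struct AggregatorState {
    /// One entry per Deployment CR, updated atomically on event arrival.
    counters: HashMap<DeploymentKey, PhaseCounters>,
    /// Device → deployments it matches; O(matches) event fan-out.
    reverse_index: HashMap<DeviceId, HashSet<DeploymentKey>>,
    /// Most recent error per deployment, keyed by timestamp (epoch s).
    last_error: HashMap<DeploymentKey, (i64, String)>,
    /// Bounded per-deployment ring of recent events (N ≈ 10).
    recent_events: HashMap<DeploymentKey, VecDeque<String>>,
    /// Aggregates changed since the last patch; drained by the tick.
    dirty: HashSet<DeploymentKey>,
}
```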
5. Counter invariants (the contract)
Correctness rests on two rules:
5.1 Device publishes exactly one transition per reconcile outcome
Every reconcile results in a state. If the state differs from the
last published state for (device, deployment), the agent:
1. Writes the new state to the `state.<device>.<deployment>` KV key (CAS against the expected revision for multi-writer safety — only one agent process per device, so contention is theoretical).
2. Publishes a `StateChangeEvent` to `events.state.<device>.<deployment>`.
These two writes must be atomic from the agent's perspective — if
(1) succeeds and (2) fails (or vice versa), the agent retries until
both reach NATS. Worst case: a duplicate event on the stream;
the counter path absorbs duplicates via the from → to structure (see §5.2).
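An agent-side sketch of the two writes with `async_nats` (serialization, backoff, and the retry wrapper are elided; key and subject shapes follow §4):

```rust
use async_nats::jetstream;
use bytes::Bytes;

/// One transition, two writes: CAS the authoritative state key, then
/// publish the delta event. A retry wrapper around this function gives
/// at-least-once semantics; duplicates are absorbed by the §5.2 guard.
async fn publish_transition(
    js: &jetstream::Context,
    state_kv: &jetstream::kv::Store,
    device: &str,
    deployment: &str,
    expected_revision: u64, // last revision this agent wrote
    state_bytes: Bytes,     // serialized DeploymentState
    event_bytes: Bytes,     // serialized StateChangeEvent
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // (1) Compare-and-swap against the expected revision.
    state_kv
        .update(
            format!("state.{device}.{deployment}"),
            state_bytes,
            expected_revision,
        )
        .await?;
    // (2) Publish the event; the second .await waits for the JetStream ack.
    js.publish(format!("events.state.{device}.{deployment}"), event_bytes)
        .await?
        .await?;
    Ok(())
}
```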
5.2 Counters are driven by transitions, not snapshots
Each event carries `from: Phase, to: Phase`. The counter update is a single atomic action:

```
counters[(deployment, from)] -= 1;
counters[(deployment, to)] += 1;
```

Before applying an event, the operator cross-checks its recorded view of the device's current phase for that (device, deployment). A genuinely new event has `from` equal to the recorded phase and is applied; a replayed duplicate does not (the recorded phase has already advanced to the event's `to`), so it is detected and ignored. Either way the counters converge.
5.3 The bootstrap transition
A device's first-ever event for a deployment has `from: None` (or a sentinel `Unassigned` variant); the counter update is just the `to` increment.
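Putting §5.2 and §5.3 together, a minimal duplicate-safe apply sketch with hypothetical types; a real consumer would also track the per-(device, deployment) last-applied stream sequence for the out-of-order case in §8:

```rust
use std::collections::HashMap;

#[derive(Clone, Copy, PartialEq, Eq, Hash)]
enum Phase { Pending, Succeeded, Failed }

struct StateChangeEvent {
    device: String,
    deployment: String,
    from: Option<Phase>, // None = bootstrap transition (§5.3)
    to: Phase,
}

#[derive(Default)]
struct Counters {
    /// Recorded current phase per (device, deployment): the duplicate guard.
    current: HashMap<(String, String), Phase>,
    /// Phase tallies per deployment.
    by_phase: HashMap<(String, Phase), i64>,
}

impl Counters {
    fn apply(&mut self, ev: &StateChangeEvent) {
        let key = (ev.device.clone(), ev.deployment.clone());
        // Apply only if the event's `from` matches the recorded phase;
        // a replayed duplicate fails this check (we already advanced to
        // its `to`) and is a no-op, so the counters converge.
        if self.current.get(&key).copied() != ev.from {
            return;
        }
        if let Some(from) = ev.from {
            *self.by_phase.entry((ev.deployment.clone(), from)).or_insert(0) -= 1;
        }
        *self.by_phase.entry((ev.deployment.clone(), ev.to)).or_insert(0) += 1;
        self.current.insert(key, ev.to);
    }
}
```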
5.4 Device leaves fleet
When a device's heartbeat goes stale past threshold + grace, OR when its labels no longer match the deployment's selector:
- Counters are decremented for every deployment the device was previously contributing to (via the reverse index).
- The device's state keys aren't touched — they're the authoritative record; a device re-joining resumes from them.
5.5 CR created / selector changed
The reverse index + counters are rebuilt for the affected CR by
walking device-info + device-state once (O(devices + states)
local NATS KV reads). Cheap for a single CR; happens at CR-apply
time, not on every tick.
6. Cold-start protocol
On operator process start:
1. Load CRs — list `Deployment` CRs via the kube API. Build the reverse index skeleton (deployment → selector).
2. Load device labels — iterate `device-info` KV keys once. Resolve each device against every CR's selector and populate the reverse index device-side entries. O(devices × CRs), one-time, in-memory. For 1M devices × 10k CRs that's 10^10 selector evaluations, but purely local, in-memory checks (BTreeMap matches on label maps); at tens of nanoseconds per check, the back-of-envelope is a minute or two of CPU.
3. Rebuild counters — iterate `device-state` KV keys once. For each `state.<device>.<deployment>`, look up the matching deployments from the reverse index and increment counters.
4. Attach stream consumer — durable consumer on `events.state.>`, starting from the newest sequence at the cold-start moment. The KV walk was the "past"; the stream is the "future."
5. Begin tick loop — patch dirty CRs on a 1 Hz schedule.
Cold-start time dominated by step 2, not step 3. An ArgoCD-style "pause all reconciles during leader election / startup" envelope keeps the CR patches from competing with the cold-start scans.
What if the operator falls behind the stream's retention window?
Reset to step 3 (re-walk device-state). The KV is authoritative;
the stream is an accelerator.
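A sketch of steps 2 and 3 over already-materialized walks (real code would stream keys from the KV buckets; representing the phase as a string keeps the sketch self-contained):

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

type Labels = BTreeMap<String, String>;

/// A selector matches when every selector label is present on the device.
fn matches(selector: &Labels, device: &Labels) -> bool {
    selector.iter().all(|(k, v)| device.get(k) == Some(v))
}

/// Cold-start steps 2 and 3: build the reverse index, then fold the
/// state keys into counters.
fn rebuild(
    selectors: &HashMap<String, Labels>,       // deployment → selector (step 1)
    device_info: &[(String, Labels)],          // (device, labels), info.* walk
    device_state: &[(String, String, String)], // (device, deployment, phase), state.* walk
) -> (
    HashMap<String, HashSet<String>>, // reverse index: device → deployments
    HashMap<(String, String), i64>,   // counters: (deployment, phase) → count
) {
    let mut index: HashMap<String, HashSet<String>> = HashMap::new();
    for (device, labels) in device_info {
        for (deployment, selector) in selectors {
            if matches(selector, labels) {
                index.entry(device.clone()).or_default().insert(deployment.clone());
            }
        }
    }
    let mut counters: HashMap<(String, String), i64> = HashMap::new();
    for (device, deployment, phase) in device_state {
        // Only count state keys whose device still matches the selector.
        if index.get(device).map_or(false, |d| d.contains(deployment)) {
            *counters.entry((deployment.clone(), phase.clone())).or_insert(0) += 1;
        }
    }
    (index, counters)
}
```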
7. CR status patch cadence
- Counter updates happen in memory, instantly.
- The dirty set captures which deployments' aggregates changed since the last patch.
- A 1 Hz ticker reads + clears the dirty set, patches those CRs.
- Individual CR patches are debounced to at most once per second — avoids hammering the apiserver when a deployment is mid-rollout and devices are transitioning in a burst.
Steady-state operator → apiserver traffic is proportional to the rate of interesting changes, not to fleet size.
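A sketch of the drain loop, assuming a tokio runtime and a dirty set shared with the event consumer; `patch_status` stands in for the kube-rs status patch:

```rust
use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::Mutex;

/// 1 Hz drain: only deployments whose aggregate changed get patched,
/// and each CR is patched at most once per tick by construction.
async fn tick_loop(dirty: Arc<Mutex<HashSet<String>>>) {
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tick.tick().await;
        // Take the batch out under the lock; patch outside it.
        let batch: Vec<String> = dirty.lock().await.drain().collect();
        for deployment in batch {
            patch_status(&deployment).await;
        }
    }
}

async fn patch_status(_deployment: &str) {
    // kube-rs Api::patch_status(...) elided from this sketch.
}
```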
8. Failure modes
| Scenario | Detection | Recovery |
|---|---|---|
| Operator crash | k8s restarts the pod | Cold-start protocol §6 |
| Stream consumer falls behind retention | Stream API returns out-of-range | Re-run §6 step 3 (re-walk KV) |
| Agent publishes event but KV write fails | Agent-side local retry; event is replayed | Counter is idempotent per §5.2 |
| Agent writes KV but event publish fails | Agent-side local retry | Operator never sees the transition until retry succeeds; stale threshold catches the device if agent is permanently broken |
| Device's label change lost | Not detected inline (heartbeats are timestamp-only) | Periodic sync (e.g. 1/h) re-scans device-info to catch drift |
| Duplicate event (retry) | Event's from no longer matches the recorded current phase | No-op (§5.2) |
| Out-of-order event (retry ordering) | Sequence number on event | Consumer tracks per-(device, deployment) last-applied sequence; old events ignored |
9. Scale back-of-envelope
Target: 1M devices, 10k deployments, p50 reconcile rate 1 event per device per hour.
- Event volume. 1M × (1/3600s) = 278 events/s.
- Operator event-processing cost. Each event touches a bounded number of in-memory counters (via the reverse index). At ~1 µs per event, 278 events/s is well under a millisecond of CPU per second; network cost is ~0 (the JetStream consumer is local to the operator).
- Operator → apiserver patches. Deployments change at a rate far below event rate; debounced dirty-set drains limit patches to a few per second even during bursty rollouts.
- Operator memory. Reverse index entries (device_id + set of deployment keys) ≈ 200 bytes × 1M = 200 MB. Counters ≈ 10k × few fields = negligible. Last-error + recent-events rings ≈ 10k × 10 entries × 512 bytes = 50 MB. Total ~250 MB — fine.
- Cold-start time. 1M KV reads × amortized 0.1 ms (JetStream KV is fast for key iteration) = 100 s. Acceptable for a several-minute-once-per-release recovery window. If it becomes a problem, chunk the walk and resume-from-checkpoint.
- Stale device sweep. On each tick, O(dirty set × reverse index lookups). Stale detection itself is O(devices-whose-heartbeat-is-old); a second, slower ticker (e.g. 30 s) scans the heartbeat KV for entries older than threshold and emits synthetic "device went stale" events that drive the same counter-decrement path.
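A sketch of that second ticker, assuming the heartbeat walk is materialized by a closure and reducing the synthetic-event plumbing to a callback:

```rust
use std::collections::HashMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

/// Slow 30 s ticker: scan heartbeat timestamps, emit a synthetic
/// "device went stale" signal into the same counter-decrement path.
async fn stale_sweep(
    heartbeats: impl Fn() -> HashMap<String, i64>, // device → last beat (epoch s)
    mut emit_stale: impl FnMut(&str),              // feeds the §5 counter path
    threshold: Duration,
) {
    let mut tick = tokio::time::interval(Duration::from_secs(30));
    loop {
        tick.tick().await;
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs() as i64;
        for (device, last) in heartbeats() {
            if now - last > threshold.as_secs() as i64 {
                emit_stale(&device);
            }
        }
    }
}
```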
10. Schema migration
Deployment CRD is still v1alpha1, not deployed anywhere, so no
migration machinery is needed for the CRD itself — we just change
the aggregate subtree definition.
`harmony-reconciler-contracts::AgentStatus` is deprecated by this chapter. Replaced by narrower wire types:

- `DeviceInfo` — what `info.<device_id>` stores
- `DeploymentState` — what `state.<device>.<dep>` stores
- `HeartbeatPayload` — what `heartbeat.<device_id>` stores
- `StateChangeEvent` — what the events stream emits
- `LogEvent` — what the event-log stream emits
The old AgentStatus type goes away when the old aggregator
goes away. Clean break, same CRD version.
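For reference, the five shapes as a serde sketch; field sets follow §4, and the epoch-second timestamps are an assumption:

```rust
use std::collections::BTreeMap;
use serde::{Deserialize, Serialize};

#[derive(Serialize, Deserialize, Clone, Copy)]
pub enum Phase { Pending, Succeeded, Failed }

/// info.<device_id>: written on startup and on label change.
#[derive(Serialize, Deserialize)]
pub struct DeviceInfo {
    pub labels: BTreeMap<String, String>,
    pub inventory: BTreeMap<String, String>,
    pub agent_version: String,
}

/// state.<device>.<dep>: authoritative per-pair state.
#[derive(Serialize, Deserialize)]
pub struct DeploymentState {
    pub phase: Phase,
    pub last_event_at: i64,
    pub last_error: Option<String>,
}

/// heartbeat.<device_id>: liveness only.
#[derive(Serialize, Deserialize)]
pub struct HeartbeatPayload {
    pub timestamp: i64,
}

/// events.state.<device>.<dep>: one per phase transition.
#[derive(Serialize, Deserialize)]
pub struct StateChangeEvent {
    pub from: Option<Phase>, // None = bootstrap (§5.3)
    pub to: Phase,
    pub at: i64,
    pub last_error: Option<String>,
}

/// events.log.<device_id>: user-facing reconcile log events.
#[derive(Serialize, Deserialize)]
pub struct LogEvent {
    pub at: i64,
    pub severity: String,
    pub message: String,
    pub deployment: Option<String>,
}
```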
11. Implementation milestones
Landing order, each a reviewable increment:
- M1: new contracts crate shapes — `DeviceInfo`, `DeploymentState`, `HeartbeatPayload`, `StateChangeEvent`, `LogEvent`. Round-trip serde tests. No runtime code changes yet.
- M2: agent-side rewrite — agent writes the new KV shapes + publishes state-change events + heartbeats. Old `AgentStatus` publish path stays in parallel for the smoke to keep passing.
- M3: operator-side cold-start protocol — new operator task that walks the new KV buckets and builds in-memory counters. Runs alongside the old aggregator; logs counter parity checks against the legacy aggregator's output so we can verify correctness before switching over.
- M4: operator-side event consumer — attach the durable stream consumer, drive counters incrementally. Parity checks still on.
- M5: flip CR patch source — the new counter-backed aggregator patches `.status.aggregate`, the legacy one goes read-only, then deleted in the next commit.
- M6: logs subject + query protocol — device-side ring buffer, query API, a first CLI surface (`natiq logs device=X` or equivalent) that drives it.
- M7: synthetic-scale test harness — spin up 1k (then 10k) mock agents in-process, drive a realistic event load through the operator, measure + publish numbers.
- M8: delete legacy `AgentStatus` — `harmony-reconciler-contracts` cleanup, smoke-a4 updates.
M1-M5 can land on one branch; M6 is adjacent work; M7-M8 close out.
12. Open questions
- Multi-operator HA. The design assumes one operator at a time. Adding HA means either (a) one active + one standby operator with NATS-based leader election, or (b) shared counter state in KV instead of in-memory. (a) is simpler; (b) scales better. Defer until a specific availability target demands it.
- Counter-KV snapshots. Should we periodically snapshot the in-memory counter state to a `counters` KV bucket so cold-start can resume from a recent snapshot + a short stream tail, instead of always re-walking `device-state`? Probably yes once cold-start time becomes an operational concern, but not in the initial cut.
- Stream retention tuning. 24h for `events.state.>` is a guess. The real number depends on observed operator downtime p99. Initial setting, tune from operational data.
- Compaction policy for `device-state` KV. JetStream KV per-key history can grow unbounded if phases churn. Set `max_history_per_key = 1` (keep only the latest value) unless there's a reason to keep transition history (there isn't — that's what the events stream is for).
- Agent crash before publishing a state-change event. The transition is durably captured in the agent's local podman state; on agent restart the reconcile loop re-observes the phase and either re-publishes (if it differs from `state.<device>.<dep>`) or stays silent. Correctness preserved at the cost of event-stream ordering ambiguity during the crash window — acceptable.
13. What this chapter deliberately does not change
- CRD `.spec.target_selector` semantics — stays exactly as shipped.
- Operator's kube-rs controller loop for CR reconcile — stays as is.
- Helm chart structure (Chapter 3) — orthogonal.
- Authentication (Chapter Auth) — orthogonal. When that chapter lands, every subject + KV bucket above will be re-scoped under device-specific NATS credentials; the topology above doesn't need to change for that to slot in.