harmony/ROADMAP/iot_platform/chapter_4_aggregation_scale.md
Jean-Gabriel Gill-Couture 0decb1ab61 docs(iot): chapter 4 — aggregation architecture at IoT scale (design draft)
Design doc for the aggregation rework. Chapter 2's aggregator
(O(deployments × devices) per tick) works for a 10-device smoke but
doesn't scale past a partner fleet of even modest size. Replaces it
with CQRS-style incrementally-maintained counters driven by
JetStream state-change events, device-authoritative per-device
state keys, and a separate log transport that doesn't touch
JetStream.

Review first, implement after. No runtime code changes in this
commit.

Covers data model (KV buckets, streams, subjects), counter
invariants (transition-based, duplicate-safe), cold-start protocol
(walk once, then consume), CR patch cadence (debounced dirty set),
failure modes, scale back-of-envelope for 1M devices + 10k
deployments, schema migration path (clean break, same CRD
v1alpha1), and eight-milestone landing plan.
2026-04-22 12:40:06 -04:00


Chapter 4 — Aggregation architecture at IoT scale

Status: design draft (2026-04-22)

Design document for the Chapter 4 aggregation rework. Review first, implement after. Supersedes the Chapter 2 aggregator's O(deployments × devices) per-tick recompute, which works for a 10-device smoke but breaks the moment a real fleet lands.

1. Why now

We have no real deployment in the field yet. That's a liability when shipping (no user, no revenue) but a gift when designing: we can move the data model before customers depend on it. After a partner fleet lands, changing the aggregation substrate is a multi-quarter migration. Doing it now is days of work.

Chapter 2's aggregator was the right "make it work" design for a walking-skeleton proof. It's the wrong "make it scale" design for a partner deployment of even a few hundred devices, let alone the fleet sizes the product thesis targets. This chapter replaces it.

2. What's wrong today

Per-tick cost, current design. Every 5 seconds, for each Deployment CR, resolve the selector against the full device snapshot and fold into an aggregate:

O(deployments × devices) per tick
+ 1 kube patch per CR per tick

At 10k deployments × 1M devices, that's 10^10 selector evaluations and 10k apiserver patches every 5 s. None of that is viable.

What else goes wrong at scale.

  • The operator holds the full fleet snapshot in memory. 1M AgentStatus payloads × a few kB each = GB of heap, dominated by recent_events rings.
  • Agent heartbeats publish the whole AgentStatus every 30 s — a lot of bytes on the wire whose only incremental content is usually a timestamp update.
  • agent-status is a KV bucket. KV is designed for "latest value per key," not "stream of state changes." We've been using it for both roles and paying the worst of each.
  • Logs are nowhere yet (good — this is the moment to put them in the right place before we're committed).

3. Design overview

Shift to a CQRS-style architecture where devices write their authoritative state, and the operator maintains incrementally-updated aggregates driven by state-change events.

  device (N× agents)                 operator
  ──────────────────                 ────────
  current state keys    ───reads─▶   on cold-start:
    (authoritative)                    walk keys → rebuild counters
                                     then: stream consumer
  state-change events   ═ JS stream═▶   ± counters per event
    (delta stream)                     ± update reverse index
                                     on tick (1 Hz):
  device_info keys      ───reads─▶     patch .status for dirty deployments
    (labels, inventory)

  logs      ───at-most-once NATS subj─────▶   not stored centrally
                                              (streamed on query)

Three substrates, each chosen for its fit:

  • JetStream KV, per-device keys — device-authoritative state. Cheap to read when needed, never scanned globally at scale.
  • JetStream stream, per-device events — ordered delta feed. Operator consumers replay on restart, consume incrementally during steady state.
  • Plain NATS subjects for logs: at-most-once pub/sub (core NATS does not persist or redeliver), device-side buffering (~10k lines), streamed on query.

4. Data model

4.1 NATS KV buckets

device-info — static-ish facts per device, infrequent updates.

| Key | Value | Written by | Read by |
| --- | --- | --- | --- |
| info.<device_id> | DeviceInfo (labels, inventory, agent_version) | agent on startup + label change | operator (selector resolution, inventory display) |

device-state — current phase per deployment per device. Authoritative source of truth for "what's running where."

| Key | Value | Written by | Read by |
| --- | --- | --- | --- |
| state.<device_id>.<deployment_name> | DeploymentState (phase, last_event_at, last_error) | agent on reconcile transition | operator on cold-start only |

One key per (device, deployment) pair. JetStream KV's per-key history limit and bucket TTL let us cap the storage behind this keyspace.

device-heartbeat — liveness only. Tiny payload, frequent updates.

| Key | Value | Written by | Read by |
| --- | --- | --- | --- |
| heartbeat.<device_id> | { timestamp } (32 bytes) | agent every 30s | operator (stale detection) |

Separate from device-state so routine heartbeats don't churn the state keys or emit spurious state-change events.
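
The key layout for the three buckets can be sketched as a handful of string helpers. This is only an illustration: the function names are hypothetical, and the parser assumes neither the device id nor the deployment name contains a `.`, since NATS treats `.` as the token separator.

```rust
/// Hypothetical key builders for the three KV buckets described above.
/// Only the key shapes come from the design; the function names are illustrative.
pub fn info_key(device_id: &str) -> String {
    format!("info.{}", device_id)
}

pub fn state_key(device_id: &str, deployment: &str) -> String {
    format!("state.{}.{}", device_id, deployment)
}

pub fn heartbeat_key(device_id: &str) -> String {
    format!("heartbeat.{}", device_id)
}

/// Splits `state.<device_id>.<deployment_name>` back into (device, deployment).
/// Assumption: neither token contains '.', so the key has exactly three tokens.
pub fn parse_state_key(key: &str) -> Option<(&str, &str)> {
    let mut parts = key.split('.');
    match (parts.next(), parts.next(), parts.next(), parts.next()) {
        (Some("state"), Some(dev), Some(dep), None) if !dev.is_empty() && !dep.is_empty() => {
            Some((dev, dep))
        }
        _ => None,
    }
}
```

If deployment names may contain dots, the parser would need an escaping rule or a different separator; that choice is outside what this chapter specifies.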

4.2 NATS JetStream stream

device-events — ordered delta feed for operator aggregation.

  • Subject: events.state.<device_id>.<deployment_name>
  • Payload: StateChangeEvent { from: Phase, to: Phase, at, last_error }
  • Retention: time-based (e.g. 24h) — consumers that fall further behind than retention rebuild from device-state KV on recovery.
  • Agents emit one event per phase transition, not per heartbeat.

Separate stream for event log (user-facing reconcile log events):

  • Subject: events.log.<device_id>
  • Payload: LogEvent { at, severity, message, deployment? }
  • Retention: time-based (1h, enough for "show me what happened the last few minutes" queries; the device's in-memory ring holds the rest).
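
As a rough sketch of the wire shape and subject scheme for the state-change stream: the field encodings (for example `at_unix_ms`) and the helper names are assumptions, and `subject_matches` is a simplified stand-in for NATS wildcard matching (`*` for one token, `>` for one or more trailing tokens) that assumes a well-formed filter.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Phase {
    Pending,
    Succeeded,
    Failed,
}

/// Illustrative shape of StateChangeEvent { from, to, at, last_error }.
/// `from` is optional here to cover a device's first-ever event.
#[derive(Debug, Clone, PartialEq)]
pub struct StateChangeEvent {
    pub from: Option<Phase>,
    pub to: Phase,
    pub at_unix_ms: u64, // assumed encoding of `at`
    pub last_error: Option<String>,
}

pub fn state_subject(device_id: &str, deployment: &str) -> String {
    format!("events.state.{}.{}", device_id, deployment)
}

pub fn log_event_subject(device_id: &str) -> String {
    format!("events.log.{}", device_id)
}

/// Simplified token-wise wildcard match: '*' matches exactly one token,
/// '>' matches one or more trailing tokens. Assumes '>' only appears last.
pub fn subject_matches(filter: &str, subject: &str) -> bool {
    let mut f = filter.split('.');
    let mut s = subject.split('.');
    loop {
        match (f.next(), s.next()) {
            (Some(">"), Some(_)) => return true,
            (Some("*"), Some(_)) => continue,
            (Some(a), Some(b)) if a == b => continue,
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

A consumer filter of `events.state.>` therefore sees every per-device state subject and none of the `events.log.*` traffic.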

4.3 Log transport (NOT JetStream)

  • Subject: logs.<device_id> (plain pub/sub, at-most-once delivery)
  • Not persisted by NATS
  • Device buffers last ~10k lines in a ring buffer
  • Query protocol: request-reply on logs.<device_id>.query
    • Device responds with buffer contents, then streams live tail until the query closes

This is a dedicated transport because structured logs at fleet scale (1M devices × 1k lines/h = 1B messages/h) would crush JetStream's per-subject storage without adding operator-visible value. Operators only look at logs on-demand, per-device; device-side buffering matches the access pattern.
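
The device-side buffer can be as simple as a bounded queue that drops the oldest line on overflow. A minimal sketch, with a hypothetical `LogRing` type; the query handler that would reply with `snapshot()` and then stream the live tail is not shown.

```rust
use std::collections::VecDeque;

/// Hypothetical device-side log buffer: keeps only the most recent `cap` lines.
pub struct LogRing {
    cap: usize,
    lines: VecDeque<String>,
}

impl LogRing {
    pub fn new(cap: usize) -> Self {
        LogRing { cap, lines: VecDeque::with_capacity(cap) }
    }

    /// Appends a line, evicting the oldest one once the buffer is full.
    pub fn push(&mut self, line: String) {
        if self.cap == 0 {
            return;
        }
        if self.lines.len() == self.cap {
            self.lines.pop_front();
        }
        self.lines.push_back(line);
    }

    /// Buffer contents, oldest first; what a query reply would start with.
    pub fn snapshot(&self) -> Vec<String> {
        self.lines.iter().cloned().collect()
    }
}
```

With `LogRing::new(10_000)` the memory bound on the device is the line cap times the average line length, independent of how long the agent has been running.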

4.4 CRD fields

Minimal change from Chapter 2:

  • .status.aggregate.succeeded | failed | pending — now sourced from counters, not per-tick fold.
  • .status.aggregate.last_error — updated on to: Failed events.
  • .status.aggregate.last_heartbeat_at — from the per-deployment latest event.
  • .status.aggregate.recent_events — bounded per-deployment ring, updated on event arrival.
  • Drop .status.aggregate.unreported (no meaningful definition under selector-based targeting — already removed in the pre-chapter cleanup).
  • Add .status.aggregate.stale: u32 — count of devices matching the selector whose last heartbeat is older than a threshold (default 5 min). This is the replacement for "unreported" that makes sense at scale. Computed on tick from the operator's reverse-indexed view, not per-device query.
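
To make the `stale` definition concrete, here is a direct per-deployment count. It is a sketch only: the operator is expected to derive this from its indexed view on tick rather than recount like this, and treating a device with no recorded heartbeat as stale is an assumption of the example.

```rust
use std::collections::{HashMap, HashSet};

/// Counts devices matching a deployment's selector whose last heartbeat is
/// older than `threshold_secs`. Names and signature are illustrative.
pub fn count_stale(
    members: &HashSet<String>,             // devices currently matching the selector
    last_heartbeat: &HashMap<String, u64>, // device_id -> unix seconds
    now: u64,
    threshold_secs: u64,
) -> u32 {
    members
        .iter()
        .filter(|d| match last_heartbeat.get(*d) {
            Some(&ts) => now.saturating_sub(ts) > threshold_secs,
            None => true, // assumption: never-seen heartbeat counts as stale
        })
        .count() as u32
}
```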

4.5 Operator in-memory state

  • Counters: HashMap<DeploymentKey, PhaseCounters>, one entry per CR, updated atomically on event arrival.
  • Reverse index: HashMap<DeviceId, HashSet<DeploymentKey>>, updated when a device's labels change or when a CR's selector changes. Lets a state-change event find affected deployments in O(deployments-matching-this-device) rather than O(all-deployments).
  • Last-error rollup — per deployment, the most recent error keyed by timestamp.
  • Recent-events ring — per deployment, bounded by N (e.g. 10).
  • Dirty set — deployments whose aggregate has changed since last patch. Tick reads + clears this set; only dirty deployments get patched.

Operator heap is bounded by fleet + deployment count, not their product.

5. Counter invariants (the contract)

Correctness rests on two rules:

5.1 Device publishes exactly one transition per reconcile outcome

Every reconcile results in a state. If the state differs from the last published state for (device, deployment), the agent:

  1. Writes the new state to state.<device>.<deployment> KV (CAS against expected-revision for multi-writer safety — only one agent process per device, so contention is theoretical).
  2. Publishes a StateChangeEvent to events.state.<device>.<deployment>.

These two writes are not transactional, so the agent treats them as a unit: if (1) succeeds and (2) fails (or vice versa), it retries until both reach NATS. Worst case is a duplicate event on the stream, which the counters tolerate because every event carries its from → to transition (see 5.2).
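
The retry-until-both-land rule can be sketched against a small transport trait. Everything named here is hypothetical: `Transport` stands in for whatever NATS client wrapper the agent uses, `FlakyTransport` is an in-memory test double, and the bounded attempt count replaces the backoff-and-retry-forever loop a real agent would more likely run.

```rust
/// Hypothetical abstraction over the agent's NATS client.
pub trait Transport {
    fn kv_put(&mut self, key: &str, value: &str) -> Result<(), String>;
    fn publish(&mut self, subject: &str, payload: &str) -> Result<(), String>;
}

/// Writes the state key, then publishes the event, retrying whichever half
/// has not landed yet. Returns the attempt number on which both had succeeded.
pub fn publish_transition<T: Transport>(
    t: &mut T,
    device: &str,
    deployment: &str,
    payload: &str,
    max_attempts: u32,
) -> Result<u32, String> {
    let key = format!("state.{}.{}", device, deployment);
    let subject = format!("events.state.{}.{}", device, deployment);
    let mut kv_done = false;
    let mut last_err = String::from("no attempts made");
    for attempt in 1..=max_attempts {
        if !kv_done {
            match t.kv_put(&key, payload) {
                Ok(()) => kv_done = true,
                Err(e) => {
                    last_err = e;
                    continue; // KV write failed; try again on the next attempt
                }
            }
        }
        match t.publish(&subject, payload) {
            Ok(()) => return Ok(attempt),
            Err(e) => last_err = e,
        }
    }
    Err(last_err)
}

/// In-memory stand-in whose publish fails a fixed number of times.
pub struct FlakyTransport {
    pub publish_failures_left: u32,
    pub kv_writes: Vec<String>,
    pub published: Vec<String>,
}

impl Transport for FlakyTransport {
    fn kv_put(&mut self, key: &str, _value: &str) -> Result<(), String> {
        self.kv_writes.push(key.to_string());
        Ok(())
    }
    fn publish(&mut self, subject: &str, _payload: &str) -> Result<(), String> {
        if self.publish_failures_left > 0 {
            self.publish_failures_left -= 1;
            return Err("publish failed".to_string());
        }
        self.published.push(subject.to_string());
        Ok(())
    }
}
```

Note that the KV write is not repeated once it has succeeded; only the missing half is retried.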

5.2 Counters are driven by transitions, not snapshots

Each event carries from: Phase, to: Phase. Counter update is a single atomic action:

counters[(deployment, from)] -= 1;
counters[(deployment, to)]   += 1;

Duplicates (the same from → to replayed) are detected by checking the event against the operator's record of the last phase applied for that (device, deployment), kept alongside the reverse index: an event is applied only if its from equals that recorded phase. Once a transition has been applied the recorded phase is already to, so a replay of the same event no longer matches and is ignored, leaving the counters untouched.
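
One way to read the duplicate check is "apply an event only when its `from` equals the phase last applied for that (device, deployment)". The sketch below takes that reading; the `Aggregator` type and its field names are hypothetical, and `from: None` stands for a device's first-ever event for a deployment.

```rust
use std::collections::HashMap;

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Phase {
    Pending,
    Succeeded,
    Failed,
}

#[derive(Default)]
pub struct Aggregator {
    /// (deployment, phase) -> number of devices currently in that phase
    pub counters: HashMap<(String, Phase), i64>,
    /// (device, deployment) -> last phase applied by the operator
    pub current: HashMap<(String, String), Phase>,
}

impl Aggregator {
    /// Applies a transition if `from` matches the recorded phase.
    /// Returns false (and changes nothing) for duplicates and stale events.
    pub fn apply(&mut self, device: &str, deployment: &str, from: Option<Phase>, to: Phase) -> bool {
        let key = (device.to_string(), deployment.to_string());
        if self.current.get(&key).copied() != from {
            return false;
        }
        if let Some(prev) = from {
            *self.counters.entry((deployment.to_string(), prev)).or_insert(0) -= 1;
        }
        *self.counters.entry((deployment.to_string(), to)).or_insert(0) += 1;
        self.current.insert(key, to);
        true
    }

    pub fn count(&self, deployment: &str, phase: Phase) -> i64 {
        self.counters
            .get(&(deployment.to_string(), phase))
            .copied()
            .unwrap_or(0)
    }
}
```

Replaying `Pending → Succeeded` after it has been applied returns `false`, because the recorded phase is already `Succeeded`; the decrement and increment never run twice.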

5.3 The bootstrap transition

A device's first-ever event for a deployment has from: None (or a sentinel Unassigned variant); the counter update is then just an increment of to, with no decrement.

5.4 Device leaves fleet

When a device's heartbeat goes stale past threshold + grace, OR when its labels no longer match the deployment's selector:

  • Counters are decremented for every deployment the device was previously contributing to (via the reverse index).
  • The device's state keys aren't touched — they're the authoritative record; a device re-joining resumes from them.
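
A sketch of the departure path, assuming the operator keeps a per-(device, deployment) phase record next to the reverse index. The `Fleet` type is hypothetical, and only operator memory is modified; the device's KV state keys are left alone, as stated above.

```rust
use std::collections::{HashMap, HashSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Phase {
    Pending,
    Succeeded,
    Failed,
}

#[derive(Default)]
pub struct Fleet {
    pub counters: HashMap<(String, Phase), i64>,
    /// device -> deployments it currently contributes to
    pub reverse_index: HashMap<String, HashSet<String>>,
    /// (device, deployment) -> phase currently counted for that pair
    pub current: HashMap<(String, String), Phase>,
}

impl Fleet {
    /// Removes a device's contribution from every deployment it was counted in.
    /// Returns the affected deployments (sorted) so the caller can mark them dirty.
    pub fn remove_device(&mut self, device: &str) -> Vec<String> {
        let deployments = self.reverse_index.remove(device).unwrap_or_default();
        let mut touched = Vec::new();
        for dep in deployments {
            if let Some(phase) = self.current.remove(&(device.to_string(), dep.clone())) {
                *self.counters.entry((dep.clone(), phase)).or_insert(0) -= 1;
                touched.push(dep);
            }
        }
        touched.sort();
        touched
    }
}
```

Calling `remove_device` a second time is a no-op, which keeps the stale sweep safe to repeat.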

5.5 CR created / selector changed

The reverse index + counters are rebuilt for the affected CR by walking device-info + device-state once (O(devices + states) local NATS KV reads). Cheap for a single CR; happens at CR-apply time, not on every tick.

6. Cold-start protocol

On operator process start:

  1. Load CRs — list Deployment CRs via kube API. Build the reverse index skeleton (deployment → selector).
  2. Load device labels: iterate device-info KV keys once. Resolve each device against every CR's selector and populate the device-side entries of the reverse index. O(devices × CRs), one-time, in-memory. For 1M devices × 10k CRs this is 10^10 selector evaluations, all purely local lookups (BTreeMap matches on label maps). Assuming roughly 10 ns per evaluation that is about 100 core-seconds, so the back-of-envelope target of a few seconds to a minute relies on spreading the work across cores.
  3. Rebuild counters — iterate device-state KV keys once. For each state.<device>.<deployment>, look up the matching deployments from the reverse index and increment counters.
  4. Attach stream consumer — durable consumer on events.state.>, starting from the newest sequence at cold-start moment. The KV walk was the "past"; the stream is the "future."
  5. Begin tick loop — patch dirty CRs on a 1 Hz schedule.

Cold-start time dominated by step 2, not step 3. An ArgoCD-style "pause all reconciles during leader election / startup" envelope keeps the CR patches from competing with the cold-start scans.

What if the operator falls behind the stream's retention window? Reset to step 3 (re-walk device-state). The KV is authoritative; the stream is an accelerator.
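
Steps 2 and 3 can be sketched over in-memory snapshots of the two KV buckets. This assumes an equality-only label selector, which may be narrower than the real target_selector semantics, and all names (`cold_start`, `Rebuilt`, `selector_matches`) are illustrative.

```rust
use std::collections::{BTreeMap, HashMap, HashSet};

#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum Phase {
    Pending,
    Succeeded,
    Failed,
}

pub type Labels = BTreeMap<String, String>;

/// Assumed selector semantics: every selector label must be present on the
/// device with the same value.
pub fn selector_matches(selector: &Labels, labels: &Labels) -> bool {
    selector.iter().all(|(k, v)| labels.get(k) == Some(v))
}

pub struct Rebuilt {
    pub reverse_index: HashMap<String, HashSet<String>>,
    pub counters: HashMap<(String, Phase), i64>,
}

pub fn cold_start(
    crs: &HashMap<String, Labels>,            // deployment -> selector (step 1)
    device_info: &HashMap<String, Labels>,    // device -> labels (device-info walk)
    device_state: &[(String, String, Phase)], // (device, deployment, phase) (device-state walk)
) -> Rebuilt {
    // Step 2: resolve every device against every selector, O(devices × CRs).
    let mut reverse_index: HashMap<String, HashSet<String>> = HashMap::new();
    for (device, labels) in device_info {
        let matched: HashSet<String> = crs
            .iter()
            .filter(|(_, sel)| selector_matches(sel, labels))
            .map(|(name, _)| name.clone())
            .collect();
        if !matched.is_empty() {
            reverse_index.insert(device.clone(), matched);
        }
    }
    // Step 3: count each state key whose device still matches the deployment.
    let mut counters: HashMap<(String, Phase), i64> = HashMap::new();
    for (device, deployment, phase) in device_state {
        let is_member = reverse_index
            .get(device)
            .map_or(false, |deps| deps.contains(deployment));
        if is_member {
            *counters.entry((deployment.clone(), *phase)).or_insert(0) += 1;
        }
    }
    Rebuilt { reverse_index, counters }
}
```

State keys for devices that no longer match a selector are skipped rather than counted, which mirrors the "device leaves fleet" rule in 5.4.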

7. CR status patch cadence

  • Counter updates happen in memory, instantly.
  • The dirty set captures which deployments' aggregates changed since the last patch.
  • A 1 Hz ticker reads + clears the dirty set, patches those CRs.
  • Individual CR patches are debounced to at most once per second — avoids hammering the apiserver when a deployment is mid-rollout and devices are transitioning in a burst.

Steady-state operator → apiserver traffic is proportional to the rate of interesting changes, not to fleet size.
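
A minimal sketch of the dirty set with per-CR debounce, driven by an explicit clock so it can be exercised without a timer. `Patcher` and its methods are hypothetical names; the actual kube patch call is left to the caller.

```rust
use std::collections::{HashMap, HashSet};

pub struct Patcher {
    dirty: HashSet<String>,
    last_patched_ms: HashMap<String, u64>,
    min_interval_ms: u64,
}

impl Patcher {
    pub fn new(min_interval_ms: u64) -> Self {
        Patcher {
            dirty: HashSet::new(),
            last_patched_ms: HashMap::new(),
            min_interval_ms,
        }
    }

    /// Called whenever a deployment's aggregate changes.
    pub fn mark_dirty(&mut self, deployment: &str) {
        self.dirty.insert(deployment.to_string());
    }

    /// Called by the ticker. Returns the deployments to patch now (sorted);
    /// deployments patched less than `min_interval_ms` ago stay dirty.
    pub fn drain_due(&mut self, now_ms: u64) -> Vec<String> {
        let mut due: Vec<String> = self
            .dirty
            .iter()
            .filter(|d| match self.last_patched_ms.get(*d) {
                Some(&t) => now_ms.saturating_sub(t) >= self.min_interval_ms,
                None => true,
            })
            .cloned()
            .collect();
        due.sort();
        for d in &due {
            self.dirty.remove(d);
            self.last_patched_ms.insert(d.clone(), now_ms);
        }
        due
    }
}
```

However many events arrive for a deployment between ticks, it appears at most once in the returned list, so patch volume follows the number of changed deployments rather than the event rate.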

8. Failure modes

| Scenario | Detection | Recovery |
| --- | --- | --- |
| Operator crash | k8s restarts the pod | Cold-start protocol §6 |
| Stream consumer falls behind retention | Stream API returns out-of-range | Re-run §6 step 3 (re-walk KV) |
| Agent publishes event but KV write fails | Agent-side local retry; event is replayed | Replayed event is ignored per §5.2 |
| Agent writes KV but event publish fails | Agent-side local retry | Operator never sees the transition until retry succeeds; stale threshold catches the device if agent is permanently broken |
| Device's label change lost | Heartbeat carries current labels; stale entry aged out | Periodic sync (e.g. 1/h) re-scans device-info to catch drift |
| Duplicate event (retry) | Event's from no longer matches the phase the operator last applied for that (device, deployment) | No-op (§5.2) |
| Out-of-order event (retry ordering) | Sequence number on event | Consumer tracks per-(device, deployment) last-applied sequence; old events ignored |
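
The out-of-order row amounts to a small per-pair gate in front of the counter update. A sketch, assuming each event carries a monotonically increasing sequence number; `SequenceGate` is a hypothetical name.

```rust
use std::collections::HashMap;

/// Tracks the highest sequence applied per (device, deployment) and rejects
/// anything at or below it.
#[derive(Default)]
pub struct SequenceGate {
    last_applied: HashMap<(String, String), u64>,
}

impl SequenceGate {
    /// Returns true if the event should be applied, recording its sequence.
    pub fn admit(&mut self, device: &str, deployment: &str, seq: u64) -> bool {
        let key = (device.to_string(), deployment.to_string());
        if let Some(last) = self.last_applied.get(&key).copied() {
            if seq <= last {
                return false; // old or repeated event: ignore
            }
        }
        self.last_applied.insert(key, seq);
        true
    }
}
```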

9. Scale back-of-envelope

Target: 1M devices, 10k deployments, p50 reconcile rate 1 event per device per hour.

  • Event volume. 1M × (1/3600s) = 278 events/s.
  • Operator event-processing cost. Each event touches a bounded number of in-memory counters (via reverse index). Assuming roughly 1 µs of CPU per event, 278 events/s costs well under a millisecond of CPU per second of wall time, with ~0 network (JetStream local to operator).
  • Operator → apiserver patches. Deployments change at a rate far below event rate; debounced dirty-set drains limit patches to a few per second even during bursty rollouts.
  • Operator memory. Reverse index entries (device_id + set of deployment keys) ≈ 200 bytes × 1M = 200 MB. Counters ≈ 10k × few fields = negligible. Last-error + recent-events rings ≈ 10k × 10 entries × 512 bytes = 50 MB. Total ~250 MB — fine.
  • Cold-start time. 1M KV reads × amortized 0.1 ms (JetStream KV is fast for key iteration) = 100 s. Acceptable for a several-minute-once-per-release recovery window. If it becomes a problem, chunk the walk and resume-from-checkpoint.
  • Stale device sweep. On each 1 Hz tick, the patch work is O(dirty set). Stale detection runs separately: a second, slower ticker (e.g. 30 s) scans the heartbeat KV for entries older than threshold, which is O(devices) per sweep, and emits synthetic "device went stale" events that drive the same counter-decrement path, so the counter work is O(devices-whose-heartbeat-is-old).
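
The arithmetic above is easy to keep honest as a few functions, so the inputs can be changed when real numbers arrive. The per-entry sizes and per-read latency are the same guesses used in the list, not measurements, and `Envelope` is a hypothetical helper.

```rust
/// Back-of-envelope calculator for the figures in this section.
pub struct Envelope {
    pub devices: u64,
    pub deployments: u64,
}

impl Envelope {
    /// Fleet-wide event rate for a given per-device hourly rate.
    pub fn events_per_sec(&self, events_per_device_per_hour: f64) -> f64 {
        self.devices as f64 * events_per_device_per_hour / 3600.0
    }

    /// Reverse index size: one entry per device.
    pub fn reverse_index_bytes(&self, bytes_per_entry: u64) -> u64 {
        self.devices * bytes_per_entry
    }

    /// Last-error and recent-events rings: one ring per deployment.
    pub fn rings_bytes(&self, entries_per_ring: u64, bytes_per_entry: u64) -> u64 {
        self.deployments * entries_per_ring * bytes_per_entry
    }

    /// Sequential KV walk time, one read per device.
    pub fn kv_walk_secs(&self, ms_per_read: f64) -> f64 {
        self.devices as f64 * ms_per_read / 1000.0
    }
}
```

For 1M devices and 10k deployments this gives about 278 events/s, 200 MB of reverse index, about 51 MB of rings, and a 100 s KV walk, matching the rounded figures in the list.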

10. Schema migration

Deployment CRD is still v1alpha1, not deployed anywhere, so no migration machinery is needed for the CRD itself — we just change the aggregate subtree definition.

harmony-reconciler-contracts::AgentStatus is deprecated by this chapter. Replaced by narrower wire types:

  • DeviceInfo — what info.<device_id> stores
  • DeploymentState — what state.<device>.<dep> stores
  • HeartbeatPayload — what heartbeat.<device_id> stores
  • StateChangeEvent — what events stream emits
  • LogEvent — what event-log stream emits

The old AgentStatus type goes away when the old aggregator goes away. Clean break, same CRD version.

11. Implementation milestones

Landing order, each a reviewable increment:

  1. M1: new contracts crate shapes (DeviceInfo, DeploymentState, HeartbeatPayload, StateChangeEvent, LogEvent). Round-trip serde tests. No runtime code changes yet.
  2. M2: agent-side rewrite — agent writes the new KV shapes + publishes state-change events + heartbeats. Old AgentStatus publish path stays in parallel for the smoke to keep passing.
  3. M3: operator-side cold-start protocol — new operator task that walks the new KV buckets and builds in-memory counters. Runs alongside the old aggregator; logs counter parity checks against the legacy aggregator's output so we can verify correctness before switching over.
  4. M4: operator-side event consumer — attach the durable stream consumer, drive counters incrementally. Parity checks still on.
  5. M5: flip CR patch source — the new counter-backed aggregator patches .status.aggregate, the legacy one goes read-only, then deleted in the next commit.
  6. M6: logs subject + query protocol — device-side ring buffer, query API, a first CLI surface (natiq logs device=X or equivalent) that drives it.
  7. M7: synthetic-scale test harness — spin up 1k (then 10k) mock agents in-process, drive a realistic event load through the operator, measure + publish numbers.
  8. M8: delete legacy AgentStatus (harmony-reconciler-contracts cleanup, smoke-a4 updates).

M1-M5 can land on one branch; M6 is adjacent work; M7-M8 close out.

12. Open questions

  • Multi-operator HA. The design assumes one operator at a time. Adding HA means either (a) one active + one standby operator with NATS-based leader election, or (b) shared counter state in KV instead of in-memory. (a) is simpler; (b) scales better. Defer until a specific availability target demands it.
  • Counter-KV snapshots. Should we periodically snapshot the in-memory counter state to a counters KV bucket so cold-start can resume from a recent snapshot + a short stream tail, instead of always re-walking device-state? Probably yes once cold-start time becomes an operational concern, but not in the initial cut.
  • Stream retention tuning. 24h for events.state.> is a guess. Real number depends on observed operator downtime p99. Initial setting, tune from operational data.
  • Compaction policy for device-state KV. JetStream KV retains up to the configured number of revisions per key, so history accumulates as phases churn. Set max_history_per_key = 1 (keep only latest value) unless there's a reason to keep transition history (there isn't; that's what the events stream is for).
  • Agent crash before publishing state-change event. Transition is durably captured in the agent's local podman state; on agent restart the reconcile loop re-observes the phase and either re-publishes (if it differs from state.<device>.<dep>) or stays silent. Correctness preserved at the cost of event-stream ordering ambiguity during the crash window — acceptable.

13. What this chapter deliberately does not change

  • CRD .spec.target_selector semantics — stays exactly as shipped.
  • Operator's kube-rs controller loop for CR reconcile — stays as is.
  • Helm chart structure (Chapter 3) — orthogonal.
  • Authentication (Chapter Auth) — orthogonal. When that chapter lands, every subject + KV bucket above will be re-scoped under device-specific NATS credentials; the topology above doesn't need to change for that to slot in.