feat/iot-aggregation-scale #274

Closed
johnride wants to merge 18 commits from feat/iot-aggregation-scale into feat/iot-operator-helm-chart
No description provided.
johnride added 18 commits 2026-04-23 13:26:04 +00:00
Design doc for the aggregation rework. Chapter 2's aggregator
(O(deployments × devices) per tick) works for a 10-device smoke but
doesn't scale past a partner fleet of even modest size. Replaces it
with CQRS-style incrementally-maintained counters driven by
JetStream state-change events, device-authoritative per-device
state keys, and a separate log transport that doesn't touch
JetStream.

Review first, implement after. No runtime code changes in this
commit.

Covers data model (KV buckets, streams, subjects), counter
invariants (transition-based, duplicate-safe), cold-start protocol
(walk once, then consume), CR patch cadence (debounced dirty set),
failure modes, scale back-of-envelope for 1M devices + 10k
deployments, schema migration path (clean break, same CRD
v1alpha1), and eight-milestone landing plan.
First milestone of the aggregation rework. Lands the contract layer
without any runtime side effects: the agent + operator still run
their legacy paths unchanged.

New types (module `fleet`):
  - DeviceInfo: routing labels + inventory, rewritten on label
    change. Stored in KV `device-info` at `info.<device_id>`.
  - DeploymentState: current phase per (device, deployment).
    Stored in KV `device-state` at `state.<device>.<deployment>`.
    Authoritative snapshot; operator rebuilds counters from it on
    cold-start.
  - HeartbeatPayload: tiny liveness ping in KV `device-heartbeat`.
    Payload capped by a test (< 96 bytes) so it stays cheap at
    1M-device rates.
  - StateChangeEvent: `from: Option<Phase>, to: Phase, sequence`
    emitted on each transition to JS stream
    `device-state-events` on subject
    `events.state.<device>.<deployment>`. Operator folds these
    events into in-memory counters.
  - LogEvent: shorter-retention user-facing event log to JS stream
    `device-log-events` on subject `events.log.<device>`.
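
In sketch form, the payload shapes look roughly like this (the Phase variants and any field not named above are illustrative assumptions, not the shipped definitions):

```rust
// Sketch of the M1 contract types; fields not named in the text above
// (and the Phase variants) are illustrative assumptions.
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum Phase { Pending, Running, Succeeded, Failed }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeviceInfo {
    pub device_id: String,
    pub labels: BTreeMap<String, String>, // routing labels, rewritten on change
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DeploymentState {
    pub phase: Phase, // authoritative per-(device, deployment) snapshot
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatPayload {
    pub device_id: String, // kept tiny; a test bounds the encoded size
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct StateChangeEvent {
    pub from: Option<Phase>, // None on the first observed transition
    pub to: Phase,
    pub sequence: u64,
}
```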

Transport constants + key/subject helpers in `kv` with
cross-component wire-stability tests so a rename here gets caught.
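
For example (a sketch; helper names follow the key formats above, exact signatures assumed):

```rust
// Sketch of the `kv` key/subject helpers plus one wire-stability test.
pub const BUCKET_DEVICE_STATE: &str = "device-state";
pub const STREAM_DEVICE_STATE_EVENTS: &str = "device-state-events";

pub fn info_key(device: &str) -> String {
    format!("info.{device}")
}

pub fn state_key(device: &str, deployment: &str) -> String {
    format!("state.{device}.{deployment}")
}

pub fn state_event_subject(device: &str, deployment: &str) -> String {
    format!("events.state.{device}.{deployment}")
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn state_key_format_is_stable() {
        // Both agent and operator build keys through this helper, so a
        // rename or format change fails here before it hits the wire.
        assert_eq!(state_key("dev-01", "app"), "state.dev-01.app");
    }
}
```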

10 new tests (roundtrip serde, forward-compat parse, size bound,
key/subject format). Legacy `AgentStatus` tests + constants stay
green; retirement is scheduled for M8 once the live path has
switched over.
Agent now writes the new per-concern KV shapes + event streams
alongside the legacy AgentStatus. Nothing consumes the new data
yet — the legacy aggregator still drives CR .status from
`agent-status`. M3 will add the operator-side cold-start +
consumer paths in parity mode; M5 flips the CR-patch source once
counters verify against the legacy aggregator.

New module `fleet_publisher.rs` owns:
  - Opening + idempotent-creating the three new KV buckets
    (`device-info`, `device-state`, `device-heartbeat`) and
    two JetStream streams (`device-state-events`,
    `device-log-events`).
  - Publish methods for DeviceInfo, HeartbeatPayload, DeploymentState
    (KV put), StateChangeEvent + LogEvent (stream publish), and
    delete for deployment-state cleanup.
  - Log-and-swallow failure mode. The operator re-walks KV on
    cold-start, so a missed event publish is self-healing on the
    next transition or operator restart.
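
Roughly (a sketch; field names and helper calls are assumptions, bucket/stream creation elided; DeploymentState and state_key as in the earlier sketches):

```rust
// Sketch of the log-and-swallow publish path in fleet_publisher.rs.
use async_nats::jetstream;

pub struct FleetPublisher {
    js: jetstream::Context,              // stream publishes go through this context
    device_state: jetstream::kv::Store,  // `device-state` bucket
    device_id: String,
}

impl FleetPublisher {
    pub async fn put_deployment_state(&self, deployment: &str, state: &DeploymentState) {
        let key = state_key(&self.device_id, deployment);
        let bytes = match serde_json::to_vec(state) {
            Ok(b) => b,
            Err(e) => {
                tracing::warn!(error = %e, "serialize DeploymentState failed");
                return;
            }
        };
        // Log-and-swallow: the operator re-walks KV on cold-start, so a
        // missed write heals on the next transition or operator restart.
        if let Err(e) = self.device_state.put(key, bytes.into()).await {
            tracing::warn!(error = %e, "device-state KV put failed");
        }
    }
}
```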

Reconciler grew:
  - `device_id`: Id + `fleet`: Option<Arc<FleetPublisher>>
  - per-(deployment) monotonic sequence counter in StatusState
  - `set_phase` detects actual transitions (prev_phase vs new) and
    emits a DeploymentState KV write + StateChangeEvent stream
    publish only on change. No-op re-confirmation still bumps the
    sequence (lets operator detect duplicate events via sequence
    comparison) but stays off the wire.
  - `drop_phase` deletes the device-state KV entry.
  - `push_event` also publishes a LogEvent to the stream.
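
The transition-detection core, in sketch form (the in-memory field names are assumptions; Phase and StateChangeEvent as in the M1 sketch):

```rust
// Sketch of set_phase: always bump the per-deployment sequence, emit a
// wire event only when the phase actually changed.
use std::collections::HashMap;

#[derive(Default)]
struct StatusState {
    phases: HashMap<String, Phase>,  // last confirmed phase per deployment
    sequences: HashMap<String, u64>, // monotonic sequence per deployment
}

impl StatusState {
    fn set_phase(&mut self, deployment: &str, new: Phase) -> Option<StateChangeEvent> {
        let seq = self.sequences.entry(deployment.to_string()).or_insert(0);
        *seq += 1; // no-op re-confirmation still bumps the sequence...
        let prev = self.phases.insert(deployment.to_string(), new.clone());
        if prev.as_ref() == Some(&new) {
            return None; // ...but stays off the wire
        }
        Some(StateChangeEvent { from: prev, to: new, sequence: *seq })
    }
}
```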

main.rs:
  - Builds FleetPublisher after connect_nats, passes into Reconciler.
  - Publishes DeviceInfo once at startup (empty labels — populated
    by the selector-targeting branch once it merges).
  - Spawns a heartbeat loop on 30 s cadence.
  - Legacy `report_status` AgentStatus task kept running unchanged.
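
The heartbeat task is roughly (a sketch; the publisher method name is assumed):

```rust
// Sketch of the 30 s heartbeat loop spawned from the agent's main.
use std::{sync::Arc, time::Duration};

fn spawn_heartbeat(fleet: Arc<FleetPublisher>) {
    tokio::spawn(async move {
        let mut tick = tokio::time::interval(Duration::from_secs(30));
        loop {
            tick.tick().await;
            // Assumed method name; like the other publishes it logs-and-swallows.
            fleet.publish_heartbeat().await;
        }
    });
}
```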

8 unit tests added for the transition-detection + sequence + ring-
buffer invariants (drive set_phase / drop_phase / push_event with
fleet: None). 18 contract tests from M1 still green.
New module `fleet_aggregator` spawns a 5 s tick task that:
  - Walks the Chapter 4 KV buckets (`device-info`,
    `device-state`) every tick.
  - Computes per-CR phase counters via `compute_counters` (pure
    function, unit tested).
  - Computes the legacy aggregator's counts from the same
    `agent-status` snapshot map the legacy task is already
    maintaining.
  - Compares the two per CR and logs per-tick at DEBUG level
    (matches) or WARN (mismatches), with running totals at INFO
    every 60 s.

The explicit `cr_targets_device` predicate is the one-line plug
point for the selector-based rewrite coming from the review-fix
branch: swap `target_devices.contains()` for
`target_selector.matches(&info.labels)`; everything else in the
aggregator is label/selector-agnostic.
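
In sketch form (the CR struct, the PhaseCounters fields, and the phase-to-counter mapping are assumptions; DeviceInfo and Phase as in the M1 sketch):

```rust
// Sketch of the pure counter fold plus the cr_targets_device plug point.
use std::collections::HashMap;

#[derive(Default, Debug, PartialEq)]
pub struct PhaseCounters { pub pending: u32, pub succeeded: u32, pub failed: u32 }

pub struct DeploymentCr {
    pub name: String,
    pub target_devices: Vec<String>, // M3: explicit device list
}

fn cr_targets_device(cr: &DeploymentCr, info: &DeviceInfo) -> bool {
    // One-line plug point: the selector branch swaps this body for
    // cr.target_selector.matches(&info.labels).
    cr.target_devices.contains(&info.device_id)
}

pub fn compute_counters(
    crs: &[DeploymentCr],
    infos: &HashMap<String, DeviceInfo>,
    states: &HashMap<(String, String), Phase>, // (device, deployment) -> phase
) -> HashMap<String, PhaseCounters> {
    let mut out: HashMap<String, PhaseCounters> = HashMap::new();
    for cr in crs {
        let counters = out.entry(cr.name.clone()).or_default();
        for info in infos.values().filter(|i| cr_targets_device(cr, i)) {
            match states.get(&(info.device_id.clone(), cr.name.clone())) {
                Some(Phase::Failed) => counters.failed += 1,
                Some(Phase::Pending) | None => counters.pending += 1,
                Some(_) => counters.succeeded += 1, // assumed: Running counts as succeeded
            }
        }
    }
    out
}
```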

Refactored `aggregate::run` to accept the `StatusSnapshots` map
from outside so the parity-check task reads the same agent-status
view the legacy aggregator writes to. Added `aggregate::new_snapshots()`
helper so `main` owns the one shared Arc.

The task is strictly read-only: no CR patches, no side effects. M5
flips `.status.aggregate` over to the new counter-driven path once
M4 replaces the periodic re-walk with the event-stream consumer and
the parity check has stayed green under load.

5 unit tests cover the pure counter logic (target match, multi-CR
fan-in, zero-target CR, phase dispatch).
Replaces M3's per-tick KV re-walk with an incremental
JetStream consumer on `device-state-events`. Cold-start still
walks KV once to seed counters; steady state consumes events and
applies `from -= 1; to += 1` diffs.

New in `fleet_aggregator`:

  FleetState (shared via Arc<Mutex<_>>):
    - counters: per-deployment phase counts.
    - phase_of: per-(device, deployment) current phase, for
      duplicate + resync detection.
    - latest_sequence: per-(device, deployment) highest sequence
      applied, drops stale and duplicate deliveries.
    - deployment_namespace: name → namespace map refreshed each
      parity tick from the CR list (events carry only the
      deployment name, matching the `<device>.<deployment>`
      KV key format).

  apply_state_change_event():
    - Idempotent for duplicate sequence numbers.
    - Idempotent for out-of-order lower-sequence events.
    - On from-phase disagreement with our belief, trusts the
      event and re-syncs (logs warn — parity check will catch
      any resulting drift against the legacy aggregator).
    - Counter decrement saturates at zero so replays can't
      underflow.
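
A condensed sketch of the apply path (namespace resolution and locking omitted; PhaseCounters as in the M3 sketch, other names assumed):

```rust
// Sketch of apply_state_change_event's idempotence + resync rules.
use std::collections::HashMap;

#[derive(Default)]
struct FleetState {
    counters: HashMap<String, PhaseCounters>,        // per-deployment phase counts
    phase_of: HashMap<(String, String), Phase>,      // (device, deployment) -> phase
    latest_sequence: HashMap<(String, String), u64>, // highest sequence applied
}

impl PhaseCounters {
    fn slot(&mut self, p: &Phase) -> &mut u32 {
        match p {
            Phase::Failed => &mut self.failed,
            Phase::Pending => &mut self.pending,
            _ => &mut self.succeeded,
        }
    }
    fn increment(&mut self, p: &Phase) { *self.slot(p) += 1; }
    fn decrement(&mut self, p: &Phase) {
        let s = self.slot(p);
        *s = s.saturating_sub(1); // replays can't underflow
    }
}

fn apply_state_change_event(
    state: &mut FleetState,
    device: &str,
    deployment: &str,
    ev: &StateChangeEvent,
) {
    let key = (device.to_string(), deployment.to_string());

    // Duplicate or out-of-order (lower-sequence) delivery: ignore.
    if state.latest_sequence.get(&key).is_some_and(|&seen| ev.sequence <= seen) {
        return;
    }
    state.latest_sequence.insert(key.clone(), ev.sequence);

    if state.phase_of.get(&key) != ev.from.as_ref() {
        // Trust the event and re-sync; the parity check catches drift.
        tracing::warn!(%device, %deployment, "from-phase disagreement, re-syncing");
    }

    let counters = state.counters.entry(deployment.to_string()).or_default();
    if let Some(from) = &ev.from {
        counters.decrement(from);
    }
    counters.increment(&ev.to);
    state.phase_of.insert(key, ev.to.clone());
}
```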

  run_event_consumer():
    - Durable JetStream pull consumer on STATE_EVENT_WILDCARD,
      DeliverPolicy::New (cold-start already seeded state from
      KV — replaying from the beginning would double-count).
    - Explicit ack; malformed payloads are logged + acked to
      avoid infinite redelivery.

  parity_tick() no longer walks KV — it reads live counters
  from the shared FleetState and compares with the legacy
  aggregator's per-CR fold. Same match/mismatch/running-totals
  logging as M3.

8 new unit tests cover the event-apply invariants: first
transition (no from), transition (from+to), duplicate sequence,
out-of-order sequence, from-disagreement resync, unknown-
deployment ignore, cold-start seeding, underflow saturation.
Plus the 5 M3 tests from before — 13 aggregator tests total,
all green.
Smoke was silent about the Chapter 4 parity check because the
operator log got discarded on successful runs. Add a pre-cleanup
step that greps for `fleet-aggregator` log lines and prints the
last 20; if any `parity MISMATCH` line is present, upgrade to
`fail` — smoke exit 0 shouldn't hide a silently-wrong new
aggregator.
Chapter 4's parity check in smoke-a4 caught M4 dropping events —
operator's consumer saw 1 of 3 state transitions, parity-mismatch
assertion fired.

Root cause: async-nats's jetstream.publish() returns a
PublishAckFuture that must itself be awaited to get the server's
persistence ack. Without that second await, the publish is
effectively fire-and-forget and drops under any backpressure — which
on the smoke's agent-first-boot path is every publish until the
stream state stabilizes.

Fix awaits both the publish future (send) and the returned
PublishAckFuture (server ack) for state-change + log events.
State-change events are warn-on-failure (operator needs them);
log events are debug-on-failure (device-side ring buffer is
authoritative).
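
The shape of the fix, sketched (log messages and field names are illustrative):

```rust
// Sketch of the fix: await the send *and* the server ack. With
// async-nats, awaiting jetstream::Context::publish() yields a
// PublishAckFuture; awaiting that gives the server's PublishAck.
async fn publish_state_change(
    js: &async_nats::jetstream::Context,
    subject: String,
    payload: bytes::Bytes,
) {
    match js.publish(subject, payload).await {
        // First await: the publish request was actually sent.
        Ok(ack_future) => match ack_future.await {
            // Second await: the server acknowledged persisting it.
            Ok(ack) => tracing::debug!(stream_seq = ack.sequence, "state-change acked"),
            // warn-on-failure: the operator needs these events.
            Err(e) => tracing::warn!(error = %e, "state-change publish not acked"),
        },
        Err(e) => tracing::warn!(error = %e, "state-change publish failed"),
    }
}
```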
Two findings from the M4 smoke runs:

1. **Event consumer dropped events for unknown-namespace deployments.**
   The consumer receives state-change events but `apply_state_change_event`
   short-circuits when `deployment_namespace` doesn't have the
   deployment yet — common in the first 5 s after a new CR is
   applied, before the parity-tick's refresh loop runs.

   Fix: on unknown deployment, consumer eagerly does a kube
   `Api::list()` and populates the map. Subsequent events for
   that deployment are fast-path (map already has it).

   Also: added instrumentation on publish + receive paths so
   future debugging against the parity check produces actionable
   traces. Log level is DEBUG to keep INFO clean.
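
The eager lookup, sketched with kube (the CRD group/kind derive below is a placeholder for the real type, and the map ownership is assumed):

```rust
// Sketch of the unknown-deployment fallback: on a miss, list the CRs
// once and populate the name -> namespace map; later events take the
// fast path.
use kube::{api::ListParams, Api, Client, CustomResource, ResourceExt};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::HashMap;

#[derive(CustomResource, Clone, Debug, Deserialize, Serialize, JsonSchema)]
#[kube(group = "example.com", version = "v1alpha1", kind = "IotDeployment", namespaced)]
pub struct IotDeploymentSpec {
    pub target_devices: Vec<String>,
}

async fn resolve_namespace(
    client: &Client,
    map: &mut HashMap<String, String>,
    deployment: &str,
) -> Option<String> {
    if let Some(ns) = map.get(deployment) {
        return Some(ns.clone()); // fast path: map already has it
    }
    // Slow path: eager kube list, then remember every CR's namespace.
    let api: Api<IotDeployment> = Api::all(client.clone());
    let crs = api.list(&ListParams::default()).await.ok()?;
    for cr in crs {
        if let Some(ns) = cr.namespace() {
            map.insert(cr.name_any(), ns);
        }
    }
    map.get(deployment).cloned()
}
```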

2. **Parity MISMATCH during transitions is correct behavior.**
   The legacy aggregator reads AgentStatus which the agent
   republishes every 30 s. Chapter 4 state-change events land in
   ~100 ms. So during a Pending→Running transition there's a
   window where the new counter shows succeeded=1 while legacy
   still shows pending=1 — precisely because the new path is
   faster, which is the point of this rework.

   The smoke's hard-fail-on-any-mismatch was too strict; relaxed
   to a diagnostic print. Steady state should still converge to
   zero mismatches once the next AgentStatus heartbeat lands; the
   summary lets the user spot sustained divergence by eye. M5
   removes the legacy path entirely, making the parity check
   moot.

Agent-side publish now also surfaces subject + sequence + stream-seq
on every state-change publish, as a similar diagnostic aid for tracing
wire deliveries.
Newtypes (review point #3) were the entry point. Introducing them
forced the event-payload redesign, and the redesign made the other
two bugs obvious + trivial to fix.

New contract types (harmony-reconciler-contracts::fleet):
  - DeploymentName: validated newtype. Rejects empty, > 253 bytes,
    '.' (which would introduce an extra NATS subject token), NATS
    wildcards, and whitespace. Serde impl validates on deserialize
    so a malformed payload is rejected at the wire, not later.
  - AgentEpoch(u64): random-per-process. Prefixes every sequence.
  - Revision { agent_epoch, sequence } with lexicographic Ord.
  - LifecycleTransition enum: Applied { from, to, last_error } |
    Removed { from }. Replaces (from: Option<Phase>, to: Phase) so
    deletion is modeled explicitly in the wire format.
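
A sketch of the two ordering-critical pieces (rejection list and error type abbreviated; AgentEpoch collapsed to a bare u64 here):

```rust
// Sketch of DeploymentName validation-on-deserialize and Revision's
// lexicographic ordering; abbreviated relative to the real contracts.
use serde::{Deserialize, Serialize};

#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct DeploymentName(String);

impl DeploymentName {
    pub fn new(s: &str) -> Result<Self, String> {
        let bad = s.is_empty()
            || s.len() > 253
            || s.contains(|c| c == '.' || c == '*' || c == '>') // extra token / NATS wildcards
            || s.chars().any(char::is_whitespace);
        if bad {
            return Err(format!("invalid deployment name: {s:?}"));
        }
        Ok(Self(s.to_string()))
    }
}

// Validate on deserialize so a malformed payload is rejected at the wire.
impl<'de> Deserialize<'de> for DeploymentName {
    fn deserialize<D: serde::Deserializer<'de>>(d: D) -> Result<Self, D::Error> {
        let s = String::deserialize(d)?;
        DeploymentName::new(&s).map_err(serde::de::Error::custom)
    }
}

// Derived Ord is lexicographic over fields in declaration order:
// agent_epoch first, then sequence.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Revision {
    pub agent_epoch: u64,
    pub sequence: u64,
}
```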

Bug fixes that fell out of the redesign:

  #1 (drop_phase was silent on the wire): `drop_phase` now
     produces a RecordedTransition with Removed { from }, which
     the publisher serializes into a StateChangeEvent. Operator
     applies the Removed variant by decrementing `from` without
     a paired increment. Counters no longer over-count after
     deletions.

  #2 (sequence reset on agent restart): (agent_epoch, sequence)
     lexicographic ordering means the first post-restart event
     (seq=1 under a fresh epoch) outranks any pre-restart event
     the operator had applied. No more silently-dropped events
     after an agent crash.

Split recommended in review point #4:
  - `record_apply` / `record_remove`: pure in-memory state
    updates returning Option<RecordedTransition>.
  - `publish_transition`: side-effectful wire emission.
  - `apply_phase` / `drop_phase`: thin composite helpers the
    hot path uses.

Typed keys in the operator:
  - DevicePair { device_id, deployment: DeploymentName } replaces
    (String, String) so the two identifiers can't be swapped.
  - FleetState.deployment_namespace is keyed by DeploymentName.
  - Controller's kv_key signature takes &DeploymentName; invalid
    CR names surface as a clear Error rather than corrupting KV.

Tests:
  - 27 contract tests (roundtrip every payload shape, including
    forward-compat parsing; validate DeploymentName rejection
    paths; assert Revision ordering across epochs).
  - 19 operator fleet_aggregator tests, including regression
    guards named for the specific bugs:
      removed_transition_decrements_without_paired_increment  (#1)
      revision_ordering_handles_agent_restart                 (#2)
  - 8 agent reconciler tests (record_apply/record_remove purity,
    sequence monotonicity, agent_epoch stamping, ring buffer
    cap).

Agent main wires a fresh AgentEpoch via rand::random::<u64>() at
startup; FleetPublisher::connect takes it and includes it in every
DeviceInfo + state-change event.
Chapter 4 shipped per-concern wire types (DeviceInfo, DeploymentState,
HeartbeatPayload, StateChangeEvent) as replacements for the monolithic
AgentStatus heartbeat. The parity check proved the new path matches the
legacy one; the legacy path now goes away.

Removed:
- AgentStatus, DeploymentPhase, EventEntry, agent-status bucket, status_key
- iot-operator-v0/src/aggregate.rs (legacy full-recompute aggregator)
- Parity machinery in fleet_aggregator.rs (ParityStats, parity_tick, dual-write)
- Agent recent_events ring + push_event (consumed only by AgentStatus)
- publish_log_event + device-log-events stream (no consumer, YAGNI)

fleet_aggregator now drives CR .status.aggregate directly: event consumer
maintains counters incrementally, 1 Hz patch_tick flushes only deployments
in the `dirty` set.
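
Sketched (the CR patch itself is stubbed; names beyond `dirty` and `patch_tick` are assumptions, PhaseCounters as in the earlier sketch):

```rust
// Sketch of the 1 Hz dirty-set flush: only deployments touched since
// the last tick get their CR .status.aggregate recomputed and patched.
use std::collections::{HashMap, HashSet};
use std::time::Duration;

struct AggregatorState {
    counters: HashMap<String, PhaseCounters>, // maintained by the event consumer
    dirty: HashSet<String>,                   // deployments touched since last flush
}

async fn patch_tick(state: &mut AggregatorState) {
    for deployment in state.dirty.drain() {
        if let Some(counters) = state.counters.get(&deployment) {
            // Placeholder for the kube status patch of this Deployment CR.
            patch_cr_status(&deployment, counters).await;
        }
    }
}

async fn patch_cr_status(_deployment: &str, _counters: &PhaseCounters) {}

async fn run_patch_loop(mut state: AggregatorState) {
    let mut tick = tokio::time::interval(Duration::from_secs(1));
    loop {
        tick.tick().await;
        patch_tick(&mut state).await;
    }
}
```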

Net: ~1000 lines removed (4263 → 3216 across the three iot crates).
Wire surface: 5 types → 4. Operator tasks: 4 → 2 (controller + aggregator).

Tests: 21 contracts + 9 operator + 6 agent — all green.
Zero consumers, zero publishers — pure speculative surface area.
Drops LogEvent struct, EventSeverity enum, STREAM_DEVICE_LOG_EVENTS,
log_event_subject, logs_subject, logs_query_subject.

If per-device log streaming lands later, it arrives with a real
consumer attached.

Contracts tests: 21 → 19 (removed two roundtrip tests for the deleted type).
Collapses the Chapter 4 event-stream architecture into pure KV watch.
The operator was maintaining a durable JetStream consumer on
device-state-events in parallel with the KV bucket it was meant to
shadow — the stream was an optimization over KV scanning, but with
async-nats's ordered bucket watch it's redundant.

Gone:
- StateChangeEvent, LifecycleTransition, STREAM_DEVICE_STATE_EVENTS,
  state_event_subject, STATE_EVENT_WILDCARD (contracts)
- Revision, AgentEpoch (contracts) — restart ordering now handled by
  DeploymentState.last_event_at monotonic check
- PhaseCounters.apply_event + incremental diff machinery (operator) —
  counters recomputed per dirty CR from the states snapshot
- RecordedTransition + publish_transition split (agent) — without an
  event to publish, the pure/publish boundary has no reason to exist
- Agent sequence counter + agent_epoch generation (agent main.rs)
- CR aggregate fields recent_events, last_heartbeat_at, unreported —
  never populated, pure speculation

New shape:
- fleet_aggregator.rs watches device-state via bucket.watch_all_from_revision(0)
- apply_state / drop_state mutate an in-memory snapshot
- patch_tick refreshes CR index from kube, recomputes aggregates for
  CRs marked dirty, patches CR status
- DeploymentAggregate = succeeded/failed/pending + last_error only

Line counts (3 iot crates):
  4263 -> 3090 -> 2162 (-49% overall, -30% this pass)

Tests: 24 total (13 contracts + 6 operator + 5 agent), all green.
- agent-status bucket -> device-heartbeat bucket
- status.<device> key -> heartbeat.<device>
- drop parity check summary from smoke-a4 (legacy path is gone)
- tidy stale AgentStatus comment in agent main
`bucket.watch_all_from_revision(0)` sends the JetStream consumer
request with DeliverByStartSequence and an optional-missing start
sequence, which the server rejects with error 10094:

  consumer delivery policy is deliver by start sequence, but
  optional start sequence is not set

`watch_with_history(">")` uses DeliverPolicy::LastPerSubject instead —
replays the current value of every key, then streams live updates.
Same cold-start-plus-steady-state semantics, correct wire.
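
The corrected watcher loop, sketched (key parsing and dirty-marking reduced to comments):

```rust
// Sketch of the fixed state watcher: LastPerSubject replay of every
// key, then live updates.
use async_nats::jetstream::kv::{Operation, Store};
use futures::StreamExt;

async fn run_state_watcher(
    bucket: Store,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
    // watch_with_history(">") replays the current value of every key,
    // then streams live updates.
    let mut watch = bucket.watch_with_history(">").await?;
    while let Some(entry) = watch.next().await {
        let entry = entry?;
        match entry.operation {
            Operation::Put => {
                // apply_state: parse `state.<device>.<deployment>` from
                // entry.key, deserialize entry.value into the snapshot,
                // mark the deployment dirty.
            }
            Operation::Delete | Operation::Purge => {
                // drop_state: remove the (device, deployment) entry and
                // mark the deployment dirty.
            }
        }
    }
    Ok(())
}
```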

Caught by smoke-a4 --auto: state watcher exited immediately on
startup, no deployments ever reconciled.
- example_iot_load_test: simulates N devices (default 100 across 10
  groups: 55 + 9×5) pushing DeploymentState every tick to NATS, no
  real podman. Applies one Deployment CR per group, runs for a
  bounded duration, verifies each CR's .status.aggregate counters
  sum to the target device count.

- iot/scripts/load-test.sh: minimal harness — k3d cluster + NATS via
  NatsBasicScore + CRD + operator + load-test binary. No VM, no
  agent build.

- operator: connect_with_retry() on startup. The NATS TCP probe that
  the smoke scripts do isn't enough to guarantee the protocol
  handshake is ready (k3d loadbalancer can accept SYNs before the
  pod is serving); the load harness hit this racing against a
  freshly-rebuilt operator binary.

- drop unused rand dep from iot-agent-v0 Cargo.toml.

100-device run: 6002 state writes in 60s at a clean 100 writes/s,
all 10 CR aggregates converge to target_devices.len() (e.g.
group-00 → 55 = 45 Running + 9 Failed + 1 Pending).
Sequential apply was fine at 10 groups; becomes the startup bottleneck
at 1000. 32-way concurrent CR apply lands 1000 Deployment CRs in ~1.6s;
64-way concurrent DeviceInfo seed seeds 10k devices in ~0.3s.

Also zero-pad CR names and device ids to the largest width so large
runs sort lexicographically in kubectl.
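
The concurrency pattern, sketched with futures' buffer_unordered (the per-CR apply is stubbed; 32 is the CR-apply width noted above):

```rust
// Sketch of the 32-way concurrent CR apply; apply_deployment_cr stands
// in for the real kube apply of one Deployment CR.
use futures::{stream, StreamExt};

async fn apply_crs_concurrently(names: Vec<String>) {
    stream::iter(names)
        .map(|name| async move {
            apply_deployment_cr(&name).await;
        })
        .buffer_unordered(32) // at most 32 applies in flight
        .collect::<Vec<_>>()
        .await;
}

async fn apply_deployment_cr(_name: &str) {
    // Placeholder for the kube server-side apply of a single CR.
}

// Zero-pad names so large runs sort lexicographically in kubectl.
fn padded_name(prefix: &str, idx: usize, width: usize) -> String {
    format!("{prefix}-{idx:0width$}")
}
```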
feat(iot-load-test): stable paths + HOLD=1 interactive mode
- Stable working dir under /tmp/iot-load-test/ — kubeconfig at
  /tmp/iot-load-test/kubeconfig, operator log at
  /tmp/iot-load-test/operator.log. No more chasing mktemp paths.

- Print an explore banner before the load run so the user can
  `export KUBECONFIG=...` and `kubectl get deployments -w` in
  another terminal while the load actually runs.

- HOLD=1 env var keeps the stack alive after the load completes;
  script blocks on sleep until Ctrl-C. Forwards --keep to the
  binary so CRs + KV entries stay in place for inspection.

- DEBUG=1 bumps operator RUST_LOG to surface every status patch.

- Keep operator.log after successful runs (cheap, often useful).

- Load-test binary: --cleanup bool → --keep flag (clap bool with
  default_value_t = true doesn't accept `--cleanup=false`).
Superseded by #275 and #276
johnride closed this pull request 2026-04-25 13:55:40 +00:00
