harmony/fleet/requests_over_nats.md
Jean-Gabriel Gill-Couture d013246a68 feat(fleet): request/reply commands over NATS — wire types, agent server, operator client, e2e harness
First slice of the device-commands.* protocol from
fleet/requests_over_nats.md. Lands `Verb::Ping` plus the harness that
proves it works against a real in-cluster agent.

Wire types (`harmony-reconciler-contracts::commands`):
- `Verb::Ping`, `CommandRequest`, `PingReply`, `ErrorReply`/`ErrorKind`
- `device_command_subject` / `device_command_subscription` helpers
- `X-Harmony-*` header constants

Agent:
- `command_server.rs` subscribes on `device-commands.<id>.>` and
  dispatches verbs; ping handler replies with `PingReply`
- New `[agent].runtime_enabled` config flag (default true). When
  false, podman init + reconciler loop are skipped so the agent can
  run as a Pod on containerd-only k3d nodes; command server +
  heartbeat still run
- `Dockerfile`: canonical multi-stage build for production registries

Operator:
- `commands::FleetCommandsClient` with typed `CommandError`
  (`DeviceOffline` via `no_responders`, `Timeout`, `BadReply`, `Nats`)

E2E harness (`harmony-fleet-e2e`):
- Library crate + integration test. `Stack::bring_up` provisions a
  fresh `e2e-<uuid8>` namespace in a shared `fleet-e2e` k3d cluster,
  deploys NATS (UserPass auth, JetStream on) + the agent Pod, returns
  a connected admin NATS client, and tears the namespace down on Drop
- v1 ships `AuthMode::UserPass` only; the `Callout` variant is
  reserved on the public API for the follow-up PR that adds the mock
  OIDC fixture + NatsAuthCalloutScore deployment
- Operator pod deployment is also follow-up — for ping the test
  process drives `FleetCommandsClient` directly against the cluster's
  NATS NodePort
- `HARMONY_FLEET_E2E=1` gates the integration test so default
  `cargo test --workspace` runs don't depend on k3d/podman
- Image build + sideload mirrors the `fleet_auth_callout` pattern:
  host `cargo build --release` → single-stage Dockerfile → `podman
  build` → `k3d image import`. ~12s warm bring-up, ~80s cold
2026-05-18 09:47:36 -04:00


I am building Harmony fleet, which is basically an IoT fleet platform to deploy applications and run commands on devices. I will also use it for Harmony to manage the distributed clusters that we installed in homes or office buildings. We're now at the point where it works: we can easily set up an agent on a device, and the device joins using Zitadel for authentication. An administrator launches the agent, which creates the service account in Zitadel; then the device can connect to the NATS server through our NATS auth callout service, which validates the Zitadel login through NATS directly. So the only endpoint the agent has access to is NATS, which brings me to my question. How can I design an efficient way for the control plane (the operator, which is also connected to NATS) to send a command or query to an agent and have the agent reply to the operator? I want to execute commands like run an ad hoc script, maybe even open a TTY, or run a command and send logs, or send statistics about the current status. I'm spelling it out, I think. What I could do is have every device listen on a dedicated channel that is not a KV stream like the other stuff. For deployments we use JetStream, and for device desired state, last state and things like that we use JetStream as well. But for commands, we could have a separate NATS topic that the operator can connect to, because the operator has rights to connect to every channel of every device, and we use NATS to build a dynamic set of permissions. So we could simply add another channel where the operator could communicate: when the operator writes to it, the agent receives it and can reply back on the same topic, and the operator receives the reply and knows which device it is from; the exchange can be identified by a request ID or something like that. I'm sure there are existing implementations that do similar things with streams instead of REST APIs.
So basically, how do I implement something that feels more or less like a REST API on top of a stream connection like NATS?


Good question — and yes, this is a well-trodden path. NATS has first-class primitives for exactly this. You don't need to invent a request/response layer on top of streams; you want core NATS request-reply (not JetStream) for the command channel, with JetStream reserved for what it's good at (durable desired/last state, deployment manifests, audit logs).

Here's the shape I'd recommend.

Subject hierarchy

Use a structured subject space so your NATS permissions stay clean and operator→device routing is just a subject pattern:

harmony.device.<device_id>.cmd.<verb>      # operator → agent (request)
harmony.device.<device_id>.cmd.<verb>.ack  # optional progress/streaming
harmony.device.<device_id>.evt.<kind>      # agent → world (telemetry, unsolicited)
harmony.device.<device_id>.tty.<session>   # interactive PTY (bidirectional)

Verbs map to your command types: exec, logs, stat, pty.open, pty.input, pty.resize, pty.close, restart, etc.
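
As a rough sketch of the subject layout above, the helpers below build and validate command subjects. The function names (`cmd_subject`, `cmd_subscription`, `is_valid_token`) are hypothetical and follow the `harmony.device.*` layout proposed in this answer, not the `device-commands.*` helpers that shipped in the commit.

```rust
/// A single NATS subject token may not be empty or contain
/// `.`, `*`, `>` or whitespace.
fn is_valid_token(token: &str) -> bool {
    !token.is_empty()
        && !token
            .chars()
            .any(|c| c == '.' || c == '*' || c == '>' || c.is_whitespace())
}

/// Builds `harmony.device.<device_id>.cmd.<verb>`.
/// The verb may itself be dotted (for example `pty.open`).
pub fn cmd_subject(device_id: &str, verb: &str) -> Option<String> {
    if !is_valid_token(device_id) || !verb.split('.').all(is_valid_token) {
        return None;
    }
    Some(format!("harmony.device.{device_id}.cmd.{verb}"))
}

/// Subscription an agent would hold to receive all of its own commands.
pub fn cmd_subscription(device_id: &str) -> Option<String> {
    is_valid_token(device_id).then(|| format!("harmony.device.{device_id}.cmd.>"))
}
```

Rejecting wildcards and dots in `device_id` keeps a malformed or hostile ID from widening the subject it ends up in.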

Agent permissions (per device, derived from the Zitadel service account claims in your auth callout):

  • sub: harmony.device.<device_id>.cmd.>
  • pub: harmony.device.<device_id>.evt.>, plus replies to inboxes (_INBOX.>)

Operator permissions:

  • pub: harmony.device.*.cmd.>
  • sub: harmony.device.*.evt.>, _INBOX.>

The auth callout already gives you the device_id; bake it into the permission template at issuance time. That's your whole authz model.
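
To make the permission patterns concrete, here is a minimal sketch of NATS wildcard matching, where `*` matches exactly one token and a trailing `>` matches one or more. The server does this matching itself; `subject_matches` is a hypothetical helper that is only useful for unit-testing a permission template before issuing it.

```rust
/// Returns true if the concrete `subject` is matched by `pattern`.
/// `*` matches exactly one token; `>` must be the last pattern token
/// and matches one or more remaining tokens.
pub fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut pat = pattern.split('.');
    let mut sub = subject.split('.');
    loop {
        match (pat.next(), sub.next()) {
            // `>` needs at least one subject token and nothing after it.
            (Some(">"), Some(_)) => return pat.next().is_none(),
            (Some("*"), Some(_)) => continue,
            (Some(p), Some(s)) if p == s => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}
```

With this, the operator pattern `harmony.device.*.cmd.>` covers every device's commands, while an agent pattern with a literal device ID covers only its own.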

Request/reply: core NATS, not JetStream

For commands, use nc.Request(subject, payload) (or request_multi if you want streaming replies). NATS handles this natively:

  1. Operator publishes to harmony.device.dev-abc.cmd.exec with reply=_INBOX.xyz.
  2. Agent has a queue subscription on harmony.device.<my_id>.cmd.>. It receives the message, runs the work, publishes the result to _INBOX.xyz.
  3. Operator's request future resolves. Done. No stream, no consumer, no acks to manage.

This is the REST-feeling layer you're describing. Each request gets a unique inbox subject, NATS routes the reply back to the originating operator, and you get request/response semantics with millisecond latency.

You do not need to invent request IDs for correlation — the inbox subject IS the correlation. Put your own request_id in headers anyway for logging/tracing, but routing is handled.

When you need streaming (logs, exec stdout, PTY)

Two patterns, pick per use case:

Pattern A — single request, multiple replies (requestMany / request_multi): operator sends one request, agent publishes N messages to the reply inbox, then a sentinel (empty payload + Status: 204 header, or your own final=true header). Good for logs --follow up to a point, exec with streamed stdout, bounded result sets.

Pattern B — bidirectional session subject: for interactive PTY, allocate a session subject like harmony.device.<id>.tty.<session_uuid>. Both sides sub/pub on it. The initial pty.open cmd negotiates the session ID (return it in the reply), then both sides switch to the session subject. Tear it down with pty.close or a TTL. This is how kubectl exec / SSH-over-NATS implementations typically work.

For Pattern B, grant the agent temporary pub/sub on harmony.device.<id>.tty.<session>.> — either by including session subjects in its base permissions template, or by using NATS 2.10+ scoped permissions if you want tighter isolation.
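
Pattern A's "read until the sentinel" loop can be sketched without any NATS dependency. Here a `std::sync::mpsc` receiver stands in for the reply inbox subscription, and the `Reply` struct with its `is_final` flag is a hypothetical stand-in for the `final=true` header; a real implementation would read messages from the client's subscription instead.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// One message arriving on the reply inbox (hypothetical shape).
pub struct Reply {
    pub payload: Vec<u8>,
    /// Stands in for a `final=true` header on the last message.
    pub is_final: bool,
}

#[derive(Debug, PartialEq)]
pub enum StreamEnd {
    /// The sentinel was seen.
    Completed,
    /// No message arrived within the per-message timeout.
    Stalled,
    /// The sending side went away without a sentinel.
    Closed,
}

/// Collects non-empty payloads until the sentinel, a stall, or a disconnect.
pub fn drain_until_final(
    inbox: &Receiver<Reply>,
    per_msg_timeout: Duration,
) -> (Vec<Vec<u8>>, StreamEnd) {
    let mut chunks = Vec::new();
    loop {
        match inbox.recv_timeout(per_msg_timeout) {
            Ok(reply) => {
                if !reply.payload.is_empty() {
                    chunks.push(reply.payload);
                }
                if reply.is_final {
                    return (chunks, StreamEnd::Completed);
                }
            }
            Err(RecvTimeoutError::Timeout) => return (chunks, StreamEnd::Stalled),
            Err(RecvTimeoutError::Disconnected) => return (chunks, StreamEnd::Closed),
        }
    }
}
```

Using a per-message timeout rather than one overall deadline lets a long `logs --follow` keep going as long as data arrives, while still detecting an agent that stopped sending.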

Where JetStream still belongs in the command path

Not in the request/reply itself, but adjacent:

  • Command audit log: agent (or a sidecar consumer) republishes completed commands + results to a JS stream HARMONY_AUDIT keyed by harmony.audit.device.<id>.<verb>. Operators query history without bothering the device.
  • Deferred/offline commands: if you need "run this when the device comes back online," publish to a JS stream HARMONY_CMD_QUEUE with a per-device consumer the agent pulls from on connect. This is a separate flow from interactive commands — don't conflate them. Interactive = core request/reply, deferred = JS work queue.
  • Long-running job results: if a command takes minutes, return a job_id immediately from the request/reply, then have the agent stream progress to harmony.device.<id>.evt.job.<job_id> (which you can persist in JS if you want resumability).

Concrete recommendations

  1. Use async-nats (Rust) — client.request() returns a Message, client.request_with_headers() lets you carry trace IDs, deadlines, etc. For streaming, use request-many semantics: subscribe to a fresh inbox, publish with that inbox as the reply subject, and read until the sentinel.
  2. Put a deadline header on every command. This is an application-defined header, so pick your own name (for example under your X-Harmony-* prefix). The agent should refuse to start work if it's past the deadline (clock skew tolerance ~5s). This saves you from zombie commands when an operator disconnects.
  3. Use queue groups on the agent side only if you ever run multiple agent processes per device (you probably won't, but designing for it costs nothing — queue group name = device_id).
  4. Headers carry: request_id, operator_sub (from JWT, for audit), deadline, command_version. Body is your command-specific payload (protobuf or msgpack — avoid JSON for PTY streams).
  5. Don't put the PTY through JetStream. People try this and regret it. JS adds latency and persistence you don't want for keystrokes. Core NATS only, ephemeral subject, session dies with the connection.
  6. For discovery / "is device online", use NATS service framework (the services API in async-nats) or just a heartbeat to harmony.device.<id>.evt.heartbeat + a presence KV that the agent updates with a short TTL. The latter integrates nicely with your existing desired/last-state KV.
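
The deadline rule from item 2 can be sketched as a pure function. The header format (Unix time in milliseconds) and the names `check_deadline` and `DeadlineCheck` are assumptions for illustration; only the ~5s skew tolerance comes from the list above.

```rust
/// Clock-skew allowance between operator and agent (~5s as suggested above).
pub const SKEW_TOLERANCE_MS: u64 = 5_000;

#[derive(Debug, PartialEq)]
pub enum DeadlineCheck {
    /// Start the work. `remaining_ms` is 0 when the deadline has passed
    /// but the request is still inside the skew window.
    Proceed { remaining_ms: u64 },
    /// Past the deadline plus tolerance: refuse to start.
    Expired,
    /// Header missing or not a number.
    Invalid,
}

/// `header` is the raw deadline header value, assumed here to be
/// Unix time in milliseconds.
pub fn check_deadline(header: Option<&str>, now_unix_ms: u64) -> DeadlineCheck {
    let Some(raw) = header else {
        return DeadlineCheck::Invalid;
    };
    let Ok(deadline) = raw.trim().parse::<u64>() else {
        return DeadlineCheck::Invalid;
    };
    if now_unix_ms > deadline.saturating_add(SKEW_TOLERANCE_MS) {
        DeadlineCheck::Expired
    } else {
        DeadlineCheck::Proceed {
            remaining_ms: deadline.saturating_sub(now_unix_ms),
        }
    }
}
```

Whether a missing header should be rejected (as here) or treated as "no deadline" is a policy choice; rejecting is the stricter reading of "on every command".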

What this gets you

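  • REST-like ergonomics (operator.exec(device_id, cmd).await) with latency close to the network round trip (sub-10ms on a local network; more for devices behind home or office links).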
  • No consumer management for the hot path.
  • Permissions enforced entirely by subject patterns — no app-level authz checks beyond what the JWT already grants.
  • JetStream still does what it's good at (audit, deferred work, state) without being on the critical path for interactive ops.
  • Trivially horizontally scalable on the operator side (any operator can talk to any device; replies route back to the right one via inbox).

The mental model: NATS subjects are your URL space, request/reply is your HTTP, JetStream is your database. Don't blur those layers.

One thing worth checking before you commit: the service module in async-nats (the nats.rs client; nats.go has the equivalent micro package) gives you a ready-made request/reply server abstraction with built-in stats, schema endpoints, and $SRV.PING-style discovery. It might save you writing the agent-side command dispatcher from scratch, so it's worth a look even if you end up not using it.


Yep, that sounds exactly like what I need. I think I remember reading about that a few months ago, but now I'm wondering whether I should dive a bit deeper first and find the official NATS docs and Synadia videos so I can review them in depth before implementing it. I've been writing code with language models for a couple of weeks, but now I need to really get a grip on it, so I'll write that part of the code manually. This will probably be the foundation for the last couple of features before we ship Harmony fleet to production for the first customers. My intuition is that the NATS request implementation could very well be client-side only. I don't see any server-side feature, because basically all it does is create a topic (yeah, let's call it a topic), send a message there, probably with a request ID inside the message, and then wait until it receives the message with that request ID back by listening on that topic; or it sends a first handshake message and then creates a temporary topic for this request specifically. Anyway, I don't see any server-side feature required for that, unlike, for example, JetStream, which requires persistence on the server side. So do in-depth research on that. Provide me with documentation links and video links, but also an answer with hard proof about whether my intuition is correct or not.


I'll dig into this. Your intuition is worth verifying against the actual protocol spec, not just docs prose. Good, the docs already confirm the core picture. Let me dig into the protocol spec to verify the no-server-feature claim definitively, then look at the services framework and PTY-over-NATS implementations. Perfect, this nails the protocol-level proof. Now let me look at the services framework and the no_responders detail, plus the Synadia talks/videos for review material. Excellent. Now let me confirm the no_responders mechanism and look at the no_responders headers feature. This is very interesting: no_responders is one server feature, and the .NET shared-inbox mode confirms how the client side optimizes things. Let me check on PTY/shell-over-NATS examples and find Synadia videos for review material. Let me also check the Synadia "Supercharged Microservices" workshop and the original "Hello NATS" series for primary review material, plus look at the .NET shared-inbox optimization to understand client-side implementation differences. I now have everything I need. The protocol-level evidence is conclusive. Let me give you the verdict with hard proof, the small footnote where you're slightly off, and a curated review list.


Your intuition: correct, with one server-side footnote

Hard proof from the protocol spec. For messaging, the NATS wire protocol gives a client only a handful of operations: PUB and SUB/UNSUB going to the server, and MSG coming back (plus HPUB/HMSG for the header variants). The PUB line is:

PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n

That optional reply-to field is the entire mechanism. Per the protocol documentation, the PUB message publishes the message payload to the given subject name, optionally supplying a reply subject; if a reply subject is supplied, it will be delivered to eligible subscribers along with the supplied payload. On the receiving side, MSG carries the same optional field forward: MSG <subject> <sid> [reply-to] <#bytes>, where reply-to is the subject on which the publisher is listening for responses.

That's it. There is no REQUEST verb, no REPLY verb, no special server-side acknowledgement, no broker state for the in-flight request. The server's job is unchanged: route a message published on subject X to anyone subscribed to X. The fact that the message happens to carry a reply-to subject is just another field it forwards. The server has no idea this is a "request."
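
To see how little is involved, here is a small sketch that encodes a PUB frame and parses a MSG control line exactly as laid out above. The function and type names are hypothetical, and the parser handles only the plain MSG form (not HMSG).

```rust
/// Encodes `PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n`.
pub fn encode_pub(subject: &str, reply_to: Option<&str>, payload: &[u8]) -> Vec<u8> {
    let control = match reply_to {
        Some(reply) => format!("PUB {subject} {reply} {}\r\n", payload.len()),
        None => format!("PUB {subject} {}\r\n", payload.len()),
    };
    let mut frame = control.into_bytes();
    frame.extend_from_slice(payload);
    frame.extend_from_slice(b"\r\n");
    frame
}

#[derive(Debug, PartialEq)]
pub struct MsgLine<'a> {
    pub subject: &'a str,
    pub sid: &'a str,
    pub reply_to: Option<&'a str>,
    pub len: usize,
}

/// Parses `MSG <subject> <sid> [reply-to] <#bytes>` (control line only,
/// without the trailing CRLF or the payload).
pub fn parse_msg_line(line: &str) -> Option<MsgLine<'_>> {
    let parts: Vec<&str> = line.split_whitespace().collect();
    if parts.first() != Some(&"MSG") {
        return None;
    }
    // Four fields means no reply-to; five means reply-to is present.
    let (reply_to, len) = match parts.len() {
        4 => (None, parts[3]),
        5 => (Some(parts[3]), parts[4]),
        _ => return None,
    };
    Some(MsgLine {
        subject: parts[1],
        sid: parts[2],
        reply_to,
        len: len.parse().ok()?,
    })
}
```

The only difference between a fire-and-forget publish and a "request" is whether the optional second field is present.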

What clients actually do, confirmed by the NATS FAQ: Request() is simply a convenience API that does this for you in a pseudo-synchronous fashion, using a timeout supplied by you. It creates an INBOX (a type of subject that is unique to the requestor), subscribes to it, then publishes your request message with the reply address set to the inbox subject. It will then wait for a response, or the timeout period to elapse, whichever comes first.

So your mental model — "create a topic, send a message with reply-to set, listen on that topic for the response, correlate by request ID if needed" — is exactly what every client library does under the hood. You could implement it with telnet and a pencil.
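
The same flow can be simulated in memory to show that the "server" needs no request-specific logic. In this sketch `Bus` is a hypothetical stand-in for the server that only routes by exact subject and forwards the reply-to field; the agent is called inline because the example is single-threaded, whereas a real agent runs concurrently on its own connection.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

#[derive(Clone, Debug, PartialEq)]
pub struct Msg {
    pub subject: String,
    pub reply_to: Option<String>,
    pub payload: Vec<u8>,
}

/// Stand-in for the server: exact-subject routing, no concept of a "request".
#[derive(Default)]
pub struct Bus {
    subs: HashMap<String, Vec<Sender<Msg>>>,
}

impl Bus {
    pub fn subscribe(&mut self, subject: &str) -> Receiver<Msg> {
        let (tx, rx) = channel();
        self.subs.entry(subject.to_string()).or_default().push(tx);
        rx
    }

    pub fn unsubscribe_all(&mut self, subject: &str) {
        self.subs.remove(subject);
    }

    /// Forwards the message, reply-to included, and returns the delivery count.
    pub fn publish(&self, subject: &str, reply_to: Option<&str>, payload: &[u8]) -> usize {
        let msg = Msg {
            subject: subject.to_string(),
            reply_to: reply_to.map(str::to_string),
            payload: payload.to_vec(),
        };
        match self.subs.get(subject) {
            Some(txs) => txs.iter().filter(|tx| tx.send(msg.clone()).is_ok()).count(),
            None => 0,
        }
    }
}

/// Agent side: take one command and answer on whatever reply subject it carries.
pub fn agent_serve_one(bus: &Bus, commands: &Receiver<Msg>) {
    if let Ok(cmd) = commands.try_recv() {
        if let Some(reply) = cmd.reply_to.as_deref() {
            let mut body = b"ran: ".to_vec();
            body.extend_from_slice(&cmd.payload);
            bus.publish(reply, None, &body);
        }
    }
}

/// Operator side: SUB inbox, PUB with reply-to, read the reply, drop the SUB.
pub fn naive_request(
    bus: &mut Bus,
    agent_rx: &Receiver<Msg>,
    subject: &str,
    inbox: &str,
    payload: &[u8],
) -> Option<Msg> {
    let inbox_rx = bus.subscribe(inbox);
    bus.publish(subject, Some(inbox), payload);
    agent_serve_one(bus, agent_rx); // inline only because this sketch is single-threaded
    let reply = inbox_rx.try_recv().ok();
    bus.unsubscribe_all(inbox);
    reply
}
```

Nothing in `Bus` knows about requests; the request/reply behavior lives entirely in `naive_request` and `agent_serve_one`.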

Two refinements worth knowing:

  1. Inbox prefix optimization (client-side only — confirms your intuition further). Naive implementations create and tear down a subscription per request, which floods the interest graph across cluster routes and gateways. From the nats.c issue that originally fixed this: Request() will now setup a subscription on a wildcard inbox and each request will add a token to the inbox prefix. A map is then used to "dispatch" the reply to the corresponding request call. So modern clients open one subscription on _INBOX.<conn_uuid>.* for the connection's lifetime and use the last token to demultiplex replies. .NET calls this SharedInbox Mode (Default): each request creates a temporary subscription using a unique inbox subject. Direct mode uses a pre-established subscription and message correlation via numeric IDs for higher performance request-reply operations. Pure client-side optimization, zero server involvement.

  2. The one tiny server-side feature: no_responders. This is the only place where your "pure client-side" claim isn't 100% true. When a request is sent to a subject that has no subscribers, it can be convenient to know about it right away. For this use-case, a NATS client can opt-into no_responder messages. This requires a server and client that support headers. When enabled, a request sent to a subject with no subscribers will immediately receive a reply that has no body, and a 503 status. The client signals support in CONNECT, and the server short-circuits the timeout by synthesizing a 503 reply when the subject has zero interest. Without this, your request would just hang until your timeout — which is what early NATS did, and what a "pure client" implementation gives you. It's a small server optimization that's been default since NATS 2.2.0.
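
The wildcard-inbox dispatch from the first refinement can be sketched as a small demultiplexer. `InboxMux` is a hypothetical name, and the sequential counter is a simplification; real clients generate random tokens so that reply subjects are not guessable.

```rust
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};

/// Demultiplexes replies arriving on one wildcard subscription `<prefix>.*`.
pub struct InboxMux {
    prefix: String,
    next_token: u64,
    pending: HashMap<String, Sender<Vec<u8>>>,
}

impl InboxMux {
    pub fn new(prefix: &str) -> Self {
        Self {
            prefix: prefix.to_string(),
            next_token: 0,
            pending: HashMap::new(),
        }
    }

    /// Registers a request. Returns the reply subject to publish with
    /// and the receiver on which the reply will show up.
    pub fn register(&mut self) -> (String, Receiver<Vec<u8>>) {
        self.next_token += 1;
        let token = self.next_token.to_string();
        let (tx, rx) = channel();
        let subject = format!("{}.{}", self.prefix, token);
        self.pending.insert(token, tx);
        (subject, rx)
    }

    /// Called for every message from the wildcard subscription.
    /// Returns false for unknown, duplicate, or late replies.
    pub fn dispatch(&mut self, subject: &str, payload: Vec<u8>) -> bool {
        let Some(token) = subject
            .strip_prefix(self.prefix.as_str())
            .and_then(|rest| rest.strip_prefix('.'))
        else {
            return false;
        };
        match self.pending.remove(token) {
            Some(tx) => tx.send(payload).is_ok(),
            None => false,
        }
    }
}
```

One long-lived subscription plus a map lookup replaces a subscribe/unsubscribe pair per request, and none of it is visible to the server.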

For Harmony fleet, this 503 path is actually useful: if you fire an exec command at a device that isn't connected, you find out in milliseconds rather than at your deadline. You want clients with no_responders support.
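
On the wire, the no_responders reply is a header-only message whose header block starts with a `NATS/1.0 503` status line. A minimal sketch of turning that into a "device offline" signal follows; `ReplyKind` and `classify_reply` are hypothetical names, `X-Harmony-Request-Id` in the test is a made-up header, and client libraries normally surface this condition for you rather than exposing the raw header block.

```rust
#[derive(Debug, PartialEq)]
pub enum ReplyKind {
    /// Ordinary reply carrying application data.
    Data,
    /// Empty 503: nobody is subscribed to the command subject.
    DeviceOffline,
    /// Some other status code on the version line.
    OtherStatus(u16),
    /// Header block does not start with the expected version line.
    Malformed,
}

/// `headers` is the raw header block of an HMSG, or None for a plain MSG.
pub fn classify_reply(headers: Option<&str>, payload: &[u8]) -> ReplyKind {
    let Some(block) = headers else {
        return ReplyKind::Data;
    };
    let first_line = block.lines().next().unwrap_or("");
    let Some(rest) = first_line.strip_prefix("NATS/1.0") else {
        return ReplyKind::Malformed;
    };
    match rest.split_whitespace().next() {
        // Headers present but no status code: a normal reply.
        None => ReplyKind::Data,
        Some("503") if payload.is_empty() => ReplyKind::DeviceOffline,
        Some(code) => code
            .parse::<u16>()
            .map_or(ReplyKind::Malformed, ReplyKind::OtherStatus),
    }
}
```

This is the distinction the operator client in the commit above draws with its `DeviceOffline` versus `Timeout` errors: an immediate 503 means no agent is listening, while a timeout means something was listening but did not answer in time.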

So the corrected statement of your intuition:

Request/reply is fundamentally a client-side pattern built from PUB with a reply-to + SUB on an inbox subject. The server's only request/reply-specific behavior is the optional no_responders short-circuit (503 on a subject with zero interest), which is a latency optimization — not a correctness requirement. Everything else is just regular pub/sub.

That matches the JetStream contrast you drew: JetStream is genuinely server-side because it adds persistence, consumer state, sequence tracking, dedup windows, and acks. Request/reply adds none of that.


Review material, ranked by what'll be most useful before you write code

Read first (protocol-level proof and conceptual grounding):

Then read (Rust-specific, since you'll write the agent and operator in Rust):

Videos (for review, not deep study):

  • "Deep Dive: NATS" — Colin Sullivan & Wally Quevedo (KubeCon) — older but the fundamentals haven't changed. https://www.youtube.com/watch?v=wBJj0UbcZuU
  • "Supercharged Microservices with NATS", Jeremy Saenz (Synadia): replay of a 3+ hour hands-on workshop and deep dive on building applications with a microservice-centric architecture on NATS; it covers the micro/services framework directly. Search the Synadia YouTube channel for the latest upload. https://www.youtube.com/@SynadiaCommunications/videos
  • Developer Voices podcast: "Inter-System Messaging with NATS & JetStream" — Kris Jenkins interviewing Jeremy Saenz, audio-friendly for car/walk time.

Worth a glance for design ideas:

  • Synadia "Orbit.go" guide — shows the natsext module's RequestMany for scatter-gather and multi-response streaming. Useful conceptually even though it's Go; the Rust equivalent (request_multi / building it yourself with an inbox sub) follows the same model. https://www.synadia.com/blog/orbit-go-guide

One pragmatic note for when you sit down to write it

When you implement the operator → agent command path manually in Rust to really understand it, do this order:

  1. First write the naive version: per-request client.subscribe(new_inbox).await, then client.publish_with_reply(), then subscriber.next().await, then drop the subscription. This is what the protocol literally describes and what you intuited. It works. You'll see every byte of the flow.
  2. Then switch to client.request() and observe (via nats sub '>' or nats traffic) that the wire trace is nearly identical except your client now has one persistent _INBOX.<uuid>.* subscription.
  3. Then layer headers on for request_id, deadline, operator_sub.
  4. Then decide whether to adopt the service_builder framework on the agent side — it's a nice-to-have for ping/stats/discovery, but you can skip it and use plain subscriptions if you want full control.

You'll have the foundation you described internalized in an afternoon. The protocol is small enough that the magic disappears completely once you've seen the bytes.