First slice of the device-commands.* protocol from fleet/requests_over_nats.md. Lands `Verb::Ping` plus the harness that proves it works against a real in-cluster agent. Wire types (`harmony-reconciler-contracts::commands`): - `Verb::Ping`, `CommandRequest`, `PingReply`, `ErrorReply`/`ErrorKind` - `device_command_subject` / `device_command_subscription` helpers - `X-Harmony-*` header constants Agent: - `command_server.rs` subscribes on `device-commands.<id>.>` and dispatches verbs; ping handler replies with `PingReply` - New `[agent].runtime_enabled` config flag (default true). When false, podman init + reconciler loop are skipped so the agent can run as a Pod on containerd-only k3d nodes; command server + heartbeat still run - `Dockerfile`: canonical multi-stage build for production registries Operator: - `commands::FleetCommandsClient` with typed `CommandError` (`DeviceOffline` via `no_responders`, `Timeout`, `BadReply`, `Nats`) E2E harness (`harmony-fleet-e2e`): - Library crate + integration test. `Stack::bring_up` provisions a fresh `e2e-<uuid8>` namespace in a shared `fleet-e2e` k3d cluster, deploys NATS (UserPass auth, JetStream on) + the agent Pod, returns a connected admin NATS client, and tears the namespace down on Drop - v1 ships `AuthMode::UserPass` only; the `Callout` variant is reserved on the public API for the follow-up PR that adds the mock OIDC fixture + NatsAuthCalloutScore deployment - Operator pod deployment is also follow-up — for ping the test process drives `FleetCommandsClient` directly against the cluster's NATS NodePort - `HARMONY_FLEET_E2E=1` gates the integration test so default `cargo test --workspace` runs don't depend on k3d/podman - Image build + sideload mirrors the `fleet_auth_callout` pattern: host `cargo build --release` → single-stage Dockerfile → `podman build` → `k3d image import`. ~12s warm bring-up, ~80s cold
20 KiB
I am building Harmony fleet which is basically an iot fleet platform to deploy applications and run commands on the devices. I also will use it for Harmony to manage clusters everywhere distributed clusters that we installed in homes or office buildings and then we're now at the point where it works. We can easily set up an agent on a device and the device joins by using zitadel for authentication. So an administrator will launch the agent and this creates the service account in zitadel than the device is can join the Nats connect to the Nats server using our nats auth. Call out service that validates the zitadel login through Nats directly. So the only endpoint that the agent has access to is Nats which brings me to my question. How can I design an efficient way for the control plane? The operator that is also connected to Nats to send a command or query to an agent and then the agent gets back the operator I want to execute commands like run adhocrypt or maybe even open atty or run a command and send logs. Send statistics about the current status and I'm spelling it out out I think. And what I could do is that every device gets listens on a dedicated channel that is not KV stream like the other stuff. For deployments we use jet stream and for device desired State and last state and stuff like that we use jet stream for that. But for commands, we could have a separate nuts topic that the operator can connect to because operator has rights to connect to every device every channel of every device. And we use Nats to build a dynamic set of permission. So we could simply add another Channel where the operator could communicate so Wendy Alberto writes to it. The agent receives it and then can reply back on this same topic and the other operator receives it and know which device it is for and then it can be identified by request ID or something like that. I'm sure there are existing implementations that do similar stuff with streams instead of arrest apis. So basically how do I implement something that feels more or less like a rest API on top of a stream connection like Nats?
Good question — and yes, this is a well-trodden path. NATS has first-class primitives for exactly this. You don't need to invent a request/response layer on top of streams; you want core NATS request-reply (not JetStream) for the command channel, with JetStream reserved for what it's good at (durable desired/last state, deployment manifests, audit logs).
Here's the shape I'd recommend.
Subject hierarchy
Use a structured subject space so your NATS permissions stay clean and operator→device routing is just a subject pattern:
harmony.device.<device_id>.cmd.<verb> # operator → agent (request)
harmony.device.<device_id>.cmd.<verb>.ack # optional progress/streaming
harmony.device.<device_id>.evt.<kind> # agent → world (telemetry, unsolicited)
harmony.device.<device_id>.tty.<session> # interactive PTY (bidirectional)
Verbs map to your command types: exec, logs, stat, pty.open, pty.input, pty.resize, pty.close, restart, etc.
Agent permissions (per device, derived from the Zitadel service account claims in your auth callout):
- sub:
harmony.device.<device_id>.cmd.> - pub:
harmony.device.<device_id>.evt.>, plus replies to inboxes (_INBOX.>)
Operator permissions:
- pub:
harmony.device.*.cmd.> - sub:
harmony.device.*.evt.>,_INBOX.>
The auth callout already gives you the device_id; bake it into the permission template at issuance time. That's your whole authz model.
Request/reply: core NATS, not JetStream
For commands, use nc.Request(subject, payload) (or request_multi if you want streaming replies). NATS handles this natively:
- Operator publishes to
harmony.device.dev-abc.cmd.execwithreply=_INBOX.xyz. - Agent has a queue subscription on
harmony.device.<my_id>.cmd.>. It receives the message, runs the work, publishes the result to_INBOX.xyz. - Operator's request future resolves. Done. No stream, no consumer, no acks to manage.
This is the REST-feeling layer you're describing. Each request gets a unique inbox subject, NATS routes the reply back to the originating operator, and you get request/response semantics with millisecond latency.
You do not need to invent request IDs for correlation — the inbox subject IS the correlation. Put your own request_id in headers anyway for logging/tracing, but routing is handled.
When you need streaming (logs, exec stdout, PTY)
Two patterns, pick per use case:
Pattern A — single request, multiple replies (requestMany / request_multi): operator sends one request, agent publishes N messages to the reply inbox, then a sentinel (empty payload + Status: 204 header, or your own final=true header). Good for logs --follow up to a point, exec with streamed stdout, bounded result sets.
Pattern B — bidirectional session subject: for interactive PTY, allocate a session subject like harmony.device.<id>.tty.<session_uuid>. Both sides sub/pub on it. The initial pty.open cmd negotiates the session ID (return it in the reply), then both sides switch to the session subject. Tear it down with pty.close or a TTL. This is how kubectl exec / SSH-over-NATS implementations typically work.
For Pattern B, grant the agent temporary pub/sub on harmony.device.<id>.tty.<session>.> — either by including session subjects in its base permissions template, or by using NATS 2.10+ scoped permissions if you want tighter isolation.
Where JetStream still belongs in the command path
Not in the request/reply itself, but adjacent:
- Command audit log: agent (or a sidecar consumer) republishes completed commands + results to a JS stream
HARMONY_AUDITkeyed byharmony.audit.device.<id>.<verb>. Operators query history without bothering the device. - Deferred/offline commands: if you need "run this when the device comes back online," publish to a JS stream
HARMONY_CMD_QUEUEwith a per-device consumer the agent pulls from on connect. This is a separate flow from interactive commands — don't conflate them. Interactive = core request/reply, deferred = JS work queue. - Long-running job results: if a command takes minutes, return a
job_idimmediately from the request/reply, then have the agent stream progress toharmony.device.<id>.evt.job.<job_id>(which you can persist in JS if you want resumability).
Concrete recommendations
- Use
async-nats(Rust) —client.request()returns aMessage,client.request_with_headers()lets you carry trace IDs, deadlines, etc. For streaming userequestMultisemantics: subscribe to a fresh inbox, publish with that inbox as reply, drain until sentinel. - Put a deadline header (
Nats-Expected-Reply-Deadlineor your own) on every command. Agent should refuse to start work if it's past deadline (clock skew tolerance ~5s). Saves you from zombie commands when an operator disconnects. - Use queue groups on the agent side only if you ever run multiple agent processes per device (you probably won't, but designing for it costs nothing — queue group name = device_id).
- Headers carry:
request_id,operator_sub(from JWT, for audit),deadline,command_version. Body is your command-specific payload (protobuf or msgpack — avoid JSON for PTY streams). - Don't put the PTY through JetStream. People try this and regret it. JS adds latency and persistence you don't want for keystrokes. Core NATS only, ephemeral subject, session dies with the connection.
- For discovery / "is device online", use NATS service framework (the
servicesAPI in async-nats) or just a heartbeat toharmony.device.<id>.evt.heartbeat+ a presence KV that the agent updates with a short TTL. The latter integrates nicely with your existing desired/last-state KV.
What this gets you
- REST-like ergonomics (
operator.exec(device_id, cmd).await) with sub-10ms RTT. - No consumer management for the hot path.
- Permissions enforced entirely by subject patterns — no app-level authz checks beyond what the JWT already grants.
- JetStream still does what it's good at (audit, deferred work, state) without being on the critical path for interactive ops.
- Trivially horizontally scalable on the operator side (any operator can talk to any device; replies route back to the right one via inbox).
The mental model: NATS subjects are your URL space, request/reply is your HTTP, JetStream is your database. Don't blur those layers.
One thing worth checking before you commit: the async-nats services module (nats.rs has it, nats.go definitely has it) gives you a ready-made request/reply server abstraction with built-in stats, schema endpoints, and $SRV.PING-style discovery. Might save you writing the agent-side command dispatcher from scratch — worth a look even if you end up not using it.
Yep, that sounds exactly like what I need. I think I remember reading about that a few months ago, but now I'm just wondering if I dive a bit deeper first find the official Nats Ducks and scenario videos so I can review them in depth before implementing it. I've been writing code with language models for a couple weeks, but now I need to really get a grip on it so I'll write that part of the code manually. Probably this will be the foundation for the last couple features before we ship Harmony fleet to production for the first customers and then my intuition is that the Nats request implementation could very well be client-side only. I don't see any server-side feature because basically all it does is create a topic. Yeah let's call it. The topic create a topic, send a message there with probably a request ID inside the message and then wait until it receives back the message with the request ID by listening on that topic or create, send the first handshake message and then create a temporary topic for this request specifically. Anyways, I don't see any server-side feature required for that, unlike for example jet stream that requires persistence on the server side. So make an in-depth research about that. Provide me with documentation, link, video links, but also an answer with hardproof about my intuition whether it's correct or not
I'll dig into this. Your intuition is worth verifying against the actual protocol spec, not just docs prose.Good, the docs already confirm the core picture. Let me dig into the protocol spec to verify the no-server-feature claim definitively, then look at the services framework and PTY-over-NATS implementations.Perfect — this nails the protocol-level proof. Now let me look at the services framework and the no_responders detail, plus the Synadia talks/videos for review material.Excellent. Now let me confirm the no_responders mechanism and look at the no_responders headers feature.This is very interesting — the no_responders is one server feature, and the .NET shared-inbox mode confirms how the client side optimizes things. Let me check on PTY/shell-over-NATS examples and find Synadia videos for review material.Let me also check the Synadia "Supercharged Microservices" workshop and the original "Hello NATS" series for primary review material, plus look at the .NET shared-inbox optimization to understand client-side implementation differences.I now have everything I need. The protocol-level evidence is conclusive. Let me give you the verdict with hard proof, the small footnote where you're slightly off, and a curated review list.
Your intuition: correct, with one server-side footnote
Hard proof from the protocol spec. The NATS wire protocol defines exactly three operations a client uses for messaging: PUB, SUB, and MSG (plus HPUB/HMSG for the header variants). The PUB line is:
PUB <subject> [reply-to] <#bytes>\r\n[payload]\r\n
That optional reply-to field is the entire mechanism. The PUB message publishes the message payload to the given subject name, optionally supplying a reply subject. If a reply subject is supplied, it will be delivered to eligible subscribers along with the supplied payload. On the receiving side, MSG carries the same optional field forward: MSG <subject> <sid> [reply-to] <#bytes> — the subject on which the publisher is listening for responses.
That's it. There is no REQUEST verb, no REPLY verb, no special server-side acknowledgement, no broker state for the in-flight request. The server's job is unchanged: route a message published on subject X to anyone subscribed to X. The fact that the message happens to carry a reply-to subject is just another field it forwards. The server has no idea this is a "request."
What clients actually do, confirmed by the NATS FAQ: Request() is simply a convenience API that does this for you in a pseudo-synchronous fashion, using a timeout supplied by you. It creates an INBOX (a type of subject that is unique to the requestor), subscribes to it, then publishes your request message with the reply address set to the inbox subject. It will then wait for a response, or the timeout period to elapse, whichever comes first.
So your mental model — "create a topic, send a message with reply-to set, listen on that topic for the response, correlate by request ID if needed" — is exactly what every client library does under the hood. You could implement it with telnet and a pencil.
Two refinements worth knowing:
-
Inbox prefix optimization (client-side only — confirms your intuition further). Naive implementations create and tear down a subscription per request, which floods the interest graph across cluster routes and gateways. From the nats.c issue that originally fixed this: Request() will now setup a subscription on a wildcard inbox and each request will add a token to the inbox prefix. A map is then used to "dispatch" the reply to the corresponding request call. So modern clients open one subscription on
_INBOX.<conn_uuid>.*for the connection's lifetime and use the last token to demultiplex replies. .NET calls this SharedInbox Mode (Default): each request creates a temporary subscription using a unique inbox subject. Direct mode uses a pre-established subscription and message correlation via numeric IDs for higher performance request-reply operations. Pure client-side optimization, zero server involvement. -
The one tiny server-side feature:
no_responders. This is the only place where your "pure client-side" claim isn't 100% true. When a request is sent to a subject that has no subscribers, it can be convenient to know about it right away. For this use-case, a NATS client can opt-into no_responder messages. This requires a server and client that support headers. When enabled, a request sent to a subject with no subscribers will immediately receive a reply that has no body, and a 503 status. The client signals support inCONNECT, and the server short-circuits the timeout by synthesizing a 503 reply when the subject has zero interest. Without this, your request would just hang until your timeout — which is what early NATS did, and what a "pure client" implementation gives you. It's a small server optimization that's been default since NATS 2.2.0.
For Harmony fleet, this 503 path is actually useful: if you fire an exec command at a device that isn't connected, you find out in milliseconds rather than at your deadline. You want clients with no_responders support.
So the corrected statement of your intuition:
Request/reply is fundamentally a client-side pattern built from
PUBwith a reply-to +SUBon an inbox subject. The server's only request/reply-specific behavior is the optionalno_respondersshort-circuit (503 on a subject with zero interest), which is a latency optimization — not a correctness requirement. Everything else is just regular pub/sub.
That matches the JetStream contrast you drew: JetStream is genuinely server-side because it adds persistence, consumer state, sequence tracking, dedup windows, and acks. Request/reply adds none of that.
Review material, ranked by what'll be most useful before you write code
Read first (protocol-level proof and conceptual grounding):
- NATS Client Protocol reference — the actual wire spec. The
PUB,SUB,MSG,HPUB,HMSGdefinitions are short. Read these and request/reply stops being magic. https://docs.nats.io/reference/reference-protocols/nats-protocol - Request-Reply concept page — the no_responders detail and the inbox concept. https://docs.nats.io/nats-concepts/core-nats/reqreply
- NATS Protocol Demo — walks through
telnet demo.nats.io 4222and doing PUB/SUB by hand. Five minutes, demystifies everything. https://docs.nats.io/reference/reference-protocols/nats-protocol-demo - FAQ entry on Request() — the cleanest single-paragraph description of what the API actually does. https://docs.nats.io/reference/faq
Then read (Rust-specific, since you'll write the agent and operator in Rust):
async_natsdocs —client.request(),client.request_with_headers(), theRequestbuilder, and theservicemodule. https://docs.rs/async-nats- NATS by Example: Request-Reply (Rust) — runnable, idiomatic. https://natsbyexample.com/examples/messaging/request-reply/rust
- NATS by Example: Services intro (Rust) — the
service_builderpattern, automatic stats/ping/discovery. Worth knowing about even if you don't end up using it. https://natsbyexample.com/examples/services/intro/rust nats.rsrepo README — explains the core vs. Orbit split, which matters for picking which crate to depend on. https://github.com/nats-io/nats.rs
Videos (for review, not deep study):
- "Deep Dive: NATS" — Colin Sullivan & Wally Quevedo (KubeCon) — older but the fundamentals haven't changed. https://www.youtube.com/watch?v=wBJj0UbcZuU
- "Supercharged Microservices with NATS" — Jeremy Saenz (Synadia) — 3-hour workshop replay, hands-on, covers the micro/services framework directly. The APAC replay is the one referenced in Supercharged Microservices with NATS - Join Jeremy Saenz from Synadia for a 3+ hour workshop and deep dive on NATS for building applications with a microservice-centric architecture. Search the Synadia YouTube channel for the latest upload. https://www.youtube.com/@SynadiaCommunications/videos
- Developer Voices podcast: "Inter-System Messaging with NATS & JetStream" — Kris Jenkins interviewing Jeremy Saenz, audio-friendly for car/walk time.
Worth a glance for design ideas:
- Synadia "Orbit.go" guide — shows the
natsextmodule'sRequestManyfor scatter-gather and multi-response streaming. Useful conceptually even though it's Go; the Rust equivalent (request_multi/ building it yourself with an inbox sub) follows the same model. https://www.synadia.com/blog/orbit-go-guide
One pragmatic note for when you sit down to write it
When you implement the operator → agent command path manually in Rust to really understand it, do this order:
- First write the naive version: per-request
client.subscribe(new_inbox).await, thenclient.publish_with_reply(), thensubscriber.next().await, then drop the subscription. This is what the protocol literally describes and what you intuited. It works. You'll see every byte of the flow. - Then switch to
client.request()and observe (vianats sub '>'ornats traffic) that the wire trace is nearly identical except your client now has one persistent_INBOX.<uuid>.*subscription. - Then layer headers on for
request_id,deadline,operator_sub. - Then decide whether to adopt the
service_builderframework on the agent side — it's a nice-to-have for ping/stats/discovery, but you can skip it and use plain subscriptions if you want full control.
You'll have the foundation you described internalized in an afternoon. The protocol is small enough that the magic disappears completely once you've seen the bytes.