harmony/fleet/PLAN_requests_over_nats.md
Jean-Gabriel Gill-Couture d013246a68 feat(fleet): request/reply commands over NATS — wire types, agent server, operator client, e2e harness
First slice of the device-commands.* protocol from
fleet/requests_over_nats.md. Lands `Verb::Ping` plus the harness that
proves it works against a real in-cluster agent.

Wire types (`harmony-reconciler-contracts::commands`):
- `Verb::Ping`, `CommandRequest`, `PingReply`, `ErrorReply`/`ErrorKind`
- `device_command_subject` / `device_command_subscription` helpers
- `X-Harmony-*` header constants

Agent:
- `command_server.rs` subscribes on `device-commands.<id>.>` and
  dispatches verbs; ping handler replies with `PingReply`
- New `[agent].runtime_enabled` config flag (default true). When
  false, podman init + reconciler loop are skipped so the agent can
  run as a Pod on containerd-only k3d nodes; command server +
  heartbeat still run
- `Dockerfile`: canonical multi-stage build for production registries

Operator:
- `commands::FleetCommandsClient` with typed `CommandError`
  (`DeviceOffline` via `no_responders`, `Timeout`, `BadReply`, `Nats`)

E2E harness (`harmony-fleet-e2e`):
- Library crate + integration test. `Stack::bring_up` provisions a
  fresh `e2e-<uuid8>` namespace in a shared `fleet-e2e` k3d cluster,
  deploys NATS (UserPass auth, JetStream on) + the agent Pod, returns
  a connected admin NATS client, and tears the namespace down on Drop
- v1 ships `AuthMode::UserPass` only; the `Callout` variant is
  reserved on the public API for the follow-up PR that adds the mock
  OIDC fixture + NatsAuthCalloutScore deployment
- Operator pod deployment is also follow-up — for ping the test
  process drives `FleetCommandsClient` directly against the cluster's
  NATS NodePort
- `HARMONY_FLEET_E2E=1` gates the integration test so default
  `cargo test --workspace` runs don't depend on k3d/podman
- Image build + sideload mirrors the `fleet_auth_callout` pattern:
  host `cargo build --release` → single-stage Dockerfile → `podman
  build` → `k3d image import`. ~12s warm bring-up, ~80s cold

2026-05-18 09:47:36 -04:00


# Plan — Request/Reply over NATS, TDD via in-cluster e2e harness
Two intertwined deliverables:
1. **`fleet/harmony-fleet-e2e`** — a new harness crate that brings up the full stack (NATS + auth-callout + fleet-operator + fleet-agent-as-pod) in a fresh k3d namespace and tears it down at process exit. Fast (target ≤15s bring-up when the cluster is already running, ≤5s teardown). Works against k3d locally or any cluster with a kubeconfig (incl. OKD).
2. **First feature, TDD-style**: `Verb::Ping`. Failing test in the harness, then the wire types + agent handler + operator client to make it green. Subsequent verbs (logs, exec) follow the same pattern in follow-up PRs.

Both land together because the harness is what proves cohesion: every fleet feature from now on gets its e2e test in the same crate, and the scattered bring-up code in `examples/fleet_e2e_demo` and `examples/fleet_auth_callout` becomes a thin layer over this harness.
## Goals & non-goals
| Goal | In v1 |
|---|---|
| `cargo test -p harmony-fleet-e2e` brings up the stack, runs ping test, tears down | ✅ |
| Per-test namespace isolation; multiple test runs can coexist in the same cluster | ✅ |
| Images built once and sideloaded into k3d (no registry push) | ✅ |
| Cluster reused across runs; only namespace is recreated | ✅ |
| Agent runs as a Pod (no VMs, no SSH, no libvirt) | ✅ |
| Harness prints NATS URL + admin creds so the developer can poke during a hung test | ✅ |
| First feature: `ping` (operator-side `FleetCommandsClient::ping`, agent-side handler, wire types) | ✅ |
| Runs against a remote OKD cluster via `KUBECONFIG` | ✅ (image-import step is conditional) |

| Non-goal (v1) | Reason |
|---|---|
| `logs` / `exec` implementations | Same wiring; covered in follow-up commits using the same harness |
| PTY | Doc Pattern B; defer |
| JetStream audit log | Defer; sidecar consumer added later |
| Zitadel in the harness | Cold-start cost is 30-60s; the harness's `AuthMode::Callout` uses a mock OIDC fixture instead to keep bring-up fast. Real Zitadel stays in `fleet_e2e_demo` (manual rehearsal). |
## Crate layout
New workspace member at `fleet/harmony-fleet-e2e/`:
```
fleet/harmony-fleet-e2e/
├── Cargo.toml
├── README.md # How to run, debug, point at remote clusters
├── src/
│ ├── lib.rs # Public surface: Stack, StackHandle, bring_up()
│ ├── images.rs # Build + sideload (callout, operator, agent)
│ ├── namespace.rs # Unique-namespace generation + RAII cleanup
│ ├── stack.rs # Compose Scores against K8sBareTopology
│ ├── nats.rs # NatsHelmChartScore preset with callout + mock-issuer block
│ ├── mock_oidc.rs # Tiny in-cluster OIDC fixture (issues JWTs the callout accepts)
│ ├── agent_pod.rs # New Score: agent as a Pod (no VM/SSH)
│ ├── observability.rs # NodePort + admin creds, helper to mint admin JWT
│ └── client.rs # FleetCommandsClient (operator-side wrapper for tests)
└── tests/
└── ping.rs # **First TDD test** — failing until the protocol lands
```
Crate kind: library + `[[test]]` integration tests. Not a binary; harness is consumed by tests via `harmony_fleet_e2e::Stack::bring_up().await`.
Cargo workspace: add to root `members`. Dependencies: `harmony` (k8s+nats helpers), `harmony-fleet-auth`, `harmony-reconciler-contracts`, `k3d`, `async-nats`, `kube`, `k8s-openapi`, `tokio`, `anyhow`, `tracing`, `uuid` (for namespace ID), `tempfile`, `serde_json`.
### Why a separate crate (not an example)
Examples currently are bring-up scripts. The e2e harness is **infrastructure for tests** consumed by multiple callers (the new `tests/ping.rs`, future `tests/logs.rs`, `tests/exec.rs`, and eventually a slimmed-down `examples/fleet_e2e_demo` that just calls into it for the manual rehearsal). A library crate lets us expose `Stack`, `StackHandle`, `FleetCommandsClient` as proper types, with `cargo test` discovery and parallel-friendly per-namespace isolation.
## Agent-side prerequisite: gate podman behind config
The agent currently `panic`s if the podman socket isn't ready (`fleet/harmony-fleet-agent/src/main.rs:200`). For the in-cluster harness we need the agent to run on a node that doesn't expose podman.
Add to `agent-config.toml`:
```toml
[agent]
device_id = "vm-device-00"
# NEW: when false, skip podman init and the reconciler loop.
# Command server still runs (ping/exec-via-fallback are still useful).
runtime_enabled = true # default true; e2e harness sets false
```
Wire-up in `main.rs`:
- When `runtime_enabled = false`, the agent skips `PodmanTopology::from_default_socket()`, skips the reconciler periodic tick, but still subscribes to desired-state (KV watch) and runs the command server. KV deliveries with a non-podman Score variant get logged + rejected with `ErrorKind::BadRequest` (today we'd just drop them silently).

Small, contained change (~30 lines). Unlocks pod-based agents and unblocks future verbs (exec/logs add their own runtime requirements).
Alternative considered: mount `/var/run/podman/podman.sock` into the pod. Rejected — k3d nodes run containerd, not podman; mount would dangle.
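The gating rule can be summarized as a pure function. This is a std-only sketch, not the agent's actual code: the `AgentTask` enum and the `runtime_enabled`/`tasks_for` helpers are hypothetical, and in `main.rs` each task is a `tokio::select!` arm rather than a list entry. Heartbeat is included because the agent keeps running it when the runtime is disabled.

```rust
/// Long-running agent tasks (hypothetical names for the `tokio::select!` arms).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum AgentTask {
    PodmanInit,
    ReconcilerTick,
    DesiredStateWatch,
    CommandServer,
    Heartbeat,
}

/// `[agent].runtime_enabled` as read from agent-config.toml; absent means true.
pub fn runtime_enabled(configured: Option<bool>) -> bool {
    configured.unwrap_or(true)
}

/// Tasks `main.rs` starts. With the runtime disabled, podman init and the
/// reconciler tick are skipped; the KV watch, command server, and heartbeat
/// still run, which is all an agent Pod on a containerd-only node needs.
pub fn tasks_for(enabled: bool) -> Vec<AgentTask> {
    let mut tasks = Vec::new();
    if enabled {
        tasks.push(AgentTask::PodmanInit);
        tasks.push(AgentTask::ReconcilerTick);
    }
    tasks.extend([
        AgentTask::DesiredStateWatch,
        AgentTask::CommandServer,
        AgentTask::Heartbeat,
    ]);
    tasks
}
```

The e2e harness writes `runtime_enabled = false` into the agent Pod's config, which selects the second branch.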
## Harness public API
```rust
// fleet/harmony-fleet-e2e/src/lib.rs
use std::path::PathBuf;

use crate::namespace::NamespaceGuard;
// `Id`: device-ID type from the existing harmony crates (import not shown).

pub struct Stack {
    pub namespace: String,                   // e2e-<uuid8>
    pub nats_url: String,                    // nats://localhost:<nodeport>
    pub admin_token: String,                 // JWT for the mock OIDC, callout-accepted
    pub device_ids: Vec<Id>,                 // ["vm-device-00", "vm-device-01", …]
    pub operator_client: async_nats::Client, // pre-authed admin client
    _guard: NamespaceGuard,                  // Drop impl deletes the namespace
}

pub struct StackOptions {
    pub kubeconfig: Option<PathBuf>,      // default: $KUBECONFIG, fall back to k3d-managed
    pub k3d_cluster_name: Option<String>, // None = pick the harness default; required if not using k3d
    pub num_devices: usize,               // default 1; ping test uses 1
    pub image_rebuild: bool,              // env var FLEET_E2E_FORCE_REBUILD
    pub keep_namespace: bool,             // env var FLEET_E2E_KEEP=1: skip teardown for debugging
    pub auth_mode: AuthMode,              // Callout (default) | UserPass (fastest)
}

impl Default for StackOptions {
    fn default() -> Self {
        // Env vars act as developer overrides for the two debug knobs.
        let env_flag = |name: &str| std::env::var(name).as_deref() == Ok("1");
        Self {
            kubeconfig: None,
            k3d_cluster_name: None,
            num_devices: 1,
            image_rebuild: env_flag("FLEET_E2E_FORCE_REBUILD"),
            keep_namespace: env_flag("FLEET_E2E_KEEP"),
            auth_mode: AuthMode::default(),
        }
    }
}

#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub enum AuthMode {
    /// Real auth-callout + mock OIDC fixture. Exercises the production code path.
    #[default]
    Callout,
    /// NATS user/pass via TomlShared credentials. Skips callout entirely.
    /// ~3-5s faster bring-up; use for tests that don't care about auth.
    UserPass,
}

impl Stack {
    pub async fn bring_up(opts: StackOptions) -> anyhow::Result<Self> { todo!() }
    pub fn print_debug_info(&self) { todo!() } // logs URL, token, namespace, kubectl shortcuts
}
```
`Drop for NamespaceGuard`: spawns a blocking task that runs `kubectl delete namespace <name> --wait=false`. Doesn't block process exit; the namespace garbage-collects asynchronously. If `keep_namespace = true`, just logs the name.
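A minimal sketch of the guard, assuming `kubectl` is on `PATH` and the active kubeconfig points at the target cluster. It spawns `kubectl` as a detached child via `std::process::Command` instead of a blocking task, which keeps `Drop` independent of the Tokio runtime; that swap is this sketch's choice, not necessarily the harness's. The field names and the `delete_command` helper are hypothetical; the helper exists so the exact invocation can be checked without a cluster.

```rust
use std::process::{Command, Stdio};

/// RAII guard for the per-run namespace (field names are hypothetical).
pub struct NamespaceGuard {
    pub namespace: String,
    /// Mirrors `StackOptions::keep_namespace` (FLEET_E2E_KEEP=1).
    pub keep: bool,
}

impl NamespaceGuard {
    /// The command `Drop` runs; split out so it can be inspected in tests.
    pub fn delete_command(&self) -> Command {
        let mut cmd = Command::new("kubectl");
        cmd.args(["delete", "namespace", self.namespace.as_str(), "--wait=false"])
            .stdout(Stdio::null())
            .stderr(Stdio::null());
        cmd
    }
}

impl Drop for NamespaceGuard {
    fn drop(&mut self) {
        if self.keep {
            eprintln!("[e2e] keeping namespace {} (FLEET_E2E_KEEP=1)", self.namespace);
            return;
        }
        eprintln!("[e2e] tearing down namespace {}", self.namespace);
        // Spawn without waiting: --wait=false makes kubectl return once the
        // delete is accepted, and Drop itself never blocks on it.
        if let Err(e) = self.delete_command().spawn() {
            eprintln!("[e2e] could not spawn kubectl: {e}");
        }
    }
}
```

When `StackOptions::kubeconfig` is set, a real guard would also pass `--kubeconfig <path>` so the delete targets the same cluster the stack was deployed to.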
## TDD test order
### Test 1 (first to land): ping
```rust
// fleet/harmony-fleet-e2e/tests/ping.rs
use std::time::Duration;

use harmony_fleet_e2e::{Stack, StackOptions};
use harmony_fleet_operator::commands::FleetCommandsClient;

#[tokio::test(flavor = "multi_thread")]
async fn operator_can_ping_agent() -> anyhow::Result<()> {
    let stack = Stack::bring_up(StackOptions::default()).await?;
    let device_id = &stack.device_ids[0];
    let client = FleetCommandsClient::new(stack.operator_client.clone());

    // Outer guard: fail the test instead of hanging if no reply ever arrives.
    let reply = tokio::time::timeout(
        Duration::from_secs(10),
        client.ping(device_id.as_str()),
    )
    .await??;

    assert_eq!(reply.device_id.as_str(), device_id.as_str());
    assert!(!reply.agent_version.is_empty());
    Ok(())
}
```
**Failing → green sequence:**
1. **Red**: write the test above. It can't even compile because `FleetCommandsClient`, `Stack`, `bring_up` don't exist.
2. **Scaffold the harness**: stub `Stack::bring_up` that just returns an error. Test compiles, fails at runtime.
3. **Bring up the cluster bits incrementally**:
   - Namespace creation + RAII guard.
   - NATS deploy via `NatsHelmChartScore` (UserPass mode first for speed).
   - Operator deploy via `FleetOperatorScore` (image sideloaded).
   - Agent pod deploy via new `FleetAgentPodScore`.
   - Wait for pod readiness.
   - Build operator admin NATS client.
4. **Implement the wire types** in `harmony-reconciler-contracts/src/commands.rs` (just `Verb::Ping` + `CommandRequest::Ping` + `PingReply` for now).
5. **Implement agent command server** with only the ping handler (`fleet/harmony-fleet-agent/src/command_server.rs`).
6. **Implement `FleetCommandsClient::ping`** in `fleet/harmony-fleet-operator/src/commands.rs`.
7. **Test goes green.**
8. **Add Callout auth mode** to the harness (mock OIDC fixture deployed alongside NATS), re-run test in both modes.
### Test 2 (follow-up PR): no-responders → DeviceOffline
```rust
#[tokio::test]
async fn ping_to_offline_device_returns_immediately() -> anyhow::Result<()> {
let stack = Stack::bring_up(StackOptions::default()).await?;
let client = FleetCommandsClient::new(stack.operator_client.clone());
let started = Instant::now();
let err = client.ping("nonexistent-device").await.unwrap_err();
assert!(matches!(err, CommandError::DeviceOffline));
assert!(started.elapsed() < Duration::from_secs(1));
Ok(())
}
```
### Test 3+ (follow-up PR, same harness): logs + exec — same pattern.
## Image build & sideload
`src/images.rs` exposes:
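```rust
// fleet/harmony-fleet-e2e/src/images.rs
use anyhow::Result;

pub struct Images {
    pub callout: String, // e.g. harmony-nats-callout:e2e-<contenthash>
    pub operator: String,
    pub agent: String,
}

// `K3dCluster` and `BuildOpts` are defined elsewhere (not shown).
pub async fn build_and_sideload(cluster: &K3dCluster, opts: BuildOpts) -> Result<Images> {
    todo!()
}
```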
Implementation:
- For each of (callout, operator, agent):
  - Hash the crate's source tree + `Cargo.lock`.
  - If `podman images` already contains `<image>:<hash>` and `FLEET_E2E_FORCE_REBUILD != 1`, skip the build.
  - Otherwise: `cargo build --release -p <crate>` + `podman build -f Dockerfile -t <image>:<hash>`.
  - Sideload: `podman save` the image to a tarball, then `k3d image import <tarball> -c <cluster>` (reuse the existing pattern from `examples/fleet_e2e_demo`).

Dockerfiles:
- Callout: exists at `nats/callout/Dockerfile` (used by the demo).
- Operator: exists at `fleet/harmony-fleet-operator/Dockerfile`.
- **Agent**: doesn't exist yet; add `fleet/harmony-fleet-agent/Dockerfile`. Distroless base, single static binary, ~5MB image.

Sideload bypass for remote clusters: if `opts.registry` is set, push to that registry and skip sideload. Out of scope for v1 (the user said defer); v1 just panics if running against a non-k3d cluster.
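The content-hash and skip logic can be sketched with the standard library alone. Assumptions: a 64-bit FNV-1a hash is used because std's `DefaultHasher` output is not guaranteed stable across Rust releases (a cryptographic hash such as SHA-256 would work equally well); `content_hash`, `image_tag`, and `needs_build` are hypothetical names; and the list of existing tags is whatever `podman images` reports.

```rust
use std::fs;
use std::io;
use std::path::{Path, PathBuf};

/// 64-bit FNV-1a over `bytes`, continuing from `hash`.
fn fnv1a(mut hash: u64, bytes: &[u8]) -> u64 {
    for &b in bytes {
        hash ^= u64::from(b);
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3); // FNV 64-bit prime
    }
    hash
}

/// Recursively collect every file under `dir`.
fn collect_files(dir: &Path, out: &mut Vec<PathBuf>) -> io::Result<()> {
    for entry in fs::read_dir(dir)? {
        let path = entry?.path();
        if path.is_dir() {
            collect_files(&path, out)?;
        } else {
            out.push(path);
        }
    }
    Ok(())
}

/// Hash of the crate's source tree plus `Cargo.lock`, as 16 hex digits.
pub fn content_hash(crate_dir: &Path, cargo_lock: &Path) -> io::Result<String> {
    let mut files = Vec::new();
    collect_files(crate_dir, &mut files)?;
    files.sort(); // read_dir order is platform-dependent
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV 64-bit offset basis
    for file in &files {
        // Hash the relative path too, so renames change the result.
        let rel = file.strip_prefix(crate_dir).unwrap_or(file.as_path());
        hash = fnv1a(hash, rel.to_string_lossy().as_bytes());
        hash = fnv1a(hash, &[0]); // separator between path and contents
        hash = fnv1a(hash, &fs::read(file)?);
    }
    hash = fnv1a(hash, &fs::read(cargo_lock)?);
    Ok(format!("{hash:016x}"))
}

/// Tag in the `<image>:e2e-<contenthash>` shape used by `Images`.
pub fn image_tag(image: &str, hash: &str) -> String {
    format!("{image}:e2e-{hash}")
}

/// Build unless the tag is already present locally; `force_rebuild` wins.
pub fn needs_build(existing_tags: &[String], tag: &str, force_rebuild: bool) -> bool {
    force_rebuild || !existing_tags.iter().any(|t| t == tag)
}
```

Sorting the file list matters: `read_dir` makes no ordering guarantee, so hashing in iteration order could produce spurious rebuilds. Note that a cache hit only skips the build; the sideload still runs, since a fresh k3d cluster won't have the image even when podman does.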
## Per-namespace isolation
Today the demo hardcodes `fleet-system` and `zitadel`. The harness:
- Picks namespace `e2e-<uuid8>` per `Stack::bring_up` call.
- Every Score in the harness is parametrized on `namespace`; nothing is hardcoded.
- `FleetOperatorScore` already takes a `namespace` (verified in `harmony/src/modules/fleet/operator/score.rs`), and so do `NatsHelmChartScore` and `NatsAuthCalloutScore`.
- The CRDs (`Deployment`, `Device`) are cluster-scoped definitions, applied once per cluster (idempotently) and shared across e2e runs. The operator filters by namespace via its `kube::Api::namespaced()` calls.
- However, `Device` resources themselves are cluster-scoped, so two simultaneous e2e runs would collide on `Device` CR names. Two mitigations:
  - **Option A** (simpler): per-test device IDs include the namespace suffix (`vm-device-00-e2e-abc12345`). No collision.
  - **Option B**: scope the `Device` CR to a namespace. Bigger change to the operator. Out of scope.
- Plan picks A.
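Option A comes down to two naming rules. A std-only sketch with hypothetical `namespace_name` and `device_id` helpers; it takes the UUID as a string so it doesn't depend on the `uuid` crate's API, and keeps the first 8 hex digits, lowercased, because Kubernetes namespace names must be lowercase RFC 1123 labels.

```rust
/// Per-run namespace `e2e-<uuid8>`: the first 8 hex digits of a UUID,
/// lowercased, with hyphens skipped.
pub fn namespace_name(uuid: &str) -> String {
    let short: String = uuid
        .chars()
        .filter(|c| c.is_ascii_hexdigit())
        .take(8)
        .map(|c| c.to_ascii_lowercase())
        .collect();
    format!("e2e-{short}")
}

/// Device ID suffixed with the namespace, so cluster-scoped `Device` CRs
/// from concurrent runs never share a name (Option A).
pub fn device_id(index: usize, namespace: &str) -> String {
    format!("vm-device-{index:02}-{namespace}")
}
```

The suffixed IDs use only `[a-z0-9-]`, which also keeps them safe to embed as a single NATS subject token in `device-commands.<id>.<verb>`.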
## Auth mode story
Default `AuthMode::Callout` because the user explicitly asked for "nats + callout + operator + agent". To avoid Zitadel's bring-up cost, the harness ships a `mock_oidc.rs` fixture: a tiny single-Pod HTTP service that:
- Serves `/.well-known/openid-configuration` and `/jwks.json` from a process-generated keypair.
- Mints JWTs for `device-<id>` and `fleet-ops` machine users on demand via a `/token` endpoint the harness calls.
- ~200 LOC, no external services (JWT signing via the `jsonwebtoken` crate, per open question 2). Lives inside `harmony-fleet-e2e` (not exposed elsewhere).

The callout points its `oidc_issuer_url` at the mock service's in-cluster URL. From the callout's perspective this is indistinguishable from Zitadel.
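The discovery endpoint only has to tell the callout where the signing keys live. A std-only sketch of the JSON body the mock could serve, assuming RS256 signing and the `/jwks.json` and `/token` paths listed above; `discovery_document` is a hypothetical helper. It emits a minimal subset of the OpenID Connect Discovery fields (a strict client may also expect `authorization_endpoint`, `response_types_supported`, and `subject_types_supported`) and assumes the issuer URL needs no JSON escaping.

```rust
/// JSON body for `GET /.well-known/openid-configuration` on the mock issuer.
/// `issuer` must match the `iss` claim of minted tokens and the callout's
/// `oidc_issuer_url` exactly, so it is emitted verbatim.
pub fn discovery_document(issuer: &str) -> String {
    format!(
        "{{\"issuer\":\"{issuer}\",\
         \"jwks_uri\":\"{issuer}/jwks.json\",\
         \"token_endpoint\":\"{issuer}/token\",\
         \"id_token_signing_alg_values_supported\":[\"RS256\"]}}"
    )
}
```

Because issuer comparison is exact string matching, even a trailing slash on one side and not the other will make the callout reject otherwise valid tokens.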
`AuthMode::UserPass` skips the callout entirely: NATS deploys with two static accounts (`device` + `admin`) and the agent's `TomlShared` credential variant connects directly. ~3-5s faster bring-up. Useful when iterating on the command protocol itself, where auth isn't being tested.
Both modes go through the same `Stack::operator_client` surface — tests don't see the difference.
## Observability — what the harness prints
On bring-up success, `print_debug_info()` logs:
```
[e2e] namespace: e2e-7d3a91f4 (will be deleted on exit unless FLEET_E2E_KEEP=1)
[e2e] kubectl -n e2e-7d3a91f4 get pods
[e2e] NATS: nats://localhost:30422
[e2e] admin token: eyJhbGc... (use as auth_token)
[e2e] devices: vm-device-00-e2e-7d3a91f4
[e2e] tail agent: kubectl -n e2e-7d3a91f4 logs deploy/fleet-agent-vm-device-00 -f
[e2e] tail callout: kubectl -n e2e-7d3a91f4 logs deploy/fleet-callout -f
```
When a test fails, set `FLEET_E2E_KEEP=1` and the namespace persists so you can poke around. The next run uses a different namespace, so leaks don't compound.
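A sketch of how `print_debug_info` might render the first lines of that banner, written as a hypothetical pure `debug_lines` helper so the format is testable; its parameters stand in for `Stack` fields. It prints the full admin token, in line with the goal of letting a developer connect to a hung test; truncating it as the sample output does is an equally valid choice if logs get shared.

```rust
/// Render the `[e2e]` banner printed after a successful bring-up.
pub fn debug_lines(
    namespace: &str,
    nats_url: &str,
    admin_token: &str,
    device_ids: &[String],
) -> Vec<String> {
    vec![
        format!("[e2e] namespace: {namespace} (will be deleted on exit unless FLEET_E2E_KEEP=1)"),
        format!("[e2e] kubectl -n {namespace} get pods"),
        format!("[e2e] NATS: {nats_url}"),
        format!("[e2e] admin token: {admin_token} (use as auth_token)"),
        format!("[e2e] devices: {}", device_ids.join(", ")),
    ]
}
```

`print_debug_info` would then just loop over the result with `eprintln!("{line}")`.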
## Reuse / cohesion plan
The existing `examples/fleet_e2e_demo/src/lib.rs` is the original bring-up Frankenstein. Once `harmony-fleet-e2e` exists, refactor `fleet_e2e_demo` to delegate:
```rust
// examples/fleet_e2e_demo/src/lib.rs (after refactor)
use harmony_fleet_e2e::{AgentTarget, AuthMode, OidcProvider, Stack, StackOptions};

pub async fn bring_up_full_stack(...) -> ... {
    let stack = Stack::bring_up(StackOptions {
        auth_mode: AuthMode::Callout, // real
        num_devices: cfg.num_devices,
        oidc_provider: OidcProvider::RealZitadel(zitadel_config), // adapter for real Zitadel
        agent_target: AgentTarget::Vm(vm_ips),                   // SSH-based, for the rehearsal flow
        ..Default::default()
    })
    .await?;
    // ...
}
```
This requires the harness to support **multiple agent targets** (Pod vs VM/SSH) and **multiple OIDC providers** (mock vs real Zitadel). Architecture-wise this is a `trait AgentTarget` and a `trait OidcProvider`, both with mock + real impls. The v1 PR ships only the Pod + mock-OIDC impls; the demo refactor is a follow-up PR.
Cohesion deliverables this PR closes:
- Single home for "bring up a fleet stack" logic (currently scattered across 3 examples).
- Single home for image-build invocation (today inline `cargo build --release` + `podman build` calls live in `fleet_e2e_demo/src/lib.rs` lines 553-623).
- Single home for "issue NATS test client" plumbing (the `admin_nats_client` helper in `e2e_walking_skeleton.rs` should be a Stack method).
## Wire types (same as previous plan, reduced for ping-only first pass)
In `harmony-reconciler-contracts/src/commands.rs` — add only what `ping` needs in PR 1:
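```rust
// harmony-reconciler-contracts/src/commands.rs
use serde::{Deserialize, Serialize};
// `Id`: existing harmony device-ID type (assumed Serialize + Deserialize).

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verb { Ping }

pub fn device_command_subject(device_id: &str, verb: Verb) -> String {
    let verb = match verb { Verb::Ping => "ping" };
    format!("device-commands.{device_id}.{verb}") // device-commands.<id>.<verb>
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum CommandRequest { Ping }

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PingReply {
    pub device_id: Id,
    pub agent_version: String,
    pub uptime_s: u64,
}

pub const HDR_REQUEST_ID: &str = "X-Harmony-Request-Id";
pub const HDR_DEADLINE: &str = "X-Harmony-Deadline";
pub const HDR_OPERATOR_SUB: &str = "X-Harmony-Operator-Sub";
```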
`Verb::Exec` / `Verb::Logs` and their payloads are added in follow-up PRs alongside their tests.
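On the agent side the naming scheme runs in reverse: one wildcard subscription per device, then the verb is recovered from the last subject token. A std-only sketch of both directions; `device_command_subscription` is named in the commit message, but its signature here, and the `Verb::as_str`/`Verb::from_subject` helpers, are assumptions.

```rust
/// Command verbs; PR 1 only has `Ping`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verb {
    Ping,
}

impl Verb {
    /// Subject token for this verb.
    pub fn as_str(self) -> &'static str {
        match self {
            Verb::Ping => "ping",
        }
    }

    /// Recover the verb from `device-commands.<id>.<verb>`; `None` for unknown
    /// verbs or foreign subjects, so the agent can reply `BadRequest`.
    pub fn from_subject(subject: &str) -> Option<Verb> {
        let rest = subject.strip_prefix("device-commands.")?;
        let (_device_id, verb) = rest.rsplit_once('.')?;
        match verb {
            "ping" => Some(Verb::Ping),
            _ => None,
        }
    }
}

/// Operator side: where a request for one device and verb is published.
pub fn device_command_subject(device_id: &str, verb: Verb) -> String {
    format!("device-commands.{device_id}.{}", verb.as_str())
}

/// Agent side: one wildcard subscription covering every verb for a device.
pub fn device_command_subscription(device_id: &str) -> String {
    format!("device-commands.{device_id}.>")
}
```

Because NATS uses `.` to separate subject tokens and `*`/`>` as wildcards, device IDs must not contain those characters; otherwise one device's `device-commands.<id>.>` subscription could also match another device's commands.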
## Agent-side command server (ping-only scaffold)
`fleet/harmony-fleet-agent/src/command_server.rs`:
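```rust
// fleet/harmony-fleet-agent/src/command_server.rs
use std::{sync::Arc, time::Instant};

use anyhow::Result;
use futures::StreamExt; // for `Subscriber::next()`
use harmony_reconciler_contracts::commands::{ErrorKind, ErrorReply, PingReply};
use serde::Serialize;
// `Id`: existing harmony device-ID type (import not shown).

pub struct CommandServer {
    device_id: Id,
    client: async_nats::Client,
    agent_version: &'static str, // set to env!("CARGO_PKG_VERSION") at construction
    started_at: Instant,
}

impl CommandServer {
    pub async fn run(self: Arc<Self>) -> Result<()> {
        // One wildcard subscription covers every verb: device-commands.<id>.<verb>
        let subject = format!("device-commands.{}.>", self.device_id);
        let mut sub = self.client.subscribe(subject).await?;
        while let Some(msg) = sub.next().await {
            self.dispatch(msg).await;
        }
        Ok(())
    }

    async fn dispatch(&self, msg: async_nats::Message) {
        // The verb is the last subject token.
        match msg.subject.rsplit('.').next() {
            Some("ping") => self.reply_ping(&msg).await,
            _ => self.reply_error(&msg, ErrorKind::BadRequest, "unknown verb").await,
        }
    }

    async fn reply_ping(&self, msg: &async_nats::Message) {
        let reply = PingReply {
            device_id: self.device_id.clone(),
            agent_version: self.agent_version.to_string(),
            uptime_s: self.started_at.elapsed().as_secs(),
        };
        self.respond(msg, &reply).await;
    }

    async fn reply_error(&self, msg: &async_nats::Message, kind: ErrorKind, message: &str) {
        // Assumed ErrorReply shape: { kind, message }.
        let reply = ErrorReply { kind, message: message.to_string() };
        self.respond(msg, &reply).await;
    }

    /// JSON-encode `body` and publish it to the request's reply inbox, if any.
    /// Failures are logged, never propagated: one bad request must not stop the loop.
    async fn respond<T: Serialize>(&self, msg: &async_nats::Message, body: &T) {
        let Some(inbox) = msg.reply.clone() else { return };
        let payload = match serde_json::to_vec(body) {
            Ok(bytes) => bytes,
            Err(e) => {
                tracing::warn!("failed to encode reply: {e}");
                return;
            }
        };
        if let Err(e) = self.client.publish(inbox, payload.into()).await {
            tracing::warn!("failed to publish reply: {e}");
        }
    }
}
```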
Wired into `main.rs` as a new arm of the existing `tokio::select!`. Future verbs slot into `dispatch`.
## Operator-side client (ping-only scaffold)
`fleet/harmony-fleet-operator/src/commands.rs`:
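```rust
// fleet/harmony-fleet-operator/src/commands.rs
use std::time::Duration;
use harmony_reconciler_contracts::commands::PingReply;

pub struct FleetCommandsClient {
    nc: async_nats::Client,
    default_timeout: Duration, // 5s
}

#[derive(Debug)]
pub enum CommandError {
    DeviceOffline, // 503 no_responders
    Timeout,
    BadReply(serde_json::Error),
    Nats(async_nats::Error),
}

impl std::fmt::Display for CommandError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "{:?}", self) // Debug text is enough for test output
    }
}
impl std::error::Error for CommandError {} // enables `?` into anyhow::Error

impl FleetCommandsClient {
    pub fn new(nc: async_nats::Client) -> Self {
        Self { nc, default_timeout: Duration::from_secs(5) }
    }
    pub async fn ping(&self, device_id: &str) -> Result<PingReply, CommandError> { todo!() }
}
```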
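`ping` sends the request with `nc.request()` on `device-commands.<id>.ping`. async-nats enables no-responders detection by default, so when no agent is subscribed the server answers immediately and the request fails with a `NoResponders` error kind, which maps to `CommandError::DeviceOffline`. The call is bounded by `default_timeout` (5s; expiry maps to `Timeout`), and the JSON reply is decoded into `PingReply` (decode failures map to `BadReply`).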
## Test ordering & PR slicing
**PR 1 (this plan):**
- `harmony-fleet-e2e` crate scaffolding
- `harmony-reconciler-contracts::commands` (ping types only)
- Agent: `runtime_enabled` config flag + `command_server.rs` (ping only)
- Operator: `commands.rs` (ping only)
- New `FleetAgentPodScore` (or inline manifest) for pod-based agents
- New `MockOidcScore` for the auth callout's issuer
- `tests/ping.rs` — passing
- Agent Dockerfile (new)

**PR 2** (after PR 1 merges):
- `tests/ping_offline.rs` (no_responders → DeviceOffline)
- Refactor `fleet_e2e_demo` to delegate to `harmony-fleet-e2e` with `AgentTarget::Vm` + `OidcProvider::RealZitadel`

**PR 3 (logs):**
- Wire types for `Verb::Logs` + `LogsReq` + `LogChunk`
- Agent handler invoking `podman_api::Containers::logs`
- Operator client streaming method
- `tests/logs.rs`

**PR 4 (exec):**
- Wire types for `Verb::Exec` + `ExecReq` + `ExecReply`
- Agent handler with container-only default + host-exec policy gate
- Operator client
- `tests/exec.rs`

**PR 5+**: web frontend wiring, CLI subcommands.
## Open questions for review
1. **Auth mode default** — Callout-with-mock-OIDC (slower, exercises real auth path), or UserPass (faster, doesn't test auth)? Plan picks Callout. UserPass available via env or `StackOptions`.
2. **Mock OIDC fixture** — build into the harness, or use an existing crate? I haven't found a small-enough off-the-shelf one; recommend hand-rolled ~200 LOC (uses `jsonwebtoken`).
3. **Image hash strategy** — content-hash of `Cargo.lock` + crate source (skip rebuild if matching tag exists)? Or always rebuild and rely on Docker layer cache? Plan: content-hash, with `FLEET_E2E_FORCE_REBUILD=1` escape hatch.
4. **Cluster lifecycle** — harness assumes the k3d cluster already exists (or auto-creates one named `fleet-e2e`). Should it also offer a `Stack::bring_up_isolated_cluster()` that creates+destroys the whole cluster per test? Plan: no, namespace isolation is enough; clusters are heavy.
5. **Ping reply shape**: `PingReply { device_id, agent_version, uptime_s }` is minimal. Should it carry anything else useful for a health check (memory, podman socket status, current desired-state revision)? Easy to extend later; v1 keeps it minimal.
6. **Subject choice**: `device-commands.<id>.ping` (matches the existing callout permission template). The alternative `harmony.device.<id>.cmd.ping` (the doc's verbatim suggestion) would require updating the callout permissions. Plan picks the existing `device-commands.<id>.ping` subject; the doc's `harmony.device.*` is the same idea with a different prefix, and no callout change is needed.
## What you'll see when you run the green ping test
```
$ cargo test -p harmony-fleet-e2e --test ping
Compiling harmony-fleet-e2e v0.1.0
Finished test [unoptimized + debuginfo] target(s) in 12.4s
Running tests/ping.rs
running 1 test
[e2e] building images: callout, operator, agent (cached, skipping rebuild)
[e2e] sideloading 3 images into k3d cluster fleet-e2e
[e2e] namespace: e2e-7d3a91f4
[e2e] deploying mock-oidc, nats, callout, operator, agent
[e2e] all pods ready in 7.2s
[e2e] NATS: nats://localhost:30422
[e2e] admin token: eyJhbGc...
test operator_can_ping_agent ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; finished in 9.8s
[e2e] tearing down namespace e2e-7d3a91f4
```
Target: green test in <15s end-to-end, with subsequent runs hitting <10s thanks to image cache + cluster reuse.