harmony/fleet/PLAN_requests_over_nats.md
Jean-Gabriel Gill-Couture d013246a68 feat(fleet): request/reply commands over NATS — wire types, agent server, operator client, e2e harness
First slice of the device-commands.* protocol from
fleet/requests_over_nats.md. Lands `Verb::Ping` plus the harness that
proves it works against a real in-cluster agent.

Wire types (`harmony-reconciler-contracts::commands`):
- `Verb::Ping`, `CommandRequest`, `PingReply`, `ErrorReply`/`ErrorKind`
- `device_command_subject` / `device_command_subscription` helpers
- `X-Harmony-*` header constants

Agent:
- `command_server.rs` subscribes on `device-commands.<id>.>` and
  dispatches verbs; ping handler replies with `PingReply`
- New `[agent].runtime_enabled` config flag (default true). When
  false, podman init + reconciler loop are skipped so the agent can
  run as a Pod on containerd-only k3d nodes; command server +
  heartbeat still run
- `Dockerfile`: canonical multi-stage build for production registries

Operator:
- `commands::FleetCommandsClient` with typed `CommandError`
  (`DeviceOffline` via `no_responders`, `Timeout`, `BadReply`, `Nats`)

E2E harness (`harmony-fleet-e2e`):
- Library crate + integration test. `Stack::bring_up` provisions a
  fresh `e2e-<uuid8>` namespace in a shared `fleet-e2e` k3d cluster,
  deploys NATS (UserPass auth, JetStream on) + the agent Pod, returns
  a connected admin NATS client, and tears the namespace down on Drop
- v1 ships `AuthMode::UserPass` only; the `Callout` variant is
  reserved on the public API for the follow-up PR that adds the mock
  OIDC fixture + NatsAuthCalloutScore deployment
- Operator pod deployment is also follow-up — for ping the test
  process drives `FleetCommandsClient` directly against the cluster's
  NATS NodePort
- `HARMONY_FLEET_E2E=1` gates the integration test so default
  `cargo test --workspace` runs don't depend on k3d/podman
- Image build + sideload mirrors the `fleet_auth_callout` pattern:
  host `cargo build --release` → single-stage Dockerfile → `podman
  build` → `k3d image import`. ~12s warm bring-up, ~80s cold
2026-05-18 09:47:36 -04:00


Plan — Request/Reply over NATS, TDD via in-cluster e2e harness

Two intertwined deliverables:

  1. fleet/harmony-fleet-e2e — a new harness crate that brings up the full stack (NATS + auth-callout + fleet-operator + fleet-agent-as-pod) in a fresh k3d namespace and tears it down at process exit. Fast (target ≤15s bring-up when the cluster is already running, ≤5s teardown). Works against k3d locally or any cluster with a kubeconfig (incl. OKD).
  2. First feature, TDD-style: Verb::Ping. Failing test in the harness, then the wire types + agent handler + operator client to make it green. Subsequent verbs (logs, exec) follow the same pattern in follow-up PRs.

Both land together because the harness is what proves cohesion: every fleet feature from now on gets its e2e test in the same crate, and the scattered bring-up code in examples/fleet_e2e_demo and examples/fleet_auth_callout becomes a thin layer over this harness.

Goals & non-goals

Goal In v1
cargo test -p harmony-fleet-e2e brings up the stack, runs ping test, tears down
Per-test namespace isolation; multiple test runs can coexist in the same cluster
Images built once and sideloaded into k3d (no registry push)
Cluster reused across runs; only namespace is recreated
Agent runs as a Pod (no VMs, no SSH, no libvirt)
Harness prints NATS URL + admin creds so the developer can poke during a hung test
First feature: ping (operator-side FleetCommandsClient::ping, agent-side handler, wire types)
Runs against a remote OKD cluster via KUBECONFIG (image-import step is conditional)

| Non-goal (v1) | Reason |
|---|---|
| logs / exec implementations | Same wiring; covered in follow-up commits using the same harness |
| PTY | Doc Pattern B; defer |
| JetStream audit log | Defer; sidecar consumer added later |
| Zitadel in the harness | Cold-start cost is 30-60s; harness mode A uses a mock OIDC fixture for the callout to keep bring-up fast. Real Zitadel stays in fleet_e2e_demo (manual rehearsal). |

Crate layout

New workspace member at fleet/harmony-fleet-e2e/:

fleet/harmony-fleet-e2e/
├── Cargo.toml
├── README.md                    # How to run, debug, point at remote clusters
├── src/
│   ├── lib.rs                   # Public surface: Stack, StackHandle, bring_up()
│   ├── images.rs                # Build + sideload (callout, operator, agent)
│   ├── namespace.rs             # Unique-namespace generation + RAII cleanup
│   ├── stack.rs                 # Compose Scores against K8sBareTopology
│   ├── nats.rs                  # NatsHelmChartScore preset with callout + mock-issuer block
│   ├── mock_oidc.rs             # Tiny in-cluster OIDC fixture (issues JWTs the callout accepts)
│   ├── agent_pod.rs             # New Score: agent as a Pod (no VM/SSH)
│   ├── observability.rs         # NodePort + admin creds, helper to mint admin JWT
│   └── client.rs                # FleetCommandsClient (operator-side wrapper for tests)
└── tests/
    └── ping.rs                  # **First TDD test** — failing until the protocol lands

Crate kind: library + [[test]] integration tests. Not a binary; harness is consumed by tests via harmony_fleet_e2e::Stack::bring_up().await.

Cargo workspace: add to root members. Build deps: harmony (k8s+nats helpers), harmony-fleet-auth, harmony-reconciler-contracts, k3d, async-nats, kube, k8s-openapi, tokio, anyhow, tracing, uuid (for namespace ID), tempfile, serde_json.

Why a separate crate (not an example)

Examples currently are bring-up scripts. The e2e harness is infrastructure for tests consumed by multiple callers (the new tests/ping.rs, future tests/logs.rs, tests/exec.rs, and eventually a slimmed-down examples/fleet_e2e_demo that just calls into it for the manual rehearsal). A library crate lets us expose Stack, StackHandle, FleetCommandsClient as proper types, with cargo test discovery and parallel-friendly per-namespace isolation.

Agent-side prerequisite: gate podman behind config

The agent currently panics if the podman socket isn't ready (fleet/harmony-fleet-agent/src/main.rs:200). For the in-cluster harness we need the agent to run on a node that doesn't expose podman.

Add to agent-config.toml:

[agent]
device_id = "vm-device-00"
# NEW: when false, skip podman init and the reconciler loop.
# Command server still runs (ping/exec-via-fallback are still useful).
runtime_enabled = true   # default true; e2e harness sets false

Wire-up in main.rs:

  • When runtime_enabled = false, the agent skips PodmanTopology::from_default_socket(), skips the reconciler periodic tick, but still subscribes to desired-state (KV watch) and runs the command server. KV deliveries with a non-podman Score variant get logged + rejected with ErrorKind::BadRequest (today we'd just drop them silently).

Small, contained change (~30 lines). Unlocks pod-based agents and unblocks future verbs (exec/logs add their own runtime requirements).

Alternative considered: mount /var/run/podman/podman.sock into the pod. Rejected — k3d nodes run containerd, not podman; mount would dangle.
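
The gating described above can be sketched as a pure decision function, which keeps the `runtime_enabled` branch easy to unit-test without a podman socket. This is a minimal sketch, not the agent's actual code: `AgentSection`, `StartupPlan`, and `startup_plan` are hypothetical names, and the real config is presumably deserialized from agent-config.toml rather than constructed by hand.

```rust
/// Hypothetical mirror of the `[agent]` section in agent-config.toml.
#[derive(Debug, Clone, PartialEq)]
pub struct AgentSection {
    pub device_id: String,
    /// Defaults to true so existing configs keep today's behavior.
    pub runtime_enabled: bool,
}

impl AgentSection {
    pub fn new(device_id: &str) -> Self {
        Self { device_id: device_id.to_string(), runtime_enabled: true }
    }
}

/// Which subsystems main.rs would start (field names are illustrative).
#[derive(Debug, Clone, Copy, PartialEq)]
pub struct StartupPlan {
    pub podman_init: bool,
    pub reconciler_tick: bool,
    pub desired_state_watch: bool,
    pub command_server: bool,
}

/// Podman init and the reconciler tick follow the flag; the KV watch and
/// the command server always run.
pub fn startup_plan(agent: &AgentSection) -> StartupPlan {
    StartupPlan {
        podman_init: agent.runtime_enabled,
        reconciler_tick: agent.runtime_enabled,
        desired_state_watch: true,
        command_server: true,
    }
}
```

With the flag off, only the last two fields stay true, which is the shape the e2e harness relies on.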

Harness public API

// fleet/harmony-fleet-e2e/src/lib.rs
pub struct Stack {
    pub namespace: String,            // e2e-<uuid8>
    pub nats_url: String,             // nats://localhost:<nodeport>
    pub admin_token: String,          // JWT for the mock OIDC, callout-accepted
    pub device_ids: Vec<Id>,          // ["vm-device-00", "vm-device-01", …]
    pub operator_client: async_nats::Client,  // pre-authed admin client
    _guard: NamespaceGuard,           // Drop impl deletes the namespace
}

pub struct StackOptions {
    pub kubeconfig: Option<PathBuf>,   // default: $KUBECONFIG, fall back to k3d-managed
    pub k3d_cluster_name: Option<String>, // None = pick the harness default; required if not using k3d
    pub num_devices: usize,            // default 1; ping test uses 1
    pub image_rebuild: bool,           // env var FLEET_E2E_FORCE_REBUILD
    pub keep_namespace: bool,          // env var FLEET_E2E_KEEP=1 — skip teardown for debugging
    pub auth_mode: AuthMode,           // Callout (default) | UserPass (fastest)
}

pub enum AuthMode {
    /// Real auth-callout + mock OIDC fixture. Exercises the production code path.
    Callout,
    /// NATS user/pass via TomlShared credentials. Skips callout entirely.
    /// ~3-5s faster bring-up; use for tests that don't care about auth.
    UserPass,
}

impl Stack {
    pub async fn bring_up(opts: StackOptions) -> anyhow::Result<Self>;
    pub fn print_debug_info(&self);     // logs URL, token, namespace, kubectl shortcuts
}

Drop for NamespaceGuard: spawns a blocking task that runs kubectl delete namespace <name> --wait=false. Doesn't block process exit; the namespace garbage-collects asynchronously. If keep_namespace = true, just logs the name.
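
A rough sketch of that guard, using only the standard library. It simplifies the description above in one respect: instead of a blocking task on the async runtime, it spawns `kubectl` as a child process and never waits on it. The `keep` field and the `delete_namespace_args` helper are assumptions made for illustration.

```rust
use std::process::{Command, Stdio};

/// Arguments for the fire-and-forget namespace deletion.
pub fn delete_namespace_args(name: &str) -> Vec<String> {
    vec![
        "delete".to_string(),
        "namespace".to_string(),
        name.to_string(),
        "--wait=false".to_string(),
    ]
}

pub struct NamespaceGuard {
    name: String,
    keep: bool, // assumed to mirror StackOptions::keep_namespace
}

impl NamespaceGuard {
    pub fn new(name: impl Into<String>, keep: bool) -> Self {
        Self { name: name.into(), keep }
    }
}

impl Drop for NamespaceGuard {
    fn drop(&mut self) {
        if self.keep {
            eprintln!("[e2e] keeping namespace {}", self.name);
            return;
        }
        // Spawn kubectl without waiting on it. Drop cannot return an error,
        // so a failed spawn is only logged.
        let spawned = Command::new("kubectl")
            .args(delete_namespace_args(&self.name))
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn();
        if let Err(e) = spawned {
            eprintln!("[e2e] could not delete namespace {}: {e}", self.name);
        }
    }
}
```

Because the child is never waited on, teardown cost stays close to the time it takes to fork `kubectl`; the trade-off is that a failed deletion is only visible in the API server, not in the test output.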

TDD test order

Test 1 (first to land): ping

// fleet/harmony-fleet-e2e/tests/ping.rs
#[tokio::test(flavor = "multi_thread")]
async fn operator_can_ping_agent() -> anyhow::Result<()> {
    let stack = Stack::bring_up(StackOptions::default()).await?;
    let device_id = &stack.device_ids[0];

    let client = FleetCommandsClient::new(stack.operator_client.clone());
    let reply = tokio::time::timeout(
        Duration::from_secs(10),
        client.ping(device_id.as_str()),
    ).await??;

    assert_eq!(reply.device_id.as_str(), device_id.as_str());
    assert!(!reply.agent_version.is_empty());
    Ok(())
}

Failing → green sequence:

  1. Red: write the test above. It can't even compile because FleetCommandsClient, Stack, bring_up don't exist.
  2. Scaffold the harness: stub Stack::bring_up that just returns an error. Test compiles, fails at runtime.
  3. Bring up the cluster bits incrementally:
    • Namespace creation + RAII guard.
    • NATS deploy via NatsHelmChartScore (UserPass mode first for speed).
    • Operator deploy via FleetOperatorScore (image sideloaded).
    • Agent pod deploy via new FleetAgentPodScore.
    • Wait for pod readiness.
    • Build operator admin NATS client.
  4. Implement the wire types in harmony-reconciler-contracts/src/commands.rs (just Verb::Ping + CommandRequest::Ping + PingReply for now).
  5. Implement agent command server with only the ping handler (fleet/harmony-fleet-agent/src/command_server.rs).
  6. Implement FleetCommandsClient::ping in fleet/harmony-fleet-operator/src/commands.rs.
  7. Test goes green.
  8. Add Callout auth mode to the harness (mock OIDC fixture deployed alongside NATS), re-run test in both modes.

Test 2 (follow-up PR): no-responders → DeviceOffline

#[tokio::test]
async fn ping_to_offline_device_returns_immediately() -> anyhow::Result<()> {
    let stack = Stack::bring_up(StackOptions::default()).await?;
    let client = FleetCommandsClient::new(stack.operator_client.clone());
    let started = Instant::now();
    let err = client.ping("nonexistent-device").await.unwrap_err();
    assert!(matches!(err, CommandError::DeviceOffline));
    assert!(started.elapsed() < Duration::from_secs(1));
    Ok(())
}

Test 3+ (follow-up PR, same harness): logs + exec — same pattern.

Image build & sideload

src/images.rs exposes:

pub struct Images {
    pub callout: String,           // e.g. harmony-nats-callout:e2e-<contenthash>
    pub operator: String,
    pub agent: String,
}

pub async fn build_and_sideload(cluster: &K3dCluster, opts: BuildOpts) -> Result<Images>;

Implementation:

  • For each of (callout, operator, agent):
    • Hash the crate's source tree + Cargo.lock.
    • If podman images already contains <image>:<hash> and FLEET_E2E_FORCE_REBUILD != 1, skip the build.
    • Otherwise: cargo build --release -p <crate> + podman build -f Dockerfile -t <image>:<hash>.
    • podman save | k3d image import -c <cluster> (or --volumes if --import doesn't accept stdin; use the existing pattern from examples/fleet_e2e_demo).
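
The cache decision in these steps might look like the sketch below: hash the inputs, derive the tag, and build only when the tag is missing or a rebuild is forced. All three function names are hypothetical, and `DefaultHasher` is a stand-in chosen to keep the example dependency-free; its output is not guaranteed stable across Rust releases, so a real implementation would likely use a cryptographic digest.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hash everything that should invalidate an image: Cargo.lock plus the
/// (path, contents) pairs of the crate's source files.
pub fn content_hash(cargo_lock: &str, sources: &[(&str, &str)]) -> u64 {
    let mut hasher = DefaultHasher::new();
    cargo_lock.hash(&mut hasher);
    for (path, contents) in sources {
        path.hash(&mut hasher);
        contents.hash(&mut hasher);
    }
    hasher.finish()
}

/// Tag in the `<image>:e2e-<contenthash>` shape used by `Images`.
pub fn image_tag(image: &str, hash: u64) -> String {
    format!("{image}:e2e-{hash:016x}")
}

/// Build when forced, or when no image with this tag exists locally.
pub fn needs_build(tag_exists_locally: bool, force_rebuild: bool) -> bool {
    force_rebuild || !tag_exists_locally
}
```

Callers would feed `needs_build` the result of a local image lookup and the `FLEET_E2E_FORCE_REBUILD` flag, then run the cargo and podman steps only when it returns true.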

Dockerfiles:

  • Callout: exists at nats/callout/Dockerfile (used by the demo).
  • Operator: exists at fleet/harmony-fleet-operator/Dockerfile.
  • Agent: doesn't exist yet — add fleet/harmony-fleet-agent/Dockerfile. Distroless base, single static binary, ~5MB image.

Sideload bypass for remote clusters: if opts.registry is set, push to that registry and skip sideload. Out of scope for v1 (the user said defer); v1 just panics if running against a non-k3d cluster.

Per-namespace isolation

Today the demo hardcodes fleet-system and zitadel. The harness:

  • Picks namespace e2e-<uuid8> per Stack::bring_up call.
  • Every Score in the harness is parametrized on namespace; nothing is hardcoded.
  • The FleetOperatorScore already takes a namespace (verified in harmony/src/modules/fleet/operator/score.rs), as do NatsHelmChartScore and NatsAuthCalloutScore.
  • The CRDs (Deployment, Device) are cluster-scoped, but they're created once per cluster (idempotent apply) and shared across e2e runs. The operator filters by namespace via its kube::Api::namespaced() calls.
  • Caveat: Device resources are themselves cluster-scoped, so two simultaneous e2e runs would collide on Device CR names. Two mitigations:
    • Option A (simpler): per-test device IDs include the namespace suffix (vm-device-00-e2e-abc12345). No collision.
    • Option B: scope the Device CR to a namespace. Bigger change to the operator. Out of scope.
    • Plan picks A.
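
Option A is cheap to express in code. The sketch below derives the namespace name and the suffixed device IDs; both function names are hypothetical, and a `u32` stands in for the eight hex characters the plan takes from a UUID.

```rust
/// Namespace name in the `e2e-<uuid8>` shape. A random u32 stands in for
/// the first eight hex characters of a UUID.
pub fn namespace_name(random: u32) -> String {
    format!("e2e-{random:08x}")
}

/// Device IDs carry the namespace as a suffix so that cluster-scoped
/// Device CRs from concurrent runs never share a name.
pub fn device_ids(namespace: &str, num_devices: usize) -> Vec<String> {
    (0..num_devices)
        .map(|i| format!("vm-device-{i:02}-{namespace}"))
        .collect()
}
```

For example, `device_ids("e2e-7d3a91f4", 1)` yields `vm-device-00-e2e-7d3a91f4`.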

Auth mode story

Default AuthMode::Callout because the user explicitly asked for "nats + callout + operator + agent". To avoid Zitadel's bring-up cost, the harness ships a mock_oidc.rs fixture: a tiny single-Pod HTTP service that:

  • Serves /.well-known/openid-configuration and /jwks.json from a process-generated keypair.
  • Mints JWTs for device-<id> and fleet-ops machine users on demand via a /token endpoint the harness calls.
  • ~200 LOC, no external deps. Lives inside harmony-fleet-e2e (not exposed elsewhere).

The callout points its oidc_issuer_url at the mock service's in-cluster URL. From the callout's perspective this is indistinguishable from Zitadel.

AuthMode::UserPass skips the callout entirely: NATS deploys with two static accounts (device + admin) and the agent's TomlShared credential variant connects directly. ~3-5s faster bring-up. Useful when iterating on the command protocol itself, where auth isn't being tested.

Both modes go through the same Stack::operator_client surface — tests don't see the difference.

Observability — what the harness prints

On bring-up success, print_debug_info() logs:

[e2e] namespace: e2e-7d3a91f4 (will be deleted on exit unless FLEET_E2E_KEEP=1)
[e2e] kubectl -n e2e-7d3a91f4 get pods
[e2e] NATS: nats://localhost:30422
[e2e] admin token: eyJhbGc... (use as auth_token)
[e2e] devices: vm-device-00-e2e-7d3a91f4
[e2e] tail agent: kubectl -n e2e-7d3a91f4 logs deploy/fleet-agent-vm-device-00 -f
[e2e] tail callout: kubectl -n e2e-7d3a91f4 logs deploy/fleet-callout -f

When a test fails, set FLEET_E2E_KEEP=1 and the namespace persists so you can poke around. The next run uses a different namespace, so leaks don't compound.
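
Reading the debug switches could be as small as the sketch below. It assumes the flags are considered set only when the variable is exactly `1`, which matches how `FLEET_E2E_KEEP=1` and `FLEET_E2E_FORCE_REBUILD` are written in this plan; the function names are hypothetical.

```rust
/// A flag counts as enabled only when the variable is exactly "1".
pub fn flag_enabled(value: Option<&str>) -> bool {
    value == Some("1")
}

/// Reads FLEET_E2E_KEEP from the process environment.
pub fn keep_namespace_from_env() -> bool {
    flag_enabled(std::env::var("FLEET_E2E_KEEP").ok().as_deref())
}

/// Reads FLEET_E2E_FORCE_REBUILD from the process environment.
pub fn force_rebuild_from_env() -> bool {
    flag_enabled(std::env::var("FLEET_E2E_FORCE_REBUILD").ok().as_deref())
}
```

Keeping the parsing in `flag_enabled` means the policy can be tested without mutating the environment of the test process.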

Reuse / cohesion plan

The existing examples/fleet_e2e_demo/src/lib.rs is the original bring-up Frankenstein. Once harmony-fleet-e2e exists, refactor fleet_e2e_demo to delegate:

// examples/fleet_e2e_demo/src/lib.rs (after refactor)
pub async fn bring_up_full_stack(...) -> ... {
    let stack = harmony_fleet_e2e::Stack::bring_up(StackOptions {
        auth_mode: AuthMode::Callout,           // real
        num_devices: cfg.num_devices,
        oidc_provider: OidcProvider::RealZitadel(zitadel_config),  // adapter for real Zitadel
        agent_target: AgentTarget::Vm(vm_ips),  // SSH-based, for the rehearsal flow
        ..StackOptions::default()
    }).await?;
    // ...
}

This requires the harness to support multiple agent targets (Pod vs VM/SSH) and multiple OIDC providers (mock vs real Zitadel). Architecture-wise this is a trait AgentTarget and a trait OidcProvider, both with mock + real impls. The v1 PR ships only the Pod + mock-OIDC impls; the demo refactor is a follow-up PR.
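
To make the trait seam concrete, here is one possible shape for the OIDC side only. Everything in it is an assumption for illustration: the method name, the struct fields, and the in-cluster URL (built from the standard `<service>.<namespace>.svc.cluster.local` DNS form with a guessed service name of `mock-oidc`). An `AgentTarget` trait would presumably follow the same pattern.

```rust
/// Hypothetical seam between the harness and its identity provider.
pub trait OidcProvider {
    /// URL the callout would be given as its oidc_issuer_url.
    fn issuer_url(&self) -> String;
}

/// Mock fixture deployed inside the test namespace.
pub struct MockOidc {
    pub namespace: String,
}

/// Real Zitadel, addressed by a URL supplied by the caller.
pub struct RealZitadel {
    pub issuer_url: String,
}

impl OidcProvider for MockOidc {
    fn issuer_url(&self) -> String {
        // Assumed service name; the port is omitted on purpose.
        format!("http://mock-oidc.{}.svc.cluster.local", self.namespace)
    }
}

impl OidcProvider for RealZitadel {
    fn issuer_url(&self) -> String {
        self.issuer_url.clone()
    }
}

/// The harness only ever sees the trait object.
pub fn callout_issuer(provider: &dyn OidcProvider) -> String {
    provider.issuer_url()
}
```

The point of the sketch is that the callout configuration depends only on `issuer_url()`, so swapping the mock for Zitadel does not touch the rest of the bring-up.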

Cohesion deliverables this PR closes:

  • Single home for "bring up a fleet stack" logic (currently scattered across 3 examples).
  • Single home for image-build invocation (today inline cargo build --release + podman build calls live in fleet_e2e_demo/src/lib.rs lines 553-623).
  • Single home for "issue NATS test client" plumbing (the admin_nats_client helper in e2e_walking_skeleton.rs should be a Stack method).

Wire types (same as previous plan, reduced for ping-only first pass)

In harmony-reconciler-contracts/src/commands.rs — add only what ping needs in PR 1:

pub enum Verb { Ping }
pub fn device_command_subject(device_id: &str, verb: Verb) -> String;

pub enum CommandRequest { Ping }
pub struct PingReply {
    pub device_id: Id,
    pub agent_version: String,
    pub uptime_s: u64,
}

pub const HDR_REQUEST_ID: &str = "X-Harmony-Request-Id";
pub const HDR_DEADLINE: &str = "X-Harmony-Deadline";
pub const HDR_OPERATOR_SUB: &str = "X-Harmony-Operator-Sub";

Verb::Exec / Verb::Logs and their payloads are added in follow-up PRs alongside their tests.
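
The subject helpers are simple enough to sketch in full. The bodies below are an assumption based on the `device-commands.<id>.<verb>` layout used throughout this plan; `Verb::as_str` is a hypothetical helper, and the signature of `device_command_subscription` is guessed from its name.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verb {
    Ping,
}

impl Verb {
    /// Lowercase token used as the last segment of the subject.
    pub fn as_str(self) -> &'static str {
        match self {
            Verb::Ping => "ping",
        }
    }
}

/// Subject the operator publishes a request on.
pub fn device_command_subject(device_id: &str, verb: Verb) -> String {
    format!("device-commands.{device_id}.{}", verb.as_str())
}

/// Wildcard subject the agent subscribes to; `>` matches one or more
/// trailing tokens in NATS.
pub fn device_command_subscription(device_id: &str) -> String {
    format!("device-commands.{device_id}.>")
}
```

Keeping both helpers in the contracts crate means the operator and the agent cannot drift apart on the subject layout.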

Agent-side command server (ping-only scaffold)

fleet/harmony-fleet-agent/src/command_server.rs:

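use std::{sync::Arc, time::Instant};

use futures::StreamExt; // provides `.next()` on the subscription stream

pub struct CommandServer {
    device_id: Id,
    client: async_nats::Client,
    agent_version: &'static str, // e.g. env!("CARGO_PKG_VERSION")
    started_at: Instant,
}

impl CommandServer {
    pub async fn run(self: Arc<Self>) -> anyhow::Result<()> {
        // One wildcard subscription covers every verb for this device.
        let subject = format!("device-commands.{}.>", self.device_id);
        let mut sub = self.client.subscribe(subject).await?;
        while let Some(msg) = sub.next().await {
            self.dispatch(msg).await;
        }
        Ok(())
    }

    async fn dispatch(&self, msg: async_nats::Message) {
        // The verb is the last subject token: device-commands.<id>.<verb>
        let verb = msg.subject.rsplit('.').next();
        match verb {
            Some("ping") => self.reply_ping(&msg).await,
            // reply_error (not shown) publishes an ErrorReply to msg.reply.
            _ => self.reply_error(&msg, ErrorKind::BadRequest, "unknown verb").await,
        }
    }

    async fn reply_ping(&self, msg: &async_nats::Message) {
        let reply = PingReply {
            device_id: self.device_id.clone(),
            agent_version: self.agent_version.to_string(),
            uptime_s: self.started_at.elapsed().as_secs(),
        };
        // Without a reply inbox there is nobody to answer.
        let Some(inbox) = &msg.reply else { return };
        // This function returns (), so `?` is not available here; a
        // serialization or publish failure is dropped instead.
        if let Ok(bytes) = serde_json::to_vec(&reply) {
            let _ = self.client.publish(inbox.clone(), bytes.into()).await;
        }
    }
}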

Wired into main.rs as a new arm of the existing tokio::select!. Future verbs slot into dispatch.
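
As future verbs slot into dispatch, a stricter parse than taking the last subject token may be worth considering. The sketch below is an alternative, not what the scaffold above does: it checks the prefix and the device ID and rejects subjects with extra tokens. The function name is hypothetical.

```rust
/// Returns the verb only if `subject` is exactly
/// `device-commands.<device_id>.<verb>` with a single-token verb.
pub fn verb_for_device<'a>(subject: &'a str, device_id: &str) -> Option<&'a str> {
    let rest = subject.strip_prefix("device-commands.")?;
    let rest = rest.strip_prefix(device_id)?;
    // The device ID must be followed by a token separator, so that
    // "vm-device-000" does not match a server for "vm-device-00".
    let verb = rest.strip_prefix('.')?;
    if verb.is_empty() || verb.contains('.') {
        None
    } else {
        Some(verb)
    }
}
```

The design choice is between leniency and explicitness: the last-token approach accepts any depth under the wildcard, while this version treats unexpected shapes as bad requests.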

Operator-side client (ping-only scaffold)

fleet/harmony-fleet-operator/src/commands.rs:

pub struct FleetCommandsClient {
    nc: async_nats::Client,
    default_timeout: Duration,
}

pub enum CommandError {
    DeviceOffline,   // 503 no_responders
    Timeout,
    BadReply(serde_json::Error),
    Nats(async_nats::Error),
}

impl FleetCommandsClient {
    pub fn new(nc: async_nats::Client) -> Self;
    pub async fn ping(&self, device_id: &str) -> Result<PingReply, CommandError>;
}

ping uses nc.request() (relies on no_responders default-on in async-nats). Timeout: 5s. Decodes JSON reply into PingReply.
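
The error mapping inside ping can be sketched independently of the NATS client. `RequestFailure` below is a hypothetical stand-in for whatever failure kinds the client library reports, and `CommandError` is simplified (no `BadReply`, and `Nats` carries a string) so the example stays dependency-free.

```rust
/// Hypothetical stand-in for the failure kinds of a NATS request.
#[derive(Debug, Clone, PartialEq)]
pub enum RequestFailure {
    NoResponders,
    TimedOut,
    Other(String),
}

/// Simplified version of the operator-side error type.
#[derive(Debug, PartialEq)]
pub enum CommandError {
    DeviceOffline,
    Timeout,
    Nats(String),
}

/// No subscriber on the device subject means the agent is not connected,
/// which is reported immediately rather than after the timeout elapses.
pub fn classify(failure: RequestFailure) -> CommandError {
    match failure {
        RequestFailure::NoResponders => CommandError::DeviceOffline,
        RequestFailure::TimedOut => CommandError::Timeout,
        RequestFailure::Other(msg) => CommandError::Nats(msg),
    }
}
```

Separating `DeviceOffline` from `Timeout` is what lets the offline-device test assert on a sub-second failure.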

Test ordering & PR slicing

PR 1 (this plan):

  • harmony-fleet-e2e crate scaffolding
  • harmony-reconciler-contracts::commands (ping types only)
  • Agent: runtime_enabled config flag + command_server.rs (ping only)
  • Operator: commands.rs (ping only)
  • New FleetAgentPodScore (or inline manifest) for pod-based agents
  • New MockOidcScore for the auth callout's issuer
  • tests/ping.rs — passing
  • Agent Dockerfile (new)

PR 2 (after PR 1 merges):

  • tests/ping_offline.rs (no_responders → DeviceOffline)
  • Refactor fleet_e2e_demo to delegate to harmony-fleet-e2e with AgentTarget::Vm + OidcProvider::RealZitadel

PR 3 (logs):

  • Wire types for Verb::Logs + LogsReq + LogChunk
  • Agent handler invoking podman_api::Containers::logs
  • Operator client streaming method
  • tests/logs.rs

PR 4 (exec):

  • Wire types for Verb::Exec + ExecReq + ExecReply
  • Agent handler with container-only default + host-exec policy gate
  • Operator client
  • tests/exec.rs

PR 5+: web frontend wiring, CLI subcommands.

Open questions for review

  1. Auth mode default — Callout-with-mock-OIDC (slower, exercises real auth path), or UserPass (faster, doesn't test auth)? Plan picks Callout. UserPass available via env or StackOptions.
  2. Mock OIDC fixture — build into the harness, or use an existing crate? I haven't found a small-enough off-the-shelf one; recommend hand-rolled ~200 LOC (uses jsonwebtoken).
  3. Image hash strategy — content-hash of Cargo.lock + crate source (skip rebuild if matching tag exists)? Or always rebuild and rely on Docker layer cache? Plan: content-hash, with FLEET_E2E_FORCE_REBUILD=1 escape hatch.
  4. Cluster lifecycle — harness assumes the k3d cluster already exists (or auto-creates one named fleet-e2e). Should it also offer a Stack::bring_up_isolated_cluster() that creates+destroys the whole cluster per test? Plan: no, namespace isolation is enough; clusters are heavy.
  5. Ping reply shape: PingReply { device_id, agent_version, uptime_s } is minimal. Add anything else useful for a health-check (memory, podman socket status, current desired-state revision)? Easy to extend later; v1 keeps it minimal.
  6. Subject choice: device-commands.<id>.ping (matches the existing callout permission template). The alternative, harmony.device.<id>.cmd.ping (matches the doc's verbatim suggestion), would require updating the callout permissions. Plan picks the existing device-commands.<id>.ping subject and notes that the doc's harmony.device.* is the same idea with a different prefix; no callout change needed.

What you'll see when you run the green ping test

$ cargo test -p harmony-fleet-e2e --test ping
   Compiling harmony-fleet-e2e v0.1.0
    Finished test [unoptimized + debuginfo] target(s) in 12.4s
     Running tests/ping.rs

running 1 test
[e2e] building images: callout, operator, agent (cached, skipping rebuild)
[e2e] sideloading 3 images into k3d cluster fleet-e2e
[e2e] namespace: e2e-7d3a91f4
[e2e] deploying mock-oidc, nats, callout, operator, agent
[e2e] all pods ready in 7.2s
[e2e] NATS: nats://localhost:30422
[e2e] admin token: eyJhbGc...
test operator_can_ping_agent ... ok

test result: ok. 1 passed; 0 failed; 0 ignored; finished in 9.8s
[e2e] tearing down namespace e2e-7d3a91f4

Target: green test in <15s end-to-end, with subsequent runs hitting <10s thanks to image cache + cluster reuse.