First slice of the device-commands.* protocol from fleet/requests_over_nats.md. Lands `Verb::Ping` plus the harness that proves it works against a real in-cluster agent.
Wire types (`harmony-reconciler-contracts::commands`):
- `Verb::Ping`, `CommandRequest`, `PingReply`, `ErrorReply`/`ErrorKind`
- `device_command_subject` / `device_command_subscription` helpers
- `X-Harmony-*` header constants
Agent:
- `command_server.rs` subscribes on `device-commands.<id>.>` and dispatches verbs; ping handler replies with `PingReply`
- New `[agent].runtime_enabled` config flag (default true). When false, podman init + reconciler loop are skipped so the agent can run as a Pod on containerd-only k3d nodes; command server + heartbeat still run
- `Dockerfile`: canonical multi-stage build for production registries
Operator:
- `commands::FleetCommandsClient` with typed `CommandError` (`DeviceOffline` via `no_responders`, `Timeout`, `BadReply`, `Nats`)
E2E harness (`harmony-fleet-e2e`):
- Library crate + integration test. `Stack::bring_up` provisions a fresh `e2e-<uuid8>` namespace in a shared `fleet-e2e` k3d cluster, deploys NATS (UserPass auth, JetStream on) + the agent Pod, returns a connected admin NATS client, and tears the namespace down on Drop
- v1 ships `AuthMode::UserPass` only; the `Callout` variant is reserved on the public API for the follow-up PR that adds the mock OIDC fixture + NatsAuthCalloutScore deployment
- Operator pod deployment is also follow-up; for ping the test process drives `FleetCommandsClient` directly against the cluster's NATS NodePort
- `HARMONY_FLEET_E2E=1` gates the integration test so default `cargo test --workspace` runs don't depend on k3d/podman
- Image build + sideload mirrors the `fleet_auth_callout` pattern: host `cargo build --release` → single-stage Dockerfile → `podman build` → `k3d image import`. ~12s warm bring-up, ~80s cold
Plan — Request/Reply over NATS, TDD via in-cluster e2e harness
Two intertwined deliverables:
- fleet/harmony-fleet-e2e: a new harness crate that brings up the full stack (NATS + auth-callout + fleet-operator + fleet-agent-as-pod) in a fresh k3d namespace and tears it down at process exit. Fast (target ≤15s bring-up when the cluster is already running, ≤5s teardown). Works against k3d locally or any cluster with a kubeconfig (incl. OKD).
- First feature, TDD-style: Verb::Ping. Failing test in the harness, then the wire types + agent handler + operator client to make it green. Subsequent verbs (logs, exec) follow the same pattern in follow-up PRs.
Both land together because the harness is what proves cohesion: every fleet feature from now on gets its e2e test in the same crate, and the scattered bring-up code in examples/fleet_e2e_demo and examples/fleet_auth_callout becomes a thin layer over this harness.
Goals & non-goals
| Goal | In v1 |
|---|---|
| cargo test -p harmony-fleet-e2e brings up the stack, runs ping test, tears down | ✅ |
| Per-test namespace isolation; multiple test runs can coexist in the same cluster | ✅ |
| Images built once and sideloaded into k3d (no registry push) | ✅ |
| Cluster reused across runs; only namespace is recreated | ✅ |
| Agent runs as a Pod (no VMs, no SSH, no libvirt) | ✅ |
| Harness prints NATS URL + admin creds so the developer can poke during a hung test | ✅ |
| First feature: ping (operator-side FleetCommandsClient::ping, agent-side handler, wire types) | ✅ |
| Runs against a remote OKD cluster via KUBECONFIG | ✅ (image-import step is conditional) |

| Non-goal (v1) | Reason |
|---|---|
| logs / exec implementations | Same wiring; covered in follow-up commits using the same harness |
| PTY | Doc Pattern B; defer |
| JetStream audit log | Defer; sidecar consumer added later |
| Zitadel in the harness | Cold-start cost is 30-60s; harness mode A uses a mock OIDC fixture for the callout to keep bring-up fast. Real Zitadel stays in fleet_e2e_demo (manual rehearsal). |
Crate layout
New workspace member at fleet/harmony-fleet-e2e/:
fleet/harmony-fleet-e2e/
├── Cargo.toml
├── README.md # How to run, debug, point at remote clusters
├── src/
│ ├── lib.rs # Public surface: Stack, StackHandle, bring_up()
│ ├── images.rs # Build + sideload (callout, operator, agent)
│ ├── namespace.rs # Unique-namespace generation + RAII cleanup
│ ├── stack.rs # Compose Scores against K8sBareTopology
│ ├── nats.rs # NatsHelmChartScore preset with callout + mock-issuer block
│ ├── mock_oidc.rs # Tiny in-cluster OIDC fixture (issues JWTs the callout accepts)
│ ├── agent_pod.rs # New Score: agent as a Pod (no VM/SSH)
│ ├── observability.rs # NodePort + admin creds, helper to mint admin JWT
│ └── client.rs # FleetCommandsClient (operator-side wrapper for tests)
└── tests/
└── ping.rs # **First TDD test** — failing until the protocol lands
Crate kind: library + [[test]] integration tests. Not a binary; harness is consumed by tests via harmony_fleet_e2e::Stack::bring_up().await.
Cargo workspace: add to root members. Build deps: harmony (k8s+nats helpers), harmony-fleet-auth, harmony-reconciler-contracts, k3d, async-nats, kube, k8s-openapi, tokio, anyhow, tracing, uuid (for namespace ID), tempfile, serde_json.
Why a separate crate (not an example)
Examples currently are bring-up scripts. The e2e harness is infrastructure for tests consumed by multiple callers (the new tests/ping.rs, future tests/logs.rs, tests/exec.rs, and eventually a slimmed-down examples/fleet_e2e_demo that just calls into it for the manual rehearsal). A library crate lets us expose Stack, StackHandle, FleetCommandsClient as proper types, with cargo test discovery and parallel-friendly per-namespace isolation.
Agent-side prerequisite: gate podman behind config
The agent currently panics if the podman socket isn't ready (fleet/harmony-fleet-agent/src/main.rs:200). For the in-cluster harness we need the agent to run on a node that doesn't expose podman.
Add to agent-config.toml:
[agent]
device_id = "vm-device-00"
# NEW: when false, skip podman init and the reconciler loop.
# Command server still runs (ping/exec-via-fallback are still useful).
runtime_enabled = true # default true; e2e harness sets false
Wire-up in main.rs:
- When runtime_enabled = false, the agent skips PodmanTopology::from_default_socket(), skips the reconciler periodic tick, but still subscribes to desired-state (KV watch) and runs the command server. KV deliveries with a non-podman Score variant get logged + rejected with ErrorKind::BadRequest (today we'd just drop them silently).
Small, contained change (~30 lines). Unlocks pod-based agents and unblocks future verbs (exec/logs add their own runtime requirements).
Alternative considered: mount /var/run/podman/podman.sock into the pod. Rejected — k3d nodes run containerd, not podman; mount would dangle.
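The gating above can be sketched as a pure decision function. This is a minimal illustration under stated assumptions, not the agent's actual code: `AgentConfig`, `Subsystem`, and `planned_subsystems` are hypothetical names, and the real agent would read the flag from agent-config.toml instead of building the struct by hand.

```rust
/// Hypothetical mirror of the `[agent]` table; only the fields this sketch needs.
#[derive(Debug, Clone)]
pub struct AgentConfig {
    pub device_id: String,
    pub runtime_enabled: bool,
}

impl AgentConfig {
    /// `runtime_enabled` falls back to `true` when the key is absent from the config.
    pub fn new(device_id: &str, runtime_enabled: Option<bool>) -> Self {
        Self {
            device_id: device_id.to_string(),
            runtime_enabled: runtime_enabled.unwrap_or(true),
        }
    }
}

/// Hypothetical labels for the pieces `main.rs` starts.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Subsystem {
    PodmanInit,
    ReconcilerTick,
    DesiredStateWatch,
    CommandServer,
}

/// Decide which subsystems to start for a given config.
pub fn planned_subsystems(cfg: &AgentConfig) -> Vec<Subsystem> {
    let mut plan = Vec::new();
    if cfg.runtime_enabled {
        // Only touch podman and the reconciler when a container runtime is expected.
        plan.push(Subsystem::PodmanInit);
        plan.push(Subsystem::ReconcilerTick);
    }
    // These run in both modes, so a pod-based agent can still answer commands.
    plan.push(Subsystem::DesiredStateWatch);
    plan.push(Subsystem::CommandServer);
    plan
}
```

With `runtime_enabled = false` the plan contains only the desired-state watch and the command server, which is the mode the e2e harness would use.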
Harness public API
// fleet/harmony-fleet-e2e/src/lib.rs
pub struct Stack {
pub namespace: String, // e2e-<uuid8>
pub nats_url: String, // nats://localhost:<nodeport>
pub admin_token: String, // JWT for the mock OIDC, callout-accepted
pub device_ids: Vec<Id>, // ["vm-device-00", "vm-device-01", …]
pub operator_client: async_nats::Client, // pre-authed admin client
_guard: NamespaceGuard, // Drop impl deletes the namespace
}
pub struct StackOptions {
pub kubeconfig: Option<PathBuf>, // default: $KUBECONFIG, fall back to k3d-managed
pub k3d_cluster_name: Option<String>, // None = pick the harness default; required if not using k3d
pub num_devices: usize, // default 1; ping test uses 1
pub image_rebuild: bool, // env var FLEET_E2E_FORCE_REBUILD
pub keep_namespace: bool, // env var FLEET_E2E_KEEP=1 — skip teardown for debugging
pub auth_mode: AuthMode, // Callout (default) | UserPass (fastest)
}
pub enum AuthMode {
/// Real auth-callout + mock OIDC fixture. Exercises the production code path.
Callout,
/// NATS user/pass via TomlShared credentials. Skips callout entirely.
/// ~3-5s faster bring-up; use for tests that don't care about auth.
UserPass,
}
impl Stack {
pub async fn bring_up(opts: StackOptions) -> anyhow::Result<Self>;
pub fn print_debug_info(&self); // logs URL, token, namespace, kubectl shortcuts
}
Drop for NamespaceGuard: spawns a blocking task that runs kubectl delete namespace <name> --wait=false. Doesn't block process exit; the namespace garbage-collects asynchronously. If keep_namespace = true, just logs the name.
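A minimal sketch of that guard, assuming a std-only variant: instead of a tokio blocking task it starts `kubectl` as a child process and never waits on it, which has the same "don't block process exit" effect. The `delete_args` helper and the constructor are hypothetical names introduced for illustration.

```rust
use std::process::{Command, Stdio};

/// RAII guard for one e2e namespace (field names are assumptions).
pub struct NamespaceGuard {
    name: String,
    keep: bool,
}

impl NamespaceGuard {
    pub fn new(name: impl Into<String>, keep: bool) -> Self {
        Self { name: name.into(), keep }
    }
}

/// Arguments for the non-blocking delete described in the plan.
pub fn delete_args(namespace: &str) -> Vec<String> {
    vec![
        "delete".into(),
        "namespace".into(),
        namespace.into(),
        "--wait=false".into(),
    ]
}

impl Drop for NamespaceGuard {
    fn drop(&mut self) {
        if self.keep {
            // FLEET_E2E_KEEP=1 path: leave the namespace for debugging.
            eprintln!("[e2e] keeping namespace {}", self.name);
            return;
        }
        // Spawn and do not wait: the child keeps running after the handle is dropped,
        // and the cluster garbage-collects the namespace asynchronously.
        let spawned = Command::new("kubectl")
            .args(delete_args(&self.name))
            .stdout(Stdio::null())
            .stderr(Stdio::null())
            .spawn();
        if let Err(err) = spawned {
            eprintln!("[e2e] could not start kubectl for {}: {err}", self.name);
        }
    }
}
```

Dropping a guard built with `keep = true` only logs the name, so a failed run can be inspected with kubectl afterwards.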
TDD test order
Test 1 (first to land): ping
// fleet/harmony-fleet-e2e/tests/ping.rs
#[tokio::test(flavor = "multi_thread")]
async fn operator_can_ping_agent() -> anyhow::Result<()> {
let stack = Stack::bring_up(StackOptions::default()).await?;
let device_id = &stack.device_ids[0];
let client = FleetCommandsClient::new(stack.operator_client.clone());
let reply = tokio::time::timeout(
Duration::from_secs(10),
client.ping(device_id.as_str()),
).await??;
assert_eq!(reply.device_id.as_str(), device_id.as_str());
assert!(!reply.agent_version.is_empty());
Ok(())
}
Failing → green sequence:
- Red: write the test above. It can't even compile because FleetCommandsClient, Stack, bring_up don't exist.
- Scaffold the harness: stub Stack::bring_up that just returns an error. Test compiles, fails at runtime.
- Bring up the cluster bits incrementally:
  - Namespace creation + RAII guard.
  - NATS deploy via NatsHelmChartScore (UserPass mode first for speed).
  - Operator deploy via FleetOperatorScore (image sideloaded).
  - Agent pod deploy via new FleetAgentPodScore.
  - Wait for pod readiness.
  - Build operator admin NATS client.
- Implement the wire types in harmony-reconciler-contracts/src/commands.rs (just Verb::Ping + CommandRequest::Ping + PingReply for now).
- Implement agent command server with only the ping handler (fleet/harmony-fleet-agent/src/command_server.rs).
- Implement FleetCommandsClient::ping in fleet/harmony-fleet-operator/src/commands.rs.
- Test goes green.
- Add Callout auth mode to the harness (mock OIDC fixture deployed alongside NATS), re-run test in both modes.
Test 2 (follow-up PR): no-responders → DeviceOffline
#[tokio::test]
async fn ping_to_offline_device_returns_immediately() -> anyhow::Result<()> {
let stack = Stack::bring_up(StackOptions::default()).await?;
let client = FleetCommandsClient::new(stack.operator_client.clone());
let started = Instant::now();
let err = client.ping("nonexistent-device").await.unwrap_err();
assert!(matches!(err, CommandError::DeviceOffline));
assert!(started.elapsed() < Duration::from_secs(1));
Ok(())
}
Test 3+ (follow-up PR, same harness): logs + exec — same pattern.
Image build & sideload
src/images.rs exposes:
pub struct Images {
pub callout: String, // e.g. harmony-nats-callout:e2e-<contenthash>
pub operator: String,
pub agent: String,
}
pub async fn build_and_sideload(cluster: &K3dCluster, opts: BuildOpts) -> Result<Images>;
Implementation:
- For each of (callout, operator, agent):
  - Hash the crate's source tree + Cargo.lock.
  - If podman images already contains <image>:<hash> and FLEET_E2E_FORCE_REBUILD != 1, skip the build.
  - Otherwise: cargo build --release -p <crate> + podman build -f Dockerfile -t <image>:<hash>.
  - podman save | k3d image import -c <cluster> (or --volumes if --import doesn't accept stdin; use the existing pattern from examples/fleet_e2e_demo).
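The skip-or-rebuild decision can be sketched as below. This is an illustration only: the function names are hypothetical, FNV-1a stands in for whatever hash the harness ends up using (picked here because it needs no dependencies), and the caller is assumed to have already read the source files and listed the existing podman tags.

```rust
/// One FNV-1a (64-bit) pass over `bytes`, continuing from `state`.
fn fnv1a(state: u64, bytes: &[u8]) -> u64 {
    bytes
        .iter()
        .fold(state, |h, b| (h ^ u64::from(*b)).wrapping_mul(0x0000_0100_0000_01b3))
}

/// Hash (path, contents) pairs in path order so the result does not depend
/// on the order in which the directory walk returned them.
pub fn content_hash(files: &[(&str, &[u8])]) -> String {
    let mut sorted = files.to_vec();
    sorted.sort_by_key(|f| f.0);
    let mut h = 0xcbf2_9ce4_8422_2325u64; // FNV-1a offset basis
    for (path, contents) in sorted {
        h = fnv1a(h, path.as_bytes());
        h = fnv1a(h, &[0]); // separator between path and contents
        h = fnv1a(h, contents);
        h = fnv1a(h, &[0]); // separator between files
    }
    format!("{h:016x}")
}

/// Tag shape taken from the `Images` example: `<image>:e2e-<contenthash>`.
pub fn image_tag(image: &str, hash: &str) -> String {
    format!("{image}:e2e-{hash}")
}

/// Build when forced, or when no image with this exact tag exists yet.
pub fn needs_build(existing_tags: &[String], tag: &str, force_rebuild: bool) -> bool {
    force_rebuild || !existing_tags.iter().any(|t| t == tag)
}
```

A caller would compute the tag once per crate, ask `needs_build`, and run the cargo and podman steps only when it returns true; the sideload step runs either way.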
Dockerfiles:
- Callout: exists at nats/callout/Dockerfile (used by the demo).
- Operator: exists at fleet/harmony-fleet-operator/Dockerfile.
- Agent: doesn't exist yet; add fleet/harmony-fleet-agent/Dockerfile. Distroless base, single static binary, ~5MB image.
Sideload bypass for remote clusters: if opts.registry is set, push to that registry and skip sideload. Out of scope for v1 (the user said defer); v1 just panics if running against a non-k3d cluster.
Per-namespace isolation
Today the demo hardcodes fleet-system and zitadel. The harness:
- Picks namespace e2e-<uuid8> per Stack::bring_up call.
- Every Score in the harness is parametrized on namespace; nothing is hardcoded.
- The FleetOperatorScore already takes a namespace (verified in harmony/src/modules/fleet/operator/score.rs). The NatsHelmChartScore too. The NatsAuthCalloutScore too. Good.
- The CRDs (Deployment, Device) are cluster-scoped, but they're created once per cluster (idempotent apply), shared across e2e runs. The operator filters by namespace via its kube::Api::namespaced() calls.
- Wait: Device is cluster-scoped. Two simultaneous e2e runs would collide on Device CR names. Two mitigations:
  - Option A (simpler): per-test device IDs include the namespace suffix (vm-device-00-e2e-abc12345). No collision.
  - Option B: scope the Device CR to a namespace. Bigger change to the operator. Out of scope.
  - Plan picks A.
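Option A amounts to a naming rule, sketched below. The helper names are hypothetical, and the eight-character suffix is passed in as a parameter; in the harness it would come from the uuid crate.

```rust
/// Namespace name for one `Stack::bring_up` call: `e2e-<uuid8>`.
pub fn namespace_name(uuid8: &str) -> String {
    format!("e2e-{uuid8}")
}

/// Device IDs carry the namespace as a suffix so cluster-scoped Device CRs
/// from concurrent runs cannot share a name.
pub fn device_ids(namespace: &str, num_devices: usize) -> Vec<String> {
    (0..num_devices)
        .map(|i| format!("vm-device-{i:02}-{namespace}"))
        .collect()
}
```

For example, `device_ids(&namespace_name("abc12345"), 1)` yields `vm-device-00-e2e-abc12345`, matching the ID shown in Option A.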
Auth mode story
Default AuthMode::Callout because the user explicitly asked for "nats + callout + operator + agent". To avoid Zitadel's bring-up cost, the harness ships a mock_oidc.rs fixture: a tiny single-Pod HTTP service that:
- Serves /.well-known/openid-configuration and /jwks.json from a process-generated keypair.
- Mints JWTs for device-<id> and fleet-ops machine users on demand via a /token endpoint the harness calls.
- ~200 LOC, no external deps. Lives inside harmony-fleet-e2e (not exposed elsewhere).
The callout points its oidc_issuer_url at the mock service's in-cluster URL. From the callout's perspective this is indistinguishable from Zitadel.
AuthMode::UserPass skips the callout entirely: NATS deploys with two static accounts (device + admin) and the agent's TomlShared credential variant connects directly. ~3-5s faster bring-up. Useful when iterating on the command protocol itself, where auth isn't being tested.
Both modes go through the same Stack::operator_client surface — tests don't see the difference.
Observability — what the harness prints
On bring-up success, print_debug_info() logs:
[e2e] namespace: e2e-7d3a91f4 (will be deleted on exit unless FLEET_E2E_KEEP=1)
[e2e] kubectl -n e2e-7d3a91f4 get pods
[e2e] NATS: nats://localhost:30422
[e2e] admin token: eyJhbGc... (use as auth_token)
[e2e] devices: vm-device-00-e2e-7d3a91f4
[e2e] tail agent: kubectl -n e2e-7d3a91f4 logs deploy/fleet-agent-vm-device-00 -f
[e2e] tail callout: kubectl -n e2e-7d3a91f4 logs deploy/fleet-callout -f
When a test fails, set FLEET_E2E_KEEP=1 and the namespace persists so you can poke around. The next run uses a different namespace, so leaks don't compound.
Reuse / cohesion plan
The existing examples/fleet_e2e_demo/src/lib.rs is the original bring-up Frankenstein. Once harmony-fleet-e2e exists, refactor fleet_e2e_demo to delegate:
// examples/fleet_e2e_demo/src/lib.rs (after refactor)
pub async fn bring_up_full_stack(...) -> ... {
let stack = harmony_fleet_e2e::Stack::bring_up(StackOptions {
auth_mode: AuthMode::Callout, // real
num_devices: cfg.num_devices,
oidc_provider: OidcProvider::RealZitadel(zitadel_config), // adapter for real Zitadel
agent_target: AgentTarget::Vm(vm_ips), // SSH-based, for the rehearsal flow
..
}).await?;
// ...
}
This requires the harness to support multiple agent targets (Pod vs VM/SSH) and multiple OIDC providers (mock vs real Zitadel). Architecture-wise this is a trait AgentTarget and a trait OidcProvider, both with mock + real impls. The v1 PR ships only the Pod + mock-OIDC impls; the demo refactor is a follow-up PR.
Cohesion deliverables this PR closes:
- Single home for "bring up a fleet stack" logic (currently scattered across 3 examples).
- Single home for image-build invocation (today inline cargo build --release + podman build calls live in fleet_e2e_demo/src/lib.rs lines 553–623).
- Single home for "issue NATS test client" plumbing (the admin_nats_client helper in e2e_walking_skeleton.rs should be a Stack method).
Wire types (same as previous plan, reduced for ping-only first pass)
In harmony-reconciler-contracts/src/commands.rs — add only what ping needs in PR 1:
pub enum Verb { Ping }
pub fn device_command_subject(device_id: &str, verb: Verb) -> String;
pub enum CommandRequest { Ping }
pub struct PingReply {
pub device_id: Id,
pub agent_version: String,
pub uptime_s: u64,
}
pub const HDR_REQUEST_ID: &str = "X-Harmony-Request-Id";
pub const HDR_DEADLINE: &str = "X-Harmony-Deadline";
pub const HDR_OPERATOR_SUB: &str = "X-Harmony-Operator-Sub";
Verb::Exec / Verb::Logs and their payloads are added in follow-up PRs alongside their tests.
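A sketch of how the subject helpers could look, based on the `device-commands.<id>.<verb>` shape used elsewhere in this plan. The `as_token` method and the signature of `device_command_subscription` are assumptions; only `device_command_subject`'s signature is given above.

```rust
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum Verb {
    Ping,
}

impl Verb {
    /// Last subject token for this verb (hypothetical helper).
    pub fn as_token(self) -> &'static str {
        match self {
            Verb::Ping => "ping",
        }
    }
}

/// Subject the operator publishes a request on.
pub fn device_command_subject(device_id: &str, verb: Verb) -> String {
    format!("device-commands.{device_id}.{}", verb.as_token())
}

/// Wildcard subject the agent subscribes to; `>` matches all remaining tokens.
pub fn device_command_subscription(device_id: &str) -> String {
    format!("device-commands.{device_id}.>")
}
```

Keeping both helpers next to `Verb` means the operator and the agent cannot drift apart on the subject layout. Because NATS splits subjects on `.`, this sketch assumes device IDs never contain a dot.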
Agent-side command server (ping-only scaffold)
fleet/harmony-fleet-agent/src/command_server.rs:
use std::sync::Arc;
use std::time::Instant;
use futures::StreamExt; // provides `next()` on the subscriber stream
pub struct CommandServer {
    device_id: Id,
    client: async_nats::Client,
    agent_version: &'static str,
    started_at: Instant,
}
impl CommandServer {
    pub async fn run(self: Arc<Self>) -> Result<()> {
        // Wildcard subscription: every verb addressed to this device lands here.
        let subject = format!("device-commands.{}.>", self.device_id);
        let mut sub = self.client.subscribe(subject).await?;
        while let Some(msg) = sub.next().await {
            self.dispatch(msg).await;
        }
        Ok(())
    }
    async fn dispatch(&self, msg: async_nats::Message) {
        // The verb is the last token of the subject.
        let verb = msg.subject.rsplit('.').next();
        match verb {
            Some("ping") => self.reply_ping(&msg).await,
            _ => self.reply_error(&msg, ErrorKind::BadRequest, "unknown verb").await,
        }
    }
    async fn reply_ping(&self, msg: &async_nats::Message) {
        let reply = PingReply {
            device_id: self.device_id.clone(),
            agent_version: env!("CARGO_PKG_VERSION").to_string(),
            uptime_s: self.started_at.elapsed().as_secs(),
        };
        if let Some(inbox) = &msg.reply {
            // This fn returns (), so `?` cannot be used; skip the reply if encoding fails.
            if let Ok(body) = serde_json::to_vec(&reply) {
                let _ = self.client.publish(inbox.clone(), body.into()).await;
            }
        }
    }
}
Wired into main.rs as a new arm of the existing tokio::select!. Future verbs slot into dispatch.
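The routing step can also be written as a pure function, which makes it testable without a NATS connection. The sketch below is a stricter alternative to taking the last subject token: it strips the known `device-commands.<id>.` prefix, so a multi-token suffix is not mistaken for a plain verb. `Route` and `route` are hypothetical names.

```rust
/// Outcome of routing one subject (hypothetical type for this sketch).
#[derive(Debug, PartialEq, Eq)]
pub enum Route {
    Ping,
    /// Anything else; the server would answer with ErrorKind::BadRequest.
    Unknown(String),
}

/// Map a full subject to a route for the given device.
pub fn route(device_id: &str, subject: &str) -> Route {
    let prefix = format!("device-commands.{device_id}.");
    match subject.strip_prefix(prefix.as_str()) {
        Some("ping") => Route::Ping,
        Some(other) => Route::Unknown(other.to_string()),
        // Subject was not addressed to this device at all.
        None => Route::Unknown(subject.to_string()),
    }
}
```

New verbs would add a variant to `Route` and one match arm, mirroring how they slot into `dispatch`.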
Operator-side client (ping-only scaffold)
fleet/harmony-fleet-operator/src/commands.rs:
pub struct FleetCommandsClient {
nc: async_nats::Client,
default_timeout: Duration,
}
pub enum CommandError {
DeviceOffline, // 503 no_responders
Timeout,
BadReply(serde_json::Error),
Nats(async_nats::Error),
}
impl FleetCommandsClient {
pub fn new(nc: async_nats::Client) -> Self;
pub async fn ping(&self, device_id: &str) -> Result<PingReply, CommandError>;
}
ping uses nc.request() (relies on no_responders default-on in async-nats). Timeout: 5s. Decodes JSON reply into PingReply.
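The error mapping implied here can be sketched without the NATS client. `RequestFailure` below is a local stand-in for the ways a request can fail; it is an assumption made for illustration and not an async-nats type, and the payloads are plain strings where the real `CommandError` carries `serde_json::Error` and `async_nats::Error`.

```rust
use std::fmt;

/// Stand-in for request failures (hypothetical, not the async-nats API).
#[derive(Debug)]
pub enum RequestFailure {
    NoResponders,
    TimedOut,
    Other(String),
}

/// Simplified mirror of the plan's CommandError with string payloads.
#[derive(Debug, PartialEq, Eq)]
pub enum CommandError {
    DeviceOffline,
    Timeout,
    BadReply(String),
    Nats(String),
}

impl From<RequestFailure> for CommandError {
    fn from(failure: RequestFailure) -> Self {
        match failure {
            // Nobody is subscribed on the device's subject: report it at once.
            RequestFailure::NoResponders => CommandError::DeviceOffline,
            RequestFailure::TimedOut => CommandError::Timeout,
            RequestFailure::Other(msg) => CommandError::Nats(msg),
        }
    }
}

impl fmt::Display for CommandError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            CommandError::DeviceOffline => write!(f, "device offline (no responders)"),
            CommandError::Timeout => write!(f, "request timed out"),
            CommandError::BadReply(msg) => write!(f, "bad reply: {msg}"),
            CommandError::Nats(msg) => write!(f, "nats error: {msg}"),
        }
    }
}

impl std::error::Error for CommandError {}
```

Separating `DeviceOffline` from `Timeout` is what lets the offline-device test assert that the call fails in well under the 5s timeout.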
Test ordering & PR slicing
PR 1 (this plan):
- harmony-fleet-e2e crate scaffolding
- harmony-reconciler-contracts::commands (ping types only)
- Agent: runtime_enabled config flag + command_server.rs (ping only)
- Operator: commands.rs (ping only)
- New FleetAgentPodScore (or inline manifest) for pod-based agents
- New MockOidcScore for the auth callout's issuer
- tests/ping.rs (passing)
- Agent Dockerfile (new)
PR 2 (after PR 1 merges):
- tests/ping_offline.rs (no_responders → DeviceOffline)
- Refactor fleet_e2e_demo to delegate to harmony-fleet-e2e with AgentTarget::Vm + OidcProvider::RealZitadel
PR 3 (logs):
- Wire types for Verb::Logs + LogsReq + LogChunk
- Agent handler invoking podman_api::Containers::logs
- Operator client streaming method
- tests/logs.rs
PR 4 (exec):
- Wire types for Verb::Exec + ExecReq + ExecReply
- Agent handler with container-only default + host-exec policy gate
- Operator client
- tests/exec.rs
PR 5+: web frontend wiring, CLI subcommands.
Open questions for review
- Auth mode default: Callout-with-mock-OIDC (slower, exercises real auth path), or UserPass (faster, doesn't test auth)? Plan picks Callout. UserPass available via env or StackOptions.
- Mock OIDC fixture: build into the harness, or use an existing crate? I haven't found a small-enough off-the-shelf one; recommend hand-rolled ~200 LOC (uses jsonwebtoken).
- Image hash strategy: content-hash of Cargo.lock + crate source (skip rebuild if matching tag exists)? Or always rebuild and rely on Docker layer cache? Plan: content-hash, with FLEET_E2E_FORCE_REBUILD=1 escape hatch.
- Cluster lifecycle: harness assumes the k3d cluster already exists (or auto-creates one named fleet-e2e). Should it also offer a Stack::bring_up_isolated_cluster() that creates+destroys the whole cluster per test? Plan: no, namespace isolation is enough; clusters are heavy.
- Ping reply shape: PingReply { device_id, agent_version, uptime_s } is minimal. Add anything else useful for a health-check (memory, podman socket status, current desired-state revision)? Easy to extend later; v1 keeps it minimal.
- Subject choice: device-commands.<id>.ping (matches the existing callout permission template). Alternative harmony.device.<id>.cmd.ping (matches the doc's verbatim suggestion) would require updating the callout permissions. Plan picks the existing device-commands.<id>.ping subject and notes the doc's harmony.device.* is the same idea with a different prefix; no callout change needed.
What you'll see when you run the green ping test
$ cargo test -p harmony-fleet-e2e --test ping
Compiling harmony-fleet-e2e v0.1.0
Finished test [unoptimized + debuginfo] target(s) in 12.4s
Running tests/ping.rs
running 1 test
[e2e] building images: callout, operator, agent (cached, skipping rebuild)
[e2e] sideloading 3 images into k3d cluster fleet-e2e
[e2e] namespace: e2e-7d3a91f4
[e2e] deploying mock-oidc, nats, callout, operator, agent
[e2e] all pods ready in 7.2s
[e2e] NATS: nats://localhost:30422
[e2e] admin token: eyJhbGc...
test operator_can_ping_agent ... ok
test result: ok. 1 passed; 0 failed; 0 ignored; finished in 9.8s
[e2e] tearing down namespace e2e-7d3a91f4
Target: green test in <15s end-to-end, with subsequent runs hitting <10s thanks to image cache + cluster reuse.