harmony/examples/fleet_auth_callout/tests/security_model.rs
Jean-Gabriel Gill-Couture 6c45fb22ba feat(nats-callout): production callout + harmony module + e2e demo
harmony-nats-callout becomes a deployable service, not just a library:
- New [[bin]] target with env+secret-file driven config and
  SIGINT/SIGTERM-aware shutdown.
- Dockerfile (single-stage archlinux:base, non-root, matches
  harmony-fleet-operator convention).
- Refactored handler into a pure `decide()` function so the entire
  authorization decision tree is unit-testable without async-nats.
- New `roles` module with role resolution + a `validate_device_id`
  security gate that rejects NATS subject metacharacters in device_id
  (.>* whitespace) — closes a real escalation path through the
  `{device_id}` placeholder in the per-device permissions block.
- Configurable role claim path + admin/device role names; admin wins
  when both are present (privilege-escalation invariant).
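
A minimal sketch of the two invariants described above, assuming the checks are plain string and slice operations. The function signatures, the `Role` enum, the empty-id check, and the role names used in the usage note are illustrative assumptions, not the crate's actual API.

```rust
/// Effective role for a connecting principal (illustrative type).
#[derive(Debug, PartialEq, Eq)]
pub enum Role {
    Admin,
    Device,
}

/// Reject any device_id that would change the meaning of a NATS subject
/// once substituted into a `{device_id}` placeholder: `.` adds a token,
/// `>` and `*` are wildcards, and whitespace is not valid in a subject.
pub fn validate_device_id(device_id: &str) -> Result<(), String> {
    // Assumption: an empty id is also rejected (not stated in the commit message).
    if device_id.is_empty() {
        return Err("device_id is empty".to_string());
    }
    for c in device_id.chars() {
        if c == '.' || c == '>' || c == '*' || c.is_whitespace() {
            return Err(format!("device_id contains forbidden character {c:?}"));
        }
    }
    Ok(())
}

/// Resolve the effective role from the role names found in the token.
/// The admin name is checked first, so admin wins when both are present.
pub fn resolve_role(claimed: &[&str], admin_name: &str, device_name: &str) -> Option<Role> {
    if claimed.iter().any(|r| *r == admin_name) {
        Some(Role::Admin)
    } else if claimed.iter().any(|r| *r == device_name) {
        Some(Role::Device)
    } else {
        None
    }
}

/// Expand a per-device subject template, but only after validation.
pub fn device_subject(template: &str, device_id: &str) -> Result<String, String> {
    validate_device_id(device_id)?;
    Ok(template.replace("{device_id}", device_id))
}
```

With these definitions, `device_subject("device-commands.{device_id}", "sensor-a")` yields `device-commands.sensor-a`, while a device_id of `>` is refused before it can widen the permission to every subject under `device-commands.`. The names `fleet-admin` and `device` passed to `resolve_role` would be placeholders for whatever the configurable role names are set to.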

57 unit tests cover every reachable branch of the security decision
tree; 4 e2e tests in nats/integration-test-callout exercise real NATS
in podman with: device pubsub on own subjects, cross-device subject
isolation, admin-can-read-anything, and JWT-without-role rejection.

harmony/src/modules/nats_auth_callout/:
- New `NatsAuthCalloutScore` deploys the callout as a K8s Deployment +
  Secret. fsGroup + 0o440 secret mode so the non-root container can
  read its mounted seed/password without leaving them in env vars.
- `render_auth_callout_block` helper produces the YAML for NATS Helm
  `config.merge.authorization.auth_callout` so both halves stay in
  sync.
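
A rough sketch of what such a rendering helper could look like. The function name, its signature, and the choice to emit only `issuer` and `auth_users` are assumptions for illustration; the real `render_auth_callout_block` may take different inputs and emit more fields.

```rust
/// Render an `auth_callout` mapping as YAML, suitable for nesting under
/// `config.merge.authorization` in the NATS Helm values.
/// Hypothetical signature: the issuer is the callout's public NKey and
/// `auth_users` are the users that bypass the callout.
pub fn render_auth_callout_sketch(issuer_pubkey: &str, auth_users: &[&str]) -> String {
    let mut yaml = String::new();
    yaml.push_str("auth_callout:\n");
    yaml.push_str(&format!("  issuer: \"{issuer_pubkey}\"\n"));
    yaml.push_str("  auth_users:\n");
    for user in auth_users {
        yaml.push_str(&format!("    - \"{user}\"\n"));
    }
    yaml
}
```

Deriving the block from the same issuer key that the callout Deployment receives is what would keep the two halves in sync: a key rotation then changes both in one place.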

examples/fleet_auth_callout/:
- `bring_up_stack()` orchestrates k3d -> Zitadel + Postgres ->
  CoreDNS rewrite -> project + roles + machine users with JWT keys
  -> NATS Helm with auth_callout block -> callout image build +
  sideload -> NatsAuthCalloutScore deploy. Idempotent across re-runs
  (issuer NKey persisted in a K8s secret so user JWTs survive
  restarts).
- `mint_access_token()` RFC 7523 JWT-bearer client. Uses Host header
  with port so Zitadel emits a matching issuer.
- main.rs prints URLs/creds/keyIds and waits for Ctrl-C.
- Three #[tokio::test] functions sharing one cluster via OnceCell:
  admin_can_read_any_device_subject, device_can_only_access_own_subjects,
  unknown_role_is_rejected. All green on real k3d.
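
For reference, the RFC 7523 JWT-bearer exchange boils down to a form-encoded POST to the token endpoint. The sketch below only builds that request body; the function names and the `&[&str]` scope type are assumptions, and signing the assertion and sending the request are left out.

```rust
/// Grant type URN that RFC 7523 defines for JWT bearer assertions.
pub const JWT_BEARER_GRANT: &str = "urn:ietf:params:oauth:grant-type:jwt-bearer";

/// Percent-encode a value for an application/x-www-form-urlencoded body.
pub fn form_encode(value: &str) -> String {
    let mut out = String::with_capacity(value.len());
    for b in value.bytes() {
        match b {
            // Characters left as-is by form encoding.
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'*' => {
                out.push(b as char)
            }
            // Spaces become '+', everything else becomes %XX.
            b' ' => out.push('+'),
            _ => out.push_str(&format!("%{b:02X}")),
        }
    }
    out
}

/// Build the token request body from a signed JWT assertion and the
/// requested scopes (hypothetical helper, not the example's real API).
pub fn jwt_bearer_form(assertion: &str, scopes: &[&str]) -> String {
    format!(
        "grant_type={}&assertion={}&scope={}",
        form_encode(JWT_BEARER_GRANT),
        form_encode(assertion),
        form_encode(&scopes.join(" ")),
    )
}
```

The assertion itself is a JWT signed with the machine user's key; the token endpoint answers with the access token that the tests then pass to NATS.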
2026-05-03 15:01:44 -04:00


//! Real cargo tests proving the IoT fleet security model.
//!
//! All tests share a single bringup of the stack via [`OnceCell`]. The
//! cluster keeps running across the suite, with each test using the
//! cached machine keys to mint Zitadel JWTs and exercise NATS through
//! the auth callout. Three invariants:
//!
//! 1. `admin_can_read_any_device_subject` — fleet-admin sees other devices' state.
//! 2. `device_can_only_access_own_subjects` — sensor-a is denied access to sensor-b's commands.
//! 3. `unknown_role_is_rejected` — a Zitadel-authenticated user with no
//!    fleet role cannot connect to NATS.
//!
//! ## Why these tests are real-stack
//!
//! Mocking the OIDC issuer or NATS would only re-prove what the unit tests
//! already cover. The point of this suite is to confirm, in CI and in
//! cargo, that the **deployed** stack on k3d enforces the security
//! model end-to-end. Hidden cluster-level misconfiguration (an unset
//! `auth_callout` block, a wrong issuer pubkey, a CoreDNS rewrite drift,
//! a permissions YAML typo) only shows up here.

use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result};
use async_nats::ConnectOptions;
use example_fleet_auth_callout::{
    StackHandles, bring_up_stack, mint_access_token, scopes_for_project,
};
use futures_util::StreamExt;
use tokio::sync::OnceCell;

static STACK: OnceCell<Arc<StackHandles>> = OnceCell::const_new();

/// Bring the stack up on first use and return the cached handles afterwards.
async fn shared_stack() -> Result<Arc<StackHandles>> {
    let cell = STACK
        .get_or_try_init(|| async {
            let handles = bring_up_stack().await?;
            anyhow::Ok(Arc::new(handles))
        })
        .await?;
    Ok(cell.clone())
}

/// Mint a Zitadel access token for the given machine key, then connect to
/// NATS using that token so the auth callout decides the permissions.
async fn connect_with_role(stack: &StackHandles, key_json: &str) -> Result<async_nats::Client> {
    let token = mint_access_token(
        &stack.zitadel_url,
        key_json,
        &scopes_for_project(&stack.project_id),
    )
    .await
    .context("mint Zitadel access token")?;
    ConnectOptions::with_token(token)
        .connection_timeout(Duration::from_secs(5))
        .connect(&stack.nats_url_external)
        .await
        .map_err(|e| anyhow::anyhow!("NATS connect: {e}"))
}

#[tokio::test]
async fn admin_can_read_any_device_subject() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;
    let admin = connect_with_role(&stack, &stack.admin_machine_key).await?;
    let device = connect_with_role(&stack, &stack.device_a_machine_key).await?;

    // Flush so the wildcard subscription reaches the server before the
    // device publishes.
    let mut admin_sub = admin.subscribe("device-state.>").await?;
    admin.flush().await?;

    device
        .publish("device-state.sensor-a", "telemetry-payload".into())
        .await?;
    device.flush().await?;

    let msg = tokio::time::timeout(Duration::from_secs(5), admin_sub.next())
        .await
        .context("admin sub timeout")?
        .context("admin sub closed")?;
    assert_eq!(msg.payload.as_ref(), b"telemetry-payload");
    Ok(())
}

#[tokio::test]
async fn device_can_only_access_own_subjects() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;
    let device_a = connect_with_role(&stack, &stack.device_a_machine_key).await?;
    let device_b = connect_with_role(&stack, &stack.device_b_machine_key).await?;

    let _b_sub = device_b.subscribe("device-commands.sensor-b").await?;
    let mut a_wrong = device_a.subscribe("device-commands.sensor-b").await?;
    device_a.flush().await?;
    device_b.flush().await?;

    // B publishes on its own command subject so that a leak would be
    // observable. If NATS rejected A's subscription at SUB time, nothing
    // is delivered to A and the timeout below elapses.
    device_b
        .publish("device-commands.sensor-b", "should-not-leak".into())
        .await?;
    device_b.flush().await?;

    let result = tokio::time::timeout(Duration::from_millis(750), a_wrong.next()).await;
    assert!(
        result.is_err(),
        "device A must not observe device B's commands"
    );
    Ok(())
}

#[tokio::test]
async fn unknown_role_is_rejected() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;

    // The intruder has a valid Zitadel JWT but no fleet-admin/device role
    // grant. The callout must reject the connection; NATS surfaces that
    // as `authorization violation` at connect time.
    let result = connect_with_role(&stack, &stack.intruder_machine_key).await;
    assert!(
        result.is_err(),
        "JWT without fleet role must not be admitted to NATS"
    );
    Ok(())
}