Files
harmony/examples/fleet_auth_callout/tests/security_model.rs
Reda Tarzalt 1087717295
Some checks failed
Run Check Script / check (pull_request) Failing after 2m3s
ignore tests
2026-05-15 12:45:29 -04:00

135 lines
4.7 KiB
Rust

//! Real cargo tests proving the IoT fleet security model.
//!
//! All tests share a single bringup of the stack via [`OnceCell`]. The
//! cluster keeps running across the suite, with each test using the
//! cached machine keys to mint Zitadel JWTs and exercise NATS through
//! the auth callout. Three invariants:
//!
//! 1. `admin_can_read_any_device_subject` — fleet-admin sees other devices' state.
//! 2. `device_can_only_access_own_subjects` — sensor-a is denied access to sensor-b's commands.
//! 3. `unknown_role_is_rejected` — a Zitadel-authenticated user with no
//! fleet role cannot connect to NATS.
//!
//! ## Why these tests are real-stack
//!
//! Mocking the OIDC issuer or NATS would only re-prove what the unit tests
//! already cover. The point of this suite is to confirm — in CI, in
//! cargo — that the **deployed** stack on k3d enforces the security
//! model end-to-end. Hidden cluster-level misconfiguration (an unset
//! `auth_callout` block, a wrong issuer pubkey, a CoreDNS rewrite drift,
//! a permissions YAML typo) only shows up here.
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::ConnectOptions;
use example_fleet_auth_callout::{
StackHandles, bring_up_stack, mint_access_token, scopes_for_project,
};
use futures_util::StreamExt;
use tokio::sync::OnceCell;
/// Process-wide slot for the stack handles, filled by the first test that
/// calls [`shared_stack`].
static STACK: OnceCell<Arc<StackHandles>> = OnceCell::const_new();

/// Returns the shared stack handles, running `bring_up_stack` on first use.
///
/// Later callers receive a clone of the same [`Arc`], so the bringup is
/// not repeated once it has succeeded.
///
/// # Errors
///
/// Propagates any error returned by `bring_up_stack`.
async fn shared_stack() -> Result<Arc<StackHandles>> {
    let handles = STACK
        .get_or_try_init(|| async { anyhow::Ok(Arc::new(bring_up_stack().await?)) })
        .await?;
    // Cloning the Arc only hands out another reference to the cached handles.
    Ok(Arc::clone(handles))
}
/// Mints a Zitadel access token for the machine key in `key_json` and opens
/// a NATS connection that authenticates with that token.
///
/// The token is requested from `stack.zitadel_url` using the scopes derived
/// from `stack.project_id`. The connection targets `stack.nats_url_external`
/// with a 5-second connection timeout.
///
/// # Errors
///
/// Returns an error with context `mint Zitadel access token` when minting
/// fails, or an error whose message starts with `NATS connect: ` when the
/// connection attempt fails.
async fn connect_with_role(stack: &StackHandles, key_json: &str) -> Result<async_nats::Client> {
    let scopes = scopes_for_project(&stack.project_id);
    let token = mint_access_token(&stack.zitadel_url, key_json, &scopes)
        .await
        .context("mint Zitadel access token")?;

    let options = ConnectOptions::with_token(token).connection_timeout(Duration::from_secs(5));
    match options.connect(&stack.nats_url_external).await {
        Ok(client) => Ok(client),
        Err(e) => Err(anyhow::anyhow!("NATS connect: {e}")),
    }
}
/// Invariant 1: the fleet admin receives state published by a device.
///
/// The admin subscribes to `device-state.>`, device A publishes on
/// `device-state.sensor-a`, and the admin must receive that exact payload
/// within 5 seconds.
#[tokio::test]
#[ignore = "requires k3d + docker environment"]
async fn admin_can_read_any_device_subject() -> Result<()> {
    // The init result is deliberately discarded; every test in this file
    // makes the same call, so presumably only the first one can succeed.
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;

    let admin_client = connect_with_role(&stack, &stack.admin_machine_key).await?;
    let sensor_a = connect_with_role(&stack, &stack.device_a_machine_key).await?;

    // Flush the subscription before the device publishes.
    let mut inbox = admin_client.subscribe("device-state.>").await?;
    admin_client.flush().await?;

    sensor_a
        .publish("device-state.sensor-a", "telemetry-payload".into())
        .await?;
    sensor_a.flush().await?;

    // Outer failure: nothing arrived in time. Inner failure: the stream ended.
    let delivery = tokio::time::timeout(Duration::from_secs(5), inbox.next()).await;
    let msg = delivery
        .context("admin sub timeout")?
        .context("admin sub closed")?;
    assert_eq!(msg.payload.as_ref(), b"telemetry-payload");
    Ok(())
}
/// Invariant 2: a device cannot read another device's command subject.
///
/// Device A subscribes to `device-commands.sensor-b`, device B publishes on
/// that subject, and the test fails only if A actually receives a message
/// within 750 ms.
#[tokio::test]
#[ignore = "requires k3d + docker environment"]
async fn device_can_only_access_own_subjects() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;
    let device_a = connect_with_role(&stack, &stack.device_a_machine_key).await?;
    let device_b = connect_with_role(&stack, &stack.device_b_machine_key).await?;

    // B's own subscription is kept alive for the duration of the test but
    // is never read from.
    // NOTE(review): the negative check below is only meaningful if B's
    // publish is actually routed; consider asserting that `_b_sub` receives
    // it as a positive control once B's publish permission is confirmed.
    let _b_sub = device_b.subscribe("device-commands.sensor-b").await?;
    let mut a_wrong = device_a.subscribe("device-commands.sensor-b").await?;
    device_a.flush().await?;
    device_b.flush().await?;

    // The server is expected to refuse A's SUB, so this publish should
    // never reach `a_wrong`. It is sent anyway so that a missing permission
    // rule shows up as a delivered message.
    device_b
        .publish("device-commands.sensor-b", "should-not-leak".into())
        .await?;
    device_b.flush().await?;

    // Only a delivered message is a leak. A timeout (`Err`) and a closed
    // subscription stream (`Ok(None)`) both mean A observed nothing, so
    // neither should be reported as a violation.
    let outcome = tokio::time::timeout(Duration::from_millis(750), a_wrong.next()).await;
    if let Ok(Some(leaked)) = outcome {
        panic!(
            "device A must not observe device B's commands (leaked payload: {:?})",
            leaked.payload
        );
    }
    Ok(())
}
#[tokio::test]
#[ignore = "requires k3d + docker environment"]
async fn unknown_role_is_rejected() -> Result<()> {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
let stack = shared_stack().await?;
// The intruder has a valid Zitadel JWT but no fleet-admin/device role
// grant. The callout must reject the connection — NATS surfaces that
// as `authorization violation` at connect time.
let result = connect_with_role(&stack, &stack.intruder_machine_key).await;
assert!(
result.is_err(),
"JWT without fleet role must not be admitted to NATS"
);
Ok(())
}