harmony-nats-callout becomes a deployable service, not just a library:
- New [[bin]] target with env+secret-file driven config and
SIGINT/SIGTERM-aware shutdown.
- Dockerfile (single-stage archlinux:base, non-root, matches
harmony-fleet-operator convention).
- Refactored handler into a pure `decide()` function so the entire
authorization decision tree is unit-testable without async-nats.
- New `roles` module with role resolution + a `validate_device_id`
security gate that rejects NATS subject metacharacters in device_id
(`.`, `>`, `*`, and whitespace). This closes a real escalation path
through the `{device_id}` placeholder in the per-device permissions block.
- Configurable role claim path + admin/device role names; admin wins
when both are present (privilege-escalation invariant).
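
A minimal sketch of the device-id gate described above, assuming a plain `Result<&str, String>` signature and an extra empty-string check; neither is confirmed by this commit, and `expand_subject` with its template text is a hypothetical helper used only for illustration:

```rust
/// Hypothetical sketch of the gate; the real `validate_device_id` may use a
/// different signature and error type. The empty-id check is an assumption.
fn validate_device_id(device_id: &str) -> Result<&str, String> {
    if device_id.is_empty() {
        return Err("device_id must not be empty".to_string());
    }
    // Reject NATS subject metacharacters: the token separator `.`,
    // the wildcards `>` and `*`, and any whitespace.
    if let Some(bad) = device_id
        .chars()
        .find(|&c| matches!(c, '.' | '>' | '*') || c.is_whitespace())
    {
        return Err(format!("device_id contains forbidden character {bad:?}"));
    }
    Ok(device_id)
}

/// Hypothetical helper: substitute a validated id into a permission template
/// such as `device-commands.{device_id}` (template text is illustrative).
fn expand_subject(template: &str, device_id: &str) -> Result<String, String> {
    let id = validate_device_id(device_id)?;
    Ok(template.replace("{device_id}", id))
}
```

Without the gate, a device_id of `>` or `*` would expand the per-device template into a wildcard subject, which is the escalation path this change closes.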
57 unit tests cover every reachable branch of the security decision
tree; 4 e2e tests in nats/integration-test-callout exercise real NATS
in podman with: device pubsub on own subjects, cross-device subject
isolation, admin-can-read-anything, and JWT-without-role rejection.
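
The admin-wins invariant can be illustrated with a pure function that needs no NATS client. This is a sketch under assumptions: the `Decision` enum, the argument list, and the rejection strings are hypothetical, and the real `decide()` presumably also runs device-id validation.

```rust
/// Hypothetical outcome type; the real handler's types are not shown in this commit.
#[derive(Debug, PartialEq)]
enum Decision {
    Admin,
    Device { device_id: String },
    Reject(&'static str),
}

/// Pure decision sketch: no I/O, so every branch can be unit-tested directly.
/// `admin_role` and `device_role` stand in for the configurable role names.
fn decide(
    roles: &[&str],
    device_id: Option<&str>,
    admin_role: &str,
    device_role: &str,
) -> Decision {
    // Admin is checked first, so it wins when both roles are present.
    if roles.iter().any(|r| *r == admin_role) {
        return Decision::Admin;
    }
    if roles.iter().any(|r| *r == device_role) {
        return match device_id {
            Some(id) if !id.is_empty() => Decision::Device {
                device_id: id.to_string(),
            },
            _ => Decision::Reject("device role without device_id"),
        };
    }
    // A valid JWT that carries no fleet role is rejected.
    Decision::Reject("no recognised fleet role")
}
```

Because the function takes plain slices and strings, a test can cover the both-roles case, the device-only case, and the no-role case without starting async-nats.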
harmony/src/modules/nats_auth_callout/:
- New `NatsAuthCalloutScore` deploys the callout as a K8s Deployment +
Secret. fsGroup + 0o440 secret mode so the non-root container can
read its mounted seed/password without leaving them in env vars.
- `render_auth_callout_block` helper produces the YAML for NATS Helm
`config.merge.authorization.auth_callout` so both halves stay in
sync.
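
As a rough illustration of what such a helper might emit, the sketch below renders an `authorization.auth_callout` fragment with the `issuer` and `auth_users` fields from the NATS server configuration. The function name `render_block_sketch`, its parameters, and the exact YAML layout are assumptions, not the signature of `render_auth_callout_block`.

```rust
/// Hypothetical renderer: builds a YAML fragment for the Helm values.
/// Values are quoted but not escaped, so callers must pass plain
/// NKey public keys and user names.
fn render_block_sketch(issuer_pubkey: &str, auth_users: &[&str]) -> String {
    let mut out = String::from("authorization:\n  auth_callout:\n");
    out.push_str(&format!("    issuer: \"{issuer_pubkey}\"\n"));
    out.push_str("    auth_users:\n");
    for user in auth_users {
        out.push_str(&format!("      - \"{user}\"\n"));
    }
    out
}
```

Generating the block from the same inputs the Deployment uses is what keeps the NATS side and the callout side from drifting apart.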
examples/fleet_auth_callout/:
- `bring_up_stack()` orchestrates k3d -> Zitadel + Postgres ->
CoreDNS rewrite -> project + roles + machine users with JWT keys
-> NATS Helm with auth_callout block -> callout image build +
sideload -> NatsAuthCalloutScore deploy. Idempotent across re-runs
(issuer NKey persisted in a K8s secret so user JWTs survive
restarts).
- `mint_access_token()` is an RFC 7523 JWT-bearer client. It sends a Host
header that includes the port so Zitadel emits a matching issuer.
- main.rs prints URLs/creds/keyIds and waits for Ctrl-C.
- Three #[tokio::test] functions sharing one cluster via OnceCell:
admin_can_read_any_device_subject, device_can_only_access_own_subjects,
unknown_role_is_rejected. All green on real k3d.
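
For reference, the RFC 7523 JWT-bearer grant is a form-encoded POST with `grant_type`, `assertion`, and `scope`. The sketch below builds only that body using the standard library; `percent_encode` and `jwt_bearer_form` are hypothetical names, and signing the assertion and sending the request are left out.

```rust
/// Percent-encode everything except RFC 3986 unreserved characters.
fn percent_encode(input: &str) -> String {
    let mut out = String::with_capacity(input.len());
    for b in input.bytes() {
        match b {
            b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' => {
                out.push(b as char)
            }
            _ => out.push_str(&format!("%{b:02X}")),
        }
    }
    out
}

/// Hypothetical helper: build the token-request body for the JWT-bearer grant.
/// `assertion` is the already-signed JWT; scopes are space-separated per OAuth 2.0.
fn jwt_bearer_form(assertion: &str, scopes: &[&str]) -> String {
    format!(
        "grant_type={}&assertion={}&scope={}",
        percent_encode("urn:ietf:params:oauth:grant-type:jwt-bearer"),
        percent_encode(assertion),
        percent_encode(&scopes.join(" ")),
    )
}
```

The body would be sent with `Content-Type: application/x-www-form-urlencoded` to the token endpoint.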
132 lines
4.6 KiB
Rust
//! Real cargo tests proving the IoT fleet security model.
//!
//! All tests share a single bringup of the stack via [`OnceCell`]. The
//! cluster keeps running across the suite, with each test using the
//! cached machine keys to mint Zitadel JWTs and exercise NATS through
//! the auth callout. Three invariants:
//!
//! 1. `admin_can_read_any_device_subject` — fleet-admin sees other devices' state.
//! 2. `device_can_only_access_own_subjects` — sensor-a is denied access to sensor-b's commands.
//! 3. `unknown_role_is_rejected` — a Zitadel-authenticated user with no
//!    fleet role cannot connect to NATS.
//!
//! ## Why these tests are real-stack
//!
//! Mocking the OIDC issuer or NATS would only re-prove what the unit tests
//! already cover. The point of this suite is to confirm — in CI, in
//! cargo — that the **deployed** stack on k3d enforces the security
//! model end-to-end. Hidden cluster-level misconfiguration (an unset
//! `auth_callout` block, a wrong issuer pubkey, a CoreDNS rewrite drift,
//! a permissions YAML typo) only shows up here.

use std::sync::Arc;
use std::time::Duration;

use anyhow::{Context, Result};
use async_nats::ConnectOptions;
use example_fleet_auth_callout::{
    StackHandles, bring_up_stack, mint_access_token, scopes_for_project,
};
use futures_util::StreamExt;
use tokio::sync::OnceCell;

static STACK: OnceCell<Arc<StackHandles>> = OnceCell::const_new();

async fn shared_stack() -> Result<Arc<StackHandles>> {
    let cell = STACK
        .get_or_try_init(|| async {
            let handles = bring_up_stack().await?;
            anyhow::Ok(Arc::new(handles))
        })
        .await?;
    Ok(cell.clone())
}

async fn connect_with_role(stack: &StackHandles, key_json: &str) -> Result<async_nats::Client> {
    let token = mint_access_token(
        &stack.zitadel_url,
        key_json,
        &scopes_for_project(&stack.project_id),
    )
    .await
    .context("mint Zitadel access token")?;

    ConnectOptions::with_token(token)
        .connection_timeout(Duration::from_secs(5))
        .connect(&stack.nats_url_external)
        .await
        .map_err(|e| anyhow::anyhow!("NATS connect: {e}"))
}

#[tokio::test]
async fn admin_can_read_any_device_subject() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;

    let admin = connect_with_role(&stack, &stack.admin_machine_key).await?;
    let device = connect_with_role(&stack, &stack.device_a_machine_key).await?;

    let mut admin_sub = admin.subscribe("device-state.>").await?;
    admin.flush().await?;

    device
        .publish("device-state.sensor-a", "telemetry-payload".into())
        .await?;
    device.flush().await?;

    let msg = tokio::time::timeout(Duration::from_secs(5), admin_sub.next())
        .await
        .context("admin sub timeout")?
        .context("admin sub closed")?;
    assert_eq!(msg.payload.as_ref(), b"telemetry-payload");

    Ok(())
}

#[tokio::test]
async fn device_can_only_access_own_subjects() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;

    let device_a = connect_with_role(&stack, &stack.device_a_machine_key).await?;
    let device_b = connect_with_role(&stack, &stack.device_b_machine_key).await?;

    let _b_sub = device_b.subscribe("device-commands.sensor-b").await?;
    let mut a_wrong = device_a.subscribe("device-commands.sensor-b").await?;
    device_a.flush().await?;
    device_b.flush().await?;

    // We only care that A's subscription does NOT receive B's traffic;
    // pushing through B-side traffic would be a no-op since A's
    // subscription was rejected by NATS at SUB time.
    device_b
        .publish("device-commands.sensor-b", "should-not-leak".into())
        .await?;
    device_b.flush().await?;

    let result = tokio::time::timeout(Duration::from_millis(750), a_wrong.next()).await;
    assert!(
        result.is_err(),
        "device A must not observe device B's commands"
    );

    Ok(())
}

#[tokio::test]
async fn unknown_role_is_rejected() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;

    // The intruder has a valid Zitadel JWT but no fleet-admin/device role
    // grant. The callout must reject the connection — NATS surfaces that
    // as `authorization violation` at connect time.
    let result = connect_with_role(&stack, &stack.intruder_machine_key).await;
    assert!(
        result.is_err(),
        "JWT without fleet role must not be admitted to NATS"
    );

    Ok(())
}