Files
harmony/nats/integration-test-callout/tests/callout_e2e.rs
Jean-Gabriel Gill-Couture 54308fd7a4
Some checks failed
Run Check Script / check (pull_request) Failing after -44h56m9s
chore: formatting
2026-05-04 09:03:35 -04:00

275 lines
9.4 KiB
Rust

use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::ConnectOptions;
use futures_util::StreamExt;
use tracing::{info, warn};
use harmony_nats_callout::{AuthCalloutConfig, AuthCalloutService};
use integration_test_callout::{
ADMIN_ROLE, CalloutContext, NATS_PORT_TEST_ADMIN, NATS_PORT_TEST_ISOLATION,
NATS_PORT_TEST_NO_ROLE, NATS_PORT_TEST_PUBSUB, NatsServer,
};
/// Spawn the auth callout service against `ctx` and `nats`. Returns once the
/// subscription is active and the service is ready to handle requests.
async fn start_callout(ctx: &CalloutContext, nats_url: String) -> Result<()> {
let config = AuthCalloutConfig::builder()
.nats_url(nats_url)
.auth_user("auth")
.auth_pass("auth")
.issuer_kp(ctx.issuer_kp.clone())
.oidc_issuer_url(ctx.oidc.issuer_url())
.oidc_audience("harmony-iot-devices")
.device_id_claim("device_id")
.danger_accept_invalid_certs(true)
.build()?;
let service = AuthCalloutService::new(config);
tokio::spawn(async move {
if let Err(e) = service.run().await {
warn!(error = %e, "callout service exited with error");
}
});
tokio::time::sleep(Duration::from_millis(500)).await;
Ok(())
}
#[tokio::test]
async fn device_authenticates_and_pubsub() -> Result<()> {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
info!("generating callout context");
let ctx = CalloutContext::generate(NATS_PORT_TEST_PUBSUB).await?;
info!(issuer_pubkey = %ctx.issuer_kp.public_key(), oidc_url = %ctx.oidc.issuer_url(), "callout context ready");
info!("starting NATS server in podman");
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_PUBSUB).await?;
info!(url = %nats.url(), "NATS server ready");
info!("starting auth callout service");
start_callout(&ctx, nats.url()).await?;
let device_id = "sensor-test-01";
let zitadel_jwt = ctx.oidc.issue_jwt(device_id)?;
info!(device_id = device_id, "issued Zitadel JWT for device");
let nats_url = nats.url();
info!("connecting device client with Zitadel JWT");
let device_client = ConnectOptions::with_token(zitadel_jwt.clone())
.connection_timeout(Duration::from_secs(5))
.connect(&nats_url)
.await;
let device_client = match device_client {
Ok(c) => {
info!("device connected on first attempt");
c
}
Err(first_err) => {
warn!(error = %first_err, "first connection failed, retrying");
tokio::time::sleep(Duration::from_millis(500)).await;
let zitadel_jwt2 = ctx.oidc.issue_jwt(device_id)?;
ConnectOptions::with_token(zitadel_jwt2)
.connection_timeout(Duration::from_secs(5))
.connect(&nats_url)
.await
.map_err(|e| {
anyhow::anyhow!("device connection failed on retry: {e} (first: {first_err})")
})?
}
};
let pub_subject = format!("device-state.{device_id}");
let sub_subject = format!("device-commands.{device_id}");
let platform_nc = async_nats::connect_with_options(
&nats_url,
ConnectOptions::new().user_and_password("platform".to_string(), "platform".to_string()),
)
.await
.map_err(|e| anyhow::anyhow!("platform connection failed: {e}"))?;
let mut platform_sub = platform_nc.subscribe(pub_subject.clone()).await?;
platform_nc.flush().await?;
info!(subject = %sub_subject, "subscribing device to commands");
let mut sub = device_client.subscribe(sub_subject.clone()).await?;
info!(subject = %pub_subject, "publishing state from device");
device_client
.publish(pub_subject.clone(), "hello from device".into())
.await?;
device_client.flush().await?;
info!("waiting for device state message on platform side");
let state_msg = tokio::time::timeout(Duration::from_secs(5), platform_sub.next())
.await
.context("timeout waiting for device state")?
.context("subscription closed")?;
assert_eq!(
state_msg.payload.as_ref(),
b"hello from device",
"device state payload mismatch"
);
info!("device state received by platform");
info!("sending command from platform to device");
platform_nc
.publish(sub_subject.clone(), "command: reboot".into())
.await?;
platform_nc.flush().await?;
info!("waiting for command on device side");
let cmd_msg = tokio::time::timeout(Duration::from_secs(5), sub.next())
.await
.context("timeout waiting for command")?
.context("subscription closed")?;
assert_eq!(
cmd_msg.payload.as_ref(),
b"command: reboot",
"command payload mismatch"
);
info!("command received by device");
nats.stop().await?;
info!("test passed — device authenticated and pub/sub verified end-to-end");
Ok(())
}
#[tokio::test]
async fn device_cannot_access_other_device_subjects() -> Result<()> {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
let ctx = CalloutContext::generate(NATS_PORT_TEST_ISOLATION).await?;
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_ISOLATION).await?;
start_callout(&ctx, nats.url()).await?;
let device_a_jwt = ctx.oidc.issue_jwt("sensor-a")?;
let device_b_jwt = ctx.oidc.issue_jwt("sensor-b")?;
let nats_url = format!("nats://127.0.0.1:{NATS_PORT_TEST_ISOLATION}");
let device_a = ConnectOptions::with_token(device_a_jwt)
.connection_timeout(Duration::from_secs(5))
.connect(&nats_url)
.await
.map_err(|e| anyhow::anyhow!("device A connection failed: {e}"))?;
let device_b = ConnectOptions::with_token(device_b_jwt)
.connection_timeout(Duration::from_secs(5))
.connect(&nats_url)
.await
.map_err(|e| anyhow::anyhow!("device B connection failed: {e}"))?;
let _sub_b_commands = device_b.subscribe("device-commands.sensor-b").await?;
let mut sub_a_wrong = device_a.subscribe("device-commands.sensor-b").await?;
device_a.flush().await?;
device_b.flush().await?;
device_a
.publish("device-state.sensor-a", "hello from A".into())
.await?;
device_a.flush().await?;
let result = tokio::time::timeout(Duration::from_millis(500), sub_a_wrong.next()).await;
assert!(
result.is_err(),
"device A should NOT receive device B's commands"
);
nats.stop().await?;
Ok(())
}
#[tokio::test]
async fn admin_role_can_read_any_device_subject() -> Result<()> {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
let ctx = CalloutContext::generate(NATS_PORT_TEST_ADMIN).await?;
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_ADMIN).await?;
start_callout(&ctx, nats.url()).await?;
// Admin JWT — name carries no device meaning, but the role grants
// unrestricted access; device JWT scopes the publishing client.
let admin_jwt = ctx
.oidc
.issue_jwt_with_roles("ops-station", &[ADMIN_ROLE])?;
let device_jwt = ctx.oidc.issue_jwt("sensor-7")?;
let nats_url = format!("nats://127.0.0.1:{NATS_PORT_TEST_ADMIN}");
let admin = ConnectOptions::with_token(admin_jwt)
.connection_timeout(Duration::from_secs(5))
.connect(&nats_url)
.await
.map_err(|e| anyhow::anyhow!("admin connection failed: {e}"))?;
let device = ConnectOptions::with_token(device_jwt)
.connection_timeout(Duration::from_secs(5))
.connect(&nats_url)
.await
.map_err(|e| anyhow::anyhow!("device connection failed: {e}"))?;
// Admin subscribes to a wildcard the device role would never be allowed
// to subscribe to. If the admin permissions block didn't grant `>`, NATS
// would reject this subscription before any message ever reached us.
let mut admin_sub = admin.subscribe("device-state.>").await?;
admin.flush().await?;
device
.publish("device-state.sensor-7", "hello from sensor-7".into())
.await?;
device.flush().await?;
let msg = tokio::time::timeout(Duration::from_secs(5), admin_sub.next())
.await
.context("timeout waiting for device state on admin subscription")?
.context("admin subscription closed")?;
assert_eq!(
msg.payload.as_ref(),
b"hello from sensor-7",
"admin should observe device state"
);
nats.stop().await?;
Ok(())
}
#[tokio::test]
async fn jwt_with_no_authorized_role_is_rejected() -> Result<()> {
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
let ctx = CalloutContext::generate(NATS_PORT_TEST_NO_ROLE).await?;
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_NO_ROLE).await?;
start_callout(&ctx, nats.url()).await?;
// JWT signed by the trusted issuer with the right audience but carrying
// no role mapped to either admin or device permissions.
let unprivileged_jwt = ctx
.oidc
.issue_jwt_with_roles("intruder-1", &["some-other-role"])?;
let nats_url = format!("nats://127.0.0.1:{NATS_PORT_TEST_NO_ROLE}");
let connect = ConnectOptions::with_token(unprivileged_jwt)
.connection_timeout(Duration::from_secs(3))
.connect(&nats_url)
.await;
assert!(
connect.is_err(),
"JWT without an authorized role must not be admitted"
);
nats.stop().await?;
Ok(())
}