All checks were successful
Run Check Script / check (pull_request) Successful in 2m0s
279 lines
9.7 KiB
Rust
279 lines
9.7 KiB
Rust
use std::time::Duration;
|
|
|
|
use anyhow::{Context, Result};
|
|
use async_nats::ConnectOptions;
|
|
use futures_util::StreamExt;
|
|
use tracing::{info, warn};
|
|
|
|
use harmony_nats_callout::{AuthCalloutConfig, AuthCalloutService};
|
|
use integration_test_callout::{
|
|
ADMIN_ROLE, CalloutContext, NATS_PORT_TEST_ADMIN, NATS_PORT_TEST_ISOLATION,
|
|
NATS_PORT_TEST_NO_ROLE, NATS_PORT_TEST_PUBSUB, NatsServer,
|
|
};
|
|
|
|
/// Spawn the auth callout service against `ctx` and `nats`. Returns once the
|
|
/// subscription is active and the service is ready to handle requests.
|
|
async fn start_callout(ctx: &CalloutContext, nats_url: String) -> Result<()> {
|
|
let config = AuthCalloutConfig::builder()
|
|
.nats_url(nats_url)
|
|
.auth_user("auth")
|
|
.auth_pass("auth")
|
|
.issuer_kp(ctx.issuer_kp.clone())
|
|
.oidc_issuer_url(ctx.oidc.issuer_url())
|
|
.oidc_audience("harmony-iot-devices")
|
|
.device_id_claim("device_id")
|
|
.danger_accept_invalid_certs(true)
|
|
.build()?;
|
|
|
|
let service = AuthCalloutService::new(config);
|
|
tokio::spawn(async move {
|
|
if let Err(e) = service.run().await {
|
|
warn!(error = %e, "callout service exited with error");
|
|
}
|
|
});
|
|
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore = "requires podman + NATS image pull; runs against a real callout server, flaky in CI"]
|
|
async fn device_authenticates_and_pubsub() -> Result<()> {
|
|
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
|
|
|
|
info!("generating callout context");
|
|
let ctx = CalloutContext::generate(NATS_PORT_TEST_PUBSUB).await?;
|
|
info!(issuer_pubkey = %ctx.issuer_kp.public_key(), oidc_url = %ctx.oidc.issuer_url(), "callout context ready");
|
|
|
|
info!("starting NATS server in podman");
|
|
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_PUBSUB).await?;
|
|
info!(url = %nats.url(), "NATS server ready");
|
|
|
|
info!("starting auth callout service");
|
|
start_callout(&ctx, nats.url()).await?;
|
|
|
|
let device_id = "sensor-test-01";
|
|
let zitadel_jwt = ctx.oidc.issue_jwt(device_id)?;
|
|
info!(device_id = device_id, "issued Zitadel JWT for device");
|
|
|
|
let nats_url = nats.url();
|
|
|
|
info!("connecting device client with Zitadel JWT");
|
|
let device_client = ConnectOptions::with_token(zitadel_jwt.clone())
|
|
.connection_timeout(Duration::from_secs(5))
|
|
.connect(&nats_url)
|
|
.await;
|
|
|
|
let device_client = match device_client {
|
|
Ok(c) => {
|
|
info!("device connected on first attempt");
|
|
c
|
|
}
|
|
Err(first_err) => {
|
|
warn!(error = %first_err, "first connection failed, retrying");
|
|
tokio::time::sleep(Duration::from_millis(500)).await;
|
|
|
|
let zitadel_jwt2 = ctx.oidc.issue_jwt(device_id)?;
|
|
ConnectOptions::with_token(zitadel_jwt2)
|
|
.connection_timeout(Duration::from_secs(5))
|
|
.connect(&nats_url)
|
|
.await
|
|
.map_err(|e| {
|
|
anyhow::anyhow!("device connection failed on retry: {e} (first: {first_err})")
|
|
})?
|
|
}
|
|
};
|
|
|
|
let pub_subject = format!("device-state.{device_id}");
|
|
let sub_subject = format!("device-commands.{device_id}");
|
|
|
|
let platform_nc = async_nats::connect_with_options(
|
|
&nats_url,
|
|
ConnectOptions::new().user_and_password("platform".to_string(), "platform".to_string()),
|
|
)
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("platform connection failed: {e}"))?;
|
|
|
|
let mut platform_sub = platform_nc.subscribe(pub_subject.clone()).await?;
|
|
platform_nc.flush().await?;
|
|
|
|
info!(subject = %sub_subject, "subscribing device to commands");
|
|
let mut sub = device_client.subscribe(sub_subject.clone()).await?;
|
|
|
|
info!(subject = %pub_subject, "publishing state from device");
|
|
device_client
|
|
.publish(pub_subject.clone(), "hello from device".into())
|
|
.await?;
|
|
device_client.flush().await?;
|
|
|
|
info!("waiting for device state message on platform side");
|
|
let state_msg = tokio::time::timeout(Duration::from_secs(5), platform_sub.next())
|
|
.await
|
|
.context("timeout waiting for device state")?
|
|
.context("subscription closed")?;
|
|
|
|
assert_eq!(
|
|
state_msg.payload.as_ref(),
|
|
b"hello from device",
|
|
"device state payload mismatch"
|
|
);
|
|
info!("device state received by platform");
|
|
|
|
info!("sending command from platform to device");
|
|
platform_nc
|
|
.publish(sub_subject.clone(), "command: reboot".into())
|
|
.await?;
|
|
platform_nc.flush().await?;
|
|
|
|
info!("waiting for command on device side");
|
|
let cmd_msg = tokio::time::timeout(Duration::from_secs(5), sub.next())
|
|
.await
|
|
.context("timeout waiting for command")?
|
|
.context("subscription closed")?;
|
|
|
|
assert_eq!(
|
|
cmd_msg.payload.as_ref(),
|
|
b"command: reboot",
|
|
"command payload mismatch"
|
|
);
|
|
info!("command received by device");
|
|
|
|
nats.stop().await?;
|
|
info!("test passed — device authenticated and pub/sub verified end-to-end");
|
|
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore = "requires podman + NATS image pull; runs against a real callout server, flaky in CI"]
|
|
async fn device_cannot_access_other_device_subjects() -> Result<()> {
|
|
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
|
|
|
|
let ctx = CalloutContext::generate(NATS_PORT_TEST_ISOLATION).await?;
|
|
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_ISOLATION).await?;
|
|
start_callout(&ctx, nats.url()).await?;
|
|
|
|
let device_a_jwt = ctx.oidc.issue_jwt("sensor-a")?;
|
|
let device_b_jwt = ctx.oidc.issue_jwt("sensor-b")?;
|
|
|
|
let nats_url = format!("nats://127.0.0.1:{NATS_PORT_TEST_ISOLATION}");
|
|
|
|
let device_a = ConnectOptions::with_token(device_a_jwt)
|
|
.connection_timeout(Duration::from_secs(5))
|
|
.connect(&nats_url)
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("device A connection failed: {e}"))?;
|
|
|
|
let device_b = ConnectOptions::with_token(device_b_jwt)
|
|
.connection_timeout(Duration::from_secs(5))
|
|
.connect(&nats_url)
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("device B connection failed: {e}"))?;
|
|
|
|
let _sub_b_commands = device_b.subscribe("device-commands.sensor-b").await?;
|
|
let mut sub_a_wrong = device_a.subscribe("device-commands.sensor-b").await?;
|
|
device_a.flush().await?;
|
|
device_b.flush().await?;
|
|
|
|
device_a
|
|
.publish("device-state.sensor-a", "hello from A".into())
|
|
.await?;
|
|
device_a.flush().await?;
|
|
|
|
let result = tokio::time::timeout(Duration::from_millis(500), sub_a_wrong.next()).await;
|
|
assert!(
|
|
result.is_err(),
|
|
"device A should NOT receive device B's commands"
|
|
);
|
|
|
|
nats.stop().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore = "requires podman + NATS image pull; runs against a real callout server, flaky in CI"]
|
|
async fn admin_role_can_read_any_device_subject() -> Result<()> {
|
|
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
|
|
|
|
let ctx = CalloutContext::generate(NATS_PORT_TEST_ADMIN).await?;
|
|
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_ADMIN).await?;
|
|
start_callout(&ctx, nats.url()).await?;
|
|
|
|
// Admin JWT — name carries no device meaning, but the role grants
|
|
// unrestricted access; device JWT scopes the publishing client.
|
|
let admin_jwt = ctx
|
|
.oidc
|
|
.issue_jwt_with_roles("ops-station", &[ADMIN_ROLE])?;
|
|
let device_jwt = ctx.oidc.issue_jwt("sensor-7")?;
|
|
|
|
let nats_url = format!("nats://127.0.0.1:{NATS_PORT_TEST_ADMIN}");
|
|
|
|
let admin = ConnectOptions::with_token(admin_jwt)
|
|
.connection_timeout(Duration::from_secs(5))
|
|
.connect(&nats_url)
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("admin connection failed: {e}"))?;
|
|
|
|
let device = ConnectOptions::with_token(device_jwt)
|
|
.connection_timeout(Duration::from_secs(5))
|
|
.connect(&nats_url)
|
|
.await
|
|
.map_err(|e| anyhow::anyhow!("device connection failed: {e}"))?;
|
|
|
|
// Admin subscribes to a wildcard the device role would never be allowed
|
|
// to subscribe to. If the admin permissions block didn't grant `>`, NATS
|
|
// would reject this subscription before any message ever reached us.
|
|
let mut admin_sub = admin.subscribe("device-state.>").await?;
|
|
admin.flush().await?;
|
|
|
|
device
|
|
.publish("device-state.sensor-7", "hello from sensor-7".into())
|
|
.await?;
|
|
device.flush().await?;
|
|
|
|
let msg = tokio::time::timeout(Duration::from_secs(5), admin_sub.next())
|
|
.await
|
|
.context("timeout waiting for device state on admin subscription")?
|
|
.context("admin subscription closed")?;
|
|
|
|
assert_eq!(
|
|
msg.payload.as_ref(),
|
|
b"hello from sensor-7",
|
|
"admin should observe device state"
|
|
);
|
|
|
|
nats.stop().await?;
|
|
Ok(())
|
|
}
|
|
|
|
#[tokio::test]
|
|
#[ignore = "requires podman + NATS image pull; runs against a real callout server, flaky in CI"]
|
|
async fn jwt_with_no_authorized_role_is_rejected() -> Result<()> {
|
|
let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
|
|
|
|
let ctx = CalloutContext::generate(NATS_PORT_TEST_NO_ROLE).await?;
|
|
let nats = NatsServer::start(&ctx.tmpdir, NATS_PORT_TEST_NO_ROLE).await?;
|
|
start_callout(&ctx, nats.url()).await?;
|
|
|
|
// JWT signed by the trusted issuer with the right audience but carrying
|
|
// no role mapped to either admin or device permissions.
|
|
let unprivileged_jwt = ctx
|
|
.oidc
|
|
.issue_jwt_with_roles("intruder-1", &["some-other-role"])?;
|
|
|
|
let nats_url = format!("nats://127.0.0.1:{NATS_PORT_TEST_NO_ROLE}");
|
|
|
|
let connect = ConnectOptions::with_token(unprivileged_jwt)
|
|
.connection_timeout(Duration::from_secs(3))
|
|
.connect(&nats_url)
|
|
.await;
|
|
|
|
assert!(
|
|
connect.is_err(),
|
|
"JWT without an authorized role must not be admitted"
|
|
);
|
|
|
|
nats.stop().await?;
|
|
Ok(())
|
|
}
|