Files
harmony/examples/fleet_e2e_demo/tests/e2e_walking_skeleton.rs
Jean-Gabriel Gill-Couture 54308fd7a4
Some checks failed
Run Check Script / check (pull_request) Failing after -44h56m9s
chore: formatting
2026-05-04 09:03:35 -04:00

160 lines
6.2 KiB
Rust

//! End-to-end walking-skeleton tests for the VM-based demo rehearsal.
//!
//! Shares one bring-up across the whole suite via `OnceCell`. Run the
//! tests sequentially (`--test-threads=1`): they share k3d and libvirt VM
//! state.
//!
//! Pre-flight (manual, before `cargo test`):
//!
//! - libvirt + qemu installed; default network active.
//! - Two cloud-init Ubuntu VMs provisioned (e.g. via
//! `cargo run -p example_fleet_vm_setup`), with their IPs exported as
//! `FLEET_E2E_VM_0_IP` and `FLEET_E2E_VM_1_IP`.
//! - An SSH keypair trusted by the VMs at `~/.ssh/id_ed25519` (the
//! harness reads this standard pair unless the path is overridden).
//!
//! Run:
//!
//! ```bash
//! FLEET_E2E_VM_0_IP=192.168.122.42 \
//! FLEET_E2E_VM_1_IP=192.168.122.43 \
//! cargo test -p example-fleet-e2e-demo --test e2e_walking_skeleton \
//! -- --test-threads=1 --nocapture
//! ```
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use async_nats::ConnectOptions;
use example_fleet_auth_callout::{mint_access_token, scopes_for_project};
use example_fleet_e2e_demo::{E2eDemoOpts, E2eHandles, bring_up_full_stack};
use futures_util::StreamExt;
use tokio::sync::OnceCell;
static STACK: OnceCell<Arc<E2eHandles>> = OnceCell::const_new();
async fn shared_stack() -> Result<Arc<E2eHandles>> {
let cell = STACK
.get_or_try_init(|| async {
let h = bring_up_full_stack(E2eDemoOpts::default()).await?;
anyhow::Ok(Arc::new(h))
})
.await?;
Ok(cell.clone())
}
/// Opens an admin-authenticated NATS connection to the stack's external URL.
///
/// Mints a Zitadel access token for the admin machine key, scoped to the
/// stack's project via `scopes_for_project`. That token is presented as the
/// NATS connect token, with a 5-second connection timeout.
///
/// # Errors
/// Fails with context "mint admin Zitadel token" if token minting fails.
/// Fails with an "admin connect: …" error if the NATS connect fails.
async fn admin_nats_client(stack: &E2eHandles) -> Result<async_nats::Client> {
    let project_scopes = scopes_for_project(&stack.project_id);
    let admin_token = mint_access_token(
        &stack.zitadel_url,
        &stack.admin_machine_key,
        &project_scopes,
    )
    .await
    .context("mint admin Zitadel token")?;

    let options =
        ConnectOptions::with_token(admin_token).connection_timeout(Duration::from_secs(5));
    options
        .connect(&stack.nats_url_external)
        .await
        .map_err(|err| anyhow::anyhow!("admin connect: {err}"))
}
// -- Test 1 -------------------------------------------------------------
/// Each provisioned VM publishes a DeviceInfo within the heartbeat
/// window. Reads from the `device-info` KV bucket via the admin
/// client (admin role can subscribe to anything).
///
/// Polls every 500 ms for up to 60 s and stops as soon as every device has
/// been seen. Each pass only re-queries devices that have not been seen yet.
/// A device counts as seen once `Store::get` returns a value for its key.
/// `Store::get` yields `None` when the key's latest revision is a
/// delete/purge marker, so a tombstone is not mistaken for a published
/// DeviceInfo. `Store::entry` would return that tombstone.
///
/// NOTE(review): a live value left over from an earlier run would still
/// count. This relies on `bring_up_full_stack` starting from a fresh
/// bucket; confirm.
#[tokio::test]
async fn both_devices_heartbeat_within_60s() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;
    let admin = admin_nats_client(&stack).await?;
    let js = async_nats::jetstream::new(admin);
    let bucket = js
        .get_key_value(harmony_reconciler_contracts::BUCKET_DEVICE_INFO)
        .await
        .context("device-info bucket")?;

    let expected: std::collections::HashSet<String> =
        stack.devices.iter().map(|d| d.device_id.clone()).collect();
    // With no devices, `seen == expected` holds immediately and the test
    // would pass without checking anything.
    assert!(
        !expected.is_empty(),
        "bring-up reported no provisioned devices; nothing to heartbeat"
    );

    let deadline = std::time::Instant::now() + Duration::from_secs(60);
    let mut seen: std::collections::HashSet<String> = std::collections::HashSet::new();
    loop {
        for d in &stack.devices {
            if seen.contains(&d.device_id) {
                continue;
            }
            let key = harmony_reconciler_contracts::device_info_key(&d.device_id);
            if bucket.get(&key).await?.is_some() {
                seen.insert(d.device_id.clone());
            }
        }
        // Check before sleeping, so the test returns as soon as the last
        // device shows up.
        if seen == expected || std::time::Instant::now() >= deadline {
            break;
        }
        tokio::time::sleep(Duration::from_millis(500)).await;
    }

    let missing: Vec<&String> = expected.difference(&seen).collect();
    assert_eq!(
        seen, expected,
        "each provisioned device must publish DeviceInfo within 60s; missing {missing:?}"
    );
    Ok(())
}
// -- Test 5 (admin cross-device read) -----------------------------------
/// The admin's Zitadel JWT carries `fleet-admin` role. Callout maps
/// that to `pub/sub allow: [">"]`, so subscribing to `device-state.>`
/// is admitted and can observe any device's traffic.
///
/// The assertion is deliberately weak. It requires at least one
/// `device-state.*` message within 30 s, not one from every device.
#[tokio::test]
async fn admin_jwt_reads_any_device_subject() -> Result<()> {
    let _ = tracing_subscriber::fmt().with_env_filter("info").try_init();
    let stack = shared_stack().await?;
    let admin = admin_nats_client(&stack).await?;
    let mut sub = admin.subscribe("device-state.>").await?;
    // Push the SUB out to the server before starting the wait.
    admin.flush().await?;
    // Hold the subscription open long enough that any device's
    // periodic state publication should land. We don't pump traffic
    // ourselves; the agents publish per-deployment state on every
    // reconcile tick. If no traffic arrives in 30s, either the agents
    // aren't connected or they aren't publishing. Both are fatal for
    // the demo.
    //
    // NOTE(review): Test 7's notes say that without the operator the
    // agents have no `desired-state` to publish status against. If so,
    // this wait may time out. Confirm what the agents publish on
    // `device-state.*` before the operator is installed.
    match tokio::time::timeout(Duration::from_secs(30), sub.next()).await {
        Ok(Some(_msg)) => Ok(()),
        Ok(None) => panic!("device-state.> subscription ended before any message arrived"),
        Err(_elapsed) => {
            panic!("admin must observe at least one device-state.* message in 30s")
        }
    }
}
// -- Test 6 (per-device isolation) ---------------------------------------
/// A per-device JWT has subject permissions scoped to its own
/// `device-state.{device_id}` and `device-commands.{device_id}`. The
/// callout enforces this; subscribing to a sibling device's commands
/// must fail at NATS connect-time or at SUB-time.
///
/// Skipped for now. The per-device JWT minting helper (analogous to
/// `mint_access_token` but for a `device` role user) needs the
/// per-device machine key to be plumbed back from `bring_up_full_stack`
/// through `E2eHandles`. A follow-up commit adds
/// `E2eHandles::device_machine_key(idx)`, so this test can be
/// implemented without re-running `ZitadelSetupScore` from the test
/// body.
///
/// The body is `todo!` rather than empty. Otherwise
/// `cargo test -- --ignored` would report this unimplemented scenario
/// as a passing test.
#[tokio::test]
#[ignore = "requires E2eHandles::device_machine_key plumbing"]
async fn cross_device_isolation_enforced_in_vm() {
    todo!("cross-device isolation check needs E2eHandles::device_machine_key")
}
// -- Test 7 (load-bearing reconnect) -------------------------------------
/// Kill the NATS pod, wait for the new one to come up, verify both
/// agents reconnect with fresh JWTs and resume publishing within
/// 30 seconds. This is the test that validates the "never lose
/// connectivity to a device" guarantee under realistic disturbance.
///
/// Skipped pending operator install in the harness. Without the
/// operator the agents have no `desired-state` to publish status
/// against, so verifying "publishing resumed" needs a separate
/// signal. A follow-up commit observes the agents' periodic
/// heartbeat publication directly via the device-heartbeat KV.
/// The `#[ignore]` reason names the two missing pieces: a driver that
/// restarts the NATS pod, and a heartbeat-presence assertion.
///
/// The body is `todo!` rather than empty. Otherwise
/// `cargo test -- --ignored` would report this unimplemented scenario
/// as a passing test.
#[tokio::test]
#[ignore = "requires NATS-pod-restart driver and heartbeat-presence assertion"]
async fn agent_recovers_from_nats_pod_restart() {
    todo!("NATS pod restart driver and heartbeat-presence assertion not implemented yet")
}