Two bugs surfaced when the agent went live against NATS JetStream KV
in the VM-based e2e rehearsal:

1. The default `device` role only allowed the flat `device-state.<id>` /
   `device-commands.<id>` subjects. The agent's actual data plane is
   JetStream KV, which puts data operations on `$KV.<bucket>.<key>`
   subjects and control-plane traffic on `$JS.API.>` and `$JS.ACK.>`.
   With the old role config, the very first KV publish died with
   `Permissions Violation for Publish to "$JS.API.INFO"`.

   The role now allows `$JS.API.>` and `$JS.ACK.>`, plus the four
   per-device data subjects derived from
   `harmony_reconciler_contracts::kv` (`info.<id>`, `state.<id>.<dep>`,
   `heartbeat.<id>`, `desired-state.<id>.<dep>`). The legacy direct
   `device-state.<id>` / `device-commands.<id>` subjects are kept so
   that non-JetStream callers of `NatsAuthCalloutScore` still work.

   A new unit test (`device_role_covers_reconciler_contract_kv_subjects`)
   imports the contract crate as a dev-dependency and asserts that every
   contract-produced subject is matched and that cross-device subjects
   are *not* matched. This locks the role config to the contract
   surface, so future renames break the test before they break prod.

2. Zitadel's `client_id` claim for a machine user equals its `userName`
   verbatim. Both `fleet_rpi_setup` and `fleet_e2e_demo` create the
   user as `device-{device_id}`, so the JWT carries
   `device-vm-device-00` while the agent's KV keys use the bare
   `vm-device-00`. The callout was interpolating the prefixed string
   into the permissions, producing rules that never matched what the
   agent actually publishes.

   Adds `device_id_prefix_strip` (env: `DEVICE_ID_PREFIX_STRIP`;
   defaults to empty, so existing deployments are unaffected). When
   set, the validator strips the prefix from the extracted claim before
   permission interpolation. The `fleet_auth_callout` example sets it
   to `device-`, so the e2e harness stays correct end to end without
   changing either naming convention.
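
The role change in item 1 comes down to NATS subject matching, where `*` matches exactly one token and `>` matches one or more trailing tokens. The following is a minimal, dependency-free sketch of that matching plus a per-device allow list built from the subjects named above, showing why the old role rejected `$JS.API.INFO` and why another device's keys stay out of reach. Assumptions, all hypothetical: the bucket name `fleet`, the `<dep>` value `web`, and the helper names `subject_matches`, `device_role_allow`, and `allowed`. The commit does not say how the contract keys are spread across buckets, and the sketch folds publish and subscribe permissions into one list; it is not the actual role config.

```rust
/// Returns true if `subject` matches the NATS subject `pattern`.
/// `*` matches exactly one token; `>` (valid only as the last token)
/// matches one or more remaining tokens.
fn subject_matches(pattern: &str, subject: &str) -> bool {
    let mut p = pattern.split('.');
    let mut s = subject.split('.');
    loop {
        match (p.next(), s.next()) {
            // `>` needs at least one token left in the subject.
            (Some(">"), Some(_)) => return true,
            (Some("*"), Some(_)) => continue,
            (Some(pt), Some(st)) if pt == st => continue,
            // Both exhausted at the same time: full match.
            (None, None) => return true,
            _ => return false,
        }
    }
}

// Hypothetical bucket name; the real layout comes from the contract crate.
const BUCKET: &str = "fleet";

/// Hypothetical per-device allow list mirroring the subjects listed in item 1.
fn device_role_allow(device_id: &str) -> Vec<String> {
    // JetStream control plane.
    let mut allow = vec!["$JS.API.>".to_string(), "$JS.ACK.>".to_string()];
    // Per-device KV keys; `*` stands in for the `<dep>` token.
    for key in [
        format!("info.{}", device_id),
        format!("state.{}.*", device_id),
        format!("heartbeat.{}", device_id),
        format!("desired-state.{}.*", device_id),
    ] {
        allow.push(format!("$KV.{}.{}", BUCKET, key));
    }
    // Legacy flat subjects kept for non-JetStream callers.
    allow.push(format!("device-state.{}", device_id));
    allow.push(format!("device-commands.{}", device_id));
    allow
}

/// True if any pattern in `allow` matches `subject`.
fn allowed(allow: &[String], subject: &str) -> bool {
    allow.iter().any(|p| subject_matches(p, subject))
}

fn main() {
    // The old role: only the flat subjects, so the JetStream API call fails.
    let old = vec![
        "device-state.vm-device-00".to_string(),
        "device-commands.vm-device-00".to_string(),
    ];
    assert!(!allowed(&old, "$JS.API.INFO"));

    let allow = device_role_allow("vm-device-00");
    assert!(allowed(&allow, "$JS.API.INFO"));
    assert!(allowed(&allow, "$KV.fleet.state.vm-device-00.web"));
    // Another device's keys must not match.
    assert!(!allowed(&allow, "$KV.fleet.state.vm-device-01.web"));
    println!("ok");
}
```

The assertions in `main` follow the shape of the unit test described in item 1: contract subjects match, cross-device subjects do not.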
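
The prefix-stripping step from item 2 can be sketched with the standard library alone. `prefix_from_env` and `normalize_device_id` are hypothetical names, not the validator's API. Two behaviors are assumptions of this sketch rather than facts from the commit: a claim that lacks the prefix passes through unchanged, and an id that is empty after stripping is rejected (`None`).

```rust
use std::env;

/// Reads the prefix from `DEVICE_ID_PREFIX_STRIP`.
/// Unset or non-Unicode values fall back to an empty string.
fn prefix_from_env() -> String {
    env::var("DEVICE_ID_PREFIX_STRIP").unwrap_or_default()
}

/// Strips `prefix` from the extracted claim before permission interpolation.
/// `strip_prefix("")` returns the whole string, so an empty prefix is a no-op.
/// A claim without the prefix is returned unchanged (assumption); an id that
/// would be empty after stripping is rejected (assumption).
fn normalize_device_id<'a>(claim: &'a str, prefix: &str) -> Option<&'a str> {
    let id = claim.strip_prefix(prefix).unwrap_or(claim);
    Some(id).filter(|s| !s.is_empty())
}

fn main() {
    // If the variable is unset, the prefix is empty and the claim is
    // used exactly as before.
    let prefix = prefix_from_env();
    let claim = "device-vm-device-00";
    match normalize_device_id(claim, &prefix) {
        Some(id) => println!("interpolating permissions for {id}"),
        None => println!("rejecting empty device id"),
    }

    // The fleet_auth_callout wiring: prefix "device-".
    assert_eq!(normalize_device_id(claim, "device-"), Some("vm-device-00"));
}
```

Defaulting to an empty string keeps the old behavior for any deployment that never sets the variable, which is the compatibility property the commit relies on.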

Verified end-to-end: both VM agents now publish `DeviceInfo` and
heartbeat records through JetStream KV with no permission errors and
zero service restarts since the rollout.

```rust
use std::sync::Arc;

use async_nats::ConnectOptions;
use futures_util::StreamExt;
use tracing::{error, info, warn};

use crate::config::AuthCalloutConfig;
use crate::handler::handle_auth_request;
use crate::zitadel::ZitadelValidator;

/// Subject on which the NATS server publishes auth callout requests.
const AUTH_SUBJECT: &str = "$SYS.REQ.USER.AUTH";

/// Production NATS auth callout service.
pub struct AuthCalloutService {
    config: AuthCalloutConfig,
}

impl AuthCalloutService {
    pub fn new(config: AuthCalloutConfig) -> Self {
        Self { config }
    }

    /// Connects to NATS, subscribes to the auth callout subject, and
    /// handles each request on its own task until the subscription closes.
    pub async fn run(&self) -> anyhow::Result<()> {
        // Connect as the callout's own user and retry the initial
        // connection instead of failing immediately.
        let nc = async_nats::connect_with_options(
            &self.config.nats_url,
            ConnectOptions::new()
                .user_and_password(self.config.auth_user.clone(), self.config.auth_pass.clone())
                .retry_on_initial_connect(),
        )
        .await
        .map_err(|e| anyhow::anyhow!("NATS connection failed: {e}"))?;

        // Shared OIDC validator. `device_id_prefix_strip` (empty by default)
        // is stripped from the extracted device-id claim before permissions
        // are interpolated.
        let validator = Arc::new(
            ZitadelValidator::new(
                self.config.oidc_issuer_url.clone(),
                self.config.oidc_audience.clone(),
                self.config.device_id_claim.clone(),
                self.config.device_id_prefix_strip.clone(),
                self.config.danger_accept_invalid_certs,
            )
            .await?,
        );

        let mut subscriber = nc
            .subscribe(AUTH_SUBJECT)
            .await
            .map_err(|e| anyhow::anyhow!("subscribe failed: {e}"))?;

        info!(subject = AUTH_SUBJECT, "auth callout service listening");

        while let Some(msg) = subscriber.next().await {
            // Each request gets its own task, so one slow validation does
            // not hold up the requests queued behind it.
            let config = self.config.clone();
            let validator = validator.clone();
            let nc = nc.clone();
            tokio::spawn(async move {
                if let Err(e) = handle_auth_request(&nc, &msg, &config, &validator).await {
                    error!(error = %e, "failed to handle auth request");
                }
            });
        }

        // The stream yields `None` once the subscription is closed; this is
        // logged and treated as a clean exit.
        warn!("auth callout subscription closed");
        Ok(())
    }
}
```