Files
harmony/nats/callout/src/config.rs
Jean-Gabriel Gill-Couture d4fd4859ec
Some checks failed
Run Check Script / check (pull_request) Failing after -44h57m23s
fix(callout): align device permissions with KV key formats and machine-user prefix
Two bugs surfaced when the agent went live against NATS JetStream KV
in the VM-based e2e rehearsal:

1. The default `device` role only allowed flat `device-state.<id>` /
   `device-commands.<id>` subjects. The agent's actual data plane is
   JetStream KV, which puts every operation on `$KV.<bucket>.<key>`
   subjects with control-plane traffic on `$JS.API.>` and `$JS.ACK.>`.
   With the old role config, the very first KV publish died with
   `Permissions Violation for Publish to "$JS.API.INFO"`.

   The role now allows `$JS.API.>` + `$JS.ACK.>` plus the four
   per-device data subjects derived from
   harmony_reconciler_contracts::kv (info.<id>, state.<id>.<dep>,
   heartbeat.<id>, desired-state.<id>.<dep>). The legacy direct
   `device-state.<id>` / `device-commands.<id>` subjects are kept so
   non-JetStream callers of NatsAuthCalloutScore still work.

   A new unit test (`device_role_covers_reconciler_contract_kv_subjects`)
   imports the contract crate as a dev-dep and asserts each contract-
   produced subject is matched, plus that cross-device subjects are
   *not* matched. This locks the role config to the contract surface so
   future renames break the test before they break prod.

2. Zitadel's `client_id` claim for a machine user equals the userName
   verbatim. Both `fleet_rpi_setup` and `fleet_e2e_demo` create the
   user as `device-{device_id}`, so the JWT carries
   `device-vm-device-00` while the agent's KV keys use the bare
   `vm-device-00`. The callout was interpolating the prefixed string
   into permissions, producing rules that never matched what the
   agent actually publishes.

   Adds `device_id_prefix_strip` (env: `DEVICE_ID_PREFIX_STRIP`,
   defaults empty so existing deployments are unaffected). When set,
   the validator strips the prefix from the extracted claim before
   permission interpolation. The fleet_auth_callout example wires it
   to `device-` so the e2e harness stays end-to-end correct without
   reaching into either naming convention.

Verified end-to-end: both VM agents now publish DeviceInfo /
heartbeat through JetStream KV with no permission errors and zero
service restarts since the rollout.
2026-05-03 17:49:48 -04:00

326 lines
11 KiB
Rust

use nkeys::KeyPair;
use crate::permissions::PermissionsConfig;
/// JWT claim path read for project roles when `roles_claim` is not set on
/// the builder.
///
/// Zitadel emits roles under this URN as a map of `{role-name: {org-id: org-name}}`.
/// The handler accepts both map and array shapes at this path.
pub const DEFAULT_ROLES_CLAIM: &str = "urn:zitadel:iam:org:project:roles";

/// Role name used when `admin_role` is not set on the builder; grants
/// unrestricted access (read+write on all subjects).
pub const DEFAULT_ADMIN_ROLE: &str = "fleet-admin";

/// Role name used when `device_role` is not set on the builder; grants
/// per-device scoped access.
pub const DEFAULT_DEVICE_ROLE: &str = "device";
/// Configuration for the NATS auth callout service.
///
/// Usually obtained from [`AuthCalloutConfig::builder`]. `Debug` is written by
/// hand rather than derived so that `{:?}` output never contains `auth_pass`
/// and shows the issuer key pair only by its public key, independent of how
/// `KeyPair`'s own `Debug` implementation renders it.
#[derive(Clone)]
pub struct AuthCalloutConfig {
    /// NATS server URL to connect to.
    pub nats_url: String,
    /// Username for the auth callout service's own NATS connection.
    pub auth_user: String,
    /// Password for the auth callout service's own NATS connection.
    /// Redacted from `Debug` output.
    pub auth_pass: String,
    /// NKey pair used to sign user JWTs returned to NATS.
    pub issuer_kp: KeyPair,
    /// Account name to place authenticated users into. Must match the NATS
    /// `auth_callout.account` setting.
    pub target_account: String,
    /// OIDC issuer URL (e.g. Zitadel).
    pub oidc_issuer_url: String,
    /// Expected OIDC audience.
    pub oidc_audience: String,
    /// JSON path to the device identifier claim (e.g. "device_id" or "custom.claim.path").
    pub device_id_claim: String,
    /// Optional prefix to strip from the extracted device-id claim before
    /// it's used in permission interpolation. Lets the callout work with
    /// the common Zitadel pattern where the machine user's `client_id`
    /// is namespaced (`device-vm-device-00`) but the agent's KV keys use
    /// the bare device id (`vm-device-00`). Empty string means no strip.
    pub device_id_prefix_strip: String,
    /// JSON path to the roles claim (e.g. Zitadel's `urn:zitadel:iam:org:project:roles`).
    pub roles_claim: String,
    /// Role name that, when present, grants the [`Self::admin_permissions`] block.
    pub admin_role: String,
    /// Role name that, when present, grants the [`Self::device_permissions`] block.
    pub device_role: String,
    /// Permissions issued for users carrying the [`Self::admin_role`].
    pub admin_permissions: PermissionsConfig,
    /// Permissions issued for users carrying the [`Self::device_role`]. May contain
    /// `{device_id}` placeholders that the handler interpolates per request.
    pub device_permissions: PermissionsConfig,
    /// Whether to accept invalid TLS certificates (useful for local testing).
    pub danger_accept_invalid_certs: bool,
}

impl std::fmt::Debug for AuthCalloutConfig {
    /// Formats every field except secrets: `auth_pass` is replaced by a
    /// placeholder and `issuer_kp` is rendered as its public key only.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("AuthCalloutConfig")
            .field("nats_url", &self.nats_url)
            .field("auth_user", &self.auth_user)
            .field("auth_pass", &"<redacted>")
            .field("issuer_kp", &self.issuer_kp.public_key())
            .field("target_account", &self.target_account)
            .field("oidc_issuer_url", &self.oidc_issuer_url)
            .field("oidc_audience", &self.oidc_audience)
            .field("device_id_claim", &self.device_id_claim)
            .field("device_id_prefix_strip", &self.device_id_prefix_strip)
            .field("roles_claim", &self.roles_claim)
            .field("admin_role", &self.admin_role)
            .field("device_role", &self.device_role)
            .field("admin_permissions", &self.admin_permissions)
            .field("device_permissions", &self.device_permissions)
            .field(
                "danger_accept_invalid_certs",
                &self.danger_accept_invalid_certs,
            )
            .finish()
    }
}
impl AuthCalloutConfig {
pub fn builder() -> AuthCalloutConfigBuilder {
AuthCalloutConfigBuilder::default()
}
}
/// Incremental builder for [`AuthCalloutConfig`].
///
/// Every field starts unset. [`AuthCalloutConfigBuilder::build`] fills unset
/// optional fields with defaults and returns an error for unset required ones.
#[derive(Default)]
#[must_use = "a builder does nothing until `.build()` is called"]
pub struct AuthCalloutConfigBuilder {
    /// Required.
    nats_url: Option<String>,
    auth_user: Option<String>,
    auth_pass: Option<String>,
    /// Required.
    issuer_kp: Option<KeyPair>,
    target_account: Option<String>,
    /// Required.
    oidc_issuer_url: Option<String>,
    /// Required.
    oidc_audience: Option<String>,
    device_id_claim: Option<String>,
    device_id_prefix_strip: Option<String>,
    roles_claim: Option<String>,
    admin_role: Option<String>,
    device_role: Option<String>,
    admin_permissions: Option<PermissionsConfig>,
    device_permissions: Option<PermissionsConfig>,
    /// Plain `bool` rather than `Option`: its default (`false`) is the
    /// derived `Default`.
    danger_accept_invalid_certs: bool,
}
impl AuthCalloutConfigBuilder {
    /// Sets the NATS server URL to connect to. Required.
    pub fn nats_url(mut self, url: impl Into<String>) -> Self {
        self.nats_url = Some(url.into());
        self
    }

    /// Sets the username for the service's own NATS connection (default `"auth"`).
    pub fn auth_user(mut self, user: impl Into<String>) -> Self {
        self.auth_user = Some(user.into());
        self
    }

    /// Sets the password for the service's own NATS connection (default `"auth"`).
    pub fn auth_pass(mut self, pass: impl Into<String>) -> Self {
        self.auth_pass = Some(pass.into());
        self
    }

    /// Sets the NKey pair used to sign user JWTs. Required.
    pub fn issuer_kp(mut self, kp: KeyPair) -> Self {
        self.issuer_kp = Some(kp);
        self
    }

    /// Sets the account authenticated users are placed into (default `"DEVICES"`).
    pub fn target_account(mut self, account: impl Into<String>) -> Self {
        self.target_account = Some(account.into());
        self
    }

    /// Sets the OIDC issuer URL. Required.
    pub fn oidc_issuer_url(mut self, url: impl Into<String>) -> Self {
        self.oidc_issuer_url = Some(url.into());
        self
    }

    /// Sets the expected OIDC audience. Required.
    pub fn oidc_audience(mut self, aud: impl Into<String>) -> Self {
        self.oidc_audience = Some(aud.into());
        self
    }

    /// Sets the JSON path of the device-id claim (default `"device_id"`).
    pub fn device_id_claim(mut self, claim: impl Into<String>) -> Self {
        self.device_id_claim = Some(claim.into());
        self
    }

    /// Sets the prefix stripped from the extracted device id before
    /// permission interpolation (default: empty, meaning nothing is stripped).
    pub fn device_id_prefix_strip(mut self, prefix: impl Into<String>) -> Self {
        self.device_id_prefix_strip = Some(prefix.into());
        self
    }

    /// Sets the JSON path of the roles claim (default [`DEFAULT_ROLES_CLAIM`]).
    pub fn roles_claim(mut self, claim: impl Into<String>) -> Self {
        self.roles_claim = Some(claim.into());
        self
    }

    /// Sets the role name that grants admin permissions (default [`DEFAULT_ADMIN_ROLE`]).
    pub fn admin_role(mut self, role: impl Into<String>) -> Self {
        self.admin_role = Some(role.into());
        self
    }

    /// Sets the role name that grants device permissions (default [`DEFAULT_DEVICE_ROLE`]).
    pub fn device_role(mut self, role: impl Into<String>) -> Self {
        self.device_role = Some(role.into());
        self
    }

    /// Overrides the admin permissions (default `PermissionsConfig::admin_default()`).
    pub fn admin_permissions(mut self, perms: PermissionsConfig) -> Self {
        self.admin_permissions = Some(perms);
        self
    }

    /// Overrides the device permissions (default `PermissionsConfig::device_default()`).
    pub fn device_permissions(mut self, perms: PermissionsConfig) -> Self {
        self.device_permissions = Some(perms);
        self
    }

    /// Sets whether invalid TLS certificates are accepted (default `false`).
    pub fn danger_accept_invalid_certs(mut self, allow: bool) -> Self {
        self.danger_accept_invalid_certs = allow;
        self
    }

    /// Validates the builder and produces an [`AuthCalloutConfig`].
    ///
    /// Unset optional fields receive the defaults documented on their setters.
    ///
    /// # Errors
    ///
    /// Returns an error naming the first missing required field, checked in
    /// the order `nats_url`, `issuer_kp`, `oidc_issuer_url`, `oidc_audience`.
    pub fn build(self) -> anyhow::Result<AuthCalloutConfig> {
        // Exhaustive destructuring: adding a builder field without wiring it
        // into the config below becomes a compile error, not a silent drop.
        let Self {
            nats_url,
            auth_user,
            auth_pass,
            issuer_kp,
            target_account,
            oidc_issuer_url,
            oidc_audience,
            device_id_claim,
            device_id_prefix_strip,
            roles_claim,
            admin_role,
            device_role,
            admin_permissions,
            device_permissions,
            danger_accept_invalid_certs,
        } = self;

        // Validate required fields up front so the error names the missing one.
        let nats_url = nats_url.ok_or_else(|| anyhow::anyhow!("nats_url is required"))?;
        let issuer_kp = issuer_kp.ok_or_else(|| anyhow::anyhow!("issuer_kp is required"))?;
        let oidc_issuer_url =
            oidc_issuer_url.ok_or_else(|| anyhow::anyhow!("oidc_issuer_url is required"))?;
        let oidc_audience =
            oidc_audience.ok_or_else(|| anyhow::anyhow!("oidc_audience is required"))?;

        Ok(AuthCalloutConfig {
            nats_url,
            auth_user: auth_user.unwrap_or_else(|| "auth".to_string()),
            auth_pass: auth_pass.unwrap_or_else(|| "auth".to_string()),
            issuer_kp,
            target_account: target_account.unwrap_or_else(|| "DEVICES".to_string()),
            oidc_issuer_url,
            oidc_audience,
            device_id_claim: device_id_claim.unwrap_or_else(|| "device_id".to_string()),
            device_id_prefix_strip: device_id_prefix_strip.unwrap_or_default(),
            roles_claim: roles_claim.unwrap_or_else(|| DEFAULT_ROLES_CLAIM.to_string()),
            admin_role: admin_role.unwrap_or_else(|| DEFAULT_ADMIN_ROLE.to_string()),
            device_role: device_role.unwrap_or_else(|| DEFAULT_DEVICE_ROLE.to_string()),
            admin_permissions: admin_permissions.unwrap_or_else(PermissionsConfig::admin_default),
            device_permissions: device_permissions
                .unwrap_or_else(PermissionsConfig::device_default),
            danger_accept_invalid_certs,
        })
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::permissions::PermissionSubjects;
    use nkeys::KeyPair;

    /// Builder with exactly the required fields set.
    fn full_builder() -> AuthCalloutConfigBuilder {
        AuthCalloutConfig::builder()
            .nats_url("nats://localhost:4222")
            .issuer_kp(KeyPair::new_account())
            .oidc_issuer_url("https://issuer.example")
            .oidc_audience("aud-1")
    }

    /// Permissions with a single publish and a single subscribe allow entry.
    fn perms(pub_subject: &str, sub_subject: &str) -> PermissionsConfig {
        PermissionsConfig {
            r#pub: PermissionSubjects {
                allow: vec![pub_subject.to_string()],
                deny: vec![],
            },
            sub: PermissionSubjects {
                allow: vec![sub_subject.to_string()],
                deny: vec![],
            },
        }
    }

    #[test]
    fn defaults_are_applied_when_optional_fields_omitted() {
        let cfg = full_builder().build().expect("build should succeed");
        assert_eq!(cfg.auth_user, "auth");
        assert_eq!(cfg.auth_pass, "auth");
        assert_eq!(cfg.target_account, "DEVICES");
        assert_eq!(cfg.device_id_claim, "device_id");
        // Empty prefix means "strip nothing", keeping existing deployments unaffected.
        assert!(cfg.device_id_prefix_strip.is_empty());
        assert_eq!(cfg.roles_claim, DEFAULT_ROLES_CLAIM);
        assert_eq!(cfg.admin_role, DEFAULT_ADMIN_ROLE);
        assert_eq!(cfg.device_role, DEFAULT_DEVICE_ROLE);
        assert!(!cfg.danger_accept_invalid_certs);
        // Default permissions match the documented defaults of PermissionsConfig.
        assert!(cfg.admin_permissions.r#pub.allow.contains(&">".to_string()));
        assert!(
            cfg.device_permissions
                .r#pub
                .allow
                .iter()
                .any(|s| s.contains("{device_id}"))
        );
    }

    #[test]
    fn missing_nats_url_errors() {
        let err = AuthCalloutConfig::builder()
            .issuer_kp(KeyPair::new_account())
            .oidc_issuer_url("https://x")
            .oidc_audience("y")
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("nats_url"));
    }

    #[test]
    fn missing_issuer_kp_errors() {
        let err = AuthCalloutConfig::builder()
            .nats_url("nats://x")
            .oidc_issuer_url("https://x")
            .oidc_audience("y")
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("issuer_kp"));
    }

    #[test]
    fn missing_oidc_issuer_url_errors() {
        let err = AuthCalloutConfig::builder()
            .nats_url("nats://x")
            .issuer_kp(KeyPair::new_account())
            .oidc_audience("y")
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("oidc_issuer_url"));
    }

    #[test]
    fn missing_oidc_audience_errors() {
        let err = AuthCalloutConfig::builder()
            .nats_url("nats://x")
            .issuer_kp(KeyPair::new_account())
            .oidc_issuer_url("https://x")
            .build()
            .unwrap_err();
        assert!(err.to_string().contains("oidc_audience"));
    }

    #[test]
    fn explicit_overrides_take_effect() {
        let cfg = full_builder()
            .auth_user("svc")
            .auth_pass("hunter2")
            .target_account("ACME")
            .device_id_claim("custom.path")
            .device_id_prefix_strip("device-")
            .roles_claim("custom_roles")
            .admin_role("super-user")
            .device_role("iot-thing")
            .danger_accept_invalid_certs(true)
            .build()
            .unwrap();
        assert_eq!(cfg.auth_user, "svc");
        assert_eq!(cfg.auth_pass, "hunter2");
        assert_eq!(cfg.target_account, "ACME");
        assert_eq!(cfg.device_id_claim, "custom.path");
        assert_eq!(cfg.device_id_prefix_strip, "device-");
        assert_eq!(cfg.roles_claim, "custom_roles");
        assert_eq!(cfg.admin_role, "super-user");
        assert_eq!(cfg.device_role, "iot-thing");
        assert!(cfg.danger_accept_invalid_certs);
    }

    #[test]
    fn permissions_overrides_take_effect() {
        // Distinct admin/device permissions so a swapped assignment in
        // `build` would be caught, not masked by identical inputs.
        let cfg = full_builder()
            .admin_permissions(perms("admin.>", "admin.sub.>"))
            .device_permissions(perms("device.>", "device.sub.>"))
            .build()
            .unwrap();
        assert_eq!(cfg.admin_permissions.r#pub.allow, vec!["admin.>"]);
        assert_eq!(cfg.admin_permissions.sub.allow, vec!["admin.sub.>"]);
        assert_eq!(cfg.device_permissions.r#pub.allow, vec!["device.>"]);
        assert_eq!(cfg.device_permissions.sub.allow, vec!["device.sub.>"]);
    }
}