Files
harmony/nats/callout/src/permissions.rs
Jean-Gabriel Gill-Couture d4fd4859ec
Some checks failed
Run Check Script / check (pull_request) Failing after -44h57m23s
fix(callout): align device permissions with KV key formats and machine-user prefix
Two bugs surfaced when the agent went live against NATS JetStream KV
in the VM-based e2e rehearsal:

1. The default `device` role only allowed flat `device-state.<id>` /
   `device-commands.<id>` subjects. The agent's actual data plane is
   JetStream KV, which puts every operation on `$KV.<bucket>.<key>`
   subjects with control-plane traffic on `$JS.API.>` and `$JS.ACK.>`.
   With the old role config, the very first KV publish died with
   `Permissions Violation for Publish to "$JS.API.INFO"`.

   The role now allows `$JS.API.>` + `$JS.ACK.>` plus the four
   per-device data subjects derived from
   harmony_reconciler_contracts::kv (info.<id>, state.<id>.<dep>,
   heartbeat.<id>, desired-state.<id>.<dep>). The legacy direct
   `device-state.<id>` / `device-commands.<id>` subjects are kept so
   non-JetStream callers of NatsAuthCalloutScore still work.

   A new unit test (`device_role_covers_reconciler_contract_kv_subjects`)
   imports the contract crate as a dev-dep and asserts each contract-
   produced subject is matched, plus that cross-device subjects are
   *not* matched. This locks the role config to the contract surface so
   future renames break the test before they break prod.

2. Zitadel's `client_id` claim for a machine user equals the userName
   verbatim. Both `fleet_rpi_setup` and `fleet_e2e_demo` create the
   user as `device-{device_id}`, so the JWT carries
   `device-vm-device-00` while the agent's KV keys use the bare
   `vm-device-00`. The callout was interpolating the prefixed string
   into permissions, producing rules that never matched what the
   agent actually publishes.

   Adds `device_id_prefix_strip` (env: `DEVICE_ID_PREFIX_STRIP`,
   defaults empty so existing deployments are unaffected). When set,
   the validator strips the prefix from the extracted claim before
   permission interpolation. The fleet_auth_callout example wires it
   to `device-` so the e2e harness stays end-to-end correct without
   reaching into either naming convention.

Verified end-to-end: both VM agents now publish DeviceInfo /
heartbeat through JetStream KV with no permission errors and zero
service restarts since the rollout.
2026-05-03 17:49:48 -04:00

295 lines
10 KiB
Rust

use serde::{Deserialize, Serialize};
/// Allow/deny subject lists for one direction (publish or subscribe).
///
/// Either list may be absent when deserializing, in which case it
/// defaults to empty; an empty list is left out of the serialized form.
#[derive(Clone, Debug, Default, Deserialize, Serialize)]
pub struct PermissionSubjects {
    /// Subject patterns that are permitted.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub allow: Vec<String>,
    /// Subject patterns that are rejected.
    #[serde(skip_serializing_if = "Vec::is_empty", default)]
    pub deny: Vec<String>,
}
/// Subject-permission template for a role, split by direction.
///
/// Entries may contain the literal `{device_id}` placeholder, which
/// [`interpolate_permissions`] replaces with a concrete id. A direction
/// missing from the serialized input deserializes to empty
/// [`PermissionSubjects`].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct PermissionsConfig {
    /// Subscribe-side subject rules.
    #[serde(default)]
    pub sub: PermissionSubjects,
    /// Publish-side subject rules (`pub` is a keyword, hence the raw
    /// identifier).
    #[serde(default)]
    pub r#pub: PermissionSubjects,
}
impl PermissionsConfig {
    /// Builds the unrestricted "admin" role.
    ///
    /// Publish and subscribe each allow the single full wildcard `>`;
    /// both deny lists are empty.
    pub fn admin_default() -> Self {
        let everything = PermissionSubjects {
            allow: vec![">".to_owned()],
            deny: Vec::new(),
        };
        Self {
            r#pub: everything.clone(),
            sub: everything,
        }
    }

    /// Builds the template for the per-device "device" role.
    ///
    /// Entries carrying the literal `{device_id}` placeholder are meant
    /// to be rendered through [`interpolate_permissions`]; nothing is
    /// substituted here.
    ///
    /// The fleet agent stores desired-state / device-info /
    /// device-state / device-heartbeat data in NATS JetStream KV
    /// buckets. A KV operation travels as a message on
    /// `$KV.<bucket>.<key>`, while metadata and consumer-creation
    /// requests use the `$JS.API.>` subject space.
    ///
    /// Cross-device isolation on the data subjects comes from the
    /// placeholder: device A's rendered list has no entry matching
    /// `$KV.device-info.info.<B>`, so A cannot publish there. The wide
    /// `$JS.API.>` grant was judged acceptable by the original design
    /// on the grounds that JetStream API responses identify the caller
    /// and the data plane is what the security model has to defend.
    ///
    /// NOTE(review): `$JS.API.>` and `_INBOX.>` are not scoped per
    /// device — confirm that stream/consumer management requests and
    /// shared inbox traffic are constrained elsewhere (e.g. account
    /// limits) before relying on this role for isolation.
    pub fn device_default() -> Self {
        /// Subjects a device may publish to, in emission order.
        const PUBLISH_ALLOW: &[&str] = &[
            // request/reply
            "_INBOX.>",
            // legacy direct pub/sub channels (kept for callers using
            // NatsAuthCalloutScore without JetStream)
            "device-state.{device_id}",
            "device-state.{device_id}.>",
            // JetStream API + ACK plumbing — required for KV
            // stream-info / consumer-create / pull-fetch.
            "$JS.API.>",
            "$JS.ACK.>",
            // Per-device data writes on `$KV.<bucket>.<key>`. Key
            // formats come from harmony_reconciler_contracts::kv:
            //   device_info_key      -> info.<device_id>
            //   device_state_key     -> state.<device_id>.<deployment>
            //   device_heartbeat_key -> heartbeat.<device_id>
            "$KV.device-info.info.{device_id}",
            "$KV.device-state.state.{device_id}",
            "$KV.device-state.state.{device_id}.>",
            "$KV.device-heartbeat.heartbeat.{device_id}",
        ];

        /// Subjects a device may subscribe to, in emission order.
        const SUBSCRIBE_ALLOW: &[&str] = &[
            "_INBOX.>",
            "device-commands.{device_id}",
            "device-commands.{device_id}.>",
            // Watch desired-state filtered to this device only.
            // Key format: `<device_id>.<deployment>` (see
            // harmony_reconciler_contracts::kv::desired_state_key).
            "$KV.desired-state.{device_id}.>",
        ];

        /// Copies a static subject table into owned strings.
        fn owned(subjects: &[&str]) -> Vec<String> {
            subjects.iter().copied().map(String::from).collect()
        }

        Self {
            r#pub: PermissionSubjects {
                allow: owned(PUBLISH_ALLOW),
                deny: Vec::new(),
            },
            sub: PermissionSubjects {
                allow: owned(SUBSCRIBE_ALLOW),
                deny: Vec::new(),
            },
        }
    }
}
impl Default for PermissionsConfig {
fn default() -> Self {
Self::device_default()
}
}
/// A [`PermissionsConfig`] after its `{device_id}` placeholders have been
/// filled in for one concrete device, flattened into four subject lists.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct InterpolatedPermissions {
    /// Rendered publish allow list.
    pub pub_allow: Vec<String>,
    /// Rendered publish deny list.
    pub pub_deny: Vec<String>,
    /// Rendered subscribe allow list.
    pub sub_allow: Vec<String>,
    /// Rendered subscribe deny list.
    pub sub_deny: Vec<String>,
}
/// Renders a permission template for one device.
///
/// Every occurrence of the literal `{device_id}` in each entry of the
/// four allow/deny lists of `config` is replaced with `device_id`.
/// Entries without the placeholder are copied unchanged, and the order
/// of each list is preserved. `config` itself is not modified.
///
/// NOTE(review): `device_id` is substituted verbatim. An id containing
/// `.`, `*` or `>` would change how many tokens a rendered subject has
/// or turn it into a wildcard — confirm the caller validates the id
/// before it reaches this function.
pub fn interpolate_permissions(
    config: &PermissionsConfig,
    device_id: &str,
) -> InterpolatedPermissions {
    const PLACEHOLDER: &str = "{device_id}";

    // One rendering routine shared by all four lists.
    let render = |templates: &[String]| -> Vec<String> {
        templates
            .iter()
            .map(|template| template.replace(PLACEHOLDER, device_id))
            .collect()
    };

    let publish = &config.r#pub;
    let subscribe = &config.sub;

    InterpolatedPermissions {
        pub_allow: render(&publish.allow),
        pub_deny: render(&publish.deny),
        sub_allow: render(&subscribe.allow),
        sub_deny: render(&subscribe.deny),
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Renders the default device role for `device_id`.
    fn device_perms(device_id: &str) -> InterpolatedPermissions {
        interpolate_permissions(&PermissionsConfig::device_default(), device_id)
    }

    /// Literal membership check: true when `list` holds exactly `subject`
    /// (no wildcard matching).
    fn lists(list: &[String], subject: &str) -> bool {
        list.iter().any(|entry| entry == subject)
    }

    /// Builds the wire subject of a KV entry: `$KV.<bucket>.<key>`.
    fn kv_subject(bucket: impl std::fmt::Display, key: impl std::fmt::Display) -> String {
        format!("$KV.{}.{}", bucket, key)
    }

    #[test]
    fn interpolates_device_id_in_all_subjects() {
        let rendered = device_perms("sensor-42");

        assert!(lists(&rendered.pub_allow, "device-state.sensor-42"));
        assert!(lists(&rendered.pub_allow, "device-state.sensor-42.>"));
        assert!(lists(&rendered.pub_allow, "_INBOX.>"));
        assert!(lists(&rendered.sub_allow, "device-commands.sensor-42"));
        assert!(lists(&rendered.sub_allow, "device-commands.sensor-42.>"));
    }

    #[test]
    fn admin_default_is_unrestricted() {
        let admin = PermissionsConfig::admin_default();
        let rendered = interpolate_permissions(&admin, "ignored");

        assert_eq!(rendered.pub_allow, [">"]);
        assert_eq!(rendered.sub_allow, [">"]);
    }

    #[test]
    fn no_interpolation_when_no_placeholder() {
        let inbox_only = PermissionSubjects {
            allow: vec!["_INBOX.>".to_owned()],
            deny: Vec::new(),
        };
        let config = PermissionsConfig {
            r#pub: inbox_only.clone(),
            sub: inbox_only,
        };

        let rendered = interpolate_permissions(&config, "xyz");

        assert_eq!(rendered.pub_allow, ["_INBOX.>"]);
        assert_eq!(rendered.sub_allow, ["_INBOX.>"]);
    }

    /// Pins the device role to the key formats produced by the contracts
    /// crate. Subjects are built as `$KV.<bucket>.<key>` from the
    /// crate's own bucket constants and key functions, so a change to
    /// any of them fails here first.
    #[test]
    fn device_role_covers_reconciler_contract_kv_subjects() {
        use harmony_reconciler_contracts::{
            BUCKET_DESIRED_STATE, BUCKET_DEVICE_HEARTBEAT, BUCKET_DEVICE_INFO, BUCKET_DEVICE_STATE,
            DeploymentName, desired_state_key, device_heartbeat_key, device_info_key,
            device_state_key,
        };

        let device = "vm-device-00";
        let neighbour = "vm-device-01";
        let deployment = DeploymentName::try_new("hello-web").unwrap();
        let granted = device_perms(device);

        // --- The device's own subjects must be covered. ---
        let info_subject = kv_subject(BUCKET_DEVICE_INFO, device_info_key(device));
        assert!(
            subject_matches_any(&info_subject, &granted.pub_allow),
            "device-info publish must be allowed for own subject {info_subject}; \
             pub_allow={:?}",
            granted.pub_allow
        );

        let state_subject = kv_subject(BUCKET_DEVICE_STATE, device_state_key(device, &deployment));
        assert!(
            subject_matches_any(&state_subject, &granted.pub_allow),
            "device-state publish must be allowed for {state_subject}; \
             pub_allow={:?}",
            granted.pub_allow
        );

        let heartbeat_subject = kv_subject(BUCKET_DEVICE_HEARTBEAT, device_heartbeat_key(device));
        assert!(
            subject_matches_any(&heartbeat_subject, &granted.pub_allow),
            "device-heartbeat publish must be allowed for {heartbeat_subject}; \
             pub_allow={:?}",
            granted.pub_allow
        );

        let desired_subject = kv_subject(
            BUCKET_DESIRED_STATE,
            desired_state_key(device, &deployment),
        );
        assert!(
            subject_matches_any(&desired_subject, &granted.sub_allow),
            "desired-state subscribe must be allowed for {desired_subject}; \
             sub_allow={:?}",
            granted.sub_allow
        );

        // --- Another device's subjects must not be covered. ---
        let other_info = kv_subject(BUCKET_DEVICE_INFO, device_info_key(neighbour));
        assert!(
            !subject_matches_any(&other_info, &granted.pub_allow),
            "cross-device write to {other_info} must NOT be allowed under device {device}'s permissions"
        );

        let other_desired = kv_subject(
            BUCKET_DESIRED_STATE,
            desired_state_key(neighbour, &deployment),
        );
        assert!(
            !subject_matches_any(&other_desired, &granted.sub_allow),
            "cross-device subscribe to {other_desired} must NOT be allowed under device {device}'s permissions"
        );
    }

    /// True when at least one of `patterns` covers `subject` according
    /// to [`subject_matches`].
    fn subject_matches_any(subject: &str, patterns: &[String]) -> bool {
        patterns
            .iter()
            .map(String::as_str)
            .any(|pattern| subject_matches(subject, pattern))
    }

    /// NATS-style subject match on `.`-separated tokens: `*` stands for
    /// exactly one token, `>` for one or more remaining tokens. Enough
    /// to assert that a literal subject is covered by a permission
    /// pattern.
    fn subject_matches(subject: &str, pattern: &str) -> bool {
        let mut subject_tokens = subject.split('.');

        for pattern_token in pattern.split('.') {
            let current = subject_tokens.next();

            // `>` swallows the rest, but needs at least one token to eat.
            if pattern_token == ">" {
                return current.is_some();
            }

            match current {
                Some(token) if pattern_token == "*" || pattern_token == token => {}
                // Subject ran out of tokens, or the literal differs.
                _ => return false,
            }
        }

        // Pattern exhausted: the subject must be exhausted too.
        subject_tokens.next().is_none()
    }
}