Move ZitadelJwt to stand alone module #324

Closed
reda wants to merge 1 commits from refactor/move-zitadel-jwt-to-module into docs/fleet-secrets-device-access
4 changed files with 449 additions and 461 deletions

View File

@@ -0,0 +1,25 @@
use anyhow::{Context, Result};
use harmony_fleet_auth::{CredentialSource, CredentialsSection, NatsCredential};
#[tokio::main]
async fn main() -> Result<()> {
let key_path = std::env::var("ZITADEL_KEY_PATH").context("ZITADEL_KEY_PATH missing")?;
let issuer = std::env::var("ZITADEL_ISSUER").context("ZITADEL_ISSUER missing")?;
let audience = std::env::var("ZITADEL_PROJECT_ID").context("ZITADEL_PROJECT_ID missing")?;
let source = CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt {
key_path: key_path.into(),
key_json: None,
oidc_issuer_url: issuer,
audience,
danger_accept_invalid_certs: false,
})?;
match source.next_credential().await? {
NatsCredential::BearerToken(token) => {
println!("{token}");
Ok(())
}
NatsCredential::UserPass { .. } => anyhow::bail!("expected bearer token"),
}
}

View File

@@ -23,14 +23,12 @@
//! cleaner than a Trait + factory.
use std::path::Path;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use std::sync::Arc;
use anyhow::{Context, Result};
use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader};
use serde::Deserialize;
use crate::config::CredentialsSection;
use crate::jwt_bearer::ZitadelJwtBearer;
use crate::{config::CredentialsSection, jwt_bearer::MachineKeyFile};
/// Material the NATS connector needs to authenticate. Returned per
/// (re)connect attempt — the source decides whether to mint fresh.
@@ -44,17 +42,8 @@ pub enum NatsCredential {
/// from the parsed `[credentials]` section; cloned via Arc into the
/// async-nats auth callback.
pub enum CredentialSource {
TomlShared {
user: String,
pass: String,
},
ZitadelJwt {
key: MachineKeyFile,
oidc_issuer_url: String,
audience: String,
http: reqwest::Client,
cache: Mutex<Option<CachedToken>>,
},
TomlShared { user: String, pass: String },
ZitadelJwt { bearer: Arc<ZitadelJwtBearer> },
}
impl CredentialSource {
@@ -72,212 +61,13 @@ impl CredentialSource {
}
async fn zitadel_next(&self) -> Result<NatsCredential> {
// Fast path: lock the cache synchronously, copy out the token if
// it's comfortably valid, drop the lock. Holding a MutexGuard
// across `.await` would make this future !Sync, which
// async-nats's `with_auth_callback` rejects at compile time.
if let Some(token) = self.cached_if_fresh() {
return Ok(NatsCredential::BearerToken(token));
}
// Slow path: mint outside any lock. Two concurrent (re)connect
// attempts could both reach here and both mint; that's a wasted
// HTTP round-trip in a rare race, not a correctness issue —
// the second writer wins and replaces the first's value.
let fresh = self.zitadel_mint().await?;
let token = fresh.access_token.clone();
if let Self::ZitadelJwt {
cache, audience, ..
} = self
&& let Ok(mut guard) = cache.lock()
{
*guard = Some(fresh);
tracing::info!(audience = %audience, "minted fresh Zitadel access token");
}
let Self::ZitadelJwt { bearer } = self else {
anyhow::bail!("zitadel_next called on non-ZitadelJwt variant");
};
let token = bearer.bearer_token().await?;
Ok(NatsCredential::BearerToken(token))
}
fn cached_if_fresh(&self) -> Option<String> {
let Self::ZitadelJwt { cache, .. } = self else {
return None;
};
let now = chrono::Utc::now().timestamp();
let guard = cache.lock().ok()?;
let cached = guard.as_ref()?;
if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now {
Some(cached.access_token.clone())
} else {
None
}
}
async fn zitadel_mint(&self) -> Result<CachedToken> {
let Self::ZitadelJwt {
key,
oidc_issuer_url,
audience,
http,
..
} = self
else {
anyhow::bail!("zitadel_mint called on non-ZitadelJwt variant");
};
let now = chrono::Utc::now().timestamp();
let assertion = build_assertion(key, oidc_issuer_url, now)?;
let scope = build_scope(audience);
let token_url = build_token_url(oidc_issuer_url);
let resp = http
.post(&token_url)
.form(&[
(
"grant_type",
"urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(),
),
("assertion", assertion),
("scope", scope),
])
.send()
.await
.with_context(|| format!("POST {token_url}"))?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("Zitadel token endpoint returned {status}: {body}");
}
#[derive(Deserialize)]
struct TokenResponse {
access_token: String,
#[serde(default)]
expires_in: Option<i64>,
}
let tr: TokenResponse = resp.json().await.context("parsing token response")?;
// Zitadel typically returns 12h (43200s); be defensive against
// a missing field by assuming a conservative 1h.
let expires_in = tr.expires_in.unwrap_or(3600);
Ok(CachedToken {
access_token: tr.access_token,
expires_at_unix: now + expires_in,
})
}
}
/// Build the JWT-bearer assertion. Split out from the network path so
/// the claims + header shape can be unit-tested without an HTTP server,
/// and split internally into the (pure) claim/header builders so they
/// can be unit-tested without an RSA private key fixture.
pub(crate) fn build_assertion(
key: &MachineKeyFile,
oidc_issuer_url: &str,
now: i64,
) -> Result<String> {
let claims = build_assertion_claims(key, oidc_issuer_url, now);
let header = build_assertion_header(key);
let assertion = jsonwebtoken::encode(
&header,
&claims,
&EncodingKey::from_rsa_pem(key.key.as_bytes())
.context("parsing RSA private key from machine key file")?,
)
.context("signing JWT assertion")?;
Ok(assertion)
}
/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId`
/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint
/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or
/// Zitadel rejects.
pub(crate) fn build_assertion_claims(
key: &MachineKeyFile,
oidc_issuer_url: &str,
now: i64,
) -> serde_json::Value {
serde_json::json!({
"iss": key.user_id,
"sub": key.user_id,
"aud": oidc_issuer_url,
"exp": now + ASSERTION_LIFETIME_SECS,
"iat": now,
})
}
/// JWT header for the assertion. The `kid` tells Zitadel which of the
/// machine user's registered keys to verify the signature against.
pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader {
let mut header = JwtHeader::new(Algorithm::RS256);
header.kid = Some(key.key_id.clone());
header
}
/// Build the OAuth `scope` string for the token-bearer request.
///
/// Three scopes are needed for the access token to be useful here:
///
/// * `openid` — base OIDC requirement.
/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") —
/// tells Zitadel to include the role-claim block in the access
/// token. Without this, the callout sees "no authorized role
/// in token" even when the user has a project role grant.
/// * `urn:zitadel:iam:org:project:id:<aud>:aud` (SINGULAR
/// "project") — adds <aud> to the access token's `aud` claim
/// so the callout's audience validation accepts the project
/// ID we're using as the JWT-bearer audience.
///
/// The plural-vs-singular distinction is a Zitadel convention,
/// not a typo. Both scopes are required.
pub(crate) fn build_scope(audience: &str) -> String {
format!(
"openid \
urn:zitadel:iam:org:projects:roles \
urn:zitadel:iam:org:project:id:{audience}:aud"
)
}
/// Resolve the token endpoint URL, tolerating a trailing slash on
/// `oidc_issuer_url`. Without trimming, a configured issuer of
/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s.
pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String {
format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/'))
}
// ---- helper types ----------------------------------------------------------
/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON`
/// machine key. The `key` is a PEM-encoded RSA private key.
#[derive(Debug, Clone, Deserialize)]
pub struct MachineKeyFile {
#[serde(rename = "type")]
pub _type: String,
#[serde(rename = "keyId")]
pub key_id: String,
pub key: String,
#[serde(rename = "userId")]
pub user_id: String,
}
#[derive(Debug, Clone)]
pub struct CachedToken {
pub(crate) access_token: String,
/// Unix seconds at which the token is no longer trusted by
/// `cached_if_fresh`. Computed from the OAuth response's `expires_in`
/// and the local clock at mint time.
pub(crate) expires_at_unix: i64,
}
/// Refresh tokens this many seconds before their advertised expiry.
/// Five minutes leaves headroom for clock skew, slow networks, and
/// the round-trip cost of re-minting against Zitadel.
pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60;
/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign
/// to authenticate to Zitadel's token endpoint). Zitadel rejects
/// assertions with `exp - iat > 60s`; one minute is the safe ceiling.
pub const ASSERTION_LIFETIME_SECS: i64 = 60;
// ---- factory ---------------------------------------------------------------
/// Build the appropriate `CredentialSource` from the parsed config.
///
/// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from
@@ -285,7 +75,9 @@ pub const ASSERTION_LIFETIME_SECS: i64 = 60;
/// (Secret volume in the operator's Pod, dropped by
/// `FleetDeviceSetupScore` on the agent's VM); the path is just
/// configured differently.
pub fn credential_source_from_config(creds: &CredentialsSection) -> Result<Arc<CredentialSource>> {
pub fn credential_source_from_config(
creds: &CredentialsSection,
) -> Result<Arc<CredentialSource>> {
match creds {
CredentialsSection::TomlShared {
nats_user,
@@ -311,20 +103,19 @@ pub fn credential_source_from_config(creds: &CredentialsSection) -> Result<Arc<C
Some(json) if !json.is_empty() => parse_machine_key(json)?,
_ => load_machine_key(key_path)?,
};
Ok(Arc::new(CredentialSource::ZitadelJwt {
let bearer = ZitadelJwtBearer::new(
key,
oidc_issuer_url: oidc_issuer_url.clone(),
audience: audience.clone(),
http: reqwest::Client::builder()
.danger_accept_invalid_certs(*danger_accept_invalid_certs)
.timeout(Duration::from_secs(10))
.build()
.context("building HTTP client for Zitadel token endpoint")?,
cache: Mutex::new(None),
oidc_issuer_url.clone(),
audience.clone(),
*danger_accept_invalid_certs,
)?;
Ok(Arc::new(CredentialSource::ZitadelJwt {
bearer: Arc::new(bearer),
}))
}
}
}
}
fn load_machine_key(key_path: &Path) -> Result<MachineKeyFile> {
let raw = std::fs::read_to_string(key_path)
@@ -341,30 +132,6 @@ fn parse_machine_key(raw: &str) -> Result<MachineKeyFile> {
mod tests {
use super::*;
fn fake_key() -> MachineKeyFile {
MachineKeyFile {
_type: "serviceaccount".to_string(),
key_id: "kid-371358469099356247".to_string(),
// Real PEM not required for the pure-builder tests; the
// signing path that needs a parseable key is exercised
// end-to-end in the e2e harness.
key: "PEM-PLACEHOLDER".to_string(),
user_id: "uid-371358469065801815".to_string(),
}
}
fn zjwt_source() -> CredentialSource {
CredentialSource::ZitadelJwt {
key: fake_key(),
oidc_issuer_url: "http://sso.fleet.local:8080".to_string(),
audience: "366378028009259037".to_string(),
http: reqwest::Client::new(),
cache: Mutex::new(None),
}
}
// ---- next_credential / cache state -------------------------------------
#[tokio::test]
async fn toml_shared_returns_userpass_each_call() {
let s = CredentialSource::TomlShared {
@@ -380,174 +147,4 @@ mod tests {
other => panic!("expected UserPass, got {other:?}"),
}
}
#[test]
fn cached_token_within_leeway_is_treated_as_expired() {
// Sanity-check the comparison so refactors don't accidentally
// invert the leeway window.
let now = chrono::Utc::now().timestamp();
let about_to_expire = CachedToken {
access_token: "x".to_string(),
expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1,
};
assert!(
about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now,
"tokens within the leeway window must be considered expired"
);
let comfortable = CachedToken {
access_token: "x".to_string(),
expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60,
};
assert!(
comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now,
"tokens with comfortable headroom must be cache-hits"
);
}
#[test]
fn cached_if_fresh_returns_some_when_outside_leeway() {
let src = zjwt_source();
let now = chrono::Utc::now().timestamp();
if let CredentialSource::ZitadelJwt { cache, .. } = &src {
*cache.lock().unwrap() = Some(CachedToken {
access_token: "fresh".to_string(),
expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60,
});
}
assert_eq!(src.cached_if_fresh(), Some("fresh".to_string()));
}
#[test]
fn cached_if_fresh_returns_none_when_no_cache() {
// Brand-new ZitadelJwt source — no token has been minted yet.
// Forces the slow path on first connect.
let src = zjwt_source();
assert_eq!(src.cached_if_fresh(), None);
}
#[test]
fn cached_if_fresh_returns_none_for_toml_shared() {
// Defensive: cache_if_fresh is only meaningful for ZitadelJwt;
// TomlShared has no cache. A nonsensical call must return None,
// not panic, so the cold-path can degrade gracefully.
let src = CredentialSource::TomlShared {
user: "u".into(),
pass: "p".into(),
};
assert_eq!(src.cached_if_fresh(), None);
}
// ---- assertion claims / header (pure builders) ------------------------
#[test]
fn assertion_claims_carry_iss_sub_aud_exp_iat() {
let now = 1_700_000_000;
let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now);
assert_eq!(claims["iss"], "uid-371358469065801815");
assert_eq!(claims["sub"], "uid-371358469065801815");
assert_eq!(claims["aud"], "http://sso.fleet.local:8080");
assert_eq!(claims["iat"].as_i64(), Some(now));
assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS));
}
#[test]
fn assertion_lifetime_locked_at_60_seconds() {
// Zitadel rejects assertions where exp - iat > 60s. If anyone
// bumps ASSERTION_LIFETIME_SECS thinking "more is safer", the
// mints will silently start failing in prod with no helpful
// error. Lock the constant.
assert_eq!(ASSERTION_LIFETIME_SECS, 60);
}
#[test]
fn assertion_header_carries_kid_and_rs256() {
let header = build_assertion_header(&fake_key());
assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256);
assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247"));
}
// ---- scope string ------------------------------------------------------
#[test]
fn scope_includes_plural_projects_roles() {
// The plural-projects URN is what tells Zitadel to emit the
// role claim. Day-one bug; lock it.
let s = build_scope("366378028009259037");
assert!(
s.contains("urn:zitadel:iam:org:projects:roles"),
"scope must include the PLURAL projects-roles URN; got {s:?}"
);
}
#[test]
fn scope_audience_uses_singular_project_id_urn() {
// The singular-project URN tells Zitadel to put <id> into the
// access token's aud claim. Different URN entirely from the
// plural one above; both required.
let s = build_scope("366378028009259037");
assert!(
s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"),
"scope must include the SINGULAR project:id:<aud>:aud URN; got {s:?}"
);
}
#[test]
fn scope_includes_openid_base() {
let s = build_scope("any");
assert!(
s.split_whitespace().any(|tok| tok == "openid"),
"scope must include `openid` as a standalone token; got {s:?}"
);
}
// ---- token URL ---------------------------------------------------------
#[test]
fn token_url_appends_oauth_endpoint() {
assert_eq!(
build_token_url("http://sso.fleet.local:8080"),
"http://sso.fleet.local:8080/oauth/v2/token"
);
}
#[test]
fn token_url_strips_single_trailing_slash() {
// A trailing slash would yield `…//oauth/v2/token`, which 404s.
// Common configuration drift; the trim guards against it.
assert_eq!(
build_token_url("http://sso.fleet.local:8080/"),
"http://sso.fleet.local:8080/oauth/v2/token"
);
}
#[test]
fn token_url_strips_multiple_trailing_slashes() {
// Defensive — `trim_end_matches('/')` peels all of them, not
// just the first. Locks that semantics.
assert_eq!(
build_token_url("http://sso.fleet.local:8080///"),
"http://sso.fleet.local:8080/oauth/v2/token"
);
}
// ---- MachineKeyFile JSON parsing --------------------------------------
#[test]
fn machine_key_file_parses_zitadel_json_shape() {
// The serde renames (`type`, `keyId`, `userId`) are easy to
// break. This is the literal JSON shape Zitadel's
// /management/v1/users/.../keys endpoint emits.
let raw = r#"{
"type": "serviceaccount",
"keyId": "371358469099356247",
"key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n",
"userId": "371358469065801815"
}"#;
let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile");
assert_eq!(parsed._type, "serviceaccount");
assert_eq!(parsed.key_id, "371358469099356247");
assert_eq!(parsed.user_id, "371358469065801815");
assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY"));
}
}

View File

@@ -0,0 +1,367 @@
use serde::Deserialize;
use std::sync::Mutex;
use anyhow::{Context, Result};
use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader};
pub struct ZitadelJwtBearer {
key: MachineKeyFile,
oidc_issuer_url: String,
audience: String,
http: reqwest::Client,
cache: Mutex<Option<CachedToken>>,
}
/// Refresh tokens this many seconds before their advertised expiry.
/// Five minutes leaves headroom for clock skew, slow networks, and
/// the round-trip cost of re-minting against Zitadel.
pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60;
/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign
/// to authenticate to Zitadel's token endpoint). Zitadel rejects
/// assertions with `exp - iat > 60s`; one minute is the safe ceiling.
pub const ASSERTION_LIFETIME_SECS: i64 = 60;
/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON`
/// machine key. The `key` is a PEM-encoded RSA private key.
#[derive(Debug, Clone, Deserialize)]
pub struct MachineKeyFile {
#[serde(rename = "type")]
pub _type: String,
#[serde(rename = "keyId")]
pub key_id: String,
pub key: String,
#[serde(rename = "userId")]
pub user_id: String,
}
#[derive(Debug, Clone)]
pub struct CachedToken {
pub(crate) access_token: String,
/// Unix seconds at which the token is no longer trusted by
/// `cached_if_fresh`. Computed from the OAuth response's `expires_in`
/// and the local clock at mint time.
pub(crate) expires_at_unix: i64,
}
#[derive(Deserialize)]
struct TokenResponse {
access_token: String,
#[serde(default)]
expires_in: Option<i64>,
}
impl ZitadelJwtBearer {
pub fn new(
key: MachineKeyFile,
oidc_issuer_url: String,
audience: String,
danger_accept_invalid_certs: bool,
) -> Result<Self> {
let http = reqwest::Client::builder()
.danger_accept_invalid_certs(danger_accept_invalid_certs)
.timeout(std::time::Duration::from_secs(10))
.build()
.context("building HTTP client for Zitadel token endpoint")?;
Ok(Self {
key,
oidc_issuer_url,
audience,
http,
cache: Mutex::new(None),
})
}
pub async fn bearer_token(&self) -> Result<String> {
{
let now = chrono::Utc::now().timestamp();
let guard = self.cache.lock().unwrap();
if let Some(cached) = guard.as_ref() {
if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now {
return Ok(cached.access_token.clone());
}
}
}
let now = chrono::Utc::now().timestamp();
let assertion = build_assertion(&self.key, &self.oidc_issuer_url, now)?;
let scope = build_scope(&self.audience);
let token_url = build_token_url(&self.oidc_issuer_url);
let resp = self
.http
.post(&token_url)
.form(&[
(
"grant_type",
"urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(),
),
("assertion", assertion),
("scope", scope),
])
.send()
.await
.with_context(|| format!("POST {token_url}"))?;
if !resp.status().is_success() {
let status = resp.status();
let body = resp.text().await.unwrap_or_default();
anyhow::bail!("Zitadel token endpoint returned {status}: {body}");
}
let tr: TokenResponse = resp.json().await.context("parsing token response")?;
let expires_in = tr.expires_in.unwrap_or(3600);
let token = tr.access_token.clone();
*self.cache.lock().unwrap() = Some(CachedToken {
access_token: tr.access_token,
expires_at_unix: now + expires_in,
});
Ok(token)
}
pub fn invalidate_cache(&self) {
*self.cache.lock().unwrap() = None;
}
#[cfg(test)]
fn cached_if_fresh(&self) -> Option<String> {
let now = chrono::Utc::now().timestamp();
let guard = self.cache.lock().unwrap();
let cached = guard.as_ref()?;
(cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now)
.then(|| cached.access_token.clone())
}
}
/// Build the JWT-bearer assertion. Split out from the network path so
/// the claims + header shape can be unit-tested without an HTTP server,
/// and split internally into the (pure) claim/header builders so they
/// can be unit-tested without an RSA private key fixture.
pub(crate) fn build_assertion(
key: &MachineKeyFile,
oidc_issuer_url: &str,
now: i64,
) -> Result<String> {
let claims = build_assertion_claims(key, oidc_issuer_url, now);
let header = build_assertion_header(key);
let assertion = jsonwebtoken::encode(
&header,
&claims,
&EncodingKey::from_rsa_pem(key.key.as_bytes())
.context("parsing RSA private key from machine key file")?,
)
.context("signing JWT assertion")?;
Ok(assertion)
}
/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId`
/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint
/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or
/// Zitadel rejects.
pub(crate) fn build_assertion_claims(
key: &MachineKeyFile,
oidc_issuer_url: &str,
now: i64,
) -> serde_json::Value {
serde_json::json!({
"iss": key.user_id,
"sub": key.user_id,
"aud": oidc_issuer_url,
"exp": now + ASSERTION_LIFETIME_SECS,
"iat": now,
})
}
/// JWT header for the assertion. The `kid` tells Zitadel which of the
/// machine user's registered keys to verify the signature against.
pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader {
let mut header = JwtHeader::new(Algorithm::RS256);
header.kid = Some(key.key_id.clone());
header
}
/// Build the OAuth `scope` string for the token-bearer request.
///
/// Three scopes are needed for the access token to be useful here:
///
/// * `openid` — base OIDC requirement.
/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") —
/// tells Zitadel to include the role-claim block in the access
/// token. Without this, the callout sees "no authorized role
/// in token" even when the user has a project role grant.
/// * `urn:zitadel:iam:org:project:id:<aud>:aud` (SINGULAR
/// "project") — adds <aud> to the access token's `aud` claim
/// so the callout's audience validation accepts the project
/// ID we're using as the JWT-bearer audience.
///
/// The plural-vs-singular distinction is a Zitadel convention,
/// not a typo. Both scopes are required.
pub(crate) fn build_scope(audience: &str) -> String {
format!(
"openid \
urn:zitadel:iam:org:projects:roles \
urn:zitadel:iam:org:project:id:{audience}:aud"
)
}
/// Resolve the token endpoint URL, tolerating a trailing slash on
/// `oidc_issuer_url`. Without trimming, a configured issuer of
/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s.
pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String {
format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/'))
}
#[cfg(test)]
mod tests {
use super::*;
fn fake_key() -> MachineKeyFile {
MachineKeyFile {
_type: "serviceaccount".to_string(),
key_id: "kid-371358469099356247".to_string(),
key: "PEM-PLACEHOLDER".to_string(),
user_id: "uid-371358469065801815".to_string(),
}
}
fn bearer() -> ZitadelJwtBearer {
ZitadelJwtBearer::new(
fake_key(),
"http://sso.fleet.local:8080".to_string(),
"366378028009259037".to_string(),
false,
)
.unwrap()
}
#[test]
fn cached_token_within_leeway_is_treated_as_expired() {
let now = chrono::Utc::now().timestamp();
let about_to_expire = CachedToken {
access_token: "x".to_string(),
expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1,
};
assert!(
about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now,
"tokens within the leeway window must be considered expired"
);
let comfortable = CachedToken {
access_token: "x".to_string(),
expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60,
};
assert!(
comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now,
"tokens with comfortable headroom must be cache-hits"
);
}
#[test]
fn cached_if_fresh_returns_some_when_outside_leeway() {
let bearer = bearer();
let now = chrono::Utc::now().timestamp();
*bearer.cache.lock().unwrap() = Some(CachedToken {
access_token: "fresh".to_string(),
expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60,
});
assert_eq!(bearer.cached_if_fresh(), Some("fresh".to_string()));
}
#[test]
fn cached_if_fresh_returns_none_when_no_cache() {
assert_eq!(bearer().cached_if_fresh(), None);
}
#[test]
fn assertion_claims_carry_iss_sub_aud_exp_iat() {
let now = 1_700_000_000;
let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now);
assert_eq!(claims["iss"], "uid-371358469065801815");
assert_eq!(claims["sub"], "uid-371358469065801815");
assert_eq!(claims["aud"], "http://sso.fleet.local:8080");
assert_eq!(claims["iat"].as_i64(), Some(now));
assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS));
}
#[test]
fn assertion_lifetime_locked_at_60_seconds() {
assert_eq!(ASSERTION_LIFETIME_SECS, 60);
}
#[test]
fn assertion_header_carries_kid_and_rs256() {
let header = build_assertion_header(&fake_key());
assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256);
assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247"));
}
#[test]
fn scope_includes_plural_projects_roles() {
let s = build_scope("366378028009259037");
assert!(
s.contains("urn:zitadel:iam:org:projects:roles"),
"scope must include the PLURAL projects-roles URN; got {s:?}"
);
}
#[test]
fn scope_audience_uses_singular_project_id_urn() {
let s = build_scope("366378028009259037");
assert!(
s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"),
"scope must include the SINGULAR project:id:<aud>:aud URN; got {s:?}"
);
}
#[test]
fn scope_includes_openid_base() {
let s = build_scope("any");
assert!(
s.split_whitespace().any(|tok| tok == "openid"),
"scope must include `openid` as a standalone token; got {s:?}"
);
}
#[test]
fn token_url_appends_oauth_endpoint() {
assert_eq!(
build_token_url("http://sso.fleet.local:8080"),
"http://sso.fleet.local:8080/oauth/v2/token"
);
}
#[test]
fn token_url_strips_single_trailing_slash() {
assert_eq!(
build_token_url("http://sso.fleet.local:8080/"),
"http://sso.fleet.local:8080/oauth/v2/token"
);
}
#[test]
fn token_url_strips_multiple_trailing_slashes() {
assert_eq!(
build_token_url("http://sso.fleet.local:8080///"),
"http://sso.fleet.local:8080/oauth/v2/token"
);
}
#[test]
fn machine_key_file_parses_zitadel_json_shape() {
let raw = r#"{
"type": "serviceaccount",
"keyId": "371358469099356247",
"key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n",
"userId": "371358469065801815"
}"#;
let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile");
assert_eq!(parsed._type, "serviceaccount");
assert_eq!(parsed.key_id, "371358469099356247");
assert_eq!(parsed.user_id, "371358469065801815");
assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY"));
}
}

View File

@@ -24,16 +24,15 @@
mod agent_config;
mod config;
mod credentials;
mod jwt_bearer;
pub use agent_config::{AgentConfig, AgentSection, NatsSection, load_config};
pub use config::CredentialsSection;
pub use credentials::{
ASSERTION_LIFETIME_SECS, CachedToken, CredentialSource, MachineKeyFile, NatsCredential,
TOKEN_REFRESH_LEEWAY_SECS, credential_source_from_config,
};
use std::sync::Arc;
pub use crate::credentials::{CredentialSource, NatsCredential};
/// Build `async_nats::ConnectOptions` wired with the auth callback
/// that pulls fresh credentials from `creds` on every (re)connect.
///