From 788227b8c083d938b7c6e8615376407cf5c0b214 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Tue, 2 Jun 2026 10:32:11 -0400 Subject: [PATCH 1/2] stand alone module for jwt --- .../examples/mint_zitadel_token.rs | 25 + fleet/harmony-fleet-auth/src/credentials.rs | 511 ++---------------- fleet/harmony-fleet-auth/src/jwt_bearer.rs | 367 +++++++++++++ fleet/harmony-fleet-auth/src/lib.rs | 7 +- 4 files changed, 449 insertions(+), 461 deletions(-) create mode 100644 fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs create mode 100644 fleet/harmony-fleet-auth/src/jwt_bearer.rs diff --git a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs new file mode 100644 index 00000000..0b48fb64 --- /dev/null +++ b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs @@ -0,0 +1,25 @@ +use anyhow::{Context, Result}; +use harmony_fleet_auth::{CredentialSource, CredentialsSection, NatsCredential}; + +#[tokio::main] +async fn main() -> Result<()> { + let key_path = std::env::var("ZITADEL_KEY_PATH").context("ZITADEL_KEY_PATH missing")?; + let issuer = std::env::var("ZITADEL_ISSUER").context("ZITADEL_ISSUER missing")?; + let audience = std::env::var("ZITADEL_PROJECT_ID").context("ZITADEL_PROJECT_ID missing")?; + + let source = CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { + key_path: key_path.into(), + key_json: None, + oidc_issuer_url: issuer, + audience, + danger_accept_invalid_certs: false, + })?; + + match source.next_credential().await? { + NatsCredential::BearerToken(token) => { + println!("{token}"); + Ok(()) + } + NatsCredential::UserPass { .. } => anyhow::bail!("expected bearer token"), + } +} diff --git a/fleet/harmony-fleet-auth/src/credentials.rs b/fleet/harmony-fleet-auth/src/credentials.rs index 82aa799d..964ff0fe 100644 --- a/fleet/harmony-fleet-auth/src/credentials.rs +++ b/fleet/harmony-fleet-auth/src/credentials.rs @@ -23,14 +23,12 @@ //! cleaner than a Trait + factory. use std::path::Path; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::Arc; use anyhow::{Context, Result}; -use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader}; -use serde::Deserialize; -use crate::config::CredentialsSection; +use crate::jwt_bearer::ZitadelJwtBearer; +use crate::{config::CredentialsSection, jwt_bearer::MachineKeyFile}; /// Material the NATS connector needs to authenticate. Returned per /// (re)connect attempt — the source decides whether to mint fresh. @@ -44,17 +42,8 @@ pub enum NatsCredential { /// from the parsed `[credentials]` section; cloned via Arc into the /// async-nats auth callback. pub enum CredentialSource { - TomlShared { - user: String, - pass: String, - }, - ZitadelJwt { - key: MachineKeyFile, - oidc_issuer_url: String, - audience: String, - http: reqwest::Client, - cache: Mutex>, - }, + TomlShared { user: String, pass: String }, + ZitadelJwt { bearer: Arc }, } impl CredentialSource { @@ -72,256 +61,58 @@ impl CredentialSource { } async fn zitadel_next(&self) -> Result { - // Fast path: lock the cache synchronously, copy out the token if - // it's comfortably valid, drop the lock. Holding a MutexGuard - // across `.await` would make this future !Sync, which - // async-nats's `with_auth_callback` rejects at compile time. - if let Some(token) = self.cached_if_fresh() { - return Ok(NatsCredential::BearerToken(token)); - } - // Slow path: mint outside any lock. Two concurrent (re)connect - // attempts could both reach here and both mint; that's a wasted - // HTTP round-trip in a rare race, not a correctness issue — - // the second writer wins and replaces the first's value. - let fresh = self.zitadel_mint().await?; - let token = fresh.access_token.clone(); - if let Self::ZitadelJwt { - cache, audience, .. - } = self - && let Ok(mut guard) = cache.lock() - { - *guard = Some(fresh); - tracing::info!(audience = %audience, "minted fresh Zitadel access token"); - } + let Self::ZitadelJwt { bearer } = self else { + anyhow::bail!("zitadel_next called on non-ZitadelJwt variant"); + }; + let token = bearer.bearer_token().await?; Ok(NatsCredential::BearerToken(token)) } - fn cached_if_fresh(&self) -> Option { - let Self::ZitadelJwt { cache, .. } = self else { - return None; - }; - let now = chrono::Utc::now().timestamp(); - let guard = cache.lock().ok()?; - let cached = guard.as_ref()?; - if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now { - Some(cached.access_token.clone()) - } else { - None - } - } - - async fn zitadel_mint(&self) -> Result { - let Self::ZitadelJwt { - key, - oidc_issuer_url, - audience, - http, - .. - } = self - else { - anyhow::bail!("zitadel_mint called on non-ZitadelJwt variant"); - }; - - let now = chrono::Utc::now().timestamp(); - let assertion = build_assertion(key, oidc_issuer_url, now)?; - let scope = build_scope(audience); - let token_url = build_token_url(oidc_issuer_url); - - let resp = http - .post(&token_url) - .form(&[ - ( - "grant_type", - "urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(), - ), - ("assertion", assertion), - ("scope", scope), - ]) - .send() - .await - .with_context(|| format!("POST {token_url}"))?; - - if !resp.status().is_success() { - let status = resp.status(); - let body = resp.text().await.unwrap_or_default(); - anyhow::bail!("Zitadel token endpoint returned {status}: {body}"); - } - - #[derive(Deserialize)] - struct TokenResponse { - access_token: String, - #[serde(default)] - expires_in: Option, - } - let tr: TokenResponse = resp.json().await.context("parsing token response")?; - // Zitadel typically returns 12h (43200s); be defensive against - // a missing field by assuming a conservative 1h. - let expires_in = tr.expires_in.unwrap_or(3600); - Ok(CachedToken { - access_token: tr.access_token, - expires_at_unix: now + expires_in, - }) - } -} - -/// Build the JWT-bearer assertion. Split out from the network path so -/// the claims + header shape can be unit-tested without an HTTP server, -/// and split internally into the (pure) claim/header builders so they -/// can be unit-tested without an RSA private key fixture. -pub(crate) fn build_assertion( - key: &MachineKeyFile, - oidc_issuer_url: &str, - now: i64, -) -> Result { - let claims = build_assertion_claims(key, oidc_issuer_url, now); - let header = build_assertion_header(key); - let assertion = jsonwebtoken::encode( - &header, - &claims, - &EncodingKey::from_rsa_pem(key.key.as_bytes()) - .context("parsing RSA private key from machine key file")?, - ) - .context("signing JWT assertion")?; - Ok(assertion) -} - -/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId` -/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint -/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or -/// Zitadel rejects. -pub(crate) fn build_assertion_claims( - key: &MachineKeyFile, - oidc_issuer_url: &str, - now: i64, -) -> serde_json::Value { - serde_json::json!({ - "iss": key.user_id, - "sub": key.user_id, - "aud": oidc_issuer_url, - "exp": now + ASSERTION_LIFETIME_SECS, - "iat": now, - }) -} - -/// JWT header for the assertion. The `kid` tells Zitadel which of the -/// machine user's registered keys to verify the signature against. -pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader { - let mut header = JwtHeader::new(Algorithm::RS256); - header.kid = Some(key.key_id.clone()); - header -} - -/// Build the OAuth `scope` string for the token-bearer request. -/// -/// Three scopes are needed for the access token to be useful here: -/// -/// * `openid` — base OIDC requirement. -/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") — -/// tells Zitadel to include the role-claim block in the access -/// token. Without this, the callout sees "no authorized role -/// in token" even when the user has a project role grant. -/// * `urn:zitadel:iam:org:project:id::aud` (SINGULAR -/// "project") — adds to the access token's `aud` claim -/// so the callout's audience validation accepts the project -/// ID we're using as the JWT-bearer audience. -/// -/// The plural-vs-singular distinction is a Zitadel convention, -/// not a typo. Both scopes are required. -pub(crate) fn build_scope(audience: &str) -> String { - format!( - "openid \ - urn:zitadel:iam:org:projects:roles \ - urn:zitadel:iam:org:project:id:{audience}:aud" - ) -} - -/// Resolve the token endpoint URL, tolerating a trailing slash on -/// `oidc_issuer_url`. Without trimming, a configured issuer of -/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s. -pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String { - format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/')) -} - -// ---- helper types ---------------------------------------------------------- - -/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON` -/// machine key. The `key` is a PEM-encoded RSA private key. -#[derive(Debug, Clone, Deserialize)] -pub struct MachineKeyFile { - #[serde(rename = "type")] - pub _type: String, - #[serde(rename = "keyId")] - pub key_id: String, - pub key: String, - #[serde(rename = "userId")] - pub user_id: String, -} - -#[derive(Debug, Clone)] -pub struct CachedToken { - pub(crate) access_token: String, - /// Unix seconds at which the token is no longer trusted by - /// `cached_if_fresh`. Computed from the OAuth response's `expires_in` - /// and the local clock at mint time. - pub(crate) expires_at_unix: i64, -} - -/// Refresh tokens this many seconds before their advertised expiry. -/// Five minutes leaves headroom for clock skew, slow networks, and -/// the round-trip cost of re-minting against Zitadel. -pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60; - -/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign -/// to authenticate to Zitadel's token endpoint). Zitadel rejects -/// assertions with `exp - iat > 60s`; one minute is the safe ceiling. -pub const ASSERTION_LIFETIME_SECS: i64 = 60; - -// ---- factory --------------------------------------------------------------- - -/// Build the appropriate `CredentialSource` from the parsed config. -/// -/// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from -/// disk. Both the agent and the operator mount their key as a file -/// (Secret volume in the operator's Pod, dropped by -/// `FleetDeviceSetupScore` on the agent's VM); the path is just -/// configured differently. -pub fn credential_source_from_config(creds: &CredentialsSection) -> Result> { - match creds { - CredentialsSection::TomlShared { - nats_user, - nats_pass, - } => Ok(Arc::new(CredentialSource::TomlShared { - user: nats_user.clone(), - pass: nats_pass.clone(), - })), - CredentialsSection::ZitadelJwt { - key_path, - key_json, - oidc_issuer_url, - audience, - danger_accept_invalid_certs, - } => { - // `key_json` (inline) wins over `key_path` (file). The - // operator pod uses inline because OKD's restricted-v2 - // SCC + env-var-from-Secret deployment shape can't - // reliably mount Secret volumes; the agent uses the file - // path because it lives on a VM and a real file is the - // more natural rotation target. - let key = match key_json.as_deref().map(str::trim) { - Some(json) if !json.is_empty() => parse_machine_key(json)?, - _ => load_machine_key(key_path)?, - }; - Ok(Arc::new(CredentialSource::ZitadelJwt { - key, - oidc_issuer_url: oidc_issuer_url.clone(), - audience: audience.clone(), - http: reqwest::Client::builder() - .danger_accept_invalid_certs(*danger_accept_invalid_certs) - .timeout(Duration::from_secs(10)) - .build() - .context("building HTTP client for Zitadel token endpoint")?, - cache: Mutex::new(None), - })) + /// Build the appropriate `CredentialSource` from the parsed config. + /// + /// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from + /// disk. Both the agent and the operator mount their key as a file + /// (Secret volume in the operator's Pod, dropped by + /// `FleetDeviceSetupScore` on the agent's VM); the path is just + /// configured differently. + pub fn credential_source_from_config( + creds: &CredentialsSection, + ) -> Result> { + match creds { + CredentialsSection::TomlShared { + nats_user, + nats_pass, + } => Ok(Arc::new(CredentialSource::TomlShared { + user: nats_user.clone(), + pass: nats_pass.clone(), + })), + CredentialsSection::ZitadelJwt { + key_path, + key_json, + oidc_issuer_url, + audience, + danger_accept_invalid_certs, + } => { + // `key_json` (inline) wins over `key_path` (file). The + // operator pod uses inline because OKD's restricted-v2 + // SCC + env-var-from-Secret deployment shape can't + // reliably mount Secret volumes; the agent uses the file + // path because it lives on a VM and a real file is the + // more natural rotation target. + let key = match key_json.as_deref().map(str::trim) { + Some(json) if !json.is_empty() => parse_machine_key(json)?, + _ => load_machine_key(key_path)?, + }; + let bearer = ZitadelJwtBearer::new( + key, + oidc_issuer_url.clone(), + audience.clone(), + *danger_accept_invalid_certs, + )?; + Ok(Arc::new(CredentialSource::ZitadelJwt { + bearer: Arc::new(bearer), + })) + } } } } @@ -341,30 +132,6 @@ fn parse_machine_key(raw: &str) -> Result { mod tests { use super::*; - fn fake_key() -> MachineKeyFile { - MachineKeyFile { - _type: "serviceaccount".to_string(), - key_id: "kid-371358469099356247".to_string(), - // Real PEM not required for the pure-builder tests; the - // signing path that needs a parseable key is exercised - // end-to-end in the e2e harness. - key: "PEM-PLACEHOLDER".to_string(), - user_id: "uid-371358469065801815".to_string(), - } - } - - fn zjwt_source() -> CredentialSource { - CredentialSource::ZitadelJwt { - key: fake_key(), - oidc_issuer_url: "http://sso.fleet.local:8080".to_string(), - audience: "366378028009259037".to_string(), - http: reqwest::Client::new(), - cache: Mutex::new(None), - } - } - - // ---- next_credential / cache state ------------------------------------- - #[tokio::test] async fn toml_shared_returns_userpass_each_call() { let s = CredentialSource::TomlShared { @@ -380,174 +147,4 @@ mod tests { other => panic!("expected UserPass, got {other:?}"), } } - - #[test] - fn cached_token_within_leeway_is_treated_as_expired() { - // Sanity-check the comparison so refactors don't accidentally - // invert the leeway window. - let now = chrono::Utc::now().timestamp(); - let about_to_expire = CachedToken { - access_token: "x".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1, - }; - assert!( - about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now, - "tokens within the leeway window must be considered expired" - ); - - let comfortable = CachedToken { - access_token: "x".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, - }; - assert!( - comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now, - "tokens with comfortable headroom must be cache-hits" - ); - } - - #[test] - fn cached_if_fresh_returns_some_when_outside_leeway() { - let src = zjwt_source(); - let now = chrono::Utc::now().timestamp(); - if let CredentialSource::ZitadelJwt { cache, .. } = &src { - *cache.lock().unwrap() = Some(CachedToken { - access_token: "fresh".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, - }); - } - assert_eq!(src.cached_if_fresh(), Some("fresh".to_string())); - } - - #[test] - fn cached_if_fresh_returns_none_when_no_cache() { - // Brand-new ZitadelJwt source — no token has been minted yet. - // Forces the slow path on first connect. - let src = zjwt_source(); - assert_eq!(src.cached_if_fresh(), None); - } - - #[test] - fn cached_if_fresh_returns_none_for_toml_shared() { - // Defensive: cache_if_fresh is only meaningful for ZitadelJwt; - // TomlShared has no cache. A nonsensical call must return None, - // not panic, so the cold-path can degrade gracefully. - let src = CredentialSource::TomlShared { - user: "u".into(), - pass: "p".into(), - }; - assert_eq!(src.cached_if_fresh(), None); - } - - // ---- assertion claims / header (pure builders) ------------------------ - - #[test] - fn assertion_claims_carry_iss_sub_aud_exp_iat() { - let now = 1_700_000_000; - let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now); - assert_eq!(claims["iss"], "uid-371358469065801815"); - assert_eq!(claims["sub"], "uid-371358469065801815"); - assert_eq!(claims["aud"], "http://sso.fleet.local:8080"); - assert_eq!(claims["iat"].as_i64(), Some(now)); - assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS)); - } - - #[test] - fn assertion_lifetime_locked_at_60_seconds() { - // Zitadel rejects assertions where exp - iat > 60s. If anyone - // bumps ASSERTION_LIFETIME_SECS thinking "more is safer", the - // mints will silently start failing in prod with no helpful - // error. Lock the constant. - assert_eq!(ASSERTION_LIFETIME_SECS, 60); - } - - #[test] - fn assertion_header_carries_kid_and_rs256() { - let header = build_assertion_header(&fake_key()); - assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256); - assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247")); - } - - // ---- scope string ------------------------------------------------------ - - #[test] - fn scope_includes_plural_projects_roles() { - // The plural-projects URN is what tells Zitadel to emit the - // role claim. Day-one bug; lock it. - let s = build_scope("366378028009259037"); - assert!( - s.contains("urn:zitadel:iam:org:projects:roles"), - "scope must include the PLURAL projects-roles URN; got {s:?}" - ); - } - - #[test] - fn scope_audience_uses_singular_project_id_urn() { - // The singular-project URN tells Zitadel to put into the - // access token's aud claim. Different URN entirely from the - // plural one above; both required. - let s = build_scope("366378028009259037"); - assert!( - s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"), - "scope must include the SINGULAR project:id::aud URN; got {s:?}" - ); - } - - #[test] - fn scope_includes_openid_base() { - let s = build_scope("any"); - assert!( - s.split_whitespace().any(|tok| tok == "openid"), - "scope must include `openid` as a standalone token; got {s:?}" - ); - } - - // ---- token URL --------------------------------------------------------- - - #[test] - fn token_url_appends_oauth_endpoint() { - assert_eq!( - build_token_url("http://sso.fleet.local:8080"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - #[test] - fn token_url_strips_single_trailing_slash() { - // A trailing slash would yield `…//oauth/v2/token`, which 404s. - // Common configuration drift; the trim guards against it. - assert_eq!( - build_token_url("http://sso.fleet.local:8080/"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - #[test] - fn token_url_strips_multiple_trailing_slashes() { - // Defensive — `trim_end_matches('/')` peels all of them, not - // just the first. Locks that semantics. - assert_eq!( - build_token_url("http://sso.fleet.local:8080///"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - // ---- MachineKeyFile JSON parsing -------------------------------------- - - #[test] - fn machine_key_file_parses_zitadel_json_shape() { - // The serde renames (`type`, `keyId`, `userId`) are easy to - // break. This is the literal JSON shape Zitadel's - // /management/v1/users/.../keys endpoint emits. - let raw = r#"{ - "type": "serviceaccount", - "keyId": "371358469099356247", - "key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n", - "userId": "371358469065801815" - }"#; - let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile"); - assert_eq!(parsed._type, "serviceaccount"); - assert_eq!(parsed.key_id, "371358469099356247"); - assert_eq!(parsed.user_id, "371358469065801815"); - assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY")); - } } diff --git a/fleet/harmony-fleet-auth/src/jwt_bearer.rs b/fleet/harmony-fleet-auth/src/jwt_bearer.rs new file mode 100644 index 00000000..97743a91 --- /dev/null +++ b/fleet/harmony-fleet-auth/src/jwt_bearer.rs @@ -0,0 +1,367 @@ +use serde::Deserialize; +use std::sync::Mutex; + +use anyhow::{Context, Result}; +use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader}; + +pub struct ZitadelJwtBearer { + key: MachineKeyFile, + oidc_issuer_url: String, + audience: String, + http: reqwest::Client, + cache: Mutex>, +} + +/// Refresh tokens this many seconds before their advertised expiry. +/// Five minutes leaves headroom for clock skew, slow networks, and +/// the round-trip cost of re-minting against Zitadel. +pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60; + +/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign +/// to authenticate to Zitadel's token endpoint). Zitadel rejects +/// assertions with `exp - iat > 60s`; one minute is the safe ceiling. +pub const ASSERTION_LIFETIME_SECS: i64 = 60; + +/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON` +/// machine key. The `key` is a PEM-encoded RSA private key. +#[derive(Debug, Clone, Deserialize)] +pub struct MachineKeyFile { + #[serde(rename = "type")] + pub _type: String, + #[serde(rename = "keyId")] + pub key_id: String, + pub key: String, + #[serde(rename = "userId")] + pub user_id: String, +} + +#[derive(Debug, Clone)] +pub struct CachedToken { + pub(crate) access_token: String, + /// Unix seconds at which the token is no longer trusted by + /// `cached_if_fresh`. Computed from the OAuth response's `expires_in` + /// and the local clock at mint time. + pub(crate) expires_at_unix: i64, +} + +#[derive(Deserialize)] +struct TokenResponse { + access_token: String, + #[serde(default)] + expires_in: Option, +} + +impl ZitadelJwtBearer { + pub fn new( + key: MachineKeyFile, + oidc_issuer_url: String, + audience: String, + danger_accept_invalid_certs: bool, + ) -> Result { + let http = reqwest::Client::builder() + .danger_accept_invalid_certs(danger_accept_invalid_certs) + .timeout(std::time::Duration::from_secs(10)) + .build() + .context("building HTTP client for Zitadel token endpoint")?; + + Ok(Self { + key, + oidc_issuer_url, + audience, + http, + cache: Mutex::new(None), + }) + } + + pub async fn bearer_token(&self) -> Result { + { + let now = chrono::Utc::now().timestamp(); + let guard = self.cache.lock().unwrap(); + if let Some(cached) = guard.as_ref() { + if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now { + return Ok(cached.access_token.clone()); + } + } + } + + let now = chrono::Utc::now().timestamp(); + let assertion = build_assertion(&self.key, &self.oidc_issuer_url, now)?; + let scope = build_scope(&self.audience); + let token_url = build_token_url(&self.oidc_issuer_url); + + let resp = self + .http + .post(&token_url) + .form(&[ + ( + "grant_type", + "urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(), + ), + ("assertion", assertion), + ("scope", scope), + ]) + .send() + .await + .with_context(|| format!("POST {token_url}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + anyhow::bail!("Zitadel token endpoint returned {status}: {body}"); + } + + let tr: TokenResponse = resp.json().await.context("parsing token response")?; + let expires_in = tr.expires_in.unwrap_or(3600); + let token = tr.access_token.clone(); + + *self.cache.lock().unwrap() = Some(CachedToken { + access_token: tr.access_token, + expires_at_unix: now + expires_in, + }); + + Ok(token) + } + + pub fn invalidate_cache(&self) { + *self.cache.lock().unwrap() = None; + } + + #[cfg(test)] + fn cached_if_fresh(&self) -> Option { + let now = chrono::Utc::now().timestamp(); + let guard = self.cache.lock().unwrap(); + let cached = guard.as_ref()?; + (cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now) + .then(|| cached.access_token.clone()) + } +} + +/// Build the JWT-bearer assertion. Split out from the network path so +/// the claims + header shape can be unit-tested without an HTTP server, +/// and split internally into the (pure) claim/header builders so they +/// can be unit-tested without an RSA private key fixture. +pub(crate) fn build_assertion( + key: &MachineKeyFile, + oidc_issuer_url: &str, + now: i64, +) -> Result { + let claims = build_assertion_claims(key, oidc_issuer_url, now); + let header = build_assertion_header(key); + let assertion = jsonwebtoken::encode( + &header, + &claims, + &EncodingKey::from_rsa_pem(key.key.as_bytes()) + .context("parsing RSA private key from machine key file")?, + ) + .context("signing JWT assertion")?; + Ok(assertion) +} + +/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId` +/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint +/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or +/// Zitadel rejects. +pub(crate) fn build_assertion_claims( + key: &MachineKeyFile, + oidc_issuer_url: &str, + now: i64, +) -> serde_json::Value { + serde_json::json!({ + "iss": key.user_id, + "sub": key.user_id, + "aud": oidc_issuer_url, + "exp": now + ASSERTION_LIFETIME_SECS, + "iat": now, + }) +} + +/// JWT header for the assertion. The `kid` tells Zitadel which of the +/// machine user's registered keys to verify the signature against. +pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader { + let mut header = JwtHeader::new(Algorithm::RS256); + header.kid = Some(key.key_id.clone()); + header +} + +/// Build the OAuth `scope` string for the token-bearer request. +/// +/// Three scopes are needed for the access token to be useful here: +/// +/// * `openid` — base OIDC requirement. +/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") — +/// tells Zitadel to include the role-claim block in the access +/// token. Without this, the callout sees "no authorized role +/// in token" even when the user has a project role grant. +/// * `urn:zitadel:iam:org:project:id::aud` (SINGULAR +/// "project") — adds to the access token's `aud` claim +/// so the callout's audience validation accepts the project +/// ID we're using as the JWT-bearer audience. +/// +/// The plural-vs-singular distinction is a Zitadel convention, +/// not a typo. Both scopes are required. +pub(crate) fn build_scope(audience: &str) -> String { + format!( + "openid \ + urn:zitadel:iam:org:projects:roles \ + urn:zitadel:iam:org:project:id:{audience}:aud" + ) +} + +/// Resolve the token endpoint URL, tolerating a trailing slash on +/// `oidc_issuer_url`. Without trimming, a configured issuer of +/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s. +pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String { + format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/')) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fake_key() -> MachineKeyFile { + MachineKeyFile { + _type: "serviceaccount".to_string(), + key_id: "kid-371358469099356247".to_string(), + key: "PEM-PLACEHOLDER".to_string(), + user_id: "uid-371358469065801815".to_string(), + } + } + + fn bearer() -> ZitadelJwtBearer { + ZitadelJwtBearer::new( + fake_key(), + "http://sso.fleet.local:8080".to_string(), + "366378028009259037".to_string(), + false, + ) + .unwrap() + } + + #[test] + fn cached_token_within_leeway_is_treated_as_expired() { + let now = chrono::Utc::now().timestamp(); + let about_to_expire = CachedToken { + access_token: "x".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1, + }; + assert!( + about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now, + "tokens within the leeway window must be considered expired" + ); + + let comfortable = CachedToken { + access_token: "x".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, + }; + assert!( + comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now, + "tokens with comfortable headroom must be cache-hits" + ); + } + + #[test] + fn cached_if_fresh_returns_some_when_outside_leeway() { + let bearer = bearer(); + let now = chrono::Utc::now().timestamp(); + *bearer.cache.lock().unwrap() = Some(CachedToken { + access_token: "fresh".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, + }); + + assert_eq!(bearer.cached_if_fresh(), Some("fresh".to_string())); + } + + #[test] + fn cached_if_fresh_returns_none_when_no_cache() { + assert_eq!(bearer().cached_if_fresh(), None); + } + + #[test] + fn assertion_claims_carry_iss_sub_aud_exp_iat() { + let now = 1_700_000_000; + let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now); + assert_eq!(claims["iss"], "uid-371358469065801815"); + assert_eq!(claims["sub"], "uid-371358469065801815"); + assert_eq!(claims["aud"], "http://sso.fleet.local:8080"); + assert_eq!(claims["iat"].as_i64(), Some(now)); + assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS)); + } + + #[test] + fn assertion_lifetime_locked_at_60_seconds() { + assert_eq!(ASSERTION_LIFETIME_SECS, 60); + } + + #[test] + fn assertion_header_carries_kid_and_rs256() { + let header = build_assertion_header(&fake_key()); + assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256); + assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247")); + } + + #[test] + fn scope_includes_plural_projects_roles() { + let s = build_scope("366378028009259037"); + assert!( + s.contains("urn:zitadel:iam:org:projects:roles"), + "scope must include the PLURAL projects-roles URN; got {s:?}" + ); + } + + #[test] + fn scope_audience_uses_singular_project_id_urn() { + let s = build_scope("366378028009259037"); + assert!( + s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"), + "scope must include the SINGULAR project:id::aud URN; got {s:?}" + ); + } + + #[test] + fn scope_includes_openid_base() { + let s = build_scope("any"); + assert!( + s.split_whitespace().any(|tok| tok == "openid"), + "scope must include `openid` as a standalone token; got {s:?}" + ); + } + + #[test] + fn token_url_appends_oauth_endpoint() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn token_url_strips_single_trailing_slash() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080/"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn token_url_strips_multiple_trailing_slashes() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080///"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn machine_key_file_parses_zitadel_json_shape() { + let raw = r#"{ + "type": "serviceaccount", + "keyId": "371358469099356247", + "key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n", + "userId": "371358469065801815" + }"#; + let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile"); + assert_eq!(parsed._type, "serviceaccount"); + assert_eq!(parsed.key_id, "371358469099356247"); + assert_eq!(parsed.user_id, "371358469065801815"); + assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY")); + } +} diff --git a/fleet/harmony-fleet-auth/src/lib.rs b/fleet/harmony-fleet-auth/src/lib.rs index 3cce33e1..81a68ca1 100644 --- a/fleet/harmony-fleet-auth/src/lib.rs +++ b/fleet/harmony-fleet-auth/src/lib.rs @@ -24,16 +24,15 @@ mod agent_config; mod config; mod credentials; +mod jwt_bearer; pub use agent_config::{AgentConfig, AgentSection, NatsSection, load_config}; pub use config::CredentialsSection; -pub use credentials::{ - ASSERTION_LIFETIME_SECS, CachedToken, CredentialSource, MachineKeyFile, NatsCredential, - TOKEN_REFRESH_LEEWAY_SECS, credential_source_from_config, -}; use std::sync::Arc; +pub use crate::credentials::{CredentialSource, NatsCredential}; + /// Build `async_nats::ConnectOptions` wired with the auth callback /// that pulls fresh credentials from `creds` on every (re)connect. /// -- 2.39.5 From 57d9685c57a7eca6f3c515bfa42fdc01f8c532ff Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Tue, 2 Jun 2026 12:46:42 -0400 Subject: [PATCH 2/2] add files --- Cargo.lock | 2 + examples/fleet_staging_install/src/main.rs | 29 ++-- examples/harmony_sso/src/main.rs | 27 +-- fleet/harmony-fleet-auth/Cargo.toml | 1 + .../examples/mint_zitadel_token.rs | 15 +- fleet/harmony-fleet-auth/src/credentials.rs | 5 +- fleet/harmony-fleet-auth/src/lib.rs | 7 +- harmony_config/src/lib.rs | 27 +-- harmony_secret/Cargo.toml | 1 + harmony_secret/src/lib.rs | 29 ++-- harmony_secret/src/store/mod.rs | 2 +- harmony_secret/src/store/openbao.rs | 158 ++++++++++++------ .../src/jwt_bearer.rs | 0 harmony_zitadel_auth/src/lib.rs | 2 + 14 files changed, 192 insertions(+), 113 deletions(-) rename {fleet/harmony-fleet-auth => harmony_zitadel_auth}/src/jwt_bearer.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 52da3fe0..c9b548fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4078,6 +4078,7 @@ dependencies = [ "async-nats", "chrono", "harmony-reconciler-contracts", + "harmony_zitadel_auth", "jsonwebtoken", "reqwest 0.12.28", "serde", @@ -4463,6 +4464,7 @@ dependencies = [ "async-trait", "directories", "harmony_secret_derive", + "harmony_zitadel_auth", "http 1.4.0", "infisical", "inquire 0.7.5", diff --git a/examples/fleet_staging_install/src/main.rs b/examples/fleet_staging_install/src/main.rs index a3dfa9d8..416f3eec 100644 --- a/examples/fleet_staging_install/src/main.rs +++ b/examples/fleet_staging_install/src/main.rs @@ -33,7 +33,7 @@ use harmony::topology::{K8sAnywhereTopology, K8sclient, Topology}; use harmony_config::{Config, ConfigClient, StoreSource}; use harmony_fleet_deploy::{FleetDeploySecrets, FleetOperatorScore, OperatorCredentials}; use harmony_k8s::KubernetesDistribution; -use harmony_secret::OpenbaoSecretStore; +use harmony_secret::{OpenbaoSecretStore, OpenbaoStoreOptions}; use nkeys::KeyPair; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -325,19 +325,20 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"# .await .context("port-forward to OpenBao")?; tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let store = OpenbaoSecretStore::new( - format!("http://127.0.0.1:{}", pf.port()), - "secret".to_string(), - "token".to_string(), - true, - Some(cached_root_token(&openbao).map_err(|e| anyhow::anyhow!(e))?), - None, - None, - None, - None, - None, - None, - ) + let store = OpenbaoSecretStore::new(OpenbaoStoreOptions { + base_url: format!("http://127.0.0.1:{}", pf.port()), + kv_mount: "secret".to_string(), + auth_mount: "token".to_string(), + skip_tls: true, + token: Some(cached_root_token(&openbao).map_err(|e| anyhow::anyhow!(e))?), + username: None, + password: None, + zitadel_sso_url: None, + zitadel_client_id: None, + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await .context("OpenBao client")?; ConfigClient::new(vec![ diff --git a/examples/harmony_sso/src/main.rs b/examples/harmony_sso/src/main.rs index 96d70766..abdf021e 100644 --- a/examples/harmony_sso/src/main.rs +++ b/examples/harmony_sso/src/main.rs @@ -366,19 +366,20 @@ async fn main() -> anyhow::Result<()> { let openbao_url = format!("http://127.0.0.1:{}", _pf.port()); let sso_url = format!("http://{}:{}", ZITADEL_HOST, HTTP_PORT); - let store = OpenbaoSecretStore::new( - openbao_url, - "secret".to_string(), - "jwt".to_string(), - true, - None, - None, - None, - Some(sso_url), - Some(client_id), - Some("harmony-developer".to_string()), - Some("jwt".to_string()), - ) + let store = OpenbaoSecretStore::new(harmony_secret::OpenbaoStoreOptions { + base_url: openbao_url, + kv_mount: "secret".to_string(), + auth_mount: "jwt".to_string(), + skip_tls: true, + token: None, + username: None, + password: None, + zitadel_sso_url: Some(sso_url), + zitadel_client_id: Some(client_id), + jwt_role: Some("harmony-developer".to_string()), + jwt_auth_mount: Some("jwt".to_string()), + zitadel_jwt_bearer: None, + }) .await .context("SSO authentication failed")?; diff --git a/fleet/harmony-fleet-auth/Cargo.toml b/fleet/harmony-fleet-auth/Cargo.toml index 7e32f1e4..a99836e3 100644 --- a/fleet/harmony-fleet-auth/Cargo.toml +++ b/fleet/harmony-fleet-auth/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" [dependencies] harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" } async-nats = { workspace = true } anyhow = { workspace = true } chrono = { workspace = true } diff --git a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs index 0b48fb64..73d7c949 100644 --- a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs +++ b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs @@ -7,13 +7,14 @@ async fn main() -> Result<()> { let issuer = std::env::var("ZITADEL_ISSUER").context("ZITADEL_ISSUER missing")?; let audience = std::env::var("ZITADEL_PROJECT_ID").context("ZITADEL_PROJECT_ID missing")?; - let source = CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { - key_path: key_path.into(), - key_json: None, - oidc_issuer_url: issuer, - audience, - danger_accept_invalid_certs: false, - })?; + let source = + CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { + key_path: key_path.into(), + key_json: None, + oidc_issuer_url: issuer, + audience, + danger_accept_invalid_certs: false, + })?; match source.next_credential().await? { NatsCredential::BearerToken(token) => { diff --git a/fleet/harmony-fleet-auth/src/credentials.rs b/fleet/harmony-fleet-auth/src/credentials.rs index 964ff0fe..36982d55 100644 --- a/fleet/harmony-fleet-auth/src/credentials.rs +++ b/fleet/harmony-fleet-auth/src/credentials.rs @@ -27,8 +27,9 @@ use std::sync::Arc; use anyhow::{Context, Result}; -use crate::jwt_bearer::ZitadelJwtBearer; -use crate::{config::CredentialsSection, jwt_bearer::MachineKeyFile}; +use harmony_zitadel_auth::{MachineKeyFile, ZitadelJwtBearer}; + +use crate::CredentialsSection; /// Material the NATS connector needs to authenticate. Returned per /// (re)connect attempt — the source decides whether to mint fresh. diff --git a/fleet/harmony-fleet-auth/src/lib.rs b/fleet/harmony-fleet-auth/src/lib.rs index 81a68ca1..7932c75f 100644 --- a/fleet/harmony-fleet-auth/src/lib.rs +++ b/fleet/harmony-fleet-auth/src/lib.rs @@ -24,7 +24,6 @@ mod agent_config; mod config; mod credentials; -mod jwt_bearer; pub use agent_config::{AgentConfig, AgentSection, NatsSection, load_config}; pub use config::CredentialsSection; @@ -33,6 +32,12 @@ use std::sync::Arc; pub use crate::credentials::{CredentialSource, NatsCredential}; +pub fn credential_source_from_config( + creds: &CredentialsSection, +) -> anyhow::Result> { + CredentialSource::credential_source_from_config(creds) +} + /// Build `async_nats::ConnectOptions` wired with the auth callback /// that pulls fresh credentials from `creds` on every (re)connect. /// diff --git a/harmony_config/src/lib.rs b/harmony_config/src/lib.rs index dde30471..fabe00ee 100644 --- a/harmony_config/src/lib.rs +++ b/harmony_config/src/lib.rs @@ -210,19 +210,20 @@ async fn openbao_from_env(namespace: &str) -> Option> { }; let env = |k: &str| std::env::var(k).ok(); - let store = harmony_secret::OpenbaoSecretStore::new( - url, - env("OPENBAO_KV_MOUNT").unwrap_or_else(|| "secret".to_string()), - env("OPENBAO_AUTH_MOUNT").unwrap_or_else(|| "jwt".to_string()), - env("OPENBAO_SKIP_TLS").as_deref() == Some("true"), - env("OPENBAO_TOKEN"), - env("OPENBAO_USERNAME"), - env("OPENBAO_PASSWORD"), - env("HARMONY_SSO_URL"), - env("HARMONY_SSO_CLIENT_ID"), - None, - None, - ) + let store = harmony_secret::OpenbaoSecretStore::new(harmony_secret::OpenbaoStoreOptions { + base_url: url, + kv_mount: env("OPENBAO_KV_MOUNT").unwrap_or_else(|| "secret".to_string()), + auth_mount: env("OPENBAO_AUTH_MOUNT").unwrap_or_else(|| "jwt".to_string()), + skip_tls: env("OPENBAO_SKIP_TLS").as_deref() == Some("true"), + token: env("OPENBAO_TOKEN"), + username: env("OPENBAO_USERNAME"), + password: env("OPENBAO_PASSWORD"), + zitadel_sso_url: env("HARMONY_SSO_URL"), + zitadel_client_id: env("HARMONY_SSO_CLIENT_ID"), + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await; match store { diff --git a/harmony_secret/Cargo.toml b/harmony_secret/Cargo.toml index 8d8b9356..266eb1a6 100644 --- a/harmony_secret/Cargo.toml +++ b/harmony_secret/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true unexpected_cfgs = { level = "warn", check-cfg = ['cfg(secrete2etest)'] } [dependencies] +harmony_zitadel_auth = { path = "../harmony_zitadel_auth" } harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } serde = { version = "1.0.209", features = ["derive", "rc"] } serde_json = "1.0.127" diff --git a/harmony_secret/src/lib.rs b/harmony_secret/src/lib.rs index cda2786f..99d32f3c 100644 --- a/harmony_secret/src/lib.rs +++ b/harmony_secret/src/lib.rs @@ -26,7 +26,7 @@ use serde::{Serialize, de::DeserializeOwned}; use std::fmt; use store::InfisicalSecretStore; use store::LocalFileSecretStore; -pub use store::OpenbaoSecretStore; +pub use store::{OpenbaoSecretStore, OpenbaoStoreOptions}; use thiserror::Error; use tokio::sync::OnceCell; @@ -85,19 +85,20 @@ async fn init_secret_manager() -> SecretManager { let store: Box = match store_type.as_str() { "file" => Box::new(LocalFileSecretStore), "openbao" | "vault" => { - let store = OpenbaoSecretStore::new( - OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"), - OPENBAO_KV_MOUNT.clone(), - OPENBAO_AUTH_MOUNT.clone(), - *OPENBAO_SKIP_TLS, - OPENBAO_TOKEN.clone(), - OPENBAO_USERNAME.clone(), - OPENBAO_PASSWORD.clone(), - HARMONY_SSO_URL.clone(), - HARMONY_SSO_CLIENT_ID.clone(), - None, - None, - ) + let store = OpenbaoSecretStore::new(OpenbaoStoreOptions { + base_url: OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"), + kv_mount: OPENBAO_KV_MOUNT.clone(), + auth_mount: OPENBAO_AUTH_MOUNT.clone(), + skip_tls: *OPENBAO_SKIP_TLS, + token: OPENBAO_TOKEN.clone(), + username: OPENBAO_USERNAME.clone(), + password: OPENBAO_PASSWORD.clone(), + zitadel_sso_url: HARMONY_SSO_URL.clone(), + zitadel_client_id: HARMONY_SSO_CLIENT_ID.clone(), + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await .expect("Failed to initialize Openbao/Vault secret store"); Box::new(store) diff --git a/harmony_secret/src/store/mod.rs b/harmony_secret/src/store/mod.rs index 8179291d..9ea55a4c 100644 --- a/harmony_secret/src/store/mod.rs +++ b/harmony_secret/src/store/mod.rs @@ -5,4 +5,4 @@ pub mod zitadel; pub use infisical::InfisicalSecretStore; pub use local_file::LocalFileSecretStore; -pub use openbao::OpenbaoSecretStore; +pub use openbao::{OpenbaoSecretStore, OpenbaoStoreOptions}; diff --git a/harmony_secret/src/store/openbao.rs b/harmony_secret/src/store/openbao.rs index d0110cd3..0f41eec5 100644 --- a/harmony_secret/src/store/openbao.rs +++ b/harmony_secret/src/store/openbao.rs @@ -1,10 +1,14 @@ use crate::{SecretStore, SecretStoreError}; use async_trait::async_trait; +use harmony_zitadel_auth::ZitadelJwtBearer; use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::fmt::Debug; use std::fs; use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::Mutex; use vaultrs::auth; use vaultrs::client::{VaultClient, VaultClientSettingsBuilder}; use vaultrs::kv2; @@ -30,21 +34,52 @@ impl From for AuthInfo { } pub struct OpenbaoSecretStore { - client: VaultClient, + inner: Mutex, kv_mount: String, auth_mount: String, + jwt_bearer: Option>, + jwt_role: Option, + jwt_auth_mount: Option, + base_url: String, + skip_tls: bool, +} + +struct Inner { + client: VaultClient, + scope: HashSet, } impl Debug for OpenbaoSecretStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OpenbaoSecretStore") - .field("client", &self.client.settings) .field("kv_mount", &self.kv_mount) .field("auth_mount", &self.auth_mount) .finish() } } +pub struct OpenbaoStoreOptions { + pub base_url: String, + pub kv_mount: String, + pub auth_mount: String, + pub skip_tls: bool, + pub token: Option, + pub username: Option, + pub password: Option, + pub zitadel_sso_url: Option, + pub zitadel_client_id: Option, + pub zitadel_jwt_bearer: Option, + pub jwt_role: Option, + pub jwt_auth_mount: Option, +} + +pub struct ZitadelJwtBearerConfig { + pub key_path: Option, + pub key_json: Option, + pub oidc_issuer_url: String, + pub audience: String, +} + impl OpenbaoSecretStore { /// Creates a new Openbao/Vault secret store with authentication. /// @@ -52,34 +87,35 @@ impl OpenbaoSecretStore { /// - `auth_mount`: The auth method mount path (e.g., `"userpass"`). Used for userpass authentication. /// - `zitadel_sso_url`: Zitadel OIDC server URL (e.g., `"https://sso.example.com"`). If provided along with `zitadel_client_id`, Zitadel OIDC device flow will be attempted. /// - `zitadel_client_id`: OIDC client ID registered in Zitadel. - // TODO(clippy): refactor to a typed options struct - #[allow(clippy::too_many_arguments)] - pub async fn new( - base_url: String, - kv_mount: String, - auth_mount: String, - skip_tls: bool, - token: Option, - username: Option, - password: Option, - zitadel_sso_url: Option, - zitadel_client_id: Option, - jwt_role: Option, - jwt_auth_mount: Option, - ) -> Result { - info!("OPENBAO_STORE: Initializing client for URL: {base_url}"); + pub async fn new(options: OpenbaoStoreOptions) -> Result { + info!( + "OPENBAO_STORE: Initializing client for URL: {}", + options.base_url + ); // 1. If token is provided via env var, use it directly - if let Some(t) = token { + if let Some(t) = &options.token { debug!("OPENBAO_STORE: Using token from environment variable"); - return Self::with_token(&base_url, skip_tls, &t, &kv_mount, &auth_mount); + return Self::with_token( + &options.base_url, + options.skip_tls, + t, + &options.kv_mount, + &options.auth_mount, + ); } // 2. Try to load cached token - let cache_path = Self::get_token_cache_path(&base_url); + let cache_path = Self::get_token_cache_path(&options.base_url); if let Ok(mut cached_token) = Self::load_cached_token(&cache_path) { debug!("OPENBAO_STORE: Found cached token, validating..."); - if Self::validate_token(&base_url, skip_tls, &cached_token.client_token).await { + if Self::validate_token( + &options.base_url, + options.skip_tls, + &cached_token.client_token, + ) + .await + { info!("OPENBAO_STORE: Cached token is valid"); // Best-effort renewal: extend the server-side lease so @@ -89,7 +125,13 @@ impl OpenbaoSecretStore { // blip, etc.) doesn't block the caller — we just keep // the validated cached token as-is and let the next // invocation try again. - match Self::renew_self(&base_url, skip_tls, &cached_token.client_token).await { + match Self::renew_self( + &options.base_url, + options.skip_tls, + &cached_token.client_token, + ) + .await + { Ok(new_lease) => { info!("OPENBAO_STORE: cached token renewed, lease_duration={new_lease}s"); cached_token.lease_duration = Some(new_lease); @@ -106,26 +148,28 @@ impl OpenbaoSecretStore { } return Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &cached_token.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ); } warn!("OPENBAO_STORE: Cached token is invalid or expired"); } // 3. Try Zitadel OIDC device flow if configured - if let (Some(sso_url), Some(client_id)) = (zitadel_sso_url, zitadel_client_id) { + if let (Some(sso_url), Some(client_id)) = + (&options.zitadel_sso_url, &options.zitadel_client_id) + { info!("OPENBAO_STORE: Attempting Zitadel OIDC device flow"); match Self::authenticate_zitadel_oidc( - &base_url, - &sso_url, - &client_id, - skip_tls, - jwt_role.as_deref(), - jwt_auth_mount.as_deref(), + &options.base_url, + sso_url, + client_id, + options.skip_tls, + options.jwt_role.as_deref(), + options.jwt_auth_mount.as_deref(), ) .await { @@ -141,11 +185,11 @@ impl OpenbaoSecretStore { warn!("OPENBAO_STORE: Failed to cache OIDC token: {e}"); } return Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &auth_info.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ); } Err(e) => { @@ -155,7 +199,7 @@ impl OpenbaoSecretStore { } // 4. Authenticate with username/password - let (user, pass) = match (username, password) { + let (user, pass) = match (&options.username, &options.password) { (Some(u), Some(p)) => (u, p), _ => { return Err(SecretStoreError::Store( @@ -167,8 +211,14 @@ impl OpenbaoSecretStore { } }; - let token = - Self::authenticate_userpass(&base_url, &auth_mount, skip_tls, &user, &pass).await?; + let token = Self::authenticate_userpass( + &options.base_url, + &options.auth_mount, + options.skip_tls, + user, + pass, + ) + .await?; // Cache the token if let Err(e) = Self::cache_token(&cache_path, &token) { @@ -176,11 +226,11 @@ impl OpenbaoSecretStore { } Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &token.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ) } @@ -229,10 +279,20 @@ impl OpenbaoSecretStore { ) .map_err(|e| SecretStoreError::Store(Box::new(e)))?; - Ok(Self { + let inner = Mutex::new(Inner { client, + scope: HashSet::new(), + }); + + Ok(Self { + inner, kv_mount: kv_mount.to_string(), auth_mount: auth_mount.to_string(), + base_url: base_url.to_string(), + skip_tls, + jwt_bearer: None, + jwt_role: None, + jwt_auth_mount: None, }) } @@ -411,7 +471,8 @@ impl SecretStore for OpenbaoSecretStore { info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'"); debug!("OPENBAO_STORE: Request path: {path}"); - let data: serde_json::Value = kv2::read(&self.client, &self.kv_mount, &path) + let inner = self.inner.lock().await; + let data: serde_json::Value = kv2::read(&inner.client, &self.kv_mount, &path) .await .map_err(|e| { if e.to_string().contains("does not exist") || e.to_string().contains("404") { @@ -447,8 +508,9 @@ impl SecretStore for OpenbaoSecretStore { let data = serde_json::json!({ "value": value_str }); + let inner = self.inner.lock().await; - kv2::set(&self.client, &self.kv_mount, &path, &data) + kv2::set(&inner.client, &self.kv_mount, &path, &data) .await .map_err(|e| SecretStoreError::Store(Box::new(e)))?; diff --git a/fleet/harmony-fleet-auth/src/jwt_bearer.rs b/harmony_zitadel_auth/src/jwt_bearer.rs similarity index 100% rename from fleet/harmony-fleet-auth/src/jwt_bearer.rs rename to harmony_zitadel_auth/src/jwt_bearer.rs diff --git a/harmony_zitadel_auth/src/lib.rs b/harmony_zitadel_auth/src/lib.rs index f5cd0e78..c59c31c9 100644 --- a/harmony_zitadel_auth/src/lib.rs +++ b/harmony_zitadel_auth/src/lib.rs @@ -2,6 +2,7 @@ pub mod axum_login_flow; pub mod config; pub mod jwks; +pub mod jwt_bearer; pub mod login; pub mod session; @@ -11,6 +12,7 @@ pub use config::{ BASE_URL_ENV, CLIENT_ID_ENV, COOKIE_KEY_ENV, LOGOUT_REDIRECT_URI_ENV, SCOPE_ENV, TRUSTED_AUDIENCES_ENV, ZITADEL_BASE_ENV, ZitadelAuthConfig, config_from_env, }; +pub use jwt_bearer::{MachineKeyFile, ZitadelJwtBearer}; pub use jwks::JwksCache; -- 2.39.5