diff --git a/.env.example b/.env.example deleted file mode 100644 index 2f4ae554..00000000 --- a/.env.example +++ /dev/null @@ -1,7 +0,0 @@ -FLEET_AUTH_ISSUER_URL= -FLEET_AUTH_AUTHORIZE_URL= -FLEET_AUTH_TOKEN_URL= -FLEET_AUTH_CLIENT_ID= -FLEET_AUTH_REDIRECT_URI= -FLEET_AUTH_SCOPE= -FLEET_AUTH_TRUSTED_AUDIENCES= diff --git a/Cargo.lock b/Cargo.lock index 52da3fe0..cb074901 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4078,6 +4078,7 @@ dependencies = [ "async-nats", "chrono", "harmony-reconciler-contracts", + "harmony_zitadel_auth", "jsonwebtoken", "reqwest 0.12.28", "serde", @@ -4130,6 +4131,7 @@ dependencies = [ "harmony-fleet-auth", "harmony-fleet-deploy", "harmony-fleet-operator", + "harmony-k8s", "harmony-reconciler-contracts", "harmony_types", "k3d-rs", @@ -4463,6 +4465,7 @@ dependencies = [ "async-trait", "directories", "harmony_secret_derive", + "harmony_zitadel_auth", "http 1.4.0", "infisical", "inquire 0.7.5", diff --git a/examples/fleet_auth_callout/src/lib.rs b/examples/fleet_auth_callout/src/lib.rs index c2d4c9bf..a87cb37e 100644 --- a/examples/fleet_auth_callout/src/lib.rs +++ b/examples/fleet_auth_callout/src/lib.rs @@ -759,7 +759,7 @@ pub async fn mint_access_token( access_token: String, } let tr: TokenResponse = resp.json().await.context("parse token response")?; - if std::env::var("FLEET_AUTH_CALLOUT_DEBUG_TOKENS").is_ok() + if std::env::var("HARMONY_SSO_CALLOUT_DEBUG_TOKENS").is_ok() && let Some(payload_b64) = tr.access_token.split('.').nth(1) { use base64::Engine; diff --git a/examples/fleet_staging_install/src/main.rs b/examples/fleet_staging_install/src/main.rs index a3dfa9d8..416f3eec 100644 --- a/examples/fleet_staging_install/src/main.rs +++ b/examples/fleet_staging_install/src/main.rs @@ -33,7 +33,7 @@ use harmony::topology::{K8sAnywhereTopology, K8sclient, Topology}; use harmony_config::{Config, ConfigClient, StoreSource}; use harmony_fleet_deploy::{FleetDeploySecrets, FleetOperatorScore, OperatorCredentials}; use harmony_k8s::KubernetesDistribution; -use harmony_secret::OpenbaoSecretStore; +use harmony_secret::{OpenbaoSecretStore, OpenbaoStoreOptions}; use nkeys::KeyPair; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -325,19 +325,20 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"# .await .context("port-forward to OpenBao")?; tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let store = OpenbaoSecretStore::new( - format!("http://127.0.0.1:{}", pf.port()), - "secret".to_string(), - "token".to_string(), - true, - Some(cached_root_token(&openbao).map_err(|e| anyhow::anyhow!(e))?), - None, - None, - None, - None, - None, - None, - ) + let store = OpenbaoSecretStore::new(OpenbaoStoreOptions { + base_url: format!("http://127.0.0.1:{}", pf.port()), + kv_mount: "secret".to_string(), + auth_mount: "token".to_string(), + skip_tls: true, + token: Some(cached_root_token(&openbao).map_err(|e| anyhow::anyhow!(e))?), + username: None, + password: None, + zitadel_sso_url: None, + zitadel_client_id: None, + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await .context("OpenBao client")?; ConfigClient::new(vec![ diff --git a/examples/harmony_sso/src/main.rs b/examples/harmony_sso/src/main.rs index 96d70766..4b7ece74 100644 --- a/examples/harmony_sso/src/main.rs +++ b/examples/harmony_sso/src/main.rs @@ -341,7 +341,6 @@ async fn main() -> anyhow::Result<()> { role_name: "harmony-developer".to_string(), bound_audiences: client_id.clone(), user_claim: "email".to_string(), - policies: vec!["harmony-dev".to_string()], ttl: "4h".to_string(), max_ttl: "24h".to_string(), }), @@ -366,19 +365,20 @@ async fn main() -> anyhow::Result<()> { let openbao_url = format!("http://127.0.0.1:{}", _pf.port()); let sso_url = format!("http://{}:{}", ZITADEL_HOST, HTTP_PORT); - let store = OpenbaoSecretStore::new( - openbao_url, - "secret".to_string(), - "jwt".to_string(), - true, - None, - None, - None, - Some(sso_url), - Some(client_id), - Some("harmony-developer".to_string()), - Some("jwt".to_string()), - ) + let store = OpenbaoSecretStore::new(harmony_secret::OpenbaoStoreOptions { + base_url: openbao_url, + kv_mount: "secret".to_string(), + auth_mount: "jwt".to_string(), + skip_tls: true, + token: None, + username: None, + password: None, + zitadel_sso_url: Some(sso_url), + zitadel_client_id: Some(client_id), + jwt_role: Some("harmony-developer".to_string()), + jwt_auth_mount: Some("jwt".to_string()), + zitadel_jwt_bearer: None, + }) .await .context("SSO authentication failed")?; diff --git a/examples/openbao/src/main.rs b/examples/openbao/src/main.rs index f5824b3e..f40d371e 100644 --- a/examples/openbao/src/main.rs +++ b/examples/openbao/src/main.rs @@ -105,7 +105,6 @@ async fn main() -> Result<()> { role_name: "harmony".to_string(), bound_audiences: cfg.oidc_audience.clone(), user_claim: "sub".to_string(), - policies: vec![HARMONY_CONFIG_POLICY.to_string()], ttl: "1h".to_string(), max_ttl: "8h".to_string(), }); @@ -124,6 +123,7 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"# instance, kv_mount: "secret".to_string(), policies, + entities: vec![], users: vec![], jwt_auth, }; diff --git a/fleet/harmony-fleet-auth/Cargo.toml b/fleet/harmony-fleet-auth/Cargo.toml index 7e32f1e4..a99836e3 100644 --- a/fleet/harmony-fleet-auth/Cargo.toml +++ b/fleet/harmony-fleet-auth/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" [dependencies] harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" } async-nats = { workspace = true } anyhow = { workspace = true } chrono = { workspace = true } diff --git a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs new file mode 100644 index 00000000..73d7c949 --- /dev/null +++ b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs @@ -0,0 +1,26 @@ +use anyhow::{Context, Result}; +use harmony_fleet_auth::{CredentialSource, CredentialsSection, NatsCredential}; + +#[tokio::main] +async fn main() -> Result<()> { + let key_path = std::env::var("ZITADEL_KEY_PATH").context("ZITADEL_KEY_PATH missing")?; + let issuer = std::env::var("ZITADEL_ISSUER").context("ZITADEL_ISSUER missing")?; + let audience = std::env::var("ZITADEL_PROJECT_ID").context("ZITADEL_PROJECT_ID missing")?; + + let source = + CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { + key_path: key_path.into(), + key_json: None, + oidc_issuer_url: issuer, + audience, + danger_accept_invalid_certs: false, + })?; + + match source.next_credential().await? { + NatsCredential::BearerToken(token) => { + println!("{token}"); + Ok(()) + } + NatsCredential::UserPass { .. } => anyhow::bail!("expected bearer token"), + } +} diff --git a/fleet/harmony-fleet-auth/src/credentials.rs b/fleet/harmony-fleet-auth/src/credentials.rs index 82aa799d..36982d55 100644 --- a/fleet/harmony-fleet-auth/src/credentials.rs +++ b/fleet/harmony-fleet-auth/src/credentials.rs @@ -23,14 +23,13 @@ //! cleaner than a Trait + factory. use std::path::Path; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::Arc; use anyhow::{Context, Result}; -use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader}; -use serde::Deserialize; -use crate::config::CredentialsSection; +use harmony_zitadel_auth::{MachineKeyFile, ZitadelJwtBearer}; + +use crate::CredentialsSection; /// Material the NATS connector needs to authenticate. Returned per /// (re)connect attempt — the source decides whether to mint fresh. @@ -44,17 +43,8 @@ pub enum NatsCredential { /// from the parsed `[credentials]` section; cloned via Arc into the /// async-nats auth callback. pub enum CredentialSource { - TomlShared { - user: String, - pass: String, - }, - ZitadelJwt { - key: MachineKeyFile, - oidc_issuer_url: String, - audience: String, - http: reqwest::Client, - cache: Mutex>, - }, + TomlShared { user: String, pass: String }, + ZitadelJwt { bearer: Arc }, } impl CredentialSource { @@ -72,256 +62,58 @@ impl CredentialSource { } async fn zitadel_next(&self) -> Result { - // Fast path: lock the cache synchronously, copy out the token if - // it's comfortably valid, drop the lock. Holding a MutexGuard - // across `.await` would make this future !Sync, which - // async-nats's `with_auth_callback` rejects at compile time. - if let Some(token) = self.cached_if_fresh() { - return Ok(NatsCredential::BearerToken(token)); - } - // Slow path: mint outside any lock. Two concurrent (re)connect - // attempts could both reach here and both mint; that's a wasted - // HTTP round-trip in a rare race, not a correctness issue — - // the second writer wins and replaces the first's value. - let fresh = self.zitadel_mint().await?; - let token = fresh.access_token.clone(); - if let Self::ZitadelJwt { - cache, audience, .. - } = self - && let Ok(mut guard) = cache.lock() - { - *guard = Some(fresh); - tracing::info!(audience = %audience, "minted fresh Zitadel access token"); - } + let Self::ZitadelJwt { bearer } = self else { + anyhow::bail!("zitadel_next called on non-ZitadelJwt variant"); + }; + let token = bearer.bearer_token().await?; Ok(NatsCredential::BearerToken(token)) } - fn cached_if_fresh(&self) -> Option { - let Self::ZitadelJwt { cache, .. } = self else { - return None; - }; - let now = chrono::Utc::now().timestamp(); - let guard = cache.lock().ok()?; - let cached = guard.as_ref()?; - if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now { - Some(cached.access_token.clone()) - } else { - None - } - } - - async fn zitadel_mint(&self) -> Result { - let Self::ZitadelJwt { - key, - oidc_issuer_url, - audience, - http, - .. - } = self - else { - anyhow::bail!("zitadel_mint called on non-ZitadelJwt variant"); - }; - - let now = chrono::Utc::now().timestamp(); - let assertion = build_assertion(key, oidc_issuer_url, now)?; - let scope = build_scope(audience); - let token_url = build_token_url(oidc_issuer_url); - - let resp = http - .post(&token_url) - .form(&[ - ( - "grant_type", - "urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(), - ), - ("assertion", assertion), - ("scope", scope), - ]) - .send() - .await - .with_context(|| format!("POST {token_url}"))?; - - if !resp.status().is_success() { - let status = resp.status(); - let body = resp.text().await.unwrap_or_default(); - anyhow::bail!("Zitadel token endpoint returned {status}: {body}"); - } - - #[derive(Deserialize)] - struct TokenResponse { - access_token: String, - #[serde(default)] - expires_in: Option, - } - let tr: TokenResponse = resp.json().await.context("parsing token response")?; - // Zitadel typically returns 12h (43200s); be defensive against - // a missing field by assuming a conservative 1h. - let expires_in = tr.expires_in.unwrap_or(3600); - Ok(CachedToken { - access_token: tr.access_token, - expires_at_unix: now + expires_in, - }) - } -} - -/// Build the JWT-bearer assertion. Split out from the network path so -/// the claims + header shape can be unit-tested without an HTTP server, -/// and split internally into the (pure) claim/header builders so they -/// can be unit-tested without an RSA private key fixture. -pub(crate) fn build_assertion( - key: &MachineKeyFile, - oidc_issuer_url: &str, - now: i64, -) -> Result { - let claims = build_assertion_claims(key, oidc_issuer_url, now); - let header = build_assertion_header(key); - let assertion = jsonwebtoken::encode( - &header, - &claims, - &EncodingKey::from_rsa_pem(key.key.as_bytes()) - .context("parsing RSA private key from machine key file")?, - ) - .context("signing JWT assertion")?; - Ok(assertion) -} - -/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId` -/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint -/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or -/// Zitadel rejects. -pub(crate) fn build_assertion_claims( - key: &MachineKeyFile, - oidc_issuer_url: &str, - now: i64, -) -> serde_json::Value { - serde_json::json!({ - "iss": key.user_id, - "sub": key.user_id, - "aud": oidc_issuer_url, - "exp": now + ASSERTION_LIFETIME_SECS, - "iat": now, - }) -} - -/// JWT header for the assertion. The `kid` tells Zitadel which of the -/// machine user's registered keys to verify the signature against. -pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader { - let mut header = JwtHeader::new(Algorithm::RS256); - header.kid = Some(key.key_id.clone()); - header -} - -/// Build the OAuth `scope` string for the token-bearer request. -/// -/// Three scopes are needed for the access token to be useful here: -/// -/// * `openid` — base OIDC requirement. -/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") — -/// tells Zitadel to include the role-claim block in the access -/// token. Without this, the callout sees "no authorized role -/// in token" even when the user has a project role grant. -/// * `urn:zitadel:iam:org:project:id::aud` (SINGULAR -/// "project") — adds to the access token's `aud` claim -/// so the callout's audience validation accepts the project -/// ID we're using as the JWT-bearer audience. -/// -/// The plural-vs-singular distinction is a Zitadel convention, -/// not a typo. Both scopes are required. -pub(crate) fn build_scope(audience: &str) -> String { - format!( - "openid \ - urn:zitadel:iam:org:projects:roles \ - urn:zitadel:iam:org:project:id:{audience}:aud" - ) -} - -/// Resolve the token endpoint URL, tolerating a trailing slash on -/// `oidc_issuer_url`. Without trimming, a configured issuer of -/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s. -pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String { - format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/')) -} - -// ---- helper types ---------------------------------------------------------- - -/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON` -/// machine key. The `key` is a PEM-encoded RSA private key. -#[derive(Debug, Clone, Deserialize)] -pub struct MachineKeyFile { - #[serde(rename = "type")] - pub _type: String, - #[serde(rename = "keyId")] - pub key_id: String, - pub key: String, - #[serde(rename = "userId")] - pub user_id: String, -} - -#[derive(Debug, Clone)] -pub struct CachedToken { - pub(crate) access_token: String, - /// Unix seconds at which the token is no longer trusted by - /// `cached_if_fresh`. Computed from the OAuth response's `expires_in` - /// and the local clock at mint time. - pub(crate) expires_at_unix: i64, -} - -/// Refresh tokens this many seconds before their advertised expiry. -/// Five minutes leaves headroom for clock skew, slow networks, and -/// the round-trip cost of re-minting against Zitadel. -pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60; - -/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign -/// to authenticate to Zitadel's token endpoint). Zitadel rejects -/// assertions with `exp - iat > 60s`; one minute is the safe ceiling. -pub const ASSERTION_LIFETIME_SECS: i64 = 60; - -// ---- factory --------------------------------------------------------------- - -/// Build the appropriate `CredentialSource` from the parsed config. -/// -/// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from -/// disk. Both the agent and the operator mount their key as a file -/// (Secret volume in the operator's Pod, dropped by -/// `FleetDeviceSetupScore` on the agent's VM); the path is just -/// configured differently. -pub fn credential_source_from_config(creds: &CredentialsSection) -> Result> { - match creds { - CredentialsSection::TomlShared { - nats_user, - nats_pass, - } => Ok(Arc::new(CredentialSource::TomlShared { - user: nats_user.clone(), - pass: nats_pass.clone(), - })), - CredentialsSection::ZitadelJwt { - key_path, - key_json, - oidc_issuer_url, - audience, - danger_accept_invalid_certs, - } => { - // `key_json` (inline) wins over `key_path` (file). The - // operator pod uses inline because OKD's restricted-v2 - // SCC + env-var-from-Secret deployment shape can't - // reliably mount Secret volumes; the agent uses the file - // path because it lives on a VM and a real file is the - // more natural rotation target. - let key = match key_json.as_deref().map(str::trim) { - Some(json) if !json.is_empty() => parse_machine_key(json)?, - _ => load_machine_key(key_path)?, - }; - Ok(Arc::new(CredentialSource::ZitadelJwt { - key, - oidc_issuer_url: oidc_issuer_url.clone(), - audience: audience.clone(), - http: reqwest::Client::builder() - .danger_accept_invalid_certs(*danger_accept_invalid_certs) - .timeout(Duration::from_secs(10)) - .build() - .context("building HTTP client for Zitadel token endpoint")?, - cache: Mutex::new(None), - })) + /// Build the appropriate `CredentialSource` from the parsed config. + /// + /// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from + /// disk. Both the agent and the operator mount their key as a file + /// (Secret volume in the operator's Pod, dropped by + /// `FleetDeviceSetupScore` on the agent's VM); the path is just + /// configured differently. + pub fn credential_source_from_config( + creds: &CredentialsSection, + ) -> Result> { + match creds { + CredentialsSection::TomlShared { + nats_user, + nats_pass, + } => Ok(Arc::new(CredentialSource::TomlShared { + user: nats_user.clone(), + pass: nats_pass.clone(), + })), + CredentialsSection::ZitadelJwt { + key_path, + key_json, + oidc_issuer_url, + audience, + danger_accept_invalid_certs, + } => { + // `key_json` (inline) wins over `key_path` (file). The + // operator pod uses inline because OKD's restricted-v2 + // SCC + env-var-from-Secret deployment shape can't + // reliably mount Secret volumes; the agent uses the file + // path because it lives on a VM and a real file is the + // more natural rotation target. + let key = match key_json.as_deref().map(str::trim) { + Some(json) if !json.is_empty() => parse_machine_key(json)?, + _ => load_machine_key(key_path)?, + }; + let bearer = ZitadelJwtBearer::new( + key, + oidc_issuer_url.clone(), + audience.clone(), + *danger_accept_invalid_certs, + )?; + Ok(Arc::new(CredentialSource::ZitadelJwt { + bearer: Arc::new(bearer), + })) + } } } } @@ -341,30 +133,6 @@ fn parse_machine_key(raw: &str) -> Result { mod tests { use super::*; - fn fake_key() -> MachineKeyFile { - MachineKeyFile { - _type: "serviceaccount".to_string(), - key_id: "kid-371358469099356247".to_string(), - // Real PEM not required for the pure-builder tests; the - // signing path that needs a parseable key is exercised - // end-to-end in the e2e harness. - key: "PEM-PLACEHOLDER".to_string(), - user_id: "uid-371358469065801815".to_string(), - } - } - - fn zjwt_source() -> CredentialSource { - CredentialSource::ZitadelJwt { - key: fake_key(), - oidc_issuer_url: "http://sso.fleet.local:8080".to_string(), - audience: "366378028009259037".to_string(), - http: reqwest::Client::new(), - cache: Mutex::new(None), - } - } - - // ---- next_credential / cache state ------------------------------------- - #[tokio::test] async fn toml_shared_returns_userpass_each_call() { let s = CredentialSource::TomlShared { @@ -380,174 +148,4 @@ mod tests { other => panic!("expected UserPass, got {other:?}"), } } - - #[test] - fn cached_token_within_leeway_is_treated_as_expired() { - // Sanity-check the comparison so refactors don't accidentally - // invert the leeway window. - let now = chrono::Utc::now().timestamp(); - let about_to_expire = CachedToken { - access_token: "x".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1, - }; - assert!( - about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now, - "tokens within the leeway window must be considered expired" - ); - - let comfortable = CachedToken { - access_token: "x".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, - }; - assert!( - comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now, - "tokens with comfortable headroom must be cache-hits" - ); - } - - #[test] - fn cached_if_fresh_returns_some_when_outside_leeway() { - let src = zjwt_source(); - let now = chrono::Utc::now().timestamp(); - if let CredentialSource::ZitadelJwt { cache, .. } = &src { - *cache.lock().unwrap() = Some(CachedToken { - access_token: "fresh".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, - }); - } - assert_eq!(src.cached_if_fresh(), Some("fresh".to_string())); - } - - #[test] - fn cached_if_fresh_returns_none_when_no_cache() { - // Brand-new ZitadelJwt source — no token has been minted yet. - // Forces the slow path on first connect. - let src = zjwt_source(); - assert_eq!(src.cached_if_fresh(), None); - } - - #[test] - fn cached_if_fresh_returns_none_for_toml_shared() { - // Defensive: cache_if_fresh is only meaningful for ZitadelJwt; - // TomlShared has no cache. A nonsensical call must return None, - // not panic, so the cold-path can degrade gracefully. - let src = CredentialSource::TomlShared { - user: "u".into(), - pass: "p".into(), - }; - assert_eq!(src.cached_if_fresh(), None); - } - - // ---- assertion claims / header (pure builders) ------------------------ - - #[test] - fn assertion_claims_carry_iss_sub_aud_exp_iat() { - let now = 1_700_000_000; - let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now); - assert_eq!(claims["iss"], "uid-371358469065801815"); - assert_eq!(claims["sub"], "uid-371358469065801815"); - assert_eq!(claims["aud"], "http://sso.fleet.local:8080"); - assert_eq!(claims["iat"].as_i64(), Some(now)); - assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS)); - } - - #[test] - fn assertion_lifetime_locked_at_60_seconds() { - // Zitadel rejects assertions where exp - iat > 60s. If anyone - // bumps ASSERTION_LIFETIME_SECS thinking "more is safer", the - // mints will silently start failing in prod with no helpful - // error. Lock the constant. - assert_eq!(ASSERTION_LIFETIME_SECS, 60); - } - - #[test] - fn assertion_header_carries_kid_and_rs256() { - let header = build_assertion_header(&fake_key()); - assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256); - assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247")); - } - - // ---- scope string ------------------------------------------------------ - - #[test] - fn scope_includes_plural_projects_roles() { - // The plural-projects URN is what tells Zitadel to emit the - // role claim. Day-one bug; lock it. - let s = build_scope("366378028009259037"); - assert!( - s.contains("urn:zitadel:iam:org:projects:roles"), - "scope must include the PLURAL projects-roles URN; got {s:?}" - ); - } - - #[test] - fn scope_audience_uses_singular_project_id_urn() { - // The singular-project URN tells Zitadel to put into the - // access token's aud claim. Different URN entirely from the - // plural one above; both required. - let s = build_scope("366378028009259037"); - assert!( - s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"), - "scope must include the SINGULAR project:id::aud URN; got {s:?}" - ); - } - - #[test] - fn scope_includes_openid_base() { - let s = build_scope("any"); - assert!( - s.split_whitespace().any(|tok| tok == "openid"), - "scope must include `openid` as a standalone token; got {s:?}" - ); - } - - // ---- token URL --------------------------------------------------------- - - #[test] - fn token_url_appends_oauth_endpoint() { - assert_eq!( - build_token_url("http://sso.fleet.local:8080"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - #[test] - fn token_url_strips_single_trailing_slash() { - // A trailing slash would yield `…//oauth/v2/token`, which 404s. - // Common configuration drift; the trim guards against it. - assert_eq!( - build_token_url("http://sso.fleet.local:8080/"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - #[test] - fn token_url_strips_multiple_trailing_slashes() { - // Defensive — `trim_end_matches('/')` peels all of them, not - // just the first. Locks that semantics. - assert_eq!( - build_token_url("http://sso.fleet.local:8080///"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - // ---- MachineKeyFile JSON parsing -------------------------------------- - - #[test] - fn machine_key_file_parses_zitadel_json_shape() { - // The serde renames (`type`, `keyId`, `userId`) are easy to - // break. This is the literal JSON shape Zitadel's - // /management/v1/users/.../keys endpoint emits. - let raw = r#"{ - "type": "serviceaccount", - "keyId": "371358469099356247", - "key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n", - "userId": "371358469065801815" - }"#; - let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile"); - assert_eq!(parsed._type, "serviceaccount"); - assert_eq!(parsed.key_id, "371358469099356247"); - assert_eq!(parsed.user_id, "371358469065801815"); - assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY")); - } } diff --git a/fleet/harmony-fleet-auth/src/lib.rs b/fleet/harmony-fleet-auth/src/lib.rs index 3cce33e1..7932c75f 100644 --- a/fleet/harmony-fleet-auth/src/lib.rs +++ b/fleet/harmony-fleet-auth/src/lib.rs @@ -27,13 +27,17 @@ mod credentials; pub use agent_config::{AgentConfig, AgentSection, NatsSection, load_config}; pub use config::CredentialsSection; -pub use credentials::{ - ASSERTION_LIFETIME_SECS, CachedToken, CredentialSource, MachineKeyFile, NatsCredential, - TOKEN_REFRESH_LEEWAY_SECS, credential_source_from_config, -}; use std::sync::Arc; +pub use crate::credentials::{CredentialSource, NatsCredential}; + +pub fn credential_source_from_config( + creds: &CredentialsSection, +) -> anyhow::Result> { + CredentialSource::credential_source_from_config(creds) +} + /// Build `async_nats::ConnectOptions` wired with the auth callback /// that pulls fresh credentials from `creds` on every (re)connect. /// diff --git a/fleet/harmony-fleet-e2e/Cargo.toml b/fleet/harmony-fleet-e2e/Cargo.toml index f11246e0..59b6d97c 100644 --- a/fleet/harmony-fleet-e2e/Cargo.toml +++ b/fleet/harmony-fleet-e2e/Cargo.toml @@ -18,6 +18,10 @@ path = "tests/ping.rs" name = "operator" path = "tests/operator.rs" +[[test]] +name = "openbao_policy" +path = "tests/openbao_policy.rs" + [[test]] name = "vm_ping" path = "tests/vm_ping.rs" @@ -39,6 +43,7 @@ harmony-fleet-auth = { path = "../harmony-fleet-auth" } harmony-fleet-deploy = { path = "../harmony-fleet-deploy" } harmony-fleet-operator = { path = "../harmony-fleet-operator" } harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +harmony-k8s = { path = "../../harmony-k8s" } harmony_types = { path = "../../harmony_types" } k3d-rs = { path = "../../k3d" } diff --git a/fleet/harmony-fleet-e2e/tests/openbao_policy.rs b/fleet/harmony-fleet-e2e/tests/openbao_policy.rs new file mode 100644 index 00000000..e9eeeaee --- /dev/null +++ b/fleet/harmony-fleet-e2e/tests/openbao_policy.rs @@ -0,0 +1,226 @@ +use std::sync::Arc; + +use harmony::inventory::Inventory; +use harmony::modules::openbao::{ + OpenbaoInstance, OpenbaoPolicy, OpenbaoScore, OpenbaoSetupScore, OpenbaoUser, cached_root_token, +}; +use harmony::score::Score; +use harmony::topology::{K8sAnywhereTopology, K8sclient}; +use harmony_fleet_e2e::{StackOptions, shared_stack}; + +const E2E_ENV: &str = "HARMONY_FLEET_E2E"; +const RELEASE: &str = "openbao-policy-test"; +const USER: &str = "policy-device"; +const PASSWORD: &str = "policy-device-pass"; +const POLICY: &str = "deployment-policy-test"; +const SECRET_PATH: &str = "fleet-staging/deployment-a"; + +fn e2e_enabled() -> bool { + matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true")) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn entity_policy_grants_existing_openbao_token_access() -> anyhow::Result<()> { + if !e2e_enabled() { + skip_e2e(); + return Ok(()); + } + + let stack = openbao_stack().await?; + stack.print_debug_info(); + + let instance = OpenbaoInstance { + namespace: stack.namespace.clone(), + release: RELEASE.to_string(), + }; + deploy_openbao(&instance).await?; + + let topology = K8sAnywhereTopology::from_env(); + let k8s = topology.k8s_client().await.map_err(anyhow::Error::msg)?; + let root_token = cached_root_token(&instance).map_err(anyhow::Error::msg)?; + let user_token = login_user(&k8s, &instance).await?; + let entity_id = token_entity_id(&k8s, &instance, &user_token).await?; + + set_entity_policies(&k8s, &instance, &root_token, &entity_id, "").await?; + assert_secret_denied(&k8s, &instance, &user_token).await?; + + set_entity_policies(&k8s, &instance, &root_token, &entity_id, POLICY).await?; + assert_secret_allowed(&k8s, &instance, &user_token).await?; + + set_entity_policies(&k8s, &instance, &root_token, &entity_id, "").await?; + Ok(()) +} + +async fn openbao_stack() -> anyhow::Result> { + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .try_init(); + + Ok(shared_stack(StackOptions { + deploy_agent: false, + deploy_operator: false, + ..StackOptions::infra_only() + }) + .await?) +} + +async fn deploy_openbao(instance: &OpenbaoInstance) -> anyhow::Result<()> { + let topology = K8sAnywhereTopology::from_env(); + let policy_hcl = format!( + r#"path "secret/data/{SECRET_PATH}" {{ capabilities = ["read"] }} +path "secret/metadata/{SECRET_PATH}" {{ capabilities = ["read"] }}"# + ); + let scores: Vec>> = vec![ + Box::new(OpenbaoScore { + instance: instance.clone(), + host: "openbao-policy-test.local".to_string(), + openshift: false, + tls_issuer: None, + }), + Box::new(OpenbaoSetupScore { + instance: instance.clone(), + kv_mount: "secret".to_string(), + policies: vec![OpenbaoPolicy { + name: POLICY.to_string(), + hcl: policy_hcl, + }], + users: vec![OpenbaoUser { + username: USER.to_string(), + password: PASSWORD.to_string(), + policies: vec![], + }], + jwt_auth: None, + }), + ]; + + for score in scores { + score.interpret(&Inventory::autoload(), &topology).await?; + } + let k8s = topology.k8s_client().await.map_err(anyhow::Error::msg)?; + let root_token = cached_root_token(instance).map_err(anyhow::Error::msg)?; + bao( + &k8s, + instance, + &root_token, + "bao kv put secret/fleet-staging/deployment-a value=e2e", + ) + .await?; + Ok(()) +} + +async fn login_user( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, +) -> anyhow::Result { + let out = exec( + k8s, + instance, + &format!("bao login -method=userpass -format=json username={USER} password={PASSWORD}"), + ) + .await?; + Ok( + serde_json::from_str::(&out)?["auth"]["client_token"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("login response missing auth.client_token: {out}"))? + .to_string(), + ) +} + +async fn token_entity_id( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, +) -> anyhow::Result { + let out = bao(k8s, instance, token, "bao token lookup -format=json").await?; + Ok( + serde_json::from_str::(&out)?["data"]["entity_id"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("token lookup response missing data.entity_id: {out}"))? + .to_string(), + ) +} + +async fn set_entity_policies( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + root_token: &str, + entity_id: &str, + policies: &str, +) -> anyhow::Result<()> { + bao( + k8s, + instance, + root_token, + &format!("bao write identity/entity/id/{entity_id} policies={policies}"), + ) + .await?; + Ok(()) +} + +async fn assert_secret_denied( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, +) -> anyhow::Result<()> { + let result = bao( + k8s, + instance, + token, + &format!("bao kv get secret/{SECRET_PATH}"), + ) + .await; + assert!(result.is_err(), "secret read unexpectedly succeeded"); + Ok(()) +} + +async fn assert_secret_allowed( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, +) -> anyhow::Result<()> { + bao( + k8s, + instance, + token, + &format!("bao kv get secret/{SECRET_PATH}"), + ) + .await?; + Ok(()) +} + +async fn bao( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, + command: &str, +) -> anyhow::Result { + exec( + k8s, + instance, + &format!("export VAULT_TOKEN={token} && {command}"), + ) + .await +} + +async fn exec( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + command: &str, +) -> anyhow::Result { + k8s.exec_pod_capture_output( + &instance.pod(), + Some(&instance.namespace), + vec!["sh", "-c", command], + ) + .await + .map_err(|e| anyhow::anyhow!(e)) +} + +fn skip_e2e() { + eprintln!( + "skipping {E2E_ENV}-gated OpenBao e2e test (set {E2E_ENV}=1 to run; requires k3d + helm)" + ); +} diff --git a/fleet/harmony-fleet-operator/dev.sh b/fleet/harmony-fleet-operator/dev.sh index 812f8272..a73268bf 100644 --- a/fleet/harmony-fleet-operator/dev.sh +++ b/fleet/harmony-fleet-operator/dev.sh @@ -1,12 +1,12 @@ #!/bin/bash export BASE_URL=http://localhost:18080 -export FLEET_AUTH_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io -export FLEET_AUTH_CLIENT_ID=372626218874372917 -export FLEET_AUTH_SCOPE="openid profile email" -export FLEET_AUTH_LOGOUT_REDIRECT_URI="http://localhost:18080/" -export FLEET_OPERATOR_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg== -export FLEET_AUTH_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125 +export HARMONY_SSO_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io +export HARMONY_SSO_CLIENT_ID=372626218874372917 +export HARMONY_SSO_SCOPE="openid profile email" +export HARMONY_SSO_LOGOUT_REDIRECT_URI="http://localhost:18080/" +export HARMONY_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg== +export HARMONY_SSO_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125 export BASE_URL=http://localhost:18080 export RUST_LOG=debug diff --git a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs index 9eec10df..ac202f94 100644 --- a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs +++ b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs @@ -36,6 +36,7 @@ use tokio::sync::Mutex; use harmony::modules::fleet::operator::{ AggregateLastError, Deployment, DeploymentAggregate, Device, }; +use harmony::modules::fleet::secret_access::{DeviceId, DeviceSecretAccess}; use tracing::warn; const PATCH_TICK: Duration = Duration::from_secs(1); @@ -149,7 +150,11 @@ fn matched_devices( // Top-level run // --------------------------------------------------------------------------- -pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> { +pub async fn run( + client: Client, + js: async_nats::jetstream::Context, + secret_access: Option>, +) -> anyhow::Result<()> { let state_bucket = js .create_key_value(async_nats::jetstream::kv::Config { bucket: BUCKET_DEVICE_STATE.to_string(), @@ -197,8 +202,11 @@ pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow:: let deployment_watcher_handle = { let state = state.clone(); let desired = desired_bucket.clone(); + let access = secret_access.clone(); tokio::spawn(async move { - if let Err(e) = run_deployment_watcher(deployments_api.clone(), state, desired).await { + if let Err(e) = + run_deployment_watcher(deployments_api.clone(), state, desired, access).await + { tracing::warn!(error = %e, "aggregator: deployment watcher exited"); } }) @@ -207,8 +215,9 @@ pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow:: let device_watcher_handle = { let state = state.clone(); let desired = desired_bucket.clone(); + let access = secret_access.clone(); tokio::spawn(async move { - if let Err(e) = run_device_watcher(devices_api, state, desired).await { + if let Err(e) = run_device_watcher(devices_api, state, desired, access).await { tracing::warn!(error = %e, "aggregator: device watcher exited"); } }) @@ -281,6 +290,44 @@ async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow: Ok(()) } +/// Deployments whose secrets `device_id` may currently read — the set it +/// is a target of. Mapping a deployment onto a backend's access +/// primitive (an OpenBao policy, …) lives in the `DeviceSecretAccess` +/// impl, not here. +fn device_deployments(state: &FleetState, device_id: &str) -> Vec { + state + .owned_targets + .iter() + .filter(|(_, devices)| devices.contains(device_id)) + .map(|(dn, _)| dn.clone()) + .collect() +} + +/// Push the current grant set for `devices` to the secret-access backend +/// in one batch. Non-fatal: a backend hiccup logs and is retried on the +/// next membership change rather than tearing down reconciliation. +/// Skipped entirely when no backend is configured (dev without OpenBao). +async fn sync_secret_access( + state: &SharedFleetState, + access: Option<&Arc>, + devices: impl IntoIterator, +) { + let Some(access) = access else { return }; + let grants: Vec<(DeviceId, Vec)> = { + let guard = state.lock().await; + devices + .into_iter() + .map(|d| (DeviceId::from(d.clone()), device_deployments(&guard, &d))) + .collect() + }; + if grants.is_empty() { + return; + } + if let Err(e) = access.set_device_grants(&grants).await { + warn!(error = %e, "aggregator: secret access sync failed"); + } +} + /// Record a device's latest state, dedup against older timestamps, /// maintain last_error, mark the deployment dirty. pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) { @@ -359,15 +406,16 @@ async fn run_deployment_watcher( api: Api, state: SharedFleetState, desired: Store, + access: Option>, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { match event { Event::Apply(cr) | Event::InitApply(cr) => { - on_deployment_upsert(&state, &desired, cr).await; + on_deployment_upsert(&state, &desired, access.as_ref(), cr).await; } Event::Delete(cr) => { - on_deployment_delete(&state, &desired, cr).await; + on_deployment_delete(&state, &desired, access.as_ref(), cr).await; } Event::Init | Event::InitDone => {} } @@ -375,7 +423,12 @@ async fn run_deployment_watcher( Ok(()) } -async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) { +async fn on_deployment_upsert( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + cr: Deployment, +) { let Some(key) = DeploymentKey::from_cr(&cr) else { return; }; @@ -423,9 +476,19 @@ async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Dep &score_json, ) .await; + + // Membership change touches the symmetric difference: devices that + // gained or lost this deployment need their grant set recomputed. + let affected: HashSet = new_targets.union(&previous_targets).cloned().collect(); + sync_secret_access(state, access, affected).await; } -async fn on_deployment_delete(state: &SharedFleetState, desired: &Store, cr: Deployment) { +async fn on_deployment_delete( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + cr: Deployment, +) { let Some(key) = DeploymentKey::from_cr(&cr) else { return; }; @@ -453,6 +516,10 @@ async fn on_deployment_delete(state: &SharedFleetState, desired: &Store, cr: Dep tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on CR delete failed"); } } + + // The deployment is gone from owned_targets, so each formerly-owned + // device recomputes to a grant set without it. + sync_secret_access(state, access, previous).await; } // --------------------------------------------------------------------------- @@ -467,15 +534,16 @@ async fn run_device_watcher( api: Api, state: SharedFleetState, desired: Store, + access: Option>, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { match event { Event::Apply(dev) | Event::InitApply(dev) => { - on_device_upsert(&state, &desired, dev).await; + on_device_upsert(&state, &desired, access.as_ref(), dev).await; } Event::Delete(dev) => { - on_device_delete(&state, &desired, dev).await; + on_device_delete(&state, &desired, access.as_ref(), dev).await; } Event::Init | Event::InitDone => {} } @@ -483,7 +551,12 @@ async fn run_device_watcher( Ok(()) } -async fn on_device_upsert(state: &SharedFleetState, desired: &Store, dev: Device) { +async fn on_device_upsert( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + dev: Device, +) { let name = dev.name_any(); let labels: BTreeMap = dev.metadata.labels.clone().unwrap_or_default(); @@ -547,9 +620,16 @@ async fn on_device_upsert(state: &SharedFleetState, desired: &Store, dev: Device _ => {} } } + + sync_secret_access(state, access, std::iter::once(name)).await; } -async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device) { +async fn on_device_delete( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + dev: Device, +) { let name = dev.name_any(); let removed_from: Vec = { let mut guard = state.lock().await; @@ -573,6 +653,10 @@ async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on device delete failed"); } } + + // Device is gone from every owned_targets set, so its grant set + // recomputes to empty — clearing all secret access. + sync_secret_access(state, access, std::iter::once(name)).await; } // --------------------------------------------------------------------------- @@ -817,6 +901,55 @@ mod tests { assert_eq!(agg.pending, 1); } + fn devs(ids: &[&str]) -> HashSet { + ids.iter().map(|s| s.to_string()).collect() + } + + #[tokio::test] + async fn sync_pushes_full_deployment_set_per_device() { + use harmony::modules::fleet::secret_access::InMemoryDeviceSecretAccess; + + let mut s = FleetState::default(); + s.owned_targets.insert(dn("web"), devs(&["pi-01", "pi-02"])); + s.owned_targets.insert(dn("db"), devs(&["pi-01"])); + let state: SharedFleetState = Arc::new(Mutex::new(s)); + + let inmem = Arc::new(InMemoryDeviceSecretAccess::new()); + let access: Arc = inmem.clone(); + + sync_secret_access( + &state, + Some(&access), + ["pi-01".to_string(), "pi-02".to_string()], + ) + .await; + + // pi-01 runs both deployments; the impl receives the full set. + assert_eq!( + inmem.grants_for(&DeviceId::from("pi-01")), + vec![dn("db"), dn("web")] + ); + assert_eq!(inmem.grants_for(&DeviceId::from("pi-02")), vec![dn("web")]); + } + + #[tokio::test] + async fn sync_clears_access_for_unmatched_device() { + use harmony::modules::fleet::secret_access::InMemoryDeviceSecretAccess; + + let inmem = Arc::new(InMemoryDeviceSecretAccess::new()); + let access: Arc = inmem.clone(); + access + .set_device_grants(&[(DeviceId::from("pi-01"), vec![dn("web")])]) + .await + .unwrap(); + + // owned_targets no longer contains pi-01 (device deleted / no match). + let state: SharedFleetState = Arc::new(Mutex::new(FleetState::default())); + sync_secret_access(&state, Some(&access), std::iter::once("pi-01".to_string())).await; + + assert!(inmem.grants_for(&DeviceId::from("pi-01")).is_empty()); + } + #[test] fn matched_devices_picks_by_label() { let mut ml = BTreeMap::new(); diff --git a/fleet/harmony-fleet-operator/src/main.rs b/fleet/harmony-fleet-operator/src/main.rs index d4ff8e4b..edd74f17 100644 --- a/fleet/harmony-fleet-operator/src/main.rs +++ b/fleet/harmony-fleet-operator/src/main.rs @@ -3,6 +3,8 @@ mod controller; mod frontend; mod service; +use harmony::modules::fleet::secret_access::DeviceSecretAccess; +use harmony::modules::openbao::OpenBaoDeviceSecretAccess; use harmony_fleet_operator::{device_reconciler, fleet_aggregator}; use anyhow::{Context, Result}; @@ -15,6 +17,7 @@ use harmony_reconciler_contracts::BUCKET_DESIRED_STATE; use kube::Client; #[cfg(feature = "web-frontend")] use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; #[derive(Parser)] @@ -53,6 +56,12 @@ struct Cli { global = true )] credentials_toml: String, + + #[arg(long, env = "OPENBAO_URL", global = true)] + openbao_url: Option, + + #[arg(long, env = "OPENBAO_TOKEN", global = true)] + openbao_token: Option, } #[derive(Subcommand)] @@ -95,7 +104,16 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match cli.command.unwrap_or(Command::Run) { - Command::Run => run(&cli.nats_url, &cli.kv_bucket, &cli.credentials_toml).await, + Command::Run => { + run( + &cli.nats_url, + &cli.kv_bucket, + &cli.credentials_toml, + &cli.openbao_url, + &cli.openbao_token, + ) + .await + } #[cfg(feature = "web-frontend")] Command::ServeWeb { mock, @@ -152,7 +170,13 @@ async fn serve_web( .await } -async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> { +async fn run( + nats_url: &str, + bucket: &str, + credentials_toml: &str, + openbao_url: &Option, + openbao_token: &Option, +) -> Result<()> { let nats = connect_with_retry(nats_url, credentials_toml).await?; tracing::info!(url = %nats_url, "connected to NATS"); let js = jetstream::new(nats); @@ -165,6 +189,19 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> tracing::info!(bucket = %bucket, "KV bucket ready"); let client = Client::try_default().await?; + let secret_access: Option> = match (openbao_url, openbao_token) { + (Some(url), Some(token)) => { + tracing::info!(url = %url, "OpenBao secret-access sync enabled"); + Some(Arc::new(OpenBaoDeviceSecretAccess::new( + url.clone(), + token.clone(), + ))) + } + _ => { + tracing::warn!("OPENBAO_URL or OPENBAO_TOKEN not set; secret-access sync disabled"); + None + } + }; // Three concurrent tasks: // controller — CR validation + finalizer-cleanup @@ -179,7 +216,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> tokio::select! { r = controller::run(ctl_client, desired_state_kv) => r, r = device_reconciler::run(dr_client, dr_js) => r, - r = fleet_aggregator::run(client, js) => r, + r = fleet_aggregator::run(client, js, secret_access) => r, } } diff --git a/harmony/src/modules/fleet/mod.rs b/harmony/src/modules/fleet/mod.rs index e7c830b2..71a99e75 100644 --- a/harmony/src/modules/fleet/mod.rs +++ b/harmony/src/modules/fleet/mod.rs @@ -23,6 +23,7 @@ pub mod assets; pub mod libvirt_pool; pub mod operator; pub mod preflight; +pub mod secret_access; mod setup_score; #[cfg(all(feature = "kvm", unix))] mod vm_score; diff --git a/harmony/src/modules/fleet/secret_access.rs b/harmony/src/modules/fleet/secret_access.rs new file mode 100644 index 00000000..a470cc13 --- /dev/null +++ b/harmony/src/modules/fleet/secret_access.rs @@ -0,0 +1,140 @@ +//! Device → secret access control: the domain boundary the fleet +//! operator drives when device membership changes. +//! +//! A device may read the secrets of the deployments it currently runs. +//! How a "deployment" maps onto a backend's access primitive (an OpenBao +//! policy, a Vault role, …) is the *implementation's* concern; callers +//! speak only devices and deployments. That separation is what lets the +//! operator be unit-tested against [`InMemoryDeviceSecretAccess`] with no +//! live secret store — the production backend is +//! [`crate::modules::openbao::OpenBaoDeviceSecretAccess`]. + +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Mutex; + +use async_trait::async_trait; +use harmony_reconciler_contracts::DeploymentName; +use harmony_types::id::Id; + +/// Identifies a fleet device — the same `Id` the wire contracts carry as +/// `device_id`. +pub type DeviceId = Id; + +#[derive(Debug, thiserror::Error)] +pub enum SecretAccessError { + #[error("secret access backend error: {0}")] + Backend(String), +} + +/// Sets which deployments' secrets a device may read. +/// +/// One selector or deployment change can touch many devices at once, so +/// the operation is batched; an implementation chunks large batches as +/// its backend requires (you can neither make a million calls nor send a +/// million-element body at once). +#[async_trait] +pub trait DeviceSecretAccess: Send + Sync { + /// Set the *complete* set of deployments each listed device may read + /// secrets for. Absolute and idempotent: afterwards a device can + /// access exactly the deployments given — extras are revoked, an + /// empty list clears all access. Devices not listed are untouched. + async fn set_device_grants( + &self, + grants: &[(DeviceId, Vec)], + ) -> Result<(), SecretAccessError>; +} + +/// In-memory [`DeviceSecretAccess`] for tests: records the latest grant +/// set per device so a test can assert the operator synced the right +/// access on each flow, with nothing deployed. +#[derive(Debug, Default)] +pub struct InMemoryDeviceSecretAccess { + grants: Mutex>>, +} + +impl InMemoryDeviceSecretAccess { + pub fn new() -> Self { + Self::default() + } + + /// Current grants for `device`, sorted. Empty when the device has no + /// access (never granted, or cleared). + pub fn grants_for(&self, device: &DeviceId) -> Vec { + self.grants + .lock() + .unwrap() + .get(device) + .map(|s| s.iter().cloned().collect()) + .unwrap_or_default() + } +} + +#[async_trait] +impl DeviceSecretAccess for InMemoryDeviceSecretAccess { + async fn set_device_grants( + &self, + grants: &[(DeviceId, Vec)], + ) -> Result<(), SecretAccessError> { + let mut store = self.grants.lock().unwrap(); + for (device, deployments) in grants { + if deployments.is_empty() { + store.remove(device); + } else { + store.insert(device.clone(), deployments.iter().cloned().collect()); + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dn(s: &str) -> DeploymentName { + DeploymentName::try_new(s).expect("valid test name") + } + + #[tokio::test] + async fn sets_overwrites_and_clears_grants() { + let access = InMemoryDeviceSecretAccess::new(); + let dev = DeviceId::from("pi-01"); + + access + .set_device_grants(&[(dev.clone(), vec![dn("web"), dn("db")])]) + .await + .unwrap(); + assert_eq!(access.grants_for(&dev), vec![dn("db"), dn("web")]); + + // Absolute, not additive: a smaller set revokes the rest. + access + .set_device_grants(&[(dev.clone(), vec![dn("web")])]) + .await + .unwrap(); + assert_eq!(access.grants_for(&dev), vec![dn("web")]); + + // Empty clears all access. + access + .set_device_grants(&[(dev.clone(), vec![])]) + .await + .unwrap(); + assert!(access.grants_for(&dev).is_empty()); + } + + #[tokio::test] + async fn batch_touches_only_listed_devices() { + let access = InMemoryDeviceSecretAccess::new(); + let a = DeviceId::from("a"); + let b = DeviceId::from("b"); + access + .set_device_grants(&[(a.clone(), vec![dn("web")])]) + .await + .unwrap(); + access + .set_device_grants(&[(b.clone(), vec![dn("db")])]) + .await + .unwrap(); + assert_eq!(access.grants_for(&a), vec![dn("web")]); + assert_eq!(access.grants_for(&b), vec![dn("db")]); + } +} diff --git a/harmony/src/modules/openbao/identity.rs b/harmony/src/modules/openbao/identity.rs new file mode 100644 index 00000000..ff062179 --- /dev/null +++ b/harmony/src/modules/openbao/identity.rs @@ -0,0 +1,201 @@ +//! OpenBao-backed [`DeviceSecretAccess`]. +//! +//! A device's secret access *is* its OpenBao identity entity's policy +//! list. We map one deployment → one policy named `deployment-` +//! (created by [`super::OpenbaoSetupScore`]) and set the entity's +//! policies to exactly the device's current deployments. +//! +//! Why entity-by-policy and not a JWT claim: the device logs in once and +//! keeps its token; reassigning it only rewrites the entity's policy +//! list, which its existing token sees on the next call — no re-auth, no +//! thundering herd on bulk reassignment. +//! +//! Entity resolution is the subtle part. OpenBao's JWT auth +//! auto-provisions an entity (random name) + alias (name = the +//! `user_claim` value, i.e. the device id) on a device's first login, so +//! addressing an entity by a name *we* invent would write to a +//! different, alias-less entity the device never uses. We instead +//! resolve device id → entity via its alias, and create the +//! entity + alias ourselves when the device hasn't logged in yet (a +//! later login reuses the same alias). Both paths converge on writing +//! policies by canonical entity id. + +use async_trait::async_trait; +use reqwest::StatusCode; +use serde_json::json; +use tokio::sync::OnceCell; + +use crate::modules::fleet::secret_access::{DeviceId, DeviceSecretAccess, SecretAccessError}; +use harmony_reconciler_contracts::DeploymentName; + +const DEFAULT_JWT_MOUNT: &str = "jwt"; + +pub struct OpenBaoDeviceSecretAccess { + client: reqwest::Client, + base_url: String, + token: String, + jwt_mount: String, + /// JWT auth mount accessor, resolved once and reused — needed to bind + /// an entity alias to the device's login identity. + jwt_accessor: OnceCell, +} + +impl OpenBaoDeviceSecretAccess { + pub fn new(base_url: String, token: String) -> Self { + Self::with_jwt_mount(base_url, token, DEFAULT_JWT_MOUNT.to_string()) + } + + pub fn with_jwt_mount(base_url: String, token: String, jwt_mount: String) -> Self { + Self { + client: reqwest::Client::new(), + base_url: base_url.trim_end_matches('/').to_string(), + token, + jwt_mount, + jwt_accessor: OnceCell::new(), + } + } + + fn err(context: impl std::fmt::Display, e: impl std::fmt::Display) -> SecretAccessError { + SecretAccessError::Backend(format!("{context}: {e}")) + } + + async fn post( + &self, + path: &str, + body: serde_json::Value, + ) -> Result { + self.client + .post(format!("{}/v1/{path}", self.base_url)) + .header("X-Vault-Token", &self.token) + .json(&body) + .send() + .await + .map_err(|e| Self::err(format!("POST {path}"), e)) + } + + /// JWT mount accessor from `sys/auth`, cached for the client's life. + async fn jwt_accessor(&self) -> Result<&str, SecretAccessError> { + self.jwt_accessor + .get_or_try_init(|| async { + let body: serde_json::Value = self + .client + .get(format!("{}/v1/sys/auth", self.base_url)) + .header("X-Vault-Token", &self.token) + .send() + .await + .map_err(|e| Self::err("GET sys/auth", e))? + .error_for_status() + .map_err(|e| Self::err("GET sys/auth", e))? + .json() + .await + .map_err(|e| Self::err("parse sys/auth", e))?; + // sys/auth nests mounts under `data` over the HTTP API but + // emits them at the document root via the CLI; tolerate both. + let mount_key = format!("{}/", self.jwt_mount); + body.get("data") + .and_then(|d| d.get(&mount_key)) + .or_else(|| body.get(&mount_key)) + .and_then(|m| m.get("accessor")) + .and_then(|a| a.as_str()) + .map(str::to_string) + .ok_or_else(|| { + Self::err( + "resolve jwt accessor", + format!("mount '{mount_key}' not found in sys/auth"), + ) + }) + }) + .await + .map(String::as_str) + } + + /// Canonical entity id for `device`, resolved via its login alias. + /// Creates the entity + alias when the device hasn't logged in yet so + /// access can be granted ahead of first login; a later login reuses + /// the same alias. + async fn entity_id(&self, device: &str) -> Result { + let accessor = self.jwt_accessor().await?.to_string(); + + // Common path: the alias already exists (device logged in, or we + // created it on a previous sync). + let resp = self + .post( + "identity/lookup/entity", + json!({ "alias_name": device, "alias_mount_accessor": accessor }), + ) + .await?; + if resp.status().is_success() { + // A miss returns 204 with an empty body, not an error status. + let text = resp + .text() + .await + .map_err(|e| Self::err("read entity lookup", e))?; + if !text.is_empty() { + let body: serde_json::Value = + serde_json::from_str(&text).map_err(|e| Self::err("parse entity lookup", e))?; + if let Some(id) = body + .get("data") + .and_then(|d| d.get("id")) + .and_then(|v| v.as_str()) + { + return Ok(id.to_string()); + } + } + } + + // Miss — create the entity (named after the device) and bind the + // alias to its JWT login identity. + let body: serde_json::Value = self + .post("identity/entity", json!({ "name": device })) + .await? + .error_for_status() + .map_err(|e| Self::err("create entity", e))? + .json() + .await + .map_err(|e| Self::err("parse create entity", e))?; + let id = body + .get("data") + .and_then(|d| d.get("id")) + .and_then(|v| v.as_str()) + .ok_or_else(|| Self::err("create entity", "response missing data.id"))? + .to_string(); + + let alias = self + .post( + "identity/entity-alias", + json!({ "name": device, "canonical_id": id, "mount_accessor": accessor }), + ) + .await?; + // A concurrent login may have just auto-provisioned the alias; + // OpenBao answers 400 "alias already exists" — treat as success. + if !alias.status().is_success() && alias.status() != StatusCode::BAD_REQUEST { + return Err(Self::err("create entity alias", alias.status())); + } + Ok(id) + } +} + +#[async_trait] +impl DeviceSecretAccess for OpenBaoDeviceSecretAccess { + async fn set_device_grants( + &self, + grants: &[(DeviceId, Vec)], + ) -> Result<(), SecretAccessError> { + for (device, deployments) in grants { + let device = String::from(device.clone()); + let policies: Vec = deployments + .iter() + .map(|d| format!("deployment-{}", d.as_str())) + .collect(); + let id = self.entity_id(&device).await?; + self.post( + &format!("identity/entity/id/{id}"), + json!({ "policies": policies }), + ) + .await? + .error_for_status() + .map_err(|e| Self::err(format!("set policies for {device}"), e))?; + } + Ok(()) + } +} diff --git a/harmony/src/modules/openbao/mod.rs b/harmony/src/modules/openbao/mod.rs index 8b612d9f..67315ed9 100644 --- a/harmony/src/modules/openbao/mod.rs +++ b/harmony/src/modules/openbao/mod.rs @@ -1,3 +1,4 @@ +pub mod identity; pub mod setup; use std::str::FromStr; @@ -13,6 +14,7 @@ use crate::{ topology::{HelmCommand, K8sclient, Topology}, }; +pub use identity::OpenBaoDeviceSecretAccess; pub use setup::{OpenbaoJwtAuth, OpenbaoPolicy, OpenbaoSetupScore, OpenbaoUser, cached_root_token}; const DEFAULT_NAMESPACE: &str = "openbao"; diff --git a/harmony/src/modules/openbao/setup.rs b/harmony/src/modules/openbao/setup.rs index 5c6c62f4..a4b8d40a 100644 --- a/harmony/src/modules/openbao/setup.rs +++ b/harmony/src/modules/openbao/setup.rs @@ -40,7 +40,6 @@ pub struct OpenbaoJwtAuth { pub role_name: String, pub bound_audiences: String, pub user_claim: String, - pub policies: Vec, pub ttl: String, pub max_ttl: String, } @@ -479,7 +478,6 @@ impl OpenbaoSetupInterpret { } } - let policies = jwt.policies.join(","); self.bao( k8s, root_token, @@ -490,7 +488,6 @@ impl OpenbaoSetupInterpret { "role_type=jwt", &format!("bound_audiences={}", jwt.bound_audiences), &format!("user_claim={}", jwt.user_claim), - &format!("policies={}", policies), &format!("ttl={}", jwt.ttl), &format!("max_ttl={}", jwt.max_ttl), "token_type=service", @@ -504,10 +501,6 @@ impl OpenbaoSetupInterpret { )) })?; - info!( - "[OpenbaoSetup] JWT role '{}' created (policies: {})", - jwt.role_name, policies - ); Ok(()) } } diff --git a/harmony_config/src/lib.rs b/harmony_config/src/lib.rs index dde30471..fabe00ee 100644 --- a/harmony_config/src/lib.rs +++ b/harmony_config/src/lib.rs @@ -210,19 +210,20 @@ async fn openbao_from_env(namespace: &str) -> Option> { }; let env = |k: &str| std::env::var(k).ok(); - let store = harmony_secret::OpenbaoSecretStore::new( - url, - env("OPENBAO_KV_MOUNT").unwrap_or_else(|| "secret".to_string()), - env("OPENBAO_AUTH_MOUNT").unwrap_or_else(|| "jwt".to_string()), - env("OPENBAO_SKIP_TLS").as_deref() == Some("true"), - env("OPENBAO_TOKEN"), - env("OPENBAO_USERNAME"), - env("OPENBAO_PASSWORD"), - env("HARMONY_SSO_URL"), - env("HARMONY_SSO_CLIENT_ID"), - None, - None, - ) + let store = harmony_secret::OpenbaoSecretStore::new(harmony_secret::OpenbaoStoreOptions { + base_url: url, + kv_mount: env("OPENBAO_KV_MOUNT").unwrap_or_else(|| "secret".to_string()), + auth_mount: env("OPENBAO_AUTH_MOUNT").unwrap_or_else(|| "jwt".to_string()), + skip_tls: env("OPENBAO_SKIP_TLS").as_deref() == Some("true"), + token: env("OPENBAO_TOKEN"), + username: env("OPENBAO_USERNAME"), + password: env("OPENBAO_PASSWORD"), + zitadel_sso_url: env("HARMONY_SSO_URL"), + zitadel_client_id: env("HARMONY_SSO_CLIENT_ID"), + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await; match store { diff --git a/harmony_secret/Cargo.toml b/harmony_secret/Cargo.toml index 8d8b9356..266eb1a6 100644 --- a/harmony_secret/Cargo.toml +++ b/harmony_secret/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true unexpected_cfgs = { level = "warn", check-cfg = ['cfg(secrete2etest)'] } [dependencies] +harmony_zitadel_auth = { path = "../harmony_zitadel_auth" } harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } serde = { version = "1.0.209", features = ["derive", "rc"] } serde_json = "1.0.127" diff --git a/harmony_secret/src/lib.rs b/harmony_secret/src/lib.rs index cda2786f..99d32f3c 100644 --- a/harmony_secret/src/lib.rs +++ b/harmony_secret/src/lib.rs @@ -26,7 +26,7 @@ use serde::{Serialize, de::DeserializeOwned}; use std::fmt; use store::InfisicalSecretStore; use store::LocalFileSecretStore; -pub use store::OpenbaoSecretStore; +pub use store::{OpenbaoSecretStore, OpenbaoStoreOptions}; use thiserror::Error; use tokio::sync::OnceCell; @@ -85,19 +85,20 @@ async fn init_secret_manager() -> SecretManager { let store: Box = match store_type.as_str() { "file" => Box::new(LocalFileSecretStore), "openbao" | "vault" => { - let store = OpenbaoSecretStore::new( - OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"), - OPENBAO_KV_MOUNT.clone(), - OPENBAO_AUTH_MOUNT.clone(), - *OPENBAO_SKIP_TLS, - OPENBAO_TOKEN.clone(), - OPENBAO_USERNAME.clone(), - OPENBAO_PASSWORD.clone(), - HARMONY_SSO_URL.clone(), - HARMONY_SSO_CLIENT_ID.clone(), - None, - None, - ) + let store = OpenbaoSecretStore::new(OpenbaoStoreOptions { + base_url: OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"), + kv_mount: OPENBAO_KV_MOUNT.clone(), + auth_mount: OPENBAO_AUTH_MOUNT.clone(), + skip_tls: *OPENBAO_SKIP_TLS, + token: OPENBAO_TOKEN.clone(), + username: OPENBAO_USERNAME.clone(), + password: OPENBAO_PASSWORD.clone(), + zitadel_sso_url: HARMONY_SSO_URL.clone(), + zitadel_client_id: HARMONY_SSO_CLIENT_ID.clone(), + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await .expect("Failed to initialize Openbao/Vault secret store"); Box::new(store) diff --git a/harmony_secret/src/store/mod.rs b/harmony_secret/src/store/mod.rs index 8179291d..9ea55a4c 100644 --- a/harmony_secret/src/store/mod.rs +++ b/harmony_secret/src/store/mod.rs @@ -5,4 +5,4 @@ pub mod zitadel; pub use infisical::InfisicalSecretStore; pub use local_file::LocalFileSecretStore; -pub use openbao::OpenbaoSecretStore; +pub use openbao::{OpenbaoSecretStore, OpenbaoStoreOptions}; diff --git a/harmony_secret/src/store/openbao.rs b/harmony_secret/src/store/openbao.rs index d0110cd3..ccddf77d 100644 --- a/harmony_secret/src/store/openbao.rs +++ b/harmony_secret/src/store/openbao.rs @@ -1,10 +1,13 @@ use crate::{SecretStore, SecretStoreError}; use async_trait::async_trait; +use harmony_zitadel_auth::{MachineKeyFile, ZitadelJwtBearer}; use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; use std::fmt::Debug; use std::fs; use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::Mutex; use vaultrs::auth; use vaultrs::client::{VaultClient, VaultClientSettingsBuilder}; use vaultrs::kv2; @@ -30,21 +33,47 @@ impl From for AuthInfo { } pub struct OpenbaoSecretStore { - client: VaultClient, + client: Mutex, kv_mount: String, auth_mount: String, + jwt_bearer: Option>, + jwt_role: Option, + jwt_auth_mount: Option, + base_url: String, + skip_tls: bool, } impl Debug for OpenbaoSecretStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OpenbaoSecretStore") - .field("client", &self.client.settings) .field("kv_mount", &self.kv_mount) .field("auth_mount", &self.auth_mount) .finish() } } +pub struct OpenbaoStoreOptions { + pub base_url: String, + pub kv_mount: String, + pub auth_mount: String, + pub skip_tls: bool, + pub token: Option, + pub username: Option, + pub password: Option, + pub zitadel_sso_url: Option, + pub zitadel_client_id: Option, + pub zitadel_jwt_bearer: Option, + pub jwt_role: Option, + pub jwt_auth_mount: Option, +} + +pub struct ZitadelJwtBearerConfig { + pub key_path: Option, + pub key_json: Option, + pub oidc_issuer_url: String, + pub audience: String, +} + impl OpenbaoSecretStore { /// Creates a new Openbao/Vault secret store with authentication. /// @@ -52,34 +81,35 @@ impl OpenbaoSecretStore { /// - `auth_mount`: The auth method mount path (e.g., `"userpass"`). Used for userpass authentication. /// - `zitadel_sso_url`: Zitadel OIDC server URL (e.g., `"https://sso.example.com"`). If provided along with `zitadel_client_id`, Zitadel OIDC device flow will be attempted. /// - `zitadel_client_id`: OIDC client ID registered in Zitadel. - // TODO(clippy): refactor to a typed options struct - #[allow(clippy::too_many_arguments)] - pub async fn new( - base_url: String, - kv_mount: String, - auth_mount: String, - skip_tls: bool, - token: Option, - username: Option, - password: Option, - zitadel_sso_url: Option, - zitadel_client_id: Option, - jwt_role: Option, - jwt_auth_mount: Option, - ) -> Result { - info!("OPENBAO_STORE: Initializing client for URL: {base_url}"); + pub async fn new(options: OpenbaoStoreOptions) -> Result { + info!( + "OPENBAO_STORE: Initializing client for URL: {}", + options.base_url + ); // 1. If token is provided via env var, use it directly - if let Some(t) = token { + if let Some(t) = &options.token { debug!("OPENBAO_STORE: Using token from environment variable"); - return Self::with_token(&base_url, skip_tls, &t, &kv_mount, &auth_mount); + return Self::with_token( + &options.base_url, + options.skip_tls, + t, + &options.kv_mount, + &options.auth_mount, + ); } // 2. Try to load cached token - let cache_path = Self::get_token_cache_path(&base_url); + let cache_path = Self::get_token_cache_path(&options.base_url); if let Ok(mut cached_token) = Self::load_cached_token(&cache_path) { debug!("OPENBAO_STORE: Found cached token, validating..."); - if Self::validate_token(&base_url, skip_tls, &cached_token.client_token).await { + if Self::validate_token( + &options.base_url, + options.skip_tls, + &cached_token.client_token, + ) + .await + { info!("OPENBAO_STORE: Cached token is valid"); // Best-effort renewal: extend the server-side lease so @@ -89,7 +119,13 @@ impl OpenbaoSecretStore { // blip, etc.) doesn't block the caller — we just keep // the validated cached token as-is and let the next // invocation try again. - match Self::renew_self(&base_url, skip_tls, &cached_token.client_token).await { + match Self::renew_self( + &options.base_url, + options.skip_tls, + &cached_token.client_token, + ) + .await + { Ok(new_lease) => { info!("OPENBAO_STORE: cached token renewed, lease_duration={new_lease}s"); cached_token.lease_duration = Some(new_lease); @@ -106,26 +142,59 @@ impl OpenbaoSecretStore { } return Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &cached_token.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ); } warn!("OPENBAO_STORE: Cached token is invalid or expired"); } - // 3. Try Zitadel OIDC device flow if configured - if let (Some(sso_url), Some(client_id)) = (zitadel_sso_url, zitadel_client_id) { + // 3. Try Zitadel JWT-bearer for headless devices if configured + if let Some(jwt_bearer_config) = &options.zitadel_jwt_bearer { + info!("OPENBAO_STORE: Attempting Zitadel JWT-bearer authentication"); + match Self::zitadel_jwt_bearer(jwt_bearer_config, options.skip_tls) { + Ok(jwt_bearer) => { + let store = Self::with_token_and_jwt( + &options.base_url, + options.skip_tls, + "", + &options.kv_mount, + &options.auth_mount, + Some(Arc::new(jwt_bearer)), + options.jwt_role.clone(), + options.jwt_auth_mount.clone(), + )?; + match store.refresh_auth().await { + Ok(()) => { + info!("OPENBAO_STORE: Zitadel JWT-bearer authentication successful"); + return Ok(store); + } + Err(e) => warn!( + "OPENBAO_STORE: Zitadel JWT-bearer failed: {e}, trying other auth methods" + ), + } + } + Err(e) => warn!( + "OPENBAO_STORE: Zitadel JWT-bearer setup failed: {e}, trying other auth methods" + ), + } + } + + // 4. Try Zitadel OIDC device flow if configured + if let (Some(sso_url), Some(client_id)) = + (&options.zitadel_sso_url, &options.zitadel_client_id) + { info!("OPENBAO_STORE: Attempting Zitadel OIDC device flow"); match Self::authenticate_zitadel_oidc( - &base_url, - &sso_url, - &client_id, - skip_tls, - jwt_role.as_deref(), - jwt_auth_mount.as_deref(), + &options.base_url, + sso_url, + client_id, + options.skip_tls, + options.jwt_role.as_deref(), + options.jwt_auth_mount.as_deref(), ) .await { @@ -141,11 +210,11 @@ impl OpenbaoSecretStore { warn!("OPENBAO_STORE: Failed to cache OIDC token: {e}"); } return Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &auth_info.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ); } Err(e) => { @@ -154,8 +223,8 @@ impl OpenbaoSecretStore { } } - // 4. Authenticate with username/password - let (user, pass) = match (username, password) { + // 5. Authenticate with username/password + let (user, pass) = match (&options.username, &options.password) { (Some(u), Some(p)) => (u, p), _ => { return Err(SecretStoreError::Store( @@ -167,8 +236,14 @@ impl OpenbaoSecretStore { } }; - let token = - Self::authenticate_userpass(&base_url, &auth_mount, skip_tls, &user, &pass).await?; + let token = Self::authenticate_userpass( + &options.base_url, + &options.auth_mount, + options.skip_tls, + user, + pass, + ) + .await?; // Cache the token if let Err(e) = Self::cache_token(&cache_path, &token) { @@ -176,11 +251,11 @@ impl OpenbaoSecretStore { } Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &token.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ) } @@ -206,14 +281,112 @@ impl OpenbaoSecretStore { .map_err(|e| SecretStoreError::Store(e.into())) } - /// Create a client with an existing token - fn with_token( + pub async fn refresh_auth(&self) -> Result<(), SecretStoreError> { + let bearer = self.jwt_bearer.as_ref().ok_or_else(|| { + SecretStoreError::Store("Zitadel JWT-bearer auth is not configured".into()) + })?; + let role = self + .jwt_role + .as_deref() + .ok_or_else(|| SecretStoreError::Store("OpenBao JWT role is not configured".into()))?; + let jwt_auth_mount = self.jwt_auth_mount.as_deref().unwrap_or("jwt"); + + bearer.invalidate_cache(); + let jwt = bearer + .bearer_token() + .await + .map_err(|e| SecretStoreError::Store(e.into()))?; + let token = + Self::jwt_login(&self.base_url, self.skip_tls, jwt_auth_mount, role, &jwt).await?; + let client = Self::build_vault_client(&self.base_url, self.skip_tls, &token)?; + + *self.client.lock().await = client; + Ok(()) + } + + fn zitadel_jwt_bearer( + config: &ZitadelJwtBearerConfig, + danger_accept_invalid_certs: bool, + ) -> Result { + let key = match config.key_json.as_deref().map(str::trim) { + Some(raw) if !raw.is_empty() => serde_json::from_str::(raw), + _ => { + let path = config.key_path.as_ref().ok_or_else(|| { + SecretStoreError::Store( + "Zitadel JWT-bearer requires key_json or key_path".into(), + ) + })?; + let raw = + fs::read_to_string(path).map_err(|e| SecretStoreError::Store(e.into()))?; + serde_json::from_str::(&raw) + } + } + .map_err(|e| SecretStoreError::Store(e.into()))?; + + ZitadelJwtBearer::new( + key, + config.oidc_issuer_url.clone(), + config.audience.clone(), + danger_accept_invalid_certs, + ) + .map_err(|e| SecretStoreError::Store(e.into())) + } + + async fn jwt_login( + base_url: &str, + skip_tls: bool, + jwt_auth_mount: &str, + role: &str, + jwt: &str, + ) -> Result { + let mut builder = reqwest::Client::builder(); + if skip_tls { + builder = builder.danger_accept_invalid_certs(true); + } + let client = builder + .build() + .map_err(|e| SecretStoreError::Store(e.into()))?; + let url = format!( + "{}/v1/auth/{}/login", + base_url.trim_end_matches('/'), + jwt_auth_mount.trim_matches('/') + ); + let response = client + .post(&url) + .json(&serde_json::json!({ "role": role, "jwt": jwt })) + .send() + .await + .map_err(|e| SecretStoreError::Store(e.into()))?; + let status = response.status(); + let body = response + .text() + .await + .map_err(|e| SecretStoreError::Store(e.into()))?; + if !status.is_success() { + return Err(SecretStoreError::Store( + format!("OpenBao JWT login returned {status}: {body}").into(), + )); + } + + let parsed: serde_json::Value = + serde_json::from_str(&body).map_err(|e| SecretStoreError::Store(e.into()))?; + parsed + .get("auth") + .and_then(|auth| auth.get("client_token")) + .and_then(|token| token.as_str()) + .map(str::to_string) + .ok_or_else(|| { + SecretStoreError::Store( + format!("OpenBao JWT login response missing auth.client_token: {body}").into(), + ) + }) + } + + fn build_vault_client( base_url: &str, skip_tls: bool, token: &str, - kv_mount: &str, - auth_mount: &str, - ) -> Result { + ) -> Result { let mut settings = VaultClientSettingsBuilder::default(); settings.address(base_url).token(token); @@ -222,17 +395,48 @@ impl OpenbaoSecretStore { settings.verify(false); } - let client = VaultClient::new( + VaultClient::new( settings .build() .map_err(|e| SecretStoreError::Store(Box::new(e)))?, ) - .map_err(|e| SecretStoreError::Store(Box::new(e)))?; + .map_err(|e| SecretStoreError::Store(Box::new(e))) + } + + /// Create a client with an existing token + fn with_token( + base_url: &str, + skip_tls: bool, + token: &str, + kv_mount: &str, + auth_mount: &str, + ) -> Result { + Self::with_token_and_jwt( + base_url, skip_tls, token, kv_mount, auth_mount, None, None, None, + ) + } + + fn with_token_and_jwt( + base_url: &str, + skip_tls: bool, + token: &str, + kv_mount: &str, + auth_mount: &str, + jwt_bearer: Option>, + jwt_role: Option, + jwt_auth_mount: Option, + ) -> Result { + let client = Self::build_vault_client(base_url, skip_tls, token)?; Ok(Self { - client, + client: Mutex::new(client), kv_mount: kv_mount.to_string(), auth_mount: auth_mount.to_string(), + base_url: base_url.to_string(), + skip_tls, + jwt_bearer, + jwt_role, + jwt_auth_mount, }) } @@ -411,18 +615,20 @@ impl SecretStore for OpenbaoSecretStore { info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'"); debug!("OPENBAO_STORE: Request path: {path}"); - let data: serde_json::Value = kv2::read(&self.client, &self.kv_mount, &path) - .await - .map_err(|e| { - if e.to_string().contains("does not exist") || e.to_string().contains("404") { - SecretStoreError::NotFound { - namespace: namespace.to_string(), - key: key.to_string(), + let client = self.client.lock().await; + let data: serde_json::Value = + kv2::read(&*client, &self.kv_mount, &path) + .await + .map_err(|e| { + if e.to_string().contains("does not exist") || e.to_string().contains("404") { + SecretStoreError::NotFound { + namespace: namespace.to_string(), + key: key.to_string(), + } + } else { + SecretStoreError::Store(Box::new(e)) } - } else { - SecretStoreError::Store(Box::new(e)) - } - })?; + })?; let value = data.get("value").and_then(|v| v.as_str()).ok_or_else(|| { SecretStoreError::Store("Secret does not contain expected 'value' field".into()) @@ -447,8 +653,9 @@ impl SecretStore for OpenbaoSecretStore { let data = serde_json::json!({ "value": value_str }); + let client = self.client.lock().await; - kv2::set(&self.client, &self.kv_mount, &path, &data) + kv2::set(&*client, &self.kv_mount, &path, &data) .await .map_err(|e| SecretStoreError::Store(Box::new(e)))?; diff --git a/harmony_zitadel_auth/src/axum_login_flow.rs b/harmony_zitadel_auth/src/axum_login_flow.rs index 96a9391f..0a61613e 100644 --- a/harmony_zitadel_auth/src/axum_login_flow.rs +++ b/harmony_zitadel_auth/src/axum_login_flow.rs @@ -9,8 +9,8 @@ use base64::engine::general_purpose::URL_SAFE_NO_PAD; use crate::config::ZitadelAuthConfig; use crate::jwks::JwksCache; use crate::login::{ - AuthCallbackQuery, RawAuthCallbackQuery, TokenResponse, build_login_attempt, build_logout_url, - exchange_code_for_token, jwt_exp, validate_callback_state, + AuthCallbackQuery, LoginQuery, RawAuthCallbackQuery, TokenResponse, build_login_attempt, + build_logout_url, exchange_code_for_token, jwt_exp, valid_next, validate_callback_state, }; use crate::session::LoginAttemptCookie; @@ -23,8 +23,9 @@ pub const HARMONY_SESSION_COOKIE: &str = "harmony_fleet_session"; pub async fn login_handler( jar: PrivateCookieJar, State(config): State, + Query(query): Query, ) -> Response { - match build_login_response(jar, &config) { + match build_login_response(jar, &config, query) { Ok(r) => r.into_response(), Err(e) => auth_error_response(e), } @@ -33,9 +34,16 @@ pub async fn login_handler( fn build_login_response( jar: PrivateCookieJar, config: &ZitadelAuthConfig, + query: LoginQuery, ) -> Result { let attempt = build_login_attempt(config)?; - let cookie_payload = LoginAttemptCookie::from(&attempt); + let mut cookie_payload = LoginAttemptCookie::from(&attempt); + cookie_payload.next = Some( + query + .next + .filter(|next| valid_next(next)) + .unwrap_or_else(|| "/".to_string()), + ); let cookie_value = URL_SAFE_NO_PAD.encode(serde_json::to_vec(&cookie_payload)?); let mut builder = Cookie::build((LOGIN_ATTEMPT_COOKIE, cookie_value)) @@ -113,7 +121,12 @@ async fn build_callback_response( } let session_jar = session_jar.add(session_cookie(&tokens, config)); - Ok((jar, session_jar, Redirect::to("/")).into_response()) + let next = attempt + .next + .as_deref() + .filter(|next| valid_next(next)) + .unwrap_or("/"); + Ok((jar, session_jar, Redirect::to(next)).into_response()) } AuthCallbackQuery::Failure { error, diff --git a/harmony_zitadel_auth/src/config.rs b/harmony_zitadel_auth/src/config.rs index 52019426..34e81415 100644 --- a/harmony_zitadel_auth/src/config.rs +++ b/harmony_zitadel_auth/src/config.rs @@ -33,13 +33,13 @@ impl ZitadelAuthConfig { } } -pub const ZITADEL_BASE_ENV: &str = "FLEET_AUTH_ZITADEL_BASE"; +pub const ZITADEL_BASE_ENV: &str = "HARMONY_SSO_ZITADEL_BASE"; pub const BASE_URL_ENV: &str = "BASE_URL"; -pub const CLIENT_ID_ENV: &str = "FLEET_AUTH_CLIENT_ID"; -pub const SCOPE_ENV: &str = "FLEET_AUTH_SCOPE"; -pub const TRUSTED_AUDIENCES_ENV: &str = "FLEET_AUTH_TRUSTED_AUDIENCES"; -pub const LOGOUT_REDIRECT_URI_ENV: &str = "FLEET_AUTH_LOGOUT_REDIRECT_URI"; -pub const COOKIE_KEY_ENV: &str = "FLEET_OPERATOR_COOKIE_KEY_B64"; +pub const CLIENT_ID_ENV: &str = "HARMONY_SSO_CLIENT_ID"; +pub const SCOPE_ENV: &str = "HARMONY_SSO_SCOPE"; +pub const TRUSTED_AUDIENCES_ENV: &str = "HARMONY_SSO_TRUSTED_AUDIENCES"; +pub const LOGOUT_REDIRECT_URI_ENV: &str = "HARMONY_SSO_LOGOUT_REDIRECT_URI"; +pub const COOKIE_KEY_ENV: &str = "HARMONY_COOKIE_KEY_B64"; pub fn config_from_env() -> ZitadelAuthConfig { ZitadelAuthConfig { diff --git a/harmony_zitadel_auth/src/jwt_bearer.rs b/harmony_zitadel_auth/src/jwt_bearer.rs new file mode 100644 index 00000000..97743a91 --- /dev/null +++ b/harmony_zitadel_auth/src/jwt_bearer.rs @@ -0,0 +1,367 @@ +use serde::Deserialize; +use std::sync::Mutex; + +use anyhow::{Context, Result}; +use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader}; + +pub struct ZitadelJwtBearer { + key: MachineKeyFile, + oidc_issuer_url: String, + audience: String, + http: reqwest::Client, + cache: Mutex>, +} + +/// Refresh tokens this many seconds before their advertised expiry. +/// Five minutes leaves headroom for clock skew, slow networks, and +/// the round-trip cost of re-minting against Zitadel. +pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60; + +/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign +/// to authenticate to Zitadel's token endpoint). Zitadel rejects +/// assertions with `exp - iat > 60s`; one minute is the safe ceiling. +pub const ASSERTION_LIFETIME_SECS: i64 = 60; + +/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON` +/// machine key. The `key` is a PEM-encoded RSA private key. +#[derive(Debug, Clone, Deserialize)] +pub struct MachineKeyFile { + #[serde(rename = "type")] + pub _type: String, + #[serde(rename = "keyId")] + pub key_id: String, + pub key: String, + #[serde(rename = "userId")] + pub user_id: String, +} + +#[derive(Debug, Clone)] +pub struct CachedToken { + pub(crate) access_token: String, + /// Unix seconds at which the token is no longer trusted by + /// `cached_if_fresh`. Computed from the OAuth response's `expires_in` + /// and the local clock at mint time. + pub(crate) expires_at_unix: i64, +} + +#[derive(Deserialize)] +struct TokenResponse { + access_token: String, + #[serde(default)] + expires_in: Option, +} + +impl ZitadelJwtBearer { + pub fn new( + key: MachineKeyFile, + oidc_issuer_url: String, + audience: String, + danger_accept_invalid_certs: bool, + ) -> Result { + let http = reqwest::Client::builder() + .danger_accept_invalid_certs(danger_accept_invalid_certs) + .timeout(std::time::Duration::from_secs(10)) + .build() + .context("building HTTP client for Zitadel token endpoint")?; + + Ok(Self { + key, + oidc_issuer_url, + audience, + http, + cache: Mutex::new(None), + }) + } + + pub async fn bearer_token(&self) -> Result { + { + let now = chrono::Utc::now().timestamp(); + let guard = self.cache.lock().unwrap(); + if let Some(cached) = guard.as_ref() { + if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now { + return Ok(cached.access_token.clone()); + } + } + } + + let now = chrono::Utc::now().timestamp(); + let assertion = build_assertion(&self.key, &self.oidc_issuer_url, now)?; + let scope = build_scope(&self.audience); + let token_url = build_token_url(&self.oidc_issuer_url); + + let resp = self + .http + .post(&token_url) + .form(&[ + ( + "grant_type", + "urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(), + ), + ("assertion", assertion), + ("scope", scope), + ]) + .send() + .await + .with_context(|| format!("POST {token_url}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + anyhow::bail!("Zitadel token endpoint returned {status}: {body}"); + } + + let tr: TokenResponse = resp.json().await.context("parsing token response")?; + let expires_in = tr.expires_in.unwrap_or(3600); + let token = tr.access_token.clone(); + + *self.cache.lock().unwrap() = Some(CachedToken { + access_token: tr.access_token, + expires_at_unix: now + expires_in, + }); + + Ok(token) + } + + pub fn invalidate_cache(&self) { + *self.cache.lock().unwrap() = None; + } + + #[cfg(test)] + fn cached_if_fresh(&self) -> Option { + let now = chrono::Utc::now().timestamp(); + let guard = self.cache.lock().unwrap(); + let cached = guard.as_ref()?; + (cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now) + .then(|| cached.access_token.clone()) + } +} + +/// Build the JWT-bearer assertion. Split out from the network path so +/// the claims + header shape can be unit-tested without an HTTP server, +/// and split internally into the (pure) claim/header builders so they +/// can be unit-tested without an RSA private key fixture. +pub(crate) fn build_assertion( + key: &MachineKeyFile, + oidc_issuer_url: &str, + now: i64, +) -> Result { + let claims = build_assertion_claims(key, oidc_issuer_url, now); + let header = build_assertion_header(key); + let assertion = jsonwebtoken::encode( + &header, + &claims, + &EncodingKey::from_rsa_pem(key.key.as_bytes()) + .context("parsing RSA private key from machine key file")?, + ) + .context("signing JWT assertion")?; + Ok(assertion) +} + +/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId` +/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint +/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or +/// Zitadel rejects. +pub(crate) fn build_assertion_claims( + key: &MachineKeyFile, + oidc_issuer_url: &str, + now: i64, +) -> serde_json::Value { + serde_json::json!({ + "iss": key.user_id, + "sub": key.user_id, + "aud": oidc_issuer_url, + "exp": now + ASSERTION_LIFETIME_SECS, + "iat": now, + }) +} + +/// JWT header for the assertion. The `kid` tells Zitadel which of the +/// machine user's registered keys to verify the signature against. +pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader { + let mut header = JwtHeader::new(Algorithm::RS256); + header.kid = Some(key.key_id.clone()); + header +} + +/// Build the OAuth `scope` string for the token-bearer request. +/// +/// Three scopes are needed for the access token to be useful here: +/// +/// * `openid` — base OIDC requirement. +/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") — +/// tells Zitadel to include the role-claim block in the access +/// token. Without this, the callout sees "no authorized role +/// in token" even when the user has a project role grant. +/// * `urn:zitadel:iam:org:project:id::aud` (SINGULAR +/// "project") — adds to the access token's `aud` claim +/// so the callout's audience validation accepts the project +/// ID we're using as the JWT-bearer audience. +/// +/// The plural-vs-singular distinction is a Zitadel convention, +/// not a typo. Both scopes are required. +pub(crate) fn build_scope(audience: &str) -> String { + format!( + "openid \ + urn:zitadel:iam:org:projects:roles \ + urn:zitadel:iam:org:project:id:{audience}:aud" + ) +} + +/// Resolve the token endpoint URL, tolerating a trailing slash on +/// `oidc_issuer_url`. Without trimming, a configured issuer of +/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s. +pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String { + format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/')) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fake_key() -> MachineKeyFile { + MachineKeyFile { + _type: "serviceaccount".to_string(), + key_id: "kid-371358469099356247".to_string(), + key: "PEM-PLACEHOLDER".to_string(), + user_id: "uid-371358469065801815".to_string(), + } + } + + fn bearer() -> ZitadelJwtBearer { + ZitadelJwtBearer::new( + fake_key(), + "http://sso.fleet.local:8080".to_string(), + "366378028009259037".to_string(), + false, + ) + .unwrap() + } + + #[test] + fn cached_token_within_leeway_is_treated_as_expired() { + let now = chrono::Utc::now().timestamp(); + let about_to_expire = CachedToken { + access_token: "x".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1, + }; + assert!( + about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now, + "tokens within the leeway window must be considered expired" + ); + + let comfortable = CachedToken { + access_token: "x".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, + }; + assert!( + comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now, + "tokens with comfortable headroom must be cache-hits" + ); + } + + #[test] + fn cached_if_fresh_returns_some_when_outside_leeway() { + let bearer = bearer(); + let now = chrono::Utc::now().timestamp(); + *bearer.cache.lock().unwrap() = Some(CachedToken { + access_token: "fresh".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, + }); + + assert_eq!(bearer.cached_if_fresh(), Some("fresh".to_string())); + } + + #[test] + fn cached_if_fresh_returns_none_when_no_cache() { + assert_eq!(bearer().cached_if_fresh(), None); + } + + #[test] + fn assertion_claims_carry_iss_sub_aud_exp_iat() { + let now = 1_700_000_000; + let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now); + assert_eq!(claims["iss"], "uid-371358469065801815"); + assert_eq!(claims["sub"], "uid-371358469065801815"); + assert_eq!(claims["aud"], "http://sso.fleet.local:8080"); + assert_eq!(claims["iat"].as_i64(), Some(now)); + assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS)); + } + + #[test] + fn assertion_lifetime_locked_at_60_seconds() { + assert_eq!(ASSERTION_LIFETIME_SECS, 60); + } + + #[test] + fn assertion_header_carries_kid_and_rs256() { + let header = build_assertion_header(&fake_key()); + assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256); + assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247")); + } + + #[test] + fn scope_includes_plural_projects_roles() { + let s = build_scope("366378028009259037"); + assert!( + s.contains("urn:zitadel:iam:org:projects:roles"), + "scope must include the PLURAL projects-roles URN; got {s:?}" + ); + } + + #[test] + fn scope_audience_uses_singular_project_id_urn() { + let s = build_scope("366378028009259037"); + assert!( + s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"), + "scope must include the SINGULAR project:id::aud URN; got {s:?}" + ); + } + + #[test] + fn scope_includes_openid_base() { + let s = build_scope("any"); + assert!( + s.split_whitespace().any(|tok| tok == "openid"), + "scope must include `openid` as a standalone token; got {s:?}" + ); + } + + #[test] + fn token_url_appends_oauth_endpoint() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn token_url_strips_single_trailing_slash() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080/"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn token_url_strips_multiple_trailing_slashes() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080///"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn machine_key_file_parses_zitadel_json_shape() { + let raw = r#"{ + "type": "serviceaccount", + "keyId": "371358469099356247", + "key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n", + "userId": "371358469065801815" + }"#; + let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile"); + assert_eq!(parsed._type, "serviceaccount"); + assert_eq!(parsed.key_id, "371358469099356247"); + assert_eq!(parsed.user_id, "371358469065801815"); + assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY")); + } +} diff --git a/harmony_zitadel_auth/src/lib.rs b/harmony_zitadel_auth/src/lib.rs index f5cd0e78..eb3618c8 100644 --- a/harmony_zitadel_auth/src/lib.rs +++ b/harmony_zitadel_auth/src/lib.rs @@ -2,6 +2,7 @@ pub mod axum_login_flow; pub mod config; pub mod jwks; +pub mod jwt_bearer; pub mod login; pub mod session; @@ -11,13 +12,14 @@ pub use config::{ BASE_URL_ENV, CLIENT_ID_ENV, COOKIE_KEY_ENV, LOGOUT_REDIRECT_URI_ENV, SCOPE_ENV, TRUSTED_AUDIENCES_ENV, ZITADEL_BASE_ENV, ZitadelAuthConfig, config_from_env, }; +pub use jwt_bearer::{MachineKeyFile, ZitadelJwtBearer}; pub use jwks::JwksCache; pub use login::{ - AuthCallbackQuery, LoginAttempt, RawAuthCallbackQuery, TokenResponse, ValidatedUser, + AuthCallbackQuery, LoginAttempt, LoginQuery, RawAuthCallbackQuery, TokenResponse, ValidatedUser, build_login_attempt, build_logout_url, exchange_code_for_token, jwt_exp, - validate_callback_state, validate_id_token, + valid_next, validate_callback_state, validate_id_token, }; pub use session::{LoginAttemptCookie, VerifiedSession}; diff --git a/harmony_zitadel_auth/src/login.rs b/harmony_zitadel_auth/src/login.rs index c92aae1b..a87c8578 100644 --- a/harmony_zitadel_auth/src/login.rs +++ b/harmony_zitadel_auth/src/login.rs @@ -37,6 +37,11 @@ pub struct TokenResponse { pub expires_in: Option, } +#[derive(Debug, Deserialize)] +pub struct LoginQuery { + pub next: Option, +} + #[derive(Debug, Deserialize)] pub struct RawAuthCallbackQuery { pub code: Option, @@ -63,10 +68,15 @@ impl From<&LoginAttempt> for LoginAttemptCookie { state: attempt.state.clone(), pkce_code_verifier: attempt.pkce_code_verifier.clone(), nonce: attempt.nonce.clone(), + next: None, } } } +pub fn valid_next(next: &str) -> bool { + next.starts_with('/') && !next.starts_with("//") && !next.chars().any(char::is_control) +} + impl TryFrom for AuthCallbackQuery { type Error = anyhow::Error; diff --git a/harmony_zitadel_auth/src/session.rs b/harmony_zitadel_auth/src/session.rs index 8e5f73c3..68f9dbf9 100644 --- a/harmony_zitadel_auth/src/session.rs +++ b/harmony_zitadel_auth/src/session.rs @@ -18,4 +18,5 @@ pub struct LoginAttemptCookie { pub state: String, pub pkce_code_verifier: String, pub nonce: String, + pub next: Option, }