From aacbc509b164488002e846d7808172dd421ba776 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Sun, 31 May 2026 14:59:00 -0400 Subject: [PATCH 1/7] add files for feature --- harmony_zitadel_auth/src/axum_login_flow.rs | 23 ++++++++++++++++----- harmony_zitadel_auth/src/lib.rs | 4 ++-- harmony_zitadel_auth/src/login.rs | 10 +++++++++ harmony_zitadel_auth/src/session.rs | 1 + 4 files changed, 31 insertions(+), 7 deletions(-) diff --git a/harmony_zitadel_auth/src/axum_login_flow.rs b/harmony_zitadel_auth/src/axum_login_flow.rs index 96a9391f..0a61613e 100644 --- a/harmony_zitadel_auth/src/axum_login_flow.rs +++ b/harmony_zitadel_auth/src/axum_login_flow.rs @@ -9,8 +9,8 @@ use base64::engine::general_purpose::URL_SAFE_NO_PAD; use crate::config::ZitadelAuthConfig; use crate::jwks::JwksCache; use crate::login::{ - AuthCallbackQuery, RawAuthCallbackQuery, TokenResponse, build_login_attempt, build_logout_url, - exchange_code_for_token, jwt_exp, validate_callback_state, + AuthCallbackQuery, LoginQuery, RawAuthCallbackQuery, TokenResponse, build_login_attempt, + build_logout_url, exchange_code_for_token, jwt_exp, valid_next, validate_callback_state, }; use crate::session::LoginAttemptCookie; @@ -23,8 +23,9 @@ pub const HARMONY_SESSION_COOKIE: &str = "harmony_fleet_session"; pub async fn login_handler( jar: PrivateCookieJar, State(config): State, + Query(query): Query, ) -> Response { - match build_login_response(jar, &config) { + match build_login_response(jar, &config, query) { Ok(r) => r.into_response(), Err(e) => auth_error_response(e), } @@ -33,9 +34,16 @@ pub async fn login_handler( fn build_login_response( jar: PrivateCookieJar, config: &ZitadelAuthConfig, + query: LoginQuery, ) -> Result { let attempt = build_login_attempt(config)?; - let cookie_payload = LoginAttemptCookie::from(&attempt); + let mut cookie_payload = LoginAttemptCookie::from(&attempt); + cookie_payload.next = Some( + query + .next + .filter(|next| valid_next(next)) + .unwrap_or_else(|| "/".to_string()), + ); let cookie_value = URL_SAFE_NO_PAD.encode(serde_json::to_vec(&cookie_payload)?); let mut builder = Cookie::build((LOGIN_ATTEMPT_COOKIE, cookie_value)) @@ -113,7 +121,12 @@ async fn build_callback_response( } let session_jar = session_jar.add(session_cookie(&tokens, config)); - Ok((jar, session_jar, Redirect::to("/")).into_response()) + let next = attempt + .next + .as_deref() + .filter(|next| valid_next(next)) + .unwrap_or("/"); + Ok((jar, session_jar, Redirect::to(next)).into_response()) } AuthCallbackQuery::Failure { error, diff --git a/harmony_zitadel_auth/src/lib.rs b/harmony_zitadel_auth/src/lib.rs index f5cd0e78..ff232b35 100644 --- a/harmony_zitadel_auth/src/lib.rs +++ b/harmony_zitadel_auth/src/lib.rs @@ -15,9 +15,9 @@ pub use config::{ pub use jwks::JwksCache; pub use login::{ - AuthCallbackQuery, LoginAttempt, RawAuthCallbackQuery, TokenResponse, ValidatedUser, + AuthCallbackQuery, LoginAttempt, LoginQuery, RawAuthCallbackQuery, TokenResponse, ValidatedUser, build_login_attempt, build_logout_url, exchange_code_for_token, jwt_exp, - validate_callback_state, validate_id_token, + valid_next, validate_callback_state, validate_id_token, }; pub use session::{LoginAttemptCookie, VerifiedSession}; diff --git a/harmony_zitadel_auth/src/login.rs b/harmony_zitadel_auth/src/login.rs index c92aae1b..a87c8578 100644 --- a/harmony_zitadel_auth/src/login.rs +++ b/harmony_zitadel_auth/src/login.rs @@ -37,6 +37,11 @@ pub struct TokenResponse { pub expires_in: Option, } +#[derive(Debug, Deserialize)] +pub struct LoginQuery { + pub next: Option, +} + #[derive(Debug, Deserialize)] pub struct RawAuthCallbackQuery { pub code: Option, @@ -63,10 +68,15 @@ impl From<&LoginAttempt> for LoginAttemptCookie { state: attempt.state.clone(), pkce_code_verifier: attempt.pkce_code_verifier.clone(), nonce: attempt.nonce.clone(), + next: None, } } } +pub fn valid_next(next: &str) -> bool { + next.starts_with('/') && !next.starts_with("//") && !next.chars().any(char::is_control) +} + impl TryFrom for AuthCallbackQuery { type Error = anyhow::Error; diff --git a/harmony_zitadel_auth/src/session.rs b/harmony_zitadel_auth/src/session.rs index 8e5f73c3..68f9dbf9 100644 --- a/harmony_zitadel_auth/src/session.rs +++ b/harmony_zitadel_auth/src/session.rs @@ -18,4 +18,5 @@ pub struct LoginAttemptCookie { pub state: String, pub pkce_code_verifier: String, pub nonce: String, + pub next: Option, } -- 2.39.5 From a185ccc78a14ae850c7462ba5912f41cf2a48f14 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Sun, 31 May 2026 15:08:06 -0400 Subject: [PATCH 2/7] update env variable names --- examples/fleet_auth_callout/src/lib.rs | 2 +- fleet/harmony-fleet-operator/dev.sh | 12 ++++++------ harmony_zitadel_auth/src/config.rs | 12 ++++++------ 3 files changed, 13 insertions(+), 13 deletions(-) diff --git a/examples/fleet_auth_callout/src/lib.rs b/examples/fleet_auth_callout/src/lib.rs index c2d4c9bf..a87cb37e 100644 --- a/examples/fleet_auth_callout/src/lib.rs +++ b/examples/fleet_auth_callout/src/lib.rs @@ -759,7 +759,7 @@ pub async fn mint_access_token( access_token: String, } let tr: TokenResponse = resp.json().await.context("parse token response")?; - if std::env::var("FLEET_AUTH_CALLOUT_DEBUG_TOKENS").is_ok() + if std::env::var("HARMONY_SSO_CALLOUT_DEBUG_TOKENS").is_ok() && let Some(payload_b64) = tr.access_token.split('.').nth(1) { use base64::Engine; diff --git a/fleet/harmony-fleet-operator/dev.sh b/fleet/harmony-fleet-operator/dev.sh index 812f8272..a73268bf 100644 --- a/fleet/harmony-fleet-operator/dev.sh +++ b/fleet/harmony-fleet-operator/dev.sh @@ -1,12 +1,12 @@ #!/bin/bash export BASE_URL=http://localhost:18080 -export FLEET_AUTH_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io -export FLEET_AUTH_CLIENT_ID=372626218874372917 -export FLEET_AUTH_SCOPE="openid profile email" -export FLEET_AUTH_LOGOUT_REDIRECT_URI="http://localhost:18080/" -export FLEET_OPERATOR_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg== -export FLEET_AUTH_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125 +export HARMONY_SSO_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io +export HARMONY_SSO_CLIENT_ID=372626218874372917 +export HARMONY_SSO_SCOPE="openid profile email" +export HARMONY_SSO_LOGOUT_REDIRECT_URI="http://localhost:18080/" +export HARMONY_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg== +export HARMONY_SSO_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125 export BASE_URL=http://localhost:18080 export RUST_LOG=debug diff --git a/harmony_zitadel_auth/src/config.rs b/harmony_zitadel_auth/src/config.rs index 52019426..34e81415 100644 --- a/harmony_zitadel_auth/src/config.rs +++ b/harmony_zitadel_auth/src/config.rs @@ -33,13 +33,13 @@ impl ZitadelAuthConfig { } } -pub const ZITADEL_BASE_ENV: &str = "FLEET_AUTH_ZITADEL_BASE"; +pub const ZITADEL_BASE_ENV: &str = "HARMONY_SSO_ZITADEL_BASE"; pub const BASE_URL_ENV: &str = "BASE_URL"; -pub const CLIENT_ID_ENV: &str = "FLEET_AUTH_CLIENT_ID"; -pub const SCOPE_ENV: &str = "FLEET_AUTH_SCOPE"; -pub const TRUSTED_AUDIENCES_ENV: &str = "FLEET_AUTH_TRUSTED_AUDIENCES"; -pub const LOGOUT_REDIRECT_URI_ENV: &str = "FLEET_AUTH_LOGOUT_REDIRECT_URI"; -pub const COOKIE_KEY_ENV: &str = "FLEET_OPERATOR_COOKIE_KEY_B64"; +pub const CLIENT_ID_ENV: &str = "HARMONY_SSO_CLIENT_ID"; +pub const SCOPE_ENV: &str = "HARMONY_SSO_SCOPE"; +pub const TRUSTED_AUDIENCES_ENV: &str = "HARMONY_SSO_TRUSTED_AUDIENCES"; +pub const LOGOUT_REDIRECT_URI_ENV: &str = "HARMONY_SSO_LOGOUT_REDIRECT_URI"; +pub const COOKIE_KEY_ENV: &str = "HARMONY_COOKIE_KEY_B64"; pub fn config_from_env() -> ZitadelAuthConfig { ZitadelAuthConfig { -- 2.39.5 From e220f2ff101fdff82841c3301ead8d866126e2f2 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Sun, 31 May 2026 15:09:25 -0400 Subject: [PATCH 3/7] remove env file --- .env.example | 7 ------- 1 file changed, 7 deletions(-) delete mode 100644 .env.example diff --git a/.env.example b/.env.example deleted file mode 100644 index 2f4ae554..00000000 --- a/.env.example +++ /dev/null @@ -1,7 +0,0 @@ -FLEET_AUTH_ISSUER_URL= -FLEET_AUTH_AUTHORIZE_URL= -FLEET_AUTH_TOKEN_URL= -FLEET_AUTH_CLIENT_ID= -FLEET_AUTH_REDIRECT_URI= -FLEET_AUTH_SCOPE= -FLEET_AUTH_TRUSTED_AUDIENCES= -- 2.39.5 From 788227b8c083d938b7c6e8615376407cf5c0b214 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Tue, 2 Jun 2026 10:32:11 -0400 Subject: [PATCH 4/7] stand alone module for jwt --- .../examples/mint_zitadel_token.rs | 25 + fleet/harmony-fleet-auth/src/credentials.rs | 511 ++---------------- fleet/harmony-fleet-auth/src/jwt_bearer.rs | 367 +++++++++++++ fleet/harmony-fleet-auth/src/lib.rs | 7 +- 4 files changed, 449 insertions(+), 461 deletions(-) create mode 100644 fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs create mode 100644 fleet/harmony-fleet-auth/src/jwt_bearer.rs diff --git a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs new file mode 100644 index 00000000..0b48fb64 --- /dev/null +++ b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs @@ -0,0 +1,25 @@ +use anyhow::{Context, Result}; +use harmony_fleet_auth::{CredentialSource, CredentialsSection, NatsCredential}; + +#[tokio::main] +async fn main() -> Result<()> { + let key_path = std::env::var("ZITADEL_KEY_PATH").context("ZITADEL_KEY_PATH missing")?; + let issuer = std::env::var("ZITADEL_ISSUER").context("ZITADEL_ISSUER missing")?; + let audience = std::env::var("ZITADEL_PROJECT_ID").context("ZITADEL_PROJECT_ID missing")?; + + let source = CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { + key_path: key_path.into(), + key_json: None, + oidc_issuer_url: issuer, + audience, + danger_accept_invalid_certs: false, + })?; + + match source.next_credential().await? { + NatsCredential::BearerToken(token) => { + println!("{token}"); + Ok(()) + } + NatsCredential::UserPass { .. } => anyhow::bail!("expected bearer token"), + } +} diff --git a/fleet/harmony-fleet-auth/src/credentials.rs b/fleet/harmony-fleet-auth/src/credentials.rs index 82aa799d..964ff0fe 100644 --- a/fleet/harmony-fleet-auth/src/credentials.rs +++ b/fleet/harmony-fleet-auth/src/credentials.rs @@ -23,14 +23,12 @@ //! cleaner than a Trait + factory. use std::path::Path; -use std::sync::{Arc, Mutex}; -use std::time::Duration; +use std::sync::Arc; use anyhow::{Context, Result}; -use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader}; -use serde::Deserialize; -use crate::config::CredentialsSection; +use crate::jwt_bearer::ZitadelJwtBearer; +use crate::{config::CredentialsSection, jwt_bearer::MachineKeyFile}; /// Material the NATS connector needs to authenticate. Returned per /// (re)connect attempt — the source decides whether to mint fresh. @@ -44,17 +42,8 @@ pub enum NatsCredential { /// from the parsed `[credentials]` section; cloned via Arc into the /// async-nats auth callback. pub enum CredentialSource { - TomlShared { - user: String, - pass: String, - }, - ZitadelJwt { - key: MachineKeyFile, - oidc_issuer_url: String, - audience: String, - http: reqwest::Client, - cache: Mutex>, - }, + TomlShared { user: String, pass: String }, + ZitadelJwt { bearer: Arc }, } impl CredentialSource { @@ -72,256 +61,58 @@ impl CredentialSource { } async fn zitadel_next(&self) -> Result { - // Fast path: lock the cache synchronously, copy out the token if - // it's comfortably valid, drop the lock. Holding a MutexGuard - // across `.await` would make this future !Sync, which - // async-nats's `with_auth_callback` rejects at compile time. - if let Some(token) = self.cached_if_fresh() { - return Ok(NatsCredential::BearerToken(token)); - } - // Slow path: mint outside any lock. Two concurrent (re)connect - // attempts could both reach here and both mint; that's a wasted - // HTTP round-trip in a rare race, not a correctness issue — - // the second writer wins and replaces the first's value. - let fresh = self.zitadel_mint().await?; - let token = fresh.access_token.clone(); - if let Self::ZitadelJwt { - cache, audience, .. - } = self - && let Ok(mut guard) = cache.lock() - { - *guard = Some(fresh); - tracing::info!(audience = %audience, "minted fresh Zitadel access token"); - } + let Self::ZitadelJwt { bearer } = self else { + anyhow::bail!("zitadel_next called on non-ZitadelJwt variant"); + }; + let token = bearer.bearer_token().await?; Ok(NatsCredential::BearerToken(token)) } - fn cached_if_fresh(&self) -> Option { - let Self::ZitadelJwt { cache, .. } = self else { - return None; - }; - let now = chrono::Utc::now().timestamp(); - let guard = cache.lock().ok()?; - let cached = guard.as_ref()?; - if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now { - Some(cached.access_token.clone()) - } else { - None - } - } - - async fn zitadel_mint(&self) -> Result { - let Self::ZitadelJwt { - key, - oidc_issuer_url, - audience, - http, - .. - } = self - else { - anyhow::bail!("zitadel_mint called on non-ZitadelJwt variant"); - }; - - let now = chrono::Utc::now().timestamp(); - let assertion = build_assertion(key, oidc_issuer_url, now)?; - let scope = build_scope(audience); - let token_url = build_token_url(oidc_issuer_url); - - let resp = http - .post(&token_url) - .form(&[ - ( - "grant_type", - "urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(), - ), - ("assertion", assertion), - ("scope", scope), - ]) - .send() - .await - .with_context(|| format!("POST {token_url}"))?; - - if !resp.status().is_success() { - let status = resp.status(); - let body = resp.text().await.unwrap_or_default(); - anyhow::bail!("Zitadel token endpoint returned {status}: {body}"); - } - - #[derive(Deserialize)] - struct TokenResponse { - access_token: String, - #[serde(default)] - expires_in: Option, - } - let tr: TokenResponse = resp.json().await.context("parsing token response")?; - // Zitadel typically returns 12h (43200s); be defensive against - // a missing field by assuming a conservative 1h. - let expires_in = tr.expires_in.unwrap_or(3600); - Ok(CachedToken { - access_token: tr.access_token, - expires_at_unix: now + expires_in, - }) - } -} - -/// Build the JWT-bearer assertion. Split out from the network path so -/// the claims + header shape can be unit-tested without an HTTP server, -/// and split internally into the (pure) claim/header builders so they -/// can be unit-tested without an RSA private key fixture. -pub(crate) fn build_assertion( - key: &MachineKeyFile, - oidc_issuer_url: &str, - now: i64, -) -> Result { - let claims = build_assertion_claims(key, oidc_issuer_url, now); - let header = build_assertion_header(key); - let assertion = jsonwebtoken::encode( - &header, - &claims, - &EncodingKey::from_rsa_pem(key.key.as_bytes()) - .context("parsing RSA private key from machine key file")?, - ) - .context("signing JWT assertion")?; - Ok(assertion) -} - -/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId` -/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint -/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or -/// Zitadel rejects. -pub(crate) fn build_assertion_claims( - key: &MachineKeyFile, - oidc_issuer_url: &str, - now: i64, -) -> serde_json::Value { - serde_json::json!({ - "iss": key.user_id, - "sub": key.user_id, - "aud": oidc_issuer_url, - "exp": now + ASSERTION_LIFETIME_SECS, - "iat": now, - }) -} - -/// JWT header for the assertion. The `kid` tells Zitadel which of the -/// machine user's registered keys to verify the signature against. -pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader { - let mut header = JwtHeader::new(Algorithm::RS256); - header.kid = Some(key.key_id.clone()); - header -} - -/// Build the OAuth `scope` string for the token-bearer request. -/// -/// Three scopes are needed for the access token to be useful here: -/// -/// * `openid` — base OIDC requirement. -/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") — -/// tells Zitadel to include the role-claim block in the access -/// token. Without this, the callout sees "no authorized role -/// in token" even when the user has a project role grant. -/// * `urn:zitadel:iam:org:project:id::aud` (SINGULAR -/// "project") — adds to the access token's `aud` claim -/// so the callout's audience validation accepts the project -/// ID we're using as the JWT-bearer audience. -/// -/// The plural-vs-singular distinction is a Zitadel convention, -/// not a typo. Both scopes are required. -pub(crate) fn build_scope(audience: &str) -> String { - format!( - "openid \ - urn:zitadel:iam:org:projects:roles \ - urn:zitadel:iam:org:project:id:{audience}:aud" - ) -} - -/// Resolve the token endpoint URL, tolerating a trailing slash on -/// `oidc_issuer_url`. Without trimming, a configured issuer of -/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s. -pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String { - format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/')) -} - -// ---- helper types ---------------------------------------------------------- - -/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON` -/// machine key. The `key` is a PEM-encoded RSA private key. -#[derive(Debug, Clone, Deserialize)] -pub struct MachineKeyFile { - #[serde(rename = "type")] - pub _type: String, - #[serde(rename = "keyId")] - pub key_id: String, - pub key: String, - #[serde(rename = "userId")] - pub user_id: String, -} - -#[derive(Debug, Clone)] -pub struct CachedToken { - pub(crate) access_token: String, - /// Unix seconds at which the token is no longer trusted by - /// `cached_if_fresh`. Computed from the OAuth response's `expires_in` - /// and the local clock at mint time. - pub(crate) expires_at_unix: i64, -} - -/// Refresh tokens this many seconds before their advertised expiry. -/// Five minutes leaves headroom for clock skew, slow networks, and -/// the round-trip cost of re-minting against Zitadel. -pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60; - -/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign -/// to authenticate to Zitadel's token endpoint). Zitadel rejects -/// assertions with `exp - iat > 60s`; one minute is the safe ceiling. -pub const ASSERTION_LIFETIME_SECS: i64 = 60; - -// ---- factory --------------------------------------------------------------- - -/// Build the appropriate `CredentialSource` from the parsed config. -/// -/// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from -/// disk. Both the agent and the operator mount their key as a file -/// (Secret volume in the operator's Pod, dropped by -/// `FleetDeviceSetupScore` on the agent's VM); the path is just -/// configured differently. -pub fn credential_source_from_config(creds: &CredentialsSection) -> Result> { - match creds { - CredentialsSection::TomlShared { - nats_user, - nats_pass, - } => Ok(Arc::new(CredentialSource::TomlShared { - user: nats_user.clone(), - pass: nats_pass.clone(), - })), - CredentialsSection::ZitadelJwt { - key_path, - key_json, - oidc_issuer_url, - audience, - danger_accept_invalid_certs, - } => { - // `key_json` (inline) wins over `key_path` (file). The - // operator pod uses inline because OKD's restricted-v2 - // SCC + env-var-from-Secret deployment shape can't - // reliably mount Secret volumes; the agent uses the file - // path because it lives on a VM and a real file is the - // more natural rotation target. - let key = match key_json.as_deref().map(str::trim) { - Some(json) if !json.is_empty() => parse_machine_key(json)?, - _ => load_machine_key(key_path)?, - }; - Ok(Arc::new(CredentialSource::ZitadelJwt { - key, - oidc_issuer_url: oidc_issuer_url.clone(), - audience: audience.clone(), - http: reqwest::Client::builder() - .danger_accept_invalid_certs(*danger_accept_invalid_certs) - .timeout(Duration::from_secs(10)) - .build() - .context("building HTTP client for Zitadel token endpoint")?, - cache: Mutex::new(None), - })) + /// Build the appropriate `CredentialSource` from the parsed config. + /// + /// For [`CredentialsSection::ZitadelJwt`] this reads the keyfile from + /// disk. Both the agent and the operator mount their key as a file + /// (Secret volume in the operator's Pod, dropped by + /// `FleetDeviceSetupScore` on the agent's VM); the path is just + /// configured differently. + pub fn credential_source_from_config( + creds: &CredentialsSection, + ) -> Result> { + match creds { + CredentialsSection::TomlShared { + nats_user, + nats_pass, + } => Ok(Arc::new(CredentialSource::TomlShared { + user: nats_user.clone(), + pass: nats_pass.clone(), + })), + CredentialsSection::ZitadelJwt { + key_path, + key_json, + oidc_issuer_url, + audience, + danger_accept_invalid_certs, + } => { + // `key_json` (inline) wins over `key_path` (file). The + // operator pod uses inline because OKD's restricted-v2 + // SCC + env-var-from-Secret deployment shape can't + // reliably mount Secret volumes; the agent uses the file + // path because it lives on a VM and a real file is the + // more natural rotation target. + let key = match key_json.as_deref().map(str::trim) { + Some(json) if !json.is_empty() => parse_machine_key(json)?, + _ => load_machine_key(key_path)?, + }; + let bearer = ZitadelJwtBearer::new( + key, + oidc_issuer_url.clone(), + audience.clone(), + *danger_accept_invalid_certs, + )?; + Ok(Arc::new(CredentialSource::ZitadelJwt { + bearer: Arc::new(bearer), + })) + } } } } @@ -341,30 +132,6 @@ fn parse_machine_key(raw: &str) -> Result { mod tests { use super::*; - fn fake_key() -> MachineKeyFile { - MachineKeyFile { - _type: "serviceaccount".to_string(), - key_id: "kid-371358469099356247".to_string(), - // Real PEM not required for the pure-builder tests; the - // signing path that needs a parseable key is exercised - // end-to-end in the e2e harness. - key: "PEM-PLACEHOLDER".to_string(), - user_id: "uid-371358469065801815".to_string(), - } - } - - fn zjwt_source() -> CredentialSource { - CredentialSource::ZitadelJwt { - key: fake_key(), - oidc_issuer_url: "http://sso.fleet.local:8080".to_string(), - audience: "366378028009259037".to_string(), - http: reqwest::Client::new(), - cache: Mutex::new(None), - } - } - - // ---- next_credential / cache state ------------------------------------- - #[tokio::test] async fn toml_shared_returns_userpass_each_call() { let s = CredentialSource::TomlShared { @@ -380,174 +147,4 @@ mod tests { other => panic!("expected UserPass, got {other:?}"), } } - - #[test] - fn cached_token_within_leeway_is_treated_as_expired() { - // Sanity-check the comparison so refactors don't accidentally - // invert the leeway window. - let now = chrono::Utc::now().timestamp(); - let about_to_expire = CachedToken { - access_token: "x".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1, - }; - assert!( - about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now, - "tokens within the leeway window must be considered expired" - ); - - let comfortable = CachedToken { - access_token: "x".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, - }; - assert!( - comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now, - "tokens with comfortable headroom must be cache-hits" - ); - } - - #[test] - fn cached_if_fresh_returns_some_when_outside_leeway() { - let src = zjwt_source(); - let now = chrono::Utc::now().timestamp(); - if let CredentialSource::ZitadelJwt { cache, .. } = &src { - *cache.lock().unwrap() = Some(CachedToken { - access_token: "fresh".to_string(), - expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, - }); - } - assert_eq!(src.cached_if_fresh(), Some("fresh".to_string())); - } - - #[test] - fn cached_if_fresh_returns_none_when_no_cache() { - // Brand-new ZitadelJwt source — no token has been minted yet. - // Forces the slow path on first connect. - let src = zjwt_source(); - assert_eq!(src.cached_if_fresh(), None); - } - - #[test] - fn cached_if_fresh_returns_none_for_toml_shared() { - // Defensive: cache_if_fresh is only meaningful for ZitadelJwt; - // TomlShared has no cache. A nonsensical call must return None, - // not panic, so the cold-path can degrade gracefully. - let src = CredentialSource::TomlShared { - user: "u".into(), - pass: "p".into(), - }; - assert_eq!(src.cached_if_fresh(), None); - } - - // ---- assertion claims / header (pure builders) ------------------------ - - #[test] - fn assertion_claims_carry_iss_sub_aud_exp_iat() { - let now = 1_700_000_000; - let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now); - assert_eq!(claims["iss"], "uid-371358469065801815"); - assert_eq!(claims["sub"], "uid-371358469065801815"); - assert_eq!(claims["aud"], "http://sso.fleet.local:8080"); - assert_eq!(claims["iat"].as_i64(), Some(now)); - assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS)); - } - - #[test] - fn assertion_lifetime_locked_at_60_seconds() { - // Zitadel rejects assertions where exp - iat > 60s. If anyone - // bumps ASSERTION_LIFETIME_SECS thinking "more is safer", the - // mints will silently start failing in prod with no helpful - // error. Lock the constant. - assert_eq!(ASSERTION_LIFETIME_SECS, 60); - } - - #[test] - fn assertion_header_carries_kid_and_rs256() { - let header = build_assertion_header(&fake_key()); - assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256); - assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247")); - } - - // ---- scope string ------------------------------------------------------ - - #[test] - fn scope_includes_plural_projects_roles() { - // The plural-projects URN is what tells Zitadel to emit the - // role claim. Day-one bug; lock it. - let s = build_scope("366378028009259037"); - assert!( - s.contains("urn:zitadel:iam:org:projects:roles"), - "scope must include the PLURAL projects-roles URN; got {s:?}" - ); - } - - #[test] - fn scope_audience_uses_singular_project_id_urn() { - // The singular-project URN tells Zitadel to put into the - // access token's aud claim. Different URN entirely from the - // plural one above; both required. - let s = build_scope("366378028009259037"); - assert!( - s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"), - "scope must include the SINGULAR project:id::aud URN; got {s:?}" - ); - } - - #[test] - fn scope_includes_openid_base() { - let s = build_scope("any"); - assert!( - s.split_whitespace().any(|tok| tok == "openid"), - "scope must include `openid` as a standalone token; got {s:?}" - ); - } - - // ---- token URL --------------------------------------------------------- - - #[test] - fn token_url_appends_oauth_endpoint() { - assert_eq!( - build_token_url("http://sso.fleet.local:8080"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - #[test] - fn token_url_strips_single_trailing_slash() { - // A trailing slash would yield `…//oauth/v2/token`, which 404s. - // Common configuration drift; the trim guards against it. - assert_eq!( - build_token_url("http://sso.fleet.local:8080/"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - #[test] - fn token_url_strips_multiple_trailing_slashes() { - // Defensive — `trim_end_matches('/')` peels all of them, not - // just the first. Locks that semantics. - assert_eq!( - build_token_url("http://sso.fleet.local:8080///"), - "http://sso.fleet.local:8080/oauth/v2/token" - ); - } - - // ---- MachineKeyFile JSON parsing -------------------------------------- - - #[test] - fn machine_key_file_parses_zitadel_json_shape() { - // The serde renames (`type`, `keyId`, `userId`) are easy to - // break. This is the literal JSON shape Zitadel's - // /management/v1/users/.../keys endpoint emits. - let raw = r#"{ - "type": "serviceaccount", - "keyId": "371358469099356247", - "key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n", - "userId": "371358469065801815" - }"#; - let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile"); - assert_eq!(parsed._type, "serviceaccount"); - assert_eq!(parsed.key_id, "371358469099356247"); - assert_eq!(parsed.user_id, "371358469065801815"); - assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY")); - } } diff --git a/fleet/harmony-fleet-auth/src/jwt_bearer.rs b/fleet/harmony-fleet-auth/src/jwt_bearer.rs new file mode 100644 index 00000000..97743a91 --- /dev/null +++ b/fleet/harmony-fleet-auth/src/jwt_bearer.rs @@ -0,0 +1,367 @@ +use serde::Deserialize; +use std::sync::Mutex; + +use anyhow::{Context, Result}; +use jsonwebtoken::{Algorithm, EncodingKey, Header as JwtHeader}; + +pub struct ZitadelJwtBearer { + key: MachineKeyFile, + oidc_issuer_url: String, + audience: String, + http: reqwest::Client, + cache: Mutex>, +} + +/// Refresh tokens this many seconds before their advertised expiry. +/// Five minutes leaves headroom for clock skew, slow networks, and +/// the round-trip cost of re-minting against Zitadel. +pub const TOKEN_REFRESH_LEEWAY_SECS: i64 = 5 * 60; + +/// Lifetime of the JWT *assertion* (the client-side bearer JWT we sign +/// to authenticate to Zitadel's token endpoint). Zitadel rejects +/// assertions with `exp - iat > 60s`; one minute is the safe ceiling. +pub const ASSERTION_LIFETIME_SECS: i64 = 60; + +/// JSON keyfile content as Zitadel emits it for a `KEY_TYPE_JSON` +/// machine key. The `key` is a PEM-encoded RSA private key. +#[derive(Debug, Clone, Deserialize)] +pub struct MachineKeyFile { + #[serde(rename = "type")] + pub _type: String, + #[serde(rename = "keyId")] + pub key_id: String, + pub key: String, + #[serde(rename = "userId")] + pub user_id: String, +} + +#[derive(Debug, Clone)] +pub struct CachedToken { + pub(crate) access_token: String, + /// Unix seconds at which the token is no longer trusted by + /// `cached_if_fresh`. Computed from the OAuth response's `expires_in` + /// and the local clock at mint time. + pub(crate) expires_at_unix: i64, +} + +#[derive(Deserialize)] +struct TokenResponse { + access_token: String, + #[serde(default)] + expires_in: Option, +} + +impl ZitadelJwtBearer { + pub fn new( + key: MachineKeyFile, + oidc_issuer_url: String, + audience: String, + danger_accept_invalid_certs: bool, + ) -> Result { + let http = reqwest::Client::builder() + .danger_accept_invalid_certs(danger_accept_invalid_certs) + .timeout(std::time::Duration::from_secs(10)) + .build() + .context("building HTTP client for Zitadel token endpoint")?; + + Ok(Self { + key, + oidc_issuer_url, + audience, + http, + cache: Mutex::new(None), + }) + } + + pub async fn bearer_token(&self) -> Result { + { + let now = chrono::Utc::now().timestamp(); + let guard = self.cache.lock().unwrap(); + if let Some(cached) = guard.as_ref() { + if cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now { + return Ok(cached.access_token.clone()); + } + } + } + + let now = chrono::Utc::now().timestamp(); + let assertion = build_assertion(&self.key, &self.oidc_issuer_url, now)?; + let scope = build_scope(&self.audience); + let token_url = build_token_url(&self.oidc_issuer_url); + + let resp = self + .http + .post(&token_url) + .form(&[ + ( + "grant_type", + "urn:ietf:params:oauth:grant-type:jwt-bearer".to_string(), + ), + ("assertion", assertion), + ("scope", scope), + ]) + .send() + .await + .with_context(|| format!("POST {token_url}"))?; + + if !resp.status().is_success() { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + anyhow::bail!("Zitadel token endpoint returned {status}: {body}"); + } + + let tr: TokenResponse = resp.json().await.context("parsing token response")?; + let expires_in = tr.expires_in.unwrap_or(3600); + let token = tr.access_token.clone(); + + *self.cache.lock().unwrap() = Some(CachedToken { + access_token: tr.access_token, + expires_at_unix: now + expires_in, + }); + + Ok(token) + } + + pub fn invalidate_cache(&self) { + *self.cache.lock().unwrap() = None; + } + + #[cfg(test)] + fn cached_if_fresh(&self) -> Option { + let now = chrono::Utc::now().timestamp(); + let guard = self.cache.lock().unwrap(); + let cached = guard.as_ref()?; + (cached.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now) + .then(|| cached.access_token.clone()) + } +} + +/// Build the JWT-bearer assertion. Split out from the network path so +/// the claims + header shape can be unit-tested without an HTTP server, +/// and split internally into the (pure) claim/header builders so they +/// can be unit-tested without an RSA private key fixture. +pub(crate) fn build_assertion( + key: &MachineKeyFile, + oidc_issuer_url: &str, + now: i64, +) -> Result { + let claims = build_assertion_claims(key, oidc_issuer_url, now); + let header = build_assertion_header(key); + let assertion = jsonwebtoken::encode( + &header, + &claims, + &EncodingKey::from_rsa_pem(key.key.as_bytes()) + .context("parsing RSA private key from machine key file")?, + ) + .context("signing JWT assertion")?; + Ok(assertion) +} + +/// Pure claim payload for the JWT-bearer assertion. `iss == sub == userId` +/// is a Zitadel requirement; `aud` is Zitadel itself (the token endpoint +/// is reached via `oidc_issuer_url`); `exp - iat` MUST be ≤ 60 s or +/// Zitadel rejects. +pub(crate) fn build_assertion_claims( + key: &MachineKeyFile, + oidc_issuer_url: &str, + now: i64, +) -> serde_json::Value { + serde_json::json!({ + "iss": key.user_id, + "sub": key.user_id, + "aud": oidc_issuer_url, + "exp": now + ASSERTION_LIFETIME_SECS, + "iat": now, + }) +} + +/// JWT header for the assertion. The `kid` tells Zitadel which of the +/// machine user's registered keys to verify the signature against. +pub(crate) fn build_assertion_header(key: &MachineKeyFile) -> JwtHeader { + let mut header = JwtHeader::new(Algorithm::RS256); + header.kid = Some(key.key_id.clone()); + header +} + +/// Build the OAuth `scope` string for the token-bearer request. +/// +/// Three scopes are needed for the access token to be useful here: +/// +/// * `openid` — base OIDC requirement. +/// * `urn:zitadel:iam:org:projects:roles` (PLURAL "projects") — +/// tells Zitadel to include the role-claim block in the access +/// token. Without this, the callout sees "no authorized role +/// in token" even when the user has a project role grant. +/// * `urn:zitadel:iam:org:project:id::aud` (SINGULAR +/// "project") — adds to the access token's `aud` claim +/// so the callout's audience validation accepts the project +/// ID we're using as the JWT-bearer audience. +/// +/// The plural-vs-singular distinction is a Zitadel convention, +/// not a typo. Both scopes are required. +pub(crate) fn build_scope(audience: &str) -> String { + format!( + "openid \ + urn:zitadel:iam:org:projects:roles \ + urn:zitadel:iam:org:project:id:{audience}:aud" + ) +} + +/// Resolve the token endpoint URL, tolerating a trailing slash on +/// `oidc_issuer_url`. Without trimming, a configured issuer of +/// `https://sso.example.com/` produces `…//oauth/v2/token` which 404s. +pub(crate) fn build_token_url(oidc_issuer_url: &str) -> String { + format!("{}/oauth/v2/token", oidc_issuer_url.trim_end_matches('/')) +} + +#[cfg(test)] +mod tests { + use super::*; + + fn fake_key() -> MachineKeyFile { + MachineKeyFile { + _type: "serviceaccount".to_string(), + key_id: "kid-371358469099356247".to_string(), + key: "PEM-PLACEHOLDER".to_string(), + user_id: "uid-371358469065801815".to_string(), + } + } + + fn bearer() -> ZitadelJwtBearer { + ZitadelJwtBearer::new( + fake_key(), + "http://sso.fleet.local:8080".to_string(), + "366378028009259037".to_string(), + false, + ) + .unwrap() + } + + #[test] + fn cached_token_within_leeway_is_treated_as_expired() { + let now = chrono::Utc::now().timestamp(); + let about_to_expire = CachedToken { + access_token: "x".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS - 1, + }; + assert!( + about_to_expire.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS <= now, + "tokens within the leeway window must be considered expired" + ); + + let comfortable = CachedToken { + access_token: "x".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, + }; + assert!( + comfortable.expires_at_unix - TOKEN_REFRESH_LEEWAY_SECS > now, + "tokens with comfortable headroom must be cache-hits" + ); + } + + #[test] + fn cached_if_fresh_returns_some_when_outside_leeway() { + let bearer = bearer(); + let now = chrono::Utc::now().timestamp(); + *bearer.cache.lock().unwrap() = Some(CachedToken { + access_token: "fresh".to_string(), + expires_at_unix: now + TOKEN_REFRESH_LEEWAY_SECS + 60, + }); + + assert_eq!(bearer.cached_if_fresh(), Some("fresh".to_string())); + } + + #[test] + fn cached_if_fresh_returns_none_when_no_cache() { + assert_eq!(bearer().cached_if_fresh(), None); + } + + #[test] + fn assertion_claims_carry_iss_sub_aud_exp_iat() { + let now = 1_700_000_000; + let claims = build_assertion_claims(&fake_key(), "http://sso.fleet.local:8080", now); + assert_eq!(claims["iss"], "uid-371358469065801815"); + assert_eq!(claims["sub"], "uid-371358469065801815"); + assert_eq!(claims["aud"], "http://sso.fleet.local:8080"); + assert_eq!(claims["iat"].as_i64(), Some(now)); + assert_eq!(claims["exp"].as_i64(), Some(now + ASSERTION_LIFETIME_SECS)); + } + + #[test] + fn assertion_lifetime_locked_at_60_seconds() { + assert_eq!(ASSERTION_LIFETIME_SECS, 60); + } + + #[test] + fn assertion_header_carries_kid_and_rs256() { + let header = build_assertion_header(&fake_key()); + assert_eq!(header.alg, jsonwebtoken::Algorithm::RS256); + assert_eq!(header.kid.as_deref(), Some("kid-371358469099356247")); + } + + #[test] + fn scope_includes_plural_projects_roles() { + let s = build_scope("366378028009259037"); + assert!( + s.contains("urn:zitadel:iam:org:projects:roles"), + "scope must include the PLURAL projects-roles URN; got {s:?}" + ); + } + + #[test] + fn scope_audience_uses_singular_project_id_urn() { + let s = build_scope("366378028009259037"); + assert!( + s.contains("urn:zitadel:iam:org:project:id:366378028009259037:aud"), + "scope must include the SINGULAR project:id::aud URN; got {s:?}" + ); + } + + #[test] + fn scope_includes_openid_base() { + let s = build_scope("any"); + assert!( + s.split_whitespace().any(|tok| tok == "openid"), + "scope must include `openid` as a standalone token; got {s:?}" + ); + } + + #[test] + fn token_url_appends_oauth_endpoint() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn token_url_strips_single_trailing_slash() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080/"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn token_url_strips_multiple_trailing_slashes() { + assert_eq!( + build_token_url("http://sso.fleet.local:8080///"), + "http://sso.fleet.local:8080/oauth/v2/token" + ); + } + + #[test] + fn machine_key_file_parses_zitadel_json_shape() { + let raw = r#"{ + "type": "serviceaccount", + "keyId": "371358469099356247", + "key": "-----BEGIN RSA PRIVATE KEY-----\nABC\n-----END RSA PRIVATE KEY-----\n", + "userId": "371358469065801815" + }"#; + let parsed: MachineKeyFile = serde_json::from_str(raw).expect("valid keyfile"); + assert_eq!(parsed._type, "serviceaccount"); + assert_eq!(parsed.key_id, "371358469099356247"); + assert_eq!(parsed.user_id, "371358469065801815"); + assert!(parsed.key.contains("BEGIN RSA PRIVATE KEY")); + } +} diff --git a/fleet/harmony-fleet-auth/src/lib.rs b/fleet/harmony-fleet-auth/src/lib.rs index 3cce33e1..81a68ca1 100644 --- a/fleet/harmony-fleet-auth/src/lib.rs +++ b/fleet/harmony-fleet-auth/src/lib.rs @@ -24,16 +24,15 @@ mod agent_config; mod config; mod credentials; +mod jwt_bearer; pub use agent_config::{AgentConfig, AgentSection, NatsSection, load_config}; pub use config::CredentialsSection; -pub use credentials::{ - ASSERTION_LIFETIME_SECS, CachedToken, CredentialSource, MachineKeyFile, NatsCredential, - TOKEN_REFRESH_LEEWAY_SECS, credential_source_from_config, -}; use std::sync::Arc; +pub use crate::credentials::{CredentialSource, NatsCredential}; + /// Build `async_nats::ConnectOptions` wired with the auth callback /// that pulls fresh credentials from `creds` on every (re)connect. /// -- 2.39.5 From 57d9685c57a7eca6f3c515bfa42fdc01f8c532ff Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Tue, 2 Jun 2026 12:46:42 -0400 Subject: [PATCH 5/7] add files --- Cargo.lock | 2 + examples/fleet_staging_install/src/main.rs | 29 ++-- examples/harmony_sso/src/main.rs | 27 +-- fleet/harmony-fleet-auth/Cargo.toml | 1 + .../examples/mint_zitadel_token.rs | 15 +- fleet/harmony-fleet-auth/src/credentials.rs | 5 +- fleet/harmony-fleet-auth/src/lib.rs | 7 +- harmony_config/src/lib.rs | 27 +-- harmony_secret/Cargo.toml | 1 + harmony_secret/src/lib.rs | 29 ++-- harmony_secret/src/store/mod.rs | 2 +- harmony_secret/src/store/openbao.rs | 158 ++++++++++++------ .../src/jwt_bearer.rs | 0 harmony_zitadel_auth/src/lib.rs | 2 + 14 files changed, 192 insertions(+), 113 deletions(-) rename {fleet/harmony-fleet-auth => harmony_zitadel_auth}/src/jwt_bearer.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index 52da3fe0..c9b548fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4078,6 +4078,7 @@ dependencies = [ "async-nats", "chrono", "harmony-reconciler-contracts", + "harmony_zitadel_auth", "jsonwebtoken", "reqwest 0.12.28", "serde", @@ -4463,6 +4464,7 @@ dependencies = [ "async-trait", "directories", "harmony_secret_derive", + "harmony_zitadel_auth", "http 1.4.0", "infisical", "inquire 0.7.5", diff --git a/examples/fleet_staging_install/src/main.rs b/examples/fleet_staging_install/src/main.rs index a3dfa9d8..416f3eec 100644 --- a/examples/fleet_staging_install/src/main.rs +++ b/examples/fleet_staging_install/src/main.rs @@ -33,7 +33,7 @@ use harmony::topology::{K8sAnywhereTopology, K8sclient, Topology}; use harmony_config::{Config, ConfigClient, StoreSource}; use harmony_fleet_deploy::{FleetDeploySecrets, FleetOperatorScore, OperatorCredentials}; use harmony_k8s::KubernetesDistribution; -use harmony_secret::OpenbaoSecretStore; +use harmony_secret::{OpenbaoSecretStore, OpenbaoStoreOptions}; use nkeys::KeyPair; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -325,19 +325,20 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"# .await .context("port-forward to OpenBao")?; tokio::time::sleep(std::time::Duration::from_secs(1)).await; - let store = OpenbaoSecretStore::new( - format!("http://127.0.0.1:{}", pf.port()), - "secret".to_string(), - "token".to_string(), - true, - Some(cached_root_token(&openbao).map_err(|e| anyhow::anyhow!(e))?), - None, - None, - None, - None, - None, - None, - ) + let store = OpenbaoSecretStore::new(OpenbaoStoreOptions { + base_url: format!("http://127.0.0.1:{}", pf.port()), + kv_mount: "secret".to_string(), + auth_mount: "token".to_string(), + skip_tls: true, + token: Some(cached_root_token(&openbao).map_err(|e| anyhow::anyhow!(e))?), + username: None, + password: None, + zitadel_sso_url: None, + zitadel_client_id: None, + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await .context("OpenBao client")?; ConfigClient::new(vec![ diff --git a/examples/harmony_sso/src/main.rs b/examples/harmony_sso/src/main.rs index 96d70766..abdf021e 100644 --- a/examples/harmony_sso/src/main.rs +++ b/examples/harmony_sso/src/main.rs @@ -366,19 +366,20 @@ async fn main() -> anyhow::Result<()> { let openbao_url = format!("http://127.0.0.1:{}", _pf.port()); let sso_url = format!("http://{}:{}", ZITADEL_HOST, HTTP_PORT); - let store = OpenbaoSecretStore::new( - openbao_url, - "secret".to_string(), - "jwt".to_string(), - true, - None, - None, - None, - Some(sso_url), - Some(client_id), - Some("harmony-developer".to_string()), - Some("jwt".to_string()), - ) + let store = OpenbaoSecretStore::new(harmony_secret::OpenbaoStoreOptions { + base_url: openbao_url, + kv_mount: "secret".to_string(), + auth_mount: "jwt".to_string(), + skip_tls: true, + token: None, + username: None, + password: None, + zitadel_sso_url: Some(sso_url), + zitadel_client_id: Some(client_id), + jwt_role: Some("harmony-developer".to_string()), + jwt_auth_mount: Some("jwt".to_string()), + zitadel_jwt_bearer: None, + }) .await .context("SSO authentication failed")?; diff --git a/fleet/harmony-fleet-auth/Cargo.toml b/fleet/harmony-fleet-auth/Cargo.toml index 7e32f1e4..a99836e3 100644 --- a/fleet/harmony-fleet-auth/Cargo.toml +++ b/fleet/harmony-fleet-auth/Cargo.toml @@ -11,6 +11,7 @@ path = "src/lib.rs" [dependencies] harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" } async-nats = { workspace = true } anyhow = { workspace = true } chrono = { workspace = true } diff --git a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs index 0b48fb64..73d7c949 100644 --- a/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs +++ b/fleet/harmony-fleet-auth/examples/mint_zitadel_token.rs @@ -7,13 +7,14 @@ async fn main() -> Result<()> { let issuer = std::env::var("ZITADEL_ISSUER").context("ZITADEL_ISSUER missing")?; let audience = std::env::var("ZITADEL_PROJECT_ID").context("ZITADEL_PROJECT_ID missing")?; - let source = CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { - key_path: key_path.into(), - key_json: None, - oidc_issuer_url: issuer, - audience, - danger_accept_invalid_certs: false, - })?; + let source = + CredentialSource::credential_source_from_config(&CredentialsSection::ZitadelJwt { + key_path: key_path.into(), + key_json: None, + oidc_issuer_url: issuer, + audience, + danger_accept_invalid_certs: false, + })?; match source.next_credential().await? { NatsCredential::BearerToken(token) => { diff --git a/fleet/harmony-fleet-auth/src/credentials.rs b/fleet/harmony-fleet-auth/src/credentials.rs index 964ff0fe..36982d55 100644 --- a/fleet/harmony-fleet-auth/src/credentials.rs +++ b/fleet/harmony-fleet-auth/src/credentials.rs @@ -27,8 +27,9 @@ use std::sync::Arc; use anyhow::{Context, Result}; -use crate::jwt_bearer::ZitadelJwtBearer; -use crate::{config::CredentialsSection, jwt_bearer::MachineKeyFile}; +use harmony_zitadel_auth::{MachineKeyFile, ZitadelJwtBearer}; + +use crate::CredentialsSection; /// Material the NATS connector needs to authenticate. Returned per /// (re)connect attempt — the source decides whether to mint fresh. diff --git a/fleet/harmony-fleet-auth/src/lib.rs b/fleet/harmony-fleet-auth/src/lib.rs index 81a68ca1..7932c75f 100644 --- a/fleet/harmony-fleet-auth/src/lib.rs +++ b/fleet/harmony-fleet-auth/src/lib.rs @@ -24,7 +24,6 @@ mod agent_config; mod config; mod credentials; -mod jwt_bearer; pub use agent_config::{AgentConfig, AgentSection, NatsSection, load_config}; pub use config::CredentialsSection; @@ -33,6 +32,12 @@ use std::sync::Arc; pub use crate::credentials::{CredentialSource, NatsCredential}; +pub fn credential_source_from_config( + creds: &CredentialsSection, +) -> anyhow::Result> { + CredentialSource::credential_source_from_config(creds) +} + /// Build `async_nats::ConnectOptions` wired with the auth callback /// that pulls fresh credentials from `creds` on every (re)connect. /// diff --git a/harmony_config/src/lib.rs b/harmony_config/src/lib.rs index dde30471..fabe00ee 100644 --- a/harmony_config/src/lib.rs +++ b/harmony_config/src/lib.rs @@ -210,19 +210,20 @@ async fn openbao_from_env(namespace: &str) -> Option> { }; let env = |k: &str| std::env::var(k).ok(); - let store = harmony_secret::OpenbaoSecretStore::new( - url, - env("OPENBAO_KV_MOUNT").unwrap_or_else(|| "secret".to_string()), - env("OPENBAO_AUTH_MOUNT").unwrap_or_else(|| "jwt".to_string()), - env("OPENBAO_SKIP_TLS").as_deref() == Some("true"), - env("OPENBAO_TOKEN"), - env("OPENBAO_USERNAME"), - env("OPENBAO_PASSWORD"), - env("HARMONY_SSO_URL"), - env("HARMONY_SSO_CLIENT_ID"), - None, - None, - ) + let store = harmony_secret::OpenbaoSecretStore::new(harmony_secret::OpenbaoStoreOptions { + base_url: url, + kv_mount: env("OPENBAO_KV_MOUNT").unwrap_or_else(|| "secret".to_string()), + auth_mount: env("OPENBAO_AUTH_MOUNT").unwrap_or_else(|| "jwt".to_string()), + skip_tls: env("OPENBAO_SKIP_TLS").as_deref() == Some("true"), + token: env("OPENBAO_TOKEN"), + username: env("OPENBAO_USERNAME"), + password: env("OPENBAO_PASSWORD"), + zitadel_sso_url: env("HARMONY_SSO_URL"), + zitadel_client_id: env("HARMONY_SSO_CLIENT_ID"), + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await; match store { diff --git a/harmony_secret/Cargo.toml b/harmony_secret/Cargo.toml index 8d8b9356..266eb1a6 100644 --- a/harmony_secret/Cargo.toml +++ b/harmony_secret/Cargo.toml @@ -12,6 +12,7 @@ license.workspace = true unexpected_cfgs = { level = "warn", check-cfg = ['cfg(secrete2etest)'] } [dependencies] +harmony_zitadel_auth = { path = "../harmony_zitadel_auth" } harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } serde = { version = "1.0.209", features = ["derive", "rc"] } serde_json = "1.0.127" diff --git a/harmony_secret/src/lib.rs b/harmony_secret/src/lib.rs index cda2786f..99d32f3c 100644 --- a/harmony_secret/src/lib.rs +++ b/harmony_secret/src/lib.rs @@ -26,7 +26,7 @@ use serde::{Serialize, de::DeserializeOwned}; use std::fmt; use store::InfisicalSecretStore; use store::LocalFileSecretStore; -pub use store::OpenbaoSecretStore; +pub use store::{OpenbaoSecretStore, OpenbaoStoreOptions}; use thiserror::Error; use tokio::sync::OnceCell; @@ -85,19 +85,20 @@ async fn init_secret_manager() -> SecretManager { let store: Box = match store_type.as_str() { "file" => Box::new(LocalFileSecretStore), "openbao" | "vault" => { - let store = OpenbaoSecretStore::new( - OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"), - OPENBAO_KV_MOUNT.clone(), - OPENBAO_AUTH_MOUNT.clone(), - *OPENBAO_SKIP_TLS, - OPENBAO_TOKEN.clone(), - OPENBAO_USERNAME.clone(), - OPENBAO_PASSWORD.clone(), - HARMONY_SSO_URL.clone(), - HARMONY_SSO_CLIENT_ID.clone(), - None, - None, - ) + let store = OpenbaoSecretStore::new(OpenbaoStoreOptions { + base_url: OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"), + kv_mount: OPENBAO_KV_MOUNT.clone(), + auth_mount: OPENBAO_AUTH_MOUNT.clone(), + skip_tls: *OPENBAO_SKIP_TLS, + token: OPENBAO_TOKEN.clone(), + username: OPENBAO_USERNAME.clone(), + password: OPENBAO_PASSWORD.clone(), + zitadel_sso_url: HARMONY_SSO_URL.clone(), + zitadel_client_id: HARMONY_SSO_CLIENT_ID.clone(), + jwt_role: None, + jwt_auth_mount: None, + zitadel_jwt_bearer: None, + }) .await .expect("Failed to initialize Openbao/Vault secret store"); Box::new(store) diff --git a/harmony_secret/src/store/mod.rs b/harmony_secret/src/store/mod.rs index 8179291d..9ea55a4c 100644 --- a/harmony_secret/src/store/mod.rs +++ b/harmony_secret/src/store/mod.rs @@ -5,4 +5,4 @@ pub mod zitadel; pub use infisical::InfisicalSecretStore; pub use local_file::LocalFileSecretStore; -pub use openbao::OpenbaoSecretStore; +pub use openbao::{OpenbaoSecretStore, OpenbaoStoreOptions}; diff --git a/harmony_secret/src/store/openbao.rs b/harmony_secret/src/store/openbao.rs index d0110cd3..0f41eec5 100644 --- a/harmony_secret/src/store/openbao.rs +++ b/harmony_secret/src/store/openbao.rs @@ -1,10 +1,14 @@ use crate::{SecretStore, SecretStoreError}; use async_trait::async_trait; +use harmony_zitadel_auth::ZitadelJwtBearer; use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; +use std::collections::HashSet; use std::fmt::Debug; use std::fs; use std::path::PathBuf; +use std::sync::Arc; +use tokio::sync::Mutex; use vaultrs::auth; use vaultrs::client::{VaultClient, VaultClientSettingsBuilder}; use vaultrs::kv2; @@ -30,21 +34,52 @@ impl From for AuthInfo { } pub struct OpenbaoSecretStore { - client: VaultClient, + inner: Mutex, kv_mount: String, auth_mount: String, + jwt_bearer: Option>, + jwt_role: Option, + jwt_auth_mount: Option, + base_url: String, + skip_tls: bool, +} + +struct Inner { + client: VaultClient, + scope: HashSet, } impl Debug for OpenbaoSecretStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OpenbaoSecretStore") - .field("client", &self.client.settings) .field("kv_mount", &self.kv_mount) .field("auth_mount", &self.auth_mount) .finish() } } +pub struct OpenbaoStoreOptions { + pub base_url: String, + pub kv_mount: String, + pub auth_mount: String, + pub skip_tls: bool, + pub token: Option, + pub username: Option, + pub password: Option, + pub zitadel_sso_url: Option, + pub zitadel_client_id: Option, + pub zitadel_jwt_bearer: Option, + pub jwt_role: Option, + pub jwt_auth_mount: Option, +} + +pub struct ZitadelJwtBearerConfig { + pub key_path: Option, + pub key_json: Option, + pub oidc_issuer_url: String, + pub audience: String, +} + impl OpenbaoSecretStore { /// Creates a new Openbao/Vault secret store with authentication. /// @@ -52,34 +87,35 @@ impl OpenbaoSecretStore { /// - `auth_mount`: The auth method mount path (e.g., `"userpass"`). Used for userpass authentication. /// - `zitadel_sso_url`: Zitadel OIDC server URL (e.g., `"https://sso.example.com"`). If provided along with `zitadel_client_id`, Zitadel OIDC device flow will be attempted. /// - `zitadel_client_id`: OIDC client ID registered in Zitadel. - // TODO(clippy): refactor to a typed options struct - #[allow(clippy::too_many_arguments)] - pub async fn new( - base_url: String, - kv_mount: String, - auth_mount: String, - skip_tls: bool, - token: Option, - username: Option, - password: Option, - zitadel_sso_url: Option, - zitadel_client_id: Option, - jwt_role: Option, - jwt_auth_mount: Option, - ) -> Result { - info!("OPENBAO_STORE: Initializing client for URL: {base_url}"); + pub async fn new(options: OpenbaoStoreOptions) -> Result { + info!( + "OPENBAO_STORE: Initializing client for URL: {}", + options.base_url + ); // 1. If token is provided via env var, use it directly - if let Some(t) = token { + if let Some(t) = &options.token { debug!("OPENBAO_STORE: Using token from environment variable"); - return Self::with_token(&base_url, skip_tls, &t, &kv_mount, &auth_mount); + return Self::with_token( + &options.base_url, + options.skip_tls, + t, + &options.kv_mount, + &options.auth_mount, + ); } // 2. Try to load cached token - let cache_path = Self::get_token_cache_path(&base_url); + let cache_path = Self::get_token_cache_path(&options.base_url); if let Ok(mut cached_token) = Self::load_cached_token(&cache_path) { debug!("OPENBAO_STORE: Found cached token, validating..."); - if Self::validate_token(&base_url, skip_tls, &cached_token.client_token).await { + if Self::validate_token( + &options.base_url, + options.skip_tls, + &cached_token.client_token, + ) + .await + { info!("OPENBAO_STORE: Cached token is valid"); // Best-effort renewal: extend the server-side lease so @@ -89,7 +125,13 @@ impl OpenbaoSecretStore { // blip, etc.) doesn't block the caller — we just keep // the validated cached token as-is and let the next // invocation try again. - match Self::renew_self(&base_url, skip_tls, &cached_token.client_token).await { + match Self::renew_self( + &options.base_url, + options.skip_tls, + &cached_token.client_token, + ) + .await + { Ok(new_lease) => { info!("OPENBAO_STORE: cached token renewed, lease_duration={new_lease}s"); cached_token.lease_duration = Some(new_lease); @@ -106,26 +148,28 @@ impl OpenbaoSecretStore { } return Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &cached_token.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ); } warn!("OPENBAO_STORE: Cached token is invalid or expired"); } // 3. Try Zitadel OIDC device flow if configured - if let (Some(sso_url), Some(client_id)) = (zitadel_sso_url, zitadel_client_id) { + if let (Some(sso_url), Some(client_id)) = + (&options.zitadel_sso_url, &options.zitadel_client_id) + { info!("OPENBAO_STORE: Attempting Zitadel OIDC device flow"); match Self::authenticate_zitadel_oidc( - &base_url, - &sso_url, - &client_id, - skip_tls, - jwt_role.as_deref(), - jwt_auth_mount.as_deref(), + &options.base_url, + sso_url, + client_id, + options.skip_tls, + options.jwt_role.as_deref(), + options.jwt_auth_mount.as_deref(), ) .await { @@ -141,11 +185,11 @@ impl OpenbaoSecretStore { warn!("OPENBAO_STORE: Failed to cache OIDC token: {e}"); } return Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &auth_info.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ); } Err(e) => { @@ -155,7 +199,7 @@ impl OpenbaoSecretStore { } // 4. Authenticate with username/password - let (user, pass) = match (username, password) { + let (user, pass) = match (&options.username, &options.password) { (Some(u), Some(p)) => (u, p), _ => { return Err(SecretStoreError::Store( @@ -167,8 +211,14 @@ impl OpenbaoSecretStore { } }; - let token = - Self::authenticate_userpass(&base_url, &auth_mount, skip_tls, &user, &pass).await?; + let token = Self::authenticate_userpass( + &options.base_url, + &options.auth_mount, + options.skip_tls, + user, + pass, + ) + .await?; // Cache the token if let Err(e) = Self::cache_token(&cache_path, &token) { @@ -176,11 +226,11 @@ impl OpenbaoSecretStore { } Self::with_token( - &base_url, - skip_tls, + &options.base_url, + options.skip_tls, &token.client_token, - &kv_mount, - &auth_mount, + &options.kv_mount, + &options.auth_mount, ) } @@ -229,10 +279,20 @@ impl OpenbaoSecretStore { ) .map_err(|e| SecretStoreError::Store(Box::new(e)))?; - Ok(Self { + let inner = Mutex::new(Inner { client, + scope: HashSet::new(), + }); + + Ok(Self { + inner, kv_mount: kv_mount.to_string(), auth_mount: auth_mount.to_string(), + base_url: base_url.to_string(), + skip_tls, + jwt_bearer: None, + jwt_role: None, + jwt_auth_mount: None, }) } @@ -411,7 +471,8 @@ impl SecretStore for OpenbaoSecretStore { info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'"); debug!("OPENBAO_STORE: Request path: {path}"); - let data: serde_json::Value = kv2::read(&self.client, &self.kv_mount, &path) + let inner = self.inner.lock().await; + let data: serde_json::Value = kv2::read(&inner.client, &self.kv_mount, &path) .await .map_err(|e| { if e.to_string().contains("does not exist") || e.to_string().contains("404") { @@ -447,8 +508,9 @@ impl SecretStore for OpenbaoSecretStore { let data = serde_json::json!({ "value": value_str }); + let inner = self.inner.lock().await; - kv2::set(&self.client, &self.kv_mount, &path, &data) + kv2::set(&inner.client, &self.kv_mount, &path, &data) .await .map_err(|e| SecretStoreError::Store(Box::new(e)))?; diff --git a/fleet/harmony-fleet-auth/src/jwt_bearer.rs b/harmony_zitadel_auth/src/jwt_bearer.rs similarity index 100% rename from fleet/harmony-fleet-auth/src/jwt_bearer.rs rename to harmony_zitadel_auth/src/jwt_bearer.rs diff --git a/harmony_zitadel_auth/src/lib.rs b/harmony_zitadel_auth/src/lib.rs index f5cd0e78..c59c31c9 100644 --- a/harmony_zitadel_auth/src/lib.rs +++ b/harmony_zitadel_auth/src/lib.rs @@ -2,6 +2,7 @@ pub mod axum_login_flow; pub mod config; pub mod jwks; +pub mod jwt_bearer; pub mod login; pub mod session; @@ -11,6 +12,7 @@ pub use config::{ BASE_URL_ENV, CLIENT_ID_ENV, COOKIE_KEY_ENV, LOGOUT_REDIRECT_URI_ENV, SCOPE_ENV, TRUSTED_AUDIENCES_ENV, ZITADEL_BASE_ENV, ZitadelAuthConfig, config_from_env, }; +pub use jwt_bearer::{MachineKeyFile, ZitadelJwtBearer}; pub use jwks::JwksCache; -- 2.39.5 From 649cfe84bf541522fff95dc4dc95cabe346ac9f0 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Fri, 5 Jun 2026 13:08:44 -0400 Subject: [PATCH 6/7] resolve conflict, add files ob entity policy --- Cargo.lock | 1 + examples/harmony_sso/src/main.rs | 1 - examples/openbao/src/main.rs | 2 +- fleet/harmony-fleet-e2e/Cargo.toml | 5 + .../harmony-fleet-e2e/tests/openbao_policy.rs | 227 ++++++++++++++++++ .../src/fleet_aggregator.rs | 43 +++- fleet/harmony-fleet-operator/src/lib.rs | 1 + fleet/harmony-fleet-operator/src/main.rs | 43 +++- fleet/harmony-fleet-operator/src/openbao.rs | 41 ++++ .../tests/openbao_identity.rs | 169 +++++++++++++ harmony/src/modules/openbao/mod.rs | 5 +- harmony/src/modules/openbao/setup.rs | 207 +++++++++++++++- harmony_secret/src/store/openbao.rs | 223 ++++++++++++++--- 13 files changed, 911 insertions(+), 57 deletions(-) create mode 100644 fleet/harmony-fleet-e2e/tests/openbao_policy.rs create mode 100644 fleet/harmony-fleet-operator/src/openbao.rs create mode 100644 fleet/harmony-fleet-operator/tests/openbao_identity.rs diff --git a/Cargo.lock b/Cargo.lock index c9b548fb..cb074901 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4131,6 +4131,7 @@ dependencies = [ "harmony-fleet-auth", "harmony-fleet-deploy", "harmony-fleet-operator", + "harmony-k8s", "harmony-reconciler-contracts", "harmony_types", "k3d-rs", diff --git a/examples/harmony_sso/src/main.rs b/examples/harmony_sso/src/main.rs index abdf021e..4b7ece74 100644 --- a/examples/harmony_sso/src/main.rs +++ b/examples/harmony_sso/src/main.rs @@ -341,7 +341,6 @@ async fn main() -> anyhow::Result<()> { role_name: "harmony-developer".to_string(), bound_audiences: client_id.clone(), user_claim: "email".to_string(), - policies: vec!["harmony-dev".to_string()], ttl: "4h".to_string(), max_ttl: "24h".to_string(), }), diff --git a/examples/openbao/src/main.rs b/examples/openbao/src/main.rs index f5824b3e..f40d371e 100644 --- a/examples/openbao/src/main.rs +++ b/examples/openbao/src/main.rs @@ -105,7 +105,6 @@ async fn main() -> Result<()> { role_name: "harmony".to_string(), bound_audiences: cfg.oidc_audience.clone(), user_claim: "sub".to_string(), - policies: vec![HARMONY_CONFIG_POLICY.to_string()], ttl: "1h".to_string(), max_ttl: "8h".to_string(), }); @@ -124,6 +123,7 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"# instance, kv_mount: "secret".to_string(), policies, + entities: vec![], users: vec![], jwt_auth, }; diff --git a/fleet/harmony-fleet-e2e/Cargo.toml b/fleet/harmony-fleet-e2e/Cargo.toml index f11246e0..59b6d97c 100644 --- a/fleet/harmony-fleet-e2e/Cargo.toml +++ b/fleet/harmony-fleet-e2e/Cargo.toml @@ -18,6 +18,10 @@ path = "tests/ping.rs" name = "operator" path = "tests/operator.rs" +[[test]] +name = "openbao_policy" +path = "tests/openbao_policy.rs" + [[test]] name = "vm_ping" path = "tests/vm_ping.rs" @@ -39,6 +43,7 @@ harmony-fleet-auth = { path = "../harmony-fleet-auth" } harmony-fleet-deploy = { path = "../harmony-fleet-deploy" } harmony-fleet-operator = { path = "../harmony-fleet-operator" } harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" } +harmony-k8s = { path = "../../harmony-k8s" } harmony_types = { path = "../../harmony_types" } k3d-rs = { path = "../../k3d" } diff --git a/fleet/harmony-fleet-e2e/tests/openbao_policy.rs b/fleet/harmony-fleet-e2e/tests/openbao_policy.rs new file mode 100644 index 00000000..ff64a53d --- /dev/null +++ b/fleet/harmony-fleet-e2e/tests/openbao_policy.rs @@ -0,0 +1,227 @@ +use std::sync::Arc; + +use harmony::inventory::Inventory; +use harmony::modules::openbao::{ + OpenbaoInstance, OpenbaoPolicy, OpenbaoScore, OpenbaoSetupScore, OpenbaoUser, cached_root_token, +}; +use harmony::score::Score; +use harmony::topology::{K8sAnywhereTopology, K8sclient}; +use harmony_fleet_e2e::{StackOptions, shared_stack}; + +const E2E_ENV: &str = "HARMONY_FLEET_E2E"; +const RELEASE: &str = "openbao-policy-test"; +const USER: &str = "policy-device"; +const PASSWORD: &str = "policy-device-pass"; +const POLICY: &str = "deployment-policy-test"; +const SECRET_PATH: &str = "fleet-staging/deployment-a"; + +fn e2e_enabled() -> bool { + matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true")) +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn entity_policy_grants_existing_openbao_token_access() -> anyhow::Result<()> { + if !e2e_enabled() { + skip_e2e(); + return Ok(()); + } + + let stack = openbao_stack().await?; + stack.print_debug_info(); + + let instance = OpenbaoInstance { + namespace: stack.namespace.clone(), + release: RELEASE.to_string(), + }; + deploy_openbao(&instance).await?; + + let topology = K8sAnywhereTopology::from_env(); + let k8s = topology.k8s_client().await.map_err(anyhow::Error::msg)?; + let root_token = cached_root_token(&instance).map_err(anyhow::Error::msg)?; + let user_token = login_user(&k8s, &instance).await?; + let entity_id = token_entity_id(&k8s, &instance, &user_token).await?; + + set_entity_policies(&k8s, &instance, &root_token, &entity_id, "").await?; + assert_secret_denied(&k8s, &instance, &user_token).await?; + + set_entity_policies(&k8s, &instance, &root_token, &entity_id, POLICY).await?; + assert_secret_allowed(&k8s, &instance, &user_token).await?; + + set_entity_policies(&k8s, &instance, &root_token, &entity_id, "").await?; + Ok(()) +} + +async fn openbao_stack() -> anyhow::Result> { + let _ = tracing_subscriber::fmt() + .with_env_filter( + tracing_subscriber::EnvFilter::try_from_default_env() + .unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")), + ) + .try_init(); + + Ok(shared_stack(StackOptions { + deploy_agent: false, + deploy_operator: false, + ..StackOptions::infra_only() + }) + .await?) +} + +async fn deploy_openbao(instance: &OpenbaoInstance) -> anyhow::Result<()> { + let topology = K8sAnywhereTopology::from_env(); + let policy_hcl = format!( + r#"path "secret/data/{SECRET_PATH}" {{ capabilities = ["read"] }} +path "secret/metadata/{SECRET_PATH}" {{ capabilities = ["read"] }}"# + ); + let scores: Vec>> = vec![ + Box::new(OpenbaoScore { + instance: instance.clone(), + host: "openbao-policy-test.local".to_string(), + openshift: false, + tls_issuer: None, + }), + Box::new(OpenbaoSetupScore { + instance: instance.clone(), + kv_mount: "secret".to_string(), + policies: vec![OpenbaoPolicy { + name: POLICY.to_string(), + hcl: policy_hcl, + }], + entities: vec![], + users: vec![OpenbaoUser { + username: USER.to_string(), + password: PASSWORD.to_string(), + policies: vec![], + }], + jwt_auth: None, + }), + ]; + + for score in scores { + score.interpret(&Inventory::autoload(), &topology).await?; + } + let k8s = topology.k8s_client().await.map_err(anyhow::Error::msg)?; + let root_token = cached_root_token(instance).map_err(anyhow::Error::msg)?; + bao( + &k8s, + instance, + &root_token, + "bao kv put secret/fleet-staging/deployment-a value=e2e", + ) + .await?; + Ok(()) +} + +async fn login_user( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, +) -> anyhow::Result { + let out = exec( + k8s, + instance, + &format!("bao login -method=userpass -format=json username={USER} password={PASSWORD}"), + ) + .await?; + Ok( + serde_json::from_str::(&out)?["auth"]["client_token"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("login response missing auth.client_token: {out}"))? + .to_string(), + ) +} + +async fn token_entity_id( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, +) -> anyhow::Result { + let out = bao(k8s, instance, token, "bao token lookup -format=json").await?; + Ok( + serde_json::from_str::(&out)?["data"]["entity_id"] + .as_str() + .ok_or_else(|| anyhow::anyhow!("token lookup response missing data.entity_id: {out}"))? + .to_string(), + ) +} + +async fn set_entity_policies( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + root_token: &str, + entity_id: &str, + policies: &str, +) -> anyhow::Result<()> { + bao( + k8s, + instance, + root_token, + &format!("bao write identity/entity/id/{entity_id} policies={policies}"), + ) + .await?; + Ok(()) +} + +async fn assert_secret_denied( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, +) -> anyhow::Result<()> { + let result = bao( + k8s, + instance, + token, + &format!("bao kv get secret/{SECRET_PATH}"), + ) + .await; + assert!(result.is_err(), "secret read unexpectedly succeeded"); + Ok(()) +} + +async fn assert_secret_allowed( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, +) -> anyhow::Result<()> { + bao( + k8s, + instance, + token, + &format!("bao kv get secret/{SECRET_PATH}"), + ) + .await?; + Ok(()) +} + +async fn bao( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + token: &str, + command: &str, +) -> anyhow::Result { + exec( + k8s, + instance, + &format!("export VAULT_TOKEN={token} && {command}"), + ) + .await +} + +async fn exec( + k8s: &harmony_k8s::K8sClient, + instance: &OpenbaoInstance, + command: &str, +) -> anyhow::Result { + k8s.exec_pod_capture_output( + &instance.pod(), + Some(&instance.namespace), + vec!["sh", "-c", command], + ) + .await + .map_err(|e| anyhow::anyhow!(e)) +} + +fn skip_e2e() { + eprintln!( + "skipping {E2E_ENV}-gated OpenBao e2e test (set {E2E_ENV}=1 to run; requires k3d + helm)" + ); +} diff --git a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs index 9eec10df..0ed38e26 100644 --- a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs +++ b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs @@ -38,6 +38,8 @@ use harmony::modules::fleet::operator::{ }; use tracing::warn; +use crate::openbao::OpenBaoIdentityClient; + const PATCH_TICK: Duration = Duration::from_secs(1); // --------------------------------------------------------------------------- @@ -149,7 +151,11 @@ fn matched_devices( // Top-level run // --------------------------------------------------------------------------- -pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> { +pub async fn run( + client: Client, + js: async_nats::jetstream::Context, + openbao: Option>, +) -> anyhow::Result<()> { let state_bucket = js .create_key_value(async_nats::jetstream::kv::Config { bucket: BUCKET_DEVICE_STATE.to_string(), @@ -207,8 +213,9 @@ pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow:: let device_watcher_handle = { let state = state.clone(); let desired = desired_bucket.clone(); + let openbao = openbao.clone(); tokio::spawn(async move { - if let Err(e) = run_device_watcher(devices_api, state, desired).await { + if let Err(e) = run_device_watcher(devices_api, state, desired, openbao).await { tracing::warn!(error = %e, "aggregator: device watcher exited"); } }) @@ -281,6 +288,18 @@ async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow: Ok(()) } +/// Returns the full set of OpenBao policies for `device_id` given the +/// current `owned_targets`. One policy per matched deployment, named +/// `deployment-` to match the HCL policies created by OpenbaoSetupScore. +fn device_deployment_policies(state: &FleetState, device_id: &str) -> Vec { + state + .owned_targets + .iter() + .filter(|(_, devices)| devices.contains(device_id)) + .map(|(dn, _)| format!("deployment-{}", dn.as_str())) + .collect() +} + /// Record a device's latest state, dedup against older timestamps, /// maintain last_error, mark the deployment dirty. pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) { @@ -467,12 +486,13 @@ async fn run_device_watcher( api: Api, state: SharedFleetState, desired: Store, + openbao: Option>, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { match event { Event::Apply(dev) | Event::InitApply(dev) => { - on_device_upsert(&state, &desired, dev).await; + on_device_upsert(&state, &desired, openbao.as_ref(), dev).await; } Event::Delete(dev) => { on_device_delete(&state, &desired, dev).await; @@ -483,7 +503,12 @@ async fn run_device_watcher( Ok(()) } -async fn on_device_upsert(state: &SharedFleetState, desired: &Store, dev: Device) { +async fn on_device_upsert( + state: &SharedFleetState, + desired: &Store, + openbao: Option<&Arc>, + dev: Device, +) { let name = dev.name_any(); let labels: BTreeMap = dev.metadata.labels.clone().unwrap_or_default(); @@ -547,6 +572,16 @@ async fn on_device_upsert(state: &SharedFleetState, desired: &Store, dev: Device _ => {} } } + + if let Some(bao) = &openbao { + let policies = { + let guard = state.lock().await; + device_deployment_policies(&guard, &name) + }; + if let Err(e) = bao.set_entity_policies(&name, &policies).await { + tracing::warn!(device = %name, error = %e, "aggregator: OpenBao entity policy sync failed"); + } + } } async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device) { diff --git a/fleet/harmony-fleet-operator/src/lib.rs b/fleet/harmony-fleet-operator/src/lib.rs index 8c293a4e..c7cf40ca 100644 --- a/fleet/harmony-fleet-operator/src/lib.rs +++ b/fleet/harmony-fleet-operator/src/lib.rs @@ -15,3 +15,4 @@ pub mod commands; pub mod device_reconciler; pub mod fleet_aggregator; +pub mod openbao; diff --git a/fleet/harmony-fleet-operator/src/main.rs b/fleet/harmony-fleet-operator/src/main.rs index d4ff8e4b..18abf54d 100644 --- a/fleet/harmony-fleet-operator/src/main.rs +++ b/fleet/harmony-fleet-operator/src/main.rs @@ -3,7 +3,7 @@ mod controller; mod frontend; mod service; -use harmony_fleet_operator::{device_reconciler, fleet_aggregator}; +use harmony_fleet_operator::{device_reconciler, fleet_aggregator, openbao::OpenBaoIdentityClient}; use anyhow::{Context, Result}; use async_nats::jetstream; @@ -15,6 +15,7 @@ use harmony_reconciler_contracts::BUCKET_DESIRED_STATE; use kube::Client; #[cfg(feature = "web-frontend")] use std::path::PathBuf; +use std::sync::Arc; use std::time::Duration; #[derive(Parser)] @@ -53,6 +54,12 @@ struct Cli { global = true )] credentials_toml: String, + + #[arg(long, env = "OPENBAO_URL", global = true)] + openbao_url: Option, + + #[arg(long, env = "OPENBAO_TOKEN", global = true)] + openbao_token: Option, } #[derive(Subcommand)] @@ -95,7 +102,16 @@ async fn main() -> Result<()> { let cli = Cli::parse(); match cli.command.unwrap_or(Command::Run) { - Command::Run => run(&cli.nats_url, &cli.kv_bucket, &cli.credentials_toml).await, + Command::Run => { + run( + &cli.nats_url, + &cli.kv_bucket, + &cli.credentials_toml, + &cli.openbao_url, + &cli.openbao_token, + ) + .await + } #[cfg(feature = "web-frontend")] Command::ServeWeb { mock, @@ -152,7 +168,13 @@ async fn serve_web( .await } -async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> { +async fn run( + nats_url: &str, + bucket: &str, + credentials_toml: &str, + openbao_url: &Option, + openbao_token: &Option, +) -> Result<()> { let nats = connect_with_retry(nats_url, credentials_toml).await?; tracing::info!(url = %nats_url, "connected to NATS"); let js = jetstream::new(nats); @@ -165,6 +187,19 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> tracing::info!(bucket = %bucket, "KV bucket ready"); let client = Client::try_default().await?; + let openbao = match (openbao_url, openbao_token) { + (Some(url), Some(token)) => { + tracing::info!(url = %url, "OpenBao identity sync enabled"); + Some(Arc::new(OpenBaoIdentityClient::new( + url.clone(), + token.clone(), + ))) + } + _ => { + tracing::warn!("OPENBAO_URL or OPENBAO_TOKEN not set; entity policy sync disabled"); + None + } + }; // Three concurrent tasks: // controller — CR validation + finalizer-cleanup @@ -179,7 +214,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> tokio::select! { r = controller::run(ctl_client, desired_state_kv) => r, r = device_reconciler::run(dr_client, dr_js) => r, - r = fleet_aggregator::run(client, js) => r, + r = fleet_aggregator::run(client, js, openbao) => r, } } diff --git a/fleet/harmony-fleet-operator/src/openbao.rs b/fleet/harmony-fleet-operator/src/openbao.rs new file mode 100644 index 00000000..f106c67c --- /dev/null +++ b/fleet/harmony-fleet-operator/src/openbao.rs @@ -0,0 +1,41 @@ +use anyhow::Result; + +pub struct OpenBaoIdentityClient { + client: reqwest::Client, + base_url: String, + token: String, +} + +impl OpenBaoIdentityClient { + pub fn new(base_url: String, token: String) -> Self { + Self { + client: reqwest::Client::new(), + base_url, + token, + } + } + + /// Idempotent upsert: sets the entity's policy list to exactly `policies`. + /// Passing an empty slice clears all policies (device loses all secret access). + pub async fn set_entity_policies(&self, entity: &str, policies: &[String]) -> Result<()> { + let url = format!( + "{}/v1/identity/entity/name/{}", + self.base_url.trim_end_matches('/'), + entity + ); + let body = serde_json::json!({ "policies": policies }); + let resp = self + .client + .post(&url) + .header("X-Vault-Token", &self.token) + .json(&body) + .send() + .await?; + let status = resp.status(); + if !status.is_success() { + let text = resp.text().await.unwrap_or_default(); + anyhow::bail!("OpenBao entity update {entity} returned {status}: {text}"); + } + Ok(()) + } +} diff --git a/fleet/harmony-fleet-operator/tests/openbao_identity.rs b/fleet/harmony-fleet-operator/tests/openbao_identity.rs new file mode 100644 index 00000000..5d3e9c7d --- /dev/null +++ b/fleet/harmony-fleet-operator/tests/openbao_identity.rs @@ -0,0 +1,169 @@ +use harmony_fleet_operator::openbao::OpenBaoIdentityClient; +use serde::Deserialize; + +#[derive(Deserialize)] +struct EntityLookup { + data: EntityLookupData, +} + +#[derive(Deserialize)] +struct EntityLookupData { + entity_id: String, +} + +#[tokio::test] +async fn openbao_identity_client_sets_entity_policies() -> anyhow::Result<()> { + let Some(admin) = openbao_admin_from_env() else { + skip_openbao(); + return Ok(()); + }; + + let entity = format!("harmony-test-{}", unique_suffix()); + let policies = vec!["harmony-test-policy-a".to_string()]; + + OpenBaoIdentityClient::new(admin.base_url.clone(), admin.token.clone()) + .set_entity_policies(&entity, &policies) + .await?; + + let body = admin + .client + .get(format!( + "{}/v1/identity/entity/name/{}", + admin.base_url, entity + )) + .header("X-Vault-Token", &admin.token) + .send() + .await? + .error_for_status()? + .json::() + .await?; + + assert_eq!(body["data"]["policies"], serde_json::json!(policies)); + Ok(()) +} + +#[tokio::test] +async fn openbao_entity_policy_grants_existing_token_access() -> anyhow::Result<()> { + let Some(admin) = openbao_admin_from_env() else { + skip_openbao(); + return Ok(()); + }; + let Some(device) = openbao_device_from_env() else { + skip_openbao_device(); + return Ok(()); + }; + + let entity_id = lookup_entity_id(&admin.base_url, &device.token).await?; + + set_entity_id_policies(&admin, &entity_id, &[]).await?; + assert_secret_denied(&admin.base_url, &device.token, &device.secret_api_path).await?; + + set_entity_id_policies(&admin, &entity_id, std::slice::from_ref(&device.policy)).await?; + assert_secret_allowed(&admin.base_url, &device.token, &device.secret_api_path).await?; + + set_entity_id_policies(&admin, &entity_id, &[]).await?; + Ok(()) +} + +struct OpenBaoAdmin { + client: reqwest::Client, + base_url: String, + token: String, +} + +struct OpenBaoDevice { + token: String, + policy: String, + secret_api_path: String, +} + +fn openbao_admin_from_env() -> Option { + Some(OpenBaoAdmin { + client: reqwest::Client::new(), + base_url: std::env::var("OPENBAO_URL") + .ok()? + .trim_end_matches('/') + .to_string(), + token: std::env::var("OPENBAO_TOKEN").ok()?, + }) +} + +fn openbao_device_from_env() -> Option { + Some(OpenBaoDevice { + token: std::env::var("OPENBAO_TEST_DEVICE_TOKEN").ok()?, + policy: std::env::var("OPENBAO_TEST_POLICY").ok()?, + secret_api_path: std::env::var("OPENBAO_TEST_SECRET_API_PATH").ok()?, + }) +} + +async fn lookup_entity_id(base_url: &str, token: &str) -> anyhow::Result { + let lookup = reqwest::Client::new() + .get(format!("{base_url}/v1/auth/token/lookup-self")) + .header("X-Vault-Token", token) + .send() + .await? + .error_for_status()? + .json::() + .await?; + Ok(lookup.data.entity_id) +} + +async fn set_entity_id_policies( + admin: &OpenBaoAdmin, + entity_id: &str, + policies: &[String], +) -> anyhow::Result<()> { + admin + .client + .post(format!( + "{}/v1/identity/entity/id/{}", + admin.base_url, entity_id + )) + .header("X-Vault-Token", &admin.token) + .json(&serde_json::json!({ "policies": policies })) + .send() + .await? + .error_for_status()?; + Ok(()) +} + +async fn assert_secret_denied(base_url: &str, token: &str, path: &str) -> anyhow::Result<()> { + let status = reqwest::Client::new() + .get(format!("{base_url}/v1/{path}")) + .header("X-Vault-Token", token) + .send() + .await? + .status(); + assert!( + status == reqwest::StatusCode::FORBIDDEN || status == reqwest::StatusCode::NOT_FOUND, + "expected secret read denial, got {status}" + ); + Ok(()) +} + +async fn assert_secret_allowed(base_url: &str, token: &str, path: &str) -> anyhow::Result<()> { + reqwest::Client::new() + .get(format!("{base_url}/v1/{path}")) + .header("X-Vault-Token", token) + .send() + .await? + .error_for_status()?; + Ok(()) +} + +fn unique_suffix() -> u128 { + std::time::SystemTime::now() + .duration_since(std::time::UNIX_EPOCH) + .expect("system time before UNIX_EPOCH") + .as_millis() +} + +fn skip_openbao() { + eprintln!("skipping OpenBao integration test; set OPENBAO_URL and OPENBAO_TOKEN to run"); +} + +fn skip_openbao_device() { + eprintln!( + "skipping OpenBao token grant test; set OPENBAO_TEST_DEVICE_TOKEN, OPENBAO_TEST_POLICY, and OPENBAO_TEST_SECRET_API_PATH to run" + ); +} diff --git a/harmony/src/modules/openbao/mod.rs b/harmony/src/modules/openbao/mod.rs index 8b612d9f..6fe4e2b9 100644 --- a/harmony/src/modules/openbao/mod.rs +++ b/harmony/src/modules/openbao/mod.rs @@ -13,7 +13,10 @@ use crate::{ topology::{HelmCommand, K8sclient, Topology}, }; -pub use setup::{OpenbaoJwtAuth, OpenbaoPolicy, OpenbaoSetupScore, OpenbaoUser, cached_root_token}; +pub use setup::{ + OpenbaoEntity, OpenbaoEntityPoliciesScore, OpenbaoJwtAuth, OpenbaoPolicy, OpenbaoSetupScore, + OpenbaoUser, cached_root_token, +}; const DEFAULT_NAMESPACE: &str = "openbao"; const DEFAULT_RELEASE: &str = "openbao"; diff --git a/harmony/src/modules/openbao/setup.rs b/harmony/src/modules/openbao/setup.rs index 5c6c62f4..b162d826 100644 --- a/harmony/src/modules/openbao/setup.rs +++ b/harmony/src/modules/openbao/setup.rs @@ -24,6 +24,40 @@ pub struct OpenbaoPolicy { pub hcl: String, } +#[derive(Debug, Clone, Serialize)] +pub struct OpenbaoEntity { + pub name: String, + pub alias_name: String, + pub policies: Vec, +} + +#[derive(Deserialize)] +struct AuthMount { + accessor: String, +} + +#[derive(Debug, Deserialize)] +struct BaoRead { + data: T, +} + +#[derive(Debug, Deserialize)] +struct EntityRead { + id: String, +} + +#[derive(Debug, Clone, Serialize)] +pub struct OpenbaoEntityPoliciesScore { + pub instance: OpenbaoInstance, + pub entity_name: String, + pub policies: Vec, +} + +#[derive(Debug, Clone)] +struct OpenbaoEntityPoliciesInterpret { + score: OpenbaoEntityPoliciesScore, +} + /// A userpass user to create in OpenBao. #[derive(Debug, Clone, Serialize)] pub struct OpenbaoUser { @@ -40,7 +74,6 @@ pub struct OpenbaoJwtAuth { pub role_name: String, pub bound_audiences: String, pub user_claim: String, - pub policies: Vec, pub ttl: String, pub max_ttl: String, } @@ -84,6 +117,9 @@ pub struct OpenbaoSetupScore { #[serde(default)] pub policies: Vec, + #[serde(default)] + pub entities: Vec, + /// Userpass users to create. #[serde(default)] pub users: Vec, @@ -103,12 +139,25 @@ impl Default for OpenbaoSetupScore { instance: OpenbaoInstance::default(), kv_mount: default_kv_mount(), policies: Vec::new(), + entities: Vec::new(), users: Vec::new(), jwt_auth: None, } } } +impl Score for OpenbaoEntityPoliciesScore { + fn name(&self) -> String { + "OpenbaoEntityPoliciesScore".to_string() + } + + fn create_interpret(&self) -> Box> { + Box::new(OpenbaoEntityPoliciesInterpret { + score: self.clone(), + }) + } +} + impl Score for OpenbaoSetupScore { fn name(&self) -> String { "OpenbaoSetupScore".to_string() @@ -165,6 +214,69 @@ pub fn cached_root_token(instance: &OpenbaoInstance) -> Result { Ok(init.root_token) } +#[async_trait] +impl Interpret for OpenbaoEntityPoliciesInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + let k8s = topology + .k8s_client() + .await + .map_err(|e| InterpretError::new(format!("Failed to get K8s client: {e}")))?; + + let root_token = cached_root_token(&self.score.instance).map_err(InterpretError::new)?; + + let policies = self.score.policies.join(","); + + k8s.exec_pod_capture_output( + &self.score.instance.pod(), + Some(&self.score.instance.namespace), + vec![ + "sh", + "-c", + &format!( + "export VAULT_TOKEN={} && bao write identity/entity/name/{} policies={}", + root_token, self.score.entity_name, policies + ), + ], + ) + .await + .map_err(|e| { + InterpretError::new(format!( + "Failed to update OpenBao entity '{}' policies: {e}", + self.score.entity_name + )) + })?; + + Ok(Outcome { + status: InterpretStatus::SUCCESS, + message: format!( + "OpenBao entity '{}' policies updated", + self.score.entity_name + ), + details: vec![format!("policies={policies}")], + }) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("OpenbaoEntityPolicies") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + vec![] + } +} + impl OpenbaoSetupInterpret { async fn exec( &self, @@ -479,7 +591,6 @@ impl OpenbaoSetupInterpret { } } - let policies = jwt.policies.join(","); self.bao( k8s, root_token, @@ -490,7 +601,6 @@ impl OpenbaoSetupInterpret { "role_type=jwt", &format!("bound_audiences={}", jwt.bound_audiences), &format!("user_claim={}", jwt.user_claim), - &format!("policies={}", policies), &format!("ttl={}", jwt.ttl), &format!("max_ttl={}", jwt.max_ttl), "token_type=service", @@ -504,10 +614,92 @@ impl OpenbaoSetupInterpret { )) })?; - info!( - "[OpenbaoSetup] JWT role '{}' created (policies: {})", - jwt.role_name, policies - ); + Ok(()) + } + + async fn create_entities( + &self, + k8s: &harmony_k8s::K8sClient, + root_token: &str, + ) -> Result<(), InterpretError> { + for entity in &self.score.entities { + let policies = entity.policies.join(","); + self.bao( + k8s, + root_token, + &[ + "bao", + "write", + &format!("identity/entity/name/{}", entity.name), + &format!("policies={}", policies), + ], + ) + .await + .map_err(|e| { + InterpretError::new(format!("Failed to create entity '{}': {e}", entity.name)) + })?; + + let output = self + .bao( + k8s, + root_token, + &[ + "bao", + "read", + "-format=json", + &format!("identity/entity/name/{}", entity.name), + ], + ) + .await + .map_err(|e| { + InterpretError::new(format!("Failed to read entity '{}': {e}", entity.name)) + })?; + + let id = serde_json::from_str::>(&output) + .map_err(|e| { + InterpretError::new(format!( + "Failed to parse entity '{}' read output: {e}", + entity.name + )) + })? + .data + .id; + + let auth_list = self + .bao(k8s, root_token, &["bao", "auth", "list", "-format=json"]) + .await?; + + let mounts = + serde_json::from_str::>(&auth_list) + .map_err(|e| InterpretError::new(format!("Failed to parse auth list: {e}")))?; + + let accessor = mounts + .get("jwt/") + .ok_or_else(|| InterpretError::new("JWT auth mount not found".to_string()))? + .accessor + .clone(); + + let _ = self + .bao( + k8s, + root_token, + &[ + "bao", + "write", + "identity/entity-alias", + &format!("name={}", entity.alias_name), + &format!("mount_accessor={accessor}"), + &format!("canonical_id={id}"), + ], + ) + .await + .map_err(|e| { + InterpretError::new(format!( + "Failed to create alias for entity '{}': {e}", + entity.name + )) + })?; + } Ok(()) } } @@ -549,6 +741,7 @@ impl Interpret for OpenbaoSetupInterpret { self.apply_policies(&k8s, &root_token).await?; self.create_users(&k8s, &root_token).await?; self.configure_jwt(&k8s, &root_token).await?; + self.create_entities(&k8s, &root_token).await?; let mut details = vec![ format!("root_token={}", root_token), diff --git a/harmony_secret/src/store/openbao.rs b/harmony_secret/src/store/openbao.rs index 0f41eec5..ccddf77d 100644 --- a/harmony_secret/src/store/openbao.rs +++ b/harmony_secret/src/store/openbao.rs @@ -1,9 +1,8 @@ use crate::{SecretStore, SecretStoreError}; use async_trait::async_trait; -use harmony_zitadel_auth::ZitadelJwtBearer; +use harmony_zitadel_auth::{MachineKeyFile, ZitadelJwtBearer}; use log::{debug, info, warn}; use serde::{Deserialize, Serialize}; -use std::collections::HashSet; use std::fmt::Debug; use std::fs; use std::path::PathBuf; @@ -34,7 +33,7 @@ impl From for AuthInfo { } pub struct OpenbaoSecretStore { - inner: Mutex, + client: Mutex, kv_mount: String, auth_mount: String, jwt_bearer: Option>, @@ -44,11 +43,6 @@ pub struct OpenbaoSecretStore { skip_tls: bool, } -struct Inner { - client: VaultClient, - scope: HashSet, -} - impl Debug for OpenbaoSecretStore { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { f.debug_struct("OpenbaoSecretStore") @@ -158,7 +152,38 @@ impl OpenbaoSecretStore { warn!("OPENBAO_STORE: Cached token is invalid or expired"); } - // 3. Try Zitadel OIDC device flow if configured + // 3. Try Zitadel JWT-bearer for headless devices if configured + if let Some(jwt_bearer_config) = &options.zitadel_jwt_bearer { + info!("OPENBAO_STORE: Attempting Zitadel JWT-bearer authentication"); + match Self::zitadel_jwt_bearer(jwt_bearer_config, options.skip_tls) { + Ok(jwt_bearer) => { + let store = Self::with_token_and_jwt( + &options.base_url, + options.skip_tls, + "", + &options.kv_mount, + &options.auth_mount, + Some(Arc::new(jwt_bearer)), + options.jwt_role.clone(), + options.jwt_auth_mount.clone(), + )?; + match store.refresh_auth().await { + Ok(()) => { + info!("OPENBAO_STORE: Zitadel JWT-bearer authentication successful"); + return Ok(store); + } + Err(e) => warn!( + "OPENBAO_STORE: Zitadel JWT-bearer failed: {e}, trying other auth methods" + ), + } + } + Err(e) => warn!( + "OPENBAO_STORE: Zitadel JWT-bearer setup failed: {e}, trying other auth methods" + ), + } + } + + // 4. Try Zitadel OIDC device flow if configured if let (Some(sso_url), Some(client_id)) = (&options.zitadel_sso_url, &options.zitadel_client_id) { @@ -198,7 +223,7 @@ impl OpenbaoSecretStore { } } - // 4. Authenticate with username/password + // 5. Authenticate with username/password let (user, pass) = match (&options.username, &options.password) { (Some(u), Some(p)) => (u, p), _ => { @@ -256,14 +281,112 @@ impl OpenbaoSecretStore { .map_err(|e| SecretStoreError::Store(e.into())) } - /// Create a client with an existing token - fn with_token( + pub async fn refresh_auth(&self) -> Result<(), SecretStoreError> { + let bearer = self.jwt_bearer.as_ref().ok_or_else(|| { + SecretStoreError::Store("Zitadel JWT-bearer auth is not configured".into()) + })?; + let role = self + .jwt_role + .as_deref() + .ok_or_else(|| SecretStoreError::Store("OpenBao JWT role is not configured".into()))?; + let jwt_auth_mount = self.jwt_auth_mount.as_deref().unwrap_or("jwt"); + + bearer.invalidate_cache(); + let jwt = bearer + .bearer_token() + .await + .map_err(|e| SecretStoreError::Store(e.into()))?; + let token = + Self::jwt_login(&self.base_url, self.skip_tls, jwt_auth_mount, role, &jwt).await?; + let client = Self::build_vault_client(&self.base_url, self.skip_tls, &token)?; + + *self.client.lock().await = client; + Ok(()) + } + + fn zitadel_jwt_bearer( + config: &ZitadelJwtBearerConfig, + danger_accept_invalid_certs: bool, + ) -> Result { + let key = match config.key_json.as_deref().map(str::trim) { + Some(raw) if !raw.is_empty() => serde_json::from_str::(raw), + _ => { + let path = config.key_path.as_ref().ok_or_else(|| { + SecretStoreError::Store( + "Zitadel JWT-bearer requires key_json or key_path".into(), + ) + })?; + let raw = + fs::read_to_string(path).map_err(|e| SecretStoreError::Store(e.into()))?; + serde_json::from_str::(&raw) + } + } + .map_err(|e| SecretStoreError::Store(e.into()))?; + + ZitadelJwtBearer::new( + key, + config.oidc_issuer_url.clone(), + config.audience.clone(), + danger_accept_invalid_certs, + ) + .map_err(|e| SecretStoreError::Store(e.into())) + } + + async fn jwt_login( + base_url: &str, + skip_tls: bool, + jwt_auth_mount: &str, + role: &str, + jwt: &str, + ) -> Result { + let mut builder = reqwest::Client::builder(); + if skip_tls { + builder = builder.danger_accept_invalid_certs(true); + } + let client = builder + .build() + .map_err(|e| SecretStoreError::Store(e.into()))?; + let url = format!( + "{}/v1/auth/{}/login", + base_url.trim_end_matches('/'), + jwt_auth_mount.trim_matches('/') + ); + let response = client + .post(&url) + .json(&serde_json::json!({ "role": role, "jwt": jwt })) + .send() + .await + .map_err(|e| SecretStoreError::Store(e.into()))?; + let status = response.status(); + let body = response + .text() + .await + .map_err(|e| SecretStoreError::Store(e.into()))?; + if !status.is_success() { + return Err(SecretStoreError::Store( + format!("OpenBao JWT login returned {status}: {body}").into(), + )); + } + + let parsed: serde_json::Value = + serde_json::from_str(&body).map_err(|e| SecretStoreError::Store(e.into()))?; + parsed + .get("auth") + .and_then(|auth| auth.get("client_token")) + .and_then(|token| token.as_str()) + .map(str::to_string) + .ok_or_else(|| { + SecretStoreError::Store( + format!("OpenBao JWT login response missing auth.client_token: {body}").into(), + ) + }) + } + + fn build_vault_client( base_url: &str, skip_tls: bool, token: &str, - kv_mount: &str, - auth_mount: &str, - ) -> Result { + ) -> Result { let mut settings = VaultClientSettingsBuilder::default(); settings.address(base_url).token(token); @@ -272,27 +395,48 @@ impl OpenbaoSecretStore { settings.verify(false); } - let client = VaultClient::new( + VaultClient::new( settings .build() .map_err(|e| SecretStoreError::Store(Box::new(e)))?, ) - .map_err(|e| SecretStoreError::Store(Box::new(e)))?; + .map_err(|e| SecretStoreError::Store(Box::new(e))) + } - let inner = Mutex::new(Inner { - client, - scope: HashSet::new(), - }); + /// Create a client with an existing token + fn with_token( + base_url: &str, + skip_tls: bool, + token: &str, + kv_mount: &str, + auth_mount: &str, + ) -> Result { + Self::with_token_and_jwt( + base_url, skip_tls, token, kv_mount, auth_mount, None, None, None, + ) + } + + fn with_token_and_jwt( + base_url: &str, + skip_tls: bool, + token: &str, + kv_mount: &str, + auth_mount: &str, + jwt_bearer: Option>, + jwt_role: Option, + jwt_auth_mount: Option, + ) -> Result { + let client = Self::build_vault_client(base_url, skip_tls, token)?; Ok(Self { - inner, + client: Mutex::new(client), kv_mount: kv_mount.to_string(), auth_mount: auth_mount.to_string(), base_url: base_url.to_string(), skip_tls, - jwt_bearer: None, - jwt_role: None, - jwt_auth_mount: None, + jwt_bearer, + jwt_role, + jwt_auth_mount, }) } @@ -471,19 +615,20 @@ impl SecretStore for OpenbaoSecretStore { info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'"); debug!("OPENBAO_STORE: Request path: {path}"); - let inner = self.inner.lock().await; - let data: serde_json::Value = kv2::read(&inner.client, &self.kv_mount, &path) - .await - .map_err(|e| { - if e.to_string().contains("does not exist") || e.to_string().contains("404") { - SecretStoreError::NotFound { - namespace: namespace.to_string(), - key: key.to_string(), + let client = self.client.lock().await; + let data: serde_json::Value = + kv2::read(&*client, &self.kv_mount, &path) + .await + .map_err(|e| { + if e.to_string().contains("does not exist") || e.to_string().contains("404") { + SecretStoreError::NotFound { + namespace: namespace.to_string(), + key: key.to_string(), + } + } else { + SecretStoreError::Store(Box::new(e)) } - } else { - SecretStoreError::Store(Box::new(e)) - } - })?; + })?; let value = data.get("value").and_then(|v| v.as_str()).ok_or_else(|| { SecretStoreError::Store("Secret does not contain expected 'value' field".into()) @@ -508,9 +653,9 @@ impl SecretStore for OpenbaoSecretStore { let data = serde_json::json!({ "value": value_str }); - let inner = self.inner.lock().await; + let client = self.client.lock().await; - kv2::set(&inner.client, &self.kv_mount, &path, &data) + kv2::set(&*client, &self.kv_mount, &path, &data) .await .map_err(|e| SecretStoreError::Store(Box::new(e)))?; -- 2.39.5 From d453d2c6be7b0b10ec62c9aa93a7df6eef205581 Mon Sep 17 00:00:00 2001 From: Reda Tarzalt Date: Sun, 7 Jun 2026 12:51:44 -0400 Subject: [PATCH 7/7] add new files --- .../harmony-fleet-e2e/tests/openbao_policy.rs | 1 - .../src/fleet_aggregator.rs | 156 +++++++++++--- fleet/harmony-fleet-operator/src/lib.rs | 1 - fleet/harmony-fleet-operator/src/main.rs | 14 +- fleet/harmony-fleet-operator/src/openbao.rs | 41 ---- .../tests/openbao_identity.rs | 169 --------------- harmony/src/modules/fleet/mod.rs | 1 + harmony/src/modules/fleet/secret_access.rs | 140 ++++++++++++ harmony/src/modules/openbao/identity.rs | 201 ++++++++++++++++++ harmony/src/modules/openbao/mod.rs | 7 +- harmony/src/modules/openbao/setup.rs | 200 ----------------- 11 files changed, 480 insertions(+), 451 deletions(-) delete mode 100644 fleet/harmony-fleet-operator/src/openbao.rs delete mode 100644 fleet/harmony-fleet-operator/tests/openbao_identity.rs create mode 100644 harmony/src/modules/fleet/secret_access.rs create mode 100644 harmony/src/modules/openbao/identity.rs diff --git a/fleet/harmony-fleet-e2e/tests/openbao_policy.rs b/fleet/harmony-fleet-e2e/tests/openbao_policy.rs index ff64a53d..e9eeeaee 100644 --- a/fleet/harmony-fleet-e2e/tests/openbao_policy.rs +++ b/fleet/harmony-fleet-e2e/tests/openbao_policy.rs @@ -87,7 +87,6 @@ path "secret/metadata/{SECRET_PATH}" {{ capabilities = ["read"] }}"# name: POLICY.to_string(), hcl: policy_hcl, }], - entities: vec![], users: vec![OpenbaoUser { username: USER.to_string(), password: PASSWORD.to_string(), diff --git a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs index 0ed38e26..ac202f94 100644 --- a/fleet/harmony-fleet-operator/src/fleet_aggregator.rs +++ b/fleet/harmony-fleet-operator/src/fleet_aggregator.rs @@ -36,10 +36,9 @@ use tokio::sync::Mutex; use harmony::modules::fleet::operator::{ AggregateLastError, Deployment, DeploymentAggregate, Device, }; +use harmony::modules::fleet::secret_access::{DeviceId, DeviceSecretAccess}; use tracing::warn; -use crate::openbao::OpenBaoIdentityClient; - const PATCH_TICK: Duration = Duration::from_secs(1); // --------------------------------------------------------------------------- @@ -154,7 +153,7 @@ fn matched_devices( pub async fn run( client: Client, js: async_nats::jetstream::Context, - openbao: Option>, + secret_access: Option>, ) -> anyhow::Result<()> { let state_bucket = js .create_key_value(async_nats::jetstream::kv::Config { @@ -203,8 +202,11 @@ pub async fn run( let deployment_watcher_handle = { let state = state.clone(); let desired = desired_bucket.clone(); + let access = secret_access.clone(); tokio::spawn(async move { - if let Err(e) = run_deployment_watcher(deployments_api.clone(), state, desired).await { + if let Err(e) = + run_deployment_watcher(deployments_api.clone(), state, desired, access).await + { tracing::warn!(error = %e, "aggregator: deployment watcher exited"); } }) @@ -213,9 +215,9 @@ pub async fn run( let device_watcher_handle = { let state = state.clone(); let desired = desired_bucket.clone(); - let openbao = openbao.clone(); + let access = secret_access.clone(); tokio::spawn(async move { - if let Err(e) = run_device_watcher(devices_api, state, desired, openbao).await { + if let Err(e) = run_device_watcher(devices_api, state, desired, access).await { tracing::warn!(error = %e, "aggregator: device watcher exited"); } }) @@ -288,18 +290,44 @@ async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow: Ok(()) } -/// Returns the full set of OpenBao policies for `device_id` given the -/// current `owned_targets`. One policy per matched deployment, named -/// `deployment-` to match the HCL policies created by OpenbaoSetupScore. -fn device_deployment_policies(state: &FleetState, device_id: &str) -> Vec { +/// Deployments whose secrets `device_id` may currently read — the set it +/// is a target of. Mapping a deployment onto a backend's access +/// primitive (an OpenBao policy, …) lives in the `DeviceSecretAccess` +/// impl, not here. +fn device_deployments(state: &FleetState, device_id: &str) -> Vec { state .owned_targets .iter() .filter(|(_, devices)| devices.contains(device_id)) - .map(|(dn, _)| format!("deployment-{}", dn.as_str())) + .map(|(dn, _)| dn.clone()) .collect() } +/// Push the current grant set for `devices` to the secret-access backend +/// in one batch. Non-fatal: a backend hiccup logs and is retried on the +/// next membership change rather than tearing down reconciliation. +/// Skipped entirely when no backend is configured (dev without OpenBao). +async fn sync_secret_access( + state: &SharedFleetState, + access: Option<&Arc>, + devices: impl IntoIterator, +) { + let Some(access) = access else { return }; + let grants: Vec<(DeviceId, Vec)> = { + let guard = state.lock().await; + devices + .into_iter() + .map(|d| (DeviceId::from(d.clone()), device_deployments(&guard, &d))) + .collect() + }; + if grants.is_empty() { + return; + } + if let Err(e) = access.set_device_grants(&grants).await { + warn!(error = %e, "aggregator: secret access sync failed"); + } +} + /// Record a device's latest state, dedup against older timestamps, /// maintain last_error, mark the deployment dirty. pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) { @@ -378,15 +406,16 @@ async fn run_deployment_watcher( api: Api, state: SharedFleetState, desired: Store, + access: Option>, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { match event { Event::Apply(cr) | Event::InitApply(cr) => { - on_deployment_upsert(&state, &desired, cr).await; + on_deployment_upsert(&state, &desired, access.as_ref(), cr).await; } Event::Delete(cr) => { - on_deployment_delete(&state, &desired, cr).await; + on_deployment_delete(&state, &desired, access.as_ref(), cr).await; } Event::Init | Event::InitDone => {} } @@ -394,7 +423,12 @@ async fn run_deployment_watcher( Ok(()) } -async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) { +async fn on_deployment_upsert( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + cr: Deployment, +) { let Some(key) = DeploymentKey::from_cr(&cr) else { return; }; @@ -442,9 +476,19 @@ async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Dep &score_json, ) .await; + + // Membership change touches the symmetric difference: devices that + // gained or lost this deployment need their grant set recomputed. + let affected: HashSet = new_targets.union(&previous_targets).cloned().collect(); + sync_secret_access(state, access, affected).await; } -async fn on_deployment_delete(state: &SharedFleetState, desired: &Store, cr: Deployment) { +async fn on_deployment_delete( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + cr: Deployment, +) { let Some(key) = DeploymentKey::from_cr(&cr) else { return; }; @@ -472,6 +516,10 @@ async fn on_deployment_delete(state: &SharedFleetState, desired: &Store, cr: Dep tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on CR delete failed"); } } + + // The deployment is gone from owned_targets, so each formerly-owned + // device recomputes to a grant set without it. + sync_secret_access(state, access, previous).await; } // --------------------------------------------------------------------------- @@ -486,16 +534,16 @@ async fn run_device_watcher( api: Api, state: SharedFleetState, desired: Store, - openbao: Option>, + access: Option>, ) -> anyhow::Result<()> { let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed(); while let Some(event) = stream.try_next().await? { match event { Event::Apply(dev) | Event::InitApply(dev) => { - on_device_upsert(&state, &desired, openbao.as_ref(), dev).await; + on_device_upsert(&state, &desired, access.as_ref(), dev).await; } Event::Delete(dev) => { - on_device_delete(&state, &desired, dev).await; + on_device_delete(&state, &desired, access.as_ref(), dev).await; } Event::Init | Event::InitDone => {} } @@ -506,7 +554,7 @@ async fn run_device_watcher( async fn on_device_upsert( state: &SharedFleetState, desired: &Store, - openbao: Option<&Arc>, + access: Option<&Arc>, dev: Device, ) { let name = dev.name_any(); @@ -573,18 +621,15 @@ async fn on_device_upsert( } } - if let Some(bao) = &openbao { - let policies = { - let guard = state.lock().await; - device_deployment_policies(&guard, &name) - }; - if let Err(e) = bao.set_entity_policies(&name, &policies).await { - tracing::warn!(device = %name, error = %e, "aggregator: OpenBao entity policy sync failed"); - } - } + sync_secret_access(state, access, std::iter::once(name)).await; } -async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device) { +async fn on_device_delete( + state: &SharedFleetState, + desired: &Store, + access: Option<&Arc>, + dev: Device, +) { let name = dev.name_any(); let removed_from: Vec = { let mut guard = state.lock().await; @@ -608,6 +653,10 @@ async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on device delete failed"); } } + + // Device is gone from every owned_targets set, so its grant set + // recomputes to empty — clearing all secret access. + sync_secret_access(state, access, std::iter::once(name)).await; } // --------------------------------------------------------------------------- @@ -852,6 +901,55 @@ mod tests { assert_eq!(agg.pending, 1); } + fn devs(ids: &[&str]) -> HashSet { + ids.iter().map(|s| s.to_string()).collect() + } + + #[tokio::test] + async fn sync_pushes_full_deployment_set_per_device() { + use harmony::modules::fleet::secret_access::InMemoryDeviceSecretAccess; + + let mut s = FleetState::default(); + s.owned_targets.insert(dn("web"), devs(&["pi-01", "pi-02"])); + s.owned_targets.insert(dn("db"), devs(&["pi-01"])); + let state: SharedFleetState = Arc::new(Mutex::new(s)); + + let inmem = Arc::new(InMemoryDeviceSecretAccess::new()); + let access: Arc = inmem.clone(); + + sync_secret_access( + &state, + Some(&access), + ["pi-01".to_string(), "pi-02".to_string()], + ) + .await; + + // pi-01 runs both deployments; the impl receives the full set. + assert_eq!( + inmem.grants_for(&DeviceId::from("pi-01")), + vec![dn("db"), dn("web")] + ); + assert_eq!(inmem.grants_for(&DeviceId::from("pi-02")), vec![dn("web")]); + } + + #[tokio::test] + async fn sync_clears_access_for_unmatched_device() { + use harmony::modules::fleet::secret_access::InMemoryDeviceSecretAccess; + + let inmem = Arc::new(InMemoryDeviceSecretAccess::new()); + let access: Arc = inmem.clone(); + access + .set_device_grants(&[(DeviceId::from("pi-01"), vec![dn("web")])]) + .await + .unwrap(); + + // owned_targets no longer contains pi-01 (device deleted / no match). + let state: SharedFleetState = Arc::new(Mutex::new(FleetState::default())); + sync_secret_access(&state, Some(&access), std::iter::once("pi-01".to_string())).await; + + assert!(inmem.grants_for(&DeviceId::from("pi-01")).is_empty()); + } + #[test] fn matched_devices_picks_by_label() { let mut ml = BTreeMap::new(); diff --git a/fleet/harmony-fleet-operator/src/lib.rs b/fleet/harmony-fleet-operator/src/lib.rs index c7cf40ca..8c293a4e 100644 --- a/fleet/harmony-fleet-operator/src/lib.rs +++ b/fleet/harmony-fleet-operator/src/lib.rs @@ -15,4 +15,3 @@ pub mod commands; pub mod device_reconciler; pub mod fleet_aggregator; -pub mod openbao; diff --git a/fleet/harmony-fleet-operator/src/main.rs b/fleet/harmony-fleet-operator/src/main.rs index 18abf54d..edd74f17 100644 --- a/fleet/harmony-fleet-operator/src/main.rs +++ b/fleet/harmony-fleet-operator/src/main.rs @@ -3,7 +3,9 @@ mod controller; mod frontend; mod service; -use harmony_fleet_operator::{device_reconciler, fleet_aggregator, openbao::OpenBaoIdentityClient}; +use harmony::modules::fleet::secret_access::DeviceSecretAccess; +use harmony::modules::openbao::OpenBaoDeviceSecretAccess; +use harmony_fleet_operator::{device_reconciler, fleet_aggregator}; use anyhow::{Context, Result}; use async_nats::jetstream; @@ -187,16 +189,16 @@ async fn run( tracing::info!(bucket = %bucket, "KV bucket ready"); let client = Client::try_default().await?; - let openbao = match (openbao_url, openbao_token) { + let secret_access: Option> = match (openbao_url, openbao_token) { (Some(url), Some(token)) => { - tracing::info!(url = %url, "OpenBao identity sync enabled"); - Some(Arc::new(OpenBaoIdentityClient::new( + tracing::info!(url = %url, "OpenBao secret-access sync enabled"); + Some(Arc::new(OpenBaoDeviceSecretAccess::new( url.clone(), token.clone(), ))) } _ => { - tracing::warn!("OPENBAO_URL or OPENBAO_TOKEN not set; entity policy sync disabled"); + tracing::warn!("OPENBAO_URL or OPENBAO_TOKEN not set; secret-access sync disabled"); None } }; @@ -214,7 +216,7 @@ async fn run( tokio::select! { r = controller::run(ctl_client, desired_state_kv) => r, r = device_reconciler::run(dr_client, dr_js) => r, - r = fleet_aggregator::run(client, js, openbao) => r, + r = fleet_aggregator::run(client, js, secret_access) => r, } } diff --git a/fleet/harmony-fleet-operator/src/openbao.rs b/fleet/harmony-fleet-operator/src/openbao.rs deleted file mode 100644 index f106c67c..00000000 --- a/fleet/harmony-fleet-operator/src/openbao.rs +++ /dev/null @@ -1,41 +0,0 @@ -use anyhow::Result; - -pub struct OpenBaoIdentityClient { - client: reqwest::Client, - base_url: String, - token: String, -} - -impl OpenBaoIdentityClient { - pub fn new(base_url: String, token: String) -> Self { - Self { - client: reqwest::Client::new(), - base_url, - token, - } - } - - /// Idempotent upsert: sets the entity's policy list to exactly `policies`. - /// Passing an empty slice clears all policies (device loses all secret access). - pub async fn set_entity_policies(&self, entity: &str, policies: &[String]) -> Result<()> { - let url = format!( - "{}/v1/identity/entity/name/{}", - self.base_url.trim_end_matches('/'), - entity - ); - let body = serde_json::json!({ "policies": policies }); - let resp = self - .client - .post(&url) - .header("X-Vault-Token", &self.token) - .json(&body) - .send() - .await?; - let status = resp.status(); - if !status.is_success() { - let text = resp.text().await.unwrap_or_default(); - anyhow::bail!("OpenBao entity update {entity} returned {status}: {text}"); - } - Ok(()) - } -} diff --git a/fleet/harmony-fleet-operator/tests/openbao_identity.rs b/fleet/harmony-fleet-operator/tests/openbao_identity.rs deleted file mode 100644 index 5d3e9c7d..00000000 --- a/fleet/harmony-fleet-operator/tests/openbao_identity.rs +++ /dev/null @@ -1,169 +0,0 @@ -use harmony_fleet_operator::openbao::OpenBaoIdentityClient; -use serde::Deserialize; - -#[derive(Deserialize)] -struct EntityLookup { - data: EntityLookupData, -} - -#[derive(Deserialize)] -struct EntityLookupData { - entity_id: String, -} - -#[tokio::test] -async fn openbao_identity_client_sets_entity_policies() -> anyhow::Result<()> { - let Some(admin) = openbao_admin_from_env() else { - skip_openbao(); - return Ok(()); - }; - - let entity = format!("harmony-test-{}", unique_suffix()); - let policies = vec!["harmony-test-policy-a".to_string()]; - - OpenBaoIdentityClient::new(admin.base_url.clone(), admin.token.clone()) - .set_entity_policies(&entity, &policies) - .await?; - - let body = admin - .client - .get(format!( - "{}/v1/identity/entity/name/{}", - admin.base_url, entity - )) - .header("X-Vault-Token", &admin.token) - .send() - .await? - .error_for_status()? - .json::() - .await?; - - assert_eq!(body["data"]["policies"], serde_json::json!(policies)); - Ok(()) -} - -#[tokio::test] -async fn openbao_entity_policy_grants_existing_token_access() -> anyhow::Result<()> { - let Some(admin) = openbao_admin_from_env() else { - skip_openbao(); - return Ok(()); - }; - let Some(device) = openbao_device_from_env() else { - skip_openbao_device(); - return Ok(()); - }; - - let entity_id = lookup_entity_id(&admin.base_url, &device.token).await?; - - set_entity_id_policies(&admin, &entity_id, &[]).await?; - assert_secret_denied(&admin.base_url, &device.token, &device.secret_api_path).await?; - - set_entity_id_policies(&admin, &entity_id, std::slice::from_ref(&device.policy)).await?; - assert_secret_allowed(&admin.base_url, &device.token, &device.secret_api_path).await?; - - set_entity_id_policies(&admin, &entity_id, &[]).await?; - Ok(()) -} - -struct OpenBaoAdmin { - client: reqwest::Client, - base_url: String, - token: String, -} - -struct OpenBaoDevice { - token: String, - policy: String, - secret_api_path: String, -} - -fn openbao_admin_from_env() -> Option { - Some(OpenBaoAdmin { - client: reqwest::Client::new(), - base_url: std::env::var("OPENBAO_URL") - .ok()? - .trim_end_matches('/') - .to_string(), - token: std::env::var("OPENBAO_TOKEN").ok()?, - }) -} - -fn openbao_device_from_env() -> Option { - Some(OpenBaoDevice { - token: std::env::var("OPENBAO_TEST_DEVICE_TOKEN").ok()?, - policy: std::env::var("OPENBAO_TEST_POLICY").ok()?, - secret_api_path: std::env::var("OPENBAO_TEST_SECRET_API_PATH").ok()?, - }) -} - -async fn lookup_entity_id(base_url: &str, token: &str) -> anyhow::Result { - let lookup = reqwest::Client::new() - .get(format!("{base_url}/v1/auth/token/lookup-self")) - .header("X-Vault-Token", token) - .send() - .await? - .error_for_status()? - .json::() - .await?; - Ok(lookup.data.entity_id) -} - -async fn set_entity_id_policies( - admin: &OpenBaoAdmin, - entity_id: &str, - policies: &[String], -) -> anyhow::Result<()> { - admin - .client - .post(format!( - "{}/v1/identity/entity/id/{}", - admin.base_url, entity_id - )) - .header("X-Vault-Token", &admin.token) - .json(&serde_json::json!({ "policies": policies })) - .send() - .await? - .error_for_status()?; - Ok(()) -} - -async fn assert_secret_denied(base_url: &str, token: &str, path: &str) -> anyhow::Result<()> { - let status = reqwest::Client::new() - .get(format!("{base_url}/v1/{path}")) - .header("X-Vault-Token", token) - .send() - .await? - .status(); - assert!( - status == reqwest::StatusCode::FORBIDDEN || status == reqwest::StatusCode::NOT_FOUND, - "expected secret read denial, got {status}" - ); - Ok(()) -} - -async fn assert_secret_allowed(base_url: &str, token: &str, path: &str) -> anyhow::Result<()> { - reqwest::Client::new() - .get(format!("{base_url}/v1/{path}")) - .header("X-Vault-Token", token) - .send() - .await? - .error_for_status()?; - Ok(()) -} - -fn unique_suffix() -> u128 { - std::time::SystemTime::now() - .duration_since(std::time::UNIX_EPOCH) - .expect("system time before UNIX_EPOCH") - .as_millis() -} - -fn skip_openbao() { - eprintln!("skipping OpenBao integration test; set OPENBAO_URL and OPENBAO_TOKEN to run"); -} - -fn skip_openbao_device() { - eprintln!( - "skipping OpenBao token grant test; set OPENBAO_TEST_DEVICE_TOKEN, OPENBAO_TEST_POLICY, and OPENBAO_TEST_SECRET_API_PATH to run" - ); -} diff --git a/harmony/src/modules/fleet/mod.rs b/harmony/src/modules/fleet/mod.rs index e7c830b2..71a99e75 100644 --- a/harmony/src/modules/fleet/mod.rs +++ b/harmony/src/modules/fleet/mod.rs @@ -23,6 +23,7 @@ pub mod assets; pub mod libvirt_pool; pub mod operator; pub mod preflight; +pub mod secret_access; mod setup_score; #[cfg(all(feature = "kvm", unix))] mod vm_score; diff --git a/harmony/src/modules/fleet/secret_access.rs b/harmony/src/modules/fleet/secret_access.rs new file mode 100644 index 00000000..a470cc13 --- /dev/null +++ b/harmony/src/modules/fleet/secret_access.rs @@ -0,0 +1,140 @@ +//! Device → secret access control: the domain boundary the fleet +//! operator drives when device membership changes. +//! +//! A device may read the secrets of the deployments it currently runs. +//! How a "deployment" maps onto a backend's access primitive (an OpenBao +//! policy, a Vault role, …) is the *implementation's* concern; callers +//! speak only devices and deployments. That separation is what lets the +//! operator be unit-tested against [`InMemoryDeviceSecretAccess`] with no +//! live secret store — the production backend is +//! [`crate::modules::openbao::OpenBaoDeviceSecretAccess`]. + +use std::collections::{BTreeMap, BTreeSet}; +use std::sync::Mutex; + +use async_trait::async_trait; +use harmony_reconciler_contracts::DeploymentName; +use harmony_types::id::Id; + +/// Identifies a fleet device — the same `Id` the wire contracts carry as +/// `device_id`. +pub type DeviceId = Id; + +#[derive(Debug, thiserror::Error)] +pub enum SecretAccessError { + #[error("secret access backend error: {0}")] + Backend(String), +} + +/// Sets which deployments' secrets a device may read. +/// +/// One selector or deployment change can touch many devices at once, so +/// the operation is batched; an implementation chunks large batches as +/// its backend requires (you can neither make a million calls nor send a +/// million-element body at once). +#[async_trait] +pub trait DeviceSecretAccess: Send + Sync { + /// Set the *complete* set of deployments each listed device may read + /// secrets for. Absolute and idempotent: afterwards a device can + /// access exactly the deployments given — extras are revoked, an + /// empty list clears all access. Devices not listed are untouched. + async fn set_device_grants( + &self, + grants: &[(DeviceId, Vec)], + ) -> Result<(), SecretAccessError>; +} + +/// In-memory [`DeviceSecretAccess`] for tests: records the latest grant +/// set per device so a test can assert the operator synced the right +/// access on each flow, with nothing deployed. +#[derive(Debug, Default)] +pub struct InMemoryDeviceSecretAccess { + grants: Mutex>>, +} + +impl InMemoryDeviceSecretAccess { + pub fn new() -> Self { + Self::default() + } + + /// Current grants for `device`, sorted. Empty when the device has no + /// access (never granted, or cleared). + pub fn grants_for(&self, device: &DeviceId) -> Vec { + self.grants + .lock() + .unwrap() + .get(device) + .map(|s| s.iter().cloned().collect()) + .unwrap_or_default() + } +} + +#[async_trait] +impl DeviceSecretAccess for InMemoryDeviceSecretAccess { + async fn set_device_grants( + &self, + grants: &[(DeviceId, Vec)], + ) -> Result<(), SecretAccessError> { + let mut store = self.grants.lock().unwrap(); + for (device, deployments) in grants { + if deployments.is_empty() { + store.remove(device); + } else { + store.insert(device.clone(), deployments.iter().cloned().collect()); + } + } + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + fn dn(s: &str) -> DeploymentName { + DeploymentName::try_new(s).expect("valid test name") + } + + #[tokio::test] + async fn sets_overwrites_and_clears_grants() { + let access = InMemoryDeviceSecretAccess::new(); + let dev = DeviceId::from("pi-01"); + + access + .set_device_grants(&[(dev.clone(), vec![dn("web"), dn("db")])]) + .await + .unwrap(); + assert_eq!(access.grants_for(&dev), vec![dn("db"), dn("web")]); + + // Absolute, not additive: a smaller set revokes the rest. + access + .set_device_grants(&[(dev.clone(), vec![dn("web")])]) + .await + .unwrap(); + assert_eq!(access.grants_for(&dev), vec![dn("web")]); + + // Empty clears all access. + access + .set_device_grants(&[(dev.clone(), vec![])]) + .await + .unwrap(); + assert!(access.grants_for(&dev).is_empty()); + } + + #[tokio::test] + async fn batch_touches_only_listed_devices() { + let access = InMemoryDeviceSecretAccess::new(); + let a = DeviceId::from("a"); + let b = DeviceId::from("b"); + access + .set_device_grants(&[(a.clone(), vec![dn("web")])]) + .await + .unwrap(); + access + .set_device_grants(&[(b.clone(), vec![dn("db")])]) + .await + .unwrap(); + assert_eq!(access.grants_for(&a), vec![dn("web")]); + assert_eq!(access.grants_for(&b), vec![dn("db")]); + } +} diff --git a/harmony/src/modules/openbao/identity.rs b/harmony/src/modules/openbao/identity.rs new file mode 100644 index 00000000..ff062179 --- /dev/null +++ b/harmony/src/modules/openbao/identity.rs @@ -0,0 +1,201 @@ +//! OpenBao-backed [`DeviceSecretAccess`]. +//! +//! A device's secret access *is* its OpenBao identity entity's policy +//! list. We map one deployment → one policy named `deployment-` +//! (created by [`super::OpenbaoSetupScore`]) and set the entity's +//! policies to exactly the device's current deployments. +//! +//! Why entity-by-policy and not a JWT claim: the device logs in once and +//! keeps its token; reassigning it only rewrites the entity's policy +//! list, which its existing token sees on the next call — no re-auth, no +//! thundering herd on bulk reassignment. +//! +//! Entity resolution is the subtle part. OpenBao's JWT auth +//! auto-provisions an entity (random name) + alias (name = the +//! `user_claim` value, i.e. the device id) on a device's first login, so +//! addressing an entity by a name *we* invent would write to a +//! different, alias-less entity the device never uses. We instead +//! resolve device id → entity via its alias, and create the +//! entity + alias ourselves when the device hasn't logged in yet (a +//! later login reuses the same alias). Both paths converge on writing +//! policies by canonical entity id. + +use async_trait::async_trait; +use reqwest::StatusCode; +use serde_json::json; +use tokio::sync::OnceCell; + +use crate::modules::fleet::secret_access::{DeviceId, DeviceSecretAccess, SecretAccessError}; +use harmony_reconciler_contracts::DeploymentName; + +const DEFAULT_JWT_MOUNT: &str = "jwt"; + +pub struct OpenBaoDeviceSecretAccess { + client: reqwest::Client, + base_url: String, + token: String, + jwt_mount: String, + /// JWT auth mount accessor, resolved once and reused — needed to bind + /// an entity alias to the device's login identity. + jwt_accessor: OnceCell, +} + +impl OpenBaoDeviceSecretAccess { + pub fn new(base_url: String, token: String) -> Self { + Self::with_jwt_mount(base_url, token, DEFAULT_JWT_MOUNT.to_string()) + } + + pub fn with_jwt_mount(base_url: String, token: String, jwt_mount: String) -> Self { + Self { + client: reqwest::Client::new(), + base_url: base_url.trim_end_matches('/').to_string(), + token, + jwt_mount, + jwt_accessor: OnceCell::new(), + } + } + + fn err(context: impl std::fmt::Display, e: impl std::fmt::Display) -> SecretAccessError { + SecretAccessError::Backend(format!("{context}: {e}")) + } + + async fn post( + &self, + path: &str, + body: serde_json::Value, + ) -> Result { + self.client + .post(format!("{}/v1/{path}", self.base_url)) + .header("X-Vault-Token", &self.token) + .json(&body) + .send() + .await + .map_err(|e| Self::err(format!("POST {path}"), e)) + } + + /// JWT mount accessor from `sys/auth`, cached for the client's life. + async fn jwt_accessor(&self) -> Result<&str, SecretAccessError> { + self.jwt_accessor + .get_or_try_init(|| async { + let body: serde_json::Value = self + .client + .get(format!("{}/v1/sys/auth", self.base_url)) + .header("X-Vault-Token", &self.token) + .send() + .await + .map_err(|e| Self::err("GET sys/auth", e))? + .error_for_status() + .map_err(|e| Self::err("GET sys/auth", e))? + .json() + .await + .map_err(|e| Self::err("parse sys/auth", e))?; + // sys/auth nests mounts under `data` over the HTTP API but + // emits them at the document root via the CLI; tolerate both. + let mount_key = format!("{}/", self.jwt_mount); + body.get("data") + .and_then(|d| d.get(&mount_key)) + .or_else(|| body.get(&mount_key)) + .and_then(|m| m.get("accessor")) + .and_then(|a| a.as_str()) + .map(str::to_string) + .ok_or_else(|| { + Self::err( + "resolve jwt accessor", + format!("mount '{mount_key}' not found in sys/auth"), + ) + }) + }) + .await + .map(String::as_str) + } + + /// Canonical entity id for `device`, resolved via its login alias. + /// Creates the entity + alias when the device hasn't logged in yet so + /// access can be granted ahead of first login; a later login reuses + /// the same alias. + async fn entity_id(&self, device: &str) -> Result { + let accessor = self.jwt_accessor().await?.to_string(); + + // Common path: the alias already exists (device logged in, or we + // created it on a previous sync). + let resp = self + .post( + "identity/lookup/entity", + json!({ "alias_name": device, "alias_mount_accessor": accessor }), + ) + .await?; + if resp.status().is_success() { + // A miss returns 204 with an empty body, not an error status. + let text = resp + .text() + .await + .map_err(|e| Self::err("read entity lookup", e))?; + if !text.is_empty() { + let body: serde_json::Value = + serde_json::from_str(&text).map_err(|e| Self::err("parse entity lookup", e))?; + if let Some(id) = body + .get("data") + .and_then(|d| d.get("id")) + .and_then(|v| v.as_str()) + { + return Ok(id.to_string()); + } + } + } + + // Miss — create the entity (named after the device) and bind the + // alias to its JWT login identity. + let body: serde_json::Value = self + .post("identity/entity", json!({ "name": device })) + .await? + .error_for_status() + .map_err(|e| Self::err("create entity", e))? + .json() + .await + .map_err(|e| Self::err("parse create entity", e))?; + let id = body + .get("data") + .and_then(|d| d.get("id")) + .and_then(|v| v.as_str()) + .ok_or_else(|| Self::err("create entity", "response missing data.id"))? + .to_string(); + + let alias = self + .post( + "identity/entity-alias", + json!({ "name": device, "canonical_id": id, "mount_accessor": accessor }), + ) + .await?; + // A concurrent login may have just auto-provisioned the alias; + // OpenBao answers 400 "alias already exists" — treat as success. + if !alias.status().is_success() && alias.status() != StatusCode::BAD_REQUEST { + return Err(Self::err("create entity alias", alias.status())); + } + Ok(id) + } +} + +#[async_trait] +impl DeviceSecretAccess for OpenBaoDeviceSecretAccess { + async fn set_device_grants( + &self, + grants: &[(DeviceId, Vec)], + ) -> Result<(), SecretAccessError> { + for (device, deployments) in grants { + let device = String::from(device.clone()); + let policies: Vec = deployments + .iter() + .map(|d| format!("deployment-{}", d.as_str())) + .collect(); + let id = self.entity_id(&device).await?; + self.post( + &format!("identity/entity/id/{id}"), + json!({ "policies": policies }), + ) + .await? + .error_for_status() + .map_err(|e| Self::err(format!("set policies for {device}"), e))?; + } + Ok(()) + } +} diff --git a/harmony/src/modules/openbao/mod.rs b/harmony/src/modules/openbao/mod.rs index 6fe4e2b9..67315ed9 100644 --- a/harmony/src/modules/openbao/mod.rs +++ b/harmony/src/modules/openbao/mod.rs @@ -1,3 +1,4 @@ +pub mod identity; pub mod setup; use std::str::FromStr; @@ -13,10 +14,8 @@ use crate::{ topology::{HelmCommand, K8sclient, Topology}, }; -pub use setup::{ - OpenbaoEntity, OpenbaoEntityPoliciesScore, OpenbaoJwtAuth, OpenbaoPolicy, OpenbaoSetupScore, - OpenbaoUser, cached_root_token, -}; +pub use identity::OpenBaoDeviceSecretAccess; +pub use setup::{OpenbaoJwtAuth, OpenbaoPolicy, OpenbaoSetupScore, OpenbaoUser, cached_root_token}; const DEFAULT_NAMESPACE: &str = "openbao"; const DEFAULT_RELEASE: &str = "openbao"; diff --git a/harmony/src/modules/openbao/setup.rs b/harmony/src/modules/openbao/setup.rs index b162d826..a4b8d40a 100644 --- a/harmony/src/modules/openbao/setup.rs +++ b/harmony/src/modules/openbao/setup.rs @@ -24,40 +24,6 @@ pub struct OpenbaoPolicy { pub hcl: String, } -#[derive(Debug, Clone, Serialize)] -pub struct OpenbaoEntity { - pub name: String, - pub alias_name: String, - pub policies: Vec, -} - -#[derive(Deserialize)] -struct AuthMount { - accessor: String, -} - -#[derive(Debug, Deserialize)] -struct BaoRead { - data: T, -} - -#[derive(Debug, Deserialize)] -struct EntityRead { - id: String, -} - -#[derive(Debug, Clone, Serialize)] -pub struct OpenbaoEntityPoliciesScore { - pub instance: OpenbaoInstance, - pub entity_name: String, - pub policies: Vec, -} - -#[derive(Debug, Clone)] -struct OpenbaoEntityPoliciesInterpret { - score: OpenbaoEntityPoliciesScore, -} - /// A userpass user to create in OpenBao. #[derive(Debug, Clone, Serialize)] pub struct OpenbaoUser { @@ -117,9 +83,6 @@ pub struct OpenbaoSetupScore { #[serde(default)] pub policies: Vec, - #[serde(default)] - pub entities: Vec, - /// Userpass users to create. #[serde(default)] pub users: Vec, @@ -139,25 +102,12 @@ impl Default for OpenbaoSetupScore { instance: OpenbaoInstance::default(), kv_mount: default_kv_mount(), policies: Vec::new(), - entities: Vec::new(), users: Vec::new(), jwt_auth: None, } } } -impl Score for OpenbaoEntityPoliciesScore { - fn name(&self) -> String { - "OpenbaoEntityPoliciesScore".to_string() - } - - fn create_interpret(&self) -> Box> { - Box::new(OpenbaoEntityPoliciesInterpret { - score: self.clone(), - }) - } -} - impl Score for OpenbaoSetupScore { fn name(&self) -> String { "OpenbaoSetupScore".to_string() @@ -214,69 +164,6 @@ pub fn cached_root_token(instance: &OpenbaoInstance) -> Result { Ok(init.root_token) } -#[async_trait] -impl Interpret for OpenbaoEntityPoliciesInterpret { - async fn execute( - &self, - _inventory: &Inventory, - topology: &T, - ) -> Result { - let k8s = topology - .k8s_client() - .await - .map_err(|e| InterpretError::new(format!("Failed to get K8s client: {e}")))?; - - let root_token = cached_root_token(&self.score.instance).map_err(InterpretError::new)?; - - let policies = self.score.policies.join(","); - - k8s.exec_pod_capture_output( - &self.score.instance.pod(), - Some(&self.score.instance.namespace), - vec![ - "sh", - "-c", - &format!( - "export VAULT_TOKEN={} && bao write identity/entity/name/{} policies={}", - root_token, self.score.entity_name, policies - ), - ], - ) - .await - .map_err(|e| { - InterpretError::new(format!( - "Failed to update OpenBao entity '{}' policies: {e}", - self.score.entity_name - )) - })?; - - Ok(Outcome { - status: InterpretStatus::SUCCESS, - message: format!( - "OpenBao entity '{}' policies updated", - self.score.entity_name - ), - details: vec![format!("policies={policies}")], - }) - } - - fn get_name(&self) -> InterpretName { - InterpretName::Custom("OpenbaoEntityPolicies") - } - - fn get_version(&self) -> Version { - todo!() - } - - fn get_status(&self) -> InterpretStatus { - todo!() - } - - fn get_children(&self) -> Vec { - vec![] - } -} - impl OpenbaoSetupInterpret { async fn exec( &self, @@ -616,92 +503,6 @@ impl OpenbaoSetupInterpret { Ok(()) } - - async fn create_entities( - &self, - k8s: &harmony_k8s::K8sClient, - root_token: &str, - ) -> Result<(), InterpretError> { - for entity in &self.score.entities { - let policies = entity.policies.join(","); - self.bao( - k8s, - root_token, - &[ - "bao", - "write", - &format!("identity/entity/name/{}", entity.name), - &format!("policies={}", policies), - ], - ) - .await - .map_err(|e| { - InterpretError::new(format!("Failed to create entity '{}': {e}", entity.name)) - })?; - - let output = self - .bao( - k8s, - root_token, - &[ - "bao", - "read", - "-format=json", - &format!("identity/entity/name/{}", entity.name), - ], - ) - .await - .map_err(|e| { - InterpretError::new(format!("Failed to read entity '{}': {e}", entity.name)) - })?; - - let id = serde_json::from_str::>(&output) - .map_err(|e| { - InterpretError::new(format!( - "Failed to parse entity '{}' read output: {e}", - entity.name - )) - })? - .data - .id; - - let auth_list = self - .bao(k8s, root_token, &["bao", "auth", "list", "-format=json"]) - .await?; - - let mounts = - serde_json::from_str::>(&auth_list) - .map_err(|e| InterpretError::new(format!("Failed to parse auth list: {e}")))?; - - let accessor = mounts - .get("jwt/") - .ok_or_else(|| InterpretError::new("JWT auth mount not found".to_string()))? - .accessor - .clone(); - - let _ = self - .bao( - k8s, - root_token, - &[ - "bao", - "write", - "identity/entity-alias", - &format!("name={}", entity.alias_name), - &format!("mount_accessor={accessor}"), - &format!("canonical_id={id}"), - ], - ) - .await - .map_err(|e| { - InterpretError::new(format!( - "Failed to create alias for entity '{}': {e}", - entity.name - )) - })?; - } - Ok(()) - } } #[async_trait] @@ -741,7 +542,6 @@ impl Interpret for OpenbaoSetupInterpret { self.apply_policies(&k8s, &root_token).await?; self.create_users(&k8s, &root_token).await?; self.configure_jwt(&k8s, &root_token).await?; - self.create_entities(&k8s, &root_token).await?; let mut details = vec![ format!("root_token={}", root_token), -- 2.39.5