diff --git a/examples/fleet_staging_install/src/main.rs b/examples/fleet_staging_install/src/main.rs index 01481f23..837c018e 100644 --- a/examples/fleet_staging_install/src/main.rs +++ b/examples/fleet_staging_install/src/main.rs @@ -138,6 +138,7 @@ async fn main() -> Result<()> { external_port: None, namespace: cli.zitadel_namespace.clone(), cluster_issuer: cli.cluster_issuer.clone(), + ..Default::default() }; log::info!( "[1/6] Zitadel helm: ns={} host={}", diff --git a/harmony-k8s/src/pod.rs b/harmony-k8s/src/pod.rs index 5866b538..c8f5b239 100644 --- a/harmony-k8s/src/pod.rs +++ b/harmony-k8s/src/pod.rs @@ -13,6 +13,24 @@ use tokio::time::sleep; use crate::client::K8sClient; +/// Result of executing a command in a pod. Carries both streams plus the +/// reported status (`"Success"` or `"Failure"`) so callers can react +/// without losing output — important when the executed command writes +/// structured payloads to stdout on non-zero exits (the Vault / OpenBao +/// CLIs do this with `-format=json`). +#[derive(Debug, Clone)] +pub struct ExecOutput { + pub status: String, + pub stdout: String, + pub stderr: String, +} + +impl ExecOutput { + pub fn is_success(&self) -> bool { + self.status == "Success" + } +} + impl K8sClient { pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result, Error> { let api: Api = match namespace { @@ -200,55 +218,76 @@ impl K8sClient { namespace: Option<&str>, command: Vec<&str>, ) -> Result { + // Convenience wrapper — preserved for backward compatibility: + // returns stdout on success, stderr on non-zero exit, and drops + // the other stream on each path. New callers should prefer + // `exec_pod_capture` which exposes both streams unconditionally, + // important when the failed command writes structured output to + // stdout (e.g. `bao status -format=json` exits 2 on a sealed or + // uninitialised vault but still emits its JSON payload). + let output = self.exec_pod_capture(pod_name, namespace, command).await?; + if output.is_success() { + Ok(output.stdout) + } else { + Err(output.stderr) + } + } + + /// Execute a command in a pod and capture both stdout and stderr + /// regardless of the process exit status. The outer `Result` is + /// reserved for transport / API failures (couldn't reach the pod, + /// stream read error). A non-zero exit is reflected in + /// `ExecOutput::status` and the caller decides how to react. + pub async fn exec_pod_capture( + &self, + pod_name: &str, + namespace: Option<&str>, + command: Vec<&str>, + ) -> Result { let api: Api = match namespace { Some(ns) => Api::namespaced(self.client.clone(), ns), None => Api::default_namespaced(self.client.clone()), }; - match api + let mut process = api .exec( pod_name, command, &AttachParams::default().stdout(true).stderr(true), ) .await - { - Err(e) => Err(e.to_string()), - Ok(mut process) => { - let status = process - .take_status() - .expect("No status handle") - .await - .expect("Status channel closed"); + .map_err(|e| e.to_string())?; - let mut stdout_buf = String::new(); - if let Some(mut stdout) = process.stdout() { - stdout - .read_to_string(&mut stdout_buf) - .await - .map_err(|e| format!("Failed to read stdout: {e}"))?; - } + let status = process + .take_status() + .expect("No status handle") + .await + .expect("Status channel closed"); - let mut stderr_buf = String::new(); - if let Some(mut stderr) = process.stderr() { - stderr - .read_to_string(&mut stderr_buf) - .await - .map_err(|e| format!("Failed to read stderr: {e}"))?; - } - - if let Some(s) = status.status { - debug!("exec_pod status: {} - {:?}", s, status.details); - if s == "Success" { - Ok(stdout_buf) - } else { - Err(stderr_buf.to_string()) - } - } else { - Err("No inner status from pod exec".to_string()) - } - } + let mut stdout = String::new(); + if let Some(mut s) = process.stdout() { + s.read_to_string(&mut stdout) + .await + .map_err(|e| format!("Failed to read stdout: {e}"))?; } + + let mut stderr = String::new(); + if let Some(mut s) = process.stderr() { + s.read_to_string(&mut stderr) + .await + .map_err(|e| format!("Failed to read stderr: {e}"))?; + } + + let status_field = status + .status + .ok_or_else(|| "No inner status from pod exec".to_string())?; + debug!("exec_pod status: {} - {:?}", status_field, status.details); + + Ok(ExecOutput { + status: status_field, + stdout, + stderr, + }) } /// Execute a command in a specific pod by name (no output capture). diff --git a/harmony/src/modules/openbao/mod.rs b/harmony/src/modules/openbao/mod.rs index 47904f61..e7e11bc5 100644 --- a/harmony/src/modules/openbao/mod.rs +++ b/harmony/src/modules/openbao/mod.rs @@ -53,6 +53,12 @@ server: path = "/openbao/data" }} + audit "file" "file" {{ + options {{ + file_path = "/openbao/audit/audit.log" + }} + }} + service: enabled: true diff --git a/harmony/src/modules/openbao/setup.rs b/harmony/src/modules/openbao/setup.rs index 3407768d..1971ed4b 100644 --- a/harmony/src/modules/openbao/setup.rs +++ b/harmony/src/modules/openbao/setup.rs @@ -60,6 +60,13 @@ pub struct OpenbaoJwtAuth { /// /// All steps are idempotent: re-running skips already-completed work. /// +/// The audit device is NOT configured here — recent OpenBao refuses +/// runtime `bao audit enable` (`cannot enable audit device via API`) +/// and requires a declarative `audit` stanza in the server config, +/// which [`OpenbaoScore`] sets in its helm values. KV v2 version +/// metadata does not record "who modified" a secret; that lives only +/// in the audit log (see the harmony_sso README for how to read it). +/// /// Unseal keys are cached at `~/.local/share/harmony/openbao/unseal-keys.json` /// (with `0600` permissions on Unix). This is a development convenience; production /// deployments should use auto-unseal (Transit, cloud KMS, etc.). @@ -189,38 +196,30 @@ impl OpenbaoSetupInterpret { })?; let path = keys_file(); + + // Source of truth for "is this vault initialized?" is OpenBao itself, + // not a `bao status` pre-check parsed from stderr — that probe is + // brittle because `exec_pod_capture_output` discards stdout on + // non-zero exit, and `bao status` exits 2 on a fresh-and-uninitialised + // vault. So we just attempt `operator init` and let OpenBao tell us + // authoritatively via its error message. if path.exists() { - // Verify the vault is actually initialized before trusting cached keys. - // If the cluster was recreated, the vault has a fresh PVC but the local - // keys file is stale. - let status = self.exec(k8s, vec!["bao", "status", "-format=json"]).await; - let is_initialized = match &status { - Ok(stdout) => !stdout.contains("\"initialized\":false"), - Err(e) => !e.contains("not initialized"), - }; - - if is_initialized { - info!("[OpenbaoSetup] Already initialized, loading existing keys"); - let content = std::fs::read_to_string(&path) - .map_err(|e| InterpretError::new(format!("Failed to read keys: {e}")))?; - let init: InitOutput = serde_json::from_str(&content) - .map_err(|e| InterpretError::new(format!("Failed to parse keys: {e}")))?; - return Ok(init.root_token); - } - warn!( - "[OpenbaoSetup] Vault not initialized but stale keys file exists, re-initializing" + "[OpenbaoSetup] Existing unseal-keys file at {:?} will be \ + overwritten if OpenBao reports as freshly uninitialized", + path ); - let _ = std::fs::remove_file(&path); } - info!("[OpenbaoSetup] Initializing OpenBao..."); + info!("[OpenbaoSetup] Probing init state via `bao operator init`..."); let output = self .exec(k8s, vec!["bao", "operator", "init", "-format=json"]) .await; match output { Ok(stdout) => { + // Fresh init — parse, persist, return root token. Overwrites + // any stale cached keys (warning already emitted above). let init: InitOutput = serde_json::from_str(&stdout).map_err(|e| { InterpretError::new(format!("Failed to parse init output: {e}")) })?; @@ -239,11 +238,27 @@ impl OpenbaoSetupInterpret { info!("[OpenbaoSetup] Initialized, keys saved to {:?}", path); Ok(init.root_token) } - Err(e) if e.contains("already initialized") => Err(InterpretError::new(format!( - "OpenBao already initialized but no local keys file at {:?}. \ - Delete the cluster or restore the keys file.", - path - ))), + Err(e) if e.contains("already initialized") => { + // Existing vault — keys file MUST be present and correspond + // to this install, otherwise we can never unseal it. + if !path.exists() { + return Err(InterpretError::new(format!( + "OpenBao reports already initialized but no local keys \ + file at {:?}. Either restore the keys file or destroy \ + the cluster (PV included) to re-init.", + path + ))); + } + info!( + "[OpenbaoSetup] Vault is already initialized; loading cached keys from {:?}", + path + ); + let content = std::fs::read_to_string(&path) + .map_err(|e| InterpretError::new(format!("Failed to read keys: {e}")))?; + let init: InitOutput = serde_json::from_str(&content) + .map_err(|e| InterpretError::new(format!("Failed to parse keys: {e}")))?; + Ok(init.root_token) + } Err(e) => Err(InterpretError::new(format!( "OpenBao operator init failed: {e}" ))), @@ -258,12 +273,38 @@ impl OpenbaoSetupInterpret { sealed: bool, } - // bao status exits 2 when sealed — treat exec error as "sealed" - let sealed = match self.exec(k8s, vec!["bao", "status", "-format=json"]).await { - Ok(stdout) => serde_json::from_str::(&stdout) + // `bao status -format=json` exits 2 on a sealed-but-initialised + // vault but still emits its JSON payload on stdout. Use + // exec_pod_capture so we read both streams regardless of exit + // status and parse the `sealed` field authoritatively. + let sealed = match k8s + .exec_pod_capture( + &self.score.pod, + Some(&self.score.namespace), + vec!["bao", "status", "-format=json"], + ) + .await + { + Ok(output) => serde_json::from_str::(&output.stdout) .map(|s| s.sealed) - .unwrap_or(true), - Err(_) => true, + .unwrap_or_else(|_| { + // JSON missing or unparseable — fall back to the + // conservative default (treat as sealed) so we + // attempt unseal rather than silently skipping. + warn!( + "[OpenbaoSetup] Could not parse `bao status` JSON \ + (stderr: {}); assuming vault is sealed", + output.stderr.trim() + ); + true + }), + Err(e) => { + warn!( + "[OpenbaoSetup] `bao status` exec failed ({e}); \ + assuming vault is sealed" + ); + true + } }; if !sealed { @@ -525,3 +566,16 @@ impl Interpret for OpenbaoSetupInterpret { vec![] } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn default_score_carries_expected_mounts() { + let s = OpenbaoSetupScore::default(); + assert_eq!(s.namespace, "openbao"); + assert_eq!(s.pod, "openbao-0"); + assert_eq!(s.kv_mount, "secret"); + } +} diff --git a/harmony/src/modules/zitadel/mod.rs b/harmony/src/modules/zitadel/mod.rs index cf0c657c..0be56877 100644 --- a/harmony/src/modules/zitadel/mod.rs +++ b/harmony/src/modules/zitadel/mod.rs @@ -105,6 +105,20 @@ pub struct ZitadelScore { /// (e.g. when the operator has a different issuer name). #[serde(default = "default_cluster_issuer")] pub cluster_issuer: String, + /// Whether the bootstrap admin user must change their password on + /// first login. Default is `false` so the password persisted via + /// `harmony_secret` stays valid for automation across re-runs — + /// without this, Zitadel's `PasswordChangeRequired` flag forces a + /// manual rotation the cache never observes, leaving harmony with + /// a stale credential. + /// + /// Security trade-off: with `false`, the bootstrap password + /// remains in `~/.local/share/harmony/secrets/` until manually + /// rotated. For dev / CI installs this is fine; for production + /// installs set this to `true` and rotate the cached value through + /// a separate channel. + #[serde(default)] + pub password_change_required: bool, } #[allow(dead_code)] @@ -163,6 +177,7 @@ impl Default for ZitadelScore { external_port: None, namespace: DEFAULT_NAMESPACE.to_string(), cluster_issuer: "letsencrypt-prod".to_string(), + password_change_required: false, } } } @@ -181,6 +196,7 @@ impl Score for ZitadelSco external_port: self.external_port, namespace: self.namespace.clone(), cluster_issuer: self.cluster_issuer.clone(), + password_change_required: self.password_change_required, }) } } @@ -195,6 +211,7 @@ struct ZitadelInterpret { external_port: Option, namespace: String, cluster_issuer: String, + password_change_required: bool, } #[async_trait] @@ -515,7 +532,7 @@ zitadel: FirstName: "Zitadel" LastName: "Admin" Email: "admin@zitadel.example.com" - PasswordChangeRequired: true + PasswordChangeRequired: {password_change_required} Machine: Machine: Username: "iam-admin" @@ -671,6 +688,7 @@ login: sc_run_as_user = sc_run_as_user, sc_fs_group = sc_fs_group, external_port = resolved_external_port, + password_change_required = self.password_change_required, ) } KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => { @@ -698,7 +716,7 @@ zitadel: FirstName: "Zitadel" LastName: "Admin" Email: "admin@zitadel.example.com" - PasswordChangeRequired: true + PasswordChangeRequired: {password_change_required} Machine: Machine: Username: "iam-admin" @@ -828,7 +846,8 @@ login: db_host = db_host, db_port = db_port, admin_password = admin_password, - pg_superuser_secret = pg_superuser_secret + pg_superuser_secret = pg_superuser_secret, + password_change_required = self.password_change_required, ) } }; diff --git a/harmony/src/modules/zitadel/setup.rs b/harmony/src/modules/zitadel/setup.rs index 58d22a77..4fd88815 100644 --- a/harmony/src/modules/zitadel/setup.rs +++ b/harmony/src/modules/zitadel/setup.rs @@ -17,6 +17,34 @@ use harmony_types::id::Id; const ADMIN_PAT_SECRET: &str = "iam-admin-pat"; const ZITADEL_NAMESPACE: &str = "zitadel"; +/// Returns true when a Zitadel response is the gRPC-gateway "backend +/// not yet bound" transient. The HTTP listener is up (so /debug/ready +/// passed), but the in-process gRPC server on `[::1]:8080` hasn't +/// finished binding yet. gRPC code 14 is UNAVAILABLE — explicitly a +/// retry-me signal. +/// +/// We match the response shape conservatively: 5xx status AND body +/// containing all three of `"code":14`, `transport: Error while dialing`, +/// and `dial tcp` so a generic 500 / 503 with an unrelated message does +/// not get retried forever. +/// Returns true when a Zitadel response body is the "you tried to write +/// what was already there" signal. Zitadel surfaces this as gRPC code 9 +/// (FAILED_PRECONDITION) with message `No changes (COMMAND-1m88i)` on +/// update endpoints; we treat it as idempotent success so reconciling +/// declarative state is a no-op when the state already holds. +fn is_zitadel_no_changes(body: &str) -> bool { + body.contains("\"code\":9") && (body.contains("COMMAND-1m88i") || body.contains("No changes")) +} + +fn is_grpc_transport_unavailable(status: reqwest::StatusCode, body: &str) -> bool { + if !status.is_server_error() { + return false; + } + body.contains("\"code\":14") + && body.contains("transport: Error while dialing") + && body.contains("dial tcp") +} + fn default_zitadel_namespace() -> String { ZITADEL_NAMESPACE.to_string() } @@ -332,13 +360,20 @@ struct AppSearchResult { #[derive(Deserialize)] struct AppSearchEntry { - #[allow(dead_code)] id: String, name: String, #[serde(rename = "oidcConfig")] oidc_config: Option, } +/// Lightweight view of a Zitadel app surfaced by `find_app` — both the +/// internal app ID (needed to address the app for updates) and the +/// public OIDC client ID (returned to callers). +struct FoundApp { + id: String, + client_id: String, +} + #[derive(Deserialize)] struct OidcConfig { #[serde(rename = "clientId")] @@ -448,6 +483,10 @@ impl ZitadelSetupInterpret { self.request(client, reqwest::Method::POST, path) } + fn put(&self, client: &reqwest::Client, path: &str) -> reqwest::RequestBuilder { + self.request(client, reqwest::Method::PUT, path) + } + fn http_client(&self) -> Result { let mut builder = reqwest::Client::builder(); if self.score.skip_tls { @@ -458,19 +497,41 @@ impl ZitadelSetupInterpret { .map_err(|e| format!("Failed to build HTTP client: {e}")) } - /// Block until Zitadel's `/debug/ready` returns 200, or the deadline - /// elapses. Helm reports success as soon as pods are Ready, but on - /// OKD the Route + cert-manager `Certificate` reconcile asynchronously - /// — for the first ~30-90s after install the route can serve the - /// OKD bootstrap CA cert (a CA:TRUE cert), which rustls rejects with - /// `CaUsedAsEndEntity`. We treat connect / TLS errors as transient - /// and retry; only `ready=true` from Zitadel itself unblocks setup. + /// Block until Zitadel is ready to accept *management* API calls, or + /// the deadline elapses. /// - /// `/debug/ready` is unauthenticated and exposed by the Zitadel - /// container's debug HTTP server (port 8080, same vhost). It returns - /// 200 once the database is reachable and migrations are applied, - /// which is the earliest point management API calls succeed. - async fn wait_until_ready(&self, client: &reqwest::Client) -> Result<(), InterpretError> { + /// Zitadel exposes everything on the same port but with two distinct + /// stacks behind it: + /// 1. an **HTTP listener** serving `/.well-known`, `/debug/ready`, + /// and the login UI; + /// 2. a **gRPC backend** on IPv6 loopback (`[::1]:8080`) serving + /// `/management/v1/*`, `/admin/v1/*`, etc., reached via an + /// HTTP-to-gRPC gateway in the same process. + /// + /// `/debug/ready` flips to 200 as soon as (1) is up and DB migrations + /// are applied. The gRPC backend in (2) takes a few extra seconds to + /// bind — long enough that on a fast bring-up the first management + /// call races the gateway and gets back: + /// + /// ```text + /// 503 Service Unavailable + /// {"code":14, "message":"connection error: desc = \"transport: + /// Error while dialing: dial tcp [::1]:8080: connect: connection refused\""} + /// ``` + /// + /// We therefore gate setup on two checks: first `/debug/ready` 200 + /// (same as before — handles DB migrations and, on OKD, the + /// cert-manager `Certificate` reconcile that can serve the bootstrap + /// CA cert and trip rustls with `CaUsedAsEndEntity`), then a probe of + /// the actual management endpoint we're about to use, retrying as + /// long as it returns the gRPC-gateway transport-unavailable + /// signature. Any other terminal response (200, 4xx, non-transport + /// 5xx) unblocks setup. + async fn wait_until_ready( + &self, + client: &reqwest::Client, + pat: &str, + ) -> Result<(), InterpretError> { use std::time::{Duration, Instant}; let url = self.api_url("/debug/ready"); let timeout = Duration::from_secs(600); @@ -490,11 +551,14 @@ impl ZitadelSetupInterpret { match rb.send().await { Ok(resp) if resp.status().is_success() => { info!( - "[ZitadelSetup] Zitadel ready after {:.1}s ({} attempts)", + "[ZitadelSetup] /debug/ready 200 after {:.1}s ({} attempts) — \ + probing management API", started.elapsed().as_secs_f64(), attempt ); - return Ok(()); + return self + .wait_until_management_ready(client, pat, deadline) + .await; } Ok(resp) => { last_err = format!("HTTP {}", resp.status()); @@ -526,6 +590,76 @@ impl ZitadelSetupInterpret { ))) } + /// Second-phase gate: poll the management API until it answers with + /// something other than the gRPC-gateway transport-unavailable + /// signature. Reuses the original deadline. + async fn wait_until_management_ready( + &self, + client: &reqwest::Client, + pat: &str, + deadline: std::time::Instant, + ) -> Result<(), InterpretError> { + use std::time::{Duration, Instant}; + let interval = Duration::from_secs(2); + let started = Instant::now(); + let mut attempt: u32 = 0; + let mut last_err = String::from("not attempted"); + + while Instant::now() < deadline { + attempt += 1; + // Same shape as `find_project` (cheap name-equals query); the + // PAT is already loaded so we exercise the exact auth + gateway + // path setup is about to use. + let send = self + .post(client, "/management/v1/projects/_search") + .bearer_auth(pat) + .json(&serde_json::json!({ + "queries": [ + { "nameQuery": { "name": "__harmony_readiness_probe__", + "method": "TEXT_QUERY_METHOD_EQUALS" } } + ] + })) + .send() + .await; + + match send { + Ok(resp) => { + let status = resp.status(); + let body = resp.text().await.unwrap_or_default(); + if is_grpc_transport_unavailable(status, &body) { + last_err = format!("HTTP {status} (gRPC backend not yet bound)"); + } else { + info!( + "[ZitadelSetup] management API ready after {:.1}s ({} attempts, \ + probe status {status})", + started.elapsed().as_secs_f64(), + attempt + ); + return Ok(()); + } + } + Err(e) => { + last_err = format!("{e}"); + } + } + + if attempt == 1 || attempt.is_multiple_of(15) { + debug!( + "[ZitadelSetup] management API still warming up \ + (attempt {attempt}, {:.0}s elapsed): {last_err}", + started.elapsed().as_secs_f64() + ); + } + tokio::time::sleep(interval).await; + } + + Err(InterpretError::new(format!( + "Zitadel management API did not become ready within the readiness \ + deadline ({attempt} probe attempts). Last response: {last_err}. The HTTP \ + listener was up but the gRPC backend on [::1]:8080 never finished binding." + ))) + } + async fn read_admin_pat(&self, k8s: &harmony_k8s::K8sClient) -> Result { use k8s_openapi::api::core::v1::Secret; @@ -712,7 +846,7 @@ impl ZitadelSetupInterpret { pat: &str, project_id: &str, app_name: &str, - ) -> Result, String> { + ) -> Result, String> { let resp = self .post( client, @@ -734,7 +868,44 @@ impl ZitadelSetupInterpret { .unwrap_or_default() .into_iter() .find(|a| a.name == app_name) - .and_then(|a| a.oidc_config.and_then(|c| c.client_id))) + .and_then(|a| { + let client_id = a.oidc_config.and_then(|c| c.client_id)?; + Some(FoundApp { + id: a.id, + client_id, + }) + })) + } + + /// OIDC config body shared between create and update. The Zitadel + /// `UpdateOIDCAppConfig` endpoint accepts all the same fields as + /// create except `name`, which is managed via a separate endpoint + /// (we keep the app name stable so we don't need it). + fn device_code_oidc_config_body(&self) -> serde_json::Value { + serde_json::json!({ + "redirectUris": [], + "responseTypes": ["OIDC_RESPONSE_TYPE_CODE"], + // DEVICE_CODE drives the initial browser login; + // REFRESH_TOKEN must also be listed or Zitadel won't + // honour the `grant_type=refresh_token` exchange that + // ZitadelOidcAuth::silent_refresh performs — without it, + // the cached session can't be silently refreshed and + // every expired-token run falls back to a fresh browser + // device flow. The device-authorization request already + // asks for `offline_access`; this is the app-side half + // of issuing a usable refresh token. + "grantTypes": ["OIDC_GRANT_TYPE_DEVICE_CODE", "OIDC_GRANT_TYPE_REFRESH_TOKEN"], + "appType": "OIDC_APP_TYPE_NATIVE", + "authMethodType": "OIDC_AUTH_METHOD_TYPE_NONE", + // Embed userinfo claims (email, preferred_username, etc.) + // into the id_token so downstream JWT verifiers can bind + // their `user_claim` to a meaningful value without making + // a separate userinfo call. Default Zitadel behaviour is + // to put these only in the userinfo response, which means + // a verifier configured with `user_claim=email` rejects + // the id_token with `claim "email" not found in token`. + "idTokenUserinfoAssertion": true + }) } async fn create_device_code_app( @@ -744,20 +915,16 @@ impl ZitadelSetupInterpret { project_id: &str, app_name: &str, ) -> Result { + let mut body = self.device_code_oidc_config_body(); + body["name"] = serde_json::json!(app_name); + let resp = self .post( client, &format!("/management/v1/projects/{project_id}/apps/oidc"), ) .bearer_auth(pat) - .json(&serde_json::json!({ - "name": app_name, - "redirectUris": [], - "responseTypes": ["OIDC_RESPONSE_TYPE_CODE"], - "grantTypes": ["OIDC_GRANT_TYPE_DEVICE_CODE"], - "appType": "OIDC_APP_TYPE_NATIVE", - "authMethodType": "OIDC_AUTH_METHOD_TYPE_NONE" - })) + .json(&body) .send() .await .map_err(|e| format!("Failed to create app: {e}"))?; @@ -776,6 +943,48 @@ impl ZitadelSetupInterpret { .ok_or_else(|| "No clientId in app response".to_string()) } + /// Reconciles an existing app's OIDC config to the desired shape. + /// `ZitadelSetupScore` owns the app definition declaratively, so any + /// drift (manual UI edits, schema changes between harmony versions) + /// is reset on each run. This is what saves users from having to + /// `--cleanup` the cluster after a Score update touches the OIDC + /// config (e.g. when `idTokenUserinfoAssertion` was added). + /// + /// Zitadel returns gRPC code 9 (FAILED_PRECONDITION) with message + /// `No changes (COMMAND-1m88i)` when the PUT body matches the stored + /// config byte-for-byte. We treat that as the success case — the + /// declared state already holds — instead of letting reconciliation + /// fail every re-run after the first. + async fn update_device_code_oidc_config( + &self, + client: &reqwest::Client, + pat: &str, + project_id: &str, + app_id: &str, + ) -> Result<(), String> { + let resp = self + .put( + client, + &format!("/management/v1/projects/{project_id}/apps/{app_id}/oidc_config"), + ) + .bearer_auth(pat) + .json(&self.device_code_oidc_config_body()) + .send() + .await + .map_err(|e| format!("Failed to update OIDC config: {e}"))?; + + if resp.status().is_success() { + return Ok(()); + } + + let body = resp.text().await.unwrap_or_default(); + if is_zitadel_no_changes(&body) { + debug!("[ZitadelSetup] OIDC config already at desired state (no changes)"); + return Ok(()); + } + Err(format!("Update OIDC config failed: {body}")) + } + async fn ensure_app( &self, client: &reqwest::Client, @@ -791,17 +1000,25 @@ impl ZitadelSetupInterpret { .ensure_project(client, pat, &app.project_name, config) .await?; - if let Some(client_id) = self + if let Some(found) = self .find_app(client, pat, &project_id, &app.app_name) .await .map_err(InterpretError::new)? { info!( - "[ZitadelSetup] App '{}' already exists: {}", - app.app_name, client_id + "[ZitadelSetup] App '{}' already exists: {} — reconciling OIDC config", + app.app_name, found.client_id ); - config.apps.insert(app.app_name.clone(), client_id.clone()); - return Ok(client_id); + match &app.app_type { + ZitadelAppType::DeviceCode => self + .update_device_code_oidc_config(client, pat, &project_id, &found.id) + .await + .map_err(InterpretError::new)?, + } + config + .apps + .insert(app.app_name.clone(), found.client_id.clone()); + return Ok(found.client_id); } let client_id = match &app.app_type { @@ -1406,13 +1623,16 @@ impl Interpret for ZitadelSetupInterpret { let client = self.http_client().map_err(InterpretError::new)?; - // Block on /debug/ready before issuing any management call — - // helm reports success when pods are Ready, but on OKD the - // Route + cert-manager Certificate reconcile asynchronously - // and the first ~30-90s typically fails with TLS / connect - // errors. Without this gate the very first POST to - // /management/v1/projects/_search dies with `CaUsedAsEndEntity`. - self.wait_until_ready(&client).await?; + // Block on /debug/ready AND on the management API actually + // responding before issuing real work. Two distinct races to + // close: (a) on OKD the cert-manager Certificate reconciles + // asynchronously and the first ~30-90s of TLS handshakes fail + // with `CaUsedAsEndEntity`; (b) on any cluster, Zitadel's gRPC + // backend on [::1]:8080 binds a few seconds AFTER /debug/ready + // flips to 200, so the first POST to /management/... can race + // and get a 503 with `code:14` + `transport: Error while + // dialing`. Both are now handled in `wait_until_ready`. + self.wait_until_ready(&client, &pat).await?; let mut config = ZitadelClientConfig::load().unwrap_or_default(); @@ -1652,6 +1872,69 @@ mod tests { ); } + #[test] + fn zitadel_no_changes_matches_real_failure_body() { + // Verbatim body returned by Zitadel when a PUT to an OIDC + // config matches the stored value byte-for-byte. + let body = r#"{"code":9, "message":"No changes (COMMAND-1m88i)", "details":[{"@type":"type.googleapis.com/zitadel.v1.ErrorDetail", "id":"COMMAND-1m88i", "message":"No changes"}]}"#; + assert!(is_zitadel_no_changes(body)); + } + + #[test] + fn zitadel_no_changes_rejects_unrelated_code_9() { + // gRPC code 9 (FAILED_PRECONDITION) covers many things; only + // the COMMAND-1m88i / "No changes" specific signature should + // be treated as idempotent success. + let body = r#"{"code":9,"message":"resource exhausted","details":[{"id":"OTHER-xyz"}]}"#; + assert!(!is_zitadel_no_changes(body)); + } + + #[test] + fn zitadel_no_changes_rejects_other_codes() { + let body = r#"{"code":5,"message":"No changes"}"#; + assert!(!is_zitadel_no_changes(body)); + } + + #[test] + fn grpc_transport_unavailable_matches_real_failure_body() { + // The exact body returned by Zitadel's HTTP-to-gRPC gateway + // when the in-process gRPC backend on [::1]:8080 hasn't bound + // yet — observed verbatim during fresh k3d bring-ups. + let body = r#"{"code":14, "message":"connection error: desc = \"transport: Error while dialing: dial tcp [::1]:8080: connect: connection refused\""}"#; + assert!(is_grpc_transport_unavailable( + reqwest::StatusCode::SERVICE_UNAVAILABLE, + body + )); + } + + #[test] + fn grpc_transport_unavailable_ignores_2xx_responses() { + assert!(!is_grpc_transport_unavailable( + reqwest::StatusCode::OK, + r#"{"code":14,"transport: Error while dialing","dial tcp"}"# + )); + } + + #[test] + fn grpc_transport_unavailable_ignores_unrelated_5xx() { + // A generic 500 with no gRPC transport signature must NOT be + // treated as transient — we'd otherwise retry forever on a + // real backend bug. + assert!(!is_grpc_transport_unavailable( + reqwest::StatusCode::INTERNAL_SERVER_ERROR, + r#"{"code":2,"message":"unexpected internal error"}"# + )); + } + + #[test] + fn grpc_transport_unavailable_requires_all_signature_parts() { + // 503 with only one of the three markers — not a match. + assert!(!is_grpc_transport_unavailable( + reqwest::StatusCode::SERVICE_UNAVAILABLE, + r#"{"code":14,"message":"backend dropped"}"# + )); + } + #[test] fn score_deserialises_with_only_required_fields() { // Old cache / new caller minimum: just `host` + `namespace`.