feat(harmony): harmony-sso deploy hardening (Zitadel + OpenBao Scores) #303

Merged
johnride merged 1 commits from pr/harmony-sso-deploy-hardening into master 2026-05-29 15:03:45 +00:00
6 changed files with 508 additions and 106 deletions

View File

@@ -138,6 +138,7 @@ async fn main() -> Result<()> {
external_port: None, external_port: None,
namespace: cli.zitadel_namespace.clone(), namespace: cli.zitadel_namespace.clone(),
cluster_issuer: cli.cluster_issuer.clone(), cluster_issuer: cli.cluster_issuer.clone(),
..Default::default()
}; };
log::info!( log::info!(
"[1/6] Zitadel helm: ns={} host={}", "[1/6] Zitadel helm: ns={} host={}",

View File

@@ -13,6 +13,24 @@ use tokio::time::sleep;
use crate::client::K8sClient; use crate::client::K8sClient;
/// Captured outcome of a command run inside a pod.
///
/// Both output streams travel together with the reported status
/// (`"Success"` or `"Failure"`), so a caller can still inspect stdout
/// after a failure. That matters for tools that print a structured
/// payload to stdout even on a non-zero exit (the Vault / OpenBao CLIs
/// do this with `-format=json`).
#[derive(Debug, Clone)]
pub struct ExecOutput {
    /// Status string reported for the exec (`"Success"` or `"Failure"`).
    pub status: String,
    /// Text captured from the command's stdout stream.
    pub stdout: String,
    /// Text captured from the command's stderr stream.
    pub stderr: String,
}

impl ExecOutput {
    /// Returns `true` only when the reported status is exactly `"Success"`.
    pub fn is_success(&self) -> bool {
        matches!(self.status.as_str(), "Success")
    }
}
impl K8sClient { impl K8sClient {
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> { pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let api: Api<Pod> = match namespace { let api: Api<Pod> = match namespace {
@@ -200,55 +218,76 @@ impl K8sClient {
namespace: Option<&str>, namespace: Option<&str>,
command: Vec<&str>, command: Vec<&str>,
) -> Result<String, String> { ) -> Result<String, String> {
// Convenience wrapper — preserved for backward compatibility:
// returns stdout on success, stderr on non-zero exit, and drops
// the other stream on each path. New callers should prefer
// `exec_pod_capture` which exposes both streams unconditionally,
// important when the failed command writes structured output to
// stdout (e.g. `bao status -format=json` exits 2 on a sealed or
// uninitialised vault but still emits its JSON payload).
let output = self.exec_pod_capture(pod_name, namespace, command).await?;
if output.is_success() {
Ok(output.stdout)
} else {
Err(output.stderr)
}
}
/// Execute a command in a pod and capture both stdout and stderr
/// regardless of the process exit status. The outer `Result` is
/// reserved for transport / API failures (couldn't reach the pod,
/// stream read error). A non-zero exit is reflected in
/// `ExecOutput::status` and the caller decides how to react.
pub async fn exec_pod_capture(
&self,
pod_name: &str,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<ExecOutput, String> {
let api: Api<Pod> = match namespace { let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns), Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()), None => Api::default_namespaced(self.client.clone()),
}; };
match api let mut process = api
.exec( .exec(
pod_name, pod_name,
command, command,
&AttachParams::default().stdout(true).stderr(true), &AttachParams::default().stdout(true).stderr(true),
) )
.await .await
{ .map_err(|e| e.to_string())?;
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("No status handle")
.await
.expect("Status channel closed");
let mut stdout_buf = String::new(); let status = process
if let Some(mut stdout) = process.stdout() { .take_status()
stdout .expect("No status handle")
.read_to_string(&mut stdout_buf) .await
.await .expect("Status channel closed");
.map_err(|e| format!("Failed to read stdout: {e}"))?;
}
let mut stderr_buf = String::new(); let mut stdout = String::new();
if let Some(mut stderr) = process.stderr() { if let Some(mut s) = process.stdout() {
stderr s.read_to_string(&mut stdout)
.read_to_string(&mut stderr_buf) .await
.await .map_err(|e| format!("Failed to read stdout: {e}"))?;
.map_err(|e| format!("Failed to read stderr: {e}"))?;
}
if let Some(s) = status.status {
debug!("exec_pod status: {} - {:?}", s, status.details);
if s == "Success" {
Ok(stdout_buf)
} else {
Err(stderr_buf.to_string())
}
} else {
Err("No inner status from pod exec".to_string())
}
}
} }
let mut stderr = String::new();
if let Some(mut s) = process.stderr() {
s.read_to_string(&mut stderr)
.await
.map_err(|e| format!("Failed to read stderr: {e}"))?;
}
let status_field = status
.status
.ok_or_else(|| "No inner status from pod exec".to_string())?;
debug!("exec_pod status: {} - {:?}", status_field, status.details);
Ok(ExecOutput {
status: status_field,
stdout,
stderr,
})
} }
/// Execute a command in a specific pod by name (no output capture). /// Execute a command in a specific pod by name (no output capture).

View File

@@ -53,6 +53,12 @@ server:
path = "/openbao/data" path = "/openbao/data"
}} }}
audit "file" "file" {{
options {{
file_path = "/openbao/audit/audit.log"
}}
}}
service: service:
enabled: true enabled: true

View File

@@ -60,6 +60,13 @@ pub struct OpenbaoJwtAuth {
/// ///
/// All steps are idempotent: re-running skips already-completed work. /// All steps are idempotent: re-running skips already-completed work.
/// ///
/// The audit device is NOT configured here — recent OpenBao refuses
/// runtime `bao audit enable` (`cannot enable audit device via API`)
/// and requires a declarative `audit` stanza in the server config,
/// which [`OpenbaoScore`] sets in its helm values. KV v2 version
/// metadata does not record "who modified" a secret; that lives only
/// in the audit log (see the harmony_sso README for how to read it).
///
/// Unseal keys are cached at `~/.local/share/harmony/openbao/unseal-keys.json` /// Unseal keys are cached at `~/.local/share/harmony/openbao/unseal-keys.json`
/// (with `0600` permissions on Unix). This is a development convenience; production /// (with `0600` permissions on Unix). This is a development convenience; production
/// deployments should use auto-unseal (Transit, cloud KMS, etc.). /// deployments should use auto-unseal (Transit, cloud KMS, etc.).
@@ -189,38 +196,30 @@ impl OpenbaoSetupInterpret {
})?; })?;
let path = keys_file(); let path = keys_file();
// Source of truth for "is this vault initialized?" is OpenBao itself,
// not a `bao status` pre-check parsed from stderr — that probe is
// brittle because `exec_pod_capture_output` discards stdout on
// non-zero exit, and `bao status` exits 2 on a fresh-and-uninitialised
// vault. So we just attempt `operator init` and let OpenBao tell us
// authoritatively via its error message.
if path.exists() { if path.exists() {
// Verify the vault is actually initialized before trusting cached keys.
// If the cluster was recreated, the vault has a fresh PVC but the local
// keys file is stale.
let status = self.exec(k8s, vec!["bao", "status", "-format=json"]).await;
let is_initialized = match &status {
Ok(stdout) => !stdout.contains("\"initialized\":false"),
Err(e) => !e.contains("not initialized"),
};
if is_initialized {
info!("[OpenbaoSetup] Already initialized, loading existing keys");
let content = std::fs::read_to_string(&path)
.map_err(|e| InterpretError::new(format!("Failed to read keys: {e}")))?;
let init: InitOutput = serde_json::from_str(&content)
.map_err(|e| InterpretError::new(format!("Failed to parse keys: {e}")))?;
return Ok(init.root_token);
}
warn!( warn!(
"[OpenbaoSetup] Vault not initialized but stale keys file exists, re-initializing" "[OpenbaoSetup] Existing unseal-keys file at {:?} will be \
overwritten if OpenBao reports as freshly uninitialized",
path
); );
let _ = std::fs::remove_file(&path);
} }
info!("[OpenbaoSetup] Initializing OpenBao..."); info!("[OpenbaoSetup] Probing init state via `bao operator init`...");
let output = self let output = self
.exec(k8s, vec!["bao", "operator", "init", "-format=json"]) .exec(k8s, vec!["bao", "operator", "init", "-format=json"])
.await; .await;
match output { match output {
Ok(stdout) => { Ok(stdout) => {
// Fresh init — parse, persist, return root token. Overwrites
// any stale cached keys (warning already emitted above).
let init: InitOutput = serde_json::from_str(&stdout).map_err(|e| { let init: InitOutput = serde_json::from_str(&stdout).map_err(|e| {
InterpretError::new(format!("Failed to parse init output: {e}")) InterpretError::new(format!("Failed to parse init output: {e}"))
})?; })?;
@@ -239,11 +238,27 @@ impl OpenbaoSetupInterpret {
info!("[OpenbaoSetup] Initialized, keys saved to {:?}", path); info!("[OpenbaoSetup] Initialized, keys saved to {:?}", path);
Ok(init.root_token) Ok(init.root_token)
} }
Err(e) if e.contains("already initialized") => Err(InterpretError::new(format!( Err(e) if e.contains("already initialized") => {
"OpenBao already initialized but no local keys file at {:?}. \ // Existing vault — keys file MUST be present and correspond
Delete the cluster or restore the keys file.", // to this install, otherwise we can never unseal it.
path if !path.exists() {
))), return Err(InterpretError::new(format!(
"OpenBao reports already initialized but no local keys \
file at {:?}. Either restore the keys file or destroy \
the cluster (PV included) to re-init.",
path
)));
}
info!(
"[OpenbaoSetup] Vault is already initialized; loading cached keys from {:?}",
path
);
let content = std::fs::read_to_string(&path)
.map_err(|e| InterpretError::new(format!("Failed to read keys: {e}")))?;
let init: InitOutput = serde_json::from_str(&content)
.map_err(|e| InterpretError::new(format!("Failed to parse keys: {e}")))?;
Ok(init.root_token)
}
Err(e) => Err(InterpretError::new(format!( Err(e) => Err(InterpretError::new(format!(
"OpenBao operator init failed: {e}" "OpenBao operator init failed: {e}"
))), ))),
@@ -258,12 +273,38 @@ impl OpenbaoSetupInterpret {
sealed: bool, sealed: bool,
} }
// bao status exits 2 when sealed — treat exec error as "sealed" // `bao status -format=json` exits 2 on a sealed-but-initialised
let sealed = match self.exec(k8s, vec!["bao", "status", "-format=json"]).await { // vault but still emits its JSON payload on stdout. Use
Ok(stdout) => serde_json::from_str::<Status>(&stdout) // exec_pod_capture so we read both streams regardless of exit
// status and parse the `sealed` field authoritatively.
let sealed = match k8s
.exec_pod_capture(
&self.score.pod,
Some(&self.score.namespace),
vec!["bao", "status", "-format=json"],
)
.await
{
Ok(output) => serde_json::from_str::<Status>(&output.stdout)
.map(|s| s.sealed) .map(|s| s.sealed)
.unwrap_or(true), .unwrap_or_else(|_| {
Err(_) => true, // JSON missing or unparseable — fall back to the
// conservative default (treat as sealed) so we
// attempt unseal rather than silently skipping.
warn!(
"[OpenbaoSetup] Could not parse `bao status` JSON \
(stderr: {}); assuming vault is sealed",
output.stderr.trim()
);
true
}),
Err(e) => {
warn!(
"[OpenbaoSetup] `bao status` exec failed ({e}); \
assuming vault is sealed"
);
true
}
}; };
if !sealed { if !sealed {
@@ -525,3 +566,16 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenbaoSetupInterpret {
vec![] vec![]
} }
} }
#[cfg(test)]
mod tests {
    use super::*;

    /// Pins the default namespace, pod name and KV mount of the setup
    /// score so an accidental change to the defaults is caught here.
    #[test]
    fn default_score_carries_expected_mounts() {
        let defaults = OpenbaoSetupScore::default();

        assert_eq!(defaults.namespace, "openbao");
        assert_eq!(defaults.pod, "openbao-0");
        assert_eq!(defaults.kv_mount, "secret");
    }
}

View File

@@ -105,6 +105,20 @@ pub struct ZitadelScore {
/// (e.g. when the operator has a different issuer name). /// (e.g. when the operator has a different issuer name).
#[serde(default = "default_cluster_issuer")] #[serde(default = "default_cluster_issuer")]
pub cluster_issuer: String, pub cluster_issuer: String,
/// Whether the bootstrap admin user must change their password on
/// first login. Default is `false` so the password persisted via
/// `harmony_secret` stays valid for automation across re-runs —
/// without this, Zitadel's `PasswordChangeRequired` flag forces a
/// manual rotation the cache never observes, leaving harmony with
/// a stale credential.
///
/// Security trade-off: with `false`, the bootstrap password
/// remains in `~/.local/share/harmony/secrets/` until manually
/// rotated. For dev / CI installs this is fine; for production
/// installs set this to `true` and rotate the cached value through
/// a separate channel.
#[serde(default)]
pub password_change_required: bool,
} }
#[allow(dead_code)] #[allow(dead_code)]
@@ -163,6 +177,7 @@ impl Default for ZitadelScore {
external_port: None, external_port: None,
namespace: DEFAULT_NAMESPACE.to_string(), namespace: DEFAULT_NAMESPACE.to_string(),
cluster_issuer: "letsencrypt-prod".to_string(), cluster_issuer: "letsencrypt-prod".to_string(),
password_change_required: false,
} }
} }
} }
@@ -181,6 +196,7 @@ impl<T: Topology + K8sclient + HelmCommand + PostgreSQL> Score<T> for ZitadelSco
external_port: self.external_port, external_port: self.external_port,
namespace: self.namespace.clone(), namespace: self.namespace.clone(),
cluster_issuer: self.cluster_issuer.clone(), cluster_issuer: self.cluster_issuer.clone(),
password_change_required: self.password_change_required,
}) })
} }
} }
@@ -195,6 +211,7 @@ struct ZitadelInterpret {
external_port: Option<u32>, external_port: Option<u32>,
namespace: String, namespace: String,
cluster_issuer: String, cluster_issuer: String,
password_change_required: bool,
} }
#[async_trait] #[async_trait]
@@ -515,7 +532,7 @@ zitadel:
FirstName: "Zitadel" FirstName: "Zitadel"
LastName: "Admin" LastName: "Admin"
Email: "admin@zitadel.example.com" Email: "admin@zitadel.example.com"
PasswordChangeRequired: true PasswordChangeRequired: {password_change_required}
Machine: Machine:
Machine: Machine:
Username: "iam-admin" Username: "iam-admin"
@@ -671,6 +688,7 @@ login:
sc_run_as_user = sc_run_as_user, sc_run_as_user = sc_run_as_user,
sc_fs_group = sc_fs_group, sc_fs_group = sc_fs_group,
external_port = resolved_external_port, external_port = resolved_external_port,
password_change_required = self.password_change_required,
) )
} }
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => { KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
@@ -698,7 +716,7 @@ zitadel:
FirstName: "Zitadel" FirstName: "Zitadel"
LastName: "Admin" LastName: "Admin"
Email: "admin@zitadel.example.com" Email: "admin@zitadel.example.com"
PasswordChangeRequired: true PasswordChangeRequired: {password_change_required}
Machine: Machine:
Machine: Machine:
Username: "iam-admin" Username: "iam-admin"
@@ -828,7 +846,8 @@ login:
db_host = db_host, db_host = db_host,
db_port = db_port, db_port = db_port,
admin_password = admin_password, admin_password = admin_password,
pg_superuser_secret = pg_superuser_secret pg_superuser_secret = pg_superuser_secret,
password_change_required = self.password_change_required,
) )
} }
}; };

View File

@@ -17,6 +17,34 @@ use harmony_types::id::Id;
const ADMIN_PAT_SECRET: &str = "iam-admin-pat"; const ADMIN_PAT_SECRET: &str = "iam-admin-pat";
const ZITADEL_NAMESPACE: &str = "zitadel"; const ZITADEL_NAMESPACE: &str = "zitadel";
/// Returns true when a Zitadel response body is the "you tried to write
/// what was already there" signal. Zitadel surfaces this as gRPC code 9
/// (FAILED_PRECONDITION) with message `No changes (COMMAND-1m88i)` on
/// update endpoints; we treat it as idempotent success so reconciling
/// declarative state is a no-op when the state already holds.
///
/// The match is a plain substring check: the body must contain
/// `"code":9` and at least one of `COMMAND-1m88i` or `No changes`.
fn is_zitadel_no_changes(body: &str) -> bool {
body.contains("\"code\":9") && (body.contains("COMMAND-1m88i") || body.contains("No changes"))
}
/// Returns true when a Zitadel response is the gRPC-gateway "backend
/// not yet bound" transient. The HTTP listener is up (so /debug/ready
/// passed), but the in-process gRPC server on `[::1]:8080` hasn't
/// finished binding yet. gRPC code 14 is UNAVAILABLE — explicitly a
/// retry-me signal.
///
/// We match the response shape conservatively: 5xx status AND body
/// containing all three of `"code":14`, `transport: Error while dialing`,
/// and `dial tcp` so a generic 500 / 503 with an unrelated message does
/// not get retried forever.
fn is_grpc_transport_unavailable(status: reqwest::StatusCode, body: &str) -> bool {
// Anything outside the 5xx range is never the transient, whatever the body says.
if !status.is_server_error() {
return false;
}
// All three markers must be present together.
body.contains("\"code\":14")
&& body.contains("transport: Error while dialing")
&& body.contains("dial tcp")
}
fn default_zitadel_namespace() -> String { fn default_zitadel_namespace() -> String {
ZITADEL_NAMESPACE.to_string() ZITADEL_NAMESPACE.to_string()
} }
@@ -332,13 +360,20 @@ struct AppSearchResult {
#[derive(Deserialize)] #[derive(Deserialize)]
struct AppSearchEntry { struct AppSearchEntry {
#[allow(dead_code)]
id: String, id: String,
name: String, name: String,
#[serde(rename = "oidcConfig")] #[serde(rename = "oidcConfig")]
oidc_config: Option<OidcConfig>, oidc_config: Option<OidcConfig>,
} }
/// Lightweight view of a Zitadel app surfaced by `find_app` — both the
/// internal app ID (needed to address the app for updates) and the
/// public OIDC client ID (returned to callers).
struct FoundApp {
/// Internal Zitadel app ID, used to address the app in update calls.
id: String,
/// Public OIDC client ID handed back to callers.
client_id: String,
}
#[derive(Deserialize)] #[derive(Deserialize)]
struct OidcConfig { struct OidcConfig {
#[serde(rename = "clientId")] #[serde(rename = "clientId")]
@@ -448,6 +483,10 @@ impl ZitadelSetupInterpret {
self.request(client, reqwest::Method::POST, path) self.request(client, reqwest::Method::POST, path)
} }
/// Builds a `PUT` request for `path` by delegating to `request` with
/// `reqwest::Method::PUT`.
fn put(&self, client: &reqwest::Client, path: &str) -> reqwest::RequestBuilder {
self.request(client, reqwest::Method::PUT, path)
}
fn http_client(&self) -> Result<reqwest::Client, String> { fn http_client(&self) -> Result<reqwest::Client, String> {
let mut builder = reqwest::Client::builder(); let mut builder = reqwest::Client::builder();
if self.score.skip_tls { if self.score.skip_tls {
@@ -458,19 +497,41 @@ impl ZitadelSetupInterpret {
.map_err(|e| format!("Failed to build HTTP client: {e}")) .map_err(|e| format!("Failed to build HTTP client: {e}"))
} }
/// Block until Zitadel's `/debug/ready` returns 200, or the deadline /// Block until Zitadel is ready to accept *management* API calls, or
/// elapses. Helm reports success as soon as pods are Ready, but on /// the deadline elapses.
/// OKD the Route + cert-manager `Certificate` reconcile asynchronously
/// — for the first ~30-90s after install the route can serve the
/// OKD bootstrap CA cert (a CA:TRUE cert), which rustls rejects with
/// `CaUsedAsEndEntity`. We treat connect / TLS errors as transient
/// and retry; only `ready=true` from Zitadel itself unblocks setup.
/// ///
/// `/debug/ready` is unauthenticated and exposed by the Zitadel /// Zitadel exposes everything on the same port but with two distinct
/// container's debug HTTP server (port 8080, same vhost). It returns /// stacks behind it:
/// 200 once the database is reachable and migrations are applied, /// 1. an **HTTP listener** serving `/.well-known`, `/debug/ready`,
/// which is the earliest point management API calls succeed. /// and the login UI;
async fn wait_until_ready(&self, client: &reqwest::Client) -> Result<(), InterpretError> { /// 2. a **gRPC backend** on IPv6 loopback (`[::1]:8080`) serving
/// `/management/v1/*`, `/admin/v1/*`, etc., reached via an
/// HTTP-to-gRPC gateway in the same process.
///
/// `/debug/ready` flips to 200 as soon as (1) is up and DB migrations
/// are applied. The gRPC backend in (2) takes a few extra seconds to
/// bind — long enough that on a fast bring-up the first management
/// call races the gateway and gets back:
///
/// ```text
/// 503 Service Unavailable
/// {"code":14, "message":"connection error: desc = \"transport:
/// Error while dialing: dial tcp [::1]:8080: connect: connection refused\""}
/// ```
///
/// We therefore gate setup on two checks: first `/debug/ready` 200
/// (same as before — handles DB migrations and, on OKD, the
/// cert-manager `Certificate` reconcile that can serve the bootstrap
/// CA cert and trip rustls with `CaUsedAsEndEntity`), then a probe of
/// the actual management endpoint we're about to use, retrying as
/// long as it returns the gRPC-gateway transport-unavailable
/// signature. Any other terminal response (200, 4xx, non-transport
/// 5xx) unblocks setup.
async fn wait_until_ready(
&self,
client: &reqwest::Client,
pat: &str,
) -> Result<(), InterpretError> {
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
let url = self.api_url("/debug/ready"); let url = self.api_url("/debug/ready");
let timeout = Duration::from_secs(600); let timeout = Duration::from_secs(600);
@@ -490,11 +551,14 @@ impl ZitadelSetupInterpret {
match rb.send().await { match rb.send().await {
Ok(resp) if resp.status().is_success() => { Ok(resp) if resp.status().is_success() => {
info!( info!(
"[ZitadelSetup] Zitadel ready after {:.1}s ({} attempts)", "[ZitadelSetup] /debug/ready 200 after {:.1}s ({} attempts)\
probing management API",
started.elapsed().as_secs_f64(), started.elapsed().as_secs_f64(),
attempt attempt
); );
return Ok(()); return self
.wait_until_management_ready(client, pat, deadline)
.await;
} }
Ok(resp) => { Ok(resp) => {
last_err = format!("HTTP {}", resp.status()); last_err = format!("HTTP {}", resp.status());
@@ -526,6 +590,76 @@ impl ZitadelSetupInterpret {
))) )))
} }
/// Second-phase gate: poll the management API until it answers with
/// something other than the gRPC-gateway transport-unavailable
/// signature.
///
/// `deadline` is the caller's overall readiness deadline and is reused
/// as-is. The endpoint is always probed at least once, even when the
/// deadline has already elapsed on entry, so a slow first phase cannot
/// fail this phase without the API ever having been tested. The pause
/// between probes is capped to the time remaining, so the loop does not
/// sleep past the deadline.
///
/// Any response that is not the transport-unavailable signature
/// unblocks setup. Send errors are treated as transient and retried.
///
/// # Errors
///
/// Returns an `InterpretError` when every probe up to the deadline
/// either failed to send or returned the transport-unavailable
/// signature.
async fn wait_until_management_ready(
    &self,
    client: &reqwest::Client,
    pat: &str,
    deadline: std::time::Instant,
) -> Result<(), InterpretError> {
    use std::time::{Duration, Instant};

    let interval = Duration::from_secs(2);
    let started = Instant::now();
    let mut attempt: u32 = 0;

    let last_err = loop {
        attempt += 1;

        // Same shape as `find_project` (cheap name-equals query); the
        // PAT is already loaded so we exercise the exact auth + gateway
        // path setup is about to use.
        let send = self
            .post(client, "/management/v1/projects/_search")
            .bearer_auth(pat)
            .json(&serde_json::json!({
                "queries": [
                    { "nameQuery": { "name": "__harmony_readiness_probe__",
                                     "method": "TEXT_QUERY_METHOD_EQUALS" } }
                ]
            }))
            .send()
            .await;

        let last_err = match send {
            Ok(resp) => {
                let status = resp.status();
                let body = resp.text().await.unwrap_or_default();
                if !is_grpc_transport_unavailable(status, &body) {
                    info!(
                        "[ZitadelSetup] management API ready after {:.1}s ({} attempts, \
                         probe status {status})",
                        started.elapsed().as_secs_f64(),
                        attempt
                    );
                    return Ok(());
                }
                format!("HTTP {status} (gRPC backend not yet bound)")
            }
            Err(e) => e.to_string(),
        };

        if attempt == 1 || attempt.is_multiple_of(15) {
            debug!(
                "[ZitadelSetup] management API still warming up \
                 (attempt {attempt}, {:.0}s elapsed): {last_err}",
                started.elapsed().as_secs_f64()
            );
        }

        // Stop once the deadline has passed; otherwise wait, but never
        // longer than the time that is left.
        let now = Instant::now();
        if now >= deadline {
            break last_err;
        }
        tokio::time::sleep(interval.min(deadline.saturating_duration_since(now))).await;
    };

    Err(InterpretError::new(format!(
        "Zitadel management API did not become ready within the readiness \
         deadline ({attempt} probe attempts). Last response: {last_err}. The HTTP \
         listener was up but the gRPC backend on [::1]:8080 never finished binding."
    )))
}
async fn read_admin_pat(&self, k8s: &harmony_k8s::K8sClient) -> Result<String, InterpretError> { async fn read_admin_pat(&self, k8s: &harmony_k8s::K8sClient) -> Result<String, InterpretError> {
use k8s_openapi::api::core::v1::Secret; use k8s_openapi::api::core::v1::Secret;
@@ -712,7 +846,7 @@ impl ZitadelSetupInterpret {
pat: &str, pat: &str,
project_id: &str, project_id: &str,
app_name: &str, app_name: &str,
) -> Result<Option<String>, String> { ) -> Result<Option<FoundApp>, String> {
let resp = self let resp = self
.post( .post(
client, client,
@@ -734,7 +868,44 @@ impl ZitadelSetupInterpret {
.unwrap_or_default() .unwrap_or_default()
.into_iter() .into_iter()
.find(|a| a.name == app_name) .find(|a| a.name == app_name)
.and_then(|a| a.oidc_config.and_then(|c| c.client_id))) .and_then(|a| {
let client_id = a.oidc_config.and_then(|c| c.client_id)?;
Some(FoundApp {
id: a.id,
client_id,
})
}))
}
/// OIDC config body shared between create and update. The Zitadel
/// `UpdateOIDCAppConfig` endpoint accepts all the same fields as
/// create except `name`, which is managed via a separate endpoint
/// (we keep the app name stable so we don't need it).
///
/// The returned JSON object carries no `name` key; a caller that needs
/// one must add it before sending.
fn device_code_oidc_config_body(&self) -> serde_json::Value {
serde_json::json!({
"redirectUris": [],
"responseTypes": ["OIDC_RESPONSE_TYPE_CODE"],
// DEVICE_CODE drives the initial browser login;
// REFRESH_TOKEN must also be listed or Zitadel won't
// honour the `grant_type=refresh_token` exchange that
// ZitadelOidcAuth::silent_refresh performs — without it,
// the cached session can't be silently refreshed and
// every expired-token run falls back to a fresh browser
// device flow. The device-authorization request already
// asks for `offline_access`; this is the app-side half
// of issuing a usable refresh token.
"grantTypes": ["OIDC_GRANT_TYPE_DEVICE_CODE", "OIDC_GRANT_TYPE_REFRESH_TOKEN"],
"appType": "OIDC_APP_TYPE_NATIVE",
"authMethodType": "OIDC_AUTH_METHOD_TYPE_NONE",
// Embed userinfo claims (email, preferred_username, etc.)
// into the id_token so downstream JWT verifiers can bind
// their `user_claim` to a meaningful value without making
// a separate userinfo call. Default Zitadel behaviour is
// to put these only in the userinfo response, which means
// a verifier configured with `user_claim=email` rejects
// the id_token with `claim "email" not found in token`.
"idTokenUserinfoAssertion": true
})
}
async fn create_device_code_app( async fn create_device_code_app(
@@ -744,20 +915,16 @@ impl ZitadelSetupInterpret {
project_id: &str, project_id: &str,
app_name: &str, app_name: &str,
) -> Result<String, String> { ) -> Result<String, String> {
let mut body = self.device_code_oidc_config_body();
body["name"] = serde_json::json!(app_name);
let resp = self let resp = self
.post( .post(
client, client,
&format!("/management/v1/projects/{project_id}/apps/oidc"), &format!("/management/v1/projects/{project_id}/apps/oidc"),
) )
.bearer_auth(pat) .bearer_auth(pat)
.json(&serde_json::json!({ .json(&body)
"name": app_name,
"redirectUris": [],
"responseTypes": ["OIDC_RESPONSE_TYPE_CODE"],
"grantTypes": ["OIDC_GRANT_TYPE_DEVICE_CODE"],
"appType": "OIDC_APP_TYPE_NATIVE",
"authMethodType": "OIDC_AUTH_METHOD_TYPE_NONE"
}))
.send() .send()
.await .await
.map_err(|e| format!("Failed to create app: {e}"))?; .map_err(|e| format!("Failed to create app: {e}"))?;
@@ -776,6 +943,48 @@ impl ZitadelSetupInterpret {
.ok_or_else(|| "No clientId in app response".to_string()) .ok_or_else(|| "No clientId in app response".to_string())
} }
/// Pushes the desired OIDC config onto an app that already exists.
///
/// `ZitadelSetupScore` owns the app definition declaratively, so any
/// drift (manual UI edits, schema changes between harmony versions) is
/// reset on each run. Users therefore do not have to `--cleanup` the
/// cluster after a Score update touches the OIDC config (e.g. when
/// `idTokenUserinfoAssertion` was added).
///
/// When the PUT body matches the stored config byte-for-byte, Zitadel
/// answers with gRPC code 9 (FAILED_PRECONDITION) and the message
/// `No changes (COMMAND-1m88i)`. That answer counts as success, because
/// the declared state already holds; otherwise reconciliation would
/// fail on every re-run after the first.
///
/// # Errors
///
/// Returns the send error, or the response body of any non-success
/// answer that is not the "no changes" signal.
async fn update_device_code_oidc_config(
    &self,
    client: &reqwest::Client,
    pat: &str,
    project_id: &str,
    app_id: &str,
) -> Result<(), String> {
    let endpoint = format!("/management/v1/projects/{project_id}/apps/{app_id}/oidc_config");
    let desired = self.device_code_oidc_config_body();

    let resp = self
        .put(client, &endpoint)
        .bearer_auth(pat)
        .json(&desired)
        .send()
        .await
        .map_err(|e| format!("Failed to update OIDC config: {e}"))?;

    if !resp.status().is_success() {
        let body = resp.text().await.unwrap_or_default();
        if !is_zitadel_no_changes(&body) {
            return Err(format!("Update OIDC config failed: {body}"));
        }
        debug!("[ZitadelSetup] OIDC config already at desired state (no changes)");
    }

    Ok(())
}
async fn ensure_app( async fn ensure_app(
&self, &self,
client: &reqwest::Client, client: &reqwest::Client,
@@ -791,17 +1000,25 @@ impl ZitadelSetupInterpret {
.ensure_project(client, pat, &app.project_name, config) .ensure_project(client, pat, &app.project_name, config)
.await?; .await?;
if let Some(client_id) = self if let Some(found) = self
.find_app(client, pat, &project_id, &app.app_name) .find_app(client, pat, &project_id, &app.app_name)
.await .await
.map_err(InterpretError::new)? .map_err(InterpretError::new)?
{ {
info!( info!(
"[ZitadelSetup] App '{}' already exists: {}", "[ZitadelSetup] App '{}' already exists: {} — reconciling OIDC config",
app.app_name, client_id app.app_name, found.client_id
); );
config.apps.insert(app.app_name.clone(), client_id.clone()); match &app.app_type {
return Ok(client_id); ZitadelAppType::DeviceCode => self
.update_device_code_oidc_config(client, pat, &project_id, &found.id)
.await
.map_err(InterpretError::new)?,
}
config
.apps
.insert(app.app_name.clone(), found.client_id.clone());
return Ok(found.client_id);
} }
let client_id = match &app.app_type { let client_id = match &app.app_type {
@@ -1406,13 +1623,16 @@ impl<T: Topology + K8sclient> Interpret<T> for ZitadelSetupInterpret {
let client = self.http_client().map_err(InterpretError::new)?; let client = self.http_client().map_err(InterpretError::new)?;
// Block on /debug/ready before issuing any management call // Block on /debug/ready AND on the management API actually
// helm reports success when pods are Ready, but on OKD the // responding before issuing real work. Two distinct races to
// Route + cert-manager Certificate reconcile asynchronously // close: (a) on OKD the cert-manager Certificate reconciles
// and the first ~30-90s typically fails with TLS / connect // asynchronously and the first ~30-90s of TLS handshakes fail
// errors. Without this gate the very first POST to // with `CaUsedAsEndEntity`; (b) on any cluster, Zitadel's gRPC
// /management/v1/projects/_search dies with `CaUsedAsEndEntity`. // backend on [::1]:8080 binds a few seconds AFTER /debug/ready
self.wait_until_ready(&client).await?; // flips to 200, so the first POST to /management/... can race
// and get a 503 with `code:14` + `transport: Error while
// dialing`. Both are now handled in `wait_until_ready`.
self.wait_until_ready(&client, &pat).await?;
let mut config = ZitadelClientConfig::load().unwrap_or_default(); let mut config = ZitadelClientConfig::load().unwrap_or_default();
@@ -1652,6 +1872,69 @@ mod tests {
); );
} }
#[test]
fn zitadel_no_changes_matches_real_failure_body() {
    // Response body Zitadel sends, verbatim, when a PUT to an OIDC
    // config is byte-for-byte identical to what is already stored.
    let verbatim = r#"{"code":9, "message":"No changes (COMMAND-1m88i)", "details":[{"@type":"type.googleapis.com/zitadel.v1.ErrorDetail", "id":"COMMAND-1m88i", "message":"No changes"}]}"#;
    let recognised = is_zitadel_no_changes(verbatim);
    assert!(recognised);
}
#[test]
fn zitadel_no_changes_rejects_unrelated_code_9() {
    // Code 9 (FAILED_PRECONDITION) is used for many conditions. Only
    // the COMMAND-1m88i / "No changes" signature may count as
    // idempotent success.
    let unrelated = r#"{"code":9,"message":"resource exhausted","details":[{"id":"OTHER-xyz"}]}"#;
    let recognised = is_zitadel_no_changes(unrelated);
    assert!(!recognised);
}
#[test]
fn zitadel_no_changes_rejects_other_codes() {
    // The "No changes" text alone is not enough without code 9.
    let wrong_code = r#"{"code":5,"message":"No changes"}"#;
    let recognised = is_zitadel_no_changes(wrong_code);
    assert!(!recognised);
}
#[test]
fn grpc_transport_unavailable_matches_real_failure_body() {
    // Verbatim gateway response seen during fresh k3d bring-ups, while
    // the in-process gRPC backend on [::1]:8080 has not bound yet.
    let verbatim = r#"{"code":14, "message":"connection error: desc = \"transport: Error while dialing: dial tcp [::1]:8080: connect: connection refused\""}"#;
    let transient =
        is_grpc_transport_unavailable(reqwest::StatusCode::SERVICE_UNAVAILABLE, verbatim);
    assert!(transient);
}
#[test]
fn grpc_transport_unavailable_ignores_2xx_responses() {
    // The body carries every marker, but a 200 status must win.
    let all_markers = r#"{"code":14,"transport: Error while dialing","dial tcp"}"#;
    let transient = is_grpc_transport_unavailable(reqwest::StatusCode::OK, all_markers);
    assert!(!transient);
}
#[test]
fn grpc_transport_unavailable_ignores_unrelated_5xx() {
    // A plain 500 without the gRPC transport signature is a real
    // backend bug. Classifying it as transient would make us retry
    // forever.
    let generic = r#"{"code":2,"message":"unexpected internal error"}"#;
    let transient =
        is_grpc_transport_unavailable(reqwest::StatusCode::INTERNAL_SERVER_ERROR, generic);
    assert!(!transient);
}
#[test]
fn grpc_transport_unavailable_requires_all_signature_parts() {
    // A 503 carrying just one of the three markers must not match.
    let partial = r#"{"code":14,"message":"backend dropped"}"#;
    let transient =
        is_grpc_transport_unavailable(reqwest::StatusCode::SERVICE_UNAVAILABLE, partial);
    assert!(!transient);
}
#[test] #[test]
fn score_deserialises_with_only_required_fields() { fn score_deserialises_with_only_required_fields() {
// Old cache / new caller minimum: just `host` + `namespace`. // Old cache / new caller minimum: just `host` + `namespace`.