diff --git a/Cargo.lock b/Cargo.lock
index 45330c58..0c7bbac5 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3498,6 +3498,7 @@ name = "example_fleet_staging_install"
version = "0.1.0"
dependencies = [
"anyhow",
+ "base64 0.22.1",
"clap",
"harmony",
"harmony-fleet-deploy",
@@ -4104,6 +4105,7 @@ dependencies = [
"harmony_config",
"harmony_macros",
"harmony_types",
+ "harmony_zitadel_auth",
"k8s-openapi",
"kube",
"log",
@@ -4164,6 +4166,7 @@ dependencies = [
"harmony",
"harmony-fleet-auth",
"harmony-reconciler-contracts",
+ "harmony_config",
"harmony_zitadel_auth",
"k8s-openapi",
"kube",
@@ -4533,10 +4536,12 @@ dependencies = [
"axum-extra",
"base64 0.22.1",
"chrono",
+ "harmony_config",
"jsonwebtoken",
"openidconnect",
"rand 0.9.4",
"reqwest 0.12.28",
+ "schemars 0.8.22",
"serde",
"serde_json",
"sha2 0.10.9",
diff --git a/docs/SUMMARY.md b/docs/SUMMARY.md
index 10ae47c4..14798669 100644
--- a/docs/SUMMARY.md
+++ b/docs/SUMMARY.md
@@ -24,6 +24,7 @@
- [Writing a Topology](./guides/writing-a-topology.md)
- [Adding Capabilities](./guides/adding-capabilities.md)
- [Web Authentication and CSRF Security](./guides/web-auth-security.md)
+- [Operator Dashboard SSO (Zitadel) — setup](./guides/operator-dashboard-sso.md)
## Configuration
diff --git a/docs/guides/operator-dashboard-sso.md b/docs/guides/operator-dashboard-sso.md
new file mode 100644
index 00000000..c22eafb2
--- /dev/null
+++ b/docs/guides/operator-dashboard-sso.md
@@ -0,0 +1,55 @@
+# Operator Dashboard SSO (Zitadel) — setup
+
+Browser SSO for the fleet operator web UI (OIDC Authorization Code + PKCE,
+public client). Distinct from the agent/callout machine auth
+([fleet-zitadel-faq](./fleet-zitadel-faq.md)); the security rationale is in
+[web-auth-security](./web-auth-security.md). Code: `harmony_zitadel_auth/`.
+
+## Quickstart (staging)
+
+1. **Zitadel app** — create a **Web** application, auth method **PKCE** (no client
+ secret), redirect URI `https://fleet-stg./auth/callback`, post-logout URI
+ `https://fleet-stg./`. Copy its **Client ID**.
+2. **Seed config** in OpenBao (namespace `fleet-staging`) — the deploy derives every
+ host from `base_domain`, so you set only:
+ - `FleetDeployConfig.operator_oidc_client_id` = the Client ID
+ - `FleetDeployConfig.operator_trusted_audiences` = `[""]`
+ - `FleetDeploySecrets.operator_cookie_key_b64` = `openssl rand -base64 64`
+3. **Deploy**: `./fleet/scripts/dev-deploy-operator.sh`
+4. Open `https://fleet-stg./` → Zitadel login → back to the dashboard.
+
+`fleet_staging_install` already generates the cookie key, so a fresh install needs
+only the Client ID + audiences.
+
+## Local dev (`serve-web`)
+
+`fleet/harmony-fleet-operator/dev.sh` sets the same config as two
+`HARMONY_CONFIG_` env JSON blobs (ConfigClient's env source). Point
+`base_url` at `http://localhost:18080`, register that callback in the app, and turn
+on the app's **Development Mode** (Zitadel rejects non-HTTPS redirects otherwise).
+
+## When login fails — check these first
+
+- **`iss` mismatch** — `zitadel_base` must equal the token issuer byte-for-byte, no
+ trailing slash.
+- **`aud` mismatch** — `trusted_audiences` must contain the token's `aud`; Zitadel
+ puts the app's Client ID there by default.
+- **Client secret** — the app must be PKCE-only; the code never sends a secret.
+- **Redirect URI** — must be exactly `{base_url}/auth/callback`.
+- **Cookie key** — `cookie_key_b64` must decode to ≥64 bytes, else the dashboard
+ refuses to start (`cookie_key_b64 must decode to at least 64 bytes`; reconcile
+ keeps running).
+
+## Config reference
+
+The operator reads `ZitadelAuthConfig` + `OperatorCookieKey` via ConfigClient. The
+deploy derives `zitadel_base` / `base_url` / `logout_redirect_uri` from `base_domain`
+(`https://sso-stg.`, `https://fleet-stg.`, `…/`) and fixes
+`scope = openid profile email`; you supply `client_id`, `trusted_audiences`,
+`cookie_key_b64`. All endpoints derive from `zitadel_base`:
+`/.well-known/openid-configuration`, `/oauth/v2/authorize`, `/oauth/v2/token`,
+`/oidc/v1/end_session`.
+
+> The dashboard only checks that the user authenticated — no role gate yet
+> ([web-auth-security](./web-auth-security.md) §3,
+> [ROADMAP/09](../../ROADMAP/09-sso-config-hardening.md)).
diff --git a/docs/guides/web-auth-security.md b/docs/guides/web-auth-security.md
index 8c52a52e..139189af 100644
--- a/docs/guides/web-auth-security.md
+++ b/docs/guides/web-auth-security.md
@@ -2,6 +2,8 @@
These guidelines define the baseline for Harmony web frontends and future operator dashboards that use browser-based authentication, cookie sessions, Axum, HTMX, or OIDC providers such as Zitadel.
+> Setting up the fleet operator dashboard's SSO concretely (Zitadel app + the exact config values)? See [Operator Dashboard SSO (Zitadel) — setup](./operator-dashboard-sso.md).
+
## Goals
- Prevent unauthenticated access.
diff --git a/examples/fleet_e2e_demo/src/lib.rs b/examples/fleet_e2e_demo/src/lib.rs
index 2f0e78b4..d615ee1b 100644
--- a/examples/fleet_e2e_demo/src/lib.rs
+++ b/examples/fleet_e2e_demo/src/lib.rs
@@ -680,6 +680,8 @@ key_json = """
log_level: "info,kube_runtime=warn".to_string(),
credentials: Some(OperatorCredentials { credentials_toml }),
chart_version: None,
+ web_auth_config_json: None,
+ web_cookie_key_json: None,
};
// CRDs first — the operator watches them on startup.
diff --git a/examples/fleet_staging_install/Cargo.toml b/examples/fleet_staging_install/Cargo.toml
index 24bdaa4b..f438eafb 100644
--- a/examples/fleet_staging_install/Cargo.toml
+++ b/examples/fleet_staging_install/Cargo.toml
@@ -20,6 +20,7 @@ harmony-nats-callout = { path = "../../nats/callout" }
harmony-fleet-deploy = { path = "../../fleet/harmony-fleet-deploy" }
nkeys = "0.4"
rand = "0.9"
+base64.workspace = true
anyhow.workspace = true
clap = { version = "4", features = ["derive", "env"] }
tokio.workspace = true
diff --git a/examples/fleet_staging_install/src/main.rs b/examples/fleet_staging_install/src/main.rs
index a3dfa9d8..a2ffd675 100644
--- a/examples/fleet_staging_install/src/main.rs
+++ b/examples/fleet_staging_install/src/main.rs
@@ -347,6 +347,7 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"#
.set(&FleetDeploySecrets {
operator_credentials_toml: credentials.credentials_toml.clone(),
kubeconfig: None,
+ operator_cookie_key_b64: generate_cookie_key_b64(),
})
.await
.context("seed FleetDeploySecrets")?;
@@ -378,3 +379,13 @@ fn generate_alphanum(len: usize) -> String {
.map(|_| CHARSET[rng.random_range(0..CHARSET.len())] as char)
.collect()
}
+
+/// Standard-base64 of 64 random bytes — the operator's session-cookie
+/// signing key. Generated per install and seeded as a secret.
+fn generate_cookie_key_b64() -> String {
+ use base64::Engine;
+ use rand::Rng;
+ let mut bytes = [0u8; 64];
+ rand::rng().fill(&mut bytes);
+ base64::engine::general_purpose::STANDARD.encode(bytes)
+}
diff --git a/fleet/harmony-fleet-deploy/Cargo.toml b/fleet/harmony-fleet-deploy/Cargo.toml
index 5b760da4..130d4bd0 100644
--- a/fleet/harmony-fleet-deploy/Cargo.toml
+++ b/fleet/harmony-fleet-deploy/Cargo.toml
@@ -29,6 +29,7 @@ harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
+harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" }
anyhow = { workspace = true }
async-trait = { workspace = true }
diff --git a/fleet/harmony-fleet-deploy/src/bin/harmony-fleet-publish.rs b/fleet/harmony-fleet-deploy/src/bin/harmony-fleet-publish.rs
index 00298cec..8d4adefb 100644
--- a/fleet/harmony-fleet-deploy/src/bin/harmony-fleet-publish.rs
+++ b/fleet/harmony-fleet-deploy/src/bin/harmony-fleet-publish.rs
@@ -1,21 +1,29 @@
//! `harmony-fleet-publish` — build + publish the operator image + chart
-//! for a tagged release. `docker` / `helm` must be on PATH and logged in
-//! to the registry (CI's login actions; dev's manual login).
+//! at a version. `docker` / `helm` must be on PATH and logged in to the
+//! registry (CI's login actions; dev's manual login).
-use anyhow::Result;
+use anyhow::{Result, bail};
use clap::Parser;
use harmony_fleet_deploy::release::{release_operator, version_from_tag};
#[derive(Parser, Debug)]
#[command(
name = "harmony-fleet-publish",
- about = "Build + publish the operator image + chart for a tagged release"
+ about = "Build + publish the operator image + chart at a version"
)]
struct Cli {
- /// Git tag, e.g. `harmony-fleet-operator-v0.1.0`. Defaults to
- /// `$GITHUB_REF_NAME` so CI passes nothing.
+ /// The image + chart version, e.g. `0.1.0` or `0.0.0-dev.1730000000`
+ /// (must be valid semver — helm rejects anything else). Wins over
+ /// `--from-tag`; the laptop/dev path.
+ #[arg(long)]
+ version: Option,
+
+ /// Parse the version out of a release tag like
+ /// `harmony-fleet-operator-v0.1.0`. Defaults to `$GITHUB_REF_NAME`
+ /// so the release workflow passes nothing. Used when `--version`
+ /// is unset.
#[arg(long, env = "GITHUB_REF_NAME")]
- from_tag: String,
+ from_tag: Option,
#[arg(long, default_value = "hub.nationtech.io")]
registry: String,
@@ -28,9 +36,19 @@ struct Cli {
no_push: bool,
}
+impl Cli {
+ fn version(&self) -> Result {
+ match (&self.version, &self.from_tag) {
+ (Some(v), _) => Ok(v.clone()),
+ (None, Some(tag)) => version_from_tag(tag),
+ (None, None) => bail!("set --version (e.g. 0.1.0) or --from-tag"),
+ }
+ }
+}
+
fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let cli = Cli::parse();
- let version = version_from_tag(&cli.from_tag)?;
+ let version = cli.version()?;
release_operator(&version, &cli.registry, &cli.project, !cli.no_push)
}
diff --git a/fleet/harmony-fleet-deploy/src/main.rs b/fleet/harmony-fleet-deploy/src/main.rs
index 97dfa828..0eb9ff95 100644
--- a/fleet/harmony-fleet-deploy/src/main.rs
+++ b/fleet/harmony-fleet-deploy/src/main.rs
@@ -18,6 +18,7 @@ use harmony_config::ConfigClient;
use harmony_fleet_deploy::{
FleetDeployConfig, FleetDeploySecrets, FleetOperatorScore, version_from_tag,
};
+use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
use tracing::info;
#[derive(Parser, Debug)]
@@ -114,12 +115,28 @@ async fn main() -> Result<()> {
// Coherent with the other staging hosts (sso-stg., secrets-stg.).
let ui_host = format!("fleet-stg.{}", config.base_domain);
+ // Dashboard SSO config: hosts derive from base_domain; client_id +
+ // audiences come from config, cookie key from secrets. Baked into the
+ // operator Secret for the pod's ConfigClient to read.
+ let web_auth = ZitadelAuthConfig {
+ zitadel_base: format!("https://sso-stg.{}", config.base_domain),
+ base_url: format!("https://{ui_host}"),
+ client_id: config.operator_oidc_client_id,
+ scope: "openid profile email".to_string(),
+ trusted_audiences: config.operator_trusted_audiences,
+ logout_redirect_uri: format!("https://{ui_host}/"),
+ };
+ let cookie = OperatorCookieKey {
+ cookie_key_b64: secrets.operator_cookie_key_b64,
+ };
+
let operator = FleetOperatorScore::new()
.namespace(namespace)
.nats_url(config.nats_url)
.credentials(secrets.operator_credentials_toml)
.published_chart(registry, project, version)
- .ingress(ui_host, Some(config.cluster_issuer));
+ .ingress(ui_host, Some(config.cluster_issuer))
+ .web_auth(web_auth, cookie);
harmony_cli::run(
Inventory::autoload(),
diff --git a/fleet/harmony-fleet-deploy/src/operator/chart.rs b/fleet/harmony-fleet-deploy/src/operator/chart.rs
index f870a21b..c7fb19ef 100644
--- a/fleet/harmony-fleet-deploy/src/operator/chart.rs
+++ b/fleet/harmony-fleet-deploy/src/operator/chart.rs
@@ -70,6 +70,13 @@ pub struct ChartOptions {
/// sets this to the released tag so the OCI chart artifact lands
/// at `…/harmony-fleet-operator-chart:` matching the image tag.
pub chart_version: Option,
+ /// JSON of the dashboard's `ZitadelAuthConfig`, stored in the
+ /// operator Secret under [`ENV_WEB_AUTH_CONFIG`]. `None` leaves the
+ /// dashboard unauthenticated (dev/e2e).
+ pub web_auth_config_json: Option,
+ /// JSON of the dashboard's `OperatorCookieKey`, stored under
+ /// [`ENV_WEB_COOKIE_KEY`].
+ pub web_cookie_key_json: Option,
}
/// What the operator pod needs to authenticate to NATS via the auth
@@ -134,6 +141,8 @@ impl Default for ChartOptions {
log_level: "info,kube_runtime=warn".to_string(),
credentials: None,
chart_version: None,
+ web_auth_config_json: None,
+ web_cookie_key_json: None,
}
}
}
@@ -153,6 +162,15 @@ pub const OPERATOR_HTTP_PORT: u16 = 18080;
/// including the inline JSON keyfile under `key_json`.
pub const SECRET_KEY_CREDENTIALS_TOML: &str = "credentials.toml";
+/// Env var + operator-Secret key the dashboard's `ConfigClient` reads
+/// its `ZitadelAuthConfig` / `OperatorCookieKey` from. The value is the
+/// JSON of those structs; `ConfigClient`'s `EnvSource` keys a config
+/// type under `HARMONY_CONFIG_`. Locked to the structs' Config
+/// keys by a test below — the chart wires the env→Secret reference at
+/// publish time, the deploy fills the Secret values at deploy time.
+pub const ENV_WEB_AUTH_CONFIG: &str = "HARMONY_CONFIG_ZitadelAuthConfig";
+pub const ENV_WEB_COOKIE_KEY: &str = "HARMONY_CONFIG_OperatorCookieKey";
+
/// Build + write the chart to `opts.output_dir`. Returns the full
/// path to the generated chart directory (which is what `helm
/// install ` wants).
@@ -215,6 +233,21 @@ pub fn operator_secret(opts: &ChartOptions) -> Option {
SECRET_KEY_CREDENTIALS_TOML.to_string(),
ByteString(creds.credentials_toml.as_bytes().to_vec()),
);
+ // Dashboard auth config + cookie key (when configured) ride in the
+ // same Secret; the Deployment sources them as HARMONY_CONFIG_* env
+ // for the operator's ConfigClient.
+ if let Some(json) = &opts.web_auth_config_json {
+ data.insert(
+ ENV_WEB_AUTH_CONFIG.to_string(),
+ ByteString(json.as_bytes().to_vec()),
+ );
+ }
+ if let Some(json) = &opts.web_cookie_key_json {
+ data.insert(
+ ENV_WEB_COOKIE_KEY.to_string(),
+ ByteString(json.as_bytes().to_vec()),
+ );
+ }
// Namespace deliberately omitted — the caller passes the target
// namespace to `K8sResourceScore::single`, which injects it at
// apply time. Keeps the Secret manifest reusable across
@@ -294,7 +327,7 @@ fn cluster_role() -> ClusterRole {
// Devices: reconciler server-side-applies + deletes;
// aggregator lists + watches.
PolicyRule {
- api_groups: Some(vec![group]),
+ api_groups: Some(vec![group.clone()]),
resources: Some(vec!["devices".to_string()]),
verbs: vec![
"get", "list", "watch", "create", "update", "patch", "delete",
@@ -304,6 +337,17 @@ fn cluster_role() -> ClusterRole {
.collect(),
..Default::default()
},
+ // Device liveness: the device-status reconciler patches the
+ // status subresource — a distinct RBAC resource from `devices`.
+ PolicyRule {
+ api_groups: Some(vec![group]),
+ resources: Some(vec!["devices/status".to_string()]),
+ verbs: vec!["get", "update", "patch"]
+ .into_iter()
+ .map(String::from)
+ .collect(),
+ ..Default::default()
+ },
]),
..Default::default()
}
@@ -383,6 +427,22 @@ fn operator_deployment(opts: &ChartOptions) -> K8sDeployment {
// pod compatible with OKD's restricted-v2 SCC and the
// `harmony_fleet_auth::CredentialsSection` deserializer
// handles inline-vs-file from the same TOML shape.
+ // All three env-from-Secret with optional=true: the Pod starts
+ // before the deploy applies the Secret, and the dashboard's two
+ // HARMONY_CONFIG_* values are simply absent on deploys that don't
+ // configure auth (dev/e2e).
+ let secret_env = |name: &str| EnvVar {
+ name: name.to_string(),
+ value_from: Some(EnvVarSource {
+ secret_key_ref: Some(SecretKeySelector {
+ name: SECRET_NAME.to_string(),
+ key: name.to_string(),
+ optional: Some(true),
+ }),
+ ..Default::default()
+ }),
+ ..Default::default()
+ };
env.push(EnvVar {
name: OPERATOR_CREDENTIALS_ENV_VAR.to_string(),
value_from: Some(EnvVarSource {
@@ -395,6 +455,8 @@ fn operator_deployment(opts: &ChartOptions) -> K8sDeployment {
}),
..Default::default()
});
+ env.push(secret_env(ENV_WEB_AUTH_CONFIG));
+ env.push(secret_env(ENV_WEB_COOKIE_KEY));
// Namespace deliberately omitted — same rationale as the
// ServiceAccount: helm fills in the release namespace at install
@@ -532,7 +594,7 @@ mod tests {
}
#[test]
- fn chart_includes_credentials_secret_and_env_var() {
+ fn chart_omits_secret_but_deployment_references_it() {
let tmp = tempfile::tempdir().unwrap();
let chart_path = build_chart(&ChartOptions {
output_dir: tmp.path().to_path_buf(),
@@ -540,14 +602,58 @@ mod tests {
})
.unwrap();
- let secret_yaml =
- std::fs::read_to_string(chart_path.join("templates/secret-credentials.yaml"))
- .expect("secret-credentials.yaml must exist in chart");
- assert!(secret_yaml.contains(SECRET_NAME));
+ // The chart is hydrated (no templating), so it must NOT carry the
+ // credentials Secret — the deploy applies it out-of-band. A Secret
+ // in the chart fights Helm ownership against the deploy-applied one
+ // and re-blanks it on every upgrade.
+ assert!(
+ !chart_path
+ .join("templates/secret-credentials.yaml")
+ .exists(),
+ "hydrated chart must not contain a Secret"
+ );
+ // The Deployment still wires the env var from that out-of-band
+ // Secret via secretKeyRef (optional, so the Pod starts before it
+ // lands).
let deployment_yaml = std::fs::read_to_string(chart_path.join("templates/deployment.yaml"))
.expect("deployment.yaml must exist in chart");
assert!(deployment_yaml.contains(OPERATOR_CREDENTIALS_ENV_VAR));
assert!(deployment_yaml.contains(SECRET_NAME));
}
+
+ // The CRDs carry status subresources, which RBAC treats as distinct
+ // resources — a patch on `*/status` is forbidden without an explicit
+ // grant. Lock both so adding a status subresource can't silently 403.
+ #[test]
+ fn cluster_role_grants_status_subresources() {
+ let role = cluster_role();
+ let grants_patch = |resource: &str| {
+ role.rules.as_ref().unwrap().iter().any(|r| {
+ r.resources
+ .as_ref()
+ .is_some_and(|res| res.iter().any(|x| x == resource))
+ && r.verbs.iter().any(|v| v == "patch")
+ })
+ };
+ assert!(grants_patch("deployments/status"));
+ assert!(grants_patch("devices/status"));
+ }
+
+ // The chart bakes these env names at publish time; the operator's
+ // ConfigClient derives them from the struct names at runtime. Lock
+ // them together so a rename can't silently desync the two.
+ #[test]
+ fn web_auth_env_names_match_config_keys() {
+ use harmony_config::Config;
+ use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
+ assert_eq!(
+ ENV_WEB_AUTH_CONFIG,
+ format!("HARMONY_CONFIG_{}", ZitadelAuthConfig::KEY)
+ );
+ assert_eq!(
+ ENV_WEB_COOKIE_KEY,
+ format!("HARMONY_CONFIG_{}", OperatorCookieKey::KEY)
+ );
+ }
}
diff --git a/fleet/harmony-fleet-deploy/src/operator/score.rs b/fleet/harmony-fleet-deploy/src/operator/score.rs
index 94376016..305fb253 100644
--- a/fleet/harmony-fleet-deploy/src/operator/score.rs
+++ b/fleet/harmony-fleet-deploy/src/operator/score.rs
@@ -78,6 +78,17 @@ pub struct FleetOperatorScore {
/// cert-manager `ClusterIssuer` for the UI Ingress. `None` (or no
/// host) serves plain HTTP — the right default on issuer-less k3d.
pub cluster_issuer: Option,
+ /// Dashboard SSO config + cookie key, baked into the operator Secret
+ /// for the pod's `ConfigClient` to read. `None` leaves the dashboard
+ /// unauthenticated (dev/e2e).
+ pub web_auth: Option,
+}
+
+/// The dashboard's auth inputs the operator reads via `ConfigClient`.
+#[derive(Debug, Clone, Serialize)]
+pub struct WebAuth {
+ pub config: harmony_zitadel_auth::ZitadelAuthConfig,
+ pub cookie: harmony_zitadel_auth::OperatorCookieKey,
}
impl FleetOperatorScore {
@@ -98,6 +109,7 @@ impl FleetOperatorScore {
published_chart: None,
operator_ui_host: None,
cluster_issuer: None,
+ web_auth: None,
}
}
@@ -110,6 +122,17 @@ impl FleetOperatorScore {
self
}
+ /// Configure dashboard SSO: the `ZitadelAuthConfig` + cookie key are
+ /// baked into the operator Secret for the pod's `ConfigClient`.
+ pub fn web_auth(
+ mut self,
+ config: harmony_zitadel_auth::ZitadelAuthConfig,
+ cookie: harmony_zitadel_auth::OperatorCookieKey,
+ ) -> Self {
+ self.web_auth = Some(WebAuth { config, cookie });
+ self
+ }
+
/// Install the published OCI chart at `version` instead of rendering
/// one from local source (the CD `harmony apply` path).
pub fn published_chart(
@@ -202,9 +225,22 @@ impl Interpret for FleetOperatorInterp
// directly, not via the chart — it's environment-specific. The
// published-chart CD path runs without credentials today, so
// this is a no-op there.
+ let (web_auth_config_json, web_cookie_key_json) = match &self.score.web_auth {
+ Some(w) => (
+ Some(serde_json::to_string(&w.config).map_err(|e| {
+ InterpretError::new(format!("serialize ZitadelAuthConfig: {e}"))
+ })?),
+ Some(serde_json::to_string(&w.cookie).map_err(|e| {
+ InterpretError::new(format!("serialize OperatorCookieKey: {e}"))
+ })?),
+ ),
+ None => (None, None),
+ };
if let Some(creds) = &self.score.credentials
&& let Some(secret) = operator_secret(&ChartOptions {
credentials: Some(creds.clone()),
+ web_auth_config_json,
+ web_cookie_key_json,
..ChartOptions::default()
})
{
@@ -252,6 +288,10 @@ impl Interpret for FleetOperatorInterp
log_level: self.score.log_level.clone(),
credentials: self.score.credentials.clone(),
chart_version: None,
+ // The auth Secret is applied separately above; the
+ // rendered chart only needs the Deployment env wiring.
+ web_auth_config_json: None,
+ web_cookie_key_json: None,
})
.map_err(|e| InterpretError::new(format!("build operator chart: {e}")))?;
let chart_path_str = chart_path.to_str().ok_or_else(|| {
diff --git a/fleet/harmony-fleet-deploy/src/secrets.rs b/fleet/harmony-fleet-deploy/src/secrets.rs
index d86dad5c..74768364 100644
--- a/fleet/harmony-fleet-deploy/src/secrets.rs
+++ b/fleet/harmony-fleet-deploy/src/secrets.rs
@@ -22,6 +22,12 @@ pub struct FleetDeploySecrets {
/// `KUBECONFIG`.
#[serde(default)]
pub kubeconfig: Option,
+
+ /// Operator dashboard session-cookie signing key (standard-base64 of
+ /// ≥64 bytes). `#[serde(default)]` so older seeds without it still
+ /// load — but the dashboard won't authenticate until it's set.
+ #[serde(default)]
+ pub operator_cookie_key_b64: String,
}
/// Non-secret deploy config: k8s namespaces + chart coords. Loaded via
@@ -52,6 +58,14 @@ pub struct FleetDeployConfig {
/// cert-manager `ClusterIssuer` for the operator UI's TLS cert.
pub cluster_issuer: String,
+
+ /// Zitadel OIDC app the operator dashboard authenticates against.
+ #[serde(default)]
+ pub operator_oidc_client_id: String,
+
+ /// Token audiences the dashboard accepts (Zitadel project/app ids).
+ #[serde(default)]
+ pub operator_trusted_audiences: Vec,
}
impl Default for FleetDeployConfig {
@@ -65,6 +79,8 @@ impl Default for FleetDeployConfig {
operator_chart_project: "harmony".to_string(),
base_domain: "cb1.nationtech.io".to_string(),
cluster_issuer: "letsencrypt-prod".to_string(),
+ operator_oidc_client_id: String::new(),
+ operator_trusted_audiences: Vec::new(),
}
}
}
diff --git a/fleet/harmony-fleet-operator/Cargo.toml b/fleet/harmony-fleet-operator/Cargo.toml
index f272a067..90dc0edb 100644
--- a/fleet/harmony-fleet-operator/Cargo.toml
+++ b/fleet/harmony-fleet-operator/Cargo.toml
@@ -17,6 +17,7 @@ web-frontend = ["dep:axum", "dep:axum-extra", "dep:maud", "dep:tokio-stream", "h
harmony = { path = "../../harmony", features = ["podman"] }
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
+harmony_config = { path = "../../harmony_config" }
harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" }
toml = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
diff --git a/fleet/harmony-fleet-operator/Dockerfile b/fleet/harmony-fleet-operator/Dockerfile
index e1ba15be..59cb934b 100644
--- a/fleet/harmony-fleet-operator/Dockerfile
+++ b/fleet/harmony-fleet-operator/Dockerfile
@@ -1,3 +1,4 @@
+# syntax=docker/dockerfile:1.7
# Multi-stage container build for harmony-fleet-operator.
#
# Build context is the workspace root (the operator's Cargo.toml has
@@ -21,17 +22,44 @@
FROM docker.io/rust:1.94-slim-bookworm AS builder
# pkg-config + libssl-dev cover transitive native-tls/openssl-sys deps
-# that surface during link; ca-certificates lets cargo fetch over TLS.
+# that surface during link; ca-certificates lets cargo fetch over TLS;
+# curl downloads the Tailwind CLI below.
RUN apt-get update && apt-get install -y --no-install-recommends \
pkg-config \
ca-certificates \
libssl-dev \
+ curl \
&& rm -rf /var/lib/apt/lists/*
+# Tailwind v4 standalone CLI: build.rs shells out to it to generate the
+# embedded CSS bundle. Pinned to match the dev host; override with
+# --build-arg TAILWIND_VERSION=... TAILWIND_REQUIRED makes build.rs hard
+# fail (not silently ship empty CSS) if the CLI is missing — see build.rs.
+ARG TAILWIND_VERSION=v4.3.0
+RUN curl -fsSL -o /usr/local/bin/tailwindcss \
+ "https://github.com/tailwindlabs/tailwindcss/releases/download/${TAILWIND_VERSION}/tailwindcss-linux-x64" \
+ && chmod +x /usr/local/bin/tailwindcss \
+ && tailwindcss --help >/dev/null
+ENV TAILWIND_REQUIRED=1
+
WORKDIR /app
COPY . .
-RUN cargo build --release --locked -p harmony-fleet-operator
+# `web-frontend` bundles the dashboard the operator serves in-process
+# alongside the reconcile loop.
+#
+# BuildKit cache mounts persist the cargo registry + `target/` across
+# builds, so an iterate-build-deploy loop recompiles only changed crates
+# (seconds) instead of the whole workspace (minutes). The mounts are
+# build-time only — the resulting image is identical, and a cold CI
+# runner just rebuilds from scratch. The binary is copied out of the
+# cache-mounted target within the same RUN, since cache mounts aren't
+# part of the produced layer.
+RUN --mount=type=cache,target=/usr/local/cargo/registry \
+ --mount=type=cache,target=/usr/local/cargo/git \
+ --mount=type=cache,target=/app/target \
+ cargo build --release --locked -p harmony-fleet-operator --features web-frontend \
+ && cp /app/target/release/harmony-fleet-operator /harmony-fleet-operator
FROM docker.io/library/debian:bookworm-slim
@@ -41,7 +69,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
-COPY --from=builder /app/target/release/harmony-fleet-operator /usr/local/bin/harmony-fleet-operator
+COPY --from=builder /harmony-fleet-operator /usr/local/bin/harmony-fleet-operator
# Non-root runtime. Pairs with the Pod's `securityContext.
# runAsNonRoot: true` in the helm chart — k8s admission rejects pods
diff --git a/fleet/harmony-fleet-operator/build.rs b/fleet/harmony-fleet-operator/build.rs
index c004905b..bcac15bd 100644
--- a/fleet/harmony-fleet-operator/build.rs
+++ b/fleet/harmony-fleet-operator/build.rs
@@ -33,34 +33,38 @@ fn main() {
println!("cargo:rerun-if-changed=style/input.css");
println!("cargo:rerun-if-changed=src/frontend");
println!("cargo:rerun-if-changed=src/service");
+ // Toggling this re-runs the script, so the container build (which sets
+ // it) regenerates rather than reusing a cache-mounted empty bundle.
+ println!("cargo:rerun-if-env-changed=TAILWIND_REQUIRED");
- let status = Command::new("tailwindcss")
+ // Container/production builds set TAILWIND_REQUIRED: a missing or
+ // failing CLI is a hard error there (never ship empty CSS). Dev builds
+ // leave it unset and fall back to empty, serving CSS via
+ // `serve-web --css-from ` against a `tailwindcss --watch` sidecar.
+ let required = std::env::var_os("TAILWIND_REQUIRED").is_some();
+ let fall_back = |reason: String| {
+ assert!(
+ !required,
+ "{reason}\nTAILWIND_REQUIRED is set: refusing to ship a frontend with empty CSS. \
+ Install the v4 standalone CLI: https://github.com/tailwindlabs/tailwindcss/releases"
+ );
+ println!(
+ "cargo:warning={reason}; embedded CSS will be empty \
+ (use `serve-web --css-from ` in dev)."
+ );
+ std::fs::write(&output, b"").unwrap();
+ };
+
+ match Command::new("tailwindcss")
.arg("--input")
.arg(&input)
.arg("--output")
.arg(&output)
.arg("--minify")
- .status();
-
- match status {
+ .status()
+ {
Ok(s) if s.success() => {}
- Ok(s) => {
- println!(
- "cargo:warning=tailwindcss exited with status {s}; embedded CSS will be empty. \
- Install the v4 standalone CLI \
- (https://github.com/tailwindlabs/tailwindcss/releases) for production builds, \
- or use `serve-web --css-from ` against a `tailwindcss --watch` sidecar in dev."
- );
- std::fs::write(&output, b"").unwrap();
- }
- Err(e) => {
- println!(
- "cargo:warning=tailwindcss not invocable ({e}); embedded CSS will be empty. \
- Install the v4 standalone CLI \
- (https://github.com/tailwindlabs/tailwindcss/releases) for production builds, \
- or use `serve-web --css-from ` against a `tailwindcss --watch` sidecar in dev."
- );
- std::fs::write(&output, b"").unwrap();
- }
+ Ok(s) => fall_back(format!("tailwindcss exited with status {s}")),
+ Err(e) => fall_back(format!("tailwindcss not invocable ({e})")),
}
}
diff --git a/fleet/harmony-fleet-operator/dev.sh b/fleet/harmony-fleet-operator/dev.sh
index 812f8272..75173f04 100644
--- a/fleet/harmony-fleet-operator/dev.sh
+++ b/fleet/harmony-fleet-operator/dev.sh
@@ -1,13 +1,11 @@
#!/bin/bash
-export BASE_URL=http://localhost:18080
-export FLEET_AUTH_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io
-export FLEET_AUTH_CLIENT_ID=372626218874372917
-export FLEET_AUTH_SCOPE="openid profile email"
-export FLEET_AUTH_LOGOUT_REDIRECT_URI="http://localhost:18080/"
-export FLEET_OPERATOR_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg==
-export FLEET_AUTH_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125
-export BASE_URL=http://localhost:18080
+# The operator reads its auth config via ConfigClient — one typed value
+# each (EnvSource keys a Config struct under HARMONY_CONFIG_),
+# not a fistful of FLEET_AUTH_* vars. base_url is localhost for this local
+# serve-web; staging derives it from base_domain in the deploy.
+export HARMONY_CONFIG_ZitadelAuthConfig='{"zitadel_base":"https://sso-stg.cb1.nationtech.io","base_url":"http://localhost:18080","client_id":"372626218874372917","scope":"openid profile email","trusted_audiences":["371639797493596981","371683318111994677","372626218874372917","371639797157987125"],"logout_redirect_uri":"http://localhost:18080/"}'
+export HARMONY_CONFIG_OperatorCookieKey='{"cookie_key_b64":"6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg=="}'
export RUST_LOG=debug
cargo watch -- cargo run -p harmony-fleet-operator --features web-frontend -- serve-web \
diff --git a/fleet/harmony-fleet-operator/src/device_status.rs b/fleet/harmony-fleet-operator/src/device_status.rs
new file mode 100644
index 00000000..052c9e0b
--- /dev/null
+++ b/fleet/harmony-fleet-operator/src/device_status.rs
@@ -0,0 +1,173 @@
+//! Device liveness reconciler: NATS `device-heartbeat` KV → `Device.status`.
+//!
+//! Agents ping `heartbeat.` every ~30 s. This reconciler
+//! keeps the latest ping per device in memory (from a KV watch) and,
+//! on a tick, reflects freshness onto `Device.status` —
+//! `lastHeartbeat` + a coarse [`Reachability`]. That makes liveness
+//! visible to `kubectl get devices` and lets the dashboard read it
+//! from the CR instead of re-deriving staleness from NATS.
+//!
+//! Mirrors the aggregator's "watch fills a cache, a tick patches the
+//! CR" shape. The staleness threshold lives here only — it's a device
+//! concept the operator owns, not something every reader re-decides.
+
+use std::collections::HashMap;
+use std::time::Duration;
+
+use anyhow::Result;
+use async_nats::jetstream::kv::{Operation, Store};
+use chrono::{DateTime, Utc};
+use futures_util::StreamExt;
+use harmony_reconciler_contracts::{BUCKET_DEVICE_HEARTBEAT, HeartbeatPayload};
+use kube::api::{Api, Patch, PatchParams};
+use kube::{Client, ResourceExt};
+use serde_json::json;
+use tokio::sync::Mutex;
+
+use harmony::modules::fleet::operator::{Device, Reachability};
+
+/// A device with no heartbeat within this window is `Stale`. Agents
+/// ping every 30 s, so this tolerates ~2 missed pings.
+const STALE_AFTER: Duration = Duration::from_secs(90);
+/// How often to re-evaluate freshness and patch changed devices.
+const TICK: Duration = Duration::from_secs(30);
+
+pub async fn run(client: Client, js: async_nats::jetstream::Context) -> Result<()> {
+ let bucket = js
+ .create_key_value(async_nats::jetstream::kv::Config {
+ bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
+ ..Default::default()
+ })
+ .await?;
+
+ let heartbeats: Mutex>> = Mutex::new(HashMap::new());
+ let devices: Api = Api::all(client);
+
+ tokio::try_join!(
+ watch_heartbeats(&bucket, &heartbeats),
+ patch_loop(&devices, &heartbeats),
+ )?;
+ Ok(())
+}
+
+async fn watch_heartbeats(
+ bucket: &Store,
+ heartbeats: &Mutex>>,
+) -> Result<()> {
+ let mut watch = bucket.watch_with_history(">").await?;
+ tracing::info!("device-status: watching device-heartbeat KV");
+ while let Some(entry_res) = watch.next().await {
+ let entry = match entry_res {
+ Ok(e) => e,
+ Err(e) => {
+ tracing::warn!(error = %e, "device-status: watch delivery error");
+ continue;
+ }
+ };
+ match entry.operation {
+ Operation::Put => {
+ if let Ok(hb) = serde_json::from_slice::(&entry.value) {
+ heartbeats
+ .lock()
+ .await
+ .insert(hb.device_id.to_string(), hb.at);
+ }
+ }
+ Operation::Delete | Operation::Purge => {
+ if let Some(id) = entry.key.strip_prefix("heartbeat.") {
+ heartbeats.lock().await.remove(id);
+ }
+ }
+ }
+ }
+ Ok(())
+}
+
+async fn patch_loop(
+ devices: &Api,
+ heartbeats: &Mutex>>,
+) -> Result<()> {
+ // Last status written per device, to skip no-op patches.
+ let mut applied: HashMap)> = HashMap::new();
+ let mut ticker = tokio::time::interval(TICK);
+ loop {
+ ticker.tick().await;
+ let snapshot: Vec<(String, DateTime)> = heartbeats
+ .lock()
+ .await
+ .iter()
+ .map(|(k, v)| (k.clone(), *v))
+ .collect();
+ let now = Utc::now();
+ for (id, at) in snapshot {
+ let reachability = reachability(at, now);
+ if applied.get(&id) == Some(&(reachability, at)) {
+ continue;
+ }
+ if patch_status(devices, &id, reachability, at).await {
+ applied.insert(id, (reachability, at));
+ }
+ }
+ }
+}
+
+fn reachability(last_heartbeat: DateTime, now: DateTime) -> Reachability {
+ if (now - last_heartbeat)
+ .to_std()
+ .is_ok_and(|d| d <= STALE_AFTER)
+ {
+ Reachability::Reachable
+ } else {
+ Reachability::Stale
+ }
+}
+
+/// Returns whether the patch succeeded (so we only cache applied state
+/// on success and retry next tick otherwise).
+async fn patch_status(
+ devices: &Api,
+ id: &str,
+ reachability: Reachability,
+ last_heartbeat: DateTime,
+) -> bool {
+ let status = json!({
+ "status": {
+ "lastHeartbeat": last_heartbeat.to_rfc3339(),
+ "reachability": reachability,
+ }
+ });
+ match devices
+ .patch_status(id, &PatchParams::default(), &Patch::Merge(&status))
+ .await
+ {
+ Ok(d) => {
+ tracing::debug!(device = %d.name_any(), ?reachability, "device-status: patched");
+ true
+ }
+ // A heartbeat can outrace the Device CR's creation by the
+ // device-reconciler; skip this tick and retry on the next.
+ Err(kube::Error::Api(ae)) if ae.code == 404 => false,
+ Err(e) => {
+ tracing::warn!(%id, error = %e, "device-status: patch failed");
+ false
+ }
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+
+ #[test]
+ fn reachable_within_window_stale_after() {
+ let now = Utc::now();
+ assert_eq!(
+ reachability(now - chrono::Duration::seconds(10), now),
+ Reachability::Reachable
+ );
+ assert_eq!(
+ reachability(now - chrono::Duration::seconds(120), now),
+ Reachability::Stale
+ );
+ }
+}
diff --git a/fleet/harmony-fleet-operator/src/lib.rs b/fleet/harmony-fleet-operator/src/lib.rs
index 8c293a4e..da2f81d6 100644
--- a/fleet/harmony-fleet-operator/src/lib.rs
+++ b/fleet/harmony-fleet-operator/src/lib.rs
@@ -14,4 +14,5 @@
pub mod commands;
pub mod device_reconciler;
+pub mod device_status;
pub mod fleet_aggregator;
diff --git a/fleet/harmony-fleet-operator/src/main.rs b/fleet/harmony-fleet-operator/src/main.rs
index 781fbc2d..e6ff215d 100644
--- a/fleet/harmony-fleet-operator/src/main.rs
+++ b/fleet/harmony-fleet-operator/src/main.rs
@@ -5,7 +5,7 @@ mod frontend;
#[cfg(feature = "web-frontend")]
mod service;
-use harmony_fleet_operator::{device_reconciler, fleet_aggregator};
+use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator};
use anyhow::{Context, Result};
use async_nats::jetstream;
@@ -109,6 +109,9 @@ async fn main() -> Result<()> {
}
}
+/// `serve-web` subcommand: dashboard on its own (mock data, or the live
+/// CR-reading service). The deployed operator instead serves the
+/// dashboard alongside the reconcile loop — see [`spawn_dashboard`].
#[cfg(feature = "web-frontend")]
async fn serve_web(
mock: bool,
@@ -117,22 +120,46 @@ async fn serve_web(
live_reload: bool,
) -> Result<()> {
use std::sync::Arc;
- use std::time::Duration;
- use frontend::server::{AppState, Config};
- use service::{FleetService, mock::MockFleetService};
+ use service::{FleetService, mock::MockFleetService, real::RealFleetService};
let fleet: Arc = if mock {
Arc::new(MockFleetService::default())
} else {
- anyhow::bail!(
- "serve-web without --mock is not implemented yet (real FleetService impl pending). \
- Pass --mock for the dev workflow."
- );
+ Arc::new(RealFleetService::new(Client::try_default().await?))
};
+ serve_dashboard(fleet, addr, css_from, live_reload).await
+}
- let cookie_key = harmony_zitadel_auth::cookie_key_from_env();
- let config = harmony_zitadel_auth::config_from_env();
+/// Build the Zitadel-authenticated web server around a [`FleetService`]
+/// and serve until it exits. Shared by the `serve-web` subcommand and
+/// the in-process dashboard the reconcile loop spawns.
+#[cfg(feature = "web-frontend")]
+async fn serve_dashboard(
+ fleet: std::sync::Arc,
+ addr: std::net::SocketAddr,
+ css_from: Option,
+ live_reload: bool,
+) -> Result<()> {
+ use std::time::Duration;
+
+ use frontend::server::{AppState, Config};
+ use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
+
+ // One typed config value each, resolved EnvSource → OpenBao by
+ // ConfigClient — not a fistful of FLEET_AUTH_* env vars. Namespace
+ // only matters for the OpenBao source; EnvSource reads regardless.
+ let ns = std::env::var("HARMONY_CONFIG_NAMESPACE").unwrap_or_else(|_| "fleet-staging".into());
+ let cc = harmony_config::ConfigClient::for_namespace(&ns).await;
+ let config: ZitadelAuthConfig = cc
+ .get()
+ .await
+ .context("loading ZitadelAuthConfig (HARMONY_CONFIG_ZitadelAuthConfig or OpenBao)")?;
+ let cookie_key = cc
+ .get::()
+ .await
+ .context("loading OperatorCookieKey")?
+ .key()?;
let http_client = reqwest::Client::new();
let jwks = harmony_zitadel_auth::JwksCache::new(&config.zitadel_base, http_client.clone())
@@ -155,6 +182,25 @@ async fn serve_web(
.await
}
+/// Serve the dashboard in-process alongside the reconcile loop. Failure
+/// (e.g. Zitadel not yet reachable for JWKS) is logged but never tears
+/// down reconcile — the read UI is best-effort, the controller is not.
+#[cfg(feature = "web-frontend")]
+fn spawn_dashboard(client: Client) {
+ use std::net::SocketAddr;
+ use std::sync::Arc;
+
+ use service::real::RealFleetService;
+
+ let addr = SocketAddr::from(([0, 0, 0, 0], frontend::server::DEFAULT_PORT));
+ tokio::spawn(async move {
+ let fleet = Arc::new(RealFleetService::new(client));
+ if let Err(e) = serve_dashboard(fleet, addr, None, false).await {
+ tracing::error!(error = %e, "dashboard server exited; reconcile continues");
+ }
+ });
+}
+
async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> {
let nats = connect_with_retry(nats_url, credentials_toml).await?;
tracing::info!(url = %nats_url, "connected to NATS");
@@ -169,9 +215,16 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
let client = Client::try_default().await?;
- // Three concurrent tasks:
+ // Serve the read-only dashboard in the same process (best-effort;
+ // it reads CRs only, no NATS). Built only with the web-frontend
+ // feature; absent from the lean reconcile-only image.
+ #[cfg(feature = "web-frontend")]
+ spawn_dashboard(client.clone());
+
+ // Concurrent tasks:
// controller — CR validation + finalizer-cleanup
// device_reconciler — NATS device-info → Device CR
+ // device_status — NATS device-heartbeat → Device.status liveness
// fleet_aggregator — watches Deployments + Devices + states,
// writes desired-state KV, patches CR status
// Any failing tears the process down; kube-rs Controller swallows
@@ -179,9 +232,12 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
let ctl_client = client.clone();
let dr_client = client.clone();
let dr_js = js.clone();
+ let ds_client = client.clone();
+ let ds_js = js.clone();
tokio::select! {
r = controller::run(ctl_client, desired_state_kv) => r,
r = device_reconciler::run(dr_client, dr_js) => r,
+ r = device_status::run(ds_client, ds_js) => r,
r = fleet_aggregator::run(client, js) => r,
}
}
diff --git a/fleet/harmony-fleet-operator/src/service/mod.rs b/fleet/harmony-fleet-operator/src/service/mod.rs
index 48bc78ac..fb60b2e4 100644
--- a/fleet/harmony-fleet-operator/src/service/mod.rs
+++ b/fleet/harmony-fleet-operator/src/service/mod.rs
@@ -1,4 +1,5 @@
pub mod mock;
+pub mod real;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
diff --git a/fleet/harmony-fleet-operator/src/service/real.rs b/fleet/harmony-fleet-operator/src/service/real.rs
new file mode 100644
index 00000000..5e48f59f
--- /dev/null
+++ b/fleet/harmony-fleet-operator/src/service/real.rs
@@ -0,0 +1,461 @@
+//! Live [`FleetService`] as a read-only projection of Kubernetes CRs.
+//!
+//! The operator is the write side: `device_reconciler` materializes
+//! `Device` CRs (labels + inventory), `device_status` reflects liveness
+//! onto `Device.status`, and `fleet_aggregator` writes
+//! `Deployment.status.aggregate`. This dashboard is the read side — it
+//! only reads those CRs and projects them to view DTOs. No NATS: the
+//! CR is the single contract between the two sides.
+
+use std::collections::{BTreeMap, HashSet};
+use std::sync::Mutex;
+
+use anyhow::Context as _;
+use async_trait::async_trait;
+use chrono::{DateTime, Utc};
+use kube::api::{Api, ListParams, Patch, PatchParams};
+use kube::{Client, ResourceExt};
+use serde_json::json;
+
+use harmony::modules::fleet::operator::{
+ Deployment as DeploymentCr, Device as DeviceCr, DeviceStatus as DeviceLiveness, Reachability,
+};
+use harmony::modules::podman::ReconcileScore;
+use harmony_fleet_operator::fleet_aggregator::selector_matches;
+
+use super::{
+ Alert, AlertSeverity, DashboardDetail, DeploymentDetail, DeploymentStatus, DeviceDetail,
+ DeviceStatus, FleetService,
+};
+
+/// Label the operator UI sets to quarantine a device.
+const BLACKLIST_LABEL: &str = "fleet.nationtech.io/blacklisted";
+/// Label key the agent uses for a device's region, if it sets one.
+const REGION_LABEL: &str = "region";
+
+pub struct RealFleetService {
+ kube: Client,
+ /// In-memory ack set. Alerts are derived from live CR state and
+ /// have no store of their own, so acks don't survive a restart.
+ acked_alerts: Mutex>,
+}
+
+impl RealFleetService {
+ pub fn new(kube: Client) -> Self {
+ Self {
+ kube,
+ acked_alerts: Mutex::new(HashSet::new()),
+ }
+ }
+
+ async fn device_crs(&self) -> anyhow::Result> {
+ let api: Api = Api::all(self.kube.clone());
+ Ok(api.list(&ListParams::default()).await?.items)
+ }
+
+ async fn deployment_crs(&self) -> anyhow::Result> {
+ let api: Api = Api::all(self.kube.clone());
+ Ok(api.list(&ListParams::default()).await?.items)
+ }
+
+ async fn devices(&self) -> anyhow::Result> {
+ let deployments = self.deployment_crs().await?;
+ let now = Utc::now();
+ let mut devices: Vec = self
+ .device_crs()
+ .await?
+ .iter()
+ .map(|cr| map_device(cr, &deployments, now))
+ .collect();
+ devices.sort_by(|a, b| a.id.cmp(&b.id));
+ Ok(devices)
+ }
+
+ async fn deployments(&self) -> anyhow::Result> {
+ let mut deployments: Vec = self
+ .deployment_crs()
+ .await?
+ .iter()
+ .map(map_deployment)
+ .collect();
+ deployments.sort_by(|a, b| a.name.cmp(&b.name));
+ Ok(deployments)
+ }
+}
+
+fn map_device(cr: &DeviceCr, deployments: &[DeploymentCr], now: DateTime) -> DeviceDetail {
+ let labels = cr.metadata.labels.clone().unwrap_or_default();
+ let blacklisted = labels.get(BLACKLIST_LABEL).map(String::as_str) == Some("true");
+ let last_seen = cr
+ .status
+ .as_ref()
+ .and_then(|s| s.last_heartbeat.as_deref())
+ .and_then(|s| DateTime::parse_from_rfc3339(s).ok())
+ .map(|dt| dt.with_timezone(&Utc))
+ .or_else(|| cr.creation_timestamp().map(|t| t.0))
+ .unwrap_or(now);
+
+ DeviceDetail {
+ id: cr.name_any(),
+ status: device_status(blacklisted, cr.status.as_ref()),
+ last_seen,
+ minutes_ago: (now - last_seen).num_minutes().max(0),
+ deployment: primary_deployment(&labels, deployments),
+ region: labels
+ .get(REGION_LABEL)
+ .cloned()
+ .unwrap_or_else(|| "\u{2014}".to_string()),
+ tags: tags_from_labels(&labels),
+ inventory: cr.spec.inventory.clone(),
+ }
+}
+
+/// UI status from the device's quarantine label + operator-written
+/// liveness. Failing/Pending are deployment-level (the Deployments
+/// view), not device liveness, so they don't appear here.
+fn device_status(blacklisted: bool, liveness: Option<&DeviceLiveness>) -> DeviceStatus {
+ if blacklisted {
+ return DeviceStatus::Blacklisted;
+ }
+ match liveness.map(|s| s.reachability) {
+ Some(Reachability::Reachable) => DeviceStatus::Healthy,
+ Some(Reachability::Stale) => DeviceStatus::Stale,
+ Some(Reachability::Unknown) | None => DeviceStatus::Unknown,
+ }
+}
+
+/// First deployment (by name) whose selector matches the device — the
+/// canonical [`selector_matches`] over CR labels, the same matcher the
+/// aggregator uses. No reconstruction.
+fn primary_deployment(
+ labels: &BTreeMap,
+ deployments: &[DeploymentCr],
+) -> Option {
+ let mut matched: Vec = deployments
+ .iter()
+ .filter(|d| selector_matches(&d.spec.target_selector, labels))
+ .map(ResourceExt::name_any)
+ .collect();
+ matched.sort();
+ matched.into_iter().next()
+}
+
+/// Routing labels rendered as `k=v` chips, minus internal keys.
+fn tags_from_labels(labels: &BTreeMap) -> Vec {
+ labels
+ .iter()
+ .filter(|(k, _)| k.as_str() != BLACKLIST_LABEL && k.as_str() != REGION_LABEL)
+ .map(|(k, v)| format!("{k}={v}"))
+ .collect()
+}
+
+fn map_deployment(cr: &DeploymentCr) -> DeploymentDetail {
+ let agg = cr
+ .status
+ .as_ref()
+ .and_then(|s| s.aggregate.clone())
+ .unwrap_or_default();
+ let status = if agg.failed > 0 {
+ DeploymentStatus::Failing
+ } else if agg.pending > 0 {
+ DeploymentStatus::Rolling
+ } else {
+ DeploymentStatus::Active
+ };
+ DeploymentDetail {
+ name: cr.name_any(),
+ version: deployment_version(&cr.spec.score),
+ status,
+ target: agg.matched_device_count,
+ healthy: agg.succeeded,
+ failing: agg.failed,
+ pending: agg.pending,
+ updated_at: cr
+ .creation_timestamp()
+ .map(|t| t.0.format("%Y-%m-%d %H:%M").to_string())
+ .unwrap_or_else(|| "\u{2014}".to_string()),
+ }
+}
+
+/// No first-class version on the CR; use the first service's image tag
+/// as the human-facing proxy (`nginx:1.25` → `1.25`).
+fn deployment_version(score: &ReconcileScore) -> String {
+ let ReconcileScore::PodmanV0(s) = score;
+ s.services
+ .first()
+ .map(|svc| {
+ svc.image
+ .rsplit_once(':')
+ .map(|(_, tag)| tag.to_string())
+ .unwrap_or_else(|| svc.image.clone())
+ })
+ .unwrap_or_else(|| "\u{2014}".to_string())
+}
+
+/// Alerts derived from current CR state (no alert store yet): one
+/// critical per failing deployment, one warning per stale device.
+fn derive_alerts(
+ devices: &[DeviceDetail],
+ deployments: &[DeploymentDetail],
+ acked: &HashSet,
+) -> Vec {
+ let mut alerts = Vec::new();
+ for d in deployments {
+ if d.failing > 0 {
+ let id = format!("dep:{}:failing", d.name);
+ alerts.push(Alert {
+ acked: acked.contains(&id),
+ id,
+ severity: AlertSeverity::Critical,
+ title: format!("{} has {} failing device(s)", d.name, d.failing),
+ deployment: Some(d.name.clone()),
+ device: None,
+ at: String::new(),
+ });
+ }
+ }
+ for dev in devices {
+ if dev.status == DeviceStatus::Stale {
+ let id = format!("device:{}:stale", dev.id);
+ alerts.push(Alert {
+ acked: acked.contains(&id),
+ id,
+ severity: AlertSeverity::Warning,
+ title: format!("{} last pinged {}m ago", dev.id, dev.minutes_ago),
+ deployment: dev.deployment.clone(),
+ device: Some(dev.id.clone()),
+ at: String::new(),
+ });
+ }
+ }
+ alerts
+}
+
+#[async_trait]
+impl FleetService for RealFleetService {
+ async fn dashboard_detail(&self) -> anyhow::Result {
+ let devices = self.devices().await?;
+ let mut deployments = self.deployments().await?;
+ let acked = self.acked_alerts.lock().unwrap().clone();
+ let active_alerts = derive_alerts(&devices, &deployments, &acked)
+ .into_iter()
+ .filter(|a| !a.acked)
+ .collect();
+
+ let mut d = DashboardDetail {
+ devices_total: devices.len() as u32,
+ devices_healthy: 0,
+ devices_pending: 0,
+ devices_failing: 0,
+ devices_stale: 0,
+ devices_blacklisted: 0,
+ devices_unknown: 0,
+ deployments_total: deployments.len(),
+ health_pct: 0,
+ attention_devices: devices
+ .iter()
+ .filter(|d| matches!(d.status, DeviceStatus::Stale | DeviceStatus::Unknown))
+ .take(12)
+ .cloned()
+ .collect(),
+ top_deployments: Vec::new(),
+ active_alerts,
+ rolling_count: 0,
+ failing_count: 0,
+ };
+ for dev in &devices {
+ match dev.status {
+ DeviceStatus::Healthy => d.devices_healthy += 1,
+ DeviceStatus::Pending => d.devices_pending += 1,
+ DeviceStatus::Stale => d.devices_stale += 1,
+ DeviceStatus::Failing => d.devices_failing += 1,
+ DeviceStatus::Blacklisted => d.devices_blacklisted += 1,
+ DeviceStatus::Unknown => d.devices_unknown += 1,
+ }
+ }
+ if d.devices_total > 0 {
+ d.health_pct =
+ ((d.devices_healthy as f64 / d.devices_total as f64) * 100.0).round() as u32;
+ }
+ for dep in &deployments {
+ match dep.status {
+ DeploymentStatus::Rolling => d.rolling_count += 1,
+ DeploymentStatus::Failing => d.failing_count += 1,
+ _ => {}
+ }
+ }
+ deployments.truncate(4);
+ d.top_deployments = deployments;
+ Ok(d)
+ }
+
+ async fn list_devices(&self) -> anyhow::Result> {
+ self.devices().await
+ }
+
+ async fn get_device(&self, id: &str) -> anyhow::Result