feat(fleet-operator): real dashboard data from kube CRs + NATS KV #322

Merged
johnride merged 11 commits from feat/fleet-operator-real-data into master 2026-06-02 03:48:30 +00:00
29 changed files with 1292 additions and 164 deletions

5
Cargo.lock generated
View File

@@ -3498,6 +3498,7 @@ name = "example_fleet_staging_install"
version = "0.1.0"
dependencies = [
"anyhow",
"base64 0.22.1",
"clap",
"harmony",
"harmony-fleet-deploy",
@@ -4104,6 +4105,7 @@ dependencies = [
"harmony_config",
"harmony_macros",
"harmony_types",
"harmony_zitadel_auth",
"k8s-openapi",
"kube",
"log",
@@ -4164,6 +4166,7 @@ dependencies = [
"harmony",
"harmony-fleet-auth",
"harmony-reconciler-contracts",
"harmony_config",
"harmony_zitadel_auth",
"k8s-openapi",
"kube",
@@ -4533,10 +4536,12 @@ dependencies = [
"axum-extra",
"base64 0.22.1",
"chrono",
"harmony_config",
"jsonwebtoken",
"openidconnect",
"rand 0.9.4",
"reqwest 0.12.28",
"schemars 0.8.22",
"serde",
"serde_json",
"sha2 0.10.9",

View File

@@ -24,6 +24,7 @@
- [Writing a Topology](./guides/writing-a-topology.md)
- [Adding Capabilities](./guides/adding-capabilities.md)
- [Web Authentication and CSRF Security](./guides/web-auth-security.md)
- [Operator Dashboard SSO (Zitadel) — setup](./guides/operator-dashboard-sso.md)
## Configuration

View File

@@ -0,0 +1,55 @@
# Operator Dashboard SSO (Zitadel) — setup
Browser SSO for the fleet operator web UI (OIDC Authorization Code + PKCE,
public client). Distinct from the agent/callout machine auth
([fleet-zitadel-faq](./fleet-zitadel-faq.md)); the security rationale is in
[web-auth-security](./web-auth-security.md). Code: `harmony_zitadel_auth/`.
## Quickstart (staging)
1. **Zitadel app** — create a **Web** application, auth method **PKCE** (no client
secret), redirect URI `https://fleet-stg.<base>/auth/callback`, post-logout URI
`https://fleet-stg.<base>/`. Copy its **Client ID**.
2. **Seed config** in OpenBao (namespace `fleet-staging`) — the deploy derives every
host from `base_domain`, so you set only:
- `FleetDeployConfig.operator_oidc_client_id` = the Client ID
- `FleetDeployConfig.operator_trusted_audiences` = `["<Client ID>"]`
- `FleetDeploySecrets.operator_cookie_key_b64` = `openssl rand -base64 64`
3. **Deploy**: `./fleet/scripts/dev-deploy-operator.sh`
4. Open `https://fleet-stg.<base>/` → Zitadel login → back to the dashboard.
`fleet_staging_install` already generates the cookie key, so a fresh install needs
only the Client ID + audiences.
## Local dev (`serve-web`)
`fleet/harmony-fleet-operator/dev.sh` sets the same config as two
`HARMONY_CONFIG_<TypeName>` env JSON blobs (ConfigClient's env source). Point
`base_url` at `http://localhost:18080`, register that callback in the app, and turn
on the app's **Development Mode** (Zitadel rejects non-HTTPS redirects otherwise).
## When login fails — check these first
- **`iss` mismatch** — `zitadel_base` must equal the token issuer byte-for-byte, no
trailing slash.
- **`aud` mismatch** — `trusted_audiences` must contain the token's `aud`; Zitadel
puts the app's Client ID there by default.
- **Client secret** — the app must be PKCE-only; the code never sends a secret.
- **Redirect URI** — must be exactly `{base_url}/auth/callback`.
- **Cookie key** — `cookie_key_b64` must decode to ≥64 bytes, else the dashboard
refuses to start (`cookie_key_b64 must decode to at least 64 bytes`; reconcile
keeps running).
## Config reference
The operator reads `ZitadelAuthConfig` + `OperatorCookieKey` via ConfigClient. The
deploy derives `zitadel_base` / `base_url` / `logout_redirect_uri` from `base_domain`
(`https://sso-stg.<base>`, `https://fleet-stg.<base>`, `…/`) and fixes
`scope = openid profile email`; you supply `client_id`, `trusted_audiences`,
`cookie_key_b64`. All endpoints derive from `zitadel_base`:
`/.well-known/openid-configuration`, `/oauth/v2/authorize`, `/oauth/v2/token`,
`/oidc/v1/end_session`.
> The dashboard only checks that the user authenticated — no role gate yet
> ([web-auth-security](./web-auth-security.md) §3,
> [ROADMAP/09](../../ROADMAP/09-sso-config-hardening.md)).

View File

@@ -2,6 +2,8 @@
These guidelines define the baseline for Harmony web frontends and future operator dashboards that use browser-based authentication, cookie sessions, Axum, HTMX, or OIDC providers such as Zitadel.
> Setting up the fleet operator dashboard's SSO concretely (Zitadel app + the exact config values)? See [Operator Dashboard SSO (Zitadel) — setup](./operator-dashboard-sso.md).
## Goals
- Prevent unauthenticated access.

View File

@@ -680,6 +680,8 @@ key_json = """
log_level: "info,kube_runtime=warn".to_string(),
credentials: Some(OperatorCredentials { credentials_toml }),
chart_version: None,
web_auth_config_json: None,
web_cookie_key_json: None,
};
// CRDs first — the operator watches them on startup.

View File

@@ -20,6 +20,7 @@ harmony-nats-callout = { path = "../../nats/callout" }
harmony-fleet-deploy = { path = "../../fleet/harmony-fleet-deploy" }
nkeys = "0.4"
rand = "0.9"
base64.workspace = true
anyhow.workspace = true
clap = { version = "4", features = ["derive", "env"] }
tokio.workspace = true

View File

@@ -347,6 +347,7 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"#
.set(&FleetDeploySecrets {
operator_credentials_toml: credentials.credentials_toml.clone(),
kubeconfig: None,
operator_cookie_key_b64: generate_cookie_key_b64(),
})
.await
.context("seed FleetDeploySecrets")?;
@@ -378,3 +379,13 @@ fn generate_alphanum(len: usize) -> String {
.map(|_| CHARSET[rng.random_range(0..CHARSET.len())] as char)
.collect()
}
/// Standard-base64 of 64 random bytes — the operator's session-cookie
/// signing key. Generated per install and seeded as a secret.
fn generate_cookie_key_b64() -> String {
use base64::Engine;
use rand::Rng;
let mut bytes = [0u8; 64];
rand::rng().fill(&mut bytes);
base64::engine::general_purpose::STANDARD.encode(bytes)
}

View File

@@ -29,6 +29,7 @@ harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" }
anyhow = { workspace = true }
async-trait = { workspace = true }

View File

@@ -1,21 +1,29 @@
//! `harmony-fleet-publish` — build + publish the operator image + chart
//! for a tagged release. `docker` / `helm` must be on PATH and logged in
//! to the registry (CI's login actions; dev's manual login).
//! at a version. `docker` / `helm` must be on PATH and logged in to the
//! registry (CI's login actions; dev's manual login).
use anyhow::Result;
use anyhow::{Result, bail};
use clap::Parser;
use harmony_fleet_deploy::release::{release_operator, version_from_tag};
#[derive(Parser, Debug)]
#[command(
name = "harmony-fleet-publish",
about = "Build + publish the operator image + chart for a tagged release"
about = "Build + publish the operator image + chart at a version"
)]
struct Cli {
/// Git tag, e.g. `harmony-fleet-operator-v0.1.0`. Defaults to
/// `$GITHUB_REF_NAME` so CI passes nothing.
/// The image + chart version, e.g. `0.1.0` or `0.0.0-dev.1730000000`
/// (must be valid semver — helm rejects anything else). Wins over
/// `--from-tag`; the laptop/dev path.
#[arg(long)]
version: Option<String>,
/// Parse the version out of a release tag like
/// `harmony-fleet-operator-v0.1.0`. Defaults to `$GITHUB_REF_NAME`
/// so the release workflow passes nothing. Used when `--version`
/// is unset.
#[arg(long, env = "GITHUB_REF_NAME")]
from_tag: String,
from_tag: Option<String>,
#[arg(long, default_value = "hub.nationtech.io")]
registry: String,
@@ -28,9 +36,19 @@ struct Cli {
no_push: bool,
}
impl Cli {
fn version(&self) -> Result<String> {
match (&self.version, &self.from_tag) {
(Some(v), _) => Ok(v.clone()),
(None, Some(tag)) => version_from_tag(tag),
(None, None) => bail!("set --version (e.g. 0.1.0) or --from-tag"),
}
}
}
fn main() -> Result<()> {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
let cli = Cli::parse();
let version = version_from_tag(&cli.from_tag)?;
let version = cli.version()?;
release_operator(&version, &cli.registry, &cli.project, !cli.no_push)
}

View File

@@ -18,6 +18,7 @@ use harmony_config::ConfigClient;
use harmony_fleet_deploy::{
FleetDeployConfig, FleetDeploySecrets, FleetOperatorScore, version_from_tag,
};
use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
use tracing::info;
#[derive(Parser, Debug)]
@@ -114,12 +115,28 @@ async fn main() -> Result<()> {
// Coherent with the other staging hosts (sso-stg., secrets-stg.).
let ui_host = format!("fleet-stg.{}", config.base_domain);
// Dashboard SSO config: hosts derive from base_domain; client_id +
// audiences come from config, cookie key from secrets. Baked into the
// operator Secret for the pod's ConfigClient to read.
let web_auth = ZitadelAuthConfig {
zitadel_base: format!("https://sso-stg.{}", config.base_domain),
base_url: format!("https://{ui_host}"),
client_id: config.operator_oidc_client_id,
scope: "openid profile email".to_string(),
trusted_audiences: config.operator_trusted_audiences,
logout_redirect_uri: format!("https://{ui_host}/"),
};
let cookie = OperatorCookieKey {
cookie_key_b64: secrets.operator_cookie_key_b64,
};
let operator = FleetOperatorScore::new()
.namespace(namespace)
.nats_url(config.nats_url)
.credentials(secrets.operator_credentials_toml)
.published_chart(registry, project, version)
.ingress(ui_host, Some(config.cluster_issuer));
.ingress(ui_host, Some(config.cluster_issuer))
.web_auth(web_auth, cookie);
harmony_cli::run(
Inventory::autoload(),

View File

@@ -70,6 +70,13 @@ pub struct ChartOptions {
/// sets this to the released tag so the OCI chart artifact lands
/// at `…/harmony-fleet-operator-chart:<tag>` matching the image tag.
pub chart_version: Option<String>,
/// JSON of the dashboard's `ZitadelAuthConfig`, stored in the
/// operator Secret under [`ENV_WEB_AUTH_CONFIG`]. `None` leaves the
/// dashboard unauthenticated (dev/e2e).
pub web_auth_config_json: Option<String>,
/// JSON of the dashboard's `OperatorCookieKey`, stored under
/// [`ENV_WEB_COOKIE_KEY`].
pub web_cookie_key_json: Option<String>,
}
/// What the operator pod needs to authenticate to NATS via the auth
@@ -134,6 +141,8 @@ impl Default for ChartOptions {
log_level: "info,kube_runtime=warn".to_string(),
credentials: None,
chart_version: None,
web_auth_config_json: None,
web_cookie_key_json: None,
}
}
}
@@ -153,6 +162,15 @@ pub const OPERATOR_HTTP_PORT: u16 = 18080;
/// including the inline JSON keyfile under `key_json`.
pub const SECRET_KEY_CREDENTIALS_TOML: &str = "credentials.toml";
/// Env var + operator-Secret key the dashboard's `ConfigClient` reads
/// its `ZitadelAuthConfig` / `OperatorCookieKey` from. The value is the
/// JSON of those structs; `ConfigClient`'s `EnvSource` keys a config
/// type under `HARMONY_CONFIG_<KEY>`. Locked to the structs' Config
/// keys by a test below — the chart wires the env→Secret reference at
/// publish time, the deploy fills the Secret values at deploy time.
pub const ENV_WEB_AUTH_CONFIG: &str = "HARMONY_CONFIG_ZitadelAuthConfig";
pub const ENV_WEB_COOKIE_KEY: &str = "HARMONY_CONFIG_OperatorCookieKey";
/// Build + write the chart to `opts.output_dir`. Returns the full
/// path to the generated chart directory (which is what `helm
/// install <path>` wants).
@@ -215,6 +233,21 @@ pub fn operator_secret(opts: &ChartOptions) -> Option<Secret> {
SECRET_KEY_CREDENTIALS_TOML.to_string(),
ByteString(creds.credentials_toml.as_bytes().to_vec()),
);
// Dashboard auth config + cookie key (when configured) ride in the
// same Secret; the Deployment sources them as HARMONY_CONFIG_* env
// for the operator's ConfigClient.
if let Some(json) = &opts.web_auth_config_json {
data.insert(
ENV_WEB_AUTH_CONFIG.to_string(),
ByteString(json.as_bytes().to_vec()),
);
}
if let Some(json) = &opts.web_cookie_key_json {
data.insert(
ENV_WEB_COOKIE_KEY.to_string(),
ByteString(json.as_bytes().to_vec()),
);
}
// Namespace deliberately omitted — the caller passes the target
// namespace to `K8sResourceScore::single`, which injects it at
// apply time. Keeps the Secret manifest reusable across
@@ -294,7 +327,7 @@ fn cluster_role() -> ClusterRole {
// Devices: reconciler server-side-applies + deletes;
// aggregator lists + watches.
PolicyRule {
api_groups: Some(vec![group]),
api_groups: Some(vec![group.clone()]),
resources: Some(vec!["devices".to_string()]),
verbs: vec![
"get", "list", "watch", "create", "update", "patch", "delete",
@@ -304,6 +337,17 @@ fn cluster_role() -> ClusterRole {
.collect(),
..Default::default()
},
// Device liveness: the device-status reconciler patches the
// status subresource — a distinct RBAC resource from `devices`.
PolicyRule {
api_groups: Some(vec![group]),
resources: Some(vec!["devices/status".to_string()]),
verbs: vec!["get", "update", "patch"]
.into_iter()
.map(String::from)
.collect(),
..Default::default()
},
]),
..Default::default()
}
@@ -383,6 +427,22 @@ fn operator_deployment(opts: &ChartOptions) -> K8sDeployment {
// pod compatible with OKD's restricted-v2 SCC and the
// `harmony_fleet_auth::CredentialsSection` deserializer
// handles inline-vs-file from the same TOML shape.
// All three env-from-Secret with optional=true: the Pod starts
// before the deploy applies the Secret, and the dashboard's two
// HARMONY_CONFIG_* values are simply absent on deploys that don't
// configure auth (dev/e2e).
let secret_env = |name: &str| EnvVar {
name: name.to_string(),
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: SECRET_NAME.to_string(),
key: name.to_string(),
optional: Some(true),
}),
..Default::default()
}),
..Default::default()
};
env.push(EnvVar {
name: OPERATOR_CREDENTIALS_ENV_VAR.to_string(),
value_from: Some(EnvVarSource {
@@ -395,6 +455,8 @@ fn operator_deployment(opts: &ChartOptions) -> K8sDeployment {
}),
..Default::default()
});
env.push(secret_env(ENV_WEB_AUTH_CONFIG));
env.push(secret_env(ENV_WEB_COOKIE_KEY));
// Namespace deliberately omitted — same rationale as the
// ServiceAccount: helm fills in the release namespace at install
@@ -532,7 +594,7 @@ mod tests {
}
#[test]
fn chart_includes_credentials_secret_and_env_var() {
fn chart_omits_secret_but_deployment_references_it() {
let tmp = tempfile::tempdir().unwrap();
let chart_path = build_chart(&ChartOptions {
output_dir: tmp.path().to_path_buf(),
@@ -540,14 +602,58 @@ mod tests {
})
.unwrap();
let secret_yaml =
std::fs::read_to_string(chart_path.join("templates/secret-credentials.yaml"))
.expect("secret-credentials.yaml must exist in chart");
assert!(secret_yaml.contains(SECRET_NAME));
// The chart is hydrated (no templating), so it must NOT carry the
// credentials Secret — the deploy applies it out-of-band. A Secret
// in the chart fights Helm ownership against the deploy-applied one
// and re-blanks it on every upgrade.
assert!(
!chart_path
.join("templates/secret-credentials.yaml")
.exists(),
"hydrated chart must not contain a Secret"
);
// The Deployment still wires the env var from that out-of-band
// Secret via secretKeyRef (optional, so the Pod starts before it
// lands).
let deployment_yaml = std::fs::read_to_string(chart_path.join("templates/deployment.yaml"))
.expect("deployment.yaml must exist in chart");
assert!(deployment_yaml.contains(OPERATOR_CREDENTIALS_ENV_VAR));
assert!(deployment_yaml.contains(SECRET_NAME));
}
// The CRDs carry status subresources, which RBAC treats as distinct
// resources — a patch on `*/status` is forbidden without an explicit
// grant. Lock both so adding a status subresource can't silently 403.
#[test]
fn cluster_role_grants_status_subresources() {
let role = cluster_role();
let grants_patch = |resource: &str| {
role.rules.as_ref().unwrap().iter().any(|r| {
r.resources
.as_ref()
.is_some_and(|res| res.iter().any(|x| x == resource))
&& r.verbs.iter().any(|v| v == "patch")
})
};
assert!(grants_patch("deployments/status"));
assert!(grants_patch("devices/status"));
}
// The chart bakes these env names at publish time; the operator's
// ConfigClient derives them from the struct names at runtime. Lock
// them together so a rename can't silently desync the two.
#[test]
fn web_auth_env_names_match_config_keys() {
use harmony_config::Config;
use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
assert_eq!(
ENV_WEB_AUTH_CONFIG,
format!("HARMONY_CONFIG_{}", ZitadelAuthConfig::KEY)
);
assert_eq!(
ENV_WEB_COOKIE_KEY,
format!("HARMONY_CONFIG_{}", OperatorCookieKey::KEY)
);
}
}

View File

@@ -78,6 +78,17 @@ pub struct FleetOperatorScore {
/// cert-manager `ClusterIssuer` for the UI Ingress. `None` (or no
/// host) serves plain HTTP — the right default on issuer-less k3d.
pub cluster_issuer: Option<String>,
/// Dashboard SSO config + cookie key, baked into the operator Secret
/// for the pod's `ConfigClient` to read. `None` leaves the dashboard
/// unauthenticated (dev/e2e).
pub web_auth: Option<WebAuth>,
}
/// The dashboard's auth inputs the operator reads via `ConfigClient`.
#[derive(Debug, Clone, Serialize)]
pub struct WebAuth {
pub config: harmony_zitadel_auth::ZitadelAuthConfig,
pub cookie: harmony_zitadel_auth::OperatorCookieKey,
}
impl FleetOperatorScore {
@@ -98,6 +109,7 @@ impl FleetOperatorScore {
published_chart: None,
operator_ui_host: None,
cluster_issuer: None,
web_auth: None,
}
}
@@ -110,6 +122,17 @@ impl FleetOperatorScore {
self
}
/// Configure dashboard SSO: the `ZitadelAuthConfig` + cookie key are
/// baked into the operator Secret for the pod's `ConfigClient`.
pub fn web_auth(
mut self,
config: harmony_zitadel_auth::ZitadelAuthConfig,
cookie: harmony_zitadel_auth::OperatorCookieKey,
) -> Self {
self.web_auth = Some(WebAuth { config, cookie });
self
}
/// Install the published OCI chart at `version` instead of rendering
/// one from local source (the CD `harmony apply` path).
pub fn published_chart(
@@ -202,9 +225,22 @@ impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for FleetOperatorInterp
// directly, not via the chart — it's environment-specific. The
// published-chart CD path runs without credentials today, so
// this is a no-op there.
let (web_auth_config_json, web_cookie_key_json) = match &self.score.web_auth {
Some(w) => (
Some(serde_json::to_string(&w.config).map_err(|e| {
InterpretError::new(format!("serialize ZitadelAuthConfig: {e}"))
})?),
Some(serde_json::to_string(&w.cookie).map_err(|e| {
InterpretError::new(format!("serialize OperatorCookieKey: {e}"))
})?),
),
None => (None, None),
};
if let Some(creds) = &self.score.credentials
&& let Some(secret) = operator_secret(&ChartOptions {
credentials: Some(creds.clone()),
web_auth_config_json,
web_cookie_key_json,
..ChartOptions::default()
})
{
@@ -252,6 +288,10 @@ impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for FleetOperatorInterp
log_level: self.score.log_level.clone(),
credentials: self.score.credentials.clone(),
chart_version: None,
// The auth Secret is applied separately above; the
// rendered chart only needs the Deployment env wiring.
web_auth_config_json: None,
web_cookie_key_json: None,
})
.map_err(|e| InterpretError::new(format!("build operator chart: {e}")))?;
let chart_path_str = chart_path.to_str().ok_or_else(|| {

View File

@@ -22,6 +22,12 @@ pub struct FleetDeploySecrets {
/// `KUBECONFIG`.
#[serde(default)]
pub kubeconfig: Option<String>,
/// Operator dashboard session-cookie signing key (standard-base64 of
/// ≥64 bytes). `#[serde(default)]` so older seeds without it still
/// load — but the dashboard won't authenticate until it's set.
#[serde(default)]
pub operator_cookie_key_b64: String,
}
/// Non-secret deploy config: k8s namespaces + chart coords. Loaded via
@@ -52,6 +58,14 @@ pub struct FleetDeployConfig {
/// cert-manager `ClusterIssuer` for the operator UI's TLS cert.
pub cluster_issuer: String,
/// Zitadel OIDC app the operator dashboard authenticates against.
#[serde(default)]
pub operator_oidc_client_id: String,
/// Token audiences the dashboard accepts (Zitadel project/app ids).
#[serde(default)]
pub operator_trusted_audiences: Vec<String>,
}
impl Default for FleetDeployConfig {
@@ -65,6 +79,8 @@ impl Default for FleetDeployConfig {
operator_chart_project: "harmony".to_string(),
base_domain: "cb1.nationtech.io".to_string(),
cluster_issuer: "letsencrypt-prod".to_string(),
operator_oidc_client_id: String::new(),
operator_trusted_audiences: Vec::new(),
}
}
}

View File

@@ -17,6 +17,7 @@ web-frontend = ["dep:axum", "dep:axum-extra", "dep:maud", "dep:tokio-stream", "h
harmony = { path = "../../harmony", features = ["podman"] }
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
harmony_config = { path = "../../harmony_config" }
harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" }
toml = { workspace = true }
chrono = { workspace = true, features = ["serde"] }

View File

@@ -1,3 +1,4 @@
# syntax=docker/dockerfile:1.7
# Multi-stage container build for harmony-fleet-operator.
#
# Build context is the workspace root (the operator's Cargo.toml has
@@ -21,17 +22,44 @@
FROM docker.io/rust:1.94-slim-bookworm AS builder
# pkg-config + libssl-dev cover transitive native-tls/openssl-sys deps
# that surface during link; ca-certificates lets cargo fetch over TLS.
# that surface during link; ca-certificates lets cargo fetch over TLS;
# curl downloads the Tailwind CLI below.
RUN apt-get update && apt-get install -y --no-install-recommends \
pkg-config \
ca-certificates \
libssl-dev \
curl \
&& rm -rf /var/lib/apt/lists/*
# Tailwind v4 standalone CLI: build.rs shells out to it to generate the
# embedded CSS bundle. Pinned to match the dev host; override with
# --build-arg TAILWIND_VERSION=... TAILWIND_REQUIRED makes build.rs hard
# fail (not silently ship empty CSS) if the CLI is missing — see build.rs.
ARG TAILWIND_VERSION=v4.3.0
RUN curl -fsSL -o /usr/local/bin/tailwindcss \
"https://github.com/tailwindlabs/tailwindcss/releases/download/${TAILWIND_VERSION}/tailwindcss-linux-x64" \
&& chmod +x /usr/local/bin/tailwindcss \
&& tailwindcss --help >/dev/null
ENV TAILWIND_REQUIRED=1
WORKDIR /app
COPY . .
RUN cargo build --release --locked -p harmony-fleet-operator
# `web-frontend` bundles the dashboard the operator serves in-process
# alongside the reconcile loop.
#
# BuildKit cache mounts persist the cargo registry + `target/` across
# builds, so an iterate-build-deploy loop recompiles only changed crates
# (seconds) instead of the whole workspace (minutes). The mounts are
# build-time only — the resulting image is identical, and a cold CI
# runner just rebuilds from scratch. The binary is copied out of the
# cache-mounted target within the same RUN, since cache mounts aren't
# part of the produced layer.
RUN --mount=type=cache,target=/usr/local/cargo/registry \
--mount=type=cache,target=/usr/local/cargo/git \
--mount=type=cache,target=/app/target \
cargo build --release --locked -p harmony-fleet-operator --features web-frontend \
&& cp /app/target/release/harmony-fleet-operator /harmony-fleet-operator
FROM docker.io/library/debian:bookworm-slim
@@ -41,7 +69,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
ca-certificates \
&& rm -rf /var/lib/apt/lists/*
COPY --from=builder /app/target/release/harmony-fleet-operator /usr/local/bin/harmony-fleet-operator
COPY --from=builder /harmony-fleet-operator /usr/local/bin/harmony-fleet-operator
# Non-root runtime. Pairs with the Pod's `securityContext.
# runAsNonRoot: true` in the helm chart — k8s admission rejects pods

View File

@@ -33,34 +33,38 @@ fn main() {
println!("cargo:rerun-if-changed=style/input.css");
println!("cargo:rerun-if-changed=src/frontend");
println!("cargo:rerun-if-changed=src/service");
// Toggling this re-runs the script, so the container build (which sets
// it) regenerates rather than reusing a cache-mounted empty bundle.
println!("cargo:rerun-if-env-changed=TAILWIND_REQUIRED");
let status = Command::new("tailwindcss")
// Container/production builds set TAILWIND_REQUIRED: a missing or
// failing CLI is a hard error there (never ship empty CSS). Dev builds
// leave it unset and fall back to empty, serving CSS via
// `serve-web --css-from <path>` against a `tailwindcss --watch` sidecar.
let required = std::env::var_os("TAILWIND_REQUIRED").is_some();
let fall_back = |reason: String| {
assert!(
!required,
"{reason}\nTAILWIND_REQUIRED is set: refusing to ship a frontend with empty CSS. \
Install the v4 standalone CLI: https://github.com/tailwindlabs/tailwindcss/releases"
);
println!(
"cargo:warning={reason}; embedded CSS will be empty \
(use `serve-web --css-from <path>` in dev)."
);
std::fs::write(&output, b"").unwrap();
};
match Command::new("tailwindcss")
.arg("--input")
.arg(&input)
.arg("--output")
.arg(&output)
.arg("--minify")
.status();
match status {
.status()
{
Ok(s) if s.success() => {}
Ok(s) => {
println!(
"cargo:warning=tailwindcss exited with status {s}; embedded CSS will be empty. \
Install the v4 standalone CLI \
(https://github.com/tailwindlabs/tailwindcss/releases) for production builds, \
or use `serve-web --css-from <path>` against a `tailwindcss --watch` sidecar in dev."
);
std::fs::write(&output, b"").unwrap();
}
Err(e) => {
println!(
"cargo:warning=tailwindcss not invocable ({e}); embedded CSS will be empty. \
Install the v4 standalone CLI \
(https://github.com/tailwindlabs/tailwindcss/releases) for production builds, \
or use `serve-web --css-from <path>` against a `tailwindcss --watch` sidecar in dev."
);
std::fs::write(&output, b"").unwrap();
}
Ok(s) => fall_back(format!("tailwindcss exited with status {s}")),
Err(e) => fall_back(format!("tailwindcss not invocable ({e})")),
}
}

View File

@@ -1,13 +1,11 @@
#!/bin/bash
export BASE_URL=http://localhost:18080
export FLEET_AUTH_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io
export FLEET_AUTH_CLIENT_ID=372626218874372917
export FLEET_AUTH_SCOPE="openid profile email"
export FLEET_AUTH_LOGOUT_REDIRECT_URI="http://localhost:18080/"
export FLEET_OPERATOR_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg==
export FLEET_AUTH_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125
export BASE_URL=http://localhost:18080
# The operator reads its auth config via ConfigClient — one typed value
# each (EnvSource keys a Config struct under HARMONY_CONFIG_<TypeName>),
# not a fistful of FLEET_AUTH_* vars. base_url is localhost for this local
# serve-web; staging derives it from base_domain in the deploy.
export HARMONY_CONFIG_ZitadelAuthConfig='{"zitadel_base":"https://sso-stg.cb1.nationtech.io","base_url":"http://localhost:18080","client_id":"372626218874372917","scope":"openid profile email","trusted_audiences":["371639797493596981","371683318111994677","372626218874372917","371639797157987125"],"logout_redirect_uri":"http://localhost:18080/"}'
export HARMONY_CONFIG_OperatorCookieKey='{"cookie_key_b64":"6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg=="}'
export RUST_LOG=debug
cargo watch -- cargo run -p harmony-fleet-operator --features web-frontend -- serve-web \

View File

@@ -0,0 +1,173 @@
//! Device liveness reconciler: NATS `device-heartbeat` KV → `Device.status`.
//!
//! Agents ping `heartbeat.<device_id>` every ~30 s. This reconciler
//! keeps the latest ping per device in memory (from a KV watch) and,
//! on a tick, reflects freshness onto `Device.status` —
//! `lastHeartbeat` + a coarse [`Reachability`]. That makes liveness
//! visible to `kubectl get devices` and lets the dashboard read it
//! from the CR instead of re-deriving staleness from NATS.
//!
//! Mirrors the aggregator's "watch fills a cache, a tick patches the
//! CR" shape. The staleness threshold lives here only — it's a device
//! concept the operator owns, not something every reader re-decides.
use std::collections::HashMap;
use std::time::Duration;
use anyhow::Result;
use async_nats::jetstream::kv::{Operation, Store};
use chrono::{DateTime, Utc};
use futures_util::StreamExt;
use harmony_reconciler_contracts::{BUCKET_DEVICE_HEARTBEAT, HeartbeatPayload};
use kube::api::{Api, Patch, PatchParams};
use kube::{Client, ResourceExt};
use serde_json::json;
use tokio::sync::Mutex;
use harmony::modules::fleet::operator::{Device, Reachability};
/// A device with no heartbeat within this window is `Stale`. Agents
/// ping every 30 s, so this tolerates ~2 missed pings.
const STALE_AFTER: Duration = Duration::from_secs(90);
/// How often to re-evaluate freshness and patch changed devices.
const TICK: Duration = Duration::from_secs(30);
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> Result<()> {
let bucket = js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
..Default::default()
})
.await?;
let heartbeats: Mutex<HashMap<String, DateTime<Utc>>> = Mutex::new(HashMap::new());
let devices: Api<Device> = Api::all(client);
tokio::try_join!(
watch_heartbeats(&bucket, &heartbeats),
patch_loop(&devices, &heartbeats),
)?;
Ok(())
}
async fn watch_heartbeats(
bucket: &Store,
heartbeats: &Mutex<HashMap<String, DateTime<Utc>>>,
) -> Result<()> {
let mut watch = bucket.watch_with_history(">").await?;
tracing::info!("device-status: watching device-heartbeat KV");
while let Some(entry_res) = watch.next().await {
let entry = match entry_res {
Ok(e) => e,
Err(e) => {
tracing::warn!(error = %e, "device-status: watch delivery error");
continue;
}
};
match entry.operation {
Operation::Put => {
if let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&entry.value) {
heartbeats
.lock()
.await
.insert(hb.device_id.to_string(), hb.at);
}
}
Operation::Delete | Operation::Purge => {
if let Some(id) = entry.key.strip_prefix("heartbeat.") {
heartbeats.lock().await.remove(id);
}
}
}
}
Ok(())
}
async fn patch_loop(
devices: &Api<Device>,
heartbeats: &Mutex<HashMap<String, DateTime<Utc>>>,
) -> Result<()> {
// Last status written per device, to skip no-op patches.
let mut applied: HashMap<String, (Reachability, DateTime<Utc>)> = HashMap::new();
let mut ticker = tokio::time::interval(TICK);
loop {
ticker.tick().await;
let snapshot: Vec<(String, DateTime<Utc>)> = heartbeats
.lock()
.await
.iter()
.map(|(k, v)| (k.clone(), *v))
.collect();
let now = Utc::now();
for (id, at) in snapshot {
let reachability = reachability(at, now);
if applied.get(&id) == Some(&(reachability, at)) {
continue;
}
if patch_status(devices, &id, reachability, at).await {
applied.insert(id, (reachability, at));
}
}
}
}
fn reachability(last_heartbeat: DateTime<Utc>, now: DateTime<Utc>) -> Reachability {
if (now - last_heartbeat)
.to_std()
.is_ok_and(|d| d <= STALE_AFTER)
{
Reachability::Reachable
} else {
Reachability::Stale
}
}
/// Returns whether the patch succeeded (so we only cache applied state
/// on success and retry next tick otherwise).
async fn patch_status(
devices: &Api<Device>,
id: &str,
reachability: Reachability,
last_heartbeat: DateTime<Utc>,
) -> bool {
let status = json!({
"status": {
"lastHeartbeat": last_heartbeat.to_rfc3339(),
"reachability": reachability,
}
});
match devices
.patch_status(id, &PatchParams::default(), &Patch::Merge(&status))
.await
{
Ok(d) => {
tracing::debug!(device = %d.name_any(), ?reachability, "device-status: patched");
true
}
// A heartbeat can outrace the Device CR's creation by the
// device-reconciler; skip this tick and retry on the next.
Err(kube::Error::Api(ae)) if ae.code == 404 => false,
Err(e) => {
tracing::warn!(%id, error = %e, "device-status: patch failed");
false
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn reachable_within_window_stale_after() {
let now = Utc::now();
assert_eq!(
reachability(now - chrono::Duration::seconds(10), now),
Reachability::Reachable
);
assert_eq!(
reachability(now - chrono::Duration::seconds(120), now),
Reachability::Stale
);
}
}

View File

@@ -14,4 +14,5 @@
pub mod commands;
pub mod device_reconciler;
pub mod device_status;
pub mod fleet_aggregator;

View File

@@ -5,7 +5,7 @@ mod frontend;
#[cfg(feature = "web-frontend")]
mod service;
use harmony_fleet_operator::{device_reconciler, fleet_aggregator};
use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator};
use anyhow::{Context, Result};
use async_nats::jetstream;
@@ -109,6 +109,9 @@ async fn main() -> Result<()> {
}
}
/// `serve-web` subcommand: dashboard on its own (mock data, or the live
/// CR-reading service). The deployed operator instead serves the
/// dashboard alongside the reconcile loop — see [`spawn_dashboard`].
#[cfg(feature = "web-frontend")]
async fn serve_web(
mock: bool,
@@ -117,22 +120,46 @@ async fn serve_web(
live_reload: bool,
) -> Result<()> {
use std::sync::Arc;
use std::time::Duration;
use frontend::server::{AppState, Config};
use service::{FleetService, mock::MockFleetService};
use service::{FleetService, mock::MockFleetService, real::RealFleetService};
let fleet: Arc<dyn FleetService> = if mock {
Arc::new(MockFleetService::default())
} else {
anyhow::bail!(
"serve-web without --mock is not implemented yet (real FleetService impl pending). \
Pass --mock for the dev workflow."
);
Arc::new(RealFleetService::new(Client::try_default().await?))
};
serve_dashboard(fleet, addr, css_from, live_reload).await
}
let cookie_key = harmony_zitadel_auth::cookie_key_from_env();
let config = harmony_zitadel_auth::config_from_env();
/// Build the Zitadel-authenticated web server around a [`FleetService`]
/// and serve until it exits. Shared by the `serve-web` subcommand and
/// the in-process dashboard the reconcile loop spawns.
#[cfg(feature = "web-frontend")]
async fn serve_dashboard(
fleet: std::sync::Arc<dyn service::FleetService>,
addr: std::net::SocketAddr,
css_from: Option<PathBuf>,
live_reload: bool,
) -> Result<()> {
use std::time::Duration;
use frontend::server::{AppState, Config};
use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
// One typed config value each, resolved EnvSource → OpenBao by
// ConfigClient — not a fistful of FLEET_AUTH_* env vars. Namespace
// only matters for the OpenBao source; EnvSource reads regardless.
let ns = std::env::var("HARMONY_CONFIG_NAMESPACE").unwrap_or_else(|_| "fleet-staging".into());
let cc = harmony_config::ConfigClient::for_namespace(&ns).await;
let config: ZitadelAuthConfig = cc
.get()
.await
.context("loading ZitadelAuthConfig (HARMONY_CONFIG_ZitadelAuthConfig or OpenBao)")?;
let cookie_key = cc
.get::<OperatorCookieKey>()
.await
.context("loading OperatorCookieKey")?
.key()?;
let http_client = reqwest::Client::new();
let jwks = harmony_zitadel_auth::JwksCache::new(&config.zitadel_base, http_client.clone())
@@ -155,6 +182,25 @@ async fn serve_web(
.await
}
/// Serve the dashboard in-process alongside the reconcile loop. Failure
/// (e.g. Zitadel not yet reachable for JWKS) is logged but never tears
/// down reconcile — the read UI is best-effort, the controller is not.
#[cfg(feature = "web-frontend")]
fn spawn_dashboard(client: Client) {
use std::net::SocketAddr;
use std::sync::Arc;
use service::real::RealFleetService;
let addr = SocketAddr::from(([0, 0, 0, 0], frontend::server::DEFAULT_PORT));
tokio::spawn(async move {
let fleet = Arc::new(RealFleetService::new(client));
if let Err(e) = serve_dashboard(fleet, addr, None, false).await {
tracing::error!(error = %e, "dashboard server exited; reconcile continues");
}
});
}
async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> {
let nats = connect_with_retry(nats_url, credentials_toml).await?;
tracing::info!(url = %nats_url, "connected to NATS");
@@ -169,9 +215,16 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
let client = Client::try_default().await?;
// Three concurrent tasks:
// Serve the read-only dashboard in the same process (best-effort;
// it reads CRs only, no NATS). Built only with the web-frontend
// feature; absent from the lean reconcile-only image.
#[cfg(feature = "web-frontend")]
spawn_dashboard(client.clone());
// Concurrent tasks:
// controller — CR validation + finalizer-cleanup
// device_reconciler — NATS device-info → Device CR
// device_status — NATS device-heartbeat → Device.status liveness
// fleet_aggregator — watches Deployments + Devices + states,
// writes desired-state KV, patches CR status
// Any failing tears the process down; kube-rs Controller swallows
@@ -179,9 +232,12 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
let ctl_client = client.clone();
let dr_client = client.clone();
let dr_js = js.clone();
let ds_client = client.clone();
let ds_js = js.clone();
tokio::select! {
r = controller::run(ctl_client, desired_state_kv) => r,
r = device_reconciler::run(dr_client, dr_js) => r,
r = device_status::run(ds_client, ds_js) => r,
r = fleet_aggregator::run(client, js) => r,
}
}

View File

@@ -1,4 +1,5 @@
pub mod mock;
pub mod real;
use async_trait::async_trait;
use chrono::{DateTime, Utc};

View File

@@ -0,0 +1,461 @@
//! Live [`FleetService`] as a read-only projection of Kubernetes CRs.
//!
//! The operator is the write side: `device_reconciler` materializes
//! `Device` CRs (labels + inventory), `device_status` reflects liveness
//! onto `Device.status`, and `fleet_aggregator` writes
//! `Deployment.status.aggregate`. This dashboard is the read side — it
//! only reads those CRs and projects them to view DTOs. No NATS: the
//! CR is the single contract between the two sides.
use std::collections::{BTreeMap, HashSet};
use std::sync::Mutex;
use anyhow::Context as _;
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use kube::api::{Api, ListParams, Patch, PatchParams};
use kube::{Client, ResourceExt};
use serde_json::json;
use harmony::modules::fleet::operator::{
Deployment as DeploymentCr, Device as DeviceCr, DeviceStatus as DeviceLiveness, Reachability,
};
use harmony::modules::podman::ReconcileScore;
use harmony_fleet_operator::fleet_aggregator::selector_matches;
use super::{
Alert, AlertSeverity, DashboardDetail, DeploymentDetail, DeploymentStatus, DeviceDetail,
DeviceStatus, FleetService,
};
/// Label the operator UI sets to quarantine a device.
const BLACKLIST_LABEL: &str = "fleet.nationtech.io/blacklisted";
/// Label key the agent uses for a device's region, if it sets one.
const REGION_LABEL: &str = "region";
pub struct RealFleetService {
kube: Client,
/// In-memory ack set. Alerts are derived from live CR state and
/// have no store of their own, so acks don't survive a restart.
acked_alerts: Mutex<HashSet<String>>,
}
impl RealFleetService {
pub fn new(kube: Client) -> Self {
Self {
kube,
acked_alerts: Mutex::new(HashSet::new()),
}
}
async fn device_crs(&self) -> anyhow::Result<Vec<DeviceCr>> {
let api: Api<DeviceCr> = Api::all(self.kube.clone());
Ok(api.list(&ListParams::default()).await?.items)
}
async fn deployment_crs(&self) -> anyhow::Result<Vec<DeploymentCr>> {
let api: Api<DeploymentCr> = Api::all(self.kube.clone());
Ok(api.list(&ListParams::default()).await?.items)
}
async fn devices(&self) -> anyhow::Result<Vec<DeviceDetail>> {
let deployments = self.deployment_crs().await?;
let now = Utc::now();
let mut devices: Vec<DeviceDetail> = self
.device_crs()
.await?
.iter()
.map(|cr| map_device(cr, &deployments, now))
.collect();
devices.sort_by(|a, b| a.id.cmp(&b.id));
Ok(devices)
}
async fn deployments(&self) -> anyhow::Result<Vec<DeploymentDetail>> {
let mut deployments: Vec<DeploymentDetail> = self
.deployment_crs()
.await?
.iter()
.map(map_deployment)
.collect();
deployments.sort_by(|a, b| a.name.cmp(&b.name));
Ok(deployments)
}
}
fn map_device(cr: &DeviceCr, deployments: &[DeploymentCr], now: DateTime<Utc>) -> DeviceDetail {
let labels = cr.metadata.labels.clone().unwrap_or_default();
let blacklisted = labels.get(BLACKLIST_LABEL).map(String::as_str) == Some("true");
let last_seen = cr
.status
.as_ref()
.and_then(|s| s.last_heartbeat.as_deref())
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
.map(|dt| dt.with_timezone(&Utc))
.or_else(|| cr.creation_timestamp().map(|t| t.0))
.unwrap_or(now);
DeviceDetail {
id: cr.name_any(),
status: device_status(blacklisted, cr.status.as_ref()),
last_seen,
minutes_ago: (now - last_seen).num_minutes().max(0),
deployment: primary_deployment(&labels, deployments),
region: labels
.get(REGION_LABEL)
.cloned()
.unwrap_or_else(|| "\u{2014}".to_string()),
tags: tags_from_labels(&labels),
inventory: cr.spec.inventory.clone(),
}
}
/// UI status from the device's quarantine label + operator-written
/// liveness. Failing/Pending are deployment-level (the Deployments
/// view), not device liveness, so they don't appear here.
fn device_status(blacklisted: bool, liveness: Option<&DeviceLiveness>) -> DeviceStatus {
if blacklisted {
return DeviceStatus::Blacklisted;
}
match liveness.map(|s| s.reachability) {
Some(Reachability::Reachable) => DeviceStatus::Healthy,
Some(Reachability::Stale) => DeviceStatus::Stale,
Some(Reachability::Unknown) | None => DeviceStatus::Unknown,
}
}
/// First deployment (by name) whose selector matches the device — the
/// canonical [`selector_matches`] over CR labels, the same matcher the
/// aggregator uses. No reconstruction.
fn primary_deployment(
labels: &BTreeMap<String, String>,
deployments: &[DeploymentCr],
) -> Option<String> {
let mut matched: Vec<String> = deployments
.iter()
.filter(|d| selector_matches(&d.spec.target_selector, labels))
.map(ResourceExt::name_any)
.collect();
matched.sort();
matched.into_iter().next()
}
/// Routing labels rendered as `k=v` chips, minus internal keys.
fn tags_from_labels(labels: &BTreeMap<String, String>) -> Vec<String> {
labels
.iter()
.filter(|(k, _)| k.as_str() != BLACKLIST_LABEL && k.as_str() != REGION_LABEL)
.map(|(k, v)| format!("{k}={v}"))
.collect()
}
fn map_deployment(cr: &DeploymentCr) -> DeploymentDetail {
let agg = cr
.status
.as_ref()
.and_then(|s| s.aggregate.clone())
.unwrap_or_default();
let status = if agg.failed > 0 {
DeploymentStatus::Failing
} else if agg.pending > 0 {
DeploymentStatus::Rolling
} else {
DeploymentStatus::Active
};
DeploymentDetail {
name: cr.name_any(),
version: deployment_version(&cr.spec.score),
status,
target: agg.matched_device_count,
healthy: agg.succeeded,
failing: agg.failed,
pending: agg.pending,
updated_at: cr
.creation_timestamp()
.map(|t| t.0.format("%Y-%m-%d %H:%M").to_string())
.unwrap_or_else(|| "\u{2014}".to_string()),
}
}
/// No first-class version on the CR; use the first service's image tag
/// as the human-facing proxy (`nginx:1.25` → `1.25`).
fn deployment_version(score: &ReconcileScore) -> String {
let ReconcileScore::PodmanV0(s) = score;
s.services
.first()
.map(|svc| {
svc.image
.rsplit_once(':')
.map(|(_, tag)| tag.to_string())
.unwrap_or_else(|| svc.image.clone())
})
.unwrap_or_else(|| "\u{2014}".to_string())
}
/// Alerts derived from current CR state (no alert store yet): one
/// critical per failing deployment, one warning per stale device.
fn derive_alerts(
devices: &[DeviceDetail],
deployments: &[DeploymentDetail],
acked: &HashSet<String>,
) -> Vec<Alert> {
let mut alerts = Vec::new();
for d in deployments {
if d.failing > 0 {
let id = format!("dep:{}:failing", d.name);
alerts.push(Alert {
acked: acked.contains(&id),
id,
severity: AlertSeverity::Critical,
title: format!("{} has {} failing device(s)", d.name, d.failing),
deployment: Some(d.name.clone()),
device: None,
at: String::new(),
});
}
}
for dev in devices {
if dev.status == DeviceStatus::Stale {
let id = format!("device:{}:stale", dev.id);
alerts.push(Alert {
acked: acked.contains(&id),
id,
severity: AlertSeverity::Warning,
title: format!("{} last pinged {}m ago", dev.id, dev.minutes_ago),
deployment: dev.deployment.clone(),
device: Some(dev.id.clone()),
at: String::new(),
});
}
}
alerts
}
#[async_trait]
impl FleetService for RealFleetService {
async fn dashboard_detail(&self) -> anyhow::Result<DashboardDetail> {
let devices = self.devices().await?;
let mut deployments = self.deployments().await?;
let acked = self.acked_alerts.lock().unwrap().clone();
let active_alerts = derive_alerts(&devices, &deployments, &acked)
.into_iter()
.filter(|a| !a.acked)
.collect();
let mut d = DashboardDetail {
devices_total: devices.len() as u32,
devices_healthy: 0,
devices_pending: 0,
devices_failing: 0,
devices_stale: 0,
devices_blacklisted: 0,
devices_unknown: 0,
deployments_total: deployments.len(),
health_pct: 0,
attention_devices: devices
.iter()
.filter(|d| matches!(d.status, DeviceStatus::Stale | DeviceStatus::Unknown))
.take(12)
.cloned()
.collect(),
top_deployments: Vec::new(),
active_alerts,
rolling_count: 0,
failing_count: 0,
};
for dev in &devices {
match dev.status {
DeviceStatus::Healthy => d.devices_healthy += 1,
DeviceStatus::Pending => d.devices_pending += 1,
DeviceStatus::Stale => d.devices_stale += 1,
DeviceStatus::Failing => d.devices_failing += 1,
DeviceStatus::Blacklisted => d.devices_blacklisted += 1,
DeviceStatus::Unknown => d.devices_unknown += 1,
}
}
if d.devices_total > 0 {
d.health_pct =
((d.devices_healthy as f64 / d.devices_total as f64) * 100.0).round() as u32;
}
for dep in &deployments {
match dep.status {
DeploymentStatus::Rolling => d.rolling_count += 1,
DeploymentStatus::Failing => d.failing_count += 1,
_ => {}
}
}
deployments.truncate(4);
d.top_deployments = deployments;
Ok(d)
}
async fn list_devices(&self) -> anyhow::Result<Vec<DeviceDetail>> {
self.devices().await
}
async fn get_device(&self, id: &str) -> anyhow::Result<Option<DeviceDetail>> {
Ok(self.devices().await?.into_iter().find(|d| d.id == id))
}
async fn list_deployments(&self) -> anyhow::Result<Vec<DeploymentDetail>> {
self.deployments().await
}
async fn get_deployment(&self, name: &str) -> anyhow::Result<Option<DeploymentDetail>> {
Ok(self
.deployments()
.await?
.into_iter()
.find(|d| d.name == name))
}
async fn get_deployment_devices(&self, name: &str) -> anyhow::Result<Vec<DeviceDetail>> {
let deployments = self.deployment_crs().await?;
let Some(cr) = deployments.iter().find(|c| c.name_any() == name) else {
return Ok(Vec::new());
};
let selector = cr.spec.target_selector.clone();
let now = Utc::now();
Ok(self
.device_crs()
.await?
.iter()
.filter(|dev| {
selector_matches(&selector, &dev.metadata.labels.clone().unwrap_or_default())
})
.map(|dev| map_device(dev, &deployments, now))
.collect())
}
async fn blacklist_device(&self, id: &str) -> anyhow::Result<DeviceDetail> {
let api: Api<DeviceCr> = Api::all(self.kube.clone());
let patch = json!({ "metadata": { "labels": { BLACKLIST_LABEL: "true" } } });
api.patch(id, &PatchParams::default(), &Patch::Merge(&patch))
.await
.with_context(|| format!("patching blacklist label on device {id}"))?;
self.get_device(id)
.await?
.ok_or_else(|| anyhow::anyhow!("device {id} not found after blacklist"))
}
async fn list_alerts(&self) -> anyhow::Result<Vec<Alert>> {
let devices = self.devices().await?;
let deployments = self.deployments().await?;
let acked = self.acked_alerts.lock().unwrap().clone();
Ok(derive_alerts(&devices, &deployments, &acked))
}
async fn ack_alert(&self, id: &str) -> anyhow::Result<bool> {
Ok(self.acked_alerts.lock().unwrap().insert(id.to_string()))
}
async fn run_command(&self, _device_id: &str, command: &str) -> anyhow::Result<String> {
// Seam only: the device round-trip (publish to the agent over
// NATS, stream stdout/stderr back) needs agent-side support.
Ok(format!(
"$ {command}\n[device command transport not implemented yet]"
))
}
async fn filtered_devices(
&self,
status: Option<DeviceStatus>,
deployment: Option<String>,
region: Option<String>,
search: Option<String>,
) -> anyhow::Result<Vec<DeviceDetail>> {
let search = search.map(|s| s.to_lowercase());
Ok(self
.devices()
.await?
.into_iter()
.filter(|d| {
status.is_none_or(|s| d.status == s)
&& deployment
.as_deref()
.is_none_or(|dep| d.deployment.as_deref() == Some(dep))
&& region.as_deref().is_none_or(|r| d.region == r)
&& search.as_deref().is_none_or(|q| {
d.id.to_lowercase().contains(q)
|| d.tags.iter().any(|t| t.to_lowercase().contains(q))
|| d.deployment
.as_deref()
.is_some_and(|dep| dep.to_lowercase().contains(q))
})
})
.collect())
}
}
#[cfg(test)]
mod tests {
use super::*;
use harmony::modules::podman::{PodmanService, PodmanV0Score};
fn liveness(r: Reachability) -> DeviceLiveness {
DeviceLiveness {
last_heartbeat: None,
reachability: r,
}
}
#[test]
fn device_status_maps_liveness_and_blacklist() {
assert_eq!(
device_status(true, Some(&liveness(Reachability::Reachable))),
DeviceStatus::Blacklisted
);
assert_eq!(
device_status(false, Some(&liveness(Reachability::Reachable))),
DeviceStatus::Healthy
);
assert_eq!(
device_status(false, Some(&liveness(Reachability::Stale))),
DeviceStatus::Stale
);
assert_eq!(device_status(false, None), DeviceStatus::Unknown);
}
#[test]
fn version_from_image_tag() {
let score = ReconcileScore::PodmanV0(PodmanV0Score {
services: vec![PodmanService {
name: "web".into(),
image: "nginx:1.25".into(),
ports: vec![],
env: vec![],
volumes: vec![],
restart_policy: Default::default(),
}],
});
assert_eq!(deployment_version(&score), "1.25");
}
#[test]
fn alerts_from_failing_and_stale() {
let devices = vec![DeviceDetail {
id: "pi-09".into(),
status: DeviceStatus::Stale,
last_seen: Utc::now(),
minutes_ago: 14,
deployment: None,
region: "\u{2014}".into(),
tags: vec![],
inventory: None,
}];
let deployments = vec![DeploymentDetail {
name: "edge".into(),
version: "1.0".into(),
status: DeploymentStatus::Failing,
target: 3,
healthy: 1,
failing: 2,
pending: 0,
updated_at: "\u{2014}".into(),
}];
let alerts = derive_alerts(&devices, &deployments, &HashSet::new());
assert_eq!(alerts.len(), 2);
assert!(alerts.iter().any(|a| a.severity == AlertSeverity::Critical));
assert!(alerts.iter().any(|a| a.severity == AlertSeverity::Warning));
}
}

View File

@@ -0,0 +1,76 @@
#!/usr/bin/env bash
# Build + push + deploy the fleet operator to staging in one shot — the
# fast inner loop that skips the git-tag → CI → release ceremony.
#
# It runs the SAME two binaries the release/CD path uses, just with a
# throwaway dev version:
# 1. harmony-fleet-publish — docker-build the operator image (with the
# web dashboard), generate the helm chart, push BOTH to the OCI
# registry.
# 2. harmony-fleet-deploy — helm upgrade --install that chart onto
# staging, applying the dashboard ingress + Service + cert-manager
# cert (host/issuer come from FleetDeployConfig).
#
# Each run gets a unique semver-dev version, so the node always pulls a
# fresh image and helm rolls a new ReplicaSet — no `:dev`-tag cache traps.
# The first build is a full compile; thereafter the Dockerfile's BuildKit
# cache mounts recompile only what changed (seconds).
#
# Usage:
# ./fleet/scripts/dev-deploy-operator.sh
# REGISTRY=hub.nationtech.io PROJECT=harmony ./fleet/scripts/dev-deploy-operator.sh
# PUBLISH_ONLY=1 ./fleet/scripts/dev-deploy-operator.sh # build+push, skip deploy
#
# Prerequisites (same as the release path):
# - docker (BuildKit) + helm on PATH, logged in to $REGISTRY.
# - The deploy reads FleetDeployConfig + FleetDeploySecrets for
# $CONFIG_NAMESPACE (Env → OpenBao). Provide them however you already
# do for `harmony-fleet-deploy` — e.g. a sourced `.envrc` with
# OPENBAO_URL / OPENBAO_TOKEN / HARMONY_CONFIG_NAMESPACE, and a
# KUBECONFIG pointed at staging (or a kubeconfig seeded in OpenBao).
set -euo pipefail
REGISTRY="${REGISTRY:-hub.nationtech.io}"
PROJECT="${PROJECT:-harmony}"
CONFIG_NAMESPACE="${CONFIG_NAMESPACE:-fleet-staging}"
PUBLISH_ONLY="${PUBLISH_ONLY:-0}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
cd "$REPO_ROOT"
# Pick up a local .envrc (OpenBao creds, KUBECONFIG, …) if present and
# not already loaded by direnv.
if [[ -f "$REPO_ROOT/.envrc" ]]; then
# shellcheck disable=SC1091
source "$REPO_ROOT/.envrc"
fi
# Unique + semver-valid: helm rejects a non-semver chart version, and a
# fresh version forces a clean image pull + rollout every iteration.
VERSION="0.0.0-dev.$(date -u +'%Y%m%d%H%M%S')"
export DOCKER_BUILDKIT=1
echo "==> [1/2] build + push image and chart @ ${VERSION}"
cargo run -q -p harmony-fleet-deploy --bin harmony-fleet-publish -- \
--version "$VERSION" \
--registry "$REGISTRY" \
--project "$PROJECT"
if [[ "$PUBLISH_ONLY" == "1" ]]; then
echo "==> PUBLISH_ONLY=1, skipping deploy. Chart: oci://${REGISTRY}/${PROJECT}/harmony-fleet-operator-chart:${VERSION}"
exit 0
fi
echo "==> [2/2] deploy chart @ ${VERSION} to ${CONFIG_NAMESPACE}"
cargo run -q -p harmony-fleet-deploy --bin harmony-fleet-deploy -- \
--operator-chart-version "$VERSION" \
--operator-chart-registry "$REGISTRY" \
--operator-chart-project "$PROJECT" \
--config-namespace "$CONFIG_NAMESPACE" \
--yes
echo
echo "Deployed ${REGISTRY}/${PROJECT}/harmony-fleet-operator:${VERSION} to ${CONFIG_NAMESPACE}."

View File

@@ -95,7 +95,8 @@ pub struct AggregateLastError {
version = "v1alpha1",
kind = "Device",
plural = "devices",
shortname = "fleetdev"
shortname = "fleetdev",
status = "DeviceStatus"
)]
#[serde(rename_all = "camelCase")]
pub struct DeviceSpec {
@@ -104,3 +105,31 @@ pub struct DeviceSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub inventory: Option<InventorySnapshot>,
}
/// Operator-maintained liveness reflection of the NATS
/// `device-heartbeat` bucket onto the CR, so `kubectl get devices` and
/// the dashboard see reachability without reading NATS. Written by the
/// device-status reconciler.
#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DeviceStatus {
/// RFC 3339 timestamp of the last heartbeat seen. `None` until the
/// device has pinged at least once.
#[serde(skip_serializing_if = "Option::is_none")]
pub last_heartbeat: Option<String>,
pub reachability: Reachability,
}
/// Coarse liveness derived from heartbeat freshness. Failing/Pending
/// are deployment-level concerns (see [`DeploymentAggregate`]), not
/// device liveness.
#[derive(Serialize, Deserialize, Clone, Copy, Debug, Default, PartialEq, Eq, JsonSchema)]
pub enum Reachability {
/// No heartbeat seen yet.
#[default]
Unknown,
/// Heartbeat within the freshness window.
Reachable,
/// Last heartbeat is older than the freshness window.
Stale,
}

View File

@@ -13,5 +13,5 @@ pub mod crd;
pub use crd::{
AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device,
DeviceSpec, Rollout, RolloutStrategy,
DeviceSpec, DeviceStatus, Reachability, Rollout, RolloutStrategy,
};

View File

@@ -1,10 +1,15 @@
use std::collections::BTreeMap;
use async_trait::async_trait;
use harmony_macros::ingress_path;
use harmony_types::id::Id;
use k8s_openapi::api::networking::v1::Ingress;
use log::{debug, trace};
use k8s_openapi::api::networking::v1::{
HTTPIngressPath, HTTPIngressRuleValue, Ingress, IngressBackend, IngressRule,
IngressServiceBackend, IngressSpec, IngressTLS, ServiceBackendPort,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use log::debug;
use serde::Serialize;
use serde_json::json;
use crate::{
data::Version,
@@ -59,70 +64,71 @@ impl K8sIngressScore {
/// Extracted from `create_interpret` so the TLS/annotation shape is
/// unit-testable without a topology.
fn render_ingress(&self) -> Ingress {
let path = match self.path.clone() {
Some(p) => p,
None => ingress_path!("/"),
};
let path_type = match self.path_type.clone() {
Some(p) => p,
None => PathType::Prefix,
};
let ingress_class = match self.ingress_class_name.clone() {
Some(ingress_class_name) => ingress_class_name,
None => "\"default\"".to_string(),
};
let mut ingress = json!(
{
"metadata": {
"name": self.name.to_string(),
},
"spec": {
"ingressClassName": ingress_class.as_str(),
"rules": [
{ "host": self.host.to_string(),
"http": {
"paths": [
{
"path": path,
"pathType": path_type.as_str(),
"backend": {
"service": {
"name": self.backend_service.to_string(),
"port": {
"number": self.port,
}
}
}
}
]
}
}
]
}
}
);
let path = self.path.clone().unwrap_or_else(|| ingress_path!("/"));
let path_type = self.path_type.clone().unwrap_or(PathType::Prefix);
let host = self.host.to_string();
// cert-manager TLS: the annotation issues the cert into
// `secretName`, and the `tls` block must name that same Secret
// for the Ingress to be portable (a bare `tls` host without a
// secretName is rejected by OKD's ingress-to-route translation).
if let Some(issuer) = &self.cluster_issuer {
let secret_name = format!("{}-tls", self.host.to_string().replace('.', "-"));
ingress["metadata"]["annotations"] = json!({
"cert-manager.io/cluster-issuer": issuer,
"route.openshift.io/termination": "edge",
});
ingress["spec"]["tls"] = json!([{
"hosts": [self.host.to_string()],
"secretName": secret_name,
}]);
}
let (annotations, tls) = match &self.cluster_issuer {
Some(issuer) => {
let secret_name = format!("{}-tls", host.replace('.', "-"));
let annotations = BTreeMap::from([
("cert-manager.io/cluster-issuer".to_string(), issuer.clone()),
// OKD edge termination — a no-op on non-OpenShift clusters.
(
"route.openshift.io/termination".to_string(),
"edge".to_string(),
),
]);
let tls = vec![IngressTLS {
hosts: Some(vec![host.clone()]),
secret_name: Some(secret_name),
}];
(Some(annotations), Some(tls))
}
None => (None, None),
};
let ingress = Ingress {
metadata: ObjectMeta {
name: Some(self.name.to_string()),
annotations,
..Default::default()
},
spec: Some(IngressSpec {
// Left unset when `None` so the cluster's default
// IngressClass claims the resource — setting it to a
// bogus/empty value instead leaves the Ingress unrouted
// (see docs/guides/kubernetes-ingress.md).
ingress_class_name: self.ingress_class_name.clone(),
rules: Some(vec![IngressRule {
host: Some(host),
http: Some(HTTPIngressRuleValue {
paths: vec![HTTPIngressPath {
path: Some(path),
path_type: path_type.as_str().to_string(),
backend: IngressBackend {
service: Some(IngressServiceBackend {
name: self.backend_service.to_string(),
port: Some(ServiceBackendPort {
name: None,
number: Some(i32::from(self.port)),
}),
}),
resource: None,
},
}],
}),
}]),
tls,
..Default::default()
}),
..Default::default()
};
trace!("Building ingresss object from Value {ingress:#}");
let ingress: Ingress = serde_json::from_value(ingress).unwrap();
debug!(
"Successfully built Ingress for host {:?}",
ingress.metadata.name
@@ -231,6 +237,37 @@ mod tests {
assert!(ing.spec.unwrap().tls.is_none());
}
#[test]
fn ingress_class_omitted_when_none() {
// Unset (not "default", not "") so the cluster's default
// IngressClass claims the resource.
let ing = score(None).render_ingress();
assert!(ing.spec.unwrap().ingress_class_name.is_none());
}
#[test]
fn ingress_class_passed_through_when_set() {
let mut s = score(None);
s.ingress_class_name = Some("openshift-default".into());
let ing = s.render_ingress();
assert_eq!(
ing.spec.unwrap().ingress_class_name.as_deref(),
Some("openshift-default")
);
}
#[test]
fn backend_and_path_render() {
let ing = score(None).render_ingress();
let rule = &ing.spec.unwrap().rules.unwrap()[0];
assert_eq!(rule.host.as_deref(), Some("fleet-stg.cb1.nationtech.io"));
let p = &rule.http.as_ref().unwrap().paths[0];
assert_eq!(p.path_type, "Prefix");
let svc = p.backend.service.as_ref().unwrap();
assert_eq!(svc.name, "op");
assert_eq!(svc.port.as_ref().unwrap().number, Some(18080));
}
#[test]
fn cluster_issuer_renders_cert_manager_and_tls() {
let ing = score(Some("letsencrypt-prod".into())).render_ingress();

View File

@@ -10,12 +10,14 @@ default = []
axum = ["dep:axum", "dep:axum-extra"]
[dependencies]
harmony_config = { path = "../harmony_config" }
anyhow.workspace = true
base64.workspace = true
chrono = { workspace = true, features = ["serde"] }
rand.workspace = true
reqwest.workspace = true
serde.workspace = true
schemars = "0.8"
serde = { workspace = true, features = ["derive"] }
serde_json.workspace = true
sha2 = "0.10"
url.workspace = true

View File

@@ -1,4 +1,11 @@
#[derive(Debug, Clone)]
use harmony_config::Config;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Operator web-frontend auth config. Loaded via
/// [`harmony_config::ConfigClient`] (env → OpenBao), so the operator
/// reads one typed value instead of a fistful of `FLEET_AUTH_*` env vars.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Config)]
pub struct ZitadelAuthConfig {
pub zitadel_base: String,
pub base_url: String,
@@ -33,43 +40,27 @@ impl ZitadelAuthConfig {
}
}
pub const ZITADEL_BASE_ENV: &str = "FLEET_AUTH_ZITADEL_BASE";
pub const BASE_URL_ENV: &str = "BASE_URL";
pub const CLIENT_ID_ENV: &str = "FLEET_AUTH_CLIENT_ID";
pub const SCOPE_ENV: &str = "FLEET_AUTH_SCOPE";
pub const TRUSTED_AUDIENCES_ENV: &str = "FLEET_AUTH_TRUSTED_AUDIENCES";
pub const LOGOUT_REDIRECT_URI_ENV: &str = "FLEET_AUTH_LOGOUT_REDIRECT_URI";
pub const COOKIE_KEY_ENV: &str = "FLEET_OPERATOR_COOKIE_KEY_B64";
pub fn config_from_env() -> ZitadelAuthConfig {
ZitadelAuthConfig {
zitadel_base: required_env(ZITADEL_BASE_ENV),
base_url: required_env(BASE_URL_ENV),
client_id: required_env(CLIENT_ID_ENV),
scope: required_env(SCOPE_ENV),
trusted_audiences: required_env(TRUSTED_AUDIENCES_ENV)
.split(',')
.map(str::to_string)
.collect(),
logout_redirect_uri: required_env(LOGOUT_REDIRECT_URI_ENV),
}
/// Operator session-cookie signing key: standard-base64 of ≥64 random
/// bytes. Secret-class, so it resolves from OpenBao / a k8s Secret —
/// never cleartext config. Whoever holds it can forge sessions.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Config)]
#[config(secret)]
pub struct OperatorCookieKey {
pub cookie_key_b64: String,
}
#[cfg(feature = "axum")]
pub fn cookie_key_from_env() -> axum_extra::extract::cookie::Key {
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
impl OperatorCookieKey {
pub fn key(&self) -> anyhow::Result<axum_extra::extract::cookie::Key> {
use base64::Engine;
use base64::engine::general_purpose::STANDARD;
let encoded = required_env(COOKIE_KEY_ENV);
let bytes = STANDARD
.decode(encoded.trim())
.unwrap_or_else(|e| panic!("{COOKIE_KEY_ENV} must be standard base64: {e}"));
if bytes.len() < 64 {
panic!("{COOKIE_KEY_ENV} must decode to at least 64 bytes for private cookies");
let bytes = STANDARD
.decode(self.cookie_key_b64.trim())
.map_err(|e| anyhow::anyhow!("cookie_key_b64 must be standard base64: {e}"))?;
if bytes.len() < 64 {
anyhow::bail!("cookie_key_b64 must decode to at least 64 bytes for private cookies");
}
Ok(axum_extra::extract::cookie::Key::from(&bytes))
}
axum_extra::extract::cookie::Key::from(&bytes)
}
fn required_env(name: &str) -> String {
std::env::var(name).unwrap_or_else(|_| panic!("missing required environment variable {name}"))
}

View File

@@ -5,12 +5,7 @@ pub mod jwks;
pub mod login;
pub mod session;
#[cfg(feature = "axum")]
pub use config::cookie_key_from_env;
pub use config::{
BASE_URL_ENV, CLIENT_ID_ENV, COOKIE_KEY_ENV, LOGOUT_REDIRECT_URI_ENV, SCOPE_ENV,
TRUSTED_AUDIENCES_ENV, ZITADEL_BASE_ENV, ZitadelAuthConfig, config_from_env,
};
pub use config::{OperatorCookieKey, ZitadelAuthConfig};
pub use jwks::JwksCache;