feat(fleet-operator): real dashboard data from kube CRs + NATS KV #322
5
Cargo.lock
generated
5
Cargo.lock
generated
@@ -3498,6 +3498,7 @@ name = "example_fleet_staging_install"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"anyhow",
|
||||
"base64 0.22.1",
|
||||
"clap",
|
||||
"harmony",
|
||||
"harmony-fleet-deploy",
|
||||
@@ -4104,6 +4105,7 @@ dependencies = [
|
||||
"harmony_config",
|
||||
"harmony_macros",
|
||||
"harmony_types",
|
||||
"harmony_zitadel_auth",
|
||||
"k8s-openapi",
|
||||
"kube",
|
||||
"log",
|
||||
@@ -4164,6 +4166,7 @@ dependencies = [
|
||||
"harmony",
|
||||
"harmony-fleet-auth",
|
||||
"harmony-reconciler-contracts",
|
||||
"harmony_config",
|
||||
"harmony_zitadel_auth",
|
||||
"k8s-openapi",
|
||||
"kube",
|
||||
@@ -4533,10 +4536,12 @@ dependencies = [
|
||||
"axum-extra",
|
||||
"base64 0.22.1",
|
||||
"chrono",
|
||||
"harmony_config",
|
||||
"jsonwebtoken",
|
||||
"openidconnect",
|
||||
"rand 0.9.4",
|
||||
"reqwest 0.12.28",
|
||||
"schemars 0.8.22",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"sha2 0.10.9",
|
||||
|
||||
@@ -24,6 +24,7 @@
|
||||
- [Writing a Topology](./guides/writing-a-topology.md)
|
||||
- [Adding Capabilities](./guides/adding-capabilities.md)
|
||||
- [Web Authentication and CSRF Security](./guides/web-auth-security.md)
|
||||
- [Operator Dashboard SSO (Zitadel) — setup](./guides/operator-dashboard-sso.md)
|
||||
|
||||
## Configuration
|
||||
|
||||
|
||||
55
docs/guides/operator-dashboard-sso.md
Normal file
55
docs/guides/operator-dashboard-sso.md
Normal file
@@ -0,0 +1,55 @@
|
||||
# Operator Dashboard SSO (Zitadel) — setup
|
||||
|
||||
Browser SSO for the fleet operator web UI (OIDC Authorization Code + PKCE,
|
||||
public client). Distinct from the agent/callout machine auth
|
||||
([fleet-zitadel-faq](./fleet-zitadel-faq.md)); the security rationale is in
|
||||
[web-auth-security](./web-auth-security.md). Code: `harmony_zitadel_auth/`.
|
||||
|
||||
## Quickstart (staging)
|
||||
|
||||
1. **Zitadel app** — create a **Web** application, auth method **PKCE** (no client
|
||||
secret), redirect URI `https://fleet-stg.<base>/auth/callback`, post-logout URI
|
||||
`https://fleet-stg.<base>/`. Copy its **Client ID**.
|
||||
2. **Seed config** in OpenBao (namespace `fleet-staging`) — the deploy derives every
|
||||
host from `base_domain`, so you set only:
|
||||
- `FleetDeployConfig.operator_oidc_client_id` = the Client ID
|
||||
- `FleetDeployConfig.operator_trusted_audiences` = `["<Client ID>"]`
|
||||
- `FleetDeploySecrets.operator_cookie_key_b64` = `openssl rand -base64 64`
|
||||
3. **Deploy**: `./fleet/scripts/dev-deploy-operator.sh`
|
||||
4. Open `https://fleet-stg.<base>/` → Zitadel login → back to the dashboard.
|
||||
|
||||
`fleet_staging_install` already generates the cookie key, so a fresh install needs
|
||||
only the Client ID + audiences.
|
||||
|
||||
## Local dev (`serve-web`)
|
||||
|
||||
`fleet/harmony-fleet-operator/dev.sh` sets the same config as two
|
||||
`HARMONY_CONFIG_<TypeName>` env JSON blobs (ConfigClient's env source). Point
|
||||
`base_url` at `http://localhost:18080`, register that callback in the app, and turn
|
||||
on the app's **Development Mode** (Zitadel rejects non-HTTPS redirects otherwise).
|
||||
|
||||
## When login fails — check these first
|
||||
|
||||
- **`iss` mismatch** — `zitadel_base` must equal the token issuer byte-for-byte, no
|
||||
trailing slash.
|
||||
- **`aud` mismatch** — `trusted_audiences` must contain the token's `aud`; Zitadel
|
||||
puts the app's Client ID there by default.
|
||||
- **Client secret** — the app must be PKCE-only; the code never sends a secret.
|
||||
- **Redirect URI** — must be exactly `{base_url}/auth/callback`.
|
||||
- **Cookie key** — `cookie_key_b64` must decode to ≥64 bytes, else the dashboard
|
||||
refuses to start (`cookie_key_b64 must decode to at least 64 bytes`; reconcile
|
||||
keeps running).
|
||||
|
||||
## Config reference
|
||||
|
||||
The operator reads `ZitadelAuthConfig` + `OperatorCookieKey` via ConfigClient. The
|
||||
deploy derives `zitadel_base` / `base_url` / `logout_redirect_uri` from `base_domain`
|
||||
(`https://sso-stg.<base>`, `https://fleet-stg.<base>`, `…/`) and fixes
|
||||
`scope = openid profile email`; you supply `client_id`, `trusted_audiences`,
|
||||
`cookie_key_b64`. All endpoints derive from `zitadel_base`:
|
||||
`/.well-known/openid-configuration`, `/oauth/v2/authorize`, `/oauth/v2/token`,
|
||||
`/oidc/v1/end_session`.
|
||||
|
||||
> The dashboard only checks that the user authenticated — no role gate yet
|
||||
> ([web-auth-security](./web-auth-security.md) §3,
|
||||
> [ROADMAP/09](../../ROADMAP/09-sso-config-hardening.md)).
|
||||
@@ -2,6 +2,8 @@
|
||||
|
||||
These guidelines define the baseline for Harmony web frontends and future operator dashboards that use browser-based authentication, cookie sessions, Axum, HTMX, or OIDC providers such as Zitadel.
|
||||
|
||||
> Setting up the fleet operator dashboard's SSO concretely (Zitadel app + the exact config values)? See [Operator Dashboard SSO (Zitadel) — setup](./operator-dashboard-sso.md).
|
||||
|
||||
## Goals
|
||||
|
||||
- Prevent unauthenticated access.
|
||||
|
||||
@@ -680,6 +680,8 @@ key_json = """
|
||||
log_level: "info,kube_runtime=warn".to_string(),
|
||||
credentials: Some(OperatorCredentials { credentials_toml }),
|
||||
chart_version: None,
|
||||
web_auth_config_json: None,
|
||||
web_cookie_key_json: None,
|
||||
};
|
||||
|
||||
// CRDs first — the operator watches them on startup.
|
||||
|
||||
@@ -20,6 +20,7 @@ harmony-nats-callout = { path = "../../nats/callout" }
|
||||
harmony-fleet-deploy = { path = "../../fleet/harmony-fleet-deploy" }
|
||||
nkeys = "0.4"
|
||||
rand = "0.9"
|
||||
base64.workspace = true
|
||||
anyhow.workspace = true
|
||||
clap = { version = "4", features = ["derive", "env"] }
|
||||
tokio.workspace = true
|
||||
|
||||
@@ -347,6 +347,7 @@ path "secret/metadata/harmony/*" { capabilities = ["list","read"] }"#
|
||||
.set(&FleetDeploySecrets {
|
||||
operator_credentials_toml: credentials.credentials_toml.clone(),
|
||||
kubeconfig: None,
|
||||
operator_cookie_key_b64: generate_cookie_key_b64(),
|
||||
})
|
||||
.await
|
||||
.context("seed FleetDeploySecrets")?;
|
||||
@@ -378,3 +379,13 @@ fn generate_alphanum(len: usize) -> String {
|
||||
.map(|_| CHARSET[rng.random_range(0..CHARSET.len())] as char)
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Standard-base64 of 64 random bytes — the operator's session-cookie
|
||||
/// signing key. Generated per install and seeded as a secret.
|
||||
fn generate_cookie_key_b64() -> String {
|
||||
use base64::Engine;
|
||||
use rand::Rng;
|
||||
let mut bytes = [0u8; 64];
|
||||
rand::rng().fill(&mut bytes);
|
||||
base64::engine::general_purpose::STANDARD.encode(bytes)
|
||||
}
|
||||
|
||||
@@ -29,6 +29,7 @@ harmony_types = { path = "../../harmony_types" }
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
|
||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||
harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" }
|
||||
|
||||
anyhow = { workspace = true }
|
||||
async-trait = { workspace = true }
|
||||
|
||||
@@ -1,21 +1,29 @@
|
||||
//! `harmony-fleet-publish` — build + publish the operator image + chart
|
||||
//! for a tagged release. `docker` / `helm` must be on PATH and logged in
|
||||
//! to the registry (CI's login actions; dev's manual login).
|
||||
//! at a version. `docker` / `helm` must be on PATH and logged in to the
|
||||
//! registry (CI's login actions; dev's manual login).
|
||||
|
||||
use anyhow::Result;
|
||||
use anyhow::{Result, bail};
|
||||
use clap::Parser;
|
||||
use harmony_fleet_deploy::release::{release_operator, version_from_tag};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(
|
||||
name = "harmony-fleet-publish",
|
||||
about = "Build + publish the operator image + chart for a tagged release"
|
||||
about = "Build + publish the operator image + chart at a version"
|
||||
)]
|
||||
struct Cli {
|
||||
/// Git tag, e.g. `harmony-fleet-operator-v0.1.0`. Defaults to
|
||||
/// `$GITHUB_REF_NAME` so CI passes nothing.
|
||||
/// The image + chart version, e.g. `0.1.0` or `0.0.0-dev.1730000000`
|
||||
/// (must be valid semver — helm rejects anything else). Wins over
|
||||
/// `--from-tag`; the laptop/dev path.
|
||||
#[arg(long)]
|
||||
version: Option<String>,
|
||||
|
||||
/// Parse the version out of a release tag like
|
||||
/// `harmony-fleet-operator-v0.1.0`. Defaults to `$GITHUB_REF_NAME`
|
||||
/// so the release workflow passes nothing. Used when `--version`
|
||||
/// is unset.
|
||||
#[arg(long, env = "GITHUB_REF_NAME")]
|
||||
from_tag: String,
|
||||
from_tag: Option<String>,
|
||||
|
||||
#[arg(long, default_value = "hub.nationtech.io")]
|
||||
registry: String,
|
||||
@@ -28,9 +36,19 @@ struct Cli {
|
||||
no_push: bool,
|
||||
}
|
||||
|
||||
impl Cli {
|
||||
fn version(&self) -> Result<String> {
|
||||
match (&self.version, &self.from_tag) {
|
||||
(Some(v), _) => Ok(v.clone()),
|
||||
(None, Some(tag)) => version_from_tag(tag),
|
||||
(None, None) => bail!("set --version (e.g. 0.1.0) or --from-tag"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
|
||||
let cli = Cli::parse();
|
||||
let version = version_from_tag(&cli.from_tag)?;
|
||||
let version = cli.version()?;
|
||||
release_operator(&version, &cli.registry, &cli.project, !cli.no_push)
|
||||
}
|
||||
|
||||
@@ -18,6 +18,7 @@ use harmony_config::ConfigClient;
|
||||
use harmony_fleet_deploy::{
|
||||
FleetDeployConfig, FleetDeploySecrets, FleetOperatorScore, version_from_tag,
|
||||
};
|
||||
use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
|
||||
use tracing::info;
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
@@ -114,12 +115,28 @@ async fn main() -> Result<()> {
|
||||
// Coherent with the other staging hosts (sso-stg., secrets-stg.).
|
||||
let ui_host = format!("fleet-stg.{}", config.base_domain);
|
||||
|
||||
// Dashboard SSO config: hosts derive from base_domain; client_id +
|
||||
// audiences come from config, cookie key from secrets. Baked into the
|
||||
// operator Secret for the pod's ConfigClient to read.
|
||||
let web_auth = ZitadelAuthConfig {
|
||||
zitadel_base: format!("https://sso-stg.{}", config.base_domain),
|
||||
base_url: format!("https://{ui_host}"),
|
||||
client_id: config.operator_oidc_client_id,
|
||||
scope: "openid profile email".to_string(),
|
||||
trusted_audiences: config.operator_trusted_audiences,
|
||||
logout_redirect_uri: format!("https://{ui_host}/"),
|
||||
};
|
||||
let cookie = OperatorCookieKey {
|
||||
cookie_key_b64: secrets.operator_cookie_key_b64,
|
||||
};
|
||||
|
||||
let operator = FleetOperatorScore::new()
|
||||
.namespace(namespace)
|
||||
.nats_url(config.nats_url)
|
||||
.credentials(secrets.operator_credentials_toml)
|
||||
.published_chart(registry, project, version)
|
||||
.ingress(ui_host, Some(config.cluster_issuer));
|
||||
.ingress(ui_host, Some(config.cluster_issuer))
|
||||
.web_auth(web_auth, cookie);
|
||||
|
||||
harmony_cli::run(
|
||||
Inventory::autoload(),
|
||||
|
||||
@@ -70,6 +70,13 @@ pub struct ChartOptions {
|
||||
/// sets this to the released tag so the OCI chart artifact lands
|
||||
/// at `…/harmony-fleet-operator-chart:<tag>` matching the image tag.
|
||||
pub chart_version: Option<String>,
|
||||
/// JSON of the dashboard's `ZitadelAuthConfig`, stored in the
|
||||
/// operator Secret under [`ENV_WEB_AUTH_CONFIG`]. `None` leaves the
|
||||
/// dashboard unauthenticated (dev/e2e).
|
||||
pub web_auth_config_json: Option<String>,
|
||||
/// JSON of the dashboard's `OperatorCookieKey`, stored under
|
||||
/// [`ENV_WEB_COOKIE_KEY`].
|
||||
pub web_cookie_key_json: Option<String>,
|
||||
}
|
||||
|
||||
/// What the operator pod needs to authenticate to NATS via the auth
|
||||
@@ -134,6 +141,8 @@ impl Default for ChartOptions {
|
||||
log_level: "info,kube_runtime=warn".to_string(),
|
||||
credentials: None,
|
||||
chart_version: None,
|
||||
web_auth_config_json: None,
|
||||
web_cookie_key_json: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -153,6 +162,15 @@ pub const OPERATOR_HTTP_PORT: u16 = 18080;
|
||||
/// including the inline JSON keyfile under `key_json`.
|
||||
pub const SECRET_KEY_CREDENTIALS_TOML: &str = "credentials.toml";
|
||||
|
||||
/// Env var + operator-Secret key the dashboard's `ConfigClient` reads
|
||||
/// its `ZitadelAuthConfig` / `OperatorCookieKey` from. The value is the
|
||||
/// JSON of those structs; `ConfigClient`'s `EnvSource` keys a config
|
||||
/// type under `HARMONY_CONFIG_<KEY>`. Locked to the structs' Config
|
||||
/// keys by a test below — the chart wires the env→Secret reference at
|
||||
/// publish time, the deploy fills the Secret values at deploy time.
|
||||
pub const ENV_WEB_AUTH_CONFIG: &str = "HARMONY_CONFIG_ZitadelAuthConfig";
|
||||
pub const ENV_WEB_COOKIE_KEY: &str = "HARMONY_CONFIG_OperatorCookieKey";
|
||||
|
||||
/// Build + write the chart to `opts.output_dir`. Returns the full
|
||||
/// path to the generated chart directory (which is what `helm
|
||||
/// install <path>` wants).
|
||||
@@ -215,6 +233,21 @@ pub fn operator_secret(opts: &ChartOptions) -> Option<Secret> {
|
||||
SECRET_KEY_CREDENTIALS_TOML.to_string(),
|
||||
ByteString(creds.credentials_toml.as_bytes().to_vec()),
|
||||
);
|
||||
// Dashboard auth config + cookie key (when configured) ride in the
|
||||
// same Secret; the Deployment sources them as HARMONY_CONFIG_* env
|
||||
// for the operator's ConfigClient.
|
||||
if let Some(json) = &opts.web_auth_config_json {
|
||||
data.insert(
|
||||
ENV_WEB_AUTH_CONFIG.to_string(),
|
||||
ByteString(json.as_bytes().to_vec()),
|
||||
);
|
||||
}
|
||||
if let Some(json) = &opts.web_cookie_key_json {
|
||||
data.insert(
|
||||
ENV_WEB_COOKIE_KEY.to_string(),
|
||||
ByteString(json.as_bytes().to_vec()),
|
||||
);
|
||||
}
|
||||
// Namespace deliberately omitted — the caller passes the target
|
||||
// namespace to `K8sResourceScore::single`, which injects it at
|
||||
// apply time. Keeps the Secret manifest reusable across
|
||||
@@ -294,7 +327,7 @@ fn cluster_role() -> ClusterRole {
|
||||
// Devices: reconciler server-side-applies + deletes;
|
||||
// aggregator lists + watches.
|
||||
PolicyRule {
|
||||
api_groups: Some(vec![group]),
|
||||
api_groups: Some(vec![group.clone()]),
|
||||
resources: Some(vec!["devices".to_string()]),
|
||||
verbs: vec![
|
||||
"get", "list", "watch", "create", "update", "patch", "delete",
|
||||
@@ -304,6 +337,17 @@ fn cluster_role() -> ClusterRole {
|
||||
.collect(),
|
||||
..Default::default()
|
||||
},
|
||||
// Device liveness: the device-status reconciler patches the
|
||||
// status subresource — a distinct RBAC resource from `devices`.
|
||||
PolicyRule {
|
||||
api_groups: Some(vec![group]),
|
||||
resources: Some(vec!["devices/status".to_string()]),
|
||||
verbs: vec!["get", "update", "patch"]
|
||||
.into_iter()
|
||||
.map(String::from)
|
||||
.collect(),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
..Default::default()
|
||||
}
|
||||
@@ -383,6 +427,22 @@ fn operator_deployment(opts: &ChartOptions) -> K8sDeployment {
|
||||
// pod compatible with OKD's restricted-v2 SCC and the
|
||||
// `harmony_fleet_auth::CredentialsSection` deserializer
|
||||
// handles inline-vs-file from the same TOML shape.
|
||||
// All three env-from-Secret with optional=true: the Pod starts
|
||||
// before the deploy applies the Secret, and the dashboard's two
|
||||
// HARMONY_CONFIG_* values are simply absent on deploys that don't
|
||||
// configure auth (dev/e2e).
|
||||
let secret_env = |name: &str| EnvVar {
|
||||
name: name.to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
secret_key_ref: Some(SecretKeySelector {
|
||||
name: SECRET_NAME.to_string(),
|
||||
key: name.to_string(),
|
||||
optional: Some(true),
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
env.push(EnvVar {
|
||||
name: OPERATOR_CREDENTIALS_ENV_VAR.to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
@@ -395,6 +455,8 @@ fn operator_deployment(opts: &ChartOptions) -> K8sDeployment {
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
env.push(secret_env(ENV_WEB_AUTH_CONFIG));
|
||||
env.push(secret_env(ENV_WEB_COOKIE_KEY));
|
||||
|
||||
// Namespace deliberately omitted — same rationale as the
|
||||
// ServiceAccount: helm fills in the release namespace at install
|
||||
@@ -532,7 +594,7 @@ mod tests {
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn chart_includes_credentials_secret_and_env_var() {
|
||||
fn chart_omits_secret_but_deployment_references_it() {
|
||||
let tmp = tempfile::tempdir().unwrap();
|
||||
let chart_path = build_chart(&ChartOptions {
|
||||
output_dir: tmp.path().to_path_buf(),
|
||||
@@ -540,14 +602,58 @@ mod tests {
|
||||
})
|
||||
.unwrap();
|
||||
|
||||
let secret_yaml =
|
||||
std::fs::read_to_string(chart_path.join("templates/secret-credentials.yaml"))
|
||||
.expect("secret-credentials.yaml must exist in chart");
|
||||
assert!(secret_yaml.contains(SECRET_NAME));
|
||||
// The chart is hydrated (no templating), so it must NOT carry the
|
||||
// credentials Secret — the deploy applies it out-of-band. A Secret
|
||||
// in the chart fights Helm ownership against the deploy-applied one
|
||||
// and re-blanks it on every upgrade.
|
||||
assert!(
|
||||
!chart_path
|
||||
.join("templates/secret-credentials.yaml")
|
||||
.exists(),
|
||||
"hydrated chart must not contain a Secret"
|
||||
);
|
||||
|
||||
// The Deployment still wires the env var from that out-of-band
|
||||
// Secret via secretKeyRef (optional, so the Pod starts before it
|
||||
// lands).
|
||||
let deployment_yaml = std::fs::read_to_string(chart_path.join("templates/deployment.yaml"))
|
||||
.expect("deployment.yaml must exist in chart");
|
||||
assert!(deployment_yaml.contains(OPERATOR_CREDENTIALS_ENV_VAR));
|
||||
assert!(deployment_yaml.contains(SECRET_NAME));
|
||||
}
|
||||
|
||||
// The CRDs carry status subresources, which RBAC treats as distinct
|
||||
// resources — a patch on `*/status` is forbidden without an explicit
|
||||
// grant. Lock both so adding a status subresource can't silently 403.
|
||||
#[test]
|
||||
fn cluster_role_grants_status_subresources() {
|
||||
let role = cluster_role();
|
||||
let grants_patch = |resource: &str| {
|
||||
role.rules.as_ref().unwrap().iter().any(|r| {
|
||||
r.resources
|
||||
.as_ref()
|
||||
.is_some_and(|res| res.iter().any(|x| x == resource))
|
||||
&& r.verbs.iter().any(|v| v == "patch")
|
||||
})
|
||||
};
|
||||
assert!(grants_patch("deployments/status"));
|
||||
assert!(grants_patch("devices/status"));
|
||||
}
|
||||
|
||||
// The chart bakes these env names at publish time; the operator's
|
||||
// ConfigClient derives them from the struct names at runtime. Lock
|
||||
// them together so a rename can't silently desync the two.
|
||||
#[test]
|
||||
fn web_auth_env_names_match_config_keys() {
|
||||
use harmony_config::Config;
|
||||
use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
|
||||
assert_eq!(
|
||||
ENV_WEB_AUTH_CONFIG,
|
||||
format!("HARMONY_CONFIG_{}", ZitadelAuthConfig::KEY)
|
||||
);
|
||||
assert_eq!(
|
||||
ENV_WEB_COOKIE_KEY,
|
||||
format!("HARMONY_CONFIG_{}", OperatorCookieKey::KEY)
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -78,6 +78,17 @@ pub struct FleetOperatorScore {
|
||||
/// cert-manager `ClusterIssuer` for the UI Ingress. `None` (or no
|
||||
/// host) serves plain HTTP — the right default on issuer-less k3d.
|
||||
pub cluster_issuer: Option<String>,
|
||||
/// Dashboard SSO config + cookie key, baked into the operator Secret
|
||||
/// for the pod's `ConfigClient` to read. `None` leaves the dashboard
|
||||
/// unauthenticated (dev/e2e).
|
||||
pub web_auth: Option<WebAuth>,
|
||||
}
|
||||
|
||||
/// The dashboard's auth inputs the operator reads via `ConfigClient`.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct WebAuth {
|
||||
pub config: harmony_zitadel_auth::ZitadelAuthConfig,
|
||||
pub cookie: harmony_zitadel_auth::OperatorCookieKey,
|
||||
}
|
||||
|
||||
impl FleetOperatorScore {
|
||||
@@ -98,6 +109,7 @@ impl FleetOperatorScore {
|
||||
published_chart: None,
|
||||
operator_ui_host: None,
|
||||
cluster_issuer: None,
|
||||
web_auth: None,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -110,6 +122,17 @@ impl FleetOperatorScore {
|
||||
self
|
||||
}
|
||||
|
||||
/// Configure dashboard SSO: the `ZitadelAuthConfig` + cookie key are
|
||||
/// baked into the operator Secret for the pod's `ConfigClient`.
|
||||
pub fn web_auth(
|
||||
mut self,
|
||||
config: harmony_zitadel_auth::ZitadelAuthConfig,
|
||||
cookie: harmony_zitadel_auth::OperatorCookieKey,
|
||||
) -> Self {
|
||||
self.web_auth = Some(WebAuth { config, cookie });
|
||||
self
|
||||
}
|
||||
|
||||
/// Install the published OCI chart at `version` instead of rendering
|
||||
/// one from local source (the CD `harmony apply` path).
|
||||
pub fn published_chart(
|
||||
@@ -202,9 +225,22 @@ impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for FleetOperatorInterp
|
||||
// directly, not via the chart — it's environment-specific. The
|
||||
// published-chart CD path runs without credentials today, so
|
||||
// this is a no-op there.
|
||||
let (web_auth_config_json, web_cookie_key_json) = match &self.score.web_auth {
|
||||
Some(w) => (
|
||||
Some(serde_json::to_string(&w.config).map_err(|e| {
|
||||
InterpretError::new(format!("serialize ZitadelAuthConfig: {e}"))
|
||||
})?),
|
||||
Some(serde_json::to_string(&w.cookie).map_err(|e| {
|
||||
InterpretError::new(format!("serialize OperatorCookieKey: {e}"))
|
||||
})?),
|
||||
),
|
||||
None => (None, None),
|
||||
};
|
||||
if let Some(creds) = &self.score.credentials
|
||||
&& let Some(secret) = operator_secret(&ChartOptions {
|
||||
credentials: Some(creds.clone()),
|
||||
web_auth_config_json,
|
||||
web_cookie_key_json,
|
||||
..ChartOptions::default()
|
||||
})
|
||||
{
|
||||
@@ -252,6 +288,10 @@ impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for FleetOperatorInterp
|
||||
log_level: self.score.log_level.clone(),
|
||||
credentials: self.score.credentials.clone(),
|
||||
chart_version: None,
|
||||
// The auth Secret is applied separately above; the
|
||||
// rendered chart only needs the Deployment env wiring.
|
||||
web_auth_config_json: None,
|
||||
web_cookie_key_json: None,
|
||||
})
|
||||
.map_err(|e| InterpretError::new(format!("build operator chart: {e}")))?;
|
||||
let chart_path_str = chart_path.to_str().ok_or_else(|| {
|
||||
|
||||
@@ -22,6 +22,12 @@ pub struct FleetDeploySecrets {
|
||||
/// `KUBECONFIG`.
|
||||
#[serde(default)]
|
||||
pub kubeconfig: Option<String>,
|
||||
|
||||
/// Operator dashboard session-cookie signing key (standard-base64 of
|
||||
/// ≥64 bytes). `#[serde(default)]` so older seeds without it still
|
||||
/// load — but the dashboard won't authenticate until it's set.
|
||||
#[serde(default)]
|
||||
pub operator_cookie_key_b64: String,
|
||||
}
|
||||
|
||||
/// Non-secret deploy config: k8s namespaces + chart coords. Loaded via
|
||||
@@ -52,6 +58,14 @@ pub struct FleetDeployConfig {
|
||||
|
||||
/// cert-manager `ClusterIssuer` for the operator UI's TLS cert.
|
||||
pub cluster_issuer: String,
|
||||
|
||||
/// Zitadel OIDC app the operator dashboard authenticates against.
|
||||
#[serde(default)]
|
||||
pub operator_oidc_client_id: String,
|
||||
|
||||
/// Token audiences the dashboard accepts (Zitadel project/app ids).
|
||||
#[serde(default)]
|
||||
pub operator_trusted_audiences: Vec<String>,
|
||||
}
|
||||
|
||||
impl Default for FleetDeployConfig {
|
||||
@@ -65,6 +79,8 @@ impl Default for FleetDeployConfig {
|
||||
operator_chart_project: "harmony".to_string(),
|
||||
base_domain: "cb1.nationtech.io".to_string(),
|
||||
cluster_issuer: "letsencrypt-prod".to_string(),
|
||||
operator_oidc_client_id: String::new(),
|
||||
operator_trusted_audiences: Vec::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,6 +17,7 @@ web-frontend = ["dep:axum", "dep:axum-extra", "dep:maud", "dep:tokio-stream", "h
|
||||
harmony = { path = "../../harmony", features = ["podman"] }
|
||||
harmony-fleet-auth = { path = "../harmony-fleet-auth" }
|
||||
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
|
||||
harmony_config = { path = "../../harmony_config" }
|
||||
harmony_zitadel_auth = { path = "../../harmony_zitadel_auth" }
|
||||
toml = { workspace = true }
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# syntax=docker/dockerfile:1.7
|
||||
# Multi-stage container build for harmony-fleet-operator.
|
||||
#
|
||||
# Build context is the workspace root (the operator's Cargo.toml has
|
||||
@@ -21,17 +22,44 @@
|
||||
FROM docker.io/rust:1.94-slim-bookworm AS builder
|
||||
|
||||
# pkg-config + libssl-dev cover transitive native-tls/openssl-sys deps
|
||||
# that surface during link; ca-certificates lets cargo fetch over TLS.
|
||||
# that surface during link; ca-certificates lets cargo fetch over TLS;
|
||||
# curl downloads the Tailwind CLI below.
|
||||
RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
pkg-config \
|
||||
ca-certificates \
|
||||
libssl-dev \
|
||||
curl \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
# Tailwind v4 standalone CLI: build.rs shells out to it to generate the
|
||||
# embedded CSS bundle. Pinned to match the dev host; override with
|
||||
# --build-arg TAILWIND_VERSION=... TAILWIND_REQUIRED makes build.rs hard
|
||||
# fail (not silently ship empty CSS) if the CLI is missing — see build.rs.
|
||||
ARG TAILWIND_VERSION=v4.3.0
|
||||
RUN curl -fsSL -o /usr/local/bin/tailwindcss \
|
||||
"https://github.com/tailwindlabs/tailwindcss/releases/download/${TAILWIND_VERSION}/tailwindcss-linux-x64" \
|
||||
&& chmod +x /usr/local/bin/tailwindcss \
|
||||
&& tailwindcss --help >/dev/null
|
||||
ENV TAILWIND_REQUIRED=1
|
||||
|
||||
WORKDIR /app
|
||||
COPY . .
|
||||
|
||||
RUN cargo build --release --locked -p harmony-fleet-operator
|
||||
# `web-frontend` bundles the dashboard the operator serves in-process
|
||||
# alongside the reconcile loop.
|
||||
#
|
||||
# BuildKit cache mounts persist the cargo registry + `target/` across
|
||||
# builds, so an iterate-build-deploy loop recompiles only changed crates
|
||||
# (seconds) instead of the whole workspace (minutes). The mounts are
|
||||
# build-time only — the resulting image is identical, and a cold CI
|
||||
# runner just rebuilds from scratch. The binary is copied out of the
|
||||
# cache-mounted target within the same RUN, since cache mounts aren't
|
||||
# part of the produced layer.
|
||||
RUN --mount=type=cache,target=/usr/local/cargo/registry \
|
||||
--mount=type=cache,target=/usr/local/cargo/git \
|
||||
--mount=type=cache,target=/app/target \
|
||||
cargo build --release --locked -p harmony-fleet-operator --features web-frontend \
|
||||
&& cp /app/target/release/harmony-fleet-operator /harmony-fleet-operator
|
||||
|
||||
FROM docker.io/library/debian:bookworm-slim
|
||||
|
||||
@@ -41,7 +69,7 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
ca-certificates \
|
||||
&& rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY --from=builder /app/target/release/harmony-fleet-operator /usr/local/bin/harmony-fleet-operator
|
||||
COPY --from=builder /harmony-fleet-operator /usr/local/bin/harmony-fleet-operator
|
||||
|
||||
# Non-root runtime. Pairs with the Pod's `securityContext.
|
||||
# runAsNonRoot: true` in the helm chart — k8s admission rejects pods
|
||||
|
||||
@@ -33,34 +33,38 @@ fn main() {
|
||||
println!("cargo:rerun-if-changed=style/input.css");
|
||||
println!("cargo:rerun-if-changed=src/frontend");
|
||||
println!("cargo:rerun-if-changed=src/service");
|
||||
// Toggling this re-runs the script, so the container build (which sets
|
||||
// it) regenerates rather than reusing a cache-mounted empty bundle.
|
||||
println!("cargo:rerun-if-env-changed=TAILWIND_REQUIRED");
|
||||
|
||||
let status = Command::new("tailwindcss")
|
||||
// Container/production builds set TAILWIND_REQUIRED: a missing or
|
||||
// failing CLI is a hard error there (never ship empty CSS). Dev builds
|
||||
// leave it unset and fall back to empty, serving CSS via
|
||||
// `serve-web --css-from <path>` against a `tailwindcss --watch` sidecar.
|
||||
let required = std::env::var_os("TAILWIND_REQUIRED").is_some();
|
||||
let fall_back = |reason: String| {
|
||||
assert!(
|
||||
!required,
|
||||
"{reason}\nTAILWIND_REQUIRED is set: refusing to ship a frontend with empty CSS. \
|
||||
Install the v4 standalone CLI: https://github.com/tailwindlabs/tailwindcss/releases"
|
||||
);
|
||||
println!(
|
||||
"cargo:warning={reason}; embedded CSS will be empty \
|
||||
(use `serve-web --css-from <path>` in dev)."
|
||||
);
|
||||
std::fs::write(&output, b"").unwrap();
|
||||
};
|
||||
|
||||
match Command::new("tailwindcss")
|
||||
.arg("--input")
|
||||
.arg(&input)
|
||||
.arg("--output")
|
||||
.arg(&output)
|
||||
.arg("--minify")
|
||||
.status();
|
||||
|
||||
match status {
|
||||
.status()
|
||||
{
|
||||
Ok(s) if s.success() => {}
|
||||
Ok(s) => {
|
||||
println!(
|
||||
"cargo:warning=tailwindcss exited with status {s}; embedded CSS will be empty. \
|
||||
Install the v4 standalone CLI \
|
||||
(https://github.com/tailwindlabs/tailwindcss/releases) for production builds, \
|
||||
or use `serve-web --css-from <path>` against a `tailwindcss --watch` sidecar in dev."
|
||||
);
|
||||
std::fs::write(&output, b"").unwrap();
|
||||
}
|
||||
Err(e) => {
|
||||
println!(
|
||||
"cargo:warning=tailwindcss not invocable ({e}); embedded CSS will be empty. \
|
||||
Install the v4 standalone CLI \
|
||||
(https://github.com/tailwindlabs/tailwindcss/releases) for production builds, \
|
||||
or use `serve-web --css-from <path>` against a `tailwindcss --watch` sidecar in dev."
|
||||
);
|
||||
std::fs::write(&output, b"").unwrap();
|
||||
}
|
||||
Ok(s) => fall_back(format!("tailwindcss exited with status {s}")),
|
||||
Err(e) => fall_back(format!("tailwindcss not invocable ({e})")),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,13 +1,11 @@
|
||||
#!/bin/bash
|
||||
|
||||
export BASE_URL=http://localhost:18080
|
||||
export FLEET_AUTH_ZITADEL_BASE=https://sso-stg.cb1.nationtech.io
|
||||
export FLEET_AUTH_CLIENT_ID=372626218874372917
|
||||
export FLEET_AUTH_SCOPE="openid profile email"
|
||||
export FLEET_AUTH_LOGOUT_REDIRECT_URI="http://localhost:18080/"
|
||||
export FLEET_OPERATOR_COOKIE_KEY_B64=6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg==
|
||||
export FLEET_AUTH_TRUSTED_AUDIENCES=371639797493596981,371683318111994677,372626218874372917,371639797157987125
|
||||
export BASE_URL=http://localhost:18080
|
||||
# The operator reads its auth config via ConfigClient — one typed value
|
||||
# each (EnvSource keys a Config struct under HARMONY_CONFIG_<TypeName>),
|
||||
# not a fistful of FLEET_AUTH_* vars. base_url is localhost for this local
|
||||
# serve-web; staging derives it from base_domain in the deploy.
|
||||
export HARMONY_CONFIG_ZitadelAuthConfig='{"zitadel_base":"https://sso-stg.cb1.nationtech.io","base_url":"http://localhost:18080","client_id":"372626218874372917","scope":"openid profile email","trusted_audiences":["371639797493596981","371683318111994677","372626218874372917","371639797157987125"],"logout_redirect_uri":"http://localhost:18080/"}'
|
||||
export HARMONY_CONFIG_OperatorCookieKey='{"cookie_key_b64":"6eKVpj88jwIcmaJajPfohdaIXhSPlfYCrHaOfymTcIWBAIadvhg7NHpMo5vPSMy90vac3cq2liWe1naSgkbaYg=="}'
|
||||
export RUST_LOG=debug
|
||||
|
||||
cargo watch -- cargo run -p harmony-fleet-operator --features web-frontend -- serve-web \
|
||||
|
||||
173
fleet/harmony-fleet-operator/src/device_status.rs
Normal file
173
fleet/harmony-fleet-operator/src/device_status.rs
Normal file
@@ -0,0 +1,173 @@
|
||||
//! Device liveness reconciler: NATS `device-heartbeat` KV → `Device.status`.
|
||||
//!
|
||||
//! Agents ping `heartbeat.<device_id>` every ~30 s. This reconciler
|
||||
//! keeps the latest ping per device in memory (from a KV watch) and,
|
||||
//! on a tick, reflects freshness onto `Device.status` —
|
||||
//! `lastHeartbeat` + a coarse [`Reachability`]. That makes liveness
|
||||
//! visible to `kubectl get devices` and lets the dashboard read it
|
||||
//! from the CR instead of re-deriving staleness from NATS.
|
||||
//!
|
||||
//! Mirrors the aggregator's "watch fills a cache, a tick patches the
|
||||
//! CR" shape. The staleness threshold lives here only — it's a device
|
||||
//! concept the operator owns, not something every reader re-decides.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use async_nats::jetstream::kv::{Operation, Store};
|
||||
use chrono::{DateTime, Utc};
|
||||
use futures_util::StreamExt;
|
||||
use harmony_reconciler_contracts::{BUCKET_DEVICE_HEARTBEAT, HeartbeatPayload};
|
||||
use kube::api::{Api, Patch, PatchParams};
|
||||
use kube::{Client, ResourceExt};
|
||||
use serde_json::json;
|
||||
use tokio::sync::Mutex;
|
||||
|
||||
use harmony::modules::fleet::operator::{Device, Reachability};
|
||||
|
||||
/// A device with no heartbeat within this window is `Stale`. Agents
|
||||
/// ping every 30 s, so this tolerates ~2 missed pings.
|
||||
const STALE_AFTER: Duration = Duration::from_secs(90);
|
||||
/// How often to re-evaluate freshness and patch changed devices.
|
||||
const TICK: Duration = Duration::from_secs(30);
|
||||
|
||||
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> Result<()> {
|
||||
let bucket = js
|
||||
.create_key_value(async_nats::jetstream::kv::Config {
|
||||
bucket: BUCKET_DEVICE_HEARTBEAT.to_string(),
|
||||
..Default::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
let heartbeats: Mutex<HashMap<String, DateTime<Utc>>> = Mutex::new(HashMap::new());
|
||||
let devices: Api<Device> = Api::all(client);
|
||||
|
||||
tokio::try_join!(
|
||||
watch_heartbeats(&bucket, &heartbeats),
|
||||
patch_loop(&devices, &heartbeats),
|
||||
)?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn watch_heartbeats(
|
||||
bucket: &Store,
|
||||
heartbeats: &Mutex<HashMap<String, DateTime<Utc>>>,
|
||||
) -> Result<()> {
|
||||
let mut watch = bucket.watch_with_history(">").await?;
|
||||
tracing::info!("device-status: watching device-heartbeat KV");
|
||||
while let Some(entry_res) = watch.next().await {
|
||||
let entry = match entry_res {
|
||||
Ok(e) => e,
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "device-status: watch delivery error");
|
||||
continue;
|
||||
}
|
||||
};
|
||||
match entry.operation {
|
||||
Operation::Put => {
|
||||
if let Ok(hb) = serde_json::from_slice::<HeartbeatPayload>(&entry.value) {
|
||||
heartbeats
|
||||
.lock()
|
||||
.await
|
||||
.insert(hb.device_id.to_string(), hb.at);
|
||||
}
|
||||
}
|
||||
Operation::Delete | Operation::Purge => {
|
||||
if let Some(id) = entry.key.strip_prefix("heartbeat.") {
|
||||
heartbeats.lock().await.remove(id);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn patch_loop(
|
||||
devices: &Api<Device>,
|
||||
heartbeats: &Mutex<HashMap<String, DateTime<Utc>>>,
|
||||
) -> Result<()> {
|
||||
// Last status written per device, to skip no-op patches.
|
||||
let mut applied: HashMap<String, (Reachability, DateTime<Utc>)> = HashMap::new();
|
||||
let mut ticker = tokio::time::interval(TICK);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
let snapshot: Vec<(String, DateTime<Utc>)> = heartbeats
|
||||
.lock()
|
||||
.await
|
||||
.iter()
|
||||
.map(|(k, v)| (k.clone(), *v))
|
||||
.collect();
|
||||
let now = Utc::now();
|
||||
for (id, at) in snapshot {
|
||||
let reachability = reachability(at, now);
|
||||
if applied.get(&id) == Some(&(reachability, at)) {
|
||||
continue;
|
||||
}
|
||||
if patch_status(devices, &id, reachability, at).await {
|
||||
applied.insert(id, (reachability, at));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn reachability(last_heartbeat: DateTime<Utc>, now: DateTime<Utc>) -> Reachability {
|
||||
if (now - last_heartbeat)
|
||||
.to_std()
|
||||
.is_ok_and(|d| d <= STALE_AFTER)
|
||||
{
|
||||
Reachability::Reachable
|
||||
} else {
|
||||
Reachability::Stale
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns whether the patch succeeded (so we only cache applied state
|
||||
/// on success and retry next tick otherwise).
|
||||
async fn patch_status(
|
||||
devices: &Api<Device>,
|
||||
id: &str,
|
||||
reachability: Reachability,
|
||||
last_heartbeat: DateTime<Utc>,
|
||||
) -> bool {
|
||||
let status = json!({
|
||||
"status": {
|
||||
"lastHeartbeat": last_heartbeat.to_rfc3339(),
|
||||
"reachability": reachability,
|
||||
}
|
||||
});
|
||||
match devices
|
||||
.patch_status(id, &PatchParams::default(), &Patch::Merge(&status))
|
||||
.await
|
||||
{
|
||||
Ok(d) => {
|
||||
tracing::debug!(device = %d.name_any(), ?reachability, "device-status: patched");
|
||||
true
|
||||
}
|
||||
// A heartbeat can outrace the Device CR's creation by the
|
||||
// device-reconciler; skip this tick and retry on the next.
|
||||
Err(kube::Error::Api(ae)) if ae.code == 404 => false,
|
||||
Err(e) => {
|
||||
tracing::warn!(%id, error = %e, "device-status: patch failed");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn reachable_within_window_stale_after() {
|
||||
let now = Utc::now();
|
||||
assert_eq!(
|
||||
reachability(now - chrono::Duration::seconds(10), now),
|
||||
Reachability::Reachable
|
||||
);
|
||||
assert_eq!(
|
||||
reachability(now - chrono::Duration::seconds(120), now),
|
||||
Reachability::Stale
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -14,4 +14,5 @@
|
||||
|
||||
pub mod commands;
|
||||
pub mod device_reconciler;
|
||||
pub mod device_status;
|
||||
pub mod fleet_aggregator;
|
||||
|
||||
@@ -5,7 +5,7 @@ mod frontend;
|
||||
#[cfg(feature = "web-frontend")]
|
||||
mod service;
|
||||
|
||||
use harmony_fleet_operator::{device_reconciler, fleet_aggregator};
|
||||
use harmony_fleet_operator::{device_reconciler, device_status, fleet_aggregator};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_nats::jetstream;
|
||||
@@ -109,6 +109,9 @@ async fn main() -> Result<()> {
|
||||
}
|
||||
}
|
||||
|
||||
/// `serve-web` subcommand: dashboard on its own (mock data, or the live
|
||||
/// CR-reading service). The deployed operator instead serves the
|
||||
/// dashboard alongside the reconcile loop — see [`spawn_dashboard`].
|
||||
#[cfg(feature = "web-frontend")]
|
||||
async fn serve_web(
|
||||
mock: bool,
|
||||
@@ -117,22 +120,46 @@ async fn serve_web(
|
||||
live_reload: bool,
|
||||
) -> Result<()> {
|
||||
use std::sync::Arc;
|
||||
use std::time::Duration;
|
||||
|
||||
use frontend::server::{AppState, Config};
|
||||
use service::{FleetService, mock::MockFleetService};
|
||||
use service::{FleetService, mock::MockFleetService, real::RealFleetService};
|
||||
|
||||
let fleet: Arc<dyn FleetService> = if mock {
|
||||
Arc::new(MockFleetService::default())
|
||||
} else {
|
||||
anyhow::bail!(
|
||||
"serve-web without --mock is not implemented yet (real FleetService impl pending). \
|
||||
Pass --mock for the dev workflow."
|
||||
);
|
||||
Arc::new(RealFleetService::new(Client::try_default().await?))
|
||||
};
|
||||
serve_dashboard(fleet, addr, css_from, live_reload).await
|
||||
}
|
||||
|
||||
let cookie_key = harmony_zitadel_auth::cookie_key_from_env();
|
||||
let config = harmony_zitadel_auth::config_from_env();
|
||||
/// Build the Zitadel-authenticated web server around a [`FleetService`]
|
||||
/// and serve until it exits. Shared by the `serve-web` subcommand and
|
||||
/// the in-process dashboard the reconcile loop spawns.
|
||||
#[cfg(feature = "web-frontend")]
|
||||
async fn serve_dashboard(
|
||||
fleet: std::sync::Arc<dyn service::FleetService>,
|
||||
addr: std::net::SocketAddr,
|
||||
css_from: Option<PathBuf>,
|
||||
live_reload: bool,
|
||||
) -> Result<()> {
|
||||
use std::time::Duration;
|
||||
|
||||
use frontend::server::{AppState, Config};
|
||||
use harmony_zitadel_auth::{OperatorCookieKey, ZitadelAuthConfig};
|
||||
|
||||
// One typed config value each, resolved EnvSource → OpenBao by
|
||||
// ConfigClient — not a fistful of FLEET_AUTH_* env vars. Namespace
|
||||
// only matters for the OpenBao source; EnvSource reads regardless.
|
||||
let ns = std::env::var("HARMONY_CONFIG_NAMESPACE").unwrap_or_else(|_| "fleet-staging".into());
|
||||
let cc = harmony_config::ConfigClient::for_namespace(&ns).await;
|
||||
let config: ZitadelAuthConfig = cc
|
||||
.get()
|
||||
.await
|
||||
.context("loading ZitadelAuthConfig (HARMONY_CONFIG_ZitadelAuthConfig or OpenBao)")?;
|
||||
let cookie_key = cc
|
||||
.get::<OperatorCookieKey>()
|
||||
.await
|
||||
.context("loading OperatorCookieKey")?
|
||||
.key()?;
|
||||
let http_client = reqwest::Client::new();
|
||||
|
||||
let jwks = harmony_zitadel_auth::JwksCache::new(&config.zitadel_base, http_client.clone())
|
||||
@@ -155,6 +182,25 @@ async fn serve_web(
|
||||
.await
|
||||
}
|
||||
|
||||
/// Serve the dashboard in-process alongside the reconcile loop. Failure
|
||||
/// (e.g. Zitadel not yet reachable for JWKS) is logged but never tears
|
||||
/// down reconcile — the read UI is best-effort, the controller is not.
|
||||
#[cfg(feature = "web-frontend")]
|
||||
fn spawn_dashboard(client: Client) {
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
|
||||
use service::real::RealFleetService;
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], frontend::server::DEFAULT_PORT));
|
||||
tokio::spawn(async move {
|
||||
let fleet = Arc::new(RealFleetService::new(client));
|
||||
if let Err(e) = serve_dashboard(fleet, addr, None, false).await {
|
||||
tracing::error!(error = %e, "dashboard server exited; reconcile continues");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()> {
|
||||
let nats = connect_with_retry(nats_url, credentials_toml).await?;
|
||||
tracing::info!(url = %nats_url, "connected to NATS");
|
||||
@@ -169,9 +215,16 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
||||
|
||||
let client = Client::try_default().await?;
|
||||
|
||||
// Three concurrent tasks:
|
||||
// Serve the read-only dashboard in the same process (best-effort;
|
||||
// it reads CRs only, no NATS). Built only with the web-frontend
|
||||
// feature; absent from the lean reconcile-only image.
|
||||
#[cfg(feature = "web-frontend")]
|
||||
spawn_dashboard(client.clone());
|
||||
|
||||
// Concurrent tasks:
|
||||
// controller — CR validation + finalizer-cleanup
|
||||
// device_reconciler — NATS device-info → Device CR
|
||||
// device_status — NATS device-heartbeat → Device.status liveness
|
||||
// fleet_aggregator — watches Deployments + Devices + states,
|
||||
// writes desired-state KV, patches CR status
|
||||
// Any failing tears the process down; kube-rs Controller swallows
|
||||
@@ -179,9 +232,12 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
||||
let ctl_client = client.clone();
|
||||
let dr_client = client.clone();
|
||||
let dr_js = js.clone();
|
||||
let ds_client = client.clone();
|
||||
let ds_js = js.clone();
|
||||
tokio::select! {
|
||||
r = controller::run(ctl_client, desired_state_kv) => r,
|
||||
r = device_reconciler::run(dr_client, dr_js) => r,
|
||||
r = device_status::run(ds_client, ds_js) => r,
|
||||
r = fleet_aggregator::run(client, js) => r,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
pub mod mock;
|
||||
pub mod real;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
|
||||
461
fleet/harmony-fleet-operator/src/service/real.rs
Normal file
461
fleet/harmony-fleet-operator/src/service/real.rs
Normal file
@@ -0,0 +1,461 @@
|
||||
//! Live [`FleetService`] as a read-only projection of Kubernetes CRs.
|
||||
//!
|
||||
//! The operator is the write side: `device_reconciler` materializes
|
||||
//! `Device` CRs (labels + inventory), `device_status` reflects liveness
|
||||
//! onto `Device.status`, and `fleet_aggregator` writes
|
||||
//! `Deployment.status.aggregate`. This dashboard is the read side — it
|
||||
//! only reads those CRs and projects them to view DTOs. No NATS: the
|
||||
//! CR is the single contract between the two sides.
|
||||
|
||||
use std::collections::{BTreeMap, HashSet};
|
||||
use std::sync::Mutex;
|
||||
|
||||
use anyhow::Context as _;
|
||||
use async_trait::async_trait;
|
||||
use chrono::{DateTime, Utc};
|
||||
use kube::api::{Api, ListParams, Patch, PatchParams};
|
||||
use kube::{Client, ResourceExt};
|
||||
use serde_json::json;
|
||||
|
||||
use harmony::modules::fleet::operator::{
|
||||
Deployment as DeploymentCr, Device as DeviceCr, DeviceStatus as DeviceLiveness, Reachability,
|
||||
};
|
||||
use harmony::modules::podman::ReconcileScore;
|
||||
use harmony_fleet_operator::fleet_aggregator::selector_matches;
|
||||
|
||||
use super::{
|
||||
Alert, AlertSeverity, DashboardDetail, DeploymentDetail, DeploymentStatus, DeviceDetail,
|
||||
DeviceStatus, FleetService,
|
||||
};
|
||||
|
||||
/// Label the operator UI sets to quarantine a device.
|
||||
const BLACKLIST_LABEL: &str = "fleet.nationtech.io/blacklisted";
|
||||
/// Label key the agent uses for a device's region, if it sets one.
|
||||
const REGION_LABEL: &str = "region";
|
||||
|
||||
pub struct RealFleetService {
|
||||
kube: Client,
|
||||
/// In-memory ack set. Alerts are derived from live CR state and
|
||||
/// have no store of their own, so acks don't survive a restart.
|
||||
acked_alerts: Mutex<HashSet<String>>,
|
||||
}
|
||||
|
||||
impl RealFleetService {
|
||||
pub fn new(kube: Client) -> Self {
|
||||
Self {
|
||||
kube,
|
||||
acked_alerts: Mutex::new(HashSet::new()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn device_crs(&self) -> anyhow::Result<Vec<DeviceCr>> {
|
||||
let api: Api<DeviceCr> = Api::all(self.kube.clone());
|
||||
Ok(api.list(&ListParams::default()).await?.items)
|
||||
}
|
||||
|
||||
async fn deployment_crs(&self) -> anyhow::Result<Vec<DeploymentCr>> {
|
||||
let api: Api<DeploymentCr> = Api::all(self.kube.clone());
|
||||
Ok(api.list(&ListParams::default()).await?.items)
|
||||
}
|
||||
|
||||
async fn devices(&self) -> anyhow::Result<Vec<DeviceDetail>> {
|
||||
let deployments = self.deployment_crs().await?;
|
||||
let now = Utc::now();
|
||||
let mut devices: Vec<DeviceDetail> = self
|
||||
.device_crs()
|
||||
.await?
|
||||
.iter()
|
||||
.map(|cr| map_device(cr, &deployments, now))
|
||||
.collect();
|
||||
devices.sort_by(|a, b| a.id.cmp(&b.id));
|
||||
Ok(devices)
|
||||
}
|
||||
|
||||
async fn deployments(&self) -> anyhow::Result<Vec<DeploymentDetail>> {
|
||||
let mut deployments: Vec<DeploymentDetail> = self
|
||||
.deployment_crs()
|
||||
.await?
|
||||
.iter()
|
||||
.map(map_deployment)
|
||||
.collect();
|
||||
deployments.sort_by(|a, b| a.name.cmp(&b.name));
|
||||
Ok(deployments)
|
||||
}
|
||||
}
|
||||
|
||||
fn map_device(cr: &DeviceCr, deployments: &[DeploymentCr], now: DateTime<Utc>) -> DeviceDetail {
|
||||
let labels = cr.metadata.labels.clone().unwrap_or_default();
|
||||
let blacklisted = labels.get(BLACKLIST_LABEL).map(String::as_str) == Some("true");
|
||||
let last_seen = cr
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.last_heartbeat.as_deref())
|
||||
.and_then(|s| DateTime::parse_from_rfc3339(s).ok())
|
||||
.map(|dt| dt.with_timezone(&Utc))
|
||||
.or_else(|| cr.creation_timestamp().map(|t| t.0))
|
||||
.unwrap_or(now);
|
||||
|
||||
DeviceDetail {
|
||||
id: cr.name_any(),
|
||||
status: device_status(blacklisted, cr.status.as_ref()),
|
||||
last_seen,
|
||||
minutes_ago: (now - last_seen).num_minutes().max(0),
|
||||
deployment: primary_deployment(&labels, deployments),
|
||||
region: labels
|
||||
.get(REGION_LABEL)
|
||||
.cloned()
|
||||
.unwrap_or_else(|| "\u{2014}".to_string()),
|
||||
tags: tags_from_labels(&labels),
|
||||
inventory: cr.spec.inventory.clone(),
|
||||
}
|
||||
}
|
||||
|
||||
/// UI status from the device's quarantine label + operator-written
|
||||
/// liveness. Failing/Pending are deployment-level (the Deployments
|
||||
/// view), not device liveness, so they don't appear here.
|
||||
fn device_status(blacklisted: bool, liveness: Option<&DeviceLiveness>) -> DeviceStatus {
|
||||
if blacklisted {
|
||||
return DeviceStatus::Blacklisted;
|
||||
}
|
||||
match liveness.map(|s| s.reachability) {
|
||||
Some(Reachability::Reachable) => DeviceStatus::Healthy,
|
||||
Some(Reachability::Stale) => DeviceStatus::Stale,
|
||||
Some(Reachability::Unknown) | None => DeviceStatus::Unknown,
|
||||
}
|
||||
}
|
||||
|
||||
/// First deployment (by name) whose selector matches the device — the
|
||||
/// canonical [`selector_matches`] over CR labels, the same matcher the
|
||||
/// aggregator uses. No reconstruction.
|
||||
fn primary_deployment(
|
||||
labels: &BTreeMap<String, String>,
|
||||
deployments: &[DeploymentCr],
|
||||
) -> Option<String> {
|
||||
let mut matched: Vec<String> = deployments
|
||||
.iter()
|
||||
.filter(|d| selector_matches(&d.spec.target_selector, labels))
|
||||
.map(ResourceExt::name_any)
|
||||
.collect();
|
||||
matched.sort();
|
||||
matched.into_iter().next()
|
||||
}
|
||||
|
||||
/// Routing labels rendered as `k=v` chips, minus internal keys.
|
||||
fn tags_from_labels(labels: &BTreeMap<String, String>) -> Vec<String> {
|
||||
labels
|
||||
.iter()
|
||||
.filter(|(k, _)| k.as_str() != BLACKLIST_LABEL && k.as_str() != REGION_LABEL)
|
||||
.map(|(k, v)| format!("{k}={v}"))
|
||||
.collect()
|
||||
}
|
||||
|
||||
fn map_deployment(cr: &DeploymentCr) -> DeploymentDetail {
|
||||
let agg = cr
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.aggregate.clone())
|
||||
.unwrap_or_default();
|
||||
let status = if agg.failed > 0 {
|
||||
DeploymentStatus::Failing
|
||||
} else if agg.pending > 0 {
|
||||
DeploymentStatus::Rolling
|
||||
} else {
|
||||
DeploymentStatus::Active
|
||||
};
|
||||
DeploymentDetail {
|
||||
name: cr.name_any(),
|
||||
version: deployment_version(&cr.spec.score),
|
||||
status,
|
||||
target: agg.matched_device_count,
|
||||
healthy: agg.succeeded,
|
||||
failing: agg.failed,
|
||||
pending: agg.pending,
|
||||
updated_at: cr
|
||||
.creation_timestamp()
|
||||
.map(|t| t.0.format("%Y-%m-%d %H:%M").to_string())
|
||||
.unwrap_or_else(|| "\u{2014}".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
/// No first-class version on the CR; use the first service's image tag
|
||||
/// as the human-facing proxy (`nginx:1.25` → `1.25`).
|
||||
fn deployment_version(score: &ReconcileScore) -> String {
|
||||
let ReconcileScore::PodmanV0(s) = score;
|
||||
s.services
|
||||
.first()
|
||||
.map(|svc| {
|
||||
svc.image
|
||||
.rsplit_once(':')
|
||||
.map(|(_, tag)| tag.to_string())
|
||||
.unwrap_or_else(|| svc.image.clone())
|
||||
})
|
||||
.unwrap_or_else(|| "\u{2014}".to_string())
|
||||
}
|
||||
|
||||
/// Alerts derived from current CR state (no alert store yet): one
|
||||
/// critical per failing deployment, one warning per stale device.
|
||||
fn derive_alerts(
|
||||
devices: &[DeviceDetail],
|
||||
deployments: &[DeploymentDetail],
|
||||
acked: &HashSet<String>,
|
||||
) -> Vec<Alert> {
|
||||
let mut alerts = Vec::new();
|
||||
for d in deployments {
|
||||
if d.failing > 0 {
|
||||
let id = format!("dep:{}:failing", d.name);
|
||||
alerts.push(Alert {
|
||||
acked: acked.contains(&id),
|
||||
id,
|
||||
severity: AlertSeverity::Critical,
|
||||
title: format!("{} has {} failing device(s)", d.name, d.failing),
|
||||
deployment: Some(d.name.clone()),
|
||||
device: None,
|
||||
at: String::new(),
|
||||
});
|
||||
}
|
||||
}
|
||||
for dev in devices {
|
||||
if dev.status == DeviceStatus::Stale {
|
||||
let id = format!("device:{}:stale", dev.id);
|
||||
alerts.push(Alert {
|
||||
acked: acked.contains(&id),
|
||||
id,
|
||||
severity: AlertSeverity::Warning,
|
||||
title: format!("{} last pinged {}m ago", dev.id, dev.minutes_ago),
|
||||
deployment: dev.deployment.clone(),
|
||||
device: Some(dev.id.clone()),
|
||||
at: String::new(),
|
||||
});
|
||||
}
|
||||
}
|
||||
alerts
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl FleetService for RealFleetService {
|
||||
async fn dashboard_detail(&self) -> anyhow::Result<DashboardDetail> {
|
||||
let devices = self.devices().await?;
|
||||
let mut deployments = self.deployments().await?;
|
||||
let acked = self.acked_alerts.lock().unwrap().clone();
|
||||
let active_alerts = derive_alerts(&devices, &deployments, &acked)
|
||||
.into_iter()
|
||||
.filter(|a| !a.acked)
|
||||
.collect();
|
||||
|
||||
let mut d = DashboardDetail {
|
||||
devices_total: devices.len() as u32,
|
||||
devices_healthy: 0,
|
||||
devices_pending: 0,
|
||||
devices_failing: 0,
|
||||
devices_stale: 0,
|
||||
devices_blacklisted: 0,
|
||||
devices_unknown: 0,
|
||||
deployments_total: deployments.len(),
|
||||
health_pct: 0,
|
||||
attention_devices: devices
|
||||
.iter()
|
||||
.filter(|d| matches!(d.status, DeviceStatus::Stale | DeviceStatus::Unknown))
|
||||
.take(12)
|
||||
.cloned()
|
||||
.collect(),
|
||||
top_deployments: Vec::new(),
|
||||
active_alerts,
|
||||
rolling_count: 0,
|
||||
failing_count: 0,
|
||||
};
|
||||
for dev in &devices {
|
||||
match dev.status {
|
||||
DeviceStatus::Healthy => d.devices_healthy += 1,
|
||||
DeviceStatus::Pending => d.devices_pending += 1,
|
||||
DeviceStatus::Stale => d.devices_stale += 1,
|
||||
DeviceStatus::Failing => d.devices_failing += 1,
|
||||
DeviceStatus::Blacklisted => d.devices_blacklisted += 1,
|
||||
DeviceStatus::Unknown => d.devices_unknown += 1,
|
||||
}
|
||||
}
|
||||
if d.devices_total > 0 {
|
||||
d.health_pct =
|
||||
((d.devices_healthy as f64 / d.devices_total as f64) * 100.0).round() as u32;
|
||||
}
|
||||
for dep in &deployments {
|
||||
match dep.status {
|
||||
DeploymentStatus::Rolling => d.rolling_count += 1,
|
||||
DeploymentStatus::Failing => d.failing_count += 1,
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
deployments.truncate(4);
|
||||
d.top_deployments = deployments;
|
||||
Ok(d)
|
||||
}
|
||||
|
||||
async fn list_devices(&self) -> anyhow::Result<Vec<DeviceDetail>> {
|
||||
self.devices().await
|
||||
}
|
||||
|
||||
async fn get_device(&self, id: &str) -> anyhow::Result<Option<DeviceDetail>> {
|
||||
Ok(self.devices().await?.into_iter().find(|d| d.id == id))
|
||||
}
|
||||
|
||||
async fn list_deployments(&self) -> anyhow::Result<Vec<DeploymentDetail>> {
|
||||
self.deployments().await
|
||||
}
|
||||
|
||||
async fn get_deployment(&self, name: &str) -> anyhow::Result<Option<DeploymentDetail>> {
|
||||
Ok(self
|
||||
.deployments()
|
||||
.await?
|
||||
.into_iter()
|
||||
.find(|d| d.name == name))
|
||||
}
|
||||
|
||||
async fn get_deployment_devices(&self, name: &str) -> anyhow::Result<Vec<DeviceDetail>> {
|
||||
let deployments = self.deployment_crs().await?;
|
||||
let Some(cr) = deployments.iter().find(|c| c.name_any() == name) else {
|
||||
return Ok(Vec::new());
|
||||
};
|
||||
let selector = cr.spec.target_selector.clone();
|
||||
let now = Utc::now();
|
||||
Ok(self
|
||||
.device_crs()
|
||||
.await?
|
||||
.iter()
|
||||
.filter(|dev| {
|
||||
selector_matches(&selector, &dev.metadata.labels.clone().unwrap_or_default())
|
||||
})
|
||||
.map(|dev| map_device(dev, &deployments, now))
|
||||
.collect())
|
||||
}
|
||||
|
||||
async fn blacklist_device(&self, id: &str) -> anyhow::Result<DeviceDetail> {
|
||||
let api: Api<DeviceCr> = Api::all(self.kube.clone());
|
||||
let patch = json!({ "metadata": { "labels": { BLACKLIST_LABEL: "true" } } });
|
||||
api.patch(id, &PatchParams::default(), &Patch::Merge(&patch))
|
||||
.await
|
||||
.with_context(|| format!("patching blacklist label on device {id}"))?;
|
||||
self.get_device(id)
|
||||
.await?
|
||||
.ok_or_else(|| anyhow::anyhow!("device {id} not found after blacklist"))
|
||||
}
|
||||
|
||||
async fn list_alerts(&self) -> anyhow::Result<Vec<Alert>> {
|
||||
let devices = self.devices().await?;
|
||||
let deployments = self.deployments().await?;
|
||||
let acked = self.acked_alerts.lock().unwrap().clone();
|
||||
Ok(derive_alerts(&devices, &deployments, &acked))
|
||||
}
|
||||
|
||||
async fn ack_alert(&self, id: &str) -> anyhow::Result<bool> {
|
||||
Ok(self.acked_alerts.lock().unwrap().insert(id.to_string()))
|
||||
}
|
||||
|
||||
async fn run_command(&self, _device_id: &str, command: &str) -> anyhow::Result<String> {
|
||||
// Seam only: the device round-trip (publish to the agent over
|
||||
// NATS, stream stdout/stderr back) needs agent-side support.
|
||||
Ok(format!(
|
||||
"$ {command}\n[device command transport not implemented yet]"
|
||||
))
|
||||
}
|
||||
|
||||
async fn filtered_devices(
|
||||
&self,
|
||||
status: Option<DeviceStatus>,
|
||||
deployment: Option<String>,
|
||||
region: Option<String>,
|
||||
search: Option<String>,
|
||||
) -> anyhow::Result<Vec<DeviceDetail>> {
|
||||
let search = search.map(|s| s.to_lowercase());
|
||||
Ok(self
|
||||
.devices()
|
||||
.await?
|
||||
.into_iter()
|
||||
.filter(|d| {
|
||||
status.is_none_or(|s| d.status == s)
|
||||
&& deployment
|
||||
.as_deref()
|
||||
.is_none_or(|dep| d.deployment.as_deref() == Some(dep))
|
||||
&& region.as_deref().is_none_or(|r| d.region == r)
|
||||
&& search.as_deref().is_none_or(|q| {
|
||||
d.id.to_lowercase().contains(q)
|
||||
|| d.tags.iter().any(|t| t.to_lowercase().contains(q))
|
||||
|| d.deployment
|
||||
.as_deref()
|
||||
.is_some_and(|dep| dep.to_lowercase().contains(q))
|
||||
})
|
||||
})
|
||||
.collect())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use harmony::modules::podman::{PodmanService, PodmanV0Score};
|
||||
|
||||
fn liveness(r: Reachability) -> DeviceLiveness {
|
||||
DeviceLiveness {
|
||||
last_heartbeat: None,
|
||||
reachability: r,
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn device_status_maps_liveness_and_blacklist() {
|
||||
assert_eq!(
|
||||
device_status(true, Some(&liveness(Reachability::Reachable))),
|
||||
DeviceStatus::Blacklisted
|
||||
);
|
||||
assert_eq!(
|
||||
device_status(false, Some(&liveness(Reachability::Reachable))),
|
||||
DeviceStatus::Healthy
|
||||
);
|
||||
assert_eq!(
|
||||
device_status(false, Some(&liveness(Reachability::Stale))),
|
||||
DeviceStatus::Stale
|
||||
);
|
||||
assert_eq!(device_status(false, None), DeviceStatus::Unknown);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn version_from_image_tag() {
|
||||
let score = ReconcileScore::PodmanV0(PodmanV0Score {
|
||||
services: vec![PodmanService {
|
||||
name: "web".into(),
|
||||
image: "nginx:1.25".into(),
|
||||
ports: vec![],
|
||||
env: vec![],
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
});
|
||||
assert_eq!(deployment_version(&score), "1.25");
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn alerts_from_failing_and_stale() {
|
||||
let devices = vec![DeviceDetail {
|
||||
id: "pi-09".into(),
|
||||
status: DeviceStatus::Stale,
|
||||
last_seen: Utc::now(),
|
||||
minutes_ago: 14,
|
||||
deployment: None,
|
||||
region: "\u{2014}".into(),
|
||||
tags: vec![],
|
||||
inventory: None,
|
||||
}];
|
||||
let deployments = vec![DeploymentDetail {
|
||||
name: "edge".into(),
|
||||
version: "1.0".into(),
|
||||
status: DeploymentStatus::Failing,
|
||||
target: 3,
|
||||
healthy: 1,
|
||||
failing: 2,
|
||||
pending: 0,
|
||||
updated_at: "\u{2014}".into(),
|
||||
}];
|
||||
let alerts = derive_alerts(&devices, &deployments, &HashSet::new());
|
||||
assert_eq!(alerts.len(), 2);
|
||||
assert!(alerts.iter().any(|a| a.severity == AlertSeverity::Critical));
|
||||
assert!(alerts.iter().any(|a| a.severity == AlertSeverity::Warning));
|
||||
}
|
||||
}
|
||||
76
fleet/scripts/dev-deploy-operator.sh
Executable file
76
fleet/scripts/dev-deploy-operator.sh
Executable file
@@ -0,0 +1,76 @@
|
||||
#!/usr/bin/env bash
|
||||
# Build + push + deploy the fleet operator to staging in one shot — the
|
||||
# fast inner loop that skips the git-tag → CI → release ceremony.
|
||||
#
|
||||
# It runs the SAME two binaries the release/CD path uses, just with a
|
||||
# throwaway dev version:
|
||||
# 1. harmony-fleet-publish — docker-build the operator image (with the
|
||||
# web dashboard), generate the helm chart, push BOTH to the OCI
|
||||
# registry.
|
||||
# 2. harmony-fleet-deploy — helm upgrade --install that chart onto
|
||||
# staging, applying the dashboard ingress + Service + cert-manager
|
||||
# cert (host/issuer come from FleetDeployConfig).
|
||||
#
|
||||
# Each run gets a unique semver-dev version, so the node always pulls a
|
||||
# fresh image and helm rolls a new ReplicaSet — no `:dev`-tag cache traps.
|
||||
# The first build is a full compile; thereafter the Dockerfile's BuildKit
|
||||
# cache mounts recompile only what changed (seconds).
|
||||
#
|
||||
# Usage:
|
||||
# ./fleet/scripts/dev-deploy-operator.sh
|
||||
# REGISTRY=hub.nationtech.io PROJECT=harmony ./fleet/scripts/dev-deploy-operator.sh
|
||||
# PUBLISH_ONLY=1 ./fleet/scripts/dev-deploy-operator.sh # build+push, skip deploy
|
||||
#
|
||||
# Prerequisites (same as the release path):
|
||||
# - docker (BuildKit) + helm on PATH, logged in to $REGISTRY.
|
||||
# - The deploy reads FleetDeployConfig + FleetDeploySecrets for
|
||||
# $CONFIG_NAMESPACE (Env → OpenBao). Provide them however you already
|
||||
# do for `harmony-fleet-deploy` — e.g. a sourced `.envrc` with
|
||||
# OPENBAO_URL / OPENBAO_TOKEN / HARMONY_CONFIG_NAMESPACE, and a
|
||||
# KUBECONFIG pointed at staging (or a kubeconfig seeded in OpenBao).
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
REGISTRY="${REGISTRY:-hub.nationtech.io}"
|
||||
PROJECT="${PROJECT:-harmony}"
|
||||
CONFIG_NAMESPACE="${CONFIG_NAMESPACE:-fleet-staging}"
|
||||
PUBLISH_ONLY="${PUBLISH_ONLY:-0}"
|
||||
|
||||
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
|
||||
REPO_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)"
|
||||
cd "$REPO_ROOT"
|
||||
|
||||
# Pick up a local .envrc (OpenBao creds, KUBECONFIG, …) if present and
|
||||
# not already loaded by direnv.
|
||||
if [[ -f "$REPO_ROOT/.envrc" ]]; then
|
||||
# shellcheck disable=SC1091
|
||||
source "$REPO_ROOT/.envrc"
|
||||
fi
|
||||
|
||||
# Unique + semver-valid: helm rejects a non-semver chart version, and a
|
||||
# fresh version forces a clean image pull + rollout every iteration.
|
||||
VERSION="0.0.0-dev.$(date -u +'%Y%m%d%H%M%S')"
|
||||
|
||||
export DOCKER_BUILDKIT=1
|
||||
|
||||
echo "==> [1/2] build + push image and chart @ ${VERSION}"
|
||||
cargo run -q -p harmony-fleet-deploy --bin harmony-fleet-publish -- \
|
||||
--version "$VERSION" \
|
||||
--registry "$REGISTRY" \
|
||||
--project "$PROJECT"
|
||||
|
||||
if [[ "$PUBLISH_ONLY" == "1" ]]; then
|
||||
echo "==> PUBLISH_ONLY=1, skipping deploy. Chart: oci://${REGISTRY}/${PROJECT}/harmony-fleet-operator-chart:${VERSION}"
|
||||
exit 0
|
||||
fi
|
||||
|
||||
echo "==> [2/2] deploy chart @ ${VERSION} to ${CONFIG_NAMESPACE}"
|
||||
cargo run -q -p harmony-fleet-deploy --bin harmony-fleet-deploy -- \
|
||||
--operator-chart-version "$VERSION" \
|
||||
--operator-chart-registry "$REGISTRY" \
|
||||
--operator-chart-project "$PROJECT" \
|
||||
--config-namespace "$CONFIG_NAMESPACE" \
|
||||
--yes
|
||||
|
||||
echo
|
||||
echo "Deployed ${REGISTRY}/${PROJECT}/harmony-fleet-operator:${VERSION} to ${CONFIG_NAMESPACE}."
|
||||
@@ -95,7 +95,8 @@ pub struct AggregateLastError {
|
||||
version = "v1alpha1",
|
||||
kind = "Device",
|
||||
plural = "devices",
|
||||
shortname = "fleetdev"
|
||||
shortname = "fleetdev",
|
||||
status = "DeviceStatus"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DeviceSpec {
|
||||
@@ -104,3 +105,31 @@ pub struct DeviceSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub inventory: Option<InventorySnapshot>,
|
||||
}
|
||||
|
||||
/// Operator-maintained liveness reflection of the NATS
|
||||
/// `device-heartbeat` bucket onto the CR, so `kubectl get devices` and
|
||||
/// the dashboard see reachability without reading NATS. Written by the
|
||||
/// device-status reconciler.
|
||||
#[derive(Serialize, Deserialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DeviceStatus {
|
||||
/// RFC 3339 timestamp of the last heartbeat seen. `None` until the
|
||||
/// device has pinged at least once.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub last_heartbeat: Option<String>,
|
||||
pub reachability: Reachability,
|
||||
}
|
||||
|
||||
/// Coarse liveness derived from heartbeat freshness. Failing/Pending
|
||||
/// are deployment-level concerns (see [`DeploymentAggregate`]), not
|
||||
/// device liveness.
|
||||
#[derive(Serialize, Deserialize, Clone, Copy, Debug, Default, PartialEq, Eq, JsonSchema)]
|
||||
pub enum Reachability {
|
||||
/// No heartbeat seen yet.
|
||||
#[default]
|
||||
Unknown,
|
||||
/// Heartbeat within the freshness window.
|
||||
Reachable,
|
||||
/// Last heartbeat is older than the freshness window.
|
||||
Stale,
|
||||
}
|
||||
|
||||
@@ -13,5 +13,5 @@ pub mod crd;
|
||||
|
||||
pub use crd::{
|
||||
AggregateLastError, Deployment, DeploymentAggregate, DeploymentSpec, DeploymentStatus, Device,
|
||||
DeviceSpec, Rollout, RolloutStrategy,
|
||||
DeviceSpec, DeviceStatus, Reachability, Rollout, RolloutStrategy,
|
||||
};
|
||||
|
||||
@@ -1,10 +1,15 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_macros::ingress_path;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::api::networking::v1::Ingress;
|
||||
use log::{debug, trace};
|
||||
use k8s_openapi::api::networking::v1::{
|
||||
HTTPIngressPath, HTTPIngressRuleValue, Ingress, IngressBackend, IngressRule,
|
||||
IngressServiceBackend, IngressSpec, IngressTLS, ServiceBackendPort,
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
@@ -59,70 +64,71 @@ impl K8sIngressScore {
|
||||
/// Extracted from `create_interpret` so the TLS/annotation shape is
|
||||
/// unit-testable without a topology.
|
||||
fn render_ingress(&self) -> Ingress {
|
||||
let path = match self.path.clone() {
|
||||
Some(p) => p,
|
||||
None => ingress_path!("/"),
|
||||
};
|
||||
|
||||
let path_type = match self.path_type.clone() {
|
||||
Some(p) => p,
|
||||
None => PathType::Prefix,
|
||||
};
|
||||
|
||||
let ingress_class = match self.ingress_class_name.clone() {
|
||||
Some(ingress_class_name) => ingress_class_name,
|
||||
None => "\"default\"".to_string(),
|
||||
};
|
||||
|
||||
let mut ingress = json!(
|
||||
{
|
||||
"metadata": {
|
||||
"name": self.name.to_string(),
|
||||
},
|
||||
"spec": {
|
||||
"ingressClassName": ingress_class.as_str(),
|
||||
"rules": [
|
||||
{ "host": self.host.to_string(),
|
||||
"http": {
|
||||
"paths": [
|
||||
{
|
||||
"path": path,
|
||||
"pathType": path_type.as_str(),
|
||||
"backend": {
|
||||
"service": {
|
||||
"name": self.backend_service.to_string(),
|
||||
"port": {
|
||||
"number": self.port,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
);
|
||||
let path = self.path.clone().unwrap_or_else(|| ingress_path!("/"));
|
||||
let path_type = self.path_type.clone().unwrap_or(PathType::Prefix);
|
||||
let host = self.host.to_string();
|
||||
|
||||
// cert-manager TLS: the annotation issues the cert into
|
||||
// `secretName`, and the `tls` block must name that same Secret
|
||||
// for the Ingress to be portable (a bare `tls` host without a
|
||||
// secretName is rejected by OKD's ingress-to-route translation).
|
||||
if let Some(issuer) = &self.cluster_issuer {
|
||||
let secret_name = format!("{}-tls", self.host.to_string().replace('.', "-"));
|
||||
ingress["metadata"]["annotations"] = json!({
|
||||
"cert-manager.io/cluster-issuer": issuer,
|
||||
"route.openshift.io/termination": "edge",
|
||||
});
|
||||
ingress["spec"]["tls"] = json!([{
|
||||
"hosts": [self.host.to_string()],
|
||||
"secretName": secret_name,
|
||||
}]);
|
||||
}
|
||||
let (annotations, tls) = match &self.cluster_issuer {
|
||||
Some(issuer) => {
|
||||
let secret_name = format!("{}-tls", host.replace('.', "-"));
|
||||
let annotations = BTreeMap::from([
|
||||
("cert-manager.io/cluster-issuer".to_string(), issuer.clone()),
|
||||
// OKD edge termination — a no-op on non-OpenShift clusters.
|
||||
(
|
||||
"route.openshift.io/termination".to_string(),
|
||||
"edge".to_string(),
|
||||
),
|
||||
]);
|
||||
let tls = vec![IngressTLS {
|
||||
hosts: Some(vec![host.clone()]),
|
||||
secret_name: Some(secret_name),
|
||||
}];
|
||||
(Some(annotations), Some(tls))
|
||||
}
|
||||
None => (None, None),
|
||||
};
|
||||
|
||||
let ingress = Ingress {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(self.name.to_string()),
|
||||
annotations,
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(IngressSpec {
|
||||
// Left unset when `None` so the cluster's default
|
||||
// IngressClass claims the resource — setting it to a
|
||||
// bogus/empty value instead leaves the Ingress unrouted
|
||||
// (see docs/guides/kubernetes-ingress.md).
|
||||
ingress_class_name: self.ingress_class_name.clone(),
|
||||
rules: Some(vec![IngressRule {
|
||||
host: Some(host),
|
||||
http: Some(HTTPIngressRuleValue {
|
||||
paths: vec![HTTPIngressPath {
|
||||
path: Some(path),
|
||||
path_type: path_type.as_str().to_string(),
|
||||
backend: IngressBackend {
|
||||
service: Some(IngressServiceBackend {
|
||||
name: self.backend_service.to_string(),
|
||||
port: Some(ServiceBackendPort {
|
||||
name: None,
|
||||
number: Some(i32::from(self.port)),
|
||||
}),
|
||||
}),
|
||||
resource: None,
|
||||
},
|
||||
}],
|
||||
}),
|
||||
}]),
|
||||
tls,
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
trace!("Building ingresss object from Value {ingress:#}");
|
||||
let ingress: Ingress = serde_json::from_value(ingress).unwrap();
|
||||
debug!(
|
||||
"Successfully built Ingress for host {:?}",
|
||||
ingress.metadata.name
|
||||
@@ -231,6 +237,37 @@ mod tests {
|
||||
assert!(ing.spec.unwrap().tls.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingress_class_omitted_when_none() {
|
||||
// Unset (not "default", not "") so the cluster's default
|
||||
// IngressClass claims the resource.
|
||||
let ing = score(None).render_ingress();
|
||||
assert!(ing.spec.unwrap().ingress_class_name.is_none());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn ingress_class_passed_through_when_set() {
|
||||
let mut s = score(None);
|
||||
s.ingress_class_name = Some("openshift-default".into());
|
||||
let ing = s.render_ingress();
|
||||
assert_eq!(
|
||||
ing.spec.unwrap().ingress_class_name.as_deref(),
|
||||
Some("openshift-default")
|
||||
);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn backend_and_path_render() {
|
||||
let ing = score(None).render_ingress();
|
||||
let rule = &ing.spec.unwrap().rules.unwrap()[0];
|
||||
assert_eq!(rule.host.as_deref(), Some("fleet-stg.cb1.nationtech.io"));
|
||||
let p = &rule.http.as_ref().unwrap().paths[0];
|
||||
assert_eq!(p.path_type, "Prefix");
|
||||
let svc = p.backend.service.as_ref().unwrap();
|
||||
assert_eq!(svc.name, "op");
|
||||
assert_eq!(svc.port.as_ref().unwrap().number, Some(18080));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn cluster_issuer_renders_cert_manager_and_tls() {
|
||||
let ing = score(Some("letsencrypt-prod".into())).render_ingress();
|
||||
|
||||
@@ -10,12 +10,14 @@ default = []
|
||||
axum = ["dep:axum", "dep:axum-extra"]
|
||||
|
||||
[dependencies]
|
||||
harmony_config = { path = "../harmony_config" }
|
||||
anyhow.workspace = true
|
||||
base64.workspace = true
|
||||
chrono = { workspace = true, features = ["serde"] }
|
||||
rand.workspace = true
|
||||
reqwest.workspace = true
|
||||
serde.workspace = true
|
||||
schemars = "0.8"
|
||||
serde = { workspace = true, features = ["derive"] }
|
||||
serde_json.workspace = true
|
||||
sha2 = "0.10"
|
||||
url.workspace = true
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
#[derive(Debug, Clone)]
|
||||
use harmony_config::Config;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// Operator web-frontend auth config. Loaded via
|
||||
/// [`harmony_config::ConfigClient`] (env → OpenBao), so the operator
|
||||
/// reads one typed value instead of a fistful of `FLEET_AUTH_*` env vars.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Config)]
|
||||
pub struct ZitadelAuthConfig {
|
||||
pub zitadel_base: String,
|
||||
pub base_url: String,
|
||||
@@ -33,43 +40,27 @@ impl ZitadelAuthConfig {
|
||||
}
|
||||
}
|
||||
|
||||
pub const ZITADEL_BASE_ENV: &str = "FLEET_AUTH_ZITADEL_BASE";
|
||||
pub const BASE_URL_ENV: &str = "BASE_URL";
|
||||
pub const CLIENT_ID_ENV: &str = "FLEET_AUTH_CLIENT_ID";
|
||||
pub const SCOPE_ENV: &str = "FLEET_AUTH_SCOPE";
|
||||
pub const TRUSTED_AUDIENCES_ENV: &str = "FLEET_AUTH_TRUSTED_AUDIENCES";
|
||||
pub const LOGOUT_REDIRECT_URI_ENV: &str = "FLEET_AUTH_LOGOUT_REDIRECT_URI";
|
||||
pub const COOKIE_KEY_ENV: &str = "FLEET_OPERATOR_COOKIE_KEY_B64";
|
||||
|
||||
pub fn config_from_env() -> ZitadelAuthConfig {
|
||||
ZitadelAuthConfig {
|
||||
zitadel_base: required_env(ZITADEL_BASE_ENV),
|
||||
base_url: required_env(BASE_URL_ENV),
|
||||
client_id: required_env(CLIENT_ID_ENV),
|
||||
scope: required_env(SCOPE_ENV),
|
||||
trusted_audiences: required_env(TRUSTED_AUDIENCES_ENV)
|
||||
.split(',')
|
||||
.map(str::to_string)
|
||||
.collect(),
|
||||
logout_redirect_uri: required_env(LOGOUT_REDIRECT_URI_ENV),
|
||||
}
|
||||
/// Operator session-cookie signing key: standard-base64 of ≥64 random
|
||||
/// bytes. Secret-class, so it resolves from OpenBao / a k8s Secret —
|
||||
/// never cleartext config. Whoever holds it can forge sessions.
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Config)]
|
||||
#[config(secret)]
|
||||
pub struct OperatorCookieKey {
|
||||
pub cookie_key_b64: String,
|
||||
}
|
||||
|
||||
#[cfg(feature = "axum")]
|
||||
pub fn cookie_key_from_env() -> axum_extra::extract::cookie::Key {
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
impl OperatorCookieKey {
|
||||
pub fn key(&self) -> anyhow::Result<axum_extra::extract::cookie::Key> {
|
||||
use base64::Engine;
|
||||
use base64::engine::general_purpose::STANDARD;
|
||||
|
||||
let encoded = required_env(COOKIE_KEY_ENV);
|
||||
let bytes = STANDARD
|
||||
.decode(encoded.trim())
|
||||
.unwrap_or_else(|e| panic!("{COOKIE_KEY_ENV} must be standard base64: {e}"));
|
||||
if bytes.len() < 64 {
|
||||
panic!("{COOKIE_KEY_ENV} must decode to at least 64 bytes for private cookies");
|
||||
let bytes = STANDARD
|
||||
.decode(self.cookie_key_b64.trim())
|
||||
.map_err(|e| anyhow::anyhow!("cookie_key_b64 must be standard base64: {e}"))?;
|
||||
if bytes.len() < 64 {
|
||||
anyhow::bail!("cookie_key_b64 must decode to at least 64 bytes for private cookies");
|
||||
}
|
||||
Ok(axum_extra::extract::cookie::Key::from(&bytes))
|
||||
}
|
||||
axum_extra::extract::cookie::Key::from(&bytes)
|
||||
}
|
||||
|
||||
fn required_env(name: &str) -> String {
|
||||
std::env::var(name).unwrap_or_else(|_| panic!("missing required environment variable {name}"))
|
||||
}
|
||||
|
||||
@@ -5,12 +5,7 @@ pub mod jwks;
|
||||
pub mod login;
|
||||
pub mod session;
|
||||
|
||||
#[cfg(feature = "axum")]
|
||||
pub use config::cookie_key_from_env;
|
||||
pub use config::{
|
||||
BASE_URL_ENV, CLIENT_ID_ENV, COOKIE_KEY_ENV, LOGOUT_REDIRECT_URI_ENV, SCOPE_ENV,
|
||||
TRUSTED_AUDIENCES_ENV, ZITADEL_BASE_ENV, ZitadelAuthConfig, config_from_env,
|
||||
};
|
||||
pub use config::{OperatorCookieKey, ZitadelAuthConfig};
|
||||
|
||||
pub use jwks::JwksCache;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user