diff --git a/.sqlx/query-c7ca191faaa23b3ec5019f8c4910f666db9c6c2be22ffe563be4b7caef645bd1.json b/.sqlx/query-08e36d39f08ee7a06a9faa58990414fe16bec75c3458ee809755d2c975c487cc.json
similarity index 71%
rename from .sqlx/query-c7ca191faaa23b3ec5019f8c4910f666db9c6c2be22ffe563be4b7caef645bd1.json
rename to .sqlx/query-08e36d39f08ee7a06a9faa58990414fe16bec75c3458ee809755d2c975c487cc.json
index cbe2716e..74ae7e53 100644
--- a/.sqlx/query-c7ca191faaa23b3ec5019f8c4910f666db9c6c2be22ffe563be4b7caef645bd1.json
+++ b/.sqlx/query-08e36d39f08ee7a06a9faa58990414fe16bec75c3458ee809755d2c975c487cc.json
@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "SELECT data as \"data!: Vec<u8>\" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1",
+  "query": "SELECT data as \"data!: Vec<u8>\" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1",
   "describe": {
     "columns": [
       {
@@ -16,5 +16,5 @@
       false
     ]
   },
-  "hash": "c7ca191faaa23b3ec5019f8c4910f666db9c6c2be22ffe563be4b7caef645bd1"
+  "hash": "08e36d39f08ee7a06a9faa58990414fe16bec75c3458ee809755d2c975c487cc"
 }
diff --git a/.sqlx/query-934035c7ca6e064815393e4e049a7934b0a7fac04a4fe4b2a354f0443d630990.json b/.sqlx/query-36273d65f86cd9070b278a3c3da4ea16e21311f4ba579522282499ac5c62bd5e.json
similarity index 77%
rename from .sqlx/query-934035c7ca6e064815393e4e049a7934b0a7fac04a4fe4b2a354f0443d630990.json
rename to .sqlx/query-36273d65f86cd9070b278a3c3da4ea16e21311f4ba579522282499ac5c62bd5e.json
index 2d1f1bab..9a5582b3 100644
--- a/.sqlx/query-934035c7ca6e064815393e4e049a7934b0a7fac04a4fe4b2a354f0443d630990.json
+++ b/.sqlx/query-36273d65f86cd9070b278a3c3da4ea16e21311f4ba579522282499ac5c62bd5e.json
@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "SELECT id, version_id, data as \"data: Json<PhysicalHost>\" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1",
+  "query": "SELECT id, version_id, data as \"data: Json<PhysicalHost>\" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1",
   "describe": {
     "columns": [
       {
@@ -28,5 +28,5 @@
       false
     ]
   },
-  "hash": "934035c7ca6e064815393e4e049a7934b0a7fac04a4fe4b2a354f0443d630990"
+  "hash": "36273d65f86cd9070b278a3c3da4ea16e21311f4ba579522282499ac5c62bd5e"
 }
diff --git a/.sqlx/query-3caebb777887adea0fccc71108b235e49354086857390290753bc22d90d287a0.json b/.sqlx/query-3caebb777887adea0fccc71108b235e49354086857390290753bc22d90d287a0.json
new file mode 100644
index 00000000..89e0ae05
--- /dev/null
+++ b/.sqlx/query-3caebb777887adea0fccc71108b235e49354086857390290753bc22d90d287a0.json
@@ -0,0 +1,32 @@
+{
+  "db_name": "SQLite",
+  "query": "SELECT host_id, installation_device, network_config FROM host_role_mapping WHERE role = ? ORDER BY id DESC LIMIT 1",
+  "describe": {
+    "columns": [
+      {
+        "name": "host_id",
+        "ordinal": 0,
+        "type_info": "Text"
+      },
+      {
+        "name": "installation_device",
+        "ordinal": 1,
+        "type_info": "Text"
+      },
+      {
+        "name": "network_config",
+        "ordinal": 2,
+        "type_info": "Text"
+      }
+    ],
+    "parameters": {
+      "Right": 1
+    },
+    "nullable": [
+      false,
+      true,
+      true
+    ]
+  },
+  "hash": "3caebb777887adea0fccc71108b235e49354086857390290753bc22d90d287a0"
+}
diff --git a/.sqlx/query-8d247918eca10a88b784ee353db090c94a222115c543231f2140cba27bd0f067.json b/.sqlx/query-8dfce6f15a95a44f5899e2953cfc56d8d1b34fe36615edcf3fed08dd81aefa7a.json
similarity index 72%
rename from .sqlx/query-8d247918eca10a88b784ee353db090c94a222115c543231f2140cba27bd0f067.json
rename to .sqlx/query-8dfce6f15a95a44f5899e2953cfc56d8d1b34fe36615edcf3fed08dd81aefa7a.json
index ba998bc8..575f0061 100644
--- a/.sqlx/query-8d247918eca10a88b784ee353db090c94a222115c543231f2140cba27bd0f067.json
+++ b/.sqlx/query-8dfce6f15a95a44f5899e2953cfc56d8d1b34fe36615edcf3fed08dd81aefa7a.json
@@ -1,6 +1,6 @@
 {
   "db_name": "SQLite",
-  "query": "\n SELECT\n p1.id,\n p1.version_id,\n p1.data as \"data: Json<PhysicalHost>\"\n FROM\n physical_hosts p1\n INNER JOIN (\n SELECT\n id,\n MAX(version_id) AS max_version\n FROM\n physical_hosts\n GROUP BY\n id\n ) p2 ON p1.id = p2.id AND p1.version_id = p2.max_version\n ",
+  "query": "\n SELECT\n p1.id,\n p1.version_id,\n p1.data as \"data: Json<PhysicalHost>\"\n FROM\n physical_hosts p1\n INNER JOIN (\n SELECT\n id,\n MAX(rowid) AS max_rowid\n FROM\n physical_hosts\n GROUP BY\n id\n ) p2 ON p1.id = p2.id AND p1.rowid = p2.max_rowid\n ",
   "describe": {
     "columns": [
       {
@@ -28,5 +28,5 @@
       false
     ]
   },
-  "hash": "8d247918eca10a88b784ee353db090c94a222115c543231f2140cba27bd0f067"
+  "hash": "8dfce6f15a95a44f5899e2953cfc56d8d1b34fe36615edcf3fed08dd81aefa7a"
 }
diff --git a/Cargo.lock b/Cargo.lock
index 86a77a4b..97904ddf 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3612,6 +3612,7 @@ dependencies = [
  "rand 0.9.2",
  "reqwest 0.11.27",
  "russh",
+ "russh-keys",
  "rust-ipmi",
  "schemars 0.8.22",
  "semver",
@@ -5392,6 +5393,22 @@ dependencies = [
  "web-time",
 ]
 
+[[package]]
+name = "okd_add_node"
+version = "0.1.0"
+dependencies = [
+ "cidr",
+ "harmony",
+ "harmony_cli",
+ "harmony_macros",
+ "harmony_secret",
+ "harmony_secret_derive",
+ "harmony_types",
+ "schemars 0.8.22",
+ "serde",
+ "tokio",
+]
+
 [[package]]
 name = "once_cell"
 version = "1.21.3"
@@ -7738,20 +7755,12 @@
 name = "sttest"
 version = "0.1.0"
 dependencies = [
- "brocade",
  "cidr",
- "env_logger",
  "harmony",
  "harmony_cli",
  "harmony_macros",
  "harmony_secret",
- "harmony_secret_derive",
- "harmony_types",
- "log",
- "schemars 0.8.22",
- "serde",
  "tokio",
- "url",
 ]
 
 [[package]]
diff --git a/examples/okd_add_node/Cargo.toml b/examples/okd_add_node/Cargo.toml
new file mode 100644
index 00000000..8b063c00
--- /dev/null
+++ b/examples/okd_add_node/Cargo.toml
@@ -0,0 +1,19 @@
+[package]
+name = "okd_add_node"
+edition = "2024"
+version.workspace = true
+readme.workspace = true
+license.workspace = true
+publish = false
+
+[dependencies]
+harmony = { path = "../../harmony" }
+harmony_cli = { path = "../../harmony_cli" }
+harmony_macros = { path = "../../harmony_macros" }
+harmony_types = { path = "../../harmony_types" }
+harmony_secret = { path = "../../harmony_secret" }
+harmony_secret_derive = { path = "../../harmony_secret_derive" }
+tokio.workspace = true
+cidr.workspace = true
+serde = { workspace = true }
+schemars = "0.8"
diff --git a/examples/okd_add_node/env.sh b/examples/okd_add_node/env.sh
new file mode 100644
index 00000000..fe84034c
--- /dev/null
+++ b/examples/okd_add_node/env.sh
@@ -0,0 +1,9 @@
+export HARMONY_SECRET_NAMESPACE=okd-add-node
+export HARMONY_SECRET_STORE=file
+export HARMONY_DATABASE_URL=sqlite://okd_add_node.sqlite
+export RUST_LOG=harmony=debug
+
+# Required: the kubeconfig for the running OKD cluster. OKDAddNodeScore reads
+# the existing nodes of the target role and pulls the user-data-managed secret
+# from `openshift-machine-api` through this.
+# export KUBECONFIG=/path/to/cluster/kubeconfig
diff --git a/examples/okd_add_node/src/main.rs b/examples/okd_add_node/src/main.rs
new file mode 100644
index 00000000..d6e194c4
--- /dev/null
+++ b/examples/okd_add_node/src/main.rs
@@ -0,0 +1,51 @@
+//! Runnable example for [`OKDAddNodeScore`].
+//!
+//! Source `env.sh` first (sets `HARMONY_DATABASE_URL` / `KUBECONFIG`), then:
+//!
+//! ```bash
+//! cargo run -p okd_add_node
+//! ```
+//!
+//! The score will:
+//! 1. read existing nodes of the target role from the live cluster,
+//! 2. log the auto-derived hostname + IP,
+//! 3. wait for the operator to PXE-boot the new machine,
+//! 4. walk the operator through disk / bond / blacklist selection,
+//! 5. publish the ignition + per-MAC iPXE + DHCP reservation on the firewall,
+//! 6. ask the operator to power-cycle.
+
+mod topology;
+
+use harmony::{
+    modules::{
+        inventory::HarmonyDiscoveryStrategy,
+        okd::add_node::{AddNodeRole, OKDAddNodeScore},
+    },
+    score::Score,
+    topology::HAClusterTopology,
+};
+use harmony_macros::cidrv4;
+
+use crate::topology::{get_inventory, get_topology};
+
+#[tokio::main]
+async fn main() {
+    harmony_cli::cli_logger::init();
+
+    let inventory = get_inventory();
+    let topology = get_topology().await;
+
+    let add_node = OKDAddNodeScore {
+        role: AddNodeRole::ControlPlane,
+        discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
+            cidr: cidrv4!("192.168.40.0/24"),
+            port: 25000,
+        },
+    };
+
+    let scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![Box::new(add_node)];
+
+    harmony_cli::run(inventory, topology, scores, None)
+        .await
+        .unwrap();
+}
diff --git a/examples/okd_add_node/src/topology.rs b/examples/okd_add_node/src/topology.rs
new file mode 100644
index 00000000..28ed6647
--- /dev/null
+++ b/examples/okd_add_node/src/topology.rs
@@ -0,0 +1,101 @@
+//! Topology for the add-node example. Mirrors `examples/sttest/src/topology.rs`
+//! shape but is deliberately standalone so this example can be copied into a
+//! fresh cluster without dragging the whole sttest crate along.
+
+use cidr::Ipv4Cidr;
+use harmony::{
+    hardware::{Location, SwitchGroup},
+    infra::{brocade::UnmanagedSwitch, opnsense::OPNSenseManagementInterface},
+    inventory::Inventory,
+    topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
+};
+use harmony_macros::{ip, ipv4};
+use harmony_secret::{Secret, SecretManager};
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+use std::{
+    net::IpAddr,
+    sync::{Arc, OnceLock},
+};
+
+#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq)]
+struct OkdAddNodeFirewallConfig {
+    username: String,
+    password: String,
+}
+
+/// Build an HAClusterTopology pointing at an *already-running* cluster.
+///
+/// `kubeconfig` is `None` because `K8sclient::k8s_client` falls back to
+/// `K8sClient::try_default()`, which picks up the `KUBECONFIG` env var or the
+/// standard `~/.kube/config` path. Edit the IPs/domain to match your setup.
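+///
+/// Call-shape sketch (this is what `main.rs` in this example already does;
+/// `scores` stands in for whatever scores you build around the topology):
+///
+/// ```ignore
+/// let inventory = get_inventory();
+/// let topology = get_topology().await; // kubeconfig resolved via KUBECONFIG
+/// harmony_cli::run(inventory, topology, scores, None).await.unwrap();
+/// ```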
+pub async fn get_topology() -> HAClusterTopology {
+    let firewall = LogicalHost {
+        ip: ip!("192.168.40.1"),
+        name: String::from("fw0"),
+    };
+
+    let switch_client = Arc::new(
+        UnmanagedSwitch::init()
+            .await
+            .expect("Failed to connect to switch"),
+    );
+
+    let config = SecretManager::get_or_prompt::<OkdAddNodeFirewallConfig>()
+        .await
+        .unwrap();
+    let api_creds = harmony::config::secret::OPNSenseApiCredentials {
+        key: config.username.clone(),
+        secret: config.password.clone(),
+    };
+    let ssh_creds = harmony::config::secret::OPNSenseFirewallCredentials {
+        username: config.username,
+        password: config.password,
+    };
+
+    let opnsense = Arc::new(
+        harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, &api_creds, &ssh_creds)
+            .await,
+    );
+    let lan_subnet = ipv4!("192.168.40.0");
+    let gateway_ipv4 = ipv4!("192.168.40.1");
+    let gateway_ip = IpAddr::V4(gateway_ipv4);
+
+    HAClusterTopology {
+        kubeconfig: None,
+        domain_name: "addnode.harmony.mcd".to_string(),
+        router: Arc::new(UnmanagedRouter::new(
+            gateway_ip,
+            Ipv4Cidr::new(lan_subnet, 24).unwrap(),
+        )),
+        load_balancer: opnsense.clone(),
+        firewall: opnsense.clone(),
+        tftp_server: opnsense.clone(),
+        http_server: opnsense.clone(),
+        dhcp_server: opnsense.clone(),
+        dns_server: opnsense.clone(),
+        // control_plane / workers / bootstrap_host are only consulted by the
+        // *bootstrap* flow. OKDAddNodeScore reads existing nodes straight
+        // from the live cluster via kube-rs, so these can stay empty here.
+        control_plane: vec![],
+        workers: vec![],
+        bootstrap_host: LogicalHost {
+            ip: ip!("192.168.40.10"),
+            name: "bootstrap".to_string(),
+        },
+        node_exporter: opnsense.clone(),
+        switch_client: switch_client.clone(),
+        network_manager: OnceLock::new(),
+    }
+}
+
+pub fn get_inventory() -> Inventory {
+    Inventory {
+        location: Location::new("Add-node example".to_string(), "Anywhere".to_string()),
+        switch: SwitchGroup::from([]),
+        firewall_mgmt: Box::new(OPNSenseManagementInterface::new()),
+        storage_host: vec![],
+        worker_host: vec![],
+        control_plane_host: vec![],
+    }
+}
diff --git a/examples/sttest/Cargo.toml b/examples/sttest/Cargo.toml
index 618039af..ade085f9 100644
--- a/examples/sttest/Cargo.toml
+++ b/examples/sttest/Cargo.toml
@@ -9,15 +9,7 @@ publish = false
 [dependencies]
 harmony = { path = "../../harmony" }
 harmony_cli = { path = "../../harmony_cli" }
-harmony_types = { path = "../../harmony_types" }
-cidr = { workspace = true }
-tokio = { workspace = true }
 harmony_macros = { path = "../../harmony_macros" }
 harmony_secret = { path = "../../harmony_secret" }
-harmony_secret_derive = { path = "../../harmony_secret_derive" }
-log = { workspace = true }
-env_logger = { workspace = true }
-url = { workspace = true }
-serde = { workspace = true }
-brocade = { path = "../../brocade" }
-schemars = "0.8"
+cidr = { workspace = true }
+tokio = { workspace = true }
diff --git a/examples/sttest/env.sh b/examples/sttest/env.sh
index 4a3c24ee..d8ced3e4 100644
--- a/examples/sttest/env.sh
+++ b/examples/sttest/env.sh
@@ -1,4 +1,19 @@
 export HARMONY_SECRET_NAMESPACE=sttest0
 export HARMONY_SECRET_STORE=file
 export HARMONY_DATABASE_URL=sqlite://harmony_sttest0.sqlite
-export RUST_LOG=info
+export RUST_LOG=harmony=debug
+
+# Two OPNsense credential pairs are required (both are Harmony Secrets and
+# will be prompted for interactively on first run; the env vars below are
+# here as a reminder for unattended runs):
+#   - OPNSenseFirewallCredentials (SSH username/password)
+#   - OPNSenseApiCredentials (API key/secret from System > Access >
+#     Users > API Keys)
+#
+# export OPNSENSE_PRIMARY_API_KEY="..."
+# export OPNSENSE_PRIMARY_API_SECRET="..."
+
+# After the install pipeline finishes, OKDAddNodeScore needs a reachable
+# cluster. Point KUBECONFIG at the kubeconfig that `openshift-install`
+# wrote during bootstrap, e.g.:
+# export KUBECONFIG=./data/okd/installation_files_sttest0/auth/kubeconfig
diff --git a/examples/sttest/src/main.rs b/examples/sttest/src/main.rs
index ba049cfa..c5f7d02a 100644
--- a/examples/sttest/src/main.rs
+++ b/examples/sttest/src/main.rs
@@ -6,22 +6,35 @@ use harmony::{
     data::{FileContent, FilePath},
     modules::{
         inventory::HarmonyDiscoveryStrategy,
-        okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
+        okd::{
+            add_node::{AddNodeRole, OKDAddNodeScore},
+            installation::OKDInstallationPipeline,
+            ipxe::OKDIpxeScore,
+        },
     },
     score::Score,
     topology::HAClusterTopology,
 };
+use harmony_macros::cidrv4;
 use harmony_secret::SecretManager;
 
 #[tokio::main]
 async fn main() {
-    // env_logger::init();
+    harmony_cli::cli_logger::init();
 
     let inventory = get_inventory();
     let topology = get_topology().await;
 
     let ssh_key = SecretManager::get_or_prompt::<SshKeyPair>().await.unwrap();
 
+    // Discovery runs as a CIDR scan across the sttest LAN on the
+    // harmony_inventory_agent's default port. Shared between the install
+    // pipeline and the trailing add-node score so they probe the same way.
+    let discovery_strategy = HarmonyDiscoveryStrategy::SUBNET {
+        cidr: cidrv4!("192.168.40.0/24"),
+        port: 25000,
+    };
+
     let mut scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![Box::new(OKDIpxeScore {
         kickstart_filename: "inventory.kickstart".to_string(),
         harmony_inventory_agent: "harmony_inventory_agent".to_string(),
@@ -31,9 +44,17 @@ async fn main() {
         },
     })];
 
-    // let mut scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![];
-    scores
-        .append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
+    scores.append(&mut OKDInstallationPipeline::get_all_scores(discovery_strategy.clone()).await);
+
+    // After the cluster is up, exercise the Day-2 add-node flow by adding a
+    // fourth control plane (cp3). This only publishes the ignition + byMAC +
+    // DHCP artifacts — etcd membership, API serving-cert rotation, and CSR
+    // approval for the new CP are still manual follow-ups per the score's
+    // success footer.
+    scores.push(Box::new(OKDAddNodeScore {
+        role: AddNodeRole::ControlPlane,
+        discovery_strategy,
+    }));
 
     harmony_cli::run(inventory, topology, scores, None)
         .await
diff --git a/examples/sttest/src/topology.rs b/examples/sttest/src/topology.rs
index bf454813..ff3eebca 100644
--- a/examples/sttest/src/topology.rs
+++ b/examples/sttest/src/topology.rs
@@ -1,57 +1,61 @@
 use cidr::Ipv4Cidr;
 use harmony::{
+    config::secret::{OPNSenseApiCredentials, OPNSenseFirewallCredentials},
     hardware::{Location, SwitchGroup},
     infra::{brocade::UnmanagedSwitch, opnsense::OPNSenseManagementInterface},
     inventory::Inventory,
     topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
 };
 use harmony_macros::{ip, ipv4};
-use harmony_secret::{Secret, SecretManager};
-use schemars::JsonSchema;
-use serde::{Deserialize, Serialize};
+use harmony_secret::SecretManager;
 use std::{
     net::IpAddr,
     sync::{Arc, OnceLock},
 };
 
-#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq)]
-struct OPNSenseFirewallConfig {
-    username: String,
-    password: String,
-}
-
-pub async fn get_topology() -> HAClusterTopology {
-    let firewall = harmony::topology::LogicalHost {
+/// Build the firewall instance on its own so other scores (e.g. an OPNsense
+/// upgrade score) can get an `Arc<OPNSenseFirewall>` without having to rebuild
+/// the whole topology. Mirrors the shape used in `affilium2/src/topology.rs`.
+pub async fn get_opnsense() -> Arc<harmony::infra::opnsense::OPNSenseFirewall> {
+    let firewall = LogicalHost {
         ip: ip!("192.168.40.1"),
         name: String::from("fw0"),
     };
 
-    let switch_client = UnmanagedSwitch::init()
-        .await
-        .expect("Failed to connect to switch");
-
-    let switch_client = Arc::new(switch_client);
-
-    let config = SecretManager::get_or_prompt::<OPNSenseFirewallConfig>()
+    let ssh_creds = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>()
+        .await
+        .unwrap();
+    let api_credentials = SecretManager::get_or_prompt::<OPNSenseApiCredentials>()
         .await
         .unwrap();
-    let api_creds = harmony::config::secret::OPNSenseApiCredentials {
-        key: config.username.clone(),
-        secret: config.password.clone(),
-    };
-    let ssh_creds = harmony::config::secret::OPNSenseFirewallCredentials {
-        username: config.username,
-        password: config.password,
-    };
 
-    let opnsense = Arc::new(
-        harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, &api_creds, &ssh_creds)
-            .await,
+    // API port is 8443 because the OPNsense web GUI is moved off 443 so
+    // HAProxy can terminate on the standard port. SSH port stays default.
+    Arc::new(
+        harmony::infra::opnsense::OPNSenseFirewall::with_api_port(
+            firewall,
+            None,
+            8443,
+            &api_credentials,
+            &ssh_creds,
+        )
+        .await,
+    )
+}
+
+pub async fn get_topology() -> HAClusterTopology {
+    let switch_client = Arc::new(
+        UnmanagedSwitch::init()
+            .await
+            .expect("Failed to connect to switch"),
     );
+
+    let opnsense = get_opnsense().await;
+
     let lan_subnet = ipv4!("192.168.40.0");
     let gateway_ipv4 = ipv4!("192.168.40.1");
     let gateway_ip = IpAddr::V4(gateway_ipv4);
 
-    harmony::topology::HAClusterTopology {
+    HAClusterTopology {
         kubeconfig: None,
         domain_name: "sttest0.harmony.mcd".to_string(),
         router: Arc::new(UnmanagedRouter::new(
diff --git a/harmony-k8s/src/csr.rs b/harmony-k8s/src/csr.rs
new file mode 100644
index 00000000..f2812b70
--- /dev/null
+++ b/harmony-k8s/src/csr.rs
@@ -0,0 +1,90 @@
+//! CSR approval helpers used by Day-2 node-add flows.
+//!
+//! Kubelets generate two CertificateSigningRequests when a node first joins
+//! a cluster:
+//!
+//! - `kubernetes.io/kube-apiserver-client-kubelet` — the bootstrap client
+//!   cert, used by the kubelet to authenticate to the API server.
+//! - `kubernetes.io/kubelet-serving` — the kubelet's own TLS serving cert.
+//!
+//! On OKD, `cluster-machine-approver` auto-approves the client CSR for
+//! workers that belong to a MachineSet; nothing auto-approves the serving
+//! CSR and nothing auto-approves anything for control-plane nodes added
+//! Day-2. These helpers are the minimum needed to let a Harmony score
+//! poll for and approve pending CSRs on behalf of the operator.
+
+use k8s_openapi::api::certificates::v1::{
+    CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
+};
+use kube::{
+    Error,
+    api::{Api, ListParams, Patch, PatchParams},
+};
+use log::debug;
+
+use crate::client::K8sClient;
+
+/// The `kubernetes.io/kubelet-serving` signer.
+pub const KUBELET_SERVING_SIGNER: &str = "kubernetes.io/kubelet-serving";
+/// The `kubernetes.io/kube-apiserver-client-kubelet` signer.
+pub const KUBE_APISERVER_CLIENT_KUBELET_SIGNER: &str =
+    "kubernetes.io/kube-apiserver-client-kubelet";
+
+impl K8sClient {
+    /// List CSRs that are still pending (no `Approved` or `Denied` condition
+    /// yet) **and** whose signer is one of the two kubelet signers. CSRs
+    /// issued to other signers (controller-manager, front-proxy, custom
+    /// signers) are never returned.
+    pub async fn list_pending_kubelet_csrs(
+        &self,
+    ) -> Result<Vec<CertificateSigningRequest>, Error> {
+        let api: Api<CertificateSigningRequest> = Api::all(self.client.clone());
+        let list = api.list(&ListParams::default()).await?;
+        let out = list
+            .items
+            .into_iter()
+            .filter(|csr| is_kubelet_signer(&csr.spec.signer_name))
+            .filter(is_pending)
+            .collect();
+        Ok(out)
+    }
+
+    /// Approve the named CSR by PATCHing its `/approval` subresource to add
+    /// an `Approved=True` condition with the given `reason` and `message`.
+    ///
+    /// No-op on an already-approved CSR: the API server accepts repeated
+    /// approvals as long as they carry the same `type` and `status`.
+    pub async fn approve_csr(&self, name: &str, reason: &str, message: &str) -> Result<(), Error> {
+        let api: Api<CertificateSigningRequest> = Api::all(self.client.clone());
+        let status = CertificateSigningRequestStatus {
+            certificate: None,
+            conditions: Some(vec![CertificateSigningRequestCondition {
+                type_: "Approved".to_string(),
+                status: "True".to_string(),
+                reason: Some(reason.to_string()),
+                message: Some(message.to_string()),
+                last_update_time: None,
+                last_transition_time: None,
+            }]),
+        };
+        let patch = Patch::Merge(serde_json::json!({ "status": status }));
+        debug!("Patching CSR {name} /approval: reason={reason}");
+        api.patch_approval(name, &PatchParams::default(), &patch)
+            .await?;
+        Ok(())
+    }
+}
+
+fn is_kubelet_signer(signer: &str) -> bool {
+    signer == KUBELET_SERVING_SIGNER || signer == KUBE_APISERVER_CLIENT_KUBELET_SIGNER
+}
+
+fn is_pending(csr: &CertificateSigningRequest) -> bool {
+    let Some(status) = csr.status.as_ref() else {
+        return true;
+    };
+    let Some(conds) = status.conditions.as_ref() else {
+        return true;
+    };
+    !conds
+        .iter()
+        .any(|c| c.type_ == "Approved" || c.type_ == "Denied")
+}
diff --git a/harmony-k8s/src/lib.rs b/harmony-k8s/src/lib.rs
index 1943540c..bc804351 100644
--- a/harmony-k8s/src/lib.rs
+++ b/harmony-k8s/src/lib.rs
@@ -2,6 +2,7 @@ pub mod apply;
 pub mod bundle;
 pub mod client;
 pub mod config;
+pub mod csr;
 pub mod discovery;
 pub mod domain;
 pub mod helper;
diff --git a/harmony/Cargo.toml b/harmony/Cargo.toml
index 7caea1ac..19aa892b 100644
--- a/harmony/Cargo.toml
+++ b/harmony/Cargo.toml
@@ -16,7 +16,8 @@ reqwest = { version = "0.11", features = [
     "json",
     "rustls-tls",
 ], default-features = false }
-russh = "0.45.0"
+russh = { workspace = true }
+russh-keys = { workspace = true }
 rust-ipmi = "0.1.1"
 semver = "1.0.23"
 serde.workspace = true
diff --git a/harmony/src/domain/inventory/repository.rs b/harmony/src/domain/inventory/repository.rs
index 5a83ad83..6c5d042f 100644
--- a/harmony/src/domain/inventory/repository.rs
+++ b/harmony/src/domain/inventory/repository.rs
@@ -53,4 +53,14 @@ pub trait InventoryRepository: Send + Sync + 'static {
     /// Return the current role mapping for a host, if any. Used at discovery
     /// time to ask the operator whether to overwrite or cancel.
     async fn get_role_mapping(&self, host_id: &Id) -> Result<Option<HostRole>, RepoError>;
+
+    /// Return the most recently saved role mapping for `role`, along with
+    /// its `PhysicalHost`. Used by Day-2 flows (e.g. OKDAddNodeScore) that
+    /// kick off discovery and then need to identify the host the operator
+    /// just picked — picking "the newest row" is robust whether the row is
+    /// brand-new or replaces an earlier mapping for the same host.
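+    ///
+    /// Call-pattern sketch (mirrors how `OKDAddNodeScore` in this change
+    /// consumes it; error handling elided):
+    ///
+    /// ```ignore
+    /// let repo = InventoryRepositoryFactory::build().await?;
+    /// if let Some((host, config)) = repo.get_latest_host_for_role(&HostRole::ControlPlane).await? {
+    ///     info!("{} with disk {:?}", host.summary(), config.installation_device);
+    /// }
+    /// ```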
+    async fn get_latest_host_for_role(
+        &self,
+        role: &HostRole,
+    ) -> Result<Option<(PhysicalHost, HostConfig)>, RepoError>;
 }
diff --git a/harmony/src/domain/score.rs b/harmony/src/domain/score.rs
index 53ac8767..f3e527e8 100644
--- a/harmony/src/domain/score.rs
+++ b/harmony/src/domain/score.rs
@@ -44,7 +44,7 @@ pub trait Score:
                 topology: topology.name().into(),
                 interpret: interpret_name.clone(),
                 score: score_name.clone(),
-                message: format!("{} running...", interpret_name),
+                message: format!("[{}] {} starting...", score_name, interpret_name),
             })
             .unwrap();
diff --git a/harmony/src/infra/inventory/sqlite.rs b/harmony/src/infra/inventory/sqlite.rs
index 0cff734e..14ea60aa 100644
--- a/harmony/src/infra/inventory/sqlite.rs
+++ b/harmony/src/infra/inventory/sqlite.rs
@@ -58,8 +58,14 @@ impl InventoryRepository for SqliteInventoryRepository {
         // discovery is naturally a polling activity (mDNS is continuous, CIDR scans get
         // re-run) and we don't want an unbounded pile of identical version rows. Real
         // changes still produce a new version row (audit trail for free).
+        //
+        // We order by SQLite's `rowid` (monotonic integer assigned on each INSERT)
+        // rather than `version_id`, because `version_id` is `Id::default()` which
+        // formats `{hex_timestamp}_{rand}` with a variable-length hex — so lex
+        // ordering is NOT time-monotonic (`'_'` (0x5F) > any digit breaks the
+        // ranking when the masked clock crosses a hex-width boundary).
         let latest = sqlx::query!(
-            r#"SELECT data as "data!: Vec<u8>" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1"#,
+            r#"SELECT data as "data!: Vec<u8>" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1"#,
             host_id
         )
         .fetch_optional(&self.pool)
@@ -86,9 +92,12 @@
     }
 
     async fn get_latest_by_id(&self, host_id: &str) -> Result<Option<PhysicalHost>, RepoError> {
+        // `rowid` (implicit INTEGER monotonically increasing on every INSERT)
+        // is the source of truth for "most recently saved" here. `version_id`
+        // can't carry that role reliably — see the note in `save()`.
         let row = sqlx::query_as!(
             DbHost,
-            r#"SELECT id, version_id, data as "data: Json<PhysicalHost>" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1"#,
+            r#"SELECT id, version_id, data as "data: Json<PhysicalHost>" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1"#,
             host_id
         )
         .fetch_optional(&self.pool)
@@ -98,6 +107,11 @@
     }
 
     async fn get_all_hosts(&self) -> Result<Vec<PhysicalHost>, RepoError> {
+        // Pick the latest row per host_id by `MAX(rowid)` rather than
+        // `MAX(version_id)` — same reason as `save()`: version_id's format
+        // isn't lexicographically time-monotonic, so a stale row can
+        // outrank a fresh one and the user ends up with an old IP in the
+        // Select.
         let db_hosts = sqlx::query_as!(
             DbHost,
             r#"
@@ -110,12 +124,12 @@
             INNER JOIN (
                 SELECT
                     id,
-                    MAX(version_id) AS max_version
+                    MAX(rowid) AS max_rowid
                 FROM
                     physical_hosts
                 GROUP BY
                     id
-            ) p2 ON p1.id = p2.id AND p1.version_id = p2.max_version
+            ) p2 ON p1.id = p2.id AND p1.rowid = p2.max_rowid
             "#
         )
         .fetch_all(&self.pool)
@@ -210,6 +224,51 @@
         }))
     }
 
+    async fn get_latest_host_for_role(
+        &self,
+        role: &HostRole,
+    ) -> Result<Option<(PhysicalHost, HostConfig)>, RepoError> {
+        struct Row {
+            host_id: String,
+            installation_device: Option<String>,
+            network_config: Option<String>,
+        }
+
+        let role_str = format!("{:?}", role);
+
+        let row = sqlx::query_as!(
+            Row,
+            "SELECT host_id, installation_device, network_config FROM host_role_mapping WHERE role = ? ORDER BY id DESC LIMIT 1",
+            role_str
+        )
+        .fetch_optional(&self.pool)
+        .await?;
+
+        let Some(row) = row else { return Ok(None) };
+
+        let physical_host = match self.get_latest_by_id(&row.host_id).await? {
+            Some(host) => host,
+            None => {
+                return Err(RepoError::QueryFailed(format!(
+                    "Found a role mapping for host_id '{}', but the host does not exist in the physical_hosts table.",
+                    row.host_id
+                )));
+            }
+        };
+
+        let network_config = match row.network_config.as_deref() {
+            Some(json) => {
+                serde_json::from_str(json).map_err(|e| RepoError::Deserialization(e.to_string()))?
+            }
+            None => NetworkConfig::default(),
+        };
+        let host_config = HostConfig {
+            installation_device: row.installation_device,
+            network_config,
+        };
+        Ok(Some((physical_host, host_config)))
+    }
+
     async fn get_hosts_for_role(
         &self,
         role: &HostRole,
diff --git a/harmony/src/infra/mod.rs b/harmony/src/infra/mod.rs
index b996afb5..b0fcee55 100644
--- a/harmony/src/infra/mod.rs
+++ b/harmony/src/infra/mod.rs
@@ -5,5 +5,7 @@ pub mod intel_amt;
 pub mod inventory;
 pub mod kube;
 pub mod network_manager;
+pub(crate) mod networkmanager_cfg;
 pub mod opnsense;
 mod sqlx;
+pub mod ssh;
diff --git a/harmony/src/infra/network_manager.rs b/harmony/src/infra/network_manager.rs
index e0361052..8e51609c 100644
--- a/harmony/src/infra/network_manager.rs
+++ b/harmony/src/infra/network_manager.rs
@@ -3,10 +3,9 @@ use std::{
     sync::Arc,
 };
 
-use askama::Template;
 use async_trait::async_trait;
 use harmony_k8s::{DrainOptions, K8sClient, NodeFile};
-use harmony_types::id::Id;
+use harmony_types::{firewall::LaggProtocol, id::Id};
 use k8s_openapi::api::core::v1::Node;
 use kube::{
     ResourceExt,
@@ -15,65 +14,11 @@
 use log::{debug, info, warn};
 
 use crate::{
+    infra::networkmanager_cfg::{render_bond_master, render_bond_slave},
     modules::okd::crd::nmstate,
     topology::{HostNetworkConfig, NetworkError, NetworkManager},
 };
 
-/// NetworkManager bond configuration template
-#[derive(Template)]
-#[template(
-    source = r#"[connection]
-id={{ bond_name }}
-uuid={{ bond_uuid }}
-type=bond
-autoconnect-slaves=1
-interface-name={{ bond_name }}
-
-[bond]
-lacp_rate=fast
-mode=802.3ad
-xmit_hash_policy=layer2
-
-[ipv4]
-method=auto
-
-[ipv6]
-addr-gen-mode=default
-method=auto
-
-[proxy]
-"#,
-    ext = "txt"
-)]
-struct BondConfigTemplate {
-    bond_name: String,
-    bond_uuid: String,
-}
-
-/// NetworkManager bond slave configuration template
-#[derive(Template)]
-#[template(
-    source = r#"[connection]
-id={{ slave_id }}
-uuid={{ slave_uuid }}
-type=ethernet
-interface-name={{ interface_name }}
-master={{ bond_name }}
-slave-type=bond
-
-[ethernet]
-
-[bond-port]
-"#,
-    ext = "txt"
-)]
-struct BondSlaveConfigTemplate {
-    slave_id: String,
-    slave_uuid:
String, - interface_name: String, - bond_name: String, -} - /// TODO document properly the non-intuitive behavior or "roll forward only" of nmstate in general /// It is documented in nmstate official doc, but worth mentionning here : /// @@ -473,19 +418,16 @@ impl OpenShiftNmStateNetworkManager { let mut files = Vec::new(); let bond_name = "bond0"; let bond_uuid = uuid::Uuid::new_v4().to_string(); - - // Generate bond master configuration - let bond_template = BondConfigTemplate { - bond_name: bond_name.to_string(), - bond_uuid: bond_uuid.clone(), - }; - - let bond_content = bond_template.render().map_err(|e| { - NetworkError::new(format!( - "Failed to render bond configuration template: {}", - e - )) - })?; + // Post-install bonding here has always been LACP; the primary-iface + // argument is only consumed by active-backup mode, so an empty string + // is fine for this call site. + let primary_iface = config + .switch_ports + .first() + .map(|p| p.interface.name.as_str()) + .unwrap_or(""); + let bond_content = + render_bond_master(bond_name, &bond_uuid, &LaggProtocol::Lacp, primary_iface); files.push(NodeFile { path: format!( @@ -496,25 +438,12 @@ impl OpenShiftNmStateNetworkManager { mode: 0o600, }); - // Generate slave configurations for each interface for switch_port in &config.switch_ports { let interface_name = &switch_port.interface.name; let slave_id = format!("{}-{}", bond_name, interface_name); let slave_uuid = uuid::Uuid::new_v4().to_string(); - - let slave_template = BondSlaveConfigTemplate { - slave_id: slave_id.clone(), - slave_uuid, - interface_name: interface_name.clone(), - bond_name: bond_name.to_string(), - }; - - let slave_content = slave_template.render().map_err(|e| { - NetworkError::new(format!( - "Failed to render slave configuration template for interface '{}': {}", - interface_name, e - )) - })?; + let slave_content = + render_bond_slave(&slave_id, &slave_uuid, interface_name, bond_name); files.push(NodeFile { path: format!( diff --git a/harmony/src/infra/networkmanager_cfg.rs b/harmony/src/infra/networkmanager_cfg.rs new file mode 100644 index 00000000..1ebb361e --- /dev/null +++ b/harmony/src/infra/networkmanager_cfg.rs @@ -0,0 +1,166 @@ +//! NetworkManager `.nmconnection` content builders. +//! +//! Two call sites use these today: the post-install bond flow in +//! `OpenShiftNmStateNetworkManager` (always LACP, hardcoded bond name +//! `bond0`), and the Day-2 add-node flow which injects the files into an +//! ignition config derived from the operator's discovery-time choices. Keeping +//! them here means both paths produce bit-identical output for the same input. + +use harmony_types::firewall::LaggProtocol; + +/// NetworkManager bond `mode=` value for a `LaggProtocol`. +pub(crate) fn nm_bond_mode(proto: &LaggProtocol) -> &'static str { + match proto { + LaggProtocol::Lacp => "802.3ad", + // active-backup is the safe default for both "failover" intent and + // "no protocol selected" — nmcli refuses `mode=none` on a bond. + LaggProtocol::Failover | LaggProtocol::None => "active-backup", + LaggProtocol::LoadBalance => "balance-xor", + LaggProtocol::RoundRobin => "balance-rr", + } +} + +/// Render the master `.nmconnection` file for a bond. +/// +/// `primary_iface` only matters in active-backup mode (`LaggProtocol::Failover` +/// or `None`). Other modes ignore it; pass any bond member. 
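+///
+/// Sketch of an active-backup call (values illustrative; the unit tests at
+/// the bottom of this file assert the same shape):
+///
+/// ```ignore
+/// let uuid = uuid::Uuid::new_v4().to_string();
+/// let cfg = render_bond_master("bond0", &uuid, &LaggProtocol::Failover, "enp1s0f0");
+/// assert!(cfg.contains("mode=active-backup") && cfg.contains("primary=enp1s0f0"));
+/// ```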
+pub(crate) fn render_bond_master( + bond_name: &str, + bond_uuid: &str, + proto: &LaggProtocol, + primary_iface: &str, +) -> String { + let bond_section = match proto { + // Preserves the exact LACP block shipped before this module existed; + // changing the attribute order would produce a textually different + // ignition even though the semantics are identical. + LaggProtocol::Lacp => "lacp_rate=fast\nmode=802.3ad\nxmit_hash_policy=layer2\n".to_string(), + LaggProtocol::Failover | LaggProtocol::None => { + format!("mode=active-backup\nmiimon=100\nprimary={primary_iface}\n") + } + LaggProtocol::LoadBalance => { + "mode=balance-xor\nmiimon=100\nxmit_hash_policy=layer2\n".to_string() + } + LaggProtocol::RoundRobin => "mode=balance-rr\nmiimon=100\n".to_string(), + }; + + format!( + "[connection]\n\ + id={bond_name}\n\ + uuid={bond_uuid}\n\ + type=bond\n\ + autoconnect-slaves=1\n\ + interface-name={bond_name}\n\ + \n\ + [bond]\n\ + {bond_section}\n\ + [ipv4]\n\ + method=auto\n\ + \n\ + [ipv6]\n\ + addr-gen-mode=default\n\ + method=auto\n\ + \n\ + [proxy]\n" + ) +} + +/// Render a `.nmconnection` file for a bond slave (one per bonded interface). +pub(crate) fn render_bond_slave( + slave_id: &str, + slave_uuid: &str, + interface_name: &str, + bond_name: &str, +) -> String { + format!( + "[connection]\n\ + id={slave_id}\n\ + uuid={slave_uuid}\n\ + type=ethernet\n\ + interface-name={interface_name}\n\ + master={bond_name}\n\ + slave-type=bond\n\ + \n\ + [ethernet]\n\ + \n\ + [bond-port]\n" + ) +} + +/// Render a `.nmconnection` file that blacklists an interface — it exists on +/// the system but never auto-connects and carries no v4/v6 address. +pub(crate) fn render_disabled_interface(interface_name: &str) -> String { + format!( + "[connection]\n\ + id={interface_name}-disabled\n\ + type=ethernet\n\ + interface-name={interface_name}\n\ + autoconnect=false\n\ + \n\ + [ipv4]\n\ + method=disabled\n\ + \n\ + [ipv6]\n\ + method=disabled\n" + ) +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn nm_bond_mode_maps_every_variant() { + assert_eq!(nm_bond_mode(&LaggProtocol::Lacp), "802.3ad"); + assert_eq!(nm_bond_mode(&LaggProtocol::Failover), "active-backup"); + assert_eq!(nm_bond_mode(&LaggProtocol::None), "active-backup"); + assert_eq!(nm_bond_mode(&LaggProtocol::LoadBalance), "balance-xor"); + assert_eq!(nm_bond_mode(&LaggProtocol::RoundRobin), "balance-rr"); + } + + #[test] + fn bond_master_lacp_matches_legacy_output() { + let out = render_bond_master("bond0", "uuid-1", &LaggProtocol::Lacp, "enp1s0f0"); + let expected = "\ +[connection] +id=bond0 +uuid=uuid-1 +type=bond +autoconnect-slaves=1 +interface-name=bond0 + +[bond] +lacp_rate=fast +mode=802.3ad +xmit_hash_policy=layer2 + +[ipv4] +method=auto + +[ipv6] +addr-gen-mode=default +method=auto + +[proxy] +"; + assert_eq!(out, expected); + } + + #[test] + fn bond_master_active_backup_uses_primary() { + let out = render_bond_master("bond0", "u", &LaggProtocol::Failover, "enp1s0f0"); + assert!(out.contains("mode=active-backup")); + assert!(out.contains("primary=enp1s0f0")); + assert!(out.contains("miimon=100")); + } + + #[test] + fn disabled_interface_has_both_stacks_disabled() { + let out = render_disabled_interface("eno1"); + assert!(out.contains("interface-name=eno1")); + assert!(out.contains("id=eno1-disabled")); + assert!(out.contains("autoconnect=false")); + assert!(out.contains("[ipv4]\nmethod=disabled")); + assert!(out.contains("[ipv6]\nmethod=disabled")); + } +} diff --git a/harmony/src/infra/opnsense/http.rs 
b/harmony/src/infra/opnsense/http.rs index b6e6c0f6..bd2fb6da 100644 --- a/harmony/src/infra/opnsense/http.rs +++ b/harmony/src/infra/opnsense/http.rs @@ -122,16 +122,30 @@ impl HttpServer for OPNSenseFirewall { } async fn ensure_initialized(&self) -> Result<(), ExecutorError> { - if !self.opnsense_config.caddy().is_installed().await { - info!("Http config not available, installing os-caddy package"); - self.opnsense_config - .install_package("os-caddy") - .await - .map_err(|e| { - ExecutorError::UnexpectedError(format!("Failed to install os-caddy: {e:?}")) - })?; - } else { - info!("Http config available, assuming Caddy is already installed"); + // is_installed() now returns Result: Ok(true) on 2xx, + // Ok(false) specifically on 404, and Err(_) when we couldn't tell + // (timeout, TLS, auth, 5xx). The Err branch must **not** trigger + // an install — that's how Day-2 runs ended up trying to reinstall + // caddy after a transient API hiccup. + match self.opnsense_config.caddy().is_installed().await { + Ok(true) => { + info!("Http config available, assuming Caddy is already installed"); + } + Ok(false) => { + info!("Http config not available (404), installing os-caddy package"); + self.opnsense_config + .install_package("os-caddy") + .await + .map_err(|e| { + ExecutorError::UnexpectedError(format!("Failed to install os-caddy: {e:?}")) + })?; + } + Err(e) => { + return Err(ExecutorError::UnexpectedError(format!( + "Could not verify whether os-caddy is installed on OPNsense \ + (is the API reachable at the configured port?): {e}" + ))); + } } info!("Adding custom caddy config files"); diff --git a/harmony/src/infra/ssh.rs b/harmony/src/infra/ssh.rs new file mode 100644 index 00000000..d8a03f7f --- /dev/null +++ b/harmony/src/infra/ssh.rs @@ -0,0 +1,159 @@ +//! Run a single command over SSH. +//! +//! Minimal russh wrapper used by Day-2 flows that need to poke a remote +//! machine — most notably [`OKDAddNodeScore`] rebooting a freshly-discovered +//! host so it picks up its new iPXE config. +//! +//! Public surface: the [`run_command`] free function and the [`SshError`] +//! it returns. No trait, no state carried between calls — each invocation +//! opens a fresh connection and closes it. Credentials come in via +//! `harmony_types::ssh::SshCredentials`. +//! +//! [`OKDAddNodeScore`]: crate::modules::okd::add_node::OKDAddNodeScore + +use std::{net::IpAddr, sync::Arc, time::Duration}; + +use async_trait::async_trait; +use harmony_types::ssh::SshCredentials; +use log::{debug, trace}; +use russh::{ + Channel, ChannelMsg, + client::{self, Config, Handler, Msg}, + keys::key, +}; +use russh_keys::decode_secret_key; +use thiserror::Error; + +/// How long to wait for the initial TCP+SSH handshake before giving up. +/// The kernel's default SYN retry cascade can burn >2 minutes on a dead +/// host; for Day-2 flows across a LAN a much tighter budget is fine and +/// lets the caller react (e.g. ask the operator to power-cycle) faster. +const SSH_CONNECT_TIMEOUT: Duration = Duration::from_secs(15); + +/// Errors produced by [`run_command`]. 
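+///
+/// Handling sketch for the reboot case from the module docs (the match arms
+/// are illustrative, not exhaustive):
+///
+/// ```ignore
+/// match run_command(host, 22, &creds, "systemctl --no-block reboot").await {
+///     Err(SshError::ConnectTimeout { .. }) => { /* host is down; ask the operator */ }
+///     // A reboot can sever the session before an exit status arrives,
+///     // so any other outcome is tolerated here.
+///     _ => {}
+/// }
+/// ```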
+#[derive(Error, Debug)]
+pub enum SshError {
+    #[error("ssh transport error: {0}")]
+    Transport(#[from] russh::Error),
+    #[error("ssh connect to {host}:{port} timed out after {}s", timeout.as_secs())]
+    ConnectTimeout {
+        host: IpAddr,
+        port: u16,
+        timeout: Duration,
+    },
+    #[error("failed to parse private key: {0}")]
+    KeyParse(String),
+    #[error("authentication rejected for user '{0}'")]
+    AuthenticationRejected(String),
+    #[error("command exited with status {status}, output:\n{output}")]
+    NonZeroExit { status: u32, output: String },
+    #[error("unexpected SSH channel message: {0}")]
+    UnexpectedMsg(String),
+    #[error("command output was not valid UTF-8: {0}")]
+    InvalidUtf8(#[from] std::string::FromUtf8Error),
+}
+
+/// Open an SSH session to `host:port`, authenticate with `creds`, exec
+/// `command`, drain its stdout+stderr, and return the combined output.
+///
+/// The caller should tolerate errors for commands that sever the SSH
+/// session before sending an exit status — e.g. `systemctl reboot`. Wrap
+/// those in a `let _ = run_command(...).await;` (or log+ignore).
+pub async fn run_command(
+    host: IpAddr,
+    port: u16,
+    creds: &SshCredentials,
+    command: &str,
+) -> Result<String, SshError> {
+    let config = Arc::new(Config::default());
+    debug!(
+        "SSH connect {host}:{port} (timeout {:?})",
+        SSH_CONNECT_TIMEOUT
+    );
+    let mut session = match tokio::time::timeout(
+        SSH_CONNECT_TIMEOUT,
+        client::connect(config, (host, port), ClientHandler),
+    )
+    .await
+    {
+        Ok(res) => res?,
+        Err(_) => {
+            return Err(SshError::ConnectTimeout {
+                host,
+                port,
+                timeout: SSH_CONNECT_TIMEOUT,
+            });
+        }
+    };
+
+    match creds {
+        SshCredentials::SshKey {
+            username,
+            private_pem,
+            passphrase,
+        } => {
+            let key = decode_secret_key(private_pem, passphrase.as_deref())
+                .map_err(|e| SshError::KeyParse(e.to_string()))?;
+            let authed = session
+                .authenticate_publickey(username, Arc::new(key))
+                .await?;
+            if !authed {
+                return Err(SshError::AuthenticationRejected(username.clone()));
+            }
+        }
+        SshCredentials::Password { username, password } => {
+            let authed = session.authenticate_password(username, password).await?;
+            if !authed {
+                return Err(SshError::AuthenticationRejected(username.clone()));
+            }
+        }
+    }
+
+    let mut channel = session.channel_open_session().await?;
+    debug!("SSH exec: {command}");
+    channel.exec(true, command).await?;
+    drain_channel(&mut channel).await
+}
+
+struct ClientHandler;
+
+#[async_trait]
+impl Handler for ClientHandler {
+    type Error = russh::Error;
+
+    async fn check_server_key(
+        &mut self,
+        _server_public_key: &key::PublicKey,
+    ) -> Result<bool, Self::Error> {
+        // We don't pin host keys today. Day-2 flows run against hosts that
+        // just PXE'd, so there's no stable fingerprint to check against
+        // anyway — the integrity guarantee comes from the authorized_keys
+        // file Harmony itself seeded into the image.
+        Ok(true)
+    }
+}
+
+async fn drain_channel(channel: &mut Channel<Msg>) -> Result<String, SshError> {
+    let mut buf = Vec::new();
+    while let Some(msg) = channel.wait().await {
+        match msg {
+            ChannelMsg::Data { ref data } | ChannelMsg::ExtendedData { ref data, .. } => {
+                buf.extend_from_slice(data);
+            }
+            ChannelMsg::ExitStatus { exit_status } => {
+                if exit_status != 0 {
+                    let output = String::from_utf8_lossy(&buf).into_owned();
+                    return Err(SshError::NonZeroExit {
+                        status: exit_status,
+                        output,
+                    });
+                }
+            }
+            ChannelMsg::Success | ChannelMsg::WindowAdjusted { ..
} | ChannelMsg::Eof => {} + other => return Err(SshError::UnexpectedMsg(format!("{other:?}"))), + } + } + let output = String::from_utf8(buf)?; + trace!("SSH command output: {output}"); + Ok(output) +} diff --git a/harmony/src/modules/inventory/mod.rs b/harmony/src/modules/inventory/mod.rs index acfa7aca..788f7e90 100644 --- a/harmony/src/modules/inventory/mod.rs +++ b/harmony/src/modules/inventory/mod.rs @@ -309,7 +309,7 @@ impl DiscoverInventoryAgentInterpret { } } Ok(Err(e)) => { - log::info!("Error querying inventory agent on {addr}:{port} : {e}"); + log::debug!("Error querying inventory agent on {addr}:{port} : {e}"); } Err(_) => { // Timeout for this host diff --git a/harmony/src/modules/okd/add_node.rs b/harmony/src/modules/okd/add_node.rs new file mode 100644 index 00000000..a8c07345 --- /dev/null +++ b/harmony/src/modules/okd/add_node.rs @@ -0,0 +1,1210 @@ +//! Day-2 score that adds a single node (control-plane or worker) to an +//! already-running OKD cluster. +//! +//! Unlike the bootstrap flow (which runs `openshift-install` locally to +//! generate ignition), this score pulls the cluster's *live* user-data +//! secret, injects NetworkManager `.nmconnection` files derived from the +//! operator's discovery-time bond/blacklist choices, and publishes the +//! resulting stub ignition on the firewall HTTP server with a per-host +//! filename. Hostname and IP are fully auto-derived from the existing cluster +//! state — no operator prompt, no score parameter. +//! +//! Scope: publishes artifacts (ignition + iPXE + DHCP), SSH-reboots the +//! node, then polls for the kubelet's bootstrap CSRs and approves them +//! until the Node is Ready. Etcd membership is reconciled automatically +//! by `cluster-etcd-operator`; API serving-cert rotation for new +//! control-plane members is still manual. + +use std::{ + net::{IpAddr, Ipv4Addr}, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use base64::{Engine, engine::general_purpose::STANDARD as BASE64_STANDARD}; +use derive_new::new; +use harmony_k8s::{ + K8sClient, + csr::{KUBE_APISERVER_CLIENT_KUBELET_SIGNER, KUBELET_SERVING_SIGNER}, +}; +use harmony_secret::SecretManager; +use harmony_types::{id::Id, ssh::SshCredentials}; +use k8s_openapi::api::core::v1::{Node, Secret}; +use kube::api::ListParams; +use log::{debug, info, warn}; +use serde::Serialize; +use serde_json::{Value, json}; + +use crate::{ + config::secret::SshKeyPair, + data::{FileContent, FilePath, Version}, + infra::{ + inventory::InventoryRepositoryFactory, + networkmanager_cfg::{render_bond_master, render_bond_slave, render_disabled_interface}, + ssh::run_command, + }, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::{HostRole, Inventory}, + modules::{ + dhcp::DhcpHostBindingScore, + http::{IPxeMacBootFileScore, StaticFilesHttpScore}, + inventory::{DiscoverHostForRoleScore, HarmonyDiscoveryStrategy}, + okd::templates::BootstrapIpxeTpl, + }, + score::Score, + topology::{HAClusterTopology, HostBinding, K8sclient, LogicalHost, NetworkConfig}, +}; + +/// Roles this score can add. Deliberately narrower than `HostRole` — +/// `Bootstrap` makes no sense as a Day-2 add. 
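+///
+/// Wiring sketch, lifted from `examples/okd_add_node/src/main.rs` in this
+/// change (subnet and port are that example's values):
+///
+/// ```ignore
+/// let score = OKDAddNodeScore {
+///     role: AddNodeRole::ControlPlane,
+///     discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
+///         cidr: cidrv4!("192.168.40.0/24"),
+///         port: 25000,
+///     },
+/// };
+/// ```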
+#[derive(Debug, Clone, Copy, Serialize)]
+pub enum AddNodeRole {
+    ControlPlane,
+    Worker,
+}
+
+impl AddNodeRole {
+    fn as_host_role(self) -> HostRole {
+        match self {
+            Self::ControlPlane => HostRole::ControlPlane,
+            Self::Worker => HostRole::Worker,
+        }
+    }
+
+    /// Filename prefix for ignition artifacts, matches the naming the
+    /// bootstrap flow already uses (`master.ign` / `worker.ign`).
+    fn ignition_file_prefix(self) -> &'static str {
+        match self {
+            Self::ControlPlane => "master",
+            Self::Worker => "worker",
+        }
+    }
+
+    /// Secret holding the role's user-data ignition in the running cluster.
+    fn user_data_secret_name(self) -> &'static str {
+        match self {
+            Self::ControlPlane => "master-user-data-managed",
+            Self::Worker => "worker-user-data-managed",
+        }
+    }
+
+    /// Kubernetes node-role label key (value is empty in OKD).
+    fn node_role_label(self) -> &'static str {
+        match self {
+            Self::ControlPlane => "node-role.kubernetes.io/master",
+            Self::Worker => "node-role.kubernetes.io/worker",
+        }
+    }
+}
+
+#[derive(Debug, Clone, Serialize, new)]
+pub struct OKDAddNodeScore {
+    pub role: AddNodeRole,
+    pub discovery_strategy: HarmonyDiscoveryStrategy,
+}
+
+impl Score<HAClusterTopology> for OKDAddNodeScore {
+    fn name(&self) -> String {
+        format!("OKDAddNodeScore({:?})", self.role)
+    }
+
+    fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
+        Box::new(OKDAddNodeInterpret {
+            score: self.clone(),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct OKDAddNodeInterpret {
+    score: OKDAddNodeScore,
+}
+
+#[async_trait]
+impl Interpret<HAClusterTopology> for OKDAddNodeInterpret {
+    async fn execute(
+        &self,
+        inventory: &Inventory,
+        topology: &HAClusterTopology,
+    ) -> Result<Outcome, InterpretError> {
+        let role = self.score.role;
+
+        // 0. Validate firewall PXE artifacts before anything else. If the
+        //    kernel/initramfs/rootfs don't match the cluster's current
+        //    CoreOS version, the new node would PXE into the wrong stream
+        //    and fail to join. OKDOsArtifactsScore is a standalone score
+        //    too but we run it here unconditionally so the operator never
+        //    has to remember to wire it in.
+        crate::modules::okd::os_artifacts::OKDOsArtifactsScore
+            .interpret(inventory, topology)
+            .await?;
+
+        // 1-3. Derive the new node's identity from the live cluster.
+        let k8s = topology
+            .k8s_client()
+            .await
+            .map_err(|e| InterpretError::new(format!("Could not build k8s client: {e}")))?;
+        let existing = list_existing_role_nodes(&k8s, role).await?;
+        if existing.is_empty() {
+            return Err(InterpretError::new(format!(
+                "OKDAddNode requires at least one existing {:?} node in the cluster to infer \
+                 the hostname and IP pattern; found zero. Add the first node manually (update \
+                 the static topology and run the bootstrap flow) and re-run this score.",
+                role
+            )));
+        }
+
+        let existing_names: Vec<String> = existing.iter().map(|n| n.hostname.clone()).collect();
+        let new_hostname = next_hostname(role, &existing_names).ok_or_else(|| {
+            InterpretError::new(format!(
+                "Could not infer a hostname pattern from existing nodes: {existing_names:?}. \
+                 Expected names like `cp0`, `cp0-harmony`, `worker3`."
+            ))
+        })?;
+
+        let new_ip = next_ipv4(&existing, &new_hostname).map_err(InterpretError::new)?;
+
+        info!(
+            "[AddNode/{role:?}] Existing cluster nodes for this role:{}",
+            summarize_existing(&existing)
+        );
+        info!("[AddNode/{role:?}] New node: hostname={new_hostname}, ip={new_ip}");
+
+        // 4. PXE-boot reminder.
+        let confirmed = inquire::Confirm::new(&format!(
+            "PXE-boot the new {:?} node now. 
Press enter once its inventory agent is \ + reachable (default port 25000) on the discovery subnet.", + role, + )) + .with_default(true) + .prompt() + .map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?; + if !confirmed { + return Err(InterpretError::new("User aborted before discovery".into())); + } + + // 5. Discovery — captures installation disk, bond, blacklist. + let host_role = role.as_host_role(); + let repo = InventoryRepositoryFactory::build().await?; + + DiscoverHostForRoleScore { + role: host_role.clone(), + number_desired_hosts: 1, + discovery_strategy: self.score.discovery_strategy.clone(), + } + .interpret(inventory, topology) + .await?; + + // Pick the most recently saved mapping for this role. That's the + // one DiscoverHostForRoleScore just wrote — whether it's a brand-new + // host or an operator re-picking the same host they chose in a prior + // run (the discovery flow DELETEs+INSERTs on overwrite, so the + // re-picked row has the highest auto-increment id). + let (physical_host, host_config) = repo + .get_latest_host_for_role(&host_role) + .await? + .ok_or_else(|| { + InterpretError::new(format!( + "Discovery completed but no {host_role:?} mapping was saved" + )) + })?; + info!( + "[AddNode/{role:?}] Discovered host {} with disk {:?}", + physical_host.summary(), + host_config.installation_device, + ); + + // 6. Extract the cluster's live ignition. + let mut ignition = fetch_user_data_ignition(&k8s, role.user_data_secret_name()).await?; + + // 7. Inject the operator's bond/blacklist .nmconnection files into + // the stub ignition. Hostname is set by the DHCP reservation we + // create in step 10 — NetworkManager on RHCOS honors the DHCP + // hostname option, same as the initial bootstrap flow does for + // cp0/cp1/cp2. + inject_network_files(&mut ignition, &host_config.network_config)?; + + // 8. Push the merged ignition to the firewall HTTP root. + let ignition_filename = format!("{}-{}.ign", role.ignition_file_prefix(), new_hostname); + let ignition_relpath = format!("okd_ignition_files/{ignition_filename}"); + let ignition_content = serde_json::to_string(&ignition) + .map_err(|e| InterpretError::new(format!("Could not serialize ignition: {e}")))?; + + StaticFilesHttpScore { + folder_to_serve: None, + remote_path: None, + files: vec![FileContent { + path: FilePath::Relative(ignition_relpath.clone()), + content: ignition_content, + }], + } + .interpret(inventory, topology) + .await?; + + // 9. Per-MAC iPXE. + let installation_device = host_config.installation_device.as_deref().ok_or_else(|| { + InterpretError::new(format!( + "Discovery did not record an installation disk for host {}", + physical_host.summary(), + )) + })?; + let ipxe_content = BootstrapIpxeTpl { + http_ip: &topology.http_server.get_ip().to_string(), + scos_path: "scos", + ignition_http_path: "okd_ignition_files", + installation_device, + ignition_file_name: &ignition_filename, + } + .to_string(); + IPxeMacBootFileScore { + mac_address: physical_host.get_mac_address(), + content: ipxe_content, + } + .interpret(inventory, topology) + .await?; + + // 10. DHCP reservation. + let logical_host = LogicalHost { + ip: IpAddr::V4(new_ip), + name: new_hostname.clone(), + }; + let binding = HostBinding { + logical_host, + physical_host: physical_host.clone(), + host_config: host_config.clone(), + }; + DhcpHostBindingScore { + host_binding: vec![binding], + domain: Some(topology.domain_name.clone()), + } + .interpret(inventory, topology) + .await?; + + // 10.5. 
Refresh OPNsense HAProxy backends up-front, alongside the
+        //       DHCP reservation. HAProxy marks cp3 DOWN until its kubelet
+        //       starts answering /readyz — the other CPs keep serving — so
+        //       registering early is safe and has the nice property that
+        //       firewall-side prep fails *before* we reboot the node.
+        //
+        //       Source of truth: live cluster + the new (hostname, IP) we
+        //       just DHCP-reserved. The static topology.control_plane vec
+        //       doesn't grow past bootstrap — reading from the cluster
+        //       means subsequent add-node runs still see the nodes we
+        //       added in earlier ones.
+        let mut same_role_hosts = existing_to_logical_hosts(&existing);
+        same_role_hosts.push(LogicalHost {
+            ip: IpAddr::V4(new_ip),
+            name: new_hostname.clone(),
+        });
+        let other_role = match role {
+            AddNodeRole::ControlPlane => AddNodeRole::Worker,
+            AddNodeRole::Worker => AddNodeRole::ControlPlane,
+        };
+        let other_role_hosts = cluster_logical_hosts(&k8s, other_role).await?;
+        let (cps, workers) = match role {
+            AddNodeRole::ControlPlane => (same_role_hosts, other_role_hosts),
+            AddNodeRole::Worker => (other_role_hosts, same_role_hosts),
+        };
+        let (public_services, private_services) =
+            crate::modules::okd::load_balancer::OKDLoadBalancerScore::services_for_nodes(
+                &cps, &workers,
+            );
+        crate::modules::load_balancer::LoadBalancerScore {
+            public_services,
+            private_services,
+        }
+        .interpret(inventory, topology)
+        .await?;
+        info!(
+            "[AddNode/{role:?}] Refreshed HAProxy: {} CP backend(s), {} worker backend(s) \
+             (new node starts DOWN until kubelet answers health checks)",
+            cps.len(),
+            workers.len()
+        );
+        let haproxy_summary = format!(
+            "HAProxy: {} CP + {} worker backend(s) refreshed",
+            cps.len(),
+            workers.len()
+        );
+
+        // 11. Reboot the discovery-booted node via SSH so it picks up its
+        //     byMAC iPXE and renders the new ignition. The cluster's
+        //     SshKeyPair is the one sttest/main.rs already baked into the
+        //     discovery image's /root/.ssh/authorized_keys (via
+        //     inventory.kickstart.j2), so its private half is the right
+        //     credential here.
+        let discovery_ip = first_reachable_ipv4(&physical_host).ok_or_else(|| {
+            InterpretError::new(format!(
+                "Could not find a reachable IPv4 on host {} to SSH-reboot it",
+                physical_host.summary()
+            ))
+        })?;
+
+        let rebooted = inquire::Confirm::new(&format!(
+            "Ignition {ignition_relpath} + byMAC + DHCP are published. SSH-reboot {new_hostname} \
+             at {discovery_ip} now? (Decline to power-cycle manually.)"
+        ))
+        .with_default(true)
+        .prompt()
+        .map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
+
+        if rebooted {
+            let ssh_key = SecretManager::get_or_prompt::<SshKeyPair>().await?;
+            let creds = SshCredentials::SshKey {
+                username: "root".to_string(),
+                private_pem: ssh_key.private,
+                passphrase: None,
+            };
+            // systemd detaches the reboot on its own with --no-block and
+            // returns exit 0 immediately, so SSH sees a clean close and we
+            // don't have to dance with nohup/&/redirects. An earlier
+            // attempt with `nohup bash -c 'sleep 2 && systemctl reboot' &`
+            // silently did nothing in the field: sshd closed the session
+            // before the backgrounded process actually ran.
+            let cmd = "systemctl --no-block reboot";
+            info!("[AddNode/{role:?}] Triggering reboot on {discovery_ip} via SSH");
+            if let Err(e) = run_command(IpAddr::V4(discovery_ip), 22, &creds, cmd).await {
+                warn!("[AddNode/{role:?}] SSH reboot failed: {e}");
+                let manual = inquire::Confirm::new(&format!(
+                    "SSH reboot of {new_hostname} at {discovery_ip} failed ({e}). \
+                     Power-cycle the node manually and press enter to continue, or decline to abort."
+                ))
+                .with_default(true)
+                .prompt()
+                .map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
+                if !manual {
+                    return Err(InterpretError::new(format!(
+                        "Operator aborted after SSH reboot to {discovery_ip} failed"
+                    )));
+                }
+            } else {
+                info!("[AddNode/{role:?}] Reboot dispatched");
+            }
+        } else {
+            warn!(
+                "[AddNode/{role:?}] Operator declined SSH reboot. Manually power-cycle the \
+                 node at {discovery_ip} so it picks up its new PXE config."
+            );
+        }
+
+        // 12. Approve CSRs as they appear and wait for the node to be Ready.
+        let approved_csrs =
+            approve_csrs_until_ready(&k8s, &new_hostname, role, APPROVE_CSRS_TIMEOUT).await?;
+
+        let macs: Vec<String> = physical_host
+            .get_mac_address()
+            .iter()
+            .map(|m| m.to_string())
+            .collect();
+        let bond_desc = match &host_config.network_config.bond {
+            Some(b) => format!("{} on [{}]", b.mode, b.interfaces.join(", ")),
+            None => "none".to_string(),
+        };
+        let blacklist = host_config.network_config.blacklisted_interfaces.join(", ");
+
+        let mut details = vec![
+            format!("Hostname: {new_hostname}"),
+            format!("IP: {new_ip}"),
+            format!("MACs: [{}]", macs.join(", ")),
+            format!("Install disk: {installation_device}"),
+            format!("Bond: {bond_desc}"),
+            format!(
+                "Blacklist: {}",
+                if blacklist.is_empty() {
+                    "(none)"
+                } else {
+                    &blacklist
+                }
+            ),
+            format!("Ignition: /usr/local/http/{ignition_relpath}"),
+            format!(
+                "Approved {} CSR(s) while waiting for Ready: {}",
+                approved_csrs.len(),
+                if approved_csrs.is_empty() {
+                    "(none needed)".to_string()
+                } else {
+                    approved_csrs.join(", ")
+                }
+            ),
+            haproxy_summary,
+        ];
+        if matches!(role, AddNodeRole::ControlPlane) {
+            details.push(String::new());
+            details.push(
+                "NOTE: this score published ignition + CSR-approved the node. API serving \
+                 cert rotation is still manual for control-plane adds; etcd membership is \
+                 reconciled by cluster-etcd-operator automatically."
+                    .into(),
+            );
+        }
+
+        Ok(Outcome::success_with_details(
+            format!("Published add-node artifacts for {new_hostname}"),
+            details,
+        ))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        InterpretName::Custom("OKDAddNode")
+    }
+    fn get_version(&self) -> Version {
+        Version::from("1.0.0").unwrap()
+    }
+    fn get_status(&self) -> InterpretStatus {
+        InterpretStatus::QUEUED
+    }
+    fn get_children(&self) -> Vec<Id> {
+        vec![]
+    }
+}
+
+/// How long we'll wait for the newly-added node to become Ready, approving
+/// kubelet CSRs in a tight loop. Baremetal PXE + ignition + first boot is
+/// slow; 15 min is deliberately generous. Bump if it bites.
+const APPROVE_CSRS_TIMEOUT: Duration = Duration::from_secs(15 * 60);
+
+/// Poll interval between CSR-list calls. 30 s is comfortable for a Day-2
+/// add: the CSR + Ready transition takes minutes, so polling faster just
+/// hammers the API server and crowds the logs.
+const APPROVE_CSRS_POLL_INTERVAL: Duration = Duration::from_secs(30);
+
+/// Poll the cluster: approve any pending kubelet CSRs that concern this node
+/// and return as soon as the Node is Ready. Returns the list of CSR names
+/// we approved (for logging / outcome details).
+///
+/// CSR matching:
+/// - `kubernetes.io/kubelet-serving`: `spec.username == "system:node:<hostname>"`.
+///   That field is signed by the kubelet itself so it's safe to trust.
+/// - `kubernetes.io/kube-apiserver-client-kubelet`: every pending one is
+///   approved. The node identity lives inside the embedded CSR PEM (subject
+///   CN), and parsing it would pull a new x509 dep for little gain — a
+///   Day-2 add is an operator-triggered, single-window operation and this
+///   matches what `oc adm certificate approve` of the pending set does.
+async fn approve_csrs_until_ready(
+    k8s: &K8sClient,
+    hostname: &str,
+    role: AddNodeRole,
+    timeout: Duration,
+) -> Result<Vec<String>, InterpretError> {
+    let deadline = Instant::now() + timeout;
+    let mut approved: Vec<String> = vec![];
+    let expected_username = format!("system:node:{hostname}");
+
+    info!(
+        "[AddNode/{role:?}] Waiting for {hostname} to become Ready (approving CSRs as they appear, \
+         timeout {}min)...",
+        timeout.as_secs() / 60,
+    );
+
+    let mut iteration: u32 = 0;
+    loop {
+        // Wait first, then poll. The node has *just* been told to reboot;
+        // it can't possibly be Ready yet and CSRs take tens of seconds to
+        // even appear. Polling immediately after the reboot just burns an
+        // API round-trip for nothing.
+        debug!(
+            "[AddNode/{role:?}] sleeping {}s before next poll",
+            APPROVE_CSRS_POLL_INTERVAL.as_secs()
+        );
+        tokio::time::sleep(APPROVE_CSRS_POLL_INTERVAL).await;
+
+        iteration += 1;
+        let elapsed = timeout.saturating_sub(deadline.saturating_duration_since(Instant::now()));
+        debug!(
+            "[AddNode/{role:?}] poll #{iteration} (elapsed {}s / timeout {}s)",
+            elapsed.as_secs(),
+            timeout.as_secs()
+        );
+
+        // All API calls inside the loop are transient-failure tolerant:
+        // during a CP reboot the apiserver we're talking to may drop the
+        // connection, HAProxy may briefly flip its backend set, etcd may
+        // flap quorum. We log + fall through to the next tick rather than
+        // abort the whole score — the operator's node is mid-ignition and
+        // needs its CSRs approved, not a dead script.
+        match k8s.get_resource::<Node>(hostname, None).await {
+            Ok(None) => {
+                debug!("[AddNode/{role:?}] Node {hostname} not yet present in the cluster");
+            }
+            Ok(Some(n)) => {
+                let ready = is_node_ready(&n);
+                let summary = node_conditions_summary(&n);
+                debug!(
+                    "[AddNode/{role:?}] Node {hostname} present, Ready={ready}; conditions: {summary}"
+                );
+                if ready {
+                    info!(
+                        "[AddNode/{role:?}] Node {hostname} Ready after approving {} CSR(s)",
+                        approved.len()
+                    );
+                    return Ok(approved);
+                }
+            }
+            Err(e) => {
+                warn!(
+                    "[AddNode/{role:?}] Transient API error getting node {hostname} — \
+                     retrying next tick: {e}"
+                );
+                // Don't try the CSR list either — same apiserver; fall
+                // through to the deadline check and re-sleep.
+ if Instant::now() >= deadline { + return Err(timeout_err(hostname, timeout, &approved)); + } + continue; + } + } + + let pending = match k8s.list_pending_kubelet_csrs().await { + Ok(p) => p, + Err(e) => { + warn!( + "[AddNode/{role:?}] Transient API error listing CSRs — retrying next tick: {e}" + ); + if Instant::now() >= deadline { + return Err(timeout_err(hostname, timeout, &approved)); + } + continue; + } + }; + debug!( + "[AddNode/{role:?}] Found {} pending kubelet CSR(s) cluster-wide", + pending.len() + ); + let mut approved_this_tick = 0usize; + let mut skipped_this_tick = 0usize; + for csr in pending { + let signer = csr.spec.signer_name.as_str(); + let username = csr.spec.username.as_deref().unwrap_or(""); + let name = csr.metadata.name.as_deref().unwrap_or(""); + let matches_this_node = match signer { + KUBELET_SERVING_SIGNER => { + csr.spec.username.as_deref() == Some(expected_username.as_str()) + } + KUBE_APISERVER_CLIENT_KUBELET_SIGNER => true, + _ => false, + }; + if !matches_this_node { + debug!( + "[AddNode/{role:?}] skip CSR {name} (signer={signer}, username={username}): \ + not a match for {hostname}" + ); + skipped_this_tick += 1; + continue; + } + if csr.metadata.name.is_none() { + debug!("[AddNode/{role:?}] skip CSR with no metadata.name"); + skipped_this_tick += 1; + continue; + } + if approved.iter().any(|n| n == name) { + // Already approved in a prior iteration but the condition + // hasn't settled yet; skip. + debug!("[AddNode/{role:?}] skip CSR {name}: already approved this run"); + skipped_this_tick += 1; + continue; + } + match k8s + .approve_csr( + name, + "HarmonyAddNode", + &format!("Approved by OKDAddNodeScore for {hostname}"), + ) + .await + { + Ok(()) => { + info!("[AddNode/{role:?}] Approved CSR {name} (signer {signer})"); + approved.push(name.to_string()); + approved_this_tick += 1; + } + Err(e) => { + warn!( + "[AddNode/{role:?}] Transient API error approving CSR {name} — \ + will retry next tick: {e}" + ); + // Intentionally don't push onto `approved`; we want + // to try again on the next iteration. + skipped_this_tick += 1; + } + } + } + debug!( + "[AddNode/{role:?}] tick summary: approved {approved_this_tick}, skipped \ + {skipped_this_tick}, approved cumulative {}", + approved.len() + ); + + if Instant::now() >= deadline { + return Err(timeout_err(hostname, timeout, &approved)); + } + } +} + +fn timeout_err(hostname: &str, timeout: Duration, approved: &[String]) -> InterpretError { + InterpretError::new(format!( + "Timed out waiting for {hostname} to become Ready after {}min. Approved CSRs: {approved:?}", + timeout.as_secs() / 60, + )) +} + +/// Compact human summary of a Node's status conditions, e.g. +/// `Ready=False MemoryPressure=False DiskPressure=False`. Returns +/// `""` if the Node hasn't populated conditions yet. +fn node_conditions_summary(node: &Node) -> String { + match node.status.as_ref().and_then(|s| s.conditions.as_ref()) { + Some(conds) if !conds.is_empty() => conds + .iter() + .map(|c| format!("{}={}", c.type_, c.status)) + .collect::>() + .join(" "), + _ => "".to_string(), + } +} + +/// Map a snapshot of cluster nodes to `LogicalHost`s. Entries whose +/// InternalIP didn't parse as IPv4 are dropped silently — there's nothing +/// useful we can do with a hostname without an address. 
+fn existing_to_logical_hosts(existing: &[ExistingNode]) -> Vec { + existing + .iter() + .filter_map(|n| { + n.internal_ip.map(|ip| LogicalHost { + ip: IpAddr::V4(ip), + name: n.hostname.clone(), + }) + }) + .collect() +} + +/// Query the cluster for current nodes of `role` and convert to +/// `LogicalHost`s. Thin wrapper over `list_existing_role_nodes` + +/// `existing_to_logical_hosts`. +async fn cluster_logical_hosts( + k8s: &std::sync::Arc, + role: AddNodeRole, +) -> Result, InterpretError> { + let existing = list_existing_role_nodes(k8s, role).await?; + Ok(existing_to_logical_hosts(&existing)) +} + +fn is_node_ready(node: &Node) -> bool { + node.status + .as_ref() + .and_then(|s| s.conditions.as_ref()) + .map(|conds| { + conds + .iter() + .any(|c| c.type_ == "Ready" && c.status == "True") + }) + .unwrap_or(false) +} + +/// Pick the first IPv4 address attached to an "up" interface on this host. +/// That's the address the inventory agent answered discovery on, so it's +/// the one we can reach for the SSH-reboot step. +fn first_reachable_ipv4(host: &crate::hardware::PhysicalHost) -> Option { + host.network + .iter() + .filter(|nic| nic.is_up) + .flat_map(|nic| nic.ipv4_addresses.iter()) + .find_map(|addr| addr.parse::().ok()) +} + +// ----------------------------------------------------------------------------- +// Cluster introspection +// ----------------------------------------------------------------------------- + +/// Snapshot of an existing cluster node we'll use to infer hostname/IP patterns. +#[derive(Debug, Clone)] +struct ExistingNode { + hostname: String, + internal_ip: Option, +} + +async fn list_existing_role_nodes( + k8s: &std::sync::Arc, + role: AddNodeRole, +) -> Result, InterpretError> { + let params = ListParams::default().labels(role.node_role_label()); + let list = k8s + .list_resources::(None, Some(params)) + .await + .map_err(|e| InterpretError::new(format!("Failed to list cluster nodes: {e}")))?; + + let mut out = Vec::with_capacity(list.items.len()); + for node in list.items { + let hostname = match &node.metadata.name { + Some(n) => n.clone(), + None => continue, + }; + let internal_ip = node + .status + .as_ref() + .and_then(|s| s.addresses.as_ref()) + .and_then(|addrs| { + addrs + .iter() + .find(|a| a.type_ == "InternalIP") + .and_then(|a| a.address.parse::().ok()) + }); + out.push(ExistingNode { + hostname, + internal_ip, + }); + } + Ok(out) +} + +fn summarize_existing(nodes: &[ExistingNode]) -> String { + let mut out = String::new(); + for n in nodes { + let ip = n + .internal_ip + .map(|i| i.to_string()) + .unwrap_or_else(|| "?".into()); + out.push_str(&format!("\n - {} ({})", n.hostname, ip)); + } + out +} + +async fn fetch_user_data_ignition( + k8s: &std::sync::Arc, + secret_name: &str, +) -> Result { + let secret: Option = k8s + .get_resource(secret_name, Some("openshift-machine-api")) + .await + .map_err(|e| { + InterpretError::new(format!( + "Failed to read secret openshift-machine-api/{secret_name}: {e}" + )) + })?; + let secret = secret.ok_or_else(|| { + InterpretError::new(format!( + "Secret openshift-machine-api/{secret_name} not found — is the cluster installed?" 
+ )) + })?; + + let user_data = secret + .data + .as_ref() + .and_then(|d| d.get("userData")) + .ok_or_else(|| { + InterpretError::new(format!( + "Secret openshift-machine-api/{secret_name} is missing the `userData` key" + )) + })?; + + serde_json::from_slice::(&user_data.0).map_err(|e| { + InterpretError::new(format!( + "Could not parse userData from openshift-machine-api/{secret_name} as JSON: {e}" + )) + }) +} + +// ----------------------------------------------------------------------------- +// Hostname / IP derivation +// ----------------------------------------------------------------------------- + +/// Split a hostname into `(prefix, seq, suffix)` at its first digit run, e.g. +/// `cp0-harmony` → `("cp", 0, "-harmony")`, `worker10` → `("worker", 10, "")`. +/// Returns `None` if no digit run exists. +fn parse_hostname(hostname: &str) -> Option<(&str, u32, &str)> { + let first_digit = hostname + .char_indices() + .find(|(_, c)| c.is_ascii_digit()) + .map(|(i, _)| i)?; + let digits_end = hostname[first_digit..] + .char_indices() + .find(|(_, c)| !c.is_ascii_digit()) + .map(|(i, _)| first_digit + i) + .unwrap_or(hostname.len()); + let prefix = &hostname[..first_digit]; + let seq: u32 = hostname[first_digit..digits_end].parse().ok()?; + let suffix = &hostname[digits_end..]; + Some((prefix, seq, suffix)) +} + +/// Compute the next hostname for a role given the current cluster's node names. +/// The dominant `(prefix, suffix)` pair wins; ties prefer the one with the +/// highest sequence. Returns `None` when no node name parses. +fn next_hostname(role: AddNodeRole, existing: &[String]) -> Option { + use std::collections::HashMap; + + let mut buckets: HashMap<(String, String), Vec> = HashMap::new(); + for name in existing { + if let Some((prefix, seq, suffix)) = parse_hostname(name) { + buckets + .entry((prefix.to_string(), suffix.to_string())) + .or_default() + .push(seq); + } + } + if buckets.is_empty() { + return None; + } + + // Pick the largest bucket. On a tie, pick the one whose max seq is highest + // (so `cp0, cp1, cp2, worker5` doesn't accidentally produce `worker6` for + // role=ControlPlane — but that also wouldn't happen because we only feed + // same-role nodes in). + let (_key, seqs) = buckets + .iter() + .max_by_key(|(_, seqs)| (seqs.len(), seqs.iter().copied().max().unwrap_or(0))) + .unwrap(); + let max = seqs.iter().copied().max().unwrap(); + let (prefix, suffix) = buckets + .iter() + .find(|(_, v)| v.len() == seqs.len() && v.iter().copied().max().unwrap_or(0) == max) + .map(|(k, _)| k) + .unwrap(); + + // Emit the next name. If every existing sequence is >= 0 we bump by 1. + let next_seq = max + 1; + let _ = role; // role is implicit in the caller passing the right subset + Some(format!("{prefix}{next_seq}{suffix}")) +} + +/// Derive the next IPv4 address from the pattern `seq → last_octet` observed +/// across existing nodes of the same role. Assumes stride 1 and the same /24. +fn next_ipv4(existing: &[ExistingNode], new_hostname: &str) -> Result { + let (_, new_seq, _) = parse_hostname(new_hostname) + .ok_or_else(|| format!("New hostname `{new_hostname}` has no digit run"))?; + + // Collect (seq, ipv4) pairs from nodes whose name parses. 
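+    // Worked example (values illustrative, mirroring the tests below):
+    // cp0→192.168.40.20, cp1→.21, cp2→.22 yields pairs [(0,20), (1,21),
+    // (2,22)]; every base is last_octet - seq = 20, so `bases` dedups to
+    // [20] and a new hostname cp3 (seq 3) projects to 192.168.40.23.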
+    let mut pairs: Vec<(u32, Ipv4Addr)> = Vec::new();
+    for n in existing {
+        let Some((_, seq, _)) = parse_hostname(&n.hostname) else {
+            continue;
+        };
+        let Some(ip) = n.internal_ip else { continue };
+        pairs.push((seq, ip));
+    }
+    if pairs.is_empty() {
+        return Err(format!(
+            "No existing node with a parseable hostname AND an InternalIP; cannot infer the \
+             IP pattern. Existing: {:?}",
+            existing
+                .iter()
+                .map(|n| (&n.hostname, n.internal_ip))
+                .collect::<Vec<_>>()
+        ));
+    }
+
+    // Confirm all pairs share the first three octets (same /24).
+    let (_, ref0) = pairs[0];
+    let [a, b, c, _] = ref0.octets();
+    for (_, ip) in &pairs {
+        let oct = ip.octets();
+        if oct[0] != a || oct[1] != b || oct[2] != c {
+            return Err(format!(
+                "Cluster nodes span multiple /24s ({} and {}); refusing to guess",
+                ref0, ip
+            ));
+        }
+    }
+
+    // Infer base octet: for each (seq, ip.last_octet), base = last - seq.
+    // All bases must agree, otherwise the pattern isn't linear-stride-1.
+    let mut bases: Vec<i32> = pairs
+        .iter()
+        .map(|(seq, ip)| ip.octets()[3] as i32 - *seq as i32)
+        .collect();
+    bases.sort();
+    bases.dedup();
+    if bases.len() != 1 {
+        return Err(format!(
+            "IP assignment is not linear stride-1 (seq→last-octet offsets: {bases:?}); refusing \
+             to guess"
+        ));
+    }
+    let base = bases[0];
+    let projected = base + new_seq as i32;
+    if !(0..=255).contains(&projected) {
+        return Err(format!(
+            "Projected last octet {projected} is outside 0..=255 for {new_hostname}"
+        ));
+    }
+
+    let new_last = projected as u8;
+    let new_ip = Ipv4Addr::new(a, b, c, new_last);
+
+    // Collision check against already-assigned IPs.
+    if pairs.iter().any(|(_, ip)| *ip == new_ip) {
+        return Err(format!(
+            "Derived IP {new_ip} is already in use by another node"
+        ));
+    }
+
+    Ok(new_ip)
+}
+
+// -----------------------------------------------------------------------------
+// Ignition injection
+// -----------------------------------------------------------------------------
+
+fn inject_network_files(
+    ignition: &mut Value,
+    network_config: &NetworkConfig,
+) -> Result<(), InterpretError> {
+    let files = ensure_storage_files(ignition);
+
+    if let Some(bond) = &network_config.bond {
+        let bond_name = "bond0";
+        let bond_uuid = uuid::Uuid::new_v4().to_string();
+        let primary_iface = bond.interfaces.first().map(|s| s.as_str()).unwrap_or("");
+        files.push(nmconnection_file_entry(
+            &format!("/etc/NetworkManager/system-connections/{bond_name}.nmconnection"),
+            &render_bond_master(bond_name, &bond_uuid, &bond.mode, primary_iface),
+        ));
+
+        for iface in &bond.interfaces {
+            let slave_id = format!("{bond_name}-{iface}");
+            let slave_uuid = uuid::Uuid::new_v4().to_string();
+            files.push(nmconnection_file_entry(
+                &format!("/etc/NetworkManager/system-connections/{slave_id}.nmconnection"),
+                &render_bond_slave(&slave_id, &slave_uuid, iface, bond_name),
+            ));
+        }
+    }
+
+    for iface in &network_config.blacklisted_interfaces {
+        files.push(nmconnection_file_entry(
+            &format!("/etc/NetworkManager/system-connections/{iface}-disabled.nmconnection"),
+            &render_disabled_interface(iface),
+        ));
+    }
+
+    Ok(())
+}
+
+fn nmconnection_file_entry(path: &str, content: &str) -> Value {
+    json!({
+        "path": path,
+        "mode": 0o600,
+        "overwrite": true,
+        "contents": {
+            "source": format!(
+                "data:text/plain;charset=utf-8;base64,{}",
+                BASE64_STANDARD.encode(content)
+            )
+        }
+    })
+}
+
+/// Resolve `ignition.storage.files` as a mutable array reference, creating
+/// intermediate objects as needed. The stub ignition from OKD's
+/// `<role>-user-data-managed` secret has no `storage` at all, so we can't
+/// just index into it.
+fn ensure_storage_files(ignition: &mut Value) -> &mut Vec<Value> {
+    if !ignition.is_object() {
+        *ignition = json!({});
+    }
+    let root = ignition.as_object_mut().unwrap();
+    let storage = root
+        .entry("storage")
+        .or_insert_with(|| json!({}))
+        .as_object_mut()
+        .expect("storage must be an object");
+    let files = storage
+        .entry("files")
+        .or_insert_with(|| json!([]))
+        .as_array_mut()
+        .expect("storage.files must be an array");
+    files
+}
+
+// -----------------------------------------------------------------------------
+// Tests
+// -----------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn node(name: &str, ip: &str) -> ExistingNode {
+        ExistingNode {
+            hostname: name.to_string(),
+            internal_ip: Some(ip.parse().unwrap()),
+        }
+    }
+
+    #[test]
+    fn parse_hostname_plain() {
+        assert_eq!(parse_hostname("cp0"), Some(("cp", 0, "")));
+        assert_eq!(parse_hostname("worker12"), Some(("worker", 12, "")));
+    }
+
+    #[test]
+    fn parse_hostname_with_suffix() {
+        assert_eq!(parse_hostname("cp0-harmony"), Some(("cp", 0, "-harmony")));
+        assert_eq!(
+            parse_hostname("worker10-harmony"),
+            Some(("worker", 10, "-harmony"))
+        );
+    }
+
+    #[test]
+    fn parse_hostname_no_digits() {
+        assert_eq!(parse_hostname("controlplane"), None);
+    }
+
+    #[test]
+    fn next_hostname_plain_sequence() {
+        let names: Vec<String> = ["cp0", "cp1", "cp2"]
+            .iter()
+            .map(|s| s.to_string())
+            .collect();
+        assert_eq!(
+            next_hostname(AddNodeRole::ControlPlane, &names).as_deref(),
+            Some("cp3")
+        );
+    }
+
+    #[test]
+    fn next_hostname_with_suffix() {
+        let names: Vec<String> = ["worker0-harmony", "worker1-harmony", "worker2-harmony"]
+            .iter()
+            .map(|s| s.to_string())
+            .collect();
+        assert_eq!(
+            next_hostname(AddNodeRole::Worker, &names).as_deref(),
+            Some("worker3-harmony")
+        );
+    }
+
+    #[test]
+    fn next_hostname_with_gap() {
+        let names: Vec<String> = ["cp0", "cp2"].iter().map(|s| s.to_string()).collect();
+        // Uses max+1 rather than filling the gap — simpler and matches the
+        // "monotonic sequence" mental model.
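+        // Illustrative: {cp0, cp2} gives max seq 2, so the next name is cp3;
+        // the cp1 gap stays unused.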
+        assert_eq!(
+            next_hostname(AddNodeRole::ControlPlane, &names).as_deref(),
+            Some("cp3")
+        );
+    }
+
+    #[test]
+    fn next_hostname_none_when_unparseable() {
+        let names: Vec<String> = ["foo", "bar"].iter().map(|s| s.to_string()).collect();
+        assert_eq!(next_hostname(AddNodeRole::Worker, &names), None);
+    }
+
+    #[test]
+    fn next_ipv4_standard_sequence() {
+        let existing = vec![
+            node("cp0", "192.168.40.20"),
+            node("cp1", "192.168.40.21"),
+            node("cp2", "192.168.40.22"),
+        ];
+        assert_eq!(
+            next_ipv4(&existing, "cp3").unwrap(),
+            "192.168.40.23".parse::<Ipv4Addr>().unwrap()
+        );
+    }
+
+    #[test]
+    fn next_ipv4_with_suffix() {
+        let existing = vec![
+            node("cp0-harmony", "10.0.0.20"),
+            node("cp1-harmony", "10.0.0.21"),
+        ];
+        assert_eq!(
+            next_ipv4(&existing, "cp2-harmony").unwrap(),
+            "10.0.0.22".parse::<Ipv4Addr>().unwrap()
+        );
+    }
+
+    #[test]
+    fn next_ipv4_rejects_nonlinear() {
+        let existing = vec![
+            node("cp0", "192.168.40.20"),
+            node("cp1", "192.168.40.25"), // gap breaks stride-1
+        ];
+        assert!(next_ipv4(&existing, "cp2").is_err());
+    }
+
+    #[test]
+    fn next_ipv4_rejects_multiple_subnets() {
+        let existing = vec![node("cp0", "192.168.40.20"), node("cp1", "192.168.41.21")];
+        assert!(next_ipv4(&existing, "cp2").is_err());
+    }
+
+    #[test]
+    fn next_ipv4_empty_existing_errors() {
+        assert!(next_ipv4(&[], "cp0").is_err());
+    }
+
+    #[test]
+    fn inject_network_files_preserves_existing() {
+        use crate::topology::BondConfig;
+        use harmony_types::firewall::LaggProtocol;
+
+        // Start with a stub that already has a file — this mirrors OKD's
+        // real user-data-managed secret (which points at the MCS URL).
+        let mut ign = json!({
+            "ignition": {
+                "version": "3.2.0",
+                "config": { "merge": [{"source": "https://api-int/config/worker"}] }
+            },
+            "storage": {
+                "files": [
+                    {"path": "/preexisting", "contents": {"source": "data:,hello"}}
+                ]
+            }
+        });
+
+        let cfg = NetworkConfig {
+            bond: Some(BondConfig {
+                interfaces: vec!["enp1s0f0".into(), "enp1s0f1".into()],
+                mode: LaggProtocol::Lacp,
+            }),
+            blacklisted_interfaces: vec!["eno1".into()],
+        };
+
+        inject_network_files(&mut ign, &cfg).unwrap();
+
+        let files = ign["storage"]["files"].as_array().unwrap();
+        // 1 preexisting + 1 bond master + 2 slaves + 1 blacklist = 5
+        assert_eq!(files.len(), 5);
+
+        let paths: Vec<&str> = files.iter().map(|f| f["path"].as_str().unwrap()).collect();
+        assert!(paths.contains(&"/preexisting"));
+        assert!(paths.contains(&"/etc/NetworkManager/system-connections/bond0.nmconnection"));
+        assert!(
+            paths.contains(&"/etc/NetworkManager/system-connections/bond0-enp1s0f0.nmconnection")
+        );
+        assert!(
+            paths.contains(&"/etc/NetworkManager/system-connections/bond0-enp1s0f1.nmconnection")
+        );
+        assert!(
+            paths.contains(&"/etc/NetworkManager/system-connections/eno1-disabled.nmconnection")
+        );
+
+        // Spot-check that every new file has mode 0600.
+        for f in files.iter() {
+            let path = f["path"].as_str().unwrap();
+            if path.starts_with("/etc/NetworkManager/") {
+                assert_eq!(f["mode"].as_u64().unwrap(), 0o600);
+                assert_eq!(f["overwrite"].as_bool().unwrap(), true);
+            }
+        }
+    }
+
+    #[test]
+    fn inject_network_files_handles_missing_storage() {
+        use crate::topology::BondConfig;
+        use harmony_types::firewall::LaggProtocol;
+
+        // Fresh stub with no storage key at all.
+        let mut ign = json!({"ignition": {"version": "3.2.0"}});
+        let cfg = NetworkConfig {
+            bond: Some(BondConfig {
+                interfaces: vec!["eth0".into(), "eth1".into()],
+                mode: LaggProtocol::Failover,
+            }),
+            blacklisted_interfaces: vec![],
+        };
+        inject_network_files(&mut ign, &cfg).unwrap();
+        assert_eq!(ign["storage"]["files"].as_array().unwrap().len(), 3);
+    }
+}
diff --git a/harmony/src/modules/okd/load_balancer.rs b/harmony/src/modules/okd/load_balancer.rs
index 0f9dfa61..3ae0bf15 100644
--- a/harmony/src/modules/okd/load_balancer.rs
+++ b/harmony/src/modules/okd/load_balancer.rs
@@ -1,4 +1,7 @@
-use std::net::{IpAddr, Ipv4Addr, SocketAddr};
+use std::{
+    collections::HashSet,
+    net::{IpAddr, Ipv4Addr, SocketAddr},
+};
 
 use serde::Serialize;
 
@@ -8,7 +11,7 @@ use crate::{
     score::Score,
     topology::{
         BackendServer, HAClusterTopology, HealthCheck, HttpMethod, HttpStatusCode, LoadBalancer,
-        LoadBalancerService, SSL, Topology,
+        LoadBalancerService, LogicalHost, SSL, Topology,
     },
 };
 
@@ -53,18 +56,53 @@ pub struct OKDLoadBalancerScore {
 /// ```
 impl OKDLoadBalancerScore {
     pub fn new(topology: &HAClusterTopology) -> Self {
+        let (public_services, private_services) =
+            Self::services_for_nodes(&topology.control_plane, &topology.workers);
+        Self {
+            load_balancer_score: LoadBalancerScore {
+                public_services,
+                private_services,
+            },
+        }
+    }
+
+    /// Build the OKD load-balancer service set for a given cluster shape.
+    ///
+    /// Returns `(public_services, private_services)` — exactly what the
+    /// Score's `new()` feeds into `LoadBalancerScore`. Exposing this as a
+    /// free helper lets Day-2 flows (e.g. `OKDAddNodeScore`) refresh
+    /// HAProxy against the *current* cluster instead of the bootstrap-time
+    /// topology snapshot.
+    pub fn services_for_nodes(
+        control_plane: &[LogicalHost],
+        workers: &[LogicalHost],
+    ) -> (Vec<LoadBalancerService>, Vec<LoadBalancerService>) {
         // Bind on 0.0.0.0 instead of the LAN IP to avoid CARP VIP race
         // conditions where HAProxy fails to bind when the interface
         // transitions back to master.
         let bind_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
 
+        // Order-preserving dedupe by IP. OKD marks CPs as schedulable
+        // workers (both master AND worker labels), so when Day-2 flows
+        // build `control_plane` and `workers` by querying each label
+        // separately the CPs appear in both slices. Feeding duplicates
+        // into OPNsense makes HAProxy emit two `server` lines with the
+        // same name, which `haproxy -c` rejects.
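+        // Illustrative: control_plane = [cp1, cp2] and workers =
+        // [cp1, cp2, wk0] (schedulable masters) must collapse to
+        // [cp1, cp2, wk0]; the dedupe test below exercises exactly this.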
+        let mut all_nodes: Vec<LogicalHost> = Vec::new();
+        let mut seen: HashSet<String> = HashSet::new();
+        for host in control_plane.iter().chain(workers.iter()) {
+            if seen.insert(host.ip.to_string()) {
+                all_nodes.push(host.clone());
+            }
+        }
+
         let public_services = vec![
             LoadBalancerService {
-                backend_servers: Self::nodes_to_backend_server(topology, 80),
+                backend_servers: backends_for(&all_nodes, 80),
                 listening_port: SocketAddr::new(bind_addr, 80),
                 health_check: None,
             },
             LoadBalancerService {
-                backend_servers: Self::nodes_to_backend_server(topology, 443),
+                backend_servers: backends_for(&all_nodes, 443),
                 listening_port: SocketAddr::new(bind_addr, 443),
                 health_check: None,
             },
@@ -72,7 +110,7 @@
 
         let private_services = vec![
             LoadBalancerService {
-                backend_servers: Self::nodes_to_backend_server(topology, 80),
+                backend_servers: backends_for(&all_nodes, 80),
                 listening_port: SocketAddr::new(bind_addr, 80),
                 health_check: Some(HealthCheck::HTTP(
                     Some(25001),
@@ -83,7 +121,7 @@
                 )),
             },
             LoadBalancerService {
-                backend_servers: Self::nodes_to_backend_server(topology, 443),
+                backend_servers: backends_for(&all_nodes, 443),
                 listening_port: SocketAddr::new(bind_addr, 443),
                 health_check: Some(HealthCheck::HTTP(
                     Some(25001),
@@ -94,12 +132,12 @@
                 )),
             },
             LoadBalancerService {
-                backend_servers: Self::control_plane_to_backend_server(topology, 22623),
+                backend_servers: backends_for(control_plane, 22623),
                 listening_port: SocketAddr::new(bind_addr, 22623),
                 health_check: Some(HealthCheck::TCP(None)),
             },
             LoadBalancerService {
-                backend_servers: Self::control_plane_to_backend_server(topology, 6443),
+                backend_servers: backends_for(control_plane, 6443),
                 listening_port: SocketAddr::new(bind_addr, 6443),
                 health_check: Some(HealthCheck::HTTP(
                     None,
@@ -110,57 +148,20 @@
                 )),
             },
         ];
-        Self {
-            load_balancer_score: LoadBalancerScore {
-                public_services,
-                private_services,
-            },
-        }
-    }
 
-    /// Creates backend servers list for control plane nodes only
-    ///
-    /// Use this for control plane-specific services like:
-    /// - Port 22623: Ignition API (machine configuration during bootstrap)
-    /// - Port 6443: Kubernetes API server
-    fn control_plane_to_backend_server(
-        topology: &HAClusterTopology,
-        port: u16,
-    ) -> Vec<BackendServer> {
-        topology
-            .control_plane
-            .iter()
-            .map(|cp| BackendServer {
-                address: cp.ip.to_string(),
-                port,
-            })
-            .collect()
+        (public_services, private_services)
     }
+}
 
-    /// Creates backend servers list for all nodes (control plane + workers)
-    ///
-    /// Use this for ingress traffic that should be distributed across all nodes:
-    /// - Port 80: HTTP ingress traffic
-    /// - Port 443: HTTPS ingress traffic
-    ///
-    /// In OKD, ingress router pods can run on any node, so both control plane
-    /// and worker nodes should be included in the load balancer backend pool.
-    fn nodes_to_backend_server(topology: &HAClusterTopology, port: u16) -> Vec<BackendServer> {
-        let mut nodes = Vec::new();
-        for cp in &topology.control_plane {
-            nodes.push(BackendServer {
-                address: cp.ip.to_string(),
-                port,
-            });
-        }
-        for worker in &topology.workers {
-            nodes.push(BackendServer {
-                address: worker.ip.to_string(),
-                port,
-            });
-        }
-        nodes
-    }
+/// Map a slice of logical hosts to `BackendServer`s for a single port.
+fn backends_for(hosts: &[LogicalHost], port: u16) -> Vec<BackendServer> {
+    hosts
+        .iter()
+        .map(|h| BackendServer {
+            address: h.ip.to_string(),
+            port,
+        })
+        .collect()
+}
 
 #[cfg(test)]
@@ -245,14 +246,23 @@ mod tests {
     }
 
     #[test]
-    fn test_nodes_to_backend_server_includes_control_plane_and_workers() {
+    fn test_public_port_80_backends_include_all_nodes() {
         let topology = create_test_topology();
+        let (public_services, _) =
+            OKDLoadBalancerScore::services_for_nodes(&topology.control_plane, &topology.workers);
 
-        let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 80);
+        let svc_80 = public_services
+            .iter()
+            .find(|s| s.listening_port.port() == 80)
+            .expect("Public service on port 80 not found");
 
-        assert_eq!(backend_servers.len(), 5);
+        assert_eq!(svc_80.backend_servers.len(), 5);
 
-        let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
+        let addresses: Vec<&str> = svc_80
+            .backend_servers
+            .iter()
+            .map(|s| s.address.as_str())
+            .collect();
         assert!(addresses.contains(&"192.168.1.10"));
         assert!(addresses.contains(&"192.168.1.11"));
         assert!(addresses.contains(&"192.168.1.12"));
@@ -261,14 +271,23 @@ mod tests {
     }
 
     #[test]
-    fn test_control_plane_to_backend_server_only_includes_control_plane() {
+    fn test_private_port_22623_backends_exclude_workers() {
         let topology = create_test_topology();
+        let (_, private_services) =
+            OKDLoadBalancerScore::services_for_nodes(&topology.control_plane, &topology.workers);
 
-        let backend_servers = OKDLoadBalancerScore::control_plane_to_backend_server(&topology, 80);
+        let svc = private_services
+            .iter()
+            .find(|s| s.listening_port.port() == 22623)
+            .expect("Private service on port 22623 not found");
 
-        assert_eq!(backend_servers.len(), 3);
+        assert_eq!(svc.backend_servers.len(), 3);
 
-        let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
+        let addresses: Vec<&str> = svc
+            .backend_servers
+            .iter()
+            .map(|s| s.address.as_str())
+            .collect();
         assert!(addresses.contains(&"192.168.1.10"));
         assert!(addresses.contains(&"192.168.1.11"));
         assert!(addresses.contains(&"192.168.1.12"));
@@ -276,6 +295,53 @@ mod tests {
         assert!(!addresses.contains(&"192.168.1.21"));
     }
 
+    #[test]
+    fn test_public_port_80_dedupes_shared_master_worker_nodes() {
+        // OKD marks control-plane nodes as schedulable workers — they appear
+        // in BOTH the master-label and worker-label query results. Feeding
+        // the two slices into services_for_nodes must not duplicate them
+        // in the "all nodes" ingress pools, or HAProxy rejects the config.
+        let cps = vec![
+            LogicalHost {
+                ip: ip!("192.168.1.21"),
+                name: "cp1".into(),
+            },
+            LogicalHost {
+                ip: ip!("192.168.1.22"),
+                name: "cp2".into(),
+            },
+        ];
+        let workers = vec![
+            LogicalHost {
+                ip: ip!("192.168.1.21"),
+                name: "cp1".into(),
+            },
+            LogicalHost {
+                ip: ip!("192.168.1.22"),
+                name: "cp2".into(),
+            },
+            LogicalHost {
+                ip: ip!("192.168.1.30"),
+                name: "wk0".into(),
+            },
+        ];
+        let (public_services, _) = OKDLoadBalancerScore::services_for_nodes(&cps, &workers);
+        let svc_80 = public_services
+            .iter()
+            .find(|s| s.listening_port.port() == 80)
+            .expect("Public service on port 80 not found");
+
+        let addresses: Vec<&str> = svc_80
+            .backend_servers
+            .iter()
+            .map(|s| s.address.as_str())
+            .collect();
+        assert_eq!(
+            addresses,
+            vec!["192.168.1.21", "192.168.1.22", "192.168.1.30"]
+        );
+    }
+
     #[test]
     fn test_public_services_include_all_nodes_on_port_80_and_443() {
         let topology = create_test_topology();
@@ -341,10 +407,15 @@ mod tests {
     #[test]
     fn test_all_backend_servers_have_correct_port() {
        let topology = create_test_topology();
+        let (public_services, _) =
+            OKDLoadBalancerScore::services_for_nodes(&topology.control_plane, &topology.workers);
 
-        let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 443);
+        let svc_443 = public_services
+            .iter()
+            .find(|s| s.listening_port.port() == 443)
+            .expect("Public service on port 443 not found");
 
-        for server in backend_servers {
+        for server in &svc_443.backend_servers {
             assert_eq!(server.port, 443);
         }
     }
diff --git a/harmony/src/modules/okd/mod.rs b/harmony/src/modules/okd/mod.rs
index 5fafe15d..6fd48e7a 100644
--- a/harmony/src/modules/okd/mod.rs
+++ b/harmony/src/modules/okd/mod.rs
@@ -24,8 +24,13 @@ pub use bootstrap_04_workers::*;
 pub use bootstrap_05_sanity_check::*;
 pub use bootstrap_06_installation_report::*;
 pub use bootstrap_persist_network_bond::*;
+pub mod add_node;
 pub mod crd;
 pub mod disable_dad_score;
 pub mod host_network;
 pub mod node_file_score;
+pub mod os_artifacts;
 pub mod system_reserved_score;
+
+pub use add_node::*;
+pub use os_artifacts::*;
diff --git a/harmony/src/modules/okd/os_artifacts.rs b/harmony/src/modules/okd/os_artifacts.rs
new file mode 100644
index 00000000..979a53ea
--- /dev/null
+++ b/harmony/src/modules/okd/os_artifacts.rs
@@ -0,0 +1,433 @@
+//! Verify that the kernel/initramfs/rootfs on the firewall's HTTP server
+//! match the CoreOS version the running OKD cluster expects.
+//!
+//! If they're stale (cluster upgraded since bootstrap, for example), new
+//! PXE-booting nodes come up on the wrong stream and fail to join. This
+//! score asks the cluster what version it wants — via
+//! `oc adm release extract --command=openshift-install` →
+//! `openshift-install coreos print-stream-json` — then SSHes to the
+//! firewall to `sha256sum` each file under `/usr/local/http/scos/`.
+//! Match → score succeeds. Mismatch → print an ssh+curl+ln-sfn bundle the
+//! operator can paste to refresh the files directly on the firewall, then
+//! re-verify in a loop until either clean or the operator aborts.
+//!
+//! The stable names iPXE serves (`scos-live-kernel.x86_64`, etc.) don't
+//! appear in the stream JSON — downloads come with versioned upstream
+//! names. The refresh bundle therefore `curl -L -O`s the versioned file
+//! and `ln -sfn`s the stable name at it. `sha256sum` follows symlinks by
+//! default, so our verification ends up hashing the versioned payload
+//! regardless of whether the stable name is a regular file or a symlink.
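+//!
+//! Illustrative on-disk layout after a refresh (versioned names are
+//! examples, not taken from a real stream):
+//!
+//!   /usr/local/http/scos/scos-live-kernel.x86_64        -> <versioned>-live-kernel-x86_64
+//!   /usr/local/http/scos/scos-live-initramfs.x86_64.img -> <versioned>-live-initramfs.x86_64.img
+//!   /usr/local/http/scos/scos-live-rootfs.x86_64.img    -> <versioned>-live-rootfs.x86_64.img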
+
+use async_trait::async_trait;
+use harmony_secret::SecretManager;
+use harmony_types::{id::Id, ssh::SshCredentials};
+use log::{debug, info};
+use serde::Serialize;
+use serde_json::Value;
+use std::net::IpAddr;
+use tokio::process::Command;
+
+use crate::{
+    config::secret::OPNSenseFirewallCredentials,
+    data::Version,
+    infra::ssh::run_command,
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::HAClusterTopology,
+};
+
+const REMOTE_DIR: &str = "/usr/local/http/scos";
+const KERNEL_STABLE: &str = "scos-live-kernel.x86_64";
+const INITRAMFS_STABLE: &str = "scos-live-initramfs.x86_64.img";
+const ROOTFS_STABLE: &str = "scos-live-rootfs.x86_64.img";
+const OPENSHIFT_INSTALL_DIR: &str = "./data/okd/bin";
+
+/// Standalone score that validates PXE artifacts before anyone PXEs into
+/// them. Runs both on its own (added to a pipeline) and as the first step
+/// of [`super::add_node::OKDAddNodeScore`] — operators don't have to
+/// remember to wire it in.
+#[derive(Debug, Clone, Serialize)]
+pub struct OKDOsArtifactsScore;
+
+impl Score for OKDOsArtifactsScore {
+    fn name(&self) -> String {
+        "OKDOsArtifactsScore".to_string()
+    }
+
+    fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
+        Box::new(OKDOsArtifactsInterpret)
+    }
+}
+
+#[derive(Debug)]
+pub struct OKDOsArtifactsInterpret;
+
+#[async_trait]
+impl Interpret for OKDOsArtifactsInterpret {
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &HAClusterTopology,
+    ) -> Result<Outcome, InterpretError> {
+        let fw_ip = topology.firewall.get_ip();
+        let fw_creds = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>().await?;
+        let ssh_creds = SshCredentials::Password {
+            username: fw_creds.username,
+            password: fw_creds.password,
+        };
+
+        info!(
+            "[OSArtifacts] Extracting cluster-matched openshift-install and reading stream JSON..."
+        );
+        let expected = fetch_expected_pxe_artifacts().await?;
+
+        loop {
+            info!("[OSArtifacts] Verifying 3 PXE artifacts on {fw_ip}...");
+            let rows = verify_each(fw_ip, &ssh_creds, &expected).await?;
+            if rows.iter().all(|r| r.status == Status::Match) {
+                info!("[OSArtifacts] All 3 PXE artifacts match the cluster's expected version");
+                return Ok(Outcome::success(format!(
+                    "PXE artifacts verified on {fw_ip} in {REMOTE_DIR}"
+                )));
+            }
+            print_mismatch_report(fw_ip, &rows);
+            let cont = inquire::Confirm::new(
+                "After updating the files on the firewall, press enter to re-verify (or decline to abort)",
+            )
+            .with_default(true)
+            .prompt()
+            .map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
+            if !cont {
+                return Err(InterpretError::new(
+                    "Operator aborted PXE-artifact validation — add-node cannot proceed with stale files.".into(),
+                ));
+            }
+        }
+    }
+
+    fn get_name(&self) -> InterpretName {
+        InterpretName::Custom("OKDOsArtifacts")
+    }
+    fn get_version(&self) -> Version {
+        Version::from("1.0.0").unwrap()
+    }
+    fn get_status(&self) -> InterpretStatus {
+        InterpretStatus::QUEUED
+    }
+    fn get_children(&self) -> Vec {
+        vec![]
+    }
+}
+
+// -----------------------------------------------------------------------------
+// Expected artifacts — shell out to openshift-install
+// -----------------------------------------------------------------------------
+
+#[derive(Debug, Clone)]
+struct PxeArtifact {
+    stable_filename: &'static str,
+    url: String,
+    sha256: String,
+}
+
+impl PxeArtifact {
+    fn remote_path(&self) -> String {
+        format!("{REMOTE_DIR}/{}", self.stable_filename)
+    }
+    /// Upstream filename extracted from the URL's last path segment —
+    /// what `curl -L -O` would save to disk and what `ln -sfn` targets.
+    fn upstream_filename(&self) -> &str {
+        self.url.rsplit('/').next().unwrap_or(self.url.as_str())
+    }
+}
+
+async fn fetch_expected_pxe_artifacts() -> Result<Vec<PxeArtifact>, InterpretError> {
+    // 1. Extract openshift-install that matches the running cluster.
+    //    Overwrites ./data/okd/bin/openshift-install so we never use a
+    //    stale one. Requires `oc` on PATH and a valid KUBECONFIG.
+    let to_arg = format!("--to={OPENSHIFT_INSTALL_DIR}");
+    debug!("[OSArtifacts] oc adm release extract --command=openshift-install {to_arg}");
+    let status = Command::new("oc")
+        .args([
+            "adm",
+            "release",
+            "extract",
+            "--command=openshift-install",
+            &to_arg,
+        ])
+        .status()
+        .await
+        .map_err(|e| {
+            InterpretError::new(format!(
+                "Could not run `oc`: {e}. Is it on PATH and is KUBECONFIG set?"
+            ))
+        })?;
+    if !status.success() {
+        return Err(InterpretError::new(format!(
+            "`oc adm release extract --command=openshift-install` failed (exit {status}). Is KUBECONFIG pointing at the cluster?"
+        )));
+    }
+
+    // 2. Print stream JSON.
+    let installer = format!("{OPENSHIFT_INSTALL_DIR}/openshift-install");
+    debug!("[OSArtifacts] {installer} coreos print-stream-json");
+    let out = Command::new(&installer)
+        .args(["coreos", "print-stream-json"])
+        .output()
+        .await
+        .map_err(|e| InterpretError::new(format!("Could not run {installer}: {e}")))?;
+    if !out.status.success() {
+        return Err(InterpretError::new(format!(
+            "`{installer} coreos print-stream-json` failed (exit {}): {}",
+            out.status,
+            String::from_utf8_lossy(&out.stderr),
+        )));
+    }
+    let json: Value = serde_json::from_slice(&out.stdout).map_err(|e| {
+        InterpretError::new(format!(
+            "Could not parse `openshift-install` stream JSON: {e}"
+        ))
+    })?;
+
+    // 3. Pull URL + sha256 for kernel/initramfs/rootfs.
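+    //    The slice of stream JSON consumed below has roughly this shape
+    //    (trimmed; URLs and hashes are placeholders):
+    //
+    //    { "architectures": { "x86_64": { "artifacts": { "metal": { "formats": {
+    //      "pxe": {
+    //        "kernel":    { "location": "https://.../...-live-kernel-x86_64", "sha256": "..." },
+    //        "initramfs": { "location": "https://...", "sha256": "..." },
+    //        "rootfs":    { "location": "https://...", "sha256": "..." }
+    //      } } } } } } }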
+    let pxe = &json["architectures"]["x86_64"]["artifacts"]["metal"]["formats"]["pxe"];
+    let field = |artifact: &str, key: &str| -> Result<String, InterpretError> {
+        pxe[artifact][key]
+            .as_str()
+            .map(String::from)
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Stream JSON is missing .architectures.x86_64.artifacts.metal.formats.pxe.{artifact}.{key}"
+                ))
+            })
+    };
+
+    Ok(vec![
+        PxeArtifact {
+            stable_filename: KERNEL_STABLE,
+            url: field("kernel", "location")?,
+            sha256: field("kernel", "sha256")?,
+        },
+        PxeArtifact {
+            stable_filename: INITRAMFS_STABLE,
+            url: field("initramfs", "location")?,
+            sha256: field("initramfs", "sha256")?,
+        },
+        PxeArtifact {
+            stable_filename: ROOTFS_STABLE,
+            url: field("rootfs", "location")?,
+            sha256: field("rootfs", "sha256")?,
+        },
+    ])
+}
+
+// -----------------------------------------------------------------------------
+// Verification over SSH
+// -----------------------------------------------------------------------------
+
+#[derive(Debug, PartialEq, Clone, Copy)]
+enum Status {
+    Match,
+    Mismatch,
+    Missing,
+}
+
+#[derive(Debug)]
+struct VerifyRow {
+    artifact: PxeArtifact,
+    have: Option<String>,
+    status: Status,
+}
+
+async fn verify_each(
+    fw_ip: IpAddr,
+    creds: &SshCredentials,
+    expected: &[PxeArtifact],
+) -> Result<Vec<VerifyRow>, InterpretError> {
+    let mut out = Vec::with_capacity(expected.len());
+    for e in expected {
+        let have = sha256_of(fw_ip, creds, &e.remote_path()).await;
+        let status = match &have {
+            Some(h) if h == &e.sha256 => Status::Match,
+            Some(_) => Status::Mismatch,
+            None => Status::Missing,
+        };
+        debug!(
+            "[OSArtifacts] {}: expected {}... got {:?} -> {:?}",
+            e.stable_filename,
+            &e.sha256[..12],
+            have.as_deref(),
+            status,
+        );
+        out.push(VerifyRow {
+            artifact: e.clone(),
+            have,
+            status,
+        });
+    }
+    Ok(out)
+}
+
+/// `sha256sum <path>` over SSH. We deliberately avoid shell features
+/// (pipes, `2>/dev/null`, awk) because the firewall's root shell is FreeBSD
+/// csh, which doesn't speak Bourne-style stderr redirection — every shell
+/// understands a bare `sha256sum <path>`. Missing files / dangling
+/// symlinks cause a non-zero exit (our run_command turns that into Err,
+/// which we fold back to None). Parsing is done in Rust: scan lines for a
+/// token that looks like a 64-char hex hash.
+async fn sha256_of(fw_ip: IpAddr, creds: &SshCredentials, remote_path: &str) -> Option<String> {
+    let cmd = format!("sha256sum {remote_path}");
+    let out = match run_command(fw_ip, 22, creds, &cmd).await {
+        Ok(out) => out,
+        Err(e) => {
+            debug!("[OSArtifacts] sha256sum {remote_path} returned err (likely missing): {e}");
+            return None;
+        }
+    };
+    debug!(
+        "[OSArtifacts] sha256sum raw output for {remote_path}: {:?}",
+        out.trim()
+    );
+    parse_sha256sum_output(&out)
+}
+
+/// Extract the 64-hex sha256 from any line whose first whitespace token
+/// looks like one. Tolerant of interleaved stderr banners or warnings
+/// the remote shell might have splatted into our channel.
+fn parse_sha256sum_output(output: &str) -> Option<String> {
+    for line in output.lines() {
+        if let Some(first) = line.split_whitespace().next() {
+            if first.len() == 64 && first.chars().all(|c| c.is_ascii_hexdigit()) {
+                return Some(first.to_string());
+            }
+        }
+    }
+    None
+}
+
+// -----------------------------------------------------------------------------
+// Mismatch report
+// -----------------------------------------------------------------------------
+
+fn print_mismatch_report(fw_ip: IpAddr, rows: &[VerifyRow]) {
+    let mut msg = String::from(
+        "[OSArtifacts] PXE artifact mismatch — manual refresh required.\n\nExpected (from cluster's `openshift-install coreos print-stream-json`):\n",
+    );
+    for r in rows {
+        let have = match &r.have {
+            Some(h) => format!("have={}", &h[..16.min(h.len())]),
+            None => "have=".to_string(),
+        };
+        let marker = if r.status == Status::Match {
+            "  OK"
+        } else {
+            ""
+        };
+        msg.push_str(&format!(
+            "  {:<34} sha256={}  {}{}\n",
+            r.artifact.stable_filename,
+            &r.artifact.sha256[..16],
+            have,
+            marker,
+        ));
+    }
+
+    let needs: Vec<&VerifyRow> = rows.iter().filter(|r| r.status != Status::Match).collect();
+
+    msg.push_str(&format!(
+        "\nRun this from your workstation to refresh directly on the firewall:\n\n  ssh root@{fw_ip} '\\\n    set -eux && \\\n    mkdir -p {REMOTE_DIR} && \\\n    cd {REMOTE_DIR}"
+    ));
+    for r in &needs {
+        msg.push_str(&format!(" && \\\n    curl -L -O \"{}\"", r.artifact.url));
+    }
+    for r in &needs {
+        msg.push_str(&format!(
+            " && \\\n    ln -sfn {} {}",
+            r.artifact.upstream_filename(),
+            r.artifact.stable_filename,
+        ));
+    }
+    msg.push_str("'\n\nThen press enter here to re-verify.\n");
+    info!("{msg}");
+}
+
+// -----------------------------------------------------------------------------
+// Tests
+// -----------------------------------------------------------------------------
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    fn row(status: Status, have: Option<&str>) -> VerifyRow {
+        VerifyRow {
+            artifact: PxeArtifact {
+                stable_filename: KERNEL_STABLE,
+                url: "https://example.com/path/rhcos-418.94.xxx-live-kernel-x86_64".to_string(),
+                sha256: "a".repeat(64),
+            },
+            have: have.map(|s| s.to_string()),
+            status,
+        }
+    }
+
+    #[test]
+    fn upstream_filename_is_last_path_segment() {
+        let a = PxeArtifact {
+            stable_filename: KERNEL_STABLE,
+            url: "https://x.example/a/b/rhcos-418.94-live-kernel-x86_64".into(),
+            sha256: "deadbeef".into(),
+        };
+        assert_eq!(a.upstream_filename(), "rhcos-418.94-live-kernel-x86_64");
+    }
+
+    #[test]
+    fn parse_sha256sum_output_handles_standard_format() {
+        let out = "dceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b  /usr/local/http/scos/scos-live-kernel.x86_64\n";
+        assert_eq!(
+            parse_sha256sum_output(out).as_deref(),
+            Some("dceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b"),
+        );
+    }
+
+    #[test]
+    fn parse_sha256sum_output_skips_stderr_noise_before_hash() {
+        let out = "Warning: remote shell is weird\nsha256sum: foo: No such file\ndceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b  /path\n";
+        assert_eq!(
+            parse_sha256sum_output(out).as_deref(),
+            Some("dceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b"),
+        );
+    }
+
+    #[test]
+    fn parse_sha256sum_output_rejects_short_hex() {
+        assert_eq!(parse_sha256sum_output("dead  /path\n"), None);
+    }
+
+    #[test]
+    fn parse_sha256sum_output_rejects_non_hex() {
+        let out = "ZZZZc0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b  /path\n";
+        assert_eq!(parse_sha256sum_output(out), None);
+    }
+
+    #[test]
+    fn parse_sha256sum_output_empty_is_none() {
+        assert_eq!(parse_sha256sum_output(""), None);
+    }
+
+    #[test]
+    fn status_detection_match_vs_mismatch_vs_missing() {
+        let expected_hash = "a".repeat(64);
+        let other_hash = "b".repeat(64);
+        assert_eq!(
+            row(Status::Match, Some(&expected_hash)).status,
+            Status::Match
+        );
+        assert_eq!(
+            row(Status::Mismatch, Some(&other_hash)).status,
+            Status::Mismatch
+        );
+        assert_eq!(row(Status::Missing, None).status, Status::Missing);
+    }
+}
diff --git a/harmony/src/modules/okd/templates.rs b/harmony/src/modules/okd/templates.rs
index 2e1494e6..6993890d 100644
--- a/harmony/src/modules/okd/templates.rs
+++ b/harmony/src/modules/okd/templates.rs
@@ -16,5 +16,9 @@ pub struct BootstrapIpxeTpl<'a> {
     pub scos_path: &'a str,
     pub installation_device: &'a str,
     pub ignition_http_path: &'a str,
-    pub ignition_file_name: &'static str,
+    // Was `&'static str` because bootstrap only ever rendered
+    // `bootstrap.ign` / `master.ign` / `worker.ign`. Add-node generates
+    // per-host filenames like `master-cp3.ign`, so the lifetime must be
+    // bound to the caller's String instead.
+    pub ignition_file_name: &'a str,
 }
diff --git a/harmony_types/src/lib.rs b/harmony_types/src/lib.rs
index 4eb05398..625f9a76 100644
--- a/harmony_types/src/lib.rs
+++ b/harmony_types/src/lib.rs
@@ -3,5 +3,6 @@ pub mod id;
 pub mod k8s_name;
 pub mod net;
 pub mod rfc1123;
+pub mod ssh;
 pub mod storage;
 pub mod switch;
diff --git a/harmony_types/src/ssh.rs b/harmony_types/src/ssh.rs
new file mode 100644
index 00000000..8bf40356
--- /dev/null
+++ b/harmony_types/src/ssh.rs
@@ -0,0 +1,21 @@
+//! SSH credentials primitive.
+//!
+//! Deliberately lean: a plain enum holding either a PEM-encoded private key
+//! (with an optional passphrase) or a password. No derives for interactive
+//! prompting, no JsonSchema — consumers build this from secrets they already
+//! hold (`harmony::config::secret::SshKeyPair`, static passwords, etc.) and
+//! pass it to `harmony::infra::ssh::run_command`.
+
+/// How to authenticate an outbound SSH connection.
+pub enum SshCredentials {
+    /// Private-key authentication. `private_pem` is the key material as
+    /// stored in OpenSSH / PKCS#8 format; `passphrase` decrypts an encrypted
+    /// key, or `None` for an unencrypted one.
+    SshKey {
+        username: String,
+        private_pem: String,
+        passphrase: Option<String>,
+    },
+    /// Password authentication.
+    Password { username: String, password: String },
+}
diff --git a/opnsense-config/src/modules/caddy.rs b/opnsense-config/src/modules/caddy.rs
index db3e30fc..305c62bc 100644
--- a/opnsense-config/src/modules/caddy.rs
+++ b/opnsense-config/src/modules/caddy.rs
@@ -32,12 +32,26 @@ impl CaddyConfig {
         Self { client }
     }
 
-    /// Check if the Caddy plugin is installed by querying its settings endpoint.
-    pub async fn is_installed(&self) -> bool {
-        self.client
+    /// Check whether the Caddy plugin is installed by querying its settings
+    /// endpoint.
+    ///
+    /// Returns:
+    /// - `Ok(true)` — the endpoint returned 2xx, plugin is present.
+    /// - `Ok(false)` — OPNsense returned **404** specifically, meaning the
+    ///   MVC route doesn't exist because the plugin isn't installed.
+    /// - `Err(_)` — anything else (timeout, TLS mismatch, 401/403, 5xx,
+    ///   malformed JSON). The caller must not assume "not installed" in
+    ///   this case; it means we couldn't determine state.
+    pub async fn is_installed(&self) -> Result<bool, Error> {
+        match self
+            .client
             .get_typed::("caddy", "General", "get")
             .await
-            .is_ok()
+        {
+            Ok(_) => Ok(true),
+            Err(opnsense_api::Error::Api { status, .. }) if status.as_u16() == 404 => Ok(false),
+            Err(e) => Err(Error::Api(e)),
+        }
     }
 
     /// Enable or disable Caddy, setting http_port=8080 and https_port=8443.