wip feat/add-new-node #268

Open
stremblay wants to merge 27 commits from feat/add-new-node into master
33 changed files with 2676 additions and 233 deletions

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "SELECT data as \"data!: Vec<u8>\" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1",
"query": "SELECT data as \"data!: Vec<u8>\" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1",
"describe": {
"columns": [
{
@@ -16,5 +16,5 @@
false
]
},
"hash": "c7ca191faaa23b3ec5019f8c4910f666db9c6c2be22ffe563be4b7caef645bd1"
"hash": "08e36d39f08ee7a06a9faa58990414fe16bec75c3458ee809755d2c975c487cc"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "SELECT id, version_id, data as \"data: Json<PhysicalHost>\" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1",
"query": "SELECT id, version_id, data as \"data: Json<PhysicalHost>\" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1",
"describe": {
"columns": [
{
@@ -28,5 +28,5 @@
false
]
},
"hash": "934035c7ca6e064815393e4e049a7934b0a7fac04a4fe4b2a354f0443d630990"
"hash": "36273d65f86cd9070b278a3c3da4ea16e21311f4ba579522282499ac5c62bd5e"
}

View File

@@ -0,0 +1,32 @@
{
"db_name": "SQLite",
"query": "SELECT host_id, installation_device, network_config FROM host_role_mapping WHERE role = ? ORDER BY id DESC LIMIT 1",
"describe": {
"columns": [
{
"name": "host_id",
"ordinal": 0,
"type_info": "Text"
},
{
"name": "installation_device",
"ordinal": 1,
"type_info": "Text"
},
{
"name": "network_config",
"ordinal": 2,
"type_info": "Text"
}
],
"parameters": {
"Right": 1
},
"nullable": [
false,
true,
true
]
},
"hash": "3caebb777887adea0fccc71108b235e49354086857390290753bc22d90d287a0"
}

View File

@@ -1,6 +1,6 @@
{
"db_name": "SQLite",
"query": "\n SELECT\n p1.id,\n p1.version_id,\n p1.data as \"data: Json<PhysicalHost>\"\n FROM\n physical_hosts p1\n INNER JOIN (\n SELECT\n id,\n MAX(version_id) AS max_version\n FROM\n physical_hosts\n GROUP BY\n id\n ) p2 ON p1.id = p2.id AND p1.version_id = p2.max_version\n ",
"query": "\n SELECT\n p1.id,\n p1.version_id,\n p1.data as \"data: Json<PhysicalHost>\"\n FROM\n physical_hosts p1\n INNER JOIN (\n SELECT\n id,\n MAX(rowid) AS max_rowid\n FROM\n physical_hosts\n GROUP BY\n id\n ) p2 ON p1.id = p2.id AND p1.rowid = p2.max_rowid\n ",
"describe": {
"columns": [
{
@@ -28,5 +28,5 @@
false
]
},
"hash": "8d247918eca10a88b784ee353db090c94a222115c543231f2140cba27bd0f067"
"hash": "8dfce6f15a95a44f5899e2953cfc56d8d1b34fe36615edcf3fed08dd81aefa7a"
}

Cargo.lock (generated, 25 changed lines)
View File

@@ -3612,6 +3612,7 @@ dependencies = [
"rand 0.9.2",
"reqwest 0.11.27",
"russh",
"russh-keys",
"rust-ipmi",
"schemars 0.8.22",
"semver",
@@ -5392,6 +5393,22 @@ dependencies = [
"web-time",
]
[[package]]
name = "okd_add_node"
version = "0.1.0"
dependencies = [
"cidr",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_secret",
"harmony_secret_derive",
"harmony_types",
"schemars 0.8.22",
"serde",
"tokio",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@@ -7738,20 +7755,12 @@ dependencies = [
name = "sttest"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_secret",
"harmony_secret_derive",
"harmony_types",
"log",
"schemars 0.8.22",
"serde",
"tokio",
"url",
]
[[package]]

View File

@@ -0,0 +1,19 @@
[package]
name = "okd_add_node"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_macros = { path = "../../harmony_macros" }
harmony_types = { path = "../../harmony_types" }
harmony_secret = { path = "../../harmony_secret" }
harmony_secret_derive = { path = "../../harmony_secret_derive" }
tokio.workspace = true
cidr.workspace = true
serde = { workspace = true }
schemars = "0.8"

View File

@@ -0,0 +1,9 @@
export HARMONY_SECRET_NAMESPACE=okd-add-node
export HARMONY_SECRET_STORE=file
export HARMONY_DATABASE_URL=sqlite://okd_add_node.sqlite
export RUST_LOG=harmony=debug
# Required: the kubeconfig for the running OKD cluster. OKDAddNodeScore reads
# the existing nodes of the target role and pulls the user-data-managed secret
# from `openshift-machine-api` through this.
# export KUBECONFIG=/path/to/cluster/kubeconfig

View File

@@ -0,0 +1,51 @@
//! Runnable example for [`OKDAddNodeScore`].
//!
//! Source `env.sh` first (sets `HARMONY_DATABASE_URL` / `KUBECONFIG`), then:
//!
//! ```bash
//! cargo run -p okd_add_node
//! ```
//!
//! The score will:
//! 1. read existing nodes of the target role from the live cluster,
//! 2. log the auto-derived hostname + IP,
//! 3. wait for the operator to PXE-boot the new machine,
//! 4. walk the operator through disk / bond / blacklist selection,
//! 5. publish the ignition + per-MAC iPXE + DHCP reservation on the firewall,
//! 6. ask the operator to power-cycle.
mod topology;
use harmony::{
modules::{
inventory::HarmonyDiscoveryStrategy,
okd::add_node::{AddNodeRole, OKDAddNodeScore},
},
score::Score,
topology::HAClusterTopology,
};
use harmony_macros::cidrv4;
use crate::topology::{get_inventory, get_topology};
#[tokio::main]
async fn main() {
harmony_cli::cli_logger::init();
let inventory = get_inventory();
let topology = get_topology().await;
let add_node = OKDAddNodeScore {
role: AddNodeRole::ControlPlane,
discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
cidr: cidrv4!("192.168.40.0/24"),
port: 25000,
},
};
let scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![Box::new(add_node)];
harmony_cli::run(inventory, topology, scores, None)
.await
.unwrap();
}

View File

@@ -0,0 +1,101 @@
//! Topology for the add-node example. Mirrors the shape of
//! `examples/sttest/src/topology.rs` but is deliberately standalone, so this
//! example can be copied into a fresh cluster without dragging the whole
//! sttest crate along.
use cidr::Ipv4Cidr;
use harmony::{
hardware::{Location, SwitchGroup},
infra::{brocade::UnmanagedSwitch, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq)]
struct OkdAddNodeFirewallConfig {
username: String,
password: String,
}
/// Build an HAClusterTopology pointing at an *already-running* cluster.
///
/// `kubeconfig` is `None` because `K8sclient::k8s_client` falls back to
/// `K8sClient::try_default()`, which picks up the `KUBECONFIG` env var or the
/// standard `~/.kube/config` path. Edit the IPs/domain to match your setup.
pub async fn get_topology() -> HAClusterTopology {
let firewall = LogicalHost {
ip: ip!("192.168.40.1"),
name: String::from("fw0"),
};
let switch_client = Arc::new(
UnmanagedSwitch::init()
.await
.expect("Failed to connect to switch"),
);
let config = SecretManager::get_or_prompt::<OkdAddNodeFirewallConfig>()
.await
.unwrap();
let api_creds = harmony::config::secret::OPNSenseApiCredentials {
key: config.username.clone(),
secret: config.password.clone(),
};
let ssh_creds = harmony::config::secret::OPNSenseFirewallCredentials {
username: config.username,
password: config.password,
};
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, &api_creds, &ssh_creds)
.await,
);
let lan_subnet = ipv4!("192.168.40.0");
let gateway_ipv4 = ipv4!("192.168.40.1");
let gateway_ip = IpAddr::V4(gateway_ipv4);
HAClusterTopology {
kubeconfig: None,
domain_name: "addnode.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,
Ipv4Cidr::new(lan_subnet, 24).unwrap(),
)),
load_balancer: opnsense.clone(),
firewall: opnsense.clone(),
tftp_server: opnsense.clone(),
http_server: opnsense.clone(),
dhcp_server: opnsense.clone(),
dns_server: opnsense.clone(),
// control_plane / workers / bootstrap_host are only consulted by the
// *bootstrap* flow. OKDAddNodeScore reads existing nodes straight
// from the live cluster via kube-rs, so these can stay empty here.
control_plane: vec![],
workers: vec![],
bootstrap_host: LogicalHost {
ip: ip!("192.168.40.10"),
name: "bootstrap".to_string(),
},
node_exporter: opnsense.clone(),
switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
}
}
pub fn get_inventory() -> Inventory {
Inventory {
location: Location::new("Add-node example".to_string(), "Anywhere".to_string()),
switch: SwitchGroup::from([]),
firewall_mgmt: Box::new(OPNSenseManagementInterface::new()),
storage_host: vec![],
worker_host: vec![],
control_plane_host: vec![],
}
}

View File

@@ -9,15 +9,7 @@ publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
harmony_secret = { path = "../../harmony_secret" }
harmony_secret_derive = { path = "../../harmony_secret_derive" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
brocade = { path = "../../brocade" }
schemars = "0.8"
cidr = { workspace = true }
tokio = { workspace = true }

View File

@@ -1,4 +1,19 @@
export HARMONY_SECRET_NAMESPACE=sttest0
export HARMONY_SECRET_STORE=file
export HARMONY_DATABASE_URL=sqlite://harmony_sttest0.sqlite
export RUST_LOG=info
export RUST_LOG=harmony=debug
# Two OPNsense credential pairs are required (both are Harmony Secrets and
# will be prompted for interactively on first run; the env vars below are
# here as a reminder for unattended runs):
# - OPNSenseFirewallCredentials (SSH username/password)
# - OPNSenseApiCredentials (API key/secret from System > Access >
# Users > API Keys)
#
# export OPNSENSE_PRIMARY_API_KEY="..."
# export OPNSENSE_PRIMARY_API_SECRET="..."
# After the install pipeline finishes, OKDAddNodeScore needs a reachable
# cluster. Point KUBECONFIG at the kubeconfig that `openshift-install`
# wrote during bootstrap, e.g.:
# export KUBECONFIG=./data/okd/installation_files_sttest0/auth/kubeconfig

View File

@@ -6,22 +6,35 @@ use harmony::{
data::{FileContent, FilePath},
modules::{
inventory::HarmonyDiscoveryStrategy,
okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
okd::{
add_node::{AddNodeRole, OKDAddNodeScore},
installation::OKDInstallationPipeline,
ipxe::OKDIpxeScore,
},
},
score::Score,
topology::HAClusterTopology,
};
use harmony_macros::cidrv4;
use harmony_secret::SecretManager;
#[tokio::main]
async fn main() {
// env_logger::init();
harmony_cli::cli_logger::init();
let inventory = get_inventory();
let topology = get_topology().await;
let ssh_key = SecretManager::get_or_prompt::<SshKeyPair>().await.unwrap();
// Discovery runs as a CIDR scan across the sttest LAN on the
// harmony_inventory_agent's default port. Shared between the install
// pipeline and the trailing add-node score so they probe the same way.
let discovery_strategy = HarmonyDiscoveryStrategy::SUBNET {
cidr: cidrv4!("192.168.40.0/24"),
port: 25000,
};
let mut scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![Box::new(OKDIpxeScore {
kickstart_filename: "inventory.kickstart".to_string(),
harmony_inventory_agent: "harmony_inventory_agent".to_string(),
@@ -31,9 +44,17 @@ async fn main() {
},
})];
// let mut scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![];
scores
.append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
scores.append(&mut OKDInstallationPipeline::get_all_scores(discovery_strategy.clone()).await);
// After the cluster is up, exercise the Day-2 add-node flow by adding a
// fourth control plane (cp3). This only publishes the ignition + byMAC +
// DHCP artifacts — etcd membership, API serving-cert rotation, and CSR
// approval for the new CP are still manual follow-ups per the score's
// success footer.
scores.push(Box::new(OKDAddNodeScore {
role: AddNodeRole::ControlPlane,
discovery_strategy,
}));
harmony_cli::run(inventory, topology, scores, None)
.await

View File

@@ -1,57 +1,61 @@
use cidr::Ipv4Cidr;
use harmony::{
config::secret::{OPNSenseApiCredentials, OPNSenseFirewallCredentials},
hardware::{Location, SwitchGroup},
infra::{brocade::UnmanagedSwitch, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use harmony_secret::SecretManager;
use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq)]
struct OPNSenseFirewallConfig {
username: String,
password: String,
}
pub async fn get_topology() -> HAClusterTopology {
let firewall = harmony::topology::LogicalHost {
/// Build the firewall instance on its own so other scores (e.g. an OPNsense
/// upgrade score) can get an `Arc<OPNSenseFirewall>` without having to rebuild
/// the whole topology. Mirrors the shape used in `affilium2/src/topology.rs`.
pub async fn get_opnsense() -> Arc<harmony::infra::opnsense::OPNSenseFirewall> {
let firewall = LogicalHost {
ip: ip!("192.168.40.1"),
name: String::from("fw0"),
};
let switch_client = UnmanagedSwitch::init()
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallConfig>()
let ssh_creds = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>()
.await
.unwrap();
let api_credentials = SecretManager::get_or_prompt::<OPNSenseApiCredentials>()
.await
.unwrap();
let api_creds = harmony::config::secret::OPNSenseApiCredentials {
key: config.username.clone(),
secret: config.password.clone(),
};
let ssh_creds = harmony::config::secret::OPNSenseFirewallCredentials {
username: config.username,
password: config.password,
};
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, &api_creds, &ssh_creds)
.await,
// API port is 8443 because the OPNsense web GUI is moved off 443 so
// HAProxy can terminate on the standard port. SSH port stays default.
Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::with_api_port(
firewall,
None,
8443,
&api_credentials,
&ssh_creds,
)
.await,
)
}
pub async fn get_topology() -> HAClusterTopology {
let switch_client = Arc::new(
UnmanagedSwitch::init()
.await
.expect("Failed to connect to switch"),
);
let opnsense = get_opnsense().await;
let lan_subnet = ipv4!("192.168.40.0");
let gateway_ipv4 = ipv4!("192.168.40.1");
let gateway_ip = IpAddr::V4(gateway_ipv4);
harmony::topology::HAClusterTopology {
HAClusterTopology {
kubeconfig: None,
domain_name: "sttest0.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(

harmony-k8s/src/csr.rs (new file, 90 lines)
View File

@@ -0,0 +1,90 @@
//! CSR approval helpers used by Day-2 node-add flows.
//!
//! Kubelets generate two CertificateSigningRequests when a node first joins
//! a cluster:
//!
//! - `kubernetes.io/kube-apiserver-client-kubelet` — the bootstrap client
//! cert, used by the kubelet to authenticate to the API server.
//! - `kubernetes.io/kubelet-serving` — the kubelet's own TLS serving cert.
//!
//! On OKD, `cluster-machine-approver` auto-approves the client CSR for
//! workers that belong to a MachineSet; nothing auto-approves the serving
//! CSR and nothing auto-approves anything for control-plane nodes added
//! Day-2. These helpers are the minimum needed to let a Harmony score
//! poll for and approve pending CSRs on behalf of the operator.
use k8s_openapi::api::certificates::v1::{
CertificateSigningRequest, CertificateSigningRequestCondition, CertificateSigningRequestStatus,
};
use kube::{
Error,
api::{Api, ListParams, Patch, PatchParams},
};
use log::debug;
use crate::client::K8sClient;
/// The `kubernetes.io/kubelet-serving` signer.
pub const KUBELET_SERVING_SIGNER: &str = "kubernetes.io/kubelet-serving";
/// The `kubernetes.io/kube-apiserver-client-kubelet` signer.
pub const KUBE_APISERVER_CLIENT_KUBELET_SIGNER: &str =
"kubernetes.io/kube-apiserver-client-kubelet";
impl K8sClient {
/// List CSRs that are still pending (no `Approved` or `Denied` condition
/// yet) **and** whose signer is one of the two kubelet signers. CSRs
/// issued to other signers (controller-manager, front-proxy, custom
/// signers) are never returned.
pub async fn list_pending_kubelet_csrs(&self) -> Result<Vec<CertificateSigningRequest>, Error> {
let api: Api<CertificateSigningRequest> = Api::all(self.client.clone());
let list = api.list(&ListParams::default()).await?;
let out = list
.items
.into_iter()
.filter(|csr| is_kubelet_signer(&csr.spec.signer_name))
.filter(is_pending)
.collect();
Ok(out)
}
/// Approve the named CSR by PATCHing its `/approval` subresource to add
/// an `Approved=True` condition with the given `reason` and `message`.
///
/// No-op on an already-approved CSR: the API server accepts repeated
/// approvals as long as they carry the same `type` and `status`.
pub async fn approve_csr(&self, name: &str, reason: &str, message: &str) -> Result<(), Error> {
let api: Api<CertificateSigningRequest> = Api::all(self.client.clone());
let status = CertificateSigningRequestStatus {
certificate: None,
conditions: Some(vec![CertificateSigningRequestCondition {
type_: "Approved".to_string(),
status: "True".to_string(),
reason: Some(reason.to_string()),
message: Some(message.to_string()),
last_update_time: None,
last_transition_time: None,
}]),
};
let patch = Patch::Merge(serde_json::json!({ "status": status }));
debug!("Patching CSR {name} /approval: reason={reason}");
api.patch_approval(name, &PatchParams::default(), &patch)
.await?;
Ok(())
}
}
fn is_kubelet_signer(signer: &str) -> bool {
signer == KUBELET_SERVING_SIGNER || signer == KUBE_APISERVER_CLIENT_KUBELET_SIGNER
}
fn is_pending(csr: &CertificateSigningRequest) -> bool {
let Some(status) = csr.status.as_ref() else {
return true;
};
let Some(conds) = status.conditions.as_ref() else {
return true;
};
!conds
.iter()
.any(|c| c.type_ == "Approved" || c.type_ == "Denied")
}
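
A minimal consumer sketch (not part of this diff): poll for pending kubelet CSRs and approve them until none remain. It only uses the two helpers defined above; the function name, the reason/message strings, and the tokio runtime are assumptions.

use std::time::Duration;

// Hypothetical Day-2 helper: approve kubelet CSRs until none remain pending.
async fn approve_pending_kubelet_csrs(client: &K8sClient) -> Result<(), Error> {
    loop {
        let pending = client.list_pending_kubelet_csrs().await?;
        if pending.is_empty() {
            return Ok(());
        }
        for csr in &pending {
            let name = csr.metadata.name.as_deref().unwrap_or_default();
            // Repeated approvals of the same CSR are accepted by the API
            // server (see approve_csr's doc), so racing another approver
            // is harmless here.
            client
                .approve_csr(name, "HarmonyAddNode", "approved by Day-2 add-node flow")
                .await?;
        }
        // Kubelets create their CSRs asynchronously; give the next batch
        // a moment to appear before re-listing.
        tokio::time::sleep(Duration::from_secs(5)).await;
    }
}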

View File

@@ -2,6 +2,7 @@ pub mod apply;
pub mod bundle;
pub mod client;
pub mod config;
pub mod csr;
pub mod discovery;
pub mod domain;
pub mod helper;

View File

@@ -16,7 +16,8 @@ reqwest = { version = "0.11", features = [
"json",
"rustls-tls",
], default-features = false }
russh = "0.45.0"
russh = { workspace = true }
russh-keys = { workspace = true }
rust-ipmi = "0.1.1"
semver = "1.0.23"
serde.workspace = true

View File

@@ -53,4 +53,14 @@ pub trait InventoryRepository: Send + Sync + 'static {
/// Return the current role mapping for a host, if any. Used at discovery
/// time to ask the operator whether to overwrite or cancel.
async fn get_role_mapping(&self, host_id: &Id) -> Result<Option<HostRoleMapping>, RepoError>;
/// Return the most recently saved role mapping for `role`, along with
/// its `PhysicalHost`. Used by Day-2 flows (e.g. OKDAddNodeScore) that
/// kick off discovery and then need to identify the host the operator
/// just picked — picking "the newest row" is robust whether the row is
/// brand-new or replaces an earlier mapping for the same host.
async fn get_latest_host_for_role(
&self,
role: &HostRole,
) -> Result<Option<(PhysicalHost, HostConfig)>, RepoError>;
}
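
As an illustration (not in this diff), a Day-2 caller might resolve the operator's latest pick like this; `HostRole::ControlPlane` is assumed to be an existing variant of the role enum:

// Hypothetical consumer of the new trait method.
async fn newest_control_plane(
    repo: &dyn InventoryRepository,
) -> Result<Option<(PhysicalHost, HostConfig)>, RepoError> {
    // "Newest row" semantics: works whether the mapping row is brand-new
    // or replaced an earlier mapping for the same host.
    repo.get_latest_host_for_role(&HostRole::ControlPlane).await
}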

View File

@@ -44,7 +44,7 @@ pub trait Score<T: Topology>:
topology: topology.name().into(),
interpret: interpret_name.clone(),
score: score_name.clone(),
message: format!("{} running...", interpret_name),
message: format!("[{}] {} starting...", score_name, interpret_name),
})
.unwrap();

View File

@@ -58,8 +58,14 @@ impl InventoryRepository for SqliteInventoryRepository {
// discovery is naturally a polling activity (mDNS is continuous, CIDR scans get
// re-run) and we don't want an unbounded pile of identical version rows. Real
// changes still produce a new version row (audit trail for free).
//
// We order by SQLite's `rowid` (a monotonic integer assigned on each
// INSERT) rather than `version_id`, because `version_id` is `Id::default()`,
// which formats as `{hex_timestamp}_{rand}` with a variable-width hex
// timestamp, so lexicographic ordering is NOT time-monotonic: once the
// timestamp grows a hex digit, `ff_…` sorts after `100_…` even though
// 0x100 is the later instant ('f' > '1' decides the comparison).
let latest = sqlx::query!(
r#"SELECT data as "data!: Vec<u8>" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1"#,
r#"SELECT data as "data!: Vec<u8>" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1"#,
host_id
)
.fetch_optional(&self.pool)
@@ -86,9 +92,12 @@ impl InventoryRepository for SqliteInventoryRepository {
}
async fn get_latest_by_id(&self, host_id: &str) -> Result<Option<PhysicalHost>, RepoError> {
// `rowid` (implicit INTEGER monotonically increasing on every INSERT)
// is the source of truth for "most recently saved" here. `version_id`
// can't carry that role reliably — see the note in `save()`.
let row = sqlx::query_as!(
DbHost,
r#"SELECT id, version_id, data as "data: Json<PhysicalHost>" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1"#,
r#"SELECT id, version_id, data as "data: Json<PhysicalHost>" FROM physical_hosts WHERE id = ? ORDER BY rowid DESC LIMIT 1"#,
host_id
)
.fetch_optional(&self.pool)
@@ -98,6 +107,11 @@ impl InventoryRepository for SqliteInventoryRepository {
}
async fn get_all_hosts(&self) -> Result<Vec<PhysicalHost>, RepoError> {
// Pick the latest row per host_id by `MAX(rowid)` rather than
// `MAX(version_id)`, for the same reason as in `save()`: version_id's
// format isn't lexicographically time-monotonic, so a stale row can
// outrank a fresh one and the user ends up with an old IP in the
// host-selection prompt.
let db_hosts = sqlx::query_as!(
DbHost,
r#"
@@ -110,12 +124,12 @@ impl InventoryRepository for SqliteInventoryRepository {
INNER JOIN (
SELECT
id,
MAX(version_id) AS max_version
MAX(rowid) AS max_rowid
FROM
physical_hosts
GROUP BY
id
) p2 ON p1.id = p2.id AND p1.version_id = p2.max_version
) p2 ON p1.id = p2.id AND p1.rowid = p2.max_rowid
"#
)
.fetch_all(&self.pool)
@@ -210,6 +224,51 @@ impl InventoryRepository for SqliteInventoryRepository {
}))
}
async fn get_latest_host_for_role(
&self,
role: &HostRole,
) -> Result<Option<(PhysicalHost, HostConfig)>, RepoError> {
struct Row {
host_id: String,
installation_device: Option<String>,
network_config: Option<String>,
}
let role_str = format!("{:?}", role);
let row = sqlx::query_as!(
Row,
"SELECT host_id, installation_device, network_config FROM host_role_mapping WHERE role = ? ORDER BY id DESC LIMIT 1",
role_str
)
.fetch_optional(&self.pool)
.await?;
let Some(row) = row else { return Ok(None) };
let physical_host = match self.get_latest_by_id(&row.host_id).await? {
Some(host) => host,
None => {
return Err(RepoError::QueryFailed(format!(
"Found a role mapping for host_id '{}', but the host does not exist in the physical_hosts table.",
row.host_id
)));
}
};
let network_config = match row.network_config.as_deref() {
Some(json) => {
serde_json::from_str(json).map_err(|e| RepoError::Deserialization(e.to_string()))?
}
None => NetworkConfig::default(),
};
let host_config = HostConfig {
installation_device: row.installation_device,
network_config,
};
Ok(Some((physical_host, host_config)))
}
async fn get_hosts_for_role(
&self,
role: &HostRole,

View File

@@ -5,5 +5,7 @@ pub mod intel_amt;
pub mod inventory;
pub mod kube;
pub mod network_manager;
pub(crate) mod networkmanager_cfg;
pub mod opnsense;
mod sqlx;
pub mod ssh;

View File

@@ -3,10 +3,9 @@ use std::{
sync::Arc,
};
use askama::Template;
use async_trait::async_trait;
use harmony_k8s::{DrainOptions, K8sClient, NodeFile};
use harmony_types::id::Id;
use harmony_types::{firewall::LaggProtocol, id::Id};
use k8s_openapi::api::core::v1::Node;
use kube::{
ResourceExt,
@@ -15,65 +14,11 @@ use kube::{
use log::{debug, info, warn};
use crate::{
infra::networkmanager_cfg::{render_bond_master, render_bond_slave},
modules::okd::crd::nmstate,
topology::{HostNetworkConfig, NetworkError, NetworkManager},
};
/// NetworkManager bond configuration template
#[derive(Template)]
#[template(
source = r#"[connection]
id={{ bond_name }}
uuid={{ bond_uuid }}
type=bond
autoconnect-slaves=1
interface-name={{ bond_name }}
[bond]
lacp_rate=fast
mode=802.3ad
xmit_hash_policy=layer2
[ipv4]
method=auto
[ipv6]
addr-gen-mode=default
method=auto
[proxy]
"#,
ext = "txt"
)]
struct BondConfigTemplate {
bond_name: String,
bond_uuid: String,
}
/// NetworkManager bond slave configuration template
#[derive(Template)]
#[template(
source = r#"[connection]
id={{ slave_id }}
uuid={{ slave_uuid }}
type=ethernet
interface-name={{ interface_name }}
master={{ bond_name }}
slave-type=bond
[ethernet]
[bond-port]
"#,
ext = "txt"
)]
struct BondSlaveConfigTemplate {
slave_id: String,
slave_uuid: String,
interface_name: String,
bond_name: String,
}
/// TODO document properly the non-intuitive "roll forward only" behavior of nmstate in general
/// It is documented in the official nmstate docs, but worth mentioning here:
///
@@ -473,19 +418,16 @@ impl OpenShiftNmStateNetworkManager {
let mut files = Vec::new();
let bond_name = "bond0";
let bond_uuid = uuid::Uuid::new_v4().to_string();
// Generate bond master configuration
let bond_template = BondConfigTemplate {
bond_name: bond_name.to_string(),
bond_uuid: bond_uuid.clone(),
};
let bond_content = bond_template.render().map_err(|e| {
NetworkError::new(format!(
"Failed to render bond configuration template: {}",
e
))
})?;
// Post-install bonding here has always been LACP; the primary-iface
// argument is only consumed by active-backup mode, so an empty string
// is fine for this call site.
let primary_iface = config
.switch_ports
.first()
.map(|p| p.interface.name.as_str())
.unwrap_or("");
let bond_content =
render_bond_master(bond_name, &bond_uuid, &LaggProtocol::Lacp, primary_iface);
files.push(NodeFile {
path: format!(
@@ -496,25 +438,12 @@ impl OpenShiftNmStateNetworkManager {
mode: 0o600,
});
// Generate slave configurations for each interface
for switch_port in &config.switch_ports {
let interface_name = &switch_port.interface.name;
let slave_id = format!("{}-{}", bond_name, interface_name);
let slave_uuid = uuid::Uuid::new_v4().to_string();
let slave_template = BondSlaveConfigTemplate {
slave_id: slave_id.clone(),
slave_uuid,
interface_name: interface_name.clone(),
bond_name: bond_name.to_string(),
};
let slave_content = slave_template.render().map_err(|e| {
NetworkError::new(format!(
"Failed to render slave configuration template for interface '{}': {}",
interface_name, e
))
})?;
let slave_content =
render_bond_slave(&slave_id, &slave_uuid, interface_name, bond_name);
files.push(NodeFile {
path: format!(

View File

@@ -0,0 +1,166 @@
//! NetworkManager `.nmconnection` content builders.
//!
//! Two call sites use these today: the post-install bond flow in
//! `OpenShiftNmStateNetworkManager` (always LACP, hardcoded bond name
//! `bond0`), and the Day-2 add-node flow which injects the files into an
//! ignition config derived from the operator's discovery-time choices. Keeping
//! them here means both paths produce bit-identical output for the same input.
use harmony_types::firewall::LaggProtocol;
/// NetworkManager bond `mode=` value for a `LaggProtocol`.
pub(crate) fn nm_bond_mode(proto: &LaggProtocol) -> &'static str {
match proto {
LaggProtocol::Lacp => "802.3ad",
// active-backup is the safe default for both "failover" intent and
// "no protocol selected" — nmcli refuses `mode=none` on a bond.
LaggProtocol::Failover | LaggProtocol::None => "active-backup",
LaggProtocol::LoadBalance => "balance-xor",
LaggProtocol::RoundRobin => "balance-rr",
}
}
/// Render the master `.nmconnection` file for a bond.
///
/// `primary_iface` only matters in active-backup mode (`LaggProtocol::Failover`
/// or `None`). Other modes ignore it; pass any bond member.
pub(crate) fn render_bond_master(
bond_name: &str,
bond_uuid: &str,
proto: &LaggProtocol,
primary_iface: &str,
) -> String {
let bond_section = match proto {
// Preserves the exact LACP block shipped before this module existed;
// changing the attribute order would produce a textually different
// ignition even though the semantics are identical.
LaggProtocol::Lacp => "lacp_rate=fast\nmode=802.3ad\nxmit_hash_policy=layer2\n".to_string(),
LaggProtocol::Failover | LaggProtocol::None => {
format!("mode=active-backup\nmiimon=100\nprimary={primary_iface}\n")
}
LaggProtocol::LoadBalance => {
"mode=balance-xor\nmiimon=100\nxmit_hash_policy=layer2\n".to_string()
}
LaggProtocol::RoundRobin => "mode=balance-rr\nmiimon=100\n".to_string(),
};
format!(
"[connection]\n\
id={bond_name}\n\
uuid={bond_uuid}\n\
type=bond\n\
autoconnect-slaves=1\n\
interface-name={bond_name}\n\
\n\
[bond]\n\
{bond_section}\n\
[ipv4]\n\
method=auto\n\
\n\
[ipv6]\n\
addr-gen-mode=default\n\
method=auto\n\
\n\
[proxy]\n"
)
}
/// Render a `.nmconnection` file for a bond slave (one per bonded interface).
pub(crate) fn render_bond_slave(
slave_id: &str,
slave_uuid: &str,
interface_name: &str,
bond_name: &str,
) -> String {
format!(
"[connection]\n\
id={slave_id}\n\
uuid={slave_uuid}\n\
type=ethernet\n\
interface-name={interface_name}\n\
master={bond_name}\n\
slave-type=bond\n\
\n\
[ethernet]\n\
\n\
[bond-port]\n"
)
}
/// Render a `.nmconnection` file that blacklists an interface — it exists on
/// the system but never auto-connects and carries no v4/v6 address.
pub(crate) fn render_disabled_interface(interface_name: &str) -> String {
format!(
"[connection]\n\
id={interface_name}-disabled\n\
type=ethernet\n\
interface-name={interface_name}\n\
autoconnect=false\n\
\n\
[ipv4]\n\
method=disabled\n\
\n\
[ipv6]\n\
method=disabled\n"
)
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn nm_bond_mode_maps_every_variant() {
assert_eq!(nm_bond_mode(&LaggProtocol::Lacp), "802.3ad");
assert_eq!(nm_bond_mode(&LaggProtocol::Failover), "active-backup");
assert_eq!(nm_bond_mode(&LaggProtocol::None), "active-backup");
assert_eq!(nm_bond_mode(&LaggProtocol::LoadBalance), "balance-xor");
assert_eq!(nm_bond_mode(&LaggProtocol::RoundRobin), "balance-rr");
}
#[test]
fn bond_master_lacp_matches_legacy_output() {
let out = render_bond_master("bond0", "uuid-1", &LaggProtocol::Lacp, "enp1s0f0");
let expected = "\
[connection]
id=bond0
uuid=uuid-1
type=bond
autoconnect-slaves=1
interface-name=bond0
[bond]
lacp_rate=fast
mode=802.3ad
xmit_hash_policy=layer2
[ipv4]
method=auto
[ipv6]
addr-gen-mode=default
method=auto
[proxy]
";
assert_eq!(out, expected);
}
#[test]
fn bond_master_active_backup_uses_primary() {
let out = render_bond_master("bond0", "u", &LaggProtocol::Failover, "enp1s0f0");
assert!(out.contains("mode=active-backup"));
assert!(out.contains("primary=enp1s0f0"));
assert!(out.contains("miimon=100"));
}
#[test]
fn disabled_interface_has_both_stacks_disabled() {
let out = render_disabled_interface("eno1");
assert!(out.contains("interface-name=eno1"));
assert!(out.contains("id=eno1-disabled"));
assert!(out.contains("autoconnect=false"));
assert!(out.contains("[ipv4]\nmethod=disabled"));
assert!(out.contains("[ipv6]\nmethod=disabled"));
}
}

View File

@@ -122,16 +122,30 @@ impl HttpServer for OPNSenseFirewall {
}
async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
if !self.opnsense_config.caddy().is_installed().await {
info!("Http config not available, installing os-caddy package");
self.opnsense_config
.install_package("os-caddy")
.await
.map_err(|e| {
ExecutorError::UnexpectedError(format!("Failed to install os-caddy: {e:?}"))
})?;
} else {
info!("Http config available, assuming Caddy is already installed");
// is_installed() now returns Result<bool, _>: Ok(true) on 2xx,
// Ok(false) specifically on 404, and Err(_) when we couldn't tell
// (timeout, TLS, auth, 5xx). The Err branch must **not** trigger
// an install — that's how Day-2 runs ended up trying to reinstall
// caddy after a transient API hiccup.
match self.opnsense_config.caddy().is_installed().await {
Ok(true) => {
info!("Http config available, assuming Caddy is already installed");
}
Ok(false) => {
info!("Http config not available (404), installing os-caddy package");
self.opnsense_config
.install_package("os-caddy")
.await
.map_err(|e| {
ExecutorError::UnexpectedError(format!("Failed to install os-caddy: {e:?}"))
})?;
}
Err(e) => {
return Err(ExecutorError::UnexpectedError(format!(
"Could not verify whether os-caddy is installed on OPNsense \
(is the API reachable at the configured port?): {e}"
)));
}
}
info!("Adding custom caddy config files");

harmony/src/infra/ssh.rs (new file, 159 lines)
View File

@@ -0,0 +1,159 @@
//! Run a single command over SSH.
//!
//! Minimal russh wrapper used by Day-2 flows that need to poke a remote
//! machine — most notably [`OKDAddNodeScore`] rebooting a freshly-discovered
//! host so it picks up its new iPXE config.
//!
//! Public surface: the [`run_command`] free function and the [`SshError`]
//! it returns. No trait, no state carried between calls — each invocation
//! opens a fresh connection and closes it. Credentials come in via
//! `harmony_types::ssh::SshCredentials`.
//!
//! [`OKDAddNodeScore`]: crate::modules::okd::add_node::OKDAddNodeScore
use std::{net::IpAddr, sync::Arc, time::Duration};
use async_trait::async_trait;
use harmony_types::ssh::SshCredentials;
use log::{debug, trace};
use russh::{
Channel, ChannelMsg,
client::{self, Config, Handler, Msg},
keys::key,
};
use russh_keys::decode_secret_key;
use thiserror::Error;
/// How long to wait for the initial TCP+SSH handshake before giving up.
/// The kernel's default SYN retry cascade can burn >2 minutes on a dead
/// host; for Day-2 flows across a LAN a much tighter budget is fine and
/// lets the caller react (e.g. ask the operator to power-cycle) faster.
const SSH_CONNECT_TIMEOUT: Duration = Duration::from_secs(15);
/// Errors produced by [`run_command`].
#[derive(Error, Debug)]
pub enum SshError {
#[error("ssh transport error: {0}")]
Transport(#[from] russh::Error),
#[error("ssh connect to {host}:{port} timed out after {}s", timeout.as_secs())]
ConnectTimeout {
host: IpAddr,
port: u16,
timeout: Duration,
},
#[error("failed to parse private key: {0}")]
KeyParse(String),
#[error("authentication rejected for user '{0}'")]
AuthenticationRejected(String),
#[error("command exited with status {status}, output:\n{output}")]
NonZeroExit { status: u32, output: String },
#[error("unexpected SSH channel message: {0}")]
UnexpectedMsg(String),
#[error("command output was not valid UTF-8: {0}")]
InvalidUtf8(#[from] std::string::FromUtf8Error),
}
/// Open an SSH session to `host:port`, authenticate with `creds`, exec
/// `command`, drain its stdout+stderr, and return the combined output.
///
/// The caller should tolerate errors for commands that sever the SSH
/// session before sending an exit status — e.g. `systemctl reboot`. Wrap
/// those in a `let _ = run_command(...).await;` (or log+ignore).
pub async fn run_command(
host: IpAddr,
port: u16,
creds: &SshCredentials,
command: &str,
) -> Result<String, SshError> {
let config = Arc::new(Config::default());
debug!(
"SSH connect {host}:{port} (timeout {:?})",
SSH_CONNECT_TIMEOUT
);
let mut session = match tokio::time::timeout(
SSH_CONNECT_TIMEOUT,
client::connect(config, (host, port), ClientHandler),
)
.await
{
Ok(res) => res?,
Err(_) => {
return Err(SshError::ConnectTimeout {
host,
port,
timeout: SSH_CONNECT_TIMEOUT,
});
}
};
match creds {
SshCredentials::SshKey {
username,
private_pem,
passphrase,
} => {
let key = decode_secret_key(private_pem, passphrase.as_deref())
.map_err(|e| SshError::KeyParse(e.to_string()))?;
let authed = session
.authenticate_publickey(username, Arc::new(key))
.await?;
if !authed {
return Err(SshError::AuthenticationRejected(username.clone()));
}
}
SshCredentials::Password { username, password } => {
let authed = session.authenticate_password(username, password).await?;
if !authed {
return Err(SshError::AuthenticationRejected(username.clone()));
}
}
}
let mut channel = session.channel_open_session().await?;
debug!("SSH exec: {command}");
channel.exec(true, command).await?;
drain_channel(&mut channel).await
}
struct ClientHandler;
#[async_trait]
impl Handler for ClientHandler {
type Error = russh::Error;
async fn check_server_key(
&mut self,
_server_public_key: &key::PublicKey,
) -> Result<bool, Self::Error> {
// We don't pin host keys today. Day-2 flows run against hosts that
// just PXE'd, so there's no stable fingerprint to check against
// anyway — the integrity guarantee comes from the authorized_keys
// file Harmony itself seeded into the image.
Ok(true)
}
}
async fn drain_channel(channel: &mut Channel<Msg>) -> Result<String, SshError> {
let mut buf = Vec::new();
while let Some(msg) = channel.wait().await {
match msg {
ChannelMsg::Data { ref data } | ChannelMsg::ExtendedData { ref data, .. } => {
buf.extend_from_slice(data);
}
ChannelMsg::ExitStatus { exit_status } => {
if exit_status != 0 {
let output = String::from_utf8_lossy(&buf).into_owned();
return Err(SshError::NonZeroExit {
status: exit_status,
output,
});
}
}
ChannelMsg::Success | ChannelMsg::WindowAdjusted { .. } | ChannelMsg::Eof => {}
other => return Err(SshError::UnexpectedMsg(format!("{other:?}"))),
}
}
let output = String::from_utf8(buf)?;
trace!("SSH command output: {output}");
Ok(output)
}
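
A hedged usage sketch (hypothetical IP, username, and password): rebooting a freshly discovered host the way the `run_command` doc comment suggests, ignoring the error the dropped session produces.

use std::net::{IpAddr, Ipv4Addr};
use harmony_types::ssh::SshCredentials;

// Hypothetical call site for run_command above.
async fn reboot_discovered_host() {
    let creds = SshCredentials::Password {
        username: "core".to_string(),   // hypothetical user
        password: "secret".to_string(), // hypothetical password
    };
    let host = IpAddr::V4(Ipv4Addr::new(192, 168, 40, 50)); // hypothetical IP
    // `systemctl reboot` severs the session before an exit status arrives;
    // treat any error as expected and move on.
    if let Err(e) = run_command(host, 22, &creds, "systemctl reboot").await {
        log::debug!("reboot command ended with: {e}");
    }
}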

View File

@@ -309,7 +309,7 @@ impl DiscoverInventoryAgentInterpret {
}
}
Ok(Err(e)) => {
log::info!("Error querying inventory agent on {addr}:{port} : {e}");
log::debug!("Error querying inventory agent on {addr}:{port} : {e}");
}
Err(_) => {
// Timeout for this host

File diff suppressed because it is too large.

View File

@@ -1,4 +1,7 @@
use std::net::{IpAddr, Ipv4Addr, SocketAddr};
use std::{
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
};
use serde::Serialize;
@@ -8,7 +11,7 @@ use crate::{
score::Score,
topology::{
BackendServer, HAClusterTopology, HealthCheck, HttpMethod, HttpStatusCode, LoadBalancer,
LoadBalancerService, SSL, Topology,
LoadBalancerService, LogicalHost, SSL, Topology,
},
};
@@ -53,18 +56,53 @@ pub struct OKDLoadBalancerScore {
/// ```
impl OKDLoadBalancerScore {
pub fn new(topology: &HAClusterTopology) -> Self {
let (public_services, private_services) =
Self::services_for_nodes(&topology.control_plane, &topology.workers);
Self {
load_balancer_score: LoadBalancerScore {
public_services,
private_services,
},
}
}
/// Build the OKD load-balancer service set for a given cluster shape.
///
/// Returns `(public_services, private_services)` — exactly what the
/// Score's `new()` feeds into `LoadBalancerScore`. Exposing this as a
/// free helper lets Day-2 flows (e.g. `OKDAddNodeScore`) refresh
/// HAProxy against the *current* cluster instead of the bootstrap-time
/// topology snapshot.
pub fn services_for_nodes(
control_plane: &[LogicalHost],
workers: &[LogicalHost],
) -> (Vec<LoadBalancerService>, Vec<LoadBalancerService>) {
// Bind on 0.0.0.0 instead of the LAN IP to avoid CARP VIP race
// conditions where HAProxy fails to bind when the interface
// transitions back to master.
let bind_addr = IpAddr::V4(Ipv4Addr::UNSPECIFIED);
// Order-preserving dedupe by IP. OKD marks CPs as schedulable
// workers (both master AND worker labels), so when Day-2 flows
// build `control_plane` and `workers` by querying each label
// separately the CPs appear in both slices. Feeding duplicates
// into OPNsense makes HAProxy emit two `server` lines with the
// same name, which `haproxy -c` rejects.
let mut all_nodes: Vec<LogicalHost> = Vec::new();
let mut seen: HashSet<String> = HashSet::new();
for host in control_plane.iter().chain(workers.iter()) {
if seen.insert(host.ip.to_string()) {
all_nodes.push(host.clone());
}
}
let public_services = vec![
LoadBalancerService {
backend_servers: Self::nodes_to_backend_server(topology, 80),
backend_servers: backends_for(&all_nodes, 80),
listening_port: SocketAddr::new(bind_addr, 80),
health_check: None,
},
LoadBalancerService {
backend_servers: Self::nodes_to_backend_server(topology, 443),
backend_servers: backends_for(&all_nodes, 443),
listening_port: SocketAddr::new(bind_addr, 443),
health_check: None,
},
@@ -72,7 +110,7 @@ impl OKDLoadBalancerScore {
let private_services = vec![
LoadBalancerService {
backend_servers: Self::nodes_to_backend_server(topology, 80),
backend_servers: backends_for(&all_nodes, 80),
listening_port: SocketAddr::new(bind_addr, 80),
health_check: Some(HealthCheck::HTTP(
Some(25001),
@@ -83,7 +121,7 @@ impl OKDLoadBalancerScore {
)),
},
LoadBalancerService {
backend_servers: Self::nodes_to_backend_server(topology, 443),
backend_servers: backends_for(&all_nodes, 443),
listening_port: SocketAddr::new(bind_addr, 443),
health_check: Some(HealthCheck::HTTP(
Some(25001),
@@ -94,12 +132,12 @@ impl OKDLoadBalancerScore {
)),
},
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 22623),
backend_servers: backends_for(control_plane, 22623),
listening_port: SocketAddr::new(bind_addr, 22623),
health_check: Some(HealthCheck::TCP(None)),
},
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 6443),
backend_servers: backends_for(control_plane, 6443),
listening_port: SocketAddr::new(bind_addr, 6443),
health_check: Some(HealthCheck::HTTP(
None,
@@ -110,57 +148,20 @@ impl OKDLoadBalancerScore {
)),
},
];
Self {
load_balancer_score: LoadBalancerScore {
public_services,
private_services,
},
}
}
/// Creates backend servers list for control plane nodes only
///
/// Use this for control plane-specific services like:
/// - Port 22623: Ignition API (machine configuration during bootstrap)
/// - Port 6443: Kubernetes API server
fn control_plane_to_backend_server(
topology: &HAClusterTopology,
port: u16,
) -> Vec<BackendServer> {
topology
.control_plane
.iter()
.map(|cp| BackendServer {
address: cp.ip.to_string(),
port,
})
.collect()
(public_services, private_services)
}
}
/// Creates backend servers list for all nodes (control plane + workers)
///
/// Use this for ingress traffic that should be distributed across all nodes:
/// - Port 80: HTTP ingress traffic
/// - Port 443: HTTPS ingress traffic
///
/// In OKD, ingress router pods can run on any node, so both control plane
/// and worker nodes should be included in the load balancer backend pool.
fn nodes_to_backend_server(topology: &HAClusterTopology, port: u16) -> Vec<BackendServer> {
let mut nodes = Vec::new();
for cp in &topology.control_plane {
nodes.push(BackendServer {
address: cp.ip.to_string(),
port,
});
}
for worker in &topology.workers {
nodes.push(BackendServer {
address: worker.ip.to_string(),
port,
});
}
nodes
}
/// Map a slice of logical hosts to `BackendServer`s for a single port.
fn backends_for(hosts: &[LogicalHost], port: u16) -> Vec<BackendServer> {
hosts
.iter()
.map(|h| BackendServer {
address: h.ip.to_string(),
port,
})
.collect()
}
#[cfg(test)]
@@ -245,14 +246,23 @@ mod tests {
}
#[test]
fn test_nodes_to_backend_server_includes_control_plane_and_workers() {
fn test_public_port_80_backends_include_all_nodes() {
let topology = create_test_topology();
let (public_services, _) =
OKDLoadBalancerScore::services_for_nodes(&topology.control_plane, &topology.workers);
let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 80);
let svc_80 = public_services
.iter()
.find(|s| s.listening_port.port() == 80)
.expect("Public service on port 80 not found");
assert_eq!(backend_servers.len(), 5);
assert_eq!(svc_80.backend_servers.len(), 5);
let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
let addresses: Vec<&str> = svc_80
.backend_servers
.iter()
.map(|s| s.address.as_str())
.collect();
assert!(addresses.contains(&"192.168.1.10"));
assert!(addresses.contains(&"192.168.1.11"));
assert!(addresses.contains(&"192.168.1.12"));
@@ -261,14 +271,23 @@ mod tests {
}
#[test]
fn test_control_plane_to_backend_server_only_includes_control_plane() {
fn test_private_port_22623_backends_exclude_workers() {
let topology = create_test_topology();
let (_, private_services) =
OKDLoadBalancerScore::services_for_nodes(&topology.control_plane, &topology.workers);
let backend_servers = OKDLoadBalancerScore::control_plane_to_backend_server(&topology, 80);
let svc = private_services
.iter()
.find(|s| s.listening_port.port() == 22623)
.expect("Private service on port 22623 not found");
assert_eq!(backend_servers.len(), 3);
assert_eq!(svc.backend_servers.len(), 3);
let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
let addresses: Vec<&str> = svc
.backend_servers
.iter()
.map(|s| s.address.as_str())
.collect();
assert!(addresses.contains(&"192.168.1.10"));
assert!(addresses.contains(&"192.168.1.11"));
assert!(addresses.contains(&"192.168.1.12"));
@@ -276,6 +295,53 @@ mod tests {
assert!(!addresses.contains(&"192.168.1.21"));
}
#[test]
fn test_public_port_80_dedupes_shared_master_worker_nodes() {
// OKD marks control-plane nodes as schedulable workers — they appear
// in BOTH the master-label and worker-label query results. Feeding
// the two slices into services_for_nodes must not duplicate them
// in the "all nodes" ingress pools, or HAProxy rejects the config.
let cps = vec![
LogicalHost {
ip: ip!("192.168.1.21"),
name: "cp1".into(),
},
LogicalHost {
ip: ip!("192.168.1.22"),
name: "cp2".into(),
},
];
let workers = vec![
LogicalHost {
ip: ip!("192.168.1.21"),
name: "cp1".into(),
},
LogicalHost {
ip: ip!("192.168.1.22"),
name: "cp2".into(),
},
LogicalHost {
ip: ip!("192.168.1.30"),
name: "wk0".into(),
},
];
let (public_services, _) = OKDLoadBalancerScore::services_for_nodes(&cps, &workers);
let svc_80 = public_services
.iter()
.find(|s| s.listening_port.port() == 80)
.expect("Public service on port 80 not found");
let addresses: Vec<&str> = svc_80
.backend_servers
.iter()
.map(|s| s.address.as_str())
.collect();
assert_eq!(
addresses,
vec!["192.168.1.21", "192.168.1.22", "192.168.1.30"]
);
}
#[test]
fn test_public_services_include_all_nodes_on_port_80_and_443() {
let topology = create_test_topology();
@@ -341,10 +407,15 @@ mod tests {
#[test]
fn test_all_backend_servers_have_correct_port() {
let topology = create_test_topology();
let (public_services, _) =
OKDLoadBalancerScore::services_for_nodes(&topology.control_plane, &topology.workers);
let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 443);
let svc_443 = public_services
.iter()
.find(|s| s.listening_port.port() == 443)
.expect("Public service on port 443 not found");
for server in backend_servers {
for server in &svc_443.backend_servers {
assert_eq!(server.port, 443);
}
}

View File

@@ -24,8 +24,13 @@ pub use bootstrap_04_workers::*;
pub use bootstrap_05_sanity_check::*;
pub use bootstrap_06_installation_report::*;
pub use bootstrap_persist_network_bond::*;
pub mod add_node;
pub mod crd;
pub mod disable_dad_score;
pub mod host_network;
pub mod node_file_score;
pub mod os_artifacts;
pub mod system_reserved_score;
pub use add_node::*;
pub use os_artifacts::*;

View File

@@ -0,0 +1,433 @@
//! Verify that the kernel/initramfs/rootfs on the firewall's HTTP server
//! match the CoreOS version the running OKD cluster expects.
//!
//! If they're stale (cluster upgraded since bootstrap, for example), new
//! PXE-booting nodes come up on the wrong stream and fail to join. This
//! score asks the cluster what version it wants — via
//! `oc adm release extract --command=openshift-install` →
//! `openshift-install coreos print-stream-json` — then SSHes to the
//! firewall to `sha256sum` each file under `/usr/local/http/scos/`.
//! Match → score succeeds. Mismatch → print an ssh+curl+ln-sfn bundle the
//! operator can paste to refresh the files directly on the firewall, then
//! re-verify in a loop until either clean or the operator aborts.
//!
//! The stable names iPXE serves (`scos-live-kernel.x86_64`, etc.) don't
//! appear in the stream JSON — downloads come with versioned upstream
//! names. The refresh bundle therefore `curl -L -O`s the versioned file
//! and `ln -sfn`s the stable name at it. `sha256sum` follows symlinks by
//! default, so our verification ends up hashing the versioned payload
//! regardless of whether the stable name is a regular file or a symlink.
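//!
//! For orientation, an illustrative refresh bundle (hypothetical firewall IP,
//! URL, and version; the real text is assembled by `print_mismatch_report`
//! below from the cluster's stream JSON):
//!
//! ```text
//! ssh root@192.168.40.1 '\
//!   set -eux && \
//!   mkdir -p /usr/local/http/scos && \
//!   cd /usr/local/http/scos && \
//!   curl -L -O "https://example.invalid/scos-9.x-live-kernel-x86_64" && \
//!   ln -sfn scos-9.x-live-kernel-x86_64 scos-live-kernel.x86_64'
//! ```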
use async_trait::async_trait;
use harmony_secret::SecretManager;
use harmony_types::{id::Id, ssh::SshCredentials};
use log::{debug, info};
use serde::Serialize;
use serde_json::Value;
use std::net::IpAddr;
use tokio::process::Command;
use crate::{
config::secret::OPNSenseFirewallCredentials,
data::Version,
infra::ssh::run_command,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::HAClusterTopology,
};
const REMOTE_DIR: &str = "/usr/local/http/scos";
const KERNEL_STABLE: &str = "scos-live-kernel.x86_64";
const INITRAMFS_STABLE: &str = "scos-live-initramfs.x86_64.img";
const ROOTFS_STABLE: &str = "scos-live-rootfs.x86_64.img";
const OPENSHIFT_INSTALL_DIR: &str = "./data/okd/bin";
/// Standalone score that validates PXE artifacts before anyone PXEs into
/// them. Runs both on its own (added to a pipeline) and as the first step
/// of [`super::add_node::OKDAddNodeScore`] — operators don't have to
/// remember to wire it in.
#[derive(Debug, Clone, Serialize)]
pub struct OKDOsArtifactsScore;
impl Score<HAClusterTopology> for OKDOsArtifactsScore {
fn name(&self) -> String {
"OKDOsArtifactsScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDOsArtifactsInterpret)
}
}
#[derive(Debug)]
pub struct OKDOsArtifactsInterpret;
#[async_trait]
impl Interpret<HAClusterTopology> for OKDOsArtifactsInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
let fw_ip = topology.firewall.get_ip();
let fw_creds = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>().await?;
let ssh_creds = SshCredentials::Password {
username: fw_creds.username,
password: fw_creds.password,
};
info!(
"[OSArtifacts] Extracting cluster-matched openshift-install and reading stream JSON..."
);
let expected = fetch_expected_pxe_artifacts().await?;
loop {
info!("[OSArtifacts] Verifying 3 PXE artifacts on {fw_ip}...");
let rows = verify_each(fw_ip, &ssh_creds, &expected).await?;
if rows.iter().all(|r| r.status == Status::Match) {
info!("[OSArtifacts] All 3 PXE artifacts match the cluster's expected version");
return Ok(Outcome::success(format!(
"PXE artifacts verified on {fw_ip} in {REMOTE_DIR}"
)));
}
print_mismatch_report(fw_ip, &rows);
let cont = inquire::Confirm::new(
"After updating the files on the firewall, press enter to re-verify (or decline to abort)",
)
.with_default(true)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
if !cont {
return Err(InterpretError::new(
"Operator aborted PXE-artifact validation — add-node cannot proceed with stale files.".into(),
));
}
}
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDOsArtifacts")
}
fn get_version(&self) -> Version {
Version::from("1.0.0").unwrap()
}
fn get_status(&self) -> InterpretStatus {
InterpretStatus::QUEUED
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
}
// -----------------------------------------------------------------------------
// Expected artifacts — shell out to openshift-install
// -----------------------------------------------------------------------------
#[derive(Debug, Clone)]
struct PxeArtifact {
stable_filename: &'static str,
url: String,
sha256: String,
}
impl PxeArtifact {
fn remote_path(&self) -> String {
format!("{REMOTE_DIR}/{}", self.stable_filename)
}
/// Upstream filename extracted from the URL's last path segment —
/// what `curl -L -O` would save to disk and what `ln -sfn` targets.
fn upstream_filename(&self) -> &str {
self.url.rsplit('/').next().unwrap_or(self.url.as_str())
}
}
async fn fetch_expected_pxe_artifacts() -> Result<Vec<PxeArtifact>, InterpretError> {
// 1. Extract openshift-install that matches the running cluster.
// Overwrites ./data/okd/bin/openshift-install so we never use a
// stale one. Requires `oc` on PATH and a valid KUBECONFIG.
let to_arg = format!("--to={OPENSHIFT_INSTALL_DIR}");
debug!("[OSArtifacts] oc adm release extract --command=openshift-install {to_arg}");
let status = Command::new("oc")
.args([
"adm",
"release",
"extract",
"--command=openshift-install",
&to_arg,
])
.status()
.await
.map_err(|e| {
InterpretError::new(format!(
"Could not run `oc`: {e}. Is it on PATH and is KUBECONFIG set?"
))
})?;
if !status.success() {
return Err(InterpretError::new(format!(
"`oc adm release extract --command=openshift-install` failed (exit {status}). Is KUBECONFIG pointing at the cluster?"
)));
}
// 2. Print stream JSON.
let installer = format!("{OPENSHIFT_INSTALL_DIR}/openshift-install");
debug!("[OSArtifacts] {installer} coreos print-stream-json");
let out = Command::new(&installer)
.args(["coreos", "print-stream-json"])
.output()
.await
.map_err(|e| InterpretError::new(format!("Could not run {installer}: {e}")))?;
if !out.status.success() {
return Err(InterpretError::new(format!(
"`{installer} coreos print-stream-json` failed (exit {}): {}",
out.status,
String::from_utf8_lossy(&out.stderr),
)));
}
let json: Value = serde_json::from_slice(&out.stdout).map_err(|e| {
InterpretError::new(format!(
"Could not parse `openshift-install` stream JSON: {e}"
))
})?;
// 3. Pull URL + sha256 for kernel/initramfs/rootfs.
let pxe = &json["architectures"]["x86_64"]["artifacts"]["metal"]["formats"]["pxe"];
let field = |artifact: &str, key: &str| -> Result<String, InterpretError> {
pxe[artifact][key]
.as_str()
.map(String::from)
.ok_or_else(|| {
InterpretError::new(format!(
"Stream JSON is missing .architectures.x86_64.artifacts.metal.formats.pxe.{artifact}.{key}"
))
})
};
Ok(vec![
PxeArtifact {
stable_filename: KERNEL_STABLE,
url: field("kernel", "location")?,
sha256: field("kernel", "sha256")?,
},
PxeArtifact {
stable_filename: INITRAMFS_STABLE,
url: field("initramfs", "location")?,
sha256: field("initramfs", "sha256")?,
},
PxeArtifact {
stable_filename: ROOTFS_STABLE,
url: field("rootfs", "location")?,
sha256: field("rootfs", "sha256")?,
},
])
}
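// For reference, the stream-JSON subtree walked above has this shape
// (fragment only; URLs and hashes are hypothetical placeholders):
//
//   { "architectures": { "x86_64": { "artifacts": { "metal": { "formats": {
//     "pxe": {
//       "kernel":    { "location": "https://example.invalid/…-live-kernel-x86_64",        "sha256": "<64-hex>" },
//       "initramfs": { "location": "https://example.invalid/…-live-initramfs.x86_64.img", "sha256": "<64-hex>" },
//       "rootfs":    { "location": "https://example.invalid/…-live-rootfs.x86_64.img",    "sha256": "<64-hex>" }
//     }
//   } } } } } }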
// -----------------------------------------------------------------------------
// Verification over SSH
// -----------------------------------------------------------------------------
#[derive(Debug, PartialEq, Clone, Copy)]
enum Status {
Match,
Mismatch,
Missing,
}
#[derive(Debug)]
struct VerifyRow {
artifact: PxeArtifact,
have: Option<String>,
status: Status,
}
async fn verify_each(
fw_ip: IpAddr,
creds: &SshCredentials,
expected: &[PxeArtifact],
) -> Result<Vec<VerifyRow>, InterpretError> {
let mut out = Vec::with_capacity(expected.len());
for e in expected {
let have = sha256_of(fw_ip, creds, &e.remote_path()).await;
let status = match &have {
Some(h) if h == &e.sha256 => Status::Match,
Some(_) => Status::Mismatch,
None => Status::Missing,
};
debug!(
"[OSArtifacts] {}: expected {}... got {:?} -> {:?}",
e.stable_filename,
&e.sha256[..12],
have.as_deref(),
status,
);
out.push(VerifyRow {
artifact: e.clone(),
have,
status,
});
}
Ok(out)
}
/// `sha256sum <path>` over SSH. We deliberately avoid shell features
/// (pipes, `2>/dev/null`, awk) because the firewall's root shell is FreeBSD
/// csh, which doesn't speak Bourne-style stderr redirection — every shell
/// understands a bare `sha256sum <path>`. Missing files / dangling
/// symlinks cause a non-zero exit (our run_command turns that into Err,
/// which we fold back to None). Parsing is done in Rust: scan lines for a
/// token that looks like a 64-char hex hash.
async fn sha256_of(fw_ip: IpAddr, creds: &SshCredentials, remote_path: &str) -> Option<String> {
let cmd = format!("sha256sum {remote_path}");
let out = match run_command(fw_ip, 22, creds, &cmd).await {
Ok(out) => out,
Err(e) => {
debug!("[OSArtifacts] sha256sum {remote_path} returned err (likely missing): {e}");
return None;
}
};
debug!(
"[OSArtifacts] sha256sum raw output for {remote_path}: {:?}",
out.trim()
);
parse_sha256sum_output(&out)
}
/// Extract the 64-hex sha256 from any line whose first whitespace token
/// looks like one. Tolerant of interleaved stderr banners or warnings
/// the remote shell might have splatted into our channel.
fn parse_sha256sum_output(output: &str) -> Option<String> {
for line in output.lines() {
if let Some(first) = line.split_whitespace().next() {
if first.len() == 64 && first.chars().all(|c| c.is_ascii_hexdigit()) {
return Some(first.to_string());
}
}
}
None
}
// -----------------------------------------------------------------------------
// Mismatch report
// -----------------------------------------------------------------------------
fn print_mismatch_report(fw_ip: IpAddr, rows: &[VerifyRow]) {
let mut msg = String::from(
"[OSArtifacts] PXE artifact mismatch — manual refresh required.\n\nExpected (from cluster's `openshift-install coreos print-stream-json`):\n",
);
for r in rows {
let have = match &r.have {
Some(h) => format!("have={}", &h[..16.min(h.len())]),
None => "have=<missing>".to_string(),
};
let marker = if r.status == Status::Match {
" OK"
} else {
""
};
msg.push_str(&format!(
" {:<34} sha256={} {}{}\n",
r.artifact.stable_filename,
&r.artifact.sha256[..16],
have,
marker,
));
}
let needs: Vec<&VerifyRow> = rows.iter().filter(|r| r.status != Status::Match).collect();
msg.push_str(&format!(
"\nRun this from your workstation to refresh directly on the firewall:\n\n ssh root@{fw_ip} '\\\n set -eux && \\\n mkdir -p {REMOTE_DIR} && \\\n cd {REMOTE_DIR}"
));
for r in &needs {
msg.push_str(&format!(" && \\\n curl -L -O \"{}\"", r.artifact.url));
}
for r in &needs {
msg.push_str(&format!(
" && \\\n ln -sfn {} {}",
r.artifact.upstream_filename(),
r.artifact.stable_filename,
));
}
msg.push_str("'\n\nThen press enter here to re-verify.\n");
info!("{msg}");
}
// -----------------------------------------------------------------------------
// Tests
// -----------------------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
fn row(status: Status, have: Option<&str>) -> VerifyRow {
VerifyRow {
artifact: PxeArtifact {
stable_filename: KERNEL_STABLE,
url: "https://example.com/path/rhcos-418.94.xxx-live-kernel-x86_64".to_string(),
sha256: "a".repeat(64),
},
have: have.map(|s| s.to_string()),
status,
}
}
#[test]
fn upstream_filename_is_last_path_segment() {
let a = PxeArtifact {
stable_filename: KERNEL_STABLE,
url: "https://x.example/a/b/rhcos-418.94-live-kernel-x86_64".into(),
sha256: "deadbeef".into(),
};
assert_eq!(a.upstream_filename(), "rhcos-418.94-live-kernel-x86_64");
}
#[test]
fn parse_sha256sum_output_handles_standard_format() {
let out = "dceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b /usr/local/http/scos/scos-live-kernel.x86_64\n";
assert_eq!(
parse_sha256sum_output(out).as_deref(),
Some("dceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b"),
);
}
#[test]
fn parse_sha256sum_output_skips_stderr_noise_before_hash() {
let out = "Warning: remote shell is weird\nsha256sum: foo: No such file\ndceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b /path\n";
assert_eq!(
parse_sha256sum_output(out).as_deref(),
Some("dceac0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b"),
);
}
#[test]
fn parse_sha256sum_output_rejects_short_hex() {
assert_eq!(parse_sha256sum_output("dead /path\n"), None);
}
#[test]
fn parse_sha256sum_output_rejects_non_hex() {
let out = "ZZZZc0b7809536dea5ff109d231b487a2be4cad742e1152c1268cd800dd6450b /path\n";
assert_eq!(parse_sha256sum_output(out), None);
}
#[test]
fn parse_sha256sum_output_empty_is_none() {
assert_eq!(parse_sha256sum_output(""), None);
}
#[test]
fn status_detection_match_vs_mismatch_vs_missing() {
let expected_hash = "a".repeat(64);
let other_hash = "b".repeat(64);
assert_eq!(
row(Status::Match, Some(&expected_hash)).status,
Status::Match
);
assert_eq!(
row(Status::Mismatch, Some(&other_hash)).status,
Status::Mismatch
);
assert_eq!(row(Status::Missing, None).status, Status::Missing);
}
}

View File

@@ -16,5 +16,9 @@ pub struct BootstrapIpxeTpl<'a> {
pub scos_path: &'a str,
pub installation_device: &'a str,
pub ignition_http_path: &'a str,
pub ignition_file_name: &'static str,
// Was `&'static str` because bootstrap only ever rendered
// `bootstrap.ign` / `master.ign` / `worker.ign`. Add-node generates
// per-host filenames like `master-cp3.ign`, so the lifetime must be
// bound to the caller's String instead.
pub ignition_file_name: &'a str,
}

View File

@@ -3,5 +3,6 @@ pub mod id;
pub mod k8s_name;
pub mod net;
pub mod rfc1123;
pub mod ssh;
pub mod storage;
pub mod switch;

harmony_types/src/ssh.rs (new file, 21 lines)
View File

@@ -0,0 +1,21 @@
//! SSH credentials primitive.
//!
//! Deliberately lean: a plain enum holding either a PEM-encoded private key
//! (with an optional passphrase) or a password. No derives for interactive
//! prompting, no JsonSchema — consumers build this from secrets they already
//! hold (`harmony::config::secret::SshKeyPair`, static passwords, etc.) and
//! pass it to `harmony::infra::ssh::run_command`.
/// How to authenticate an outbound SSH connection.
pub enum SshCredentials {
/// Private-key authentication. `private_pem` is the key material as
/// stored in OpenSSH / PKCS#8 format; `passphrase` decrypts an encrypted
/// key, or `None` for an unencrypted one.
SshKey {
username: String,
private_pem: String,
passphrase: Option<String>,
},
/// Password authentication.
Password { username: String, password: String },
}

View File

@@ -32,12 +32,26 @@ impl CaddyConfig {
Self { client }
}
/// Check if the Caddy plugin is installed by querying its settings endpoint.
pub async fn is_installed(&self) -> bool {
self.client
/// Check whether the Caddy plugin is installed by querying its settings
/// endpoint.
///
/// Returns:
/// - `Ok(true)` — the endpoint returned 2xx, plugin is present.
/// - `Ok(false)` — OPNsense returned **404** specifically, meaning the
/// MVC route doesn't exist because the plugin isn't installed.
/// - `Err(_)` — anything else (timeout, TLS mismatch, 401/403, 5xx,
/// malformed JSON). The caller must not assume "not installed" in
/// this case; it means we couldn't determine state.
pub async fn is_installed(&self) -> Result<bool, Error> {
match self
.client
.get_typed::<serde_json::Value>("caddy", "General", "get")
.await
.is_ok()
{
Ok(_) => Ok(true),
Err(opnsense_api::Error::Api { status, .. }) if status.as_u16() == 404 => Ok(false),
Err(e) => Err(Error::Api(e)),
}
}
/// Enable or disable Caddy, setting http_port=8080 and https_port=8443.