Compare commits

...

3 Commits

SHA1 Message Date
035de57925 add missing field kubeconfig 2025-10-21 13:42:18 -04:00
83fcf9e8ac wip 2025-10-17 11:39:26 -04:00
dbd1f1b010 install operatorhub to configure nmstate 2025-10-16 17:01:05 -04:00
16 changed files with 278 additions and 55 deletions

View File

@@ -39,6 +39,7 @@ async fn main() {
let gateway_ipv4 = Ipv4Addr::new(192, 168, 33, 1);
let gateway_ip = IpAddr::V4(gateway_ipv4);
let topology = harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "ncd0.harmony.mcd".to_string(), // TODO this must be set manually correctly
// when setting up the opnsense firewall
router: Arc::new(UnmanagedRouter::new(

View File

@@ -38,6 +38,7 @@ pub async fn get_topology() -> HAClusterTopology {
let gateway_ipv4 = ipv4!("192.168.1.1");
let gateway_ip = IpAddr::V4(gateway_ipv4);
harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "demo.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,

View File

@@ -32,6 +32,7 @@ pub async fn get_topology() -> HAClusterTopology {
let gateway_ipv4 = ipv4!("192.168.1.1");
let gateway_ip = IpAddr::V4(gateway_ipv4);
harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "demo.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,

View File

@@ -34,6 +34,7 @@ async fn main() {
let gateway_ipv4 = Ipv4Addr::new(10, 100, 8, 1);
let gateway_ip = IpAddr::V4(gateway_ipv4);
let topology = harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "demo.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,

View File

@@ -29,9 +29,9 @@ use super::{
Topology, k8s::K8sClient,
};
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::sync::Arc;
use std::{collections::BTreeMap, time::Duration};
#[derive(Debug, Clone)]
pub struct HAClusterTopology {
@@ -47,6 +47,7 @@ pub struct HAClusterTopology {
pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>,
pub switch: Vec<LogicalHost>,
pub kubeconfig: Option<String>,
}
#[async_trait]
@@ -65,9 +66,17 @@ impl Topology for HAClusterTopology {
#[async_trait]
impl K8sclient for HAClusterTopology {
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
Ok(Arc::new(
K8sClient::try_default().await.map_err(|e| e.to_string())?,
))
match &self.kubeconfig {
None => Ok(Arc::new(
K8sClient::try_default().await.map_err(|e| e.to_string())?,
)),
Some(kubeconfig) => {
let Some(client) = K8sClient::from_kubeconfig(&kubeconfig).await else {
return Err("Failed to create k8s client".to_string());
};
Ok(Arc::new(client))
}
}
}
}
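The dispatch above keeps the old behaviour as the default: with `kubeconfig: None` the client comes from `K8sClient::try_default()` (in-cluster configuration or the ambient `KUBECONFIG`), while `Some(..)` builds it from the supplied kubeconfig. A caller-side sketch, assuming the field carries whatever value `K8sClient::from_kubeconfig` expects (the value is illustrative and the remaining fields are elided):

    let topology = harmony::topology::HAClusterTopology {
        // Some(..) routes k8s_client() through K8sClient::from_kubeconfig,
        // targeting that cluster regardless of the process environment.
        kubeconfig: Some(kubeconfig), // hypothetical value read at startup
        // ...all other fields exactly as in the example binaries above...
    };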
@@ -93,13 +102,13 @@ impl HAClusterTopology {
}
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
// FIXME: Find a way to check nmstate is already available (get pod -n openshift-nmstate)
// FIXME: Find a way to check nmstate is already available (get pod -n nmstate)
debug!("Installing NMState operator...");
let k8s_client = self.k8s_client().await?;
let nmstate_namespace = Namespace {
metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()),
name: Some("nmstate".to_string()),
finalizers: Some(vec!["kubernetes".to_string()]),
..Default::default()
},
@@ -113,50 +122,59 @@ impl HAClusterTopology {
let nmstate_operator_group = OperatorGroup {
metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()),
namespace: Some("openshift-nmstate".to_string()),
name: Some("nmstate".to_string()),
namespace: Some("nmstate".to_string()),
..Default::default()
},
spec: OperatorGroupSpec {
target_namespaces: vec!["openshift-nmstate".to_string()],
target_namespaces: vec!["nmstate".to_string()],
},
};
debug!("Creating NMState operator group: {nmstate_operator_group:#?}");
k8s_client
.apply(&nmstate_operator_group, None)
.apply(&nmstate_operator_group, Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
let nmstate_subscription = Subscription {
metadata: ObjectMeta {
name: Some("kubernetes-nmstate-operator".to_string()),
namespace: Some("openshift-nmstate".to_string()),
namespace: Some("nmstate".to_string()),
..Default::default()
},
spec: SubscriptionSpec {
channel: Some("stable".to_string()),
install_plan_approval: Some(InstallPlanApproval::Automatic),
channel: Some("alpha".to_string()),
name: "kubernetes-nmstate-operator".to_string(),
source: "redhat-operators".to_string(),
source: "operatorhubio-catalog".to_string(),
source_namespace: "openshift-marketplace".to_string(),
install_plan_approval: Some(InstallPlanApproval::Automatic),
},
};
debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}");
k8s_client
.apply(&nmstate_subscription, None)
.apply(&nmstate_subscription, Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
k8s_client
.wait_for_operator(
"kubernetes-nmstate-operator",
Some("nmstate"),
Some(Duration::from_secs(30)),
)
.await?;
let nmstate = NMState {
metadata: ObjectMeta {
name: Some("nmstate".to_string()),
namespace: Some("nmstate".to_string()),
..Default::default()
},
..Default::default()
};
debug!("Creating NMState: {nmstate:#?}");
k8s_client
.apply(&nmstate, None)
.apply(&nmstate, Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
@@ -187,9 +205,9 @@ impl HAClusterTopology {
.unwrap()
.apply(&bond_config, None)
.await
.unwrap();
.map_err(|e| SwitchError::new(format!("Failed to configure bond: {e}")))?;
todo!()
Ok(())
}
fn create_bond_configuration(
@@ -325,6 +343,7 @@ impl HAClusterTopology {
};
Self {
kubeconfig: None,
domain_name: "DummyTopology".to_string(),
router: dummy_infra.clone(),
load_balancer: dummy_infra.clone(),

View File

@@ -10,11 +10,13 @@ use k8s_openapi::{
};
use kube::{
Client, Config, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
api::{
Api, AttachParams, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, ResourceExt,
},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
error::DiscoveryError,
runtime::reflector::Lookup,
runtime::{reflector::Lookup, wait::Condition},
};
use kube::{api::DynamicObject, runtime::conditions};
use kube::{
@@ -22,11 +24,13 @@ use kube::{
runtime::wait::await_condition,
};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep};
use crate::modules::okd::crd::ClusterServiceVersion;
#[derive(new, Clone)]
pub struct K8sClient {
client: Client,
@@ -194,6 +198,33 @@ impl K8sClient {
}
}
pub async fn wait_for_operator(
&self,
operator_name: &str,
namespace: Option<&str>,
timeout: Option<Duration>,
) -> Result<(), String> {
let api: Api<ClusterServiceVersion>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let establish = await_condition(api, operator_name, is_operator_ready());
let t = timeout.unwrap_or(Duration::from_secs(5));
let res = tokio::time::timeout(t, establish).await;
if res.is_ok() {
Ok(())
} else {
Err(format!(
"timed out while waiting for operator {operator_name}"
))
}
}
/// Executes a command in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
@@ -547,3 +578,14 @@ where
}
}
}
fn is_operator_ready() -> impl Condition<ClusterServiceVersion> {
|obj: Option<&ClusterServiceVersion>| {
if let Some(csv) = obj {
// A CSV reports its install phase under .status.phase (not .spec),
// so read the status subresource attached to the CRD definition.
if let Some(status) = &csv.status {
return status.phase == "Succeeded";
}
}
false
}
}
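A usage sketch for the new helper, mirroring the call site in ensure_nmstate_operator_installed above; both optional arguments have defaults:

    // Explicit namespace and a 30s budget:
    k8s_client
        .wait_for_operator(
            "kubernetes-nmstate-operator",
            Some("nmstate"),
            Some(Duration::from_secs(30)),
        )
        .await?;

    // Defaults: the client's default namespace and a 5s timeout.
    k8s_client
        .wait_for_operator("kubernetes-nmstate-operator", None, None)
        .await?;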

View File

@@ -0,0 +1 @@
pub mod types;

View File

@@ -0,0 +1,27 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Default, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "CatalogSource",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct CatalogSourceSpec {
pub source_type: String,
pub image: String,
pub display_name: String,
pub publisher: String,
}
impl Default for CatalogSource {
fn default() -> Self {
Self {
metadata: Default::default(),
spec: Default::default(),
}
}
}
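Since the NMState subscription now pulls from operatorhubio-catalog rather than redhat-operators, a catalog with that name must exist in the openshift-marketplace namespace. A hedged sketch of registering it with the new CatalogSource type; the image and display name are the publicly documented OperatorHub.io values rather than something this diff creates, and the apply call assumes the K8sClient shown earlier:

    use kube::api::ObjectMeta;

    let operatorhub_catalog = CatalogSource {
        metadata: ObjectMeta {
            name: Some("operatorhubio-catalog".to_string()),
            namespace: Some("openshift-marketplace".to_string()),
            ..Default::default()
        },
        spec: CatalogSourceSpec {
            source_type: "grpc".to_string(),
            image: "quay.io/operatorhubio/catalog:latest".to_string(),
            display_name: "OperatorHub.io Catalog".to_string(),
            publisher: "OperatorHub.io".to_string(),
        },
    };
    // k8s_client.apply(&operatorhub_catalog, Some("openshift-marketplace")).await?;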

View File

@@ -3,5 +3,6 @@ pub mod executors;
pub mod hp_ilo;
pub mod intel_amt;
pub mod inventory;
pub mod kubers;
pub mod opnsense;
mod sqlx;

View File

@@ -38,13 +38,15 @@ impl<
+ 'static
+ Send
+ Clone,
T: Topology,
T: Topology + K8sclient,
> Score<T> for K8sResourceScore<K>
where
<K as kube::Resource>::DynamicType: Default,
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
todo!()
Box::new(K8sResourceInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {

View File

@@ -5,10 +5,8 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore,
http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore,
okd::{host_network::HostNetworkConfigurationScore, templates::BootstrapIpxeTpl},
dhcp::DhcpHostBindingScore, http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore, okd::templates::BootstrapIpxeTpl,
},
score::Score,
topology::{HAClusterTopology, HostBinding},
@@ -205,28 +203,6 @@ impl OKDSetup03ControlPlaneInterpret {
Ok(())
}
/// Placeholder for automating network bonding configuration.
async fn persist_network_bond(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
hosts: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
};
score.interpret(inventory, topology).await?;
inquire::Confirm::new(
"Network configuration for control plane nodes is not automated yet. Configure it manually if needed.",
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
Ok(())
}
}
#[async_trait]
@@ -265,10 +241,6 @@ impl Interpret<HAClusterTopology> for OKDSetup03ControlPlaneInterpret {
// 4. Reboot the nodes to start the OS installation.
self.reboot_targets(&nodes).await?;
// 5. Placeholder for post-boot network configuration (e.g., bonding).
self.persist_network_bond(inventory, topology, &nodes)
.await?;
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to
// the `wait-for bootstrap-complete` command.

View File

@@ -0,0 +1,131 @@
use crate::{
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::okd::host_network::HostNetworkConfigurationScore,
score::Score,
topology::HAClusterTopology,
};
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::info;
use serde::Serialize;
// -------------------------------------------------------------------------------------------------
// Step XX: Persist Network Bond
// - Persist bonding via NMState
// - Persist port channels on the Switch
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
pub struct OKDSetupPersistNetworkBondScore {}
impl Score<HAClusterTopology> for OKDSetupPersistNetworkBondScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDSetupPersistNetworkBondInterpret::new())
}
fn name(&self) -> String {
"OKDSetupPersistNetworkBondScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct OKDSetupPersistNetworkBondInterpret {
version: Version,
status: InterpretStatus,
}
impl OKDSetupPersistNetworkBondInterpret {
pub fn new() -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
status: InterpretStatus::QUEUED,
}
}
/// Ensures that three physical hosts are available for the ControlPlane role,
/// returning an error if fewer than the required number have been discovered.
async fn get_nodes(
&self,
_inventory: &Inventory,
_topology: &HAClusterTopology,
) -> Result<Vec<PhysicalHost>, InterpretError> {
const REQUIRED_HOSTS: usize = 3;
let repo = InventoryRepositoryFactory::build().await?;
let control_plane_hosts = repo.get_host_for_role(&HostRole::ControlPlane).await?;
if control_plane_hosts.len() < REQUIRED_HOSTS {
Err(InterpretError::new(format!(
"OKD Requires at least {} control plane hosts, but only found {}. Cannot proceed.",
REQUIRED_HOSTS,
control_plane_hosts.len()
)))
} else {
// Take exactly the number of required hosts to ensure consistency.
Ok(control_plane_hosts
.into_iter()
.take(REQUIRED_HOSTS)
.collect())
}
}
async fn persist_network_bond(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
hosts: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
};
score.interpret(inventory, topology).await?;
inquire::Confirm::new(
"Network configuration for control plane nodes is not automated yet. Configure it manually if needed.",
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDSetupPersistNetworkBondInterpet {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetupPersistNetworkBond")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
let nodes = self.get_nodes(inventory, topology).await?;
self.persist_network_bond(inventory, topology, &nodes)
.await?;
Ok(Outcome::success(
"Network bond successfully persisted".into(),
))
}
}

View File

@@ -28,7 +28,9 @@ pub struct SubscriptionSpec {
pub name: String,
pub source: String,
pub source_namespace: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub install_plan_approval: Option<InstallPlanApproval>,
}
@@ -39,3 +41,22 @@ pub enum InstallPlanApproval {
#[serde(rename = "Manual")]
Manual,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "ClusterServiceVersion",
status = "ClusterServiceVersionStatus",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterServiceVersionSpec {
// Spec fields are not needed for the readiness check; the phase is read
// from the status subresource attached via the `status` attribute above.
pub display_name: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterServiceVersionStatus {
pub phase: String,
pub reason: Option<String>,
}
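The `skip_serializing_if` attributes keep unset optional fields out of the serialized manifest entirely, rather than emitting explicit `null`s. A small sketch of the effect with plain serde_json:

    let spec = SubscriptionSpec {
        name: "kubernetes-nmstate-operator".to_string(),
        source: "operatorhubio-catalog".to_string(),
        source_namespace: "openshift-marketplace".to_string(),
        channel: None,
        install_plan_approval: None,
    };
    // Prints {"name":"kubernetes-nmstate-operator","source":"operatorhubio-catalog",
    //         "sourceNamespace":"openshift-marketplace"}
    // with no null "channel" or "installPlanApproval" keys.
    println!("{}", serde_json::to_string(&spec).unwrap());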

View File

@@ -50,7 +50,7 @@
use crate::{
modules::okd::{
OKDSetup01InventoryScore, OKDSetup02BootstrapScore, OKDSetup03ControlPlaneScore,
OKDSetup04WorkersScore, OKDSetup05SanityCheckScore,
OKDSetup04WorkersScore, OKDSetup05SanityCheckScore, OKDSetupPersistNetworkBondScore,
bootstrap_06_installation_report::OKDSetup06InstallationReportScore,
},
score::Score,
@@ -65,6 +65,7 @@ impl OKDInstallationPipeline {
Box::new(OKDSetup01InventoryScore::new()),
Box::new(OKDSetup02BootstrapScore::new()),
Box::new(OKDSetup03ControlPlaneScore::new()),
Box::new(OKDSetupPersistNetworkBondScore::new()),
Box::new(OKDSetup04WorkersScore::new()),
Box::new(OKDSetup05SanityCheckScore::new()),
Box::new(OKDSetup06InstallationReportScore::new()),

View File

@@ -6,6 +6,7 @@ mod bootstrap_05_sanity_check;
mod bootstrap_06_installation_report;
pub mod bootstrap_dhcp;
pub mod bootstrap_load_balancer;
mod bootstrap_persist_network_bond;
pub mod dhcp;
pub mod dns;
pub mod installation;
@@ -19,5 +20,6 @@ pub use bootstrap_03_control_plane::*;
pub use bootstrap_04_workers::*;
pub use bootstrap_05_sanity_check::*;
pub use bootstrap_06_installation_report::*;
pub use bootstrap_persist_network_bond::*;
pub mod crd;
pub mod host_network;

View File

@@ -9,7 +9,7 @@ pub struct Interface {
pub physical_interface_name: String,
pub descr: Option<MaybeString>,
pub mtu: Option<MaybeString>,
pub enable: MaybeString,
pub enable: Option<MaybeString>,
pub lock: Option<MaybeString>,
#[yaserde(rename = "spoofmac")]
pub spoof_mac: Option<MaybeString>,