diff --git a/examples/nanodc/src/main.rs b/examples/nanodc/src/main.rs index 629a8dc..e487fab 100644 --- a/examples/nanodc/src/main.rs +++ b/examples/nanodc/src/main.rs @@ -1,6 +1,6 @@ use std::{ net::{IpAddr, Ipv4Addr}, - sync::Arc, + sync::{Arc, OnceLock}, }; use brocade::BrocadeOptions; @@ -107,6 +107,7 @@ async fn main() { }, ], switch_client: switch_client.clone(), + network_manager: OnceLock::new(), }; let inventory = Inventory { diff --git a/examples/okd_installation/src/topology.rs b/examples/okd_installation/src/topology.rs index ce89402..2bc9fd2 100644 --- a/examples/okd_installation/src/topology.rs +++ b/examples/okd_installation/src/topology.rs @@ -9,7 +9,10 @@ use harmony::{ use harmony_macros::{ip, ipv4}; use harmony_secret::{Secret, SecretManager}; use serde::{Deserialize, Serialize}; -use std::{net::IpAddr, sync::Arc}; +use std::{ + net::IpAddr, + sync::{Arc, OnceLock}, +}; #[derive(Secret, Serialize, Deserialize, Debug, PartialEq)] struct OPNSenseFirewallConfig { @@ -81,6 +84,7 @@ pub async fn get_topology() -> HAClusterTopology { }, workers: vec![], switch_client: switch_client.clone(), + network_manager: OnceLock::new(), } } diff --git a/examples/okd_pxe/src/topology.rs b/examples/okd_pxe/src/topology.rs index 7c1244c..72695fa 100644 --- a/examples/okd_pxe/src/topology.rs +++ b/examples/okd_pxe/src/topology.rs @@ -10,7 +10,10 @@ use harmony::{ use harmony_macros::{ip, ipv4}; use harmony_secret::{Secret, SecretManager}; use serde::{Deserialize, Serialize}; -use std::{net::IpAddr, sync::Arc}; +use std::{ + net::IpAddr, + sync::{Arc, OnceLock}, +}; pub async fn get_topology() -> HAClusterTopology { let firewall = harmony::topology::LogicalHost { @@ -76,6 +79,7 @@ pub async fn get_topology() -> HAClusterTopology { }, workers: vec![], switch_client: switch_client.clone(), + network_manager: OnceLock::new(), } } diff --git a/examples/opnsense/src/main.rs b/examples/opnsense/src/main.rs index 4422f65..e74f5da 100644 --- a/examples/opnsense/src/main.rs 
+++ b/examples/opnsense/src/main.rs @@ -1,6 +1,6 @@ use std::{ net::{IpAddr, Ipv4Addr}, - sync::Arc, + sync::{Arc, OnceLock}, }; use brocade::BrocadeOptions; @@ -79,6 +79,7 @@ async fn main() { }, workers: vec![], switch_client: switch_client.clone(), + network_manager: OnceLock::new(), }; let inventory = Inventory { diff --git a/harmony/src/domain/topology/ha_cluster.rs b/harmony/src/domain/topology/ha_cluster.rs index 2d5e568..0edbc37 100644 --- a/harmony/src/domain/topology/ha_cluster.rs +++ b/harmony/src/domain/topology/ha_cluster.rs @@ -1,34 +1,24 @@ use async_trait::async_trait; use harmony_macros::ip; use harmony_types::{ - id::Id, net::{MacAddress, Url}, switch::PortLocation, }; -use k8s_openapi::api::core::v1::Node; -use kube::{ - ResourceExt, - api::{ObjectList, ObjectMeta}, -}; use log::debug; use log::info; -use crate::modules::okd::crd::nmstate::{self, NodeNetworkConfigurationPolicy}; +use crate::infra::network_manager::OpenShiftNmStateNetworkManager; use crate::topology::PxeOptions; -use crate::{data::FileContent, modules::okd::crd::nmstate::NMState}; -use crate::{ - executors::ExecutorError, modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec, -}; +use crate::{data::FileContent, executors::ExecutorError}; use super::{ DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig, - HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, - PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer, - Topology, k8s::K8sClient, + HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError, + NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient, + SwitchError, TftpServer, Topology, k8s::K8sClient, }; -use std::collections::{BTreeMap, HashSet}; -use std::sync::Arc; +use std::sync::{Arc, OnceLock}; #[derive(Debug, Clone)] pub struct HAClusterTopology { @@ -45,6 +35,7 @@ pub struct 
HAClusterTopology { pub control_plane: Vec, pub workers: Vec, pub kubeconfig: Option, + pub network_manager: OnceLock>, } #[async_trait] @@ -98,263 +89,12 @@ impl HAClusterTopology { .to_string() } - async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> { - let k8s_client = self.k8s_client().await?; + pub async fn network_manager(&self) -> &dyn NetworkManager { + let k8s_client = self.k8s_client().await.unwrap(); - debug!("Installing NMState controller..."); - k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml -").unwrap(), Some("nmstate")) - .await - .map_err(|e| e.to_string())?; - - debug!("Creating NMState namespace..."); - k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml -").unwrap(), Some("nmstate")) - .await - .map_err(|e| e.to_string())?; - - debug!("Creating NMState service account..."); - k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml -").unwrap(), Some("nmstate")) - .await - .map_err(|e| e.to_string())?; - - debug!("Creating NMState role..."); - k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml -").unwrap(), Some("nmstate")) - .await - .map_err(|e| e.to_string())?; - - debug!("Creating NMState role binding..."); - k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml -").unwrap(), Some("nmstate")) - .await - .map_err(|e| e.to_string())?; - - debug!("Creating NMState operator..."); - k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml -").unwrap(), Some("nmstate")) - .await - .map_err(|e| e.to_string())?; - - k8s_client - .wait_until_deployment_ready("nmstate-operator", Some("nmstate"), 
None) - .await?; - - let nmstate = NMState { - metadata: ObjectMeta { - name: Some("nmstate".to_string()), - ..Default::default() - }, - ..Default::default() - }; - debug!( - "Creating NMState:\n{}", - serde_yaml::to_string(&nmstate).unwrap() - ); - k8s_client - .apply(&nmstate, None) - .await - .map_err(|e| e.to_string())?; - - Ok(()) - } - - async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { - self.ensure_nmstate_operator_installed() - .await - .map_err(|e| { - SwitchError::new(format!( - "Can't configure bond, NMState operator not available: {e}" - )) - })?; - - let hostname = self.get_hostname(&config.host_id).await.map_err(|e| { - SwitchError::new(format!( - "Can't configure bond, can't get hostname for host '{}': {e}", - config.host_id - )) - })?; - let bond_id = self.get_next_bond_id(&hostname).await.map_err(|e| { - SwitchError::new(format!( - "Can't configure bond, can't get an available bond id for host '{}': {e}", - config.host_id - )) - })?; - let bond_config = self.create_bond_configuration(&hostname, &bond_id, config); - - debug!( - "Applying NMState bond config for host {}:\n{}", - serde_yaml::to_string(&bond_config).unwrap(), - config.host_id - ); - self.k8s_client() - .await - .unwrap() - .apply(&bond_config, None) - .await - .map_err(|e| SwitchError::new(format!("Failed to configure bond: {e}")))?; - - Ok(()) - } - - fn create_bond_configuration( - &self, - host: &str, - bond_name: &str, - config: &HostNetworkConfig, - ) -> NodeNetworkConfigurationPolicy { - info!("Configuring bond '{bond_name}' for host '{host}'..."); - - let mut bond_mtu: Option = None; - let mut copy_mac_from: Option = None; - let mut bond_ports = Vec::new(); - let mut interfaces: Vec = Vec::new(); - - for switch_port in &config.switch_ports { - let interface_name = switch_port.interface.name.clone(); - - interfaces.push(nmstate::Interface { - name: interface_name.clone(), - description: Some(format!("Member of bond {bond_name}")), - r#type: 
nmstate::InterfaceType::Ethernet, - state: "up".to_string(), - mtu: Some(switch_port.interface.mtu), - mac_address: Some(switch_port.interface.mac_address.to_string()), - ipv4: Some(nmstate::IpStackSpec { - enabled: Some(false), - ..Default::default() - }), - ipv6: Some(nmstate::IpStackSpec { - enabled: Some(false), - ..Default::default() - }), - link_aggregation: None, - ..Default::default() - }); - - bond_ports.push(interface_name.clone()); - - // Use the first port's details for the bond mtu and mac address - if bond_mtu.is_none() { - bond_mtu = Some(switch_port.interface.mtu); - } - if copy_mac_from.is_none() { - copy_mac_from = Some(interface_name); - } - } - - interfaces.push(nmstate::Interface { - name: bond_name.to_string(), - description: Some(format!("Network bond for host {host}")), - r#type: nmstate::InterfaceType::Bond, - state: "up".to_string(), - copy_mac_from, - ipv4: Some(nmstate::IpStackSpec { - dhcp: Some(true), - enabled: Some(true), - ..Default::default() - }), - ipv6: Some(nmstate::IpStackSpec { - dhcp: Some(true), - autoconf: Some(true), - enabled: Some(true), - ..Default::default() - }), - link_aggregation: Some(nmstate::BondSpec { - mode: "802.3ad".to_string(), - ports: bond_ports, - ..Default::default() - }), - ..Default::default() - }); - - NodeNetworkConfigurationPolicy { - metadata: ObjectMeta { - name: Some(format!("{host}-bond-config")), - ..Default::default() - }, - spec: NodeNetworkConfigurationPolicySpec { - node_selector: Some(BTreeMap::from([( - "kubernetes.io/hostname".to_string(), - host.to_string(), - )])), - desired_state: nmstate::NetworkState { - interfaces, - ..Default::default() - }, - }, - } - } - - async fn get_hostname(&self, host_id: &Id) -> Result { - let nodes: ObjectList = self - .k8s_client() - .await - .unwrap() - .list_resources(None, None) - .await - .map_err(|e| format!("Failed to list nodes: {e}"))?; - - let Some(node) = nodes.iter().find(|n| { - n.status - .as_ref() - .and_then(|s| s.node_info.as_ref()) - 
.map(|i| i.system_uuid == host_id.to_string()) - .unwrap_or(false) - }) else { - return Err(format!("No node found for host '{host_id}'")); - }; - - node.labels() - .get("kubernetes.io/hostname") - .ok_or(format!( - "Node '{host_id}' has no kubernetes.io/hostname label" - )) - .cloned() - } - - async fn get_next_bond_id(&self, hostname: &str) -> Result { - let network_state: Option = self - .k8s_client() - .await - .unwrap() - .get_resource(hostname, None) - .await - .map_err(|e| format!("Failed to list nodes: {e}"))?; - - let interfaces = vec![]; - let existing_bonds: Vec<&nmstate::Interface> = network_state + self.network_manager + .get_or_init(|| Arc::new(OpenShiftNmStateNetworkManager::new(k8s_client.clone()))) .as_ref() - .and_then(|network_state| network_state.status.current_state.as_ref()) - .map_or(&interfaces, |current_state| &current_state.interfaces) - .iter() - .filter(|i| i.r#type == nmstate::InterfaceType::Bond && i.link_aggregation.is_some()) - .collect(); - - let used_ids: HashSet = existing_bonds - .iter() - .filter_map(|i| { - i.name - .strip_prefix("bond") - .and_then(|id| id.parse::().ok()) - }) - .collect(); - - let next_id = (0..).find(|id| !used_ids.contains(id)).unwrap(); - Ok(format!("bond{next_id}")) - } - - async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { - debug!("Configuring port channel: {config:#?}"); - let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect(); - - self.switch_client - .configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports) - .await - .map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?; - - Ok(()) } pub fn autoload() -> Self { @@ -378,6 +118,7 @@ impl HAClusterTopology { bootstrap_host: dummy_host, control_plane: vec![], workers: vec![], + network_manager: OnceLock::new(), } } } @@ -545,9 +286,30 @@ impl Switch for HAClusterTopology { self.switch_client.find_port(mac_address).await } - async fn 
configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { - self.configure_bond(config).await?; - self.configure_port_channel(config).await + async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { + debug!("Configuring port channel: {config:#?}"); + let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect(); + + self.switch_client + .configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports) + .await + .map_err(|e| SwitchError::new(format!("Failed to configure port-channel: {e}")))?; + + Ok(()) + } +} + +#[async_trait] +impl NetworkManager for HAClusterTopology { + async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> { + self.network_manager() + .await + .ensure_network_manager_installed() + .await + } + + async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> { + self.network_manager().await.configure_bond(config).await } } diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 03c59e1..fccf0d1 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -25,7 +25,7 @@ use kube::{ api::{ApiResource, GroupVersionKind}, runtime::wait::await_condition, }; -use log::{debug, error, info, trace, warn}; +use log::{debug, error, trace, warn}; use serde::{Serialize, de::DeserializeOwned}; use serde_json::json; use similar::TextDiff; diff --git a/harmony/src/domain/topology/network.rs b/harmony/src/domain/topology/network.rs index cf172ee..d9e4f72 100644 --- a/harmony/src/domain/topology/network.rs +++ b/harmony/src/domain/topology/network.rs @@ -15,7 +15,7 @@ use harmony_types::{ }; use serde::Serialize; -use crate::{executors::ExecutorError, hardware::PhysicalHost}; +use crate::executors::ExecutorError; use super::{LogicalHost, k8s::K8sClient}; @@ -183,6 +183,37 @@ impl FromStr for DnsRecordType { } } +#[async_trait] +pub trait NetworkManager: 
Debug + Send + Sync { + async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError>; + async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError>; +} + +#[derive(Debug, Clone, new)] +pub struct NetworkError { + msg: String, +} + +impl fmt::Display for NetworkError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.write_str(&self.msg) + } +} + +impl Error for NetworkError {} + +impl From for NetworkError { + fn from(value: kube::Error) -> Self { + NetworkError::new(value.to_string()) + } +} + +impl From for NetworkError { + fn from(value: String) -> Self { + NetworkError::new(value) + } +} + #[async_trait] pub trait Switch: Send + Sync { async fn setup_switch(&self) -> Result<(), SwitchError>; @@ -192,7 +223,7 @@ pub trait Switch: Send + Sync { mac_address: &MacAddress, ) -> Result, SwitchError>; - async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>; + async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>; } #[derive(Clone, Debug, PartialEq)] diff --git a/harmony/src/infra/mod.rs b/harmony/src/infra/mod.rs index 203cf90..c82a118 100644 --- a/harmony/src/infra/mod.rs +++ b/harmony/src/infra/mod.rs @@ -3,5 +3,6 @@ pub mod executors; pub mod hp_ilo; pub mod intel_amt; pub mod inventory; +pub mod network_manager; pub mod opnsense; mod sqlx; diff --git a/harmony/src/infra/network_manager.rs b/harmony/src/infra/network_manager.rs new file mode 100644 index 0000000..6518b3d --- /dev/null +++ b/harmony/src/infra/network_manager.rs @@ -0,0 +1,259 @@ +use std::{ + collections::{BTreeMap, HashSet}, + sync::Arc, +}; + +use async_trait::async_trait; +use harmony_types::id::Id; +use k8s_openapi::api::core::v1::Node; +use kube::{ + ResourceExt, + api::{ObjectList, ObjectMeta}, +}; +use log::{debug, info}; + +use crate::{ + modules::okd::crd::nmstate, + topology::{HostNetworkConfig, NetworkError, NetworkManager, k8s::K8sClient}, +}; + +pub 
struct OpenShiftNmStateNetworkManager { + k8s_client: Arc, +} + +impl std::fmt::Debug for OpenShiftNmStateNetworkManager { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("OpenShiftNmStateNetworkManager").finish() + } +} + +#[async_trait] +impl NetworkManager for OpenShiftNmStateNetworkManager { + async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> { + debug!("Installing NMState controller..."); + self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml +").unwrap(), Some("nmstate")) + .await?; + + debug!("Creating NMState namespace..."); + self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml +").unwrap(), Some("nmstate")) + .await?; + + debug!("Creating NMState service account..."); + self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml +").unwrap(), Some("nmstate")) + .await?; + + debug!("Creating NMState role..."); + self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml +").unwrap(), Some("nmstate")) + .await?; + + debug!("Creating NMState role binding..."); + self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml +").unwrap(), Some("nmstate")) + .await?; + + debug!("Creating NMState operator..."); + self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml +").unwrap(), Some("nmstate")) + .await?; + + self.k8s_client + .wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None) + .await?; + + let nmstate = nmstate::NMState { + metadata: ObjectMeta { + name: Some("nmstate".to_string()), + ..Default::default() + }, + 
..Default::default() + }; + debug!( + "Creating NMState:\n{}", + serde_yaml::to_string(&nmstate).unwrap() + ); + self.k8s_client.apply(&nmstate, None).await?; + + Ok(()) + } + + async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> { + let hostname = self.get_hostname(&config.host_id).await.map_err(|e| { + NetworkError::new(format!( + "Can't configure bond, can't get hostname for host '{}': {e}", + config.host_id + )) + })?; + let bond_id = self.get_next_bond_id(&hostname).await.map_err(|e| { + NetworkError::new(format!( + "Can't configure bond, can't get an available bond id for host '{}': {e}", + config.host_id + )) + })?; + let bond_config = self.create_bond_configuration(&hostname, &bond_id, config); + + debug!( + "Applying NMState bond config for host {}:\n{}", + config.host_id, + serde_yaml::to_string(&bond_config).unwrap() + ); + self.k8s_client + .apply(&bond_config, None) + .await + .map_err(|e| NetworkError::new(format!("Failed to configure bond: {e}")))?; + + Ok(()) + } +} + +impl OpenShiftNmStateNetworkManager { + pub fn new(k8s_client: Arc) -> Self { + Self { k8s_client } + } + + fn create_bond_configuration( + &self, + host: &str, + bond_name: &str, + config: &HostNetworkConfig, + ) -> nmstate::NodeNetworkConfigurationPolicy { + info!("Configuring bond '{bond_name}' for host '{host}'..."); + + let mut bond_mtu: Option = None; + let mut copy_mac_from: Option = None; + let mut bond_ports = Vec::new(); + let mut interfaces: Vec = Vec::new(); + + for switch_port in &config.switch_ports { + let interface_name = switch_port.interface.name.clone(); + + interfaces.push(nmstate::Interface { + name: interface_name.clone(), + description: Some(format!("Member of bond {bond_name}")), + r#type: nmstate::InterfaceType::Ethernet, + state: "up".to_string(), + mtu: Some(switch_port.interface.mtu), + mac_address: Some(switch_port.interface.mac_address.to_string()), + ipv4: Some(nmstate::IpStackSpec { + enabled: Some(false), + 
..Default::default() + }), + ipv6: Some(nmstate::IpStackSpec { + enabled: Some(false), + ..Default::default() + }), + link_aggregation: None, + ..Default::default() + }); + + bond_ports.push(interface_name.clone()); + + // Use the first port's details for the bond mtu and mac address + if bond_mtu.is_none() { + bond_mtu = Some(switch_port.interface.mtu); + } + if copy_mac_from.is_none() { + copy_mac_from = Some(interface_name); + } + } + + interfaces.push(nmstate::Interface { + name: bond_name.to_string(), + description: Some(format!("Network bond for host {host}")), + r#type: nmstate::InterfaceType::Bond, + state: "up".to_string(), + copy_mac_from, + ipv4: Some(nmstate::IpStackSpec { + dhcp: Some(true), + enabled: Some(true), + ..Default::default() + }), + ipv6: Some(nmstate::IpStackSpec { + dhcp: Some(true), + autoconf: Some(true), + enabled: Some(true), + ..Default::default() + }), + link_aggregation: Some(nmstate::BondSpec { + mode: "802.3ad".to_string(), + ports: bond_ports, + ..Default::default() + }), + ..Default::default() + }); + + nmstate::NodeNetworkConfigurationPolicy { + metadata: ObjectMeta { + name: Some(format!("{host}-bond-config")), + ..Default::default() + }, + spec: nmstate::NodeNetworkConfigurationPolicySpec { + node_selector: Some(BTreeMap::from([( + "kubernetes.io/hostname".to_string(), + host.to_string(), + )])), + desired_state: nmstate::NetworkState { + interfaces, + ..Default::default() + }, + }, + } + } + + async fn get_hostname(&self, host_id: &Id) -> Result { + let nodes: ObjectList = self + .k8s_client + .list_resources(None, None) + .await + .map_err(|e| format!("Failed to list nodes: {e}"))?; + + let Some(node) = nodes.iter().find(|n| { + n.status + .as_ref() + .and_then(|s| s.node_info.as_ref()) + .map(|i| i.system_uuid == host_id.to_string()) + .unwrap_or(false) + }) else { + return Err(format!("No node found for host '{host_id}'")); + }; + + node.labels() + .get("kubernetes.io/hostname") + .ok_or(format!( + "Node '{host_id}' has 
no kubernetes.io/hostname label" + )) + .cloned() + } + + async fn get_next_bond_id(&self, hostname: &str) -> Result { + let network_state: Option = self + .k8s_client + .get_resource(hostname, None) + .await + .map_err(|e| format!("Failed to list nodes: {e}"))?; + + let interfaces = vec![]; + let existing_bonds: Vec<&nmstate::Interface> = network_state + .as_ref() + .and_then(|network_state| network_state.status.current_state.as_ref()) + .map_or(&interfaces, |current_state| &current_state.interfaces) + .iter() + .filter(|i| i.r#type == nmstate::InterfaceType::Bond && i.link_aggregation.is_some()) + .collect(); + + let used_ids: HashSet = existing_bonds + .iter() + .filter_map(|i| { + i.name + .strip_prefix("bond") + .and_then(|id| id.parse::().ok()) + }) + .collect(); + + let next_id = (0..).find(|id| !used_ids.contains(id)).unwrap(); + Ok(format!("bond{next_id}")) + } +} diff --git a/harmony/src/modules/okd/host_network.rs b/harmony/src/modules/okd/host_network.rs index 39fb027..d9a0063 100644 --- a/harmony/src/modules/okd/host_network.rs +++ b/harmony/src/modules/okd/host_network.rs @@ -9,7 +9,7 @@ use crate::{ interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, - topology::{HostNetworkConfig, NetworkInterface, Switch, SwitchPort, Topology}, + topology::{HostNetworkConfig, NetworkInterface, NetworkManager, Switch, SwitchPort, Topology}, }; #[derive(Debug, Clone, Serialize)] @@ -17,7 +17,7 @@ pub struct HostNetworkConfigurationScore { pub hosts: Vec, } -impl Score for HostNetworkConfigurationScore { +impl Score for HostNetworkConfigurationScore { fn name(&self) -> String { "HostNetworkConfigurationScore".into() } @@ -35,7 +35,7 @@ pub struct HostNetworkConfigurationInterpret { } impl HostNetworkConfigurationInterpret { - async fn configure_network_for_host( + async fn configure_network_for_host( &self, topology: &T, host: &PhysicalHost, @@ -67,10 +67,15 @@ impl HostNetworkConfigurationInterpret { ); 
info!("[Host {current_host}/{total_hosts}] Configuring host network..."); + topology.configure_bond(&config).await.map_err(|e| { + InterpretError::new(format!("Failed to configure host network: {e}")) + })?; topology - .configure_host_network(&config) + .configure_port_channel(&config) .await - .map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?; + .map_err(|e| { + InterpretError::new(format!("Failed to configure host network: {e}")) + })?; } else { info!( "[Host {current_host}/{total_hosts}] No ports found for {} interfaces, skipping", @@ -169,7 +174,7 @@ impl HostNetworkConfigurationInterpret { } #[async_trait] -impl Interpret for HostNetworkConfigurationInterpret { +impl Interpret for HostNetworkConfigurationInterpret { fn get_name(&self) -> InterpretName { InterpretName::Custom("HostNetworkConfigurationInterpret") } @@ -198,6 +203,12 @@ impl Interpret for HostNetworkConfigurationInterpret { let host_count = self.score.hosts.len(); info!("Started network configuration for {host_count} host(s)...",); + info!("Setting up NetworkManager...",); + topology + .ensure_network_manager_installed() + .await + .map_err(|e| InterpretError::new(format!("NetworkManager setup failed: {e}")))?; + info!("Setting up switch with sane defaults..."); topology .setup_switch() @@ -242,7 +253,8 @@ mod tests { use crate::{ hardware::HostCategory, topology::{ - HostNetworkConfig, PreparationError, PreparationOutcome, SwitchError, SwitchPort, + HostNetworkConfig, NetworkError, PreparationError, PreparationOutcome, SwitchError, + SwitchPort, }, }; use std::{ @@ -290,15 +302,27 @@ mod tests { } #[tokio::test] - async fn host_with_one_mac_address_should_create_bond_with_one_interface() { + async fn should_setup_network_manager() { let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]); let score = given_score(vec![host]); let topology = TopologyWithSwitch::new(); let _ = score.interpret(&Inventory::empty(), &topology).await; - let configured_host_networks 
= topology.configured_host_networks.lock().unwrap(); - assert_that!(*configured_host_networks).contains_exactly(vec![( + let network_manager_setup = topology.network_manager_setup.lock().unwrap(); + assert_that!(*network_manager_setup).is_true(); + } + + #[tokio::test] + async fn host_with_one_mac_address_should_configure_bond_with_one_interface() { + let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]); + let score = given_score(vec![host]); + let topology = TopologyWithSwitch::new(); + + let _ = score.interpret(&Inventory::empty(), &topology).await; + + let config = topology.configured_bonds.lock().unwrap(); + assert_that!(*config).contains_exactly(vec![( HOST_ID.clone(), HostNetworkConfig { host_id: HOST_ID.clone(), @@ -311,7 +335,28 @@ mod tests { } #[tokio::test] - async fn host_with_multiple_mac_addresses_should_create_one_bond_with_all_interfaces() { + async fn host_with_one_mac_address_should_configure_port_channel_with_one_interface() { + let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]); + let score = given_score(vec![host]); + let topology = TopologyWithSwitch::new(); + + let _ = score.interpret(&Inventory::empty(), &topology).await; + + let config = topology.configured_port_channels.lock().unwrap(); + assert_that!(*config).contains_exactly(vec![( + HOST_ID.clone(), + HostNetworkConfig { + host_id: HOST_ID.clone(), + switch_ports: vec![SwitchPort { + interface: EXISTING_INTERFACE.clone(), + port: PORT.clone(), + }], + }, + )]); + } + + #[tokio::test] + async fn host_with_multiple_mac_addresses_should_configure_one_bond_with_all_interfaces() { let score = given_score(vec![given_host( &HOST_ID, vec![ @@ -323,8 +368,8 @@ mod tests { let _ = score.interpret(&Inventory::empty(), &topology).await; - let configured_host_networks = topology.configured_host_networks.lock().unwrap(); - assert_that!(*configured_host_networks).contains_exactly(vec![( + let config = topology.configured_bonds.lock().unwrap(); + 
assert_that!(*config).contains_exactly(vec![( HOST_ID.clone(), HostNetworkConfig { host_id: HOST_ID.clone(), @@ -343,7 +388,40 @@ mod tests { } #[tokio::test] - async fn multiple_hosts_should_create_one_bond_per_host() { + async fn host_with_multiple_mac_addresses_should_configure_one_port_channel_with_all_interfaces() + { + let score = given_score(vec![given_host( + &HOST_ID, + vec![ + EXISTING_INTERFACE.clone(), + ANOTHER_EXISTING_INTERFACE.clone(), + ], + )]); + let topology = TopologyWithSwitch::new(); + + let _ = score.interpret(&Inventory::empty(), &topology).await; + + let config = topology.configured_port_channels.lock().unwrap(); + assert_that!(*config).contains_exactly(vec![( + HOST_ID.clone(), + HostNetworkConfig { + host_id: HOST_ID.clone(), + switch_ports: vec![ + SwitchPort { + interface: EXISTING_INTERFACE.clone(), + port: PORT.clone(), + }, + SwitchPort { + interface: ANOTHER_EXISTING_INTERFACE.clone(), + port: ANOTHER_PORT.clone(), + }, + ], + }, + )]); + } + + #[tokio::test] + async fn multiple_hosts_should_configure_one_bond_per_host() { let score = given_score(vec![ given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]), given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]), @@ -352,8 +430,43 @@ mod tests { let _ = score.interpret(&Inventory::empty(), &topology).await; - let configured_host_networks = topology.configured_host_networks.lock().unwrap(); - assert_that!(*configured_host_networks).contains_exactly(vec![ + let config = topology.configured_bonds.lock().unwrap(); + assert_that!(*config).contains_exactly(vec![ + ( + HOST_ID.clone(), + HostNetworkConfig { + host_id: HOST_ID.clone(), + switch_ports: vec![SwitchPort { + interface: EXISTING_INTERFACE.clone(), + port: PORT.clone(), + }], + }, + ), + ( + ANOTHER_HOST_ID.clone(), + HostNetworkConfig { + host_id: ANOTHER_HOST_ID.clone(), + switch_ports: vec![SwitchPort { + interface: ANOTHER_EXISTING_INTERFACE.clone(), + port: ANOTHER_PORT.clone(), + }], + }, + ), + ]); + } + + 
#[tokio::test] + async fn multiple_hosts_should_configure_one_port_channel_per_host() { + let score = given_score(vec![ + given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]), + given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]), + ]); + let topology = TopologyWithSwitch::new(); + + let _ = score.interpret(&Inventory::empty(), &topology).await; + + let config = topology.configured_port_channels.lock().unwrap(); + assert_that!(*config).contains_exactly(vec![ ( HOST_ID.clone(), HostNetworkConfig { @@ -384,8 +497,10 @@ mod tests { let _ = score.interpret(&Inventory::empty(), &topology).await; - let configured_host_networks = topology.configured_host_networks.lock().unwrap(); - assert_that!(*configured_host_networks).is_empty(); + let config = topology.configured_port_channels.lock().unwrap(); + assert_that!(*config).is_empty(); + let config = topology.configured_bonds.lock().unwrap(); + assert_that!(*config).is_empty(); } fn given_score(hosts: Vec) -> HostNetworkConfigurationScore { @@ -422,26 +537,33 @@ mod tests { } } + #[derive(Debug)] struct TopologyWithSwitch { available_ports: Arc>>, - configured_host_networks: Arc>>, + configured_port_channels: Arc>>, switch_setup: Arc>, + network_manager_setup: Arc>, + configured_bonds: Arc>>, } impl TopologyWithSwitch { fn new() -> Self { Self { available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])), - configured_host_networks: Arc::new(Mutex::new(vec![])), + configured_port_channels: Arc::new(Mutex::new(vec![])), switch_setup: Arc::new(Mutex::new(false)), + network_manager_setup: Arc::new(Mutex::new(false)), + configured_bonds: Arc::new(Mutex::new(vec![])), } } fn new_port_not_found() -> Self { Self { available_ports: Arc::new(Mutex::new(vec![])), - configured_host_networks: Arc::new(Mutex::new(vec![])), + configured_port_channels: Arc::new(Mutex::new(vec![])), switch_setup: Arc::new(Mutex::new(false)), + network_manager_setup: Arc::new(Mutex::new(false)), + configured_bonds: 
Arc::new(Mutex::new(vec![])), } } } @@ -457,6 +579,22 @@ mod tests { } } + #[async_trait] + impl NetworkManager for TopologyWithSwitch { + async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> { + let mut network_manager_installed = self.network_manager_setup.lock().unwrap(); + *network_manager_installed = true; + Ok(()) + } + + async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> { + let mut configured_bonds = self.configured_bonds.lock().unwrap(); + configured_bonds.push((config.host_id.clone(), config.clone())); + + Ok(()) + } + } + #[async_trait] impl Switch for TopologyWithSwitch { async fn setup_switch(&self) -> Result<(), SwitchError> { @@ -476,12 +614,12 @@ mod tests { Ok(Some(ports.remove(0))) } - async fn configure_host_network( + async fn configure_port_channel( &self, config: &HostNetworkConfig, ) -> Result<(), SwitchError> { - let mut configured_host_networks = self.configured_host_networks.lock().unwrap(); - configured_host_networks.push((config.host_id.clone(), config.clone())); + let mut configured_port_channels = self.configured_port_channels.lock().unwrap(); + configured_port_channels.push((config.host_id.clone(), config.clone())); Ok(()) }