refactor(host_network): extract NetworkManager as a reusable component

This commit is contained in:
Ian Letourneau 2025-11-04 17:18:25 -05:00
parent cab4eb19ed
commit 4ea1af8d72
10 changed files with 506 additions and 305 deletions

View File

@ -1,6 +1,6 @@
use std::{ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
sync::Arc, sync::{Arc, OnceLock},
}; };
use brocade::BrocadeOptions; use brocade::BrocadeOptions;
@ -107,6 +107,7 @@ async fn main() {
}, },
], ],
switch_client: switch_client.clone(), switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
}; };
let inventory = Inventory { let inventory = Inventory {

View File

@ -9,7 +9,10 @@ use harmony::{
use harmony_macros::{ip, ipv4}; use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager}; use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{net::IpAddr, sync::Arc}; use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)] #[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
struct OPNSenseFirewallConfig { struct OPNSenseFirewallConfig {
@ -81,6 +84,7 @@ pub async fn get_topology() -> HAClusterTopology {
}, },
workers: vec![], workers: vec![],
switch_client: switch_client.clone(), switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
} }
} }

View File

@ -10,7 +10,10 @@ use harmony::{
use harmony_macros::{ip, ipv4}; use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager}; use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{net::IpAddr, sync::Arc}; use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};
pub async fn get_topology() -> HAClusterTopology { pub async fn get_topology() -> HAClusterTopology {
let firewall = harmony::topology::LogicalHost { let firewall = harmony::topology::LogicalHost {
@ -76,6 +79,7 @@ pub async fn get_topology() -> HAClusterTopology {
}, },
workers: vec![], workers: vec![],
switch_client: switch_client.clone(), switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
} }
} }

View File

@ -1,6 +1,6 @@
use std::{ use std::{
net::{IpAddr, Ipv4Addr}, net::{IpAddr, Ipv4Addr},
sync::Arc, sync::{Arc, OnceLock},
}; };
use brocade::BrocadeOptions; use brocade::BrocadeOptions;
@ -79,6 +79,7 @@ async fn main() {
}, },
workers: vec![], workers: vec![],
switch_client: switch_client.clone(), switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
}; };
let inventory = Inventory { let inventory = Inventory {

View File

@ -1,34 +1,24 @@
use async_trait::async_trait; use async_trait::async_trait;
use harmony_macros::ip; use harmony_macros::ip;
use harmony_types::{ use harmony_types::{
id::Id,
net::{MacAddress, Url}, net::{MacAddress, Url},
switch::PortLocation, switch::PortLocation,
}; };
use k8s_openapi::api::core::v1::Node;
use kube::{
ResourceExt,
api::{ObjectList, ObjectMeta},
};
use log::debug; use log::debug;
use log::info; use log::info;
use crate::modules::okd::crd::nmstate::{self, NodeNetworkConfigurationPolicy}; use crate::infra::network_manager::OpenShiftNmStateNetworkManager;
use crate::topology::PxeOptions; use crate::topology::PxeOptions;
use crate::{data::FileContent, modules::okd::crd::nmstate::NMState}; use crate::{data::FileContent, executors::ExecutorError};
use crate::{
executors::ExecutorError, modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec,
};
use super::{ use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig, DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer, NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
Topology, k8s::K8sClient, SwitchError, TftpServer, Topology, k8s::K8sClient,
}; };
use std::collections::{BTreeMap, HashSet}; use std::sync::{Arc, OnceLock};
use std::sync::Arc;
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HAClusterTopology { pub struct HAClusterTopology {
@ -45,6 +35,7 @@ pub struct HAClusterTopology {
pub control_plane: Vec<LogicalHost>, pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>, pub workers: Vec<LogicalHost>,
pub kubeconfig: Option<String>, pub kubeconfig: Option<String>,
pub network_manager: OnceLock<Arc<dyn NetworkManager>>,
} }
#[async_trait] #[async_trait]
@ -98,263 +89,12 @@ impl HAClusterTopology {
.to_string() .to_string()
} }
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> { pub async fn network_manager(&self) -> &dyn NetworkManager {
let k8s_client = self.k8s_client().await?; let k8s_client = self.k8s_client().await.unwrap();
debug!("Installing NMState controller..."); self.network_manager
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml .get_or_init(|| Arc::new(OpenShiftNmStateNetworkManager::new(k8s_client.clone())))
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState namespace...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState service account...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState role...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState role binding...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState operator...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
k8s_client
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
.await?;
let nmstate = NMState {
metadata: ObjectMeta {
name: Some("nmstate".to_string()),
..Default::default()
},
..Default::default()
};
debug!(
"Creating NMState:\n{}",
serde_yaml::to_string(&nmstate).unwrap()
);
k8s_client
.apply(&nmstate, None)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
self.ensure_nmstate_operator_installed()
.await
.map_err(|e| {
SwitchError::new(format!(
"Can't configure bond, NMState operator not available: {e}"
))
})?;
let hostname = self.get_hostname(&config.host_id).await.map_err(|e| {
SwitchError::new(format!(
"Can't configure bond, can't get hostname for host '{}': {e}",
config.host_id
))
})?;
let bond_id = self.get_next_bond_id(&hostname).await.map_err(|e| {
SwitchError::new(format!(
"Can't configure bond, can't get an available bond id for host '{}': {e}",
config.host_id
))
})?;
let bond_config = self.create_bond_configuration(&hostname, &bond_id, config);
debug!(
"Applying NMState bond config for host {}:\n{}",
serde_yaml::to_string(&bond_config).unwrap(),
config.host_id
);
self.k8s_client()
.await
.unwrap()
.apply(&bond_config, None)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure bond: {e}")))?;
Ok(())
}
fn create_bond_configuration(
&self,
host: &str,
bond_name: &str,
config: &HostNetworkConfig,
) -> NodeNetworkConfigurationPolicy {
info!("Configuring bond '{bond_name}' for host '{host}'...");
let mut bond_mtu: Option<u32> = None;
let mut copy_mac_from: Option<String> = None;
let mut bond_ports = Vec::new();
let mut interfaces: Vec<nmstate::Interface> = Vec::new();
for switch_port in &config.switch_ports {
let interface_name = switch_port.interface.name.clone();
interfaces.push(nmstate::Interface {
name: interface_name.clone(),
description: Some(format!("Member of bond {bond_name}")),
r#type: nmstate::InterfaceType::Ethernet,
state: "up".to_string(),
mtu: Some(switch_port.interface.mtu),
mac_address: Some(switch_port.interface.mac_address.to_string()),
ipv4: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
link_aggregation: None,
..Default::default()
});
bond_ports.push(interface_name.clone());
// Use the first port's details for the bond mtu and mac address
if bond_mtu.is_none() {
bond_mtu = Some(switch_port.interface.mtu);
}
if copy_mac_from.is_none() {
copy_mac_from = Some(interface_name);
}
}
interfaces.push(nmstate::Interface {
name: bond_name.to_string(),
description: Some(format!("Network bond for host {host}")),
r#type: nmstate::InterfaceType::Bond,
state: "up".to_string(),
copy_mac_from,
ipv4: Some(nmstate::IpStackSpec {
dhcp: Some(true),
enabled: Some(true),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
dhcp: Some(true),
autoconf: Some(true),
enabled: Some(true),
..Default::default()
}),
link_aggregation: Some(nmstate::BondSpec {
mode: "802.3ad".to_string(),
ports: bond_ports,
..Default::default()
}),
..Default::default()
});
NodeNetworkConfigurationPolicy {
metadata: ObjectMeta {
name: Some(format!("{host}-bond-config")),
..Default::default()
},
spec: NodeNetworkConfigurationPolicySpec {
node_selector: Some(BTreeMap::from([(
"kubernetes.io/hostname".to_string(),
host.to_string(),
)])),
desired_state: nmstate::NetworkState {
interfaces,
..Default::default()
},
},
}
}
async fn get_hostname(&self, host_id: &Id) -> Result<String, String> {
let nodes: ObjectList<Node> = self
.k8s_client()
.await
.unwrap()
.list_resources(None, None)
.await
.map_err(|e| format!("Failed to list nodes: {e}"))?;
let Some(node) = nodes.iter().find(|n| {
n.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|i| i.system_uuid == host_id.to_string())
.unwrap_or(false)
}) else {
return Err(format!("No node found for host '{host_id}'"));
};
node.labels()
.get("kubernetes.io/hostname")
.ok_or(format!(
"Node '{host_id}' has no kubernetes.io/hostname label"
))
.cloned()
}
async fn get_next_bond_id(&self, hostname: &str) -> Result<String, String> {
let network_state: Option<nmstate::NodeNetworkState> = self
.k8s_client()
.await
.unwrap()
.get_resource(hostname, None)
.await
.map_err(|e| format!("Failed to list nodes: {e}"))?;
let interfaces = vec![];
let existing_bonds: Vec<&nmstate::Interface> = network_state
.as_ref() .as_ref()
.and_then(|network_state| network_state.status.current_state.as_ref())
.map_or(&interfaces, |current_state| &current_state.interfaces)
.iter()
.filter(|i| i.r#type == nmstate::InterfaceType::Bond && i.link_aggregation.is_some())
.collect();
let used_ids: HashSet<u32> = existing_bonds
.iter()
.filter_map(|i| {
i.name
.strip_prefix("bond")
.and_then(|id| id.parse::<u32>().ok())
})
.collect();
let next_id = (0..).find(|id| !used_ids.contains(id)).unwrap();
Ok(format!("bond{next_id}"))
}
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
self.switch_client
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
Ok(())
} }
pub fn autoload() -> Self { pub fn autoload() -> Self {
@ -378,6 +118,7 @@ impl HAClusterTopology {
bootstrap_host: dummy_host, bootstrap_host: dummy_host,
control_plane: vec![], control_plane: vec![],
workers: vec![], workers: vec![],
network_manager: OnceLock::new(),
} }
} }
} }
@ -545,9 +286,30 @@ impl Switch for HAClusterTopology {
self.switch_client.find_port(mac_address).await self.switch_client.find_port(mac_address).await
} }
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
self.configure_bond(config).await?; debug!("Configuring port channel: {config:#?}");
self.configure_port_channel(config).await let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
self.switch_client
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure port-channel: {e}")))?;
Ok(())
}
}
#[async_trait]
impl NetworkManager for HAClusterTopology {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
self.network_manager()
.await
.ensure_network_manager_installed()
.await
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
self.network_manager().await.configure_bond(config).await
} }
} }

View File

@ -25,7 +25,7 @@ use kube::{
api::{ApiResource, GroupVersionKind}, api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition, runtime::wait::await_condition,
}; };
use log::{debug, error, info, trace, warn}; use log::{debug, error, trace, warn};
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use serde_json::json; use serde_json::json;
use similar::TextDiff; use similar::TextDiff;

View File

@ -15,7 +15,7 @@ use harmony_types::{
}; };
use serde::Serialize; use serde::Serialize;
use crate::{executors::ExecutorError, hardware::PhysicalHost}; use crate::executors::ExecutorError;
use super::{LogicalHost, k8s::K8sClient}; use super::{LogicalHost, k8s::K8sClient};
@ -183,6 +183,37 @@ impl FromStr for DnsRecordType {
} }
} }
#[async_trait]
pub trait NetworkManager: Debug + Send + Sync {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError>;
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError>;
}
#[derive(Debug, Clone, new)]
pub struct NetworkError {
msg: String,
}
impl fmt::Display for NetworkError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.msg)
}
}
impl Error for NetworkError {}
impl From<kube::Error> for NetworkError {
fn from(value: kube::Error) -> Self {
NetworkError::new(value.to_string())
}
}
impl From<String> for NetworkError {
fn from(value: String) -> Self {
NetworkError::new(value)
}
}
#[async_trait] #[async_trait]
pub trait Switch: Send + Sync { pub trait Switch: Send + Sync {
async fn setup_switch(&self) -> Result<(), SwitchError>; async fn setup_switch(&self) -> Result<(), SwitchError>;
@ -192,7 +223,7 @@ pub trait Switch: Send + Sync {
mac_address: &MacAddress, mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError>; ) -> Result<Option<PortLocation>, SwitchError>;
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>; async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
} }
#[derive(Clone, Debug, PartialEq)] #[derive(Clone, Debug, PartialEq)]

View File

@ -3,5 +3,6 @@ pub mod executors;
pub mod hp_ilo; pub mod hp_ilo;
pub mod intel_amt; pub mod intel_amt;
pub mod inventory; pub mod inventory;
pub mod network_manager;
pub mod opnsense; pub mod opnsense;
mod sqlx; mod sqlx;

View File

@ -0,0 +1,259 @@
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::Node;
use kube::{
ResourceExt,
api::{ObjectList, ObjectMeta},
};
use log::{debug, info};
use crate::{
modules::okd::crd::nmstate,
topology::{HostNetworkConfig, NetworkError, NetworkManager, k8s::K8sClient},
};
pub struct OpenShiftNmStateNetworkManager {
k8s_client: Arc<K8sClient>,
}
impl std::fmt::Debug for OpenShiftNmStateNetworkManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenShiftNmStateNetworkManager").finish()
}
}
#[async_trait]
impl NetworkManager for OpenShiftNmStateNetworkManager {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
debug!("Installing NMState controller...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState namespace...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState service account...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState role...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState role binding...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState operator...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
").unwrap(), Some("nmstate"))
.await?;
self.k8s_client
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
.await?;
let nmstate = nmstate::NMState {
metadata: ObjectMeta {
name: Some("nmstate".to_string()),
..Default::default()
},
..Default::default()
};
debug!(
"Creating NMState:\n{}",
serde_yaml::to_string(&nmstate).unwrap()
);
self.k8s_client.apply(&nmstate, None).await?;
Ok(())
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
let hostname = self.get_hostname(&config.host_id).await.map_err(|e| {
NetworkError::new(format!(
"Can't configure bond, can't get hostname for host '{}': {e}",
config.host_id
))
})?;
let bond_id = self.get_next_bond_id(&hostname).await.map_err(|e| {
NetworkError::new(format!(
"Can't configure bond, can't get an available bond id for host '{}': {e}",
config.host_id
))
})?;
let bond_config = self.create_bond_configuration(&hostname, &bond_id, config);
debug!(
"Applying NMState bond config for host {}:\n{}",
serde_yaml::to_string(&bond_config).unwrap(),
config.host_id
);
self.k8s_client
.apply(&bond_config, None)
.await
.map_err(|e| NetworkError::new(format!("Failed to configure bond: {e}")))?;
Ok(())
}
}
impl OpenShiftNmStateNetworkManager {
pub fn new(k8s_client: Arc<K8sClient>) -> Self {
Self { k8s_client }
}
fn create_bond_configuration(
&self,
host: &str,
bond_name: &str,
config: &HostNetworkConfig,
) -> nmstate::NodeNetworkConfigurationPolicy {
info!("Configuring bond '{bond_name}' for host '{host}'...");
let mut bond_mtu: Option<u32> = None;
let mut copy_mac_from: Option<String> = None;
let mut bond_ports = Vec::new();
let mut interfaces: Vec<nmstate::Interface> = Vec::new();
for switch_port in &config.switch_ports {
let interface_name = switch_port.interface.name.clone();
interfaces.push(nmstate::Interface {
name: interface_name.clone(),
description: Some(format!("Member of bond {bond_name}")),
r#type: nmstate::InterfaceType::Ethernet,
state: "up".to_string(),
mtu: Some(switch_port.interface.mtu),
mac_address: Some(switch_port.interface.mac_address.to_string()),
ipv4: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
link_aggregation: None,
..Default::default()
});
bond_ports.push(interface_name.clone());
// Use the first port's details for the bond mtu and mac address
if bond_mtu.is_none() {
bond_mtu = Some(switch_port.interface.mtu);
}
if copy_mac_from.is_none() {
copy_mac_from = Some(interface_name);
}
}
interfaces.push(nmstate::Interface {
name: bond_name.to_string(),
description: Some(format!("Network bond for host {host}")),
r#type: nmstate::InterfaceType::Bond,
state: "up".to_string(),
copy_mac_from,
ipv4: Some(nmstate::IpStackSpec {
dhcp: Some(true),
enabled: Some(true),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
dhcp: Some(true),
autoconf: Some(true),
enabled: Some(true),
..Default::default()
}),
link_aggregation: Some(nmstate::BondSpec {
mode: "802.3ad".to_string(),
ports: bond_ports,
..Default::default()
}),
..Default::default()
});
nmstate::NodeNetworkConfigurationPolicy {
metadata: ObjectMeta {
name: Some(format!("{host}-bond-config")),
..Default::default()
},
spec: nmstate::NodeNetworkConfigurationPolicySpec {
node_selector: Some(BTreeMap::from([(
"kubernetes.io/hostname".to_string(),
host.to_string(),
)])),
desired_state: nmstate::NetworkState {
interfaces,
..Default::default()
},
},
}
}
async fn get_hostname(&self, host_id: &Id) -> Result<String, String> {
let nodes: ObjectList<Node> = self
.k8s_client
.list_resources(None, None)
.await
.map_err(|e| format!("Failed to list nodes: {e}"))?;
let Some(node) = nodes.iter().find(|n| {
n.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|i| i.system_uuid == host_id.to_string())
.unwrap_or(false)
}) else {
return Err(format!("No node found for host '{host_id}'"));
};
node.labels()
.get("kubernetes.io/hostname")
.ok_or(format!(
"Node '{host_id}' has no kubernetes.io/hostname label"
))
.cloned()
}
async fn get_next_bond_id(&self, hostname: &str) -> Result<String, String> {
let network_state: Option<nmstate::NodeNetworkState> = self
.k8s_client
.get_resource(hostname, None)
.await
.map_err(|e| format!("Failed to list nodes: {e}"))?;
let interfaces = vec![];
let existing_bonds: Vec<&nmstate::Interface> = network_state
.as_ref()
.and_then(|network_state| network_state.status.current_state.as_ref())
.map_or(&interfaces, |current_state| &current_state.interfaces)
.iter()
.filter(|i| i.r#type == nmstate::InterfaceType::Bond && i.link_aggregation.is_some())
.collect();
let used_ids: HashSet<u32> = existing_bonds
.iter()
.filter_map(|i| {
i.name
.strip_prefix("bond")
.and_then(|id| id.parse::<u32>().ok())
})
.collect();
let next_id = (0..).find(|id| !used_ids.contains(id)).unwrap();
Ok(format!("bond{next_id}"))
}
}

View File

@ -9,7 +9,7 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
score::Score, score::Score,
topology::{HostNetworkConfig, NetworkInterface, Switch, SwitchPort, Topology}, topology::{HostNetworkConfig, NetworkInterface, NetworkManager, Switch, SwitchPort, Topology},
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@ -17,7 +17,7 @@ pub struct HostNetworkConfigurationScore {
pub hosts: Vec<PhysicalHost>, pub hosts: Vec<PhysicalHost>,
} }
impl<T: Topology + Switch> Score<T> for HostNetworkConfigurationScore { impl<T: Topology + NetworkManager + Switch> Score<T> for HostNetworkConfigurationScore {
fn name(&self) -> String { fn name(&self) -> String {
"HostNetworkConfigurationScore".into() "HostNetworkConfigurationScore".into()
} }
@ -35,7 +35,7 @@ pub struct HostNetworkConfigurationInterpret {
} }
impl HostNetworkConfigurationInterpret { impl HostNetworkConfigurationInterpret {
async fn configure_network_for_host<T: Topology + Switch>( async fn configure_network_for_host<T: Topology + NetworkManager + Switch>(
&self, &self,
topology: &T, topology: &T,
host: &PhysicalHost, host: &PhysicalHost,
@ -67,10 +67,15 @@ impl HostNetworkConfigurationInterpret {
); );
info!("[Host {current_host}/{total_hosts}] Configuring host network..."); info!("[Host {current_host}/{total_hosts}] Configuring host network...");
topology.configure_bond(&config).await.map_err(|e| {
InterpretError::new(format!("Failed to configure host network: {e}"))
})?;
topology topology
.configure_host_network(&config) .configure_port_channel(&config)
.await .await
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?; .map_err(|e| {
InterpretError::new(format!("Failed to configure host network: {e}"))
})?;
} else { } else {
info!( info!(
"[Host {current_host}/{total_hosts}] No ports found for {} interfaces, skipping", "[Host {current_host}/{total_hosts}] No ports found for {} interfaces, skipping",
@ -169,7 +174,7 @@ impl HostNetworkConfigurationInterpret {
} }
#[async_trait] #[async_trait]
impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret { impl<T: Topology + NetworkManager + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::Custom("HostNetworkConfigurationInterpret") InterpretName::Custom("HostNetworkConfigurationInterpret")
} }
@ -198,6 +203,12 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
let host_count = self.score.hosts.len(); let host_count = self.score.hosts.len();
info!("Started network configuration for {host_count} host(s)...",); info!("Started network configuration for {host_count} host(s)...",);
info!("Setting up NetworkManager...",);
topology
.ensure_network_manager_installed()
.await
.map_err(|e| InterpretError::new(format!("NetworkManager setup failed: {e}")))?;
info!("Setting up switch with sane defaults..."); info!("Setting up switch with sane defaults...");
topology topology
.setup_switch() .setup_switch()
@ -242,7 +253,8 @@ mod tests {
use crate::{ use crate::{
hardware::HostCategory, hardware::HostCategory,
topology::{ topology::{
HostNetworkConfig, PreparationError, PreparationOutcome, SwitchError, SwitchPort, HostNetworkConfig, NetworkError, PreparationError, PreparationOutcome, SwitchError,
SwitchPort,
}, },
}; };
use std::{ use std::{
@ -290,15 +302,27 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn host_with_one_mac_address_should_create_bond_with_one_interface() { async fn should_setup_network_manager() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]); let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]); let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new(); let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await; let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap(); let network_manager_setup = topology.network_manager_setup.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![( assert_that!(*network_manager_setup).is_true();
}
#[tokio::test]
async fn host_with_one_mac_address_should_configure_bond_with_one_interface() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).contains_exactly(vec![(
HOST_ID.clone(), HOST_ID.clone(),
HostNetworkConfig { HostNetworkConfig {
host_id: HOST_ID.clone(), host_id: HOST_ID.clone(),
@ -311,7 +335,28 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn host_with_multiple_mac_addresses_should_create_one_bond_with_all_interfaces() { async fn host_with_one_mac_address_should_configure_port_channel_with_one_interface() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
}],
},
)]);
}
#[tokio::test]
async fn host_with_multiple_mac_addresses_should_configure_one_bond_with_all_interfaces() {
let score = given_score(vec![given_host( let score = given_score(vec![given_host(
&HOST_ID, &HOST_ID,
vec![ vec![
@ -323,8 +368,8 @@ mod tests {
let _ = score.interpret(&Inventory::empty(), &topology).await; let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap(); let config = topology.configured_bonds.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![( assert_that!(*config).contains_exactly(vec![(
HOST_ID.clone(), HOST_ID.clone(),
HostNetworkConfig { HostNetworkConfig {
host_id: HOST_ID.clone(), host_id: HOST_ID.clone(),
@ -343,7 +388,40 @@ mod tests {
} }
#[tokio::test] #[tokio::test]
async fn multiple_hosts_should_create_one_bond_per_host() { async fn host_with_multiple_mac_addresses_should_configure_one_port_channel_with_all_interfaces()
{
let score = given_score(vec![given_host(
&HOST_ID,
vec![
EXISTING_INTERFACE.clone(),
ANOTHER_EXISTING_INTERFACE.clone(),
],
)]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
},
SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
},
],
},
)]);
}
#[tokio::test]
async fn multiple_hosts_should_configure_one_bond_per_host() {
let score = given_score(vec![ let score = given_score(vec![
given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]), given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]),
given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]), given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]),
@ -352,8 +430,43 @@ mod tests {
let _ = score.interpret(&Inventory::empty(), &topology).await; let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap(); let config = topology.configured_bonds.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![ assert_that!(*config).contains_exactly(vec![
(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
}],
},
),
(
ANOTHER_HOST_ID.clone(),
HostNetworkConfig {
host_id: ANOTHER_HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
}],
},
),
]);
}
#[tokio::test]
async fn multiple_hosts_should_configure_one_port_channel_per_host() {
let score = given_score(vec![
given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]),
given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]),
]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).contains_exactly(vec![
( (
HOST_ID.clone(), HOST_ID.clone(),
HostNetworkConfig { HostNetworkConfig {
@ -384,8 +497,10 @@ mod tests {
let _ = score.interpret(&Inventory::empty(), &topology).await; let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap(); let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*configured_host_networks).is_empty(); assert_that!(*config).is_empty();
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).is_empty();
} }
fn given_score(hosts: Vec<PhysicalHost>) -> HostNetworkConfigurationScore { fn given_score(hosts: Vec<PhysicalHost>) -> HostNetworkConfigurationScore {
@ -422,26 +537,33 @@ mod tests {
} }
} }
#[derive(Debug)]
struct TopologyWithSwitch { struct TopologyWithSwitch {
available_ports: Arc<Mutex<Vec<PortLocation>>>, available_ports: Arc<Mutex<Vec<PortLocation>>>,
configured_host_networks: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>, configured_port_channels: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
switch_setup: Arc<Mutex<bool>>, switch_setup: Arc<Mutex<bool>>,
network_manager_setup: Arc<Mutex<bool>>,
configured_bonds: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
} }
impl TopologyWithSwitch { impl TopologyWithSwitch {
fn new() -> Self { fn new() -> Self {
Self { Self {
available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])), available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])),
configured_host_networks: Arc::new(Mutex::new(vec![])), configured_port_channels: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)), switch_setup: Arc::new(Mutex::new(false)),
network_manager_setup: Arc::new(Mutex::new(false)),
configured_bonds: Arc::new(Mutex::new(vec![])),
} }
} }
fn new_port_not_found() -> Self { fn new_port_not_found() -> Self {
Self { Self {
available_ports: Arc::new(Mutex::new(vec![])), available_ports: Arc::new(Mutex::new(vec![])),
configured_host_networks: Arc::new(Mutex::new(vec![])), configured_port_channels: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)), switch_setup: Arc::new(Mutex::new(false)),
network_manager_setup: Arc::new(Mutex::new(false)),
configured_bonds: Arc::new(Mutex::new(vec![])),
} }
} }
} }
@ -457,6 +579,22 @@ mod tests {
} }
} }
#[async_trait]
impl NetworkManager for TopologyWithSwitch {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
let mut network_manager_installed = self.network_manager_setup.lock().unwrap();
*network_manager_installed = true;
Ok(())
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
let mut configured_bonds = self.configured_bonds.lock().unwrap();
configured_bonds.push((config.host_id.clone(), config.clone()));
Ok(())
}
}
#[async_trait] #[async_trait]
impl Switch for TopologyWithSwitch { impl Switch for TopologyWithSwitch {
async fn setup_switch(&self) -> Result<(), SwitchError> { async fn setup_switch(&self) -> Result<(), SwitchError> {
@ -476,12 +614,12 @@ mod tests {
Ok(Some(ports.remove(0))) Ok(Some(ports.remove(0)))
} }
async fn configure_host_network( async fn configure_port_channel(
&self, &self,
config: &HostNetworkConfig, config: &HostNetworkConfig,
) -> Result<(), SwitchError> { ) -> Result<(), SwitchError> {
let mut configured_host_networks = self.configured_host_networks.lock().unwrap(); let mut configured_port_channels = self.configured_port_channels.lock().unwrap();
configured_host_networks.push((config.host_id.clone(), config.clone())); configured_port_channels.push((config.host_id.clone(), config.clone()));
Ok(()) Ok(())
} }