use async_trait::async_trait;
use brocade::PortOperatingMode;
use harmony_macros::ip;
use harmony_types::{
    id::Id,
    net::{MacAddress, Url},
    switch::PortLocation,
};
use log::debug;
use log::info;

use crate::{data::FileContent, executors::ExecutorError, topology::node_exporter::NodeExporter};
use crate::{infra::network_manager::OpenShiftNmStateNetworkManager, topology::PortConfig};
use crate::{modules::inventory::HarmonyDiscoveryStrategy, topology::PxeOptions};

use super::{
    DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
    HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
    NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
    SwitchError, TftpServer, Topology, k8s::K8sClient,
};

use std::sync::{Arc, OnceLock};

/// Topology that bundles one handle per infrastructure service (router, load
/// balancer, DHCP, DNS, ...) together with the hosts of a cluster.
///
/// Each service trait implemented below forwards to the matching field.
//
// NOTE(review): the generic arguments in this text look stripped (`Arc,`,
// `Vec,`, `Option,`, `OnceLock>`); they are kept exactly as found — restore
// them from version control rather than guessing.
#[derive(Debug, Clone)]
pub struct HAClusterTopology {
    pub domain_name: String,
    pub router: Arc,
    pub load_balancer: Arc,
    pub firewall: Arc,
    pub dhcp_server: Arc,
    pub tftp_server: Arc,
    pub http_server: Arc,
    pub dns_server: Arc,
    pub node_exporter: Arc,
    pub switch_client: Arc,
    pub bootstrap_host: LogicalHost,
    pub control_plane: Vec,
    pub workers: Vec,
    pub kubeconfig: Option,
    /// Lazily created by [`HAClusterTopology::network_manager`].
    pub network_manager: OnceLock>,
}

#[async_trait]
impl Topology for HAClusterTopology {
    fn name(&self) -> &str {
        "HAClusterTopology"
    }

    /// Currently a no-op: logs a debug message and reports `PreparationOutcome::Noop`.
    async fn ensure_ready(&self) -> Result {
        debug!(
            "ensure_ready, not entirely sure what it should do here, probably something like verify that the hosts are reachable and all services are up and ready."
        );
        Ok(PreparationOutcome::Noop)
    }
}

#[async_trait]
impl K8sclient for HAClusterTopology {
    /// Builds a Kubernetes client.
    ///
    /// Uses `K8sClient::from_kubeconfig` when `kubeconfig` is set and
    /// `K8sClient::try_default` otherwise. Failures are reported as a `String`.
    async fn k8s_client(&self) -> Result, String> {
        match &self.kubeconfig {
            None => Ok(Arc::new(
                K8sClient::try_default().await.map_err(|e| e.to_string())?,
            )),
            Some(kubeconfig) => {
                let Some(client) = K8sClient::from_kubeconfig(kubeconfig).await else {
                    return Err("Failed to create k8s client".to_string());
                };
                Ok(Arc::new(client))
            }
        }
    }
}

impl HAClusterTopology {
    // TODO this is a hack to avoid refactoring
    /// Returns the part of `domain_name` that precedes the first `.`.
    ///
    /// `split` always yields at least one item, so a domain name without any
    /// dot is returned whole and an empty one yields an empty string.
    pub fn get_cluster_name(&self) -> String {
        self.domain_name
            .split('.')
            .next()
            .expect("Cluster domain name must not be empty")
            .to_string()
    }

    /// Returns `domain_name` with the cluster name and the `.` that follows it
    /// (when present) removed.
    pub fn get_cluster_base_domain(&self) -> String {
        let base_domain = self
            .domain_name
            .strip_prefix(&self.get_cluster_name())
            .expect("cluster domain must start with cluster name");

        base_domain
            .strip_prefix('.')
            .unwrap_or(base_domain)
            .to_string()
    }

    /// Returns the network manager, creating it on first use.
    ///
    /// Once the manager is cached it is returned directly, so no Kubernetes
    /// client is built (and none can fail) on later calls.
    ///
    /// # Panics
    ///
    /// Panics if the manager has not been created yet and `k8s_client()` fails.
    pub async fn network_manager(&self) -> &dyn NetworkManager {
        // Fast path: a client is only needed to construct the manager.
        if let Some(manager) = self.network_manager.get() {
            return manager.as_ref();
        }

        let k8s_client = self
            .k8s_client()
            .await
            .expect("failed to create the k8s client required by the network manager");

        // `get_or_init` keeps a single instance even if several callers reach
        // this point before the cell is populated.
        self.network_manager
            .get_or_init(|| Arc::new(OpenShiftNmStateNetworkManager::new(k8s_client)))
            .as_ref()
    }

    /// Builds a placeholder topology in which every service is backed by
    /// `DummyInfra` and the host lists are empty.
    pub fn autoload() -> Self {
        let dummy_infra = Arc::new(DummyInfra {});
        let dummy_host = LogicalHost {
            ip: ip!("0.0.0.0"),
            name: "dummyhost".to_string(),
        };

        Self {
            kubeconfig: None,
            domain_name: "DummyTopology".to_string(),
            router: dummy_infra.clone(),
            load_balancer: dummy_infra.clone(),
            firewall: dummy_infra.clone(),
            dhcp_server: dummy_infra.clone(),
            tftp_server: dummy_infra.clone(),
            http_server: dummy_infra.clone(),
            dns_server: dummy_infra.clone(),
            node_exporter: dummy_infra.clone(),
            switch_client: dummy_infra.clone(),
            bootstrap_host: dummy_host,
            control_plane: vec![],
            workers: vec![],
            network_manager: OnceLock::new(),
        }
    }
}

// Every method forwards to `self.dns_server`.
#[async_trait]
impl DnsServer for HAClusterTopology {
    async fn register_dhcp_leases(&self, register: bool) -> Result<(), ExecutorError> {
        self.dns_server.register_dhcp_leases(register).await
    }
    async fn register_hosts(&self, hosts: Vec) -> Result<(), ExecutorError> {
        self.dns_server.register_hosts(hosts).await
    }
    fn remove_record(&self, name: &str, record_type: DnsRecordType) -> Result<(), ExecutorError> {
        self.dns_server.remove_record(name, record_type)
    }
    async fn list_records(&self) -> Vec {
        self.dns_server.list_records().await
    }
    fn get_ip(&self) -> IpAddress {
        self.dns_server.get_ip()
    }
    fn get_host(&self) -> LogicalHost {
        self.dns_server.get_host()
    }
    async fn commit_config(&self) -> Result<(), ExecutorError> {
        self.dns_server.commit_config().await
    }
}

// Every method forwards to `self.load_balancer`.
#[async_trait]
impl LoadBalancer for HAClusterTopology {
    fn get_ip(&self) -> IpAddress {
        self.load_balancer.get_ip()
    }
    fn get_host(&self) -> LogicalHost {
        self.load_balancer.get_host()
    }
    async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
        self.load_balancer.add_service(service).await
    }
    async fn remove_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
        self.load_balancer.remove_service(service).await
    }
    async fn list_services(&self) -> Vec {
        self.load_balancer.list_services().await
    }
    async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
        self.load_balancer.ensure_initialized().await
    }
    async fn commit_config(&self) -> Result<(), ExecutorError> {
        self.load_balancer.commit_config().await
    }
    async fn reload_restart(&self) -> Result<(), ExecutorError> {
        self.load_balancer.reload_restart().await
    }
}

// Every method forwards to `self.dhcp_server`.
#[async_trait]
impl DhcpServer for HAClusterTopology {
    async fn add_static_mapping(&self, entry: &DHCPStaticEntry) -> Result<(), ExecutorError> {
        self.dhcp_server.add_static_mapping(entry).await
    }
    async fn remove_static_mapping(&self, mac: &MacAddress) -> Result<(), ExecutorError> {
        self.dhcp_server.remove_static_mapping(mac).await
    }
    async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)> {
        self.dhcp_server.list_static_mappings().await
    }
    async fn set_pxe_options(&self, options: PxeOptions) -> Result<(), ExecutorError> {
        self.dhcp_server.set_pxe_options(options).await
    }
    async fn set_dhcp_range(
        &self,
        start: &IpAddress,
        end: &IpAddress,
    ) -> Result<(), ExecutorError> {
        self.dhcp_server.set_dhcp_range(start, end).await
    }
    fn get_ip(&self) -> IpAddress {
        self.dhcp_server.get_ip()
    }
    fn get_host(&self) -> LogicalHost {
        self.dhcp_server.get_host()
    }
    async fn commit_config(&self) -> Result<(), ExecutorError> {
        self.dhcp_server.commit_config().await
    }
}

// Every method forwards to `self.tftp_server`.
#[async_trait]
impl TftpServer for HAClusterTopology {
    async fn serve_files(&self, url: &Url) -> Result<(), ExecutorError> {
        self.tftp_server.serve_files(url).await
    }
    fn get_ip(&self) -> IpAddress {
        self.tftp_server.get_ip()
    }
    async fn set_ip(&self, ip: IpAddress) -> Result<(), ExecutorError> {
        self.tftp_server.set_ip(ip).await
    }
    async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
        self.tftp_server.ensure_initialized().await
    }
    async fn commit_config(&self) -> Result<(), ExecutorError> {
        self.tftp_server.commit_config().await
    }
    async fn reload_restart(&self) -> Result<(), ExecutorError> {
        self.tftp_server.reload_restart().await
    }
}

// Every method forwards to `self.router`.
impl Router for HAClusterTopology {
    fn get_gateway(&self) -> super::IpAddress {
        self.router.get_gateway()
    }
    fn get_cidr(&self) -> cidr::Ipv4Cidr {
        self.router.get_cidr()
    }
    fn get_host(&self) -> LogicalHost {
        self.router.get_host()
    }
}

// Every method forwards to `self.http_server`.
#[async_trait]
impl HttpServer for HAClusterTopology {
    async fn serve_files(
        &self,
        url: &Url,
        remote_path: &Option,
    ) -> Result<(), ExecutorError> {
        self.http_server.serve_files(url, remote_path).await
    }
    async fn serve_file_content(&self, file: &FileContent) -> Result<(), ExecutorError> {
        self.http_server.serve_file_content(file).await
    }
    fn get_ip(&self) -> IpAddress {
        self.http_server.get_ip()
    }
    async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
        self.http_server.ensure_initialized().await
    }
    async fn commit_config(&self) -> Result<(), ExecutorError> {
        self.http_server.commit_config().await
    }
    async fn reload_restart(&self) -> Result<(), ExecutorError> {
        self.http_server.reload_restart().await
    }
}

// Attribute for the `impl Switch for HAClusterTopology` block that follows.
#[async_trait]
impl Switch for HAClusterTopology { async fn setup_switch(&self) -> Result<(), SwitchError> { self.switch_client.setup().await.map(|_| ()) } async fn get_port_for_mac_address( &self, mac_address: &MacAddress, ) -> Result, SwitchError> { self.switch_client.find_port(mac_address).await } async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { debug!("Configuring port channel: {config:#?}"); let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect(); self.switch_client .configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports) .await .map_err(|e| SwitchError::new(format!("Failed to configure port-channel: {e}")))?; Ok(()) } async fn clear_port_channel(&self, ids: &Vec) -> Result<(), SwitchError> { todo!() } async fn configure_interface(&self, ports: &Vec) -> Result<(), SwitchError> { todo!() } } #[async_trait] impl NetworkManager for HAClusterTopology { async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> { self.network_manager() .await .ensure_network_manager_installed() .await } async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> { self.network_manager().await.configure_bond(config).await } //TODO add snmp here } #[async_trait] impl NodeExporter for HAClusterTopology { async fn ensure_initialized(&self) -> Result<(), ExecutorError> { self.node_exporter.ensure_initialized().await } async fn commit_config(&self) -> Result<(), ExecutorError> { self.node_exporter.commit_config().await } async fn reload_restart(&self) -> Result<(), ExecutorError> { self.node_exporter.reload_restart().await } } #[derive(Debug)] pub struct DummyInfra; #[async_trait] impl Topology for DummyInfra { fn name(&self) -> &str { "DummyInfra" } async fn ensure_ready(&self) -> Result { let dummy_msg = "This is a dummy infrastructure that does nothing"; info!("{dummy_msg}"); Ok(PreparationOutcome::Success { details: dummy_msg.into(), }) } } const 
UNIMPLEMENTED_DUMMY_INFRA: &str = "This is a dummy infrastructure, no operation is supported"; impl Router for DummyInfra { fn get_gateway(&self) -> super::IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_cidr(&self) -> cidr::Ipv4Cidr { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_host(&self) -> LogicalHost { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } impl Firewall for DummyInfra { fn add_rule( &mut self, _rule: super::FirewallRule, ) -> Result<(), crate::executors::ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn remove_rule(&mut self, _rule_id: &str) -> Result<(), crate::executors::ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn list_rules(&self) -> Vec { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_ip(&self) -> super::IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_host(&self) -> LogicalHost { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } #[async_trait] impl DhcpServer for DummyInfra { async fn add_static_mapping(&self, _entry: &DHCPStaticEntry) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn remove_static_mapping(&self, _mac: &MacAddress) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn set_pxe_options(&self, _options: PxeOptions) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn set_dhcp_range( &self, _start: &IpAddress, _end: &IpAddress, ) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_ip(&self) -> IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_host(&self) -> LogicalHost { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn commit_config(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } 
#[async_trait] impl LoadBalancer for DummyInfra { fn get_ip(&self) -> IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_host(&self) -> LogicalHost { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn add_service(&self, _service: &LoadBalancerService) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn remove_service(&self, _service: &LoadBalancerService) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn list_services(&self) -> Vec { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn ensure_initialized(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn commit_config(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn reload_restart(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } #[async_trait] impl TftpServer for DummyInfra { async fn serve_files(&self, _url: &Url) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_ip(&self) -> IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn set_ip(&self, _ip: IpAddress) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn ensure_initialized(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn commit_config(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn reload_restart(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } #[async_trait] impl HttpServer for DummyInfra { async fn serve_files( &self, _url: &Url, _remote_path: &Option, ) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn serve_file_content(&self, _file: &FileContent) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_ip(&self) -> 
IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn ensure_initialized(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn commit_config(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn reload_restart(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } #[async_trait] impl DnsServer for DummyInfra { async fn register_dhcp_leases(&self, _register: bool) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn register_hosts(&self, _hosts: Vec) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn remove_record(&self, _name: &str, _record_type: DnsRecordType) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn list_records(&self) -> Vec { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_ip(&self) -> IpAddress { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } fn get_host(&self) -> LogicalHost { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn commit_config(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } #[async_trait] impl NodeExporter for DummyInfra { async fn ensure_initialized(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn commit_config(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn reload_restart(&self) -> Result<(), ExecutorError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } #[async_trait] impl SwitchClient for DummyInfra { async fn setup(&self) -> Result<(), SwitchError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn find_port( &self, _mac_address: &MacAddress, ) -> Result, SwitchError> { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn configure_port_channel( &self, _channel_name: &str, _switch_ports: Vec, ) -> Result { 
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } async fn clear_port_channel(&self, ids: &Vec) -> Result<(), SwitchError> { todo!() } async fn configure_interface(&self, ports: &Vec) -> Result<(), SwitchError> { todo!() } }