From f073b7e5fb15bf7da0d4fb9fdcee60aa665c854c Mon Sep 17 00:00:00 2001 From: Willem Date: Wed, 24 Sep 2025 13:28:46 -0400 Subject: [PATCH 1/8] feat:added k8s flavour to k8s_aywhere topology to be able to get the type of cluster --- harmony/src/domain/topology/k8s.rs | 14 ++++- harmony/src/domain/topology/k8s_anywhere.rs | 62 +++++++++++++++++++++ 2 files changed, 75 insertions(+), 1 deletion(-) diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 88bd2e8..144533c 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -2,9 +2,10 @@ use derive_new::new; use k8s_openapi::{ ClusterResourceScope, NamespaceResourceScope, api::{apps::v1::Deployment, core::v1::Pod}, + apimachinery::pkg::version::Info, }; use kube::{ - Client, Config, Error, Resource, + Client, Config, Discovery, Error, Resource, api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, config::{KubeConfigOptions, Kubeconfig}, core::ErrorResponse, @@ -53,6 +54,17 @@ impl K8sClient { }) } + pub async fn get_apiserver_version(&self) -> Result { + let client: Client = self.client.clone(); + let version_info: Info = client.apiserver_version().await?; + Ok(version_info) + } + + pub async fn discovery(&self) -> Result { + let discovery: Discovery = Discovery::new(self.client.clone()).run().await?; + Ok(discovery) + } + pub async fn get_resource_json_value( &self, name: &str, diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index 53b6436..e6c37ea 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -47,6 +47,13 @@ struct K8sState { message: String, } +#[derive(Debug, Clone)] +pub enum K8sFlavour { + Okd, + K3d, + K8s, +} + #[derive(Debug, Clone)] enum K8sSource { LocalK3d, @@ -57,6 +64,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: Arc>>, tenant_manager: Arc>, + flavour: Arc>, config: Arc, } @@ -162,6 +170,7 @@ impl K8sAnywhereTopology { Self { k8s_state: Arc::new(OnceCell::new()), tenant_manager: Arc::new(OnceCell::new()), + flavour: Arc::new(OnceCell::new()), config: Arc::new(K8sAnywhereConfig::from_env()), } } @@ -170,10 +179,63 @@ impl K8sAnywhereTopology { Self { k8s_state: Arc::new(OnceCell::new()), tenant_manager: Arc::new(OnceCell::new()), + flavour: Arc::new(OnceCell::new()), config: Arc::new(config), } } + pub async fn get_k8s_flavour(&self) -> K8sFlavour { + self.flavour + .get_or_try_init(async || { + let client = self.k8s_client().await.unwrap(); + + let discovery = client.discovery().await.map_err(|e| { + PreparationError::new(format!("Could not discover API groups: {}", e)) + })?; + + let version = client.get_apiserver_version().await.map_err(|e| { + PreparationError::new(format!("Could not get server version: {}", e)) + })?; + + let rules: &[&dyn Fn() -> Option] = &[ + // OpenShift / OKD + &|| { + discovery + .groups() + .any(|g| g.name().ends_with("openshift.io")) + .then_some(K8sFlavour::Okd) + }, + // K3d / K3s + &|| { + version + .git_version + .contains("k3s") + .then_some(K8sFlavour::K3d) + }, + // Vanilla Kubernetes + &|| { + if !discovery + .groups() + .any(|g| g.name().ends_with("openshift.io")) + && !version.git_version.contains("k3s") + && !version.git_version.contains("k3d") + { + Some(K8sFlavour::K8s) + } else { + None + } + }, + ]; + + rules.iter().find_map(|rule| rule()).ok_or_else(|| { + PreparationError::new("Unknown Kubernetes cluster flavour".to_string()) + }) + }) + .await + 
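+ // get_or_try_init caches the detected flavour in this OnceCell on success,
+ // so detection normally runs only once per topology.
+ // NOTE: unwrap() below panics when the flavour cannot be determined;
+ // PATCH 3/8 in this series switches this to returning the Result instead.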
.unwrap() + .clone() + } + async fn get_cluster_observability_operator_prometheus_application_score( &self, sender: RHOBObservability, From 1cec398d4d4185adec39273e2b329a20da00f6f4 Mon Sep 17 00:00:00 2001 From: Willem Date: Mon, 29 Sep 2025 11:29:34 -0400 Subject: [PATCH 2/8] fix: modifed naming scheme to OpenshiftFamily, K3sFamily, and defaultswitched discovery of openshiftfamily to look for projet.openshift.io --- harmony/src/domain/topology/k8s_anywhere.rs | 28 +++++++++++---------- 1 file changed, 15 insertions(+), 13 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index e6c37ea..758198f 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -48,10 +48,10 @@ struct K8sState { } #[derive(Debug, Clone)] -pub enum K8sFlavour { - Okd, - K3d, - K8s, +pub enum KubernetesDistribution { + OpenshiftFamily, + K3sFamily, + Default, } #[derive(Debug, Clone)] @@ -64,7 +64,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: Arc>>, tenant_manager: Arc>, - flavour: Arc>, + flavour: Arc>, config: Arc, } @@ -184,7 +184,7 @@ impl K8sAnywhereTopology { } } - pub async fn get_k8s_flavour(&self) -> K8sFlavour { + pub async fn get_k8s_distribution(&self) -> KubernetesDistribution { self.flavour .get_or_try_init(async || { let client = self.k8s_client().await.unwrap(); @@ -197,22 +197,22 @@ impl K8sAnywhereTopology { PreparationError::new(format!("Could not get server version: {}", e)) })?; - let rules: &[&dyn Fn() -> Option] = &[ + let rules: &[&dyn Fn() -> Option] = &[ // OpenShift / OKD &|| { discovery .groups() - .any(|g| g.name().ends_with("openshift.io")) - .then_some(K8sFlavour::Okd) + .any(|g| g.name() == "project.openshift.io") + .then_some(KubernetesDistribution::OpenshiftFamily) }, // K3d / K3s &|| { version .git_version .contains("k3s") - .then_some(K8sFlavour::K3d) + .then_some(KubernetesDistribution::K3sFamily) }, - // Vanilla Kubernetes + // Fallback Kubernetes K8s &|| { if !discovery .groups() @@ -220,7 +220,7 @@ impl K8sAnywhereTopology { && !version.git_version.contains("k3s") && !version.git_version.contains("k3d") { - Some(K8sFlavour::K8s) + Some(KubernetesDistribution::Default) } else { None } @@ -228,7 +228,9 @@ impl K8sAnywhereTopology { ]; rules.iter().find_map(|rule| rule()).ok_or_else(|| { - PreparationError::new("Unknown Kubernetes cluster flavour".to_string()) + PreparationError::new( + "Unable to detect Kubernetes cluster distribution".to_string(), + ) }) }) .await From 2d3c32469c49107bf0e4a625b8e6f39afe27740c Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Tue, 30 Sep 2025 22:59:50 -0400 Subject: [PATCH 3/8] chore: Simplify k8s flavour detection algorithm and do not unwrap when it cannot be detected, just return Err --- harmony/src/domain/topology/k8s_anywhere.rs | 51 ++++++--------------- 1 file changed, 14 insertions(+), 37 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index 758198f..5e448b8 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -184,7 +184,7 @@ impl K8sAnywhereTopology { } } - pub async fn get_k8s_distribution(&self) -> KubernetesDistribution { + pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> { self.flavour .get_or_try_init(async || { let client = self.k8s_client().await.unwrap(); @@ -197,45 +197,22 @@ impl K8sAnywhereTopology { 
PreparationError::new(format!("Could not get server version: {}", e)) })?; - let rules: &[&dyn Fn() -> Option] = &[ - // OpenShift / OKD - &|| { - discovery - .groups() - .any(|g| g.name() == "project.openshift.io") - .then_some(KubernetesDistribution::OpenshiftFamily) - }, - // K3d / K3s - &|| { - version - .git_version - .contains("k3s") - .then_some(KubernetesDistribution::K3sFamily) - }, - // Fallback Kubernetes K8s - &|| { - if !discovery - .groups() - .any(|g| g.name().ends_with("openshift.io")) - && !version.git_version.contains("k3s") - && !version.git_version.contains("k3d") - { - Some(KubernetesDistribution::Default) - } else { - None - } - }, - ]; + // OpenShift / OKD + if discovery + .groups() + .any(|g| g.name() == "project.openshift.io") + { + return Ok(KubernetesDistribution::OpenshiftFamily); + } - rules.iter().find_map(|rule| rule()).ok_or_else(|| { - PreparationError::new( - "Unable to detect Kubernetes cluster distribution".to_string(), - ) - }) + // K3d / K3s + if version.git_version.contains("k3s") { + return Ok(KubernetesDistribution::K3sFamily); + } + + return Ok(KubernetesDistribution::Default); }) .await - .unwrap() - .clone() } async fn get_cluster_observability_operator_prometheus_application_score( From ed7f81aa1f1dcc93e65a586c87c1f2c520740b0a Mon Sep 17 00:00:00 2001 From: Ian Letourneau Date: Mon, 20 Oct 2025 19:18:49 +0000 Subject: [PATCH 4/8] fix(opnsense-config): ensure load balancer service configuration is idempotent (#129) The previous implementation blindly added HAProxy components without checking for existing configurations on the same port, which caused duplicate entries and errors when a service was updated. This commit refactors the logic to a robust "remove-then-add" strategy. The configure_service method now finds and removes any existing frontend and its dependent components (backend, servers, health check) before adding the new, complete service definition. This change makes the process fully idempotent, preventing configuration drift and ensuring a predictable state. 
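In short, each apply now rebuilds the service from scratch instead of merging
into whatever is already configured. A rough sketch of the flow (abbreviated;
the real implementation is configure_service in
opnsense-config/src/modules/load_balancer.rs below):

    fn configure_service(&mut self, frontend: Frontend, backend: HAProxyBackend,
                         servers: Vec<HAProxyServer>, healthcheck: Option<HAProxyHealthCheck>) {
        // 1. Cascading delete of any service already bound to this address:
        //    frontend -> default backend -> health check -> linked servers.
        self.remove_service_by_bind_address(&frontend.bind);
        // 2. Drop real servers whose names collide with the incoming ones.
        self.remove_servers(&servers);
        // 3. Add the complete, fresh service definition.
        self.add_new_service(frontend, backend, servers, healthcheck);
    }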
Co-authored-by: Ian Letourneau Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/129 --- Cargo.lock | 3 + Cargo.toml | 3 +- harmony/src/domain/topology/load_balancer.rs | 8 +- harmony/src/infra/opnsense/load_balancer.rs | 53 ++- .../modules/okd/bootstrap_load_balancer.rs | 2 + opnsense-config-xml/src/data/haproxy.rs | 16 +- opnsense-config/Cargo.toml | 1 + opnsense-config/src/config/manager/ssh.rs | 3 +- opnsense-config/src/config/shell/mod.rs | 6 +- opnsense-config/src/modules/load_balancer.rs | 332 +++++++++++++++++- 10 files changed, 360 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0b5972f..429f09b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1918,6 +1918,8 @@ dependencies = [ "env_logger", "harmony", "harmony_macros", + "harmony_secret", + "harmony_secret_derive", "harmony_tui", "harmony_types", "log", @@ -3918,6 +3920,7 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" name = "opnsense-config" version = "0.1.0" dependencies = [ + "assertor", "async-trait", "chrono", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index a10bf81..a256234 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,8 @@ members = [ "harmony_inventory_agent", "harmony_secret_derive", "harmony_secret", - "adr/agent_discovery/mdns", "brocade", + "adr/agent_discovery/mdns", + "brocade", ] [workspace.package] diff --git a/harmony/src/domain/topology/load_balancer.rs b/harmony/src/domain/topology/load_balancer.rs index 901602b..59c5add 100644 --- a/harmony/src/domain/topology/load_balancer.rs +++ b/harmony/src/domain/topology/load_balancer.rs @@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync { &self, service: &LoadBalancerService, ) -> Result<(), ExecutorError> { - debug!( - "Listing LoadBalancer services {:?}", - self.list_services().await - ); - if !self.list_services().await.contains(service) { - self.add_service(service).await?; - } + self.add_service(service).await?; Ok(()) } } diff --git a/harmony/src/infra/opnsense/load_balancer.rs b/harmony/src/infra/opnsense/load_balancer.rs index ce47f05..3df7511 100644 --- a/harmony/src/infra/opnsense/load_balancer.rs +++ b/harmony/src/infra/opnsense/load_balancer.rs @@ -26,19 +26,13 @@ impl LoadBalancer for OPNSenseFirewall { } async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> { - warn!( - "TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. 
Make sure to manually verify that the configuration is correct after executing any operation here" - ); let mut config = self.opnsense_config.write().await; + let mut load_balancer = config.load_balancer(); + let (frontend, backend, servers, healthcheck) = harmony_load_balancer_service_to_haproxy_xml(service); - let mut load_balancer = config.load_balancer(); - load_balancer.add_backend(backend); - load_balancer.add_frontend(frontend); - load_balancer.add_servers(servers); - if let Some(healthcheck) = healthcheck { - load_balancer.add_healthcheck(healthcheck); - } + + load_balancer.configure_service(frontend, backend, servers, healthcheck); Ok(()) } @@ -106,7 +100,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer( .backends .backends .iter() - .find(|b| b.uuid == frontend.default_backend); + .find(|b| Some(b.uuid.clone()) == frontend.default_backend); let mut health_check = None; match matching_backend { @@ -116,8 +110,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer( } None => { warn!( - "HAProxy config could not find a matching backend for frontend {:?}", - frontend + "HAProxy config could not find a matching backend for frontend {frontend:?}" ); } } @@ -152,11 +145,11 @@ pub(crate) fn get_servers_for_backend( .servers .iter() .filter_map(|server| { + let address = server.address.clone()?; + let port = server.port?; + if backend_servers.contains(&server.uuid.as_str()) { - return Some(BackendServer { - address: server.address.clone(), - port: server.port, - }); + return Some(BackendServer { address, port }); } None }) @@ -347,7 +340,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml( name: format!("frontend_{}", service.listening_port), bind: service.listening_port.to_string(), mode: "tcp".to_string(), // TODO do not depend on health check here - default_backend: backend.uuid.clone(), + default_backend: Some(backend.uuid.clone()), ..Default::default() }; info!("HAPRoxy frontend and backend mode currently hardcoded to tcp"); @@ -361,8 +354,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer { uuid: Uuid::new_v4().to_string(), name: format!("{}_{}", &server.address, &server.port), enabled: 1, - address: server.address.clone(), - port: server.port, + address: Some(server.address.clone()), + port: Some(server.port), mode: "active".to_string(), server_type: "static".to_string(), ..Default::default() @@ -385,8 +378,8 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "192.168.1.1".to_string(), - port: 80, + address: Some("192.168.1.1".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); @@ -411,8 +404,8 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "192.168.1.1".to_string(), - port: 80, + address: Some("192.168.1.1".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); @@ -431,8 +424,8 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "192.168.1.1".to_string(), - port: 80, + address: Some("192.168.1.1".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); @@ -453,16 +446,16 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "some-hostname.test.mcd".to_string(), - port: 80, + address: Some("some-hostname.test.mcd".to_string()), + 
port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); let server = HAProxyServer { uuid: "server2".to_string(), - address: "192.168.1.2".to_string(), - port: 8080, + address: Some("192.168.1.2".to_string()), + port: Some(8080), ..Default::default() }; haproxy.servers.servers.push(server); diff --git a/harmony/src/modules/okd/bootstrap_load_balancer.rs b/harmony/src/modules/okd/bootstrap_load_balancer.rs index ccc69c9..e99fe97 100644 --- a/harmony/src/modules/okd/bootstrap_load_balancer.rs +++ b/harmony/src/modules/okd/bootstrap_load_balancer.rs @@ -77,6 +77,8 @@ impl OKDBootstrapLoadBalancerScore { address: topology.bootstrap_host.ip.to_string(), port, }); + + backend.dedup(); backend } } diff --git a/opnsense-config-xml/src/data/haproxy.rs b/opnsense-config-xml/src/data/haproxy.rs index ef631f3..b0aedc2 100644 --- a/opnsense-config-xml/src/data/haproxy.rs +++ b/opnsense-config-xml/src/data/haproxy.rs @@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId { } } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Clone)] pub struct HAProxyId(String); impl Default for HAProxyId { @@ -297,7 +297,7 @@ pub struct HAProxyFrontends { pub frontend: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct Frontend { #[yaserde(attribute = true)] pub uuid: String, @@ -310,7 +310,7 @@ pub struct Frontend { pub bind_options: MaybeString, pub mode: String, #[yaserde(rename = "defaultBackend")] - pub default_backend: String, + pub default_backend: Option, pub ssl_enabled: i32, pub ssl_certificates: MaybeString, pub ssl_default_certificate: MaybeString, @@ -416,7 +416,7 @@ pub struct HAProxyBackends { pub backends: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct HAProxyBackend { #[yaserde(attribute = true, rename = "uuid")] pub uuid: String, @@ -535,7 +535,7 @@ pub struct HAProxyServers { pub servers: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct HAProxyServer { #[yaserde(attribute = true, rename = "uuid")] pub uuid: String, @@ -543,8 +543,8 @@ pub struct HAProxyServer { pub enabled: u8, pub name: String, pub description: MaybeString, - pub address: String, - pub port: u16, + pub address: Option, + pub port: Option, pub checkport: MaybeString, pub mode: String, pub multiplexer_protocol: MaybeString, @@ -589,7 +589,7 @@ pub struct HAProxyHealthChecks { pub healthchecks: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct HAProxyHealthCheck { #[yaserde(attribute = true)] pub uuid: String, diff --git a/opnsense-config/Cargo.toml b/opnsense-config/Cargo.toml index 0580cb2..bb682df 100644 --- a/opnsense-config/Cargo.toml +++ b/opnsense-config/Cargo.toml @@ -25,6 +25,7 @@ sha2 = "0.10.9" [dev-dependencies] pretty_assertions.workspace = true +assertor.workspace = true [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] } diff --git a/opnsense-config/src/config/manager/ssh.rs b/opnsense-config/src/config/manager/ssh.rs index 4b2fe64..afe232f 100644 --- a/opnsense-config/src/config/manager/ssh.rs +++ b/opnsense-config/src/config/manager/ssh.rs @@ -30,8 +30,7 @@ impl SshConfigManager { self.opnsense_shell .exec(&format!( - "cp /conf/config.xml 
/conf/backup/{}", - backup_filename + "cp /conf/config.xml /conf/backup/{backup_filename}" )) .await } diff --git a/opnsense-config/src/config/shell/mod.rs b/opnsense-config/src/config/shell/mod.rs index aa03837..aa94ff1 100644 --- a/opnsense-config/src/config/shell/mod.rs +++ b/opnsense-config/src/config/shell/mod.rs @@ -1,9 +1,7 @@ mod ssh; -pub use ssh::*; - -use async_trait::async_trait; - use crate::Error; +use async_trait::async_trait; +pub use ssh::*; #[async_trait] pub trait OPNsenseShell: std::fmt::Debug + Send + Sync { diff --git a/opnsense-config/src/modules/load_balancer.rs b/opnsense-config/src/modules/load_balancer.rs index 6c71ed4..00cb364 100644 --- a/opnsense-config/src/modules/load_balancer.rs +++ b/opnsense-config/src/modules/load_balancer.rs @@ -1,11 +1,8 @@ -use std::sync::Arc; - -use log::warn; +use crate::{config::OPNsenseShell, Error}; use opnsense_config_xml::{ Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, OPNsense, }; - -use crate::{config::OPNsenseShell, Error}; +use std::{collections::HashSet, sync::Arc}; pub struct LoadBalancerConfig<'a> { opnsense: &'a mut OPNsense, @@ -31,7 +28,7 @@ impl<'a> LoadBalancerConfig<'a> { match &mut self.opnsense.opnsense.haproxy.as_mut() { Some(haproxy) => f(haproxy), None => unimplemented!( - "Adding a backend is not supported when haproxy config does not exist yet" + "Cannot configure load balancer when haproxy config does not exist yet" ), } } @@ -40,21 +37,67 @@ impl<'a> LoadBalancerConfig<'a> { self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32); } - pub fn add_backend(&mut self, backend: HAProxyBackend) { - warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks"); - self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend)); + /// Configures a service by removing any existing service on the same port + /// and then adding the new definition. This ensures idempotency. + pub fn configure_service( + &mut self, + frontend: Frontend, + backend: HAProxyBackend, + servers: Vec, + healthcheck: Option, + ) { + self.remove_service_by_bind_address(&frontend.bind); + self.remove_servers(&servers); + + self.add_new_service(frontend, backend, servers, healthcheck); } - pub fn add_frontend(&mut self, frontend: Frontend) { - self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend)); + // Remove the corresponding real servers based on their name if they already exist. + fn remove_servers(&mut self, servers: &[HAProxyServer]) { + let server_names: HashSet<_> = servers.iter().map(|s| s.name.clone()).collect(); + self.with_haproxy(|haproxy| { + haproxy + .servers + .servers + .retain(|s| !server_names.contains(&s.name)); + }); } - pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) { - self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck)); + /// Removes a service and its dependent components based on the frontend's bind address. + /// This performs a cascading delete of the frontend, backend, servers, and health check. 
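+ /// If no frontend is bound to the given address, this is a no-op.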
+ fn remove_service_by_bind_address(&mut self, bind_address: &str) { + self.with_haproxy(|haproxy| { + let Some(old_frontend) = remove_frontend_by_bind_address(haproxy, bind_address) else { + return; + }; + + let Some(old_backend) = remove_backend(haproxy, old_frontend) else { + return; + }; + + remove_healthcheck(haproxy, &old_backend); + remove_linked_servers(haproxy, &old_backend); + }); } - pub fn add_servers(&mut self, mut servers: Vec) { - self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers)); + /// Adds the components of a new service to the HAProxy configuration. + /// This function de-duplicates servers by name to prevent configuration errors. + fn add_new_service( + &mut self, + frontend: Frontend, + backend: HAProxyBackend, + servers: Vec, + healthcheck: Option, + ) { + self.with_haproxy(|haproxy| { + if let Some(check) = healthcheck { + haproxy.healthchecks.healthchecks.push(check); + } + + haproxy.servers.servers.extend(servers); + haproxy.backends.backends.push(backend); + haproxy.frontends.frontend.push(frontend); + }); } pub async fn reload_restart(&self) -> Result<(), Error> { @@ -82,3 +125,262 @@ impl<'a> LoadBalancerConfig<'a> { Ok(()) } } + +fn remove_frontend_by_bind_address(haproxy: &mut HAProxy, bind_address: &str) -> Option { + let pos = haproxy + .frontends + .frontend + .iter() + .position(|f| f.bind == bind_address); + + match pos { + Some(pos) => Some(haproxy.frontends.frontend.remove(pos)), + None => None, + } +} + +fn remove_backend(haproxy: &mut HAProxy, old_frontend: Frontend) -> Option { + let default_backend = old_frontend.default_backend?; + let pos = haproxy + .backends + .backends + .iter() + .position(|b| b.uuid == default_backend); + + match pos { + Some(pos) => Some(haproxy.backends.backends.remove(pos)), + None => None, // orphaned frontend, shouldn't happen + } +} + +fn remove_healthcheck(haproxy: &mut HAProxy, backend: &HAProxyBackend) { + if let Some(uuid) = &backend.health_check.content { + haproxy + .healthchecks + .healthchecks + .retain(|h| h.uuid != *uuid); + } +} + +/// Remove the backend's servers. This assumes servers are not shared between services. 
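+ /// Servers are matched by the comma-separated UUIDs in the backend's linked_servers list.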
+fn remove_linked_servers(haproxy: &mut HAProxy, backend: &HAProxyBackend) { + if let Some(server_uuids_str) = &backend.linked_servers.content { + let server_uuids_to_remove: HashSet<_> = server_uuids_str.split(',').collect(); + haproxy + .servers + .servers + .retain(|s| !server_uuids_to_remove.contains(s.uuid.as_str())); + } +} + +#[cfg(test)] +mod tests { + use crate::config::DummyOPNSenseShell; + use assertor::*; + use opnsense_config_xml::{ + Frontend, HAProxy, HAProxyBackend, HAProxyBackends, HAProxyFrontends, HAProxyHealthCheck, + HAProxyHealthChecks, HAProxyId, HAProxyServer, HAProxyServers, MaybeString, OPNsense, + }; + use std::sync::Arc; + + use super::LoadBalancerConfig; + + static SERVICE_BIND_ADDRESS: &str = "192.168.1.1:80"; + static OTHER_SERVICE_BIND_ADDRESS: &str = "192.168.1.1:443"; + + static SERVER_ADDRESS: &str = "1.1.1.1:80"; + static OTHER_SERVER_ADDRESS: &str = "1.1.1.1:443"; + + #[test] + fn configure_service_should_add_all_service_components_to_haproxy() { + let mut opnsense = given_opnsense(); + let mut load_balancer = given_load_balancer(&mut opnsense); + let (healthcheck, servers, backend, frontend) = + given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS); + + load_balancer.configure_service( + frontend.clone(), + backend.clone(), + servers.clone(), + Some(healthcheck.clone()), + ); + + assert_haproxy_configured_with( + opnsense, + vec![frontend], + vec![backend], + servers, + vec![healthcheck], + ); + } + + #[test] + fn configure_service_should_replace_service_on_same_bind_address() { + let (healthcheck, servers, backend, frontend) = + given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS); + let mut opnsense = given_opnsense_with(given_haproxy( + vec![frontend.clone()], + vec![backend.clone()], + servers.clone(), + vec![healthcheck.clone()], + )); + let mut load_balancer = given_load_balancer(&mut opnsense); + + let (updated_healthcheck, updated_servers, updated_backend, updated_frontend) = + given_service(SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS); + + load_balancer.configure_service( + updated_frontend.clone(), + updated_backend.clone(), + updated_servers.clone(), + Some(updated_healthcheck.clone()), + ); + + assert_haproxy_configured_with( + opnsense, + vec![updated_frontend], + vec![updated_backend], + updated_servers, + vec![updated_healthcheck], + ); + } + + #[test] + fn configure_service_should_keep_existing_service_on_different_bind_addresses() { + let (healthcheck, servers, backend, frontend) = + given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS); + let (other_healthcheck, other_servers, other_backend, other_frontend) = + given_service(OTHER_SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS); + let mut opnsense = given_opnsense_with(given_haproxy( + vec![frontend.clone()], + vec![backend.clone()], + servers.clone(), + vec![healthcheck.clone()], + )); + let mut load_balancer = given_load_balancer(&mut opnsense); + + load_balancer.configure_service( + other_frontend.clone(), + other_backend.clone(), + other_servers.clone(), + Some(other_healthcheck.clone()), + ); + + assert_haproxy_configured_with( + opnsense, + vec![frontend, other_frontend], + vec![backend, other_backend], + [servers, other_servers].concat(), + vec![healthcheck, other_healthcheck], + ); + } + + fn assert_haproxy_configured_with( + opnsense: OPNsense, + frontends: Vec, + backends: Vec, + servers: Vec, + healthchecks: Vec, + ) { + let haproxy = opnsense.opnsense.haproxy.as_ref().unwrap(); + assert_that!(haproxy.frontends.frontend).contains_exactly(frontends); + 
assert_that!(haproxy.backends.backends).contains_exactly(backends); + assert_that!(haproxy.servers.servers).is_equal_to(servers); + assert_that!(haproxy.healthchecks.healthchecks).contains_exactly(healthchecks); + } + + fn given_opnsense() -> OPNsense { + OPNsense::default() + } + + fn given_opnsense_with(haproxy: HAProxy) -> OPNsense { + let mut opnsense = OPNsense::default(); + opnsense.opnsense.haproxy = Some(haproxy); + + opnsense + } + + fn given_load_balancer<'a>(opnsense: &'a mut OPNsense) -> LoadBalancerConfig<'a> { + let opnsense_shell = Arc::new(DummyOPNSenseShell {}); + if opnsense.opnsense.haproxy.is_none() { + opnsense.opnsense.haproxy = Some(HAProxy::default()); + } + LoadBalancerConfig::new(opnsense, opnsense_shell) + } + + fn given_service( + bind_address: &str, + server_address: &str, + ) -> ( + HAProxyHealthCheck, + Vec, + HAProxyBackend, + Frontend, + ) { + let healthcheck = given_healthcheck(); + let servers = vec![given_server(server_address)]; + let backend = given_backend(); + let frontend = given_frontend(bind_address); + (healthcheck, servers, backend, frontend) + } + + fn given_haproxy( + frontends: Vec, + backends: Vec, + servers: Vec, + healthchecks: Vec, + ) -> HAProxy { + HAProxy { + frontends: HAProxyFrontends { + frontend: frontends, + }, + backends: HAProxyBackends { backends }, + servers: HAProxyServers { servers }, + healthchecks: HAProxyHealthChecks { healthchecks }, + ..Default::default() + } + } + + fn given_frontend(bind_address: &str) -> Frontend { + Frontend { + uuid: "uuid".into(), + id: HAProxyId::default(), + enabled: 1, + name: format!("frontend_{bind_address}"), + bind: bind_address.into(), + default_backend: Some("backend-uuid".into()), + ..Default::default() + } + } + + fn given_backend() -> HAProxyBackend { + HAProxyBackend { + uuid: "backend-uuid".into(), + id: HAProxyId::default(), + enabled: 1, + name: "backend_192.168.1.1:80".into(), + linked_servers: MaybeString::from("server-uuid"), + health_check_enabled: 1, + health_check: MaybeString::from("healthcheck-uuid"), + ..Default::default() + } + } + + fn given_server(address: &str) -> HAProxyServer { + HAProxyServer { + uuid: "server-uuid".into(), + id: HAProxyId::default(), + name: address.into(), + address: Some(address.into()), + ..Default::default() + } + } + + fn given_healthcheck() -> HAProxyHealthCheck { + HAProxyHealthCheck { + uuid: "healthcheck-uuid".into(), + name: "healthcheck".into(), + ..Default::default() + } + } +} From 2a48d51479a5ad004aa8e9b47c0a08bd9816eb21 Mon Sep 17 00:00:00 2001 From: Willem Date: Tue, 21 Oct 2025 11:09:45 -0400 Subject: [PATCH 5/8] fix: naming of k8s distribution --- harmony/src/domain/topology/k8s_anywhere.rs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index 5e448b8..1c2f764 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -64,7 +64,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: Arc>>, tenant_manager: Arc>, - flavour: Arc>, + k8s_distribution: Arc>, config: Arc, } @@ -170,7 +170,7 @@ impl K8sAnywhereTopology { Self { k8s_state: Arc::new(OnceCell::new()), tenant_manager: Arc::new(OnceCell::new()), - flavour: Arc::new(OnceCell::new()), + k8s_distribution: Arc::new(OnceCell::new()), config: Arc::new(K8sAnywhereConfig::from_env()), } } @@ -179,13 +179,13 @@ impl K8sAnywhereTopology { Self { k8s_state: Arc::new(OnceCell::new()), tenant_manager: 
Arc::new(OnceCell::new()), - flavour: Arc::new(OnceCell::new()), + k8s_distribution: Arc::new(OnceCell::new()), config: Arc::new(config), } } pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> { - self.flavour + self.k8s_distribution .get_or_try_init(async || { let client = self.k8s_client().await.unwrap(); From 14d1823d153ee3d2d87572fcc31ebb259db6f0e1 Mon Sep 17 00:00:00 2001 From: Willem Date: Tue, 21 Oct 2025 15:54:51 +0000 Subject: [PATCH 6/8] fix: remove ceph osd deletes and purges osd from ceph osd tree\ (#120) k8s returns None rather than zero when checking deployment for replicas exec_app requires commands 's' and '-c' to run correctly Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/120 Co-authored-by: Willem Co-committed-by: Willem --- Cargo.lock | 9 +++ examples/remove_rook_osd/Cargo.toml | 11 ++++ examples/remove_rook_osd/src/main.rs | 18 ++++++ harmony/src/domain/interpret/mod.rs | 5 ++ harmony/src/domain/topology/k8s.rs | 7 ++- ...ment_score.rs => ceph_remove_osd_score.rs} | 60 ++++++++++--------- harmony/src/modules/storage/ceph/mod.rs | 2 +- 7 files changed, 80 insertions(+), 32 deletions(-) create mode 100644 examples/remove_rook_osd/Cargo.toml create mode 100644 examples/remove_rook_osd/src/main.rs rename harmony/src/modules/storage/ceph/{ceph_osd_replacement_score.rs => ceph_remove_osd_score.rs} (90%) diff --git a/Cargo.lock b/Cargo.lock index 429f09b..666fe3a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4613,6 +4613,15 @@ version = "0.8.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001" +[[package]] +name = "remove_rook_osd" +version = "0.1.0" +dependencies = [ + "harmony", + "harmony_cli", + "tokio", +] + [[package]] name = "reqwest" version = "0.11.27" diff --git a/examples/remove_rook_osd/Cargo.toml b/examples/remove_rook_osd/Cargo.toml new file mode 100644 index 0000000..6e35ac0 --- /dev/null +++ b/examples/remove_rook_osd/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "example-remove-rook-osd" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true + +[dependencies] +harmony = { version = "0.1.0", path = "../../harmony" } +harmony_cli = { version = "0.1.0", path = "../../harmony_cli" } +tokio.workspace = true diff --git a/examples/remove_rook_osd/src/main.rs b/examples/remove_rook_osd/src/main.rs new file mode 100644 index 0000000..2794927 --- /dev/null +++ b/examples/remove_rook_osd/src/main.rs @@ -0,0 +1,18 @@ +use harmony::{ + inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd, + topology::K8sAnywhereTopology, +}; + +#[tokio::main] +async fn main() { + let ceph_score = CephRemoveOsd { + osd_deployment_name: "rook-ceph-osd-2".to_string(), + rook_ceph_namespace: "rook-ceph".to_string(), + }; + + let topology = K8sAnywhereTopology::from_env(); + let inventory = Inventory::autoload(); + harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None) + .await + .unwrap(); +} diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs index d555d9e..ad3dac3 100644 --- a/harmony/src/domain/interpret/mod.rs +++ b/harmony/src/domain/interpret/mod.rs @@ -30,6 +30,7 @@ pub enum InterpretName { Lamp, ApplicationMonitoring, K8sPrometheusCrdAlerting, + CephRemoveOsd, DiscoverInventoryAgent, CephClusterHealth, Custom(&'static str), @@ -61,7 +62,11 @@ impl std::fmt::Display for InterpretName { InterpretName::Lamp => 
f.write_str("LAMP"), InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"), InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"), +<<<<<<< HEAD + InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"), +======= InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"), +>>>>>>> origin/master InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"), InterpretName::Custom(name) => f.write_str(name), InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"), diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 5a1e6ec..1fd41e9 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -21,7 +21,7 @@ use kube::{ api::{ApiResource, GroupVersionKind}, runtime::wait::await_condition, }; -use log::{debug, error, trace}; +use log::{debug, error, info, trace}; use serde::{Serialize, de::DeserializeOwned}; use serde_json::{Value, json}; use similar::TextDiff; @@ -80,10 +80,13 @@ impl K8sClient { namespace: Option<&str>, ) -> Result, Error> { let deps: Api = if let Some(ns) = namespace { + debug!("getting namespaced deployment"); Api::namespaced(self.client.clone(), ns) } else { + debug!("getting default namespace deployment"); Api::default_namespaced(self.client.clone()) }; + debug!("getting deployment {} in ns {}", name, namespace.unwrap()); Ok(deps.get_opt(name).await?) } @@ -114,7 +117,7 @@ impl K8sClient { } }); let pp = PatchParams::default(); - let scale = Patch::Apply(&patch); + let scale = Patch::Merge(&patch); deployments.patch_scale(name, &pp, &scale).await?; Ok(()) } diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs similarity index 90% rename from harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs rename to harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs index 77dd24a..787f9cc 100644 --- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs +++ b/harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use log::{info, warn}; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use tokio::time::sleep; @@ -19,8 +19,8 @@ use harmony_types::id::Id; #[derive(Debug, Clone, Serialize)] pub struct CephRemoveOsd { - osd_deployment_name: String, - rook_ceph_namespace: String, + pub osd_deployment_name: String, + pub rook_ceph_namespace: String, } impl Score for CephRemoveOsd { @@ -54,18 +54,17 @@ impl Interpret for CephRemoveOsdInterpret { self.verify_deployment_scaled(client.clone()).await?; self.delete_deployment(client.clone()).await?; self.verify_deployment_deleted(client.clone()).await?; - let osd_id_full = self.get_ceph_osd_id().unwrap(); - self.purge_ceph_osd(client.clone(), &osd_id_full).await?; - self.verify_ceph_osd_removal(client.clone(), &osd_id_full) - .await?; + self.purge_ceph_osd(client.clone()).await?; + self.verify_ceph_osd_removal(client.clone()).await?; + let osd_id_full = self.get_ceph_osd_id().unwrap(); Ok(Outcome::success(format!( "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}", osd_id_full, self.score.osd_deployment_name ))) } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::CephRemoveOsd } fn get_version(&self) -> Version { @@ -82,7 +81,7 @@ impl Interpret for CephRemoveOsdInterpret { } impl CephRemoveOsdInterpret { - pub fn get_ceph_osd_id(&self) -> Result { + pub fn 
get_ceph_osd_id_numeric(&self) -> Result { let osd_id_numeric = self .score .osd_deployment_name @@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret { self.score.osd_deployment_name )) })?; + Ok(osd_id_numeric.to_string()) + } + + pub fn get_ceph_osd_id(&self) -> Result { + let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap(); let osd_id_full = format!("osd.{}", osd_id_numeric); - info!( + debug!( "Targeting Ceph OSD: {} (parsed from deployment {})", osd_id_full, self.score.osd_deployment_name ); @@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret { &self, client: Arc, ) -> Result { + debug!("verifying toolbox exists"); let toolbox_dep = "rook-ceph-tools".to_string(); match client @@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret { &self, client: Arc, ) -> Result { - info!( + debug!( "Scaling down OSD deployment: {}", self.score.osd_deployment_name ); @@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret { ) -> Result { let (timeout, interval, start) = self.build_timer(); - info!("Waiting for OSD deployment to scale down to 0 replicas"); + debug!("Waiting for OSD deployment to scale down to 0 replicas"); loop { let dep = client .get_deployment( @@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret { Some(&self.score.rook_ceph_namespace), ) .await?; - if let Some(deployment) = dep { if let Some(status) = deployment.status { - if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0 - { + if status.replicas == None && status.ready_replicas == None { return Ok(Outcome::success( "Deployment successfully scaled down.".to_string(), )); @@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret { &self, client: Arc, ) -> Result { - info!( + debug!( "Deleting OSD deployment: {}", self.score.osd_deployment_name ); @@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret { ) -> Result { let (timeout, interval, start) = self.build_timer(); - info!("Waiting for OSD deployment to scale down to 0 replicas"); + debug!("Verifying OSD deployment deleted"); loop { let dep = client .get_deployment( @@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret { .await?; if dep.is_none() { - info!( + debug!( "Deployment {} successfully deleted.", self.score.osd_deployment_name ); @@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret { Ok(tree) } - pub async fn purge_ceph_osd( - &self, - client: Arc, - osd_id_full: &str, - ) -> Result { - info!( + pub async fn purge_ceph_osd(&self, client: Arc) -> Result { + let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap(); + let osd_id_full = self.get_ceph_osd_id().unwrap(); + debug!( "Purging OSD {} from Ceph cluster and removing its auth key", osd_id_full ); @@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret { "app".to_string(), Some(&self.score.rook_ceph_namespace), vec![ - format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(), - format!("ceph auth del osd.{osd_id_full}").as_str(), + "sh", + "-c", + format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(), ], ) .await?; @@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret { pub async fn verify_ceph_osd_removal( &self, client: Arc, - osd_id_full: &str, ) -> Result { let (timeout, interval, start) = self.build_timer(); - info!( + let osd_id_full = self.get_ceph_osd_id().unwrap(); + debug!( "Verifying OSD {} has been removed from the Ceph tree...", osd_id_full ); @@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret { "rook-ceph-tools".to_string(), "app".to_string(), Some(&self.score.rook_ceph_namespace), - vec!["ceph osd tree -f json"], + vec!["sh", "-c", "ceph osd tree -f json"], ) 
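+ // Wrapped in `sh -c`: per this patch's commit message, exec_app needs the
+ // command passed through a shell to run correctly.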
.await?; let tree = diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs index 3e3250e..0a3dcec 100644 --- a/harmony/src/modules/storage/ceph/mod.rs +++ b/harmony/src/modules/storage/ceph/mod.rs @@ -1,2 +1,2 @@ -pub mod ceph_osd_replacement_score; +pub mod ceph_remove_osd_score; pub mod ceph_validate_health_score; From 987f195e2fb5a4da2614e825327eb6ef1da84a00 Mon Sep 17 00:00:00 2001 From: wjro Date: Tue, 21 Oct 2025 15:55:55 +0000 Subject: [PATCH 7/8] feat(cert-manager): add cluster issuer to okd cluster score (#157) added score to install okd cluster issuer Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/157 --- .../modules/cert_manager/cluster_issuer.rs | 209 ++++++++++++++++++ harmony/src/modules/cert_manager/mod.rs | 1 + 2 files changed, 210 insertions(+) create mode 100644 harmony/src/modules/cert_manager/cluster_issuer.rs diff --git a/harmony/src/modules/cert_manager/cluster_issuer.rs b/harmony/src/modules/cert_manager/cluster_issuer.rs new file mode 100644 index 0000000..70294fe --- /dev/null +++ b/harmony/src/modules/cert_manager/cluster_issuer.rs @@ -0,0 +1,209 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use harmony_types::id::Id; +use kube::{CustomResource, api::ObjectMeta}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::{ + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + score::Score, + topology::{K8sclient, Topology, k8s::K8sClient}, +}; + +#[derive(Clone, Debug, Serialize)] +pub struct ClusterIssuerScore { + email: String, + server: String, + issuer_name: String, + namespace: String, +} + +impl Score for ClusterIssuerScore { + fn name(&self) -> String { + "ClusterIssuerScore".to_string() + } + + #[doc(hidden)] + fn create_interpret(&self) -> Box> { + Box::new(ClusterIssuerInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug, Clone)] +pub struct ClusterIssuerInterpret { + score: ClusterIssuerScore, +} + +#[async_trait] +impl Interpret for ClusterIssuerInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + self.apply_cluster_issuer(topology.k8s_client().await.unwrap()) + .await + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("ClusterIssuer") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} + +impl ClusterIssuerInterpret { + async fn validate_cert_manager( + &self, + client: &Arc, + ) -> Result { + let cert_manager = "cert-manager".to_string(); + let operator_namespace = "openshift-operators".to_string(); + match client + .get_deployment(&cert_manager, Some(&operator_namespace)) + .await + { + Ok(Some(deployment)) => { + if let Some(status) = deployment.status { + let ready_count = status.ready_replicas.unwrap_or(0); + if ready_count >= 1 { + return Ok(Outcome::success(format!( + "'{}' is ready with {} replica(s).", + &cert_manager, ready_count + ))); + } else { + return Err(InterpretError::new( + "cert-manager operator not ready in cluster".to_string(), + )); + } + } else { + Err(InterpretError::new(format!( + "failed to get deployment status {} in ns {}", + &cert_manager, &operator_namespace + ))) + } + } + Ok(None) => Err(InterpretError::new(format!( + "Deployment '{}' not found in namespace '{}'.", + &cert_manager, &operator_namespace + ))), + Err(e) => Err(InterpretError::new(format!( + "Failed to 
query for deployment '{}': {}", + &cert_manager, e + ))), + } + } + + fn build_cluster_issuer(&self) -> Result { + let issuer_name = &self.score.issuer_name; + let email = &self.score.email; + let server = &self.score.server; + let namespace = &self.score.namespace; + let cluster_issuer = ClusterIssuer { + metadata: ObjectMeta { + name: Some(issuer_name.to_string()), + namespace: Some(namespace.to_string()), + ..Default::default() + }, + spec: ClusterIssuerSpec { + acme: AcmeSpec { + email: email.to_string(), + private_key_secret_ref: PrivateKeySecretRef { + name: issuer_name.to_string(), + }, + server: server.to_string(), + solvers: vec![SolverSpec { + http01: Some(Http01Solver { + ingress: Http01Ingress { + class: "nginx".to_string(), + }, + }), + }], + }, + }, + }; + + Ok(cluster_issuer) + } + + pub async fn apply_cluster_issuer( + &self, + client: Arc, + ) -> Result { + let namespace = self.score.namespace.clone(); + self.validate_cert_manager(&client).await?; + let cluster_issuer = self.build_cluster_issuer().unwrap(); + client + .apply_yaml( + &serde_yaml::to_value(cluster_issuer).unwrap(), + Some(&namespace), + ) + .await?; + Ok(Outcome::success(format!( + "successfully deployed cluster operator: {} in namespace: {}", + self.score.issuer_name, self.score.namespace + ))) + } +} + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[kube( + group = "cert-manager.io", + version = "v1", + kind = "ClusterIssuer", + plural = "clusterissuers" +)] +#[serde(rename_all = "camelCase")] +pub struct ClusterIssuerSpec { + pub acme: AcmeSpec, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct AcmeSpec { + pub email: String, + pub private_key_secret_ref: PrivateKeySecretRef, + pub server: String, + pub solvers: Vec, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct PrivateKeySecretRef { + pub name: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct SolverSpec { + pub http01: Option, + // Other solver types (e.g., dns01) would go here as Options +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Http01Solver { + pub ingress: Http01Ingress, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Http01Ingress { + pub class: String, +} diff --git a/harmony/src/modules/cert_manager/mod.rs b/harmony/src/modules/cert_manager/mod.rs index 8fd309a..032439e 100644 --- a/harmony/src/modules/cert_manager/mod.rs +++ b/harmony/src/modules/cert_manager/mod.rs @@ -1,2 +1,3 @@ +pub mod cluster_issuer; mod helm; pub use helm::*; From 464347d3e5515e909b04cb8c9ae2578c50895116 Mon Sep 17 00:00:00 2001 From: Willem Date: Tue, 21 Oct 2025 12:01:31 -0400 Subject: [PATCH 8/8] fix: fixed merge error that somehow got missed --- harmony/src/domain/interpret/mod.rs | 3 --- 1 file changed, 3 deletions(-) diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs index ad3dac3..f9a509d 100644 --- a/harmony/src/domain/interpret/mod.rs +++ b/harmony/src/domain/interpret/mod.rs @@ -62,11 +62,8 @@ impl std::fmt::Display for InterpretName { InterpretName::Lamp => f.write_str("LAMP"), InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"), InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"), -<<<<<<< HEAD 
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"), -======= InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"), ->>>>>>> origin/master InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"), InterpretName::Custom(name) => f.write_str(name), InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),