diff --git a/Cargo.lock b/Cargo.lock index 0b5972f..7d9cdcf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1780,6 +1780,7 @@ dependencies = [ name = "example-nanodc" version = "0.1.0" dependencies = [ + "brocade", "cidr", "env_logger", "harmony", @@ -1788,6 +1789,7 @@ dependencies = [ "harmony_tui", "harmony_types", "log", + "serde", "tokio", "url", ] @@ -1806,6 +1808,7 @@ dependencies = [ name = "example-okd-install" version = "0.1.0" dependencies = [ + "brocade", "cidr", "env_logger", "harmony", @@ -1836,13 +1839,16 @@ dependencies = [ name = "example-opnsense" version = "0.1.0" dependencies = [ + "brocade", "cidr", "env_logger", "harmony", "harmony_macros", + "harmony_secret", "harmony_tui", "harmony_types", "log", + "serde", "tokio", "url", ] @@ -1851,6 +1857,7 @@ dependencies = [ name = "example-pxe" version = "0.1.0" dependencies = [ + "brocade", "cidr", "env_logger", "harmony", @@ -1865,6 +1872,15 @@ dependencies = [ "url", ] +[[package]] +name = "example-remove-rook-osd" +version = "0.1.0" +dependencies = [ + "harmony", + "harmony_cli", + "tokio", +] + [[package]] name = "example-rust" version = "0.1.0" @@ -3918,6 +3934,7 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e" name = "opnsense-config" version = "0.1.0" dependencies = [ + "assertor", "async-trait", "chrono", "env_logger", diff --git a/Cargo.toml b/Cargo.toml index a10bf81..a256234 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,8 @@ members = [ "harmony_inventory_agent", "harmony_secret_derive", "harmony_secret", - "adr/agent_discovery/mdns", "brocade", + "adr/agent_discovery/mdns", + "brocade", ] [workspace.package] diff --git a/brocade/src/fast_iron.rs b/brocade/src/fast_iron.rs index a1a2478..5a3474e 100644 --- a/brocade/src/fast_iron.rs +++ b/brocade/src/fast_iron.rs @@ -10,6 +10,7 @@ use log::{debug, info}; use regex::Regex; use std::{collections::HashSet, str::FromStr}; +#[derive(Debug)] pub struct FastIronClient { shell: BrocadeShell, version: BrocadeInfo, 
diff --git a/brocade/src/lib.rs b/brocade/src/lib.rs index 3822abd..57b464a 100644 --- a/brocade/src/lib.rs +++ b/brocade/src/lib.rs @@ -162,7 +162,7 @@ pub async fn init( } #[async_trait] -pub trait BrocadeClient { +pub trait BrocadeClient: std::fmt::Debug { /// Retrieves the operating system and version details from the connected Brocade switch. /// /// This is typically the first call made after establishing a connection to determine diff --git a/brocade/src/network_operating_system.rs b/brocade/src/network_operating_system.rs index f7f8570..a6f824a 100644 --- a/brocade/src/network_operating_system.rs +++ b/brocade/src/network_operating_system.rs @@ -10,6 +10,7 @@ use crate::{ parse_brocade_mac_address, shell::BrocadeShell, }; +#[derive(Debug)] pub struct NetworkOperatingSystemClient { shell: BrocadeShell, version: BrocadeInfo, diff --git a/brocade/src/shell.rs b/brocade/src/shell.rs index cfa672d..28eceb8 100644 --- a/brocade/src/shell.rs +++ b/brocade/src/shell.rs @@ -13,6 +13,7 @@ use log::info; use russh::ChannelMsg; use tokio::time::timeout; +#[derive(Debug)] pub struct BrocadeShell { ip: IpAddr, port: u16, diff --git a/examples/nanodc/Cargo.toml b/examples/nanodc/Cargo.toml index 889c24d..3072ddf 100644 --- a/examples/nanodc/Cargo.toml +++ b/examples/nanodc/Cargo.toml @@ -17,3 +17,5 @@ harmony_secret = { path = "../../harmony_secret" } log = { workspace = true } env_logger = { workspace = true } url = { workspace = true } +serde = { workspace = true } +brocade = { path = "../../brocade" } diff --git a/examples/nanodc/src/main.rs b/examples/nanodc/src/main.rs index d52766c..629a8dc 100644 --- a/examples/nanodc/src/main.rs +++ b/examples/nanodc/src/main.rs @@ -3,12 +3,13 @@ use std::{ sync::Arc, }; +use brocade::BrocadeOptions; use cidr::Ipv4Cidr; use harmony::{ config::secret::SshKeyPair, data::{FileContent, FilePath}, hardware::{HostCategory, Location, PhysicalHost, SwitchGroup}, - infra::opnsense::OPNSenseManagementInterface, + 
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface}, inventory::Inventory, modules::{ http::StaticFilesHttpScore, @@ -22,8 +23,9 @@ use harmony::{ topology::{LogicalHost, UnmanagedRouter}, }; use harmony_macros::{ip, mac_address}; -use harmony_secret::SecretManager; +use harmony_secret::{Secret, SecretManager}; use harmony_types::net::Url; +use serde::{Deserialize, Serialize}; #[tokio::main] async fn main() { @@ -32,6 +34,26 @@ async fn main() { name: String::from("fw0"), }; + let switch_auth = SecretManager::get_or_prompt::() + .await + .expect("Failed to get credentials"); + + let switches: Vec = vec![ip!("192.168.33.101")]; + let brocade_options = Some(BrocadeOptions { + dry_run: *harmony::config::DRY_RUN, + ..Default::default() + }); + let switch_client = BrocadeSwitchClient::init( + &switches, + &switch_auth.username, + &switch_auth.password, + brocade_options, + ) + .await + .expect("Failed to connect to switch"); + + let switch_client = Arc::new(switch_client); + let opnsense = Arc::new( harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await, ); @@ -84,7 +106,7 @@ async fn main() { name: "wk2".to_string(), }, ], - switch: vec![], + switch_client: switch_client.clone(), }; let inventory = Inventory { @@ -167,3 +189,9 @@ async fn main() { .await .unwrap(); } + +#[derive(Secret, Serialize, Deserialize, Debug)] +pub struct BrocadeSwitchAuth { + pub username: String, + pub password: String, +} diff --git a/examples/okd_installation/Cargo.toml b/examples/okd_installation/Cargo.toml index 7314e4f..dfbe944 100644 --- a/examples/okd_installation/Cargo.toml +++ b/examples/okd_installation/Cargo.toml @@ -19,3 +19,4 @@ log = { workspace = true } env_logger = { workspace = true } url = { workspace = true } serde.workspace = true +brocade = { path = "../../brocade" } diff --git a/examples/okd_installation/src/topology.rs b/examples/okd_installation/src/topology.rs index a568e97..ce89402 100644 --- 
a/examples/okd_installation/src/topology.rs +++ b/examples/okd_installation/src/topology.rs @@ -1,7 +1,8 @@ +use brocade::BrocadeOptions; use cidr::Ipv4Cidr; use harmony::{ hardware::{Location, SwitchGroup}, - infra::opnsense::OPNSenseManagementInterface, + infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface}, inventory::Inventory, topology::{HAClusterTopology, LogicalHost, UnmanagedRouter}, }; @@ -22,6 +23,26 @@ pub async fn get_topology() -> HAClusterTopology { name: String::from("opnsense-1"), }; + let switch_auth = SecretManager::get_or_prompt::() + .await + .expect("Failed to get credentials"); + + let switches: Vec = vec![ip!("192.168.1.101")]; // TODO: Adjust me + let brocade_options = Some(BrocadeOptions { + dry_run: *harmony::config::DRY_RUN, + ..Default::default() + }); + let switch_client = BrocadeSwitchClient::init( + &switches, + &switch_auth.username, + &switch_auth.password, + brocade_options, + ) + .await + .expect("Failed to connect to switch"); + + let switch_client = Arc::new(switch_client); + let config = SecretManager::get_or_prompt::().await; let config = config.unwrap(); @@ -59,7 +80,7 @@ pub async fn get_topology() -> HAClusterTopology { name: "bootstrap".to_string(), }, workers: vec![], - switch: vec![], + switch_client: switch_client.clone(), } } @@ -76,3 +97,9 @@ pub fn get_inventory() -> Inventory { control_plane_host: vec![], } } + +#[derive(Secret, Serialize, Deserialize, Debug)] +pub struct BrocadeSwitchAuth { + pub username: String, + pub password: String, +} diff --git a/examples/okd_pxe/Cargo.toml b/examples/okd_pxe/Cargo.toml index f75f42b..133b2f9 100644 --- a/examples/okd_pxe/Cargo.toml +++ b/examples/okd_pxe/Cargo.toml @@ -19,3 +19,4 @@ log = { workspace = true } env_logger = { workspace = true } url = { workspace = true } serde.workspace = true +brocade = { path = "../../brocade" } diff --git a/examples/okd_pxe/src/topology.rs b/examples/okd_pxe/src/topology.rs index 6be6af9..7c1244c 100644 --- 
a/examples/okd_pxe/src/topology.rs +++ b/examples/okd_pxe/src/topology.rs @@ -1,13 +1,15 @@ +use brocade::BrocadeOptions; use cidr::Ipv4Cidr; use harmony::{ config::secret::OPNSenseFirewallCredentials, hardware::{Location, SwitchGroup}, - infra::opnsense::OPNSenseManagementInterface, + infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface}, inventory::Inventory, topology::{HAClusterTopology, LogicalHost, UnmanagedRouter}, }; use harmony_macros::{ip, ipv4}; -use harmony_secret::SecretManager; +use harmony_secret::{Secret, SecretManager}; +use serde::{Deserialize, Serialize}; use std::{net::IpAddr, sync::Arc}; pub async fn get_topology() -> HAClusterTopology { @@ -16,6 +18,26 @@ pub async fn get_topology() -> HAClusterTopology { name: String::from("opnsense-1"), }; + let switch_auth = SecretManager::get_or_prompt::() + .await + .expect("Failed to get credentials"); + + let switches: Vec = vec![ip!("192.168.1.101")]; // TODO: Adjust me + let brocade_options = Some(BrocadeOptions { + dry_run: *harmony::config::DRY_RUN, + ..Default::default() + }); + let switch_client = BrocadeSwitchClient::init( + &switches, + &switch_auth.username, + &switch_auth.password, + brocade_options, + ) + .await + .expect("Failed to connect to switch"); + + let switch_client = Arc::new(switch_client); + let config = SecretManager::get_or_prompt::().await; let config = config.unwrap(); @@ -53,7 +75,7 @@ pub async fn get_topology() -> HAClusterTopology { name: "cp0".to_string(), }, workers: vec![], - switch: vec![], + switch_client: switch_client.clone(), } } @@ -70,3 +92,9 @@ pub fn get_inventory() -> Inventory { control_plane_host: vec![], } } + +#[derive(Secret, Serialize, Deserialize, Debug)] +pub struct BrocadeSwitchAuth { + pub username: String, + pub password: String, +} diff --git a/examples/opnsense/Cargo.toml b/examples/opnsense/Cargo.toml index 60986d3..1574f29 100644 --- a/examples/opnsense/Cargo.toml +++ b/examples/opnsense/Cargo.toml @@ -16,3 +16,6 @@ 
harmony_macros = { path = "../../harmony_macros" } log = { workspace = true } env_logger = { workspace = true } url = { workspace = true } +harmony_secret = { path = "../../harmony_secret" } +brocade = { path = "../../brocade" } +serde = { workspace = true } diff --git a/examples/opnsense/src/main.rs b/examples/opnsense/src/main.rs index d269a9d..4422f65 100644 --- a/examples/opnsense/src/main.rs +++ b/examples/opnsense/src/main.rs @@ -3,10 +3,11 @@ use std::{ sync::Arc, }; +use brocade::BrocadeOptions; use cidr::Ipv4Cidr; use harmony::{ hardware::{HostCategory, Location, PhysicalHost, SwitchGroup}, - infra::opnsense::OPNSenseManagementInterface, + infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface}, inventory::Inventory, modules::{ dummy::{ErrorScore, PanicScore, SuccessScore}, @@ -18,7 +19,9 @@ use harmony::{ topology::{LogicalHost, UnmanagedRouter}, }; use harmony_macros::{ip, mac_address}; +use harmony_secret::{Secret, SecretManager}; use harmony_types::net::Url; +use serde::{Deserialize, Serialize}; #[tokio::main] async fn main() { @@ -27,6 +30,26 @@ async fn main() { name: String::from("opnsense-1"), }; + let switch_auth = SecretManager::get_or_prompt::() + .await + .expect("Failed to get credentials"); + + let switches: Vec = vec![ip!("192.168.5.101")]; // TODO: Adjust me + let brocade_options = Some(BrocadeOptions { + dry_run: *harmony::config::DRY_RUN, + ..Default::default() + }); + let switch_client = BrocadeSwitchClient::init( + &switches, + &switch_auth.username, + &switch_auth.password, + brocade_options, + ) + .await + .expect("Failed to connect to switch"); + + let switch_client = Arc::new(switch_client); + let opnsense = Arc::new( harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await, ); @@ -55,7 +78,7 @@ async fn main() { name: "cp0".to_string(), }, workers: vec![], - switch: vec![], + switch_client: switch_client.clone(), }; let inventory = Inventory { @@ -110,3 +133,9 @@ async fn main() 
{ .await .unwrap(); } + +#[derive(Secret, Serialize, Deserialize, Debug)] +pub struct BrocadeSwitchAuth { + pub username: String, + pub password: String, +} diff --git a/examples/remove_rook_osd/Cargo.toml b/examples/remove_rook_osd/Cargo.toml new file mode 100644 index 0000000..6e35ac0 --- /dev/null +++ b/examples/remove_rook_osd/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "example-remove-rook-osd" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true + +[dependencies] +harmony = { version = "0.1.0", path = "../../harmony" } +harmony_cli = { version = "0.1.0", path = "../../harmony_cli" } +tokio.workspace = true diff --git a/examples/remove_rook_osd/src/main.rs b/examples/remove_rook_osd/src/main.rs new file mode 100644 index 0000000..2794927 --- /dev/null +++ b/examples/remove_rook_osd/src/main.rs @@ -0,0 +1,18 @@ +use harmony::{ + inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd, + topology::K8sAnywhereTopology, +}; + +#[tokio::main] +async fn main() { + let ceph_score = CephRemoveOsd { + osd_deployment_name: "rook-ceph-osd-2".to_string(), + rook_ceph_namespace: "rook-ceph".to_string(), + }; + + let topology = K8sAnywhereTopology::from_env(); + let inventory = Inventory::autoload(); + harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None) + .await + .unwrap(); +} diff --git a/examples/try_rust_webapp/src/main.rs b/examples/try_rust_webapp/src/main.rs index 56a058d..7bfdf57 100644 --- a/examples/try_rust_webapp/src/main.rs +++ b/examples/try_rust_webapp/src/main.rs @@ -3,7 +3,7 @@ use harmony::{ modules::{ application::{ ApplicationScore, RustWebFramework, RustWebapp, - features::{PackagingDeployment, rhob_monitoring::Monitoring}, + features::{Monitoring, PackagingDeployment}, }, monitoring::alert_channel::discord_alert_channel::DiscordWebhook, }, diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs index d555d9e..f9a509d 100644 --- 
a/harmony/src/domain/interpret/mod.rs +++ b/harmony/src/domain/interpret/mod.rs @@ -30,6 +30,7 @@ pub enum InterpretName { Lamp, ApplicationMonitoring, K8sPrometheusCrdAlerting, + CephRemoveOsd, DiscoverInventoryAgent, CephClusterHealth, Custom(&'static str), @@ -61,6 +62,7 @@ impl std::fmt::Display for InterpretName { InterpretName::Lamp => f.write_str("LAMP"), InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"), InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"), + InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"), InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"), InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"), InterpretName::Custom(name) => f.write_str(name), diff --git a/harmony/src/domain/topology/ha_cluster.rs b/harmony/src/domain/topology/ha_cluster.rs index 1892cef..ee5f274 100644 --- a/harmony/src/domain/topology/ha_cluster.rs +++ b/harmony/src/domain/topology/ha_cluster.rs @@ -1,7 +1,5 @@ use async_trait::async_trait; -use brocade::BrocadeOptions; use harmony_macros::ip; -use harmony_secret::SecretManager; use harmony_types::{ net::{MacAddress, Url}, switch::PortLocation, @@ -10,14 +8,12 @@ use kube::api::ObjectMeta; use log::debug; use log::info; -use crate::data::FileContent; -use crate::executors::ExecutorError; -use crate::infra::brocade::BrocadeSwitchAuth; -use crate::infra::brocade::BrocadeSwitchClient; -use crate::modules::okd::crd::nmstate::{ - self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec, -}; +use crate::modules::okd::crd::nmstate::{self, NodeNetworkConfigurationPolicy}; use crate::topology::PxeOptions; +use crate::{data::FileContent, modules::okd::crd::nmstate::NMState}; +use crate::{ + executors::ExecutorError, modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec, +}; use super::{ DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig, @@ 
-27,7 +23,6 @@ use super::{ }; use std::collections::BTreeMap; -use std::net::IpAddr; use std::sync::Arc; #[derive(Debug, Clone)] @@ -40,10 +35,10 @@ pub struct HAClusterTopology { pub tftp_server: Arc, pub http_server: Arc, pub dns_server: Arc, + pub switch_client: Arc, pub bootstrap_host: LogicalHost, pub control_plane: Vec, pub workers: Vec, - pub switch: Vec, pub kubeconfig: Option, } @@ -272,32 +267,11 @@ impl HAClusterTopology { } } - async fn get_switch_client(&self) -> Result, SwitchError> { - let auth = SecretManager::get_or_prompt::() - .await - .map_err(|e| SwitchError::new(format!("Failed to get credentials: {e}")))?; - - // FIXME: We assume Brocade switches - let switches: Vec = self.switch.iter().map(|s| s.ip).collect(); - let brocade_options = Some(BrocadeOptions { - dry_run: *crate::config::DRY_RUN, - ..Default::default() - }); - let client = - BrocadeSwitchClient::init(&switches, &auth.username, &auth.password, brocade_options) - .await - .map_err(|e| SwitchError::new(format!("Failed to connect to switch: {e}")))?; - - Ok(Box::new(client)) - } - async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> { debug!("Configuring port channel: {config:#?}"); - let client = self.get_switch_client().await?; - let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect(); - client + self.switch_client .configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports) .await .map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?; @@ -322,10 +296,10 @@ impl HAClusterTopology { tftp_server: dummy_infra.clone(), http_server: dummy_infra.clone(), dns_server: dummy_infra.clone(), + switch_client: dummy_infra.clone(), bootstrap_host: dummy_host, control_plane: vec![], workers: vec![], - switch: vec![], } } } @@ -483,8 +457,7 @@ impl HttpServer for HAClusterTopology { #[async_trait] impl Switch for HAClusterTopology { async fn setup_switch(&self) -> Result<(), SwitchError> { - let 
client = self.get_switch_client().await?; - client.setup().await?; + self.switch_client.setup().await?; Ok(()) } @@ -492,8 +465,7 @@ impl Switch for HAClusterTopology { &self, mac_address: &MacAddress, ) -> Result, SwitchError> { - let client = self.get_switch_client().await?; - let port = client.find_port(mac_address).await?; + let port = self.switch_client.find_port(mac_address).await?; Ok(port) } @@ -689,3 +661,25 @@ impl DnsServer for DummyInfra { unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) } } + +#[async_trait] +impl SwitchClient for DummyInfra { + async fn setup(&self) -> Result<(), SwitchError> { + unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) + } + + async fn find_port( + &self, + _mac_address: &MacAddress, + ) -> Result, SwitchError> { + unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) + } + + async fn configure_port_channel( + &self, + _channel_name: &str, + _switch_ports: Vec, + ) -> Result { + unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA) + } +} diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 10a7df3..71129e1 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -3,10 +3,14 @@ use std::time::Duration; use derive_new::new; use k8s_openapi::{ ClusterResourceScope, NamespaceResourceScope, - api::{apps::v1::Deployment, core::v1::Pod}, + api::{ + apps::v1::Deployment, + core::v1::{Pod, ServiceAccount}, + }, + apimachinery::pkg::version::Info, }; use kube::{ - Client, Config, Error, Resource, + Client, Config, Discovery, Error, Resource, api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, config::{KubeConfigOptions, Kubeconfig}, core::ErrorResponse, @@ -19,7 +23,7 @@ use kube::{ api::{ApiResource, GroupVersionKind}, runtime::wait::await_condition, }; -use log::{debug, error, trace, warn}; +use log::{debug, error, info, trace, warn}; use serde::{Serialize, de::DeserializeOwned}; use serde_json::json; use similar::TextDiff; @@ -58,6 +62,22 @@ 
impl K8sClient { }) } + pub async fn service_account_api(&self, namespace: &str) -> Api { + let api: Api = Api::namespaced(self.client.clone(), namespace); + api + } + + pub async fn get_apiserver_version(&self) -> Result { + let client: Client = self.client.clone(); + let version_info: Info = client.apiserver_version().await?; + Ok(version_info) + } + + pub async fn discovery(&self) -> Result { + let discovery: Discovery = Discovery::new(self.client.clone()).run().await?; + Ok(discovery) + } + pub async fn get_resource_json_value( &self, name: &str, @@ -80,11 +100,14 @@ impl K8sClient { namespace: Option<&str>, ) -> Result, Error> { let deps: Api = if let Some(ns) = namespace { + debug!("getting namespaced deployment"); Api::namespaced(self.client.clone(), ns) } else { + debug!("getting default namespace deployment"); Api::default_namespaced(self.client.clone()) }; + debug!("getting deployment {} in ns {}", name, namespace.unwrap()); deps.get_opt(name).await } @@ -116,7 +139,7 @@ impl K8sClient { } }); let pp = PatchParams::default(); - let scale = Patch::Apply(&patch); + let scale = Patch::Merge(&patch); deployments.patch_scale(name, &pp, &scale).await?; Ok(()) } diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index 53b6436..226860b 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -1,7 +1,12 @@ -use std::{process::Command, sync::Arc}; +use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration}; use async_trait::async_trait; -use kube::api::GroupVersionKind; +use base64::{Engine, engine::general_purpose}; +use k8s_openapi::api::{ + core::v1::Secret, + rbac::v1::{ClusterRoleBinding, RoleRef, Subject}, +}; +use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta}; use log::{debug, info, warn}; use serde::Serialize; use tokio::sync::OnceCell; @@ -12,14 +17,26 @@ use crate::{ inventory::Inventory, modules::{ 
k3d::K3DInstallationScore, - monitoring::kube_prometheus::crd::{ - crd_alertmanager_config::CRDPrometheus, - prometheus_operator::prometheus_operator_helm_chart_score, - rhob_alertmanager_config::RHOBObservability, + k8s::ingress::{K8sIngressScore, PathType}, + monitoring::{ + grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score}, + kube_prometheus::crd::{ + crd_alertmanager_config::CRDPrometheus, + crd_grafana::{ + Grafana as GrafanaCRD, GrafanaCom, GrafanaDashboard, + GrafanaDashboardDatasource, GrafanaDashboardSpec, GrafanaDatasource, + GrafanaDatasourceConfig, GrafanaDatasourceJsonData, + GrafanaDatasourceSecureJsonData, GrafanaDatasourceSpec, GrafanaSpec, + }, + crd_prometheuses::LabelSelector, + prometheus_operator::prometheus_operator_helm_chart_score, + rhob_alertmanager_config::RHOBObservability, + service_monitor::ServiceMonitor, + }, }, prometheus::{ k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore, - prometheus::PrometheusApplicationMonitoring, rhob_alerting_score::RHOBAlertingScore, + prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore, }, }, score::Score, @@ -47,6 +64,13 @@ struct K8sState { message: String, } +#[derive(Debug, Clone)] +pub enum KubernetesDistribution { + OpenshiftFamily, + K3sFamily, + Default, +} + #[derive(Debug, Clone)] enum K8sSource { LocalK3d, @@ -57,6 +81,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: Arc>>, tenant_manager: Arc>, + k8s_distribution: Arc>, config: Arc, } @@ -78,41 +103,172 @@ impl K8sclient for K8sAnywhereTopology { } #[async_trait] -impl PrometheusApplicationMonitoring for K8sAnywhereTopology { +impl Grafana for K8sAnywhereTopology { + async fn ensure_grafana_operator( + &self, + inventory: &Inventory, + ) -> Result { + debug!("ensure grafana operator"); + let client = self.k8s_client().await.unwrap(); + let grafana_gvk = GroupVersionKind { + group: "grafana.integreatly.org".to_string(), + version: "v1beta1".to_string(), + kind: 
"Grafana".to_string(), + }; + let name = "grafanas.grafana.integreatly.org"; + let ns = "grafana"; + + let grafana_crd = client + .get_resource_json_value(name, Some(ns), &grafana_gvk) + .await; + match grafana_crd { + Ok(_) => { + return Ok(PreparationOutcome::Success { + details: "Found grafana CRDs in cluster".to_string(), + }); + } + + Err(_) => { + return self + .install_grafana_operator(inventory, Some("grafana")) + .await; + } + }; + } + async fn install_grafana(&self) -> Result { + let ns = "grafana"; + + let mut label = BTreeMap::new(); + + label.insert("dashboards".to_string(), "grafana".to_string()); + + let label_selector = LabelSelector { + match_labels: label.clone(), + match_expressions: vec![], + }; + + let client = self.k8s_client().await?; + + let grafana = self.build_grafana(ns, &label); + + client.apply(&grafana, Some(ns)).await?; + //TODO change this to a ensure ready or something better than just a timeout + client + .wait_until_deployment_ready( + "grafana-grafana-deployment", + Some("grafana"), + Some(Duration::from_secs(30)), + ) + .await?; + + let sa_name = "grafana-grafana-sa"; + let token_secret_name = "grafana-sa-token-secret"; + + let sa_token_secret = self.build_sa_token_secret(token_secret_name, sa_name, ns); + + client.apply(&sa_token_secret, Some(ns)).await?; + let secret_gvk = GroupVersionKind { + group: "".to_string(), + version: "v1".to_string(), + kind: "Secret".to_string(), + }; + + let secret = client + .get_resource_json_value(token_secret_name, Some(ns), &secret_gvk) + .await?; + + let token = format!( + "Bearer {}", + self.extract_and_normalize_token(&secret).unwrap() + ); + + debug!("creating grafana clusterrole binding"); + + let clusterrolebinding = + self.build_cluster_rolebinding(sa_name, "cluster-monitoring-view", ns); + + client.apply(&clusterrolebinding, Some(ns)).await?; + + debug!("creating grafana datasource crd"); + + let thanos_url = format!( + "https://{}", + 
self.get_domain("thanos-querier-openshift-monitoring") + .await + .unwrap() + ); + + let thanos_openshift_datasource = self.build_grafana_datasource( + "thanos-openshift-monitoring", + ns, + &label_selector, + &thanos_url, + &token, + ); + + client.apply(&thanos_openshift_datasource, Some(ns)).await?; + + debug!("creating grafana dashboard crd"); + let dashboard = self.build_grafana_dashboard(ns, &label_selector); + + client.apply(&dashboard, Some(ns)).await?; + debug!("creating grafana ingress"); + let grafana_ingress = self.build_grafana_ingress(ns).await; + + grafana_ingress + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| PreparationError::new(e.to_string()))?; + + Ok(PreparationOutcome::Success { + details: "Installed grafana composants".to_string(), + }) + } +} + +#[async_trait] +impl PrometheusMonitoring for K8sAnywhereTopology { async fn install_prometheus( &self, sender: &CRDPrometheus, - inventory: &Inventory, - receivers: Option>>>, + _inventory: &Inventory, + _receivers: Option>>>, + ) -> Result { + let client = self.k8s_client().await?; + + for monitor in sender.service_monitor.iter() { + client + .apply(monitor, Some(&sender.namespace)) + .await + .map_err(|e| PreparationError::new(e.to_string()))?; + } + Ok(PreparationOutcome::Success { + details: "successfuly installed prometheus components".to_string(), + }) + } + + async fn ensure_prometheus_operator( + &self, + sender: &CRDPrometheus, + _inventory: &Inventory, ) -> Result { let po_result = self.ensure_prometheus_operator(sender).await?; - if po_result == PreparationOutcome::Noop { - debug!("Skipping Prometheus CR installation due to missing operator."); - return Ok(po_result); - } - - let result = self - .get_k8s_prometheus_application_score(sender.clone(), receivers) - .await - .interpret(inventory, self) - .await; - - match result { - Ok(outcome) => match outcome.status { - InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success { - details: outcome.message, - }), - 
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop), - _ => Err(PreparationError::new(outcome.message)), - }, - Err(err) => Err(PreparationError::new(err.to_string())), + match po_result { + PreparationOutcome::Success { details: _ } => { + debug!("Detected prometheus crds operator present in cluster."); + return Ok(po_result); + } + PreparationOutcome::Noop => { + debug!("Skipping Prometheus CR installation due to missing operator."); + return Ok(po_result); + } } } } #[async_trait] -impl PrometheusApplicationMonitoring for K8sAnywhereTopology { +impl PrometheusMonitoring for K8sAnywhereTopology { async fn install_prometheus( &self, sender: &RHOBObservability, @@ -146,6 +302,14 @@ impl PrometheusApplicationMonitoring for K8sAnywhereTopology Err(err) => Err(PreparationError::new(err.to_string())), } } + + async fn ensure_prometheus_operator( + &self, + sender: &RHOBObservability, + inventory: &Inventory, + ) -> Result { + todo!() + } } impl Serialize for K8sAnywhereTopology { @@ -162,6 +326,7 @@ impl K8sAnywhereTopology { Self { k8s_state: Arc::new(OnceCell::new()), tenant_manager: Arc::new(OnceCell::new()), + k8s_distribution: Arc::new(OnceCell::new()), config: Arc::new(K8sAnywhereConfig::from_env()), } } @@ -170,10 +335,216 @@ impl K8sAnywhereTopology { Self { k8s_state: Arc::new(OnceCell::new()), tenant_manager: Arc::new(OnceCell::new()), + k8s_distribution: Arc::new(OnceCell::new()), config: Arc::new(config), } } + pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> { + self.k8s_distribution + .get_or_try_init(async || { + let client = self.k8s_client().await.unwrap(); + + let discovery = client.discovery().await.map_err(|e| { + PreparationError::new(format!("Could not discover API groups: {}", e)) + })?; + + let version = client.get_apiserver_version().await.map_err(|e| { + PreparationError::new(format!("Could not get server version: {}", e)) + })?; + + // OpenShift / OKD + if discovery + .groups() + .any(|g| 
g.name() == "project.openshift.io") + { + return Ok(KubernetesDistribution::OpenshiftFamily); + } + + // K3d / K3s + if version.git_version.contains("k3s") { + return Ok(KubernetesDistribution::K3sFamily); + } + + return Ok(KubernetesDistribution::Default); + }) + .await + } + + fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option { + let token_b64 = secret + .data + .get("token") + .or_else(|| secret.data.get("data").and_then(|d| d.get("token"))) + .and_then(|v| v.as_str())?; + + let bytes = general_purpose::STANDARD.decode(token_b64).ok()?; + + let s = String::from_utf8(bytes).ok()?; + + let cleaned = s + .trim_matches(|c: char| c.is_whitespace() || c == '\0') + .to_string(); + Some(cleaned) + } + + pub fn build_cluster_rolebinding( + &self, + service_account_name: &str, + clusterrole_name: &str, + ns: &str, + ) -> ClusterRoleBinding { + ClusterRoleBinding { + metadata: ObjectMeta { + name: Some(format!("{}-view-binding", service_account_name)), + ..Default::default() + }, + role_ref: RoleRef { + api_group: "rbac.authorization.k8s.io".into(), + kind: "ClusterRole".into(), + name: clusterrole_name.into(), + }, + subjects: Some(vec![Subject { + kind: "ServiceAccount".into(), + name: service_account_name.into(), + namespace: Some(ns.into()), + ..Default::default() + }]), + } + } + + pub fn build_sa_token_secret( + &self, + secret_name: &str, + service_account_name: &str, + ns: &str, + ) -> Secret { + let mut annotations = BTreeMap::new(); + annotations.insert( + "kubernetes.io/service-account.name".to_string(), + service_account_name.to_string(), + ); + + Secret { + metadata: ObjectMeta { + name: Some(secret_name.into()), + namespace: Some(ns.into()), + annotations: Some(annotations), + ..Default::default() + }, + type_: Some("kubernetes.io/service-account-token".to_string()), + ..Default::default() + } + } + + fn build_grafana_datasource( + &self, + name: &str, + ns: &str, + label_selector: &LabelSelector, + url: &str, + token: &str, + ) -> 
GrafanaDatasource { + let mut json_data = BTreeMap::new(); + json_data.insert("timeInterval".to_string(), "5s".to_string()); + + GrafanaDatasource { + metadata: ObjectMeta { + name: Some(name.to_string()), + namespace: Some(ns.to_string()), + ..Default::default() + }, + spec: GrafanaDatasourceSpec { + instance_selector: label_selector.clone(), + allow_cross_namespace_import: Some(true), + values_from: None, + datasource: GrafanaDatasourceConfig { + access: "proxy".to_string(), + name: name.to_string(), + r#type: "prometheus".to_string(), + url: url.to_string(), + database: None, + json_data: Some(GrafanaDatasourceJsonData { + time_interval: Some("60s".to_string()), + http_header_name1: Some("Authorization".to_string()), + tls_skip_verify: Some(true), + oauth_pass_thru: Some(true), + }), + secure_json_data: Some(GrafanaDatasourceSecureJsonData { + http_header_value1: Some(format!("Bearer {token}")), + }), + is_default: Some(false), + editable: Some(true), + }, + }, + } + } + + fn build_grafana_dashboard( + &self, + ns: &str, + label_selector: &LabelSelector, + ) -> GrafanaDashboard { + let graf_dashboard = GrafanaDashboard { + metadata: ObjectMeta { + name: Some(format!("grafana-dashboard-{}", ns)), + namespace: Some(ns.to_string()), + ..Default::default() + }, + spec: GrafanaDashboardSpec { + resync_period: Some("30s".to_string()), + instance_selector: label_selector.clone(), + datasources: Some(vec![GrafanaDashboardDatasource { + input_name: "DS_PROMETHEUS".to_string(), + datasource_name: "thanos-openshift-monitoring".to_string(), + }]), + json: None, + grafana_com: Some(GrafanaCom { + id: 17406, + revision: None, + }), + }, + }; + graf_dashboard + } + + fn build_grafana(&self, ns: &str, labels: &BTreeMap) -> GrafanaCRD { + let grafana = GrafanaCRD { + metadata: ObjectMeta { + name: Some(format!("grafana-{}", ns)), + namespace: Some(ns.to_string()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: GrafanaSpec { + config: None, + admin_user: 
None, + admin_password: None, + ingress: None, + persistence: None, + resources: None, + }, + }; + grafana + } + + async fn build_grafana_ingress(&self, ns: &str) -> K8sIngressScore { + let domain = self.get_domain(&format!("grafana-{}", ns)).await.unwrap(); + let name = format!("{}-grafana", ns); + let backend_service = format!("grafana-{}-service", ns); + + K8sIngressScore { + name: fqdn::fqdn!(&name), + host: fqdn::fqdn!(&domain), + backend_service: fqdn::fqdn!(&backend_service), + port: 3000, + path: Some("/".to_string()), + path_type: Some(PathType::Prefix), + namespace: Some(fqdn::fqdn!(&ns)), + ingress_class_name: Some("openshift-default".to_string()), + } + } + async fn get_cluster_observability_operator_prometheus_application_score( &self, sender: RHOBObservability, @@ -191,13 +562,14 @@ impl K8sAnywhereTopology { &self, sender: CRDPrometheus, receivers: Option>>>, + service_monitors: Option>, ) -> K8sPrometheusCRDAlertingScore { - K8sPrometheusCRDAlertingScore { + return K8sPrometheusCRDAlertingScore { sender, receivers: receivers.unwrap_or_default(), - service_monitors: vec![], + service_monitors: service_monitors.unwrap_or_default(), prometheus_rules: vec![], - } + }; } async fn openshift_ingress_operator_available(&self) -> Result<(), PreparationError> { @@ -465,6 +837,30 @@ impl K8sAnywhereTopology { details: "prometheus operator present in cluster".into(), }) } + + async fn install_grafana_operator( + &self, + inventory: &Inventory, + ns: Option<&str>, + ) -> Result { + let namespace = ns.unwrap_or("grafana"); + info!("installing grafana operator in ns {namespace}"); + let tenant = self.get_k8s_tenant_manager()?.get_tenant_config().await; + let mut namespace_scope = false; + if tenant.is_some() { + namespace_scope = true; + } + let _grafana_operator_score = grafana_helm_chart_score(namespace, namespace_scope) + .interpret(inventory, self) + .await + .map_err(|e| PreparationError::new(e.to_string())); + Ok(PreparationOutcome::Success { + details: 
format!( + "Successfully installed grafana operator in ns {}", + ns.unwrap() + ), + }) + } } #[derive(Clone, Debug)] diff --git a/harmony/src/domain/topology/load_balancer.rs b/harmony/src/domain/topology/load_balancer.rs index 901602b..59c5add 100644 --- a/harmony/src/domain/topology/load_balancer.rs +++ b/harmony/src/domain/topology/load_balancer.rs @@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync { &self, service: &LoadBalancerService, ) -> Result<(), ExecutorError> { - debug!( - "Listing LoadBalancer services {:?}", - self.list_services().await - ); - if !self.list_services().await.contains(service) { - self.add_service(service).await?; - } + self.add_service(service).await?; Ok(()) } } diff --git a/harmony/src/domain/topology/network.rs b/harmony/src/domain/topology/network.rs index 8a939a3..cf172ee 100644 --- a/harmony/src/domain/topology/network.rs +++ b/harmony/src/domain/topology/network.rs @@ -1,4 +1,10 @@ -use std::{error::Error, net::Ipv4Addr, str::FromStr, sync::Arc}; +use std::{ + error::Error, + fmt::{self, Debug}, + net::Ipv4Addr, + str::FromStr, + sync::Arc, +}; use async_trait::async_trait; use derive_new::new; @@ -20,8 +26,8 @@ pub struct DHCPStaticEntry { pub ip: Ipv4Addr, } -impl std::fmt::Display for DHCPStaticEntry { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for DHCPStaticEntry { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { let mac = self .mac .iter() @@ -43,8 +49,8 @@ pub trait Firewall: Send + Sync { fn get_host(&self) -> LogicalHost; } -impl std::fmt::Debug for dyn Firewall { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl Debug for dyn Firewall { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_fmt(format_args!("Firewall {}", self.get_ip())) } } @@ -66,7 +72,7 @@ pub struct PxeOptions { } #[async_trait] -pub trait DhcpServer: Send + Sync + std::fmt::Debug { +pub trait DhcpServer: Send + Sync + Debug { async fn 
add_static_mapping(&self, entry: &DHCPStaticEntry) -> Result<(), ExecutorError>; async fn remove_static_mapping(&self, mac: &MacAddress) -> Result<(), ExecutorError>; async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)>; @@ -105,8 +111,8 @@ pub trait DnsServer: Send + Sync { } } -impl std::fmt::Debug for dyn DnsServer { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl Debug for dyn DnsServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_fmt(format_args!("DnsServer {}", self.get_ip())) } } @@ -142,8 +148,8 @@ pub enum DnsRecordType { TXT, } -impl std::fmt::Display for DnsRecordType { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for DnsRecordType { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { match self { DnsRecordType::A => write!(f, "A"), DnsRecordType::AAAA => write!(f, "AAAA"), @@ -214,8 +220,8 @@ pub struct SwitchError { msg: String, } -impl std::fmt::Display for SwitchError { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { +impl fmt::Display for SwitchError { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { f.write_str(&self.msg) } } @@ -223,7 +229,7 @@ impl std::fmt::Display for SwitchError { impl Error for SwitchError {} #[async_trait] -pub trait SwitchClient: Send + Sync { +pub trait SwitchClient: Debug + Send + Sync { /// Executes essential, idempotent, one-time initial configuration steps. 
/// /// This is an opiniated procedure that setups a switch to provide high availability diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 1489e83..6d7411c 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -21,6 +21,7 @@ pub struct AlertingInterpret { pub sender: S, pub receivers: Vec>>, pub rules: Vec>>, + pub scrape_targets: Option>>>, } #[async_trait] @@ -30,6 +31,7 @@ impl, T: Topology> Interpret for AlertingInte inventory: &Inventory, topology: &T, ) -> Result { + debug!("hit sender configure for AlertingInterpret"); self.sender.configure(inventory, topology).await?; for receiver in self.receivers.iter() { receiver.install(&self.sender).await?; @@ -38,6 +40,12 @@ impl, T: Topology> Interpret for AlertingInte debug!("installing rule: {:#?}", rule); rule.install(&self.sender).await?; } + if let Some(targets) = &self.scrape_targets { + for target in targets.iter() { + debug!("installing scrape_target: {:#?}", target); + target.install(&self.sender).await?; + } + } self.sender.ensure_installed(inventory, topology).await?; Ok(Outcome::success(format!( "successfully installed alert sender {}", @@ -77,6 +85,7 @@ pub trait AlertRule: std::fmt::Debug + Send + Sync { } #[async_trait] -pub trait ScrapeTarget { - async fn install(&self, sender: &S) -> Result<(), InterpretError>; +pub trait ScrapeTarget: std::fmt::Debug + Send + Sync { + async fn install(&self, sender: &S) -> Result; + fn clone_box(&self) -> Box>; } diff --git a/harmony/src/infra/brocade.rs b/harmony/src/infra/brocade.rs index f721328..774c8f8 100644 --- a/harmony/src/infra/brocade.rs +++ b/harmony/src/infra/brocade.rs @@ -1,15 +1,14 @@ use async_trait::async_trait; use brocade::{BrocadeClient, BrocadeOptions, InterSwitchLink, InterfaceStatus, PortOperatingMode}; -use harmony_secret::Secret; use harmony_types::{ net::{IpAddress, 
MacAddress}, switch::{PortDeclaration, PortLocation}, }; use option_ext::OptionExt; -use serde::{Deserialize, Serialize}; use crate::topology::{SwitchClient, SwitchError}; +#[derive(Debug)] pub struct BrocadeSwitchClient { brocade: Box, } @@ -114,12 +113,6 @@ impl SwitchClient for BrocadeSwitchClient { } } -#[derive(Secret, Serialize, Deserialize, Debug)] -pub struct BrocadeSwitchAuth { - pub username: String, - pub password: String, -} - #[cfg(test)] mod tests { use std::sync::{Arc, Mutex}; @@ -235,7 +228,7 @@ mod tests { assert_that!(*configured_interfaces).is_empty(); } - #[derive(Clone)] + #[derive(Debug, Clone)] struct FakeBrocadeClient { stack_topology: Vec, interfaces: Vec, diff --git a/harmony/src/infra/opnsense/load_balancer.rs b/harmony/src/infra/opnsense/load_balancer.rs index ce47f05..3df7511 100644 --- a/harmony/src/infra/opnsense/load_balancer.rs +++ b/harmony/src/infra/opnsense/load_balancer.rs @@ -26,19 +26,13 @@ impl LoadBalancer for OPNSenseFirewall { } async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> { - warn!( - "TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. 
Make sure to manually verify that the configuration is correct after executing any operation here" - ); let mut config = self.opnsense_config.write().await; + let mut load_balancer = config.load_balancer(); + let (frontend, backend, servers, healthcheck) = harmony_load_balancer_service_to_haproxy_xml(service); - let mut load_balancer = config.load_balancer(); - load_balancer.add_backend(backend); - load_balancer.add_frontend(frontend); - load_balancer.add_servers(servers); - if let Some(healthcheck) = healthcheck { - load_balancer.add_healthcheck(healthcheck); - } + + load_balancer.configure_service(frontend, backend, servers, healthcheck); Ok(()) } @@ -106,7 +100,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer( .backends .backends .iter() - .find(|b| b.uuid == frontend.default_backend); + .find(|b| Some(b.uuid.clone()) == frontend.default_backend); let mut health_check = None; match matching_backend { @@ -116,8 +110,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer( } None => { warn!( - "HAProxy config could not find a matching backend for frontend {:?}", - frontend + "HAProxy config could not find a matching backend for frontend {frontend:?}" ); } } @@ -152,11 +145,11 @@ pub(crate) fn get_servers_for_backend( .servers .iter() .filter_map(|server| { + let address = server.address.clone()?; + let port = server.port?; + if backend_servers.contains(&server.uuid.as_str()) { - return Some(BackendServer { - address: server.address.clone(), - port: server.port, - }); + return Some(BackendServer { address, port }); } None }) @@ -347,7 +340,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml( name: format!("frontend_{}", service.listening_port), bind: service.listening_port.to_string(), mode: "tcp".to_string(), // TODO do not depend on health check here - default_backend: backend.uuid.clone(), + default_backend: Some(backend.uuid.clone()), ..Default::default() }; info!("HAPRoxy frontend and backend mode currently hardcoded to tcp"); @@ 
-361,8 +354,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer { uuid: Uuid::new_v4().to_string(), name: format!("{}_{}", &server.address, &server.port), enabled: 1, - address: server.address.clone(), - port: server.port, + address: Some(server.address.clone()), + port: Some(server.port), mode: "active".to_string(), server_type: "static".to_string(), ..Default::default() @@ -385,8 +378,8 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "192.168.1.1".to_string(), - port: 80, + address: Some("192.168.1.1".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); @@ -411,8 +404,8 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "192.168.1.1".to_string(), - port: 80, + address: Some("192.168.1.1".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); @@ -431,8 +424,8 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "192.168.1.1".to_string(), - port: 80, + address: Some("192.168.1.1".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); @@ -453,16 +446,16 @@ mod tests { let mut haproxy = HAProxy::default(); let server = HAProxyServer { uuid: "server1".to_string(), - address: "some-hostname.test.mcd".to_string(), - port: 80, + address: Some("some-hostname.test.mcd".to_string()), + port: Some(80), ..Default::default() }; haproxy.servers.servers.push(server); let server = HAProxyServer { uuid: "server2".to_string(), - address: "192.168.1.2".to_string(), - port: 8080, + address: Some("192.168.1.2".to_string()), + port: Some(8080), ..Default::default() }; haproxy.servers.servers.push(server); diff --git a/harmony/src/modules/application/features/monitoring.rs b/harmony/src/modules/application/features/monitoring.rs index 
1a60d00..fd6ae2a 100644 --- a/harmony/src/modules/application/features/monitoring.rs +++ b/harmony/src/modules/application/features/monitoring.rs @@ -2,7 +2,11 @@ use crate::modules::application::{ Application, ApplicationFeature, InstallationError, InstallationOutcome, }; use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore; +use crate::modules::monitoring::grafana::grafana::Grafana; use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus; +use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{ + ServiceMonitor, ServiceMonitorSpec, +}; use crate::topology::MultiTargetTopology; use crate::topology::ingress::Ingress; use crate::{ @@ -14,7 +18,7 @@ use crate::{ topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager}, }; use crate::{ - modules::prometheus::prometheus::PrometheusApplicationMonitoring, + modules::prometheus::prometheus::PrometheusMonitoring, topology::oberservability::monitoring::AlertReceiver, }; use async_trait::async_trait; @@ -22,6 +26,7 @@ use base64::{Engine as _, engine::general_purpose}; use harmony_secret::SecretManager; use harmony_secret_derive::Secret; use harmony_types::net::Url; +use kube::api::ObjectMeta; use log::{debug, info}; use serde::{Deserialize, Serialize}; use std::sync::Arc; @@ -40,7 +45,8 @@ impl< + TenantManager + K8sclient + MultiTargetTopology - + PrometheusApplicationMonitoring + + PrometheusMonitoring + + Grafana + Ingress + std::fmt::Debug, > ApplicationFeature for Monitoring @@ -57,10 +63,20 @@ impl< .unwrap_or_else(|| self.application.name()); let domain = topology.get_domain("ntfy").await.unwrap(); + let app_service_monitor = ServiceMonitor { + metadata: ObjectMeta { + name: Some(self.application.name()), + namespace: Some(namespace.clone()), + ..Default::default() + }, + spec: ServiceMonitorSpec::default(), + }; + let mut alerting_score = ApplicationMonitoringScore { sender: CRDPrometheus { 
namespace: namespace.clone(), client: topology.k8s_client().await.unwrap(), + service_monitor: vec![app_service_monitor], }, application: self.application.clone(), receivers: self.alert_receiver.clone(), diff --git a/harmony/src/modules/application/features/rhob_monitoring.rs b/harmony/src/modules/application/features/rhob_monitoring.rs index d87ef61..876dba9 100644 --- a/harmony/src/modules/application/features/rhob_monitoring.rs +++ b/harmony/src/modules/application/features/rhob_monitoring.rs @@ -18,7 +18,7 @@ use crate::{ topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager}, }; use crate::{ - modules::prometheus::prometheus::PrometheusApplicationMonitoring, + modules::prometheus::prometheus::PrometheusMonitoring, topology::oberservability::monitoring::AlertReceiver, }; use async_trait::async_trait; @@ -42,7 +42,7 @@ impl< + MultiTargetTopology + Ingress + std::fmt::Debug - + PrometheusApplicationMonitoring, + + PrometheusMonitoring, > ApplicationFeature for Monitoring { async fn ensure_installed( diff --git a/harmony/src/modules/cert_manager/cluster_issuer.rs b/harmony/src/modules/cert_manager/cluster_issuer.rs new file mode 100644 index 0000000..70294fe --- /dev/null +++ b/harmony/src/modules/cert_manager/cluster_issuer.rs @@ -0,0 +1,209 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use harmony_types::id::Id; +use kube::{CustomResource, api::ObjectMeta}; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::{ + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + score::Score, + topology::{K8sclient, Topology, k8s::K8sClient}, +}; + +#[derive(Clone, Debug, Serialize)] +pub struct ClusterIssuerScore { + email: String, + server: String, + issuer_name: String, + namespace: String, +} + +impl Score for ClusterIssuerScore { + fn name(&self) -> String { + "ClusterIssuerScore".to_string() + } + + #[doc(hidden)] + fn create_interpret(&self) -> 
Box> { + Box::new(ClusterIssuerInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug, Clone)] +pub struct ClusterIssuerInterpret { + score: ClusterIssuerScore, +} + +#[async_trait] +impl Interpret for ClusterIssuerInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + self.apply_cluster_issuer(topology.k8s_client().await.unwrap()) + .await + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("ClusterIssuer") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} + +impl ClusterIssuerInterpret { + async fn validate_cert_manager( + &self, + client: &Arc, + ) -> Result { + let cert_manager = "cert-manager".to_string(); + let operator_namespace = "openshift-operators".to_string(); + match client + .get_deployment(&cert_manager, Some(&operator_namespace)) + .await + { + Ok(Some(deployment)) => { + if let Some(status) = deployment.status { + let ready_count = status.ready_replicas.unwrap_or(0); + if ready_count >= 1 { + return Ok(Outcome::success(format!( + "'{}' is ready with {} replica(s).", + &cert_manager, ready_count + ))); + } else { + return Err(InterpretError::new( + "cert-manager operator not ready in cluster".to_string(), + )); + } + } else { + Err(InterpretError::new(format!( + "failed to get deployment status {} in ns {}", + &cert_manager, &operator_namespace + ))) + } + } + Ok(None) => Err(InterpretError::new(format!( + "Deployment '{}' not found in namespace '{}'.", + &cert_manager, &operator_namespace + ))), + Err(e) => Err(InterpretError::new(format!( + "Failed to query for deployment '{}': {}", + &cert_manager, e + ))), + } + } + + fn build_cluster_issuer(&self) -> Result { + let issuer_name = &self.score.issuer_name; + let email = &self.score.email; + let server = &self.score.server; + let namespace = &self.score.namespace; + let cluster_issuer = ClusterIssuer { + 
metadata: ObjectMeta { + name: Some(issuer_name.to_string()), + namespace: Some(namespace.to_string()), + ..Default::default() + }, + spec: ClusterIssuerSpec { + acme: AcmeSpec { + email: email.to_string(), + private_key_secret_ref: PrivateKeySecretRef { + name: issuer_name.to_string(), + }, + server: server.to_string(), + solvers: vec![SolverSpec { + http01: Some(Http01Solver { + ingress: Http01Ingress { + class: "nginx".to_string(), + }, + }), + }], + }, + }, + }; + + Ok(cluster_issuer) + } + + pub async fn apply_cluster_issuer( + &self, + client: Arc, + ) -> Result { + let namespace = self.score.namespace.clone(); + self.validate_cert_manager(&client).await?; + let cluster_issuer = self.build_cluster_issuer().unwrap(); + client + .apply_yaml( + &serde_yaml::to_value(cluster_issuer).unwrap(), + Some(&namespace), + ) + .await?; + Ok(Outcome::success(format!( + "successfully deployed cluster operator: {} in namespace: {}", + self.score.issuer_name, self.score.namespace + ))) + } +} + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[kube( + group = "cert-manager.io", + version = "v1", + kind = "ClusterIssuer", + plural = "clusterissuers" +)] +#[serde(rename_all = "camelCase")] +pub struct ClusterIssuerSpec { + pub acme: AcmeSpec, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct AcmeSpec { + pub email: String, + pub private_key_secret_ref: PrivateKeySecretRef, + pub server: String, + pub solvers: Vec, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct PrivateKeySecretRef { + pub name: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct SolverSpec { + pub http01: Option, + // Other solver types (e.g., dns01) would go here as Options +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub 
struct Http01Solver { + pub ingress: Http01Ingress, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Http01Ingress { + pub class: String, +} diff --git a/harmony/src/modules/cert_manager/mod.rs b/harmony/src/modules/cert_manager/mod.rs index 8fd309a..032439e 100644 --- a/harmony/src/modules/cert_manager/mod.rs +++ b/harmony/src/modules/cert_manager/mod.rs @@ -1,2 +1,3 @@ +pub mod cluster_issuer; mod helm; pub use helm::*; diff --git a/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs b/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs index 8246d15..8f6b624 100644 --- a/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs +++ b/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs @@ -1,21 +1,23 @@ use std::sync::Arc; -use async_trait::async_trait; +use log::debug; use serde::Serialize; use crate::{ - data::Version, - interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, - inventory::Inventory, + interpret::Interpret, modules::{ application::Application, - monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus, - prometheus::prometheus::PrometheusApplicationMonitoring, + monitoring::{ + grafana::grafana::Grafana, kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus, + }, + prometheus::prometheus::PrometheusMonitoring, }, score::Score, - topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver}, + topology::{ + K8sclient, Topology, + oberservability::monitoring::{AlertReceiver, AlertingInterpret, ScrapeTarget}, + }, }; -use harmony_types::id::Id; #[derive(Debug, Clone, Serialize)] pub struct ApplicationMonitoringScore { @@ -24,12 +26,16 @@ pub struct ApplicationMonitoringScore { pub receivers: Vec>>, } -impl> Score +impl + K8sclient + Grafana> Score for ApplicationMonitoringScore { fn 
create_interpret(&self) -> Box> { - Box::new(ApplicationMonitoringInterpret { - score: self.clone(), + debug!("creating alerting interpret"); + Box::new(AlertingInterpret { + sender: self.sender.clone(), + receivers: self.receivers.clone(), + rules: vec![], + scrape_targets: None, }) } @@ -40,55 +46,3 @@ impl> Score ) } } - -#[derive(Debug)] -pub struct ApplicationMonitoringInterpret { - score: ApplicationMonitoringScore, -} - -#[async_trait] -impl> Interpret - for ApplicationMonitoringInterpret -{ - async fn execute( - &self, - inventory: &Inventory, - topology: &T, - ) -> Result { - let result = topology - .install_prometheus( - &self.score.sender, - inventory, - Some(self.score.receivers.clone()), - ) - .await; - - match result { - Ok(outcome) => match outcome { - PreparationOutcome::Success { details: _ } => { - Ok(Outcome::success("Prometheus installed".into())) - } - PreparationOutcome::Noop => { - Ok(Outcome::noop("Prometheus installation skipped".into())) - } - }, - Err(err) => Err(InterpretError::from(err)), - } - } - - fn get_name(&self) -> InterpretName { - InterpretName::ApplicationMonitoring - } - - fn get_version(&self) -> Version { - todo!() - } - - fn get_status(&self) -> InterpretStatus { - todo!() - } - - fn get_children(&self) -> Vec { - todo!() - } -} diff --git a/harmony/src/modules/monitoring/application_monitoring/rhobs_application_monitoring_score.rs b/harmony/src/modules/monitoring/application_monitoring/rhobs_application_monitoring_score.rs index 5f5127f..6f45c88 100644 --- a/harmony/src/modules/monitoring/application_monitoring/rhobs_application_monitoring_score.rs +++ b/harmony/src/modules/monitoring/application_monitoring/rhobs_application_monitoring_score.rs @@ -12,7 +12,7 @@ use crate::{ monitoring::kube_prometheus::crd::{ crd_alertmanager_config::CRDPrometheus, rhob_alertmanager_config::RHOBObservability, }, - prometheus::prometheus::PrometheusApplicationMonitoring, + prometheus::prometheus::PrometheusMonitoring, }, score::Score, 
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver}, @@ -26,7 +26,7 @@ pub struct ApplicationRHOBMonitoringScore { pub receivers: Vec>>, } -impl> Score +impl> Score for ApplicationRHOBMonitoringScore { fn create_interpret(&self) -> Box> { @@ -49,7 +49,7 @@ pub struct ApplicationRHOBMonitoringInterpret { } #[async_trait] -impl> Interpret +impl> Interpret for ApplicationRHOBMonitoringInterpret { async fn execute( diff --git a/harmony/src/modules/monitoring/grafana/grafana.rs b/harmony/src/modules/monitoring/grafana/grafana.rs new file mode 100644 index 0000000..5ab57c2 --- /dev/null +++ b/harmony/src/modules/monitoring/grafana/grafana.rs @@ -0,0 +1,17 @@ +use async_trait::async_trait; +use k8s_openapi::Resource; + +use crate::{ + inventory::Inventory, + topology::{PreparationError, PreparationOutcome}, +}; + +#[async_trait] +pub trait Grafana { + async fn ensure_grafana_operator( + &self, + inventory: &Inventory, + ) -> Result; + + async fn install_grafana(&self) -> Result; +} diff --git a/harmony/src/modules/monitoring/grafana/helm/helm_grafana.rs b/harmony/src/modules/monitoring/grafana/helm/helm_grafana.rs index 3af6550..c9ccacb 100644 --- a/harmony/src/modules/monitoring/grafana/helm/helm_grafana.rs +++ b/harmony/src/modules/monitoring/grafana/helm/helm_grafana.rs @@ -1,27 +1,28 @@ +use harmony_macros::hurl; use non_blank_string_rs::NonBlankString; -use std::str::FromStr; +use std::{collections::HashMap, str::FromStr}; -use crate::modules::helm::chart::HelmChartScore; - -pub fn grafana_helm_chart_score(ns: &str) -> HelmChartScore { - let values = r#" -rbac: - namespaced: true -sidecar: - dashboards: - enabled: true - "# - .to_string(); +use crate::modules::helm::chart::{HelmChartScore, HelmRepository}; +pub fn grafana_helm_chart_score(ns: &str, namespace_scope: bool) -> HelmChartScore { + let mut values_overrides = HashMap::new(); + values_overrides.insert( + NonBlankString::from_str("namespaceScope").unwrap(), + 
namespace_scope.to_string(), + ); HelmChartScore { namespace: Some(NonBlankString::from_str(ns).unwrap()), - release_name: NonBlankString::from_str("grafana").unwrap(), - chart_name: NonBlankString::from_str("oci://ghcr.io/grafana/helm-charts/grafana").unwrap(), + release_name: NonBlankString::from_str("grafana-operator").unwrap(), + chart_name: NonBlankString::from_str("grafana/grafana-operator").unwrap(), chart_version: None, - values_overrides: None, - values_yaml: Some(values.to_string()), + values_overrides: Some(values_overrides), + values_yaml: None, create_namespace: true, install_only: true, - repository: None, + repository: Some(HelmRepository::new( + "grafana".to_string(), + hurl!("https://grafana.github.io/helm-charts"), + true, + )), } } diff --git a/harmony/src/modules/monitoring/grafana/mod.rs b/harmony/src/modules/monitoring/grafana/mod.rs index c821bcb..8dccab1 100644 --- a/harmony/src/modules/monitoring/grafana/mod.rs +++ b/harmony/src/modules/monitoring/grafana/mod.rs @@ -1 +1,2 @@ +pub mod grafana; pub mod helm; diff --git a/harmony/src/modules/monitoring/kube_prometheus/crd/crd_alertmanager_config.rs b/harmony/src/modules/monitoring/kube_prometheus/crd/crd_alertmanager_config.rs index 2165a4a..88ec745 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/crd/crd_alertmanager_config.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/crd/crd_alertmanager_config.rs @@ -1,12 +1,25 @@ use std::sync::Arc; +use async_trait::async_trait; use kube::CustomResource; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; -use crate::topology::{ - k8s::K8sClient, - oberservability::monitoring::{AlertReceiver, AlertSender}, +use crate::{ + interpret::{InterpretError, Outcome}, + inventory::Inventory, + modules::{ + monitoring::{ + grafana::grafana::Grafana, kube_prometheus::crd::service_monitor::ServiceMonitor, + }, + prometheus::prometheus::PrometheusMonitoring, + }, + topology::{ + K8sclient, Topology, + installable::Installable, + 
k8s::K8sClient, + oberservability::monitoring::{AlertReceiver, AlertSender, ScrapeTarget}, + }, }; #[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)] @@ -26,6 +39,7 @@ pub struct AlertmanagerConfigSpec { pub struct CRDPrometheus { pub namespace: String, pub client: Arc, + pub service_monitor: Vec, } impl AlertSender for CRDPrometheus { @@ -40,6 +54,12 @@ impl Clone for Box> { } } +impl Clone for Box> { + fn clone(&self) -> Self { + self.clone_box() + } +} + impl Serialize for Box> { fn serialize(&self, _serializer: S) -> Result where @@ -48,3 +68,24 @@ impl Serialize for Box> { todo!() } } + +#[async_trait] +impl + Grafana> Installable + for CRDPrometheus +{ + async fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError> { + topology.ensure_grafana_operator(inventory).await?; + topology.ensure_prometheus_operator(self, inventory).await?; + Ok(()) + } + + async fn ensure_installed( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result<(), InterpretError> { + topology.install_grafana().await?; + topology.install_prometheus(&self, inventory, None).await?; + Ok(()) + } +} diff --git a/harmony/src/modules/monitoring/kube_prometheus/crd/crd_grafana.rs b/harmony/src/modules/monitoring/kube_prometheus/crd/crd_grafana.rs index 793f639..386890e 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/crd/crd_grafana.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/crd/crd_grafana.rs @@ -103,9 +103,34 @@ pub struct GrafanaDashboardSpec { #[serde(default, skip_serializing_if = "Option::is_none")] pub resync_period: Option, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub datasources: Option>, + pub instance_selector: LabelSelector, - pub json: String, + #[serde(default, skip_serializing_if = "Option::is_none")] + pub json: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub grafana_com: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] 
+#[serde(rename_all = "camelCase")] +pub struct GrafanaDashboardDatasource { + pub input_name: String, + pub datasource_name: String, +} + +// ------------------------------------------------------------------------------------------------ + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GrafanaCom { + pub id: u32, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub revision: Option, } // ------------------------------------------------------------------------------------------------ @@ -126,20 +151,79 @@ pub struct GrafanaDatasourceSpec { pub allow_cross_namespace_import: Option, pub datasource: GrafanaDatasourceConfig, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub values_from: Option>, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GrafanaValueFrom { + pub target_path: String, + pub value_from: GrafanaValueSource, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GrafanaValueSource { + pub secret_key_ref: GrafanaSecretKeyRef, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GrafanaSecretKeyRef { + pub name: String, + pub key: String, } #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] #[serde(rename_all = "camelCase")] pub struct GrafanaDatasourceConfig { pub access: String, - pub database: Option, #[serde(default, skip_serializing_if = "Option::is_none")] - pub json_data: Option>, + pub database: Option, pub name: String, pub r#type: String, pub url: String, + /// Represents jsonData in the GrafanaDatasource spec + #[serde(default, skip_serializing_if = "Option::is_none")] + pub json_data: Option, + + /// Represents secureJsonData (secrets) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub secure_json_data: Option, + + #[serde(default, 
skip_serializing_if = "Option::is_none")] + pub is_default: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub editable: Option, } +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GrafanaDatasourceJsonData { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub time_interval: Option, + + #[serde(default, skip_serializing_if = "Option::is_none")] + pub http_header_name1: Option, + + /// Disable TLS skip verification (false = verify) + #[serde(default, skip_serializing_if = "Option::is_none")] + pub tls_skip_verify: Option, + + /// Auth type - set to "forward" for OpenShift OAuth identity + #[serde(default, skip_serializing_if = "Option::is_none")] + pub oauth_pass_thru: Option, +} + +#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct GrafanaDatasourceSecureJsonData { + #[serde(default, skip_serializing_if = "Option::is_none")] + pub http_header_value1: Option, +} // ------------------------------------------------------------------------------------------------ #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, Default)] diff --git a/harmony/src/modules/monitoring/kube_prometheus/crd/crd_scrape_config.rs b/harmony/src/modules/monitoring/kube_prometheus/crd/crd_scrape_config.rs new file mode 100644 index 0000000..24a2833 --- /dev/null +++ b/harmony/src/modules/monitoring/kube_prometheus/crd/crd_scrape_config.rs @@ -0,0 +1,187 @@ +use std::net::IpAddr; + +use async_trait::async_trait; +use kube::CustomResource; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::{ + modules::monitoring::kube_prometheus::crd::{ + crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector, + }, + topology::oberservability::monitoring::ScrapeTarget, +}; + +#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)] +#[kube( + group = "monitoring.coreos.com", + version = 
"v1alpha1", + kind = "ScrapeConfig", + plural = "scrapeconfigs", + namespaced +)] +#[serde(rename_all = "camelCase")] +pub struct ScrapeConfigSpec { + /// List of static configurations. + pub static_configs: Option>, + + /// Kubernetes service discovery. + pub kubernetes_sd_configs: Option>, + + /// HTTP-based service discovery. + pub http_sd_configs: Option>, + + /// File-based service discovery. + pub file_sd_configs: Option>, + + /// DNS-based service discovery. + pub dns_sd_configs: Option>, + + /// Consul service discovery. + pub consul_sd_configs: Option>, + + /// Relabeling configuration applied to discovered targets. + pub relabel_configs: Option>, + + /// Metric relabeling configuration applied to scraped samples. + pub metric_relabel_configs: Option>, + + /// Path to scrape metrics from (defaults to `/metrics`). + pub metrics_path: Option, + + /// Interval at which Prometheus scrapes targets (e.g., "30s"). + pub scrape_interval: Option, + + /// Timeout for scraping (e.g., "10s"). + pub scrape_timeout: Option, + + /// Optional job name override. + pub job_name: Option, + + /// Optional scheme (http or https). + pub scheme: Option, + + /// Authorization paramaters for snmp walk + pub params: Option, +} + +/// Static configuration section of a ScrapeConfig. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct StaticConfig { + pub targets: Vec, + + pub labels: Option, +} + +/// Relabeling configuration for target or metric relabeling. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct RelabelConfig { + pub source_labels: Option>, + pub separator: Option, + pub target_label: Option, + pub regex: Option, + pub modulus: Option, + pub replacement: Option, + pub action: Option, +} + +/// Kubernetes service discovery configuration. 
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct KubernetesSDConfig { + ///"pod", "service", "endpoints"pub role: String, + pub namespaces: Option, + pub selectors: Option>, + pub api_server: Option, + pub bearer_token_file: Option, + pub tls_config: Option, +} + +/// Namespace selector for Kubernetes service discovery. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct NamespaceSelector { + pub any: Option, + pub match_names: Option>, +} + +/// HTTP-based service discovery configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct HttpSDConfig { + pub url: String, + pub refresh_interval: Option, + pub basic_auth: Option, + pub authorization: Option, + pub tls_config: Option, +} + +/// File-based service discovery configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct FileSDConfig { + pub files: Vec, + pub refresh_interval: Option, +} + +/// DNS-based service discovery configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct DnsSDConfig { + pub names: Vec, + pub refresh_interval: Option, + pub type_: Option, // SRV, A, AAAA + pub port: Option, +} + +/// Consul service discovery configuration. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct ConsulSDConfig { + pub server: String, + pub services: Option>, + pub scheme: Option, + pub datacenter: Option, + pub tag_separator: Option, + pub refresh_interval: Option, + pub tls_config: Option, +} + +/// Basic authentication credentials. 
+#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct BasicAuth { + pub username: String, + pub password: Option, + pub password_file: Option, +} + +/// Bearer token or other auth mechanisms. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Authorization { + pub credentials: Option, + pub credentials_file: Option, + pub type_: Option, +} + +/// TLS configuration for secure scraping. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct TLSConfig { + pub ca_file: Option, + pub cert_file: Option, + pub key_file: Option, + pub server_name: Option, + pub insecure_skip_verify: Option, +} + +/// Authorization parameters for SNMP walk. +#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct Params { + pub auth: Option>, + pub module: Option>, +} diff --git a/harmony/src/modules/monitoring/kube_prometheus/crd/mod.rs b/harmony/src/modules/monitoring/kube_prometheus/crd/mod.rs index 4dbea74..c8cb854 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/crd/mod.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/crd/mod.rs @@ -4,6 +4,7 @@ pub mod crd_default_rules; pub mod crd_grafana; pub mod crd_prometheus_rules; pub mod crd_prometheuses; +pub mod crd_scrape_config; pub mod grafana_default_dashboard; pub mod grafana_operator; pub mod prometheus_operator; diff --git a/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs b/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs index c9a0c04..468d308 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs @@ -31,6 +31,7 @@ impl Score for HelmPrometheusAlert sender: KubePrometheus { config }, receivers: self.receivers.clone(), rules: 
self.rules.clone(), + scrape_targets: None, }) } fn name(&self) -> String { diff --git a/harmony/src/modules/monitoring/mod.rs b/harmony/src/modules/monitoring/mod.rs index edda516..7f07d5a 100644 --- a/harmony/src/modules/monitoring/mod.rs +++ b/harmony/src/modules/monitoring/mod.rs @@ -6,3 +6,4 @@ pub mod kube_prometheus; pub mod ntfy; pub mod okd; pub mod prometheus; +pub mod scrape_target; diff --git a/harmony/src/modules/monitoring/prometheus/prometheus.rs b/harmony/src/modules/monitoring/prometheus/prometheus.rs index a207d5a..2fe0d06 100644 --- a/harmony/src/modules/monitoring/prometheus/prometheus.rs +++ b/harmony/src/modules/monitoring/prometheus/prometheus.rs @@ -114,7 +114,7 @@ impl Prometheus { }; if let Some(ns) = namespace.as_deref() { - grafana_helm_chart_score(ns) + grafana_helm_chart_score(ns, false) .interpret(inventory, topology) .await } else { diff --git a/harmony/src/modules/monitoring/scrape_target/mod.rs b/harmony/src/modules/monitoring/scrape_target/mod.rs new file mode 100644 index 0000000..74f47ad --- /dev/null +++ b/harmony/src/modules/monitoring/scrape_target/mod.rs @@ -0,0 +1 @@ +pub mod server; diff --git a/harmony/src/modules/monitoring/scrape_target/server.rs b/harmony/src/modules/monitoring/scrape_target/server.rs new file mode 100644 index 0000000..178e914 --- /dev/null +++ b/harmony/src/modules/monitoring/scrape_target/server.rs @@ -0,0 +1,80 @@ +use std::net::IpAddr; + +use async_trait::async_trait; +use kube::api::ObjectMeta; +use serde::Serialize; + +use crate::{ + interpret::{InterpretError, Outcome}, + modules::monitoring::kube_prometheus::crd::{ + crd_alertmanager_config::CRDPrometheus, + crd_scrape_config::{Params, RelabelConfig, ScrapeConfig, ScrapeConfigSpec, StaticConfig}, + }, + topology::oberservability::monitoring::ScrapeTarget, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct Server { + pub name: String, + pub ip: IpAddr, + pub auth: String, + pub module: String, + pub domain: String, +} + +#[async_trait] +impl 
ScrapeTarget for Server { + async fn install(&self, sender: &CRDPrometheus) -> Result { + let scrape_config_spec = ScrapeConfigSpec { + static_configs: Some(vec![StaticConfig { + targets: vec![self.ip.to_string()], + labels: None, + }]), + scrape_interval: Some("2m".to_string()), + kubernetes_sd_configs: None, + http_sd_configs: None, + file_sd_configs: None, + dns_sd_configs: None, + params: Some(Params { + auth: Some(vec![self.auth.clone()]), + module: Some(vec![self.module.clone()]), + }), + consul_sd_configs: None, + relabel_configs: Some(vec![RelabelConfig { + action: None, + source_labels: Some(vec!["__address__".to_string()]), + separator: None, + target_label: Some("__param_target".to_string()), + regex: None, + replacement: Some(format!("snmp.{}:31080", self.domain.clone())), + modulus: None, + }]), + metric_relabel_configs: None, + metrics_path: Some("/snmp".to_string()), + scrape_timeout: Some("2m".to_string()), + job_name: Some(format!("snmp_exporter/cloud/{}", self.name.clone())), + scheme: None, + }; + + let scrape_config = ScrapeConfig { + metadata: ObjectMeta { + name: Some(self.name.clone()), + namespace: Some(sender.namespace.clone()), + ..Default::default() + }, + spec: scrape_config_spec, + }; + sender + .client + .apply(&scrape_config, Some(&sender.namespace.clone())) + .await?; + Ok(Outcome::success(format!( + "installed scrape target {}", + self.name.clone() + ))) + } + + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } +} diff --git a/harmony/src/modules/okd/bootstrap_load_balancer.rs b/harmony/src/modules/okd/bootstrap_load_balancer.rs index ccc69c9..e99fe97 100644 --- a/harmony/src/modules/okd/bootstrap_load_balancer.rs +++ b/harmony/src/modules/okd/bootstrap_load_balancer.rs @@ -77,6 +77,8 @@ impl OKDBootstrapLoadBalancerScore { address: topology.bootstrap_host.ip.to_string(), port, }); + + backend.dedup(); backend } } diff --git a/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs 
b/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs index 24ca918..7093ee8 100644 --- a/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs +++ b/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs @@ -12,7 +12,8 @@ use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::C use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules; use crate::modules::monitoring::kube_prometheus::crd::crd_grafana::{ Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig, - GrafanaDatasourceSpec, GrafanaSpec, + GrafanaDatasourceJsonData, GrafanaDatasourceSpec, GrafanaSecretKeyRef, GrafanaSpec, + GrafanaValueFrom, GrafanaValueSource, }; use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::{ PrometheusRule, PrometheusRuleSpec, RuleGroup, @@ -39,7 +40,7 @@ use crate::{ }; use harmony_types::id::Id; -use super::prometheus::PrometheusApplicationMonitoring; +use super::prometheus::PrometheusMonitoring; #[derive(Clone, Debug, Serialize)] pub struct K8sPrometheusCRDAlertingScore { @@ -49,7 +50,7 @@ pub struct K8sPrometheusCRDAlertingScore { pub prometheus_rules: Vec, } -impl> Score +impl> Score for K8sPrometheusCRDAlertingScore { fn create_interpret(&self) -> Box> { @@ -75,7 +76,7 @@ pub struct K8sPrometheusCRDAlertingInterpret { } #[async_trait] -impl> Interpret +impl> Interpret for K8sPrometheusCRDAlertingInterpret { async fn execute( @@ -466,10 +467,13 @@ impl K8sPrometheusCRDAlertingInterpret { match_labels: label.clone(), match_expressions: vec![], }; - let mut json_data = BTreeMap::new(); - json_data.insert("timeInterval".to_string(), "5s".to_string()); let namespace = self.sender.namespace.clone(); - + let json_data = GrafanaDatasourceJsonData { + time_interval: Some("5s".to_string()), + http_header_name1: None, + tls_skip_verify: Some(true), + oauth_pass_thru: Some(true), + }; let json = 
build_default_dashboard(&namespace); let graf_data_source = GrafanaDatasource { @@ -495,7 +499,11 @@ impl K8sPrometheusCRDAlertingInterpret { "http://prometheus-operated.{}.svc.cluster.local:9090", self.sender.namespace.clone() ), + secure_json_data: None, + is_default: None, + editable: None, }, + values_from: None, }, }; @@ -516,7 +524,9 @@ impl K8sPrometheusCRDAlertingInterpret { spec: GrafanaDashboardSpec { resync_period: Some("30s".to_string()), instance_selector: labels.clone(), - json, + json: Some(json), + grafana_com: None, + datasources: None, }, }; diff --git a/harmony/src/modules/prometheus/prometheus.rs b/harmony/src/modules/prometheus/prometheus.rs index d3940c7..efb89da 100644 --- a/harmony/src/modules/prometheus/prometheus.rs +++ b/harmony/src/modules/prometheus/prometheus.rs @@ -9,11 +9,17 @@ use crate::{ }; #[async_trait] -pub trait PrometheusApplicationMonitoring { +pub trait PrometheusMonitoring { async fn install_prometheus( &self, sender: &S, inventory: &Inventory, receivers: Option>>>, ) -> Result; + + async fn ensure_prometheus_operator( + &self, + sender: &S, + inventory: &Inventory, + ) -> Result; } diff --git a/harmony/src/modules/prometheus/rhob_alerting_score.rs b/harmony/src/modules/prometheus/rhob_alerting_score.rs index 95908d5..644e6f9 100644 --- a/harmony/src/modules/prometheus/rhob_alerting_score.rs +++ b/harmony/src/modules/prometheus/rhob_alerting_score.rs @@ -38,7 +38,7 @@ use crate::{ }; use harmony_types::id::Id; -use super::prometheus::PrometheusApplicationMonitoring; +use super::prometheus::PrometheusMonitoring; #[derive(Clone, Debug, Serialize)] pub struct RHOBAlertingScore { @@ -48,8 +48,8 @@ pub struct RHOBAlertingScore { pub prometheus_rules: Vec, } -impl> - Score for RHOBAlertingScore +impl> Score + for RHOBAlertingScore { fn create_interpret(&self) -> Box> { Box::new(RHOBAlertingInterpret { @@ -74,8 +74,8 @@ pub struct RHOBAlertingInterpret { } #[async_trait] -impl> - Interpret for RHOBAlertingInterpret +impl> 
Interpret + for RHOBAlertingInterpret { async fn execute( &self, diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs similarity index 90% rename from harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs rename to harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs index 77dd24a..787f9cc 100644 --- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs +++ b/harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs @@ -4,7 +4,7 @@ use std::{ }; use async_trait::async_trait; -use log::{info, warn}; +use log::{debug, warn}; use serde::{Deserialize, Serialize}; use tokio::time::sleep; @@ -19,8 +19,8 @@ use harmony_types::id::Id; #[derive(Debug, Clone, Serialize)] pub struct CephRemoveOsd { - osd_deployment_name: String, - rook_ceph_namespace: String, + pub osd_deployment_name: String, + pub rook_ceph_namespace: String, } impl Score for CephRemoveOsd { @@ -54,18 +54,17 @@ impl Interpret for CephRemoveOsdInterpret { self.verify_deployment_scaled(client.clone()).await?; self.delete_deployment(client.clone()).await?; self.verify_deployment_deleted(client.clone()).await?; - let osd_id_full = self.get_ceph_osd_id().unwrap(); - self.purge_ceph_osd(client.clone(), &osd_id_full).await?; - self.verify_ceph_osd_removal(client.clone(), &osd_id_full) - .await?; + self.purge_ceph_osd(client.clone()).await?; + self.verify_ceph_osd_removal(client.clone()).await?; + let osd_id_full = self.get_ceph_osd_id().unwrap(); Ok(Outcome::success(format!( "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}", osd_id_full, self.score.osd_deployment_name ))) } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::CephRemoveOsd } fn get_version(&self) -> Version { @@ -82,7 +81,7 @@ impl Interpret for CephRemoveOsdInterpret { } impl CephRemoveOsdInterpret { - pub fn get_ceph_osd_id(&self) -> Result { + pub fn get_ceph_osd_id_numeric(&self) -> Result { let 
osd_id_numeric = self .score .osd_deployment_name @@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret { self.score.osd_deployment_name )) })?; + Ok(osd_id_numeric.to_string()) + } + + pub fn get_ceph_osd_id(&self) -> Result { + let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap(); let osd_id_full = format!("osd.{}", osd_id_numeric); - info!( + debug!( "Targeting Ceph OSD: {} (parsed from deployment {})", osd_id_full, self.score.osd_deployment_name ); @@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret { &self, client: Arc, ) -> Result { + debug!("verifying toolbox exists"); let toolbox_dep = "rook-ceph-tools".to_string(); match client @@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret { &self, client: Arc, ) -> Result { - info!( + debug!( "Scaling down OSD deployment: {}", self.score.osd_deployment_name ); @@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret { ) -> Result { let (timeout, interval, start) = self.build_timer(); - info!("Waiting for OSD deployment to scale down to 0 replicas"); + debug!("Waiting for OSD deployment to scale down to 0 replicas"); loop { let dep = client .get_deployment( @@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret { Some(&self.score.rook_ceph_namespace), ) .await?; - if let Some(deployment) = dep { if let Some(status) = deployment.status { - if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0 - { + if status.replicas == None && status.ready_replicas == None { return Ok(Outcome::success( "Deployment successfully scaled down.".to_string(), )); @@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret { &self, client: Arc, ) -> Result { - info!( + debug!( "Deleting OSD deployment: {}", self.score.osd_deployment_name ); @@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret { ) -> Result { let (timeout, interval, start) = self.build_timer(); - info!("Waiting for OSD deployment to scale down to 0 replicas"); + debug!("Verifying OSD deployment deleted"); loop { let dep = client .get_deployment( @@ -244,7 +247,7 @@ impl 
CephRemoveOsdInterpret { .await?; if dep.is_none() { - info!( + debug!( "Deployment {} successfully deleted.", self.score.osd_deployment_name ); @@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret { Ok(tree) } - pub async fn purge_ceph_osd( - &self, - client: Arc, - osd_id_full: &str, - ) -> Result { - info!( + pub async fn purge_ceph_osd(&self, client: Arc) -> Result { + let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap(); + let osd_id_full = self.get_ceph_osd_id().unwrap(); + debug!( "Purging OSD {} from Ceph cluster and removing its auth key", osd_id_full ); @@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret { "app".to_string(), Some(&self.score.rook_ceph_namespace), vec![ - format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(), - format!("ceph auth del osd.{osd_id_full}").as_str(), + "sh", + "-c", + format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(), ], ) .await?; @@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret { pub async fn verify_ceph_osd_removal( &self, client: Arc, - osd_id_full: &str, ) -> Result { let (timeout, interval, start) = self.build_timer(); - info!( + let osd_id_full = self.get_ceph_osd_id().unwrap(); + debug!( "Verifying OSD {} has been removed from the Ceph tree...", osd_id_full ); @@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret { "rook-ceph-tools".to_string(), "app".to_string(), Some(&self.score.rook_ceph_namespace), - vec!["ceph osd tree -f json"], + vec!["sh", "-c", "ceph osd tree -f json"], ) .await?; let tree = diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs index 3e3250e..0a3dcec 100644 --- a/harmony/src/modules/storage/ceph/mod.rs +++ b/harmony/src/modules/storage/ceph/mod.rs @@ -1,2 +1,2 @@ -pub mod ceph_osd_replacement_score; +pub mod ceph_remove_osd_score; pub mod ceph_validate_health_score; diff --git a/opnsense-config-xml/src/data/haproxy.rs b/opnsense-config-xml/src/data/haproxy.rs index ef631f3..b0aedc2 
100644 --- a/opnsense-config-xml/src/data/haproxy.rs +++ b/opnsense-config-xml/src/data/haproxy.rs @@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId { } } -#[derive(PartialEq, Debug)] +#[derive(PartialEq, Debug, Clone)] pub struct HAProxyId(String); impl Default for HAProxyId { @@ -297,7 +297,7 @@ pub struct HAProxyFrontends { pub frontend: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct Frontend { #[yaserde(attribute = true)] pub uuid: String, @@ -310,7 +310,7 @@ pub struct Frontend { pub bind_options: MaybeString, pub mode: String, #[yaserde(rename = "defaultBackend")] - pub default_backend: String, + pub default_backend: Option, pub ssl_enabled: i32, pub ssl_certificates: MaybeString, pub ssl_default_certificate: MaybeString, @@ -416,7 +416,7 @@ pub struct HAProxyBackends { pub backends: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct HAProxyBackend { #[yaserde(attribute = true, rename = "uuid")] pub uuid: String, @@ -535,7 +535,7 @@ pub struct HAProxyServers { pub servers: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct HAProxyServer { #[yaserde(attribute = true, rename = "uuid")] pub uuid: String, @@ -543,8 +543,8 @@ pub struct HAProxyServer { pub enabled: u8, pub name: String, pub description: MaybeString, - pub address: String, - pub port: u16, + pub address: Option, + pub port: Option, pub checkport: MaybeString, pub mode: String, pub multiplexer_protocol: MaybeString, @@ -589,7 +589,7 @@ pub struct HAProxyHealthChecks { pub healthchecks: Vec, } -#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)] +#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)] pub struct HAProxyHealthCheck { #[yaserde(attribute 
= true)] pub uuid: String, diff --git a/opnsense-config/Cargo.toml b/opnsense-config/Cargo.toml index 0580cb2..bb682df 100644 --- a/opnsense-config/Cargo.toml +++ b/opnsense-config/Cargo.toml @@ -25,6 +25,7 @@ sha2 = "0.10.9" [dev-dependencies] pretty_assertions.workspace = true +assertor.workspace = true [lints.rust] unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] } diff --git a/opnsense-config/src/config/manager/ssh.rs b/opnsense-config/src/config/manager/ssh.rs index 4b2fe64..afe232f 100644 --- a/opnsense-config/src/config/manager/ssh.rs +++ b/opnsense-config/src/config/manager/ssh.rs @@ -30,8 +30,7 @@ impl SshConfigManager { self.opnsense_shell .exec(&format!( - "cp /conf/config.xml /conf/backup/{}", - backup_filename + "cp /conf/config.xml /conf/backup/{backup_filename}" )) .await } diff --git a/opnsense-config/src/config/shell/mod.rs b/opnsense-config/src/config/shell/mod.rs index aa03837..aa94ff1 100644 --- a/opnsense-config/src/config/shell/mod.rs +++ b/opnsense-config/src/config/shell/mod.rs @@ -1,9 +1,7 @@ mod ssh; -pub use ssh::*; - -use async_trait::async_trait; - use crate::Error; +use async_trait::async_trait; +pub use ssh::*; #[async_trait] pub trait OPNsenseShell: std::fmt::Debug + Send + Sync { diff --git a/opnsense-config/src/modules/load_balancer.rs b/opnsense-config/src/modules/load_balancer.rs index 6c71ed4..00cb364 100644 --- a/opnsense-config/src/modules/load_balancer.rs +++ b/opnsense-config/src/modules/load_balancer.rs @@ -1,11 +1,8 @@ -use std::sync::Arc; - -use log::warn; +use crate::{config::OPNsenseShell, Error}; use opnsense_config_xml::{ Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, OPNsense, }; - -use crate::{config::OPNsenseShell, Error}; +use std::{collections::HashSet, sync::Arc}; pub struct LoadBalancerConfig<'a> { opnsense: &'a mut OPNsense, @@ -31,7 +28,7 @@ impl<'a> LoadBalancerConfig<'a> { match &mut self.opnsense.opnsense.haproxy.as_mut() { Some(haproxy) => f(haproxy), None => 
unimplemented!( - "Adding a backend is not supported when haproxy config does not exist yet" + "Cannot configure load balancer when haproxy config does not exist yet" ), } } @@ -40,21 +37,67 @@ impl<'a> LoadBalancerConfig<'a> { self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32); } - pub fn add_backend(&mut self, backend: HAProxyBackend) { - warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks"); - self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend)); + /// Configures a service by removing any existing service on the same port + /// and then adding the new definition. This ensures idempotency. + pub fn configure_service( + &mut self, + frontend: Frontend, + backend: HAProxyBackend, + servers: Vec, + healthcheck: Option, + ) { + self.remove_service_by_bind_address(&frontend.bind); + self.remove_servers(&servers); + + self.add_new_service(frontend, backend, servers, healthcheck); } - pub fn add_frontend(&mut self, frontend: Frontend) { - self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend)); + // Remove the corresponding real servers based on their name if they already exist. + fn remove_servers(&mut self, servers: &[HAProxyServer]) { + let server_names: HashSet<_> = servers.iter().map(|s| s.name.clone()).collect(); + self.with_haproxy(|haproxy| { + haproxy + .servers + .servers + .retain(|s| !server_names.contains(&s.name)); + }); } - pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) { - self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck)); + /// Removes a service and its dependent components based on the frontend's bind address. + /// This performs a cascading delete of the frontend, backend, servers, and health check. 
+ fn remove_service_by_bind_address(&mut self, bind_address: &str) { + self.with_haproxy(|haproxy| { + let Some(old_frontend) = remove_frontend_by_bind_address(haproxy, bind_address) else { + return; + }; + + let Some(old_backend) = remove_backend(haproxy, old_frontend) else { + return; + }; + + remove_healthcheck(haproxy, &old_backend); + remove_linked_servers(haproxy, &old_backend); + }); } - pub fn add_servers(&mut self, mut servers: Vec) { - self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers)); + /// Adds the components of a new service to the HAProxy configuration. + /// This function de-duplicates servers by name to prevent configuration errors. + fn add_new_service( + &mut self, + frontend: Frontend, + backend: HAProxyBackend, + servers: Vec, + healthcheck: Option, + ) { + self.with_haproxy(|haproxy| { + if let Some(check) = healthcheck { + haproxy.healthchecks.healthchecks.push(check); + } + + haproxy.servers.servers.extend(servers); + haproxy.backends.backends.push(backend); + haproxy.frontends.frontend.push(frontend); + }); } pub async fn reload_restart(&self) -> Result<(), Error> { @@ -82,3 +125,262 @@ impl<'a> LoadBalancerConfig<'a> { Ok(()) } } + +fn remove_frontend_by_bind_address(haproxy: &mut HAProxy, bind_address: &str) -> Option { + let pos = haproxy + .frontends + .frontend + .iter() + .position(|f| f.bind == bind_address); + + match pos { + Some(pos) => Some(haproxy.frontends.frontend.remove(pos)), + None => None, + } +} + +fn remove_backend(haproxy: &mut HAProxy, old_frontend: Frontend) -> Option { + let default_backend = old_frontend.default_backend?; + let pos = haproxy + .backends + .backends + .iter() + .position(|b| b.uuid == default_backend); + + match pos { + Some(pos) => Some(haproxy.backends.backends.remove(pos)), + None => None, // orphaned frontend, shouldn't happen + } +} + +fn remove_healthcheck(haproxy: &mut HAProxy, backend: &HAProxyBackend) { + if let Some(uuid) = &backend.health_check.content { + haproxy + 
.healthchecks + .healthchecks + .retain(|h| h.uuid != *uuid); + } +}
+
+/// Remove the backend's servers. This assumes servers are not shared between services.
+///
+/// `backend.linked_servers` is read as a comma-separated list of server UUIDs; every
+/// server in `haproxy` whose `uuid` appears in that list is dropped. Nothing happens
+/// when the backend has no linked servers content.
+fn remove_linked_servers(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
+    if let Some(server_uuids_str) = &backend.linked_servers.content {
+        let server_uuids_to_remove: HashSet<_> = server_uuids_str.split(',').collect();
+        haproxy
+            .servers
+            .servers
+            .retain(|s| !server_uuids_to_remove.contains(s.uuid.as_str()));
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::config::DummyOPNSenseShell;
+    use assertor::*;
+    use opnsense_config_xml::{
+        Frontend, HAProxy, HAProxyBackend, HAProxyBackends, HAProxyFrontends, HAProxyHealthCheck,
+        HAProxyHealthChecks, HAProxyId, HAProxyServer, HAProxyServers, MaybeString, OPNsense,
+    };
+    use std::sync::Arc;
+
+    use super::LoadBalancerConfig;
+
+    static SERVICE_BIND_ADDRESS: &str = "192.168.1.1:80";
+    static OTHER_SERVICE_BIND_ADDRESS: &str = "192.168.1.1:443";
+
+    static SERVER_ADDRESS: &str = "1.1.1.1:80";
+    static OTHER_SERVER_ADDRESS: &str = "1.1.1.1:443";
+
+    /// Configuring a service on an empty HAProxy section must register its frontend,
+    /// backend, servers and healthcheck.
+    #[test]
+    fn configure_service_should_add_all_service_components_to_haproxy() {
+        let mut opnsense = given_opnsense();
+        let mut load_balancer = given_load_balancer(&mut opnsense);
+        let (healthcheck, servers, backend, frontend) =
+            given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
+
+        load_balancer.configure_service(
+            frontend.clone(),
+            backend.clone(),
+            servers.clone(),
+            Some(healthcheck.clone()),
+        );
+
+        assert_haproxy_configured_with(
+            opnsense,
+            vec![frontend],
+            vec![backend],
+            servers,
+            vec![healthcheck],
+        );
+    }
+
+    /// Configuring a service on a bind address that is already in use must leave only
+    /// the new service's components behind.
+    #[test]
+    fn configure_service_should_replace_service_on_same_bind_address() {
+        let (healthcheck, servers, backend, frontend) =
+            given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
+        let mut opnsense = given_opnsense_with(given_haproxy(
+            vec![frontend.clone()],
+            vec![backend.clone()],
+            servers.clone(),
+            vec![healthcheck.clone()],
+        ));
+        let mut load_balancer = given_load_balancer(&mut opnsense);
+
+        let (updated_healthcheck, updated_servers, updated_backend, updated_frontend) =
+            given_service(SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
+
+        load_balancer.configure_service(
+            updated_frontend.clone(),
+            updated_backend.clone(),
+            updated_servers.clone(),
+            Some(updated_healthcheck.clone()),
+        );
+
+        assert_haproxy_configured_with(
+            opnsense,
+            vec![updated_frontend],
+            vec![updated_backend],
+            updated_servers,
+            vec![updated_healthcheck],
+        );
+    }
+
+    /// Configuring a service on a different bind address must keep the components of
+    /// the service that was already configured and add the new ones.
+    #[test]
+    fn configure_service_should_keep_existing_service_on_different_bind_addresses() {
+        let (healthcheck, servers, backend, frontend) =
+            given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
+        let (other_healthcheck, other_servers, other_backend, other_frontend) =
+            given_service(OTHER_SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
+        let mut opnsense = given_opnsense_with(given_haproxy(
+            vec![frontend.clone()],
+            vec![backend.clone()],
+            servers.clone(),
+            vec![healthcheck.clone()],
+        ));
+        let mut load_balancer = given_load_balancer(&mut opnsense);
+
+        load_balancer.configure_service(
+            other_frontend.clone(),
+            other_backend.clone(),
+            other_servers.clone(),
+            Some(other_healthcheck.clone()),
+        );
+
+        assert_haproxy_configured_with(
+            opnsense,
+            vec![frontend, other_frontend],
+            vec![backend, other_backend],
+            [servers, other_servers].concat(),
+            vec![healthcheck, other_healthcheck],
+        );
+    }
+
+    /// Asserts the HAProxy section of `opnsense` holds the expected components.
+    ///
+    /// Frontends, backends and healthchecks are checked with `contains_exactly`;
+    /// servers are checked with `is_equal_to`. Panics if `opnsense` has no HAProxy
+    /// section.
+    fn assert_haproxy_configured_with(
+        opnsense: OPNsense,
+        frontends: Vec<Frontend>,
+        backends: Vec<HAProxyBackend>,
+        servers: Vec<HAProxyServer>,
+        healthchecks: Vec<HAProxyHealthCheck>,
+    ) {
+        let haproxy = opnsense.opnsense.haproxy.as_ref().unwrap();
+        assert_that!(haproxy.frontends.frontend).contains_exactly(frontends);
+        assert_that!(haproxy.backends.backends).contains_exactly(backends);
+        assert_that!(haproxy.servers.servers).is_equal_to(servers);
+        assert_that!(haproxy.healthchecks.healthchecks).contains_exactly(healthchecks);
+    }
+
+    /// A default OPNsense configuration.
+    fn given_opnsense() -> OPNsense {
+        OPNsense::default()
+    }
+
+    /// A default OPNsense configuration whose HAProxy section is set to `haproxy`.
+    fn given_opnsense_with(haproxy: HAProxy) -> OPNsense {
+        let mut opnsense = OPNsense::default();
+        opnsense.opnsense.haproxy = Some(haproxy);
+
+        opnsense
+    }
+
+    /// Builds the `LoadBalancerConfig` under test on top of `opnsense`, using a
+    /// `DummyOPNSenseShell`. A default HAProxy section is inserted first when
+    /// `opnsense` has none.
+    fn given_load_balancer<'a>(opnsense: &'a mut OPNsense) -> LoadBalancerConfig<'a> {
+        let opnsense_shell = Arc::new(DummyOPNSenseShell {});
+        if opnsense.opnsense.haproxy.is_none() {
+            opnsense.opnsense.haproxy = Some(HAProxy::default());
+        }
+        LoadBalancerConfig::new(opnsense, opnsense_shell)
+    }
+
+    /// Builds the fixtures of one service as `(healthcheck, servers, backend, frontend)`:
+    /// a frontend bound to `bind_address` and a single server at `server_address`.
+    ///
+    /// The fixtures use fixed UUIDs, so two services built here share the same
+    /// frontend, backend, server and healthcheck UUIDs.
+    fn given_service(
+        bind_address: &str,
+        server_address: &str,
+    ) -> (
+        HAProxyHealthCheck,
+        Vec<HAProxyServer>,
+        HAProxyBackend,
+        Frontend,
+    ) {
+        let healthcheck = given_healthcheck();
+        let servers = vec![given_server(server_address)];
+        let backend = given_backend();
+        let frontend = given_frontend(bind_address);
+        (healthcheck, servers, backend, frontend)
+    }
+
+    /// An HAProxy section holding exactly the given components; every other field
+    /// keeps its default value.
+    fn given_haproxy(
+        frontends: Vec<Frontend>,
+        backends: Vec<HAProxyBackend>,
+        servers: Vec<HAProxyServer>,
+        healthchecks: Vec<HAProxyHealthCheck>,
+    ) -> HAProxy {
+        HAProxy {
+            frontends: HAProxyFrontends {
+                frontend: frontends,
+            },
+            backends: HAProxyBackends { backends },
+            servers: HAProxyServers { servers },
+            healthchecks: HAProxyHealthChecks { healthchecks },
+            ..Default::default()
+        }
+    }
+
+    /// An enabled frontend bound to `bind_address` whose default backend is the
+    /// UUID used by `given_backend`.
+    fn given_frontend(bind_address: &str) -> Frontend {
+        Frontend {
+            uuid: "uuid".into(),
+            id: HAProxyId::default(),
+            enabled: 1,
+            name: format!("frontend_{bind_address}"),
+            bind: bind_address.into(),
+            default_backend: Some("backend-uuid".into()),
+            ..Default::default()
+        }
+    }
+
+    /// An enabled backend linked to the UUIDs used by `given_server` and
+    /// `given_healthcheck`. Its name is fixed and does not follow the bind address.
+    fn given_backend() -> HAProxyBackend {
+        HAProxyBackend {
+            uuid: "backend-uuid".into(),
+            id: HAProxyId::default(),
+            enabled: 1,
+            name: "backend_192.168.1.1:80".into(),
+            linked_servers: MaybeString::from("server-uuid"),
+            health_check_enabled: 1,
+            health_check: MaybeString::from("healthcheck-uuid"),
+            ..Default::default()
+        }
+    }
+
+    /// A server named after, and pointing at, `address`.
+    fn given_server(address: &str) -> HAProxyServer {
+        HAProxyServer {
+            uuid: "server-uuid".into(),
+            id: HAProxyId::default(),
+            name: address.into(),
+            address: Some(address.into()),
+            ..Default::default()
+        }
+    }
+
+    /// A healthcheck with a fixed UUID and name; every other field is default.
+    fn given_healthcheck() -> HAProxyHealthCheck {
+        HAProxyHealthCheck {
+            uuid: "healthcheck-uuid".into(),
+            name: "healthcheck".into(),
+            ..Default::default()
+        }
+    }
+}