From f437c404287707c0e77214dad5d7d583a35ccb47 Mon Sep 17 00:00:00 2001 From: Willem Date: Tue, 24 Jun 2025 18:54:15 +0000 Subject: [PATCH] impl_monitoring_alerting_kube_prometheus (#64) Co-authored-by: tahahawa Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/64 Co-authored-by: Willem Co-committed-by: Willem --- Cargo.lock | 1 + examples/lamp/src/main.rs | 7 +- examples/monitoring/Cargo.toml | 1 + examples/monitoring/src/main.rs | 24 ++-- harmony/src/domain/topology/installable.rs | 10 +- .../topology/oberservability/monitoring.rs | 18 ++- .../alert_channel/discord_alert_channel.rs | 115 +++++++++++++++++- .../monitoring/kube_prometheus/helm/config.rs | 4 + .../helm/kube_prometheus_helm_chart.rs | 79 ++++++++++-- .../helm_prometheus_alert_score.rs | 18 +-- .../monitoring/kube_prometheus/prometheus.rs | 84 ++++++++++--- .../monitoring/kube_prometheus/types.rs | 44 +++++-- 12 files changed, 333 insertions(+), 72 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 0ee6318..18f8abf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1161,6 +1161,7 @@ dependencies = [ "harmony", "harmony_cli", "tokio", + "url", ] [[package]] diff --git a/examples/lamp/src/main.rs b/examples/lamp/src/main.rs index 0b1f93c..276e255 100644 --- a/examples/lamp/src/main.rs +++ b/examples/lamp/src/main.rs @@ -2,10 +2,7 @@ use harmony::{ data::Version, inventory::Inventory, maestro::Maestro, - modules::{ - lamp::{LAMPConfig, LAMPScore}, - monitoring::alert_channel::discord_alert_channel::DiscordWebhook, - }, + modules::lamp::{LAMPConfig, LAMPScore}, topology::{K8sAnywhereTopology, Url}, }; @@ -46,7 +43,7 @@ async fn main() { // K8sAnywhereTopology as it is the most automatic one that enables you to easily deploy // locally, to development environment from a CI, to staging, and to production with settings // that automatically adapt to each environment grade. 
- let mut maestro = Maestro::::initialize( + let maestro = Maestro::::initialize( Inventory::autoload(), K8sAnywhereTopology::from_env(), ) diff --git a/examples/monitoring/Cargo.toml b/examples/monitoring/Cargo.toml index af42491..d188b78 100644 --- a/examples/monitoring/Cargo.toml +++ b/examples/monitoring/Cargo.toml @@ -9,3 +9,4 @@ license.workspace = true harmony = { version = "0.1.0", path = "../../harmony" } harmony_cli = { version = "0.1.0", path = "../../harmony_cli" } tokio.workspace = true +url.workspace = true diff --git a/examples/monitoring/src/main.rs b/examples/monitoring/src/main.rs index d52c649..c0fcf33 100644 --- a/examples/monitoring/src/main.rs +++ b/examples/monitoring/src/main.rs @@ -1,12 +1,22 @@ use harmony::{ - inventory::Inventory, maestro::Maestro, - modules::monitoring::kube_prometheus::helm_prometheus_alert_score::HelmPrometheusAlertingScore, - topology::K8sAnywhereTopology, + inventory::Inventory, + maestro::Maestro, + modules::monitoring::{ + alert_channel::discord_alert_channel::DiscordWebhook, + kube_prometheus::helm_prometheus_alert_score::HelmPrometheusAlertingScore, + }, + topology::{K8sAnywhereTopology, Url}, }; #[tokio::main] async fn main() { - let alerting_score = HelmPrometheusAlertingScore { receivers: vec![] }; + let discord_receiver = DiscordWebhook { + name: "test-discord".to_string(), + url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()), + }; + let alerting_score = HelmPrometheusAlertingScore { + receivers: vec![Box::new(discord_receiver)], + }; let mut maestro = Maestro::::initialize( Inventory::autoload(), K8sAnywhereTopology::from_env(), @@ -14,12 +24,6 @@ async fn main() { .await .unwrap(); - //let monitoring = MonitoringAlertingScore { - // alert_receivers: vec![], - // alert_rules: vec![], - // scrape_targets: vec![], - //}; - //maestro.register_all(vec![Box::new(monitoring)]); maestro.register_all(vec![Box::new(alerting_score)]); harmony_cli::init(maestro, None).await.unwrap(); } diff --git a/harmony/src/domain/topology/installable.rs b/harmony/src/domain/topology/installable.rs index 9b9054f..8d8178c 100644 --- a/harmony/src/domain/topology/installable.rs +++ b/harmony/src/domain/topology/installable.rs @@ -1,8 +1,12 @@ use async_trait::async_trait; -use crate::interpret::InterpretError; +use crate::{interpret::InterpretError, inventory::Inventory}; #[async_trait] -pub trait Installable { - async fn ensure_installed(&self) -> Result<(), InterpretError>; +pub trait Installable: Send + Sync { + async fn ensure_installed( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result<(), InterpretError>; } diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index a3a6164..7d65bf2 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -4,10 +4,13 @@ use crate::{ data::{Id, Version}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, - topology::{Topology, installable::Installable}, + topology::{HelmCommand, Topology, installable::Installable}, }; -pub trait AlertSender: Send + Sync + std::fmt::Debug + Installable {} +#[async_trait] +pub trait AlertSender: Send + Sync + std::fmt::Debug { + fn name(&self) -> String; +} #[derive(Debug)] pub struct AlertingInterpret { @@ -16,7 +19,7 @@ pub struct AlertingInterpret { } #[async_trait] -impl Interpret for AlertingInterpret { +impl, T: Topology> Interpret for
AlertingInterpret { async fn execute( &self, inventory: &Inventory, @@ -25,7 +28,11 @@ impl Interpret for AlertingInterpret { for receiver in self.receivers.iter() { receiver.install(&self.sender).await?; } - todo!() + self.sender.ensure_installed(inventory, topology).await?; + Ok(Outcome::success(format!( + "successfully installed alert sender {}", + self.sender.name() + ))) } fn get_name(&self) -> InterpretName { @@ -47,7 +54,8 @@ impl Interpret for AlertingInterpret { #[async_trait] pub trait AlertReceiver: std::fmt::Debug + Send + Sync { - async fn install(&self, sender: &S) -> Result<(), InterpretError>; + async fn install(&self, sender: &S) -> Result; + fn clone_box(&self) -> Box>; } #[async_trait] diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs index 42f4450..fb85a98 100644 --- a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -1,12 +1,17 @@ use async_trait::async_trait; +use serde::Serialize; +use serde_yaml::{Mapping, Value}; use crate::{ - interpret::InterpretError, - modules::monitoring::kube_prometheus::prometheus::{Prometheus, PrometheusReceiver}, + interpret::{InterpretError, Outcome}, + modules::monitoring::kube_prometheus::{ + prometheus::{Prometheus, PrometheusReceiver}, + types::{AlertChannelConfig, AlertManagerChannelConfig}, + }, topology::{Url, oberservability::monitoring::AlertReceiver}, }; -#[derive(Debug)] +#[derive(Debug, Clone, Serialize)] pub struct DiscordWebhook { pub name: String, pub url: Url, @@ -14,7 +19,107 @@ pub struct DiscordWebhook { #[async_trait] impl AlertReceiver for DiscordWebhook { - async fn install(&self, sender: &Prometheus) -> Result<(), InterpretError> { - sender.install_receiver(PrometheusReceiver {}).await + async fn install(&self, sender: &Prometheus) -> Result { + sender.install_receiver(self).await + } + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } +} + +#[async_trait] +impl PrometheusReceiver for DiscordWebhook { + fn name(&self) -> String { + self.name.clone() + } + async fn configure_receiver(&self) -> AlertManagerChannelConfig { + self.get_config().await + } +} + +#[async_trait] +impl AlertChannelConfig for DiscordWebhook { + async fn get_config(&self) -> AlertManagerChannelConfig { + let channel_global_config = None; + let channel_receiver = self.alert_channel_receiver().await; + let channel_route = self.alert_channel_route().await; + + AlertManagerChannelConfig { + channel_global_config, + channel_receiver, + channel_route, + } + } +} + +impl DiscordWebhook { + async fn alert_channel_route(&self) -> serde_yaml::Value { + let mut route = Mapping::new(); + route.insert( + Value::String("receiver".to_string()), + Value::String(self.name.clone()), + ); + route.insert( + Value::String("matchers".to_string()), + Value::Sequence(vec![Value::String("alertname!=Watchdog".to_string())]), + ); + route.insert(Value::String("continue".to_string()), Value::Bool(true)); + Value::Mapping(route) + } + + async fn alert_channel_receiver(&self) -> serde_yaml::Value { + let mut receiver = Mapping::new(); + receiver.insert( + Value::String("name".to_string()), + Value::String(self.name.clone()), + ); + + let mut discord_config = Mapping::new(); + discord_config.insert( + Value::String("webhook_url".to_string()), + Value::String(self.url.to_string()), + ); + + receiver.insert( + Value::String("discord_configs".to_string()), + 
Value::Sequence(vec![Value::Mapping(discord_config)]), + ); + + Value::Mapping(receiver) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[tokio::test] + async fn discord_serialize_should_match() { + let discord_receiver = DiscordWebhook { + name: "test-discord".to_string(), + url: Url::Url(url::Url::parse("https://discord.i.dont.exist.com").unwrap()), + }; + + let discord_receiver_receiver = + serde_yaml::to_string(&discord_receiver.alert_channel_receiver().await).unwrap(); + println!("receiver \n{:#}", discord_receiver_receiver); + let discord_receiver_receiver_yaml = r#"name: test-discord +discord_configs: +- webhook_url: https://discord.i.dont.exist.com/ +"# + .to_string(); + + let discord_receiver_route = + serde_yaml::to_string(&discord_receiver.alert_channel_route().await).unwrap(); + println!("route \n{:#}", discord_receiver_route); + let discord_receiver_route_yaml = r#"receiver: test-discord +matchers: +- alertname!=Watchdog +continue: true +"# + .to_string(); + + assert_eq!(discord_receiver_receiver, discord_receiver_receiver_yaml); + assert_eq!(discord_receiver_route, discord_receiver_route_yaml); } } diff --git a/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs b/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs index 0e62c0f..741cd1b 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs @@ -1,5 +1,7 @@ use serde::Serialize; +use crate::modules::monitoring::kube_prometheus::types::AlertManagerChannelConfig; + #[derive(Debug, Clone, Serialize)] pub struct KubePrometheusConfig { pub namespace: String, @@ -19,6 +21,7 @@ pub struct KubePrometheusConfig { pub kube_proxy: bool, pub kube_state_metrics: bool, pub prometheus_operator: bool, + pub alert_receiver_configs: Vec, } impl KubePrometheusConfig { pub fn new() -> Self { @@ -40,6 +43,7 @@ impl KubePrometheusConfig { prometheus_operator: true, core_dns: false, kube_scheduler: false, + alert_receiver_configs: vec![], } } } diff --git a/harmony/src/modules/monitoring/kube_prometheus/helm/kube_prometheus_helm_chart.rs b/harmony/src/modules/monitoring/kube_prometheus/helm/kube_prometheus_helm_chart.rs index 2377627..94440c0 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/helm/kube_prometheus_helm_chart.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/helm/kube_prometheus_helm_chart.rs @@ -1,21 +1,32 @@ use super::config::KubePrometheusConfig; +use log::debug; use non_blank_string_rs::NonBlankString; -use std::str::FromStr; +use serde_yaml::{Mapping, Value}; +use std::{ + str::FromStr, + sync::{Arc, Mutex}, +}; -use crate::modules::helm::chart::HelmChartScore; - -pub fn kube_prometheus_helm_chart_score() -> HelmChartScore { - let config = KubePrometheusConfig::new(); +use crate::modules::{ + helm::chart::HelmChartScore, + monitoring::kube_prometheus::types::{ + AlertManager, AlertManagerConfig, AlertManagerRoute, AlertManagerValues, + }, +}; +pub fn kube_prometheus_helm_chart_score( + config: Arc>, +) -> HelmChartScore { + let config = config.lock().unwrap(); //TODO this should be make into a rule with default formatting that can be easily passed as a vec //to the overrides or something leaving the user to deal with formatting here seems bad let default_rules = config.default_rules.to_string(); let windows_monitoring = config.windows_monitoring.to_string(); - let alert_manager = config.alert_manager.to_string(); let grafana = config.grafana.to_string(); let kubernetes_service_monitors = 
config.kubernetes_service_monitors.to_string(); let kubernetes_api_server = config.kubernetes_api_server.to_string(); let kubelet = config.kubelet.to_string(); + let alert_manager = config.alert_manager.to_string(); let kube_controller_manager = config.kube_controller_manager.to_string(); let core_dns = config.core_dns.to_string(); let kube_etcd = config.kube_etcd.to_string(); @@ -25,7 +36,7 @@ pub fn kube_prometheus_helm_chart_score() -> HelmChartScore { let node_exporter = config.node_exporter.to_string(); let prometheus_operator = config.prometheus_operator.to_string(); let prometheus = config.prometheus.to_string(); - let values = format!( + let mut values = format!( r#" additionalPrometheusRulesMap: pods-status-alerts: @@ -62,14 +73,14 @@ additionalPrometheusRulesMap: - alert: 'PVC Fill Over 95 Percent In 2 Days' expr: | ( - kubelet_volume_stats_used_bytes - / + kubelet_volume_stats_used_bytes + / kubelet_volume_stats_capacity_bytes ) > 0.95 AND predict_linear(kubelet_volume_stats_used_bytes[2d], 2 * 24 * 60 * 60) - / - kubelet_volume_stats_capacity_bytes + / + kubelet_volume_stats_capacity_bytes > 0.95 for: 1m labels: @@ -144,6 +155,52 @@ prometheus: enabled: {prometheus} "#, ); + + let mut null_receiver = Mapping::new(); + null_receiver.insert( + Value::String("receiver".to_string()), + Value::String("null".to_string()), + ); + null_receiver.insert( + Value::String("matchers".to_string()), + Value::Sequence(vec![Value::String("alertname!=Watchdog".to_string())]), + ); + null_receiver.insert(Value::String("continue".to_string()), Value::Bool(true)); + + let mut alert_manager_channel_config = AlertManagerConfig { + global: Mapping::new(), + route: AlertManagerRoute { + routes: vec![Value::Mapping(null_receiver)], + }, + receivers: vec![serde_yaml::from_str("name: 'null'").unwrap()], + }; + for receiver in config.alert_receiver_configs.iter() { + if let Some(global) = receiver.channel_global_config.clone() { + alert_manager_channel_config + .global + .insert(global.0, global.1); + } + alert_manager_channel_config + .route + .routes + .push(receiver.channel_route.clone()); + alert_manager_channel_config + .receivers + .push(receiver.channel_receiver.clone()); + } + + let alert_manager_values = AlertManagerValues { + alertmanager: AlertManager { + enabled: config.alert_manager, + config: alert_manager_channel_config, + }, + }; + + let alert_manager_yaml = + serde_yaml::to_string(&alert_manager_values).expect("Failed to serialize YAML"); + debug!("serialized alert manager: \n {:#}", alert_manager_yaml); + values.push_str(&alert_manager_yaml); + debug!("full values.yaml: \n {:#}", values); HelmChartScore { namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), release_name: NonBlankString::from_str("kube-prometheus").unwrap(), diff --git a/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs b/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs index c090f13..f1f5322 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/helm_prometheus_alert_score.rs @@ -1,7 +1,8 @@ +use std::sync::{Arc, Mutex}; + use serde::Serialize; use crate::{ - modules::monitoring::alert_channel::discord_alert_channel::DiscordWebhook, score::Score, topology::{ HelmCommand, Topology, @@ -9,7 +10,7 @@ use crate::{ }, }; -use super::prometheus::Prometheus; +use super::{helm::config::KubePrometheusConfig, prometheus::Prometheus}; #[derive(Clone, Debug, Serialize)] 
pub struct HelmPrometheusAlertingScore { @@ -19,14 +20,12 @@ pub struct HelmPrometheusAlertingScore { impl Score for HelmPrometheusAlertingScore { fn create_interpret(&self) -> Box> { Box::new(AlertingInterpret { - sender: Prometheus {}, - receivers: vec![Box::new(DiscordWebhook { - url: todo!(), - name: todo!(), - })], + sender: Prometheus { + config: Arc::new(Mutex::new(KubePrometheusConfig::new())), + }, + receivers: self.receivers.clone(), }) } - fn name(&self) -> String { "HelmPrometheusAlertingScore".to_string() } @@ -40,8 +39,9 @@ impl Serialize for Box> { todo!() } } + impl Clone for Box> { fn clone(&self) -> Self { - todo!() + self.clone_box() } } diff --git a/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs b/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs index c5be07e..554d319 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs @@ -1,34 +1,86 @@ +use std::sync::{Arc, Mutex}; + use async_trait::async_trait; +use log::debug; use crate::{ - interpret::InterpretError, - topology::{installable::Installable, oberservability::monitoring::AlertSender}, + interpret::{InterpretError, Outcome}, + inventory::Inventory, + score, + topology::{ + HelmCommand, Topology, installable::Installable, oberservability::monitoring::AlertSender, + }, }; -impl AlertSender for Prometheus {} +use score::Score; + +use super::{ + helm::{ + config::KubePrometheusConfig, kube_prometheus_helm_chart::kube_prometheus_helm_chart_score, + }, + types::AlertManagerChannelConfig, +}; #[async_trait] -impl Installable for Prometheus { - async fn ensure_installed(&self) -> Result<(), InterpretError> { - todo!() +impl AlertSender for Prometheus { + fn name(&self) -> String { + "HelmKubePrometheus".to_string() } } + +#[async_trait] +impl Installable for Prometheus { + async fn ensure_installed( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result<(), InterpretError> { + //install_prometheus + self.install_prometheus(inventory, topology).await?; + Ok(()) + } +} + #[derive(Debug)] -pub struct Prometheus; +pub struct Prometheus { + pub config: Arc>, +} impl Prometheus { pub async fn install_receiver( &self, - prometheus_receiver: PrometheusReceiver, - ) -> Result<(), InterpretError> { - todo!() + prometheus_receiver: &dyn PrometheusReceiver, + ) -> Result { + let prom_receiver = prometheus_receiver.configure_receiver().await; + debug!( + "adding alert receiver to prometheus config: {:#?}", + &prom_receiver + ); + let mut config = self.config.lock().unwrap(); + + config.alert_receiver_configs.push(prom_receiver); + let prom_receiver_name = prometheus_receiver.name(); + debug!("installed alert receiver {}", &prom_receiver_name); + Ok(Outcome::success(format!( + "Successfully installed receiver {}", + prom_receiver_name + ))) + } + + pub async fn install_prometheus( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + kube_prometheus_helm_chart_score(self.config.clone()) + .create_interpret() + .execute(inventory, topology) + .await } } -pub struct PrometheusReceiver {} - -impl PrometheusReceiver { - fn get_prometheus_receiver_config(&self) {} +#[async_trait] +pub trait PrometheusReceiver: Send + Sync + std::fmt::Debug { + fn name(&self) -> String; + async fn configure_receiver(&self) -> AlertManagerChannelConfig; } - -pub struct AlertChannelGlobalConfig {} diff --git a/harmony/src/modules/monitoring/kube_prometheus/types.rs b/harmony/src/modules/monitoring/kube_prometheus/types.rs index
224b125..f237bba 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/types.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/types.rs @@ -1,12 +1,40 @@ +use async_trait::async_trait; use serde::Serialize; +use serde_yaml::{Mapping, Sequence, Value}; -#[derive(Serialize)] -pub struct AlertReceiverRoute { - pub receiver: String, - pub matchers: Vec, - #[serde(default)] - pub r#continue: bool, +#[async_trait] +pub trait AlertChannelConfig { + async fn get_config(&self) -> AlertManagerChannelConfig; } -pub struct AlertChannelReceiver { - pub name: String, + +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerValues { + pub alertmanager: AlertManager, +} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManager { + pub enabled: bool, + pub config: AlertManagerConfig, +} + +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerConfig { + pub global: Mapping, + pub route: AlertManagerRoute, + pub receivers: Sequence, +} + +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerRoute { + pub routes: Sequence, +} + +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelConfig { + ///expecting an option that contains two values + ///if necessary for the alertchannel + ///[ jira_api_url: ] + pub channel_global_config: Option<(Value, Value)>, + pub channel_route: Value, + pub channel_receiver: Value, }
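
Illustrative note (not part of the patch): the sketch below shows, under the assumption that the `types` module added here is publicly exported like the rest of the crate, how the default alertmanager fragment built in kube_prometheus_helm_chart_score serializes. It mirrors the "null" receiver and the Watchdog-muting route constructed above; the standalone `main` function is only a hypothetical harness for printing the YAML.

// Sketch only: mirrors the default AlertManager config assembled in
// kube_prometheus_helm_chart_score and prints the YAML fragment that is
// appended to the kube-prometheus Helm values.
use harmony::modules::monitoring::kube_prometheus::types::{
    AlertManager, AlertManagerConfig, AlertManagerRoute, AlertManagerValues,
};
use serde_yaml::{Mapping, Value};

fn main() {
    // Default "null" route: matches everything except the Watchdog heartbeat
    // and keeps evaluating further routes (continue: true).
    let mut null_route = Mapping::new();
    null_route.insert(
        Value::String("receiver".to_string()),
        Value::String("null".to_string()),
    );
    null_route.insert(
        Value::String("matchers".to_string()),
        Value::Sequence(vec![Value::String("alertname!=Watchdog".to_string())]),
    );
    null_route.insert(Value::String("continue".to_string()), Value::Bool(true));

    let values = AlertManagerValues {
        alertmanager: AlertManager {
            enabled: true,
            config: AlertManagerConfig {
                global: Mapping::new(),
                route: AlertManagerRoute {
                    routes: vec![Value::Mapping(null_route)],
                },
                receivers: vec![serde_yaml::from_str("name: 'null'").unwrap()],
            },
        },
    };

    // Each registered AlertReceiver (e.g. DiscordWebhook) pushes one more entry
    // into `routes` and `receivers` before this structure is serialized.
    println!("{}", serde_yaml::to_string(&values).unwrap());
}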