From dbc66f3d0c42a4c583b9813fede226c142f751b3 Mon Sep 17 00:00:00 2001 From: Willem Date: Fri, 6 Jun 2025 16:41:17 -0400 Subject: [PATCH 1/3] feat: setup basic structure to for the concrete implementation of kube prometheus monitor, removed discord webhook receiver trait as the dependency is no longer required for prometheus to interact with discord --- examples/lamp/src/main.rs | 13 +- .../topology/oberservability/monitoring.rs | 25 ++- .../alert_channel/discord_alert_channel.rs | 52 ++++++ .../modules/monitoring/alert_channel/mod.rs | 1 + .../monitoring/discord_alert_manager.rs | 35 ---- .../monitoring/discord_webhook_sender.rs | 55 ------ .../{ => kube_prometheus}/config.rs | 4 - .../kube_prometheus_helm_chart_score.rs} | 123 +++++++------ .../kube_prometheus_monitor.rs | 51 ++++++ .../modules/monitoring/kube_prometheus/mod.rs | 4 + .../monitoring/kube_prometheus/types.rs | 3 + harmony/src/modules/monitoring/mod.rs | 6 +- .../modules/monitoring/monitoring_alerting.rs | 163 ++++++------------ 13 files changed, 246 insertions(+), 289 deletions(-) create mode 100644 harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs create mode 100644 harmony/src/modules/monitoring/alert_channel/mod.rs delete mode 100644 harmony/src/modules/monitoring/discord_alert_manager.rs delete mode 100644 harmony/src/modules/monitoring/discord_webhook_sender.rs rename harmony/src/modules/monitoring/{ => kube_prometheus}/config.rs (91%) rename harmony/src/modules/monitoring/{kube_prometheus.rs => kube_prometheus/kube_prometheus_helm_chart_score.rs} (73%) create mode 100644 harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs create mode 100644 harmony/src/modules/monitoring/kube_prometheus/mod.rs create mode 100644 harmony/src/modules/monitoring/kube_prometheus/types.rs diff --git a/examples/lamp/src/main.rs b/examples/lamp/src/main.rs index 74df0d8..f8bc6c5 100644 --- a/examples/lamp/src/main.rs +++ b/examples/lamp/src/main.rs @@ -2,10 +2,7 @@ use harmony::{ data::Version, inventory::Inventory, maestro::Maestro, - modules::{ - lamp::{LAMPConfig, LAMPScore}, - monitoring::monitoring_alerting::{AlertChannel, MonitoringAlertingStackScore}, - }, + modules::lamp::{LAMPConfig, LAMPScore}, topology::{K8sAnywhereTopology, Url}, }; @@ -43,13 +40,7 @@ async fn main() { .await .unwrap(); - let url = url::Url::parse("https://discord.com/api/webhooks/dummy_channel/dummy_token") - .expect("invalid URL"); - - let mut monitoring_stack_score = MonitoringAlertingStackScore::new(); - monitoring_stack_score.namespace = Some(lamp_stack.config.namespace.clone()); - - maestro.register_all(vec![Box::new(lamp_stack), Box::new(monitoring_stack_score)]); + maestro.register_all(vec![Box::new(lamp_stack)]); // Here we bootstrap the CLI, this gives some nice features if you need them harmony_cli::init(maestro, None).await.unwrap(); } diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 4603eba..8793400 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -1,14 +1,12 @@ use async_trait::async_trait; - use std::fmt::Debug; -use url::Url; use crate::interpret::InterpretError; use crate::{interpret::Outcome, topology::Topology}; /// Represents an entity responsible for collecting and organizing observability data -/// from various telemetry sources +/// from various telemetry sources such as Prometheus or Datadog /// A `Monitor` abstracts the logic required to scrape, aggregate, and structure /// monitoring data, enabling consistent processing regardless of the underlying data source. #[async_trait] @@ -16,16 +14,25 @@ pub trait Monitor: Debug + Send + Sync { async fn deploy_monitor( &self, topology: &T, - alert_receivers: Vec, + alert_receivers: Vec>, ) -> Result; - async fn delete_monitor( + fn delete_monitor( &self, topolgy: &T, - alert_receivers: Vec, + alert_receivers: Vec>, ) -> Result; } - -pub struct AlertReceiver { - pub receiver_id: String, +pub trait MonitorConfig: Debug + Send + Sync { + fn build_monitor(&self) -> Box>; +} + +pub trait AlertChannelConfig: std::fmt::Debug + Send + Sync { + fn build_alert_channel(&self) -> Box; + fn channel_type(&self) -> String; +} + +#[async_trait] +pub trait AlertChannel: Debug + Send + Sync { + async fn get_channel_id(&self) -> String; } diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs new file mode 100644 index 0000000..9f7c9d0 --- /dev/null +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -0,0 +1,52 @@ +use async_trait::async_trait; +use serde::Serialize; +use url::Url; + +use crate::{ + modules::monitoring::kube_prometheus::{ + kube_prometheus_monitor::PrometheusAlertChannel, + types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute}, + }, + topology::oberservability::monitoring::{AlertChannel, AlertChannelConfig}, +}; + +#[derive(Debug, Clone)] +pub struct DiscordWebhookAlertChannel { + pub webhook_url: Url, + pub name: String, + pub send_resolved_notifications: bool, +} + +impl AlertChannelConfig for DiscordWebhookAlertChannel { + fn build_alert_channel(&self) -> Box { + Box::new(DiscordWebhookAlertChannel { + webhook_url: self.webhook_url.clone(), + name: self.name.clone(), + send_resolved_notifications: self.send_resolved_notifications.clone(), + }) + } + fn channel_type(&self) -> String { + "discord".to_string() + } +} + +#[async_trait] +impl AlertChannel for DiscordWebhookAlertChannel { + async fn get_channel_id(&self) -> String { + self.name.clone() + } +} + +impl PrometheusAlertChannel for DiscordWebhookAlertChannel { + fn get_alert_channel_global_settings(&self) -> Option { + None + } + + fn get_alert_channel_route(&self) -> AlertManagerRoute { + todo!() + } + + fn get_alert_channel_receiver(&self) -> AlertManagerReceiver { + todo!() + } +} diff --git a/harmony/src/modules/monitoring/alert_channel/mod.rs b/harmony/src/modules/monitoring/alert_channel/mod.rs new file mode 100644 index 0000000..fabc6dd --- /dev/null +++ b/harmony/src/modules/monitoring/alert_channel/mod.rs @@ -0,0 +1 @@ +pub mod discord_alert_channel; diff --git a/harmony/src/modules/monitoring/discord_alert_manager.rs b/harmony/src/modules/monitoring/discord_alert_manager.rs deleted file mode 100644 index 7765505..0000000 --- a/harmony/src/modules/monitoring/discord_alert_manager.rs +++ /dev/null @@ -1,35 +0,0 @@ -use std::str::FromStr; - -use non_blank_string_rs::NonBlankString; -use url::Url; - -use crate::modules::helm::chart::HelmChartScore; - -pub fn discord_alert_manager_score( - webhook_url: Url, - namespace: String, - name: String, -) -> HelmChartScore { - let values = format!( - r#" -environment: - - name: "DISCORD_WEBHOOK" - value: "{webhook_url}" -"#, - ); - - HelmChartScore { - namespace: Some(NonBlankString::from_str(&namespace).unwrap()), - release_name: NonBlankString::from_str(&name).unwrap(), - chart_name: NonBlankString::from_str( - "oci://hub.nationtech.io/library/alertmanager-discord", - ) - .unwrap(), - chart_version: None, - values_overrides: None, - values_yaml: Some(values.to_string()), - create_namespace: true, - install_only: true, - repository: None, - } -} diff --git a/harmony/src/modules/monitoring/discord_webhook_sender.rs b/harmony/src/modules/monitoring/discord_webhook_sender.rs deleted file mode 100644 index bad6402..0000000 --- a/harmony/src/modules/monitoring/discord_webhook_sender.rs +++ /dev/null @@ -1,55 +0,0 @@ -use async_trait::async_trait; -use serde_json::Value; -use url::Url; - -use crate::{ - interpret::{InterpretError, Outcome}, - topology::K8sAnywhereTopology, -}; - -#[derive(Debug, Clone)] -pub struct DiscordWebhookConfig { - pub webhook_url: Url, - pub name: String, - pub send_resolved_notifications: bool, -} - -pub trait DiscordWebhookReceiver { - fn deploy_discord_webhook_receiver( - &self, - _notification_adapter_id: &str, - ) -> Result; - - fn delete_discord_webhook_receiver( - &self, - _notification_adapter_id: &str, - ) -> Result; -} - -// trait used to generate alert manager config values impl Monitor for KubePrometheus -pub trait AlertManagerConfig { - fn get_alert_manager_config(&self) -> Result; -} - -#[async_trait] -impl AlertManagerConfig for DiscordWebhookConfig { - fn get_alert_manager_config(&self) -> Result { - todo!() - } -} - -#[async_trait] -impl DiscordWebhookReceiver for K8sAnywhereTopology { - fn deploy_discord_webhook_receiver( - &self, - _notification_adapter_id: &str, - ) -> Result { - todo!() - } - fn delete_discord_webhook_receiver( - &self, - _notification_adapter_id: &str, - ) -> Result { - todo!() - } -} diff --git a/harmony/src/modules/monitoring/config.rs b/harmony/src/modules/monitoring/kube_prometheus/config.rs similarity index 91% rename from harmony/src/modules/monitoring/config.rs rename to harmony/src/modules/monitoring/kube_prometheus/config.rs index 1477905..0e62c0f 100644 --- a/harmony/src/modules/monitoring/config.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/config.rs @@ -1,7 +1,5 @@ use serde::Serialize; -use super::monitoring_alerting::AlertChannel; - #[derive(Debug, Clone, Serialize)] pub struct KubePrometheusConfig { pub namespace: String, @@ -21,7 +19,6 @@ pub struct KubePrometheusConfig { pub kube_proxy: bool, pub kube_state_metrics: bool, pub prometheus_operator: bool, - pub alert_channel: Vec, } impl KubePrometheusConfig { pub fn new() -> Self { @@ -30,7 +27,6 @@ impl KubePrometheusConfig { default_rules: true, windows_monitoring: false, alert_manager: true, - alert_channel: Vec::new(), grafana: true, node_exporter: false, prometheus: true, diff --git a/harmony/src/modules/monitoring/kube_prometheus.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs similarity index 73% rename from harmony/src/modules/monitoring/kube_prometheus.rs rename to harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs index b694f51..708d94c 100644 --- a/harmony/src/modules/monitoring/kube_prometheus.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs @@ -1,8 +1,7 @@ -use super::{config::KubePrometheusConfig, monitoring_alerting::AlertChannel}; +use super::config::KubePrometheusConfig; use log::info; use non_blank_string_rs::NonBlankString; -use std::{collections::HashMap, str::FromStr}; -use url::Url; +use std::str::FromStr; use crate::modules::helm::chart::HelmChartScore; @@ -145,65 +144,65 @@ prometheus: "#, ); - let alertmanager_config = alert_manager_yaml_builder(&config); - values.push_str(&alertmanager_config); - - fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String { - let mut receivers = String::new(); - let mut routes = String::new(); - let mut global_configs = String::new(); - let alert_manager = config.alert_manager; - for alert_channel in &config.alert_channel { - match alert_channel { - AlertChannel::Discord { name, .. } => { - let (receiver, route) = discord_alert_builder(name); - info!("discord receiver: {} \nroute: {}", receiver, route); - receivers.push_str(&receiver); - routes.push_str(&route); - } - AlertChannel::Slack { - slack_channel, - webhook_url, - } => { - let (receiver, route) = slack_alert_builder(slack_channel); - info!("slack receiver: {} \nroute: {}", receiver, route); - receivers.push_str(&receiver); - - routes.push_str(&route); - let global_config = format!( - r#" - global: - slack_api_url: {webhook_url}"# - ); - - global_configs.push_str(&global_config); - } - AlertChannel::Smpt { .. } => todo!(), - } - } - info!("after alert receiver: {}", receivers); - info!("after alert routes: {}", routes); - - let alertmanager_config = format!( - r#" -alertmanager: - enabled: {alert_manager} - config: {global_configs} - route: - group_by: ['job'] - group_wait: 30s - group_interval: 5m - repeat_interval: 12h - routes: -{routes} - receivers: - - name: 'null' -{receivers}"# - ); - - info!("alert manager config: {}", alertmanager_config); - alertmanager_config - } + // let alertmanager_config = alert_manager_yaml_builder(&config); + // values.push_str(&alertmanager_config); + // + // fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String { + // let mut receivers = String::new(); + // let mut routes = String::new(); + // let mut global_configs = String::new(); + // let alert_manager = config.alert_manager; + // for alert_channel in &config.alert_channel { + // match alert_channel { + // AlertChannel::Discord { name, .. } => { + // let (receiver, route) = discord_alert_builder(name); + // info!("discord receiver: {} \nroute: {}", receiver, route); + // receivers.push_str(&receiver); + // routes.push_str(&route); + // } + // AlertChannel::Slack { + // slack_channel, + // webhook_url, + // } => { + // let (receiver, route) = slack_alert_builder(slack_channel); + // info!("slack receiver: {} \nroute: {}", receiver, route); + // receivers.push_str(&receiver); + // + // routes.push_str(&route); + // let global_config = format!( + // r#" + // global: + // slack_api_url: {webhook_url}"# + // ); + // + // global_configs.push_str(&global_config); + // } + // AlertChannel::Smpt { .. } => todo!(), + // } + // } + // info!("after alert receiver: {}", receivers); + // info!("after alert routes: {}", routes); + // + // let alertmanager_config = format!( + // r#" + //alertmanager: + // enabled: {alert_manager} + // config: {global_configs} + // route: + // group_by: ['job'] + // group_wait: 30s + // group_interval: 5m + // repeat_interval: 12h + // routes: + //{routes} + // receivers: + // - name: 'null' + //{receivers}"# + // ); + // + // info!("alert manager config: {}", alertmanager_config); + // alertmanager_config + // } HelmChartScore { namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs new file mode 100644 index 0000000..e8cc2d1 --- /dev/null +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs @@ -0,0 +1,51 @@ +use async_trait::async_trait; +use serde::Serialize; + +use crate::{ + interpret::{InterpretError, Outcome}, + topology::{ + Topology, + oberservability::monitoring::{AlertChannel, Monitor, MonitorConfig}, + }, +}; + +use super::{ + config::KubePrometheusConfig, + types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute}, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct KubePrometheusMonitor { + pub kube_prometheus_config: KubePrometheusConfig, +} + +impl MonitorConfig for KubePrometheusMonitor { + fn build_monitor(&self) -> Box> { + Box::new(self.clone()) + } +} + +#[async_trait] +pub trait PrometheusAlertChannel: AlertChannel { + fn get_alert_channel_global_settings(&self) -> Option; + fn get_alert_channel_route(&self) -> AlertManagerRoute; + fn get_alert_channel_receiver(&self) -> AlertManagerReceiver; +} + +#[async_trait] +impl Monitor for KubePrometheusMonitor { + async fn deploy_monitor( + &self, + _topology: &T, + _alert_channels: Vec>, + ) -> Result { + todo!() + } + fn delete_monitor( + &self, + _topology: &T, + _alert_channels: Vec>, + ) -> Result { + todo!() + } +} diff --git a/harmony/src/modules/monitoring/kube_prometheus/mod.rs b/harmony/src/modules/monitoring/kube_prometheus/mod.rs new file mode 100644 index 0000000..b0b9985 --- /dev/null +++ b/harmony/src/modules/monitoring/kube_prometheus/mod.rs @@ -0,0 +1,4 @@ +pub mod config; +pub mod kube_prometheus_helm_chart_score; +pub mod kube_prometheus_monitor; +pub mod types; diff --git a/harmony/src/modules/monitoring/kube_prometheus/types.rs b/harmony/src/modules/monitoring/kube_prometheus/types.rs new file mode 100644 index 0000000..095aa55 --- /dev/null +++ b/harmony/src/modules/monitoring/kube_prometheus/types.rs @@ -0,0 +1,3 @@ +pub struct AlertManagerGlobalConfigs {} +pub struct AlertManagerReceiver {} +pub struct AlertManagerRoute {} diff --git a/harmony/src/modules/monitoring/mod.rs b/harmony/src/modules/monitoring/mod.rs index d3eb288..a84bd9f 100644 --- a/harmony/src/modules/monitoring/mod.rs +++ b/harmony/src/modules/monitoring/mod.rs @@ -1,5 +1,3 @@ -mod config; -mod discord_alert_manager; -pub mod discord_webhook_sender; -mod kube_prometheus; +pub mod alert_channel; +pub mod kube_prometheus; pub mod monitoring_alerting; diff --git a/harmony/src/modules/monitoring/monitoring_alerting.rs b/harmony/src/modules/monitoring/monitoring_alerting.rs index 6d2db38..0391bff 100644 --- a/harmony/src/modules/monitoring/monitoring_alerting.rs +++ b/harmony/src/modules/monitoring/monitoring_alerting.rs @@ -1,146 +1,73 @@ use async_trait::async_trait; -use email_address::EmailAddress; - -use log::info; -use serde::Serialize; -use url::Url; +use serde::{Serialize, Serializer, ser::SerializeStruct}; +use std::{fmt::Debug, sync::Arc}; use crate::{ data::{Id, Version}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, - topology::{HelmCommand, Topology}, + topology::{ + HelmCommand, Topology, + oberservability::monitoring::{AlertChannelConfig, MonitorConfig}, + }, }; -use super::{ - config::KubePrometheusConfig, discord_alert_manager::discord_alert_manager_score, - kube_prometheus::kube_prometheus_helm_chart_score, +use super::kube_prometheus::{ + config::KubePrometheusConfig, kube_prometheus_monitor::KubePrometheusMonitor, }; -#[derive(Debug, Clone, Serialize)] -pub enum AlertChannel { - Discord { - name: String, - webhook_url: Url, - }, - Slack { - slack_channel: String, - webhook_url: Url, - }, - //TODO test and implement in helm chart - //currently does not work - Smpt { - email_address: EmailAddress, - service_name: String, - }, -} - -#[derive(Debug, Clone, Serialize)] -pub struct MonitoringAlertingStackScore { - pub alert_channel: Vec, +#[derive(Debug, Clone)] +pub struct MonitoringAlertingScore { + pub monitor_config: Arc>, + pub alert_channel_configs: Vec>, pub namespace: Option, } -impl MonitoringAlertingStackScore { - pub fn new() -> Self { +impl MonitoringAlertingScore { + pub fn default() -> Self { Self { - alert_channel: Vec::new(), - namespace: None, + monitor_config: Arc::new(KubePrometheusMonitor { + kube_prometheus_config: KubePrometheusConfig::new(), + }), + alert_channel_configs: Vec::new(), + namespace: Some("monitoring".to_string()), } } } -impl Score for MonitoringAlertingStackScore { +impl Score for MonitoringAlertingScore { fn create_interpret(&self) -> Box> { - Box::new(MonitoringAlertingStackInterpret { + Box::new(MonitoringAlertingInterpret { score: self.clone(), }) } + fn name(&self) -> String { - format!("MonitoringAlertingStackScore") + todo!() } } -#[derive(Debug, Clone, Serialize)] -struct MonitoringAlertingStackInterpret { - score: MonitoringAlertingStackScore, -} - -impl MonitoringAlertingStackInterpret { - async fn build_kube_prometheus_helm_chart_config(&self) -> KubePrometheusConfig { - let mut config = KubePrometheusConfig::new(); - if let Some(ns) = &self.score.namespace { - config.namespace = ns.clone(); - } - config.alert_channel = self.score.alert_channel.clone(); - config - } - - async fn deploy_kube_prometheus_helm_chart_score( - &self, - inventory: &Inventory, - topology: &T, - config: &KubePrometheusConfig, - ) -> Result { - let helm_chart = kube_prometheus_helm_chart_score(config); - helm_chart - .create_interpret() - .execute(inventory, topology) - .await - } - - async fn deploy_alert_channel_service( - &self, - inventory: &Inventory, - topology: &T, - config: &KubePrometheusConfig, - ) -> Result { - //let mut outcomes = vec![]; - - //for channel in &self.score.alert_channel { - // let outcome = match channel { - // AlertChannel::Discord { .. } => { - // discord_alert_manager_score(config) - // .create_interpret() - // .execute(inventory, topology) - // .await - // } - // AlertChannel::Slack { .. } => Ok(Outcome::success( - // "No extra configs for slack alerting".to_string(), - // )), - // AlertChannel::Smpt { .. } => { - // todo!() - // } - // }; - // outcomes.push(outcome); - //} - //for result in outcomes { - // result?; - //} - - Ok(Outcome::success("All alert channels deployed".to_string())) - } +#[derive(Debug)] +struct MonitoringAlertingInterpret { + score: MonitoringAlertingScore, } #[async_trait] -impl Interpret for MonitoringAlertingStackInterpret { +impl Interpret for MonitoringAlertingInterpret { async fn execute( &self, - inventory: &Inventory, + _inventory: &Inventory, topology: &T, ) -> Result { - let config = self.build_kube_prometheus_helm_chart_config().await; - info!("Built kube prometheus config"); - info!("Installing kube prometheus chart"); - self.deploy_kube_prometheus_helm_chart_score(inventory, topology, &config) - .await?; - info!("Installing alert channel service"); - self.deploy_alert_channel_service(inventory, topology, &config) - .await?; - Ok(Outcome::success(format!( - "succesfully deployed monitoring and alerting stack" - ))) + let monitor = self.score.monitor_config.build_monitor(); + + let mut alert_channels = Vec::new(); + for config in &self.score.alert_channel_configs { + alert_channels.push(config.build_alert_channel()); + } + + monitor.deploy_monitor(topology, alert_channels).await } fn get_name(&self) -> InterpretName { @@ -159,3 +86,21 @@ impl Interpret for MonitoringAlertingStackInterpre todo!() } } + +impl Serialize for MonitoringAlertingScore { + fn serialize(&self, serializer: S) -> Result + where + S: Serializer, + { + let mut state = serializer.serialize_struct("MonitoringAlertingScore", 3)?; + + // For now, just serialize basic info + state.serialize_field("monitor_type", "monitoring_system")?; + + let channel_count = self.alert_channel_configs.len(); + state.serialize_field("alert_channel_count", &channel_count)?; + + state.serialize_field("namespace", &self.namespace)?; + state.end() + } +} -- 2.39.5 From 238d1f85e26dbe497034dc22fb000690ef55c3c2 Mon Sep 17 00:00:00 2001 From: Willem Date: Wed, 11 Jun 2025 13:35:07 -0400 Subject: [PATCH 2/3] wip: impl k8sMonitor --- harmony/src/domain/topology/k8s_anywhere.rs | 54 +++++++- .../domain/topology/oberservability/k8s.rs | 68 ++++++++++ .../domain/topology/oberservability/mod.rs | 11 ++ .../topology/oberservability/monitoring.rs | 36 ++--- .../alert_channel/discord_alert_channel.rs | 76 +++++------ .../monitoring/kube_prometheus/config.rs | 4 + .../kube_prometheus_helm_chart_score.rs | 112 ++++++++-------- .../kube_prometheus_monitor.rs | 126 +++++++++++------- .../monitoring/kube_prometheus/types.rs | 17 ++- .../modules/monitoring/monitoring_alerting.rs | 74 ++-------- 10 files changed, 354 insertions(+), 224 deletions(-) create mode 100644 harmony/src/domain/topology/oberservability/k8s.rs diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index ef11f36..785a606 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -10,13 +10,18 @@ use crate::{ interpret::{InterpretError, Outcome}, inventory::Inventory, maestro::Maestro, - modules::k3d::K3DInstallationScore, + modules::{k3d::K3DInstallationScore, monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score}, topology::LocalhostTopology, }; use super::{ HelmCommand, K8sclient, Topology, k8s::K8sClient, + oberservability::{ + K8sMonitorConfig, + k8s::K8sMonitor, + monitoring::{AlertChannel, AlertChannelConfig, Monitor}, + }, tenant::{ ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager, }, @@ -37,6 +42,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: OnceCell>, tenant_manager: OnceCell, + k8s_monitor: OnceCell, } #[async_trait] @@ -61,6 +67,7 @@ impl K8sAnywhereTopology { Self { k8s_state: OnceCell::new(), tenant_manager: OnceCell::new(), + k8s_monitor: OnceCell::new(), } } @@ -178,6 +185,32 @@ impl K8sAnywhereTopology { )), } } + + async fn ensure_k8s_monitor(&self) -> Result<(), String> { + if let Some(_) = self.k8s_monitor.get() { + return Ok(()); + } + + self.k8s_monitor + .get_or_try_init(async || -> Result { + let config = K8sMonitorConfig { + chart: kube_prometheus_helm_chart_score(), + }; + Ok(K8sMonitor { config }) + }) + .await + .unwrap(); + Ok(()) + } + + fn get_k8s_monitor(&self) -> Result<&K8sMonitor, ExecutorError> { + match self.k8s_monitor.get() { + Some(k) => Ok(k), + None => Err(ExecutorError::UnexpectedError( + "K8sMonitor not available".to_string(), + )), + } + } } struct K8sAnywhereConfig { @@ -217,6 +250,10 @@ impl Topology for K8sAnywhereTopology { "No K8s client could be found or installed".to_string(), ))?; + self.ensure_k8s_monitor() + .await + .map_err(|e| InterpretError::new(e))?; + match self.is_helm_available() { Ok(()) => Ok(Outcome::success(format!( "{} + helm available", @@ -263,3 +300,18 @@ impl TenantManager for K8sAnywhereTopology { .await } } +#[async_trait] +impl Monitor for K8sAnywhereTopology { + async fn provision_monitor( + &self, + alert_receivers: Option>>, + ) -> Result { + self.get_k8s_monitor()? + .provision_monitor(alert_receivers) + .await + } + + fn delete_monitor(&self) -> Result { + todo!() + } +} diff --git a/harmony/src/domain/topology/oberservability/k8s.rs b/harmony/src/domain/topology/oberservability/k8s.rs new file mode 100644 index 0000000..2640358 --- /dev/null +++ b/harmony/src/domain/topology/oberservability/k8s.rs @@ -0,0 +1,68 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde::Serialize; + +use crate::{ + interpret::{InterpretError, Outcome}, + inventory::Inventory, + modules::{helm::chart::HelmChartInterpret, monitoring::kube_prometheus::{ + config::KubePrometheusConfig, + kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score, + }}, + topology::{K8sAnywhereTopology, Topology}, +}; + +use super::{ + K8sMonitorConfig, + monitoring::{AlertChannel, AlertChannelConfig, Monitor}, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct K8sMonitor { + pub config: K8sMonitorConfig, +} + +#[async_trait] +impl Monitor for K8sMonitor { + async fn provision_monitor( + &self, + alert_receivers: Option>>, + ) -> Result { + if let Some(receivers) = alert_receivers { + let alert_channels = self.build_alert_channels(receivers).await?; + for channel in alert_channels { + channel.register_alert_channel().await?; + } + } + let chart = self.config.chart.clone(); + // + chart.create_interpret(); + Ok(Outcome::success("installed monitor".to_string())) + } + + fn delete_monitor(&self) -> Result { + todo!() + } +} + +#[async_trait] +impl AlertChannelConfig for K8sMonitor { + async fn build_alert_channel(&self) -> Result, InterpretError> { + todo!() + } +} + +impl K8sMonitor { + pub async fn build_alert_channels( + &self, + alert_channel_configs: Vec>, + ) -> Result>, InterpretError> { + let mut alert_channels = Vec::new(); + for config in alert_channel_configs { + let channel = config.build_alert_channel().await?; + alert_channels.push(channel) + } + Ok(alert_channels) + } +} diff --git a/harmony/src/domain/topology/oberservability/mod.rs b/harmony/src/domain/topology/oberservability/mod.rs index 7f2ac95..05c7f20 100644 --- a/harmony/src/domain/topology/oberservability/mod.rs +++ b/harmony/src/domain/topology/oberservability/mod.rs @@ -1 +1,12 @@ +use serde::Serialize; + +use crate::modules::helm::chart::HelmChartScore; + pub mod monitoring; +pub mod k8s; + +#[derive(Debug, Clone, Serialize)] +pub struct K8sMonitorConfig { + //probably need to do something better here + pub chart: HelmChartScore, +} diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 8793400..d8b81d0 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -1,8 +1,13 @@ use async_trait::async_trait; +use dyn_clone::DynClone; +use serde::Serialize; use std::fmt::Debug; +use std::sync::Arc; +use crate::executors::ExecutorError; use crate::interpret::InterpretError; +use crate::inventory::Inventory; use crate::{interpret::Outcome, topology::Topology}; /// Represents an entity responsible for collecting and organizing observability data @@ -10,29 +15,24 @@ use crate::{interpret::Outcome, topology::Topology}; /// A `Monitor` abstracts the logic required to scrape, aggregate, and structure /// monitoring data, enabling consistent processing regardless of the underlying data source. #[async_trait] -pub trait Monitor: Debug + Send + Sync { - async fn deploy_monitor( +pub trait Monitor { + async fn provision_monitor( &self, - topology: &T, - alert_receivers: Vec>, + alert_receivers: Option>>, ) -> Result; - fn delete_monitor( - &self, - topolgy: &T, - alert_receivers: Vec>, - ) -> Result; -} -pub trait MonitorConfig: Debug + Send + Sync { - fn build_monitor(&self) -> Box>; -} - -pub trait AlertChannelConfig: std::fmt::Debug + Send + Sync { - fn build_alert_channel(&self) -> Box; - fn channel_type(&self) -> String; + fn delete_monitor(&self) -> Result; } #[async_trait] pub trait AlertChannel: Debug + Send + Sync { - async fn get_channel_id(&self) -> String; + async fn register_alert_channel(&self) -> Result; + //async fn get_channel_id(&self) -> String; } + +#[async_trait] +pub trait AlertChannelConfig: Debug + Send + Sync + DynClone { + async fn build_alert_channel(&self) -> Result, InterpretError>; +} + +dyn_clone::clone_trait_object!(AlertChannelConfig); diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs index 9f7c9d0..397cab9 100644 --- a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -1,14 +1,6 @@ -use async_trait::async_trait; -use serde::Serialize; use url::Url; -use crate::{ - modules::monitoring::kube_prometheus::{ - kube_prometheus_monitor::PrometheusAlertChannel, - types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute}, - }, - topology::oberservability::monitoring::{AlertChannel, AlertChannelConfig}, -}; + #[derive(Debug, Clone)] pub struct DiscordWebhookAlertChannel { @@ -17,36 +9,36 @@ pub struct DiscordWebhookAlertChannel { pub send_resolved_notifications: bool, } -impl AlertChannelConfig for DiscordWebhookAlertChannel { - fn build_alert_channel(&self) -> Box { - Box::new(DiscordWebhookAlertChannel { - webhook_url: self.webhook_url.clone(), - name: self.name.clone(), - send_resolved_notifications: self.send_resolved_notifications.clone(), - }) - } - fn channel_type(&self) -> String { - "discord".to_string() - } -} - -#[async_trait] -impl AlertChannel for DiscordWebhookAlertChannel { - async fn get_channel_id(&self) -> String { - self.name.clone() - } -} - -impl PrometheusAlertChannel for DiscordWebhookAlertChannel { - fn get_alert_channel_global_settings(&self) -> Option { - None - } - - fn get_alert_channel_route(&self) -> AlertManagerRoute { - todo!() - } - - fn get_alert_channel_receiver(&self) -> AlertManagerReceiver { - todo!() - } -} +//impl AlertChannelConfig for DiscordWebhookAlertChannel { +// fn build_alert_channel(&self) -> Box { +// Box::new(DiscordWebhookAlertChannel { +// webhook_url: self.webhook_url.clone(), +// name: self.name.clone(), +// send_resolved_notifications: self.send_resolved_notifications.clone(), +// }) +// } +// fn channel_type(&self) -> String { +// "discord".to_string() +// } +//} +// +//#[async_trait] +//impl AlertChannel for DiscordWebhookAlertChannel { +// async fn get_channel_id(&self) -> String { +// self.name.clone() +// } +//} +// +//impl PrometheusAlertChannel for DiscordWebhookAlertChannel { +// fn get_alert_channel_global_settings(&self) -> Option { +// None +// } +// +// fn get_alert_channel_route(&self) -> AlertManagerChannelRoute { +// todo!() +// } +// +// fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver { +// todo!() +// } +//} diff --git a/harmony/src/modules/monitoring/kube_prometheus/config.rs b/harmony/src/modules/monitoring/kube_prometheus/config.rs index 0e62c0f..74fdf6f 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/config.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/config.rs @@ -1,5 +1,7 @@ use serde::Serialize; +use super::types::AlertManagerChannelConfig; + #[derive(Debug, Clone, Serialize)] pub struct KubePrometheusConfig { pub namespace: String, @@ -19,6 +21,7 @@ pub struct KubePrometheusConfig { pub kube_proxy: bool, pub kube_state_metrics: bool, pub prometheus_operator: bool, + pub alert_channels: Vec, } impl KubePrometheusConfig { pub fn new() -> Self { @@ -40,6 +43,7 @@ impl KubePrometheusConfig { prometheus_operator: true, core_dns: false, kube_scheduler: false, + alert_channels: Vec::new(), } } } diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs index 708d94c..ac24c4a 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs @@ -5,7 +5,9 @@ use std::str::FromStr; use crate::modules::helm::chart::HelmChartScore; -pub fn kube_prometheus_helm_chart_score(config: &KubePrometheusConfig) -> HelmChartScore { +pub fn kube_prometheus_helm_chart_score() -> HelmChartScore { + let config = KubePrometheusConfig::new(); + //TODO this should be make into a rule with default formatting that can be easily passed as a vec //to the overrides or something leaving the user to deal with formatting here seems bad let default_rules = config.default_rules.to_string(); @@ -143,7 +145,21 @@ prometheus: enabled: {prometheus} "#, ); - + HelmChartScore { + namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), + release_name: NonBlankString::from_str("kube-prometheus").unwrap(), + chart_name: NonBlankString::from_str( + "oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack", + ) + .unwrap(), + chart_version: None, + values_overrides: None, + values_yaml: Some(values.to_string()), + create_namespace: true, + install_only: true, + repository: None, + } +} // let alertmanager_config = alert_manager_yaml_builder(&config); // values.push_str(&alertmanager_config); // @@ -204,58 +220,44 @@ prometheus: // alertmanager_config // } - HelmChartScore { - namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), - release_name: NonBlankString::from_str("kube-prometheus").unwrap(), - chart_name: NonBlankString::from_str( - "oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack", - ) - .unwrap(), - chart_version: None, - values_overrides: None, - values_yaml: Some(values.to_string()), - create_namespace: true, - install_only: true, - repository: None, - } -} -fn discord_alert_builder(release_name: &String) -> (String, String) { - let discord_receiver_name = format!("Discord-{}", release_name); - let receiver = format!( - r#" - - name: '{discord_receiver_name}' - webhook_configs: - - url: 'http://{release_name}-alertmanager-discord:9094' - send_resolved: true"#, - ); - let route = format!( - r#" - - receiver: '{discord_receiver_name}' - matchers: - - alertname!=Watchdog - continue: true"#, - ); - (receiver, route) -} -fn slack_alert_builder(slack_channel: &String) -> (String, String) { - let slack_receiver_name = format!("Slack-{}", slack_channel); - let receiver = format!( - r#" - - name: '{slack_receiver_name}' - slack_configs: - - channel: '{slack_channel}' - send_resolved: true - title: '{{{{ .CommonAnnotations.title }}}}' - text: '{{{{ .CommonAnnotations.description }}}}'"#, - ); - let route = format!( - r#" - - receiver: '{slack_receiver_name}' - matchers: - - alertname!=Watchdog - continue: true"#, - ); - (receiver, route) -} +//fn discord_alert_builder(release_name: &String) -> (String, String) { +// let discord_receiver_name = format!("Discord-{}", release_name); +// let receiver = format!( +// r#" +// - name: '{discord_receiver_name}' +// webhook_configs: +// - url: 'http://{release_name}-alertmanager-discord:9094' +// send_resolved: true"#, +// ); +// let route = format!( +// r#" +// - receiver: '{discord_receiver_name}' +// matchers: +// - alertname!=Watchdog +// continue: true"#, +// ); +// (receiver, route) +//} +// +//fn slack_alert_builder(slack_channel: &String) -> (String, String) { +// let slack_receiver_name = format!("Slack-{}", slack_channel); +// let receiver = format!( +// r#" +// - name: '{slack_receiver_name}' +// slack_configs: +// - channel: '{slack_channel}' +// send_resolved: true +// title: '{{{{ .CommonAnnotations.title }}}}' +// text: '{{{{ .CommonAnnotations.description }}}}'"#, +// ); +// let route = format!( +// r#" +// - receiver: '{slack_receiver_name}' +// matchers: +// - alertname!=Watchdog +// continue: true"#, +// ); +// (receiver, route) +//} diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs index e8cc2d1..4cd2c8c 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs @@ -1,51 +1,87 @@ -use async_trait::async_trait; -use serde::Serialize; +//#[derive(Debug, Clone, Serialize)] +//pub struct KubePrometheusMonitorScore { +// pub kube_prometheus_config: KubePrometheusConfig, +// pub alert_channel_configs: Vec, +//} -use crate::{ - interpret::{InterpretError, Outcome}, - topology::{ - Topology, - oberservability::monitoring::{AlertChannel, Monitor, MonitorConfig}, - }, -}; +//impl> MonitorConfig +// for KubePrometheusMonitorScore +//{ +// fn build_monitor(&self) -> Box> { +// Box::new(self.clone()) +// } +//} -use super::{ - config::KubePrometheusConfig, - types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute}, -}; +//impl> Score +// for KubePrometheusMonitorScore +//{ +// fn create_interpret(&self) -> Box> { +// Box::new(KubePrometheusMonitorInterpret { +// score: self.clone(), +// }) +// } +// +// fn name(&self) -> String { +// "KubePrometheusMonitorScore".to_string() +// } +//} -#[derive(Debug, Clone, Serialize)] -pub struct KubePrometheusMonitor { - pub kube_prometheus_config: KubePrometheusConfig, -} +//#[derive(Debug, Clone)] +//pub struct KubePrometheusMonitorInterpret { +// score: KubePrometheusMonitorScore, +//} -impl MonitorConfig for KubePrometheusMonitor { - fn build_monitor(&self) -> Box> { - Box::new(self.clone()) - } -} +//#[async_trait] +//impl AlertChannelConfig for KubePrometheusMonitorInterpret { +// async fn build_alert_channel( +// &self, +// ) -> Box { +// todo!() +// } +//} +//#[async_trait] +//impl> Interpret +// for KubePrometheusMonitorInterpret +//{ +// async fn execute( +// &self, +// inventory: &Inventory, +// topology: &T, +// ) -> Result { +// let monitor = self.score.build_monitor(); +// +// let mut alert_channels = Vec::new(); +// //for config in self.score.alert_channel_configs { +// // alert_channels.push(self.build_alert_channel()); +// //} +// +// monitor +// .deploy_monitor(inventory, topology, alert_channels) +// .await +// } +// +// fn get_name(&self) -> InterpretName { +// todo!() +// } +// +// fn get_version(&self) -> Version { +// todo!() +// } +// +// fn get_status(&self) -> InterpretStatus { +// todo!() +// } +// +// fn get_children(&self) -> Vec { +// todo!() +// } +//} -#[async_trait] -pub trait PrometheusAlertChannel: AlertChannel { - fn get_alert_channel_global_settings(&self) -> Option; - fn get_alert_channel_route(&self) -> AlertManagerRoute; - fn get_alert_channel_receiver(&self) -> AlertManagerReceiver; -} -#[async_trait] -impl Monitor for KubePrometheusMonitor { - async fn deploy_monitor( - &self, - _topology: &T, - _alert_channels: Vec>, - ) -> Result { - todo!() - } - fn delete_monitor( - &self, - _topology: &T, - _alert_channels: Vec>, - ) -> Result { - todo!() - } -} +//#[async_trait] +//pub trait PrometheusAlertChannel { +// fn get_alert_channel_global_settings(&self) -> Option; +// fn get_alert_channel_route(&self) -> AlertManagerChannelRoute; +// fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver; +//} + diff --git a/harmony/src/modules/monitoring/kube_prometheus/types.rs b/harmony/src/modules/monitoring/kube_prometheus/types.rs index 095aa55..fa66f99 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/types.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/types.rs @@ -1,3 +1,14 @@ -pub struct AlertManagerGlobalConfigs {} -pub struct AlertManagerReceiver {} -pub struct AlertManagerRoute {} +use serde::Serialize; + +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelConfig { + pub global_configs: AlertManagerChannelGlobalConfigs, + pub route: AlertManagerChannelRoute, + pub receiver: AlertManagerChannelReceiver, +} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelGlobalConfigs {} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelReceiver {} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelRoute {} diff --git a/harmony/src/modules/monitoring/monitoring_alerting.rs b/harmony/src/modules/monitoring/monitoring_alerting.rs index 0391bff..5fe18a0 100644 --- a/harmony/src/modules/monitoring/monitoring_alerting.rs +++ b/harmony/src/modules/monitoring/monitoring_alerting.rs @@ -2,41 +2,18 @@ use async_trait::async_trait; use serde::{Serialize, Serializer, ser::SerializeStruct}; use std::{fmt::Debug, sync::Arc}; -use crate::{ - data::{Id, Version}, - interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, - inventory::Inventory, - score::Score, - topology::{ - HelmCommand, Topology, - oberservability::monitoring::{AlertChannelConfig, MonitorConfig}, - }, -}; +use crate::{data::{Id, Version}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, topology::{ + oberservability::monitoring::{AlertChannelConfig, Monitor},HelmCommand, Topology + }}; -use super::kube_prometheus::{ - config::KubePrometheusConfig, kube_prometheus_monitor::KubePrometheusMonitor, -}; -#[derive(Debug, Clone)] -pub struct MonitoringAlertingScore { - pub monitor_config: Arc>, - pub alert_channel_configs: Vec>, - pub namespace: Option, +#[derive(Debug, Clone, Serialize)] +pub struct MonitoringAlertingScore { + #[serde(skip)] + pub alert_channel_configs: Option>>, } -impl MonitoringAlertingScore { - pub fn default() -> Self { - Self { - monitor_config: Arc::new(KubePrometheusMonitor { - kube_prometheus_config: KubePrometheusConfig::new(), - }), - alert_channel_configs: Vec::new(), - namespace: Some("monitoring".to_string()), - } - } -} - -impl Score for MonitoringAlertingScore { +impl Score for MonitoringAlertingScore { fn create_interpret(&self) -> Box> { Box::new(MonitoringAlertingInterpret { score: self.clone(), @@ -44,30 +21,23 @@ impl Score for Monitorin } fn name(&self) -> String { - todo!() + "MonitoringAlertingScore".to_string() } } #[derive(Debug)] -struct MonitoringAlertingInterpret { - score: MonitoringAlertingScore, +struct MonitoringAlertingInterpret { + score: MonitoringAlertingScore, } #[async_trait] -impl Interpret for MonitoringAlertingInterpret { +impl Interpret for MonitoringAlertingInterpret { async fn execute( &self, - _inventory: &Inventory, + inventory: &Inventory, topology: &T, ) -> Result { - let monitor = self.score.monitor_config.build_monitor(); - - let mut alert_channels = Vec::new(); - for config in &self.score.alert_channel_configs { - alert_channels.push(config.build_alert_channel()); - } - - monitor.deploy_monitor(topology, alert_channels).await + topology.provision_monitor(self.score.alert_channel_configs.clone()).await } fn get_name(&self) -> InterpretName { @@ -87,20 +57,4 @@ impl Interpret for MonitoringAlertingInter } } -impl Serialize for MonitoringAlertingScore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("MonitoringAlertingScore", 3)?; - // For now, just serialize basic info - state.serialize_field("monitor_type", "monitoring_system")?; - - let channel_count = self.alert_channel_configs.len(); - state.serialize_field("alert_channel_count", &channel_count)?; - - state.serialize_field("namespace", &self.namespace)?; - state.end() - } -} -- 2.39.5 From 613def5e0b407035cdef0db76523a35e615203d9 Mon Sep 17 00:00:00 2001 From: Willem Date: Wed, 11 Jun 2025 15:06:39 -0400 Subject: [PATCH 3/3] feat: depoloys cluster monitoring stack from monitoring score on k8sanywhere topology --- examples/monitoring/Cargo.toml | 12 ++ examples/monitoring/src/main.rs | 23 ++++ harmony/src/domain/topology/k8s_anywhere.rs | 15 ++- .../domain/topology/oberservability/k8s.rs | 25 ++-- .../domain/topology/oberservability/mod.rs | 15 ++- .../topology/oberservability/monitoring.rs | 7 +- .../alert_channel/discord_alert_channel.rs | 2 - .../kube_prometheus_helm_chart_score.rs | 124 +++++++++--------- .../kube_prometheus_monitor.rs | 2 - .../modules/monitoring/monitoring_alerting.rs | 27 ++-- 10 files changed, 154 insertions(+), 98 deletions(-) create mode 100644 examples/monitoring/Cargo.toml create mode 100644 examples/monitoring/src/main.rs diff --git a/examples/monitoring/Cargo.toml b/examples/monitoring/Cargo.toml new file mode 100644 index 0000000..5eee2fe --- /dev/null +++ b/examples/monitoring/Cargo.toml @@ -0,0 +1,12 @@ +[package] +name = "webhook_sender" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true + +[dependencies] +harmony = { version = "0.1.0", path = "../../harmony" } +harmony_cli = { version = "0.1.0", path = "../../harmony_cli" } +tokio.workspace = true +url.workspace = true diff --git a/examples/monitoring/src/main.rs b/examples/monitoring/src/main.rs new file mode 100644 index 0000000..1f553fa --- /dev/null +++ b/examples/monitoring/src/main.rs @@ -0,0 +1,23 @@ +use harmony::{ + inventory::Inventory, + maestro::Maestro, + modules::monitoring::monitoring_alerting::MonitoringAlertingScore, + topology::{K8sAnywhereTopology, oberservability::K8sMonitorConfig}, +}; + +#[tokio::main] +async fn main() { + let mut maestro = Maestro::::initialize( + Inventory::autoload(), + K8sAnywhereTopology::new(), + ) + .await + .unwrap(); + + let monitoring = MonitoringAlertingScore { + alert_channel_configs: None, + }; + + maestro.register_all(vec![Box::new(monitoring)]); + harmony_cli::init(maestro, None).await.unwrap(); +} diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index 785a606..8888dd9 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -10,7 +10,10 @@ use crate::{ interpret::{InterpretError, Outcome}, inventory::Inventory, maestro::Maestro, - modules::{k3d::K3DInstallationScore, monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score}, + modules::{ + k3d::K3DInstallationScore, + monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score, + }, topology::LocalhostTopology, }; @@ -193,9 +196,7 @@ impl K8sAnywhereTopology { self.k8s_monitor .get_or_try_init(async || -> Result { - let config = K8sMonitorConfig { - chart: kube_prometheus_helm_chart_score(), - }; + let config = K8sMonitorConfig::cluster_monitor(); Ok(K8sMonitor { config }) }) .await @@ -302,12 +303,14 @@ impl TenantManager for K8sAnywhereTopology { } #[async_trait] impl Monitor for K8sAnywhereTopology { - async fn provision_monitor( + async fn provision_monitor( &self, + inventory: &Inventory, + topology: &T, alert_receivers: Option>>, ) -> Result { self.get_k8s_monitor()? - .provision_monitor(alert_receivers) + .provision_monitor(inventory, topology, alert_receivers) .await } diff --git a/harmony/src/domain/topology/oberservability/k8s.rs b/harmony/src/domain/topology/oberservability/k8s.rs index 2640358..004f9ec 100644 --- a/harmony/src/domain/topology/oberservability/k8s.rs +++ b/harmony/src/domain/topology/oberservability/k8s.rs @@ -3,14 +3,13 @@ use std::sync::Arc; use async_trait::async_trait; use serde::Serialize; +use crate::score::Score; + +use crate::topology::HelmCommand; use crate::{ interpret::{InterpretError, Outcome}, inventory::Inventory, - modules::{helm::chart::HelmChartInterpret, monitoring::kube_prometheus::{ - config::KubePrometheusConfig, - kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score, - }}, - topology::{K8sAnywhereTopology, Topology}, + topology::Topology, }; use super::{ @@ -25,19 +24,23 @@ pub struct K8sMonitor { #[async_trait] impl Monitor for K8sMonitor { - async fn provision_monitor( + async fn provision_monitor( &self, - alert_receivers: Option>>, + inventory: &Inventory, + topology: &T, + alert_channels: Option>>, ) -> Result { - if let Some(receivers) = alert_receivers { - let alert_channels = self.build_alert_channels(receivers).await?; + if let Some(channels) = alert_channels { + let alert_channels = self.build_alert_channels(channels).await?; for channel in alert_channels { channel.register_alert_channel().await?; } } let chart = self.config.chart.clone(); - // - chart.create_interpret(); + chart + .create_interpret() + .execute(inventory, topology) + .await?; Ok(Outcome::success("installed monitor".to_string())) } diff --git a/harmony/src/domain/topology/oberservability/mod.rs b/harmony/src/domain/topology/oberservability/mod.rs index 05c7f20..387709a 100644 --- a/harmony/src/domain/topology/oberservability/mod.rs +++ b/harmony/src/domain/topology/oberservability/mod.rs @@ -1,12 +1,23 @@ use serde::Serialize; -use crate::modules::helm::chart::HelmChartScore; +use crate::modules::{ + helm::chart::HelmChartScore, + monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score, +}; -pub mod monitoring; pub mod k8s; +pub mod monitoring; #[derive(Debug, Clone, Serialize)] pub struct K8sMonitorConfig { //probably need to do something better here pub chart: HelmChartScore, } + +impl K8sMonitorConfig { + pub fn cluster_monitor() -> Self { + Self { + chart: kube_prometheus_helm_chart_score(), + } + } +} diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index d8b81d0..1d6a159 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -1,13 +1,12 @@ use async_trait::async_trait; use dyn_clone::DynClone; -use serde::Serialize; use std::fmt::Debug; -use std::sync::Arc; use crate::executors::ExecutorError; use crate::interpret::InterpretError; use crate::inventory::Inventory; +use crate::topology::HelmCommand; use crate::{interpret::Outcome, topology::Topology}; /// Represents an entity responsible for collecting and organizing observability data @@ -16,8 +15,10 @@ use crate::{interpret::Outcome, topology::Topology}; /// monitoring data, enabling consistent processing regardless of the underlying data source. #[async_trait] pub trait Monitor { - async fn provision_monitor( + async fn provision_monitor( &self, + inventory: &Inventory, + topology: &T, alert_receivers: Option>>, ) -> Result; diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs index 397cab9..76a6498 100644 --- a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -1,7 +1,5 @@ use url::Url; - - #[derive(Debug, Clone)] pub struct DiscordWebhookAlertChannel { pub webhook_url: Url, diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs index ac24c4a..fa9eb47 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs @@ -160,67 +160,65 @@ prometheus: repository: None, } } - // let alertmanager_config = alert_manager_yaml_builder(&config); - // values.push_str(&alertmanager_config); - // - // fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String { - // let mut receivers = String::new(); - // let mut routes = String::new(); - // let mut global_configs = String::new(); - // let alert_manager = config.alert_manager; - // for alert_channel in &config.alert_channel { - // match alert_channel { - // AlertChannel::Discord { name, .. } => { - // let (receiver, route) = discord_alert_builder(name); - // info!("discord receiver: {} \nroute: {}", receiver, route); - // receivers.push_str(&receiver); - // routes.push_str(&route); - // } - // AlertChannel::Slack { - // slack_channel, - // webhook_url, - // } => { - // let (receiver, route) = slack_alert_builder(slack_channel); - // info!("slack receiver: {} \nroute: {}", receiver, route); - // receivers.push_str(&receiver); - // - // routes.push_str(&route); - // let global_config = format!( - // r#" - // global: - // slack_api_url: {webhook_url}"# - // ); - // - // global_configs.push_str(&global_config); - // } - // AlertChannel::Smpt { .. } => todo!(), - // } - // } - // info!("after alert receiver: {}", receivers); - // info!("after alert routes: {}", routes); - // - // let alertmanager_config = format!( - // r#" - //alertmanager: - // enabled: {alert_manager} - // config: {global_configs} - // route: - // group_by: ['job'] - // group_wait: 30s - // group_interval: 5m - // repeat_interval: 12h - // routes: - //{routes} - // receivers: - // - name: 'null' - //{receivers}"# - // ); - // - // info!("alert manager config: {}", alertmanager_config); - // alertmanager_config - // } - - +// let alertmanager_config = alert_manager_yaml_builder(&config); +// values.push_str(&alertmanager_config); +// +// fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String { +// let mut receivers = String::new(); +// let mut routes = String::new(); +// let mut global_configs = String::new(); +// let alert_manager = config.alert_manager; +// for alert_channel in &config.alert_channel { +// match alert_channel { +// AlertChannel::Discord { name, .. } => { +// let (receiver, route) = discord_alert_builder(name); +// info!("discord receiver: {} \nroute: {}", receiver, route); +// receivers.push_str(&receiver); +// routes.push_str(&route); +// } +// AlertChannel::Slack { +// slack_channel, +// webhook_url, +// } => { +// let (receiver, route) = slack_alert_builder(slack_channel); +// info!("slack receiver: {} \nroute: {}", receiver, route); +// receivers.push_str(&receiver); +// +// routes.push_str(&route); +// let global_config = format!( +// r#" +// global: +// slack_api_url: {webhook_url}"# +// ); +// +// global_configs.push_str(&global_config); +// } +// AlertChannel::Smpt { .. } => todo!(), +// } +// } +// info!("after alert receiver: {}", receivers); +// info!("after alert routes: {}", routes); +// +// let alertmanager_config = format!( +// r#" +//alertmanager: +// enabled: {alert_manager} +// config: {global_configs} +// route: +// group_by: ['job'] +// group_wait: 30s +// group_interval: 5m +// repeat_interval: 12h +// routes: +//{routes} +// receivers: +// - name: 'null' +//{receivers}"# +// ); +// +// info!("alert manager config: {}", alertmanager_config); +// alertmanager_config +// } //fn discord_alert_builder(release_name: &String) -> (String, String) { // let discord_receiver_name = format!("Discord-{}", release_name); @@ -234,7 +232,7 @@ prometheus: // let route = format!( // r#" // - receiver: '{discord_receiver_name}' -// matchers: +// matchers: // - alertname!=Watchdog // continue: true"#, // ); @@ -255,7 +253,7 @@ prometheus: // let route = format!( // r#" // - receiver: '{slack_receiver_name}' -// matchers: +// matchers: // - alertname!=Watchdog // continue: true"#, // ); diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs index 4cd2c8c..f66ab8a 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs @@ -77,11 +77,9 @@ // } //} - //#[async_trait] //pub trait PrometheusAlertChannel { // fn get_alert_channel_global_settings(&self) -> Option; // fn get_alert_channel_route(&self) -> AlertManagerChannelRoute; // fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver; //} - diff --git a/harmony/src/modules/monitoring/monitoring_alerting.rs b/harmony/src/modules/monitoring/monitoring_alerting.rs index 5fe18a0..d4fcecb 100644 --- a/harmony/src/modules/monitoring/monitoring_alerting.rs +++ b/harmony/src/modules/monitoring/monitoring_alerting.rs @@ -1,11 +1,16 @@ use async_trait::async_trait; -use serde::{Serialize, Serializer, ser::SerializeStruct}; -use std::{fmt::Debug, sync::Arc}; - -use crate::{data::{Id, Version}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, topology::{ - oberservability::monitoring::{AlertChannelConfig, Monitor},HelmCommand, Topology - }}; +use serde::Serialize; +use crate::{ + data::{Id, Version}, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + score::Score, + topology::{ + HelmCommand, Topology, + oberservability::monitoring::{AlertChannelConfig, Monitor}, + }, +}; #[derive(Debug, Clone, Serialize)] pub struct MonitoringAlertingScore { @@ -37,7 +42,13 @@ impl Interpret for MonitoringAlertingInt inventory: &Inventory, topology: &T, ) -> Result { - topology.provision_monitor(self.score.alert_channel_configs.clone()).await + topology + .provision_monitor( + inventory, + topology, + self.score.alert_channel_configs.clone(), + ) + .await } fn get_name(&self) -> InterpretName { @@ -56,5 +67,3 @@ impl Interpret for MonitoringAlertingInt todo!() } } - - -- 2.39.5