diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index ef11f36..785a606 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -10,13 +10,18 @@ use crate::{ interpret::{InterpretError, Outcome}, inventory::Inventory, maestro::Maestro, - modules::k3d::K3DInstallationScore, + modules::{k3d::K3DInstallationScore, monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score}, topology::LocalhostTopology, }; use super::{ HelmCommand, K8sclient, Topology, k8s::K8sClient, + oberservability::{ + K8sMonitorConfig, + k8s::K8sMonitor, + monitoring::{AlertChannel, AlertChannelConfig, Monitor}, + }, tenant::{ ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager, }, @@ -37,6 +42,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: OnceCell>, tenant_manager: OnceCell, + k8s_monitor: OnceCell, } #[async_trait] @@ -61,6 +67,7 @@ impl K8sAnywhereTopology { Self { k8s_state: OnceCell::new(), tenant_manager: OnceCell::new(), + k8s_monitor: OnceCell::new(), } } @@ -178,6 +185,32 @@ impl K8sAnywhereTopology { )), } } + + async fn ensure_k8s_monitor(&self) -> Result<(), String> { + if let Some(_) = self.k8s_monitor.get() { + return Ok(()); + } + + self.k8s_monitor + .get_or_try_init(async || -> Result { + let config = K8sMonitorConfig { + chart: kube_prometheus_helm_chart_score(), + }; + Ok(K8sMonitor { config }) + }) + .await + .unwrap(); + Ok(()) + } + + fn get_k8s_monitor(&self) -> Result<&K8sMonitor, ExecutorError> { + match self.k8s_monitor.get() { + Some(k) => Ok(k), + None => Err(ExecutorError::UnexpectedError( + "K8sMonitor not available".to_string(), + )), + } + } } struct K8sAnywhereConfig { @@ -217,6 +250,10 @@ impl Topology for K8sAnywhereTopology { "No K8s client could be found or installed".to_string(), ))?; + self.ensure_k8s_monitor() + .await + .map_err(|e| InterpretError::new(e))?; + match self.is_helm_available() { Ok(()) => Ok(Outcome::success(format!( "{} + helm available", @@ -263,3 +300,18 @@ impl TenantManager for K8sAnywhereTopology { .await } } +#[async_trait] +impl Monitor for K8sAnywhereTopology { + async fn provision_monitor( + &self, + alert_receivers: Option>>, + ) -> Result { + self.get_k8s_monitor()? + .provision_monitor(alert_receivers) + .await + } + + fn delete_monitor(&self) -> Result { + todo!() + } +} diff --git a/harmony/src/domain/topology/oberservability/k8s.rs b/harmony/src/domain/topology/oberservability/k8s.rs new file mode 100644 index 0000000..2640358 --- /dev/null +++ b/harmony/src/domain/topology/oberservability/k8s.rs @@ -0,0 +1,68 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use serde::Serialize; + +use crate::{ + interpret::{InterpretError, Outcome}, + inventory::Inventory, + modules::{helm::chart::HelmChartInterpret, monitoring::kube_prometheus::{ + config::KubePrometheusConfig, + kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score, + }}, + topology::{K8sAnywhereTopology, Topology}, +}; + +use super::{ + K8sMonitorConfig, + monitoring::{AlertChannel, AlertChannelConfig, Monitor}, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct K8sMonitor { + pub config: K8sMonitorConfig, +} + +#[async_trait] +impl Monitor for K8sMonitor { + async fn provision_monitor( + &self, + alert_receivers: Option>>, + ) -> Result { + if let Some(receivers) = alert_receivers { + let alert_channels = self.build_alert_channels(receivers).await?; + for channel in alert_channels { + channel.register_alert_channel().await?; + } + } + let chart = self.config.chart.clone(); + // + chart.create_interpret(); + Ok(Outcome::success("installed monitor".to_string())) + } + + fn delete_monitor(&self) -> Result { + todo!() + } +} + +#[async_trait] +impl AlertChannelConfig for K8sMonitor { + async fn build_alert_channel(&self) -> Result, InterpretError> { + todo!() + } +} + +impl K8sMonitor { + pub async fn build_alert_channels( + &self, + alert_channel_configs: Vec>, + ) -> Result>, InterpretError> { + let mut alert_channels = Vec::new(); + for config in alert_channel_configs { + let channel = config.build_alert_channel().await?; + alert_channels.push(channel) + } + Ok(alert_channels) + } +} diff --git a/harmony/src/domain/topology/oberservability/mod.rs b/harmony/src/domain/topology/oberservability/mod.rs index 7f2ac95..05c7f20 100644 --- a/harmony/src/domain/topology/oberservability/mod.rs +++ b/harmony/src/domain/topology/oberservability/mod.rs @@ -1 +1,12 @@ +use serde::Serialize; + +use crate::modules::helm::chart::HelmChartScore; + pub mod monitoring; +pub mod k8s; + +#[derive(Debug, Clone, Serialize)] +pub struct K8sMonitorConfig { + //probably need to do something better here + pub chart: HelmChartScore, +} diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 8793400..d8b81d0 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -1,8 +1,13 @@ use async_trait::async_trait; +use dyn_clone::DynClone; +use serde::Serialize; use std::fmt::Debug; +use std::sync::Arc; +use crate::executors::ExecutorError; use crate::interpret::InterpretError; +use crate::inventory::Inventory; use crate::{interpret::Outcome, topology::Topology}; /// Represents an entity responsible for collecting and organizing observability data @@ -10,29 +15,24 @@ use crate::{interpret::Outcome, topology::Topology}; /// A `Monitor` abstracts the logic required to scrape, aggregate, and structure /// monitoring data, enabling consistent processing regardless of the underlying data source. #[async_trait] -pub trait Monitor: Debug + Send + Sync { - async fn deploy_monitor( +pub trait Monitor { + async fn provision_monitor( &self, - topology: &T, - alert_receivers: Vec>, + alert_receivers: Option>>, ) -> Result; - fn delete_monitor( - &self, - topolgy: &T, - alert_receivers: Vec>, - ) -> Result; -} -pub trait MonitorConfig: Debug + Send + Sync { - fn build_monitor(&self) -> Box>; -} - -pub trait AlertChannelConfig: std::fmt::Debug + Send + Sync { - fn build_alert_channel(&self) -> Box; - fn channel_type(&self) -> String; + fn delete_monitor(&self) -> Result; } #[async_trait] pub trait AlertChannel: Debug + Send + Sync { - async fn get_channel_id(&self) -> String; + async fn register_alert_channel(&self) -> Result; + //async fn get_channel_id(&self) -> String; } + +#[async_trait] +pub trait AlertChannelConfig: Debug + Send + Sync + DynClone { + async fn build_alert_channel(&self) -> Result, InterpretError>; +} + +dyn_clone::clone_trait_object!(AlertChannelConfig); diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs index 9f7c9d0..397cab9 100644 --- a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -1,14 +1,6 @@ -use async_trait::async_trait; -use serde::Serialize; use url::Url; -use crate::{ - modules::monitoring::kube_prometheus::{ - kube_prometheus_monitor::PrometheusAlertChannel, - types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute}, - }, - topology::oberservability::monitoring::{AlertChannel, AlertChannelConfig}, -}; + #[derive(Debug, Clone)] pub struct DiscordWebhookAlertChannel { @@ -17,36 +9,36 @@ pub struct DiscordWebhookAlertChannel { pub send_resolved_notifications: bool, } -impl AlertChannelConfig for DiscordWebhookAlertChannel { - fn build_alert_channel(&self) -> Box { - Box::new(DiscordWebhookAlertChannel { - webhook_url: self.webhook_url.clone(), - name: self.name.clone(), - send_resolved_notifications: self.send_resolved_notifications.clone(), - }) - } - fn channel_type(&self) -> String { - "discord".to_string() - } -} - -#[async_trait] -impl AlertChannel for DiscordWebhookAlertChannel { - async fn get_channel_id(&self) -> String { - self.name.clone() - } -} - -impl PrometheusAlertChannel for DiscordWebhookAlertChannel { - fn get_alert_channel_global_settings(&self) -> Option { - None - } - - fn get_alert_channel_route(&self) -> AlertManagerRoute { - todo!() - } - - fn get_alert_channel_receiver(&self) -> AlertManagerReceiver { - todo!() - } -} +//impl AlertChannelConfig for DiscordWebhookAlertChannel { +// fn build_alert_channel(&self) -> Box { +// Box::new(DiscordWebhookAlertChannel { +// webhook_url: self.webhook_url.clone(), +// name: self.name.clone(), +// send_resolved_notifications: self.send_resolved_notifications.clone(), +// }) +// } +// fn channel_type(&self) -> String { +// "discord".to_string() +// } +//} +// +//#[async_trait] +//impl AlertChannel for DiscordWebhookAlertChannel { +// async fn get_channel_id(&self) -> String { +// self.name.clone() +// } +//} +// +//impl PrometheusAlertChannel for DiscordWebhookAlertChannel { +// fn get_alert_channel_global_settings(&self) -> Option { +// None +// } +// +// fn get_alert_channel_route(&self) -> AlertManagerChannelRoute { +// todo!() +// } +// +// fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver { +// todo!() +// } +//} diff --git a/harmony/src/modules/monitoring/kube_prometheus/config.rs b/harmony/src/modules/monitoring/kube_prometheus/config.rs index 0e62c0f..74fdf6f 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/config.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/config.rs @@ -1,5 +1,7 @@ use serde::Serialize; +use super::types::AlertManagerChannelConfig; + #[derive(Debug, Clone, Serialize)] pub struct KubePrometheusConfig { pub namespace: String, @@ -19,6 +21,7 @@ pub struct KubePrometheusConfig { pub kube_proxy: bool, pub kube_state_metrics: bool, pub prometheus_operator: bool, + pub alert_channels: Vec, } impl KubePrometheusConfig { pub fn new() -> Self { @@ -40,6 +43,7 @@ impl KubePrometheusConfig { prometheus_operator: true, core_dns: false, kube_scheduler: false, + alert_channels: Vec::new(), } } } diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs index 708d94c..ac24c4a 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_helm_chart_score.rs @@ -5,7 +5,9 @@ use std::str::FromStr; use crate::modules::helm::chart::HelmChartScore; -pub fn kube_prometheus_helm_chart_score(config: &KubePrometheusConfig) -> HelmChartScore { +pub fn kube_prometheus_helm_chart_score() -> HelmChartScore { + let config = KubePrometheusConfig::new(); + //TODO this should be make into a rule with default formatting that can be easily passed as a vec //to the overrides or something leaving the user to deal with formatting here seems bad let default_rules = config.default_rules.to_string(); @@ -143,7 +145,21 @@ prometheus: enabled: {prometheus} "#, ); - + HelmChartScore { + namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), + release_name: NonBlankString::from_str("kube-prometheus").unwrap(), + chart_name: NonBlankString::from_str( + "oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack", + ) + .unwrap(), + chart_version: None, + values_overrides: None, + values_yaml: Some(values.to_string()), + create_namespace: true, + install_only: true, + repository: None, + } +} // let alertmanager_config = alert_manager_yaml_builder(&config); // values.push_str(&alertmanager_config); // @@ -204,58 +220,44 @@ prometheus: // alertmanager_config // } - HelmChartScore { - namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), - release_name: NonBlankString::from_str("kube-prometheus").unwrap(), - chart_name: NonBlankString::from_str( - "oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack", - ) - .unwrap(), - chart_version: None, - values_overrides: None, - values_yaml: Some(values.to_string()), - create_namespace: true, - install_only: true, - repository: None, - } -} -fn discord_alert_builder(release_name: &String) -> (String, String) { - let discord_receiver_name = format!("Discord-{}", release_name); - let receiver = format!( - r#" - - name: '{discord_receiver_name}' - webhook_configs: - - url: 'http://{release_name}-alertmanager-discord:9094' - send_resolved: true"#, - ); - let route = format!( - r#" - - receiver: '{discord_receiver_name}' - matchers: - - alertname!=Watchdog - continue: true"#, - ); - (receiver, route) -} -fn slack_alert_builder(slack_channel: &String) -> (String, String) { - let slack_receiver_name = format!("Slack-{}", slack_channel); - let receiver = format!( - r#" - - name: '{slack_receiver_name}' - slack_configs: - - channel: '{slack_channel}' - send_resolved: true - title: '{{{{ .CommonAnnotations.title }}}}' - text: '{{{{ .CommonAnnotations.description }}}}'"#, - ); - let route = format!( - r#" - - receiver: '{slack_receiver_name}' - matchers: - - alertname!=Watchdog - continue: true"#, - ); - (receiver, route) -} +//fn discord_alert_builder(release_name: &String) -> (String, String) { +// let discord_receiver_name = format!("Discord-{}", release_name); +// let receiver = format!( +// r#" +// - name: '{discord_receiver_name}' +// webhook_configs: +// - url: 'http://{release_name}-alertmanager-discord:9094' +// send_resolved: true"#, +// ); +// let route = format!( +// r#" +// - receiver: '{discord_receiver_name}' +// matchers: +// - alertname!=Watchdog +// continue: true"#, +// ); +// (receiver, route) +//} +// +//fn slack_alert_builder(slack_channel: &String) -> (String, String) { +// let slack_receiver_name = format!("Slack-{}", slack_channel); +// let receiver = format!( +// r#" +// - name: '{slack_receiver_name}' +// slack_configs: +// - channel: '{slack_channel}' +// send_resolved: true +// title: '{{{{ .CommonAnnotations.title }}}}' +// text: '{{{{ .CommonAnnotations.description }}}}'"#, +// ); +// let route = format!( +// r#" +// - receiver: '{slack_receiver_name}' +// matchers: +// - alertname!=Watchdog +// continue: true"#, +// ); +// (receiver, route) +//} diff --git a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs index e8cc2d1..4cd2c8c 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/kube_prometheus_monitor.rs @@ -1,51 +1,87 @@ -use async_trait::async_trait; -use serde::Serialize; +//#[derive(Debug, Clone, Serialize)] +//pub struct KubePrometheusMonitorScore { +// pub kube_prometheus_config: KubePrometheusConfig, +// pub alert_channel_configs: Vec, +//} -use crate::{ - interpret::{InterpretError, Outcome}, - topology::{ - Topology, - oberservability::monitoring::{AlertChannel, Monitor, MonitorConfig}, - }, -}; +//impl> MonitorConfig +// for KubePrometheusMonitorScore +//{ +// fn build_monitor(&self) -> Box> { +// Box::new(self.clone()) +// } +//} -use super::{ - config::KubePrometheusConfig, - types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute}, -}; +//impl> Score +// for KubePrometheusMonitorScore +//{ +// fn create_interpret(&self) -> Box> { +// Box::new(KubePrometheusMonitorInterpret { +// score: self.clone(), +// }) +// } +// +// fn name(&self) -> String { +// "KubePrometheusMonitorScore".to_string() +// } +//} -#[derive(Debug, Clone, Serialize)] -pub struct KubePrometheusMonitor { - pub kube_prometheus_config: KubePrometheusConfig, -} +//#[derive(Debug, Clone)] +//pub struct KubePrometheusMonitorInterpret { +// score: KubePrometheusMonitorScore, +//} -impl MonitorConfig for KubePrometheusMonitor { - fn build_monitor(&self) -> Box> { - Box::new(self.clone()) - } -} +//#[async_trait] +//impl AlertChannelConfig for KubePrometheusMonitorInterpret { +// async fn build_alert_channel( +// &self, +// ) -> Box { +// todo!() +// } +//} +//#[async_trait] +//impl> Interpret +// for KubePrometheusMonitorInterpret +//{ +// async fn execute( +// &self, +// inventory: &Inventory, +// topology: &T, +// ) -> Result { +// let monitor = self.score.build_monitor(); +// +// let mut alert_channels = Vec::new(); +// //for config in self.score.alert_channel_configs { +// // alert_channels.push(self.build_alert_channel()); +// //} +// +// monitor +// .deploy_monitor(inventory, topology, alert_channels) +// .await +// } +// +// fn get_name(&self) -> InterpretName { +// todo!() +// } +// +// fn get_version(&self) -> Version { +// todo!() +// } +// +// fn get_status(&self) -> InterpretStatus { +// todo!() +// } +// +// fn get_children(&self) -> Vec { +// todo!() +// } +//} -#[async_trait] -pub trait PrometheusAlertChannel: AlertChannel { - fn get_alert_channel_global_settings(&self) -> Option; - fn get_alert_channel_route(&self) -> AlertManagerRoute; - fn get_alert_channel_receiver(&self) -> AlertManagerReceiver; -} -#[async_trait] -impl Monitor for KubePrometheusMonitor { - async fn deploy_monitor( - &self, - _topology: &T, - _alert_channels: Vec>, - ) -> Result { - todo!() - } - fn delete_monitor( - &self, - _topology: &T, - _alert_channels: Vec>, - ) -> Result { - todo!() - } -} +//#[async_trait] +//pub trait PrometheusAlertChannel { +// fn get_alert_channel_global_settings(&self) -> Option; +// fn get_alert_channel_route(&self) -> AlertManagerChannelRoute; +// fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver; +//} + diff --git a/harmony/src/modules/monitoring/kube_prometheus/types.rs b/harmony/src/modules/monitoring/kube_prometheus/types.rs index 095aa55..fa66f99 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/types.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/types.rs @@ -1,3 +1,14 @@ -pub struct AlertManagerGlobalConfigs {} -pub struct AlertManagerReceiver {} -pub struct AlertManagerRoute {} +use serde::Serialize; + +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelConfig { + pub global_configs: AlertManagerChannelGlobalConfigs, + pub route: AlertManagerChannelRoute, + pub receiver: AlertManagerChannelReceiver, +} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelGlobalConfigs {} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelReceiver {} +#[derive(Debug, Clone, Serialize)] +pub struct AlertManagerChannelRoute {} diff --git a/harmony/src/modules/monitoring/monitoring_alerting.rs b/harmony/src/modules/monitoring/monitoring_alerting.rs index 0391bff..5fe18a0 100644 --- a/harmony/src/modules/monitoring/monitoring_alerting.rs +++ b/harmony/src/modules/monitoring/monitoring_alerting.rs @@ -2,41 +2,18 @@ use async_trait::async_trait; use serde::{Serialize, Serializer, ser::SerializeStruct}; use std::{fmt::Debug, sync::Arc}; -use crate::{ - data::{Id, Version}, - interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, - inventory::Inventory, - score::Score, - topology::{ - HelmCommand, Topology, - oberservability::monitoring::{AlertChannelConfig, MonitorConfig}, - }, -}; +use crate::{data::{Id, Version}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, topology::{ + oberservability::monitoring::{AlertChannelConfig, Monitor},HelmCommand, Topology + }}; -use super::kube_prometheus::{ - config::KubePrometheusConfig, kube_prometheus_monitor::KubePrometheusMonitor, -}; -#[derive(Debug, Clone)] -pub struct MonitoringAlertingScore { - pub monitor_config: Arc>, - pub alert_channel_configs: Vec>, - pub namespace: Option, +#[derive(Debug, Clone, Serialize)] +pub struct MonitoringAlertingScore { + #[serde(skip)] + pub alert_channel_configs: Option>>, } -impl MonitoringAlertingScore { - pub fn default() -> Self { - Self { - monitor_config: Arc::new(KubePrometheusMonitor { - kube_prometheus_config: KubePrometheusConfig::new(), - }), - alert_channel_configs: Vec::new(), - namespace: Some("monitoring".to_string()), - } - } -} - -impl Score for MonitoringAlertingScore { +impl Score for MonitoringAlertingScore { fn create_interpret(&self) -> Box> { Box::new(MonitoringAlertingInterpret { score: self.clone(), @@ -44,30 +21,23 @@ impl Score for Monitorin } fn name(&self) -> String { - todo!() + "MonitoringAlertingScore".to_string() } } #[derive(Debug)] -struct MonitoringAlertingInterpret { - score: MonitoringAlertingScore, +struct MonitoringAlertingInterpret { + score: MonitoringAlertingScore, } #[async_trait] -impl Interpret for MonitoringAlertingInterpret { +impl Interpret for MonitoringAlertingInterpret { async fn execute( &self, - _inventory: &Inventory, + inventory: &Inventory, topology: &T, ) -> Result { - let monitor = self.score.monitor_config.build_monitor(); - - let mut alert_channels = Vec::new(); - for config in &self.score.alert_channel_configs { - alert_channels.push(config.build_alert_channel()); - } - - monitor.deploy_monitor(topology, alert_channels).await + topology.provision_monitor(self.score.alert_channel_configs.clone()).await } fn get_name(&self) -> InterpretName { @@ -87,20 +57,4 @@ impl Interpret for MonitoringAlertingInter } } -impl Serialize for MonitoringAlertingScore { - fn serialize(&self, serializer: S) -> Result - where - S: Serializer, - { - let mut state = serializer.serialize_struct("MonitoringAlertingScore", 3)?; - // For now, just serialize basic info - state.serialize_field("monitor_type", "monitoring_system")?; - - let channel_count = self.alert_channel_configs.len(); - state.serialize_field("alert_channel_count", &channel_count)?; - - state.serialize_field("namespace", &self.namespace)?; - state.end() - } -}