diff --git a/harmony/src/domain/topology/mod.rs b/harmony/src/domain/topology/mod.rs index 3d773ff..0cbfee6 100644 --- a/harmony/src/domain/topology/mod.rs +++ b/harmony/src/domain/topology/mod.rs @@ -1,3 +1,4 @@ +pub mod monitoring_alerting; mod ha_cluster; mod host_binding; mod http; diff --git a/harmony/src/domain/topology/monitoring_alerting.rs b/harmony/src/domain/topology/monitoring_alerting.rs new file mode 100644 index 0000000..4951333 --- /dev/null +++ b/harmony/src/domain/topology/monitoring_alerting.rs @@ -0,0 +1,108 @@ +use std::sync::Arc; + +use log::warn; +use tokio::sync::OnceCell; + +use k8s_openapi::api::core::v1::Pod; +use kube::{ + Client, + api::{Api, ListParams}, +}; + +use async_trait::async_trait; + +use crate::{ + interpret::{InterpretError, Outcome}, + inventory::Inventory, + maestro::Maestro, + modules::monitoring::monitoring_alerting::MonitoringAlertingStackScore, + score::Score, +}; + +use super::{HelmCommand, K8sAnywhereTopology, Topology, k8s::K8sClient}; + +#[derive(Clone, Debug)] +struct MonitoringState { + message: String, +} + +#[derive(Debug)] +pub struct MonitoringAlertingTopology { + monitoring_state: OnceCell>, +} + +impl MonitoringAlertingTopology { + pub fn new() -> Self { + Self { + monitoring_state: OnceCell::new(), + } + } + + async fn get_monitoring_state(&self) -> Result, InterpretError> { + let client = Client::try_default() + .await + .map_err(|e| InterpretError::new(format!("Kubernetes client error: {}", e)))?; + + for ns in &["monitoring", "openshift-monitoring"] { + let pods: Api = Api::namespaced(client.clone(), ns); + //TODO hardcoding the label is a problem + //check all pods are ready + let lp = ListParams::default().labels("app.kubernetes.io/name=prometheus"); + + match pods.list(&lp).await { + Ok(pod_list) => { + for p in pod_list.items { + if let Some(status) = p.status { + if let Some(conditions) = status.conditions { + if conditions + .iter() + .any(|c| c.type_ == "Ready" && c.status == "True") + { + return Ok(Some(MonitoringState { + message: format!( + "Prometheus is ready in namespace: {}", + ns + ), + })); + } + } + } + } + } + Err(e) => { + warn!("Failed to query pods in ns {}: {}", ns, e); + } + } + } + + Ok(None) + } +} + +impl Clone for Box> { + fn clone(&self) -> Box> { + self.clone_box() + } +} + +#[async_trait] +impl Topology for MonitoringAlertingTopology { + fn name(&self) -> &str { + "MonitoringAlertingTopology" + } + + async fn ensure_ready(&self) -> Result { + if let Some(state) = self.get_monitoring_state().await? { + // Monitoring stack is already ready — stop app. + println!("{}", state.message); + std::process::exit(0); + } + + // Monitoring not found — proceed with installation. + Ok(Outcome::success( + "Monitoring stack installation started.".to_string(), + )) + } +} + +impl HelmCommand for MonitoringAlertingTopology {} diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs index 6faeb00..f0530a7 100644 --- a/harmony/src/modules/mod.rs +++ b/harmony/src/modules/mod.rs @@ -12,3 +12,4 @@ pub mod load_balancer; pub mod okd; pub mod opnsense; pub mod tftp; +pub mod monitoring; diff --git a/harmony/src/modules/monitoring/kube_prometheus.rs b/harmony/src/modules/monitoring/kube_prometheus.rs new file mode 100644 index 0000000..dd4b3f1 --- /dev/null +++ b/harmony/src/modules/monitoring/kube_prometheus.rs @@ -0,0 +1,49 @@ +use std::str::FromStr; + +use non_blank_string_rs::NonBlankString; + +use crate::modules::helm::chart::HelmChartScore; + +pub fn kube_prometheus_score(ns: &str) -> HelmChartScore { + //TODO this should be make into a rule with default formatting that can be easily passed as a vec + //to the overrides or something leaving the user to deal with formatting here seems bad + let values = r#" +additionalPrometheusRulesMap: + pvc-alerts: + groups: + - name: pvc-alerts + rules: + - alert: 'PVC Fill Over 95 Percent In 2 Days' + expr: | + ( + kubelet_volume_stats_used_bytes + / + kubelet_volume_stats_capacity_bytes + ) > 0.95 + AND + predict_linear(kubelet_volume_stats_used_bytes[2d], 2 * 24 * 60 * 60) + / + kubelet_volume_stats_capacity_bytes + > 0.95 + for: 1m + labels: + severity: warning + annotations: + description: The PVC {{ $labels.persistentvolumeclaim }} in namespace {{ $labels.namespace }} is predicted to fill over 95% in less than 2 days. + title: PVC {{ $labels.persistentvolumeclaim }} in namespace {{ $labels.namespace }} will fill over 95% in less than 2 days +"#; + HelmChartScore { + namespace: Some(NonBlankString::from_str(ns).unwrap()), + release_name: NonBlankString::from_str("kube-prometheus").unwrap(), + chart_name: NonBlankString::from_str( + "oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack", //use kube prometheus chart which includes grafana, prometheus, alert + //manager, etc + ) + .unwrap(), + chart_version: None, + values_overrides: None, + values_yaml: Some(values.to_string()), + create_namespace: true, + install_only: true, + } +} diff --git a/harmony/src/modules/monitoring/mod.rs b/harmony/src/modules/monitoring/mod.rs new file mode 100644 index 0000000..dd17cc1 --- /dev/null +++ b/harmony/src/modules/monitoring/mod.rs @@ -0,0 +1,3 @@ +pub mod monitoring_alerting; +mod kube_prometheus; + diff --git a/harmony/src/modules/monitoring/monitoring_alerting.rs b/harmony/src/modules/monitoring/monitoring_alerting.rs new file mode 100644 index 0000000..acc5969 --- /dev/null +++ b/harmony/src/modules/monitoring/monitoring_alerting.rs @@ -0,0 +1,144 @@ +use async_trait::async_trait; +use log::info; +use serde::Serialize; + +use crate::{ + data::{Id, Version}, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + maestro::Maestro, + score::{CloneBoxScore, Score}, + topology::{HelmCommand, Topology, monitoring_alerting::MonitoringAlertingTopology}, +}; + +use super::kube_prometheus::kube_prometheus_score; + +#[derive(Debug)] +pub struct MonitoringAlertingStackScore { + //TODO add documenation to explain why its here + //keeps it open for the end user to specify which stack they want + //if it isnt default kube-prometheus + pub monitoring_stack: Vec>>, + pub namespace: String, +} + +impl MonitoringAlertingStackScore { + pub fn new( + monitoring_stack: Vec>>, + namespace: String, + ) -> Self { + Self { + monitoring_stack, + namespace, + } + } +} + +impl Default for MonitoringAlertingStackScore { + fn default() -> Self { + let ns = "monitoring"; + Self { + monitoring_stack: vec![Box::new(kube_prometheus_score(ns))], + namespace: ns.to_string(), + } + } +} +impl Clone for MonitoringAlertingStackScore { + fn clone(&self) -> Self { + Self { + monitoring_stack: self + .monitoring_stack + .iter() + .map(|s| s.clone_box()) + .collect(), + namespace: self.namespace.clone(), + } + } +} + +impl Serialize for MonitoringAlertingStackScore { + fn serialize(&self, serializer: S) -> Result + where + S: serde::Serializer, + { + use serde::ser::SerializeStruct; + let mut s = serializer.serialize_struct("MonitoringAlertingStackScore", 1)?; + let monitoring_values: Vec<_> = self + .monitoring_stack + .iter() + .map(|m| m.serialize()) + .collect(); + s.serialize_field("monitoring", &monitoring_values)?; + s.end() + } +} + +impl Score for MonitoringAlertingStackScore { + fn create_interpret(&self) -> Box> { + Box::new(MonitoringAlertingStackInterpret { + score: MonitoringAlertingStackScore { + monitoring_stack: self + .monitoring_stack + .iter() + .map(|s| s.clone_box()) + .collect(), + namespace: self.namespace.clone(), + }, + }) + } + + fn name(&self) -> String { + format!("MonitoringAlertingStackScore") + } +} + +#[derive(Debug)] +struct MonitoringAlertingStackInterpret { + pub score: MonitoringAlertingStackScore, +} + +#[async_trait] +impl Interpret for MonitoringAlertingStackInterpret { + async fn execute( + &self, + _inventory: &Inventory, + _topology: &T, + ) -> Result { + let inventory = Inventory::autoload(); + let topology = MonitoringAlertingTopology::new(); + let maestro = match Maestro::initialize(inventory, topology).await { + Ok(m) => m, + Err(e) => { + println!("failed to initialize Maestro: {}", e); + std::process::exit(1); + } + }; + + let scores_vec = self.score.monitoring_stack.clone(); + for s in scores_vec{ + info!("Running: {}", s.name()); + maestro.interpret(s).await?; + } + + Ok(Outcome::success(format!( + "monitoring stack installed in {} namespace", + self.score.namespace + ))) + } + + fn get_name(&self) -> InterpretName { + todo!() + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +}