From 21e51b8d809c2cd379e4d0d44f8d5095c2af63a2 Mon Sep 17 00:00:00 2001
From: Willem
Date: Fri, 19 Sep 2025 10:00:13 -0400
Subject: [PATCH 1/3] feat(monitoring): score to enable user-workload-monitoring
 within okd

---
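Note on usage: the score plugs into the usual Harmony score/interpret flow. A
minimal sketch follows (the crate paths are inferred from the file layout added
here; `inventory` and `topology` are assumed to already exist, and any Topology
that also implements K8sclient should work -- the wiring below is illustrative,
not part of this patch):

    use harmony::modules::monitoring::okd::enable_user_workload::OpenshiftUserWorkloadMonitoring;
    use harmony::score::Score;

    // `inventory: Inventory` and `topology: T` (T: Topology + K8sclient) are
    // assumed to be constructed elsewhere in the program.
    let score = OpenshiftUserWorkloadMonitoring {};
    let interpret = score.create_interpret();
    let outcome = interpret.execute(&inventory, &topology).await?;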
 harmony/src/modules/monitoring/mod.rs         |   1 +
 .../monitoring/okd/enable_user_workload.rs    | 181 ++++++++++++++++++
 harmony/src/modules/monitoring/okd/mod.rs     |   1 +
 3 files changed, 183 insertions(+)
 create mode 100644 harmony/src/modules/monitoring/okd/enable_user_workload.rs
 create mode 100644 harmony/src/modules/monitoring/okd/mod.rs

diff --git a/harmony/src/modules/monitoring/mod.rs b/harmony/src/modules/monitoring/mod.rs
index b93f0c6..edda516 100644
--- a/harmony/src/modules/monitoring/mod.rs
+++ b/harmony/src/modules/monitoring/mod.rs
@@ -4,4 +4,5 @@ pub mod application_monitoring;
 pub mod grafana;
 pub mod kube_prometheus;
 pub mod ntfy;
+pub mod okd;
 pub mod prometheus;
diff --git a/harmony/src/modules/monitoring/okd/enable_user_workload.rs b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
new file mode 100644
index 0000000..d3a5998
--- /dev/null
+++ b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
@@ -0,0 +1,181 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use harmony_types::id::Id;
+use k8s_openapi::api::core::v1::Pod;
+use kube::api::GroupVersionKind;
+use serde::Serialize;
+
+use crate::{
+    data::Version,
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology, k8s::K8sClient},
+};
+
+#[derive(Clone, Debug, Serialize)]
+pub struct OpenshiftUserWorkloadMonitoring {}
+
+impl<T: Topology + K8sclient> Score<T> for OpenshiftUserWorkloadMonitoring {
+    fn name(&self) -> String {
+        "OpenshiftUserWorkloadMonitoringScore".to_string()
+    }
+
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(OpenshiftUserWorkloadMonitoringInterpret {})
+    }
+}
+
+#[derive(Clone, Debug, Serialize)]
+pub struct OpenshiftUserWorkloadMonitoringInterpret {}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
+    async fn execute(
+        &self,
+        inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        let client = topology.k8s_client().await.unwrap();
+        self.update_cluster_monitoring_config_cm(&client).await?;
+        self.update_user_workload_monitoring_config_cm(&client)
+            .await?;
+        self.verify_user_workload(&client).await?;
+        Ok(Outcome::success(
+            "successfully enabled user-workload-monitoring".to_string(),
+        ))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        InterpretName::Custom("OpenshiftUserWorkloadMonitoring")
+    }
+
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl OpenshiftUserWorkloadMonitoringInterpret {
+    pub async fn update_cluster_monitoring_config_cm(
+        &self,
+        client: &Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let cm = format!(
+            r#"
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: cluster-monitoring-config
+  namespace: openshift-monitoring
+data:
+  config.yaml: |
+    enableUserWorkload: true
+    alertmanagerMain:
+      enableUserAlertmanagerConfig: true
+    "#
+        );
+
+        let cm_yaml = serde_yaml::to_value(cm).unwrap();
+
+        client
+            .apply_yaml(&cm_yaml, Some("openshift-monitoring"))
+            .await?;
+        Ok(Outcome::success(
+            "updated cluster-monitoring-config-map".to_string(),
+        ))
+    }
+    pub async fn update_user_workload_monitoring_config_cm(
+        &self,
+        client: &Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let cm = format!(
+            r#"
+apiVersion: v1
+kind: ConfigMap
+metadata:
+  name: user-workload-monitoring-config
+  namespace: openshift-user-workload-monitoring
+data:
+  config.yaml: |
+    alertmanager:
+        enabled: true
+        enableAlertmanagerConfig: true
+    "#
+        );
+
+        let cm_yaml = serde_yaml::to_value(cm).unwrap();
+
+        client
+            .apply_yaml(&cm_yaml, Some("openshift-user-workload-monitoring"))
+            .await?;
+        Ok(Outcome::success(
+            "updated openshift-user-monitoring-config-map".to_string(),
+        ))
+    }
+
+    pub async fn verify_user_workload(
+        &self,
+        client: &Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let namespace = "openshift-user-workload-monitoring";
+        let alertmanager_name = "alertmanager-user-workload-0";
+        let alertmanager = client
+            .get_pod(alertmanager_name, Some(namespace))
+            .await
+            .unwrap();
+        let prometheus_name = "prometheus-user-workload-0";
+        let prometheus = client
+            .get_pod(prometheus_name, Some(namespace))
+            .await
+            .unwrap();
+        self.ensure_pod(alertmanager, alertmanager_name, namespace)
+            .await?;
+        self.ensure_pod(prometheus, prometheus_name, namespace)
+            .await
+    }
+
+    async fn ensure_pod(
+        &self,
+        pod: Option<Pod>,
+        pod_name: &str,
+        namespace: &str,
+    ) -> Result<Outcome, InterpretError> {
+        match pod {
+            Some(pod) => {
+                if let Some(status) = pod.status {
+                    let phase = status.phase.unwrap_or("failed".to_string());
+                    if phase == "running" {
+                        Ok(Outcome::success(format!(
+                            "'{}' is ready with status.phase '{}'.",
+                            pod.metadata.name.unwrap(),
+                            phase
+                        )))
+                    } else {
+                        Err(InterpretError::new(format!(
+                            "'{}' in namespace '{}' has status.phase '{}'.",
+                            pod_name, namespace, phase
+                        )))
+                    }
+                } else {
+                    Err(InterpretError::new(format!(
+                        "{} not found in ns: {}",
+                        pod_name, namespace
+                    )))
+                }
+            }
+            None => Err(InterpretError::new(format!(
+                "'{}' not found in namespace '{}'",
+                pod_name, namespace
+            ))),
+        }
+    }
+}
diff --git a/harmony/src/modules/monitoring/okd/mod.rs b/harmony/src/modules/monitoring/okd/mod.rs
new file mode 100644
index 0000000..50339ba
--- /dev/null
+++ b/harmony/src/modules/monitoring/okd/mod.rs
@@ -0,0 +1 @@
+pub mod enable_user_workload;
-- 
2.39.5

From cdf75faa8f82cd8a09240314a88c5ed4cc18a52a Mon Sep 17 00:00:00 2001
From: Willem
Date: Fri, 19 Sep 2025 15:11:28 -0400
Subject: [PATCH 2/3] feat(monitoring): poll for pod readiness instead of
 failing immediately

Replace the one-shot ensure_pod check with a polling wait_for_pod_ready:
the old check failed immediately, before the user-workload pods had time
to be created. Also handle apiVersions without a group segment ("v1")
when building a GroupVersionKind, which previously panicked on split[1].

---
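The k8s.rs change below guards against core-group resources: their apiVersion
is just "v1" with no "/", so the old split[1] indexing panicked. A standalone
sketch of the new parsing logic (plain Rust, runnable as-is):

    // Splits a Kubernetes apiVersion into (group, version).
    // Core-group resources have no group segment.
    fn parse_api_version(api_version: &str) -> (&str, &str) {
        let mut it = api_version.splitn(2, '/');
        let first = it.next().unwrap(); // splitn always yields at least one item
        match it.next() {
            Some(second) => (first, second), // "apps/v1" -> ("apps", "v1")
            None => ("", first),             // "v1"      -> ("", "v1")
        }
    }

    fn main() {
        assert_eq!(parse_api_version("apps/v1"), ("apps", "v1"));
        assert_eq!(parse_api_version("v1"), ("", "v1"));
    }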
 harmony/src/domain/topology/k8s.rs            |  9 +-
 .../monitoring/okd/enable_user_workload.rs    | 94 +++++++++----------
 2 files changed, 48 insertions(+), 55 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 88bd2e8..a6eb100 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -419,9 +419,12 @@
             .as_str()
             .expect("couldn't get kind as str");
 
-        let split: Vec<&str> = api_version.splitn(2, "/").collect();
-        let g = split[0];
-        let v = split[1];
+        let mut it = api_version.splitn(2, '/');
+        let first = it.next().unwrap();
+        let (g, v) = match it.next() {
+            Some(second) => (first, second),
+            None => ("", first),
+        };
 
         let gvk = GroupVersionKind::gvk(g, v, kind);
         let api_resource = ApiResource::from_gvk(&gvk);
diff --git a/harmony/src/modules/monitoring/okd/enable_user_workload.rs b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
index d3a5998..d04c2cc 100644
--- a/harmony/src/modules/monitoring/okd/enable_user_workload.rs
+++ b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
@@ -1,10 +1,11 @@
-use std::sync::Arc;
+use std::{sync::Arc, time::Duration};
 
 use async_trait::async_trait;
 use harmony_types::id::Id;
 use k8s_openapi::api::core::v1::Pod;
-use kube::api::GroupVersionKind;
+use log::debug;
 use serde::Serialize;
+use tokio::time::sleep;
 
 use crate::{
     data::Version,
@@ -69,8 +70,7 @@ impl OpenshiftUserWorkloadMonitoringInterpret {
         &self,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let cm = format!(
-            r#"
+        let cm = r#"
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -81,11 +81,9 @@ data:
     enableUserWorkload: true
     alertmanagerMain:
       enableUserAlertmanagerConfig: true
-    "#
-        );
-
-        let cm_yaml = serde_yaml::to_value(cm).unwrap();
-
+"#;
+        let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap();
+        debug!("{:#?}", cm_yaml);
         client
             .apply_yaml(&cm_yaml, Some("openshift-monitoring"))
             .await?;
@@ -97,8 +95,7 @@ data:
         &self,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let cm = format!(
-            r#"
+        let cm = r#"
 apiVersion: v1
 kind: ConfigMap
 metadata:
@@ -107,12 +104,11 @@ metadata:
 data:
   config.yaml: |
     alertmanager:
-        enabled: true
-        enableAlertmanagerConfig: true
-    "#
-        );
+      enabled: true
+      enableAlertmanagerConfig: true
+"#;
 
-        let cm_yaml = serde_yaml::to_value(cm).unwrap();
+        let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap();
 
         client
             .apply_yaml(&cm_yaml, Some("openshift-user-workload-monitoring"))
             .await?;
@@ -128,54 +124,48 @@ data:
     ) -> Result<Outcome, InterpretError> {
         let namespace = "openshift-user-workload-monitoring";
         let alertmanager_name = "alertmanager-user-workload-0";
-        let alertmanager = client
-            .get_pod(alertmanager_name, Some(namespace))
-            .await
-            .unwrap();
         let prometheus_name = "prometheus-user-workload-0";
-        let prometheus = client
-            .get_pod(prometheus_name, Some(namespace))
-            .await
-            .unwrap();
-        self.ensure_pod(alertmanager, alertmanager_name, namespace)
+        self.wait_for_pod_ready(&client, alertmanager_name, namespace)
             .await?;
-        self.ensure_pod(prometheus, prometheus_name, namespace)
+        self.wait_for_pod_ready(&client, prometheus_name, namespace)
             .await
     }
 
-    async fn ensure_pod(
+    async fn wait_for_pod_ready(
         &self,
-        pod: Option<Pod>,
+        client: &Arc<K8sClient>,
         pod_name: &str,
         namespace: &str,
     ) -> Result<Outcome, InterpretError> {
-        match pod {
-            Some(pod) => {
-                if let Some(status) = pod.status {
-                    let phase = status.phase.unwrap_or("failed".to_string());
-                    if phase == "running" {
-                        Ok(Outcome::success(format!(
-                            "'{}' is ready with status.phase '{}'.",
-                            pod.metadata.name.unwrap(),
-                            phase
-                        )))
-                    } else {
-                        Err(InterpretError::new(format!(
-                            "'{}' in namespace '{}' has status.phase '{}'.",
-                            pod_name, namespace, phase
-                        )))
+        let mut elapsed = 0;
+        let interval = 5; // seconds between checks
+        let timeout_secs = 120;
+        loop {
+            // Fetch the pod
+            let pod = client.get_pod(pod_name, Some(namespace)).await?;
+
+            if let Some(p) = pod {
+                if let Some(status) = p.status {
+                    if let Some(phase) = status.phase {
+                        if phase.to_lowercase() == "running" {
+                            return Ok(Outcome::success(format!(
+                                "'{}' is ready with status.phase '{}'.",
+                                pod_name, phase
+                            )));
+                        }
                     }
-                } else {
-                    Err(InterpretError::new(format!(
-                        "{} not found in ns: {}",
-                        pod_name, namespace
-                    )))
                 }
             }
-            None => Err(InterpretError::new(format!(
-                "'{}' not found in namespace '{}'",
-                pod_name, namespace
-            ))),
+
+            if elapsed >= timeout_secs {
+                return Err(InterpretError::new(format!(
+                    "'{}' in ns '{}' did not become ready within {}s",
+                    pod_name, namespace, timeout_secs
+                )));
+            }
+
+            sleep(Duration::from_secs(interval)).await;
+            elapsed += interval;
         }
     }
 }
-- 
2.39.5

From 36b909ee20319da1522a39b5c18032443e35881b Mon Sep 17 00:00:00 2001
From: Willem
Date: Mon, 29 Sep 2025 09:36:01 -0400
Subject: [PATCH 3/3] fix: move wait_for_pod_ready to K8sClient, switch raw
 YAML strings to typed k8s_openapi ConfigMaps

Move the polling helper onto K8sClient so other interprets can reuse it,
and build the ConfigMaps as typed k8s_openapi resources instead of
hand-written YAML strings.

---
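Building the ConfigMap as a typed k8s_openapi resource moves the YAML mistakes
this series kept hitting (indentation, field names) to compile time. A
standalone sketch of the approach (assumes only the k8s-openapi and serde_yaml
crates; the patch itself uses kube's re-exported ObjectMeta, which is the same
type):

    use std::collections::BTreeMap;

    use k8s_openapi::api::core::v1::ConfigMap;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;

    fn main() {
        let mut data = BTreeMap::new();
        data.insert(
            "config.yaml".to_string(),
            "enableUserWorkload: true\n".to_string(),
        );
        let cm = ConfigMap {
            metadata: ObjectMeta {
                name: Some("cluster-monitoring-config".to_string()),
                namespace: Some("openshift-monitoring".to_string()),
                ..Default::default()
            },
            data: Some(data),
            ..Default::default()
        };
        // A typo in a field name now fails to compile instead of failing at
        // apply time; only the config.yaml payload remains free-form.
        println!("{}", serde_yaml::to_string(&cm).unwrap());
    }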
 harmony/src/domain/topology/k8s.rs            |  45 +++++-
 .../monitoring/okd/enable_user_workload.rs    | 142 ++++++++----------
 2 files changed, 103 insertions(+), 84 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index a6eb100..5a1e6ec 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -1,13 +1,19 @@
+use std::time::Duration;
+
 use derive_new::new;
 use k8s_openapi::{
     ClusterResourceScope, NamespaceResourceScope,
-    api::{apps::v1::Deployment, core::v1::Pod},
+    api::{
+        apps::v1::Deployment,
+        core::v1::{Pod, PodStatus},
+    },
 };
 use kube::{
     Client, Config, Error, Resource,
     api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
+    error::DiscoveryError,
     runtime::reflector::Lookup,
 };
 use kube::{api::DynamicObject, runtime::conditions};
@@ -19,7 +25,7 @@ use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::{Value, json};
 use similar::TextDiff;
-use tokio::io::AsyncReadExt;
+use tokio::{io::AsyncReadExt, time::sleep};
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -153,6 +159,41 @@ impl K8sClient {
         }
     }
 
+    pub async fn wait_for_pod_ready(
+        &self,
+        pod_name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let mut elapsed = 0;
+        let interval = 5; // seconds between checks
+        let timeout_secs = 120;
+        loop {
+            let pod = self.get_pod(pod_name, namespace).await?;
+
+            if let Some(p) = pod {
+                if let Some(status) = p.status {
+                    if let Some(phase) = status.phase {
+                        if phase.to_lowercase() == "running" {
+                            return Ok(());
+                        }
+                    }
+                }
+            }
+
+            if elapsed >= timeout_secs {
+                return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
+                    "'{}' in ns '{}' did not become ready within {}s",
+                    pod_name,
+                    namespace.unwrap_or("default"),
+                    timeout_secs
+                ))));
+            }
+
+            sleep(Duration::from_secs(interval)).await;
+            elapsed += interval;
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the specified label
     /// '{label}={name}'
     pub async fn exec_app_capture_output(
diff --git a/harmony/src/modules/monitoring/okd/enable_user_workload.rs b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
index d04c2cc..b322b4d 100644
--- a/harmony/src/modules/monitoring/okd/enable_user_workload.rs
+++ b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
@@ -1,11 +1,4 @@
-use std::{sync::Arc, time::Duration};
-
-use async_trait::async_trait;
-use harmony_types::id::Id;
-use k8s_openapi::api::core::v1::Pod;
-use log::debug;
-use serde::Serialize;
-use tokio::time::sleep;
+use std::{collections::BTreeMap, sync::Arc};
 
 use crate::{
     data::Version,
@@ -14,6 +7,11 @@ use crate::{
     score::Score,
     topology::{K8sclient, Topology, k8s::K8sClient},
 };
+use async_trait::async_trait;
+use harmony_types::id::Id;
+use k8s_openapi::api::core::v1::ConfigMap;
+use kube::api::ObjectMeta;
+use serde::Serialize;
 
 #[derive(Clone, Debug, Serialize)]
 pub struct OpenshiftUserWorkloadMonitoring {}
@@ -35,7 +33,7 @@ pub struct OpenshiftUserWorkloadMonitoringInterpret {}
 impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
     async fn execute(
         &self,
-        inventory: &Inventory,
+        _inventory: &Inventory,
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
@@ -70,49 +68,60 @@ impl OpenshiftUserWorkloadMonitoringInterpret {
         &self,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let cm = r#"
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: cluster-monitoring-config
-  namespace: openshift-monitoring
-data:
-  config.yaml: |
-    enableUserWorkload: true
-    alertmanagerMain:
-      enableUserAlertmanagerConfig: true
-"#;
-        let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap();
-        debug!("{:#?}", cm_yaml);
-        client
-            .apply_yaml(&cm_yaml, Some("openshift-monitoring"))
-            .await?;
+        let mut data = BTreeMap::new();
+        data.insert(
+            "config.yaml".to_string(),
+            r#"
+enableUserWorkload: true
+alertmanagerMain:
+  enableUserAlertmanagerConfig: true
+"#
+            .to_string(),
+        );
+
+        let cm = ConfigMap {
+            metadata: ObjectMeta {
+                name: Some("cluster-monitoring-config".to_string()),
+                namespace: Some("openshift-monitoring".to_string()),
+                ..Default::default()
+            },
+            data: Some(data),
+            ..Default::default()
+        };
+        client.apply(&cm, Some("openshift-monitoring")).await?;
+
         Ok(Outcome::success(
             "updated cluster-monitoring-config-map".to_string(),
         ))
     }
+
     pub async fn update_user_workload_monitoring_config_cm(
         &self,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let cm = r#"
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: user-workload-monitoring-config
-  namespace: openshift-user-workload-monitoring
-data:
-  config.yaml: |
-    alertmanager:
-      enabled: true
-      enableAlertmanagerConfig: true
-"#;
-
-        let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap();
-
+        let mut data = BTreeMap::new();
+        data.insert(
+            "config.yaml".to_string(),
+            r#"
+alertmanager:
+  enabled: true
+  enableAlertmanagerConfig: true
+"#
+            .to_string(),
+        );
+        let cm = ConfigMap {
+            metadata: ObjectMeta {
+                name: Some("user-workload-monitoring-config".to_string()),
+                namespace: Some("openshift-user-workload-monitoring".to_string()),
+                ..Default::default()
+            },
+            data: Some(data),
+            ..Default::default()
+        };
         client
-            .apply_yaml(&cm_yaml, Some("openshift-user-workload-monitoring"))
+            .apply(&cm, Some("openshift-user-workload-monitoring"))
             .await?;
+
         Ok(Outcome::success(
             "updated openshift-user-monitoring-config-map".to_string(),
         ))
@@ -125,47 +134,16 @@ data:
         let namespace = "openshift-user-workload-monitoring";
         let alertmanager_name = "alertmanager-user-workload-0";
         let prometheus_name = "prometheus-user-workload-0";
-        self.wait_for_pod_ready(&client, alertmanager_name, namespace)
+        client
+            .wait_for_pod_ready(alertmanager_name, Some(namespace))
+            .await?;
+        client
+            .wait_for_pod_ready(prometheus_name, Some(namespace))
             .await?;
-        self.wait_for_pod_ready(&client, prometheus_name, namespace)
-            .await
-    }
 
-    async fn wait_for_pod_ready(
-        &self,
-        client: &Arc<K8sClient>,
-        pod_name: &str,
-        namespace: &str,
-    ) -> Result<Outcome, InterpretError> {
-        let mut elapsed = 0;
-        let interval = 5; // seconds between checks
-        let timeout_secs = 120;
-        loop {
-            // Fetch the pod
-            let pod = client.get_pod(pod_name, Some(namespace)).await?;
-
-            if let Some(p) = pod {
-                if let Some(status) = p.status {
-                    if let Some(phase) = status.phase {
-                        if phase.to_lowercase() == "running" {
-                            return Ok(Outcome::success(format!(
-                                "'{}' is ready with status.phase '{}'.",
-                                pod_name, phase
-                            )));
-                        }
-                    }
-                }
-            }
-
-            if elapsed >= timeout_secs {
-                return Err(InterpretError::new(format!(
-                    "'{}' in ns '{}' did not become ready within {}s",
-                    pod_name, namespace, timeout_secs
-                )));
-            }
-
-            sleep(Duration::from_secs(interval)).await;
-            elapsed += interval;
-        }
+        Ok(Outcome::success(format!(
+            "pods: {}, {} ready in ns: {}",
+            alertmanager_name, prometheus_name, namespace
+        )))
     }
 }
-- 
2.39.5