diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index a6eb100..5a1e6ec 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -1,13 +1,19 @@
+use std::time::Duration;
+
 use derive_new::new;
 use k8s_openapi::{
     ClusterResourceScope, NamespaceResourceScope,
-    api::{apps::v1::Deployment, core::v1::Pod},
+    api::{
+        apps::v1::Deployment,
+        core::v1::{Pod, PodStatus},
+    },
 };
 use kube::{
     Client, Config, Error, Resource,
     api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
+    error::DiscoveryError,
     runtime::reflector::Lookup,
 };
 use kube::{api::DynamicObject, runtime::conditions};
@@ -19,7 +25,7 @@ use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::{Value, json};
 use similar::TextDiff;
-use tokio::io::AsyncReadExt;
+use tokio::{io::AsyncReadExt, time::sleep};
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -153,6 +159,41 @@ impl K8sClient {
         }
     }
 
+    pub async fn wait_for_pod_ready(
+        &self,
+        pod_name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let mut elapsed = 0;
+        let interval = 5; // seconds between checks
+        let timeout_secs = 120;
+        loop {
+            let pod = self.get_pod(pod_name, namespace).await?;
+
+            if let Some(p) = pod {
+                if let Some(status) = p.status {
+                    if let Some(phase) = status.phase {
+                        if phase.to_lowercase() == "running" {
+                            return Ok(()); // pod reached phase "Running"
+                        }
+                    }
+                }
+            }
+
+            if elapsed >= timeout_secs {
+                return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
+                    "'{}' in ns '{}' did not become ready within {}s",
+                    pod_name,
+                    namespace.unwrap_or("default"),
+                    timeout_secs
+                ))));
+            }
+
+            sleep(Duration::from_secs(interval)).await;
+            elapsed += interval;
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the specified label
     /// '{label}={name}'
     pub async fn exec_app_capture_output(
diff --git a/harmony/src/modules/monitoring/okd/enable_user_workload.rs b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
index d04c2cc..b322b4d 100644
--- a/harmony/src/modules/monitoring/okd/enable_user_workload.rs
+++ b/harmony/src/modules/monitoring/okd/enable_user_workload.rs
@@ -1,11 +1,4 @@
-use std::{sync::Arc, time::Duration};
-
-use async_trait::async_trait;
-use harmony_types::id::Id;
-use k8s_openapi::api::core::v1::Pod;
-use log::debug;
-use serde::Serialize;
-use tokio::time::sleep;
+use std::{collections::BTreeMap, sync::Arc};
 
 use crate::{
     data::Version,
@@ -14,6 +7,11 @@ use crate::{
     score::Score,
     topology::{K8sclient, Topology, k8s::K8sClient},
 };
+use async_trait::async_trait;
+use harmony_types::id::Id;
+use k8s_openapi::api::core::v1::ConfigMap;
+use kube::api::ObjectMeta;
+use serde::Serialize;
 
 #[derive(Clone, Debug, Serialize)]
 pub struct OpenshiftUserWorkloadMonitoring {}
@@ -35,7 +33,7 @@ pub struct OpenshiftUserWorkloadMonitoringInterpret {}
 impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
     async fn execute(
         &self,
-        inventory: &Inventory,
+        _inventory: &Inventory,
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
@@ -70,49 +68,60 @@ impl OpenshiftUserWorkloadMonitoringInterpret {
         &self,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let cm = r#"
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: cluster-monitoring-config
-  namespace: openshift-monitoring
-data:
-  config.yaml: |
-    enableUserWorkload: true
-    alertmanagerMain:
-      enableUserAlertmanagerConfig: true
-"#;
-        let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap();
-        debug!("{:#?}", cm_yaml);
-        client
-            .apply_yaml(&cm_yaml, Some("openshift-monitoring"))
-            .await?;
+        let mut data = BTreeMap::new();
+        data.insert(
+            "config.yaml".to_string(),
+            r#"
+enableUserWorkload: true
+alertmanagerMain:
+  enableUserAlertmanagerConfig: true
+"#
+            .to_string(),
+        );
+
+        let cm = ConfigMap {
+            metadata: ObjectMeta {
+                name: Some("cluster-monitoring-config".to_string()),
+                namespace: Some("openshift-monitoring".to_string()),
+                ..Default::default()
+            },
+            data: Some(data),
+            ..Default::default()
+        };
+        client.apply(&cm, Some("openshift-monitoring")).await?;
+
         Ok(Outcome::success(
             "updated cluster-monitoring-config-map".to_string(),
         ))
     }
+
     pub async fn update_user_workload_monitoring_config_cm(
         &self,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let cm = r#"
-apiVersion: v1
-kind: ConfigMap
-metadata:
-  name: user-workload-monitoring-config
-  namespace: openshift-user-workload-monitoring
-data:
-  config.yaml: |
-    alertmanager:
-      enabled: true
-      enableAlertmanagerConfig: true
-"#;
-
-        let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap();
-
+        let mut data = BTreeMap::new();
+        data.insert(
+            "config.yaml".to_string(),
+            r#"
+alertmanager:
+  enabled: true
+  enableAlertmanagerConfig: true
+"#
+            .to_string(),
+        );
+        let cm = ConfigMap {
+            metadata: ObjectMeta {
+                name: Some("user-workload-monitoring-config".to_string()),
+                namespace: Some("openshift-user-workload-monitoring".to_string()),
+                ..Default::default()
+            },
+            data: Some(data),
+            ..Default::default()
+        };
         client
-            .apply_yaml(&cm_yaml, Some("openshift-user-workload-monitoring"))
+            .apply(&cm, Some("openshift-user-workload-monitoring"))
             .await?;
+
         Ok(Outcome::success(
             "updated openshift-user-monitoring-config-map".to_string(),
         ))
@@ -125,47 +134,16 @@ data:
         let namespace = "openshift-user-workload-monitoring";
         let alertmanager_name = "alertmanager-user-workload-0";
        let prometheus_name = "prometheus-user-workload-0";
-        self.wait_for_pod_ready(&client, alertmanager_name, namespace)
+        client
+            .wait_for_pod_ready(alertmanager_name, Some(namespace))
+            .await?;
+        client
+            .wait_for_pod_ready(prometheus_name, Some(namespace))
             .await?;
-        self.wait_for_pod_ready(&client, prometheus_name, namespace)
-            .await
-    }
-
-    async fn wait_for_pod_ready(
-        &self,
-        client: &Arc<K8sClient>,
-        pod_name: &str,
-        namespace: &str,
-    ) -> Result<Outcome, InterpretError> {
-        let mut elapsed = 0;
-        let interval = 5; // seconds between checks
-        let timeout_secs = 120;
-        loop {
-            // Fetch the pod
-            let pod = client.get_pod(pod_name, Some(namespace)).await?;
-
-            if let Some(p) = pod {
-                if let Some(status) = p.status {
-                    if let Some(phase) = status.phase {
-                        if phase.to_lowercase() == "running" {
-                            return Ok(Outcome::success(format!(
-                                "'{}' is ready with status.phase '{}'.",
-                                pod_name, phase
-                            )));
-                        }
-                    }
-                }
-            }
-
-            if elapsed >= timeout_secs {
-                return Err(InterpretError::new(format!(
-                    "'{}' in ns '{}' did not become ready within {}s",
-                    pod_name, namespace, timeout_secs
-                )));
-            }
-
-            sleep(Duration::from_secs(interval)).await;
-            elapsed += interval;
-        }
+
+        Ok(Outcome::success(format!(
+            "pods: {}, {} ready in ns: {}",
+            alertmanager_name, prometheus_name, namespace
+        )))
     }
 }
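
With this change the polling loop lives on `K8sClient` itself, so any interpret holding a client can wait on a pod without reimplementing it. A minimal sketch of the new call path (pod and namespace names taken from the monitoring interpret above; error handling elided):

    let client = topology.k8s_client().await.unwrap();

    // Polls get_pod() every 5s and returns Ok(()) once status.phase is
    // "Running", or Err(Error::Discovery(..)) after the 120s timeout.
    client
        .wait_for_pod_ready(
            "alertmanager-user-workload-0",
            Some("openshift-user-workload-monitoring"),
        )
        .await?;

Note that the helper keys off phase "Running" rather than the pod's Ready condition, so a pod can pass this check before its readiness probes do.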