fix: moved wait_for_pod_ready to K8sClient, switched rust yaml string to k8s_openapi resource ConfigMap

This commit is contained in:
Willem 2025-09-29 09:36:01 -04:00
parent cdf75faa8f
commit 36b909ee20
2 changed files with 103 additions and 84 deletions

View File

@ -1,13 +1,19 @@
use std::time::Duration;
use derive_new::new; use derive_new::new;
use k8s_openapi::{ use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope, ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod}, api::{
apps::v1::Deployment,
core::v1::{Pod, PodStatus},
},
}; };
use kube::{ use kube::{
Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
error::DiscoveryError,
runtime::reflector::Lookup, runtime::reflector::Lookup,
}; };
use kube::{api::DynamicObject, runtime::conditions}; use kube::{api::DynamicObject, runtime::conditions};
@ -19,7 +25,7 @@ use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json}; use serde_json::{Value, json};
use similar::TextDiff; use similar::TextDiff;
use tokio::io::AsyncReadExt; use tokio::{io::AsyncReadExt, time::sleep};
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct K8sClient { pub struct K8sClient {
@ -153,6 +159,41 @@ impl K8sClient {
} }
} }
pub async fn wait_for_pod_ready(
&self,
pod_name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let mut elapsed = 0;
let interval = 5; // seconds between checks
let timeout_secs = 120;
loop {
let pod = self.get_pod(pod_name, namespace).await?;
if let Some(p) = pod {
if let Some(status) = p.status {
if let Some(phase) = status.phase {
if phase.to_lowercase() == "running" {
return Ok(());
}
}
}
}
if elapsed >= timeout_secs {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"'{}' in ns '{}' did not become ready within {}s",
pod_name,
namespace.unwrap(),
timeout_secs
))));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
}
/// Will execute a commond in the first pod found that matches the specified label /// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}' /// '{label}={name}'
pub async fn exec_app_capture_output( pub async fn exec_app_capture_output(

View File

@ -1,11 +1,4 @@
use std::{sync::Arc, time::Duration}; use std::{collections::BTreeMap, sync::Arc};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::Pod;
use log::debug;
use serde::Serialize;
use tokio::time::sleep;
use crate::{ use crate::{
data::Version, data::Version,
@ -14,6 +7,11 @@ use crate::{
score::Score, score::Score,
topology::{K8sclient, Topology, k8s::K8sClient}, topology::{K8sclient, Topology, k8s::K8sClient},
}; };
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoring {} pub struct OpenshiftUserWorkloadMonitoring {}
@ -35,7 +33,7 @@ pub struct OpenshiftUserWorkloadMonitoringInterpret {}
impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret { impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
async fn execute( async fn execute(
&self, &self,
inventory: &Inventory, _inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap(); let client = topology.k8s_client().await.unwrap();
@ -70,49 +68,60 @@ impl OpenshiftUserWorkloadMonitoringInterpret {
&self, &self,
client: &Arc<K8sClient>, client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let cm = r#" let mut data = BTreeMap::new();
apiVersion: v1 data.insert(
kind: ConfigMap "config.yaml".to_string(),
metadata: r#"
name: cluster-monitoring-config
namespace: openshift-monitoring
data:
config.yaml: |
enableUserWorkload: true enableUserWorkload: true
alertmanagerMain: alertmanagerMain:
enableUserAlertmanagerConfig: true enableUserAlertmanagerConfig: true
"#; "#
let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap(); .to_string(),
debug!("{:#?}", cm_yaml); );
client
.apply_yaml(&cm_yaml, Some("openshift-monitoring")) let cm = ConfigMap {
.await?; metadata: ObjectMeta {
name: Some("cluster-monitoring-config".to_string()),
namespace: Some("openshift-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client.apply(&cm, Some("openshift-monitoring")).await?;
Ok(Outcome::success( Ok(Outcome::success(
"updated cluster-monitoring-config-map".to_string(), "updated cluster-monitoring-config-map".to_string(),
)) ))
} }
pub async fn update_user_workload_monitoring_config_cm( pub async fn update_user_workload_monitoring_config_cm(
&self, &self,
client: &Arc<K8sClient>, client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let cm = r#" let mut data = BTreeMap::new();
apiVersion: v1 data.insert(
kind: ConfigMap "config.yaml".to_string(),
metadata: r#"
name: user-workload-monitoring-config
namespace: openshift-user-workload-monitoring
data:
config.yaml: |
alertmanager: alertmanager:
enabled: true enabled: true
enableAlertmanagerConfig: true enableAlertmanagerConfig: true
"#; "#
.to_string(),
let cm_yaml: serde_yaml::Value = serde_yaml::from_str(&cm).unwrap(); );
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("user-workload-monitoring-config".to_string()),
namespace: Some("openshift-user-workload-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client client
.apply_yaml(&cm_yaml, Some("openshift-user-workload-monitoring")) .apply(&cm, Some("openshift-user-workload-monitoring"))
.await?; .await?;
Ok(Outcome::success( Ok(Outcome::success(
"updated openshift-user-monitoring-config-map".to_string(), "updated openshift-user-monitoring-config-map".to_string(),
)) ))
@ -125,47 +134,16 @@ data:
let namespace = "openshift-user-workload-monitoring"; let namespace = "openshift-user-workload-monitoring";
let alertmanager_name = "alertmanager-user-workload-0"; let alertmanager_name = "alertmanager-user-workload-0";
let prometheus_name = "prometheus-user-workload-0"; let prometheus_name = "prometheus-user-workload-0";
self.wait_for_pod_ready(&client, alertmanager_name, namespace) client
.wait_for_pod_ready(alertmanager_name, Some(namespace))
.await?;
client
.wait_for_pod_ready(prometheus_name, Some(namespace))
.await?; .await?;
self.wait_for_pod_ready(&client, prometheus_name, namespace)
.await
}
async fn wait_for_pod_ready( Ok(Outcome::success(format!(
&self, "pods: {}, {} ready in ns: {}",
client: &Arc<K8sClient>, alertmanager_name, prometheus_name, namespace
pod_name: &str, )))
namespace: &str,
) -> Result<Outcome, InterpretError> {
let mut elapsed = 0;
let interval = 5; // seconds between checks
let timeout_secs = 120;
loop {
// Fetch the pod
let pod = client.get_pod(pod_name, Some(namespace)).await?;
if let Some(p) = pod {
if let Some(status) = p.status {
if let Some(phase) = status.phase {
if phase.to_lowercase() == "running" {
return Ok(Outcome::success(format!(
"'{}' is ready with status.phase '{}'.",
pod_name, phase
)));
}
}
}
}
if elapsed >= timeout_secs {
return Err(InterpretError::new(format!(
"'{}' in ns '{}' did not become ready within {}s",
pod_name, namespace, timeout_secs
)));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
} }
} }