feat: K8sFlavour #161

Merged
wjro merged 7 commits from feat/detect_k8s_flavour into master 2025-10-21 15:56:48 +00:00
4 changed files with 196 additions and 4 deletions
Showing only changes of commit 5f78300d78 - Show all commits

View File

@ -1,3 +1,5 @@
use std::time::Duration;
use derive_new::new;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
@ -9,6 +11,7 @@ use kube::{
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
error::DiscoveryError,
runtime::reflector::Lookup,
};
use kube::{api::DynamicObject, runtime::conditions};
@ -20,7 +23,7 @@ use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use similar::TextDiff;
use tokio::io::AsyncReadExt;
use tokio::{io::AsyncReadExt, time::sleep};
#[derive(new, Clone)]
pub struct K8sClient {
@ -165,6 +168,41 @@ impl K8sClient {
}
}
pub async fn wait_for_pod_ready(
&self,
pod_name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let mut elapsed = 0;
let interval = 5; // seconds between checks
let timeout_secs = 120;
loop {
let pod = self.get_pod(pod_name, namespace).await?;
if let Some(p) = pod {
if let Some(status) = p.status {
if let Some(phase) = status.phase {
if phase.to_lowercase() == "running" {
return Ok(());
}
}
}
}
if elapsed >= timeout_secs {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"'{}' in ns '{}' did not become ready within {}s",
pod_name,
namespace.unwrap(),
timeout_secs
))));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
}
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
@ -431,9 +469,12 @@ impl K8sClient {
.as_str()
.expect("couldn't get kind as str");
let split: Vec<&str> = api_version.splitn(2, "/").collect();
let g = split[0];
let v = split[1];
let mut it = api_version.splitn(2, '/');
let first = it.next().unwrap();
let (g, v) = match it.next() {
Some(second) => (first, second),
None => ("", first),
};
let gvk = GroupVersionKind::gvk(g, v, kind);
let api_resource = ApiResource::from_gvk(&gvk);

View File

@ -4,4 +4,5 @@ pub mod application_monitoring;
pub mod grafana;
pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;

View File

@ -0,0 +1,149 @@
use std::{collections::BTreeMap, sync::Arc};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoring {}
impl<T: Topology + K8sclient> Score<T> for OpenshiftUserWorkloadMonitoring {
fn name(&self) -> String {
"OpenshiftUserWorkloadMonitoringScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OpenshiftUserWorkloadMonitoringInterpret {})
}
}
#[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoringInterpret {}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.update_cluster_monitoring_config_cm(&client).await?;
self.update_user_workload_monitoring_config_cm(&client)
.await?;
self.verify_user_workload(&client).await?;
Ok(Outcome::success(
"successfully enabled user-workload-monitoring".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OpenshiftUserWorkloadMonitoring")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl OpenshiftUserWorkloadMonitoringInterpret {
pub async fn update_cluster_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
enableUserWorkload: true
alertmanagerMain:
enableUserAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("cluster-monitoring-config".to_string()),
namespace: Some("openshift-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client.apply(&cm, Some("openshift-monitoring")).await?;
Ok(Outcome::success(
"updated cluster-monitoring-config-map".to_string(),
))
}
pub async fn update_user_workload_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
alertmanager:
enabled: true
enableAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("user-workload-monitoring-config".to_string()),
namespace: Some("openshift-user-workload-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client
.apply(&cm, Some("openshift-user-workload-monitoring"))
.await?;
Ok(Outcome::success(
"updated openshift-user-monitoring-config-map".to_string(),
))
}
pub async fn verify_user_workload(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = "openshift-user-workload-monitoring";
let alertmanager_name = "alertmanager-user-workload-0";
let prometheus_name = "prometheus-user-workload-0";
client
.wait_for_pod_ready(alertmanager_name, Some(namespace))
.await?;
client
.wait_for_pod_ready(prometheus_name, Some(namespace))
.await?;
Ok(Outcome::success(format!(
"pods: {}, {} ready in ns: {}",
alertmanager_name, prometheus_name, namespace
)))
}
}

View File

@ -0,0 +1 @@
pub mod enable_user_workload;