From 24922321b186d028ba7c4d0c5c729009455c37fc Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Wed, 5 Nov 2025 16:59:48 -0500 Subject: [PATCH] fix: webhook name must be k8s field compliant, add a FIXME note --- Cargo.lock | 19 ++ examples/okd_cluster_alerts/src/main.rs | 2 +- harmony/src/domain/topology/k8s.rs | 165 +++++++++++++++++- harmony/src/domain/topology/tenant/k8s.rs | 2 +- .../alert_channel/discord_alert_channel.rs | 15 +- .../monitoring/okd/cluster_monitoring.rs | 150 ++++++++++------ 6 files changed, 295 insertions(+), 58 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7d9cdcf..01d631c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1804,6 +1804,25 @@ dependencies = [ "url", ] +[[package]] +name = "example-okd-cluster-alerts" +version = "0.1.0" +dependencies = [ + "brocade", + "cidr", + "env_logger", + "harmony", + "harmony_cli", + "harmony_macros", + "harmony_secret", + "harmony_secret_derive", + "harmony_types", + "log", + "serde", + "tokio", + "url", +] + [[package]] name = "example-okd-install" version = "0.1.0" diff --git a/examples/okd_cluster_alerts/src/main.rs b/examples/okd_cluster_alerts/src/main.rs index 0d5241c..631b3ad 100644 --- a/examples/okd_cluster_alerts/src/main.rs +++ b/examples/okd_cluster_alerts/src/main.rs @@ -15,7 +15,7 @@ async fn main() { K8sAnywhereTopology::from_env(), vec![Box::new(OpenshiftClusterAlertScore { receivers: vec![Box::new(DiscordWebhook { - name: "Webhook example".to_string(), + name: "discord-webhook-example".to_string(), url: hurl!("http://something.o"), })], })], diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 35ab211..3ad997b 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -13,7 +13,7 @@ use kube::{ Client, Config, Discovery, Error, Resource, api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, config::{KubeConfigOptions, Kubeconfig}, - core::ErrorResponse, + 
core::{DynamicResourceScope, ErrorResponse}, error::DiscoveryError, runtime::reflector::Lookup, }; @@ -349,6 +349,169 @@ impl K8sClient { } } + fn get_api_for_dynamic_object( + &self, + object: &DynamicObject, + ns: Option<&str>, + ) -> Result, Error> { + let api_resource = object + .types + .as_ref() + .and_then(|t| { + let parts: Vec<&str> = t.api_version.split('/').collect(); + match parts.as_slice() { + [version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk( + "", version, &t.kind, + ))), + [group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk( + group, version, &t.kind, + ))), + _ => None, + } + }) + .ok_or_else(|| { + Error::BuildRequest(kube::core::request::Error::Validation( + "Invalid apiVersion in DynamicObject {object:#?}".to_string(), + )) + })?; + + match ns { + Some(ns) => Ok(Api::namespaced_with(self.client.clone(), ns, &api_resource)), + None => Ok(Api::default_namespaced_with( + self.client.clone(), + &api_resource, + )), + } + } + + pub async fn apply_dynamic_many( + &self, + resource: &[DynamicObject], + namespace: Option<&str>, + force_conflicts: bool, + ) -> Result, Error> { + let mut result = Vec::new(); + for r in resource.iter() { + result.push(self.apply_dynamic(r, namespace, force_conflicts).await?); + } + + Ok(result) + } + + /// Apply DynamicObject resource to the cluster + pub async fn apply_dynamic( + &self, + resource: &DynamicObject, + namespace: Option<&str>, + force_conflicts: bool, + ) -> Result { + // Build API for this dynamic object + let api = self.get_api_for_dynamic_object(resource, namespace)?; + let name = resource + .metadata + .name + .as_ref() + .ok_or_else(|| { + Error::BuildRequest(kube::core::request::Error::Validation( + "DynamicObject must have metadata.name".to_string(), + )) + })? 
+ .as_str(); + + debug!( + "Applying dynamic resource kind={:?} apiVersion={:?} name='{}' ns={:?}", + resource.types.as_ref().map(|t| &t.kind), + resource.types.as_ref().map(|t| &t.api_version), + name, + namespace + ); + trace!( + "Dynamic resource payload:\n{:#}", + serde_json::to_value(resource).unwrap_or(serde_json::Value::Null) + ); + + // Using same field manager as in apply() + let mut patch_params = PatchParams::apply("harmony"); + patch_params.force = force_conflicts; + + if *crate::config::DRY_RUN { + // Dry-run path: fetch current, show diff, and return appropriate object + match api.get(name).await { + Ok(current) => { + trace!("Received current dynamic value {current:#?}"); + + println!("\nPerforming dry-run for resource: '{}'", name); + + // Serialize current and new, and strip status from current if present + let mut current_yaml = + serde_yaml::to_value(¤t).unwrap_or_else(|_| serde_yaml::Value::Null); + if let Some(map) = current_yaml.as_mapping_mut() { + if map.contains_key(&serde_yaml::Value::String("status".to_string())) { + let removed = + map.remove(&serde_yaml::Value::String("status".to_string())); + trace!("Removed status from current dynamic object: {:?}", removed); + } else { + trace!( + "Did not find status entry for current dynamic object {}/{}", + current.metadata.namespace.as_deref().unwrap_or(""), + current.metadata.name.as_deref().unwrap_or("") + ); + } + } + + let current_yaml = serde_yaml::to_string(¤t_yaml) + .unwrap_or_else(|_| "Failed to serialize current resource".to_string()); + let new_yaml = serde_yaml::to_string(resource) + .unwrap_or_else(|_| "Failed to serialize new resource".to_string()); + + if current_yaml == new_yaml { + println!("No changes detected."); + return Ok(current); + } + + println!("Changes detected:"); + let diff = TextDiff::from_lines(¤t_yaml, &new_yaml); + for change in diff.iter_all_changes() { + let sign = match change.tag() { + similar::ChangeTag::Delete => "-", + similar::ChangeTag::Insert => "+", + 
similar::ChangeTag::Equal => " ", + }; + print!("{}{}", sign, change); + } + + // Return the incoming resource as the would-be applied state + Ok(resource.clone()) + } + Err(Error::Api(ErrorResponse { code: 404, .. })) => { + println!("\nPerforming dry-run for new resource: '{}'", name); + println!( + "Resource does not exist. It would be created with the following content:" + ); + let new_yaml = serde_yaml::to_string(resource) + .unwrap_or_else(|_| "Failed to serialize new resource".to_string()); + for line in new_yaml.lines() { + println!("+{}", line); + } + Ok(resource.clone()) + } + Err(e) => { + error!("Failed to get dynamic resource '{}': {}", name, e); + Err(e) + } + } + } else { + // Real apply via server-side apply + debug!("Patching (server-side apply) dynamic resource '{}'", name); + api.patch(name, &patch_params, &Patch::Apply(resource)) + .await + .map_err(|e| { + error!("Failed to apply dynamic resource '{}': {}", name, e); + e + }) + } + } + /// Apply a resource in namespace /// /// See `kubectl apply` for more information on the expected behavior of this function diff --git a/harmony/src/domain/topology/tenant/k8s.rs b/harmony/src/domain/topology/tenant/k8s.rs index 8085127..cc6df13 100644 --- a/harmony/src/domain/topology/tenant/k8s.rs +++ b/harmony/src/domain/topology/tenant/k8s.rs @@ -14,7 +14,7 @@ use k8s_openapi::{ }, apimachinery::pkg::util::intstr::IntOrString, }; -use kube::Resource; +use kube::{api::DynamicObject, Resource}; use log::debug; use serde::de::DeserializeOwned; use serde_json::json; diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs index e455e76..98d1d38 100644 --- a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -32,6 +32,19 @@ use harmony_types::net::Url; #[derive(Debug, Clone, Serialize)] pub struct DiscordWebhook { + // 
FIXME: use a stricter type, as this is used as a k8s resource name. It could also be converted + // to remove whitespace and other invalid characters, but for now a non-compliant name (e.g. one + // containing a space) is a bug that is not very easy for beginners to figure out. + // + // It produces error messages like this: + // + // [2025-10-30 15:10:49 ERROR harmony::domain::topology::k8s] Failed to get dynamic resource 'Webhook example-secret': Failed to build request: failed to build request: invalid uri character + // [2025-10-30 15:10:49 ERROR harmony_cli::cli_logger] ⚠️ InterpretError : Failed to build request: failed to build request: invalid uri character + // [2025-10-30 15:10:49 DEBUG harmony::domain::maestro] Got result Err(InterpretError { msg: "InterpretError : Failed to build request: failed to build request: invalid uri character" }) + // [2025-10-30 15:10:49 INFO harmony_cli::cli_logger] 🎼 Harmony completed + // + // thread 'main' panicked at examples/okd_cluster_alerts/src/main.rs:25:6: + // called `Result::unwrap()` on an `Err` value: InterpretError { msg: "InterpretError : Failed to build request: failed to build request: invalid uri character" } pub name: String, pub url: Url, } @@ -84,7 +97,7 @@ impl AlertReceiver for DiscordWebhook { } fn name(&self) -> String { - todo!() + self.name.clone() } fn clone_box(&self) -> Box> { diff --git a/harmony/src/modules/monitoring/okd/cluster_monitoring.rs b/harmony/src/modules/monitoring/okd/cluster_monitoring.rs index 2dfc169..9706061 100644 --- a/harmony/src/modules/monitoring/okd/cluster_monitoring.rs +++ b/harmony/src/modules/monitoring/okd/cluster_monitoring.rs @@ -1,5 +1,4 @@ use base64::prelude::*; -use std::sync::Arc; use async_trait::async_trait; use harmony_types::id::Id; @@ -11,21 +10,9 @@ use crate::{ data::Version, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, - modules::{ - application::Application, - monitoring::{ - grafana::grafana::Grafana, - 
kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus, - okd::OpenshiftClusterAlertSender, - }, - prometheus::prometheus::PrometheusMonitoring, - }, + modules::monitoring::okd::OpenshiftClusterAlertSender, score::Score, - topology::{ - K8sclient, Topology, - k8s::K8sClient, - oberservability::monitoring::{AlertReceiver, AlertingInterpret, ScrapeTarget}, - }, + topology::{K8sclient, Topology, oberservability::monitoring::AlertReceiver}, }; impl Clone for Box> { @@ -74,31 +61,33 @@ impl Interpret for OpenshiftClusterAlertInterpret { topology: &T, ) -> Result { let client = topology.k8s_client().await?; + let openshift_monitoring_namespace = "openshift-monitoring"; - let secret: DynamicObject = client - .get_secret_json_value("alertmanager-main", Some("openshift-monitoring")) + let mut alertmanager_main_secret: DynamicObject = client + .get_secret_json_value("alertmanager-main", Some(openshift_monitoring_namespace)) .await?; - trace!("Got secret {secret:?}"); + trace!("Got secret {alertmanager_main_secret:#?}"); - let data: serde_json::Value = secret.data; + let data: &mut serde_json::Value = &mut alertmanager_main_secret.data; trace!("Alertmanager-main secret data {data:#?}"); + let data_obj = data + .get_mut("data") + .ok_or(InterpretError::new( + "Missing 'data' field in alertmanager-main secret.".to_string(), + ))? + .as_object_mut() + .ok_or(InterpretError::new( + "'data' field in alertmanager-main secret is expected to be an object ." 
+ .to_string(), + ))?; - // TODO fix this unwrap, handle the option gracefully - let config_b64 = match data.get("data") { - Some(data_value) => match data_value.get("alertmanager.yaml") { - Some(value) => value.as_str().unwrap_or(""), - None => { - return Err(InterpretError::new( - "Missing 'alertmanager.yaml' in alertmanager-main secret".to_string(), - )); - } - }, - None => { - return Err(InterpretError::new( - "Missing 'data' field in alertmanager-main secret.".to_string(), - )); - } - }; + let config_b64 = data_obj + .get("alertmanager.yaml") + .ok_or(InterpretError::new( + "Missing 'alertmanager.yaml' in alertmanager-main secret data".to_string(), + ))? + .as_str() + .unwrap_or(""); trace!("Config base64 {config_b64}"); let config_bytes = BASE64_STANDARD.decode(config_b64).unwrap_or_default(); @@ -109,34 +98,28 @@ impl Interpret for OpenshiftClusterAlertInterpret { debug!("Current alertmanager config {am_config:#?}"); - let existing_receivers = if let Some(receivers) = am_config.get_mut("receivers") { - match receivers.as_mapping_mut() { - Some(recv) => recv, + let existing_receivers_sequence = if let Some(receivers) = am_config.get_mut("receivers") { + match receivers.as_sequence_mut() { + Some(seq) => seq, None => { return Err(InterpretError::new(format!( - "Expected alertmanager config receivers to be a mapping, got {receivers:?}" + "Expected alertmanager config receivers to be a sequence, got {:?}", + receivers ))); } } } else { - &mut serde_yaml::mapping::Mapping::default() + &mut serde_yaml::Sequence::default() }; - trace!("Existing receivers : {existing_receivers:#?}"); + let mut additional_resources = vec![]; for custom_receiver in &self.receivers { - let name = &custom_receiver.name(); - if let Some(recv) = existing_receivers.get(name) { - info!( - "AlertManager receiver {name} already exists and will be overwritten : {recv:#?}" - ); - } - debug!( - "Custom receiver YAML output: {:?}", - custom_receiver.as_alertmanager_receiver() - ); + let name = 
custom_receiver.name(); + let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?; + + let json_value = alertmanager_receiver.receiver_config; - let json_value = custom_receiver.as_alertmanager_receiver()?.receiver_config; let yaml_string = serde_json::to_string(&json_value).map_err(|e| { InterpretError::new(format!("Failed to serialize receiver config: {}", e)) })?; @@ -146,12 +129,71 @@ impl Interpret for OpenshiftClusterAlertInterpret { InterpretError::new(format!("Failed to parse receiver config as YAML: {}", e)) })?; - existing_receivers.insert(serde_yaml::Value::from(name.as_str()), yaml_value); + if let Some(idx) = existing_receivers_sequence.iter().position(|r| { + r.get("name") + .and_then(|n| n.as_str()) + .map_or(false, |n| n == name) + }) { + info!("Replacing existing AlertManager receiver: {}", name); + existing_receivers_sequence[idx] = yaml_value; + } else { + debug!("Adding new AlertManager receiver: {}", name); + existing_receivers_sequence.push(yaml_value); + } + + additional_resources.push(alertmanager_receiver.additional_ressources); } debug!("Current alertmanager config {am_config:#?}"); + // TODO + // - save new version of alertmanager config + // - write additional ressources to the cluster + let am_config = serde_yaml::to_string(&am_config).map_err(|e| { + InterpretError::new(format!( + "Failed to serialize new alertmanager config to string : {e}" + )) + })?; - Ok(Outcome::success(todo!("whats up"))) + let mut am_config_b64 = String::new(); + BASE64_STANDARD.encode_string(am_config, &mut am_config_b64); + + // TODO put update configmap value and save new value + data_obj.insert( + "alertmanager.yaml".to_string(), + serde_json::Value::String(am_config_b64), + ); + + // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management + alertmanager_main_secret.metadata.managed_fields = None; + + trace!("Applying new alertmanager_main_secret {alertmanager_main_secret:#?}"); + client + .apply_dynamic( + 
&alertmanager_main_secret, + Some(openshift_monitoring_namespace), + true, + ) + .await?; + + let additional_resources = additional_resources.concat(); + trace!("Applying additional ressources for alert receivers {additional_resources:#?}"); + client + .apply_dynamic_many( + &additional_resources, + Some(openshift_monitoring_namespace), + true, + ) + .await?; + + Ok(Outcome::success(format!( + "Successfully configured {} cluster alert receivers: {}", + self.receivers.len(), + self.receivers + .iter() + .map(|r| r.name()) + .collect::>() + .join(", ") + ))) } fn get_name(&self) -> InterpretName {