fix: webhook name must be k8s field compliant, add a FIXME note

This commit is contained in:
Jean-Gabriel Gill-Couture 2025-11-05 16:59:48 -05:00
parent cf84f2cce8
commit 24922321b1
6 changed files with 295 additions and 58 deletions

19
Cargo.lock generated
View File

@ -1804,6 +1804,25 @@ dependencies = [
"url",
]
[[package]]
name = "example-okd-cluster-alerts"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_secret",
"harmony_secret_derive",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
[[package]]
name = "example-okd-install"
version = "0.1.0"

View File

@ -15,7 +15,7 @@ async fn main() {
K8sAnywhereTopology::from_env(),
vec![Box::new(OpenshiftClusterAlertScore {
receivers: vec![Box::new(DiscordWebhook {
name: "Webhook example".to_string(),
name: "discord-webhook-example".to_string(),
url: hurl!("http://something.o"),
})],
})],

View File

@ -13,7 +13,7 @@ use kube::{
Client, Config, Discovery, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
core::{DynamicResourceScope, ErrorResponse},
error::DiscoveryError,
runtime::reflector::Lookup,
};
@ -349,6 +349,169 @@ impl K8sClient {
}
}
fn get_api_for_dynamic_object(
&self,
object: &DynamicObject,
ns: Option<&str>,
) -> Result<Api<DynamicObject>, Error> {
let api_resource = object
.types
.as_ref()
.and_then(|t| {
let parts: Vec<&str> = t.api_version.split('/').collect();
match parts.as_slice() {
[version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
"", version, &t.kind,
))),
[group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
group, version, &t.kind,
))),
_ => None,
}
})
.ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"Invalid apiVersion in DynamicObject {object:#?}".to_string(),
))
})?;
match ns {
Some(ns) => Ok(Api::namespaced_with(self.client.clone(), ns, &api_resource)),
None => Ok(Api::default_namespaced_with(
self.client.clone(),
&api_resource,
)),
}
}
pub async fn apply_dynamic_many(
&self,
resource: &[DynamicObject],
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<Vec<DynamicObject>, Error> {
let mut result = Vec::new();
for r in resource.iter() {
result.push(self.apply_dynamic(r, namespace, force_conflicts).await?);
}
Ok(result)
}
/// Apply DynamicObject resource to the cluster
pub async fn apply_dynamic(
&self,
resource: &DynamicObject,
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<DynamicObject, Error> {
// Build API for this dynamic object
let api = self.get_api_for_dynamic_object(resource, namespace)?;
let name = resource
.metadata
.name
.as_ref()
.ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"DynamicObject must have metadata.name".to_string(),
))
})?
.as_str();
debug!(
"Applying dynamic resource kind={:?} apiVersion={:?} name='{}' ns={:?}",
resource.types.as_ref().map(|t| &t.kind),
resource.types.as_ref().map(|t| &t.api_version),
name,
namespace
);
trace!(
"Dynamic resource payload:\n{:#}",
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
);
// Using same field manager as in apply()
let mut patch_params = PatchParams::apply("harmony");
patch_params.force = force_conflicts;
if *crate::config::DRY_RUN {
// Dry-run path: fetch current, show diff, and return appropriate object
match api.get(name).await {
Ok(current) => {
trace!("Received current dynamic value {current:#?}");
println!("\nPerforming dry-run for resource: '{}'", name);
// Serialize current and new, and strip status from current if present
let mut current_yaml =
serde_yaml::to_value(&current).unwrap_or_else(|_| serde_yaml::Value::Null);
if let Some(map) = current_yaml.as_mapping_mut() {
if map.contains_key(&serde_yaml::Value::String("status".to_string())) {
let removed =
map.remove(&serde_yaml::Value::String("status".to_string()));
trace!("Removed status from current dynamic object: {:?}", removed);
} else {
trace!(
"Did not find status entry for current dynamic object {}/{}",
current.metadata.namespace.as_deref().unwrap_or(""),
current.metadata.name.as_deref().unwrap_or("")
);
}
}
let current_yaml = serde_yaml::to_string(&current_yaml)
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
let new_yaml = serde_yaml::to_string(resource)
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
if current_yaml == new_yaml {
println!("No changes detected.");
return Ok(current);
}
println!("Changes detected:");
let diff = TextDiff::from_lines(&current_yaml, &new_yaml);
for change in diff.iter_all_changes() {
let sign = match change.tag() {
similar::ChangeTag::Delete => "-",
similar::ChangeTag::Insert => "+",
similar::ChangeTag::Equal => " ",
};
print!("{}{}", sign, change);
}
// Return the incoming resource as the would-be applied state
Ok(resource.clone())
}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
println!("\nPerforming dry-run for new resource: '{}'", name);
println!(
"Resource does not exist. It would be created with the following content:"
);
let new_yaml = serde_yaml::to_string(resource)
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
for line in new_yaml.lines() {
println!("+{}", line);
}
Ok(resource.clone())
}
Err(e) => {
error!("Failed to get dynamic resource '{}': {}", name, e);
Err(e)
}
}
} else {
// Real apply via server-side apply
debug!("Patching (server-side apply) dynamic resource '{}'", name);
api.patch(name, &patch_params, &Patch::Apply(resource))
.await
.map_err(|e| {
error!("Failed to apply dynamic resource '{}': {}", name, e);
e
})
}
}
/// Apply a resource in namespace
///
/// See `kubectl apply` for more information on the expected behavior of this function

View File

@ -14,7 +14,7 @@ use k8s_openapi::{
},
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::Resource;
use kube::{api::DynamicObject, Resource};
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::json;

View File

@ -32,6 +32,19 @@ use harmony_types::net::Url;
#[derive(Debug, Clone, Serialize)]
pub struct DiscordWebhook {
// FIXME use a stricter type as this is used as a k8s resource name. It could also be converted
// to remove whitespace and other invalid characters, but this is a potential bug that is not
// very easy to figure out for beginners.
//
// It gives out error messages like this :
//
// [2025-10-30 15:10:49 ERROR harmony::domain::topology::k8s] Failed to get dynamic resource 'Webhook example-secret': Failed to build request: failed to build request: invalid uri character
// [2025-10-30 15:10:49 ERROR harmony_cli::cli_logger] ⚠️ InterpretError : Failed to build request: failed to build request: invalid uri character
// [2025-10-30 15:10:49 DEBUG harmony::domain::maestro] Got result Err(InterpretError { msg: "InterpretError : Failed to build request: failed to build request: invalid uri character" })
// [2025-10-30 15:10:49 INFO harmony_cli::cli_logger] 🎼 Harmony completed
//
// thread 'main' panicked at examples/okd_cluster_alerts/src/main.rs:25:6:
// called `Result::unwrap()` on an `Err` value: InterpretError { msg: "InterpretError : Failed to build request: failed to build request: invalid uri character" }
pub name: String,
pub url: Url,
}
@ -84,7 +97,7 @@ impl AlertReceiver<OpenshiftClusterAlertSender> for DiscordWebhook {
}
fn name(&self) -> String {
todo!()
self.name.clone()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {

View File

@ -1,5 +1,4 @@
use base64::prelude::*;
use std::sync::Arc;
use async_trait::async_trait;
use harmony_types::id::Id;
@ -11,21 +10,9 @@ use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::{
application::Application,
monitoring::{
grafana::grafana::Grafana,
kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus,
okd::OpenshiftClusterAlertSender,
},
prometheus::prometheus::PrometheusMonitoring,
},
modules::monitoring::okd::OpenshiftClusterAlertSender,
score::Score,
topology::{
K8sclient, Topology,
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertingInterpret, ScrapeTarget},
},
topology::{K8sclient, Topology, oberservability::monitoring::AlertReceiver},
};
impl Clone for Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
@ -74,31 +61,33 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftClusterAlertInterpret {
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await?;
let openshift_monitoring_namespace = "openshift-monitoring";
let secret: DynamicObject = client
.get_secret_json_value("alertmanager-main", Some("openshift-monitoring"))
let mut alertmanager_main_secret: DynamicObject = client
.get_secret_json_value("alertmanager-main", Some(openshift_monitoring_namespace))
.await?;
trace!("Got secret {secret:?}");
trace!("Got secret {alertmanager_main_secret:#?}");
let data: serde_json::Value = secret.data;
let data: &mut serde_json::Value = &mut alertmanager_main_secret.data;
trace!("Alertmanager-main secret data {data:#?}");
let data_obj = data
.get_mut("data")
.ok_or(InterpretError::new(
"Missing 'data' field in alertmanager-main secret.".to_string(),
))?
.as_object_mut()
.ok_or(InterpretError::new(
"'data' field in alertmanager-main secret is expected to be an object ."
.to_string(),
))?;
// TODO fix this unwrap, handle the option gracefully
let config_b64 = match data.get("data") {
Some(data_value) => match data_value.get("alertmanager.yaml") {
Some(value) => value.as_str().unwrap_or(""),
None => {
return Err(InterpretError::new(
"Missing 'alertmanager.yaml' in alertmanager-main secret".to_string(),
));
}
},
None => {
return Err(InterpretError::new(
"Missing 'data' field in alertmanager-main secret.".to_string(),
));
}
};
let config_b64 = data_obj
.get("alertmanager.yaml")
.ok_or(InterpretError::new(
"Missing 'alertmanager.yaml' in alertmanager-main secret data".to_string(),
))?
.as_str()
.unwrap_or("");
trace!("Config base64 {config_b64}");
let config_bytes = BASE64_STANDARD.decode(config_b64).unwrap_or_default();
@ -109,34 +98,28 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftClusterAlertInterpret {
debug!("Current alertmanager config {am_config:#?}");
let existing_receivers = if let Some(receivers) = am_config.get_mut("receivers") {
match receivers.as_mapping_mut() {
Some(recv) => recv,
let existing_receivers_sequence = if let Some(receivers) = am_config.get_mut("receivers") {
match receivers.as_sequence_mut() {
Some(seq) => seq,
None => {
return Err(InterpretError::new(format!(
"Expected alertmanager config receivers to be a mapping, got {receivers:?}"
"Expected alertmanager config receivers to be a sequence, got {:?}",
receivers
)));
}
}
} else {
&mut serde_yaml::mapping::Mapping::default()
&mut serde_yaml::Sequence::default()
};
trace!("Existing receivers : {existing_receivers:#?}");
let mut additional_resources = vec![];
for custom_receiver in &self.receivers {
let name = &custom_receiver.name();
if let Some(recv) = existing_receivers.get(name) {
info!(
"AlertManager receiver {name} already exists and will be overwritten : {recv:#?}"
);
}
debug!(
"Custom receiver YAML output: {:?}",
custom_receiver.as_alertmanager_receiver()
);
let name = custom_receiver.name();
let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?;
let json_value = alertmanager_receiver.receiver_config;
let json_value = custom_receiver.as_alertmanager_receiver()?.receiver_config;
let yaml_string = serde_json::to_string(&json_value).map_err(|e| {
InterpretError::new(format!("Failed to serialize receiver config: {}", e))
})?;
@ -146,12 +129,71 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftClusterAlertInterpret {
InterpretError::new(format!("Failed to parse receiver config as YAML: {}", e))
})?;
existing_receivers.insert(serde_yaml::Value::from(name.as_str()), yaml_value);
if let Some(idx) = existing_receivers_sequence.iter().position(|r| {
r.get("name")
.and_then(|n| n.as_str())
.map_or(false, |n| n == name)
}) {
info!("Replacing existing AlertManager receiver: {}", name);
existing_receivers_sequence[idx] = yaml_value;
} else {
debug!("Adding new AlertManager receiver: {}", name);
existing_receivers_sequence.push(yaml_value);
}
additional_resources.push(alertmanager_receiver.additional_ressources);
}
debug!("Current alertmanager config {am_config:#?}");
// TODO
// - save new version of alertmanager config
// - write additional ressources to the cluster
let am_config = serde_yaml::to_string(&am_config).map_err(|e| {
InterpretError::new(format!(
"Failed to serialize new alertmanager config to string : {e}"
))
})?;
Ok(Outcome::success(todo!("whats up")))
let mut am_config_b64 = String::new();
BASE64_STANDARD.encode_string(am_config, &mut am_config_b64);
// TODO put update configmap value and save new value
data_obj.insert(
"alertmanager.yaml".to_string(),
serde_json::Value::String(am_config_b64),
);
// https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
alertmanager_main_secret.metadata.managed_fields = None;
trace!("Applying new alertmanager_main_secret {alertmanager_main_secret:#?}");
client
.apply_dynamic(
&alertmanager_main_secret,
Some(openshift_monitoring_namespace),
true,
)
.await?;
let additional_resources = additional_resources.concat();
trace!("Applying additional ressources for alert receivers {additional_resources:#?}");
client
.apply_dynamic_many(
&additional_resources,
Some(openshift_monitoring_namespace),
true,
)
.await?;
Ok(Outcome::success(format!(
"Successfully configured {} cluster alert receivers: {}",
self.receivers.len(),
self.receivers
.iter()
.map(|r| r.name())
.collect::<Vec<_>>()
.join(", ")
)))
}
fn get_name(&self) -> InterpretName {