diff --git a/Cargo.lock b/Cargo.lock index 5c45111..37a55fb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1839,6 +1839,25 @@ dependencies = [ "url", ] +[[package]] +name = "example-okd-cluster-alerts" +version = "0.1.0" +dependencies = [ + "brocade", + "cidr", + "env_logger", + "harmony", + "harmony_cli", + "harmony_macros", + "harmony_secret", + "harmony_secret_derive", + "harmony_types", + "log", + "serde", + "tokio", + "url", +] + [[package]] name = "example-okd-install" version = "0.1.0" diff --git a/examples/monitoring/src/main.rs b/examples/monitoring/src/main.rs index d06a93e..7037094 100644 --- a/examples/monitoring/src/main.rs +++ b/examples/monitoring/src/main.rs @@ -24,13 +24,14 @@ use harmony::{ }, topology::K8sAnywhereTopology, }; -use harmony_types::net::Url; +use harmony_types::{k8s_name::K8sName, net::Url}; #[tokio::main] async fn main() { let discord_receiver = DiscordWebhook { - name: "test-discord".to_string(), + name: K8sName("test-discord".to_string()), url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()), + selectors: vec![], }; let high_pvc_fill_rate_over_two_days_alert = high_pvc_fill_rate_over_two_days(); diff --git a/examples/monitoring_with_tenant/src/main.rs b/examples/monitoring_with_tenant/src/main.rs index 5b85f78..f67f9d8 100644 --- a/examples/monitoring_with_tenant/src/main.rs +++ b/examples/monitoring_with_tenant/src/main.rs @@ -22,8 +22,8 @@ use harmony::{ tenant::{ResourceLimits, TenantConfig, TenantNetworkPolicy}, }, }; -use harmony_types::id::Id; use harmony_types::net::Url; +use harmony_types::{id::Id, k8s_name::K8sName}; #[tokio::main] async fn main() { @@ -43,8 +43,9 @@ async fn main() { }; let discord_receiver = DiscordWebhook { - name: "test-discord".to_string(), + name: K8sName("test-discord".to_string()), url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()), + selectors: vec![], }; let high_pvc_fill_rate_over_two_days_alert = high_pvc_fill_rate_over_two_days(); diff --git a/examples/okd_cluster_alerts/Cargo.toml b/examples/okd_cluster_alerts/Cargo.toml new file mode 100644 index 0000000..5590675 --- /dev/null +++ b/examples/okd_cluster_alerts/Cargo.toml @@ -0,0 +1,22 @@ +[package] +name = "example-okd-cluster-alerts" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +publish = false + +[dependencies] +harmony = { path = "../../harmony" } +harmony_cli = { path = "../../harmony_cli" } +harmony_types = { path = "../../harmony_types" } +harmony_secret = { path = "../../harmony_secret" } +harmony_secret_derive = { path = "../../harmony_secret_derive" } +cidr = { workspace = true } +tokio = { workspace = true } +harmony_macros = { path = "../../harmony_macros" } +log = { workspace = true } +env_logger = { workspace = true } +url = { workspace = true } +serde.workspace = true +brocade = { path = "../../brocade" } diff --git a/examples/okd_cluster_alerts/src/main.rs b/examples/okd_cluster_alerts/src/main.rs new file mode 100644 index 0000000..93dac3b --- /dev/null +++ b/examples/okd_cluster_alerts/src/main.rs @@ -0,0 +1,38 @@ +use std::collections::HashMap; + +use harmony::{ + inventory::Inventory, + modules::monitoring::{ + alert_channel::discord_alert_channel::DiscordWebhook, + okd::cluster_monitoring::OpenshiftClusterAlertScore, + }, + topology::K8sAnywhereTopology, +}; +use harmony_macros::hurl; +use harmony_types::k8s_name::K8sName; + +#[tokio::main] +async fn main() { + let mut sel = HashMap::new(); + sel.insert( + "openshift_io_alert_source".to_string(), + "platform".to_string(), + ); + let mut sel2 = HashMap::new(); + sel2.insert("openshift_io_alert_source".to_string(), "".to_string()); + let selectors = vec![sel, sel2]; + harmony_cli::run( + Inventory::autoload(), + K8sAnywhereTopology::from_env(), + vec![Box::new(OpenshiftClusterAlertScore { + receivers: vec![Box::new(DiscordWebhook { + name: K8sName("wills-discord-webhook-example".to_string()), + url: hurl!("https://something.io"), + selectors: selectors, + })], + })], + None, + ) + .await + .unwrap(); +} diff --git a/examples/rhob_application_monitoring/src/main.rs b/examples/rhob_application_monitoring/src/main.rs index ec69be1..6eeaea2 100644 --- a/examples/rhob_application_monitoring/src/main.rs +++ b/examples/rhob_application_monitoring/src/main.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use harmony::{ inventory::Inventory, @@ -10,7 +10,7 @@ use harmony::{ }, topology::K8sAnywhereTopology, }; -use harmony_types::net::Url; +use harmony_types::{k8s_name::K8sName, net::Url}; #[tokio::main] async fn main() { @@ -23,8 +23,9 @@ async fn main() { }); let discord_receiver = DiscordWebhook { - name: "test-discord".to_string(), + name: K8sName("test-discord".to_string()), url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()), + selectors: vec![], }; let app = ApplicationScore { diff --git a/examples/rust/.gitignore b/examples/rust/.gitignore index df77acc..a545af6 100644 --- a/examples/rust/.gitignore +++ b/examples/rust/.gitignore @@ -1,3 +1,4 @@ Dockerfile.harmony .harmony_generated harmony +webapp diff --git a/examples/rust/src/main.rs b/examples/rust/src/main.rs index feba1eb..a100606 100644 --- a/examples/rust/src/main.rs +++ b/examples/rust/src/main.rs @@ -1,4 +1,4 @@ -use std::{path::PathBuf, sync::Arc}; +use std::{collections::HashMap, path::PathBuf, sync::Arc}; use harmony::{ inventory::Inventory, @@ -14,6 +14,7 @@ use harmony::{ topology::K8sAnywhereTopology, }; use harmony_macros::hurl; +use harmony_types::k8s_name::K8sName; #[tokio::main] async fn main() { @@ -26,8 +27,9 @@ async fn main() { }); let discord_receiver = DiscordWebhook { - name: "test-discord".to_string(), + name: K8sName("test-discord".to_string()), url: hurl!("https://discord.doesnt.exist.com"), + selectors: vec![], }; let webhook_receiver = WebhookReceiver { diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart-0.1.0.tgz b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart-0.1.0.tgz new file mode 100644 index 0000000..9736a50 Binary files /dev/null and b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart-0.1.0.tgz differ diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/Chart.yaml b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/Chart.yaml new file mode 100644 index 0000000..5408d35 --- /dev/null +++ b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/Chart.yaml @@ -0,0 +1,7 @@ + +apiVersion: v2 +name: harmony-example-rust-webapp-chart +description: A Helm chart for the harmony-example-rust-webapp web application. +type: application +version: 0.1.0 +appVersion: "latest" diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/_helpers.tpl b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/_helpers.tpl new file mode 100644 index 0000000..622a662 --- /dev/null +++ b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/_helpers.tpl @@ -0,0 +1,16 @@ + +{{/* +Expand the name of the chart. +*/}} +{{- define "chart.name" -}} +{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }} +{{- end }} + +{{/* +Create a default fully qualified app name. +We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). +*/}} +{{- define "chart.fullname" -}} +{{- $name := default .Chart.Name .Values.nameOverride }} +{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} +{{- end }} diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/deployment.yaml b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/deployment.yaml new file mode 100644 index 0000000..03b9276 --- /dev/null +++ b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/deployment.yaml @@ -0,0 +1,23 @@ + +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ include "chart.fullname" . }} +spec: + replicas: {{ .Values.replicaCount }} + selector: + matchLabels: + app: {{ include "chart.name" . }} + template: + metadata: + labels: + app: {{ include "chart.name" . }} + spec: + containers: + - name: {{ .Chart.Name }} + image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + ports: + - name: http + containerPort: 3000 + protocol: TCP diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/ingress.yaml b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/ingress.yaml new file mode 100644 index 0000000..7001e16 --- /dev/null +++ b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/ingress.yaml @@ -0,0 +1,35 @@ + +{{- if .Values.ingress.enabled -}} +apiVersion: networking.k8s.io/v1 +kind: Ingress +metadata: + name: {{ include "chart.fullname" . }} + annotations: + {{- toYaml .Values.ingress.annotations | nindent 4 }} +spec: + {{- if .Values.ingress.tls }} + tls: + {{- range .Values.ingress.tls }} + - hosts: + {{- range .hosts }} + - {{ . | quote }} + {{- end }} + secretName: {{ .secretName }} + {{- end }} + {{- end }} + rules: + {{- range .Values.ingress.hosts }} + - host: {{ .host | quote }} + http: + paths: + {{- range .paths }} + - path: {{ .path }} + pathType: {{ .pathType }} + backend: + service: + name: {{ include "chart.fullname" $ }} + port: + number: 3000 + {{- end }} + {{- end }} +{{- end }} diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/service.yaml b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/service.yaml new file mode 100644 index 0000000..f3e6841 --- /dev/null +++ b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/templates/service.yaml @@ -0,0 +1,14 @@ + +apiVersion: v1 +kind: Service +metadata: + name: {{ include "chart.fullname" . }} +spec: + type: {{ .Values.service.type }} + ports: + - port: {{ .Values.service.port }} + targetPort: 3000 + protocol: TCP + name: http + selector: + app: {{ include "chart.name" . }} diff --git a/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/values.yaml b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/values.yaml new file mode 100644 index 0000000..640df94 --- /dev/null +++ b/examples/rust/webapp/helm/harmony-example-rust-webapp-chart/values.yaml @@ -0,0 +1,34 @@ + +# Default values for harmony-example-rust-webapp-chart. +# This is a YAML-formatted file. +# Declare variables to be passed into your templates. + +replicaCount: 1 + +image: + repository: hub.nationtech.io/harmony/harmony-example-rust-webapp + pullPolicy: IfNotPresent + # Overridden by the chart's appVersion + tag: "latest" + +service: + type: ClusterIP + port: 3000 + +ingress: + enabled: true + # Annotations for cert-manager to handle SSL. + annotations: + cert-manager.io/cluster-issuer: "letsencrypt-prod" + # Add other annotations like nginx ingress class if needed + # kubernetes.io/ingress.class: nginx + hosts: + - host: chart-example.local + paths: + - path: / + pathType: ImplementationSpecific + tls: + - secretName: harmony-example-rust-webapp-tls + hosts: + - chart-example.local + diff --git a/examples/try_rust_webapp/src/main.rs b/examples/try_rust_webapp/src/main.rs index 5d67391..d3c88b3 100644 --- a/examples/try_rust_webapp/src/main.rs +++ b/examples/try_rust_webapp/src/main.rs @@ -10,6 +10,7 @@ use harmony::{ topology::K8sAnywhereTopology, }; use harmony_macros::hurl; +use harmony_types::k8s_name::K8sName; use std::{path::PathBuf, sync::Arc}; #[tokio::main] @@ -32,8 +33,9 @@ async fn main() { Box::new(Monitoring { application: application.clone(), alert_receiver: vec![Box::new(DiscordWebhook { - name: "test-discord".to_string(), + name: K8sName("test-discord".to_string()), url: hurl!("https://discord.doesnt.exist.com"), + selectors: vec![], })], }), ], diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 60b0b38..e46426b 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -16,7 +16,7 @@ use kube::{ Api, AttachParams, DeleteParams, ListParams, ObjectList, Patch, PatchParams, ResourceExt, }, config::{KubeConfigOptions, Kubeconfig}, - core::ErrorResponse, + core::{DynamicResourceScope, ErrorResponse}, discovery::{ApiCapabilities, Scope}, error::DiscoveryError, runtime::reflector::Lookup, @@ -240,6 +240,23 @@ impl K8sClient { resource.get(name).await } + pub async fn get_secret_json_value( + &self, + name: &str, + namespace: Option<&str>, + ) -> Result { + self.get_resource_json_value( + name, + namespace, + &GroupVersionKind { + group: "".to_string(), + version: "v1".to_string(), + kind: "Secret".to_string(), + }, + ) + .await + } + pub async fn get_deployment( &self, name: &str, @@ -483,6 +500,169 @@ impl K8sClient { } } + fn get_api_for_dynamic_object( + &self, + object: &DynamicObject, + ns: Option<&str>, + ) -> Result, Error> { + let api_resource = object + .types + .as_ref() + .and_then(|t| { + let parts: Vec<&str> = t.api_version.split('/').collect(); + match parts.as_slice() { + [version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk( + "", version, &t.kind, + ))), + [group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk( + group, version, &t.kind, + ))), + _ => None, + } + }) + .ok_or_else(|| { + Error::BuildRequest(kube::core::request::Error::Validation( + "Invalid apiVersion in DynamicObject {object:#?}".to_string(), + )) + })?; + + match ns { + Some(ns) => Ok(Api::namespaced_with(self.client.clone(), ns, &api_resource)), + None => Ok(Api::default_namespaced_with( + self.client.clone(), + &api_resource, + )), + } + } + + pub async fn apply_dynamic_many( + &self, + resource: &[DynamicObject], + namespace: Option<&str>, + force_conflicts: bool, + ) -> Result, Error> { + let mut result = Vec::new(); + for r in resource.iter() { + result.push(self.apply_dynamic(r, namespace, force_conflicts).await?); + } + + Ok(result) + } + + /// Apply DynamicObject resource to the cluster + pub async fn apply_dynamic( + &self, + resource: &DynamicObject, + namespace: Option<&str>, + force_conflicts: bool, + ) -> Result { + // Build API for this dynamic object + let api = self.get_api_for_dynamic_object(resource, namespace)?; + let name = resource + .metadata + .name + .as_ref() + .ok_or_else(|| { + Error::BuildRequest(kube::core::request::Error::Validation( + "DynamicObject must have metadata.name".to_string(), + )) + })? + .as_str(); + + debug!( + "Applying dynamic resource kind={:?} apiVersion={:?} name='{}' ns={:?}", + resource.types.as_ref().map(|t| &t.kind), + resource.types.as_ref().map(|t| &t.api_version), + name, + namespace + ); + trace!( + "Dynamic resource payload:\n{:#}", + serde_json::to_value(resource).unwrap_or(serde_json::Value::Null) + ); + + // Using same field manager as in apply() + let mut patch_params = PatchParams::apply("harmony"); + patch_params.force = force_conflicts; + + if *crate::config::DRY_RUN { + // Dry-run path: fetch current, show diff, and return appropriate object + match api.get(name).await { + Ok(current) => { + trace!("Received current dynamic value {current:#?}"); + + println!("\nPerforming dry-run for resource: '{}'", name); + + // Serialize current and new, and strip status from current if present + let mut current_yaml = + serde_yaml::to_value(¤t).unwrap_or_else(|_| serde_yaml::Value::Null); + if let Some(map) = current_yaml.as_mapping_mut() { + if map.contains_key(&serde_yaml::Value::String("status".to_string())) { + let removed = + map.remove(&serde_yaml::Value::String("status".to_string())); + trace!("Removed status from current dynamic object: {:?}", removed); + } else { + trace!( + "Did not find status entry for current dynamic object {}/{}", + current.metadata.namespace.as_deref().unwrap_or(""), + current.metadata.name.as_deref().unwrap_or("") + ); + } + } + + let current_yaml = serde_yaml::to_string(¤t_yaml) + .unwrap_or_else(|_| "Failed to serialize current resource".to_string()); + let new_yaml = serde_yaml::to_string(resource) + .unwrap_or_else(|_| "Failed to serialize new resource".to_string()); + + if current_yaml == new_yaml { + println!("No changes detected."); + return Ok(current); + } + + println!("Changes detected:"); + let diff = TextDiff::from_lines(¤t_yaml, &new_yaml); + for change in diff.iter_all_changes() { + let sign = match change.tag() { + similar::ChangeTag::Delete => "-", + similar::ChangeTag::Insert => "+", + similar::ChangeTag::Equal => " ", + }; + print!("{}{}", sign, change); + } + + // Return the incoming resource as the would-be applied state + Ok(resource.clone()) + } + Err(Error::Api(ErrorResponse { code: 404, .. })) => { + println!("\nPerforming dry-run for new resource: '{}'", name); + println!( + "Resource does not exist. It would be created with the following content:" + ); + let new_yaml = serde_yaml::to_string(resource) + .unwrap_or_else(|_| "Failed to serialize new resource".to_string()); + for line in new_yaml.lines() { + println!("+{}", line); + } + Ok(resource.clone()) + } + Err(e) => { + error!("Failed to get dynamic resource '{}': {}", name, e); + Err(e) + } + } + } else { + // Real apply via server-side apply + debug!("Patching (server-side apply) dynamic resource '{}'", name); + api.patch(name, &patch_params, &Patch::Apply(resource)) + .await + .map_err(|e| { + error!("Failed to apply dynamic resource '{}': {}", name, e); + e + }) + } + } + /// Apply a resource in namespace /// /// See `kubectl apply` for more information on the expected behavior of this function diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 6d7411c..78bb141 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -1,6 +1,7 @@ -use std::any::Any; +use std::{any::Any, collections::HashMap}; use async_trait::async_trait; +use kube::api::DynamicObject; use log::debug; use crate::{ @@ -76,6 +77,15 @@ pub trait AlertReceiver: std::fmt::Debug + Send + Sync { fn name(&self) -> String; fn clone_box(&self) -> Box>; fn as_any(&self) -> &dyn Any; + fn as_alertmanager_receiver(&self) -> Result; +} + +#[derive(Debug)] +pub struct AlertManagerReceiver { + pub receiver_config: serde_json::Value, + // FIXME we should not leak k8s here. DynamicObject is k8s specific + pub additional_ressources: Vec, + pub route_config: serde_json::Value, } #[async_trait] diff --git a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs index 8bef793..9462789 100644 --- a/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs +++ b/harmony/src/modules/monitoring/alert_channel/discord_alert_channel.rs @@ -1,18 +1,23 @@ use std::any::Any; -use std::collections::BTreeMap; +use std::collections::{BTreeMap, HashMap}; use async_trait::async_trait; +use harmony_types::k8s_name::K8sName; use k8s_openapi::api::core::v1::Secret; -use kube::api::ObjectMeta; -use log::debug; +use kube::Resource; +use kube::api::{DynamicObject, ObjectMeta}; +use log::{debug, trace}; use serde::Serialize; use serde_json::json; use serde_yaml::{Mapping, Value}; +use crate::infra::kube::kube_resource_to_dynamic; use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{ AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus, }; use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability; +use crate::modules::monitoring::okd::OpenshiftClusterAlertSender; +use crate::topology::oberservability::monitoring::AlertManagerReceiver; use crate::{ interpret::{InterpretError, Outcome}, modules::monitoring::{ @@ -28,14 +33,13 @@ use harmony_types::net::Url; #[derive(Debug, Clone, Serialize)] pub struct DiscordWebhook { - pub name: String, + pub name: K8sName, pub url: Url, + pub selectors: Vec>, } -#[async_trait] -impl AlertReceiver for DiscordWebhook { - async fn install(&self, sender: &RHOBObservability) -> Result { - let ns = sender.namespace.clone(); +impl DiscordWebhook { + fn get_receiver_config(&self) -> Result { let secret_name = format!("{}-secret", self.name.clone()); let webhook_key = format!("{}", self.url.clone()); @@ -52,33 +56,91 @@ impl AlertReceiver for DiscordWebhook { ..Default::default() }; - let _ = sender.client.apply(&secret, Some(&ns)).await; + let mut matchers: Vec = Vec::new(); + for selector in &self.selectors { + trace!("selector: {:#?}", selector); + for (k, v) in selector { + matchers.push(format!("{} = {}", k, v)); + } + } + + Ok(AlertManagerReceiver { + additional_ressources: vec![kube_resource_to_dynamic(&secret)?], + + receiver_config: json!({ + "name": self.name, + "discord_configs": [ + { + "webhook_url": self.url.clone(), + "title": "{{ template \"discord.default.title\" . }}", + "message": "{{ template \"discord.default.message\" . }}" + } + ] + }), + route_config: json!({ + "receiver": self.name, + "matchers": matchers, + + }), + }) + } +} + +#[async_trait] +impl AlertReceiver for DiscordWebhook { + async fn install( + &self, + sender: &OpenshiftClusterAlertSender, + ) -> Result { + todo!() + } + + fn name(&self) -> String { + self.name.clone().to_string() + } + + fn clone_box(&self) -> Box> { + Box::new(self.clone()) + } + + fn as_any(&self) -> &dyn Any { + todo!() + } + + fn as_alertmanager_receiver(&self) -> Result { + self.get_receiver_config() + } +} + +#[async_trait] +impl AlertReceiver for DiscordWebhook { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } + + async fn install(&self, sender: &RHOBObservability) -> Result { + let ns = sender.namespace.clone(); + + let config = self.get_receiver_config()?; + for resource in config.additional_ressources.iter() { + todo!("can I apply a dynamicresource"); + // sender.client.apply(resource, Some(&ns)).await; + } + let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec { data: json!({ "route": { "receiver": self.name, }, "receivers": [ - { - "name": self.name, - "discordConfigs": [ - { - "apiURL": { - "name": secret_name, - "key": "webhook-url", - }, - "title": "{{ template \"discord.default.title\" . }}", - "message": "{{ template \"discord.default.message\" . }}" - } - ] - } + config.receiver_config ] }), }; let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfig { metadata: ObjectMeta { - name: Some(self.name.clone()), + name: Some(self.name.clone().to_string()), labels: Some(std::collections::BTreeMap::from([( "alertmanagerConfig".to_string(), "enabled".to_string(), @@ -122,6 +184,9 @@ impl AlertReceiver for DiscordWebhook { #[async_trait] impl AlertReceiver for DiscordWebhook { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &CRDPrometheus) -> Result { let ns = sender.namespace.clone(); let secret_name = format!("{}-secret", self.name.clone()); @@ -167,7 +232,7 @@ impl AlertReceiver for DiscordWebhook { let alertmanager_configs = AlertmanagerConfig { metadata: ObjectMeta { - name: Some(self.name.clone()), + name: Some(self.name.clone().to_string()), labels: Some(std::collections::BTreeMap::from([( "alertmanagerConfig".to_string(), "enabled".to_string(), @@ -200,6 +265,9 @@ impl AlertReceiver for DiscordWebhook { #[async_trait] impl AlertReceiver for DiscordWebhook { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &Prometheus) -> Result { sender.install_receiver(self).await } @@ -217,7 +285,7 @@ impl AlertReceiver for DiscordWebhook { #[async_trait] impl PrometheusReceiver for DiscordWebhook { fn name(&self) -> String { - self.name.clone() + self.name.clone().to_string() } async fn configure_receiver(&self) -> AlertManagerChannelConfig { self.get_config().await @@ -226,6 +294,9 @@ impl PrometheusReceiver for DiscordWebhook { #[async_trait] impl AlertReceiver for DiscordWebhook { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &KubePrometheus) -> Result { sender.install_receiver(self).await } @@ -243,7 +314,7 @@ impl AlertReceiver for DiscordWebhook { #[async_trait] impl KubePrometheusReceiver for DiscordWebhook { fn name(&self) -> String { - self.name.clone() + self.name.clone().to_string() } async fn configure_receiver(&self) -> AlertManagerChannelConfig { self.get_config().await @@ -270,7 +341,7 @@ impl DiscordWebhook { let mut route = Mapping::new(); route.insert( Value::String("receiver".to_string()), - Value::String(self.name.clone()), + Value::String(self.name.clone().to_string()), ); route.insert( Value::String("matchers".to_string()), @@ -284,7 +355,7 @@ impl DiscordWebhook { let mut receiver = Mapping::new(); receiver.insert( Value::String("name".to_string()), - Value::String(self.name.clone()), + Value::String(self.name.clone().to_string()), ); let mut discord_config = Mapping::new(); @@ -309,8 +380,9 @@ mod tests { #[tokio::test] async fn discord_serialize_should_match() { let discord_receiver = DiscordWebhook { - name: "test-discord".to_string(), + name: K8sName("test-discord".to_string()), url: Url::Url(url::Url::parse("https://discord.i.dont.exist.com").unwrap()), + selectors: vec![], }; let discord_receiver_receiver = diff --git a/harmony/src/modules/monitoring/alert_channel/webhook_receiver.rs b/harmony/src/modules/monitoring/alert_channel/webhook_receiver.rs index 1b20df3..a141df0 100644 --- a/harmony/src/modules/monitoring/alert_channel/webhook_receiver.rs +++ b/harmony/src/modules/monitoring/alert_channel/webhook_receiver.rs @@ -19,7 +19,7 @@ use crate::{ }, prometheus::prometheus::{Prometheus, PrometheusReceiver}, }, - topology::oberservability::monitoring::AlertReceiver, + topology::oberservability::monitoring::{AlertManagerReceiver, AlertReceiver}, }; use harmony_types::net::Url; @@ -31,6 +31,9 @@ pub struct WebhookReceiver { #[async_trait] impl AlertReceiver for WebhookReceiver { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &RHOBObservability) -> Result { let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec { data: json!({ @@ -97,6 +100,9 @@ impl AlertReceiver for WebhookReceiver { #[async_trait] impl AlertReceiver for WebhookReceiver { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &CRDPrometheus) -> Result { let spec = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfigSpec { data: json!({ @@ -158,6 +164,9 @@ impl AlertReceiver for WebhookReceiver { #[async_trait] impl AlertReceiver for WebhookReceiver { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &Prometheus) -> Result { sender.install_receiver(self).await } @@ -184,6 +193,9 @@ impl PrometheusReceiver for WebhookReceiver { #[async_trait] impl AlertReceiver for WebhookReceiver { + fn as_alertmanager_receiver(&self) -> Result { + todo!() + } async fn install(&self, sender: &KubePrometheus) -> Result { sender.install_receiver(self).await } diff --git a/harmony/src/modules/monitoring/okd/cluster_monitoring.rs b/harmony/src/modules/monitoring/okd/cluster_monitoring.rs new file mode 100644 index 0000000..56fb59e --- /dev/null +++ b/harmony/src/modules/monitoring/okd/cluster_monitoring.rs @@ -0,0 +1,270 @@ +use base64::prelude::*; + +use async_trait::async_trait; +use harmony_types::id::Id; +use kube::api::DynamicObject; +use log::{debug, info, trace}; +use serde::Serialize; + +use crate::{ + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + modules::monitoring::okd::OpenshiftClusterAlertSender, + score::Score, + topology::{K8sclient, Topology, oberservability::monitoring::AlertReceiver}, +}; + +impl Clone for Box> { + fn clone(&self) -> Self { + self.clone_box() + } +} + +impl Serialize for Box> { + fn serialize(&self, _serializer: S) -> Result + where + S: serde::Serializer, + { + todo!() + } +} + +#[derive(Debug, Clone, Serialize)] +pub struct OpenshiftClusterAlertScore { + pub receivers: Vec>>, +} + +impl Score for OpenshiftClusterAlertScore { + fn name(&self) -> String { + "ClusterAlertScore".to_string() + } + + #[doc(hidden)] + fn create_interpret(&self) -> Box> { + Box::new(OpenshiftClusterAlertInterpret { + receivers: self.receivers.clone(), + }) + } +} + +#[derive(Debug)] +pub struct OpenshiftClusterAlertInterpret { + receivers: Vec>>, +} + +#[async_trait] +impl Interpret for OpenshiftClusterAlertInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + let client = topology.k8s_client().await?; + let openshift_monitoring_namespace = "openshift-monitoring"; + + let mut alertmanager_main_secret: DynamicObject = client + .get_secret_json_value("alertmanager-main", Some(openshift_monitoring_namespace)) + .await?; + trace!("Got secret {alertmanager_main_secret:#?}"); + + let data: &mut serde_json::Value = &mut alertmanager_main_secret.data; + trace!("Alertmanager-main secret data {data:#?}"); + let data_obj = data + .get_mut("data") + .ok_or(InterpretError::new( + "Missing 'data' field in alertmanager-main secret.".to_string(), + ))? + .as_object_mut() + .ok_or(InterpretError::new( + "'data' field in alertmanager-main secret is expected to be an object ." + .to_string(), + ))?; + + let config_b64 = data_obj + .get("alertmanager.yaml") + .ok_or(InterpretError::new( + "Missing 'alertmanager.yaml' in alertmanager-main secret data".to_string(), + ))? + .as_str() + .unwrap_or(""); + trace!("Config base64 {config_b64}"); + + let config_bytes = BASE64_STANDARD.decode(config_b64).unwrap_or_default(); + + let mut am_config: serde_yaml::Value = + serde_yaml::from_str(&String::from_utf8(config_bytes).unwrap_or_default()) + .unwrap_or_default(); + + debug!("Current alertmanager config {am_config:#?}"); + + let existing_receivers_sequence = if let Some(receivers) = am_config.get_mut("receivers") { + match receivers.as_sequence_mut() { + Some(seq) => seq, + None => { + return Err(InterpretError::new(format!( + "Expected alertmanager config receivers to be a sequence, got {:?}", + receivers + ))); + } + } + } else { + &mut serde_yaml::Sequence::default() + }; + + let mut additional_resources = vec![]; + + for custom_receiver in &self.receivers { + let name = custom_receiver.name(); + let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?; + + let receiver_json_value = alertmanager_receiver.receiver_config; + + let receiver_yaml_string = + serde_json::to_string(&receiver_json_value).map_err(|e| { + InterpretError::new(format!("Failed to serialize receiver config: {}", e)) + })?; + + let receiver_yaml_value: serde_yaml::Value = + serde_yaml::from_str(&receiver_yaml_string).map_err(|e| { + InterpretError::new(format!("Failed to parse receiver config as YAML: {}", e)) + })?; + + if let Some(idx) = existing_receivers_sequence.iter().position(|r| { + r.get("name") + .and_then(|n| n.as_str()) + .map_or(false, |n| n == name) + }) { + info!("Replacing existing AlertManager receiver: {}", name); + existing_receivers_sequence[idx] = receiver_yaml_value; + } else { + debug!("Adding new AlertManager receiver: {}", name); + existing_receivers_sequence.push(receiver_yaml_value); + } + + additional_resources.push(alertmanager_receiver.additional_ressources); + } + + let existing_route_mapping = if let Some(route) = am_config.get_mut("route") { + match route.as_mapping_mut() { + Some(map) => map, + None => { + return Err(InterpretError::new(format!( + "Expected alertmanager config route to be a mapping, got {:?}", + route + ))); + } + } + } else { + &mut serde_yaml::Mapping::default() + }; + + let existing_route_sequence = if let Some(routes) = existing_route_mapping.get_mut("routes") + { + match routes.as_sequence_mut() { + Some(seq) => seq, + None => { + return Err(InterpretError::new(format!( + "Expected alertmanager config routes to be a sequence, got {:?}", + routes + ))); + } + } + } else { + &mut serde_yaml::Sequence::default() + }; + + for custom_receiver in &self.receivers { + let name = custom_receiver.name(); + let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?; + + let route_json_value = alertmanager_receiver.route_config; + let route_yaml_string = serde_json::to_string(&route_json_value).map_err(|e| { + InterpretError::new(format!("Failed to serialize route config: {}", e)) + })?; + + let route_yaml_value: serde_yaml::Value = serde_yaml::from_str(&route_yaml_string) + .map_err(|e| { + InterpretError::new(format!("Failed to parse route config as YAML: {}", e)) + })?; + + if let Some(idy) = existing_route_sequence.iter().position(|r| { + r.get("receiver") + .and_then(|n| n.as_str()) + .map_or(false, |n| n == name) + }) { + info!("Replacing existing AlertManager receiver: {}", name); + existing_route_sequence[idy] = route_yaml_value; + } else { + debug!("Adding new AlertManager receiver: {}", name); + existing_route_sequence.push(route_yaml_value); + } + } + debug!("Current alertmanager config {am_config:#?}"); + // TODO + // - save new version of alertmanager config + // - write additional ressources to the cluster + let am_config = serde_yaml::to_string(&am_config).map_err(|e| { + InterpretError::new(format!( + "Failed to serialize new alertmanager config to string : {e}" + )) + })?; + + let mut am_config_b64 = String::new(); + BASE64_STANDARD.encode_string(am_config, &mut am_config_b64); + + // TODO put update configmap value and save new value + data_obj.insert( + "alertmanager.yaml".to_string(), + serde_json::Value::String(am_config_b64), + ); + + // https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management + alertmanager_main_secret.metadata.managed_fields = None; + + trace!("Applying new alertmanager_main_secret {alertmanager_main_secret:#?}"); + client + .apply_dynamic( + &alertmanager_main_secret, + Some(openshift_monitoring_namespace), + true, + ) + .await?; + + let additional_resources = additional_resources.concat(); + trace!("Applying additional ressources for alert receivers {additional_resources:#?}"); + client + .apply_dynamic_many( + &additional_resources, + Some(openshift_monitoring_namespace), + true, + ) + .await?; + + Ok(Outcome::success(format!( + "Successfully configured {} cluster alert receivers: {}", + self.receivers.len(), + self.receivers + .iter() + .map(|r| r.name()) + .collect::>() + .join(", ") + ))) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("OpenshiftClusterAlertInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} diff --git a/harmony/src/modules/monitoring/okd/config.rs b/harmony/src/modules/monitoring/okd/config.rs new file mode 100644 index 0000000..b86c5f0 --- /dev/null +++ b/harmony/src/modules/monitoring/okd/config.rs @@ -0,0 +1,90 @@ +use std::{collections::BTreeMap, sync::Arc}; + +use crate::{ + interpret::{InterpretError, Outcome}, + topology::k8s::K8sClient, +}; +use k8s_openapi::api::core::v1::ConfigMap; +use kube::api::ObjectMeta; + +pub(crate) struct Config; + +impl Config { + pub async fn create_cluster_monitoring_config_cm( + client: &Arc, + ) -> Result { + let mut data = BTreeMap::new(); + data.insert( + "config.yaml".to_string(), + r#" +enableUserWorkload: true +alertmanagerMain: + enableUserAlertmanagerConfig: true +"# + .to_string(), + ); + + let cm = ConfigMap { + metadata: ObjectMeta { + name: Some("cluster-monitoring-config".to_string()), + namespace: Some("openshift-monitoring".to_string()), + ..Default::default() + }, + data: Some(data), + ..Default::default() + }; + client.apply(&cm, Some("openshift-monitoring")).await?; + + Ok(Outcome::success( + "updated cluster-monitoring-config-map".to_string(), + )) + } + + pub async fn create_user_workload_monitoring_config_cm( + client: &Arc, + ) -> Result { + let mut data = BTreeMap::new(); + data.insert( + "config.yaml".to_string(), + r#" +alertmanager: + enabled: true + enableAlertmanagerConfig: true +"# + .to_string(), + ); + let cm = ConfigMap { + metadata: ObjectMeta { + name: Some("user-workload-monitoring-config".to_string()), + namespace: Some("openshift-user-workload-monitoring".to_string()), + ..Default::default() + }, + data: Some(data), + ..Default::default() + }; + client + .apply(&cm, Some("openshift-user-workload-monitoring")) + .await?; + + Ok(Outcome::success( + "updated openshift-user-monitoring-config-map".to_string(), + )) + } + + pub async fn verify_user_workload(client: &Arc) -> Result { + let namespace = "openshift-user-workload-monitoring"; + let alertmanager_name = "alertmanager-user-workload-0"; + let prometheus_name = "prometheus-user-workload-0"; + client + .wait_for_pod_ready(alertmanager_name, Some(namespace)) + .await?; + client + .wait_for_pod_ready(prometheus_name, Some(namespace)) + .await?; + + Ok(Outcome::success(format!( + "pods: {}, {} ready in ns: {}", + alertmanager_name, prometheus_name, namespace + ))) + } +} diff --git a/harmony/src/modules/monitoring/okd/enable_user_workload.rs b/harmony/src/modules/monitoring/okd/enable_user_workload.rs index b322b4d..2ed0fa4 100644 --- a/harmony/src/modules/monitoring/okd/enable_user_workload.rs +++ b/harmony/src/modules/monitoring/okd/enable_user_workload.rs @@ -1,16 +1,13 @@ -use std::{collections::BTreeMap, sync::Arc}; - use crate::{ data::Version, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, + modules::monitoring::okd::config::Config, score::Score, - topology::{K8sclient, Topology, k8s::K8sClient}, + topology::{K8sclient, Topology}, }; use async_trait::async_trait; use harmony_types::id::Id; -use k8s_openapi::api::core::v1::ConfigMap; -use kube::api::ObjectMeta; use serde::Serialize; #[derive(Clone, Debug, Serialize)] @@ -37,10 +34,9 @@ impl Interpret for OpenshiftUserWorkloadMonitoringIn topology: &T, ) -> Result { let client = topology.k8s_client().await.unwrap(); - self.update_cluster_monitoring_config_cm(&client).await?; - self.update_user_workload_monitoring_config_cm(&client) - .await?; - self.verify_user_workload(&client).await?; + Config::create_cluster_monitoring_config_cm(&client).await?; + Config::create_user_workload_monitoring_config_cm(&client).await?; + Config::verify_user_workload(&client).await?; Ok(Outcome::success( "successfully enabled user-workload-monitoring".to_string(), )) @@ -62,88 +58,3 @@ impl Interpret for OpenshiftUserWorkloadMonitoringIn todo!() } } - -impl OpenshiftUserWorkloadMonitoringInterpret { - pub async fn update_cluster_monitoring_config_cm( - &self, - client: &Arc, - ) -> Result { - let mut data = BTreeMap::new(); - data.insert( - "config.yaml".to_string(), - r#" -enableUserWorkload: true -alertmanagerMain: - enableUserAlertmanagerConfig: true -"# - .to_string(), - ); - - let cm = ConfigMap { - metadata: ObjectMeta { - name: Some("cluster-monitoring-config".to_string()), - namespace: Some("openshift-monitoring".to_string()), - ..Default::default() - }, - data: Some(data), - ..Default::default() - }; - client.apply(&cm, Some("openshift-monitoring")).await?; - - Ok(Outcome::success( - "updated cluster-monitoring-config-map".to_string(), - )) - } - - pub async fn update_user_workload_monitoring_config_cm( - &self, - client: &Arc, - ) -> Result { - let mut data = BTreeMap::new(); - data.insert( - "config.yaml".to_string(), - r#" -alertmanager: - enabled: true - enableAlertmanagerConfig: true -"# - .to_string(), - ); - let cm = ConfigMap { - metadata: ObjectMeta { - name: Some("user-workload-monitoring-config".to_string()), - namespace: Some("openshift-user-workload-monitoring".to_string()), - ..Default::default() - }, - data: Some(data), - ..Default::default() - }; - client - .apply(&cm, Some("openshift-user-workload-monitoring")) - .await?; - - Ok(Outcome::success( - "updated openshift-user-monitoring-config-map".to_string(), - )) - } - - pub async fn verify_user_workload( - &self, - client: &Arc, - ) -> Result { - let namespace = "openshift-user-workload-monitoring"; - let alertmanager_name = "alertmanager-user-workload-0"; - let prometheus_name = "prometheus-user-workload-0"; - client - .wait_for_pod_ready(alertmanager_name, Some(namespace)) - .await?; - client - .wait_for_pod_ready(prometheus_name, Some(namespace)) - .await?; - - Ok(Outcome::success(format!( - "pods: {}, {} ready in ns: {}", - alertmanager_name, prometheus_name, namespace - ))) - } -} diff --git a/harmony/src/modules/monitoring/okd/mod.rs b/harmony/src/modules/monitoring/okd/mod.rs index 50339ba..ac246c5 100644 --- a/harmony/src/modules/monitoring/okd/mod.rs +++ b/harmony/src/modules/monitoring/okd/mod.rs @@ -1 +1,14 @@ +use crate::topology::oberservability::monitoring::AlertSender; + +pub mod cluster_monitoring; +pub(crate) mod config; pub mod enable_user_workload; + +#[derive(Debug)] +pub struct OpenshiftClusterAlertSender; + +impl AlertSender for OpenshiftClusterAlertSender { + fn name(&self) -> String { + "OpenshiftClusterAlertSender".to_string() + } +} diff --git a/harmony_types/src/k8s_name.rs b/harmony_types/src/k8s_name.rs new file mode 100644 index 0000000..9cb92ae --- /dev/null +++ b/harmony_types/src/k8s_name.rs @@ -0,0 +1,96 @@ +use std::str::FromStr; + +use serde::Serialize; + +#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)] +pub struct K8sName(pub String); + +impl K8sName { + #[cfg(test)] + pub fn dummy() -> Self { + K8sName("example".to_string()) + } + + fn is_valid(name: &str) -> bool { + if name.is_empty() || name.len() > 63 { + return false; + } + + let b = name.as_bytes(); + + if !b[0].is_ascii_alphanumeric() || !b[b.len() - 1].is_ascii_alphanumeric() { + return false; + } + + b.iter() + .all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == b'-') + } +} + +impl FromStr for K8sName { + type Err = K8sNameError; + + fn from_str(s: &str) -> Result { + if !Self::is_valid(s) { + return Err(K8sNameError::InvalidFormat(format!( + "Invalid Kubernetes resource name '{s}': \ + must match DNS-1123 (lowercase alphanumeric, hyphens, <=63 chars)" + ))); + }; + + Ok(K8sName(s.to_string())) + } +} + +#[derive(Debug)] +pub enum K8sNameError { + InvalidFormat(String), +} + +impl From<&K8sName> for String { + fn from(value: &K8sName) -> Self { + value.0.clone() + } +} + +impl std::fmt::Display for K8sName { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.0) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_valid_name() { + assert!(K8sName::from_str("k8s-name-test").is_ok()); + assert!(K8sName::from_str("n").is_ok()); + assert!(K8sName::from_str("node1").is_ok()); + assert!(K8sName::from_str("my-app-v2").is_ok()); + assert!(K8sName::from_str("service123").is_ok()); + assert!(K8sName::from_str("abcdefghijklmnopqrstuvwxyz-1234567890").is_ok()); + } + + #[test] + fn test_invalid_name() { + assert!(K8sName::from_str("").is_err()); + assert!(K8sName::from_str(".config").is_err()); + assert!(K8sName::from_str("_hidden").is_err()); + assert!(K8sName::from_str("UPPER-CASE").is_err()); + assert!(K8sName::from_str("123-$$$").is_err()); + assert!(K8sName::from_str("app!name").is_err()); + assert!(K8sName::from_str("my..app").is_err()); + assert!(K8sName::from_str("backend-").is_err()); + assert!(K8sName::from_str("-frontend").is_err()); + assert!(K8sName::from_str("InvalidName").is_err()); + assert!( + K8sName::from_str("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + .is_err() + ); + assert!(K8sName::from_str("k8s name").is_err()); + assert!(K8sName::from_str("k8s_name").is_err()); + assert!(K8sName::from_str("k8s@name").is_err()); + } +} diff --git a/harmony_types/src/lib.rs b/harmony_types/src/lib.rs index 1ebdb97..a87f5c9 100644 --- a/harmony_types/src/lib.rs +++ b/harmony_types/src/lib.rs @@ -1,4 +1,5 @@ pub mod id; +pub mod k8s_name; pub mod net; pub mod storage; pub mod switch;