diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index ef11f36..54472b9 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -1,9 +1,9 @@ -use std::{process::Command, sync::Arc}; +use std::{collections::HashMap, process::Command, sync::Arc}; use async_trait::async_trait; use inquire::Confirm; use log::{info, warn}; -use tokio::sync::OnceCell; +use tokio::sync::{Mutex, OnceCell}; use crate::{ executors::ExecutorError, @@ -17,6 +17,7 @@ use crate::{ use super::{ HelmCommand, K8sclient, Topology, k8s::K8sClient, + oberservability::monitoring::AlertReceiver, tenant::{ ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager, }, @@ -37,6 +38,7 @@ enum K8sSource { pub struct K8sAnywhereTopology { k8s_state: OnceCell>, tenant_manager: OnceCell, + pub alert_receivers: Mutex>>, } #[async_trait] @@ -61,6 +63,7 @@ impl K8sAnywhereTopology { Self { k8s_state: OnceCell::new(), tenant_manager: OnceCell::new(), + alert_receivers: Mutex::new(HashMap::new()), } } diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 4603eba..97f040f 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -1,7 +1,7 @@ use async_trait::async_trait; +use serde::Serialize; use std::fmt::Debug; -use url::Url; use crate::interpret::InterpretError; @@ -26,6 +26,8 @@ pub trait Monitor: Debug + Send + Sync { ) -> Result; } +#[derive(Debug, Clone, Serialize)] pub struct AlertReceiver { pub receiver_id: String, + pub receiver_installed: bool, } diff --git a/harmony/src/modules/monitoring/discord_webhook_sender.rs b/harmony/src/modules/monitoring/discord_webhook_sender.rs index f4295b7..f87ba0c 100644 --- a/harmony/src/modules/monitoring/discord_webhook_sender.rs +++ b/harmony/src/modules/monitoring/discord_webhook_sender.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use super::discord_alert_manager::discord_alert_manager_score; use async_trait::async_trait; use serde::Serialize; @@ -10,7 +12,9 @@ use crate::{ interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, - topology::{HelmCommand, K8sAnywhereTopology, Topology}, + topology::{ + HelmCommand, K8sAnywhereTopology, Topology, oberservability::monitoring::AlertReceiver, + }, }; #[derive(Debug, Clone, Serialize)] @@ -20,17 +24,11 @@ pub struct DiscordWebhookConfig { pub send_resolved_notifications: bool, } -#[derive(Debug, Clone)] -pub struct DiscordWebhookReceiverState { - installed: OnceCell, -} - #[async_trait] pub trait DiscordWebhookReceiver { async fn deploy_discord_webhook_receiver( &self, config: DiscordWebhookConfig, - state: DiscordWebhookReceiverState, ) -> Result; fn delete_discord_webhook_receiver( &self, @@ -54,19 +52,33 @@ impl DiscordWebhookReceiver for K8sAnywhereTopology { async fn deploy_discord_webhook_receiver( &self, config: DiscordWebhookConfig, - state: DiscordWebhookReceiverState, ) -> Result { - let discord_webhook_receiver_score = DiscordWebhookReceiverScore { config }; - let state = state - .installed - .get_or_try_init(|| { - let inventory = Inventory::autoload(); - let interpret = discord_webhook_receiver_score.create_interpret(); - async move { interpret.execute(&inventory, self).await } + let receiver_key = config.name.clone(); + let mut adapters_map_guard = self.alert_receivers.lock().await; + + let cell = adapters_map_guard + .entry(receiver_key.clone()) + .or_insert_with(OnceCell::new); + + if let Some(initialized_receiver) = cell.get() { + return Ok(Outcome::success(format!( + "Discord Webhook adapter for '{}' already initialized.", + initialized_receiver.receiver_id + ))); + } + + let final_state = cell + .get_or_try_init(|| async { + initialize_discord_webhook_receiver(config.clone(), self).await }) .await?; - Ok(state.clone()) + + Ok(Outcome::success(format!( + "Discord Webhook Receiver for '{}' ensured/initialized.", + final_state.receiver_id + ))) } + fn delete_discord_webhook_receiver( &self, _config: DiscordWebhookConfig, @@ -75,6 +87,27 @@ impl DiscordWebhookReceiver for K8sAnywhereTopology { } } +async fn initialize_discord_webhook_receiver( + conf: DiscordWebhookConfig, + topology: &K8sAnywhereTopology, +) -> Result { + println!( + "Attempting to initialize Discord adapter for: {}", + conf.name + ); + let score = DiscordWebhookReceiverScore { + config: conf.clone(), + }; + let inventory = Inventory::autoload(); + let interpret = score.create_interpret(); + + interpret.execute(&inventory, topology).await?; + + Ok(AlertReceiver { + receiver_id: conf.name, + receiver_installed: true, + }) +} #[derive(Debug, Clone, Serialize)] struct DiscordWebhookReceiverScore { config: DiscordWebhookConfig,