forked from NationTech/harmony
		
	fix: Ensure idempotent deployment of Discord adapters
Updates to use for managing Discord webhook adapter deployments. This prevents redundant installations by ensuring that the deployment interpret for a given adapter configuration is executed only once. The internal state now uses a keyed by the alertreceiver ID to track and initialize each receiver's .
This commit is contained in:
		
							parent
							
								
									2ca732cecd
								
							
						
					
					
						commit
						cd40660350
					
				| @ -1,9 +1,9 @@ | ||||
| use std::{process::Command, sync::Arc}; | ||||
| use std::{collections::HashMap, process::Command, sync::Arc}; | ||||
| 
 | ||||
| use async_trait::async_trait; | ||||
| use inquire::Confirm; | ||||
| use log::{info, warn}; | ||||
| use tokio::sync::OnceCell; | ||||
| use tokio::sync::{Mutex, OnceCell}; | ||||
| 
 | ||||
| use crate::{ | ||||
|     executors::ExecutorError, | ||||
| @ -17,6 +17,7 @@ use crate::{ | ||||
| use super::{ | ||||
|     HelmCommand, K8sclient, Topology, | ||||
|     k8s::K8sClient, | ||||
|     oberservability::monitoring::AlertReceiver, | ||||
|     tenant::{ | ||||
|         ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager, | ||||
|     }, | ||||
| @ -37,6 +38,7 @@ enum K8sSource { | ||||
| pub struct K8sAnywhereTopology { | ||||
|     k8s_state: OnceCell<Option<K8sState>>, | ||||
|     tenant_manager: OnceCell<K8sTenantManager>, | ||||
|     pub alert_receivers: Mutex<HashMap<String, OnceCell<AlertReceiver>>>, | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| @ -61,6 +63,7 @@ impl K8sAnywhereTopology { | ||||
|         Self { | ||||
|             k8s_state: OnceCell::new(), | ||||
|             tenant_manager: OnceCell::new(), | ||||
|             alert_receivers: Mutex::new(HashMap::new()), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|  | ||||
| @ -1,7 +1,7 @@ | ||||
| use async_trait::async_trait; | ||||
| use serde::Serialize; | ||||
| 
 | ||||
| use std::fmt::Debug; | ||||
| use url::Url; | ||||
| 
 | ||||
| use crate::interpret::InterpretError; | ||||
| 
 | ||||
| @ -26,6 +26,8 @@ pub trait Monitor<T: Topology>: Debug + Send + Sync { | ||||
|     ) -> Result<Outcome, InterpretError>; | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Clone, Serialize)] | ||||
| pub struct AlertReceiver { | ||||
|     pub receiver_id: String, | ||||
|     pub receiver_installed: bool, | ||||
| } | ||||
|  | ||||
| @ -1,3 +1,5 @@ | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| use super::discord_alert_manager::discord_alert_manager_score; | ||||
| use async_trait::async_trait; | ||||
| use serde::Serialize; | ||||
| @ -10,7 +12,9 @@ use crate::{ | ||||
|     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, | ||||
|     inventory::Inventory, | ||||
|     score::Score, | ||||
|     topology::{HelmCommand, K8sAnywhereTopology, Topology}, | ||||
|     topology::{ | ||||
|         HelmCommand, K8sAnywhereTopology, Topology, oberservability::monitoring::AlertReceiver, | ||||
|     }, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Debug, Clone, Serialize)] | ||||
| @ -20,17 +24,11 @@ pub struct DiscordWebhookConfig { | ||||
|     pub send_resolved_notifications: bool, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Clone)] | ||||
| pub struct DiscordWebhookReceiverState { | ||||
|     installed: OnceCell<Outcome>, | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| pub trait DiscordWebhookReceiver { | ||||
|     async fn deploy_discord_webhook_receiver( | ||||
|         &self, | ||||
|         config: DiscordWebhookConfig, | ||||
|         state: DiscordWebhookReceiverState, | ||||
|     ) -> Result<Outcome, InterpretError>; | ||||
|     fn delete_discord_webhook_receiver( | ||||
|         &self, | ||||
| @ -54,19 +52,33 @@ impl DiscordWebhookReceiver for K8sAnywhereTopology { | ||||
|     async fn deploy_discord_webhook_receiver( | ||||
|         &self, | ||||
|         config: DiscordWebhookConfig, | ||||
|         state: DiscordWebhookReceiverState, | ||||
|     ) -> Result<Outcome, InterpretError> { | ||||
|         let discord_webhook_receiver_score = DiscordWebhookReceiverScore { config }; | ||||
|         let state = state | ||||
|             .installed | ||||
|             .get_or_try_init(|| { | ||||
|                 let inventory = Inventory::autoload(); | ||||
|                 let interpret = discord_webhook_receiver_score.create_interpret(); | ||||
|                 async move { interpret.execute(&inventory, self).await } | ||||
|         let receiver_key = config.name.clone(); | ||||
|         let mut adapters_map_guard = self.alert_receivers.lock().await; | ||||
| 
 | ||||
|         let cell = adapters_map_guard | ||||
|             .entry(receiver_key.clone()) | ||||
|             .or_insert_with(OnceCell::new); | ||||
| 
 | ||||
|         if let Some(initialized_receiver) = cell.get() { | ||||
|             return Ok(Outcome::success(format!( | ||||
|                 "Discord Webhook adapter for '{}' already initialized.", | ||||
|                 initialized_receiver.receiver_id | ||||
|             ))); | ||||
|         } | ||||
| 
 | ||||
|         let final_state = cell | ||||
|             .get_or_try_init(|| async { | ||||
|                 initialize_discord_webhook_receiver(config.clone(), self).await | ||||
|             }) | ||||
|             .await?; | ||||
|         Ok(state.clone()) | ||||
| 
 | ||||
|         Ok(Outcome::success(format!( | ||||
|             "Discord Webhook Receiver for '{}' ensured/initialized.", | ||||
|             final_state.receiver_id | ||||
|         ))) | ||||
|     } | ||||
| 
 | ||||
|     fn delete_discord_webhook_receiver( | ||||
|         &self, | ||||
|         _config: DiscordWebhookConfig, | ||||
| @ -75,6 +87,27 @@ impl DiscordWebhookReceiver for K8sAnywhereTopology { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| async fn initialize_discord_webhook_receiver( | ||||
|     conf: DiscordWebhookConfig, | ||||
|     topology: &K8sAnywhereTopology, | ||||
| ) -> Result<AlertReceiver, InterpretError> { | ||||
|     println!( | ||||
|         "Attempting to initialize Discord adapter for: {}", | ||||
|         conf.name | ||||
|     ); | ||||
|     let score = DiscordWebhookReceiverScore { | ||||
|         config: conf.clone(), | ||||
|     }; | ||||
|     let inventory = Inventory::autoload(); | ||||
|     let interpret = score.create_interpret(); | ||||
| 
 | ||||
|     interpret.execute(&inventory, topology).await?; | ||||
| 
 | ||||
|     Ok(AlertReceiver { | ||||
|         receiver_id: conf.name, | ||||
|         receiver_installed: true, | ||||
|     }) | ||||
| } | ||||
| #[derive(Debug, Clone, Serialize)] | ||||
| struct DiscordWebhookReceiverScore { | ||||
|     config: DiscordWebhookConfig, | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user