Compare commits
	
		
			6 Commits
		
	
	
		
			master
			...
			feat/disco
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b5b77cf1ac | |||
| dda8e29843 | |||
| b33650e9d5 | |||
| ea4709a409 | |||
| cd40660350 | |||
| 2ca732cecd | 
| @ -1,10 +1,11 @@ | |||||||
| use std::{process::Command, sync::Arc}; | use std::{collections::HashMap, process::Command, sync::Arc}; | ||||||
| 
 | 
 | ||||||
| use async_trait::async_trait; | use async_trait::async_trait; | ||||||
| use inquire::Confirm; | use inquire::Confirm; | ||||||
| use log::{info, warn}; | use log::{info, warn}; | ||||||
| use tokio::sync::OnceCell; | use tokio::sync::{Mutex, OnceCell}; | ||||||
| 
 | 
 | ||||||
|  | use crate::score::Score; | ||||||
| use crate::{ | use crate::{ | ||||||
|     executors::ExecutorError, |     executors::ExecutorError, | ||||||
|     interpret::{InterpretError, Outcome}, |     interpret::{InterpretError, Outcome}, | ||||||
| @ -17,6 +18,7 @@ use crate::{ | |||||||
| use super::{ | use super::{ | ||||||
|     HelmCommand, K8sclient, Topology, |     HelmCommand, K8sclient, Topology, | ||||||
|     k8s::K8sClient, |     k8s::K8sClient, | ||||||
|  |     oberservability::monitoring::{AlertReceiver, AlertReceiverProvision}, | ||||||
|     tenant::{ |     tenant::{ | ||||||
|         ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager, |         ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager, | ||||||
|     }, |     }, | ||||||
| @ -37,6 +39,7 @@ enum K8sSource { | |||||||
| pub struct K8sAnywhereTopology { | pub struct K8sAnywhereTopology { | ||||||
|     k8s_state: OnceCell<Option<K8sState>>, |     k8s_state: OnceCell<Option<K8sState>>, | ||||||
|     tenant_manager: OnceCell<K8sTenantManager>, |     tenant_manager: OnceCell<K8sTenantManager>, | ||||||
|  |     pub alert_receivers: Mutex<HashMap<String, OnceCell<AlertReceiver>>>, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| @ -61,9 +64,29 @@ impl K8sAnywhereTopology { | |||||||
|         Self { |         Self { | ||||||
|             k8s_state: OnceCell::new(), |             k8s_state: OnceCell::new(), | ||||||
|             tenant_manager: OnceCell::new(), |             tenant_manager: OnceCell::new(), | ||||||
|  |             alert_receivers: Mutex::new(HashMap::new()), | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|  |     pub async fn initialize_alert_receiver<C>( | ||||||
|  |         &self, | ||||||
|  |         config: &C, | ||||||
|  |         inventory: &Inventory, | ||||||
|  |     ) -> Result<AlertReceiver, InterpretError> | ||||||
|  |     where | ||||||
|  |         Self: Topology + HelmCommand, | ||||||
|  |         C: AlertReceiverProvision<Self> + Send + Sync, | ||||||
|  |     { | ||||||
|  |         let score = config.get_deployment_score(); | ||||||
|  |         let interpret = score.create_interpret(); | ||||||
|  |         interpret.execute(inventory, self).await?; | ||||||
|  | 
 | ||||||
|  |         Ok(AlertReceiver { | ||||||
|  |             receiver_id: config.alert_receiver_id(), | ||||||
|  |             receiver_installed: true, | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|     fn is_helm_available(&self) -> Result<(), String> { |     fn is_helm_available(&self) -> Result<(), String> { | ||||||
|         let version_result = Command::new("helm") |         let version_result = Command::new("helm") | ||||||
|             .arg("version") |             .arg("version") | ||||||
|  | |||||||
| @ -1,14 +1,18 @@ | |||||||
| use async_trait::async_trait; | use async_trait::async_trait; | ||||||
|  | use dyn_clone::DynClone; | ||||||
|  | use serde::Serialize; | ||||||
| 
 | 
 | ||||||
| use std::fmt::Debug; | use std::fmt::Debug; | ||||||
| use url::Url; |  | ||||||
| 
 | 
 | ||||||
| use crate::interpret::InterpretError; | use crate::interpret::InterpretError; | ||||||
| 
 | 
 | ||||||
|  | use crate::inventory::Inventory; | ||||||
|  | use crate::score::Score; | ||||||
|  | use crate::topology::HelmCommand; | ||||||
| use crate::{interpret::Outcome, topology::Topology}; | use crate::{interpret::Outcome, topology::Topology}; | ||||||
| 
 | 
 | ||||||
| /// Represents an entity responsible for collecting and organizing observability data
 | /// Represents an entity responsible for collecting and organizing observability data
 | ||||||
| /// from various telemetry sources
 | /// from various telemetry sources such as Prometheus or Datadog
 | ||||||
| /// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
 | /// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
 | ||||||
| /// monitoring data, enabling consistent processing regardless of the underlying data source.
 | /// monitoring data, enabling consistent processing regardless of the underlying data source.
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| @ -26,6 +30,26 @@ pub trait Monitor<T: Topology>: Debug + Send + Sync { | |||||||
|     ) -> Result<Outcome, InterpretError>; |     ) -> Result<Outcome, InterpretError>; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | #[async_trait] | ||||||
|  | pub trait EnsureAlertReceiver<T: Topology>: Debug + DynClone + Send + Sync { | ||||||
|  |     async fn ensure_alert_receiver( | ||||||
|  |         &self, | ||||||
|  |         inventory: Inventory, | ||||||
|  |         topology: &T, | ||||||
|  |     ) -> Result<Outcome, InterpretError>; | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | dyn_clone::clone_trait_object!(<T> EnsureAlertReceiver<T>); | ||||||
|  | 
 | ||||||
|  | #[derive(Debug, Clone, Serialize)] | ||||||
| pub struct AlertReceiver { | pub struct AlertReceiver { | ||||||
|     pub receiver_id: String, |     pub receiver_id: String, | ||||||
|  |     pub receiver_installed: bool, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | /// Provides the ability to turn an alert config into an executable score
 | ||||||
|  | /// for the topology
 | ||||||
|  | pub trait AlertReceiverProvision<T: Topology + HelmCommand> { | ||||||
|  |     fn get_deployment_score(&self) -> Box<dyn Score<T>>; | ||||||
|  |     fn alert_receiver_id(&self) -> String; | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,32 +1,104 @@ | |||||||
|  | use super::discord_alert_manager::discord_alert_manager_score; | ||||||
| use async_trait::async_trait; | use async_trait::async_trait; | ||||||
| use serde_json::Value; | use serde::Serialize; | ||||||
|  | use serde_yaml::Value; | ||||||
|  | use tokio::sync::OnceCell; | ||||||
| use url::Url; | use url::Url; | ||||||
| 
 | 
 | ||||||
| use crate::{ | use crate::{ | ||||||
|     interpret::{InterpretError, Outcome}, |     interpret::{Interpret, InterpretError, Outcome}, | ||||||
|     topology::K8sAnywhereTopology, |     inventory::Inventory, | ||||||
|  |     score::Score, | ||||||
|  |     topology::{ | ||||||
|  |         HelmCommand, K8sAnywhereTopology, Topology, | ||||||
|  |         oberservability::monitoring::{AlertReceiverProvision, EnsureAlertReceiver}, | ||||||
|  |     }, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Clone)] | #[derive(Debug, Clone, Serialize)] | ||||||
| pub struct DiscordWebhookConfig { | pub struct DiscordWebhookConfig { | ||||||
|     pub webhook_url: Url, |     pub webhook_url: Url, | ||||||
|     pub name: String, |     pub name: String, | ||||||
|     pub send_resolved_notifications: bool, |     pub send_resolved_notifications: bool, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub trait DiscordWebhookReceiver { | #[async_trait] | ||||||
|     fn deploy_discord_webhook_receiver( | impl<T: Topology + DiscordWebhookReceiver> EnsureAlertReceiver<T> for DiscordWebhookConfig { | ||||||
|  |     async fn ensure_alert_receiver( | ||||||
|         &self, |         &self, | ||||||
|         _notification_adapter_id: &str, |         inventory: Inventory, | ||||||
|     ) -> Result<Outcome, InterpretError>; |         topology: &T, | ||||||
|  |     ) -> Result<Outcome, InterpretError> { | ||||||
|  |         topology | ||||||
|  |             .ensure_discord_webhook_receiver(&inventory, self.clone()) | ||||||
|  |             .await | ||||||
|  |     } | ||||||
|  | } | ||||||
| 
 | 
 | ||||||
|  | impl<T: Topology + HelmCommand> AlertReceiverProvision<T> for DiscordWebhookConfig { | ||||||
|  |     fn get_deployment_score(&self) -> Box<dyn Score<T>> { | ||||||
|  |         Box::new(DiscordWebhookReceiverScore { | ||||||
|  |             config: self.clone(), | ||||||
|  |         }) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn alert_receiver_id(&self) -> String { | ||||||
|  |         self.name.clone() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[async_trait] | ||||||
|  | pub trait DiscordWebhookReceiver { | ||||||
|  |     async fn ensure_discord_webhook_receiver( | ||||||
|  |         &self, | ||||||
|  |         inventory: &Inventory, | ||||||
|  |         config: DiscordWebhookConfig, | ||||||
|  |     ) -> Result<Outcome, InterpretError>; | ||||||
|     fn delete_discord_webhook_receiver( |     fn delete_discord_webhook_receiver( | ||||||
|         &self, |         &self, | ||||||
|         _notification_adapter_id: &str, |         config: DiscordWebhookConfig, | ||||||
|     ) -> Result<Outcome, InterpretError>; |     ) -> Result<Outcome, InterpretError>; | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // trait used to generate alert manager config values impl<T: Topology + AlertManagerConfig> Monitor for KubePrometheus
 | #[async_trait] | ||||||
|  | impl DiscordWebhookReceiver for K8sAnywhereTopology { | ||||||
|  |     async fn ensure_discord_webhook_receiver( | ||||||
|  |         &self, | ||||||
|  |         inventory: &Inventory, | ||||||
|  |         config: DiscordWebhookConfig, | ||||||
|  |     ) -> Result<Outcome, InterpretError> { | ||||||
|  |         let receiver_key = config.name.clone(); | ||||||
|  |         let mut adapters_map_guard = self.alert_receivers.lock().await; | ||||||
|  | 
 | ||||||
|  |         let cell = adapters_map_guard | ||||||
|  |             .entry(receiver_key.clone()) | ||||||
|  |             .or_insert_with(OnceCell::new); | ||||||
|  | 
 | ||||||
|  |         if let Some(initialized_receiver) = cell.get() { | ||||||
|  |             return Ok(Outcome::success(format!( | ||||||
|  |                 "Discord Webhook adapter for '{}' already initialized.", | ||||||
|  |                 initialized_receiver.receiver_id | ||||||
|  |             ))); | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         let final_state = cell | ||||||
|  |             .get_or_try_init(|| async { self.initialize_alert_receiver(&config, inventory).await }) | ||||||
|  |             .await?; | ||||||
|  | 
 | ||||||
|  |         Ok(Outcome::success(format!( | ||||||
|  |             "Discord Webhook Receiver for '{}' ensured/initialized.", | ||||||
|  |             final_state.receiver_id | ||||||
|  |         ))) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn delete_discord_webhook_receiver( | ||||||
|  |         &self, | ||||||
|  |         _config: DiscordWebhookConfig, | ||||||
|  |     ) -> Result<Outcome, InterpretError> { | ||||||
|  |         todo!() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
| pub trait AlertManagerConfig<T> { | pub trait AlertManagerConfig<T> { | ||||||
|     fn get_alert_manager_config(&self) -> Result<Value, InterpretError>; |     fn get_alert_manager_config(&self) -> Result<Value, InterpretError>; | ||||||
| } | } | ||||||
| @ -38,18 +110,22 @@ impl<T: DiscordWebhookReceiver> AlertManagerConfig<T> for DiscordWebhookConfig { | |||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[async_trait] | #[derive(Debug, Clone, Serialize)] | ||||||
| impl DiscordWebhookReceiver for K8sAnywhereTopology { | struct DiscordWebhookReceiverScore { | ||||||
|     fn deploy_discord_webhook_receiver( |     config: DiscordWebhookConfig, | ||||||
|         &self, |  | ||||||
|         _notification_adapter_id: &str, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         todo!() |  | ||||||
| } | } | ||||||
|     fn delete_discord_webhook_receiver( | 
 | ||||||
|         &self, | impl<T: Topology + HelmCommand> Score<T> for DiscordWebhookReceiverScore { | ||||||
|         _notification_adapter_id: &str, |     fn create_interpret(&self) -> Box<dyn Interpret<T>> { | ||||||
|     ) -> Result<Outcome, InterpretError> { |         discord_alert_manager_score( | ||||||
|         todo!() |             self.config.webhook_url.clone(), | ||||||
|  |             self.config.name.clone(), | ||||||
|  |             self.config.name.clone(), | ||||||
|  |         ) | ||||||
|  |         .create_interpret() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn name(&self) -> String { | ||||||
|  |         "DiscordWebhookReceiverScore".to_string() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user