forked from NationTech/harmony
		
	fix: K8sTenantManager is responsible for concrete implementation. K8sAnywhere should delegate
This commit is contained in:
		
							parent
							
								
									3959c07261
								
							
						
					
					
						commit
						3eecc2f590
					
				| @ -7,7 +7,13 @@ use harmony::{ | |||||||
|         monitoring::{ |         monitoring::{ | ||||||
|             alert_channel::discord_alert_channel::DiscordWebhook, |             alert_channel::discord_alert_channel::DiscordWebhook, | ||||||
|             alert_rule::prometheus_alert_rule::AlertManagerRuleGroup, |             alert_rule::prometheus_alert_rule::AlertManagerRuleGroup, | ||||||
|             kube_prometheus::helm_prometheus_alert_score::HelmPrometheusAlertingScore, |             kube_prometheus::{ | ||||||
|  |                 helm_prometheus_alert_score::HelmPrometheusAlertingScore, | ||||||
|  |                 types::{ | ||||||
|  |                     HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor, | ||||||
|  |                     ServiceMonitorEndpoint, | ||||||
|  |                 }, | ||||||
|  |             }, | ||||||
|         }, |         }, | ||||||
|         prometheus::alerts::{ |         prometheus::alerts::{ | ||||||
|             infra::dell_server::{ |             infra::dell_server::{ | ||||||
|  | |||||||
| @ -1,3 +1,5 @@ | |||||||
|  | use std::collections::HashMap; | ||||||
|  | 
 | ||||||
| use harmony::{ | use harmony::{ | ||||||
|     data::Id, |     data::Id, | ||||||
|     inventory::Inventory, |     inventory::Inventory, | ||||||
| @ -6,7 +8,13 @@ use harmony::{ | |||||||
|         monitoring::{ |         monitoring::{ | ||||||
|             alert_channel::discord_alert_channel::DiscordWebhook, |             alert_channel::discord_alert_channel::DiscordWebhook, | ||||||
|             alert_rule::prometheus_alert_rule::AlertManagerRuleGroup, |             alert_rule::prometheus_alert_rule::AlertManagerRuleGroup, | ||||||
|             kube_prometheus::helm_prometheus_alert_score::HelmPrometheusAlertingScore, |             kube_prometheus::{ | ||||||
|  |                 helm_prometheus_alert_score::HelmPrometheusAlertingScore, | ||||||
|  |                 types::{ | ||||||
|  |                     HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor, | ||||||
|  |                     ServiceMonitorEndpoint, | ||||||
|  |                 }, | ||||||
|  |             }, | ||||||
|         }, |         }, | ||||||
|         prometheus::alerts::k8s::pvc::high_pvc_fill_rate_over_two_days, |         prometheus::alerts::k8s::pvc::high_pvc_fill_rate_over_two_days, | ||||||
|         tenant::TenantScore, |         tenant::TenantScore, | ||||||
| @ -44,9 +52,31 @@ async fn main() { | |||||||
|     let additional_rules = |     let additional_rules = | ||||||
|         AlertManagerRuleGroup::new("pvc-alerts", vec![high_pvc_fill_rate_over_two_days_alert]); |         AlertManagerRuleGroup::new("pvc-alerts", vec![high_pvc_fill_rate_over_two_days_alert]); | ||||||
| 
 | 
 | ||||||
|  |     let service_monitor_endpoint = ServiceMonitorEndpoint { | ||||||
|  |         port: Some("80".to_string()), | ||||||
|  |         path: "/metrics".to_string(), | ||||||
|  |         scheme: HTTPScheme::HTTP, | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|  |     let service_monitor = ServiceMonitor { | ||||||
|  |         name: "test-service-monitor".to_string(), | ||||||
|  |         selector: Selector { | ||||||
|  |             match_labels: HashMap::new(), | ||||||
|  |             match_expressions: vec![MatchExpression { | ||||||
|  |                 key: "test".to_string(), | ||||||
|  |                 operator: Operator::In, | ||||||
|  |                 values: vec!["test-service".to_string()], | ||||||
|  |             }], | ||||||
|  |         }, | ||||||
|  |         endpoints: vec![service_monitor_endpoint], | ||||||
|  |         ..Default::default() | ||||||
|  |     }; | ||||||
|  | 
 | ||||||
|     let alerting_score = HelmPrometheusAlertingScore { |     let alerting_score = HelmPrometheusAlertingScore { | ||||||
|         receivers: vec![Box::new(discord_receiver)], |         receivers: vec![Box::new(discord_receiver)], | ||||||
|         rules: vec![Box::new(additional_rules)], |         rules: vec![Box::new(additional_rules)], | ||||||
|  |         service_monitors: vec![service_monitor], | ||||||
|     }; |     }; | ||||||
|     let mut maestro = Maestro::<K8sAnywhereTopology>::initialize( |     let mut maestro = Maestro::<K8sAnywhereTopology>::initialize( | ||||||
|         Inventory::autoload(), |         Inventory::autoload(), | ||||||
|  | |||||||
| @ -4,7 +4,7 @@ use crate::{interpret::InterpretError, inventory::Inventory}; | |||||||
| 
 | 
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| pub trait Installable<T>: Send + Sync { | pub trait Installable<T>: Send + Sync { | ||||||
|     fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError>; |     async fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError>; | ||||||
| 
 | 
 | ||||||
|     async fn ensure_installed( |     async fn ensure_installed( | ||||||
|         &self, |         &self, | ||||||
|  | |||||||
| @ -39,7 +39,6 @@ pub struct K8sAnywhereTopology { | |||||||
|     k8s_state: Arc<OnceCell<Option<K8sState>>>, |     k8s_state: Arc<OnceCell<Option<K8sState>>>, | ||||||
|     tenant_manager: Arc<OnceCell<K8sTenantManager>>, |     tenant_manager: Arc<OnceCell<K8sTenantManager>>, | ||||||
|     config: Arc<K8sAnywhereConfig>, |     config: Arc<K8sAnywhereConfig>, | ||||||
|     tenant_manager_config: OnceCell<TenantConfig>, |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| @ -74,7 +73,6 @@ impl K8sAnywhereTopology { | |||||||
|             k8s_state: Arc::new(OnceCell::new()), |             k8s_state: Arc::new(OnceCell::new()), | ||||||
|             tenant_manager: Arc::new(OnceCell::new()), |             tenant_manager: Arc::new(OnceCell::new()), | ||||||
|             config: Arc::new(K8sAnywhereConfig::from_env()), |             config: Arc::new(K8sAnywhereConfig::from_env()), | ||||||
|             tenant_manager_config: OnceCell::new(), |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -83,7 +81,6 @@ impl K8sAnywhereTopology { | |||||||
|             k8s_state: Arc::new(OnceCell::new()), |             k8s_state: Arc::new(OnceCell::new()), | ||||||
|             tenant_manager: Arc::new(OnceCell::new()), |             tenant_manager: Arc::new(OnceCell::new()), | ||||||
|             config: Arc::new(config), |             config: Arc::new(config), | ||||||
|             tenant_manager_config: OnceCell::new(), |  | ||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -199,16 +196,10 @@ impl K8sAnywhereTopology { | |||||||
|                 let k8s_client = self.k8s_client().await?; |                 let k8s_client = self.k8s_client().await?; | ||||||
|                 Ok(K8sTenantManager::new(k8s_client)) |                 Ok(K8sTenantManager::new(k8s_client)) | ||||||
|             }) |             }) | ||||||
|             .await |             .await?; | ||||||
|             .unwrap(); |  | ||||||
| 
 | 
 | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|     async fn store_tenant_config(&self, config: TenantConfig) { |  | ||||||
|         self.tenant_manager_config |  | ||||||
|             .get_or_init(|| async { config }) |  | ||||||
|             .await; |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     fn get_k8s_tenant_manager(&self) -> Result<&K8sTenantManager, ExecutorError> { |     fn get_k8s_tenant_manager(&self) -> Result<&K8sTenantManager, ExecutorError> { | ||||||
|         match self.tenant_manager.get() { |         match self.tenant_manager.get() { | ||||||
| @ -289,13 +280,15 @@ impl HelmCommand for K8sAnywhereTopology {} | |||||||
| #[async_trait] | #[async_trait] | ||||||
| impl TenantManager for K8sAnywhereTopology { | impl TenantManager for K8sAnywhereTopology { | ||||||
|     async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> { |     async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> { | ||||||
|         self.store_tenant_config(config.clone()).await; |  | ||||||
|         self.get_k8s_tenant_manager()? |         self.get_k8s_tenant_manager()? | ||||||
|             .provision_tenant(config) |             .provision_tenant(config) | ||||||
|             .await |             .await | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     fn get_tenant_config(&self) -> Option<TenantConfig> { |     async fn get_tenant_config(&self) -> Option<TenantConfig> { | ||||||
|         self.tenant_manager_config.get().cloned() |         self.get_k8s_tenant_manager() | ||||||
|  |             .ok()? | ||||||
|  |             .get_tenant_config() | ||||||
|  |             .await | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -27,7 +27,7 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte | |||||||
|         inventory: &Inventory, |         inventory: &Inventory, | ||||||
|         topology: &T, |         topology: &T, | ||||||
|     ) -> Result<Outcome, InterpretError> { |     ) -> Result<Outcome, InterpretError> { | ||||||
|         self.sender.configure(inventory, topology)?; |         self.sender.configure(inventory, topology).await?; | ||||||
|         for receiver in self.receivers.iter() { |         for receiver in self.receivers.iter() { | ||||||
|             receiver.install(&self.sender).await?; |             receiver.install(&self.sender).await?; | ||||||
|         } |         } | ||||||
|  | |||||||
| @ -5,7 +5,6 @@ use crate::{ | |||||||
|     topology::k8s::{ApplyStrategy, K8sClient}, |     topology::k8s::{ApplyStrategy, K8sClient}, | ||||||
| }; | }; | ||||||
| use async_trait::async_trait; | use async_trait::async_trait; | ||||||
| use derive_new::new; |  | ||||||
| use k8s_openapi::{ | use k8s_openapi::{ | ||||||
|     api::{ |     api::{ | ||||||
|         core::v1::{LimitRange, Namespace, ResourceQuota}, |         core::v1::{LimitRange, Namespace, ResourceQuota}, | ||||||
| @ -19,12 +18,23 @@ use kube::Resource; | |||||||
| use log::{debug, info, warn}; | use log::{debug, info, warn}; | ||||||
| use serde::de::DeserializeOwned; | use serde::de::DeserializeOwned; | ||||||
| use serde_json::json; | use serde_json::json; | ||||||
|  | use tokio::sync::OnceCell; | ||||||
| 
 | 
 | ||||||
| use super::{TenantConfig, TenantManager}; | use super::{TenantConfig, TenantManager}; | ||||||
| 
 | 
 | ||||||
| #[derive(new, Clone, Debug)] | #[derive(Clone, Debug)] | ||||||
| pub struct K8sTenantManager { | pub struct K8sTenantManager { | ||||||
|     k8s_client: Arc<K8sClient>, |     k8s_client: Arc<K8sClient>, | ||||||
|  |     k8s_tenant_config: Arc<OnceCell<TenantConfig>>, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl K8sTenantManager { | ||||||
|  |     pub fn new(client: Arc<K8sClient>) -> Self { | ||||||
|  |         Self { | ||||||
|  |             k8s_client: client, | ||||||
|  |             k8s_tenant_config: Arc::new(OnceCell::new()), | ||||||
|  |         } | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl K8sTenantManager { | impl K8sTenantManager { | ||||||
| @ -391,6 +401,9 @@ impl K8sTenantManager { | |||||||
| 
 | 
 | ||||||
|         Ok(network_policy) |         Ok(network_policy) | ||||||
|     } |     } | ||||||
|  |     fn store_config(&self, config: &TenantConfig) { | ||||||
|  |         let _ = self.k8s_tenant_config.set(config.clone()); | ||||||
|  |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| @ -419,9 +432,10 @@ impl TenantManager for K8sTenantManager { | |||||||
|             "Success provisionning K8s tenant id {} name {}", |             "Success provisionning K8s tenant id {} name {}", | ||||||
|             config.id, config.name |             config.id, config.name | ||||||
|         ); |         ); | ||||||
|  |         self.store_config(config); | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
|     fn get_tenant_config(&self) -> Option<TenantConfig> { |     async fn get_tenant_config(&self) -> Option<TenantConfig> { | ||||||
|         todo!() |         self.k8s_tenant_config.get().cloned() | ||||||
|     } |     } | ||||||
| } | } | ||||||
|  | |||||||
| @ -16,5 +16,5 @@ pub trait TenantManager { | |||||||
|     /// * `config`: The desired configuration for the new tenant.
 |     /// * `config`: The desired configuration for the new tenant.
 | ||||||
|     async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError>; |     async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError>; | ||||||
| 
 | 
 | ||||||
|     fn get_tenant_config(&self) -> Option<TenantConfig>; |     async fn get_tenant_config(&self) -> Option<TenantConfig>; | ||||||
| } | } | ||||||
|  | |||||||
| @ -11,7 +11,9 @@ use std::{ | |||||||
| use crate::modules::{ | use crate::modules::{ | ||||||
|     helm::chart::HelmChartScore, |     helm::chart::HelmChartScore, | ||||||
|     monitoring::kube_prometheus::types::{ |     monitoring::kube_prometheus::types::{ | ||||||
|         AlertGroup, AlertManager, AlertManagerAdditionalPromRules, AlertManagerConfig, AlertManagerRoute, AlertManagerSpec, AlertManagerValues, ConfigReloader, Limits, PrometheusConfig, Requests, Resources |         AlertGroup, AlertManager, AlertManagerAdditionalPromRules, AlertManagerConfig, | ||||||
|  |         AlertManagerRoute, AlertManagerSpec, AlertManagerValues, ConfigReloader, Limits, | ||||||
|  |         PrometheusConfig, Requests, Resources, | ||||||
|     }, |     }, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -35,8 +35,8 @@ impl AlertSender for Prometheus { | |||||||
| 
 | 
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| impl<T: Topology + HelmCommand + TenantManager> Installable<T> for Prometheus { | impl<T: Topology + HelmCommand + TenantManager> Installable<T> for Prometheus { | ||||||
|     fn configure(&self, _inventory: &Inventory, topology: &T) -> Result<(), InterpretError> { |     async fn configure(&self, _inventory: &Inventory, topology: &T) -> Result<(), InterpretError> { | ||||||
|         self.configure_with_topology(topology); |         self.configure_with_topology(topology).await; | ||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
| @ -62,9 +62,10 @@ impl Prometheus { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub fn configure_with_topology<T: TenantManager>(&self, topology: &T) { |     pub async fn configure_with_topology<T: TenantManager>(&self, topology: &T) { | ||||||
|         let ns = topology |         let ns = topology | ||||||
|             .get_tenant_config() |             .get_tenant_config() | ||||||
|  |             .await | ||||||
|             .map(|cfg| cfg.name.clone()) |             .map(|cfg| cfg.name.clone()) | ||||||
|             .unwrap_or_else(|| "monitoring".to_string()); |             .unwrap_or_else(|| "monitoring".to_string()); | ||||||
|         error!("This must be refactored, see comments in pr #74"); |         error!("This must be refactored, see comments in pr #74"); | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user