Compare commits


No commits in common. "3eecc2f5901beaea484d78c383a13116a84cab80" and "82119076cffbeee10539b61ac8d635f2bf40ec58" have entirely different histories.

10 changed files with 27 additions and 155 deletions

View File

@@ -7,13 +7,7 @@ use harmony::{
     monitoring::{
         alert_channel::discord_alert_channel::DiscordWebhook,
         alert_rule::prometheus_alert_rule::AlertManagerRuleGroup,
-        kube_prometheus::{
-            helm_prometheus_alert_score::HelmPrometheusAlertingScore,
-            types::{
-                HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor,
-                ServiceMonitorEndpoint,
-            },
-        },
+        kube_prometheus::helm_prometheus_alert_score::HelmPrometheusAlertingScore,
     },
     prometheus::alerts::{
         infra::dell_server::{

View File

@@ -1,5 +1,3 @@
-use std::collections::HashMap;
-
 use harmony::{
     data::Id,
     inventory::Inventory,
@@ -8,13 +6,7 @@ use harmony::{
     monitoring::{
         alert_channel::discord_alert_channel::DiscordWebhook,
         alert_rule::prometheus_alert_rule::AlertManagerRuleGroup,
-        kube_prometheus::{
-            helm_prometheus_alert_score::HelmPrometheusAlertingScore,
-            types::{
-                HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor,
-                ServiceMonitorEndpoint,
-            },
-        },
+        kube_prometheus::helm_prometheus_alert_score::HelmPrometheusAlertingScore,
     },
     prometheus::alerts::k8s::pvc::high_pvc_fill_rate_over_two_days,
     tenant::TenantScore,
@@ -52,31 +44,9 @@ async fn main() {
     let additional_rules =
         AlertManagerRuleGroup::new("pvc-alerts", vec![high_pvc_fill_rate_over_two_days_alert]);

-    let service_monitor_endpoint = ServiceMonitorEndpoint {
-        port: Some("80".to_string()),
-        path: "/metrics".to_string(),
-        scheme: HTTPScheme::HTTP,
-        ..Default::default()
-    };
-
-    let service_monitor = ServiceMonitor {
-        name: "test-service-monitor".to_string(),
-        selector: Selector {
-            match_labels: HashMap::new(),
-            match_expressions: vec![MatchExpression {
-                key: "test".to_string(),
-                operator: Operator::In,
-                values: vec!["test-service".to_string()],
-            }],
-        },
-        endpoints: vec![service_monitor_endpoint],
-        ..Default::default()
-    };
-
     let alerting_score = HelmPrometheusAlertingScore {
         receivers: vec![Box::new(discord_receiver)],
         rules: vec![Box::new(additional_rules)],
-        service_monitors: vec![service_monitor],
     };
     let mut maestro = Maestro::<K8sAnywhereTopology>::initialize(
         Inventory::autoload(),

View File

@@ -4,7 +4,7 @@ use crate::{interpret::InterpretError, inventory::Inventory};

 #[async_trait]
 pub trait Installable<T>: Send + Sync {
-    async fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError>;
+    fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError>;

     async fn ensure_installed(
         &self,
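
The hunk above makes Installable::configure synchronous while ensure_installed stays async. Below is a minimal sketch of an implementor under the new signature; the NoopSender type, the stub Inventory and InterpretError types, and the shortened ensure_installed parameter list are illustrative assumptions, not the crate's actual definitions.

use async_trait::async_trait;

// Illustrative stand-ins for the crate's real types.
pub struct Inventory;
pub struct InterpretError(pub String);

#[async_trait]
pub trait Installable<T>: Send + Sync {
    // Synchronous after the change: call sites drop their .await.
    fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError>;

    // Still asynchronous; parameters trimmed for this sketch.
    async fn ensure_installed(&self, topology: &T) -> Result<(), InterpretError>;
}

#[derive(Debug)]
struct NoopSender;

#[async_trait]
impl<T: Send + Sync> Installable<T> for NoopSender {
    fn configure(&self, _inventory: &Inventory, _topology: &T) -> Result<(), InterpretError> {
        Ok(()) // nothing to configure
    }

    async fn ensure_installed(&self, _topology: &T) -> Result<(), InterpretError> {
        Ok(()) // nothing to install
    }
}

The corresponding call-site change appears in the alerting interpret and Prometheus hunks further down, where the .await is removed from configure calls.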

View File

@@ -39,6 +39,7 @@ pub struct K8sAnywhereTopology {
     k8s_state: Arc<OnceCell<Option<K8sState>>>,
     tenant_manager: Arc<OnceCell<K8sTenantManager>>,
     config: Arc<K8sAnywhereConfig>,
+    tenant_manager_config: OnceCell<TenantConfig>,
 }

 #[async_trait]
@@ -73,6 +74,7 @@ impl K8sAnywhereTopology {
             k8s_state: Arc::new(OnceCell::new()),
             tenant_manager: Arc::new(OnceCell::new()),
             config: Arc::new(K8sAnywhereConfig::from_env()),
+            tenant_manager_config: OnceCell::new(),
         }
     }
@@ -81,6 +83,7 @@ impl K8sAnywhereTopology {
             k8s_state: Arc::new(OnceCell::new()),
             tenant_manager: Arc::new(OnceCell::new()),
             config: Arc::new(config),
+            tenant_manager_config: OnceCell::new(),
         }
     }
@@ -196,10 +199,16 @@ impl K8sAnywhereTopology {
                 let k8s_client = self.k8s_client().await?;
                 Ok(K8sTenantManager::new(k8s_client))
             })
-            .await?;
+            .await
+            .unwrap();
         Ok(())
     }

+    async fn store_tenant_config(&self, config: TenantConfig) {
+        self.tenant_manager_config
+            .get_or_init(|| async { config })
+            .await;
+    }
+
     fn get_k8s_tenant_manager(&self) -> Result<&K8sTenantManager, ExecutorError> {
         match self.tenant_manager.get() {
@@ -280,15 +289,13 @@ impl HelmCommand for K8sAnywhereTopology {}
 #[async_trait]
 impl TenantManager for K8sAnywhereTopology {
     async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
+        self.store_tenant_config(config.clone()).await;
         self.get_k8s_tenant_manager()?
             .provision_tenant(config)
             .await
     }

-    async fn get_tenant_config(&self) -> Option<TenantConfig> {
-        self.get_k8s_tenant_manager()
-            .ok()?
-            .get_tenant_config()
-            .await
+    fn get_tenant_config(&self) -> Option<TenantConfig> {
+        self.tenant_manager_config.get().cloned()
     }
 }
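
The additions above cache the first TenantConfig passed to provision_tenant in a tokio::sync::OnceCell so that get_tenant_config can hand it back synchronously later. Here is a self-contained sketch of that write-once pattern, using placeholder Topology and TenantConfig types rather than the crate's own:

use tokio::sync::OnceCell;

#[derive(Clone, Debug, PartialEq)]
struct TenantConfig {
    name: String,
}

struct Topology {
    tenant_config: OnceCell<TenantConfig>,
}

impl Topology {
    // First write wins; later calls leave the stored value untouched.
    async fn store_tenant_config(&self, config: TenantConfig) {
        self.tenant_config.get_or_init(|| async { config }).await;
    }

    // Synchronous read: clones the cached config, or None if nothing was stored.
    fn get_tenant_config(&self) -> Option<TenantConfig> {
        self.tenant_config.get().cloned()
    }
}

#[tokio::main]
async fn main() {
    let topo = Topology {
        tenant_config: OnceCell::new(),
    };
    assert!(topo.get_tenant_config().is_none());
    topo.store_tenant_config(TenantConfig { name: "tenant-a".into() }).await;
    assert_eq!(topo.get_tenant_config().map(|c| c.name), Some("tenant-a".to_string()));
}

In the hunk itself, provision_tenant stores config.clone() before delegating to the K8s tenant manager, so the cached value reflects the first tenant provisioned on this topology.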

View File

@@ -27,7 +27,7 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
         inventory: &Inventory,
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
-        self.sender.configure(inventory, topology).await?;
+        self.sender.configure(inventory, topology)?;
         for receiver in self.receivers.iter() {
             receiver.install(&self.sender).await?;
         }

View File

@@ -5,9 +5,10 @@ use crate::{
     topology::k8s::{ApplyStrategy, K8sClient},
 };
 use async_trait::async_trait;
+use derive_new::new;
 use k8s_openapi::{
     api::{
-        core::v1::{LimitRange, Namespace, ResourceQuota},
+        core::v1::{Namespace, ResourceQuota},
         networking::v1::{
             NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule, NetworkPolicyPort,
         },
@@ -18,23 +19,12 @@ use kube::Resource;
 use log::{debug, info, warn};
 use serde::de::DeserializeOwned;
 use serde_json::json;
-use tokio::sync::OnceCell;

 use super::{TenantConfig, TenantManager};

-#[derive(Clone, Debug)]
+#[derive(new, Clone, Debug)]
 pub struct K8sTenantManager {
     k8s_client: Arc<K8sClient>,
-    k8s_tenant_config: Arc<OnceCell<TenantConfig>>,
-}
-
-impl K8sTenantManager {
-    pub fn new(client: Arc<K8sClient>) -> Self {
-        Self {
-            k8s_client: client,
-            k8s_tenant_config: Arc::new(OnceCell::new()),
-        }
-    }
 }

 impl K8sTenantManager {
@@ -142,43 +132,6 @@ impl K8sTenantManager {
         })
     }

-    fn build_limit_range(&self, config: &TenantConfig) -> Result<LimitRange, ExecutorError> {
-        let limit_range = json!({
-            "apiVersion": "v1",
-            "kind": "LimitRange",
-            "metadata": {
-                "name": format!("{}-defaults", config.name),
-                "labels": {
-                    "harmony.nationtech.io/tenant.id": config.id.to_string(),
-                    "harmony.nationtech.io/tenant.name": config.name,
-                },
-                "namespace": self.get_namespace_name(config),
-            },
-            "spec": {
-                "limits": [
-                    {
-                        "type": "Container",
-                        "default": {
-                            "cpu": "500m",
-                            "memory": "500Mi"
-                        },
-                        "defaultRequest": {
-                            "cpu": "100m",
-                            "memory": "100Mi"
-                        },
-                    }
-                ]
-            }
-        });
-
-        serde_json::from_value(limit_range).map_err(|e| {
-            ExecutorError::ConfigurationError(format!(
-                "Could not build TenantManager LimitRange. {}",
-                e
-            ))
-        })
-    }
-
     fn build_network_policy(&self, config: &TenantConfig) -> Result<NetworkPolicy, ExecutorError> {
         let network_policy = json!({
             "apiVersion": "networking.k8s.io/v1",
@@ -401,9 +354,6 @@
         Ok(network_policy)
     }

-    fn store_config(&self, config: &TenantConfig) {
-        let _ = self.k8s_tenant_config.set(config.clone());
-    }
 }

 #[async_trait]
@@ -412,7 +362,6 @@ impl TenantManager for K8sTenantManager {
         let namespace = self.build_namespace(config)?;
         let resource_quota = self.build_resource_quota(config)?;
         let network_policy = self.build_network_policy(config)?;
-        let resource_limit_range = self.build_limit_range(config)?;

         self.ensure_constraints(&namespace)?;
@@ -422,9 +371,6 @@ impl TenantManager for K8sTenantManager {
         debug!("Creating resource_quota for tenant {}", config.name);
         self.apply_resource(resource_quota, config).await?;

-        debug!("Creating limit_range for tenant {}", config.name);
-        self.apply_resource(resource_limit_range, config).await?;
-
         debug!("Creating network_policy for tenant {}", config.name);
         self.apply_resource(network_policy, config).await?;
@@ -432,10 +378,9 @@
             "Success provisionning K8s tenant id {} name {}",
             config.id, config.name
         );
-        self.store_config(config);
         Ok(())
     }
-    async fn get_tenant_config(&self) -> Option<TenantConfig> {
-        self.k8s_tenant_config.get().cloned()
+    fn get_tenant_config(&self) -> Option<TenantConfig> {
+        todo!()
     }
 }
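
Earlier in this file's diff, the hand-written K8sTenantManager::new and the OnceCell field are replaced by #[derive(new)] from the derive_new crate, which generates a constructor taking each remaining field in declaration order. A small illustrative sketch of that equivalence, with a placeholder K8sClient type standing in for the real client:

use std::sync::Arc;

use derive_new::new;

// Placeholder for the crate's real K8sClient.
struct K8sClient;

#[derive(new, Clone)]
struct K8sTenantManager {
    k8s_client: Arc<K8sClient>,
}

// The derive expands to roughly this hand-written constructor:
// impl K8sTenantManager {
//     pub fn new(k8s_client: Arc<K8sClient>) -> Self {
//         Self { k8s_client }
//     }
// }

fn main() {
    let manager = K8sTenantManager::new(Arc::new(K8sClient));
    let _copy = manager.clone();
}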

View File

@@ -16,5 +16,5 @@ pub trait TenantManager {
     /// * `config`: The desired configuration for the new tenant.
     async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError>;

-    async fn get_tenant_config(&self) -> Option<TenantConfig>;
+    fn get_tenant_config(&self) -> Option<TenantConfig>;
 }

View File

@@ -1,41 +0,0 @@
-use async_trait::async_trait;
-use serde::Serialize;
-use crate::topology::Topology;
-
-/// An ApplicationFeature provided by harmony, such as Backups, Monitoring, MultisiteAvailability,
-/// ContinuousIntegration, ContinuousDelivery
-#[async_trait]
-pub trait ApplicationFeature<T: Topology>:
-    std::fmt::Debug + Send + Sync + ApplicationFeatureClone<T>
-{
-    async fn ensure_installed(&self, topology: &T) -> Result<(), String>;
-    fn name(&self) -> String;
-}
-
-trait ApplicationFeatureClone<T: Topology> {
-    fn clone_box(&self) -> Box<dyn ApplicationFeature<T>>;
-}
-
-impl<A, T: Topology> ApplicationFeatureClone<T> for A
-where
-    A: ApplicationFeature<T> + Clone + 'static,
-{
-    fn clone_box(&self) -> Box<dyn ApplicationFeature<T>> {
-        Box::new(self.clone())
-    }
-}
-
-impl<T: Topology> Serialize for Box<dyn ApplicationFeature<T>> {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        todo!()
-    }
-}
-
-impl<T: Topology> Clone for Box<dyn ApplicationFeature<T>> {
-    fn clone(&self) -> Self {
-        self.clone_box()
-    }
-}
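
The deleted file relied on the common clone_box idiom: Clone itself is not object-safe, so a helper trait returns Box<dyn Trait>, a blanket impl provides it for every Clone implementor, and an impl of Clone for the boxed type ties it together. A stripped-down sketch of the same idiom outside this codebase, with a hypothetical Feature trait in place of ApplicationFeature:

// Object-safe cloning for boxed trait objects (the clone_box idiom).
trait Feature: FeatureClone {
    fn name(&self) -> String;
}

trait FeatureClone {
    fn clone_box(&self) -> Box<dyn Feature>;
}

// Blanket impl: any Clone + 'static Feature can box a copy of itself.
impl<A> FeatureClone for A
where
    A: Feature + Clone + 'static,
{
    fn clone_box(&self) -> Box<dyn Feature> {
        Box::new(self.clone())
    }
}

impl Clone for Box<dyn Feature> {
    fn clone(&self) -> Self {
        self.clone_box()
    }
}

#[derive(Clone)]
struct Monitoring;

impl Feature for Monitoring {
    fn name(&self) -> String {
        "monitoring".to_string()
    }
}

fn main() {
    let feature: Box<dyn Feature> = Box::new(Monitoring);
    let copy = feature.clone();
    assert_eq!(copy.name(), "monitoring");
}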

View File

@@ -11,9 +11,7 @@ use std::{
 use crate::modules::{
     helm::chart::HelmChartScore,
     monitoring::kube_prometheus::types::{
-        AlertGroup, AlertManager, AlertManagerAdditionalPromRules, AlertManagerConfig,
-        AlertManagerRoute, AlertManagerSpec, AlertManagerValues, ConfigReloader, Limits,
-        PrometheusConfig, Requests, Resources,
+        AlertGroup, AlertManager, AlertManagerAdditionalPromRules, AlertManagerConfig, AlertManagerRoute, AlertManagerSpec, AlertManagerValues, ConfigReloader, Limits, PrometheusConfig, Requests, Resources
     },
 };

View File

@@ -35,8 +35,8 @@ impl AlertSender for Prometheus {
 #[async_trait]
 impl<T: Topology + HelmCommand + TenantManager> Installable<T> for Prometheus {
-    async fn configure(&self, _inventory: &Inventory, topology: &T) -> Result<(), InterpretError> {
-        self.configure_with_topology(topology).await;
+    fn configure(&self, _inventory: &Inventory, topology: &T) -> Result<(), InterpretError> {
+        self.configure_with_topology(topology);
         Ok(())
     }
@@ -62,10 +62,9 @@ impl Prometheus {
         }
     }

-    pub async fn configure_with_topology<T: TenantManager>(&self, topology: &T) {
+    pub fn configure_with_topology<T: TenantManager>(&self, topology: &T) {
         let ns = topology
             .get_tenant_config()
-            .await
             .map(|cfg| cfg.name.clone())
             .unwrap_or_else(|| "monitoring".to_string());
         error!("This must be refactored, see comments in pr #74");