wip: impl k8sMonitor
Some checks failed
Run Check Script / check (push) Failing after 45s
Run Check Script / check (pull_request) Failing after 42s

This commit is contained in:
Willem 2025-06-11 13:35:07 -04:00
parent dbc66f3d0c
commit 238d1f85e2
10 changed files with 354 additions and 224 deletions

View File

@ -10,13 +10,18 @@ use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
maestro::Maestro,
modules::k3d::K3DInstallationScore,
modules::{k3d::K3DInstallationScore, monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score},
topology::LocalhostTopology,
};
use super::{
HelmCommand, K8sclient, Topology,
k8s::K8sClient,
oberservability::{
K8sMonitorConfig,
k8s::K8sMonitor,
monitoring::{AlertChannel, AlertChannelConfig, Monitor},
},
tenant::{
ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager,
},
@ -37,6 +42,7 @@ enum K8sSource {
pub struct K8sAnywhereTopology {
k8s_state: OnceCell<Option<K8sState>>,
tenant_manager: OnceCell<K8sTenantManager>,
k8s_monitor: OnceCell<K8sMonitor>,
}
#[async_trait]
@ -61,6 +67,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: OnceCell::new(),
tenant_manager: OnceCell::new(),
k8s_monitor: OnceCell::new(),
}
}
@ -178,6 +185,32 @@ impl K8sAnywhereTopology {
)),
}
}
async fn ensure_k8s_monitor(&self) -> Result<(), String> {
if let Some(_) = self.k8s_monitor.get() {
return Ok(());
}
self.k8s_monitor
.get_or_try_init(async || -> Result<K8sMonitor, String> {
let config = K8sMonitorConfig {
chart: kube_prometheus_helm_chart_score(),
};
Ok(K8sMonitor { config })
})
.await
.unwrap();
Ok(())
}
fn get_k8s_monitor(&self) -> Result<&K8sMonitor, ExecutorError> {
match self.k8s_monitor.get() {
Some(k) => Ok(k),
None => Err(ExecutorError::UnexpectedError(
"K8sMonitor not available".to_string(),
)),
}
}
}
struct K8sAnywhereConfig {
@ -217,6 +250,10 @@ impl Topology for K8sAnywhereTopology {
"No K8s client could be found or installed".to_string(),
))?;
self.ensure_k8s_monitor()
.await
.map_err(|e| InterpretError::new(e))?;
match self.is_helm_available() {
Ok(()) => Ok(Outcome::success(format!(
"{} + helm available",
@ -263,3 +300,18 @@ impl TenantManager for K8sAnywhereTopology {
.await
}
}
#[async_trait]
impl Monitor for K8sAnywhereTopology {
    /// Delegates provisioning to the lazily-initialized `K8sMonitor`.
    /// Fails if `ensure_k8s_monitor` has not run first (the `?` converts the
    /// `ExecutorError` from `get_k8s_monitor` — assumes a `From` impl exists;
    /// TODO confirm).
    async fn provision_monitor(
        &self,
        alert_receivers: Option<Vec<Box<dyn AlertChannelConfig>>>,
    ) -> Result<Outcome, InterpretError> {
        self.get_k8s_monitor()?
            .provision_monitor(alert_receivers)
            .await
    }

    // Not implemented yet; calling this panics via `todo!()`.
    fn delete_monitor(&self) -> Result<Outcome, InterpretError> {
        todo!()
    }
}

View File

@ -0,0 +1,68 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
modules::{helm::chart::HelmChartInterpret, monitoring::kube_prometheus::{
config::KubePrometheusConfig,
kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score,
}},
topology::{K8sAnywhereTopology, Topology},
};
use super::{
K8sMonitorConfig,
monitoring::{AlertChannel, AlertChannelConfig, Monitor},
};
/// Kubernetes-backed monitor: provisions a kube-prometheus monitoring stack
/// via a helm chart and registers optional alert channels.
#[derive(Debug, Clone, Serialize)]
pub struct K8sMonitor {
    // Carries the helm chart score used to install the monitoring stack.
    pub config: K8sMonitorConfig,
}
#[async_trait]
impl Monitor for K8sMonitor {
    /// Builds and registers any provided alert channels, then creates the
    /// interpret for the configured monitoring helm chart.
    async fn provision_monitor(
        &self,
        alert_receivers: Option<Vec<Box<dyn AlertChannelConfig>>>,
    ) -> Result<Outcome, InterpretError> {
        if let Some(receivers) = alert_receivers {
            // Build each channel from its config, then register it with the
            // alerting backend; the first failure aborts provisioning.
            let alert_channels = self.build_alert_channels(receivers).await?;
            for channel in alert_channels {
                channel.register_alert_channel().await?;
            }
        }
        let chart = self.config.chart.clone();
        // NOTE(review): the value returned by `create_interpret()` is dropped
        // immediately and nothing here appears to execute it, yet we report
        // success below — confirm the chart actually gets installed.
        chart.create_interpret();
        Ok(Outcome::success("installed monitor".to_string()))
    }

    // Not implemented yet; calling this panics via `todo!()`.
    fn delete_monitor(&self) -> Result<Outcome, InterpretError> {
        todo!()
    }
}
#[async_trait]
impl AlertChannelConfig for K8sMonitor {
    // Not implemented: K8sMonitor does not yet act as its own alert-channel
    // config; calling this panics via `todo!()`.
    async fn build_alert_channel(&self) -> Result<Box<dyn AlertChannel>, InterpretError> {
        todo!()
    }
}
impl K8sMonitor {
    /// Builds an alert channel from every supplied config, preserving input
    /// order. Stops at the first config whose build fails and propagates
    /// that error.
    pub async fn build_alert_channels(
        &self,
        alert_channel_configs: Vec<Box<dyn AlertChannelConfig>>,
    ) -> Result<Vec<Box<dyn AlertChannel>>, InterpretError> {
        // Pre-size the output: one channel per config on the success path.
        let mut channels = Vec::with_capacity(alert_channel_configs.len());
        for cfg in alert_channel_configs.into_iter() {
            channels.push(cfg.build_alert_channel().await?);
        }
        Ok(channels)
    }
}

View File

@ -1 +1,12 @@
use serde::Serialize;
use crate::modules::helm::chart::HelmChartScore;
pub mod monitoring;
pub mod k8s;
/// Configuration for `K8sMonitor`.
#[derive(Debug, Clone, Serialize)]
pub struct K8sMonitorConfig {
    // TODO(review): exposing the raw helm chart score here is a stop-gap; a
    // dedicated config type would decouple the monitor from helm details.
    pub chart: HelmChartScore,
}

View File

@ -1,8 +1,13 @@
use async_trait::async_trait;
use dyn_clone::DynClone;
use serde::Serialize;
use std::fmt::Debug;
use std::sync::Arc;
use crate::executors::ExecutorError;
use crate::interpret::InterpretError;
use crate::inventory::Inventory;
use crate::{interpret::Outcome, topology::Topology};
/// Represents an entity responsible for collecting and organizing observability data
@ -10,29 +15,24 @@ use crate::{interpret::Outcome, topology::Topology};
/// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
/// monitoring data, enabling consistent processing regardless of the underlying data source.
#[async_trait]
pub trait Monitor<T: Topology>: Debug + Send + Sync {
async fn deploy_monitor(
pub trait Monitor {
async fn provision_monitor(
&self,
topology: &T,
alert_receivers: Vec<Box<dyn AlertChannel>>,
alert_receivers: Option<Vec<Box<dyn AlertChannelConfig>>>,
) -> Result<Outcome, InterpretError>;
fn delete_monitor(
&self,
topolgy: &T,
alert_receivers: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError>;
}
pub trait MonitorConfig<T: Topology>: Debug + Send + Sync {
fn build_monitor(&self) -> Box<dyn Monitor<T>>;
}
pub trait AlertChannelConfig: std::fmt::Debug + Send + Sync {
fn build_alert_channel(&self) -> Box<dyn AlertChannel>;
fn channel_type(&self) -> String;
fn delete_monitor(&self) -> Result<Outcome, InterpretError>;
}
/// A concrete alerting destination (e.g. a webhook) that can register itself
/// with the monitoring backend.
#[async_trait]
pub trait AlertChannel: Debug + Send + Sync {
    /// Registers this channel with the alerting backend.
    async fn register_alert_channel(&self) -> Result<Outcome, ExecutorError>;
    //async fn get_channel_id(&self) -> String;
}

/// Factory for `AlertChannel`s. The `DynClone` bound lets boxed trait objects
/// be cloned (see `clone_trait_object!` below), so configs can be passed
/// around by value.
#[async_trait]
pub trait AlertChannelConfig: Debug + Send + Sync + DynClone {
    /// Builds the channel described by this config.
    async fn build_alert_channel(&self) -> Result<Box<dyn AlertChannel>, InterpretError>;
}

dyn_clone::clone_trait_object!(AlertChannelConfig);

View File

@ -1,14 +1,6 @@
use async_trait::async_trait;
use serde::Serialize;
use url::Url;
use crate::{
modules::monitoring::kube_prometheus::{
kube_prometheus_monitor::PrometheusAlertChannel,
types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute},
},
topology::oberservability::monitoring::{AlertChannel, AlertChannelConfig},
};
#[derive(Debug, Clone)]
pub struct DiscordWebhookAlertChannel {
@ -17,36 +9,36 @@ pub struct DiscordWebhookAlertChannel {
pub send_resolved_notifications: bool,
}
impl AlertChannelConfig for DiscordWebhookAlertChannel {
fn build_alert_channel(&self) -> Box<dyn AlertChannel> {
Box::new(DiscordWebhookAlertChannel {
webhook_url: self.webhook_url.clone(),
name: self.name.clone(),
send_resolved_notifications: self.send_resolved_notifications.clone(),
})
}
fn channel_type(&self) -> String {
"discord".to_string()
}
}
#[async_trait]
impl AlertChannel for DiscordWebhookAlertChannel {
async fn get_channel_id(&self) -> String {
self.name.clone()
}
}
impl PrometheusAlertChannel for DiscordWebhookAlertChannel {
fn get_alert_channel_global_settings(&self) -> Option<AlertManagerGlobalConfigs> {
None
}
fn get_alert_channel_route(&self) -> AlertManagerRoute {
todo!()
}
fn get_alert_channel_receiver(&self) -> AlertManagerReceiver {
todo!()
}
}
//impl AlertChannelConfig for DiscordWebhookAlertChannel {
// fn build_alert_channel(&self) -> Box<dyn AlertChannel> {
// Box::new(DiscordWebhookAlertChannel {
// webhook_url: self.webhook_url.clone(),
// name: self.name.clone(),
// send_resolved_notifications: self.send_resolved_notifications.clone(),
// })
// }
// fn channel_type(&self) -> String {
// "discord".to_string()
// }
//}
//
//#[async_trait]
//impl AlertChannel for DiscordWebhookAlertChannel {
// async fn get_channel_id(&self) -> String {
// self.name.clone()
// }
//}
//
//impl PrometheusAlertChannel for DiscordWebhookAlertChannel {
// fn get_alert_channel_global_settings(&self) -> Option<AlertManagerChannelGlobalConfigs> {
// None
// }
//
// fn get_alert_channel_route(&self) -> AlertManagerChannelRoute {
// todo!()
// }
//
// fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver {
// todo!()
// }
//}

View File

@ -1,5 +1,7 @@
use serde::Serialize;
use super::types::AlertManagerChannelConfig;
#[derive(Debug, Clone, Serialize)]
pub struct KubePrometheusConfig {
pub namespace: String,
@ -19,6 +21,7 @@ pub struct KubePrometheusConfig {
pub kube_proxy: bool,
pub kube_state_metrics: bool,
pub prometheus_operator: bool,
pub alert_channels: Vec<AlertManagerChannelConfig>,
}
impl KubePrometheusConfig {
pub fn new() -> Self {
@ -40,6 +43,7 @@ impl KubePrometheusConfig {
prometheus_operator: true,
core_dns: false,
kube_scheduler: false,
alert_channels: Vec::new(),
}
}
}

View File

@ -5,7 +5,9 @@ use std::str::FromStr;
use crate::modules::helm::chart::HelmChartScore;
pub fn kube_prometheus_helm_chart_score(config: &KubePrometheusConfig) -> HelmChartScore {
pub fn kube_prometheus_helm_chart_score() -> HelmChartScore {
let config = KubePrometheusConfig::new();
//TODO this should be made into a rule with default formatting that can be easily passed as a vec
//to the overrides or something; leaving the user to deal with formatting here seems bad
let default_rules = config.default_rules.to_string();
@ -143,7 +145,21 @@ prometheus:
enabled: {prometheus}
"#,
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()),
release_name: NonBlankString::from_str("kube-prometheus").unwrap(),
chart_name: NonBlankString::from_str(
"oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack",
)
.unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: true,
repository: None,
}
}
// let alertmanager_config = alert_manager_yaml_builder(&config);
// values.push_str(&alertmanager_config);
//
@ -204,58 +220,44 @@ prometheus:
// alertmanager_config
// }
HelmChartScore {
namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()),
release_name: NonBlankString::from_str("kube-prometheus").unwrap(),
chart_name: NonBlankString::from_str(
"oci://ghcr.io/prometheus-community/charts/kube-prometheus-stack",
)
.unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: true,
repository: None,
}
}
/// Builds the alertmanager receiver and route YAML fragments for a Discord
/// webhook bridge deployed alongside the given helm release.
///
/// Returns `(receiver_yaml, route_yaml)`. The route excludes the Watchdog
/// heartbeat alert and sets `continue: true` so later routes still match.
/// Takes `&str` instead of `&String` (clippy `ptr_arg`); `&String` callers
/// still work via deref coercion.
fn discord_alert_builder(release_name: &str) -> (String, String) {
    let discord_receiver_name = format!("Discord-{}", release_name);
    let receiver = format!(
        r#"
      - name: '{discord_receiver_name}'
        webhook_configs:
          - url: 'http://{release_name}-alertmanager-discord:9094'
            send_resolved: true"#,
    );
    let route = format!(
        r#"
        - receiver: '{discord_receiver_name}'
          matchers:
            - alertname!=Watchdog
          continue: true"#,
    );
    (receiver, route)
}
/// Builds the alertmanager receiver and route YAML fragments for a Slack
/// channel. Title and text use alertmanager template expressions — the
/// `{{{{ }}}}` in the format string escape to literal `{{ }}` in the output.
///
/// Returns `(receiver_yaml, route_yaml)`; the route skips the Watchdog
/// heartbeat alert and continues matching further routes.
/// Takes `&str` instead of `&String` (clippy `ptr_arg`); `&String` callers
/// still work via deref coercion.
fn slack_alert_builder(slack_channel: &str) -> (String, String) {
    let slack_receiver_name = format!("Slack-{}", slack_channel);
    let receiver = format!(
        r#"
      - name: '{slack_receiver_name}'
        slack_configs:
          - channel: '{slack_channel}'
            send_resolved: true
            title: '{{{{ .CommonAnnotations.title }}}}'
            text: '{{{{ .CommonAnnotations.description }}}}'"#,
    );
    let route = format!(
        r#"
        - receiver: '{slack_receiver_name}'
          matchers:
            - alertname!=Watchdog
          continue: true"#,
    );
    (receiver, route)
}
//fn discord_alert_builder(release_name: &String) -> (String, String) {
// let discord_receiver_name = format!("Discord-{}", release_name);
// let receiver = format!(
// r#"
// - name: '{discord_receiver_name}'
// webhook_configs:
// - url: 'http://{release_name}-alertmanager-discord:9094'
// send_resolved: true"#,
// );
// let route = format!(
// r#"
// - receiver: '{discord_receiver_name}'
// matchers:
// - alertname!=Watchdog
// continue: true"#,
// );
// (receiver, route)
//}
//
//fn slack_alert_builder(slack_channel: &String) -> (String, String) {
// let slack_receiver_name = format!("Slack-{}", slack_channel);
// let receiver = format!(
// r#"
// - name: '{slack_receiver_name}'
// slack_configs:
// - channel: '{slack_channel}'
// send_resolved: true
// title: '{{{{ .CommonAnnotations.title }}}}'
// text: '{{{{ .CommonAnnotations.description }}}}'"#,
// );
// let route = format!(
// r#"
// - receiver: '{slack_receiver_name}'
// matchers:
// - alertname!=Watchdog
// continue: true"#,
// );
// (receiver, route)
//}

View File

@ -1,51 +1,87 @@
use async_trait::async_trait;
use serde::Serialize;
//#[derive(Debug, Clone, Serialize)]
//pub struct KubePrometheusMonitorScore {
// pub kube_prometheus_config: KubePrometheusConfig,
// pub alert_channel_configs: Vec<dyn AlertChannelConfig>,
//}
use crate::{
interpret::{InterpretError, Outcome},
topology::{
Topology,
oberservability::monitoring::{AlertChannel, Monitor, MonitorConfig},
},
};
//impl<T: Topology + Debug + HelmCommand + Monitor<T>> MonitorConfig<T>
// for KubePrometheusMonitorScore
//{
// fn build_monitor(&self) -> Box<dyn Monitor<T>> {
// Box::new(self.clone())
// }
//}
use super::{
config::KubePrometheusConfig,
types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute},
};
//impl<T: Topology + HelmCommand + Debug + Clone + 'static + Monitor<T>> Score<T>
// for KubePrometheusMonitorScore
//{
// fn create_interpret(&self) -> Box<dyn Interpret<T>> {
// Box::new(KubePrometheusMonitorInterpret {
// score: self.clone(),
// })
// }
//
// fn name(&self) -> String {
// "KubePrometheusMonitorScore".to_string()
// }
//}
#[derive(Debug, Clone, Serialize)]
pub struct KubePrometheusMonitor {
pub kube_prometheus_config: KubePrometheusConfig,
}
//#[derive(Debug, Clone)]
//pub struct KubePrometheusMonitorInterpret {
// score: KubePrometheusMonitorScore,
//}
impl<T: Topology> MonitorConfig<T> for KubePrometheusMonitor {
fn build_monitor(&self) -> Box<dyn Monitor<T>> {
Box::new(self.clone())
}
}
//#[async_trait]
//impl AlertChannelConfig for KubePrometheusMonitorInterpret {
// async fn build_alert_channel(
// &self,
// ) -> Box<dyn AlertChannel> {
// todo!()
// }
//}
//#[async_trait]
//impl<T: Topology + HelmCommand + Debug + Monitor<T>> Interpret<T>
// for KubePrometheusMonitorInterpret
//{
// async fn execute(
// &self,
// inventory: &Inventory,
// topology: &T,
// ) -> Result<Outcome, InterpretError> {
// let monitor = self.score.build_monitor();
//
// let mut alert_channels = Vec::new();
// //for config in self.score.alert_channel_configs {
// // alert_channels.push(self.build_alert_channel());
// //}
//
// monitor
// .deploy_monitor(inventory, topology, alert_channels)
// .await
// }
//
// fn get_name(&self) -> InterpretName {
// todo!()
// }
//
// fn get_version(&self) -> Version {
// todo!()
// }
//
// fn get_status(&self) -> InterpretStatus {
// todo!()
// }
//
// fn get_children(&self) -> Vec<Id> {
// todo!()
// }
//}
#[async_trait]
pub trait PrometheusAlertChannel: AlertChannel {
fn get_alert_channel_global_settings(&self) -> Option<AlertManagerGlobalConfigs>;
fn get_alert_channel_route(&self) -> AlertManagerRoute;
fn get_alert_channel_receiver(&self) -> AlertManagerReceiver;
}
#[async_trait]
impl<T: Topology> Monitor<T> for KubePrometheusMonitor {
async fn deploy_monitor(
&self,
_topology: &T,
_alert_channels: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn delete_monitor(
&self,
_topology: &T,
_alert_channels: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError> {
todo!()
}
}
//#[async_trait]
//pub trait PrometheusAlertChannel {
// fn get_alert_channel_global_settings(&self) -> Option<AlertManagerChannelGlobalConfigs>;
// fn get_alert_channel_route(&self) -> AlertManagerChannelRoute;
// fn get_alert_channel_receiver(&self) -> AlertManagerChannelReceiver;
//}

View File

@ -1,3 +1,14 @@
pub struct AlertManagerGlobalConfigs {}
pub struct AlertManagerReceiver {}
pub struct AlertManagerRoute {}
use serde::Serialize;
/// Complete alertmanager configuration fragment for one channel: global
/// settings, a routing rule, and the receiver the rule points at.
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelConfig {
    pub global_configs: AlertManagerChannelGlobalConfigs,
    pub route: AlertManagerChannelRoute,
    pub receiver: AlertManagerChannelReceiver,
}

// Placeholder: per-channel global alertmanager settings (fields TBD).
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelGlobalConfigs {}

// Placeholder: alertmanager receiver definition (fields TBD).
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelReceiver {}

// Placeholder: alertmanager routing rule (fields TBD).
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelRoute {}

View File

@ -2,41 +2,18 @@ use async_trait::async_trait;
use serde::{Serialize, Serializer, ser::SerializeStruct};
use std::{fmt::Debug, sync::Arc};
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{
HelmCommand, Topology,
oberservability::monitoring::{AlertChannelConfig, MonitorConfig},
},
};
use crate::{data::{Id, Version}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, topology::{
oberservability::monitoring::{AlertChannelConfig, Monitor},HelmCommand, Topology
}};
use super::kube_prometheus::{
config::KubePrometheusConfig, kube_prometheus_monitor::KubePrometheusMonitor,
};
#[derive(Debug, Clone)]
pub struct MonitoringAlertingScore<T: Topology> {
pub monitor_config: Arc<dyn MonitorConfig<T>>,
pub alert_channel_configs: Vec<Arc<dyn AlertChannelConfig>>,
pub namespace: Option<String>,
#[derive(Debug, Clone, Serialize)]
pub struct MonitoringAlertingScore {
#[serde(skip)]
pub alert_channel_configs: Option<Vec<Box<dyn AlertChannelConfig>>>,
}
impl<T: Topology> MonitoringAlertingScore<T> {
pub fn default() -> Self {
Self {
monitor_config: Arc::new(KubePrometheusMonitor {
kube_prometheus_config: KubePrometheusConfig::new(),
}),
alert_channel_configs: Vec::new(),
namespace: Some("monitoring".to_string()),
}
}
}
impl<T: Topology + HelmCommand + Debug + Clone + 'static> Score<T> for MonitoringAlertingScore<T> {
impl<T: Topology + HelmCommand + Monitor> Score<T> for MonitoringAlertingScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(MonitoringAlertingInterpret {
score: self.clone(),
@ -44,30 +21,23 @@ impl<T: Topology + HelmCommand + Debug + Clone + 'static> Score<T> for Monitorin
}
fn name(&self) -> String {
todo!()
"MonitoringAlertingScore".to_string()
}
}
#[derive(Debug)]
struct MonitoringAlertingInterpret<T: Topology> {
score: MonitoringAlertingScore<T>,
struct MonitoringAlertingInterpret {
score: MonitoringAlertingScore,
}
#[async_trait]
impl<T: Topology + HelmCommand + Debug> Interpret<T> for MonitoringAlertingInterpret<T> {
impl<T: Topology + HelmCommand + Monitor> Interpret<T> for MonitoringAlertingInterpret {
async fn execute(
&self,
_inventory: &Inventory,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let monitor = self.score.monitor_config.build_monitor();
let mut alert_channels = Vec::new();
for config in &self.score.alert_channel_configs {
alert_channels.push(config.build_alert_channel());
}
monitor.deploy_monitor(topology, alert_channels).await
topology.provision_monitor(self.score.alert_channel_configs.clone()).await
}
fn get_name(&self) -> InterpretName {
@ -87,20 +57,4 @@ impl<T: Topology + HelmCommand + Debug> Interpret<T> for MonitoringAlertingInter
}
}
impl<T: Topology> Serialize for MonitoringAlertingScore<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("MonitoringAlertingScore", 3)?;
// For now, just serialize basic info
state.serialize_field("monitor_type", "monitoring_system")?;
let channel_count = self.alert_channel_configs.len();
state.serialize_field("alert_channel_count", &channel_count)?;
state.serialize_field("namespace", &self.namespace)?;
state.end()
}
}