Compare commits

...

4 Commits

Author SHA1 Message Date
de3e7869f7 feat: added impl for kube prometheus monitor
All checks were successful
Run Check Script / check (push) Successful in 1m46s
2025-06-04 11:50:36 -04:00
57eabc9834 wip: added alert manager types for use with kube-prometheus alert manager 2025-06-04 09:31:17 -04:00
cd40660350 fix: Ensure idempotent deployment of Discord adapters
All checks were successful
Run Check Script / check (push) Successful in 1m49s
Run Check Script / check (pull_request) Successful in 1m46s
Updates `K8sAnywhereTopology` to use `OnceCell` for managing
Discord webhook adapter deployments. This prevents redundant installations
by ensuring that the deployment interpret for a given adapter
configuration is executed only once.

The internal state now uses a `HashMap` keyed by the alert receiver ID to
track and initialize each receiver's `OnceCell`.
2025-06-03 16:07:11 -04:00
2ca732cecd feat: added the steps to install discord-webhook-receiver for k8s anywhere topology if not already installed
All checks were successful
Run Check Script / check (push) Successful in 1m45s
Run Check Script / check (pull_request) Successful in 1m48s
2025-06-03 14:14:41 -04:00
8 changed files with 363 additions and 34 deletions

View File

@ -1,9 +1,9 @@
use std::{process::Command, sync::Arc};
use std::{collections::HashMap, process::Command, sync::Arc};
use async_trait::async_trait;
use inquire::Confirm;
use log::{info, warn};
use tokio::sync::OnceCell;
use tokio::sync::{Mutex, OnceCell};
use crate::{
executors::ExecutorError,
@ -17,6 +17,7 @@ use crate::{
use super::{
HelmCommand, K8sclient, Topology,
k8s::K8sClient,
oberservability::monitoring::AlertReceiver,
tenant::{
ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager,
},
@ -37,6 +38,7 @@ enum K8sSource {
/// Topology that holds once-initialized Kubernetes state, a once-initialized
/// tenant manager, and a registry of alert receivers.
pub struct K8sAnywhereTopology {
/// Set at most once; the inner `Option` allows the state to be absent.
k8s_state: OnceCell<Option<K8sState>>,
/// Set at most once.
tenant_manager: OnceCell<K8sTenantManager>,
/// Alert receivers keyed by a `String`; each entry has its own `OnceCell`,
/// and the whole map is guarded by a mutex.
pub alert_receivers: Mutex<HashMap<String, OnceCell<AlertReceiver>>>,
}
#[async_trait]
@ -61,6 +63,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: OnceCell::new(),
tenant_manager: OnceCell::new(),
alert_receivers: Mutex::new(HashMap::new()),
}
}

View File

@ -1,7 +1,8 @@
use async_trait::async_trait;
use dyn_clone::DynClone;
use serde::Serialize;
use std::fmt::Debug;
use url::Url;
use crate::interpret::InterpretError;
@ -13,19 +14,20 @@ use crate::{interpret::Outcome, topology::Topology};
/// monitoring data, enabling consistent processing regardless of the underlying data source.
#[async_trait]
pub trait Monitor<T: Topology>: Debug + Send + Sync {
// NOTE(review): each method appears twice below — once taking
// `alert_receivers: Vec<AlertReceiver>` and once taking only the topology.
// This looks like a diff listing with removed and added lines interleaved;
// confirm which signature is current before relying on this text.

// Deploys the monitor on `topology`.
async fn deploy_monitor(
&self,
topology: &T,
alert_receivers: Vec<AlertReceiver>,
) -> Result<Outcome, InterpretError>;
async fn deploy_monitor(&self, topology: &T) -> Result<Outcome, InterpretError>;
// Deletes the monitor from the topology.
async fn delete_monitor(
&self,
topolgy: &T,
alert_receivers: Vec<AlertReceiver>,
) -> Result<Outcome, InterpretError>;
async fn delete_monitor(&self, topolgy: &T) -> Result<Outcome, InterpretError>;
}
/// An alert receiver that can be deployed on a topology of type `T`.
///
/// The `DynClone` supertrait, together with the `clone_trait_object!`
/// invocation that follows the trait, lets boxed trait objects be cloned.
#[async_trait]
pub trait AlertReceiverDeployment<T: Topology>: Debug + DynClone + Send + Sync {
/// Deploys this alert receiver on `topology`.
async fn deploy_alert_receiver(&self, topology: &T) -> Result<Outcome, InterpretError>;
}
// Implements `Clone` for `Box<dyn AlertReceiverDeployment<T>>`.
dyn_clone::clone_trait_object!(<T> AlertReceiverDeployment<T>);
/// Record describing an alert receiver and whether it has been installed.
#[derive(Debug, Clone, Serialize)]
pub struct AlertReceiver {
/// Identifier of the receiver.
pub receiver_id: String,
/// `true` once the receiver has been installed.
pub receiver_installed: bool,
}

View File

@ -0,0 +1,102 @@
use serde::{Deserialize, Serialize};
use url::Url;
/// Top-level values document containing a single `alertmanager` section.
// NOTE(review): presumably serialized as Helm values for kube-prometheus — confirm against the caller.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManagerValues {
/// The `alertmanager` section.
pub alertmanager: AlertManager,
}
/// The `alertmanager` section: an enable flag plus its configuration.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManager {
/// Whether Alertmanager is enabled.
pub enabled: bool,
/// Routing and receiver configuration.
pub config: AlertManagerConfig,
}
/// Bundle describing one alert channel: its receiver, the route that targets
/// it, and optional global settings.
#[derive(Clone, Debug, Serialize, Deserialize)]
pub struct AlertChannelConfig {
/// Receiver definition for the channel.
pub receiver: AlertChannelReceiver,
/// Route that directs alerts to the receiver.
pub route: AlertChannelRoute,
/// Optional global settings; serialized even when `None`
/// (no `skip_serializing_if` is applied to this field).
pub global_config: Option<AlertChannelGlobalConfig>,
}
/// A named receiver with optional Slack and webhook configurations.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertChannelReceiver {
/// Receiver name; routes refer to a receiver by this string.
pub name: String,
/// Slack configurations; omitted from the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub slack_configs: Option<Vec<SlackConfig>>,
/// Webhook configurations; omitted from the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub webhook_configs: Option<Vec<WebhookConfig>>,
}
/// Root routing settings: grouping parameters plus the list of child routes.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManagerRoute {
/// Label names used to group alerts.
pub group_by: Vec<String>,
/// Duration string such as `"30s"`.
pub group_wait: String,
/// Duration string such as `"5m"`.
pub group_interval: String,
/// Duration string such as `"12h"`.
pub repeat_interval: String,
/// Child routes.
pub routes: Vec<AlertChannelRoute>,
}
/// Global settings shared across receivers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertChannelGlobalConfig {
/// Slack API URL; omitted from the serialized output when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub slack_api_url: Option<Url>,
}
/// Settings for one Slack notification target.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SlackConfig {
/// Slack channel to post to.
pub channel: String,
/// Whether resolved alerts are also sent.
pub send_resolved: bool,
/// Message title.
pub title: String,
/// Message body text.
pub text: String,
}
/// Settings for one webhook notification target.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WebhookConfig {
/// Endpoint the notification is sent to.
pub url: Url,
/// Whether resolved alerts are also sent.
pub send_resolved: bool,
}
/// A child route: which receiver gets alerts that satisfy the matchers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertChannelRoute {
/// Name of the target receiver.
pub receiver: String,
/// Matcher expressions, e.g. `"alertname=Watchdog"`.
pub matchers: Vec<String>,
/// Raw identifier because `continue` is a Rust keyword; falls back to
/// `false` when the key is absent during deserialization.
#[serde(default)]
pub r#continue: bool,
}
/// Alertmanager configuration: optional globals, the root route, and receivers.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManagerConfig {
/// Optional global settings; serialized even when `None`
/// (no `skip_serializing_if` is applied to this field).
pub global: Option<AlertChannelGlobalConfig>,
/// Root route.
pub route: AlertManagerRoute,
/// All receivers that routes may refer to.
pub receivers: Vec<AlertChannelReceiver>,
}
/// Baseline values: Alertmanager enabled, alerts grouped by `job`, and a single
/// `null` receiver that absorbs the `Watchdog` alert.
///
/// Implemented through the standard [`Default`] trait rather than an inherent
/// `default()` method, so `AlertManagerValues::default()` keeps working while
/// the type also becomes usable with `..Default::default()` and generic code
/// bounded on `Default`.
impl Default for AlertManagerValues {
    fn default() -> Self {
        Self {
            alertmanager: AlertManager {
                enabled: true,
                config: AlertManagerConfig {
                    global: None,
                    route: AlertManagerRoute {
                        group_by: vec!["job".to_string()],
                        group_wait: "30s".to_string(),
                        group_interval: "5m".to_string(),
                        repeat_interval: "12h".to_string(),
                        // Send the Watchdog alert to the receiver named "null".
                        routes: vec![AlertChannelRoute {
                            receiver: "null".to_string(),
                            matchers: vec!["alertname=Watchdog".to_string()],
                            r#continue: false,
                        }],
                    },
                    // The "null" receiver has no Slack or webhook targets.
                    receivers: vec![AlertChannelReceiver {
                        name: "null".to_string(),
                        slack_configs: None,
                        webhook_configs: None,
                    }],
                },
            },
        }
    }
}

View File

@ -1,55 +1,168 @@
use super::{
discord_alert_manager::discord_alert_manager_score, kube_prometheus_monitor::AlertManagerConfig,
};
use async_trait::async_trait;
use serde_json::Value;
use serde::Serialize;
use serde_yaml::Value;
use tokio::sync::OnceCell;
use url::Url;
use crate::{
interpret::{InterpretError, Outcome},
topology::K8sAnywhereTopology,
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{
HelmCommand, K8sAnywhereTopology, Topology,
oberservability::monitoring::{AlertReceiver, AlertReceiverDeployment},
},
};
#[derive(Debug, Clone)]
// Lets a `DiscordWebhookConfig` be deployed as an alert receiver on any
// topology that implements `DiscordWebhookReceiver`.
#[async_trait]
impl<T: Topology + DiscordWebhookReceiver> AlertReceiverDeployment<T> for DiscordWebhookConfig {
    async fn deploy_alert_receiver(&self, topology: &T) -> Result<Outcome, InterpretError> {
        // The topology method takes the config by value, so hand it a copy.
        let config = self.clone();
        topology.deploy_discord_webhook_receiver(config).await
    }
}
/// Configuration for one Discord webhook alert receiver.
#[derive(Debug, Clone, Serialize)]
pub struct DiscordWebhookConfig {
/// Discord webhook URL.
pub webhook_url: Url,
/// Receiver name.
pub name: String,
/// Whether notifications are also sent for resolved alerts.
pub send_resolved_notifications: bool,
}
// Capability trait for topologies that can deploy a Discord webhook receiver.
//
// NOTE(review): this listing interleaves two versions of the trait — a sync
// `fn` form with a `_notification_adapter_id` parameter and a delete method,
// and an `async fn` form taking only the config. Confirm which lines are
// current before relying on this text.
#[async_trait]
pub trait DiscordWebhookReceiver {
fn deploy_discord_webhook_receiver(
async fn deploy_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
config: DiscordWebhookConfig,
) -> Result<Outcome, InterpretError>;
fn delete_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
config: DiscordWebhookConfig,
) -> Result<Outcome, InterpretError>;
}
// Trait used to generate Alertmanager config values.
// Intended usage: impl<T: Topology + AlertManagerConfig> Monitor for KubePrometheus
// NOTE(review): the same file also imports `AlertManagerConfig` from
// `kube_prometheus_monitor`; this local definition may be a removed diff line — confirm.
pub trait AlertManagerConfig<T> {
fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
}
// Alertmanager config generation for a Discord webhook receiver.
// Not implemented yet: the body is `todo!()` and panics if called.
// NOTE(review): both a sync and an async signature line appear below, which
// looks like interleaved diff output — confirm which one is current.
#[async_trait]
impl<T: DiscordWebhookReceiver> AlertManagerConfig<T> for DiscordWebhookConfig {
fn get_alert_manager_config(&self) -> Result<Value, InterpretError> {
async fn get_alert_manager_config(&self) -> Result<Value, InterpretError> {
todo!()
}
}
// Deploys a Discord webhook receiver at most once per receiver name.
//
// NOTE(review): this listing interleaves old and new diff lines (two
// signature lines, a `_notification_adapter_id` parameter, a leading
// `todo!()`, and a sync delete method). Confirm which lines are current.
#[async_trait]
impl DiscordWebhookReceiver for K8sAnywhereTopology {
fn deploy_discord_webhook_receiver(
async fn deploy_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
config: DiscordWebhookConfig,
) -> Result<Outcome, InterpretError> {
todo!()
// Receivers are tracked under their configured name.
let receiver_key = config.name.clone();
// The guard is not released before the function returns, so the map stays
// locked across the initialization await below.
let mut adapters_map_guard = self.alert_receivers.lock().await;
// Fetch the cell for this receiver, creating an empty one on first use.
let cell = adapters_map_guard
.entry(receiver_key.clone())
.or_insert_with(OnceCell::new);
// Already initialized: report success without deploying again.
if let Some(initialized_receiver) = cell.get() {
return Ok(Outcome::success(format!(
"Discord Webhook adapter for '{}' already initialized.",
initialized_receiver.receiver_id
)));
}
// Run the deployment and store its result in the cell; an error is
// propagated by `?`.
let final_state = cell
.get_or_try_init(|| async {
initialize_discord_webhook_receiver(config.clone(), self).await
})
.await?;
Ok(Outcome::success(format!(
"Discord Webhook Receiver for '{}' ensured/initialized.",
final_state.receiver_id
)))
}
// Not implemented: panics if called.
fn delete_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
_config: DiscordWebhookConfig,
) -> Result<Outcome, InterpretError> {
todo!()
}
}
async fn initialize_discord_webhook_receiver(
conf: DiscordWebhookConfig,
topology: &K8sAnywhereTopology,
) -> Result<AlertReceiver, InterpretError> {
println!(
"Attempting to initialize Discord adapter for: {}",
conf.name
);
let score = DiscordWebhookReceiverScore {
config: conf.clone(),
};
let inventory = Inventory::autoload();
let interpret = score.create_interpret();
interpret.execute(&inventory, topology).await?;
Ok(AlertReceiver {
receiver_id: conf.name,
receiver_installed: true,
})
}
/// Score wrapping the configuration of one Discord webhook receiver.
#[derive(Debug, Clone, Serialize)]
struct DiscordWebhookReceiverScore {
/// Receiver configuration passed on to the interpret.
config: DiscordWebhookConfig,
}
impl<T: Topology + HelmCommand> Score<T> for DiscordWebhookReceiverScore {
    /// Creates the interpret that carries a copy of this score's configuration.
    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
        let config = self.config.clone();
        Box::new(DiscordWebhookReceiverScoreInterpret { config })
    }

    /// Fixed name identifying this score.
    fn name(&self) -> String {
        String::from("DiscordWebhookReceiverScore")
    }
}
/// Interpret created by `DiscordWebhookReceiverScore`; holds the receiver configuration.
#[derive(Debug)]
struct DiscordWebhookReceiverScoreInterpret {
/// Receiver configuration used when executing.
config: DiscordWebhookConfig,
}
#[async_trait]
impl<T: Topology + HelmCommand> Interpret<T> for DiscordWebhookReceiverScoreInterpret {
    /// Builds the Discord alert manager score from the stored config and runs
    /// its interpret against `topology`.
    async fn execute(
        &self,
        inventory: &Inventory,
        topology: &T,
    ) -> Result<Outcome, InterpretError> {
        let webhook_url = self.config.webhook_url.clone();
        let name = self.config.name.clone();
        // NOTE(review): the receiver name is passed for both the second and
        // third arguments — confirm against `discord_alert_manager_score`.
        discord_alert_manager_score(webhook_url, name.clone(), name)
            .create_interpret()
            .execute(inventory, topology)
            .await
    }

    // The metadata accessors below are not implemented yet and panic if called.

    fn get_name(&self) -> InterpretName {
        todo!()
    }

    fn get_version(&self) -> Version {
        todo!()
    }

    fn get_status(&self) -> InterpretStatus {
        todo!()
    }

    fn get_children(&self) -> Vec<Id> {
        todo!()
    }
}

View File

@ -0,0 +1,108 @@
use async_trait::async_trait;
use serde::Serialize;
use serde_yaml::Value;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{
HelmCommand, Topology,
oberservability::monitoring::{AlertReceiverDeployment, Monitor},
},
};
use super::{
config::KubePrometheusConfig, kube_prometheus_helm_chart::kube_prometheus_helm_chart_score,
};
/// Monitor made of a kube-prometheus configuration and the alert receivers
/// to deploy with it.
#[derive(Debug, Clone)]
pub struct KubePrometheus<T> {
/// Alert receivers, stored as boxed trait objects.
alert_receivers: Vec<Box<dyn AlertReceiverDeployment<T>>>,
/// Configuration used to build the kube-prometheus score.
config: KubePrometheusConfig,
}
/// Implemented by types that can produce an Alertmanager configuration value.
// NOTE(review): the type parameter `T` is not used by the method signature — confirm its intended role.
#[async_trait]
pub trait AlertManagerConfig<T> {
/// Returns the configuration as a `serde_yaml::Value`.
async fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
}
impl<T: Topology> KubePrometheus<T> {
    /// Creates a monitor with no alert receivers and a fresh
    /// `KubePrometheusConfig`.
    pub fn new() -> Self {
        Self {
            alert_receivers: Vec::new(),
            config: KubePrometheusConfig::new(),
        }
    }
}

/// `Default` mirrors [`KubePrometheus::new`], since the constructor takes no
/// arguments.
impl<T: Topology> Default for KubePrometheus<T> {
    fn default() -> Self {
        Self::new()
    }
}
#[async_trait]
impl<T: Topology + HelmCommand + std::fmt::Debug> Monitor<T> for KubePrometheus<T> {
async fn deploy_monitor(&self, topology: &T) -> Result<Outcome, InterpretError> {
for alert_receiver in &self.alert_receivers {
alert_receiver.deploy_alert_receiver(topology).await?;
}
let score = KubePrometheusScore {
config: self.config.clone(),
};
let inventory = Inventory::autoload();
score.create_interpret().execute(&inventory, topology).await
}
async fn delete_monitor(&self, _topolgy: &T) -> Result<Outcome, InterpretError> {
todo!()
}
}
/// Score wrapping the kube-prometheus configuration.
#[derive(Debug, Clone, Serialize)]
struct KubePrometheusScore {
/// Configuration passed to the Helm chart score.
config: KubePrometheusConfig,
}
impl<T: Topology + HelmCommand> Score<T> for KubePrometheusScore {
    /// Creates the interpret that carries a copy of this score.
    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
        Box::new(KubePromethusScoreInterpret {
            score: self.clone(),
        })
    }

    /// Fixed name identifying this score.
    ///
    /// Previously `todo!()`, which panicked whenever the name was requested;
    /// it now follows the same convention as `DiscordWebhookReceiverScore`.
    fn name(&self) -> String {
        "KubePrometheusScore".to_string()
    }
}
/// Interpret created by `KubePrometheusScore`; holds a copy of the score.
#[derive(Debug, Clone, Serialize)]
struct KubePromethusScoreInterpret {
/// The score whose configuration is used when executing.
score: KubePrometheusScore,
}
#[async_trait]
impl<T: Topology + HelmCommand> Interpret<T> for KubePromethusScoreInterpret {
    /// Builds the kube-prometheus Helm chart score from the stored config and
    /// runs its interpret against `topology`.
    async fn execute(
        &self,
        inventory: &Inventory,
        topology: &T,
    ) -> Result<Outcome, InterpretError> {
        let config = &self.score.config;
        kube_prometheus_helm_chart_score(config)
            .create_interpret()
            .execute(inventory, topology)
            .await
    }

    // The metadata accessors below are not implemented yet and panic if called.

    fn get_name(&self) -> InterpretName {
        todo!()
    }

    fn get_version(&self) -> Version {
        todo!()
    }

    fn get_status(&self) -> InterpretStatus {
        todo!()
    }

    fn get_children(&self) -> Vec<Id> {
        todo!()
    }
}

View File

@ -1,5 +1,7 @@
pub mod alertmanager_types;
mod config;
mod discord_alert_manager;
pub mod discord_webhook_sender;
mod kube_prometheus;
mod kube_prometheus_helm_chart;
pub mod kube_prometheus_monitor;
pub mod monitoring_alerting;

View File

@ -14,8 +14,7 @@ use crate::{
};
use super::{
config::KubePrometheusConfig, discord_alert_manager::discord_alert_manager_score,
kube_prometheus::kube_prometheus_helm_chart_score,
config::KubePrometheusConfig, kube_prometheus_helm_chart::kube_prometheus_helm_chart_score,
};
#[derive(Debug, Clone, Serialize)]