feat/monitoring_alerting #61

Merged
johnride merged 12 commits from feat/monitoring_alerting into master 2025-06-19 14:37:20 +00:00
16 changed files with 256 additions and 330 deletions
Showing only changes of commit 8d219c648a - Show all commits

View File

@ -0,0 +1,11 @@
# Manifest for the `monitoring` example binary (demonstrates the
# MonitoringAlertingScore against a K8sAnywhereTopology).
[package]
name = "monitoring"
edition = "2024"
# Shared metadata inherited from the workspace root manifest.
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
# Path dependencies on the sibling workspace crates.
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@ -0,0 +1,23 @@
use harmony::{
inventory::Inventory,
maestro::Maestro,
modules::monitoring::monitoring_alerting::MonitoringAlertingScore,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
    // Bootstrap a maestro for the "any Kubernetes" topology. Both the
    // inventory and the topology configuration are discovered from the
    // environment. `expect` (rather than a bare `unwrap`) gives the operator
    // a meaningful message if bootstrap fails at startup.
    let mut maestro = Maestro::<K8sAnywhereTopology>::initialize(
        Inventory::autoload(),
        K8sAnywhereTopology::from_env(),
    )
    .await
    .expect("failed to initialize Maestro for K8sAnywhereTopology");

    // Monitoring score with no additional alert channels configured.
    let monitoring = MonitoringAlertingScore {
        alert_channel_configs: None,
    };
    maestro.register_all(vec![Box::new(monitoring)]);

    // Hand control over to the harmony CLI front-end.
    harmony_cli::init(maestro, None)
        .await
        .expect("harmony CLI exited with an error");
}

View File

@ -6,7 +6,6 @@ use log::{debug, info, warn};
use tokio::sync::OnceCell;
use crate::{
data::Id,
executors::ExecutorError,
interpret::{InterpretError, Outcome},
inventory::Inventory,
@ -18,9 +17,12 @@ use crate::{
use super::{
HelmCommand, K8sclient, Topology,
k8s::K8sClient,
tenant::{
ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager,
oberservability::{
K8sMonitorConfig,
k8s::K8sMonitor,
monitoring::{AlertChannelConfig, Monitor},
},
tenant::{TenantConfig, TenantManager, k8s::K8sTenantManager},
};
struct K8sState {
@ -38,6 +40,7 @@ enum K8sSource {
/// Topology targeting whatever Kubernetes cluster can be found or installed.
/// The `OnceCell` fields are populated lazily by the `ensure_*` helpers on
/// first use and then reused for the lifetime of the topology.
pub struct K8sAnywhereTopology {
    // Detected cluster state; `Some(None)` in the cell means detection ran
    // but found no usable cluster. -- TODO confirm against K8sState docs
    k8s_state: OnceCell<Option<K8sState>>,
    // Lazily-built tenant manager, see `ensure_k8s_tenant_manager`.
    tenant_manager: OnceCell<K8sTenantManager>,
    // Lazily-built monitor, see `ensure_k8s_monitor` / `get_k8s_monitor`.
    k8s_monitor: OnceCell<K8sMonitor>,
    // Static configuration, loaded from the environment in `new()`.
    config: K8sAnywhereConfig,
}
@ -63,6 +66,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: OnceCell::new(),
tenant_manager: OnceCell::new(),
k8s_monitor: OnceCell::new(),
config: K8sAnywhereConfig::from_env(),
}
}
@ -71,6 +75,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: OnceCell::new(),
tenant_manager: OnceCell::new(),
k8s_monitor: OnceCell::new(),
config,
}
}
@ -201,6 +206,30 @@ impl K8sAnywhereTopology {
)),
}
}
/// Lazily initializes the shared `K8sMonitor` with the default
/// cluster-monitor configuration.
///
/// Idempotent: `get_or_try_init` returns the existing value when the cell
/// is already set, so no separate `get()` guard is needed. Initialization
/// errors are propagated to the caller instead of panicking (the original
/// `unwrap()` defeated the purpose of returning `Result`).
async fn ensure_k8s_monitor(&self) -> Result<(), String> {
    self.k8s_monitor
        .get_or_try_init(|| async {
            Ok::<K8sMonitor, String>(K8sMonitor {
                config: K8sMonitorConfig::cluster_monitor(),
            })
        })
        .await?;
    Ok(())
}
/// Returns the initialized monitor, or an error if `ensure_k8s_monitor`
/// has not been awaited yet.
fn get_k8s_monitor(&self) -> Result<&K8sMonitor, ExecutorError> {
    // `ok_or_else` replaces the manual Some/None match; the closure form
    // avoids building the error string on the success path.
    self.k8s_monitor.get().ok_or_else(|| {
        ExecutorError::UnexpectedError("K8sMonitor not available".to_string())
    })
}
}
pub struct K8sAnywhereConfig {
@ -252,6 +281,10 @@ impl Topology for K8sAnywhereTopology {
"No K8s client could be found or installed".to_string(),
))?;
self.ensure_k8s_monitor()
.await
.map_err(|e| InterpretError::new(e))?;
self.ensure_k8s_tenant_manager()
.await
.map_err(|e| InterpretError::new(e))?;
@ -276,3 +309,20 @@ impl TenantManager for K8sAnywhereTopology {
.await
}
}
#[async_trait]
impl Monitor for K8sAnywhereTopology {
    /// Delegates provisioning to the lazily-initialized `K8sMonitor`.
    /// Fails if `ensure_k8s_monitor` has not run yet (see `get_k8s_monitor`).
    async fn provision_monitor<T: Topology + HelmCommand>(
        &self,
        inventory: &Inventory,
        topology: &T,
        alert_receivers: Option<Vec<Box<dyn AlertChannelConfig>>>,
    ) -> Result<Outcome, InterpretError> {
        self.get_k8s_monitor()?
            .provision_monitor(inventory, topology, alert_receivers)
            .await
    }

    // NOTE(review): not implemented — calling this currently panics via
    // `todo!()`. Callers should not rely on it yet.
    fn delete_monitor(&self) -> Result<Outcome, InterpretError> {
        todo!()
    }
}

View File

@ -0,0 +1,71 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde::Serialize;
use crate::score::Score;
use crate::topology::HelmCommand;
use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
topology::Topology,
};
use super::{
K8sMonitorConfig,
monitoring::{AlertChannel, AlertChannelConfig, Monitor},
};
/// Kubernetes-backed implementation of the `Monitor` trait; deploys the
/// monitoring stack described by its `K8sMonitorConfig` (a Helm chart).
#[derive(Debug, Clone, Serialize)]
pub struct K8sMonitor {
    pub config: K8sMonitorConfig,
}
#[async_trait]
impl Monitor for K8sMonitor {
    /// Provisions the monitoring stack: registers any requested alert
    /// channels first, then installs the configured Helm chart via its
    /// interpret.
    async fn provision_monitor<T: Topology + HelmCommand>(
        &self,
        inventory: &Inventory,
        topology: &T,
        alert_channels: Option<Vec<Box<dyn AlertChannelConfig>>>,
    ) -> Result<Outcome, InterpretError> {
        if let Some(channels) = alert_channels {
            // Build concrete channels from their configs, then register each
            // one; the first failure aborts provisioning.
            for channel in self.build_alert_channels(channels).await? {
                channel.register_alert_channel().await?;
            }
        }
        // `Score::create_interpret` takes `&self`, so cloning the whole
        // HelmChartScore here was unnecessary work.
        self.config
            .chart
            .create_interpret()
            .execute(inventory, topology)
            .await?;
        Ok(Outcome::success("installed monitor".to_string()))
    }

    // NOTE(review): not implemented — panics via `todo!()`.
    fn delete_monitor(&self) -> Result<Outcome, InterpretError> {
        todo!()
    }
}
#[async_trait]
impl AlertChannelConfig for K8sMonitor {
    // NOTE(review): stub — it is unclear why the monitor itself should act
    // as an alert-channel config; calling this panics via `todo!()`.
    async fn build_alert_channel(&self) -> Result<Box<dyn AlertChannel>, InterpretError> {
        todo!()
    }
}
impl K8sMonitor {
    /// Turns each alert-channel configuration into a concrete channel,
    /// preserving input order. Stops at (and returns) the first
    /// configuration that fails to build.
    pub async fn build_alert_channels(
        &self,
        alert_channel_configs: Vec<Box<dyn AlertChannelConfig>>,
    ) -> Result<Vec<Box<dyn AlertChannel>>, InterpretError> {
        let mut channels = Vec::with_capacity(alert_channel_configs.len());
        for cfg in alert_channel_configs {
            channels.push(cfg.build_alert_channel().await?);
        }
        Ok(channels)
    }
}

View File

@ -1 +1,23 @@
use serde::Serialize;
use crate::modules::{
helm::chart::HelmChartScore,
monitoring::kube_prometheus::kube_prometheus_helm_chart_score::kube_prometheus_helm_chart_score,
};
pub mod k8s;
pub mod monitoring;
/// Configuration for a `K8sMonitor` — currently just the Helm chart that
/// deploys the monitoring stack.
#[derive(Debug, Clone, Serialize)]
pub struct K8sMonitorConfig {
    // TODO(review): holding a full HelmChartScore couples this config
    // directly to Helm; a more abstract deployment description may be
    // preferable long-term.
    pub chart: HelmChartScore,
}
impl K8sMonitorConfig {
    /// Default whole-cluster monitoring configuration, backed by the
    /// kube-prometheus Helm chart with its built-in defaults.
    pub fn cluster_monitor() -> Self {
        Self {
            chart: kube_prometheus_helm_chart_score(),
        }
    }
}

View File

@ -1,30 +1,39 @@
use async_trait::async_trait;
use dyn_clone::DynClone;
use std::fmt::Debug;
use crate::executors::ExecutorError;
use crate::interpret::InterpretError;
use crate::inventory::Inventory;
use crate::topology::HelmCommand;
use crate::{interpret::Outcome, topology::Topology};
/// Represents an entity responsible for collecting and organizing observability data
/// from various telemetry sources
/// from various telemetry sources such as Prometheus or Datadog
/// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
/// monitoring data, enabling consistent processing regardless of the underlying data source.
#[async_trait]
pub trait Monitor<T: Topology>: Debug + Send + Sync {
async fn deploy_monitor(
pub trait Monitor {
async fn provision_monitor<T: Topology + HelmCommand>(
&self,
inventory: &Inventory,
topology: &T,
alert_receivers: Vec<AlertReceiver>,
alert_receivers: Option<Vec<Box<dyn AlertChannelConfig>>>,
) -> Result<Outcome, InterpretError>;
async fn delete_monitor(
&self,
topolgy: &T,
alert_receivers: Vec<AlertReceiver>,
) -> Result<Outcome, InterpretError>;
fn delete_monitor(&self) -> Result<Outcome, InterpretError>;
}
pub struct AlertReceiver {
pub receiver_id: String,
#[async_trait]
pub trait AlertChannel: Debug + Send + Sync {
    /// Registers this channel with the monitoring backend so that alerts
    /// can be routed to it.
    async fn register_alert_channel(&self) -> Result<Outcome, ExecutorError>;
    // NOTE(review): channel-id lookup is sketched but not yet part of the API.
    //async fn get_channel_id(&self) -> String;
}
/// Factory for alert channels. `DynClone` makes `Box<dyn AlertChannelConfig>`
/// cloneable, which scores that carry configs rely on.
#[async_trait]
pub trait AlertChannelConfig: Debug + Send + Sync + DynClone {
    /// Builds the concrete `AlertChannel` described by this configuration.
    async fn build_alert_channel(&self) -> Result<Box<dyn AlertChannel>, InterpretError>;
}
// Generates the `Clone` impl for `Box<dyn AlertChannelConfig>`.
dyn_clone::clone_trait_object!(AlertChannelConfig);

View File

@ -0,0 +1,8 @@
use url::Url;
/// Alert channel that posts notifications to a Discord webhook.
#[derive(Debug, Clone)]
pub struct DiscordWebhookAlertChannel {
    // Destination webhook endpoint.
    pub webhook_url: Url,
    // Human-readable channel name used in receiver configuration.
    pub name: String,
    // Whether to also notify when a firing alert resolves.
    pub send_resolved_notifications: bool,
}

View File

@ -0,0 +1 @@
pub mod discord_alert_channel;

View File

@ -1,35 +0,0 @@
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use url::Url;
use crate::modules::helm::chart::HelmChartScore;
pub fn discord_alert_manager_score(
webhook_url: Url,
namespace: String,
name: String,
) -> HelmChartScore {
let values = format!(
r#"
environment:
- name: "DISCORD_WEBHOOK"
value: "{webhook_url}"
"#,
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str(&name).unwrap(),
chart_name: NonBlankString::from_str(
"oci://hub.nationtech.io/library/alertmanager-discord",
)
.unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: true,
repository: None,
}
}

View File

@ -1,55 +0,0 @@
use async_trait::async_trait;
use serde_json::Value;
use url::Url;
use crate::{
interpret::{InterpretError, Outcome},
topology::K8sAnywhereTopology,
};
#[derive(Debug, Clone)]
pub struct DiscordWebhookConfig {
pub webhook_url: Url,
pub name: String,
pub send_resolved_notifications: bool,
}
pub trait DiscordWebhookReceiver {
fn deploy_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError>;
fn delete_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError>;
}
// trait used to generate alert manager config values impl<T: Topology + AlertManagerConfig> Monitor for KubePrometheus
pub trait AlertManagerConfig<T> {
fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
}
#[async_trait]
impl<T: DiscordWebhookReceiver> AlertManagerConfig<T> for DiscordWebhookConfig {
fn get_alert_manager_config(&self) -> Result<Value, InterpretError> {
todo!()
}
}
#[async_trait]
impl DiscordWebhookReceiver for K8sAnywhereTopology {
fn deploy_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn delete_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError> {
todo!()
}
}

View File

@ -1,6 +1,6 @@
use serde::Serialize;
use super::monitoring_alerting::AlertChannel;
use super::types::AlertManagerChannelConfig;
#[derive(Debug, Clone, Serialize)]
pub struct KubePrometheusConfig {
@ -21,7 +21,7 @@ pub struct KubePrometheusConfig {
pub kube_proxy: bool,
pub kube_state_metrics: bool,
pub prometheus_operator: bool,
pub alert_channel: Vec<AlertChannel>,
pub alert_channels: Vec<AlertManagerChannelConfig>,
}
impl KubePrometheusConfig {
pub fn new() -> Self {
@ -30,7 +30,6 @@ impl KubePrometheusConfig {
default_rules: true,
windows_monitoring: false,
alert_manager: true,
alert_channel: Vec::new(),
grafana: true,
node_exporter: false,
prometheus: true,
@ -44,6 +43,7 @@ impl KubePrometheusConfig {
prometheus_operator: true,
core_dns: false,
kube_scheduler: false,
alert_channels: Vec::new(),
}
}
}

View File

@ -1,12 +1,13 @@
use super::{config::KubePrometheusConfig, monitoring_alerting::AlertChannel};
use super::config::KubePrometheusConfig;
use log::info;
use non_blank_string_rs::NonBlankString;
use std::{collections::HashMap, str::FromStr};
use url::Url;
use std::str::FromStr;
use crate::modules::helm::chart::HelmChartScore;
pub fn kube_prometheus_helm_chart_score(config: &KubePrometheusConfig) -> HelmChartScore {
pub fn kube_prometheus_helm_chart_score() -> HelmChartScore {
let config = KubePrometheusConfig::new();
//TODO this should be make into a rule with default formatting that can be easily passed as a vec
//to the overrides or something leaving the user to deal with formatting here seems bad
let default_rules = config.default_rules.to_string();
@ -144,67 +145,6 @@ prometheus:
enabled: {prometheus}
"#,
);
let alertmanager_config = alert_manager_yaml_builder(&config);
values.push_str(&alertmanager_config);
fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String {
let mut receivers = String::new();
let mut routes = String::new();
let mut global_configs = String::new();
let alert_manager = config.alert_manager;
for alert_channel in &config.alert_channel {
match alert_channel {
AlertChannel::Discord { name, .. } => {
let (receiver, route) = discord_alert_builder(name);
info!("discord receiver: {} \nroute: {}", receiver, route);
receivers.push_str(&receiver);
routes.push_str(&route);
}
AlertChannel::Slack {
slack_channel,
webhook_url,
} => {
let (receiver, route) = slack_alert_builder(slack_channel);
info!("slack receiver: {} \nroute: {}", receiver, route);
receivers.push_str(&receiver);
routes.push_str(&route);
let global_config = format!(
r#"
global:
slack_api_url: {webhook_url}"#
);
global_configs.push_str(&global_config);
}
AlertChannel::Smpt { .. } => todo!(),
}
}
info!("after alert receiver: {}", receivers);
info!("after alert routes: {}", routes);
let alertmanager_config = format!(
r#"
alertmanager:
enabled: {alert_manager}
config: {global_configs}
route:
group_by: ['job']
group_wait: 30s
group_interval: 5m
repeat_interval: 12h
routes:
{routes}
receivers:
- name: 'null'
{receivers}"#
);
info!("alert manager config: {}", alertmanager_config);
alertmanager_config
}
HelmChartScore {
namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()),
release_name: NonBlankString::from_str("kube-prometheus").unwrap(),
@ -220,43 +160,3 @@ alertmanager:
repository: None,
}
}
fn discord_alert_builder(release_name: &String) -> (String, String) {
let discord_receiver_name = format!("Discord-{}", release_name);
let receiver = format!(
r#"
- name: '{discord_receiver_name}'
webhook_configs:
- url: 'http://{release_name}-alertmanager-discord:9094'
send_resolved: true"#,
);
let route = format!(
r#"
- receiver: '{discord_receiver_name}'
matchers:
- alertname!=Watchdog
continue: true"#,
);
(receiver, route)
}
fn slack_alert_builder(slack_channel: &String) -> (String, String) {
let slack_receiver_name = format!("Slack-{}", slack_channel);
let receiver = format!(
r#"
- name: '{slack_receiver_name}'
slack_configs:
- channel: '{slack_channel}'
send_resolved: true
title: '{{{{ .CommonAnnotations.title }}}}'
text: '{{{{ .CommonAnnotations.description }}}}'"#,
);
let route = format!(
r#"
- receiver: '{slack_receiver_name}'
matchers:
- alertname!=Watchdog
continue: true"#,
);
(receiver, route)
}

View File

@ -0,0 +1,3 @@
pub mod config;
pub mod kube_prometheus_helm_chart_score;
pub mod types;

View File

@ -0,0 +1,14 @@
use serde::Serialize;
/// Typed Alertmanager channel configuration: the global settings, routing
/// rule, and receiver definition that together describe one alert channel.
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelConfig {
    pub global_configs: AlertManagerChannelGlobalConfigs,
    pub route: AlertManagerChannelRoute,
    pub receiver: AlertManagerChannelReceiver,
}
// NOTE(review): the three structs below are empty placeholders — the actual
// Alertmanager fields (receiver name, matchers, webhook settings, …) are not
// modeled yet.
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelGlobalConfigs {}
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelReceiver {}
#[derive(Debug, Clone, Serialize)]
pub struct AlertManagerChannelRoute {}

View File

@ -1,5 +1,3 @@
mod config;
mod discord_alert_manager;
pub mod discord_webhook_sender;
mod kube_prometheus;
pub mod alert_channel;
pub mod kube_prometheus;
pub mod monitoring_alerting;

View File

@ -1,10 +1,7 @@
use async_trait::async_trait;
use email_address::EmailAddress;
use log::info;
use serde::Serialize;
use url::Url;
use crate::topology::oberservability::monitoring::{AlertChannelConfig, Monitor};
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
@ -13,134 +10,43 @@ use crate::{
topology::{HelmCommand, Topology},
};
use super::{
config::KubePrometheusConfig, discord_alert_manager::discord_alert_manager_score,
kube_prometheus::kube_prometheus_helm_chart_score,
};
#[derive(Debug, Clone, Serialize)]
pub enum AlertChannel {
Discord {
name: String,
webhook_url: Url,
},
Slack {
slack_channel: String,
webhook_url: Url,
},
//TODO test and implement in helm chart
//currently does not work
Smpt {
email_address: EmailAddress,
service_name: String,
},
pub struct MonitoringAlertingScore {
    // Skipped during serialization: trait objects are not serializable.
    // `None` means "deploy monitoring with no extra alert channels".
    #[serde(skip)]
    pub alert_channel_configs: Option<Vec<Box<dyn AlertChannelConfig>>>,
}
#[derive(Debug, Clone, Serialize)]
pub struct MonitoringAlertingStackScore {
pub alert_channel: Vec<AlertChannel>,
pub namespace: Option<String>,
}
impl MonitoringAlertingStackScore {
pub fn new() -> Self {
Self {
alert_channel: Vec::new(),
namespace: None,
}
}
}
impl<T: Topology + HelmCommand> Score<T> for MonitoringAlertingStackScore {
impl<T: Topology + HelmCommand + Monitor> Score<T> for MonitoringAlertingScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(MonitoringAlertingStackInterpret {
Box::new(MonitoringAlertingInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("MonitoringAlertingStackScore")
"MonitoringAlertingScore".to_string()
}
}
#[derive(Debug, Clone, Serialize)]
struct MonitoringAlertingStackInterpret {
score: MonitoringAlertingStackScore,
}
impl MonitoringAlertingStackInterpret {
async fn build_kube_prometheus_helm_chart_config(&self) -> KubePrometheusConfig {
let mut config = KubePrometheusConfig::new();
if let Some(ns) = &self.score.namespace {
config.namespace = ns.clone();
}
config.alert_channel = self.score.alert_channel.clone();
config
}
async fn deploy_kube_prometheus_helm_chart_score<T: Topology + HelmCommand>(
&self,
inventory: &Inventory,
topology: &T,
config: &KubePrometheusConfig,
) -> Result<Outcome, InterpretError> {
let helm_chart = kube_prometheus_helm_chart_score(config);
helm_chart
.create_interpret()
.execute(inventory, topology)
.await
}
async fn deploy_alert_channel_service<T: Topology + HelmCommand>(
&self,
inventory: &Inventory,
topology: &T,
config: &KubePrometheusConfig,
) -> Result<Outcome, InterpretError> {
//let mut outcomes = vec![];
//for channel in &self.score.alert_channel {
// let outcome = match channel {
// AlertChannel::Discord { .. } => {
// discord_alert_manager_score(config)
// .create_interpret()
// .execute(inventory, topology)
// .await
// }
// AlertChannel::Slack { .. } => Ok(Outcome::success(
// "No extra configs for slack alerting".to_string(),
// )),
// AlertChannel::Smpt { .. } => {
// todo!()
// }
// };
// outcomes.push(outcome);
//}
//for result in outcomes {
// result?;
//}
Ok(Outcome::success("All alert channels deployed".to_string()))
}
/// Interpret created by `MonitoringAlertingScore`; holds a copy of the score
/// whose settings drive `provision_monitor` on execute.
#[derive(Debug)]
struct MonitoringAlertingInterpret {
    score: MonitoringAlertingScore,
}
#[async_trait]
impl<T: Topology + HelmCommand> Interpret<T> for MonitoringAlertingStackInterpret {
impl<T: Topology + HelmCommand + Monitor> Interpret<T> for MonitoringAlertingInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let config = self.build_kube_prometheus_helm_chart_config().await;
info!("Built kube prometheus config");
info!("Installing kube prometheus chart");
self.deploy_kube_prometheus_helm_chart_score(inventory, topology, &config)
.await?;
info!("Installing alert channel service");
self.deploy_alert_channel_service(inventory, topology, &config)
.await?;
Ok(Outcome::success(format!(
"succesfully deployed monitoring and alerting stack"
)))
topology
.provision_monitor(
inventory,
topology,
self.score.alert_channel_configs.clone(),
)
.await
}
fn get_name(&self) -> InterpretName {