feat: setup basic structure to for the concrete implementation of kube prometheus monitor, removed discord webhook receiver trait as the dependency is no longer required for prometheus to interact with discord #52

Closed
wjro wants to merge 3 commits from feat/kube-prometheus into master
13 changed files with 246 additions and 289 deletions
Showing only changes of commit dbc66f3d0c - Show all commits

View File

@@ -2,10 +2,7 @@ use harmony::{
data::Version,
inventory::Inventory,
maestro::Maestro,
modules::{
lamp::{LAMPConfig, LAMPScore},
monitoring::monitoring_alerting::{AlertChannel, MonitoringAlertingStackScore},
},
modules::lamp::{LAMPConfig, LAMPScore},
topology::{K8sAnywhereTopology, Url},
};
@@ -43,13 +40,7 @@ async fn main() {
.await
.unwrap();
let url = url::Url::parse("https://discord.com/api/webhooks/dummy_channel/dummy_token")
.expect("invalid URL");
let mut monitoring_stack_score = MonitoringAlertingStackScore::new();
monitoring_stack_score.namespace = Some(lamp_stack.config.namespace.clone());
maestro.register_all(vec![Box::new(lamp_stack), Box::new(monitoring_stack_score)]);
maestro.register_all(vec![Box::new(lamp_stack)]);
// Here we bootstrap the CLI, this gives some nice features if you need them
harmony_cli::init(maestro, None).await.unwrap();
}

View File

@@ -1,14 +1,12 @@
use async_trait::async_trait;
use std::fmt::Debug;
use url::Url;
use crate::interpret::InterpretError;
use crate::{interpret::Outcome, topology::Topology};
/// Represents an entity responsible for collecting and organizing observability data
/// from various telemetry sources
/// from various telemetry sources such as Prometheus or Datadog
/// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
/// monitoring data, enabling consistent processing regardless of the underlying data source.
#[async_trait]
@@ -16,16 +14,25 @@ pub trait Monitor<T: Topology>: Debug + Send + Sync {
async fn deploy_monitor(
&self,
topology: &T,
alert_receivers: Vec<AlertReceiver>,
alert_receivers: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError>;
async fn delete_monitor(
fn delete_monitor(
&self,
topolgy: &T,
alert_receivers: Vec<AlertReceiver>,
alert_receivers: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError>;
}
pub struct AlertReceiver {
pub receiver_id: String,
pub trait MonitorConfig<T: Topology>: Debug + Send + Sync {
fn build_monitor(&self) -> Box<dyn Monitor<T>>;
}
pub trait AlertChannelConfig: std::fmt::Debug + Send + Sync {
fn build_alert_channel(&self) -> Box<dyn AlertChannel>;
fn channel_type(&self) -> String;
}
#[async_trait]
pub trait AlertChannel: Debug + Send + Sync {
async fn get_channel_id(&self) -> String;
}

View File

@@ -0,0 +1,52 @@
use async_trait::async_trait;
use serde::Serialize;
use url::Url;
use crate::{
modules::monitoring::kube_prometheus::{
kube_prometheus_monitor::PrometheusAlertChannel,
types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute},
},
topology::oberservability::monitoring::{AlertChannel, AlertChannelConfig},
};
#[derive(Debug, Clone)]
pub struct DiscordWebhookAlertChannel {
pub webhook_url: Url,
pub name: String,
pub send_resolved_notifications: bool,
}
impl AlertChannelConfig for DiscordWebhookAlertChannel {
fn build_alert_channel(&self) -> Box<dyn AlertChannel> {
Box::new(DiscordWebhookAlertChannel {
webhook_url: self.webhook_url.clone(),
name: self.name.clone(),
send_resolved_notifications: self.send_resolved_notifications.clone(),
})
}
fn channel_type(&self) -> String {
"discord".to_string()
}
}
#[async_trait]
impl AlertChannel for DiscordWebhookAlertChannel {
async fn get_channel_id(&self) -> String {
self.name.clone()
}
}
impl PrometheusAlertChannel for DiscordWebhookAlertChannel {
fn get_alert_channel_global_settings(&self) -> Option<AlertManagerGlobalConfigs> {
None
}
fn get_alert_channel_route(&self) -> AlertManagerRoute {
todo!()
}
fn get_alert_channel_receiver(&self) -> AlertManagerReceiver {
todo!()
}
}

View File

@@ -0,0 +1 @@
pub mod discord_alert_channel;

View File

@@ -1,35 +0,0 @@
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use url::Url;
use crate::modules::helm::chart::HelmChartScore;
pub fn discord_alert_manager_score(
webhook_url: Url,
namespace: String,
name: String,
) -> HelmChartScore {
let values = format!(
r#"
environment:
- name: "DISCORD_WEBHOOK"
value: "{webhook_url}"
"#,
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str(&name).unwrap(),
chart_name: NonBlankString::from_str(
"oci://hub.nationtech.io/library/alertmanager-discord",
)
.unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: true,
repository: None,
}
}

View File

@@ -1,55 +0,0 @@
use async_trait::async_trait;
use serde_json::Value;
use url::Url;
use crate::{
interpret::{InterpretError, Outcome},
topology::K8sAnywhereTopology,
};
#[derive(Debug, Clone)]
pub struct DiscordWebhookConfig {
pub webhook_url: Url,
pub name: String,
pub send_resolved_notifications: bool,
}
pub trait DiscordWebhookReceiver {
fn deploy_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError>;
fn delete_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError>;
}
// trait used to generate alert manager config values impl<T: Topology + AlertManagerConfig> Monitor for KubePrometheus
pub trait AlertManagerConfig<T> {
fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
}
#[async_trait]
impl<T: DiscordWebhookReceiver> AlertManagerConfig<T> for DiscordWebhookConfig {
fn get_alert_manager_config(&self) -> Result<Value, InterpretError> {
todo!()
}
}
#[async_trait]
impl DiscordWebhookReceiver for K8sAnywhereTopology {
fn deploy_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn delete_discord_webhook_receiver(
&self,
_notification_adapter_id: &str,
) -> Result<Outcome, InterpretError> {
todo!()
}
}

View File

@@ -1,7 +1,5 @@
use serde::Serialize;
use super::monitoring_alerting::AlertChannel;
#[derive(Debug, Clone, Serialize)]
pub struct KubePrometheusConfig {
pub namespace: String,
@@ -21,7 +19,6 @@ pub struct KubePrometheusConfig {
pub kube_proxy: bool,
pub kube_state_metrics: bool,
pub prometheus_operator: bool,
pub alert_channel: Vec<AlertChannel>,
}
impl KubePrometheusConfig {
pub fn new() -> Self {
@@ -30,7 +27,6 @@ impl KubePrometheusConfig {
default_rules: true,
windows_monitoring: false,
alert_manager: true,
alert_channel: Vec::new(),
grafana: true,
node_exporter: false,
prometheus: true,

View File

@@ -1,8 +1,7 @@
use super::{config::KubePrometheusConfig, monitoring_alerting::AlertChannel};
use super::config::KubePrometheusConfig;
use log::info;
use non_blank_string_rs::NonBlankString;
use std::{collections::HashMap, str::FromStr};
use url::Url;
use std::str::FromStr;
use crate::modules::helm::chart::HelmChartScore;
@@ -145,65 +144,65 @@ prometheus:
"#,
);
let alertmanager_config = alert_manager_yaml_builder(&config);
values.push_str(&alertmanager_config);
fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String {
let mut receivers = String::new();
let mut routes = String::new();
let mut global_configs = String::new();
let alert_manager = config.alert_manager;
for alert_channel in &config.alert_channel {
match alert_channel {
AlertChannel::Discord { name, .. } => {
let (receiver, route) = discord_alert_builder(name);
info!("discord receiver: {} \nroute: {}", receiver, route);
receivers.push_str(&receiver);
routes.push_str(&route);
}
AlertChannel::Slack {
slack_channel,
webhook_url,
} => {
let (receiver, route) = slack_alert_builder(slack_channel);
info!("slack receiver: {} \nroute: {}", receiver, route);
receivers.push_str(&receiver);
routes.push_str(&route);
let global_config = format!(
r#"
global:
slack_api_url: {webhook_url}"#
);
global_configs.push_str(&global_config);
}
AlertChannel::Smpt { .. } => todo!(),
}
}
info!("after alert receiver: {}", receivers);
info!("after alert routes: {}", routes);
let alertmanager_config = format!(
r#"
alertmanager:
enabled: {alert_manager}
config: {global_configs}
route:
group_by: ['job']
group_wait: 30s
group_interval: 5m
repeat_interval: 12h
routes:
{routes}
receivers:
- name: 'null'
{receivers}"#
);
info!("alert manager config: {}", alertmanager_config);
alertmanager_config
}
// let alertmanager_config = alert_manager_yaml_builder(&config);
// values.push_str(&alertmanager_config);
//
// fn alert_manager_yaml_builder(config: &KubePrometheusConfig) -> String {
// let mut receivers = String::new();
// let mut routes = String::new();
// let mut global_configs = String::new();
// let alert_manager = config.alert_manager;
// for alert_channel in &config.alert_channel {
// match alert_channel {
// AlertChannel::Discord { name, .. } => {
// let (receiver, route) = discord_alert_builder(name);
// info!("discord receiver: {} \nroute: {}", receiver, route);
// receivers.push_str(&receiver);
// routes.push_str(&route);
// }
// AlertChannel::Slack {
// slack_channel,
// webhook_url,
// } => {
// let (receiver, route) = slack_alert_builder(slack_channel);
// info!("slack receiver: {} \nroute: {}", receiver, route);
// receivers.push_str(&receiver);
//
// routes.push_str(&route);
// let global_config = format!(
// r#"
// global:
// slack_api_url: {webhook_url}"#
// );
//
// global_configs.push_str(&global_config);
// }
// AlertChannel::Smpt { .. } => todo!(),
// }
// }
// info!("after alert receiver: {}", receivers);
// info!("after alert routes: {}", routes);
//
// let alertmanager_config = format!(
// r#"
//alertmanager:
// enabled: {alert_manager}
// config: {global_configs}
// route:
// group_by: ['job']
// group_wait: 30s
// group_interval: 5m
// repeat_interval: 12h
// routes:
//{routes}
// receivers:
// - name: 'null'
//{receivers}"#
// );
//
// info!("alert manager config: {}", alertmanager_config);
// alertmanager_config
// }
HelmChartScore {
namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()),

View File

@@ -0,0 +1,51 @@
use async_trait::async_trait;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
topology::{
Topology,
oberservability::monitoring::{AlertChannel, Monitor, MonitorConfig},
},
};
use super::{
config::KubePrometheusConfig,
types::{AlertManagerGlobalConfigs, AlertManagerReceiver, AlertManagerRoute},
};
#[derive(Debug, Clone, Serialize)]
pub struct KubePrometheusMonitor {
pub kube_prometheus_config: KubePrometheusConfig,
}
impl<T: Topology> MonitorConfig<T> for KubePrometheusMonitor {
fn build_monitor(&self) -> Box<dyn Monitor<T>> {
Box::new(self.clone())
}
}
#[async_trait]
pub trait PrometheusAlertChannel: AlertChannel {
fn get_alert_channel_global_settings(&self) -> Option<AlertManagerGlobalConfigs>;
fn get_alert_channel_route(&self) -> AlertManagerRoute;
fn get_alert_channel_receiver(&self) -> AlertManagerReceiver;
}
#[async_trait]
impl<T: Topology> Monitor<T> for KubePrometheusMonitor {
async fn deploy_monitor(
&self,
_topology: &T,
_alert_channels: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn delete_monitor(
&self,
_topology: &T,
_alert_channels: Vec<Box<dyn AlertChannel>>,
) -> Result<Outcome, InterpretError> {
todo!()
}
}

View File

@@ -0,0 +1,4 @@
pub mod config;
pub mod kube_prometheus_helm_chart_score;
pub mod kube_prometheus_monitor;
pub mod types;

View File

@@ -0,0 +1,3 @@
pub struct AlertManagerGlobalConfigs {}
pub struct AlertManagerReceiver {}
pub struct AlertManagerRoute {}

View File

@@ -1,5 +1,3 @@
mod config;
mod discord_alert_manager;
pub mod discord_webhook_sender;
mod kube_prometheus;
pub mod alert_channel;
pub mod kube_prometheus;
pub mod monitoring_alerting;

View File

@@ -1,146 +1,73 @@
use async_trait::async_trait;
use email_address::EmailAddress;
use log::info;
use serde::Serialize;
use url::Url;
use serde::{Serialize, Serializer, ser::SerializeStruct};
use std::{fmt::Debug, sync::Arc};
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{HelmCommand, Topology},
topology::{
HelmCommand, Topology,
oberservability::monitoring::{AlertChannelConfig, MonitorConfig},
},
};
use super::{
config::KubePrometheusConfig, discord_alert_manager::discord_alert_manager_score,
kube_prometheus::kube_prometheus_helm_chart_score,
use super::kube_prometheus::{
config::KubePrometheusConfig, kube_prometheus_monitor::KubePrometheusMonitor,
};
#[derive(Debug, Clone, Serialize)]
pub enum AlertChannel {
Discord {
name: String,
webhook_url: Url,
},
Slack {
slack_channel: String,
webhook_url: Url,
},
//TODO test and implement in helm chart
//currently does not work
Smpt {
email_address: EmailAddress,
service_name: String,
},
}
#[derive(Debug, Clone, Serialize)]
pub struct MonitoringAlertingStackScore {
pub alert_channel: Vec<AlertChannel>,
#[derive(Debug, Clone)]
pub struct MonitoringAlertingScore<T: Topology> {
pub monitor_config: Arc<dyn MonitorConfig<T>>,
pub alert_channel_configs: Vec<Arc<dyn AlertChannelConfig>>,
pub namespace: Option<String>,
}
impl MonitoringAlertingStackScore {
pub fn new() -> Self {
impl<T: Topology> MonitoringAlertingScore<T> {
pub fn default() -> Self {
Self {
alert_channel: Vec::new(),
namespace: None,
monitor_config: Arc::new(KubePrometheusMonitor {
kube_prometheus_config: KubePrometheusConfig::new(),
}),
alert_channel_configs: Vec::new(),
namespace: Some("monitoring".to_string()),
}
}
}
impl<T: Topology + HelmCommand> Score<T> for MonitoringAlertingStackScore {
impl<T: Topology + HelmCommand + Debug + Clone + 'static> Score<T> for MonitoringAlertingScore<T> {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(MonitoringAlertingStackInterpret {
Box::new(MonitoringAlertingInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("MonitoringAlertingStackScore")
todo!()
}
}
#[derive(Debug, Clone, Serialize)]
struct MonitoringAlertingStackInterpret {
score: MonitoringAlertingStackScore,
}
impl MonitoringAlertingStackInterpret {
async fn build_kube_prometheus_helm_chart_config(&self) -> KubePrometheusConfig {
let mut config = KubePrometheusConfig::new();
if let Some(ns) = &self.score.namespace {
config.namespace = ns.clone();
}
config.alert_channel = self.score.alert_channel.clone();
config
}
async fn deploy_kube_prometheus_helm_chart_score<T: Topology + HelmCommand>(
&self,
inventory: &Inventory,
topology: &T,
config: &KubePrometheusConfig,
) -> Result<Outcome, InterpretError> {
let helm_chart = kube_prometheus_helm_chart_score(config);
helm_chart
.create_interpret()
.execute(inventory, topology)
.await
}
async fn deploy_alert_channel_service<T: Topology + HelmCommand>(
&self,
inventory: &Inventory,
topology: &T,
config: &KubePrometheusConfig,
) -> Result<Outcome, InterpretError> {
//let mut outcomes = vec![];
//for channel in &self.score.alert_channel {
// let outcome = match channel {
// AlertChannel::Discord { .. } => {
// discord_alert_manager_score(config)
// .create_interpret()
// .execute(inventory, topology)
// .await
// }
// AlertChannel::Slack { .. } => Ok(Outcome::success(
// "No extra configs for slack alerting".to_string(),
// )),
// AlertChannel::Smpt { .. } => {
// todo!()
// }
// };
// outcomes.push(outcome);
//}
//for result in outcomes {
// result?;
//}
Ok(Outcome::success("All alert channels deployed".to_string()))
}
#[derive(Debug)]
struct MonitoringAlertingInterpret<T: Topology> {
score: MonitoringAlertingScore<T>,
}
#[async_trait]
impl<T: Topology + HelmCommand> Interpret<T> for MonitoringAlertingStackInterpret {
impl<T: Topology + HelmCommand + Debug> Interpret<T> for MonitoringAlertingInterpret<T> {
async fn execute(
&self,
inventory: &Inventory,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let config = self.build_kube_prometheus_helm_chart_config().await;
info!("Built kube prometheus config");
info!("Installing kube prometheus chart");
self.deploy_kube_prometheus_helm_chart_score(inventory, topology, &config)
.await?;
info!("Installing alert channel service");
self.deploy_alert_channel_service(inventory, topology, &config)
.await?;
Ok(Outcome::success(format!(
"succesfully deployed monitoring and alerting stack"
)))
let monitor = self.score.monitor_config.build_monitor();
let mut alert_channels = Vec::new();
for config in &self.score.alert_channel_configs {
alert_channels.push(config.build_alert_channel());
}
monitor.deploy_monitor(topology, alert_channels).await
}
fn get_name(&self) -> InterpretName {
@@ -159,3 +86,21 @@ impl<T: Topology + HelmCommand> Interpret<T> for MonitoringAlertingStackInterpre
todo!()
}
}
impl<T: Topology> Serialize for MonitoringAlertingScore<T> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let mut state = serializer.serialize_struct("MonitoringAlertingScore", 3)?;
// For now, just serialize basic info
state.serialize_field("monitor_type", "monitoring_system")?;
let channel_count = self.alert_channel_configs.len();
state.serialize_field("alert_channel_count", &channel_count)?;
state.serialize_field("namespace", &self.namespace)?;
state.end()
}
}