Compare commits
4 Commits
master
...
feat/kube-
Author | SHA1 | Date | |
---|---|---|---|
de3e7869f7 | |||
57eabc9834 | |||
cd40660350 | |||
2ca732cecd |
@ -1,9 +1,9 @@
|
||||
use std::{process::Command, sync::Arc};
|
||||
use std::{collections::HashMap, process::Command, sync::Arc};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use inquire::Confirm;
|
||||
use log::{info, warn};
|
||||
use tokio::sync::OnceCell;
|
||||
use tokio::sync::{Mutex, OnceCell};
|
||||
|
||||
use crate::{
|
||||
executors::ExecutorError,
|
||||
@ -17,6 +17,7 @@ use crate::{
|
||||
use super::{
|
||||
HelmCommand, K8sclient, Topology,
|
||||
k8s::K8sClient,
|
||||
oberservability::monitoring::AlertReceiver,
|
||||
tenant::{
|
||||
ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager,
|
||||
},
|
||||
@ -37,6 +38,7 @@ enum K8sSource {
|
||||
pub struct K8sAnywhereTopology {
|
||||
k8s_state: OnceCell<Option<K8sState>>,
|
||||
tenant_manager: OnceCell<K8sTenantManager>,
|
||||
pub alert_receivers: Mutex<HashMap<String, OnceCell<AlertReceiver>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@ -61,6 +63,7 @@ impl K8sAnywhereTopology {
|
||||
Self {
|
||||
k8s_state: OnceCell::new(),
|
||||
tenant_manager: OnceCell::new(),
|
||||
alert_receivers: Mutex::new(HashMap::new()),
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1,7 +1,8 @@
|
||||
use async_trait::async_trait;
|
||||
use dyn_clone::DynClone;
|
||||
use serde::Serialize;
|
||||
|
||||
use std::fmt::Debug;
|
||||
use url::Url;
|
||||
|
||||
use crate::interpret::InterpretError;
|
||||
|
||||
@ -13,19 +14,20 @@ use crate::{interpret::Outcome, topology::Topology};
|
||||
/// monitoring data, enabling consistent processing regardless of the underlying data source.
|
||||
#[async_trait]
|
||||
pub trait Monitor<T: Topology>: Debug + Send + Sync {
|
||||
async fn deploy_monitor(
|
||||
&self,
|
||||
topology: &T,
|
||||
alert_receivers: Vec<AlertReceiver>,
|
||||
) -> Result<Outcome, InterpretError>;
|
||||
async fn deploy_monitor(&self, topology: &T) -> Result<Outcome, InterpretError>;
|
||||
|
||||
async fn delete_monitor(
|
||||
&self,
|
||||
topolgy: &T,
|
||||
alert_receivers: Vec<AlertReceiver>,
|
||||
) -> Result<Outcome, InterpretError>;
|
||||
async fn delete_monitor(&self, topolgy: &T) -> Result<Outcome, InterpretError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AlertReceiverDeployment<T: Topology>: Debug + DynClone + Send + Sync {
|
||||
async fn deploy_alert_receiver(&self, topology: &T) -> Result<Outcome, InterpretError>;
|
||||
}
|
||||
|
||||
dyn_clone::clone_trait_object!(<T> AlertReceiverDeployment<T>);
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct AlertReceiver {
|
||||
pub receiver_id: String,
|
||||
pub receiver_installed: bool,
|
||||
}
|
||||
|
102
harmony/src/modules/monitoring/alertmanager_types.rs
Normal file
102
harmony/src/modules/monitoring/alertmanager_types.rs
Normal file
@ -0,0 +1,102 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use url::Url;
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertManagerValues {
|
||||
pub alertmanager: AlertManager,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertManager {
|
||||
pub enabled: bool,
|
||||
pub config: AlertManagerConfig,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, Serialize, Deserialize)]
|
||||
pub struct AlertChannelConfig {
|
||||
pub receiver: AlertChannelReceiver,
|
||||
pub route: AlertChannelRoute,
|
||||
pub global_config: Option<AlertChannelGlobalConfig>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertChannelReceiver {
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub slack_configs: Option<Vec<SlackConfig>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub webhook_configs: Option<Vec<WebhookConfig>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertManagerRoute {
|
||||
pub group_by: Vec<String>,
|
||||
pub group_wait: String,
|
||||
pub group_interval: String,
|
||||
pub repeat_interval: String,
|
||||
pub routes: Vec<AlertChannelRoute>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertChannelGlobalConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub slack_api_url: Option<Url>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct SlackConfig {
|
||||
pub channel: String,
|
||||
pub send_resolved: bool,
|
||||
pub title: String,
|
||||
pub text: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct WebhookConfig {
|
||||
pub url: Url,
|
||||
pub send_resolved: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertChannelRoute {
|
||||
pub receiver: String,
|
||||
pub matchers: Vec<String>,
|
||||
#[serde(default)]
|
||||
pub r#continue: bool,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct AlertManagerConfig {
|
||||
pub global: Option<AlertChannelGlobalConfig>,
|
||||
pub route: AlertManagerRoute,
|
||||
pub receivers: Vec<AlertChannelReceiver>,
|
||||
}
|
||||
|
||||
impl AlertManagerValues {
|
||||
pub fn default() -> Self {
|
||||
Self {
|
||||
alertmanager: AlertManager {
|
||||
enabled: true,
|
||||
config: AlertManagerConfig {
|
||||
global: None,
|
||||
route: AlertManagerRoute {
|
||||
group_by: vec!["job".to_string()],
|
||||
group_wait: "30s".to_string(),
|
||||
group_interval: "5m".to_string(),
|
||||
repeat_interval: "12h".to_string(),
|
||||
routes: vec![AlertChannelRoute {
|
||||
receiver: "null".to_string(),
|
||||
matchers: vec!["alertname=Watchdog".to_string()],
|
||||
r#continue: false,
|
||||
}],
|
||||
},
|
||||
receivers: vec![AlertChannelReceiver {
|
||||
name: "null".to_string(),
|
||||
slack_configs: None,
|
||||
webhook_configs: None,
|
||||
}],
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
}
|
@ -1,55 +1,168 @@
|
||||
use super::{
|
||||
discord_alert_manager::discord_alert_manager_score, kube_prometheus_monitor::AlertManagerConfig,
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use serde_json::Value;
|
||||
use serde::Serialize;
|
||||
use serde_yaml::Value;
|
||||
use tokio::sync::OnceCell;
|
||||
use url::Url;
|
||||
|
||||
use crate::{
|
||||
interpret::{InterpretError, Outcome},
|
||||
topology::K8sAnywhereTopology,
|
||||
data::{Id, Version},
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{
|
||||
HelmCommand, K8sAnywhereTopology, Topology,
|
||||
oberservability::monitoring::{AlertReceiver, AlertReceiverDeployment},
|
||||
},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
#[async_trait]
|
||||
impl<T: Topology + DiscordWebhookReceiver> AlertReceiverDeployment<T> for DiscordWebhookConfig {
|
||||
async fn deploy_alert_receiver(&self, topology: &T) -> Result<Outcome, InterpretError> {
|
||||
topology.deploy_discord_webhook_receiver(self.clone()).await
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct DiscordWebhookConfig {
|
||||
pub webhook_url: Url,
|
||||
pub name: String,
|
||||
pub send_resolved_notifications: bool,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait DiscordWebhookReceiver {
|
||||
fn deploy_discord_webhook_receiver(
|
||||
async fn deploy_discord_webhook_receiver(
|
||||
&self,
|
||||
_notification_adapter_id: &str,
|
||||
config: DiscordWebhookConfig,
|
||||
) -> Result<Outcome, InterpretError>;
|
||||
|
||||
fn delete_discord_webhook_receiver(
|
||||
&self,
|
||||
_notification_adapter_id: &str,
|
||||
config: DiscordWebhookConfig,
|
||||
) -> Result<Outcome, InterpretError>;
|
||||
}
|
||||
|
||||
// trait used to generate alert manager config values impl<T: Topology + AlertManagerConfig> Monitor for KubePrometheus
|
||||
pub trait AlertManagerConfig<T> {
|
||||
fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: DiscordWebhookReceiver> AlertManagerConfig<T> for DiscordWebhookConfig {
|
||||
fn get_alert_manager_config(&self) -> Result<Value, InterpretError> {
|
||||
async fn get_alert_manager_config(&self) -> Result<Value, InterpretError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl DiscordWebhookReceiver for K8sAnywhereTopology {
|
||||
fn deploy_discord_webhook_receiver(
|
||||
async fn deploy_discord_webhook_receiver(
|
||||
&self,
|
||||
_notification_adapter_id: &str,
|
||||
config: DiscordWebhookConfig,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
todo!()
|
||||
let receiver_key = config.name.clone();
|
||||
let mut adapters_map_guard = self.alert_receivers.lock().await;
|
||||
|
||||
let cell = adapters_map_guard
|
||||
.entry(receiver_key.clone())
|
||||
.or_insert_with(OnceCell::new);
|
||||
|
||||
if let Some(initialized_receiver) = cell.get() {
|
||||
return Ok(Outcome::success(format!(
|
||||
"Discord Webhook adapter for '{}' already initialized.",
|
||||
initialized_receiver.receiver_id
|
||||
)));
|
||||
}
|
||||
|
||||
let final_state = cell
|
||||
.get_or_try_init(|| async {
|
||||
initialize_discord_webhook_receiver(config.clone(), self).await
|
||||
})
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Discord Webhook Receiver for '{}' ensured/initialized.",
|
||||
final_state.receiver_id
|
||||
)))
|
||||
}
|
||||
|
||||
fn delete_discord_webhook_receiver(
|
||||
&self,
|
||||
_notification_adapter_id: &str,
|
||||
_config: DiscordWebhookConfig,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
async fn initialize_discord_webhook_receiver(
|
||||
conf: DiscordWebhookConfig,
|
||||
topology: &K8sAnywhereTopology,
|
||||
) -> Result<AlertReceiver, InterpretError> {
|
||||
println!(
|
||||
"Attempting to initialize Discord adapter for: {}",
|
||||
conf.name
|
||||
);
|
||||
let score = DiscordWebhookReceiverScore {
|
||||
config: conf.clone(),
|
||||
};
|
||||
let inventory = Inventory::autoload();
|
||||
let interpret = score.create_interpret();
|
||||
|
||||
interpret.execute(&inventory, topology).await?;
|
||||
|
||||
Ok(AlertReceiver {
|
||||
receiver_id: conf.name,
|
||||
receiver_installed: true,
|
||||
})
|
||||
}
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct DiscordWebhookReceiverScore {
|
||||
config: DiscordWebhookConfig,
|
||||
}
|
||||
|
||||
impl<T: Topology + HelmCommand> Score<T> for DiscordWebhookReceiverScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(DiscordWebhookReceiverScoreInterpret {
|
||||
config: self.config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
"DiscordWebhookReceiverScore".to_string()
|
||||
}
|
||||
}
|
||||
#[derive(Debug)]
|
||||
struct DiscordWebhookReceiverScoreInterpret {
|
||||
config: DiscordWebhookConfig,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + HelmCommand> Interpret<T> for DiscordWebhookReceiverScoreInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
discord_alert_manager_score(
|
||||
self.config.webhook_url.clone(),
|
||||
self.config.name.clone(),
|
||||
self.config.name.clone(),
|
||||
)
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
108
harmony/src/modules/monitoring/kube_prometheus_monitor.rs
Normal file
108
harmony/src/modules/monitoring/kube_prometheus_monitor.rs
Normal file
@ -0,0 +1,108 @@
|
||||
use async_trait::async_trait;
|
||||
use serde::Serialize;
|
||||
use serde_yaml::Value;
|
||||
|
||||
use crate::{
|
||||
data::{Id, Version},
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{
|
||||
HelmCommand, Topology,
|
||||
oberservability::monitoring::{AlertReceiverDeployment, Monitor},
|
||||
},
|
||||
};
|
||||
|
||||
use super::{
|
||||
config::KubePrometheusConfig, kube_prometheus_helm_chart::kube_prometheus_helm_chart_score,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct KubePrometheus<T> {
|
||||
alert_receivers: Vec<Box<dyn AlertReceiverDeployment<T>>>,
|
||||
config: KubePrometheusConfig,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait AlertManagerConfig<T> {
|
||||
async fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
|
||||
}
|
||||
|
||||
impl<T: Topology> KubePrometheus<T> {
|
||||
pub fn new() -> Self {
|
||||
Self {
|
||||
alert_receivers: Vec::new(),
|
||||
config: KubePrometheusConfig::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + HelmCommand + std::fmt::Debug> Monitor<T> for KubePrometheus<T> {
|
||||
async fn deploy_monitor(&self, topology: &T) -> Result<Outcome, InterpretError> {
|
||||
for alert_receiver in &self.alert_receivers {
|
||||
alert_receiver.deploy_alert_receiver(topology).await?;
|
||||
}
|
||||
let score = KubePrometheusScore {
|
||||
config: self.config.clone(),
|
||||
};
|
||||
let inventory = Inventory::autoload();
|
||||
score.create_interpret().execute(&inventory, topology).await
|
||||
}
|
||||
|
||||
async fn delete_monitor(&self, _topolgy: &T) -> Result<Outcome, InterpretError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct KubePrometheusScore {
|
||||
config: KubePrometheusConfig,
|
||||
}
|
||||
|
||||
impl<T: Topology + HelmCommand> Score<T> for KubePrometheusScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(KubePromethusScoreInterpret {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
struct KubePromethusScoreInterpret {
|
||||
score: KubePrometheusScore,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + HelmCommand> Interpret<T> for KubePromethusScoreInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
kube_prometheus_helm_chart_score(&self.score.config)
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
@ -1,5 +1,7 @@
|
||||
pub mod alertmanager_types;
|
||||
mod config;
|
||||
mod discord_alert_manager;
|
||||
pub mod discord_webhook_sender;
|
||||
mod kube_prometheus;
|
||||
mod kube_prometheus_helm_chart;
|
||||
pub mod kube_prometheus_monitor;
|
||||
pub mod monitoring_alerting;
|
||||
|
@ -14,8 +14,7 @@ use crate::{
|
||||
};
|
||||
|
||||
use super::{
|
||||
config::KubePrometheusConfig, discord_alert_manager::discord_alert_manager_score,
|
||||
kube_prometheus::kube_prometheus_helm_chart_score,
|
||||
config::KubePrometheusConfig, kube_prometheus_helm_chart::kube_prometheus_helm_chart_score,
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
Loading…
Reference in New Issue
Block a user