Monitor an application within a tenant #86

Merged
letian merged 22 commits from feat/crd-alertmanager-configs into master 2025-08-04 21:42:05 +00:00
16 changed files with 949 additions and 157 deletions
Showing only changes of commit a5deda647b

View File

@@ -50,10 +50,8 @@ async fn main() {
     let app = ApplicationScore {
         features: vec![Box::new(Monitoring {
-            application: application.clone(),
             alert_receiver: vec![Box::new(webhook_receiver)],
-            service_monitors: vec![],
-            alert_rules: vec![],
+            application: application.clone(),
         })],
         application,
     };

View File

@@ -43,14 +43,12 @@ async fn main() {
     let app = ApplicationScore {
         features: vec![
-            Box::new(ContinuousDelivery {
-                application: application.clone(),
-            }),
+            // Box::new(ContinuousDelivery {
+            //     application: application.clone(),
+            // }),
             Box::new(Monitoring {
                 application: application.clone(),
                 alert_receiver: vec![Box::new(discord_receiver), Box::new(webhook_receiver)],
Review (outdated):
As JG mentioned, we want to keep this as simple as possible, as opinionated as possible. If we keep it named `Monitoring` we can just replace the inner implementation later.
-                service_monitors: vec![],
-                alert_rules: vec![],
             }),
             // TODO add backups, multisite ha, etc
         ],
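A minimal sketch of the facade idea in the review comment above — hypothetical, with `MonitoringBackend` and its method invented for illustration: keeping the feature named `Monitoring` gives callers a stable entry point while the stack behind it can be replaced later.

```rust
// Hypothetical illustration of "keep it named `Monitoring`, swap the inner
// implementation later": the public name stays stable while the backend
// behind it can change without touching example code like the above.
pub trait MonitoringBackend: Send + Sync {
    fn ensure_installed(&self) -> Result<(), String>;
}

pub struct Monitoring {
    // Today this might be the kube-prometheus CRD stack; tomorrow it could
    // be something else, with no change for callers of `Monitoring`.
    inner: Box<dyn MonitoringBackend>,
}

impl Monitoring {
    pub fn ensure_installed(&self) -> Result<(), String> {
        self.inner.ensure_installed()
    }
}
```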

View File

@@ -17,7 +17,7 @@ use kube::{
     runtime::wait::await_condition,
 };
 use log::{debug, error, trace};
-use serde::de::DeserializeOwned;
+use serde::{Serialize, de::DeserializeOwned};
 use similar::{DiffableStr, TextDiff};

 #[derive(new, Clone)]
@@ -25,6 +25,15 @@ pub struct K8sClient {
     client: Client,
 }

+impl Serialize for K8sClient {
Review:
I'm not sure it's really a good thing that we try to `Serialize` the `K8sClient` 🤔 maybe we should find a way to avoid needing to store the `client` into the `CrdPrometheus` sender
+    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        todo!()
+    }
+}
+
 impl std::fmt::Debug for K8sClient {
     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
         // This is a poor man's debug implementation for now as kube::Client does not provide much
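One alternative to the `todo!()` impl the reviewer questions — a minimal sketch, assuming the only reason `K8sClient` needs `Serialize` is that `CRDPrometheus` (introduced later in this PR) derives it while holding a client handle:

```rust
use std::sync::Arc;
use serde::Serialize;

// Stand-in for the real kube-backed client; fields omitted here.
#[derive(Debug)]
pub struct K8sClient;

// Simplified `CRDPrometheus` from this PR (other fields trimmed).
#[derive(Debug, Clone, Serialize)]
pub struct CRDPrometheus {
    pub namespace: String,
    // The client is runtime state, not configuration, so it can be
    // skipped during serialization; no `Serialize` impl on `K8sClient`
    // is needed at all.
    #[serde(skip)]
    pub client: Arc<K8sClient>,
}
```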

View File

@@ -1,28 +1,37 @@
-use std::{fs, process::Command, sync::Arc};
+use std::{process::Command, sync::Arc};

 use async_trait::async_trait;
 use inquire::Confirm;
 use log::{debug, info, warn};
 use serde::Serialize;
-use tempfile::tempdir;
 use tokio::sync::OnceCell;

 use crate::{
     executors::ExecutorError,
-    interpret::{InterpretError, Outcome},
+    interpret::{InterpretError, InterpretStatus, Outcome},
     inventory::Inventory,
     maestro::Maestro,
     modules::{
+        application::Application,
         k3d::K3DInstallationScore,
-        monitoring::kube_prometheus::crd::prometheus_operator::prometheus_operator_helm_chart_score,
+        monitoring::kube_prometheus::crd::{
+            crd_alertmanager_config::{CRDAlertManagerReceiver, CRDPrometheus},
+            prometheus_operator::prometheus_operator_helm_chart_score,
+        },
+        prometheus::{
+            k3d_prometheus_alerting_score::K3dPrometheusCRDAlertingScore,
+            k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
+            prometheus::{PrometheusApplicationMonitoring, PrometheusMonitoring},
+        },
     },
+    score::Score,
     topology::LocalhostTopology,
 };

 use super::{
     DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, Topology,
     k8s::K8sClient,
-    oberservability::monitoring::PrometheusK8sAnywhere,
+    oberservability::monitoring::{AlertReceiver, AlertSender},
     tenant::{TenantConfig, TenantManager, k8s::K8sTenantManager},
 };
@@ -64,48 +73,82 @@ impl K8sclient for K8sAnywhereTopology {
 }

 #[async_trait]
-impl PrometheusK8sAnywhere for K8sAnywhereTopology {
+impl PrometheusMonitoring<CRDPrometheus> for K8sAnywhereTopology {
     async fn ensure_prometheus_operator(
         &self,
-        namespace: Option<String>,
+        sender: &CRDPrometheus,
     ) -> Result<Outcome, InterpretError> {
-        let output = Command::new("sh")
-            .args(["-c", "kubectl get crd -A | grep -i prometheuses"])
-            .output()
-            .map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))?;
-        if output.status.success() && output.stdout.is_empty() {
         if let Some(Some(k8s_state)) = self.k8s_state.get() {
             match k8s_state.source {
                 K8sSource::LocalK3d => {
-                    debug!("Working on LocalK3d, installing prometheus operator");
-                    let output = Command::new("sh")
-                        .args(["-c", "kubectl get all -A | grep -i kube-prome-operator"])
-                        .output()
-                        .map_err(|e| {
-                            InterpretError::new(format!("could not connect to cluster: {}", e))
-                        })?;
-                    if output.status.success() && !output.stdout.is_empty() {
-                        debug!("Prometheus operator is already present, skipping install");
-                        return Ok(Outcome::noop());
-                    }
-                    self.install_k3d_prometheus_operator(namespace).await?;
-                    Ok(Outcome::success(format!("prometheus operator available")))
+                    debug!("installing prometheus operator");
+                    let op_score =
+                        prometheus_operator_helm_chart_score(sender.namespace.clone());
+                    op_score
+                        .create_interpret()
+                        .execute(&Inventory::empty(), self)
+                        .await?;
+                    return Ok(Outcome::success(
+                        "installed prometheus operator".to_string(),
+                    ));
                 }
                 K8sSource::Kubeconfig => {
-                    //TODO this doesnt feel robust enough to ensure that the operator is indeed
-                    //available
-                    debug!(
-                        "Working outside of LocalK3d topology, skipping install of client prometheus operator"
-                    );
-                    Ok(Outcome::success(format!("prometheus operator available")))
+                    debug!("unable to install prometheus operator, contact cluster admin");
+                    return Ok(Outcome::noop());
                 }
             }
         } else {
-            Err(InterpretError::new(
-                "failed to install prometheus operator".to_string(),
-            ))
+            warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
+            return Ok(Outcome::noop());
         }
-    }
+        debug!("Prometheus operator is already present, skipping install");
+        Ok(Outcome::success(
+            "prometheus operator present in cluster".to_string(),
+        ))
     }
 }

+#[async_trait]
+impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology {
+    async fn configure_receivers(
+        &self,
+        receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
+    ) -> Option<Vec<Box<dyn CRDAlertManagerReceiver>>> {
+        let Some(receivers) = receivers else {
+            return None;
+        };
+        todo!()
+    }
+
+    async fn install_prometheus(
+        &self,
+        sender: &CRDPrometheus,
+        inventory: &Inventory,
+        receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
+    ) -> Result<Outcome, InterpretError> {
+        let po_result = self.ensure_prometheus_operator(sender).await?;
+        if po_result.status == InterpretStatus::NOOP {
+            debug!("Skipping Prometheus CR installation due to missing operator.");
+            return Ok(Outcome::noop());
+        }
+        self.get_k8s_prometheus_application_score(sender.clone(), receivers)
+            .await
+            .create_interpret()
+            .execute(inventory, self)
+            .await?;
+        Ok(Outcome::success(format!("No action, working on cluster ")))
+    }
+}

 impl Serialize for K8sAnywhereTopology {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
     where
         S: serde::Serializer,
     {
@@ -130,18 +173,20 @@ impl K8sAnywhereTopology {
         }
     }

-    async fn install_k3d_prometheus_operator(
+    async fn get_k8s_prometheus_application_score(
         &self,
-        namespace: Option<String>,
-    ) -> Result<Outcome, InterpretError> {
-        let maestro = Maestro::initialize(Inventory::autoload(), LocalhostTopology::new()).await?;
-        let tenant = self.get_k8s_tenant_manager().unwrap();
-        let namespace_name = tenant.get_tenant_config().await;
-        let namespace = namespace_name
-            .map(|ns| ns.name.clone())
-            .unwrap_or_else(|| namespace.unwrap_or_else(|| "default".to_string()));
-        let score = crate::modules::monitoring::kube_prometheus::crd::prometheus_operator::prometheus_operator_helm_chart_score(namespace);
-        maestro.interpret(Box::new(score)).await
+        sender: CRDPrometheus,
+        receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
+    ) -> K8sPrometheusCRDAlertingScore {
+        K8sPrometheusCRDAlertingScore {
+            sender,
+            receivers: self
+                .configure_receivers(receivers)
+                .await
+                .unwrap_or_else(|| vec![]),
+            service_monitors: vec![],
+            prometheus_rules: vec![],
+        }
     }

     fn is_helm_available(&self) -> Result<(), String> {

View File

@@ -1,3 +1,5 @@
+use std::any::Any;
+
 use async_trait::async_trait;
 use log::debug;

@@ -64,6 +66,7 @@ pub trait AlertReceiver<S: AlertSender>: std::fmt::Debug + Send + Sync {
     async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
     fn name(&self) -> String;
     fn clone_box(&self) -> Box<dyn AlertReceiver<S>>;
+    fn as_any(&self) -> &dyn Any;
 }

 #[async_trait]
@@ -76,11 +79,3 @@ pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
 pub trait ScrapeTarget<S: AlertSender> {
     async fn install(&self, sender: &S) -> Result<(), InterpretError>;
 }
-
-#[async_trait]
-pub trait PrometheusK8sAnywhere {
-    async fn ensure_prometheus_operator(
-        &self,
-        namespace: Option<String>,
-    ) -> Result<Outcome, InterpretError>;
-}

View File

@@ -1,14 +1,11 @@
 use std::sync::Arc;

-use crate::modules::application::{ApplicationFeature, OCICompliant};
-use crate::modules::monitoring::application_monitoring::crd_application_monitoring_alerting::CRDApplicationAlertingScore;
-use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDAlertManagerReceiver;
-use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules;
-use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::RuleGroup;
-use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
-    ServiceMonitor, ServiceMonitorSpec,
+use crate::modules::application::{Application, ApplicationFeature};
+use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore;
+use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
+    AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus,
 };
-use crate::topology::oberservability::monitoring::PrometheusK8sAnywhere;
 use crate::{
@@ -17,7 +14,20 @@ use crate::{
     score::Score,
     topology::{HelmCommand, K8sclient, Topology, Url, tenant::TenantManager},
 };
+use crate::{
+    modules::prometheus::prometheus::PrometheusApplicationMonitoring,
+    topology::oberservability::monitoring::AlertReceiver,
+};
 use async_trait::async_trait;
 use base64::{Engine as _, engine::general_purpose};
 use kube::api::ObjectMeta;
 use log::{debug, info};
+use serde_json::json;

 #[derive(Debug, Clone)]
 pub struct Monitoring {
-    pub application: Arc<dyn OCICompliant>,
-    pub alert_receiver: Vec<Box<dyn CRDAlertManagerReceiver>>,
-    pub service_monitors: Vec<ServiceMonitor>,
-    pub alert_rules: Vec<RuleGroup>,
+    pub application: Arc<dyn Application>,
+    pub alert_receiver: Vec<Box<dyn AlertReceiver<CRDPrometheus>>>,
 }

@@ -38,25 +38,33 @@ impl<
         + TenantManager
         + K8sclient
         + std::fmt::Debug
-        + PrometheusK8sAnywhere,
+        + PrometheusApplicationMonitoring<CRDPrometheus>,
     > ApplicationFeature<T> for Monitoring
 {
     async fn ensure_installed(&self, topology: &T) -> Result<(), String> {
         info!("Ensuring monitoring is available for application");
-        let namespace = self.application.name().clone();
-        let mut alerting_score = CRDApplicationAlertingScore {
+        let namespace = topology
+            .get_tenant_config()
+            .await
+            .map(|ns| ns.name.clone())
+            .unwrap_or_else(|| self.application.name());
+        let mut alerting_score = ApplicationMonitoringScore {
+            sender: CRDPrometheus {
+                alertmanager_configs: AlertmanagerConfig {
+                    metadata: ObjectMeta {
+                        ..Default::default()
+                    },
+                    spec: AlertmanagerConfigSpec { data: json! {""} },
+                },
                 namespace: namespace.clone(),
+                client: topology.k8s_client().await.unwrap(),
+            },
+            application: self.application.clone(),
             receivers: self.alert_receiver.clone(),
-            service_monitors: self.service_monitors.clone(),
-            prometheus_rules: self.alert_rules.clone(),
         };
         let ntfy = NtfyScore {
-            // namespace: topology
-            //     .get_tenant_config()
-            //     .await
-            //     .expect("couldn't get tenant config")
-            //     .name,
-            namespace: self.application.name(),
+            namespace: namespace.clone(),
             host: "localhost".to_string(),
         };
         ntfy.create_interpret()
@@ -87,7 +95,7 @@ impl<
                 url::Url::parse(
                     format!(
                         "http://ntfy.{}.svc.cluster.local/rust-web-app?auth={ntfy_default_auth_param}",
-                        self.application.name()
+                        namespace.clone()
                     )
                     .as_str(),
                 )
@@ -96,32 +104,6 @@ impl<
         };
         alerting_score.receivers.push(Box::new(ntfy_receiver));

-        let service_monitor = ServiceMonitor {
-            metadata: ObjectMeta {
-                name: Some(self.application.name().clone()),
-                labels: Some(std::collections::BTreeMap::from([
-                    ("alertmanagerConfig".to_string(), "enabled".to_string()),
-                    ("client".to_string(), "prometheus".to_string()),
-                    (
-                        "app.kubernetes.io/name".to_string(),
-                        "kube-state-metrics".to_string(),
-                    ),
-                ])),
-                namespace: Some(namespace),
-                ..Default::default()
-            },
-            spec: ServiceMonitorSpec::default(),
-        };
-        alerting_score.service_monitors.push(service_monitor);
-
-        let rules_group = RuleGroup {
-            name: format!("{}-rules", self.application.name().clone()),
-            rules: build_default_application_rules(),
-        };
-        alerting_score.prometheus_rules.push(rules_group);
-
         alerting_score
             .create_interpret()
             .execute(&Inventory::empty(), topology)

View File

@@ -10,6 +10,7 @@ pub use oci::*;
 pub use rust::*;

 use async_trait::async_trait;
+use serde::Serialize;

 use crate::{
     data::{Id, Version},
@@ -78,3 +79,12 @@ impl<A: Application, T: Topology + std::fmt::Debug> Interpret<T> for Application
         todo!()
     }
 }
+
+impl Serialize for dyn Application {
+    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        todo!()
+    }
+}

View File

@@ -1,3 +1,4 @@
+use std::any::Any;
 use std::collections::BTreeMap;
 use std::sync::Arc;

@@ -11,7 +12,7 @@ use serde_json::json;
 use serde_yaml::{Mapping, Value};

 use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
-    AlertmanagerConfig, AlertmanagerConfigSpec, CRDAlertManager, CRDAlertManagerReceiver,
+    AlertmanagerConfig, AlertmanagerConfigSpec, CRDAlertManagerReceiver, CRDPrometheus,
 };
 use crate::topology::k8s::K8sClient;
 use crate::{

@@ -100,8 +101,8 @@ impl CRDAlertManagerReceiver for DiscordWebhook {
 }

 #[async_trait]
-impl AlertReceiver<CRDAlertManager> for DiscordWebhook {
-    async fn install(&self, sender: &CRDAlertManager) -> Result<Outcome, InterpretError> {
+impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
+    async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
         sender
             .client
             .apply(&sender.alertmanager_configs, Some(&sender.namespace))

@@ -114,9 +115,12 @@ impl AlertReceiver<CRDAlertManager> for DiscordWebhook {
     fn name(&self) -> String {
         "discord-webhook".to_string()
     }
-    fn clone_box(&self) -> Box<dyn AlertReceiver<CRDAlertManager>> {
+    fn clone_box(&self) -> Box<dyn AlertReceiver<CRDPrometheus>> {
         Box::new(self.clone())
     }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 #[async_trait]
@@ -130,6 +134,9 @@ impl AlertReceiver<Prometheus> for DiscordWebhook {
     fn clone_box(&self) -> Box<dyn AlertReceiver<Prometheus>> {
         Box::new(self.clone())
     }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 #[async_trait]
@@ -153,6 +160,9 @@ impl AlertReceiver<KubePrometheus> for DiscordWebhook {
     fn name(&self) -> String {
         "discord-webhook".to_string()
     }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 #[async_trait]

View File

@@ -1,4 +1,4 @@
-use std::sync::Arc;
+use std::{any::Any, sync::Arc};

 use async_trait::async_trait;
 use k8s_openapi::api::core::v1::Secret;

@@ -13,8 +13,7 @@ use crate::{
     modules::monitoring::{
         kube_prometheus::{
             crd::crd_alertmanager_config::{
-                AlertmanagerConfig, AlertmanagerConfigSpec, CRDAlertManager,
-                CRDAlertManagerReceiver,
+                AlertmanagerConfig, AlertmanagerConfigSpec, CRDAlertManagerReceiver, CRDPrometheus,
             },
             prometheus::{KubePrometheus, KubePrometheusReceiver},
             types::{AlertChannelConfig, AlertManagerChannelConfig},

@@ -77,8 +76,8 @@ impl CRDAlertManagerReceiver for WebhookReceiver {
 }

 #[async_trait]
-impl AlertReceiver<CRDAlertManager> for WebhookReceiver {
-    async fn install(&self, sender: &CRDAlertManager) -> Result<Outcome, InterpretError> {
+impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
+    async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
         sender
             .client
             .apply(&sender.alertmanager_configs, Some(&sender.namespace))

@@ -91,9 +90,12 @@ impl AlertReceiver<CRDAlertManager> for WebhookReceiver {
     fn name(&self) -> String {
         "webhook-receiver".to_string()
     }
-    fn clone_box(&self) -> Box<dyn AlertReceiver<CRDAlertManager>> {
+    fn clone_box(&self) -> Box<dyn AlertReceiver<CRDPrometheus>> {
         Box::new(self.clone())
     }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 #[async_trait]
@@ -107,6 +109,9 @@ impl AlertReceiver<Prometheus> for WebhookReceiver {
     fn clone_box(&self) -> Box<dyn AlertReceiver<Prometheus>> {
         Box::new(self.clone())
     }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 #[async_trait]
@@ -129,6 +134,9 @@ impl AlertReceiver<KubePrometheus> for WebhookReceiver {
     fn clone_box(&self) -> Box<dyn AlertReceiver<KubePrometheus>> {
         Box::new(self.clone())
     }
+    fn as_any(&self) -> &dyn Any {
+        self
+    }
 }

 #[async_trait]

View File

@@ -0,0 +1,86 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde::Serialize;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::{
application::Application,
monitoring::kube_prometheus::crd::crd_alertmanager_config::{
CRDAlertManagerReceiver, CRDPrometheus,
},
prometheus::prometheus::{PrometheusApplicationMonitoring, PrometheusMonitoring},
},
score::Score,
topology::{
Topology,
oberservability::monitoring::{AlertReceiver, AlertSender},
tenant::TenantManager,
},
};
#[derive(Debug, Clone, Serialize)]
pub struct ApplicationMonitoringScore {
pub sender: CRDPrometheus,
pub application: Arc<dyn Application>,
pub receivers: Vec<Box<dyn AlertReceiver<CRDPrometheus>>>,
}
impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
for ApplicationMonitoringScore
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ApplicationMonitoringInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
"ApplicationMonitoringScore".to_string()
}
}
#[derive(Debug)]
pub struct ApplicationMonitoringInterpret {
score: ApplicationMonitoringScore,
}
#[async_trait]
impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T>
for ApplicationMonitoringInterpret
{
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
//TODO need to pass the receivers, rules, etc. to the config in a generic way that
topology
.install_prometheus(
&self.score.sender,
&inventory,
Some(self.score.receivers.clone()),
)
.await
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -1,2 +1,2 @@
-pub mod crd_application_monitoring_alerting;
+pub mod application_monitoring_score;
 pub mod k8s_application_monitoring_score;

View File

@@ -6,6 +6,8 @@ use schemars::JsonSchema;
 use serde::{Deserialize, Serialize};

 use crate::topology::{
+    Topology,
+    installable::Installable,
     k8s::K8sClient,
     oberservability::monitoring::{AlertReceiver, AlertSender},
 };

@@ -23,37 +25,28 @@ pub struct AlertmanagerConfigSpec {
     pub data: serde_json::Value,
 }

-#[derive(Debug, Clone)]
-pub struct CRDAlertManager {
+#[derive(Debug, Clone, Serialize)]
+pub struct CRDPrometheus {
     pub alertmanager_configs: AlertmanagerConfig,
     pub namespace: String,
     pub client: Arc<K8sClient>,
 }

-impl AlertSender for CRDAlertManager {
+impl AlertSender for CRDPrometheus {
     fn name(&self) -> String {
         "CRDAlertManager".to_string()
     }
 }

-impl Clone for Box<dyn AlertReceiver<CRDAlertManager>> {
+impl Clone for Box<dyn AlertReceiver<CRDPrometheus>> {
     fn clone(&self) -> Self {
         self.clone_box()
     }
 }

-impl Serialize for Box<dyn AlertReceiver<CRDAlertManager>> {
-    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
-    where
-        S: serde::Serializer,
-    {
-        todo!()
-    }
-}
-
 #[async_trait]
 pub trait CRDAlertManagerReceiver:
-    AlertReceiver<CRDAlertManager> + Send + Sync + std::fmt::Debug
+    AlertReceiver<CRDPrometheus> + Send + Sync + std::fmt::Debug
 {
     fn name(&self) -> String;
     async fn configure_receiver(&self, client: &Arc<K8sClient>, ns: String) -> AlertmanagerConfig;

@@ -68,7 +61,16 @@ impl Clone for Box<dyn CRDAlertManagerReceiver> {
 }

 impl Serialize for Box<dyn CRDAlertManagerReceiver> {
-    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
+    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
+    where
+        S: serde::Serializer,
+    {
+        todo!()
+    }
+}
+
+impl Serialize for Box<dyn AlertReceiver<CRDPrometheus>> {
+    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
     where
         S: serde::Serializer,
     {

View File

@@ -9,8 +9,9 @@ use serde::Serialize;
 use tokio::process::Command;

 use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
-    AlertmanagerConfig, CRDAlertManager, CRDAlertManagerReceiver,
+    AlertmanagerConfig, CRDAlertManagerReceiver, CRDPrometheus,
 };
+use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules;
 use crate::modules::monitoring::kube_prometheus::crd::crd_grafana::{
     Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig,
     GrafanaDatasourceSpec, GrafanaSpec,
@@ -19,8 +20,9 @@ use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::{
     PrometheusRule, PrometheusRuleSpec, RuleGroup,
 };
 use crate::modules::monitoring::kube_prometheus::crd::grafana_default_dashboard::build_default_dashboard;
-use crate::modules::monitoring::kube_prometheus::crd::service_monitor::ServiceMonitor;
-use crate::topology::oberservability::monitoring::PrometheusK8sAnywhere;
+use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
+    ServiceMonitor, ServiceMonitorSpec,
+};
 use crate::topology::{K8sclient, Topology, k8s::K8sClient};
 use crate::{
     data::{Id, Version},
@@ -37,17 +39,21 @@ use crate::{
     score::Score,
 };

+use super::prometheus::PrometheusMonitoring;
+
 #[derive(Clone, Debug, Serialize)]
-pub struct CRDApplicationAlertingScore {
+pub struct K3dPrometheusCRDAlertingScore {

Review:
This file is almost identical to `k8s_prometheus_alerting_score`, the only difference being that this one has a `namespace` attribute whereas the `k8s` one has a `sender` that is used to retrieve the namespace. Is there a reason for these 2 almost identical files? Also, this file is actually unused.
     pub namespace: String,
     pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>,
     pub service_monitors: Vec<ServiceMonitor>,
     pub prometheus_rules: Vec<RuleGroup>,
 }
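On the duplication flagged in the review comment above — a minimal sketch of one way to fold the two variants together (hypothetical; `PrometheusCRDAlertingInterpret` and its constructors are invented for illustration): since the only structural difference is where the namespace comes from, both scores could resolve it at construction time and share one interpret.

```rust
// Hypothetical: a single interpret that always carries a resolved namespace.
// The k3d score would pass its `namespace` field straight through, while the
// k8s score would resolve it from `sender.namespace` up front, removing the
// need for two parallel files.
pub struct PrometheusCRDAlertingInterpret {
    pub namespace: String,
    // receivers, service_monitors, prometheus_rules as in both variants...
}

impl PrometheusCRDAlertingInterpret {
    pub fn from_namespace(namespace: String) -> Self {
        Self { namespace }
    }

    pub fn from_sender_namespace(sender_namespace: &str) -> Self {
        Self {
            namespace: sender_namespace.to_string(),
        }
    }
}
```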
-impl<T: Topology + K8sclient + PrometheusK8sAnywhere> Score<T> for CRDApplicationAlertingScore {
+impl<T: Topology + K8sclient + PrometheusMonitoring<CRDPrometheus>> Score<T>
+    for K3dPrometheusCRDAlertingScore
+{
     fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
-        Box::new(CRDApplicationAlertingInterpret {
+        Box::new(K3dPrometheusCRDAlertingInterpret {
             namespace: self.namespace.clone(),
             receivers: self.receivers.clone(),
             service_monitors: self.service_monitors.clone(),
@@ -61,7 +67,7 @@ impl<T: Topology + K8sclient + PrometheusK8sAnywhere> Score<T> for CRDApplicatio
 }

 #[derive(Clone, Debug)]
-pub struct CRDApplicationAlertingInterpret {
+pub struct K3dPrometheusCRDAlertingInterpret {
     pub namespace: String,
     pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>,
     pub service_monitors: Vec<ServiceMonitor>,
@@ -69,8 +75,8 @@ pub struct CRDApplicationAlertingInterpret {
 }

 #[async_trait]
-impl<T: Topology + K8sclient + PrometheusK8sAnywhere> Interpret<T>
-    for CRDApplicationAlertingInterpret
+impl<T: Topology + K8sclient + PrometheusMonitoring<CRDPrometheus>> Interpret<T>
+    for K3dPrometheusCRDAlertingInterpret
 {
     async fn execute(
         &self,
@@ -78,9 +84,7 @@ impl<T: Topology + K8sclient + PrometheusK8sAnywhere> Interpret<T>
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
-        topology
-            .ensure_prometheus_operator(Some(self.namespace.clone()))
-            .await?;
+        self.ensure_prometheus_operator().await?;
         self.ensure_grafana_operator().await?;
         self.install_prometheus(&client).await?;
         self.install_alert_manager(&client).await?;
@@ -112,7 +116,7 @@ impl<T: Topology + K8sclient + PrometheusK8sAnywhere> Interpret<T>
 }

-impl CRDApplicationAlertingInterpret {
+impl K3dPrometheusCRDAlertingInterpret {
     async fn crd_exists(&self, crd: &str) -> bool {
         let output = Command::new("kubectl")
             .args(["get", "crd", crd])
@@ -190,6 +194,18 @@ impl CRDApplicationAlertingInterpret {
         Ok(())
     }

+    async fn ensure_prometheus_operator(&self) -> Result<Outcome, InterpretError> {
+        self.install_chart(
+            "oci://hub.nationtech.io/harmony".to_string(),
+            "nt-prometheus-operator".to_string(),
+        )
+        .await?;
+        Ok(Outcome::success(format!(
+            "installed prometheus operator to ns {}",
+            self.namespace.clone()
+        )))
+    }
+
     async fn ensure_grafana_operator(&self) -> Result<Outcome, InterpretError> {
         if self.crd_exists("grafanas.grafana.integreatly.org").await {
             debug!("grafana CRDs already exist — skipping install.");
@@ -377,9 +393,26 @@ impl CRDApplicationAlertingInterpret {
     }

     async fn install_monitors(
         &self,
-        monitors: Vec<ServiceMonitor>,
+        mut monitors: Vec<ServiceMonitor>,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
+        let default_service_monitor = ServiceMonitor {
+            metadata: ObjectMeta {
+                name: Some(self.namespace.clone()),
+                labels: Some(std::collections::BTreeMap::from([
+                    ("alertmanagerConfig".to_string(), "enabled".to_string()),
+                    ("client".to_string(), "prometheus".to_string()),
+                    (
+                        "app.kubernetes.io/name".to_string(),
+                        "kube-state-metrics".to_string(),
+                    ),
+                ])),
+                namespace: Some(self.namespace.clone()),
+                ..Default::default()
+            },
+            spec: ServiceMonitorSpec::default(),
+        };
+        monitors.push(default_service_monitor);
         for monitor in monitors.iter() {
             client
                 .apply(monitor, Some(&self.namespace.clone()))
@@ -396,10 +429,16 @@ impl CRDApplicationAlertingInterpret {
         rules: &Vec<RuleGroup>,
         client: &Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        let prom_rule_spec = PrometheusRuleSpec {
+        let mut prom_rule_spec = PrometheusRuleSpec {
             groups: rules.clone(),
         };
+        let default_rules_group = RuleGroup {
+            name: format!("default-rules"),
+            rules: build_default_application_rules(),
+        };
+        prom_rule_spec.groups.push(default_rules_group);
         let prom_rules = PrometheusRule {
             metadata: ObjectMeta {
                 name: Some(self.namespace.clone()),
@@ -460,7 +499,7 @@ impl CRDApplicationAlertingInterpret {
                     access: "proxy".to_string(),
                     database: Some("prometheus".to_string()),
                     json_data: Some(json_data),
-                    //this is fragile
+                    //TODO this is fragile
                     name: format!("prometheus-{}-0", self.namespace.clone()),
                     r#type: "prometheus".to_string(),
                     url: format!(
@@ -529,7 +568,7 @@ impl CRDApplicationAlertingInterpret {
         let alertmanager_config: AlertmanagerConfig = receiver
             .configure_receiver(&client, self.namespace.clone())
             .await;
-        let sender = CRDAlertManager {
+        let sender = CRDPrometheus {
             alertmanager_configs: alertmanager_config,
             namespace: self.namespace.clone(),
             client: client.clone(),

View File

@@ -0,0 +1,580 @@
use std::fs;
use std::{collections::BTreeMap, sync::Arc};
use tempfile::tempdir;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use log::{debug, info};
use serde::Serialize;
use tokio::process::Command;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
AlertmanagerConfig, CRDAlertManagerReceiver, CRDPrometheus,
};
use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules;
use crate::modules::monitoring::kube_prometheus::crd::crd_grafana::{
Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig,
GrafanaDatasourceSpec, GrafanaSpec,
};
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::{
PrometheusRule, PrometheusRuleSpec, RuleGroup,
};
use crate::modules::monitoring::kube_prometheus::crd::grafana_default_dashboard::build_default_dashboard;
use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
ServiceMonitor, ServiceMonitorSpec,
};
use crate::topology::{K8sclient, Topology, k8s::K8sClient};
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::monitoring::kube_prometheus::crd::{
crd_alertmanagers::{Alertmanager, AlertmanagerSpec},
crd_prometheuses::{
AlertmanagerEndpoints, LabelSelector, Prometheus, PrometheusSpec,
PrometheusSpecAlerting,
},
role::{build_prom_role, build_prom_rolebinding, build_prom_service_account},
},
score::Score,
};
use super::prometheus::{PrometheusApplicationMonitoring, PrometheusMonitoring};
#[derive(Clone, Debug, Serialize)]
pub struct K8sPrometheusCRDAlertingScore {
pub sender: CRDPrometheus,
pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>,
pub service_monitors: Vec<ServiceMonitor>,
pub prometheus_rules: Vec<RuleGroup>,
}
impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
for K8sPrometheusCRDAlertingScore
{
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
Box::new(K8sPrometheusCRDAlertingInterpret {
sender: self.sender.clone(),
receivers: self.receivers.clone(),
service_monitors: self.service_monitors.clone(),
prometheus_rules: self.prometheus_rules.clone(),
})
}
fn name(&self) -> String {
"CRDApplicationAlertingScore".into()
}
}
#[derive(Clone, Debug)]
pub struct K8sPrometheusCRDAlertingInterpret {
pub sender: CRDPrometheus,
pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>,
pub service_monitors: Vec<ServiceMonitor>,
pub prometheus_rules: Vec<RuleGroup>,
}
#[async_trait]
impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T>
for K8sPrometheusCRDAlertingInterpret
{
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.ensure_grafana_operator().await?;
self.install_prometheus(&client).await?;
self.install_alert_manager(&client).await?;
self.install_client_kube_metrics().await?;
self.install_grafana(&client).await?;
self.install_receivers(&self.receivers, &client).await?;
self.install_rules(&self.prometheus_rules, &client).await?;
self.install_monitors(self.service_monitors.clone(), &client)
.await?;
Ok(Outcome::success(format!(
"deployed application monitoring composants"
)))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl K8sPrometheusCRDAlertingInterpret {
async fn crd_exists(&self, crd: &str) -> bool {
let output = Command::new("kubectl")
.args(["get", "crd", crd])
.output()
.await;
matches!(output, Ok(o) if o.status.success())
}
async fn install_chart(
&self,
chart_path: String,
chart_name: String,
) -> Result<(), InterpretError> {
let temp_dir =
tempdir().map_err(|e| InterpretError::new(format!("Tempdir error: {}", e)))?;
let temp_path = temp_dir.path().to_path_buf();
debug!("Using temp directory: {}", temp_path.display());
let chart = format!("{}/{}", chart_path, chart_name);
let pull_output = Command::new("helm")
.args(&["pull", &chart, "--destination", temp_path.to_str().unwrap()])
.output()
.await
.map_err(|e| InterpretError::new(format!("Helm pull error: {}", e)))?;
if !pull_output.status.success() {
return Err(InterpretError::new(format!(
"Helm pull failed: {}",
String::from_utf8_lossy(&pull_output.stderr)
)));
}
let tgz_path = fs::read_dir(&temp_path)
.unwrap()
.filter_map(|entry| {
let entry = entry.ok()?;
let path = entry.path();
if path.extension()? == "tgz" {
Some(path)
} else {
None
}
})
.next()
.ok_or_else(|| InterpretError::new("Could not find pulled Helm chart".into()))?;
debug!("Installing chart from: {}", tgz_path.display());
let install_output = Command::new("helm")
.args(&[
"install",
&chart_name,
tgz_path.to_str().unwrap(),
"--namespace",
&self.sender.namespace.clone(),
"--create-namespace",
"--wait",
"--atomic",
])
.output()
.await
.map_err(|e| InterpretError::new(format!("Helm install error: {}", e)))?;
if !install_output.status.success() {
return Err(InterpretError::new(format!(
"Helm install failed: {}",
String::from_utf8_lossy(&install_output.stderr)
)));
}
debug!(
"Installed chart {}/{} in namespace: {}",
&chart_path,
&chart_name,
self.sender.namespace.clone()
);
Ok(())
}
async fn ensure_grafana_operator(&self) -> Result<Outcome, InterpretError> {
if self.crd_exists("grafanas.grafana.integreatly.org").await {
debug!("grafana CRDs already exist — skipping install.");
return Ok(Outcome::success("Grafana CRDs already exist".to_string()));
}
let _ = Command::new("helm")
.args(&[
"repo",
"add",
"grafana-operator",
"https://grafana.github.io/helm-charts",
])
.output()
.await
.unwrap();
let _ = Command::new("helm")
.args(&["repo", "update"])
.output()
.await
.unwrap();
let output = Command::new("helm")
.args(&[
"install",
"grafana-operator",
"grafana-operator/grafana-operator",
"--namespace",
&self.sender.namespace.clone(),
"--create-namespace",
"--set",
"namespaceScope=true",
])
.output()
.await
.unwrap();
if !output.status.success() {
return Err(InterpretError::new(format!(
"helm install failed:\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
)));
}
Ok(Outcome::success(format!(
"installed grafana operator in ns {}",
self.sender.namespace.clone().clone()

Review:
is there a reason why these double `clone().clone()`? there's a lot of them in this file
)))
}
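On the `clone().clone()` comment above — a small self-contained sketch of the redundancy (with hypothetical stand-in types): `self.sender.namespace.clone()` already yields an owned `String`, so the second `.clone()` only copies it again; binding one clone per method removes the repetition.

```rust
// Hypothetical stand-ins mirroring `sender.namespace` in this file.
struct Sender {
    namespace: String,
}

struct Interpret {
    sender: Sender,
}

impl Interpret {
    fn grafana_success_message(&self) -> String {
        // Bind a single clone once and reuse it, instead of writing
        // `self.sender.namespace.clone().clone()` at every use site.
        let namespace = self.sender.namespace.clone();
        format!("installed grafana operator in ns {}", namespace)
    }
}
```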
async fn install_prometheus(&self, client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
debug!(
"installing crd-prometheuses in namespace {}",
self.sender.namespace.clone().clone()
);
debug!("building role/rolebinding/serviceaccount for crd-prometheus");
let rolename = format!("{}-prom", self.sender.namespace.clone().clone());
let sa_name = format!("{}-prom-sa", self.sender.namespace.clone().clone());
let role = build_prom_role(rolename.clone(), self.sender.namespace.clone().clone());
let rolebinding = build_prom_rolebinding(
rolename.clone(),
self.sender.namespace.clone().clone(),
sa_name.clone(),
);
let sa = build_prom_service_account(sa_name.clone(), self.sender.namespace.clone().clone());
let prom_spec = PrometheusSpec {
alerting: Some(PrometheusSpecAlerting {
alertmanagers: Some(vec![AlertmanagerEndpoints {
name: Some(format!("alertmanager-operated")),
namespace: Some(format!("{}", self.sender.namespace.clone().clone())),
port: Some("web".into()),
scheme: Some("http".into()),
}]),
}),
service_account_name: sa_name.clone(),
service_monitor_namespace_selector: Some(LabelSelector {
match_labels: BTreeMap::from([(
"kubernetes.io/metadata.name".to_string(),
format!("{}", self.sender.namespace.clone().clone()),
)]),
match_expressions: vec![],
}),
service_monitor_selector: Some(LabelSelector {
match_labels: BTreeMap::from([("client".to_string(), "prometheus".to_string())]),
..Default::default()
}),
service_discovery_role: Some("Endpoints".into()),
pod_monitor_selector: Some(LabelSelector {
match_labels: BTreeMap::from([("client".to_string(), "prometheus".to_string())]),
..Default::default()
}),
rule_selector: Some(LabelSelector {
match_labels: BTreeMap::from([("role".to_string(), "prometheus-rule".to_string())]),
..Default::default()
}),
rule_namespace_selector: Some(LabelSelector {
match_labels: BTreeMap::from([(
"kubernetes.io/metadata.name".to_string(),
format!("{}", self.sender.namespace.clone().clone()),
)]),
match_expressions: vec![],
}),
};
let prom = Prometheus {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone().clone()),
labels: Some(std::collections::BTreeMap::from([
("alertmanagerConfig".to_string(), "enabled".to_string()),
("client".to_string(), "prometheus".to_string()),
])),
namespace: Some(self.sender.namespace.clone().clone()),
..Default::default()
},
spec: prom_spec,
};
client
.apply(&role, Some(&self.sender.namespace.clone().clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus role: {:#?} in ns {:#?}",
role.metadata.name.unwrap(),
role.metadata.namespace.unwrap()
);
client
.apply(&rolebinding, Some(&self.sender.namespace.clone().clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus rolebinding: {:#?} in ns {:#?}",
rolebinding.metadata.name.unwrap(),
rolebinding.metadata.namespace.unwrap()
);
client
.apply(&sa, Some(&self.sender.namespace.clone().clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus service account: {:#?} in ns {:#?}",
sa.metadata.name.unwrap(),
sa.metadata.namespace.unwrap()
);
client
.apply(&prom, Some(&self.sender.namespace.clone().clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus: {:#?} in ns {:#?}",
&prom.metadata.name.clone().unwrap(),
&prom.metadata.namespace.clone().unwrap()
);
Ok(Outcome::success(format!(
"successfully deployed crd-prometheus {:#?}",
prom
)))
}
async fn install_alert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let am = Alertmanager {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone().clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(self.sender.namespace.clone().clone()),
..Default::default()
},
spec: AlertmanagerSpec::default(),
};
client
.apply(&am, Some(&self.sender.namespace.clone().clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed service monitor {:#?}",
am.metadata.name
)))
}
async fn install_monitors(
&self,
mut monitors: Vec<ServiceMonitor>,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let default_service_monitor = ServiceMonitor {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone().clone()),
labels: Some(std::collections::BTreeMap::from([
("alertmanagerConfig".to_string(), "enabled".to_string()),
("client".to_string(), "prometheus".to_string()),
(
"app.kubernetes.io/name".to_string(),
"kube-state-metrics".to_string(),
),
])),
namespace: Some(self.sender.namespace.clone().clone()),
..Default::default()
},
spec: ServiceMonitorSpec::default(),
};
monitors.push(default_service_monitor);
for monitor in monitors.iter() {
client
.apply(monitor, Some(&self.sender.namespace.clone().clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
}
Ok(Outcome::success(
"succesfully deployed service monitors".to_string(),
))
}
async fn install_rules(
&self,
rules: &Vec<RuleGroup>,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut prom_rule_spec = PrometheusRuleSpec {
groups: rules.clone(),
};
let default_rules_group = RuleGroup {
name: format!("default-rules"),
rules: build_default_application_rules(),
};
prom_rule_spec.groups.push(default_rules_group);
let prom_rules = PrometheusRule {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone().clone()),
labels: Some(std::collections::BTreeMap::from([
("alertmanagerConfig".to_string(), "enabled".to_string()),
("role".to_string(), "prometheus-rule".to_string()),
])),
namespace: Some(self.sender.namespace.clone().clone()),
..Default::default()
},
spec: prom_rule_spec,
};
client
.apply(&prom_rules, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed rules {:#?}",
prom_rules.metadata.name
)))
}
async fn install_client_kube_metrics(&self) -> Result<Outcome, InterpretError> {
self.install_chart(
"oci://hub.nationtech.io/harmony".to_string(),
"nt-kube-metrics".to_string(),
)
.await?;
Ok(Outcome::success(format!(
"Installed client kube metrics in ns {}",
&self.sender.namespace.clone()
)))
}
async fn install_grafana(&self, client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let mut label = BTreeMap::new();
label.insert("dashboards".to_string(), "grafana".to_string());
let labels = LabelSelector {
match_labels: label.clone(),
match_expressions: vec![],
};
let mut json_data = BTreeMap::new();
json_data.insert("timeInterval".to_string(), "5s".to_string());
let namespace = self.sender.namespace.clone().clone();
let json = build_default_dashboard(&namespace);
let graf_data_source = GrafanaDatasource {
metadata: ObjectMeta {
name: Some(format!(
"grafana-datasource-{}",
self.sender.namespace.clone().clone()
)),
namespace: Some(self.sender.namespace.clone().clone()),
..Default::default()
},
spec: GrafanaDatasourceSpec {
instance_selector: labels.clone(),
allow_cross_namespace_import: Some(false),
datasource: GrafanaDatasourceConfig {
access: "proxy".to_string(),
database: Some("prometheus".to_string()),
json_data: Some(json_data),
//this is fragile
name: format!("prometheus-{}-0", self.sender.namespace.clone().clone()),
r#type: "prometheus".to_string(),
url: format!(
"http://prometheus-operated.{}.svc.cluster.local:9090",
self.sender.namespace.clone().clone()
),
},
},
};
client
.apply(&graf_data_source, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
let graf_dashboard = GrafanaDashboard {
metadata: ObjectMeta {
name: Some(format!(
"grafana-dashboard-{}",
self.sender.namespace.clone().clone()
)),
namespace: Some(self.sender.namespace.clone().clone()),
..Default::default()
},
spec: GrafanaDashboardSpec {
resync_period: Some("30s".to_string()),
instance_selector: labels.clone(),
json,
},
};
client
.apply(&graf_dashboard, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
let grafana = Grafana {
metadata: ObjectMeta {
name: Some(format!("grafana-{}", self.sender.namespace.clone().clone())),
namespace: Some(self.sender.namespace.clone().clone()),
labels: Some(label.clone()),
..Default::default()
},
spec: GrafanaSpec {
config: None,
admin_user: None,
admin_password: None,
ingress: None,
persistence: None,
resources: None,
},
};
client
.apply(&grafana, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed grafana instance {:#?}",
grafana.metadata.name
)))
}
async fn install_receivers(
&self,
receivers: &Vec<Box<dyn CRDAlertManagerReceiver>>,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
for receiver in receivers.iter() {
let alertmanager_config: AlertmanagerConfig = receiver
.configure_receiver(&client, self.sender.namespace.clone().clone())
.await;
let sender = CRDPrometheus {
alertmanager_configs: alertmanager_config,
namespace: self.sender.namespace.clone().clone(),
client: self.sender.client.clone(),
};
receiver.install(&sender).await.map_err(|err| {
InterpretError::new(format!("failed to install receiver: {}", err))
})?;
}
Ok(Outcome::success(format!("successfully deployed receivers")))
}
}

View File

@@ -1 +1,4 @@
 pub mod alerts;
+pub mod k3d_prometheus_alerting_score;
+pub mod k8s_prometheus_alerting_score;
+pub mod prometheus;

View File

@@ -0,0 +1,27 @@
use async_trait::async_trait;
use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDAlertManagerReceiver,
topology::oberservability::monitoring::{AlertReceiver, AlertSender},
};
#[async_trait]
pub trait PrometheusMonitoring<S: AlertSender> {
Review:
`install_prometheus` seems a bit inaccurate: we're not installing prometheus here, we're installing the receivers to run on prometheus. Maybe it should be renamed to `install_receivers` or `install_monitoring` or something similar.
async fn ensure_prometheus_operator(&self, sender: &S) -> Result<Outcome, InterpretError>;
}
#[async_trait]
pub trait PrometheusApplicationMonitoring<S: AlertSender>: PrometheusMonitoring<S> {
Review (marked as resolved by letian):
This trait shouldn't be extending `PrometheusMonitoring` as they're not really related to each other. It's 2 different capabilities. And actually if you dig a bit more, you see that `ensure_prometheus_operator` is used only internally by `K8sAnywhere` itself. So it should be a private method there instead of a separate capability.
async fn configure_receivers(
&self,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
) -> Option<Vec<Box<dyn CRDAlertManagerReceiver>>>;
async fn install_prometheus(
&self,
sender: &S,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
) -> Result<Outcome, InterpretError>;
}
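A minimal sketch of the shape suggested by the two review comments above — hypothetical, not what was merged: no supertrait link between the capabilities, `ensure_prometheus_operator` demoted to a private helper on `K8sAnywhereTopology` (the impl block below assumes it sits in that type's own module), and `install_prometheus` renamed to reflect what it actually does. Imports assume the crate paths used elsewhere in this PR.

```rust
use async_trait::async_trait;

use crate::{
    interpret::{InterpretError, Outcome},
    inventory::Inventory,
    modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus,
    topology::oberservability::monitoring::{AlertReceiver, AlertSender},
};

#[async_trait]
pub trait PrometheusApplicationMonitoring<S: AlertSender> {
    // Renamed from `install_prometheus`: this wires receivers into an
    // already-running Prometheus rather than installing Prometheus itself.
    async fn install_monitoring(
        &self,
        sender: &S,
        inventory: &Inventory,
        receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
    ) -> Result<Outcome, InterpretError>;
}

impl K8sAnywhereTopology {
    // Private helper rather than a trait method: only K8sAnywhere itself
    // ever needs to ensure the operator is present.
    async fn ensure_prometheus_operator(
        &self,
        sender: &CRDPrometheus,
    ) -> Result<Outcome, InterpretError> {
        todo!()
    }
}
```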