Monitor an application within a tenant #86

Merged
letian merged 22 commits from feat/crd-alertmanager-configs into master 2025-08-04 21:42:05 +00:00
17 changed files with 856 additions and 89 deletions
Showing only changes of commit 114219385f - Show all commits

View File

@ -51,8 +51,8 @@ async fn main() {
let service_monitor_endpoint = ServiceMonitorEndpoint { let service_monitor_endpoint = ServiceMonitorEndpoint {
port: Some("80".to_string()), port: Some("80".to_string()),
path: "/metrics".to_string(), path: Some("/metrics".to_string()),
scheme: HTTPScheme::HTTP, scheme: Some(HTTPScheme::HTTP),
..Default::default() ..Default::default()
}; };

View File

@ -54,8 +54,8 @@ async fn main() {
let service_monitor_endpoint = ServiceMonitorEndpoint { let service_monitor_endpoint = ServiceMonitorEndpoint {
port: Some("80".to_string()), port: Some("80".to_string()),
path: "/metrics".to_string(), path: Some("/metrics".to_string()),
scheme: HTTPScheme::HTTP, scheme: Some(HTTPScheme::HTTP),
..Default::default() ..Default::default()
}; };

View File

@ -6,7 +6,7 @@ use harmony::{
modules::{ modules::{
application::{ application::{
ApplicationScore, RustWebFramework, RustWebapp, ApplicationScore, RustWebFramework, RustWebapp,
features::{ContinuousDelivery, PrometheusApplicationMonitoring}, features::{ContinuousDelivery, Monitoring},
}, },
monitoring::alert_channel::{ monitoring::alert_channel::{
discord_alert_channel::DiscordWebhook, webhook_receiver::WebhookReceiver, discord_alert_channel::DiscordWebhook, webhook_receiver::WebhookReceiver,
@ -46,9 +46,11 @@ async fn main() {
Box::new(ContinuousDelivery { Box::new(ContinuousDelivery {
application: application.clone(), application: application.clone(),
}), // TODO add monitoring, backups, multisite ha, etc }), // TODO add monitoring, backups, multisite ha, etc
Box::new(PrometheusApplicationMonitoring { Box::new(Monitoring {
application: application.clone(), application: application.clone(),
alert_receiver: vec![Box::new(discord_receiver), Box::new(webhook_receiver)], alert_receiver: vec![Box::new(discord_receiver), Box::new(webhook_receiver)],
service_monitors: vec![],
alert_rules: vec![],
}), }),
// TODO add backups, multisite ha, etc // TODO add backups, multisite ha, etc
], ],

View File

@ -1,9 +1,13 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::{
use base64::{Engine as _, engine::general_purpose}; build_rule_container_restarting, build_rule_pod_failed,
use log::{debug, info}; };
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::RuleGroup;
use crate::modules::monitoring::kube_prometheus::service_monitor::{
ServiceMonitor, ServiceMonitorSpec,
};
use crate::modules::monitoring::kube_prometheus::types::{Selector, ServiceMonitorEndpoint};
use crate::{ use crate::{
inventory::Inventory, inventory::Inventory,
modules::{ modules::{
@ -13,7 +17,7 @@ use crate::{
kube_prometheus::{ kube_prometheus::{
alert_manager_config::{CRDAlertManager, CRDAlertManagerReceiver}, alert_manager_config::{CRDAlertManager, CRDAlertManagerReceiver},
helm_prometheus_application_alerting::HelmPrometheusApplicationAlertingScore, helm_prometheus_application_alerting::HelmPrometheusApplicationAlertingScore,
types::{NamespaceSelector, ServiceMonitor}, types::{NamespaceSelector, ServiceMonitor as KubePrometheusServiceMonitor},
}, },
ntfy::ntfy::NtfyScore, ntfy::ntfy::NtfyScore,
}, },
@ -24,23 +28,31 @@ use crate::{
tenant::TenantManager, tenant::TenantManager,
}, },
}; };
use async_trait::async_trait;
use base64::{Engine as _, engine::general_purpose};
use kube::api::ObjectMeta;
use log::{debug, info};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PrometheusApplicationMonitoring { pub struct Monitoring {
pub application: Arc<dyn OCICompliant>, pub application: Arc<dyn OCICompliant>,
pub alert_receiver: Vec<Box<dyn CRDAlertManagerReceiver>>, pub alert_receiver: Vec<Box<dyn CRDAlertManagerReceiver>>,
pub service_monitors: Vec<ServiceMonitor>,
pub alert_rules: Vec<RuleGroup>,
} }
#[async_trait] #[async_trait]
impl<T: Topology + HelmCommand + 'static + TenantManager + K8sclient + std::fmt::Debug> impl<T: Topology + HelmCommand + 'static + TenantManager + K8sclient + std::fmt::Debug>
ApplicationFeature<T> for PrometheusApplicationMonitoring ApplicationFeature<T> for Monitoring
{ {
async fn ensure_installed(&self, topology: &T) -> Result<(), String> { async fn ensure_installed(&self, topology: &T) -> Result<(), String> {
info!("Ensuring monitoring is available for application"); info!("Ensuring monitoring is available for application");
let namespace = self.application.name().clone();
let mut alerting_score = HelmPrometheusApplicationAlertingScore { let mut alerting_score = HelmPrometheusApplicationAlertingScore {
namespace: self.application.name().clone(), namespace: namespace.clone(),
receivers: self.alert_receiver.clone(), receivers: self.alert_receiver.clone(),
service_monitors: self.service_monitors.clone(),
prometheus_rules: self.alert_rules.clone(),
}; };
let ntfy = NtfyScore { let ntfy = NtfyScore {
// namespace: topology // namespace: topology
@ -91,14 +103,27 @@ impl<T: Topology + HelmCommand + 'static + TenantManager + K8sclient + std::fmt:
//TODO add service monitors to PrometheusApplicationMonitoring which can be //TODO add service monitors to PrometheusApplicationMonitoring which can be
//deployed for the namespace using prometheus crd-servicemonitors //deployed for the namespace using prometheus crd-servicemonitors
let mut service_monitor = ServiceMonitor::default(); let service_monitor = ServiceMonitor {
service_monitor.namespace_selector = Some(NamespaceSelector { metadata: ObjectMeta {
any: true, name: Some(self.application.name().clone()),
match_names: vec![], labels: Some(std::collections::BTreeMap::from([(
}); "alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(namespace),
..Default::default()
},
spec: ServiceMonitorSpec::default(),
};
service_monitor.name = "rust-webapp".to_string(); alerting_score.service_monitors.push(service_monitor);
let rules_group = RuleGroup {
name: format!("{}-rules", self.application.name().clone()),
rules: vec![build_rule_container_restarting(), build_rule_pod_failed()],
};
alerting_score.prometheus_rules.push(rules_group);
alerting_score alerting_score
.create_interpret() .create_interpret()
.execute(&Inventory::empty(), topology) .execute(&Inventory::empty(), topology)

View File

@ -17,8 +17,6 @@ use crate::{
}, },
}; };
use super::types::AlertManagerConfig;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)] #[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube( #[kube(
group = "monitoring.coreos.com", group = "monitoring.coreos.com",

View File

@ -0,0 +1,53 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use super::crd_prometheuses::LabelSelector;
/// Rust CRD for `Alertmanager` from Prometheus Operator
/// (`monitoring.coreos.com/v1`).
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
    group = "monitoring.coreos.com",
    version = "v1",
    kind = "Alertmanager",
    plural = "alertmanagers",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct AlertmanagerSpec {
    /// Number of replicas for HA.
    pub replicas: i32,
    /// Which AlertmanagerConfig CRDs this instance adopts
    /// (empty selector = all in the selected namespaces).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub alertmanager_config_selector: Option<LabelSelector>,
    /// Namespaces to look for AlertmanagerConfig CRDs in
    /// (`None` = same namespace only; see `Default` below).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub alertmanager_config_namespace_selector: Option<LabelSelector>,
    /// Optional pod template metadata (annotations, labels).
    /// NOTE(review): typed as `LabelSelector`, but pod metadata is
    /// normally an ObjectMeta-like struct — confirm against the CRD schema.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pod_metadata: Option<LabelSelector>,
    /// Alertmanager version tag to deploy. (The previous
    /// "topology spread settings" comment was a copy-paste leftover.)
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub version: Option<String>,
}
/// Defaults: a single replica that adopts every `AlertmanagerConfig`
/// in its own namespace (`None` namespace selector = same namespace
/// only; empty config selector = match all).
impl Default for AlertmanagerSpec {
    fn default() -> Self {
        Self {
            replicas: 1,
            // Empty selector matches all AlertmanagerConfigs in that namespace
            alertmanager_config_selector: Some(LabelSelector::default()),
            // Match all AlertmanagerConfigs in the same namespace
            alertmanager_config_namespace_selector: None,
            pod_metadata: None,
            version: None,
        }
    }
}

View File

@ -0,0 +1,38 @@
use std::collections::BTreeMap;
use super::crd_prometheus_rules::Rule;
/// Alerting rule that fires (severity: warning) when a container has
/// restarted more than 3 times within a 5-minute window.
pub fn build_rule_container_restarting() -> Rule {
    let labels = BTreeMap::from([("severity".to_string(), "warning".to_string())]);
    let mut annotations = BTreeMap::new();
    annotations.insert(
        "summary".to_string(),
        "Container is restarting frequently".to_string(),
    );
    annotations.insert(
        "description".to_string(),
        "Container in this namespace is restarting more than 3 times in 5 minutes.".to_string(),
    );
    Rule {
        alert: Some("ContainerRestarting".to_string()),
        expr: Some("increase(kube_pod_container_status_restarts_total[5m]) > 3".to_string()),
        for_: Some("5m".to_string()),
        labels: Some(labels),
        annotations: Some(annotations),
    }
}
/// Alerting rule that fires immediately (severity: critical) when any
/// pod is in the `Failed` phase.
pub fn build_rule_pod_failed() -> Rule {
    let labels = BTreeMap::from([("severity".to_string(), "critical".to_string())]);
    let mut annotations = BTreeMap::new();
    annotations.insert("summary".to_string(), "A pod has failed".to_string());
    annotations.insert(
        "description".to_string(),
        "One or more pods are in Failed phase.".to_string(),
    );
    Rule {
        alert: Some("PodFailed".to_string()),
        expr: Some("kube_pod_status_phase{phase=\"Failed\"} > 0".to_string()),
        for_: Some("0m".to_string()),
        labels: Some(labels),
        annotations: Some(annotations),
    }
}

View File

@ -0,0 +1,170 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
/// Spec for the `Grafana` custom resource from grafana-operator
/// (`grafana.integreatly.org/v1beta1`). Every field is optional and
/// omitted from the serialized manifest when `None`.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
    group = "grafana.integreatly.org",
    version = "v1beta1",
    kind = "Grafana",
    plural = "grafanas",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaSpec {
    /// Ini-style Grafana configuration (log and security sections).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub config: Option<GrafanaConfig>,
    /// Admin username. NOTE(review): `GrafanaSecurityConfig` also carries
    /// admin credentials — confirm which path the operator honors.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admin_user: Option<String>,
    /// Admin password (stored in plain text in the manifest).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admin_password: Option<String>,
    /// Ingress exposure settings.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub ingress: Option<GrafanaIngress>,
    /// Persistent storage settings.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub persistence: Option<GrafanaPersistence>,
    /// Resource requests/limits for the Grafana pod.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resources: Option<ResourceRequirements>,
}
/// Subset of Grafana's ini-style configuration surfaced by the CR.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaConfig {
    /// `[log]` section (mode, level).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub log: Option<GrafanaLogConfig>,
    /// `[security]` section (admin credentials).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub security: Option<GrafanaSecurityConfig>,
}

/// Logging settings; values are passed through to Grafana as-is.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaLogConfig {
    /// Log output mode string.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub mode: Option<String>,
    /// Log level string.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub level: Option<String>,
}

/// Admin credentials in the `[security]` config section.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaSecurityConfig {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admin_user: Option<String>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub admin_password: Option<String>,
}

/// Ingress exposure toggle and hostnames.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaIngress {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub enabled: Option<bool>,
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub hosts: Option<Vec<String>>,
}

/// Persistent volume settings for Grafana data.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaPersistence {
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub enabled: Option<bool>,
    /// Kubernetes StorageClass to provision from.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub storage_class_name: Option<String>,
    /// Volume size as a quantity string (e.g. "1Gi").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub size: Option<String>,
}
// ------------------------------------------------------------------------------------------------

/// Spec for the `GrafanaDashboard` CR: a dashboard JSON document bound
/// to the Grafana instance(s) matched by `instance_selector`.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
    group = "grafana.integreatly.org",
    version = "v1beta1",
    kind = "GrafanaDashboard",
    plural = "grafanadashboards",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDashboardSpec {
    /// How often the operator re-syncs the dashboard (duration string).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub resync_period: Option<String>,
    /// Selects the Grafana instance(s) this dashboard is applied to.
    pub instance_selector: LabelSelector,
    /// The dashboard model as a raw JSON string.
    pub json: String,
}

// ------------------------------------------------------------------------------------------------

/// Spec for the `GrafanaDatasource` CR.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
    group = "grafana.integreatly.org",
    version = "v1beta1",
    kind = "GrafanaDatasource",
    plural = "grafanadatasources",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceSpec {
    /// Selects the Grafana instance(s) this datasource is applied to.
    pub instance_selector: LabelSelector,
    /// Allow Grafanas in other namespaces to import this datasource.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub allow_cross_namespace_import: Option<bool>,
    /// The datasource definition itself.
    pub datasource: GrafanaDatasourceConfig,
}

/// Datasource definition (mirrors Grafana's datasource JSON model).
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceConfig {
    /// Access mode string (e.g. "proxy").
    pub access: String,
    /// NOTE(review): unlike the other optional fields in this module,
    /// `database` has no `skip_serializing_if`, so it serializes as
    /// `database: null` when unset — confirm this is intended.
    pub database: Option<String>,
    /// Free-form `jsonData` passed through to Grafana.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub json_data: Option<BTreeMap<String, serde_json::Value>>,
    pub name: String,
    /// Datasource type string (e.g. "prometheus").
    pub r#type: String,
    pub url: String,
}
// ------------------------------------------------------------------------------------------------

/// Kubernetes-style label selector (empty = match everything).
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LabelSelector {
    /// Exact-match label requirements.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub match_labels: BTreeMap<String, String>,
    /// Expression-based requirements.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub match_expressions: Vec<LabelSelectorRequirement>,
}

/// One selector expression (`key`, `operator`, optional `values`).
/// NOTE(review): `operator` is a free-form String here, while the
/// crd_prometheuses module uses a typed `Operator` enum — consider unifying.
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LabelSelectorRequirement {
    pub key: String,
    pub operator: String,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub values: Vec<String>,
}

/// Compute resource requests/limits, as string quantities
/// (e.g. "500m", "1Gi").
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct ResourceRequirements {
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub limits: BTreeMap<String, String>,
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub requests: BTreeMap<String, String>,
}

View File

@ -0,0 +1,54 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use super::crd_default_rules::{build_rule_container_restarting, build_rule_pod_failed};
/// Spec for the Prometheus Operator `PrometheusRule` CR
/// (`monitoring.coreos.com/v1`): a list of rule groups.
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
    group = "monitoring.coreos.com",
    version = "v1",
    kind = "PrometheusRule",
    plural = "prometheusrules",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusRuleSpec {
    /// Rule groups; each group is evaluated as a unit by Prometheus.
    pub groups: Vec<RuleGroup>,
}

/// A named collection of rules.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct RuleGroup {
    pub name: String,
    pub rules: Vec<Rule>,
}

/// A single alerting rule. Fields set to `None` are omitted from the
/// serialized manifest.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Rule {
    /// Alert name (e.g. "PodFailed").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub alert: Option<String>,
    /// PromQL expression to evaluate.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub expr: Option<String>,
    /// Duration the expression must hold before the alert fires.
    /// NOTE(review): confirm `for_` serializes as `for` under
    /// `rename_all = "camelCase"`; the CRD schema expects the key `for`,
    /// so an explicit `#[serde(rename = "for")]` would be safer.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub for_: Option<String>,
    /// Labels attached to fired alerts (e.g. severity).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub labels: Option<std::collections::BTreeMap<String, String>>,
    /// Annotations attached to fired alerts (summary, description).
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub annotations: Option<std::collections::BTreeMap<String, String>>,
}
impl PrometheusRuleSpec {
    /// Convenience constructor: a spec pre-populated with the
    /// `default.rules` group (container-restart and pod-failed alerts).
    pub fn with_default_rules() -> Self {
        let default_group = RuleGroup {
            name: String::from("default.rules"),
            rules: vec![build_rule_container_restarting(), build_rule_pod_failed()],
        };
        Self {
            groups: vec![default_group],
        }
    }
}

View File

@ -0,0 +1,78 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::types::Operator;
/// Spec for the Prometheus Operator `Prometheus` CR
/// (`monitoring.coreos.com/v1`). Selector semantics (see `Default`):
/// a `None` namespace selector means "own namespace only"; an empty
/// selector matches all objects of that kind.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
    group = "monitoring.coreos.com",
    version = "v1",
    kind = "Prometheus",
    plural = "prometheuses",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusSpec {
    /// ServiceAccount the Prometheus pods run as.
    pub service_account_name: String,
    /// Namespaces to discover ServiceMonitors from.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub service_monitor_namespace_selector: Option<LabelSelector>,
    /// Which ServiceMonitors to adopt.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub service_monitor_selector: Option<LabelSelector>,
    /// Service discovery role (e.g. "Endpoints").
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub service_discovery_role: Option<String>,
    /// Which PodMonitors to adopt.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub pod_monitor_selector: Option<LabelSelector>,
    /// Which PrometheusRule objects to adopt.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rule_selector: Option<LabelSelector>,
    /// Namespaces to discover PrometheusRule objects from.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub rule_namespace_selector: Option<LabelSelector>,
}
/// Kubernetes-style label selector; the `Default` (both fields empty)
/// matches everything.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct LabelSelector {
    /// Exact-match label requirements.
    #[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
    pub match_labels: BTreeMap<String, String>,
    /// Expression-based requirements.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub match_expressions: Vec<LabelSelectorRequirement>,
}

/// One selector expression: `key <operator> values`, using the shared
/// `Operator` enum from kube_prometheus types.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LabelSelectorRequirement {
    pub key: String,
    pub operator: Operator,
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub values: Vec<String>,
}
/// Defaults: run as the "prometheus" service account and adopt every
/// ServiceMonitor and PrometheusRule within the instance's own
/// namespace (`None` namespace selector = own namespace only; empty
/// selector = match all in that namespace).
impl Default for PrometheusSpec {
    fn default() -> Self {
        Self {
            service_account_name: String::from("prometheus"),
            service_monitor_namespace_selector: None,
            service_monitor_selector: Some(LabelSelector::default()),
            service_discovery_role: Some(String::from("Endpoints")),
            pod_monitor_selector: None,
            rule_selector: None,
            rule_namespace_selector: Some(LabelSelector::default()),
        }
    }
}

View File

@ -0,0 +1,22 @@
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use crate::modules::helm::chart::HelmChartScore;
/// Builds the `HelmChartScore` that installs the Grafana operator
/// (OCI chart `ghcr.io/grafana/helm-charts/grafana-operator`) into
/// the given namespace, creating the namespace if needed.
///
/// # Panics
/// Panics if `ns` is blank, since `HelmChartScore` requires a
/// non-blank namespace.
pub fn grafana_operator_helm_chart_score(ns: String) -> HelmChartScore {
    HelmChartScore {
        // Bare unwrap replaced with an expect that names the precondition.
        namespace: Some(
            NonBlankString::from_str(&ns).expect("grafana operator namespace must not be blank"),
        ),
        // NOTE(review): release name says "kube-prometheus" although this
        // chart is the Grafana operator — looks like a copy-paste; confirm
        // before renaming, since the Helm release name is externally visible.
        release_name: NonBlankString::from_str("kube-prometheus").unwrap(),
        chart_name: NonBlankString::from_str(
            "grafana-operator oci://ghcr.io/grafana/helm-charts/grafana-operator",
        )
        .unwrap(),
        chart_version: None,
        values_overrides: None,
        values_yaml: None,
        create_namespace: true,
        install_only: true,
        repository: None,
    }
}

View File

@ -0,0 +1,7 @@
pub mod crd_alertmanagers;
pub mod crd_default_rules;
pub mod crd_grafana;
pub mod crd_prometheus_rules;
pub mod crd_prometheuses;
pub mod grafana_operator;
pub mod role;

View File

@ -0,0 +1,62 @@
use k8s_openapi::api::{
core::v1::ServiceAccount,
rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
};
use kube::api::ObjectMeta;
/// Role granting read access (get/list/watch) to the core-API
/// resources Prometheus needs for service discovery in `namespace`.
pub fn build_prom_role(role_name: String, namespace: String) -> Role {
    let discovery_rule = PolicyRule {
        // "" is the core API group (services/endpoints/pods live there).
        api_groups: Some(vec![String::new()]),
        resources: Some(vec![
            "services".to_string(),
            "endpoints".to_string(),
            "pods".to_string(),
        ]),
        verbs: vec!["get".to_string(), "list".to_string(), "watch".to_string()],
        ..Default::default()
    };
    Role {
        metadata: ObjectMeta {
            name: Some(role_name),
            namespace: Some(namespace),
            ..Default::default()
        },
        rules: Some(vec![discovery_rule]),
    }
}
/// Binds `role_name` to the Prometheus service account in `namespace`.
pub fn build_prom_rolebinding(
    role_name: String,
    namespace: String,
    service_account_name: String,
) -> RoleBinding {
    let binding_name = format!("{}-rolebinding", role_name);
    let subject = Subject {
        kind: "ServiceAccount".into(),
        name: service_account_name,
        namespace: Some(namespace.clone()),
        ..Default::default()
    };
    RoleBinding {
        metadata: ObjectMeta {
            name: Some(binding_name),
            namespace: Some(namespace.clone()),
            ..Default::default()
        },
        role_ref: RoleRef {
            api_group: "rbac.authorization.k8s.io".into(),
            kind: "Role".into(),
            name: role_name,
        },
        subjects: Some(vec![subject]),
    }
}
/// ServiceAccount the tenant Prometheus pods run under.
pub fn build_prom_service_account(
    service_account_name: String,
    namespace: String,
) -> ServiceAccount {
    let metadata = ObjectMeta {
        name: Some(service_account_name),
        namespace: Some(namespace),
        ..Default::default()
    };
    // All remaining fields (token automount, pull secrets, secrets)
    // are left unset, exactly as before.
    ServiceAccount {
        metadata,
        ..Default::default()
    }
}

View File

@ -1,20 +1,28 @@
use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use kube::{Api, api::ObjectMeta}; use kube::{Api, api::ObjectMeta};
use log::debug; use log::{debug, info};
use serde::Serialize; use serde::Serialize;
use crate::{ use crate::{
data::{Id, Version}, data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
modules::monitoring::kube_prometheus::crd::{
crd_alertmanagers::{Alertmanager, AlertmanagerSpec},
crd_prometheuses::{Prometheus, PrometheusSpec},
role::{build_prom_role, build_prom_rolebinding, build_prom_service_account},
},
score::Score, score::Score,
topology::{K8sclient, Topology, oberservability::monitoring::AlertReceiver}, topology::{K8sclient, Topology, k8s::K8sClient, oberservability::monitoring::AlertReceiver},
}; };
use super::{ use super::{
alert_manager_config::{ alert_manager_config::{
AlertmanagerConfig, AlertmanagerConfigSpec, CRDAlertManager, CRDAlertManagerReceiver, AlertmanagerConfig, AlertmanagerConfigSpec, CRDAlertManager, CRDAlertManagerReceiver,
}, },
crd::crd_prometheus_rules::{PrometheusRule, PrometheusRuleSpec, RuleGroup},
prometheus::KubePrometheus, prometheus::KubePrometheus,
}; };
@ -22,6 +30,8 @@ use super::{
pub struct HelmPrometheusApplicationAlertingScore { pub struct HelmPrometheusApplicationAlertingScore {
pub namespace: String, pub namespace: String,
pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>, pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>,
pub service_monitors: Vec<super::service_monitor::ServiceMonitor>,
pub prometheus_rules: Vec<RuleGroup>,
} }
impl<T: Topology + K8sclient> Score<T> for HelmPrometheusApplicationAlertingScore { impl<T: Topology + K8sclient> Score<T> for HelmPrometheusApplicationAlertingScore {
@ -29,6 +39,8 @@ impl<T: Topology + K8sclient> Score<T> for HelmPrometheusApplicationAlertingScor
Box::new(HelmPrometheusApplicationAlertingInterpret { Box::new(HelmPrometheusApplicationAlertingInterpret {
namespace: self.namespace.clone(), namespace: self.namespace.clone(),
receivers: self.receivers.clone(), receivers: self.receivers.clone(),
service_monitors: self.service_monitors.clone(),
prometheus_rules: self.prometheus_rules.clone(),
}) })
} }
@ -41,6 +53,8 @@ impl<T: Topology + K8sclient> Score<T> for HelmPrometheusApplicationAlertingScor
pub struct HelmPrometheusApplicationAlertingInterpret { pub struct HelmPrometheusApplicationAlertingInterpret {
pub namespace: String, pub namespace: String,
pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>, pub receivers: Vec<Box<dyn CRDAlertManagerReceiver>>,
pub service_monitors: Vec<super::service_monitor::ServiceMonitor>,
pub prometheus_rules: Vec<RuleGroup>,
} }
#[async_trait] #[async_trait]
@ -51,6 +65,8 @@ impl<T: Topology + K8sclient> Interpret<T> for HelmPrometheusApplicationAlerting
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap(); let client = topology.k8s_client().await.unwrap();
self.install_prometheus(&client).await?;
self.install_alert_manager(&client).await?;
for receiver in self.receivers.iter() { for receiver in self.receivers.iter() {
let alertmanager_config: AlertmanagerConfig = receiver let alertmanager_config: AlertmanagerConfig = receiver
.configure_receiver(&client, self.namespace.clone()) .configure_receiver(&client, self.namespace.clone())
@ -64,6 +80,15 @@ impl<T: Topology + K8sclient> Interpret<T> for HelmPrometheusApplicationAlerting
InterpretError::new(format!("failed to install receiver: {}", err)) InterpretError::new(format!("failed to install receiver: {}", err))
})?; })?;
} }
self.install_rules(self.prometheus_rules.clone(), client.clone())
.await
.map_err(|err| InterpretError::new(format!("failed to install rules: {}", err)))?;
debug!("\n\n\n monitors: {:#?}", self.service_monitors.clone());
for monitor in self.service_monitors.iter() {
self.install_monitor(monitor.clone(), client.clone())
.await?;
}
Ok(Outcome::success(format!("deployed alert channels"))) Ok(Outcome::success(format!("deployed alert channels")))
} }
@ -83,3 +108,146 @@ impl<T: Topology + K8sclient> Interpret<T> for HelmPrometheusApplicationAlerting
todo!() todo!()
} }
} }
impl HelmPrometheusApplicationAlertingInterpret {
async fn install_prometheus(&self, client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
debug!(
"installing crd-prometheuses in namespace {}",
self.namespace.clone()
);
debug!("building role/rolebinding/serviceaccount for crd-prometheus");
let rolename = format!("{}-prom", self.namespace.clone());
let sa_name = format!("{}-prom-sa", self.namespace.clone());
let role = build_prom_role(rolename.clone(), self.namespace.clone());
let rolebinding =
build_prom_rolebinding(rolename.clone(), self.namespace.clone(), sa_name.clone());
let sa = build_prom_service_account(sa_name.clone(), self.namespace.clone());
let mut prom_spec = PrometheusSpec::default();
prom_spec.service_account_name = sa_name.clone();
let prom = Prometheus {
metadata: ObjectMeta {
name: Some(self.namespace.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(self.namespace.clone()),
..Default::default()
},
spec: prom_spec,
};
client
.apply(&role, Some(&self.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus role: {:#?} in ns {:#?}",
role.metadata.name.unwrap(),
role.metadata.namespace.unwrap()
);
client
.apply(&rolebinding, Some(&self.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus rolebinding: {:#?} in ns {:#?}",
rolebinding.metadata.name.unwrap(),
rolebinding.metadata.namespace.unwrap()
);
client
.apply(&sa, Some(&self.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus service account: {:#?} in ns {:#?}",
sa.metadata.name.unwrap(),
sa.metadata.namespace.unwrap()
);
client
.apply(&prom, Some(&self.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!(
"installed prometheus: {:#?} in ns {:#?}",
&prom.metadata.name.clone().unwrap(),
&prom.metadata.namespace.clone().unwrap()
);
Ok(Outcome::success(format!(
"successfully deployed crd-prometheus {:#?}",
prom
)))
}
async fn install_alert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let am = Alertmanager {
metadata: ObjectMeta {
name: Some(self.namespace.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(self.namespace.clone()),
..Default::default()
},
spec: AlertmanagerSpec::default(),
};
client
.apply(&am, Some(&self.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed service monitor {:#?}",
am.metadata.name
)))
}
async fn install_monitor(
&self,
monitor: super::service_monitor::ServiceMonitor,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("service monitor: \n{:#?}", monitor.clone());
let namespace = self.namespace.clone();
client
.apply(&monitor, Some(&namespace))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed service monitor {:#?}",
monitor.metadata.name
)))
}
async fn install_rules(
&self,
rules: Vec<RuleGroup>,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let prom_rule_spec = PrometheusRuleSpec { groups: rules };
let prom_rules = PrometheusRule {
metadata: ObjectMeta {
name: Some(self.namespace.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(self.namespace.clone()),
..Default::default()
},
spec: prom_rule_spec,
};
client
.apply(&prom_rules, Some(&self.namespace))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed service monitor {:#?}",
prom_rules.metadata.name
)))
}
}

View File

@ -1,6 +1,8 @@
pub mod alert_manager_config; pub mod alert_manager_config;
pub mod crd;
pub mod helm; pub mod helm;
pub mod helm_prometheus_alert_score; pub mod helm_prometheus_alert_score;
pub mod helm_prometheus_application_alerting; pub mod helm_prometheus_application_alerting;
pub mod prometheus; pub mod prometheus;
pub mod service_monitor;
pub mod types; pub mod types;

View File

@ -0,0 +1,89 @@
use std::collections::{BTreeMap, HashMap};
use kube::{CustomResource, Resource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::interpret::InterpretError;
use super::types::{
HTTPScheme, MatchExpression, NamespaceSelector, Operator, Selector,
ServiceMonitor as KubeServiceMonitor, ServiceMonitorEndpoint,
};
/// This is the top-level struct for the ServiceMonitor Custom Resource.
/// The `#[derive(CustomResource)]` macro handles all the boilerplate for you,
/// including the `impl Resource`.
///
/// NOTE(review): the generated `ServiceMonitor` kind shadows the legacy
/// `types::ServiceMonitor` (imported here as `KubeServiceMonitor`);
/// the `From` impl below converts between the two.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
    group = "monitoring.coreos.com",
    version = "v1",
    kind = "ServiceMonitor",
    plural = "servicemonitors",
    namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ServiceMonitorSpec {
    /// A label selector to select services to monitor.
    pub selector: Selector,
    /// A list of endpoints on the selected services to be monitored.
    pub endpoints: Vec<ServiceMonitorEndpoint>,
    /// Selector to select which namespaces the Kubernetes Endpoints objects
    /// are discovered from. `None` restricts discovery to the same namespace.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub namespace_selector: Option<NamespaceSelector>,
    /// The label to use to retrieve the job name from.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub job_label: Option<String>,
    /// Pod-based target labels to transfer from the Kubernetes Pod onto the target.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub pod_target_labels: Vec<String>,
    /// TargetLabels transfers labels on the Kubernetes Service object to the target.
    #[serde(default, skip_serializing_if = "Vec::is_empty")]
    pub target_labels: Vec<String>,
}
impl Default for ServiceMonitorSpec {
    /// Defaults: match any service carrying the standard
    /// `app.kubernetes.io/name` label and scrape `/metrics` on the
    /// port named "http" every 30s, within the same namespace only.
    fn default() -> Self {
        let selector = Selector {
            match_labels: HashMap::new(),
            match_expressions: vec![MatchExpression {
                key: "app.kubernetes.io/name".into(),
                operator: Operator::Exists,
                values: vec![],
            }],
        };
        let endpoint = ServiceMonitorEndpoint {
            port: Some("http".to_string()),
            path: Some("/metrics".to_string()),
            interval: Some("30s".to_string()),
            scheme: Some(HTTPScheme::HTTP),
            ..Default::default()
        };
        Self {
            selector,
            endpoints: vec![endpoint],
            namespace_selector: None, // only the same namespace
            job_label: Some("app".to_string()),
            pod_target_labels: vec![],
            target_labels: vec![],
        }
    }
}
impl From<KubeServiceMonitor> for ServiceMonitorSpec {
fn from(value: KubeServiceMonitor) -> Self {
Self {
selector: value.selector,
endpoints: value.endpoints,
namespace_selector: value.namespace_selector,
job_label: value.job_label,
pod_target_labels: value.pod_target_labels,
target_labels: value.target_labels,
}
}
}

View File

@ -1,7 +1,8 @@
use std::collections::{BTreeMap, HashMap}; use std::collections::{BTreeMap, HashMap};
use async_trait::async_trait; use async_trait::async_trait;
use serde::Serialize; use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_yaml::{Mapping, Sequence, Value}; use serde_yaml::{Mapping, Sequence, Value};
use crate::modules::monitoring::alert_rule::prometheus_alert_rule::AlertManagerRuleGroup; use crate::modules::monitoring::alert_rule::prometheus_alert_rule::AlertManagerRuleGroup;
@ -94,7 +95,7 @@ pub struct AlertGroup {
pub groups: Vec<AlertManagerRuleGroup>, pub groups: Vec<AlertManagerRuleGroup>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum HTTPScheme { pub enum HTTPScheme {
#[serde(rename = "http")] #[serde(rename = "http")]
HTTP, HTTP,
@ -102,7 +103,7 @@ pub enum HTTPScheme {
HTTPS, HTTPS,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub enum Operator { pub enum Operator {
In, In,
NotIn, NotIn,
@ -147,70 +148,79 @@ pub struct ServiceMonitorTLSConfig {
pub server_name: Option<String>, pub server_name: Option<String>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, Default)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct ServiceMonitorEndpoint { pub struct ServiceMonitorEndpoint {
// ## Name of the endpoint's service port /// Name of the service port this endpoint refers to.
// ## Mutually exclusive with targetPort
pub port: Option<String>, pub port: Option<String>,
// ## Name or number of the endpoint's target port /// Interval at which metrics should be scraped.
// ## Mutually exclusive with port #[serde(default, skip_serializing_if = "Option::is_none")]
pub target_port: Option<String>,
// ## File containing bearer token to be used when scraping targets
// ##
pub bearer_token_file: Option<String>,
// ## Interval at which metrics should be scraped
// ##
pub interval: Option<String>, pub interval: Option<String>,
// ## HTTP path to scrape for metrics /// The HTTP path to scrape for metrics.
// ## #[serde(default, skip_serializing_if = "Option::is_none")]
pub path: String, pub path: Option<String>,
// ## HTTP scheme to use for scraping /// HTTP scheme to use for scraping.
// ## #[serde(default, skip_serializing_if = "Option::is_none")]
pub scheme: HTTPScheme, pub scheme: Option<HTTPScheme>,
// ## TLS configuration to use when scraping the endpoint /// Relabelings to apply to samples before scraping.
// ## #[serde(default, skip_serializing_if = "Vec::is_empty")]
pub tls_config: Option<ServiceMonitorTLSConfig>, pub relabelings: Vec<RelabelConfig>,
// ## MetricRelabelConfigs to apply to samples after scraping, but before ingestion. /// MetricRelabelings to apply to samples after scraping, but before ingestion.
// ## ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/api-reference/api.md#relabelconfig #[serde(default, skip_serializing_if = "Vec::is_empty")]
// ## pub metric_relabelings: Vec<RelabelConfig>,
// # - action: keep
// # regex: 'kube_(daemonset|deployment|pod|namespace|node|statefulset).+'
// # sourceLabels: [__name__]
pub metric_relabelings: Vec<Mapping>,
// ## RelabelConfigs to apply to samples before scraping
// ## ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/api-reference/api.md#relabelconfig
// ##
// # - sourceLabels: [__meta_kubernetes_pod_node_name]
// # separator: ;
// # regex: ^(.*)$
// # targetLabel: nodename
// # replacement: $1
// # action: replace
pub relabelings: Vec<Mapping>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RelabelConfig {
/// The action to perform based on the regex matching.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub action: Option<String>,
/// A list of labels from which to extract values.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub source_labels: Vec<String>,
/// Separator to be used when concatenating source_labels.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub separator: Option<String>,
/// The label to which the resulting value is written.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub target_label: Option<String>,
/// A regular expression to match against the concatenated source label values.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub regex: Option<String>,
/// The replacement value to use.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub replacement: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct MatchExpression { pub struct MatchExpression {
pub key: String, pub key: String,
pub operator: Operator, pub operator: Operator, // "In", "NotIn", "Exists", "DoesNotExist"
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub values: Vec<String>, pub values: Vec<String>,
} }
#[derive(Debug, Clone, Serialize)] #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, Default)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Selector { pub struct Selector {
// # label selector for services /// A map of key-value pairs to match.
#[serde(default, skip_serializing_if = "HashMap::is_empty")]
pub match_labels: HashMap<String, String>, pub match_labels: HashMap<String, String>,
/// A list of label selector requirements.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub match_expressions: Vec<MatchExpression>, pub match_expressions: Vec<MatchExpression>,
} }
@ -258,10 +268,15 @@ pub struct ServiceMonitor {
pub fallback_scrape_protocol: Option<String>, pub fallback_scrape_protocol: Option<String>,
} }
#[derive(Debug, Serialize, Clone)] #[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct NamespaceSelector { pub struct NamespaceSelector {
/// Select all namespaces.
#[serde(default, skip_serializing_if = "std::ops::Not::not")]
pub any: bool, pub any: bool,
/// List of namespace names to select from.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub match_names: Vec<String>, pub match_names: Vec<String>,
} }
@ -283,19 +298,3 @@ impl Default for ServiceMonitor {
} }
} }
} }
impl Default for ServiceMonitorEndpoint {
fn default() -> Self {
Self {
port: Some("80".to_string()),
target_port: Default::default(),
bearer_token_file: Default::default(),
interval: Default::default(),
path: "/metrics".to_string(),
scheme: HTTPScheme::HTTP,
tls_config: Default::default(),
metric_relabelings: Default::default(),
relabelings: Default::default(),
}
}
}