Merge branch 'master' into url_macros_to_the_rescue
All checks were successful
Run Check Script / check (pull_request) Successful in 1m6s

This commit is contained in:
Ian Letourneau 2025-09-08 10:37:57 -04:00
commit 935ecd3f95
38 changed files with 2081 additions and 135 deletions

3
.gitmodules vendored Normal file
View File

@ -0,0 +1,3 @@
[submodule "examples/try_rust_webapp/tryrust.org"]
path = examples/try_rust_webapp/tryrust.org
url = https://github.com/rust-dd/tryrust.org.git

59
Cargo.lock generated
View File

@ -1838,6 +1838,21 @@ dependencies = [
"url", "url",
] ]
[[package]]
name = "example-try-rust-webapp"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]] [[package]]
name = "example-tui" name = "example-tui"
version = "0.1.0" version = "0.1.0"
@ -2318,6 +2333,7 @@ dependencies = [
"tokio-util", "tokio-util",
"url", "url",
"uuid", "uuid",
"walkdir",
] ]
[[package]] [[package]]
@ -4616,6 +4632,21 @@ dependencies = [
"subtle", "subtle",
] ]
[[package]]
name = "rhob-application-monitoring"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]] [[package]]
name = "ring" name = "ring"
version = "0.17.14" version = "0.17.14"
@ -4956,6 +4987,15 @@ dependencies = [
"cipher", "cipher",
] ]
[[package]]
name = "same-file"
version = "1.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502"
dependencies = [
"winapi-util",
]
[[package]] [[package]]
name = "schannel" name = "schannel"
version = "0.1.27" version = "0.1.27"
@ -6495,6 +6535,16 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "walkdir"
version = "2.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b"
dependencies = [
"same-file",
"winapi-util",
]
[[package]] [[package]]
name = "want" name = "want"
version = "0.3.1" version = "0.3.1"
@ -6677,6 +6727,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
[[package]]
name = "winapi-util"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0978bf7171b3d90bac376700cb56d606feb40f251a475a5d6634613564460b22"
dependencies = [
"windows-sys 0.60.2",
]
[[package]] [[package]]
name = "winapi-x86_64-pc-windows-gnu" name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"

View File

@ -30,6 +30,7 @@ async fn main() {
domain: Url::Url(url::Url::parse("https://rustapp.harmony.example.com").unwrap()), domain: Url::Url(url::Url::parse("https://rustapp.harmony.example.com").unwrap()),
project_root: PathBuf::from("./examples/rust/webapp"), project_root: PathBuf::from("./examples/rust/webapp"),
framework: Some(RustWebFramework::Leptos), framework: Some(RustWebFramework::Leptos),
service_port: 3000,
}); });
let webhook_receiver = WebhookReceiver { let webhook_receiver = WebhookReceiver {

View File

@ -0,0 +1,17 @@
[package]
name = "rhob-application-monitoring"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
tokio = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
base64.workspace = true

View File

@ -0,0 +1,50 @@
use std::{path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,
modules::{
application::{
ApplicationScore, RustWebFramework, RustWebapp,
features::rhob_monitoring::RHOBMonitoring,
},
monitoring::alert_channel::discord_alert_channel::DiscordWebhook,
},
topology::K8sAnywhereTopology,
};
use harmony_types::net::Url;
#[tokio::main]
async fn main() {
let application = Arc::new(RustWebapp {
name: "test-rhob-monitoring".to_string(),
domain: Url::Url(url::Url::parse("htps://some-fake-url").unwrap()),
project_root: PathBuf::from("./webapp"), // Relative from 'harmony-path' param
framework: Some(RustWebFramework::Leptos),
service_port: 3000,
});
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
};
let app = ApplicationScore {
features: vec![
Box::new(RHOBMonitoring {
application: application.clone(),
alert_receiver: vec![Box::new(discord_receiver)],
}),
// TODO add backups, multisite ha, etc
],
application,
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(app)],
None,
)
.await
.unwrap();
}

View File

@ -13,25 +13,26 @@ use harmony::{
}, },
topology::K8sAnywhereTopology, topology::K8sAnywhereTopology,
}; };
use harmony_macros::remote_url; use harmony_macros::hurl;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let application = Arc::new(RustWebapp { let application = Arc::new(RustWebapp {
name: "harmony-example-rust-webapp".to_string(), name: "harmony-example-rust-webapp".to_string(),
domain: remote_url!("https://rustapp.harmony.example.com"), domain: hurl!("https://rustapp.harmony.example.com"),
project_root: PathBuf::from("./webapp"), // Relative from 'harmony-path' param project_root: PathBuf::from("./webapp"),
framework: Some(RustWebFramework::Leptos), framework: Some(RustWebFramework::Leptos),
service_port: 3000,
}); });
let discord_receiver = DiscordWebhook { let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(), name: "test-discord".to_string(),
url: remote_url!("https://discord.doesnt.exist.com"), url: hurl!("https://discord.doesnt.exist.com"),
}; };
let webhook_receiver = WebhookReceiver { let webhook_receiver = WebhookReceiver {
name: "sample-webhook-receiver".to_string(), name: "sample-webhook-receiver".to_string(),
url: remote_url!("https://webhook-doesnt-exist.com"), url: hurl!("https://webhook-doesnt-exist.com"),
}; };
let app = ApplicationScore { let app = ApplicationScore {

View File

@ -0,0 +1,17 @@
[package]
name = "example-try-rust-webapp"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
tokio = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
base64.workspace = true

View File

@ -0,0 +1,52 @@
use std::{path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,
modules::{
application::{
ApplicationScore, RustWebFramework, RustWebapp,
features::{ContinuousDelivery, Monitoring},
},
monitoring::alert_channel::discord_alert_channel::DiscordWebhook,
},
topology::K8sAnywhereTopology,
};
use harmony_types::net::Url;
#[tokio::main]
async fn main() {
let application = Arc::new(RustWebapp {
name: "harmony-example-tryrust".to_string(),
domain: Url::Url(url::Url::parse("https://tryrust.harmony.example.com").unwrap()),
project_root: PathBuf::from("./tryrust.org"),
framework: Some(RustWebFramework::Leptos),
service_port: 8080,
});
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
};
let app = ApplicationScore {
features: vec![
Box::new(ContinuousDelivery {
application: application.clone(),
}),
Box::new(Monitoring {
application: application.clone(),
alert_receiver: vec![Box::new(discord_receiver)],
}),
],
application,
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(app)],
None,
)
.await
.unwrap();
}

@ -0,0 +1 @@
Subproject commit 0f9ba145172867f467e5320b37d07a5bbb7dd438

View File

@ -66,6 +66,7 @@ tar.workspace = true
base64.workspace = true base64.workspace = true
thiserror.workspace = true thiserror.workspace = true
once_cell = "1.21.3" once_cell = "1.21.3"
walkdir = "2.5.0"
harmony_inventory_agent = { path = "../harmony_inventory_agent" } harmony_inventory_agent = { path = "../harmony_inventory_agent" }
harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" }
askama.workspace = true askama.workspace = true

View File

@ -32,6 +32,7 @@ pub enum InterpretName {
K8sPrometheusCrdAlerting, K8sPrometheusCrdAlerting,
DiscoverInventoryAgent, DiscoverInventoryAgent,
CephClusterHealth, CephClusterHealth,
RHOBAlerting,
} }
impl std::fmt::Display for InterpretName { impl std::fmt::Display for InterpretName {
@ -60,6 +61,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"), InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"), InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"), InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),
} }
} }
} }

View File

@ -17,7 +17,7 @@ use kube::{
}; };
use log::{debug, error, trace}; use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use serde_json::json; use serde_json::{Value, json};
use similar::TextDiff; use similar::TextDiff;
use tokio::io::AsyncReadExt; use tokio::io::AsyncReadExt;
@ -53,6 +53,21 @@ impl K8sClient {
}) })
} }
pub async fn get_resource_json_value(
&self,
name: &str,
namespace: Option<&str>,
gvk: &GroupVersionKind,
) -> Result<DynamicObject, Error> {
let gvk = ApiResource::from_gvk(gvk);
let resource: Api<DynamicObject> = if let Some(ns) = namespace {
Api::namespaced_with(self.client.clone(), ns, &gvk)
} else {
Api::default_namespaced_with(self.client.clone(), &gvk)
};
Ok(resource.get(name).await?)
}
pub async fn get_deployment( pub async fn get_deployment(
&self, &self,
name: &str, name: &str,

View File

@ -14,10 +14,11 @@ use crate::{
monitoring::kube_prometheus::crd::{ monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_alertmanager_config::CRDPrometheus,
prometheus_operator::prometheus_operator_helm_chart_score, prometheus_operator::prometheus_operator_helm_chart_score,
rhob_alertmanager_config::RHOBObservability,
}, },
prometheus::{ prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore, k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
prometheus::PrometheusApplicationMonitoring, prometheus::PrometheusApplicationMonitoring, rhob_alerting_score::RHOBAlertingScore,
}, },
}, },
score::Score, score::Score,
@ -108,6 +109,43 @@ impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology {
} }
} }
#[async_trait]
impl PrometheusApplicationMonitoring<RHOBObservability> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &RHOBObservability,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_cluster_observability_operator(sender).await?;
if po_result == PreparationOutcome::Noop {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
let result = self
.get_cluster_observability_operator_prometheus_application_score(
sender.clone(),
receivers,
)
.await
.interpret(inventory, self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: outcome.message,
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
}
impl Serialize for K8sAnywhereTopology { impl Serialize for K8sAnywhereTopology {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where where
@ -134,6 +172,19 @@ impl K8sAnywhereTopology {
} }
} }
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,
receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
) -> RHOBAlertingScore {
RHOBAlertingScore {
sender,
receivers: receivers.unwrap_or_default(),
service_monitors: vec![],
prometheus_rules: vec![],
}
}
async fn get_k8s_prometheus_application_score( async fn get_k8s_prometheus_application_score(
&self, &self,
sender: CRDPrometheus, sender: CRDPrometheus,
@ -286,6 +337,60 @@ impl K8sAnywhereTopology {
} }
} }
async fn ensure_cluster_observability_operator(
&self,
sender: &RHOBObservability,
) -> Result<PreparationOutcome, PreparationError> {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i rhobs"])
.status()
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() {
match k8s_state.source {
K8sSource::LocalK3d => {
debug!("installing cluster observability operator");
todo!();
let op_score =
prometheus_operator_helm_chart_score(sender.namespace.clone());
let result = op_score.interpret(&Inventory::empty(), self).await;
return match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: "installed cluster observability operator".into(),
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(
"failed to install cluster observability operator (unknown error)".into(),
)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
};
}
K8sSource::Kubeconfig => {
debug!(
"unable to install cluster observability operator, contact cluster admin"
);
return Ok(PreparationOutcome::Noop);
}
}
} else {
warn!(
"Unable to detect k8s_state. Skipping Cluster Observability Operator install."
);
return Ok(PreparationOutcome::Noop);
}
}
debug!("Cluster Observability Operator is already present, skipping install");
Ok(PreparationOutcome::Success {
details: "cluster observability operator present in cluster".into(),
})
}
async fn ensure_prometheus_operator( async fn ensure_prometheus_operator(
&self, &self,
sender: &CRDPrometheus, sender: &CRDPrometheus,

View File

@ -176,18 +176,18 @@ impl<
} }
target => { target => {
info!("Deploying {} to target {target:?}", self.application.name()); info!("Deploying {} to target {target:?}", self.application.name());
let score = ArgoHelmScore { let score = ArgoHelmScore {
namespace: "harmony-example-rust-webapp".to_string(), namespace: format!("{}", self.application.name()),
openshift: true, openshift: true,
domain: "argo.harmonydemo.apps.ncd0.harmony.mcd".to_string(),
argo_apps: vec![ArgoApplication::from(CDApplicationConfig { argo_apps: vec![ArgoApplication::from(CDApplicationConfig {
// helm pull oci://hub.nationtech.io/harmony/harmony-example-rust-webapp-chart --version 0.1.0 // helm pull oci://hub.nationtech.io/harmony/harmony-example-rust-webapp-chart --version 0.1.0
version: Version::from("0.1.0").unwrap(), version: Version::from("0.1.0").unwrap(),
helm_chart_repo_url: "hub.nationtech.io/harmony".to_string(), helm_chart_repo_url: "hub.nationtech.io/harmony".to_string(),
helm_chart_name: "harmony-example-rust-webapp-chart".to_string(), helm_chart_name: format!("{}-chart", self.application.name()),
values_overrides: None, values_overrides: None,
name: "harmony-demo-rust-webapp".to_string(), name: format!("{}", self.application.name()),
namespace: "harmony-example-rust-webapp".to_string(), namespace: format!("{}", self.application.name()),
})], })],
}; };
score score

View File

@ -1,7 +1,10 @@
use async_trait::async_trait; use async_trait::async_trait;
use kube::{Api, api::GroupVersionKind};
use log::{debug, warn};
use non_blank_string_rs::NonBlankString; use non_blank_string_rs::NonBlankString;
use serde::Serialize; use serde::Serialize;
use std::str::FromStr; use serde::de::DeserializeOwned;
use std::{process::Command, str::FromStr, sync::Arc};
use crate::{ use crate::{
data::Version, data::Version,
@ -9,7 +12,9 @@ use crate::{
inventory::Inventory, inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository}, modules::helm::chart::{HelmChartScore, HelmRepository},
score::Score, score::Score,
topology::{HelmCommand, K8sclient, Topology}, topology::{
HelmCommand, K8sclient, PreparationError, PreparationOutcome, Topology, k8s::K8sClient,
},
}; };
use harmony_types::id::Id; use harmony_types::id::Id;
@ -19,15 +24,13 @@ use super::ArgoApplication;
pub struct ArgoHelmScore { pub struct ArgoHelmScore {
pub namespace: String, pub namespace: String,
pub openshift: bool, pub openshift: bool,
pub domain: String,
pub argo_apps: Vec<ArgoApplication>, pub argo_apps: Vec<ArgoApplication>,
} }
impl<T: Topology + HelmCommand + K8sclient> Score<T> for ArgoHelmScore { impl<T: Topology + HelmCommand + K8sclient> Score<T> for ArgoHelmScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> { fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
let helm_score = argo_helm_chart_score(&self.namespace, self.openshift, &self.domain);
Box::new(ArgoInterpret { Box::new(ArgoInterpret {
score: helm_score, score: self.clone(),
argo_apps: self.argo_apps.clone(), argo_apps: self.argo_apps.clone(),
}) })
} }
@ -39,7 +42,7 @@ impl<T: Topology + HelmCommand + K8sclient> Score<T> for ArgoHelmScore {
#[derive(Debug)] #[derive(Debug)]
pub struct ArgoInterpret { pub struct ArgoInterpret {
score: HelmChartScore, score: ArgoHelmScore,
argo_apps: Vec<ArgoApplication>, argo_apps: Vec<ArgoApplication>,
} }
@ -50,9 +53,16 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for ArgoInterpret {
inventory: &Inventory, inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
self.score.interpret(inventory, topology).await?;
let k8s_client = topology.k8s_client().await?; let k8s_client = topology.k8s_client().await?;
let domain = self
.get_host_domain(k8s_client.clone(), self.score.openshift)
.await?;
let domain = format!("argo.{domain}");
let helm_score =
argo_helm_chart_score(&self.score.namespace, self.score.openshift, &domain);
helm_score.interpret(inventory, topology).await?;
k8s_client k8s_client
.apply_yaml_many(&self.argo_apps.iter().map(|a| a.to_yaml()).collect(), None) .apply_yaml_many(&self.argo_apps.iter().map(|a| a.to_yaml()).collect(), None)
.await .await
@ -85,6 +95,38 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for ArgoInterpret {
} }
} }
impl ArgoInterpret {
pub async fn get_host_domain(
&self,
client: Arc<K8sClient>,
openshift: bool,
) -> Result<String, InterpretError> {
//This should be the job of the topology to determine if we are in
//openshift, potentially we need on openshift topology the same way we create a
//localhosttopology
match openshift {
true => {
let gvk = GroupVersionKind {
group: "operator.openshift.io".into(),
version: "v1".into(),
kind: "IngressController".into(),
};
let ic = client
.get_resource_json_value("default", Some("openshift-ingress-operator"), &gvk)
.await?;
match ic.data["status"]["domain"].as_str() {
Some(domain) => return Ok(domain.to_string()),
None => return Err(InterpretError::new("Could not find domain".to_string())),
}
}
false => {
todo!()
}
};
}
}
pub fn argo_helm_chart_score(namespace: &str, openshift: bool, domain: &str) -> HelmChartScore { pub fn argo_helm_chart_score(namespace: &str, openshift: bool, domain: &str) -> HelmChartScore {
let values = format!( let values = format!(
r#" r#"
@ -660,7 +702,7 @@ server:
# nginx.ingress.kubernetes.io/ssl-passthrough: "true" # nginx.ingress.kubernetes.io/ssl-passthrough: "true"
# -- Defines which ingress controller will implement the resource # -- Defines which ingress controller will implement the resource
ingressClassName: "" ingressClassName: "openshift-default"
# -- Argo CD server hostname # -- Argo CD server hostname
# @default -- `""` (defaults to global.domain) # @default -- `""` (defaults to global.domain)

View File

@ -1,4 +1,5 @@
mod endpoint; mod endpoint;
pub mod rhob_monitoring;
pub use endpoint::*; pub use endpoint::*;
mod monitoring; mod monitoring;

View File

@ -0,0 +1,109 @@
use std::sync::Arc;
use crate::modules::application::{Application, ApplicationFeature};
use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore;
use crate::modules::monitoring::application_monitoring::rhobs_application_monitoring_score::ApplicationRHOBMonitoringScore;
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
use crate::topology::MultiTargetTopology;
use crate::{
inventory::Inventory,
modules::monitoring::{
alert_channel::webhook_receiver::WebhookReceiver, ntfy::ntfy::NtfyScore,
},
score::Score,
topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager},
};
use crate::{
modules::prometheus::prometheus::PrometheusApplicationMonitoring,
topology::oberservability::monitoring::AlertReceiver,
};
use async_trait::async_trait;
use base64::{Engine as _, engine::general_purpose};
use harmony_types::net::Url;
use log::{debug, info};
#[derive(Debug, Clone)]
pub struct RHOBMonitoring {
pub application: Arc<dyn Application>,
pub alert_receiver: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
}
#[async_trait]
impl<
T: Topology
+ HelmCommand
+ 'static
+ TenantManager
+ K8sclient
+ MultiTargetTopology
+ std::fmt::Debug
+ PrometheusApplicationMonitoring<RHOBObservability>,
> ApplicationFeature<T> for RHOBMonitoring
{
async fn ensure_installed(&self, topology: &T) -> Result<(), String> {
info!("Ensuring monitoring is available for application");
let namespace = topology
.get_tenant_config()
.await
.map(|ns| ns.name.clone())
.unwrap_or_else(|| self.application.name());
let mut alerting_score = ApplicationRHOBMonitoringScore {
sender: RHOBObservability {
namespace: namespace.clone(),
client: topology.k8s_client().await.unwrap(),
},
application: self.application.clone(),
receivers: self.alert_receiver.clone(),
};
let ntfy = NtfyScore {
namespace: namespace.clone(),
host: "ntfy.harmonydemo.apps.ncd0.harmony.mcd".to_string(),
};
ntfy.interpret(&Inventory::empty(), topology)
.await
.map_err(|e| e.to_string())?;
let ntfy_default_auth_username = "harmony";
let ntfy_default_auth_password = "harmony";
let ntfy_default_auth_header = format!(
"Basic {}",
general_purpose::STANDARD.encode(format!(
"{ntfy_default_auth_username}:{ntfy_default_auth_password}"
))
);
debug!("ntfy_default_auth_header: {ntfy_default_auth_header}");
let ntfy_default_auth_param = general_purpose::STANDARD
.encode(ntfy_default_auth_header)
.replace("=", "");
debug!("ntfy_default_auth_param: {ntfy_default_auth_param}");
let ntfy_receiver = WebhookReceiver {
name: "ntfy-webhook".to_string(),
url: Url::Url(
url::Url::parse(
format!(
"http://ntfy.{}.svc.cluster.local/rust-web-app?auth={ntfy_default_auth_param}",
namespace.clone()
)
.as_str(),
)
.unwrap(),
),
};
alerting_score.receivers.push(Box::new(ntfy_receiver));
alerting_score
.interpret(&Inventory::empty(), topology)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
fn name(&self) -> String {
"Monitoring".to_string()
}
}

View File

@ -1,4 +1,5 @@
use std::fs; use std::fs::{self, File};
use std::io::Read;
use std::path::{Path, PathBuf}; use std::path::{Path, PathBuf};
use std::process; use std::process;
use std::sync::Arc; use std::sync::Arc;
@ -12,7 +13,8 @@ use dockerfile_builder::instruction_builder::CopyBuilder;
use futures_util::StreamExt; use futures_util::StreamExt;
use log::{debug, info, log_enabled}; use log::{debug, info, log_enabled};
use serde::Serialize; use serde::Serialize;
use tar::Archive; use tar::{Archive, Builder, Header};
use walkdir::WalkDir;
use crate::config::{REGISTRY_PROJECT, REGISTRY_URL}; use crate::config::{REGISTRY_PROJECT, REGISTRY_URL};
use crate::{score::Score, topology::Topology}; use crate::{score::Score, topology::Topology};
@ -59,6 +61,7 @@ pub struct RustWebapp {
pub domain: Url, pub domain: Url,
/// The path to the root of the Rust project to be containerized. /// The path to the root of the Rust project to be containerized.
pub project_root: PathBuf, pub project_root: PathBuf,
pub service_port: u32,
pub framework: Option<RustWebFramework>, pub framework: Option<RustWebFramework>,
} }
@ -158,45 +161,99 @@ impl RustWebapp {
image_name: &str, image_name: &str,
) -> Result<String, Box<dyn std::error::Error>> { ) -> Result<String, Box<dyn std::error::Error>> {
debug!("Generating Dockerfile for '{}'", self.name); debug!("Generating Dockerfile for '{}'", self.name);
let _dockerfile_path = self.build_dockerfile()?; let dockerfile = self.get_or_build_dockerfile();
let docker = Docker::connect_with_socket_defaults().unwrap();
let quiet = !log_enabled!(log::Level::Debug); let quiet = !log_enabled!(log::Level::Debug);
match dockerfile
let build_image_options = bollard::query_parameters::BuildImageOptionsBuilder::default()
.dockerfile("Dockerfile.harmony")
.t(image_name)
.q(quiet)
.version(bollard::query_parameters::BuilderVersion::BuilderV1)
.platform("linux/x86_64");
let mut temp_tar_builder = tar::Builder::new(Vec::new());
temp_tar_builder
.append_dir_all("", self.project_root.clone())
.unwrap();
let archive = temp_tar_builder
.into_inner()
.expect("couldn't finish creating tar");
let archived_files = Archive::new(archive.as_slice())
.entries()
.unwrap() .unwrap()
.map(|entry| entry.unwrap().path().unwrap().into_owned()) .file_name()
.collect::<Vec<_>>(); .and_then(|os_str| os_str.to_str())
{
Some(path_str) => {
debug!("Building from dockerfile {}", path_str);
debug!("files in docker tar: {:#?}", archived_files); let tar_data = self
.create_deterministic_tar(&self.project_root.clone())
.await
.unwrap();
let mut image_build_stream = docker.build_image( let docker = Docker::connect_with_socket_defaults().unwrap();
build_image_options.build(),
None,
Some(body_full(archive.into())),
);
while let Some(msg) = image_build_stream.next().await { let build_image_options =
debug!("Message: {msg:?}"); bollard::query_parameters::BuildImageOptionsBuilder::default()
.dockerfile(path_str)
.t(image_name)
.q(quiet)
.version(bollard::query_parameters::BuilderVersion::BuilderV1)
.platform("linux/x86_64");
let mut image_build_stream = docker.build_image(
build_image_options.build(),
None,
Some(body_full(tar_data.into())),
);
while let Some(msg) = image_build_stream.next().await {
debug!("Message: {msg:?}");
}
Ok(image_name.to_string())
}
None => Err(Box::new(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Path is not valid UTF-8",
))),
} }
}
Ok(image_name.to_string()) ///normalizes timestamp and ignores files that will bust the docker cach
async fn create_deterministic_tar(
&self,
project_root: &std::path::Path,
) -> Result<Vec<u8>, Box<dyn std::error::Error>> {
debug!("building tar file from project root {:#?}", project_root);
let mut tar_data = Vec::new();
{
let mut builder = Builder::new(&mut tar_data);
let ignore_prefixes = [
"target",
".git",
".github",
".harmony_generated",
"node_modules",
];
let mut entries: Vec<_> = WalkDir::new(project_root)
.into_iter()
.filter_map(Result::ok)
.filter(|e| e.file_type().is_file())
.filter(|e| {
let rel_path = e.path().strip_prefix(project_root).unwrap();
!ignore_prefixes
.iter()
.any(|prefix| rel_path.starts_with(prefix))
})
.collect();
entries.sort_by_key(|e| e.path().to_owned());
for entry in entries {
let path = entry.path();
let rel_path = path.strip_prefix(project_root).unwrap();
let mut file = fs::File::open(path)?;
let mut header = Header::new_gnu();
header.set_size(entry.metadata()?.len());
header.set_mode(0o644);
header.set_mtime(0);
header.set_uid(0);
header.set_gid(0);
builder.append_data(&mut header, rel_path, &mut file)?;
}
builder.finish()?;
}
Ok(tar_data)
} }
/// Tags and pushes a Docker image to the configured remote registry. /// Tags and pushes a Docker image to the configured remote registry.
@ -272,8 +329,11 @@ impl RustWebapp {
"groupadd -r appgroup && useradd -r -s /bin/false -g appgroup appuser", "groupadd -r appgroup && useradd -r -s /bin/false -g appgroup appuser",
)); ));
dockerfile.push(ENV::from("LEPTOS_SITE_ADDR=0.0.0.0:3000")); dockerfile.push(ENV::from(format!(
dockerfile.push(EXPOSE::from("3000/tcp")); "LEPTOS_SITE_ADDR=0.0.0.0:{}",
self.service_port
)));
dockerfile.push(EXPOSE::from(format!("{}/tcp", self.service_port)));
dockerfile.push(WORKDIR::from("/home/appuser")); dockerfile.push(WORKDIR::from("/home/appuser"));
// Copy static files // Copy static files
@ -394,7 +454,7 @@ image:
service: service:
type: ClusterIP type: ClusterIP
port: 3000 port: {}
ingress: ingress:
enabled: true enabled: true
@ -414,112 +474,123 @@ ingress:
- chart-example.local - chart-example.local
"#, "#,
chart_name, image_repo, image_tag, self.name chart_name, image_repo, image_tag, self.service_port, self.name
); );
fs::write(chart_dir.join("values.yaml"), values_yaml)?; fs::write(chart_dir.join("values.yaml"), values_yaml)?;
// Create templates/_helpers.tpl // Create templates/_helpers.tpl
let helpers_tpl = r#" let helpers_tpl = format!(
{{/* r#"
{{{{/*
Expand the name of the chart. Expand the name of the chart.
*/}} */}}}}
{{- define "chart.name" -}} {{{{- define "chart.name" -}}}}
{{- default .Chart.Name $.Values.nameOverride | trunc 63 | trimSuffix "-" }} {{{{- default .Chart.Name $.Values.nameOverride | trunc 63 | trimSuffix "-" }}}}
{{- end }} {{{{- end }}}}
{{/* {{{{/*
Create a default fully qualified app name. Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec). We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
*/}} */}}}}
{{- define "chart.fullname" -}} {{{{- define "chart.fullname" -}}}}
{{- $name := default .Chart.Name $.Values.nameOverride }} {{{{- $name := default .Chart.Name $.Values.nameOverride }}}}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }} {{{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}}}
{{- end }} {{{{- end }}}}
"#; "#
);
fs::write(templates_dir.join("_helpers.tpl"), helpers_tpl)?; fs::write(templates_dir.join("_helpers.tpl"), helpers_tpl)?;
// Create templates/service.yaml // Create templates/service.yaml
let service_yaml = r#" let service_yaml = format!(
r#"
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
metadata: metadata:
name: {{ include "chart.fullname" . }} name: {{{{ include "chart.fullname" . }}}}
spec: spec:
type: {{ $.Values.service.type }} type: {{{{ $.Values.service.type }}}}
ports: ports:
- name: main - name: main
port: {{ $.Values.service.port | default 3000 }} port: {{{{ $.Values.service.port | default {} }}}}
targetPort: {{ $.Values.service.port | default 3000 }} targetPort: {{{{ $.Values.service.port | default {} }}}}
protocol: TCP protocol: TCP
selector: selector:
app: {{ include "chart.name" . }} app: {{{{ include "chart.name" . }}}}
"#; "#,
self.service_port, self.service_port
);
fs::write(templates_dir.join("service.yaml"), service_yaml)?; fs::write(templates_dir.join("service.yaml"), service_yaml)?;
// Create templates/deployment.yaml // Create templates/deployment.yaml
let deployment_yaml = r#" let deployment_yaml = format!(
r#"
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: {{ include "chart.fullname" . }} name: {{{{ include "chart.fullname" . }}}}
spec: spec:
replicas: {{ $.Values.replicaCount }} replicas: {{{{ $.Values.replicaCount }}}}
selector: selector:
matchLabels: matchLabels:
app: {{ include "chart.name" . }} app: {{{{ include "chart.name" . }}}}
template: template:
metadata: metadata:
labels: labels:
app: {{ include "chart.name" . }} app: {{{{ include "chart.name" . }}}}
spec: spec:
containers: containers:
- name: {{ .Chart.Name }} - name: {{{{ .Chart.Name }}}}
image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag | default .Chart.AppVersion }}" image: "{{{{ $.Values.image.repository }}}}:{{{{ $.Values.image.tag | default .Chart.AppVersion }}}}"
imagePullPolicy: {{ $.Values.image.pullPolicy }} imagePullPolicy: {{{{ $.Values.image.pullPolicy }}}}
ports: ports:
- name: main - name: main
containerPort: {{ $.Values.service.port | default 3000 }} containerPort: {{{{ $.Values.service.port | default {} }}}}
protocol: TCP protocol: TCP
"#; "#,
self.service_port
);
fs::write(templates_dir.join("deployment.yaml"), deployment_yaml)?; fs::write(templates_dir.join("deployment.yaml"), deployment_yaml)?;
// Create templates/ingress.yaml // Create templates/ingress.yaml
let ingress_yaml = r#" let ingress_yaml = format!(
{{- if $.Values.ingress.enabled -}} r#"
{{{{- if $.Values.ingress.enabled -}}}}
apiVersion: networking.k8s.io/v1 apiVersion: networking.k8s.io/v1
kind: Ingress kind: Ingress
metadata: metadata:
name: {{ include "chart.fullname" . }} name: {{{{ include "chart.fullname" . }}}}
annotations: annotations:
{{- toYaml $.Values.ingress.annotations | nindent 4 }} {{{{- toYaml $.Values.ingress.annotations | nindent 4 }}}}
spec: spec:
{{- if $.Values.ingress.tls }} {{{{- if $.Values.ingress.tls }}}}
tls: tls:
{{- range $.Values.ingress.tls }} {{{{- range $.Values.ingress.tls }}}}
- hosts: - hosts:
{{- range .hosts }} {{{{- range .hosts }}}}
- {{ . | quote }} - {{{{ . | quote }}}}
{{- end }} {{{{- end }}}}
secretName: {{ .secretName }} secretName: {{{{ .secretName }}}}
{{- end }} {{{{- end }}}}
{{- end }} {{{{- end }}}}
rules: rules:
{{- range $.Values.ingress.hosts }} {{{{- range $.Values.ingress.hosts }}}}
- host: {{ .host | quote }} - host: {{{{ .host | quote }}}}
http: http:
paths: paths:
{{- range .paths }} {{{{- range .paths }}}}
- path: {{ .path }} - path: {{{{ .path }}}}
pathType: {{ .pathType }} pathType: {{{{ .pathType }}}}
backend: backend:
service: service:
name: {{ include "chart.fullname" $ }} name: {{{{ include "chart.fullname" $ }}}}
port: port:
number: {{ $.Values.service.port | default 3000 }} number: {{{{ $.Values.service.port | default {} }}}}
{{- end }} {{{{- end }}}}
{{- end }} {{{{- end }}}}
{{- end }} {{{{- end }}}}
"#; "#,
self.service_port
);
fs::write(templates_dir.join("ingress.yaml"), ingress_yaml)?; fs::write(templates_dir.join("ingress.yaml"), ingress_yaml)?;
Ok(chart_dir) Ok(chart_dir)
@ -571,7 +642,6 @@ spec:
let chart_file_name = packaged_chart_path.file_stem().unwrap().to_str().unwrap(); let chart_file_name = packaged_chart_path.file_stem().unwrap().to_str().unwrap();
let oci_push_url = format!("oci://{}/{}", *REGISTRY_URL, *REGISTRY_PROJECT); let oci_push_url = format!("oci://{}/{}", *REGISTRY_URL, *REGISTRY_PROJECT);
let oci_pull_url = format!("{oci_push_url}/{}-chart", self.name); let oci_pull_url = format!("{oci_push_url}/{}-chart", self.name);
debug!( debug!(
"Pushing Helm chart {} to {}", "Pushing Helm chart {} to {}",
packaged_chart_path.to_string_lossy(), packaged_chart_path.to_string_lossy(),
@ -590,4 +660,20 @@ spec:
debug!("push url {oci_push_url}"); debug!("push url {oci_push_url}");
Ok(format!("{}:{}", oci_pull_url, version)) Ok(format!("{}:{}", oci_pull_url, version))
} }
fn get_or_build_dockerfile(&self) -> Result<PathBuf, Box<dyn std::error::Error>> {
let existing_dockerfile = self.project_root.join("Dockerfile");
debug!("project_root = {:?}", self.project_root);
debug!("checking = {:?}", existing_dockerfile);
if existing_dockerfile.exists() {
debug!(
"Checking path {:#?} for existing Dockerfile",
self.project_root.clone()
);
return Ok(existing_dockerfile);
}
self.build_dockerfile()
}
} }

View File

@ -4,6 +4,7 @@ use std::collections::BTreeMap;
use async_trait::async_trait; use async_trait::async_trait;
use k8s_openapi::api::core::v1::Secret; use k8s_openapi::api::core::v1::Secret;
use kube::api::ObjectMeta; use kube::api::ObjectMeta;
use log::debug;
use serde::Serialize; use serde::Serialize;
use serde_json::json; use serde_json::json;
use serde_yaml::{Mapping, Value}; use serde_yaml::{Mapping, Value};
@ -11,6 +12,7 @@ use serde_yaml::{Mapping, Value};
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{ use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus, AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus,
}; };
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
use crate::{ use crate::{
interpret::{InterpretError, Outcome}, interpret::{InterpretError, Outcome},
modules::monitoring::{ modules::monitoring::{
@ -30,6 +32,71 @@ pub struct DiscordWebhook {
pub url: Url, pub url: Url,
} }
#[async_trait]
impl AlertReceiver<RHOBObservability> for DiscordWebhook {
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
{
"name": self.name,
"webhookConfigs": [
{
"url": self.url,
}
]
}
]
}),
};
let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec,
};
debug!(
"alertmanager_configs yaml:\n{:#?}",
serde_yaml::to_string(&alertmanager_configs)
);
debug!(
"alert manager configs: \n{:#?}",
alertmanager_configs.clone()
);
sender
.client
.apply(&alertmanager_configs, Some(&sender.namespace))
.await?;
Ok(Outcome::success(format!(
"installed rhob-alertmanagerconfigs for {}",
self.name
)))
}
fn name(&self) -> String {
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<RHOBObservability>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait] #[async_trait]
impl AlertReceiver<CRDPrometheus> for DiscordWebhook { impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> { async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {

View File

@ -11,8 +11,8 @@ use crate::{
interpret::{InterpretError, Outcome}, interpret::{InterpretError, Outcome},
modules::monitoring::{ modules::monitoring::{
kube_prometheus::{ kube_prometheus::{
crd::crd_alertmanager_config::{ crd::{
AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus, crd_alertmanager_config::CRDPrometheus, rhob_alertmanager_config::RHOBObservability,
}, },
prometheus::{KubePrometheus, KubePrometheusReceiver}, prometheus::{KubePrometheus, KubePrometheusReceiver},
types::{AlertChannelConfig, AlertManagerChannelConfig}, types::{AlertChannelConfig, AlertManagerChannelConfig},
@ -30,9 +30,9 @@ pub struct WebhookReceiver {
} }
#[async_trait] #[async_trait]
impl AlertReceiver<CRDPrometheus> for WebhookReceiver { impl AlertReceiver<RHOBObservability> for WebhookReceiver {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> { async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let spec = AlertmanagerConfigSpec { let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
data: json!({ data: json!({
"route": { "route": {
"receiver": self.name, "receiver": self.name,
@ -50,7 +50,68 @@ impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
}), }),
}; };
let alertmanager_configs = AlertmanagerConfig { let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec,
};
debug!(
"alert manager configs: \n{:#?}",
alertmanager_configs.clone()
);
sender
.client
.apply(&alertmanager_configs, Some(&sender.namespace))
.await?;
Ok(Outcome::success(format!(
"installed rhob-alertmanagerconfigs for {}",
self.name
)))
}
fn name(&self) -> String {
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<RHOBObservability>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let spec = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
{
"name": self.name,
"webhookConfigs": [
{
"url": self.url,
}
]
}
]
}),
};
let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some(self.name.clone()), name: Some(self.name.clone()),
labels: Some(std::collections::BTreeMap::from([( labels: Some(std::collections::BTreeMap::from([(
@ -115,6 +176,7 @@ impl PrometheusReceiver for WebhookReceiver {
self.get_config().await self.get_config().await
} }
} }
#[async_trait] #[async_trait]
impl AlertReceiver<KubePrometheus> for WebhookReceiver { impl AlertReceiver<KubePrometheus> for WebhookReceiver {
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> { async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {

View File

@ -1 +1,2 @@
pub mod application_monitoring_score; pub mod application_monitoring_score;
pub mod rhobs_application_monitoring_score;

View File

@ -0,0 +1,94 @@
use std::sync::Arc;
use async_trait::async_trait;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::{
application::Application,
monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, rhob_alertmanager_config::RHOBObservability,
},
prometheus::prometheus::PrometheusApplicationMonitoring,
},
score::Score,
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver},
};
use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct ApplicationRHOBMonitoringScore {
pub sender: RHOBObservability,
pub application: Arc<dyn Application>,
pub receivers: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
}
impl<T: Topology + PrometheusApplicationMonitoring<RHOBObservability>> Score<T>
for ApplicationRHOBMonitoringScore
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ApplicationRHOBMonitoringInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!(
"{} monitoring [ApplicationRHOBMonitoringScore]",
self.application.name()
)
}
}
#[derive(Debug)]
pub struct ApplicationRHOBMonitoringInterpret {
score: ApplicationRHOBMonitoringScore,
}
#[async_trait]
impl<T: Topology + PrometheusApplicationMonitoring<RHOBObservability>> Interpret<T>
for ApplicationRHOBMonitoringInterpret
{
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let result = topology
.install_prometheus(
&self.score.sender,
inventory,
Some(self.score.receivers.clone()),
)
.await;
match result {
Ok(outcome) => match outcome {
PreparationOutcome::Success { details: _ } => {
Ok(Outcome::success("Prometheus installed".into()))
}
PreparationOutcome::Noop => Ok(Outcome::noop()),
},
Err(err) => Err(InterpretError::from(err)),
}
}
fn get_name(&self) -> InterpretName {
InterpretName::ApplicationMonitoring
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@ -7,5 +7,15 @@ pub mod crd_prometheuses;
pub mod grafana_default_dashboard; pub mod grafana_default_dashboard;
pub mod grafana_operator; pub mod grafana_operator;
pub mod prometheus_operator; pub mod prometheus_operator;
pub mod rhob_alertmanager_config;
pub mod rhob_alertmanagers;
pub mod rhob_cluster_observability_operator;
pub mod rhob_default_rules;
pub mod rhob_grafana;
pub mod rhob_monitoring_stack;
pub mod rhob_prometheus_rules;
pub mod rhob_prometheuses;
pub mod rhob_role;
pub mod rhob_service_monitor;
pub mod role; pub mod role;
pub mod service_monitor; pub mod service_monitor;

View File

@ -0,0 +1,50 @@
use std::sync::Arc;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::topology::{
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertSender},
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.rhobs",
version = "v1alpha1",
kind = "AlertmanagerConfig",
plural = "alertmanagerconfigs",
namespaced
)]
pub struct AlertmanagerConfigSpec {
#[serde(flatten)]
pub data: serde_json::Value,
}
#[derive(Debug, Clone, Serialize)]
pub struct RHOBObservability {
pub namespace: String,
pub client: Arc<K8sClient>,
}
impl AlertSender for RHOBObservability {
fn name(&self) -> String {
"RHOBAlertManager".to_string()
}
}
impl Clone for Box<dyn AlertReceiver<RHOBObservability>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl Serialize for Box<dyn AlertReceiver<RHOBObservability>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}

View File

@ -0,0 +1,52 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use super::crd_prometheuses::LabelSelector;
/// Rust CRD for `Alertmanager` from Prometheus Operator
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.rhobs",
version = "v1",
kind = "Alertmanager",
plural = "alertmanagers",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct AlertmanagerSpec {
/// Number of replicas for HA
pub replicas: i32,
/// Selectors for AlertmanagerConfig CRDs
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alertmanager_config_selector: Option<LabelSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alertmanager_config_namespace_selector: Option<LabelSelector>,
/// Optional pod template metadata (annotations, labels)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pod_metadata: Option<LabelSelector>,
/// Optional topology spread settings
#[serde(default, skip_serializing_if = "Option::is_none")]
pub version: Option<String>,
}
impl Default for AlertmanagerSpec {
fn default() -> Self {
AlertmanagerSpec {
replicas: 1,
// Match all AlertmanagerConfigs in the same namespace
alertmanager_config_namespace_selector: None,
// Empty selector matches all AlertmanagerConfigs in that namespace
alertmanager_config_selector: Some(LabelSelector::default()),
pod_metadata: None,
version: None,
}
}
}

View File

@ -0,0 +1,22 @@
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use crate::modules::helm::chart::HelmChartScore;
//TODO package chart or something for COO okd
pub fn rhob_cluster_observability_operator() -> HelmChartScore {
HelmChartScore {
namespace: None,
release_name: NonBlankString::from_str("").unwrap(),
chart_name: NonBlankString::from_str(
"oci://hub.nationtech.io/harmony/nt-prometheus-operator",
)
.unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: None,
create_namespace: true,
install_only: true,
repository: None,
}
}

View File

@ -0,0 +1,26 @@
use crate::modules::{
monitoring::kube_prometheus::crd::rhob_prometheus_rules::Rule,
prometheus::alerts::k8s::{
deployment::alert_deployment_unavailable,
pod::{alert_container_restarting, alert_pod_not_ready, pod_failed},
pvc::high_pvc_fill_rate_over_two_days,
service::alert_service_down,
},
};
pub fn build_default_application_rules() -> Vec<Rule> {
let pod_failed: Rule = pod_failed().into();
let container_restarting: Rule = alert_container_restarting().into();
let pod_not_ready: Rule = alert_pod_not_ready().into();
let service_down: Rule = alert_service_down().into();
let deployment_unavailable: Rule = alert_deployment_unavailable().into();
let high_pvc_fill_rate: Rule = high_pvc_fill_rate_over_two_days().into();
vec![
pod_failed,
container_restarting,
pod_not_ready,
service_down,
deployment_unavailable,
high_pvc_fill_rate,
]
}

View File

@ -0,0 +1,153 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::crd::rhob_prometheuses::LabelSelector;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "grafana.integreatly.org",
version = "v1beta1",
kind = "Grafana",
plural = "grafanas",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub config: Option<GrafanaConfig>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub admin_user: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub admin_password: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub ingress: Option<GrafanaIngress>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub persistence: Option<GrafanaPersistence>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resources: Option<ResourceRequirements>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaConfig {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub log: Option<GrafanaLogConfig>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub security: Option<GrafanaSecurityConfig>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaLogConfig {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub mode: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub level: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaSecurityConfig {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub admin_user: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub admin_password: Option<String>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaIngress {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub enabled: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub hosts: Option<Vec<String>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaPersistence {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub enabled: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub storage_class_name: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub size: Option<String>,
}
// ------------------------------------------------------------------------------------------------
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "grafana.integreatly.org",
version = "v1beta1",
kind = "GrafanaDashboard",
plural = "grafanadashboards",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDashboardSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resync_period: Option<String>,
pub instance_selector: LabelSelector,
pub json: String,
}
// ------------------------------------------------------------------------------------------------
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "grafana.integreatly.org",
version = "v1beta1",
kind = "GrafanaDatasource",
plural = "grafanadatasources",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceSpec {
pub instance_selector: LabelSelector,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub allow_cross_namespace_import: Option<bool>,
pub datasource: GrafanaDatasourceConfig,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceConfig {
pub access: String,
pub database: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub json_data: Option<BTreeMap<String, String>>,
pub name: String,
pub r#type: String,
pub url: String,
}
// ------------------------------------------------------------------------------------------------
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct ResourceRequirements {
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub limits: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub requests: BTreeMap<String, String>,
}

View File

@ -0,0 +1,41 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::crd::rhob_prometheuses::LabelSelector;
/// MonitoringStack CRD for monitoring.rhobs/v1alpha1
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.rhobs",
version = "v1alpha1",
kind = "MonitoringStack",
plural = "monitoringstacks",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct MonitoringStackSpec {
/// Verbosity of logs (e.g. "debug", "info", "warn", "error").
#[serde(default, skip_serializing_if = "Option::is_none")]
pub log_level: Option<String>,
/// Retention period for Prometheus TSDB data (e.g. "1d").
#[serde(default, skip_serializing_if = "Option::is_none")]
pub retention: Option<String>,
/// Resource selector for workloads monitored by this stack.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resource_selector: Option<LabelSelector>,
}
impl Default for MonitoringStackSpec {
fn default() -> Self {
MonitoringStackSpec {
log_level: Some("info".into()),
retention: Some("7d".into()),
resource_selector: Some(LabelSelector::default()),
}
}
}

View File

@ -0,0 +1,57 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule;
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
group = "monitoring.rhobs",
version = "v1",
kind = "PrometheusRule",
plural = "prometheusrules",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusRuleSpec {
pub groups: Vec<RuleGroup>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct RuleGroup {
pub name: String,
pub rules: Vec<Rule>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Rule {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alert: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub expr: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub for_: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub labels: Option<std::collections::BTreeMap<String, String>>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub annotations: Option<std::collections::BTreeMap<String, String>>,
}
impl From<PrometheusAlertRule> for Rule {
fn from(value: PrometheusAlertRule) -> Self {
Rule {
alert: Some(value.alert),
expr: Some(value.expr),
for_: value.r#for,
labels: Some(value.labels.into_iter().collect::<BTreeMap<_, _>>()),
annotations: Some(value.annotations.into_iter().collect::<BTreeMap<_, _>>()),
}
}
}

View File

@ -0,0 +1,118 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::types::Operator;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.rhobs",
version = "v1",
kind = "Prometheus",
plural = "prometheuses",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub alerting: Option<PrometheusSpecAlerting>,
pub service_account_name: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub service_monitor_namespace_selector: Option<LabelSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub service_monitor_selector: Option<LabelSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub service_discovery_role: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub pod_monitor_selector: Option<LabelSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rule_selector: Option<LabelSelector>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub rule_namespace_selector: Option<LabelSelector>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct NamespaceSelector {
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub match_names: Vec<String>,
}
/// Contains alerting configuration, specifically Alertmanager endpoints.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
pub struct PrometheusSpecAlerting {
#[serde(skip_serializing_if = "Option::is_none")]
pub alertmanagers: Option<Vec<AlertmanagerEndpoints>>,
}
/// Represents an Alertmanager endpoint configuration used by Prometheus.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
pub struct AlertmanagerEndpoints {
/// Name of the Alertmanager Service.
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
/// Namespace of the Alertmanager Service.
#[serde(skip_serializing_if = "Option::is_none")]
pub namespace: Option<String>,
/// Port to access on the Alertmanager Service (e.g. "web").
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<String>,
/// Scheme to use for connecting (e.g. "http").
#[serde(skip_serializing_if = "Option::is_none")]
pub scheme: Option<String>,
// Other fields like `tls_config`, `path_prefix`, etc., can be added if needed.
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, Default)]
#[serde(rename_all = "camelCase")]
pub struct LabelSelector {
#[serde(default, skip_serializing_if = "BTreeMap::is_empty")]
pub match_labels: BTreeMap<String, String>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub match_expressions: Vec<LabelSelectorRequirement>,
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct LabelSelectorRequirement {
pub key: String,
pub operator: Operator,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub values: Vec<String>,
}
impl Default for PrometheusSpec {
fn default() -> Self {
PrometheusSpec {
alerting: None,
service_account_name: "prometheus".into(),
// null means "only my namespace"
service_monitor_namespace_selector: None,
// empty selector means match all ServiceMonitors in that namespace
service_monitor_selector: Some(LabelSelector::default()),
service_discovery_role: Some("Endpoints".into()),
pod_monitor_selector: None,
rule_selector: None,
rule_namespace_selector: Some(LabelSelector::default()),
}
}
}

View File

@ -0,0 +1,62 @@
use k8s_openapi::api::{
core::v1::ServiceAccount,
rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject},
};
use kube::api::ObjectMeta;
pub fn build_prom_role(role_name: String, namespace: String) -> Role {
Role {
metadata: ObjectMeta {
name: Some(role_name),
namespace: Some(namespace),
..Default::default()
},
rules: Some(vec![PolicyRule {
api_groups: Some(vec!["".into()]), // core API group
resources: Some(vec!["services".into(), "endpoints".into(), "pods".into()]),
verbs: vec!["get".into(), "list".into(), "watch".into()],
..Default::default()
}]),
}
}
pub fn build_prom_rolebinding(
role_name: String,
namespace: String,
service_account_name: String,
) -> RoleBinding {
RoleBinding {
metadata: ObjectMeta {
name: Some(format!("{}-rolebinding", role_name)),
namespace: Some(namespace.clone()),
..Default::default()
},
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".into(),
kind: "Role".into(),
name: role_name,
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".into(),
name: service_account_name,
namespace: Some(namespace.clone()),
..Default::default()
}]),
}
}
pub fn build_prom_service_account(
service_account_name: String,
namespace: String,
) -> ServiceAccount {
ServiceAccount {
automount_service_account_token: None,
image_pull_secrets: None,
metadata: ObjectMeta {
name: Some(service_account_name),
namespace: Some(namespace),
..Default::default()
},
secrets: None,
}
}

View File

@ -0,0 +1,87 @@
use std::collections::HashMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::types::{
HTTPScheme, MatchExpression, NamespaceSelector, Operator, Selector,
ServiceMonitor as KubeServiceMonitor, ServiceMonitorEndpoint,
};
/// This is the top-level struct for the ServiceMonitor Custom Resource.
/// The `#[derive(CustomResource)]` macro handles all the boilerplate for you,
/// including the `impl Resource`.
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.rhobs",
version = "v1",
kind = "ServiceMonitor",
plural = "servicemonitors",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ServiceMonitorSpec {
/// A label selector to select services to monitor.
pub selector: Selector,
/// A list of endpoints on the selected services to be monitored.
pub endpoints: Vec<ServiceMonitorEndpoint>,
/// Selector to select which namespaces the Kubernetes Endpoints objects
/// are discovered from.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub namespace_selector: Option<NamespaceSelector>,
/// The label to use to retrieve the job name from.
#[serde(default, skip_serializing_if = "Option::is_none")]
pub job_label: Option<String>,
/// Pod-based target labels to transfer from the Kubernetes Pod onto the target.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub pod_target_labels: Vec<String>,
/// TargetLabels transfers labels on the Kubernetes Service object to the target.
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub target_labels: Vec<String>,
}
impl Default for ServiceMonitorSpec {
fn default() -> Self {
let labels = HashMap::new();
Self {
selector: Selector {
match_labels: { labels },
match_expressions: vec![MatchExpression {
key: "app.kubernetes.io/name".into(),
operator: Operator::Exists,
values: vec![],
}],
},
endpoints: vec![ServiceMonitorEndpoint {
port: Some("http".to_string()),
path: Some("/metrics".into()),
interval: Some("30s".into()),
scheme: Some(HTTPScheme::HTTP),
..Default::default()
}],
namespace_selector: None, // only the same namespace
job_label: Some("app".into()),
pod_target_labels: vec![],
target_labels: vec![],
}
}
}
impl From<KubeServiceMonitor> for ServiceMonitorSpec {
fn from(value: KubeServiceMonitor) -> Self {
Self {
selector: value.selector,
endpoints: value.endpoints,
namespace_selector: value.namespace_selector,
job_label: value.job_label,
pod_target_labels: value.pod_target_labels,
target_labels: value.target_labels,
}
}
}

View File

@ -197,11 +197,6 @@ impl K8sPrometheusCRDAlertingInterpret {
} }
async fn ensure_grafana_operator(&self) -> Result<Outcome, InterpretError> { async fn ensure_grafana_operator(&self) -> Result<Outcome, InterpretError> {
if self.crd_exists("grafanas.grafana.integreatly.org").await {
debug!("grafana CRDs already exist — skipping install.");
return Ok(Outcome::success("Grafana CRDs already exist".to_string()));
}
let _ = Command::new("helm") let _ = Command::new("helm")
.args([ .args([
"repo", "repo",

View File

@ -2,3 +2,4 @@ pub mod alerts;
pub mod k8s_prometheus_alerting_score; pub mod k8s_prometheus_alerting_score;
#[allow(clippy::module_inception)] #[allow(clippy::module_inception)]
pub mod prometheus; pub mod prometheus;
pub mod rhob_alerting_score;

View File

@ -0,0 +1,486 @@
use std::fs;
use std::{collections::BTreeMap, sync::Arc};
use tempfile::tempdir;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use log::{debug, info};
use serde::Serialize;
use std::process::Command;
use crate::modules::monitoring::kube_prometheus::crd::grafana_default_dashboard::build_default_dashboard;
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanagers::{
Alertmanager, AlertmanagerSpec,
};
use crate::modules::monitoring::kube_prometheus::crd::rhob_grafana::{
Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig,
GrafanaDatasourceSpec, GrafanaSpec,
};
use crate::modules::monitoring::kube_prometheus::crd::rhob_monitoring_stack::{
MonitoringStack, MonitoringStackSpec,
};
use crate::modules::monitoring::kube_prometheus::crd::rhob_prometheus_rules::{
PrometheusRule, PrometheusRuleSpec, RuleGroup,
};
use crate::modules::monitoring::kube_prometheus::crd::rhob_prometheuses::LabelSelector;
use crate::modules::monitoring::kube_prometheus::crd::rhob_service_monitor::{
ServiceMonitor, ServiceMonitorSpec,
};
use crate::score::Score;
use crate::topology::oberservability::monitoring::AlertReceiver;
use crate::topology::{K8sclient, Topology, k8s::K8sClient};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
};
use harmony_types::id::Id;
use super::prometheus::PrometheusApplicationMonitoring;
#[derive(Clone, Debug, Serialize)]
pub struct RHOBAlertingScore {
pub sender: RHOBObservability,
pub receivers: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
pub service_monitors: Vec<ServiceMonitor>,
pub prometheus_rules: Vec<RuleGroup>,
}
impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<RHOBObservability>> Score<T>
for RHOBAlertingScore
{
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
Box::new(RHOBAlertingInterpret {
sender: self.sender.clone(),
receivers: self.receivers.clone(),
service_monitors: self.service_monitors.clone(),
prometheus_rules: self.prometheus_rules.clone(),
})
}
fn name(&self) -> String {
"RHOB alerting [RHOBAlertingScore]".into()
}
}
#[derive(Clone, Debug)]
pub struct RHOBAlertingInterpret {
pub sender: RHOBObservability,
pub receivers: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
pub service_monitors: Vec<ServiceMonitor>,
pub prometheus_rules: Vec<RuleGroup>,
}
#[async_trait]
impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<RHOBObservability>> Interpret<T>
for RHOBAlertingInterpret
{
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.ensure_grafana_operator().await?;
self.install_prometheus(&client).await?;
self.install_client_kube_metrics().await?;
self.install_grafana(&client).await?;
self.install_receivers(&self.sender, &self.receivers)
.await?;
self.install_rules(&self.prometheus_rules, &client).await?;
self.install_monitors(self.service_monitors.clone(), &client)
.await?;
Ok(Outcome::success(
"K8s monitoring components installed".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::RHOBAlerting
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl RHOBAlertingInterpret {
async fn crd_exists(&self, crd: &str) -> bool {
let status = Command::new("sh")
.args(["-c", &format!("kubectl get crd -A | grep -i {crd}")])
.status()
.map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))
.unwrap();
status.success()
}
async fn install_chart(
&self,
chart_path: String,
chart_name: String,
) -> Result<(), InterpretError> {
let temp_dir =
tempdir().map_err(|e| InterpretError::new(format!("Tempdir error: {}", e)))?;
let temp_path = temp_dir.path().to_path_buf();
debug!("Using temp directory: {}", temp_path.display());
let chart = format!("{}/{}", chart_path, chart_name);
let pull_output = Command::new("helm")
.args(["pull", &chart, "--destination", temp_path.to_str().unwrap()])
.output()
.map_err(|e| InterpretError::new(format!("Helm pull error: {}", e)))?;
if !pull_output.status.success() {
return Err(InterpretError::new(format!(
"Helm pull failed: {}",
String::from_utf8_lossy(&pull_output.stderr)
)));
}
let tgz_path = fs::read_dir(&temp_path)
.unwrap()
.filter_map(|entry| {
let entry = entry.ok()?;
let path = entry.path();
if path.extension()? == "tgz" {
Some(path)
} else {
None
}
})
.next()
.ok_or_else(|| InterpretError::new("Could not find pulled Helm chart".into()))?;
debug!("Installing chart from: {}", tgz_path.display());
let install_output = Command::new("helm")
.args([
"upgrade",
"--install",
&chart_name,
tgz_path.to_str().unwrap(),
"--namespace",
&self.sender.namespace.clone(),
"--create-namespace",
"--wait",
"--atomic",
])
.output()
.map_err(|e| InterpretError::new(format!("Helm install error: {}", e)))?;
if !install_output.status.success() {
return Err(InterpretError::new(format!(
"Helm install failed: {}",
String::from_utf8_lossy(&install_output.stderr)
)));
}
debug!(
"Installed chart {}/{} in namespace: {}",
&chart_path,
&chart_name,
self.sender.namespace.clone()
);
Ok(())
}
async fn ensure_grafana_operator(&self) -> Result<Outcome, InterpretError> {
let _ = Command::new("helm")
.args([
"repo",
"add",
"grafana-operator",
"https://grafana.github.io/helm-charts",
])
.output()
.unwrap();
let _ = Command::new("helm")
.args(["repo", "update"])
.output()
.unwrap();
let output = Command::new("helm")
.args([
"install",
"grafana-operator",
"grafana-operator/grafana-operator",
"--namespace",
&self.sender.namespace.clone(),
"--create-namespace",
"--set",
"namespaceScope=true",
])
.output()
.unwrap();
if !output.status.success() {
return Err(InterpretError::new(format!(
"helm install failed:\nstdout: {}\nstderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
)));
}
Ok(Outcome::success(format!(
"installed grafana operator in ns {}",
self.sender.namespace.clone()
)))
}
async fn install_prometheus(&self, client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
debug!(
"installing crd-prometheuses in namespace {}",
self.sender.namespace.clone()
);
let stack = MonitoringStack {
metadata: ObjectMeta {
name: Some(format!("{}-monitoring", self.sender.namespace.clone()).into()),
namespace: Some(self.sender.namespace.clone()),
labels: Some([("coo".into(), "example".into())].into()),
..Default::default()
},
spec: MonitoringStackSpec {
log_level: Some("debug".into()),
retention: Some("1d".into()),
resource_selector: Some(LabelSelector {
match_labels: [("app".into(), "demo".into())].into(),
..Default::default()
}),
},
};
client
.apply(&stack, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
info!("installed rhob monitoring stack",);
Ok(Outcome::success(format!(
"successfully deployed rhob-prometheus {:#?}",
stack
)))
}
async fn install_alert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let am = Alertmanager {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(self.sender.namespace.clone()),
..Default::default()
},
spec: AlertmanagerSpec::default(),
};
client
.apply(&am, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed service monitor {:#?}",
am.metadata.name
)))
}
async fn install_monitors(
&self,
mut monitors: Vec<ServiceMonitor>,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let default_service_monitor = ServiceMonitor {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone()),
labels: Some(std::collections::BTreeMap::from([
("alertmanagerConfig".to_string(), "enabled".to_string()),
("client".to_string(), "prometheus".to_string()),
(
"app.kubernetes.io/name".to_string(),
"kube-state-metrics".to_string(),
),
])),
namespace: Some(self.sender.namespace.clone()),
..Default::default()
},
spec: ServiceMonitorSpec::default(),
};
monitors.push(default_service_monitor);
for monitor in monitors.iter() {
client
.apply(monitor, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
}
Ok(Outcome::success(
"succesfully deployed service monitors".to_string(),
))
}
async fn install_rules(
&self,
#[allow(clippy::ptr_arg)] rules: &Vec<RuleGroup>,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut prom_rule_spec = PrometheusRuleSpec {
groups: rules.clone(),
};
let default_rules_group = RuleGroup {
name: "default-rules".to_string(),
rules: crate::modules::monitoring::kube_prometheus::crd::rhob_default_rules::build_default_application_rules(),
};
prom_rule_spec.groups.push(default_rules_group);
let prom_rules = PrometheusRule {
metadata: ObjectMeta {
name: Some(self.sender.namespace.clone()),
labels: Some(std::collections::BTreeMap::from([
("alertmanagerConfig".to_string(), "enabled".to_string()),
("role".to_string(), "prometheus-rule".to_string()),
])),
namespace: Some(self.sender.namespace.clone()),
..Default::default()
},
spec: prom_rule_spec,
};
client
.apply(&prom_rules, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed rules {:#?}",
prom_rules.metadata.name
)))
}
async fn install_client_kube_metrics(&self) -> Result<Outcome, InterpretError> {
self.install_chart(
"oci://hub.nationtech.io/harmony".to_string(),
"nt-kube-metrics".to_string(),
)
.await?;
Ok(Outcome::success(format!(
"Installed client kube metrics in ns {}",
&self.sender.namespace.clone()
)))
}
async fn install_grafana(&self, client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let mut label = BTreeMap::new();
label.insert("dashboards".to_string(), "grafana".to_string());
let labels = LabelSelector {
match_labels: label.clone(),
match_expressions: vec![],
};
let mut json_data = BTreeMap::new();
json_data.insert("timeInterval".to_string(), "5s".to_string());
let namespace = self.sender.namespace.clone();
let json = build_default_dashboard(&namespace);
let graf_data_source = GrafanaDatasource {
metadata: ObjectMeta {
name: Some(format!(
"grafana-datasource-{}",
self.sender.namespace.clone()
)),
namespace: Some(self.sender.namespace.clone()),
..Default::default()
},
spec: GrafanaDatasourceSpec {
instance_selector: labels.clone(),
allow_cross_namespace_import: Some(false),
datasource: GrafanaDatasourceConfig {
access: "proxy".to_string(),
database: Some("prometheus".to_string()),
json_data: Some(json_data),
//this is fragile
name: format!("prometheus-{}-0", self.sender.namespace.clone()),
r#type: "prometheus".to_string(),
url: format!(
"http://prometheus-operated.{}.svc.cluster.local:9090",
self.sender.namespace.clone()
),
},
},
};
client
.apply(&graf_data_source, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
let graf_dashboard = GrafanaDashboard {
metadata: ObjectMeta {
name: Some(format!(
"grafana-dashboard-{}",
self.sender.namespace.clone()
)),
namespace: Some(self.sender.namespace.clone()),
..Default::default()
},
spec: GrafanaDashboardSpec {
resync_period: Some("30s".to_string()),
instance_selector: labels.clone(),
json,
},
};
client
.apply(&graf_dashboard, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
let grafana = Grafana {
metadata: ObjectMeta {
name: Some(format!("grafana-{}", self.sender.namespace.clone())),
namespace: Some(self.sender.namespace.clone()),
labels: Some(label.clone()),
..Default::default()
},
spec: GrafanaSpec {
config: None,
admin_user: None,
admin_password: None,
ingress: None,
persistence: None,
resources: None,
},
};
client
.apply(&grafana, Some(&self.sender.namespace.clone()))
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"successfully deployed grafana instance {:#?}",
grafana.metadata.name
)))
}
async fn install_receivers(
&self,
sender: &RHOBObservability,
receivers: &Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
) -> Result<Outcome, InterpretError> {
for receiver in receivers.iter() {
receiver.install(sender).await.map_err(|err| {
InterpretError::new(format!("failed to install receiver: {}", err))
})?;
}
Ok(Outcome::success("successfully deployed receivers".into()))
}
}

View File

@ -155,24 +155,24 @@ pub fn cidrv4(input: TokenStream) -> TokenStream {
/// ///
/// ``` /// ```
/// use harmony_types::net::Url; /// use harmony_types::net::Url;
/// use harmony_macros::remote_url; /// use harmony_macros::hurl;
/// ///
/// let remote_url = remote_url!("https://example.com/path"); /// let url = hurl!("https://example.com/path");
/// ///
/// let expected_url = url::Url::parse("https://example.com/path").unwrap(); /// let expected_url = url::Url::parse("https://example.com/path").unwrap();
/// assert!(matches!(remote_url, Url::Url(expected_url))); /// assert!(matches!(url, Url::Url(expected_url)));
/// ``` /// ```
/// ///
/// The following example will fail to compile: /// The following example will fail to compile:
/// ///
/// ```rust,compile_fail /// ```rust,compile_fail
/// use harmony_macros::remote_url; /// use harmony_macros::hurl;
/// ///
/// // This is not a valid URL and will cause a compilation error. /// // This is not a valid URL and will cause a compilation error.
/// let _invalid = remote_url!("not a valid url"); /// let _invalid = hurl!("not a valid url");
/// ``` /// ```
#[proc_macro] #[proc_macro]
pub fn remote_url(input: TokenStream) -> TokenStream { pub fn hurl(input: TokenStream) -> TokenStream {
let input_lit = parse_macro_input!(input as LitStr); let input_lit = parse_macro_input!(input as LitStr);
let url_str = input_lit.value(); let url_str = input_lit.value();

View File

@ -53,7 +53,7 @@ pub type IpAddress = std::net::IpAddr;
/// Represents a URL, which can either be a remote URL or a local file path. /// Represents a URL, which can either be a remote URL or a local file path.
/// ///
/// For convenience, the `harmony_macros` crate provides `remote_url!` and `local_folder!` /// For convenience, the `harmony_macros` crate provides `hurl!` and `local_folder!`
/// macros to construct `Url` variants from string literals. /// macros to construct `Url` variants from string literals.
/// ///
/// # Examples /// # Examples
@ -67,10 +67,10 @@ pub type IpAddress = std::net::IpAddr;
/// // The `use` statement below is for the doc test. In a real project, /// // The `use` statement below is for the doc test. In a real project,
/// // you would use `use harmony_types::Url;` /// // you would use `use harmony_types::Url;`
/// # use harmony_types::net::Url; /// # use harmony_types::net::Url;
/// let remote_url = Url::Url(url::Url::parse("https://example.com").unwrap()); /// let url = Url::Url(url::Url::parse("https://example.com").unwrap());
/// let local_path = Url::LocalFolder("/var/data".to_string()); /// let local_path = Url::LocalFolder("/var/data".to_string());
/// ///
/// assert!(matches!(remote_url, Url::Url(_))); /// assert!(matches!(url, Url::Url(_)));
/// assert!(matches!(local_path, Url::LocalFolder(_))); /// assert!(matches!(local_path, Url::LocalFolder(_)));
/// ``` /// ```
/// ///
@ -79,10 +79,10 @@ pub type IpAddress = std::net::IpAddr;
/// If `harmony_macros` is a dependency, you can create `Url`s more concisely. /// If `harmony_macros` is a dependency, you can create `Url`s more concisely.
/// ///
/// ```rust,ignore /// ```rust,ignore
/// use harmony_macros::{remote_url, local_folder}; /// use harmony_macros::{hurl, local_folder};
/// use harmony_types::Url; /// use harmony_types::Url;
/// ///
/// let remote_url = remote_url!("https://example.com"); /// let hurl = hurl!("https://example.com");
/// let local_path = local_folder!("/var/data"); /// let local_path = local_folder!("/var/data");
/// ``` /// ```
#[derive(Debug, Clone)] #[derive(Debug, Clone)]