fix: merge conflict
Some checks failed
Run Check Script / check (pull_request) Failing after 41s

This commit is contained in:
Willem 2025-07-02 13:46:26 -04:00
commit 82119076cf
30 changed files with 1463 additions and 279 deletions

789
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@ -24,19 +24,31 @@ log = "0.4"
env_logger = "0.11" env_logger = "0.11"
derive-new = "0.7" derive-new = "0.7"
async-trait = "0.1" async-trait = "0.1"
tokio = { version = "1.40", features = ["io-std", "fs", "macros", "rt-multi-thread"] } tokio = { version = "1.40", features = [
"io-std",
"fs",
"macros",
"rt-multi-thread",
] }
cidr = { features = ["serde"], version = "0.2" } cidr = { features = ["serde"], version = "0.2" }
russh = "0.45" russh = "0.45"
russh-keys = "0.45" russh-keys = "0.45"
rand = "0.8" rand = "0.8"
url = "2.5" url = "2.5"
kube = "0.98" kube = { version = "1.1.0", features = [
k8s-openapi = { version = "0.24", features = ["v1_30"] } "config",
"client",
"runtime",
"rustls-tls",
"ws",
"jsonpatch",
] }
k8s-openapi = { version = "0.25", features = ["v1_30"] }
serde_yaml = "0.9" serde_yaml = "0.9"
serde-value = "0.7" serde-value = "0.7"
http = "1.2" http = "1.2"
inquire = "0.7" inquire = "0.7"
convert_case = "0.8" convert_case = "0.8"
chrono = "0.4" chrono = "0.4"
similar = "2" similar = "2"
uuid = { version = "1.11", features = [ "v4", "fast-rng", "macro-diagnostics" ] } uuid = { version = "1.11", features = ["v4", "fast-rng", "macro-diagnostics"] }

View File

@ -0,0 +1,78 @@
# Architecture Decision Record: Monitoring Notifications
Initial Author: Taha Hawa
Initial Date: 2025-06-26
Last Updated Date: 2025-06-26
## Status
Proposed
## Context
We need to send notifications (typically from AlertManager/Prometheus) and we need to receive said notifications on mobile devices for sure in some way, whether it's push messages, SMS, phone call, email, etc or all of the above.
## Decision
We should go with https://ntfy.sh except host it ourselves.
`ntfy` is an open source solution written in Go that has the features we need.
## Rationale
`ntfy` has pretty much everything we need (push notifications, email forwarding, receives via webhook), and nothing/not much we don't. Good fit, lightweight.
## Consequences
Pros:
- topics, with ACLs
- lightweight
- reliable
- easy to configure
- mobile app
- the mobile app can listen via websocket, poll, or receive via Firebase/GCM on Android, or similar on iOS.
- Forward to email
- Text-to-Speech phone call messages using Twilio integration
- Operates based on simple HTTP requests/Webhooks, easily usable via AlertManager
Cons:
- No SMS pushes
- SQLite DB, makes it harder to HA/scale
## Alternatives considered
[AWS SNS](https://aws.amazon.com/sns/):
Pros:
- highly reliable
- no hosting needed
Cons:
- no control, not self hosted
- costs (per usage)
[Apprise](https://github.com/caronc/apprise):
Pros:
- Way more ways of sending notifications
- Can use ntfy as one of the backends/ways of sending
Cons:
- Way too overkill for what we need in terms of features
[Gotify](https://github.com/gotify/server):
Pros:
- simple, lightweight, golang, etc
Cons:
- Pushes topics are per-user
## Additional Notes

View File

@ -14,8 +14,8 @@ harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true } log = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
url = { workspace = true } url = { workspace = true }
kube = "0.98.0" kube = "1.1.0"
k8s-openapi = { version = "0.24.0", features = [ "v1_30" ] } k8s-openapi = { version = "0.25.0", features = ["v1_30"] }
http = "1.2.0" http = "1.2.0"
serde_yaml = "0.9.34" serde_yaml = "0.9.34"
inquire.workspace = true inquire.workspace = true

View File

@ -8,5 +8,6 @@ license.workspace = true
[dependencies] [dependencies]
harmony = { version = "0.1.0", path = "../../harmony" } harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" } harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
harmony_macros = { version = "0.1.0", path = "../../harmony_macros" }
tokio.workspace = true tokio.workspace = true
url.workspace = true url.workspace = true

View File

@ -1,3 +1,5 @@
use std::collections::HashMap;
use harmony::{ use harmony::{
inventory::Inventory, inventory::Inventory,
maestro::Maestro, maestro::Maestro,
@ -41,9 +43,30 @@ async fn main() {
], ],
); );
let service_monitor_endpoint = ServiceMonitorEndpoint {
port: Some("80".to_string()),
path: "/metrics".to_string(),
scheme: HTTPScheme::HTTP,
..Default::default()
};
let service_monitor = ServiceMonitor {
name: "test-service-monitor".to_string(),
selector: Selector {
match_labels: HashMap::new(),
match_expressions: vec![MatchExpression {
key: "test".to_string(),
operator: Operator::In,
values: vec!["test-service".to_string()],
}],
},
endpoints: vec![service_monitor_endpoint],
..Default::default()
};
let alerting_score = HelmPrometheusAlertingScore { let alerting_score = HelmPrometheusAlertingScore {
receivers: vec![Box::new(discord_receiver)], receivers: vec![Box::new(discord_receiver)],
rules: vec![Box::new(additional_rules), Box::new(additional_rules2)], rules: vec![Box::new(additional_rules), Box::new(additional_rules2)],
service_monitors: vec![service_monitor],
}; };
let mut maestro = Maestro::<K8sAnywhereTopology>::initialize( let mut maestro = Maestro::<K8sAnywhereTopology>::initialize(
Inventory::autoload(), Inventory::autoload(),

12
examples/ntfy/Cargo.toml Normal file
View File

@ -0,0 +1,12 @@
[package]
name = "example-ntfy"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true
url.workspace = true

19
examples/ntfy/src/main.rs Normal file
View File

@ -0,0 +1,19 @@
use harmony::{
inventory::Inventory, maestro::Maestro, modules::monitoring::ntfy::ntfy::NtfyScore,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let mut maestro = Maestro::<K8sAnywhereTopology>::initialize(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
)
.await
.unwrap();
maestro.register_all(vec![Box::new(NtfyScore {
namespace: "monitoring".to_string(),
})]);
harmony_cli::init(maestro, None).await.unwrap();
}

14
examples/rust/Cargo.toml Normal file
View File

@ -0,0 +1,14 @@
[package]
name = "example-rust"
version = "0.1.0"
edition = "2024"
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
tokio = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

20
examples/rust/src/main.rs Normal file
View File

@ -0,0 +1,20 @@
use harmony::{
inventory::Inventory,
maestro::Maestro,
modules::application::{RustWebappScore, features::ContinuousDelivery},
topology::{K8sAnywhereTopology, Url},
};
#[tokio::main]
async fn main() {
let app = RustWebappScore {
name: "Example Rust Webapp".to_string(),
domain: Url::Url(url::Url::parse("https://rustapp.harmony.example.com").unwrap()),
features: vec![Box::new(ContinuousDelivery {})],
};
let topology = K8sAnywhereTopology::from_env();
let mut maestro = Maestro::new(Inventory::autoload(), topology);
maestro.register_all(vec![Box::new(app)]);
harmony_cli::init(maestro, None).await.unwrap();
}

View File

@ -10,9 +10,9 @@ publish = false
harmony = { path = "../../harmony" } harmony = { path = "../../harmony" }
harmony_tui = { path = "../../harmony_tui" } harmony_tui = { path = "../../harmony_tui" }
harmony_types = { path = "../../harmony_types" } harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
cidr = { workspace = true } cidr = { workspace = true }
tokio = { workspace = true } tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true } log = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
url = { workspace = true } url = { workspace = true }

View File

@ -54,3 +54,6 @@ fqdn = { version = "0.4.6", features = [
temp-dir = "0.1.14" temp-dir = "0.1.14"
dyn-clone = "1.0.19" dyn-clone = "1.0.19"
similar.workspace = true similar.workspace = true
futures-util = "0.3.31"
tokio-util = "0.7.15"
strum = { version = "0.27.1", features = ["derive"] }

View File

@ -21,6 +21,7 @@ pub enum InterpretName {
OPNSense, OPNSense,
K3dInstallation, K3dInstallation,
TenantInterpret, TenantInterpret,
Application,
} }
impl std::fmt::Display for InterpretName { impl std::fmt::Display for InterpretName {
@ -37,6 +38,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::OPNSense => f.write_str("OPNSense"), InterpretName::OPNSense => f.write_str("OPNSense"),
InterpretName::K3dInstallation => f.write_str("K3dInstallation"), InterpretName::K3dInstallation => f.write_str("K3dInstallation"),
InterpretName::TenantInterpret => f.write_str("Tenant"), InterpretName::TenantInterpret => f.write_str("Tenant"),
InterpretName::Application => f.write_str("Application"),
} }
} }
} }
@ -124,3 +126,11 @@ impl From<kube::Error> for InterpretError {
} }
} }
} }
impl From<String> for InterpretError {
fn from(value: String) -> Self {
Self {
msg: format!("InterpretError : {value}"),
}
}
}

View File

@ -34,6 +34,17 @@ pub struct Inventory {
} }
impl Inventory { impl Inventory {
pub fn empty() -> Self {
Self {
location: Location::new("Empty".to_string(), "location".to_string()),
switch: vec![],
firewall: vec![],
worker_host: vec![],
storage_host: vec![],
control_plane_host: vec![],
}
}
pub fn autoload() -> Self { pub fn autoload() -> Self {
Self { Self {
location: Location::test_building(), location: Location::test_building(),

View File

@ -46,7 +46,7 @@ pub struct HAClusterTopology {
#[async_trait] #[async_trait]
impl Topology for HAClusterTopology { impl Topology for HAClusterTopology {
fn name(&self) -> &str { fn name(&self) -> &str {
todo!() "HAClusterTopology"
} }
async fn ensure_ready(&self) -> Result<Outcome, InterpretError> { async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
todo!( todo!(

View File

@ -1,14 +1,21 @@
use derive_new::new; use derive_new::new;
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; use futures_util::StreamExt;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
};
use kube::runtime::conditions;
use kube::runtime::wait::await_condition;
use kube::{ use kube::{
Api, Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{Patch, PatchParams}, api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
runtime::reflector::Lookup,
}; };
use log::{debug, error, trace}; use log::{debug, error, trace};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use similar::TextDiff; use similar::{DiffableStr, TextDiff};
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct K8sClient { pub struct K8sClient {
@ -33,6 +40,88 @@ impl K8sClient {
}) })
} }
pub async fn wait_until_deployment_ready(
&self,
name: String,
namespace: Option<&str>,
timeout: Option<u64>,
) -> Result<(), String> {
let api: Api<Deployment>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
let t = if let Some(t) = timeout { t } else { 300 };
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
if let Ok(r) = res {
return Ok(());
} else {
return Err("timed out while waiting for deployment".to_string());
}
}
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
pub async fn exec_app(
&self,
name: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<(), String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("app.kubernetes.io/name={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default(),
)
.await;
match res {
Err(e) => return Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
debug!("Status: {}", s);
if s == "Success" {
return Ok(());
} else {
return Err(s);
}
} else {
return Err("Couldn't get inner status of pod exec".to_string());
}
}
}
}
/// Apply a resource in namespace /// Apply a resource in namespace
/// ///
/// See `kubectl apply` for more information on the expected behavior of this function /// See `kubectl apply` for more information on the expected behavior of this function

View File

@ -1,7 +1,14 @@
use async_trait::async_trait; use async_trait::async_trait;
use log::info; use log::info;
use serde_json::Value;
use crate::{modules::application::ApplicationFeature, topology::Topology}; use crate::{
data::Version,
inventory::Inventory,
modules::{application::ApplicationFeature, helm::chart::HelmChartScore},
score::Score,
topology::{HelmCommand, Topology, Url},
};
/// ContinuousDelivery in Harmony provides this functionality : /// ContinuousDelivery in Harmony provides this functionality :
/// ///
@ -30,13 +37,48 @@ use crate::{modules::application::ApplicationFeature, topology::Topology};
/// - Harbor as artifact registru /// - Harbor as artifact registru
/// - ArgoCD to install/upgrade/rollback/inspect k8s resources /// - ArgoCD to install/upgrade/rollback/inspect k8s resources
/// - Kubernetes for runtime orchestration /// - Kubernetes for runtime orchestration
#[derive(Debug, Default)] #[derive(Debug, Default, Clone)]
pub struct ContinuousDelivery {} pub struct ContinuousDelivery {}
#[async_trait] #[async_trait]
impl<T: Topology + 'static> ApplicationFeature<T> for ContinuousDelivery { impl<T: Topology + HelmCommand + 'static> ApplicationFeature<T> for ContinuousDelivery {
async fn ensure_installed(&self, _topology: &T) -> Result<(), String> { async fn ensure_installed(&self, topology: &T) -> Result<(), String> {
info!("Installing ContinuousDelivery feature"); info!("Installing ContinuousDelivery feature");
todo!() let cd_server = HelmChartScore {
namespace: todo!(
"ArgoCD Helm chart with proper understanding of Tenant, see how Will did it for Monitoring for now"
),
release_name: todo!("argocd helm chart whatever"),
chart_name: todo!(),
chart_version: todo!(),
values_overrides: todo!(),
values_yaml: todo!(),
create_namespace: todo!(),
install_only: todo!(),
repository: todo!(),
};
let interpret = cd_server.create_interpret();
interpret.execute(&Inventory::empty(), topology);
todo!("1. Create ArgoCD score that installs argo using helm chart, see if Taha's already done it
2. Package app (docker image, helm chart)
3. Push to registry if staging or prod
4. Poke Argo
5. Ensure app is up")
}
fn name(&self) -> String {
"ContinuousDelivery".to_string()
} }
} }
/// For now this is entirely bound to K8s / ArgoCD, will have to be revisited when we support
/// more CD systems
pub struct CDApplicationConfig {
version: Version,
helm_chart_url: Url,
values_overrides: Value,
}
pub trait ContinuousDeliveryApplication {
fn get_config(&self) -> CDApplicationConfig;
}

View File

@ -6,7 +6,7 @@ use crate::{
topology::{K8sclient, Topology}, topology::{K8sclient, Topology},
}; };
#[derive(Debug)] #[derive(Debug, Clone)]
pub struct PublicEndpoint { pub struct PublicEndpoint {
application_port: u16, application_port: u16,
} }
@ -36,4 +36,7 @@ impl<T: Topology + K8sclient + 'static> ApplicationFeature<T> for PublicEndpoint
); );
todo!() todo!()
} }
fn name(&self) -> String {
"PublicEndpoint".to_string()
}
} }

View File

@ -6,7 +6,7 @@ use crate::{
topology::{HelmCommand, Topology}, topology::{HelmCommand, Topology},
}; };
#[derive(Debug, Default)] #[derive(Debug, Default, Clone)]
pub struct Monitoring {} pub struct Monitoring {}
#[async_trait] #[async_trait]
@ -15,4 +15,7 @@ impl<T: Topology + HelmCommand + 'static> ApplicationFeature<T> for Monitoring {
info!("Ensuring monitoring is available for application"); info!("Ensuring monitoring is available for application");
todo!("create and execute k8s prometheus score, depends on Will's work") todo!("create and execute k8s prometheus score, depends on Will's work")
} }
fn name(&self) -> String {
"Monitoring".to_string()
}
} }

View File

@ -1,9 +1,11 @@
mod feature;
pub mod features; pub mod features;
mod rust; mod rust;
pub use feature::*;
use log::info;
pub use rust::*; pub use rust::*;
use async_trait::async_trait; use async_trait::async_trait;
use serde::Serialize;
use crate::{ use crate::{
data::{Id, Version}, data::{Id, Version},
@ -12,9 +14,14 @@ use crate::{
topology::Topology, topology::Topology,
}; };
pub trait Application: std::fmt::Debug + Send + Sync {
fn name(&self) -> String;
}
#[derive(Debug)] #[derive(Debug)]
pub struct ApplicationInterpret<T: Topology + std::fmt::Debug> { pub struct ApplicationInterpret<T: Topology + std::fmt::Debug> {
features: Vec<Box<dyn ApplicationFeature<T>>>, features: Vec<Box<dyn ApplicationFeature<T>>>,
application: Box<dyn Application>,
} }
#[async_trait] #[async_trait]
@ -22,17 +29,43 @@ impl<T: Topology + std::fmt::Debug> Interpret<T> for ApplicationInterpret<T> {
async fn execute( async fn execute(
&self, &self,
_inventory: &Inventory, _inventory: &Inventory,
_topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
todo!() let app_name = self.application.name();
info!(
"Preparing {} features [{}] for application {app_name}",
self.features.len(),
self.features
.iter()
.map(|f| f.name())
.collect::<Vec<String>>()
.join(", ")
);
for feature in self.features.iter() {
info!(
"Installing feature {} for application {app_name}",
feature.name()
);
let _ = match feature.ensure_installed(topology).await {
Ok(()) => (),
Err(msg) => {
return Err(InterpretError::new(format!(
"Application Interpret failed to install feature : {msg}"
)));
}
};
}
todo!(
"Do I need to do anything more than this here?? I feel like the Application trait itself should expose something like ensure_ready but its becoming redundant. We'll see as this evolves."
)
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
todo!() InterpretName::Application
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
todo!() Version::from("1.0.0").unwrap()
} }
fn get_status(&self) -> InterpretStatus { fn get_status(&self) -> InterpretStatus {
@ -43,25 +76,3 @@ impl<T: Topology + std::fmt::Debug> Interpret<T> for ApplicationInterpret<T> {
todo!() todo!()
} }
} }
/// An ApplicationFeature provided by harmony, such as Backups, Monitoring, MultisiteAvailability,
/// ContinuousIntegration, ContinuousDelivery
#[async_trait]
pub trait ApplicationFeature<T: Topology>: std::fmt::Debug + Send + Sync {
async fn ensure_installed(&self, topology: &T) -> Result<(), String>;
}
impl<T: Topology> Serialize for Box<dyn ApplicationFeature<T>> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}
impl<T: Topology> Clone for Box<dyn ApplicationFeature<T>> {
fn clone(&self) -> Self {
todo!()
}
}

View File

@ -5,7 +5,7 @@ use crate::{
topology::{Topology, Url}, topology::{Topology, Url},
}; };
use super::{ApplicationFeature, ApplicationInterpret}; use super::{Application, ApplicationFeature, ApplicationInterpret};
#[derive(Debug, Serialize, Clone)] #[derive(Debug, Serialize, Clone)]
pub struct RustWebappScore<T: Topology + Clone + Serialize> { pub struct RustWebappScore<T: Topology + Clone + Serialize> {
@ -16,10 +16,26 @@ pub struct RustWebappScore<T: Topology + Clone + Serialize> {
impl<T: Topology + std::fmt::Debug + Clone + Serialize + 'static> Score<T> for RustWebappScore<T> { impl<T: Topology + std::fmt::Debug + Clone + Serialize + 'static> Score<T> for RustWebappScore<T> {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> { fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
Box::new(ApplicationInterpret { features: todo!() }) Box::new(ApplicationInterpret {
features: self.features.clone(),
application: Box::new(RustWebapp {
name: self.name.clone(),
}),
})
} }
fn name(&self) -> String { fn name(&self) -> String {
format!("{}-RustWebapp", self.name) format!("{}-RustWebapp", self.name)
} }
} }
#[derive(Debug)]
struct RustWebapp {
name: String,
}
impl Application for RustWebapp {
fn name(&self) -> String {
self.name.clone()
}
}

View File

@ -1,7 +1,7 @@
use serde::Serialize; use serde::Serialize;
use crate::modules::monitoring::kube_prometheus::types::{ use crate::modules::monitoring::kube_prometheus::types::{
AlertManagerAdditionalPromRules, AlertManagerChannelConfig, AlertManagerAdditionalPromRules, AlertManagerChannelConfig, ServiceMonitor,
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@ -25,6 +25,7 @@ pub struct KubePrometheusConfig {
pub prometheus_operator: bool, pub prometheus_operator: bool,
pub alert_receiver_configs: Vec<AlertManagerChannelConfig>, pub alert_receiver_configs: Vec<AlertManagerChannelConfig>,
pub alert_rules: Vec<AlertManagerAdditionalPromRules>, pub alert_rules: Vec<AlertManagerAdditionalPromRules>,
pub additional_service_monitors: Vec<ServiceMonitor>,
} }
impl KubePrometheusConfig { impl KubePrometheusConfig {
pub fn new() -> Self { pub fn new() -> Self {
@ -48,6 +49,7 @@ impl KubePrometheusConfig {
kube_scheduler: false, kube_scheduler: false,
alert_receiver_configs: vec![], alert_receiver_configs: vec![],
alert_rules: vec![], alert_rules: vec![],
additional_service_monitors: vec![],
} }
} }
} }

View File

@ -11,9 +11,7 @@ use std::{
use crate::modules::{ use crate::modules::{
helm::chart::HelmChartScore, helm::chart::HelmChartScore,
monitoring::kube_prometheus::types::{ monitoring::kube_prometheus::types::{
AlertGroup, AlertManager, AlertManagerAdditionalPromRules, AlertManagerConfig, AlertGroup, AlertManager, AlertManagerAdditionalPromRules, AlertManagerConfig, AlertManagerRoute, AlertManagerSpec, AlertManagerValues, ConfigReloader, Limits, PrometheusConfig, Requests, Resources
AlertManagerRoute, AlertManagerSpec, AlertManagerValues, ConfigReloader, Limits, Requests,
Resources,
}, },
}; };
@ -278,6 +276,22 @@ prometheusOperator:
"#, "#,
); );
let prometheus_config =
crate::modules::monitoring::kube_prometheus::types::PrometheusConfigValues {
prometheus: PrometheusConfig {
prometheus: bool::from_str(prometheus.as_str()).expect("couldn't parse bool"),
additional_service_monitors: config.additional_service_monitors.clone(),
},
};
let prometheus_config_yaml =
serde_yaml::to_string(&prometheus_config).expect("Failed to serialize YAML");
debug!(
"serialized prometheus config: \n {:#}",
prometheus_config_yaml
);
values.push_str(&prometheus_config_yaml);
// add required null receiver for prometheus alert manager // add required null receiver for prometheus alert manager
let mut null_receiver = Mapping::new(); let mut null_receiver = Mapping::new();
null_receiver.insert( null_receiver.insert(

View File

@ -4,6 +4,7 @@ use serde::Serialize;
use super::{helm::config::KubePrometheusConfig, prometheus::Prometheus}; use super::{helm::config::KubePrometheusConfig, prometheus::Prometheus};
use crate::{ use crate::{
modules::monitoring::kube_prometheus::types::ServiceMonitor,
score::Score, score::Score,
topology::{ topology::{
HelmCommand, Topology, HelmCommand, Topology,
@ -16,10 +17,16 @@ use crate::{
pub struct HelmPrometheusAlertingScore { pub struct HelmPrometheusAlertingScore {
pub receivers: Vec<Box<dyn AlertReceiver<Prometheus>>>, pub receivers: Vec<Box<dyn AlertReceiver<Prometheus>>>,
pub rules: Vec<Box<dyn AlertRule<Prometheus>>>, pub rules: Vec<Box<dyn AlertRule<Prometheus>>>,
pub service_monitors: Vec<ServiceMonitor>,
} }
impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlertingScore { impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlertingScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> { fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
let config = Arc::new(Mutex::new(KubePrometheusConfig::new()));
config
.try_lock()
.expect("couldn't lock config")
.additional_service_monitors = self.service_monitors.clone();
Box::new(AlertingInterpret { Box::new(AlertingInterpret {
sender: Prometheus::new(), sender: Prometheus::new(),
receivers: self.receivers.clone(), receivers: self.receivers.clone(),

View File

@ -1,4 +1,4 @@
use std::collections::BTreeMap; use std::collections::{BTreeMap, HashMap};
use async_trait::async_trait; use async_trait::async_trait;
use serde::Serialize; use serde::Serialize;
@ -85,3 +85,202 @@ pub struct AlertManagerAdditionalPromRules {
pub struct AlertGroup { pub struct AlertGroup {
pub groups: Vec<AlertManagerRuleGroup>, pub groups: Vec<AlertManagerRuleGroup>,
} }
#[derive(Debug, Clone, Serialize)]
pub enum HTTPScheme {
#[serde(rename = "http")]
HTTP,
#[serde(rename = "https")]
HTTPS,
}
#[derive(Debug, Clone, Serialize)]
pub enum Operator {
In,
NotIn,
Exists,
DoesNotExist,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusConfigValues {
pub prometheus: PrometheusConfig,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusConfig {
pub prometheus: bool,
pub additional_service_monitors: Vec<ServiceMonitor>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ServiceMonitorTLSConfig {
// ## Path to the CA file
// ##
pub ca_file: Option<String>,
// ## Path to client certificate file
// ##
pub cert_file: Option<String>,
// ## Skip certificate verification
// ##
pub insecure_skip_verify: Option<bool>,
// ## Path to client key file
// ##
pub key_file: Option<String>,
// ## Server name used to verify host name
// ##
pub server_name: Option<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ServiceMonitorEndpoint {
// ## Name of the endpoint's service port
// ## Mutually exclusive with targetPort
pub port: Option<String>,
// ## Name or number of the endpoint's target port
// ## Mutually exclusive with port
pub target_port: Option<String>,
// ## File containing bearer token to be used when scraping targets
// ##
pub bearer_token_file: Option<String>,
// ## Interval at which metrics should be scraped
// ##
pub interval: Option<String>,
// ## HTTP path to scrape for metrics
// ##
pub path: String,
// ## HTTP scheme to use for scraping
// ##
pub scheme: HTTPScheme,
// ## TLS configuration to use when scraping the endpoint
// ##
pub tls_config: Option<ServiceMonitorTLSConfig>,
// ## MetricRelabelConfigs to apply to samples after scraping, but before ingestion.
// ## ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/api-reference/api.md#relabelconfig
// ##
// # - action: keep
// # regex: 'kube_(daemonset|deployment|pod|namespace|node|statefulset).+'
// # sourceLabels: [__name__]
pub metric_relabelings: Vec<Mapping>,
// ## RelabelConfigs to apply to samples before scraping
// ## ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/api-reference/api.md#relabelconfig
// ##
// # - sourceLabels: [__meta_kubernetes_pod_node_name]
// # separator: ;
// # regex: ^(.*)$
// # targetLabel: nodename
// # replacement: $1
// # action: replace
pub relabelings: Vec<Mapping>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct MatchExpression {
pub key: String,
pub operator: Operator,
pub values: Vec<String>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct Selector {
// # label selector for services
pub match_labels: HashMap<String, String>,
pub match_expressions: Vec<MatchExpression>,
}
#[derive(Debug, Clone, Serialize)]
#[serde(rename_all = "camelCase")]
pub struct ServiceMonitor {
pub name: String,
// # Additional labels to set used for the ServiceMonitorSelector. Together with standard labels from the chart
pub additional_labels: Option<Mapping>,
// # Service label for use in assembling a job name of the form <label value>-<port>
// # If no label is specified, the service name is used.
pub job_label: Option<String>,
// # labels to transfer from the kubernetes service to the target
pub target_labels: Vec<String>,
// # labels to transfer from the kubernetes pods to the target
pub pod_target_labels: Vec<String>,
// # Label selector for services to which this ServiceMonitor applies
// # Example which selects all services to be monitored
// # with label "monitoredby" with values any of "example-service-1" or "example-service-2"
// matchExpressions:
// - key: "monitoredby"
// operator: In
// values:
// - example-service-1
// - example-service-2
pub selector: Selector,
// # Namespaces from which services are selected
// # Match any namespace
// any: bool,
// # Explicit list of namespace names to select
// matchNames: Vec,
pub namespace_selector: Option<Mapping>,
// # Endpoints of the selected service to be monitored
pub endpoints: Vec<ServiceMonitorEndpoint>,
// # Fallback scrape protocol used by Prometheus for scraping metrics
// # ref: https://github.com/prometheus-operator/prometheus-operator/blob/main/Documentation/api-reference/api.md#monitoring.coreos.com/v1.ScrapeProtocol
pub fallback_scrape_protocol: Option<String>,
}
impl Default for ServiceMonitor {
fn default() -> Self {
Self {
name: Default::default(),
additional_labels: Default::default(),
job_label: Default::default(),
target_labels: Default::default(),
pod_target_labels: Default::default(),
selector: Selector {
match_labels: HashMap::new(),
match_expressions: vec![],
},
namespace_selector: Default::default(),
endpoints: Default::default(),
fallback_scrape_protocol: Default::default(),
}
}
}
impl Default for ServiceMonitorEndpoint {
fn default() -> Self {
Self {
port: Some("80".to_string()),
target_port: Default::default(),
bearer_token_file: Default::default(),
interval: Default::default(),
path: "/metrics".to_string(),
scheme: HTTPScheme::HTTP,
tls_config: Default::default(),
metric_relabelings: Default::default(),
relabelings: Default::default(),
}
}
}

View File

@ -1,3 +1,4 @@
pub mod alert_channel; pub mod alert_channel;
pub mod alert_rule; pub mod alert_rule;
pub mod kube_prometheus; pub mod kube_prometheus;
pub mod ntfy;

View File

@ -0,0 +1 @@
pub mod ntfy_helm_chart;

View File

@ -0,0 +1,83 @@
use non_blank_string_rs::NonBlankString;
use std::str::FromStr;
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
pub fn ntfy_helm_chart_score(namespace: String) -> HelmChartScore {
let values = format!(
r#"
replicaCount: 1
image:
repository: binwiederhier/ntfy
pullPolicy: IfNotPresent
# Overrides the image tag whose default is the chart appVersion.
tag: "v2.12.0"
serviceAccount:
# Specifies whether a service account should be created
create: true
# Annotations to add to the service account
# annotations:
# The name of the service account to use.
# If not set and create is true, a name is generated using the fullname template
# name: ""
service:
type: ClusterIP
port: 80
ingress:
enabled: false
# annotations:
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: ntfy.host.com
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
autoscaling:
enabled: false
config:
enabled: true
data:
# base-url: "https://ntfy.something.com"
auth-file: "/var/cache/ntfy/user.db"
auth-default-access: "deny-all"
cache-file: "/var/cache/ntfy/cache.db"
attachment-cache-dir: "/var/cache/ntfy/attachments"
behind-proxy: true
# web-root: "disable"
enable-signup: false
enable-login: "true"
persistence:
enabled: true
size: 200Mi
"#,
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str("ntfy").unwrap(),
chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(),
chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()),
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"sarab97".to_string(),
url::Url::parse("https://charts.sarabsingh.com").unwrap(),
true,
)),
}
}

View File

@ -0,0 +1,2 @@
pub mod helm;
pub mod ntfy;

View File

@ -0,0 +1,169 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use serde::Serialize;
use strum::{Display, EnumString};
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
score::Score,
topology::{HelmCommand, K8sclient, Topology, k8s::K8sClient},
};
#[derive(Debug, Clone, Serialize)]
pub struct NtfyScore {
pub namespace: String,
}
impl<T: Topology + HelmCommand + K8sclient> Score<T> for NtfyScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NtfyInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("Ntfy")
}
}
#[derive(Debug, Serialize)]
pub struct NtfyInterpret {
pub score: NtfyScore,
}
#[derive(Debug, EnumString, Display)]
enum NtfyAccessMode {
#[strum(serialize = "read-write", serialize = "rw", to_string = "read-write")]
ReadWrite,
#[strum(
serialize = "read-only",
serialize = "ro",
serialize = "read",
to_string = "read-only"
)]
ReadOnly,
#[strum(
serialize = "write-only",
serialize = "wo",
serialize = "write",
to_string = "write-only"
)]
WriteOnly,
#[strum(serialize = "none", to_string = "deny")]
Deny,
}
#[derive(Debug, EnumString, Display)]
enum NtfyRole {
#[strum(serialize = "user", to_string = "user")]
User,
#[strum(serialize = "admin", to_string = "admin")]
Admin,
}
impl NtfyInterpret {
async fn add_user(
&self,
k8s_client: Arc<K8sClient>,
username: &str,
password: &str,
role: Option<NtfyRole>,
) -> Result<(), String> {
let role = match role {
Some(r) => r,
None => NtfyRole::User,
};
k8s_client
.exec_app(
"ntfy".to_string(),
Some(&self.score.namespace),
vec![
"sh",
"-c",
format!("NTFY_PASSWORD={password} ntfy user add --role={role} {username}")
.as_str(),
],
)
.await?;
Ok(())
}
async fn set_access(
&self,
k8s_client: Arc<K8sClient>,
username: &str,
topic: &str,
mode: NtfyAccessMode,
) -> Result<(), String> {
k8s_client
.exec_app(
"ntfy".to_string(),
Some(&self.score.namespace),
vec![
"sh",
"-c",
format!("ntfy access {username} {topic} {mode}").as_str(),
],
)
.await?;
Ok(())
}
}
/// We need a ntfy interpret to wrap the HelmChartScore in order to run the score, and then bootstrap the config inside ntfy
#[async_trait]
impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for NtfyInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
ntfy_helm_chart_score(self.score.namespace.clone())
.create_interpret()
.execute(inventory, topology)
.await?;
debug!("installed ntfy helm chart");
let client = topology
.k8s_client()
.await
.expect("couldn't get k8s client");
client
.wait_until_deployment_ready(
"ntfy".to_string(),
Some(&self.score.namespace.as_str()),
None,
)
.await?;
debug!("created k8s client");
self.add_user(client, "harmony", "harmony", Some(NtfyRole::Admin))
.await?;
debug!("exec into pod done");
Ok(Outcome::success("installed ntfy".to_string()))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}