From 1adc2db5d9b80f2e1cd4d52fd1140b9f267f5a4b Mon Sep 17 00:00:00 2001 From: tahahawa Date: Mon, 30 Jun 2025 11:57:06 -0400 Subject: [PATCH] it works! --- Cargo.lock | 110 ++++++++++-------- Cargo.toml | 4 +- examples/kube-rs/Cargo.toml | 2 +- examples/ntfy/src/main.rs | 32 +---- harmony/Cargo.toml | 1 + harmony/src/domain/interpret/mod.rs | 8 ++ harmony/src/domain/topology/k8s.rs | 59 ++++++---- .../monitoring/kube_prometheus/helm/config.rs | 4 +- .../modules/monitoring/ntfy/helm/config.rs | 6 - .../src/modules/monitoring/ntfy/helm/mod.rs | 1 - .../monitoring/ntfy/helm/ntfy_helm_chart.rs | 16 +-- harmony/src/modules/monitoring/ntfy/ntfy.rs | 84 ++++++++++--- 12 files changed, 190 insertions(+), 137 deletions(-) delete mode 100644 harmony/src/modules/monitoring/ntfy/helm/config.rs diff --git a/Cargo.lock b/Cargo.lock index db8298b..a4671df 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -232,14 +232,14 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] -name = "backoff" -version = "0.4.0" +name = "backon" +version = "1.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b62ddb9cb1ec0a098ad4bbf9344d0713fa193ae1a80af55febcff2627b6a00c1" +checksum = "302eaff5357a264a2c42f127ecb8bac761cf99749fc3dc95677e2743991f99e7" dependencies = [ - "getrandom 0.2.16", - "instant", - "rand 0.8.5", + "fastrand", + "gloo-timers", + "tokio", ] [[package]] @@ -930,6 +930,26 @@ dependencies = [ "syn", ] +[[package]] +name = "derive_more" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "093242cf7570c207c83073cf82f79706fe7b8317e98620a47d5be7c3d8497678" +dependencies = [ + "derive_more-impl", +] + +[[package]] +name = "derive_more-impl" +version = "2.0.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda628edc44c4bb645fbe0f758797143e4e07926f7ebf4e9bdfbd3d2ce621df3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + [[package]] name = "des" version = "0.8.1" @@ -1246,7 +1266,7 @@ dependencies = [ "harmony_macros", "http 1.3.1", "inquire", - "k8s-openapi 0.25.0", + "k8s-openapi", "kube", "log", "serde_yaml", @@ -1614,6 +1634,18 @@ version = "0.31.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" +[[package]] +name = "gloo-timers" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbb143cf96099802033e0d4f4963b19fd2e0b728bcf076cd9cf7f6634f092994" +dependencies = [ + "futures-channel", + "futures-core", + "js-sys", + "wasm-bindgen", +] + [[package]] name = "group" version = "0.13.0" @@ -1686,7 +1718,7 @@ dependencies = [ "http 1.3.1", "inquire", "k3d-rs", - "k8s-openapi 0.24.0", + "k8s-openapi", "kube", "lazy_static", "libredfish", @@ -1707,6 +1739,7 @@ dependencies = [ "temp-dir", "temp-file", "tokio", + "tokio-util", "url", "uuid", ] @@ -2376,15 +2409,6 @@ dependencies = [ "syn", ] -[[package]] -name = "instant" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e0242819d153cba4b4b05a5a8f2a7e9bbf97b6055b2a002b395c96b5ff3c0222" -dependencies = [ - "cfg-if", -] - [[package]] name = "ipnet" version = "2.11.0" @@ -2458,9 +2482,9 @@ dependencies = [ [[package]] name = "json-patch" -version = "3.0.1" +version = "4.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "863726d7afb6bc2590eeff7135d923545e5e964f004c2ccf8716c25e70a86f08" +checksum = "159294d661a039f7644cea7e4d844e6b25aaf71c1ffe9d73a96d768c24b0faf4" dependencies = [ "jsonptr", "serde", @@ -2483,9 +2507,9 @@ dependencies = [ [[package]] name = "jsonptr" -version = "0.6.3" +version = "0.7.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5dea2b27dd239b2556ed7a25ba842fe47fd602e7fc7433c2a8d6106d4d9edd70" +checksum = "a5a3cc660ba5d72bce0b3bb295bf20847ccbb40fd423f3f05b61273672e561fe" dependencies = [ "serde", "serde_json", @@ -2525,19 +2549,6 @@ dependencies = [ "url", ] -[[package]] -name = "k8s-openapi" -version = "0.24.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2c75b990324f09bef15e791606b7b7a296d02fc88a344f6eba9390970a870ad5" -dependencies = [ - "base64 0.22.1", - "chrono", - "serde", - "serde-value", - "serde_json", -] - [[package]] name = "k8s-openapi" version = "0.25.0" @@ -2552,11 +2563,11 @@ dependencies = [ [[package]] name = "kube" -version = "0.98.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "32053dc495efad4d188c7b33cc7c02ef4a6e43038115348348876efd39a53cba" +checksum = "778f98664beaf4c3c11372721e14310d1ae00f5e2d9aabcf8906c881aa4e9f51" dependencies = [ - "k8s-openapi 0.24.0", + "k8s-openapi", "kube-client", "kube-core", "kube-runtime", @@ -2564,9 +2575,9 @@ dependencies = [ [[package]] name = "kube-client" -version = "0.98.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d34ad38cdfbd1fa87195d42569f57bb1dda6ba5f260ee32fef9570b7937a0c9" +checksum = "7cb276b85b6e94ded00ac8ea2c68fcf4697ea0553cb25fddc35d4a0ab718db8d" dependencies = [ "base64 0.22.1", "bytes", @@ -2583,12 +2594,10 @@ dependencies = [ "hyper-timeout", "hyper-util", "jsonpath-rust", - "k8s-openapi 0.24.0", + "k8s-openapi", "kube-core", "pem", - "rand 0.8.5", "rustls", - "rustls-pemfile 2.2.0", "secrecy", "serde", "serde_json", @@ -2604,15 +2613,16 @@ dependencies = [ [[package]] name = "kube-core" -version = "0.98.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97aa830b288a178a90e784d1b0f1539f2d200d2188c7b4a3146d9dc983d596f3" +checksum = "e3c56ff45deb0031f2a476017eed60c06872251f271b8387ad8020b8fef60960" dependencies = [ "chrono", + "derive_more", "form_urlencoded", "http 1.3.1", "json-patch", - "k8s-openapi 0.24.0", + "k8s-openapi", "serde", "serde-value", "serde_json", @@ -2621,22 +2631,20 @@ dependencies = [ [[package]] name = "kube-runtime" -version = "0.98.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a41af186a0fe80c71a13a13994abdc3ebff80859ca6a4b8a6079948328c135b" +checksum = "2f1326e946fadf6248febdf8a1c001809c3899ccf48cb9768cbc536b741040dc" dependencies = [ "ahash", "async-broadcast", "async-stream", - "async-trait", - "backoff", + "backon", "educe", "futures", "hashbrown 0.15.4", "hostname", "json-patch", - "jsonptr", - "k8s-openapi 0.24.0", + "k8s-openapi", "kube-client", "parking_lot", "pin-project", diff --git a/Cargo.toml b/Cargo.toml index da05b64..92bfc4a 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -35,7 +35,7 @@ russh = "0.45" russh-keys = "0.45" rand = "0.8" url = "2.5" -kube = { version = "0.98", features = [ +kube = { version = "1.1.0", features = [ "config", "client", "runtime", @@ -43,7 +43,7 @@ kube = { version = "0.98", features = [ "ws", "jsonpatch", ] } -k8s-openapi = { version = "0.24", features = ["v1_30"] } +k8s-openapi = { version = "0.25", features = ["v1_30"] } serde_yaml = "0.9" serde-value = "0.7" http = "1.2" diff --git a/examples/kube-rs/Cargo.toml b/examples/kube-rs/Cargo.toml index e4cf5b7..7d81470 100644 --- a/examples/kube-rs/Cargo.toml +++ b/examples/kube-rs/Cargo.toml @@ -14,7 +14,7 @@ harmony_macros = { path = "../../harmony_macros" } log = { workspace = true } env_logger = { workspace = true } url = { workspace = true } -kube = "0.98.0" +kube = "1.1.0" k8s-openapi = { version = "0.25.0", features = ["v1_30"] } http = "1.2.0" serde_yaml = "0.9.34" diff --git a/examples/ntfy/src/main.rs b/examples/ntfy/src/main.rs index 86c1012..f359a61 100644 --- a/examples/ntfy/src/main.rs +++ b/examples/ntfy/src/main.rs @@ -1,36 +1,10 @@ -use std::{collections::HashMap, str::FromStr}; - use harmony::{ - inventory::Inventory, - maestro::Maestro, - modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString}, + inventory::Inventory, maestro::Maestro, modules::monitoring::ntfy::ntfy::NtfyScore, topology::K8sAnywhereTopology, }; #[tokio::main] async fn main() { - let mut ntfy_overrides: HashMap = HashMap::new(); - ntfy_overrides.insert( - NonBlankString::from_str("image.tag").unwrap(), - "v2.12.0".to_string(), - ); - - let ntfy_chart = HelmChartScore { - namespace: Some(NonBlankString::from_str("monitoring").unwrap()), - release_name: NonBlankString::from_str("ntfy").unwrap(), - chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(), - chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()), - values_overrides: Some(ntfy_overrides), - values_yaml: None, - create_namespace: true, - install_only: false, - repository: Some(HelmRepository::new( - "sarab97".to_string(), - url::Url::parse("https://charts.sarabsingh.com").unwrap(), - true, - )), - }; - let mut maestro = Maestro::::initialize( Inventory::autoload(), K8sAnywhereTopology::from_env(), @@ -38,6 +12,8 @@ async fn main() { .await .unwrap(); - maestro.register_all(vec![Box::new(ntfy_chart)]); + maestro.register_all(vec![Box::new(NtfyScore { + namespace: "monitoring".to_string(), + })]); harmony_cli::init(maestro, None).await.unwrap(); } diff --git a/harmony/Cargo.toml b/harmony/Cargo.toml index f56c120..00582ef 100644 --- a/harmony/Cargo.toml +++ b/harmony/Cargo.toml @@ -55,3 +55,4 @@ temp-dir = "0.1.14" dyn-clone = "1.0.19" similar.workspace = true futures-util = "0.3.31" +tokio-util = "0.7.15" diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs index 5e928cb..f5fc151 100644 --- a/harmony/src/domain/interpret/mod.rs +++ b/harmony/src/domain/interpret/mod.rs @@ -124,3 +124,11 @@ impl From for InterpretError { } } } + +impl From for InterpretError { + fn from(value: String) -> Self { + Self { + msg: format!("InterpretError : {value}"), + } + } +} diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 52f28aa..32da4ac 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -1,24 +1,17 @@ use derive_new::new; -use futures_util::TryStreamExt; +use futures_util::StreamExt; use k8s_openapi::{ ClusterResourceScope, NamespaceResourceScope, api::{apps::v1::Deployment, core::v1::Pod}, }; use kube::runtime::conditions; -use kube::runtime::wait::{Condition, await_condition}; +use kube::runtime::wait::await_condition; use kube::{ Client, Config, Error, Resource, - api::{ - Api, AttachParams, AttachedProcess, DeleteParams, ListParams, Patch, PatchParams, - PostParams, ResourceExt, WatchEvent, WatchParams, - }, + api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt}, config::{KubeConfigOptions, Kubeconfig}, core::ErrorResponse, - runtime::{ - WatchStreamExt, metadata_watcher, - reflector::Lookup, - watcher::{self, watch_object}, - }, + runtime::reflector::Lookup, }; use log::{debug, error, trace}; use serde::de::DeserializeOwned; @@ -40,7 +33,8 @@ impl K8sClient { &self, name: String, namespace: Option<&str>, - ) -> Result<(), Error> { + timeout: Option, + ) -> Result<(), String> { let api: Api; if let Some(ns) = namespace { @@ -49,18 +43,22 @@ impl K8sClient { api = Api::default_namespaced(self.client.clone()); } - // need to upgrade to latest kube-rs version https://docs.rs/kube-runtime/latest/kube_runtime/wait/conditions/fn.is_deployment_completed.html let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed()); - let _ = tokio::time::timeout(std::time::Duration::from_secs(300), establish).await?; + let t = if let Some(t) = timeout { t } else { 300 }; + let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await; - Ok(()) + if let Ok(r) = res { + return Ok(()); + } else { + return Err("timed out while waiting for deployment".to_string()); + } } pub async fn exec_pod( &self, name: String, namespace: Option<&str>, - command: Vec, + command: Vec<&str>, ) -> Result<(), String> { let api: Api; @@ -74,10 +72,8 @@ impl K8sClient { .await .expect("couldn't get list of pods"); - if pod_list.items.len() > 1 { - return Err("too many pods".into()); - } else { - api.exec( + let res = api + .exec( pod_list .items .first() @@ -90,9 +86,28 @@ impl K8sClient { &AttachParams::default(), ) .await; - } - Ok(()) + match res { + Err(e) => return Err(e.to_string()), + Ok(mut process) => { + let status = process + .take_status() + .expect("Couldn't get status") + .await + .expect("Couldn't unwrap status"); + + if let Some(s) = status.status { + debug!("Status: {}", s); + if s == "Success" { + return Ok(()); + } else { + return Err(s); + } + } else { + return Err("Couldn't get inner status of pod exec".to_string()); + } + } + } } /// Apply a resource in namespace diff --git a/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs b/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs index 8e83fd7..c0f614d 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/helm/config.rs @@ -1,6 +1,8 @@ use serde::Serialize; -use crate::modules::monitoring::kube_prometheus::types::{AlertManagerAdditionalPromRules, AlertManagerChannelConfig}; +use crate::modules::monitoring::kube_prometheus::types::{ + AlertManagerAdditionalPromRules, AlertManagerChannelConfig, +}; #[derive(Debug, Clone, Serialize)] pub struct KubePrometheusConfig { diff --git a/harmony/src/modules/monitoring/ntfy/helm/config.rs b/harmony/src/modules/monitoring/ntfy/helm/config.rs deleted file mode 100644 index 14a2fdd..0000000 --- a/harmony/src/modules/monitoring/ntfy/helm/config.rs +++ /dev/null @@ -1,6 +0,0 @@ -use serde::Serialize; - -#[derive(Debug, Clone, Serialize)] -pub struct NtfyConfig { - pub namespace: String, -} diff --git a/harmony/src/modules/monitoring/ntfy/helm/mod.rs b/harmony/src/modules/monitoring/ntfy/helm/mod.rs index 9a39996..40d1524 100644 --- a/harmony/src/modules/monitoring/ntfy/helm/mod.rs +++ b/harmony/src/modules/monitoring/ntfy/helm/mod.rs @@ -1,2 +1 @@ -pub mod config; pub mod ntfy_helm_chart; diff --git a/harmony/src/modules/monitoring/ntfy/helm/ntfy_helm_chart.rs b/harmony/src/modules/monitoring/ntfy/helm/ntfy_helm_chart.rs index fd3c4aa..db7d9c4 100644 --- a/harmony/src/modules/monitoring/ntfy/helm/ntfy_helm_chart.rs +++ b/harmony/src/modules/monitoring/ntfy/helm/ntfy_helm_chart.rs @@ -1,17 +1,9 @@ use non_blank_string_rs::NonBlankString; -use std::{ - str::FromStr, - sync::{Arc, Mutex}, -}; +use std::str::FromStr; -use crate::modules::{ - helm::chart::{HelmChartScore, HelmRepository}, - monitoring::ntfy::helm::config::NtfyConfig, -}; - -pub fn ntfy_helm_chart_score(config: Arc>) -> HelmChartScore { - let config = config.lock().unwrap(); +use crate::modules::helm::chart::{HelmChartScore, HelmRepository}; +pub fn ntfy_helm_chart_score(namespace: String) -> HelmChartScore { let values = format!( r#" replicaCount: 1 @@ -74,7 +66,7 @@ persistence: ); HelmChartScore { - namespace: Some(NonBlankString::from_str(&config.namespace).unwrap()), + namespace: Some(NonBlankString::from_str(&namespace).unwrap()), release_name: NonBlankString::from_str("ntfy").unwrap(), chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(), chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()), diff --git a/harmony/src/modules/monitoring/ntfy/ntfy.rs b/harmony/src/modules/monitoring/ntfy/ntfy.rs index 0573b6d..33e7b73 100644 --- a/harmony/src/modules/monitoring/ntfy/ntfy.rs +++ b/harmony/src/modules/monitoring/ntfy/ntfy.rs @@ -1,33 +1,91 @@ -use std::sync::{Arc, Mutex}; +use async_trait::async_trait; +use log::debug; +use serde::Serialize; use crate::{ - interpret::{InterpretError, Outcome}, + data::{Id, Version}, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, - modules::monitoring::ntfy::helm::{config::NtfyConfig, ntfy_helm_chart::ntfy_helm_chart_score}, + modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score, score::Score, topology::{HelmCommand, K8sclient, Topology}, }; -pub struct Ntfy { - pub config: Arc>, +#[derive(Debug, Clone, Serialize)] +pub struct NtfyScore { + pub namespace: String, } -impl Ntfy { - async fn install_ntfy( +impl Score for NtfyScore { + fn create_interpret(&self) -> Box> { + Box::new(NtfyInterpret { + score: self.clone(), + }) + } + + fn name(&self) -> String { + format!("Ntfy") + } +} + +#[derive(Debug, Serialize)] +pub struct NtfyInterpret { + pub score: NtfyScore, +} + +#[async_trait] +impl Interpret for NtfyInterpret { + async fn execute( &self, inventory: &Inventory, topology: &T, ) -> Result { - let result = ntfy_helm_chart_score(self.config.clone()) + ntfy_helm_chart_score(self.score.namespace.clone()) .create_interpret() .execute(inventory, topology) - .await; + .await?; - let client = topology.k8s_client().await.expect("couldn't get k8s client"); + debug!("installed ntfy helm chart"); + let client = topology + .k8s_client() + .await + .expect("couldn't get k8s client"); - client.wait_until_deployment_ready("ntfy", self.config.get_mut().expect("couldn't get config").namespace); - client. + client + .wait_until_deployment_ready( + "ntfy".to_string(), + Some(&self.score.namespace.as_str()), + None, + ) + .await?; + debug!("created k8s client"); - result + client + .exec_pod( + "ntfy".to_string(), + Some(&self.score.namespace), + vec![ + "sh", + "-c", + "NTFY_PASSWORD=harmony ntfy user add --role=admin harmony", + ], + ) + .await?; + debug!("exec into pod done"); + + Ok(Outcome::success("installed ntfy".to_string())) + } + + fn get_name(&self) -> InterpretName { + todo!() + } + fn get_version(&self) -> Version { + todo!() + } + fn get_status(&self) -> InterpretStatus { + todo!() + } + fn get_children(&self) -> Vec { + todo!() } }