Compare commits
	
		
			2 Commits
		
	
	
		
			528ee8a696
			...
			79e406f126
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| 79e406f126 | |||
| 0700e30299 | 
| @ -27,6 +27,7 @@ async fn main() { | ||||
|     }; | ||||
|     let application = Arc::new(RustWebapp { | ||||
|         name: "example-monitoring".to_string(), | ||||
|         dns: "example-monitoring.harmony.mcd".to_string(), | ||||
|         project_root: PathBuf::from("./examples/rust/webapp"), | ||||
|         framework: Some(RustWebFramework::Leptos), | ||||
|         service_port: 3000, | ||||
|  | ||||
| @ -16,6 +16,7 @@ use harmony_types::net::Url; | ||||
| async fn main() { | ||||
|     let application = Arc::new(RustWebapp { | ||||
|         name: "test-rhob-monitoring".to_string(), | ||||
|         dns: "test-rhob-monitoring.harmony.mcd".to_string(), | ||||
|         project_root: PathBuf::from("./webapp"), // Relative from 'harmony-path' param
 | ||||
|         framework: Some(RustWebFramework::Leptos), | ||||
|         service_port: 3000, | ||||
|  | ||||
| @ -19,6 +19,7 @@ use harmony_macros::hurl; | ||||
| async fn main() { | ||||
|     let application = Arc::new(RustWebapp { | ||||
|         name: "harmony-example-rust-webapp".to_string(), | ||||
|         dns: "harmony-example-rust-webapp.harmony.mcd".to_string(), | ||||
|         project_root: PathBuf::from("./webapp"), | ||||
|         framework: Some(RustWebFramework::Leptos), | ||||
|         service_port: 3000, | ||||
|  | ||||
| @ -2,12 +2,11 @@ use harmony::{ | ||||
|     inventory::Inventory, | ||||
|     modules::{ | ||||
|         application::{ | ||||
|             ApplicationScore, RustWebFramework, RustWebapp, | ||||
|             features::{PackagingDeployment, rhob_monitoring::Monitoring}, | ||||
|             features::{rhob_monitoring::Monitoring, PackagingDeployment}, ApplicationScore, RustWebFramework, RustWebapp | ||||
|         }, | ||||
|         monitoring::alert_channel::discord_alert_channel::DiscordWebhook, | ||||
|     }, | ||||
|     topology::K8sAnywhereTopology, | ||||
|     topology::{K8sAnywhereTopology, LocalhostTopology}, | ||||
| }; | ||||
| use harmony_macros::hurl; | ||||
| use std::{path::PathBuf, sync::Arc}; | ||||
| @ -22,8 +21,8 @@ async fn main() { | ||||
|     }); | ||||
| 
 | ||||
|     let discord_webhook = DiscordWebhook { | ||||
|         name: "harmony_demo".to_string(), | ||||
|         url: hurl!("http://not_a_url.com"), | ||||
|         name: "harmony-demo".to_string(), | ||||
|         url: hurl!("https://discord.com/api/webhooks/1415391405681021050/V6KzV41vQ7yvbn7BchejRu9C8OANxy0i2ESZOz2nvCxG8xAY3-2i3s5MS38k568JKTzH"), | ||||
|     }; | ||||
| 
 | ||||
|     let app = ApplicationScore { | ||||
|  | ||||
| @ -16,6 +16,7 @@ use std::{path::PathBuf, sync::Arc}; | ||||
| async fn main() { | ||||
|     let application = Arc::new(RustWebapp { | ||||
|         name: "harmony-example-tryrust".to_string(), | ||||
|         dns: "tryrust.example.harmony.mcd".to_string(), | ||||
|         project_root: PathBuf::from("./tryrust.org"), // <== Project root, in this case it is a
 | ||||
|         // submodule
 | ||||
|         framework: Some(RustWebFramework::Leptos), | ||||
|  | ||||
| @ -1,9 +1,10 @@ | ||||
| use std::time::Duration; | ||||
| use std::{collections::HashMap, time::Duration}; | ||||
| 
 | ||||
| use derive_new::new; | ||||
| use k8s_openapi::{ | ||||
|     ClusterResourceScope, NamespaceResourceScope, | ||||
|     api::{apps::v1::Deployment, core::v1::Pod}, | ||||
|     apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition, | ||||
|     apimachinery::pkg::version::Info, | ||||
| }; | ||||
| use kube::{ | ||||
| @ -21,7 +22,7 @@ use kube::{ | ||||
| }; | ||||
| use log::{debug, error, trace}; | ||||
| use serde::{Serialize, de::DeserializeOwned}; | ||||
| use serde_json::{Value, json}; | ||||
| use serde_json::{json, Value}; | ||||
| use similar::TextDiff; | ||||
| use tokio::{io::AsyncReadExt, time::sleep}; | ||||
| 
 | ||||
| @ -57,6 +58,148 @@ impl K8sClient { | ||||
|         }) | ||||
|     } | ||||
| 
 | ||||
|      // Returns true if any deployment in the given namespace matching the label selector
 | ||||
|     // has status.availableReplicas > 0 (or condition Available=True).
 | ||||
|     pub async fn has_healthy_deployment_with_label( | ||||
|         &self, | ||||
|         namespace: &str, | ||||
|         label_selector: &str, | ||||
|     ) -> Result<bool, Error> { | ||||
|         let api: Api<Deployment> = Api::namespaced(self.client.clone(), namespace); | ||||
|         let lp = ListParams::default().labels(label_selector); | ||||
|         let list = api.list(&lp).await?; | ||||
|         for d in list.items { | ||||
|             // Check AvailableReplicas > 0 or Available condition
 | ||||
|             let available = d | ||||
|                 .status | ||||
|                 .as_ref() | ||||
|                 .and_then(|s| s.available_replicas) | ||||
|                 .unwrap_or(0); | ||||
|             if available > 0 { | ||||
|                 return Ok(true); | ||||
|             } | ||||
|             // Fallback: scan conditions
 | ||||
|             if let Some(conds) = d.status.as_ref().and_then(|s| s.conditions.as_ref()) { | ||||
|                 if conds.iter().any(|c| { | ||||
|                     c.type_ == "Available" | ||||
|                         && c.status == "True" | ||||
|                 }) { | ||||
|                     return Ok(true); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         Ok(false) | ||||
|     } | ||||
| 
 | ||||
|     // Cluster-wide: returns namespaces that have at least one healthy deployment
 | ||||
|     // matching the label selector (equivalent to kubectl -A -l ...).
 | ||||
|     pub async fn list_namespaces_with_healthy_deployments( | ||||
|         &self, | ||||
|         label_selector: &str, | ||||
|     ) -> Result<Vec<String>, Error> { | ||||
|         let api: Api<Deployment> = Api::all(self.client.clone()); | ||||
|         let lp = ListParams::default().labels(label_selector); | ||||
|         let list = api.list(&lp).await?; | ||||
| 
 | ||||
|         let mut healthy_ns: HashMap<String, bool> = HashMap::new(); | ||||
|         for d in list.items { | ||||
|             let ns = match d.metadata.namespace.clone() { | ||||
|                 Some(n) => n, | ||||
|                 None => continue, | ||||
|             }; | ||||
|             let available = d | ||||
|                 .status | ||||
|                 .as_ref() | ||||
|                 .and_then(|s| s.available_replicas) | ||||
|                 .unwrap_or(0); | ||||
|             let is_healthy = if available > 0 { | ||||
|                 true | ||||
|             } else { | ||||
|                 d.status | ||||
|                     .as_ref() | ||||
|                     .and_then(|s| s.conditions.as_ref()) | ||||
|                     .map(|conds| { | ||||
|                         conds.iter().any(|c| { | ||||
|                             c.type_ == "Available" | ||||
|                                 && c.status == "True" | ||||
|                         }) | ||||
|                     }) | ||||
|                     .unwrap_or(false) | ||||
|             }; | ||||
|             if is_healthy { | ||||
|                 healthy_ns.insert(ns, true); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         Ok(healthy_ns.into_keys().collect()) | ||||
|     } | ||||
| 
 | ||||
|     // Get the application-controller ServiceAccount name (fallback to default)
 | ||||
|     pub async fn get_argocd_controller_sa_name(&self, ns: &str) -> Result<String, Error> { | ||||
|         let api: Api<Deployment> = Api::namespaced(self.client.clone(), ns); | ||||
|         let lp = ListParams::default().labels("app.kubernetes.io/component=controller"); | ||||
|         let list = api.list(&lp).await?; | ||||
|         if let Some(dep) = list.items.get(0) { | ||||
|             if let Some(sa) = dep | ||||
|                 .spec | ||||
|                 .as_ref() | ||||
|                 .and_then(|ds| ds.template.spec.as_ref()) | ||||
|                 .and_then(|ps| ps.service_account_name.clone()) | ||||
|             { | ||||
|                 return Ok(sa); | ||||
|             } | ||||
|         } | ||||
|         Ok("argocd-application-controller".to_string()) | ||||
|     } | ||||
| 
 | ||||
|     // List ClusterRoleBindings dynamically and return as JSON values
 | ||||
|     pub async fn list_clusterrolebindings_json(&self) -> Result<Vec<Value>, Error> { | ||||
|         let gvk = kube::api::GroupVersionKind::gvk( | ||||
|             "rbac.authorization.k8s.io", | ||||
|             "v1", | ||||
|             "ClusterRoleBinding", | ||||
|         ); | ||||
|         let ar = kube::api::ApiResource::from_gvk(&gvk); | ||||
|         let api: Api<kube::api::DynamicObject> = Api::all_with(self.client.clone(), &ar); | ||||
|         let crbs = api.list(&ListParams::default()).await?; | ||||
|         let mut out = Vec::new(); | ||||
|         for o in crbs { | ||||
|             let v = serde_json::to_value(&o).unwrap_or(Value::Null); | ||||
|             out.push(v); | ||||
|         } | ||||
|         Ok(out) | ||||
|     } | ||||
| 
 | ||||
|     // Determine if Argo controller in ns has cluster-wide permissions via CRBs
 | ||||
|     // TODO This does not belong in the generic k8s client, should be refactored at some point
 | ||||
|     pub async fn is_argocd_cluster_wide(&self, ns: &str) -> Result<bool, Error> { | ||||
|         let sa = self.get_argocd_controller_sa_name(ns).await?; | ||||
|         let crbs = self.list_clusterrolebindings_json().await?; | ||||
|         let sa_user = format!("system:serviceaccount:{}:{}", ns, sa); | ||||
|         for crb in crbs { | ||||
|             if let Some(subjects) = crb.get("subjects").and_then(|s| s.as_array()) { | ||||
|                 for subj in subjects { | ||||
|                     let kind = subj.get("kind").and_then(|v| v.as_str()).unwrap_or(""); | ||||
|                     let name = subj.get("name").and_then(|v| v.as_str()).unwrap_or(""); | ||||
|                     let subj_ns = subj.get("namespace").and_then(|v| v.as_str()).unwrap_or(""); | ||||
|                     if (kind == "ServiceAccount" && name == sa && subj_ns == ns) | ||||
|                         || (kind == "User" && name == sa_user) | ||||
|                     { | ||||
|                         return Ok(true); | ||||
|                     } | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|         Ok(false) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn has_crd(&self, name: &str) -> Result<bool, Error> { | ||||
|         let api: Api<CustomResourceDefinition> = Api::all(self.client.clone()); | ||||
|         let lp = ListParams::default().fields(&format!("metadata.name={}", name)); | ||||
|         let crds = api.list(&lp).await?; | ||||
|         Ok(!crds.items.is_empty()) | ||||
|     } | ||||
| 
 | ||||
|     pub async fn get_apiserver_version(&self) -> Result<Info, Error> { | ||||
|         let client: Client = self.client.clone(); | ||||
|         let version_info: Info = client.apiserver_version().await?; | ||||
|  | ||||
| @ -2,7 +2,7 @@ use std::{process::Command, sync::Arc}; | ||||
| 
 | ||||
| use async_trait::async_trait; | ||||
| use kube::api::GroupVersionKind; | ||||
| use log::{debug, info, warn}; | ||||
| use log::{debug, info, trace, warn}; | ||||
| use serde::Serialize; | ||||
| use tokio::sync::OnceCell; | ||||
| 
 | ||||
| @ -71,6 +71,7 @@ pub struct K8sAnywhereTopology { | ||||
| #[async_trait] | ||||
| impl K8sclient for K8sAnywhereTopology { | ||||
|     async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> { | ||||
|         trace!("getting k8s client"); | ||||
|         let state = match self.k8s_state.get() { | ||||
|             Some(state) => state, | ||||
|             None => return Err("K8s state not initialized yet".to_string()), | ||||
| @ -620,36 +621,56 @@ impl TenantManager for K8sAnywhereTopology { | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl Ingress for K8sAnywhereTopology { | ||||
|     //TODO this is specifically for openshift/okd which violates the k8sanywhere idea
 | ||||
|     async fn get_domain(&self, service: &str) -> Result<String, PreparationError> { | ||||
|         use log::{trace, debug, warn}; | ||||
| 
 | ||||
|         let client = self.k8s_client().await?; | ||||
| 
 | ||||
|         if let Some(Some(k8s_state)) = self.k8s_state.get() { | ||||
|             match k8s_state.source { | ||||
|                 K8sSource::LocalK3d => Ok(format!("{service}.local.k3d")), | ||||
|                 K8sSource::LocalK3d => { | ||||
|                     // Local developer UX
 | ||||
|                     return Ok(format!("{service}.local.k3d")); | ||||
|                 } | ||||
|                 K8sSource::Kubeconfig => { | ||||
|                     self.openshift_ingress_operator_available().await?; | ||||
|                     trace!("K8sSource is kubeconfig; attempting to detect domain"); | ||||
| 
 | ||||
|                     let gvk = GroupVersionKind { | ||||
|                         group: "operator.openshift.io".into(), | ||||
|                         version: "v1".into(), | ||||
|                         kind: "IngressController".into(), | ||||
|                     }; | ||||
|                     let ic = client | ||||
|                         .get_resource_json_value( | ||||
|                             "default", | ||||
|                             Some("openshift-ingress-operator"), | ||||
|                             &gvk, | ||||
|                         ) | ||||
|                         .await | ||||
|                         .map_err(|_| { | ||||
|                             PreparationError::new("Failed to fetch IngressController".to_string()) | ||||
|                         })?; | ||||
|                     // 1) Try OpenShift IngressController domain (backward compatible)
 | ||||
|                     if self.openshift_ingress_operator_available().await.is_ok() { | ||||
|                         trace!("OpenShift ingress operator detected; using IngressController"); | ||||
|                         let gvk = GroupVersionKind { | ||||
|                             group: "operator.openshift.io".into(), | ||||
|                             version: "v1".into(), | ||||
|                             kind: "IngressController".into(), | ||||
|                         }; | ||||
|                         let ic = client | ||||
|                             .get_resource_json_value("default", Some("openshift-ingress-operator"), &gvk) | ||||
|                             .await | ||||
|                             .map_err(|_| PreparationError::new("Failed to fetch IngressController".to_string()))?; | ||||
| 
 | ||||
|                     match ic.data["status"]["domain"].as_str() { | ||||
|                         Some(domain) => Ok(format!("{service}.{domain}")), | ||||
|                         None => Err(PreparationError::new("Could not find domain".to_string())), | ||||
|                         if let Some(domain) = ic.data["status"]["domain"].as_str() { | ||||
|                             return Ok(format!("{service}.{domain}")); | ||||
|                         } else { | ||||
|                             warn!("OpenShift IngressController present but no status.domain set"); | ||||
|                         } | ||||
|                     } else { | ||||
|                         trace!("OpenShift ingress operator not detected; trying generic Kubernetes"); | ||||
|                     } | ||||
| 
 | ||||
|                     // 2) Try NGINX Ingress Controller common setups
 | ||||
|                     // 2.a) Well-known namespace/name for the controller Service
 | ||||
|                     //      - upstream default: namespace "ingress-nginx", service "ingress-nginx-controller"
 | ||||
|                     //      - some distros: "ingress-nginx-controller" svc in "ingress-nginx" ns
 | ||||
|                     // If found with LoadBalancer ingress hostname, use its base domain.
 | ||||
|                     if let Some(domain) = try_nginx_lb_domain(&client).await? { | ||||
|                         return Ok(format!("{service}.{domain}")); | ||||
|                     } | ||||
| 
 | ||||
|                     // 3) Fallback: internal cluster DNS suffix (service.namespace.svc.cluster.local)
 | ||||
|                     // We don't have tenant namespace here, so we fallback to 'default' with a warning.
 | ||||
|                     warn!("Could not determine external ingress domain; falling back to internal-only DNS"); | ||||
|                     let internal = format!("{service}.default.svc.cluster.local"); | ||||
|                     Ok(internal) | ||||
|                 } | ||||
|             } | ||||
|         } else { | ||||
| @ -659,3 +680,57 @@ impl Ingress for K8sAnywhereTopology { | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| async fn try_nginx_lb_domain(client: &K8sClient) -> Result<Option<String>, PreparationError> { | ||||
|     use log::{trace, debug}; | ||||
| 
 | ||||
|     // Try common service path: svc/ingress-nginx-controller in ns/ingress-nginx
 | ||||
|     let svc_gvk = GroupVersionKind { | ||||
|         group: "".into(), // core
 | ||||
|         version: "v1".into(), | ||||
|         kind: "Service".into(), | ||||
|     }; | ||||
| 
 | ||||
|     let candidates = [ | ||||
|         ("ingress-nginx", "ingress-nginx-controller"), | ||||
|         ("ingress-nginx", "ingress-nginx-controller-internal"), | ||||
|         ("ingress-nginx", "ingress-nginx"), // some charts name the svc like this
 | ||||
|         ("kube-system", "ingress-nginx-controller"), // less common but seen
 | ||||
|     ]; | ||||
| 
 | ||||
|     for (ns, name) in candidates { | ||||
|         trace!("Checking NGINX Service {ns}/{name} for LoadBalancer hostname"); | ||||
|         if let Ok(svc) = client.get_resource_json_value(ns, Some(name), &svc_gvk).await { | ||||
|             let lb_hosts = svc.data["status"]["loadBalancer"]["ingress"].as_array().cloned().unwrap_or_default(); | ||||
|             for entry in lb_hosts { | ||||
|                 if let Some(host) = entry.get("hostname").and_then(|v| v.as_str()) { | ||||
|                     debug!("Found NGINX LB hostname: {host}"); | ||||
|                     if let Some(domain) = extract_base_domain(host) { | ||||
|                         return Ok(Some(domain.to_string())); | ||||
|                     } else { | ||||
|                         return Ok(Some(host.to_string())); // already a domain
 | ||||
|                     } | ||||
|                 } | ||||
|                 if let Some(ip) = entry.get("ip").and_then(|v| v.as_str()) { | ||||
|                     // If only an IP is exposed, we can't create a hostname; return None to keep searching
 | ||||
|                     debug!("NGINX LB exposes IP {ip} (no hostname); skipping"); | ||||
|                 } | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     Ok(None) | ||||
| } | ||||
| 
 | ||||
| fn extract_base_domain(host: &str) -> Option<String> { | ||||
|     // For a host like a1b2c3d4e5f6abcdef.elb.amazonaws.com -> base domain elb.amazonaws.com
 | ||||
|     // For a managed DNS like xyz.example.com -> base domain example.com (keep 2+ labels)
 | ||||
|     // Heuristic: keep last 2 labels by default; special-case known multi-label TLDs if needed.
 | ||||
|     let parts: Vec<&str> = host.split('.').collect(); | ||||
|     if parts.len() >= 2 { | ||||
|         // Very conservative: last 2 labels
 | ||||
|         Some(parts[parts.len() - 2..].join(".")) | ||||
|     } else { | ||||
|         None | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -10,7 +10,7 @@ use super::OPNSenseFirewall; | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl DnsServer for OPNSenseFirewall { | ||||
|     async fn register_hosts(&self, hosts: Vec<DnsRecord>) -> Result<(), ExecutorError> { | ||||
|     async fn register_hosts(&self, _hosts: Vec<DnsRecord>) -> Result<(), ExecutorError> { | ||||
|         todo!("Refactor this to use dnsmasq") | ||||
|         // let mut writable_opnsense = self.opnsense_config.write().await;
 | ||||
|         // let mut dns = writable_opnsense.dns();
 | ||||
| @ -68,7 +68,7 @@ impl DnsServer for OPNSenseFirewall { | ||||
|         self.host.clone() | ||||
|     } | ||||
| 
 | ||||
|     async fn register_dhcp_leases(&self, register: bool) -> Result<(), ExecutorError> { | ||||
|     async fn register_dhcp_leases(&self, _register: bool) -> Result<(), ExecutorError> { | ||||
|         todo!("Refactor this to use dnsmasq") | ||||
|         // let mut writable_opnsense = self.opnsense_config.write().await;
 | ||||
|         // let mut dns = writable_opnsense.dns();
 | ||||
|  | ||||
| @ -21,7 +21,7 @@ pub struct Helm { | ||||
|     pub skip_schema_validation: Option<bool>, | ||||
|     pub version: Option<String>, | ||||
|     pub kube_version: Option<String>, | ||||
|     pub api_versions: Vec<String>, | ||||
|     // pub api_versions: Vec<String>,
 | ||||
|     pub namespace: Option<String>, | ||||
| } | ||||
| 
 | ||||
| @ -105,7 +105,7 @@ impl Default for ArgoApplication { | ||||
|                     skip_schema_validation: None, | ||||
|                     version: None, | ||||
|                     kube_version: None, | ||||
|                     api_versions: vec![], | ||||
|                     // api_versions: vec![],
 | ||||
|                     namespace: None, | ||||
|                 }, | ||||
|                 path: "".to_string(), | ||||
| @ -155,7 +155,7 @@ impl From<CDApplicationConfig> for ArgoApplication { | ||||
|                     skip_schema_validation: None, | ||||
|                     version: None, | ||||
|                     kube_version: None, | ||||
|                     api_versions: vec![], | ||||
|                     // api_versions: vec![],
 | ||||
|                     namespace: None, | ||||
|                 }, | ||||
|             }, | ||||
| @ -181,13 +181,11 @@ impl From<CDApplicationConfig> for ArgoApplication { | ||||
| } | ||||
| 
 | ||||
| impl ArgoApplication { | ||||
|     pub fn to_yaml(&self) -> serde_yaml::Value { | ||||
|     pub fn to_yaml(&self, target_namespace: Option<&str>) -> serde_yaml::Value { | ||||
|         let name = &self.name; | ||||
|         let namespace = if let Some(ns) = self.namespace.as_ref() { | ||||
|             ns | ||||
|         } else { | ||||
|             "argocd" | ||||
|         }; | ||||
|         let default_ns = "argocd".to_string(); | ||||
|         let namespace: &str = | ||||
|             target_namespace.unwrap_or(self.namespace.as_ref().unwrap_or(&default_ns)); | ||||
|         let project = &self.project; | ||||
| 
 | ||||
|         let yaml_str = format!( | ||||
| @ -285,7 +283,7 @@ mod tests { | ||||
|                     skip_schema_validation: None, | ||||
|                     version: None, | ||||
|                     kube_version: None, | ||||
|                     api_versions: vec![], | ||||
|                     // api_versions: vec![],
 | ||||
|                     namespace: None, | ||||
|                 }, | ||||
|                 path: "".to_string(), | ||||
| @ -345,7 +343,7 @@ spec: | ||||
| 
 | ||||
|         assert_eq!( | ||||
|             expected_yaml_output.trim(), | ||||
|             serde_yaml::to_string(&app.clone().to_yaml()) | ||||
|             serde_yaml::to_string(&app.clone().to_yaml(None)) | ||||
|                 .unwrap() | ||||
|                 .trim() | ||||
|         ); | ||||
|  | ||||
| @ -1,21 +1,19 @@ | ||||
| use async_trait::async_trait; | ||||
| use kube::{Api, api::GroupVersionKind}; | ||||
| use log::{debug, warn}; | ||||
| use log::{debug, info, trace, warn}; | ||||
| use non_blank_string_rs::NonBlankString; | ||||
| use serde::Serialize; | ||||
| use serde::de::DeserializeOwned; | ||||
| use std::{process::Command, str::FromStr, sync::Arc}; | ||||
| use std::{str::FromStr, sync::Arc}; | ||||
| 
 | ||||
| use crate::{ | ||||
|     data::Version, | ||||
|     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, | ||||
|     inventory::Inventory, | ||||
|     modules::helm::chart::{HelmChartScore, HelmRepository}, | ||||
|     score::Score, | ||||
|     topology::{ | ||||
|         HelmCommand, K8sclient, PreparationError, PreparationOutcome, Topology, ingress::Ingress, | ||||
|         k8s::K8sClient, | ||||
|     modules::{ | ||||
|         argocd::{ArgoDeploymentType, detect_argo_deployment_type}, | ||||
|         helm::chart::{HelmChartScore, HelmRepository}, | ||||
|     }, | ||||
|     score::Score, | ||||
|     topology::{HelmCommand, K8sclient, Topology, ingress::Ingress, k8s::K8sClient}, | ||||
| }; | ||||
| use harmony_types::id::Id; | ||||
| 
 | ||||
| @ -24,6 +22,7 @@ use super::ArgoApplication; | ||||
| #[derive(Debug, Serialize, Clone)] | ||||
| pub struct ArgoHelmScore { | ||||
|     pub namespace: String, | ||||
|     // TODO: remove and rely on topology (it now knows the flavor)
 | ||||
|     pub openshift: bool, | ||||
|     pub argo_apps: Vec<ArgoApplication>, | ||||
| } | ||||
| @ -54,31 +53,101 @@ impl<T: Topology + K8sclient + HelmCommand + Ingress> Interpret<T> for ArgoInter | ||||
|         inventory: &Inventory, | ||||
|         topology: &T, | ||||
|     ) -> Result<Outcome, InterpretError> { | ||||
|         let k8s_client = topology.k8s_client().await?; | ||||
|         let svc = format!("argo-{}", self.score.namespace.clone()); | ||||
|         trace!("Starting ArgoInterpret execution {self:?}"); | ||||
|         let k8s_client: Arc<K8sClient> = topology.k8s_client().await?; | ||||
|         trace!("Got k8s client"); | ||||
|         let desired_ns = self.score.namespace.clone(); | ||||
| 
 | ||||
|         debug!("ArgoInterpret detecting cluster configuration"); | ||||
|         let svc = format!("argo-{}", desired_ns); | ||||
|         let domain = topology.get_domain(&svc).await?; | ||||
|         // FIXME we now have a way to know if we're running on openshift family
 | ||||
|         
 | ||||
|         let helm_score = | ||||
|             argo_helm_chart_score(&self.score.namespace, self.score.openshift, &domain); 
 | ||||
|         debug!("Resolved Argo service domain for '{}': {}", svc, domain); | ||||
| 
 | ||||
|         helm_score.interpret(inventory, topology).await?; | ||||
|         // Detect current Argo deployment type
 | ||||
|         let current = detect_argo_deployment_type(&k8s_client, &desired_ns).await?; | ||||
|         info!("Detected Argo deployment type: {:?}", current); | ||||
| 
 | ||||
|         // Decide control namespace and whether we must install
 | ||||
|         let (control_ns, must_install) = match current.clone() { | ||||
|             ArgoDeploymentType::NotInstalled => { | ||||
|                 info!( | ||||
|                     "Argo CD not installed. Will install via Helm into namespace '{}'.", | ||||
|                     desired_ns | ||||
|                 ); | ||||
|                 (desired_ns.clone(), true) | ||||
|             } | ||||
|             ArgoDeploymentType::AvailableInDesiredNamespace(ns) => { | ||||
|                 info!( | ||||
|                     "Argo CD already installed by Harmony in '{}'. Skipping install.", | ||||
|                     ns | ||||
|                 ); | ||||
|                 (ns, false) | ||||
|             } | ||||
|             ArgoDeploymentType::InstalledClusterWide(ns) => { | ||||
|                 info!( | ||||
|                     "Argo CD installed cluster-wide in namespace '{}'.", | ||||
|                     ns | ||||
|                 ); | ||||
|                 (ns, false) | ||||
|             } | ||||
|             ArgoDeploymentType::InstalledNamespaceScoped(ns) => { | ||||
|                 // TODO we could support this use case by installing a new argo instance. But that
 | ||||
|                 // means handling a few cases that are out of scope for now :
 | ||||
|                 // - Wether argo operator is installed
 | ||||
|                 // - Managing CRD versions compatibility
 | ||||
|                 // - Potentially handling the various k8s flavors and setups we might encounter
 | ||||
|                 //
 | ||||
|                 // There is a possibility that the helm chart already handles most or even all of these use cases but they are out of scope for now.
 | ||||
|                 let msg = format!( | ||||
|                     "Argo CD found in '{}' but it is namespace-scoped and not supported for attachment yet.", | ||||
|                     ns | ||||
|                 ); | ||||
|                 warn!("{}", msg); | ||||
|                 return Err(InterpretError::new(msg)); | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         info!("ArgoCD will be installed : {must_install} . Current argocd status : {current:?} "); | ||||
| 
 | ||||
|         if must_install { | ||||
|             let helm_score = argo_helm_chart_score(&desired_ns, self.score.openshift, &domain); | ||||
|             info!( | ||||
|                 "Installing Argo CD via Helm into namespace '{}' ...", | ||||
|                 desired_ns | ||||
|             ); | ||||
|             helm_score.interpret(inventory, topology).await?; | ||||
|             info!("Argo CD install complete in '{}'.", desired_ns); | ||||
|         } | ||||
| 
 | ||||
|         let yamls: Vec<serde_yaml::Value> = self | ||||
|             .argo_apps | ||||
|             .iter() | ||||
|             .map(|a| a.to_yaml(Some(&control_ns))) | ||||
|             .collect(); | ||||
|         info!( | ||||
|             "Applying {} Argo application object(s) into control namespace '{}'.", | ||||
|             yamls.len(), | ||||
|             control_ns | ||||
|         ); | ||||
|         k8s_client | ||||
|             .apply_yaml_many(&self.argo_apps.iter().map(|a| a.to_yaml()).collect(), None) | ||||
|             .apply_yaml_many(&yamls, Some(control_ns.as_str())) | ||||
|             .await | ||||
|             .unwrap(); | ||||
|             .map_err(|e| InterpretError::new(format!("Failed applying Argo CRs: {e}")))?; | ||||
| 
 | ||||
|         Ok(Outcome::success_with_details( | ||||
|             format!( | ||||
|                 "ArgoCD {} {}", | ||||
|                 self.argo_apps.len(), | ||||
|                 match self.argo_apps.len() { | ||||
|                     1 => "application", | ||||
|                     _ => "applications", | ||||
|                 if self.argo_apps.len() == 1 { | ||||
|                     "application" | ||||
|                 } else { | ||||
|                     "applications" | ||||
|                 } | ||||
|             ), | ||||
|             vec![format!("argo application: http://{}", domain)], | ||||
|             vec![ | ||||
|                 format!("control_namespace={}", control_ns), | ||||
|                 format!("argo ui: http://{}", domain), | ||||
|             ], | ||||
|         )) | ||||
|     } | ||||
| 
 | ||||
| @ -87,7 +156,7 @@ impl<T: Topology + K8sclient + HelmCommand + Ingress> Interpret<T> for ArgoInter | ||||
|     } | ||||
| 
 | ||||
|     fn get_version(&self) -> Version { | ||||
|         todo!() | ||||
|         Version::from("0.1.0").unwrap() | ||||
|     } | ||||
| 
 | ||||
|     fn get_status(&self) -> InterpretStatus { | ||||
| @ -95,39 +164,7 @@ impl<T: Topology + K8sclient + HelmCommand + Ingress> Interpret<T> for ArgoInter | ||||
|     } | ||||
| 
 | ||||
|     fn get_children(&self) -> Vec<Id> { | ||||
|         todo!() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl ArgoInterpret { | ||||
|     pub async fn get_host_domain( | ||||
|         &self, | ||||
|         client: Arc<K8sClient>, | ||||
|         openshift: bool, | ||||
|     ) -> Result<String, InterpretError> { | ||||
|         //This should be the job of the topology to determine if we are in
 | ||||
|         //openshift, potentially we need on openshift topology the same way we create a
 | ||||
|         //localhosttopology
 | ||||
|         match openshift { | ||||
|             true => { | ||||
|                 let gvk = GroupVersionKind { | ||||
|                     group: "operator.openshift.io".into(), | ||||
|                     version: "v1".into(), | ||||
|                     kind: "IngressController".into(), | ||||
|                 }; | ||||
|                 let ic = client | ||||
|                     .get_resource_json_value("default", Some("openshift-ingress-operator"), &gvk) | ||||
|                     .await?; | ||||
| 
 | ||||
|                 match ic.data["status"]["domain"].as_str() { | ||||
|                     Some(domain) => return Ok(domain.to_string()), | ||||
|                     None => return Err(InterpretError::new("Could not find domain".to_string())), | ||||
|                 } | ||||
|             } | ||||
|             false => { | ||||
|                 todo!() | ||||
|             } | ||||
|         }; | ||||
|         vec![] | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -198,7 +198,7 @@ impl< | ||||
|                     openshift: true, | ||||
|                     argo_apps: vec![ArgoApplication::from(CDApplicationConfig { | ||||
|                         // helm pull oci://hub.nationtech.io/harmony/harmony-example-rust-webapp-chart --version 0.1.0
 | ||||
|                         version: Version::from("0.1.0").unwrap(), | ||||
|                         version: Version::from("0.2.1").unwrap(), | ||||
|                         helm_chart_repo_url: "hub.nationtech.io/harmony".to_string(), | ||||
|                         helm_chart_name: format!("{}-chart", self.application.name()), | ||||
|                         values_overrides: None, | ||||
|  | ||||
| @ -3,7 +3,6 @@ use std::sync::Arc; | ||||
| use crate::modules::application::{ | ||||
|     Application, ApplicationFeature, InstallationError, InstallationOutcome, | ||||
| }; | ||||
| use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore; | ||||
| use crate::modules::monitoring::application_monitoring::rhobs_application_monitoring_score::ApplicationRHOBMonitoringScore; | ||||
| 
 | ||||
| use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability; | ||||
|  | ||||
| @ -205,10 +205,10 @@ impl RustWebapp { | ||||
|                     Some(body_full(tar_data.into())), | ||||
|                 ); | ||||
| 
 | ||||
|                 while let Some(mut msg) = image_build_stream.next().await { | ||||
|                 while let Some(msg) = image_build_stream.next().await { | ||||
|                     trace!("Got bollard msg {msg:?}"); | ||||
|                     match msg { | ||||
|                         Ok(mut msg) => { | ||||
|                         Ok(msg) => { | ||||
|                             if let Some(progress) = msg.progress_detail { | ||||
|                                 info!( | ||||
|                                     "Build progress {}/{}", | ||||
| @ -480,7 +480,7 @@ apiVersion: v2 | ||||
| name: {chart_name} | ||||
| description: A Helm chart for the {app_name} web application. | ||||
| type: application | ||||
| version: 0.2.0 | ||||
| version: 0.2.1 | ||||
| appVersion: "{image_tag}" | ||||
| "#,
 | ||||
|         ); | ||||
|  | ||||
| @ -1,20 +0,0 @@ | ||||
| /// Discover the current ArgoCD setup
 | ||||
| ///
 | ||||
| /// 1. No argo installed
 | ||||
| /// 2. Argo installed in current namespace
 | ||||
| /// 3. Argo installed in different namespace (assuming cluster wide access)
 | ||||
| /// 
 | ||||
| /// For now we will go ahead with this very basic logic, there are many intricacies that can be
 | ||||
| /// dealt with later, such as multitenant management in a single argo instance, credentials setup t
 | ||||
| 
 | ||||
| #[async_trait] | ||||
| pub trait ArgoCD { | ||||
|     async fn ensure_installed() { | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| struct CurrentNamespaceArgo; | ||||
| 
 | ||||
| 
 | ||||
| impl ArgoCD for CurrentNamespaceArgo { | ||||
| } | ||||
| @ -1,2 +1,203 @@ | ||||
| mod discover; | ||||
| pub use discover::*; | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| use log::{debug, info}; | ||||
| 
 | ||||
| use crate::{interpret::InterpretError, topology::k8s::K8sClient}; | ||||
| 
 | ||||
| #[derive(Clone, Debug, PartialEq, Eq)] | ||||
| pub enum ArgoScope { | ||||
|     ClusterWide(String), | ||||
|     NamespaceScoped(String), | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug)] | ||||
| pub struct DiscoveredArgo { | ||||
|     pub control_namespace: String, | ||||
|     pub scope: ArgoScope, | ||||
|     pub has_crds: bool, | ||||
|     pub has_applicationset: bool, | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug, PartialEq, Eq)] | ||||
| pub enum ArgoDeploymentType { | ||||
|     NotInstalled, | ||||
|     AvailableInDesiredNamespace(String), | ||||
|     InstalledClusterWide(String), | ||||
|     InstalledNamespaceScoped(String), | ||||
| } | ||||
| 
 | ||||
| pub async fn discover_argo_all( | ||||
|     k8s: &Arc<K8sClient>, | ||||
| ) -> Result<Vec<DiscoveredArgo>, InterpretError> { | ||||
|     use log::{debug, info, trace, warn}; | ||||
| 
 | ||||
|     trace!("Starting Argo discovery"); | ||||
| 
 | ||||
|     // CRDs
 | ||||
|     let mut has_crds = true; | ||||
|     let required_crds = vec!["applications.argoproj.io", "appprojects.argoproj.io"]; | ||||
|     trace!("Checking required Argo CRDs: {:?}", required_crds); | ||||
| 
 | ||||
|     for crd in required_crds { | ||||
|         trace!("Verifying CRD presence: {crd}"); | ||||
|         let crd_exists = k8s.has_crd(crd).await.map_err(|e| { | ||||
|             InterpretError::new(format!("Failed to verify existence of CRD {crd}: {e}")) | ||||
|         })?; | ||||
| 
 | ||||
|         debug!("CRD {crd} exists: {crd_exists}"); | ||||
|         if !crd_exists { | ||||
|             info!( | ||||
|                 "Missing Argo CRD {crd}, looks like Argo CD is not installed (or partially installed)" | ||||
|             ); | ||||
|             has_crds = false; | ||||
|             break; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     trace!( | ||||
|         "Listing namespaces with healthy Argo CD deployments using selector app.kubernetes.io/part-of=argocd" | ||||
|     ); | ||||
|     let mut candidate_namespaces = k8s | ||||
|         .list_namespaces_with_healthy_deployments("app.kubernetes.io/part-of=argocd") | ||||
|         .await | ||||
|         .map_err(|e| InterpretError::new(format!("List healthy argocd deployments: {e}")))?; | ||||
|     trace!( | ||||
|         "Listing namespaces with healthy Argo CD deployments using selector app.kubernetes.io/name=argo-cd" | ||||
|     ); | ||||
|     candidate_namespaces.append( | ||||
|         &mut k8s | ||||
|             .list_namespaces_with_healthy_deployments("app.kubernetes.io/name=argo-cd") | ||||
|             .await | ||||
|             .map_err(|e| InterpretError::new(format!("List healthy argocd deployments: {e}")))?, | ||||
|     ); | ||||
| 
 | ||||
|     debug!( | ||||
|         "Discovered {} candidate namespace(s) for Argo CD: {:?}", | ||||
|         candidate_namespaces.len(), | ||||
|         candidate_namespaces | ||||
|     ); | ||||
| 
 | ||||
|     let mut found = Vec::new(); | ||||
|     for ns in candidate_namespaces { | ||||
|         trace!("Evaluating namespace '{ns}' for Argo CD instance"); | ||||
| 
 | ||||
|         // Require the application-controller to be healthy (sanity check)
 | ||||
|         trace!( | ||||
|             "Checking healthy deployment with label app.kubernetes.io/name=argocd-application-controller in namespace '{ns}'" | ||||
|         ); | ||||
|         let controller_ok = k8s | ||||
|             .has_healthy_deployment_with_label( | ||||
|                 &ns, | ||||
|                 "app.kubernetes.io/name=argocd-application-controller", | ||||
|             ) | ||||
|             .await | ||||
|             .unwrap_or_else(|e| { | ||||
|                 warn!( | ||||
|                     "Error while checking application-controller health in namespace '{ns}': {e}" | ||||
|                 ); | ||||
|                 false | ||||
|             }) || k8s | ||||
|             .has_healthy_deployment_with_label( | ||||
|                 &ns, | ||||
|                 "app.kubernetes.io/component=controller", | ||||
|             ) | ||||
|             .await | ||||
|             .unwrap_or_else(|e| { | ||||
|                 warn!( | ||||
|                     "Error while checking application-controller health in namespace '{ns}': {e}" | ||||
|                 ); | ||||
|                 false | ||||
|             }); | ||||
|         debug!("Namespace '{ns}': application-controller healthy = {controller_ok}"); | ||||
| 
 | ||||
|         if !controller_ok { | ||||
|             trace!("Skipping namespace '{ns}' because application-controller is not healthy"); | ||||
|             continue; | ||||
|         } | ||||
| 
 | ||||
|         trace!("Determining Argo CD scope for namespace '{ns}' (cluster-wide vs namespace-scoped)"); | ||||
|         let scope = match k8s.is_argocd_cluster_wide(&ns).await { | ||||
|             Ok(true) => { | ||||
|                 debug!("Namespace '{ns}' identified as cluster-wide Argo CD control plane"); | ||||
|                 ArgoScope::ClusterWide(ns.to_string()) | ||||
|             } | ||||
|             Ok(false) => { | ||||
|                 debug!("Namespace '{ns}' identified as namespace-scoped Argo CD control plane"); | ||||
|                 ArgoScope::NamespaceScoped(ns.to_string()) | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 warn!( | ||||
|                     "Failed to determine Argo CD scope for namespace '{ns}': {e}. Assuming namespace-scoped." | ||||
|                 ); | ||||
|                 ArgoScope::NamespaceScoped(ns.to_string()) | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         trace!("Checking optional ApplicationSet CRD (applicationsets.argoproj.io)"); | ||||
|         let has_applicationset = match k8s.has_crd("applicationsets.argoproj.io").await { | ||||
|             Ok(v) => { | ||||
|                 debug!("applicationsets.argoproj.io present: {v}"); | ||||
|                 v | ||||
|             } | ||||
|             Err(e) => { | ||||
|                 warn!("Failed to check applicationsets.argoproj.io CRD: {e}. Assuming absent."); | ||||
|                 false | ||||
|             } | ||||
|         }; | ||||
| 
 | ||||
|         let argo = DiscoveredArgo { | ||||
|             control_namespace: ns.clone(), | ||||
|             scope, | ||||
|             has_crds, | ||||
|             has_applicationset, | ||||
|         }; | ||||
| 
 | ||||
|         debug!("Discovered Argo instance in '{ns}': {argo:?}"); | ||||
|         found.push(argo); | ||||
|     } | ||||
| 
 | ||||
|     if found.is_empty() { | ||||
|         info!("No Argo CD installations discovered"); | ||||
|     } else { | ||||
|         info!( | ||||
|             "Argo CD discovery complete: {} instance(s) found", | ||||
|             found.len() | ||||
|         ); | ||||
|     } | ||||
| 
 | ||||
|     Ok(found) | ||||
| } | ||||
| 
 | ||||
| pub async fn detect_argo_deployment_type( | ||||
|     k8s: &Arc<K8sClient>, | ||||
|     desired_namespace: &str, | ||||
| ) -> Result<ArgoDeploymentType, InterpretError> { | ||||
|     let discovered = discover_argo_all(k8s).await?; | ||||
|     debug!("Discovered argo instances {discovered:?}"); | ||||
| 
 | ||||
|     if discovered.is_empty() { | ||||
|         return Ok(ArgoDeploymentType::NotInstalled); | ||||
|     } | ||||
| 
 | ||||
|     if let Some(d) = discovered | ||||
|         .iter() | ||||
|         .find(|d| d.control_namespace == desired_namespace) | ||||
|     { | ||||
|         return Ok(ArgoDeploymentType::AvailableInDesiredNamespace( | ||||
|             d.control_namespace.clone(), | ||||
|         )); | ||||
|     } | ||||
| 
 | ||||
|     if let Some(d) = discovered | ||||
|         .iter() | ||||
|         .find(|d| matches!(d.scope, ArgoScope::ClusterWide(_))) | ||||
|     { | ||||
|         return Ok(ArgoDeploymentType::InstalledClusterWide( | ||||
|             d.control_namespace.clone(), | ||||
|         )); | ||||
|     } | ||||
| 
 | ||||
|     Ok(ArgoDeploymentType::InstalledNamespaceScoped( | ||||
|         discovered[0].control_namespace.clone(), | ||||
|     )) | ||||
| } | ||||
|  | ||||
| @ -90,12 +90,12 @@ impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret { | ||||
|                             // refactoring to do it now
 | ||||
|                             let harmony_inventory_agent::hwinfo::PhysicalHost { | ||||
|                                 storage_drives, | ||||
|                                 storage_controller, | ||||
|                                 storage_controller: _, | ||||
|                                 memory_modules, | ||||
|                                 cpus, | ||||
|                                 chipset, | ||||
|                                 chipset: _, | ||||
|                                 network_interfaces, | ||||
|                                 management_interface, | ||||
|                                 management_interface: _, | ||||
|                                 host_uuid, | ||||
|                             } = host; | ||||
| 
 | ||||
|  | ||||
| @ -1,12 +1,8 @@ | ||||
| use std::collections::BTreeMap; | ||||
| 
 | ||||
| use kube::CustomResource; | ||||
| use schemars::JsonSchema; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| 
 | ||||
| use crate::modules::monitoring::kube_prometheus::crd::rhob_prometheuses::{ | ||||
|     LabelSelector, PrometheusSpec, | ||||
| }; | ||||
| use crate::modules::monitoring::kube_prometheus::crd::rhob_prometheuses::LabelSelector; | ||||
| 
 | ||||
| /// MonitoringStack CRD for monitoring.rhobs/v1alpha1
 | ||||
| #[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)] | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user