From 8959719375b0e74e88a4af682f7e35f466e7a968 Mon Sep 17 00:00:00 2001 From: wjro Date: Mon, 26 Jan 2026 16:21:26 -0500 Subject: [PATCH 01/10] wip: created decentralized topology, capability nats and nats super cluster --- Cargo.lock | 16 + harmony/src/domain/topology/decentralized.rs | 68 +++++ .../topology/k8s_anywhere/k8s_anywhere.rs | 76 ++++- .../src/domain/topology/k8s_anywhere/mod.rs | 1 + .../src/domain/topology/k8s_anywhere/nats.rs | 148 +++++++++ harmony/src/domain/topology/mod.rs | 1 + harmony/src/modules/mod.rs | 1 + harmony/src/modules/nats/capability.rs | 55 ++++ harmony/src/modules/nats/decentralized.rs | 281 ++++++++++++++++++ harmony/src/modules/nats/mod.rs | 4 + harmony/src/modules/nats/score_nats_k8s.rs | 163 ++++++++++ .../modules/nats/score_nats_supercluster.rs | 66 ++++ 12 files changed, 874 insertions(+), 6 deletions(-) create mode 100644 harmony/src/domain/topology/decentralized.rs create mode 100644 harmony/src/domain/topology/k8s_anywhere/nats.rs create mode 100644 harmony/src/modules/nats/capability.rs create mode 100644 harmony/src/modules/nats/decentralized.rs create mode 100644 harmony/src/modules/nats/mod.rs create mode 100644 harmony/src/modules/nats/score_nats_k8s.rs create mode 100644 harmony/src/modules/nats/score_nats_supercluster.rs diff --git a/Cargo.lock b/Cargo.lock index 4c39919f..750a4949 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1912,6 +1912,22 @@ dependencies = [ "url", ] +[[package]] +name = "example-nats-supercluster" +version = "0.1.0" +dependencies = [ + "cidr", + "env_logger", + "harmony", + "harmony_cli", + "harmony_macros", + "harmony_types", + "k8s-openapi", + "log", + "tokio", + "url", +] + [[package]] name = "example-ntfy" version = "0.1.0" diff --git a/harmony/src/domain/topology/decentralized.rs b/harmony/src/domain/topology/decentralized.rs new file mode 100644 index 00000000..64f0ba2b --- /dev/null +++ b/harmony/src/domain/topology/decentralized.rs @@ -0,0 +1,68 @@ +use async_trait::async_trait; + +use crate::{modules::nats::capability::{Nats, NatsCluster}, topology::{ + K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology, +}}; + +pub struct DecentralizedTopology { + pub site_1: T, + pub site_2: T, + pub site_3: T, +} + +#[async_trait] +impl Topology for DecentralizedTopology { + fn name(&self) -> &str { + "DecentralizedTopology" + } + + async fn ensure_ready(&self) -> Result { + let site_1_outcome = self.site_1.ensure_ready().await?; + let site_2_outcome = self.site_2.ensure_ready().await?; + let site_3_outcome = self.site_3.ensure_ready().await?; + + match (site_1_outcome, site_2_outcome, site_3_outcome) { + (PreparationOutcome::Noop, PreparationOutcome::Noop, PreparationOutcome::Noop) => { + Ok(PreparationOutcome::Noop) + } + (a, b, c) => { + let mut details = Vec::new(); + if let PreparationOutcome::Success { details: d } = a { + details.push(format!("Site_1: {}", d)); + } + + if let PreparationOutcome::Success { details: d } = b { + details.push(format!("Site_2: {}", d)); + } + if let PreparationOutcome::Success { details: d } = c { + details.push(format!("Site_3: {}", d)); + } + Ok(PreparationOutcome::Success { + details: details.join(","), + }) + } + } + } +} + +impl DecentralizedTopology { + pub fn from_env() -> Self { + let site_1_config = + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_1"); + let site_2_config = + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_2"); + let site_3_config = + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_3"); + + let site_1 = K8sAnywhereTopology::with_config(site_1_config); + let site_2 = K8sAnywhereTopology::with_config(site_2_config); + let site_3 = K8sAnywhereTopology::with_config(site_3_config); + + Self { + site_1, + site_2, + site_3, + } + } +} + diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index a10061d5..f87d71d1 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -3,9 +3,12 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration}; use async_trait::async_trait; use base64::{Engine, engine::general_purpose}; use harmony_types::rfc1123::Rfc1123Name; -use k8s_openapi::api::{ - core::v1::{Pod, Secret}, - rbac::v1::{ClusterRoleBinding, RoleRef, Subject}, +use k8s_openapi::{ + ByteString, + api::{ + core::v1::{Pod, Secret}, + rbac::v1::{ClusterRoleBinding, RoleRef, Subject}, + }, }; use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta}; use log::{debug, info, trace, warn}; @@ -28,7 +31,10 @@ use crate::{ score_cert_management::CertificateManagementScore, }, k3d::K3DInstallationScore, - k8s::ingress::{K8sIngressScore, PathType}, + k8s::{ + ingress::{K8sIngressScore, PathType}, + resource::K8sResourceScore, + }, monitoring::{ grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score}, kube_prometheus::crd::{ @@ -45,8 +51,8 @@ use crate::{ service_monitor::ServiceMonitor, }, }, - okd::crd::ingresses_config::Ingress as IngressResource, - okd::route::OKDTlsPassthroughScore, + nats::capability::NatsCluster, + okd::{crd::ingresses_config::Ingress as IngressResource, route::OKDTlsPassthroughScore}, prometheus::{ k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore, prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore, @@ -559,6 +565,64 @@ impl K8sAnywhereTopology { } } + async fn build_ca_bundle_secret( + &self, + namespace: &str, + nats_cluster: &NatsCluster, + bundle: &Vec, + ) -> Secret { + Secret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(namespace.to_string()), + ..Default::default() + }, + data: Some(self.build_secret_data(bundle).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + } + } + + async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + + pub async fn create_ca_bundle_secret( + &self, + nats_cluster: &NatsCluster, + ca_bundle: &Vec, + ) -> Result { + let bundle_secret = self + .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) + .await; + + debug!( + "deploying secret to ns: {} \nsecret: {:#?}", + nats_cluster.namespace, bundle_secret + ); + + let k8ssecret = + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); + + k8ssecret + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; + + Ok(Outcome::success(format!( + "successfully craeted ca bundle {} in ns: {}", + nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace + ))) + } + pub async fn certificate_issuer_ready( &self, issuer_name: String, diff --git a/harmony/src/domain/topology/k8s_anywhere/mod.rs b/harmony/src/domain/topology/k8s_anywhere/mod.rs index be870823..ca1c87c8 100644 --- a/harmony/src/domain/topology/k8s_anywhere/mod.rs +++ b/harmony/src/domain/topology/k8s_anywhere/mod.rs @@ -1,3 +1,4 @@ mod k8s_anywhere; mod postgres; +pub mod nats; pub use k8s_anywhere::*; diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs new file mode 100644 index 00000000..eae9f61b --- /dev/null +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -0,0 +1,148 @@ +use std::collections::BTreeMap; + +use async_trait::async_trait; +use k8s_openapi::{ByteString, api::core::v1::Secret}; +use kube::api::ObjectMeta; +use log::debug; + +use crate::{ + inventory::Inventory, + modules::{ + k8s::{ingress::K8sIngressScore, resource::K8sResourceScore}, + nats::{ + capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + score_nats_k8s::NatsK8sScore, + }, + okd::{ + crd::{ + ingresses_config::Ingress, + route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, + }, + route::OKDRouteScore, + }, + }, + score::Score, + topology::{K8sAnywhereTopology, K8sclient, KubernetesDistribution, TlsRouter, Topology}, +}; + +#[async_trait] +impl Nats for K8sAnywhereTopology { + async fn deploy( + &self, + nats_cluster: &NatsCluster, + peers: Option>, + ) -> Result { + let domain = self.get_nats_endpoint(nats_cluster).await?; + match self + .get_k8s_distribution() + .await + .map_err(|e| format!("Error getting k8s distribution : {}", e.to_string()))? + { + KubernetesDistribution::OpenshiftFamily => { + let route = OKDRouteScore { + name: nats_cluster.name.to_string(), + namespace: nats_cluster.namespace.to_string(), + spec: RouteSpec { + to: RouteTargetReference { + kind: "Service".to_string(), + name: nats_cluster.name.to_string(), + weight: Some(100), + }, + host: Some(nats_cluster.dns_name.clone()), + port: Some(RoutePort { target_port: 7222 }), + tls: Some(TLSConfig { + insecure_edge_termination_policy: None, + termination: "passthrough".to_string(), + ..Default::default() + }), + wildcard_policy: None, + ..Default::default() + }, + }; + route + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| format!("Failed to deploy OKD Route: {e}"))?; + } + KubernetesDistribution::K3sFamily => { + //TODO untested + K8sIngressScore { + name: todo!(), + host: todo!(), + backend_service: todo!(), + port: todo!(), + path: todo!(), + path_type: todo!(), + namespace: todo!(), + ingress_class_name: todo!(), + } + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; + } + KubernetesDistribution::Default => { + //TODO untested + K8sIngressScore { + name: todo!(), + host: todo!(), + backend_service: todo!(), + port: todo!(), + path: todo!(), + path_type: todo!(), + namespace: todo!(), + ingress_class_name: todo!(), + } + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; + } + } + + NatsK8sScore { + cluster: nats_cluster.clone(), + domain, + peers, + } + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| format!("Failed to deploy nats cluster: {}", e))?; + + Ok(format!( + "Nats cluster deployed in ns {}", + nats_cluster.namespace + )) + } + + async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { + let endpoint = self.get_internal_domain().await?.unwrap(); + + Ok(NatsEndpoint { host: endpoint }) + } + + async fn get_nats_ca_certificate( + &self, + nats_cluster: &NatsCluster, + ) -> Result, String> { + //TODO get individual certs for each cluster and return a vec that will be appended + //to the ca bundle vec + todo!() + } +} + +#[async_trait] +impl NatsSupercluster for K8sAnywhereTopology { + async fn create_ca_bundle( + &self, + nats_cluster: &NatsCluster, + ca_certs: Option>, + ) -> Result { + if let Some(ca_certs) = ca_certs { + self.create_ca_bundle_secret(nats_cluster, &ca_certs) + .await + .map_err(|e| format!("{}", e))?; + Ok("Successfully created ca bundle".to_string()) + } else { + Ok("No ca bundle certs provided".to_string()) + } + } +} diff --git a/harmony/src/domain/topology/mod.rs b/harmony/src/domain/topology/mod.rs index 42add5c7..c7d3da36 100644 --- a/harmony/src/domain/topology/mod.rs +++ b/harmony/src/domain/topology/mod.rs @@ -3,6 +3,7 @@ mod ha_cluster; pub mod ingress; pub mod node_exporter; pub mod opnsense; +pub mod decentralized; pub use failover::*; use harmony_types::net::IpAddress; mod host_binding; diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs index c845fb10..5601ba41 100644 --- a/harmony/src/modules/mod.rs +++ b/harmony/src/modules/mod.rs @@ -21,3 +21,4 @@ pub mod prometheus; pub mod storage; pub mod tenant; pub mod tftp; +pub mod nats; diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs new file mode 100644 index 00000000..18d840cb --- /dev/null +++ b/harmony/src/modules/nats/capability.rs @@ -0,0 +1,55 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use serde::Serialize; + +#[async_trait] +pub trait Nats { + async fn deploy( + &self, + nats_cluster: &NatsCluster, + peers: Option>, + ) -> Result; + + async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result; + + async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result, String>; +} + +#[async_trait] +pub trait NatsSupercluster: Nats { + async fn create_ca_bundle( + &self, + nats_cluster: &NatsCluster, + ca_certs: Option>, + ) -> Result; +} + +pub struct SiteContext { + pub topology: T, + pub cluster: NatsCluster, +} + +#[derive(Debug, Clone, Serialize)] +pub struct NatsCluster { + pub namespace: String, + pub domain: String, + pub replicas: usize, + pub name: String, + pub gateway_advertise: String, + pub dns_name: String, + pub supercluster_ca_secret_name: &'static str, + pub tls_cert_name: &'static str, + pub jetstream_enabled: &'static str, +} + +#[derive(Debug, Clone, Serialize)] +pub struct NatsEndpoint { + pub host: String, +} + +impl Display for NatsEndpoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "({})", self.host,) + } +} diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs new file mode 100644 index 00000000..0a305617 --- /dev/null +++ b/harmony/src/modules/nats/decentralized.rs @@ -0,0 +1,281 @@ +use async_trait::async_trait; + +use crate::{ + modules::nats::capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, +}; + +#[async_trait] +impl Nats for DecentralizedTopology { + async fn deploy( + &self, + nats_cluster: &NatsCluster, + peers: Option>, + ) -> Result { + let supercluster_ca_secret_name = nats_cluster.supercluster_ca_secret_name; + + let site_1_cluster_name = format!("{}-site-1", nats_cluster.name); + let site_1_domain = format!( + "{}-gw.{}", + site_1_cluster_name, + self.site_1.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_1_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_1_cluster_name, + gateway_advertise: format!("{}:443", site_1_domain), + dns_name: site_1_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_1_domain.clone(), + }; + + let site_2_cluster_name = format!("{}-site-2", nats_cluster.name); + let site_2_domain = format!( + "{}-gw.{}", + site_2_cluster_name, + self.site_2.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_2_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_2_cluster_name, + gateway_advertise: format!("{}:443", site_2_domain), + dns_name: site_2_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_2_domain.clone(), + }; + + let site_3_cluster_name = format!("{}-site-3", nats_cluster.name); + let site_3_domain = format!( + "{}-gw.{}", + site_3_cluster_name, + self.site_3.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_3_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_3_cluster_name, + gateway_advertise: format!("{}:443", site_3_domain), + dns_name: site_3_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_3_domain.clone(), + }; + + self.site_1 + .deploy( + &nats_site_1_cluster, + Some(vec![ + nats_site_2_cluster.clone(), + nats_site_3_cluster.clone(), + ]), + ) + .await?; + self.site_2 + .deploy( + &nats_site_2_cluster, + Some(vec![ + nats_site_1_cluster.clone(), + nats_site_3_cluster.clone(), + ]), + ) + .await?; + self.site_3 + .deploy( + &nats_site_3_cluster, + Some(vec![ + nats_site_1_cluster.clone(), + nats_site_2_cluster.clone(), + ]), + ) + .await?; + Ok(format!( + "Deployed Nats clusters across sites: \n1: {} \n2: {} \n3: {}", + nats_site_1_cluster.name, nats_site_2_cluster.name, nats_site_3_cluster.name + )) + } + + async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { + //This feels like a problem + todo!() + } + + async fn get_nats_ca_certificate( + &self, + nats_cluster: &NatsCluster, + ) -> Result, String> { + let supercluster_ca_secret_name = nats_cluster.supercluster_ca_secret_name; + + let site_1_cluster_name = format!("{}-site-1", nats_cluster.name); + let site_1_domain = format!( + "{}-gw.{}", + site_1_cluster_name, + self.site_1.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_1_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_1_cluster_name, + gateway_advertise: format!("{}:443", site_1_domain), + dns_name: site_1_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_1_domain.clone(), + }; + + let site_2_cluster_name = format!("{}-site-2", nats_cluster.name); + let site_2_domain = format!( + "{}-gw.{}", + site_2_cluster_name, + self.site_2.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_2_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_2_cluster_name, + gateway_advertise: format!("{}:443", site_2_domain), + dns_name: site_2_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_2_domain.clone(), + }; + + let site_3_cluster_name = format!("{}-site-3", nats_cluster.name); + let site_3_domain = format!( + "{}-gw.{}", + site_3_cluster_name, + self.site_3.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_3_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_3_cluster_name, + gateway_advertise: format!("{}:443", site_3_domain), + dns_name: site_3_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_3_domain.clone(), + }; + let mut ca_bundle = vec![]; + ca_bundle.append( + &mut self + .site_1 + .get_nats_ca_certificate(&nats_site_1_cluster) + .await?, + ); + ca_bundle.append( + &mut self + .site_2 + .get_nats_ca_certificate(&nats_site_2_cluster) + .await?, + ); + ca_bundle.append( + &mut self + .site_3 + .get_nats_ca_certificate(&nats_site_3_cluster) + .await?, + ); + Ok(ca_bundle) + } +} + +#[async_trait] +impl NatsSupercluster + for DecentralizedTopology +{ + async fn create_ca_bundle( + &self, + nats_cluster: &NatsCluster, + ca_certs: Option>, + ) -> Result { + let ca_bundle = self.get_nats_ca_certificate(nats_cluster).await?; + + let supercluster_ca_secret_name = nats_cluster.supercluster_ca_secret_name; + + let site_1_cluster_name = format!("{}-site-1", nats_cluster.name); + let site_1_domain = format!( + "{}-gw.{}", + site_1_cluster_name, + self.site_1.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_1_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_1_cluster_name, + gateway_advertise: format!("{}:443", site_1_domain), + dns_name: site_1_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_1_domain.clone(), + }; + + let site_2_cluster_name = format!("{}-site-2", nats_cluster.name); + let site_2_domain = format!( + "{}-gw.{}", + site_2_cluster_name, + self.site_2.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_2_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_2_cluster_name, + gateway_advertise: format!("{}:443", site_2_domain), + dns_name: site_2_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_2_domain.clone(), + }; + + let site_3_cluster_name = format!("{}-site-3", nats_cluster.name); + let site_3_domain = format!( + "{}-gw.{}", + site_3_cluster_name, + self.site_3.get_nats_endpoint(nats_cluster).await? + ); + + let nats_site_3_cluster = NatsCluster { + namespace: nats_cluster.namespace.clone(), + replicas: nats_cluster.replicas.clone(), + name: site_3_cluster_name, + gateway_advertise: format!("{}:443", site_3_domain), + dns_name: site_3_domain.clone(), + supercluster_ca_secret_name, + tls_cert_name: nats_cluster.tls_cert_name, + jetstream_enabled: nats_cluster.jetstream_enabled, + domain: site_3_domain.clone(), + }; + self.site_1 + .create_ca_bundle(&nats_site_1_cluster, Some(ca_bundle.clone())) + .await?; + self.site_2 + .create_ca_bundle(&nats_site_2_cluster, Some(ca_bundle.clone())) + .await?; + self.site_3 + .create_ca_bundle(&nats_site_3_cluster, Some(ca_bundle.clone())) + .await?; + + Ok(format!( + "Successfully deployed nats ca bundle accross clusters \n1 {}, \n2 {}, \n3 {}", + nats_site_1_cluster.name, nats_site_2_cluster.name, nats_site_2_cluster.name + )) + } +} diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs new file mode 100644 index 00000000..a67fa0c0 --- /dev/null +++ b/harmony/src/modules/nats/mod.rs @@ -0,0 +1,4 @@ +pub mod capability; +pub mod decentralized; +pub mod score_nats_supercluster; +pub mod score_nats_k8s; diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs new file mode 100644 index 00000000..4939bb1e --- /dev/null +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -0,0 +1,163 @@ +use std::str::FromStr; + +use harmony_macros::hurl; +use log::debug; +use non_blank_string_rs::NonBlankString; +use serde::Serialize; + +use crate::{ + interpret::Interpret, + modules::{ + helm::chart::{HelmChartScore, HelmRepository}, + nats::capability::{NatsCluster, NatsEndpoint}, + }, + score::Score, + topology::{HelmCommand, Topology}, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct NatsK8sScore { + pub cluster: NatsCluster, + pub domain: NatsEndpoint, + pub peers: Option>, +} + +impl Score for NatsK8sScore { + fn name(&self) -> String { + "NatsK8sScore".to_string() + } + + fn create_interpret(&self) -> Box> { + self.build_deploy_nats_score( + self.cluster.clone(), + self.domain.clone(), + self.peers.clone(), + self.cluster.namespace.clone(), + ) + .create_interpret() + } +} + +impl NatsK8sScore { + fn build_deploy_nats_score( + &self, + cluster: NatsCluster, + domain: NatsEndpoint, + peers: Option>, + namespace: String, + ) -> Box> { + let mut gateway_gateways = String::new(); + match peers { + Some(peers) => { + for peer in peers { + // Construct wss:// URLs on port 443 for the remote gateways + gateway_gateways.push_str(&format!( + r#" + - name: {} + urls: + - nats://{}"#, + peer.name, peer.gateway_advertise + )); + } + }, + None => debug!("no peers to push to gateway") , + } + + // Inject gateway config into the 'merge' block to comply with chart structure + let values_yaml = Some(format!( + r#"config: + merge: + authorization: + default_permissions: + publish: ["TEST.*"] + subscribe: ["PUBLIC.>"] + users: + # - user: "admin" + # password: "admin_1" + # permissions: + # publish: ">" + # subscribe: ">" + - password: "enGk0cgZUabM6bN6FXHT" + user: "testUser" + accounts: + system: + users: + - user: "admin" + password: "admin_2" + logtime: true + debug: true + trace: true + system_account: system + cluster: + name: {cluster_name} + enabled: true + replicas: {replicas} + jetstream: + enabled: {jetstream_enabled} + fileStorage: + enabled: true + size: 10Gi + storageDirectory: /data/jetstream + leafnodes: + enabled: false + websocket: + enabled: false + ingress: + enabled: true + className: openshift-default + pathType: Prefix + hosts: + - nats-ws.{domain} + gateway: + enabled: true + port: 7222 + name: {cluster_name} + merge: + advertise: {gateway_advertise} + gateways: {gateway_gateways} + tls: + enabled: true + secretName: {tls_secret_name} + # merge: + # ca_file: "/etc/nats-certs/gateway/ca.crt" +service: + ports: + gateway: + enabled: true +tlsCA: + enabled: true + secretName: {supercluster_ca_secret_name} +natsBox: + container: + image: + tag: nonroot"#, + cluster_name = cluster.name, + replicas = cluster.replicas, + domain = domain, + gateway_gateways = gateway_gateways, + gateway_advertise = cluster.gateway_advertise, + tls_secret_name = format!("{}-tls", cluster.tls_cert_name), + jetstream_enabled = cluster.jetstream_enabled, + supercluster_ca_secret_name = cluster.supercluster_ca_secret_name, + )); + + debug!("Prepared Helm Chart values : \n{values_yaml:#?}"); + let nats = HelmChartScore { + namespace: Some(NonBlankString::from_str(&namespace).unwrap()), + release_name: NonBlankString::from_str(&cluster.name).unwrap(), + chart_name: NonBlankString::from_str("nats/nats").unwrap(), + chart_version: None, + values_overrides: None, + values_yaml, + create_namespace: true, + install_only: false, + repository: Some(HelmRepository::new( + "nats".to_string(), + hurl!("https://nats-io.github.io/k8s/helm/charts/"), + true, + )), + }; + + Box::new(nats) + } +} diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs new file mode 100644 index 00000000..3e3cb1a3 --- /dev/null +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -0,0 +1,66 @@ +use async_trait::async_trait; +use harmony_types::id::Id; +use serde::Serialize; + +use crate::{ + data::Version, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, modules::nats::capability::{NatsCluster, NatsSupercluster}, score::Score, topology::Topology +}; + +#[derive(Debug, Clone, Serialize)] +pub struct NatsSuperclusterScore { + nats_cluster: NatsCluster, + peers: Option>, +} + +impl Score for NatsSuperclusterScore { + fn name(&self) -> String { + "NatsSuperclusterScore".to_string() + } + + fn create_interpret(&self) -> Box> { + Box::new(NatsSuperclusterInterpret { + nats_cluster: self.nats_cluster.clone(), + peers: self.peers.clone(), + }) + } +} + +#[derive(Debug)] +pub struct NatsSuperclusterInterpret { + nats_cluster: NatsCluster, + peers: Option>, +} + +#[async_trait] +impl Interpret for NatsSuperclusterInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + let cluster_name = topology + .deploy(&self.nats_cluster, self.peers.clone()) + .await + .map_err(|e| InterpretError::new(e))?; + Ok(Outcome::success(format!( + "Nats cluster '{}' deployed in namespace '{}'", + cluster_name, self.nats_cluster.namespace + ))) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("NatsSuperclusterInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} -- 2.39.5 From 92d4e3488a42cd8f7c48b610e50ab5d8524154c0 Mon Sep 17 00:00:00 2001 From: wjro Date: Wed, 28 Jan 2026 09:43:43 -0500 Subject: [PATCH 02/10] refactor: modified struct to accept N sites and N clusters --- examples/nats-module/Cargo.toml | 19 ++ examples/nats-module/src/main.rs | 78 +++++ harmony/src/domain/topology/decentralized.rs | 72 ++--- harmony/src/domain/topology/k8s.rs | 2 +- .../topology/k8s_anywhere/k8s_anywhere.rs | 69 +++-- .../src/domain/topology/k8s_anywhere/mod.rs | 2 +- .../src/domain/topology/k8s_anywhere/nats.rs | 117 +++++-- harmony/src/domain/topology/mod.rs | 2 +- harmony/src/modules/mod.rs | 2 +- harmony/src/modules/nats/capability.rs | 6 +- harmony/src/modules/nats/decentralized.rs | 288 +++--------------- harmony/src/modules/nats/mod.rs | 2 +- harmony/src/modules/nats/score_nats_k8s.rs | 4 +- .../modules/nats/score_nats_supercluster.rs | 38 ++- 14 files changed, 339 insertions(+), 362 deletions(-) create mode 100644 examples/nats-module/Cargo.toml create mode 100644 examples/nats-module/src/main.rs diff --git a/examples/nats-module/Cargo.toml b/examples/nats-module/Cargo.toml new file mode 100644 index 00000000..73be5c70 --- /dev/null +++ b/examples/nats-module/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "example-nats-module-supercluster" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +publish = false + +[dependencies] +harmony = { path = "../../harmony" } +harmony_cli = { path = "../../harmony_cli" } +harmony_types = { path = "../../harmony_types" } +cidr = { workspace = true } +tokio = { workspace = true } +harmony_macros = { path = "../../harmony_macros" } +log = { workspace = true } +env_logger = { workspace = true } +url = { workspace = true } +k8s-openapi.workspace = true diff --git a/examples/nats-module/src/main.rs b/examples/nats-module/src/main.rs new file mode 100644 index 00000000..99ef46aa --- /dev/null +++ b/examples/nats-module/src/main.rs @@ -0,0 +1,78 @@ +use harmony::{ + inventory::Inventory, + modules::nats::{capability::NatsCluster, score_nats_supercluster::NatsSuperclusterScore}, + topology::{K8sAnywhereTopology, decentralized::DecentralizedTopology}, +}; + +#[tokio::main] +async fn main() { + let supercluster_ca_secret_name = "nats-supercluster-ca-bundle"; + let tls_cert_name = "nats-gateway"; + let jetstream_enabled = "false"; + let nats_namespace = "nats-2".to_string(); + + let site_1_name = "site-1".to_string(); + let site_1_domain = + std::env::var("HARMONY_NATS_SITE_1_DOMAIN").expect("missing domain in env for site_1"); + + let nats_site_1 = NatsCluster { + namespace: nats_namespace.clone(), + domain: site_1_domain.clone(), + replicas: 1, + name: site_1_name.clone(), + gateway_advertise: format!("{site_1_name}-gw.{site_1_domain}:443"), + dns_name: format!("{site_1_name}-gw.{site_1_domain}"), + supercluster_ca_secret_name: supercluster_ca_secret_name, + tls_cert_name: tls_cert_name, + jetstream_enabled: jetstream_enabled, + }; + + let site_2_name = "site-2".to_string(); + let site_2_domain = + std::env::var("HARMONY_NATS_SITE_2_DOMAIN").expect("missing domain in env for site_2"); + + let nats_site_2 = NatsCluster { + namespace: nats_namespace.clone(), + domain: site_2_domain.clone(), + replicas: 1, + name: site_2_name.clone(), + gateway_advertise: format!("{site_2_name}-gw.{site_2_domain}:443"), + dns_name: format!("{site_2_name}-gw.{site_2_domain}"), + supercluster_ca_secret_name: supercluster_ca_secret_name, + tls_cert_name: tls_cert_name, + jetstream_enabled: jetstream_enabled, + }; + + let site_3_name = "site-3".to_string(); + let site_3_domain = + std::env::var("HARMONY_NATS_SITE_3_DOMAIN").expect("missing domain in env for site_3"); + + let nats_site_3 = NatsCluster { + namespace: nats_namespace.clone(), + domain: site_3_domain.clone(), + replicas: 1, + name: site_3_name.clone(), + gateway_advertise: format!("{site_3_name}-gw.{site_3_domain}:443"), + dns_name: format!("{site_3_name}-gw.{site_3_domain}"), + supercluster_ca_secret_name: supercluster_ca_secret_name, + tls_cert_name: tls_cert_name, + jetstream_enabled: jetstream_enabled, + }; + + let clusters = vec![nats_site_1, nats_site_2]; + // let clusters = vec![nats_site_1, nats_site_2, nats_site_3]; + + let nats_supercluster = NatsSuperclusterScore { + nats_cluster: clusters, + ca_certs: None, + }; + + harmony_cli::run( + Inventory::autoload(), + DecentralizedTopology::::from_env(), + vec![Box::new(nats_supercluster)], + None, + ) + .await + .unwrap(); +} diff --git a/harmony/src/domain/topology/decentralized.rs b/harmony/src/domain/topology/decentralized.rs index 64f0ba2b..edf957e1 100644 --- a/harmony/src/domain/topology/decentralized.rs +++ b/harmony/src/domain/topology/decentralized.rs @@ -1,13 +1,14 @@ use async_trait::async_trait; +use log::info; -use crate::{modules::nats::capability::{Nats, NatsCluster}, topology::{ - K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology, -}}; +use crate:: + topology::{ + K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology, + } +; pub struct DecentralizedTopology { - pub site_1: T, - pub site_2: T, - pub site_3: T, + pub sites: Vec, } #[async_trait] @@ -17,52 +18,43 @@ impl Topology for DecentralizedTopology { } async fn ensure_ready(&self) -> Result { - let site_1_outcome = self.site_1.ensure_ready().await?; - let site_2_outcome = self.site_2.ensure_ready().await?; - let site_3_outcome = self.site_3.ensure_ready().await?; + let mut details = Vec::new(); - match (site_1_outcome, site_2_outcome, site_3_outcome) { - (PreparationOutcome::Noop, PreparationOutcome::Noop, PreparationOutcome::Noop) => { - Ok(PreparationOutcome::Noop) - } - (a, b, c) => { - let mut details = Vec::new(); - if let PreparationOutcome::Success { details: d } = a { - details.push(format!("Site_1: {}", d)); + for site in &self.sites { + let outcome = site.ensure_ready().await?; + match outcome { + PreparationOutcome::Success { details: d } => { + details.push(d); } - - if let PreparationOutcome::Success { details: d } = b { - details.push(format!("Site_2: {}", d)); + PreparationOutcome::Noop => { + details.push("site ready Noop".to_string()); + info!("site ready"); } - if let PreparationOutcome::Success { details: d } = c { - details.push(format!("Site_3: {}", d)); - } - Ok(PreparationOutcome::Success { - details: details.join(","), - }) } } + + Ok(PreparationOutcome::Success { + details: details.join(","), + }) } } impl DecentralizedTopology { pub fn from_env() -> Self { - let site_1_config = - K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_1"); - let site_2_config = - K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_2"); - let site_3_config = - K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_3"); + let mut sites = Vec::new(); - let site_1 = K8sAnywhereTopology::with_config(site_1_config); - let site_2 = K8sAnywhereTopology::with_config(site_2_config); - let site_3 = K8sAnywhereTopology::with_config(site_3_config); + for i in 1.. { + let var = format!("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_{}", i); - Self { - site_1, - site_2, - site_3, + match std::env::var(&var) { + Ok(_) => { + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var(&var); + sites.push(K8sAnywhereTopology::with_config(cfg)); + } + Err(_) => break, + } } + + Self { sites } } } - diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index f4595ee4..5738c68b 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -19,7 +19,7 @@ use kube::{ core::{DynamicResourceScope, ErrorResponse}, discovery::{ApiCapabilities, Scope}, error::DiscoveryError, - runtime::reflector::Lookup, + runtime::{reflector::Lookup, wait::Condition}, }; use kube::{api::DynamicObject, runtime::conditions}; use kube::{ diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index f87d71d1..d974431e 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -10,10 +10,13 @@ use k8s_openapi::{ rbac::v1::{ClusterRoleBinding, RoleRef, Subject}, }, }; -use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta}; +use kube::{ + api::{DynamicObject, GroupVersionKind, ObjectMeta}, + runtime::conditions, +}; use log::{debug, info, trace, warn}; use serde::Serialize; -use tokio::sync::OnceCell; +use tokio::{sync::OnceCell, time::{Instant, sleep}}; use crate::{ executors::ExecutorError, @@ -507,41 +510,45 @@ impl CertificateManagement for K8sAnywhereTopology { { let secret_name = certificate.spec.secret_name.clone(); - trace!("Secret Name {:#?}", secret_name); - if let Some(secret) = client - .get_resource::(&secret_name, Some(&namespace)) - .await - .map_err(|e| { - ExecutorError::UnexpectedError(format!( - "secret {} not found in namespace {}: {}", - secret_name, namespace, e - )) - })? - { - let ca_cert = secret - .data - .as_ref() - .and_then(|d| d.get("ca.crt")) - .ok_or_else(|| { - ExecutorError::UnexpectedError("Secret missing key 'ca.crt'".into()) - })?; + let deadline = Instant::now() + tokio::time::Duration::from_secs(120); - let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| { - ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into()) - })?; + loop { + if Instant::now() > deadline { + return Err(ExecutorError::UnexpectedError(format!( + "Timed out waiting for Secret {} to contain ca.crt", + secret_name + ))); + } - return Ok(ca_cert); - } else { - Err(ExecutorError::UnexpectedError(format!( - "Error getting secret associated with cert_name: {}", - cert_name - ))) + debug!("Secret Name {:#?}", secret_name); + + if let Some(secret) = client + .get_resource::(&secret_name, Some(&namespace)) + .await + .map_err(|e| { + ExecutorError::UnexpectedError(format!( + "secret {} not found in namespace {}: {}", + secret_name, namespace, e + )) + })? + { + if let Some(ca_cert) = secret.data.as_ref().and_then(|d| d.get("ca.crt")) { + let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| { + ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into()) + })?; + + return Ok(ca_cert); + } + } + + // Secret not ready yet → wait and retry + sleep(Duration::from_millis(500)).await; } } else { - return Err(ExecutorError::UnexpectedError(format!( + Err(ExecutorError::UnexpectedError(format!( "Certificate {} not found in namespace {}", cert_name, namespace - ))); + ))) } } } diff --git a/harmony/src/domain/topology/k8s_anywhere/mod.rs b/harmony/src/domain/topology/k8s_anywhere/mod.rs index ca1c87c8..d14c28ba 100644 --- a/harmony/src/domain/topology/k8s_anywhere/mod.rs +++ b/harmony/src/domain/topology/k8s_anywhere/mod.rs @@ -1,4 +1,4 @@ mod k8s_anywhere; -mod postgres; pub mod nats; +mod postgres; pub use k8s_anywhere::*; diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index eae9f61b..e64306fc 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,28 +1,25 @@ -use std::collections::BTreeMap; - use async_trait::async_trait; -use k8s_openapi::{ByteString, api::core::v1::Secret}; -use kube::api::ObjectMeta; -use log::debug; +use log::{debug, info}; use crate::{ inventory::Inventory, modules::{ - k8s::{ingress::K8sIngressScore, resource::K8sResourceScore}, + cert_manager::{ + capability::{CertificateManagement, CertificateManagementConfig}, + crd::CaIssuer, + }, + k8s::ingress::K8sIngressScore, nats::{ capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, score_nats_k8s::NatsK8sScore, }, okd::{ - crd::{ - ingresses_config::Ingress, - route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, - }, + crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, route::OKDRouteScore, }, }, score::Score, - topology::{K8sAnywhereTopology, K8sclient, KubernetesDistribution, TlsRouter, Topology}, + topology::{K8sAnywhereTopology, KubernetesDistribution, TlsRouter}, }; #[async_trait] @@ -119,27 +116,103 @@ impl Nats for K8sAnywhereTopology { Ok(NatsEndpoint { host: endpoint }) } - async fn get_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result, String> { - //TODO get individual certs for each cluster and return a vec that will be appended - //to the ca bundle vec - todo!() + async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result { + let ca_issuer_name = "harmony-ca-issuer".to_string(); + let root_ca_cert_name = "harmony-root-ca".to_string(); + let self_signed_issuer_name = "harmony-self-signed-issuer".to_string(); + + debug!("creating self signed issuer config"); + let self_signed_config = CertificateManagementConfig { + namespace: Some(nats_cluster.namespace.to_string().clone()), + acme_issuer: None, + ca_issuer: None, + self_signed: true, + }; + + debug!("create ca issuer config"); + let root_ca_config = CertificateManagementConfig { + namespace: Some(nats_cluster.namespace.clone().into()), + acme_issuer: None, + ca_issuer: Some(CaIssuer { + secret_name: format!("{}-tls", root_ca_cert_name), + }), + self_signed: false, + }; + + debug!( + "creating harmony self signed issuer {}", + self_signed_issuer_name + ); + self.create_issuer(self_signed_issuer_name.clone(), &self_signed_config) + .await + .map_err(|e| format!("{}", e))?; + + debug!( + "creating root ca cert from self signed issuer {}", + &root_ca_cert_name + ); + self.create_certificate( + root_ca_cert_name.clone(), + self_signed_issuer_name.clone(), + Some(format!("harmony-{}-ca", nats_cluster.name)), + None, + Some(true), + &root_ca_config, + ) + .await + .map_err(|e| format!("{}", e))?; + + debug!("creating root_ca_issuer {}", &ca_issuer_name); + self.create_issuer(ca_issuer_name.clone(), &root_ca_config) + .await + .map_err(|e| format!("{e}"))?; + + debug!( + "creating tls cert for nats gateway {}", + nats_cluster.tls_cert_name + ); + self.create_certificate( + nats_cluster.tls_cert_name.into(), + ca_issuer_name, + None, + Some(vec![nats_cluster.dns_name.clone()]), + Some(true), + &root_ca_config, + ) + .await + .map_err(|e| format!("{e}"))?; + + info!( + "Successfully created nats cluster certificates in cluster {}", + nats_cluster.name + ); + + let ca_cert = self + .get_ca_certificate(root_ca_cert_name, &root_ca_config) + .await + .map_err(|e| format!("{e}"))?; + + Ok(ca_cert) } } #[async_trait] impl NatsSupercluster for K8sAnywhereTopology { + async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { + todo!() + } + async fn create_ca_bundle( &self, - nats_cluster: &NatsCluster, + nats_clusters: Vec, ca_certs: Option>, ) -> Result { if let Some(ca_certs) = ca_certs { - self.create_ca_bundle_secret(nats_cluster, &ca_certs) - .await - .map_err(|e| format!("{}", e))?; + for cluster in nats_clusters { + self.create_ca_bundle_secret(&cluster, &ca_certs) + .await + .map_err(|e| format!("{}", e))?; + } Ok("Successfully created ca bundle".to_string()) } else { Ok("No ca bundle certs provided".to_string()) diff --git a/harmony/src/domain/topology/mod.rs b/harmony/src/domain/topology/mod.rs index c7d3da36..28161fcf 100644 --- a/harmony/src/domain/topology/mod.rs +++ b/harmony/src/domain/topology/mod.rs @@ -1,9 +1,9 @@ +pub mod decentralized; mod failover; mod ha_cluster; pub mod ingress; pub mod node_exporter; pub mod opnsense; -pub mod decentralized; pub use failover::*; use harmony_types::net::IpAddress; mod host_binding; diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs index 5601ba41..3fa69469 100644 --- a/harmony/src/modules/mod.rs +++ b/harmony/src/modules/mod.rs @@ -13,6 +13,7 @@ pub mod k8s; pub mod lamp; pub mod load_balancer; pub mod monitoring; +pub mod nats; pub mod network; pub mod okd; pub mod opnsense; @@ -21,4 +22,3 @@ pub mod prometheus; pub mod storage; pub mod tenant; pub mod tftp; -pub mod nats; diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index 18d840cb..fc390042 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -13,14 +13,16 @@ pub trait Nats { async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result; - async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result, String>; + async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result; } #[async_trait] pub trait NatsSupercluster: Nats { + async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result; + async fn create_ca_bundle( &self, - nats_cluster: &NatsCluster, + nats_clusters: Vec, ca_certs: Option>, ) -> Result; } diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index 0a305617..c1192510 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -1,4 +1,5 @@ use async_trait::async_trait; +use log::debug; use crate::{ modules::nats::capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, @@ -12,96 +13,7 @@ impl Nats for DecentralizedTopology nats_cluster: &NatsCluster, peers: Option>, ) -> Result { - let supercluster_ca_secret_name = nats_cluster.supercluster_ca_secret_name; - - let site_1_cluster_name = format!("{}-site-1", nats_cluster.name); - let site_1_domain = format!( - "{}-gw.{}", - site_1_cluster_name, - self.site_1.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_1_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_1_cluster_name, - gateway_advertise: format!("{}:443", site_1_domain), - dns_name: site_1_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_1_domain.clone(), - }; - - let site_2_cluster_name = format!("{}-site-2", nats_cluster.name); - let site_2_domain = format!( - "{}-gw.{}", - site_2_cluster_name, - self.site_2.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_2_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_2_cluster_name, - gateway_advertise: format!("{}:443", site_2_domain), - dns_name: site_2_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_2_domain.clone(), - }; - - let site_3_cluster_name = format!("{}-site-3", nats_cluster.name); - let site_3_domain = format!( - "{}-gw.{}", - site_3_cluster_name, - self.site_3.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_3_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_3_cluster_name, - gateway_advertise: format!("{}:443", site_3_domain), - dns_name: site_3_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_3_domain.clone(), - }; - - self.site_1 - .deploy( - &nats_site_1_cluster, - Some(vec![ - nats_site_2_cluster.clone(), - nats_site_3_cluster.clone(), - ]), - ) - .await?; - self.site_2 - .deploy( - &nats_site_2_cluster, - Some(vec![ - nats_site_1_cluster.clone(), - nats_site_3_cluster.clone(), - ]), - ) - .await?; - self.site_3 - .deploy( - &nats_site_3_cluster, - Some(vec![ - nats_site_1_cluster.clone(), - nats_site_2_cluster.clone(), - ]), - ) - .await?; - Ok(format!( - "Deployed Nats clusters across sites: \n1: {} \n2: {} \n3: {}", - nats_site_1_cluster.name, nats_site_2_cluster.name, nats_site_3_cluster.name - )) + todo!() } async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { @@ -109,88 +21,8 @@ impl Nats for DecentralizedTopology todo!() } - async fn get_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result, String> { - let supercluster_ca_secret_name = nats_cluster.supercluster_ca_secret_name; - - let site_1_cluster_name = format!("{}-site-1", nats_cluster.name); - let site_1_domain = format!( - "{}-gw.{}", - site_1_cluster_name, - self.site_1.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_1_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_1_cluster_name, - gateway_advertise: format!("{}:443", site_1_domain), - dns_name: site_1_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_1_domain.clone(), - }; - - let site_2_cluster_name = format!("{}-site-2", nats_cluster.name); - let site_2_domain = format!( - "{}-gw.{}", - site_2_cluster_name, - self.site_2.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_2_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_2_cluster_name, - gateway_advertise: format!("{}:443", site_2_domain), - dns_name: site_2_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_2_domain.clone(), - }; - - let site_3_cluster_name = format!("{}-site-3", nats_cluster.name); - let site_3_domain = format!( - "{}-gw.{}", - site_3_cluster_name, - self.site_3.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_3_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_3_cluster_name, - gateway_advertise: format!("{}:443", site_3_domain), - dns_name: site_3_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_3_domain.clone(), - }; - let mut ca_bundle = vec![]; - ca_bundle.append( - &mut self - .site_1 - .get_nats_ca_certificate(&nats_site_1_cluster) - .await?, - ); - ca_bundle.append( - &mut self - .site_2 - .get_nats_ca_certificate(&nats_site_2_cluster) - .await?, - ); - ca_bundle.append( - &mut self - .site_3 - .get_nats_ca_certificate(&nats_site_3_cluster) - .await?, - ); - Ok(ca_bundle) + async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result { + todo!() } } @@ -198,84 +30,46 @@ impl Nats for DecentralizedTopology impl NatsSupercluster for DecentralizedTopology { - async fn create_ca_bundle( - &self, - nats_cluster: &NatsCluster, - ca_certs: Option>, - ) -> Result { - let ca_bundle = self.get_nats_ca_certificate(nats_cluster).await?; + async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { + for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { + let peers: Vec = nats_clusters + .iter() + .enumerate() + .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) + .collect(); - let supercluster_ca_secret_name = nats_cluster.supercluster_ca_secret_name; - - let site_1_cluster_name = format!("{}-site-1", nats_cluster.name); - let site_1_domain = format!( - "{}-gw.{}", - site_1_cluster_name, - self.site_1.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_1_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_1_cluster_name, - gateway_advertise: format!("{}:443", site_1_domain), - dns_name: site_1_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_1_domain.clone(), - }; - - let site_2_cluster_name = format!("{}-site-2", nats_cluster.name); - let site_2_domain = format!( - "{}-gw.{}", - site_2_cluster_name, - self.site_2.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_2_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_2_cluster_name, - gateway_advertise: format!("{}:443", site_2_domain), - dns_name: site_2_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_2_domain.clone(), - }; - - let site_3_cluster_name = format!("{}-site-3", nats_cluster.name); - let site_3_domain = format!( - "{}-gw.{}", - site_3_cluster_name, - self.site_3.get_nats_endpoint(nats_cluster).await? - ); - - let nats_site_3_cluster = NatsCluster { - namespace: nats_cluster.namespace.clone(), - replicas: nats_cluster.replicas.clone(), - name: site_3_cluster_name, - gateway_advertise: format!("{}:443", site_3_domain), - dns_name: site_3_domain.clone(), - supercluster_ca_secret_name, - tls_cert_name: nats_cluster.tls_cert_name, - jetstream_enabled: nats_cluster.jetstream_enabled, - domain: site_3_domain.clone(), - }; - self.site_1 - .create_ca_bundle(&nats_site_1_cluster, Some(ca_bundle.clone())) - .await?; - self.site_2 - .create_ca_bundle(&nats_site_2_cluster, Some(ca_bundle.clone())) - .await?; - self.site_3 - .create_ca_bundle(&nats_site_3_cluster, Some(ca_bundle.clone())) - .await?; + site.deploy(cluster, Some(peers)).await?; + } Ok(format!( - "Successfully deployed nats ca bundle accross clusters \n1 {}, \n2 {}, \n3 {}", - nats_site_1_cluster.name, nats_site_2_cluster.name, nats_site_2_cluster.name + "Deployed Nats clusters across sites", + )) + } + + async fn create_ca_bundle( + &self, + nats_clusters: Vec, + ca_certs: Option>, + ) -> Result { + debug!("extracting nats clusters"); + let mut ca_bundle = Vec::new(); + for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { + let ca_cert = site.get_nats_ca_certificate(&cluster).await?; + ca_bundle.push(ca_cert); + } + + if let Some(mut certs) = ca_certs { + ca_bundle.append(&mut certs) + } + + debug!("creating ca bundle {:#?} on site 1", ca_bundle); + + for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { + site.create_ca_bundle(vec![cluster.clone()], Some(ca_bundle.clone())).await?; + } + + Ok(format!( + "Successfully deployed nats ca bundle accross clusters", )) } } diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs index a67fa0c0..1822f1f5 100644 --- a/harmony/src/modules/nats/mod.rs +++ b/harmony/src/modules/nats/mod.rs @@ -1,4 +1,4 @@ pub mod capability; pub mod decentralized; -pub mod score_nats_supercluster; pub mod score_nats_k8s; +pub mod score_nats_supercluster; diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index 4939bb1e..6a6813df 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -59,8 +59,8 @@ impl NatsK8sScore { peer.name, peer.gateway_advertise )); } - }, - None => debug!("no peers to push to gateway") , + } + None => debug!("no peers to push to gateway"), } // Inject gateway config into the 'merge' block to comply with chart structure diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs index 3e3cb1a3..2f31c464 100644 --- a/harmony/src/modules/nats/score_nats_supercluster.rs +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -1,15 +1,21 @@ use async_trait::async_trait; use harmony_types::id::Id; +use log::info; use serde::Serialize; use crate::{ - data::Version, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, modules::nats::capability::{NatsCluster, NatsSupercluster}, score::Score, topology::Topology + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + modules::nats::capability::{NatsCluster, NatsSupercluster}, + score::Score, + topology::Topology, }; #[derive(Debug, Clone, Serialize)] pub struct NatsSuperclusterScore { - nats_cluster: NatsCluster, - peers: Option>, + pub nats_cluster: Vec, + pub ca_certs: Option>, } impl Score for NatsSuperclusterScore { @@ -19,16 +25,16 @@ impl Score for NatsSuperclusterScore { fn create_interpret(&self) -> Box> { Box::new(NatsSuperclusterInterpret { - nats_cluster: self.nats_cluster.clone(), - peers: self.peers.clone(), + nats_clusters: self.nats_cluster.clone(), + ca_certs: self.ca_certs.clone(), }) } } #[derive(Debug)] pub struct NatsSuperclusterInterpret { - nats_cluster: NatsCluster, - peers: Option>, + nats_clusters: Vec, + ca_certs: Option>, } #[async_trait] @@ -38,14 +44,20 @@ impl Interpret for NatsSuperclusterInterpret _inventory: &Inventory, topology: &T, ) -> Result { - let cluster_name = topology - .deploy(&self.nats_cluster, self.peers.clone()) + + info!("creating nats supercluster ca bundle"); + topology + .create_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) .await .map_err(|e| InterpretError::new(e))?; - Ok(Outcome::success(format!( - "Nats cluster '{}' deployed in namespace '{}'", - cluster_name, self.nats_cluster.namespace - ))) + + info!("deploying nats supercluster"); + let cluster_name = topology + .deploy_supercluster(self.nats_clusters.clone()) + .await + .map_err(|e| InterpretError::new(e))?; + + Ok(Outcome::success(format!("Nats cluster '{}'", cluster_name))) } fn get_name(&self) -> InterpretName { -- 2.39.5 From 00d4b9de73b3b0149be52d4f906839464d5621be Mon Sep 17 00:00:00 2001 From: wjro Date: Wed, 28 Jan 2026 14:25:24 -0500 Subject: [PATCH 03/10] fix: added fullnameOverride to helmchart score values so that the helm deployed cluster name and service name match the cluster name defined in the NatsK8sScore, without this field helm appends -nats to svc and cluster name, breaking the tls chain --- harmony/src/modules/nats/score_nats_k8s.rs | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index 6a6813df..bee4b43e 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -65,7 +65,9 @@ impl NatsK8sScore { // Inject gateway config into the 'merge' block to comply with chart structure let values_yaml = Some(format!( - r#"config: + r#" +fullnameOverride: {cluster_name} +config: merge: authorization: default_permissions: -- 2.39.5 From edf94554b8d56cebbf01169f47c4a7ac84a01f76 Mon Sep 17 00:00:00 2001 From: wjro Date: Wed, 28 Jan 2026 14:43:50 -0500 Subject: [PATCH 04/10] fix: formating and example env variables --- Cargo.lock | 16 ++++++++++++++++ examples/nats-module/env_example.sh | 9 +++++++++ examples/nats-module/src/main.rs | 5 ++--- harmony/src/domain/topology/decentralized.rs | 8 +++----- .../domain/topology/k8s_anywhere/k8s_anywhere.rs | 5 ++++- harmony/src/domain/topology/k8s_anywhere/nats.rs | 4 ++-- harmony/src/modules/nats/decentralized.rs | 7 +++---- .../src/modules/nats/score_nats_supercluster.rs | 1 - 8 files changed, 39 insertions(+), 16 deletions(-) create mode 100644 examples/nats-module/env_example.sh diff --git a/Cargo.lock b/Cargo.lock index 750a4949..ba9fa528 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1912,6 +1912,22 @@ dependencies = [ "url", ] +[[package]] +name = "example-nats-module-supercluster" +version = "0.1.0" +dependencies = [ + "cidr", + "env_logger", + "harmony", + "harmony_cli", + "harmony_macros", + "harmony_types", + "k8s-openapi", + "log", + "tokio", + "url", +] + [[package]] name = "example-nats-supercluster" version = "0.1.0" diff --git a/examples/nats-module/env_example.sh b/examples/nats-module/env_example.sh new file mode 100644 index 00000000..7d3fee03 --- /dev/null +++ b/examples/nats-module/env_example.sh @@ -0,0 +1,9 @@ +# Cluster 1 +export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_1="kubeconfig=$HOME/.kube/config,context=cluster-context" +export HARMONY_NATS_SITE_1_DOMAIN="your.domain.1" +# Cluster 2 +export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_2="kubeconfig=$HOME/.kube/config,context=cluster-context" +export HARMONY_NATS_SITE_2_DOMAIN="your.domain.2" +# Cluster 3 +export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_3="kubeconfig=$HOME/.kube/config,context=cluster-context" +export HARMONY_NATS_SITE_3_DOMAIN="your.domain.3" diff --git a/examples/nats-module/src/main.rs b/examples/nats-module/src/main.rs index 99ef46aa..2b2b1d8c 100644 --- a/examples/nats-module/src/main.rs +++ b/examples/nats-module/src/main.rs @@ -9,7 +9,7 @@ async fn main() { let supercluster_ca_secret_name = "nats-supercluster-ca-bundle"; let tls_cert_name = "nats-gateway"; let jetstream_enabled = "false"; - let nats_namespace = "nats-2".to_string(); + let nats_namespace = "nats-example".to_string(); let site_1_name = "site-1".to_string(); let site_1_domain = @@ -59,8 +59,7 @@ async fn main() { jetstream_enabled: jetstream_enabled, }; - let clusters = vec![nats_site_1, nats_site_2]; - // let clusters = vec![nats_site_1, nats_site_2, nats_site_3]; + let clusters = vec![nats_site_1, nats_site_2, nats_site_3]; let nats_supercluster = NatsSuperclusterScore { nats_cluster: clusters, diff --git a/harmony/src/domain/topology/decentralized.rs b/harmony/src/domain/topology/decentralized.rs index edf957e1..1ee2e010 100644 --- a/harmony/src/domain/topology/decentralized.rs +++ b/harmony/src/domain/topology/decentralized.rs @@ -1,11 +1,9 @@ use async_trait::async_trait; use log::info; -use crate:: - topology::{ - K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology, - } -; +use crate::topology::{ + K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology, +}; pub struct DecentralizedTopology { pub sites: Vec, diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index d974431e..97d02e8c 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -16,7 +16,10 @@ use kube::{ }; use log::{debug, info, trace, warn}; use serde::Serialize; -use tokio::{sync::OnceCell, time::{Instant, sleep}}; +use tokio::{ + sync::OnceCell, + time::{Instant, sleep}, +}; use crate::{ executors::ExecutorError, diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index e64306fc..13b7adfd 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -42,7 +42,7 @@ impl Nats for K8sAnywhereTopology { spec: RouteSpec { to: RouteTargetReference { kind: "Service".to_string(), - name: nats_cluster.name.to_string(), + name: format!("{}", nats_cluster.name.to_string()), weight: Some(100), }, host: Some(nats_cluster.dns_name.clone()), @@ -176,7 +176,7 @@ impl Nats for K8sAnywhereTopology { ca_issuer_name, None, Some(vec![nats_cluster.dns_name.clone()]), - Some(true), + Some(false), &root_ca_config, ) .await diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index c1192510..b1f56a1b 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -41,9 +41,7 @@ impl NatsSupercluster site.deploy(cluster, Some(peers)).await?; } - Ok(format!( - "Deployed Nats clusters across sites", - )) + Ok(format!("Deployed Nats clusters across sites",)) } async fn create_ca_bundle( @@ -65,7 +63,8 @@ impl NatsSupercluster debug!("creating ca bundle {:#?} on site 1", ca_bundle); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - site.create_ca_bundle(vec![cluster.clone()], Some(ca_bundle.clone())).await?; + site.create_ca_bundle(vec![cluster.clone()], Some(ca_bundle.clone())) + .await?; } Ok(format!( diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs index 2f31c464..de1bc952 100644 --- a/harmony/src/modules/nats/score_nats_supercluster.rs +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -44,7 +44,6 @@ impl Interpret for NatsSuperclusterInterpret _inventory: &Inventory, topology: &T, ) -> Result { - info!("creating nats supercluster ca bundle"); topology .create_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) -- 2.39.5 From 666a3c007185f0429315567be0b1ec78a99e1c8c Mon Sep 17 00:00:00 2001 From: wjro Date: Wed, 28 Jan 2026 15:16:46 -0500 Subject: [PATCH 05/10] fix: modified nats trait and nats supercluster trait to better respect interface segregation --- .../src/domain/topology/k8s_anywhere/nats.rs | 29 ++++++++---------- harmony/src/modules/nats/capability.rs | 15 ++++++++-- harmony/src/modules/nats/decentralized.rs | 30 +++---------------- .../modules/nats/score_nats_supercluster.rs | 2 +- 4 files changed, 29 insertions(+), 47 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 13b7adfd..3fcba127 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -116,7 +116,10 @@ impl Nats for K8sAnywhereTopology { Ok(NatsEndpoint { host: endpoint }) } - async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result { + async fn get_local_nats_ca_certificate( + &self, + nats_cluster: &NatsCluster, + ) -> Result { let ca_issuer_name = "harmony-ca-issuer".to_string(); let root_ca_cert_name = "harmony-root-ca".to_string(); let self_signed_issuer_name = "harmony-self-signed-issuer".to_string(); @@ -194,28 +197,20 @@ impl Nats for K8sAnywhereTopology { Ok(ca_cert) } -} -#[async_trait] -impl NatsSupercluster for K8sAnywhereTopology { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { - todo!() - } - - async fn create_ca_bundle( + async fn install_local_ca_bundle( &self, - nats_clusters: Vec, + nats_cluster: NatsCluster, ca_certs: Option>, ) -> Result { - if let Some(ca_certs) = ca_certs { - for cluster in nats_clusters { - self.create_ca_bundle_secret(&cluster, &ca_certs) + match ca_certs { + Some(certs) => { + self.create_ca_bundle_secret(&nats_cluster, &certs) .await - .map_err(|e| format!("{}", e))?; + .map_err(|e| e.to_string())?; + Ok("Successfully created ca bundle secret".to_string()) } - Ok("Successfully created ca bundle".to_string()) - } else { - Ok("No ca bundle certs provided".to_string()) + None => Ok("no certs to create".to_string()), } } } diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index fc390042..fcdf8d91 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -13,14 +13,23 @@ pub trait Nats { async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result; - async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result; + async fn get_local_nats_ca_certificate( + &self, + nats_cluster: &NatsCluster, + ) -> Result; + + async fn install_local_ca_bundle( + &self, + nats_cluster: NatsCluster, + ca_certs: Option>, + ) -> Result; } #[async_trait] -pub trait NatsSupercluster: Nats { +pub trait NatsSupercluster { async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result; - async fn create_ca_bundle( + async fn distribute_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index b1f56a1b..0f7e68a7 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -7,29 +7,7 @@ use crate::{ }; #[async_trait] -impl Nats for DecentralizedTopology { - async fn deploy( - &self, - nats_cluster: &NatsCluster, - peers: Option>, - ) -> Result { - todo!() - } - - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { - //This feels like a problem - todo!() - } - - async fn get_nats_ca_certificate(&self, nats_cluster: &NatsCluster) -> Result { - todo!() - } -} - -#[async_trait] -impl NatsSupercluster - for DecentralizedTopology -{ +impl NatsSupercluster for DecentralizedTopology { async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { let peers: Vec = nats_clusters @@ -44,7 +22,7 @@ impl NatsSupercluster Ok(format!("Deployed Nats clusters across sites",)) } - async fn create_ca_bundle( + async fn distribute_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, @@ -52,7 +30,7 @@ impl NatsSupercluster debug!("extracting nats clusters"); let mut ca_bundle = Vec::new(); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - let ca_cert = site.get_nats_ca_certificate(&cluster).await?; + let ca_cert = site.get_local_nats_ca_certificate(&cluster).await?; ca_bundle.push(ca_cert); } @@ -63,7 +41,7 @@ impl NatsSupercluster debug!("creating ca bundle {:#?} on site 1", ca_bundle); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - site.create_ca_bundle(vec![cluster.clone()], Some(ca_bundle.clone())) + site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone())) .await?; } diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs index de1bc952..e6ef64e1 100644 --- a/harmony/src/modules/nats/score_nats_supercluster.rs +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -46,7 +46,7 @@ impl Interpret for NatsSuperclusterInterpret ) -> Result { info!("creating nats supercluster ca bundle"); topology - .create_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) + .distribute_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) .await .map_err(|e| InterpretError::new(e))?; -- 2.39.5 From 7df8429181b83f7a374a37d0e39f29905db450cf Mon Sep 17 00:00:00 2001 From: wjro Date: Thu, 29 Jan 2026 15:03:33 -0500 Subject: [PATCH 06/10] feat: introduced crate tokio-retry to allow multiple attempts to get secret from k8s --- harmony/Cargo.toml | 1 + .../topology/k8s_anywhere/k8s_anywhere.rs | 67 +++++++++---------- .../src/domain/topology/k8s_anywhere/nats.rs | 29 ++++++-- 3 files changed, 56 insertions(+), 41 deletions(-) diff --git a/harmony/Cargo.toml b/harmony/Cargo.toml index 634cbe96..a1a6e7b3 100644 --- a/harmony/Cargo.toml +++ b/harmony/Cargo.toml @@ -79,6 +79,7 @@ sqlx.workspace = true inquire.workspace = true brocade = { path = "../brocade" } option-ext = "0.2.0" +tokio-retry = "0.3.0" [dev-dependencies] pretty_assertions.workspace = true diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 97d02e8c..ee705e15 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -16,10 +16,7 @@ use kube::{ }; use log::{debug, info, trace, warn}; use serde::Serialize; -use tokio::{ - sync::OnceCell, - time::{Instant, sleep}, -}; +use tokio::sync::OnceCell; use crate::{ executors::ExecutorError, @@ -513,45 +510,41 @@ impl CertificateManagement for K8sAnywhereTopology { { let secret_name = certificate.spec.secret_name.clone(); - let deadline = Instant::now() + tokio::time::Duration::from_secs(120); + debug!("Secret Name {:#?}", secret_name); + if let Some(secret) = client + .get_resource::(&secret_name, Some(&namespace)) + .await + .map_err(|e| { + ExecutorError::UnexpectedError(format!( + "secret {} not found in namespace {}: {}", + secret_name, namespace, e + )) + })? + { + let ca_cert = secret + .data + .as_ref() + .and_then(|d| d.get("ca.crt")) + .ok_or_else(|| { + ExecutorError::UnexpectedError("Secret missing key 'ca.crt'".into()) + })?; - loop { - if Instant::now() > deadline { - return Err(ExecutorError::UnexpectedError(format!( - "Timed out waiting for Secret {} to contain ca.crt", - secret_name - ))); - } + let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| { + ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into()) + })?; - debug!("Secret Name {:#?}", secret_name); - - if let Some(secret) = client - .get_resource::(&secret_name, Some(&namespace)) - .await - .map_err(|e| { - ExecutorError::UnexpectedError(format!( - "secret {} not found in namespace {}: {}", - secret_name, namespace, e - )) - })? - { - if let Some(ca_cert) = secret.data.as_ref().and_then(|d| d.get("ca.crt")) { - let ca_cert = String::from_utf8(ca_cert.0.clone()).map_err(|_| { - ExecutorError::UnexpectedError("ca.crt is not valid UTF-8".into()) - })?; - - return Ok(ca_cert); - } - } - - // Secret not ready yet → wait and retry - sleep(Duration::from_millis(500)).await; + return Ok(ca_cert); + } else { + Err(ExecutorError::UnexpectedError(format!( + "Error getting secret associated with cert_name: {}, secret_name: {}", + cert_name, secret_name + ))) } } else { - Err(ExecutorError::UnexpectedError(format!( + return Err(ExecutorError::UnexpectedError(format!( "Certificate {} not found in namespace {}", cert_name, namespace - ))) + ))); } } } diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 3fcba127..7e1f24cb 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,5 +1,8 @@ +use std::time::Duration; + use async_trait::async_trait; use log::{debug, info}; +use tokio_retry::{Retry, strategy::ExponentialBackoff}; use crate::{ inventory::Inventory, @@ -190,10 +193,28 @@ impl Nats for K8sAnywhereTopology { nats_cluster.name ); - let ca_cert = self - .get_ca_certificate(root_ca_cert_name, &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; + let strategy = ExponentialBackoff::from_millis(250) + .factor(2) + .max_delay(Duration::from_millis(1000)) + .take(10); + + let ca_cert = Retry::spawn(strategy, || async { + log::debug!("Attempting CA cert fetch"); + + let res = self + .get_ca_certificate(root_ca_cert_name.clone(), &root_ca_config) + .await; + + match res { + Ok(cert) => Ok(cert), + Err(e) => { + log::warn!("Retryable error: {:?}", e); + Err(e) + } + } + }) + .await + .map_err(|e| format!("Retries exhausted: {:?}", e))?; Ok(ca_cert) } -- 2.39.5 From a0f32bb565378138933a70f98becd95c23546e4a Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 10:57:22 -0500 Subject: [PATCH 07/10] wip: working on separation of concerns --- .../topology/k8s_anywhere/k8s_anywhere.rs | 116 ++++++------- .../src/domain/topology/k8s_anywhere/nats.rs | 117 +++++-------- harmony/src/modules/nats/capability.rs | 2 +- harmony/src/modules/nats/decentralized.rs | 2 +- harmony/src/modules/nats/score_nats_k8s.rs | 160 +++++++++++++++--- 5 files changed, 241 insertions(+), 156 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 97d02e8c..94f23aa3 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -89,7 +89,7 @@ struct K8sState { message: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum KubernetesDistribution { OpenshiftFamily, K3sFamily, @@ -575,63 +575,63 @@ impl K8sAnywhereTopology { } } - async fn build_ca_bundle_secret( - &self, - namespace: &str, - nats_cluster: &NatsCluster, - bundle: &Vec, - ) -> Secret { - Secret { - metadata: ObjectMeta { - name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - namespace: Some(namespace.to_string()), - ..Default::default() - }, - data: Some(self.build_secret_data(bundle).await), - immutable: Some(false), - type_: Some("Opaque".to_string()), - string_data: None, - } - } - - async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { - let mut data = BTreeMap::new(); - - data.insert( - "ca.crt".to_string(), - ByteString(bundle.join("\n").into_bytes()), - ); - - data - } - - pub async fn create_ca_bundle_secret( - &self, - nats_cluster: &NatsCluster, - ca_bundle: &Vec, - ) -> Result { - let bundle_secret = self - .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) - .await; - - debug!( - "deploying secret to ns: {} \nsecret: {:#?}", - nats_cluster.namespace, bundle_secret - ); - - let k8ssecret = - K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); - - k8ssecret - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; - - Ok(Outcome::success(format!( - "successfully craeted ca bundle {} in ns: {}", - nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - ))) - } + // async fn build_ca_bundle_secret( + // &self, + // namespace: &str, + // nats_cluster: &NatsCluster, + // bundle: &Vec, + // ) -> Secret { + // Secret { + // metadata: ObjectMeta { + // name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + // namespace: Some(namespace.to_string()), + // ..Default::default() + // }, + // data: Some(self.build_secret_data(bundle).await), + // immutable: Some(false), + // type_: Some("Opaque".to_string()), + // string_data: None, + // } + // } + // + // async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { + // let mut data = BTreeMap::new(); + // + // data.insert( + // "ca.crt".to_string(), + // ByteString(bundle.join("\n").into_bytes()), + // ); + // + // data + // } + // + // pub async fn create_ca_bundle_secret( + // &self, + // nats_cluster: &NatsCluster, + // ca_bundle: &Vec, + // ) -> Result { + // let bundle_secret = self + // .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) + // .await; + // + // debug!( + // "deploying secret to ns: {} \nsecret: {:#?}", + // nats_cluster.namespace, bundle_secret + // ); + // + // let k8ssecret = + // K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); + // + // k8ssecret + // .interpret(&Inventory::empty(), self) + // .await + // .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; + // + // Ok(Outcome::success(format!( + // "successfully craeted ca bundle {} in ns: {}", + // nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace + // ))) + // } pub async fn certificate_issuer_ready( &self, diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 3fcba127..8176386e 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,4 +1,8 @@ +use std::collections::BTreeMap; + use async_trait::async_trait; +use k8s_openapi::{ByteString, api::core::v1::Secret}; +use kube::api::ObjectMeta; use log::{debug, info}; use crate::{ @@ -8,18 +12,14 @@ use crate::{ capability::{CertificateManagement, CertificateManagementConfig}, crd::CaIssuer, }, - k8s::ingress::K8sIngressScore, + k8s::resource::K8sResourceScore, nats::{ - capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + capability::{Nats, NatsCluster, NatsEndpoint}, score_nats_k8s::NatsK8sScore, }, - okd::{ - crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, - route::OKDRouteScore, - }, }, score::Score, - topology::{K8sAnywhereTopology, KubernetesDistribution, TlsRouter}, + topology::{K8sAnywhereTopology, TlsRouter}, }; #[async_trait] @@ -29,75 +29,11 @@ impl Nats for K8sAnywhereTopology { nats_cluster: &NatsCluster, peers: Option>, ) -> Result { - let domain = self.get_nats_endpoint(nats_cluster).await?; - match self - .get_k8s_distribution() - .await - .map_err(|e| format!("Error getting k8s distribution : {}", e.to_string()))? - { - KubernetesDistribution::OpenshiftFamily => { - let route = OKDRouteScore { - name: nats_cluster.name.to_string(), - namespace: nats_cluster.namespace.to_string(), - spec: RouteSpec { - to: RouteTargetReference { - kind: "Service".to_string(), - name: format!("{}", nats_cluster.name.to_string()), - weight: Some(100), - }, - host: Some(nats_cluster.dns_name.clone()), - port: Some(RoutePort { target_port: 7222 }), - tls: Some(TLSConfig { - insecure_edge_termination_policy: None, - termination: "passthrough".to_string(), - ..Default::default() - }), - wildcard_policy: None, - ..Default::default() - }, - }; - route - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy OKD Route: {e}"))?; - } - KubernetesDistribution::K3sFamily => { - //TODO untested - K8sIngressScore { - name: todo!(), - host: todo!(), - backend_service: todo!(), - port: todo!(), - path: todo!(), - path_type: todo!(), - namespace: todo!(), - ingress_class_name: todo!(), - } - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; - } - KubernetesDistribution::Default => { - //TODO untested - K8sIngressScore { - name: todo!(), - host: todo!(), - backend_service: todo!(), - port: todo!(), - path: todo!(), - path_type: todo!(), - namespace: todo!(), - ingress_class_name: todo!(), - } - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; - } - } + let distro = self.get_k8s_distribution().await.unwrap(); NatsK8sScore { + distribution: distro.clone(), cluster: nats_cluster.clone(), - domain, peers, } .interpret(&Inventory::empty(), self) @@ -110,7 +46,7 @@ impl Nats for K8sAnywhereTopology { )) } - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { + async fn get_nats_endpoint(&self) -> Result { let endpoint = self.get_internal_domain().await?.unwrap(); Ok(NatsEndpoint { host: endpoint }) @@ -203,12 +139,39 @@ impl Nats for K8sAnywhereTopology { nats_cluster: NatsCluster, ca_certs: Option>, ) -> Result { + async fn build_secret_data(bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + match ca_certs { Some(certs) => { - self.create_ca_bundle_secret(&nats_cluster, &certs) + let bundle_secret = Secret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(nats_cluster.namespace.to_string()), + ..Default::default() + }, + data: Some(build_secret_data(&certs).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + }; + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) + .interpret(&Inventory::empty(), self) .await - .map_err(|e| e.to_string())?; - Ok("Successfully created ca bundle secret".to_string()) + .map_err(|e| format!("{e}"))?; + + Ok(format!( + "successfully created ca bundle {} in ns: {}", + nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace + )) } None => Ok("no certs to create".to_string()), } diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index fcdf8d91..838d8287 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -11,7 +11,7 @@ pub trait Nats { peers: Option>, ) -> Result; - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result; + async fn get_nats_endpoint(&self) -> Result; async fn get_local_nats_ca_certificate( &self, diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index 0f7e68a7..f917c9d5 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -38,7 +38,7 @@ impl NatsSupercluster for Decentralize ca_bundle.append(&mut certs) } - debug!("creating ca bundle {:#?} on site 1", ca_bundle); + debug!("creating ca bundle {:#?} on site ", ca_bundle); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone())) diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index bee4b43e..1a22cda3 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -1,52 +1,167 @@ use std::str::FromStr; +use async_trait::async_trait; use harmony_macros::hurl; +use harmony_secret::{Secret, SecretManager}; +use harmony_types::id::Id; use log::debug; use non_blank_string_rs::NonBlankString; -use serde::Serialize; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use crate::{ - interpret::Interpret, + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, modules::{ helm::chart::{HelmChartScore, HelmRepository}, - nats::capability::{NatsCluster, NatsEndpoint}, + k8s::ingress::K8sIngressScore, + nats::capability::{Nats, NatsCluster, NatsEndpoint}, + okd::{ + crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, + route::OKDRouteScore, + }, }, score::Score, - topology::{HelmCommand, Topology}, + topology::{HelmCommand, K8sclient, KubernetesDistribution, Topology}, }; #[derive(Debug, Clone, Serialize)] pub struct NatsK8sScore { + pub distribution: KubernetesDistribution, pub cluster: NatsCluster, - pub domain: NatsEndpoint, pub peers: Option>, } -impl Score for NatsK8sScore { +impl Score for NatsK8sScore { fn name(&self) -> String { "NatsK8sScore".to_string() } fn create_interpret(&self) -> Box> { - self.build_deploy_nats_score( - self.cluster.clone(), - self.domain.clone(), - self.peers.clone(), - self.cluster.namespace.clone(), + Box::new(NatsK8sInterpret { + score: self.clone(), + }) + } +} +#[derive(Debug)] +pub struct NatsK8sInterpret { + score: NatsK8sScore, +} + +#[async_trait] +impl Interpret for NatsK8sInterpret { + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + let domain = topology.get_nats_endpoint().await?; + self.create_ingress( + topology, + inventory, + self.score.distribution.clone(), + self.score.cluster.clone(), ) - .create_interpret() + .await?; + + self.deploy_nats( + topology, + inventory, + self.score.cluster.clone(), + domain, + self.score.peers.clone(), + self.score.cluster.namespace.clone(), + ) + .await?; + Ok(Outcome::success( + "successfully deployed nats K8s".to_string(), + )) + } + + fn get_name(&self) -> InterpretName { + todo!() + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() } } -impl NatsK8sScore { - fn build_deploy_nats_score( +impl NatsK8sInterpret { + async fn create_ingress( &self, + topology: &T, + inventory: &Inventory, + distribution: KubernetesDistribution, + nats_cluster: NatsCluster, + ) -> Result { + match distribution { + KubernetesDistribution::OpenshiftFamily => { + OKDRouteScore { + name: nats_cluster.name.to_string(), + namespace: nats_cluster.namespace.to_string(), + spec: RouteSpec { + to: RouteTargetReference { + kind: "Service".to_string(), + name: format!("{}", nats_cluster.name.to_string()), + weight: Some(100), + }, + host: Some(nats_cluster.dns_name.clone()), + port: Some(RoutePort { target_port: 7222 }), + tls: Some(TLSConfig { + insecure_edge_termination_policy: None, + termination: "passthrough".to_string(), + ..Default::default() + }), + wildcard_policy: None, + ..Default::default() + }, + } + .interpret(inventory, topology) + .await + } + KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => { + //TODO untested + K8sIngressScore { + name: todo!(), + host: todo!(), + backend_service: todo!(), + port: todo!(), + path: todo!(), + path_type: todo!(), + namespace: todo!(), + ingress_class_name: todo!(), + } + .interpret(inventory, topology) + .await + } + } + } + + async fn deploy_nats( + &self, + topology: &T, + inventory: &Inventory, cluster: NatsCluster, domain: NatsEndpoint, peers: Option>, namespace: String, - ) -> Box> { + ) -> Result { let mut gateway_gateways = String::new(); + + let admin = SecretManager::get_or_prompt::().await.unwrap(); + let user = admin.user.clone(); + let password = admin.password.clone(); + match peers { Some(peers) => { for peer in peers { @@ -84,8 +199,8 @@ config: accounts: system: users: - - user: "admin" - password: "admin_2" + - user: "{user}" + password: "{password}" logtime: true debug: true trace: true @@ -134,6 +249,8 @@ natsBox: image: tag: nonroot"#, cluster_name = cluster.name, + user = user, + password = password, replicas = cluster.replicas, domain = domain, gateway_gateways = gateway_gateways, @@ -159,7 +276,12 @@ natsBox: true, )), }; - - Box::new(nats) + nats.interpret(inventory, topology).await } } + +#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)] +struct NatsAdmin { + user: String, + password: String, +} -- 2.39.5 From cd81d6584c345898332aedc20f79a5e932fc0f04 Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 13:41:38 -0500 Subject: [PATCH 08/10] fix: removed nats implementation details from k8sanywhere, added secret prompt for nats cluster using harmony secret --- .../src/domain/topology/k8s_anywhere/nats.rs | 153 +----------------- harmony/src/modules/nats/capability.rs | 25 +-- harmony/src/modules/nats/decentralized.rs | 48 +++--- harmony/src/modules/nats/mod.rs | 1 + harmony/src/modules/nats/pki.rs | 74 +++++++++ harmony/src/modules/nats/score_nats_k8s.rs | 81 ++++++++-- .../modules/nats/score_nats_supercluster.rs | 6 +- 7 files changed, 182 insertions(+), 206 deletions(-) create mode 100644 harmony/src/modules/nats/pki.rs diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 8176386e..113ad74f 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,25 +1,13 @@ -use std::collections::BTreeMap; - use async_trait::async_trait; -use k8s_openapi::{ByteString, api::core::v1::Secret}; -use kube::api::ObjectMeta; -use log::{debug, info}; use crate::{ inventory::Inventory, - modules::{ - cert_manager::{ - capability::{CertificateManagement, CertificateManagementConfig}, - crd::CaIssuer, - }, - k8s::resource::K8sResourceScore, - nats::{ - capability::{Nats, NatsCluster, NatsEndpoint}, - score_nats_k8s::NatsK8sScore, - }, + modules::nats::{ + capability::{Nats, NatsCluster}, + score_nats_k8s::NatsK8sScore, }, score::Score, - topology::{K8sAnywhereTopology, TlsRouter}, + topology::K8sAnywhereTopology, }; #[async_trait] @@ -28,6 +16,7 @@ impl Nats for K8sAnywhereTopology { &self, nats_cluster: &NatsCluster, peers: Option>, + ca_bundle: Option>, ) -> Result { let distro = self.get_k8s_distribution().await.unwrap(); @@ -35,6 +24,7 @@ impl Nats for K8sAnywhereTopology { distribution: distro.clone(), cluster: nats_cluster.clone(), peers, + ca_bundle, } .interpret(&Inventory::empty(), self) .await @@ -45,135 +35,4 @@ impl Nats for K8sAnywhereTopology { nats_cluster.namespace )) } - - async fn get_nats_endpoint(&self) -> Result { - let endpoint = self.get_internal_domain().await?.unwrap(); - - Ok(NatsEndpoint { host: endpoint }) - } - - async fn get_local_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result { - let ca_issuer_name = "harmony-ca-issuer".to_string(); - let root_ca_cert_name = "harmony-root-ca".to_string(); - let self_signed_issuer_name = "harmony-self-signed-issuer".to_string(); - - debug!("creating self signed issuer config"); - let self_signed_config = CertificateManagementConfig { - namespace: Some(nats_cluster.namespace.to_string().clone()), - acme_issuer: None, - ca_issuer: None, - self_signed: true, - }; - - debug!("create ca issuer config"); - let root_ca_config = CertificateManagementConfig { - namespace: Some(nats_cluster.namespace.clone().into()), - acme_issuer: None, - ca_issuer: Some(CaIssuer { - secret_name: format!("{}-tls", root_ca_cert_name), - }), - self_signed: false, - }; - - debug!( - "creating harmony self signed issuer {}", - self_signed_issuer_name - ); - self.create_issuer(self_signed_issuer_name.clone(), &self_signed_config) - .await - .map_err(|e| format!("{}", e))?; - - debug!( - "creating root ca cert from self signed issuer {}", - &root_ca_cert_name - ); - self.create_certificate( - root_ca_cert_name.clone(), - self_signed_issuer_name.clone(), - Some(format!("harmony-{}-ca", nats_cluster.name)), - None, - Some(true), - &root_ca_config, - ) - .await - .map_err(|e| format!("{}", e))?; - - debug!("creating root_ca_issuer {}", &ca_issuer_name); - self.create_issuer(ca_issuer_name.clone(), &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; - - debug!( - "creating tls cert for nats gateway {}", - nats_cluster.tls_cert_name - ); - self.create_certificate( - nats_cluster.tls_cert_name.into(), - ca_issuer_name, - None, - Some(vec![nats_cluster.dns_name.clone()]), - Some(false), - &root_ca_config, - ) - .await - .map_err(|e| format!("{e}"))?; - - info!( - "Successfully created nats cluster certificates in cluster {}", - nats_cluster.name - ); - - let ca_cert = self - .get_ca_certificate(root_ca_cert_name, &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; - - Ok(ca_cert) - } - - async fn install_local_ca_bundle( - &self, - nats_cluster: NatsCluster, - ca_certs: Option>, - ) -> Result { - async fn build_secret_data(bundle: &Vec) -> BTreeMap { - let mut data = BTreeMap::new(); - - data.insert( - "ca.crt".to_string(), - ByteString(bundle.join("\n").into_bytes()), - ); - - data - } - - match ca_certs { - Some(certs) => { - let bundle_secret = Secret { - metadata: ObjectMeta { - name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - namespace: Some(nats_cluster.namespace.to_string()), - ..Default::default() - }, - data: Some(build_secret_data(&certs).await), - immutable: Some(false), - type_: Some("Opaque".to_string()), - string_data: None, - }; - K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("{e}"))?; - - Ok(format!( - "successfully created ca bundle {} in ns: {}", - nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - )) - } - None => Ok("no certs to create".to_string()), - } - } } diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index 838d8287..baea1a2f 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -9,36 +9,19 @@ pub trait Nats { &self, nats_cluster: &NatsCluster, peers: Option>, - ) -> Result; - - async fn get_nats_endpoint(&self) -> Result; - - async fn get_local_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result; - - async fn install_local_ca_bundle( - &self, - nats_cluster: NatsCluster, - ca_certs: Option>, + ca_bundle: Option>, ) -> Result; } #[async_trait] pub trait NatsSupercluster { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result; + async fn deploy_site(&self, nats_clusters: Vec, ca_bundle: Vec) -> Result; - async fn distribute_ca_bundle( + async fn create_site_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, - ) -> Result; -} - -pub struct SiteContext { - pub topology: T, - pub cluster: NatsCluster, + ) -> Result, String>; } #[derive(Debug, Clone, Serialize)] diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index f917c9d5..a57cf0f0 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -2,36 +2,22 @@ use async_trait::async_trait; use log::debug; use crate::{ - modules::nats::capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + modules::{cert_manager::capability::CertificateManagement, nats::{capability::{Nats, NatsCluster, NatsSupercluster}, pki::NatsPkiPolicy}}, topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, }; #[async_trait] -impl NatsSupercluster for DecentralizedTopology { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { - for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { - let peers: Vec = nats_clusters - .iter() - .enumerate() - .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) - .collect(); - - site.deploy(cluster, Some(peers)).await?; - } - - Ok(format!("Deployed Nats clusters across sites",)) - } - - async fn distribute_ca_bundle( +impl NatsSupercluster for DecentralizedTopology { + async fn create_site_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, - ) -> Result { + ) -> Result, String> { debug!("extracting nats clusters"); let mut ca_bundle = Vec::new(); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - let ca_cert = site.get_local_nats_ca_certificate(&cluster).await?; - ca_bundle.push(ca_cert); + let policy = NatsPkiPolicy{ topology: site}; + ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap()); } if let Some(mut certs) = ca_certs { @@ -40,13 +26,23 @@ impl NatsSupercluster for Decentralize debug!("creating ca bundle {:#?} on site ", ca_bundle); - for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone())) - .await?; + Ok(ca_bundle) + } + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result { + for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { + let peers: Vec = nats_clusters + .iter() + .enumerate() + .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) + .collect(); + + site.deploy(cluster, Some(peers), Some(ca_bundle.clone())).await?; } - Ok(format!( - "Successfully deployed nats ca bundle accross clusters", - )) + Ok(format!("Deployed Nats clusters across sites",)) } } diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs index 1822f1f5..021ab26c 100644 --- a/harmony/src/modules/nats/mod.rs +++ b/harmony/src/modules/nats/mod.rs @@ -2,3 +2,4 @@ pub mod capability; pub mod decentralized; pub mod score_nats_k8s; pub mod score_nats_supercluster; +pub mod pki; diff --git a/harmony/src/modules/nats/pki.rs b/harmony/src/modules/nats/pki.rs new file mode 100644 index 00000000..5c281399 --- /dev/null +++ b/harmony/src/modules/nats/pki.rs @@ -0,0 +1,74 @@ +use crate::modules::{cert_manager::{capability::{CertificateManagement, CertificateManagementConfig}, crd::CaIssuer}, nats::capability::NatsCluster}; + +pub struct NatsPkiPolicy<'a, T> { + pub topology: &'a T, +} + +impl<'a, T> NatsPkiPolicy<'a, T> +where + T: CertificateManagement, +{ + pub async fn ensure_nats_ca( + &self, + cluster: &NatsCluster, + ) -> Result { + let ca_issuer_name = "harmony-ca-issuer"; + let root_ca_cert_name = "harmony-root-ca"; + let self_signed_issuer_name = "harmony-self-signed-issuer"; + + let self_signed_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: None, + self_signed: true, + }; + + let root_ca_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: Some(CaIssuer { + secret_name: format!("{}-tls", root_ca_cert_name), + }), + self_signed: false, + }; + + self.topology + .create_issuer(self_signed_issuer_name.into(), &self_signed_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + root_ca_cert_name.into(), + self_signed_issuer_name.into(), + Some(format!("harmony-{}-ca", cluster.name)), + None, + Some(true), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_issuer(ca_issuer_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + cluster.tls_cert_name.into(), + ca_issuer_name.into(), + None, + Some(vec![cluster.dns_name.clone()]), + Some(false), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .get_ca_certificate(root_ca_cert_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string()) + } +} diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index 1a22cda3..cad35e38 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -1,10 +1,12 @@ -use std::str::FromStr; +use std::{collections::BTreeMap, str::FromStr}; use async_trait::async_trait; use harmony_macros::hurl; use harmony_secret::{Secret, SecretManager}; use harmony_types::id::Id; -use log::debug; +use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret}; +use kube::api::ObjectMeta; +use log::{debug, info}; use non_blank_string_rs::NonBlankString; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -15,7 +17,7 @@ use crate::{ inventory::Inventory, modules::{ helm::chart::{HelmChartScore, HelmRepository}, - k8s::ingress::K8sIngressScore, + k8s::{ingress::K8sIngressScore, resource::K8sResourceScore}, nats::capability::{Nats, NatsCluster, NatsEndpoint}, okd::{ crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, @@ -23,7 +25,7 @@ use crate::{ }, }, score::Score, - topology::{HelmCommand, K8sclient, KubernetesDistribution, Topology}, + topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology}, }; #[derive(Debug, Clone, Serialize)] @@ -31,9 +33,10 @@ pub struct NatsK8sScore { pub distribution: KubernetesDistribution, pub cluster: NatsCluster, pub peers: Option>, + pub ca_bundle: Option>, } -impl Score for NatsK8sScore { +impl Score for NatsK8sScore { fn name(&self) -> String { "NatsK8sScore".to_string() } @@ -50,13 +53,21 @@ pub struct NatsK8sInterpret { } #[async_trait] -impl Interpret for NatsK8sInterpret { +impl Interpret + for NatsK8sInterpret +{ async fn execute( &self, inventory: &Inventory, topology: &T, ) -> Result { - let domain = topology.get_nats_endpoint().await?; + let domain = topology + .get_internal_domain() + .await? + .ok_or(InterpretError::new("no internal domain found".to_string())) + .unwrap(); + + info!("creating ingress for nats cluster"); self.create_ingress( topology, inventory, @@ -65,6 +76,18 @@ impl Interpret for Na ) .await?; + let domain = NatsEndpoint { host: domain }; + + info!("creating nats ca bundle secret"); + self.create_ca_bundle_secret( + topology, + inventory, + self.score.cluster.clone(), + self.score.ca_bundle.clone(), + ) + .await?; + + info!("deploying nats cluster"); self.deploy_nats( topology, inventory, @@ -80,7 +103,7 @@ impl Interpret for Na } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::Custom("NatsK8sInterpret") } fn get_version(&self) -> Version { @@ -147,6 +170,47 @@ impl NatsK8sInterpret { } } + async fn create_ca_bundle_secret( + &self, + topology: &T, + inventory: &Inventory, + nats_cluster: NatsCluster, + ca_bundle: Option>, + ) -> Result { + async fn build_secret_data(bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + + match ca_bundle { + Some(certs) => { + let bundle_secret = K8sSecret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(nats_cluster.namespace.to_string()), + ..Default::default() + }, + data: Some(build_secret_data(&certs).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + }; + + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) + .interpret(inventory, topology) + .await + .map_err(|e| InterpretError::new(e.to_string())) + } + None => Ok(Outcome::noop("no certs to create".to_string())), + } + } + async fn deploy_nats( &self, topology: &T, @@ -157,7 +221,6 @@ impl NatsK8sInterpret { namespace: String, ) -> Result { let mut gateway_gateways = String::new(); - let admin = SecretManager::get_or_prompt::().await.unwrap(); let user = admin.user.clone(); let password = admin.password.clone(); diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs index e6ef64e1..8ff244a3 100644 --- a/harmony/src/modules/nats/score_nats_supercluster.rs +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -45,14 +45,14 @@ impl Interpret for NatsSuperclusterInterpret topology: &T, ) -> Result { info!("creating nats supercluster ca bundle"); - topology - .distribute_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) + let ca_bundle = topology + .create_site_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) .await .map_err(|e| InterpretError::new(e))?; info!("deploying nats supercluster"); let cluster_name = topology - .deploy_supercluster(self.nats_clusters.clone()) + .deploy_site(self.nats_clusters.clone(), ca_bundle) .await .map_err(|e| InterpretError::new(e))?; -- 2.39.5 From 329d5d8473f44628f4a3044df6b6a9cecdecb90d Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 13:42:01 -0500 Subject: [PATCH 09/10] fix: format --- harmony/src/modules/nats/capability.rs | 6 +++++- harmony/src/modules/nats/decentralized.rs | 17 +++++++++++++---- harmony/src/modules/nats/mod.rs | 2 +- harmony/src/modules/nats/pki.rs | 13 ++++++++----- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index baea1a2f..2988a2a0 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -15,7 +15,11 @@ pub trait Nats { #[async_trait] pub trait NatsSupercluster { - async fn deploy_site(&self, nats_clusters: Vec, ca_bundle: Vec) -> Result; + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result; async fn create_site_ca_bundle( &self, diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index a57cf0f0..7f805c18 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -2,12 +2,20 @@ use async_trait::async_trait; use log::debug; use crate::{ - modules::{cert_manager::capability::CertificateManagement, nats::{capability::{Nats, NatsCluster, NatsSupercluster}, pki::NatsPkiPolicy}}, + modules::{ + cert_manager::capability::CertificateManagement, + nats::{ + capability::{Nats, NatsCluster, NatsSupercluster}, + pki::NatsPkiPolicy, + }, + }, topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, }; #[async_trait] -impl NatsSupercluster for DecentralizedTopology { +impl NatsSupercluster + for DecentralizedTopology +{ async fn create_site_ca_bundle( &self, nats_clusters: Vec, @@ -16,7 +24,7 @@ impl NatsSuper debug!("extracting nats clusters"); let mut ca_bundle = Vec::new(); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - let policy = NatsPkiPolicy{ topology: site}; + let policy = NatsPkiPolicy { topology: site }; ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap()); } @@ -40,7 +48,8 @@ impl NatsSuper .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) .collect(); - site.deploy(cluster, Some(peers), Some(ca_bundle.clone())).await?; + site.deploy(cluster, Some(peers), Some(ca_bundle.clone())) + .await?; } Ok(format!("Deployed Nats clusters across sites",)) diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs index 021ab26c..6758c77b 100644 --- a/harmony/src/modules/nats/mod.rs +++ b/harmony/src/modules/nats/mod.rs @@ -1,5 +1,5 @@ pub mod capability; pub mod decentralized; +pub mod pki; pub mod score_nats_k8s; pub mod score_nats_supercluster; -pub mod pki; diff --git a/harmony/src/modules/nats/pki.rs b/harmony/src/modules/nats/pki.rs index 5c281399..0759c9b4 100644 --- a/harmony/src/modules/nats/pki.rs +++ b/harmony/src/modules/nats/pki.rs @@ -1,4 +1,10 @@ -use crate::modules::{cert_manager::{capability::{CertificateManagement, CertificateManagementConfig}, crd::CaIssuer}, nats::capability::NatsCluster}; +use crate::modules::{ + cert_manager::{ + capability::{CertificateManagement, CertificateManagementConfig}, + crd::CaIssuer, + }, + nats::capability::NatsCluster, +}; pub struct NatsPkiPolicy<'a, T> { pub topology: &'a T, @@ -8,10 +14,7 @@ impl<'a, T> NatsPkiPolicy<'a, T> where T: CertificateManagement, { - pub async fn ensure_nats_ca( - &self, - cluster: &NatsCluster, - ) -> Result { + pub async fn ensure_nats_ca(&self, cluster: &NatsCluster) -> Result { let ca_issuer_name = "harmony-ca-issuer"; let root_ca_cert_name = "harmony-root-ca"; let self_signed_issuer_name = "harmony-self-signed-issuer"; -- 2.39.5 From 8b200cfe9144edf7023df56e8ce45ccf35667332 Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 14:02:52 -0500 Subject: [PATCH 10/10] chore: removed commented code --- .../topology/k8s_anywhere/k8s_anywhere.rs | 58 ------------------- 1 file changed, 58 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 13c1a953..4f735dbc 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -568,64 +568,6 @@ impl K8sAnywhereTopology { } } - // async fn build_ca_bundle_secret( - // &self, - // namespace: &str, - // nats_cluster: &NatsCluster, - // bundle: &Vec, - // ) -> Secret { - // Secret { - // metadata: ObjectMeta { - // name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - // namespace: Some(namespace.to_string()), - // ..Default::default() - // }, - // data: Some(self.build_secret_data(bundle).await), - // immutable: Some(false), - // type_: Some("Opaque".to_string()), - // string_data: None, - // } - // } - // - // async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { - // let mut data = BTreeMap::new(); - // - // data.insert( - // "ca.crt".to_string(), - // ByteString(bundle.join("\n").into_bytes()), - // ); - // - // data - // } - // - // pub async fn create_ca_bundle_secret( - // &self, - // nats_cluster: &NatsCluster, - // ca_bundle: &Vec, - // ) -> Result { - // let bundle_secret = self - // .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) - // .await; - // - // debug!( - // "deploying secret to ns: {} \nsecret: {:#?}", - // nats_cluster.namespace, bundle_secret - // ); - // - // let k8ssecret = - // K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); - // - // k8ssecret - // .interpret(&Inventory::empty(), self) - // .await - // .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; - // - // Ok(Outcome::success(format!( - // "successfully craeted ca bundle {} in ns: {}", - // nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - // ))) - // } - pub async fn certificate_issuer_ready( &self, issuer_name: String, -- 2.39.5