From a0f32bb565378138933a70f98becd95c23546e4a Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 10:57:22 -0500 Subject: [PATCH 1/4] wip: working on separation of concerns --- .../topology/k8s_anywhere/k8s_anywhere.rs | 116 ++++++------- .../src/domain/topology/k8s_anywhere/nats.rs | 117 +++++-------- harmony/src/modules/nats/capability.rs | 2 +- harmony/src/modules/nats/decentralized.rs | 2 +- harmony/src/modules/nats/score_nats_k8s.rs | 160 +++++++++++++++--- 5 files changed, 241 insertions(+), 156 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 97d02e8c..94f23aa3 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -89,7 +89,7 @@ struct K8sState { message: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum KubernetesDistribution { OpenshiftFamily, K3sFamily, @@ -575,63 +575,63 @@ impl K8sAnywhereTopology { } } - async fn build_ca_bundle_secret( - &self, - namespace: &str, - nats_cluster: &NatsCluster, - bundle: &Vec, - ) -> Secret { - Secret { - metadata: ObjectMeta { - name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - namespace: Some(namespace.to_string()), - ..Default::default() - }, - data: Some(self.build_secret_data(bundle).await), - immutable: Some(false), - type_: Some("Opaque".to_string()), - string_data: None, - } - } - - async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { - let mut data = BTreeMap::new(); - - data.insert( - "ca.crt".to_string(), - ByteString(bundle.join("\n").into_bytes()), - ); - - data - } - - pub async fn create_ca_bundle_secret( - &self, - nats_cluster: &NatsCluster, - ca_bundle: &Vec, - ) -> Result { - let bundle_secret = self - .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) - .await; - - debug!( - "deploying secret to ns: {} \nsecret: {:#?}", - nats_cluster.namespace, bundle_secret - ); - - let k8ssecret = - K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); - - k8ssecret - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; - - Ok(Outcome::success(format!( - "successfully craeted ca bundle {} in ns: {}", - nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - ))) - } + // async fn build_ca_bundle_secret( + // &self, + // namespace: &str, + // nats_cluster: &NatsCluster, + // bundle: &Vec, + // ) -> Secret { + // Secret { + // metadata: ObjectMeta { + // name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + // namespace: Some(namespace.to_string()), + // ..Default::default() + // }, + // data: Some(self.build_secret_data(bundle).await), + // immutable: Some(false), + // type_: Some("Opaque".to_string()), + // string_data: None, + // } + // } + // + // async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { + // let mut data = BTreeMap::new(); + // + // data.insert( + // "ca.crt".to_string(), + // ByteString(bundle.join("\n").into_bytes()), + // ); + // + // data + // } + // + // pub async fn create_ca_bundle_secret( + // &self, + // nats_cluster: &NatsCluster, + // ca_bundle: &Vec, + // ) -> Result { + // let bundle_secret = self + // .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) + // .await; + // + // debug!( + // "deploying secret to ns: {} \nsecret: {:#?}", + // nats_cluster.namespace, bundle_secret + // ); + // + // let k8ssecret = + // K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); + // + // k8ssecret + // .interpret(&Inventory::empty(), self) + // .await + // .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; + // + // Ok(Outcome::success(format!( + // "successfully craeted ca bundle {} in ns: {}", + // nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace + // ))) + // } pub async fn certificate_issuer_ready( &self, diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 3fcba127..8176386e 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,4 +1,8 @@ +use std::collections::BTreeMap; + use async_trait::async_trait; +use k8s_openapi::{ByteString, api::core::v1::Secret}; +use kube::api::ObjectMeta; use log::{debug, info}; use crate::{ @@ -8,18 +12,14 @@ use crate::{ capability::{CertificateManagement, CertificateManagementConfig}, crd::CaIssuer, }, - k8s::ingress::K8sIngressScore, + k8s::resource::K8sResourceScore, nats::{ - capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + capability::{Nats, NatsCluster, NatsEndpoint}, score_nats_k8s::NatsK8sScore, }, - okd::{ - crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, - route::OKDRouteScore, - }, }, score::Score, - topology::{K8sAnywhereTopology, KubernetesDistribution, TlsRouter}, + topology::{K8sAnywhereTopology, TlsRouter}, }; #[async_trait] @@ -29,75 +29,11 @@ impl Nats for K8sAnywhereTopology { nats_cluster: &NatsCluster, peers: Option>, ) -> Result { - let domain = self.get_nats_endpoint(nats_cluster).await?; - match self - .get_k8s_distribution() - .await - .map_err(|e| format!("Error getting k8s distribution : {}", e.to_string()))? - { - KubernetesDistribution::OpenshiftFamily => { - let route = OKDRouteScore { - name: nats_cluster.name.to_string(), - namespace: nats_cluster.namespace.to_string(), - spec: RouteSpec { - to: RouteTargetReference { - kind: "Service".to_string(), - name: format!("{}", nats_cluster.name.to_string()), - weight: Some(100), - }, - host: Some(nats_cluster.dns_name.clone()), - port: Some(RoutePort { target_port: 7222 }), - tls: Some(TLSConfig { - insecure_edge_termination_policy: None, - termination: "passthrough".to_string(), - ..Default::default() - }), - wildcard_policy: None, - ..Default::default() - }, - }; - route - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy OKD Route: {e}"))?; - } - KubernetesDistribution::K3sFamily => { - //TODO untested - K8sIngressScore { - name: todo!(), - host: todo!(), - backend_service: todo!(), - port: todo!(), - path: todo!(), - path_type: todo!(), - namespace: todo!(), - ingress_class_name: todo!(), - } - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; - } - KubernetesDistribution::Default => { - //TODO untested - K8sIngressScore { - name: todo!(), - host: todo!(), - backend_service: todo!(), - port: todo!(), - path: todo!(), - path_type: todo!(), - namespace: todo!(), - ingress_class_name: todo!(), - } - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; - } - } + let distro = self.get_k8s_distribution().await.unwrap(); NatsK8sScore { + distribution: distro.clone(), cluster: nats_cluster.clone(), - domain, peers, } .interpret(&Inventory::empty(), self) @@ -110,7 +46,7 @@ impl Nats for K8sAnywhereTopology { )) } - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { + async fn get_nats_endpoint(&self) -> Result { let endpoint = self.get_internal_domain().await?.unwrap(); Ok(NatsEndpoint { host: endpoint }) @@ -203,12 +139,39 @@ impl Nats for K8sAnywhereTopology { nats_cluster: NatsCluster, ca_certs: Option>, ) -> Result { + async fn build_secret_data(bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + match ca_certs { Some(certs) => { - self.create_ca_bundle_secret(&nats_cluster, &certs) + let bundle_secret = Secret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(nats_cluster.namespace.to_string()), + ..Default::default() + }, + data: Some(build_secret_data(&certs).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + }; + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) + .interpret(&Inventory::empty(), self) .await - .map_err(|e| e.to_string())?; - Ok("Successfully created ca bundle secret".to_string()) + .map_err(|e| format!("{e}"))?; + + Ok(format!( + "successfully created ca bundle {} in ns: {}", + nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace + )) } None => Ok("no certs to create".to_string()), } diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index fcdf8d91..838d8287 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -11,7 +11,7 @@ pub trait Nats { peers: Option>, ) -> Result; - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result; + async fn get_nats_endpoint(&self) -> Result; async fn get_local_nats_ca_certificate( &self, diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index 0f7e68a7..f917c9d5 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -38,7 +38,7 @@ impl NatsSupercluster for Decentralize ca_bundle.append(&mut certs) } - debug!("creating ca bundle {:#?} on site 1", ca_bundle); + debug!("creating ca bundle {:#?} on site ", ca_bundle); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone())) diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index bee4b43e..1a22cda3 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -1,52 +1,167 @@ use std::str::FromStr; +use async_trait::async_trait; use harmony_macros::hurl; +use harmony_secret::{Secret, SecretManager}; +use harmony_types::id::Id; use log::debug; use non_blank_string_rs::NonBlankString; -use serde::Serialize; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use crate::{ - interpret::Interpret, + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, modules::{ helm::chart::{HelmChartScore, HelmRepository}, - nats::capability::{NatsCluster, NatsEndpoint}, + k8s::ingress::K8sIngressScore, + nats::capability::{Nats, NatsCluster, NatsEndpoint}, + okd::{ + crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, + route::OKDRouteScore, + }, }, score::Score, - topology::{HelmCommand, Topology}, + topology::{HelmCommand, K8sclient, KubernetesDistribution, Topology}, }; #[derive(Debug, Clone, Serialize)] pub struct NatsK8sScore { + pub distribution: KubernetesDistribution, pub cluster: NatsCluster, - pub domain: NatsEndpoint, pub peers: Option>, } -impl Score for NatsK8sScore { +impl Score for NatsK8sScore { fn name(&self) -> String { "NatsK8sScore".to_string() } fn create_interpret(&self) -> Box> { - self.build_deploy_nats_score( - self.cluster.clone(), - self.domain.clone(), - self.peers.clone(), - self.cluster.namespace.clone(), + Box::new(NatsK8sInterpret { + score: self.clone(), + }) + } +} +#[derive(Debug)] +pub struct NatsK8sInterpret { + score: NatsK8sScore, +} + +#[async_trait] +impl Interpret for NatsK8sInterpret { + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + let domain = topology.get_nats_endpoint().await?; + self.create_ingress( + topology, + inventory, + self.score.distribution.clone(), + self.score.cluster.clone(), ) - .create_interpret() + .await?; + + self.deploy_nats( + topology, + inventory, + self.score.cluster.clone(), + domain, + self.score.peers.clone(), + self.score.cluster.namespace.clone(), + ) + .await?; + Ok(Outcome::success( + "successfully deployed nats K8s".to_string(), + )) + } + + fn get_name(&self) -> InterpretName { + todo!() + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() } } -impl NatsK8sScore { - fn build_deploy_nats_score( +impl NatsK8sInterpret { + async fn create_ingress( &self, + topology: &T, + inventory: &Inventory, + distribution: KubernetesDistribution, + nats_cluster: NatsCluster, + ) -> Result { + match distribution { + KubernetesDistribution::OpenshiftFamily => { + OKDRouteScore { + name: nats_cluster.name.to_string(), + namespace: nats_cluster.namespace.to_string(), + spec: RouteSpec { + to: RouteTargetReference { + kind: "Service".to_string(), + name: format!("{}", nats_cluster.name.to_string()), + weight: Some(100), + }, + host: Some(nats_cluster.dns_name.clone()), + port: Some(RoutePort { target_port: 7222 }), + tls: Some(TLSConfig { + insecure_edge_termination_policy: None, + termination: "passthrough".to_string(), + ..Default::default() + }), + wildcard_policy: None, + ..Default::default() + }, + } + .interpret(inventory, topology) + .await + } + KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => { + //TODO untested + K8sIngressScore { + name: todo!(), + host: todo!(), + backend_service: todo!(), + port: todo!(), + path: todo!(), + path_type: todo!(), + namespace: todo!(), + ingress_class_name: todo!(), + } + .interpret(inventory, topology) + .await + } + } + } + + async fn deploy_nats( + &self, + topology: &T, + inventory: &Inventory, cluster: NatsCluster, domain: NatsEndpoint, peers: Option>, namespace: String, - ) -> Box> { + ) -> Result { let mut gateway_gateways = String::new(); + + let admin = SecretManager::get_or_prompt::().await.unwrap(); + let user = admin.user.clone(); + let password = admin.password.clone(); + match peers { Some(peers) => { for peer in peers { @@ -84,8 +199,8 @@ config: accounts: system: users: - - user: "admin" - password: "admin_2" + - user: "{user}" + password: "{password}" logtime: true debug: true trace: true @@ -134,6 +249,8 @@ natsBox: image: tag: nonroot"#, cluster_name = cluster.name, + user = user, + password = password, replicas = cluster.replicas, domain = domain, gateway_gateways = gateway_gateways, @@ -159,7 +276,12 @@ natsBox: true, )), }; - - Box::new(nats) + nats.interpret(inventory, topology).await } } + +#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)] +struct NatsAdmin { + user: String, + password: String, +} -- 2.39.5 From cd81d6584c345898332aedc20f79a5e932fc0f04 Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 13:41:38 -0500 Subject: [PATCH 2/4] fix: removed nats implementation details from k8sanywhere, added secret prompt for nats cluster using harmony secret --- .../src/domain/topology/k8s_anywhere/nats.rs | 153 +----------------- harmony/src/modules/nats/capability.rs | 25 +-- harmony/src/modules/nats/decentralized.rs | 48 +++--- harmony/src/modules/nats/mod.rs | 1 + harmony/src/modules/nats/pki.rs | 74 +++++++++ harmony/src/modules/nats/score_nats_k8s.rs | 81 ++++++++-- .../modules/nats/score_nats_supercluster.rs | 6 +- 7 files changed, 182 insertions(+), 206 deletions(-) create mode 100644 harmony/src/modules/nats/pki.rs diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 8176386e..113ad74f 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,25 +1,13 @@ -use std::collections::BTreeMap; - use async_trait::async_trait; -use k8s_openapi::{ByteString, api::core::v1::Secret}; -use kube::api::ObjectMeta; -use log::{debug, info}; use crate::{ inventory::Inventory, - modules::{ - cert_manager::{ - capability::{CertificateManagement, CertificateManagementConfig}, - crd::CaIssuer, - }, - k8s::resource::K8sResourceScore, - nats::{ - capability::{Nats, NatsCluster, NatsEndpoint}, - score_nats_k8s::NatsK8sScore, - }, + modules::nats::{ + capability::{Nats, NatsCluster}, + score_nats_k8s::NatsK8sScore, }, score::Score, - topology::{K8sAnywhereTopology, TlsRouter}, + topology::K8sAnywhereTopology, }; #[async_trait] @@ -28,6 +16,7 @@ impl Nats for K8sAnywhereTopology { &self, nats_cluster: &NatsCluster, peers: Option>, + ca_bundle: Option>, ) -> Result { let distro = self.get_k8s_distribution().await.unwrap(); @@ -35,6 +24,7 @@ impl Nats for K8sAnywhereTopology { distribution: distro.clone(), cluster: nats_cluster.clone(), peers, + ca_bundle, } .interpret(&Inventory::empty(), self) .await @@ -45,135 +35,4 @@ impl Nats for K8sAnywhereTopology { nats_cluster.namespace )) } - - async fn get_nats_endpoint(&self) -> Result { - let endpoint = self.get_internal_domain().await?.unwrap(); - - Ok(NatsEndpoint { host: endpoint }) - } - - async fn get_local_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result { - let ca_issuer_name = "harmony-ca-issuer".to_string(); - let root_ca_cert_name = "harmony-root-ca".to_string(); - let self_signed_issuer_name = "harmony-self-signed-issuer".to_string(); - - debug!("creating self signed issuer config"); - let self_signed_config = CertificateManagementConfig { - namespace: Some(nats_cluster.namespace.to_string().clone()), - acme_issuer: None, - ca_issuer: None, - self_signed: true, - }; - - debug!("create ca issuer config"); - let root_ca_config = CertificateManagementConfig { - namespace: Some(nats_cluster.namespace.clone().into()), - acme_issuer: None, - ca_issuer: Some(CaIssuer { - secret_name: format!("{}-tls", root_ca_cert_name), - }), - self_signed: false, - }; - - debug!( - "creating harmony self signed issuer {}", - self_signed_issuer_name - ); - self.create_issuer(self_signed_issuer_name.clone(), &self_signed_config) - .await - .map_err(|e| format!("{}", e))?; - - debug!( - "creating root ca cert from self signed issuer {}", - &root_ca_cert_name - ); - self.create_certificate( - root_ca_cert_name.clone(), - self_signed_issuer_name.clone(), - Some(format!("harmony-{}-ca", nats_cluster.name)), - None, - Some(true), - &root_ca_config, - ) - .await - .map_err(|e| format!("{}", e))?; - - debug!("creating root_ca_issuer {}", &ca_issuer_name); - self.create_issuer(ca_issuer_name.clone(), &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; - - debug!( - "creating tls cert for nats gateway {}", - nats_cluster.tls_cert_name - ); - self.create_certificate( - nats_cluster.tls_cert_name.into(), - ca_issuer_name, - None, - Some(vec![nats_cluster.dns_name.clone()]), - Some(false), - &root_ca_config, - ) - .await - .map_err(|e| format!("{e}"))?; - - info!( - "Successfully created nats cluster certificates in cluster {}", - nats_cluster.name - ); - - let ca_cert = self - .get_ca_certificate(root_ca_cert_name, &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; - - Ok(ca_cert) - } - - async fn install_local_ca_bundle( - &self, - nats_cluster: NatsCluster, - ca_certs: Option>, - ) -> Result { - async fn build_secret_data(bundle: &Vec) -> BTreeMap { - let mut data = BTreeMap::new(); - - data.insert( - "ca.crt".to_string(), - ByteString(bundle.join("\n").into_bytes()), - ); - - data - } - - match ca_certs { - Some(certs) => { - let bundle_secret = Secret { - metadata: ObjectMeta { - name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - namespace: Some(nats_cluster.namespace.to_string()), - ..Default::default() - }, - data: Some(build_secret_data(&certs).await), - immutable: Some(false), - type_: Some("Opaque".to_string()), - string_data: None, - }; - K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("{e}"))?; - - Ok(format!( - "successfully created ca bundle {} in ns: {}", - nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - )) - } - None => Ok("no certs to create".to_string()), - } - } } diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index 838d8287..baea1a2f 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -9,36 +9,19 @@ pub trait Nats { &self, nats_cluster: &NatsCluster, peers: Option>, - ) -> Result; - - async fn get_nats_endpoint(&self) -> Result; - - async fn get_local_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result; - - async fn install_local_ca_bundle( - &self, - nats_cluster: NatsCluster, - ca_certs: Option>, + ca_bundle: Option>, ) -> Result; } #[async_trait] pub trait NatsSupercluster { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result; + async fn deploy_site(&self, nats_clusters: Vec, ca_bundle: Vec) -> Result; - async fn distribute_ca_bundle( + async fn create_site_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, - ) -> Result; -} - -pub struct SiteContext { - pub topology: T, - pub cluster: NatsCluster, + ) -> Result, String>; } #[derive(Debug, Clone, Serialize)] diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index f917c9d5..a57cf0f0 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -2,36 +2,22 @@ use async_trait::async_trait; use log::debug; use crate::{ - modules::nats::capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + modules::{cert_manager::capability::CertificateManagement, nats::{capability::{Nats, NatsCluster, NatsSupercluster}, pki::NatsPkiPolicy}}, topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, }; #[async_trait] -impl NatsSupercluster for DecentralizedTopology { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { - for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { - let peers: Vec = nats_clusters - .iter() - .enumerate() - .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) - .collect(); - - site.deploy(cluster, Some(peers)).await?; - } - - Ok(format!("Deployed Nats clusters across sites",)) - } - - async fn distribute_ca_bundle( +impl NatsSupercluster for DecentralizedTopology { + async fn create_site_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, - ) -> Result { + ) -> Result, String> { debug!("extracting nats clusters"); let mut ca_bundle = Vec::new(); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - let ca_cert = site.get_local_nats_ca_certificate(&cluster).await?; - ca_bundle.push(ca_cert); + let policy = NatsPkiPolicy{ topology: site}; + ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap()); } if let Some(mut certs) = ca_certs { @@ -40,13 +26,23 @@ impl NatsSupercluster for Decentralize debug!("creating ca bundle {:#?} on site ", ca_bundle); - for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone())) - .await?; + Ok(ca_bundle) + } + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result { + for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { + let peers: Vec = nats_clusters + .iter() + .enumerate() + .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) + .collect(); + + site.deploy(cluster, Some(peers), Some(ca_bundle.clone())).await?; } - Ok(format!( - "Successfully deployed nats ca bundle accross clusters", - )) + Ok(format!("Deployed Nats clusters across sites",)) } } diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs index 1822f1f5..021ab26c 100644 --- a/harmony/src/modules/nats/mod.rs +++ b/harmony/src/modules/nats/mod.rs @@ -2,3 +2,4 @@ pub mod capability; pub mod decentralized; pub mod score_nats_k8s; pub mod score_nats_supercluster; +pub mod pki; diff --git a/harmony/src/modules/nats/pki.rs b/harmony/src/modules/nats/pki.rs new file mode 100644 index 00000000..5c281399 --- /dev/null +++ b/harmony/src/modules/nats/pki.rs @@ -0,0 +1,74 @@ +use crate::modules::{cert_manager::{capability::{CertificateManagement, CertificateManagementConfig}, crd::CaIssuer}, nats::capability::NatsCluster}; + +pub struct NatsPkiPolicy<'a, T> { + pub topology: &'a T, +} + +impl<'a, T> NatsPkiPolicy<'a, T> +where + T: CertificateManagement, +{ + pub async fn ensure_nats_ca( + &self, + cluster: &NatsCluster, + ) -> Result { + let ca_issuer_name = "harmony-ca-issuer"; + let root_ca_cert_name = "harmony-root-ca"; + let self_signed_issuer_name = "harmony-self-signed-issuer"; + + let self_signed_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: None, + self_signed: true, + }; + + let root_ca_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: Some(CaIssuer { + secret_name: format!("{}-tls", root_ca_cert_name), + }), + self_signed: false, + }; + + self.topology + .create_issuer(self_signed_issuer_name.into(), &self_signed_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + root_ca_cert_name.into(), + self_signed_issuer_name.into(), + Some(format!("harmony-{}-ca", cluster.name)), + None, + Some(true), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_issuer(ca_issuer_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + cluster.tls_cert_name.into(), + ca_issuer_name.into(), + None, + Some(vec![cluster.dns_name.clone()]), + Some(false), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .get_ca_certificate(root_ca_cert_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string()) + } +} diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index 1a22cda3..cad35e38 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -1,10 +1,12 @@ -use std::str::FromStr; +use std::{collections::BTreeMap, str::FromStr}; use async_trait::async_trait; use harmony_macros::hurl; use harmony_secret::{Secret, SecretManager}; use harmony_types::id::Id; -use log::debug; +use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret}; +use kube::api::ObjectMeta; +use log::{debug, info}; use non_blank_string_rs::NonBlankString; use schemars::JsonSchema; use serde::{Deserialize, Serialize}; @@ -15,7 +17,7 @@ use crate::{ inventory::Inventory, modules::{ helm::chart::{HelmChartScore, HelmRepository}, - k8s::ingress::K8sIngressScore, + k8s::{ingress::K8sIngressScore, resource::K8sResourceScore}, nats::capability::{Nats, NatsCluster, NatsEndpoint}, okd::{ crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, @@ -23,7 +25,7 @@ use crate::{ }, }, score::Score, - topology::{HelmCommand, K8sclient, KubernetesDistribution, Topology}, + topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology}, }; #[derive(Debug, Clone, Serialize)] @@ -31,9 +33,10 @@ pub struct NatsK8sScore { pub distribution: KubernetesDistribution, pub cluster: NatsCluster, pub peers: Option>, + pub ca_bundle: Option>, } -impl Score for NatsK8sScore { +impl Score for NatsK8sScore { fn name(&self) -> String { "NatsK8sScore".to_string() } @@ -50,13 +53,21 @@ pub struct NatsK8sInterpret { } #[async_trait] -impl Interpret for NatsK8sInterpret { +impl Interpret + for NatsK8sInterpret +{ async fn execute( &self, inventory: &Inventory, topology: &T, ) -> Result { - let domain = topology.get_nats_endpoint().await?; + let domain = topology + .get_internal_domain() + .await? + .ok_or(InterpretError::new("no internal domain found".to_string())) + .unwrap(); + + info!("creating ingress for nats cluster"); self.create_ingress( topology, inventory, @@ -65,6 +76,18 @@ impl Interpret for Na ) .await?; + let domain = NatsEndpoint { host: domain }; + + info!("creating nats ca bundle secret"); + self.create_ca_bundle_secret( + topology, + inventory, + self.score.cluster.clone(), + self.score.ca_bundle.clone(), + ) + .await?; + + info!("deploying nats cluster"); self.deploy_nats( topology, inventory, @@ -80,7 +103,7 @@ impl Interpret for Na } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::Custom("NatsK8sInterpret") } fn get_version(&self) -> Version { @@ -147,6 +170,47 @@ impl NatsK8sInterpret { } } + async fn create_ca_bundle_secret( + &self, + topology: &T, + inventory: &Inventory, + nats_cluster: NatsCluster, + ca_bundle: Option>, + ) -> Result { + async fn build_secret_data(bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + + match ca_bundle { + Some(certs) => { + let bundle_secret = K8sSecret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(nats_cluster.namespace.to_string()), + ..Default::default() + }, + data: Some(build_secret_data(&certs).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + }; + + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) + .interpret(inventory, topology) + .await + .map_err(|e| InterpretError::new(e.to_string())) + } + None => Ok(Outcome::noop("no certs to create".to_string())), + } + } + async fn deploy_nats( &self, topology: &T, @@ -157,7 +221,6 @@ impl NatsK8sInterpret { namespace: String, ) -> Result { let mut gateway_gateways = String::new(); - let admin = SecretManager::get_or_prompt::().await.unwrap(); let user = admin.user.clone(); let password = admin.password.clone(); diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs index e6ef64e1..8ff244a3 100644 --- a/harmony/src/modules/nats/score_nats_supercluster.rs +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -45,14 +45,14 @@ impl Interpret for NatsSuperclusterInterpret topology: &T, ) -> Result { info!("creating nats supercluster ca bundle"); - topology - .distribute_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) + let ca_bundle = topology + .create_site_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) .await .map_err(|e| InterpretError::new(e))?; info!("deploying nats supercluster"); let cluster_name = topology - .deploy_supercluster(self.nats_clusters.clone()) + .deploy_site(self.nats_clusters.clone(), ca_bundle) .await .map_err(|e| InterpretError::new(e))?; -- 2.39.5 From 329d5d8473f44628f4a3044df6b6a9cecdecb90d Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 13:42:01 -0500 Subject: [PATCH 3/4] fix: format --- harmony/src/modules/nats/capability.rs | 6 +++++- harmony/src/modules/nats/decentralized.rs | 17 +++++++++++++---- harmony/src/modules/nats/mod.rs | 2 +- harmony/src/modules/nats/pki.rs | 13 ++++++++----- 4 files changed, 27 insertions(+), 11 deletions(-) diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index baea1a2f..2988a2a0 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -15,7 +15,11 @@ pub trait Nats { #[async_trait] pub trait NatsSupercluster { - async fn deploy_site(&self, nats_clusters: Vec, ca_bundle: Vec) -> Result; + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result; async fn create_site_ca_bundle( &self, diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index a57cf0f0..7f805c18 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -2,12 +2,20 @@ use async_trait::async_trait; use log::debug; use crate::{ - modules::{cert_manager::capability::CertificateManagement, nats::{capability::{Nats, NatsCluster, NatsSupercluster}, pki::NatsPkiPolicy}}, + modules::{ + cert_manager::capability::CertificateManagement, + nats::{ + capability::{Nats, NatsCluster, NatsSupercluster}, + pki::NatsPkiPolicy, + }, + }, topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, }; #[async_trait] -impl NatsSupercluster for DecentralizedTopology { +impl NatsSupercluster + for DecentralizedTopology +{ async fn create_site_ca_bundle( &self, nats_clusters: Vec, @@ -16,7 +24,7 @@ impl NatsSuper debug!("extracting nats clusters"); let mut ca_bundle = Vec::new(); for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - let policy = NatsPkiPolicy{ topology: site}; + let policy = NatsPkiPolicy { topology: site }; ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap()); } @@ -40,7 +48,8 @@ impl NatsSuper .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) .collect(); - site.deploy(cluster, Some(peers), Some(ca_bundle.clone())).await?; + site.deploy(cluster, Some(peers), Some(ca_bundle.clone())) + .await?; } Ok(format!("Deployed Nats clusters across sites",)) diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs index 021ab26c..6758c77b 100644 --- a/harmony/src/modules/nats/mod.rs +++ b/harmony/src/modules/nats/mod.rs @@ -1,5 +1,5 @@ pub mod capability; pub mod decentralized; +pub mod pki; pub mod score_nats_k8s; pub mod score_nats_supercluster; -pub mod pki; diff --git a/harmony/src/modules/nats/pki.rs b/harmony/src/modules/nats/pki.rs index 5c281399..0759c9b4 100644 --- a/harmony/src/modules/nats/pki.rs +++ b/harmony/src/modules/nats/pki.rs @@ -1,4 +1,10 @@ -use crate::modules::{cert_manager::{capability::{CertificateManagement, CertificateManagementConfig}, crd::CaIssuer}, nats::capability::NatsCluster}; +use crate::modules::{ + cert_manager::{ + capability::{CertificateManagement, CertificateManagementConfig}, + crd::CaIssuer, + }, + nats::capability::NatsCluster, +}; pub struct NatsPkiPolicy<'a, T> { pub topology: &'a T, @@ -8,10 +14,7 @@ impl<'a, T> NatsPkiPolicy<'a, T> where T: CertificateManagement, { - pub async fn ensure_nats_ca( - &self, - cluster: &NatsCluster, - ) -> Result { + pub async fn ensure_nats_ca(&self, cluster: &NatsCluster) -> Result { let ca_issuer_name = "harmony-ca-issuer"; let root_ca_cert_name = "harmony-root-ca"; let self_signed_issuer_name = "harmony-self-signed-issuer"; -- 2.39.5 From 8b200cfe9144edf7023df56e8ce45ccf35667332 Mon Sep 17 00:00:00 2001 From: wjro Date: Fri, 30 Jan 2026 14:02:52 -0500 Subject: [PATCH 4/4] chore: removed commented code --- .../topology/k8s_anywhere/k8s_anywhere.rs | 58 ------------------- 1 file changed, 58 deletions(-) diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 13c1a953..4f735dbc 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -568,64 +568,6 @@ impl K8sAnywhereTopology { } } - // async fn build_ca_bundle_secret( - // &self, - // namespace: &str, - // nats_cluster: &NatsCluster, - // bundle: &Vec, - // ) -> Secret { - // Secret { - // metadata: ObjectMeta { - // name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - // namespace: Some(namespace.to_string()), - // ..Default::default() - // }, - // data: Some(self.build_secret_data(bundle).await), - // immutable: Some(false), - // type_: Some("Opaque".to_string()), - // string_data: None, - // } - // } - // - // async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { - // let mut data = BTreeMap::new(); - // - // data.insert( - // "ca.crt".to_string(), - // ByteString(bundle.join("\n").into_bytes()), - // ); - // - // data - // } - // - // pub async fn create_ca_bundle_secret( - // &self, - // nats_cluster: &NatsCluster, - // ca_bundle: &Vec, - // ) -> Result { - // let bundle_secret = self - // .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) - // .await; - // - // debug!( - // "deploying secret to ns: {} \nsecret: {:#?}", - // nats_cluster.namespace, bundle_secret - // ); - // - // let k8ssecret = - // K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); - // - // k8ssecret - // .interpret(&Inventory::empty(), self) - // .await - // .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; - // - // Ok(Outcome::success(format!( - // "successfully craeted ca bundle {} in ns: {}", - // nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - // ))) - // } - pub async fn certificate_issuer_ready( &self, issuer_name: String, -- 2.39.5