diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index ee705e15..4f735dbc 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -86,7 +86,7 @@ struct K8sState { message: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum KubernetesDistribution { OpenshiftFamily, K3sFamily, @@ -568,64 +568,6 @@ impl K8sAnywhereTopology { } } - async fn build_ca_bundle_secret( - &self, - namespace: &str, - nats_cluster: &NatsCluster, - bundle: &Vec, - ) -> Secret { - Secret { - metadata: ObjectMeta { - name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), - namespace: Some(namespace.to_string()), - ..Default::default() - }, - data: Some(self.build_secret_data(bundle).await), - immutable: Some(false), - type_: Some("Opaque".to_string()), - string_data: None, - } - } - - async fn build_secret_data(&self, bundle: &Vec) -> BTreeMap { - let mut data = BTreeMap::new(); - - data.insert( - "ca.crt".to_string(), - ByteString(bundle.join("\n").into_bytes()), - ); - - data - } - - pub async fn create_ca_bundle_secret( - &self, - nats_cluster: &NatsCluster, - ca_bundle: &Vec, - ) -> Result { - let bundle_secret = self - .build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle) - .await; - - debug!( - "deploying secret to ns: {} \nsecret: {:#?}", - nats_cluster.namespace, bundle_secret - ); - - let k8ssecret = - K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())); - - k8ssecret - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?; - - Ok(Outcome::success(format!( - "successfully craeted ca bundle {} in ns: {}", - nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace - ))) - } - pub async fn certificate_issuer_ready( &self, issuer_name: String, diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs index 7e1f24cb..113ad74f 100644 --- a/harmony/src/domain/topology/k8s_anywhere/nats.rs +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -1,28 +1,13 @@ -use std::time::Duration; - use async_trait::async_trait; -use log::{debug, info}; -use tokio_retry::{Retry, strategy::ExponentialBackoff}; use crate::{ inventory::Inventory, - modules::{ - cert_manager::{ - capability::{CertificateManagement, CertificateManagementConfig}, - crd::CaIssuer, - }, - k8s::ingress::K8sIngressScore, - nats::{ - capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, - score_nats_k8s::NatsK8sScore, - }, - okd::{ - crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, - route::OKDRouteScore, - }, + modules::nats::{ + capability::{Nats, NatsCluster}, + score_nats_k8s::NatsK8sScore, }, score::Score, - topology::{K8sAnywhereTopology, KubernetesDistribution, TlsRouter}, + topology::K8sAnywhereTopology, }; #[async_trait] @@ -31,77 +16,15 @@ impl Nats for K8sAnywhereTopology { &self, nats_cluster: &NatsCluster, peers: Option>, + ca_bundle: Option>, ) -> Result { - let domain = self.get_nats_endpoint(nats_cluster).await?; - match self - .get_k8s_distribution() - .await - .map_err(|e| format!("Error getting k8s distribution : {}", e.to_string()))? - { - KubernetesDistribution::OpenshiftFamily => { - let route = OKDRouteScore { - name: nats_cluster.name.to_string(), - namespace: nats_cluster.namespace.to_string(), - spec: RouteSpec { - to: RouteTargetReference { - kind: "Service".to_string(), - name: format!("{}", nats_cluster.name.to_string()), - weight: Some(100), - }, - host: Some(nats_cluster.dns_name.clone()), - port: Some(RoutePort { target_port: 7222 }), - tls: Some(TLSConfig { - insecure_edge_termination_policy: None, - termination: "passthrough".to_string(), - ..Default::default() - }), - wildcard_policy: None, - ..Default::default() - }, - }; - route - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy OKD Route: {e}"))?; - } - KubernetesDistribution::K3sFamily => { - //TODO untested - K8sIngressScore { - name: todo!(), - host: todo!(), - backend_service: todo!(), - port: todo!(), - path: todo!(), - path_type: todo!(), - namespace: todo!(), - ingress_class_name: todo!(), - } - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; - } - KubernetesDistribution::Default => { - //TODO untested - K8sIngressScore { - name: todo!(), - host: todo!(), - backend_service: todo!(), - port: todo!(), - path: todo!(), - path_type: todo!(), - namespace: todo!(), - ingress_class_name: todo!(), - } - .interpret(&Inventory::empty(), self) - .await - .map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?; - } - } + let distro = self.get_k8s_distribution().await.unwrap(); NatsK8sScore { + distribution: distro.clone(), cluster: nats_cluster.clone(), - domain, peers, + ca_bundle, } .interpret(&Inventory::empty(), self) .await @@ -112,126 +35,4 @@ impl Nats for K8sAnywhereTopology { nats_cluster.namespace )) } - - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result { - let endpoint = self.get_internal_domain().await?.unwrap(); - - Ok(NatsEndpoint { host: endpoint }) - } - - async fn get_local_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result { - let ca_issuer_name = "harmony-ca-issuer".to_string(); - let root_ca_cert_name = "harmony-root-ca".to_string(); - let self_signed_issuer_name = "harmony-self-signed-issuer".to_string(); - - debug!("creating self signed issuer config"); - let self_signed_config = CertificateManagementConfig { - namespace: Some(nats_cluster.namespace.to_string().clone()), - acme_issuer: None, - ca_issuer: None, - self_signed: true, - }; - - debug!("create ca issuer config"); - let root_ca_config = CertificateManagementConfig { - namespace: Some(nats_cluster.namespace.clone().into()), - acme_issuer: None, - ca_issuer: Some(CaIssuer { - secret_name: format!("{}-tls", root_ca_cert_name), - }), - self_signed: false, - }; - - debug!( - "creating harmony self signed issuer {}", - self_signed_issuer_name - ); - self.create_issuer(self_signed_issuer_name.clone(), &self_signed_config) - .await - .map_err(|e| format!("{}", e))?; - - debug!( - "creating root ca cert from self signed issuer {}", - &root_ca_cert_name - ); - self.create_certificate( - root_ca_cert_name.clone(), - self_signed_issuer_name.clone(), - Some(format!("harmony-{}-ca", nats_cluster.name)), - None, - Some(true), - &root_ca_config, - ) - .await - .map_err(|e| format!("{}", e))?; - - debug!("creating root_ca_issuer {}", &ca_issuer_name); - self.create_issuer(ca_issuer_name.clone(), &root_ca_config) - .await - .map_err(|e| format!("{e}"))?; - - debug!( - "creating tls cert for nats gateway {}", - nats_cluster.tls_cert_name - ); - self.create_certificate( - nats_cluster.tls_cert_name.into(), - ca_issuer_name, - None, - Some(vec![nats_cluster.dns_name.clone()]), - Some(false), - &root_ca_config, - ) - .await - .map_err(|e| format!("{e}"))?; - - info!( - "Successfully created nats cluster certificates in cluster {}", - nats_cluster.name - ); - - let strategy = ExponentialBackoff::from_millis(250) - .factor(2) - .max_delay(Duration::from_millis(1000)) - .take(10); - - let ca_cert = Retry::spawn(strategy, || async { - log::debug!("Attempting CA cert fetch"); - - let res = self - .get_ca_certificate(root_ca_cert_name.clone(), &root_ca_config) - .await; - - match res { - Ok(cert) => Ok(cert), - Err(e) => { - log::warn!("Retryable error: {:?}", e); - Err(e) - } - } - }) - .await - .map_err(|e| format!("Retries exhausted: {:?}", e))?; - - Ok(ca_cert) - } - - async fn install_local_ca_bundle( - &self, - nats_cluster: NatsCluster, - ca_certs: Option>, - ) -> Result { - match ca_certs { - Some(certs) => { - self.create_ca_bundle_secret(&nats_cluster, &certs) - .await - .map_err(|e| e.to_string())?; - Ok("Successfully created ca bundle secret".to_string()) - } - None => Ok("no certs to create".to_string()), - } - } } diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs index fcdf8d91..2988a2a0 100644 --- a/harmony/src/modules/nats/capability.rs +++ b/harmony/src/modules/nats/capability.rs @@ -9,36 +9,23 @@ pub trait Nats { &self, nats_cluster: &NatsCluster, peers: Option>, - ) -> Result; - - async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result; - - async fn get_local_nats_ca_certificate( - &self, - nats_cluster: &NatsCluster, - ) -> Result; - - async fn install_local_ca_bundle( - &self, - nats_cluster: NatsCluster, - ca_certs: Option>, + ca_bundle: Option>, ) -> Result; } #[async_trait] pub trait NatsSupercluster { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result; + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result; - async fn distribute_ca_bundle( + async fn create_site_ca_bundle( &self, nats_clusters: Vec, ca_certs: Option>, - ) -> Result; -} - -pub struct SiteContext { - pub topology: T, - pub cluster: NatsCluster, + ) -> Result, String>; } #[derive(Debug, Clone, Serialize)] diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs index 0f7e68a7..7f805c18 100644 --- a/harmony/src/modules/nats/decentralized.rs +++ b/harmony/src/modules/nats/decentralized.rs @@ -2,13 +2,45 @@ use async_trait::async_trait; use log::debug; use crate::{ - modules::nats::capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster}, + modules::{ + cert_manager::capability::CertificateManagement, + nats::{ + capability::{Nats, NatsCluster, NatsSupercluster}, + pki::NatsPkiPolicy, + }, + }, topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, }; #[async_trait] -impl NatsSupercluster for DecentralizedTopology { - async fn deploy_supercluster(&self, nats_clusters: Vec) -> Result { +impl NatsSupercluster + for DecentralizedTopology +{ + async fn create_site_ca_bundle( + &self, + nats_clusters: Vec, + ca_certs: Option>, + ) -> Result, String> { + debug!("extracting nats clusters"); + let mut ca_bundle = Vec::new(); + for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { + let policy = NatsPkiPolicy { topology: site }; + ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap()); + } + + if let Some(mut certs) = ca_certs { + ca_bundle.append(&mut certs) + } + + debug!("creating ca bundle {:#?} on site ", ca_bundle); + + Ok(ca_bundle) + } + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result { for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { let peers: Vec = nats_clusters .iter() @@ -16,37 +48,10 @@ impl NatsSupercluster for Decentralize .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) .collect(); - site.deploy(cluster, Some(peers)).await?; + site.deploy(cluster, Some(peers), Some(ca_bundle.clone())) + .await?; } Ok(format!("Deployed Nats clusters across sites",)) } - - async fn distribute_ca_bundle( - &self, - nats_clusters: Vec, - ca_certs: Option>, - ) -> Result { - debug!("extracting nats clusters"); - let mut ca_bundle = Vec::new(); - for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - let ca_cert = site.get_local_nats_ca_certificate(&cluster).await?; - ca_bundle.push(ca_cert); - } - - if let Some(mut certs) = ca_certs { - ca_bundle.append(&mut certs) - } - - debug!("creating ca bundle {:#?} on site 1", ca_bundle); - - for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { - site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone())) - .await?; - } - - Ok(format!( - "Successfully deployed nats ca bundle accross clusters", - )) - } } diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs index 1822f1f5..6758c77b 100644 --- a/harmony/src/modules/nats/mod.rs +++ b/harmony/src/modules/nats/mod.rs @@ -1,4 +1,5 @@ pub mod capability; pub mod decentralized; +pub mod pki; pub mod score_nats_k8s; pub mod score_nats_supercluster; diff --git a/harmony/src/modules/nats/pki.rs b/harmony/src/modules/nats/pki.rs new file mode 100644 index 00000000..0759c9b4 --- /dev/null +++ b/harmony/src/modules/nats/pki.rs @@ -0,0 +1,77 @@ +use crate::modules::{ + cert_manager::{ + capability::{CertificateManagement, CertificateManagementConfig}, + crd::CaIssuer, + }, + nats::capability::NatsCluster, +}; + +pub struct NatsPkiPolicy<'a, T> { + pub topology: &'a T, +} + +impl<'a, T> NatsPkiPolicy<'a, T> +where + T: CertificateManagement, +{ + pub async fn ensure_nats_ca(&self, cluster: &NatsCluster) -> Result { + let ca_issuer_name = "harmony-ca-issuer"; + let root_ca_cert_name = "harmony-root-ca"; + let self_signed_issuer_name = "harmony-self-signed-issuer"; + + let self_signed_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: None, + self_signed: true, + }; + + let root_ca_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: Some(CaIssuer { + secret_name: format!("{}-tls", root_ca_cert_name), + }), + self_signed: false, + }; + + self.topology + .create_issuer(self_signed_issuer_name.into(), &self_signed_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + root_ca_cert_name.into(), + self_signed_issuer_name.into(), + Some(format!("harmony-{}-ca", cluster.name)), + None, + Some(true), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_issuer(ca_issuer_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + cluster.tls_cert_name.into(), + ca_issuer_name.into(), + None, + Some(vec![cluster.dns_name.clone()]), + Some(false), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .get_ca_certificate(root_ca_cert_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string()) + } +} diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs index bee4b43e..cad35e38 100644 --- a/harmony/src/modules/nats/score_nats_k8s.rs +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -1,52 +1,230 @@ -use std::str::FromStr; +use std::{collections::BTreeMap, str::FromStr}; +use async_trait::async_trait; use harmony_macros::hurl; -use log::debug; +use harmony_secret::{Secret, SecretManager}; +use harmony_types::id::Id; +use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret}; +use kube::api::ObjectMeta; +use log::{debug, info}; use non_blank_string_rs::NonBlankString; -use serde::Serialize; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; use crate::{ - interpret::Interpret, + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, modules::{ helm::chart::{HelmChartScore, HelmRepository}, - nats::capability::{NatsCluster, NatsEndpoint}, + k8s::{ingress::K8sIngressScore, resource::K8sResourceScore}, + nats::capability::{Nats, NatsCluster, NatsEndpoint}, + okd::{ + crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, + route::OKDRouteScore, + }, }, score::Score, - topology::{HelmCommand, Topology}, + topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology}, }; #[derive(Debug, Clone, Serialize)] pub struct NatsK8sScore { + pub distribution: KubernetesDistribution, pub cluster: NatsCluster, - pub domain: NatsEndpoint, pub peers: Option>, + pub ca_bundle: Option>, } -impl Score for NatsK8sScore { +impl Score for NatsK8sScore { fn name(&self) -> String { "NatsK8sScore".to_string() } fn create_interpret(&self) -> Box> { - self.build_deploy_nats_score( - self.cluster.clone(), - self.domain.clone(), - self.peers.clone(), - self.cluster.namespace.clone(), + Box::new(NatsK8sInterpret { + score: self.clone(), + }) + } +} +#[derive(Debug)] +pub struct NatsK8sInterpret { + score: NatsK8sScore, +} + +#[async_trait] +impl Interpret + for NatsK8sInterpret +{ + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + let domain = topology + .get_internal_domain() + .await? + .ok_or(InterpretError::new("no internal domain found".to_string())) + .unwrap(); + + info!("creating ingress for nats cluster"); + self.create_ingress( + topology, + inventory, + self.score.distribution.clone(), + self.score.cluster.clone(), ) - .create_interpret() + .await?; + + let domain = NatsEndpoint { host: domain }; + + info!("creating nats ca bundle secret"); + self.create_ca_bundle_secret( + topology, + inventory, + self.score.cluster.clone(), + self.score.ca_bundle.clone(), + ) + .await?; + + info!("deploying nats cluster"); + self.deploy_nats( + topology, + inventory, + self.score.cluster.clone(), + domain, + self.score.peers.clone(), + self.score.cluster.namespace.clone(), + ) + .await?; + Ok(Outcome::success( + "successfully deployed nats K8s".to_string(), + )) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("NatsK8sInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() } } -impl NatsK8sScore { - fn build_deploy_nats_score( +impl NatsK8sInterpret { + async fn create_ingress( &self, + topology: &T, + inventory: &Inventory, + distribution: KubernetesDistribution, + nats_cluster: NatsCluster, + ) -> Result { + match distribution { + KubernetesDistribution::OpenshiftFamily => { + OKDRouteScore { + name: nats_cluster.name.to_string(), + namespace: nats_cluster.namespace.to_string(), + spec: RouteSpec { + to: RouteTargetReference { + kind: "Service".to_string(), + name: format!("{}", nats_cluster.name.to_string()), + weight: Some(100), + }, + host: Some(nats_cluster.dns_name.clone()), + port: Some(RoutePort { target_port: 7222 }), + tls: Some(TLSConfig { + insecure_edge_termination_policy: None, + termination: "passthrough".to_string(), + ..Default::default() + }), + wildcard_policy: None, + ..Default::default() + }, + } + .interpret(inventory, topology) + .await + } + KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => { + //TODO untested + K8sIngressScore { + name: todo!(), + host: todo!(), + backend_service: todo!(), + port: todo!(), + path: todo!(), + path_type: todo!(), + namespace: todo!(), + ingress_class_name: todo!(), + } + .interpret(inventory, topology) + .await + } + } + } + + async fn create_ca_bundle_secret( + &self, + topology: &T, + inventory: &Inventory, + nats_cluster: NatsCluster, + ca_bundle: Option>, + ) -> Result { + async fn build_secret_data(bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + + match ca_bundle { + Some(certs) => { + let bundle_secret = K8sSecret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(nats_cluster.namespace.to_string()), + ..Default::default() + }, + data: Some(build_secret_data(&certs).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + }; + + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) + .interpret(inventory, topology) + .await + .map_err(|e| InterpretError::new(e.to_string())) + } + None => Ok(Outcome::noop("no certs to create".to_string())), + } + } + + async fn deploy_nats( + &self, + topology: &T, + inventory: &Inventory, cluster: NatsCluster, domain: NatsEndpoint, peers: Option>, namespace: String, - ) -> Box> { + ) -> Result { let mut gateway_gateways = String::new(); + let admin = SecretManager::get_or_prompt::().await.unwrap(); + let user = admin.user.clone(); + let password = admin.password.clone(); + match peers { Some(peers) => { for peer in peers { @@ -84,8 +262,8 @@ config: accounts: system: users: - - user: "admin" - password: "admin_2" + - user: "{user}" + password: "{password}" logtime: true debug: true trace: true @@ -134,6 +312,8 @@ natsBox: image: tag: nonroot"#, cluster_name = cluster.name, + user = user, + password = password, replicas = cluster.replicas, domain = domain, gateway_gateways = gateway_gateways, @@ -159,7 +339,12 @@ natsBox: true, )), }; - - Box::new(nats) + nats.interpret(inventory, topology).await } } + +#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)] +struct NatsAdmin { + user: String, + password: String, +} diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs index e6ef64e1..8ff244a3 100644 --- a/harmony/src/modules/nats/score_nats_supercluster.rs +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -45,14 +45,14 @@ impl Interpret for NatsSuperclusterInterpret topology: &T, ) -> Result { info!("creating nats supercluster ca bundle"); - topology - .distribute_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) + let ca_bundle = topology + .create_site_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) .await .map_err(|e| InterpretError::new(e))?; info!("deploying nats supercluster"); let cluster_name = topology - .deploy_supercluster(self.nats_clusters.clone()) + .deploy_site(self.nats_clusters.clone(), ca_bundle) .await .map_err(|e| InterpretError::new(e))?;