diff --git a/Cargo.lock b/Cargo.lock index 2c444f85..c53f46c8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1874,6 +1874,22 @@ dependencies = [ "url", ] +[[package]] +name = "example-nats-module-supercluster" +version = "0.1.0" +dependencies = [ + "cidr", + "env_logger", + "harmony", + "harmony_cli", + "harmony_macros", + "harmony_types", + "k8s-openapi", + "log", + "tokio", + "url", +] + [[package]] name = "example-nats-supercluster" version = "0.1.0" @@ -2260,9 +2276,9 @@ dependencies = [ [[package]] name = "fqdn" -version = "0.4.6" +version = "0.5.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c0f5d7f7b3eed2f771fc7f6fcb651f9560d7b0c483d75876082acb4649d266b3" +checksum = "886ac788f62d16d6b0f26b2fa762b34ef16ebfb4b624c2c15fbcadc9173c0f72" dependencies = [ "punycode", "serde", @@ -2596,6 +2612,7 @@ dependencies = [ "tempfile", "thiserror 2.0.16", "tokio", + "tokio-retry", "tokio-util", "url", "uuid", @@ -6453,6 +6470,17 @@ dependencies = [ "syn 2.0.106", ] +[[package]] +name = "tokio-retry" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" +dependencies = [ + "pin-project", + "rand 0.8.5", + "tokio", +] + [[package]] name = "tokio-rustls" version = "0.24.1" diff --git a/examples/nats-module/Cargo.toml b/examples/nats-module/Cargo.toml new file mode 100644 index 00000000..73be5c70 --- /dev/null +++ b/examples/nats-module/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "example-nats-module-supercluster" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +publish = false + +[dependencies] +harmony = { path = "../../harmony" } +harmony_cli = { path = "../../harmony_cli" } +harmony_types = { path = "../../harmony_types" } +cidr = { workspace = true } +tokio = { workspace = true } +harmony_macros = { path = "../../harmony_macros" } +log = { workspace = true } +env_logger = { workspace = true } +url = { workspace = true } +k8s-openapi.workspace = true diff --git a/examples/nats-module/env_example.sh b/examples/nats-module/env_example.sh new file mode 100644 index 00000000..7d3fee03 --- /dev/null +++ b/examples/nats-module/env_example.sh @@ -0,0 +1,9 @@ +# Cluster 1 +export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_1="kubeconfig=$HOME/.kube/config,context=cluster-context" +export HARMONY_NATS_SITE_1_DOMAIN="your.domain.1" +# Cluster 2 +export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_2="kubeconfig=$HOME/.kube/config,context=cluster-context" +export HARMONY_NATS_SITE_2_DOMAIN="your.domain.2" +# Cluster 3 +export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_3="kubeconfig=$HOME/.kube/config,context=cluster-context" +export HARMONY_NATS_SITE_3_DOMAIN="your.domain.3" diff --git a/examples/nats-module/src/main.rs b/examples/nats-module/src/main.rs new file mode 100644 index 00000000..2b2b1d8c --- /dev/null +++ b/examples/nats-module/src/main.rs @@ -0,0 +1,77 @@ +use harmony::{ + inventory::Inventory, + modules::nats::{capability::NatsCluster, score_nats_supercluster::NatsSuperclusterScore}, + topology::{K8sAnywhereTopology, decentralized::DecentralizedTopology}, +}; + +#[tokio::main] +async fn main() { + let supercluster_ca_secret_name = "nats-supercluster-ca-bundle"; + let tls_cert_name = "nats-gateway"; + let jetstream_enabled = "false"; + let nats_namespace = "nats-example".to_string(); + + let site_1_name = "site-1".to_string(); + let site_1_domain = + std::env::var("HARMONY_NATS_SITE_1_DOMAIN").expect("missing domain in env for site_1"); + + let nats_site_1 = NatsCluster { + namespace: nats_namespace.clone(), + domain: site_1_domain.clone(), + replicas: 1, + name: site_1_name.clone(), + gateway_advertise: format!("{site_1_name}-gw.{site_1_domain}:443"), + dns_name: format!("{site_1_name}-gw.{site_1_domain}"), + supercluster_ca_secret_name: supercluster_ca_secret_name, + tls_cert_name: tls_cert_name, + jetstream_enabled: jetstream_enabled, + }; + + let site_2_name = "site-2".to_string(); + let site_2_domain = + std::env::var("HARMONY_NATS_SITE_2_DOMAIN").expect("missing domain in env for site_2"); + + let nats_site_2 = NatsCluster { + namespace: nats_namespace.clone(), + domain: site_2_domain.clone(), + replicas: 1, + name: site_2_name.clone(), + gateway_advertise: format!("{site_2_name}-gw.{site_2_domain}:443"), + dns_name: format!("{site_2_name}-gw.{site_2_domain}"), + supercluster_ca_secret_name: supercluster_ca_secret_name, + tls_cert_name: tls_cert_name, + jetstream_enabled: jetstream_enabled, + }; + + let site_3_name = "site-3".to_string(); + let site_3_domain = + std::env::var("HARMONY_NATS_SITE_3_DOMAIN").expect("missing domain in env for site_3"); + + let nats_site_3 = NatsCluster { + namespace: nats_namespace.clone(), + domain: site_3_domain.clone(), + replicas: 1, + name: site_3_name.clone(), + gateway_advertise: format!("{site_3_name}-gw.{site_3_domain}:443"), + dns_name: format!("{site_3_name}-gw.{site_3_domain}"), + supercluster_ca_secret_name: supercluster_ca_secret_name, + tls_cert_name: tls_cert_name, + jetstream_enabled: jetstream_enabled, + }; + + let clusters = vec![nats_site_1, nats_site_2, nats_site_3]; + + let nats_supercluster = NatsSuperclusterScore { + nats_cluster: clusters, + ca_certs: None, + }; + + harmony_cli::run( + Inventory::autoload(), + DecentralizedTopology::::from_env(), + vec![Box::new(nats_supercluster)], + None, + ) + .await + .unwrap(); +} diff --git a/harmony/Cargo.toml b/harmony/Cargo.toml index 6e889d21..b4a0bed6 100644 --- a/harmony/Cargo.toml +++ b/harmony/Cargo.toml @@ -79,6 +79,7 @@ sqlx.workspace = true inquire.workspace = true brocade = { path = "../brocade" } option-ext = "0.2.0" +tokio-retry = "0.3.0" [dev-dependencies] pretty_assertions.workspace = true diff --git a/harmony/src/domain/topology/decentralized.rs b/harmony/src/domain/topology/decentralized.rs new file mode 100644 index 00000000..1ee2e010 --- /dev/null +++ b/harmony/src/domain/topology/decentralized.rs @@ -0,0 +1,58 @@ +use async_trait::async_trait; +use log::info; + +use crate::topology::{ + K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology, +}; + +pub struct DecentralizedTopology { + pub sites: Vec, +} + +#[async_trait] +impl Topology for DecentralizedTopology { + fn name(&self) -> &str { + "DecentralizedTopology" + } + + async fn ensure_ready(&self) -> Result { + let mut details = Vec::new(); + + for site in &self.sites { + let outcome = site.ensure_ready().await?; + match outcome { + PreparationOutcome::Success { details: d } => { + details.push(d); + } + PreparationOutcome::Noop => { + details.push("site ready Noop".to_string()); + info!("site ready"); + } + } + } + + Ok(PreparationOutcome::Success { + details: details.join(","), + }) + } +} + +impl DecentralizedTopology { + pub fn from_env() -> Self { + let mut sites = Vec::new(); + + for i in 1.. { + let var = format!("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_{}", i); + + match std::env::var(&var) { + Ok(_) => { + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var(&var); + sites.push(K8sAnywhereTopology::with_config(cfg)); + } + Err(_) => break, + } + } + + Self { sites } + } +} diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index f4595ee4..5738c68b 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -19,7 +19,7 @@ use kube::{ core::{DynamicResourceScope, ErrorResponse}, discovery::{ApiCapabilities, Scope}, error::DiscoveryError, - runtime::reflector::Lookup, + runtime::{reflector::Lookup, wait::Condition}, }; use kube::{api::DynamicObject, runtime::conditions}; use kube::{ diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index a10061d5..4f735dbc 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -3,11 +3,17 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration}; use async_trait::async_trait; use base64::{Engine, engine::general_purpose}; use harmony_types::rfc1123::Rfc1123Name; -use k8s_openapi::api::{ - core::v1::{Pod, Secret}, - rbac::v1::{ClusterRoleBinding, RoleRef, Subject}, +use k8s_openapi::{ + ByteString, + api::{ + core::v1::{Pod, Secret}, + rbac::v1::{ClusterRoleBinding, RoleRef, Subject}, + }, +}; +use kube::{ + api::{DynamicObject, GroupVersionKind, ObjectMeta}, + runtime::conditions, }; -use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta}; use log::{debug, info, trace, warn}; use serde::Serialize; use tokio::sync::OnceCell; @@ -28,7 +34,10 @@ use crate::{ score_cert_management::CertificateManagementScore, }, k3d::K3DInstallationScore, - k8s::ingress::{K8sIngressScore, PathType}, + k8s::{ + ingress::{K8sIngressScore, PathType}, + resource::K8sResourceScore, + }, monitoring::{ grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score}, kube_prometheus::crd::{ @@ -45,8 +54,8 @@ use crate::{ service_monitor::ServiceMonitor, }, }, - okd::crd::ingresses_config::Ingress as IngressResource, - okd::route::OKDTlsPassthroughScore, + nats::capability::NatsCluster, + okd::{crd::ingresses_config::Ingress as IngressResource, route::OKDTlsPassthroughScore}, prometheus::{ k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore, prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore, @@ -77,7 +86,7 @@ struct K8sState { message: String, } -#[derive(Debug, Clone)] +#[derive(Debug, Clone, Serialize)] pub enum KubernetesDistribution { OpenshiftFamily, K3sFamily, @@ -501,7 +510,7 @@ impl CertificateManagement for K8sAnywhereTopology { { let secret_name = certificate.spec.secret_name.clone(); - trace!("Secret Name {:#?}", secret_name); + debug!("Secret Name {:#?}", secret_name); if let Some(secret) = client .get_resource::(&secret_name, Some(&namespace)) .await @@ -527,8 +536,8 @@ impl CertificateManagement for K8sAnywhereTopology { return Ok(ca_cert); } else { Err(ExecutorError::UnexpectedError(format!( - "Error getting secret associated with cert_name: {}", - cert_name + "Error getting secret associated with cert_name: {}, secret_name: {}", + cert_name, secret_name ))) } } else { diff --git a/harmony/src/domain/topology/k8s_anywhere/mod.rs b/harmony/src/domain/topology/k8s_anywhere/mod.rs index be870823..d14c28ba 100644 --- a/harmony/src/domain/topology/k8s_anywhere/mod.rs +++ b/harmony/src/domain/topology/k8s_anywhere/mod.rs @@ -1,3 +1,4 @@ mod k8s_anywhere; +pub mod nats; mod postgres; pub use k8s_anywhere::*; diff --git a/harmony/src/domain/topology/k8s_anywhere/nats.rs b/harmony/src/domain/topology/k8s_anywhere/nats.rs new file mode 100644 index 00000000..113ad74f --- /dev/null +++ b/harmony/src/domain/topology/k8s_anywhere/nats.rs @@ -0,0 +1,38 @@ +use async_trait::async_trait; + +use crate::{ + inventory::Inventory, + modules::nats::{ + capability::{Nats, NatsCluster}, + score_nats_k8s::NatsK8sScore, + }, + score::Score, + topology::K8sAnywhereTopology, +}; + +#[async_trait] +impl Nats for K8sAnywhereTopology { + async fn deploy( + &self, + nats_cluster: &NatsCluster, + peers: Option>, + ca_bundle: Option>, + ) -> Result { + let distro = self.get_k8s_distribution().await.unwrap(); + + NatsK8sScore { + distribution: distro.clone(), + cluster: nats_cluster.clone(), + peers, + ca_bundle, + } + .interpret(&Inventory::empty(), self) + .await + .map_err(|e| format!("Failed to deploy nats cluster: {}", e))?; + + Ok(format!( + "Nats cluster deployed in ns {}", + nats_cluster.namespace + )) + } +} diff --git a/harmony/src/domain/topology/mod.rs b/harmony/src/domain/topology/mod.rs index 42add5c7..28161fcf 100644 --- a/harmony/src/domain/topology/mod.rs +++ b/harmony/src/domain/topology/mod.rs @@ -1,3 +1,4 @@ +pub mod decentralized; mod failover; mod ha_cluster; pub mod ingress; diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs index c845fb10..3fa69469 100644 --- a/harmony/src/modules/mod.rs +++ b/harmony/src/modules/mod.rs @@ -13,6 +13,7 @@ pub mod k8s; pub mod lamp; pub mod load_balancer; pub mod monitoring; +pub mod nats; pub mod network; pub mod okd; pub mod opnsense; diff --git a/harmony/src/modules/nats/capability.rs b/harmony/src/modules/nats/capability.rs new file mode 100644 index 00000000..2988a2a0 --- /dev/null +++ b/harmony/src/modules/nats/capability.rs @@ -0,0 +1,53 @@ +use std::fmt::Display; + +use async_trait::async_trait; +use serde::Serialize; + +#[async_trait] +pub trait Nats { + async fn deploy( + &self, + nats_cluster: &NatsCluster, + peers: Option>, + ca_bundle: Option>, + ) -> Result; +} + +#[async_trait] +pub trait NatsSupercluster { + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result; + + async fn create_site_ca_bundle( + &self, + nats_clusters: Vec, + ca_certs: Option>, + ) -> Result, String>; +} + +#[derive(Debug, Clone, Serialize)] +pub struct NatsCluster { + pub namespace: String, + pub domain: String, + pub replicas: usize, + pub name: String, + pub gateway_advertise: String, + pub dns_name: String, + pub supercluster_ca_secret_name: &'static str, + pub tls_cert_name: &'static str, + pub jetstream_enabled: &'static str, +} + +#[derive(Debug, Clone, Serialize)] +pub struct NatsEndpoint { + pub host: String, +} + +impl Display for NatsEndpoint { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "({})", self.host,) + } +} diff --git a/harmony/src/modules/nats/decentralized.rs b/harmony/src/modules/nats/decentralized.rs new file mode 100644 index 00000000..7f805c18 --- /dev/null +++ b/harmony/src/modules/nats/decentralized.rs @@ -0,0 +1,57 @@ +use async_trait::async_trait; +use log::debug; + +use crate::{ + modules::{ + cert_manager::capability::CertificateManagement, + nats::{ + capability::{Nats, NatsCluster, NatsSupercluster}, + pki::NatsPkiPolicy, + }, + }, + topology::{TlsRouter, Topology, decentralized::DecentralizedTopology}, +}; + +#[async_trait] +impl NatsSupercluster + for DecentralizedTopology +{ + async fn create_site_ca_bundle( + &self, + nats_clusters: Vec, + ca_certs: Option>, + ) -> Result, String> { + debug!("extracting nats clusters"); + let mut ca_bundle = Vec::new(); + for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) { + let policy = NatsPkiPolicy { topology: site }; + ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap()); + } + + if let Some(mut certs) = ca_certs { + ca_bundle.append(&mut certs) + } + + debug!("creating ca bundle {:#?} on site ", ca_bundle); + + Ok(ca_bundle) + } + async fn deploy_site( + &self, + nats_clusters: Vec, + ca_bundle: Vec, + ) -> Result { + for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() { + let peers: Vec = nats_clusters + .iter() + .enumerate() + .filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) }) + .collect(); + + site.deploy(cluster, Some(peers), Some(ca_bundle.clone())) + .await?; + } + + Ok(format!("Deployed Nats clusters across sites",)) + } +} diff --git a/harmony/src/modules/nats/mod.rs b/harmony/src/modules/nats/mod.rs new file mode 100644 index 00000000..6758c77b --- /dev/null +++ b/harmony/src/modules/nats/mod.rs @@ -0,0 +1,5 @@ +pub mod capability; +pub mod decentralized; +pub mod pki; +pub mod score_nats_k8s; +pub mod score_nats_supercluster; diff --git a/harmony/src/modules/nats/pki.rs b/harmony/src/modules/nats/pki.rs new file mode 100644 index 00000000..0759c9b4 --- /dev/null +++ b/harmony/src/modules/nats/pki.rs @@ -0,0 +1,77 @@ +use crate::modules::{ + cert_manager::{ + capability::{CertificateManagement, CertificateManagementConfig}, + crd::CaIssuer, + }, + nats::capability::NatsCluster, +}; + +pub struct NatsPkiPolicy<'a, T> { + pub topology: &'a T, +} + +impl<'a, T> NatsPkiPolicy<'a, T> +where + T: CertificateManagement, +{ + pub async fn ensure_nats_ca(&self, cluster: &NatsCluster) -> Result { + let ca_issuer_name = "harmony-ca-issuer"; + let root_ca_cert_name = "harmony-root-ca"; + let self_signed_issuer_name = "harmony-self-signed-issuer"; + + let self_signed_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: None, + self_signed: true, + }; + + let root_ca_config = CertificateManagementConfig { + namespace: Some(cluster.namespace.clone()), + acme_issuer: None, + ca_issuer: Some(CaIssuer { + secret_name: format!("{}-tls", root_ca_cert_name), + }), + self_signed: false, + }; + + self.topology + .create_issuer(self_signed_issuer_name.into(), &self_signed_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + root_ca_cert_name.into(), + self_signed_issuer_name.into(), + Some(format!("harmony-{}-ca", cluster.name)), + None, + Some(true), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_issuer(ca_issuer_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string())?; + + self.topology + .create_certificate( + cluster.tls_cert_name.into(), + ca_issuer_name.into(), + None, + Some(vec![cluster.dns_name.clone()]), + Some(false), + &root_ca_config, + ) + .await + .map_err(|e| e.to_string())?; + + self.topology + .get_ca_certificate(root_ca_cert_name.into(), &root_ca_config) + .await + .map_err(|e| e.to_string()) + } +} diff --git a/harmony/src/modules/nats/score_nats_k8s.rs b/harmony/src/modules/nats/score_nats_k8s.rs new file mode 100644 index 00000000..cad35e38 --- /dev/null +++ b/harmony/src/modules/nats/score_nats_k8s.rs @@ -0,0 +1,350 @@ +use std::{collections::BTreeMap, str::FromStr}; + +use async_trait::async_trait; +use harmony_macros::hurl; +use harmony_secret::{Secret, SecretManager}; +use harmony_types::id::Id; +use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret}; +use kube::api::ObjectMeta; +use log::{debug, info}; +use non_blank_string_rs::NonBlankString; +use schemars::JsonSchema; +use serde::{Deserialize, Serialize}; + +use crate::{ + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + modules::{ + helm::chart::{HelmChartScore, HelmRepository}, + k8s::{ingress::K8sIngressScore, resource::K8sResourceScore}, + nats::capability::{Nats, NatsCluster, NatsEndpoint}, + okd::{ + crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig}, + route::OKDRouteScore, + }, + }, + score::Score, + topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology}, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct NatsK8sScore { + pub distribution: KubernetesDistribution, + pub cluster: NatsCluster, + pub peers: Option>, + pub ca_bundle: Option>, +} + +impl Score for NatsK8sScore { + fn name(&self) -> String { + "NatsK8sScore".to_string() + } + + fn create_interpret(&self) -> Box> { + Box::new(NatsK8sInterpret { + score: self.clone(), + }) + } +} +#[derive(Debug)] +pub struct NatsK8sInterpret { + score: NatsK8sScore, +} + +#[async_trait] +impl Interpret + for NatsK8sInterpret +{ + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + let domain = topology + .get_internal_domain() + .await? + .ok_or(InterpretError::new("no internal domain found".to_string())) + .unwrap(); + + info!("creating ingress for nats cluster"); + self.create_ingress( + topology, + inventory, + self.score.distribution.clone(), + self.score.cluster.clone(), + ) + .await?; + + let domain = NatsEndpoint { host: domain }; + + info!("creating nats ca bundle secret"); + self.create_ca_bundle_secret( + topology, + inventory, + self.score.cluster.clone(), + self.score.ca_bundle.clone(), + ) + .await?; + + info!("deploying nats cluster"); + self.deploy_nats( + topology, + inventory, + self.score.cluster.clone(), + domain, + self.score.peers.clone(), + self.score.cluster.namespace.clone(), + ) + .await?; + Ok(Outcome::success( + "successfully deployed nats K8s".to_string(), + )) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("NatsK8sInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} + +impl NatsK8sInterpret { + async fn create_ingress( + &self, + topology: &T, + inventory: &Inventory, + distribution: KubernetesDistribution, + nats_cluster: NatsCluster, + ) -> Result { + match distribution { + KubernetesDistribution::OpenshiftFamily => { + OKDRouteScore { + name: nats_cluster.name.to_string(), + namespace: nats_cluster.namespace.to_string(), + spec: RouteSpec { + to: RouteTargetReference { + kind: "Service".to_string(), + name: format!("{}", nats_cluster.name.to_string()), + weight: Some(100), + }, + host: Some(nats_cluster.dns_name.clone()), + port: Some(RoutePort { target_port: 7222 }), + tls: Some(TLSConfig { + insecure_edge_termination_policy: None, + termination: "passthrough".to_string(), + ..Default::default() + }), + wildcard_policy: None, + ..Default::default() + }, + } + .interpret(inventory, topology) + .await + } + KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => { + //TODO untested + K8sIngressScore { + name: todo!(), + host: todo!(), + backend_service: todo!(), + port: todo!(), + path: todo!(), + path_type: todo!(), + namespace: todo!(), + ingress_class_name: todo!(), + } + .interpret(inventory, topology) + .await + } + } + } + + async fn create_ca_bundle_secret( + &self, + topology: &T, + inventory: &Inventory, + nats_cluster: NatsCluster, + ca_bundle: Option>, + ) -> Result { + async fn build_secret_data(bundle: &Vec) -> BTreeMap { + let mut data = BTreeMap::new(); + + data.insert( + "ca.crt".to_string(), + ByteString(bundle.join("\n").into_bytes()), + ); + + data + } + + match ca_bundle { + Some(certs) => { + let bundle_secret = K8sSecret { + metadata: ObjectMeta { + name: Some(nats_cluster.supercluster_ca_secret_name.to_string()), + namespace: Some(nats_cluster.namespace.to_string()), + ..Default::default() + }, + data: Some(build_secret_data(&certs).await), + immutable: Some(false), + type_: Some("Opaque".to_string()), + string_data: None, + }; + + K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone())) + .interpret(inventory, topology) + .await + .map_err(|e| InterpretError::new(e.to_string())) + } + None => Ok(Outcome::noop("no certs to create".to_string())), + } + } + + async fn deploy_nats( + &self, + topology: &T, + inventory: &Inventory, + cluster: NatsCluster, + domain: NatsEndpoint, + peers: Option>, + namespace: String, + ) -> Result { + let mut gateway_gateways = String::new(); + let admin = SecretManager::get_or_prompt::().await.unwrap(); + let user = admin.user.clone(); + let password = admin.password.clone(); + + match peers { + Some(peers) => { + for peer in peers { + // Construct wss:// URLs on port 443 for the remote gateways + gateway_gateways.push_str(&format!( + r#" + - name: {} + urls: + - nats://{}"#, + peer.name, peer.gateway_advertise + )); + } + } + None => debug!("no peers to push to gateway"), + } + + // Inject gateway config into the 'merge' block to comply with chart structure + let values_yaml = Some(format!( + r#" +fullnameOverride: {cluster_name} +config: + merge: + authorization: + default_permissions: + publish: ["TEST.*"] + subscribe: ["PUBLIC.>"] + users: + # - user: "admin" + # password: "admin_1" + # permissions: + # publish: ">" + # subscribe: ">" + - password: "enGk0cgZUabM6bN6FXHT" + user: "testUser" + accounts: + system: + users: + - user: "{user}" + password: "{password}" + logtime: true + debug: true + trace: true + system_account: system + cluster: + name: {cluster_name} + enabled: true + replicas: {replicas} + jetstream: + enabled: {jetstream_enabled} + fileStorage: + enabled: true + size: 10Gi + storageDirectory: /data/jetstream + leafnodes: + enabled: false + websocket: + enabled: false + ingress: + enabled: true + className: openshift-default + pathType: Prefix + hosts: + - nats-ws.{domain} + gateway: + enabled: true + port: 7222 + name: {cluster_name} + merge: + advertise: {gateway_advertise} + gateways: {gateway_gateways} + tls: + enabled: true + secretName: {tls_secret_name} + # merge: + # ca_file: "/etc/nats-certs/gateway/ca.crt" +service: + ports: + gateway: + enabled: true +tlsCA: + enabled: true + secretName: {supercluster_ca_secret_name} +natsBox: + container: + image: + tag: nonroot"#, + cluster_name = cluster.name, + user = user, + password = password, + replicas = cluster.replicas, + domain = domain, + gateway_gateways = gateway_gateways, + gateway_advertise = cluster.gateway_advertise, + tls_secret_name = format!("{}-tls", cluster.tls_cert_name), + jetstream_enabled = cluster.jetstream_enabled, + supercluster_ca_secret_name = cluster.supercluster_ca_secret_name, + )); + + debug!("Prepared Helm Chart values : \n{values_yaml:#?}"); + let nats = HelmChartScore { + namespace: Some(NonBlankString::from_str(&namespace).unwrap()), + release_name: NonBlankString::from_str(&cluster.name).unwrap(), + chart_name: NonBlankString::from_str("nats/nats").unwrap(), + chart_version: None, + values_overrides: None, + values_yaml, + create_namespace: true, + install_only: false, + repository: Some(HelmRepository::new( + "nats".to_string(), + hurl!("https://nats-io.github.io/k8s/helm/charts/"), + true, + )), + }; + nats.interpret(inventory, topology).await + } +} + +#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)] +struct NatsAdmin { + user: String, + password: String, +} diff --git a/harmony/src/modules/nats/score_nats_supercluster.rs b/harmony/src/modules/nats/score_nats_supercluster.rs new file mode 100644 index 00000000..8ff244a3 --- /dev/null +++ b/harmony/src/modules/nats/score_nats_supercluster.rs @@ -0,0 +1,77 @@ +use async_trait::async_trait; +use harmony_types::id::Id; +use log::info; +use serde::Serialize; + +use crate::{ + data::Version, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + modules::nats::capability::{NatsCluster, NatsSupercluster}, + score::Score, + topology::Topology, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct NatsSuperclusterScore { + pub nats_cluster: Vec, + pub ca_certs: Option>, +} + +impl Score for NatsSuperclusterScore { + fn name(&self) -> String { + "NatsSuperclusterScore".to_string() + } + + fn create_interpret(&self) -> Box> { + Box::new(NatsSuperclusterInterpret { + nats_clusters: self.nats_cluster.clone(), + ca_certs: self.ca_certs.clone(), + }) + } +} + +#[derive(Debug)] +pub struct NatsSuperclusterInterpret { + nats_clusters: Vec, + ca_certs: Option>, +} + +#[async_trait] +impl Interpret for NatsSuperclusterInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + info!("creating nats supercluster ca bundle"); + let ca_bundle = topology + .create_site_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone()) + .await + .map_err(|e| InterpretError::new(e))?; + + info!("deploying nats supercluster"); + let cluster_name = topology + .deploy_site(self.nats_clusters.clone(), ca_bundle) + .await + .map_err(|e| InterpretError::new(e))?; + + Ok(Outcome::success(format!("Nats cluster '{}'", cluster_name))) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("NatsSuperclusterInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +}