feat: created decentralized topology, capability nats and nats super cluster #221
32
Cargo.lock
generated
32
Cargo.lock
generated
@@ -1874,6 +1874,22 @@ dependencies = [
|
|||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "example-nats-module-supercluster"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"cidr",
|
||||||
|
"env_logger",
|
||||||
|
"harmony",
|
||||||
|
"harmony_cli",
|
||||||
|
"harmony_macros",
|
||||||
|
"harmony_types",
|
||||||
|
"k8s-openapi",
|
||||||
|
"log",
|
||||||
|
"tokio",
|
||||||
|
"url",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "example-nats-supercluster"
|
name = "example-nats-supercluster"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
@@ -2260,9 +2276,9 @@ dependencies = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "fqdn"
|
name = "fqdn"
|
||||||
version = "0.4.6"
|
version = "0.5.2"
|
||||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
checksum = "c0f5d7f7b3eed2f771fc7f6fcb651f9560d7b0c483d75876082acb4649d266b3"
|
checksum = "886ac788f62d16d6b0f26b2fa762b34ef16ebfb4b624c2c15fbcadc9173c0f72"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"punycode",
|
"punycode",
|
||||||
"serde",
|
"serde",
|
||||||
@@ -2596,6 +2612,7 @@ dependencies = [
|
|||||||
"tempfile",
|
"tempfile",
|
||||||
"thiserror 2.0.16",
|
"thiserror 2.0.16",
|
||||||
"tokio",
|
"tokio",
|
||||||
|
"tokio-retry",
|
||||||
"tokio-util",
|
"tokio-util",
|
||||||
"url",
|
"url",
|
||||||
"uuid",
|
"uuid",
|
||||||
@@ -6453,6 +6470,17 @@ dependencies = [
|
|||||||
"syn 2.0.106",
|
"syn 2.0.106",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "tokio-retry"
|
||||||
|
version = "0.3.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f"
|
||||||
|
dependencies = [
|
||||||
|
"pin-project",
|
||||||
|
"rand 0.8.5",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "tokio-rustls"
|
name = "tokio-rustls"
|
||||||
version = "0.24.1"
|
version = "0.24.1"
|
||||||
|
|||||||
19
examples/nats-module/Cargo.toml
Normal file
19
examples/nats-module/Cargo.toml
Normal file
@@ -0,0 +1,19 @@
|
|||||||
|
[package]
|
||||||
|
name = "example-nats-module-supercluster"
|
||||||
|
edition = "2024"
|
||||||
|
version.workspace = true
|
||||||
|
readme.workspace = true
|
||||||
|
license.workspace = true
|
||||||
|
publish = false
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
harmony = { path = "../../harmony" }
|
||||||
|
harmony_cli = { path = "../../harmony_cli" }
|
||||||
|
harmony_types = { path = "../../harmony_types" }
|
||||||
|
cidr = { workspace = true }
|
||||||
|
tokio = { workspace = true }
|
||||||
|
harmony_macros = { path = "../../harmony_macros" }
|
||||||
|
log = { workspace = true }
|
||||||
|
env_logger = { workspace = true }
|
||||||
|
url = { workspace = true }
|
||||||
|
k8s-openapi.workspace = true
|
||||||
9
examples/nats-module/env_example.sh
Normal file
9
examples/nats-module/env_example.sh
Normal file
@@ -0,0 +1,9 @@
|
|||||||
|
# Cluster 1
|
||||||
|
export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_1="kubeconfig=$HOME/.kube/config,context=cluster-context"
|
||||||
|
export HARMONY_NATS_SITE_1_DOMAIN="your.domain.1"
|
||||||
|
# Cluster 2
|
||||||
|
export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_2="kubeconfig=$HOME/.kube/config,context=cluster-context"
|
||||||
|
export HARMONY_NATS_SITE_2_DOMAIN="your.domain.2"
|
||||||
|
# Cluster 3
|
||||||
|
export HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_3="kubeconfig=$HOME/.kube/config,context=cluster-context"
|
||||||
|
export HARMONY_NATS_SITE_3_DOMAIN="your.domain.3"
|
||||||
77
examples/nats-module/src/main.rs
Normal file
77
examples/nats-module/src/main.rs
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
use harmony::{
|
||||||
|
inventory::Inventory,
|
||||||
|
modules::nats::{capability::NatsCluster, score_nats_supercluster::NatsSuperclusterScore},
|
||||||
|
topology::{K8sAnywhereTopology, decentralized::DecentralizedTopology},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() {
|
||||||
|
let supercluster_ca_secret_name = "nats-supercluster-ca-bundle";
|
||||||
|
let tls_cert_name = "nats-gateway";
|
||||||
|
let jetstream_enabled = "false";
|
||||||
|
let nats_namespace = "nats-example".to_string();
|
||||||
|
|
||||||
|
let site_1_name = "site-1".to_string();
|
||||||
|
let site_1_domain =
|
||||||
|
std::env::var("HARMONY_NATS_SITE_1_DOMAIN").expect("missing domain in env for site_1");
|
||||||
|
|
||||||
|
let nats_site_1 = NatsCluster {
|
||||||
|
namespace: nats_namespace.clone(),
|
||||||
|
domain: site_1_domain.clone(),
|
||||||
|
replicas: 1,
|
||||||
|
name: site_1_name.clone(),
|
||||||
|
gateway_advertise: format!("{site_1_name}-gw.{site_1_domain}:443"),
|
||||||
|
dns_name: format!("{site_1_name}-gw.{site_1_domain}"),
|
||||||
|
supercluster_ca_secret_name: supercluster_ca_secret_name,
|
||||||
|
tls_cert_name: tls_cert_name,
|
||||||
|
jetstream_enabled: jetstream_enabled,
|
||||||
|
};
|
||||||
|
|
||||||
|
let site_2_name = "site-2".to_string();
|
||||||
|
let site_2_domain =
|
||||||
|
std::env::var("HARMONY_NATS_SITE_2_DOMAIN").expect("missing domain in env for site_2");
|
||||||
|
|
||||||
|
let nats_site_2 = NatsCluster {
|
||||||
|
namespace: nats_namespace.clone(),
|
||||||
|
domain: site_2_domain.clone(),
|
||||||
|
replicas: 1,
|
||||||
|
name: site_2_name.clone(),
|
||||||
|
gateway_advertise: format!("{site_2_name}-gw.{site_2_domain}:443"),
|
||||||
|
dns_name: format!("{site_2_name}-gw.{site_2_domain}"),
|
||||||
|
supercluster_ca_secret_name: supercluster_ca_secret_name,
|
||||||
|
tls_cert_name: tls_cert_name,
|
||||||
|
jetstream_enabled: jetstream_enabled,
|
||||||
|
};
|
||||||
|
|
||||||
|
let site_3_name = "site-3".to_string();
|
||||||
|
let site_3_domain =
|
||||||
|
std::env::var("HARMONY_NATS_SITE_3_DOMAIN").expect("missing domain in env for site_3");
|
||||||
|
|
||||||
|
let nats_site_3 = NatsCluster {
|
||||||
|
namespace: nats_namespace.clone(),
|
||||||
|
domain: site_3_domain.clone(),
|
||||||
|
replicas: 1,
|
||||||
|
name: site_3_name.clone(),
|
||||||
|
gateway_advertise: format!("{site_3_name}-gw.{site_3_domain}:443"),
|
||||||
|
dns_name: format!("{site_3_name}-gw.{site_3_domain}"),
|
||||||
|
supercluster_ca_secret_name: supercluster_ca_secret_name,
|
||||||
|
tls_cert_name: tls_cert_name,
|
||||||
|
jetstream_enabled: jetstream_enabled,
|
||||||
|
};
|
||||||
|
|
||||||
|
let clusters = vec![nats_site_1, nats_site_2, nats_site_3];
|
||||||
|
|
||||||
|
let nats_supercluster = NatsSuperclusterScore {
|
||||||
|
nats_cluster: clusters,
|
||||||
|
ca_certs: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
harmony_cli::run(
|
||||||
|
Inventory::autoload(),
|
||||||
|
DecentralizedTopology::<K8sAnywhereTopology>::from_env(),
|
||||||
|
vec![Box::new(nats_supercluster)],
|
||||||
|
None,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.unwrap();
|
||||||
|
}
|
||||||
@@ -79,6 +79,7 @@ sqlx.workspace = true
|
|||||||
inquire.workspace = true
|
inquire.workspace = true
|
||||||
brocade = { path = "../brocade" }
|
brocade = { path = "../brocade" }
|
||||||
option-ext = "0.2.0"
|
option-ext = "0.2.0"
|
||||||
|
tokio-retry = "0.3.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
pretty_assertions.workspace = true
|
pretty_assertions.workspace = true
|
||||||
|
|||||||
58
harmony/src/domain/topology/decentralized.rs
Normal file
58
harmony/src/domain/topology/decentralized.rs
Normal file
@@ -0,0 +1,58 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use log::info;
|
||||||
|
|
||||||
|
use crate::topology::{
|
||||||
|
K8sAnywhereConfig, K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct DecentralizedTopology<T> {
|
||||||
|
pub sites: Vec<T>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + Send + Sync> Topology for DecentralizedTopology<T> {
|
||||||
|
fn name(&self) -> &str {
|
||||||
|
"DecentralizedTopology"
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||||
|
let mut details = Vec::new();
|
||||||
|
|
||||||
|
for site in &self.sites {
|
||||||
|
let outcome = site.ensure_ready().await?;
|
||||||
|
match outcome {
|
||||||
|
PreparationOutcome::Success { details: d } => {
|
||||||
|
details.push(d);
|
||||||
|
}
|
||||||
|
PreparationOutcome::Noop => {
|
||||||
|
details.push("site ready Noop".to_string());
|
||||||
|
info!("site ready");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(PreparationOutcome::Success {
|
||||||
|
details: details.join(","),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl DecentralizedTopology<K8sAnywhereTopology> {
|
||||||
|
pub fn from_env() -> Self {
|
||||||
|
let mut sites = Vec::new();
|
||||||
|
|
||||||
|
for i in 1.. {
|
||||||
|
let var = format!("HARMONY_DECENTRALIZED_TOPOLOGY_K8S_SITE_{}", i);
|
||||||
|
|
||||||
|
match std::env::var(&var) {
|
||||||
|
Ok(_) => {
|
||||||
|
let cfg = K8sAnywhereConfig::remote_k8s_from_env_var(&var);
|
||||||
|
sites.push(K8sAnywhereTopology::with_config(cfg));
|
||||||
|
}
|
||||||
|
Err(_) => break,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Self { sites }
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -19,7 +19,7 @@ use kube::{
|
|||||||
core::{DynamicResourceScope, ErrorResponse},
|
core::{DynamicResourceScope, ErrorResponse},
|
||||||
discovery::{ApiCapabilities, Scope},
|
discovery::{ApiCapabilities, Scope},
|
||||||
error::DiscoveryError,
|
error::DiscoveryError,
|
||||||
runtime::reflector::Lookup,
|
runtime::{reflector::Lookup, wait::Condition},
|
||||||
};
|
};
|
||||||
use kube::{api::DynamicObject, runtime::conditions};
|
use kube::{api::DynamicObject, runtime::conditions};
|
||||||
use kube::{
|
use kube::{
|
||||||
|
|||||||
@@ -3,11 +3,17 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use base64::{Engine, engine::general_purpose};
|
use base64::{Engine, engine::general_purpose};
|
||||||
use harmony_types::rfc1123::Rfc1123Name;
|
use harmony_types::rfc1123::Rfc1123Name;
|
||||||
use k8s_openapi::api::{
|
use k8s_openapi::{
|
||||||
|
ByteString,
|
||||||
|
api::{
|
||||||
core::v1::{Pod, Secret},
|
core::v1::{Pod, Secret},
|
||||||
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
|
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
|
||||||
|
},
|
||||||
|
};
|
||||||
|
use kube::{
|
||||||
|
api::{DynamicObject, GroupVersionKind, ObjectMeta},
|
||||||
|
runtime::conditions,
|
||||||
};
|
};
|
||||||
use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta};
|
|
||||||
use log::{debug, info, trace, warn};
|
use log::{debug, info, trace, warn};
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
use tokio::sync::OnceCell;
|
use tokio::sync::OnceCell;
|
||||||
@@ -28,7 +34,10 @@ use crate::{
|
|||||||
score_cert_management::CertificateManagementScore,
|
score_cert_management::CertificateManagementScore,
|
||||||
},
|
},
|
||||||
k3d::K3DInstallationScore,
|
k3d::K3DInstallationScore,
|
||||||
k8s::ingress::{K8sIngressScore, PathType},
|
k8s::{
|
||||||
|
ingress::{K8sIngressScore, PathType},
|
||||||
|
resource::K8sResourceScore,
|
||||||
|
},
|
||||||
monitoring::{
|
monitoring::{
|
||||||
grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score},
|
grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score},
|
||||||
kube_prometheus::crd::{
|
kube_prometheus::crd::{
|
||||||
@@ -45,8 +54,8 @@ use crate::{
|
|||||||
service_monitor::ServiceMonitor,
|
service_monitor::ServiceMonitor,
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
okd::crd::ingresses_config::Ingress as IngressResource,
|
nats::capability::NatsCluster,
|
||||||
okd::route::OKDTlsPassthroughScore,
|
okd::{crd::ingresses_config::Ingress as IngressResource, route::OKDTlsPassthroughScore},
|
||||||
prometheus::{
|
prometheus::{
|
||||||
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
|
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
|
||||||
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
|
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
|
||||||
@@ -77,7 +86,7 @@ struct K8sState {
|
|||||||
message: String,
|
message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub enum KubernetesDistribution {
|
pub enum KubernetesDistribution {
|
||||||
OpenshiftFamily,
|
OpenshiftFamily,
|
||||||
K3sFamily,
|
K3sFamily,
|
||||||
@@ -501,7 +510,7 @@ impl CertificateManagement for K8sAnywhereTopology {
|
|||||||
{
|
{
|
||||||
let secret_name = certificate.spec.secret_name.clone();
|
let secret_name = certificate.spec.secret_name.clone();
|
||||||
|
|
||||||
trace!("Secret Name {:#?}", secret_name);
|
debug!("Secret Name {:#?}", secret_name);
|
||||||
if let Some(secret) = client
|
if let Some(secret) = client
|
||||||
.get_resource::<Secret>(&secret_name, Some(&namespace))
|
.get_resource::<Secret>(&secret_name, Some(&namespace))
|
||||||
.await
|
.await
|
||||||
@@ -527,8 +536,8 @@ impl CertificateManagement for K8sAnywhereTopology {
|
|||||||
return Ok(ca_cert);
|
return Ok(ca_cert);
|
||||||
} else {
|
} else {
|
||||||
Err(ExecutorError::UnexpectedError(format!(
|
Err(ExecutorError::UnexpectedError(format!(
|
||||||
"Error getting secret associated with cert_name: {}",
|
"Error getting secret associated with cert_name: {}, secret_name: {}",
|
||||||
cert_name
|
cert_name, secret_name
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|||||||
@@ -1,3 +1,4 @@
|
|||||||
mod k8s_anywhere;
|
mod k8s_anywhere;
|
||||||
|
pub mod nats;
|
||||||
mod postgres;
|
mod postgres;
|
||||||
pub use k8s_anywhere::*;
|
pub use k8s_anywhere::*;
|
||||||
|
|||||||
38
harmony/src/domain/topology/k8s_anywhere/nats.rs
Normal file
38
harmony/src/domain/topology/k8s_anywhere/nats.rs
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
inventory::Inventory,
|
||||||
|
modules::nats::{
|
||||||
|
capability::{Nats, NatsCluster},
|
||||||
|
score_nats_k8s::NatsK8sScore,
|
||||||
|
},
|
||||||
|
score::Score,
|
||||||
|
topology::K8sAnywhereTopology,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl Nats for K8sAnywhereTopology {
|
||||||
|
async fn deploy(
|
||||||
|
&self,
|
||||||
|
nats_cluster: &NatsCluster,
|
||||||
|
peers: Option<Vec<NatsCluster>>,
|
||||||
|
ca_bundle: Option<Vec<String>>,
|
||||||
|
) -> Result<String, String> {
|
||||||
|
let distro = self.get_k8s_distribution().await.unwrap();
|
||||||
|
|
||||||
|
NatsK8sScore {
|
||||||
|
distribution: distro.clone(),
|
||||||
|
cluster: nats_cluster.clone(),
|
||||||
|
peers,
|
||||||
|
wjro marked this conversation as resolved
|
|||||||
|
ca_bundle,
|
||||||
|
}
|
||||||
|
.interpret(&Inventory::empty(), self)
|
||||||
|
.await
|
||||||
|
.map_err(|e| format!("Failed to deploy nats cluster: {}", e))?;
|
||||||
|
|
||||||
|
Ok(format!(
|
||||||
|
"Nats cluster deployed in ns {}",
|
||||||
|
nats_cluster.namespace
|
||||||
|
))
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,3 +1,4 @@
|
|||||||
|
pub mod decentralized;
|
||||||
mod failover;
|
mod failover;
|
||||||
mod ha_cluster;
|
mod ha_cluster;
|
||||||
pub mod ingress;
|
pub mod ingress;
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ pub mod k8s;
|
|||||||
pub mod lamp;
|
pub mod lamp;
|
||||||
pub mod load_balancer;
|
pub mod load_balancer;
|
||||||
pub mod monitoring;
|
pub mod monitoring;
|
||||||
|
pub mod nats;
|
||||||
pub mod network;
|
pub mod network;
|
||||||
pub mod okd;
|
pub mod okd;
|
||||||
pub mod opnsense;
|
pub mod opnsense;
|
||||||
|
|||||||
53
harmony/src/modules/nats/capability.rs
Normal file
53
harmony/src/modules/nats/capability.rs
Normal file
@@ -0,0 +1,53 @@
|
|||||||
|
use std::fmt::Display;
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait Nats {
|
||||||
|
async fn deploy(
|
||||||
|
&self,
|
||||||
|
nats_cluster: &NatsCluster,
|
||||||
|
peers: Option<Vec<NatsCluster>>,
|
||||||
|
ca_bundle: Option<Vec<String>>,
|
||||||
|
) -> Result<String, String>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait NatsSupercluster {
|
||||||
|
async fn deploy_site(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_bundle: Vec<String>,
|
||||||
|
) -> Result<String, String>;
|
||||||
|
|
||||||
|
async fn create_site_ca_bundle(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_certs: Option<Vec<String>>,
|
||||||
|
) -> Result<Vec<String>, String>;
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct NatsCluster {
|
||||||
|
pub namespace: String,
|
||||||
|
pub domain: String,
|
||||||
|
pub replicas: usize,
|
||||||
|
pub name: String,
|
||||||
|
pub gateway_advertise: String,
|
||||||
|
pub dns_name: String,
|
||||||
|
pub supercluster_ca_secret_name: &'static str,
|
||||||
|
pub tls_cert_name: &'static str,
|
||||||
|
pub jetstream_enabled: &'static str,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct NatsEndpoint {
|
||||||
|
pub host: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Display for NatsEndpoint {
|
||||||
|
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||||
|
write!(f, "({})", self.host,)
|
||||||
|
}
|
||||||
|
}
|
||||||
57
harmony/src/modules/nats/decentralized.rs
Normal file
57
harmony/src/modules/nats/decentralized.rs
Normal file
@@ -0,0 +1,57 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use log::debug;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
modules::{
|
||||||
|
cert_manager::capability::CertificateManagement,
|
||||||
|
nats::{
|
||||||
|
capability::{Nats, NatsCluster, NatsSupercluster},
|
||||||
|
pki::NatsPkiPolicy,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
topology::{TlsRouter, Topology, decentralized::DecentralizedTopology},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + Nats + TlsRouter + CertificateManagement + 'static> NatsSupercluster
|
||||||
|
for DecentralizedTopology<T>
|
||||||
|
{
|
||||||
|
async fn create_site_ca_bundle(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_certs: Option<Vec<String>>,
|
||||||
|
) -> Result<Vec<String>, String> {
|
||||||
|
debug!("extracting nats clusters");
|
||||||
|
let mut ca_bundle = Vec::new();
|
||||||
|
for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) {
|
||||||
|
let policy = NatsPkiPolicy { topology: site };
|
||||||
|
ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut certs) = ca_certs {
|
||||||
|
ca_bundle.append(&mut certs)
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("creating ca bundle {:#?} on site ", ca_bundle);
|
||||||
|
|
||||||
|
Ok(ca_bundle)
|
||||||
|
}
|
||||||
|
async fn deploy_site(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_bundle: Vec<String>,
|
||||||
|
) -> Result<String, String> {
|
||||||
|
for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() {
|
||||||
|
let peers: Vec<NatsCluster> = nats_clusters
|
||||||
|
.iter()
|
||||||
|
.enumerate()
|
||||||
|
.filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) })
|
||||||
|
.collect();
|
||||||
|
|
||||||
|
site.deploy(cluster, Some(peers), Some(ca_bundle.clone()))
|
||||||
|
.await?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(format!("Deployed Nats clusters across sites",))
|
||||||
|
}
|
||||||
|
}
|
||||||
5
harmony/src/modules/nats/mod.rs
Normal file
5
harmony/src/modules/nats/mod.rs
Normal file
@@ -0,0 +1,5 @@
|
|||||||
|
pub mod capability;
|
||||||
|
pub mod decentralized;
|
||||||
|
pub mod pki;
|
||||||
|
pub mod score_nats_k8s;
|
||||||
|
pub mod score_nats_supercluster;
|
||||||
77
harmony/src/modules/nats/pki.rs
Normal file
77
harmony/src/modules/nats/pki.rs
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
use crate::modules::{
|
||||||
|
cert_manager::{
|
||||||
|
capability::{CertificateManagement, CertificateManagementConfig},
|
||||||
|
crd::CaIssuer,
|
||||||
|
},
|
||||||
|
nats::capability::NatsCluster,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct NatsPkiPolicy<'a, T> {
|
||||||
|
pub topology: &'a T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> NatsPkiPolicy<'a, T>
|
||||||
|
where
|
||||||
|
T: CertificateManagement,
|
||||||
|
{
|
||||||
|
pub async fn ensure_nats_ca(&self, cluster: &NatsCluster) -> Result<String, String> {
|
||||||
|
let ca_issuer_name = "harmony-ca-issuer";
|
||||||
|
let root_ca_cert_name = "harmony-root-ca";
|
||||||
|
let self_signed_issuer_name = "harmony-self-signed-issuer";
|
||||||
|
|
||||||
|
let self_signed_config = CertificateManagementConfig {
|
||||||
|
namespace: Some(cluster.namespace.clone()),
|
||||||
|
acme_issuer: None,
|
||||||
|
ca_issuer: None,
|
||||||
|
self_signed: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let root_ca_config = CertificateManagementConfig {
|
||||||
|
namespace: Some(cluster.namespace.clone()),
|
||||||
|
acme_issuer: None,
|
||||||
|
ca_issuer: Some(CaIssuer {
|
||||||
|
secret_name: format!("{}-tls", root_ca_cert_name),
|
||||||
|
}),
|
||||||
|
self_signed: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_issuer(self_signed_issuer_name.into(), &self_signed_config)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_certificate(
|
||||||
|
root_ca_cert_name.into(),
|
||||||
|
self_signed_issuer_name.into(),
|
||||||
|
Some(format!("harmony-{}-ca", cluster.name)),
|
||||||
|
None,
|
||||||
|
Some(true),
|
||||||
|
&root_ca_config,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_issuer(ca_issuer_name.into(), &root_ca_config)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_certificate(
|
||||||
|
cluster.tls_cert_name.into(),
|
||||||
|
ca_issuer_name.into(),
|
||||||
|
None,
|
||||||
|
Some(vec![cluster.dns_name.clone()]),
|
||||||
|
Some(false),
|
||||||
|
&root_ca_config,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
350
harmony/src/modules/nats/score_nats_k8s.rs
Normal file
350
harmony/src/modules/nats/score_nats_k8s.rs
Normal file
@@ -0,0 +1,350 @@
|
|||||||
|
use std::{collections::BTreeMap, str::FromStr};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use harmony_macros::hurl;
|
||||||
|
use harmony_secret::{Secret, SecretManager};
|
||||||
|
use harmony_types::id::Id;
|
||||||
|
use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret};
|
||||||
|
use kube::api::ObjectMeta;
|
||||||
|
use log::{debug, info};
|
||||||
|
use non_blank_string_rs::NonBlankString;
|
||||||
|
use schemars::JsonSchema;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
data::Version,
|
||||||
|
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||||
|
inventory::Inventory,
|
||||||
|
modules::{
|
||||||
|
helm::chart::{HelmChartScore, HelmRepository},
|
||||||
|
k8s::{ingress::K8sIngressScore, resource::K8sResourceScore},
|
||||||
|
nats::capability::{Nats, NatsCluster, NatsEndpoint},
|
||||||
|
okd::{
|
||||||
|
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
||||||
|
route::OKDRouteScore,
|
||||||
|
},
|
||||||
|
},
|
||||||
|
score::Score,
|
||||||
|
topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct NatsK8sScore {
|
||||||
|
pub distribution: KubernetesDistribution,
|
||||||
|
pub cluster: NatsCluster,
|
||||||
|
pub peers: Option<Vec<NatsCluster>>,
|
||||||
|
pub ca_bundle: Option<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Score<T> for NatsK8sScore {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"NatsK8sScore".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
|
Box::new(NatsK8sInterpret {
|
||||||
|
score: self.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct NatsK8sInterpret {
|
||||||
|
score: NatsK8sScore,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Interpret<T>
|
||||||
|
for NatsK8sInterpret
|
||||||
|
{
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
inventory: &Inventory,
|
||||||
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
let domain = topology
|
||||||
|
.get_internal_domain()
|
||||||
|
.await?
|
||||||
|
.ok_or(InterpretError::new("no internal domain found".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
info!("creating ingress for nats cluster");
|
||||||
|
self.create_ingress(
|
||||||
|
topology,
|
||||||
|
inventory,
|
||||||
|
self.score.distribution.clone(),
|
||||||
|
self.score.cluster.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
let domain = NatsEndpoint { host: domain };
|
||||||
|
|
||||||
|
info!("creating nats ca bundle secret");
|
||||||
|
self.create_ca_bundle_secret(
|
||||||
|
topology,
|
||||||
|
inventory,
|
||||||
|
self.score.cluster.clone(),
|
||||||
|
self.score.ca_bundle.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
info!("deploying nats cluster");
|
||||||
|
self.deploy_nats(
|
||||||
|
topology,
|
||||||
|
inventory,
|
||||||
|
self.score.cluster.clone(),
|
||||||
|
domain,
|
||||||
|
self.score.peers.clone(),
|
||||||
|
self.score.cluster.namespace.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(Outcome::success(
|
||||||
|
"successfully deployed nats K8s".to_string(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_name(&self) -> InterpretName {
|
||||||
|
InterpretName::Custom("NatsK8sInterpret")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_version(&self) -> Version {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_status(&self) -> InterpretStatus {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_children(&self) -> Vec<Id> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl NatsK8sInterpret {
|
||||||
|
async fn create_ingress<T: Topology + K8sclient>(
|
||||||
|
&self,
|
||||||
|
topology: &T,
|
||||||
|
inventory: &Inventory,
|
||||||
|
distribution: KubernetesDistribution,
|
||||||
|
nats_cluster: NatsCluster,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
match distribution {
|
||||||
|
KubernetesDistribution::OpenshiftFamily => {
|
||||||
|
OKDRouteScore {
|
||||||
|
name: nats_cluster.name.to_string(),
|
||||||
|
namespace: nats_cluster.namespace.to_string(),
|
||||||
|
spec: RouteSpec {
|
||||||
|
to: RouteTargetReference {
|
||||||
|
kind: "Service".to_string(),
|
||||||
|
name: format!("{}", nats_cluster.name.to_string()),
|
||||||
|
weight: Some(100),
|
||||||
|
},
|
||||||
|
host: Some(nats_cluster.dns_name.clone()),
|
||||||
|
port: Some(RoutePort { target_port: 7222 }),
|
||||||
|
tls: Some(TLSConfig {
|
||||||
|
insecure_edge_termination_policy: None,
|
||||||
|
termination: "passthrough".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
wildcard_policy: None,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
|
||||||
|
//TODO untested
|
||||||
|
K8sIngressScore {
|
||||||
|
name: todo!(),
|
||||||
|
host: todo!(),
|
||||||
|
backend_service: todo!(),
|
||||||
|
port: todo!(),
|
||||||
|
path: todo!(),
|
||||||
|
path_type: todo!(),
|
||||||
|
namespace: todo!(),
|
||||||
|
ingress_class_name: todo!(),
|
||||||
|
}
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_ca_bundle_secret<T: Topology + K8sclient>(
|
||||||
|
&self,
|
||||||
|
topology: &T,
|
||||||
|
inventory: &Inventory,
|
||||||
|
nats_cluster: NatsCluster,
|
||||||
|
ca_bundle: Option<Vec<String>>,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
async fn build_secret_data(bundle: &Vec<String>) -> BTreeMap<String, ByteString> {
|
||||||
|
let mut data = BTreeMap::new();
|
||||||
|
|
||||||
|
data.insert(
|
||||||
|
"ca.crt".to_string(),
|
||||||
|
ByteString(bundle.join("\n").into_bytes()),
|
||||||
|
);
|
||||||
|
|
||||||
|
data
|
||||||
|
}
|
||||||
|
|
||||||
|
match ca_bundle {
|
||||||
|
Some(certs) => {
|
||||||
|
let bundle_secret = K8sSecret {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(nats_cluster.supercluster_ca_secret_name.to_string()),
|
||||||
|
namespace: Some(nats_cluster.namespace.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
data: Some(build_secret_data(&certs).await),
|
||||||
|
immutable: Some(false),
|
||||||
|
type_: Some("Opaque".to_string()),
|
||||||
|
string_data: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone()))
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await
|
||||||
|
.map_err(|e| InterpretError::new(e.to_string()))
|
||||||
|
}
|
||||||
|
None => Ok(Outcome::noop("no certs to create".to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deploy_nats<T: Topology + HelmCommand + 'static>(
|
||||||
|
&self,
|
||||||
|
topology: &T,
|
||||||
|
inventory: &Inventory,
|
||||||
|
cluster: NatsCluster,
|
||||||
|
domain: NatsEndpoint,
|
||||||
|
peers: Option<Vec<NatsCluster>>,
|
||||||
|
namespace: String,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
let mut gateway_gateways = String::new();
|
||||||
|
let admin = SecretManager::get_or_prompt::<NatsAdmin>().await.unwrap();
|
||||||
|
let user = admin.user.clone();
|
||||||
|
let password = admin.password.clone();
|
||||||
|
|
||||||
|
match peers {
|
||||||
|
Some(peers) => {
|
||||||
|
for peer in peers {
|
||||||
|
// Construct wss:// URLs on port 443 for the remote gateways
|
||||||
|
gateway_gateways.push_str(&format!(
|
||||||
|
r#"
|
||||||
|
- name: {}
|
||||||
|
urls:
|
||||||
|
- nats://{}"#,
|
||||||
|
peer.name, peer.gateway_advertise
|
||||||
|
));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
None => debug!("no peers to push to gateway"),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Inject gateway config into the 'merge' block to comply with chart structure
|
||||||
|
let values_yaml = Some(format!(
|
||||||
|
r#"
|
||||||
|
fullnameOverride: {cluster_name}
|
||||||
|
config:
|
||||||
|
merge:
|
||||||
|
authorization:
|
||||||
|
default_permissions:
|
||||||
|
publish: ["TEST.*"]
|
||||||
|
subscribe: ["PUBLIC.>"]
|
||||||
|
users:
|
||||||
|
# - user: "admin"
|
||||||
|
# password: "admin_1"
|
||||||
|
# permissions:
|
||||||
|
# publish: ">"
|
||||||
|
# subscribe: ">"
|
||||||
|
- password: "enGk0cgZUabM6bN6FXHT"
|
||||||
|
user: "testUser"
|
||||||
|
accounts:
|
||||||
|
system:
|
||||||
|
users:
|
||||||
|
- user: "{user}"
|
||||||
|
password: "{password}"
|
||||||
|
logtime: true
|
||||||
|
debug: true
|
||||||
|
trace: true
|
||||||
|
system_account: system
|
||||||
|
cluster:
|
||||||
|
name: {cluster_name}
|
||||||
|
enabled: true
|
||||||
|
replicas: {replicas}
|
||||||
|
jetstream:
|
||||||
|
enabled: {jetstream_enabled}
|
||||||
|
fileStorage:
|
||||||
|
enabled: true
|
||||||
|
size: 10Gi
|
||||||
|
storageDirectory: /data/jetstream
|
||||||
|
leafnodes:
|
||||||
|
enabled: false
|
||||||
|
websocket:
|
||||||
|
enabled: false
|
||||||
|
ingress:
|
||||||
|
enabled: true
|
||||||
|
className: openshift-default
|
||||||
|
pathType: Prefix
|
||||||
|
hosts:
|
||||||
|
- nats-ws.{domain}
|
||||||
|
gateway:
|
||||||
|
enabled: true
|
||||||
|
port: 7222
|
||||||
|
name: {cluster_name}
|
||||||
|
merge:
|
||||||
|
advertise: {gateway_advertise}
|
||||||
|
gateways: {gateway_gateways}
|
||||||
|
tls:
|
||||||
|
enabled: true
|
||||||
|
secretName: {tls_secret_name}
|
||||||
|
# merge:
|
||||||
|
# ca_file: "/etc/nats-certs/gateway/ca.crt"
|
||||||
|
service:
|
||||||
|
ports:
|
||||||
|
gateway:
|
||||||
|
enabled: true
|
||||||
|
tlsCA:
|
||||||
|
enabled: true
|
||||||
|
secretName: {supercluster_ca_secret_name}
|
||||||
|
natsBox:
|
||||||
|
container:
|
||||||
|
image:
|
||||||
|
tag: nonroot"#,
|
||||||
|
cluster_name = cluster.name,
|
||||||
|
user = user,
|
||||||
|
password = password,
|
||||||
|
replicas = cluster.replicas,
|
||||||
|
domain = domain,
|
||||||
|
gateway_gateways = gateway_gateways,
|
||||||
|
gateway_advertise = cluster.gateway_advertise,
|
||||||
|
tls_secret_name = format!("{}-tls", cluster.tls_cert_name),
|
||||||
|
jetstream_enabled = cluster.jetstream_enabled,
|
||||||
|
supercluster_ca_secret_name = cluster.supercluster_ca_secret_name,
|
||||||
|
));
|
||||||
|
|
||||||
|
debug!("Prepared Helm Chart values : \n{values_yaml:#?}");
|
||||||
|
let nats = HelmChartScore {
|
||||||
|
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
|
||||||
|
release_name: NonBlankString::from_str(&cluster.name).unwrap(),
|
||||||
|
chart_name: NonBlankString::from_str("nats/nats").unwrap(),
|
||||||
|
chart_version: None,
|
||||||
|
values_overrides: None,
|
||||||
|
values_yaml,
|
||||||
|
create_namespace: true,
|
||||||
|
install_only: false,
|
||||||
|
repository: Some(HelmRepository::new(
|
||||||
|
"nats".to_string(),
|
||||||
|
hurl!("https://nats-io.github.io/k8s/helm/charts/"),
|
||||||
|
true,
|
||||||
|
)),
|
||||||
|
};
|
||||||
|
nats.interpret(inventory, topology).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)]
|
||||||
|
struct NatsAdmin {
|
||||||
|
user: String,
|
||||||
|
password: String,
|
||||||
|
}
|
||||||
77
harmony/src/modules/nats/score_nats_supercluster.rs
Normal file
77
harmony/src/modules/nats/score_nats_supercluster.rs
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use harmony_types::id::Id;
|
||||||
|
use log::info;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
data::Version,
|
||||||
|
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||||
|
inventory::Inventory,
|
||||||
|
modules::nats::capability::{NatsCluster, NatsSupercluster},
|
||||||
|
score::Score,
|
||||||
|
topology::Topology,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
pub struct NatsSuperclusterScore {
|
||||||
|
pub nats_cluster: Vec<NatsCluster>,
|
||||||
|
pub ca_certs: Option<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Topology + NatsSupercluster> Score<T> for NatsSuperclusterScore {
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"NatsSuperclusterScore".to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
|
Box::new(NatsSuperclusterInterpret {
|
||||||
|
nats_clusters: self.nats_cluster.clone(),
|
||||||
|
ca_certs: self.ca_certs.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct NatsSuperclusterInterpret {
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_certs: Option<Vec<String>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + NatsSupercluster> Interpret<T> for NatsSuperclusterInterpret {
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
_inventory: &Inventory,
|
||||||
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
info!("creating nats supercluster ca bundle");
|
||||||
|
let ca_bundle = topology
|
||||||
|
.create_site_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone())
|
||||||
|
.await
|
||||||
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
|
info!("deploying nats supercluster");
|
||||||
|
let cluster_name = topology
|
||||||
|
.deploy_site(self.nats_clusters.clone(), ca_bundle)
|
||||||
|
.await
|
||||||
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
|
Ok(Outcome::success(format!("Nats cluster '{}'", cluster_name)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_name(&self) -> InterpretName {
|
||||||
|
InterpretName::Custom("NatsSuperclusterInterpret")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_version(&self) -> Version {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_status(&self) -> InterpretStatus {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_children(&self) -> Vec<Id> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user
Once again, this contains way too many implementation details. There should be a NatsK8s module that contains the nats specific logid, then k8s anywhere only does the dispatching depending on its own logic/state (which k8s is the anywhere pointing to right now). K8sAnywhere should not know how to deploy nats. It should tell a natsk8s module "deploy to okd" or something along those lines.