fix/nats-isp #226
@@ -86,7 +86,7 @@ struct K8sState {
|
|||||||
message: String,
|
message: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub enum KubernetesDistribution {
|
pub enum KubernetesDistribution {
|
||||||
OpenshiftFamily,
|
OpenshiftFamily,
|
||||||
K3sFamily,
|
K3sFamily,
|
||||||
@@ -568,64 +568,6 @@ impl K8sAnywhereTopology {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn build_ca_bundle_secret(
|
|
||||||
&self,
|
|
||||||
namespace: &str,
|
|
||||||
nats_cluster: &NatsCluster,
|
|
||||||
bundle: &Vec<String>,
|
|
||||||
) -> Secret {
|
|
||||||
Secret {
|
|
||||||
metadata: ObjectMeta {
|
|
||||||
name: Some(nats_cluster.supercluster_ca_secret_name.to_string()),
|
|
||||||
namespace: Some(namespace.to_string()),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
data: Some(self.build_secret_data(bundle).await),
|
|
||||||
immutable: Some(false),
|
|
||||||
type_: Some("Opaque".to_string()),
|
|
||||||
string_data: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn build_secret_data(&self, bundle: &Vec<String>) -> BTreeMap<String, ByteString> {
|
|
||||||
let mut data = BTreeMap::new();
|
|
||||||
|
|
||||||
data.insert(
|
|
||||||
"ca.crt".to_string(),
|
|
||||||
ByteString(bundle.join("\n").into_bytes()),
|
|
||||||
);
|
|
||||||
|
|
||||||
data
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn create_ca_bundle_secret(
|
|
||||||
&self,
|
|
||||||
nats_cluster: &NatsCluster,
|
|
||||||
ca_bundle: &Vec<String>,
|
|
||||||
) -> Result<Outcome, ExecutorError> {
|
|
||||||
let bundle_secret = self
|
|
||||||
.build_ca_bundle_secret(&nats_cluster.namespace, nats_cluster, ca_bundle)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"deploying secret to ns: {} \nsecret: {:#?}",
|
|
||||||
nats_cluster.namespace, bundle_secret
|
|
||||||
);
|
|
||||||
|
|
||||||
let k8ssecret =
|
|
||||||
K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone()));
|
|
||||||
|
|
||||||
k8ssecret
|
|
||||||
.interpret(&Inventory::empty(), self)
|
|
||||||
.await
|
|
||||||
.map_err(|e| ExecutorError::UnexpectedError(format!("{e}")))?;
|
|
||||||
|
|
||||||
Ok(Outcome::success(format!(
|
|
||||||
"successfully craeted ca bundle {} in ns: {}",
|
|
||||||
nats_cluster.supercluster_ca_secret_name, nats_cluster.namespace
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn certificate_issuer_ready(
|
pub async fn certificate_issuer_ready(
|
||||||
&self,
|
&self,
|
||||||
issuer_name: String,
|
issuer_name: String,
|
||||||
|
|||||||
@@ -1,28 +1,13 @@
|
|||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::{debug, info};
|
|
||||||
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
inventory::Inventory,
|
inventory::Inventory,
|
||||||
modules::{
|
modules::nats::{
|
||||||
cert_manager::{
|
capability::{Nats, NatsCluster},
|
||||||
capability::{CertificateManagement, CertificateManagementConfig},
|
|
||||||
crd::CaIssuer,
|
|
||||||
},
|
|
||||||
k8s::ingress::K8sIngressScore,
|
|
||||||
nats::{
|
|
||||||
capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster},
|
|
||||||
score_nats_k8s::NatsK8sScore,
|
score_nats_k8s::NatsK8sScore,
|
||||||
},
|
},
|
||||||
okd::{
|
|
||||||
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
|
||||||
route::OKDRouteScore,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
score::Score,
|
score::Score,
|
||||||
topology::{K8sAnywhereTopology, KubernetesDistribution, TlsRouter},
|
topology::K8sAnywhereTopology,
|
||||||
};
|
};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -31,77 +16,15 @@ impl Nats for K8sAnywhereTopology {
|
|||||||
&self,
|
&self,
|
||||||
nats_cluster: &NatsCluster,
|
nats_cluster: &NatsCluster,
|
||||||
peers: Option<Vec<NatsCluster>>,
|
peers: Option<Vec<NatsCluster>>,
|
||||||
|
ca_bundle: Option<Vec<String>>,
|
||||||
) -> Result<String, String> {
|
) -> Result<String, String> {
|
||||||
let domain = self.get_nats_endpoint(nats_cluster).await?;
|
let distro = self.get_k8s_distribution().await.unwrap();
|
||||||
match self
|
|
||||||
.get_k8s_distribution()
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Error getting k8s distribution : {}", e.to_string()))?
|
|
||||||
{
|
|
||||||
KubernetesDistribution::OpenshiftFamily => {
|
|
||||||
let route = OKDRouteScore {
|
|
||||||
name: nats_cluster.name.to_string(),
|
|
||||||
namespace: nats_cluster.namespace.to_string(),
|
|
||||||
spec: RouteSpec {
|
|
||||||
to: RouteTargetReference {
|
|
||||||
kind: "Service".to_string(),
|
|
||||||
name: format!("{}", nats_cluster.name.to_string()),
|
|
||||||
weight: Some(100),
|
|
||||||
},
|
|
||||||
host: Some(nats_cluster.dns_name.clone()),
|
|
||||||
port: Some(RoutePort { target_port: 7222 }),
|
|
||||||
tls: Some(TLSConfig {
|
|
||||||
insecure_edge_termination_policy: None,
|
|
||||||
termination: "passthrough".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
}),
|
|
||||||
wildcard_policy: None,
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
route
|
|
||||||
.interpret(&Inventory::empty(), self)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to deploy OKD Route: {e}"))?;
|
|
||||||
}
|
|
||||||
KubernetesDistribution::K3sFamily => {
|
|
||||||
//TODO untested
|
|
||||||
K8sIngressScore {
|
|
||||||
name: todo!(),
|
|
||||||
host: todo!(),
|
|
||||||
backend_service: todo!(),
|
|
||||||
port: todo!(),
|
|
||||||
path: todo!(),
|
|
||||||
path_type: todo!(),
|
|
||||||
namespace: todo!(),
|
|
||||||
ingress_class_name: todo!(),
|
|
||||||
}
|
|
||||||
.interpret(&Inventory::empty(), self)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?;
|
|
||||||
}
|
|
||||||
KubernetesDistribution::Default => {
|
|
||||||
//TODO untested
|
|
||||||
K8sIngressScore {
|
|
||||||
name: todo!(),
|
|
||||||
host: todo!(),
|
|
||||||
backend_service: todo!(),
|
|
||||||
port: todo!(),
|
|
||||||
path: todo!(),
|
|
||||||
path_type: todo!(),
|
|
||||||
namespace: todo!(),
|
|
||||||
ingress_class_name: todo!(),
|
|
||||||
}
|
|
||||||
.interpret(&Inventory::empty(), self)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to deploy K8s ingress: {e}"))?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
NatsK8sScore {
|
NatsK8sScore {
|
||||||
|
distribution: distro.clone(),
|
||||||
cluster: nats_cluster.clone(),
|
cluster: nats_cluster.clone(),
|
||||||
domain,
|
|
||||||
peers,
|
peers,
|
||||||
|
ca_bundle,
|
||||||
}
|
}
|
||||||
.interpret(&Inventory::empty(), self)
|
.interpret(&Inventory::empty(), self)
|
||||||
.await
|
.await
|
||||||
@@ -112,126 +35,4 @@ impl Nats for K8sAnywhereTopology {
|
|||||||
nats_cluster.namespace
|
nats_cluster.namespace
|
||||||
))
|
))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result<NatsEndpoint, String> {
|
|
||||||
let endpoint = self.get_internal_domain().await?.unwrap();
|
|
||||||
|
|
||||||
Ok(NatsEndpoint { host: endpoint })
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_local_nats_ca_certificate(
|
|
||||||
&self,
|
|
||||||
nats_cluster: &NatsCluster,
|
|
||||||
) -> Result<String, String> {
|
|
||||||
let ca_issuer_name = "harmony-ca-issuer".to_string();
|
|
||||||
let root_ca_cert_name = "harmony-root-ca".to_string();
|
|
||||||
let self_signed_issuer_name = "harmony-self-signed-issuer".to_string();
|
|
||||||
|
|
||||||
debug!("creating self signed issuer config");
|
|
||||||
let self_signed_config = CertificateManagementConfig {
|
|
||||||
namespace: Some(nats_cluster.namespace.to_string().clone()),
|
|
||||||
acme_issuer: None,
|
|
||||||
ca_issuer: None,
|
|
||||||
self_signed: true,
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!("create ca issuer config");
|
|
||||||
let root_ca_config = CertificateManagementConfig {
|
|
||||||
namespace: Some(nats_cluster.namespace.clone().into()),
|
|
||||||
acme_issuer: None,
|
|
||||||
ca_issuer: Some(CaIssuer {
|
|
||||||
secret_name: format!("{}-tls", root_ca_cert_name),
|
|
||||||
}),
|
|
||||||
self_signed: false,
|
|
||||||
};
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"creating harmony self signed issuer {}",
|
|
||||||
self_signed_issuer_name
|
|
||||||
);
|
|
||||||
self.create_issuer(self_signed_issuer_name.clone(), &self_signed_config)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("{}", e))?;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"creating root ca cert from self signed issuer {}",
|
|
||||||
&root_ca_cert_name
|
|
||||||
);
|
|
||||||
self.create_certificate(
|
|
||||||
root_ca_cert_name.clone(),
|
|
||||||
self_signed_issuer_name.clone(),
|
|
||||||
Some(format!("harmony-{}-ca", nats_cluster.name)),
|
|
||||||
None,
|
|
||||||
Some(true),
|
|
||||||
&root_ca_config,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("{}", e))?;
|
|
||||||
|
|
||||||
debug!("creating root_ca_issuer {}", &ca_issuer_name);
|
|
||||||
self.create_issuer(ca_issuer_name.clone(), &root_ca_config)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("{e}"))?;
|
|
||||||
|
|
||||||
debug!(
|
|
||||||
"creating tls cert for nats gateway {}",
|
|
||||||
nats_cluster.tls_cert_name
|
|
||||||
);
|
|
||||||
self.create_certificate(
|
|
||||||
nats_cluster.tls_cert_name.into(),
|
|
||||||
ca_issuer_name,
|
|
||||||
None,
|
|
||||||
Some(vec![nats_cluster.dns_name.clone()]),
|
|
||||||
Some(false),
|
|
||||||
&root_ca_config,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("{e}"))?;
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Successfully created nats cluster certificates in cluster {}",
|
|
||||||
nats_cluster.name
|
|
||||||
);
|
|
||||||
|
|
||||||
let strategy = ExponentialBackoff::from_millis(250)
|
|
||||||
.factor(2)
|
|
||||||
.max_delay(Duration::from_millis(1000))
|
|
||||||
.take(10);
|
|
||||||
|
|
||||||
let ca_cert = Retry::spawn(strategy, || async {
|
|
||||||
log::debug!("Attempting CA cert fetch");
|
|
||||||
|
|
||||||
let res = self
|
|
||||||
.get_ca_certificate(root_ca_cert_name.clone(), &root_ca_config)
|
|
||||||
.await;
|
|
||||||
|
|
||||||
match res {
|
|
||||||
Ok(cert) => Ok(cert),
|
|
||||||
Err(e) => {
|
|
||||||
log::warn!("Retryable error: {:?}", e);
|
|
||||||
Err(e)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
})
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Retries exhausted: {:?}", e))?;
|
|
||||||
|
|
||||||
Ok(ca_cert)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn install_local_ca_bundle(
|
|
||||||
&self,
|
|
||||||
nats_cluster: NatsCluster,
|
|
||||||
ca_certs: Option<Vec<String>>,
|
|
||||||
) -> Result<String, String> {
|
|
||||||
match ca_certs {
|
|
||||||
Some(certs) => {
|
|
||||||
self.create_ca_bundle_secret(&nats_cluster, &certs)
|
|
||||||
.await
|
|
||||||
.map_err(|e| e.to_string())?;
|
|
||||||
Ok("Successfully created ca bundle secret".to_string())
|
|
||||||
}
|
|
||||||
None => Ok("no certs to create".to_string()),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,36 +9,23 @@ pub trait Nats {
|
|||||||
&self,
|
&self,
|
||||||
nats_cluster: &NatsCluster,
|
nats_cluster: &NatsCluster,
|
||||||
peers: Option<Vec<NatsCluster>>,
|
peers: Option<Vec<NatsCluster>>,
|
||||||
) -> Result<String, String>;
|
ca_bundle: Option<Vec<String>>,
|
||||||
|
|
||||||
async fn get_nats_endpoint(&self, nats_cluster: &NatsCluster) -> Result<NatsEndpoint, String>;
|
|
||||||
|
|
||||||
async fn get_local_nats_ca_certificate(
|
|
||||||
&self,
|
|
||||||
nats_cluster: &NatsCluster,
|
|
||||||
) -> Result<String, String>;
|
|
||||||
|
|
||||||
async fn install_local_ca_bundle(
|
|
||||||
&self,
|
|
||||||
nats_cluster: NatsCluster,
|
|
||||||
ca_certs: Option<Vec<String>>,
|
|
||||||
) -> Result<String, String>;
|
) -> Result<String, String>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait NatsSupercluster {
|
pub trait NatsSupercluster {
|
||||||
async fn deploy_supercluster(&self, nats_clusters: Vec<NatsCluster>) -> Result<String, String>;
|
async fn deploy_site(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_bundle: Vec<String>,
|
||||||
|
) -> Result<String, String>;
|
||||||
|
|
||||||
async fn distribute_ca_bundle(
|
async fn create_site_ca_bundle(
|
||||||
&self,
|
&self,
|
||||||
nats_clusters: Vec<NatsCluster>,
|
nats_clusters: Vec<NatsCluster>,
|
||||||
ca_certs: Option<Vec<String>>,
|
ca_certs: Option<Vec<String>>,
|
||||||
) -> Result<String, String>;
|
) -> Result<Vec<String>, String>;
|
||||||
}
|
|
||||||
|
|
||||||
pub struct SiteContext<T> {
|
|
||||||
pub topology: T,
|
|
||||||
pub cluster: NatsCluster,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
|
|||||||
@@ -2,13 +2,45 @@ use async_trait::async_trait;
|
|||||||
use log::debug;
|
use log::debug;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
modules::nats::capability::{Nats, NatsCluster, NatsEndpoint, NatsSupercluster},
|
modules::{
|
||||||
|
cert_manager::capability::CertificateManagement,
|
||||||
|
nats::{
|
||||||
|
capability::{Nats, NatsCluster, NatsSupercluster},
|
||||||
|
pki::NatsPkiPolicy,
|
||||||
|
},
|
||||||
|
},
|
||||||
topology::{TlsRouter, Topology, decentralized::DecentralizedTopology},
|
topology::{TlsRouter, Topology, decentralized::DecentralizedTopology},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
impl<T: Topology + Nats + TlsRouter + 'static> NatsSupercluster for DecentralizedTopology<T> {
|
impl<T: Topology + Nats + TlsRouter + CertificateManagement + 'static> NatsSupercluster
|
||||||
async fn deploy_supercluster(&self, nats_clusters: Vec<NatsCluster>) -> Result<String, String> {
|
for DecentralizedTopology<T>
|
||||||
|
{
|
||||||
|
async fn create_site_ca_bundle(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_certs: Option<Vec<String>>,
|
||||||
|
) -> Result<Vec<String>, String> {
|
||||||
|
debug!("extracting nats clusters");
|
||||||
|
let mut ca_bundle = Vec::new();
|
||||||
|
for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) {
|
||||||
|
let policy = NatsPkiPolicy { topology: site };
|
||||||
|
ca_bundle.push(policy.ensure_nats_ca(cluster).await.unwrap());
|
||||||
|
}
|
||||||
|
|
||||||
|
if let Some(mut certs) = ca_certs {
|
||||||
|
ca_bundle.append(&mut certs)
|
||||||
|
}
|
||||||
|
|
||||||
|
debug!("creating ca bundle {:#?} on site ", ca_bundle);
|
||||||
|
|
||||||
|
Ok(ca_bundle)
|
||||||
|
}
|
||||||
|
async fn deploy_site(
|
||||||
|
&self,
|
||||||
|
nats_clusters: Vec<NatsCluster>,
|
||||||
|
ca_bundle: Vec<String>,
|
||||||
|
) -> Result<String, String> {
|
||||||
for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() {
|
for (i, (site, cluster)) in self.sites.iter().zip(nats_clusters.iter()).enumerate() {
|
||||||
let peers: Vec<NatsCluster> = nats_clusters
|
let peers: Vec<NatsCluster> = nats_clusters
|
||||||
.iter()
|
.iter()
|
||||||
@@ -16,37 +48,10 @@ impl<T: Topology + Nats + TlsRouter + 'static> NatsSupercluster for Decentralize
|
|||||||
.filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) })
|
.filter_map(|(j, c)| if i == j { None } else { Some(c.clone()) })
|
||||||
.collect();
|
.collect();
|
||||||
|
|
||||||
site.deploy(cluster, Some(peers)).await?;
|
site.deploy(cluster, Some(peers), Some(ca_bundle.clone()))
|
||||||
|
.await?;
|
||||||
}
|
}
|
||||||
|
|
||||||
Ok(format!("Deployed Nats clusters across sites",))
|
Ok(format!("Deployed Nats clusters across sites",))
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn distribute_ca_bundle(
|
|
||||||
&self,
|
|
||||||
nats_clusters: Vec<NatsCluster>,
|
|
||||||
ca_certs: Option<Vec<String>>,
|
|
||||||
) -> Result<String, String> {
|
|
||||||
debug!("extracting nats clusters");
|
|
||||||
let mut ca_bundle = Vec::new();
|
|
||||||
for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) {
|
|
||||||
let ca_cert = site.get_local_nats_ca_certificate(&cluster).await?;
|
|
||||||
ca_bundle.push(ca_cert);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(mut certs) = ca_certs {
|
|
||||||
ca_bundle.append(&mut certs)
|
|
||||||
}
|
|
||||||
|
|
||||||
debug!("creating ca bundle {:#?} on site 1", ca_bundle);
|
|
||||||
|
|
||||||
for (site, cluster) in self.sites.iter().zip(nats_clusters.iter()) {
|
|
||||||
site.install_local_ca_bundle(cluster.clone(), Some(ca_bundle.clone()))
|
|
||||||
.await?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(format!(
|
|
||||||
"Successfully deployed nats ca bundle accross clusters",
|
|
||||||
))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,4 +1,5 @@
|
|||||||
pub mod capability;
|
pub mod capability;
|
||||||
pub mod decentralized;
|
pub mod decentralized;
|
||||||
|
pub mod pki;
|
||||||
pub mod score_nats_k8s;
|
pub mod score_nats_k8s;
|
||||||
pub mod score_nats_supercluster;
|
pub mod score_nats_supercluster;
|
||||||
|
|||||||
77
harmony/src/modules/nats/pki.rs
Normal file
77
harmony/src/modules/nats/pki.rs
Normal file
@@ -0,0 +1,77 @@
|
|||||||
|
use crate::modules::{
|
||||||
|
cert_manager::{
|
||||||
|
capability::{CertificateManagement, CertificateManagementConfig},
|
||||||
|
crd::CaIssuer,
|
||||||
|
},
|
||||||
|
nats::capability::NatsCluster,
|
||||||
|
};
|
||||||
|
|
||||||
|
pub struct NatsPkiPolicy<'a, T> {
|
||||||
|
pub topology: &'a T,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<'a, T> NatsPkiPolicy<'a, T>
|
||||||
|
where
|
||||||
|
T: CertificateManagement,
|
||||||
|
{
|
||||||
|
pub async fn ensure_nats_ca(&self, cluster: &NatsCluster) -> Result<String, String> {
|
||||||
|
let ca_issuer_name = "harmony-ca-issuer";
|
||||||
|
let root_ca_cert_name = "harmony-root-ca";
|
||||||
|
let self_signed_issuer_name = "harmony-self-signed-issuer";
|
||||||
|
|
||||||
|
let self_signed_config = CertificateManagementConfig {
|
||||||
|
namespace: Some(cluster.namespace.clone()),
|
||||||
|
acme_issuer: None,
|
||||||
|
ca_issuer: None,
|
||||||
|
self_signed: true,
|
||||||
|
};
|
||||||
|
|
||||||
|
let root_ca_config = CertificateManagementConfig {
|
||||||
|
namespace: Some(cluster.namespace.clone()),
|
||||||
|
acme_issuer: None,
|
||||||
|
ca_issuer: Some(CaIssuer {
|
||||||
|
secret_name: format!("{}-tls", root_ca_cert_name),
|
||||||
|
}),
|
||||||
|
self_signed: false,
|
||||||
|
};
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_issuer(self_signed_issuer_name.into(), &self_signed_config)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_certificate(
|
||||||
|
root_ca_cert_name.into(),
|
||||||
|
self_signed_issuer_name.into(),
|
||||||
|
Some(format!("harmony-{}-ca", cluster.name)),
|
||||||
|
None,
|
||||||
|
Some(true),
|
||||||
|
&root_ca_config,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_issuer(ca_issuer_name.into(), &root_ca_config)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.create_certificate(
|
||||||
|
cluster.tls_cert_name.into(),
|
||||||
|
ca_issuer_name.into(),
|
||||||
|
None,
|
||||||
|
Some(vec![cluster.dns_name.clone()]),
|
||||||
|
Some(false),
|
||||||
|
&root_ca_config,
|
||||||
|
)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())?;
|
||||||
|
|
||||||
|
self.topology
|
||||||
|
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
||||||
|
.await
|
||||||
|
.map_err(|e| e.to_string())
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,52 +1,230 @@
|
|||||||
use std::str::FromStr;
|
use std::{collections::BTreeMap, str::FromStr};
|
||||||
|
|
||||||
|
use async_trait::async_trait;
|
||||||
use harmony_macros::hurl;
|
use harmony_macros::hurl;
|
||||||
use log::debug;
|
use harmony_secret::{Secret, SecretManager};
|
||||||
|
use harmony_types::id::Id;
|
||||||
|
use k8s_openapi::{ByteString, api::core::v1::Secret as K8sSecret};
|
||||||
|
use kube::api::ObjectMeta;
|
||||||
|
use log::{debug, info};
|
||||||
use non_blank_string_rs::NonBlankString;
|
use non_blank_string_rs::NonBlankString;
|
||||||
use serde::Serialize;
|
use schemars::JsonSchema;
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
interpret::Interpret,
|
data::Version,
|
||||||
|
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||||
|
inventory::Inventory,
|
||||||
modules::{
|
modules::{
|
||||||
helm::chart::{HelmChartScore, HelmRepository},
|
helm::chart::{HelmChartScore, HelmRepository},
|
||||||
nats::capability::{NatsCluster, NatsEndpoint},
|
k8s::{ingress::K8sIngressScore, resource::K8sResourceScore},
|
||||||
|
nats::capability::{Nats, NatsCluster, NatsEndpoint},
|
||||||
|
okd::{
|
||||||
|
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
||||||
|
route::OKDRouteScore,
|
||||||
|
},
|
||||||
},
|
},
|
||||||
score::Score,
|
score::Score,
|
||||||
topology::{HelmCommand, Topology},
|
topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct NatsK8sScore {
|
pub struct NatsK8sScore {
|
||||||
|
pub distribution: KubernetesDistribution,
|
||||||
pub cluster: NatsCluster,
|
pub cluster: NatsCluster,
|
||||||
pub domain: NatsEndpoint,
|
|
||||||
pub peers: Option<Vec<NatsCluster>>,
|
pub peers: Option<Vec<NatsCluster>>,
|
||||||
|
pub ca_bundle: Option<Vec<String>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Topology + HelmCommand + 'static> Score<T> for NatsK8sScore {
|
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Score<T> for NatsK8sScore {
|
||||||
fn name(&self) -> String {
|
fn name(&self) -> String {
|
||||||
"NatsK8sScore".to_string()
|
"NatsK8sScore".to_string()
|
||||||
}
|
}
|
||||||
|
|
||||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
self.build_deploy_nats_score(
|
Box::new(NatsK8sInterpret {
|
||||||
self.cluster.clone(),
|
score: self.clone(),
|
||||||
self.domain.clone(),
|
})
|
||||||
self.peers.clone(),
|
}
|
||||||
self.cluster.namespace.clone(),
|
}
|
||||||
|
#[derive(Debug)]
|
||||||
|
pub struct NatsK8sInterpret {
|
||||||
|
score: NatsK8sScore,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + HelmCommand + Nats + K8sclient + TlsRouter + 'static> Interpret<T>
|
||||||
|
for NatsK8sInterpret
|
||||||
|
{
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
inventory: &Inventory,
|
||||||
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
let domain = topology
|
||||||
|
.get_internal_domain()
|
||||||
|
.await?
|
||||||
|
.ok_or(InterpretError::new("no internal domain found".to_string()))
|
||||||
|
.unwrap();
|
||||||
|
|
||||||
|
info!("creating ingress for nats cluster");
|
||||||
|
self.create_ingress(
|
||||||
|
topology,
|
||||||
|
inventory,
|
||||||
|
self.score.distribution.clone(),
|
||||||
|
self.score.cluster.clone(),
|
||||||
)
|
)
|
||||||
.create_interpret()
|
.await?;
|
||||||
|
|
||||||
|
let domain = NatsEndpoint { host: domain };
|
||||||
|
|
||||||
|
info!("creating nats ca bundle secret");
|
||||||
|
self.create_ca_bundle_secret(
|
||||||
|
topology,
|
||||||
|
inventory,
|
||||||
|
self.score.cluster.clone(),
|
||||||
|
self.score.ca_bundle.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
info!("deploying nats cluster");
|
||||||
|
self.deploy_nats(
|
||||||
|
topology,
|
||||||
|
inventory,
|
||||||
|
self.score.cluster.clone(),
|
||||||
|
domain,
|
||||||
|
self.score.peers.clone(),
|
||||||
|
self.score.cluster.namespace.clone(),
|
||||||
|
)
|
||||||
|
.await?;
|
||||||
|
Ok(Outcome::success(
|
||||||
|
"successfully deployed nats K8s".to_string(),
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_name(&self) -> InterpretName {
|
||||||
|
InterpretName::Custom("NatsK8sInterpret")
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_version(&self) -> Version {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_status(&self) -> InterpretStatus {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_children(&self) -> Vec<Id> {
|
||||||
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
impl NatsK8sScore {
|
impl NatsK8sInterpret {
|
||||||
fn build_deploy_nats_score<T: Topology + HelmCommand + 'static>(
|
async fn create_ingress<T: Topology + K8sclient>(
|
||||||
&self,
|
&self,
|
||||||
|
topology: &T,
|
||||||
|
inventory: &Inventory,
|
||||||
|
distribution: KubernetesDistribution,
|
||||||
|
nats_cluster: NatsCluster,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
match distribution {
|
||||||
|
KubernetesDistribution::OpenshiftFamily => {
|
||||||
|
OKDRouteScore {
|
||||||
|
name: nats_cluster.name.to_string(),
|
||||||
|
namespace: nats_cluster.namespace.to_string(),
|
||||||
|
spec: RouteSpec {
|
||||||
|
to: RouteTargetReference {
|
||||||
|
kind: "Service".to_string(),
|
||||||
|
name: format!("{}", nats_cluster.name.to_string()),
|
||||||
|
weight: Some(100),
|
||||||
|
},
|
||||||
|
host: Some(nats_cluster.dns_name.clone()),
|
||||||
|
port: Some(RoutePort { target_port: 7222 }),
|
||||||
|
tls: Some(TLSConfig {
|
||||||
|
insecure_edge_termination_policy: None,
|
||||||
|
termination: "passthrough".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
wildcard_policy: None,
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
}
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
|
||||||
|
//TODO untested
|
||||||
|
K8sIngressScore {
|
||||||
|
name: todo!(),
|
||||||
|
host: todo!(),
|
||||||
|
backend_service: todo!(),
|
||||||
|
port: todo!(),
|
||||||
|
path: todo!(),
|
||||||
|
path_type: todo!(),
|
||||||
|
namespace: todo!(),
|
||||||
|
ingress_class_name: todo!(),
|
||||||
|
}
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn create_ca_bundle_secret<T: Topology + K8sclient>(
|
||||||
|
&self,
|
||||||
|
topology: &T,
|
||||||
|
inventory: &Inventory,
|
||||||
|
nats_cluster: NatsCluster,
|
||||||
|
ca_bundle: Option<Vec<String>>,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
async fn build_secret_data(bundle: &Vec<String>) -> BTreeMap<String, ByteString> {
|
||||||
|
let mut data = BTreeMap::new();
|
||||||
|
|
||||||
|
data.insert(
|
||||||
|
"ca.crt".to_string(),
|
||||||
|
ByteString(bundle.join("\n").into_bytes()),
|
||||||
|
);
|
||||||
|
|
||||||
|
data
|
||||||
|
}
|
||||||
|
|
||||||
|
match ca_bundle {
|
||||||
|
Some(certs) => {
|
||||||
|
let bundle_secret = K8sSecret {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(nats_cluster.supercluster_ca_secret_name.to_string()),
|
||||||
|
namespace: Some(nats_cluster.namespace.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
data: Some(build_secret_data(&certs).await),
|
||||||
|
immutable: Some(false),
|
||||||
|
type_: Some("Opaque".to_string()),
|
||||||
|
string_data: None,
|
||||||
|
};
|
||||||
|
|
||||||
|
K8sResourceScore::single(bundle_secret, Some(nats_cluster.namespace.clone()))
|
||||||
|
.interpret(inventory, topology)
|
||||||
|
.await
|
||||||
|
.map_err(|e| InterpretError::new(e.to_string()))
|
||||||
|
}
|
||||||
|
None => Ok(Outcome::noop("no certs to create".to_string())),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn deploy_nats<T: Topology + HelmCommand + 'static>(
|
||||||
|
&self,
|
||||||
|
topology: &T,
|
||||||
|
inventory: &Inventory,
|
||||||
cluster: NatsCluster,
|
cluster: NatsCluster,
|
||||||
domain: NatsEndpoint,
|
domain: NatsEndpoint,
|
||||||
peers: Option<Vec<NatsCluster>>,
|
peers: Option<Vec<NatsCluster>>,
|
||||||
namespace: String,
|
namespace: String,
|
||||||
) -> Box<dyn Score<T>> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
let mut gateway_gateways = String::new();
|
let mut gateway_gateways = String::new();
|
||||||
|
let admin = SecretManager::get_or_prompt::<NatsAdmin>().await.unwrap();
|
||||||
|
let user = admin.user.clone();
|
||||||
|
let password = admin.password.clone();
|
||||||
|
|
||||||
match peers {
|
match peers {
|
||||||
Some(peers) => {
|
Some(peers) => {
|
||||||
for peer in peers {
|
for peer in peers {
|
||||||
@@ -84,8 +262,8 @@ config:
|
|||||||
accounts:
|
accounts:
|
||||||
system:
|
system:
|
||||||
users:
|
users:
|
||||||
- user: "admin"
|
- user: "{user}"
|
||||||
password: "admin_2"
|
password: "{password}"
|
||||||
logtime: true
|
logtime: true
|
||||||
debug: true
|
debug: true
|
||||||
trace: true
|
trace: true
|
||||||
@@ -134,6 +312,8 @@ natsBox:
|
|||||||
image:
|
image:
|
||||||
tag: nonroot"#,
|
tag: nonroot"#,
|
||||||
cluster_name = cluster.name,
|
cluster_name = cluster.name,
|
||||||
|
user = user,
|
||||||
|
password = password,
|
||||||
replicas = cluster.replicas,
|
replicas = cluster.replicas,
|
||||||
domain = domain,
|
domain = domain,
|
||||||
gateway_gateways = gateway_gateways,
|
gateway_gateways = gateway_gateways,
|
||||||
@@ -159,7 +339,12 @@ natsBox:
|
|||||||
true,
|
true,
|
||||||
)),
|
)),
|
||||||
};
|
};
|
||||||
|
nats.interpret(inventory, topology).await
|
||||||
Box::new(nats)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[derive(Secret, Serialize, Deserialize, JsonSchema, Debug, PartialEq, Clone)]
|
||||||
|
struct NatsAdmin {
|
||||||
|
user: String,
|
||||||
|
password: String,
|
||||||
|
}
|
||||||
|
|||||||
@@ -45,14 +45,14 @@ impl<T: Topology + NatsSupercluster> Interpret<T> for NatsSuperclusterInterpret
|
|||||||
topology: &T,
|
topology: &T,
|
||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
info!("creating nats supercluster ca bundle");
|
info!("creating nats supercluster ca bundle");
|
||||||
topology
|
let ca_bundle = topology
|
||||||
.distribute_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone())
|
.create_site_ca_bundle(self.nats_clusters.clone(), self.ca_certs.clone())
|
||||||
.await
|
.await
|
||||||
.map_err(|e| InterpretError::new(e))?;
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
info!("deploying nats supercluster");
|
info!("deploying nats supercluster");
|
||||||
let cluster_name = topology
|
let cluster_name = topology
|
||||||
.deploy_supercluster(self.nats_clusters.clone())
|
.deploy_site(self.nats_clusters.clone(), ca_bundle)
|
||||||
.await
|
.await
|
||||||
.map_err(|e| InterpretError::new(e))?;
|
.map_err(|e| InterpretError::new(e))?;
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user