Files
harmony/examples/nats-supercluster/src/main.rs
2026-05-20 12:03:19 -04:00

481 lines
13 KiB
Rust

use std::{collections::BTreeMap, str::FromStr};
use harmony::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
modules::{
cert_manager::{
capability::{CertificateManagement, CertificateManagementConfig},
crd::CaIssuer,
},
helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
k8s::resource::K8sResourceScore,
okd::{
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
route::OKDRouteScore,
},
},
score::Score,
topology::{
HelmCommand, K8sAnywhereConfig, K8sAnywhereTopology, K8sclient, TlsRouter, Topology,
},
};
use harmony_macros::hurl;
use k8s_openapi::{
ByteString, api::core::v1::Secret, apimachinery::pkg::apis::meta::v1::ObjectMeta,
};
use log::{debug, info};
#[tokio::main]
async fn main() -> Result<(), InterpretError> {
let namespace = "nats-supercluster-test";
let self_signed_issuer_name = "harmony-self-signed-issuer";
let ca_issuer_name = "harmony-ca-issuer";
let root_ca_cert_name = "harmony-root-ca";
log::info!("starting nats supercluster bootstrap");
// --------------------------------------------------
// 1. Build site contexts
// --------------------------------------------------
let site1 = site(
"HARMONY_NATS_SITE_1",
"HARMONY_NATS_SITE_1_DOMAIN",
"nats-sto1-cert-test1",
);
let site2 = site(
"HARMONY_NATS_SITE_2",
"HARMONY_NATS_SITE_2_DOMAIN",
"nats-cb1-cert-test2",
);
// --------------------------------------------------
// 2. Ensure clusters are reachable
// --------------------------------------------------
log::info!("ensuring both topologies are ready");
tokio::try_join!(site1.topology.ensure_ready(), site2.topology.ensure_ready(),)?;
// --------------------------------------------------
// 3. Create certificates
// --------------------------------------------------
log::info!("creating certificates");
let root_ca_config = CertificateManagementConfig {
namespace: Some(namespace.into()),
acme_issuer: None,
ca_issuer: Some(CaIssuer {
secret_name: format!("{}-tls", root_ca_cert_name),
}),
self_signed: false,
};
let self_signed_config = CertificateManagementConfig {
namespace: Some(namespace.to_string().clone()),
acme_issuer: None,
ca_issuer: None,
self_signed: true,
};
tokio::try_join!(
create_nats_certs(
site1.topology.clone(),
&site1.cluster,
ca_issuer_name,
&root_ca_config,
self_signed_issuer_name,
&self_signed_config,
root_ca_cert_name
),
create_nats_certs(
site2.topology.clone(),
&site2.cluster,
ca_issuer_name,
&root_ca_config,
self_signed_issuer_name,
&self_signed_config,
root_ca_cert_name
),
)?;
// --------------------------------------------------
// 4. Build CA bundle
// --------------------------------------------------
log::info!("building supercluster CA bundle");
let mut ca_bundle = Vec::new();
ca_bundle.push(
site1
.topology
.get_ca_certificate(root_ca_cert_name.to_string(), &root_ca_config)
.await?,
);
ca_bundle.push(
site2
.topology
.get_ca_certificate(root_ca_cert_name.to_string(), &root_ca_config)
.await?,
);
// --------------------------------------------------
// 5. Build Scores
// --------------------------------------------------
log::info!("building scores");
let site1_scores = vec![
build_ca_bundle_secret_score(
site1.topology.clone(),
&site1.cluster,
&ca_bundle,
namespace.into(),
)
.await,
build_route_score(site1.topology.clone(), &site1.cluster, namespace.into()).await,
build_deploy_nats_score(
site1.topology.clone(),
&site1.cluster,
vec![&site2.cluster],
namespace.into(),
)
.await,
];
let site2_scores = vec![
build_ca_bundle_secret_score(
site2.topology.clone(),
&site2.cluster,
&ca_bundle,
namespace.into(),
)
.await,
build_route_score(site2.topology.clone(), &site2.cluster, namespace.into()).await,
build_deploy_nats_score(
site2.topology.clone(),
&site2.cluster,
vec![&site1.cluster],
namespace.into(),
)
.await,
];
// --------------------------------------------------
// 6. Apply Scores
// --------------------------------------------------
log::info!("applying scores");
tokio::try_join!(
apply_scores(site1.topology.clone(), site1_scores),
apply_scores(site2.topology.clone(), site2_scores),
)?;
log::info!("supercluster bootstrap complete");
log::info!(
"Enjoy! You can test your nats cluster by running : `kubectl exec -n {namespace} -it deployment/nats-box -- nats pub test hi`"
);
Ok(())
}
async fn apply_scores<T: Topology + 'static>(
topology: T,
scores: Vec<Box<dyn Score<T>>>,
) -> Result<(), InterpretError> {
info!("applying {} scores", scores.len());
harmony_cli::run(Inventory::autoload(), topology, scores, None)
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(())
}
fn site(
topo_env: &str,
domain_env: &str,
cluster_name: &'static str,
) -> SiteContext<K8sAnywhereTopology> {
let domain = std::env::var(domain_env).expect("missing domain env");
let topology =
K8sAnywhereTopology::with_config(K8sAnywhereConfig::remote_k8s_from_env_var(topo_env));
SiteContext {
topology,
cluster: NatsCluster {
replicas: 1,
name: cluster_name,
gateway_advertise: format!("{cluster_name}-gw.{domain}:443"),
dns_name: format!("{cluster_name}-gw.{domain}"),
supercluster_ca_secret_name: "nats-supercluster-ca-bundle",
tls_cert_name: "nats-gateway",
jetstream_enabled: "true",
},
}
}
struct SiteContext<T> {
topology: T,
cluster: NatsCluster,
}
struct NatsCluster {
replicas: usize,
name: &'static str,
gateway_advertise: String,
dns_name: String,
supercluster_ca_secret_name: &'static str,
tls_cert_name: &'static str,
jetstream_enabled: &'static str,
}
async fn create_nats_certs<T: Topology + CertificateManagement>(
topology: T,
cluster: &NatsCluster,
ca_issuer_name: &str,
ca_cert_mgmt_config: &CertificateManagementConfig,
self_signed_issuer_name: &str,
self_signed_cert_config: &CertificateManagementConfig,
root_ca_cert_name: &str,
) -> Result<Outcome, InterpretError> {
//the order is pretty important
debug!(
"Applying certs to ns {:#?}",
ca_cert_mgmt_config.namespace.clone()
);
debug!("creating issuer '{}'", self_signed_issuer_name);
topology
.create_issuer(self_signed_issuer_name.to_string(), self_signed_cert_config)
.await?;
debug!("creating certificate {root_ca_cert_name}");
topology
.create_certificate(
root_ca_cert_name.to_string(),
self_signed_issuer_name.to_string(),
Some(format!("harmony-{}-ca", cluster.name)),
None,
Some(true),
ca_cert_mgmt_config,
)
.await?;
debug!("creating issuer '{}'", ca_issuer_name);
topology
.create_issuer(ca_issuer_name.to_string(), ca_cert_mgmt_config)
.await?;
debug!("creating certificate {}", cluster.tls_cert_name);
topology
.create_certificate(
cluster.tls_cert_name.to_string(),
ca_issuer_name.to_string(),
None,
Some(vec![cluster.dns_name.clone()]),
Some(true),
ca_cert_mgmt_config,
)
.await?;
Ok(Outcome::success("success".to_string()))
}
async fn build_ca_bundle_secret(
namespace: &str,
nats_cluster: &NatsCluster,
bundle: &[String],
) -> Secret {
Secret {
metadata: ObjectMeta {
name: Some(nats_cluster.supercluster_ca_secret_name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
data: Some(build_secret_data(bundle).await),
immutable: Some(false),
type_: Some("Opaque".to_string()),
string_data: None,
}
}
async fn build_secret_data(bundle: &[String]) -> BTreeMap<String, ByteString> {
let mut data = BTreeMap::new();
data.insert(
"ca.crt".to_string(),
ByteString(bundle.join("\n").into_bytes()),
);
data
}
async fn build_ca_bundle_secret_score<T: Topology + K8sclient + 'static>(
_topology: T,
nats_cluster: &NatsCluster,
ca_bundle: &[String],
namespace: String,
) -> Box<dyn Score<T>> {
let bundle_secret = build_ca_bundle_secret(&namespace, nats_cluster, ca_bundle).await;
debug!(
"deploying secret to ns: {} \nsecret: {:#?}",
namespace, bundle_secret
);
let k8ssecret = K8sResourceScore::single(bundle_secret, Some(namespace));
Box::new(k8ssecret)
}
async fn build_route_score<T: Topology + K8sclient + 'static>(
_topology: T,
cluster: &NatsCluster,
namespace: String,
) -> Box<dyn Score<T>> {
let route = OKDRouteScore {
name: cluster.name.to_string(),
namespace,
annotations: Default::default(),
spec: RouteSpec {
to: RouteTargetReference {
kind: "Service".to_string(),
name: cluster.name.to_string(),
weight: Some(100),
},
host: Some(cluster.dns_name.clone()),
port: Some(RoutePort { target_port: 7222 }),
tls: Some(TLSConfig {
insecure_edge_termination_policy: None,
termination: "passthrough".to_string(),
..Default::default()
}),
wildcard_policy: None,
..Default::default()
},
};
Box::new(route)
}
async fn build_deploy_nats_score<T: Topology + HelmCommand + TlsRouter + 'static>(
topology: T,
cluster: &NatsCluster,
peers: Vec<&NatsCluster>,
namespace: String,
) -> Box<dyn Score<T>> {
let mut gateway_gateways = String::new();
for peer in peers {
// Construct wss:// URLs on port 443 for the remote gateways
gateway_gateways.push_str(&format!(
r#"
- name: {}
urls:
- nats://{}"#,
peer.name, peer.gateway_advertise
));
}
let domain = topology.get_internal_domain().await.unwrap().unwrap();
// Inject gateway config into the 'merge' block to comply with chart structure
let tls_secret_name = format!("{}-tls", cluster.tls_cert_name);
let values_yaml = Some(format!(
r#"config:
merge:
authorization:
default_permissions:
publish: ["TEST.*"]
subscribe: ["PUBLIC.>"]
users:
# - user: "admin"
# password: "admin_1"
# permissions:
# publish: ">"
# subscribe: ">"
- password: "enGk0cgZUabM6bN6FXHT"
user: "testUser"
accounts:
system:
users:
- user: "admin"
password: "admin_2"
logtime: true
debug: true
trace: true
system_account: system
cluster:
name: {cluster_name}
enabled: true
replicas: {replicas}
jetstream:
enabled: {jetstream_enabled}
fileStorage:
enabled: true
size: 10Gi
storageDirectory: /data/jetstream
leafnodes:
enabled: false
websocket:
enabled: false
ingress:
enabled: true
className: openshift-default
pathType: Prefix
hosts:
- nats-ws.{domain}
gateway:
enabled: true
port: 7222
name: {cluster_name}
merge:
advertise: {gateway_advertise}
gateways: {gateway_gateways}
tls:
enabled: true
secretName: {tls_secret_name}
# merge:
# ca_file: "/etc/nats-certs/gateway/ca.crt"
service:
ports:
gateway:
enabled: true
tlsCA:
enabled: true
secretName: {supercluster_ca_secret_name}
natsBox:
container:
image:
tag: nonroot"#,
cluster_name = cluster.name,
replicas = cluster.replicas,
domain = domain,
gateway_gateways = gateway_gateways,
gateway_advertise = cluster.gateway_advertise,
tls_secret_name = tls_secret_name,
jetstream_enabled = cluster.jetstream_enabled,
supercluster_ca_secret_name = cluster.supercluster_ca_secret_name,
));
debug!("Prepared Helm Chart values : \n{values_yaml:#?}");
let nats = HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str(cluster.name).unwrap(),
chart_name: NonBlankString::from_str("nats/nats").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"nats".to_string(),
hurl!("https://nats-io.github.io/k8s/helm/charts/"),
true,
)),
};
Box::new(nats)
}