Some checks failed
Run Check Script / check (pull_request) Failing after 1m52s
481 lines
13 KiB
Rust
481 lines
13 KiB
Rust
use std::{collections::BTreeMap, str::FromStr};
|
|
|
|
use harmony::{
|
|
interpret::{InterpretError, Outcome},
|
|
inventory::Inventory,
|
|
modules::{
|
|
cert_manager::{
|
|
capability::{CertificateManagement, CertificateManagementConfig},
|
|
crd::CaIssuer,
|
|
},
|
|
helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
|
|
k8s::resource::K8sResourceScore,
|
|
okd::{
|
|
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
|
route::OKDRouteScore,
|
|
},
|
|
},
|
|
score::Score,
|
|
topology::{
|
|
HelmCommand, K8sAnywhereConfig, K8sAnywhereTopology, K8sclient, TlsRouter, Topology,
|
|
},
|
|
};
|
|
use harmony_macros::hurl;
|
|
use k8s_openapi::{
|
|
ByteString, api::core::v1::Secret, apimachinery::pkg::apis::meta::v1::ObjectMeta,
|
|
};
|
|
use log::{debug, info};
|
|
|
|
#[tokio::main]
|
|
async fn main() -> Result<(), InterpretError> {
|
|
let namespace = "nats-supercluster-test";
|
|
let self_signed_issuer_name = "harmony-self-signed-issuer";
|
|
let ca_issuer_name = "harmony-ca-issuer";
|
|
let root_ca_cert_name = "harmony-root-ca";
|
|
|
|
log::info!("starting nats supercluster bootstrap");
|
|
|
|
// --------------------------------------------------
|
|
// 1. Build site contexts
|
|
// --------------------------------------------------
|
|
|
|
let site1 = site(
|
|
"HARMONY_NATS_SITE_1",
|
|
"HARMONY_NATS_SITE_1_DOMAIN",
|
|
"nats-sto1-cert-test1",
|
|
);
|
|
|
|
let site2 = site(
|
|
"HARMONY_NATS_SITE_2",
|
|
"HARMONY_NATS_SITE_2_DOMAIN",
|
|
"nats-cb1-cert-test2",
|
|
);
|
|
|
|
// --------------------------------------------------
|
|
// 2. Ensure clusters are reachable
|
|
// --------------------------------------------------
|
|
|
|
log::info!("ensuring both topologies are ready");
|
|
|
|
tokio::try_join!(site1.topology.ensure_ready(), site2.topology.ensure_ready(),)?;
|
|
|
|
// --------------------------------------------------
|
|
// 3. Create certificates
|
|
// --------------------------------------------------
|
|
|
|
log::info!("creating certificates");
|
|
|
|
let root_ca_config = CertificateManagementConfig {
|
|
namespace: Some(namespace.into()),
|
|
acme_issuer: None,
|
|
ca_issuer: Some(CaIssuer {
|
|
secret_name: format!("{}-tls", root_ca_cert_name),
|
|
}),
|
|
self_signed: false,
|
|
};
|
|
|
|
let self_signed_config = CertificateManagementConfig {
|
|
namespace: Some(namespace.to_string().clone()),
|
|
acme_issuer: None,
|
|
ca_issuer: None,
|
|
self_signed: true,
|
|
};
|
|
|
|
tokio::try_join!(
|
|
create_nats_certs(
|
|
site1.topology.clone(),
|
|
&site1.cluster,
|
|
ca_issuer_name,
|
|
&root_ca_config,
|
|
self_signed_issuer_name,
|
|
&self_signed_config,
|
|
root_ca_cert_name
|
|
),
|
|
create_nats_certs(
|
|
site2.topology.clone(),
|
|
&site2.cluster,
|
|
ca_issuer_name,
|
|
&root_ca_config,
|
|
self_signed_issuer_name,
|
|
&self_signed_config,
|
|
root_ca_cert_name
|
|
),
|
|
)?;
|
|
|
|
// --------------------------------------------------
|
|
// 4. Build CA bundle
|
|
// --------------------------------------------------
|
|
|
|
log::info!("building supercluster CA bundle");
|
|
|
|
let mut ca_bundle = Vec::new();
|
|
|
|
ca_bundle.push(
|
|
site1
|
|
.topology
|
|
.get_ca_certificate(root_ca_cert_name.to_string(), &root_ca_config)
|
|
.await?,
|
|
);
|
|
ca_bundle.push(
|
|
site2
|
|
.topology
|
|
.get_ca_certificate(root_ca_cert_name.to_string(), &root_ca_config)
|
|
.await?,
|
|
);
|
|
|
|
// --------------------------------------------------
|
|
// 5. Build Scores
|
|
// --------------------------------------------------
|
|
|
|
log::info!("building scores");
|
|
|
|
let site1_scores = vec![
|
|
build_ca_bundle_secret_score(
|
|
site1.topology.clone(),
|
|
&site1.cluster,
|
|
&ca_bundle,
|
|
namespace.into(),
|
|
)
|
|
.await,
|
|
build_route_score(site1.topology.clone(), &site1.cluster, namespace.into()).await,
|
|
build_deploy_nats_score(
|
|
site1.topology.clone(),
|
|
&site1.cluster,
|
|
vec![&site2.cluster],
|
|
namespace.into(),
|
|
)
|
|
.await,
|
|
];
|
|
|
|
let site2_scores = vec![
|
|
build_ca_bundle_secret_score(
|
|
site2.topology.clone(),
|
|
&site2.cluster,
|
|
&ca_bundle,
|
|
namespace.into(),
|
|
)
|
|
.await,
|
|
build_route_score(site2.topology.clone(), &site2.cluster, namespace.into()).await,
|
|
build_deploy_nats_score(
|
|
site2.topology.clone(),
|
|
&site2.cluster,
|
|
vec![&site1.cluster],
|
|
namespace.into(),
|
|
)
|
|
.await,
|
|
];
|
|
|
|
// --------------------------------------------------
|
|
// 6. Apply Scores
|
|
// --------------------------------------------------
|
|
|
|
log::info!("applying scores");
|
|
|
|
tokio::try_join!(
|
|
apply_scores(site1.topology.clone(), site1_scores),
|
|
apply_scores(site2.topology.clone(), site2_scores),
|
|
)?;
|
|
|
|
log::info!("supercluster bootstrap complete");
|
|
log::info!(
|
|
"Enjoy! You can test your nats cluster by running : `kubectl exec -n {namespace} -it deployment/nats-box -- nats pub test hi`"
|
|
);
|
|
Ok(())
|
|
}
|
|
|
|
async fn apply_scores<T: Topology + 'static>(
|
|
topology: T,
|
|
scores: Vec<Box<dyn Score<T>>>,
|
|
) -> Result<(), InterpretError> {
|
|
info!("applying {} scores", scores.len());
|
|
|
|
harmony_cli::run(Inventory::autoload(), topology, scores, None)
|
|
.await
|
|
.map_err(|e| InterpretError::new(e.to_string()))?;
|
|
|
|
Ok(())
|
|
}
|
|
|
|
fn site(
|
|
topo_env: &str,
|
|
domain_env: &str,
|
|
cluster_name: &'static str,
|
|
) -> SiteContext<K8sAnywhereTopology> {
|
|
let domain = std::env::var(domain_env).expect("missing domain env");
|
|
|
|
let topology =
|
|
K8sAnywhereTopology::with_config(K8sAnywhereConfig::remote_k8s_from_env_var(topo_env));
|
|
|
|
SiteContext {
|
|
topology,
|
|
cluster: NatsCluster {
|
|
replicas: 1,
|
|
name: cluster_name,
|
|
gateway_advertise: format!("{cluster_name}-gw.{domain}:443"),
|
|
dns_name: format!("{cluster_name}-gw.{domain}"),
|
|
supercluster_ca_secret_name: "nats-supercluster-ca-bundle",
|
|
tls_cert_name: "nats-gateway",
|
|
jetstream_enabled: "true",
|
|
},
|
|
}
|
|
}
|
|
|
|
struct SiteContext<T> {
|
|
topology: T,
|
|
cluster: NatsCluster,
|
|
}
|
|
|
|
/// Static description of one NATS cluster taking part in the supercluster.
///
/// The derives make the description loggable (`{:?}`), copyable and
/// comparable; they do not change how existing code constructs or reads it.
#[derive(Debug, Clone, PartialEq, Eq)]
struct NatsCluster {
    /// Replica count rendered into the Helm values.
    replicas: usize,
    /// Cluster name; also used as Helm release name and route/service name.
    name: &'static str,
    /// `host:port` other clusters use to reach this cluster's gateway.
    gateway_advertise: String,
    /// Host name of the gateway (route host and certificate DNS name).
    dns_name: String,
    /// Name of the secret holding the shared CA bundle.
    supercluster_ca_secret_name: &'static str,
    /// Name of the gateway TLS certificate.
    tls_cert_name: &'static str,
    /// Rendered verbatim into the Helm values as the JetStream `enabled` flag.
    // NOTE(review): stringly-typed boolean; a `bool` would format the same way.
    jetstream_enabled: &'static str,
}
|
|
|
|
async fn create_nats_certs<T: Topology + CertificateManagement>(
|
|
topology: T,
|
|
cluster: &NatsCluster,
|
|
ca_issuer_name: &str,
|
|
ca_cert_mgmt_config: &CertificateManagementConfig,
|
|
self_signed_issuer_name: &str,
|
|
self_signed_cert_config: &CertificateManagementConfig,
|
|
root_ca_cert_name: &str,
|
|
) -> Result<Outcome, InterpretError> {
|
|
//the order is pretty important
|
|
|
|
debug!(
|
|
"Applying certs to ns {:#?}",
|
|
ca_cert_mgmt_config.namespace.clone()
|
|
);
|
|
|
|
debug!("creating issuer '{}'", self_signed_issuer_name);
|
|
topology
|
|
.create_issuer(self_signed_issuer_name.to_string(), self_signed_cert_config)
|
|
.await?;
|
|
|
|
debug!("creating certificate {root_ca_cert_name}");
|
|
topology
|
|
.create_certificate(
|
|
root_ca_cert_name.to_string(),
|
|
self_signed_issuer_name.to_string(),
|
|
Some(format!("harmony-{}-ca", cluster.name)),
|
|
None,
|
|
Some(true),
|
|
ca_cert_mgmt_config,
|
|
)
|
|
.await?;
|
|
|
|
debug!("creating issuer '{}'", ca_issuer_name);
|
|
topology
|
|
.create_issuer(ca_issuer_name.to_string(), ca_cert_mgmt_config)
|
|
.await?;
|
|
|
|
debug!("creating certificate {}", cluster.tls_cert_name);
|
|
topology
|
|
.create_certificate(
|
|
cluster.tls_cert_name.to_string(),
|
|
ca_issuer_name.to_string(),
|
|
None,
|
|
Some(vec![cluster.dns_name.clone()]),
|
|
Some(true),
|
|
ca_cert_mgmt_config,
|
|
)
|
|
.await?;
|
|
|
|
Ok(Outcome::success("success".to_string()))
|
|
}
|
|
|
|
async fn build_ca_bundle_secret(
|
|
namespace: &str,
|
|
nats_cluster: &NatsCluster,
|
|
bundle: &[String],
|
|
) -> Secret {
|
|
Secret {
|
|
metadata: ObjectMeta {
|
|
name: Some(nats_cluster.supercluster_ca_secret_name.to_string()),
|
|
namespace: Some(namespace.to_string()),
|
|
..Default::default()
|
|
},
|
|
data: Some(build_secret_data(bundle).await),
|
|
immutable: Some(false),
|
|
type_: Some("Opaque".to_string()),
|
|
string_data: None,
|
|
}
|
|
}
|
|
|
|
async fn build_secret_data(bundle: &[String]) -> BTreeMap<String, ByteString> {
|
|
let mut data = BTreeMap::new();
|
|
|
|
data.insert(
|
|
"ca.crt".to_string(),
|
|
ByteString(bundle.join("\n").into_bytes()),
|
|
);
|
|
|
|
data
|
|
}
|
|
|
|
async fn build_ca_bundle_secret_score<T: Topology + K8sclient + 'static>(
|
|
_topology: T,
|
|
nats_cluster: &NatsCluster,
|
|
ca_bundle: &[String],
|
|
namespace: String,
|
|
) -> Box<dyn Score<T>> {
|
|
let bundle_secret = build_ca_bundle_secret(&namespace, nats_cluster, ca_bundle).await;
|
|
debug!(
|
|
"deploying secret to ns: {} \nsecret: {:#?}",
|
|
namespace, bundle_secret
|
|
);
|
|
let k8ssecret = K8sResourceScore::single(bundle_secret, Some(namespace));
|
|
Box::new(k8ssecret)
|
|
}
|
|
|
|
async fn build_route_score<T: Topology + K8sclient + 'static>(
|
|
_topology: T,
|
|
cluster: &NatsCluster,
|
|
namespace: String,
|
|
) -> Box<dyn Score<T>> {
|
|
let route = OKDRouteScore {
|
|
name: cluster.name.to_string(),
|
|
namespace,
|
|
annotations: Default::default(),
|
|
spec: RouteSpec {
|
|
to: RouteTargetReference {
|
|
kind: "Service".to_string(),
|
|
name: cluster.name.to_string(),
|
|
weight: Some(100),
|
|
},
|
|
host: Some(cluster.dns_name.clone()),
|
|
port: Some(RoutePort { target_port: 7222 }),
|
|
tls: Some(TLSConfig {
|
|
insecure_edge_termination_policy: None,
|
|
termination: "passthrough".to_string(),
|
|
..Default::default()
|
|
}),
|
|
wildcard_policy: None,
|
|
..Default::default()
|
|
},
|
|
};
|
|
Box::new(route)
|
|
}
|
|
|
|
/// Builds the Helm chart score that deploys NATS for `cluster`.
///
/// * `topology` - only used to look up the internal domain, which is rendered
///   into the websocket ingress host (`nats-ws.<domain>`).
/// * `cluster` - the cluster being deployed; its name is used as the Helm
///   release name and its fields fill the placeholders of the values YAML.
/// * `peers` - remote clusters; each one becomes an entry of the gateway list
///   with a `nats://<peer.gateway_advertise>` URL.
/// * `namespace` - namespace of the Helm release (created if missing, per
///   `create_namespace: true`).
///
/// The chart is `nats/nats` from `https://nats-io.github.io/k8s/helm/charts/`
/// with no pinned version.
///
/// # Panics
///
/// Panics if `topology.get_internal_domain()` does not yield a domain (both
/// `unwrap`s), or if `namespace` / `cluster.name` is rejected by
/// `NonBlankString::from_str`.
///
// NOTE(review): the values YAML embeds hard-coded user passwords and turns on
// `debug`/`trace`; the rendered values (passwords included) are also written
// to the debug log below. Fine for a test setup, but confirm before reuse.
// NOTE(review): the raw strings are whitespace-sensitive YAML; verify their
// indentation against the chart's values layout when editing.
async fn build_deploy_nats_score<T: Topology + HelmCommand + TlsRouter + 'static>(
|
|
topology: T,
|
|
cluster: &NatsCluster,
|
|
peers: Vec<&NatsCluster>,
|
|
namespace: String,
|
|
) -> Box<dyn Score<T>> {
|
|
let mut gateway_gateways = String::new();
|
|
for peer in peers {
|
|
// Append one gateway entry per peer, pointing at nats://<peer.gateway_advertise>
|
|
gateway_gateways.push_str(&format!(
|
|
r#"
|
|
- name: {}
|
|
urls:
|
|
- nats://{}"#,
|
|
peer.name, peer.gateway_advertise
|
|
));
|
|
}
|
|
let domain = topology.get_internal_domain().await.unwrap().unwrap();
|
|
|
|
// Inject gateway config into the 'merge' block to comply with chart structure
|
|
let tls_secret_name = format!("{}-tls", cluster.tls_cert_name);
|
|
let values_yaml = Some(format!(
|
|
r#"config:
|
|
merge:
|
|
authorization:
|
|
default_permissions:
|
|
publish: ["TEST.*"]
|
|
subscribe: ["PUBLIC.>"]
|
|
users:
|
|
# - user: "admin"
|
|
# password: "admin_1"
|
|
# permissions:
|
|
# publish: ">"
|
|
# subscribe: ">"
|
|
- password: "enGk0cgZUabM6bN6FXHT"
|
|
user: "testUser"
|
|
accounts:
|
|
system:
|
|
users:
|
|
- user: "admin"
|
|
password: "admin_2"
|
|
logtime: true
|
|
debug: true
|
|
trace: true
|
|
system_account: system
|
|
cluster:
|
|
name: {cluster_name}
|
|
enabled: true
|
|
replicas: {replicas}
|
|
jetstream:
|
|
enabled: {jetstream_enabled}
|
|
fileStorage:
|
|
enabled: true
|
|
size: 10Gi
|
|
storageDirectory: /data/jetstream
|
|
leafnodes:
|
|
enabled: false
|
|
websocket:
|
|
enabled: false
|
|
ingress:
|
|
enabled: true
|
|
className: openshift-default
|
|
pathType: Prefix
|
|
hosts:
|
|
- nats-ws.{domain}
|
|
gateway:
|
|
enabled: true
|
|
port: 7222
|
|
name: {cluster_name}
|
|
merge:
|
|
advertise: {gateway_advertise}
|
|
gateways: {gateway_gateways}
|
|
tls:
|
|
enabled: true
|
|
secretName: {tls_secret_name}
|
|
# merge:
|
|
# ca_file: "/etc/nats-certs/gateway/ca.crt"
|
|
service:
|
|
ports:
|
|
gateway:
|
|
enabled: true
|
|
tlsCA:
|
|
enabled: true
|
|
secretName: {supercluster_ca_secret_name}
|
|
natsBox:
|
|
container:
|
|
image:
|
|
tag: nonroot"#,
|
|
cluster_name = cluster.name,
|
|
replicas = cluster.replicas,
|
|
domain = domain,
|
|
gateway_gateways = gateway_gateways,
|
|
gateway_advertise = cluster.gateway_advertise,
|
|
tls_secret_name = tls_secret_name,
|
|
jetstream_enabled = cluster.jetstream_enabled,
|
|
supercluster_ca_secret_name = cluster.supercluster_ca_secret_name,
|
|
));
|
|
|
|
debug!("Prepared Helm Chart values : \n{values_yaml:#?}");
|
|
let nats = HelmChartScore {
|
|
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
|
|
release_name: NonBlankString::from_str(cluster.name).unwrap(),
|
|
chart_name: NonBlankString::from_str("nats/nats").unwrap(),
|
|
chart_version: None,
|
|
values_overrides: None,
|
|
values_yaml,
|
|
create_namespace: true,
|
|
install_only: false,
|
|
repository: Some(HelmRepository::new(
|
|
"nats".to_string(),
|
|
hurl!("https://nats-io.github.io/k8s/helm/charts/"),
|
|
true,
|
|
)),
|
|
};
|
|
|
|
Box::new(nats)
|
|
}
|