From 77583a1ad114e93e04d333380b5c527fbb91c14c Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Thu, 8 Jan 2026 16:03:15 -0500 Subject: [PATCH] wip: nats multi cluster, fixing helm command to follow multiple k8s config by providing the helm command from the topology itself, fix cli_logger that can now be initialized multiple times, some more stuff --- examples/nats/src/main.rs | 45 ++-- harmony/src/domain/topology/helm_command.rs | 6 +- .../topology/k8s_anywhere/k8s_anywhere.rs | 29 ++- harmony/src/domain/topology/router.rs | 11 +- harmony/src/modules/network/failover.rs | 2 +- .../src/modules/okd/crd/ingresses_config.rs | 214 ++++++++++++++++++ harmony/src/modules/okd/crd/mod.rs | 1 + harmony/src/modules/okd/crd/route.rs | 1 - harmony_cli/src/cli_logger.rs | 9 +- 9 files changed, 287 insertions(+), 31 deletions(-) create mode 100644 harmony/src/modules/okd/crd/ingresses_config.rs diff --git a/examples/nats/src/main.rs b/examples/nats/src/main.rs index e986bb4..7cdb3eb 100644 --- a/examples/nats/src/main.rs +++ b/examples/nats/src/main.rs @@ -3,15 +3,27 @@ use std::str::FromStr; use harmony::{ inventory::Inventory, modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString}, - topology::K8sAnywhereTopology, + topology::{HelmCommand, K8sAnywhereConfig, K8sAnywhereTopology, TlsRouter, Topology}, }; use harmony_macros::hurl; use log::info; #[tokio::main] async fn main() { - // env_logger::init(); - let values_yaml = Some( + deploy_nats(K8sAnywhereTopology::with_config( + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_NATS_SITE_1"), + )) + .await; + deploy_nats(K8sAnywhereTopology::with_config( + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_NATS_SITE_2"), + )) + .await; +} + +async fn deploy_nats(topology: T) { + topology.ensure_ready().await.unwrap(); + + let values_yaml = Some(format!( r#"config: cluster: enabled: true @@ -27,10 +39,12 @@ async fn main() { # port: 7422 websocket: enabled: true - ingress: - enabled: true - hosts: - - nats-demo.sto1.nationtech.io + ingress: + enabled: true + className: openshift-default + pathType: Prefix + hosts: + - nats-ws.{} gateway: enabled: false # name: my-gateway @@ -38,9 +52,9 @@ async fn main() { natsBox: container: image: - tag: nonroot"# - .to_string(), - ); + tag: nonroot"#, + topology.get_internal_domain().await.unwrap().unwrap(), + )); let namespace = "nats"; let nats = HelmChartScore { namespace: Some(NonBlankString::from_str(namespace).unwrap()), @@ -58,14 +72,9 @@ natsBox: )), }; - harmony_cli::run( - Inventory::autoload(), - K8sAnywhereTopology::from_env(), - vec![Box::new(nats)], - None, - ) - .await - .unwrap(); + harmony_cli::run(Inventory::autoload(), topology, vec![Box::new(nats)], None) + .await + .unwrap(); info!( "Enjoy! You can test your nats cluster by running : `kubectl exec -n {namespace} -it deployment/nats-box -- nats pub test hi`" diff --git a/harmony/src/domain/topology/helm_command.rs b/harmony/src/domain/topology/helm_command.rs index f3dd697..1d5cb19 100644 --- a/harmony/src/domain/topology/helm_command.rs +++ b/harmony/src/domain/topology/helm_command.rs @@ -1 +1,5 @@ -pub trait HelmCommand {} +use std::process::Command; + +pub trait HelmCommand { + fn get_helm_command(&self) -> Command; +} diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 22dfaad..72fa819 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -35,6 +35,7 @@ use crate::{ service_monitor::ServiceMonitor, }, }, + okd::crd::ingresses_config::Ingress as IngressResource, okd::route::OKDTlsPassthroughScore, prometheus::{ k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore, @@ -107,8 +108,32 @@ impl K8sclient for K8sAnywhereTopology { #[async_trait] impl TlsRouter for K8sAnywhereTopology { - async fn get_wildcard_domain(&self) -> Result, String> { - todo!() + async fn get_internal_domain(&self) -> Result, String> { + match self.get_k8s_distribution().await.map_err(|e| { + format!( + "Could not get internal domain, error getting k8s distribution : {}", + e.to_string() + ) + })? { + KubernetesDistribution::OpenshiftFamily => { + let client = self.k8s_client().await?; + if let Some(ingress_config) = client + .get_resource::("cluster", None) + .await + .map_err(|e| { + format!("Error attempting to get ingress config : {}", e.to_string()) + })? + { + debug!("Found ingress config {:?}", ingress_config.spec); + Ok(ingress_config.spec.domain.clone()) + } else { + warn!("Could not find a domain configured in this cluster"); + Ok(None) + } + } + KubernetesDistribution::K3sFamily => todo!(), + KubernetesDistribution::Default => todo!(), + } } /// Returns the port that this router exposes externally. diff --git a/harmony/src/domain/topology/router.rs b/harmony/src/domain/topology/router.rs index 5217c82..30d5b46 100644 --- a/harmony/src/domain/topology/router.rs +++ b/harmony/src/domain/topology/router.rs @@ -112,12 +112,13 @@ pub trait TlsRouter: Send + Sync { /// HAProxy frontend→backend \"postgres-upstream\". async fn install_route(&self, config: TlsRoute) -> Result<(), String>; - /// Gets the base domain that can be used to deploy applications that will be automatically - /// routed to this cluster. + /// Gets the base domain of this cluster. On openshift family clusters, this is the domain + /// used by default for all components, including the default ingress controller that + /// transforms ingress to routes. /// - /// For example, if we have *.apps.nationtech.io pointing to a public load balancer, then this - /// function would install route apps.nationtech.io - async fn get_wildcard_domain(&self) -> Result, String>; + /// For example, get_internal_domain on a cluster that has `console-openshift-console.apps.mycluster.something` + /// will return `apps.mycluster.something` + async fn get_internal_domain(&self) -> Result, String>; /// Returns the port that this router exposes externally. async fn get_router_port(&self) -> u16; diff --git a/harmony/src/modules/network/failover.rs b/harmony/src/modules/network/failover.rs index 3880c02..d5fd8c0 100644 --- a/harmony/src/modules/network/failover.rs +++ b/harmony/src/modules/network/failover.rs @@ -5,7 +5,7 @@ use crate::topology::{FailoverTopology, TlsRoute, TlsRouter}; #[async_trait] impl TlsRouter for FailoverTopology { - async fn get_wildcard_domain(&self) -> Result, String> { + async fn get_internal_domain(&self) -> Result, String> { todo!() } diff --git a/harmony/src/modules/okd/crd/ingresses_config.rs b/harmony/src/modules/okd/crd/ingresses_config.rs new file mode 100644 index 0000000..4c90156 --- /dev/null +++ b/harmony/src/modules/okd/crd/ingresses_config.rs @@ -0,0 +1,214 @@ +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta}; +use k8s_openapi::{ClusterResourceScope, Resource}; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Ingress { + #[serde(skip_serializing_if = "Option::is_none")] + pub api_version: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub kind: Option, + pub metadata: ObjectMeta, + + pub spec: IngressSpec, + + #[serde(skip_serializing_if = "Option::is_none")] + pub status: Option, +} + +impl Resource for Ingress { + const API_VERSION: &'static str = "config.openshift.io/v1"; + const GROUP: &'static str = "config.openshift.io"; + const VERSION: &'static str = "v1"; + const KIND: &'static str = "Ingress"; + const URL_PATH_SEGMENT: &'static str = "ingresses"; + type Scope = ClusterResourceScope; +} + +impl k8s_openapi::Metadata for Ingress { + type Ty = ObjectMeta; + + fn metadata(&self) -> &Self::Ty { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut Self::Ty { + &mut self.metadata + } +} + +impl Default for Ingress { + fn default() -> Self { + Ingress { + api_version: Some("config.openshift.io/v1".to_string()), + kind: Some("Ingress".to_string()), + metadata: ObjectMeta::default(), + spec: IngressSpec::default(), + status: None, + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct IngressList { + pub metadata: ListMeta, + pub items: Vec, +} + +impl Default for IngressList { + fn default() -> Self { + Self { + metadata: ListMeta::default(), + items: Vec::new(), + } + } +} + +impl Resource for IngressList { + const API_VERSION: &'static str = "config.openshift.io/v1"; + const GROUP: &'static str = "config.openshift.io"; + const VERSION: &'static str = "v1"; + const KIND: &'static str = "IngressList"; + const URL_PATH_SEGMENT: &'static str = "ingresses"; + type Scope = ClusterResourceScope; +} + +impl k8s_openapi::Metadata for IngressList { + type Ty = ListMeta; + + fn metadata(&self) -> &Self::Ty { + &self.metadata + } + + fn metadata_mut(&mut self) -> &mut Self::Ty { + &mut self.metadata + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct IngressSpec { + #[serde(skip_serializing_if = "Option::is_none")] + pub apps_domain: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub component_routes: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub domain: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub load_balancer: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub required_hsts_policies: Option>, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ComponentRouteSpec { + pub hostname: String, + pub name: String, + pub namespace: String, + + #[serde(skip_serializing_if = "Option::is_none")] + pub serving_cert_key_pair_secret: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct SecretNameReference { + pub name: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct LoadBalancer { + #[serde(skip_serializing_if = "Option::is_none")] + pub platform: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct IngressPlatform { + #[serde(skip_serializing_if = "Option::is_none")] + pub aws: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub r#type: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct AWSPlatformLoadBalancer { + pub r#type: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct RequiredHSTSPolicy { + pub domain_patterns: Vec, + + #[serde(skip_serializing_if = "Option::is_none")] + pub include_sub_domains_policy: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub max_age: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub namespace_selector: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub preload_policy: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct MaxAgePolicy { + #[serde(skip_serializing_if = "Option::is_none")] + pub largest_max_age: Option, + + #[serde(skip_serializing_if = "Option::is_none")] + pub smallest_max_age: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct IngressStatus { + #[serde(skip_serializing_if = "Option::is_none")] + pub component_routes: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub default_placement: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ComponentRouteStatus { + #[serde(skip_serializing_if = "Option::is_none")] + pub conditions: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub consuming_users: Option>, + + #[serde(skip_serializing_if = "Option::is_none")] + pub current_hostnames: Option>, + + pub default_hostname: String, + pub name: String, + pub namespace: String, + pub related_objects: Vec, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ObjectReference { + pub group: String, + pub name: String, + pub namespace: String, + pub resource: String, +} + diff --git a/harmony/src/modules/okd/crd/mod.rs b/harmony/src/modules/okd/crd/mod.rs index 71c4d0a..c073458 100644 --- a/harmony/src/modules/okd/crd/mod.rs +++ b/harmony/src/modules/okd/crd/mod.rs @@ -1,2 +1,3 @@ pub mod nmstate; pub mod route; +pub mod ingresses_config; diff --git a/harmony/src/modules/okd/crd/route.rs b/harmony/src/modules/okd/crd/route.rs index 4c396d0..7c9f156 100644 --- a/harmony/src/modules/okd/crd/route.rs +++ b/harmony/src/modules/okd/crd/route.rs @@ -1,5 +1,4 @@ use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta, Time}; -use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString; use k8s_openapi::{NamespaceResourceScope, Resource}; use serde::{Deserialize, Serialize}; diff --git a/harmony_cli/src/cli_logger.rs b/harmony_cli/src/cli_logger.rs index 2cb2a93..03501ef 100644 --- a/harmony_cli/src/cli_logger.rs +++ b/harmony_cli/src/cli_logger.rs @@ -7,11 +7,14 @@ use harmony::{ }; use log::{error, info, log_enabled}; use std::io::Write; -use std::sync::Mutex; +use std::sync::{Mutex, OnceLock}; pub fn init() { - configure_logger(); - handle_events(); + static INITIALIZED: OnceLock<()> = OnceLock::new(); + INITIALIZED.get_or_init(|| { + configure_logger(); + handle_events(); + }); } fn configure_logger() {