diff --git a/Cargo.lock b/Cargo.lock index e8bee18..ac4598b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1776,6 +1776,21 @@ dependencies = [ "url", ] +[[package]] +name = "example-multisite-postgres" +version = "0.1.0" +dependencies = [ + "cidr", + "env_logger", + "harmony", + "harmony_cli", + "harmony_macros", + "harmony_types", + "log", + "tokio", + "url", +] + [[package]] name = "example-nanodc" version = "0.1.0" @@ -2561,6 +2576,7 @@ version = "0.1.0" dependencies = [ "rand 0.9.2", "serde", + "serde_json", "url", ] diff --git a/examples/multisite_postgres/Cargo.toml b/examples/multisite_postgres/Cargo.toml new file mode 100644 index 0000000..249b9f8 --- /dev/null +++ b/examples/multisite_postgres/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "example-multisite-postgres" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +publish = false + +[dependencies] +harmony = { path = "../../harmony" } +harmony_cli = { path = "../../harmony_cli" } +harmony_types = { path = "../../harmony_types" } +cidr = { workspace = true } +tokio = { workspace = true } +harmony_macros = { path = "../../harmony_macros" } +log = { workspace = true } +env_logger = { workspace = true } +url = { workspace = true } diff --git a/examples/multisite_postgres/src/main.rs b/examples/multisite_postgres/src/main.rs new file mode 100644 index 0000000..bcb02c1 --- /dev/null +++ b/examples/multisite_postgres/src/main.rs @@ -0,0 +1,31 @@ +use harmony::{ + inventory::Inventory, + modules::postgresql::{ + K8sPostgreSQLScore, PublicPostgreSQLScore, capability::PostgreSQLConfig, + }, + topology::{FailoverTopology, K8sAnywhereTopology}, +}; + +#[tokio::main] +async fn main() { + let postgres = PublicPostgreSQLScore { + postgres_score: K8sPostgreSQLScore { + config: PostgreSQLConfig { + cluster_name: "harmony-postgres-example".to_string(), // Override default name + namespace: "harmony-public-postgres".to_string(), + ..Default::default() // Use harmony defaults, they are based on CNPG's default values : + // "default" namespace, 1 instance, 1Gi storage + }, + }, + hostname: "postgrestest.sto1.nationtech.io".to_string(), + }; + + harmony_cli::run( + Inventory::autoload(), + FailoverTopology::::from_env(), + vec![Box::new(postgres)], + None, + ) + .await + .unwrap(); +} diff --git a/harmony/src/domain/topology/failover.rs b/harmony/src/domain/topology/failover.rs index 6df9cea..43c2ac6 100644 --- a/harmony/src/domain/topology/failover.rs +++ b/harmony/src/domain/topology/failover.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; -use crate::topology::{PreparationError, PreparationOutcome, Topology}; +use crate::topology::k8s_anywhere::K8sAnywhereConfig; +use crate::topology::{K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology}; pub struct FailoverTopology { pub primary: T, @@ -17,3 +18,30 @@ impl Topology for FailoverTopology { todo!() } } + +impl FailoverTopology { + /// Creates a new `FailoverTopology` from environment variables. + /// + /// Expects two environment variables: + /// - `HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY`: Comma-separated `key=value` pairs, e.g., + /// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx` + /// - `HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA`: Same format for the replica. + /// + /// Parses `kubeconfig` (path to kubeconfig file) and `context_name` (Kubernetes context), + /// and constructs `K8sAnywhereConfig` with local installs disabled (`use_local_k3d=false`, + /// `autoinstall=false`, `use_system_kubeconfig=false`). + /// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`. + /// + /// Panics if required env vars are missing or malformed. + pub fn from_env() -> Self { + let primary_config = + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY"); + let replica_config = + K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA"); + + let primary = K8sAnywhereTopology::with_config(primary_config); + let replica = K8sAnywhereTopology::with_config(replica_config); + + Self { primary, replica } + } +} diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 3cab0b8..7884512 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -631,6 +631,23 @@ impl K8sClient { } pub async fn from_kubeconfig(path: &str) -> Option { + Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await + } + + pub async fn from_kubeconfig_with_context( + path: &str, + context: Option, + ) -> Option { + let mut opts = KubeConfigOptions::default(); + opts.context = context; + + Self::from_kubeconfig_with_opts(path, &opts).await + } + + pub async fn from_kubeconfig_with_opts( + path: &str, + opts: &KubeConfigOptions, + ) -> Option { let k = match Kubeconfig::read_from(path) { Ok(k) => k, Err(e) => { @@ -638,13 +655,9 @@ impl K8sClient { return None; } }; + Some(K8sClient::new( - Client::try_from( - Config::from_custom_kubeconfig(k, &KubeConfigOptions::default()) - .await - .unwrap(), - ) - .unwrap(), + Client::try_from(Config::from_custom_kubeconfig(k, &opts).await.unwrap()).unwrap(), )) } } diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 694ce85..c3c6fa3 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -644,7 +644,7 @@ impl K8sAnywhereTopology { } async fn try_load_kubeconfig(&self, path: &str) -> Option { - K8sClient::from_kubeconfig(path).await + K8sClient::from_kubeconfig_with_context(path, self.config.k8s_context.clone()).await } fn get_k3d_installation_score(&self) -> K3DInstallationScore { @@ -922,9 +922,59 @@ pub struct K8sAnywhereConfig { /// default: true pub use_local_k3d: bool, pub harmony_profile: String, + + /// Name of the kubeconfig context to use. + /// + /// If None, it will use the current context. + /// + /// If the context name is not found, it will fail to initialize. + pub k8s_context: Option, } impl K8sAnywhereConfig { + /// Reads an environment variable `env_var` and parses its content : + /// Comma-separated `key=value` pairs, e.g., + /// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx` + /// + /// Then creates a K8sAnywhereConfig from it local installs disabled (`use_local_k3d=false`, + /// `autoinstall=false`, `use_system_kubeconfig=false`). + /// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`. + /// + /// Panics if `env_var` is missing or malformed. + pub fn remote_k8s_from_env_var(env_var: &str) -> Self { + Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE") + } + + pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self { + let env_var_value = std::env::var(env_var) + .map_err(|_| format!("Missing required env var {env_var}")) + .unwrap(); + info!("Initializing remote k8s from env var value : {env_var_value}"); + + let mut kubeconfig: Option = None; + let mut k8s_context: Option = None; + + for part in env_var_value.split(',') { + let kv: Vec<&str> = part.splitn(2, '=').collect(); + if kv.len() == 2 { + match kv[0].trim() { + "kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()), + "context_name" => k8s_context = Some(kv[1].trim().to_string()), + _ => {} + } + } + } + + K8sAnywhereConfig { + kubeconfig, + k8s_context, + use_system_kubeconfig: false, + autoinstall: false, + use_local_k3d: false, + harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()), + } + } + fn from_env() -> Self { Self { kubeconfig: std::env::var("KUBECONFIG").ok().map(|v| v.to_string()), @@ -939,6 +989,7 @@ impl K8sAnywhereConfig { ), use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D") .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)), + k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(), } } } @@ -1045,3 +1096,118 @@ impl Ingress for K8sAnywhereTopology { } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0); + + /// Sets environment variables with unique names to avoid concurrency issues between tests. + /// Returns the names of the (config_var, profile_var) used. + fn setup_env_vars(config_value: Option<&str>, profile_value: Option<&str>) -> (String, String) { + let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst); + let config_var = format!("TEST_VAR_{}", id); + let profile_var = format!("TEST_PROFILE_{}", id); + + unsafe { + if let Some(v) = config_value { + std::env::set_var(&config_var, v); + } else { + std::env::remove_var(&config_var); + } + + if let Some(v) = profile_value { + std::env::set_var(&profile_var, v); + } else { + std::env::remove_var(&profile_var); + } + } + (config_var, profile_var) + } + + #[test] + fn test_remote_k8s_from_env_var_full() { + let (config_var, profile_var) = setup_env_vars( + Some("kubeconfig=/foo.kc,context_name=bar"), + Some("testprof"), + ); + + let cfg = + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); + assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); + assert_eq!(cfg.harmony_profile, "testprof"); + assert!(!cfg.use_local_k3d); + assert!(!cfg.autoinstall); + assert!(!cfg.use_system_kubeconfig); + } + + #[test] + fn test_remote_k8s_from_env_var_only_kubeconfig() { + let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo.kc"), None); + + let cfg = + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); + assert_eq!(cfg.k8s_context, None); + assert_eq!(cfg.harmony_profile, "dev"); + } + + #[test] + fn test_remote_k8s_from_env_var_only_context() { + let (config_var, profile_var) = setup_env_vars(Some("context_name=bar"), None); + + let cfg = + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig, None); + assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); + } + + #[test] + fn test_remote_k8s_from_env_var_unknown_key_trim() { + let (config_var, profile_var) = setup_env_vars( + Some(" unknown=bla , kubeconfig= /foo.kc ,context_name= bar "), + None, + ); + + let cfg = + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); + assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); + } + + #[test] + fn test_remote_k8s_from_env_var_empty_malformed() { + let (config_var, profile_var) = setup_env_vars(Some("malformed,no=,equal"), None); + + let cfg = + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + // Unknown/malformed ignored, defaults to None + assert_eq!(cfg.kubeconfig, None); + assert_eq!(cfg.k8s_context, None); + } + + #[test] + #[should_panic(expected = "Missing required env var")] + fn test_remote_k8s_from_env_var_missing() { + let (config_var, profile_var) = setup_env_vars(None, None); + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + } + + #[test] + fn test_remote_k8s_from_env_var_harmony_profile_default() { + let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo"), None); + + let cfg = + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.harmony_profile, "dev"); + } +} diff --git a/harmony/src/domain/topology/k8s_anywhere/postgres.rs b/harmony/src/domain/topology/k8s_anywhere/postgres.rs index 275c283..9af9810 100644 --- a/harmony/src/domain/topology/k8s_anywhere/postgres.rs +++ b/harmony/src/domain/topology/k8s_anywhere/postgres.rs @@ -8,9 +8,12 @@ use crate::{ capability::{PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts}, }, score::Score, - topology::K8sAnywhereTopology, + topology::{K8sAnywhereTopology, K8sclient}, }; +use k8s_openapi::api::core::v1::{Secret, Service}; +use log::info; + #[async_trait] impl PostgreSQL for K8sAnywhereTopology { async fn deploy(&self, config: &PostgreSQLConfig) -> Result { @@ -27,12 +30,78 @@ impl PostgreSQL for K8sAnywhereTopology { /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. /// Abstracts away storage/retrieval details (e.g., secrets, files). async fn get_replication_certs(&self, cluster_name: &str) -> Result { - todo!() + let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?; + + let replication_secret_name = format!("{cluster_name}-replication"); + let replication_secret = k8s_client + .get_resource::(&replication_secret_name, None) + .await + .map_err(|e| format!("Failed to get {replication_secret_name}: {e}"))? + .ok_or_else(|| format!("Replication secret '{replication_secret_name}' not found"))?; + + let ca_secret_name = format!("{cluster_name}-ca"); + let ca_secret = k8s_client + .get_resource::(&ca_secret_name, None) + .await + .map_err(|e| format!("Failed to get {ca_secret_name}: {e}"))? + .ok_or_else(|| format!("CA secret '{ca_secret_name}' not found"))?; + + let replication_data = replication_secret + .data + .as_ref() + .ok_or("Replication secret has no data".to_string())?; + let ca_data = ca_secret + .data + .as_ref() + .ok_or("CA secret has no data".to_string())?; + + let tls_key_bs = replication_data + .get("tls.key") + .ok_or("missing tls.key in replication secret".to_string())?; + let tls_crt_bs = replication_data + .get("tls.crt") + .ok_or("missing tls.crt in replication secret".to_string())?; + let ca_crt_bs = ca_data + .get("ca.crt") + .ok_or("missing ca.crt in CA secret".to_string())?; + + let streaming_replica_key_pem = String::from_utf8_lossy(&tls_key_bs.0).to_string(); + let streaming_replica_cert_pem = String::from_utf8_lossy(&tls_crt_bs.0).to_string(); + let ca_cert_pem = String::from_utf8_lossy(&ca_crt_bs.0).to_string(); + + info!("Successfully extracted replication certs for cluster '{cluster_name}'"); + + Ok(ReplicationCerts { + ca_cert_pem, + streaming_replica_cert_pem, + streaming_replica_key_pem, + }) } /// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster. async fn get_endpoint(&self, cluster_name: &str) -> Result { - todo!() + let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?; + + let service_name = format!("{cluster_name}-rw"); + let service = k8s_client + .get_resource::(&service_name, None) + .await + .map_err(|e| format!("Failed to get service '{service_name}': {e}"))? + .ok_or_else(|| { + format!("Service '{service_name}' not found for cluster '{cluster_name}") + })?; + + let ns = service + .metadata + .namespace + .as_deref() + .unwrap_or("default") + .to_string(); + let host = format!("{service_name}.{ns}.svc.cluster.local"); + + info!("Internal endpoint for '{cluster_name}': {host}:5432"); + + Ok(PostgreSQLEndpoint { host, port: 5432 }) } /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough). @@ -43,6 +112,9 @@ impl PostgreSQL for K8sAnywhereTopology { &self, cluster_name: &str, ) -> Result, String> { - todo!() + // TODO: Implement OpenShift Route lookup targeting '{cluster_name}-rw' service on port 5432 with TLS passthrough + // For now, return None assuming internal-only access or manual route configuration + info!("Public endpoint lookup not implemented for '{cluster_name}', returning None"); + Ok(None) } } diff --git a/harmony/src/domain/topology/router.rs b/harmony/src/domain/topology/router.rs index a904e1d..f5db5bf 100644 --- a/harmony/src/domain/topology/router.rs +++ b/harmony/src/domain/topology/router.rs @@ -80,15 +80,15 @@ pub struct TlsRoute { pub namespace: String, } - impl TlsRoute { - pub fn to_string_short(&self) -> String { - format!("{}-{}:{}", self.hostname, self.backend, self.target_port) - } +impl TlsRoute { + pub fn to_string_short(&self) -> String { + format!("{}-{}:{}", self.hostname, self.backend, self.target_port) + } - pub fn backend_info_string(&self) -> String { - format!("{}:{}", self.backend, self.target_port) - } - } + pub fn backend_info_string(&self) -> String { + format!("{}:{}", self.backend, self.target_port) + } +} /// Installs and queries TLS passthrough routes (L4 TCP/SNI forwarding, no TLS termination). /// Agnostic to impl: OKD Route, AWS NLB+HAProxy, k3s Envoy Gateway, Apache ProxyPass. diff --git a/harmony/src/modules/k8s/failover.rs b/harmony/src/modules/k8s/failover.rs new file mode 100644 index 0000000..939d9ab --- /dev/null +++ b/harmony/src/modules/k8s/failover.rs @@ -0,0 +1,19 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use log::warn; + +use crate::topology::{FailoverTopology, K8sclient, k8s::K8sClient}; + +#[async_trait] +impl K8sclient for FailoverTopology { + // TODO figure out how to structure this properly. This gives access only to the primary k8s + // client, which will work in many cases but is clearly not good enough for all uses cases + // where k8s_client can be used. Logging a warning for now. + async fn k8s_client(&self) -> Result, String> { + warn!( + "Failover topology k8s_client capability currently defers to the primary only. Make sure to check this is OK for you" + ); + self.primary.k8s_client().await + } +} diff --git a/harmony/src/modules/k8s/mod.rs b/harmony/src/modules/k8s/mod.rs index 56c9201..29264b4 100644 --- a/harmony/src/modules/k8s/mod.rs +++ b/harmony/src/modules/k8s/mod.rs @@ -1,5 +1,6 @@ pub mod apps; pub mod deployment; +mod failover; pub mod ingress; pub mod namespace; pub mod resource; diff --git a/harmony/src/modules/network/failover.rs b/harmony/src/modules/network/failover.rs new file mode 100644 index 0000000..897abbd --- /dev/null +++ b/harmony/src/modules/network/failover.rs @@ -0,0 +1,14 @@ +use async_trait::async_trait; +use log::warn; + +use crate::topology::{FailoverTopology, TlsRoute, TlsRouter}; + +#[async_trait] +impl TlsRouter for FailoverTopology { + async fn install_route(&self, config: TlsRoute) -> Result<(), String> { + warn!( + "Failover topology TlsRouter capability currently defers to the primary only. Make sure to check this is OK for you. The Replica Topology WILL NOT be affected here" + ); + self.primary.install_route(config).await + } +} diff --git a/harmony/src/modules/network/mod.rs b/harmony/src/modules/network/mod.rs index c70dd89..53c91f2 100644 --- a/harmony/src/modules/network/mod.rs +++ b/harmony/src/modules/network/mod.rs @@ -1,2 +1,3 @@ +mod failover; mod tls_router; pub use tls_router::*; diff --git a/harmony/src/modules/okd/route.rs b/harmony/src/modules/okd/route.rs index 9ca2f82..a20c968 100644 --- a/harmony/src/modules/okd/route.rs +++ b/harmony/src/modules/okd/route.rs @@ -88,7 +88,11 @@ impl Score for OKDTlsPassthroughScore { }), ..Default::default() }; - let route_score = OKDRouteScore::new(&self.name.to_string(), &self.route.namespace, passthrough_spec); + let route_score = OKDRouteScore::new( + &self.name.to_string(), + &self.route.namespace, + passthrough_spec, + ); route_score.create_interpret() } diff --git a/harmony/src/modules/postgresql/failover.rs b/harmony/src/modules/postgresql/failover.rs index 8e80166..1df5008 100644 --- a/harmony/src/modules/postgresql/failover.rs +++ b/harmony/src/modules/postgresql/failover.rs @@ -3,7 +3,6 @@ use log::debug; use log::info; use std::collections::HashMap; -use crate::interpret::Outcome; use crate::{ modules::postgresql::capability::{ BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL, diff --git a/harmony/src/modules/postgresql/mod.rs b/harmony/src/modules/postgresql/mod.rs index 75e91d6..4e6efd5 100644 --- a/harmony/src/modules/postgresql/mod.rs +++ b/harmony/src/modules/postgresql/mod.rs @@ -1,6 +1,6 @@ pub mod capability; -mod score_k8s; mod score_connect; +mod score_k8s; pub use score_connect::*; pub use score_k8s::*; mod score_public; diff --git a/harmony_types/src/lib.rs b/harmony_types/src/lib.rs index 3d72c80..b367a75 100644 --- a/harmony_types/src/lib.rs +++ b/harmony_types/src/lib.rs @@ -1,5 +1,5 @@ pub mod id; pub mod net; +pub mod rfc1123; pub mod storage; pub mod switch; -pub mod rfc1123; diff --git a/harmony_types/src/rfc1123.rs b/harmony_types/src/rfc1123.rs index 190d8c4..fc0b722 100644 --- a/harmony_types/src/rfc1123.rs +++ b/harmony_types/src/rfc1123.rs @@ -43,7 +43,9 @@ impl TryFrom<&str> for Rfc1123Name { } // Trim leading/trailing non-alphanumeric - content = content.trim_matches(|c: char| !c.is_ascii_alphanumeric()).to_string(); + content = content + .trim_matches(|c: char| !c.is_ascii_alphanumeric()) + .to_string(); if content.is_empty() { return Err(format!("Input '{}' resulted in empty string", s)); @@ -55,7 +57,6 @@ impl TryFrom<&str> for Rfc1123Name { type Error = String; } - /// Converts an `Rfc1123Name` into a `String`. /// /// This allows using `Rfc1123Name` in contexts where a `String` is expected. @@ -99,7 +100,6 @@ impl std::fmt::Display for Rfc1123Name { } } - #[cfg(test)] mod tests { use super::Rfc1123Name; @@ -229,4 +229,3 @@ mod tests { assert_eq!(name.content, "a.b.c"); } } - diff --git a/harmony_types/src/storage.rs b/harmony_types/src/storage.rs index 28a7ef4..4be3ea1 100644 --- a/harmony_types/src/storage.rs +++ b/harmony_types/src/storage.rs @@ -71,7 +71,7 @@ impl StorageSize { Self { size_bytes: size * 1024 * 1024 * 1024 * 1024, display_value: Some(size), - display_suffix: Some("TiB".to_string()), + display_suffix: Some("Ti".to_string()), } }