diff --git a/examples/multisite_postgres/Cargo.toml b/examples/multisite_postgres/Cargo.toml new file mode 100644 index 0000000..249b9f8 --- /dev/null +++ b/examples/multisite_postgres/Cargo.toml @@ -0,0 +1,18 @@ +[package] +name = "example-multisite-postgres" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +publish = false + +[dependencies] +harmony = { path = "../../harmony" } +harmony_cli = { path = "../../harmony_cli" } +harmony_types = { path = "../../harmony_types" } +cidr = { workspace = true } +tokio = { workspace = true } +harmony_macros = { path = "../../harmony_macros" } +log = { workspace = true } +env_logger = { workspace = true } +url = { workspace = true } diff --git a/examples/multisite_postgres/src/main.rs b/examples/multisite_postgres/src/main.rs new file mode 100644 index 0000000..6147f76 --- /dev/null +++ b/examples/multisite_postgres/src/main.rs @@ -0,0 +1,31 @@ +use harmony::{ + inventory::Inventory, + modules::postgresql::{ + capability::PostgreSQLConfig, K8sPostgreSQLScore, PublicPostgreSQLScore + }, + topology::{FailoverTopology, K8sAnywhereTopology}, +}; + +#[tokio::main] +async fn main() { + let postgres = PublicPostgreSQLScore { + postgres_score: K8sPostgreSQLScore { + config: PostgreSQLConfig { + cluster_name: "harmony-postgres-example".to_string(), // Override default name + namespace: "harmony-public-postgres".to_string(), + ..Default::default() // Use harmony defaults, they are based on CNPG's default values : + // "default" namespace, 1 instance, 1Gi storage + }, + }, + hostname: "postgrestest.sto1.nationtech.io".to_string(), + }; + + harmony_cli::run( + Inventory::autoload(), + FailoverTopology::::from_env(), + vec![Box::new(postgres)], + None, + ) + .await + .unwrap(); +} diff --git a/harmony/src/domain/topology/failover.rs b/harmony/src/domain/topology/failover.rs index 6df9cea..f46331c 100644 --- a/harmony/src/domain/topology/failover.rs +++ b/harmony/src/domain/topology/failover.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; -use crate::topology::{PreparationError, PreparationOutcome, Topology}; +use crate::topology::k8s_anywhere::K8sAnywhereConfig; +use crate::topology::{K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology}; pub struct FailoverTopology { pub primary: T, @@ -17,3 +18,29 @@ impl Topology for FailoverTopology { todo!() } } + +impl FailoverTopology { + /// Creates a new `FailoverTopology` from environment variables. + /// + /// Expects two environment variables: + /// - `HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY`: Comma-separated `key=value` pairs, e.g., + /// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx` + /// - `HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA`: Same format for the replica. + /// + /// Parses `kubeconfig` (path to kubeconfig file) and `context_name` (Kubernetes context), + /// and constructs `K8sAnywhereConfig` with local installs disabled (`use_local_k3d=false`, + /// `autoinstall=false`, `use_system_kubeconfig=false`). + /// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`. + /// + /// Panics if required env vars are missing or malformed. + pub fn from_env() -> Self { + + let primary_config = K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY"); + let replica_config = K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA"); + + let primary = K8sAnywhereTopology::with_config(primary_config); + let replica = K8sAnywhereTopology::with_config(replica_config); + + Self { primary, replica } + } +} diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 3cab0b8..7884512 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -631,6 +631,23 @@ impl K8sClient { } pub async fn from_kubeconfig(path: &str) -> Option { + Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await + } + + pub async fn from_kubeconfig_with_context( + path: &str, + context: Option, + ) -> Option { + let mut opts = KubeConfigOptions::default(); + opts.context = context; + + Self::from_kubeconfig_with_opts(path, &opts).await + } + + pub async fn from_kubeconfig_with_opts( + path: &str, + opts: &KubeConfigOptions, + ) -> Option { let k = match Kubeconfig::read_from(path) { Ok(k) => k, Err(e) => { @@ -638,13 +655,9 @@ impl K8sClient { return None; } }; + Some(K8sClient::new( - Client::try_from( - Config::from_custom_kubeconfig(k, &KubeConfigOptions::default()) - .await - .unwrap(), - ) - .unwrap(), + Client::try_from(Config::from_custom_kubeconfig(k, &opts).await.unwrap()).unwrap(), )) } } diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 694ce85..2bca7ad 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -644,7 +644,7 @@ impl K8sAnywhereTopology { } async fn try_load_kubeconfig(&self, path: &str) -> Option { - K8sClient::from_kubeconfig(path).await + K8sClient::from_kubeconfig_with_context(path, self.config.k8s_context.clone()).await } fn get_k3d_installation_score(&self) -> K3DInstallationScore { @@ -922,9 +922,59 @@ pub struct K8sAnywhereConfig { /// default: true pub use_local_k3d: bool, pub harmony_profile: String, + + /// Name of the kubeconfig context to use. + /// + /// If None, it will use the current context. + /// + /// If the context name is not found, it will fail to initialize. + pub k8s_context: Option, } impl K8sAnywhereConfig { + /// Reads an environment variable `env_var` and parses its content : + /// Comma-separated `key=value` pairs, e.g., + /// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx` + /// + /// Then creates a K8sAnywhereConfig from it local installs disabled (`use_local_k3d=false`, + /// `autoinstall=false`, `use_system_kubeconfig=false`). + /// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`. + /// + /// Panics if `env_var` is missing or malformed. + pub fn remote_k8s_from_env_var(env_var: &str) -> Self { + Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE") + } + + pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self { + let env_var_value = std::env::var(env_var) + .map_err(|_| format!("Missing required env var {env_var}")) + .unwrap(); + info!("Initializing remote k8s from env var value : {env_var_value}"); + + let mut kubeconfig: Option = None; + let mut k8s_context: Option = None; + + for part in env_var_value.split(',') { + let kv: Vec<&str> = part.splitn(2, '=').collect(); + if kv.len() == 2 { + match kv[0].trim() { + "kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()), + "context_name" => k8s_context = Some(kv[1].trim().to_string()), + _ => {} + } + } + } + + K8sAnywhereConfig { + kubeconfig, + k8s_context, + use_system_kubeconfig: false, + autoinstall: false, + use_local_k3d: false, + harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()), + } + } + fn from_env() -> Self { Self { kubeconfig: std::env::var("KUBECONFIG").ok().map(|v| v.to_string()), @@ -939,6 +989,7 @@ impl K8sAnywhereConfig { ), use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D") .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)), + k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(), } } } @@ -1045,3 +1096,124 @@ impl Ingress for K8sAnywhereTopology { } } } + +#[cfg(test)] +mod tests { + use super::*; + use std::sync::atomic::{AtomicUsize, Ordering}; + + static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0); + + /// Sets environment variables with unique names to avoid concurrency issues between tests. + /// Returns the names of the (config_var, profile_var) used. + fn setup_env_vars(config_value: Option<&str>, profile_value: Option<&str>) -> (String, String) { + let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst); + let config_var = format!("TEST_VAR_{}", id); + let profile_var = format!("TEST_PROFILE_{}", id); + + unsafe { + if let Some(v) = config_value { + std::env::set_var(&config_var, v); + } else { + std::env::remove_var(&config_var); + } + + if let Some(v) = profile_value { + std::env::set_var(&profile_var, v); + } else { + std::env::remove_var(&profile_var); + } + } + (config_var, profile_var) + } + + #[test] + fn test_remote_k8s_from_env_var_full() { + let (config_var, profile_var) = setup_env_vars( + Some("kubeconfig=/foo.kc,context_name=bar"), + Some("testprof") + ); + + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); + assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); + assert_eq!(cfg.harmony_profile, "testprof"); + assert!(!cfg.use_local_k3d); + assert!(!cfg.autoinstall); + assert!(!cfg.use_system_kubeconfig); + } + + #[test] + fn test_remote_k8s_from_env_var_only_kubeconfig() { + let (config_var, profile_var) = setup_env_vars( + Some("kubeconfig=/foo.kc"), + None + ); + + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); + assert_eq!(cfg.k8s_context, None); + assert_eq!(cfg.harmony_profile, "dev"); + } + + #[test] + fn test_remote_k8s_from_env_var_only_context() { + let (config_var, profile_var) = setup_env_vars( + Some("context_name=bar"), + None + ); + + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig, None); + assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); + } + + #[test] + fn test_remote_k8s_from_env_var_unknown_key_trim() { + let (config_var, profile_var) = setup_env_vars( + Some(" unknown=bla , kubeconfig= /foo.kc ,context_name= bar "), + None + ); + + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); + assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); + } + + #[test] + fn test_remote_k8s_from_env_var_empty_malformed() { + let (config_var, profile_var) = setup_env_vars( + Some("malformed,no=,equal"), + None + ); + + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + // Unknown/malformed ignored, defaults to None + assert_eq!(cfg.kubeconfig, None); + assert_eq!(cfg.k8s_context, None); + } + + #[test] + #[should_panic(expected = "Missing required env var")] + fn test_remote_k8s_from_env_var_missing() { + let (config_var, profile_var) = setup_env_vars(None, None); + K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + } + + #[test] + fn test_remote_k8s_from_env_var_harmony_profile_default() { + let (config_var, profile_var) = setup_env_vars( + Some("kubeconfig=/foo"), + None + ); + + let cfg = K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); + + assert_eq!(cfg.harmony_profile, "dev"); + } +} diff --git a/harmony/src/domain/topology/k8s_anywhere/postgres.rs b/harmony/src/domain/topology/k8s_anywhere/postgres.rs index 275c283..9af9810 100644 --- a/harmony/src/domain/topology/k8s_anywhere/postgres.rs +++ b/harmony/src/domain/topology/k8s_anywhere/postgres.rs @@ -8,9 +8,12 @@ use crate::{ capability::{PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts}, }, score::Score, - topology::K8sAnywhereTopology, + topology::{K8sAnywhereTopology, K8sclient}, }; +use k8s_openapi::api::core::v1::{Secret, Service}; +use log::info; + #[async_trait] impl PostgreSQL for K8sAnywhereTopology { async fn deploy(&self, config: &PostgreSQLConfig) -> Result { @@ -27,12 +30,78 @@ impl PostgreSQL for K8sAnywhereTopology { /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. /// Abstracts away storage/retrieval details (e.g., secrets, files). async fn get_replication_certs(&self, cluster_name: &str) -> Result { - todo!() + let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?; + + let replication_secret_name = format!("{cluster_name}-replication"); + let replication_secret = k8s_client + .get_resource::(&replication_secret_name, None) + .await + .map_err(|e| format!("Failed to get {replication_secret_name}: {e}"))? + .ok_or_else(|| format!("Replication secret '{replication_secret_name}' not found"))?; + + let ca_secret_name = format!("{cluster_name}-ca"); + let ca_secret = k8s_client + .get_resource::(&ca_secret_name, None) + .await + .map_err(|e| format!("Failed to get {ca_secret_name}: {e}"))? + .ok_or_else(|| format!("CA secret '{ca_secret_name}' not found"))?; + + let replication_data = replication_secret + .data + .as_ref() + .ok_or("Replication secret has no data".to_string())?; + let ca_data = ca_secret + .data + .as_ref() + .ok_or("CA secret has no data".to_string())?; + + let tls_key_bs = replication_data + .get("tls.key") + .ok_or("missing tls.key in replication secret".to_string())?; + let tls_crt_bs = replication_data + .get("tls.crt") + .ok_or("missing tls.crt in replication secret".to_string())?; + let ca_crt_bs = ca_data + .get("ca.crt") + .ok_or("missing ca.crt in CA secret".to_string())?; + + let streaming_replica_key_pem = String::from_utf8_lossy(&tls_key_bs.0).to_string(); + let streaming_replica_cert_pem = String::from_utf8_lossy(&tls_crt_bs.0).to_string(); + let ca_cert_pem = String::from_utf8_lossy(&ca_crt_bs.0).to_string(); + + info!("Successfully extracted replication certs for cluster '{cluster_name}'"); + + Ok(ReplicationCerts { + ca_cert_pem, + streaming_replica_cert_pem, + streaming_replica_key_pem, + }) } /// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster. async fn get_endpoint(&self, cluster_name: &str) -> Result { - todo!() + let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?; + + let service_name = format!("{cluster_name}-rw"); + let service = k8s_client + .get_resource::(&service_name, None) + .await + .map_err(|e| format!("Failed to get service '{service_name}': {e}"))? + .ok_or_else(|| { + format!("Service '{service_name}' not found for cluster '{cluster_name}") + })?; + + let ns = service + .metadata + .namespace + .as_deref() + .unwrap_or("default") + .to_string(); + let host = format!("{service_name}.{ns}.svc.cluster.local"); + + info!("Internal endpoint for '{cluster_name}': {host}:5432"); + + Ok(PostgreSQLEndpoint { host, port: 5432 }) } /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough). @@ -43,6 +112,9 @@ impl PostgreSQL for K8sAnywhereTopology { &self, cluster_name: &str, ) -> Result, String> { - todo!() + // TODO: Implement OpenShift Route lookup targeting '{cluster_name}-rw' service on port 5432 with TLS passthrough + // For now, return None assuming internal-only access or manual route configuration + info!("Public endpoint lookup not implemented for '{cluster_name}', returning None"); + Ok(None) } } diff --git a/harmony/src/modules/k8s/failover.rs b/harmony/src/modules/k8s/failover.rs new file mode 100644 index 0000000..731403d --- /dev/null +++ b/harmony/src/modules/k8s/failover.rs @@ -0,0 +1,18 @@ +use std::sync::Arc; + +use async_trait::async_trait; +use log::warn; + +use crate::topology::{k8s::K8sClient, FailoverTopology, K8sclient}; + + +#[async_trait] +impl K8sclient for FailoverTopology { + // TODO figure out how to structure this properly. This gives access only to the primary k8s + // client, which will work in many cases but is clearly not good enough for all uses cases + // where k8s_client can be used. Logging a warning for now. + async fn k8s_client(&self) -> Result, String> { + warn!("Failover topology k8s_client capability currently defers to the primary only. Make sure to check this is OK for you"); + self.primary.k8s_client().await + } +} diff --git a/harmony/src/modules/k8s/mod.rs b/harmony/src/modules/k8s/mod.rs index 56c9201..9a2af66 100644 --- a/harmony/src/modules/k8s/mod.rs +++ b/harmony/src/modules/k8s/mod.rs @@ -3,3 +3,4 @@ pub mod deployment; pub mod ingress; pub mod namespace; pub mod resource; +mod failover; diff --git a/harmony/src/modules/network/failover.rs b/harmony/src/modules/network/failover.rs new file mode 100644 index 0000000..897abbd --- /dev/null +++ b/harmony/src/modules/network/failover.rs @@ -0,0 +1,14 @@ +use async_trait::async_trait; +use log::warn; + +use crate::topology::{FailoverTopology, TlsRoute, TlsRouter}; + +#[async_trait] +impl TlsRouter for FailoverTopology { + async fn install_route(&self, config: TlsRoute) -> Result<(), String> { + warn!( + "Failover topology TlsRouter capability currently defers to the primary only. Make sure to check this is OK for you. The Replica Topology WILL NOT be affected here" + ); + self.primary.install_route(config).await + } +} diff --git a/harmony/src/modules/network/mod.rs b/harmony/src/modules/network/mod.rs index c70dd89..c269ef2 100644 --- a/harmony/src/modules/network/mod.rs +++ b/harmony/src/modules/network/mod.rs @@ -1,2 +1,3 @@ mod tls_router; +mod failover; pub use tls_router::*; diff --git a/harmony/src/modules/postgresql/failover.rs b/harmony/src/modules/postgresql/failover.rs index 8e80166..1df5008 100644 --- a/harmony/src/modules/postgresql/failover.rs +++ b/harmony/src/modules/postgresql/failover.rs @@ -3,7 +3,6 @@ use log::debug; use log::info; use std::collections::HashMap; -use crate::interpret::Outcome; use crate::{ modules::postgresql::capability::{ BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,