feat(postgres): Failover postgres example maybe working!? Added FailoverTopology implementations for required capabilities, documented a bit, some more tests, and quite a few utility functions
Some checks failed
Run Check Script / check (pull_request) Failing after 1m49s

This commit is contained in:
2025-12-17 14:32:23 -05:00
parent 440e684b35
commit 66a9a76a6b
18 changed files with 410 additions and 29 deletions

16
Cargo.lock generated
View File

@@ -1776,6 +1776,21 @@ dependencies = [
"url",
]
[[package]]
name = "example-multisite-postgres"
version = "0.1.0"
dependencies = [
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "example-nanodc"
version = "0.1.0"
@@ -2561,6 +2576,7 @@ version = "0.1.0"
dependencies = [
"rand 0.9.2",
"serde",
"serde_json",
"url",
]

View File

@@ -0,0 +1,18 @@
[package]
name = "example-multisite-postgres"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,31 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{
K8sPostgreSQLScore, PublicPostgreSQLScore, capability::PostgreSQLConfig,
},
topology::{FailoverTopology, K8sAnywhereTopology},
};
#[tokio::main]
async fn main() {
let postgres = PublicPostgreSQLScore {
postgres_score: K8sPostgreSQLScore {
config: PostgreSQLConfig {
cluster_name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage
},
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
harmony_cli::run(
Inventory::autoload(),
FailoverTopology::<K8sAnywhereTopology>::from_env(),
vec![Box::new(postgres)],
None,
)
.await
.unwrap();
}

View File

@@ -1,6 +1,7 @@
use async_trait::async_trait;
use crate::topology::{PreparationError, PreparationOutcome, Topology};
use crate::topology::k8s_anywhere::K8sAnywhereConfig;
use crate::topology::{K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology};
pub struct FailoverTopology<T> {
pub primary: T,
@@ -17,3 +18,30 @@ impl<T: Send + Sync> Topology for FailoverTopology<T> {
todo!()
}
}
impl FailoverTopology<K8sAnywhereTopology> {
/// Creates a new `FailoverTopology` from environment variables.
///
/// Expects two environment variables:
/// - `HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY`: Comma-separated `key=value` pairs, e.g.,
/// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx`
/// - `HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA`: Same format for the replica.
///
/// Parses `kubeconfig` (path to kubeconfig file) and `context_name` (Kubernetes context),
/// and constructs `K8sAnywhereConfig` with local installs disabled (`use_local_k3d=false`,
/// `autoinstall=false`, `use_system_kubeconfig=false`).
/// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`.
///
/// Panics if required env vars are missing or malformed.
pub fn from_env() -> Self {
let primary_config =
K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY");
let replica_config =
K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA");
let primary = K8sAnywhereTopology::with_config(primary_config);
let replica = K8sAnywhereTopology::with_config(replica_config);
Self { primary, replica }
}
}

View File

@@ -631,6 +631,23 @@ impl K8sClient {
}
pub async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await
}
pub async fn from_kubeconfig_with_context(
path: &str,
context: Option<String>,
) -> Option<K8sClient> {
let mut opts = KubeConfigOptions::default();
opts.context = context;
Self::from_kubeconfig_with_opts(path, &opts).await
}
pub async fn from_kubeconfig_with_opts(
path: &str,
opts: &KubeConfigOptions,
) -> Option<K8sClient> {
let k = match Kubeconfig::read_from(path) {
Ok(k) => k,
Err(e) => {
@@ -638,13 +655,9 @@ impl K8sClient {
return None;
}
};
Some(K8sClient::new(
Client::try_from(
Config::from_custom_kubeconfig(k, &KubeConfigOptions::default())
.await
.unwrap(),
)
.unwrap(),
Client::try_from(Config::from_custom_kubeconfig(k, &opts).await.unwrap()).unwrap(),
))
}
}

View File

@@ -644,7 +644,7 @@ impl K8sAnywhereTopology {
}
async fn try_load_kubeconfig(&self, path: &str) -> Option<K8sClient> {
K8sClient::from_kubeconfig(path).await
K8sClient::from_kubeconfig_with_context(path, self.config.k8s_context.clone()).await
}
fn get_k3d_installation_score(&self) -> K3DInstallationScore {
@@ -922,9 +922,59 @@ pub struct K8sAnywhereConfig {
/// default: true
pub use_local_k3d: bool,
pub harmony_profile: String,
/// Name of the kubeconfig context to use.
///
/// If None, it will use the current context.
///
/// If the context name is not found, it will fail to initialize.
pub k8s_context: Option<String>,
}
impl K8sAnywhereConfig {
/// Reads an environment variable `env_var` and parses its content :
/// Comma-separated `key=value` pairs, e.g.,
/// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx`
///
/// Then creates a K8sAnywhereConfig from it local installs disabled (`use_local_k3d=false`,
/// `autoinstall=false`, `use_system_kubeconfig=false`).
/// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`.
///
/// Panics if `env_var` is missing or malformed.
pub fn remote_k8s_from_env_var(env_var: &str) -> Self {
Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE")
}
pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self {
let env_var_value = std::env::var(env_var)
.map_err(|_| format!("Missing required env var {env_var}"))
.unwrap();
info!("Initializing remote k8s from env var value : {env_var_value}");
let mut kubeconfig: Option<String> = None;
let mut k8s_context: Option<String> = None;
for part in env_var_value.split(',') {
let kv: Vec<&str> = part.splitn(2, '=').collect();
if kv.len() == 2 {
match kv[0].trim() {
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
"context_name" => k8s_context = Some(kv[1].trim().to_string()),
_ => {}
}
}
}
K8sAnywhereConfig {
kubeconfig,
k8s_context,
use_system_kubeconfig: false,
autoinstall: false,
use_local_k3d: false,
harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()),
}
}
fn from_env() -> Self {
Self {
kubeconfig: std::env::var("KUBECONFIG").ok().map(|v| v.to_string()),
@@ -939,6 +989,7 @@ impl K8sAnywhereConfig {
),
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
}
}
}
@@ -1045,3 +1096,118 @@ impl Ingress for K8sAnywhereTopology {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Sets environment variables with unique names to avoid concurrency issues between tests.
/// Returns the names of the (config_var, profile_var) used.
fn setup_env_vars(config_value: Option<&str>, profile_value: Option<&str>) -> (String, String) {
let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let config_var = format!("TEST_VAR_{}", id);
let profile_var = format!("TEST_PROFILE_{}", id);
unsafe {
if let Some(v) = config_value {
std::env::set_var(&config_var, v);
} else {
std::env::remove_var(&config_var);
}
if let Some(v) = profile_value {
std::env::set_var(&profile_var, v);
} else {
std::env::remove_var(&profile_var);
}
}
(config_var, profile_var)
}
#[test]
fn test_remote_k8s_from_env_var_full() {
let (config_var, profile_var) = setup_env_vars(
Some("kubeconfig=/foo.kc,context_name=bar"),
Some("testprof"),
);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
assert_eq!(cfg.harmony_profile, "testprof");
assert!(!cfg.use_local_k3d);
assert!(!cfg.autoinstall);
assert!(!cfg.use_system_kubeconfig);
}
#[test]
fn test_remote_k8s_from_env_var_only_kubeconfig() {
let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo.kc"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context, None);
assert_eq!(cfg.harmony_profile, "dev");
}
#[test]
fn test_remote_k8s_from_env_var_only_context() {
let (config_var, profile_var) = setup_env_vars(Some("context_name=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
}
#[test]
fn test_remote_k8s_from_env_var_unknown_key_trim() {
let (config_var, profile_var) = setup_env_vars(
Some(" unknown=bla , kubeconfig= /foo.kc ,context_name= bar "),
None,
);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
}
#[test]
fn test_remote_k8s_from_env_var_empty_malformed() {
let (config_var, profile_var) = setup_env_vars(Some("malformed,no=,equal"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
// Unknown/malformed ignored, defaults to None
assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context, None);
}
#[test]
#[should_panic(expected = "Missing required env var")]
fn test_remote_k8s_from_env_var_missing() {
let (config_var, profile_var) = setup_env_vars(None, None);
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
}
#[test]
fn test_remote_k8s_from_env_var_harmony_profile_default() {
let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.harmony_profile, "dev");
}
}

View File

@@ -8,9 +8,12 @@ use crate::{
capability::{PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts},
},
score::Score,
topology::K8sAnywhereTopology,
topology::{K8sAnywhereTopology, K8sclient},
};
use k8s_openapi::api::core::v1::{Secret, Service};
use log::info;
#[async_trait]
impl PostgreSQL for K8sAnywhereTopology {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
@@ -27,12 +30,78 @@ impl PostgreSQL for K8sAnywhereTopology {
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
todo!()
let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?;
let replication_secret_name = format!("{cluster_name}-replication");
let replication_secret = k8s_client
.get_resource::<Secret>(&replication_secret_name, None)
.await
.map_err(|e| format!("Failed to get {replication_secret_name}: {e}"))?
.ok_or_else(|| format!("Replication secret '{replication_secret_name}' not found"))?;
let ca_secret_name = format!("{cluster_name}-ca");
let ca_secret = k8s_client
.get_resource::<Secret>(&ca_secret_name, None)
.await
.map_err(|e| format!("Failed to get {ca_secret_name}: {e}"))?
.ok_or_else(|| format!("CA secret '{ca_secret_name}' not found"))?;
let replication_data = replication_secret
.data
.as_ref()
.ok_or("Replication secret has no data".to_string())?;
let ca_data = ca_secret
.data
.as_ref()
.ok_or("CA secret has no data".to_string())?;
let tls_key_bs = replication_data
.get("tls.key")
.ok_or("missing tls.key in replication secret".to_string())?;
let tls_crt_bs = replication_data
.get("tls.crt")
.ok_or("missing tls.crt in replication secret".to_string())?;
let ca_crt_bs = ca_data
.get("ca.crt")
.ok_or("missing ca.crt in CA secret".to_string())?;
let streaming_replica_key_pem = String::from_utf8_lossy(&tls_key_bs.0).to_string();
let streaming_replica_cert_pem = String::from_utf8_lossy(&tls_crt_bs.0).to_string();
let ca_cert_pem = String::from_utf8_lossy(&ca_crt_bs.0).to_string();
info!("Successfully extracted replication certs for cluster '{cluster_name}'");
Ok(ReplicationCerts {
ca_cert_pem,
streaming_replica_cert_pem,
streaming_replica_key_pem,
})
}
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
todo!()
let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?;
let service_name = format!("{cluster_name}-rw");
let service = k8s_client
.get_resource::<Service>(&service_name, None)
.await
.map_err(|e| format!("Failed to get service '{service_name}': {e}"))?
.ok_or_else(|| {
format!("Service '{service_name}' not found for cluster '{cluster_name}")
})?;
let ns = service
.metadata
.namespace
.as_deref()
.unwrap_or("default")
.to_string();
let host = format!("{service_name}.{ns}.svc.cluster.local");
info!("Internal endpoint for '{cluster_name}': {host}:5432");
Ok(PostgreSQLEndpoint { host, port: 5432 })
}
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
@@ -43,6 +112,9 @@ impl PostgreSQL for K8sAnywhereTopology {
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
todo!()
// TODO: Implement OpenShift Route lookup targeting '{cluster_name}-rw' service on port 5432 with TLS passthrough
// For now, return None assuming internal-only access or manual route configuration
info!("Public endpoint lookup not implemented for '{cluster_name}', returning None");
Ok(None)
}
}

View File

@@ -80,7 +80,7 @@ pub struct TlsRoute {
pub namespace: String,
}
impl TlsRoute {
impl TlsRoute {
pub fn to_string_short(&self) -> String {
format!("{}-{}:{}", self.hostname, self.backend, self.target_port)
}
@@ -88,7 +88,7 @@ pub struct TlsRoute {
pub fn backend_info_string(&self) -> String {
format!("{}:{}", self.backend, self.target_port)
}
}
}
/// Installs and queries TLS passthrough routes (L4 TCP/SNI forwarding, no TLS termination).
/// Agnostic to impl: OKD Route, AWS NLB+HAProxy, k3s Envoy Gateway, Apache ProxyPass.

View File

@@ -0,0 +1,19 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::warn;
use crate::topology::{FailoverTopology, K8sclient, k8s::K8sClient};
#[async_trait]
impl<T: K8sclient> K8sclient for FailoverTopology<T> {
// TODO figure out how to structure this properly. This gives access only to the primary k8s
// client, which will work in many cases but is clearly not good enough for all uses cases
// where k8s_client can be used. Logging a warning for now.
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
warn!(
"Failover topology k8s_client capability currently defers to the primary only. Make sure to check this is OK for you"
);
self.primary.k8s_client().await
}
}

View File

@@ -1,5 +1,6 @@
pub mod apps;
pub mod deployment;
mod failover;
pub mod ingress;
pub mod namespace;
pub mod resource;

View File

@@ -0,0 +1,14 @@
use async_trait::async_trait;
use log::warn;
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
#[async_trait]
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
async fn install_route(&self, config: TlsRoute) -> Result<(), String> {
warn!(
"Failover topology TlsRouter capability currently defers to the primary only. Make sure to check this is OK for you. The Replica Topology WILL NOT be affected here"
);
self.primary.install_route(config).await
}
}

View File

@@ -1,2 +1,3 @@
mod failover;
mod tls_router;
pub use tls_router::*;

View File

@@ -88,7 +88,11 @@ impl<T: Topology + K8sclient> Score<T> for OKDTlsPassthroughScore {
}),
..Default::default()
};
let route_score = OKDRouteScore::new(&self.name.to_string(), &self.route.namespace, passthrough_spec);
let route_score = OKDRouteScore::new(
&self.name.to_string(),
&self.route.namespace,
passthrough_spec,
);
route_score.create_interpret()
}

View File

@@ -3,7 +3,6 @@ use log::debug;
use log::info;
use std::collections::HashMap;
use crate::interpret::Outcome;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,

View File

@@ -1,6 +1,6 @@
pub mod capability;
mod score_k8s;
mod score_connect;
mod score_k8s;
pub use score_connect::*;
pub use score_k8s::*;
mod score_public;

View File

@@ -1,5 +1,5 @@
pub mod id;
pub mod net;
pub mod rfc1123;
pub mod storage;
pub mod switch;
pub mod rfc1123;

View File

@@ -43,7 +43,9 @@ impl TryFrom<&str> for Rfc1123Name {
}
// Trim leading/trailing non-alphanumeric
content = content.trim_matches(|c: char| !c.is_ascii_alphanumeric()).to_string();
content = content
.trim_matches(|c: char| !c.is_ascii_alphanumeric())
.to_string();
if content.is_empty() {
return Err(format!("Input '{}' resulted in empty string", s));
@@ -55,7 +57,6 @@ impl TryFrom<&str> for Rfc1123Name {
type Error = String;
}
/// Converts an `Rfc1123Name` into a `String`.
///
/// This allows using `Rfc1123Name` in contexts where a `String` is expected.
@@ -99,7 +100,6 @@ impl std::fmt::Display for Rfc1123Name {
}
}
#[cfg(test)]
mod tests {
use super::Rfc1123Name;
@@ -229,4 +229,3 @@ mod tests {
assert_eq!(name.content, "a.b.c");
}
}

View File

@@ -71,7 +71,7 @@ impl StorageSize {
Self {
size_bytes: size * 1024 * 1024 * 1024 * 1024,
display_value: Some(size),
display_suffix: Some("TiB".to_string()),
display_suffix: Some("Ti".to_string()),
}
}