feat(failoverPostgres): Its alive! We can now deploy a multisite postgres instance. The public hostname is still hardcoded, we will have to fix that but the rest is good enough
Some checks failed
Run Check Script / check (pull_request) Failing after 36s

This commit is contained in:
2025-12-17 16:43:37 -05:00
parent 66a9a76a6b
commit 204795a74f
11 changed files with 258 additions and 114 deletions

View File

@@ -0,0 +1,3 @@
export HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY="context=default/api-your-openshift-cluster:6443/kube:admin"
export HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA="context=someuser/somecluster"
export RUST_LOG="harmony=debug"

View File

@@ -1,22 +1,19 @@
use harmony::{ use harmony::{
inventory::Inventory, inventory::Inventory,
modules::postgresql::{ modules::postgresql::{PublicPostgreSQLScore, capability::PostgreSQLConfig},
K8sPostgreSQLScore, PublicPostgreSQLScore, capability::PostgreSQLConfig,
},
topology::{FailoverTopology, K8sAnywhereTopology}, topology::{FailoverTopology, K8sAnywhereTopology},
}; };
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
// env_logger::init();
let postgres = PublicPostgreSQLScore { let postgres = PublicPostgreSQLScore {
postgres_score: K8sPostgreSQLScore {
config: PostgreSQLConfig { config: PostgreSQLConfig {
cluster_name: "harmony-postgres-example".to_string(), // Override default name cluster_name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(), namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values : ..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage // "default" namespace, 1 instance, 1Gi storage
}, },
},
hostname: "postgrestest.sto1.nationtech.io".to_string(), hostname: "postgrestest.sto1.nationtech.io".to_string(),
}; };

View File

@@ -10,14 +10,12 @@ use harmony::{
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let postgres = PublicPostgreSQLScore { let postgres = PublicPostgreSQLScore {
postgres_score: K8sPostgreSQLScore {
config: PostgreSQLConfig { config: PostgreSQLConfig {
cluster_name: "harmony-postgres-example".to_string(), // Override default name cluster_name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(), namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values : ..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// 1 instance, 1Gi storage // 1 instance, 1Gi storage
}, },
},
hostname: "postgrestest.sto1.nationtech.io".to_string(), hostname: "postgrestest.sto1.nationtech.io".to_string(),
}; };

View File

@@ -9,13 +9,30 @@ pub struct FailoverTopology<T> {
} }
#[async_trait] #[async_trait]
impl<T: Send + Sync> Topology for FailoverTopology<T> { impl<T: Topology + Send + Sync> Topology for FailoverTopology<T> {
fn name(&self) -> &str { fn name(&self) -> &str {
"FailoverTopology" "FailoverTopology"
} }
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
todo!() let primary_outcome = self.primary.ensure_ready().await?;
let replica_outcome = self.replica.ensure_ready().await?;
match (primary_outcome, replica_outcome) {
(PreparationOutcome::Noop, PreparationOutcome::Noop) => Ok(PreparationOutcome::Noop),
(p, r) => {
let mut details = Vec::new();
if let PreparationOutcome::Success { details: d } = p {
details.push(format!("Primary: {}", d));
}
if let PreparationOutcome::Success { details: d } = r {
details.push(format!("Replica: {}", d));
}
Ok(PreparationOutcome::Success {
details: details.join(", "),
})
}
}
} }
} }

View File

@@ -106,6 +106,16 @@ impl K8sclient for K8sAnywhereTopology {
#[async_trait] #[async_trait]
impl TlsRouter for K8sAnywhereTopology { impl TlsRouter for K8sAnywhereTopology {
async fn get_wildcard_domain(&self) -> Result<Option<String>, String> {
todo!()
}
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16 {
// TODO un-hardcode this :)
443
}
async fn install_route(&self, route: TlsRoute) -> Result<(), String> { async fn install_route(&self, route: TlsRoute) -> Result<(), String> {
let distro = self let distro = self
.get_k8s_distribution() .get_k8s_distribution()
@@ -682,7 +692,14 @@ impl K8sAnywhereTopology {
return Ok(Some(K8sState { return Ok(Some(K8sState {
client: Arc::new(client), client: Arc::new(client),
source: K8sSource::Kubeconfig, source: K8sSource::Kubeconfig,
message: format!("Loaded k8s client from kubeconfig {kubeconfig}"), message: format!(
"Loaded k8s client from kubeconfig {kubeconfig} using context {}",
self.config
.k8s_context
.as_ref()
.map(|s| s.clone())
.unwrap_or_default()
),
})); }));
} }
None => { None => {
@@ -934,20 +951,23 @@ pub struct K8sAnywhereConfig {
impl K8sAnywhereConfig { impl K8sAnywhereConfig {
/// Reads an environment variable `env_var` and parses its content : /// Reads an environment variable `env_var` and parses its content :
/// Comma-separated `key=value` pairs, e.g., /// Comma-separated `key=value` pairs, e.g.,
/// `kubeconfig=/path/to/primary.kubeconfig,context_name=primary-ctx` /// `kubeconfig=/path/to/primary.kubeconfig,context=primary-ctx`
/// ///
/// Then creates a K8sAnywhereConfig from it local installs disabled (`use_local_k3d=false`, /// Then creates a K8sAnywhereConfig from it local installs disabled (`use_local_k3d=false`,
/// `autoinstall=false`, `use_system_kubeconfig=false`). /// `autoinstall=false`, `use_system_kubeconfig=false`).
/// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`. /// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`.
/// ///
/// If no kubeconfig path is provided it will fall back to system kubeconfig
///
/// Panics if `env_var` is missing or malformed. /// Panics if `env_var` is missing or malformed.
pub fn remote_k8s_from_env_var(env_var: &str) -> Self { pub fn remote_k8s_from_env_var(env_var: &str) -> Self {
Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE") Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE")
} }
pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self { pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self {
debug!("Looking for env var named : {env_var}");
let env_var_value = std::env::var(env_var) let env_var_value = std::env::var(env_var)
.map_err(|_| format!("Missing required env var {env_var}")) .map_err(|e| format!("Missing required env var {env_var} : {e}"))
.unwrap(); .unwrap();
info!("Initializing remote k8s from env var value : {env_var_value}"); info!("Initializing remote k8s from env var value : {env_var_value}");
@@ -959,16 +979,25 @@ impl K8sAnywhereConfig {
if kv.len() == 2 { if kv.len() == 2 {
match kv[0].trim() { match kv[0].trim() {
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()), "kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
"context_name" => k8s_context = Some(kv[1].trim().to_string()), "context" => k8s_context = Some(kv[1].trim().to_string()),
_ => {} _ => {}
} }
} }
} }
debug!("Found in {env_var} : kubeconfig {kubeconfig:?} and context {k8s_context:?}");
let use_system_kubeconfig = kubeconfig.is_none();
if let Some(kubeconfig_value) = std::env::var("KUBECONFIG").ok().map(|v| v.to_string()) {
kubeconfig.get_or_insert(kubeconfig_value);
}
info!("Loading k8s environment with kubeconfig {kubeconfig:?} and context {k8s_context:?}");
K8sAnywhereConfig { K8sAnywhereConfig {
kubeconfig, kubeconfig,
k8s_context, k8s_context,
use_system_kubeconfig: false, use_system_kubeconfig,
autoinstall: false, autoinstall: false,
use_local_k3d: false, use_local_k3d: false,
harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()), harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()),
@@ -1124,15 +1153,23 @@ mod tests {
std::env::remove_var(&profile_var); std::env::remove_var(&profile_var);
} }
} }
(config_var, profile_var) (config_var, profile_var)
} }
/// Runs a test in a separate thread to avoid polluting the process environment.
fn run_in_isolated_env<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let handle = std::thread::spawn(f);
handle.join().expect("Test thread panicked");
}
#[test] #[test]
fn test_remote_k8s_from_env_var_full() { fn test_remote_k8s_from_env_var_full() {
let (config_var, profile_var) = setup_env_vars( let (config_var, profile_var) =
Some("kubeconfig=/foo.kc,context_name=bar"), setup_env_vars(Some("kubeconfig=/foo.kc,context=bar"), Some("testprof"));
Some("testprof"),
);
let cfg = let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
@@ -1159,19 +1196,28 @@ mod tests {
#[test] #[test]
fn test_remote_k8s_from_env_var_only_context() { fn test_remote_k8s_from_env_var_only_context() {
let (config_var, profile_var) = setup_env_vars(Some("context_name=bar"), None); run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(Some("context=bar"), None);
let cfg = let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig, None); assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
} }
#[test] #[test]
fn test_remote_k8s_from_env_var_unknown_key_trim() { fn test_remote_k8s_from_env_var_unknown_key_trim() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars( let (config_var, profile_var) = setup_env_vars(
Some(" unknown=bla , kubeconfig= /foo.kc ,context_name= bar "), Some(" unknown=bla , kubeconfig= /foo.kc ,context= bar "),
None, None,
); );
@@ -1180,10 +1226,15 @@ mod tests {
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc")); assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar")); assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
} }
#[test] #[test]
fn test_remote_k8s_from_env_var_empty_malformed() { fn test_remote_k8s_from_env_var_empty_malformed() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(Some("malformed,no=,equal"), None); let (config_var, profile_var) = setup_env_vars(Some("malformed,no=,equal"), None);
let cfg = let cfg =
@@ -1192,6 +1243,41 @@ mod tests {
// Unknown/malformed ignored, defaults to None // Unknown/malformed ignored, defaults to None
assert_eq!(cfg.kubeconfig, None); assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context, None); assert_eq!(cfg.k8s_context, None);
});
}
#[test]
fn test_remote_k8s_from_env_var_kubeconfig_fallback() {
run_in_isolated_env(|| {
unsafe {
std::env::set_var("KUBECONFIG", "/fallback/path");
}
let (config_var, profile_var) = setup_env_vars(Some("context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/fallback/path"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_kubeconfig_no_fallback_if_provided() {
run_in_isolated_env(|| {
unsafe {
std::env::set_var("KUBECONFIG", "/fallback/path");
}
let (config_var, profile_var) =
setup_env_vars(Some("kubeconfig=/primary/path,context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
// Primary path should take precedence
assert_eq!(cfg.kubeconfig.as_deref(), Some("/primary/path"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
} }
#[test] #[test]
@@ -1202,12 +1288,18 @@ mod tests {
} }
#[test] #[test]
fn test_remote_k8s_from_env_var_harmony_profile_default() { fn test_remote_k8s_from_env_var_context_key() {
let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo"), None); let (config_var, profile_var) = setup_env_vars(
Some("context=default/api-sto1-harmony-mcd:6443/kube:admin"),
None,
);
let cfg = let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var); K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.harmony_profile, "dev"); assert_eq!(
cfg.k8s_context.as_deref(),
Some("default/api-sto1-harmony-mcd:6443/kube:admin")
);
} }
} }

View File

@@ -29,19 +29,21 @@ impl PostgreSQL for K8sAnywhereTopology {
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files). /// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> { async fn get_replication_certs(&self, config: &PostgreSQLConfig) -> Result<ReplicationCerts, String> {
let cluster_name = &config.cluster_name;
let namespace = &config.namespace;
let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?; let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?;
let replication_secret_name = format!("{cluster_name}-replication"); let replication_secret_name = format!("{cluster_name}-replication");
let replication_secret = k8s_client let replication_secret = k8s_client
.get_resource::<Secret>(&replication_secret_name, None) .get_resource::<Secret>(&replication_secret_name, Some(namespace))
.await .await
.map_err(|e| format!("Failed to get {replication_secret_name}: {e}"))? .map_err(|e| format!("Failed to get {replication_secret_name}: {e}"))?
.ok_or_else(|| format!("Replication secret '{replication_secret_name}' not found"))?; .ok_or_else(|| format!("Replication secret '{replication_secret_name}' not found"))?;
let ca_secret_name = format!("{cluster_name}-ca"); let ca_secret_name = format!("{cluster_name}-ca");
let ca_secret = k8s_client let ca_secret = k8s_client
.get_resource::<Secret>(&ca_secret_name, None) .get_resource::<Secret>(&ca_secret_name, Some(namespace))
.await .await
.map_err(|e| format!("Failed to get {ca_secret_name}: {e}"))? .map_err(|e| format!("Failed to get {ca_secret_name}: {e}"))?
.ok_or_else(|| format!("CA secret '{ca_secret_name}' not found"))?; .ok_or_else(|| format!("CA secret '{ca_secret_name}' not found"))?;
@@ -79,12 +81,15 @@ impl PostgreSQL for K8sAnywhereTopology {
} }
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster. /// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> { async fn get_endpoint(&self, config: &PostgreSQLConfig) -> Result<PostgreSQLEndpoint, String> {
let cluster_name = &config.cluster_name;
let namespace = &config.namespace;
let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?; let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?;
let service_name = format!("{cluster_name}-rw"); let service_name = format!("{cluster_name}-rw");
let service = k8s_client let service = k8s_client
.get_resource::<Service>(&service_name, None) .get_resource::<Service>(&service_name, Some(namespace))
.await .await
.map_err(|e| format!("Failed to get service '{service_name}': {e}"))? .map_err(|e| format!("Failed to get service '{service_name}': {e}"))?
.ok_or_else(|| { .ok_or_else(|| {
@@ -104,17 +109,17 @@ impl PostgreSQL for K8sAnywhereTopology {
Ok(PostgreSQLEndpoint { host, port: 5432 }) Ok(PostgreSQLEndpoint { host, port: 5432 })
} }
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough). // /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster). // /// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex // /// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait. // /// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint( // async fn get_public_endpoint(
&self, // &self,
cluster_name: &str, // cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> { // ) -> Result<Option<PostgreSQLEndpoint>, String> {
// TODO: Implement OpenShift Route lookup targeting '{cluster_name}-rw' service on port 5432 with TLS passthrough // // TODO: Implement OpenShift Route lookup targeting '{cluster_name}-rw' service on port 5432 with TLS passthrough
// For now, return None assuming internal-only access or manual route configuration // // For now, return None assuming internal-only access or manual route configuration
info!("Public endpoint lookup not implemented for '{cluster_name}', returning None"); // info!("Public endpoint lookup not implemented for '{cluster_name}', returning None");
Ok(None) // Ok(None)
} // }
} }

View File

@@ -110,4 +110,18 @@ pub trait TlsRouter: Send + Sync {
/// Example: OKD Route{ host, to: backend:target_port, tls: {passthrough} }; /// Example: OKD Route{ host, to: backend:target_port, tls: {passthrough} };
/// HAProxy frontend→backend \"postgres-upstream\". /// HAProxy frontend→backend \"postgres-upstream\".
async fn install_route(&self, config: TlsRoute) -> Result<(), String>; async fn install_route(&self, config: TlsRoute) -> Result<(), String>;
/// Gets the base domain that can be used to deploy applications that will be automatically
/// routed to this cluster.
///
/// For example, if we have *.apps.nationtech.io pointing to a public load balancer, then this
/// function would return
///
/// ```
/// Some(String::new("apps.nationtech.io"))
/// ```
async fn get_wildcard_domain(&self) -> Result<Option<String>, String>;
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16;
} }

View File

@@ -5,6 +5,10 @@ use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
#[async_trait] #[async_trait]
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> { impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
async fn get_wildcard_domain(&self) -> Result<Option<String>, String> {todo!()}
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16 {todo!()}
async fn install_route(&self, config: TlsRoute) -> Result<(), String> { async fn install_route(&self, config: TlsRoute) -> Result<(), String> {
warn!( warn!(
"Failover topology TlsRouter capability currently defers to the primary only. Make sure to check this is OK for you. The Replica Topology WILL NOT be affected here" "Failover topology TlsRouter capability currently defers to the primary only. Make sure to check this is OK for you. The Replica Topology WILL NOT be affected here"

View File

@@ -9,19 +9,19 @@ pub trait PostgreSQL: Send + Sync {
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files). /// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>; async fn get_replication_certs(&self, config: &PostgreSQLConfig) -> Result<ReplicationCerts, String>;
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster. /// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>; async fn get_endpoint(&self, config: &PostgreSQLConfig) -> Result<PostgreSQLEndpoint, String>;
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough). // /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster). // /// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex // /// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait. // /// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint( // async fn get_public_endpoint(
&self, // &self,
cluster_name: &str, // cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String>; // ) -> Result<Option<PostgreSQLEndpoint>, String>;
} }
#[derive(Clone, Debug, Serialize)] #[derive(Clone, Debug, Serialize)]
@@ -34,6 +34,13 @@ pub struct PostgreSQLConfig {
/// settings incompatible with the default CNPG behavior. /// settings incompatible with the default CNPG behavior.
pub namespace: String, pub namespace: String,
} }
impl PostgreSQLConfig {
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
let mut new = self.clone();
new.namespace = namespace.to_string();
new
}
}
impl Default for PostgreSQLConfig { impl Default for PostgreSQLConfig {
fn default() -> Self { fn default() -> Self {

View File

@@ -3,6 +3,7 @@ use log::debug;
use log::info; use log::info;
use std::collections::HashMap; use std::collections::HashMap;
use crate::topology::TlsRouter;
use crate::{ use crate::{
modules::postgresql::capability::{ modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL, BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
@@ -13,7 +14,7 @@ use crate::{
}; };
#[async_trait] #[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> { impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> { async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
info!( info!(
"Starting deployment of failover topology '{}'", "Starting deployment of failover topology '{}'",
@@ -39,20 +40,19 @@ impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
info!("Retrieving replication certificates for primary '{primary_cluster_name}'"); info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
let certs = self let certs = self.primary.get_replication_certs(&primary_config).await?;
.primary
.get_replication_certs(&primary_cluster_name)
.await?;
info!("Replication certificates retrieved successfully"); info!("Replication certificates retrieved successfully");
info!("Retrieving public endpoint for primary '{primary_cluster_name}"); info!("Retrieving public endpoint for primary '{primary_cluster_name}");
let endpoint = self // TODO we should be getting the public endpoint for a service by calling a method on
.primary // TlsRouter capability.
.get_public_endpoint(&primary_cluster_name) // Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
.await? let endpoint = PostgreSQLEndpoint {
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?; host: "postgrestest.sto1.nationtech.io".to_string(),
port: self.primary.get_router_port().await,
};
info!( info!(
"Public endpoint '{}:{}' retrieved for primary", "Public endpoint '{}:{}' retrieved for primary",
@@ -110,18 +110,21 @@ impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
Ok(primary_cluster_name) Ok(primary_cluster_name)
} }
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> { async fn get_replication_certs(
self.primary.get_replication_certs(cluster_name).await
}
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(cluster_name).await
}
async fn get_public_endpoint(
&self, &self,
cluster_name: &str, config: &PostgreSQLConfig,
) -> Result<Option<PostgreSQLEndpoint>, String> { ) -> Result<ReplicationCerts, String> {
self.primary.get_public_endpoint(cluster_name).await self.primary.get_replication_certs(config).await
} }
async fn get_endpoint(&self, config: &PostgreSQLConfig) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(config).await
}
// async fn get_public_endpoint(
// &self,
// cluster_name: &str,
// ) -> Result<Option<PostgreSQLEndpoint>, String> {
// self.primary.get_public_endpoint(cluster_name).await
// }
} }

View File

@@ -6,9 +6,9 @@ use crate::data::Version;
use crate::domain::topology::router::{TlsRoute, TlsRouter}; use crate::domain::topology::router::{TlsRoute, TlsRouter};
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}; use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory; use crate::inventory::Inventory;
use crate::modules::postgresql::K8sPostgreSQLScore; use crate::modules::postgresql::capability::{PostgreSQL, PostgreSQLConfig};
use crate::score::Score; use crate::score::Score;
use crate::topology::{K8sclient, Topology}; use crate::topology::Topology;
/// Deploys a public PostgreSQL cluster: CNPG + TLS passthrough route for RW endpoint. /// Deploys a public PostgreSQL cluster: CNPG + TLS passthrough route for RW endpoint.
/// For failover/multisite: exposes single-instance or small HA Postgres publicly. /// For failover/multisite: exposes single-instance or small HA Postgres publicly.
@@ -23,7 +23,7 @@ use crate::topology::{K8sclient, Topology};
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct PublicPostgreSQLScore { pub struct PublicPostgreSQLScore {
/// Inner non-public Postgres cluster config. /// Inner non-public Postgres cluster config.
pub postgres_score: K8sPostgreSQLScore, pub config: PostgreSQLConfig,
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432). /// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
pub hostname: String, pub hostname: String,
} }
@@ -31,24 +31,24 @@ pub struct PublicPostgreSQLScore {
impl PublicPostgreSQLScore { impl PublicPostgreSQLScore {
pub fn new(namespace: &str, hostname: &str) -> Self { pub fn new(namespace: &str, hostname: &str) -> Self {
Self { Self {
postgres_score: K8sPostgreSQLScore::new(namespace), config: PostgreSQLConfig::default().with_namespace(namespace),
hostname: hostname.to_string(), hostname: hostname.to_string(),
} }
} }
} }
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore { impl<T: Topology + PostgreSQL + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.postgres_score.config.cluster_name); let rw_backend = format!("{}-rw", self.config.cluster_name);
let tls_route = TlsRoute { let tls_route = TlsRoute {
namespace: self.postgres_score.config.namespace.clone(), namespace: self.config.namespace.clone(),
hostname: self.hostname.clone(), hostname: self.hostname.clone(),
backend: rw_backend, backend: rw_backend,
target_port: 5432, target_port: 5432,
}; };
Box::new(PublicPostgreSQLInterpret { Box::new(PublicPostgreSQLInterpret {
postgres_score: self.postgres_score.clone(), config: self.config.clone(),
tls_route, tls_route,
}) })
} }
@@ -56,7 +56,7 @@ impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostg
fn name(&self) -> String { fn name(&self) -> String {
format!( format!(
"PublicPostgreSQLScore({}:{})", "PublicPostgreSQLScore({}:{})",
self.postgres_score.config.namespace, self.hostname self.config.namespace, self.hostname
) )
} }
} }
@@ -64,12 +64,14 @@ impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostg
/// Custom interpret: deploy Postgres then install public TLS route. /// Custom interpret: deploy Postgres then install public TLS route.
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret { struct PublicPostgreSQLInterpret {
postgres_score: K8sPostgreSQLScore, config: PostgreSQLConfig,
tls_route: TlsRoute, tls_route: TlsRoute,
} }
#[async_trait] #[async_trait]
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for PublicPostgreSQLInterpret { impl<T: Topology + PostgreSQL + TlsRouter + Send + Sync> Interpret<T>
for PublicPostgreSQLInterpret
{
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::Custom("PublicPostgreSQLInterpret") InterpretName::Custom("PublicPostgreSQLInterpret")
} }
@@ -82,9 +84,11 @@ impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for PublicP
fn get_children(&self) -> Vec<Id> { fn get_children(&self) -> Vec<Id> {
todo!() todo!()
} }
async fn execute(&self, inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> { async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Deploy CNPG cluster first (creates -rw service) // Deploy CNPG cluster first (creates -rw service)
self.postgres_score.interpret(inventory, topo).await?; topo.deploy(&self.config)
.await
.map_err(|e| InterpretError::new(e))?;
// Expose RW publicly via TLS passthrough // Expose RW publicly via TLS passthrough
topo.install_route(self.tls_route.clone()) topo.install_route(self.tls_route.clone())
@@ -93,7 +97,7 @@ impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for PublicP
Ok(Outcome::success(format!( Ok(Outcome::success(format!(
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'", "Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
self.postgres_score.config.cluster_name.clone(), self.config.cluster_name.clone(),
self.tls_route.hostname self.tls_route.hostname
))) )))
} }