From 357ca93d9049483f8c6f31de9611a5f44fc8e350 Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Mon, 8 Dec 2025 23:11:43 -0500 Subject: [PATCH] wip: FailoverTopology implementation for PostgreSQL on the way! --- harmony/src/domain/topology/failover.rs | 141 +++++++++++ harmony/src/domain/topology/mod.rs | 2 + harmony/src/modules/mod.rs | 1 + harmony/src/modules/postgresql/capability.rs | 81 +++++++ harmony/src/modules/postgresql/mod.rs | 7 + harmony/src/modules/postgresql/score.rs | 236 +++++++++++++++++++ harmony_types/src/lib.rs | 1 + harmony_types/src/storage.rs | 6 + 8 files changed, 475 insertions(+) create mode 100644 harmony/src/domain/topology/failover.rs create mode 100644 harmony/src/modules/postgresql/capability.rs create mode 100644 harmony/src/modules/postgresql/mod.rs create mode 100644 harmony/src/modules/postgresql/score.rs create mode 100644 harmony_types/src/storage.rs diff --git a/harmony/src/domain/topology/failover.rs b/harmony/src/domain/topology/failover.rs new file mode 100644 index 0000000..dfeb557 --- /dev/null +++ b/harmony/src/domain/topology/failover.rs @@ -0,0 +1,141 @@ +use async_trait::async_trait; + +use log::{debug, info}; +use std::collections::HashMap; + +use crate::{ + modules::postgresql::capability::{ + BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL, + PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig, + ReplicationCerts, + }, + topology::{PreparationError, PreparationOutcome, Topology}, +}; + +pub struct FailoverTopology { + primary: T, + replica: T, +} + +#[async_trait] +impl Topology for FailoverTopology { + fn name(&self) -> &str { + "FailoverTopology" + } + + async fn ensure_ready(&self) -> Result { + todo!() + } +} + +#[async_trait] +impl PostgreSQL for FailoverTopology { + async fn deploy(&self, config: &PostgreSQLConfig) -> Result { + info!( + "Starting deployment of failover topology '{}'", + config.cluster_name + ); + + let primary_config = PostgreSQLConfig { + cluster_name: config.cluster_name.clone(), + instances: config.instances, + storage_size: config.storage_size.clone(), + role: PostgreSQLClusterRole::Primary, + }; + + info!( + "Deploying primary cluster '{{}}' ({} instances, {:?} storage)", + primary_config.cluster_name, primary_config.storage_size + ); + + let primary_cluster_name = self.primary.deploy(&primary_config).await?; + + info!("Primary cluster '{primary_cluster_name}' deployed successfully"); + + info!("Retrieving replication certificates for primary '{primary_cluster_name}'"); + + let certs = self + .primary + .get_replication_certs(&primary_cluster_name) + .await?; + + info!("Replication certificates retrieved successfully"); + + info!("Retrieving public endpoint for primary '{primary_cluster_name}"); + + let endpoint = self + .primary + .get_public_endpoint(&primary_cluster_name) + .await? + .ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?; + + info!( + "Public endpoint '{}:{}' retrieved for primary", + endpoint.host, endpoint.port + ); + + info!("Configuring replica connection parameters and bootstrap"); + + let mut connection_parameters = HashMap::new(); + connection_parameters.insert("host".to_string(), endpoint.host); + connection_parameters.insert("port".to_string(), endpoint.port.to_string()); + connection_parameters.insert("dbname".to_string(), "postgres".to_string()); + connection_parameters.insert("user".to_string(), "streaming_replica".to_string()); + connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string()); + connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string()); + + debug!("Replica connection parameters: {:?}", connection_parameters); + + let external_cluster = ExternalClusterConfig { + name: primary_cluster_name.clone(), + connection_parameters, + }; + + let bootstrap_config = BootstrapConfig { + strategy: BootstrapStrategy::PgBasebackup, + }; + + let replica_cluster_config = ReplicaConfig { + primary_cluster_name: primary_cluster_name.clone(), + replication_certs: certs, + bootstrap: bootstrap_config, + external_cluster, + }; + + let replica_config = PostgreSQLConfig { + cluster_name: format!("{}-replica", primary_cluster_name), + instances: config.instances, + storage_size: config.storage_size.clone(), + role: PostgreSQLClusterRole::Replica(replica_cluster_config), + }; + + info!( + "Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology", + replica_config.cluster_name, replica_config.instances, replica_config.storage_size + ); + + self.replica.deploy(&replica_config).await?; + + info!( + "Replica cluster '{}' deployed successfully; failover topology '{}' ready", + replica_config.cluster_name, config.cluster_name + ); + + Ok(primary_cluster_name) + } + + async fn get_replication_certs(&self, cluster_name: &str) -> Result { + self.primary.get_replication_certs(cluster_name).await + } + + async fn get_endpoint(&self, cluster_name: &str) -> Result { + self.primary.get_endpoint(cluster_name).await + } + + async fn get_public_endpoint( + &self, + cluster_name: &str, + ) -> Result, String> { + self.primary.get_public_endpoint(cluster_name).await + } +} diff --git a/harmony/src/domain/topology/mod.rs b/harmony/src/domain/topology/mod.rs index 85e57d7..62efa00 100644 --- a/harmony/src/domain/topology/mod.rs +++ b/harmony/src/domain/topology/mod.rs @@ -1,5 +1,7 @@ mod ha_cluster; pub mod ingress; +mod failover; +pub use failover::*; use harmony_types::net::IpAddress; mod host_binding; mod http; diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs index 682e16b..1ae4d4a 100644 --- a/harmony/src/modules/mod.rs +++ b/harmony/src/modules/mod.rs @@ -17,3 +17,4 @@ pub mod prometheus; pub mod storage; pub mod tenant; pub mod tftp; +pub mod postgresql; diff --git a/harmony/src/modules/postgresql/capability.rs b/harmony/src/modules/postgresql/capability.rs new file mode 100644 index 0000000..2779956 --- /dev/null +++ b/harmony/src/modules/postgresql/capability.rs @@ -0,0 +1,81 @@ +use async_trait::async_trait; +use harmony_types::storage::StorageSize; +use std::collections::HashMap; + +#[async_trait] +pub trait PostgreSQL { + async fn deploy(&self, config: &PostgreSQLConfig) -> Result; + + /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. + /// Abstracts away storage/retrieval details (e.g., secrets, files). + async fn get_replication_certs(&self, cluster_name: &str) -> Result; + + /// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster. + async fn get_endpoint(&self, cluster_name: &str) -> Result; + + /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough). + /// Returns None if no public endpoint (internal-only cluster). + /// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex + /// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait. + async fn get_public_endpoint(&self, cluster_name: &str) -> Result, String>; +} + +#[derive(Clone, Debug)] +pub struct PostgreSQLConfig { + pub cluster_name: String, + pub instances: u32, + pub storage_size: StorageSize, + pub role: PostgreSQLClusterRole, +} + +#[derive(Clone, Debug)] +pub enum PostgreSQLClusterRole { + Primary, + Replica(ReplicaClusterConfig), +} + +#[derive(Clone, Debug)] +pub struct ReplicaConfig { + /// Name of the primary cluster this replica will sync from + pub primary_cluster_name: String, + /// Certs extracted from primary via Topology::get_replication_certs() + pub replication_certs: ReplicationCerts, + /// Bootstrap method (e.g., pg_basebackup from primary) + pub bootstrap: BootstrapConfig, + /// External cluster connection details for CNPG spec.externalClusters + pub external_cluster: ExternalClusterConfig, +} + +#[derive(Clone, Debug)] +pub struct BootstrapConfig { + pub strategy: BootstrapStrategy, +} + +#[derive(Clone, Debug)] +pub enum BootstrapStrategy { + PgBasebackup, +} + +#[derive(Clone, Debug)] +pub struct ExternalClusterConfig { + /// Name used in CNPG externalClusters list + pub name: String, + /// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.) + pub connection_parameters: HashMap, +} + +#[derive(Clone, Debug)] +pub struct ReplicationCerts { + /// PEM-encoded CA cert from primary + pub ca_cert_pem: String, + /// PEM-encoded streaming_replica client cert (tls.crt) + pub streaming_replica_cert_pem: String, + /// PEM-encoded streaming_replica client key (tls.key) + pub streaming_replica_key_pem: String, +} + +#[derive(Clone, Debug)] +pub struct PostgreSQLEndpoint { + pub host: String, + pub port: u16, +} diff --git a/harmony/src/modules/postgresql/mod.rs b/harmony/src/modules/postgresql/mod.rs new file mode 100644 index 0000000..482f013 --- /dev/null +++ b/harmony/src/modules/postgresql/mod.rs @@ -0,0 +1,7 @@ + +pub mod capability; +mod score; + + +pub mod failover; + diff --git a/harmony/src/modules/postgresql/score.rs b/harmony/src/modules/postgresql/score.rs new file mode 100644 index 0000000..419c06b --- /dev/null +++ b/harmony/src/modules/postgresql/score.rs @@ -0,0 +1,236 @@ +use crate::{ + domain::{data::Version, interpret::InterpretStatus}, + interpret::{Interpret, InterpretError, InterpretName, Outcome}, + inventory::Inventory, + modules::postgresql::capability::PostgreSQL, + score::Score, + topology::Topology, +}; + +use super::capability::*; + +use derive_new::new; +use harmony_types::{id::Id, storage::StorageSize}; + +use async_trait::async_trait; +use log::info; +use serde::Serialize; + +pub struct PostgreSQLScore { + config: PostgreSQLConfig, +} + +#[derive(Debug, Clone)] +pub struct PostgreSQLInterpret { + config: PostgreSQLConfig, + version: Version, + status: InterpretStatus, +} + +impl PostgreSQLInterpret { + pub fn new(config: PostgreSQLConfig) -> Self { + let version = Version::from("1.0.0").expect("Version should be valid"); + Self { + config, + version, + status: InterpretStatus::QUEUED, + } + } +} + +impl Score for PostgreSQLScore { + fn name(&self) -> String { + "PostgreSQLScore".to_string() + } + + fn create_interpret(&self) -> Box> { + Box::new(PostgreSQLInterpret::new(self.config.clone())) + } +} + +#[async_trait] +impl Interpret for PostgreSQLInterpret { + fn get_name(&self) -> InterpretName { + InterpretName::Custom("PostgreSQLInterpret") + } + + fn get_version(&self) -> crate::domain::data::Version { + self.version.clone() + } + + fn get_status(&self) -> InterpretStatus { + self.status.clone() + } + + fn get_children(&self) -> Vec { + todo!() + } + + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + info!( + "Executing PostgreSQLInterpret with config {:?}", + self.config + ); + + let cluster_name = topology + .deploy(&self.config) + .await + .map_err(|e| InterpretError::from(e))?; + + Ok(Outcome::success(format!( + "Deployed PostgreSQL cluster `{cluster_name}`" + ))) + } +} + +#[derive(Debug, new, Clone, Serialize)] +pub struct MultisitePostgreSQLScore { + pub cluster_name: String, + pub primary_site: Id, + pub replica_sites: Vec, + pub instances: u32, + pub storage_size: StorageSize, +} + +impl Score for MultisitePostgreSQLScore { + + fn create_interpret(&self) -> Box> { + Box::new(MultisitePostgreSQLInterpret::new(self.clone())) + } + + fn name(&self) -> String { + "MultisitePostgreSQLScore".to_string() + } +} + +#[derive(Debug, Clone)] +pub struct MultisitePostgreSQLInterpret { + score: MultisitePostgreSQLScore, + version: Version, + status: InterpretStatus, +} + +impl MultisitePostgreSQLInterpret { + pub fn new(score: MultisitePostgreSQLScore) -> Self { + let version = Version::from("1.0.0").expect("Version should be valid"); + Self { + score, + version, + status: InterpretStatus::QUEUED, + } + } +} + +#[async_trait] +impl Interpret for MultisitePostgreSQLInterpret { + fn get_name(&self) -> InterpretName { + InterpretName::Custom("MultisitePostgreSQLInterpret") + } + + fn get_version(&self) -> Version { + self.version.clone() + } + + fn get_status(&self) -> InterpretStatus { + self.status.clone() + } + + fn get_children(&self) -> Vec { + todo!("Track child interprets per site") + } + +async fn execute( + &self, + inventory: &Inventory, + topology: &T, +) -> Result { + + info!( + "Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}", + self.score.primary_site, self.score.replica_sites + ); + + // 1. Deploy primary + let primary_topo = topology.primary(); + + let primary_config = PostgreSQLConfig { + cluster_name: self.score.cluster_name.clone(), + instances: self.score.instances, + storage_size: self.score.storage_size.clone(), + role: ClusterRole::Primary, + }; + let primary_cluster_name = primary_topo + .deploy(&primary_config) + .await + .map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?; + + // 2. Extract certs & public endpoint from primary + let certs = primary_topo + .get_replication_certs(&primary_cluster_name) + .await + .map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?; + let public_endpoint = primary_topo + .get_public_endpoint(&primary_cluster_name) + .await?? + .ok_or_else(|| InterpretError::from("No public endpoint on primary"))?; + + // 3. Deploy replicas + for replica_site in &self.score.replica_sites { +let replica_topo = topology.replica(); + + .map_err(|e| { + InterpretError::from(format!( + "Replica site {:?} lookup failed: {e}", + replica_site + )) + })?; + + let connection_params: HashMap = [ + ("host".to_string(), public_endpoint.host.clone()), + ("port".to_string(), public_endpoint.port.to_string()), + ("dbname".to_string(), "postgres".to_string()), + ("user".to_string(), "streaming_replica".to_string()), + ("sslmode".to_string(), "verify-ca".to_string()), + ("sslnegotiation".to_string(), "direct".to_string()), + ] + .into_iter() + .collect(); + + let external_cluster = ExternalClusterConfig { + name: "primary-cluster".to_string(), + connection_parameters: connection_params, + }; + + let replica_config_struct = ReplicaConfig { + primary_cluster_name: primary_cluster_name.clone(), + replication_certs: certs.clone(), + bootstrap: BootstrapConfig { + strategy: BootstrapStrategy::PgBasebackup, + }, + external_cluster, + }; + + let replica_config = PostgreSQLConfig { + cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site), + instances: self.score.instances, + storage_size: self.score.storage_size.clone(), + role: ClusterRole::Replica(replica_config_struct), + }; + + let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| { + InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site)) + })?; + } + + Ok(Outcome::success(format!( + "Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas", + self.score.cluster_name, + primary_cluster_name, + self.score.replica_sites.len() + ))) + } +} diff --git a/harmony_types/src/lib.rs b/harmony_types/src/lib.rs index 098379a..38a296f 100644 --- a/harmony_types/src/lib.rs +++ b/harmony_types/src/lib.rs @@ -1,3 +1,4 @@ pub mod id; pub mod net; pub mod switch; +pub mod storage; diff --git a/harmony_types/src/storage.rs b/harmony_types/src/storage.rs new file mode 100644 index 0000000..bd5fd95 --- /dev/null +++ b/harmony_types/src/storage.rs @@ -0,0 +1,6 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)] +pub struct StorageSize { + size_bytes: u64, +}