wip: FailoverTopology implementation for PostgreSQL on the way!
Some checks failed
Run Check Script / check (pull_request) Failing after 37s

This commit is contained in:
2025-12-08 23:11:43 -05:00
parent c7cbd9eeac
commit ab78a12599
8 changed files with 475 additions and 0 deletions

View File

@@ -0,0 +1,141 @@
use async_trait::async_trait;
use log::{debug, info};
use std::collections::HashMap;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::{PreparationError, PreparationOutcome, Topology},
};
pub struct FailoverTopology<T> {
primary: T,
replica: T,
}
#[async_trait]
impl<T: Send + Sync> Topology for FailoverTopology<T> {
fn name(&self) -> &str {
"FailoverTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}
#[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
info!(
"Starting deployment of failover topology '{}'",
config.cluster_name
);
let primary_config = PostgreSQLConfig {
cluster_name: config.cluster_name.clone(),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Primary,
};
info!(
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
primary_config.cluster_name, primary_config.storage_size
);
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
let certs = self
.primary
.get_replication_certs(&primary_cluster_name)
.await?;
info!("Replication certificates retrieved successfully");
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
let endpoint = self
.primary
.get_public_endpoint(&primary_cluster_name)
.await?
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
info!(
"Public endpoint '{}:{}' retrieved for primary",
endpoint.host, endpoint.port
);
info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new();
connection_parameters.insert("host".to_string(), endpoint.host);
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
debug!("Replica connection parameters: {:?}", connection_parameters);
let external_cluster = ExternalClusterConfig {
name: primary_cluster_name.clone(),
connection_parameters,
};
let bootstrap_config = BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
};
let replica_cluster_config = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs,
bootstrap: bootstrap_config,
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica", primary_cluster_name),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
};
info!(
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
);
self.replica.deploy(&replica_config).await?;
info!(
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
replica_config.cluster_name, config.cluster_name
);
Ok(primary_cluster_name)
}
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
self.primary.get_replication_certs(cluster_name).await
}
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(cluster_name).await
}
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
self.primary.get_public_endpoint(cluster_name).await
}
}

View File

@@ -1,5 +1,7 @@
mod ha_cluster; mod ha_cluster;
pub mod ingress; pub mod ingress;
mod failover;
pub use failover::*;
use harmony_types::net::IpAddress; use harmony_types::net::IpAddress;
mod host_binding; mod host_binding;
mod http; mod http;

View File

@@ -17,3 +17,4 @@ pub mod prometheus;
pub mod storage; pub mod storage;
pub mod tenant; pub mod tenant;
pub mod tftp; pub mod tftp;
pub mod postgresql;

View File

@@ -0,0 +1,81 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use std::collections::HashMap;
#[async_trait]
pub trait PostgreSQL {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
}
#[derive(Clone, Debug)]
pub struct PostgreSQLConfig {
pub cluster_name: String,
pub instances: u32,
pub storage_size: StorageSize,
pub role: PostgreSQLClusterRole,
}
#[derive(Clone, Debug)]
pub enum PostgreSQLClusterRole {
Primary,
Replica(ReplicaClusterConfig),
}
#[derive(Clone, Debug)]
pub struct ReplicaConfig {
/// Name of the primary cluster this replica will sync from
pub primary_cluster_name: String,
/// Certs extracted from primary via Topology::get_replication_certs()
pub replication_certs: ReplicationCerts,
/// Bootstrap method (e.g., pg_basebackup from primary)
pub bootstrap: BootstrapConfig,
/// External cluster connection details for CNPG spec.externalClusters
pub external_cluster: ExternalClusterConfig,
}
#[derive(Clone, Debug)]
pub struct BootstrapConfig {
pub strategy: BootstrapStrategy,
}
#[derive(Clone, Debug)]
pub enum BootstrapStrategy {
PgBasebackup,
}
#[derive(Clone, Debug)]
pub struct ExternalClusterConfig {
/// Name used in CNPG externalClusters list
pub name: String,
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
pub connection_parameters: HashMap<String, String>,
}
#[derive(Clone, Debug)]
pub struct ReplicationCerts {
/// PEM-encoded CA cert from primary
pub ca_cert_pem: String,
/// PEM-encoded streaming_replica client cert (tls.crt)
pub streaming_replica_cert_pem: String,
/// PEM-encoded streaming_replica client key (tls.key)
pub streaming_replica_key_pem: String,
}
#[derive(Clone, Debug)]
pub struct PostgreSQLEndpoint {
pub host: String,
pub port: u16,
}

View File

@@ -0,0 +1,7 @@
pub mod capability;
mod score;
pub mod failover;

View File

@@ -0,0 +1,236 @@
use crate::{
domain::{data::Version, interpret::InterpretStatus},
interpret::{Interpret, InterpretError, InterpretName, Outcome},
inventory::Inventory,
modules::postgresql::capability::PostgreSQL,
score::Score,
topology::Topology,
};
use super::capability::*;
use derive_new::new;
use harmony_types::{id::Id, storage::StorageSize};
use async_trait::async_trait;
use log::info;
use serde::Serialize;
pub struct PostgreSQLScore {
config: PostgreSQLConfig,
}
#[derive(Debug, Clone)]
pub struct PostgreSQLInterpret {
config: PostgreSQLConfig,
version: Version,
status: InterpretStatus,
}
impl PostgreSQLInterpret {
pub fn new(config: PostgreSQLConfig) -> Self {
let version = Version::from("1.0.0").expect("Version should be valid");
Self {
config,
version,
status: InterpretStatus::QUEUED,
}
}
}
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
fn name(&self) -> String {
"PostgreSQLScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLInterpret::new(self.config.clone()))
}
}
#[async_trait]
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLInterpret")
}
fn get_version(&self) -> crate::domain::data::Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Executing PostgreSQLInterpret with config {:?}",
self.config
);
let cluster_name = topology
.deploy(&self.config)
.await
.map_err(|e| InterpretError::from(e))?;
Ok(Outcome::success(format!(
"Deployed PostgreSQL cluster `{cluster_name}`"
)))
}
}
#[derive(Debug, new, Clone, Serialize)]
pub struct MultisitePostgreSQLScore {
pub cluster_name: String,
pub primary_site: Id,
pub replica_sites: Vec<Id>,
pub instances: u32,
pub storage_size: StorageSize,
}
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
}
fn name(&self) -> String {
"MultisitePostgreSQLScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct MultisitePostgreSQLInterpret {
score: MultisitePostgreSQLScore,
version: Version,
status: InterpretStatus,
}
impl MultisitePostgreSQLInterpret {
pub fn new(score: MultisitePostgreSQLScore) -> Self {
let version = Version::from("1.0.0").expect("Version should be valid");
Self {
score,
version,
status: InterpretStatus::QUEUED,
}
}
}
#[async_trait]
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("MultisitePostgreSQLInterpret")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
todo!("Track child interprets per site")
}
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
self.score.primary_site, self.score.replica_sites
);
// 1. Deploy primary
let primary_topo = topology.primary();
let primary_config = PostgreSQLConfig {
cluster_name: self.score.cluster_name.clone(),
instances: self.score.instances,
storage_size: self.score.storage_size.clone(),
role: ClusterRole::Primary,
};
let primary_cluster_name = primary_topo
.deploy(&primary_config)
.await
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
// 2. Extract certs & public endpoint from primary
let certs = primary_topo
.get_replication_certs(&primary_cluster_name)
.await
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
let public_endpoint = primary_topo
.get_public_endpoint(&primary_cluster_name)
.await??
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
// 3. Deploy replicas
for replica_site in &self.score.replica_sites {
let replica_topo = topology.replica();
.map_err(|e| {
InterpretError::from(format!(
"Replica site {:?} lookup failed: {e}",
replica_site
))
})?;
let connection_params: HashMap<String, String> = [
("host".to_string(), public_endpoint.host.clone()),
("port".to_string(), public_endpoint.port.to_string()),
("dbname".to_string(), "postgres".to_string()),
("user".to_string(), "streaming_replica".to_string()),
("sslmode".to_string(), "verify-ca".to_string()),
("sslnegotiation".to_string(), "direct".to_string()),
]
.into_iter()
.collect();
let external_cluster = ExternalClusterConfig {
name: "primary-cluster".to_string(),
connection_parameters: connection_params,
};
let replica_config_struct = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs.clone(),
bootstrap: BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
},
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
instances: self.score.instances,
storage_size: self.score.storage_size.clone(),
role: ClusterRole::Replica(replica_config_struct),
};
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
})?;
}
Ok(Outcome::success(format!(
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
self.score.cluster_name,
primary_cluster_name,
self.score.replica_sites.len()
)))
}
}

View File

@@ -1,3 +1,4 @@
pub mod id; pub mod id;
pub mod net; pub mod net;
pub mod switch; pub mod switch;
pub mod storage;

View File

@@ -0,0 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
pub struct StorageSize {
size_bytes: u64,
}