wip(failover): Started implementation of the FailoverTopology with PostgreSQL capability
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s

This is our first Higher Order Topology (see ADR-015)
This commit is contained in:
2025-12-10 17:00:28 -05:00
parent d39b1957cd
commit 1b19638df4
13 changed files with 164 additions and 297 deletions

15
Cargo.lock generated
View File

@@ -1835,6 +1835,21 @@ dependencies = [
"url",
]
[[package]]
name = "example-operatorhub-catalogsource"
version = "0.1.0"
dependencies = [
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "example-opnsense"
version = "0.1.0"

View File

@@ -3,13 +3,12 @@ use std::str::FromStr;
use harmony::{
inventory::Inventory,
modules::{k8s::apps::OperatorHubCatalogSourceScore, tenant::TenantScore},
topology::{tenant::TenantConfig, K8sAnywhereTopology},
topology::{K8sAnywhereTopology, tenant::TenantConfig},
};
use harmony_types::id::Id;
#[tokio::main]
async fn main() {
let operatorhub_catalog = OperatorHubCatalogSourceScore::default();
harmony_cli::run(

View File

@@ -1,20 +1,10 @@
use async_trait::async_trait;
use log::{debug, info};
use std::collections::HashMap;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::{PreparationError, PreparationOutcome, Topology},
};
use crate::topology::{PreparationError, PreparationOutcome, Topology};
pub struct FailoverTopology<T> {
primary: T,
replica: T,
pub primary: T,
pub replica: T,
}
#[async_trait]
@@ -27,115 +17,3 @@ impl<T: Send + Sync> Topology for FailoverTopology<T> {
todo!()
}
}
/// `PostgreSQL` capability for a `FailoverTopology`: one logical cluster made of a
/// primary deployed on `self.primary` and a streaming replica deployed on `self.replica`.
#[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
    /// Deploys the primary cluster, then a replica cluster bootstrapped from it.
    ///
    /// Steps, in order:
    /// 1. deploy `config` on the primary topology with the `Primary` role;
    /// 2. fetch the replication certificates and the public endpoint of that primary;
    /// 3. deploy a `<primary name>-replica` cluster on the replica topology, configured
    ///    to bootstrap with `PgBasebackup` from the primary's public endpoint.
    ///
    /// Returns the name of the primary cluster as reported by the primary topology.
    ///
    /// # Errors
    /// Propagates any error string from the underlying topologies, and fails with
    /// "No public endpoint configured on primary cluster" when the primary exposes none.
    async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
        info!(
            "Starting deployment of failover topology '{}'",
            config.cluster_name
        );

        // The caller-provided role is ignored on purpose: this topology decides
        // which side is the primary and which is the replica.
        let primary_config = PostgreSQLConfig {
            cluster_name: config.cluster_name.clone(),
            instances: config.instances,
            storage_size: config.storage_size.clone(),
            role: PostgreSQLClusterRole::Primary,
        };

        info!(
            "Deploying primary cluster '{}' ({} instances, {:?} storage)",
            primary_config.cluster_name, primary_config.instances, primary_config.storage_size
        );
        let primary_cluster_name = self.primary.deploy(&primary_config).await?;
        info!("Primary cluster '{primary_cluster_name}' deployed successfully");

        info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
        let certs = self
            .primary
            .get_replication_certs(&primary_cluster_name)
            .await?;
        info!("Replication certificates retrieved successfully");

        info!("Retrieving public endpoint for primary '{primary_cluster_name}'");
        let endpoint = self
            .primary
            .get_public_endpoint(&primary_cluster_name)
            .await?
            .ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
        info!(
            "Public endpoint '{}:{}' retrieved for primary",
            endpoint.host, endpoint.port
        );

        info!("Configuring replica connection parameters and bootstrap");
        // Connection parameters the replica uses to reach the primary.
        let connection_parameters = HashMap::from([
            ("host".to_string(), endpoint.host),
            ("port".to_string(), endpoint.port.to_string()),
            ("dbname".to_string(), "postgres".to_string()),
            ("user".to_string(), "streaming_replica".to_string()),
            ("sslmode".to_string(), "verify-ca".to_string()),
            ("sslnegotiation".to_string(), "direct".to_string()),
        ]);
        debug!("Replica connection parameters: {:?}", connection_parameters);

        let external_cluster = ExternalClusterConfig {
            name: primary_cluster_name.clone(),
            connection_parameters,
        };
        let bootstrap_config = BootstrapConfig {
            strategy: BootstrapStrategy::PgBasebackup,
        };
        let replica_cluster_config = ReplicaConfig {
            primary_cluster_name: primary_cluster_name.clone(),
            replication_certs: certs,
            bootstrap: bootstrap_config,
            external_cluster,
        };
        let replica_config = PostgreSQLConfig {
            cluster_name: format!("{}-replica", primary_cluster_name),
            instances: config.instances,
            storage_size: config.storage_size.clone(),
            role: PostgreSQLClusterRole::Replica(replica_cluster_config),
        };

        info!(
            "Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
            replica_config.cluster_name, replica_config.instances, replica_config.storage_size
        );
        self.replica.deploy(&replica_config).await?;
        info!(
            "Replica cluster '{}' deployed successfully; failover topology '{}' ready",
            replica_config.cluster_name, config.cluster_name
        );

        Ok(primary_cluster_name)
    }

    /// Returns the replication certificates of `cluster_name`, as reported by the primary.
    async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
        self.primary.get_replication_certs(cluster_name).await
    }

    /// Returns the endpoint of `cluster_name`, as reported by the primary.
    async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
        self.primary.get_endpoint(cluster_name).await
    }

    /// Returns the public endpoint of `cluster_name` (if any), as reported by the primary.
    async fn get_public_endpoint(
        &self,
        cluster_name: &str,
    ) -> Result<Option<PostgreSQLEndpoint>, String> {
        self.primary.get_public_endpoint(cluster_name).await
    }
}

View File

@@ -1,6 +1,6 @@
mod failover;
mod ha_cluster;
pub mod ingress;
mod failover;
pub use failover::*;
use harmony_types::net::IpAddress;
mod host_binding;

View File

@@ -1,4 +1,2 @@
mod catalogsources_operators_coreos_com;
pub use catalogsources_operators_coreos_com::*;

View File

@@ -1,4 +1,3 @@
mod operatorhub;
pub use operatorhub::*;
pub mod crd;

View File

@@ -1,5 +1,5 @@
pub mod apps;
pub mod deployment;
pub mod ingress;
pub mod namespace;
pub mod resource;
pub mod apps;

View File

@@ -13,8 +13,8 @@ pub mod load_balancer;
pub mod monitoring;
pub mod okd;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;
pub mod postgresql;

View File

@@ -1,9 +1,10 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use serde::Serialize;
use std::collections::HashMap;
#[async_trait]
pub trait PostgreSQL {
pub trait PostgreSQL: Send + Sync {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
@@ -17,10 +18,13 @@ pub trait PostgreSQL {
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String>;
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub struct PostgreSQLConfig {
pub cluster_name: String,
pub instances: u32,
@@ -28,13 +32,13 @@ pub struct PostgreSQLConfig {
pub role: PostgreSQLClusterRole,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub enum PostgreSQLClusterRole {
Primary,
Replica(ReplicaClusterConfig),
Replica(ReplicaConfig),
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub struct ReplicaConfig {
/// Name of the primary cluster this replica will sync from
pub primary_cluster_name: String,
@@ -46,17 +50,17 @@ pub struct ReplicaConfig {
pub external_cluster: ExternalClusterConfig,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub struct BootstrapConfig {
pub strategy: BootstrapStrategy,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub enum BootstrapStrategy {
PgBasebackup,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub struct ExternalClusterConfig {
/// Name used in CNPG externalClusters list
pub name: String,
@@ -64,7 +68,7 @@ pub struct ExternalClusterConfig {
pub connection_parameters: HashMap<String, String>,
}
#[derive(Clone, Debug)]
#[derive(Clone, Debug, Serialize)]
pub struct ReplicationCerts {
/// PEM-encoded CA cert from primary
pub ca_cert_pem: String,

View File

@@ -0,0 +1,125 @@
use async_trait::async_trait;
use log::debug;
use log::info;
use std::collections::HashMap;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::FailoverTopology,
};
/// `PostgreSQL` capability for a `FailoverTopology`: one logical cluster made of a
/// primary deployed on `self.primary` and a streaming replica deployed on `self.replica`.
#[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
    /// Deploys the primary cluster, then a replica cluster bootstrapped from it.
    ///
    /// Steps, in order:
    /// 1. deploy `config` on the primary topology with the `Primary` role;
    /// 2. fetch the replication certificates and the public endpoint of that primary;
    /// 3. deploy a `<primary name>-replica` cluster on the replica topology, configured
    ///    to bootstrap with `PgBasebackup` from the primary's public endpoint.
    ///
    /// Returns the name of the primary cluster as reported by the primary topology.
    ///
    /// # Errors
    /// Propagates any error string from the underlying topologies, and fails with
    /// "No public endpoint configured on primary cluster" when the primary exposes none.
    async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
        info!(
            "Starting deployment of failover topology '{}'",
            config.cluster_name
        );

        // The caller-provided role is ignored on purpose: this topology decides
        // which side is the primary and which is the replica.
        let primary_config = PostgreSQLConfig {
            cluster_name: config.cluster_name.clone(),
            instances: config.instances,
            storage_size: config.storage_size.clone(),
            role: PostgreSQLClusterRole::Primary,
        };

        info!(
            "Deploying primary cluster '{}' ({} instances, {:?} storage)",
            primary_config.cluster_name, primary_config.instances, primary_config.storage_size
        );
        let primary_cluster_name = self.primary.deploy(&primary_config).await?;
        info!("Primary cluster '{primary_cluster_name}' deployed successfully");

        info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
        let certs = self
            .primary
            .get_replication_certs(&primary_cluster_name)
            .await?;
        info!("Replication certificates retrieved successfully");

        info!("Retrieving public endpoint for primary '{primary_cluster_name}'");
        let endpoint = self
            .primary
            .get_public_endpoint(&primary_cluster_name)
            .await?
            .ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
        info!(
            "Public endpoint '{}:{}' retrieved for primary",
            endpoint.host, endpoint.port
        );

        info!("Configuring replica connection parameters and bootstrap");
        // Connection parameters the replica uses to reach the primary.
        let connection_parameters = HashMap::from([
            ("host".to_string(), endpoint.host),
            ("port".to_string(), endpoint.port.to_string()),
            ("dbname".to_string(), "postgres".to_string()),
            ("user".to_string(), "streaming_replica".to_string()),
            ("sslmode".to_string(), "verify-ca".to_string()),
            ("sslnegotiation".to_string(), "direct".to_string()),
        ]);
        debug!("Replica connection parameters: {:?}", connection_parameters);

        let external_cluster = ExternalClusterConfig {
            name: primary_cluster_name.clone(),
            connection_parameters,
        };
        let bootstrap_config = BootstrapConfig {
            strategy: BootstrapStrategy::PgBasebackup,
        };
        let replica_cluster_config = ReplicaConfig {
            primary_cluster_name: primary_cluster_name.clone(),
            replication_certs: certs,
            bootstrap: bootstrap_config,
            external_cluster,
        };
        let replica_config = PostgreSQLConfig {
            cluster_name: format!("{}-replica", primary_cluster_name),
            instances: config.instances,
            storage_size: config.storage_size.clone(),
            role: PostgreSQLClusterRole::Replica(replica_cluster_config),
        };

        info!(
            "Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
            replica_config.cluster_name, replica_config.instances, replica_config.storage_size
        );
        self.replica.deploy(&replica_config).await?;
        info!(
            "Replica cluster '{}' deployed successfully; failover topology '{}' ready",
            replica_config.cluster_name, config.cluster_name
        );

        Ok(primary_cluster_name)
    }

    /// Returns the replication certificates of `cluster_name`, as reported by the primary.
    async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
        self.primary.get_replication_certs(cluster_name).await
    }

    /// Returns the endpoint of `cluster_name`, as reported by the primary.
    async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
        self.primary.get_endpoint(cluster_name).await
    }

    /// Returns the public endpoint of `cluster_name` (if any), as reported by the primary.
    async fn get_public_endpoint(
        &self,
        cluster_name: &str,
    ) -> Result<Option<PostgreSQLEndpoint>, String> {
        self.primary.get_public_endpoint(cluster_name).await
    }
}

View File

@@ -1,7 +1,4 @@
pub mod capability;
mod score;
pub mod failover;

View File

@@ -9,13 +9,13 @@ use crate::{
use super::capability::*;
use derive_new::new;
use harmony_types::{id::Id, storage::StorageSize};
use harmony_types::id::Id;
use async_trait::async_trait;
use log::info;
use serde::Serialize;
/// Score describing a single PostgreSQL cluster deployment.
///
/// It only wraps the `PostgreSQLConfig` that is handed to a topology exposing the
/// `PostgreSQL` capability.
#[derive(Clone, Debug, Serialize)]
pub struct PostgreSQLScore {
// Desired cluster configuration (name, instances, storage size, role).
config: PostgreSQLConfig,
}
@@ -86,151 +86,3 @@ impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
)))
}
}
/// Score describing a PostgreSQL deployment spread over several sites: one primary
/// cluster plus one replica cluster per entry in `replica_sites`.
#[derive(Debug, new, Clone, Serialize)]
pub struct MultisitePostgreSQLScore {
/// Name of the primary cluster; replica clusters are named
/// `<cluster_name>-replica-<site>` by the interpret.
pub cluster_name: String,
/// Identifier of the site hosting the primary (only logged by the interpret so far).
pub primary_site: Id,
/// Identifiers of the sites that each receive a replica cluster.
pub replica_sites: Vec<Id>,
/// Instance count applied to the primary and to every replica cluster.
pub instances: u32,
/// Storage size applied to the primary and to every replica cluster.
pub storage_size: StorageSize,
}
// `Score` implementation: builds a `MultisitePostgreSQLInterpret` from a clone of this score.
//
// NOTE(review): the bound names `FailoverTopology`, which is declared as a generic
// struct (`FailoverTopology<T>`), not a trait, while `MultisitePostgreSQLInterpret`
// implements `Interpret<T>` for `T: MultisiteTopology + PostgreSQL`. The two bounds
// presumably need to agree for the boxed interpret to coerce — confirm the intended trait.
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
/// Creates the interpret that executes this score.
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
}
/// Static display name of this score.
fn name(&self) -> String {
"MultisitePostgreSQLScore".to_string()
}
}
/// Interpret that executes a `MultisitePostgreSQLScore`.
#[derive(Debug, Clone)]
pub struct MultisitePostgreSQLInterpret {
// The score being executed (cluster name, sites, sizing).
score: MultisitePostgreSQLScore,
// Interpret version, returned by `get_version`.
version: Version,
// Current status, returned by `get_status`; set to `QUEUED` at construction.
status: InterpretStatus,
}
impl MultisitePostgreSQLInterpret {
    /// Builds an interpret for `score`, versioned "1.0.0" and initially `QUEUED`.
    ///
    /// # Panics
    /// Panics if the hard-coded version string is rejected by `Version::from`.
    pub fn new(score: MultisitePostgreSQLScore) -> Self {
        Self {
            score,
            version: Version::from("1.0.0").expect("Version should be valid"),
            status: InterpretStatus::QUEUED,
        }
    }
}
// `Interpret` implementation orchestrating a multisite PostgreSQL deployment:
// deploy the primary, read its replication certs and public endpoint, then deploy
// one replica cluster per entry of `score.replica_sites`.
//
// NOTE(review): this block looks like work in progress; several lines flagged below
// do not appear to compile as written.
#[async_trait]
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
/// Name under which this interpret is reported.
fn get_name(&self) -> InterpretName {
InterpretName::Custom("MultisitePostgreSQLInterpret")
}
/// Version assigned at construction.
fn get_version(&self) -> Version {
self.version.clone()
}
/// Status assigned at construction.
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
/// Not implemented yet: panics via `todo!`.
fn get_children(&self) -> Vec<Id> {
todo!("Track child interprets per site")
}
/// Deploys the primary cluster and then every replica cluster.
///
/// Returns a success `Outcome` naming the cluster, the primary and the replica count;
/// each failing step is mapped to an `InterpretError` carrying a descriptive message.
// NOTE(review): `inventory` is not read anywhere in this body.
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
self.score.primary_site, self.score.replica_sites
);
// 1. Deploy primary
let primary_topo = topology.primary();
let primary_config = PostgreSQLConfig {
cluster_name: self.score.cluster_name.clone(),
instances: self.score.instances,
storage_size: self.score.storage_size.clone(),
// NOTE(review): the capability module names this enum `PostgreSQLClusterRole`;
// confirm that `ClusterRole` resolves.
role: ClusterRole::Primary,
};
let primary_cluster_name = primary_topo
.deploy(&primary_config)
.await
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
// 2. Extract certs & public endpoint from primary
let certs = primary_topo
.get_replication_certs(&primary_cluster_name)
.await
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
// NOTE(review): `get_public_endpoint` returns `Result<Option<_>, String>`, so the
// second `?` below would be applied to an `Option` inside a `Result`-returning
// function — presumably a `.map_err(..)?` like the calls above was intended.
let public_endpoint = primary_topo
.get_public_endpoint(&primary_cluster_name)
.await??
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
// 3. Deploy replicas
for replica_site in &self.score.replica_sites {
// NOTE(review): the statement below ends with `;` before the `.map_err` chain,
// which looks like a syntax error; `replica_site` is also not passed to
// `replica()`, so every iteration presumably targets the same topology — confirm.
let replica_topo = topology.replica();
.map_err(|e| {
InterpretError::from(format!(
"Replica site {:?} lookup failed: {e}",
replica_site
))
})?;
// Connection parameters the replica uses to reach the primary's public endpoint.
let connection_params: HashMap<String, String> = [
("host".to_string(), public_endpoint.host.clone()),
("port".to_string(), public_endpoint.port.to_string()),
("dbname".to_string(), "postgres".to_string()),
("user".to_string(), "streaming_replica".to_string()),
("sslmode".to_string(), "verify-ca".to_string()),
("sslnegotiation".to_string(), "direct".to_string()),
]
.into_iter()
.collect();
// The external cluster entry uses the fixed name "primary-cluster" rather than
// the deployed primary's name.
let external_cluster = ExternalClusterConfig {
name: "primary-cluster".to_string(),
connection_parameters: connection_params,
};
let replica_config_struct = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs.clone(),
bootstrap: BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
},
external_cluster,
};
let replica_config = PostgreSQLConfig {
// Replica clusters are named after the score's cluster name and the site id.
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
instances: self.score.instances,
storage_size: self.score.storage_size.clone(),
// NOTE(review): same `ClusterRole` / `PostgreSQLClusterRole` naming question as above.
role: ClusterRole::Replica(replica_config_struct),
};
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
})?;
}
Ok(Outcome::success(format!(
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
self.score.cluster_name,
primary_cluster_name,
self.score.replica_sites.len()
)))
}
}

View File

@@ -1,4 +1,4 @@
pub mod id;
pub mod net;
pub mod switch;
pub mod storage;
pub mod switch;