diff --git a/harmony/src/domain/topology/failover.rs b/harmony/src/domain/topology/failover.rs index dfeb557..6df9cea 100644 --- a/harmony/src/domain/topology/failover.rs +++ b/harmony/src/domain/topology/failover.rs @@ -1,20 +1,10 @@ use async_trait::async_trait; -use log::{debug, info}; -use std::collections::HashMap; - -use crate::{ - modules::postgresql::capability::{ - BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL, - PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig, - ReplicationCerts, - }, - topology::{PreparationError, PreparationOutcome, Topology}, -}; +use crate::topology::{PreparationError, PreparationOutcome, Topology}; pub struct FailoverTopology { - primary: T, - replica: T, + pub primary: T, + pub replica: T, } #[async_trait] @@ -27,115 +17,3 @@ impl Topology for FailoverTopology { todo!() } } - -#[async_trait] -impl PostgreSQL for FailoverTopology { - async fn deploy(&self, config: &PostgreSQLConfig) -> Result { - info!( - "Starting deployment of failover topology '{}'", - config.cluster_name - ); - - let primary_config = PostgreSQLConfig { - cluster_name: config.cluster_name.clone(), - instances: config.instances, - storage_size: config.storage_size.clone(), - role: PostgreSQLClusterRole::Primary, - }; - - info!( - "Deploying primary cluster '{{}}' ({} instances, {:?} storage)", - primary_config.cluster_name, primary_config.storage_size - ); - - let primary_cluster_name = self.primary.deploy(&primary_config).await?; - - info!("Primary cluster '{primary_cluster_name}' deployed successfully"); - - info!("Retrieving replication certificates for primary '{primary_cluster_name}'"); - - let certs = self - .primary - .get_replication_certs(&primary_cluster_name) - .await?; - - info!("Replication certificates retrieved successfully"); - - info!("Retrieving public endpoint for primary '{primary_cluster_name}"); - - let endpoint = self - .primary - .get_public_endpoint(&primary_cluster_name) - .await? - .ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?; - - info!( - "Public endpoint '{}:{}' retrieved for primary", - endpoint.host, endpoint.port - ); - - info!("Configuring replica connection parameters and bootstrap"); - - let mut connection_parameters = HashMap::new(); - connection_parameters.insert("host".to_string(), endpoint.host); - connection_parameters.insert("port".to_string(), endpoint.port.to_string()); - connection_parameters.insert("dbname".to_string(), "postgres".to_string()); - connection_parameters.insert("user".to_string(), "streaming_replica".to_string()); - connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string()); - connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string()); - - debug!("Replica connection parameters: {:?}", connection_parameters); - - let external_cluster = ExternalClusterConfig { - name: primary_cluster_name.clone(), - connection_parameters, - }; - - let bootstrap_config = BootstrapConfig { - strategy: BootstrapStrategy::PgBasebackup, - }; - - let replica_cluster_config = ReplicaConfig { - primary_cluster_name: primary_cluster_name.clone(), - replication_certs: certs, - bootstrap: bootstrap_config, - external_cluster, - }; - - let replica_config = PostgreSQLConfig { - cluster_name: format!("{}-replica", primary_cluster_name), - instances: config.instances, - storage_size: config.storage_size.clone(), - role: PostgreSQLClusterRole::Replica(replica_cluster_config), - }; - - info!( - "Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology", - replica_config.cluster_name, replica_config.instances, replica_config.storage_size - ); - - self.replica.deploy(&replica_config).await?; - - info!( - "Replica cluster '{}' deployed successfully; failover topology '{}' ready", - replica_config.cluster_name, config.cluster_name - ); - - Ok(primary_cluster_name) - } - - async fn get_replication_certs(&self, cluster_name: &str) -> Result { - self.primary.get_replication_certs(cluster_name).await - } - - async fn get_endpoint(&self, cluster_name: &str) -> Result { - self.primary.get_endpoint(cluster_name).await - } - - async fn get_public_endpoint( - &self, - cluster_name: &str, - ) -> Result, String> { - self.primary.get_public_endpoint(cluster_name).await - } -} diff --git a/harmony/src/modules/postgresql/capability.rs b/harmony/src/modules/postgresql/capability.rs index 2779956..3b8390f 100644 --- a/harmony/src/modules/postgresql/capability.rs +++ b/harmony/src/modules/postgresql/capability.rs @@ -1,9 +1,10 @@ use async_trait::async_trait; use harmony_types::storage::StorageSize; +use serde::Serialize; use std::collections::HashMap; #[async_trait] -pub trait PostgreSQL { +pub trait PostgreSQL: Send + Sync { async fn deploy(&self, config: &PostgreSQLConfig) -> Result; /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. @@ -20,7 +21,7 @@ pub trait PostgreSQL { async fn get_public_endpoint(&self, cluster_name: &str) -> Result, String>; } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub struct PostgreSQLConfig { pub cluster_name: String, pub instances: u32, @@ -28,13 +29,13 @@ pub struct PostgreSQLConfig { pub role: PostgreSQLClusterRole, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub enum PostgreSQLClusterRole { Primary, - Replica(ReplicaClusterConfig), + Replica(ReplicaConfig), } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub struct ReplicaConfig { /// Name of the primary cluster this replica will sync from pub primary_cluster_name: String, @@ -46,17 +47,17 @@ pub struct ReplicaConfig { pub external_cluster: ExternalClusterConfig, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub struct BootstrapConfig { pub strategy: BootstrapStrategy, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub enum BootstrapStrategy { PgBasebackup, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub struct ExternalClusterConfig { /// Name used in CNPG externalClusters list pub name: String, @@ -64,7 +65,7 @@ pub struct ExternalClusterConfig { pub connection_parameters: HashMap, } -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Serialize)] pub struct ReplicationCerts { /// PEM-encoded CA cert from primary pub ca_cert_pem: String, diff --git a/harmony/src/modules/postgresql/failover.rs b/harmony/src/modules/postgresql/failover.rs new file mode 100644 index 0000000..9e54b2f --- /dev/null +++ b/harmony/src/modules/postgresql/failover.rs @@ -0,0 +1,125 @@ +use async_trait::async_trait; +use log::debug; +use log::info; +use std::collections::HashMap; + +use crate::{ + modules::postgresql::capability::{ + BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL, + PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig, + ReplicationCerts, + }, + topology::FailoverTopology, +}; + +#[async_trait] +impl PostgreSQL for FailoverTopology { + async fn deploy(&self, config: &PostgreSQLConfig) -> Result { + info!( + "Starting deployment of failover topology '{}'", + config.cluster_name + ); + + let primary_config = PostgreSQLConfig { + cluster_name: config.cluster_name.clone(), + instances: config.instances, + storage_size: config.storage_size.clone(), + role: PostgreSQLClusterRole::Primary, + }; + + info!( + "Deploying primary cluster '{{}}' ({} instances, {:?} storage)", + primary_config.cluster_name, primary_config.storage_size + ); + + let primary_cluster_name = self.primary.deploy(&primary_config).await?; + + info!("Primary cluster '{primary_cluster_name}' deployed successfully"); + + info!("Retrieving replication certificates for primary '{primary_cluster_name}'"); + + let certs = self + .primary + .get_replication_certs(&primary_cluster_name) + .await?; + + info!("Replication certificates retrieved successfully"); + + info!("Retrieving public endpoint for primary '{primary_cluster_name}"); + + let endpoint = self + .primary + .get_public_endpoint(&primary_cluster_name) + .await? + .ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?; + + info!( + "Public endpoint '{}:{}' retrieved for primary", + endpoint.host, endpoint.port + ); + + info!("Configuring replica connection parameters and bootstrap"); + + let mut connection_parameters = HashMap::new(); + connection_parameters.insert("host".to_string(), endpoint.host); + connection_parameters.insert("port".to_string(), endpoint.port.to_string()); + connection_parameters.insert("dbname".to_string(), "postgres".to_string()); + connection_parameters.insert("user".to_string(), "streaming_replica".to_string()); + connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string()); + connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string()); + + debug!("Replica connection parameters: {:?}", connection_parameters); + + let external_cluster = ExternalClusterConfig { + name: primary_cluster_name.clone(), + connection_parameters, + }; + + let bootstrap_config = BootstrapConfig { + strategy: BootstrapStrategy::PgBasebackup, + }; + + let replica_cluster_config = ReplicaConfig { + primary_cluster_name: primary_cluster_name.clone(), + replication_certs: certs, + bootstrap: bootstrap_config, + external_cluster, + }; + + let replica_config = PostgreSQLConfig { + cluster_name: format!("{}-replica", primary_cluster_name), + instances: config.instances, + storage_size: config.storage_size.clone(), + role: PostgreSQLClusterRole::Replica(replica_cluster_config), + }; + + info!( + "Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology", + replica_config.cluster_name, replica_config.instances, replica_config.storage_size + ); + + self.replica.deploy(&replica_config).await?; + + info!( + "Replica cluster '{}' deployed successfully; failover topology '{}' ready", + replica_config.cluster_name, config.cluster_name + ); + + Ok(primary_cluster_name) + } + + async fn get_replication_certs(&self, cluster_name: &str) -> Result { + self.primary.get_replication_certs(cluster_name).await + } + + async fn get_endpoint(&self, cluster_name: &str) -> Result { + self.primary.get_endpoint(cluster_name).await + } + + async fn get_public_endpoint( + &self, + cluster_name: &str, + ) -> Result, String> { + self.primary.get_public_endpoint(cluster_name).await + } +} diff --git a/harmony/src/modules/postgresql/score.rs b/harmony/src/modules/postgresql/score.rs index 419c06b..5c6f428 100644 --- a/harmony/src/modules/postgresql/score.rs +++ b/harmony/src/modules/postgresql/score.rs @@ -9,13 +9,13 @@ use crate::{ use super::capability::*; -use derive_new::new; -use harmony_types::{id::Id, storage::StorageSize}; +use harmony_types::id::Id; use async_trait::async_trait; use log::info; use serde::Serialize; +#[derive(Clone, Debug, Serialize)] pub struct PostgreSQLScore { config: PostgreSQLConfig, } @@ -86,151 +86,3 @@ impl Interpret for PostgreSQLInterpret { ))) } } - -#[derive(Debug, new, Clone, Serialize)] -pub struct MultisitePostgreSQLScore { - pub cluster_name: String, - pub primary_site: Id, - pub replica_sites: Vec, - pub instances: u32, - pub storage_size: StorageSize, -} - -impl Score for MultisitePostgreSQLScore { - - fn create_interpret(&self) -> Box> { - Box::new(MultisitePostgreSQLInterpret::new(self.clone())) - } - - fn name(&self) -> String { - "MultisitePostgreSQLScore".to_string() - } -} - -#[derive(Debug, Clone)] -pub struct MultisitePostgreSQLInterpret { - score: MultisitePostgreSQLScore, - version: Version, - status: InterpretStatus, -} - -impl MultisitePostgreSQLInterpret { - pub fn new(score: MultisitePostgreSQLScore) -> Self { - let version = Version::from("1.0.0").expect("Version should be valid"); - Self { - score, - version, - status: InterpretStatus::QUEUED, - } - } -} - -#[async_trait] -impl Interpret for MultisitePostgreSQLInterpret { - fn get_name(&self) -> InterpretName { - InterpretName::Custom("MultisitePostgreSQLInterpret") - } - - fn get_version(&self) -> Version { - self.version.clone() - } - - fn get_status(&self) -> InterpretStatus { - self.status.clone() - } - - fn get_children(&self) -> Vec { - todo!("Track child interprets per site") - } - -async fn execute( - &self, - inventory: &Inventory, - topology: &T, -) -> Result { - - info!( - "Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}", - self.score.primary_site, self.score.replica_sites - ); - - // 1. Deploy primary - let primary_topo = topology.primary(); - - let primary_config = PostgreSQLConfig { - cluster_name: self.score.cluster_name.clone(), - instances: self.score.instances, - storage_size: self.score.storage_size.clone(), - role: ClusterRole::Primary, - }; - let primary_cluster_name = primary_topo - .deploy(&primary_config) - .await - .map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?; - - // 2. Extract certs & public endpoint from primary - let certs = primary_topo - .get_replication_certs(&primary_cluster_name) - .await - .map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?; - let public_endpoint = primary_topo - .get_public_endpoint(&primary_cluster_name) - .await?? - .ok_or_else(|| InterpretError::from("No public endpoint on primary"))?; - - // 3. Deploy replicas - for replica_site in &self.score.replica_sites { -let replica_topo = topology.replica(); - - .map_err(|e| { - InterpretError::from(format!( - "Replica site {:?} lookup failed: {e}", - replica_site - )) - })?; - - let connection_params: HashMap = [ - ("host".to_string(), public_endpoint.host.clone()), - ("port".to_string(), public_endpoint.port.to_string()), - ("dbname".to_string(), "postgres".to_string()), - ("user".to_string(), "streaming_replica".to_string()), - ("sslmode".to_string(), "verify-ca".to_string()), - ("sslnegotiation".to_string(), "direct".to_string()), - ] - .into_iter() - .collect(); - - let external_cluster = ExternalClusterConfig { - name: "primary-cluster".to_string(), - connection_parameters: connection_params, - }; - - let replica_config_struct = ReplicaConfig { - primary_cluster_name: primary_cluster_name.clone(), - replication_certs: certs.clone(), - bootstrap: BootstrapConfig { - strategy: BootstrapStrategy::PgBasebackup, - }, - external_cluster, - }; - - let replica_config = PostgreSQLConfig { - cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site), - instances: self.score.instances, - storage_size: self.score.storage_size.clone(), - role: ClusterRole::Replica(replica_config_struct), - }; - - let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| { - InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site)) - })?; - } - - Ok(Outcome::success(format!( - "Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas", - self.score.cluster_name, - primary_cluster_name, - self.score.replica_sites.len() - ))) - } -}