wip(failover): Started implementation of the FailoverTopology with PostgreSQL capability
Some checks failed
Run Check Script / check (pull_request) Failing after 45s
Some checks failed
Run Check Script / check (pull_request) Failing after 45s
This is our first Higher Order Topology (see ADR-015)
This commit is contained in:
@@ -1,20 +1,10 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
|
||||||
use log::{debug, info};
|
use crate::topology::{PreparationError, PreparationOutcome, Topology};
|
||||||
use std::collections::HashMap;
|
|
||||||
|
|
||||||
use crate::{
|
|
||||||
modules::postgresql::capability::{
|
|
||||||
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
|
|
||||||
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
|
|
||||||
ReplicationCerts,
|
|
||||||
},
|
|
||||||
topology::{PreparationError, PreparationOutcome, Topology},
|
|
||||||
};
|
|
||||||
|
|
||||||
pub struct FailoverTopology<T> {
|
pub struct FailoverTopology<T> {
|
||||||
primary: T,
|
pub primary: T,
|
||||||
replica: T,
|
pub replica: T,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@@ -27,115 +17,3 @@ impl<T: Send + Sync> Topology for FailoverTopology<T> {
|
|||||||
todo!()
|
todo!()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
|
|
||||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
|
||||||
info!(
|
|
||||||
"Starting deployment of failover topology '{}'",
|
|
||||||
config.cluster_name
|
|
||||||
);
|
|
||||||
|
|
||||||
let primary_config = PostgreSQLConfig {
|
|
||||||
cluster_name: config.cluster_name.clone(),
|
|
||||||
instances: config.instances,
|
|
||||||
storage_size: config.storage_size.clone(),
|
|
||||||
role: PostgreSQLClusterRole::Primary,
|
|
||||||
};
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
|
|
||||||
primary_config.cluster_name, primary_config.storage_size
|
|
||||||
);
|
|
||||||
|
|
||||||
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
|
|
||||||
|
|
||||||
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
|
|
||||||
|
|
||||||
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
|
|
||||||
|
|
||||||
let certs = self
|
|
||||||
.primary
|
|
||||||
.get_replication_certs(&primary_cluster_name)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
info!("Replication certificates retrieved successfully");
|
|
||||||
|
|
||||||
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
|
|
||||||
|
|
||||||
let endpoint = self
|
|
||||||
.primary
|
|
||||||
.get_public_endpoint(&primary_cluster_name)
|
|
||||||
.await?
|
|
||||||
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Public endpoint '{}:{}' retrieved for primary",
|
|
||||||
endpoint.host, endpoint.port
|
|
||||||
);
|
|
||||||
|
|
||||||
info!("Configuring replica connection parameters and bootstrap");
|
|
||||||
|
|
||||||
let mut connection_parameters = HashMap::new();
|
|
||||||
connection_parameters.insert("host".to_string(), endpoint.host);
|
|
||||||
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
|
|
||||||
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
|
|
||||||
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
|
|
||||||
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
|
|
||||||
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
|
|
||||||
|
|
||||||
debug!("Replica connection parameters: {:?}", connection_parameters);
|
|
||||||
|
|
||||||
let external_cluster = ExternalClusterConfig {
|
|
||||||
name: primary_cluster_name.clone(),
|
|
||||||
connection_parameters,
|
|
||||||
};
|
|
||||||
|
|
||||||
let bootstrap_config = BootstrapConfig {
|
|
||||||
strategy: BootstrapStrategy::PgBasebackup,
|
|
||||||
};
|
|
||||||
|
|
||||||
let replica_cluster_config = ReplicaConfig {
|
|
||||||
primary_cluster_name: primary_cluster_name.clone(),
|
|
||||||
replication_certs: certs,
|
|
||||||
bootstrap: bootstrap_config,
|
|
||||||
external_cluster,
|
|
||||||
};
|
|
||||||
|
|
||||||
let replica_config = PostgreSQLConfig {
|
|
||||||
cluster_name: format!("{}-replica", primary_cluster_name),
|
|
||||||
instances: config.instances,
|
|
||||||
storage_size: config.storage_size.clone(),
|
|
||||||
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
|
|
||||||
};
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
|
|
||||||
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
|
|
||||||
);
|
|
||||||
|
|
||||||
self.replica.deploy(&replica_config).await?;
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
|
|
||||||
replica_config.cluster_name, config.cluster_name
|
|
||||||
);
|
|
||||||
|
|
||||||
Ok(primary_cluster_name)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
|
||||||
self.primary.get_replication_certs(cluster_name).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
|
|
||||||
self.primary.get_endpoint(cluster_name).await
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_public_endpoint(
|
|
||||||
&self,
|
|
||||||
cluster_name: &str,
|
|
||||||
) -> Result<Option<PostgreSQLEndpoint>, String> {
|
|
||||||
self.primary.get_public_endpoint(cluster_name).await
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
@@ -1,9 +1,10 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use harmony_types::storage::StorageSize;
|
use harmony_types::storage::StorageSize;
|
||||||
|
use serde::Serialize;
|
||||||
use std::collections::HashMap;
|
use std::collections::HashMap;
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
pub trait PostgreSQL {
|
pub trait PostgreSQL: Send + Sync {
|
||||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
|
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
|
||||||
|
|
||||||
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
|
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
|
||||||
@@ -20,7 +21,7 @@ pub trait PostgreSQL {
|
|||||||
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
|
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct PostgreSQLConfig {
|
pub struct PostgreSQLConfig {
|
||||||
pub cluster_name: String,
|
pub cluster_name: String,
|
||||||
pub instances: u32,
|
pub instances: u32,
|
||||||
@@ -28,13 +29,13 @@ pub struct PostgreSQLConfig {
|
|||||||
pub role: PostgreSQLClusterRole,
|
pub role: PostgreSQLClusterRole,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub enum PostgreSQLClusterRole {
|
pub enum PostgreSQLClusterRole {
|
||||||
Primary,
|
Primary,
|
||||||
Replica(ReplicaClusterConfig),
|
Replica(ReplicaConfig),
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct ReplicaConfig {
|
pub struct ReplicaConfig {
|
||||||
/// Name of the primary cluster this replica will sync from
|
/// Name of the primary cluster this replica will sync from
|
||||||
pub primary_cluster_name: String,
|
pub primary_cluster_name: String,
|
||||||
@@ -46,17 +47,17 @@ pub struct ReplicaConfig {
|
|||||||
pub external_cluster: ExternalClusterConfig,
|
pub external_cluster: ExternalClusterConfig,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct BootstrapConfig {
|
pub struct BootstrapConfig {
|
||||||
pub strategy: BootstrapStrategy,
|
pub strategy: BootstrapStrategy,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub enum BootstrapStrategy {
|
pub enum BootstrapStrategy {
|
||||||
PgBasebackup,
|
PgBasebackup,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct ExternalClusterConfig {
|
pub struct ExternalClusterConfig {
|
||||||
/// Name used in CNPG externalClusters list
|
/// Name used in CNPG externalClusters list
|
||||||
pub name: String,
|
pub name: String,
|
||||||
@@ -64,7 +65,7 @@ pub struct ExternalClusterConfig {
|
|||||||
pub connection_parameters: HashMap<String, String>,
|
pub connection_parameters: HashMap<String, String>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct ReplicationCerts {
|
pub struct ReplicationCerts {
|
||||||
/// PEM-encoded CA cert from primary
|
/// PEM-encoded CA cert from primary
|
||||||
pub ca_cert_pem: String,
|
pub ca_cert_pem: String,
|
||||||
|
|||||||
125
harmony/src/modules/postgresql/failover.rs
Normal file
125
harmony/src/modules/postgresql/failover.rs
Normal file
@@ -0,0 +1,125 @@
|
|||||||
|
use async_trait::async_trait;
|
||||||
|
use log::debug;
|
||||||
|
use log::info;
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
modules::postgresql::capability::{
|
||||||
|
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
|
||||||
|
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
|
||||||
|
ReplicationCerts,
|
||||||
|
},
|
||||||
|
topology::FailoverTopology,
|
||||||
|
};
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
|
||||||
|
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
||||||
|
info!(
|
||||||
|
"Starting deployment of failover topology '{}'",
|
||||||
|
config.cluster_name
|
||||||
|
);
|
||||||
|
|
||||||
|
let primary_config = PostgreSQLConfig {
|
||||||
|
cluster_name: config.cluster_name.clone(),
|
||||||
|
instances: config.instances,
|
||||||
|
storage_size: config.storage_size.clone(),
|
||||||
|
role: PostgreSQLClusterRole::Primary,
|
||||||
|
};
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
|
||||||
|
primary_config.cluster_name, primary_config.storage_size
|
||||||
|
);
|
||||||
|
|
||||||
|
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
|
||||||
|
|
||||||
|
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
|
||||||
|
|
||||||
|
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
|
||||||
|
|
||||||
|
let certs = self
|
||||||
|
.primary
|
||||||
|
.get_replication_certs(&primary_cluster_name)
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
info!("Replication certificates retrieved successfully");
|
||||||
|
|
||||||
|
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
|
||||||
|
|
||||||
|
let endpoint = self
|
||||||
|
.primary
|
||||||
|
.get_public_endpoint(&primary_cluster_name)
|
||||||
|
.await?
|
||||||
|
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Public endpoint '{}:{}' retrieved for primary",
|
||||||
|
endpoint.host, endpoint.port
|
||||||
|
);
|
||||||
|
|
||||||
|
info!("Configuring replica connection parameters and bootstrap");
|
||||||
|
|
||||||
|
let mut connection_parameters = HashMap::new();
|
||||||
|
connection_parameters.insert("host".to_string(), endpoint.host);
|
||||||
|
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
|
||||||
|
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
|
||||||
|
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
|
||||||
|
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
|
||||||
|
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
|
||||||
|
|
||||||
|
debug!("Replica connection parameters: {:?}", connection_parameters);
|
||||||
|
|
||||||
|
let external_cluster = ExternalClusterConfig {
|
||||||
|
name: primary_cluster_name.clone(),
|
||||||
|
connection_parameters,
|
||||||
|
};
|
||||||
|
|
||||||
|
let bootstrap_config = BootstrapConfig {
|
||||||
|
strategy: BootstrapStrategy::PgBasebackup,
|
||||||
|
};
|
||||||
|
|
||||||
|
let replica_cluster_config = ReplicaConfig {
|
||||||
|
primary_cluster_name: primary_cluster_name.clone(),
|
||||||
|
replication_certs: certs,
|
||||||
|
bootstrap: bootstrap_config,
|
||||||
|
external_cluster,
|
||||||
|
};
|
||||||
|
|
||||||
|
let replica_config = PostgreSQLConfig {
|
||||||
|
cluster_name: format!("{}-replica", primary_cluster_name),
|
||||||
|
instances: config.instances,
|
||||||
|
storage_size: config.storage_size.clone(),
|
||||||
|
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
|
||||||
|
};
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
|
||||||
|
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
|
||||||
|
);
|
||||||
|
|
||||||
|
self.replica.deploy(&replica_config).await?;
|
||||||
|
|
||||||
|
info!(
|
||||||
|
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
|
||||||
|
replica_config.cluster_name, config.cluster_name
|
||||||
|
);
|
||||||
|
|
||||||
|
Ok(primary_cluster_name)
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||||
|
self.primary.get_replication_certs(cluster_name).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
|
||||||
|
self.primary.get_endpoint(cluster_name).await
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn get_public_endpoint(
|
||||||
|
&self,
|
||||||
|
cluster_name: &str,
|
||||||
|
) -> Result<Option<PostgreSQLEndpoint>, String> {
|
||||||
|
self.primary.get_public_endpoint(cluster_name).await
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -9,13 +9,13 @@ use crate::{
|
|||||||
|
|
||||||
use super::capability::*;
|
use super::capability::*;
|
||||||
|
|
||||||
use derive_new::new;
|
use harmony_types::id::Id;
|
||||||
use harmony_types::{id::Id, storage::StorageSize};
|
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::info;
|
use log::info;
|
||||||
use serde::Serialize;
|
use serde::Serialize;
|
||||||
|
|
||||||
|
#[derive(Clone, Debug, Serialize)]
|
||||||
pub struct PostgreSQLScore {
|
pub struct PostgreSQLScore {
|
||||||
config: PostgreSQLConfig,
|
config: PostgreSQLConfig,
|
||||||
}
|
}
|
||||||
@@ -86,151 +86,3 @@ impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
|
|||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, new, Clone, Serialize)]
|
|
||||||
pub struct MultisitePostgreSQLScore {
|
|
||||||
pub cluster_name: String,
|
|
||||||
pub primary_site: Id,
|
|
||||||
pub replica_sites: Vec<Id>,
|
|
||||||
pub instances: u32,
|
|
||||||
pub storage_size: StorageSize,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
|
|
||||||
|
|
||||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
|
||||||
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn name(&self) -> String {
|
|
||||||
"MultisitePostgreSQLScore".to_string()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct MultisitePostgreSQLInterpret {
|
|
||||||
score: MultisitePostgreSQLScore,
|
|
||||||
version: Version,
|
|
||||||
status: InterpretStatus,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl MultisitePostgreSQLInterpret {
|
|
||||||
pub fn new(score: MultisitePostgreSQLScore) -> Self {
|
|
||||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
|
||||||
Self {
|
|
||||||
score,
|
|
||||||
version,
|
|
||||||
status: InterpretStatus::QUEUED,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
|
|
||||||
fn get_name(&self) -> InterpretName {
|
|
||||||
InterpretName::Custom("MultisitePostgreSQLInterpret")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_version(&self) -> Version {
|
|
||||||
self.version.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_status(&self) -> InterpretStatus {
|
|
||||||
self.status.clone()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_children(&self) -> Vec<Id> {
|
|
||||||
todo!("Track child interprets per site")
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn execute(
|
|
||||||
&self,
|
|
||||||
inventory: &Inventory,
|
|
||||||
topology: &T,
|
|
||||||
) -> Result<Outcome, InterpretError> {
|
|
||||||
|
|
||||||
info!(
|
|
||||||
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
|
|
||||||
self.score.primary_site, self.score.replica_sites
|
|
||||||
);
|
|
||||||
|
|
||||||
// 1. Deploy primary
|
|
||||||
let primary_topo = topology.primary();
|
|
||||||
|
|
||||||
let primary_config = PostgreSQLConfig {
|
|
||||||
cluster_name: self.score.cluster_name.clone(),
|
|
||||||
instances: self.score.instances,
|
|
||||||
storage_size: self.score.storage_size.clone(),
|
|
||||||
role: ClusterRole::Primary,
|
|
||||||
};
|
|
||||||
let primary_cluster_name = primary_topo
|
|
||||||
.deploy(&primary_config)
|
|
||||||
.await
|
|
||||||
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
|
|
||||||
|
|
||||||
// 2. Extract certs & public endpoint from primary
|
|
||||||
let certs = primary_topo
|
|
||||||
.get_replication_certs(&primary_cluster_name)
|
|
||||||
.await
|
|
||||||
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
|
|
||||||
let public_endpoint = primary_topo
|
|
||||||
.get_public_endpoint(&primary_cluster_name)
|
|
||||||
.await??
|
|
||||||
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
|
|
||||||
|
|
||||||
// 3. Deploy replicas
|
|
||||||
for replica_site in &self.score.replica_sites {
|
|
||||||
let replica_topo = topology.replica();
|
|
||||||
|
|
||||||
.map_err(|e| {
|
|
||||||
InterpretError::from(format!(
|
|
||||||
"Replica site {:?} lookup failed: {e}",
|
|
||||||
replica_site
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let connection_params: HashMap<String, String> = [
|
|
||||||
("host".to_string(), public_endpoint.host.clone()),
|
|
||||||
("port".to_string(), public_endpoint.port.to_string()),
|
|
||||||
("dbname".to_string(), "postgres".to_string()),
|
|
||||||
("user".to_string(), "streaming_replica".to_string()),
|
|
||||||
("sslmode".to_string(), "verify-ca".to_string()),
|
|
||||||
("sslnegotiation".to_string(), "direct".to_string()),
|
|
||||||
]
|
|
||||||
.into_iter()
|
|
||||||
.collect();
|
|
||||||
|
|
||||||
let external_cluster = ExternalClusterConfig {
|
|
||||||
name: "primary-cluster".to_string(),
|
|
||||||
connection_parameters: connection_params,
|
|
||||||
};
|
|
||||||
|
|
||||||
let replica_config_struct = ReplicaConfig {
|
|
||||||
primary_cluster_name: primary_cluster_name.clone(),
|
|
||||||
replication_certs: certs.clone(),
|
|
||||||
bootstrap: BootstrapConfig {
|
|
||||||
strategy: BootstrapStrategy::PgBasebackup,
|
|
||||||
},
|
|
||||||
external_cluster,
|
|
||||||
};
|
|
||||||
|
|
||||||
let replica_config = PostgreSQLConfig {
|
|
||||||
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
|
|
||||||
instances: self.score.instances,
|
|
||||||
storage_size: self.score.storage_size.clone(),
|
|
||||||
role: ClusterRole::Replica(replica_config_struct),
|
|
||||||
};
|
|
||||||
|
|
||||||
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
|
|
||||||
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
|
|
||||||
})?;
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(Outcome::success(format!(
|
|
||||||
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
|
|
||||||
self.score.cluster_name,
|
|
||||||
primary_cluster_name,
|
|
||||||
self.score.replica_sites.len()
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|||||||
Reference in New Issue
Block a user