Compare commits
1 Commits
feat/multi
...
feat/unshe
| Author | SHA1 | Date | |
|---|---|---|---|
| f87e223d75 |
@@ -1,105 +0,0 @@
|
||||
# Design Document: Harmony PostgreSQL Module
|
||||
|
||||
**Status:** Draft
|
||||
**Last Updated:** 2025-12-01
|
||||
**Context:** Multi-site Data Replication & Orchestration
|
||||
|
||||
## 1. Overview
|
||||
|
||||
The Harmony PostgreSQL Module provides a high-level abstraction for deploying and managing high-availability PostgreSQL clusters across geographically distributed Kubernetes/OKD sites.
|
||||
|
||||
Instead of manually configuring complex replication slots, firewalls, and operator settings on each cluster, users define a single intent (a **Score**), and Harmony orchestrates the underlying infrastructure (the **Arrangement**) to establish a Primary-Replica architecture.
|
||||
|
||||
Currently, the implementation relies on the **CloudNativePG (CNPG)** operator as the backing engine.
|
||||
|
||||
## 2. Architecture
|
||||
|
||||
### 2.1 The Abstraction Model
|
||||
Following **ADR 003 (Infrastructure Abstraction)**, Harmony separates the *intent* from the *implementation*.
|
||||
|
||||
1. **The Score (Intent):** The user defines a `MultisitePostgreSQL` resource. This describes *what* is needed (e.g., "A Postgres 15 cluster with 10GB storage, Primary on Site A, Replica on Site B").
|
||||
2. **The Interpret (Action):** Harmony MultisitePostgreSQLInterpret processes this Score and orchestrates the deployment on both sites to reach the state defined in the Score.
|
||||
3. **The Capability (Implementation):** The PostgreSQL Capability is implemented by the K8sTopology and the interpret can deploy it, configure it and fetch information about it. The concrete implementation will rely on the mature CloudnativePG operator to manage all the Kubernetes resources required.
|
||||
|
||||
### 2.2 Network Connectivity (TLS Passthrough)
|
||||
|
||||
One of the critical challenges in multi-site orchestration is secure connectivity between clusters that may have dynamic IPs or strict firewalls.
|
||||
|
||||
To solve this, we utilize **OKD/OpenShift Routes with TLS Passthrough**.
|
||||
|
||||
* **Mechanism:** The Primary site exposes a `Route` configured for `termination: passthrough`.
|
||||
* **Routing:** The OpenShift HAProxy router inspects the **SNI (Server Name Indication)** header of the incoming TCP connection to route traffic to the correct PostgreSQL Pod.
|
||||
* **Security:** SSL is **not** terminated at the ingress router. The encrypted stream is passed directly to the PostgreSQL instance. Mutual TLS (mTLS) authentication is handled natively by CNPG between the Primary and Replica instances.
|
||||
* **Dynamic IPs:** Because connections are established via DNS hostnames (the Route URL), this architecture is resilient to dynamic IP changes at the Primary site.
|
||||
|
||||
#### Traffic Flow Diagram
|
||||
|
||||
```text
|
||||
[ Site B: Replica ] [ Site A: Primary ]
|
||||
| |
|
||||
(CNPG Instance) --[Encrypted TCP]--> (OKD HAProxy Router)
|
||||
| (Port 443) |
|
||||
| |
|
||||
| [SNI Inspection]
|
||||
| |
|
||||
| v
|
||||
| (PostgreSQL Primary Pod)
|
||||
| (Port 5432)
|
||||
```
|
||||
|
||||
## 3. Design Decisions
|
||||
|
||||
### Why CloudNativePG?
|
||||
We selected CloudNativePG because it relies exclusively on standard Kubernetes primitives and uses the native PostgreSQL replication protocol (WAL shipping/Streaming). This aligns with Harmony's goal of being "K8s Native."
|
||||
|
||||
### Why TLS Passthrough instead of VPN/NodePort?
|
||||
* **NodePort:** Requires static IPs and opening non-standard ports on the firewall, which violates our security constraints.
|
||||
* **VPN (e.g., Wireguard/Tailscale):** While secure, it introduces significant complexity (sidecars, key management) and external dependencies.
|
||||
* **TLS Passthrough:** Leverages the existing Ingress/Router infrastructure already present in OKD. It requires zero additional software and respects multi-tenancy (Routes are namespaced).
|
||||
|
||||
### Configuration Philosophy (YAGNI)
|
||||
The current design exposes a **generic configuration surface**. Users can configure standard parameters (Storage size, CPU/Memory requests, Postgres version).
|
||||
|
||||
**We explicitly do not expose advanced CNPG or PostgreSQL configurations at this stage.**
|
||||
|
||||
* **Reasoning:** We aim to keep the API surface small and manageable.
|
||||
* **Future Path:** We plan to implement a "pass-through" mechanism to allow sending raw config maps or custom parameters to the underlying engine (CNPG) *only when a concrete use case arises*. Until then, we adhere to the **YAGNI (You Ain't Gonna Need It)** principle to avoid premature optimization and API bloat.
|
||||
|
||||
## 4. Usage Guide
|
||||
|
||||
To deploy a multi-site cluster, apply the `MultisitePostgreSQL` resource to the Harmony Control Plane.
|
||||
|
||||
### Example Manifest
|
||||
|
||||
```yaml
|
||||
apiVersion: harmony.io/v1alpha1
|
||||
kind: MultisitePostgreSQL
|
||||
metadata:
|
||||
name: finance-db
|
||||
namespace: tenant-a
|
||||
spec:
|
||||
version: "15"
|
||||
storage: "10Gi"
|
||||
resources:
|
||||
requests:
|
||||
cpu: "500m"
|
||||
memory: "1Gi"
|
||||
|
||||
# Topology Definition
|
||||
topology:
|
||||
primary:
|
||||
site: "site-paris" # The name of the cluster in Harmony
|
||||
replicas:
|
||||
- site: "site-newyork"
|
||||
```
|
||||
|
||||
### What happens next?
|
||||
1. Harmony detects the CR.
|
||||
2. **On Site Paris:** It deploys a CNPG Cluster (Primary) and creates a Passthrough Route `postgres-finance-db.apps.site-paris.example.com`.
|
||||
3. **On Site New York:** It deploys a CNPG Cluster (Replica) configured with `externalClusters` pointing to the Paris Route.
|
||||
4. Data begins replicating immediately over the encrypted channel.
|
||||
|
||||
## 5. Troubleshooting
|
||||
|
||||
* **Connection Refused:** Ensure the Primary site's Route is successfully admitted by the Ingress Controller.
|
||||
* **Certificate Errors:** CNPG manages mTLS automatically. If errors persist, ensure the CA secrets were correctly propagated by Harmony from Primary to Replica namespaces.
|
||||
@@ -1,141 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use log::{debug, info};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{
|
||||
modules::postgresql::capability::{
|
||||
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
|
||||
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
|
||||
ReplicationCerts,
|
||||
},
|
||||
topology::{PreparationError, PreparationOutcome, Topology},
|
||||
};
|
||||
|
||||
pub struct FailoverTopology<T> {
|
||||
primary: T,
|
||||
replica: T,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Send + Sync> Topology for FailoverTopology<T> {
|
||||
fn name(&self) -> &str {
|
||||
"FailoverTopology"
|
||||
}
|
||||
|
||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
||||
info!(
|
||||
"Starting deployment of failover topology '{}'",
|
||||
config.cluster_name
|
||||
);
|
||||
|
||||
let primary_config = PostgreSQLConfig {
|
||||
cluster_name: config.cluster_name.clone(),
|
||||
instances: config.instances,
|
||||
storage_size: config.storage_size.clone(),
|
||||
role: PostgreSQLClusterRole::Primary,
|
||||
};
|
||||
|
||||
info!(
|
||||
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
|
||||
primary_config.cluster_name, primary_config.storage_size
|
||||
);
|
||||
|
||||
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
|
||||
|
||||
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
|
||||
|
||||
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
|
||||
|
||||
let certs = self
|
||||
.primary
|
||||
.get_replication_certs(&primary_cluster_name)
|
||||
.await?;
|
||||
|
||||
info!("Replication certificates retrieved successfully");
|
||||
|
||||
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
|
||||
|
||||
let endpoint = self
|
||||
.primary
|
||||
.get_public_endpoint(&primary_cluster_name)
|
||||
.await?
|
||||
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
|
||||
|
||||
info!(
|
||||
"Public endpoint '{}:{}' retrieved for primary",
|
||||
endpoint.host, endpoint.port
|
||||
);
|
||||
|
||||
info!("Configuring replica connection parameters and bootstrap");
|
||||
|
||||
let mut connection_parameters = HashMap::new();
|
||||
connection_parameters.insert("host".to_string(), endpoint.host);
|
||||
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
|
||||
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
|
||||
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
|
||||
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
|
||||
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
|
||||
|
||||
debug!("Replica connection parameters: {:?}", connection_parameters);
|
||||
|
||||
let external_cluster = ExternalClusterConfig {
|
||||
name: primary_cluster_name.clone(),
|
||||
connection_parameters,
|
||||
};
|
||||
|
||||
let bootstrap_config = BootstrapConfig {
|
||||
strategy: BootstrapStrategy::PgBasebackup,
|
||||
};
|
||||
|
||||
let replica_cluster_config = ReplicaConfig {
|
||||
primary_cluster_name: primary_cluster_name.clone(),
|
||||
replication_certs: certs,
|
||||
bootstrap: bootstrap_config,
|
||||
external_cluster,
|
||||
};
|
||||
|
||||
let replica_config = PostgreSQLConfig {
|
||||
cluster_name: format!("{}-replica", primary_cluster_name),
|
||||
instances: config.instances,
|
||||
storage_size: config.storage_size.clone(),
|
||||
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
|
||||
};
|
||||
|
||||
info!(
|
||||
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
|
||||
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
|
||||
);
|
||||
|
||||
self.replica.deploy(&replica_config).await?;
|
||||
|
||||
info!(
|
||||
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
|
||||
replica_config.cluster_name, config.cluster_name
|
||||
);
|
||||
|
||||
Ok(primary_cluster_name)
|
||||
}
|
||||
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||
self.primary.get_replication_certs(cluster_name).await
|
||||
}
|
||||
|
||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
|
||||
self.primary.get_endpoint(cluster_name).await
|
||||
}
|
||||
|
||||
async fn get_public_endpoint(
|
||||
&self,
|
||||
cluster_name: &str,
|
||||
) -> Result<Option<PostgreSQLEndpoint>, String> {
|
||||
self.primary.get_public_endpoint(cluster_name).await
|
||||
}
|
||||
}
|
||||
@@ -27,7 +27,7 @@ use kube::{
|
||||
};
|
||||
use log::{debug, error, trace, warn};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use serde_json::json;
|
||||
use serde_json::{json, Value};
|
||||
use similar::TextDiff;
|
||||
use tokio::{io::AsyncReadExt, time::sleep};
|
||||
use url::Url;
|
||||
@@ -64,6 +64,10 @@ impl K8sClient {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn patch_resource(&self, patch: Value, gvk: &GroupVersionKind) -> Result<(), Error> {
|
||||
|
||||
}
|
||||
|
||||
pub async fn service_account_api(&self, namespace: &str) -> Api<ServiceAccount> {
|
||||
let api: Api<ServiceAccount> = Api::namespaced(self.client.clone(), namespace);
|
||||
api
|
||||
|
||||
@@ -1,7 +1,5 @@
|
||||
mod ha_cluster;
|
||||
pub mod ingress;
|
||||
mod failover;
|
||||
pub use failover::*;
|
||||
use harmony_types::net::IpAddress;
|
||||
mod host_binding;
|
||||
mod http;
|
||||
|
||||
@@ -17,4 +17,3 @@ pub mod prometheus;
|
||||
pub mod storage;
|
||||
pub mod tenant;
|
||||
pub mod tftp;
|
||||
pub mod postgresql;
|
||||
|
||||
87
harmony/src/modules/okd/control_plane.rs
Normal file
87
harmony/src/modules/okd/control_plane.rs
Normal file
@@ -0,0 +1,87 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use kube::api::GroupVersionKind;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ControlPlaneConfig {}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for ControlPlaneConfig {
|
||||
fn name(&self) -> String {
|
||||
"ControlPlaneConfig".to_string()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct ControlPlaneConfigInterpret {
|
||||
score: ControlPlaneConfig,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for ControlPlaneConfigInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let client = topology.k8s_client().await.unwrap();
|
||||
self.control_plane_unschedulable(&client).await
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl ControlPlaneConfigInterpret {
|
||||
async fn control_plane_unschedulable(
|
||||
&self,
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let patch = json!({
|
||||
"spec": {
|
||||
"mastersSchedulable": false
|
||||
}
|
||||
});
|
||||
|
||||
let resource = GroupVersionKind {
|
||||
group: "config.openshift.io".to_string(),
|
||||
version: "v1".to_string(),
|
||||
kind: "Scheduler".to_string(),
|
||||
};
|
||||
|
||||
client.patch_resource(patch, &resource).await?;
|
||||
|
||||
Ok(Outcome::success(
|
||||
"control planes are no longer schedulable".to_string(),
|
||||
))
|
||||
}
|
||||
}
|
||||
@@ -1,81 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::storage::StorageSize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[async_trait]
|
||||
pub trait PostgreSQL {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
|
||||
|
||||
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
|
||||
/// Abstracts away storage/retrieval details (e.g., secrets, files).
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
|
||||
|
||||
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
|
||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
|
||||
|
||||
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
|
||||
/// Returns None if no public endpoint (internal-only cluster).
|
||||
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
|
||||
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
|
||||
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PostgreSQLConfig {
|
||||
pub cluster_name: String,
|
||||
pub instances: u32,
|
||||
pub storage_size: StorageSize,
|
||||
pub role: PostgreSQLClusterRole,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum PostgreSQLClusterRole {
|
||||
Primary,
|
||||
Replica(ReplicaClusterConfig),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReplicaConfig {
|
||||
/// Name of the primary cluster this replica will sync from
|
||||
pub primary_cluster_name: String,
|
||||
/// Certs extracted from primary via Topology::get_replication_certs()
|
||||
pub replication_certs: ReplicationCerts,
|
||||
/// Bootstrap method (e.g., pg_basebackup from primary)
|
||||
pub bootstrap: BootstrapConfig,
|
||||
/// External cluster connection details for CNPG spec.externalClusters
|
||||
pub external_cluster: ExternalClusterConfig,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BootstrapConfig {
|
||||
pub strategy: BootstrapStrategy,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum BootstrapStrategy {
|
||||
PgBasebackup,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ExternalClusterConfig {
|
||||
/// Name used in CNPG externalClusters list
|
||||
pub name: String,
|
||||
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
|
||||
pub connection_parameters: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReplicationCerts {
|
||||
/// PEM-encoded CA cert from primary
|
||||
pub ca_cert_pem: String,
|
||||
/// PEM-encoded streaming_replica client cert (tls.crt)
|
||||
pub streaming_replica_cert_pem: String,
|
||||
/// PEM-encoded streaming_replica client key (tls.key)
|
||||
pub streaming_replica_key_pem: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PostgreSQLEndpoint {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
|
||||
pub mod capability;
|
||||
mod score;
|
||||
|
||||
|
||||
pub mod failover;
|
||||
|
||||
@@ -1,236 +0,0 @@
|
||||
use crate::{
|
||||
domain::{data::Version, interpret::InterpretStatus},
|
||||
interpret::{Interpret, InterpretError, InterpretName, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::postgresql::capability::PostgreSQL,
|
||||
score::Score,
|
||||
topology::Topology,
|
||||
};
|
||||
|
||||
use super::capability::*;
|
||||
|
||||
use derive_new::new;
|
||||
use harmony_types::{id::Id, storage::StorageSize};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::info;
|
||||
use serde::Serialize;
|
||||
|
||||
pub struct PostgreSQLScore {
|
||||
config: PostgreSQLConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
version: Version,
|
||||
status: InterpretStatus,
|
||||
}
|
||||
|
||||
impl PostgreSQLInterpret {
|
||||
pub fn new(config: PostgreSQLConfig) -> Self {
|
||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
||||
Self {
|
||||
config,
|
||||
version,
|
||||
status: InterpretStatus::QUEUED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
|
||||
fn name(&self) -> String {
|
||||
"PostgreSQLScore".to_string()
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(PostgreSQLInterpret::new(self.config.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("PostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> crate::domain::data::Version {
|
||||
self.version.clone()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
self.status.clone()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
"Executing PostgreSQLInterpret with config {:?}",
|
||||
self.config
|
||||
);
|
||||
|
||||
let cluster_name = topology
|
||||
.deploy(&self.config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(e))?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Deployed PostgreSQL cluster `{cluster_name}`"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, new, Clone, Serialize)]
|
||||
pub struct MultisitePostgreSQLScore {
|
||||
pub cluster_name: String,
|
||||
pub primary_site: Id,
|
||||
pub replica_sites: Vec<Id>,
|
||||
pub instances: u32,
|
||||
pub storage_size: StorageSize,
|
||||
}
|
||||
|
||||
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
"MultisitePostgreSQLScore".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MultisitePostgreSQLInterpret {
|
||||
score: MultisitePostgreSQLScore,
|
||||
version: Version,
|
||||
status: InterpretStatus,
|
||||
}
|
||||
|
||||
impl MultisitePostgreSQLInterpret {
|
||||
pub fn new(score: MultisitePostgreSQLScore) -> Self {
|
||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
||||
Self {
|
||||
score,
|
||||
version,
|
||||
status: InterpretStatus::QUEUED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("MultisitePostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
self.version.clone()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
self.status.clone()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!("Track child interprets per site")
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
|
||||
info!(
|
||||
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
|
||||
self.score.primary_site, self.score.replica_sites
|
||||
);
|
||||
|
||||
// 1. Deploy primary
|
||||
let primary_topo = topology.primary();
|
||||
|
||||
let primary_config = PostgreSQLConfig {
|
||||
cluster_name: self.score.cluster_name.clone(),
|
||||
instances: self.score.instances,
|
||||
storage_size: self.score.storage_size.clone(),
|
||||
role: ClusterRole::Primary,
|
||||
};
|
||||
let primary_cluster_name = primary_topo
|
||||
.deploy(&primary_config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
|
||||
|
||||
// 2. Extract certs & public endpoint from primary
|
||||
let certs = primary_topo
|
||||
.get_replication_certs(&primary_cluster_name)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
|
||||
let public_endpoint = primary_topo
|
||||
.get_public_endpoint(&primary_cluster_name)
|
||||
.await??
|
||||
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
|
||||
|
||||
// 3. Deploy replicas
|
||||
for replica_site in &self.score.replica_sites {
|
||||
let replica_topo = topology.replica();
|
||||
|
||||
.map_err(|e| {
|
||||
InterpretError::from(format!(
|
||||
"Replica site {:?} lookup failed: {e}",
|
||||
replica_site
|
||||
))
|
||||
})?;
|
||||
|
||||
let connection_params: HashMap<String, String> = [
|
||||
("host".to_string(), public_endpoint.host.clone()),
|
||||
("port".to_string(), public_endpoint.port.to_string()),
|
||||
("dbname".to_string(), "postgres".to_string()),
|
||||
("user".to_string(), "streaming_replica".to_string()),
|
||||
("sslmode".to_string(), "verify-ca".to_string()),
|
||||
("sslnegotiation".to_string(), "direct".to_string()),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let external_cluster = ExternalClusterConfig {
|
||||
name: "primary-cluster".to_string(),
|
||||
connection_parameters: connection_params,
|
||||
};
|
||||
|
||||
let replica_config_struct = ReplicaConfig {
|
||||
primary_cluster_name: primary_cluster_name.clone(),
|
||||
replication_certs: certs.clone(),
|
||||
bootstrap: BootstrapConfig {
|
||||
strategy: BootstrapStrategy::PgBasebackup,
|
||||
},
|
||||
external_cluster,
|
||||
};
|
||||
|
||||
let replica_config = PostgreSQLConfig {
|
||||
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
|
||||
instances: self.score.instances,
|
||||
storage_size: self.score.storage_size.clone(),
|
||||
role: ClusterRole::Replica(replica_config_struct),
|
||||
};
|
||||
|
||||
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
|
||||
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
|
||||
self.score.cluster_name,
|
||||
primary_cluster_name,
|
||||
self.score.replica_sites.len()
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod id;
|
||||
pub mod net;
|
||||
pub mod switch;
|
||||
pub mod storage;
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
|
||||
pub struct StorageSize {
|
||||
size_bytes: u64,
|
||||
}
|
||||
Reference in New Issue
Block a user