Compare commits
3 Commits
feat/multi
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
| 9617e1cfde | |||
| a953284386 | |||
| bfde5f58ed |
114
adr/015-higher-order-topologies.md
Normal file
114
adr/015-higher-order-topologies.md
Normal file
@@ -0,0 +1,114 @@
|
||||
# Architecture Decision Record: Higher-Order Topologies
|
||||
|
||||
**Initial Author:** Jean-Gabriel Gill-Couture
|
||||
**Initial Date:** 2025-12-08
|
||||
**Last Updated Date:** 2025-12-08
|
||||
|
||||
## Status
|
||||
|
||||
Implemented
|
||||
|
||||
## Context
|
||||
|
||||
Harmony models infrastructure as **Topologies** (deployment targets like `K8sAnywhereTopology`, `LinuxHostTopology`) implementing **Capabilities** (tech traits like `PostgreSQL`, `Docker`).
|
||||
|
||||
**Higher-Order Topologies** (e.g., `FailoverTopology<T>`) compose/orchestrate capabilities *across* multiple underlying topologies (e.g., primary+replica `T`).
|
||||
|
||||
Naive design requires manual `impl Capability for HigherOrderTopology<T>` *per T per capability*, causing:
|
||||
- **Impl explosion**: N topologies × M capabilities = N×M boilerplate.
|
||||
- **ISP violation**: Topologies forced to impl unrelated capabilities.
|
||||
- **Maintenance hell**: New topology needs impls for *all* orchestrated capabilities; new capability needs impls for *all* topologies/higher-order.
|
||||
- **Barrier to extension**: Users can't easily add topologies without todos/panics.
|
||||
|
||||
This makes scaling Harmony impractical as ecosystem grows.
|
||||
|
||||
## Decision
|
||||
|
||||
Use **blanket trait impls** on higher-order topologies to *automatically* derive orchestration:
|
||||
|
||||
````rust
|
||||
/// Higher-Order Topology: Orchestrates capabilities across sub-topologies.
|
||||
pub struct FailoverTopology<T> {
|
||||
/// Primary sub-topology.
|
||||
primary: T,
|
||||
/// Replica sub-topology.
|
||||
replica: T,
|
||||
}
|
||||
|
||||
/// Automatically provides PostgreSQL failover for *any* `T: PostgreSQL`.
|
||||
/// Delegates to primary for queries; orchestrates deploy across both.
|
||||
#[async_trait]
|
||||
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
||||
// Deploy primary; extract certs/endpoint;
|
||||
// deploy replica with pg_basebackup + TLS passthrough.
|
||||
// (Full impl logged/elaborated.)
|
||||
}
|
||||
|
||||
// Delegate queries to primary.
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||
self.primary.get_replication_certs(cluster_name).await
|
||||
}
|
||||
// ...
|
||||
}
|
||||
|
||||
/// Similarly for other capabilities.
|
||||
#[async_trait]
|
||||
impl<T: Docker> Docker for FailoverTopology<T> {
|
||||
// Failover Docker orchestration.
|
||||
}
|
||||
````
|
||||
|
||||
**Key properties:**
|
||||
- **Auto-derivation**: `Failover<K8sAnywhere>` gets `PostgreSQL` iff `K8sAnywhere: PostgreSQL`.
|
||||
- **No boilerplate**: One blanket impl per capability *per higher-order type*.
|
||||
|
||||
## Rationale
|
||||
|
||||
- **Composition via generics**: Rust trait solver auto-selects impls; zero runtime cost.
|
||||
- **Compile-time safety**: Missing `T: Capability` → compile error (no panics).
|
||||
- **Scalable**: O(capabilities) impls per higher-order; new `T` auto-works.
|
||||
- **ISP-respecting**: Capabilities only surface if sub-topology provides.
|
||||
- **Centralized logic**: Orchestration (e.g., cert propagation) in one place.
|
||||
|
||||
**Example usage:**
|
||||
````rust
|
||||
// ✅ Works: K8sAnywhere: PostgreSQL → Failover provides failover PG
|
||||
let pg_failover: FailoverTopology<K8sAnywhereTopology> = ...;
|
||||
pg_failover.deploy_pg(config).await;
|
||||
|
||||
// ✅ Works: LinuxHost: Docker → Failover provides failover Docker
|
||||
let docker_failover: FailoverTopology<LinuxHostTopology> = ...;
|
||||
docker_failover.deploy_docker(...).await;
|
||||
|
||||
// ❌ Compile fail: K8sAnywhere !: Docker
|
||||
let invalid: FailoverTopology<K8sAnywhereTopology>;
|
||||
invalid.deploy_docker(...); // `T: Docker` bound unsatisfied
|
||||
````
|
||||
|
||||
## Consequences
|
||||
|
||||
**Pros:**
|
||||
- **Extensible**: New topology `AWSTopology: PostgreSQL` → instant `Failover<AWSTopology>: PostgreSQL`.
|
||||
- **Lean**: No useless impls (e.g., no `K8sAnywhere: Docker`).
|
||||
- **Observable**: Logs trace every step.
|
||||
|
||||
**Cons:**
|
||||
- **Monomorphization**: Generics generate code per T (mitigated: few Ts).
|
||||
- **Delegation opacity**: Relies on rustdoc/logs for internals.
|
||||
|
||||
## Alternatives considered
|
||||
|
||||
| Approach | Pros | Cons |
|
||||
|----------|------|------|
|
||||
| **Manual per-T impls**<br>`impl PG for Failover<K8s> {..}`<br>`impl PG for Failover<Linux> {..}` | Explicit control | N×M explosion; violates ISP; hard to extend. |
|
||||
| **Dynamic trait objects**<br>`Box<dyn AnyCapability>` | Runtime flex | Perf hit; type erasure; error-prone dispatch. |
|
||||
| **Mega-topology trait**<br>All-in-one `OrchestratedTopology` | Simple wiring | Monolithic; poor composition. |
|
||||
| **Registry dispatch**<br>Runtime capability lookup | Decoupled | Complex; no compile safety; perf/debug overhead. |
|
||||
|
||||
**Selected**: Blanket impls leverage Rust generics for safe, zero-cost composition.
|
||||
|
||||
## Additional Notes
|
||||
|
||||
- Applies to `MultisiteTopology<T>`, `ShardedTopology<T>`, etc.
|
||||
- `FailoverTopology` in `failover.rs` is first implementation.
|
||||
153
adr/015-higher-order-topologies/example.rs
Normal file
153
adr/015-higher-order-topologies/example.rs
Normal file
@@ -0,0 +1,153 @@
|
||||
//! Example of Higher-Order Topologies in Harmony.
|
||||
//! Demonstrates how `FailoverTopology<T>` automatically provides failover for *any* capability
|
||||
//! supported by a sub-topology `T` via blanket trait impls.
|
||||
//!
|
||||
//! Key insight: No manual impls per T or capability -- scales effortlessly.
|
||||
//! Users can:
|
||||
//! - Write new `Topology` (impl capabilities on a struct).
|
||||
//! - Compose with `FailoverTopology` (gets capabilities if T has them).
|
||||
//! - Compile fails if capability missing (safety).
|
||||
|
||||
use async_trait::async_trait;
|
||||
use tokio;
|
||||
|
||||
/// Capability trait: Deploy and manage PostgreSQL.
|
||||
#[async_trait]
|
||||
pub trait PostgreSQL {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
|
||||
}
|
||||
|
||||
/// Capability trait: Deploy Docker.
|
||||
#[async_trait]
|
||||
pub trait Docker {
|
||||
async fn deploy_docker(&self) -> Result<String, String>;
|
||||
}
|
||||
|
||||
/// Configuration for PostgreSQL deployments.
|
||||
#[derive(Clone)]
|
||||
pub struct PostgreSQLConfig;
|
||||
|
||||
/// Replication certificates.
|
||||
#[derive(Clone)]
|
||||
pub struct ReplicationCerts;
|
||||
|
||||
/// Concrete topology: Kubernetes Anywhere (supports PostgreSQL).
|
||||
#[derive(Clone)]
|
||||
pub struct K8sAnywhereTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl PostgreSQL for K8sAnywhereTopology {
|
||||
async fn deploy(&self, _config: &PostgreSQLConfig) -> Result<String, String> {
|
||||
// Real impl: Use k8s helm chart, operator, etc.
|
||||
Ok("K8sAnywhere PostgreSQL deployed".to_string())
|
||||
}
|
||||
|
||||
async fn get_replication_certs(&self, _cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||
Ok(ReplicationCerts)
|
||||
}
|
||||
}
|
||||
|
||||
/// Concrete topology: Linux Host (supports Docker).
|
||||
#[derive(Clone)]
|
||||
pub struct LinuxHostTopology;
|
||||
|
||||
#[async_trait]
|
||||
impl Docker for LinuxHostTopology {
|
||||
async fn deploy_docker(&self) -> Result<String, String> {
|
||||
// Real impl: Install/configure Docker on host.
|
||||
Ok("LinuxHost Docker deployed".to_string())
|
||||
}
|
||||
}
|
||||
|
||||
/// Higher-Order Topology: Composes multiple sub-topologies (primary + replica).
|
||||
/// Automatically derives *all* capabilities of `T` with failover orchestration.
|
||||
///
|
||||
/// - If `T: PostgreSQL`, then `FailoverTopology<T>: PostgreSQL` (blanket impl).
|
||||
/// - Same for `Docker`, etc. No boilerplate!
|
||||
/// - Compile-time safe: Missing `T: Capability` → error.
|
||||
#[derive(Clone)]
|
||||
pub struct FailoverTopology<T> {
|
||||
/// Primary sub-topology.
|
||||
pub primary: T,
|
||||
/// Replica sub-topology.
|
||||
pub replica: T,
|
||||
}
|
||||
|
||||
/// Blanket impl: Failover PostgreSQL if T provides PostgreSQL.
|
||||
/// Delegates reads to primary; deploys to both.
|
||||
#[async_trait]
|
||||
impl<T: PostgreSQL + Send + Sync + Clone> PostgreSQL for FailoverTopology<T> {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
||||
// Orchestrate: Deploy primary first, then replica (e.g., via pg_basebackup).
|
||||
let primary_result = self.primary.deploy(config).await?;
|
||||
let replica_result = self.replica.deploy(config).await?;
|
||||
Ok(format!("Failover PG deployed: {} | {}", primary_result, replica_result))
|
||||
}
|
||||
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||
// Delegate to primary (replica follows).
|
||||
self.primary.get_replication_certs(cluster_name).await
|
||||
}
|
||||
}
|
||||
|
||||
/// Blanket impl: Failover Docker if T provides Docker.
|
||||
#[async_trait]
|
||||
impl<T: Docker + Send + Sync + Clone> Docker for FailoverTopology<T> {
|
||||
async fn deploy_docker(&self) -> Result<String, String> {
|
||||
// Orchestrate across primary + replica.
|
||||
let primary_result = self.primary.deploy_docker().await?;
|
||||
let replica_result = self.replica.deploy_docker().await?;
|
||||
Ok(format!("Failover Docker deployed: {} | {}", primary_result, replica_result))
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let config = PostgreSQLConfig;
|
||||
|
||||
println!("=== ✅ PostgreSQL Failover (K8sAnywhere supports PG) ===");
|
||||
let pg_failover = FailoverTopology {
|
||||
primary: K8sAnywhereTopology,
|
||||
replica: K8sAnywhereTopology,
|
||||
};
|
||||
let result = pg_failover.deploy(&config).await.unwrap();
|
||||
println!("Result: {}", result);
|
||||
|
||||
println!("\n=== ✅ Docker Failover (LinuxHost supports Docker) ===");
|
||||
let docker_failover = FailoverTopology {
|
||||
primary: LinuxHostTopology,
|
||||
replica: LinuxHostTopology,
|
||||
};
|
||||
let result = docker_failover.deploy_docker().await.unwrap();
|
||||
println!("Result: {}", result);
|
||||
|
||||
println!("\n=== ❌ Would fail to compile (K8sAnywhere !: Docker) ===");
|
||||
// let invalid = FailoverTopology {
|
||||
// primary: K8sAnywhereTopology,
|
||||
// replica: K8sAnywhereTopology,
|
||||
// };
|
||||
// invalid.deploy_docker().await.unwrap(); // Error: `K8sAnywhereTopology: Docker` not satisfied!
|
||||
// Very clear error message :
|
||||
// error[E0599]: the method `deploy_docker` exists for struct `FailoverTopology<K8sAnywhereTopology>`, but its trait bounds were not satisfied
|
||||
// --> src/main.rs:90:9
|
||||
// |
|
||||
// 4 | pub struct FailoverTopology<T> {
|
||||
// | ------------------------------ method `deploy_docker` not found for this struct because it doesn't satisfy `FailoverTopology<K8sAnywhereTopology>: Docker`
|
||||
// ...
|
||||
// 37 | struct K8sAnywhereTopology;
|
||||
// | -------------------------- doesn't satisfy `K8sAnywhereTopology: Docker`
|
||||
// ...
|
||||
// 90 | invalid.deploy_docker(); // `T: Docker` bound unsatisfied
|
||||
// | ^^^^^^^^^^^^^ method cannot be called on `FailoverTopology<K8sAnywhereTopology>` due to unsatisfied trait bounds
|
||||
// |
|
||||
// note: trait bound `K8sAnywhereTopology: Docker` was not satisfied
|
||||
// --> src/main.rs:61:9
|
||||
// |
|
||||
// 61 | impl<T: Docker + Send + Sync> Docker for FailoverTopology<T> {
|
||||
// | ^^^^^^ ------ -------------------
|
||||
// | |
|
||||
// | unsatisfied trait bound introduced here
|
||||
// note: the trait `Docker` must be implemented
|
||||
}
|
||||
|
||||
@@ -1,105 +0,0 @@
|
||||
# Design Document: Harmony PostgreSQL Module
|
||||
|
||||
**Status:** Draft
|
||||
**Last Updated:** 2025-12-01
|
||||
**Context:** Multi-site Data Replication & Orchestration
|
||||
|
||||
## 1. Overview
|
||||
|
||||
The Harmony PostgreSQL Module provides a high-level abstraction for deploying and managing high-availability PostgreSQL clusters across geographically distributed Kubernetes/OKD sites.
|
||||
|
||||
Instead of manually configuring complex replication slots, firewalls, and operator settings on each cluster, users define a single intent (a **Score**), and Harmony orchestrates the underlying infrastructure (the **Arrangement**) to establish a Primary-Replica architecture.
|
||||
|
||||
Currently, the implementation relies on the **CloudNativePG (CNPG)** operator as the backing engine.
|
||||
|
||||
## 2. Architecture
|
||||
|
||||
### 2.1 The Abstraction Model
|
||||
Following **ADR 003 (Infrastructure Abstraction)**, Harmony separates the *intent* from the *implementation*.
|
||||
|
||||
1. **The Score (Intent):** The user defines a `MultisitePostgreSQL` resource. This describes *what* is needed (e.g., "A Postgres 15 cluster with 10GB storage, Primary on Site A, Replica on Site B").
|
||||
2. **The Interpret (Action):** Harmony MultisitePostgreSQLInterpret processes this Score and orchestrates the deployment on both sites to reach the state defined in the Score.
|
||||
3. **The Capability (Implementation):** The PostgreSQL Capability is implemented by the K8sTopology and the interpret can deploy it, configure it and fetch information about it. The concrete implementation will rely on the mature CloudnativePG operator to manage all the Kubernetes resources required.
|
||||
|
||||
### 2.2 Network Connectivity (TLS Passthrough)
|
||||
|
||||
One of the critical challenges in multi-site orchestration is secure connectivity between clusters that may have dynamic IPs or strict firewalls.
|
||||
|
||||
To solve this, we utilize **OKD/OpenShift Routes with TLS Passthrough**.
|
||||
|
||||
* **Mechanism:** The Primary site exposes a `Route` configured for `termination: passthrough`.
|
||||
* **Routing:** The OpenShift HAProxy router inspects the **SNI (Server Name Indication)** header of the incoming TCP connection to route traffic to the correct PostgreSQL Pod.
|
||||
* **Security:** SSL is **not** terminated at the ingress router. The encrypted stream is passed directly to the PostgreSQL instance. Mutual TLS (mTLS) authentication is handled natively by CNPG between the Primary and Replica instances.
|
||||
* **Dynamic IPs:** Because connections are established via DNS hostnames (the Route URL), this architecture is resilient to dynamic IP changes at the Primary site.
|
||||
|
||||
#### Traffic Flow Diagram
|
||||
|
||||
```text
|
||||
[ Site B: Replica ] [ Site A: Primary ]
|
||||
| |
|
||||
(CNPG Instance) --[Encrypted TCP]--> (OKD HAProxy Router)
|
||||
| (Port 443) |
|
||||
| |
|
||||
| [SNI Inspection]
|
||||
| |
|
||||
| v
|
||||
| (PostgreSQL Primary Pod)
|
||||
| (Port 5432)
|
||||
```
|
||||
|
||||
## 3. Design Decisions
|
||||
|
||||
### Why CloudNativePG?
|
||||
We selected CloudNativePG because it relies exclusively on standard Kubernetes primitives and uses the native PostgreSQL replication protocol (WAL shipping/Streaming). This aligns with Harmony's goal of being "K8s Native."
|
||||
|
||||
### Why TLS Passthrough instead of VPN/NodePort?
|
||||
* **NodePort:** Requires static IPs and opening non-standard ports on the firewall, which violates our security constraints.
|
||||
* **VPN (e.g., Wireguard/Tailscale):** While secure, it introduces significant complexity (sidecars, key management) and external dependencies.
|
||||
* **TLS Passthrough:** Leverages the existing Ingress/Router infrastructure already present in OKD. It requires zero additional software and respects multi-tenancy (Routes are namespaced).
|
||||
|
||||
### Configuration Philosophy (YAGNI)
|
||||
The current design exposes a **generic configuration surface**. Users can configure standard parameters (Storage size, CPU/Memory requests, Postgres version).
|
||||
|
||||
**We explicitly do not expose advanced CNPG or PostgreSQL configurations at this stage.**
|
||||
|
||||
* **Reasoning:** We aim to keep the API surface small and manageable.
|
||||
* **Future Path:** We plan to implement a "pass-through" mechanism to allow sending raw config maps or custom parameters to the underlying engine (CNPG) *only when a concrete use case arises*. Until then, we adhere to the **YAGNI (You Ain't Gonna Need It)** principle to avoid premature optimization and API bloat.
|
||||
|
||||
## 4. Usage Guide
|
||||
|
||||
To deploy a multi-site cluster, apply the `MultisitePostgreSQL` resource to the Harmony Control Plane.
|
||||
|
||||
### Example Manifest
|
||||
|
||||
```yaml
|
||||
apiVersion: harmony.io/v1alpha1
|
||||
kind: MultisitePostgreSQL
|
||||
metadata:
|
||||
name: finance-db
|
||||
namespace: tenant-a
|
||||
spec:
|
||||
version: "15"
|
||||
storage: "10Gi"
|
||||
resources:
|
||||
requests:
|
||||
cpu: "500m"
|
||||
memory: "1Gi"
|
||||
|
||||
# Topology Definition
|
||||
topology:
|
||||
primary:
|
||||
site: "site-paris" # The name of the cluster in Harmony
|
||||
replicas:
|
||||
- site: "site-newyork"
|
||||
```
|
||||
|
||||
### What happens next?
|
||||
1. Harmony detects the CR.
|
||||
2. **On Site Paris:** It deploys a CNPG Cluster (Primary) and creates a Passthrough Route `postgres-finance-db.apps.site-paris.example.com`.
|
||||
3. **On Site New York:** It deploys a CNPG Cluster (Replica) configured with `externalClusters` pointing to the Paris Route.
|
||||
4. Data begins replicating immediately over the encrypted channel.
|
||||
|
||||
## 5. Troubleshooting
|
||||
|
||||
* **Connection Refused:** Ensure the Primary site's Route is successfully admitted by the Ingress Controller.
|
||||
* **Certificate Errors:** CNPG manages mTLS automatically. If errors persist, ensure the CA secrets were correctly propagated by Harmony from Primary to Replica namespaces.
|
||||
@@ -1,141 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use log::{debug, info};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{
|
||||
modules::postgresql::capability::{
|
||||
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
|
||||
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
|
||||
ReplicationCerts,
|
||||
},
|
||||
topology::{PreparationError, PreparationOutcome, Topology},
|
||||
};
|
||||
|
||||
pub struct FailoverTopology<T> {
|
||||
primary: T,
|
||||
replica: T,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Send + Sync> Topology for FailoverTopology<T> {
|
||||
fn name(&self) -> &str {
|
||||
"FailoverTopology"
|
||||
}
|
||||
|
||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
||||
info!(
|
||||
"Starting deployment of failover topology '{}'",
|
||||
config.cluster_name
|
||||
);
|
||||
|
||||
let primary_config = PostgreSQLConfig {
|
||||
cluster_name: config.cluster_name.clone(),
|
||||
instances: config.instances,
|
||||
storage_size: config.storage_size.clone(),
|
||||
role: PostgreSQLClusterRole::Primary,
|
||||
};
|
||||
|
||||
info!(
|
||||
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
|
||||
primary_config.cluster_name, primary_config.storage_size
|
||||
);
|
||||
|
||||
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
|
||||
|
||||
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
|
||||
|
||||
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
|
||||
|
||||
let certs = self
|
||||
.primary
|
||||
.get_replication_certs(&primary_cluster_name)
|
||||
.await?;
|
||||
|
||||
info!("Replication certificates retrieved successfully");
|
||||
|
||||
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
|
||||
|
||||
let endpoint = self
|
||||
.primary
|
||||
.get_public_endpoint(&primary_cluster_name)
|
||||
.await?
|
||||
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
|
||||
|
||||
info!(
|
||||
"Public endpoint '{}:{}' retrieved for primary",
|
||||
endpoint.host, endpoint.port
|
||||
);
|
||||
|
||||
info!("Configuring replica connection parameters and bootstrap");
|
||||
|
||||
let mut connection_parameters = HashMap::new();
|
||||
connection_parameters.insert("host".to_string(), endpoint.host);
|
||||
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
|
||||
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
|
||||
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
|
||||
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
|
||||
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
|
||||
|
||||
debug!("Replica connection parameters: {:?}", connection_parameters);
|
||||
|
||||
let external_cluster = ExternalClusterConfig {
|
||||
name: primary_cluster_name.clone(),
|
||||
connection_parameters,
|
||||
};
|
||||
|
||||
let bootstrap_config = BootstrapConfig {
|
||||
strategy: BootstrapStrategy::PgBasebackup,
|
||||
};
|
||||
|
||||
let replica_cluster_config = ReplicaConfig {
|
||||
primary_cluster_name: primary_cluster_name.clone(),
|
||||
replication_certs: certs,
|
||||
bootstrap: bootstrap_config,
|
||||
external_cluster,
|
||||
};
|
||||
|
||||
let replica_config = PostgreSQLConfig {
|
||||
cluster_name: format!("{}-replica", primary_cluster_name),
|
||||
instances: config.instances,
|
||||
storage_size: config.storage_size.clone(),
|
||||
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
|
||||
};
|
||||
|
||||
info!(
|
||||
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
|
||||
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
|
||||
);
|
||||
|
||||
self.replica.deploy(&replica_config).await?;
|
||||
|
||||
info!(
|
||||
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
|
||||
replica_config.cluster_name, config.cluster_name
|
||||
);
|
||||
|
||||
Ok(primary_cluster_name)
|
||||
}
|
||||
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||
self.primary.get_replication_certs(cluster_name).await
|
||||
}
|
||||
|
||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
|
||||
self.primary.get_endpoint(cluster_name).await
|
||||
}
|
||||
|
||||
async fn get_public_endpoint(
|
||||
&self,
|
||||
cluster_name: &str,
|
||||
) -> Result<Option<PostgreSQLEndpoint>, String> {
|
||||
self.primary.get_public_endpoint(cluster_name).await
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,5 @@
|
||||
mod ha_cluster;
|
||||
pub mod ingress;
|
||||
mod failover;
|
||||
pub use failover::*;
|
||||
use harmony_types::net::IpAddress;
|
||||
mod host_binding;
|
||||
mod http;
|
||||
|
||||
@@ -17,6 +17,12 @@ use crate::{
|
||||
topology::{HostNetworkConfig, NetworkError, NetworkManager, k8s::K8sClient},
|
||||
};
|
||||
|
||||
/// TODO document properly the non-intuitive behavior or "roll forward only" of nmstate in general
|
||||
/// It is documented in nmstate official doc, but worth mentionning here :
|
||||
///
|
||||
/// - You create a bond, nmstate will apply it
|
||||
/// - You delete de bond from nmstate, it will NOT delete it
|
||||
/// - To delete it you have to update it with configuration set to null
|
||||
pub struct OpenShiftNmStateNetworkManager {
|
||||
k8s_client: Arc<K8sClient>,
|
||||
}
|
||||
@@ -31,6 +37,7 @@ impl std::fmt::Debug for OpenShiftNmStateNetworkManager {
|
||||
impl NetworkManager for OpenShiftNmStateNetworkManager {
|
||||
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
|
||||
debug!("Installing NMState controller...");
|
||||
// TODO use operatorhub maybe?
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
@@ -17,4 +17,3 @@ pub mod prometheus;
|
||||
pub mod storage;
|
||||
pub mod tenant;
|
||||
pub mod tftp;
|
||||
pub mod postgresql;
|
||||
|
||||
@@ -1,81 +0,0 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::storage::StorageSize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[async_trait]
|
||||
pub trait PostgreSQL {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
|
||||
|
||||
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
|
||||
/// Abstracts away storage/retrieval details (e.g., secrets, files).
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
|
||||
|
||||
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
|
||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
|
||||
|
||||
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
|
||||
/// Returns None if no public endpoint (internal-only cluster).
|
||||
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
|
||||
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
|
||||
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PostgreSQLConfig {
|
||||
pub cluster_name: String,
|
||||
pub instances: u32,
|
||||
pub storage_size: StorageSize,
|
||||
pub role: PostgreSQLClusterRole,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum PostgreSQLClusterRole {
|
||||
Primary,
|
||||
Replica(ReplicaClusterConfig),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReplicaConfig {
|
||||
/// Name of the primary cluster this replica will sync from
|
||||
pub primary_cluster_name: String,
|
||||
/// Certs extracted from primary via Topology::get_replication_certs()
|
||||
pub replication_certs: ReplicationCerts,
|
||||
/// Bootstrap method (e.g., pg_basebackup from primary)
|
||||
pub bootstrap: BootstrapConfig,
|
||||
/// External cluster connection details for CNPG spec.externalClusters
|
||||
pub external_cluster: ExternalClusterConfig,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BootstrapConfig {
|
||||
pub strategy: BootstrapStrategy,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum BootstrapStrategy {
|
||||
PgBasebackup,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ExternalClusterConfig {
|
||||
/// Name used in CNPG externalClusters list
|
||||
pub name: String,
|
||||
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
|
||||
pub connection_parameters: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReplicationCerts {
|
||||
/// PEM-encoded CA cert from primary
|
||||
pub ca_cert_pem: String,
|
||||
/// PEM-encoded streaming_replica client cert (tls.crt)
|
||||
pub streaming_replica_cert_pem: String,
|
||||
/// PEM-encoded streaming_replica client key (tls.key)
|
||||
pub streaming_replica_key_pem: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PostgreSQLEndpoint {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
}
|
||||
@@ -1,7 +0,0 @@
|
||||
|
||||
pub mod capability;
|
||||
mod score;
|
||||
|
||||
|
||||
pub mod failover;
|
||||
|
||||
@@ -1,236 +0,0 @@
|
||||
use crate::{
|
||||
domain::{data::Version, interpret::InterpretStatus},
|
||||
interpret::{Interpret, InterpretError, InterpretName, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::postgresql::capability::PostgreSQL,
|
||||
score::Score,
|
||||
topology::Topology,
|
||||
};
|
||||
|
||||
use super::capability::*;
|
||||
|
||||
use derive_new::new;
|
||||
use harmony_types::{id::Id, storage::StorageSize};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::info;
|
||||
use serde::Serialize;
|
||||
|
||||
pub struct PostgreSQLScore {
|
||||
config: PostgreSQLConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
version: Version,
|
||||
status: InterpretStatus,
|
||||
}
|
||||
|
||||
impl PostgreSQLInterpret {
|
||||
pub fn new(config: PostgreSQLConfig) -> Self {
|
||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
||||
Self {
|
||||
config,
|
||||
version,
|
||||
status: InterpretStatus::QUEUED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
|
||||
fn name(&self) -> String {
|
||||
"PostgreSQLScore".to_string()
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(PostgreSQLInterpret::new(self.config.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("PostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> crate::domain::data::Version {
|
||||
self.version.clone()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
self.status.clone()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
"Executing PostgreSQLInterpret with config {:?}",
|
||||
self.config
|
||||
);
|
||||
|
||||
let cluster_name = topology
|
||||
.deploy(&self.config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(e))?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Deployed PostgreSQL cluster `{cluster_name}`"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, new, Clone, Serialize)]
|
||||
pub struct MultisitePostgreSQLScore {
|
||||
pub cluster_name: String,
|
||||
pub primary_site: Id,
|
||||
pub replica_sites: Vec<Id>,
|
||||
pub instances: u32,
|
||||
pub storage_size: StorageSize,
|
||||
}
|
||||
|
||||
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
"MultisitePostgreSQLScore".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MultisitePostgreSQLInterpret {
|
||||
score: MultisitePostgreSQLScore,
|
||||
version: Version,
|
||||
status: InterpretStatus,
|
||||
}
|
||||
|
||||
impl MultisitePostgreSQLInterpret {
|
||||
pub fn new(score: MultisitePostgreSQLScore) -> Self {
|
||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
||||
Self {
|
||||
score,
|
||||
version,
|
||||
status: InterpretStatus::QUEUED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("MultisitePostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
self.version.clone()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
self.status.clone()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!("Track child interprets per site")
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
|
||||
info!(
|
||||
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
|
||||
self.score.primary_site, self.score.replica_sites
|
||||
);
|
||||
|
||||
// 1. Deploy primary
|
||||
let primary_topo = topology.primary();
|
||||
|
||||
let primary_config = PostgreSQLConfig {
|
||||
cluster_name: self.score.cluster_name.clone(),
|
||||
instances: self.score.instances,
|
||||
storage_size: self.score.storage_size.clone(),
|
||||
role: ClusterRole::Primary,
|
||||
};
|
||||
let primary_cluster_name = primary_topo
|
||||
.deploy(&primary_config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
|
||||
|
||||
// 2. Extract certs & public endpoint from primary
|
||||
let certs = primary_topo
|
||||
.get_replication_certs(&primary_cluster_name)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
|
||||
let public_endpoint = primary_topo
|
||||
.get_public_endpoint(&primary_cluster_name)
|
||||
.await??
|
||||
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
|
||||
|
||||
// 3. Deploy replicas
|
||||
for replica_site in &self.score.replica_sites {
|
||||
let replica_topo = topology.replica();
|
||||
|
||||
.map_err(|e| {
|
||||
InterpretError::from(format!(
|
||||
"Replica site {:?} lookup failed: {e}",
|
||||
replica_site
|
||||
))
|
||||
})?;
|
||||
|
||||
let connection_params: HashMap<String, String> = [
|
||||
("host".to_string(), public_endpoint.host.clone()),
|
||||
("port".to_string(), public_endpoint.port.to_string()),
|
||||
("dbname".to_string(), "postgres".to_string()),
|
||||
("user".to_string(), "streaming_replica".to_string()),
|
||||
("sslmode".to_string(), "verify-ca".to_string()),
|
||||
("sslnegotiation".to_string(), "direct".to_string()),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let external_cluster = ExternalClusterConfig {
|
||||
name: "primary-cluster".to_string(),
|
||||
connection_parameters: connection_params,
|
||||
};
|
||||
|
||||
let replica_config_struct = ReplicaConfig {
|
||||
primary_cluster_name: primary_cluster_name.clone(),
|
||||
replication_certs: certs.clone(),
|
||||
bootstrap: BootstrapConfig {
|
||||
strategy: BootstrapStrategy::PgBasebackup,
|
||||
},
|
||||
external_cluster,
|
||||
};
|
||||
|
||||
let replica_config = PostgreSQLConfig {
|
||||
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
|
||||
instances: self.score.instances,
|
||||
storage_size: self.score.storage_size.clone(),
|
||||
role: ClusterRole::Replica(replica_config_struct),
|
||||
};
|
||||
|
||||
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
|
||||
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
|
||||
self.score.cluster_name,
|
||||
primary_cluster_name,
|
||||
self.score.replica_sites.len()
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -1,4 +1,3 @@
|
||||
pub mod id;
|
||||
pub mod net;
|
||||
pub mod switch;
|
||||
pub mod storage;
|
||||
|
||||
@@ -1,6 +0,0 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
|
||||
pub struct StorageSize {
|
||||
size_bytes: u64,
|
||||
}
|
||||
Reference in New Issue
Block a user