Compare commits

..

3 Commits

Author SHA1 Message Date
b61e4f9a96 wip: Expose postgres publicly. Created tlsroute capability and postgres implementations
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-13 09:47:59 -05:00
2e367d88d4 feat: PostgreSQL score works, added postgresql example, tested on OKD 4.19, added note about incompatible default namespace settings
Some checks failed
Run Check Script / check (pull_request) Failing after 2m37s
2025-12-11 22:54:57 -05:00
9edc42a665 feat: PostgreSQLScore happy path using cnpg operator
Some checks failed
Run Check Script / check (pull_request) Failing after 37s
2025-12-11 14:36:39 -05:00
8 changed files with 353 additions and 73 deletions

View File

@@ -0,0 +1,18 @@
[package]
name = "example-postgresql"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,22 @@
use harmony::{
inventory::Inventory, modules::postgresql::PostgreSQLScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let postgresql = PostgreSQLScore {
name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-postgres-example".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(postgresql)],
None,
)
.await
.unwrap();
}

View File

@@ -1,11 +1,19 @@
use async_trait::async_trait;
use cidr::Ipv4Cidr;
use derive_new::new;
use super::{IpAddress, LogicalHost};
/// Basic network router abstraction (L3 IP routing/gateway).
/// Distinguished from TlsRouter (L4 TLS passthrough).
pub trait Router: Send + Sync {
/// Gateway IP address for this subnet/router.
fn get_gateway(&self) -> IpAddress;
/// CIDR block managed by this router.
fn get_cidr(&self) -> Ipv4Cidr;
/// Logical host associated with this router.
fn get_host(&self) -> LogicalHost;
}
@@ -38,3 +46,76 @@ impl Router for UnmanagedRouter {
todo!()
}
}
#[derive(Clone, Debug)]
/// Desired state config for a TLS passthrough route.
/// Forwards external TLS (port 443) → backend service:target_port (no termination at router).
/// Inspired by CNPG multisite: exposes `-rw`/`-ro` services publicly via OKD Route/HAProxy/K8s
/// Gateway etc.
///
/// # Example
/// ```
/// use harmony::domain::topology::router::TlsRoute;
/// let postgres_rw = TlsRoute {
/// hostname: "postgres-cluster-example.public.domain.io".to_string(),
/// backend: "postgres-cluster-example-rw".to_string(), // k8s Service or HAProxy upstream
/// target_port: 5432,
/// };
/// ```
pub struct TlsRoute {
/// Public hostname clients connect to (TLS SNI, port 443 implicit).
/// Router matches this for passthrough forwarding.
pub hostname: String,
/// Backend/host identifier (k8s Service, HAProxy upstream, IP/FQDN, etc.).
pub backend: String,
/// Backend TCP port (Postgres: 5432).
pub target_port: u16,
}
#[async_trait]
/// Installs and queries TLS passthrough routes (L4 TCP/SNI forwarding, no TLS termination).
/// Agnostic to impl: OKD Route, AWS NLB+HAProxy, k3s Envoy Gateway, Apache ProxyPass.
/// Used by PostgreSQL capability to expose CNPG clusters multisite (site1 → site2 replication).
///
/// # Usage
/// ```rust,no_run
/// // After CNPG deploy, expose RW endpoint
/// let topology = okd_topology();
/// let route = TlsRoute { /* ... */ };
/// topology.install_route(route).await?; // OKD Route, HAProxy reload, etc.
///
/// // Client: psql \\"host={route.hostname} port=443 sslmode=verify-ca sslnegotiation=direct\\"
/// let public_ep = Endpoint { host: topology.hostname(), port: 443 };
/// ```
pub trait TlsRouter: Send + Sync {
/// Provisions the route (idempotent where possible).
/// Example: OKD Route{ host, to: backend:target_port, tls: {passthrough} };
/// HAProxy frontend→backend \"postgres-upstream\".
async fn install_route(&self, config: TlsRoute) -> Result<(), String>;
/// Installed route's public hostname.
fn hostname(&self) -> String;
/// Installed route's backend identifier.
fn backend(&self) -> String;
/// Installed route's backend port.
fn target_port(&self) -> u16;
}
impl std::fmt::Debug for dyn TlsRouter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"TlsRouter[hostname={}, backend={}:{}]",
self.hostname(),
self.backend(),
self.target_port()
))
}
}

View File

@@ -0,0 +1,58 @@
use kube::{api::ObjectMeta, CustomResource};
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "postgresql.cnpg.io",
version = "v1",
kind = "Cluster",
plural = "clusters",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterSpec {
pub instances: i32,
pub image_name: Option<String>,
pub storage: Storage,
pub bootstrap: Bootstrap,
}
impl Default for Cluster {
fn default() -> Self {
Cluster {
metadata: ObjectMeta::default(),
spec: ClusterSpec::default(),
}
}
}
impl Default for ClusterSpec {
fn default() -> Self {
Self {
instances: 1,
image_name: None,
storage: Storage::default(),
bootstrap: Bootstrap::default(),
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Storage {
pub size: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Bootstrap {
pub initdb: Initdb,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Initdb {
pub database: String,
pub owner: String,
}

View File

@@ -0,0 +1,2 @@
mod crd;
pub use crd::*;

View File

@@ -1,6 +1,11 @@
pub mod capability;
mod score;
pub use score::*;
mod score_public;
pub use score_public::*;
pub mod failover;
mod operator;
pub use operator::*;
pub mod cnpg;

View File

@@ -1,88 +1,93 @@
use crate::{
domain::{data::Version, interpret::InterpretStatus},
interpret::{Interpret, InterpretError, InterpretName, Outcome},
inventory::Inventory,
modules::postgresql::capability::PostgreSQL,
score::Score,
topology::Topology,
};
use super::capability::*;
use harmony_types::id::Id;
use async_trait::async_trait;
use log::info;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
use crate::interpret::Interpret;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
///
/// # Goals
/// - Production-ready Postgres HA (3 instances), persistent storage, app DB.
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PostgreSQLScore;
/// let score = PostgreSQLScore::new("my-app-ns");
/// ```
///
/// # Limitations (Happy Path)
/// - Requires CNPG operator installed (use CloudNativePgOperatorScore).
/// - No backups, monitoring, extensions configured.
///
/// TODO : refactor this to declare a clean dependency on cnpg operator. Then cnpg operator will
/// self-deploy either using operatorhub or helm chart depending on k8s flavor. This is cnpg
/// specific behavior
#[derive(Debug, Clone, Serialize)]
pub struct PostgreSQLScore {
config: PostgreSQLConfig,
pub name: String,
/// **Note :** on OpenShfit based clusters, the namespace `default` has security
/// settings incompatible with the default CNPG behavior.
pub namespace: String,
pub instances: i32,
pub storage_size: String,
pub image_name: Option<String>,
}
#[derive(Debug, Clone)]
pub struct PostgreSQLInterpret {
config: PostgreSQLConfig,
version: Version,
status: InterpretStatus,
}
impl PostgreSQLInterpret {
pub fn new(config: PostgreSQLConfig) -> Self {
let version = Version::from("1.0.0").expect("Version should be valid");
impl Default for PostgreSQLScore {
fn default() -> Self {
Self {
config,
version,
status: InterpretStatus::QUEUED,
name: "harmony-pg".to_string(),
// We are using the namespace harmony by default since some clusters (openshift family)
// have incompatible configuration of the default namespace with cnpg
namespace: "harmony".to_string(),
instances: 1,
storage_size: "1Gi".to_string(),
image_name: None, // This lets cnpg use its default image
}
}
}
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
fn name(&self) -> String {
"PostgreSQLScore".to_string()
impl PostgreSQLScore {
pub fn new(namespace: &str) -> Self {
Self {
namespace: namespace.to_string(),
..Default::default()
}
}
}
impl<T: Topology + K8sclient> Score<T> for PostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLInterpret::new(self.config.clone()))
}
}
#[async_trait]
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLInterpret")
}
fn get_version(&self) -> crate::domain::data::Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Executing PostgreSQLInterpret with config {:?}",
self.config
);
let cluster_name = topology
.deploy(&self.config)
.await
.map_err(|e| InterpretError::from(e))?;
Ok(Outcome::success(format!(
"Deployed PostgreSQL cluster `{cluster_name}`"
)))
let metadata = ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.instances,
image_name: self.image_name.clone(),
storage: Storage {
size: self.storage_size.clone(),
},
bootstrap: Bootstrap {
initdb: Initdb {
database: "app".to_string(),
owner: "app".to_string(),
},
},
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
K8sResourceScore::single(cluster, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("PostgreSQLScore({})", self.namespace)
}
}

View File

@@ -0,0 +1,89 @@
use async_trait::async_trait;
use serde::Serialize;
use crate::domain::topology::router::{TlsRoute, TlsRouter};
use crate::interpret::Interpret;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
use crate::modules::postgresql::PostgreSQLScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
/// Deploys a public PostgreSQL cluster: CNPG + TLS passthrough route for RW endpoint.
/// For failover/multisite: exposes single-instance or small HA Postgres publicly.
///
/// Sequence: PostgreSQLScore → TlsRouter::install_route (RW backend).
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
/// ```
#[derive(Debug, Clone, Serialize)]
pub struct PublicPostgreSQLScore {
/// Inner non-public Postgres cluster config.
pub inner: PostgreSQLScore,
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
pub hostname: String,
}
impl Default for PublicPostgreSQLScore {
fn default() -> Self {
Self {
inner: PostgreSQLScore::default(),
hostname: "postgres.default.public".to_string(),
}
}
}
impl PublicPostgreSQLScore {
pub fn new(namespace: &str, hostname: &str) -> Self {
Self {
inner: PostgreSQLScore::new(namespace),
hostname: hostname.to_string(),
}
}
}
/// Custom interpret: deploy Postgres then install public TLS route.
#[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret {
postgres_score: PostgreSQLScore,
tls_route: TlsRoute,
}
#[async_trait]
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for PublicPostgreSQLInterpret {
async fn interpret(&self, topo: &T) -> Result<(), Box<dyn std::error::Error>> {
// Deploy CNPG cluster first (creates -rw service)
self.postgres_score.create_interpret().interpret(topo).await?;
// Expose RW publicly via TLS passthrough
topo.install_route(self.tls_route.clone()).await.map_err(|e| Box::new(std::io::Error::new(std::io::ErrorKind::Other, e)) as Box<dyn std::error::Error>)?;
Ok(())
}
}
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.inner.name);
let tls_route = TlsRoute {
hostname: self.hostname.clone(),
backend: rw_backend,
target_port: 5432,
};
Box::new(PublicPostgreSQLInterpret {
postgres_score: self.inner.clone(),
tls_route,
})
}
fn name(&self) -> String {
format!("PublicPostgreSQLScore({}:{})", self.inner.namespace, self.hostname)
}
}
// TODO: Add RO route (separate hostname/backend="cluster-ro"), backups, failover logic.