diff --git a/examples/multisite_postgres/src/main.rs b/examples/multisite_postgres/src/main.rs index 7ae4beb..739198f 100644 --- a/examples/multisite_postgres/src/main.rs +++ b/examples/multisite_postgres/src/main.rs @@ -14,7 +14,6 @@ async fn main() { ..Default::default() // Use harmony defaults, they are based on CNPG's default values : // "default" namespace, 1 instance, 1Gi storage }, - hostname: "postgrestest.sto1.nationtech.io".to_string(), }; harmony_cli::run( diff --git a/examples/public_postgres/src/main.rs b/examples/public_postgres/src/main.rs index 772e801..ebab12b 100644 --- a/examples/public_postgres/src/main.rs +++ b/examples/public_postgres/src/main.rs @@ -15,7 +15,6 @@ async fn main() { ..Default::default() // Use harmony defaults, they are based on CNPG's default values : // 1 instance, 1Gi storage }, - hostname: "postgrestest.sto1.nationtech.io".to_string(), }; let test_connection = PostgreSQLConnectionScore { diff --git a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 0f93a18..f457610 100644 --- a/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -109,6 +109,13 @@ impl K8sclient for K8sAnywhereTopology { #[async_trait] impl TlsRouter for K8sAnywhereTopology { + async fn get_public_domain(&self) -> Result { + match &self.config.public_domain { + Some(public_domain) => Ok(public_domain.to_string()), + None => Err("Public domain not available".to_string()), + } + } + async fn get_internal_domain(&self) -> Result, String> { match self.get_k8s_distribution().await.map_err(|e| { format!( @@ -1124,6 +1131,7 @@ pub struct K8sAnywhereConfig { /// /// If the context name is not found, it will fail to initialize. pub k8s_context: Option, + public_domain: Option, } impl K8sAnywhereConfig { @@ -1151,6 +1159,7 @@ impl K8sAnywhereConfig { let mut kubeconfig: Option = None; let mut k8s_context: Option = None; + let mut public_domain: Option = None; for part in env_var_value.split(',') { let kv: Vec<&str> = part.splitn(2, '=').collect(); @@ -1158,6 +1167,7 @@ impl K8sAnywhereConfig { match kv[0].trim() { "kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()), "context" => k8s_context = Some(kv[1].trim().to_string()), + "public_domain" => public_domain = Some(kv[1].trim().to_string()), _ => {} } } @@ -1175,6 +1185,7 @@ impl K8sAnywhereConfig { K8sAnywhereConfig { kubeconfig, k8s_context, + public_domain, use_system_kubeconfig, autoinstall: false, use_local_k3d: false, @@ -1217,6 +1228,7 @@ impl K8sAnywhereConfig { use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D") .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)), k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(), + public_domain: std::env::var("HARMONY_PUBLIC_DOMAIN").ok(), } } } diff --git a/harmony/src/domain/topology/router.rs b/harmony/src/domain/topology/router.rs index 30d5b46..59382d5 100644 --- a/harmony/src/domain/topology/router.rs +++ b/harmony/src/domain/topology/router.rs @@ -122,4 +122,6 @@ pub trait TlsRouter: Send + Sync { /// Returns the port that this router exposes externally. async fn get_router_port(&self) -> u16; + + async fn get_public_domain(&self) -> Result; } diff --git a/harmony/src/modules/network/failover.rs b/harmony/src/modules/network/failover.rs index d5fd8c0..6e9b0fb 100644 --- a/harmony/src/modules/network/failover.rs +++ b/harmony/src/modules/network/failover.rs @@ -4,7 +4,20 @@ use log::warn; use crate::topology::{FailoverTopology, TlsRoute, TlsRouter}; #[async_trait] -impl TlsRouter for FailoverTopology { +impl TlsRouter for FailoverTopology { + async fn get_public_domain(&self) -> Result { + /* + let primary_domain = self + .primary + .get_public_domain() + .await + .map_err(|e| e.to_string())?; + + Ok(primary_domain) + */ + todo!() + } + async fn get_internal_domain(&self) -> Result, String> { todo!() } diff --git a/harmony/src/modules/postgresql/capability.rs b/harmony/src/modules/postgresql/capability.rs index 81ca83e..0530b69 100644 --- a/harmony/src/modules/postgresql/capability.rs +++ b/harmony/src/modules/postgresql/capability.rs @@ -37,6 +37,7 @@ pub struct PostgreSQLConfig { /// settings incompatible with the default CNPG behavior. pub namespace: String, } + impl PostgreSQLConfig { pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig { let mut new = self.clone(); diff --git a/harmony/src/modules/postgresql/cnpg/crd.rs b/harmony/src/modules/postgresql/cnpg/crd.rs index 5b54fd0..c27a1d1 100644 --- a/harmony/src/modules/postgresql/cnpg/crd.rs +++ b/harmony/src/modules/postgresql/cnpg/crd.rs @@ -1,4 +1,5 @@ use std::collections::BTreeMap; +use std::collections::HashMap; use kube::{CustomResource, api::ObjectMeta}; use serde::{Deserialize, Serialize}; @@ -19,6 +20,10 @@ pub struct ClusterSpec { pub image_name: Option, pub storage: Storage, pub bootstrap: Bootstrap, + #[serde(skip_serializing_if = "Option::is_none")] + pub external_clusters: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub replica: Option, /// This must be set to None if you want cnpg to generate a superuser secret #[serde(skip_serializing_if = "Option::is_none")] pub superuser_secret: Option>, @@ -41,6 +46,8 @@ impl Default for ClusterSpec { image_name: None, storage: Storage::default(), bootstrap: Bootstrap::default(), + external_clusters: None, + replica: None, superuser_secret: None, enable_superuser_access: false, } @@ -56,7 +63,13 @@ pub struct Storage { #[derive(Deserialize, Serialize, Clone, Debug, Default)] #[serde(rename_all = "camelCase")] pub struct Bootstrap { - pub initdb: Initdb, + #[serde(skip_serializing_if = "Option::is_none")] + pub initdb: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub recovery: Option, + #[serde(rename = "pg_basebackup")] + #[serde(skip_serializing_if = "Option::is_none")] + pub pg_basebackup: Option, } #[derive(Deserialize, Serialize, Clone, Debug, Default)] @@ -65,3 +78,50 @@ pub struct Initdb { pub database: String, pub owner: String, } + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct Recovery { + pub source: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +pub struct PgBaseBackup { + pub source: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ExternalCluster { + pub name: String, + pub connection_parameters: HashMap, + pub ssl_key: Option, + pub ssl_cert: Option, + pub ssl_root_cert: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ConnectionParameters { + pub host: String, + pub user: String, + pub dbname: String, + pub sslmode: String, + pub sslnegotiation: String, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ReplicaSpec { + pub enabled: bool, + pub source: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub primary: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct SecretKeySelector { + pub name: String, + pub key: String, +} diff --git a/harmony/src/modules/postgresql/failover.rs b/harmony/src/modules/postgresql/failover.rs index 10fd654..b4ae813 100644 --- a/harmony/src/modules/postgresql/failover.rs +++ b/harmony/src/modules/postgresql/failover.rs @@ -3,6 +3,8 @@ use log::debug; use log::info; use std::collections::HashMap; +use crate::interpret::InterpretError; +use crate::topology::TlsRoute; use crate::topology::TlsRouter; use crate::{ modules::postgresql::capability::{ @@ -49,8 +51,18 @@ impl PostgreSQL for FailoverTopology { // TODO we should be getting the public endpoint for a service by calling a method on // TlsRouter capability. // Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;` + let host = format!( + "{}.{}.{}", + config.cluster_name, + config.namespace, + self.primary + .get_public_domain() + .await + .expect("failed to retrieve public domain") + .to_string() + ); let endpoint = PostgreSQLEndpoint { - host: "postgrestest.sto1.nationtech.io".to_string(), + host, port: self.primary.get_router_port().await, }; @@ -59,6 +71,46 @@ impl PostgreSQL for FailoverTopology { endpoint.host, endpoint.port ); + info!("installing primary postgres route"); + let prim_hostname = format!( + "{}.{}.{}", + config.cluster_name, + config.namespace, + self.primary.get_public_domain().await? + ); + let rw_backend = format!("{}-rw", config.cluster_name); + let tls_route = TlsRoute { + hostname: prim_hostname, + backend: rw_backend, + target_port: 5432, + namespace: config.namespace.clone(), + }; + // Expose RW publicly via TLS passthrough + self.primary + .install_route(tls_route.clone()) + .await + .map_err(|e| InterpretError::new(e))?; + + info!("installing replica postgres route"); + let rep_hostname = format!( + "{}.{}.{}", + config.cluster_name, + config.namespace, + self.replica.get_public_domain().await? + ); + let rw_backend = format!("{}-rw", config.cluster_name); + let tls_route = TlsRoute { + hostname: rep_hostname, + backend: rw_backend, + target_port: 5432, + namespace: config.namespace.clone(), + }; + + // Expose RW publicly via TLS passthrough + self.replica + .install_route(tls_route.clone()) + .await + .map_err(|e| InterpretError::new(e))?; info!("Configuring replica connection parameters and bootstrap"); let mut connection_parameters = HashMap::new(); diff --git a/harmony/src/modules/postgresql/score_k8s.rs b/harmony/src/modules/postgresql/score_k8s.rs index 5c4b834..0c096d3 100644 --- a/harmony/src/modules/postgresql/score_k8s.rs +++ b/harmony/src/modules/postgresql/score_k8s.rs @@ -1,14 +1,21 @@ -use std::collections::BTreeMap; +use crate::data::Version; +use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}; +use crate::inventory::Inventory; -use serde::Serialize; - -use crate::interpret::Interpret; use crate::modules::k8s::resource::K8sResourceScore; use crate::modules::postgresql::capability::PostgreSQLConfig; -use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage}; +use crate::modules::postgresql::cnpg::{ + Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec, + SecretKeySelector, Storage, +}; use crate::score::Score; use crate::topology::{K8sclient, Topology}; +use async_trait::async_trait; +use harmony_types::id::Id; +use k8s_openapi::ByteString; +use k8s_openapi::api::core::v1::Secret; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; +use serde::Serialize; /// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG. /// @@ -51,37 +58,184 @@ impl K8sPostgreSQLScore { impl Score for K8sPostgreSQLScore { fn create_interpret(&self) -> Box> { - let metadata = ObjectMeta { - name: Some(self.config.cluster_name.clone()), - namespace: Some(self.config.namespace.clone()), - ..ObjectMeta::default() - }; - - let spec = ClusterSpec { - instances: self.config.instances, - storage: Storage { - size: self.config.storage_size.to_string(), - }, - bootstrap: Bootstrap { - initdb: Initdb { - database: "app".to_string(), - owner: "app".to_string(), - }, - }, - // superuser_secret: Some(BTreeMap::from([( - // "name".to_string(), - // format!("{}-superuser", self.config.cluster_name.clone()), - // )])), - enable_superuser_access: true, - ..ClusterSpec::default() - }; - - let cluster = Cluster { metadata, spec }; - - K8sResourceScore::single(cluster, Some(self.config.namespace.clone())).create_interpret() + Box::new(K8sPostgreSQLInterpret { + config: self.config.clone(), + }) } fn name(&self) -> String { format!("PostgreSQLScore({})", self.config.namespace) } } + +#[derive(Debug)] +pub struct K8sPostgreSQLInterpret { + config: PostgreSQLConfig, +} + +#[async_trait] +impl Interpret for K8sPostgreSQLInterpret { + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + match &self.config.role { + super::capability::PostgreSQLClusterRole::Primary => { + let metadata = ObjectMeta { + name: Some(self.config.cluster_name.clone()), + namespace: Some(self.config.namespace.clone()), + ..ObjectMeta::default() + }; + + let spec = ClusterSpec { + instances: self.config.instances, + storage: Storage { + size: self.config.storage_size.to_string(), + }, + bootstrap: Bootstrap { + initdb: Some(Initdb { + database: "app".to_string(), + owner: "app".to_string(), + }), + recovery: None, + pg_basebackup: None, + }, + enable_superuser_access: true, + ..ClusterSpec::default() + }; + let cluster = Cluster { metadata, spec }; + + Ok( + K8sResourceScore::single(cluster, Some(self.config.namespace.clone())) + .create_interpret() + .execute(inventory, topology) + .await?, + ) + } + super::capability::PostgreSQLClusterRole::Replica(replica_config) => { + let metadata = ObjectMeta { + name: Some("streaming-replica-certs".to_string()), + namespace: Some(self.config.namespace.clone()), + ..ObjectMeta::default() + }; + + // The data must be base64-encoded. If you already have PEM strings in your config, encode them: + let mut data = std::collections::BTreeMap::new(); + data.insert( + "tls.key".to_string(), + ByteString( + replica_config + .replication_certs + .streaming_replica_key_pem + .as_bytes() + .to_vec(), + ), + ); + data.insert( + "tls.crt".to_string(), + ByteString( + replica_config + .replication_certs + .streaming_replica_cert_pem + .as_bytes() + .to_vec(), + ), + ); + data.insert( + "ca.crt".to_string(), + ByteString( + replica_config + .replication_certs + .ca_cert_pem + .as_bytes() + .to_vec(), + ), + ); + + let secret = Secret { + metadata, + data: Some(data), + string_data: None, // You could use string_data if you prefer raw strings + type_: Some("Opaque".to_string()), + ..Secret::default() + }; + + K8sResourceScore::single(secret, Some(self.config.namespace.clone())) + .create_interpret() + .execute(inventory, topology) + .await?; + + let metadata = ObjectMeta { + name: Some(self.config.cluster_name.clone()), + namespace: Some(self.config.namespace.clone()), + ..ObjectMeta::default() + }; + + let spec = ClusterSpec { + instances: self.config.instances, + storage: Storage { + size: self.config.storage_size.to_string(), + }, + bootstrap: Bootstrap { + initdb: None, + recovery: None, + pg_basebackup: Some(PgBaseBackup { + source: replica_config.primary_cluster_name.clone(), + }), + }, + external_clusters: Some(vec![ExternalCluster { + name: replica_config.primary_cluster_name.clone(), + connection_parameters: replica_config + .external_cluster + .connection_parameters + .clone(), + ssl_key: Some(SecretKeySelector { + name: "streaming-replica-certs".to_string(), + key: "tls.key".to_string(), + }), + ssl_cert: Some(SecretKeySelector { + name: "streaming-replica-certs".to_string(), + key: "tls.crt".to_string(), + }), + ssl_root_cert: Some(SecretKeySelector { + name: "streaming-replica-certs".to_string(), + key: "ca.crt".to_string(), + }), + }]), + replica: Some(ReplicaSpec { + enabled: true, + source: replica_config.primary_cluster_name.clone(), + primary: None, + }), + ..ClusterSpec::default() + }; + + let cluster = Cluster { metadata, spec }; + + Ok( + K8sResourceScore::single(cluster, Some(self.config.namespace.clone())) + .create_interpret() + .execute(inventory, topology) + .await?, + ) + } + } + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("K8sPostgreSQLInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} diff --git a/harmony/src/modules/postgresql/score_public.rs b/harmony/src/modules/postgresql/score_public.rs index eaf3c88..a7ef5a8 100644 --- a/harmony/src/modules/postgresql/score_public.rs +++ b/harmony/src/modules/postgresql/score_public.rs @@ -18,46 +18,31 @@ use crate::topology::Topology; /// # Usage /// ``` /// use harmony::modules::postgresql::PublicPostgreSQLScore; -/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com"); +/// let score = PublicPostgreSQLScore::new("harmony"); /// ``` #[derive(Debug, Clone, Serialize)] pub struct PublicPostgreSQLScore { /// Inner non-public Postgres cluster config. pub config: PostgreSQLConfig, - /// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432). - pub hostname: String, } impl PublicPostgreSQLScore { - pub fn new(namespace: &str, hostname: &str) -> Self { + pub fn new(namespace: &str) -> Self { Self { config: PostgreSQLConfig::default().with_namespace(namespace), - hostname: hostname.to_string(), } } } impl Score for PublicPostgreSQLScore { fn create_interpret(&self) -> Box> { - let rw_backend = format!("{}-rw", self.config.cluster_name); - let tls_route = TlsRoute { - namespace: self.config.namespace.clone(), - hostname: self.hostname.clone(), - backend: rw_backend, - target_port: 5432, - }; - Box::new(PublicPostgreSQLInterpret { config: self.config.clone(), - tls_route, }) } fn name(&self) -> String { - format!( - "PublicPostgreSQLScore({}:{})", - self.config.namespace, self.hostname - ) + format!("PublicPostgreSQLScore({})", self.config.namespace) } } @@ -65,7 +50,6 @@ impl Score for PublicPostgreSQLScore { #[derive(Debug, Clone)] struct PublicPostgreSQLInterpret { config: PostgreSQLConfig, - tls_route: TlsRoute, } #[async_trait] @@ -76,15 +60,28 @@ impl Interpret for PublicPostgreSQLInte .await .map_err(|e| InterpretError::new(e))?; + let hostname = format!( + "{}.{}.{}", + self.config.cluster_name, + self.config.namespace, + topo.get_public_domain().await? + ); + let rw_backend = format!("{}-rw", self.config.cluster_name); + let tls_route = TlsRoute { + hostname, + backend: rw_backend, + target_port: 5432, + namespace: self.config.namespace.clone(), + }; // Expose RW publicly via TLS passthrough - topo.install_route(self.tls_route.clone()) + topo.install_route(tls_route.clone()) .await .map_err(|e| InterpretError::new(e))?; Ok(Outcome::success(format!( "Public CNPG cluster '{}' deployed with TLS passthrough route '{}'", self.config.cluster_name.clone(), - self.tls_route.hostname + tls_route.hostname ))) }