fix/dynamically_get_public_domain #234

Merged
johnride merged 7 commits from fix/dynamically_get_public_domain into master 2026-03-15 14:07:26 +00:00
10 changed files with 348 additions and 59 deletions

View File

@@ -14,7 +14,6 @@ async fn main() {
..Default::default() // Use harmony defaults, they are based on CNPG's default values : ..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage // "default" namespace, 1 instance, 1Gi storage
}, },
hostname: "postgrestest.sto1.nationtech.io".to_string(),
}; };
harmony_cli::run( harmony_cli::run(

View File

@@ -15,7 +15,6 @@ async fn main() {
..Default::default() // Use harmony defaults, they are based on CNPG's default values : ..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// 1 instance, 1Gi storage // 1 instance, 1Gi storage
}, },
hostname: "postgrestest.sto1.nationtech.io".to_string(),
}; };
let test_connection = PostgreSQLConnectionScore { let test_connection = PostgreSQLConnectionScore {

View File

@@ -109,6 +109,13 @@ impl K8sclient for K8sAnywhereTopology {
#[async_trait] #[async_trait]
impl TlsRouter for K8sAnywhereTopology { impl TlsRouter for K8sAnywhereTopology {
async fn get_public_domain(&self) -> Result<String, String> {
match &self.config.public_domain {
Some(public_domain) => Ok(public_domain.to_string()),
None => Err("Public domain not available".to_string()),
}
}
async fn get_internal_domain(&self) -> Result<Option<String>, String> { async fn get_internal_domain(&self) -> Result<Option<String>, String> {
match self.get_k8s_distribution().await.map_err(|e| { match self.get_k8s_distribution().await.map_err(|e| {
format!( format!(
@@ -1124,6 +1131,7 @@ pub struct K8sAnywhereConfig {
/// ///
/// If the context name is not found, it will fail to initialize. /// If the context name is not found, it will fail to initialize.
pub k8s_context: Option<String>, pub k8s_context: Option<String>,
public_domain: Option<String>,
} }
impl K8sAnywhereConfig { impl K8sAnywhereConfig {
@@ -1151,6 +1159,7 @@ impl K8sAnywhereConfig {
let mut kubeconfig: Option<String> = None; let mut kubeconfig: Option<String> = None;
let mut k8s_context: Option<String> = None; let mut k8s_context: Option<String> = None;
let mut public_domain: Option<String> = None;
for part in env_var_value.split(',') { for part in env_var_value.split(',') {
let kv: Vec<&str> = part.splitn(2, '=').collect(); let kv: Vec<&str> = part.splitn(2, '=').collect();
@@ -1158,6 +1167,7 @@ impl K8sAnywhereConfig {
match kv[0].trim() { match kv[0].trim() {
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()), "kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
"context" => k8s_context = Some(kv[1].trim().to_string()), "context" => k8s_context = Some(kv[1].trim().to_string()),
"public_domain" => public_domain = Some(kv[1].trim().to_string()),
_ => {} _ => {}
} }
} }
@@ -1175,6 +1185,7 @@ impl K8sAnywhereConfig {
K8sAnywhereConfig { K8sAnywhereConfig {
kubeconfig, kubeconfig,
k8s_context, k8s_context,
public_domain,
use_system_kubeconfig, use_system_kubeconfig,
autoinstall: false, autoinstall: false,
use_local_k3d: false, use_local_k3d: false,
@@ -1217,6 +1228,7 @@ impl K8sAnywhereConfig {
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D") use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)), .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(), k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
public_domain: std::env::var("HARMONY_PUBLIC_DOMAIN").ok(),
} }
} }
} }

View File

@@ -122,4 +122,6 @@ pub trait TlsRouter: Send + Sync {
/// Returns the port that this router exposes externally. /// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16; async fn get_router_port(&self) -> u16;
async fn get_public_domain(&self) -> Result<String, String>;
} }

View File

@@ -4,7 +4,20 @@ use log::warn;
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter}; use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
#[async_trait] #[async_trait]
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> { impl<T: TlsRouter + Send> TlsRouter for FailoverTopology<T> {
async fn get_public_domain(&self) -> Result<String, String> {
/*
let primary_domain = self
.primary
.get_public_domain()
.await
.map_err(|e| e.to_string())?;
Ok(primary_domain)
*/
todo!()
}
async fn get_internal_domain(&self) -> Result<Option<String>, String> { async fn get_internal_domain(&self) -> Result<Option<String>, String> {
todo!() todo!()
} }

View File

@@ -37,6 +37,7 @@ pub struct PostgreSQLConfig {
/// settings incompatible with the default CNPG behavior. /// settings incompatible with the default CNPG behavior.
pub namespace: String, pub namespace: String,
} }
impl PostgreSQLConfig { impl PostgreSQLConfig {
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig { pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
let mut new = self.clone(); let mut new = self.clone();

View File

@@ -1,4 +1,5 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use std::collections::HashMap;
use kube::{CustomResource, api::ObjectMeta}; use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
@@ -19,6 +20,10 @@ pub struct ClusterSpec {
pub image_name: Option<String>, pub image_name: Option<String>,
pub storage: Storage, pub storage: Storage,
pub bootstrap: Bootstrap, pub bootstrap: Bootstrap,
#[serde(skip_serializing_if = "Option::is_none")]
pub external_clusters: Option<Vec<ExternalCluster>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub replica: Option<ReplicaSpec>,
/// This must be set to None if you want cnpg to generate a superuser secret /// This must be set to None if you want cnpg to generate a superuser secret
#[serde(skip_serializing_if = "Option::is_none")] #[serde(skip_serializing_if = "Option::is_none")]
pub superuser_secret: Option<BTreeMap<String, String>>, pub superuser_secret: Option<BTreeMap<String, String>>,
@@ -41,6 +46,8 @@ impl Default for ClusterSpec {
image_name: None, image_name: None,
storage: Storage::default(), storage: Storage::default(),
bootstrap: Bootstrap::default(), bootstrap: Bootstrap::default(),
external_clusters: None,
replica: None,
superuser_secret: None, superuser_secret: None,
enable_superuser_access: false, enable_superuser_access: false,
} }
@@ -56,7 +63,13 @@ pub struct Storage {
#[derive(Deserialize, Serialize, Clone, Debug, Default)] #[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct Bootstrap { pub struct Bootstrap {
pub initdb: Initdb, #[serde(skip_serializing_if = "Option::is_none")]
pub initdb: Option<Initdb>,
#[serde(skip_serializing_if = "Option::is_none")]
pub recovery: Option<Recovery>,
#[serde(rename = "pg_basebackup")]
#[serde(skip_serializing_if = "Option::is_none")]
pub pg_basebackup: Option<PgBaseBackup>,
} }
#[derive(Deserialize, Serialize, Clone, Debug, Default)] #[derive(Deserialize, Serialize, Clone, Debug, Default)]
@@ -65,3 +78,50 @@ pub struct Initdb {
pub database: String, pub database: String,
pub owner: String, pub owner: String,
} }
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Recovery {
pub source: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct PgBaseBackup {
pub source: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ExternalCluster {
pub name: String,
pub connection_parameters: HashMap<String, String>,
pub ssl_key: Option<SecretKeySelector>,
pub ssl_cert: Option<SecretKeySelector>,
pub ssl_root_cert: Option<SecretKeySelector>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionParameters {
pub host: String,
pub user: String,
pub dbname: String,
pub sslmode: String,
pub sslnegotiation: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReplicaSpec {
pub enabled: bool,
pub source: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub primary: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SecretKeySelector {
pub name: String,
pub key: String,
}

View File

@@ -3,6 +3,8 @@ use log::debug;
use log::info; use log::info;
use std::collections::HashMap; use std::collections::HashMap;
use crate::interpret::InterpretError;
use crate::topology::TlsRoute;
use crate::topology::TlsRouter; use crate::topology::TlsRouter;
use crate::{ use crate::{
modules::postgresql::capability::{ modules::postgresql::capability::{
@@ -49,8 +51,18 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
// TODO we should be getting the public endpoint for a service by calling a method on // TODO we should be getting the public endpoint for a service by calling a method on
// TlsRouter capability. // TlsRouter capability.
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;` // Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
let host = format!(
"{}.{}.{}",
config.cluster_name,
config.namespace,
self.primary
.get_public_domain()
.await
.expect("failed to retrieve public domain")
.to_string()
);
let endpoint = PostgreSQLEndpoint { let endpoint = PostgreSQLEndpoint {
host: "postgrestest.sto1.nationtech.io".to_string(), host,
port: self.primary.get_router_port().await, port: self.primary.get_router_port().await,
}; };
@@ -59,6 +71,46 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
endpoint.host, endpoint.port endpoint.host, endpoint.port
); );
info!("installing primary postgres route");
let prim_hostname = format!(
"{}.{}.{}",
config.cluster_name,
config.namespace,
self.primary.get_public_domain().await?
);
let rw_backend = format!("{}-rw", config.cluster_name);
let tls_route = TlsRoute {
hostname: prim_hostname,
backend: rw_backend,
target_port: 5432,
namespace: config.namespace.clone(),
};
// Expose RW publicly via TLS passthrough
self.primary
.install_route(tls_route.clone())
.await
.map_err(|e| InterpretError::new(e))?;
info!("installing replica postgres route");
let rep_hostname = format!(
"{}.{}.{}",
config.cluster_name,
config.namespace,
self.replica.get_public_domain().await?
);
let rw_backend = format!("{}-rw", config.cluster_name);
let tls_route = TlsRoute {
hostname: rep_hostname,
backend: rw_backend,
target_port: 5432,
namespace: config.namespace.clone(),
};
// Expose RW publicly via TLS passthrough
self.replica
.install_route(tls_route.clone())
.await
.map_err(|e| InterpretError::new(e))?;
info!("Configuring replica connection parameters and bootstrap"); info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new(); let mut connection_parameters = HashMap::new();

View File

@@ -1,14 +1,21 @@
use std::collections::BTreeMap; use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::resource::K8sResourceScore; use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::capability::PostgreSQLConfig; use crate::modules::postgresql::capability::PostgreSQLConfig;
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage}; use crate::modules::postgresql::cnpg::{
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
SecretKeySelector, Storage,
};
use crate::score::Score; use crate::score::Score;
use crate::topology::{K8sclient, Topology}; use crate::topology::{K8sclient, Topology};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG. /// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
/// ///
@@ -51,37 +58,184 @@ impl K8sPostgreSQLScore {
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore { impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta { Box::new(K8sPostgreSQLInterpret {
name: Some(self.config.cluster_name.clone()), config: self.config.clone(),
namespace: Some(self.config.namespace.clone()), })
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.config.instances,
storage: Storage {
size: self.config.storage_size.to_string(),
},
bootstrap: Bootstrap {
initdb: Initdb {
database: "app".to_string(),
owner: "app".to_string(),
},
},
// superuser_secret: Some(BTreeMap::from([(
// "name".to_string(),
// format!("{}-superuser", self.config.cluster_name.clone()),
// )])),
enable_superuser_access: true,
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
K8sResourceScore::single(cluster, Some(self.config.namespace.clone())).create_interpret()
} }
fn name(&self) -> String { fn name(&self) -> String {
format!("PostgreSQLScore({})", self.config.namespace) format!("PostgreSQLScore({})", self.config.namespace)
} }
} }
#[derive(Debug)]
pub struct K8sPostgreSQLInterpret {
config: PostgreSQLConfig,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
match &self.config.role {
super::capability::PostgreSQLClusterRole::Primary => {
let metadata = ObjectMeta {
name: Some(self.config.cluster_name.clone()),
namespace: Some(self.config.namespace.clone()),
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.config.instances,
storage: Storage {
size: self.config.storage_size.to_string(),
},
bootstrap: Bootstrap {
initdb: Some(Initdb {
database: "app".to_string(),
owner: "app".to_string(),
}),
recovery: None,
pg_basebackup: None,
},
enable_superuser_access: true,
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
Ok(
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
.create_interpret()
.execute(inventory, topology)
.await?,
)
}
super::capability::PostgreSQLClusterRole::Replica(replica_config) => {
let metadata = ObjectMeta {
name: Some("streaming-replica-certs".to_string()),
namespace: Some(self.config.namespace.clone()),
..ObjectMeta::default()
};
// The data must be base64-encoded. If you already have PEM strings in your config, encode them:
let mut data = std::collections::BTreeMap::new();
data.insert(
"tls.key".to_string(),
ByteString(
replica_config
.replication_certs
.streaming_replica_key_pem
.as_bytes()
.to_vec(),
),
);
data.insert(
"tls.crt".to_string(),
ByteString(
replica_config
.replication_certs
.streaming_replica_cert_pem
.as_bytes()
.to_vec(),
),
);
data.insert(
"ca.crt".to_string(),
ByteString(
replica_config
.replication_certs
.ca_cert_pem
.as_bytes()
.to_vec(),
),
);
let secret = Secret {
metadata,
data: Some(data),
string_data: None, // You could use string_data if you prefer raw strings
type_: Some("Opaque".to_string()),
..Secret::default()
};
K8sResourceScore::single(secret, Some(self.config.namespace.clone()))
.create_interpret()
.execute(inventory, topology)
.await?;
let metadata = ObjectMeta {
name: Some(self.config.cluster_name.clone()),
namespace: Some(self.config.namespace.clone()),
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.config.instances,
storage: Storage {
size: self.config.storage_size.to_string(),
},
bootstrap: Bootstrap {
initdb: None,
recovery: None,
pg_basebackup: Some(PgBaseBackup {
source: replica_config.primary_cluster_name.clone(),
}),
},
external_clusters: Some(vec![ExternalCluster {
name: replica_config.primary_cluster_name.clone(),
connection_parameters: replica_config
.external_cluster
.connection_parameters
.clone(),
ssl_key: Some(SecretKeySelector {
name: "streaming-replica-certs".to_string(),
key: "tls.key".to_string(),
}),
ssl_cert: Some(SecretKeySelector {
name: "streaming-replica-certs".to_string(),
key: "tls.crt".to_string(),
}),
ssl_root_cert: Some(SecretKeySelector {
name: "streaming-replica-certs".to_string(),
key: "ca.crt".to_string(),
}),
}]),
replica: Some(ReplicaSpec {
enabled: true,
source: replica_config.primary_cluster_name.clone(),
primary: None,
}),
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
Ok(
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
.create_interpret()
.execute(inventory, topology)
.await?,
)
}
}
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("K8sPostgreSQLInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -18,46 +18,31 @@ use crate::topology::Topology;
/// # Usage /// # Usage
/// ``` /// ```
/// use harmony::modules::postgresql::PublicPostgreSQLScore; /// use harmony::modules::postgresql::PublicPostgreSQLScore;
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com"); /// let score = PublicPostgreSQLScore::new("harmony");
/// ``` /// ```
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct PublicPostgreSQLScore { pub struct PublicPostgreSQLScore {
/// Inner non-public Postgres cluster config. /// Inner non-public Postgres cluster config.
pub config: PostgreSQLConfig, pub config: PostgreSQLConfig,
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
pub hostname: String,
} }
impl PublicPostgreSQLScore { impl PublicPostgreSQLScore {
pub fn new(namespace: &str, hostname: &str) -> Self { pub fn new(namespace: &str) -> Self {
Self { Self {
config: PostgreSQLConfig::default().with_namespace(namespace), config: PostgreSQLConfig::default().with_namespace(namespace),
hostname: hostname.to_string(),
} }
} }
} }
impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore { impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.config.cluster_name);
let tls_route = TlsRoute {
namespace: self.config.namespace.clone(),
hostname: self.hostname.clone(),
backend: rw_backend,
target_port: 5432,
};
Box::new(PublicPostgreSQLInterpret { Box::new(PublicPostgreSQLInterpret {
config: self.config.clone(), config: self.config.clone(),
tls_route,
}) })
} }
fn name(&self) -> String { fn name(&self) -> String {
format!( format!("PublicPostgreSQLScore({})", self.config.namespace)
"PublicPostgreSQLScore({}:{})",
self.config.namespace, self.hostname
)
} }
} }
@@ -65,7 +50,6 @@ impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret { struct PublicPostgreSQLInterpret {
config: PostgreSQLConfig, config: PostgreSQLConfig,
tls_route: TlsRoute,
} }
#[async_trait] #[async_trait]
@@ -76,15 +60,28 @@ impl<T: Topology + PostgreSQL + TlsRouter> Interpret<T> for PublicPostgreSQLInte
.await .await
.map_err(|e| InterpretError::new(e))?; .map_err(|e| InterpretError::new(e))?;
let hostname = format!(
"{}.{}.{}",
self.config.cluster_name,
self.config.namespace,
topo.get_public_domain().await?
);
let rw_backend = format!("{}-rw", self.config.cluster_name);
let tls_route = TlsRoute {
hostname,
backend: rw_backend,
target_port: 5432,
namespace: self.config.namespace.clone(),
};
// Expose RW publicly via TLS passthrough // Expose RW publicly via TLS passthrough
topo.install_route(self.tls_route.clone()) topo.install_route(tls_route.clone())
.await .await
.map_err(|e| InterpretError::new(e))?; .map_err(|e| InterpretError::new(e))?;
Ok(Outcome::success(format!( Ok(Outcome::success(format!(
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'", "Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
self.config.cluster_name.clone(), self.config.cluster_name.clone(),
self.tls_route.hostname tls_route.hostname
))) )))
} }