diff --git a/examples/public_postgres/src/main.rs b/examples/public_postgres/src/main.rs index b1e9e66..43078ec 100644 --- a/examples/public_postgres/src/main.rs +++ b/examples/public_postgres/src/main.rs @@ -1,32 +1,33 @@ use harmony::{ inventory::Inventory, - modules::{network::TlsPassthroughScore, postgresql::PostgreSQLScore}, - topology::{K8sAnywhereTopology, TlsRoute}, + modules::postgresql::{PostgreSQLConnectionScore, PostgreSQLScore, PublicPostgreSQLScore}, + topology::K8sAnywhereTopology, }; #[tokio::main] async fn main() { - let namespace = "harmony-postgres-example".to_string(); - let postgresql = PostgreSQLScore { - name: "harmony-postgres-example".to_string(), // Override default name - namespace: namespace.clone(), - ..Default::default() // Use harmony defaults, they are based on CNPG's default values : - // "default" namespace, 1 instance, 1Gi storage + let postgres = PublicPostgreSQLScore { + postgres_score: PostgreSQLScore { + name: "harmony-postgres-example".to_string(), // Override default name + namespace: "harmony-public-postgres".to_string(), + ..Default::default() // Use harmony defaults, they are based on CNPG's default values : + // "default" namespace, 1 instance, 1Gi storage + }, + hostname: "postgrestest.sto1.nationtech.io".to_string(), }; - let tls_passthrough = TlsPassthroughScore { - route: TlsRoute { - hostname: "postgres.example.com".to_string(), // CNPG creates a -rw service for read-write endpoint - backend: format!("{}-rw", postgresql.name), // Public hostname for TLS SNI - namespace: namespace.clone(), - target_port: 5432, // PostgreSQL default port - }, + let test_connection = PostgreSQLConnectionScore { + name: "harmony-postgres-example".to_string(), + namespace: "harmony-public-postgres".to_string(), + cluster_name: "harmony-postgres-example".to_string(), + hostname: Some("postgrestest.sto1.nationtech.io".to_string()), + port_override: Some(443), }; harmony_cli::run( Inventory::autoload(), K8sAnywhereTopology::from_env(), - vec![Box::new(postgresql), Box::new(tls_passthrough)], + vec![Box::new(postgres), Box::new(test_connection)], None, ) .await diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs similarity index 99% rename from harmony/src/domain/topology/k8s_anywhere.rs rename to harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs index 5739d57..694ce85 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs @@ -35,7 +35,6 @@ use crate::{ service_monitor::ServiceMonitor, }, }, - network::TlsPassthroughScore, okd::route::OKDTlsPassthroughScore, prometheus::{ k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore, @@ -46,7 +45,7 @@ use crate::{ topology::{TlsRoute, TlsRouter, ingress::Ingress}, }; -use super::{ +use super::super::{ DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError, PreparationOutcome, Topology, k8s::K8sClient, diff --git a/harmony/src/domain/topology/k8s_anywhere/mod.rs b/harmony/src/domain/topology/k8s_anywhere/mod.rs new file mode 100644 index 0000000..be87082 --- /dev/null +++ b/harmony/src/domain/topology/k8s_anywhere/mod.rs @@ -0,0 +1,3 @@ +mod k8s_anywhere; +mod postgres; +pub use k8s_anywhere::*; diff --git a/harmony/src/domain/topology/k8s_anywhere/postgres.rs b/harmony/src/domain/topology/k8s_anywhere/postgres.rs new file mode 100644 index 0000000..2774fc3 --- /dev/null +++ b/harmony/src/domain/topology/k8s_anywhere/postgres.rs @@ -0,0 +1,37 @@ +use async_trait::async_trait; + +use crate::{ + modules::postgresql::capability::{ + PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts, + }, + topology::K8sAnywhereTopology, +}; + +#[async_trait] +impl PostgreSQL for K8sAnywhereTopology { + async fn deploy(&self, config: &PostgreSQLConfig) -> Result { + todo!() + } + + /// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster. + /// Abstracts away storage/retrieval details (e.g., secrets, files). + async fn get_replication_certs(&self, cluster_name: &str) -> Result { + todo!() + } + + /// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster. + async fn get_endpoint(&self, cluster_name: &str) -> Result { + todo!() + } + + /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough). + /// Returns None if no public endpoint (internal-only cluster). + /// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex + /// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait. + async fn get_public_endpoint( + &self, + cluster_name: &str, + ) -> Result, String> { + todo!() + } +} diff --git a/harmony/src/modules/postgresql/mod.rs b/harmony/src/modules/postgresql/mod.rs index 11c21aa..ea50106 100644 --- a/harmony/src/modules/postgresql/mod.rs +++ b/harmony/src/modules/postgresql/mod.rs @@ -1,5 +1,7 @@ pub mod capability; mod score; +mod score_connect; +pub use score_connect::*; pub use score::*; mod score_public; pub use score_public::*; diff --git a/harmony/src/modules/postgresql/score_connect.rs b/harmony/src/modules/postgresql/score_connect.rs new file mode 100644 index 0000000..fa72ba5 --- /dev/null +++ b/harmony/src/modules/postgresql/score_connect.rs @@ -0,0 +1,299 @@ +use async_trait::async_trait; +use k8s_openapi::ByteString; +use k8s_openapi::api::core::v1::Secret; +use log::{debug, error, info, trace}; +use serde::Serialize; +use std::collections::BTreeMap; +use std::io::Write; +use std::path::{Path, PathBuf}; +use std::process::Stdio; +use tokio::process::Command; + +use crate::data::Version; +use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}; +use crate::inventory::Inventory; +use crate::score::Score; +use crate::topology::{K8sclient, Topology}; +use harmony_types::id::Id; + +#[derive(Debug, Clone, Serialize)] +pub struct PostgreSQLConnectionScore { + pub name: String, + pub namespace: String, + pub cluster_name: String, + pub hostname: Option, + pub port_override: Option, +} + +fn decode_secret(data: &BTreeMap, key: &str) -> Result { + let val = data + .get(key) + .ok_or_else(|| InterpretError::new(format!("Secret missing key {}", key)))?; + String::from_utf8(val.0.clone()) + .map_err(|e| InterpretError::new(format!("Failed to decode {}: {}", key, e))) +} + +impl PostgreSQLConnectionScore { + pub fn new(namespace: &str, cluster_name: &str, hostname_override: Option) -> Self { + Self { + name: format!("postgres-connection-{}", cluster_name), + namespace: namespace.to_string(), + cluster_name: cluster_name.to_string(), + hostname: hostname_override, + port_override: None, + } + } +} + +impl Score for PostgreSQLConnectionScore { + fn create_interpret(&self) -> Box> { + Box::new(PostgreSQLConnectionInterpret { + score: self.clone(), + }) + } + + fn name(&self) -> String { + format!("PostgreSQLConnectionScore : {}", self.name) + } +} + +#[derive(Debug, Clone)] +struct PostgreSQLConnectionInterpret { + score: PostgreSQLConnectionScore, +} + +impl PostgreSQLConnectionInterpret { + async fn fetch_app_secret(&self, topo: &T) -> Result { + let app_secret_name = format!("{}-app", self.score.cluster_name); + info!("Fetching app secret {}", app_secret_name); + + let k8s_client = topo.k8s_client().await?; + k8s_client + .get_resource(&app_secret_name, Some(&self.score.namespace)) + .await + .map_err(|e| InterpretError::new(format!("Failed to get app secret: {e}")))? + .ok_or_else(|| InterpretError::new(format!("App secret {} not found", app_secret_name))) + } + + async fn fetch_ca_secret(&self, topo: &T) -> Result { + let ca_secret_name = format!("{}-ca", self.score.cluster_name); + info!("Fetching CA secret {}", ca_secret_name); + + let k8s_client = topo.k8s_client().await?; + k8s_client + .get_resource(&ca_secret_name, Some(&self.score.namespace)) + .await + .map_err(|e| InterpretError::new(format!("Failed to get CA secret: {e}")))? + .ok_or_else(|| InterpretError::new(format!("CA secret {} not found", ca_secret_name))) + } + + fn get_secret_data( + &self, + secret: &Secret, + secret_type: &str, + ) -> Result, InterpretError> { + secret + .data + .as_ref() + .ok_or_else(|| InterpretError::new(format!("{} secret has no data", secret_type))) + .map(|b| b.clone()) + } + + fn create_temp_dir(&self) -> Result { + tempfile::Builder::new() + .prefix("pg-connection-test-") + .tempdir() + .map_err(|e| InterpretError::new(format!("Failed to create temp directory: {e}"))) + } + + fn write_ca_cert( + &self, + temp_dir: &Path, + ca_data: &BTreeMap, + ) -> Result { + let ca_crt = ca_data + .get("ca.crt") + .ok_or_else(|| InterpretError::new("CA secret missing ca.crt".to_string()))?; + let ca_file = temp_dir.join("ca.crt"); + + std::fs::write(&ca_file, &ca_crt.0) + .map_err(|e| InterpretError::new(format!("Failed to write CA cert: {e}")))?; + + Ok(ca_file) + } + + fn get_host(&self, data: &BTreeMap) -> Result { + self.score + .hostname + .clone() + .or_else(|| decode_secret(data, "host").ok()) + .ok_or_else(|| { + InterpretError::new("No hostname found in secret or override".to_string()) + }) + } + + fn get_port(&self, data: &BTreeMap) -> Result { + self.score + .port_override + .or_else(|| { + decode_secret(data, "port") + .ok() + .and_then(|p| p.parse().ok()) + }) + .ok_or_else(|| InterpretError::new("Port not found in secret or override".to_string())) + } + + fn create_test_script( + &self, + temp_dir: &Path, + ca_file: &Path, + username: &str, + password: &str, + dbname: &str, + host: &str, + port: u16, + ) -> Result { + let script_path = temp_dir.join("test_connection.sh"); + let ca_file_in_container = Path::new("/tmp").join(ca_file.file_name().unwrap()); + let script_content = format!( + "#!/bin/sh\n\\ + psql \"host={} port={} user={} dbname={} sslmode=verify-ca sslrootcert={} sslnegotiation=direct\" -c \"SELECT 1\"", + host, + port, + username, + dbname, + ca_file_in_container.display() + ); + + debug!("Wrote script content : \n{script_content}"); + std::fs::write(&script_path, script_content) + .map_err(|e| InterpretError::new(format!("Failed to write test script: {e}")))?; + + #[cfg(unix)] + { + use std::os::unix::fs::PermissionsExt; + let mut perms = std::fs::metadata(&script_path) + .map_err(|e| InterpretError::new(format!("Failed to get script metadata: {e}")))? + .permissions(); + perms.set_mode(0o755); + std::fs::set_permissions(&script_path, perms).map_err(|e| { + InterpretError::new(format!("Failed to set script permissions: {e}")) + })?; + } + + Ok(script_path) + } + + async fn run_docker_test( + &self, + temp_dir: &Path, + script_path: &Path, + password: &str, + ) -> Result { + info!("Running connection test in Docker container..."); + + let output = Command::new("docker") + .arg("run") + .arg("--rm") + .arg("-i") + .arg("-v") + .arg(format!("{}/:/tmp", temp_dir.display())) + .arg("--workdir") + .arg("/tmp") + .arg("--entrypoint") + .arg("/bin/sh") + .arg("postgres:latest") + .arg("-c") + .arg(format!("PGPASSWORD={} /tmp/test_connection.sh", password)) + .env("PGPASSWORD", password) + .stdout(std::process::Stdio::inherit()) + .stderr(std::process::Stdio::inherit()) + .spawn() + .map_err(|e| InterpretError::new(format!("Failed to spawn docker container: {e}")))? + .wait_with_output() + .await + .map_err(|e| { + InterpretError::new(format!("Failed to wait for docker container: {e}")) + })?; + + if output.status.success() { + info!("Successfully connected to PostgreSQL!"); + Ok(Outcome::success("Connection successful".to_string())) + } else { + let stderr = String::from_utf8_lossy(&output.stderr); + error!("Connection failed: {}", stderr); + Err(InterpretError::new(format!( + "Connection failed: {}", + stderr + ))) + } + } +} + +#[async_trait] +impl Interpret for PostgreSQLConnectionInterpret { + fn get_name(&self) -> InterpretName { + InterpretName::Custom("PostgreSQLConnectionInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + vec![] + } + + async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result { + // Fetch secrets + let app_secret = self.fetch_app_secret(topo).await?; + trace!("Got app_secret {app_secret:?}"); + let ca_secret = self.fetch_ca_secret(topo).await?; + trace!("Got ca_secret {ca_secret:?}"); + + // Get secret data + let app_data = self.get_secret_data(&app_secret, "App")?; + trace!("Got app_data {app_data:?}"); + let ca_data = self.get_secret_data(&ca_secret, "CA")?; + trace!("Got ca_data {ca_data:?}"); + + // Create temp directory + let temp_dir = self.create_temp_dir()?; + let temp_dir_path = temp_dir.path(); + debug!("Created temp dir {temp_dir_path:?}"); + + // Write CA cert + let ca_file = self.write_ca_cert(temp_dir_path, &ca_data)?; + debug!("Wrote ca_file {ca_file:?}"); + + // Get connection details + let username = decode_secret(&app_data, "username")?; + let password = decode_secret(&app_data, "password")?; + let dbname = decode_secret(&app_data, "dbname")?; + let host = self.get_host(&app_data)?; + let port = self.get_port(&app_data)?; + + + // Create test script + let script_path = self.create_test_script( + temp_dir_path, + &ca_file, + &username, + &password, + &dbname, + &host, + port, + )?; + + debug!("Prepared test script in {}", temp_dir_path.display()); + tokio::time::sleep(std::time::Duration::from_secs(10)).await; + + // Run connection test + self.run_docker_test(temp_dir_path, &script_path, &password) + .await + } +} diff --git a/harmony/src/modules/postgresql/score_public.rs b/harmony/src/modules/postgresql/score_public.rs index 8202176..d729005 100644 --- a/harmony/src/modules/postgresql/score_public.rs +++ b/harmony/src/modules/postgresql/score_public.rs @@ -37,6 +37,30 @@ impl PublicPostgreSQLScore { } } +impl Score for PublicPostgreSQLScore { + fn create_interpret(&self) -> Box> { + let rw_backend = format!("{}-rw", self.postgres_score.name); + let tls_route = TlsRoute { + namespace: self.postgres_score.namespace.clone(), + hostname: self.hostname.clone(), + backend: rw_backend, + target_port: 5432, + }; + + Box::new(PublicPostgreSQLInterpret { + postgres_score: self.postgres_score.clone(), + tls_route, + }) + } + + fn name(&self) -> String { + format!( + "PublicPostgreSQLScore({}:{})", + self.postgres_score.namespace, self.hostname + ) + } +} + /// Custom interpret: deploy Postgres then install public TLS route. #[derive(Debug, Clone)] struct PublicPostgreSQLInterpret { @@ -74,27 +98,3 @@ impl Interpret for PublicP ))) } } - -impl Score for PublicPostgreSQLScore { - fn create_interpret(&self) -> Box> { - let rw_backend = format!("{}-rw", self.postgres_score.name); - let tls_route = TlsRoute { - namespace: self.postgres_score.namespace.clone(), - hostname: self.hostname.clone(), - backend: rw_backend, - target_port: 5432, - }; - - Box::new(PublicPostgreSQLInterpret { - postgres_score: self.postgres_score.clone(), - tls_route, - }) - } - - fn name(&self) -> String { - format!( - "PublicPostgreSQLScore({}:{})", - self.postgres_score.namespace, self.hostname - ) - } -}