feat: PostgreSQL public and Connection test score, also moved k8s_anywhere in a folder
Some checks failed
Run Check Script / check (pull_request) Failing after 40s

This commit is contained in:
2025-12-16 14:57:02 -05:00
parent 29821d5e9f
commit c3ec7070ec
7 changed files with 383 additions and 42 deletions

View File

@@ -1,32 +1,33 @@
use harmony::{
inventory::Inventory,
modules::{network::TlsPassthroughScore, postgresql::PostgreSQLScore},
topology::{K8sAnywhereTopology, TlsRoute},
modules::postgresql::{PostgreSQLConnectionScore, PostgreSQLScore, PublicPostgreSQLScore},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let namespace = "harmony-postgres-example".to_string();
let postgresql = PostgreSQLScore {
name: "harmony-postgres-example".to_string(), // Override default name
namespace: namespace.clone(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage
let postgres = PublicPostgreSQLScore {
postgres_score: PostgreSQLScore {
name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
let tls_passthrough = TlsPassthroughScore {
route: TlsRoute {
hostname: "postgres.example.com".to_string(), // CNPG creates a -rw service for read-write endpoint
backend: format!("{}-rw", postgresql.name), // Public hostname for TLS SNI
namespace: namespace.clone(),
target_port: 5432, // PostgreSQL default port
},
let test_connection = PostgreSQLConnectionScore {
name: "harmony-postgres-example".to_string(),
namespace: "harmony-public-postgres".to_string(),
cluster_name: "harmony-postgres-example".to_string(),
hostname: Some("postgrestest.sto1.nationtech.io".to_string()),
port_override: Some(443),
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(postgresql), Box::new(tls_passthrough)],
vec![Box::new(postgres), Box::new(test_connection)],
None,
)
.await

View File

@@ -35,7 +35,6 @@ use crate::{
service_monitor::ServiceMonitor,
},
},
network::TlsPassthroughScore,
okd::route::OKDTlsPassthroughScore,
prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
@@ -46,7 +45,7 @@ use crate::{
topology::{TlsRoute, TlsRouter, ingress::Ingress},
};
use super::{
use super::super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,

View File

@@ -0,0 +1,3 @@
mod k8s_anywhere;
mod postgres;
pub use k8s_anywhere::*;

View File

@@ -0,0 +1,37 @@
use async_trait::async_trait;
use crate::{
modules::postgresql::capability::{
PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts,
},
topology::K8sAnywhereTopology,
};
#[async_trait]
impl PostgreSQL for K8sAnywhereTopology {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
todo!()
}
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
todo!()
}
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
todo!()
}
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
todo!()
}
}

View File

@@ -1,5 +1,7 @@
pub mod capability;
mod score;
mod score_connect;
pub use score_connect::*;
pub use score::*;
mod score_public;
pub use score_public::*;

View File

@@ -0,0 +1,299 @@
use async_trait::async_trait;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use log::{debug, error, info, trace};
use serde::Serialize;
use std::collections::BTreeMap;
use std::io::Write;
use std::path::{Path, PathBuf};
use std::process::Stdio;
use tokio::process::Command;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct PostgreSQLConnectionScore {
pub name: String,
pub namespace: String,
pub cluster_name: String,
pub hostname: Option<String>,
pub port_override: Option<u16>,
}
fn decode_secret(data: &BTreeMap<String, ByteString>, key: &str) -> Result<String, InterpretError> {
let val = data
.get(key)
.ok_or_else(|| InterpretError::new(format!("Secret missing key {}", key)))?;
String::from_utf8(val.0.clone())
.map_err(|e| InterpretError::new(format!("Failed to decode {}: {}", key, e)))
}
impl PostgreSQLConnectionScore {
pub fn new(namespace: &str, cluster_name: &str, hostname_override: Option<String>) -> Self {
Self {
name: format!("postgres-connection-{}", cluster_name),
namespace: namespace.to_string(),
cluster_name: cluster_name.to_string(),
hostname: hostname_override,
port_override: None,
}
}
}
impl<T: Topology + K8sclient + Send + Sync> Score<T> for PostgreSQLConnectionScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLConnectionInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("PostgreSQLConnectionScore : {}", self.name)
}
}
#[derive(Debug, Clone)]
struct PostgreSQLConnectionInterpret {
score: PostgreSQLConnectionScore,
}
impl PostgreSQLConnectionInterpret {
async fn fetch_app_secret<T: K8sclient>(&self, topo: &T) -> Result<Secret, InterpretError> {
let app_secret_name = format!("{}-app", self.score.cluster_name);
info!("Fetching app secret {}", app_secret_name);
let k8s_client = topo.k8s_client().await?;
k8s_client
.get_resource(&app_secret_name, Some(&self.score.namespace))
.await
.map_err(|e| InterpretError::new(format!("Failed to get app secret: {e}")))?
.ok_or_else(|| InterpretError::new(format!("App secret {} not found", app_secret_name)))
}
async fn fetch_ca_secret<T: K8sclient>(&self, topo: &T) -> Result<Secret, InterpretError> {
let ca_secret_name = format!("{}-ca", self.score.cluster_name);
info!("Fetching CA secret {}", ca_secret_name);
let k8s_client = topo.k8s_client().await?;
k8s_client
.get_resource(&ca_secret_name, Some(&self.score.namespace))
.await
.map_err(|e| InterpretError::new(format!("Failed to get CA secret: {e}")))?
.ok_or_else(|| InterpretError::new(format!("CA secret {} not found", ca_secret_name)))
}
fn get_secret_data(
&self,
secret: &Secret,
secret_type: &str,
) -> Result<BTreeMap<String, ByteString>, InterpretError> {
secret
.data
.as_ref()
.ok_or_else(|| InterpretError::new(format!("{} secret has no data", secret_type)))
.map(|b| b.clone())
}
fn create_temp_dir(&self) -> Result<tempfile::TempDir, InterpretError> {
tempfile::Builder::new()
.prefix("pg-connection-test-")
.tempdir()
.map_err(|e| InterpretError::new(format!("Failed to create temp directory: {e}")))
}
fn write_ca_cert(
&self,
temp_dir: &Path,
ca_data: &BTreeMap<String, ByteString>,
) -> Result<PathBuf, InterpretError> {
let ca_crt = ca_data
.get("ca.crt")
.ok_or_else(|| InterpretError::new("CA secret missing ca.crt".to_string()))?;
let ca_file = temp_dir.join("ca.crt");
std::fs::write(&ca_file, &ca_crt.0)
.map_err(|e| InterpretError::new(format!("Failed to write CA cert: {e}")))?;
Ok(ca_file)
}
fn get_host(&self, data: &BTreeMap<String, ByteString>) -> Result<String, InterpretError> {
self.score
.hostname
.clone()
.or_else(|| decode_secret(data, "host").ok())
.ok_or_else(|| {
InterpretError::new("No hostname found in secret or override".to_string())
})
}
fn get_port(&self, data: &BTreeMap<String, ByteString>) -> Result<u16, InterpretError> {
self.score
.port_override
.or_else(|| {
decode_secret(data, "port")
.ok()
.and_then(|p| p.parse().ok())
})
.ok_or_else(|| InterpretError::new("Port not found in secret or override".to_string()))
}
fn create_test_script(
&self,
temp_dir: &Path,
ca_file: &Path,
username: &str,
password: &str,
dbname: &str,
host: &str,
port: u16,
) -> Result<PathBuf, InterpretError> {
let script_path = temp_dir.join("test_connection.sh");
let ca_file_in_container = Path::new("/tmp").join(ca_file.file_name().unwrap());
let script_content = format!(
"#!/bin/sh\n\\
psql \"host={} port={} user={} dbname={} sslmode=verify-ca sslrootcert={} sslnegotiation=direct\" -c \"SELECT 1\"",
host,
port,
username,
dbname,
ca_file_in_container.display()
);
debug!("Wrote script content : \n{script_content}");
std::fs::write(&script_path, script_content)
.map_err(|e| InterpretError::new(format!("Failed to write test script: {e}")))?;
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)
.map_err(|e| InterpretError::new(format!("Failed to get script metadata: {e}")))?
.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms).map_err(|e| {
InterpretError::new(format!("Failed to set script permissions: {e}"))
})?;
}
Ok(script_path)
}
async fn run_docker_test(
&self,
temp_dir: &Path,
script_path: &Path,
password: &str,
) -> Result<Outcome, InterpretError> {
info!("Running connection test in Docker container...");
let output = Command::new("docker")
.arg("run")
.arg("--rm")
.arg("-i")
.arg("-v")
.arg(format!("{}/:/tmp", temp_dir.display()))
.arg("--workdir")
.arg("/tmp")
.arg("--entrypoint")
.arg("/bin/sh")
.arg("postgres:latest")
.arg("-c")
.arg(format!("PGPASSWORD={} /tmp/test_connection.sh", password))
.env("PGPASSWORD", password)
.stdout(std::process::Stdio::inherit())
.stderr(std::process::Stdio::inherit())
.spawn()
.map_err(|e| InterpretError::new(format!("Failed to spawn docker container: {e}")))?
.wait_with_output()
.await
.map_err(|e| {
InterpretError::new(format!("Failed to wait for docker container: {e}"))
})?;
if output.status.success() {
info!("Successfully connected to PostgreSQL!");
Ok(Outcome::success("Connection successful".to_string()))
} else {
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Connection failed: {}", stderr);
Err(InterpretError::new(format!(
"Connection failed: {}",
stderr
)))
}
}
}
#[async_trait]
impl<T: Topology + K8sclient + Send + Sync> Interpret<T> for PostgreSQLConnectionInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLConnectionInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Fetch secrets
let app_secret = self.fetch_app_secret(topo).await?;
trace!("Got app_secret {app_secret:?}");
let ca_secret = self.fetch_ca_secret(topo).await?;
trace!("Got ca_secret {ca_secret:?}");
// Get secret data
let app_data = self.get_secret_data(&app_secret, "App")?;
trace!("Got app_data {app_data:?}");
let ca_data = self.get_secret_data(&ca_secret, "CA")?;
trace!("Got ca_data {ca_data:?}");
// Create temp directory
let temp_dir = self.create_temp_dir()?;
let temp_dir_path = temp_dir.path();
debug!("Created temp dir {temp_dir_path:?}");
// Write CA cert
let ca_file = self.write_ca_cert(temp_dir_path, &ca_data)?;
debug!("Wrote ca_file {ca_file:?}");
// Get connection details
let username = decode_secret(&app_data, "username")?;
let password = decode_secret(&app_data, "password")?;
let dbname = decode_secret(&app_data, "dbname")?;
let host = self.get_host(&app_data)?;
let port = self.get_port(&app_data)?;
// Create test script
let script_path = self.create_test_script(
temp_dir_path,
&ca_file,
&username,
&password,
&dbname,
&host,
port,
)?;
debug!("Prepared test script in {}", temp_dir_path.display());
tokio::time::sleep(std::time::Duration::from_secs(10)).await;
// Run connection test
self.run_docker_test(temp_dir_path, &script_path, &password)
.await
}
}

View File

@@ -37,6 +37,30 @@ impl PublicPostgreSQLScore {
}
}
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.postgres_score.name);
let tls_route = TlsRoute {
namespace: self.postgres_score.namespace.clone(),
hostname: self.hostname.clone(),
backend: rw_backend,
target_port: 5432,
};
Box::new(PublicPostgreSQLInterpret {
postgres_score: self.postgres_score.clone(),
tls_route,
})
}
fn name(&self) -> String {
format!(
"PublicPostgreSQLScore({}:{})",
self.postgres_score.namespace, self.hostname
)
}
}
/// Custom interpret: deploy Postgres then install public TLS route.
#[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret {
@@ -74,27 +98,3 @@ impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for PublicP
)))
}
}
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.postgres_score.name);
let tls_route = TlsRoute {
namespace: self.postgres_score.namespace.clone(),
hostname: self.hostname.clone(),
backend: rw_backend,
target_port: 5432,
};
Box::new(PublicPostgreSQLInterpret {
postgres_score: self.postgres_score.clone(),
tls_route,
})
}
fn name(&self) -> String {
format!(
"PublicPostgreSQLScore({}:{})",
self.postgres_score.namespace, self.hostname
)
}
}