Compare commits


19 Commits

Author SHA1 Message Date
b0383454f0 feat(types): Add utility initialization functions for StorageSize such as StorageSize::kb(324)
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-16 16:24:53 -05:00
9e8f3ce52f feat(postgres): Postgres Connection Test score now has a script that provides more insight. Not quite working properly but easy to improve at this point.
Some checks failed
Run Check Script / check (pull_request) Failing after 43s
2025-12-16 15:53:54 -05:00
c3ec7070ec feat: PostgreSQL public and Connection test scores, also moved k8s_anywhere into a folder
Some checks failed
Run Check Script / check (pull_request) Failing after 40s
2025-12-16 14:57:02 -05:00
29821d5e9f feat: TlsPassthroughScore works, improved logging, fixed CRD
Some checks failed
Run Check Script / check (pull_request) Failing after 35s
2025-12-15 19:09:10 -05:00
446e079595 wip: public postgres, many fixes and refactoring toward more cohesive routing management
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-15 17:04:30 -05:00
e0da5764fb feat(types): Added Rfc1123 String type, useful for k8s names
Some checks failed
Run Check Script / check (pull_request) Failing after 38s
2025-12-15 12:57:52 -05:00
e9cab92585 feat: Impl TlsRoute for K8sAnywhereTopology 2025-12-14 22:22:09 -05:00
d06bd4dac6 feat: OKD route CRD and OKD specific route score
All checks were successful
Run Check Script / check (pull_request) Successful in 1m30s
2025-12-14 17:05:26 -05:00
142300802d wip: TlsRoute score first version
Some checks failed
Run Check Script / check (pull_request) Failing after 1m11s
2025-12-14 06:19:33 -05:00
2254641f3d fix: Tests, doctests, formatting
All checks were successful
Run Check Script / check (pull_request) Successful in 1m38s
2025-12-13 17:56:53 -05:00
b61e4f9a96 wip: Expose postgres publicly. Created tlsroute capability and postgres implementations
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-13 09:47:59 -05:00
2e367d88d4 feat: PostgreSQL score works, added postgresql example, tested on OKD 4.19, added note about incompatible default namespace settings
Some checks failed
Run Check Script / check (pull_request) Failing after 2m37s
2025-12-11 22:54:57 -05:00
9edc42a665 feat: PostgreSQLScore happy path using cnpg operator
Some checks failed
Run Check Script / check (pull_request) Failing after 37s
2025-12-11 14:36:39 -05:00
f242aafebb feat: Subscription for cnpg-operator: fixed default values, tested and added to the operatorhub example.
All checks were successful
Run Check Script / check (pull_request) Successful in 1m31s
2025-12-11 12:18:28 -05:00
3e14ebd62c feat: cnpg operator score
All checks were successful
Run Check Script / check (pull_request) Successful in 1m36s
2025-12-10 22:55:08 -05:00
1b19638df4 wip(failover): Started implementation of the FailoverTopology with PostgreSQL capability
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
This is our first Higher Order Topology (see ADR-015)
2025-12-10 21:15:51 -05:00
d39b1957cd feat(k8s_app): OperatorhubCatalogSourceScore can now install the operatorhub catalogsource on a cluster that already has operator lifecycle manager installed 2025-12-10 16:58:58 -05:00
357ca93d90 wip: FailoverTopology implementation for PostgreSQL on the way! 2025-12-10 13:12:53 -05:00
8103932f23 doc: Initial documentation for the MultisitePostgreSQL module 2025-12-10 13:12:53 -05:00
48 changed files with 3000 additions and 400 deletions

Cargo.lock generated
View File

@@ -1835,6 +1835,21 @@ dependencies = [
"url",
]
[[package]]
name = "example-operatorhub-catalogsource"
version = "0.1.0"
dependencies = [
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "example-opnsense"
version = "0.1.0"
@@ -6049,21 +6064,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "test-score"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "thiserror"
version = "1.0.69"

View File

@@ -0,0 +1,105 @@
# Design Document: Harmony PostgreSQL Module
**Status:** Draft
**Last Updated:** 2025-12-01
**Context:** Multi-site Data Replication & Orchestration
## 1. Overview
The Harmony PostgreSQL Module provides a high-level abstraction for deploying and managing high-availability PostgreSQL clusters across geographically distributed Kubernetes/OKD sites.
Instead of manually configuring complex replication slots, firewalls, and operator settings on each cluster, users define a single intent (a **Score**), and Harmony orchestrates the underlying infrastructure (the **Arrangement**) to establish a Primary-Replica architecture.
Currently, the implementation relies on the **CloudNativePG (CNPG)** operator as the backing engine.
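For the single-site building blocks this module composes, usage follows this PR's examples (a sketch adapted from the `example-postgresql` and operator examples; the `finance-db`/`tenant-a` values are illustrative):
```rust
use harmony::{
    inventory::Inventory,
    modules::postgresql::{CloudNativePgOperatorScore, PostgreSQLScore},
    topology::K8sAnywhereTopology,
};

#[tokio::main]
async fn main() {
    // Install the CNPG operator, then deploy a single-site cluster.
    let cnpg_operator = CloudNativePgOperatorScore::default();
    let postgresql = PostgreSQLScore {
        name: "finance-db".to_string(),
        namespace: "tenant-a".to_string(),
        ..Default::default() // 1 instance, 1Gi storage (CNPG defaults)
    };
    harmony_cli::run(
        Inventory::autoload(),
        K8sAnywhereTopology::from_env(),
        vec![Box::new(cnpg_operator), Box::new(postgresql)],
        None,
    )
    .await
    .unwrap();
}
```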
## 2. Architecture
### 2.1 The Abstraction Model
Following **ADR 003 (Infrastructure Abstraction)**, Harmony separates the *intent* from the *implementation*.
1. **The Score (Intent):** The user defines a `MultisitePostgreSQL` resource. This describes *what* is needed (e.g., "A Postgres 15 cluster with 10GB storage, Primary on Site A, Replica on Site B").
2. **The Interpret (Action):** Harmony's `MultisitePostgreSQLInterpret` processes this Score and orchestrates the deployment on both sites to reach the state it defines.
3. **The Capability (Implementation):** The `PostgreSQL` capability is implemented by the K8s topology; the Interpret uses it to deploy, configure, and query the cluster (see the sketch below). The concrete implementation relies on the mature CloudNativePG operator to manage all required Kubernetes resources.
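A minimal sketch of this flow, assuming the `PostgreSQL` capability trait added alongside this document (the `orchestrate` helper is hypothetical and elides the replica-side configuration):
```rust
use harmony::modules::postgresql::capability::{PostgreSQL, PostgreSQLConfig};

// Hypothetical helper showing what the Interpret does with the capability.
async fn orchestrate<T: PostgreSQL>(
    primary: &T,
    replica: &T,
    config: &PostgreSQLConfig,
) -> Result<(), String> {
    // 1. Deploy the primary cluster on Site A.
    let cluster = primary.deploy(config).await?;
    // 2. Fetch the mTLS material CNPG generated for replication.
    let certs = primary.get_replication_certs(&cluster).await?;
    // 3. Prefer the public endpoint (TLS passthrough route) when one is configured.
    let endpoint = match primary.get_public_endpoint(&cluster).await? {
        Some(public) => public,
        None => primary.get_endpoint(&cluster).await?,
    };
    // 4. Deploy the replica on Site B pointing at that endpoint (elided in this sketch).
    let _ = (replica, certs, endpoint);
    Ok(())
}
```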
### 2.2 Network Connectivity (TLS Passthrough)
One of the critical challenges in multi-site orchestration is secure connectivity between clusters that may have dynamic IPs or strict firewalls.
To solve this, we utilize **OKD/OpenShift Routes with TLS Passthrough**.
* **Mechanism:** The Primary site exposes a `Route` configured for `termination: passthrough`.
* **Routing:** The OpenShift HAProxy router inspects the **SNI (Server Name Indication)** header of the incoming TCP connection to route traffic to the correct PostgreSQL Pod.
* **Security:** SSL is **not** terminated at the ingress router. The encrypted stream is passed directly to the PostgreSQL instance. Mutual TLS (mTLS) authentication is handled natively by CNPG between the Primary and Replica instances.
* **Dynamic IPs:** Because connections are established via DNS hostnames (the Route URL), this architecture is resilient to dynamic IP changes at the Primary site.
#### Traffic Flow Diagram
```text
[ Site B: Replica ]                     [ Site A: Primary ]
        |                                        |
 (CNPG Instance) --[Encrypted TCP]--> (OKD HAProxy Router)
        |              (Port 443)                |
        |                                        |
        |                                [SNI Inspection]
        |                                        |
        |                                        v
        |                            (PostgreSQL Primary Pod)
        |                                  (Port 5432)
```
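On OKD, this mechanism boils down to a `Route` object like the following (a sketch built from the Route CRD bindings added in this PR; the module path and field values are assumptions):
```rust
use harmony::modules::okd::route::{Route, RoutePort, RouteSpec, RouteTargetReference, TLSConfig};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;

fn passthrough_route() -> Route {
    Route {
        metadata: ObjectMeta {
            name: Some("postgres-finance-db".to_string()),
            namespace: Some("tenant-a".to_string()),
            ..ObjectMeta::default()
        },
        spec: RouteSpec {
            host: Some("postgres-finance-db.apps.site-paris.example.com".to_string()),
            to: RouteTargetReference {
                kind: "Service".to_string(),
                name: "finance-db-rw".to_string(), // CNPG read-write service
                weight: None,
            },
            port: Some(RoutePort { target_port: 5432 }),
            // No termination at the router: the encrypted stream reaches PostgreSQL intact.
            tls: Some(TLSConfig {
                termination: "passthrough".to_string(),
                ..TLSConfig::default()
            }),
            ..RouteSpec::default()
        },
        ..Route::default()
    }
}
```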
## 3. Design Decisions
### Why CloudNativePG?
We selected CloudNativePG because it relies exclusively on standard Kubernetes primitives and uses the native PostgreSQL replication protocol (WAL shipping/Streaming). This aligns with Harmony's goal of being "K8s Native."
### Why TLS Passthrough instead of VPN/NodePort?
* **NodePort:** Requires static IPs and opening non-standard ports on the firewall, which violates our security constraints.
* **VPN (e.g., Wireguard/Tailscale):** While secure, it introduces significant complexity (sidecars, key management) and external dependencies.
* **TLS Passthrough:** Leverages the existing Ingress/Router infrastructure already present in OKD. It requires zero additional software and respects multi-tenancy (Routes are namespaced).
### Configuration Philosophy (YAGNI)
The current design exposes a **generic configuration surface**. Users can configure standard parameters (Storage size, CPU/Memory requests, Postgres version).
**We explicitly do not expose advanced CNPG or PostgreSQL configurations at this stage.**
* **Reasoning:** We aim to keep the API surface small and manageable.
* **Future Path:** We plan to implement a "pass-through" mechanism to allow sending raw config maps or custom parameters to the underlying engine (CNPG) *only when a concrete use case arises*. Until then, we adhere to the **YAGNI (You Ain't Gonna Need It)** principle to avoid premature optimization and API bloat.
## 4. Usage Guide
To deploy a multi-site cluster, apply the `MultisitePostgreSQL` resource to the Harmony Control Plane.
### Example Manifest
```yaml
apiVersion: harmony.io/v1alpha1
kind: MultisitePostgreSQL
metadata:
  name: finance-db
  namespace: tenant-a
spec:
  version: "15"
  storage: "10Gi"
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
  # Topology Definition
  topology:
    primary:
      site: "site-paris" # The name of the cluster in Harmony
    replicas:
      - site: "site-newyork"
```
### What happens next?
1. Harmony detects the CR.
2. **On Site Paris:** It deploys a CNPG Cluster (Primary) and creates a Passthrough Route `postgres-finance-db.apps.site-paris.example.com`.
3. **On Site New York:** It deploys a CNPG Cluster (Replica) configured with `externalClusters` pointing to the Paris Route.
4. Data begins replicating immediately over the encrypted channel.
## 5. Troubleshooting
* **Connection Refused:** Ensure the Primary site's Route is successfully admitted by the Ingress Controller.
* **Certificate Errors:** CNPG manages mTLS automatically. If errors persist, ensure the CA secrets were correctly propagated by Harmony from Primary to Replica namespaces.
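For an end-to-end check, the `PostgreSQLConnectionScore` added in this PR can be pointed at the public endpoint (a sketch; field values are illustrative):
```rust
use harmony::modules::postgresql::PostgreSQLConnectionScore;

// Sketch: verify the passthrough route end to end through the public hostname.
fn connection_check() -> PostgreSQLConnectionScore {
    PostgreSQLConnectionScore {
        name: "finance-db-check".to_string(),
        namespace: "tenant-a".to_string(),
        cluster_name: "finance-db".to_string(),
        hostname: Some("postgres-finance-db.apps.site-paris.example.com".to_string()),
        port_override: Some(443), // passthrough routes listen on 443
    }
}
```
Run it through `harmony_cli::run` alongside the other scores, as the `example-public-postgres` crate does.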

View File

@@ -0,0 +1,18 @@
[package]
name = "example-operatorhub-catalogsource"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,22 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory,
modules::{k8s::apps::OperatorHubCatalogSourceScore, postgresql::CloudNativePgOperatorScore},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let operatorhub_catalog = OperatorHubCatalogSourceScore::default();
let cnpg_operator = CloudNativePgOperatorScore::default();
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(operatorhub_catalog), Box::new(cnpg_operator)],
None,
)
.await
.unwrap();
}

View File

@@ -0,0 +1,18 @@
[package]
name = "example-postgresql"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,22 @@
use harmony::{
inventory::Inventory, modules::postgresql::PostgreSQLScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let postgresql = PostgreSQLScore {
name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-postgres-example".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values:
// "default" namespace, 1 instance, 1Gi storage
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(postgresql)],
None,
)
.await
.unwrap();
}

View File

@@ -0,0 +1,18 @@
[package]
name = "example-public-postgres"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,35 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{PostgreSQLConnectionScore, PostgreSQLScore, PublicPostgreSQLScore},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let postgres = PublicPostgreSQLScore {
postgres_score: PostgreSQLScore {
name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values:
// "default" namespace, 1 instance, 1Gi storage
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
let test_connection = PostgreSQLConnectionScore {
name: "harmony-postgres-example".to_string(),
namespace: "harmony-public-postgres".to_string(),
cluster_name: "harmony-postgres-example".to_string(),
hostname: Some("postgrestest.sto1.nationtech.io".to_string()),
port_override: Some(443),
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(postgres), Box::new(test_connection)],
None,
)
.await
.unwrap();
}

View File

@@ -152,6 +152,12 @@ pub struct InterpretError {
msg: String,
}
impl From<InterpretError> for String {
fn from(e: InterpretError) -> String {
format!("InterpretError : {}", e.msg)
}
}
impl std::fmt::Display for InterpretError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.msg)

View File

@@ -1,6 +1,4 @@
mod repository;
use std::fmt;
pub use repository::*;
#[derive(Debug, new, Clone)]
@@ -71,14 +69,5 @@ pub enum HostRole {
Bootstrap,
ControlPlane,
Worker,
}
impl fmt::Display for HostRole {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HostRole::Bootstrap => write!(f, "Bootstrap"),
HostRole::ControlPlane => write!(f, "ControlPlane"),
HostRole::Worker => write!(f, "Worker"),
}
}
Storage,
}

View File

@@ -0,0 +1,19 @@
use async_trait::async_trait;
use crate::topology::{PreparationError, PreparationOutcome, Topology};
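/// Composes two topologies of the same type into a primary/replica pair.
/// This is Harmony's first Higher Order Topology (see ADR-015): scores targeting `T`
/// can be orchestrated across both sites through this wrapper.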
pub struct FailoverTopology<T> {
pub primary: T,
pub replica: T,
}
#[async_trait]
impl<T: Send + Sync> Topology for FailoverTopology<T> {
fn name(&self) -> &str {
"FailoverTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}

View File

@@ -451,7 +451,20 @@ impl K8sClient {
{
let mut result = Vec::new();
for r in resource.iter() {
result.push(self.apply(r, ns).await?);
let apply_result = self.apply(r, ns).await;
if apply_result.is_err() {
// NOTE: Be careful with this one, it may leak sensitive information into logs.
// Reducing it to debug might be enough, as we already know debug logs are unsafe.
// But keeping it at warn makes it much easier to understand what is going on,
// so be it for now.
warn!(
"Failed to apply k8s resource: {}",
serde_json::to_string_pretty(r).map_err(Error::SerdeError)?
);
}
result.push(apply_result?);
}
Ok(result)

View File

@@ -2,6 +2,7 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose};
use harmony_types::rfc1123::Rfc1123Name;
use k8s_openapi::api::{
core::v1::Secret,
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
@@ -34,16 +35,17 @@ use crate::{
service_monitor::ServiceMonitor,
},
},
okd::route::OKDTlsPassthroughScore,
prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
},
},
score::Score,
topology::ingress::Ingress,
topology::{TlsRoute, TlsRouter, ingress::Ingress},
};
use super::{
use super::super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,
@@ -102,6 +104,31 @@ impl K8sclient for K8sAnywhereTopology {
}
}
#[async_trait]
impl TlsRouter for K8sAnywhereTopology {
async fn install_route(&self, route: TlsRoute) -> Result<(), String> {
let distro = self
.get_k8s_distribution()
.await
.map_err(|e| format!("Could not get k8s distribution {e}"))?;
match distro {
KubernetesDistribution::OpenshiftFamily => {
OKDTlsPassthroughScore {
name: Rfc1123Name::try_from(route.backend_info_string().as_str())?,
route,
}
.interpret(&Inventory::empty(), self)
.await?;
Ok(())
}
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => Err(format!(
"Distribution not supported yet for Tlsrouter {distro:?}"
)),
}
}
}
#[async_trait]
impl Grafana for K8sAnywhereTopology {
async fn ensure_grafana_operator(
@@ -343,6 +370,7 @@ impl K8sAnywhereTopology {
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
debug!("Trying to detect k8s distribution");
let client = self.k8s_client().await.unwrap();
let discovery = client.discovery().await.map_err(|e| {
@@ -358,14 +386,17 @@ impl K8sAnywhereTopology {
.groups()
.any(|g| g.name() == "project.openshift.io")
{
info!("Found KubernetesDistribution OpenshiftFamily");
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
info!("Found KubernetesDistribution K3sFamily");
return Ok(KubernetesDistribution::K3sFamily);
}
info!("Could not identify KubernetesDistribution, using Default");
return Ok(KubernetesDistribution::Default);
})
.await

View File

@@ -0,0 +1,3 @@
mod k8s_anywhere;
mod postgres;
pub use k8s_anywhere::*;

View File

@@ -0,0 +1,37 @@
use async_trait::async_trait;
use crate::{
modules::postgresql::capability::{
PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts,
},
topology::K8sAnywhereTopology,
};
#[async_trait]
impl PostgreSQL for K8sAnywhereTopology {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
todo!()
}
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
todo!()
}
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
todo!()
}
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
todo!()
}
}

View File

@@ -1,5 +1,7 @@
mod failover;
mod ha_cluster;
pub mod ingress;
pub use failover::*;
use harmony_types::net::IpAddress;
mod host_binding;
mod http;
@@ -13,7 +15,7 @@ pub use k8s_anywhere::*;
pub use localhost::*;
pub mod k8s;
mod load_balancer;
mod router;
pub mod router;
mod tftp;
use async_trait::async_trait;
pub use ha_cluster::*;

View File

@@ -1,11 +1,20 @@
use async_trait::async_trait;
use cidr::Ipv4Cidr;
use derive_new::new;
use serde::Serialize;
use super::{IpAddress, LogicalHost};
/// Basic network router abstraction (L3 IP routing/gateway).
/// Distinguished from TlsRouter (L4 TLS passthrough).
pub trait Router: Send + Sync {
/// Gateway IP address for this subnet/router.
fn get_gateway(&self) -> IpAddress;
/// CIDR block managed by this router.
fn get_cidr(&self) -> Ipv4Cidr;
/// Logical host associated with this router.
fn get_host(&self) -> LogicalHost;
}
@@ -38,3 +47,67 @@ impl Router for UnmanagedRouter {
todo!()
}
}
/// Desired state config for a TLS passthrough route.
/// Forwards external TLS (port 443) → backend service:target_port (no termination at router).
/// Inspired by CNPG multisite: exposes `-rw`/`-ro` services publicly via OKD Route/HAProxy/K8s
/// Gateway etc.
///
/// # Example
/// ```
/// use harmony::topology::router::TlsRoute;
/// let postgres_rw = TlsRoute {
/// hostname: "postgres-cluster-example.public.domain.io".to_string(),
/// backend: "postgres-cluster-example-rw".to_string(), // k8s Service or HAProxy upstream
/// target_port: 5432,
/// namespace: "default".to_string(), // k8s namespace the backend lives in
/// };
/// ```
#[derive(Clone, Debug, Serialize)]
pub struct TlsRoute {
/// Public hostname clients connect to (TLS SNI, port 443 implicit).
/// Router matches this for passthrough forwarding.
pub hostname: String,
/// Backend/host identifier (k8s Service, HAProxy upstream, IP/FQDN, etc.).
pub backend: String,
/// Backend TCP port (Postgres: 5432).
pub target_port: u16,
/// The environment in which it lives.
/// TODO clarify how we handle this in higher level abstractions. The namespace name is a
/// direct mapping to k8s but that could be misleading for other implementations.
pub namespace: String,
}
impl TlsRoute {
/// Compact identifier in the form `hostname-backend:port`.
pub fn to_string_short(&self) -> String {
format!("{}-{}:{}", self.hostname, self.backend, self.target_port)
}
/// Backend identifier in the form `backend:port`, used e.g. to derive route names.
pub fn backend_info_string(&self) -> String {
format!("{}:{}", self.backend, self.target_port)
}
}
/// Installs and queries TLS passthrough routes (L4 TCP/SNI forwarding, no TLS termination).
/// Agnostic to impl: OKD Route, AWS NLB+HAProxy, k3s Envoy Gateway, Apache ProxyPass.
/// Used by PostgreSQL capability to expose CNPG clusters multisite (site1 → site2 replication).
///
/// # Usage
/// ```ignore
/// use harmony::topology::router::TlsRoute;
/// // After CNPG deploy, expose RW endpoint
/// async fn route() {
/// let topology = okd_topology();
/// let route = TlsRoute { /* ... */ };
/// topology.install_route(route).await; // OKD Route, HAProxy reload, etc.
/// }
/// ```
#[async_trait]
pub trait TlsRouter: Send + Sync {
/// Provisions the route (idempotent where possible).
/// Example: OKD Route{ host, to: backend:target_port, tls: {passthrough} };
/// HAProxy frontend→backend "postgres-upstream".
async fn install_route(&self, config: TlsRoute) -> Result<(), String>;
}

View File

@@ -0,0 +1,157 @@
use std::collections::BTreeMap;
use k8s_openapi::{
api::core::v1::{Affinity, Toleration},
apimachinery::pkg::apis::meta::v1::ObjectMeta,
};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "CatalogSource",
plural = "catalogsources",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CatalogSourceSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config_map: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub display_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub grpc_pod_config: Option<GrpcPodConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub icon: Option<Icon>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub publisher: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub run_as_root: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub secrets: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub update_strategy: Option<UpdateStrategy>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GrpcPodConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub affinity: Option<Affinity>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extract_content: Option<ExtractContent>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_target: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority_class_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub security_context_config: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tolerations: Option<Vec<Toleration>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExtractContent {
pub cache_dir: String,
pub catalog_dir: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Icon {
pub base64data: String,
pub mediatype: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateStrategy {
#[serde(skip_serializing_if = "Option::is_none")]
pub registry_poll: Option<RegistryPoll>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RegistryPoll {
#[serde(skip_serializing_if = "Option::is_none")]
pub interval: Option<String>,
}
impl Default for CatalogSource {
fn default() -> Self {
Self {
metadata: ObjectMeta::default(),
spec: CatalogSourceSpec::default(),
}
}
}
impl Default for CatalogSourceSpec {
fn default() -> Self {
Self {
address: None,
config_map: None,
description: None,
display_name: None,
grpc_pod_config: None,
icon: None,
image: None,
priority: None,
publisher: None,
run_as_root: None,
secrets: None,
source_type: None,
update_strategy: None,
}
}
}

View File

@@ -0,0 +1,4 @@
mod catalogsources_operators_coreos_com;
pub use catalogsources_operators_coreos_com::*;
mod subscriptions_operators_coreos_com;
pub use subscriptions_operators_coreos_com::*;

View File

@@ -0,0 +1,68 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::CustomResource;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "Subscription",
plural = "subscriptions",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<SubscriptionConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub install_plan_approval: Option<String>,
pub name: String,
pub source: String,
pub source_namespace: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub starting_csv: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub env: Option<Vec<k8s_openapi::api::core::v1::EnvVar>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<std::collections::BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tolerations: Option<Vec<k8s_openapi::api::core::v1::Toleration>>,
}
impl Default for Subscription {
fn default() -> Self {
Subscription {
metadata: ObjectMeta::default(),
spec: SubscriptionSpec::default(),
}
}
}
impl Default for SubscriptionSpec {
fn default() -> SubscriptionSpec {
SubscriptionSpec {
name: String::new(),
source: String::new(),
source_namespace: String::new(),
channel: None,
config: None,
install_plan_approval: None,
starting_csv: None,
}
}
}

View File

@@ -0,0 +1,3 @@
mod operatorhub;
pub use operatorhub::*;
pub mod crd;

View File

@@ -0,0 +1,107 @@
// Write operatorhub catalog score
// For now this only supports OKD with the default catalog and operatorhub setup, and does not
// verify OLM state or anything else. Very opinionated and bare-bones to start.
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::apps::crd::{
CatalogSource, CatalogSourceSpec, RegistryPoll, UpdateStrategy,
};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Installs the CatalogSource in a cluster which already has the required services and CRDs installed.
///
/// ```rust
/// use harmony::modules::k8s::apps::OperatorHubCatalogSourceScore;
///
/// let score = OperatorHubCatalogSourceScore::default();
/// ```
///
/// Required services:
/// - catalog-operator
/// - olm-operator
///
/// They are installed by default with OKD/OpenShift.
///
/// **Warning** : this initial implementation does not manage the dependencies. They must already
/// exist in the cluster.
#[derive(Debug, Clone, Serialize)]
pub struct OperatorHubCatalogSourceScore {
pub name: String,
pub namespace: String,
pub image: String,
}
impl OperatorHubCatalogSourceScore {
pub fn new(name: &str, namespace: &str, image: &str) -> Self {
Self {
name: name.to_string(),
namespace: namespace.to_string(),
image: image.to_string(),
}
}
}
impl Default for OperatorHubCatalogSourceScore {
/// This default implementation will create this k8s resource:
///
/// ```yaml
/// apiVersion: operators.coreos.com/v1alpha1
/// kind: CatalogSource
/// metadata:
///   name: operatorhubio-catalog
///   namespace: openshift-marketplace
/// spec:
///   sourceType: grpc
///   image: quay.io/operatorhubio/catalog:latest
///   displayName: Operatorhub Operators
///   publisher: OperatorHub.io
///   updateStrategy:
///     registryPoll:
///       interval: 60m
/// ```
fn default() -> Self {
OperatorHubCatalogSourceScore {
name: "operatorhubio-catalog".to_string(),
namespace: "openshift-marketplace".to_string(),
image: "quay.io/operatorhubio/catalog:latest".to_string(),
}
}
}
impl<T: Topology + K8sclient> Score<T> for OperatorHubCatalogSourceScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = CatalogSourceSpec {
source_type: Some("grpc".to_string()),
image: Some(self.image.clone()),
display_name: Some("Operatorhub Operators".to_string()),
publisher: Some("OperatorHub.io".to_string()),
update_strategy: Some(UpdateStrategy {
registry_poll: Some(RegistryPoll {
interval: Some("60m".to_string()),
}),
}),
..CatalogSourceSpec::default()
};
let catalog_source = CatalogSource {
metadata,
spec,
};
K8sResourceScore::single(catalog_source, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("OperatorHubCatalogSourceScore({})", self.name)
}
}

View File

@@ -1,3 +1,4 @@
pub mod apps;
pub mod deployment;
pub mod ingress;
pub mod namespace;

View File

@@ -79,7 +79,33 @@ where
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!("Applying {} resources", self.score.resource.len());
// TODO improve this log
let resource_names: Vec<String> = self
.score
.resource
.iter()
.map(|r| {
format!(
"{}{}",
r.meta().name.clone().unwrap_or_default(),
r.meta()
.namespace
.as_ref()
.map(|ns| format!("@{}", ns))
.unwrap_or_default()
)
})
.collect();
info!(
"Applying {} resources : {}",
resource_names.len(),
resource_names.join(", ")
);
topology
.k8s_client()
.await

View File

@@ -11,8 +11,10 @@ pub mod k8s;
pub mod lamp;
pub mod load_balancer;
pub mod monitoring;
pub mod network;
pub mod okd;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;

View File

@@ -0,0 +1,2 @@
mod tls_router;
pub use tls_router::*;

View File

@@ -0,0 +1,91 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::data::Version;
use crate::domain::topology::router::{TlsRoute, TlsRouter};
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Score for provisioning a TLS passthrough route.
/// Exposes backend services via TLS passthrough (L4 TCP/SNI forwarding).
/// Agnostic to underlying router impl (OKD Route, HAProxy, Envoy, etc.).
///
/// TlsPassthroughScore relies on the TlsRouter capability for its entire functionality;
/// the behavior depends entirely on how the Topology implements it.
///
/// # Usage
/// ```
/// use harmony::modules::network::TlsPassthroughScore;
/// use harmony::topology::router::TlsRoute;
/// let score = TlsPassthroughScore {
/// route: TlsRoute {
/// backend: "postgres-cluster-rw".to_string(),
/// hostname: "postgres-rw.example.com".to_string(),
/// target_port: 5432,
/// namespace: "default".to_string(),
/// },
/// };
/// ```
///
/// # Hint
///
/// **This TlsPassthroughScore should be used whenever possible.** It is effectively
/// an abstraction over the concept of TLS passthrough, and it allows much more flexible
/// usage across multiple types of Topology than a lower level module such as
/// OKDTlsPassthroughScore.
///
/// On the other hand, some implementation-specific options might not be available or practical
/// to use through this high level TlsPassthroughScore.
#[derive(Debug, Clone, Serialize)]
pub struct TlsPassthroughScore {
pub route: TlsRoute,
}
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for TlsPassthroughScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(TlsPassthroughInterpret {
tls_route: self.route.clone(),
})
}
fn name(&self) -> String {
format!(
"TlsRouterScore({}:{}{})",
self.route.backend, self.route.target_port, self.route.hostname
)
}
}
/// Custom interpret: provisions the TLS passthrough route on the topology.
#[derive(Debug, Clone)]
struct TlsPassthroughInterpret {
tls_route: TlsRoute,
}
#[async_trait]
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for TlsPassthroughInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("TlsRouterInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
topo.install_route(self.tls_route.clone())
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
"TLS route installed: {}{}:{}",
self.tls_route.hostname, self.tls_route.backend, self.tls_route.target_port
)))
}
}

View File

@@ -1,8 +1,20 @@
use crate::{
interpret::Interpret, inventory::HostRole, modules::okd::bootstrap_okd_node::OKDNodeInterpret,
score::Score, topology::HAClusterTopology,
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore, http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore, okd::templates::BootstrapIpxeTpl,
},
score::Score,
topology::{HAClusterTopology, HostBinding},
};
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::{debug, info};
use serde::Serialize;
// -------------------------------------------------------------------------------------------------
@@ -16,13 +28,226 @@ pub struct OKDSetup03ControlPlaneScore {}
impl Score<HAClusterTopology> for OKDSetup03ControlPlaneScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to
// the `wait-for bootstrap-complete` command.
Box::new(OKDNodeInterpret::new(HostRole::ControlPlane))
Box::new(OKDSetup03ControlPlaneInterpret::new())
}
fn name(&self) -> String {
"OKDSetup03ControlPlaneScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct OKDSetup03ControlPlaneInterpret {
version: Version,
status: InterpretStatus,
}
impl OKDSetup03ControlPlaneInterpret {
pub fn new() -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
status: InterpretStatus::QUEUED,
}
}
/// Ensures that three physical hosts are discovered and available for the ControlPlane role.
/// It will trigger discovery if not enough hosts are found.
async fn get_nodes(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Vec<PhysicalHost>, InterpretError> {
const REQUIRED_HOSTS: usize = 3;
let repo = InventoryRepositoryFactory::build().await?;
let mut control_plane_hosts = repo.get_host_for_role(&HostRole::ControlPlane).await?;
while control_plane_hosts.len() < REQUIRED_HOSTS {
info!(
"Discovery of {} control plane hosts in progress, current number {}",
REQUIRED_HOSTS,
control_plane_hosts.len()
);
// This score triggers the discovery agent for a specific role.
DiscoverHostForRoleScore {
role: HostRole::ControlPlane,
}
.interpret(inventory, topology)
.await?;
control_plane_hosts = repo.get_host_for_role(&HostRole::ControlPlane).await?;
}
if control_plane_hosts.len() < REQUIRED_HOSTS {
Err(InterpretError::new(format!(
"OKD Requires at least {} control plane hosts, but only found {}. Cannot proceed.",
REQUIRED_HOSTS,
control_plane_hosts.len()
)))
} else {
// Take exactly the number of required hosts to ensure consistency.
Ok(control_plane_hosts
.into_iter()
.take(REQUIRED_HOSTS)
.collect())
}
}
/// Configures DHCP host bindings for all control plane nodes.
async fn configure_host_binding(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Configuring host bindings for control plane nodes.");
// Ensure the topology definition matches the number of physical nodes found.
if topology.control_plane.len() != nodes.len() {
return Err(InterpretError::new(format!(
"Mismatch between logical control plane hosts defined in topology ({}) and physical nodes found ({}).",
topology.control_plane.len(),
nodes.len()
)));
}
// Create a binding for each physical host to its corresponding logical host.
let bindings: Vec<HostBinding> = topology
.control_plane
.iter()
.zip(nodes.iter())
.map(|(logical_host, physical_host)| {
info!(
"Creating binding: Logical Host '{}' -> Physical Host ID '{}'",
logical_host.name, physical_host.id
);
HostBinding {
logical_host: logical_host.clone(),
physical_host: physical_host.clone(),
}
})
.collect();
DhcpHostBindingScore {
host_binding: bindings,
domain: Some(topology.domain_name.clone()),
}
.interpret(inventory, topology)
.await?;
Ok(())
}
/// Renders and deploys a per-MAC iPXE boot file for each control plane node.
async fn configure_ipxe(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Rendering per-MAC iPXE configurations.");
// The iPXE script content is the same for all control plane nodes,
// pointing to the 'master.ign' ignition file.
let content = BootstrapIpxeTpl {
http_ip: &topology.http_server.get_ip().to_string(),
scos_path: "scos",
ignition_http_path: "okd_ignition_files",
installation_device: "/dev/sda", // This might need to be configurable per-host in the future
ignition_file_name: "master.ign", // Control plane nodes use the master ignition file
}
.to_string();
debug!("[ControlPlane] iPXE content template:\n{content}");
// Create and apply an iPXE boot file for each node.
for node in nodes {
let mac_address = node.get_mac_address();
if mac_address.is_empty() {
return Err(InterpretError::new(format!(
"Physical host with ID '{}' has no MAC addresses defined.",
node.id
)));
}
info!(
"[ControlPlane] Applying iPXE config for node ID '{}' with MACs: {:?}",
node.id, mac_address
);
IPxeMacBootFileScore {
mac_address,
content: content.clone(),
}
.interpret(inventory, topology)
.await?;
}
Ok(())
}
/// Prompts the user to reboot the target control plane nodes.
async fn reboot_targets(&self, nodes: &Vec<PhysicalHost>) -> Result<(), InterpretError> {
let node_ids: Vec<String> = nodes.iter().map(|n| n.id.to_string()).collect();
info!("[ControlPlane] Requesting reboot for control plane nodes: {node_ids:?}",);
let confirmation = inquire::Confirm::new(
&format!("Please reboot the {} control plane nodes ({}) to apply their PXE configuration. Press enter when ready.", nodes.len(), node_ids.join(", ")),
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
if !confirmation {
return Err(InterpretError::new(
"User aborted the operation.".to_string(),
));
}
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDSetup03ControlPlaneInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup03ControlPlane")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
// 1. Ensure we have 3 physical hosts for the control plane.
let nodes = self.get_nodes(inventory, topology).await?;
// 2. Create DHCP reservations for the control plane nodes.
self.configure_host_binding(inventory, topology, &nodes)
.await?;
// 3. Create iPXE files for each control plane node to boot from the master ignition.
self.configure_ipxe(inventory, topology, &nodes).await?;
// 4. Reboot the nodes to start the OS installation.
self.reboot_targets(&nodes).await?;
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to
// the `wait-for bootstrap-complete` command.
info!("[ControlPlane] Provisioning initiated. Monitor the cluster convergence manually.");
Ok(Outcome::success(
"Control plane provisioning has been successfully initiated.".into(),
))
}
}

View File

@@ -1,9 +1,15 @@
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::info;
use serde::Serialize;
use crate::{
interpret::Interpret, inventory::HostRole, modules::okd::bootstrap_okd_node::OKDNodeInterpret,
score::Score, topology::HAClusterTopology,
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::HAClusterTopology,
};
// -------------------------------------------------------------------------------------------------
@@ -17,10 +23,61 @@ pub struct OKDSetup04WorkersScore {}
impl Score<HAClusterTopology> for OKDSetup04WorkersScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDNodeInterpret::new(HostRole::Worker))
Box::new(OKDSetup04WorkersInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup04WorkersScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct OKDSetup04WorkersInterpret {
score: OKDSetup04WorkersScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup04WorkersInterpret {
pub fn new(score: OKDSetup04WorkersScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn render_and_reboot(&self) -> Result<(), InterpretError> {
info!("[Workers] Rendering per-MAC PXE for workers and rebooting");
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDSetup04WorkersInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup04Workers")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
self.render_and_reboot().await?;
Ok(Outcome::success("Workers provisioned".into()))
}
}

View File

@@ -1,303 +0,0 @@
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::{debug, info};
use serde::Serialize;
use crate::{
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore,
http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore,
okd::{
okd_node::{
BootstrapRole, ControlPlaneRole, OKDRoleProperties, StorageRole, WorkerRole,
},
templates::BootstrapIpxeTpl,
},
},
score::Score,
topology::{HAClusterTopology, HostBinding, LogicalHost},
};
#[derive(Debug, Clone, Serialize, new)]
pub struct OKDNodeInstallationScore {
host_role: HostRole,
}
impl Score<HAClusterTopology> for OKDNodeInstallationScore {
fn name(&self) -> String {
"OKDNodeScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDNodeInterpret::new(self.host_role.clone()))
}
}
#[derive(Debug, Clone)]
pub struct OKDNodeInterpret {
host_role: HostRole,
}
impl OKDNodeInterpret {
pub fn new(host_role: HostRole) -> Self {
Self { host_role }
}
fn okd_role_properties(&self, role: &HostRole) -> &'static dyn OKDRoleProperties {
match role {
HostRole::Bootstrap => &BootstrapRole,
HostRole::ControlPlane => &ControlPlaneRole,
HostRole::Worker => &WorkerRole,
}
}
async fn get_nodes(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Vec<PhysicalHost>, InterpretError> {
let repo = InventoryRepositoryFactory::build().await?;
let mut hosts = repo.get_host_for_role(&self.host_role).await?;
let okd_host_properties = self.okd_role_properties(&self.host_role);
let required_hosts: usize = okd_host_properties.required_hosts();
while hosts.len() < required_hosts {
info!(
"Discovery of {} {} hosts in progress, current number {}",
required_hosts,
self.host_role,
hosts.len()
);
// This score triggers the discovery agent for a specific role.
DiscoverHostForRoleScore {
role: self.host_role.clone(),
}
.interpret(inventory, topology)
.await?;
hosts = repo.get_host_for_role(&self.host_role).await?;
}
if hosts.len() < required_hosts {
Err(InterpretError::new(format!(
"OKD Requires at least {} {} hosts, but only found {}. Cannot proceed.",
required_hosts,
self.host_role,
hosts.len()
)))
} else {
// Take exactly the number of required hosts to ensure consistency.
Ok(hosts.into_iter().take(required_hosts).collect())
}
}
/// Configures DHCP host bindings for all nodes.
async fn configure_host_binding(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!(
"[{}] Configuring host bindings for {} plane nodes.",
self.host_role, self.host_role,
);
let host_properties = self.okd_role_properties(&self.host_role);
self.validate_host_node_match(nodes, host_properties.logical_hosts(topology))?;
let bindings: Vec<HostBinding> =
self.host_bindings(nodes, host_properties.logical_hosts(topology));
DhcpHostBindingScore {
host_binding: bindings,
domain: Some(topology.domain_name.clone()),
}
.interpret(inventory, topology)
.await?;
Ok(())
}
// Ensure the topology definition matches the number of physical nodes found.
fn validate_host_node_match(
&self,
nodes: &Vec<PhysicalHost>,
hosts: &Vec<LogicalHost>,
) -> Result<(), InterpretError> {
if hosts.len() != nodes.len() {
return Err(InterpretError::new(format!(
"Mismatch between logical hosts defined in topology ({}) and physical nodes found ({}).",
hosts.len(),
nodes.len()
)));
}
Ok(())
}
// Create a binding for each physical host to its corresponding logical host.
fn host_bindings(
&self,
nodes: &Vec<PhysicalHost>,
hosts: &Vec<LogicalHost>,
) -> Vec<HostBinding> {
hosts
.iter()
.zip(nodes.iter())
.map(|(logical_host, physical_host)| {
info!(
"Creating binding: Logical Host '{}' -> Physical Host ID '{}'",
logical_host.name, physical_host.id
);
HostBinding {
logical_host: logical_host.clone(),
physical_host: physical_host.clone(),
}
})
.collect()
}
/// Renders and deploys a per-MAC iPXE boot file for each node.
async fn configure_ipxe(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!(
"[{}] Rendering per-MAC iPXE configurations.",
self.host_role
);
let okd_role_properties = self.okd_role_properties(&self.host_role);
// The iPXE script content is the same for all control plane nodes,
// pointing to the 'master.ign' ignition file.
let content = BootstrapIpxeTpl {
http_ip: &topology.http_server.get_ip().to_string(),
scos_path: "scos",
ignition_http_path: "okd_ignition_files",
//TODO must be refactored to not only use /dev/sda
installation_device: "/dev/sda", // This might need to be configurable per-host in the future
ignition_file_name: okd_role_properties.ignition_file(),
}
.to_string();
debug!("[{}] iPXE content template:\n{content}", self.host_role);
// Create and apply an iPXE boot file for each node.
for node in nodes {
let mac_address = node.get_mac_address();
if mac_address.is_empty() {
return Err(InterpretError::new(format!(
"Physical host with ID '{}' has no MAC addresses defined.",
node.id
)));
}
info!(
"[{}] Applying iPXE config for node ID '{}' with MACs: {:?}",
self.host_role, node.id, mac_address
);
IPxeMacBootFileScore {
mac_address,
content: content.clone(),
}
.interpret(inventory, topology)
.await?;
}
Ok(())
}
/// Prompts the user to reboot the target control plane nodes.
async fn reboot_targets(&self, nodes: &Vec<PhysicalHost>) -> Result<(), InterpretError> {
let node_ids: Vec<String> = nodes.iter().map(|n| n.id.to_string()).collect();
info!(
"[{}] Requesting reboot for control plane nodes: {node_ids:?}",
self.host_role
);
let confirmation = inquire::Confirm::new(
&format!("Please reboot the {} {} nodes ({}) to apply their PXE configuration. Press enter when ready.", nodes.len(), self.host_role, node_ids.join(", ")),
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
if !confirmation {
return Err(InterpretError::new(
"User aborted the operation.".to_string(),
));
}
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDNodeInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
// 1. Ensure we have the specfied number of physical hosts.
let nodes = self.get_nodes(inventory, topology).await?;
// 2. Create DHCP reservations for the nodes.
self.configure_host_binding(inventory, topology, &nodes)
.await?;
// 3. Create iPXE files for each node to boot from the ignition.
self.configure_ipxe(inventory, topology, &nodes).await?;
// 4. Reboot the nodes to start the OS installation.
self.reboot_targets(&nodes).await?;
// TODO: Implement a step to validate that the installation of the nodes is
// complete and for the cluster operators to become available.
//
// The OpenShift installer only provides two wait commands which currently need to be
// run manually:
// - `openshift-install wait-for bootstrap-complete`
// - `openshift-install wait-for install-complete`
//
// There is no installer command that waits specifically for worker node
// provisioning. Worker nodes join asynchronously (via ignition + CSR approval),
// and the cluster becomes fully functional only once all nodes are Ready and the
// cluster operators report Available=True.
info!(
"[{}] Provisioning initiated. Monitor the cluster convergence manually.",
self.host_role
);
Ok(Outcome::success(format!(
"{} provisioning has been successfully initiated.",
self.host_role
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDNodeSetup".into())
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -1 +1,2 @@
pub mod nmstate;
pub mod route;

View File

@@ -0,0 +1,287 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta, Time};
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use k8s_openapi::{NamespaceResourceScope, Resource};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LocalObjectReference {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Route {
#[serde(skip_serializing_if = "Option::is_none")]
pub api_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
pub metadata: ObjectMeta,
pub spec: RouteSpec,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<RouteStatus>,
}
impl Resource for Route {
const API_VERSION: &'static str = "route.openshift.io/v1";
const GROUP: &'static str = "route.openshift.io";
const VERSION: &'static str = "v1";
const KIND: &'static str = "Route";
const URL_PATH_SEGMENT: &'static str = "routes";
type Scope = NamespaceResourceScope;
}
impl k8s_openapi::Metadata for Route {
type Ty = ObjectMeta;
fn metadata(&self) -> &Self::Ty {
&self.metadata
}
fn metadata_mut(&mut self) -> &mut Self::Ty {
&mut self.metadata
}
}
impl Default for Route {
fn default() -> Self {
Route {
api_version: Some("route.openshift.io/v1".to_string()),
kind: Some("Route".to_string()),
metadata: ObjectMeta::default(),
spec: RouteSpec::default(),
status: None,
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteList {
pub metadata: ListMeta,
pub items: Vec<Route>,
}
impl Default for RouteList {
fn default() -> Self {
Self {
metadata: ListMeta::default(),
items: Vec::new(),
}
}
}
impl Resource for RouteList {
const API_VERSION: &'static str = "route.openshift.io/v1";
const GROUP: &'static str = "route.openshift.io";
const VERSION: &'static str = "v1";
const KIND: &'static str = "RouteList";
const URL_PATH_SEGMENT: &'static str = "routes";
type Scope = NamespaceResourceScope;
}
impl k8s_openapi::Metadata for RouteList {
type Ty = ListMeta;
fn metadata(&self) -> &Self::Ty {
&self.metadata
}
fn metadata_mut(&mut self) -> &mut Self::Ty {
&mut self.metadata
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub alternate_backends: Option<Vec<RouteTargetReference>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub http_headers: Option<RouteHTTPHeaders>,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<RoutePort>,
#[serde(skip_serializing_if = "Option::is_none")]
pub subdomain: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tls: Option<TLSConfig>,
pub to: RouteTargetReference,
#[serde(skip_serializing_if = "Option::is_none")]
pub wildcard_policy: Option<String>,
}
impl Default for RouteSpec {
fn default() -> RouteSpec {
RouteSpec {
alternate_backends: None,
host: None,
http_headers: None,
path: None,
port: None,
subdomain: None,
tls: None,
to: RouteTargetReference::default(),
wildcard_policy: None,
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteTargetReference {
pub kind: String,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub weight: Option<i32>,
}
impl Default for RouteTargetReference {
fn default() -> RouteTargetReference {
RouteTargetReference {
kind: String::default(),
name: String::default(),
weight: None,
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RoutePort {
pub target_port: u16,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub ca_certificate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub certificate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub destination_ca_certificate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub external_certificate: Option<LocalObjectReference>,
#[serde(skip_serializing_if = "Option::is_none")]
pub insecure_edge_termination_policy: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
pub termination: String,
}
impl Default for TLSConfig {
fn default() -> Self {
Self {
ca_certificate: None,
certificate: None,
destination_ca_certificate: None,
external_certificate: None,
insecure_edge_termination_policy: None,
key: None,
termination: "edge".to_string(),
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteStatus {
#[serde(skip_serializing_if = "Option::is_none")]
pub ingress: Option<Vec<RouteIngress>>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteIngress {
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub router_canonical_hostname: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub router_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wildcard_policy: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub conditions: Option<Vec<RouteIngressCondition>>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteIngressCondition {
#[serde(skip_serializing_if = "Option::is_none")]
pub last_transition_time: Option<Time>,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
pub status: String,
#[serde(rename = "type")]
pub condition_type: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeader {
pub name: String,
pub action: RouteHTTPHeaderActionUnion,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeaderActionUnion {
#[serde(skip_serializing_if = "Option::is_none")]
pub set: Option<RouteSetHTTPHeader>,
#[serde(rename = "type")]
pub action_type: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteSetHTTPHeader {
pub value: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeaderActions {
#[serde(skip_serializing_if = "Option::is_none")]
pub request: Option<Vec<RouteHTTPHeader>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub response: Option<Vec<RouteHTTPHeader>>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeaders {
#[serde(skip_serializing_if = "Option::is_none")]
pub actions: Option<RouteHTTPHeaderActions>,
}
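Taken together, these types are enough to express a TLS passthrough route directly. A minimal sketch with placeholder names and hosts, assuming the types above are in scope:

use kube::api::ObjectMeta;

// All names and hosts below are placeholders for illustration.
let route = Route {
    metadata: ObjectMeta {
        name: Some("my-db".to_string()),
        namespace: Some("harmony".to_string()),
        ..ObjectMeta::default()
    },
    spec: RouteSpec {
        host: Some("db.example.com".to_string()),
        to: RouteTargetReference {
            kind: "Service".to_string(),
            name: "my-db-rw".to_string(),
            weight: Some(100),
        },
        port: Some(RoutePort { target_port: 5432 }),
        tls: Some(TLSConfig {
            termination: "passthrough".to_string(),
            ..Default::default()
        }),
        ..Default::default()
    },
    ..Default::default() // api_version and kind come from Route::default()
};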


@@ -6,14 +6,13 @@ mod bootstrap_05_sanity_check;
mod bootstrap_06_installation_report;
pub mod bootstrap_dhcp;
pub mod bootstrap_load_balancer;
pub mod bootstrap_okd_node;
mod bootstrap_persist_network_bond;
pub mod dhcp;
pub mod dns;
pub mod installation;
pub mod ipxe;
pub mod load_balancer;
pub mod okd_node;
pub mod route;
pub mod templates;
pub mod upgrade;
pub use bootstrap_01_prepare::*;


@@ -1,54 +0,0 @@
use crate::topology::{HAClusterTopology, LogicalHost};
pub trait OKDRoleProperties {
fn ignition_file(&self) -> &'static str;
fn required_hosts(&self) -> usize;
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost>;
}
pub struct BootstrapRole;
pub struct ControlPlaneRole;
pub struct WorkerRole;
pub struct StorageRole;
impl OKDRoleProperties for BootstrapRole {
fn ignition_file(&self) -> &'static str {
"bootstrap.ign"
}
fn required_hosts(&self) -> usize {
1
}
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost> {
todo!()
}
}
impl OKDRoleProperties for ControlPlaneRole {
fn ignition_file(&self) -> &'static str {
"master.ign"
}
fn required_hosts(&self) -> usize {
3
}
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost> {
&t.control_plane
}
}
impl OKDRoleProperties for WorkerRole {
fn ignition_file(&self) -> &'static str {
"worker.ign"
}
fn required_hosts(&self) -> usize {
2
}
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost> {
&t.workers
}
}


@@ -0,0 +1,101 @@
// TODO
// Write OKDRouteScore: this is the real one, which will apply the k8s resource and expose all
// relevant options to Harmony's various use cases.
//
// Write OKDTlsPassthroughScore: this one will use an OKDRouteScore under the hood and simply fill
// in all settings to make the route a TLS passthrough.
//
// These scores are meant to be used by an OKD based topology to provide Capabilities like
// TlsRouter.
//
// The first use case to serve here is the PostgreSQL multisite setup, so exposing only the
// settings relevant to this use case is enough at first, following YAGNI.
//
// These scores are not intended to be used directly by a user, unless the user knows they will
// always be dealing only with OKD/OpenShift compatible topologies and is ready to manage the
// additional maintenance burden that comes with this lower level functionality.
use harmony_types::rfc1123::Rfc1123Name;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::okd::crd::route::{
Route, RoutePort, RouteSpec, RouteTargetReference, TLSConfig,
};
use crate::score::Score;
use crate::topology::{K8sclient, TlsRoute, Topology};
#[derive(Debug, Clone, Serialize)]
pub struct OKDRouteScore {
pub name: String,
pub namespace: String,
pub spec: RouteSpec,
}
impl OKDRouteScore {
pub fn new(name: &str, namespace: &str, spec: RouteSpec) -> Self {
Self {
name: name.to_string(),
namespace: namespace.to_string(),
spec,
}
}
}
impl<T: Topology + K8sclient> Score<T> for OKDRouteScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
let route = Route {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
},
spec: self.spec.clone(),
..Default::default()
};
K8sResourceScore::single(route, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("OKDRouteScore({})", self.name)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct OKDTlsPassthroughScore {
pub route: TlsRoute,
pub name: Rfc1123Name,
}
impl<T: Topology + K8sclient> Score<T> for OKDTlsPassthroughScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
let passthrough_spec = RouteSpec {
host: Some(self.route.hostname.clone()),
wildcard_policy: Some("None".to_string()),
to: RouteTargetReference {
kind: "Service".to_string(),
name: self.route.backend.clone(),
weight: Some(100),
},
port: Some(RoutePort {
target_port: self.route.target_port,
}),
tls: Some(TLSConfig {
termination: "passthrough".to_string(),
insecure_edge_termination_policy: Some("None".to_string()),
..Default::default()
}),
..Default::default()
};
let route_score = OKDRouteScore::new(&self.name.to_string(), &self.route.namespace, passthrough_spec);
route_score.create_interpret()
}
fn name(&self) -> String {
format!(
"OKDTlsPassthroughScore({}:{}/{}{})",
self.route.backend, self.route.target_port, self.route.namespace, self.route.hostname
)
}
}
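A minimal usage sketch for the passthrough score, with placeholder values and assuming the `TlsRoute` shape introduced elsewhere in this change:

use harmony_types::rfc1123::Rfc1123Name;

// Placeholder values for illustration.
let score = OKDTlsPassthroughScore {
    route: TlsRoute {
        namespace: "harmony".to_string(),
        hostname: "pg-rw.example.com".to_string(),
        backend: "harmony-pg-rw".to_string(),
        target_port: 5432,
    },
    name: Rfc1123Name::try_from("harmony-pg-rw-passthrough").unwrap(),
};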


@@ -0,0 +1,85 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use serde::Serialize;
use std::collections::HashMap;
#[async_trait]
pub trait PostgreSQL: Send + Sync {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String>;
}
#[derive(Clone, Debug, Serialize)]
pub struct PostgreSQLConfig {
pub cluster_name: String,
pub instances: u32,
pub storage_size: StorageSize,
pub role: PostgreSQLClusterRole,
}
#[derive(Clone, Debug, Serialize)]
pub enum PostgreSQLClusterRole {
Primary,
Replica(ReplicaConfig),
}
#[derive(Clone, Debug, Serialize)]
pub struct ReplicaConfig {
/// Name of the primary cluster this replica will sync from
pub primary_cluster_name: String,
/// Certs extracted from primary via Topology::get_replication_certs()
pub replication_certs: ReplicationCerts,
/// Bootstrap method (e.g., pg_basebackup from primary)
pub bootstrap: BootstrapConfig,
/// External cluster connection details for CNPG spec.externalClusters
pub external_cluster: ExternalClusterConfig,
}
#[derive(Clone, Debug, Serialize)]
pub struct BootstrapConfig {
pub strategy: BootstrapStrategy,
}
#[derive(Clone, Debug, Serialize)]
pub enum BootstrapStrategy {
PgBasebackup,
}
#[derive(Clone, Debug, Serialize)]
pub struct ExternalClusterConfig {
/// Name used in CNPG externalClusters list
pub name: String,
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
pub connection_parameters: HashMap<String, String>,
}
#[derive(Clone, Debug, Serialize)]
pub struct ReplicationCerts {
/// PEM-encoded CA cert from primary
pub ca_cert_pem: String,
/// PEM-encoded streaming_replica client cert (tls.crt)
pub streaming_replica_cert_pem: String,
/// PEM-encoded streaming_replica client key (tls.key)
pub streaming_replica_key_pem: String,
}
#[derive(Clone, Debug)]
pub struct PostgreSQLEndpoint {
pub host: String,
pub port: u16,
}
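A minimal sketch of a primary-cluster config built against this capability, with placeholder values:

use harmony_types::storage::StorageSize;

// Placeholder values for illustration.
let config = PostgreSQLConfig {
    cluster_name: "my-pg".to_string(),
    instances: 3,
    storage_size: StorageSize::gi(10),
    role: PostgreSQLClusterRole::Primary,
};
// Any `PostgreSQL` implementor (e.g. a k8s topology) can then deploy it:
// let cluster_name = topology.deploy(&config).await?;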


@@ -0,0 +1,58 @@
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "postgresql.cnpg.io",
version = "v1",
kind = "Cluster",
plural = "clusters",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterSpec {
pub instances: i32,
pub image_name: Option<String>,
pub storage: Storage,
pub bootstrap: Bootstrap,
}
impl Default for Cluster {
fn default() -> Self {
Cluster {
metadata: ObjectMeta::default(),
spec: ClusterSpec::default(),
}
}
}
impl Default for ClusterSpec {
fn default() -> Self {
Self {
instances: 1,
image_name: None,
storage: Storage::default(),
bootstrap: Bootstrap::default(),
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Storage {
pub size: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Bootstrap {
pub initdb: Initdb,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Initdb {
pub database: String,
pub owner: String,
}
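A minimal sketch of assembling a CNPG `Cluster` from these types, with placeholder values; serde's camelCase renaming then yields the manifest shape CNPG expects:

use kube::api::ObjectMeta;

// Placeholder values for illustration.
let cluster = Cluster {
    metadata: ObjectMeta {
        name: Some("my-pg".to_string()),
        namespace: Some("harmony".to_string()),
        ..ObjectMeta::default()
    },
    spec: ClusterSpec {
        instances: 3,
        image_name: None,
        storage: Storage {
            size: "10Gi".to_string(),
        },
        bootstrap: Bootstrap {
            initdb: Initdb {
                database: "app".to_string(),
                owner: "app".to_string(),
            },
        },
    },
};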


@@ -0,0 +1,2 @@
mod crd;
pub use crd::*;


@@ -0,0 +1,125 @@
use async_trait::async_trait;
use log::debug;
use log::info;
use std::collections::HashMap;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::FailoverTopology,
};
#[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
info!(
"Starting deployment of failover topology '{}'",
config.cluster_name
);
let primary_config = PostgreSQLConfig {
cluster_name: config.cluster_name.clone(),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Primary,
};
info!(
"Deploying primary cluster '{}' ({} instances, {:?} storage)",
primary_config.cluster_name, primary_config.instances, primary_config.storage_size
);
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
let certs = self
.primary
.get_replication_certs(&primary_cluster_name)
.await?;
info!("Replication certificates retrieved successfully");
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
let endpoint = self
.primary
.get_public_endpoint(&primary_cluster_name)
.await?
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
info!(
"Public endpoint '{}:{}' retrieved for primary",
endpoint.host, endpoint.port
);
info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new();
connection_parameters.insert("host".to_string(), endpoint.host);
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
debug!("Replica connection parameters: {:?}", connection_parameters);
let external_cluster = ExternalClusterConfig {
name: primary_cluster_name.clone(),
connection_parameters,
};
let bootstrap_config = BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
};
let replica_cluster_config = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs,
bootstrap: bootstrap_config,
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica", primary_cluster_name),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
};
info!(
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
);
self.replica.deploy(&replica_config).await?;
info!(
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
replica_config.cluster_name, config.cluster_name
);
Ok(primary_cluster_name)
}
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
self.primary.get_replication_certs(cluster_name).await
}
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(cluster_name).await
}
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
self.primary.get_public_endpoint(cluster_name).await
}
}
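A sketch of driving this implementation end to end, assuming `FailoverTopology` exposes the `primary` and `replica` fields used above and that both implement the `PostgreSQL` capability:

use harmony_types::storage::StorageSize;

// Placeholder values for illustration.
async fn deploy_multisite<T: PostgreSQL>(failover: &FailoverTopology<T>) -> Result<(), String> {
    let config = PostgreSQLConfig {
        cluster_name: "my-pg".to_string(),
        instances: 3,
        storage_size: StorageSize::gi(10),
        role: PostgreSQLClusterRole::Primary,
    };
    // Deploys the primary, extracts its replication certs and public endpoint,
    // then bootstraps the replica from it via pg_basebackup.
    let primary = failover.deploy(&config).await?;
    println!("Failover PostgreSQL ready, primary cluster: {primary}");
    Ok(())
}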


@@ -0,0 +1,13 @@
pub mod capability;
pub mod cnpg;
pub mod failover;
mod operator;
pub use operator::*;
mod score;
pub use score::*;
mod score_connect;
pub use score_connect::*;
mod score_public;
pub use score_public::*;


@@ -0,0 +1,102 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::apps::crd::{Subscription, SubscriptionSpec};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Install the CloudNativePg (CNPG) Operator via an OperatorHub `Subscription`.
///
/// This Score creates a `Subscription` Custom Resource in the specified namespace.
///
/// The default implementation pulls the `cloudnative-pg` operator from the
/// `operatorhubio-catalog` source.
///
/// # Goals
/// - Deploy the CNPG Operator to manage PostgreSQL clusters in OpenShift/OKD environments.
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::CloudNativePgOperatorScore;
/// let score = CloudNativePgOperatorScore::default();
/// ```
///
/// Or, you can take control of the most relevant fields this way:
///
/// ```
/// use harmony::modules::postgresql::CloudNativePgOperatorScore;
///
/// let score = CloudNativePgOperatorScore {
/// namespace: "custom-cnpg-namespace".to_string(),
/// channel: "unstable-i-want-bleedingedge-v498437".to_string(),
/// install_plan_approval: "Manual".to_string(),
/// source: "operatorhubio-catalog-but-different".to_string(),
/// source_namespace: "i-customize-everything-marketplace".to_string(),
/// };
/// ```
///
/// # Limitations
/// - **OperatorHub dependency**: Requires OperatorHub catalog sources (e.g., `operatorhubio-catalog` in `openshift-marketplace`).
/// - **OKD/OpenShift assumption**: Catalog/source names and namespaces are hardcoded for OKD-like setups; adjust for upstream OpenShift.
/// - **Hardcoded values in Default implementation**: Operator name (`cloudnative-pg`), channel (`stable-v1`), automatic install plan approval.
/// - **No config options**: Does not support custom `SubscriptionConfig` (env vars, node selectors, tolerations).
/// - **Single namespace**: Targets one namespace per score instance.
#[derive(Debug, Clone, Serialize)]
pub struct CloudNativePgOperatorScore {
pub namespace: String,
pub channel: String,
pub install_plan_approval: String,
pub source: String,
pub source_namespace: String,
}
impl Default for CloudNativePgOperatorScore {
fn default() -> Self {
Self {
namespace: "openshift-operators".to_string(),
channel: "stable-v1".to_string(),
install_plan_approval: "Automatic".to_string(),
source: "operatorhubio-catalog".to_string(),
source_namespace: "openshift-marketplace".to_string(),
}
}
}
impl CloudNativePgOperatorScore {
pub fn new(namespace: &str) -> Self {
Self {
namespace: namespace.to_string(),
..Default::default()
}
}
}
impl<T: Topology + K8sclient> Score<T> for CloudNativePgOperatorScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some("cloudnative-pg".to_string()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = SubscriptionSpec {
channel: Some(self.channel.clone()),
config: None,
install_plan_approval: Some(self.install_plan_approval.clone()),
name: "cloudnative-pg".to_string(),
source: self.source.clone(),
source_namespace: self.source_namespace.clone(),
starting_csv: None,
};
let subscription = Subscription { metadata, spec };
K8sResourceScore::single(subscription, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("CloudNativePgOperatorScore({})", self.namespace)
}
}


@@ -0,0 +1,93 @@
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
///
/// # Goals
/// - Production-ready Postgres HA (3 instances), persistent storage, app DB.
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PostgreSQLScore;
/// let score = PostgreSQLScore::new("my-app-ns");
/// ```
///
/// # Limitations (Happy Path)
/// - Requires CNPG operator installed (use CloudNativePgOperatorScore).
/// - No backups, monitoring, or extensions configured.
///
/// TODO: refactor this to declare a clean dependency on the cnpg operator. Then the cnpg
/// operator will self-deploy either using operatorhub or a helm chart depending on the k8s
/// flavor. This is cnpg-specific behavior.
#[derive(Debug, Clone, Serialize)]
pub struct PostgreSQLScore {
pub name: String,
/// **Note:** on OpenShift based clusters, the namespace `default` has security
/// settings incompatible with the default CNPG behavior.
pub namespace: String,
pub instances: i32,
pub storage_size: String,
pub image_name: Option<String>,
}
impl Default for PostgreSQLScore {
fn default() -> Self {
Self {
name: "harmony-pg".to_string(),
// We are using the namespace harmony by default since some clusters (openshift family)
// have incompatible configuration of the default namespace with cnpg
namespace: "harmony".to_string(),
instances: 1,
storage_size: "1Gi".to_string(),
image_name: None, // This lets cnpg use its default image
}
}
}
impl PostgreSQLScore {
pub fn new(namespace: &str) -> Self {
Self {
namespace: namespace.to_string(),
..Default::default()
}
}
}
impl<T: Topology + K8sclient> Score<T> for PostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.instances,
image_name: self.image_name.clone(),
storage: Storage {
size: self.storage_size.clone(),
},
bootstrap: Bootstrap {
initdb: Initdb {
database: "app".to_string(),
owner: "app".to_string(),
},
},
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
K8sResourceScore::single(cluster, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("PostgreSQLScore({})", self.namespace)
}
}
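Since all fields are public, the score can also be customized directly; a sketch with placeholder values:

let score = PostgreSQLScore {
    name: "invoices-db".to_string(),  // placeholder
    namespace: "billing".to_string(), // placeholder
    instances: 3,
    storage_size: "10Gi".to_string(),
    image_name: None, // keep the CNPG default image
};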


@@ -0,0 +1,442 @@
use async_trait::async_trait;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use log::{debug, error, info, trace};
use serde::Serialize;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use tokio::process::Command;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use harmony_types::id::Id;
/// PostgreSQLConnectionScore tests PostgreSQL database connectivity and collects performance
/// metrics for databases exposed via public endpoints. This score is specifically designed to verify
/// that PostgreSQL instances installed using the PublicPostgreSQLScore can be accessed by external clients.
///
/// The score performs the following tests:
/// 1. Verifies TLS/SSL connection using CA certificates from Kubernetes secrets
/// 2. Tests basic connectivity to the database
/// 3. (Optional, when db permissions are set up) Collects comprehensive performance metrics, including:
/// - Database size and schema usage statistics
/// - Active connections and query activity
/// - Performance metrics (transactions per second, cache hit ratio)
/// - Index usage and table statistics
/// - Configuration parameters
///
/// The implementation uses a Docker container running PostgreSQL client tools to execute
/// the connection test, ensuring consistent behavior across different environments.
///
/// # Kubernetes Secrets Required
///
/// The score requires two Kubernetes secrets in the target namespace:
/// - `{cluster_name}-app`: Contains connection parameters (host, port, username, password, dbname)
/// - `{cluster_name}-ca`: Contains CA certificate (ca.crt) for TLS verification
///
/// # Usage
///
/// ```rust
/// use harmony::modules::postgresql::PostgreSQLConnectionScore;
///
/// let score = PostgreSQLConnectionScore::new(
/// "default",
/// "my-postgres-cluster",
/// None
/// );
/// ```
///
/// # Parameters
///
/// - `namespace`: Kubernetes namespace where the PostgreSQL secrets are located
/// - `cluster_name`: Name of the PostgreSQL cluster (used to construct secret names)
/// - `hostname_override`: Optional hostname override for connection testing
/// - `port_override`: Optional port override for connection testing
#[derive(Debug, Clone, Serialize)]
pub struct PostgreSQLConnectionScore {
pub name: String,
pub namespace: String,
pub cluster_name: String,
pub hostname: Option<String>,
pub port_override: Option<u16>,
}
fn decode_secret(data: &BTreeMap<String, ByteString>, key: &str) -> Result<String, InterpretError> {
let val = data
.get(key)
.ok_or_else(|| InterpretError::new(format!("Secret missing key {}", key)))?;
String::from_utf8(val.0.clone())
.map_err(|e| InterpretError::new(format!("Failed to decode {}: {}", key, e)))
}
impl PostgreSQLConnectionScore {
pub fn new(namespace: &str, cluster_name: &str, hostname_override: Option<String>) -> Self {
Self {
name: format!("postgres-connection-{}", cluster_name),
namespace: namespace.to_string(),
cluster_name: cluster_name.to_string(),
hostname: hostname_override,
port_override: None,
}
}
}
impl<T: Topology + K8sclient + Send + Sync> Score<T> for PostgreSQLConnectionScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLConnectionInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("PostgreSQLConnectionScore : {}", self.name)
}
}
#[derive(Debug, Clone)]
struct PostgreSQLConnectionInterpret {
score: PostgreSQLConnectionScore,
}
impl PostgreSQLConnectionInterpret {
async fn fetch_app_secret<T: K8sclient>(&self, topo: &T) -> Result<Secret, InterpretError> {
let app_secret_name = format!("{}-app", self.score.cluster_name);
info!("Fetching app secret {}", app_secret_name);
let k8s_client = topo.k8s_client().await?;
k8s_client
.get_resource(&app_secret_name, Some(&self.score.namespace))
.await
.map_err(|e| InterpretError::new(format!("Failed to get app secret: {e}")))?
.ok_or_else(|| InterpretError::new(format!("App secret {} not found", app_secret_name)))
}
async fn fetch_ca_secret<T: K8sclient>(&self, topo: &T) -> Result<Secret, InterpretError> {
let ca_secret_name = format!("{}-ca", self.score.cluster_name);
info!("Fetching CA secret {}", ca_secret_name);
let k8s_client = topo.k8s_client().await?;
k8s_client
.get_resource(&ca_secret_name, Some(&self.score.namespace))
.await
.map_err(|e| InterpretError::new(format!("Failed to get CA secret: {e}")))?
.ok_or_else(|| InterpretError::new(format!("CA secret {} not found", ca_secret_name)))
}
fn get_secret_data(
&self,
secret: &Secret,
secret_type: &str,
) -> Result<BTreeMap<String, ByteString>, InterpretError> {
secret
.data
.as_ref()
.ok_or_else(|| InterpretError::new(format!("{} secret has no data", secret_type)))
.map(|b| b.clone())
}
fn create_temp_dir(&self) -> Result<tempfile::TempDir, InterpretError> {
tempfile::Builder::new()
.prefix("pg-connection-test-")
.tempdir()
.map_err(|e| InterpretError::new(format!("Failed to create temp directory: {e}")))
}
fn write_ca_cert(
&self,
temp_dir: &Path,
ca_data: &BTreeMap<String, ByteString>,
) -> Result<PathBuf, InterpretError> {
let ca_crt = ca_data
.get("ca.crt")
.ok_or_else(|| InterpretError::new("CA secret missing ca.crt".to_string()))?;
let ca_file = temp_dir.join("ca.crt");
std::fs::write(&ca_file, &ca_crt.0)
.map_err(|e| InterpretError::new(format!("Failed to write CA cert: {e}")))?;
Ok(ca_file)
}
fn get_host(&self, data: &BTreeMap<String, ByteString>) -> Result<String, InterpretError> {
self.score
.hostname
.clone()
.or_else(|| decode_secret(data, "host").ok())
.ok_or_else(|| {
InterpretError::new("No hostname found in secret or override".to_string())
})
}
fn get_port(&self, data: &BTreeMap<String, ByteString>) -> Result<u16, InterpretError> {
self.score
.port_override
.or_else(|| {
decode_secret(data, "port")
.ok()
.and_then(|p| p.parse().ok())
})
.ok_or_else(|| InterpretError::new("Port not found in secret or override".to_string()))
}
fn create_test_script(&self, temp_dir: &Path) -> Result<PathBuf, InterpretError> {
let script_path = temp_dir.join("test_connection.sh");
let script_content = postgres_script_content();
std::fs::write(&script_path, script_content)
.map_err(|e| InterpretError::new(format!("Failed to write test script: {e}")))?;
debug!("Wrote script content:\n{script_content}");
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)
.map_err(|e| InterpretError::new(format!("Failed to get script metadata: {e}")))?
.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms).map_err(|e| {
InterpretError::new(format!("Failed to set script permissions: {e}"))
})?;
}
Ok(script_path)
}
async fn run_docker_test(
&self,
temp_dir: &Path,
cmd: &str,
password: &str,
) -> Result<Outcome, InterpretError> {
info!("Running connection test in Docker container...");
let container_cmd = format!("PGPASSWORD={} /tmp/test_connection.sh {}", password, cmd);
debug!("Starting docker container with cmd : {container_cmd}");
let mut cmd = Command::new("docker");
cmd.arg("run")
.arg("--rm")
.arg("-i")
.arg("-v")
.arg(format!("{}/:/tmp", temp_dir.display()))
.arg("--workdir")
.arg("/tmp")
.arg("--entrypoint")
.arg("/bin/sh")
.arg("postgres:latest")
.arg("-c")
.arg(container_cmd)
.env("PGPASSWORD", password)
.stdout(std::process::Stdio::inherit())
.stderr(std::process::Stdio::inherit());
debug!("Running Command {cmd:?}");
let output = cmd
.spawn()
.map_err(|e| InterpretError::new(format!("Failed to spawn docker container: {e}")))?
.wait_with_output()
.await
.map_err(|e| {
InterpretError::new(format!("Failed to wait for docker container: {e}"))
})?;
if output.status.success() {
info!("Successfully connected to PostgreSQL!");
Ok(Outcome::success("Connection successful".to_string()))
} else {
// stdout/stderr are inherited and already streamed to the console, so they
// cannot be captured here; point the user at the log output above instead.
error!("Connection failed with status {}", output.status);
Err(InterpretError::new(format!(
"Connection failed with status {}, see output above",
output.status
)))
}
}
}
#[async_trait]
impl<T: Topology + K8sclient + Send + Sync> Interpret<T> for PostgreSQLConnectionInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLConnectionInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Fetch secrets
let app_secret = self.fetch_app_secret(topo).await?;
trace!("Got app_secret {app_secret:?}");
let ca_secret = self.fetch_ca_secret(topo).await?;
trace!("Got ca_secret {ca_secret:?}");
// Get secret data
let app_data = self.get_secret_data(&app_secret, "App")?;
trace!("Got app_data {app_data:?}");
let ca_data = self.get_secret_data(&ca_secret, "CA")?;
trace!("Got ca_data {ca_data:?}");
// Create temp directory
let temp_dir = self.create_temp_dir()?;
let temp_dir_path = temp_dir.path();
debug!("Created temp dir {temp_dir_path:?}");
// Write CA cert
let ca_file = self.write_ca_cert(temp_dir_path, &ca_data)?;
debug!("Wrote ca_file {ca_file:?}");
// Get connection details
let username = decode_secret(&app_data, "username")?;
let password = decode_secret(&app_data, "password")?;
let dbname = decode_secret(&app_data, "dbname")?;
let host = self.get_host(&app_data)?;
let port = self.get_port(&app_data)?;
// Create test script
let script_path = self.create_test_script(temp_dir_path)?;
let ca_file_in_container = Path::new("/tmp").join(ca_file.file_name().unwrap());
let script_cmd = format!(
"{host} {port} {username} {dbname} {}",
ca_file_in_container.display()
);
debug!("Prepared test script in {}", temp_dir_path.display());
// Run connection test
self.run_docker_test(temp_dir_path, &script_cmd, &password)
.await
}
}
fn postgres_script_content() -> &'static str {
r#"#!/bin/sh
# PostgreSQL connection test and metrics collection script
# Args: $1=host $2=port $3=user $4=dbname $5=sslrootcert
# Note: the container's /bin/sh is dash, where `echo -e` is not portable; printf is used instead.
CONN="host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct"
# Basic connectivity test
echo "=== CONNECTION TEST ==="
if ! psql "$CONN" -c "SELECT 1" > /dev/null 2>&1; then
echo "ERROR: Connection failed"
exit 1
fi
echo "Connection successful"
# Database size metrics
printf '\n=== DATABASE SIZE METRICS ===\n'
echo "Total database size:"
psql "$CONN" -c "SELECT pg_size_pretty(pg_database_size(current_database()))" -t -A
echo "Database size breakdown:"
psql "$CONN" -c "SELECT
schema_name,
pg_size_pretty(sum(table_size)) as total_size
FROM (
SELECT
n.nspname as schema_name,
c.relname as table_name,
pg_total_relation_size(c.oid) as table_size
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
AND c.relkind = 'r'
) t
GROUP BY schema_name
ORDER BY sum(table_size) DESC" -t
# Connection and activity metrics
printf '\n=== CONNECTION & ACTIVITY ===\n'
echo "Active connections:"
psql "$CONN" -c "SELECT count(*) FROM pg_stat_activity" -t -A
echo "Current queries (running longer than 1 second):"
psql "$CONN" -c "SELECT
pid,
usename,
query_start,
now() - query_start as duration,
state,
left(query, 50) as query_preview
FROM pg_stat_activity
WHERE state = 'active' AND now() - query_start > interval '1 second'
ORDER BY duration DESC" -t
# Performance metrics
printf '\n=== PERFORMANCE METRICS ===\n'
echo "Cumulative transaction and cache statistics:"
psql "$CONN" -c "SELECT
tps,
tps_commit,
tps_rollback,
blks_read,
blks_hit,
hit_ratio
FROM (
SELECT
xact_commit as tps_commit,
xact_rollback as tps_rollback,
(xact_commit + xact_rollback) as tps,
blks_read,
blks_hit,
CASE WHEN blks_read + blks_hit = 0 THEN 0 ELSE (blks_hit * 100.0 / (blks_read + blks_hit))::numeric(5,2) END as hit_ratio
FROM pg_stat_database
WHERE datname = current_database()
) stats" -t
echo "Current locks:"
psql "$CONN" -c "SELECT
count(*) as lock_count,
string_agg(mode, ', ' ORDER BY mode) as lock_modes
FROM pg_locks" -t
# Table statistics
printf '\n=== TABLE STATISTICS ===\n'
echo "Most accessed tables:"
psql "$CONN" -c "SELECT
relname,
seq_scan,
idx_scan,
n_tup_ins,
n_tup_upd,
n_tup_del
FROM pg_stat_user_tables
ORDER BY seq_scan + idx_scan + n_tup_ins + n_tup_upd + n_tup_del DESC
LIMIT 10" -t
# Index usage
printf '\n=== INDEX USAGE ===\n'
echo "Index usage statistics:"
psql "$CONN" -c "SELECT
indexrelname as index_name,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC
LIMIT 5" -t
# Configuration and limits
printf '\n=== CONFIGURATION ===\n'
echo "Current database parameters:"
psql "$CONN" -c "SELECT
name,
setting,
unit
FROM pg_settings
WHERE category = 'Resource Usage'
ORDER BY name" -t
printf '\n=== TEST COMPLETE ===\n'
echo "All metrics collected successfully"
exit 0
"#
}


@@ -0,0 +1,100 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::data::Version;
use crate::domain::topology::router::{TlsRoute, TlsRouter};
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::postgresql::PostgreSQLScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Deploys a public PostgreSQL cluster: CNPG + TLS passthrough route for RW endpoint.
/// For failover/multisite: exposes single-instance or small HA Postgres publicly.
///
/// Sequence: PostgreSQLScore → TlsRouter::install_route (RW backend).
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
/// ```
#[derive(Debug, Clone, Serialize)]
pub struct PublicPostgreSQLScore {
/// Inner non-public Postgres cluster config.
pub postgres_score: PostgreSQLScore,
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
pub hostname: String,
}
impl PublicPostgreSQLScore {
pub fn new(namespace: &str, hostname: &str) -> Self {
Self {
postgres_score: PostgreSQLScore::new(namespace),
hostname: hostname.to_string(),
}
}
}
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.postgres_score.name);
let tls_route = TlsRoute {
namespace: self.postgres_score.namespace.clone(),
hostname: self.hostname.clone(),
backend: rw_backend,
target_port: 5432,
};
Box::new(PublicPostgreSQLInterpret {
postgres_score: self.postgres_score.clone(),
tls_route,
})
}
fn name(&self) -> String {
format!(
"PublicPostgreSQLScore({}:{})",
self.postgres_score.namespace, self.hostname
)
}
}
/// Custom interpret: deploy Postgres then install public TLS route.
#[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret {
postgres_score: PostgreSQLScore,
tls_route: TlsRoute,
}
#[async_trait]
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for PublicPostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PublicPostgreSQLInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(&self, inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Deploy CNPG cluster first (creates -rw service)
self.postgres_score.interpret(inventory, topo).await?;
// Expose RW publicly via TLS passthrough
topo.install_route(self.tls_route.clone())
.await
.map_err(InterpretError::new)?;
Ok(Outcome::success(format!(
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
self.postgres_score.name.clone(),
self.tls_route.hostname
)))
}
}
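The two scores are meant to be used together; a sketch with placeholder values (`harmony-pg` is the `PostgreSQLScore` default cluster name):

// Deploy the cluster and its TLS passthrough route...
let public_pg = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
// ...then verify external connectivity through the same hostname.
let mut check = PostgreSQLConnectionScore::new(
    "harmony",
    "harmony-pg",
    Some("pg-rw.example.com".to_string()),
);
// Depending on the router, the public port may differ from the one stored in the
// app secret (e.g. 443 for an OKD passthrough route).
check.port_override = Some(443);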


@@ -9,3 +9,4 @@ license.workspace = true
serde.workspace = true
url.workspace = true
rand.workspace = true
serde_json.workspace = true


@@ -1,3 +1,5 @@
pub mod id;
pub mod net;
pub mod rfc1123;
pub mod storage;
pub mod switch;


@@ -0,0 +1,232 @@
/// A String that can be used as a subdomain.
///
/// This means the name must:
///
/// - contain no more than 253 characters
/// - contain only lowercase alphanumeric characters, '-' or '.'
/// - start with an alphanumeric character
/// - end with an alphanumeric character
///
/// <https://datatracker.ietf.org/doc/html/rfc1123>
///
/// This is relevant in harmony since most k8s resource names are required to be usable as dns
/// subdomains.
///
/// See <https://kubernetes.io/docs/concepts/overview/working-with-objects/names/>
#[derive(Debug, Clone)]
pub struct Rfc1123Name {
content: String,
}
impl TryFrom<&str> for Rfc1123Name {
type Error = String;
fn try_from(s: &str) -> Result<Self, String> {
let mut content = s.to_lowercase();
// Remove invalid characters
content.retain(|c| c.is_ascii_alphanumeric() || c == '-' || c == '.');
// Enforce max length
if content.len() > 253 {
content.truncate(253);
}
// Trim leading/trailing dots
content = content.trim_matches('.').to_string();
// Deduplicate consecutive dots
loop {
let new_content = content.replace("..", ".");
if new_content == content {
break;
}
content = new_content;
}
// Trim leading/trailing non-alphanumeric
content = content.trim_matches(|c: char| !c.is_ascii_alphanumeric()).to_string();
if content.is_empty() {
return Err(format!("Input '{}' resulted in empty string", s));
}
Ok(Self { content })
}
}
/// Converts an `Rfc1123Name` into a `String`.
///
/// This allows using `Rfc1123Name` in contexts where a `String` is expected.
impl From<Rfc1123Name> for String {
fn from(name: Rfc1123Name) -> Self {
name.content
}
}
/// Serializes the `Rfc1123Name` as a string.
///
/// This directly serializes the inner `String` content without additional wrapping.
impl serde::Serialize for Rfc1123Name {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.content)
}
}
/// Deserializes an `Rfc1123Name` from a string.
///
/// Note: this trusts the serialized content as-is and does not re-apply the `TryFrom`
/// sanitization.
impl<'de> serde::Deserialize<'de> for Rfc1123Name {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let content = String::deserialize(deserializer)?;
Ok(Self { content })
}
}
/// Displays the `Rfc1123Name` as a string.
///
/// This directly displays the inner `String` content without additional wrapping.
impl std::fmt::Display for Rfc1123Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.content)
}
}
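A quick sketch of the sanitization in practice:

let name = Rfc1123Name::try_from("My App_v2!").unwrap();
assert_eq!(name.to_string(), "myappv2"); // lowercased, invalid chars dropped
assert_eq!(String::from(name), "myappv2");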
#[cfg(test)]
mod tests {
use super::Rfc1123Name;
#[test]
fn test_try_from_empty() {
let name = Rfc1123Name::try_from("");
assert!(name.is_err());
}
#[test]
fn test_try_from_valid() {
let name = Rfc1123Name::try_from("hello-world").unwrap();
assert_eq!(name.content, "hello-world");
}
#[test]
fn test_try_from_uppercase() {
let name = Rfc1123Name::try_from("Hello-World").unwrap();
assert_eq!(name.content, "hello-world");
}
#[test]
fn test_try_from_invalid_chars() {
let name = Rfc1123Name::try_from("hel@lo#w!or%ld123").unwrap();
assert_eq!(name.content, "helloworld123");
}
#[test]
fn test_try_from_leading_dot() {
let name = Rfc1123Name::try_from(".hello").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_trailing_dot() {
let name = Rfc1123Name::try_from("hello.").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_leading_hyphen() {
let name = Rfc1123Name::try_from("-hello").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_complicated_string() {
let name = Rfc1123Name::try_from("--h--e,}{}12!$#)\np_aulbS\r\t.!@o--._--").unwrap();
assert_eq!(name.content, "h--e12paulbs.o");
}
#[test]
fn test_try_from_trailing_hyphen() {
let name = Rfc1123Name::try_from("hello-").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_single_hyphen() {
let name = Rfc1123Name::try_from("-");
assert!(name.is_err());
}
#[test]
fn test_from_str() {
let name: Rfc1123Name = "test-name".try_into().unwrap();
assert_eq!(name.content, "test-name");
}
#[test]
fn test_into_string() {
let name = Rfc1123Name::try_from("test").unwrap();
let s: String = name.into();
assert_eq!(s, "test");
}
#[test]
fn test_compliance() {
let inputs = vec![
"valid",
"in-VALID",
".dots",
"-hyphen",
"hyphen-",
"!!1@",
"aaaaaaaaaa",
"--abc--",
"a.b-c",
];
for input in inputs {
let name = Rfc1123Name::try_from(input).unwrap();
let s = &name.content;
// Check only allowed characters
for c in s.chars() {
assert!(c.is_ascii_alphanumeric() || c == '-' || c == '.');
}
// Check starts and ends with alphanumeric
if !s.is_empty() {
assert!(s.chars().next().unwrap().is_ascii_alphanumeric());
assert!(s.chars().last().unwrap().is_ascii_alphanumeric());
}
}
}
#[test]
fn test_enforces_max_length() {
let long_input = "a".repeat(300);
let name = Rfc1123Name::try_from(long_input.as_str()).unwrap();
assert_eq!(name.content.len(), 253);
assert_eq!(name.content, "a".repeat(253));
}
#[test]
fn test_truncate_trim_end() {
let input = "a".repeat(252) + "-";
let name = Rfc1123Name::try_from(input.as_str()).unwrap();
assert_eq!(name.content.len(), 252);
assert_eq!(name.content, "a".repeat(252));
}
#[test]
fn test_dedup_dots() {
let input = "a..b...c";
let name = Rfc1123Name::try_from(input).unwrap();
assert_eq!(name.content, "a.b.c");
}
}


@@ -0,0 +1,160 @@
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
pub struct StorageSize {
size_bytes: u64,
#[serde(skip)]
display_suffix: Option<String>,
}
impl StorageSize {
pub fn new(size_bytes: u64) -> Self {
Self {
size_bytes,
display_suffix: None,
}
}
pub fn b(size: u64) -> Self {
Self {
size_bytes: size,
display_suffix: Some("B".to_string()),
}
}
/// Decimal (SI) unit: 1 KB = 1000 bytes.
pub fn kb(size: u64) -> Self {
Self {
size_bytes: size * 1000,
display_suffix: Some("KB".to_string()),
}
}
/// Decimal (SI) unit: 1 MB = 1000^2 bytes.
pub fn mb(size: u64) -> Self {
Self {
size_bytes: size * 1000 * 1000,
display_suffix: Some("MB".to_string()),
}
}
/// Decimal (SI) unit: 1 GB = 1000^3 bytes.
pub fn gb(size: u64) -> Self {
Self {
size_bytes: size * 1000 * 1000 * 1000,
display_suffix: Some("GB".to_string()),
}
}
/// Binary (IEC) unit: 1 GiB = 1024^3 bytes.
pub fn gi(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024 * 1024,
display_suffix: Some("GiB".to_string()),
}
}
/// Decimal (SI) unit: 1 TB = 1000^4 bytes.
pub fn tb(size: u64) -> Self {
Self {
size_bytes: size * 1000 * 1000 * 1000 * 1000,
display_suffix: Some("TB".to_string()),
}
}
/// Binary (IEC) unit: 1 TiB = 1024^4 bytes.
pub fn ti(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024 * 1024 * 1024,
display_suffix: Some("TiB".to_string()),
}
}
pub fn bytes(&self) -> u64 {
self.size_bytes
}
}
impl fmt::Display for StorageSize {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
// Convert back to the unit used at construction time, so `gi(1)` displays as
// "1GiB" rather than the raw byte count with a unit suffix attached.
let (divisor, suffix): (u64, &str) = match self.display_suffix.as_deref() {
Some("KB") => (1_000, "KB"),
Some("MB") => (1_000_000, "MB"),
Some("GB") => (1_000_000_000, "GB"),
Some("TB") => (1_000_000_000_000, "TB"),
Some("GiB") => (1 << 30, "GiB"),
Some("TiB") => (1 << 40, "TiB"),
_ => (1, "B"),
};
write!(f, "{}{}", self.size_bytes / divisor, suffix)
}
}
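A quick sketch of construction and display behavior:

let size = StorageSize::gi(10);
assert_eq!(size.bytes(), 10 * 1024 * 1024 * 1024);
assert_eq!(size.to_string(), "10GiB");
assert!(StorageSize::gb(10) < size); // decimal GB < binary GiB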
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bytes() {
let size = StorageSize::b(123);
assert_eq!(size.bytes(), 123);
assert_eq!(size.to_string(), "123B");
}
#[test]
fn test_kilobytes() {
let size = StorageSize::kb(2);
assert_eq!(size.bytes(), 2000);
assert_eq!(size.to_string(), "2KB");
}
#[test]
fn test_megabytes() {
let size = StorageSize::mb(3);
assert_eq!(size.bytes(), 3_000_000);
assert_eq!(size.to_string(), "3MB");
}
#[test]
fn test_gigabytes() {
let size = StorageSize::gb(4);
assert_eq!(size.bytes(), 4_000_000_000);
assert_eq!(size.to_string(), "4GB");
}
#[test]
fn test_gibibytes() {
let size = StorageSize::gi(1);
assert_eq!(size.bytes(), 1024 * 1024 * 1024);
assert_eq!(size.to_string(), "1GiB");
}
#[test]
fn test_terabytes() {
let size = StorageSize::tb(5);
assert_eq!(size.bytes(), 5_000_000_000_000);
assert_eq!(size.to_string(), "5TB");
}
#[test]
fn test_tebibytes() {
let size = StorageSize::ti(1);
assert_eq!(size.bytes(), 1024u64.pow(4));
assert_eq!(size.to_string(), "1TiB");
}
#[test]
fn test_new_without_suffix() {
let size = StorageSize::new(999);
assert_eq!(size.bytes(), 999);
assert_eq!(size.to_string(), "999B");
}
#[test]
fn test_serde_roundtrip() {
let original = StorageSize::gi(1);
let serialized = serde_json::to_string(&original).unwrap();
let deserialized: StorageSize = serde_json::from_str(&serialized).unwrap();
assert_eq!(original.bytes(), deserialized.bytes());
// Note: suffix is lost during serialization/deserialization
assert_ne!(original.to_string(), deserialized.to_string());
}
#[test]
fn test_ord() {
let one_gb = StorageSize::gb(1);
let one_gi = StorageSize::gi(1);
assert!(one_gb < one_gi); // 1 GB = 10^9 bytes < 1 GiB = 2^30 bytes
}
}