Merge pull request 'feat: Introducing FailoverTopology and OperatorHub Catalog Subscription with example' (#196) from feat/multisitePostgreSQL into master
Some checks failed
Run Check Script / check (push) Has been cancelled
Compile and package harmony_composer / package_harmony_composer (push) Has been cancelled

Reviewed-on: #196
Reviewed-by: wjro <wrolleman@nationtech.io>
This commit is contained in:
2026-01-06 15:41:12 +00:00
18 changed files with 761 additions and 0 deletions

View File

@@ -0,0 +1,19 @@
use async_trait::async_trait;
use crate::topology::{PreparationError, PreparationOutcome, Topology};
pub struct FailoverTopology<T> {
pub primary: T,
pub replica: T,
}
#[async_trait]
impl<T: Send + Sync> Topology for FailoverTopology<T> {
fn name(&self) -> &str {
"FailoverTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}

View File

@@ -1,5 +1,7 @@
mod failover;
mod ha_cluster;
pub mod ingress;
pub use failover::*;
use harmony_types::net::IpAddress;
mod host_binding;
mod http;

View File

@@ -0,0 +1,157 @@
use std::collections::BTreeMap;
use k8s_openapi::{
api::core::v1::{Affinity, Toleration},
apimachinery::pkg::apis::meta::v1::ObjectMeta,
};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "CatalogSource",
plural = "catalogsources",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CatalogSourceSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config_map: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub display_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub grpc_pod_config: Option<GrpcPodConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub icon: Option<Icon>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub publisher: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub run_as_root: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub secrets: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub update_strategy: Option<UpdateStrategy>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GrpcPodConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub affinity: Option<Affinity>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extract_content: Option<ExtractContent>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_target: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority_class_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub security_context_config: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tolerations: Option<Vec<Toleration>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExtractContent {
pub cache_dir: String,
pub catalog_dir: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Icon {
pub base64data: String,
pub mediatype: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateStrategy {
#[serde(skip_serializing_if = "Option::is_none")]
pub registry_poll: Option<RegistryPoll>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RegistryPoll {
#[serde(skip_serializing_if = "Option::is_none")]
pub interval: Option<String>,
}
impl Default for CatalogSource {
fn default() -> Self {
Self {
metadata: ObjectMeta::default(),
spec: CatalogSourceSpec {
address: None,
config_map: None,
description: None,
display_name: None,
grpc_pod_config: None,
icon: None,
image: None,
priority: None,
publisher: None,
run_as_root: None,
secrets: None,
source_type: None,
update_strategy: None,
},
}
}
}
impl Default for CatalogSourceSpec {
fn default() -> Self {
Self {
address: None,
config_map: None,
description: None,
display_name: None,
grpc_pod_config: None,
icon: None,
image: None,
priority: None,
publisher: None,
run_as_root: None,
secrets: None,
source_type: None,
update_strategy: None,
}
}
}

View File

@@ -0,0 +1,2 @@
mod catalogsources_operators_coreos_com;
pub use catalogsources_operators_coreos_com::*;

View File

@@ -0,0 +1,3 @@
mod operatorhub;
pub use operatorhub::*;
pub mod crd;

View File

@@ -0,0 +1,107 @@
// Write operatorhub catalog score
// for now this will only support on OKD with the default catalog and operatorhub setup and does not verify OLM state or anything else. Very opinionated and bare-bones to start
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::apps::crd::{
CatalogSource, CatalogSourceSpec, RegistryPoll, UpdateStrategy,
};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Installs the CatalogSource in a cluster which already has the required services and CRDs installed.
///
/// ```rust
/// use harmony::modules::k8s::apps::OperatorHubCatalogSourceScore;
///
/// let score = OperatorHubCatalogSourceScore::default();
/// ```
///
/// Required services:
/// - catalog-operator
/// - olm-operator
///
/// They are installed by default with OKD/Openshift
///
/// **Warning** : this initial implementation does not manage the dependencies. They must already
/// exist in the cluster.
#[derive(Debug, Clone, Serialize)]
pub struct OperatorHubCatalogSourceScore {
pub name: String,
pub namespace: String,
pub image: String,
}
impl OperatorHubCatalogSourceScore {
pub fn new(name: &str, namespace: &str, image: &str) -> Self {
Self {
name: name.to_string(),
namespace: namespace.to_string(),
image: image.to_string(),
}
}
}
impl Default for OperatorHubCatalogSourceScore {
/// This default implementation will create this k8s resource :
///
/// ```yaml
/// apiVersion: operators.coreos.com/v1alpha1
/// kind: CatalogSource
/// metadata:
/// name: operatorhubio-catalog
/// namespace: openshift-marketplace
/// spec:
/// sourceType: grpc
/// image: quay.io/operatorhubio/catalog:latest
/// displayName: Operatorhub Operators
/// publisher: OperatorHub.io
/// updateStrategy:
/// registryPoll:
/// interval: 60m
/// ```
fn default() -> Self {
OperatorHubCatalogSourceScore {
name: "operatorhubio-catalog".to_string(),
namespace: "openshift-marketplace".to_string(),
image: "quay.io/operatorhubio/catalog:latest".to_string(),
}
}
}
impl<T: Topology + K8sclient> Score<T> for OperatorHubCatalogSourceScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = CatalogSourceSpec {
source_type: Some("grpc".to_string()),
image: Some(self.image.clone()),
display_name: Some("Operatorhub Operators".to_string()),
publisher: Some("OperatorHub.io".to_string()),
update_strategy: Some(UpdateStrategy {
registry_poll: Some(RegistryPoll {
interval: Some("60m".to_string()),
}),
}),
..CatalogSourceSpec::default()
};
let catalog_source = CatalogSource {
metadata,
spec: spec,
};
K8sResourceScore::single(catalog_source, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("OperatorHubCatalogSourceScore({})", self.name)
}
}

View File

@@ -1,3 +1,4 @@
pub mod apps;
pub mod deployment;
pub mod ingress;
pub mod namespace;

View File

@@ -13,6 +13,7 @@ pub mod load_balancer;
pub mod monitoring;
pub mod okd;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;

View File

@@ -0,0 +1,85 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use serde::Serialize;
use std::collections::HashMap;
#[async_trait]
pub trait PostgreSQL: Send + Sync {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String>;
}
#[derive(Clone, Debug, Serialize)]
pub struct PostgreSQLConfig {
pub cluster_name: String,
pub instances: u32,
pub storage_size: StorageSize,
pub role: PostgreSQLClusterRole,
}
#[derive(Clone, Debug, Serialize)]
pub enum PostgreSQLClusterRole {
Primary,
Replica(ReplicaConfig),
}
#[derive(Clone, Debug, Serialize)]
pub struct ReplicaConfig {
/// Name of the primary cluster this replica will sync from
pub primary_cluster_name: String,
/// Certs extracted from primary via Topology::get_replication_certs()
pub replication_certs: ReplicationCerts,
/// Bootstrap method (e.g., pg_basebackup from primary)
pub bootstrap: BootstrapConfig,
/// External cluster connection details for CNPG spec.externalClusters
pub external_cluster: ExternalClusterConfig,
}
#[derive(Clone, Debug, Serialize)]
pub struct BootstrapConfig {
pub strategy: BootstrapStrategy,
}
#[derive(Clone, Debug, Serialize)]
pub enum BootstrapStrategy {
PgBasebackup,
}
#[derive(Clone, Debug, Serialize)]
pub struct ExternalClusterConfig {
/// Name used in CNPG externalClusters list
pub name: String,
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
pub connection_parameters: HashMap<String, String>,
}
#[derive(Clone, Debug, Serialize)]
pub struct ReplicationCerts {
/// PEM-encoded CA cert from primary
pub ca_cert_pem: String,
/// PEM-encoded streaming_replica client cert (tls.crt)
pub streaming_replica_cert_pem: String,
/// PEM-encoded streaming_replica client key (tls.key)
pub streaming_replica_key_pem: String,
}
#[derive(Clone, Debug)]
pub struct PostgreSQLEndpoint {
pub host: String,
pub port: u16,
}

View File

@@ -0,0 +1,125 @@
use async_trait::async_trait;
use log::debug;
use log::info;
use std::collections::HashMap;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::FailoverTopology,
};
#[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
info!(
"Starting deployment of failover topology '{}'",
config.cluster_name
);
let primary_config = PostgreSQLConfig {
cluster_name: config.cluster_name.clone(),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Primary,
};
info!(
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
primary_config.cluster_name, primary_config.storage_size
);
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
let certs = self
.primary
.get_replication_certs(&primary_cluster_name)
.await?;
info!("Replication certificates retrieved successfully");
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
let endpoint = self
.primary
.get_public_endpoint(&primary_cluster_name)
.await?
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
info!(
"Public endpoint '{}:{}' retrieved for primary",
endpoint.host, endpoint.port
);
info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new();
connection_parameters.insert("host".to_string(), endpoint.host);
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
debug!("Replica connection parameters: {:?}", connection_parameters);
let external_cluster = ExternalClusterConfig {
name: primary_cluster_name.clone(),
connection_parameters,
};
let bootstrap_config = BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
};
let replica_cluster_config = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs,
bootstrap: bootstrap_config,
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica", primary_cluster_name),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
};
info!(
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
);
self.replica.deploy(&replica_config).await?;
info!(
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
replica_config.cluster_name, config.cluster_name
);
Ok(primary_cluster_name)
}
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
self.primary.get_replication_certs(cluster_name).await
}
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(cluster_name).await
}
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
self.primary.get_public_endpoint(cluster_name).await
}
}

View File

@@ -0,0 +1,4 @@
pub mod capability;
mod score;
pub mod failover;

View File

@@ -0,0 +1,88 @@
use crate::{
domain::{data::Version, interpret::InterpretStatus},
interpret::{Interpret, InterpretError, InterpretName, Outcome},
inventory::Inventory,
modules::postgresql::capability::PostgreSQL,
score::Score,
topology::Topology,
};
use super::capability::*;
use harmony_types::id::Id;
use async_trait::async_trait;
use log::info;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
pub struct PostgreSQLScore {
config: PostgreSQLConfig,
}
#[derive(Debug, Clone)]
pub struct PostgreSQLInterpret {
config: PostgreSQLConfig,
version: Version,
status: InterpretStatus,
}
impl PostgreSQLInterpret {
pub fn new(config: PostgreSQLConfig) -> Self {
let version = Version::from("1.0.0").expect("Version should be valid");
Self {
config,
version,
status: InterpretStatus::QUEUED,
}
}
}
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
fn name(&self) -> String {
"PostgreSQLScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLInterpret::new(self.config.clone()))
}
}
#[async_trait]
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLInterpret")
}
fn get_version(&self) -> crate::domain::data::Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Executing PostgreSQLInterpret with config {:?}",
self.config
);
let cluster_name = topology
.deploy(&self.config)
.await
.map_err(|e| InterpretError::from(e))?;
Ok(Outcome::success(format!(
"Deployed PostgreSQL cluster `{cluster_name}`"
)))
}
}