Compare commits

..

22 Commits

Author SHA1 Message Date
204795a74f feat(failoverPostgres): It's alive! We can now deploy a multisite postgres instance. The public hostname is still hardcoded; we will have to fix that, but the rest is good enough
Some checks failed
Run Check Script / check (pull_request) Failing after 36s
2025-12-17 16:43:37 -05:00
66a9a76a6b feat(postgres): Failover postgres example maybe working!? Added FailoverTopology implementations for required capabilities, documented a bit, some more tests, and quite a few utility functions
Some checks failed
Run Check Script / check (pull_request) Failing after 1m49s
2025-12-17 14:35:10 -05:00
440e684b35 feat: PostgreSQL score based on the postgres capability now. True infrastructure abstraction!
Some checks failed
Run Check Script / check (pull_request) Failing after 33s
2025-12-16 23:35:52 -05:00
b0383454f0 feat(types): Add utility initialization functions for StorageSize such as StorageSize::kb(324)
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-16 16:24:53 -05:00
9e8f3ce52f feat(postgres): Postgres Connection Test score now has a script that provides more insight. Not quite working properly but easy to improve at this point.
Some checks failed
Run Check Script / check (pull_request) Failing after 43s
2025-12-16 15:53:54 -05:00
c3ec7070ec feat: PostgreSQL public and Connection test score, also moved k8s_anywhere into a folder
Some checks failed
Run Check Script / check (pull_request) Failing after 40s
2025-12-16 14:57:02 -05:00
29821d5e9f feat: TlsPassthroughScore works, improved logging, fixed CRD
Some checks failed
Run Check Script / check (pull_request) Failing after 35s
2025-12-15 19:09:10 -05:00
446e079595 wip: public postgres: many fixes and refactoring toward more cohesive routing management
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-15 17:04:30 -05:00
e0da5764fb feat(types): Added Rfc1123 String type, useful for k8s names
Some checks failed
Run Check Script / check (pull_request) Failing after 38s
2025-12-15 12:57:52 -05:00
e9cab92585 feat: Impl TlsRoute for K8sAnywhereTopology 2025-12-14 22:22:09 -05:00
d06bd4dac6 feat: OKD route CRD and OKD specific route score
All checks were successful
Run Check Script / check (pull_request) Successful in 1m30s
2025-12-14 17:05:26 -05:00
142300802d wip: TlsRoute score first version
Some checks failed
Run Check Script / check (pull_request) Failing after 1m11s
2025-12-14 06:19:33 -05:00
2254641f3d fix: Tests, doctests, formatting
All checks were successful
Run Check Script / check (pull_request) Successful in 1m38s
2025-12-13 17:56:53 -05:00
b61e4f9a96 wip: Expose postgres publicly. Created tlsroute capability and postgres implementations
Some checks failed
Run Check Script / check (pull_request) Failing after 41s
2025-12-13 09:47:59 -05:00
2e367d88d4 feat: PostgreSQL score works, added postgresql example, tested on OKD 4.19, added note about incompatible default namespace settings
Some checks failed
Run Check Script / check (pull_request) Failing after 2m37s
2025-12-11 22:54:57 -05:00
9edc42a665 feat: PostgreSQLScore happy path using cnpg operator
Some checks failed
Run Check Script / check (pull_request) Failing after 37s
2025-12-11 14:36:39 -05:00
f242aafebb feat: Subscription for cnpg-operator: fixed default values, tested, and added to operatorhub example.
All checks were successful
Run Check Script / check (pull_request) Successful in 1m31s
2025-12-11 12:18:28 -05:00
3e14ebd62c feat: cnpg operator score
All checks were successful
Run Check Script / check (pull_request) Successful in 1m36s
2025-12-10 22:55:08 -05:00
1b19638df4 wip(failover): Started implementation of the FailoverTopology with PostgreSQL capability
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
This is our first Higher Order Topology (see ADR-015)
2025-12-10 21:15:51 -05:00
d39b1957cd feat(k8s_app): OperatorhubCatalogSourceScore can now install the operatorhub catalogsource on a cluster that already has operator lifecycle manager installed 2025-12-10 16:58:58 -05:00
357ca93d90 wip: FailoverTopology implementation for PostgreSQL on the way! 2025-12-10 13:12:53 -05:00
8103932f23 doc: Initial documentation for the MultisitePostgreSQL module 2025-12-10 13:12:53 -05:00
111 changed files with 3851 additions and 1501 deletions

77
Cargo.lock generated
View File

@@ -690,23 +690,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "brocade-switch"
version = "0.1.0"
dependencies = [
"async-trait",
"brocade",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
[[package]]
name = "brotli"
version = "8.0.2"
@@ -1793,6 +1776,21 @@ dependencies = [
"url",
]
[[package]]
name = "example-multisite-postgres"
version = "0.1.0"
dependencies = [
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "example-nanodc"
version = "0.1.0"
@@ -1852,6 +1850,21 @@ dependencies = [
"url",
]
[[package]]
name = "example-operatorhub-catalogsource"
version = "0.1.0"
dependencies = [
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "example-opnsense"
version = "0.1.0"
@@ -2496,19 +2509,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "harmony_inventory_builder"
version = "0.1.0"
dependencies = [
"cidr",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"tokio",
"url",
]
[[package]]
name = "harmony_macros"
version = "0.1.0"
@@ -2574,9 +2574,9 @@ dependencies = [
name = "harmony_types"
version = "0.1.0"
dependencies = [
"log",
"rand 0.9.2",
"serde",
"serde_json",
"url",
]
@@ -6080,21 +6080,6 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8f50febec83f5ee1df3015341d8bd429f2d1cc62bcba7ea2076759d315084683"
[[package]]
name = "test-score"
version = "0.1.0"
dependencies = [
"base64 0.22.1",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "thiserror"
version = "1.0.69"

View File

@@ -1,6 +1,6 @@
use std::net::{IpAddr, Ipv4Addr};
use brocade::{BrocadeOptions, ssh};
use brocade::BrocadeOptions;
use harmony_secret::{Secret, SecretManager};
use harmony_types::switch::PortLocation;
use serde::{Deserialize, Serialize};
@@ -16,28 +16,23 @@ async fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
// let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 250)); // old brocade @ ianlet
let ip = IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)); // brocade @ sto1
let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 55, 101)); // brocade @ sto1
// let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 4, 11)); // brocade @ st
let switch_addresses = vec![ip];
// let config = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
// .await
// .unwrap();
let config = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.unwrap();
let brocade = brocade::init(
&switch_addresses,
// &config.username,
// &config.password,
"admin",
"password",
BrocadeOptions {
22,
&config.username,
&config.password,
Some(BrocadeOptions {
dry_run: true,
ssh: ssh::SshOptions {
port: 2222,
..Default::default()
},
..Default::default()
},
}),
)
.await
.expect("Brocade client failed to connect");
@@ -59,7 +54,6 @@ async fn main() {
}
println!("--------------");
todo!();
let channel_name = "1";
brocade.clear_port_channel(channel_name).await.unwrap();

View File

@@ -140,7 +140,7 @@ impl BrocadeClient for FastIronClient {
async fn configure_interfaces(
&self,
_interfaces: &Vec<(String, PortOperatingMode)>,
_interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
todo!()
}

View File

@@ -14,12 +14,11 @@ use async_trait::async_trait;
use harmony_types::net::MacAddress;
use harmony_types::switch::{PortDeclaration, PortLocation};
use regex::Regex;
use serde::Serialize;
mod fast_iron;
mod network_operating_system;
mod shell;
pub mod ssh;
mod ssh;
#[derive(Default, Clone, Debug)]
pub struct BrocadeOptions {
@@ -57,7 +56,7 @@ enum ExecutionMode {
#[derive(Clone, Debug)]
pub struct BrocadeInfo {
os: BrocadeOs,
version: String,
_version: String,
}
#[derive(Clone, Debug)]
@@ -119,7 +118,7 @@ impl fmt::Display for InterfaceType {
}
/// Defines the primary configuration mode of a switch interface, representing mutually exclusive roles.
#[derive(Debug, PartialEq, Eq, Clone, Serialize)]
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PortOperatingMode {
/// The interface is explicitly configured for Brocade fabric roles (ISL or Trunk enabled).
Fabric,
@@ -142,11 +141,12 @@ pub enum InterfaceStatus {
pub async fn init(
ip_addresses: &[IpAddr],
port: u16,
username: &str,
password: &str,
options: BrocadeOptions,
options: Option<BrocadeOptions>,
) -> Result<Box<dyn BrocadeClient + Send + Sync>, Error> {
let shell = BrocadeShell::init(ip_addresses, username, password, options).await?;
let shell = BrocadeShell::init(ip_addresses, port, username, password, options).await?;
let version_info = shell
.with_session(ExecutionMode::Regular, |session| {
@@ -208,7 +208,7 @@ pub trait BrocadeClient: std::fmt::Debug {
/// Configures a set of interfaces to be operated with a specified mode (access ports, ISL, etc.).
async fn configure_interfaces(
&self,
interfaces: &Vec<(String, PortOperatingMode)>,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error>;
/// Scans the existing configuration to find the next available (unused)
@@ -263,7 +263,7 @@ async fn get_brocade_info(session: &mut BrocadeSession) -> Result<BrocadeInfo, E
return Ok(BrocadeInfo {
os: BrocadeOs::NetworkOperatingSystem,
version,
_version: version,
});
} else if output.contains("ICX") {
let re = Regex::new(r"(?m)^\s*SW: Version\s*(?P<version>[a-zA-Z0-9.\-]+)")
@@ -276,7 +276,7 @@ async fn get_brocade_info(session: &mut BrocadeSession) -> Result<BrocadeInfo, E
return Ok(BrocadeInfo {
os: BrocadeOs::FastIron,
version,
_version: version,
});
}

View File

@@ -187,7 +187,7 @@ impl BrocadeClient for NetworkOperatingSystemClient {
async fn configure_interfaces(
&self,
interfaces: &Vec<(String, PortOperatingMode)>,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
info!("[Brocade] Configuring {} interface(s)...", interfaces.len());
@@ -204,12 +204,9 @@ impl BrocadeClient for NetworkOperatingSystemClient {
PortOperatingMode::Trunk => {
commands.push("switchport".into());
commands.push("switchport mode trunk".into());
commands.push("switchport trunk allowed vlan all".into());
commands.push("no switchport trunk tag native-vlan".into());
commands.push("spanning-tree shutdown".into());
commands.push("no spanning-tree shutdown".into());
commands.push("no fabric isl enable".into());
commands.push("no fabric trunk enable".into());
commands.push("no shutdown".into());
}
PortOperatingMode::Access => {
commands.push("switchport".into());

View File

@@ -16,6 +16,7 @@ use tokio::time::timeout;
#[derive(Debug)]
pub struct BrocadeShell {
ip: IpAddr,
port: u16,
username: String,
password: String,
options: BrocadeOptions,
@@ -26,31 +27,33 @@ pub struct BrocadeShell {
impl BrocadeShell {
pub async fn init(
ip_addresses: &[IpAddr],
port: u16,
username: &str,
password: &str,
options: BrocadeOptions,
options: Option<BrocadeOptions>,
) -> Result<Self, Error> {
let ip = ip_addresses
.first()
.ok_or_else(|| Error::ConfigurationError("No IP addresses provided".to_string()))?;
let brocade_ssh_client_options =
ssh::try_init_client(username, password, ip, options).await?;
let base_options = options.unwrap_or_default();
let options = ssh::try_init_client(username, password, ip, base_options).await?;
Ok(Self {
ip: *ip,
port,
username: username.to_string(),
password: password.to_string(),
before_all_commands: vec![],
after_all_commands: vec![],
options: brocade_ssh_client_options,
options,
})
}
pub async fn open_session(&self, mode: ExecutionMode) -> Result<BrocadeSession, Error> {
BrocadeSession::open(
self.ip,
self.options.ssh.port,
self.port,
&self.username,
&self.password,
self.options.clone(),

View File

@@ -2,7 +2,6 @@ use std::borrow::Cow;
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use russh::client::Handler;
use russh::kex::DH_G1_SHA1;
use russh::kex::ECDH_SHA2_NISTP256;
@@ -11,43 +10,29 @@ use russh_keys::key::SSH_RSA;
use super::BrocadeOptions;
use super::Error;
#[derive(Clone, Debug)]
#[derive(Default, Clone, Debug)]
pub struct SshOptions {
pub preferred_algorithms: russh::Preferred,
pub port: u16,
}
impl Default for SshOptions {
fn default() -> Self {
Self {
preferred_algorithms: Default::default(),
port: 22,
}
}
}
impl SshOptions {
fn ecdhsa_sha2_nistp256(port: u16) -> Self {
fn ecdhsa_sha2_nistp256() -> Self {
Self {
preferred_algorithms: russh::Preferred {
kex: Cow::Borrowed(&[ECDH_SHA2_NISTP256]),
key: Cow::Borrowed(&[SSH_RSA]),
..Default::default()
},
port,
..Default::default()
}
}
fn legacy(port: u16) -> Self {
fn legacy() -> Self {
Self {
preferred_algorithms: russh::Preferred {
kex: Cow::Borrowed(&[DH_G1_SHA1]),
key: Cow::Borrowed(&[SSH_RSA]),
..Default::default()
},
port,
..Default::default()
}
}
}
@@ -72,21 +57,18 @@ pub async fn try_init_client(
ip: &std::net::IpAddr,
base_options: BrocadeOptions,
) -> Result<BrocadeOptions, Error> {
let mut default = SshOptions::default();
default.port = base_options.ssh.port;
let ssh_options = vec![
default,
SshOptions::ecdhsa_sha2_nistp256(base_options.ssh.port),
SshOptions::legacy(base_options.ssh.port),
SshOptions::default(),
SshOptions::ecdhsa_sha2_nistp256(),
SshOptions::legacy(),
];
for ssh in ssh_options {
let opts = BrocadeOptions {
ssh: ssh.clone(),
ssh,
..base_options.clone()
};
debug!("Creating client {ip}:{} {username}", ssh.port);
let client = create_client(*ip, ssh.port, username, password, &opts).await;
let client = create_client(*ip, 22, username, password, &opts).await;
match client {
Ok(_) => {

Binary file not shown.

View File

@@ -0,0 +1,105 @@
# Design Document: Harmony PostgreSQL Module
**Status:** Draft
**Last Updated:** 2025-12-01
**Context:** Multi-site Data Replication & Orchestration
## 1. Overview
The Harmony PostgreSQL Module provides a high-level abstraction for deploying and managing high-availability PostgreSQL clusters across geographically distributed Kubernetes/OKD sites.
Instead of manually configuring complex replication slots, firewalls, and operator settings on each cluster, users define a single intent (a **Score**), and Harmony orchestrates the underlying infrastructure (the **Arrangement**) to establish a Primary-Replica architecture.
Currently, the implementation relies on the **CloudNativePG (CNPG)** operator as the backing engine.
## 2. Architecture
### 2.1 The Abstraction Model
Following **ADR 003 (Infrastructure Abstraction)**, Harmony separates the *intent* from the *implementation*.
1. **The Score (Intent):** The user defines a `MultisitePostgreSQL` resource. This describes *what* is needed (e.g., "A Postgres 15 cluster with 10GB storage, Primary on Site A, Replica on Site B").
2. **The Interpret (Action):** Harmony's `MultisitePostgreSQLInterpret` processes this Score and orchestrates the deployment on both sites to reach the state it defines.
3. **The Capability (Implementation):** The `PostgreSQL` Capability is implemented by the K8s topology; through it, the Interpret can deploy, configure, and query the cluster (see the sketch below). The concrete implementation relies on the mature CloudNativePG operator to manage all the required Kubernetes resources.
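A minimal sketch of that capability boundary, assuming the `PostgreSQL` trait and the `PostgreSQLConfig`/`PostgreSQLEndpoint` types introduced in this changeset (field visibility and exact trait bounds are assumptions):
```rust
use harmony::modules::postgresql::capability::{PostgreSQL, PostgreSQLConfig};

// Any topology implementing the PostgreSQL capability can be driven the same
// way, regardless of the backing engine (CNPG today, others later).
async fn deploy_primary<T: PostgreSQL>(
    topology: &T,
    config: &PostgreSQLConfig,
) -> Result<(), String> {
    let cluster = topology.deploy(config).await?;
    // Fetch what the replica site will need: mTLS material and the endpoint.
    let certs = topology.get_replication_certs(config).await?;
    let endpoint = topology.get_endpoint(config).await?;
    println!(
        "Deployed {cluster}; primary endpoint {}:{}",
        endpoint.host, endpoint.port
    );
    let _ = certs; // propagated to the replica site by the interpret
    Ok(())
}
```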
### 2.2 Network Connectivity (TLS Passthrough)
One of the critical challenges in multi-site orchestration is secure connectivity between clusters that may have dynamic IPs or strict firewalls.
To solve this, we utilize **OKD/OpenShift Routes with TLS Passthrough**.
* **Mechanism:** The Primary site exposes a `Route` configured for `termination: passthrough`.
* **Routing:** The OpenShift HAProxy router inspects the **SNI (Server Name Indication)** header of the incoming TCP connection to route traffic to the correct PostgreSQL Pod.
* **Security:** SSL is **not** terminated at the ingress router. The encrypted stream is passed directly to the PostgreSQL instance. Mutual TLS (mTLS) authentication is handled natively by CNPG between the Primary and Replica instances.
* **Dynamic IPs:** Because connections are established via DNS hostnames (the Route URL), this architecture is resilient to dynamic IP changes at the Primary site.
#### Traffic Flow Diagram
```text
[ Site B: Replica ]                    [ Site A: Primary ]
         |                                      |
  (CNPG Instance) --[Encrypted TCP]--> (OKD HAProxy Router)
         |             (Port 443)               |
         |                                      |
         |                              [SNI Inspection]
         |                                      |
         |                                      v
         |                           (PostgreSQL Primary Pod)
         |                                 (Port 5432)
```
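Within Harmony, such a route is modeled by the `TlsRoute` struct and installed through the `TlsRouter` capability added in this changeset. A minimal sketch (the hostname and backend names are illustrative):
```rust
use harmony::topology::router::{TlsRoute, TlsRouter};

// Expose a CNPG read-write service through the site's router; on OKD this
// becomes a Route with `termination: passthrough`.
async fn expose_primary(router: &dyn TlsRouter) -> Result<(), String> {
    let route = TlsRoute {
        // Public hostname clients (and replicas) connect to; matched via SNI on port 443.
        hostname: "postgres-finance-db.apps.site-paris.example.com".to_string(),
        // Backend k8s Service created by CNPG for the primary.
        backend: "finance-db-rw".to_string(),
        target_port: 5432,
        namespace: "tenant-a".to_string(),
    };
    router.install_route(route).await
}
```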
## 3. Design Decisions
### Why CloudNativePG?
We selected CloudNativePG because it relies exclusively on standard Kubernetes primitives and uses the native PostgreSQL replication protocol (WAL shipping/Streaming). This aligns with Harmony's goal of being "K8s Native."
### Why TLS Passthrough instead of VPN/NodePort?
* **NodePort:** Requires static IPs and opening non-standard ports on the firewall, which violates our security constraints.
* **VPN (e.g., Wireguard/Tailscale):** While secure, it introduces significant complexity (sidecars, key management) and external dependencies.
* **TLS Passthrough:** Leverages the existing Ingress/Router infrastructure already present in OKD. It requires zero additional software and respects multi-tenancy (Routes are namespaced).
### Configuration Philosophy (YAGNI)
The current design exposes a **generic configuration surface**. Users can configure standard parameters (Storage size, CPU/Memory requests, Postgres version).
**We explicitly do not expose advanced CNPG or PostgreSQL configurations at this stage.**
* **Reasoning:** We aim to keep the API surface small and manageable.
* **Future Path:** We plan to implement a "pass-through" mechanism to allow sending raw config maps or custom parameters to the underlying engine (CNPG) *only when a concrete use case arises*. Until then, we adhere to the **YAGNI (You Ain't Gonna Need It)** principle to avoid premature optimization and API bloat.
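Concretely, the generic surface is the `PostgreSQLConfig` struct consumed by the Scores in this changeset. A sketch of the intended usage pattern (only `cluster_name` and `namespace` appear in this diff; other fields are left to their defaults):
```rust
use harmony::modules::postgresql::capability::PostgreSQLConfig;

fn finance_db_config() -> PostgreSQLConfig {
    // Override only what you need; everything else tracks the CNPG-aligned
    // Harmony defaults (1 instance, 1Gi storage).
    PostgreSQLConfig {
        cluster_name: "finance-db".to_string(),
        namespace: "tenant-a".to_string(),
        ..Default::default()
    }
}
```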
## 4. Usage Guide
To deploy a multi-site cluster, apply the `MultisitePostgreSQL` resource to the Harmony Control Plane.
### Example Manifest
```yaml
apiVersion: harmony.io/v1alpha1
kind: MultisitePostgreSQL
metadata:
  name: finance-db
  namespace: tenant-a
spec:
  version: "15"
  storage: "10Gi"
  resources:
    requests:
      cpu: "500m"
      memory: "1Gi"
  # Topology Definition
  topology:
    primary:
      site: "site-paris" # The name of the cluster in Harmony
    replicas:
      - site: "site-newyork"
```
### What happens next?
1. Harmony detects the CR.
2. **On Site Paris:** It deploys a CNPG Cluster (Primary) and creates a Passthrough Route `postgres-finance-db.apps.site-paris.example.com`.
3. **On Site New York:** It deploys a CNPG Cluster (Replica) configured with `externalClusters` pointing to the Paris Route.
4. Data begins replicating immediately over the encrypted channel.
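In this changeset, the same intent is expressed in Rust rather than as a CR: the included `example-multisite-postgres` drives a `PublicPostgreSQLScore` through a `FailoverTopology`, with the two clusters selected via environment variables. A sketch adapted from that example (hostname and names are illustrative):
```rust
use harmony::{
    inventory::Inventory,
    modules::postgresql::{PublicPostgreSQLScore, capability::PostgreSQLConfig},
    topology::{FailoverTopology, K8sAnywhereTopology},
};

#[tokio::main]
async fn main() {
    let postgres = PublicPostgreSQLScore {
        config: PostgreSQLConfig {
            cluster_name: "finance-db".to_string(),
            namespace: "tenant-a".to_string(),
            ..Default::default()
        },
        hostname: "postgres-finance-db.apps.site-paris.example.com".to_string(),
    };
    // Primary/replica clusters come from the
    // HARMONY_FAILOVER_TOPOLOGY_K8S_{PRIMARY,REPLICA} env vars.
    harmony_cli::run(
        Inventory::autoload(),
        FailoverTopology::<K8sAnywhereTopology>::from_env(),
        vec![Box::new(postgres)],
        None,
    )
    .await
    .unwrap();
}
```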
## 5. Troubleshooting
* **Connection Refused:** Ensure the Primary site's Route is successfully admitted by the Ingress Controller.
* **Certificate Errors:** CNPG manages mTLS automatically. If errors persist, ensure the CA secrets were correctly propagated by Harmony from Primary to Replica namespaces.
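For an end-to-end check, this changeset also ships a `PostgreSQLConnectionScore` that probes the cluster through the public route; a minimal sketch (names mirror the included example):
```rust
use harmony::modules::postgresql::PostgreSQLConnectionScore;

fn connection_probe() -> PostgreSQLConnectionScore {
    PostgreSQLConnectionScore {
        name: "finance-db".to_string(),
        namespace: "tenant-a".to_string(),
        cluster_name: "finance-db".to_string(),
        // Probe through the public passthrough route instead of the in-cluster service.
        hostname: Some("postgres-finance-db.apps.site-paris.example.com".to_string()),
        port_override: Some(443),
    }
}
```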

Binary file not shown.

View File

@@ -1,157 +0,0 @@
use std::str::FromStr;
use async_trait::async_trait;
use brocade::{BrocadeOptions, PortOperatingMode};
use harmony::{
data::Version,
infra::brocade::BrocadeSwitchClient,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{
HostNetworkConfig, PortConfig, PreparationError, PreparationOutcome, Switch, SwitchClient,
SwitchError, Topology,
},
};
use harmony_macros::ip;
use harmony_types::{id::Id, net::MacAddress, switch::PortLocation};
use log::{debug, info};
use serde::Serialize;
#[tokio::main]
async fn main() {
let switch_score = BrocadeSwitchScore {
port_channels_to_clear: vec![
Id::from_str("17").unwrap(),
Id::from_str("19").unwrap(),
Id::from_str("18").unwrap(),
],
ports_to_configure: vec![
(PortLocation(2, 0, 17), PortOperatingMode::Trunk),
(PortLocation(2, 0, 19), PortOperatingMode::Trunk),
(PortLocation(1, 0, 18), PortOperatingMode::Trunk),
],
};
harmony_cli::run(
Inventory::autoload(),
SwitchTopology::new().await,
vec![Box::new(switch_score)],
None,
)
.await
.unwrap();
}
#[derive(Clone, Debug, Serialize)]
struct BrocadeSwitchScore {
port_channels_to_clear: Vec<Id>,
ports_to_configure: Vec<PortConfig>,
}
impl<T: Topology + Switch> Score<T> for BrocadeSwitchScore {
fn name(&self) -> String {
"BrocadeSwitchScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(BrocadeSwitchInterpret {
score: self.clone(),
})
}
}
#[derive(Debug)]
struct BrocadeSwitchInterpret {
score: BrocadeSwitchScore,
}
#[async_trait]
impl<T: Topology + Switch> Interpret<T> for BrocadeSwitchInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!("Applying switch configuration {:?}", self.score);
debug!(
"Clearing port channel {:?}",
self.score.port_channels_to_clear
);
topology
.clear_port_channel(&self.score.port_channels_to_clear)
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
debug!("Configuring interfaces {:?}", self.score.ports_to_configure);
topology
.configure_interface(&self.score.ports_to_configure)
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success("switch configured".to_string()))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("BrocadeSwitchInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
struct SwitchTopology {
client: Box<dyn SwitchClient>,
}
#[async_trait]
impl Topology for SwitchTopology {
fn name(&self) -> &str {
"SwitchTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
impl SwitchTopology {
async fn new() -> Self {
let mut options = BrocadeOptions::default();
options.ssh.port = 2222;
let client =
BrocadeSwitchClient::init(&vec![ip!("127.0.0.1")], &"admin", &"password", options)
.await
.expect("Failed to connect to switch");
let client = Box::new(client);
Self { client }
}
}
#[async_trait]
impl Switch for SwitchTopology {
async fn setup_switch(&self) -> Result<(), SwitchError> {
todo!()
}
async fn get_port_for_mac_address(
&self,
_mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
todo!()
}
async fn configure_port_channel(&self, _config: &HostNetworkConfig) -> Result<(), SwitchError> {
todo!()
}
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError> {
self.client.clear_port_channel(ids).await
}
async fn configure_interface(&self, ports: &Vec<PortConfig>) -> Result<(), SwitchError> {
self.client.configure_interface(ports).await
}
}

View File

@@ -2,7 +2,7 @@ use harmony::{
inventory::Inventory,
modules::{
dummy::{ErrorScore, PanicScore, SuccessScore},
inventory::{HarmonyDiscoveryStrategy, LaunchDiscoverInventoryAgentScore},
inventory::LaunchDiscoverInventoryAgentScore,
},
topology::LocalhostTopology,
};
@@ -18,7 +18,6 @@ async fn main() {
Box::new(PanicScore {}),
Box::new(LaunchDiscoverInventoryAgentScore {
discovery_timeout: Some(10),
discovery_strategy: HarmonyDiscoveryStrategy::MDNS,
}),
],
None,

View File

@@ -1,11 +0,0 @@
cargo build -p harmony_inventory_builder --release --target x86_64-unknown-linux-musl
SCRIPT_DIR="$(dirname ${0})"
cd "${SCRIPT_DIR}/docker/"
cp ../../../target/x86_64-unknown-linux-musl/release/harmony_inventory_builder .
docker build . -t hub.nationtech.io/harmony/harmony_inventory_builder
docker push hub.nationtech.io/harmony/harmony_inventory_builder

View File

@@ -1,10 +0,0 @@
FROM debian:12-slim
RUN mkdir /app
WORKDIR /app/
COPY harmony_inventory_builder /app/
ENV RUST_LOG=info
CMD ["sleep", "infinity"]

View File

@@ -1,36 +0,0 @@
use harmony::{
inventory::{HostRole, Inventory},
modules::inventory::{DiscoverHostForRoleScore, HarmonyDiscoveryStrategy},
topology::LocalhostTopology,
};
use harmony_macros::cidrv4;
#[tokio::main]
async fn main() {
let discover_worker = DiscoverHostForRoleScore {
role: HostRole::Worker,
number_desired_hosts: 3,
discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
cidr: cidrv4!("192.168.0.1/25"),
port: 25000,
},
};
let discover_control_plane = DiscoverHostForRoleScore {
role: HostRole::ControlPlane,
number_desired_hosts: 3,
discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
cidr: cidrv4!("192.168.0.1/25"),
port: 25000,
},
};
harmony_cli::run(
Inventory::autoload(),
LocalhostTopology::new(),
vec![Box::new(discover_worker), Box::new(discover_control_plane)],
None,
)
.await
.unwrap();
}

View File

@@ -1,19 +1,18 @@
[package]
name = "brocade-switch"
name = "example-multisite-postgres"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_macros = { path = "../../harmony_macros" }
harmony_types = { path = "../../harmony_types" }
tokio.workspace = true
url.workspace = true
async-trait.workspace = true
serde.workspace = true
log.workspace = true
env_logger.workspace = true
brocade = { path = "../../brocade" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,3 @@
export HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY="context=default/api-your-openshift-cluster:6443/kube:admin"
export HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA="context=someuser/somecluster"
export RUST_LOG="harmony=debug"

View File

@@ -0,0 +1,28 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{PublicPostgreSQLScore, capability::PostgreSQLConfig},
topology::{FailoverTopology, K8sAnywhereTopology},
};
#[tokio::main]
async fn main() {
// env_logger::init();
let postgres = PublicPostgreSQLScore {
config: PostgreSQLConfig {
cluster_name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values:
// 1 instance, 1Gi storage
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
harmony_cli::run(
Inventory::autoload(),
FailoverTopology::<K8sAnywhereTopology>::from_env(),
vec![Box::new(postgres)],
None,
)
.await
.unwrap();
}

View File

@@ -39,10 +39,10 @@ async fn main() {
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.33.101")];
let brocade_options = BrocadeOptions {
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
};
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,

View File

@@ -4,10 +4,7 @@ use crate::topology::{get_inventory, get_topology};
use harmony::{
config::secret::SshKeyPair,
data::{FileContent, FilePath},
modules::{
inventory::HarmonyDiscoveryStrategy,
okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
},
modules::okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
score::Score,
topology::HAClusterTopology,
};
@@ -29,8 +26,7 @@ async fn main() {
},
})];
scores
.append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
scores.append(&mut OKDInstallationPipeline::get_all_scores().await);
harmony_cli::run(inventory, topology, scores, None)
.await

View File

@@ -31,10 +31,10 @@ pub async fn get_topology() -> HAClusterTopology {
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = BrocadeOptions {
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
};
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,

View File

@@ -26,10 +26,10 @@ pub async fn get_topology() -> HAClusterTopology {
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = BrocadeOptions {
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
};
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, str::FromStr};
use std::str::FromStr;
use harmony::{
inventory::Inventory,

View File

@@ -0,0 +1,18 @@
[package]
name = "example-operatorhub-catalogsource"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,22 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory,
modules::{k8s::apps::OperatorHubCatalogSourceScore, postgresql::CloudNativePgOperatorScore},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let operatorhub_catalog = OperatorHubCatalogSourceScore::default();
let cnpg_operator = CloudNativePgOperatorScore::default();
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(operatorhub_catalog), Box::new(cnpg_operator)],
None,
)
.await
.unwrap();
}

View File

@@ -35,10 +35,10 @@ async fn main() {
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.5.101")]; // TODO: Adjust me
let brocade_options = BrocadeOptions {
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
};
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,

View File

@@ -1,15 +1,18 @@
[package]
name = "harmony_inventory_builder"
name = "example-postgresql"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_macros = { path = "../../harmony_macros" }
harmony_types = { path = "../../harmony_types" }
tokio.workspace = true
url.workspace = true
cidr.workspace = true
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,26 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{PostgreSQLScore, capability::PostgreSQLConfig},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let postgresql = PostgreSQLScore {
config: PostgreSQLConfig {
cluster_name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-postgres-example".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values:
// 1 instance, 1Gi storage
},
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(postgresql)],
None,
)
.await
.unwrap();
}

View File

@@ -0,0 +1,18 @@
[package]
name = "example-public-postgres"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }

View File

@@ -0,0 +1,38 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{
K8sPostgreSQLScore, PostgreSQLConnectionScore, PublicPostgreSQLScore,
capability::PostgreSQLConfig,
},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let postgres = PublicPostgreSQLScore {
config: PostgreSQLConfig {
cluster_name: "harmony-postgres-example".to_string(), // Override default name
namespace: "harmony-public-postgres".to_string(),
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// 1 instance, 1Gi storage
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
let test_connection = PostgreSQLConnectionScore {
name: "harmony-postgres-example".to_string(),
namespace: "harmony-public-postgres".to_string(),
cluster_name: "harmony-postgres-example".to_string(),
hostname: Some("postgrestest.sto1.nationtech.io".to_string()),
port_override: Some(443),
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(postgres), Box::new(test_connection)],
None,
)
.await
.unwrap();
}

View File

@@ -152,10 +152,10 @@ impl PhysicalHost {
pub fn parts_list(&self) -> String {
let PhysicalHost {
id,
category: _,
category,
network,
storage,
labels: _,
labels,
memory_modules,
cpus,
} = self;
@@ -226,8 +226,8 @@ impl PhysicalHost {
speed_mhz,
manufacturer,
part_number,
serial_number: _,
rank: _,
serial_number,
rank,
} = mem;
parts_list.push_str(&format!(
"\n{}Gb, {}Mhz, Manufacturer ({}), Part Number ({})",

View File

@@ -4,8 +4,6 @@ use std::error::Error;
use async_trait::async_trait;
use derive_new::new;
use crate::inventory::HostRole;
use super::{
data::Version, executors::ExecutorError, inventory::Inventory, topology::PreparationError,
};
@@ -154,6 +152,12 @@ pub struct InterpretError {
msg: String,
}
impl From<InterpretError> for String {
fn from(e: InterpretError) -> String {
format!("InterpretError : {}", e.msg)
}
}
impl std::fmt::Display for InterpretError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.msg)

View File

@@ -1,6 +1,4 @@
mod repository;
use std::fmt;
pub use repository::*;
#[derive(Debug, new, Clone)]
@@ -71,14 +69,5 @@ pub enum HostRole {
Bootstrap,
ControlPlane,
Worker,
}
impl fmt::Display for HostRole {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
HostRole::Bootstrap => write!(f, "Bootstrap"),
HostRole::ControlPlane => write!(f, "ControlPlane"),
HostRole::Worker => write!(f, "Worker"),
}
}
Storage,
}

View File

@@ -0,0 +1,64 @@
use async_trait::async_trait;
use crate::topology::k8s_anywhere::K8sAnywhereConfig;
use crate::topology::{K8sAnywhereTopology, PreparationError, PreparationOutcome, Topology};
pub struct FailoverTopology<T> {
pub primary: T,
pub replica: T,
}
#[async_trait]
impl<T: Topology + Send + Sync> Topology for FailoverTopology<T> {
fn name(&self) -> &str {
"FailoverTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
let primary_outcome = self.primary.ensure_ready().await?;
let replica_outcome = self.replica.ensure_ready().await?;
match (primary_outcome, replica_outcome) {
(PreparationOutcome::Noop, PreparationOutcome::Noop) => Ok(PreparationOutcome::Noop),
(p, r) => {
let mut details = Vec::new();
if let PreparationOutcome::Success { details: d } = p {
details.push(format!("Primary: {}", d));
}
if let PreparationOutcome::Success { details: d } = r {
details.push(format!("Replica: {}", d));
}
Ok(PreparationOutcome::Success {
details: details.join(", "),
})
}
}
}
}
impl FailoverTopology<K8sAnywhereTopology> {
/// Creates a new `FailoverTopology` from environment variables.
///
/// Expects two environment variables:
/// - `HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY`: Comma-separated `key=value` pairs, e.g.,
/// `kubeconfig=/path/to/primary.kubeconfig,context=primary-ctx`
/// - `HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA`: Same format for the replica.
///
/// Parses `kubeconfig` (path to kubeconfig file) and `context` (Kubernetes context),
/// and constructs `K8sAnywhereConfig` with local installs disabled (`use_local_k3d=false`,
/// `autoinstall=false`, `use_system_kubeconfig=false`).
/// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`.
///
/// Panics if required env vars are missing or malformed.
pub fn from_env() -> Self {
let primary_config =
K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_PRIMARY");
let replica_config =
K8sAnywhereConfig::remote_k8s_from_env_var("HARMONY_FAILOVER_TOPOLOGY_K8S_REPLICA");
let primary = K8sAnywhereTopology::with_config(primary_config);
let replica = K8sAnywhereTopology::with_config(replica_config);
Self { primary, replica }
}
}

View File

@@ -1,5 +1,4 @@
use async_trait::async_trait;
use brocade::PortOperatingMode;
use harmony_macros::ip;
use harmony_types::{
id::Id,
@@ -9,9 +8,9 @@ use harmony_types::{
use log::debug;
use log::info;
use crate::infra::network_manager::OpenShiftNmStateNetworkManager;
use crate::topology::PxeOptions;
use crate::{data::FileContent, executors::ExecutorError};
use crate::{infra::network_manager::OpenShiftNmStateNetworkManager, topology::PortConfig};
use crate::{modules::inventory::HarmonyDiscoveryStrategy, topology::PxeOptions};
use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
@@ -299,13 +298,6 @@ impl Switch for HAClusterTopology {
Ok(())
}
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError> {
todo!()
}
async fn configure_interface(&self, ports: &Vec<PortConfig>) -> Result<(), SwitchError> {
todo!()
}
}
#[async_trait]
@@ -529,10 +521,4 @@ impl SwitchClient for DummyInfra {
) -> Result<u8, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError> {
todo!()
}
async fn configure_interface(&self, ports: &Vec<PortConfig>) -> Result<(), SwitchError> {
todo!()
}
}

View File

@@ -451,7 +451,20 @@ impl K8sClient {
{
let mut result = Vec::new();
for r in resource.iter() {
result.push(self.apply(r, ns).await?);
let apply_result = self.apply(r, ns).await;
if apply_result.is_err() {
// NOTE: Be careful with this one; it may leak sensitive information in logs.
// Reducing it to debug might be enough, since we already treat debug logs as unsafe,
// but keeping it at warn makes it much easier to understand what is going on. So be it for now.
warn!(
"Failed to apply k8s resource : {}",
serde_json::to_string_pretty(r).map_err(|e| Error::SerdeError(e))?
);
}
result.push(apply_result?);
}
Ok(result)
@@ -618,6 +631,23 @@ impl K8sClient {
}
pub async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await
}
pub async fn from_kubeconfig_with_context(
path: &str,
context: Option<String>,
) -> Option<K8sClient> {
let mut opts = KubeConfigOptions::default();
opts.context = context;
Self::from_kubeconfig_with_opts(path, &opts).await
}
pub async fn from_kubeconfig_with_opts(
path: &str,
opts: &KubeConfigOptions,
) -> Option<K8sClient> {
let k = match Kubeconfig::read_from(path) {
Ok(k) => k,
Err(e) => {
@@ -625,13 +655,9 @@ impl K8sClient {
return None;
}
};
Some(K8sClient::new(
Client::try_from(
Config::from_custom_kubeconfig(k, &KubeConfigOptions::default())
.await
.unwrap(),
)
.unwrap(),
Client::try_from(Config::from_custom_kubeconfig(k, &opts).await.unwrap()).unwrap(),
))
}
}

View File

@@ -2,6 +2,7 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose};
use harmony_types::rfc1123::Rfc1123Name;
use k8s_openapi::api::{
core::v1::Secret,
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
@@ -34,16 +35,17 @@ use crate::{
service_monitor::ServiceMonitor,
},
},
okd::route::OKDTlsPassthroughScore,
prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
},
},
score::Score,
topology::ingress::Ingress,
topology::{TlsRoute, TlsRouter, ingress::Ingress},
};
use super::{
use super::super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,
@@ -102,6 +104,41 @@ impl K8sclient for K8sAnywhereTopology {
}
}
#[async_trait]
impl TlsRouter for K8sAnywhereTopology {
async fn get_wildcard_domain(&self) -> Result<Option<String>, String> {
todo!()
}
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16 {
// TODO un-hardcode this :)
443
}
async fn install_route(&self, route: TlsRoute) -> Result<(), String> {
let distro = self
.get_k8s_distribution()
.await
.map_err(|e| format!("Could not get k8s distribution {e}"))?;
match distro {
KubernetesDistribution::OpenshiftFamily => {
OKDTlsPassthroughScore {
name: Rfc1123Name::try_from(route.backend_info_string().as_str())?,
route,
}
.interpret(&Inventory::empty(), self)
.await?;
Ok(())
}
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => Err(format!(
"Distribution not supported yet for Tlsrouter {distro:?}"
)),
}
}
}
#[async_trait]
impl Grafana for K8sAnywhereTopology {
async fn ensure_grafana_operator(
@@ -343,6 +380,7 @@ impl K8sAnywhereTopology {
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
debug!("Trying to detect k8s distribution");
let client = self.k8s_client().await.unwrap();
let discovery = client.discovery().await.map_err(|e| {
@@ -358,14 +396,17 @@ impl K8sAnywhereTopology {
.groups()
.any(|g| g.name() == "project.openshift.io")
{
info!("Found KubernetesDistribution OpenshiftFamily");
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
info!("Found KubernetesDistribution K3sFamily");
return Ok(KubernetesDistribution::K3sFamily);
}
info!("Could not identify KubernetesDistribution, using Default");
return Ok(KubernetesDistribution::Default);
})
.await
@@ -613,7 +654,7 @@ impl K8sAnywhereTopology {
}
async fn try_load_kubeconfig(&self, path: &str) -> Option<K8sClient> {
K8sClient::from_kubeconfig(path).await
K8sClient::from_kubeconfig_with_context(path, self.config.k8s_context.clone()).await
}
fn get_k3d_installation_score(&self) -> K3DInstallationScore {
@@ -651,7 +692,14 @@ impl K8sAnywhereTopology {
return Ok(Some(K8sState {
client: Arc::new(client),
source: K8sSource::Kubeconfig,
message: format!("Loaded k8s client from kubeconfig {kubeconfig}"),
message: format!(
"Loaded k8s client from kubeconfig {kubeconfig} using context {}",
self.config
.k8s_context
.as_ref()
.map(|s| s.clone())
.unwrap_or_default()
),
}));
}
None => {
@@ -891,9 +939,71 @@ pub struct K8sAnywhereConfig {
/// default: true
pub use_local_k3d: bool,
pub harmony_profile: String,
/// Name of the kubeconfig context to use.
///
/// If None, it will use the current context.
///
/// If the context name is not found, it will fail to initialize.
pub k8s_context: Option<String>,
}
impl K8sAnywhereConfig {
/// Reads an environment variable `env_var` and parses its content :
/// Comma-separated `key=value` pairs, e.g.,
/// `kubeconfig=/path/to/primary.kubeconfig,context=primary-ctx`
///
/// Then creates a K8sAnywhereConfig from it with local installs disabled (`use_local_k3d=false`,
/// `autoinstall=false`, `use_system_kubeconfig=false`).
/// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`.
///
/// If no kubeconfig path is provided it will fall back to system kubeconfig
///
/// Panics if `env_var` is missing or malformed.
pub fn remote_k8s_from_env_var(env_var: &str) -> Self {
Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE")
}
pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self {
debug!("Looking for env var named : {env_var}");
let env_var_value = std::env::var(env_var)
.map_err(|e| format!("Missing required env var {env_var} : {e}"))
.unwrap();
info!("Initializing remote k8s from env var value : {env_var_value}");
let mut kubeconfig: Option<String> = None;
let mut k8s_context: Option<String> = None;
for part in env_var_value.split(',') {
let kv: Vec<&str> = part.splitn(2, '=').collect();
if kv.len() == 2 {
match kv[0].trim() {
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
"context" => k8s_context = Some(kv[1].trim().to_string()),
_ => {}
}
}
}
debug!("Found in {env_var} : kubeconfig {kubeconfig:?} and context {k8s_context:?}");
let use_system_kubeconfig = kubeconfig.is_none();
if let Some(kubeconfig_value) = std::env::var("KUBECONFIG").ok().map(|v| v.to_string()) {
kubeconfig.get_or_insert(kubeconfig_value);
}
info!("Loading k8s environment with kubeconfig {kubeconfig:?} and context {k8s_context:?}");
K8sAnywhereConfig {
kubeconfig,
k8s_context,
use_system_kubeconfig,
autoinstall: false,
use_local_k3d: false,
harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()),
}
}
fn from_env() -> Self {
Self {
kubeconfig: std::env::var("KUBECONFIG").ok().map(|v| v.to_string()),
@@ -908,6 +1018,7 @@ impl K8sAnywhereConfig {
),
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
}
}
}
@@ -1014,3 +1125,181 @@ impl Ingress for K8sAnywhereTopology {
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Sets environment variables with unique names to avoid concurrency issues between tests.
/// Returns the names of the (config_var, profile_var) used.
fn setup_env_vars(config_value: Option<&str>, profile_value: Option<&str>) -> (String, String) {
let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let config_var = format!("TEST_VAR_{}", id);
let profile_var = format!("TEST_PROFILE_{}", id);
unsafe {
if let Some(v) = config_value {
std::env::set_var(&config_var, v);
} else {
std::env::remove_var(&config_var);
}
if let Some(v) = profile_value {
std::env::set_var(&profile_var, v);
} else {
std::env::remove_var(&profile_var);
}
}
(config_var, profile_var)
}
/// Runs a test in a separate thread to avoid polluting the process environment.
fn run_in_isolated_env<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let handle = std::thread::spawn(f);
handle.join().expect("Test thread panicked");
}
#[test]
fn test_remote_k8s_from_env_var_full() {
let (config_var, profile_var) =
setup_env_vars(Some("kubeconfig=/foo.kc,context=bar"), Some("testprof"));
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
assert_eq!(cfg.harmony_profile, "testprof");
assert!(!cfg.use_local_k3d);
assert!(!cfg.autoinstall);
assert!(!cfg.use_system_kubeconfig);
}
#[test]
fn test_remote_k8s_from_env_var_only_kubeconfig() {
let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo.kc"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context, None);
assert_eq!(cfg.harmony_profile, "dev");
}
#[test]
fn test_remote_k8s_from_env_var_only_context() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(Some("context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_unknown_key_trim() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(
Some(" unknown=bla , kubeconfig= /foo.kc ,context= bar "),
None,
);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_empty_malformed() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(Some("malformed,no=,equal"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
// Unknown/malformed ignored, defaults to None
assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context, None);
});
}
#[test]
fn test_remote_k8s_from_env_var_kubeconfig_fallback() {
run_in_isolated_env(|| {
unsafe {
std::env::set_var("KUBECONFIG", "/fallback/path");
}
let (config_var, profile_var) = setup_env_vars(Some("context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/fallback/path"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_kubeconfig_no_fallback_if_provided() {
run_in_isolated_env(|| {
unsafe {
std::env::set_var("KUBECONFIG", "/fallback/path");
}
let (config_var, profile_var) =
setup_env_vars(Some("kubeconfig=/primary/path,context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
// Primary path should take precedence
assert_eq!(cfg.kubeconfig.as_deref(), Some("/primary/path"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
#[should_panic(expected = "Missing required env var")]
fn test_remote_k8s_from_env_var_missing() {
let (config_var, profile_var) = setup_env_vars(None, None);
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
}
#[test]
fn test_remote_k8s_from_env_var_context_key() {
let (config_var, profile_var) = setup_env_vars(
Some("context=default/api-sto1-harmony-mcd:6443/kube:admin"),
None,
);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(
cfg.k8s_context.as_deref(),
Some("default/api-sto1-harmony-mcd:6443/kube:admin")
);
}
}

View File

@@ -0,0 +1,3 @@
mod k8s_anywhere;
mod postgres;
pub use k8s_anywhere::*;

View File

@@ -0,0 +1,125 @@
use async_trait::async_trait;
use crate::{
interpret::Outcome,
inventory::Inventory,
modules::postgresql::{
K8sPostgreSQLScore,
capability::{PostgreSQL, PostgreSQLConfig, PostgreSQLEndpoint, ReplicationCerts},
},
score::Score,
topology::{K8sAnywhereTopology, K8sclient},
};
use k8s_openapi::api::core::v1::{Secret, Service};
use log::info;
#[async_trait]
impl PostgreSQL for K8sAnywhereTopology {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
K8sPostgreSQLScore {
config: config.clone(),
}
.interpret(&Inventory::empty(), self)
.await
.map_err(|e| format!("Failed to deploy k8s postgresql : {e}"))?;
Ok(config.cluster_name.clone())
}
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, config: &PostgreSQLConfig) -> Result<ReplicationCerts, String> {
let cluster_name = &config.cluster_name;
let namespace = &config.namespace;
let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?;
let replication_secret_name = format!("{cluster_name}-replication");
let replication_secret = k8s_client
.get_resource::<Secret>(&replication_secret_name, Some(namespace))
.await
.map_err(|e| format!("Failed to get {replication_secret_name}: {e}"))?
.ok_or_else(|| format!("Replication secret '{replication_secret_name}' not found"))?;
let ca_secret_name = format!("{cluster_name}-ca");
let ca_secret = k8s_client
.get_resource::<Secret>(&ca_secret_name, Some(namespace))
.await
.map_err(|e| format!("Failed to get {ca_secret_name}: {e}"))?
.ok_or_else(|| format!("CA secret '{ca_secret_name}' not found"))?;
let replication_data = replication_secret
.data
.as_ref()
.ok_or("Replication secret has no data".to_string())?;
let ca_data = ca_secret
.data
.as_ref()
.ok_or("CA secret has no data".to_string())?;
let tls_key_bs = replication_data
.get("tls.key")
.ok_or("missing tls.key in replication secret".to_string())?;
let tls_crt_bs = replication_data
.get("tls.crt")
.ok_or("missing tls.crt in replication secret".to_string())?;
let ca_crt_bs = ca_data
.get("ca.crt")
.ok_or("missing ca.crt in CA secret".to_string())?;
let streaming_replica_key_pem = String::from_utf8_lossy(&tls_key_bs.0).to_string();
let streaming_replica_cert_pem = String::from_utf8_lossy(&tls_crt_bs.0).to_string();
let ca_cert_pem = String::from_utf8_lossy(&ca_crt_bs.0).to_string();
info!("Successfully extracted replication certs for cluster '{cluster_name}'");
Ok(ReplicationCerts {
ca_cert_pem,
streaming_replica_cert_pem,
streaming_replica_key_pem,
})
}
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, config: &PostgreSQLConfig) -> Result<PostgreSQLEndpoint, String> {
let cluster_name = &config.cluster_name;
let namespace = &config.namespace;
let k8s_client = self.k8s_client().await.map_err(|e| e.to_string())?;
let service_name = format!("{cluster_name}-rw");
let service = k8s_client
.get_resource::<Service>(&service_name, Some(namespace))
.await
.map_err(|e| format!("Failed to get service '{service_name}': {e}"))?
.ok_or_else(|| {
format!("Service '{service_name}' not found for cluster '{cluster_name}")
})?;
let ns = service
.metadata
.namespace
.as_deref()
.unwrap_or("default")
.to_string();
let host = format!("{service_name}.{ns}.svc.cluster.local");
info!("Internal endpoint for '{cluster_name}': {host}:5432");
Ok(PostgreSQLEndpoint { host, port: 5432 })
}
// /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
// /// Returns None if no public endpoint (internal-only cluster).
// /// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
// /// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
// async fn get_public_endpoint(
// &self,
// cluster_name: &str,
// ) -> Result<Option<PostgreSQLEndpoint>, String> {
// // TODO: Implement OpenShift Route lookup targeting '{cluster_name}-rw' service on port 5432 with TLS passthrough
// // For now, return None assuming internal-only access or manual route configuration
// info!("Public endpoint lookup not implemented for '{cluster_name}', returning None");
// Ok(None)
// }
}

View File

@@ -1,5 +1,7 @@
mod failover;
mod ha_cluster;
pub mod ingress;
pub use failover::*;
use harmony_types::net::IpAddress;
mod host_binding;
mod http;
@@ -13,7 +15,7 @@ pub use k8s_anywhere::*;
pub use localhost::*;
pub mod k8s;
mod load_balancer;
mod router;
pub mod router;
mod tftp;
use async_trait::async_trait;
pub use ha_cluster::*;

View File

@@ -7,7 +7,6 @@ use std::{
};
use async_trait::async_trait;
use brocade::PortOperatingMode;
use derive_new::new;
use harmony_types::{
id::Id,
@@ -215,8 +214,6 @@ impl From<String> for NetworkError {
}
}
pub type PortConfig = (PortLocation, PortOperatingMode);
#[async_trait]
pub trait Switch: Send + Sync {
async fn setup_switch(&self) -> Result<(), SwitchError>;
@@ -227,8 +224,6 @@ pub trait Switch: Send + Sync {
) -> Result<Option<PortLocation>, SwitchError>;
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError>;
async fn configure_interface(&self, ports: &Vec<PortConfig>) -> Result<(), SwitchError>;
}
#[derive(Clone, Debug, PartialEq)]
@@ -288,9 +283,6 @@ pub trait SwitchClient: Debug + Send + Sync {
channel_name: &str,
switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError>;
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError>;
async fn configure_interface(&self, ports: &Vec<PortConfig>) -> Result<(), SwitchError>;
}
#[cfg(test)]

View File

@@ -1,11 +1,20 @@
use async_trait::async_trait;
use cidr::Ipv4Cidr;
use derive_new::new;
use serde::Serialize;
use super::{IpAddress, LogicalHost};
/// Basic network router abstraction (L3 IP routing/gateway).
/// Distinguished from TlsRouter (L4 TLS passthrough).
pub trait Router: Send + Sync {
/// Gateway IP address for this subnet/router.
fn get_gateway(&self) -> IpAddress;
/// CIDR block managed by this router.
fn get_cidr(&self) -> Ipv4Cidr;
/// Logical host associated with this router.
fn get_host(&self) -> LogicalHost;
}
@@ -38,3 +47,81 @@ impl Router for UnmanagedRouter {
todo!()
}
}
/// Desired state config for a TLS passthrough route.
/// Forwards external TLS (port 443) → backend service:target_port (no termination at router).
/// Inspired by CNPG multisite: exposes `-rw`/`-ro` services publicly via OKD Route/HAProxy/K8s
/// Gateway etc.
///
/// # Example
/// ```
/// use harmony::topology::router::TlsRoute;
/// let postgres_rw = TlsRoute {
/// hostname: "postgres-cluster-example.public.domain.io".to_string(),
/// backend: "postgres-cluster-example-rw".to_string(), // k8s Service or HAProxy upstream
/// target_port: 5432,
/// namespace: "default".to_string(), // maps directly to the k8s namespace
/// };
/// ```
#[derive(Clone, Debug, Serialize)]
pub struct TlsRoute {
/// Public hostname clients connect to (TLS SNI, port 443 implicit).
/// Router matches this for passthrough forwarding.
pub hostname: String,
/// Backend/host identifier (k8s Service, HAProxy upstream, IP/FQDN, etc.).
pub backend: String,
/// Backend TCP port (Postgres: 5432).
pub target_port: u16,
/// The environment in which it lives.
/// TODO clarify how we handle this in higher level abstractions. The namespace name is a
/// direct mapping to k8s but that could be misleading for other implementations.
pub namespace: String,
}
impl TlsRoute {
pub fn to_string_short(&self) -> String {
format!("{}-{}:{}", self.hostname, self.backend, self.target_port)
}
pub fn backend_info_string(&self) -> String {
format!("{}:{}", self.backend, self.target_port)
}
}
/// Installs and queries TLS passthrough routes (L4 TCP/SNI forwarding, no TLS termination).
/// Agnostic to impl: OKD Route, AWS NLB+HAProxy, k3s Envoy Gateway, Apache ProxyPass.
/// Used by PostgreSQL capability to expose CNPG clusters multisite (site1 → site2 replication).
///
/// # Usage
/// ```ignore
/// use harmony::topology::router::TlsRoute;
/// // After CNPG deploy, expose RW endpoint
/// async fn route() {
/// let topology = okd_topology();
/// let route = TlsRoute { /* ... */ };
/// topology.install_route(route).await; // OKD Route, HAProxy reload, etc.
/// }
/// ```
#[async_trait]
pub trait TlsRouter: Send + Sync {
/// Provisions the route (idempotent where possible).
/// Example: OKD Route{ host, to: backend:target_port, tls: {passthrough} };
/// HAProxy frontend→backend \"postgres-upstream\".
async fn install_route(&self, config: TlsRoute) -> Result<(), String>;
/// Gets the base domain that can be used to deploy applications that will be automatically
/// routed to this cluster.
///
/// For example, if we have *.apps.nationtech.io pointing to a public load balancer, then this
/// function would return
///
/// ```text
/// Some(String::from("apps.nationtech.io"))
/// ```
async fn get_wildcard_domain(&self) -> Result<Option<String>, String>;
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16;
}
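// Minimal sketch, assuming only the trait above: an in-memory TlsRouter that records
// installed routes instead of provisioning anything, useful for tests.
// `RecordingTlsRouter` and its domain/port values are hypothetical.
#[cfg(test)]
pub struct RecordingTlsRouter {
    pub installed: std::sync::Mutex<Vec<TlsRoute>>,
}
#[cfg(test)]
#[async_trait]
impl TlsRouter for RecordingTlsRouter {
    async fn install_route(&self, config: TlsRoute) -> Result<(), String> {
        // Record the route; a real implementation would create an OKD Route,
        // reload HAProxy, etc.
        self.installed.lock().unwrap().push(config);
        Ok(())
    }
    async fn get_wildcard_domain(&self) -> Result<Option<String>, String> {
        Ok(Some("apps.example.io".to_string()))
    }
    async fn get_router_port(&self) -> u16 {
        443
    }
}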

View File

@@ -14,7 +14,7 @@ use k8s_openapi::{
},
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::{Resource, api::DynamicObject};
use kube::Resource;
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::json;

View File

@@ -1,13 +1,12 @@
use async_trait::async_trait;
use brocade::{BrocadeClient, BrocadeOptions, InterSwitchLink, InterfaceStatus, PortOperatingMode};
use harmony_types::{
id::Id,
net::{IpAddress, MacAddress},
switch::{PortDeclaration, PortLocation},
};
use option_ext::OptionExt;
use crate::topology::{PortConfig, SwitchClient, SwitchError};
use crate::topology::{SwitchClient, SwitchError};
#[derive(Debug)]
pub struct BrocadeSwitchClient {
@@ -19,9 +18,9 @@ impl BrocadeSwitchClient {
ip_addresses: &[IpAddress],
username: &str,
password: &str,
options: BrocadeOptions,
options: Option<BrocadeOptions>,
) -> Result<Self, brocade::Error> {
let brocade = brocade::init(ip_addresses, username, password, options).await?;
let brocade = brocade::init(ip_addresses, 22, username, password, options).await?;
Ok(Self { brocade })
}
}
@@ -60,7 +59,7 @@ impl SwitchClient for BrocadeSwitchClient {
}
self.brocade
.configure_interfaces(&interfaces)
.configure_interfaces(interfaces)
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
@@ -112,27 +111,6 @@ impl SwitchClient for BrocadeSwitchClient {
Ok(channel_id)
}
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError> {
for i in ids {
self.brocade
.clear_port_channel(&i.to_string())
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
}
Ok(())
}
async fn configure_interface(&self, ports: &Vec<PortConfig>) -> Result<(), SwitchError> {
// FIXME hardcoded TenGigabitEthernet = bad
let ports = ports
.iter()
.map(|p| (format!("TenGigabitEthernet {}", p.0), p.1.clone()))
.collect();
self.brocade
.configure_interfaces(&ports)
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
Ok(())
}
}
#[cfg(test)]
@@ -167,7 +145,6 @@ mod tests {
client.setup().await.unwrap();
//TODO not sure about this
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).contains_exactly(vec![
(first_interface.name.clone(), PortOperatingMode::Access),
@@ -278,10 +255,10 @@ mod tests {
async fn configure_interfaces(
&self,
interfaces: &Vec<(String, PortOperatingMode)>,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
let mut configured_interfaces = self.configured_interfaces.lock().unwrap();
*configured_interfaces = interfaces.clone();
*configured_interfaces = interfaces;
Ok(())
}

View File

@@ -121,7 +121,7 @@ mod test {
#[test]
fn deployment_to_dynamic_roundtrip() {
// Create a sample Deployment with nested structures
let deployment = Deployment {
let mut deployment = Deployment {
metadata: ObjectMeta {
name: Some("my-deployment".to_string()),
labels: Some({

View File

@@ -8,6 +8,7 @@ mod tftp;
use std::sync::Arc;
pub use management::*;
use opnsense_config_xml::Host;
use tokio::sync::RwLock;
use crate::{executors::ExecutorError, topology::LogicalHost};

View File

@@ -1,11 +1,9 @@
use async_trait::async_trait;
use harmony_macros::hurl;
use kube::{Api, api::GroupVersionKind};
use log::{debug, warn};
use kube::api::GroupVersionKind;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::{process::Command, str::FromStr, sync::Arc};
use std::{str::FromStr, sync::Arc};
use crate::{
data::Version,
@@ -13,10 +11,7 @@ use crate::{
inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository},
score::Score,
topology::{
HelmCommand, K8sclient, PreparationError, PreparationOutcome, Topology, ingress::Ingress,
k8s::K8sClient,
},
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress, k8s::K8sClient},
};
use harmony_types::id::Id;

View File

@@ -19,11 +19,8 @@ pub struct DhcpScore {
pub host_binding: Vec<HostBinding>,
pub next_server: Option<IpAddress>,
pub boot_filename: Option<String>,
/// Boot filename to be provided to PXE clients identifying as BIOS
pub filename: Option<String>,
/// Boot filename to be provided to PXE clients identifying as uefi but NOT iPXE
pub filename64: Option<String>,
/// Boot filename to be provided to PXE clients identifying as iPXE
pub filenameipxe: Option<String>,
pub dhcp_range: (IpAddress, IpAddress),
pub domain: Option<String>,

View File

@@ -5,10 +5,11 @@ use serde::{Deserialize, Serialize};
use crate::{
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::inventory::{HarmonyDiscoveryStrategy, LaunchDiscoverInventoryAgentScore},
modules::inventory::LaunchDiscoverInventoryAgentScore,
score::Score,
topology::Topology,
};
@@ -16,13 +17,11 @@ use crate::{
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DiscoverHostForRoleScore {
pub role: HostRole,
pub number_desired_hosts: i16,
pub discovery_strategy: HarmonyDiscoveryStrategy,
}
impl<T: Topology> Score<T> for DiscoverHostForRoleScore {
fn name(&self) -> String {
format!("DiscoverHostForRoleScore({:?})", self.role)
"DiscoverInventoryAgentScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
@@ -49,15 +48,13 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
);
LaunchDiscoverInventoryAgentScore {
discovery_timeout: None,
discovery_strategy: self.score.discovery_strategy.clone(),
}
.interpret(inventory, topology)
.await?;
let mut chosen_hosts = vec![];
let host: PhysicalHost;
let host_repo = InventoryRepositoryFactory::build().await?;
let mut assigned_hosts = 0;
loop {
let all_hosts = host_repo.get_all_hosts().await?;
@@ -78,24 +75,15 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
match ans {
Ok(choice) => {
info!(
"Assigned role {:?} for node {}",
self.score.role,
choice.summary()
"Selected {} as the {:?} node.",
choice.summary(),
self.score.role
);
host_repo
.save_role_mapping(&self.score.role, &choice)
.await?;
chosen_hosts.push(choice);
assigned_hosts += 1;
info!(
"Found {assigned_hosts} hosts for role {:?}",
self.score.role
);
if assigned_hosts == self.score.number_desired_hosts {
break;
}
host = choice;
break;
}
Err(inquire::InquireError::OperationCanceled) => {
info!("Refresh requested. Fetching list of discovered hosts again...");
@@ -112,13 +100,8 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
}
Ok(Outcome::success(format!(
"Successfully discovered {} hosts {} for role {:?}",
self.score.number_desired_hosts,
chosen_hosts
.iter()
.map(|h| h.summary())
.collect::<Vec<String>>()
.join(", "),
"Successfully discovered host {} for role {:?}",
host.summary(),
self.score.role
)))
}

View File

@@ -1,10 +1,6 @@
mod discovery;
pub mod inspect;
use std::net::Ipv4Addr;
use cidr::{Ipv4Cidr, Ipv4Inet};
pub use discovery::*;
use tokio::time::{Duration, timeout};
use async_trait::async_trait;
use harmony_inventory_agent::local_presence::DiscoveryEvent;
@@ -28,7 +24,6 @@ use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LaunchDiscoverInventoryAgentScore {
pub discovery_timeout: Option<u64>,
pub discovery_strategy: HarmonyDiscoveryStrategy,
}
impl<T: Topology> Score<T> for LaunchDiscoverInventoryAgentScore {
@@ -48,12 +43,6 @@ struct DiscoverInventoryAgentInterpret {
score: LaunchDiscoverInventoryAgentScore,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum HarmonyDiscoveryStrategy {
MDNS,
SUBNET { cidr: cidr::Ipv4Cidr, port: u16 },
}
#[async_trait]
impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
async fn execute(
@@ -68,37 +57,6 @@ impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
),
};
match self.score.discovery_strategy {
HarmonyDiscoveryStrategy::MDNS => self.launch_mdns_discovery().await,
HarmonyDiscoveryStrategy::SUBNET { cidr, port } => {
self.launch_cidr_discovery(&cidr, port).await
}
};
Ok(Outcome::success(
"Discovery process completed successfully".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::DiscoverInventoryAgent
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl DiscoverInventoryAgentInterpret {
async fn launch_mdns_discovery(&self) {
harmony_inventory_agent::local_presence::discover_agents(
self.score.discovery_timeout,
|event: DiscoveryEvent| -> Result<(), String> {
@@ -154,8 +112,6 @@ impl DiscoverInventoryAgentInterpret {
cpus,
};
// FIXME only save the host when it is new or something changed in it.
// we currently save the host every time it is discovered.
let repo = InventoryRepositoryFactory::build()
.await
.map_err(|e| format!("Could not build repository : {e}"))
@@ -176,111 +132,25 @@ impl DiscoverInventoryAgentInterpret {
Ok(())
},
)
.await
.await;
Ok(Outcome::success(
"Discovery process completed successfully".to_string(),
))
}
// async fn launch_cidr_discovery(&self, cidr : &Ipv4Cidr, port: u16) {
// todo!("launnch cidr discovery for {cidr} : {port}
// - Iterate over all possible addresses in cidr
// - make calls in batches of 20 attempting to reach harmony inventory agent on <addr, port> using same as above harmony_inventory_agent::client::get_host_inventory(&address, port)
// - Log warn when response is 404, it means the port was used by something else unexpected
// - Log error when response is 5xx
// - Log debug when no response (timeout 15 seconds)
// - Log info when found and response is 2xx
// ");
// }
async fn launch_cidr_discovery(&self, cidr: &Ipv4Cidr, port: u16) {
let addrs: Vec<Ipv4Inet> = cidr.iter().collect();
let total = addrs.len();
info!(
"Starting CIDR discovery for {} hosts on {}/{} (port {})",
total,
cidr.network_length(),
cidr,
port
);
fn get_name(&self) -> InterpretName {
InterpretName::DiscoverInventoryAgent
}
let batch_size: usize = 20;
let timeout_secs = 5;
let request_timeout = Duration::from_secs(timeout_secs);
fn get_version(&self) -> Version {
todo!()
}
let mut current_batch = 0;
let num_batches = addrs.len() / batch_size;
fn get_status(&self) -> InterpretStatus {
todo!()
}
for batch in addrs.chunks(batch_size) {
current_batch += 1;
info!("Starting query batch {current_batch} of {num_batches}, timeout {timeout_secs}");
let mut tasks = Vec::with_capacity(batch.len());
for addr in batch {
let addr = addr.address().to_string();
let port = port;
let task = tokio::spawn(async move {
match timeout(
request_timeout,
harmony_inventory_agent::client::get_host_inventory(&addr, port),
)
.await
{
Ok(Ok(host)) => {
info!("Found and response is 2xx for {addr}:{port}");
// Reuse the same conversion to PhysicalHost as MDNS flow
let harmony_inventory_agent::hwinfo::PhysicalHost {
storage_drives,
storage_controller,
memory_modules,
cpus,
chipset,
network_interfaces,
management_interface,
host_uuid,
} = host;
let host = PhysicalHost {
id: Id::from(host_uuid),
category: HostCategory::Server,
network: network_interfaces,
storage: storage_drives,
labels: vec![Label {
name: "discovered-by".to_string(),
value: "harmony-inventory-agent".to_string(),
}],
memory_modules,
cpus,
};
// Save host to inventory
let repo = InventoryRepositoryFactory::build()
.await
.map_err(|e| format!("Could not build repository : {e}"))
.unwrap();
if let Err(e) = repo.save(&host).await {
log::debug!("Failed to save host {}: {e}", host.id);
} else {
info!("Saved host id {}, summary : {}", host.id, host.summary());
}
}
Ok(Err(e)) => {
log::info!("Error querying inventory agent on {addr}:{port} : {e}");
}
Err(_) => {
// Timeout for this host
log::debug!("No response (timeout) for {addr}:{port}");
}
}
});
tasks.push(task);
}
// Wait for this batch to complete
for t in tasks {
let _ = t.await;
}
}
info!("CIDR discovery completed");
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -0,0 +1,157 @@
use std::collections::BTreeMap;
use k8s_openapi::{
api::core::v1::{Affinity, Toleration},
apimachinery::pkg::apis::meta::v1::ObjectMeta,
};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "CatalogSource",
plural = "catalogsources",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CatalogSourceSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config_map: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub display_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub grpc_pod_config: Option<GrpcPodConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub icon: Option<Icon>,
#[serde(skip_serializing_if = "Option::is_none")]
pub image: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<i64>,
#[serde(skip_serializing_if = "Option::is_none")]
pub publisher: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub run_as_root: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub secrets: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub source_type: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub update_strategy: Option<UpdateStrategy>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GrpcPodConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub affinity: Option<Affinity>,
#[serde(skip_serializing_if = "Option::is_none")]
pub extract_content: Option<ExtractContent>,
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_target: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority_class_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub security_context_config: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tolerations: Option<Vec<Toleration>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ExtractContent {
pub cache_dir: String,
pub catalog_dir: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Icon {
pub base64data: String,
pub mediatype: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct UpdateStrategy {
#[serde(skip_serializing_if = "Option::is_none")]
pub registry_poll: Option<RegistryPoll>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RegistryPoll {
#[serde(skip_serializing_if = "Option::is_none")]
pub interval: Option<String>,
}
impl Default for CatalogSource {
    fn default() -> Self {
        Self {
            metadata: ObjectMeta::default(),
            spec: CatalogSourceSpec::default(),
        }
    }
}
impl Default for CatalogSourceSpec {
fn default() -> Self {
Self {
address: None,
config_map: None,
description: None,
display_name: None,
grpc_pod_config: None,
icon: None,
image: None,
priority: None,
publisher: None,
run_as_root: None,
secrets: None,
source_type: None,
update_strategy: None,
}
}
}

View File

@@ -0,0 +1,4 @@
mod catalogsources_operators_coreos_com;
pub use catalogsources_operators_coreos_com::*;
mod subscriptions_operators_coreos_com;
pub use subscriptions_operators_coreos_com::*;

View File

@@ -0,0 +1,68 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::CustomResource;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "Subscription",
plural = "subscriptions",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<SubscriptionConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub install_plan_approval: Option<String>,
pub name: String,
pub source: String,
pub source_namespace: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub starting_csv: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub env: Option<Vec<k8s_openapi::api::core::v1::EnvVar>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<std::collections::BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tolerations: Option<Vec<k8s_openapi::api::core::v1::Toleration>>,
}
impl Default for Subscription {
fn default() -> Self {
Subscription {
metadata: ObjectMeta::default(),
spec: SubscriptionSpec::default(),
}
}
}
impl Default for SubscriptionSpec {
fn default() -> SubscriptionSpec {
SubscriptionSpec {
name: String::new(),
source: String::new(),
source_namespace: String::new(),
channel: None,
config: None,
install_plan_approval: None,
starting_csv: None,
}
}
}
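// Hedged example: a Subscription for the cnpg operator sourced from the OperatorHub
// catalog. Channel, namespaces, and approval mode here are illustrative assumptions.
#[allow(dead_code)]
fn example_cnpg_subscription() -> Subscription {
    Subscription {
        metadata: ObjectMeta {
            name: Some("cloudnative-pg".to_string()),
            namespace: Some("operators".to_string()),
            ..ObjectMeta::default()
        },
        spec: SubscriptionSpec {
            name: "cloudnative-pg".to_string(),
            source: "operatorhubio-catalog".to_string(),
            source_namespace: "openshift-marketplace".to_string(),
            channel: Some("stable-v1".to_string()),
            install_plan_approval: Some("Automatic".to_string()),
            ..SubscriptionSpec::default()
        },
    }
}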

View File

@@ -0,0 +1,3 @@
mod operatorhub;
pub use operatorhub::*;
pub mod crd;

View File

@@ -0,0 +1,107 @@
// Write operatorhub catalog score.
// For now this only supports OKD with the default catalog and OperatorHub setup, and does not
// verify OLM state or anything else. Very opinionated and bare-bones to start.
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::apps::crd::{
CatalogSource, CatalogSourceSpec, RegistryPoll, UpdateStrategy,
};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Installs the CatalogSource in a cluster which already has the required services and CRDs installed.
///
/// ```rust
/// use harmony::modules::k8s::apps::OperatorHubCatalogSourceScore;
///
/// let score = OperatorHubCatalogSourceScore::default();
/// ```
///
/// Required services:
/// - catalog-operator
/// - olm-operator
///
/// They are installed by default with OKD/Openshift
///
/// **Warning**: this initial implementation does not manage the dependencies. They must already
/// exist in the cluster.
#[derive(Debug, Clone, Serialize)]
pub struct OperatorHubCatalogSourceScore {
pub name: String,
pub namespace: String,
pub image: String,
}
impl OperatorHubCatalogSourceScore {
pub fn new(name: &str, namespace: &str, image: &str) -> Self {
Self {
name: name.to_string(),
namespace: namespace.to_string(),
image: image.to_string(),
}
}
}
impl Default for OperatorHubCatalogSourceScore {
/// This default implementation will create this k8s resource:
///
/// ```yaml
/// apiVersion: operators.coreos.com/v1alpha1
/// kind: CatalogSource
/// metadata:
/// name: operatorhubio-catalog
/// namespace: openshift-marketplace
/// spec:
/// sourceType: grpc
/// image: quay.io/operatorhubio/catalog:latest
/// displayName: Operatorhub Operators
/// publisher: OperatorHub.io
/// updateStrategy:
/// registryPoll:
/// interval: 60m
/// ```
fn default() -> Self {
OperatorHubCatalogSourceScore {
name: "operatorhubio-catalog".to_string(),
namespace: "openshift-marketplace".to_string(),
image: "quay.io/operatorhubio/catalog:latest".to_string(),
}
}
}
impl<T: Topology + K8sclient> Score<T> for OperatorHubCatalogSourceScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = CatalogSourceSpec {
source_type: Some("grpc".to_string()),
image: Some(self.image.clone()),
display_name: Some("Operatorhub Operators".to_string()),
publisher: Some("OperatorHub.io".to_string()),
update_strategy: Some(UpdateStrategy {
registry_poll: Some(RegistryPoll {
interval: Some("60m".to_string()),
}),
}),
..CatalogSourceSpec::default()
};
let catalog_source = CatalogSource {
metadata,
spec,
};
K8sResourceScore::single(catalog_source, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("OperatorHubCatalogSourceScore({})", self.name)
}
}
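// Usage sketch with illustrative values: the default score mirrors the YAML above,
// while a custom catalog only needs a name, namespace, and image.
#[allow(dead_code)]
fn example_catalog_scores() -> (OperatorHubCatalogSourceScore, OperatorHubCatalogSourceScore) {
    let default_catalog = OperatorHubCatalogSourceScore::default();
    let custom_catalog = OperatorHubCatalogSourceScore::new(
        "my-catalog",
        "openshift-marketplace",
        "quay.io/example/catalog:latest",
    );
    (default_catalog, custom_catalog)
}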

View File

@@ -0,0 +1,19 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::warn;
use crate::topology::{FailoverTopology, K8sclient, k8s::K8sClient};
#[async_trait]
impl<T: K8sclient> K8sclient for FailoverTopology<T> {
// TODO figure out how to structure this properly. This gives access only to the primary k8s
// client, which will work in many cases but is clearly not good enough for all use cases
// where k8s_client can be used. Logging a warning for now.
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
warn!(
"Failover topology k8s_client capability currently defers to the primary only. Make sure to check this is OK for you"
);
self.primary.k8s_client().await
}
}

View File

@@ -1,4 +1,6 @@
pub mod apps;
pub mod deployment;
mod failover;
pub mod ingress;
pub mod namespace;
pub mod resource;

View File

@@ -79,7 +79,33 @@ where
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!("Applying {} resources", self.score.resource.len());
// TODO improve this log
let resource_names: Vec<String> = self
    .score
    .resource
    .iter()
    .map(|r| {
        format!(
            "{}{}",
            r.meta().name.clone().unwrap_or_default(),
            r.meta()
                .namespace
                .as_ref()
                .map(|ns| format!("@{ns}"))
                .unwrap_or_default()
        )
    })
    .collect();
info!(
    "Applying {} resources: {}",
    resource_names.len(),
    resource_names.join(", ")
);
topology
.k8s_client()
.await

View File

@@ -11,8 +11,10 @@ pub mod k8s;
pub mod lamp;
pub mod load_balancer;
pub mod monitoring;
pub mod network;
pub mod okd;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;

View File

@@ -0,0 +1,18 @@
use async_trait::async_trait;
use log::warn;
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
#[async_trait]
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
async fn get_wildcard_domain(&self) -> Result<Option<String>, String> {
    todo!()
}
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16 {
    todo!()
}
async fn install_route(&self, config: TlsRoute) -> Result<(), String> {
warn!(
"Failover topology TlsRouter capability currently defers to the primary only. Make sure to check this is OK for you. The Replica Topology WILL NOT be affected here"
);
self.primary.install_route(config).await
}
}

View File

@@ -0,0 +1,3 @@
mod failover;
mod tls_router;
pub use tls_router::*;

View File

@@ -0,0 +1,91 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::data::Version;
use crate::domain::topology::router::{TlsRoute, TlsRouter};
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Score for provisioning a TLS passthrough route.
/// Exposes backend services via TLS passthrough (L4 TCP/SNI forwarding).
/// Agnostic to underlying router impl (OKD Route, HAProxy, Envoy, etc.).
///
/// TlsPassthroughScore relies on the TlsRouter capability for its entire functionality;
/// the behavior depends entirely on how the Topology implements it.
///
/// # Usage
/// ```
/// use harmony::modules::network::TlsPassthroughScore;
/// use harmony::topology::router::TlsRoute;
/// let score = TlsPassthroughScore {
/// route: TlsRoute {
/// backend: "postgres-cluster-rw".to_string(),
/// hostname: "postgres-rw.example.com".to_string(),
/// target_port: 5432,
/// namespace: "default".to_string(),
/// },
/// };
/// ```
///
/// # Hint
///
/// **Prefer this TlsPassthroughScore whenever possible.** It abstracts over the concept of
/// TLS passthrough, and it allows much more flexible usage across multiple types of Topology
/// than a lower-level module such as OKDTlsPassthroughScore.
///
/// On the other hand, some implementation-specific options might not be available or practical
/// to use through this high-level TlsPassthroughScore.
#[derive(Debug, Clone, Serialize)]
pub struct TlsPassthroughScore {
pub route: TlsRoute,
}
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Score<T> for TlsPassthroughScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(TlsPassthroughInterpret {
tls_route: self.route.clone(),
})
}
fn name(&self) -> String {
format!(
    "TlsRouterScore({} -> {}:{})",
    self.route.hostname, self.route.backend, self.route.target_port
)
}
}
/// Custom interpret: provisions the TLS passthrough route on the topology.
#[derive(Debug, Clone)]
struct TlsPassthroughInterpret {
tls_route: TlsRoute,
}
#[async_trait]
impl<T: Topology + K8sclient + TlsRouter + Send + Sync> Interpret<T> for TlsPassthroughInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("TlsRouterInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
topo.install_route(self.tls_route.clone())
.await
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(Outcome::success(format!(
    "TLS route installed: {} -> {}:{}",
    self.tls_route.hostname, self.tls_route.backend, self.tls_route.target_port
)))
}
}
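// Usage sketch (inventory/topology values assumed): any Topology providing the TlsRouter
// capability can run this score, e.g. after a CNPG deployment:
//
//     let score = TlsPassthroughScore { route: postgres_rw_route };
//     score.interpret(&inventory, &topology).await?;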

View File

@@ -4,7 +4,7 @@ use crate::{
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::inventory::{DiscoverHostForRoleScore, HarmonyDiscoveryStrategy},
modules::inventory::DiscoverHostForRoleScore,
score::Score,
topology::HAClusterTopology,
};
@@ -104,8 +104,6 @@ When you can dig them, confirm to continue.
bootstrap_host = hosts.into_iter().next().to_owned();
DiscoverHostForRoleScore {
role: HostRole::Bootstrap,
number_desired_hosts: 1,
discovery_strategy: HarmonyDiscoveryStrategy::MDNS,
}
.interpret(inventory, topology)
.await?;

View File

@@ -1,10 +1,20 @@
use crate::{
interpret::Interpret,
inventory::HostRole,
modules::{inventory::HarmonyDiscoveryStrategy, okd::bootstrap_okd_node::OKDNodeInterpret},
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore, http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore, okd::templates::BootstrapIpxeTpl,
},
score::Score,
topology::HAClusterTopology,
topology::{HAClusterTopology, HostBinding},
};
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::{debug, info};
use serde::Serialize;
// -------------------------------------------------------------------------------------------------
@@ -13,23 +23,231 @@ use serde::Serialize;
// - Persist bonding via MachineConfigs (or NNCP) once SCOS is active.
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize)]
pub struct OKDSetup03ControlPlaneScore {
pub discovery_strategy: HarmonyDiscoveryStrategy,
}
#[derive(Debug, Clone, Serialize, new)]
pub struct OKDSetup03ControlPlaneScore {}
impl Score<HAClusterTopology> for OKDSetup03ControlPlaneScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to
// the `wait-for bootstrap-complete` command.
Box::new(OKDNodeInterpret::new(
HostRole::ControlPlane,
self.discovery_strategy.clone(),
))
Box::new(OKDSetup03ControlPlaneInterpret::new())
}
fn name(&self) -> String {
"OKDSetup03ControlPlaneScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct OKDSetup03ControlPlaneInterpret {
version: Version,
status: InterpretStatus,
}
impl OKDSetup03ControlPlaneInterpret {
pub fn new() -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
status: InterpretStatus::QUEUED,
}
}
/// Ensures that three physical hosts are discovered and available for the ControlPlane role.
/// It will trigger discovery if not enough hosts are found.
async fn get_nodes(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Vec<PhysicalHost>, InterpretError> {
const REQUIRED_HOSTS: usize = 3;
let repo = InventoryRepositoryFactory::build().await?;
let mut control_plane_hosts = repo.get_host_for_role(&HostRole::ControlPlane).await?;
while control_plane_hosts.len() < REQUIRED_HOSTS {
info!(
"Discovery of {} control plane hosts in progress, current number {}",
REQUIRED_HOSTS,
control_plane_hosts.len()
);
// This score triggers the discovery agent for a specific role.
DiscoverHostForRoleScore {
role: HostRole::ControlPlane,
}
.interpret(inventory, topology)
.await?;
control_plane_hosts = repo.get_host_for_role(&HostRole::ControlPlane).await?;
}
if control_plane_hosts.len() < REQUIRED_HOSTS {
Err(InterpretError::new(format!(
"OKD Requires at least {} control plane hosts, but only found {}. Cannot proceed.",
REQUIRED_HOSTS,
control_plane_hosts.len()
)))
} else {
// Take exactly the number of required hosts to ensure consistency.
Ok(control_plane_hosts
.into_iter()
.take(REQUIRED_HOSTS)
.collect())
}
}
/// Configures DHCP host bindings for all control plane nodes.
async fn configure_host_binding(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Configuring host bindings for control plane nodes.");
// Ensure the topology definition matches the number of physical nodes found.
if topology.control_plane.len() != nodes.len() {
return Err(InterpretError::new(format!(
"Mismatch between logical control plane hosts defined in topology ({}) and physical nodes found ({}).",
topology.control_plane.len(),
nodes.len()
)));
}
// Create a binding for each physical host to its corresponding logical host.
let bindings: Vec<HostBinding> = topology
.control_plane
.iter()
.zip(nodes.iter())
.map(|(logical_host, physical_host)| {
info!(
"Creating binding: Logical Host '{}' -> Physical Host ID '{}'",
logical_host.name, physical_host.id
);
HostBinding {
logical_host: logical_host.clone(),
physical_host: physical_host.clone(),
}
})
.collect();
DhcpHostBindingScore {
host_binding: bindings,
domain: Some(topology.domain_name.clone()),
}
.interpret(inventory, topology)
.await?;
Ok(())
}
/// Renders and deploys a per-MAC iPXE boot file for each control plane node.
async fn configure_ipxe(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Rendering per-MAC iPXE configurations.");
// The iPXE script content is the same for all control plane nodes,
// pointing to the 'master.ign' ignition file.
let content = BootstrapIpxeTpl {
http_ip: &topology.http_server.get_ip().to_string(),
scos_path: "scos",
ignition_http_path: "okd_ignition_files",
installation_device: "/dev/sda", // This might need to be configurable per-host in the future
ignition_file_name: "master.ign", // Control plane nodes use the master ignition file
}
.to_string();
debug!("[ControlPlane] iPXE content template:\n{content}");
// Create and apply an iPXE boot file for each node.
for node in nodes {
let mac_address = node.get_mac_address();
if mac_address.is_empty() {
return Err(InterpretError::new(format!(
"Physical host with ID '{}' has no MAC addresses defined.",
node.id
)));
}
info!(
"[ControlPlane] Applying iPXE config for node ID '{}' with MACs: {:?}",
node.id, mac_address
);
IPxeMacBootFileScore {
mac_address,
content: content.clone(),
}
.interpret(inventory, topology)
.await?;
}
Ok(())
}
/// Prompts the user to reboot the target control plane nodes.
async fn reboot_targets(&self, nodes: &Vec<PhysicalHost>) -> Result<(), InterpretError> {
let node_ids: Vec<String> = nodes.iter().map(|n| n.id.to_string()).collect();
info!("[ControlPlane] Requesting reboot for control plane nodes: {node_ids:?}",);
let confirmation = inquire::Confirm::new(
&format!("Please reboot the {} control plane nodes ({}) to apply their PXE configuration. Press enter when ready.", nodes.len(), node_ids.join(", ")),
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
if !confirmation {
return Err(InterpretError::new(
"User aborted the operation.".to_string(),
));
}
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDSetup03ControlPlaneInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup03ControlPlane")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
// 1. Ensure we have 3 physical hosts for the control plane.
let nodes = self.get_nodes(inventory, topology).await?;
// 2. Create DHCP reservations for the control plane nodes.
self.configure_host_binding(inventory, topology, &nodes)
.await?;
// 3. Create iPXE files for each control plane node to boot from the master ignition.
self.configure_ipxe(inventory, topology, &nodes).await?;
// 4. Reboot the nodes to start the OS installation.
self.reboot_targets(&nodes).await?;
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to
// the `wait-for bootstrap-complete` command.
info!("[ControlPlane] Provisioning initiated. Monitor the cluster convergence manually.");
Ok(Outcome::success(
"Control plane provisioning has been successfully initiated.".into(),
))
}
}

View File

@@ -1,9 +1,13 @@
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::info;
use serde::Serialize;
use crate::{
interpret::Interpret,
inventory::HostRole,
modules::{inventory::HarmonyDiscoveryStrategy, okd::bootstrap_okd_node::OKDNodeInterpret},
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::HAClusterTopology,
};
@@ -14,20 +18,66 @@ use crate::{
// - Persist bonding via MC/NNCP as required (same approach as masters).
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize)]
pub struct OKDSetup04WorkersScore {
pub discovery_strategy: HarmonyDiscoveryStrategy,
}
#[derive(Debug, Clone, Serialize, new)]
pub struct OKDSetup04WorkersScore {}
impl Score<HAClusterTopology> for OKDSetup04WorkersScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDNodeInterpret::new(
HostRole::ControlPlane,
self.discovery_strategy.clone(),
))
Box::new(OKDSetup04WorkersInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup04WorkersScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct OKDSetup04WorkersInterpret {
score: OKDSetup04WorkersScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup04WorkersInterpret {
pub fn new(score: OKDSetup04WorkersScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn render_and_reboot(&self) -> Result<(), InterpretError> {
info!("[Workers] Rendering per-MAC PXE for workers and rebooting");
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDSetup04WorkersInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup04Workers")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
self.render_and_reboot().await?;
Ok(Outcome::success("Workers provisioned".into()))
}
}

View File

@@ -1,313 +0,0 @@
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::{debug, info};
use serde::Serialize;
use crate::{
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore,
http::IPxeMacBootFileScore,
inventory::{DiscoverHostForRoleScore, HarmonyDiscoveryStrategy},
okd::{
okd_node::{BootstrapRole, ControlPlaneRole, OKDRoleProperties, WorkerRole},
templates::BootstrapIpxeTpl,
},
},
score::Score,
topology::{HAClusterTopology, HostBinding, LogicalHost},
};
#[derive(Debug, Clone, Serialize, new)]
pub struct OKDNodeInstallationScore {
host_role: HostRole,
discovery_strategy: HarmonyDiscoveryStrategy,
}
impl Score<HAClusterTopology> for OKDNodeInstallationScore {
fn name(&self) -> String {
"OKDNodeScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDNodeInterpret::new(
self.host_role.clone(),
self.discovery_strategy.clone(),
))
}
}
#[derive(Debug, Clone)]
pub struct OKDNodeInterpret {
host_role: HostRole,
discovery_strategy: HarmonyDiscoveryStrategy,
}
impl OKDNodeInterpret {
pub fn new(host_role: HostRole, discovery_strategy: HarmonyDiscoveryStrategy) -> Self {
Self {
host_role,
discovery_strategy,
}
}
fn okd_role_properties(&self, role: &HostRole) -> &'static dyn OKDRoleProperties {
match role {
HostRole::Bootstrap => &BootstrapRole,
HostRole::ControlPlane => &ControlPlaneRole,
HostRole::Worker => &WorkerRole,
}
}
async fn get_nodes(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Vec<PhysicalHost>, InterpretError> {
let repo = InventoryRepositoryFactory::build().await?;
let mut hosts = repo.get_host_for_role(&self.host_role).await?;
let okd_host_properties = self.okd_role_properties(&self.host_role);
let required_hosts: i16 = okd_host_properties.required_hosts();
info!(
"Discovery of {} {} hosts in progress, current number {}",
required_hosts,
self.host_role,
hosts.len()
);
// This score triggers the discovery agent for a specific role.
DiscoverHostForRoleScore {
role: self.host_role.clone(),
number_desired_hosts: required_hosts,
discovery_strategy: self.discovery_strategy.clone(),
}
.interpret(inventory, topology)
.await?;
hosts = repo.get_host_for_role(&self.host_role).await?;
if hosts.len() < required_hosts.try_into().unwrap_or(0) {
Err(InterpretError::new(format!(
"OKD Requires at least {} {} hosts, but only found {}. Cannot proceed.",
required_hosts,
self.host_role,
hosts.len()
)))
} else {
// Take exactly the number of required hosts to ensure consistency.
Ok(hosts
.into_iter()
.take(required_hosts.try_into().unwrap())
.collect())
}
}
/// Configures DHCP host bindings for all nodes.
async fn configure_host_binding(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!(
"[{}] Configuring host bindings for {} plane nodes.",
self.host_role, self.host_role,
);
let host_properties = self.okd_role_properties(&self.host_role);
self.validate_host_node_match(nodes, host_properties.logical_hosts(topology))?;
let bindings: Vec<HostBinding> =
self.host_bindings(nodes, host_properties.logical_hosts(topology));
DhcpHostBindingScore {
host_binding: bindings,
domain: Some(topology.domain_name.clone()),
}
.interpret(inventory, topology)
.await?;
Ok(())
}
// Ensure the topology definition matches the number of physical nodes found.
fn validate_host_node_match(
&self,
nodes: &Vec<PhysicalHost>,
hosts: &Vec<LogicalHost>,
) -> Result<(), InterpretError> {
if hosts.len() != nodes.len() {
return Err(InterpretError::new(format!(
"Mismatch between logical hosts defined in topology ({}) and physical nodes found ({}).",
hosts.len(),
nodes.len()
)));
}
Ok(())
}
// Create a binding for each physical host to its corresponding logical host.
fn host_bindings(
&self,
nodes: &Vec<PhysicalHost>,
hosts: &Vec<LogicalHost>,
) -> Vec<HostBinding> {
hosts
.iter()
.zip(nodes.iter())
.map(|(logical_host, physical_host)| {
info!(
"Creating binding: Logical Host '{}' -> Physical Host ID '{}'",
logical_host.name, physical_host.id
);
HostBinding {
logical_host: logical_host.clone(),
physical_host: physical_host.clone(),
}
})
.collect()
}
/// Renders and deploys a per-MAC iPXE boot file for each node.
async fn configure_ipxe(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
nodes: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!(
"[{}] Rendering per-MAC iPXE configurations.",
self.host_role
);
let okd_role_properties = self.okd_role_properties(&self.host_role);
// The iPXE script content is the same for all nodes of a given role,
// pointing to that role's ignition file.
let content = BootstrapIpxeTpl {
http_ip: &topology.http_server.get_ip().to_string(),
scos_path: "scos",
ignition_http_path: "okd_ignition_files",
//TODO must be refactored to not only use /dev/sda
installation_device: "/dev/sda", // This might need to be configurable per-host in the future
ignition_file_name: okd_role_properties.ignition_file(),
}
.to_string();
debug!("[{}] iPXE content template:\n{content}", self.host_role);
// Create and apply an iPXE boot file for each node.
for node in nodes {
let mac_address = node.get_mac_address();
if mac_address.is_empty() {
return Err(InterpretError::new(format!(
"Physical host with ID '{}' has no MAC addresses defined.",
node.id
)));
}
info!(
"[{}] Applying iPXE config for node ID '{}' with MACs: {:?}",
self.host_role, node.id, mac_address
);
IPxeMacBootFileScore {
mac_address,
content: content.clone(),
}
.interpret(inventory, topology)
.await?;
}
Ok(())
}
/// Prompts the user to reboot the target nodes for this role.
async fn reboot_targets(&self, nodes: &Vec<PhysicalHost>) -> Result<(), InterpretError> {
let node_ids: Vec<String> = nodes.iter().map(|n| n.id.to_string()).collect();
info!(
"[{}] Requesting reboot for control plane nodes: {node_ids:?}",
self.host_role
);
let confirmation = inquire::Confirm::new(
&format!("Please reboot the {} {} nodes ({}) to apply their PXE configuration. Press enter when ready.", nodes.len(), self.host_role, node_ids.join(", ")),
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
if !confirmation {
return Err(InterpretError::new(
"User aborted the operation.".to_string(),
));
}
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDNodeInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
// 1. Ensure we have the specified number of physical hosts.
let nodes = self.get_nodes(inventory, topology).await?;
// 2. Create DHCP reservations for the nodes.
self.configure_host_binding(inventory, topology, &nodes)
.await?;
// 3. Create iPXE files for each node to boot from the ignition.
self.configure_ipxe(inventory, topology, &nodes).await?;
// 4. Reboot the nodes to start the OS installation.
self.reboot_targets(&nodes).await?;
// TODO: Implement a step to validate that the installation of the nodes is
// complete and for the cluster operators to become available.
//
// The OpenShift installer only provides two wait commands which currently need to be
// run manually:
// - `openshift-install wait-for bootstrap-complete`
// - `openshift-install wait-for install-complete`
//
// There is no installer command that waits specifically for worker node
// provisioning. Worker nodes join asynchronously (via ignition + CSR approval),
// and the cluster becomes fully functional only once all nodes are Ready and the
// cluster operators report Available=True.
info!(
"[{}] Provisioning initiated. Monitor the cluster convergence manually.",
self.host_role
);
Ok(Outcome::success(format!(
"{} provisioning has been successfully initiated.",
self.host_role
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDNodeSetup".into())
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -1 +1,2 @@
pub mod nmstate;
pub mod route;

View File

@@ -0,0 +1,287 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta, Time};
use k8s_openapi::apimachinery::pkg::util::intstr::IntOrString;
use k8s_openapi::{NamespaceResourceScope, Resource};
use serde::{Deserialize, Serialize};
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct LocalObjectReference {
#[serde(skip_serializing_if = "Option::is_none")]
pub name: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Route {
#[serde(skip_serializing_if = "Option::is_none")]
pub api_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub kind: Option<String>,
pub metadata: ObjectMeta,
pub spec: RouteSpec,
#[serde(skip_serializing_if = "Option::is_none")]
pub status: Option<RouteStatus>,
}
impl Resource for Route {
const API_VERSION: &'static str = "route.openshift.io/v1";
const GROUP: &'static str = "route.openshift.io";
const VERSION: &'static str = "v1";
const KIND: &'static str = "Route";
const URL_PATH_SEGMENT: &'static str = "routes";
type Scope = NamespaceResourceScope;
}
impl k8s_openapi::Metadata for Route {
type Ty = ObjectMeta;
fn metadata(&self) -> &Self::Ty {
&self.metadata
}
fn metadata_mut(&mut self) -> &mut Self::Ty {
&mut self.metadata
}
}
impl Default for Route {
fn default() -> Self {
Route {
api_version: Some("route.openshift.io/v1".to_string()),
kind: Some("Route".to_string()),
metadata: ObjectMeta::default(),
spec: RouteSpec::default(),
status: None,
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteList {
pub metadata: ListMeta,
pub items: Vec<Route>,
}
impl Default for RouteList {
fn default() -> Self {
Self {
metadata: ListMeta::default(),
items: Vec::new(),
}
}
}
impl Resource for RouteList {
const API_VERSION: &'static str = "route.openshift.io/v1";
const GROUP: &'static str = "route.openshift.io";
const VERSION: &'static str = "v1";
const KIND: &'static str = "RouteList";
const URL_PATH_SEGMENT: &'static str = "routes";
type Scope = NamespaceResourceScope;
}
impl k8s_openapi::Metadata for RouteList {
type Ty = ListMeta;
fn metadata(&self) -> &Self::Ty {
&self.metadata
}
fn metadata_mut(&mut self) -> &mut Self::Ty {
&mut self.metadata
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub alternate_backends: Option<Vec<RouteTargetReference>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub http_headers: Option<RouteHTTPHeaders>,
#[serde(skip_serializing_if = "Option::is_none")]
pub path: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<RoutePort>,
#[serde(skip_serializing_if = "Option::is_none")]
pub subdomain: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub tls: Option<TLSConfig>,
pub to: RouteTargetReference,
#[serde(skip_serializing_if = "Option::is_none")]
pub wildcard_policy: Option<String>,
}
impl Default for RouteSpec {
fn default() -> RouteSpec {
RouteSpec {
alternate_backends: None,
host: None,
http_headers: None,
path: None,
port: None,
subdomain: None,
tls: None,
to: RouteTargetReference::default(),
wildcard_policy: None,
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteTargetReference {
pub kind: String,
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub weight: Option<i32>,
}
impl Default for RouteTargetReference {
fn default() -> RouteTargetReference {
RouteTargetReference {
kind: String::default(),
name: String::default(),
weight: None,
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RoutePort {
pub target_port: u16,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub ca_certificate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub certificate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub destination_ca_certificate: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub external_certificate: Option<LocalObjectReference>,
#[serde(skip_serializing_if = "Option::is_none")]
pub insecure_edge_termination_policy: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub key: Option<String>,
pub termination: String,
}
impl Default for TLSConfig {
fn default() -> Self {
Self {
ca_certificate: None,
certificate: None,
destination_ca_certificate: None,
external_certificate: None,
insecure_edge_termination_policy: None,
key: None,
termination: "edge".to_string(),
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteStatus {
#[serde(skip_serializing_if = "Option::is_none")]
pub ingress: Option<Vec<RouteIngress>>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteIngress {
#[serde(skip_serializing_if = "Option::is_none")]
pub host: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub router_canonical_hostname: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub router_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wildcard_policy: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub conditions: Option<Vec<RouteIngressCondition>>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteIngressCondition {
#[serde(skip_serializing_if = "Option::is_none")]
pub last_transition_time: Option<Time>,
#[serde(skip_serializing_if = "Option::is_none")]
pub message: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub reason: Option<String>,
pub status: String,
#[serde(rename = "type")]
pub condition_type: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeader {
pub name: String,
pub action: RouteHTTPHeaderActionUnion,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeaderActionUnion {
#[serde(skip_serializing_if = "Option::is_none")]
pub set: Option<RouteSetHTTPHeader>,
#[serde(rename = "type")]
pub action_type: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteSetHTTPHeader {
pub value: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeaderActions {
#[serde(skip_serializing_if = "Option::is_none")]
pub request: Option<Vec<RouteHTTPHeader>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub response: Option<Vec<RouteHTTPHeader>>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct RouteHTTPHeaders {
#[serde(skip_serializing_if = "Option::is_none")]
pub actions: Option<RouteHTTPHeaderActions>,
}
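// Hedged sketch: a passthrough Route comparable to what OKDTlsPassthroughScore is expected
// to produce. Hostname, namespace, and service name here are illustrative assumptions.
#[allow(dead_code)]
fn example_passthrough_route() -> Route {
    Route {
        metadata: ObjectMeta {
            name: Some("postgres-rw".to_string()),
            namespace: Some("databases".to_string()),
            ..ObjectMeta::default()
        },
        spec: RouteSpec {
            host: Some("postgres-rw.apps.example.io".to_string()),
            port: Some(RoutePort { target_port: 5432 }),
            tls: Some(TLSConfig {
                termination: "passthrough".to_string(),
                ..TLSConfig::default()
            }),
            to: RouteTargetReference {
                kind: "Service".to_string(),
                name: "postgres-cluster-example-rw".to_string(),
                weight: Some(100),
            },
            ..RouteSpec::default()
        },
        ..Route::default()
    }
}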

View File

@@ -251,15 +251,14 @@ impl<T: Topology + NetworkManager + Switch> Interpret<T> for HostNetworkConfigur
#[cfg(test)]
mod tests {
use assertor::*;
use brocade::PortOperatingMode;
use harmony_types::{net::MacAddress, switch::PortLocation};
use lazy_static::lazy_static;
use crate::{
hardware::HostCategory,
topology::{
HostNetworkConfig, NetworkError, PortConfig, PreparationError, PreparationOutcome,
SwitchError, SwitchPort,
HostNetworkConfig, NetworkError, PreparationError, PreparationOutcome, SwitchError,
SwitchPort,
},
};
use std::{
@@ -693,14 +692,5 @@ mod tests {
Ok(())
}
async fn clear_port_channel(&self, ids: &Vec<Id>) -> Result<(), SwitchError> {
todo!()
}
async fn configure_interface(
&self,
port_config: &Vec<PortConfig>,
) -> Result<(), SwitchError> {
todo!()
}
}
}

View File

@@ -48,13 +48,10 @@
//! - internal_domain: Internal cluster domain (e.g., cluster.local or harmony.mcd).
use crate::{
modules::{
inventory::HarmonyDiscoveryStrategy,
okd::{
OKDSetup01InventoryScore, OKDSetup02BootstrapScore, OKDSetup03ControlPlaneScore,
OKDSetup04WorkersScore, OKDSetup05SanityCheckScore, OKDSetupPersistNetworkBondScore,
bootstrap_06_installation_report::OKDSetup06InstallationReportScore,
},
modules::okd::{
OKDSetup01InventoryScore, OKDSetup02BootstrapScore, OKDSetup03ControlPlaneScore,
OKDSetup04WorkersScore, OKDSetup05SanityCheckScore, OKDSetupPersistNetworkBondScore,
bootstrap_06_installation_report::OKDSetup06InstallationReportScore,
},
score::Score,
topology::HAClusterTopology,
@@ -63,19 +60,13 @@ use crate::{
pub struct OKDInstallationPipeline;
impl OKDInstallationPipeline {
pub async fn get_all_scores(
discovery_strategy: HarmonyDiscoveryStrategy,
) -> Vec<Box<dyn Score<HAClusterTopology>>> {
pub async fn get_all_scores() -> Vec<Box<dyn Score<HAClusterTopology>>> {
vec![
Box::new(OKDSetup01InventoryScore::new()),
Box::new(OKDSetup02BootstrapScore::new()),
Box::new(OKDSetup03ControlPlaneScore {
discovery_strategy: discovery_strategy.clone(),
}),
Box::new(OKDSetup03ControlPlaneScore::new()),
Box::new(OKDSetupPersistNetworkBondScore::new()),
Box::new(OKDSetup04WorkersScore {
discovery_strategy: discovery_strategy.clone(),
}),
Box::new(OKDSetup04WorkersScore::new()),
Box::new(OKDSetup05SanityCheckScore::new()),
Box::new(OKDSetup06InstallationReportScore::new()),
]

View File

@@ -6,14 +6,13 @@ mod bootstrap_05_sanity_check;
mod bootstrap_06_installation_report;
pub mod bootstrap_dhcp;
pub mod bootstrap_load_balancer;
pub mod bootstrap_okd_node;
mod bootstrap_persist_network_bond;
pub mod dhcp;
pub mod dns;
pub mod installation;
pub mod ipxe;
pub mod load_balancer;
pub mod okd_node;
pub mod route;
pub mod templates;
pub mod upgrade;
pub use bootstrap_01_prepare::*;

View File

@@ -1,54 +0,0 @@
use crate::topology::{HAClusterTopology, LogicalHost};
pub trait OKDRoleProperties {
fn ignition_file(&self) -> &'static str;
fn required_hosts(&self) -> i16;
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost>;
}
pub struct BootstrapRole;
pub struct ControlPlaneRole;
pub struct WorkerRole;
pub struct StorageRole;
impl OKDRoleProperties for BootstrapRole {
fn ignition_file(&self) -> &'static str {
"bootstrap.ign"
}
fn required_hosts(&self) -> i16 {
1
}
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost> {
todo!()
}
}
impl OKDRoleProperties for ControlPlaneRole {
fn ignition_file(&self) -> &'static str {
"master.ign"
}
fn required_hosts(&self) -> i16 {
3
}
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost> {
&t.control_plane
}
}
impl OKDRoleProperties for WorkerRole {
fn ignition_file(&self) -> &'static str {
"worker.ign"
}
fn required_hosts(&self) -> i16 {
2
}
fn logical_hosts<'a>(&self, t: &'a HAClusterTopology) -> &'a Vec<LogicalHost> {
&t.workers
}
}

View File

@@ -0,0 +1,105 @@
// TODO
// Write OKDRouteScore : This is the real one which will apply the k8s resource and expose all
// relevant option to Harmony's various use cases
//
// Write OKDTlsPassthroughScore : This one will use an OKDRouteScore under the hood and simply fill
// in all settings to make this route a TlsPassthrough
//
// These scores are meant to be used by an OKD based topology to provide Capabilities like
// TlsRouter
//
// The first use case to serve here is the postgresql multisite setup, so exposing only the
// settings relevant to this use case is enough at first, following YAGNI.
//
// These scores are not intended to be used directly by a user, unless the user knows they will
// always be dealing only with OKD/OpenShift-compatible topologies and is ready to manage the
// additional maintenance burden that comes with lower level functionality.
use harmony_types::rfc1123::Rfc1123Name;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::okd::crd::route::{
Route, RoutePort, RouteSpec, RouteTargetReference, TLSConfig,
};
use crate::score::Score;
use crate::topology::{K8sclient, TlsRoute, Topology};
#[derive(Debug, Clone, Serialize)]
pub struct OKDRouteScore {
pub name: String,
pub namespace: String,
pub spec: RouteSpec,
}
impl OKDRouteScore {
pub fn new(name: &str, namespace: &str, spec: RouteSpec) -> Self {
Self {
name: name.to_string(),
namespace: namespace.to_string(),
spec,
}
}
}
impl<T: Topology + K8sclient> Score<T> for OKDRouteScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
let route = Route {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
},
spec: self.spec.clone(),
..Default::default()
};
K8sResourceScore::single(route, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("OKDRouteScore({})", self.name)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct OKDTlsPassthroughScore {
pub route: TlsRoute,
pub name: Rfc1123Name,
}
impl<T: Topology + K8sclient> Score<T> for OKDTlsPassthroughScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
let passthrough_spec = RouteSpec {
host: Some(self.route.hostname.clone()),
wildcard_policy: Some("None".to_string()),
to: RouteTargetReference {
kind: "Service".to_string(),
name: self.route.backend.clone(),
weight: Some(100),
},
port: Some(RoutePort {
target_port: self.route.target_port,
}),
tls: Some(TLSConfig {
termination: "passthrough".to_string(),
insecure_edge_termination_policy: Some("None".to_string()),
..Default::default()
}),
..Default::default()
};
let route_score = OKDRouteScore::new(
&self.name.to_string(),
&self.route.namespace,
passthrough_spec,
);
route_score.create_interpret()
}
fn name(&self) -> String {
format!(
"OKDTlsPassthroughScore({}:{}/{}{})",
self.route.backend, self.route.target_port, self.route.namespace, self.route.hostname
)
}
}
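A minimal usage sketch; the hostname, namespace, and backend service name below are illustrative placeholders, not values from this repo:
// Illustrative sketch: expose a hypothetical `my-db-rw` Service via TLS passthrough.
let score = OKDTlsPassthroughScore {
    route: TlsRoute {
        namespace: "harmony".to_string(),
        hostname: "db.example.com".to_string(),
        backend: "my-db-rw".to_string(),
        target_port: 5432,
    },
    name: Rfc1123Name::try_from("my-db-passthrough").expect("valid RFC 1123 name"),
};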

View File

@@ -0,0 +1,107 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use serde::Serialize;
use std::collections::HashMap;
#[async_trait]
pub trait PostgreSQL: Send + Sync {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, config: &PostgreSQLConfig) -> Result<ReplicationCerts, String>;
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, config: &PostgreSQLConfig) -> Result<PostgreSQLEndpoint, String>;
// /// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
// /// Returns None if no public endpoint (internal-only cluster).
// /// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
// /// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
// async fn get_public_endpoint(
// &self,
// cluster_name: &str,
// ) -> Result<Option<PostgreSQLEndpoint>, String>;
}
#[derive(Clone, Debug, Serialize)]
pub struct PostgreSQLConfig {
pub cluster_name: String,
pub instances: u32,
pub storage_size: StorageSize,
pub role: PostgreSQLClusterRole,
/// **Note:** on OpenShift based clusters, the namespace `default` has security
/// settings incompatible with the default CNPG behavior.
pub namespace: String,
}
impl PostgreSQLConfig {
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
let mut new = self.clone();
new.namespace = namespace.to_string();
new
}
}
impl Default for PostgreSQLConfig {
fn default() -> Self {
Self {
cluster_name: "harmony-pg".to_string(),
instances: 1,
storage_size: StorageSize::gi(1),
role: PostgreSQLClusterRole::Primary,
namespace: "harmony".to_string(),
}
}
}
#[derive(Clone, Debug, Serialize)]
pub enum PostgreSQLClusterRole {
Primary,
Replica(ReplicaConfig),
}
#[derive(Clone, Debug, Serialize)]
pub struct ReplicaConfig {
/// Name of the primary cluster this replica will sync from
pub primary_cluster_name: String,
/// Certs extracted from primary via Topology::get_replication_certs()
pub replication_certs: ReplicationCerts,
/// Bootstrap method (e.g., pg_basebackup from primary)
pub bootstrap: BootstrapConfig,
/// External cluster connection details for CNPG spec.externalClusters
pub external_cluster: ExternalClusterConfig,
}
#[derive(Clone, Debug, Serialize)]
pub struct BootstrapConfig {
pub strategy: BootstrapStrategy,
}
#[derive(Clone, Debug, Serialize)]
pub enum BootstrapStrategy {
PgBasebackup,
}
#[derive(Clone, Debug, Serialize)]
pub struct ExternalClusterConfig {
/// Name used in CNPG externalClusters list
pub name: String,
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
pub connection_parameters: HashMap<String, String>,
}
#[derive(Clone, Debug, Serialize)]
pub struct ReplicationCerts {
/// PEM-encoded CA cert from primary
pub ca_cert_pem: String,
/// PEM-encoded streaming_replica client cert (tls.crt)
pub streaming_replica_cert_pem: String,
/// PEM-encoded streaming_replica client key (tls.key)
pub streaming_replica_key_pem: String,
}
#[derive(Clone, Debug)]
pub struct PostgreSQLEndpoint {
pub host: String,
pub port: u16,
}
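To illustrate how these types compose, here is a minimal sketch of a replica config; the hostnames and PEM contents are placeholders, and in practice the certs come from `get_replication_certs` on the primary (see the FailoverTopology implementation below):
// Illustrative sketch: assemble a replica config by hand. PEM contents and
// hostnames are placeholders; real certs come from the primary cluster.
let certs = ReplicationCerts {
    ca_cert_pem: "<ca.crt PEM>".to_string(),
    streaming_replica_cert_pem: "<tls.crt PEM>".to_string(),
    streaming_replica_key_pem: "<tls.key PEM>".to_string(),
};
let mut connection_parameters = HashMap::new();
connection_parameters.insert("host".to_string(), "pg.example.com".to_string());
connection_parameters.insert("port".to_string(), "443".to_string());
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
let replica = PostgreSQLConfig {
    cluster_name: "harmony-pg-replica".to_string(),
    role: PostgreSQLClusterRole::Replica(ReplicaConfig {
        primary_cluster_name: "harmony-pg".to_string(),
        replication_certs: certs,
        bootstrap: BootstrapConfig {
            strategy: BootstrapStrategy::PgBasebackup,
        },
        external_cluster: ExternalClusterConfig {
            name: "harmony-pg".to_string(),
            connection_parameters,
        },
    }),
    ..Default::default()
};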

View File

@@ -0,0 +1,58 @@
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "postgresql.cnpg.io",
version = "v1",
kind = "Cluster",
plural = "clusters",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterSpec {
pub instances: u32,
pub image_name: Option<String>,
pub storage: Storage,
pub bootstrap: Bootstrap,
}
impl Default for Cluster {
fn default() -> Self {
Cluster {
metadata: ObjectMeta::default(),
spec: ClusterSpec::default(),
}
}
}
impl Default for ClusterSpec {
fn default() -> Self {
Self {
instances: 1,
image_name: None,
storage: Storage::default(),
bootstrap: Bootstrap::default(),
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Storage {
pub size: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Bootstrap {
pub initdb: Initdb,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Initdb {
pub database: String,
pub owner: String,
}
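For reference, a minimal sketch of assembling this CR in Rust; the values are illustrative, and `K8sPostgreSQLScore` below builds the same structure:
// Illustrative sketch: build the CNPG Cluster CR by hand.
let cluster = Cluster {
    metadata: ObjectMeta {
        name: Some("harmony-pg".to_string()),
        namespace: Some("harmony".to_string()),
        ..ObjectMeta::default()
    },
    spec: ClusterSpec {
        instances: 3,
        storage: Storage {
            size: "10Gi".to_string(),
        },
        bootstrap: Bootstrap {
            initdb: Initdb {
                database: "app".to_string(),
                owner: "app".to_string(),
            },
        },
        ..ClusterSpec::default()
    },
};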

View File

@@ -0,0 +1,2 @@
mod crd;
pub use crd::*;

View File

@@ -0,0 +1,130 @@
use async_trait::async_trait;
use log::debug;
use log::info;
use std::collections::HashMap;
use crate::topology::TlsRouter;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::FailoverTopology,
};
#[async_trait]
impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
info!(
"Starting deployment of failover topology '{}'",
config.cluster_name
);
let primary_config = PostgreSQLConfig {
cluster_name: config.cluster_name.clone(),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Primary,
namespace: config.namespace.clone(),
};
info!(
"Deploying primary cluster '{}' ({} instances, {:?} storage)",
primary_config.cluster_name, primary_config.instances, primary_config.storage_size
);
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
let certs = self.primary.get_replication_certs(&primary_config).await?;
info!("Replication certificates retrieved successfully");
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
// TODO we should be getting the public endpoint for a service by calling a method on
// TlsRouter capability.
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
let endpoint = PostgreSQLEndpoint {
host: "postgrestest.sto1.nationtech.io".to_string(),
port: self.primary.get_router_port().await,
};
info!(
"Public endpoint '{}:{}' retrieved for primary",
endpoint.host, endpoint.port
);
info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new();
connection_parameters.insert("host".to_string(), endpoint.host);
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
debug!("Replica connection parameters: {:?}", connection_parameters);
let external_cluster = ExternalClusterConfig {
name: primary_cluster_name.clone(),
connection_parameters,
};
let bootstrap_config = BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
};
let replica_cluster_config = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs,
bootstrap: bootstrap_config,
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica", primary_cluster_name),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
namespace: config.namespace.clone(),
};
info!(
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
);
self.replica.deploy(&replica_config).await?;
info!(
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
replica_config.cluster_name, config.cluster_name
);
Ok(primary_cluster_name)
}
async fn get_replication_certs(
&self,
config: &PostgreSQLConfig,
) -> Result<ReplicationCerts, String> {
self.primary.get_replication_certs(config).await
}
async fn get_endpoint(&self, config: &PostgreSQLConfig) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(config).await
}
// async fn get_public_endpoint(
// &self,
// cluster_name: &str,
// ) -> Result<Option<PostgreSQLEndpoint>, String> {
// self.primary.get_public_endpoint(cluster_name).await
// }
}
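The TODO above calls for resolving the public hostname through the TlsRouter capability instead of hardcoding it. A hypothetical sketch of such a method; the trait name and signature are an assumption, not part of the codebase yet:
// Hypothetical sketch only: this method does not exist on TlsRouter yet.
// It mirrors the `TlsRouter::get_hostname_for_service(...)` idea from the TODO above.
#[async_trait]
pub trait TlsRouterHostnameLookup {
    /// Resolve the externally routable hostname for `service` in `namespace`.
    async fn get_hostname_for_service(
        &self,
        namespace: &str,
        service: &str,
    ) -> Result<String, String>;
}
With something along these lines in place, the hardcoded `postgrestest.sto1.nationtech.io` above could become a capability lookup.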

View File

@@ -0,0 +1,16 @@
pub mod capability;
mod score_connect;
mod score_k8s;
pub use score_connect::*;
pub use score_k8s::*;
mod score_public;
pub use score_public::*;
pub mod failover;
mod operator;
pub use operator::*;
mod score;
pub use score::*;
pub mod cnpg;

View File

@@ -0,0 +1,102 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::apps::crd::{Subscription, SubscriptionSpec};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
/// Install the CloudNativePg (CNPG) Operator via an OperatorHub `Subscription`.
///
/// This Score creates a `Subscription` Custom Resource in the specified namespace.
///
/// The default implementation pulls the `cloudnative-pg` operator from the
/// `operatorhubio-catalog` source.
///
/// # Goals
/// - Deploy the CNPG Operator to manage PostgreSQL clusters in OpenShift/OKD environments.
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::CloudNativePgOperatorScore;
/// let score = CloudNativePgOperatorScore::default();
/// ```
///
/// Or, you can take control of the most relevant fields this way:
///
/// ```
/// use harmony::modules::postgresql::CloudNativePgOperatorScore;
///
/// let score = CloudNativePgOperatorScore {
/// namespace: "custom-cnpg-namespace".to_string(),
/// channel: "unstable-i-want-bleedingedge-v498437".to_string(),
/// install_plan_approval: "Manual".to_string(),
/// source: "operatorhubio-catalog-but-different".to_string(),
/// source_namespace: "i-customize-everything-marketplace".to_string(),
/// };
/// ```
///
/// # Limitations
/// - **OperatorHub dependency**: Requires OperatorHub catalog sources (e.g., `operatorhubio-catalog` in `openshift-marketplace`).
/// - **OKD/OpenShift assumption**: Catalog/source names and namespaces are hardcoded for OKD-like setups; adjust for upstream OpenShift.
/// - **Hardcoded values in Default implementation**: Operator name (`cloudnative-pg`), channel (`stable-v1`), automatic install plan approval.
/// - **No config options**: Does not support custom `SubscriptionConfig` (env vars, node selectors, tolerations).
/// - **Single namespace**: Targets one namespace per score instance.
#[derive(Debug, Clone, Serialize)]
pub struct CloudNativePgOperatorScore {
pub namespace: String,
pub channel: String,
pub install_plan_approval: String,
pub source: String,
pub source_namespace: String,
}
impl Default for CloudNativePgOperatorScore {
fn default() -> Self {
Self {
namespace: "openshift-operators".to_string(),
channel: "stable-v1".to_string(),
install_plan_approval: "Automatic".to_string(),
source: "operatorhubio-catalog".to_string(),
source_namespace: "openshift-marketplace".to_string(),
}
}
}
impl CloudNativePgOperatorScore {
pub fn new(namespace: &str) -> Self {
Self {
namespace: namespace.to_string(),
..Default::default()
}
}
}
impl<T: Topology + K8sclient> Score<T> for CloudNativePgOperatorScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some("cloudnative-pg".to_string()),
namespace: Some(self.namespace.clone()),
..ObjectMeta::default()
};
let spec = SubscriptionSpec {
channel: Some(self.channel.clone()),
config: None,
install_plan_approval: Some(self.install_plan_approval.clone()),
name: "cloudnative-pg".to_string(),
source: self.source.clone(),
source_namespace: self.source_namespace.clone(),
starting_csv: None,
};
let subscription = Subscription { metadata, spec };
K8sResourceScore::single(subscription, Some(self.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("CloudNativePgOperatorScore({})", self.namespace)
}
}

View File

@@ -0,0 +1,106 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::postgresql::capability::{PostgreSQL, PostgreSQLConfig};
use crate::score::Score;
use crate::topology::Topology;
/// High-level, infrastructure-agnostic PostgreSQL deployment score.
///
/// Delegates to the Topology's PostgreSQL capability implementation,
/// allowing flexibility in deployment strategy (k8s/CNPG, cloud-managed, etc.).
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PostgreSQLScore;
/// let score = PostgreSQLScore::new("harmony");
/// ```
///
/// # Design
/// - PostgreSQLScore: High-level, relies on Topology's PostgreSQL implementation
/// - Topology implements PostgreSQL capability (decoupled from score)
/// - K8s topologies use K8sPostgreSQLScore internally for CNPG deployment
///
/// This layered approach gives users choice:
/// - Use PostgreSQLScore for portability across topologies
/// - Use K8sPostgreSQLScore directly for k8s-specific control
#[derive(Debug, Clone, Serialize)]
pub struct PostgreSQLScore {
pub config: PostgreSQLConfig,
}
impl Default for PostgreSQLScore {
fn default() -> Self {
Self {
config: PostgreSQLConfig::default(),
}
}
}
impl PostgreSQLScore {
pub fn new(namespace: &str) -> Self {
Self {
config: PostgreSQLConfig {
namespace: namespace.to_string(),
..Default::default()
},
}
}
}
impl<T: Topology + PostgreSQL + Send + Sync> Score<T> for PostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLInterpret {
config: self.config.clone(),
})
}
fn name(&self) -> String {
format!(
"PostgreSQLScore({}:{})",
self.config.namespace, self.config.cluster_name
)
}
}
/// Interpret implementation that delegates to Topology's PostgreSQL capability.
#[derive(Debug, Clone)]
struct PostgreSQLInterpret {
config: PostgreSQLConfig,
}
#[async_trait]
impl<T: Topology + PostgreSQL + Send + Sync> Interpret<T> for PostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Delegate to topology's PostgreSQL capability
let cluster_name = topo
.deploy(&self.config)
.await
.map_err(|e| InterpretError::new(e))?;
Ok(Outcome::success(format!(
"PostgreSQL cluster '{}' deployed in namespace '{}'",
cluster_name, self.config.namespace
)))
}
}

View File

@@ -0,0 +1,442 @@
use async_trait::async_trait;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use log::{debug, error, info, trace};
use serde::Serialize;
use std::collections::BTreeMap;
use std::path::{Path, PathBuf};
use tokio::process::Command;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use harmony_types::id::Id;
/// PostgreSQLConnectionScore tests PostgreSQL database connectivity and performance metrics
/// for databases exposed via public endpoints. This score is specifically designed to verify
/// that PostgreSQL instances installed using the PublicPostgreSQLScore can be accessed by external clients.
///
/// The score performs the following tests:
/// 1. Verifies TLS/SSL connection using CA certificates from Kubernetes secrets
/// 2. Tests basic connectivity to the database
/// 3. (Optional, when db permissions are set up) Collects comprehensive performance metrics, including:
/// - Database size and schema usage statistics
/// - Active connections and query activity
/// - Performance metrics (transactions per second, cache hit ratio)
/// - Index usage and table statistics
/// - Configuration parameters
///
/// The implementation uses a Docker container running PostgreSQL client tools to execute
/// the connection test, ensuring consistent behavior across different environments.
///
/// # Kubernetes Secrets Required
///
/// The score requires two Kubernetes secrets in the target namespace:
/// - `{cluster_name}-app`: Contains connection parameters (host, port, username, password, dbname)
/// - `{cluster_name}-ca`: Contains CA certificate (ca.crt) for TLS verification
///
/// # Usage
///
/// ```rust
/// use harmony::modules::postgresql::PostgreSQLConnectionScore;
///
/// let score = PostgreSQLConnectionScore::new(
/// "default",
/// "my-postgres-cluster",
/// None
/// );
/// ```
///
/// # Parameters
///
/// - `namespace`: Kubernetes namespace where the PostgreSQL secrets are located
/// - `cluster_name`: Name of the PostgreSQL cluster (used to construct secret names)
/// - `hostname_override`: Optional hostname override for connection testing
/// - `port_override`: Optional port override for connection testing (a public field, not set by `new`)
#[derive(Debug, Clone, Serialize)]
pub struct PostgreSQLConnectionScore {
pub name: String,
pub namespace: String,
pub cluster_name: String,
pub hostname: Option<String>,
pub port_override: Option<u16>,
}
fn decode_secret(data: &BTreeMap<String, ByteString>, key: &str) -> Result<String, InterpretError> {
let val = data
.get(key)
.ok_or_else(|| InterpretError::new(format!("Secret missing key {}", key)))?;
String::from_utf8(val.0.clone())
.map_err(|e| InterpretError::new(format!("Failed to decode {}: {}", key, e)))
}
impl PostgreSQLConnectionScore {
pub fn new(namespace: &str, cluster_name: &str, hostname_override: Option<String>) -> Self {
Self {
name: format!("postgres-connection-{}", cluster_name),
namespace: namespace.to_string(),
cluster_name: cluster_name.to_string(),
hostname: hostname_override,
port_override: None,
}
}
}
impl<T: Topology + K8sclient + Send + Sync> Score<T> for PostgreSQLConnectionScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLConnectionInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("PostgreSQLConnectionScore : {}", self.name)
}
}
#[derive(Debug, Clone)]
struct PostgreSQLConnectionInterpret {
score: PostgreSQLConnectionScore,
}
impl PostgreSQLConnectionInterpret {
async fn fetch_app_secret<T: K8sclient>(&self, topo: &T) -> Result<Secret, InterpretError> {
let app_secret_name = format!("{}-app", self.score.cluster_name);
info!("Fetching app secret {}", app_secret_name);
let k8s_client = topo.k8s_client().await?;
k8s_client
.get_resource(&app_secret_name, Some(&self.score.namespace))
.await
.map_err(|e| InterpretError::new(format!("Failed to get app secret: {e}")))?
.ok_or_else(|| InterpretError::new(format!("App secret {} not found", app_secret_name)))
}
async fn fetch_ca_secret<T: K8sclient>(&self, topo: &T) -> Result<Secret, InterpretError> {
let ca_secret_name = format!("{}-ca", self.score.cluster_name);
info!("Fetching CA secret {}", ca_secret_name);
let k8s_client = topo.k8s_client().await?;
k8s_client
.get_resource(&ca_secret_name, Some(&self.score.namespace))
.await
.map_err(|e| InterpretError::new(format!("Failed to get CA secret: {e}")))?
.ok_or_else(|| InterpretError::new(format!("CA secret {} not found", ca_secret_name)))
}
fn get_secret_data(
&self,
secret: &Secret,
secret_type: &str,
) -> Result<BTreeMap<String, ByteString>, InterpretError> {
secret
.data
.as_ref()
.ok_or_else(|| InterpretError::new(format!("{} secret has no data", secret_type)))
.map(|b| b.clone())
}
fn create_temp_dir(&self) -> Result<tempfile::TempDir, InterpretError> {
tempfile::Builder::new()
.prefix("pg-connection-test-")
.tempdir()
.map_err(|e| InterpretError::new(format!("Failed to create temp directory: {e}")))
}
fn write_ca_cert(
&self,
temp_dir: &Path,
ca_data: &BTreeMap<String, ByteString>,
) -> Result<PathBuf, InterpretError> {
let ca_crt = ca_data
.get("ca.crt")
.ok_or_else(|| InterpretError::new("CA secret missing ca.crt".to_string()))?;
let ca_file = temp_dir.join("ca.crt");
std::fs::write(&ca_file, &ca_crt.0)
.map_err(|e| InterpretError::new(format!("Failed to write CA cert: {e}")))?;
Ok(ca_file)
}
fn get_host(&self, data: &BTreeMap<String, ByteString>) -> Result<String, InterpretError> {
self.score
.hostname
.clone()
.or_else(|| decode_secret(data, "host").ok())
.ok_or_else(|| {
InterpretError::new("No hostname found in secret or override".to_string())
})
}
fn get_port(&self, data: &BTreeMap<String, ByteString>) -> Result<u16, InterpretError> {
self.score
.port_override
.or_else(|| {
decode_secret(data, "port")
.ok()
.and_then(|p| p.parse().ok())
})
.ok_or_else(|| InterpretError::new("Port not found in secret or override".to_string()))
}
fn create_test_script(&self, temp_dir: &Path) -> Result<PathBuf, InterpretError> {
let script_path = temp_dir.join("test_connection.sh");
let script_content = postgres_script_content();
std::fs::write(&script_path, script_content)
.map_err(|e| InterpretError::new(format!("Failed to write test script: {e}")))?;
debug!("Wrote script content : \n{script_content}");
#[cfg(unix)]
{
use std::os::unix::fs::PermissionsExt;
let mut perms = std::fs::metadata(&script_path)
.map_err(|e| InterpretError::new(format!("Failed to get script metadata: {e}")))?
.permissions();
perms.set_mode(0o755);
std::fs::set_permissions(&script_path, perms).map_err(|e| {
InterpretError::new(format!("Failed to set script permissions: {e}"))
})?;
}
Ok(script_path)
}
async fn run_docker_test(
&self,
temp_dir: &Path,
cmd: &str,
password: &str,
) -> Result<Outcome, InterpretError> {
info!("Running connection test in Docker container...");
let container_cmd = format!("PGPASSWORD={} /tmp/test_connection.sh {}", password, cmd);
debug!("Starting docker container with cmd : {container_cmd}");
let mut cmd = Command::new("docker");
cmd.arg("run")
.arg("--rm")
.arg("-i")
.arg("-v")
.arg(format!("{}/:/tmp", temp_dir.display()))
.arg("--workdir")
.arg("/tmp")
.arg("--entrypoint")
.arg("/bin/sh")
.arg("postgres:latest")
.arg("-c")
.arg(container_cmd)
.env("PGPASSWORD", password)
.stdout(std::process::Stdio::inherit())
.stderr(std::process::Stdio::inherit());
debug!("Running Command {cmd:?}");
let output = cmd
.spawn()
.map_err(|e| InterpretError::new(format!("Failed to spawn docker container: {e}")))?
.wait_with_output()
.await
.map_err(|e| {
InterpretError::new(format!("Failed to wait for docker container: {e}"))
})?;
if output.status.success() {
info!("Successfully connected to PostgreSQL!");
Ok(Outcome::success("Connection successful".to_string()))
} else {
let stdout = String::from_utf8_lossy(&output.stdout);
let stderr = String::from_utf8_lossy(&output.stderr);
error!("Connection failed: stdout:\n{stdout}\nstderr:\n{stderr}");
Err(InterpretError::new(format!(
"Connection failed: stdout:\n{stdout}\nstderr:\n{stderr}",
)))
}
}
}
#[async_trait]
impl<T: Topology + K8sclient + Send + Sync> Interpret<T> for PostgreSQLConnectionInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLConnectionInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Fetch secrets
let app_secret = self.fetch_app_secret(topo).await?;
trace!("Got app_secret {app_secret:?}");
let ca_secret = self.fetch_ca_secret(topo).await?;
trace!("Got ca_secret {ca_secret:?}");
// Get secret data
let app_data = self.get_secret_data(&app_secret, "App")?;
trace!("Got app_data {app_data:?}");
let ca_data = self.get_secret_data(&ca_secret, "CA")?;
trace!("Got ca_data {ca_data:?}");
// Create temp directory
let temp_dir = self.create_temp_dir()?;
let temp_dir_path = temp_dir.path();
debug!("Created temp dir {temp_dir_path:?}");
// Write CA cert
let ca_file = self.write_ca_cert(temp_dir_path, &ca_data)?;
debug!("Wrote ca_file {ca_file:?}");
// Get connection details
let username = decode_secret(&app_data, "username")?;
let password = decode_secret(&app_data, "password")?;
let dbname = decode_secret(&app_data, "dbname")?;
let host = self.get_host(&app_data)?;
let port = self.get_port(&app_data)?;
// Create test script
let script_path = self.create_test_script(temp_dir_path)?;
let ca_file_in_container = Path::new("/tmp").join(ca_file.file_name().unwrap());
let script_cmd = format!(
"{host} {port} {username} {dbname} {}",
ca_file_in_container.display()
);
debug!("Prepared test script in {}", temp_dir_path.display());
// Run connection test
self.run_docker_test(temp_dir_path, &script_cmd, &password)
.await
}
}
fn postgres_script_content() -> &'static str {
r#"
#!/bin/sh
# PostgreSQL connection test and metrics collection script
# Basic connectivity test
echo "=== CONNECTION TEST ==="
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT 1" > /dev/null 2>&1
if [ $? -ne 0 ]; then
echo "ERROR: Connection failed"
exit 1
fi
echo "Connection successful"
# Database size metrics
echo -e "\n=== DATABASE SIZE METRICS ==="
echo "Total database size (MB):"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT pg_size_pretty(pg_database_size(current_database()))" -t -A
echo "Database size breakdown:"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
schema_name,
pg_size_pretty(sum(table_size)) as total_size
FROM (
SELECT
n.nspname as schema_name,
c.relname as table_name,
pg_total_relation_size(c.oid) as table_size
FROM pg_class c
LEFT JOIN pg_namespace n ON n.oid = c.relnamespace
WHERE n.nspname NOT IN ('pg_catalog', 'information_schema')
AND c.relkind = 'r'
) t
GROUP BY schema_name
ORDER BY sum(table_size) DESC" -t
# Connection and activity metrics
echo -e "\n=== CONNECTION & ACTIVITY ==="
echo "Active connections:"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT count(*) FROM pg_stat_activity" -t -A
echo "Current queries (running longer than 1 second):"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
pid,
usename,
query_start,
now() - query_start as duration,
state,
left(query, 50) as query_preview
FROM pg_stat_activity
WHERE state = 'active' AND now() - query_start > interval '1 second'
ORDER BY duration DESC" -t
# Performance metrics
echo -e "\n=== PERFORMANCE METRICS ==="
echo "Database load (transactions per second):"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
tps,
tps_commit,
tps_rollback,
blks_read,
blks_hit,
hit_ratio
FROM (
SELECT
xact_commit as tps_commit,
xact_rollback as tps_rollback,
(xact_commit + xact_rollback) as tps,
blks_read,
blks_hit,
CASE WHEN blks_read + blks_hit = 0 THEN 0 ELSE (blks_hit * 100.0 / (blks_read + blks_hit))::numeric(5,2) END as hit_ratio
FROM pg_stat_database
WHERE datname = current_database()
) stats" -t
echo "Current locks:"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
count(*) as lock_count,
string_agg(mode, ', ' ORDER BY mode) as lock_modes
FROM pg_locks" -t
# Table statistics
echo -e "\n=== TABLE STATISTICS ==="
echo "Most accessed tables:"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
relname,
seq_scan,
idx_scan,
n_tup_ins,
n_tup_upd,
n_tup_del
FROM pg_stat_user_tables
ORDER BY seq_scan + idx_scan + n_tup_ins + n_tup_upd + n_tup_del DESC
LIMIT 10" -t
# Index usage
echo -e "\n=== INDEX USAGE ==="
echo "Index usage statistics:"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
indexrelname as index_name,
idx_scan,
idx_tup_read,
idx_tup_fetch
FROM pg_stat_user_indexes
ORDER BY idx_scan DESC
LIMIT 5" -t
# Configuration and limits
echo -e "\n=== CONFIGURATION ==="
echo "Current database parameters:"
psql "host=$1 port=$2 user=$3 dbname=$4 sslmode=verify-ca sslrootcert=$5 sslnegotiation=direct" -c "SELECT
name,
setting,
unit
FROM pg_settings
WHERE category = 'Resource Usage'
ORDER BY name" -t
echo -e "\n=== TEST COMPLETE ==="
echo "All metrics collected successfully"
exit 0
"#
}
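Since `new` does not expose `port_override`, a caller needing it can set the public fields directly; the values below are illustrative:
// Illustrative: override both hostname and port when testing a TLS passthrough endpoint.
let score = PostgreSQLConnectionScore {
    name: "postgres-connection-harmony-pg".to_string(),
    namespace: "harmony".to_string(),
    cluster_name: "harmony-pg".to_string(),
    hostname: Some("pg-rw.example.com".to_string()),
    port_override: Some(443),
};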

View File

@@ -0,0 +1,80 @@
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::capability::PostgreSQLConfig;
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::K8sPostgreSQLScore;
/// let score = K8sPostgreSQLScore::new("my-app-ns");
/// ```
///
/// # Limitations (Happy Path)
/// - Requires CNPG operator installed (use CloudNativePgOperatorScore).
/// - No backups, monitoring, extensions configured.
///
/// TODO: refactor this to declare a clean dependency on the CNPG operator. Then the CNPG
/// operator will self-deploy either using OperatorHub or a Helm chart depending on the k8s
/// flavor. This is CNPG specific behavior.
#[derive(Debug, Clone, Serialize)]
pub struct K8sPostgreSQLScore {
pub config: PostgreSQLConfig,
}
impl Default for K8sPostgreSQLScore {
fn default() -> Self {
Self {
config: PostgreSQLConfig::default(),
}
}
}
impl K8sPostgreSQLScore {
pub fn new(namespace: &str) -> Self {
Self {
config: PostgreSQLConfig {
namespace: namespace.to_string(),
..Default::default()
},
}
}
}
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let metadata = ObjectMeta {
name: Some(self.config.cluster_name.clone()),
namespace: Some(self.config.namespace.clone()),
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.config.instances,
storage: Storage {
size: self.config.storage_size.to_string(),
},
bootstrap: Bootstrap {
initdb: Initdb {
database: "app".to_string(),
owner: "app".to_string(),
},
},
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
K8sResourceScore::single(cluster, Some(self.config.namespace.clone())).create_interpret()
}
fn name(&self) -> String {
format!("PostgreSQLScore({})", self.config.namespace)
}
}

View File

@@ -0,0 +1,104 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::data::Version;
use crate::domain::topology::router::{TlsRoute, TlsRouter};
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::postgresql::capability::{PostgreSQL, PostgreSQLConfig};
use crate::score::Score;
use crate::topology::Topology;
/// Deploys a public PostgreSQL cluster: CNPG + TLS passthrough route for RW endpoint.
/// For failover/multisite: exposes single-instance or small HA Postgres publicly.
///
/// Sequence: PostgreSQLScore → TlsRouter::install_route (RW backend).
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
/// ```
#[derive(Debug, Clone, Serialize)]
pub struct PublicPostgreSQLScore {
/// Inner non-public Postgres cluster config.
pub config: PostgreSQLConfig,
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
pub hostname: String,
}
impl PublicPostgreSQLScore {
pub fn new(namespace: &str, hostname: &str) -> Self {
Self {
config: PostgreSQLConfig::default().with_namespace(namespace),
hostname: hostname.to_string(),
}
}
}
impl<T: Topology + PostgreSQL + TlsRouter + Send + Sync> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let rw_backend = format!("{}-rw", self.config.cluster_name);
let tls_route = TlsRoute {
namespace: self.config.namespace.clone(),
hostname: self.hostname.clone(),
backend: rw_backend,
target_port: 5432,
};
Box::new(PublicPostgreSQLInterpret {
config: self.config.clone(),
tls_route,
})
}
fn name(&self) -> String {
format!(
"PublicPostgreSQLScore({}:{})",
self.config.namespace, self.hostname
)
}
}
/// Custom interpret: deploy Postgres then install public TLS route.
#[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret {
config: PostgreSQLConfig,
tls_route: TlsRoute,
}
#[async_trait]
impl<T: Topology + PostgreSQL + TlsRouter + Send + Sync> Interpret<T>
for PublicPostgreSQLInterpret
{
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PublicPostgreSQLInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
// Deploy CNPG cluster first (creates -rw service)
topo.deploy(&self.config)
.await
.map_err(|e| InterpretError::new(e))?;
// Expose RW publicly via TLS passthrough
topo.install_route(self.tls_route.clone())
.await
.map_err(|e| InterpretError::new(e))?;
Ok(Outcome::success(format!(
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
self.config.cluster_name.clone(),
self.tls_route.hostname
)))
}
}
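A sketch tying this together with the connection test score defined earlier; the hostname is an illustrative placeholder:
// Illustrative: deploy a public cluster, then verify external reachability.
let public = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
// Default cluster_name is "harmony-pg" (see PostgreSQLConfig::default()).
let mut check = PostgreSQLConnectionScore::new(
    "harmony",
    "harmony-pg",
    Some("pg-rw.example.com".to_string()),
);
// The passthrough route listens on 443, not the in-secret 5432, so override the port.
check.port_override = Some(443);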

View File

@@ -1,11 +0,0 @@
cargo build -p harmony_inventory_agent --release --target x86_64-unknown-linux-musl
SCRIPT_DIR="$(dirname ${0})"
cd "${SCRIPT_DIR}/docker/"
cp ../../target/x86_64-unknown-linux-musl/release/harmony_inventory_agent .
docker build . -t hub.nationtech.io/harmony/harmony_inventory_agent
docker push hub.nationtech.io/harmony/harmony_inventory_agent

View File

@@ -1 +0,0 @@
harmony_inventory_agent

View File

@@ -1,17 +0,0 @@
FROM debian:12-slim
# install packages required to make these commands available : lspci, lsmod, dmidecode, smartctl, ip
RUN apt-get update && \
apt-get install -y --no-install-recommends pciutils kmod dmidecode smartmontools iproute2 && \
rm -rf /var/lib/apt/lists/*
RUN mkdir /app
WORKDIR /app/
COPY harmony_inventory_agent /app/
ENV RUST_LOG=info
CMD [ "/app/harmony_inventory_agent" ]

View File

@@ -1,117 +0,0 @@
apiVersion: v1
kind: Namespace
metadata:
name: harmony-inventory-agent
labels:
pod-security.kubernetes.io/enforce: privileged
pod-security.kubernetes.io/audit: privileged
pod-security.kubernetes.io/warn: privileged
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: harmony-inventory-agent
namespace: harmony-inventory-agent
---
# Grant the built-in "privileged" SCC to the SA
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: use-privileged-scc
namespace: harmony-inventory-agent
rules:
- apiGroups: ["security.openshift.io"]
resources: ["securitycontextconstraints"]
resourceNames: ["privileged"]
verbs: ["use"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: use-privileged-scc
namespace: harmony-inventory-agent
subjects:
- kind: ServiceAccount
name: harmony-inventory-agent
namespace: harmony-inventory-agent
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: use-privileged-scc
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: harmony-inventory-agent
namespace: harmony-inventory-agent
spec:
selector:
matchLabels:
app: harmony-inventory-agent
template:
metadata:
labels:
app: harmony-inventory-agent
spec:
serviceAccountName: harmony-inventory-agent
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
tolerations:
- key: "node-role.kubernetes.io/master"
operator: "Exists"
effect: "NoSchedule"
containers:
- name: inventory-agent
image: hub.nationtech.io/harmony/harmony_inventory_agent
imagePullPolicy: Always
env:
- name: RUST_LOG
value: "harmony_inventory_agent=trace,info"
resources:
limits:
cpu: 200m
memory: 256Mi
requests:
cpu: 100m
memory: 128Mi
securityContext:
privileged: true
# optional: leave the rest unset since privileged SCC allows it
#
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: harmony-inventory-builder
namespace: harmony-inventory-agent
spec:
replicas: 1
strategy: {}
selector:
matchLabels:
app: harmony-inventory-builder
template:
metadata:
labels:
app: harmony-inventory-builder
spec:
serviceAccountName: harmony-inventory-agent
hostNetwork: true
dnsPolicy: ClusterFirstWithHostNet
containers:
- name: inventory-agent
image: hub.nationtech.io/harmony/harmony_inventory_builder
imagePullPolicy: Always
env:
- name: RUST_LOG
value: "harmony_inventory_builder=trace,info"
resources:
limits:
cpu: 200m
memory: 256Mi
requests:
cpu: 100m
memory: 128Mi
securityContext:
privileged: true
# optional: leave the rest unset since privileged SCC allows it

View File

@@ -1,5 +1,5 @@
use harmony_types::net::MacAddress;
use log::{debug, trace, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::fs;
@@ -121,48 +121,20 @@ pub struct ManagementInterface {
impl PhysicalHost {
pub fn gather() -> Result<Self, String> {
trace!("Start gathering physical host information");
let mut sys = System::new_all();
trace!("System new_all called");
sys.refresh_all();
trace!("System refresh_all called");
Self::all_tools_available()?;
trace!("All tools_available success");
let storage_drives = Self::gather_storage_drives()?;
trace!("got storage drives");
let storage_controller = Self::gather_storage_controller()?;
trace!("got storage controller");
let memory_modules = Self::gather_memory_modules()?;
trace!("got memory_modules");
let cpus = Self::gather_cpus(&sys)?;
trace!("got cpus");
let chipset = Self::gather_chipset()?;
trace!("got chipsets");
let network_interfaces = Self::gather_network_interfaces()?;
trace!("got network_interfaces");
let management_interface = Self::gather_management_interface()?;
trace!("got management_interface");
let host_uuid = Self::get_host_uuid()?;
Ok(Self {
storage_drives,
storage_controller,
memory_modules,
cpus,
chipset,
network_interfaces,
management_interface,
host_uuid,
storage_drives: Self::gather_storage_drives()?,
storage_controller: Self::gather_storage_controller()?,
memory_modules: Self::gather_memory_modules()?,
cpus: Self::gather_cpus(&sys)?,
chipset: Self::gather_chipset()?,
network_interfaces: Self::gather_network_interfaces()?,
management_interface: Self::gather_management_interface()?,
host_uuid: Self::get_host_uuid()?,
})
}
@@ -236,8 +208,6 @@ impl PhysicalHost {
));
}
debug!("All tools found!");
Ok(())
}
@@ -261,10 +231,7 @@ impl PhysicalHost {
fn gather_storage_drives() -> Result<Vec<StorageDrive>, String> {
let mut drives = Vec::new();
trace!("Starting storage drive discovery using lsblk");
// Use lsblk with JSON output for robust parsing
trace!("Executing 'lsblk -d -o NAME,MODEL,SERIAL,SIZE,ROTA,WWN -n -e 7 --json'");
let output = Command::new("lsblk")
.args([
"-d",
@@ -278,18 +245,13 @@ impl PhysicalHost {
.output()
.map_err(|e| format!("Failed to execute lsblk: {}", e))?;
trace!(
"lsblk command executed successfully (status: {:?})",
output.status
);
if !output.status.success() {
let stderr_str = String::from_utf8_lossy(&output.stderr);
debug!("lsblk command failed: {stderr_str}");
return Err(format!("lsblk command failed: {stderr_str}"));
return Err(format!(
"lsblk command failed: {}",
String::from_utf8_lossy(&output.stderr)
));
}
trace!("Parsing lsblk JSON output");
let json: Value = serde_json::from_slice(&output.stdout)
.map_err(|e| format!("Failed to parse lsblk JSON output: {}", e))?;
@@ -298,8 +260,6 @@ impl PhysicalHost {
.and_then(|v| v.as_array())
.ok_or("Invalid lsblk JSON: missing 'blockdevices' array")?;
trace!("Found {} blockdevices in lsblk output", blockdevices.len());
for device in blockdevices {
let name = device
.get("name")
@@ -308,72 +268,52 @@ impl PhysicalHost {
.to_string();
if name.is_empty() {
trace!("Skipping unnamed device entry: {:?}", device);
continue;
}
trace!("Inspecting block device: {name}");
// Extract metadata fields
let model = device
.get("model")
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.unwrap_or_default();
trace!("Model for {name}: '{}'", model);
let serial = device
.get("serial")
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.unwrap_or_default();
trace!("Serial for {name}: '{}'", serial);
let size_str = device
.get("size")
.and_then(|v| v.as_str())
.ok_or("Missing 'size' in lsblk device")?;
trace!("Reported size for {name}: {}", size_str);
let size_bytes = Self::parse_size(size_str)?;
trace!("Parsed size for {name}: {} bytes", size_bytes);
let rotational = device
.get("rota")
.and_then(|v| v.as_bool())
.ok_or("Missing 'rota' in lsblk device")?;
trace!("Rotational flag for {name}: {}", rotational);
let wwn = device
.get("wwn")
.and_then(|v| v.as_str())
.map(|s| s.trim().to_string())
.filter(|s| !s.is_empty() && s != "null");
trace!("WWN for {name}: {:?}", wwn);
let device_path = Path::new("/sys/block").join(&name);
trace!("Sysfs path for {name}: {:?}", device_path);
trace!("Reading logical block size for {name}");
let logical_block_size = Self::read_sysfs_u32(
&device_path.join("queue/logical_block_size"),
)
.map_err(|e| format!("Failed to read logical block size for {}: {}", name, e))?;
trace!("Logical block size for {name}: {}", logical_block_size);
trace!("Reading physical block size for {name}");
let physical_block_size = Self::read_sysfs_u32(
&device_path.join("queue/physical_block_size"),
)
.map_err(|e| format!("Failed to read physical block size for {}: {}", name, e))?;
trace!("Physical block size for {name}: {}", physical_block_size);
trace!("Determining interface type for {name}");
let interface_type = Self::get_interface_type(&name, &device_path)?;
trace!("Interface type for {name}: {}", interface_type);
trace!("Getting SMART status for {name}");
let smart_status = Self::get_smart_status(&name)?;
trace!("SMART status for {name}: {:?}", smart_status);
let mut drive = StorageDrive {
name: name.clone(),
@@ -390,31 +330,19 @@ impl PhysicalHost {
// Enhance with additional sysfs info if available
if device_path.exists() {
trace!("Enhancing drive {name} with extra sysfs metadata");
if drive.model.is_empty() {
trace!("Reading model from sysfs for {name}");
drive.model = Self::read_sysfs_string(&device_path.join("device/model"))
.unwrap_or_else(|_| format!("Failed to read model for {}", name));
.unwrap_or(format!("Failed to read model for {}", name));
}
if drive.serial.is_empty() {
trace!("Reading serial from sysfs for {name}");
drive.serial = Self::read_sysfs_string(&device_path.join("device/serial"))
.unwrap_or_else(|_| format!("Failed to read serial for {}", name));
.unwrap_or(format!("Failed to read serial for {}", name));
}
} else {
trace!(
"Sysfs path {:?} not found for drive {name}, skipping extra metadata",
device_path
);
}
debug!("Discovered storage drive: {drive:?}");
drives.push(drive);
}
debug!("Discovered total {} storage drives", drives.len());
trace!("All discovered dives: {drives:?}");
Ok(drives)
}
@@ -490,8 +418,6 @@ impl PhysicalHost {
}
}
debug!("Found storage controller {controller:?}");
Ok(controller)
}
@@ -560,7 +486,6 @@ impl PhysicalHost {
}
}
debug!("Found memory modules {modules:?}");
Ok(modules)
}
@@ -576,30 +501,22 @@ impl PhysicalHost {
frequency_mhz: global_cpu.frequency(),
});
debug!("Found cpus {cpus:?}");
Ok(cpus)
}
fn gather_chipset() -> Result<Chipset, String> {
let chipset = Chipset {
Ok(Chipset {
name: Self::read_dmi("baseboard-product-name")?,
vendor: Self::read_dmi("baseboard-manufacturer")?,
};
debug!("Found chipset {chipset:?}");
Ok(chipset)
})
}
fn gather_network_interfaces() -> Result<Vec<NetworkInterface>, String> {
let mut interfaces = Vec::new();
let sys_net_path = Path::new("/sys/class/net");
trace!("Reading /sys/class/net");
let entries = fs::read_dir(sys_net_path)
.map_err(|e| format!("Failed to read /sys/class/net: {}", e))?;
trace!("Got entries {entries:?}");
for entry in entries {
let entry = entry.map_err(|e| format!("Failed to read directory entry: {}", e))?;
@@ -608,7 +525,6 @@ impl PhysicalHost {
.into_string()
.map_err(|_| "Invalid UTF-8 in interface name")?;
let iface_path = entry.path();
trace!("Inspecting interface {iface_name} path {iface_path:?}");
// Skip virtual interfaces
if iface_name.starts_with("lo")
@@ -619,101 +535,70 @@ impl PhysicalHost {
|| iface_name.starts_with("tun")
|| iface_name.starts_with("wg")
{
trace!(
"Skipping interface {iface_name} because it appears to be virtual/unsupported"
);
continue;
}
// Check if it's a physical interface by looking for device directory
if !iface_path.join("device").exists() {
trace!(
"Skipping interface {iface_name} since {iface_path:?}/device does not exist"
);
continue;
}
trace!("Reading MAC address for {iface_name}");
let mac_address = Self::read_sysfs_string(&iface_path.join("address"))
.map_err(|e| format!("Failed to read MAC address for {}: {}", iface_name, e))?;
let mac_address = MacAddress::try_from(mac_address).map_err(|e| e.to_string())?;
trace!("MAC address for {iface_name}: {mac_address}");
let speed_path = iface_path.join("speed");
let speed_mbps = if speed_path.exists() {
trace!("Reading speed for {iface_name} from {:?}", speed_path);
match Self::read_sysfs_u32(&speed_path) {
Ok(speed) => {
trace!("Speed for {iface_name}: {speed} Mbps");
Some(speed)
}
let speed_mbps = if iface_path.join("speed").exists() {
match Self::read_sysfs_u32(&iface_path.join("speed")) {
Ok(speed) => Some(speed),
Err(e) => {
debug!(
"Failed to read speed for {}: {} (this may be expected on WiFi interfaces)",
"Failed to read speed for {}: {} . This is expected to fail on wifi interfaces.",
iface_name, e
);
None
}
}
} else {
trace!("Speed file not found for {iface_name}, skipping");
None
};
trace!("Reading operstate for {iface_name}");
let operstate = Self::read_sysfs_string(&iface_path.join("operstate"))
.map_err(|e| format!("Failed to read operstate for {}: {}", iface_name, e))?;
trace!("Operstate for {iface_name}: {operstate}");
trace!("Reading MTU for {iface_name}");
let mtu = Self::read_sysfs_u32(&iface_path.join("mtu"))
.map_err(|e| format!("Failed to read MTU for {}: {}", iface_name, e))?;
trace!("MTU for {iface_name}: {mtu}");
trace!("Reading driver for {iface_name}");
let driver =
Self::read_sysfs_symlink_basename(&iface_path.join("device/driver/module"))
.map_err(|e| format!("Failed to read driver for {}: {}", iface_name, e))?;
trace!("Driver for {iface_name}: {driver}");
trace!("Reading firmware version for {iface_name}");
let firmware_version = Self::read_sysfs_opt_string(
&iface_path.join("device/firmware_version"),
)
.map_err(|e| format!("Failed to read firmware version for {}: {}", iface_name, e))?;
trace!("Firmware version for {iface_name}: {firmware_version:?}");
trace!("Fetching IP addresses for {iface_name}");
// Get IP addresses using ip command with JSON output
let (ipv4_addresses, ipv6_addresses) = Self::get_interface_ips_json(&iface_name)
.map_err(|e| format!("Failed to get IP addresses for {}: {}", iface_name, e))?;
trace!("Interface {iface_name} has IPv4: {ipv4_addresses:?}, IPv6: {ipv6_addresses:?}");
let is_up = operstate == "up";
trace!("Constructing NetworkInterface for {iface_name} (is_up={is_up})");
let iface = NetworkInterface {
name: iface_name.clone(),
interfaces.push(NetworkInterface {
name: iface_name,
mac_address,
speed_mbps,
is_up,
is_up: operstate == "up",
mtu,
ipv4_addresses,
ipv6_addresses,
driver,
firmware_version,
};
debug!("Discovered interface: {iface:?}");
interfaces.push(iface);
});
}
debug!("Discovered total {} network interfaces", interfaces.len());
trace!("Interfaces collected: {interfaces:?}");
Ok(interfaces)
}
fn gather_management_interface() -> Result<Option<ManagementInterface>, String> {
let mgmt = if Path::new("/dev/ipmi0").exists() {
if Path::new("/dev/ipmi0").exists() {
Ok(Some(ManagementInterface {
kind: "IPMI".to_string(),
address: None,
@@ -727,16 +612,11 @@ impl PhysicalHost {
}))
} else {
Ok(None)
};
debug!("Found management interface {mgmt:?}");
mgmt
}
}
fn get_host_uuid() -> Result<String, String> {
let uuid = Self::read_dmi("system-uuid");
debug!("Found uuid {uuid:?}");
uuid
Self::read_dmi("system-uuid")
}
// Helper methods
@@ -829,8 +709,7 @@ impl PhysicalHost {
Ok("Ramdisk".to_string())
} else {
// Try to determine from device path
let subsystem = Self::read_sysfs_string(&device_path.join("device/subsystem"))
.unwrap_or(String::new());
let subsystem = Self::read_sysfs_string(&device_path.join("device/subsystem"))?;
Ok(subsystem
.split('/')
.next_back()
@@ -900,8 +779,6 @@ impl PhysicalHost {
size.map(|s| s as u64)
}
// FIXME when scanning an interface that is part of a bond/bridge we won't get an address on the
// interface, we should be looking at the bond/bridge device. For example, br-ex on k8s nodes.
fn get_interface_ips_json(iface_name: &str) -> Result<(Vec<String>, Vec<String>), String> {
let mut ipv4 = Vec::new();
let mut ipv6 = Vec::new();

View File

@@ -1,4 +1,4 @@
use log::{debug, error, info, trace, warn};
use log::{debug, error, info, warn};
use mdns_sd::{ServiceDaemon, ServiceInfo};
use std::collections::HashMap;
@@ -12,7 +12,6 @@ use crate::{
/// This function is synchronous and non-blocking. It spawns a background Tokio task
/// to handle the mDNS advertisement for the lifetime of the application.
pub fn advertise(service_port: u16) -> Result<(), PresenceError> {
trace!("starting advertisement process for port {service_port}");
let host_id = match PhysicalHost::gather() {
Ok(host) => Some(host.host_uuid),
Err(e) => {
@@ -21,15 +20,11 @@ pub fn advertise(service_port: u16) -> Result<(), PresenceError> {
}
};
trace!("Found host id {host_id:?}");
let instance_name = format!(
"inventory-agent-{}",
host_id.clone().unwrap_or("unknown".to_string())
);
trace!("Found host id {host_id:?}, name : {instance_name}");
let spawned_msg = format!("Spawned local presence advertisement task for '{instance_name}'.");
tokio::spawn(async move {

View File

@@ -28,7 +28,7 @@ async fn inventory() -> impl Responder {
async fn main() -> std::io::Result<()> {
env_logger::init();
let port = env::var("HARMONY_INVENTORY_AGENT_PORT").unwrap_or_else(|_| "25000".to_string());
let port = env::var("HARMONY_INVENTORY_AGENT_PORT").unwrap_or_else(|_| "8080".to_string());
let port = port
.parse::<u16>()
.expect(&format!("Invalid port number, cannot parse to u16 {port}"));

View File

@@ -135,17 +135,15 @@ pub fn ingress_path(input: TokenStream) -> TokenStream {
#[proc_macro]
pub fn cidrv4(input: TokenStream) -> TokenStream {
let lit = parse_macro_input!(input as LitStr);
let input = parse_macro_input!(input as LitStr);
let cidr_str = input.value();
// This is the IMPORTANT part:
// we re-emit the *string literal itself*
let expanded = quote! {
#lit
.parse::<cidr::Ipv4Cidr>()
.expect("Invalid IPv4 CIDR literal")
};
if cidr_str.parse::<cidr::Ipv4Cidr>().is_ok() {
let expanded = quote! { #cidr_str.parse::<cidr::Ipv4Cidr>().unwrap() };
return TokenStream::from(expanded);
}
TokenStream::from(expanded)
panic!("Invalid IPv4 CIDR : {}", cidr_str);
}
/// Creates a `harmony_types::net::Url::Url` from a string literal.

View File

@@ -9,4 +9,4 @@ license.workspace = true
serde.workspace = true
url.workspace = true
rand.workspace = true
log.workspace = true
serde_json.workspace = true

View File

@@ -1,3 +1,5 @@
pub mod id;
pub mod net;
pub mod rfc1123;
pub mod storage;
pub mod switch;

View File

@@ -0,0 +1,231 @@
/// A String that can be used as a subdomain.
///
/// This means the name must:
///
/// - contain no more than 253 characters
/// - contain only lowercase alphanumeric characters, '-' or '.'
/// - start with an alphanumeric character
/// - end with an alphanumeric character
///
/// <https://datatracker.ietf.org/doc/html/rfc1123>
///
/// This is relevant in harmony since most k8s resource names are required to be usable as dns
/// subdomains.
///
/// See <https://kubernetes.io/docs/concepts/overview/working-with-objects/names/>
#[derive(Debug, Clone)]
pub struct Rfc1123Name {
content: String,
}
impl TryFrom<&str> for Rfc1123Name {
type Error = String;
fn try_from(s: &str) -> Result<Self, String> {
let mut content = s.to_lowercase();
// Remove invalid characters
content.retain(|c| c.is_ascii_alphanumeric() || c == '-' || c == '.');
// Enforce max length
if content.len() > 253 {
content.truncate(253);
}
// Trim leading/trailing dots
content = content.trim_matches('.').to_string();
// Deduplicate consecutive dots
loop {
let new_content = content.replace("..", ".");
if new_content == content {
break;
}
content = new_content;
}
// Trim leading/trailing non-alphanumeric
content = content
.trim_matches(|c: char| !c.is_ascii_alphanumeric())
.to_string();
if content.is_empty() {
return Err(format!("Input '{}' resulted in empty string", s));
}
Ok(Self { content })
}
}
/// Converts an `Rfc1123Name` into a `String`.
///
/// This allows using `Rfc1123Name` in contexts where a `String` is expected.
impl From<Rfc1123Name> for String {
fn from(name: Rfc1123Name) -> Self {
name.content
}
}
/// Serializes the `Rfc1123Name` as a string.
///
/// This directly serializes the inner `String` content without additional wrapping.
impl serde::Serialize for Rfc1123Name {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
serializer.serialize_str(&self.content)
}
}
/// Deserializes an `Rfc1123Name` from a string.
///
/// This directly deserializes into the inner `String` content without additional wrapping.
/// Note that this path does not re-apply the `TryFrom` normalization, so already-normalized
/// input is assumed.
impl<'de> serde::Deserialize<'de> for Rfc1123Name {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
let content = String::deserialize(deserializer)?;
Ok(Self { content })
}
}
/// Displays the `Rfc1123Name` as a string.
///
/// This directly displays the inner `String` content without additional wrapping.
impl std::fmt::Display for Rfc1123Name {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.content)
}
}
#[cfg(test)]
mod tests {
use super::Rfc1123Name;
#[test]
fn test_try_from_empty() {
let name = Rfc1123Name::try_from("");
assert!(name.is_err());
}
#[test]
fn test_try_from_valid() {
let name = Rfc1123Name::try_from("hello-world").unwrap();
assert_eq!(name.content, "hello-world");
}
#[test]
fn test_try_from_uppercase() {
let name = Rfc1123Name::try_from("Hello-World").unwrap();
assert_eq!(name.content, "hello-world");
}
#[test]
fn test_try_from_invalid_chars() {
let name = Rfc1123Name::try_from("hel@lo#w!or%ld123").unwrap();
assert_eq!(name.content, "helloworld123");
}
#[test]
fn test_try_from_leading_dot() {
let name = Rfc1123Name::try_from(".hello").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_trailing_dot() {
let name = Rfc1123Name::try_from("hello.").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_leading_hyphen() {
let name = Rfc1123Name::try_from("-hello").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_complicated_string() {
let name = Rfc1123Name::try_from("--h--e,}{}12!$#)\np_aulbS\r\t.!@o--._--").unwrap();
assert_eq!(name.content, "h--e12paulbs.o");
}
#[test]
fn test_try_from_trailing_hyphen() {
let name = Rfc1123Name::try_from("hello-").unwrap();
assert_eq!(name.content, "hello");
}
#[test]
fn test_try_from_single_hyphen() {
let name = Rfc1123Name::try_from("-");
assert!(name.is_err());
}
#[test]
fn test_from_str() {
let name: Rfc1123Name = "test-name".try_into().unwrap();
assert_eq!(name.content, "test-name");
}
#[test]
fn test_into_string() {
let name = Rfc1123Name::try_from("test").unwrap();
let s: String = name.into();
assert_eq!(s, "test");
}
#[test]
fn test_compliance() {
let inputs = vec![
"valid",
"in-VALID",
".dots",
"-hyphen",
"hyphen-",
"!!1@",
"aaaaaaaaaa",
"--abc--",
"a.b-c",
];
for input in inputs {
let name = Rfc1123Name::try_from(input).unwrap();
let s = &name.content;
// Check only allowed characters
for c in s.chars() {
assert!(c.is_ascii_alphanumeric() || c == '-' || c == '.');
}
// Check starts and ends with alphanumeric
if !s.is_empty() {
assert!(s.chars().next().unwrap().is_ascii_alphanumeric());
assert!(s.chars().last().unwrap().is_ascii_alphanumeric());
}
}
}
#[test]
fn test_enforces_max_length() {
let long_input = "a".repeat(300);
let name = Rfc1123Name::try_from(long_input.as_str()).unwrap();
assert_eq!(name.content.len(), 253);
assert_eq!(name.content, "a".repeat(253));
}
#[test]
fn test_truncate_trim_end() {
let input = "a".repeat(252) + "-";
let name = Rfc1123Name::try_from(input.as_str()).unwrap();
assert_eq!(name.content.len(), 252);
assert_eq!(name.content, "a".repeat(252));
}
#[test]
fn test_dedup_dots() {
let input = "a..b...c";
let name = Rfc1123Name::try_from(input).unwrap();
assert_eq!(name.content, "a.b.c");
}
}
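A usage sketch for the type above (assuming the harmony_types::rfc1123 module path from the lib.rs change earlier in this diff):

use harmony_types::rfc1123::Rfc1123Name;

// Arbitrary input is normalized into a DNS-subdomain-safe name.
let name = Rfc1123Name::try_from("My App_v2!").expect("non-empty after normalization");
assert_eq!(name.to_string(), "myappv2");

// Only inputs with no salvageable characters are rejected.
assert!(Rfc1123Name::try_from("!!!").is_err());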

View File

@@ -0,0 +1,171 @@
use serde::{Deserialize, Serialize};
use std::fmt;
#[derive(Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
pub struct StorageSize {
size_bytes: u64,
#[serde(skip)]
display_value: Option<u64>,
#[serde(skip)]
display_suffix: Option<String>,
}
impl StorageSize {
pub fn new(size_bytes: u64) -> Self {
Self {
size_bytes,
display_value: None,
display_suffix: None,
}
}
pub fn b(size: u64) -> Self {
Self {
size_bytes: size,
display_value: Some(size),
display_suffix: Some("B".to_string()),
}
}
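/// Note: despite the decimal-style suffixes, `kb`/`mb`/`gb`/`tb` all use
/// 1024-based multipliers, so `StorageSize::gb(1)` and `StorageSize::gi(1)`
/// hold the same number of bytes and differ only in their display suffix.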
pub fn kb(size: u64) -> Self {
Self {
size_bytes: size * 1024,
display_value: Some(size),
display_suffix: Some("KB".to_string()),
}
}
pub fn mb(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024,
display_value: Some(size),
display_suffix: Some("MB".to_string()),
}
}
pub fn gb(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024 * 1024,
display_value: Some(size),
display_suffix: Some("GB".to_string()),
}
}
pub fn gi(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024 * 1024,
display_value: Some(size),
display_suffix: Some("Gi".to_string()),
}
}
pub fn tb(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024 * 1024 * 1024,
display_value: Some(size),
display_suffix: Some("TB".to_string()),
}
}
pub fn ti(size: u64) -> Self {
Self {
size_bytes: size * 1024 * 1024 * 1024 * 1024,
display_value: Some(size),
display_suffix: Some("Ti".to_string()),
}
}
pub fn bytes(&self) -> u64 {
self.size_bytes
}
}
impl fmt::Display for StorageSize {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
if let Some(suffix) = &self.display_suffix {
let value = self.display_value.unwrap_or(self.size_bytes);
write!(f, "{}{}", value, suffix)
} else {
write!(f, "{}B", self.size_bytes)
}
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_bytes() {
let size = StorageSize::b(123);
assert_eq!(size.bytes(), 123);
assert_eq!(size.to_string(), "123B");
}
#[test]
fn test_kilobytes() {
let size = StorageSize::kb(2);
assert_eq!(size.bytes(), 2048);
assert_eq!(size.to_string(), "2KB");
}
#[test]
fn test_megabytes() {
let size = StorageSize::mb(3);
assert_eq!(size.bytes(), 3 * 1024 * 1024);
assert_eq!(size.to_string(), "3MB");
}
#[test]
fn test_gigabytes() {
let size = StorageSize::gb(4);
assert_eq!(size.bytes(), 4 * 1024 * 1024 * 1024);
assert_eq!(size.to_string(), "4GB");
}
#[test]
fn test_gibibytes() {
let size = StorageSize::gi(1);
assert_eq!(size.bytes(), 1024 * 1024 * 1024);
assert_eq!(size.to_string(), "1Gi");
}
#[test]
fn test_terabytes() {
let size = StorageSize::tb(5);
assert_eq!(size.bytes(), 5 * 1024 * 1024 * 1024 * 1024);
assert_eq!(size.to_string(), "5TB");
}
#[test]
fn test_tebibytes() {
let size = StorageSize::ti(1);
assert_eq!(size.bytes(), 1024 * 1024 * 1024 * 1024);
assert_eq!(size.to_string(), "1Ti");
}
#[test]
fn test_new_without_suffix() {
let size = StorageSize::new(999);
assert_eq!(size.bytes(), 999);
assert_eq!(size.to_string(), "999B");
}
#[test]
fn test_serde_roundtrip() {
let original = StorageSize::gi(1);
let serialized = serde_json::to_string(&original).unwrap();
let deserialized: StorageSize = serde_json::from_str(&serialized).unwrap();
assert_eq!(original.bytes(), deserialized.bytes());
// Note: suffix is lost during serialization/deserialization
assert_ne!(original.to_string(), deserialized.to_string());
}
#[test]
fn test_ord() {
let one_gb = StorageSize::gb(1);
let one_gi = StorageSize::gi(1);
// Both values are 1024^3 bytes with this implementation, so the derived Ord
// falls through to the display fields, where "GB" sorts before "Gi" lexicographically.
assert!(one_gb < one_gi);
}
}
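A usage sketch (assuming the harmony_types::storage module path from the lib.rs change above):

use harmony_types::storage::StorageSize;

let volume = StorageSize::gi(10);
assert_eq!(volume.bytes(), 10 * 1024 * 1024 * 1024);
// Display keeps the unit chosen at construction time, which matches
// k8s resource quantity syntax such as "10Gi".
assert_eq!(volume.to_string(), "10Gi");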

View File

@@ -1,5 +1,3 @@
use log::trace;
use serde::Serialize;
use std::{fmt, str::FromStr};
/// Simple error type for port parsing failures.
@@ -23,7 +21,7 @@ impl fmt::Display for PortParseError {
/// Represents the atomic, physical location of a switch port: `<Stack>/<Module>/<Port>`.
///
/// Example: `1/1/1`
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, Serialize)]
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub struct PortLocation(pub u8, pub u8, pub u8);
impl fmt::Display for PortLocation {
@@ -72,12 +70,6 @@ impl FromStr for PortLocation {
pub enum PortDeclaration {
/// A single switch port defined by its location. Example: `PortDeclaration::Single(1/1/1)`
Single(PortLocation),
/// A Named port, often used for virtual ports such as PortChannels. Example
/// ```rust
/// # use harmony_types::switch::PortDeclaration;
/// PortDeclaration::Named("1".to_string());
/// ```
Named(String),
/// A strictly sequential range defined by two endpoints using the hyphen separator (`-`).
/// All ports between the endpoints (inclusive) are implicitly included.
/// Example: `PortDeclaration::Range(1/1/1, 1/1/4)`
@@ -138,25 +130,8 @@ impl PortDeclaration {
return Ok(PortDeclaration::Set(start_port, end_port));
}
match PortLocation::from_str(port_str) {
Ok(loc) => Ok(PortDeclaration::Single(loc)),
Err(e) => {
let segments: Vec<&str> = port_str.split('/').collect();
let segment_count = segments.len();
// Logic:
// If it has 3 segments but failed (e.g., "1/A/1"), it's an InvalidSegment.
// If it has MORE than 3 segments (e.g., "1/1/1/1" or "1/1/1/"), it's an InvalidFormat.
if segment_count >= 3 {
return Err(e);
}
// Otherwise, it's something else entirely (e.g., "eth0", "vlan10"),
// so we treat it as a Named port.
trace!("Falling back on named port for: {port_str}");
Ok(PortDeclaration::Named(port_str.to_string()))
}
}
let location = PortLocation::from_str(port_str)?;
Ok(PortDeclaration::Single(location))
}
}
@@ -166,7 +141,6 @@ impl fmt::Display for PortDeclaration {
PortDeclaration::Single(port) => write!(f, "{port}"),
PortDeclaration::Range(start, end) => write!(f, "{start}-{end}"),
PortDeclaration::Set(start, end) => write!(f, "{start}*{end}"),
PortDeclaration::Named(name) => write!(f, "{name}"),
}
}
}
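The surviving parsing path can be exercised directly through FromStr; a short sketch based on the types shown in this hunk:

use std::str::FromStr;

// "1/1/4" parses into its stack/module/port components and round-trips via Display.
let loc = PortLocation::from_str("1/1/4").unwrap();
assert_eq!(loc, PortLocation(1, 1, 4));
assert_eq!(loc.to_string(), "1/1/4");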

View File

@@ -106,37 +106,11 @@ pub struct HAProxy {
pub groups: MaybeString,
pub users: MaybeString,
pub cpus: MaybeString,
pub resolvers: HAProxyResolvers,
pub resolvers: MaybeString,
pub mailers: MaybeString,
pub maintenance: Maintenance,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyResolvers {
#[yaserde(rename = "resolver")]
pub resolver: Option<Resolver>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Resolver {
pub id: String,
pub enabled: i32,
pub name: String,
pub description: MaybeString,
pub nameservers: String,
pub parse_resolv_conf: String,
pub resolve_retries: i32,
pub timeout_resolve: String,
pub timeout_retry: String,
pub accepted_payload_size: MaybeString,
pub hold_valid: MaybeString,
pub hold_obsolete: MaybeString,
pub hold_refused: MaybeString,
pub hold_nx: MaybeString,
pub hold_timeout: MaybeString,
pub hold_other: MaybeString,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Maintenance {
#[yaserde(rename = "cronjobs")]

View File

@@ -136,7 +136,6 @@ pub struct Rule {
pub updated: Option<Updated>,
pub created: Option<Created>,
pub disabled: Option<MaybeString>,
pub log: Option<u32>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -196,7 +195,7 @@ pub struct System {
pub disablechecksumoffloading: u8,
pub disablesegmentationoffloading: u8,
pub disablelargereceiveoffloading: u8,
pub ipv6allow: Option<u8>,
pub ipv6allow: u8,
pub powerd_ac_mode: String,
pub powerd_battery_mode: String,
pub powerd_normal_mode: String,
@@ -217,7 +216,7 @@ pub struct System {
pub maximumfrags: Option<MaybeString>,
pub aliasesresolveinterval: Option<MaybeString>,
pub maximumtableentries: Option<MaybeString>,
pub language: Option<String>,
pub language: String,
pub dnsserver: Option<MaybeString>,
pub dns1gw: Option<String>,
pub dns2gw: Option<String>,
@@ -227,7 +226,6 @@ pub struct System {
pub dns6gw: Option<String>,
pub dns7gw: Option<String>,
pub dns8gw: Option<String>,
pub prefer_ipv4: Option<String>,
pub dnsallowoverride: u8,
pub dnsallowoverride_exclude: Option<MaybeString>,
}
@@ -331,7 +329,6 @@ pub struct Range {
pub struct StaticMap {
pub mac: String,
pub ipaddr: String,
pub cid: Option<MaybeString>,
pub hostname: String,
pub descr: Option<MaybeString>,
pub winsserver: MaybeString,
@@ -767,19 +764,9 @@ pub struct Jobs {
pub struct Job {
#[yaserde(attribute = true)]
pub uuid: MaybeString,
pub name: Option<MaybeString>,
#[yaserde(rename = "name")]
pub name: MaybeString,
// Add other fields as needed
pub origin: Option<MaybeString>,
pub enabled: Option<MaybeString>,
pub minutes: Option<MaybeString>,
pub hours: Option<MaybeString>,
pub days: Option<MaybeString>,
pub months: Option<MaybeString>,
pub weekdays: Option<MaybeString>,
pub who: Option<MaybeString>,
pub command: Option<MaybeString>,
pub parameters: Option<MaybeString>,
pub description: Option<MaybeString>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -908,28 +895,28 @@ pub struct Proxy {
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct ProxyGeneral {
pub enabled: i8,
pub error_pages: Option<MaybeString>,
pub error_pages: String,
#[yaserde(rename = "icpPort")]
pub icp_port: MaybeString,
pub logging: Logging,
#[yaserde(rename = "alternateDNSservers")]
pub alternate_dns_servers: MaybeString,
#[yaserde(rename = "dnsV4First")]
pub dns_v4_first: Option<MaybeString>,
pub dns_v4_first: i8,
#[yaserde(rename = "forwardedForHandling")]
pub forwarded_for_handling: Option<MaybeString>,
pub forwarded_for_handling: String,
#[yaserde(rename = "uriWhitespaceHandling")]
pub uri_whitespace_handling: Option<MaybeString>,
pub uri_whitespace_handling: String,
#[yaserde(rename = "enablePinger")]
pub enable_pinger: i8,
#[yaserde(rename = "useViaHeader")]
pub use_via_header: Option<MaybeString>,
pub use_via_header: i8,
#[yaserde(rename = "suppressVersion")]
pub suppress_version: Option<MaybeString>,
pub suppress_version: i32,
#[yaserde(rename = "connecttimeout")]
pub connect_timeout: Option<MaybeString>,
pub connect_timeout: MaybeString,
#[yaserde(rename = "VisibleEmail")]
pub visible_email: Option<MaybeString>,
pub visible_email: String,
#[yaserde(rename = "VisibleHostname")]
pub visible_hostname: MaybeString,
pub cache: Cache,
@@ -966,7 +953,7 @@ pub struct LocalCache {
pub cache_mem: i32,
pub maximum_object_size: MaybeString,
pub maximum_object_size_in_memory: MaybeString,
pub memory_cache_mode: MaybeString,
pub memory_cache_mode: String,
pub size: i32,
pub l1: i32,
pub l2: i32,
@@ -978,13 +965,13 @@ pub struct LocalCache {
pub struct Traffic {
pub enabled: i32,
#[yaserde(rename = "maxDownloadSize")]
pub max_download_size: MaybeString,
pub max_download_size: i32,
#[yaserde(rename = "maxUploadSize")]
pub max_upload_size: MaybeString,
pub max_upload_size: i32,
#[yaserde(rename = "OverallBandwidthTrotteling")]
pub overall_bandwidth_trotteling: MaybeString,
pub overall_bandwidth_trotteling: i32,
#[yaserde(rename = "perHostTrotteling")]
pub per_host_trotteling: MaybeString,
pub per_host_trotteling: i32,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -1001,7 +988,7 @@ pub struct ParentProxy {
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Forward {
pub interfaces: MaybeString,
pub interfaces: String,
pub port: i32,
pub sslbumpport: i32,
pub sslbump: i32,
@@ -1046,9 +1033,9 @@ pub struct Acl {
pub google_apps: MaybeString,
pub youtube: MaybeString,
#[yaserde(rename = "safePorts")]
pub safe_ports: MaybeString,
pub safe_ports: String,
#[yaserde(rename = "sslPorts")]
pub ssl_ports: MaybeString,
pub ssl_ports: String,
#[yaserde(rename = "remoteACLs")]
pub remote_acls: RemoteAcls,
}
@@ -1064,9 +1051,9 @@ pub struct RemoteAcls {
pub struct Icap {
pub enable: i32,
#[yaserde(rename = "RequestURL")]
pub request_url: MaybeString,
pub request_url: String,
#[yaserde(rename = "ResponseURL")]
pub response_url: MaybeString,
pub response_url: String,
#[yaserde(rename = "SendClientIP")]
pub send_client_ip: i32,
#[yaserde(rename = "SendUsername")]
@@ -1074,7 +1061,7 @@ pub struct Icap {
#[yaserde(rename = "EncodeUsername")]
pub encode_username: i32,
#[yaserde(rename = "UsernameHeader")]
pub username_header: MaybeString,
pub username_header: String,
#[yaserde(rename = "EnablePreview")]
pub enable_preview: i32,
#[yaserde(rename = "PreviewSize")]
@@ -1089,9 +1076,9 @@ pub struct Authentication {
pub method: MaybeString,
#[yaserde(rename = "authEnforceGroup")]
pub auth_enforce_group: MaybeString,
pub realm: MaybeString,
pub credentialsttl: MaybeString, // This field is already in snake_case
pub children: MaybeString,
pub realm: String,
pub credentialsttl: i32, // This field is already in snake_case
pub children: i32,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -1153,7 +1140,6 @@ pub struct UnboundGeneral {
pub local_zone_type: String,
pub outgoing_interface: MaybeString,
pub enable_wpad: MaybeString,
pub safesearch: MaybeString,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -1207,15 +1193,15 @@ pub struct Acls {
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Dnsbl {
pub enabled: Option<i32>,
pub safesearch: Option<MaybeString>,
pub enabled: i32,
pub safesearch: MaybeString,
#[yaserde(rename = "type")]
pub r#type: Option<MaybeString>,
pub lists: Option<MaybeString>,
pub whitelists: Option<MaybeString>,
pub blocklists: Option<MaybeString>,
pub wildcards: Option<MaybeString>,
pub address: Option<MaybeString>,
pub r#type: MaybeString,
pub lists: MaybeString,
pub whitelists: MaybeString,
pub blocklists: MaybeString,
pub wildcards: MaybeString,
pub address: MaybeString,
pub nxdomain: Option<i32>,
}
@@ -1243,7 +1229,6 @@ pub struct Host {
pub ttl: Option<MaybeString>,
pub server: String,
pub description: Option<String>,
pub txtdata: MaybeString,
}
impl Host {
@@ -1259,7 +1244,6 @@ impl Host {
ttl: Some(MaybeString::default()),
mx: MaybeString::default(),
description: None,
txtdata: MaybeString::default(),
}
}
}
@@ -1309,7 +1293,6 @@ pub struct WireguardServerItem {
pub peers: String,
pub endpoint: MaybeString,
pub peer_dns: MaybeString,
pub debug: Option<MaybeString>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -1494,7 +1477,6 @@ pub struct Ppp {
pub ports: Option<MaybeString>,
pub username: Option<MaybeString>,
pub password: Option<MaybeString>,
pub provider: Option<MaybeString>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
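Many of the hunks above toggle fields between `Option<...>` and concrete types. A minimal, self-contained sketch of what that changes in yaserde models (the `tunable` element here is hypothetical, not part of the OPNsense schema):

use yaserde_derive::{YaDeserialize, YaSerialize};

#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[yaserde(rename = "tunable")]
pub struct Tunable {
    pub name: String,
    // With Option, an absent XML element deserializes to None instead of a default value.
    pub value: Option<String>,
}

fn main() {
    let xml = "<tunable><name>net.inet.ip.ttl</name></tunable>";
    let t: Tunable = yaserde::de::from_str(xml).expect("parse");
    assert_eq!(t.value, None);
}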

View File

@@ -86,7 +86,10 @@ impl<'a> DhcpConfigLegacyISC<'a> {
mac,
ipaddr: ipaddr.to_string(),
hostname,
..Default::default()
descr: Default::default(),
winsserver: Default::default(),
dnsserver: Default::default(),
ntpserver: Default::default(),
};
existing_mappings.push(static_map);
@@ -123,7 +126,9 @@ impl<'a> DhcpConfigLegacyISC<'a> {
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
descr: entry["descr"].as_str().map(MaybeString::from),
..Default::default()
winsserver: MaybeString::default(),
dnsserver: MaybeString::default(),
ntpserver: MaybeString::default(),
})
.collect();

View File

@@ -612,7 +612,6 @@
<local_zone_type>transparent</local_zone_type>
<outgoing_interface/>
<enable_wpad>0</enable_wpad>
<safesearch/>
</general>
<advanced>
<hideidentity>0</hideidentity>

Some files were not shown because too many files have changed in this diff.