Compare commits

..

8 Commits

Author SHA1 Message Date
9fbdc72cd0 fix: git ignore
All checks were successful
Run Check Script / check (pull_request) Successful in 1m29s
2025-11-18 08:41:09 -05:00
78e595e696 feat: added alert manager routes to openshift cluster monitoring
All checks were successful
Run Check Script / check (pull_request) Successful in 1m37s
2025-11-17 15:22:43 -05:00
90b89224d8 fix: added K8sName type for strict naming of Kubernetes resources 2025-11-17 15:20:51 -05:00
759a9287d3 Merge remote-tracking branch 'origin/master' into feat/cluster_monitoring
Some checks failed
Run Check Script / check (pull_request) Failing after 19s
2025-11-05 17:02:10 -05:00
24922321b1 fix: webhook name must be k8s field compliant, add a FIXME note 2025-11-05 16:59:48 -05:00
cf84f2cce8 wip: cluster_monitoring almost there, a kink to fix in the yaml handling
All checks were successful
Run Check Script / check (pull_request) Successful in 1m15s
2025-10-29 23:12:34 -04:00
a12d12aa4f feat: example OpenshiftClusterAlertScore
All checks were successful
Run Check Script / check (pull_request) Successful in 1m17s
2025-10-29 17:29:28 -04:00
cefb65933a wip: cluster monitoring score coming along, this simply edits OKD builtin alertmanager instance and adds a receiver 2025-10-29 17:26:21 -04:00
47 changed files with 1317 additions and 1690 deletions

19
Cargo.lock generated
View File

@@ -1804,6 +1804,25 @@ dependencies = [
"url",
]
[[package]]
name = "example-okd-cluster-alerts"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_secret",
"harmony_secret_derive",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
[[package]]
name = "example-okd-install"
version = "0.1.0"

View File

@@ -1,105 +0,0 @@
# Design Document: Harmony PostgreSQL Module
**Status:** Draft
**Last Updated:** 2025-12-01
**Context:** Multi-site Data Replication & Orchestration
## 1. Overview
The Harmony PostgreSQL Module provides a high-level abstraction for deploying and managing high-availability PostgreSQL clusters across geographically distributed Kubernetes/OKD sites.
Instead of manually configuring complex replication slots, firewalls, and operator settings on each cluster, users define a single intent (a **Score**), and Harmony orchestrates the underlying infrastructure (the **Arrangement**) to establish a Primary-Replica architecture.
Currently, the implementation relies on the **CloudNativePG (CNPG)** operator as the backing engine.
## 2. Architecture
### 2.1 The Abstraction Model
Following **ADR 003 (Infrastructure Abstraction)**, Harmony separates the *intent* from the *implementation*.
1. **The Score (Intent):** The user defines a `MultisitePostgreSQL` resource. This describes *what* is needed (e.g., "A Postgres 15 cluster with 10GB storage, Primary on Site A, Replica on Site B").
2. **The Interpret (Action):** Harmony MultisitePostgreSQLInterpret processes this Score and orchestrates the deployment on both sites to reach the state defined in the Score.
3. **The Capability (Implementation):** The PostgreSQL Capability is implemented by the K8sTopology and the interpret can deploy it, configure it and fetch information about it. The concrete implementation will rely on the mature CloudnativePG operator to manage all the Kubernetes resources required.
### 2.2 Network Connectivity (TLS Passthrough)
One of the critical challenges in multi-site orchestration is secure connectivity between clusters that may have dynamic IPs or strict firewalls.
To solve this, we utilize **OKD/OpenShift Routes with TLS Passthrough**.
* **Mechanism:** The Primary site exposes a `Route` configured for `termination: passthrough`.
* **Routing:** The OpenShift HAProxy router inspects the **SNI (Server Name Indication)** header of the incoming TCP connection to route traffic to the correct PostgreSQL Pod.
* **Security:** SSL is **not** terminated at the ingress router. The encrypted stream is passed directly to the PostgreSQL instance. Mutual TLS (mTLS) authentication is handled natively by CNPG between the Primary and Replica instances.
* **Dynamic IPs:** Because connections are established via DNS hostnames (the Route URL), this architecture is resilient to dynamic IP changes at the Primary site.
#### Traffic Flow Diagram
```text
[ Site B: Replica ] [ Site A: Primary ]
| |
(CNPG Instance) --[Encrypted TCP]--> (OKD HAProxy Router)
| (Port 443) |
| |
| [SNI Inspection]
| |
| v
| (PostgreSQL Primary Pod)
| (Port 5432)
```
## 3. Design Decisions
### Why CloudNativePG?
We selected CloudNativePG because it relies exclusively on standard Kubernetes primitives and uses the native PostgreSQL replication protocol (WAL shipping/Streaming). This aligns with Harmony's goal of being "K8s Native."
### Why TLS Passthrough instead of VPN/NodePort?
* **NodePort:** Requires static IPs and opening non-standard ports on the firewall, which violates our security constraints.
* **VPN (e.g., Wireguard/Tailscale):** While secure, it introduces significant complexity (sidecars, key management) and external dependencies.
* **TLS Passthrough:** Leverages the existing Ingress/Router infrastructure already present in OKD. It requires zero additional software and respects multi-tenancy (Routes are namespaced).
### Configuration Philosophy (YAGNI)
The current design exposes a **generic configuration surface**. Users can configure standard parameters (Storage size, CPU/Memory requests, Postgres version).
**We explicitly do not expose advanced CNPG or PostgreSQL configurations at this stage.**
* **Reasoning:** We aim to keep the API surface small and manageable.
* **Future Path:** We plan to implement a "pass-through" mechanism to allow sending raw config maps or custom parameters to the underlying engine (CNPG) *only when a concrete use case arises*. Until then, we adhere to the **YAGNI (You Ain't Gonna Need It)** principle to avoid premature optimization and API bloat.
## 4. Usage Guide
To deploy a multi-site cluster, apply the `MultisitePostgreSQL` resource to the Harmony Control Plane.
### Example Manifest
```yaml
apiVersion: harmony.io/v1alpha1
kind: MultisitePostgreSQL
metadata:
name: finance-db
namespace: tenant-a
spec:
version: "15"
storage: "10Gi"
resources:
requests:
cpu: "500m"
memory: "1Gi"
# Topology Definition
topology:
primary:
site: "site-paris" # The name of the cluster in Harmony
replicas:
- site: "site-newyork"
```
### What happens next?
1. Harmony detects the CR.
2. **On Site Paris:** It deploys a CNPG Cluster (Primary) and creates a Passthrough Route `postgres-finance-db.apps.site-paris.example.com`.
3. **On Site New York:** It deploys a CNPG Cluster (Replica) configured with `externalClusters` pointing to the Paris Route.
4. Data begins replicating immediately over the encrypted channel.
## 5. Troubleshooting
* **Connection Refused:** Ensure the Primary site's Route is successfully admitted by the Ingress Controller.
* **Certificate Errors:** CNPG manages mTLS automatically. If errors persist, ensure the CA secrets were correctly propagated by Harmony from Primary to Replica namespaces.

View File

@@ -24,13 +24,14 @@ use harmony::{
},
topology::K8sAnywhereTopology,
};
use harmony_types::net::Url;
use harmony_types::{k8s_name::K8sName, net::Url};
#[tokio::main]
async fn main() {
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
selectors: vec![],
};
let high_pvc_fill_rate_over_two_days_alert = high_pvc_fill_rate_over_two_days();

View File

@@ -22,8 +22,8 @@ use harmony::{
tenant::{ResourceLimits, TenantConfig, TenantNetworkPolicy},
},
};
use harmony_types::id::Id;
use harmony_types::net::Url;
use harmony_types::{id::Id, k8s_name::K8sName};
#[tokio::main]
async fn main() {
@@ -43,8 +43,9 @@ async fn main() {
};
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
selectors: vec![],
};
let high_pvc_fill_rate_over_two_days_alert = high_pvc_fill_rate_over_two_days();

View File

@@ -1,6 +1,6 @@
use std::{
net::{IpAddr, Ipv4Addr},
sync::{Arc, OnceLock},
sync::Arc,
};
use brocade::BrocadeOptions;
@@ -107,7 +107,6 @@ async fn main() {
},
],
switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
};
let inventory = Inventory {

View File

@@ -0,0 +1,22 @@
[package]
name = "example-okd-cluster-alerts"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_secret = { path = "../../harmony_secret" }
harmony_secret_derive = { path = "../../harmony_secret_derive" }
cidr = { workspace = true }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@@ -0,0 +1,38 @@
use std::collections::HashMap;
use harmony::{
inventory::Inventory,
modules::monitoring::{
alert_channel::discord_alert_channel::DiscordWebhook,
okd::cluster_monitoring::OpenshiftClusterAlertScore,
},
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
use harmony_types::k8s_name::K8sName;
#[tokio::main]
async fn main() {
let mut sel = HashMap::new();
sel.insert(
"openshift_io_alert_source".to_string(),
"platform".to_string(),
);
let mut sel2 = HashMap::new();
sel2.insert("openshift_io_alert_source".to_string(), "".to_string());
let selectors = vec![sel, sel2];
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(OpenshiftClusterAlertScore {
receivers: vec![Box::new(DiscordWebhook {
name: K8sName("wills-discord-webhook-example".to_string()),
url: hurl!("https://something.io"),
selectors: selectors,
})],
})],
None,
)
.await
.unwrap();
}

View File

@@ -9,10 +9,7 @@ use harmony::{
use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize};
use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};
use std::{net::IpAddr, sync::Arc};
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
struct OPNSenseFirewallConfig {
@@ -84,7 +81,6 @@ pub async fn get_topology() -> HAClusterTopology {
},
workers: vec![],
switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
}
}

View File

@@ -10,10 +10,7 @@ use harmony::{
use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize};
use std::{
net::IpAddr,
sync::{Arc, OnceLock},
};
use std::{net::IpAddr, sync::Arc};
pub async fn get_topology() -> HAClusterTopology {
let firewall = harmony::topology::LogicalHost {
@@ -79,7 +76,6 @@ pub async fn get_topology() -> HAClusterTopology {
},
workers: vec![],
switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
}
}

View File

@@ -1,6 +1,6 @@
use std::{
net::{IpAddr, Ipv4Addr},
sync::{Arc, OnceLock},
sync::Arc,
};
use brocade::BrocadeOptions;
@@ -79,7 +79,6 @@ async fn main() {
},
workers: vec![],
switch_client: switch_client.clone(),
network_manager: OnceLock::new(),
};
let inventory = Inventory {

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,
@@ -10,7 +10,7 @@ use harmony::{
},
topology::K8sAnywhereTopology,
};
use harmony_types::net::Url;
use harmony_types::{k8s_name::K8sName, net::Url};
#[tokio::main]
async fn main() {
@@ -22,8 +22,9 @@ async fn main() {
});
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
selectors: vec![],
};
let app = ApplicationScore {

View File

@@ -1,3 +1,4 @@
Dockerfile.harmony
.harmony_generated
harmony
webapp

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,
@@ -14,6 +14,7 @@ use harmony::{
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
use harmony_types::k8s_name::K8sName;
#[tokio::main]
async fn main() {
@@ -25,8 +26,9 @@ async fn main() {
});
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
name: K8sName("test-discord".to_string()),
url: hurl!("https://discord.doesnt.exist.com"),
selectors: vec![],
};
let webhook_receiver = WebhookReceiver {

View File

@@ -0,0 +1,7 @@
apiVersion: v2
name: harmony-example-rust-webapp-chart
description: A Helm chart for the harmony-example-rust-webapp web application.
type: application
version: 0.1.0
appVersion: "latest"

View File

@@ -0,0 +1,16 @@
{{/*
Expand the name of the chart.
*/}}
{{- define "chart.name" -}}
{{- default .Chart.Name .Values.nameOverride | trunc 63 | trimSuffix "-" }}
{{- end }}
{{/*
Create a default fully qualified app name.
We truncate at 63 chars because some Kubernetes name fields are limited to this (by the DNS naming spec).
*/}}
{{- define "chart.fullname" -}}
{{- $name := default .Chart.Name .Values.nameOverride }}
{{- printf "%s-%s" .Release.Name $name | trunc 63 | trimSuffix "-" }}
{{- end }}

View File

@@ -0,0 +1,23 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ include "chart.fullname" . }}
spec:
replicas: {{ .Values.replicaCount }}
selector:
matchLabels:
app: {{ include "chart.name" . }}
template:
metadata:
labels:
app: {{ include "chart.name" . }}
spec:
containers:
- name: {{ .Chart.Name }}
image: "{{ .Values.image.repository }}:{{ .Values.image.tag | default .Chart.AppVersion }}"
imagePullPolicy: {{ .Values.image.pullPolicy }}
ports:
- name: http
containerPort: 3000
protocol: TCP

View File

@@ -0,0 +1,35 @@
{{- if .Values.ingress.enabled -}}
apiVersion: networking.k8s.io/v1
kind: Ingress
metadata:
name: {{ include "chart.fullname" . }}
annotations:
{{- toYaml .Values.ingress.annotations | nindent 4 }}
spec:
{{- if .Values.ingress.tls }}
tls:
{{- range .Values.ingress.tls }}
- hosts:
{{- range .hosts }}
- {{ . | quote }}
{{- end }}
secretName: {{ .secretName }}
{{- end }}
{{- end }}
rules:
{{- range .Values.ingress.hosts }}
- host: {{ .host | quote }}
http:
paths:
{{- range .paths }}
- path: {{ .path }}
pathType: {{ .pathType }}
backend:
service:
name: {{ include "chart.fullname" $ }}
port:
number: 3000
{{- end }}
{{- end }}
{{- end }}

View File

@@ -0,0 +1,14 @@
apiVersion: v1
kind: Service
metadata:
name: {{ include "chart.fullname" . }}
spec:
type: {{ .Values.service.type }}
ports:
- port: {{ .Values.service.port }}
targetPort: 3000
protocol: TCP
name: http
selector:
app: {{ include "chart.name" . }}

View File

@@ -0,0 +1,34 @@
# Default values for harmony-example-rust-webapp-chart.
# This is a YAML-formatted file.
# Declare variables to be passed into your templates.
replicaCount: 1
image:
repository: hub.nationtech.io/harmony/harmony-example-rust-webapp
pullPolicy: IfNotPresent
# Overridden by the chart's appVersion
tag: "latest"
service:
type: ClusterIP
port: 3000
ingress:
enabled: true
# Annotations for cert-manager to handle SSL.
annotations:
cert-manager.io/cluster-issuer: "letsencrypt-prod"
# Add other annotations like nginx ingress class if needed
# kubernetes.io/ingress.class: nginx
hosts:
- host: chart-example.local
paths:
- path: /
pathType: ImplementationSpecific
tls:
- secretName: harmony-example-rust-webapp-tls
hosts:
- chart-example.local

View File

@@ -10,6 +10,7 @@ use harmony::{
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
use harmony_types::k8s_name::K8sName;
use std::{path::PathBuf, sync::Arc};
#[tokio::main]
@@ -31,8 +32,9 @@ async fn main() {
Box::new(Monitoring {
application: application.clone(),
alert_receiver: vec![Box::new(DiscordWebhook {
name: "test-discord".to_string(),
name: K8sName("test-discord".to_string()),
url: hurl!("https://discord.doesnt.exist.com"),
selectors: vec![],
})],
}),
],

View File

@@ -1,141 +0,0 @@
use async_trait::async_trait;
use log::{debug, info};
use std::collections::HashMap;
use crate::{
modules::postgresql::capability::{
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
ReplicationCerts,
},
topology::{PreparationError, PreparationOutcome, Topology},
};
pub struct FailoverTopology<T> {
primary: T,
replica: T,
}
#[async_trait]
impl<T: Send + Sync> Topology for FailoverTopology<T> {
fn name(&self) -> &str {
"FailoverTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}
#[async_trait]
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
info!(
"Starting deployment of failover topology '{}'",
config.cluster_name
);
let primary_config = PostgreSQLConfig {
cluster_name: config.cluster_name.clone(),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Primary,
};
info!(
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
primary_config.cluster_name, primary_config.storage_size
);
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
let certs = self
.primary
.get_replication_certs(&primary_cluster_name)
.await?;
info!("Replication certificates retrieved successfully");
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
let endpoint = self
.primary
.get_public_endpoint(&primary_cluster_name)
.await?
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
info!(
"Public endpoint '{}:{}' retrieved for primary",
endpoint.host, endpoint.port
);
info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new();
connection_parameters.insert("host".to_string(), endpoint.host);
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
debug!("Replica connection parameters: {:?}", connection_parameters);
let external_cluster = ExternalClusterConfig {
name: primary_cluster_name.clone(),
connection_parameters,
};
let bootstrap_config = BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
};
let replica_cluster_config = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs,
bootstrap: bootstrap_config,
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica", primary_cluster_name),
instances: config.instances,
storage_size: config.storage_size.clone(),
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
};
info!(
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
);
self.replica.deploy(&replica_config).await?;
info!(
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
replica_config.cluster_name, config.cluster_name
);
Ok(primary_cluster_name)
}
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
self.primary.get_replication_certs(cluster_name).await
}
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
self.primary.get_endpoint(cluster_name).await
}
async fn get_public_endpoint(
&self,
cluster_name: &str,
) -> Result<Option<PostgreSQLEndpoint>, String> {
self.primary.get_public_endpoint(cluster_name).await
}
}

View File

@@ -1,25 +1,29 @@
use async_trait::async_trait;
use harmony_macros::ip;
use harmony_types::{
id::Id,
net::{MacAddress, Url},
switch::PortLocation,
};
use kube::api::ObjectMeta;
use log::debug;
use log::info;
use crate::infra::network_manager::OpenShiftNmStateNetworkManager;
use crate::modules::okd::crd::nmstate::{self, NodeNetworkConfigurationPolicy};
use crate::topology::PxeOptions;
use crate::{data::FileContent, executors::ExecutorError};
use crate::{data::FileContent, modules::okd::crd::nmstate::NMState};
use crate::{
executors::ExecutorError, modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec,
};
use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
SwitchError, TftpServer, Topology, k8s::K8sClient,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost,
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer,
Topology, k8s::K8sClient,
};
use std::sync::{Arc, OnceLock};
use std::collections::BTreeMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
pub struct HAClusterTopology {
@@ -36,7 +40,6 @@ pub struct HAClusterTopology {
pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>,
pub kubeconfig: Option<String>,
pub network_manager: OnceLock<Arc<dyn NetworkManager>>,
}
#[async_trait]
@@ -60,7 +63,7 @@ impl K8sclient for HAClusterTopology {
K8sClient::try_default().await.map_err(|e| e.to_string())?,
)),
Some(kubeconfig) => {
let Some(client) = K8sClient::from_kubeconfig(kubeconfig).await else {
let Some(client) = K8sClient::from_kubeconfig(&kubeconfig).await else {
return Err("Failed to create k8s client".to_string());
};
Ok(Arc::new(client))
@@ -90,12 +93,191 @@ impl HAClusterTopology {
.to_string()
}
pub async fn network_manager(&self) -> &dyn NetworkManager {
let k8s_client = self.k8s_client().await.unwrap();
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
let k8s_client = self.k8s_client().await?;
self.network_manager
.get_or_init(|| Arc::new(OpenShiftNmStateNetworkManager::new(k8s_client.clone())))
.as_ref()
debug!("Installing NMState controller...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState namespace...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState service account...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState role...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState role binding...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState operator...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
k8s_client
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
.await?;
let nmstate = NMState {
metadata: ObjectMeta {
name: Some("nmstate".to_string()),
..Default::default()
},
..Default::default()
};
debug!("Creating NMState: {nmstate:#?}");
k8s_client
.apply(&nmstate, None)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
fn get_next_bond_id(&self) -> u8 {
42 // FIXME: Find a better way to declare the bond id
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
self.ensure_nmstate_operator_installed()
.await
.map_err(|e| {
SwitchError::new(format!(
"Can't configure bond, NMState operator not available: {e}"
))
})?;
let bond_config = self.create_bond_configuration(config);
debug!(
"Applying NMState bond config for host {}: {bond_config:#?}",
config.host_id
);
self.k8s_client()
.await
.unwrap()
.apply(&bond_config, None)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure bond: {e}")))?;
Ok(())
}
fn create_bond_configuration(
&self,
config: &HostNetworkConfig,
) -> NodeNetworkConfigurationPolicy {
let host_name = &config.host_id;
let bond_id = self.get_next_bond_id();
let bond_name = format!("bond{bond_id}");
info!("Configuring bond '{bond_name}' for host '{host_name}'...");
let mut bond_mtu: Option<u32> = None;
let mut copy_mac_from: Option<String> = None;
let mut bond_ports = Vec::new();
let mut interfaces: Vec<nmstate::InterfaceSpec> = Vec::new();
for switch_port in &config.switch_ports {
let interface_name = switch_port.interface.name.clone();
interfaces.push(nmstate::InterfaceSpec {
name: interface_name.clone(),
description: Some(format!("Member of bond {bond_name}")),
r#type: "ethernet".to_string(),
state: "up".to_string(),
mtu: Some(switch_port.interface.mtu),
mac_address: Some(switch_port.interface.mac_address.to_string()),
ipv4: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
link_aggregation: None,
..Default::default()
});
bond_ports.push(interface_name.clone());
// Use the first port's details for the bond mtu and mac address
if bond_mtu.is_none() {
bond_mtu = Some(switch_port.interface.mtu);
}
if copy_mac_from.is_none() {
copy_mac_from = Some(interface_name);
}
}
interfaces.push(nmstate::InterfaceSpec {
name: bond_name.clone(),
description: Some(format!("Network bond for host {host_name}")),
r#type: "bond".to_string(),
state: "up".to_string(),
copy_mac_from,
ipv4: Some(nmstate::IpStackSpec {
dhcp: Some(true),
enabled: Some(true),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
dhcp: Some(true),
autoconf: Some(true),
enabled: Some(true),
..Default::default()
}),
link_aggregation: Some(nmstate::BondSpec {
mode: "802.3ad".to_string(),
ports: bond_ports,
..Default::default()
}),
..Default::default()
});
NodeNetworkConfigurationPolicy {
metadata: ObjectMeta {
name: Some(format!("{host_name}-bond-config")),
..Default::default()
},
spec: NodeNetworkConfigurationPolicySpec {
node_selector: Some(BTreeMap::from([(
"kubernetes.io/hostname".to_string(),
host_name.to_string(),
)])),
desired_state: nmstate::DesiredStateSpec { interfaces },
},
}
}
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
self.switch_client
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
Ok(())
}
pub fn autoload() -> Self {
@@ -119,7 +301,6 @@ impl HAClusterTopology {
bootstrap_host: dummy_host,
control_plane: vec![],
workers: vec![],
network_manager: OnceLock::new(),
}
}
}
@@ -277,40 +458,21 @@ impl HttpServer for HAClusterTopology {
#[async_trait]
impl Switch for HAClusterTopology {
async fn setup_switch(&self) -> Result<(), SwitchError> {
self.switch_client.setup().await.map(|_| ())
self.switch_client.setup().await?;
Ok(())
}
async fn get_port_for_mac_address(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
self.switch_client.find_port(mac_address).await
let port = self.switch_client.find_port(mac_address).await?;
Ok(port)
}
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
self.switch_client
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure port-channel: {e}")))?;
Ok(())
}
}
#[async_trait]
impl NetworkManager for HAClusterTopology {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
self.network_manager()
.await
.ensure_network_manager_installed()
.await
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
self.network_manager().await.configure_bond(config).await
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
self.configure_bond(config).await?;
self.configure_port_channel(config).await
}
}

View File

@@ -5,17 +5,15 @@ use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{
apps::v1::Deployment,
core::v1::{Node, Pod, ServiceAccount},
core::v1::{Pod, ServiceAccount},
},
apimachinery::pkg::version::Info,
};
use kube::{
Client, Config, Discovery, Error, Resource,
api::{
Api, AttachParams, DeleteParams, ListParams, ObjectList, Patch, PatchParams, ResourceExt,
},
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
core::{DynamicResourceScope, ErrorResponse},
discovery::{ApiCapabilities, Scope},
error::DiscoveryError,
runtime::reflector::Lookup,
@@ -25,7 +23,7 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, trace, warn};
use log::{debug, error, info, trace, warn};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use similar::TextDiff;
@@ -96,6 +94,23 @@ impl K8sClient {
resource.get(name).await
}
pub async fn get_secret_json_value(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<DynamicObject, Error> {
self.get_resource_json_value(
name,
namespace,
&GroupVersionKind {
group: "".to_string(),
version: "v1".to_string(),
kind: "Secret".to_string(),
},
)
.await
}
pub async fn get_deployment(
&self,
name: &str,
@@ -339,6 +354,169 @@ impl K8sClient {
}
}
fn get_api_for_dynamic_object(
&self,
object: &DynamicObject,
ns: Option<&str>,
) -> Result<Api<DynamicObject>, Error> {
let api_resource = object
.types
.as_ref()
.and_then(|t| {
let parts: Vec<&str> = t.api_version.split('/').collect();
match parts.as_slice() {
[version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
"", version, &t.kind,
))),
[group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
group, version, &t.kind,
))),
_ => None,
}
})
.ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"Invalid apiVersion in DynamicObject {object:#?}".to_string(),
))
})?;
match ns {
Some(ns) => Ok(Api::namespaced_with(self.client.clone(), ns, &api_resource)),
None => Ok(Api::default_namespaced_with(
self.client.clone(),
&api_resource,
)),
}
}
pub async fn apply_dynamic_many(
&self,
resource: &[DynamicObject],
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<Vec<DynamicObject>, Error> {
let mut result = Vec::new();
for r in resource.iter() {
result.push(self.apply_dynamic(r, namespace, force_conflicts).await?);
}
Ok(result)
}
/// Apply DynamicObject resource to the cluster
pub async fn apply_dynamic(
&self,
resource: &DynamicObject,
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<DynamicObject, Error> {
// Build API for this dynamic object
let api = self.get_api_for_dynamic_object(resource, namespace)?;
let name = resource
.metadata
.name
.as_ref()
.ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"DynamicObject must have metadata.name".to_string(),
))
})?
.as_str();
debug!(
"Applying dynamic resource kind={:?} apiVersion={:?} name='{}' ns={:?}",
resource.types.as_ref().map(|t| &t.kind),
resource.types.as_ref().map(|t| &t.api_version),
name,
namespace
);
trace!(
"Dynamic resource payload:\n{:#}",
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
);
// Using same field manager as in apply()
let mut patch_params = PatchParams::apply("harmony");
patch_params.force = force_conflicts;
if *crate::config::DRY_RUN {
// Dry-run path: fetch current, show diff, and return appropriate object
match api.get(name).await {
Ok(current) => {
trace!("Received current dynamic value {current:#?}");
println!("\nPerforming dry-run for resource: '{}'", name);
// Serialize current and new, and strip status from current if present
let mut current_yaml =
serde_yaml::to_value(&current).unwrap_or_else(|_| serde_yaml::Value::Null);
if let Some(map) = current_yaml.as_mapping_mut() {
if map.contains_key(&serde_yaml::Value::String("status".to_string())) {
let removed =
map.remove(&serde_yaml::Value::String("status".to_string()));
trace!("Removed status from current dynamic object: {:?}", removed);
} else {
trace!(
"Did not find status entry for current dynamic object {}/{}",
current.metadata.namespace.as_deref().unwrap_or(""),
current.metadata.name.as_deref().unwrap_or("")
);
}
}
let current_yaml = serde_yaml::to_string(&current_yaml)
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
let new_yaml = serde_yaml::to_string(resource)
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
if current_yaml == new_yaml {
println!("No changes detected.");
return Ok(current);
}
println!("Changes detected:");
let diff = TextDiff::from_lines(&current_yaml, &new_yaml);
for change in diff.iter_all_changes() {
let sign = match change.tag() {
similar::ChangeTag::Delete => "-",
similar::ChangeTag::Insert => "+",
similar::ChangeTag::Equal => " ",
};
print!("{}{}", sign, change);
}
// Return the incoming resource as the would-be applied state
Ok(resource.clone())
}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
println!("\nPerforming dry-run for new resource: '{}'", name);
println!(
"Resource does not exist. It would be created with the following content:"
);
let new_yaml = serde_yaml::to_string(resource)
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
for line in new_yaml.lines() {
println!("+{}", line);
}
Ok(resource.clone())
}
Err(e) => {
error!("Failed to get dynamic resource '{}': {}", name, e);
Err(e)
}
}
} else {
// Real apply via server-side apply
debug!("Patching (server-side apply) dynamic resource '{}'", name);
api.patch(name, &patch_params, &Patch::Apply(resource))
.await
.map_err(|e| {
error!("Failed to apply dynamic resource '{}': {}", name, e);
e
})
}
}
/// Apply a resource in namespace
///
/// See `kubectl apply` for more information on the expected behavior of this function
@@ -566,58 +744,7 @@ impl K8sClient {
Ok(())
}
/// Gets a single named resource of a specific type `K`.
///
/// This function uses the `ApplyStrategy` trait to correctly determine
/// whether to look in a specific namespace or in the entire cluster.
///
/// Returns `Ok(None)` if the resource is not found (404).
pub async fn get_resource<K>(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ApplyStrategy<K>,
<K as kube::Resource>::DynamicType: Default,
{
let api: Api<K> =
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
api.get_opt(name).await
}
/// Lists all resources of a specific type `K`.
///
/// This function uses the `ApplyStrategy` trait to correctly determine
/// whether to list from a specific namespace or from the entire cluster.
pub async fn list_resources<K>(
&self,
namespace: Option<&str>,
list_params: Option<ListParams>,
) -> Result<ObjectList<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ApplyStrategy<K>,
<K as kube::Resource>::DynamicType: Default,
{
let api: Api<K> =
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
let list_params = list_params.unwrap_or_default();
api.list(&list_params).await
}
/// Fetches a list of all Nodes in the cluster.
pub async fn get_nodes(
&self,
list_params: Option<ListParams>,
) -> Result<ObjectList<Node>, Error> {
self.list_resources(None, list_params).await
}
pub async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
let k = match Kubeconfig::read_from(path) {
Ok(k) => k,
Err(e) => {

View File

@@ -1,7 +1,5 @@
mod ha_cluster;
pub mod ingress;
mod failover;
pub use failover::*;
use harmony_types::net::IpAddress;
mod host_binding;
mod http;

View File

@@ -15,7 +15,7 @@ use harmony_types::{
};
use serde::Serialize;
use crate::executors::ExecutorError;
use crate::{executors::ExecutorError, hardware::PhysicalHost};
use super::{LogicalHost, k8s::K8sClient};
@@ -183,37 +183,6 @@ impl FromStr for DnsRecordType {
}
}
#[async_trait]
pub trait NetworkManager: Debug + Send + Sync {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError>;
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError>;
}
#[derive(Debug, Clone, new)]
pub struct NetworkError {
msg: String,
}
impl fmt::Display for NetworkError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.msg)
}
}
impl Error for NetworkError {}
impl From<kube::Error> for NetworkError {
fn from(value: kube::Error) -> Self {
NetworkError::new(value.to_string())
}
}
impl From<String> for NetworkError {
fn from(value: String) -> Self {
NetworkError::new(value)
}
}
#[async_trait]
pub trait Switch: Send + Sync {
async fn setup_switch(&self) -> Result<(), SwitchError>;
@@ -223,7 +192,7 @@ pub trait Switch: Send + Sync {
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError>;
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
}
#[derive(Clone, Debug, PartialEq)]

View File

@@ -1,6 +1,7 @@
use std::any::Any;
use std::{any::Any, collections::HashMap};
use async_trait::async_trait;
use kube::api::DynamicObject;
use log::debug;
use crate::{
@@ -76,6 +77,15 @@ pub trait AlertReceiver<S: AlertSender>: std::fmt::Debug + Send + Sync {
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn AlertReceiver<S>>;
fn as_any(&self) -> &dyn Any;
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String>;
}
#[derive(Debug)]
pub struct AlertManagerReceiver {
pub receiver_config: serde_json::Value,
// FIXME we should not leak k8s here. DynamicObject is k8s specific
pub additional_ressources: Vec<DynamicObject>,
pub route_config: serde_json::Value,
}
#[async_trait]

View File

@@ -14,7 +14,7 @@ use k8s_openapi::{
},
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::Resource;
use kube::{Resource, api::DynamicObject};
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::json;

View File

@@ -4,6 +4,5 @@ pub mod hp_ilo;
pub mod intel_amt;
pub mod inventory;
pub mod kube;
pub mod network_manager;
pub mod opnsense;
mod sqlx;

View File

@@ -1,257 +0,0 @@
use std::{
collections::{BTreeMap, HashSet},
sync::Arc,
};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::Node;
use kube::{
ResourceExt,
api::{ObjectList, ObjectMeta},
};
use log::{debug, info};
use crate::{
modules::okd::crd::nmstate,
topology::{HostNetworkConfig, NetworkError, NetworkManager, k8s::K8sClient},
};
pub struct OpenShiftNmStateNetworkManager {
k8s_client: Arc<K8sClient>,
}
impl std::fmt::Debug for OpenShiftNmStateNetworkManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenShiftNmStateNetworkManager").finish()
}
}
#[async_trait]
impl NetworkManager for OpenShiftNmStateNetworkManager {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
debug!("Installing NMState controller...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState namespace...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState service account...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState role...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState role binding...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
").unwrap(), Some("nmstate"))
.await?;
debug!("Creating NMState operator...");
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
").unwrap(), Some("nmstate"))
.await?;
self.k8s_client
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
.await?;
let nmstate = nmstate::NMState {
metadata: ObjectMeta {
name: Some("nmstate".to_string()),
..Default::default()
},
..Default::default()
};
debug!(
"Creating NMState:\n{}",
serde_yaml::to_string(&nmstate).unwrap()
);
self.k8s_client.apply(&nmstate, None).await?;
Ok(())
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
let hostname = self.get_hostname(&config.host_id).await.map_err(|e| {
NetworkError::new(format!(
"Can't configure bond, can't get hostname for host '{}': {e}",
config.host_id
))
})?;
let bond_id = self.get_next_bond_id(&hostname).await.map_err(|e| {
NetworkError::new(format!(
"Can't configure bond, can't get an available bond id for host '{}': {e}",
config.host_id
))
})?;
let bond_config = self.create_bond_configuration(&hostname, &bond_id, config);
debug!(
"Applying NMState bond config for host {}:\n{}",
config.host_id,
serde_yaml::to_string(&bond_config).unwrap(),
);
self.k8s_client
.apply(&bond_config, None)
.await
.map_err(|e| NetworkError::new(format!("Failed to configure bond: {e}")))?;
Ok(())
}
}
impl OpenShiftNmStateNetworkManager {
pub fn new(k8s_client: Arc<K8sClient>) -> Self {
Self { k8s_client }
}
fn create_bond_configuration(
&self,
host: &str,
bond_name: &str,
config: &HostNetworkConfig,
) -> nmstate::NodeNetworkConfigurationPolicy {
info!("Configuring bond '{bond_name}' for host '{host}'...");
let mut bond_mtu: Option<u32> = None;
let mut copy_mac_from: Option<String> = None;
let mut bond_ports = Vec::new();
let mut interfaces: Vec<nmstate::Interface> = Vec::new();
for switch_port in &config.switch_ports {
let interface_name = switch_port.interface.name.clone();
interfaces.push(nmstate::Interface {
name: interface_name.clone(),
description: Some(format!("Member of bond {bond_name}")),
r#type: nmstate::InterfaceType::Ethernet,
state: "up".to_string(),
ipv4: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
link_aggregation: None,
..Default::default()
});
bond_ports.push(interface_name.clone());
// Use the first port's details for the bond mtu and mac address
if bond_mtu.is_none() {
bond_mtu = Some(switch_port.interface.mtu);
}
if copy_mac_from.is_none() {
copy_mac_from = Some(interface_name);
}
}
interfaces.push(nmstate::Interface {
name: bond_name.to_string(),
description: Some(format!("HARMONY - Network bond for host {host}")),
r#type: nmstate::InterfaceType::Bond,
state: "up".to_string(),
copy_mac_from,
ipv4: Some(nmstate::IpStackSpec {
dhcp: Some(true),
enabled: Some(true),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
dhcp: Some(true),
autoconf: Some(true),
enabled: Some(true),
..Default::default()
}),
link_aggregation: Some(nmstate::BondSpec {
mode: "802.3ad".to_string(),
ports: bond_ports,
..Default::default()
}),
..Default::default()
});
nmstate::NodeNetworkConfigurationPolicy {
metadata: ObjectMeta {
name: Some(format!("{host}-bond-config")),
..Default::default()
},
spec: nmstate::NodeNetworkConfigurationPolicySpec {
node_selector: Some(BTreeMap::from([(
"kubernetes.io/hostname".to_string(),
host.to_string(),
)])),
desired_state: nmstate::NetworkState {
interfaces,
..Default::default()
},
},
}
}
async fn get_hostname(&self, host_id: &Id) -> Result<String, String> {
let nodes: ObjectList<Node> = self
.k8s_client
.list_resources(None, None)
.await
.map_err(|e| format!("Failed to list nodes: {e}"))?;
let Some(node) = nodes.iter().find(|n| {
n.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|i| i.system_uuid == host_id.to_string())
.unwrap_or(false)
}) else {
return Err(format!("No node found for host '{host_id}'"));
};
node.labels()
.get("kubernetes.io/hostname")
.ok_or(format!(
"Node '{host_id}' has no kubernetes.io/hostname label"
))
.cloned()
}
async fn get_next_bond_id(&self, hostname: &str) -> Result<String, String> {
let network_state: Option<nmstate::NodeNetworkState> = self
.k8s_client
.get_resource(hostname, None)
.await
.map_err(|e| format!("Failed to list nodes: {e}"))?;
let interfaces = vec![];
let existing_bonds: Vec<&nmstate::Interface> = network_state
.as_ref()
.and_then(|network_state| network_state.status.current_state.as_ref())
.map_or(&interfaces, |current_state| &current_state.interfaces)
.iter()
.filter(|i| i.r#type == nmstate::InterfaceType::Bond)
.collect();
let used_ids: HashSet<u32> = existing_bonds
.iter()
.filter_map(|i| {
i.name
.strip_prefix("bond")
.and_then(|id| id.parse::<u32>().ok())
})
.collect();
let next_id = (0..).find(|id| !used_ids.contains(id)).unwrap();
Ok(format!("bond{next_id}"))
}
}

View File

@@ -74,11 +74,7 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
match ans {
Ok(choice) => {
info!(
"Selected {} as the {:?} node.",
choice.summary(),
self.score.role
);
info!("Selected {} as the bootstrap node.", choice.summary());
host_repo
.save_role_mapping(&self.score.role, &choice)
.await?;
@@ -94,7 +90,10 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
"Failed to select node for role {:?} : {}",
self.score.role, e
);
return Err(InterpretError::new(format!("Could not select host : {e}")));
return Err(InterpretError::new(format!(
"Could not select host : {}",
e.to_string()
)));
}
}
}

View File

@@ -17,4 +17,3 @@ pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;
pub mod postgresql;

View File

@@ -1,18 +1,23 @@
use std::any::Any;
use std::collections::BTreeMap;
use std::collections::{BTreeMap, HashMap};
use async_trait::async_trait;
use harmony_types::k8s_name::K8sName;
use k8s_openapi::api::core::v1::Secret;
use kube::api::ObjectMeta;
use log::debug;
use kube::Resource;
use kube::api::{DynamicObject, ObjectMeta};
use log::{debug, trace};
use serde::Serialize;
use serde_json::json;
use serde_yaml::{Mapping, Value};
use crate::infra::kube::kube_resource_to_dynamic;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus,
};
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
use crate::modules::monitoring::okd::OpenshiftClusterAlertSender;
use crate::topology::oberservability::monitoring::AlertManagerReceiver;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::{
@@ -28,14 +33,13 @@ use harmony_types::net::Url;
#[derive(Debug, Clone, Serialize)]
pub struct DiscordWebhook {
pub name: String,
pub name: K8sName,
pub url: Url,
pub selectors: Vec<HashMap<String, String>>,
}
#[async_trait]
impl AlertReceiver<RHOBObservability> for DiscordWebhook {
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let ns = sender.namespace.clone();
impl DiscordWebhook {
fn get_receiver_config(&self) -> Result<AlertManagerReceiver, String> {
let secret_name = format!("{}-secret", self.name.clone());
let webhook_key = format!("{}", self.url.clone());
@@ -52,33 +56,91 @@ impl AlertReceiver<RHOBObservability> for DiscordWebhook {
..Default::default()
};
let _ = sender.client.apply(&secret, Some(&ns)).await;
let mut matchers: Vec<String> = Vec::new();
for selector in &self.selectors {
trace!("selector: {:#?}", selector);
for (k, v) in selector {
matchers.push(format!("{} = {}", k, v));
}
}
Ok(AlertManagerReceiver {
additional_ressources: vec![kube_resource_to_dynamic(&secret)?],
receiver_config: json!({
"name": self.name,
"discord_configs": [
{
"webhook_url": self.url.clone(),
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}
]
}),
route_config: json!({
"receiver": self.name,
"matchers": matchers,
}),
})
}
}
#[async_trait]
impl AlertReceiver<OpenshiftClusterAlertSender> for DiscordWebhook {
async fn install(
&self,
sender: &OpenshiftClusterAlertSender,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn name(&self) -> String {
self.name.clone().to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
todo!()
}
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
self.get_receiver_config()
}
}
#[async_trait]
impl AlertReceiver<RHOBObservability> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let ns = sender.namespace.clone();
let config = self.get_receiver_config()?;
for resource in config.additional_ressources.iter() {
todo!("can I apply a dynamicresource");
// sender.client.apply(resource, Some(&ns)).await;
}
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
{
"name": self.name,
"discordConfigs": [
{
"apiURL": {
"name": secret_name,
"key": "webhook-url",
},
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}
]
}
config.receiver_config
]
}),
};
let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
name: Some(self.name.clone().to_string()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
@@ -122,6 +184,9 @@ impl AlertReceiver<RHOBObservability> for DiscordWebhook {
#[async_trait]
impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let ns = sender.namespace.clone();
let secret_name = format!("{}-secret", self.name.clone());
@@ -167,7 +232,7 @@ impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
let alertmanager_configs = AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
name: Some(self.name.clone().to_string()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
@@ -200,6 +265,9 @@ impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
#[async_trait]
impl AlertReceiver<Prometheus> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
@@ -217,7 +285,7 @@ impl AlertReceiver<Prometheus> for DiscordWebhook {
#[async_trait]
impl PrometheusReceiver for DiscordWebhook {
fn name(&self) -> String {
self.name.clone()
self.name.clone().to_string()
}
async fn configure_receiver(&self) -> AlertManagerChannelConfig {
self.get_config().await
@@ -226,6 +294,9 @@ impl PrometheusReceiver for DiscordWebhook {
#[async_trait]
impl AlertReceiver<KubePrometheus> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
@@ -243,7 +314,7 @@ impl AlertReceiver<KubePrometheus> for DiscordWebhook {
#[async_trait]
impl KubePrometheusReceiver for DiscordWebhook {
fn name(&self) -> String {
self.name.clone()
self.name.clone().to_string()
}
async fn configure_receiver(&self) -> AlertManagerChannelConfig {
self.get_config().await
@@ -270,7 +341,7 @@ impl DiscordWebhook {
let mut route = Mapping::new();
route.insert(
Value::String("receiver".to_string()),
Value::String(self.name.clone()),
Value::String(self.name.clone().to_string()),
);
route.insert(
Value::String("matchers".to_string()),
@@ -284,7 +355,7 @@ impl DiscordWebhook {
let mut receiver = Mapping::new();
receiver.insert(
Value::String("name".to_string()),
Value::String(self.name.clone()),
Value::String(self.name.clone().to_string()),
);
let mut discord_config = Mapping::new();
@@ -309,8 +380,9 @@ mod tests {
#[tokio::test]
async fn discord_serialize_should_match() {
let discord_receiver = DiscordWebhook {
name: "test-discord".to_string(),
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.i.dont.exist.com").unwrap()),
selectors: vec![],
};
let discord_receiver_receiver =

View File

@@ -19,7 +19,7 @@ use crate::{
},
prometheus::prometheus::{Prometheus, PrometheusReceiver},
},
topology::oberservability::monitoring::AlertReceiver,
topology::oberservability::monitoring::{AlertManagerReceiver, AlertReceiver},
};
use harmony_types::net::Url;
@@ -31,6 +31,9 @@ pub struct WebhookReceiver {
#[async_trait]
impl AlertReceiver<RHOBObservability> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
@@ -97,6 +100,9 @@ impl AlertReceiver<RHOBObservability> for WebhookReceiver {
#[async_trait]
impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let spec = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
@@ -158,6 +164,9 @@ impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
#[async_trait]
impl AlertReceiver<Prometheus> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
@@ -184,6 +193,9 @@ impl PrometheusReceiver for WebhookReceiver {
#[async_trait]
impl AlertReceiver<KubePrometheus> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}

View File

@@ -0,0 +1,270 @@
use base64::prelude::*;
use async_trait::async_trait;
use harmony_types::id::Id;
use kube::api::DynamicObject;
use log::{debug, info, trace};
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::monitoring::okd::OpenshiftClusterAlertSender,
score::Score,
topology::{K8sclient, Topology, oberservability::monitoring::AlertReceiver},
};
impl Clone for Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl Serialize for Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}
#[derive(Debug, Clone, Serialize)]
pub struct OpenshiftClusterAlertScore {
pub receivers: Vec<Box<dyn AlertReceiver<OpenshiftClusterAlertSender>>>,
}
impl<T: Topology + K8sclient> Score<T> for OpenshiftClusterAlertScore {
fn name(&self) -> String {
"ClusterAlertScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OpenshiftClusterAlertInterpret {
receivers: self.receivers.clone(),
})
}
}
#[derive(Debug)]
pub struct OpenshiftClusterAlertInterpret {
receivers: Vec<Box<dyn AlertReceiver<OpenshiftClusterAlertSender>>>,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for OpenshiftClusterAlertInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await?;
let openshift_monitoring_namespace = "openshift-monitoring";
let mut alertmanager_main_secret: DynamicObject = client
.get_secret_json_value("alertmanager-main", Some(openshift_monitoring_namespace))
.await?;
trace!("Got secret {alertmanager_main_secret:#?}");
let data: &mut serde_json::Value = &mut alertmanager_main_secret.data;
trace!("Alertmanager-main secret data {data:#?}");
let data_obj = data
.get_mut("data")
.ok_or(InterpretError::new(
"Missing 'data' field in alertmanager-main secret.".to_string(),
))?
.as_object_mut()
.ok_or(InterpretError::new(
"'data' field in alertmanager-main secret is expected to be an object ."
.to_string(),
))?;
let config_b64 = data_obj
.get("alertmanager.yaml")
.ok_or(InterpretError::new(
"Missing 'alertmanager.yaml' in alertmanager-main secret data".to_string(),
))?
.as_str()
.unwrap_or("");
trace!("Config base64 {config_b64}");
let config_bytes = BASE64_STANDARD.decode(config_b64).unwrap_or_default();
let mut am_config: serde_yaml::Value =
serde_yaml::from_str(&String::from_utf8(config_bytes).unwrap_or_default())
.unwrap_or_default();
debug!("Current alertmanager config {am_config:#?}");
let existing_receivers_sequence = if let Some(receivers) = am_config.get_mut("receivers") {
match receivers.as_sequence_mut() {
Some(seq) => seq,
None => {
return Err(InterpretError::new(format!(
"Expected alertmanager config receivers to be a sequence, got {:?}",
receivers
)));
}
}
} else {
&mut serde_yaml::Sequence::default()
};
let mut additional_resources = vec![];
for custom_receiver in &self.receivers {
let name = custom_receiver.name();
let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?;
let receiver_json_value = alertmanager_receiver.receiver_config;
let receiver_yaml_string =
serde_json::to_string(&receiver_json_value).map_err(|e| {
InterpretError::new(format!("Failed to serialize receiver config: {}", e))
})?;
let receiver_yaml_value: serde_yaml::Value =
serde_yaml::from_str(&receiver_yaml_string).map_err(|e| {
InterpretError::new(format!("Failed to parse receiver config as YAML: {}", e))
})?;
if let Some(idx) = existing_receivers_sequence.iter().position(|r| {
r.get("name")
.and_then(|n| n.as_str())
.map_or(false, |n| n == name)
}) {
info!("Replacing existing AlertManager receiver: {}", name);
existing_receivers_sequence[idx] = receiver_yaml_value;
} else {
debug!("Adding new AlertManager receiver: {}", name);
existing_receivers_sequence.push(receiver_yaml_value);
}
additional_resources.push(alertmanager_receiver.additional_ressources);
}
let existing_route_mapping = if let Some(route) = am_config.get_mut("route") {
match route.as_mapping_mut() {
Some(map) => map,
None => {
return Err(InterpretError::new(format!(
"Expected alertmanager config route to be a mapping, got {:?}",
route
)));
}
}
} else {
&mut serde_yaml::Mapping::default()
};
let existing_route_sequence = if let Some(routes) = existing_route_mapping.get_mut("routes")
{
match routes.as_sequence_mut() {
Some(seq) => seq,
None => {
return Err(InterpretError::new(format!(
"Expected alertmanager config routes to be a sequence, got {:?}",
routes
)));
}
}
} else {
&mut serde_yaml::Sequence::default()
};
for custom_receiver in &self.receivers {
let name = custom_receiver.name();
let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?;
let route_json_value = alertmanager_receiver.route_config;
let route_yaml_string = serde_json::to_string(&route_json_value).map_err(|e| {
InterpretError::new(format!("Failed to serialize route config: {}", e))
})?;
let route_yaml_value: serde_yaml::Value = serde_yaml::from_str(&route_yaml_string)
.map_err(|e| {
InterpretError::new(format!("Failed to parse route config as YAML: {}", e))
})?;
if let Some(idy) = existing_route_sequence.iter().position(|r| {
r.get("receiver")
.and_then(|n| n.as_str())
.map_or(false, |n| n == name)
}) {
info!("Replacing existing AlertManager receiver: {}", name);
existing_route_sequence[idy] = route_yaml_value;
} else {
debug!("Adding new AlertManager receiver: {}", name);
existing_route_sequence.push(route_yaml_value);
}
}
debug!("Current alertmanager config {am_config:#?}");
// TODO
// - save new version of alertmanager config
// - write additional ressources to the cluster
let am_config = serde_yaml::to_string(&am_config).map_err(|e| {
InterpretError::new(format!(
"Failed to serialize new alertmanager config to string : {e}"
))
})?;
let mut am_config_b64 = String::new();
BASE64_STANDARD.encode_string(am_config, &mut am_config_b64);
// TODO put update configmap value and save new value
data_obj.insert(
"alertmanager.yaml".to_string(),
serde_json::Value::String(am_config_b64),
);
// https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
alertmanager_main_secret.metadata.managed_fields = None;
trace!("Applying new alertmanager_main_secret {alertmanager_main_secret:#?}");
client
.apply_dynamic(
&alertmanager_main_secret,
Some(openshift_monitoring_namespace),
true,
)
.await?;
let additional_resources = additional_resources.concat();
trace!("Applying additional ressources for alert receivers {additional_resources:#?}");
client
.apply_dynamic_many(
&additional_resources,
Some(openshift_monitoring_namespace),
true,
)
.await?;
Ok(Outcome::success(format!(
"Successfully configured {} cluster alert receivers: {}",
self.receivers.len(),
self.receivers
.iter()
.map(|r| r.name())
.collect::<Vec<_>>()
.join(", ")
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OpenshiftClusterAlertInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -0,0 +1,90 @@
use std::{collections::BTreeMap, sync::Arc};
use crate::{
interpret::{InterpretError, Outcome},
topology::k8s::K8sClient,
};
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
pub(crate) struct Config;
impl Config {
pub async fn create_cluster_monitoring_config_cm(
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
enableUserWorkload: true
alertmanagerMain:
enableUserAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("cluster-monitoring-config".to_string()),
namespace: Some("openshift-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client.apply(&cm, Some("openshift-monitoring")).await?;
Ok(Outcome::success(
"updated cluster-monitoring-config-map".to_string(),
))
}
pub async fn create_user_workload_monitoring_config_cm(
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
alertmanager:
enabled: true
enableAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("user-workload-monitoring-config".to_string()),
namespace: Some("openshift-user-workload-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client
.apply(&cm, Some("openshift-user-workload-monitoring"))
.await?;
Ok(Outcome::success(
"updated openshift-user-monitoring-config-map".to_string(),
))
}
pub async fn verify_user_workload(client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let namespace = "openshift-user-workload-monitoring";
let alertmanager_name = "alertmanager-user-workload-0";
let prometheus_name = "prometheus-user-workload-0";
client
.wait_for_pod_ready(alertmanager_name, Some(namespace))
.await?;
client
.wait_for_pod_ready(prometheus_name, Some(namespace))
.await?;
Ok(Outcome::success(format!(
"pods: {}, {} ready in ns: {}",
alertmanager_name, prometheus_name, namespace
)))
}
}

View File

@@ -1,16 +1,13 @@
use std::{collections::BTreeMap, sync::Arc};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::monitoring::okd::config::Config,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
topology::{K8sclient, Topology},
};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
@@ -37,10 +34,9 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringIn
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.update_cluster_monitoring_config_cm(&client).await?;
self.update_user_workload_monitoring_config_cm(&client)
.await?;
self.verify_user_workload(&client).await?;
Config::create_cluster_monitoring_config_cm(&client).await?;
Config::create_user_workload_monitoring_config_cm(&client).await?;
Config::verify_user_workload(&client).await?;
Ok(Outcome::success(
"successfully enabled user-workload-monitoring".to_string(),
))
@@ -62,88 +58,3 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringIn
todo!()
}
}
impl OpenshiftUserWorkloadMonitoringInterpret {
pub async fn update_cluster_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
enableUserWorkload: true
alertmanagerMain:
enableUserAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("cluster-monitoring-config".to_string()),
namespace: Some("openshift-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client.apply(&cm, Some("openshift-monitoring")).await?;
Ok(Outcome::success(
"updated cluster-monitoring-config-map".to_string(),
))
}
pub async fn update_user_workload_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
alertmanager:
enabled: true
enableAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("user-workload-monitoring-config".to_string()),
namespace: Some("openshift-user-workload-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client
.apply(&cm, Some("openshift-user-workload-monitoring"))
.await?;
Ok(Outcome::success(
"updated openshift-user-monitoring-config-map".to_string(),
))
}
pub async fn verify_user_workload(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = "openshift-user-workload-monitoring";
let alertmanager_name = "alertmanager-user-workload-0";
let prometheus_name = "prometheus-user-workload-0";
client
.wait_for_pod_ready(alertmanager_name, Some(namespace))
.await?;
client
.wait_for_pod_ready(prometheus_name, Some(namespace))
.await?;
Ok(Outcome::success(format!(
"pods: {}, {} ready in ns: {}",
alertmanager_name, prometheus_name, namespace
)))
}
}

View File

@@ -1 +1,14 @@
use crate::topology::oberservability::monitoring::AlertSender;
pub mod cluster_monitoring;
pub(crate) mod config;
pub mod enable_user_workload;
#[derive(Debug)]
pub struct OpenshiftClusterAlertSender;
impl AlertSender for OpenshiftClusterAlertSender {
fn name(&self) -> String {
"OpenshiftClusterAlertSender".to_string()
}
}

View File

@@ -1,7 +1,6 @@
use std::collections::BTreeMap;
use k8s_openapi::{ClusterResourceScope, Resource};
use kube::{CustomResource, api::ObjectMeta};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -48,223 +47,28 @@ pub struct ProbeDns {
group = "nmstate.io",
version = "v1",
kind = "NodeNetworkConfigurationPolicy",
namespaced = false
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct NodeNetworkConfigurationPolicySpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<BTreeMap<String, String>>,
pub desired_state: NetworkState,
}
// Currently, kube-rs derive doesn't support resources without a `spec` field, so we have
// to implement it ourselves.
//
// Ref:
// - https://github.com/kube-rs/kube/issues/1763
// - https://github.com/kube-rs/kube/discussions/1762
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct NodeNetworkState {
metadata: ObjectMeta,
pub status: NodeNetworkStateStatus,
}
impl Resource for NodeNetworkState {
const API_VERSION: &'static str = "nmstate.io/v1beta1";
const GROUP: &'static str = "nmstate.io";
const VERSION: &'static str = "v1beta1";
const KIND: &'static str = "NodeNetworkState";
const URL_PATH_SEGMENT: &'static str = "nodenetworkstates";
type Scope = ClusterResourceScope;
}
impl k8s_openapi::Metadata for NodeNetworkState {
type Ty = ObjectMeta;
fn metadata(&self) -> &Self::Ty {
&self.metadata
}
fn metadata_mut(&mut self) -> &mut Self::Ty {
&mut self.metadata
}
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NodeNetworkStateStatus {
#[serde(skip_serializing_if = "Option::is_none")]
pub current_state: Option<NetworkState>,
#[serde(skip_serializing_if = "Option::is_none")]
pub handler_nmstate_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub host_network_manager_version: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub last_successful_update_time: Option<String>,
}
/// The NetworkState is the top-level struct, representing the entire
/// desired or current network state.
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
#[serde(deny_unknown_fields)]
pub struct NetworkState {
#[serde(skip_serializing_if = "Option::is_none")]
pub hostname: Option<HostNameState>,
#[serde(rename = "dns-resolver", skip_serializing_if = "Option::is_none")]
pub dns: Option<DnsState>,
#[serde(rename = "route-rules", skip_serializing_if = "Option::is_none")]
pub rules: Option<RouteRuleState>,
#[serde(skip_serializing_if = "Option::is_none")]
pub routes: Option<RouteState>,
#[serde(default, skip_serializing_if = "Vec::is_empty")]
pub interfaces: Vec<Interface>,
#[serde(rename = "ovs-db", skip_serializing_if = "Option::is_none")]
pub ovsdb: Option<OvsDbGlobalConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ovn: Option<OvnConfiguration>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct HostNameState {
#[serde(skip_serializing_if = "Option::is_none")]
pub running: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct DnsState {
#[serde(skip_serializing_if = "Option::is_none")]
pub running: Option<DnsResolverConfig>,
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<DnsResolverConfig>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct DnsResolverConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub search: Option<Vec<String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub server: Option<Vec<String>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct RouteRuleState {
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<Vec<RouteRule>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub running: Option<Vec<RouteRule>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct RouteState {
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<Vec<Route>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub running: Option<Vec<Route>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct RouteRule {
#[serde(rename = "ip-from", skip_serializing_if = "Option::is_none")]
pub ip_from: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub priority: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub route_table: Option<u32>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct Route {
#[serde(skip_serializing_if = "Option::is_none")]
pub destination: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metric: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub next_hop_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub next_hop_interface: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub table_id: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mtu: Option<u32>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvsDbGlobalConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub external_ids: Option<BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub other_config: Option<BTreeMap<String, String>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvnConfiguration {
#[serde(skip_serializing_if = "Option::is_none")]
pub bridge_mappings: Option<Vec<OvnBridgeMapping>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvnBridgeMapping {
#[serde(skip_serializing_if = "Option::is_none")]
pub localnet: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bridge: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(untagged)]
#[serde(rename_all = "kebab-case")]
pub enum StpSpec {
Bool(bool),
Options(StpOptions),
pub desired_state: DesiredStateSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct LldpState {
#[serde(skip_serializing_if = "Option::is_none")]
pub enabled: Option<bool>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvsDb {
#[serde(skip_serializing_if = "Option::is_none")]
pub external_ids: Option<BTreeMap<String, String>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub other_config: Option<BTreeMap<String, String>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct PatchState {
#[serde(skip_serializing_if = "Option::is_none")]
pub peer: Option<String>,
pub struct DesiredStateSpec {
pub interfaces: Vec<InterfaceSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct Interface {
pub struct InterfaceSpec {
pub name: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub description: Option<String>,
pub r#type: InterfaceType,
pub r#type: String,
pub state: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub mac_address: Option<String>,
@@ -295,81 +99,9 @@ pub struct Interface {
#[serde(skip_serializing_if = "Option::is_none")]
pub linux_bridge: Option<LinuxBridgeSpec>,
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(alias = "bridge")]
pub ovs_bridge: Option<OvsBridgeSpec>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ethtool: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub accept_all_mac_addresses: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub identifier: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub lldp: Option<LldpState>,
#[serde(skip_serializing_if = "Option::is_none")]
pub permanent_mac_address: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub max_mtu: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub min_mtu: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mptcp: Option<Value>,
#[serde(skip_serializing_if = "Option::is_none")]
pub profile_name: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wait_ip: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ovs_db: Option<OvsDb>,
#[serde(skip_serializing_if = "Option::is_none")]
pub driver: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub patch: Option<PatchState>,
}
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub enum InterfaceType {
#[serde(rename = "unknown")]
Unknown,
#[serde(rename = "dummy")]
Dummy,
#[serde(rename = "loopback")]
Loopback,
#[serde(rename = "linux-bridge")]
LinuxBridge,
#[serde(rename = "ovs-bridge")]
OvsBridge,
#[serde(rename = "ovs-interface")]
OvsInterface,
#[serde(rename = "bond")]
Bond,
#[serde(rename = "ipvlan")]
IpVlan,
#[serde(rename = "vlan")]
Vlan,
#[serde(rename = "vxlan")]
Vxlan,
#[serde(rename = "mac-vlan")]
Macvlan,
#[serde(rename = "mac-vtap")]
Macvtap,
#[serde(rename = "ethernet")]
Ethernet,
#[serde(rename = "infiniband")]
Infiniband,
#[serde(rename = "vrf")]
Vrf,
#[serde(rename = "veth")]
Veth,
#[serde(rename = "ipsec")]
Ipsec,
#[serde(rename = "hsr")]
Hrs,
}
impl Default for InterfaceType {
fn default() -> Self {
Self::Loopback
}
pub ethtool: Option<EthtoolSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
@@ -417,7 +149,6 @@ pub struct EthernetSpec {
#[serde(rename_all = "kebab-case")]
pub struct BondSpec {
pub mode: String,
#[serde(alias = "port")]
pub ports: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub options: Option<BTreeMap<String, Value>>,
@@ -556,15 +287,11 @@ pub struct OvsBridgeSpec {
#[serde(rename_all = "kebab-case")]
pub struct OvsBridgeOptions {
#[serde(skip_serializing_if = "Option::is_none")]
pub stp: Option<StpSpec>,
pub stp: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub rstp: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mcast_snooping_enable: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub datapath: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub fail_mode: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
@@ -578,3 +305,18 @@ pub struct OvsPortSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub r#type: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolSpec {
// TODO: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolFecSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub auto: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mode: Option<String>,
}

View File

@@ -1,6 +1,6 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use log::{info, warn};
use log::{debug, info};
use serde::Serialize;
use crate::{
@@ -9,7 +9,7 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{HostNetworkConfig, NetworkInterface, NetworkManager, Switch, SwitchPort, Topology},
topology::{HostNetworkConfig, NetworkInterface, Switch, SwitchPort, Topology},
};
#[derive(Debug, Clone, Serialize)]
@@ -17,7 +17,7 @@ pub struct HostNetworkConfigurationScore {
pub hosts: Vec<PhysicalHost>,
}
impl<T: Topology + NetworkManager + Switch> Score<T> for HostNetworkConfigurationScore {
impl<T: Topology + Switch> Score<T> for HostNetworkConfigurationScore {
fn name(&self) -> String {
"HostNetworkConfigurationScore".into()
}
@@ -35,7 +35,7 @@ pub struct HostNetworkConfigurationInterpret {
}
impl HostNetworkConfigurationInterpret {
async fn configure_network_for_host<T: Topology + NetworkManager + Switch>(
async fn configure_network_for_host<T: Topology + Switch>(
&self,
topology: &T,
host: &PhysicalHost,
@@ -49,13 +49,6 @@ impl HostNetworkConfigurationInterpret {
switch_ports: vec![],
});
}
if host.network.len() == 1 {
info!("[Host {current_host}/{total_hosts}] Only one interface to configure, skipping");
return Ok(HostNetworkConfig {
host_id: host.id.clone(),
switch_ports: vec![],
});
}
let switch_ports = self
.collect_switch_ports_for_host(topology, host, current_host, total_hosts)
@@ -66,7 +59,7 @@ impl HostNetworkConfigurationInterpret {
switch_ports,
};
if config.switch_ports.len() > 1 {
if !config.switch_ports.is_empty() {
info!(
"[Host {current_host}/{total_hosts}] Found {} ports for {} interfaces",
config.switch_ports.len(),
@@ -74,25 +67,15 @@ impl HostNetworkConfigurationInterpret {
);
info!("[Host {current_host}/{total_hosts}] Configuring host network...");
topology.configure_bond(&config).await.map_err(|e| {
InterpretError::new(format!("Failed to configure host network: {e}"))
})?;
topology
.configure_port_channel(&config)
.configure_host_network(&config)
.await
.map_err(|e| {
InterpretError::new(format!("Failed to configure host network: {e}"))
})?;
} else if config.switch_ports.is_empty() {
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?;
} else {
info!(
"[Host {current_host}/{total_hosts}] No ports found for {} interfaces, skipping",
host.network.len()
);
} else {
warn!(
"[Host {current_host}/{total_hosts}] Found a single port for {} interfaces, skipping",
host.network.len()
);
}
Ok(config)
@@ -130,7 +113,7 @@ impl HostNetworkConfigurationInterpret {
port,
});
}
Ok(None) => {}
Ok(None) => debug!("No port found for '{mac_address}', skipping"),
Err(e) => {
return Err(InterpretError::new(format!(
"Failed to get port for host '{}': {}",
@@ -150,6 +133,15 @@ impl HostNetworkConfigurationInterpret {
];
for config in configs {
let host = self
.score
.hosts
.iter()
.find(|h| h.id == config.host_id)
.unwrap();
println!("[Host] {host}");
if config.switch_ports.is_empty() {
report.push(format!(
"⏭️ Host {}: SKIPPED (No matching switch ports found)",
@@ -177,7 +169,7 @@ impl HostNetworkConfigurationInterpret {
}
#[async_trait]
impl<T: Topology + NetworkManager + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("HostNetworkConfigurationInterpret")
}
@@ -206,12 +198,6 @@ impl<T: Topology + NetworkManager + Switch> Interpret<T> for HostNetworkConfigur
let host_count = self.score.hosts.len();
info!("Started network configuration for {host_count} host(s)...",);
info!("Setting up NetworkManager...",);
topology
.ensure_network_manager_installed()
.await
.map_err(|e| InterpretError::new(format!("NetworkManager setup failed: {e}")))?;
info!("Setting up switch with sane defaults...");
topology
.setup_switch()
@@ -230,7 +216,6 @@ impl<T: Topology + NetworkManager + Switch> Interpret<T> for HostNetworkConfigur
host_configurations.push(host_configuration);
current_host += 1;
}
if current_host > 1 {
let details = self.format_host_configuration(host_configurations);
@@ -257,8 +242,7 @@ mod tests {
use crate::{
hardware::HostCategory,
topology::{
HostNetworkConfig, NetworkError, PreparationError, PreparationOutcome, SwitchError,
SwitchPort,
HostNetworkConfig, PreparationError, PreparationOutcome, SwitchError, SwitchPort,
},
};
use std::{
@@ -283,18 +267,6 @@ mod tests {
speed_mbps: None,
mtu: 1,
};
pub static ref YET_ANOTHER_EXISTING_INTERFACE: NetworkInterface = NetworkInterface {
mac_address: MacAddress::try_from("AA:BB:CC:DD:EE:F3".to_string()).unwrap(),
name: "interface-3".into(),
speed_mbps: None,
mtu: 1,
};
pub static ref LAST_EXISTING_INTERFACE: NetworkInterface = NetworkInterface {
mac_address: MacAddress::try_from("AA:BB:CC:DD:EE:F4".to_string()).unwrap(),
name: "interface-4".into(),
speed_mbps: None,
mtu: 1,
};
pub static ref UNKNOWN_INTERFACE: NetworkInterface = NetworkInterface {
mac_address: MacAddress::try_from("11:22:33:44:55:61".to_string()).unwrap(),
name: "unknown-interface".into(),
@@ -303,8 +275,6 @@ mod tests {
};
pub static ref PORT: PortLocation = PortLocation(1, 0, 42);
pub static ref ANOTHER_PORT: PortLocation = PortLocation(2, 0, 42);
pub static ref YET_ANOTHER_PORT: PortLocation = PortLocation(1, 0, 45);
pub static ref LAST_PORT: PortLocation = PortLocation(2, 0, 45);
}
#[tokio::test]
@@ -320,33 +290,28 @@ mod tests {
}
#[tokio::test]
async fn should_setup_network_manager() {
async fn host_with_one_mac_address_should_create_bond_with_one_interface() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let network_manager_setup = topology.network_manager_setup.lock().unwrap();
assert_that!(*network_manager_setup).is_true();
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
}],
},
)]);
}
#[tokio::test]
async fn host_with_one_mac_address_should_skip_host_configuration() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).is_empty();
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).is_empty();
}
#[tokio::test]
async fn host_with_multiple_mac_addresses_should_configure_one_bond_with_all_interfaces() {
async fn host_with_multiple_mac_addresses_should_create_one_bond_with_all_interfaces() {
let score = given_score(vec![given_host(
&HOST_ID,
vec![
@@ -358,8 +323,8 @@ mod tests {
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).contains_exactly(vec![(
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
@@ -378,183 +343,49 @@ mod tests {
}
#[tokio::test]
async fn host_with_multiple_mac_addresses_should_configure_one_port_channel_with_all_interfaces()
{
let score = given_score(vec![given_host(
&HOST_ID,
vec![
EXISTING_INTERFACE.clone(),
ANOTHER_EXISTING_INTERFACE.clone(),
],
)]);
async fn multiple_hosts_should_create_one_bond_per_host() {
let score = given_score(vec![
given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]),
given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]),
]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![
(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
},
SwitchPort {
}],
},
),
(
ANOTHER_HOST_ID.clone(),
HostNetworkConfig {
host_id: ANOTHER_HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
},
],
},
)]);
}
#[tokio::test]
async fn multiple_hosts_should_configure_one_bond_per_host() {
let score = given_score(vec![
given_host(
&HOST_ID,
vec![
EXISTING_INTERFACE.clone(),
ANOTHER_EXISTING_INTERFACE.clone(),
],
),
given_host(
&ANOTHER_HOST_ID,
vec![
YET_ANOTHER_EXISTING_INTERFACE.clone(),
LAST_EXISTING_INTERFACE.clone(),
],
),
]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).contains_exactly(vec![
(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
},
SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
},
],
},
),
(
ANOTHER_HOST_ID.clone(),
HostNetworkConfig {
host_id: ANOTHER_HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
interface: YET_ANOTHER_EXISTING_INTERFACE.clone(),
port: YET_ANOTHER_PORT.clone(),
},
SwitchPort {
interface: LAST_EXISTING_INTERFACE.clone(),
port: LAST_PORT.clone(),
},
],
}],
},
),
]);
}
#[tokio::test]
async fn multiple_hosts_should_configure_one_port_channel_per_host() {
let score = given_score(vec![
given_host(
&HOST_ID,
vec![
EXISTING_INTERFACE.clone(),
ANOTHER_EXISTING_INTERFACE.clone(),
],
),
given_host(
&ANOTHER_HOST_ID,
vec![
YET_ANOTHER_EXISTING_INTERFACE.clone(),
LAST_EXISTING_INTERFACE.clone(),
],
),
]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).contains_exactly(vec![
(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
},
SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
},
],
},
),
(
ANOTHER_HOST_ID.clone(),
HostNetworkConfig {
host_id: ANOTHER_HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
interface: YET_ANOTHER_EXISTING_INTERFACE.clone(),
port: YET_ANOTHER_PORT.clone(),
},
SwitchPort {
interface: LAST_EXISTING_INTERFACE.clone(),
port: LAST_PORT.clone(),
},
],
},
),
]);
}
#[tokio::test]
async fn port_not_found_for_mac_address_should_not_configure_host() {
async fn port_not_found_for_mac_address_should_not_configure_interface() {
let score = given_score(vec![given_host(&HOST_ID, vec![UNKNOWN_INTERFACE.clone()])]);
let topology = TopologyWithSwitch::new_port_not_found();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).is_empty();
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).is_empty();
}
#[tokio::test]
async fn only_one_port_found_for_multiple_mac_addresses_should_not_configure_host() {
let score = given_score(vec![given_host(
&HOST_ID,
vec![EXISTING_INTERFACE.clone(), UNKNOWN_INTERFACE.clone()],
)]);
let topology = TopologyWithSwitch::new_single_port_found();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let config = topology.configured_port_channels.lock().unwrap();
assert_that!(*config).is_empty();
let config = topology.configured_bonds.lock().unwrap();
assert_that!(*config).is_empty();
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).is_empty();
}
fn given_score(hosts: Vec<PhysicalHost>) -> HostNetworkConfigurationScore {
@@ -591,48 +422,26 @@ mod tests {
}
}
#[derive(Debug)]
struct TopologyWithSwitch {
available_ports: Arc<Mutex<Vec<PortLocation>>>,
configured_port_channels: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
configured_host_networks: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
switch_setup: Arc<Mutex<bool>>,
network_manager_setup: Arc<Mutex<bool>>,
configured_bonds: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
}
impl TopologyWithSwitch {
fn new() -> Self {
Self {
available_ports: Arc::new(Mutex::new(vec![
PORT.clone(),
ANOTHER_PORT.clone(),
YET_ANOTHER_PORT.clone(),
LAST_PORT.clone(),
])),
configured_port_channels: Arc::new(Mutex::new(vec![])),
available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])),
configured_host_networks: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
network_manager_setup: Arc::new(Mutex::new(false)),
configured_bonds: Arc::new(Mutex::new(vec![])),
}
}
fn new_port_not_found() -> Self {
Self {
available_ports: Arc::new(Mutex::new(vec![])),
configured_port_channels: Arc::new(Mutex::new(vec![])),
configured_host_networks: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
network_manager_setup: Arc::new(Mutex::new(false)),
configured_bonds: Arc::new(Mutex::new(vec![])),
}
}
fn new_single_port_found() -> Self {
Self {
available_ports: Arc::new(Mutex::new(vec![PORT.clone()])),
configured_port_channels: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
network_manager_setup: Arc::new(Mutex::new(false)),
configured_bonds: Arc::new(Mutex::new(vec![])),
}
}
}
@@ -648,22 +457,6 @@ mod tests {
}
}
#[async_trait]
impl NetworkManager for TopologyWithSwitch {
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
let mut network_manager_installed = self.network_manager_setup.lock().unwrap();
*network_manager_installed = true;
Ok(())
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
let mut configured_bonds = self.configured_bonds.lock().unwrap();
configured_bonds.push((config.host_id.clone(), config.clone()));
Ok(())
}
}
#[async_trait]
impl Switch for TopologyWithSwitch {
async fn setup_switch(&self) -> Result<(), SwitchError> {
@@ -683,12 +476,12 @@ mod tests {
Ok(Some(ports.remove(0)))
}
async fn configure_port_channel(
async fn configure_host_network(
&self,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
let mut configured_port_channels = self.configured_port_channels.lock().unwrap();
configured_port_channels.push((config.host_id.clone(), config.clone()));
let mut configured_host_networks = self.configured_host_networks.lock().unwrap();
configured_host_networks.push((config.host_id.clone(), config.clone()));
Ok(())
}

View File

@@ -1,81 +0,0 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use std::collections::HashMap;
#[async_trait]
pub trait PostgreSQL {
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
/// Abstracts away storage/retrieval details (e.g., secrets, files).
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
/// Returns None if no public endpoint (internal-only cluster).
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
}
#[derive(Clone, Debug)]
pub struct PostgreSQLConfig {
pub cluster_name: String,
pub instances: u32,
pub storage_size: StorageSize,
pub role: PostgreSQLClusterRole,
}
#[derive(Clone, Debug)]
pub enum PostgreSQLClusterRole {
Primary,
Replica(ReplicaClusterConfig),
}
#[derive(Clone, Debug)]
pub struct ReplicaConfig {
/// Name of the primary cluster this replica will sync from
pub primary_cluster_name: String,
/// Certs extracted from primary via Topology::get_replication_certs()
pub replication_certs: ReplicationCerts,
/// Bootstrap method (e.g., pg_basebackup from primary)
pub bootstrap: BootstrapConfig,
/// External cluster connection details for CNPG spec.externalClusters
pub external_cluster: ExternalClusterConfig,
}
#[derive(Clone, Debug)]
pub struct BootstrapConfig {
pub strategy: BootstrapStrategy,
}
#[derive(Clone, Debug)]
pub enum BootstrapStrategy {
PgBasebackup,
}
#[derive(Clone, Debug)]
pub struct ExternalClusterConfig {
/// Name used in CNPG externalClusters list
pub name: String,
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
pub connection_parameters: HashMap<String, String>,
}
#[derive(Clone, Debug)]
pub struct ReplicationCerts {
/// PEM-encoded CA cert from primary
pub ca_cert_pem: String,
/// PEM-encoded streaming_replica client cert (tls.crt)
pub streaming_replica_cert_pem: String,
/// PEM-encoded streaming_replica client key (tls.key)
pub streaming_replica_key_pem: String,
}
#[derive(Clone, Debug)]
pub struct PostgreSQLEndpoint {
pub host: String,
pub port: u16,
}

View File

@@ -1,7 +0,0 @@
pub mod capability;
mod score;
pub mod failover;

View File

@@ -1,236 +0,0 @@
use crate::{
domain::{data::Version, interpret::InterpretStatus},
interpret::{Interpret, InterpretError, InterpretName, Outcome},
inventory::Inventory,
modules::postgresql::capability::PostgreSQL,
score::Score,
topology::Topology,
};
use super::capability::*;
use derive_new::new;
use harmony_types::{id::Id, storage::StorageSize};
use async_trait::async_trait;
use log::info;
use serde::Serialize;
pub struct PostgreSQLScore {
config: PostgreSQLConfig,
}
#[derive(Debug, Clone)]
pub struct PostgreSQLInterpret {
config: PostgreSQLConfig,
version: Version,
status: InterpretStatus,
}
impl PostgreSQLInterpret {
pub fn new(config: PostgreSQLConfig) -> Self {
let version = Version::from("1.0.0").expect("Version should be valid");
Self {
config,
version,
status: InterpretStatus::QUEUED,
}
}
}
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
fn name(&self) -> String {
"PostgreSQLScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PostgreSQLInterpret::new(self.config.clone()))
}
}
#[async_trait]
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("PostgreSQLInterpret")
}
fn get_version(&self) -> crate::domain::data::Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Executing PostgreSQLInterpret with config {:?}",
self.config
);
let cluster_name = topology
.deploy(&self.config)
.await
.map_err(|e| InterpretError::from(e))?;
Ok(Outcome::success(format!(
"Deployed PostgreSQL cluster `{cluster_name}`"
)))
}
}
#[derive(Debug, new, Clone, Serialize)]
pub struct MultisitePostgreSQLScore {
pub cluster_name: String,
pub primary_site: Id,
pub replica_sites: Vec<Id>,
pub instances: u32,
pub storage_size: StorageSize,
}
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
}
fn name(&self) -> String {
"MultisitePostgreSQLScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct MultisitePostgreSQLInterpret {
score: MultisitePostgreSQLScore,
version: Version,
status: InterpretStatus,
}
impl MultisitePostgreSQLInterpret {
pub fn new(score: MultisitePostgreSQLScore) -> Self {
let version = Version::from("1.0.0").expect("Version should be valid");
Self {
score,
version,
status: InterpretStatus::QUEUED,
}
}
}
#[async_trait]
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("MultisitePostgreSQLInterpret")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
todo!("Track child interprets per site")
}
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
self.score.primary_site, self.score.replica_sites
);
// 1. Deploy primary
let primary_topo = topology.primary();
let primary_config = PostgreSQLConfig {
cluster_name: self.score.cluster_name.clone(),
instances: self.score.instances,
storage_size: self.score.storage_size.clone(),
role: ClusterRole::Primary,
};
let primary_cluster_name = primary_topo
.deploy(&primary_config)
.await
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
// 2. Extract certs & public endpoint from primary
let certs = primary_topo
.get_replication_certs(&primary_cluster_name)
.await
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
let public_endpoint = primary_topo
.get_public_endpoint(&primary_cluster_name)
.await??
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
// 3. Deploy replicas
for replica_site in &self.score.replica_sites {
let replica_topo = topology.replica();
.map_err(|e| {
InterpretError::from(format!(
"Replica site {:?} lookup failed: {e}",
replica_site
))
})?;
let connection_params: HashMap<String, String> = [
("host".to_string(), public_endpoint.host.clone()),
("port".to_string(), public_endpoint.port.to_string()),
("dbname".to_string(), "postgres".to_string()),
("user".to_string(), "streaming_replica".to_string()),
("sslmode".to_string(), "verify-ca".to_string()),
("sslnegotiation".to_string(), "direct".to_string()),
]
.into_iter()
.collect();
let external_cluster = ExternalClusterConfig {
name: "primary-cluster".to_string(),
connection_parameters: connection_params,
};
let replica_config_struct = ReplicaConfig {
primary_cluster_name: primary_cluster_name.clone(),
replication_certs: certs.clone(),
bootstrap: BootstrapConfig {
strategy: BootstrapStrategy::PgBasebackup,
},
external_cluster,
};
let replica_config = PostgreSQLConfig {
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
instances: self.score.instances,
storage_size: self.score.storage_size.clone(),
role: ClusterRole::Replica(replica_config_struct),
};
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
})?;
}
Ok(Outcome::success(format!(
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
self.score.cluster_name,
primary_cluster_name,
self.score.replica_sites.len()
)))
}
}

View File

@@ -0,0 +1,96 @@
use std::str::FromStr;
use serde::Serialize;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize)]
pub struct K8sName(pub String);
impl K8sName {
#[cfg(test)]
pub fn dummy() -> Self {
K8sName("example".to_string())
}
fn is_valid(name: &str) -> bool {
if name.is_empty() || name.len() > 63 {
return false;
}
let b = name.as_bytes();
if !b[0].is_ascii_alphanumeric() || !b[b.len() - 1].is_ascii_alphanumeric() {
return false;
}
b.iter()
.all(|c| c.is_ascii_lowercase() || c.is_ascii_digit() || *c == b'-')
}
}
impl FromStr for K8sName {
type Err = K8sNameError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if !Self::is_valid(s) {
return Err(K8sNameError::InvalidFormat(format!(
"Invalid Kubernetes resource name '{s}': \
must match DNS-1123 (lowercase alphanumeric, hyphens, <=63 chars)"
)));
};
Ok(K8sName(s.to_string()))
}
}
#[derive(Debug)]
pub enum K8sNameError {
InvalidFormat(String),
}
impl From<&K8sName> for String {
fn from(value: &K8sName) -> Self {
value.0.clone()
}
}
impl std::fmt::Display for K8sName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_valid_name() {
assert!(K8sName::from_str("k8s-name-test").is_ok());
assert!(K8sName::from_str("n").is_ok());
assert!(K8sName::from_str("node1").is_ok());
assert!(K8sName::from_str("my-app-v2").is_ok());
assert!(K8sName::from_str("service123").is_ok());
assert!(K8sName::from_str("abcdefghijklmnopqrstuvwxyz-1234567890").is_ok());
}
#[test]
fn test_invalid_name() {
assert!(K8sName::from_str("").is_err());
assert!(K8sName::from_str(".config").is_err());
assert!(K8sName::from_str("_hidden").is_err());
assert!(K8sName::from_str("UPPER-CASE").is_err());
assert!(K8sName::from_str("123-$$$").is_err());
assert!(K8sName::from_str("app!name").is_err());
assert!(K8sName::from_str("my..app").is_err());
assert!(K8sName::from_str("backend-").is_err());
assert!(K8sName::from_str("-frontend").is_err());
assert!(K8sName::from_str("InvalidName").is_err());
assert!(
K8sName::from_str("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa")
.is_err()
);
assert!(K8sName::from_str("k8s name").is_err());
assert!(K8sName::from_str("k8s_name").is_err());
assert!(K8sName::from_str("k8s@name").is_err());
}
}

View File

@@ -1,4 +1,4 @@
pub mod id;
pub mod k8s_name;
pub mod net;
pub mod switch;
pub mod storage;

View File

@@ -1,6 +1,6 @@
use serde::{Deserialize, Serialize};
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
pub struct MacAddress(pub [u8; 6]);
impl MacAddress {
@@ -19,14 +19,6 @@ impl From<&MacAddress> for String {
}
}
impl std::fmt::Debug for MacAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_tuple("MacAddress")
.field(&String::from(self))
.finish()
}
}
impl std::fmt::Display for MacAddress {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&String::from(self))

View File

@@ -1,6 +0,0 @@
use serde::{Deserialize, Serialize};
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
pub struct StorageSize {
size_bytes: u64,
}