Compare commits
6 Commits
759a9287d3
...
feat/multi
| Author | SHA1 | Date | |
|---|---|---|---|
| ab78a12599 | |||
| c7cbd9eeac | |||
| 83c1cc82b6 | |||
| 66d346a10c | |||
| 06a004a65d | |||
| 9d4e6acac0 |
19
Cargo.lock
generated
19
Cargo.lock
generated
@@ -1804,25 +1804,6 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "example-okd-cluster-alerts"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"brocade",
|
||||
"cidr",
|
||||
"env_logger",
|
||||
"harmony",
|
||||
"harmony_cli",
|
||||
"harmony_macros",
|
||||
"harmony_secret",
|
||||
"harmony_secret_derive",
|
||||
"harmony_types",
|
||||
"log",
|
||||
"serde",
|
||||
"tokio",
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "example-okd-install"
|
||||
version = "0.1.0"
|
||||
|
||||
105
docs/modules/Multisite_PostgreSQL.md
Normal file
105
docs/modules/Multisite_PostgreSQL.md
Normal file
@@ -0,0 +1,105 @@
|
||||
# Design Document: Harmony PostgreSQL Module
|
||||
|
||||
**Status:** Draft
|
||||
**Last Updated:** 2025-12-01
|
||||
**Context:** Multi-site Data Replication & Orchestration
|
||||
|
||||
## 1. Overview
|
||||
|
||||
The Harmony PostgreSQL Module provides a high-level abstraction for deploying and managing high-availability PostgreSQL clusters across geographically distributed Kubernetes/OKD sites.
|
||||
|
||||
Instead of manually configuring complex replication slots, firewalls, and operator settings on each cluster, users define a single intent (a **Score**), and Harmony orchestrates the underlying infrastructure (the **Arrangement**) to establish a Primary-Replica architecture.
|
||||
|
||||
Currently, the implementation relies on the **CloudNativePG (CNPG)** operator as the backing engine.
|
||||
|
||||
## 2. Architecture
|
||||
|
||||
### 2.1 The Abstraction Model
|
||||
Following **ADR 003 (Infrastructure Abstraction)**, Harmony separates the *intent* from the *implementation*.
|
||||
|
||||
1. **The Score (Intent):** The user defines a `MultisitePostgreSQL` resource. This describes *what* is needed (e.g., "A Postgres 15 cluster with 10GB storage, Primary on Site A, Replica on Site B").
|
||||
2. **The Interpret (Action):** Harmony MultisitePostgreSQLInterpret processes this Score and orchestrates the deployment on both sites to reach the state defined in the Score.
|
||||
3. **The Capability (Implementation):** The PostgreSQL Capability is implemented by the K8sTopology and the interpret can deploy it, configure it and fetch information about it. The concrete implementation will rely on the mature CloudnativePG operator to manage all the Kubernetes resources required.
|
||||
|
||||
### 2.2 Network Connectivity (TLS Passthrough)
|
||||
|
||||
One of the critical challenges in multi-site orchestration is secure connectivity between clusters that may have dynamic IPs or strict firewalls.
|
||||
|
||||
To solve this, we utilize **OKD/OpenShift Routes with TLS Passthrough**.
|
||||
|
||||
* **Mechanism:** The Primary site exposes a `Route` configured for `termination: passthrough`.
|
||||
* **Routing:** The OpenShift HAProxy router inspects the **SNI (Server Name Indication)** header of the incoming TCP connection to route traffic to the correct PostgreSQL Pod.
|
||||
* **Security:** SSL is **not** terminated at the ingress router. The encrypted stream is passed directly to the PostgreSQL instance. Mutual TLS (mTLS) authentication is handled natively by CNPG between the Primary and Replica instances.
|
||||
* **Dynamic IPs:** Because connections are established via DNS hostnames (the Route URL), this architecture is resilient to dynamic IP changes at the Primary site.
|
||||
|
||||
#### Traffic Flow Diagram
|
||||
|
||||
```text
|
||||
[ Site B: Replica ] [ Site A: Primary ]
|
||||
| |
|
||||
(CNPG Instance) --[Encrypted TCP]--> (OKD HAProxy Router)
|
||||
| (Port 443) |
|
||||
| |
|
||||
| [SNI Inspection]
|
||||
| |
|
||||
| v
|
||||
| (PostgreSQL Primary Pod)
|
||||
| (Port 5432)
|
||||
```
|
||||
|
||||
## 3. Design Decisions
|
||||
|
||||
### Why CloudNativePG?
|
||||
We selected CloudNativePG because it relies exclusively on standard Kubernetes primitives and uses the native PostgreSQL replication protocol (WAL shipping/Streaming). This aligns with Harmony's goal of being "K8s Native."
|
||||
|
||||
### Why TLS Passthrough instead of VPN/NodePort?
|
||||
* **NodePort:** Requires static IPs and opening non-standard ports on the firewall, which violates our security constraints.
|
||||
* **VPN (e.g., Wireguard/Tailscale):** While secure, it introduces significant complexity (sidecars, key management) and external dependencies.
|
||||
* **TLS Passthrough:** Leverages the existing Ingress/Router infrastructure already present in OKD. It requires zero additional software and respects multi-tenancy (Routes are namespaced).
|
||||
|
||||
### Configuration Philosophy (YAGNI)
|
||||
The current design exposes a **generic configuration surface**. Users can configure standard parameters (Storage size, CPU/Memory requests, Postgres version).
|
||||
|
||||
**We explicitly do not expose advanced CNPG or PostgreSQL configurations at this stage.**
|
||||
|
||||
* **Reasoning:** We aim to keep the API surface small and manageable.
|
||||
* **Future Path:** We plan to implement a "pass-through" mechanism to allow sending raw config maps or custom parameters to the underlying engine (CNPG) *only when a concrete use case arises*. Until then, we adhere to the **YAGNI (You Ain't Gonna Need It)** principle to avoid premature optimization and API bloat.
|
||||
|
||||
## 4. Usage Guide
|
||||
|
||||
To deploy a multi-site cluster, apply the `MultisitePostgreSQL` resource to the Harmony Control Plane.
|
||||
|
||||
### Example Manifest
|
||||
|
||||
```yaml
|
||||
apiVersion: harmony.io/v1alpha1
|
||||
kind: MultisitePostgreSQL
|
||||
metadata:
|
||||
name: finance-db
|
||||
namespace: tenant-a
|
||||
spec:
|
||||
version: "15"
|
||||
storage: "10Gi"
|
||||
resources:
|
||||
requests:
|
||||
cpu: "500m"
|
||||
memory: "1Gi"
|
||||
|
||||
# Topology Definition
|
||||
topology:
|
||||
primary:
|
||||
site: "site-paris" # The name of the cluster in Harmony
|
||||
replicas:
|
||||
- site: "site-newyork"
|
||||
```
|
||||
|
||||
### What happens next?
|
||||
1. Harmony detects the CR.
|
||||
2. **On Site Paris:** It deploys a CNPG Cluster (Primary) and creates a Passthrough Route `postgres-finance-db.apps.site-paris.example.com`.
|
||||
3. **On Site New York:** It deploys a CNPG Cluster (Replica) configured with `externalClusters` pointing to the Paris Route.
|
||||
4. Data begins replicating immediately over the encrypted channel.
|
||||
|
||||
## 5. Troubleshooting
|
||||
|
||||
* **Connection Refused:** Ensure the Primary site's Route is successfully admitted by the Ingress Controller.
|
||||
* **Certificate Errors:** CNPG manages mTLS automatically. If errors persist, ensure the CA secrets were correctly propagated by Harmony from Primary to Replica namespaces.
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use brocade::BrocadeOptions;
|
||||
@@ -107,6 +107,7 @@ async fn main() {
|
||||
},
|
||||
],
|
||||
switch_client: switch_client.clone(),
|
||||
network_manager: OnceLock::new(),
|
||||
};
|
||||
|
||||
let inventory = Inventory {
|
||||
|
||||
@@ -1,22 +0,0 @@
|
||||
[package]
|
||||
name = "example-okd-cluster-alerts"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
harmony_types = { path = "../../harmony_types" }
|
||||
harmony_secret = { path = "../../harmony_secret" }
|
||||
harmony_secret_derive = { path = "../../harmony_secret_derive" }
|
||||
cidr = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
log = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
url = { workspace = true }
|
||||
serde.workspace = true
|
||||
brocade = { path = "../../brocade" }
|
||||
@@ -1,26 +0,0 @@
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::monitoring::{
|
||||
alert_channel::discord_alert_channel::DiscordWebhook,
|
||||
okd::cluster_monitoring::OpenshiftClusterAlertScore,
|
||||
},
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
use harmony_macros::hurl;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
harmony_cli::run(
|
||||
Inventory::autoload(),
|
||||
K8sAnywhereTopology::from_env(),
|
||||
vec![Box::new(OpenshiftClusterAlertScore {
|
||||
receivers: vec![Box::new(DiscordWebhook {
|
||||
name: "discord-webhook-example".to_string(),
|
||||
url: hurl!("http://something.o"),
|
||||
})],
|
||||
})],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -9,7 +9,10 @@ use harmony::{
|
||||
use harmony_macros::{ip, ipv4};
|
||||
use harmony_secret::{Secret, SecretManager};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{net::IpAddr, sync::Arc};
|
||||
use std::{
|
||||
net::IpAddr,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
|
||||
struct OPNSenseFirewallConfig {
|
||||
@@ -81,6 +84,7 @@ pub async fn get_topology() -> HAClusterTopology {
|
||||
},
|
||||
workers: vec![],
|
||||
switch_client: switch_client.clone(),
|
||||
network_manager: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -10,7 +10,10 @@ use harmony::{
|
||||
use harmony_macros::{ip, ipv4};
|
||||
use harmony_secret::{Secret, SecretManager};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::{net::IpAddr, sync::Arc};
|
||||
use std::{
|
||||
net::IpAddr,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
pub async fn get_topology() -> HAClusterTopology {
|
||||
let firewall = harmony::topology::LogicalHost {
|
||||
@@ -76,6 +79,7 @@ pub async fn get_topology() -> HAClusterTopology {
|
||||
},
|
||||
workers: vec![],
|
||||
switch_client: switch_client.clone(),
|
||||
network_manager: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
|
||||
use brocade::BrocadeOptions;
|
||||
@@ -79,6 +79,7 @@ async fn main() {
|
||||
},
|
||||
workers: vec![],
|
||||
switch_client: switch_client.clone(),
|
||||
network_manager: OnceLock::new(),
|
||||
};
|
||||
|
||||
let inventory = Inventory {
|
||||
|
||||
141
harmony/src/domain/topology/failover.rs
Normal file
141
harmony/src/domain/topology/failover.rs
Normal file
@@ -0,0 +1,141 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use log::{debug, info};
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::{
|
||||
modules::postgresql::capability::{
|
||||
BootstrapConfig, BootstrapStrategy, ExternalClusterConfig, PostgreSQL,
|
||||
PostgreSQLClusterRole, PostgreSQLConfig, PostgreSQLEndpoint, ReplicaConfig,
|
||||
ReplicationCerts,
|
||||
},
|
||||
topology::{PreparationError, PreparationOutcome, Topology},
|
||||
};
|
||||
|
||||
pub struct FailoverTopology<T> {
|
||||
primary: T,
|
||||
replica: T,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Send + Sync> Topology for FailoverTopology<T> {
|
||||
fn name(&self) -> &str {
|
||||
"FailoverTopology"
|
||||
}
|
||||
|
||||
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: PostgreSQL> PostgreSQL for FailoverTopology<T> {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String> {
|
||||
info!(
|
||||
"Starting deployment of failover topology '{}'",
|
||||
config.cluster_name
|
||||
);
|
||||
|
||||
let primary_config = PostgreSQLConfig {
|
||||
cluster_name: config.cluster_name.clone(),
|
||||
instances: config.instances,
|
||||
storage_size: config.storage_size.clone(),
|
||||
role: PostgreSQLClusterRole::Primary,
|
||||
};
|
||||
|
||||
info!(
|
||||
"Deploying primary cluster '{{}}' ({} instances, {:?} storage)",
|
||||
primary_config.cluster_name, primary_config.storage_size
|
||||
);
|
||||
|
||||
let primary_cluster_name = self.primary.deploy(&primary_config).await?;
|
||||
|
||||
info!("Primary cluster '{primary_cluster_name}' deployed successfully");
|
||||
|
||||
info!("Retrieving replication certificates for primary '{primary_cluster_name}'");
|
||||
|
||||
let certs = self
|
||||
.primary
|
||||
.get_replication_certs(&primary_cluster_name)
|
||||
.await?;
|
||||
|
||||
info!("Replication certificates retrieved successfully");
|
||||
|
||||
info!("Retrieving public endpoint for primary '{primary_cluster_name}");
|
||||
|
||||
let endpoint = self
|
||||
.primary
|
||||
.get_public_endpoint(&primary_cluster_name)
|
||||
.await?
|
||||
.ok_or_else(|| "No public endpoint configured on primary cluster".to_string())?;
|
||||
|
||||
info!(
|
||||
"Public endpoint '{}:{}' retrieved for primary",
|
||||
endpoint.host, endpoint.port
|
||||
);
|
||||
|
||||
info!("Configuring replica connection parameters and bootstrap");
|
||||
|
||||
let mut connection_parameters = HashMap::new();
|
||||
connection_parameters.insert("host".to_string(), endpoint.host);
|
||||
connection_parameters.insert("port".to_string(), endpoint.port.to_string());
|
||||
connection_parameters.insert("dbname".to_string(), "postgres".to_string());
|
||||
connection_parameters.insert("user".to_string(), "streaming_replica".to_string());
|
||||
connection_parameters.insert("sslmode".to_string(), "verify-ca".to_string());
|
||||
connection_parameters.insert("sslnegotiation".to_string(), "direct".to_string());
|
||||
|
||||
debug!("Replica connection parameters: {:?}", connection_parameters);
|
||||
|
||||
let external_cluster = ExternalClusterConfig {
|
||||
name: primary_cluster_name.clone(),
|
||||
connection_parameters,
|
||||
};
|
||||
|
||||
let bootstrap_config = BootstrapConfig {
|
||||
strategy: BootstrapStrategy::PgBasebackup,
|
||||
};
|
||||
|
||||
let replica_cluster_config = ReplicaConfig {
|
||||
primary_cluster_name: primary_cluster_name.clone(),
|
||||
replication_certs: certs,
|
||||
bootstrap: bootstrap_config,
|
||||
external_cluster,
|
||||
};
|
||||
|
||||
let replica_config = PostgreSQLConfig {
|
||||
cluster_name: format!("{}-replica", primary_cluster_name),
|
||||
instances: config.instances,
|
||||
storage_size: config.storage_size.clone(),
|
||||
role: PostgreSQLClusterRole::Replica(replica_cluster_config),
|
||||
};
|
||||
|
||||
info!(
|
||||
"Deploying replica cluster '{}' ({} instances, {:?} storage) on replica topology",
|
||||
replica_config.cluster_name, replica_config.instances, replica_config.storage_size
|
||||
);
|
||||
|
||||
self.replica.deploy(&replica_config).await?;
|
||||
|
||||
info!(
|
||||
"Replica cluster '{}' deployed successfully; failover topology '{}' ready",
|
||||
replica_config.cluster_name, config.cluster_name
|
||||
);
|
||||
|
||||
Ok(primary_cluster_name)
|
||||
}
|
||||
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String> {
|
||||
self.primary.get_replication_certs(cluster_name).await
|
||||
}
|
||||
|
||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String> {
|
||||
self.primary.get_endpoint(cluster_name).await
|
||||
}
|
||||
|
||||
async fn get_public_endpoint(
|
||||
&self,
|
||||
cluster_name: &str,
|
||||
) -> Result<Option<PostgreSQLEndpoint>, String> {
|
||||
self.primary.get_public_endpoint(cluster_name).await
|
||||
}
|
||||
}
|
||||
@@ -1,29 +1,25 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_macros::ip;
|
||||
use harmony_types::{
|
||||
id::Id,
|
||||
net::{MacAddress, Url},
|
||||
switch::PortLocation,
|
||||
};
|
||||
use kube::api::ObjectMeta;
|
||||
use log::debug;
|
||||
use log::info;
|
||||
|
||||
use crate::modules::okd::crd::nmstate::{self, NodeNetworkConfigurationPolicy};
|
||||
use crate::infra::network_manager::OpenShiftNmStateNetworkManager;
|
||||
use crate::topology::PxeOptions;
|
||||
use crate::{data::FileContent, modules::okd::crd::nmstate::NMState};
|
||||
use crate::{
|
||||
executors::ExecutorError, modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec,
|
||||
};
|
||||
use crate::{data::FileContent, executors::ExecutorError};
|
||||
|
||||
use super::{
|
||||
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
|
||||
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost,
|
||||
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer,
|
||||
Topology, k8s::K8sClient,
|
||||
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
|
||||
NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
|
||||
SwitchError, TftpServer, Topology, k8s::K8sClient,
|
||||
};
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HAClusterTopology {
|
||||
@@ -40,6 +36,7 @@ pub struct HAClusterTopology {
|
||||
pub control_plane: Vec<LogicalHost>,
|
||||
pub workers: Vec<LogicalHost>,
|
||||
pub kubeconfig: Option<String>,
|
||||
pub network_manager: OnceLock<Arc<dyn NetworkManager>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -63,7 +60,7 @@ impl K8sclient for HAClusterTopology {
|
||||
K8sClient::try_default().await.map_err(|e| e.to_string())?,
|
||||
)),
|
||||
Some(kubeconfig) => {
|
||||
let Some(client) = K8sClient::from_kubeconfig(&kubeconfig).await else {
|
||||
let Some(client) = K8sClient::from_kubeconfig(kubeconfig).await else {
|
||||
return Err("Failed to create k8s client".to_string());
|
||||
};
|
||||
Ok(Arc::new(client))
|
||||
@@ -93,191 +90,12 @@ impl HAClusterTopology {
|
||||
.to_string()
|
||||
}
|
||||
|
||||
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
|
||||
let k8s_client = self.k8s_client().await?;
|
||||
pub async fn network_manager(&self) -> &dyn NetworkManager {
|
||||
let k8s_client = self.k8s_client().await.unwrap();
|
||||
|
||||
debug!("Installing NMState controller...");
|
||||
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
debug!("Creating NMState namespace...");
|
||||
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
debug!("Creating NMState service account...");
|
||||
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
debug!("Creating NMState role...");
|
||||
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
debug!("Creating NMState role binding...");
|
||||
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
debug!("Creating NMState operator...");
|
||||
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
k8s_client
|
||||
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
|
||||
.await?;
|
||||
|
||||
let nmstate = NMState {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("nmstate".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
debug!("Creating NMState: {nmstate:#?}");
|
||||
k8s_client
|
||||
.apply(&nmstate, None)
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn get_next_bond_id(&self) -> u8 {
|
||||
42 // FIXME: Find a better way to declare the bond id
|
||||
}
|
||||
|
||||
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
|
||||
self.ensure_nmstate_operator_installed()
|
||||
.await
|
||||
.map_err(|e| {
|
||||
SwitchError::new(format!(
|
||||
"Can't configure bond, NMState operator not available: {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let bond_config = self.create_bond_configuration(config);
|
||||
debug!(
|
||||
"Applying NMState bond config for host {}: {bond_config:#?}",
|
||||
config.host_id
|
||||
);
|
||||
self.k8s_client()
|
||||
.await
|
||||
.unwrap()
|
||||
.apply(&bond_config, None)
|
||||
.await
|
||||
.map_err(|e| SwitchError::new(format!("Failed to configure bond: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_bond_configuration(
|
||||
&self,
|
||||
config: &HostNetworkConfig,
|
||||
) -> NodeNetworkConfigurationPolicy {
|
||||
let host_name = &config.host_id;
|
||||
let bond_id = self.get_next_bond_id();
|
||||
let bond_name = format!("bond{bond_id}");
|
||||
|
||||
info!("Configuring bond '{bond_name}' for host '{host_name}'...");
|
||||
|
||||
let mut bond_mtu: Option<u32> = None;
|
||||
let mut copy_mac_from: Option<String> = None;
|
||||
let mut bond_ports = Vec::new();
|
||||
let mut interfaces: Vec<nmstate::InterfaceSpec> = Vec::new();
|
||||
|
||||
for switch_port in &config.switch_ports {
|
||||
let interface_name = switch_port.interface.name.clone();
|
||||
|
||||
interfaces.push(nmstate::InterfaceSpec {
|
||||
name: interface_name.clone(),
|
||||
description: Some(format!("Member of bond {bond_name}")),
|
||||
r#type: "ethernet".to_string(),
|
||||
state: "up".to_string(),
|
||||
mtu: Some(switch_port.interface.mtu),
|
||||
mac_address: Some(switch_port.interface.mac_address.to_string()),
|
||||
ipv4: Some(nmstate::IpStackSpec {
|
||||
enabled: Some(false),
|
||||
..Default::default()
|
||||
}),
|
||||
ipv6: Some(nmstate::IpStackSpec {
|
||||
enabled: Some(false),
|
||||
..Default::default()
|
||||
}),
|
||||
link_aggregation: None,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
bond_ports.push(interface_name.clone());
|
||||
|
||||
// Use the first port's details for the bond mtu and mac address
|
||||
if bond_mtu.is_none() {
|
||||
bond_mtu = Some(switch_port.interface.mtu);
|
||||
}
|
||||
if copy_mac_from.is_none() {
|
||||
copy_mac_from = Some(interface_name);
|
||||
}
|
||||
}
|
||||
|
||||
interfaces.push(nmstate::InterfaceSpec {
|
||||
name: bond_name.clone(),
|
||||
description: Some(format!("Network bond for host {host_name}")),
|
||||
r#type: "bond".to_string(),
|
||||
state: "up".to_string(),
|
||||
copy_mac_from,
|
||||
ipv4: Some(nmstate::IpStackSpec {
|
||||
dhcp: Some(true),
|
||||
enabled: Some(true),
|
||||
..Default::default()
|
||||
}),
|
||||
ipv6: Some(nmstate::IpStackSpec {
|
||||
dhcp: Some(true),
|
||||
autoconf: Some(true),
|
||||
enabled: Some(true),
|
||||
..Default::default()
|
||||
}),
|
||||
link_aggregation: Some(nmstate::BondSpec {
|
||||
mode: "802.3ad".to_string(),
|
||||
ports: bond_ports,
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
NodeNetworkConfigurationPolicy {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(format!("{host_name}-bond-config")),
|
||||
..Default::default()
|
||||
},
|
||||
spec: NodeNetworkConfigurationPolicySpec {
|
||||
node_selector: Some(BTreeMap::from([(
|
||||
"kubernetes.io/hostname".to_string(),
|
||||
host_name.to_string(),
|
||||
)])),
|
||||
desired_state: nmstate::DesiredStateSpec { interfaces },
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
|
||||
debug!("Configuring port channel: {config:#?}");
|
||||
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
|
||||
|
||||
self.switch_client
|
||||
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
|
||||
.await
|
||||
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
self.network_manager
|
||||
.get_or_init(|| Arc::new(OpenShiftNmStateNetworkManager::new(k8s_client.clone())))
|
||||
.as_ref()
|
||||
}
|
||||
|
||||
pub fn autoload() -> Self {
|
||||
@@ -301,6 +119,7 @@ impl HAClusterTopology {
|
||||
bootstrap_host: dummy_host,
|
||||
control_plane: vec![],
|
||||
workers: vec![],
|
||||
network_manager: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -458,21 +277,40 @@ impl HttpServer for HAClusterTopology {
|
||||
#[async_trait]
|
||||
impl Switch for HAClusterTopology {
|
||||
async fn setup_switch(&self) -> Result<(), SwitchError> {
|
||||
self.switch_client.setup().await?;
|
||||
Ok(())
|
||||
self.switch_client.setup().await.map(|_| ())
|
||||
}
|
||||
|
||||
async fn get_port_for_mac_address(
|
||||
&self,
|
||||
mac_address: &MacAddress,
|
||||
) -> Result<Option<PortLocation>, SwitchError> {
|
||||
let port = self.switch_client.find_port(mac_address).await?;
|
||||
Ok(port)
|
||||
self.switch_client.find_port(mac_address).await
|
||||
}
|
||||
|
||||
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
|
||||
self.configure_bond(config).await?;
|
||||
self.configure_port_channel(config).await
|
||||
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
|
||||
debug!("Configuring port channel: {config:#?}");
|
||||
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
|
||||
|
||||
self.switch_client
|
||||
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
|
||||
.await
|
||||
.map_err(|e| SwitchError::new(format!("Failed to configure port-channel: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl NetworkManager for HAClusterTopology {
|
||||
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
|
||||
self.network_manager()
|
||||
.await
|
||||
.ensure_network_manager_installed()
|
||||
.await
|
||||
}
|
||||
|
||||
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
|
||||
self.network_manager().await.configure_bond(config).await
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -5,15 +5,17 @@ use k8s_openapi::{
|
||||
ClusterResourceScope, NamespaceResourceScope,
|
||||
api::{
|
||||
apps::v1::Deployment,
|
||||
core::v1::{Pod, ServiceAccount},
|
||||
core::v1::{Node, Pod, ServiceAccount},
|
||||
},
|
||||
apimachinery::pkg::version::Info,
|
||||
};
|
||||
use kube::{
|
||||
Client, Config, Discovery, Error, Resource,
|
||||
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
|
||||
api::{
|
||||
Api, AttachParams, DeleteParams, ListParams, ObjectList, Patch, PatchParams, ResourceExt,
|
||||
},
|
||||
config::{KubeConfigOptions, Kubeconfig},
|
||||
core::{DynamicResourceScope, ErrorResponse},
|
||||
core::ErrorResponse,
|
||||
discovery::{ApiCapabilities, Scope},
|
||||
error::DiscoveryError,
|
||||
runtime::reflector::Lookup,
|
||||
@@ -23,7 +25,7 @@ use kube::{
|
||||
api::{ApiResource, GroupVersionKind},
|
||||
runtime::wait::await_condition,
|
||||
};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use log::{debug, error, trace, warn};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use serde_json::json;
|
||||
use similar::TextDiff;
|
||||
@@ -94,23 +96,6 @@ impl K8sClient {
|
||||
resource.get(name).await
|
||||
}
|
||||
|
||||
pub async fn get_secret_json_value(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<DynamicObject, Error> {
|
||||
self.get_resource_json_value(
|
||||
name,
|
||||
namespace,
|
||||
&GroupVersionKind {
|
||||
group: "".to_string(),
|
||||
version: "v1".to_string(),
|
||||
kind: "Secret".to_string(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_deployment(
|
||||
&self,
|
||||
name: &str,
|
||||
@@ -354,169 +339,6 @@ impl K8sClient {
|
||||
}
|
||||
}
|
||||
|
||||
fn get_api_for_dynamic_object(
|
||||
&self,
|
||||
object: &DynamicObject,
|
||||
ns: Option<&str>,
|
||||
) -> Result<Api<DynamicObject>, Error> {
|
||||
let api_resource = object
|
||||
.types
|
||||
.as_ref()
|
||||
.and_then(|t| {
|
||||
let parts: Vec<&str> = t.api_version.split('/').collect();
|
||||
match parts.as_slice() {
|
||||
[version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
|
||||
"", version, &t.kind,
|
||||
))),
|
||||
[group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
|
||||
group, version, &t.kind,
|
||||
))),
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
Error::BuildRequest(kube::core::request::Error::Validation(
|
||||
"Invalid apiVersion in DynamicObject {object:#?}".to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
match ns {
|
||||
Some(ns) => Ok(Api::namespaced_with(self.client.clone(), ns, &api_resource)),
|
||||
None => Ok(Api::default_namespaced_with(
|
||||
self.client.clone(),
|
||||
&api_resource,
|
||||
)),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn apply_dynamic_many(
|
||||
&self,
|
||||
resource: &[DynamicObject],
|
||||
namespace: Option<&str>,
|
||||
force_conflicts: bool,
|
||||
) -> Result<Vec<DynamicObject>, Error> {
|
||||
let mut result = Vec::new();
|
||||
for r in resource.iter() {
|
||||
result.push(self.apply_dynamic(r, namespace, force_conflicts).await?);
|
||||
}
|
||||
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Apply DynamicObject resource to the cluster
|
||||
pub async fn apply_dynamic(
|
||||
&self,
|
||||
resource: &DynamicObject,
|
||||
namespace: Option<&str>,
|
||||
force_conflicts: bool,
|
||||
) -> Result<DynamicObject, Error> {
|
||||
// Build API for this dynamic object
|
||||
let api = self.get_api_for_dynamic_object(resource, namespace)?;
|
||||
let name = resource
|
||||
.metadata
|
||||
.name
|
||||
.as_ref()
|
||||
.ok_or_else(|| {
|
||||
Error::BuildRequest(kube::core::request::Error::Validation(
|
||||
"DynamicObject must have metadata.name".to_string(),
|
||||
))
|
||||
})?
|
||||
.as_str();
|
||||
|
||||
debug!(
|
||||
"Applying dynamic resource kind={:?} apiVersion={:?} name='{}' ns={:?}",
|
||||
resource.types.as_ref().map(|t| &t.kind),
|
||||
resource.types.as_ref().map(|t| &t.api_version),
|
||||
name,
|
||||
namespace
|
||||
);
|
||||
trace!(
|
||||
"Dynamic resource payload:\n{:#}",
|
||||
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
|
||||
);
|
||||
|
||||
// Using same field manager as in apply()
|
||||
let mut patch_params = PatchParams::apply("harmony");
|
||||
patch_params.force = force_conflicts;
|
||||
|
||||
if *crate::config::DRY_RUN {
|
||||
// Dry-run path: fetch current, show diff, and return appropriate object
|
||||
match api.get(name).await {
|
||||
Ok(current) => {
|
||||
trace!("Received current dynamic value {current:#?}");
|
||||
|
||||
println!("\nPerforming dry-run for resource: '{}'", name);
|
||||
|
||||
// Serialize current and new, and strip status from current if present
|
||||
let mut current_yaml =
|
||||
serde_yaml::to_value(¤t).unwrap_or_else(|_| serde_yaml::Value::Null);
|
||||
if let Some(map) = current_yaml.as_mapping_mut() {
|
||||
if map.contains_key(&serde_yaml::Value::String("status".to_string())) {
|
||||
let removed =
|
||||
map.remove(&serde_yaml::Value::String("status".to_string()));
|
||||
trace!("Removed status from current dynamic object: {:?}", removed);
|
||||
} else {
|
||||
trace!(
|
||||
"Did not find status entry for current dynamic object {}/{}",
|
||||
current.metadata.namespace.as_deref().unwrap_or(""),
|
||||
current.metadata.name.as_deref().unwrap_or("")
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
let current_yaml = serde_yaml::to_string(¤t_yaml)
|
||||
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
|
||||
let new_yaml = serde_yaml::to_string(resource)
|
||||
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
|
||||
|
||||
if current_yaml == new_yaml {
|
||||
println!("No changes detected.");
|
||||
return Ok(current);
|
||||
}
|
||||
|
||||
println!("Changes detected:");
|
||||
let diff = TextDiff::from_lines(¤t_yaml, &new_yaml);
|
||||
for change in diff.iter_all_changes() {
|
||||
let sign = match change.tag() {
|
||||
similar::ChangeTag::Delete => "-",
|
||||
similar::ChangeTag::Insert => "+",
|
||||
similar::ChangeTag::Equal => " ",
|
||||
};
|
||||
print!("{}{}", sign, change);
|
||||
}
|
||||
|
||||
// Return the incoming resource as the would-be applied state
|
||||
Ok(resource.clone())
|
||||
}
|
||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
||||
println!("\nPerforming dry-run for new resource: '{}'", name);
|
||||
println!(
|
||||
"Resource does not exist. It would be created with the following content:"
|
||||
);
|
||||
let new_yaml = serde_yaml::to_string(resource)
|
||||
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
|
||||
for line in new_yaml.lines() {
|
||||
println!("+{}", line);
|
||||
}
|
||||
Ok(resource.clone())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to get dynamic resource '{}': {}", name, e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
} else {
|
||||
// Real apply via server-side apply
|
||||
debug!("Patching (server-side apply) dynamic resource '{}'", name);
|
||||
api.patch(name, &patch_params, &Patch::Apply(resource))
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to apply dynamic resource '{}': {}", name, e);
|
||||
e
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/// Apply a resource in namespace
|
||||
///
|
||||
/// See `kubectl apply` for more information on the expected behavior of this function
|
||||
@@ -744,7 +566,58 @@ impl K8sClient {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
|
||||
/// Gets a single named resource of a specific type `K`.
|
||||
///
|
||||
/// This function uses the `ApplyStrategy` trait to correctly determine
|
||||
/// whether to look in a specific namespace or in the entire cluster.
|
||||
///
|
||||
/// Returns `Ok(None)` if the resource is not found (404).
|
||||
pub async fn get_resource<K>(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Option<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
||||
<K as Resource>::Scope: ApplyStrategy<K>,
|
||||
<K as kube::Resource>::DynamicType: Default,
|
||||
{
|
||||
let api: Api<K> =
|
||||
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
|
||||
|
||||
api.get_opt(name).await
|
||||
}
|
||||
|
||||
/// Lists all resources of a specific type `K`.
|
||||
///
|
||||
/// This function uses the `ApplyStrategy` trait to correctly determine
|
||||
/// whether to list from a specific namespace or from the entire cluster.
|
||||
pub async fn list_resources<K>(
|
||||
&self,
|
||||
namespace: Option<&str>,
|
||||
list_params: Option<ListParams>,
|
||||
) -> Result<ObjectList<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
||||
<K as Resource>::Scope: ApplyStrategy<K>,
|
||||
<K as kube::Resource>::DynamicType: Default,
|
||||
{
|
||||
let api: Api<K> =
|
||||
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
|
||||
|
||||
let list_params = list_params.unwrap_or_default();
|
||||
api.list(&list_params).await
|
||||
}
|
||||
|
||||
/// Fetches a list of all Nodes in the cluster.
|
||||
pub async fn get_nodes(
|
||||
&self,
|
||||
list_params: Option<ListParams>,
|
||||
) -> Result<ObjectList<Node>, Error> {
|
||||
self.list_resources(None, list_params).await
|
||||
}
|
||||
|
||||
pub async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
|
||||
let k = match Kubeconfig::read_from(path) {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
mod ha_cluster;
|
||||
pub mod ingress;
|
||||
mod failover;
|
||||
pub use failover::*;
|
||||
use harmony_types::net::IpAddress;
|
||||
mod host_binding;
|
||||
mod http;
|
||||
|
||||
@@ -15,7 +15,7 @@ use harmony_types::{
|
||||
};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{executors::ExecutorError, hardware::PhysicalHost};
|
||||
use crate::executors::ExecutorError;
|
||||
|
||||
use super::{LogicalHost, k8s::K8sClient};
|
||||
|
||||
@@ -183,6 +183,37 @@ impl FromStr for DnsRecordType {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait NetworkManager: Debug + Send + Sync {
|
||||
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError>;
|
||||
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError>;
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, new)]
|
||||
pub struct NetworkError {
|
||||
msg: String,
|
||||
}
|
||||
|
||||
impl fmt::Display for NetworkError {
|
||||
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
|
||||
f.write_str(&self.msg)
|
||||
}
|
||||
}
|
||||
|
||||
impl Error for NetworkError {}
|
||||
|
||||
impl From<kube::Error> for NetworkError {
|
||||
fn from(value: kube::Error) -> Self {
|
||||
NetworkError::new(value.to_string())
|
||||
}
|
||||
}
|
||||
|
||||
impl From<String> for NetworkError {
|
||||
fn from(value: String) -> Self {
|
||||
NetworkError::new(value)
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
pub trait Switch: Send + Sync {
|
||||
async fn setup_switch(&self) -> Result<(), SwitchError>;
|
||||
@@ -192,7 +223,7 @@ pub trait Switch: Send + Sync {
|
||||
mac_address: &MacAddress,
|
||||
) -> Result<Option<PortLocation>, SwitchError>;
|
||||
|
||||
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
|
||||
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug, PartialEq)]
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use std::any::Any;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use kube::api::DynamicObject;
|
||||
use log::debug;
|
||||
|
||||
use crate::{
|
||||
@@ -77,14 +76,6 @@ pub trait AlertReceiver<S: AlertSender>: std::fmt::Debug + Send + Sync {
|
||||
fn name(&self) -> String;
|
||||
fn clone_box(&self) -> Box<dyn AlertReceiver<S>>;
|
||||
fn as_any(&self) -> &dyn Any;
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String>;
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct AlertManagerReceiver {
|
||||
pub receiver_config: serde_json::Value,
|
||||
// FIXME we should not leak k8s here. DynamicObject is k8s specific
|
||||
pub additional_ressources: Vec<DynamicObject>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
||||
@@ -14,7 +14,7 @@ use k8s_openapi::{
|
||||
},
|
||||
apimachinery::pkg::util::intstr::IntOrString,
|
||||
};
|
||||
use kube::{api::DynamicObject, Resource};
|
||||
use kube::Resource;
|
||||
use log::debug;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::json;
|
||||
|
||||
@@ -4,5 +4,6 @@ pub mod hp_ilo;
|
||||
pub mod intel_amt;
|
||||
pub mod inventory;
|
||||
pub mod kube;
|
||||
pub mod network_manager;
|
||||
pub mod opnsense;
|
||||
mod sqlx;
|
||||
|
||||
257
harmony/src/infra/network_manager.rs
Normal file
257
harmony/src/infra/network_manager.rs
Normal file
@@ -0,0 +1,257 @@
|
||||
use std::{
|
||||
collections::{BTreeMap, HashSet},
|
||||
sync::Arc,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::api::core::v1::Node;
|
||||
use kube::{
|
||||
ResourceExt,
|
||||
api::{ObjectList, ObjectMeta},
|
||||
};
|
||||
use log::{debug, info};
|
||||
|
||||
use crate::{
|
||||
modules::okd::crd::nmstate,
|
||||
topology::{HostNetworkConfig, NetworkError, NetworkManager, k8s::K8sClient},
|
||||
};
|
||||
|
||||
pub struct OpenShiftNmStateNetworkManager {
|
||||
k8s_client: Arc<K8sClient>,
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for OpenShiftNmStateNetworkManager {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("OpenShiftNmStateNetworkManager").finish()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl NetworkManager for OpenShiftNmStateNetworkManager {
|
||||
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
|
||||
debug!("Installing NMState controller...");
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
debug!("Creating NMState namespace...");
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
debug!("Creating NMState service account...");
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
debug!("Creating NMState role...");
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
debug!("Creating NMState role binding...");
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
debug!("Creating NMState operator...");
|
||||
self.k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
|
||||
").unwrap(), Some("nmstate"))
|
||||
.await?;
|
||||
|
||||
self.k8s_client
|
||||
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
|
||||
.await?;
|
||||
|
||||
let nmstate = nmstate::NMState {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("nmstate".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
};
|
||||
debug!(
|
||||
"Creating NMState:\n{}",
|
||||
serde_yaml::to_string(&nmstate).unwrap()
|
||||
);
|
||||
self.k8s_client.apply(&nmstate, None).await?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
|
||||
let hostname = self.get_hostname(&config.host_id).await.map_err(|e| {
|
||||
NetworkError::new(format!(
|
||||
"Can't configure bond, can't get hostname for host '{}': {e}",
|
||||
config.host_id
|
||||
))
|
||||
})?;
|
||||
let bond_id = self.get_next_bond_id(&hostname).await.map_err(|e| {
|
||||
NetworkError::new(format!(
|
||||
"Can't configure bond, can't get an available bond id for host '{}': {e}",
|
||||
config.host_id
|
||||
))
|
||||
})?;
|
||||
let bond_config = self.create_bond_configuration(&hostname, &bond_id, config);
|
||||
|
||||
debug!(
|
||||
"Applying NMState bond config for host {}:\n{}",
|
||||
config.host_id,
|
||||
serde_yaml::to_string(&bond_config).unwrap(),
|
||||
);
|
||||
self.k8s_client
|
||||
.apply(&bond_config, None)
|
||||
.await
|
||||
.map_err(|e| NetworkError::new(format!("Failed to configure bond: {e}")))?;
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenShiftNmStateNetworkManager {
|
||||
pub fn new(k8s_client: Arc<K8sClient>) -> Self {
|
||||
Self { k8s_client }
|
||||
}
|
||||
|
||||
fn create_bond_configuration(
|
||||
&self,
|
||||
host: &str,
|
||||
bond_name: &str,
|
||||
config: &HostNetworkConfig,
|
||||
) -> nmstate::NodeNetworkConfigurationPolicy {
|
||||
info!("Configuring bond '{bond_name}' for host '{host}'...");
|
||||
|
||||
let mut bond_mtu: Option<u32> = None;
|
||||
let mut copy_mac_from: Option<String> = None;
|
||||
let mut bond_ports = Vec::new();
|
||||
let mut interfaces: Vec<nmstate::Interface> = Vec::new();
|
||||
|
||||
for switch_port in &config.switch_ports {
|
||||
let interface_name = switch_port.interface.name.clone();
|
||||
|
||||
interfaces.push(nmstate::Interface {
|
||||
name: interface_name.clone(),
|
||||
description: Some(format!("Member of bond {bond_name}")),
|
||||
r#type: nmstate::InterfaceType::Ethernet,
|
||||
state: "up".to_string(),
|
||||
ipv4: Some(nmstate::IpStackSpec {
|
||||
enabled: Some(false),
|
||||
..Default::default()
|
||||
}),
|
||||
ipv6: Some(nmstate::IpStackSpec {
|
||||
enabled: Some(false),
|
||||
..Default::default()
|
||||
}),
|
||||
link_aggregation: None,
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
bond_ports.push(interface_name.clone());
|
||||
|
||||
// Use the first port's details for the bond mtu and mac address
|
||||
if bond_mtu.is_none() {
|
||||
bond_mtu = Some(switch_port.interface.mtu);
|
||||
}
|
||||
if copy_mac_from.is_none() {
|
||||
copy_mac_from = Some(interface_name);
|
||||
}
|
||||
}
|
||||
|
||||
interfaces.push(nmstate::Interface {
|
||||
name: bond_name.to_string(),
|
||||
description: Some(format!("HARMONY - Network bond for host {host}")),
|
||||
r#type: nmstate::InterfaceType::Bond,
|
||||
state: "up".to_string(),
|
||||
copy_mac_from,
|
||||
ipv4: Some(nmstate::IpStackSpec {
|
||||
dhcp: Some(true),
|
||||
enabled: Some(true),
|
||||
..Default::default()
|
||||
}),
|
||||
ipv6: Some(nmstate::IpStackSpec {
|
||||
dhcp: Some(true),
|
||||
autoconf: Some(true),
|
||||
enabled: Some(true),
|
||||
..Default::default()
|
||||
}),
|
||||
link_aggregation: Some(nmstate::BondSpec {
|
||||
mode: "802.3ad".to_string(),
|
||||
ports: bond_ports,
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
});
|
||||
|
||||
nmstate::NodeNetworkConfigurationPolicy {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(format!("{host}-bond-config")),
|
||||
..Default::default()
|
||||
},
|
||||
spec: nmstate::NodeNetworkConfigurationPolicySpec {
|
||||
node_selector: Some(BTreeMap::from([(
|
||||
"kubernetes.io/hostname".to_string(),
|
||||
host.to_string(),
|
||||
)])),
|
||||
desired_state: nmstate::NetworkState {
|
||||
interfaces,
|
||||
..Default::default()
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_hostname(&self, host_id: &Id) -> Result<String, String> {
|
||||
let nodes: ObjectList<Node> = self
|
||||
.k8s_client
|
||||
.list_resources(None, None)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to list nodes: {e}"))?;
|
||||
|
||||
let Some(node) = nodes.iter().find(|n| {
|
||||
n.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.node_info.as_ref())
|
||||
.map(|i| i.system_uuid == host_id.to_string())
|
||||
.unwrap_or(false)
|
||||
}) else {
|
||||
return Err(format!("No node found for host '{host_id}'"));
|
||||
};
|
||||
|
||||
node.labels()
|
||||
.get("kubernetes.io/hostname")
|
||||
.ok_or(format!(
|
||||
"Node '{host_id}' has no kubernetes.io/hostname label"
|
||||
))
|
||||
.cloned()
|
||||
}
|
||||
|
||||
async fn get_next_bond_id(&self, hostname: &str) -> Result<String, String> {
|
||||
let network_state: Option<nmstate::NodeNetworkState> = self
|
||||
.k8s_client
|
||||
.get_resource(hostname, None)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to list nodes: {e}"))?;
|
||||
|
||||
let interfaces = vec![];
|
||||
let existing_bonds: Vec<&nmstate::Interface> = network_state
|
||||
.as_ref()
|
||||
.and_then(|network_state| network_state.status.current_state.as_ref())
|
||||
.map_or(&interfaces, |current_state| ¤t_state.interfaces)
|
||||
.iter()
|
||||
.filter(|i| i.r#type == nmstate::InterfaceType::Bond)
|
||||
.collect();
|
||||
|
||||
let used_ids: HashSet<u32> = existing_bonds
|
||||
.iter()
|
||||
.filter_map(|i| {
|
||||
i.name
|
||||
.strip_prefix("bond")
|
||||
.and_then(|id| id.parse::<u32>().ok())
|
||||
})
|
||||
.collect();
|
||||
|
||||
let next_id = (0..).find(|id| !used_ids.contains(id)).unwrap();
|
||||
Ok(format!("bond{next_id}"))
|
||||
}
|
||||
}
|
||||
@@ -74,7 +74,11 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
|
||||
|
||||
match ans {
|
||||
Ok(choice) => {
|
||||
info!("Selected {} as the bootstrap node.", choice.summary());
|
||||
info!(
|
||||
"Selected {} as the {:?} node.",
|
||||
choice.summary(),
|
||||
self.score.role
|
||||
);
|
||||
host_repo
|
||||
.save_role_mapping(&self.score.role, &choice)
|
||||
.await?;
|
||||
@@ -90,10 +94,7 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
|
||||
"Failed to select node for role {:?} : {}",
|
||||
self.score.role, e
|
||||
);
|
||||
return Err(InterpretError::new(format!(
|
||||
"Could not select host : {}",
|
||||
e.to_string()
|
||||
)));
|
||||
return Err(InterpretError::new(format!("Could not select host : {e}")));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -17,3 +17,4 @@ pub mod prometheus;
|
||||
pub mod storage;
|
||||
pub mod tenant;
|
||||
pub mod tftp;
|
||||
pub mod postgresql;
|
||||
|
||||
@@ -3,20 +3,16 @@ use std::collections::BTreeMap;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use k8s_openapi::api::core::v1::Secret;
|
||||
use kube::Resource;
|
||||
use kube::api::{DynamicObject, ObjectMeta};
|
||||
use kube::api::ObjectMeta;
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use serde_json::json;
|
||||
use serde_yaml::{Mapping, Value};
|
||||
|
||||
use crate::infra::kube::kube_resource_to_dynamic;
|
||||
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
|
||||
AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus,
|
||||
};
|
||||
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
|
||||
use crate::modules::monitoring::okd::OpenshiftClusterAlertSender;
|
||||
use crate::topology::oberservability::monitoring::AlertManagerReceiver;
|
||||
use crate::{
|
||||
interpret::{InterpretError, Outcome},
|
||||
modules::monitoring::{
|
||||
@@ -32,25 +28,14 @@ use harmony_types::net::Url;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct DiscordWebhook {
|
||||
// FIXME use a stricter type as this is used as a k8s resource name. It could also be converted
|
||||
// to remove whitespace and other invalid characters, but this is a potential bug that is not
|
||||
// very easy to figure out for beginners.
|
||||
//
|
||||
// It gives out error messages like this :
|
||||
//
|
||||
// [2025-10-30 15:10:49 ERROR harmony::domain::topology::k8s] Failed to get dynamic resource 'Webhook example-secret': Failed to build request: failed to build request: invalid uri character
|
||||
// [2025-10-30 15:10:49 ERROR harmony_cli::cli_logger] ⚠️ InterpretError : Failed to build request: failed to build request: invalid uri character
|
||||
// [2025-10-30 15:10:49 DEBUG harmony::domain::maestro] Got result Err(InterpretError { msg: "InterpretError : Failed to build request: failed to build request: invalid uri character" })
|
||||
// [2025-10-30 15:10:49 INFO harmony_cli::cli_logger] 🎼 Harmony completed
|
||||
//
|
||||
// thread 'main' panicked at examples/okd_cluster_alerts/src/main.rs:25:6:
|
||||
// called `Result::unwrap()` on an `Err` value: InterpretError { msg: "InterpretError : Failed to build request: failed to build request: invalid uri character" }
|
||||
pub name: String,
|
||||
pub url: Url,
|
||||
}
|
||||
|
||||
impl DiscordWebhook {
|
||||
fn get_receiver_config(&self) -> Result<AlertManagerReceiver, String> {
|
||||
#[async_trait]
|
||||
impl AlertReceiver<RHOBObservability> for DiscordWebhook {
|
||||
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
|
||||
let ns = sender.namespace.clone();
|
||||
let secret_name = format!("{}-secret", self.name.clone());
|
||||
let webhook_key = format!("{}", self.url.clone());
|
||||
|
||||
@@ -67,74 +52,26 @@ impl DiscordWebhook {
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Ok(AlertManagerReceiver {
|
||||
additional_ressources: vec![kube_resource_to_dynamic(&secret)?],
|
||||
|
||||
receiver_config: json!({
|
||||
"name": self.name,
|
||||
"discordConfigs": [
|
||||
{
|
||||
"apiURL": {
|
||||
"name": secret_name,
|
||||
"key": "webhook-url",
|
||||
},
|
||||
"title": "{{ template \"discord.default.title\" . }}",
|
||||
"message": "{{ template \"discord.default.message\" . }}"
|
||||
}
|
||||
]
|
||||
}),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<OpenshiftClusterAlertSender> for DiscordWebhook {
|
||||
async fn install(
|
||||
&self,
|
||||
sender: &OpenshiftClusterAlertSender,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
self.name.clone()
|
||||
}
|
||||
|
||||
fn clone_box(&self) -> Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
|
||||
Box::new(self.clone())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
self.get_receiver_config()
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<RHOBObservability> for DiscordWebhook {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
|
||||
let ns = sender.namespace.clone();
|
||||
|
||||
let config = self.get_receiver_config()?;
|
||||
for resource in config.additional_ressources.iter() {
|
||||
todo!("can I apply a dynamicresource");
|
||||
// sender.client.apply(resource, Some(&ns)).await;
|
||||
}
|
||||
|
||||
let _ = sender.client.apply(&secret, Some(&ns)).await;
|
||||
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
|
||||
data: json!({
|
||||
"route": {
|
||||
"receiver": self.name,
|
||||
},
|
||||
"receivers": [
|
||||
config.receiver_config
|
||||
{
|
||||
"name": self.name,
|
||||
"discordConfigs": [
|
||||
{
|
||||
"apiURL": {
|
||||
"name": secret_name,
|
||||
"key": "webhook-url",
|
||||
},
|
||||
"title": "{{ template \"discord.default.title\" . }}",
|
||||
"message": "{{ template \"discord.default.message\" . }}"
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}),
|
||||
};
|
||||
@@ -185,9 +122,6 @@ impl AlertReceiver<RHOBObservability> for DiscordWebhook {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
|
||||
let ns = sender.namespace.clone();
|
||||
let secret_name = format!("{}-secret", self.name.clone());
|
||||
@@ -266,9 +200,6 @@ impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<Prometheus> for DiscordWebhook {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
|
||||
sender.install_receiver(self).await
|
||||
}
|
||||
@@ -295,9 +226,6 @@ impl PrometheusReceiver for DiscordWebhook {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<KubePrometheus> for DiscordWebhook {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
|
||||
sender.install_receiver(self).await
|
||||
}
|
||||
|
||||
@@ -19,7 +19,7 @@ use crate::{
|
||||
},
|
||||
prometheus::prometheus::{Prometheus, PrometheusReceiver},
|
||||
},
|
||||
topology::oberservability::monitoring::{AlertManagerReceiver, AlertReceiver},
|
||||
topology::oberservability::monitoring::AlertReceiver,
|
||||
};
|
||||
use harmony_types::net::Url;
|
||||
|
||||
@@ -31,9 +31,6 @@ pub struct WebhookReceiver {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<RHOBObservability> for WebhookReceiver {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
|
||||
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
|
||||
data: json!({
|
||||
@@ -100,9 +97,6 @@ impl AlertReceiver<RHOBObservability> for WebhookReceiver {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
|
||||
let spec = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfigSpec {
|
||||
data: json!({
|
||||
@@ -164,9 +158,6 @@ impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<Prometheus> for WebhookReceiver {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
|
||||
sender.install_receiver(self).await
|
||||
}
|
||||
@@ -193,9 +184,6 @@ impl PrometheusReceiver for WebhookReceiver {
|
||||
|
||||
#[async_trait]
|
||||
impl AlertReceiver<KubePrometheus> for WebhookReceiver {
|
||||
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
|
||||
todo!()
|
||||
}
|
||||
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
|
||||
sender.install_receiver(self).await
|
||||
}
|
||||
|
||||
@@ -1,214 +0,0 @@
|
||||
use base64::prelude::*;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use kube::api::DynamicObject;
|
||||
use log::{debug, info, trace};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::monitoring::okd::OpenshiftClusterAlertSender,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, oberservability::monitoring::AlertReceiver},
|
||||
};
|
||||
|
||||
impl Clone for Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
|
||||
fn clone(&self) -> Self {
|
||||
self.clone_box()
|
||||
}
|
||||
}
|
||||
|
||||
impl Serialize for Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
|
||||
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct OpenshiftClusterAlertScore {
|
||||
pub receivers: Vec<Box<dyn AlertReceiver<OpenshiftClusterAlertSender>>>,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for OpenshiftClusterAlertScore {
|
||||
fn name(&self) -> String {
|
||||
"ClusterAlertScore".to_string()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(OpenshiftClusterAlertInterpret {
|
||||
receivers: self.receivers.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OpenshiftClusterAlertInterpret {
|
||||
receivers: Vec<Box<dyn AlertReceiver<OpenshiftClusterAlertSender>>>,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for OpenshiftClusterAlertInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let client = topology.k8s_client().await?;
|
||||
let openshift_monitoring_namespace = "openshift-monitoring";
|
||||
|
||||
let mut alertmanager_main_secret: DynamicObject = client
|
||||
.get_secret_json_value("alertmanager-main", Some(openshift_monitoring_namespace))
|
||||
.await?;
|
||||
trace!("Got secret {alertmanager_main_secret:#?}");
|
||||
|
||||
let data: &mut serde_json::Value = &mut alertmanager_main_secret.data;
|
||||
trace!("Alertmanager-main secret data {data:#?}");
|
||||
let data_obj = data
|
||||
.get_mut("data")
|
||||
.ok_or(InterpretError::new(
|
||||
"Missing 'data' field in alertmanager-main secret.".to_string(),
|
||||
))?
|
||||
.as_object_mut()
|
||||
.ok_or(InterpretError::new(
|
||||
"'data' field in alertmanager-main secret is expected to be an object ."
|
||||
.to_string(),
|
||||
))?;
|
||||
|
||||
let config_b64 = data_obj
|
||||
.get("alertmanager.yaml")
|
||||
.ok_or(InterpretError::new(
|
||||
"Missing 'alertmanager.yaml' in alertmanager-main secret data".to_string(),
|
||||
))?
|
||||
.as_str()
|
||||
.unwrap_or("");
|
||||
trace!("Config base64 {config_b64}");
|
||||
|
||||
let config_bytes = BASE64_STANDARD.decode(config_b64).unwrap_or_default();
|
||||
|
||||
let mut am_config: serde_yaml::Value =
|
||||
serde_yaml::from_str(&String::from_utf8(config_bytes).unwrap_or_default())
|
||||
.unwrap_or_default();
|
||||
|
||||
debug!("Current alertmanager config {am_config:#?}");
|
||||
|
||||
let existing_receivers_sequence = if let Some(receivers) = am_config.get_mut("receivers") {
|
||||
match receivers.as_sequence_mut() {
|
||||
Some(seq) => seq,
|
||||
None => {
|
||||
return Err(InterpretError::new(format!(
|
||||
"Expected alertmanager config receivers to be a sequence, got {:?}",
|
||||
receivers
|
||||
)));
|
||||
}
|
||||
}
|
||||
} else {
|
||||
&mut serde_yaml::Sequence::default()
|
||||
};
|
||||
|
||||
let mut additional_resources = vec![];
|
||||
|
||||
for custom_receiver in &self.receivers {
|
||||
let name = custom_receiver.name();
|
||||
let alertmanager_receiver = custom_receiver.as_alertmanager_receiver()?;
|
||||
|
||||
let json_value = alertmanager_receiver.receiver_config;
|
||||
|
||||
let yaml_string = serde_json::to_string(&json_value).map_err(|e| {
|
||||
InterpretError::new(format!("Failed to serialize receiver config: {}", e))
|
||||
})?;
|
||||
|
||||
let yaml_value: serde_yaml::Value =
|
||||
serde_yaml::from_str(&yaml_string).map_err(|e| {
|
||||
InterpretError::new(format!("Failed to parse receiver config as YAML: {}", e))
|
||||
})?;
|
||||
|
||||
if let Some(idx) = existing_receivers_sequence.iter().position(|r| {
|
||||
r.get("name")
|
||||
.and_then(|n| n.as_str())
|
||||
.map_or(false, |n| n == name)
|
||||
}) {
|
||||
info!("Replacing existing AlertManager receiver: {}", name);
|
||||
existing_receivers_sequence[idx] = yaml_value;
|
||||
} else {
|
||||
debug!("Adding new AlertManager receiver: {}", name);
|
||||
existing_receivers_sequence.push(yaml_value);
|
||||
}
|
||||
|
||||
additional_resources.push(alertmanager_receiver.additional_ressources);
|
||||
}
|
||||
|
||||
debug!("Current alertmanager config {am_config:#?}");
|
||||
// TODO
|
||||
// - save new version of alertmanager config
|
||||
// - write additional ressources to the cluster
|
||||
let am_config = serde_yaml::to_string(&am_config).map_err(|e| {
|
||||
InterpretError::new(format!(
|
||||
"Failed to serialize new alertmanager config to string : {e}"
|
||||
))
|
||||
})?;
|
||||
|
||||
let mut am_config_b64 = String::new();
|
||||
BASE64_STANDARD.encode_string(am_config, &mut am_config_b64);
|
||||
|
||||
// TODO put update configmap value and save new value
|
||||
data_obj.insert(
|
||||
"alertmanager.yaml".to_string(),
|
||||
serde_json::Value::String(am_config_b64),
|
||||
);
|
||||
|
||||
// https://kubernetes.io/docs/reference/using-api/server-side-apply/#field-management
|
||||
alertmanager_main_secret.metadata.managed_fields = None;
|
||||
|
||||
trace!("Applying new alertmanager_main_secret {alertmanager_main_secret:#?}");
|
||||
client
|
||||
.apply_dynamic(
|
||||
&alertmanager_main_secret,
|
||||
Some(openshift_monitoring_namespace),
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
|
||||
let additional_resources = additional_resources.concat();
|
||||
trace!("Applying additional ressources for alert receivers {additional_resources:#?}");
|
||||
client
|
||||
.apply_dynamic_many(
|
||||
&additional_resources,
|
||||
Some(openshift_monitoring_namespace),
|
||||
true,
|
||||
)
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Successfully configured {} cluster alert receivers: {}",
|
||||
self.receivers.len(),
|
||||
self.receivers
|
||||
.iter()
|
||||
.map(|r| r.name())
|
||||
.collect::<Vec<_>>()
|
||||
.join(", ")
|
||||
)))
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("OpenshiftClusterAlertInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
@@ -1,90 +0,0 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use crate::{
|
||||
interpret::{InterpretError, Outcome},
|
||||
topology::k8s::K8sClient,
|
||||
};
|
||||
use k8s_openapi::api::core::v1::ConfigMap;
|
||||
use kube::api::ObjectMeta;
|
||||
|
||||
pub(crate) struct Config;
|
||||
|
||||
impl Config {
|
||||
pub async fn create_cluster_monitoring_config_cm(
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"config.yaml".to_string(),
|
||||
r#"
|
||||
enableUserWorkload: true
|
||||
alertmanagerMain:
|
||||
enableUserAlertmanagerConfig: true
|
||||
"#
|
||||
.to_string(),
|
||||
);
|
||||
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("cluster-monitoring-config".to_string()),
|
||||
namespace: Some("openshift-monitoring".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
..Default::default()
|
||||
};
|
||||
client.apply(&cm, Some("openshift-monitoring")).await?;
|
||||
|
||||
Ok(Outcome::success(
|
||||
"updated cluster-monitoring-config-map".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn create_user_workload_monitoring_config_cm(
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"config.yaml".to_string(),
|
||||
r#"
|
||||
alertmanager:
|
||||
enabled: true
|
||||
enableAlertmanagerConfig: true
|
||||
"#
|
||||
.to_string(),
|
||||
);
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("user-workload-monitoring-config".to_string()),
|
||||
namespace: Some("openshift-user-workload-monitoring".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
..Default::default()
|
||||
};
|
||||
client
|
||||
.apply(&cm, Some("openshift-user-workload-monitoring"))
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(
|
||||
"updated openshift-user-monitoring-config-map".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn verify_user_workload(client: &Arc<K8sClient>) -> Result<Outcome, InterpretError> {
|
||||
let namespace = "openshift-user-workload-monitoring";
|
||||
let alertmanager_name = "alertmanager-user-workload-0";
|
||||
let prometheus_name = "prometheus-user-workload-0";
|
||||
client
|
||||
.wait_for_pod_ready(alertmanager_name, Some(namespace))
|
||||
.await?;
|
||||
client
|
||||
.wait_for_pod_ready(prometheus_name, Some(namespace))
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"pods: {}, {} ready in ns: {}",
|
||||
alertmanager_name, prometheus_name, namespace
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -1,13 +1,16 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::monitoring::okd::config::Config,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology},
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
};
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::api::core::v1::ConfigMap;
|
||||
use kube::api::ObjectMeta;
|
||||
use serde::Serialize;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
@@ -34,9 +37,10 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringIn
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let client = topology.k8s_client().await.unwrap();
|
||||
Config::create_cluster_monitoring_config_cm(&client).await?;
|
||||
Config::create_user_workload_monitoring_config_cm(&client).await?;
|
||||
Config::verify_user_workload(&client).await?;
|
||||
self.update_cluster_monitoring_config_cm(&client).await?;
|
||||
self.update_user_workload_monitoring_config_cm(&client)
|
||||
.await?;
|
||||
self.verify_user_workload(&client).await?;
|
||||
Ok(Outcome::success(
|
||||
"successfully enabled user-workload-monitoring".to_string(),
|
||||
))
|
||||
@@ -58,3 +62,88 @@ impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringIn
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenshiftUserWorkloadMonitoringInterpret {
|
||||
pub async fn update_cluster_monitoring_config_cm(
|
||||
&self,
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"config.yaml".to_string(),
|
||||
r#"
|
||||
enableUserWorkload: true
|
||||
alertmanagerMain:
|
||||
enableUserAlertmanagerConfig: true
|
||||
"#
|
||||
.to_string(),
|
||||
);
|
||||
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("cluster-monitoring-config".to_string()),
|
||||
namespace: Some("openshift-monitoring".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
..Default::default()
|
||||
};
|
||||
client.apply(&cm, Some("openshift-monitoring")).await?;
|
||||
|
||||
Ok(Outcome::success(
|
||||
"updated cluster-monitoring-config-map".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn update_user_workload_monitoring_config_cm(
|
||||
&self,
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"config.yaml".to_string(),
|
||||
r#"
|
||||
alertmanager:
|
||||
enabled: true
|
||||
enableAlertmanagerConfig: true
|
||||
"#
|
||||
.to_string(),
|
||||
);
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("user-workload-monitoring-config".to_string()),
|
||||
namespace: Some("openshift-user-workload-monitoring".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
..Default::default()
|
||||
};
|
||||
client
|
||||
.apply(&cm, Some("openshift-user-workload-monitoring"))
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(
|
||||
"updated openshift-user-monitoring-config-map".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
pub async fn verify_user_workload(
|
||||
&self,
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let namespace = "openshift-user-workload-monitoring";
|
||||
let alertmanager_name = "alertmanager-user-workload-0";
|
||||
let prometheus_name = "prometheus-user-workload-0";
|
||||
client
|
||||
.wait_for_pod_ready(alertmanager_name, Some(namespace))
|
||||
.await?;
|
||||
client
|
||||
.wait_for_pod_ready(prometheus_name, Some(namespace))
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"pods: {}, {} ready in ns: {}",
|
||||
alertmanager_name, prometheus_name, namespace
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,14 +1 @@
|
||||
use crate::topology::oberservability::monitoring::AlertSender;
|
||||
|
||||
pub mod cluster_monitoring;
|
||||
pub(crate) mod config;
|
||||
pub mod enable_user_workload;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct OpenshiftClusterAlertSender;
|
||||
|
||||
impl AlertSender for OpenshiftClusterAlertSender {
|
||||
fn name(&self) -> String {
|
||||
"OpenshiftClusterAlertSender".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use kube::CustomResource;
|
||||
use k8s_openapi::{ClusterResourceScope, Resource};
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use serde_json::Value;
|
||||
@@ -47,28 +48,223 @@ pub struct ProbeDns {
|
||||
group = "nmstate.io",
|
||||
version = "v1",
|
||||
kind = "NodeNetworkConfigurationPolicy",
|
||||
namespaced
|
||||
namespaced = false
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NodeNetworkConfigurationPolicySpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub node_selector: Option<BTreeMap<String, String>>,
|
||||
pub desired_state: DesiredStateSpec,
|
||||
pub desired_state: NetworkState,
|
||||
}
|
||||
|
||||
// Currently, kube-rs derive doesn't support resources without a `spec` field, so we have
|
||||
// to implement it ourselves.
|
||||
//
|
||||
// Ref:
|
||||
// - https://github.com/kube-rs/kube/issues/1763
|
||||
// - https://github.com/kube-rs/kube/discussions/1762
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NodeNetworkState {
|
||||
metadata: ObjectMeta,
|
||||
pub status: NodeNetworkStateStatus,
|
||||
}
|
||||
|
||||
impl Resource for NodeNetworkState {
|
||||
const API_VERSION: &'static str = "nmstate.io/v1beta1";
|
||||
const GROUP: &'static str = "nmstate.io";
|
||||
const VERSION: &'static str = "v1beta1";
|
||||
const KIND: &'static str = "NodeNetworkState";
|
||||
const URL_PATH_SEGMENT: &'static str = "nodenetworkstates";
|
||||
type Scope = ClusterResourceScope;
|
||||
}
|
||||
|
||||
impl k8s_openapi::Metadata for NodeNetworkState {
|
||||
type Ty = ObjectMeta;
|
||||
|
||||
fn metadata(&self) -> &Self::Ty {
|
||||
&self.metadata
|
||||
}
|
||||
|
||||
fn metadata_mut(&mut self) -> &mut Self::Ty {
|
||||
&mut self.metadata
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NodeNetworkStateStatus {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub current_state: Option<NetworkState>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub handler_nmstate_version: Option<String>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub host_network_manager_version: Option<String>,
|
||||
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub last_successful_update_time: Option<String>,
|
||||
}
|
||||
|
||||
/// The NetworkState is the top-level struct, representing the entire
|
||||
/// desired or current network state.
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct DesiredStateSpec {
|
||||
pub interfaces: Vec<InterfaceSpec>,
|
||||
#[serde(deny_unknown_fields)]
|
||||
pub struct NetworkState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub hostname: Option<HostNameState>,
|
||||
#[serde(rename = "dns-resolver", skip_serializing_if = "Option::is_none")]
|
||||
pub dns: Option<DnsState>,
|
||||
#[serde(rename = "route-rules", skip_serializing_if = "Option::is_none")]
|
||||
pub rules: Option<RouteRuleState>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub routes: Option<RouteState>,
|
||||
#[serde(default, skip_serializing_if = "Vec::is_empty")]
|
||||
pub interfaces: Vec<Interface>,
|
||||
#[serde(rename = "ovs-db", skip_serializing_if = "Option::is_none")]
|
||||
pub ovsdb: Option<OvsDbGlobalConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ovn: Option<OvnConfiguration>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct InterfaceSpec {
|
||||
pub struct HostNameState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub running: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct DnsState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub running: Option<DnsResolverConfig>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<DnsResolverConfig>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct DnsResolverConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub search: Option<Vec<String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub server: Option<Vec<String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct RouteRuleState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<Vec<RouteRule>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub running: Option<Vec<RouteRule>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct RouteState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<Vec<Route>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub running: Option<Vec<Route>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct RouteRule {
|
||||
#[serde(rename = "ip-from", skip_serializing_if = "Option::is_none")]
|
||||
pub ip_from: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub priority: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub route_table: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct Route {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub destination: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub metric: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub next_hop_address: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub next_hop_interface: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub table_id: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mtu: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct OvsDbGlobalConfig {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub external_ids: Option<BTreeMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub other_config: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct OvnConfiguration {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub bridge_mappings: Option<Vec<OvnBridgeMapping>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct OvnBridgeMapping {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub localnet: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub bridge: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(untagged)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum StpSpec {
|
||||
Bool(bool),
|
||||
Options(StpOptions),
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct LldpState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub enabled: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct OvsDb {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub external_ids: Option<BTreeMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub other_config: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct PatchState {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub peer: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct Interface {
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub description: Option<String>,
|
||||
pub r#type: String,
|
||||
pub r#type: InterfaceType,
|
||||
pub state: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mac_address: Option<String>,
|
||||
@@ -99,9 +295,81 @@ pub struct InterfaceSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub linux_bridge: Option<LinuxBridgeSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
#[serde(alias = "bridge")]
|
||||
pub ovs_bridge: Option<OvsBridgeSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ethtool: Option<EthtoolSpec>,
|
||||
pub ethtool: Option<Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub accept_all_mac_addresses: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub identifier: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub lldp: Option<LldpState>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub permanent_mac_address: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_mtu: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub min_mtu: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mptcp: Option<Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub profile_name: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub wait_ip: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ovs_db: Option<OvsDb>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub driver: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub patch: Option<PatchState>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, PartialEq, Eq, PartialOrd, Ord, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub enum InterfaceType {
|
||||
#[serde(rename = "unknown")]
|
||||
Unknown,
|
||||
#[serde(rename = "dummy")]
|
||||
Dummy,
|
||||
#[serde(rename = "loopback")]
|
||||
Loopback,
|
||||
#[serde(rename = "linux-bridge")]
|
||||
LinuxBridge,
|
||||
#[serde(rename = "ovs-bridge")]
|
||||
OvsBridge,
|
||||
#[serde(rename = "ovs-interface")]
|
||||
OvsInterface,
|
||||
#[serde(rename = "bond")]
|
||||
Bond,
|
||||
#[serde(rename = "ipvlan")]
|
||||
IpVlan,
|
||||
#[serde(rename = "vlan")]
|
||||
Vlan,
|
||||
#[serde(rename = "vxlan")]
|
||||
Vxlan,
|
||||
#[serde(rename = "mac-vlan")]
|
||||
Macvlan,
|
||||
#[serde(rename = "mac-vtap")]
|
||||
Macvtap,
|
||||
#[serde(rename = "ethernet")]
|
||||
Ethernet,
|
||||
#[serde(rename = "infiniband")]
|
||||
Infiniband,
|
||||
#[serde(rename = "vrf")]
|
||||
Vrf,
|
||||
#[serde(rename = "veth")]
|
||||
Veth,
|
||||
#[serde(rename = "ipsec")]
|
||||
Ipsec,
|
||||
#[serde(rename = "hsr")]
|
||||
Hrs,
|
||||
}
|
||||
|
||||
impl Default for InterfaceType {
|
||||
fn default() -> Self {
|
||||
Self::Loopback
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
@@ -149,6 +417,7 @@ pub struct EthernetSpec {
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct BondSpec {
|
||||
pub mode: String,
|
||||
#[serde(alias = "port")]
|
||||
pub ports: Vec<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub options: Option<BTreeMap<String, Value>>,
|
||||
@@ -287,11 +556,15 @@ pub struct OvsBridgeSpec {
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct OvsBridgeOptions {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub stp: Option<bool>,
|
||||
pub stp: Option<StpSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub rstp: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mcast_snooping_enable: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub datapath: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub fail_mode: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
@@ -305,18 +578,3 @@ pub struct OvsPortSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub r#type: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct EthtoolSpec {
|
||||
// TODO: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
|
||||
#[serde(rename_all = "kebab-case")]
|
||||
pub struct EthtoolFecSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub auto: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mode: Option<String>,
|
||||
}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use log::{debug, info};
|
||||
use log::{info, warn};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
@@ -9,7 +9,7 @@ use crate::{
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{HostNetworkConfig, NetworkInterface, Switch, SwitchPort, Topology},
|
||||
topology::{HostNetworkConfig, NetworkInterface, NetworkManager, Switch, SwitchPort, Topology},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
@@ -17,7 +17,7 @@ pub struct HostNetworkConfigurationScore {
|
||||
pub hosts: Vec<PhysicalHost>,
|
||||
}
|
||||
|
||||
impl<T: Topology + Switch> Score<T> for HostNetworkConfigurationScore {
|
||||
impl<T: Topology + NetworkManager + Switch> Score<T> for HostNetworkConfigurationScore {
|
||||
fn name(&self) -> String {
|
||||
"HostNetworkConfigurationScore".into()
|
||||
}
|
||||
@@ -35,7 +35,7 @@ pub struct HostNetworkConfigurationInterpret {
|
||||
}
|
||||
|
||||
impl HostNetworkConfigurationInterpret {
|
||||
async fn configure_network_for_host<T: Topology + Switch>(
|
||||
async fn configure_network_for_host<T: Topology + NetworkManager + Switch>(
|
||||
&self,
|
||||
topology: &T,
|
||||
host: &PhysicalHost,
|
||||
@@ -49,6 +49,13 @@ impl HostNetworkConfigurationInterpret {
|
||||
switch_ports: vec![],
|
||||
});
|
||||
}
|
||||
if host.network.len() == 1 {
|
||||
info!("[Host {current_host}/{total_hosts}] Only one interface to configure, skipping");
|
||||
return Ok(HostNetworkConfig {
|
||||
host_id: host.id.clone(),
|
||||
switch_ports: vec![],
|
||||
});
|
||||
}
|
||||
|
||||
let switch_ports = self
|
||||
.collect_switch_ports_for_host(topology, host, current_host, total_hosts)
|
||||
@@ -59,7 +66,7 @@ impl HostNetworkConfigurationInterpret {
|
||||
switch_ports,
|
||||
};
|
||||
|
||||
if !config.switch_ports.is_empty() {
|
||||
if config.switch_ports.len() > 1 {
|
||||
info!(
|
||||
"[Host {current_host}/{total_hosts}] Found {} ports for {} interfaces",
|
||||
config.switch_ports.len(),
|
||||
@@ -67,15 +74,25 @@ impl HostNetworkConfigurationInterpret {
|
||||
);
|
||||
|
||||
info!("[Host {current_host}/{total_hosts}] Configuring host network...");
|
||||
topology.configure_bond(&config).await.map_err(|e| {
|
||||
InterpretError::new(format!("Failed to configure host network: {e}"))
|
||||
})?;
|
||||
topology
|
||||
.configure_host_network(&config)
|
||||
.configure_port_channel(&config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?;
|
||||
} else {
|
||||
.map_err(|e| {
|
||||
InterpretError::new(format!("Failed to configure host network: {e}"))
|
||||
})?;
|
||||
} else if config.switch_ports.is_empty() {
|
||||
info!(
|
||||
"[Host {current_host}/{total_hosts}] No ports found for {} interfaces, skipping",
|
||||
host.network.len()
|
||||
);
|
||||
} else {
|
||||
warn!(
|
||||
"[Host {current_host}/{total_hosts}] Found a single port for {} interfaces, skipping",
|
||||
host.network.len()
|
||||
);
|
||||
}
|
||||
|
||||
Ok(config)
|
||||
@@ -113,7 +130,7 @@ impl HostNetworkConfigurationInterpret {
|
||||
port,
|
||||
});
|
||||
}
|
||||
Ok(None) => debug!("No port found for '{mac_address}', skipping"),
|
||||
Ok(None) => {}
|
||||
Err(e) => {
|
||||
return Err(InterpretError::new(format!(
|
||||
"Failed to get port for host '{}': {}",
|
||||
@@ -133,15 +150,6 @@ impl HostNetworkConfigurationInterpret {
|
||||
];
|
||||
|
||||
for config in configs {
|
||||
let host = self
|
||||
.score
|
||||
.hosts
|
||||
.iter()
|
||||
.find(|h| h.id == config.host_id)
|
||||
.unwrap();
|
||||
|
||||
println!("[Host] {host}");
|
||||
|
||||
if config.switch_ports.is_empty() {
|
||||
report.push(format!(
|
||||
"⏭️ Host {}: SKIPPED (No matching switch ports found)",
|
||||
@@ -169,7 +177,7 @@ impl HostNetworkConfigurationInterpret {
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
|
||||
impl<T: Topology + NetworkManager + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("HostNetworkConfigurationInterpret")
|
||||
}
|
||||
@@ -198,6 +206,12 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
|
||||
let host_count = self.score.hosts.len();
|
||||
info!("Started network configuration for {host_count} host(s)...",);
|
||||
|
||||
info!("Setting up NetworkManager...",);
|
||||
topology
|
||||
.ensure_network_manager_installed()
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("NetworkManager setup failed: {e}")))?;
|
||||
|
||||
info!("Setting up switch with sane defaults...");
|
||||
topology
|
||||
.setup_switch()
|
||||
@@ -216,6 +230,7 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
|
||||
host_configurations.push(host_configuration);
|
||||
current_host += 1;
|
||||
}
|
||||
|
||||
if current_host > 1 {
|
||||
let details = self.format_host_configuration(host_configurations);
|
||||
|
||||
@@ -242,7 +257,8 @@ mod tests {
|
||||
use crate::{
|
||||
hardware::HostCategory,
|
||||
topology::{
|
||||
HostNetworkConfig, PreparationError, PreparationOutcome, SwitchError, SwitchPort,
|
||||
HostNetworkConfig, NetworkError, PreparationError, PreparationOutcome, SwitchError,
|
||||
SwitchPort,
|
||||
},
|
||||
};
|
||||
use std::{
|
||||
@@ -267,6 +283,18 @@ mod tests {
|
||||
speed_mbps: None,
|
||||
mtu: 1,
|
||||
};
|
||||
pub static ref YET_ANOTHER_EXISTING_INTERFACE: NetworkInterface = NetworkInterface {
|
||||
mac_address: MacAddress::try_from("AA:BB:CC:DD:EE:F3".to_string()).unwrap(),
|
||||
name: "interface-3".into(),
|
||||
speed_mbps: None,
|
||||
mtu: 1,
|
||||
};
|
||||
pub static ref LAST_EXISTING_INTERFACE: NetworkInterface = NetworkInterface {
|
||||
mac_address: MacAddress::try_from("AA:BB:CC:DD:EE:F4".to_string()).unwrap(),
|
||||
name: "interface-4".into(),
|
||||
speed_mbps: None,
|
||||
mtu: 1,
|
||||
};
|
||||
pub static ref UNKNOWN_INTERFACE: NetworkInterface = NetworkInterface {
|
||||
mac_address: MacAddress::try_from("11:22:33:44:55:61".to_string()).unwrap(),
|
||||
name: "unknown-interface".into(),
|
||||
@@ -275,6 +303,8 @@ mod tests {
|
||||
};
|
||||
pub static ref PORT: PortLocation = PortLocation(1, 0, 42);
|
||||
pub static ref ANOTHER_PORT: PortLocation = PortLocation(2, 0, 42);
|
||||
pub static ref YET_ANOTHER_PORT: PortLocation = PortLocation(1, 0, 45);
|
||||
pub static ref LAST_PORT: PortLocation = PortLocation(2, 0, 45);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
@@ -290,28 +320,33 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_with_one_mac_address_should_create_bond_with_one_interface() {
|
||||
async fn should_setup_network_manager() {
|
||||
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
|
||||
let score = given_score(vec![host]);
|
||||
let topology = TopologyWithSwitch::new();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
|
||||
assert_that!(*configured_host_networks).contains_exactly(vec![(
|
||||
HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: HOST_ID.clone(),
|
||||
switch_ports: vec![SwitchPort {
|
||||
interface: EXISTING_INTERFACE.clone(),
|
||||
port: PORT.clone(),
|
||||
}],
|
||||
},
|
||||
)]);
|
||||
let network_manager_setup = topology.network_manager_setup.lock().unwrap();
|
||||
assert_that!(*network_manager_setup).is_true();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_with_multiple_mac_addresses_should_create_one_bond_with_all_interfaces() {
|
||||
async fn host_with_one_mac_address_should_skip_host_configuration() {
|
||||
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
|
||||
let score = given_score(vec![host]);
|
||||
let topology = TopologyWithSwitch::new();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let config = topology.configured_bonds.lock().unwrap();
|
||||
assert_that!(*config).is_empty();
|
||||
let config = topology.configured_port_channels.lock().unwrap();
|
||||
assert_that!(*config).is_empty();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn host_with_multiple_mac_addresses_should_configure_one_bond_with_all_interfaces() {
|
||||
let score = given_score(vec![given_host(
|
||||
&HOST_ID,
|
||||
vec![
|
||||
@@ -323,8 +358,8 @@ mod tests {
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
|
||||
assert_that!(*configured_host_networks).contains_exactly(vec![(
|
||||
let config = topology.configured_bonds.lock().unwrap();
|
||||
assert_that!(*config).contains_exactly(vec![(
|
||||
HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: HOST_ID.clone(),
|
||||
@@ -343,49 +378,183 @@ mod tests {
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_hosts_should_create_one_bond_per_host() {
|
||||
async fn host_with_multiple_mac_addresses_should_configure_one_port_channel_with_all_interfaces()
|
||||
{
|
||||
let score = given_score(vec![given_host(
|
||||
&HOST_ID,
|
||||
vec![
|
||||
EXISTING_INTERFACE.clone(),
|
||||
ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
],
|
||||
)]);
|
||||
let topology = TopologyWithSwitch::new();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let config = topology.configured_port_channels.lock().unwrap();
|
||||
assert_that!(*config).contains_exactly(vec![(
|
||||
HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: HOST_ID.clone(),
|
||||
switch_ports: vec![
|
||||
SwitchPort {
|
||||
interface: EXISTING_INTERFACE.clone(),
|
||||
port: PORT.clone(),
|
||||
},
|
||||
SwitchPort {
|
||||
interface: ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
port: ANOTHER_PORT.clone(),
|
||||
},
|
||||
],
|
||||
},
|
||||
)]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn multiple_hosts_should_configure_one_bond_per_host() {
|
||||
let score = given_score(vec![
|
||||
given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]),
|
||||
given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]),
|
||||
given_host(
|
||||
&HOST_ID,
|
||||
vec![
|
||||
EXISTING_INTERFACE.clone(),
|
||||
ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
],
|
||||
),
|
||||
given_host(
|
||||
&ANOTHER_HOST_ID,
|
||||
vec![
|
||||
YET_ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
LAST_EXISTING_INTERFACE.clone(),
|
||||
],
|
||||
),
|
||||
]);
|
||||
let topology = TopologyWithSwitch::new();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
|
||||
assert_that!(*configured_host_networks).contains_exactly(vec![
|
||||
let config = topology.configured_bonds.lock().unwrap();
|
||||
assert_that!(*config).contains_exactly(vec![
|
||||
(
|
||||
HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: HOST_ID.clone(),
|
||||
switch_ports: vec![SwitchPort {
|
||||
interface: EXISTING_INTERFACE.clone(),
|
||||
port: PORT.clone(),
|
||||
}],
|
||||
switch_ports: vec![
|
||||
SwitchPort {
|
||||
interface: EXISTING_INTERFACE.clone(),
|
||||
port: PORT.clone(),
|
||||
},
|
||||
SwitchPort {
|
||||
interface: ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
port: ANOTHER_PORT.clone(),
|
||||
},
|
||||
],
|
||||
},
|
||||
),
|
||||
(
|
||||
ANOTHER_HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: ANOTHER_HOST_ID.clone(),
|
||||
switch_ports: vec![SwitchPort {
|
||||
interface: ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
port: ANOTHER_PORT.clone(),
|
||||
}],
|
||||
switch_ports: vec![
|
||||
SwitchPort {
|
||||
interface: YET_ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
port: YET_ANOTHER_PORT.clone(),
|
||||
},
|
||||
SwitchPort {
|
||||
interface: LAST_EXISTING_INTERFACE.clone(),
|
||||
port: LAST_PORT.clone(),
|
||||
},
|
||||
],
|
||||
},
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn port_not_found_for_mac_address_should_not_configure_interface() {
|
||||
async fn multiple_hosts_should_configure_one_port_channel_per_host() {
|
||||
let score = given_score(vec![
|
||||
given_host(
|
||||
&HOST_ID,
|
||||
vec![
|
||||
EXISTING_INTERFACE.clone(),
|
||||
ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
],
|
||||
),
|
||||
given_host(
|
||||
&ANOTHER_HOST_ID,
|
||||
vec![
|
||||
YET_ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
LAST_EXISTING_INTERFACE.clone(),
|
||||
],
|
||||
),
|
||||
]);
|
||||
let topology = TopologyWithSwitch::new();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let config = topology.configured_port_channels.lock().unwrap();
|
||||
assert_that!(*config).contains_exactly(vec![
|
||||
(
|
||||
HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: HOST_ID.clone(),
|
||||
switch_ports: vec![
|
||||
SwitchPort {
|
||||
interface: EXISTING_INTERFACE.clone(),
|
||||
port: PORT.clone(),
|
||||
},
|
||||
SwitchPort {
|
||||
interface: ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
port: ANOTHER_PORT.clone(),
|
||||
},
|
||||
],
|
||||
},
|
||||
),
|
||||
(
|
||||
ANOTHER_HOST_ID.clone(),
|
||||
HostNetworkConfig {
|
||||
host_id: ANOTHER_HOST_ID.clone(),
|
||||
switch_ports: vec![
|
||||
SwitchPort {
|
||||
interface: YET_ANOTHER_EXISTING_INTERFACE.clone(),
|
||||
port: YET_ANOTHER_PORT.clone(),
|
||||
},
|
||||
SwitchPort {
|
||||
interface: LAST_EXISTING_INTERFACE.clone(),
|
||||
port: LAST_PORT.clone(),
|
||||
},
|
||||
],
|
||||
},
|
||||
),
|
||||
]);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn port_not_found_for_mac_address_should_not_configure_host() {
|
||||
let score = given_score(vec![given_host(&HOST_ID, vec![UNKNOWN_INTERFACE.clone()])]);
|
||||
let topology = TopologyWithSwitch::new_port_not_found();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
|
||||
assert_that!(*configured_host_networks).is_empty();
|
||||
let config = topology.configured_port_channels.lock().unwrap();
|
||||
assert_that!(*config).is_empty();
|
||||
let config = topology.configured_bonds.lock().unwrap();
|
||||
assert_that!(*config).is_empty();
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn only_one_port_found_for_multiple_mac_addresses_should_not_configure_host() {
|
||||
let score = given_score(vec![given_host(
|
||||
&HOST_ID,
|
||||
vec![EXISTING_INTERFACE.clone(), UNKNOWN_INTERFACE.clone()],
|
||||
)]);
|
||||
let topology = TopologyWithSwitch::new_single_port_found();
|
||||
|
||||
let _ = score.interpret(&Inventory::empty(), &topology).await;
|
||||
|
||||
let config = topology.configured_port_channels.lock().unwrap();
|
||||
assert_that!(*config).is_empty();
|
||||
let config = topology.configured_bonds.lock().unwrap();
|
||||
assert_that!(*config).is_empty();
|
||||
}
|
||||
|
||||
fn given_score(hosts: Vec<PhysicalHost>) -> HostNetworkConfigurationScore {
|
||||
@@ -422,26 +591,48 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct TopologyWithSwitch {
|
||||
available_ports: Arc<Mutex<Vec<PortLocation>>>,
|
||||
configured_host_networks: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
|
||||
configured_port_channels: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
|
||||
switch_setup: Arc<Mutex<bool>>,
|
||||
network_manager_setup: Arc<Mutex<bool>>,
|
||||
configured_bonds: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
|
||||
}
|
||||
|
||||
impl TopologyWithSwitch {
|
||||
fn new() -> Self {
|
||||
Self {
|
||||
available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])),
|
||||
configured_host_networks: Arc::new(Mutex::new(vec![])),
|
||||
available_ports: Arc::new(Mutex::new(vec![
|
||||
PORT.clone(),
|
||||
ANOTHER_PORT.clone(),
|
||||
YET_ANOTHER_PORT.clone(),
|
||||
LAST_PORT.clone(),
|
||||
])),
|
||||
configured_port_channels: Arc::new(Mutex::new(vec![])),
|
||||
switch_setup: Arc::new(Mutex::new(false)),
|
||||
network_manager_setup: Arc::new(Mutex::new(false)),
|
||||
configured_bonds: Arc::new(Mutex::new(vec![])),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_port_not_found() -> Self {
|
||||
Self {
|
||||
available_ports: Arc::new(Mutex::new(vec![])),
|
||||
configured_host_networks: Arc::new(Mutex::new(vec![])),
|
||||
configured_port_channels: Arc::new(Mutex::new(vec![])),
|
||||
switch_setup: Arc::new(Mutex::new(false)),
|
||||
network_manager_setup: Arc::new(Mutex::new(false)),
|
||||
configured_bonds: Arc::new(Mutex::new(vec![])),
|
||||
}
|
||||
}
|
||||
|
||||
fn new_single_port_found() -> Self {
|
||||
Self {
|
||||
available_ports: Arc::new(Mutex::new(vec![PORT.clone()])),
|
||||
configured_port_channels: Arc::new(Mutex::new(vec![])),
|
||||
switch_setup: Arc::new(Mutex::new(false)),
|
||||
network_manager_setup: Arc::new(Mutex::new(false)),
|
||||
configured_bonds: Arc::new(Mutex::new(vec![])),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -457,6 +648,22 @@ mod tests {
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl NetworkManager for TopologyWithSwitch {
|
||||
async fn ensure_network_manager_installed(&self) -> Result<(), NetworkError> {
|
||||
let mut network_manager_installed = self.network_manager_setup.lock().unwrap();
|
||||
*network_manager_installed = true;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), NetworkError> {
|
||||
let mut configured_bonds = self.configured_bonds.lock().unwrap();
|
||||
configured_bonds.push((config.host_id.clone(), config.clone()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl Switch for TopologyWithSwitch {
|
||||
async fn setup_switch(&self) -> Result<(), SwitchError> {
|
||||
@@ -476,12 +683,12 @@ mod tests {
|
||||
Ok(Some(ports.remove(0)))
|
||||
}
|
||||
|
||||
async fn configure_host_network(
|
||||
async fn configure_port_channel(
|
||||
&self,
|
||||
config: &HostNetworkConfig,
|
||||
) -> Result<(), SwitchError> {
|
||||
let mut configured_host_networks = self.configured_host_networks.lock().unwrap();
|
||||
configured_host_networks.push((config.host_id.clone(), config.clone()));
|
||||
let mut configured_port_channels = self.configured_port_channels.lock().unwrap();
|
||||
configured_port_channels.push((config.host_id.clone(), config.clone()));
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
81
harmony/src/modules/postgresql/capability.rs
Normal file
81
harmony/src/modules/postgresql/capability.rs
Normal file
@@ -0,0 +1,81 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::storage::StorageSize;
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[async_trait]
|
||||
pub trait PostgreSQL {
|
||||
async fn deploy(&self, config: &PostgreSQLConfig) -> Result<String, String>;
|
||||
|
||||
/// Extracts PostgreSQL-specific replication certs (PEM format) from a deployed primary cluster.
|
||||
/// Abstracts away storage/retrieval details (e.g., secrets, files).
|
||||
async fn get_replication_certs(&self, cluster_name: &str) -> Result<ReplicationCerts, String>;
|
||||
|
||||
/// Gets the internal/private endpoint (e.g., k8s service FQDN:5432) for the cluster.
|
||||
async fn get_endpoint(&self, cluster_name: &str) -> Result<PostgreSQLEndpoint, String>;
|
||||
|
||||
/// Gets the public/externally routable endpoint if configured (e.g., OKD Route:443 for TLS passthrough).
|
||||
/// Returns None if no public endpoint (internal-only cluster).
|
||||
/// UNSTABLE: This is opinionated for initial multisite use cases. Networking abstraction is complex
|
||||
/// (cf. k8s Ingress -> Gateway API evolution); may move to higher-order Networking/PostgreSQLNetworking trait.
|
||||
async fn get_public_endpoint(&self, cluster_name: &str) -> Result<Option<PostgreSQLEndpoint>, String>;
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PostgreSQLConfig {
|
||||
pub cluster_name: String,
|
||||
pub instances: u32,
|
||||
pub storage_size: StorageSize,
|
||||
pub role: PostgreSQLClusterRole,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum PostgreSQLClusterRole {
|
||||
Primary,
|
||||
Replica(ReplicaClusterConfig),
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReplicaConfig {
|
||||
/// Name of the primary cluster this replica will sync from
|
||||
pub primary_cluster_name: String,
|
||||
/// Certs extracted from primary via Topology::get_replication_certs()
|
||||
pub replication_certs: ReplicationCerts,
|
||||
/// Bootstrap method (e.g., pg_basebackup from primary)
|
||||
pub bootstrap: BootstrapConfig,
|
||||
/// External cluster connection details for CNPG spec.externalClusters
|
||||
pub external_cluster: ExternalClusterConfig,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct BootstrapConfig {
|
||||
pub strategy: BootstrapStrategy,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub enum BootstrapStrategy {
|
||||
PgBasebackup,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ExternalClusterConfig {
|
||||
/// Name used in CNPG externalClusters list
|
||||
pub name: String,
|
||||
/// Connection params (host/port set by multisite logic, sslmode='verify-ca', etc.)
|
||||
pub connection_parameters: HashMap<String, String>,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct ReplicationCerts {
|
||||
/// PEM-encoded CA cert from primary
|
||||
pub ca_cert_pem: String,
|
||||
/// PEM-encoded streaming_replica client cert (tls.crt)
|
||||
pub streaming_replica_cert_pem: String,
|
||||
/// PEM-encoded streaming_replica client key (tls.key)
|
||||
pub streaming_replica_key_pem: String,
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct PostgreSQLEndpoint {
|
||||
pub host: String,
|
||||
pub port: u16,
|
||||
}
|
||||
7
harmony/src/modules/postgresql/mod.rs
Normal file
7
harmony/src/modules/postgresql/mod.rs
Normal file
@@ -0,0 +1,7 @@
|
||||
|
||||
pub mod capability;
|
||||
mod score;
|
||||
|
||||
|
||||
pub mod failover;
|
||||
|
||||
236
harmony/src/modules/postgresql/score.rs
Normal file
236
harmony/src/modules/postgresql/score.rs
Normal file
@@ -0,0 +1,236 @@
|
||||
use crate::{
|
||||
domain::{data::Version, interpret::InterpretStatus},
|
||||
interpret::{Interpret, InterpretError, InterpretName, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::postgresql::capability::PostgreSQL,
|
||||
score::Score,
|
||||
topology::Topology,
|
||||
};
|
||||
|
||||
use super::capability::*;
|
||||
|
||||
use derive_new::new;
|
||||
use harmony_types::{id::Id, storage::StorageSize};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::info;
|
||||
use serde::Serialize;
|
||||
|
||||
pub struct PostgreSQLScore {
|
||||
config: PostgreSQLConfig,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct PostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
version: Version,
|
||||
status: InterpretStatus,
|
||||
}
|
||||
|
||||
impl PostgreSQLInterpret {
|
||||
pub fn new(config: PostgreSQLConfig) -> Self {
|
||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
||||
Self {
|
||||
config,
|
||||
version,
|
||||
status: InterpretStatus::QUEUED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + PostgreSQL> Score<T> for PostgreSQLScore {
|
||||
fn name(&self) -> String {
|
||||
"PostgreSQLScore".to_string()
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(PostgreSQLInterpret::new(self.config.clone()))
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + PostgreSQL> Interpret<T> for PostgreSQLInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("PostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> crate::domain::data::Version {
|
||||
self.version.clone()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
self.status.clone()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
"Executing PostgreSQLInterpret with config {:?}",
|
||||
self.config
|
||||
);
|
||||
|
||||
let cluster_name = topology
|
||||
.deploy(&self.config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(e))?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Deployed PostgreSQL cluster `{cluster_name}`"
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, new, Clone, Serialize)]
|
||||
pub struct MultisitePostgreSQLScore {
|
||||
pub cluster_name: String,
|
||||
pub primary_site: Id,
|
||||
pub replica_sites: Vec<Id>,
|
||||
pub instances: u32,
|
||||
pub storage_size: StorageSize,
|
||||
}
|
||||
|
||||
impl<T: FailoverTopology + crate::modules::postgresql::capability::PostgreSQL> Score<T> for MultisitePostgreSQLScore {
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(MultisitePostgreSQLInterpret::new(self.clone()))
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
"MultisitePostgreSQLScore".to_string()
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct MultisitePostgreSQLInterpret {
|
||||
score: MultisitePostgreSQLScore,
|
||||
version: Version,
|
||||
status: InterpretStatus,
|
||||
}
|
||||
|
||||
impl MultisitePostgreSQLInterpret {
|
||||
pub fn new(score: MultisitePostgreSQLScore) -> Self {
|
||||
let version = Version::from("1.0.0").expect("Version should be valid");
|
||||
Self {
|
||||
score,
|
||||
version,
|
||||
status: InterpretStatus::QUEUED,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: MultisiteTopology + PostgreSQL> Interpret<T> for MultisitePostgreSQLInterpret {
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("MultisitePostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
self.version.clone()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
self.status.clone()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!("Track child interprets per site")
|
||||
}
|
||||
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
|
||||
info!(
|
||||
"Orchestrating multisite PostgreSQL: primary {:?}, replicas {:?}",
|
||||
self.score.primary_site, self.score.replica_sites
|
||||
);
|
||||
|
||||
// 1. Deploy primary
|
||||
let primary_topo = topology.primary();
|
||||
|
||||
let primary_config = PostgreSQLConfig {
|
||||
cluster_name: self.score.cluster_name.clone(),
|
||||
instances: self.score.instances,
|
||||
storage_size: self.score.storage_size.clone(),
|
||||
role: ClusterRole::Primary,
|
||||
};
|
||||
let primary_cluster_name = primary_topo
|
||||
.deploy(&primary_config)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(format!("Primary deploy failed: {e}")))?;
|
||||
|
||||
// 2. Extract certs & public endpoint from primary
|
||||
let certs = primary_topo
|
||||
.get_replication_certs(&primary_cluster_name)
|
||||
.await
|
||||
.map_err(|e| InterpretError::from(format!("Certs extract failed: {e}")))?;
|
||||
let public_endpoint = primary_topo
|
||||
.get_public_endpoint(&primary_cluster_name)
|
||||
.await??
|
||||
.ok_or_else(|| InterpretError::from("No public endpoint on primary"))?;
|
||||
|
||||
// 3. Deploy replicas
|
||||
for replica_site in &self.score.replica_sites {
|
||||
let replica_topo = topology.replica();
|
||||
|
||||
.map_err(|e| {
|
||||
InterpretError::from(format!(
|
||||
"Replica site {:?} lookup failed: {e}",
|
||||
replica_site
|
||||
))
|
||||
})?;
|
||||
|
||||
let connection_params: HashMap<String, String> = [
|
||||
("host".to_string(), public_endpoint.host.clone()),
|
||||
("port".to_string(), public_endpoint.port.to_string()),
|
||||
("dbname".to_string(), "postgres".to_string()),
|
||||
("user".to_string(), "streaming_replica".to_string()),
|
||||
("sslmode".to_string(), "verify-ca".to_string()),
|
||||
("sslnegotiation".to_string(), "direct".to_string()),
|
||||
]
|
||||
.into_iter()
|
||||
.collect();
|
||||
|
||||
let external_cluster = ExternalClusterConfig {
|
||||
name: "primary-cluster".to_string(),
|
||||
connection_parameters: connection_params,
|
||||
};
|
||||
|
||||
let replica_config_struct = ReplicaConfig {
|
||||
primary_cluster_name: primary_cluster_name.clone(),
|
||||
replication_certs: certs.clone(),
|
||||
bootstrap: BootstrapConfig {
|
||||
strategy: BootstrapStrategy::PgBasebackup,
|
||||
},
|
||||
external_cluster,
|
||||
};
|
||||
|
||||
let replica_config = PostgreSQLConfig {
|
||||
cluster_name: format!("{}-replica-{}", self.score.cluster_name, replica_site),
|
||||
instances: self.score.instances,
|
||||
storage_size: self.score.storage_size.clone(),
|
||||
role: ClusterRole::Replica(replica_config_struct),
|
||||
};
|
||||
|
||||
let _replica_cluster = replica_topo.deploy(&replica_config).await.map_err(|e| {
|
||||
InterpretError::from(format!("Replica {:?} deploy failed: {e}", replica_site))
|
||||
})?;
|
||||
}
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Multisite PostgreSQL `{}` deployed: primary `{}`, {} replicas",
|
||||
self.score.cluster_name,
|
||||
primary_cluster_name,
|
||||
self.score.replica_sites.len()
|
||||
)))
|
||||
}
|
||||
}
|
||||
@@ -1,3 +1,4 @@
|
||||
pub mod id;
|
||||
pub mod net;
|
||||
pub mod switch;
|
||||
pub mod storage;
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
|
||||
pub struct MacAddress(pub [u8; 6]);
|
||||
|
||||
impl MacAddress {
|
||||
@@ -19,6 +19,14 @@ impl From<&MacAddress> for String {
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for MacAddress {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_tuple("MacAddress")
|
||||
.field(&String::from(self))
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Display for MacAddress {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_str(&String::from(self))
|
||||
|
||||
6
harmony_types/src/storage.rs
Normal file
6
harmony_types/src/storage.rs
Normal file
@@ -0,0 +1,6 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Copy, Clone, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord, Debug)]
|
||||
pub struct StorageSize {
|
||||
size_bytes: u64,
|
||||
}
|
||||
Reference in New Issue
Block a user