Compare commits

..

19 Commits

Author SHA1 Message Date
c069207f12 Merge pull request 'refactor(ha_cluster): inject switch client for better testability' (#174) from switch-client into master
Some checks failed
Run Check Script / check (push) Successful in 1m44s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m43s
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/174
2025-10-23 15:05:17 +00:00
Ian Letourneau
7368184917 fix(ha_cluster): inject switch client for better testability
All checks were successful
Run Check Script / check (pull_request) Successful in 1m30s
2025-10-22 15:12:53 -04:00
05205f4ac1 Merge pull request 'feat: scrape targets to be able to get snmp alerts from machines to prometheus' (#171) from feat/scrape_target into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/171
2025-10-22 15:33:24 +00:00
3174645c97 Merge branch 'master' into feat/scrape_target
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
2025-10-22 15:33:01 +00:00
7536f4ec4b Merge pull request 'fix: fixed merge error that somehow got missed' (#172) from fix/merge_error into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/172
2025-10-21 16:02:39 +00:00
464347d3e5 fix: fixed merge error that somehow got missed
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 12:01:31 -04:00
7f415f5b98 Merge pull request 'feat: K8sFlavour' (#161) from feat/detect_k8s_flavour into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/161
2025-10-21 15:56:47 +00:00
2a520a1d7c Merge branch 'master' into feat/detect_k8s_flavour
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 15:56:18 +00:00
987f195e2f feat(cert-manager): add cluster issuer to okd cluster score (#157)
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
added score to install okd cluster issuer

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/157
2025-10-21 15:55:55 +00:00
14d1823d15 fix: remove ceph osd deletes and purges osd from ceph osd tree\ (#120)
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
k8s returns None rather than zero when checking deployment for replicas
exec_app requires commands 's' and '-c' to run correctly

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/120
Co-authored-by: Willem <wrolleman@nationtech.io>
Co-committed-by: Willem <wrolleman@nationtech.io>
2025-10-21 15:54:51 +00:00
2a48d51479 fix: naming of k8s distribution
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 11:09:45 -04:00
20a227bb41 Merge branch 'master' into feat/detect_k8s_flavour
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 15:02:15 +00:00
ed7f81aa1f fix(opnsense-config): ensure load balancer service configuration is idempotent (#129)
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
The previous implementation blindly added HAProxy components without checking for existing configurations on the same port, which caused duplicate entries and errors when a service was updated.

This commit refactors the logic to a robust "remove-then-add" strategy. The configure_service method now finds and removes any existing frontend and its dependent components (backend, servers, health check) before adding the new, complete service definition.

This change makes the process fully idempotent, preventing configuration drift and ensuring a predictable state.

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/129
2025-10-20 19:18:49 +00:00
cb66b7592e fix: made targets plural and changed scrape targets to option in AlertingInterpret
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-20 14:44:37 -04:00
a815f6ac9c feat: scrape targets to be able to get snmp alerts from machines to prometheus
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-20 11:44:11 -04:00
5f78300d78 Merge branch 'master' into feat/detect_k8s_flavour
All checks were successful
Run Check Script / check (pull_request) Successful in 1m20s
2025-10-02 17:14:30 -04:00
2d3c32469c chore: Simplify k8s flavour detection algorithm and do not unwrap when it cannot be detected, just return Err 2025-09-30 22:59:50 -04:00
1cec398d4d fix: modifed naming scheme to OpenshiftFamily, K3sFamily, and defaultswitched discovery of openshiftfamily to look for projet.openshift.io 2025-09-29 11:29:34 -04:00
f073b7e5fb feat:added k8s flavour to k8s_aywhere topology to be able to get the type of cluster
All checks were successful
Run Check Script / check (pull_request) Successful in 33s
2025-09-24 13:28:46 -04:00
51 changed files with 1371 additions and 577 deletions

17
Cargo.lock generated
View File

@ -1780,6 +1780,7 @@ dependencies = [
name = "example-nanodc"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@ -1788,6 +1789,7 @@ dependencies = [
"harmony_tui",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
@ -1806,6 +1808,7 @@ dependencies = [
name = "example-okd-install"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@ -1836,13 +1839,16 @@ dependencies = [
name = "example-opnsense"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
"harmony_macros",
"harmony_secret",
"harmony_tui",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
@ -1851,6 +1857,7 @@ dependencies = [
name = "example-pxe"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@ -1865,6 +1872,15 @@ dependencies = [
"url",
]
[[package]]
name = "example-remove-rook-osd"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"tokio",
]
[[package]]
name = "example-rust"
version = "0.1.0"
@ -3918,6 +3934,7 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
name = "opnsense-config"
version = "0.1.0"
dependencies = [
"assertor",
"async-trait",
"chrono",
"env_logger",

View File

@ -15,7 +15,8 @@ members = [
"harmony_inventory_agent",
"harmony_secret_derive",
"harmony_secret",
"adr/agent_discovery/mdns", "brocade",
"adr/agent_discovery/mdns",
"brocade",
]
[workspace.package]

View File

@ -10,6 +10,7 @@ use log::{debug, info};
use regex::Regex;
use std::{collections::HashSet, str::FromStr};
#[derive(Debug)]
pub struct FastIronClient {
shell: BrocadeShell,
version: BrocadeInfo,

View File

@ -162,7 +162,7 @@ pub async fn init(
}
#[async_trait]
pub trait BrocadeClient {
pub trait BrocadeClient: std::fmt::Debug {
/// Retrieves the operating system and version details from the connected Brocade switch.
///
/// This is typically the first call made after establishing a connection to determine

View File

@ -10,6 +10,7 @@ use crate::{
parse_brocade_mac_address, shell::BrocadeShell,
};
#[derive(Debug)]
pub struct NetworkOperatingSystemClient {
shell: BrocadeShell,
version: BrocadeInfo,
@ -247,12 +248,7 @@ impl BrocadeClient for NetworkOperatingSystemClient {
ports: &[PortLocation],
) -> Result<(), Error> {
info!(
"[Brocade] Configuring port-channel '{channel_id} {channel_name}' with ports: {}",
ports
.iter()
.map(|p| format!("{p}"))
.collect::<Vec<String>>()
.join(", ")
"[Brocade] Configuring port-channel '{channel_name} {channel_id}' with ports: {ports:?}"
);
let interfaces = self.get_interfaces().await?;

View File

@ -13,6 +13,7 @@ use log::info;
use russh::ChannelMsg;
use tokio::time::timeout;
#[derive(Debug)]
pub struct BrocadeShell {
ip: IpAddr,
port: u16,

View File

@ -17,3 +17,5 @@ harmony_secret = { path = "../../harmony_secret" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
brocade = { path = "../../brocade" }

View File

@ -3,12 +3,13 @@ use std::{
sync::Arc,
};
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
config::secret::SshKeyPair,
data::{FileContent, FilePath},
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
modules::{
http::StaticFilesHttpScore,
@ -22,8 +23,9 @@ use harmony::{
topology::{LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, mac_address};
use harmony_secret::SecretManager;
use harmony_secret::{Secret, SecretManager};
use harmony_types::net::Url;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@ -32,6 +34,26 @@ async fn main() {
name: String::from("fw0"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.33.101")];
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await,
);
@ -39,7 +61,6 @@ async fn main() {
let gateway_ipv4 = Ipv4Addr::new(192, 168, 33, 1);
let gateway_ip = IpAddr::V4(gateway_ipv4);
let topology = harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "ncd0.harmony.mcd".to_string(), // TODO this must be set manually correctly
// when setting up the opnsense firewall
router: Arc::new(UnmanagedRouter::new(
@ -84,7 +105,7 @@ async fn main() {
name: "wk2".to_string(),
},
],
switch: vec![],
switch_client: switch_client.clone(),
};
let inventory = Inventory {
@ -167,3 +188,9 @@ async fn main() {
.await
.unwrap();
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -19,3 +19,4 @@ log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@ -1,7 +1,8 @@
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
hardware::{Location, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
@ -22,6 +23,26 @@ pub async fn get_topology() -> HAClusterTopology {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallConfig>().await;
let config = config.unwrap();
@ -38,7 +59,6 @@ pub async fn get_topology() -> HAClusterTopology {
let gateway_ipv4 = ipv4!("192.168.1.1");
let gateway_ip = IpAddr::V4(gateway_ipv4);
harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "demo.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,
@ -59,7 +79,7 @@ pub async fn get_topology() -> HAClusterTopology {
name: "bootstrap".to_string(),
},
workers: vec![],
switch: vec![],
switch_client: switch_client.clone(),
}
}
@ -76,3 +96,9 @@ pub fn get_inventory() -> Inventory {
control_plane_host: vec![],
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -19,3 +19,4 @@ log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@ -1,13 +1,15 @@
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
config::secret::OPNSenseFirewallCredentials,
hardware::{Location, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, ipv4};
use harmony_secret::SecretManager;
use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize};
use std::{net::IpAddr, sync::Arc};
pub async fn get_topology() -> HAClusterTopology {
@ -16,6 +18,26 @@ pub async fn get_topology() -> HAClusterTopology {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>().await;
let config = config.unwrap();
@ -32,7 +54,6 @@ pub async fn get_topology() -> HAClusterTopology {
let gateway_ipv4 = ipv4!("192.168.1.1");
let gateway_ip = IpAddr::V4(gateway_ipv4);
harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "demo.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,
@ -53,7 +74,7 @@ pub async fn get_topology() -> HAClusterTopology {
name: "cp0".to_string(),
},
workers: vec![],
switch: vec![],
switch_client: switch_client.clone(),
}
}
@ -70,3 +91,9 @@ pub fn get_inventory() -> Inventory {
control_plane_host: vec![],
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -16,3 +16,6 @@ harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
harmony_secret = { path = "../../harmony_secret" }
brocade = { path = "../../brocade" }
serde = { workspace = true }

View File

@ -3,10 +3,11 @@ use std::{
sync::Arc,
};
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
modules::{
dummy::{ErrorScore, PanicScore, SuccessScore},
@ -18,7 +19,9 @@ use harmony::{
topology::{LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, mac_address};
use harmony_secret::{Secret, SecretManager};
use harmony_types::net::Url;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@ -27,6 +30,26 @@ async fn main() {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.5.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await,
);
@ -34,7 +57,6 @@ async fn main() {
let gateway_ipv4 = Ipv4Addr::new(10, 100, 8, 1);
let gateway_ip = IpAddr::V4(gateway_ipv4);
let topology = harmony::topology::HAClusterTopology {
kubeconfig: None,
domain_name: "demo.harmony.mcd".to_string(),
router: Arc::new(UnmanagedRouter::new(
gateway_ip,
@ -55,7 +77,7 @@ async fn main() {
name: "cp0".to_string(),
},
workers: vec![],
switch: vec![],
switch_client: switch_client.clone(),
};
let inventory = Inventory {
@ -110,3 +132,9 @@ async fn main() {
.await
.unwrap();
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -0,0 +1,11 @@
[package]
name = "example-remove-rook-osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@ -0,0 +1,18 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@ -30,6 +30,7 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephRemoveOsd,
DiscoverInventoryAgent,
CephClusterHealth,
Custom(&'static str),
@ -61,6 +62,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
InterpretName::Custom(name) => f.write_str(name),

View File

@ -1,21 +1,20 @@
use async_trait::async_trait;
use brocade::BrocadeOptions;
use harmony_macros::ip;
use harmony_secret::SecretManager;
use harmony_types::{
net::{MacAddress, Url},
switch::PortLocation,
};
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ObjectMeta;
use log::debug;
use log::info;
use crate::data::FileContent;
use crate::executors::ExecutorError;
use crate::infra::brocade::BrocadeSwitchAuth;
use crate::infra::brocade::BrocadeSwitchClient;
use crate::modules::okd::crd::nmstate::{
self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec,
use crate::hardware::PhysicalHost;
use crate::modules::okd::crd::{
InstallPlanApproval, OperatorGroup, OperatorGroupSpec, Subscription, SubscriptionSpec,
nmstate::{self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec},
};
use crate::topology::PxeOptions;
@ -27,7 +26,6 @@ use super::{
};
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::sync::Arc;
#[derive(Debug, Clone)]
@ -40,11 +38,10 @@ pub struct HAClusterTopology {
pub tftp_server: Arc<dyn TftpServer>,
pub http_server: Arc<dyn HttpServer>,
pub dns_server: Arc<dyn DnsServer>,
pub switch_client: Arc<dyn SwitchClient>,
pub bootstrap_host: LogicalHost,
pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>,
pub switch: Vec<LogicalHost>,
pub kubeconfig: Option<String>,
}
#[async_trait]
@ -63,17 +60,9 @@ impl Topology for HAClusterTopology {
#[async_trait]
impl K8sclient for HAClusterTopology {
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
match &self.kubeconfig {
None => Ok(Arc::new(
Ok(Arc::new(
K8sClient::try_default().await.map_err(|e| e.to_string())?,
)),
Some(kubeconfig) => {
let Some(client) = K8sClient::from_kubeconfig(&kubeconfig).await else {
return Err("Failed to create k8s client".to_string());
};
Ok(Arc::new(client))
}
}
))
}
}
@ -99,47 +88,59 @@ impl HAClusterTopology {
}
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
// FIXME: Find a way to check nmstate is already available (get pod -n openshift-nmstate)
debug!("Installing NMState operator...");
let k8s_client = self.k8s_client().await?;
debug!("Installing NMState controller...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState namespace...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState service account...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState role...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState role binding...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
debug!("Creating NMState operator...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
").unwrap(), Some("nmstate"))
.await
.map_err(|e| e.to_string())?;
let nmstate_namespace = Namespace {
metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()),
finalizers: Some(vec!["kubernetes".to_string()]),
..Default::default()
},
..Default::default()
};
debug!("Creating NMState namespace: {nmstate_namespace:#?}");
k8s_client
.wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
.await?;
.apply(&nmstate_namespace, None)
.await
.map_err(|e| e.to_string())?;
let nmstate_operator_group = OperatorGroup {
metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()),
namespace: Some("openshift-nmstate".to_string()),
..Default::default()
},
spec: OperatorGroupSpec {
target_namespaces: vec!["openshift-nmstate".to_string()],
},
};
debug!("Creating NMState operator group: {nmstate_operator_group:#?}");
k8s_client
.apply(&nmstate_operator_group, None)
.await
.map_err(|e| e.to_string())?;
let nmstate_subscription = Subscription {
metadata: ObjectMeta {
name: Some("kubernetes-nmstate-operator".to_string()),
namespace: Some("openshift-nmstate".to_string()),
..Default::default()
},
spec: SubscriptionSpec {
channel: Some("stable".to_string()),
install_plan_approval: Some(InstallPlanApproval::Automatic),
name: "kubernetes-nmstate-operator".to_string(),
source: "redhat-operators".to_string(),
source_namespace: "openshift-marketplace".to_string(),
},
};
debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}");
k8s_client
.apply(&nmstate_subscription, None)
.await
.map_err(|e| e.to_string())?;
let nmstate = NMState {
metadata: ObjectMeta {
@ -161,7 +162,11 @@ impl HAClusterTopology {
42 // FIXME: Find a better way to declare the bond id
}
async fn configure_bond(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
async fn configure_bond(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
self.ensure_nmstate_operator_installed()
.await
.map_err(|e| {
@ -170,26 +175,24 @@ impl HAClusterTopology {
))
})?;
let bond_config = self.create_bond_configuration(config);
debug!(
"Configuring bond for host {}: {bond_config:#?}",
config.host_id
);
let bond_config = self.create_bond_configuration(host, config);
debug!("Configuring bond for host {host:?}: {bond_config:#?}");
self.k8s_client()
.await
.unwrap()
.apply(&bond_config, None)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure bond: {e}")))?;
.unwrap();
Ok(())
todo!()
}
fn create_bond_configuration(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> NodeNetworkConfigurationPolicy {
let host_name = &config.host_id;
let host_name = host.id.clone();
let bond_id = self.get_next_bond_id();
let bond_name = format!("bond{bond_id}");
@ -272,33 +275,16 @@ impl HAClusterTopology {
}
}
async fn get_switch_client(&self) -> Result<Box<dyn SwitchClient>, SwitchError> {
let auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.map_err(|e| SwitchError::new(format!("Failed to get credentials: {e}")))?;
// FIXME: We assume Brocade switches
let switches: Vec<IpAddr> = self.switch.iter().map(|s| s.ip).collect();
let brocade_options = Some(BrocadeOptions {
dry_run: *crate::config::DRY_RUN,
..Default::default()
});
let client =
BrocadeSwitchClient::init(&switches, &auth.username, &auth.password, brocade_options)
.await
.map_err(|e| SwitchError::new(format!("Failed to connect to switch: {e}")))?;
Ok(Box::new(client))
}
async fn configure_port_channel(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
async fn configure_port_channel(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let client = self.get_switch_client().await?;
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
client
.configure_port_channel(&format!("Harmony_{}", config.host_id), switch_ports)
self.switch_client
.configure_port_channel(&format!("Harmony_{}", host.id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
@ -313,7 +299,6 @@ impl HAClusterTopology {
};
Self {
kubeconfig: None,
domain_name: "DummyTopology".to_string(),
router: dummy_infra.clone(),
load_balancer: dummy_infra.clone(),
@ -322,10 +307,10 @@ impl HAClusterTopology {
tftp_server: dummy_infra.clone(),
http_server: dummy_infra.clone(),
dns_server: dummy_infra.clone(),
switch_client: dummy_infra.clone(),
bootstrap_host: dummy_host,
control_plane: vec![],
workers: vec![],
switch: vec![],
}
}
}
@ -483,8 +468,7 @@ impl HttpServer for HAClusterTopology {
#[async_trait]
impl Switch for HAClusterTopology {
async fn setup_switch(&self) -> Result<(), SwitchError> {
let client = self.get_switch_client().await?;
client.setup().await?;
self.switch_client.setup().await?;
Ok(())
}
@ -492,14 +476,17 @@ impl Switch for HAClusterTopology {
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let client = self.get_switch_client().await?;
let port = client.find_port(mac_address).await?;
let port = self.switch_client.find_port(mac_address).await?;
Ok(port)
}
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError> {
self.configure_bond(config).await?;
self.configure_port_channel(config).await
async fn configure_host_network(
&self,
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
self.configure_bond(host, &config).await?;
self.configure_port_channel(host, &config).await
}
}
@ -689,3 +676,25 @@ impl DnsServer for DummyInfra {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}
#[async_trait]
impl SwitchClient for DummyInfra {
async fn setup(&self) -> Result<(), SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn find_port(
&self,
_mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn configure_port_channel(
&self,
_channel_name: &str,
_switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}

View File

@ -4,13 +4,13 @@ use derive_new::new;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
apimachinery::pkg::version::Info,
};
use kube::{
Client, Config, Error, Resource,
Client, Config, Discovery, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
discovery::{ApiCapabilities, Scope},
error::DiscoveryError,
runtime::reflector::Lookup,
};
@ -19,12 +19,11 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, trace, warn};
use log::{debug, error, info, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use serde_json::{Value, json};
use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep};
use url::Url;
#[derive(new, Clone)]
pub struct K8sClient {
@ -58,6 +57,17 @@ impl K8sClient {
})
}
pub async fn get_apiserver_version(&self) -> Result<Info, Error> {
let client: Client = self.client.clone();
let version_info: Info = client.apiserver_version().await?;
Ok(version_info)
}
pub async fn discovery(&self) -> Result<Discovery, Error> {
let discovery: Discovery = Discovery::new(self.client.clone()).run().await?;
Ok(discovery)
}
pub async fn get_resource_json_value(
&self,
name: &str,
@ -70,8 +80,7 @@ impl K8sClient {
} else {
Api::default_namespaced_with(self.client.clone(), &gvk)
};
resource.get(name).await
Ok(resource.get(name).await?)
}
pub async fn get_deployment(
@ -80,12 +89,14 @@ impl K8sClient {
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
debug!("getting namespaced deployment");
Api::namespaced(self.client.clone(), ns)
} else {
debug!("getting default namespace deployment");
Api::default_namespaced(self.client.clone())
};
deps.get_opt(name).await
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
Ok(deps.get_opt(name).await?)
}
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
@ -94,8 +105,7 @@ impl K8sClient {
} else {
Api::default_namespaced(self.client.clone())
};
pods.get_opt(name).await
Ok(pods.get_opt(name).await?)
}
pub async fn scale_deployment(
@ -116,7 +126,7 @@ impl K8sClient {
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
let scale = Patch::Merge(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}
@ -138,9 +148,9 @@ impl K8sClient {
pub async fn wait_until_deployment_ready(
&self,
name: &str,
name: String,
namespace: Option<&str>,
timeout: Option<Duration>,
timeout: Option<u64>,
) -> Result<(), String> {
let api: Api<Deployment>;
@ -150,9 +160,9 @@ impl K8sClient {
api = Api::default_namespaced(self.client.clone());
}
let establish = await_condition(api, name, conditions::is_deployment_completed());
let timeout = timeout.unwrap_or(Duration::from_secs(120));
let res = tokio::time::timeout(timeout, establish).await;
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
let t = timeout.unwrap_or(300);
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
if res.is_ok() {
Ok(())
@ -242,7 +252,7 @@ impl K8sClient {
if let Some(s) = status.status {
let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout() {
if let Some(mut stdout) = process.stdout().take() {
stdout
.read_to_string(&mut stdout_buf)
.await
@ -348,14 +358,14 @@ impl K8sClient {
Ok(current) => {
trace!("Received current value {current:#?}");
// The resource exists, so we calculate and display a diff.
println!("\nPerforming dry-run for resource: '{name}'");
println!("\nPerforming dry-run for resource: '{}'", name);
let mut current_yaml = serde_yaml::to_value(&current).unwrap_or_else(|_| {
panic!("Could not serialize current value : {current:#?}")
});
if current_yaml.is_mapping() && current_yaml.get("status").is_some() {
let map = current_yaml.as_mapping_mut().unwrap();
let removed = map.remove_entry("status");
trace!("Removed status {removed:?}");
trace!("Removed status {:?}", removed);
} else {
trace!(
"Did not find status entry for current object {}/{}",
@ -384,14 +394,14 @@ impl K8sClient {
similar::ChangeTag::Insert => "+",
similar::ChangeTag::Equal => " ",
};
print!("{sign}{change}");
print!("{}{}", sign, change);
}
// In a dry run, we return the new resource state that would have been applied.
Ok(resource.clone())
}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
// The resource does not exist, so the "diff" is the entire new resource.
println!("\nPerforming dry-run for new resource: '{name}'");
println!("\nPerforming dry-run for new resource: '{}'", name);
println!(
"Resource does not exist. It would be created with the following content:"
);
@ -400,14 +410,14 @@ impl K8sClient {
// Print each line of the new resource with a '+' prefix.
for line in new_yaml.lines() {
println!("+{line}");
println!("+{}", line);
}
// In a dry run, we return the new resource state that would have been created.
Ok(resource.clone())
}
Err(e) => {
// Another API error occurred.
error!("Failed to get resource '{name}': {e}");
error!("Failed to get resource '{}': {}", name, e);
Err(e)
}
}
@ -422,7 +432,7 @@ impl K8sClient {
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>,
<K as Resource>::DynamicType: Default,
<K as kube::Resource>::DynamicType: Default,
{
let mut result = Vec::new();
for r in resource.iter() {
@ -487,7 +497,10 @@ impl K8sClient {
// 6. Apply the object to the cluster using Server-Side Apply.
// This will create the resource if it doesn't exist, or update it if it does.
println!("Applying '{name}' in namespace '{namespace}'...",);
println!(
"Applying Argo Application '{}' in namespace '{}'...",
name, namespace
);
let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
@ -496,51 +509,6 @@ impl K8sClient {
Ok(())
}
/// Apply a resource from a URL
///
/// It is the equivalent of `kubectl apply -f <url>`
pub async fn apply_url(&self, url: Url, ns: Option<&str>) -> Result<(), Error> {
let patch_params = PatchParams::apply("harmony");
let discovery = kube::Discovery::new(self.client.clone()).run().await?;
let yaml = reqwest::get(url)
.await
.expect("Could not get URL")
.text()
.await
.expect("Could not get content from URL");
for doc in multidoc_deserialize(&yaml).expect("failed to parse YAML from file") {
let obj: DynamicObject =
serde_yaml::from_value(doc).expect("cannot apply without valid YAML");
let namespace = obj.metadata.namespace.as_deref().or(ns);
let type_meta = obj
.types
.as_ref()
.expect("cannot apply object without valid TypeMeta");
let gvk = GroupVersionKind::try_from(type_meta)
.expect("cannot apply object without valid GroupVersionKind");
let name = obj.name_any();
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = get_dynamic_api(ar, caps, self.client.clone(), namespace, false);
trace!(
"Applying {}: \n{}",
gvk.kind,
serde_yaml::to_string(&obj).expect("Failed to serialize YAML")
);
let data: serde_json::Value =
serde_json::to_value(&obj).expect("Failed to serialize JSON");
let _r = api.patch(&name, &patch_params, &Patch::Apply(data)).await?;
debug!("applied {} {}", gvk.kind, name);
} else {
warn!("Cannot apply document for unknown {gvk:?}");
}
}
Ok(())
}
pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
let k = match Kubeconfig::read_from(path) {
Ok(k) => k,
@ -560,31 +528,6 @@ impl K8sClient {
}
}
fn get_dynamic_api(
resource: ApiResource,
capabilities: ApiCapabilities,
client: Client,
ns: Option<&str>,
all: bool,
) -> Api<DynamicObject> {
if capabilities.scope == Scope::Cluster || all {
Api::all_with(client, &resource)
} else if let Some(namespace) = ns {
Api::namespaced_with(client, namespace, &resource)
} else {
Api::default_namespaced_with(client, &resource)
}
}
fn multidoc_deserialize(data: &str) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
use serde::Deserialize;
let mut docs = vec![];
for de in serde_yaml::Deserializer::from_str(data) {
docs.push(serde_yaml::Value::deserialize(de)?);
}
Ok(docs)
}
pub trait ApplyStrategy<K: Resource> {
fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
}

View File

@ -47,6 +47,13 @@ struct K8sState {
message: String,
}
#[derive(Debug, Clone)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
@ -57,6 +64,7 @@ enum K8sSource {
pub struct K8sAnywhereTopology {
k8s_state: Arc<OnceCell<Option<K8sState>>>,
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
config: Arc<K8sAnywhereConfig>,
}
@ -162,6 +170,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(K8sAnywhereConfig::from_env()),
}
}
@ -170,10 +179,42 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(config),
}
}
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
let client = self.k8s_client().await.unwrap();
let discovery = client.discovery().await.map_err(|e| {
PreparationError::new(format!("Could not discover API groups: {}", e))
})?;
let version = client.get_apiserver_version().await.map_err(|e| {
PreparationError::new(format!("Could not get server version: {}", e))
})?;
// OpenShift / OKD
if discovery
.groups()
.any(|g| g.name() == "project.openshift.io")
{
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
return Ok(KubernetesDistribution::K3sFamily);
}
return Ok(KubernetesDistribution::Default);
})
.await
}
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,

View File

@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync {
&self,
service: &LoadBalancerService,
) -> Result<(), ExecutorError> {
debug!(
"Listing LoadBalancer services {:?}",
self.list_services().await
);
if !self.list_services().await.contains(service) {
self.add_service(service).await?;
}
Ok(())
}
}

View File

@ -1,9 +1,14 @@
use std::{error::Error, net::Ipv4Addr, str::FromStr, sync::Arc};
use std::{
error::Error,
fmt::{self, Debug},
net::Ipv4Addr,
str::FromStr,
sync::Arc,
};
use async_trait::async_trait;
use derive_new::new;
use harmony_types::{
id::Id,
net::{IpAddress, MacAddress},
switch::PortLocation,
};
@ -20,8 +25,8 @@ pub struct DHCPStaticEntry {
pub ip: Ipv4Addr,
}
impl std::fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mac = self
.mac
.iter()
@ -43,8 +48,8 @@ pub trait Firewall: Send + Sync {
fn get_host(&self) -> LogicalHost;
}
impl std::fmt::Debug for dyn Firewall {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Debug for dyn Firewall {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Firewall {}", self.get_ip()))
}
}
@ -66,7 +71,7 @@ pub struct PxeOptions {
}
#[async_trait]
pub trait DhcpServer: Send + Sync + std::fmt::Debug {
pub trait DhcpServer: Send + Sync + Debug {
async fn add_static_mapping(&self, entry: &DHCPStaticEntry) -> Result<(), ExecutorError>;
async fn remove_static_mapping(&self, mac: &MacAddress) -> Result<(), ExecutorError>;
async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)>;
@ -105,8 +110,8 @@ pub trait DnsServer: Send + Sync {
}
}
impl std::fmt::Debug for dyn DnsServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Debug for dyn DnsServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("DnsServer {}", self.get_ip()))
}
}
@ -142,8 +147,8 @@ pub enum DnsRecordType {
TXT,
}
impl std::fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DnsRecordType::A => write!(f, "A"),
DnsRecordType::AAAA => write!(f, "AAAA"),
@ -186,12 +191,15 @@ pub trait Switch: Send + Sync {
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError>;
async fn configure_host_network(&self, config: &HostNetworkConfig) -> Result<(), SwitchError>;
async fn configure_host_network(
&self,
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError>;
}
#[derive(Clone, Debug, PartialEq)]
pub struct HostNetworkConfig {
pub host_id: Id,
pub switch_ports: Vec<SwitchPort>,
}
@ -214,8 +222,8 @@ pub struct SwitchError {
msg: String,
}
impl std::fmt::Display for SwitchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for SwitchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.msg)
}
}
@ -223,7 +231,7 @@ impl std::fmt::Display for SwitchError {
impl Error for SwitchError {}
#[async_trait]
pub trait SwitchClient: Send + Sync {
pub trait SwitchClient: Debug + Send + Sync {
/// Executes essential, idempotent, one-time initial configuration steps.
///
/// This is an opiniated procedure that setups a switch to provide high availability

View File

@ -21,6 +21,7 @@ pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
@ -38,6 +39,12 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
debug!("installing rule: {:#?}", rule);
rule.install(&self.sender).await?;
}
if let Some(targets) = &self.scrape_targets {
for target in targets.iter() {
debug!("installing scrape_target: {:#?}", target);
target.install(&self.sender).await?;
}
}
self.sender.ensure_installed(inventory, topology).await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
@ -77,6 +84,6 @@ pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
}
#[async_trait]
pub trait ScrapeTarget<S: AlertSender> {
async fn install(&self, sender: &S) -> Result<(), InterpretError>;
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
}

View File

@ -1,15 +1,14 @@
use async_trait::async_trait;
use brocade::{BrocadeClient, BrocadeOptions, InterSwitchLink, InterfaceStatus, PortOperatingMode};
use harmony_secret::Secret;
use harmony_types::{
net::{IpAddress, MacAddress},
switch::{PortDeclaration, PortLocation},
};
use option_ext::OptionExt;
use serde::{Deserialize, Serialize};
use crate::topology::{SwitchClient, SwitchError};
#[derive(Debug)]
pub struct BrocadeSwitchClient {
brocade: Box<dyn BrocadeClient + Send + Sync>,
}
@ -114,12 +113,6 @@ impl SwitchClient for BrocadeSwitchClient {
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
@ -235,7 +228,7 @@ mod tests {
assert_that!(*configured_interfaces).is_empty();
}
#[derive(Clone)]
#[derive(Debug, Clone)]
struct FakeBrocadeClient {
stack_topology: Vec<InterSwitchLink>,
interfaces: Vec<InterfaceInfo>,

View File

@ -26,19 +26,13 @@ impl LoadBalancer for OPNSenseFirewall {
}
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
warn!(
"TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
);
let mut config = self.opnsense_config.write().await;
let mut load_balancer = config.load_balancer();
let (frontend, backend, servers, healthcheck) =
harmony_load_balancer_service_to_haproxy_xml(service);
let mut load_balancer = config.load_balancer();
load_balancer.add_backend(backend);
load_balancer.add_frontend(frontend);
load_balancer.add_servers(servers);
if let Some(healthcheck) = healthcheck {
load_balancer.add_healthcheck(healthcheck);
}
load_balancer.configure_service(frontend, backend, servers, healthcheck);
Ok(())
}
@ -106,7 +100,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
.backends
.backends
.iter()
.find(|b| b.uuid == frontend.default_backend);
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
let mut health_check = None;
match matching_backend {
@ -116,8 +110,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
}
None => {
warn!(
"HAProxy config could not find a matching backend for frontend {:?}",
frontend
"HAProxy config could not find a matching backend for frontend {frontend:?}"
);
}
}
@ -152,11 +145,11 @@ pub(crate) fn get_servers_for_backend(
.servers
.iter()
.filter_map(|server| {
let address = server.address.clone()?;
let port = server.port?;
if backend_servers.contains(&server.uuid.as_str()) {
return Some(BackendServer {
address: server.address.clone(),
port: server.port,
});
return Some(BackendServer { address, port });
}
None
})
@ -347,7 +340,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
name: format!("frontend_{}", service.listening_port),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: backend.uuid.clone(),
default_backend: Some(backend.uuid.clone()),
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
@ -361,8 +354,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
uuid: Uuid::new_v4().to_string(),
name: format!("{}_{}", &server.address, &server.port),
enabled: 1,
address: server.address.clone(),
port: server.port,
address: Some(server.address.clone()),
port: Some(server.port),
mode: "active".to_string(),
server_type: "static".to_string(),
..Default::default()
@ -385,8 +378,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@ -411,8 +404,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@ -431,8 +424,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@ -453,16 +446,16 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "some-hostname.test.mcd".to_string(),
port: 80,
address: Some("some-hostname.test.mcd".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
let server = HAProxyServer {
uuid: "server2".to_string(),
address: "192.168.1.2".to_string(),
port: 8080,
address: Some("192.168.1.2".to_string()),
port: Some(8080),
..Default::default()
};
haproxy.servers.servers.push(server);

View File

@ -0,0 +1,209 @@
use std::sync::Arc;
use async_trait::async_trait;
use harmony_types::id::Id;
use kube::{CustomResource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]
pub struct ClusterIssuerScore {
email: String,
server: String,
issuer_name: String,
namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
fn name(&self) -> String {
"ClusterIssuerScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ClusterIssuerInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct ClusterIssuerInterpret {
score: ClusterIssuerScore,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
.await
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("ClusterIssuer")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl ClusterIssuerInterpret {
async fn validate_cert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let cert_manager = "cert-manager".to_string();
let operator_namespace = "openshift-operators".to_string();
match client
.get_deployment(&cert_manager, Some(&operator_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&cert_manager, ready_count
)));
} else {
return Err(InterpretError::new(
"cert-manager operator not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {} in ns {}",
&cert_manager, &operator_namespace
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&cert_manager, &operator_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&cert_manager, e
))),
}
}
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
let issuer_name = &self.score.issuer_name;
let email = &self.score.email;
let server = &self.score.server;
let namespace = &self.score.namespace;
let cluster_issuer = ClusterIssuer {
metadata: ObjectMeta {
name: Some(issuer_name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: ClusterIssuerSpec {
acme: AcmeSpec {
email: email.to_string(),
private_key_secret_ref: PrivateKeySecretRef {
name: issuer_name.to_string(),
},
server: server.to_string(),
solvers: vec![SolverSpec {
http01: Some(Http01Solver {
ingress: Http01Ingress {
class: "nginx".to_string(),
},
}),
}],
},
},
};
Ok(cluster_issuer)
}
pub async fn apply_cluster_issuer(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = self.score.namespace.clone();
self.validate_cert_manager(&client).await?;
let cluster_issuer = self.build_cluster_issuer().unwrap();
client
.apply_yaml(
&serde_yaml::to_value(cluster_issuer).unwrap(),
Some(&namespace),
)
.await?;
Ok(Outcome::success(format!(
"successfully deployed cluster operator: {} in namespace: {}",
self.score.issuer_name, self.score.namespace
)))
}
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "cert-manager.io",
version = "v1",
kind = "ClusterIssuer",
plural = "clusterissuers"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIssuerSpec {
pub acme: AcmeSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AcmeSpec {
pub email: String,
pub private_key_secret_ref: PrivateKeySecretRef,
pub server: String,
pub solvers: Vec<SolverSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PrivateKeySecretRef {
pub name: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SolverSpec {
pub http01: Option<Http01Solver>,
// Other solver types (e.g., dns01) would go here as Options
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Solver {
pub ingress: Http01Ingress,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Ingress {
pub class: String,
}

View File

@ -1,2 +1,3 @@
pub mod cluster_issuer;
mod helm;
pub use helm::*;

View File

@ -38,15 +38,13 @@ impl<
+ 'static
+ Send
+ Clone,
T: Topology + K8sclient,
T: Topology,
> Score<T> for K8sResourceScore<K>
where
<K as kube::Resource>::DynamicType: Default,
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(K8sResourceInterpret {
score: self.clone(),
})
todo!()
}
fn name(&self) -> String {

View File

@ -0,0 +1,187 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector,
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1alpha1",
kind = "ScrapeConfig",
plural = "scrapeconfigs",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ScrapeConfigSpec {
/// List of static configurations.
pub static_configs: Option<Vec<StaticConfig>>,
/// Kubernetes service discovery.
pub kubernetes_sd_configs: Option<Vec<KubernetesSDConfig>>,
/// HTTP-based service discovery.
pub http_sd_configs: Option<Vec<HttpSDConfig>>,
/// File-based service discovery.
pub file_sd_configs: Option<Vec<FileSDConfig>>,
/// DNS-based service discovery.
pub dns_sd_configs: Option<Vec<DnsSDConfig>>,
/// Consul service discovery.
pub consul_sd_configs: Option<Vec<ConsulSDConfig>>,
/// Relabeling configuration applied to discovered targets.
pub relabel_configs: Option<Vec<RelabelConfig>>,
/// Metric relabeling configuration applied to scraped samples.
pub metric_relabel_configs: Option<Vec<RelabelConfig>>,
/// Path to scrape metrics from (defaults to `/metrics`).
pub metrics_path: Option<String>,
/// Interval at which Prometheus scrapes targets (e.g., "30s").
pub scrape_interval: Option<String>,
/// Timeout for scraping (e.g., "10s").
pub scrape_timeout: Option<String>,
/// Optional job name override.
pub job_name: Option<String>,
/// Optional scheme (http or https).
pub scheme: Option<String>,
/// Authorization paramaters for snmp walk
pub params: Option<Params>,
}
/// Static configuration section of a ScrapeConfig.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StaticConfig {
pub targets: Vec<String>,
pub labels: Option<LabelSelector>,
}
/// Relabeling configuration for target or metric relabeling.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RelabelConfig {
pub source_labels: Option<Vec<String>>,
pub separator: Option<String>,
pub target_label: Option<String>,
pub regex: Option<String>,
pub modulus: Option<u64>,
pub replacement: Option<String>,
pub action: Option<String>,
}
/// Kubernetes service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KubernetesSDConfig {
///"pod", "service", "endpoints"pub role: String,
pub namespaces: Option<NamespaceSelector>,
pub selectors: Option<Vec<LabelSelector>>,
pub api_server: Option<String>,
pub bearer_token_file: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Namespace selector for Kubernetes service discovery.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NamespaceSelector {
pub any: Option<bool>,
pub match_names: Option<Vec<String>>,
}
/// HTTP-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct HttpSDConfig {
pub url: String,
pub refresh_interval: Option<String>,
pub basic_auth: Option<BasicAuth>,
pub authorization: Option<Authorization>,
pub tls_config: Option<TLSConfig>,
}
/// File-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FileSDConfig {
pub files: Vec<String>,
pub refresh_interval: Option<String>,
}
/// DNS-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DnsSDConfig {
pub names: Vec<String>,
pub refresh_interval: Option<String>,
pub type_: Option<String>, // SRV, A, AAAA
pub port: Option<u16>,
}
/// Consul service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConsulSDConfig {
pub server: String,
pub services: Option<Vec<String>>,
pub scheme: Option<String>,
pub datacenter: Option<String>,
pub tag_separator: Option<String>,
pub refresh_interval: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Basic authentication credentials.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
pub password_file: Option<String>,
}
/// Bearer token or other auth mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Authorization {
pub credentials: Option<String>,
pub credentials_file: Option<String>,
pub type_: Option<String>,
}
/// TLS configuration for secure scraping.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
pub ca_file: Option<String>,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub server_name: Option<String>,
pub insecure_skip_verify: Option<bool>,
}
/// Authorization parameters for SNMP walk.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Params {
pub auth: Option<Vec<String>>,
pub module: Option<Vec<String>>,
}

View File

@ -4,6 +4,7 @@ pub mod crd_default_rules;
pub mod crd_grafana;
pub mod crd_prometheus_rules;
pub mod crd_prometheuses;
pub mod crd_scrape_config;
pub mod grafana_default_dashboard;
pub mod grafana_operator;
pub mod prometheus_operator;

View File

@ -31,6 +31,7 @@ impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlert
sender: KubePrometheus { config },
receivers: self.receivers.clone(),
rules: self.rules.clone(),
scrape_targets: None,
})
}
fn name(&self) -> String {

View File

@ -6,3 +6,4 @@ pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;
pub mod scrape_target;

View File

@ -100,7 +100,11 @@ impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Interpret<T> f
info!("deploying ntfy...");
client
.wait_until_deployment_ready("ntfy", Some(self.score.namespace.as_str()), None)
.wait_until_deployment_ready(
"ntfy".to_string(),
Some(self.score.namespace.as_str()),
None,
)
.await?;
info!("ntfy deployed");

View File

@ -0,0 +1 @@
pub mod server;

View File

@ -0,0 +1,76 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_scrape_config::{Params, RelabelConfig, ScrapeConfig, ScrapeConfigSpec, StaticConfig},
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(Debug, Clone, Serialize)]
pub struct Server {
pub name: String,
pub ip: IpAddr,
pub auth: String,
pub module: String,
pub domain: String,
}
#[async_trait]
impl ScrapeTarget<CRDPrometheus> for Server {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let scrape_config_spec = ScrapeConfigSpec {
static_configs: Some(vec![StaticConfig {
targets: vec![self.ip.to_string()],
labels: None,
}]),
scrape_interval: Some("2m".to_string()),
kubernetes_sd_configs: None,
http_sd_configs: None,
file_sd_configs: None,
dns_sd_configs: None,
params: Some(Params {
auth: Some(vec![self.auth.clone()]),
module: Some(vec![self.module.clone()]),
}),
consul_sd_configs: None,
relabel_configs: Some(vec![RelabelConfig {
action: None,
source_labels: Some(vec!["__address__".to_string()]),
separator: None,
target_label: Some("__param_target".to_string()),
regex: None,
replacement: Some(format!("snmp.{}:31080", self.domain.clone())),
modulus: None,
}]),
metric_relabel_configs: None,
metrics_path: Some("/snmp".to_string()),
scrape_timeout: Some("2m".to_string()),
job_name: Some(format!("snmp_exporter/cloud/{}", self.name.clone())),
scheme: None,
};
let scrape_config = ScrapeConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec: scrape_config_spec,
};
sender
.client
.apply(&scrape_config, Some(&sender.namespace.clone()))
.await?;
Ok(Outcome::success(format!(
"installed scrape target {}",
self.name.clone()
)))
}
}

View File

@ -5,8 +5,10 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore, http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore, okd::templates::BootstrapIpxeTpl,
dhcp::DhcpHostBindingScore,
http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore,
okd::{host_network::HostNetworkConfigurationScore, templates::BootstrapIpxeTpl},
},
score::Score,
topology::{HAClusterTopology, HostBinding},
@ -203,6 +205,28 @@ impl OKDSetup03ControlPlaneInterpret {
Ok(())
}
/// Placeholder for automating network bonding configuration.
async fn persist_network_bond(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
hosts: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
};
score.interpret(inventory, topology).await?;
inquire::Confirm::new(
"Network configuration for control plane nodes is not automated yet. Configure it manually if needed.",
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
Ok(())
}
}
#[async_trait]
@ -241,6 +265,10 @@ impl Interpret<HAClusterTopology> for OKDSetup03ControlPlaneInterpret {
// 4. Reboot the nodes to start the OS installation.
self.reboot_targets(&nodes).await?;
// 5. Placeholder for post-boot network configuration (e.g., bonding).
self.persist_network_bond(inventory, topology, &nodes)
.await?;
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to
// the `wait-for bootstrap-complete` command.

View File

@ -77,6 +77,8 @@ impl OKDBootstrapLoadBalancerScore {
address: topology.bootstrap_host.ip.to_string(),
port,
});
backend.dedup();
backend
}
}

View File

@ -1,126 +0,0 @@
use crate::{
data::Version,
hardware::PhysicalHost,
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::okd::host_network::HostNetworkConfigurationScore,
score::Score,
topology::HAClusterTopology,
};
use async_trait::async_trait;
use derive_new::new;
use harmony_types::id::Id;
use log::info;
use serde::Serialize;
// -------------------------------------------------------------------------------------------------
// Persist Network Bond
// - Persist bonding via NMState
// - Persist port channels on the Switch
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
pub struct OKDSetupPersistNetworkBondScore {}
impl Score<HAClusterTopology> for OKDSetupPersistNetworkBondScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDSetupPersistNetworkBondInterpet::new())
}
fn name(&self) -> String {
"OKDSetupPersistNetworkBondScore".to_string()
}
}
#[derive(Debug, Clone)]
pub struct OKDSetupPersistNetworkBondInterpet {
version: Version,
status: InterpretStatus,
}
impl OKDSetupPersistNetworkBondInterpet {
pub fn new() -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
status: InterpretStatus::QUEUED,
}
}
/// Ensures that three physical hosts are discovered and available for the ControlPlane role.
/// It will trigger discovery if not enough hosts are found.
async fn get_nodes(
&self,
_inventory: &Inventory,
_topology: &HAClusterTopology,
) -> Result<Vec<PhysicalHost>, InterpretError> {
const REQUIRED_HOSTS: usize = 3;
let repo = InventoryRepositoryFactory::build().await?;
let control_plane_hosts = repo.get_host_for_role(&HostRole::ControlPlane).await?;
if control_plane_hosts.len() < REQUIRED_HOSTS {
Err(InterpretError::new(format!(
"OKD Requires at least {} control plane hosts, but only found {}. Cannot proceed.",
REQUIRED_HOSTS,
control_plane_hosts.len()
)))
} else {
// Take exactly the number of required hosts to ensure consistency.
Ok(control_plane_hosts
.into_iter()
.take(REQUIRED_HOSTS)
.collect())
}
}
async fn persist_network_bond(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
hosts: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
};
score.interpret(inventory, topology).await?;
Ok(())
}
}
#[async_trait]
impl Interpret<HAClusterTopology> for OKDSetupPersistNetworkBondInterpet {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup03ControlPlane")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
) -> Result<Outcome, InterpretError> {
let nodes = self.get_nodes(inventory, topology).await?;
self.persist_network_bond(inventory, topology, &nodes)
.await?;
Ok(Outcome::success(
"Network bond successfully persisted".into(),
))
}
}

View File

@ -1 +1,41 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
pub mod nmstate;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1",
kind = "OperatorGroup",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct OperatorGroupSpec {
pub target_namespaces: Vec<String>,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "Subscription",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionSpec {
pub name: String,
pub source: String,
pub source_namespace: String,
pub channel: Option<String>,
pub install_plan_approval: Option<InstallPlanApproval>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub enum InstallPlanApproval {
#[serde(rename = "Automatic")]
Automatic,
#[serde(rename = "Manual")]
Manual,
}

View File

@ -6,16 +6,9 @@ use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "nmstate.io",
version = "v1",
kind = "NMState",
plural = "nmstates",
namespaced = false
)]
#[kube(group = "nmstate.io", version = "v1", kind = "NMState", namespaced)]
#[serde(rename_all = "camelCase")]
pub struct NMStateSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub probe_configuration: Option<ProbeConfig>,
}

View File

@ -39,34 +39,16 @@ impl HostNetworkConfigurationInterpret {
&self,
topology: &T,
host: &PhysicalHost,
current_host: &usize,
total_hosts: &usize,
) -> Result<HostNetworkConfig, InterpretError> {
info!("[Host {current_host}/{total_hosts}] Collecting ports on switch...");
) -> Result<(), InterpretError> {
let switch_ports = self.collect_switch_ports_for_host(topology, host).await?;
let config = HostNetworkConfig {
host_id: host.id.clone(),
switch_ports,
};
if !config.switch_ports.is_empty() {
info!(
"[Host {current_host}/{total_hosts}] Found {} ports for {} interfaces",
config.switch_ports.len(),
host.network.len()
);
info!("[Host {current_host}/{total_hosts}] Configuring host network...");
if !switch_ports.is_empty() {
topology
.configure_host_network(&config)
.configure_host_network(host, HostNetworkConfig { switch_ports })
.await
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?;
} else {
info!("[Host {current_host}/{total_hosts}] No ports found");
}
Ok(config)
Ok(())
}
async fn collect_switch_ports_for_host<T: Topology + Switch>(
@ -103,47 +85,6 @@ impl HostNetworkConfigurationInterpret {
Ok(switch_ports)
}
fn format_host_configuration(&self, configs: Vec<HostNetworkConfig>) -> Vec<String> {
let mut report = vec![
"Network Configuration Report".to_string(),
"------------------------------------------------------------------".to_string(),
];
for config in configs {
let host = self
.score
.hosts
.iter()
.find(|h| h.id == config.host_id)
.unwrap();
println!("[Host] {host}");
if config.switch_ports.is_empty() {
report.push(format!(
"⏭️ Host {}: SKIPPED (No matching switch ports found)",
config.host_id
));
} else {
let mappings: Vec<String> = config
.switch_ports
.iter()
.map(|p| format!("[{} -> {}]", p.interface.name, p.port))
.collect();
report.push(format!(
"✅ Host {}: Bonded {} port(s) {}",
config.host_id,
config.switch_ports.len(),
mappings.join(", ")
));
}
}
report
.push("------------------------------------------------------------------".to_string());
report
}
}
#[async_trait]
@ -173,36 +114,27 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
return Ok(Outcome::noop("No hosts to configure".into()));
}
let host_count = self.score.hosts.len();
info!("Started network configuration for {host_count} host(s)...",);
info!(
"Started network configuration for {} host(s)...",
self.score.hosts.len()
);
topology
.setup_switch()
.await
.map_err(|e| InterpretError::new(format!("Switch setup failed: {e}")))?;
let mut current_host = 1;
let mut host_configurations = vec![];
let mut configured_host_count = 0;
for host in &self.score.hosts {
let host_configuration = self
.configure_network_for_host(topology, host, &current_host, &host_count)
.await?;
host_configurations.push(host_configuration);
current_host += 1;
self.configure_network_for_host(topology, host).await?;
configured_host_count += 1;
}
if current_host > 1 {
let details = self.format_host_configuration(host_configurations);
Ok(Outcome::success_with_details(
format!(
"Configured {}/{} host(s)",
current_host - 1,
if configured_host_count > 0 {
Ok(Outcome::success(format!(
"Configured {configured_host_count}/{} host(s)",
self.score.hosts.len()
),
details,
))
)))
} else {
Ok(Outcome::noop("No hosts configured".into()))
}
@ -277,7 +209,6 @@ mod tests {
assert_that!(*configured_host_networks).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
@ -303,7 +234,6 @@ mod tests {
assert_that!(*configured_host_networks).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![
SwitchPort {
interface: EXISTING_INTERFACE.clone(),
@ -333,7 +263,6 @@ mod tests {
(
HOST_ID.clone(),
HostNetworkConfig {
host_id: HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
@ -343,7 +272,6 @@ mod tests {
(
ANOTHER_HOST_ID.clone(),
HostNetworkConfig {
host_id: ANOTHER_HOST_ID.clone(),
switch_ports: vec![SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
@ -454,10 +382,11 @@ mod tests {
async fn configure_host_network(
&self,
config: &HostNetworkConfig,
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
let mut configured_host_networks = self.configured_host_networks.lock().unwrap();
configured_host_networks.push((config.host_id.clone(), config.clone()));
configured_host_networks.push((host.id.clone(), config.clone()));
Ok(())
}

View File

@ -50,7 +50,7 @@
use crate::{
modules::okd::{
OKDSetup01InventoryScore, OKDSetup02BootstrapScore, OKDSetup03ControlPlaneScore,
OKDSetup04WorkersScore, OKDSetup05SanityCheckScore, OKDSetupPersistNetworkBondScore,
OKDSetup04WorkersScore, OKDSetup05SanityCheckScore,
bootstrap_06_installation_report::OKDSetup06InstallationReportScore,
},
score::Score,
@ -65,7 +65,6 @@ impl OKDInstallationPipeline {
Box::new(OKDSetup01InventoryScore::new()),
Box::new(OKDSetup02BootstrapScore::new()),
Box::new(OKDSetup03ControlPlaneScore::new()),
Box::new(OKDSetupPersistNetworkBondScore::new()),
Box::new(OKDSetup04WorkersScore::new()),
Box::new(OKDSetup05SanityCheckScore::new()),
Box::new(OKDSetup06InstallationReportScore::new()),

View File

@ -6,7 +6,6 @@ mod bootstrap_05_sanity_check;
mod bootstrap_06_installation_report;
pub mod bootstrap_dhcp;
pub mod bootstrap_load_balancer;
mod bootstrap_persist_network_bond;
pub mod dhcp;
pub mod dns;
pub mod installation;
@ -20,6 +19,5 @@ pub use bootstrap_03_control_plane::*;
pub use bootstrap_04_workers::*;
pub use bootstrap_05_sanity_check::*;
pub use bootstrap_06_installation_report::*;
pub use bootstrap_persist_network_bond::*;
pub mod crd;
pub mod host_network;

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use log::{info, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@ -19,8 +19,8 @@ use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
osd_deployment_name: String,
rook_ceph_namespace: String,
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
self.purge_ceph_osd(client.clone()).await?;
self.verify_ceph_osd_removal(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::CephRemoveOsd
}
fn get_version(&self) -> Version {
@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
self.score.osd_deployment_name
))
})?;
Ok(osd_id_numeric.to_string())
}
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
debug!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("verifying toolbox exists");
let toolbox_dep = "rook-ceph-tools".to_string();
match client
@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
if status.replicas == None && status.ready_replicas == None {
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Verifying OSD deployment deleted");
loop {
let dep = client
.get_deployment(
@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
.await?;
if dep.is_none() {
info!(
debug!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
"sh",
"-c",
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
],
)
.await?;
@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
vec!["sh", "-c", "ceph osd tree -f json"],
)
.await?;
let tree =

View File

@ -1,2 +1,2 @@
pub mod ceph_osd_replacement_score;
pub mod ceph_remove_osd_score;
pub mod ceph_validate_health_score;

View File

@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId {
}
}
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub struct HAProxyId(String);
impl Default for HAProxyId {
@ -297,7 +297,7 @@ pub struct HAProxyFrontends {
pub frontend: Vec<Frontend>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Frontend {
#[yaserde(attribute = true)]
pub uuid: String,
@ -310,7 +310,7 @@ pub struct Frontend {
pub bind_options: MaybeString,
pub mode: String,
#[yaserde(rename = "defaultBackend")]
pub default_backend: String,
pub default_backend: Option<String>,
pub ssl_enabled: i32,
pub ssl_certificates: MaybeString,
pub ssl_default_certificate: MaybeString,
@ -416,7 +416,7 @@ pub struct HAProxyBackends {
pub backends: Vec<HAProxyBackend>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyBackend {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@ -535,7 +535,7 @@ pub struct HAProxyServers {
pub servers: Vec<HAProxyServer>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyServer {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@ -543,8 +543,8 @@ pub struct HAProxyServer {
pub enabled: u8,
pub name: String,
pub description: MaybeString,
pub address: String,
pub port: u16,
pub address: Option<String>,
pub port: Option<u16>,
pub checkport: MaybeString,
pub mode: String,
pub multiplexer_protocol: MaybeString,
@ -589,7 +589,7 @@ pub struct HAProxyHealthChecks {
pub healthchecks: Vec<HAProxyHealthCheck>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyHealthCheck {
#[yaserde(attribute = true)]
pub uuid: String,

View File

@ -9,7 +9,7 @@ pub struct Interface {
pub physical_interface_name: String,
pub descr: Option<MaybeString>,
pub mtu: Option<MaybeString>,
pub enable: Option<MaybeString>,
pub enable: MaybeString,
pub lock: Option<MaybeString>,
#[yaserde(rename = "spoofmac")]
pub spoof_mac: Option<MaybeString>,

View File

@ -25,6 +25,7 @@ sha2 = "0.10.9"
[dev-dependencies]
pretty_assertions.workspace = true
assertor.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] }

View File

@ -30,8 +30,7 @@ impl SshConfigManager {
self.opnsense_shell
.exec(&format!(
"cp /conf/config.xml /conf/backup/{}",
backup_filename
"cp /conf/config.xml /conf/backup/{backup_filename}"
))
.await
}

View File

@ -1,9 +1,7 @@
mod ssh;
pub use ssh::*;
use async_trait::async_trait;
use crate::Error;
use async_trait::async_trait;
pub use ssh::*;
#[async_trait]
pub trait OPNsenseShell: std::fmt::Debug + Send + Sync {

View File

@ -1,11 +1,8 @@
use std::sync::Arc;
use log::warn;
use crate::{config::OPNsenseShell, Error};
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, OPNsense,
};
use crate::{config::OPNsenseShell, Error};
use std::{collections::HashSet, sync::Arc};
pub struct LoadBalancerConfig<'a> {
opnsense: &'a mut OPNsense,
@ -31,7 +28,7 @@ impl<'a> LoadBalancerConfig<'a> {
match &mut self.opnsense.opnsense.haproxy.as_mut() {
Some(haproxy) => f(haproxy),
None => unimplemented!(
"Adding a backend is not supported when haproxy config does not exist yet"
"Cannot configure load balancer when haproxy config does not exist yet"
),
}
}
@ -40,21 +37,67 @@ impl<'a> LoadBalancerConfig<'a> {
self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32);
}
pub fn add_backend(&mut self, backend: HAProxyBackend) {
warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks");
self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend));
/// Configures a service by removing any existing service on the same port
/// and then adding the new definition. This ensures idempotency.
pub fn configure_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.remove_service_by_bind_address(&frontend.bind);
self.remove_servers(&servers);
self.add_new_service(frontend, backend, servers, healthcheck);
}
pub fn add_frontend(&mut self, frontend: Frontend) {
self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend));
// Remove the corresponding real servers based on their name if they already exist.
fn remove_servers(&mut self, servers: &[HAProxyServer]) {
let server_names: HashSet<_> = servers.iter().map(|s| s.name.clone()).collect();
self.with_haproxy(|haproxy| {
haproxy
.servers
.servers
.retain(|s| !server_names.contains(&s.name));
});
}
pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) {
self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck));
/// Removes a service and its dependent components based on the frontend's bind address.
/// This performs a cascading delete of the frontend, backend, servers, and health check.
fn remove_service_by_bind_address(&mut self, bind_address: &str) {
self.with_haproxy(|haproxy| {
let Some(old_frontend) = remove_frontend_by_bind_address(haproxy, bind_address) else {
return;
};
let Some(old_backend) = remove_backend(haproxy, old_frontend) else {
return;
};
remove_healthcheck(haproxy, &old_backend);
remove_linked_servers(haproxy, &old_backend);
});
}
pub fn add_servers(&mut self, mut servers: Vec<HAProxyServer>) {
self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers));
/// Adds the components of a new service to the HAProxy configuration.
/// This function de-duplicates servers by name to prevent configuration errors.
fn add_new_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.with_haproxy(|haproxy| {
if let Some(check) = healthcheck {
haproxy.healthchecks.healthchecks.push(check);
}
haproxy.servers.servers.extend(servers);
haproxy.backends.backends.push(backend);
haproxy.frontends.frontend.push(frontend);
});
}
pub async fn reload_restart(&self) -> Result<(), Error> {
@ -82,3 +125,262 @@ impl<'a> LoadBalancerConfig<'a> {
Ok(())
}
}
fn remove_frontend_by_bind_address(haproxy: &mut HAProxy, bind_address: &str) -> Option<Frontend> {
let pos = haproxy
.frontends
.frontend
.iter()
.position(|f| f.bind == bind_address);
match pos {
Some(pos) => Some(haproxy.frontends.frontend.remove(pos)),
None => None,
}
}
fn remove_backend(haproxy: &mut HAProxy, old_frontend: Frontend) -> Option<HAProxyBackend> {
let default_backend = old_frontend.default_backend?;
let pos = haproxy
.backends
.backends
.iter()
.position(|b| b.uuid == default_backend);
match pos {
Some(pos) => Some(haproxy.backends.backends.remove(pos)),
None => None, // orphaned frontend, shouldn't happen
}
}
fn remove_healthcheck(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(uuid) = &backend.health_check.content {
haproxy
.healthchecks
.healthchecks
.retain(|h| h.uuid != *uuid);
}
}
/// Remove the backend's servers. This assumes servers are not shared between services.
fn remove_linked_servers(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(server_uuids_str) = &backend.linked_servers.content {
let server_uuids_to_remove: HashSet<_> = server_uuids_str.split(',').collect();
haproxy
.servers
.servers
.retain(|s| !server_uuids_to_remove.contains(s.uuid.as_str()));
}
}
#[cfg(test)]
mod tests {
use crate::config::DummyOPNSenseShell;
use assertor::*;
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyBackends, HAProxyFrontends, HAProxyHealthCheck,
HAProxyHealthChecks, HAProxyId, HAProxyServer, HAProxyServers, MaybeString, OPNsense,
};
use std::sync::Arc;
use super::LoadBalancerConfig;
static SERVICE_BIND_ADDRESS: &str = "192.168.1.1:80";
static OTHER_SERVICE_BIND_ADDRESS: &str = "192.168.1.1:443";
static SERVER_ADDRESS: &str = "1.1.1.1:80";
static OTHER_SERVER_ADDRESS: &str = "1.1.1.1:443";
#[test]
fn configure_service_should_add_all_service_components_to_haproxy() {
let mut opnsense = given_opnsense();
let mut load_balancer = given_load_balancer(&mut opnsense);
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
load_balancer.configure_service(
frontend.clone(),
backend.clone(),
servers.clone(),
Some(healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend],
vec![backend],
servers,
vec![healthcheck],
);
}
#[test]
fn configure_service_should_replace_service_on_same_bind_address() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
let (updated_healthcheck, updated_servers, updated_backend, updated_frontend) =
given_service(SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
load_balancer.configure_service(
updated_frontend.clone(),
updated_backend.clone(),
updated_servers.clone(),
Some(updated_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![updated_frontend],
vec![updated_backend],
updated_servers,
vec![updated_healthcheck],
);
}
#[test]
fn configure_service_should_keep_existing_service_on_different_bind_addresses() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let (other_healthcheck, other_servers, other_backend, other_frontend) =
given_service(OTHER_SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
load_balancer.configure_service(
other_frontend.clone(),
other_backend.clone(),
other_servers.clone(),
Some(other_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend, other_frontend],
vec![backend, other_backend],
[servers, other_servers].concat(),
vec![healthcheck, other_healthcheck],
);
}
fn assert_haproxy_configured_with(
opnsense: OPNsense,
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) {
let haproxy = opnsense.opnsense.haproxy.as_ref().unwrap();
assert_that!(haproxy.frontends.frontend).contains_exactly(frontends);
assert_that!(haproxy.backends.backends).contains_exactly(backends);
assert_that!(haproxy.servers.servers).is_equal_to(servers);
assert_that!(haproxy.healthchecks.healthchecks).contains_exactly(healthchecks);
}
fn given_opnsense() -> OPNsense {
OPNsense::default()
}
fn given_opnsense_with(haproxy: HAProxy) -> OPNsense {
let mut opnsense = OPNsense::default();
opnsense.opnsense.haproxy = Some(haproxy);
opnsense
}
fn given_load_balancer<'a>(opnsense: &'a mut OPNsense) -> LoadBalancerConfig<'a> {
let opnsense_shell = Arc::new(DummyOPNSenseShell {});
if opnsense.opnsense.haproxy.is_none() {
opnsense.opnsense.haproxy = Some(HAProxy::default());
}
LoadBalancerConfig::new(opnsense, opnsense_shell)
}
fn given_service(
bind_address: &str,
server_address: &str,
) -> (
HAProxyHealthCheck,
Vec<HAProxyServer>,
HAProxyBackend,
Frontend,
) {
let healthcheck = given_healthcheck();
let servers = vec![given_server(server_address)];
let backend = given_backend();
let frontend = given_frontend(bind_address);
(healthcheck, servers, backend, frontend)
}
fn given_haproxy(
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) -> HAProxy {
HAProxy {
frontends: HAProxyFrontends {
frontend: frontends,
},
backends: HAProxyBackends { backends },
servers: HAProxyServers { servers },
healthchecks: HAProxyHealthChecks { healthchecks },
..Default::default()
}
}
fn given_frontend(bind_address: &str) -> Frontend {
Frontend {
uuid: "uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: format!("frontend_{bind_address}"),
bind: bind_address.into(),
default_backend: Some("backend-uuid".into()),
..Default::default()
}
}
fn given_backend() -> HAProxyBackend {
HAProxyBackend {
uuid: "backend-uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: "backend_192.168.1.1:80".into(),
linked_servers: MaybeString::from("server-uuid"),
health_check_enabled: 1,
health_check: MaybeString::from("healthcheck-uuid"),
..Default::default()
}
}
fn given_server(address: &str) -> HAProxyServer {
HAProxyServer {
uuid: "server-uuid".into(),
id: HAProxyId::default(),
name: address.into(),
address: Some(address.into()),
..Default::default()
}
}
fn given_healthcheck() -> HAProxyHealthCheck {
HAProxyHealthCheck {
uuid: "healthcheck-uuid".into(),
name: "healthcheck".into(),
..Default::default()
}
}
}