Compare commits

..

18 Commits

Author SHA1 Message Date
Ian Letourneau
ea7322f38c Merge branch 'master' into configure-switch
All checks were successful
Run Check Script / check (pull_request) Successful in 1m6s
2025-10-15 14:50:12 -04:00
Ian Letourneau
2edd24753a implement port channel functions for Brocade::NetworkOperatingSystem
All checks were successful
Run Check Script / check (pull_request) Successful in 1m5s
2025-10-15 12:13:09 -04:00
Ian Letourneau
da5be17cb6 implement switch brocade setup by configuring interfaces with operating mode access
All checks were successful
Run Check Script / check (pull_request) Successful in 1m6s
2025-10-14 17:21:13 -04:00
Ian Letourneau
1265cebfa7 get the stack topology (inter-switch links) and all interfaces
All checks were successful
Run Check Script / check (pull_request) Successful in 1m13s
2025-10-08 17:05:25 -04:00
Ian Letourneau
073cccde2f improve wait for shell ready, restore dry run mode
All checks were successful
Run Check Script / check (pull_request) Successful in 1m12s
2025-10-08 10:36:02 -04:00
Ian Letourneau
77e09436a9 refactor brocade to support different shell versions (e.g. FastIron vs NOS)
All checks were successful
Run Check Script / check (pull_request) Successful in 1m13s
2025-10-08 08:14:40 -04:00
Ian Letourneau
45e0de2097 properly configure a new channel port for a host (work only with a clean slate for now) 2025-09-29 17:31:52 -04:00
Ian Letourneau
731dc5f404 add execution mode to run brocade commands in privileged mode (enable) 2025-09-28 16:35:53 -04:00
Ian Letourneau
1199564122 replace command execution by interactive shell 2025-09-28 15:58:13 -04:00
Ian Letourneau
f2f55d98d4 add option to run brocade commands in dry-run 2025-09-28 12:55:37 -04:00
7b6ac6641a add link to finish describing EthtoolSpec 2025-09-25 17:18:42 -04:00
58c1fd4a96 remove unused code 2025-09-25 17:17:23 -04:00
2388f585f5 reorganize modules 2025-09-25 17:13:42 -04:00
ffe3c09907 configure bond with NMState 2025-09-25 16:52:04 -04:00
0de52aedbf find ports in Brocade switch & configure port-channels (blind implementation) 2025-09-16 17:19:32 -04:00
427009bbfe merge 2 configure steps in 1 and let the topology handle it
All checks were successful
Run Check Script / check (pull_request) Successful in 1m2s
2025-09-16 11:34:06 -04:00
fe0501b784 split host & switch config 2025-09-15 22:05:09 -04:00
61b02e7a28 feat(switch): configure host network and switch network 2025-09-15 17:39:19 -04:00
33 changed files with 187 additions and 1002 deletions

48
Cargo.lock generated
View File

@ -1847,6 +1847,18 @@ dependencies = [
"url",
]
[[package]]
name = "example-penpot"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"tokio",
"url",
]
[[package]]
name = "example-pxe"
version = "0.1.0"
@ -1918,8 +1930,6 @@ dependencies = [
"env_logger",
"harmony",
"harmony_macros",
"harmony_secret",
"harmony_secret_derive",
"harmony_tui",
"harmony_types",
"log",
@ -2446,6 +2456,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "harmony_derive"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d138bbb32bb346299c5f95fbb53532313f39927cb47c411c99c634ef8665ef7"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "harmony_inventory_agent"
version = "0.1.0"
@ -3892,6 +3913,19 @@ dependencies = [
"web-time",
]
[[package]]
name = "okd_host_network"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"harmony_derive",
"harmony_inventory_agent",
"harmony_macros",
"harmony_types",
"tokio",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -3920,7 +3954,6 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
name = "opnsense-config"
version = "0.1.0"
dependencies = [
"assertor",
"async-trait",
"chrono",
"env_logger",
@ -4613,15 +4646,6 @@ version = "0.8.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
[[package]]
name = "remove_rook_osd"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"tokio",
]
[[package]]
name = "reqwest"
version = "0.11.27"

View File

@ -15,8 +15,7 @@ members = [
"harmony_inventory_agent",
"harmony_secret_derive",
"harmony_secret",
"adr/agent_discovery/mdns",
"brocade",
"adr/agent_discovery/mdns", "brocade",
]
[workspace.package]

View File

@ -70,7 +70,7 @@ impl FastIronClient {
Some(Ok(InterSwitchLink {
local_port,
remote_port: None,
remote_port: None, // FIXME: Map the remote port as well
}))
}

View File

@ -270,7 +270,7 @@ impl BrocadeClient for NetworkOperatingSystemClient {
commands.push("no ip address".into());
commands.push("no fabric isl enable".into());
commands.push("no fabric trunk enable".into());
commands.push(format!("channel-group {channel_id} mode active"));
commands.push(format!("channel-group {} mode active", channel_id));
commands.push("no shutdown".into());
commands.push("exit".into());
}

View File

@ -1,11 +0,0 @@
[package]
name = "example-remove-rook-osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@ -1,18 +0,0 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@ -12,11 +12,11 @@ pub type FirewallGroup = Vec<PhysicalHost>;
pub struct PhysicalHost {
pub id: Id,
pub category: HostCategory,
pub network: Vec<NetworkInterface>,
pub storage: Vec<StorageDrive>,
pub network: Vec<NetworkInterface>, // FIXME: Don't use harmony_inventory_agent::NetworkInterface
pub storage: Vec<StorageDrive>, // FIXME: Don't use harmony_inventory_agent::StorageDrive
pub labels: Vec<Label>,
pub memory_modules: Vec<MemoryModule>,
pub cpus: Vec<CPU>,
pub memory_modules: Vec<MemoryModule>, // FIXME: Don't use harmony_inventory_agent::MemoryModule
pub cpus: Vec<CPU>, // FIXME: Don't use harmony_inventory_agent::CPU
}
impl PhysicalHost {

View File

@ -30,7 +30,6 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephRemoveOsd,
DiscoverInventoryAgent,
CephClusterHealth,
Custom(&'static str),
@ -62,7 +61,6 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
InterpretName::Custom(name) => f.write_str(name),

View File

@ -2,10 +2,9 @@ use async_trait::async_trait;
use brocade::BrocadeOptions;
use harmony_macros::ip;
use harmony_secret::SecretManager;
use harmony_types::{
net::{MacAddress, Url},
switch::PortLocation,
};
use harmony_types::net::MacAddress;
use harmony_types::net::Url;
use harmony_types::switch::PortLocation;
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ObjectMeta;
use log::debug;
@ -16,19 +15,40 @@ use crate::executors::ExecutorError;
use crate::hardware::PhysicalHost;
use crate::infra::brocade::BrocadeSwitchAuth;
use crate::infra::brocade::BrocadeSwitchClient;
use crate::modules::okd::crd::{
InstallPlanApproval, OperatorGroup, OperatorGroupSpec, Subscription, SubscriptionSpec,
nmstate::{self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec},
};
use crate::modules::okd::crd::InstallPlanApproval;
use crate::modules::okd::crd::OperatorGroup;
use crate::modules::okd::crd::OperatorGroupSpec;
use crate::modules::okd::crd::Subscription;
use crate::modules::okd::crd::SubscriptionSpec;
use crate::modules::okd::crd::nmstate;
use crate::modules::okd::crd::nmstate::NMState;
use crate::modules::okd::crd::nmstate::NodeNetworkConfigurationPolicy;
use crate::modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec;
use crate::topology::PxeOptions;
use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost,
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer,
Topology, k8s::K8sClient,
};
use super::DHCPStaticEntry;
use super::DhcpServer;
use super::DnsRecord;
use super::DnsRecordType;
use super::DnsServer;
use super::Firewall;
use super::HostNetworkConfig;
use super::HttpServer;
use super::IpAddress;
use super::K8sclient;
use super::LoadBalancer;
use super::LoadBalancerService;
use super::LogicalHost;
use super::PreparationError;
use super::PreparationOutcome;
use super::Router;
use super::Switch;
use super::SwitchClient;
use super::SwitchError;
use super::TftpServer;
use super::Topology;
use super::k8s::K8sClient;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::sync::Arc;
@ -513,7 +533,7 @@ impl Switch for HAClusterTopology {
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
self.configure_bond(host, &config).await?;
// self.configure_bond(host, &config).await?;
self.configure_port_channel(host, &config).await
}
}

View File

@ -3,11 +3,13 @@ use std::time::Duration;
use derive_new::new;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
apimachinery::pkg::version::Info,
api::{
apps::v1::Deployment,
core::v1::{Pod, PodStatus},
},
};
use kube::{
Client, Config, Discovery, Error, Resource,
Client, Config, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
@ -19,7 +21,7 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, info, trace};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use similar::TextDiff;
@ -57,17 +59,6 @@ impl K8sClient {
})
}
pub async fn get_apiserver_version(&self) -> Result<Info, Error> {
let client: Client = self.client.clone();
let version_info: Info = client.apiserver_version().await?;
Ok(version_info)
}
pub async fn discovery(&self) -> Result<Discovery, Error> {
let discovery: Discovery = Discovery::new(self.client.clone()).run().await?;
Ok(discovery)
}
pub async fn get_resource_json_value(
&self,
name: &str,
@ -89,13 +80,10 @@ impl K8sClient {
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
debug!("getting namespaced deployment");
Api::namespaced(self.client.clone(), ns)
} else {
debug!("getting default namespace deployment");
Api::default_namespaced(self.client.clone())
};
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
Ok(deps.get_opt(name).await?)
}
@ -126,7 +114,7 @@ impl K8sClient {
}
});
let pp = PatchParams::default();
let scale = Patch::Merge(&patch);
let scale = Patch::Apply(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}

View File

@ -47,13 +47,6 @@ struct K8sState {
message: String,
}
#[derive(Debug, Clone)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
@ -64,7 +57,6 @@ enum K8sSource {
pub struct K8sAnywhereTopology {
k8s_state: Arc<OnceCell<Option<K8sState>>>,
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
config: Arc<K8sAnywhereConfig>,
}
@ -170,7 +162,6 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(K8sAnywhereConfig::from_env()),
}
}
@ -179,42 +170,10 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(config),
}
}
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
let client = self.k8s_client().await.unwrap();
let discovery = client.discovery().await.map_err(|e| {
PreparationError::new(format!("Could not discover API groups: {}", e))
})?;
let version = client.get_apiserver_version().await.map_err(|e| {
PreparationError::new(format!("Could not get server version: {}", e))
})?;
// OpenShift / OKD
if discovery
.groups()
.any(|g| g.name() == "project.openshift.io")
{
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
return Ok(KubernetesDistribution::K3sFamily);
}
return Ok(KubernetesDistribution::Default);
})
.await
}
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,

View File

@ -28,7 +28,13 @@ pub trait LoadBalancer: Send + Sync {
&self,
service: &LoadBalancerService,
) -> Result<(), ExecutorError> {
self.add_service(service).await?;
debug!(
"Listing LoadBalancer services {:?}",
self.list_services().await
);
if !self.list_services().await.contains(service) {
self.add_service(service).await?;
}
Ok(())
}
}

View File

@ -21,7 +21,6 @@ pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
@ -39,12 +38,6 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
debug!("installing rule: {:#?}", rule);
rule.install(&self.sender).await?;
}
if let Some(targets) = &self.scrape_targets {
for target in targets.iter() {
debug!("installing scrape_target: {:#?}", target);
target.install(&self.sender).await?;
}
}
self.sender.ensure_installed(inventory, topology).await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
@ -84,6 +77,6 @@ pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
}
#[async_trait]
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
pub trait ScrapeTarget<S: AlertSender> {
async fn install(&self, sender: &S) -> Result<(), InterpretError>;
}

View File

@ -26,13 +26,19 @@ impl LoadBalancer for OPNSenseFirewall {
}
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
warn!(
"TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
);
let mut config = self.opnsense_config.write().await;
let mut load_balancer = config.load_balancer();
let (frontend, backend, servers, healthcheck) =
harmony_load_balancer_service_to_haproxy_xml(service);
load_balancer.configure_service(frontend, backend, servers, healthcheck);
let mut load_balancer = config.load_balancer();
load_balancer.add_backend(backend);
load_balancer.add_frontend(frontend);
load_balancer.add_servers(servers);
if let Some(healthcheck) = healthcheck {
load_balancer.add_healthcheck(healthcheck);
}
Ok(())
}
@ -100,7 +106,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
.backends
.backends
.iter()
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
.find(|b| b.uuid == frontend.default_backend);
let mut health_check = None;
match matching_backend {
@ -110,7 +116,8 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
}
None => {
warn!(
"HAProxy config could not find a matching backend for frontend {frontend:?}"
"HAProxy config could not find a matching backend for frontend {:?}",
frontend
);
}
}
@ -145,11 +152,11 @@ pub(crate) fn get_servers_for_backend(
.servers
.iter()
.filter_map(|server| {
let address = server.address.clone()?;
let port = server.port?;
if backend_servers.contains(&server.uuid.as_str()) {
return Some(BackendServer { address, port });
return Some(BackendServer {
address: server.address.clone(),
port: server.port,
});
}
None
})
@ -340,7 +347,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
name: format!("frontend_{}", service.listening_port),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: Some(backend.uuid.clone()),
default_backend: backend.uuid.clone(),
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
@ -354,8 +361,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
uuid: Uuid::new_v4().to_string(),
name: format!("{}_{}", &server.address, &server.port),
enabled: 1,
address: Some(server.address.clone()),
port: Some(server.port),
address: server.address.clone(),
port: server.port,
mode: "active".to_string(),
server_type: "static".to_string(),
..Default::default()
@ -378,8 +385,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
@ -404,8 +411,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
@ -424,8 +431,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
@ -446,16 +453,16 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("some-hostname.test.mcd".to_string()),
port: Some(80),
address: "some-hostname.test.mcd".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
let server = HAProxyServer {
uuid: "server2".to_string(),
address: Some("192.168.1.2".to_string()),
port: Some(8080),
address: "192.168.1.2".to_string(),
port: 8080,
..Default::default()
};
haproxy.servers.servers.push(server);

View File

@ -1,209 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use harmony_types::id::Id;
use kube::{CustomResource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]
pub struct ClusterIssuerScore {
email: String,
server: String,
issuer_name: String,
namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
fn name(&self) -> String {
"ClusterIssuerScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ClusterIssuerInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct ClusterIssuerInterpret {
score: ClusterIssuerScore,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
.await
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("ClusterIssuer")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl ClusterIssuerInterpret {
async fn validate_cert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let cert_manager = "cert-manager".to_string();
let operator_namespace = "openshift-operators".to_string();
match client
.get_deployment(&cert_manager, Some(&operator_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&cert_manager, ready_count
)));
} else {
return Err(InterpretError::new(
"cert-manager operator not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {} in ns {}",
&cert_manager, &operator_namespace
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&cert_manager, &operator_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&cert_manager, e
))),
}
}
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
let issuer_name = &self.score.issuer_name;
let email = &self.score.email;
let server = &self.score.server;
let namespace = &self.score.namespace;
let cluster_issuer = ClusterIssuer {
metadata: ObjectMeta {
name: Some(issuer_name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: ClusterIssuerSpec {
acme: AcmeSpec {
email: email.to_string(),
private_key_secret_ref: PrivateKeySecretRef {
name: issuer_name.to_string(),
},
server: server.to_string(),
solvers: vec![SolverSpec {
http01: Some(Http01Solver {
ingress: Http01Ingress {
class: "nginx".to_string(),
},
}),
}],
},
},
};
Ok(cluster_issuer)
}
pub async fn apply_cluster_issuer(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = self.score.namespace.clone();
self.validate_cert_manager(&client).await?;
let cluster_issuer = self.build_cluster_issuer().unwrap();
client
.apply_yaml(
&serde_yaml::to_value(cluster_issuer).unwrap(),
Some(&namespace),
)
.await?;
Ok(Outcome::success(format!(
"successfully deployed cluster operator: {} in namespace: {}",
self.score.issuer_name, self.score.namespace
)))
}
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "cert-manager.io",
version = "v1",
kind = "ClusterIssuer",
plural = "clusterissuers"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIssuerSpec {
pub acme: AcmeSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AcmeSpec {
pub email: String,
pub private_key_secret_ref: PrivateKeySecretRef,
pub server: String,
pub solvers: Vec<SolverSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PrivateKeySecretRef {
pub name: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SolverSpec {
pub http01: Option<Http01Solver>,
// Other solver types (e.g., dns01) would go here as Options
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Solver {
pub ingress: Http01Ingress,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Ingress {
pub class: String,
}

View File

@ -1,3 +1,2 @@
pub mod cluster_issuer;
mod helm;
pub use helm::*;

View File

@ -1,187 +0,0 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector,
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1alpha1",
kind = "ScrapeConfig",
plural = "scrapeconfigs",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ScrapeConfigSpec {
/// List of static configurations.
pub static_configs: Option<Vec<StaticConfig>>,
/// Kubernetes service discovery.
pub kubernetes_sd_configs: Option<Vec<KubernetesSDConfig>>,
/// HTTP-based service discovery.
pub http_sd_configs: Option<Vec<HttpSDConfig>>,
/// File-based service discovery.
pub file_sd_configs: Option<Vec<FileSDConfig>>,
/// DNS-based service discovery.
pub dns_sd_configs: Option<Vec<DnsSDConfig>>,
/// Consul service discovery.
pub consul_sd_configs: Option<Vec<ConsulSDConfig>>,
/// Relabeling configuration applied to discovered targets.
pub relabel_configs: Option<Vec<RelabelConfig>>,
/// Metric relabeling configuration applied to scraped samples.
pub metric_relabel_configs: Option<Vec<RelabelConfig>>,
/// Path to scrape metrics from (defaults to `/metrics`).
pub metrics_path: Option<String>,
/// Interval at which Prometheus scrapes targets (e.g., "30s").
pub scrape_interval: Option<String>,
/// Timeout for scraping (e.g., "10s").
pub scrape_timeout: Option<String>,
/// Optional job name override.
pub job_name: Option<String>,
/// Optional scheme (http or https).
pub scheme: Option<String>,
/// Authorization paramaters for snmp walk
pub params: Option<Params>,
}
/// Static configuration section of a ScrapeConfig.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StaticConfig {
pub targets: Vec<String>,
pub labels: Option<LabelSelector>,
}
/// Relabeling configuration for target or metric relabeling.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RelabelConfig {
pub source_labels: Option<Vec<String>>,
pub separator: Option<String>,
pub target_label: Option<String>,
pub regex: Option<String>,
pub modulus: Option<u64>,
pub replacement: Option<String>,
pub action: Option<String>,
}
/// Kubernetes service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KubernetesSDConfig {
///"pod", "service", "endpoints"pub role: String,
pub namespaces: Option<NamespaceSelector>,
pub selectors: Option<Vec<LabelSelector>>,
pub api_server: Option<String>,
pub bearer_token_file: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Namespace selector for Kubernetes service discovery.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NamespaceSelector {
pub any: Option<bool>,
pub match_names: Option<Vec<String>>,
}
/// HTTP-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct HttpSDConfig {
pub url: String,
pub refresh_interval: Option<String>,
pub basic_auth: Option<BasicAuth>,
pub authorization: Option<Authorization>,
pub tls_config: Option<TLSConfig>,
}
/// File-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FileSDConfig {
pub files: Vec<String>,
pub refresh_interval: Option<String>,
}
/// DNS-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DnsSDConfig {
pub names: Vec<String>,
pub refresh_interval: Option<String>,
pub type_: Option<String>, // SRV, A, AAAA
pub port: Option<u16>,
}
/// Consul service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConsulSDConfig {
pub server: String,
pub services: Option<Vec<String>>,
pub scheme: Option<String>,
pub datacenter: Option<String>,
pub tag_separator: Option<String>,
pub refresh_interval: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Basic authentication credentials.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
pub password_file: Option<String>,
}
/// Bearer token or other auth mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Authorization {
pub credentials: Option<String>,
pub credentials_file: Option<String>,
pub type_: Option<String>,
}
/// TLS configuration for secure scraping.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
pub ca_file: Option<String>,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub server_name: Option<String>,
pub insecure_skip_verify: Option<bool>,
}
/// Authorization parameters for SNMP walk.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Params {
pub auth: Option<Vec<String>>,
pub module: Option<Vec<String>>,
}

View File

@ -4,7 +4,6 @@ pub mod crd_default_rules;
pub mod crd_grafana;
pub mod crd_prometheus_rules;
pub mod crd_prometheuses;
pub mod crd_scrape_config;
pub mod grafana_default_dashboard;
pub mod grafana_operator;
pub mod prometheus_operator;

View File

@ -31,7 +31,6 @@ impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlert
sender: KubePrometheus { config },
receivers: self.receivers.clone(),
rules: self.rules.clone(),
scrape_targets: None,
})
}
fn name(&self) -> String {

View File

@ -6,4 +6,3 @@ pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;
pub mod scrape_target;

View File

@ -1 +0,0 @@
pub mod server;

View File

@ -1,76 +0,0 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_scrape_config::{Params, RelabelConfig, ScrapeConfig, ScrapeConfigSpec, StaticConfig},
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(Debug, Clone, Serialize)]
pub struct Server {
pub name: String,
pub ip: IpAddr,
pub auth: String,
pub module: String,
pub domain: String,
}
#[async_trait]
impl ScrapeTarget<CRDPrometheus> for Server {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let scrape_config_spec = ScrapeConfigSpec {
static_configs: Some(vec![StaticConfig {
targets: vec![self.ip.to_string()],
labels: None,
}]),
scrape_interval: Some("2m".to_string()),
kubernetes_sd_configs: None,
http_sd_configs: None,
file_sd_configs: None,
dns_sd_configs: None,
params: Some(Params {
auth: Some(vec![self.auth.clone()]),
module: Some(vec![self.module.clone()]),
}),
consul_sd_configs: None,
relabel_configs: Some(vec![RelabelConfig {
action: None,
source_labels: Some(vec!["__address__".to_string()]),
separator: None,
target_label: Some("__param_target".to_string()),
regex: None,
replacement: Some(format!("snmp.{}:31080", self.domain.clone())),
modulus: None,
}]),
metric_relabel_configs: None,
metrics_path: Some("/snmp".to_string()),
scrape_timeout: Some("2m".to_string()),
job_name: Some(format!("snmp_exporter/cloud/{}", self.name.clone())),
scheme: None,
};
let scrape_config = ScrapeConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec: scrape_config_spec,
};
sender
.client
.apply(&scrape_config, Some(&sender.namespace.clone()))
.await?;
Ok(Outcome::success(format!(
"installed scrape target {}",
self.name.clone()
)))
}
}

View File

@ -215,7 +215,7 @@ impl OKDSetup03ControlPlaneInterpret {
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
hosts: hosts.clone(), // FIXME: Avoid clone if possible
};
score.interpret(inventory, topology).await?;

View File

@ -77,8 +77,6 @@ impl OKDBootstrapLoadBalancerScore {
address: topology.bootstrap_host.ip.to_string(),
port,
});
backend.dedup();
backend
}
}

View File

@ -240,7 +240,7 @@ pub struct OvsPortSpec {
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolSpec {
// TODO: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
// FIXME: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]

View File

@ -50,7 +50,6 @@ impl HostNetworkConfigurationInterpret {
Ok(())
}
async fn collect_switch_ports_for_host<T: Topology + Switch>(
&self,
topology: &T,
@ -126,6 +125,7 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
let mut configured_host_count = 0;
for host in &self.score.hosts {
// FIXME: Clear the previous config for host
self.configure_network_for_host(topology, host).await?;
configured_host_count += 1;
}
@ -283,6 +283,7 @@ mod tests {
#[tokio::test]
async fn port_not_found_for_mac_address_should_not_configure_interface() {
// FIXME: Should it still configure an empty bond/port channel?
let score = given_score(vec![given_host(&HOST_ID, vec![UNKNOWN_INTERFACE.clone()])]);
let topology = TopologyWithSwitch::new_port_not_found();

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use log::{debug, warn};
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@ -19,8 +19,8 @@ use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
osd_deployment_name: String,
rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@ -54,17 +54,18 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
self.purge_ceph_osd(client.clone()).await?;
self.verify_ceph_osd_removal(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::CephRemoveOsd
todo!()
}
fn get_version(&self) -> Version {
@ -81,7 +82,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
@ -93,14 +94,9 @@ impl CephRemoveOsdInterpret {
self.score.osd_deployment_name
))
})?;
Ok(osd_id_numeric.to_string())
}
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = format!("osd.{}", osd_id_numeric);
debug!(
info!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
@ -112,7 +108,6 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("verifying toolbox exists");
let toolbox_dep = "rook-ceph-tools".to_string();
match client
@ -154,7 +149,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!(
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
@ -177,7 +172,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
debug!("Waiting for OSD deployment to scale down to 0 replicas");
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@ -185,9 +180,11 @@ impl CephRemoveOsdInterpret {
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas == None && status.ready_replicas == None {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
@ -215,7 +212,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!(
info!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
@ -237,7 +234,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
debug!("Verifying OSD deployment deleted");
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@ -247,7 +244,7 @@ impl CephRemoveOsdInterpret {
.await?;
if dep.is_none() {
debug!(
info!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
@ -279,10 +276,12 @@ impl CephRemoveOsdInterpret {
Ok(tree)
}
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
@ -292,9 +291,8 @@ impl CephRemoveOsdInterpret {
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
"sh",
"-c",
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
],
)
.await?;
@ -307,10 +305,10 @@ impl CephRemoveOsdInterpret {
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
info!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
@ -320,7 +318,7 @@ impl CephRemoveOsdInterpret {
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["sh", "-c", "ceph osd tree -f json"],
vec!["ceph osd tree -f json"],
)
.await?;
let tree =

View File

@ -1,2 +1,2 @@
pub mod ceph_remove_osd_score;
pub mod ceph_osd_replacement_score;
pub mod ceph_validate_health_score;

View File

@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId {
}
}
#[derive(PartialEq, Debug, Clone)]
#[derive(PartialEq, Debug)]
pub struct HAProxyId(String);
impl Default for HAProxyId {
@ -297,7 +297,7 @@ pub struct HAProxyFrontends {
pub frontend: Vec<Frontend>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Frontend {
#[yaserde(attribute = true)]
pub uuid: String,
@ -310,7 +310,7 @@ pub struct Frontend {
pub bind_options: MaybeString,
pub mode: String,
#[yaserde(rename = "defaultBackend")]
pub default_backend: Option<String>,
pub default_backend: String,
pub ssl_enabled: i32,
pub ssl_certificates: MaybeString,
pub ssl_default_certificate: MaybeString,
@ -416,7 +416,7 @@ pub struct HAProxyBackends {
pub backends: Vec<HAProxyBackend>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyBackend {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@ -535,7 +535,7 @@ pub struct HAProxyServers {
pub servers: Vec<HAProxyServer>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyServer {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@ -543,8 +543,8 @@ pub struct HAProxyServer {
pub enabled: u8,
pub name: String,
pub description: MaybeString,
pub address: Option<String>,
pub port: Option<u16>,
pub address: String,
pub port: u16,
pub checkport: MaybeString,
pub mode: String,
pub multiplexer_protocol: MaybeString,
@ -589,7 +589,7 @@ pub struct HAProxyHealthChecks {
pub healthchecks: Vec<HAProxyHealthCheck>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyHealthCheck {
#[yaserde(attribute = true)]
pub uuid: String,

View File

@ -25,7 +25,6 @@ sha2 = "0.10.9"
[dev-dependencies]
pretty_assertions.workspace = true
assertor.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] }

View File

@ -30,7 +30,8 @@ impl SshConfigManager {
self.opnsense_shell
.exec(&format!(
"cp /conf/config.xml /conf/backup/{backup_filename}"
"cp /conf/config.xml /conf/backup/{}",
backup_filename
))
.await
}

View File

@ -1,8 +1,10 @@
mod ssh;
use crate::Error;
use async_trait::async_trait;
pub use ssh::*;
use async_trait::async_trait;
use crate::Error;
#[async_trait]
pub trait OPNsenseShell: std::fmt::Debug + Send + Sync {
async fn exec(&self, command: &str) -> Result<String, Error>;

View File

@ -1,8 +1,11 @@
use crate::{config::OPNsenseShell, Error};
use std::sync::Arc;
use log::warn;
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, OPNsense,
};
use std::{collections::HashSet, sync::Arc};
use crate::{config::OPNsenseShell, Error};
pub struct LoadBalancerConfig<'a> {
opnsense: &'a mut OPNsense,
@ -28,7 +31,7 @@ impl<'a> LoadBalancerConfig<'a> {
match &mut self.opnsense.opnsense.haproxy.as_mut() {
Some(haproxy) => f(haproxy),
None => unimplemented!(
"Cannot configure load balancer when haproxy config does not exist yet"
"Adding a backend is not supported when haproxy config does not exist yet"
),
}
}
@ -37,67 +40,21 @@ impl<'a> LoadBalancerConfig<'a> {
self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32);
}
/// Configures a service by removing any existing service on the same port
/// and then adding the new definition. This ensures idempotency.
pub fn configure_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.remove_service_by_bind_address(&frontend.bind);
self.remove_servers(&servers);
self.add_new_service(frontend, backend, servers, healthcheck);
pub fn add_backend(&mut self, backend: HAProxyBackend) {
warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks");
self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend));
}
// Remove the corresponding real servers based on their name if they already exist.
fn remove_servers(&mut self, servers: &[HAProxyServer]) {
let server_names: HashSet<_> = servers.iter().map(|s| s.name.clone()).collect();
self.with_haproxy(|haproxy| {
haproxy
.servers
.servers
.retain(|s| !server_names.contains(&s.name));
});
pub fn add_frontend(&mut self, frontend: Frontend) {
self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend));
}
/// Removes a service and its dependent components based on the frontend's bind address.
/// This performs a cascading delete of the frontend, backend, servers, and health check.
fn remove_service_by_bind_address(&mut self, bind_address: &str) {
self.with_haproxy(|haproxy| {
let Some(old_frontend) = remove_frontend_by_bind_address(haproxy, bind_address) else {
return;
};
let Some(old_backend) = remove_backend(haproxy, old_frontend) else {
return;
};
remove_healthcheck(haproxy, &old_backend);
remove_linked_servers(haproxy, &old_backend);
});
pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) {
self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck));
}
/// Adds the components of a new service to the HAProxy configuration.
/// This function de-duplicates servers by name to prevent configuration errors.
fn add_new_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.with_haproxy(|haproxy| {
if let Some(check) = healthcheck {
haproxy.healthchecks.healthchecks.push(check);
}
haproxy.servers.servers.extend(servers);
haproxy.backends.backends.push(backend);
haproxy.frontends.frontend.push(frontend);
});
pub fn add_servers(&mut self, mut servers: Vec<HAProxyServer>) {
self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers));
}
pub async fn reload_restart(&self) -> Result<(), Error> {
@ -125,262 +82,3 @@ impl<'a> LoadBalancerConfig<'a> {
Ok(())
}
}
fn remove_frontend_by_bind_address(haproxy: &mut HAProxy, bind_address: &str) -> Option<Frontend> {
let pos = haproxy
.frontends
.frontend
.iter()
.position(|f| f.bind == bind_address);
match pos {
Some(pos) => Some(haproxy.frontends.frontend.remove(pos)),
None => None,
}
}
fn remove_backend(haproxy: &mut HAProxy, old_frontend: Frontend) -> Option<HAProxyBackend> {
let default_backend = old_frontend.default_backend?;
let pos = haproxy
.backends
.backends
.iter()
.position(|b| b.uuid == default_backend);
match pos {
Some(pos) => Some(haproxy.backends.backends.remove(pos)),
None => None, // orphaned frontend, shouldn't happen
}
}
fn remove_healthcheck(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(uuid) = &backend.health_check.content {
haproxy
.healthchecks
.healthchecks
.retain(|h| h.uuid != *uuid);
}
}
/// Remove the backend's servers. This assumes servers are not shared between services.
fn remove_linked_servers(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(server_uuids_str) = &backend.linked_servers.content {
let server_uuids_to_remove: HashSet<_> = server_uuids_str.split(',').collect();
haproxy
.servers
.servers
.retain(|s| !server_uuids_to_remove.contains(s.uuid.as_str()));
}
}
#[cfg(test)]
mod tests {
use crate::config::DummyOPNSenseShell;
use assertor::*;
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyBackends, HAProxyFrontends, HAProxyHealthCheck,
HAProxyHealthChecks, HAProxyId, HAProxyServer, HAProxyServers, MaybeString, OPNsense,
};
use std::sync::Arc;
use super::LoadBalancerConfig;
static SERVICE_BIND_ADDRESS: &str = "192.168.1.1:80";
static OTHER_SERVICE_BIND_ADDRESS: &str = "192.168.1.1:443";
static SERVER_ADDRESS: &str = "1.1.1.1:80";
static OTHER_SERVER_ADDRESS: &str = "1.1.1.1:443";
#[test]
fn configure_service_should_add_all_service_components_to_haproxy() {
let mut opnsense = given_opnsense();
let mut load_balancer = given_load_balancer(&mut opnsense);
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
load_balancer.configure_service(
frontend.clone(),
backend.clone(),
servers.clone(),
Some(healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend],
vec![backend],
servers,
vec![healthcheck],
);
}
#[test]
fn configure_service_should_replace_service_on_same_bind_address() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
let (updated_healthcheck, updated_servers, updated_backend, updated_frontend) =
given_service(SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
load_balancer.configure_service(
updated_frontend.clone(),
updated_backend.clone(),
updated_servers.clone(),
Some(updated_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![updated_frontend],
vec![updated_backend],
updated_servers,
vec![updated_healthcheck],
);
}
#[test]
fn configure_service_should_keep_existing_service_on_different_bind_addresses() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let (other_healthcheck, other_servers, other_backend, other_frontend) =
given_service(OTHER_SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
load_balancer.configure_service(
other_frontend.clone(),
other_backend.clone(),
other_servers.clone(),
Some(other_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend, other_frontend],
vec![backend, other_backend],
[servers, other_servers].concat(),
vec![healthcheck, other_healthcheck],
);
}
fn assert_haproxy_configured_with(
opnsense: OPNsense,
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) {
let haproxy = opnsense.opnsense.haproxy.as_ref().unwrap();
assert_that!(haproxy.frontends.frontend).contains_exactly(frontends);
assert_that!(haproxy.backends.backends).contains_exactly(backends);
assert_that!(haproxy.servers.servers).is_equal_to(servers);
assert_that!(haproxy.healthchecks.healthchecks).contains_exactly(healthchecks);
}
fn given_opnsense() -> OPNsense {
OPNsense::default()
}
fn given_opnsense_with(haproxy: HAProxy) -> OPNsense {
let mut opnsense = OPNsense::default();
opnsense.opnsense.haproxy = Some(haproxy);
opnsense
}
fn given_load_balancer<'a>(opnsense: &'a mut OPNsense) -> LoadBalancerConfig<'a> {
let opnsense_shell = Arc::new(DummyOPNSenseShell {});
if opnsense.opnsense.haproxy.is_none() {
opnsense.opnsense.haproxy = Some(HAProxy::default());
}
LoadBalancerConfig::new(opnsense, opnsense_shell)
}
fn given_service(
bind_address: &str,
server_address: &str,
) -> (
HAProxyHealthCheck,
Vec<HAProxyServer>,
HAProxyBackend,
Frontend,
) {
let healthcheck = given_healthcheck();
let servers = vec![given_server(server_address)];
let backend = given_backend();
let frontend = given_frontend(bind_address);
(healthcheck, servers, backend, frontend)
}
fn given_haproxy(
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) -> HAProxy {
HAProxy {
frontends: HAProxyFrontends {
frontend: frontends,
},
backends: HAProxyBackends { backends },
servers: HAProxyServers { servers },
healthchecks: HAProxyHealthChecks { healthchecks },
..Default::default()
}
}
fn given_frontend(bind_address: &str) -> Frontend {
Frontend {
uuid: "uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: format!("frontend_{bind_address}"),
bind: bind_address.into(),
default_backend: Some("backend-uuid".into()),
..Default::default()
}
}
fn given_backend() -> HAProxyBackend {
HAProxyBackend {
uuid: "backend-uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: "backend_192.168.1.1:80".into(),
linked_servers: MaybeString::from("server-uuid"),
health_check_enabled: 1,
health_check: MaybeString::from("healthcheck-uuid"),
..Default::default()
}
}
fn given_server(address: &str) -> HAProxyServer {
HAProxyServer {
uuid: "server-uuid".into(),
id: HAProxyId::default(),
name: address.into(),
address: Some(address.into()),
..Default::default()
}
}
fn given_healthcheck() -> HAProxyHealthCheck {
HAProxyHealthCheck {
uuid: "healthcheck-uuid".into(),
name: "healthcheck".into(),
..Default::default()
}
}
}