Compare commits

..

2 Commits

Author SHA1 Message Date
c9fb0a4236 Merge pull request 'feat: Refactor load balancer to remove side effect and improve types' (#258) from feat/removesideeffect into master
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Failing after 10m55s
Run Check Script / check (push) Failing after 11m5s
Reviewed-on: #258
2026-04-10 15:17:23 +00:00
f02f6ac0c3 feat: Refactor load balancer to remove side effect and improve types
All checks were successful
Run Check Script / check (pull_request) Successful in 2m20s
2026-04-09 22:50:10 -04:00
16 changed files with 98 additions and 290 deletions

View File

@@ -600,7 +600,6 @@ fn build_all_scores() -> Result<Vec<Box<dyn Score<OPNSenseFirewall>>>, Box<dyn s
},
],
private_services: vec![],
wan_firewall_ports: vec![],
};
let dhcp_score = DhcpScore::new(

View File

@@ -69,6 +69,5 @@ fn build_large_score() -> LoadBalancerScore {
lb_service.clone(),
lb_service.clone(),
],
wan_firewall_ports: vec![],
}
}

View File

@@ -15,7 +15,7 @@ use async_trait::async_trait;
use harmony_types::firewall::VipMode;
use harmony_types::id::Id;
use harmony_types::net::{IpAddress, MacAddress};
use log::info;
use log::{info, warn};
use serde::Serialize;
use crate::config::secret::{OPNSenseApiCredentials, OPNSenseFirewallCredentials};
@@ -176,8 +176,24 @@ impl DhcpServer for FirewallPairTopology {
}
async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)> {
// Return primary's view — both should be identical
self.primary.list_static_mappings().await
let primary_mappings = self.primary.list_static_mappings().await;
let backup_mappings = self.backup.list_static_mappings().await;
let primary_set: std::collections::HashSet<_> = primary_mappings.iter().collect();
let backup_set: std::collections::HashSet<_> = backup_mappings.iter().collect();
let only_primary: Vec<_> = primary_set.difference(&backup_set).collect();
let only_backup: Vec<_> = backup_set.difference(&primary_set).collect();
if !only_primary.is_empty() || !only_backup.is_empty() {
warn!(
"DHCP static mapping mismatch between primary and backup firewalls! \
Only on primary: {:?}, Only on backup: {:?}",
only_primary, only_backup
);
}
primary_mappings
}
/// Returns the primary firewall's IP. In a CARP setup, callers

View File

@@ -161,12 +161,8 @@ impl DnsServer for HAClusterTopology {
async fn register_hosts(&self, hosts: Vec<DnsRecord>) -> Result<(), ExecutorError> {
self.dns_server.register_hosts(hosts).await
}
async fn remove_record(
&self,
name: &str,
record_type: DnsRecordType,
) -> Result<(), ExecutorError> {
self.dns_server.remove_record(name, record_type).await
fn remove_record(&self, name: &str, record_type: DnsRecordType) -> Result<(), ExecutorError> {
self.dns_server.remove_record(name, record_type)
}
async fn list_records(&self) -> Vec<DnsRecord> {
self.dns_server.list_records().await
@@ -493,6 +489,9 @@ impl LoadBalancer for DummyInfra {
async fn reload_restart(&self) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn ensure_wan_access(&self, _port: u16) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}
#[async_trait]
@@ -552,11 +551,7 @@ impl DnsServer for DummyInfra {
async fn register_hosts(&self, _hosts: Vec<DnsRecord>) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn remove_record(
&self,
_name: &str,
_record_type: DnsRecordType,
) -> Result<(), ExecutorError> {
fn remove_record(&self, _name: &str, _record_type: DnsRecordType) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn list_records(&self) -> Vec<DnsRecord> {

View File

@@ -37,11 +37,9 @@ pub trait LoadBalancer: Send + Sync {
/// from the WAN interface. Used by load balancers that need to receive
/// external traffic (e.g., OKD ingress on ports 80/443).
///
/// Default implementation is a no-op for topologies that don't manage
/// firewall rules (e.g., cloud environments with security groups).
async fn ensure_wan_access(&self, _port: u16) -> Result<(), ExecutorError> {
Ok(())
}
/// Topologies that don't manage firewall rules (e.g., cloud environments
/// with security groups) should return `Ok(())`.
async fn ensure_wan_access(&self, port: u16) -> Result<(), ExecutorError>;
}
#[derive(Debug, PartialEq, Clone, Serialize)]

View File

@@ -90,11 +90,7 @@ pub trait DhcpServer: Send + Sync + Debug {
pub trait DnsServer: Send + Sync {
async fn register_dhcp_leases(&self, register: bool) -> Result<(), ExecutorError>;
async fn register_hosts(&self, hosts: Vec<DnsRecord>) -> Result<(), ExecutorError>;
async fn remove_record(
&self,
name: &str,
record_type: DnsRecordType,
) -> Result<(), ExecutorError>;
fn remove_record(&self, name: &str, record_type: DnsRecordType) -> Result<(), ExecutorError>;
async fn list_records(&self) -> Vec<DnsRecord>;
fn get_ip(&self) -> IpAddress;
fn get_host(&self) -> LogicalHost;
@@ -394,7 +390,7 @@ mod test {
Ok(())
}
async fn remove_record(
fn remove_record(
&self,
_name: &str,
_record_type: DnsRecordType,

View File

@@ -1,96 +1,29 @@
use crate::infra::opnsense::LogicalHost;
use crate::{
executors::ExecutorError,
topology::{DnsRecord, DnsRecordType, DnsServer},
topology::{DnsRecord, DnsServer},
};
use async_trait::async_trait;
use harmony_types::net::IpAddress;
use log::{info, warn};
use super::OPNSenseFirewall;
#[async_trait]
impl DnsServer for OPNSenseFirewall {
async fn register_hosts(&self, hosts: Vec<DnsRecord>) -> Result<(), ExecutorError> {
let dhcp = self.opnsense_config.dhcp();
for record in &hosts {
info!(
"Registering DNS host override: {}.{} -> {}",
record.host, record.domain, record.value
);
dhcp.add_dns_host(&record.host, &record.domain, &record.value.to_string())
.await
.map_err(|e| {
ExecutorError::UnexpectedError(format!(
"Failed to register DNS host {}.{}: {e}",
record.host, record.domain
))
})?;
}
Ok(())
async fn register_hosts(&self, _hosts: Vec<DnsRecord>) -> Result<(), ExecutorError> {
todo!("Refactor this to use dnsmasq API")
}
async fn remove_record(
fn remove_record(
&self,
name: &str,
_record_type: DnsRecordType,
_name: &str,
_record_type: crate::topology::DnsRecordType,
) -> Result<(), ExecutorError> {
let (hostname, domain) = name.split_once('.').ok_or_else(|| {
ExecutorError::UnexpectedError(format!(
"DNS record name '{name}' must be a fully qualified name (host.domain)"
))
})?;
info!("Removing DNS host override: {hostname}.{domain}");
self.opnsense_config
.dhcp()
.remove_dns_host(hostname, domain)
.await
.map_err(|e| {
ExecutorError::UnexpectedError(format!(
"Failed to remove DNS host {hostname}.{domain}: {e}"
))
})
todo!()
}
async fn list_records(&self) -> Vec<DnsRecord> {
match self.opnsense_config.dhcp().list_dns_hosts().await {
Ok(entries) => entries
.into_iter()
.filter_map(|entry| {
let ip: IpAddress = match entry.ip.parse() {
Ok(ip) => ip,
Err(e) => {
warn!(
"Skipping DNS host {}.{} with unparseable IP '{}': {e}",
entry.host, entry.domain, entry.ip
);
return None;
}
};
// Dnsmasq host overrides are A records (IPv4) or AAAA (IPv6)
let record_type = if ip.is_ipv4() {
DnsRecordType::A
} else {
DnsRecordType::AAAA
};
Some(DnsRecord {
host: entry.host,
domain: entry.domain,
record_type,
value: ip,
})
})
.collect(),
Err(e) => {
warn!("Failed to list DNS records: {e}");
vec![]
}
}
async fn list_records(&self) -> Vec<crate::topology::DnsRecord> {
todo!("Refactor this to use dnsmasq API")
}
fn get_ip(&self) -> IpAddress {
@@ -101,15 +34,8 @@ impl DnsServer for OPNSenseFirewall {
self.host.clone()
}
async fn register_dhcp_leases(&self, register: bool) -> Result<(), ExecutorError> {
info!("Setting register DHCP leases as DNS: {register}");
self.opnsense_config
.dhcp()
.set_register_dhcp_leases(register)
.await
.map_err(|e| {
ExecutorError::UnexpectedError(format!("Failed to set register DHCP leases: {e}"))
})
async fn register_dhcp_leases(&self, _register: bool) -> Result<(), ExecutorError> {
todo!("Refactor this to use dnsmasq API")
}
async fn commit_config(&self) -> Result<(), ExecutorError> {

View File

@@ -111,12 +111,16 @@ impl LoadBalancer for OPNSenseFirewall {
}
fn haproxy_service_to_harmony(svc: &HaproxyService) -> Option<LoadBalancerService> {
let listening_port = svc.bind.parse().unwrap_or_else(|_| {
panic!(
"HAProxy frontend address should be a valid SocketAddr, got {}",
svc.bind
)
});
let listening_port = match svc.bind.parse() {
Ok(addr) => addr,
Err(e) => {
warn!(
"Skipping HAProxy service: bind address '{}' is not a valid SocketAddr: {e}",
svc.bind
);
return None;
}
};
let backend_servers: Vec<BackendServer> = svc
.servers

View File

@@ -10,6 +10,7 @@ use virt::sys;
use super::error::KvmError;
use super::types::{CdromConfig, NetworkConfig, VmConfig, VmInterface, VmStatus};
use super::xml;
use harmony_types::net::MacAddress;
/// A handle to a libvirt hypervisor.
///
@@ -374,14 +375,15 @@ impl KvmExecutor {
pub async fn set_interface_link(
&self,
vm_name: &str,
mac: &str,
mac: &MacAddress,
up: bool,
) -> Result<(), KvmError> {
let state = if up { "up" } else { "down" };
info!("Setting {vm_name} interface {mac} link {state}");
let mac_str = mac.to_string();
info!("Setting {vm_name} interface {mac_str} link {state}");
let output = tokio::process::Command::new("virsh")
.args(["-c", &self.uri, "domif-setlink", vm_name, mac, state])
.args(["-c", &self.uri, "domif-setlink", vm_name, &mac_str, state])
.output()
.await?;
@@ -420,11 +422,18 @@ impl KvmExecutor {
// virsh domiflist columns: Interface, Type, Source, Model, MAC
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 5 {
let mac = match MacAddress::try_from(parts[4].to_string()) {
Ok(m) => m,
Err(e) => {
warn!("Skipping interface with invalid MAC '{}': {e}", parts[4]);
continue;
}
};
interfaces.push(VmInterface {
interface_type: parts[1].to_string(),
source: parts[2].to_string(),
model: parts[3].to_string(),
mac: parts[4].to_string(),
mac,
});
}
}

View File

@@ -1,3 +1,4 @@
use harmony_types::net::MacAddress;
use serde::{Deserialize, Serialize};
/// Information about a VM's network interface, as reported by `virsh domiflist`.
@@ -10,7 +11,7 @@ pub struct VmInterface {
/// Device model (e.g. "virtio")
pub model: String,
/// MAC address
pub mac: String,
pub mac: MacAddress,
}
/// Specifies how a KVM host is accessed.
@@ -95,7 +96,7 @@ pub struct NetworkRef {
pub name: String,
/// Optional fixed MAC address for this interface. When `None`, libvirt
/// assigns one automatically.
pub mac: Option<String>,
pub mac: Option<MacAddress>,
}
impl NetworkRef {
@@ -106,8 +107,8 @@ impl NetworkRef {
}
}
pub fn with_mac(mut self, mac: impl Into<String>) -> Self {
self.mac = Some(mac.into());
pub fn with_mac(mut self, mac: MacAddress) -> Self {
self.mac = Some(mac);
self
}
}
@@ -260,7 +261,7 @@ impl VmConfigBuilder {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhcpHost {
/// MAC address (e.g. `"52:54:00:00:50:01"`).
pub mac: String,
pub mac: MacAddress,
/// IP to assign (e.g. `"10.50.0.2"`).
pub ip: String,
/// Optional hostname.
@@ -356,12 +357,12 @@ impl NetworkConfigBuilder {
/// Add a static DHCP host entry (MAC → fixed IP).
pub fn dhcp_host(
mut self,
mac: impl Into<String>,
mac: MacAddress,
ip: impl Into<String>,
name: Option<String>,
) -> Self {
self.dhcp_hosts.push(DhcpHost {
mac: mac.into(),
mac,
ip: ip.into(),
name,
});

View File

@@ -136,7 +136,7 @@ fn nic_devices(vm: &VmConfig) -> String {
.map(|net| {
let mac_line = net
.mac
.as_deref()
.as_ref()
.map(|m| format!("\n <mac address='{m}'/>"))
.unwrap_or_default();
format!(
@@ -221,6 +221,7 @@ mod tests {
use crate::modules::kvm::types::{
BootDevice, ForwardMode, NetworkConfig, NetworkRef, VmConfig,
};
use harmony_types::net::MacAddress;
// ── Domain XML ──────────────────────────────────────────────────────
@@ -284,12 +285,13 @@ mod tests {
#[test]
fn domain_xml_nic_with_mac_address() {
let mac: MacAddress = "52:54:00:aa:bb:cc".to_string().try_into().unwrap();
let vm = VmConfig::builder("mac-test")
.network(NetworkRef::named("mynet").with_mac("52:54:00:AA:BB:CC"))
.network(NetworkRef::named("mynet").with_mac(mac))
.build();
let xml = domain_xml(&vm, "/tmp");
assert!(xml.contains("mac address='52:54:00:AA:BB:CC'"));
assert!(xml.contains("mac address='52:54:00:aa:bb:cc'"));
}
#[test]
@@ -454,14 +456,11 @@ mod tests {
#[test]
fn network_xml_with_dhcp_host() {
let mac: MacAddress = "52:54:00:00:50:01".to_string().try_into().unwrap();
let cfg = NetworkConfig::builder("hostnet")
.subnet("10.50.0.1", 24)
.dhcp_range("10.50.0.100", "10.50.0.200")
.dhcp_host(
"52:54:00:00:50:01",
"10.50.0.2",
Some("opnsense".to_string()),
)
.dhcp_host(mac, "10.50.0.2", Some("opnsense".to_string()))
.build();
let xml = network_xml(&cfg);

View File

@@ -19,12 +19,6 @@ pub struct LoadBalancerScore {
// (listen_interface, LoadBalancerService) tuples or something like that
// I am not sure what to use as listen_interface, should it be interface name, ip address,
// uuid?
/// TCP ports that must be open for inbound WAN traffic.
///
/// The load balancer interpret will call `ensure_wan_access` for each port
/// before configuring services, so that the load balancer is reachable
/// from outside the LAN.
pub wan_firewall_ports: Vec<u16>,
}
impl<T: Topology + LoadBalancer> Score<T> for LoadBalancerScore {
@@ -66,11 +60,6 @@ impl<T: Topology + LoadBalancer> Interpret<T> for LoadBalancerInterpret {
load_balancer.ensure_initialized().await?
);
for port in &self.score.wan_firewall_ports {
info!("Ensuring WAN access for port {port}");
load_balancer.ensure_wan_access(*port).await?;
}
for service in self.score.public_services.iter() {
info!("Ensuring service exists {service:?}");

View File

@@ -350,13 +350,20 @@ impl OKDSetup02BootstrapInterpret {
&self,
inventory: &Inventory,
) -> Result<(), InterpretError> {
info!("[Stage 02/Bootstrap] Waiting for bootstrap to complete...");
let timeout_minutes: u64 = std::env::var("HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(90);
info!(
"[Stage 02/Bootstrap] Waiting for bootstrap to complete (timeout: {timeout_minutes}m)..."
);
info!("[Stage 02/Bootstrap] Running: openshift-install wait-for bootstrap-complete");
let okd_installation_path =
format!("./data/okd/installation_files_{}", inventory.location.name);
let output = Command::new("./data/okd/bin/openshift-install")
let child = Command::new("./data/okd/bin/openshift-install")
.args([
"wait-for",
"bootstrap-complete",
@@ -364,8 +371,17 @@ impl OKDSetup02BootstrapInterpret {
&okd_installation_path,
"--log-level=info",
])
.output()
.output();
let timeout = std::time::Duration::from_secs(timeout_minutes * 60);
let output = tokio::time::timeout(timeout, child)
.await
.map_err(|_| {
InterpretError::new(format!(
"[Stage 02/Bootstrap] bootstrap-complete timed out after {timeout_minutes} minutes. \
Set HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES to increase the timeout and retry."
))
})?
.map_err(|e| {
InterpretError::new(format!(
"[Stage 02/Bootstrap] Failed to run openshift-install wait-for bootstrap-complete: {e}"

View File

@@ -56,7 +56,6 @@ impl OKDBootstrapLoadBalancerScore {
load_balancer_score: LoadBalancerScore {
public_services: vec![],
private_services,
wan_firewall_ports: vec![80, 443],
},
}
}

View File

@@ -114,7 +114,6 @@ impl OKDLoadBalancerScore {
load_balancer_score: LoadBalancerScore {
public_services,
private_services,
wan_firewall_ports: vec![80, 443],
},
}
}
@@ -339,13 +338,6 @@ mod tests {
assert_eq!(private_service_22623.backend_servers.len(), 3);
}
#[test]
fn test_wan_firewall_ports_include_http_and_https() {
let topology = create_test_topology();
let score = OKDLoadBalancerScore::new(&topology);
assert_eq!(score.load_balancer_score.wan_firewall_ports, vec![80, 443]);
}
#[test]
fn test_all_backend_servers_have_correct_port() {
let topology = create_test_topology();

View File

@@ -77,14 +77,6 @@ fn extract_selected_key(value: &serde_json::Value) -> Option<String> {
}
}
/// A DNS host override entry returned by [`DhcpConfigDnsMasq::list_dns_hosts`].
pub struct DnsHostEntry {
pub uuid: String,
pub host: String,
pub domain: String,
pub ip: String,
}
impl DhcpConfigDnsMasq {
pub fn new(client: OpnsenseClient, shell: Arc<dyn OPNsenseShell>) -> Self {
Self { client, shell }
@@ -451,128 +443,6 @@ dhcp-boot=tag:bios,tag:!ipxe,{bios_filename}{tftp_str}
Ok(result)
}
// ── DNS host override methods ────────────────────────────────────────
/// Lists all DNS host override entries (hostname, domain, IP).
///
/// Entries with missing hostname or IP are silently skipped.
pub async fn list_dns_hosts(&self) -> Result<Vec<DnsHostEntry>, Error> {
let settings = self.get_settings().await?;
let mut result = Vec::new();
for (uuid, entry) in &settings.dnsmasq.hosts {
let Some(host) = entry.host.as_deref().filter(|s| !s.is_empty()) else {
continue;
};
let Some(ip) = entry.ip.as_deref().filter(|s| !s.is_empty()) else {
continue;
};
let domain = entry.domain.as_deref().unwrap_or("").to_string();
result.push(DnsHostEntry {
uuid: uuid.clone(),
host: host.to_string(),
domain,
ip: ip.to_string(),
});
}
Ok(result)
}
/// Adds a DNS host override entry for the given hostname, domain, and IP.
///
/// If an entry with the same hostname and domain already exists, its IP is
/// updated instead of creating a duplicate.
pub async fn add_dns_host(&self, hostname: &str, domain: &str, ip: &str) -> Result<(), Error> {
let settings = self.get_settings().await?;
// Check for existing entry with same hostname + domain
let existing = settings.dnsmasq.hosts.iter().find(|(_, h)| {
h.host.as_deref() == Some(hostname) && h.domain.as_deref() == Some(domain)
});
let host = DnsmasqHost {
host: Some(hostname.to_string()),
domain: Some(domain.to_string()),
ip: Some(vec![ip.to_string()]),
local: Some(true),
..Default::default()
};
if let Some((uuid, _)) = existing {
info!("Updating DNS host override {uuid}: {hostname}.{domain} -> {ip}");
self.api().set_host(uuid, &host).await.map_err(Error::Api)?;
} else {
info!("Creating DNS host override: {hostname}.{domain} -> {ip}");
self.api().add_host(&host).await.map_err(Error::Api)?;
}
self.client
.reconfigure("dnsmasq")
.await
.map_err(Error::Api)?;
Ok(())
}
/// Removes a DNS host override by hostname and domain.
///
/// Returns `Ok(())` even if no matching entry exists (idempotent).
pub async fn remove_dns_host(&self, hostname: &str, domain: &str) -> Result<(), Error> {
let settings = self.get_settings().await?;
let matching: Vec<String> = settings
.dnsmasq
.hosts
.iter()
.filter(|(_, h)| {
h.host.as_deref() == Some(hostname) && h.domain.as_deref() == Some(domain)
})
.map(|(uuid, _)| uuid.clone())
.collect();
for uuid in &matching {
info!("Deleting DNS host override {uuid}: {hostname}.{domain}");
self.api().del_host(uuid).await.map_err(Error::Api)?;
}
if !matching.is_empty() {
self.client
.reconfigure("dnsmasq")
.await
.map_err(Error::Api)?;
}
Ok(())
}
/// Enable or disable registering DHCP leases as DNS entries.
///
/// Sets both `regdhcp` (dynamic leases) and `regdhcpstatic` (static mappings).
pub async fn set_register_dhcp_leases(&self, register: bool) -> Result<(), Error> {
let settings = opnsense_api::generated::dnsmasq::Dnsmasq {
regdhcp: Some(register),
regdhcpstatic: Some(register),
..Default::default()
};
// The OPNsense API expects the top-level settings wrapped in {"dnsmasq": {...}}
let envelope = serde_json::json!({ "dnsmasq": settings });
let _: serde_json::Value = self
.client
.post_typed("dnsmasq", "settings", "set", Some(&envelope))
.await
.map_err(Error::Api)?;
info!("Set register DHCP leases as DNS: regdhcp={register}, regdhcpstatic={register}");
self.client
.reconfigure("dnsmasq")
.await
.map_err(Error::Api)?;
Ok(())
}
fn is_valid_mac(mac: &str) -> bool {
let parts: Vec<&str> = mac.split(':').collect();
if parts.len() != 6 {