harmony/harmony/src/infra/opnsense/load_balancer.rs
Ian Letourneau ed7f81aa1f
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
fix(opnsense-config): ensure load balancer service configuration is idempotent (#129)
The previous implementation blindly added HAProxy components without checking for existing configurations on the same port, which caused duplicate entries and errors when a service was updated.

This commit refactors the logic to a robust "remove-then-add" strategy. The configure_service method now finds and removes any existing frontend and its dependent components (backend, servers, health check) before adding the new, complete service definition.

This change makes the process fully idempotent, preventing configuration drift and ensuring a predictable state.

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/129
2025-10-20 19:18:49 +00:00

481 lines
16 KiB
Rust

use async_trait::async_trait;
use log::{debug, error, info, warn};
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, MaybeString,
};
use uuid::Uuid;
use crate::{
executors::ExecutorError,
topology::{
BackendServer, HealthCheck, HttpMethod, HttpStatusCode, LoadBalancer, LoadBalancerService,
LogicalHost, SSL,
},
};
use harmony_types::net::IpAddress;
use super::OPNSenseFirewall;
#[async_trait]
impl LoadBalancer for OPNSenseFirewall {
fn get_ip(&self) -> IpAddress {
OPNSenseFirewall::get_ip(self)
}
fn get_host(&self) -> LogicalHost {
self.host.clone()
}
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
let mut config = self.opnsense_config.write().await;
let mut load_balancer = config.load_balancer();
let (frontend, backend, servers, healthcheck) =
harmony_load_balancer_service_to_haproxy_xml(service);
load_balancer.configure_service(frontend, backend, servers, healthcheck);
Ok(())
}
async fn remove_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
todo!("Remove service not implemented yet {service:?}")
}
async fn commit_config(&self) -> Result<(), ExecutorError> {
OPNSenseFirewall::commit_config(self).await
}
async fn reload_restart(&self) -> Result<(), ExecutorError> {
self.opnsense_config
.write()
.await
.load_balancer()
.reload_restart()
.await
.map_err(|e| ExecutorError::UnexpectedError(e.to_string()))
}
async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
let mut config = self.opnsense_config.write().await;
let load_balancer = config.load_balancer();
if let Some(config) = load_balancer.get_full_config() {
debug!(
"HAProxy config available in opnsense config, assuming it is already installed, {config:?}"
);
} else {
config.install_package("os-haproxy").await.map_err(|e| {
ExecutorError::UnexpectedError(format!(
"Executor failed when trying to install os-haproxy package with error {e:?}"
))
})?;
}
config.load_balancer().enable(true);
Ok(())
}
async fn list_services(&self) -> Vec<LoadBalancerService> {
let mut config = self.opnsense_config.write().await;
let load_balancer = config.load_balancer();
let haproxy_xml_config = load_balancer.get_full_config();
haproxy_xml_config_to_harmony_loadbalancer(haproxy_xml_config)
}
}
pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
haproxy: &Option<HAProxy>,
) -> Vec<LoadBalancerService> {
let haproxy = match haproxy {
Some(haproxy) => haproxy,
None => return vec![],
};
haproxy
.frontends
.frontend
.iter()
.map(|frontend| {
let mut backend_servers = vec![];
let matching_backend = haproxy
.backends
.backends
.iter()
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
let mut health_check = None;
match matching_backend {
Some(backend) => {
backend_servers.append(&mut get_servers_for_backend(backend, haproxy));
health_check = get_health_check_for_backend(backend, haproxy);
}
None => {
warn!(
"HAProxy config could not find a matching backend for frontend {frontend:?}"
);
}
}
LoadBalancerService {
backend_servers,
listening_port: frontend.bind.parse().unwrap_or_else(|_| {
panic!(
"HAProxy frontend address should be a valid SocketAddr, got {}",
frontend.bind
)
}),
health_check,
}
})
.collect()
}
pub(crate) fn get_servers_for_backend(
backend: &HAProxyBackend,
haproxy: &HAProxy,
) -> Vec<BackendServer> {
let backend_servers: Vec<&str> = match &backend.linked_servers.content {
Some(linked_servers) => linked_servers.split(',').collect(),
None => {
info!("No server defined for HAProxy backend {:?}", backend);
return vec![];
}
};
haproxy
.servers
.servers
.iter()
.filter_map(|server| {
let address = server.address.clone()?;
let port = server.port?;
if backend_servers.contains(&server.uuid.as_str()) {
return Some(BackendServer { address, port });
}
None
})
.collect()
}
pub(crate) fn get_health_check_for_backend(
backend: &HAProxyBackend,
haproxy: &HAProxy,
) -> Option<HealthCheck> {
let health_check_uuid = match &backend.health_check.content {
Some(uuid) => uuid,
None => return None,
};
let haproxy_health_check = haproxy
.healthchecks
.healthchecks
.iter()
.find(|h| &h.uuid == health_check_uuid)?;
let binding = haproxy_health_check.health_check_type.to_uppercase();
let uppercase = binding.as_str();
match uppercase {
"TCP" => {
if let Some(checkport) = haproxy_health_check.checkport.content.as_ref() {
if !checkport.is_empty() {
return Some(HealthCheck::TCP(Some(checkport.parse().unwrap_or_else(
|_| {
panic!(
"HAProxy check port should be a valid port number, got {checkport}"
)
},
))));
}
}
Some(HealthCheck::TCP(None))
}
"HTTP" => {
let path: String = haproxy_health_check
.http_uri
.content
.clone()
.unwrap_or_default();
let method: HttpMethod = haproxy_health_check
.http_method
.content
.clone()
.unwrap_or_default()
.into();
let status_code: HttpStatusCode = HttpStatusCode::Success2xx;
let ssl = match haproxy_health_check
.ssl
.content_string()
.to_uppercase()
.as_str()
{
"SSL" => SSL::SSL,
"SSLNI" => SSL::SNI,
"NOSSL" => SSL::Disabled,
"" => SSL::Default,
other => {
error!("Unknown haproxy health check ssl config {other}");
SSL::Other(other.to_string())
}
};
Some(HealthCheck::HTTP(path, method, status_code, ssl))
}
_ => panic!("Received unsupported health check type {}", uppercase),
}
}
pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
service: &LoadBalancerService,
) -> (
Frontend,
HAProxyBackend,
Vec<HAProxyServer>,
Option<HAProxyHealthCheck>,
) {
// Here we have to build :
// One frontend
// One backend
// One Option<healthcheck>
// Vec of servers
//
// Then merge then with haproxy config individually
//
// We also have to take into account that it is entirely possible that a backe uses a server
// with the same definition as in another backend. So when creating a new backend, we must not
// blindly create new servers because the backend does not exist yet. Even if it is a new
// backend, it may very well reuse existing servers
//
// Also we need to support router integration for port forwarding on WAN as a strategy to
// handle dyndns
// server is standalone
// backend points on server
// backend points to health check
// frontend points to backend
let healthcheck = if let Some(health_check) = &service.health_check {
match health_check {
HealthCheck::HTTP(path, http_method, _http_status_code, ssl) => {
let ssl: MaybeString = match ssl {
SSL::SSL => "ssl".into(),
SSL::SNI => "sslni".into(),
SSL::Disabled => "nossl".into(),
SSL::Default => "".into(),
SSL::Other(other) => other.as_str().into(),
};
let haproxy_check = HAProxyHealthCheck {
name: format!("HTTP_{http_method}_{path}"),
uuid: Uuid::new_v4().to_string(),
http_method: http_method.to_string().into(),
health_check_type: "http".to_string(),
http_uri: path.clone().into(),
interval: "2s".to_string(),
ssl,
..Default::default()
};
Some(haproxy_check)
}
HealthCheck::TCP(port) => {
let (port, port_name) = match port {
Some(port) => (Some(port.to_string()), port.to_string()),
None => (None, "serverport".to_string()),
};
let haproxy_check = HAProxyHealthCheck {
name: format!("TCP_{port_name}"),
uuid: Uuid::new_v4().to_string(),
health_check_type: "tcp".to_string(),
checkport: port.into(),
interval: "2s".to_string(),
..Default::default()
};
Some(haproxy_check)
}
}
} else {
None
};
debug!("Built healthcheck {healthcheck:?}");
let servers: Vec<HAProxyServer> = service
.backend_servers
.iter()
.map(server_to_haproxy_server)
.collect();
debug!("Built servers {servers:?}");
let mut backend = HAProxyBackend {
uuid: Uuid::new_v4().to_string(),
enabled: 1,
name: format!("backend_{}", service.listening_port),
algorithm: "roundrobin".to_string(),
random_draws: Some(2),
stickiness_expire: "30m".to_string(),
stickiness_size: "50k".to_string(),
stickiness_conn_rate_period: "10s".to_string(),
stickiness_sess_rate_period: "10s".to_string(),
stickiness_http_req_rate_period: "10s".to_string(),
stickiness_http_err_rate_period: "10s".to_string(),
stickiness_bytes_in_rate_period: "1m".to_string(),
stickiness_bytes_out_rate_period: "1m".to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
..Default::default()
};
info!("HAPRoxy backend algorithm is currently hardcoded to roundrobin");
if let Some(hcheck) = &healthcheck {
backend.health_check_enabled = 1;
backend.health_check = hcheck.uuid.clone().into();
}
backend.linked_servers = servers
.iter()
.map(|s| s.uuid.as_str())
.collect::<Vec<&str>>()
.join(",")
.into();
debug!("Built backend {backend:?}");
let frontend = Frontend {
uuid: uuid::Uuid::new_v4().to_string(),
enabled: 1,
name: format!("frontend_{}", service.listening_port),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: Some(backend.uuid.clone()),
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
debug!("Built frontend {frontend:?}");
(frontend, backend, servers, healthcheck)
}
fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
HAProxyServer {
uuid: Uuid::new_v4().to_string(),
name: format!("{}_{}", &server.address, &server.port),
enabled: 1,
address: Some(server.address.clone()),
port: Some(server.port),
mode: "active".to_string(),
server_type: "static".to_string(),
..Default::default()
}
}
#[cfg(test)]
mod tests {
use opnsense_config_xml::HAProxyServer;
use super::*;
#[test]
fn test_get_servers_for_backend_with_linked_servers() {
// Create a backend with linked servers
let mut backend = HAProxyBackend::default();
backend.linked_servers.content = Some("server1,server2".to_string());
// Create an HAProxy instance with servers
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
// Check the result
assert_eq!(
result,
vec![BackendServer {
address: "192.168.1.1".to_string(),
port: 80,
},]
);
}
#[test]
fn test_get_servers_for_backend_no_linked_servers() {
// Create a backend with no linked servers
let backend = HAProxyBackend::default();
// Create an HAProxy instance with servers
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
// Check the result
assert_eq!(result, vec![]);
}
#[test]
fn test_get_servers_for_backend_no_matching_servers() {
// Create a backend with linked servers that do not match any in HAProxy
let mut backend = HAProxyBackend::default();
backend.linked_servers.content = Some("server4,server5".to_string());
// Create an HAProxy instance with servers
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
// Check the result
assert_eq!(result, vec![]);
}
#[test]
fn test_get_servers_for_backend_multiple_linked_servers() {
// Create a backend with multiple linked servers
#[allow(clippy::field_reassign_with_default)]
let mut backend = HAProxyBackend::default();
backend.linked_servers.content = Some("server1,server2".to_string());
//
// Create an HAProxy instance with matching servers
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("some-hostname.test.mcd".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
let server = HAProxyServer {
uuid: "server2".to_string(),
address: Some("192.168.1.2".to_string()),
port: Some(8080),
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
// Check the result
assert_eq!(
result,
vec![
BackendServer {
address: "some-hostname.test.mcd".to_string(),
port: 80,
},
BackendServer {
address: "192.168.1.2".to_string(),
port: 8080,
},
]
);
}
}