fix(opnsense-config): ensure load balancer service configuration is idempotent #129

Merged
letian merged 8 commits from idempotent-load-balancer into master 2025-10-20 19:18:50 +00:00
5 changed files with 79 additions and 22 deletions
Showing only changes of commit e9a1aa4831


@@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync {
         &self,
         service: &LoadBalancerService,
     ) -> Result<(), ExecutorError> {
-        debug!(
-            "Listing LoadBalancer services {:?}",
-            self.list_services().await
-        );
-        if !self.list_services().await.contains(service) {
-            self.add_service(service).await?;
-        }
+        self.add_service(service).await?;
letian marked this conversation as resolved

we might want to rename this to `add_or_update_service` instead of just `add_service`
         Ok(())
     }
 }
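A minimal sketch of what that rename could look like at the trait level (the `#[async_trait]` attribute and the `list_services` signature are assumptions inferred from the surrounding code, not shown in this diff):

    #[async_trait::async_trait]
    pub trait LoadBalancer: Send + Sync {
        async fn list_services(&self) -> Vec<LoadBalancerService>;

        /// Renamed from `add_service` to make the upsert semantics explicit:
        /// implementations merge with existing entries instead of appending.
        async fn add_or_update_service(
            &self,
            service: &LoadBalancerService,
        ) -> Result<(), ExecutorError>;
    }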


@@ -24,13 +24,11 @@ impl LoadBalancer for OPNSenseFirewall {
     }

     async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
-        warn!(
-            "TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
-        );
         let mut config = self.opnsense_config.write().await;
-        let mut load_balancer = config.load_balancer();
letian marked this conversation as resolved Outdated

For now we only merge the existing config with the service (if it already exists). Should we also remove stale services?
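A minimal sketch of what stale-service removal could look like, written against the structures visible in this diff (the `HAProxy` type name and the `desired` argument are assumptions for illustration):

    // Hypothetical helper: drop backends that no longer correspond to a
    // desired Harmony service, keyed by name like the merge logic below.
    fn prune_stale_backends(haproxy: &mut HAProxy, desired: &[HAProxyBackend]) {
        haproxy
            .backends
            .backends
            .retain(|existing| desired.iter().any(|d| d.name == existing.name));
    }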
         let (frontend, backend, servers, healthcheck) =
             harmony_load_balancer_service_to_haproxy_xml(service);
+        let mut load_balancer = config.load_balancer();
         load_balancer.add_backend(backend);
         load_balancer.add_frontend(frontend);
         load_balancer.add_servers(servers);


@@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId {
     }
 }

-#[derive(PartialEq, Debug)]
+#[derive(PartialEq, Debug, Clone)]
 pub struct HAProxyId(String);

 impl Default for HAProxyId {


@@ -29,8 +29,7 @@ impl SshConfigManager {
         self.opnsense_shell
             .exec(&format!(
-                "cp /conf/config.xml /conf/backup/{}",
-                backup_filename
+                "cp /conf/config.xml /conf/backup/{backup_filename}"
             ))
             .await
     }
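The change itself is purely cosmetic: since Rust 2021, format strings can capture identifiers from the enclosing scope, so both spellings produce the same command. A standalone illustration (not from the repo):

    let backup_filename = "config-20251020.xml";
    let old_style = format!("cp /conf/config.xml /conf/backup/{}", backup_filename);
    let new_style = format!("cp /conf/config.xml /conf/backup/{backup_filename}");
    assert_eq!(old_style, new_style);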


@@ -40,21 +40,87 @@ impl<'a> LoadBalancerConfig<'a> {
         self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32);
     }

-    pub fn add_backend(&mut self, backend: HAProxyBackend) {
+    /// Adds or updates a backend pool.
+    /// If a backend with the same name exists, it is updated. Otherwise, it is added.
+    pub fn add_backend(&mut self, mut backend: HAProxyBackend) {
         warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks");
-        self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend));
+        self.with_haproxy(|haproxy| {
+            let existing_backend = haproxy
+                .backends
+                .backends
+                .iter_mut()
+                .find(|b| b.name == backend.name);
+            if let Some(existing_backend) = existing_backend {
+                backend.uuid = existing_backend.uuid.clone(); // This breaks the `frontend` config
+                // as it is now relying on a stale uuid
letian marked this conversation as resolved Outdated

As of now, the first time you add a load balancer service, everything will work as expected.

Though if you run it again, since we update the existing backend in place and reuse its uuid, the frontend will be referring to a non-existent uuid for the `default_backend` (see `harmony/src/infra/opnsense/load_balancer.rs` on line 323). So we need to fix this before merging.
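One possible shape for that fix, assuming `uuid` is a plain `String` and that `with_haproxy` runs the closure exactly once: have `add_backend` return the uuid that is actually persisted, so `add_service` can point the frontend's `default_backend` at it before calling `add_frontend`.

    // Sketch only: return the surviving uuid instead of silently rewriting it.
    pub fn add_backend(&mut self, mut backend: HAProxyBackend) -> String {
        let mut persisted_uuid = backend.uuid.clone();
        self.with_haproxy(|haproxy| {
            if let Some(existing) = haproxy
                .backends
                .backends
                .iter_mut()
                .find(|b| b.name == backend.name)
            {
                // Reuse the stored identifiers so OPNsense keeps the same entry.
                backend.uuid = existing.uuid.clone();
                backend.id = existing.id.clone();
                persisted_uuid = backend.uuid.clone();
                *existing = backend;
            } else {
                haproxy.backends.backends.push(backend);
            }
        });
        persisted_uuid
    }

The call site in `add_service` would then become something like `frontend.default_backend = load_balancer.add_backend(backend);`.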
+                backend.id = existing_backend.id.clone();
+                *existing_backend = backend;
+            } else {
+                haproxy.backends.backends.push(backend);
+            }
+        });
     }

-    pub fn add_frontend(&mut self, frontend: Frontend) {
-        self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend));
+    /// Adds or updates a frontend.
+    /// If a frontend with the same name exists, it is updated. Otherwise, it is added.
+    pub fn add_frontend(&mut self, mut frontend: Frontend) {
+        self.with_haproxy(|haproxy| {
+            let existing_frontend = haproxy
+                .frontends
+                .frontend
+                .iter_mut()
+                .find(|f| f.name == frontend.name);
+            if let Some(existing_frontend) = existing_frontend {
+                frontend.uuid = existing_frontend.uuid.clone();
+                frontend.id = existing_frontend.id.clone();
+                *existing_frontend = frontend;
+            } else {
+                haproxy.frontends.frontend.push(frontend);
+            }
+        });
     }

-    pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) {
-        self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck));
+    /// Adds or updates a health check.
+    /// If a health check with the same name exists, it is updated. Otherwise, it is added.
+    pub fn add_healthcheck(&mut self, mut healthcheck: HAProxyHealthCheck) {
+        self.with_haproxy(|haproxy| {
+            let existing_healthcheck = haproxy
+                .healthchecks
+                .healthchecks
+                .iter_mut()
+                .find(|h| h.name == healthcheck.name);
+            if let Some(existing_check) = existing_healthcheck {
+                healthcheck.uuid = existing_check.uuid.clone();
+                *existing_check = healthcheck;
+            } else {
+                haproxy.healthchecks.healthchecks.push(healthcheck);
+            }
+        });
     }

-    pub fn add_servers(&mut self, mut servers: Vec<HAProxyServer>) {
-        self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers));
+    /// Adds or updates a list of servers in the HAProxy configuration.
+    /// If a server with the same name already exists, it is updated. Otherwise, it is added.
+    pub fn add_servers(&mut self, servers: Vec<HAProxyServer>) {
+        self.with_haproxy(|haproxy| {
+            for server in servers {
+                let existing_server = haproxy
+                    .servers
+                    .servers
+                    .iter_mut()
+                    .find(|s| s.name == server.name);
+                if let Some(existing_server) = existing_server {
+                    existing_server.address = server.address;
+                    existing_server.port = server.port;
+                    existing_server.enabled = server.enabled;
+                } else {
+                    haproxy.servers.servers.push(server);
+                }
+            }
+        });
     }

     pub async fn reload_restart(&self) -> Result<(), Error> {
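Taken together, these upsert methods are what makes repeated applies converge instead of accumulating duplicate HAProxy entries. A test-style sketch of the intended behaviour (the struct literal and the `Clone`/`Default` impls on `HAProxyBackend` are assumed for illustration):

    let mut load_balancer = config.load_balancer();
    let backend = HAProxyBackend { name: "web-backend".into(), ..Default::default() };
    load_balancer.add_backend(backend.clone());
    load_balancer.add_backend(backend); // second apply updates in place: still one entry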