Compare commits

..

13 Commits

Author SHA1 Message Date
Ian Letourneau
ad61be277b refactor brocade to support different shell versions (e.g. FastIron vs NOS)
Some checks failed
Run Check Script / check (pull_request) Failing after 1m17s
2025-10-07 21:27:45 -04:00
Ian Letourneau
45e0de2097 properly configure a new channel port for a host (work only with a clean slate for now) 2025-09-29 17:31:52 -04:00
Ian Letourneau
731dc5f404 add execution mode to run brocade commands in privileged mode (enable) 2025-09-28 16:35:53 -04:00
Ian Letourneau
1199564122 replace command execution by interactive shell 2025-09-28 15:58:13 -04:00
Ian Letourneau
f2f55d98d4 add option to run brocade commands in dry-run 2025-09-28 12:55:37 -04:00
7b6ac6641a add link to finish describing EthtoolSpec 2025-09-25 17:18:42 -04:00
58c1fd4a96 remove unused code 2025-09-25 17:17:23 -04:00
2388f585f5 reorganize modules 2025-09-25 17:13:42 -04:00
ffe3c09907 configure bond with NMState 2025-09-25 16:52:04 -04:00
0de52aedbf find ports in Brocade switch & configure port-channels (blind implementation) 2025-09-16 17:19:32 -04:00
427009bbfe merge 2 configure steps in 1 and let the topology handle it
All checks were successful
Run Check Script / check (pull_request) Successful in 1m2s
2025-09-16 11:34:06 -04:00
fe0501b784 split host & switch config 2025-09-15 22:05:09 -04:00
61b02e7a28 feat(switch): configure host network and switch network 2025-09-15 17:39:19 -04:00
60 changed files with 688 additions and 2628 deletions

56
Cargo.lock generated
View File

@@ -680,13 +680,11 @@ version = "0.1.0"
dependencies = [
"async-trait",
"env_logger",
"harmony_secret",
"harmony_types",
"log",
"regex",
"russh",
"russh-keys",
"serde",
"tokio",
]
@@ -1780,7 +1778,6 @@ dependencies = [
name = "example-nanodc"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@@ -1789,7 +1786,6 @@ dependencies = [
"harmony_tui",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
@@ -1808,7 +1804,6 @@ dependencies = [
name = "example-okd-install"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@@ -1823,32 +1818,17 @@ dependencies = [
"url",
]
[[package]]
name = "example-openbao"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"tokio",
"url",
]
[[package]]
name = "example-opnsense"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
"harmony_macros",
"harmony_secret",
"harmony_tui",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
@@ -1857,7 +1837,6 @@ dependencies = [
name = "example-pxe"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@@ -1872,15 +1851,6 @@ dependencies = [
"url",
]
[[package]]
name = "example-remove-rook-osd"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"tokio",
]
[[package]]
name = "example-rust"
version = "0.1.0"
@@ -2393,7 +2363,6 @@ dependencies = [
"once_cell",
"opnsense-config",
"opnsense-config-xml",
"option-ext",
"pretty_assertions",
"reqwest 0.11.27",
"russh",
@@ -2460,6 +2429,17 @@ dependencies = [
"tokio",
]
[[package]]
name = "harmony_derive"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d138bbb32bb346299c5f95fbb53532313f39927cb47c411c99c634ef8665ef7"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "harmony_inventory_agent"
version = "0.1.0"
@@ -3906,6 +3886,19 @@ dependencies = [
"web-time",
]
[[package]]
name = "okd_host_network"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"harmony_derive",
"harmony_inventory_agent",
"harmony_macros",
"harmony_types",
"tokio",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@@ -3934,7 +3927,6 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
name = "opnsense-config"
version = "0.1.0"
dependencies = [
"assertor",
"async-trait",
"chrono",
"env_logger",

View File

@@ -15,8 +15,7 @@ members = [
"harmony_inventory_agent",
"harmony_secret_derive",
"harmony_secret",
"adr/agent_discovery/mdns",
"brocade",
"adr/agent_discovery/mdns", "brocade",
]
[workspace.package]

View File

@@ -14,5 +14,3 @@ tokio.workspace = true
log.workspace = true
env_logger.workspace = true
regex = "1.11.3"
harmony_secret = { path = "../harmony_secret" }
serde.workspace = true

View File

@@ -1,70 +1,54 @@
use std::net::{IpAddr, Ipv4Addr};
use brocade::BrocadeOptions;
use harmony_secret::{Secret, SecretManager};
use harmony_types::switch::PortLocation;
use serde::{Deserialize, Serialize};
#[derive(Secret, Clone, Debug, Serialize, Deserialize)]
struct BrocadeSwitchAuth {
username: String,
password: String,
}
#[tokio::main]
async fn main() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).init();
env_logger::init();
// let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 250)); // old brocade @ ianlet
let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 55, 101)); // brocade @ sto1
// let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 4, 11)); // brocade @ st
let ip = IpAddr::V4(Ipv4Addr::new(10, 0, 0, 250)); // old brocade @ ianlet
// let ip = IpAddr::V4(Ipv4Addr::new(192, 168, 55, 101)); // brocade @ sto1
let switch_addresses = vec![ip];
let config = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
let brocade = brocade::init(&switch_addresses, 22, "admin", "password", None)
.await
.unwrap();
let brocade = brocade::init(
&switch_addresses,
22,
&config.username,
&config.password,
Some(BrocadeOptions {
dry_run: true,
..Default::default()
}),
)
.await
.expect("Brocade client failed to connect");
let entries = brocade.get_stack_topology().await.unwrap();
println!("Stack topology: {entries:#?}");
let entries = brocade.get_interfaces().await.unwrap();
println!("Interfaces: {entries:#?}");
.expect("Brocade client failed to connect");
let version = brocade.version().await.unwrap();
println!("Version: {version:?}");
println!("--------------");
let mac_adddresses = brocade.get_mac_address_table().await.unwrap();
println!("Showing MAC Address table...");
let mac_adddresses = brocade.show_mac_address_table().await.unwrap();
println!("VLAN\tMAC\t\t\tPORT");
for mac in mac_adddresses {
println!("{}\t{}\t{}", mac.vlan, mac.mac_address, mac.port);
}
println!("--------------");
let channel_name = "1";
let channel_name = "HARMONY_LAG";
println!("Clearing port channel '{channel_name}'...");
brocade.clear_port_channel(channel_name).await.unwrap();
println!("Cleared");
println!("--------------");
println!("Finding next available channel...");
let channel_id = brocade.find_available_channel_id().await.unwrap();
println!("Channel id: {channel_id}");
println!("--------------");
let channel_name = "HARMONY_LAG";
let ports = [PortLocation(2, 0, 35)];
let ports = [PortLocation(1, 1, 3), PortLocation(1, 1, 4)];
println!("Creating port channel '{channel_name}' with ports {ports:?}'...");
brocade
.create_port_channel(channel_id, channel_name, &ports)
.await
.unwrap();
println!("Created");
}

View File

@@ -1,7 +1,7 @@
use super::BrocadeClient;
use crate::{
BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo, MacAddressEntry,
PortChannelId, PortOperatingMode, parse_brocade_mac_address, shell::BrocadeShell,
BrocadeInfo, Error, ExecutionMode, MacAddressEntry, PortChannelId, parse_brocade_mac_address,
shell::BrocadeShell,
};
use async_trait::async_trait;
@@ -10,24 +10,13 @@ use log::{debug, info};
use regex::Regex;
use std::{collections::HashSet, str::FromStr};
#[derive(Debug)]
pub struct FastIronClient {
shell: BrocadeShell,
version: BrocadeInfo,
pub shell: BrocadeShell,
pub version: BrocadeInfo,
}
impl FastIronClient {
pub fn init(mut shell: BrocadeShell, version_info: BrocadeInfo) -> Self {
shell.before_all(vec!["skip-page-display".into()]);
shell.after_all(vec!["page".into()]);
Self {
shell,
version: version_info,
}
}
fn parse_mac_entry(&self, line: &str) -> Option<Result<MacAddressEntry, Error>> {
pub fn parse_mac_entry(&self, line: &str) -> Option<Result<MacAddressEntry, Error>> {
debug!("[Brocade] Parsing mac address entry: {line}");
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 3 {
@@ -60,25 +49,10 @@ impl FastIronClient {
}
}
fn parse_stack_port_entry(&self, line: &str) -> Option<Result<InterSwitchLink, Error>> {
debug!("[Brocade] Parsing stack port entry: {line}");
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 10 {
return None;
}
let local_port = PortLocation::from_str(parts[0]).ok()?;
Some(Ok(InterSwitchLink {
local_port,
remote_port: None,
}))
}
fn build_port_channel_commands(
pub fn build_port_channel_commands(
&self,
channel_id: PortChannelId,
channel_name: &str,
channel_id: u8,
ports: &[PortLocation],
) -> Vec<String> {
let mut commands = vec![
@@ -106,7 +80,7 @@ impl BrocadeClient for FastIronClient {
Ok(self.version.clone())
}
async fn get_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error> {
async fn show_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error> {
info!("[Brocade] Showing MAC address table...");
let output = self
@@ -121,30 +95,6 @@ impl BrocadeClient for FastIronClient {
.collect()
}
async fn get_stack_topology(&self) -> Result<Vec<InterSwitchLink>, Error> {
let output = self
.shell
.run_command("show interface stack-ports", crate::ExecutionMode::Regular)
.await?;
output
.lines()
.skip(1)
.filter_map(|line| self.parse_stack_port_entry(line))
.collect()
}
async fn get_interfaces(&self) -> Result<Vec<InterfaceInfo>, Error> {
todo!()
}
async fn configure_interfaces(
&self,
_interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
todo!()
}
async fn find_available_channel_id(&self) -> Result<PortChannelId, Error> {
info!("[Brocade] Finding next available channel id...");
@@ -185,7 +135,7 @@ impl BrocadeClient for FastIronClient {
"[Brocade] Configuring port-channel '{channel_name} {channel_id}' with ports: {ports:?}"
);
let commands = self.build_port_channel_commands(channel_id, channel_name, ports);
let commands = self.build_port_channel_commands(channel_name, channel_id, ports);
self.shell
.run_commands(commands, ExecutionMode::Privileged)
.await?;
@@ -195,7 +145,7 @@ impl BrocadeClient for FastIronClient {
}
async fn clear_port_channel(&self, channel_name: &str) -> Result<(), Error> {
info!("[Brocade] Clearing port-channel: {channel_name}");
debug!("[Brocade] Clearing port-channel: {channel_name}");
let commands = vec![
"configure terminal".to_string(),
@@ -206,7 +156,6 @@ impl BrocadeClient for FastIronClient {
.run_commands(commands, ExecutionMode::Privileged)
.await?;
info!("[Brocade] Port-channel '{channel_name}' cleared.");
Ok(())
}
}

View File

@@ -4,7 +4,6 @@ use std::{
time::Duration,
};
use crate::network_operating_system::NetworkOperatingSystemClient;
use crate::{
fast_iron::FastIronClient,
shell::{BrocadeSession, BrocadeShell},
@@ -16,7 +15,6 @@ use harmony_types::switch::{PortDeclaration, PortLocation};
use regex::Regex;
mod fast_iron;
mod network_operating_system;
mod shell;
mod ssh;
@@ -38,7 +36,7 @@ pub struct TimeoutConfig {
impl Default for TimeoutConfig {
fn default() -> Self {
Self {
shell_ready: Duration::from_secs(10),
shell_ready: Duration::from_secs(3),
command_execution: Duration::from_secs(60), // Commands like `deploy` (for a LAG) can take a while
cleanup: Duration::from_secs(10),
message_wait: Duration::from_millis(500),
@@ -73,70 +71,6 @@ pub struct MacAddressEntry {
pub type PortChannelId = u8;
/// Represents a single physical or logical link connecting two switches within a stack or fabric.
///
/// This structure provides a standardized view of the topology regardless of the
/// underlying Brocade OS configuration (stacking vs. fabric).
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct InterSwitchLink {
/// The local port on the switch where the topology command was run.
pub local_port: PortLocation,
/// The port on the directly connected neighboring switch.
pub remote_port: Option<PortLocation>,
}
/// Represents the key running configuration status of a single switch interface.
#[derive(Debug, PartialEq, Eq, Clone)]
pub struct InterfaceInfo {
/// The full configuration name (e.g., "TenGigabitEthernet 1/0/1", "FortyGigabitEthernet 2/0/2").
pub name: String,
/// The physical location of the interface.
pub port_location: PortLocation,
/// The parsed type and name prefix of the interface.
pub interface_type: InterfaceType,
/// The primary configuration mode defining the interface's behavior (L2, L3, Fabric).
pub operating_mode: Option<PortOperatingMode>,
/// Indicates the current state of the interface.
pub status: InterfaceStatus,
}
/// Categorizes the functional type of a switch interface.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum InterfaceType {
/// Physical or virtual Ethernet interface (e.g., TenGigabitEthernet, FortyGigabitEthernet).
Ethernet(String),
}
impl fmt::Display for InterfaceType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
InterfaceType::Ethernet(name) => write!(f, "{name}"),
}
}
}
/// Defines the primary configuration mode of a switch interface, representing mutually exclusive roles.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum PortOperatingMode {
/// The interface is explicitly configured for Brocade fabric roles (ISL or Trunk enabled).
Fabric,
/// The interface is configured for standard Layer 2 switching as Trunk port (`switchport mode trunk`).
Trunk,
/// The interface is configured for standard Layer 2 switching as Access port (`switchport` without trunk mode).
Access,
}
/// Defines the possible status of an interface.
#[derive(Debug, PartialEq, Eq, Clone)]
pub enum InterfaceStatus {
/// The interface is connected.
Connected,
/// The interface is not connected and is not expected to be.
NotConnected,
/// The interface is not connected but is expected to be (configured with `no shutdown`).
SfpAbsent,
}
pub async fn init(
ip_addresses: &[IpAddr],
port: u16,
@@ -153,81 +87,23 @@ pub async fn init(
.await?;
Ok(match version_info.os {
BrocadeOs::FastIron => Box::new(FastIronClient::init(shell, version_info)),
BrocadeOs::NetworkOperatingSystem => {
Box::new(NetworkOperatingSystemClient::init(shell, version_info))
}
BrocadeOs::FastIron => Box::new(FastIronClient {
shell,
version: version_info,
}),
BrocadeOs::NetworkOperatingSystem => todo!(),
BrocadeOs::Unknown => todo!(),
})
}
#[async_trait]
pub trait BrocadeClient: std::fmt::Debug {
/// Retrieves the operating system and version details from the connected Brocade switch.
///
/// This is typically the first call made after establishing a connection to determine
/// the switch OS family (e.g., FastIron, NOS) for feature compatibility.
///
/// # Returns
///
/// A `BrocadeInfo` structure containing parsed OS type and version string.
pub trait BrocadeClient {
async fn version(&self) -> Result<BrocadeInfo, Error>;
/// Retrieves the dynamically learned MAC address table from the switch.
///
/// This is crucial for discovering where specific network endpoints (MAC addresses)
/// are currently located on the physical ports.
///
/// # Returns
///
/// A vector of `MacAddressEntry`, where each entry typically contains VLAN, MAC address,
/// and the associated port name/index.
async fn get_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error>;
async fn show_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error>;
/// Derives the physical connections used to link multiple switches together
/// to form a single logical entity (stack, fabric, etc.).
///
/// This abstracts the underlying configuration (e.g., stack ports, fabric ports)
/// to return a standardized view of the topology.
///
/// # Returns
///
/// A vector of `InterSwitchLink` structs detailing which ports are used for stacking/fabric.
/// If the switch is not stacked, returns an empty vector.
async fn get_stack_topology(&self) -> Result<Vec<InterSwitchLink>, Error>;
/// Retrieves the status for all interfaces
///
/// # Returns
///
/// A vector of `InterfaceInfo` structures.
async fn get_interfaces(&self) -> Result<Vec<InterfaceInfo>, Error>;
/// Configures a set of interfaces to be operated with a specified mode (access ports, ISL, etc.).
async fn configure_interfaces(
&self,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error>;
/// Scans the existing configuration to find the next available (unused)
/// Port-Channel ID (`lag` or `trunk`) for assignment.
///
/// # Returns
///
/// The smallest, unassigned `PortChannelId` within the supported range.
async fn find_available_channel_id(&self) -> Result<PortChannelId, Error>;
/// Creates and configures a new Port-Channel (Link Aggregation Group or LAG)
/// using the specified channel ID and ports.
///
/// The resulting configuration must be persistent (saved to startup-config).
/// Assumes a static LAG configuration mode unless specified otherwise by the implementation.
///
/// # Parameters
///
/// * `channel_id`: The ID (e.g., 1-128) for the logical port channel.
/// * `channel_name`: A descriptive name for the LAG (used in configuration context).
/// * `ports`: A slice of `PortLocation` structs defining the physical member ports.
async fn create_port_channel(
&self,
channel_id: PortChannelId,
@@ -235,15 +111,6 @@ pub trait BrocadeClient: std::fmt::Debug {
ports: &[PortLocation],
) -> Result<(), Error>;
/// Removes all configuration associated with the specified Port-Channel name.
///
/// This operation should be idempotent; attempting to clear a non-existent
/// channel should succeed (or return a benign error).
///
/// # Parameters
///
/// * `channel_name`: The name of the Port-Channel (LAG) to delete.
///
async fn clear_port_channel(&self, channel_name: &str) -> Result<(), Error>;
}

View File

@@ -1,307 +0,0 @@
use std::str::FromStr;
use async_trait::async_trait;
use harmony_types::switch::{PortDeclaration, PortLocation};
use log::{debug, info};
use crate::{
BrocadeClient, BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo,
InterfaceStatus, InterfaceType, MacAddressEntry, PortChannelId, PortOperatingMode,
parse_brocade_mac_address, shell::BrocadeShell,
};
#[derive(Debug)]
pub struct NetworkOperatingSystemClient {
shell: BrocadeShell,
version: BrocadeInfo,
}
impl NetworkOperatingSystemClient {
pub fn init(mut shell: BrocadeShell, version_info: BrocadeInfo) -> Self {
shell.before_all(vec!["terminal length 0".into()]);
Self {
shell,
version: version_info,
}
}
fn parse_mac_entry(&self, line: &str) -> Option<Result<MacAddressEntry, Error>> {
debug!("[Brocade] Parsing mac address entry: {line}");
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 5 {
return None;
}
let (vlan, mac_address, port) = match parts.len() {
5 => (
u16::from_str(parts[0]).ok()?,
parse_brocade_mac_address(parts[1]).ok()?,
parts[4].to_string(),
),
_ => (
u16::from_str(parts[0]).ok()?,
parse_brocade_mac_address(parts[1]).ok()?,
parts[5].to_string(),
),
};
let port =
PortDeclaration::parse(&port).map_err(|e| Error::UnexpectedError(format!("{e}")));
match port {
Ok(p) => Some(Ok(MacAddressEntry {
vlan,
mac_address,
port: p,
})),
Err(e) => Some(Err(e)),
}
}
fn parse_inter_switch_link_entry(&self, line: &str) -> Option<Result<InterSwitchLink, Error>> {
debug!("[Brocade] Parsing inter switch link entry: {line}");
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 10 {
return None;
}
let local_port = PortLocation::from_str(parts[2]).ok()?;
let remote_port = PortLocation::from_str(parts[5]).ok()?;
Some(Ok(InterSwitchLink {
local_port,
remote_port: Some(remote_port),
}))
}
fn parse_interface_status_entry(&self, line: &str) -> Option<Result<InterfaceInfo, Error>> {
debug!("[Brocade] Parsing interface status entry: {line}");
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 6 {
return None;
}
let interface_type = match parts[0] {
"Fo" => InterfaceType::Ethernet("FortyGigabitEthernet".to_string()),
"Te" => InterfaceType::Ethernet("TenGigabitEthernet".to_string()),
_ => return None,
};
let port_location = PortLocation::from_str(parts[1]).ok()?;
let status = match parts[2] {
"connected" => InterfaceStatus::Connected,
"notconnected" => InterfaceStatus::NotConnected,
"sfpAbsent" => InterfaceStatus::SfpAbsent,
_ => return None,
};
let operating_mode = match parts[3] {
"ISL" => Some(PortOperatingMode::Fabric),
"Trunk" => Some(PortOperatingMode::Trunk),
"Access" => Some(PortOperatingMode::Access),
"--" => None,
_ => return None,
};
Some(Ok(InterfaceInfo {
name: format!("{} {}", interface_type, port_location),
port_location,
interface_type,
operating_mode,
status,
}))
}
}
#[async_trait]
impl BrocadeClient for NetworkOperatingSystemClient {
async fn version(&self) -> Result<BrocadeInfo, Error> {
Ok(self.version.clone())
}
async fn get_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error> {
let output = self
.shell
.run_command("show mac-address-table", ExecutionMode::Regular)
.await?;
output
.lines()
.skip(1)
.filter_map(|line| self.parse_mac_entry(line))
.collect()
}
async fn get_stack_topology(&self) -> Result<Vec<InterSwitchLink>, Error> {
let output = self
.shell
.run_command("show fabric isl", ExecutionMode::Regular)
.await?;
output
.lines()
.skip(6)
.filter_map(|line| self.parse_inter_switch_link_entry(line))
.collect()
}
async fn get_interfaces(&self) -> Result<Vec<InterfaceInfo>, Error> {
let output = self
.shell
.run_command(
"show interface status rbridge-id all",
ExecutionMode::Regular,
)
.await?;
output
.lines()
.skip(2)
.filter_map(|line| self.parse_interface_status_entry(line))
.collect()
}
async fn configure_interfaces(
&self,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
info!("[Brocade] Configuring {} interface(s)...", interfaces.len());
let mut commands = vec!["configure terminal".to_string()];
for interface in interfaces {
commands.push(format!("interface {}", interface.0));
match interface.1 {
PortOperatingMode::Fabric => {
commands.push("fabric isl enable".into());
commands.push("fabric trunk enable".into());
}
PortOperatingMode::Trunk => {
commands.push("switchport".into());
commands.push("switchport mode trunk".into());
commands.push("no spanning-tree shutdown".into());
commands.push("no fabric isl enable".into());
commands.push("no fabric trunk enable".into());
}
PortOperatingMode::Access => {
commands.push("switchport".into());
commands.push("switchport mode access".into());
commands.push("switchport access vlan 1".into());
commands.push("no spanning-tree shutdown".into());
commands.push("no fabric isl enable".into());
commands.push("no fabric trunk enable".into());
}
}
commands.push("no shutdown".into());
commands.push("exit".into());
}
commands.push("write memory".into());
self.shell
.run_commands(commands, ExecutionMode::Regular)
.await?;
info!("[Brocade] Interfaces configured.");
Ok(())
}
async fn find_available_channel_id(&self) -> Result<PortChannelId, Error> {
info!("[Brocade] Finding next available channel id...");
let output = self
.shell
.run_command("show port-channel", ExecutionMode::Regular)
.await?;
let used_ids: Vec<u8> = output
.lines()
.skip(6)
.filter_map(|line| {
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() < 8 {
return None;
}
u8::from_str(parts[0]).ok()
})
.collect();
let mut next_id: u8 = 1;
loop {
if !used_ids.contains(&next_id) {
break;
}
next_id += 1;
}
info!("[Brocade] Found channel id: {next_id}");
Ok(next_id)
}
async fn create_port_channel(
&self,
channel_id: PortChannelId,
channel_name: &str,
ports: &[PortLocation],
) -> Result<(), Error> {
info!(
"[Brocade] Configuring port-channel '{channel_name} {channel_id}' with ports: {ports:?}"
);
let interfaces = self.get_interfaces().await?;
let mut commands = vec![
"configure terminal".into(),
format!("interface port-channel {}", channel_id),
"no shutdown".into(),
"exit".into(),
];
for port in ports {
let interface = interfaces.iter().find(|i| i.port_location == *port);
let Some(interface) = interface else {
continue;
};
commands.push(format!("interface {}", interface.name));
commands.push("no switchport".into());
commands.push("no ip address".into());
commands.push("no fabric isl enable".into());
commands.push("no fabric trunk enable".into());
commands.push(format!("channel-group {channel_id} mode active"));
commands.push("no shutdown".into());
commands.push("exit".into());
}
commands.push("write memory".into());
self.shell
.run_commands(commands, ExecutionMode::Regular)
.await?;
info!("[Brocade] Port-channel '{channel_name}' configured.");
Ok(())
}
async fn clear_port_channel(&self, channel_name: &str) -> Result<(), Error> {
info!("[Brocade] Clearing port-channel: {channel_name}");
let commands = vec![
"configure terminal".into(),
format!("no interface port-channel {}", channel_name),
"exit".into(),
"write memory".into(),
];
self.shell
.run_commands(commands, ExecutionMode::Regular)
.await?;
info!("[Brocade] Port-channel '{channel_name}' cleared.");
Ok(())
}
}

View File

@@ -13,15 +13,12 @@ use log::info;
use russh::ChannelMsg;
use tokio::time::timeout;
#[derive(Debug)]
pub struct BrocadeShell {
ip: IpAddr,
port: u16,
username: String,
password: String,
options: BrocadeOptions,
before_all_commands: Vec<String>,
after_all_commands: Vec<String>,
pub ip: IpAddr,
pub port: u16,
pub username: String,
pub password: String,
pub options: BrocadeOptions,
}
impl BrocadeShell {
@@ -44,8 +41,6 @@ impl BrocadeShell {
port,
username: username.to_string(),
password: password.to_string(),
before_all_commands: vec![],
after_all_commands: vec![],
options,
})
}
@@ -71,22 +66,14 @@ impl BrocadeShell {
>,
{
let mut session = self.open_session(mode).await?;
let _ = session.run_commands(self.before_all_commands.clone()).await;
let result = callback(&mut session).await;
let _ = session.run_commands(self.after_all_commands.clone()).await;
session.close().await?;
result
}
pub async fn run_command(&self, command: &str, mode: ExecutionMode) -> Result<String, Error> {
let mut session = self.open_session(mode).await?;
let _ = session.run_commands(self.before_all_commands.clone()).await;
let result = session.run_command(command).await;
let _ = session.run_commands(self.after_all_commands.clone()).await;
session.close().await?;
result
}
@@ -97,22 +84,10 @@ impl BrocadeShell {
mode: ExecutionMode,
) -> Result<(), Error> {
let mut session = self.open_session(mode).await?;
let _ = session.run_commands(self.before_all_commands.clone()).await;
let result = session.run_commands(commands).await;
let _ = session.run_commands(self.after_all_commands.clone()).await;
session.close().await?;
result
}
pub fn before_all(&mut self, commands: Vec<String>) {
self.before_all_commands = commands;
}
pub fn after_all(&mut self, commands: Vec<String>) {
self.after_all_commands = commands;
}
}
pub struct BrocadeSession {
@@ -173,10 +148,6 @@ impl BrocadeSession {
}
pub async fn run_command(&mut self, command: &str) -> Result<String, Error> {
if self.should_skip_command(command) {
return Ok(String::new());
}
debug!("[Brocade] Running command: '{command}'...");
self.channel
@@ -199,15 +170,7 @@ impl BrocadeSession {
Ok(())
}
fn should_skip_command(&self, command: &str) -> bool {
if (command.starts_with("write") || command.starts_with("deploy")) && self.options.dry_run {
info!("[Brocade] Dry-run mode enabled, skipping command: {command}");
return true;
}
false
}
async fn collect_command_output(&mut self) -> Result<Vec<u8>, Error> {
pub async fn collect_command_output(&mut self) -> Result<Vec<u8>, Error> {
let mut output = Vec::new();
let start = Instant::now();
let read_timeout = Duration::from_millis(500);
@@ -259,7 +222,7 @@ impl BrocadeSession {
Ok(output)
}
fn check_for_command_errors(&self, output: &str, command: &str) -> Result<(), Error> {
pub fn check_for_command_errors(&self, output: &str, command: &str) -> Result<(), Error> {
const ERROR_PATTERNS: &[&str] = &[
"invalid input",
"syntax error",
@@ -291,7 +254,7 @@ impl BrocadeSession {
}
}
async fn wait_for_shell_ready(
pub async fn wait_for_shell_ready(
channel: &mut russh::Channel<russh::client::Msg>,
timeouts: &TimeoutConfig,
) -> Result<(), Error> {
@@ -303,7 +266,6 @@ async fn wait_for_shell_ready(
Ok(Some(ChannelMsg::Data { data })) => {
buffer.extend_from_slice(&data);
let output = String::from_utf8_lossy(&buffer);
let output = output.trim();
if output.ends_with('>') || output.ends_with('#') {
debug!("[Brocade] Shell ready");
return Ok(());
@@ -317,7 +279,7 @@ async fn wait_for_shell_ready(
Ok(())
}
async fn try_elevate_session(
pub async fn try_elevate_session(
channel: &mut russh::Channel<russh::client::Msg>,
username: &str,
password: &str,

View File

@@ -1,133 +0,0 @@
## Working procedure to clone and restore CoreOS disk from OKD Cluster
### **Step 1 - take a backup**
```
sudo dd if=/dev/old of=/dev/backup status=progress
```
### **Step 2 - clone beginning of old disk to new**
```
sudo dd if=/dev/old of=/dev/backup status=progress count=1000 bs=1M
```
### **Step 3 - verify and modify disk partitions**
list disk partitions
```
sgdisk -p /dev/new
```
if new disk is smaller than old disk and there is space on the xfs partition of the old disk, modify partitions of new disk
```
gdisk /dev/new
```
inside of gdisk commands
```
-v -> verify table
-p -> print table
-d -> select partition to delete partition
-n -> recreate partition with same partition number as deleted partition
```
For end sector, either specify the new end or just press Enter for maximum available
When asked about partition type, enter the same type code (it will show the old one)
```
p - >to verify
w -> to write
```
make xfs file system for new partition <new4>
```
sudo mkfs.xfs -f /dev/new4
```
### **Step 4 - copy old PARTUUID **
**careful here**
get old patuuid:
```
sgdisk -i <partition_number> /dev/old_disk # Note the "Partition unique GUID"
```
get labels
```
sgdisk -p /dev/old_disk # Shows partition names in the table
blkid /dev/old_disk* # Shows PARTUUIDs and labels for all partitions
```
set it on new disk
```
sgdisk -u <partition_number>:<old_partuuid> /dev/sdc
```
partition name:
```
sgdisk -c <partition_number>:"<old_name>" /dev/sdc
```
verify all:
```
lsblk -o NAME,SIZE,PARTUUID,PARTLABEL /dev/old_disk
```
### **Step 5 - Mount disks and copy files from old to new disk**
mount files before copy:
```
mkdir -p /mnt/new
mkdir -p /mnt/old
mount /dev/old4 /mnt/old
mount /dev/new4 /mnt/new
```
copy:
with -n flag can run as dry-run
```
rsync -aAXHvn --numeric-ids /source/ /destination/
```
```
rsync -aAXHv --numeric-ids /source/ /destination/
```
### **Step 6 - Set correct UUID for new partition 4**
to set uuid with xfs_admin you must unmount first
unmount old devices
```
umount /mnt/new
umount /mnt/old
```
to set correct uuid for partition 4
```
blkid /dev/old4
```
```
xfs_admin -U <old_uuid> /dev/new_partition
```
to set labels
get it
```
sgdisk -i 4 /dev/sda | grep "Partition name"
```
set it
```
sgdisk -c 4:"<label_name>" /dev/sdc
or
(check existing with xfs_admin -l /dev/old_partition)
Use xfs_admin -L <label> /dev/new_partition
```
### **Step 7 - Verify**
verify everything:
```
sgdisk -p /dev/sda # Old disk
sgdisk -p /dev/sdc # New disk
```
```
lsblk -o NAME,SIZE,PARTUUID,PARTLABEL /dev/sda
lsblk -o NAME,SIZE,PARTUUID,PARTLABEL /dev/sdc
```
```
blkid /dev/sda* | grep UUID=
blkid /dev/sdc* | grep UUID=
```

View File

@@ -17,5 +17,3 @@ harmony_secret = { path = "../../harmony_secret" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
brocade = { path = "../../brocade" }

View File

@@ -3,13 +3,12 @@ use std::{
sync::Arc,
};
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
config::secret::SshKeyPair,
data::{FileContent, FilePath},
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
modules::{
http::StaticFilesHttpScore,
@@ -23,9 +22,8 @@ use harmony::{
topology::{LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, mac_address};
use harmony_secret::{Secret, SecretManager};
use harmony_secret::SecretManager;
use harmony_types::net::Url;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@@ -34,26 +32,6 @@ async fn main() {
name: String::from("fw0"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.33.101")];
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await,
);
@@ -105,7 +83,7 @@ async fn main() {
name: "wk2".to_string(),
},
],
switch_client: switch_client.clone(),
switch: vec![],
};
let inventory = Inventory {
@@ -188,9 +166,3 @@ async fn main() {
.await
.unwrap();
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@@ -19,4 +19,3 @@ log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@@ -1,8 +1,7 @@
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
hardware::{Location, SwitchGroup},
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
@@ -23,26 +22,6 @@ pub async fn get_topology() -> HAClusterTopology {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallConfig>().await;
let config = config.unwrap();
@@ -79,7 +58,7 @@ pub async fn get_topology() -> HAClusterTopology {
name: "bootstrap".to_string(),
},
workers: vec![],
switch_client: switch_client.clone(),
switch: vec![],
}
}
@@ -96,9 +75,3 @@ pub fn get_inventory() -> Inventory {
control_plane_host: vec![],
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@@ -19,4 +19,3 @@ log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@@ -1,15 +1,13 @@
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
config::secret::OPNSenseFirewallCredentials,
hardware::{Location, SwitchGroup},
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize};
use harmony_secret::SecretManager;
use std::{net::IpAddr, sync::Arc};
pub async fn get_topology() -> HAClusterTopology {
@@ -18,26 +16,6 @@ pub async fn get_topology() -> HAClusterTopology {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>().await;
let config = config.unwrap();
@@ -74,7 +52,7 @@ pub async fn get_topology() -> HAClusterTopology {
name: "cp0".to_string(),
},
workers: vec![],
switch_client: switch_client.clone(),
switch: vec![],
}
}
@@ -91,9 +69,3 @@ pub fn get_inventory() -> Inventory {
control_plane_host: vec![],
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@@ -1,14 +0,0 @@
[package]
name = "example-openbao"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_macros = { path = "../../harmony_macros" }
harmony_types = { path = "../../harmony_types" }
tokio.workspace = true
url.workspace = true

View File

@@ -1,7 +0,0 @@
To install an openbao instance with harmony simply `cargo run -p example-openbao` .
Depending on your environement configuration, it will either install a k3d cluster locally and deploy on it, or install to a remote cluster.
Then follow the openbao documentation to initialize and unseal, this will make openbao usable.
https://openbao.org/docs/platform/k8s/helm/run/

View File

@@ -1,67 +0,0 @@
use std::{collections::HashMap, str::FromStr};
use harmony::{
inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
#[tokio::main]
async fn main() {
let values_yaml = Some(
r#"server:
standalone:
enabled: true
config: |
listener "tcp" {
tls_disable = true
address = "[::]:8200"
cluster_address = "[::]:8201"
}
storage "file" {
path = "/openbao/data"
}
service:
enabled: true
dataStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
auditStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce"#
.to_string(),
);
let openbao = HelmChartScore {
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
release_name: NonBlankString::from_str("openbao").unwrap(),
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: true,
repository: Some(HelmRepository::new(
"openbao".to_string(),
hurl!("https://openbao.github.io/openbao-helm"),
true,
)),
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(openbao)],
None,
)
.await
.unwrap();
}

View File

@@ -16,6 +16,3 @@ harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
harmony_secret = { path = "../../harmony_secret" }
brocade = { path = "../../brocade" }
serde = { workspace = true }

View File

@@ -3,11 +3,10 @@ use std::{
sync::Arc,
};
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
modules::{
dummy::{ErrorScore, PanicScore, SuccessScore},
@@ -19,9 +18,7 @@ use harmony::{
topology::{LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, mac_address};
use harmony_secret::{Secret, SecretManager};
use harmony_types::net::Url;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@@ -30,26 +27,6 @@ async fn main() {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.5.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await,
);
@@ -77,7 +54,7 @@ async fn main() {
name: "cp0".to_string(),
},
workers: vec![],
switch_client: switch_client.clone(),
switch: vec![],
};
let inventory = Inventory {
@@ -132,9 +109,3 @@ async fn main() {
.await
.unwrap();
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@@ -1,11 +0,0 @@
[package]
name = "example-remove-rook-osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@@ -1,18 +0,0 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@@ -78,7 +78,6 @@ askama.workspace = true
sqlx.workspace = true
inquire.workspace = true
brocade = { path = "../brocade" }
option-ext = "0.2.0"
[dev-dependencies]
pretty_assertions.workspace = true

View File

@@ -12,11 +12,11 @@ pub type FirewallGroup = Vec<PhysicalHost>;
pub struct PhysicalHost {
pub id: Id,
pub category: HostCategory,
pub network: Vec<NetworkInterface>,
pub storage: Vec<StorageDrive>,
pub network: Vec<NetworkInterface>, // FIXME: Don't use harmony_inventory_agent::NetworkInterface
pub storage: Vec<StorageDrive>, // FIXME: Don't use harmony_inventory_agent::StorageDrive
pub labels: Vec<Label>,
pub memory_modules: Vec<MemoryModule>,
pub cpus: Vec<CPU>,
pub memory_modules: Vec<MemoryModule>, // FIXME: Don't use harmony_inventory_agent::MemoryModule
pub cpus: Vec<CPU>, // FIXME: Don't use harmony_inventory_agent::CPU
}
impl PhysicalHost {

View File

@@ -30,7 +30,6 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephRemoveOsd,
DiscoverInventoryAgent,
CephClusterHealth,
Custom(&'static str),
@@ -62,7 +61,6 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
InterpretName::Custom(name) => f.write_str(name),

View File

@@ -1,9 +1,10 @@
use async_trait::async_trait;
use brocade::BrocadeOptions;
use harmony_macros::ip;
use harmony_types::{
net::{MacAddress, Url},
switch::PortLocation,
};
use harmony_secret::SecretManager;
use harmony_types::net::MacAddress;
use harmony_types::net::Url;
use harmony_types::switch::PortLocation;
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ObjectMeta;
use log::debug;
@@ -12,20 +13,44 @@ use log::info;
use crate::data::FileContent;
use crate::executors::ExecutorError;
use crate::hardware::PhysicalHost;
use crate::modules::okd::crd::{
InstallPlanApproval, OperatorGroup, OperatorGroupSpec, Subscription, SubscriptionSpec,
nmstate::{self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec},
};
use crate::infra::brocade::BrocadeSwitchAuth;
use crate::infra::brocade::BrocadeSwitchClient;
use crate::modules::okd::crd::InstallPlanApproval;
use crate::modules::okd::crd::OperatorGroup;
use crate::modules::okd::crd::OperatorGroupSpec;
use crate::modules::okd::crd::Subscription;
use crate::modules::okd::crd::SubscriptionSpec;
use crate::modules::okd::crd::nmstate;
use crate::modules::okd::crd::nmstate::NMState;
use crate::modules::okd::crd::nmstate::NodeNetworkConfigurationPolicy;
use crate::modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec;
use crate::topology::PxeOptions;
use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost,
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer,
Topology, k8s::K8sClient,
};
use super::DHCPStaticEntry;
use super::DhcpServer;
use super::DnsRecord;
use super::DnsRecordType;
use super::DnsServer;
use super::Firewall;
use super::HostNetworkConfig;
use super::HttpServer;
use super::IpAddress;
use super::K8sclient;
use super::LoadBalancer;
use super::LoadBalancerService;
use super::LogicalHost;
use super::PreparationError;
use super::PreparationOutcome;
use super::Router;
use super::Switch;
use super::SwitchClient;
use super::SwitchError;
use super::TftpServer;
use super::Topology;
use super::k8s::K8sClient;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::sync::Arc;
#[derive(Debug, Clone)]
@@ -38,10 +63,10 @@ pub struct HAClusterTopology {
pub tftp_server: Arc<dyn TftpServer>,
pub http_server: Arc<dyn HttpServer>,
pub dns_server: Arc<dyn DnsServer>,
pub switch_client: Arc<dyn SwitchClient>,
pub bootstrap_host: LogicalHost,
pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>,
pub switch: Vec<LogicalHost>,
}
#[async_trait]
@@ -275,15 +300,36 @@ impl HAClusterTopology {
}
}
async fn get_switch_client(&self) -> Result<Box<dyn SwitchClient>, SwitchError> {
let auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.map_err(|e| SwitchError::new(format!("Failed to get credentials: {e}")))?;
// FIXME: We assume Brocade switches
let switches: Vec<IpAddr> = self.switch.iter().map(|s| s.ip).collect();
let brocade_options = Some(BrocadeOptions {
dry_run: *crate::config::DRY_RUN,
..Default::default()
});
let client =
BrocadeSwitchClient::init(&switches, &auth.username, &auth.password, brocade_options)
.await
.map_err(|e| SwitchError::new(format!("Failed to connect to switch: {e}")))?;
Ok(Box::new(client))
}
async fn configure_port_channel(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let client = self.get_switch_client().await?;
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
self.switch_client
client
.configure_port_channel(&format!("Harmony_{}", host.id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
@@ -307,10 +353,10 @@ impl HAClusterTopology {
tftp_server: dummy_infra.clone(),
http_server: dummy_infra.clone(),
dns_server: dummy_infra.clone(),
switch_client: dummy_infra.clone(),
bootstrap_host: dummy_host,
control_plane: vec![],
workers: vec![],
switch: vec![],
}
}
}
@@ -467,16 +513,12 @@ impl HttpServer for HAClusterTopology {
#[async_trait]
impl Switch for HAClusterTopology {
async fn setup_switch(&self) -> Result<(), SwitchError> {
self.switch_client.setup().await?;
Ok(())
}
async fn get_port_for_mac_address(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let port = self.switch_client.find_port(mac_address).await?;
let client = self.get_switch_client().await?;
let port = client.find_port(mac_address).await?;
Ok(port)
}
@@ -485,7 +527,7 @@ impl Switch for HAClusterTopology {
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
self.configure_bond(host, &config).await?;
// self.configure_bond(host, &config).await?;
self.configure_port_channel(host, &config).await
}
}
@@ -676,25 +718,3 @@ impl DnsServer for DummyInfra {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}
#[async_trait]
impl SwitchClient for DummyInfra {
async fn setup(&self) -> Result<(), SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn find_port(
&self,
_mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn configure_port_channel(
&self,
_channel_name: &str,
_switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}

View File

@@ -1,17 +1,13 @@
use std::time::Duration;
use derive_new::new;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
apimachinery::pkg::version::Info,
};
use kube::{
Client, Config, Discovery, Error, Resource,
Client, Config, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
error::DiscoveryError,
runtime::reflector::Lookup,
};
use kube::{api::DynamicObject, runtime::conditions};
@@ -19,11 +15,11 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, info, trace};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep};
use tokio::io::AsyncReadExt;
#[derive(new, Clone)]
pub struct K8sClient {
@@ -57,17 +53,6 @@ impl K8sClient {
})
}
pub async fn get_apiserver_version(&self) -> Result<Info, Error> {
let client: Client = self.client.clone();
let version_info: Info = client.apiserver_version().await?;
Ok(version_info)
}
pub async fn discovery(&self) -> Result<Discovery, Error> {
let discovery: Discovery = Discovery::new(self.client.clone()).run().await?;
Ok(discovery)
}
pub async fn get_resource_json_value(
&self,
name: &str,
@@ -89,13 +74,10 @@ impl K8sClient {
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
debug!("getting namespaced deployment");
Api::namespaced(self.client.clone(), ns)
} else {
debug!("getting default namespace deployment");
Api::default_namespaced(self.client.clone())
};
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
Ok(deps.get_opt(name).await?)
}
@@ -126,7 +108,7 @@ impl K8sClient {
}
});
let pp = PatchParams::default();
let scale = Patch::Merge(&patch);
let scale = Patch::Apply(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}
@@ -171,41 +153,6 @@ impl K8sClient {
}
}
pub async fn wait_for_pod_ready(
&self,
pod_name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let mut elapsed = 0;
let interval = 5; // seconds between checks
let timeout_secs = 120;
loop {
let pod = self.get_pod(pod_name, namespace).await?;
if let Some(p) = pod {
if let Some(status) = p.status {
if let Some(phase) = status.phase {
if phase.to_lowercase() == "running" {
return Ok(());
}
}
}
}
if elapsed >= timeout_secs {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"'{}' in ns '{}' did not become ready within {}s",
pod_name,
namespace.unwrap(),
timeout_secs
))));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
}
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
@@ -472,12 +419,9 @@ impl K8sClient {
.as_str()
.expect("couldn't get kind as str");
let mut it = api_version.splitn(2, '/');
let first = it.next().unwrap();
let (g, v) = match it.next() {
Some(second) => (first, second),
None => ("", first),
};
let split: Vec<&str> = api_version.splitn(2, "/").collect();
let g = split[0];
let v = split[1];
let gvk = GroupVersionKind::gvk(g, v, kind);
let api_resource = ApiResource::from_gvk(&gvk);

View File

@@ -47,13 +47,6 @@ struct K8sState {
message: String,
}
#[derive(Debug, Clone)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
@@ -64,7 +57,6 @@ enum K8sSource {
pub struct K8sAnywhereTopology {
k8s_state: Arc<OnceCell<Option<K8sState>>>,
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
config: Arc<K8sAnywhereConfig>,
}
@@ -170,7 +162,6 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(K8sAnywhereConfig::from_env()),
}
}
@@ -179,42 +170,10 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(config),
}
}
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
let client = self.k8s_client().await.unwrap();
let discovery = client.discovery().await.map_err(|e| {
PreparationError::new(format!("Could not discover API groups: {}", e))
})?;
let version = client.get_apiserver_version().await.map_err(|e| {
PreparationError::new(format!("Could not get server version: {}", e))
})?;
// OpenShift / OKD
if discovery
.groups()
.any(|g| g.name() == "project.openshift.io")
{
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
return Ok(KubernetesDistribution::K3sFamily);
}
return Ok(KubernetesDistribution::Default);
})
.await
}
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,

View File

@@ -28,7 +28,13 @@ pub trait LoadBalancer: Send + Sync {
&self,
service: &LoadBalancerService,
) -> Result<(), ExecutorError> {
self.add_service(service).await?;
debug!(
"Listing LoadBalancer services {:?}",
self.list_services().await
);
if !self.list_services().await.contains(service) {
self.add_service(service).await?;
}
Ok(())
}
}

View File

@@ -1,10 +1,4 @@
use std::{
error::Error,
fmt::{self, Debug},
net::Ipv4Addr,
str::FromStr,
sync::Arc,
};
use std::{error::Error, net::Ipv4Addr, str::FromStr, sync::Arc};
use async_trait::async_trait;
use derive_new::new;
@@ -25,8 +19,8 @@ pub struct DHCPStaticEntry {
pub ip: Ipv4Addr,
}
impl fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl std::fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let mac = self
.mac
.iter()
@@ -48,8 +42,8 @@ pub trait Firewall: Send + Sync {
fn get_host(&self) -> LogicalHost;
}
impl Debug for dyn Firewall {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl std::fmt::Debug for dyn Firewall {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("Firewall {}", self.get_ip()))
}
}
@@ -71,7 +65,7 @@ pub struct PxeOptions {
}
#[async_trait]
pub trait DhcpServer: Send + Sync + Debug {
pub trait DhcpServer: Send + Sync + std::fmt::Debug {
async fn add_static_mapping(&self, entry: &DHCPStaticEntry) -> Result<(), ExecutorError>;
async fn remove_static_mapping(&self, mac: &MacAddress) -> Result<(), ExecutorError>;
async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)>;
@@ -110,8 +104,8 @@ pub trait DnsServer: Send + Sync {
}
}
impl Debug for dyn DnsServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl std::fmt::Debug for dyn DnsServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!("DnsServer {}", self.get_ip()))
}
}
@@ -147,8 +141,8 @@ pub enum DnsRecordType {
TXT,
}
impl fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl std::fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
DnsRecordType::A => write!(f, "A"),
DnsRecordType::AAAA => write!(f, "AAAA"),
@@ -184,8 +178,6 @@ impl FromStr for DnsRecordType {
#[async_trait]
pub trait Switch: Send + Sync {
async fn setup_switch(&self) -> Result<(), SwitchError>;
async fn get_port_for_mac_address(
&self,
mac_address: &MacAddress,
@@ -222,8 +214,8 @@ pub struct SwitchError {
msg: String,
}
impl fmt::Display for SwitchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
impl std::fmt::Display for SwitchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.msg)
}
}
@@ -231,19 +223,7 @@ impl fmt::Display for SwitchError {
impl Error for SwitchError {}
#[async_trait]
pub trait SwitchClient: Debug + Send + Sync {
/// Executes essential, idempotent, one-time initial configuration steps.
///
/// This is an opiniated procedure that setups a switch to provide high availability
/// capabilities as decided by the NationTech team.
///
/// This includes tasks like enabling switchport for all interfaces
/// except the ones intended for Fabric Networking, etc.
///
/// The implementation must ensure the operation is **idempotent** (safe to run multiple times)
/// and that it doesn't break existing configurations.
async fn setup(&self) -> Result<(), SwitchError>;
pub trait SwitchClient: Send + Sync {
async fn find_port(
&self,
mac_address: &MacAddress,

View File

@@ -21,7 +21,6 @@ pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
@@ -39,12 +38,6 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
debug!("installing rule: {:#?}", rule);
rule.install(&self.sender).await?;
}
if let Some(targets) = &self.scrape_targets {
for target in targets.iter() {
debug!("installing scrape_target: {:#?}", target);
target.install(&self.sender).await?;
}
}
self.sender.ensure_installed(inventory, topology).await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
@@ -84,6 +77,6 @@ pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
}
#[async_trait]
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
pub trait ScrapeTarget<S: AlertSender> {
async fn install(&self, sender: &S) -> Result<(), InterpretError>;
}

View File

@@ -1,14 +1,14 @@
use async_trait::async_trait;
use brocade::{BrocadeClient, BrocadeOptions, InterSwitchLink, InterfaceStatus, PortOperatingMode};
use brocade::{BrocadeClient, BrocadeOptions};
use harmony_secret::Secret;
use harmony_types::{
net::{IpAddress, MacAddress},
switch::{PortDeclaration, PortLocation},
};
use option_ext::OptionExt;
use serde::{Deserialize, Serialize};
use crate::topology::{SwitchClient, SwitchError};
#[derive(Debug)]
pub struct BrocadeSwitchClient {
brocade: Box<dyn BrocadeClient + Send + Sync>,
}
@@ -27,52 +27,13 @@ impl BrocadeSwitchClient {
#[async_trait]
impl SwitchClient for BrocadeSwitchClient {
async fn setup(&self) -> Result<(), SwitchError> {
let stack_topology = self
.brocade
.get_stack_topology()
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
let interfaces = self
.brocade
.get_interfaces()
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
let interfaces: Vec<(String, PortOperatingMode)> = interfaces
.into_iter()
.filter(|interface| {
interface.operating_mode.is_none() && interface.status == InterfaceStatus::Connected
})
.filter(|interface| {
!stack_topology.iter().any(|link: &InterSwitchLink| {
link.local_port == interface.port_location
|| link.remote_port.contains(&interface.port_location)
})
})
.map(|interface| (interface.name.clone(), PortOperatingMode::Access))
.collect();
if interfaces.is_empty() {
return Ok(());
}
self.brocade
.configure_interfaces(interfaces)
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
Ok(())
}
async fn find_port(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let table = self
.brocade
.get_mac_address_table()
.show_mac_address_table()
.await
.map_err(|e| SwitchError::new(format!("{e}")))?;
@@ -113,266 +74,8 @@ impl SwitchClient for BrocadeSwitchClient {
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use assertor::*;
use async_trait::async_trait;
use brocade::{
BrocadeClient, BrocadeInfo, Error, InterSwitchLink, InterfaceInfo, InterfaceStatus,
InterfaceType, MacAddressEntry, PortChannelId, PortOperatingMode,
};
use harmony_types::switch::PortLocation;
use crate::{infra::brocade::BrocadeSwitchClient, topology::SwitchClient};
#[tokio::test]
async fn setup_should_configure_ethernet_interfaces_as_access_ports() {
let first_interface = given_interface()
.with_port_location(PortLocation(1, 0, 1))
.build();
let second_interface = given_interface()
.with_port_location(PortLocation(1, 0, 4))
.build();
let brocade = Box::new(FakeBrocadeClient::new(
vec![],
vec![first_interface.clone(), second_interface.clone()],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).contains_exactly(vec![
(first_interface.name.clone(), PortOperatingMode::Access),
(second_interface.name.clone(), PortOperatingMode::Access),
]);
}
#[tokio::test]
async fn setup_with_an_already_configured_interface_should_skip_configuration() {
let brocade = Box::new(FakeBrocadeClient::new(
vec![],
vec![
given_interface()
.with_operating_mode(Some(PortOperatingMode::Access))
.build(),
],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).is_empty();
}
#[tokio::test]
async fn setup_with_a_disconnected_interface_should_skip_configuration() {
let brocade = Box::new(FakeBrocadeClient::new(
vec![],
vec![
given_interface()
.with_status(InterfaceStatus::SfpAbsent)
.build(),
given_interface()
.with_status(InterfaceStatus::NotConnected)
.build(),
],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).is_empty();
}
#[tokio::test]
async fn setup_with_inter_switch_links_should_not_configure_interfaces_used_to_form_stack() {
let brocade = Box::new(FakeBrocadeClient::new(
vec![
given_inter_switch_link()
.between(PortLocation(1, 0, 1), PortLocation(2, 0, 1))
.build(),
given_inter_switch_link()
.between(PortLocation(2, 0, 2), PortLocation(3, 0, 1))
.build(),
],
vec![
given_interface()
.with_port_location(PortLocation(1, 0, 1))
.build(),
given_interface()
.with_port_location(PortLocation(2, 0, 1))
.build(),
given_interface()
.with_port_location(PortLocation(3, 0, 1))
.build(),
],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).is_empty();
}
#[derive(Debug, Clone)]
struct FakeBrocadeClient {
stack_topology: Vec<InterSwitchLink>,
interfaces: Vec<InterfaceInfo>,
configured_interfaces: Arc<Mutex<Vec<(String, PortOperatingMode)>>>,
}
#[async_trait]
impl BrocadeClient for FakeBrocadeClient {
async fn version(&self) -> Result<BrocadeInfo, Error> {
todo!()
}
async fn get_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error> {
todo!()
}
async fn get_stack_topology(&self) -> Result<Vec<InterSwitchLink>, Error> {
Ok(self.stack_topology.clone())
}
async fn get_interfaces(&self) -> Result<Vec<InterfaceInfo>, Error> {
Ok(self.interfaces.clone())
}
async fn configure_interfaces(
&self,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
let mut configured_interfaces = self.configured_interfaces.lock().unwrap();
*configured_interfaces = interfaces;
Ok(())
}
async fn find_available_channel_id(&self) -> Result<PortChannelId, Error> {
todo!()
}
async fn create_port_channel(
&self,
_channel_id: PortChannelId,
_channel_name: &str,
_ports: &[PortLocation],
) -> Result<(), Error> {
todo!()
}
async fn clear_port_channel(&self, _channel_name: &str) -> Result<(), Error> {
todo!()
}
}
impl FakeBrocadeClient {
fn new(stack_topology: Vec<InterSwitchLink>, interfaces: Vec<InterfaceInfo>) -> Self {
Self {
stack_topology,
interfaces,
configured_interfaces: Arc::new(Mutex::new(vec![])),
}
}
}
struct InterfaceInfoBuilder {
port_location: Option<PortLocation>,
interface_type: Option<InterfaceType>,
operating_mode: Option<PortOperatingMode>,
status: Option<InterfaceStatus>,
}
impl InterfaceInfoBuilder {
fn build(&self) -> InterfaceInfo {
let interface_type = self
.interface_type
.clone()
.unwrap_or(InterfaceType::Ethernet("TenGigabitEthernet".into()));
let port_location = self.port_location.clone().unwrap_or(PortLocation(1, 0, 1));
let name = format!("{interface_type} {port_location}");
let status = self.status.clone().unwrap_or(InterfaceStatus::Connected);
InterfaceInfo {
name,
port_location,
interface_type,
operating_mode: self.operating_mode.clone(),
status,
}
}
fn with_port_location(self, port_location: PortLocation) -> Self {
Self {
port_location: Some(port_location),
..self
}
}
fn with_operating_mode(self, operating_mode: Option<PortOperatingMode>) -> Self {
Self {
operating_mode,
..self
}
}
fn with_status(self, status: InterfaceStatus) -> Self {
Self {
status: Some(status),
..self
}
}
}
struct InterSwitchLinkBuilder {
link: Option<(PortLocation, PortLocation)>,
}
impl InterSwitchLinkBuilder {
fn build(&self) -> InterSwitchLink {
let link = self
.link
.clone()
.unwrap_or((PortLocation(1, 0, 1), PortLocation(2, 0, 1)));
InterSwitchLink {
local_port: link.0,
remote_port: Some(link.1),
}
}
fn between(self, local_port: PortLocation, remote_port: PortLocation) -> Self {
Self {
link: Some((local_port, remote_port)),
}
}
}
fn given_interface() -> InterfaceInfoBuilder {
InterfaceInfoBuilder {
port_location: None,
interface_type: None,
operating_mode: None,
status: None,
}
}
fn given_inter_switch_link() -> InterSwitchLinkBuilder {
InterSwitchLinkBuilder { link: None }
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@@ -26,13 +26,19 @@ impl LoadBalancer for OPNSenseFirewall {
}
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
warn!(
"TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
);
let mut config = self.opnsense_config.write().await;
let mut load_balancer = config.load_balancer();
let (frontend, backend, servers, healthcheck) =
harmony_load_balancer_service_to_haproxy_xml(service);
load_balancer.configure_service(frontend, backend, servers, healthcheck);
let mut load_balancer = config.load_balancer();
load_balancer.add_backend(backend);
load_balancer.add_frontend(frontend);
load_balancer.add_servers(servers);
if let Some(healthcheck) = healthcheck {
load_balancer.add_healthcheck(healthcheck);
}
Ok(())
}
@@ -100,7 +106,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
.backends
.backends
.iter()
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
.find(|b| b.uuid == frontend.default_backend);
let mut health_check = None;
match matching_backend {
@@ -110,7 +116,8 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
}
None => {
warn!(
"HAProxy config could not find a matching backend for frontend {frontend:?}"
"HAProxy config could not find a matching backend for frontend {:?}",
frontend
);
}
}
@@ -145,11 +152,11 @@ pub(crate) fn get_servers_for_backend(
.servers
.iter()
.filter_map(|server| {
let address = server.address.clone()?;
let port = server.port?;
if backend_servers.contains(&server.uuid.as_str()) {
return Some(BackendServer { address, port });
return Some(BackendServer {
address: server.address.clone(),
port: server.port,
});
}
None
})
@@ -340,7 +347,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
name: format!("frontend_{}", service.listening_port),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: Some(backend.uuid.clone()),
default_backend: backend.uuid.clone(),
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
@@ -354,8 +361,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
uuid: Uuid::new_v4().to_string(),
name: format!("{}_{}", &server.address, &server.port),
enabled: 1,
address: Some(server.address.clone()),
port: Some(server.port),
address: server.address.clone(),
port: server.port,
mode: "active".to_string(),
server_type: "static".to_string(),
..Default::default()
@@ -378,8 +385,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
@@ -404,8 +411,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
@@ -424,8 +431,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("192.168.1.1".to_string()),
port: Some(80),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
@@ -446,16 +453,16 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: Some("some-hostname.test.mcd".to_string()),
port: Some(80),
address: "some-hostname.test.mcd".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
let server = HAProxyServer {
uuid: "server2".to_string(),
address: Some("192.168.1.2".to_string()),
port: Some(8080),
address: "192.168.1.2".to_string(),
port: 8080,
..Default::default()
};
haproxy.servers.servers.push(server);

View File

@@ -1,5 +1,4 @@
use async_trait::async_trait;
use harmony_macros::hurl;
use kube::{Api, api::GroupVersionKind};
use log::{debug, warn};
use non_blank_string_rs::NonBlankString;
@@ -1052,7 +1051,7 @@ commitServer:
install_only: false,
repository: Some(HelmRepository::new(
"argo".to_string(),
hurl!("https://argoproj.github.io/argo-helm"),
url::Url::parse("https://argoproj.github.io/argo-helm").unwrap(),
true,
)),
}

View File

@@ -1,209 +0,0 @@
use std::sync::Arc;
use async_trait::async_trait;
use harmony_types::id::Id;
use kube::{CustomResource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]
pub struct ClusterIssuerScore {
email: String,
server: String,
issuer_name: String,
namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
fn name(&self) -> String {
"ClusterIssuerScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ClusterIssuerInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct ClusterIssuerInterpret {
score: ClusterIssuerScore,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
.await
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("ClusterIssuer")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl ClusterIssuerInterpret {
async fn validate_cert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let cert_manager = "cert-manager".to_string();
let operator_namespace = "openshift-operators".to_string();
match client
.get_deployment(&cert_manager, Some(&operator_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&cert_manager, ready_count
)));
} else {
return Err(InterpretError::new(
"cert-manager operator not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {} in ns {}",
&cert_manager, &operator_namespace
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&cert_manager, &operator_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&cert_manager, e
))),
}
}
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
let issuer_name = &self.score.issuer_name;
let email = &self.score.email;
let server = &self.score.server;
let namespace = &self.score.namespace;
let cluster_issuer = ClusterIssuer {
metadata: ObjectMeta {
name: Some(issuer_name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: ClusterIssuerSpec {
acme: AcmeSpec {
email: email.to_string(),
private_key_secret_ref: PrivateKeySecretRef {
name: issuer_name.to_string(),
},
server: server.to_string(),
solvers: vec![SolverSpec {
http01: Some(Http01Solver {
ingress: Http01Ingress {
class: "nginx".to_string(),
},
}),
}],
},
},
};
Ok(cluster_issuer)
}
pub async fn apply_cluster_issuer(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = self.score.namespace.clone();
self.validate_cert_manager(&client).await?;
let cluster_issuer = self.build_cluster_issuer().unwrap();
client
.apply_yaml(
&serde_yaml::to_value(cluster_issuer).unwrap(),
Some(&namespace),
)
.await?;
Ok(Outcome::success(format!(
"successfully deployed cluster operator: {} in namespace: {}",
self.score.issuer_name, self.score.namespace
)))
}
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "cert-manager.io",
version = "v1",
kind = "ClusterIssuer",
plural = "clusterissuers"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIssuerSpec {
pub acme: AcmeSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AcmeSpec {
pub email: String,
pub private_key_secret_ref: PrivateKeySecretRef,
pub server: String,
pub solvers: Vec<SolverSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PrivateKeySecretRef {
pub name: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SolverSpec {
pub http01: Option<Http01Solver>,
// Other solver types (e.g., dns01) would go here as Options
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Solver {
pub ingress: Http01Ingress,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Ingress {
pub class: String,
}

View File

@@ -1,6 +1,5 @@
use std::{collections::HashMap, str::FromStr};
use harmony_macros::hurl;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use url::Url;
@@ -34,7 +33,7 @@ impl<T: Topology + HelmCommand> Score<T> for CertManagerHelmScore {
install_only: true,
repository: Some(HelmRepository::new(
"jetstack".to_string(),
hurl!("https://charts.jetstack.io"),
Url::parse("https://charts.jetstack.io").unwrap(),
true,
)),
}

View File

@@ -1,3 +1,2 @@
pub mod cluster_issuer;
mod helm;
pub use helm::*;

View File

@@ -5,7 +5,6 @@ use crate::score::Score;
use crate::topology::{HelmCommand, Topology};
use async_trait::async_trait;
use harmony_types::id::Id;
use harmony_types::net::Url;
use helm_wrapper_rs;
use helm_wrapper_rs::blocking::{DefaultHelmExecutor, HelmExecutor};
use log::{debug, info, warn};
@@ -16,6 +15,7 @@ use std::path::Path;
use std::process::{Command, Output, Stdio};
use std::str::FromStr;
use temp_file::TempFile;
use url::Url;
#[derive(Debug, Clone, Serialize)]
pub struct HelmRepository {
@@ -78,8 +78,7 @@ impl HelmChartInterpret {
repo.name, repo.url, repo.force_update
);
let repo_url = repo.url.to_string();
let mut add_args = vec!["repo", "add", &repo.name, &repo_url];
let mut add_args = vec!["repo", "add", &repo.name, repo.url.as_str()];
if repo.force_update {
add_args.push("--force-update");
}

View File

@@ -0,0 +1,364 @@
use async_trait::async_trait;
use log::debug;
use serde::Serialize;
use std::collections::HashMap;
use std::io::ErrorKind;
use std::path::PathBuf;
use std::process::{Command, Output};
use temp_dir::{self, TempDir};
use temp_file::TempFile;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::score::Score;
use crate::topology::{HelmCommand, K8sclient, Topology};
use harmony_types::id::Id;
#[derive(Clone)]
pub struct HelmCommandExecutor {
pub env: HashMap<String, String>,
pub path: Option<PathBuf>,
pub args: Vec<String>,
pub api_versions: Option<Vec<String>>,
pub kube_version: String,
pub debug: Option<bool>,
pub globals: HelmGlobals,
pub chart: HelmChart,
}
#[derive(Clone)]
pub struct HelmGlobals {
pub chart_home: Option<PathBuf>,
pub config_home: Option<PathBuf>,
}
#[derive(Debug, Clone, Serialize)]
pub struct HelmChart {
pub name: String,
pub version: Option<String>,
pub repo: Option<String>,
pub release_name: Option<String>,
pub namespace: Option<String>,
pub additional_values_files: Vec<PathBuf>,
pub values_file: Option<PathBuf>,
pub values_inline: Option<String>,
pub include_crds: Option<bool>,
pub skip_hooks: Option<bool>,
pub api_versions: Option<Vec<String>>,
pub kube_version: Option<String>,
pub name_template: String,
pub skip_tests: Option<bool>,
pub debug: Option<bool>,
}
impl HelmCommandExecutor {
pub fn generate(mut self) -> Result<String, std::io::Error> {
if self.globals.chart_home.is_none() {
self.globals.chart_home = Some(PathBuf::from("charts"));
}
if self
.clone()
.chart
.clone()
.chart_exists_locally(self.clone().globals.chart_home.unwrap())
.is_none()
{
if self.chart.repo.is_none() {
return Err(std::io::Error::new(
ErrorKind::Other,
"Chart doesn't exist locally and no repo specified",
));
}
self.clone().run_command(
self.chart
.clone()
.pull_command(self.globals.chart_home.clone().unwrap()),
)?;
}
let out = self.clone().run_command(
self.chart
.clone()
.helm_args(self.globals.chart_home.clone().unwrap()),
)?;
// TODO: don't use unwrap here
let s = String::from_utf8(out.stdout).unwrap();
debug!("helm stderr: {}", String::from_utf8(out.stderr).unwrap());
debug!("helm status: {}", out.status);
debug!("helm output: {s}");
let clean = s.split_once("---").unwrap().1;
Ok(clean.to_string())
}
pub fn version(self) -> Result<String, std::io::Error> {
let out = self.run_command(vec![
"version".to_string(),
"-c".to_string(),
"--short".to_string(),
])?;
// TODO: don't use unwrap
Ok(String::from_utf8(out.stdout).unwrap())
}
pub fn run_command(mut self, mut args: Vec<String>) -> Result<Output, std::io::Error> {
if let Some(d) = self.debug {
if d {
args.push("--debug".to_string());
}
}
let path = if let Some(p) = self.path {
p
} else {
PathBuf::from("helm")
};
let config_home = match self.globals.config_home {
Some(p) => p,
None => PathBuf::from(TempDir::new()?.path()),
};
if let Some(yaml_str) = self.chart.values_inline {
let tf: TempFile = temp_file::with_contents(yaml_str.as_bytes());
self.chart
.additional_values_files
.push(PathBuf::from(tf.path()));
};
self.env.insert(
"HELM_CONFIG_HOME".to_string(),
config_home.to_str().unwrap().to_string(),
);
self.env.insert(
"HELM_CACHE_HOME".to_string(),
config_home.to_str().unwrap().to_string(),
);
self.env.insert(
"HELM_DATA_HOME".to_string(),
config_home.to_str().unwrap().to_string(),
);
Command::new(path).envs(self.env).args(args).output()
}
}
impl HelmChart {
pub fn chart_exists_locally(self, chart_home: PathBuf) -> Option<PathBuf> {
let chart_path =
PathBuf::from(chart_home.to_str().unwrap().to_string() + "/" + &self.name.to_string());
if chart_path.exists() {
Some(chart_path)
} else {
None
}
}
pub fn pull_command(self, chart_home: PathBuf) -> Vec<String> {
let mut args = vec![
"pull".to_string(),
"--untar".to_string(),
"--untardir".to_string(),
chart_home.to_str().unwrap().to_string(),
];
match self.repo {
Some(r) => {
if r.starts_with("oci://") {
args.push(
r.trim_end_matches("/").to_string() + "/" + self.name.clone().as_str(),
);
} else {
args.push("--repo".to_string());
args.push(r.to_string());
args.push(self.name);
}
}
None => args.push(self.name),
};
if let Some(v) = self.version {
args.push("--version".to_string());
args.push(v.to_string());
}
args
}
pub fn helm_args(self, chart_home: PathBuf) -> Vec<String> {
let mut args: Vec<String> = vec!["template".to_string()];
match self.release_name {
Some(rn) => args.push(rn.to_string()),
None => args.push("--generate-name".to_string()),
}
args.push(
PathBuf::from(chart_home.to_str().unwrap().to_string() + "/" + self.name.as_str())
.to_str()
.unwrap()
.to_string(),
);
if let Some(n) = self.namespace {
args.push("--namespace".to_string());
args.push(n.to_string());
}
if let Some(f) = self.values_file {
args.push("-f".to_string());
args.push(f.to_str().unwrap().to_string());
}
for f in self.additional_values_files {
args.push("-f".to_string());
args.push(f.to_str().unwrap().to_string());
}
if let Some(vv) = self.api_versions {
for v in vv {
args.push("--api-versions".to_string());
args.push(v);
}
}
if let Some(kv) = self.kube_version {
args.push("--kube-version".to_string());
args.push(kv);
}
if let Some(crd) = self.include_crds {
if crd {
args.push("--include-crds".to_string());
}
}
if let Some(st) = self.skip_tests {
if st {
args.push("--skip-tests".to_string());
}
}
if let Some(sh) = self.skip_hooks {
if sh {
args.push("--no-hooks".to_string());
}
}
if let Some(d) = self.debug {
if d {
args.push("--debug".to_string());
}
}
args
}
}
#[derive(Debug, Clone, Serialize)]
pub struct HelmChartScoreV2 {
pub chart: HelmChart,
}
impl<T: Topology + K8sclient + HelmCommand> Score<T> for HelmChartScoreV2 {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(HelmChartInterpretV2 {
score: self.clone(),
})
}
fn name(&self) -> String {
format!(
"{} {} HelmChartScoreV2",
self.chart
.release_name
.clone()
.unwrap_or("Unknown".to_string()),
self.chart.name
)
}
}
#[derive(Debug, Serialize)]
pub struct HelmChartInterpretV2 {
pub score: HelmChartScoreV2,
}
impl HelmChartInterpretV2 {}
#[async_trait]
impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for HelmChartInterpretV2 {
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
let _ns = self
.score
.chart
.namespace
.as_ref()
.unwrap_or_else(|| todo!("Get namespace from active kubernetes cluster"));
let helm_executor = HelmCommandExecutor {
env: HashMap::new(),
path: None,
args: vec![],
api_versions: None,
kube_version: "v1.33.0".to_string(),
debug: Some(false),
globals: HelmGlobals {
chart_home: None,
config_home: None,
},
chart: self.score.chart.clone(),
};
// let mut helm_options = Vec::new();
// if self.score.create_namespace {
// helm_options.push(NonBlankString::from_str("--create-namespace").unwrap());
// }
let res = helm_executor.generate();
let _output = match res {
Ok(output) => output,
Err(err) => return Err(InterpretError::new(err.to_string())),
};
// TODO: implement actually applying the YAML from the templating in the generate function to a k8s cluster, having trouble passing in straight YAML into the k8s client
// let k8s_resource = k8s_openapi::serde_json::from_str(output.as_str()).unwrap();
// let client = topology
// .k8s_client()
// .await
// .expect("Environment should provide enough information to instanciate a client")
// .apply_namespaced(&vec![output], Some(ns.to_string().as_str()));
// match client.apply_yaml(output) {
// Ok(_) => return Ok(Outcome::success("Helm chart deployed".to_string())),
// Err(e) => return Err(InterpretError::new(e)),
// }
Ok(Outcome::success("Helm chart deployed".to_string()))
}
fn get_name(&self) -> InterpretName {
InterpretName::HelmCommand
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -1 +1,2 @@
pub mod chart;
pub mod command;

View File

@@ -1,187 +0,0 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector,
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1alpha1",
kind = "ScrapeConfig",
plural = "scrapeconfigs",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ScrapeConfigSpec {
/// List of static configurations.
pub static_configs: Option<Vec<StaticConfig>>,
/// Kubernetes service discovery.
pub kubernetes_sd_configs: Option<Vec<KubernetesSDConfig>>,
/// HTTP-based service discovery.
pub http_sd_configs: Option<Vec<HttpSDConfig>>,
/// File-based service discovery.
pub file_sd_configs: Option<Vec<FileSDConfig>>,
/// DNS-based service discovery.
pub dns_sd_configs: Option<Vec<DnsSDConfig>>,
/// Consul service discovery.
pub consul_sd_configs: Option<Vec<ConsulSDConfig>>,
/// Relabeling configuration applied to discovered targets.
pub relabel_configs: Option<Vec<RelabelConfig>>,
/// Metric relabeling configuration applied to scraped samples.
pub metric_relabel_configs: Option<Vec<RelabelConfig>>,
/// Path to scrape metrics from (defaults to `/metrics`).
pub metrics_path: Option<String>,
/// Interval at which Prometheus scrapes targets (e.g., "30s").
pub scrape_interval: Option<String>,
/// Timeout for scraping (e.g., "10s").
pub scrape_timeout: Option<String>,
/// Optional job name override.
pub job_name: Option<String>,
/// Optional scheme (http or https).
pub scheme: Option<String>,
/// Authorization paramaters for snmp walk
pub params: Option<Params>,
}
/// Static configuration section of a ScrapeConfig.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StaticConfig {
pub targets: Vec<String>,
pub labels: Option<LabelSelector>,
}
/// Relabeling configuration for target or metric relabeling.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RelabelConfig {
pub source_labels: Option<Vec<String>>,
pub separator: Option<String>,
pub target_label: Option<String>,
pub regex: Option<String>,
pub modulus: Option<u64>,
pub replacement: Option<String>,
pub action: Option<String>,
}
/// Kubernetes service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KubernetesSDConfig {
///"pod", "service", "endpoints"pub role: String,
pub namespaces: Option<NamespaceSelector>,
pub selectors: Option<Vec<LabelSelector>>,
pub api_server: Option<String>,
pub bearer_token_file: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Namespace selector for Kubernetes service discovery.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NamespaceSelector {
pub any: Option<bool>,
pub match_names: Option<Vec<String>>,
}
/// HTTP-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct HttpSDConfig {
pub url: String,
pub refresh_interval: Option<String>,
pub basic_auth: Option<BasicAuth>,
pub authorization: Option<Authorization>,
pub tls_config: Option<TLSConfig>,
}
/// File-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FileSDConfig {
pub files: Vec<String>,
pub refresh_interval: Option<String>,
}
/// DNS-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DnsSDConfig {
pub names: Vec<String>,
pub refresh_interval: Option<String>,
pub type_: Option<String>, // SRV, A, AAAA
pub port: Option<u16>,
}
/// Consul service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConsulSDConfig {
pub server: String,
pub services: Option<Vec<String>>,
pub scheme: Option<String>,
pub datacenter: Option<String>,
pub tag_separator: Option<String>,
pub refresh_interval: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Basic authentication credentials.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
pub password_file: Option<String>,
}
/// Bearer token or other auth mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Authorization {
pub credentials: Option<String>,
pub credentials_file: Option<String>,
pub type_: Option<String>,
}
/// TLS configuration for secure scraping.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
pub ca_file: Option<String>,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub server_name: Option<String>,
pub insecure_skip_verify: Option<bool>,
}
/// Authorization parameters for SNMP walk.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Params {
pub auth: Option<Vec<String>>,
pub module: Option<Vec<String>>,
}

View File

@@ -4,7 +4,6 @@ pub mod crd_default_rules;
pub mod crd_grafana;
pub mod crd_prometheus_rules;
pub mod crd_prometheuses;
pub mod crd_scrape_config;
pub mod grafana_default_dashboard;
pub mod grafana_operator;
pub mod prometheus_operator;

View File

@@ -31,7 +31,6 @@ impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlert
sender: KubePrometheus { config },
receivers: self.receivers.clone(),
rules: self.rules.clone(),
scrape_targets: None,
})
}
fn name(&self) -> String {

View File

@@ -4,6 +4,4 @@ pub mod application_monitoring;
pub mod grafana;
pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;
pub mod scrape_target;

View File

@@ -1,149 +0,0 @@
use std::{collections::BTreeMap, sync::Arc};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoring {}
impl<T: Topology + K8sclient> Score<T> for OpenshiftUserWorkloadMonitoring {
fn name(&self) -> String {
"OpenshiftUserWorkloadMonitoringScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OpenshiftUserWorkloadMonitoringInterpret {})
}
}
#[derive(Clone, Debug, Serialize)]
pub struct OpenshiftUserWorkloadMonitoringInterpret {}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for OpenshiftUserWorkloadMonitoringInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.update_cluster_monitoring_config_cm(&client).await?;
self.update_user_workload_monitoring_config_cm(&client)
.await?;
self.verify_user_workload(&client).await?;
Ok(Outcome::success(
"successfully enabled user-workload-monitoring".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OpenshiftUserWorkloadMonitoring")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl OpenshiftUserWorkloadMonitoringInterpret {
pub async fn update_cluster_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
enableUserWorkload: true
alertmanagerMain:
enableUserAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("cluster-monitoring-config".to_string()),
namespace: Some("openshift-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client.apply(&cm, Some("openshift-monitoring")).await?;
Ok(Outcome::success(
"updated cluster-monitoring-config-map".to_string(),
))
}
pub async fn update_user_workload_monitoring_config_cm(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut data = BTreeMap::new();
data.insert(
"config.yaml".to_string(),
r#"
alertmanager:
enabled: true
enableAlertmanagerConfig: true
"#
.to_string(),
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some("user-workload-monitoring-config".to_string()),
namespace: Some("openshift-user-workload-monitoring".to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
client
.apply(&cm, Some("openshift-user-workload-monitoring"))
.await?;
Ok(Outcome::success(
"updated openshift-user-monitoring-config-map".to_string(),
))
}
pub async fn verify_user_workload(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = "openshift-user-workload-monitoring";
let alertmanager_name = "alertmanager-user-workload-0";
let prometheus_name = "prometheus-user-workload-0";
client
.wait_for_pod_ready(alertmanager_name, Some(namespace))
.await?;
client
.wait_for_pod_ready(prometheus_name, Some(namespace))
.await?;
Ok(Outcome::success(format!(
"pods: {}, {} ready in ns: {}",
alertmanager_name, prometheus_name, namespace
)))
}
}

View File

@@ -1 +0,0 @@
pub mod enable_user_workload;

View File

@@ -1 +0,0 @@
pub mod server;

View File

@@ -1,76 +0,0 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_scrape_config::{Params, RelabelConfig, ScrapeConfig, ScrapeConfigSpec, StaticConfig},
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(Debug, Clone, Serialize)]
pub struct Server {
pub name: String,
pub ip: IpAddr,
pub auth: String,
pub module: String,
pub domain: String,
}
#[async_trait]
impl ScrapeTarget<CRDPrometheus> for Server {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let scrape_config_spec = ScrapeConfigSpec {
static_configs: Some(vec![StaticConfig {
targets: vec![self.ip.to_string()],
labels: None,
}]),
scrape_interval: Some("2m".to_string()),
kubernetes_sd_configs: None,
http_sd_configs: None,
file_sd_configs: None,
dns_sd_configs: None,
params: Some(Params {
auth: Some(vec![self.auth.clone()]),
module: Some(vec![self.module.clone()]),
}),
consul_sd_configs: None,
relabel_configs: Some(vec![RelabelConfig {
action: None,
source_labels: Some(vec!["__address__".to_string()]),
separator: None,
target_label: Some("__param_target".to_string()),
regex: None,
replacement: Some(format!("snmp.{}:31080", self.domain.clone())),
modulus: None,
}]),
metric_relabel_configs: None,
metrics_path: Some("/snmp".to_string()),
scrape_timeout: Some("2m".to_string()),
job_name: Some(format!("snmp_exporter/cloud/{}", self.name.clone())),
scheme: None,
};
let scrape_config = ScrapeConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec: scrape_config_spec,
};
sender
.client
.apply(&scrape_config, Some(&sender.namespace.clone()))
.await?;
Ok(Outcome::success(format!(
"installed scrape target {}",
self.name.clone()
)))
}
}

View File

@@ -215,7 +215,7 @@ impl OKDSetup03ControlPlaneInterpret {
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
hosts: hosts.clone(), // FIXME: Avoid clone if possible
};
score.interpret(inventory, topology).await?;

View File

@@ -77,8 +77,6 @@ impl OKDBootstrapLoadBalancerScore {
address: topology.bootstrap_host.ip.to_string(),
port,
});
backend.dedup();
backend
}
}

View File

@@ -240,7 +240,7 @@ pub struct OvsPortSpec {
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolSpec {
// TODO: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
// FIXME: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]

View File

@@ -1,6 +1,6 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use log::{debug, info};
use log::{debug, info, warn};
use serde::Serialize;
use crate::{
@@ -34,59 +34,6 @@ pub struct HostNetworkConfigurationInterpret {
score: HostNetworkConfigurationScore,
}
impl HostNetworkConfigurationInterpret {
async fn configure_network_for_host<T: Topology + Switch>(
&self,
topology: &T,
host: &PhysicalHost,
) -> Result<(), InterpretError> {
let switch_ports = self.collect_switch_ports_for_host(topology, host).await?;
if !switch_ports.is_empty() {
topology
.configure_host_network(host, HostNetworkConfig { switch_ports })
.await
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?;
}
Ok(())
}
async fn collect_switch_ports_for_host<T: Topology + Switch>(
&self,
topology: &T,
host: &PhysicalHost,
) -> Result<Vec<SwitchPort>, InterpretError> {
let mut switch_ports = vec![];
for network_interface in &host.network {
let mac_address = network_interface.mac_address;
match topology.get_port_for_mac_address(&mac_address).await {
Ok(Some(port)) => {
switch_ports.push(SwitchPort {
interface: NetworkInterface {
name: network_interface.name.clone(),
mac_address,
speed_mbps: network_interface.speed_mbps,
mtu: network_interface.mtu,
},
port,
});
}
Ok(None) => debug!("No port found for host '{}', skipping", host.id),
Err(e) => {
return Err(InterpretError::new(format!(
"Failed to get port for host '{}': {}",
host.id, e
)));
}
}
}
Ok(switch_ports)
}
}
#[async_trait]
impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
fn get_name(&self) -> InterpretName {
@@ -110,24 +57,43 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
if self.score.hosts.is_empty() {
return Ok(Outcome::noop("No hosts to configure".into()));
}
info!(
"Started network configuration for {} host(s)...",
self.score.hosts.len()
);
topology
.setup_switch()
.await
.map_err(|e| InterpretError::new(format!("Switch setup failed: {e}")))?;
let mut configured_host_count = 0;
for host in &self.score.hosts {
self.configure_network_for_host(topology, host).await?;
configured_host_count += 1;
let mut switch_ports = vec![];
for network_interface in &host.network {
let mac_address = network_interface.mac_address;
match topology.get_port_for_mac_address(&mac_address).await {
Ok(Some(port)) => {
switch_ports.push(SwitchPort {
interface: NetworkInterface {
name: network_interface.name.clone(),
mac_address,
speed_mbps: network_interface.speed_mbps,
mtu: network_interface.mtu,
},
port,
});
}
Ok(None) => debug!("No port found for host '{}', skipping", host.id),
Err(e) => warn!("Failed to get port for host '{}': {}", host.id, e),
}
}
if !switch_ports.is_empty() {
configured_host_count += 1;
topology
.configure_host_network(host, HostNetworkConfig { switch_ports })
.await
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?;
}
}
if configured_host_count > 0 {
@@ -185,18 +151,6 @@ mod tests {
pub static ref ANOTHER_PORT: PortLocation = PortLocation(2, 0, 42);
}
#[tokio::test]
async fn should_setup_switch() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let switch_setup = topology.switch_setup.lock().unwrap();
assert_that!(*switch_setup).is_true();
}
#[tokio::test]
async fn host_with_one_mac_address_should_create_bond_with_one_interface() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
@@ -283,6 +237,7 @@ mod tests {
#[tokio::test]
async fn port_not_found_for_mac_address_should_not_configure_interface() {
// FIXME: Should it still configure an empty bond/port channel?
let score = given_score(vec![given_host(&HOST_ID, vec![UNKNOWN_INTERFACE.clone()])]);
let topology = TopologyWithSwitch::new_port_not_found();
@@ -329,7 +284,6 @@ mod tests {
struct TopologyWithSwitch {
available_ports: Arc<Mutex<Vec<PortLocation>>>,
configured_host_networks: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
switch_setup: Arc<Mutex<bool>>,
}
impl TopologyWithSwitch {
@@ -337,7 +291,6 @@ mod tests {
Self {
available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])),
configured_host_networks: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
}
}
@@ -345,7 +298,6 @@ mod tests {
Self {
available_ports: Arc::new(Mutex::new(vec![])),
configured_host_networks: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
}
}
}
@@ -363,12 +315,6 @@ mod tests {
#[async_trait]
impl Switch for TopologyWithSwitch {
async fn setup_switch(&self) -> Result<(), SwitchError> {
let mut switch_configured = self.switch_setup.lock().unwrap();
*switch_configured = true;
Ok(())
}
async fn get_port_for_mac_address(
&self,
_mac_address: &MacAddress,

View File

@@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use log::{debug, warn};
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@@ -19,8 +19,8 @@ use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
osd_deployment_name: String,
rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@@ -54,17 +54,18 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
self.purge_ceph_osd(client.clone()).await?;
self.verify_ceph_osd_removal(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::CephRemoveOsd
todo!()
}
fn get_version(&self) -> Version {
@@ -81,7 +82,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
@@ -93,14 +94,9 @@ impl CephRemoveOsdInterpret {
self.score.osd_deployment_name
))
})?;
Ok(osd_id_numeric.to_string())
}
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = format!("osd.{}", osd_id_numeric);
debug!(
info!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
@@ -112,7 +108,6 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("verifying toolbox exists");
let toolbox_dep = "rook-ceph-tools".to_string();
match client
@@ -154,7 +149,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!(
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
@@ -177,7 +172,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
debug!("Waiting for OSD deployment to scale down to 0 replicas");
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@@ -185,9 +180,11 @@ impl CephRemoveOsdInterpret {
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas == None && status.ready_replicas == None {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
@@ -215,7 +212,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!(
info!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
@@ -237,7 +234,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
debug!("Verifying OSD deployment deleted");
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@@ -247,7 +244,7 @@ impl CephRemoveOsdInterpret {
.await?;
if dep.is_none() {
debug!(
info!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
@@ -279,10 +276,12 @@ impl CephRemoveOsdInterpret {
Ok(tree)
}
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
@@ -292,9 +291,8 @@ impl CephRemoveOsdInterpret {
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
"sh",
"-c",
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
],
)
.await?;
@@ -307,10 +305,10 @@ impl CephRemoveOsdInterpret {
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
info!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
@@ -320,7 +318,7 @@ impl CephRemoveOsdInterpret {
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["sh", "-c", "ceph osd tree -f json"],
vec!["ceph osd tree -f json"],
)
.await?;
let tree =

View File

@@ -1,2 +1,2 @@
pub mod ceph_remove_osd_score;
pub mod ceph_osd_replacement_score;
pub mod ceph_validate_health_score;

View File

@@ -39,7 +39,7 @@ impl FromStr for PortLocation {
///
/// ```rust
/// use std::str::FromStr;
/// use harmony_types::switch::PortLocation;
/// use brocade::port::PortLocation;
///
/// assert_eq!(PortLocation::from_str("1/1/1").unwrap(), PortLocation(1, 1, 1));
/// assert_eq!(PortLocation::from_str("12/5/48").unwrap(), PortLocation(12, 5, 48));
@@ -64,8 +64,8 @@ impl FromStr for PortLocation {
}
}
/// Represents a Port configuration input, which can be a single port, a sequential range,
/// or an explicit set defined by endpoints.
/// Represents a Brocade Port configuration input, which can be a single port, a sequential
/// range, or an explicit set defined by endpoints.
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone)]
pub enum PortDeclaration {
/// A single switch port defined by its location. Example: `PortDeclaration::Single(1/1/1)`
@@ -82,7 +82,7 @@ pub enum PortDeclaration {
}
impl PortDeclaration {
/// Parses a port configuration string into a structured `PortDeclaration` enum.
/// Parses a Brocade port configuration string into a structured `PortDeclaration` enum.
///
/// This function performs only basic format and numerical parsing, assuming the input
/// strings (e.g., from `show` commands) are semantically valid and logically ordered.
@@ -101,7 +101,7 @@ impl PortDeclaration {
/// # Examples
///
/// ```rust
/// use harmony_types::switch::{PortDeclaration, PortLocation};
/// use brocade::port::{PortDeclaration, PortLocation};
///
/// // Single Port
/// assert_eq!(PortDeclaration::parse("3/2/15").unwrap(), PortDeclaration::Single(PortLocation(3, 2, 15)));
@@ -150,7 +150,7 @@ mod tests {
use super::*;
#[test]
fn test_parse_port_location_invalid() {
fn test_parse_single_port_location_invalid() {
assert!(PortLocation::from_str("1/1").is_err());
assert!(PortLocation::from_str("1/A/1").is_err());
assert!(PortLocation::from_str("1/1/256").is_err());

View File

@@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId {
}
}
#[derive(PartialEq, Debug, Clone)]
#[derive(PartialEq, Debug)]
pub struct HAProxyId(String);
impl Default for HAProxyId {
@@ -297,7 +297,7 @@ pub struct HAProxyFrontends {
pub frontend: Vec<Frontend>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Frontend {
#[yaserde(attribute = true)]
pub uuid: String,
@@ -310,7 +310,7 @@ pub struct Frontend {
pub bind_options: MaybeString,
pub mode: String,
#[yaserde(rename = "defaultBackend")]
pub default_backend: Option<String>,
pub default_backend: String,
pub ssl_enabled: i32,
pub ssl_certificates: MaybeString,
pub ssl_default_certificate: MaybeString,
@@ -416,7 +416,7 @@ pub struct HAProxyBackends {
pub backends: Vec<HAProxyBackend>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyBackend {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@@ -535,7 +535,7 @@ pub struct HAProxyServers {
pub servers: Vec<HAProxyServer>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyServer {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@@ -543,8 +543,8 @@ pub struct HAProxyServer {
pub enabled: u8,
pub name: String,
pub description: MaybeString,
pub address: Option<String>,
pub port: Option<u16>,
pub address: String,
pub port: u16,
pub checkport: MaybeString,
pub mode: String,
pub multiplexer_protocol: MaybeString,
@@ -589,7 +589,7 @@ pub struct HAProxyHealthChecks {
pub healthchecks: Vec<HAProxyHealthCheck>,
}
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyHealthCheck {
#[yaserde(attribute = true)]
pub uuid: String,

View File

@@ -25,7 +25,6 @@ sha2 = "0.10.9"
[dev-dependencies]
pretty_assertions.workspace = true
assertor.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] }

View File

@@ -30,7 +30,8 @@ impl SshConfigManager {
self.opnsense_shell
.exec(&format!(
"cp /conf/config.xml /conf/backup/{backup_filename}"
"cp /conf/config.xml /conf/backup/{}",
backup_filename
))
.await
}

View File

@@ -1,8 +1,10 @@
mod ssh;
use crate::Error;
use async_trait::async_trait;
pub use ssh::*;
use async_trait::async_trait;
use crate::Error;
#[async_trait]
pub trait OPNsenseShell: std::fmt::Debug + Send + Sync {
async fn exec(&self, command: &str) -> Result<String, Error>;

View File

@@ -1,8 +1,11 @@
use crate::{config::OPNsenseShell, Error};
use std::sync::Arc;
use log::warn;
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, OPNsense,
};
use std::{collections::HashSet, sync::Arc};
use crate::{config::OPNsenseShell, Error};
pub struct LoadBalancerConfig<'a> {
opnsense: &'a mut OPNsense,
@@ -28,7 +31,7 @@ impl<'a> LoadBalancerConfig<'a> {
match &mut self.opnsense.opnsense.haproxy.as_mut() {
Some(haproxy) => f(haproxy),
None => unimplemented!(
"Cannot configure load balancer when haproxy config does not exist yet"
"Adding a backend is not supported when haproxy config does not exist yet"
),
}
}
@@ -37,67 +40,21 @@ impl<'a> LoadBalancerConfig<'a> {
self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32);
}
/// Configures a service by removing any existing service on the same port
/// and then adding the new definition. This ensures idempotency.
pub fn configure_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.remove_service_by_bind_address(&frontend.bind);
self.remove_servers(&servers);
self.add_new_service(frontend, backend, servers, healthcheck);
pub fn add_backend(&mut self, backend: HAProxyBackend) {
warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks");
self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend));
}
// Remove the corresponding real servers based on their name if they already exist.
fn remove_servers(&mut self, servers: &[HAProxyServer]) {
let server_names: HashSet<_> = servers.iter().map(|s| s.name.clone()).collect();
self.with_haproxy(|haproxy| {
haproxy
.servers
.servers
.retain(|s| !server_names.contains(&s.name));
});
pub fn add_frontend(&mut self, frontend: Frontend) {
self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend));
}
/// Removes a service and its dependent components based on the frontend's bind address.
/// This performs a cascading delete of the frontend, backend, servers, and health check.
fn remove_service_by_bind_address(&mut self, bind_address: &str) {
self.with_haproxy(|haproxy| {
let Some(old_frontend) = remove_frontend_by_bind_address(haproxy, bind_address) else {
return;
};
let Some(old_backend) = remove_backend(haproxy, old_frontend) else {
return;
};
remove_healthcheck(haproxy, &old_backend);
remove_linked_servers(haproxy, &old_backend);
});
pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) {
self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck));
}
/// Adds the components of a new service to the HAProxy configuration.
/// This function de-duplicates servers by name to prevent configuration errors.
fn add_new_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.with_haproxy(|haproxy| {
if let Some(check) = healthcheck {
haproxy.healthchecks.healthchecks.push(check);
}
haproxy.servers.servers.extend(servers);
haproxy.backends.backends.push(backend);
haproxy.frontends.frontend.push(frontend);
});
pub fn add_servers(&mut self, mut servers: Vec<HAProxyServer>) {
self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers));
}
pub async fn reload_restart(&self) -> Result<(), Error> {
@@ -125,262 +82,3 @@ impl<'a> LoadBalancerConfig<'a> {
Ok(())
}
}
fn remove_frontend_by_bind_address(haproxy: &mut HAProxy, bind_address: &str) -> Option<Frontend> {
let pos = haproxy
.frontends
.frontend
.iter()
.position(|f| f.bind == bind_address);
match pos {
Some(pos) => Some(haproxy.frontends.frontend.remove(pos)),
None => None,
}
}
fn remove_backend(haproxy: &mut HAProxy, old_frontend: Frontend) -> Option<HAProxyBackend> {
let default_backend = old_frontend.default_backend?;
let pos = haproxy
.backends
.backends
.iter()
.position(|b| b.uuid == default_backend);
match pos {
Some(pos) => Some(haproxy.backends.backends.remove(pos)),
None => None, // orphaned frontend, shouldn't happen
}
}
fn remove_healthcheck(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(uuid) = &backend.health_check.content {
haproxy
.healthchecks
.healthchecks
.retain(|h| h.uuid != *uuid);
}
}
/// Remove the backend's servers. This assumes servers are not shared between services.
fn remove_linked_servers(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(server_uuids_str) = &backend.linked_servers.content {
let server_uuids_to_remove: HashSet<_> = server_uuids_str.split(',').collect();
haproxy
.servers
.servers
.retain(|s| !server_uuids_to_remove.contains(s.uuid.as_str()));
}
}
#[cfg(test)]
mod tests {
use crate::config::DummyOPNSenseShell;
use assertor::*;
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyBackends, HAProxyFrontends, HAProxyHealthCheck,
HAProxyHealthChecks, HAProxyId, HAProxyServer, HAProxyServers, MaybeString, OPNsense,
};
use std::sync::Arc;
use super::LoadBalancerConfig;
static SERVICE_BIND_ADDRESS: &str = "192.168.1.1:80";
static OTHER_SERVICE_BIND_ADDRESS: &str = "192.168.1.1:443";
static SERVER_ADDRESS: &str = "1.1.1.1:80";
static OTHER_SERVER_ADDRESS: &str = "1.1.1.1:443";
#[test]
fn configure_service_should_add_all_service_components_to_haproxy() {
let mut opnsense = given_opnsense();
let mut load_balancer = given_load_balancer(&mut opnsense);
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
load_balancer.configure_service(
frontend.clone(),
backend.clone(),
servers.clone(),
Some(healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend],
vec![backend],
servers,
vec![healthcheck],
);
}
#[test]
fn configure_service_should_replace_service_on_same_bind_address() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
let (updated_healthcheck, updated_servers, updated_backend, updated_frontend) =
given_service(SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
load_balancer.configure_service(
updated_frontend.clone(),
updated_backend.clone(),
updated_servers.clone(),
Some(updated_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![updated_frontend],
vec![updated_backend],
updated_servers,
vec![updated_healthcheck],
);
}
#[test]
fn configure_service_should_keep_existing_service_on_different_bind_addresses() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let (other_healthcheck, other_servers, other_backend, other_frontend) =
given_service(OTHER_SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
load_balancer.configure_service(
other_frontend.clone(),
other_backend.clone(),
other_servers.clone(),
Some(other_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend, other_frontend],
vec![backend, other_backend],
[servers, other_servers].concat(),
vec![healthcheck, other_healthcheck],
);
}
fn assert_haproxy_configured_with(
opnsense: OPNsense,
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) {
let haproxy = opnsense.opnsense.haproxy.as_ref().unwrap();
assert_that!(haproxy.frontends.frontend).contains_exactly(frontends);
assert_that!(haproxy.backends.backends).contains_exactly(backends);
assert_that!(haproxy.servers.servers).is_equal_to(servers);
assert_that!(haproxy.healthchecks.healthchecks).contains_exactly(healthchecks);
}
fn given_opnsense() -> OPNsense {
OPNsense::default()
}
fn given_opnsense_with(haproxy: HAProxy) -> OPNsense {
let mut opnsense = OPNsense::default();
opnsense.opnsense.haproxy = Some(haproxy);
opnsense
}
fn given_load_balancer<'a>(opnsense: &'a mut OPNsense) -> LoadBalancerConfig<'a> {
let opnsense_shell = Arc::new(DummyOPNSenseShell {});
if opnsense.opnsense.haproxy.is_none() {
opnsense.opnsense.haproxy = Some(HAProxy::default());
}
LoadBalancerConfig::new(opnsense, opnsense_shell)
}
fn given_service(
bind_address: &str,
server_address: &str,
) -> (
HAProxyHealthCheck,
Vec<HAProxyServer>,
HAProxyBackend,
Frontend,
) {
let healthcheck = given_healthcheck();
let servers = vec![given_server(server_address)];
let backend = given_backend();
let frontend = given_frontend(bind_address);
(healthcheck, servers, backend, frontend)
}
fn given_haproxy(
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) -> HAProxy {
HAProxy {
frontends: HAProxyFrontends {
frontend: frontends,
},
backends: HAProxyBackends { backends },
servers: HAProxyServers { servers },
healthchecks: HAProxyHealthChecks { healthchecks },
..Default::default()
}
}
fn given_frontend(bind_address: &str) -> Frontend {
Frontend {
uuid: "uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: format!("frontend_{bind_address}"),
bind: bind_address.into(),
default_backend: Some("backend-uuid".into()),
..Default::default()
}
}
fn given_backend() -> HAProxyBackend {
HAProxyBackend {
uuid: "backend-uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: "backend_192.168.1.1:80".into(),
linked_servers: MaybeString::from("server-uuid"),
health_check_enabled: 1,
health_check: MaybeString::from("healthcheck-uuid"),
..Default::default()
}
}
fn given_server(address: &str) -> HAProxyServer {
HAProxyServer {
uuid: "server-uuid".into(),
id: HAProxyId::default(),
name: address.into(),
address: Some(address.into()),
..Default::default()
}
}
fn given_healthcheck() -> HAProxyHealthCheck {
HAProxyHealthCheck {
uuid: "healthcheck-uuid".into(),
name: "healthcheck".into(),
..Default::default()
}
}
}