Merge branch 'master' into feat/impl_installable_crd_prometheus

This commit is contained in:
2025-10-24 11:23:56 -04:00
56 changed files with 4195 additions and 232 deletions

View File

@@ -77,6 +77,9 @@ harmony_secret = { path = "../harmony_secret" }
askama.workspace = true
sqlx.workspace = true
inquire.workspace = true
brocade = { path = "../brocade" }
option-ext = "0.2.0"
[dev-dependencies]
pretty_assertions.workspace = true
assertor.workspace = true

View File

@@ -30,6 +30,7 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephRemoveOsd,
DiscoverInventoryAgent,
CephClusterHealth,
Custom(&'static str),
@@ -61,6 +62,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
InterpretName::Custom(name) => f.write_str(name),

View File

@@ -1,33 +1,31 @@
use async_trait::async_trait;
use harmony_macros::ip;
use harmony_types::net::MacAddress;
use harmony_types::net::Url;
use harmony_types::{
net::{MacAddress, Url},
switch::PortLocation,
};
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ObjectMeta;
use log::debug;
use log::info;
use crate::data::FileContent;
use crate::executors::ExecutorError;
use crate::hardware::PhysicalHost;
use crate::modules::okd::crd::{
InstallPlanApproval, OperatorGroup, OperatorGroupSpec, Subscription, SubscriptionSpec,
nmstate::{self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec},
};
use crate::topology::PxeOptions;
use super::DHCPStaticEntry;
use super::DhcpServer;
use super::DnsRecord;
use super::DnsRecordType;
use super::DnsServer;
use super::Firewall;
use super::HttpServer;
use super::IpAddress;
use super::K8sclient;
use super::LoadBalancer;
use super::LoadBalancerService;
use super::LogicalHost;
use super::PreparationError;
use super::PreparationOutcome;
use super::Router;
use super::TftpServer;
use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost,
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer,
Topology, k8s::K8sClient,
};
use super::Topology;
use super::k8s::K8sClient;
use std::collections::BTreeMap;
use std::sync::Arc;
#[derive(Debug, Clone)]
@@ -40,10 +38,10 @@ pub struct HAClusterTopology {
pub tftp_server: Arc<dyn TftpServer>,
pub http_server: Arc<dyn HttpServer>,
pub dns_server: Arc<dyn DnsServer>,
pub switch_client: Arc<dyn SwitchClient>,
pub bootstrap_host: LogicalHost,
pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>,
pub switch: Vec<LogicalHost>,
}
#[async_trait]
@@ -89,6 +87,210 @@ impl HAClusterTopology {
.to_string()
}
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
// FIXME: Find a way to check nmstate is already available (get pod -n openshift-nmstate)
debug!("Installing NMState operator...");
let k8s_client = self.k8s_client().await?;
let nmstate_namespace = Namespace {
metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()),
finalizers: Some(vec!["kubernetes".to_string()]),
..Default::default()
},
..Default::default()
};
debug!("Creating NMState namespace: {nmstate_namespace:#?}");
k8s_client
.apply(&nmstate_namespace, None)
.await
.map_err(|e| e.to_string())?;
let nmstate_operator_group = OperatorGroup {
metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()),
namespace: Some("openshift-nmstate".to_string()),
..Default::default()
},
spec: OperatorGroupSpec {
target_namespaces: vec!["openshift-nmstate".to_string()],
},
};
debug!("Creating NMState operator group: {nmstate_operator_group:#?}");
k8s_client
.apply(&nmstate_operator_group, None)
.await
.map_err(|e| e.to_string())?;
let nmstate_subscription = Subscription {
metadata: ObjectMeta {
name: Some("kubernetes-nmstate-operator".to_string()),
namespace: Some("openshift-nmstate".to_string()),
..Default::default()
},
spec: SubscriptionSpec {
channel: Some("stable".to_string()),
install_plan_approval: Some(InstallPlanApproval::Automatic),
name: "kubernetes-nmstate-operator".to_string(),
source: "redhat-operators".to_string(),
source_namespace: "openshift-marketplace".to_string(),
},
};
debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}");
k8s_client
.apply(&nmstate_subscription, None)
.await
.map_err(|e| e.to_string())?;
let nmstate = NMState {
metadata: ObjectMeta {
name: Some("nmstate".to_string()),
..Default::default()
},
..Default::default()
};
debug!("Creating NMState: {nmstate:#?}");
k8s_client
.apply(&nmstate, None)
.await
.map_err(|e| e.to_string())?;
Ok(())
}
fn get_next_bond_id(&self) -> u8 {
42 // FIXME: Find a better way to declare the bond id
}
async fn configure_bond(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
self.ensure_nmstate_operator_installed()
.await
.map_err(|e| {
SwitchError::new(format!(
"Can't configure bond, NMState operator not available: {e}"
))
})?;
let bond_config = self.create_bond_configuration(host, config);
debug!("Configuring bond for host {host:?}: {bond_config:#?}");
self.k8s_client()
.await
.unwrap()
.apply(&bond_config, None)
.await
.unwrap();
todo!()
}
fn create_bond_configuration(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> NodeNetworkConfigurationPolicy {
let host_name = host.id.clone();
let bond_id = self.get_next_bond_id();
let bond_name = format!("bond{bond_id}");
let mut bond_mtu: Option<u32> = None;
let mut bond_mac_address: Option<String> = None;
let mut bond_ports = Vec::new();
let mut interfaces: Vec<nmstate::InterfaceSpec> = Vec::new();
for switch_port in &config.switch_ports {
let interface_name = switch_port.interface.name.clone();
interfaces.push(nmstate::InterfaceSpec {
name: interface_name.clone(),
description: Some(format!("Member of bond {bond_name}")),
r#type: "ethernet".to_string(),
state: "up".to_string(),
mtu: Some(switch_port.interface.mtu),
mac_address: Some(switch_port.interface.mac_address.to_string()),
ipv4: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
enabled: Some(false),
..Default::default()
}),
link_aggregation: None,
..Default::default()
});
bond_ports.push(interface_name);
// Use the first port's details for the bond mtu and mac address
if bond_mtu.is_none() {
bond_mtu = Some(switch_port.interface.mtu);
}
if bond_mac_address.is_none() {
bond_mac_address = Some(switch_port.interface.mac_address.to_string());
}
}
interfaces.push(nmstate::InterfaceSpec {
name: bond_name.clone(),
description: Some(format!("Network bond for host {host_name}")),
r#type: "bond".to_string(),
state: "up".to_string(),
mtu: bond_mtu,
mac_address: bond_mac_address,
ipv4: Some(nmstate::IpStackSpec {
dhcp: Some(true),
enabled: Some(true),
..Default::default()
}),
ipv6: Some(nmstate::IpStackSpec {
dhcp: Some(true),
autoconf: Some(true),
enabled: Some(true),
..Default::default()
}),
link_aggregation: Some(nmstate::BondSpec {
mode: "802.3ad".to_string(),
ports: bond_ports,
..Default::default()
}),
..Default::default()
});
NodeNetworkConfigurationPolicy {
metadata: ObjectMeta {
name: Some(format!("{host_name}-bond-config")),
..Default::default()
},
spec: NodeNetworkConfigurationPolicySpec {
node_selector: Some(BTreeMap::from([(
"kubernetes.io/hostname".to_string(),
host_name.to_string(),
)])),
desired_state: nmstate::DesiredStateSpec { interfaces },
},
}
}
async fn configure_port_channel(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
self.switch_client
.configure_port_channel(&format!("Harmony_{}", host.id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
Ok(())
}
pub fn autoload() -> Self {
let dummy_infra = Arc::new(DummyInfra {});
let dummy_host = LogicalHost {
@@ -105,10 +307,10 @@ impl HAClusterTopology {
tftp_server: dummy_infra.clone(),
http_server: dummy_infra.clone(),
dns_server: dummy_infra.clone(),
switch_client: dummy_infra.clone(),
bootstrap_host: dummy_host,
control_plane: vec![],
workers: vec![],
switch: vec![],
}
}
}
@@ -263,6 +465,31 @@ impl HttpServer for HAClusterTopology {
}
}
#[async_trait]
impl Switch for HAClusterTopology {
async fn setup_switch(&self) -> Result<(), SwitchError> {
self.switch_client.setup().await?;
Ok(())
}
async fn get_port_for_mac_address(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let port = self.switch_client.find_port(mac_address).await?;
Ok(port)
}
async fn configure_host_network(
&self,
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
self.configure_bond(host, &config).await?;
self.configure_port_channel(host, &config).await
}
}
#[derive(Debug)]
pub struct DummyInfra;
@@ -332,8 +559,8 @@ impl DhcpServer for DummyInfra {
}
async fn set_dhcp_range(
&self,
start: &IpAddress,
end: &IpAddress,
_start: &IpAddress,
_end: &IpAddress,
) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
@@ -449,3 +676,25 @@ impl DnsServer for DummyInfra {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}
#[async_trait]
impl SwitchClient for DummyInfra {
async fn setup(&self) -> Result<(), SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn find_port(
&self,
_mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn configure_port_channel(
&self,
_channel_name: &str,
_switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}

View File

@@ -22,7 +22,7 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, trace};
use log::{debug, error, info, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use similar::TextDiff;
@@ -97,10 +97,13 @@ impl K8sClient {
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
debug!("getting namespaced deployment");
Api::namespaced(self.client.clone(), ns)
} else {
debug!("getting default namespace deployment");
Api::default_namespaced(self.client.clone())
};
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
Ok(deps.get_opt(name).await?)
}
@@ -131,7 +134,7 @@ impl K8sClient {
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
let scale = Patch::Merge(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}

View File

@@ -65,10 +65,10 @@ struct K8sState {
}
#[derive(Debug, Clone)]
pub enum K8sFlavour {
Okd,
K3d,
K8s,
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
@@ -81,7 +81,7 @@ enum K8sSource {
pub struct K8sAnywhereTopology {
k8s_state: Arc<OnceCell<Option<K8sState>>>,
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
flavour: Arc<OnceCell<K8sFlavour>>,
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
config: Arc<K8sAnywhereConfig>,
}
@@ -326,7 +326,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
flavour: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(K8sAnywhereConfig::from_env()),
}
}
@@ -335,13 +335,13 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
flavour: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(config),
}
}
pub async fn get_k8s_flavour(&self) -> K8sFlavour {
self.flavour
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
let client = self.k8s_client().await.unwrap();
@@ -353,43 +353,22 @@ impl K8sAnywhereTopology {
PreparationError::new(format!("Could not get server version: {}", e))
})?;
let rules: &[&dyn Fn() -> Option<K8sFlavour>] = &[
// OpenShift / OKD
&|| {
discovery
.groups()
.any(|g| g.name().ends_with("openshift.io"))
.then_some(K8sFlavour::Okd)
},
// K3d / K3s
&|| {
version
.git_version
.contains("k3s")
.then_some(K8sFlavour::K3d)
},
// Vanilla Kubernetes
&|| {
if !discovery
.groups()
.any(|g| g.name().ends_with("openshift.io"))
&& !version.git_version.contains("k3s")
&& !version.git_version.contains("k3d")
{
Some(K8sFlavour::K8s)
} else {
None
}
},
];
// OpenShift / OKD
if discovery
.groups()
.any(|g| g.name() == "project.openshift.io")
{
return Ok(KubernetesDistribution::OpenshiftFamily);
}
rules.iter().find_map(|rule| rule()).ok_or_else(|| {
PreparationError::new("Unknown Kubernetes cluster flavour".to_string())
})
// K3d / K3s
if version.git_version.contains("k3s") {
return Ok(KubernetesDistribution::K3sFamily);
}
return Ok(KubernetesDistribution::Default);
})
.await
.unwrap()
.clone()
}
fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option<String> {

View File

@@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync {
&self,
service: &LoadBalancerService,
) -> Result<(), ExecutorError> {
debug!(
"Listing LoadBalancer services {:?}",
self.list_services().await
);
if !self.list_services().await.contains(service) {
self.add_service(service).await?;
}
self.add_service(service).await?;
Ok(())
}
}

View File

@@ -1,10 +1,20 @@
use std::{net::Ipv4Addr, str::FromStr, sync::Arc};
use std::{
error::Error,
fmt::{self, Debug},
net::Ipv4Addr,
str::FromStr,
sync::Arc,
};
use async_trait::async_trait;
use harmony_types::net::{IpAddress, MacAddress};
use derive_new::new;
use harmony_types::{
net::{IpAddress, MacAddress},
switch::PortLocation,
};
use serde::Serialize;
use crate::executors::ExecutorError;
use crate::{executors::ExecutorError, hardware::PhysicalHost};
use super::{LogicalHost, k8s::K8sClient};
@@ -15,8 +25,8 @@ pub struct DHCPStaticEntry {
pub ip: Ipv4Addr,
}
impl std::fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mac = self
.mac
.iter()
@@ -38,8 +48,8 @@ pub trait Firewall: Send + Sync {
fn get_host(&self) -> LogicalHost;
}
impl std::fmt::Debug for dyn Firewall {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Debug for dyn Firewall {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Firewall {}", self.get_ip()))
}
}
@@ -61,7 +71,7 @@ pub struct PxeOptions {
}
#[async_trait]
pub trait DhcpServer: Send + Sync + std::fmt::Debug {
pub trait DhcpServer: Send + Sync + Debug {
async fn add_static_mapping(&self, entry: &DHCPStaticEntry) -> Result<(), ExecutorError>;
async fn remove_static_mapping(&self, mac: &MacAddress) -> Result<(), ExecutorError>;
async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)>;
@@ -100,8 +110,8 @@ pub trait DnsServer: Send + Sync {
}
}
impl std::fmt::Debug for dyn DnsServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Debug for dyn DnsServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("DnsServer {}", self.get_ip()))
}
}
@@ -137,8 +147,8 @@ pub enum DnsRecordType {
TXT,
}
impl std::fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DnsRecordType::A => write!(f, "A"),
DnsRecordType::AAAA => write!(f, "AAAA"),
@@ -172,6 +182,80 @@ impl FromStr for DnsRecordType {
}
}
#[async_trait]
pub trait Switch: Send + Sync {
async fn setup_switch(&self) -> Result<(), SwitchError>;
async fn get_port_for_mac_address(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError>;
async fn configure_host_network(
&self,
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError>;
}
#[derive(Clone, Debug, PartialEq)]
pub struct HostNetworkConfig {
pub switch_ports: Vec<SwitchPort>,
}
#[derive(Clone, Debug, PartialEq)]
pub struct SwitchPort {
pub interface: NetworkInterface,
pub port: PortLocation,
}
#[derive(Clone, Debug, PartialEq)]
pub struct NetworkInterface {
pub name: String,
pub mac_address: MacAddress,
pub speed_mbps: Option<u32>,
pub mtu: u32,
}
#[derive(Debug, Clone, new)]
pub struct SwitchError {
msg: String,
}
impl fmt::Display for SwitchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.msg)
}
}
impl Error for SwitchError {}
#[async_trait]
pub trait SwitchClient: Debug + Send + Sync {
/// Executes essential, idempotent, one-time initial configuration steps.
///
/// This is an opiniated procedure that setups a switch to provide high availability
/// capabilities as decided by the NationTech team.
///
/// This includes tasks like enabling switchport for all interfaces
/// except the ones intended for Fabric Networking, etc.
///
/// The implementation must ensure the operation is **idempotent** (safe to run multiple times)
/// and that it doesn't break existing configurations.
async fn setup(&self) -> Result<(), SwitchError>;
async fn find_port(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError>;
async fn configure_port_channel(
&self,
channel_name: &str,
switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError>;
}
#[cfg(test)]
mod test {
use std::sync::Arc;

View File

@@ -21,6 +21,7 @@ pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
@@ -39,7 +40,12 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
debug!("installing rule: {:#?}", rule);
rule.install(&self.sender).await?;
}
debug!("hit sender ensure installed for AlertingInterpret");
if let Some(targets) = &self.scrape_targets {
for target in targets.iter() {
debug!("installing scrape_target: {:#?}", target);
target.install(&self.sender).await?;
}
}
self.sender.ensure_installed(inventory, topology).await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
@@ -79,6 +85,6 @@ pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
}
#[async_trait]
pub trait ScrapeTarget<S: AlertSender> {
async fn install(&self, sender: &S) -> Result<(), InterpretError>;
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
}

View File

@@ -0,0 +1,378 @@
use async_trait::async_trait;
use brocade::{BrocadeClient, BrocadeOptions, InterSwitchLink, InterfaceStatus, PortOperatingMode};
use harmony_types::{
net::{IpAddress, MacAddress},
switch::{PortDeclaration, PortLocation},
};
use option_ext::OptionExt;
use crate::topology::{SwitchClient, SwitchError};
#[derive(Debug)]
pub struct BrocadeSwitchClient {
brocade: Box<dyn BrocadeClient + Send + Sync>,
}
impl BrocadeSwitchClient {
pub async fn init(
ip_addresses: &[IpAddress],
username: &str,
password: &str,
options: Option<BrocadeOptions>,
) -> Result<Self, brocade::Error> {
let brocade = brocade::init(ip_addresses, 22, username, password, options).await?;
Ok(Self { brocade })
}
}
#[async_trait]
impl SwitchClient for BrocadeSwitchClient {
async fn setup(&self) -> Result<(), SwitchError> {
let stack_topology = self
.brocade
.get_stack_topology()
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
let interfaces = self
.brocade
.get_interfaces()
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
let interfaces: Vec<(String, PortOperatingMode)> = interfaces
.into_iter()
.filter(|interface| {
interface.operating_mode.is_none() && interface.status == InterfaceStatus::Connected
})
.filter(|interface| {
!stack_topology.iter().any(|link: &InterSwitchLink| {
link.local_port == interface.port_location
|| link.remote_port.contains(&interface.port_location)
})
})
.map(|interface| (interface.name.clone(), PortOperatingMode::Access))
.collect();
if interfaces.is_empty() {
return Ok(());
}
self.brocade
.configure_interfaces(interfaces)
.await
.map_err(|e| SwitchError::new(e.to_string()))?;
Ok(())
}
async fn find_port(
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let table = self
.brocade
.get_mac_address_table()
.await
.map_err(|e| SwitchError::new(format!("{e}")))?;
let port = table
.iter()
.find(|entry| entry.mac_address == *mac_address)
.map(|entry| match &entry.port {
PortDeclaration::Single(port_location) => Ok(port_location.clone()),
_ => Err(SwitchError::new(
"Multiple ports found for MAC address".into(),
)),
});
match port {
Some(Ok(p)) => Ok(Some(p)),
Some(Err(e)) => Err(e),
None => Ok(None),
}
}
async fn configure_port_channel(
&self,
channel_name: &str,
switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError> {
let channel_id = self
.brocade
.find_available_channel_id()
.await
.map_err(|e| SwitchError::new(format!("{e}")))?;
self.brocade
.create_port_channel(channel_id, channel_name, &switch_ports)
.await
.map_err(|e| SwitchError::new(format!("{e}")))?;
Ok(channel_id)
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
use assertor::*;
use async_trait::async_trait;
use brocade::{
BrocadeClient, BrocadeInfo, Error, InterSwitchLink, InterfaceInfo, InterfaceStatus,
InterfaceType, MacAddressEntry, PortChannelId, PortOperatingMode,
};
use harmony_types::switch::PortLocation;
use crate::{infra::brocade::BrocadeSwitchClient, topology::SwitchClient};
#[tokio::test]
async fn setup_should_configure_ethernet_interfaces_as_access_ports() {
let first_interface = given_interface()
.with_port_location(PortLocation(1, 0, 1))
.build();
let second_interface = given_interface()
.with_port_location(PortLocation(1, 0, 4))
.build();
let brocade = Box::new(FakeBrocadeClient::new(
vec![],
vec![first_interface.clone(), second_interface.clone()],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).contains_exactly(vec![
(first_interface.name.clone(), PortOperatingMode::Access),
(second_interface.name.clone(), PortOperatingMode::Access),
]);
}
#[tokio::test]
async fn setup_with_an_already_configured_interface_should_skip_configuration() {
let brocade = Box::new(FakeBrocadeClient::new(
vec![],
vec![
given_interface()
.with_operating_mode(Some(PortOperatingMode::Access))
.build(),
],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).is_empty();
}
#[tokio::test]
async fn setup_with_a_disconnected_interface_should_skip_configuration() {
let brocade = Box::new(FakeBrocadeClient::new(
vec![],
vec![
given_interface()
.with_status(InterfaceStatus::SfpAbsent)
.build(),
given_interface()
.with_status(InterfaceStatus::NotConnected)
.build(),
],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).is_empty();
}
#[tokio::test]
async fn setup_with_inter_switch_links_should_not_configure_interfaces_used_to_form_stack() {
let brocade = Box::new(FakeBrocadeClient::new(
vec![
given_inter_switch_link()
.between(PortLocation(1, 0, 1), PortLocation(2, 0, 1))
.build(),
given_inter_switch_link()
.between(PortLocation(2, 0, 2), PortLocation(3, 0, 1))
.build(),
],
vec![
given_interface()
.with_port_location(PortLocation(1, 0, 1))
.build(),
given_interface()
.with_port_location(PortLocation(2, 0, 1))
.build(),
given_interface()
.with_port_location(PortLocation(3, 0, 1))
.build(),
],
));
let client = BrocadeSwitchClient {
brocade: brocade.clone(),
};
client.setup().await.unwrap();
let configured_interfaces = brocade.configured_interfaces.lock().unwrap();
assert_that!(*configured_interfaces).is_empty();
}
#[derive(Debug, Clone)]
struct FakeBrocadeClient {
stack_topology: Vec<InterSwitchLink>,
interfaces: Vec<InterfaceInfo>,
configured_interfaces: Arc<Mutex<Vec<(String, PortOperatingMode)>>>,
}
#[async_trait]
impl BrocadeClient for FakeBrocadeClient {
async fn version(&self) -> Result<BrocadeInfo, Error> {
todo!()
}
async fn get_mac_address_table(&self) -> Result<Vec<MacAddressEntry>, Error> {
todo!()
}
async fn get_stack_topology(&self) -> Result<Vec<InterSwitchLink>, Error> {
Ok(self.stack_topology.clone())
}
async fn get_interfaces(&self) -> Result<Vec<InterfaceInfo>, Error> {
Ok(self.interfaces.clone())
}
async fn configure_interfaces(
&self,
interfaces: Vec<(String, PortOperatingMode)>,
) -> Result<(), Error> {
let mut configured_interfaces = self.configured_interfaces.lock().unwrap();
*configured_interfaces = interfaces;
Ok(())
}
async fn find_available_channel_id(&self) -> Result<PortChannelId, Error> {
todo!()
}
async fn create_port_channel(
&self,
_channel_id: PortChannelId,
_channel_name: &str,
_ports: &[PortLocation],
) -> Result<(), Error> {
todo!()
}
async fn clear_port_channel(&self, _channel_name: &str) -> Result<(), Error> {
todo!()
}
}
impl FakeBrocadeClient {
fn new(stack_topology: Vec<InterSwitchLink>, interfaces: Vec<InterfaceInfo>) -> Self {
Self {
stack_topology,
interfaces,
configured_interfaces: Arc::new(Mutex::new(vec![])),
}
}
}
struct InterfaceInfoBuilder {
port_location: Option<PortLocation>,
interface_type: Option<InterfaceType>,
operating_mode: Option<PortOperatingMode>,
status: Option<InterfaceStatus>,
}
impl InterfaceInfoBuilder {
fn build(&self) -> InterfaceInfo {
let interface_type = self
.interface_type
.clone()
.unwrap_or(InterfaceType::Ethernet("TenGigabitEthernet".into()));
let port_location = self.port_location.clone().unwrap_or(PortLocation(1, 0, 1));
let name = format!("{interface_type} {port_location}");
let status = self.status.clone().unwrap_or(InterfaceStatus::Connected);
InterfaceInfo {
name,
port_location,
interface_type,
operating_mode: self.operating_mode.clone(),
status,
}
}
fn with_port_location(self, port_location: PortLocation) -> Self {
Self {
port_location: Some(port_location),
..self
}
}
fn with_operating_mode(self, operating_mode: Option<PortOperatingMode>) -> Self {
Self {
operating_mode,
..self
}
}
fn with_status(self, status: InterfaceStatus) -> Self {
Self {
status: Some(status),
..self
}
}
}
struct InterSwitchLinkBuilder {
link: Option<(PortLocation, PortLocation)>,
}
impl InterSwitchLinkBuilder {
fn build(&self) -> InterSwitchLink {
let link = self
.link
.clone()
.unwrap_or((PortLocation(1, 0, 1), PortLocation(2, 0, 1)));
InterSwitchLink {
local_port: link.0,
remote_port: Some(link.1),
}
}
fn between(self, local_port: PortLocation, remote_port: PortLocation) -> Self {
Self {
link: Some((local_port, remote_port)),
}
}
}
fn given_interface() -> InterfaceInfoBuilder {
InterfaceInfoBuilder {
port_location: None,
interface_type: None,
operating_mode: None,
status: None,
}
}
fn given_inter_switch_link() -> InterSwitchLinkBuilder {
InterSwitchLinkBuilder { link: None }
}
}

View File

@@ -1,3 +1,4 @@
pub mod brocade;
pub mod executors;
pub mod hp_ilo;
pub mod intel_amt;

View File

@@ -26,19 +26,13 @@ impl LoadBalancer for OPNSenseFirewall {
}
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
warn!(
"TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
);
let mut config = self.opnsense_config.write().await;
let mut load_balancer = config.load_balancer();
let (frontend, backend, servers, healthcheck) =
harmony_load_balancer_service_to_haproxy_xml(service);
let mut load_balancer = config.load_balancer();
load_balancer.add_backend(backend);
load_balancer.add_frontend(frontend);
load_balancer.add_servers(servers);
if let Some(healthcheck) = healthcheck {
load_balancer.add_healthcheck(healthcheck);
}
load_balancer.configure_service(frontend, backend, servers, healthcheck);
Ok(())
}
@@ -106,7 +100,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
.backends
.backends
.iter()
.find(|b| b.uuid == frontend.default_backend);
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
let mut health_check = None;
match matching_backend {
@@ -116,8 +110,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
}
None => {
warn!(
"HAProxy config could not find a matching backend for frontend {:?}",
frontend
"HAProxy config could not find a matching backend for frontend {frontend:?}"
);
}
}
@@ -152,11 +145,11 @@ pub(crate) fn get_servers_for_backend(
.servers
.iter()
.filter_map(|server| {
let address = server.address.clone()?;
let port = server.port?;
if backend_servers.contains(&server.uuid.as_str()) {
return Some(BackendServer {
address: server.address.clone(),
port: server.port,
});
return Some(BackendServer { address, port });
}
None
})
@@ -347,7 +340,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
name: format!("frontend_{}", service.listening_port),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: backend.uuid.clone(),
default_backend: Some(backend.uuid.clone()),
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
@@ -361,8 +354,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
uuid: Uuid::new_v4().to_string(),
name: format!("{}_{}", &server.address, &server.port),
enabled: 1,
address: server.address.clone(),
port: server.port,
address: Some(server.address.clone()),
port: Some(server.port),
mode: "active".to_string(),
server_type: "static".to_string(),
..Default::default()
@@ -385,8 +378,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@@ -411,8 +404,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@@ -431,8 +424,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@@ -453,16 +446,16 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "some-hostname.test.mcd".to_string(),
port: 80,
address: Some("some-hostname.test.mcd".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
let server = HAProxyServer {
uuid: "server2".to_string(),
address: "192.168.1.2".to_string(),
port: 8080,
address: Some("192.168.1.2".to_string()),
port: Some(8080),
..Default::default()
};
haproxy.servers.servers.push(server);

View File

@@ -0,0 +1,209 @@
use std::sync::Arc;
use async_trait::async_trait;
use harmony_types::id::Id;
use kube::{CustomResource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]
pub struct ClusterIssuerScore {
email: String,
server: String,
issuer_name: String,
namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
fn name(&self) -> String {
"ClusterIssuerScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ClusterIssuerInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct ClusterIssuerInterpret {
score: ClusterIssuerScore,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
.await
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("ClusterIssuer")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl ClusterIssuerInterpret {
async fn validate_cert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let cert_manager = "cert-manager".to_string();
let operator_namespace = "openshift-operators".to_string();
match client
.get_deployment(&cert_manager, Some(&operator_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&cert_manager, ready_count
)));
} else {
return Err(InterpretError::new(
"cert-manager operator not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {} in ns {}",
&cert_manager, &operator_namespace
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&cert_manager, &operator_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&cert_manager, e
))),
}
}
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
let issuer_name = &self.score.issuer_name;
let email = &self.score.email;
let server = &self.score.server;
let namespace = &self.score.namespace;
let cluster_issuer = ClusterIssuer {
metadata: ObjectMeta {
name: Some(issuer_name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: ClusterIssuerSpec {
acme: AcmeSpec {
email: email.to_string(),
private_key_secret_ref: PrivateKeySecretRef {
name: issuer_name.to_string(),
},
server: server.to_string(),
solvers: vec![SolverSpec {
http01: Some(Http01Solver {
ingress: Http01Ingress {
class: "nginx".to_string(),
},
}),
}],
},
},
};
Ok(cluster_issuer)
}
pub async fn apply_cluster_issuer(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = self.score.namespace.clone();
self.validate_cert_manager(&client).await?;
let cluster_issuer = self.build_cluster_issuer().unwrap();
client
.apply_yaml(
&serde_yaml::to_value(cluster_issuer).unwrap(),
Some(&namespace),
)
.await?;
Ok(Outcome::success(format!(
"successfully deployed cluster operator: {} in namespace: {}",
self.score.issuer_name, self.score.namespace
)))
}
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "cert-manager.io",
version = "v1",
kind = "ClusterIssuer",
plural = "clusterissuers"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIssuerSpec {
pub acme: AcmeSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AcmeSpec {
pub email: String,
pub private_key_secret_ref: PrivateKeySecretRef,
pub server: String,
pub solvers: Vec<SolverSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PrivateKeySecretRef {
pub name: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SolverSpec {
pub http01: Option<Http01Solver>,
// Other solver types (e.g., dns01) would go here as Options
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Solver {
pub ingress: Http01Ingress,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Ingress {
pub class: String,
}

View File

@@ -1,2 +1,3 @@
pub mod cluster_issuer;
mod helm;
pub use helm::*;

View File

@@ -0,0 +1,187 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector,
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1alpha1",
kind = "ScrapeConfig",
plural = "scrapeconfigs",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ScrapeConfigSpec {
/// List of static configurations.
pub static_configs: Option<Vec<StaticConfig>>,
/// Kubernetes service discovery.
pub kubernetes_sd_configs: Option<Vec<KubernetesSDConfig>>,
/// HTTP-based service discovery.
pub http_sd_configs: Option<Vec<HttpSDConfig>>,
/// File-based service discovery.
pub file_sd_configs: Option<Vec<FileSDConfig>>,
/// DNS-based service discovery.
pub dns_sd_configs: Option<Vec<DnsSDConfig>>,
/// Consul service discovery.
pub consul_sd_configs: Option<Vec<ConsulSDConfig>>,
/// Relabeling configuration applied to discovered targets.
pub relabel_configs: Option<Vec<RelabelConfig>>,
/// Metric relabeling configuration applied to scraped samples.
pub metric_relabel_configs: Option<Vec<RelabelConfig>>,
/// Path to scrape metrics from (defaults to `/metrics`).
pub metrics_path: Option<String>,
/// Interval at which Prometheus scrapes targets (e.g., "30s").
pub scrape_interval: Option<String>,
/// Timeout for scraping (e.g., "10s").
pub scrape_timeout: Option<String>,
/// Optional job name override.
pub job_name: Option<String>,
/// Optional scheme (http or https).
pub scheme: Option<String>,
/// Authorization paramaters for snmp walk
pub params: Option<Params>,
}
/// Static configuration section of a ScrapeConfig.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StaticConfig {
pub targets: Vec<String>,
pub labels: Option<LabelSelector>,
}
/// Relabeling configuration for target or metric relabeling.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RelabelConfig {
pub source_labels: Option<Vec<String>>,
pub separator: Option<String>,
pub target_label: Option<String>,
pub regex: Option<String>,
pub modulus: Option<u64>,
pub replacement: Option<String>,
pub action: Option<String>,
}
/// Kubernetes service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KubernetesSDConfig {
///"pod", "service", "endpoints"pub role: String,
pub namespaces: Option<NamespaceSelector>,
pub selectors: Option<Vec<LabelSelector>>,
pub api_server: Option<String>,
pub bearer_token_file: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Namespace selector for Kubernetes service discovery.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NamespaceSelector {
pub any: Option<bool>,
pub match_names: Option<Vec<String>>,
}
/// HTTP-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct HttpSDConfig {
pub url: String,
pub refresh_interval: Option<String>,
pub basic_auth: Option<BasicAuth>,
pub authorization: Option<Authorization>,
pub tls_config: Option<TLSConfig>,
}
/// File-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FileSDConfig {
pub files: Vec<String>,
pub refresh_interval: Option<String>,
}
/// DNS-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DnsSDConfig {
pub names: Vec<String>,
pub refresh_interval: Option<String>,
pub type_: Option<String>, // SRV, A, AAAA
pub port: Option<u16>,
}
/// Consul service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConsulSDConfig {
pub server: String,
pub services: Option<Vec<String>>,
pub scheme: Option<String>,
pub datacenter: Option<String>,
pub tag_separator: Option<String>,
pub refresh_interval: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Basic authentication credentials.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
pub password_file: Option<String>,
}
/// Bearer token or other auth mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Authorization {
pub credentials: Option<String>,
pub credentials_file: Option<String>,
pub type_: Option<String>,
}
/// TLS configuration for secure scraping.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
pub ca_file: Option<String>,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub server_name: Option<String>,
pub insecure_skip_verify: Option<bool>,
}
/// Authorization parameters for SNMP walk.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Params {
pub auth: Option<Vec<String>>,
pub module: Option<Vec<String>>,
}

View File

@@ -4,6 +4,7 @@ pub mod crd_default_rules;
pub mod crd_grafana;
pub mod crd_prometheus_rules;
pub mod crd_prometheuses;
pub mod crd_scrape_config;
pub mod grafana_default_dashboard;
pub mod grafana_operator;
pub mod prometheus_operator;

View File

@@ -31,6 +31,7 @@ impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlert
sender: KubePrometheus { config },
receivers: self.receivers.clone(),
rules: self.rules.clone(),
scrape_targets: None,
})
}
fn name(&self) -> String {

View File

@@ -6,3 +6,4 @@ pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;
pub mod scrape_target;

View File

@@ -0,0 +1 @@
pub mod server;

View File

@@ -0,0 +1,76 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_scrape_config::{Params, RelabelConfig, ScrapeConfig, ScrapeConfigSpec, StaticConfig},
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(Debug, Clone, Serialize)]
pub struct Server {
pub name: String,
pub ip: IpAddr,
pub auth: String,
pub module: String,
pub domain: String,
}
#[async_trait]
impl ScrapeTarget<CRDPrometheus> for Server {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let scrape_config_spec = ScrapeConfigSpec {
static_configs: Some(vec![StaticConfig {
targets: vec![self.ip.to_string()],
labels: None,
}]),
scrape_interval: Some("2m".to_string()),
kubernetes_sd_configs: None,
http_sd_configs: None,
file_sd_configs: None,
dns_sd_configs: None,
params: Some(Params {
auth: Some(vec![self.auth.clone()]),
module: Some(vec![self.module.clone()]),
}),
consul_sd_configs: None,
relabel_configs: Some(vec![RelabelConfig {
action: None,
source_labels: Some(vec!["__address__".to_string()]),
separator: None,
target_label: Some("__param_target".to_string()),
regex: None,
replacement: Some(format!("snmp.{}:31080", self.domain.clone())),
modulus: None,
}]),
metric_relabel_configs: None,
metrics_path: Some("/snmp".to_string()),
scrape_timeout: Some("2m".to_string()),
job_name: Some(format!("snmp_exporter/cloud/{}", self.name.clone())),
scheme: None,
};
let scrape_config = ScrapeConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec: scrape_config_spec,
};
sender
.client
.apply(&scrape_config, Some(&sender.namespace.clone()))
.await?;
Ok(Outcome::success(format!(
"installed scrape target {}",
self.name.clone()
)))
}
}

View File

@@ -5,8 +5,10 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::{HostRole, Inventory},
modules::{
dhcp::DhcpHostBindingScore, http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore, okd::templates::BootstrapIpxeTpl,
dhcp::DhcpHostBindingScore,
http::IPxeMacBootFileScore,
inventory::DiscoverHostForRoleScore,
okd::{host_network::HostNetworkConfigurationScore, templates::BootstrapIpxeTpl},
},
score::Score,
topology::{HAClusterTopology, HostBinding},
@@ -28,7 +30,7 @@ pub struct OKDSetup03ControlPlaneScore {}
impl Score<HAClusterTopology> for OKDSetup03ControlPlaneScore {
fn create_interpret(&self) -> Box<dyn Interpret<HAClusterTopology>> {
Box::new(OKDSetup03ControlPlaneInterpret::new(self.clone()))
Box::new(OKDSetup03ControlPlaneInterpret::new())
}
fn name(&self) -> String {
@@ -38,17 +40,15 @@ impl Score<HAClusterTopology> for OKDSetup03ControlPlaneScore {
#[derive(Debug, Clone)]
pub struct OKDSetup03ControlPlaneInterpret {
score: OKDSetup03ControlPlaneScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup03ControlPlaneInterpret {
pub fn new(score: OKDSetup03ControlPlaneScore) -> Self {
pub fn new() -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
@@ -159,7 +159,7 @@ impl OKDSetup03ControlPlaneInterpret {
}
.to_string();
debug!("[ControlPlane] iPXE content template:\n{}", content);
debug!("[ControlPlane] iPXE content template:\n{content}");
// Create and apply an iPXE boot file for each node.
for node in nodes {
@@ -189,16 +189,13 @@ impl OKDSetup03ControlPlaneInterpret {
/// Prompts the user to reboot the target control plane nodes.
async fn reboot_targets(&self, nodes: &Vec<PhysicalHost>) -> Result<(), InterpretError> {
let node_ids: Vec<String> = nodes.iter().map(|n| n.id.to_string()).collect();
info!(
"[ControlPlane] Requesting reboot for control plane nodes: {:?}",
node_ids
);
info!("[ControlPlane] Requesting reboot for control plane nodes: {node_ids:?}",);
let confirmation = inquire::Confirm::new(
&format!("Please reboot the {} control plane nodes ({}) to apply their PXE configuration. Press enter when ready.", nodes.len(), node_ids.join(", ")),
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {}", e)))?;
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
if !confirmation {
return Err(InterpretError::new(
@@ -210,14 +207,23 @@ impl OKDSetup03ControlPlaneInterpret {
}
/// Placeholder for automating network bonding configuration.
async fn persist_network_bond(&self) -> Result<(), InterpretError> {
// Generate MC or NNCP from inventory NIC data; apply via ignition or post-join.
info!("[ControlPlane] Ensuring persistent bonding via MachineConfig/NNCP");
async fn persist_network_bond(
&self,
inventory: &Inventory,
topology: &HAClusterTopology,
hosts: &Vec<PhysicalHost>,
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(),
};
score.interpret(inventory, topology).await?;
inquire::Confirm::new(
"Network configuration for control plane nodes is not automated yet. Configure it manually if needed.",
)
.prompt()
.map_err(|e| InterpretError::new(format!("User prompt failed: {}", e)))?;
.map_err(|e| InterpretError::new(format!("User prompt failed: {e}")))?;
Ok(())
}
@@ -260,7 +266,8 @@ impl Interpret<HAClusterTopology> for OKDSetup03ControlPlaneInterpret {
self.reboot_targets(&nodes).await?;
// 5. Placeholder for post-boot network configuration (e.g., bonding).
self.persist_network_bond().await?;
self.persist_network_bond(inventory, topology, &nodes)
.await?;
// TODO: Implement a step to wait for the control plane nodes to join the cluster
// and for the cluster operators to become available. This would be similar to

View File

@@ -77,6 +77,8 @@ impl OKDBootstrapLoadBalancerScore {
address: topology.bootstrap_host.ip.to_string(),
port,
});
backend.dedup();
backend
}
}

View File

@@ -0,0 +1,41 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
pub mod nmstate;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1",
kind = "OperatorGroup",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct OperatorGroupSpec {
pub target_namespaces: Vec<String>,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "Subscription",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionSpec {
pub name: String,
pub source: String,
pub source_namespace: String,
pub channel: Option<String>,
pub install_plan_approval: Option<InstallPlanApproval>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub enum InstallPlanApproval {
#[serde(rename = "Automatic")]
Automatic,
#[serde(rename = "Manual")]
Manual,
}

View File

@@ -0,0 +1,251 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(group = "nmstate.io", version = "v1", kind = "NMState", namespaced)]
#[serde(rename_all = "camelCase")]
pub struct NMStateSpec {
pub probe_configuration: Option<ProbeConfig>,
}
impl Default for NMState {
fn default() -> Self {
Self {
metadata: Default::default(),
spec: NMStateSpec {
probe_configuration: None,
},
}
}
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProbeConfig {
pub dns: ProbeDns,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ProbeDns {
pub host: String,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "nmstate.io",
version = "v1",
kind = "NodeNetworkConfigurationPolicy",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct NodeNetworkConfigurationPolicySpec {
pub node_selector: Option<BTreeMap<String, String>>,
pub desired_state: DesiredStateSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct DesiredStateSpec {
pub interfaces: Vec<InterfaceSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct InterfaceSpec {
pub name: String,
pub description: Option<String>,
pub r#type: String,
pub state: String,
pub mac_address: Option<String>,
pub mtu: Option<u32>,
pub controller: Option<String>,
pub ipv4: Option<IpStackSpec>,
pub ipv6: Option<IpStackSpec>,
pub ethernet: Option<EthernetSpec>,
pub link_aggregation: Option<BondSpec>,
pub vlan: Option<VlanSpec>,
pub vxlan: Option<VxlanSpec>,
pub mac_vtap: Option<MacVtapSpec>,
pub mac_vlan: Option<MacVlanSpec>,
pub infiniband: Option<InfinibandSpec>,
pub linux_bridge: Option<LinuxBridgeSpec>,
pub ovs_bridge: Option<OvsBridgeSpec>,
pub ethtool: Option<EthtoolSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct IpStackSpec {
pub enabled: Option<bool>,
pub dhcp: Option<bool>,
pub autoconf: Option<bool>,
pub address: Option<Vec<IpAddressSpec>>,
pub auto_dns: Option<bool>,
pub auto_gateway: Option<bool>,
pub auto_routes: Option<bool>,
pub dhcp_client_id: Option<String>,
pub dhcp_duid: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct IpAddressSpec {
pub ip: String,
pub prefix_length: u8,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthernetSpec {
pub speed: Option<u32>,
pub duplex: Option<String>,
pub auto_negotiation: Option<bool>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct BondSpec {
pub mode: String,
pub ports: Vec<String>,
pub options: Option<BTreeMap<String, Value>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct VlanSpec {
pub base_iface: String,
pub id: u16,
pub protocol: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct VxlanSpec {
pub base_iface: String,
pub id: u32,
pub remote: String,
pub local: Option<String>,
pub learning: Option<bool>,
pub destination_port: Option<u16>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct MacVtapSpec {
pub base_iface: String,
pub mode: String,
pub promiscuous: Option<bool>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct MacVlanSpec {
pub base_iface: String,
pub mode: String,
pub promiscuous: Option<bool>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct InfinibandSpec {
pub base_iface: String,
pub pkey: String,
pub mode: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct LinuxBridgeSpec {
pub options: Option<LinuxBridgeOptions>,
pub ports: Option<Vec<LinuxBridgePort>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct LinuxBridgeOptions {
pub mac_ageing_time: Option<u32>,
pub multicast_snooping: Option<bool>,
pub stp: Option<StpOptions>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct StpOptions {
pub enabled: Option<bool>,
pub forward_delay: Option<u16>,
pub hello_time: Option<u16>,
pub max_age: Option<u16>,
pub priority: Option<u16>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct LinuxBridgePort {
pub name: String,
pub vlan: Option<LinuxBridgePortVlan>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct LinuxBridgePortVlan {
pub mode: Option<String>,
pub trunk_tags: Option<Vec<VlanTag>>,
pub tag: Option<u16>,
pub enable_native: Option<bool>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct VlanTag {
pub id: u16,
pub id_range: Option<VlanIdRange>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct VlanIdRange {
pub min: u16,
pub max: u16,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvsBridgeSpec {
pub options: Option<OvsBridgeOptions>,
pub ports: Option<Vec<OvsPortSpec>>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvsBridgeOptions {
pub stp: Option<bool>,
pub rstp: Option<bool>,
pub mcast_snooping_enable: Option<bool>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct OvsPortSpec {
pub name: String,
pub link_aggregation: Option<BondSpec>,
pub vlan: Option<LinuxBridgePortVlan>,
pub r#type: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolSpec {
// TODO: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolFecSpec {
pub auto: Option<bool>,
pub mode: Option<String>,
}

View File

@@ -0,0 +1,394 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use log::{debug, info};
use serde::Serialize;
use crate::{
data::Version,
hardware::PhysicalHost,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{HostNetworkConfig, NetworkInterface, Switch, SwitchPort, Topology},
};
#[derive(Debug, Clone, Serialize)]
pub struct HostNetworkConfigurationScore {
pub hosts: Vec<PhysicalHost>,
}
impl<T: Topology + Switch> Score<T> for HostNetworkConfigurationScore {
fn name(&self) -> String {
"HostNetworkConfigurationScore".into()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(HostNetworkConfigurationInterpret {
score: self.clone(),
})
}
}
#[derive(Debug)]
pub struct HostNetworkConfigurationInterpret {
score: HostNetworkConfigurationScore,
}
impl HostNetworkConfigurationInterpret {
async fn configure_network_for_host<T: Topology + Switch>(
&self,
topology: &T,
host: &PhysicalHost,
) -> Result<(), InterpretError> {
let switch_ports = self.collect_switch_ports_for_host(topology, host).await?;
if !switch_ports.is_empty() {
topology
.configure_host_network(host, HostNetworkConfig { switch_ports })
.await
.map_err(|e| InterpretError::new(format!("Failed to configure host: {e}")))?;
}
Ok(())
}
async fn collect_switch_ports_for_host<T: Topology + Switch>(
&self,
topology: &T,
host: &PhysicalHost,
) -> Result<Vec<SwitchPort>, InterpretError> {
let mut switch_ports = vec![];
for network_interface in &host.network {
let mac_address = network_interface.mac_address;
match topology.get_port_for_mac_address(&mac_address).await {
Ok(Some(port)) => {
switch_ports.push(SwitchPort {
interface: NetworkInterface {
name: network_interface.name.clone(),
mac_address,
speed_mbps: network_interface.speed_mbps,
mtu: network_interface.mtu,
},
port,
});
}
Ok(None) => debug!("No port found for host '{}', skipping", host.id),
Err(e) => {
return Err(InterpretError::new(format!(
"Failed to get port for host '{}': {}",
host.id, e
)));
}
}
}
Ok(switch_ports)
}
}
#[async_trait]
impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("HostNetworkConfigurationInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
if self.score.hosts.is_empty() {
return Ok(Outcome::noop("No hosts to configure".into()));
}
info!(
"Started network configuration for {} host(s)...",
self.score.hosts.len()
);
topology
.setup_switch()
.await
.map_err(|e| InterpretError::new(format!("Switch setup failed: {e}")))?;
let mut configured_host_count = 0;
for host in &self.score.hosts {
self.configure_network_for_host(topology, host).await?;
configured_host_count += 1;
}
if configured_host_count > 0 {
Ok(Outcome::success(format!(
"Configured {configured_host_count}/{} host(s)",
self.score.hosts.len()
)))
} else {
Ok(Outcome::noop("No hosts configured".into()))
}
}
}
#[cfg(test)]
mod tests {
use assertor::*;
use harmony_types::{net::MacAddress, switch::PortLocation};
use lazy_static::lazy_static;
use crate::{
hardware::HostCategory,
topology::{
HostNetworkConfig, PreparationError, PreparationOutcome, SwitchError, SwitchPort,
},
};
use std::{
str::FromStr,
sync::{Arc, Mutex},
};
use super::*;
lazy_static! {
pub static ref HOST_ID: Id = Id::from_str("host-1").unwrap();
pub static ref ANOTHER_HOST_ID: Id = Id::from_str("host-2").unwrap();
pub static ref EXISTING_INTERFACE: NetworkInterface = NetworkInterface {
mac_address: MacAddress::try_from("AA:BB:CC:DD:EE:F1".to_string()).unwrap(),
name: "interface-1".into(),
speed_mbps: None,
mtu: 1,
};
pub static ref ANOTHER_EXISTING_INTERFACE: NetworkInterface = NetworkInterface {
mac_address: MacAddress::try_from("AA:BB:CC:DD:EE:F2".to_string()).unwrap(),
name: "interface-2".into(),
speed_mbps: None,
mtu: 1,
};
pub static ref UNKNOWN_INTERFACE: NetworkInterface = NetworkInterface {
mac_address: MacAddress::try_from("11:22:33:44:55:61".to_string()).unwrap(),
name: "unknown-interface".into(),
speed_mbps: None,
mtu: 1,
};
pub static ref PORT: PortLocation = PortLocation(1, 0, 42);
pub static ref ANOTHER_PORT: PortLocation = PortLocation(2, 0, 42);
}
#[tokio::test]
async fn should_setup_switch() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let switch_setup = topology.switch_setup.lock().unwrap();
assert_that!(*switch_setup).is_true();
}
#[tokio::test]
async fn host_with_one_mac_address_should_create_bond_with_one_interface() {
let host = given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]);
let score = given_score(vec![host]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
}],
},
)]);
}
#[tokio::test]
async fn host_with_multiple_mac_addresses_should_create_one_bond_with_all_interfaces() {
let score = given_score(vec![given_host(
&HOST_ID,
vec![
EXISTING_INTERFACE.clone(),
ANOTHER_EXISTING_INTERFACE.clone(),
],
)]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![(
HOST_ID.clone(),
HostNetworkConfig {
switch_ports: vec![
SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
},
SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
},
],
},
)]);
}
#[tokio::test]
async fn multiple_hosts_should_create_one_bond_per_host() {
let score = given_score(vec![
given_host(&HOST_ID, vec![EXISTING_INTERFACE.clone()]),
given_host(&ANOTHER_HOST_ID, vec![ANOTHER_EXISTING_INTERFACE.clone()]),
]);
let topology = TopologyWithSwitch::new();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).contains_exactly(vec![
(
HOST_ID.clone(),
HostNetworkConfig {
switch_ports: vec![SwitchPort {
interface: EXISTING_INTERFACE.clone(),
port: PORT.clone(),
}],
},
),
(
ANOTHER_HOST_ID.clone(),
HostNetworkConfig {
switch_ports: vec![SwitchPort {
interface: ANOTHER_EXISTING_INTERFACE.clone(),
port: ANOTHER_PORT.clone(),
}],
},
),
]);
}
#[tokio::test]
async fn port_not_found_for_mac_address_should_not_configure_interface() {
let score = given_score(vec![given_host(&HOST_ID, vec![UNKNOWN_INTERFACE.clone()])]);
let topology = TopologyWithSwitch::new_port_not_found();
let _ = score.interpret(&Inventory::empty(), &topology).await;
let configured_host_networks = topology.configured_host_networks.lock().unwrap();
assert_that!(*configured_host_networks).is_empty();
}
fn given_score(hosts: Vec<PhysicalHost>) -> HostNetworkConfigurationScore {
HostNetworkConfigurationScore { hosts }
}
fn given_host(id: &Id, network_interfaces: Vec<NetworkInterface>) -> PhysicalHost {
let network = network_interfaces.iter().map(given_interface).collect();
PhysicalHost {
id: id.clone(),
category: HostCategory::Server,
network,
storage: vec![],
labels: vec![],
memory_modules: vec![],
cpus: vec![],
}
}
fn given_interface(
interface: &NetworkInterface,
) -> harmony_inventory_agent::hwinfo::NetworkInterface {
harmony_inventory_agent::hwinfo::NetworkInterface {
name: interface.name.clone(),
mac_address: interface.mac_address,
speed_mbps: interface.speed_mbps,
is_up: true,
mtu: interface.mtu,
ipv4_addresses: vec![],
ipv6_addresses: vec![],
driver: "driver".into(),
firmware_version: None,
}
}
struct TopologyWithSwitch {
available_ports: Arc<Mutex<Vec<PortLocation>>>,
configured_host_networks: Arc<Mutex<Vec<(Id, HostNetworkConfig)>>>,
switch_setup: Arc<Mutex<bool>>,
}
impl TopologyWithSwitch {
fn new() -> Self {
Self {
available_ports: Arc::new(Mutex::new(vec![PORT.clone(), ANOTHER_PORT.clone()])),
configured_host_networks: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
}
}
fn new_port_not_found() -> Self {
Self {
available_ports: Arc::new(Mutex::new(vec![])),
configured_host_networks: Arc::new(Mutex::new(vec![])),
switch_setup: Arc::new(Mutex::new(false)),
}
}
}
#[async_trait]
impl Topology for TopologyWithSwitch {
fn name(&self) -> &str {
"SwitchWithPortTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
Ok(PreparationOutcome::Success { details: "".into() })
}
}
#[async_trait]
impl Switch for TopologyWithSwitch {
async fn setup_switch(&self) -> Result<(), SwitchError> {
let mut switch_configured = self.switch_setup.lock().unwrap();
*switch_configured = true;
Ok(())
}
async fn get_port_for_mac_address(
&self,
_mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let mut ports = self.available_ports.lock().unwrap();
if ports.is_empty() {
return Ok(None);
}
Ok(Some(ports.remove(0)))
}
async fn configure_host_network(
&self,
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
let mut configured_host_networks = self.configured_host_networks.lock().unwrap();
configured_host_networks.push((host.id.clone(), config.clone()));
Ok(())
}
}
}

View File

@@ -19,3 +19,5 @@ pub use bootstrap_03_control_plane::*;
pub use bootstrap_04_workers::*;
pub use bootstrap_05_sanity_check::*;
pub use bootstrap_06_installation_report::*;
pub mod crd;
pub mod host_network;

View File

@@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use log::{info, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@@ -19,8 +19,8 @@ use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
osd_deployment_name: String,
rook_ceph_namespace: String,
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
self.purge_ceph_osd(client.clone()).await?;
self.verify_ceph_osd_removal(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::CephRemoveOsd
}
fn get_version(&self) -> Version {
@@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
@@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
self.score.osd_deployment_name
))
})?;
Ok(osd_id_numeric.to_string())
}
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
debug!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
@@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("verifying toolbox exists");
let toolbox_dep = "rook-ceph-tools".to_string();
match client
@@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
@@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
if status.replicas == None && status.ready_replicas == None {
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
@@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
@@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Verifying OSD deployment deleted");
loop {
let dep = client
.get_deployment(
@@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
.await?;
if dep.is_none() {
info!(
debug!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
@@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
@@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
"sh",
"-c",
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
],
)
.await?;
@@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
@@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
vec!["sh", "-c", "ceph osd tree -f json"],
)
.await?;
let tree =

View File

@@ -1,2 +1,2 @@
pub mod ceph_osd_replacement_score;
pub mod ceph_remove_osd_score;
pub mod ceph_validate_health_score;