Compare commits

..

35 Commits

Author SHA1 Message Date
b2825ec1ef Merge pull request 'feat/impl_installable_crd_prometheus' (#170) from feat/impl_installable_crd_prometheus into master
Some checks failed
Run Check Script / check (push) Successful in 1m25s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m42s
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/170
2025-10-24 16:42:54 +00:00
609d7acb5d feat: impl clone_box for ScrapeTarget<CRDPrometheus>
All checks were successful
Run Check Script / check (pull_request) Successful in 1m25s
2025-10-24 12:05:54 -04:00
de761cf538 Merge branch 'master' into feat/impl_installable_crd_prometheus 2025-10-24 11:23:56 -04:00
c069207f12 Merge pull request 'refactor(ha_cluster): inject switch client for better testability' (#174) from switch-client into master
Some checks failed
Run Check Script / check (push) Successful in 1m44s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m43s
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/174
2025-10-23 15:05:17 +00:00
Ian Letourneau
7368184917 fix(ha_cluster): inject switch client for better testability
All checks were successful
Run Check Script / check (pull_request) Successful in 1m30s
2025-10-22 15:12:53 -04:00
05205f4ac1 Merge pull request 'feat: scrape targets to be able to get snmp alerts from machines to prometheus' (#171) from feat/scrape_target into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/171
2025-10-22 15:33:24 +00:00
3174645c97 Merge branch 'master' into feat/scrape_target
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
2025-10-22 15:33:01 +00:00
7536f4ec4b Merge pull request 'fix: fixed merge error that somehow got missed' (#172) from fix/merge_error into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/172
2025-10-21 16:02:39 +00:00
464347d3e5 fix: fixed merge error that somehow got missed
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 12:01:31 -04:00
7f415f5b98 Merge pull request 'feat: K8sFlavour' (#161) from feat/detect_k8s_flavour into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/161
2025-10-21 15:56:47 +00:00
2a520a1d7c Merge branch 'master' into feat/detect_k8s_flavour
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 15:56:18 +00:00
987f195e2f feat(cert-manager): add cluster issuer to okd cluster score (#157)
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
added score to install okd cluster issuer

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/157
2025-10-21 15:55:55 +00:00
14d1823d15 fix: remove ceph osd deletes and purges osd from ceph osd tree\ (#120)
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
k8s returns None rather than zero when checking deployment for replicas
exec_app requires commands 's' and '-c' to run correctly

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/120
Co-authored-by: Willem <wrolleman@nationtech.io>
Co-committed-by: Willem <wrolleman@nationtech.io>
2025-10-21 15:54:51 +00:00
2a48d51479 fix: naming of k8s distribution
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 11:09:45 -04:00
20a227bb41 Merge branch 'master' into feat/detect_k8s_flavour
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-21 15:02:15 +00:00
ce91ee0168 fix: removed dead code, mapped error from grafana operator to preparation error rather than ignoring it, modified k8sprometheus score to unwrap_or_default() service monitors
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-20 15:31:06 -04:00
ed7f81aa1f fix(opnsense-config): ensure load balancer service configuration is idempotent (#129)
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
The previous implementation blindly added HAProxy components without checking for existing configurations on the same port, which caused duplicate entries and errors when a service was updated.

This commit refactors the logic to a robust "remove-then-add" strategy. The configure_service method now finds and removes any existing frontend and its dependent components (backend, servers, health check) before adding the new, complete service definition.

This change makes the process fully idempotent, preventing configuration drift and ensuring a predictable state.

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/129
2025-10-20 19:18:49 +00:00
cb66b7592e fix: made targets plural and changed scrape targets to option in AlertingInterpret
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-20 14:44:37 -04:00
a815f6ac9c feat: scrape targets to be able to get snmp alerts from machines to prometheus
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-20 11:44:11 -04:00
2d891e4463 Merge pull request 'feat(host_network): configure bonds and port channels' (#169) from config-host-network into master
Some checks failed
Run Check Script / check (push) Has been cancelled
Compile and package harmony_composer / package_harmony_composer (push) Has been cancelled
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/169
2025-10-16 18:24:58 +00:00
f66e58b9ca Merge branch 'master' into config-host-network
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-16 18:24:34 +00:00
ea39d93aa7 feat(host_network): configure bonds on the host and switch port channels
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-16 14:23:41 -04:00
6989d208cf Merge pull request 'feat(switch/brocade): Implement client to interact with Brocade switches' (#168) from brocade-switch-client into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/168
2025-10-16 18:23:01 +00:00
c0d54a4466 Merge remote-tracking branch 'origin/master' into feat/impl_installable_crd_prometheus
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-10-16 14:17:32 -04:00
fc384599a1 feat: implementation of Installable for CRDPrometheusIntroduction of Grafana trait and its impl for k8sanywhereallows for CRDPrometheus to be installed via AlertingInterpret which standardizes the installation of alert receivers, alerting rules, and alert senders 2025-10-16 14:07:23 -04:00
c0bd8007c7 feat(switch/brocade): Implement client to interact with Brocade Switch
All checks were successful
Run Check Script / check (pull_request) Successful in 1m5s
* Expose a high-level `brocade::init()` function to connect to a Brocade switch and automatically pick the best implementation based on its OS and version
* Implement a client for Brocade switches running on Network Operating System (NOS)
* Implement a client for older Brocade switches running on FastIron (partial implementation)

The architecture for the library is based on 3 layers:
1. The `BrocadeClient` trait to describe the available capabilities to
   interact with a Brocade switch. It is partly opinionated in order to
   offer higher level features to group multiple commands into a single
   function (e.g. create a port channel). Its implementations are
   basically just the commands to run on the switch and the functions to
   parse the output.
2. The `BrocadeShell` struct to make it easier to authenticate, send commands, and interact with the switch.
3. The `ssh` module to actually connect to the switch over SSH and execute the commands.

With time, we will add support for more Brocade switches and their various OS/versions. If needed, shared behavior could be extracted into a separate module to make it easier to add new implementations.
2025-10-15 15:28:24 -04:00
7dff70edcf wip: fixed token expiration and configured grafana dashboard 2025-10-15 15:26:36 -04:00
06a0c44c3c wip: connected the thanos-datasource to grafana, need to complete connecting the openshift-userworkload-monitoring as well 2025-10-14 15:53:42 -04:00
85bec66e58 wip: fixing grafana datasource for openshift which requires creating a token, sa, secret and inserting them into the grafanadatasource 2025-10-10 12:09:26 -04:00
1f3796f503 refactor(prometheus): modified crd prometheus to impl the installable trait 2025-10-09 12:26:05 -04:00
5f78300d78 Merge branch 'master' into feat/detect_k8s_flavour
All checks were successful
Run Check Script / check (pull_request) Successful in 1m20s
2025-10-02 17:14:30 -04:00
2d3c32469c chore: Simplify k8s flavour detection algorithm and do not unwrap when it cannot be detected, just return Err 2025-09-30 22:59:50 -04:00
1cec398d4d fix: modifed naming scheme to OpenshiftFamily, K3sFamily, and defaultswitched discovery of openshiftfamily to look for projet.openshift.io 2025-09-29 11:29:34 -04:00
58b6268989 wip: moving the install steps for grafana and prometheus into the trait installable<T> 2025-09-29 10:46:29 -04:00
f073b7e5fb feat:added k8s flavour to k8s_aywhere topology to be able to get the type of cluster
All checks were successful
Run Check Script / check (pull_request) Successful in 33s
2025-09-24 13:28:46 -04:00
59 changed files with 1828 additions and 394 deletions

53
Cargo.lock generated
View File

@ -1780,6 +1780,7 @@ dependencies = [
name = "example-nanodc"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@ -1788,6 +1789,7 @@ dependencies = [
"harmony_tui",
"harmony_types",
"log",
"serde",
"tokio",
"url",
]
@ -1806,6 +1808,7 @@ dependencies = [
name = "example-okd-install"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@ -1836,25 +1839,16 @@ dependencies = [
name = "example-opnsense"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
"harmony_macros",
"harmony_secret",
"harmony_tui",
"harmony_types",
"log",
"tokio",
"url",
]
[[package]]
name = "example-penpot"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"serde",
"tokio",
"url",
]
@ -1863,6 +1857,7 @@ dependencies = [
name = "example-pxe"
version = "0.1.0"
dependencies = [
"brocade",
"cidr",
"env_logger",
"harmony",
@ -1877,6 +1872,15 @@ dependencies = [
"url",
]
[[package]]
name = "example-remove-rook-osd"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"tokio",
]
[[package]]
name = "example-rust"
version = "0.1.0"
@ -2456,17 +2460,6 @@ dependencies = [
"tokio",
]
[[package]]
name = "harmony_derive"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d138bbb32bb346299c5f95fbb53532313f39927cb47c411c99c634ef8665ef7"
dependencies = [
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "harmony_inventory_agent"
version = "0.1.0"
@ -3913,19 +3906,6 @@ dependencies = [
"web-time",
]
[[package]]
name = "okd_host_network"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"harmony_derive",
"harmony_inventory_agent",
"harmony_macros",
"harmony_types",
"tokio",
]
[[package]]
name = "once_cell"
version = "1.21.3"
@ -3954,6 +3934,7 @@ checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
name = "opnsense-config"
version = "0.1.0"
dependencies = [
"assertor",
"async-trait",
"chrono",
"env_logger",

View File

@ -15,7 +15,8 @@ members = [
"harmony_inventory_agent",
"harmony_secret_derive",
"harmony_secret",
"adr/agent_discovery/mdns", "brocade",
"adr/agent_discovery/mdns",
"brocade",
]
[workspace.package]

View File

@ -10,6 +10,7 @@ use log::{debug, info};
use regex::Regex;
use std::{collections::HashSet, str::FromStr};
#[derive(Debug)]
pub struct FastIronClient {
shell: BrocadeShell,
version: BrocadeInfo,
@ -70,7 +71,7 @@ impl FastIronClient {
Some(Ok(InterSwitchLink {
local_port,
remote_port: None, // FIXME: Map the remote port as well
remote_port: None,
}))
}

View File

@ -162,7 +162,7 @@ pub async fn init(
}
#[async_trait]
pub trait BrocadeClient {
pub trait BrocadeClient: std::fmt::Debug {
/// Retrieves the operating system and version details from the connected Brocade switch.
///
/// This is typically the first call made after establishing a connection to determine

View File

@ -10,6 +10,7 @@ use crate::{
parse_brocade_mac_address, shell::BrocadeShell,
};
#[derive(Debug)]
pub struct NetworkOperatingSystemClient {
shell: BrocadeShell,
version: BrocadeInfo,
@ -270,7 +271,7 @@ impl BrocadeClient for NetworkOperatingSystemClient {
commands.push("no ip address".into());
commands.push("no fabric isl enable".into());
commands.push("no fabric trunk enable".into());
commands.push(format!("channel-group {} mode active", channel_id));
commands.push(format!("channel-group {channel_id} mode active"));
commands.push("no shutdown".into());
commands.push("exit".into());
}

View File

@ -13,6 +13,7 @@ use log::info;
use russh::ChannelMsg;
use tokio::time::timeout;
#[derive(Debug)]
pub struct BrocadeShell {
ip: IpAddr,
port: u16,

View File

@ -17,3 +17,5 @@ harmony_secret = { path = "../../harmony_secret" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde = { workspace = true }
brocade = { path = "../../brocade" }

View File

@ -3,12 +3,13 @@ use std::{
sync::Arc,
};
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
config::secret::SshKeyPair,
data::{FileContent, FilePath},
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
modules::{
http::StaticFilesHttpScore,
@ -22,8 +23,9 @@ use harmony::{
topology::{LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, mac_address};
use harmony_secret::SecretManager;
use harmony_secret::{Secret, SecretManager};
use harmony_types::net::Url;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@ -32,6 +34,26 @@ async fn main() {
name: String::from("fw0"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.33.101")];
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await,
);
@ -83,7 +105,7 @@ async fn main() {
name: "wk2".to_string(),
},
],
switch: vec![],
switch_client: switch_client.clone(),
};
let inventory = Inventory {
@ -166,3 +188,9 @@ async fn main() {
.await
.unwrap();
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -19,3 +19,4 @@ log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@ -1,7 +1,8 @@
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
hardware::{Location, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
@ -22,6 +23,26 @@ pub async fn get_topology() -> HAClusterTopology {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallConfig>().await;
let config = config.unwrap();
@ -58,7 +79,7 @@ pub async fn get_topology() -> HAClusterTopology {
name: "bootstrap".to_string(),
},
workers: vec![],
switch: vec![],
switch_client: switch_client.clone(),
}
}
@ -75,3 +96,9 @@ pub fn get_inventory() -> Inventory {
control_plane_host: vec![],
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -19,3 +19,4 @@ log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
serde.workspace = true
brocade = { path = "../../brocade" }

View File

@ -1,13 +1,15 @@
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
config::secret::OPNSenseFirewallCredentials,
hardware::{Location, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
topology::{HAClusterTopology, LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, ipv4};
use harmony_secret::SecretManager;
use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize};
use std::{net::IpAddr, sync::Arc};
pub async fn get_topology() -> HAClusterTopology {
@ -16,6 +18,26 @@ pub async fn get_topology() -> HAClusterTopology {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.1.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let config = SecretManager::get_or_prompt::<OPNSenseFirewallCredentials>().await;
let config = config.unwrap();
@ -52,7 +74,7 @@ pub async fn get_topology() -> HAClusterTopology {
name: "cp0".to_string(),
},
workers: vec![],
switch: vec![],
switch_client: switch_client.clone(),
}
}
@ -69,3 +91,9 @@ pub fn get_inventory() -> Inventory {
control_plane_host: vec![],
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -16,3 +16,6 @@ harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }
url = { workspace = true }
harmony_secret = { path = "../../harmony_secret" }
brocade = { path = "../../brocade" }
serde = { workspace = true }

View File

@ -3,10 +3,11 @@ use std::{
sync::Arc,
};
use brocade::BrocadeOptions;
use cidr::Ipv4Cidr;
use harmony::{
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
infra::{brocade::BrocadeSwitchClient, opnsense::OPNSenseManagementInterface},
inventory::Inventory,
modules::{
dummy::{ErrorScore, PanicScore, SuccessScore},
@ -18,7 +19,9 @@ use harmony::{
topology::{LogicalHost, UnmanagedRouter},
};
use harmony_macros::{ip, mac_address};
use harmony_secret::{Secret, SecretManager};
use harmony_types::net::Url;
use serde::{Deserialize, Serialize};
#[tokio::main]
async fn main() {
@ -27,6 +30,26 @@ async fn main() {
name: String::from("opnsense-1"),
};
let switch_auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.expect("Failed to get credentials");
let switches: Vec<IpAddr> = vec![ip!("192.168.5.101")]; // TODO: Adjust me
let brocade_options = Some(BrocadeOptions {
dry_run: *harmony::config::DRY_RUN,
..Default::default()
});
let switch_client = BrocadeSwitchClient::init(
&switches,
&switch_auth.username,
&switch_auth.password,
brocade_options,
)
.await
.expect("Failed to connect to switch");
let switch_client = Arc::new(switch_client);
let opnsense = Arc::new(
harmony::infra::opnsense::OPNSenseFirewall::new(firewall, None, "root", "opnsense").await,
);
@ -54,7 +77,7 @@ async fn main() {
name: "cp0".to_string(),
},
workers: vec![],
switch: vec![],
switch_client: switch_client.clone(),
};
let inventory = Inventory {
@ -109,3 +132,9 @@ async fn main() {
.await
.unwrap();
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}

View File

@ -0,0 +1,11 @@
[package]
name = "example-remove-rook-osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@ -0,0 +1,18 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@ -3,7 +3,7 @@ use harmony::{
modules::{
application::{
ApplicationScore, RustWebFramework, RustWebapp,
features::{PackagingDeployment, rhob_monitoring::Monitoring},
features::{Monitoring, PackagingDeployment},
},
monitoring::alert_channel::discord_alert_channel::DiscordWebhook,
},

View File

@ -12,11 +12,11 @@ pub type FirewallGroup = Vec<PhysicalHost>;
pub struct PhysicalHost {
pub id: Id,
pub category: HostCategory,
pub network: Vec<NetworkInterface>, // FIXME: Don't use harmony_inventory_agent::NetworkInterface
pub storage: Vec<StorageDrive>, // FIXME: Don't use harmony_inventory_agent::StorageDrive
pub network: Vec<NetworkInterface>,
pub storage: Vec<StorageDrive>,
pub labels: Vec<Label>,
pub memory_modules: Vec<MemoryModule>, // FIXME: Don't use harmony_inventory_agent::MemoryModule
pub cpus: Vec<CPU>, // FIXME: Don't use harmony_inventory_agent::CPU
pub memory_modules: Vec<MemoryModule>,
pub cpus: Vec<CPU>,
}
impl PhysicalHost {

View File

@ -30,6 +30,7 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephRemoveOsd,
DiscoverInventoryAgent,
CephClusterHealth,
Custom(&'static str),
@ -61,6 +62,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
InterpretName::Custom(name) => f.write_str(name),

View File

@ -1,10 +1,9 @@
use async_trait::async_trait;
use brocade::BrocadeOptions;
use harmony_macros::ip;
use harmony_secret::SecretManager;
use harmony_types::net::MacAddress;
use harmony_types::net::Url;
use harmony_types::switch::PortLocation;
use harmony_types::{
net::{MacAddress, Url},
switch::PortLocation,
};
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ObjectMeta;
use log::debug;
@ -13,44 +12,20 @@ use log::info;
use crate::data::FileContent;
use crate::executors::ExecutorError;
use crate::hardware::PhysicalHost;
use crate::infra::brocade::BrocadeSwitchAuth;
use crate::infra::brocade::BrocadeSwitchClient;
use crate::modules::okd::crd::InstallPlanApproval;
use crate::modules::okd::crd::OperatorGroup;
use crate::modules::okd::crd::OperatorGroupSpec;
use crate::modules::okd::crd::Subscription;
use crate::modules::okd::crd::SubscriptionSpec;
use crate::modules::okd::crd::nmstate;
use crate::modules::okd::crd::nmstate::NMState;
use crate::modules::okd::crd::nmstate::NodeNetworkConfigurationPolicy;
use crate::modules::okd::crd::nmstate::NodeNetworkConfigurationPolicySpec;
use crate::modules::okd::crd::{
InstallPlanApproval, OperatorGroup, OperatorGroupSpec, Subscription, SubscriptionSpec,
nmstate::{self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec},
};
use crate::topology::PxeOptions;
use super::DHCPStaticEntry;
use super::DhcpServer;
use super::DnsRecord;
use super::DnsRecordType;
use super::DnsServer;
use super::Firewall;
use super::HostNetworkConfig;
use super::HttpServer;
use super::IpAddress;
use super::K8sclient;
use super::LoadBalancer;
use super::LoadBalancerService;
use super::LogicalHost;
use super::PreparationError;
use super::PreparationOutcome;
use super::Router;
use super::Switch;
use super::SwitchClient;
use super::SwitchError;
use super::TftpServer;
use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost,
PreparationError, PreparationOutcome, Router, Switch, SwitchClient, SwitchError, TftpServer,
Topology, k8s::K8sClient,
};
use super::Topology;
use super::k8s::K8sClient;
use std::collections::BTreeMap;
use std::net::IpAddr;
use std::sync::Arc;
#[derive(Debug, Clone)]
@ -63,10 +38,10 @@ pub struct HAClusterTopology {
pub tftp_server: Arc<dyn TftpServer>,
pub http_server: Arc<dyn HttpServer>,
pub dns_server: Arc<dyn DnsServer>,
pub switch_client: Arc<dyn SwitchClient>,
pub bootstrap_host: LogicalHost,
pub control_plane: Vec<LogicalHost>,
pub workers: Vec<LogicalHost>,
pub switch: Vec<LogicalHost>,
}
#[async_trait]
@ -300,36 +275,15 @@ impl HAClusterTopology {
}
}
async fn get_switch_client(&self) -> Result<Box<dyn SwitchClient>, SwitchError> {
let auth = SecretManager::get_or_prompt::<BrocadeSwitchAuth>()
.await
.map_err(|e| SwitchError::new(format!("Failed to get credentials: {e}")))?;
// FIXME: We assume Brocade switches
let switches: Vec<IpAddr> = self.switch.iter().map(|s| s.ip).collect();
let brocade_options = Some(BrocadeOptions {
dry_run: *crate::config::DRY_RUN,
..Default::default()
});
let client =
BrocadeSwitchClient::init(&switches, &auth.username, &auth.password, brocade_options)
.await
.map_err(|e| SwitchError::new(format!("Failed to connect to switch: {e}")))?;
Ok(Box::new(client))
}
async fn configure_port_channel(
&self,
host: &PhysicalHost,
config: &HostNetworkConfig,
) -> Result<(), SwitchError> {
debug!("Configuring port channel: {config:#?}");
let client = self.get_switch_client().await?;
let switch_ports = config.switch_ports.iter().map(|s| s.port.clone()).collect();
client
self.switch_client
.configure_port_channel(&format!("Harmony_{}", host.id), switch_ports)
.await
.map_err(|e| SwitchError::new(format!("Failed to configure switch: {e}")))?;
@ -353,10 +307,10 @@ impl HAClusterTopology {
tftp_server: dummy_infra.clone(),
http_server: dummy_infra.clone(),
dns_server: dummy_infra.clone(),
switch_client: dummy_infra.clone(),
bootstrap_host: dummy_host,
control_plane: vec![],
workers: vec![],
switch: vec![],
}
}
}
@ -514,8 +468,7 @@ impl HttpServer for HAClusterTopology {
#[async_trait]
impl Switch for HAClusterTopology {
async fn setup_switch(&self) -> Result<(), SwitchError> {
let client = self.get_switch_client().await?;
client.setup().await?;
self.switch_client.setup().await?;
Ok(())
}
@ -523,8 +476,7 @@ impl Switch for HAClusterTopology {
&self,
mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
let client = self.get_switch_client().await?;
let port = client.find_port(mac_address).await?;
let port = self.switch_client.find_port(mac_address).await?;
Ok(port)
}
@ -533,7 +485,7 @@ impl Switch for HAClusterTopology {
host: &PhysicalHost,
config: HostNetworkConfig,
) -> Result<(), SwitchError> {
// self.configure_bond(host, &config).await?;
self.configure_bond(host, &config).await?;
self.configure_port_channel(host, &config).await
}
}
@ -724,3 +676,25 @@ impl DnsServer for DummyInfra {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}
#[async_trait]
impl SwitchClient for DummyInfra {
async fn setup(&self) -> Result<(), SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn find_port(
&self,
_mac_address: &MacAddress,
) -> Result<Option<PortLocation>, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn configure_port_channel(
&self,
_channel_name: &str,
_switch_ports: Vec<PortLocation>,
) -> Result<u8, SwitchError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}

View File

@ -5,11 +5,12 @@ use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{
apps::v1::Deployment,
core::v1::{Pod, PodStatus},
core::v1::{Pod, ServiceAccount},
},
apimachinery::pkg::version::Info,
};
use kube::{
Client, Config, Error, Resource,
Client, Config, Discovery, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
@ -21,9 +22,9 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, trace};
use log::{debug, error, info, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json};
use serde_json::json;
use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep};
@ -59,6 +60,22 @@ impl K8sClient {
})
}
pub async fn service_account_api(&self, namespace: &str) -> Api<ServiceAccount> {
let api: Api<ServiceAccount> = Api::namespaced(self.client.clone(), namespace);
api
}
pub async fn get_apiserver_version(&self) -> Result<Info, Error> {
let client: Client = self.client.clone();
let version_info: Info = client.apiserver_version().await?;
Ok(version_info)
}
pub async fn discovery(&self) -> Result<Discovery, Error> {
let discovery: Discovery = Discovery::new(self.client.clone()).run().await?;
Ok(discovery)
}
pub async fn get_resource_json_value(
&self,
name: &str,
@ -80,10 +97,13 @@ impl K8sClient {
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
debug!("getting namespaced deployment");
Api::namespaced(self.client.clone(), ns)
} else {
debug!("getting default namespace deployment");
Api::default_namespaced(self.client.clone())
};
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
Ok(deps.get_opt(name).await?)
}
@ -114,7 +134,7 @@ impl K8sClient {
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
let scale = Patch::Merge(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}

View File

@ -1,7 +1,12 @@
use std::{process::Command, sync::Arc};
use std::{collections::BTreeMap, process::Command, sync::Arc};
use async_trait::async_trait;
use kube::api::GroupVersionKind;
use base64::{Engine, engine::general_purpose};
use k8s_openapi::api::{
core::v1::Secret,
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
};
use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta};
use log::{debug, info, warn};
use serde::Serialize;
use tokio::sync::OnceCell;
@ -12,14 +17,26 @@ use crate::{
inventory::Inventory,
modules::{
k3d::K3DInstallationScore,
monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
prometheus_operator::prometheus_operator_helm_chart_score,
rhob_alertmanager_config::RHOBObservability,
k8s::ingress::{K8sIngressScore, PathType},
monitoring::{
grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score},
kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_grafana::{
Grafana as GrafanaCRD, GrafanaCom, GrafanaDashboard,
GrafanaDashboardDatasource, GrafanaDashboardSpec, GrafanaDatasource,
GrafanaDatasourceConfig, GrafanaDatasourceJsonData,
GrafanaDatasourceSecureJsonData, GrafanaDatasourceSpec, GrafanaSpec,
},
crd_prometheuses::LabelSelector,
prometheus_operator::prometheus_operator_helm_chart_score,
rhob_alertmanager_config::RHOBObservability,
service_monitor::ServiceMonitor,
},
},
prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
prometheus::PrometheusApplicationMonitoring, rhob_alerting_score::RHOBAlertingScore,
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
},
},
score::Score,
@ -47,6 +64,13 @@ struct K8sState {
message: String,
}
#[derive(Debug, Clone)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
@ -57,6 +81,7 @@ enum K8sSource {
pub struct K8sAnywhereTopology {
k8s_state: Arc<OnceCell<Option<K8sState>>>,
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
config: Arc<K8sAnywhereConfig>,
}
@ -78,41 +103,172 @@ impl K8sclient for K8sAnywhereTopology {
}
#[async_trait]
impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology {
impl Grafana for K8sAnywhereTopology {
async fn ensure_grafana_operator(
&self,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
debug!("ensure grafana operator");
let client = self.k8s_client().await.unwrap();
let grafana_gvk = GroupVersionKind {
group: "grafana.integreatly.org".to_string(),
version: "v1beta1".to_string(),
kind: "Grafana".to_string(),
};
let name = "grafanas.grafana.integreatly.org";
let ns = "grafana";
let grafana_crd = client
.get_resource_json_value(name, Some(ns), &grafana_gvk)
.await;
match grafana_crd {
Ok(_) => {
return Ok(PreparationOutcome::Success {
details: "Found grafana CRDs in cluster".to_string(),
});
}
Err(_) => {
return self
.install_grafana_operator(inventory, Some("grafana"))
.await;
}
};
}
async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError> {
let ns = "grafana";
let mut label = BTreeMap::new();
label.insert("dashboards".to_string(), "grafana".to_string());
let label_selector = LabelSelector {
match_labels: label.clone(),
match_expressions: vec![],
};
let client = self.k8s_client().await?;
let grafana = self.build_grafana(ns, &label);
client.apply(&grafana, Some(ns)).await?;
//TODO change this to a ensure ready or something better than just a timeout
client
.wait_until_deployment_ready(
"grafana-grafana-deployment".to_string(),
Some("grafana"),
Some(30),
)
.await?;
let sa_name = "grafana-grafana-sa";
let token_secret_name = "grafana-sa-token-secret";
let sa_token_secret = self.build_sa_token_secret(token_secret_name, sa_name, ns);
client.apply(&sa_token_secret, Some(ns)).await?;
let secret_gvk = GroupVersionKind {
group: "".to_string(),
version: "v1".to_string(),
kind: "Secret".to_string(),
};
let secret = client
.get_resource_json_value(token_secret_name, Some(ns), &secret_gvk)
.await?;
let token = format!(
"Bearer {}",
self.extract_and_normalize_token(&secret).unwrap()
);
debug!("creating grafana clusterrole binding");
let clusterrolebinding =
self.build_cluster_rolebinding(sa_name, "cluster-monitoring-view", ns);
client.apply(&clusterrolebinding, Some(ns)).await?;
debug!("creating grafana datasource crd");
let thanos_url = format!(
"https://{}",
self.get_domain("thanos-querier-openshift-monitoring")
.await
.unwrap()
);
let thanos_openshift_datasource = self.build_grafana_datasource(
"thanos-openshift-monitoring",
ns,
&label_selector,
&thanos_url,
&token,
);
client.apply(&thanos_openshift_datasource, Some(ns)).await?;
debug!("creating grafana dashboard crd");
let dashboard = self.build_grafana_dashboard(ns, &label_selector);
client.apply(&dashboard, Some(ns)).await?;
debug!("creating grafana ingress");
let grafana_ingress = self.build_grafana_ingress(ns).await;
grafana_ingress
.interpret(&Inventory::empty(), self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
Ok(PreparationOutcome::Success {
details: "Installed grafana composants".to_string(),
})
}
}
#[async_trait]
impl PrometheusMonitoring<CRDPrometheus> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &CRDPrometheus,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
_inventory: &Inventory,
_receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let client = self.k8s_client().await?;
for monitor in sender.service_monitor.iter() {
client
.apply(monitor, Some(&sender.namespace))
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
}
Ok(PreparationOutcome::Success {
details: "successfuly installed prometheus components".to_string(),
})
}
async fn ensure_prometheus_operator(
&self,
sender: &CRDPrometheus,
_inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_prometheus_operator(sender).await?;
if po_result == PreparationOutcome::Noop {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
let result = self
.get_k8s_prometheus_application_score(sender.clone(), receivers)
.await
.interpret(inventory, self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: outcome.message,
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
match po_result {
PreparationOutcome::Success { details: _ } => {
debug!("Detected prometheus crds operator present in cluster.");
return Ok(po_result);
}
PreparationOutcome::Noop => {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
}
}
}
#[async_trait]
impl PrometheusApplicationMonitoring<RHOBObservability> for K8sAnywhereTopology {
impl PrometheusMonitoring<RHOBObservability> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &RHOBObservability,
@ -146,6 +302,14 @@ impl PrometheusApplicationMonitoring<RHOBObservability> for K8sAnywhereTopology
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
async fn ensure_prometheus_operator(
&self,
sender: &RHOBObservability,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}
impl Serialize for K8sAnywhereTopology {
@ -162,6 +326,7 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(K8sAnywhereConfig::from_env()),
}
}
@ -170,10 +335,216 @@ impl K8sAnywhereTopology {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(config),
}
}
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
let client = self.k8s_client().await.unwrap();
let discovery = client.discovery().await.map_err(|e| {
PreparationError::new(format!("Could not discover API groups: {}", e))
})?;
let version = client.get_apiserver_version().await.map_err(|e| {
PreparationError::new(format!("Could not get server version: {}", e))
})?;
// OpenShift / OKD
if discovery
.groups()
.any(|g| g.name() == "project.openshift.io")
{
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
return Ok(KubernetesDistribution::K3sFamily);
}
return Ok(KubernetesDistribution::Default);
})
.await
}
fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option<String> {
let token_b64 = secret
.data
.get("token")
.or_else(|| secret.data.get("data").and_then(|d| d.get("token")))
.and_then(|v| v.as_str())?;
let bytes = general_purpose::STANDARD.decode(token_b64).ok()?;
let s = String::from_utf8(bytes).ok()?;
let cleaned = s
.trim_matches(|c: char| c.is_whitespace() || c == '\0')
.to_string();
Some(cleaned)
}
pub fn build_cluster_rolebinding(
&self,
service_account_name: &str,
clusterrole_name: &str,
ns: &str,
) -> ClusterRoleBinding {
ClusterRoleBinding {
metadata: ObjectMeta {
name: Some(format!("{}-view-binding", service_account_name)),
..Default::default()
},
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".into(),
kind: "ClusterRole".into(),
name: clusterrole_name.into(),
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".into(),
name: service_account_name.into(),
namespace: Some(ns.into()),
..Default::default()
}]),
}
}
pub fn build_sa_token_secret(
&self,
secret_name: &str,
service_account_name: &str,
ns: &str,
) -> Secret {
let mut annotations = BTreeMap::new();
annotations.insert(
"kubernetes.io/service-account.name".to_string(),
service_account_name.to_string(),
);
Secret {
metadata: ObjectMeta {
name: Some(secret_name.into()),
namespace: Some(ns.into()),
annotations: Some(annotations),
..Default::default()
},
type_: Some("kubernetes.io/service-account-token".to_string()),
..Default::default()
}
}
fn build_grafana_datasource(
&self,
name: &str,
ns: &str,
label_selector: &LabelSelector,
url: &str,
token: &str,
) -> GrafanaDatasource {
let mut json_data = BTreeMap::new();
json_data.insert("timeInterval".to_string(), "5s".to_string());
GrafanaDatasource {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: GrafanaDatasourceSpec {
instance_selector: label_selector.clone(),
allow_cross_namespace_import: Some(true),
values_from: None,
datasource: GrafanaDatasourceConfig {
access: "proxy".to_string(),
name: name.to_string(),
r#type: "prometheus".to_string(),
url: url.to_string(),
database: None,
json_data: Some(GrafanaDatasourceJsonData {
time_interval: Some("60s".to_string()),
http_header_name1: Some("Authorization".to_string()),
tls_skip_verify: Some(true),
oauth_pass_thru: Some(true),
}),
secure_json_data: Some(GrafanaDatasourceSecureJsonData {
http_header_value1: Some(format!("Bearer {token}")),
}),
is_default: Some(false),
editable: Some(true),
},
},
}
}
fn build_grafana_dashboard(
&self,
ns: &str,
label_selector: &LabelSelector,
) -> GrafanaDashboard {
let graf_dashboard = GrafanaDashboard {
metadata: ObjectMeta {
name: Some(format!("grafana-dashboard-{}", ns)),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: GrafanaDashboardSpec {
resync_period: Some("30s".to_string()),
instance_selector: label_selector.clone(),
datasources: Some(vec![GrafanaDashboardDatasource {
input_name: "DS_PROMETHEUS".to_string(),
datasource_name: "thanos-openshift-monitoring".to_string(),
}]),
json: None,
grafana_com: Some(GrafanaCom {
id: 17406,
revision: None,
}),
},
};
graf_dashboard
}
fn build_grafana(&self, ns: &str, labels: &BTreeMap<String, String>) -> GrafanaCRD {
let grafana = GrafanaCRD {
metadata: ObjectMeta {
name: Some(format!("grafana-{}", ns)),
namespace: Some(ns.to_string()),
labels: Some(labels.clone()),
..Default::default()
},
spec: GrafanaSpec {
config: None,
admin_user: None,
admin_password: None,
ingress: None,
persistence: None,
resources: None,
},
};
grafana
}
async fn build_grafana_ingress(&self, ns: &str) -> K8sIngressScore {
let domain = self.get_domain(&format!("grafana-{}", ns)).await.unwrap();
let name = format!("{}-grafana", ns);
let backend_service = format!("grafana-{}-service", ns);
K8sIngressScore {
name: fqdn::fqdn!(&name),
host: fqdn::fqdn!(&domain),
backend_service: fqdn::fqdn!(&backend_service),
port: 3000,
path: Some("/".to_string()),
path_type: Some(PathType::Prefix),
namespace: Some(fqdn::fqdn!(&ns)),
ingress_class_name: Some("openshift-default".to_string()),
}
}
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,
@ -191,13 +562,14 @@ impl K8sAnywhereTopology {
&self,
sender: CRDPrometheus,
receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
service_monitors: Option<Vec<ServiceMonitor>>,
) -> K8sPrometheusCRDAlertingScore {
K8sPrometheusCRDAlertingScore {
return K8sPrometheusCRDAlertingScore {
sender,
receivers: receivers.unwrap_or_default(),
service_monitors: vec![],
service_monitors: service_monitors.unwrap_or_default(),
prometheus_rules: vec![],
}
};
}
async fn openshift_ingress_operator_available(&self) -> Result<(), PreparationError> {
@ -465,6 +837,30 @@ impl K8sAnywhereTopology {
details: "prometheus operator present in cluster".into(),
})
}
async fn install_grafana_operator(
&self,
inventory: &Inventory,
ns: Option<&str>,
) -> Result<PreparationOutcome, PreparationError> {
let namespace = ns.unwrap_or("grafana");
info!("installing grafana operator in ns {namespace}");
let tenant = self.get_k8s_tenant_manager()?.get_tenant_config().await;
let mut namespace_scope = false;
if tenant.is_some() {
namespace_scope = true;
}
let _grafana_operator_score = grafana_helm_chart_score(namespace, namespace_scope)
.interpret(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()));
Ok(PreparationOutcome::Success {
details: format!(
"Successfully installed grafana operator in ns {}",
ns.unwrap()
),
})
}
}
#[derive(Clone, Debug)]

View File

@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync {
&self,
service: &LoadBalancerService,
) -> Result<(), ExecutorError> {
debug!(
"Listing LoadBalancer services {:?}",
self.list_services().await
);
if !self.list_services().await.contains(service) {
self.add_service(service).await?;
}
self.add_service(service).await?;
Ok(())
}
}

View File

@ -1,4 +1,10 @@
use std::{error::Error, net::Ipv4Addr, str::FromStr, sync::Arc};
use std::{
error::Error,
fmt::{self, Debug},
net::Ipv4Addr,
str::FromStr,
sync::Arc,
};
use async_trait::async_trait;
use derive_new::new;
@ -19,8 +25,8 @@ pub struct DHCPStaticEntry {
pub ip: Ipv4Addr,
}
impl std::fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for DHCPStaticEntry {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
let mac = self
.mac
.iter()
@ -42,8 +48,8 @@ pub trait Firewall: Send + Sync {
fn get_host(&self) -> LogicalHost;
}
impl std::fmt::Debug for dyn Firewall {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Debug for dyn Firewall {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("Firewall {}", self.get_ip()))
}
}
@ -65,7 +71,7 @@ pub struct PxeOptions {
}
#[async_trait]
pub trait DhcpServer: Send + Sync + std::fmt::Debug {
pub trait DhcpServer: Send + Sync + Debug {
async fn add_static_mapping(&self, entry: &DHCPStaticEntry) -> Result<(), ExecutorError>;
async fn remove_static_mapping(&self, mac: &MacAddress) -> Result<(), ExecutorError>;
async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)>;
@ -104,8 +110,8 @@ pub trait DnsServer: Send + Sync {
}
}
impl std::fmt::Debug for dyn DnsServer {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl Debug for dyn DnsServer {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_fmt(format_args!("DnsServer {}", self.get_ip()))
}
}
@ -141,8 +147,8 @@ pub enum DnsRecordType {
TXT,
}
impl std::fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for DnsRecordType {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
DnsRecordType::A => write!(f, "A"),
DnsRecordType::AAAA => write!(f, "AAAA"),
@ -216,8 +222,8 @@ pub struct SwitchError {
msg: String,
}
impl std::fmt::Display for SwitchError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
impl fmt::Display for SwitchError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.msg)
}
}
@ -225,7 +231,7 @@ impl std::fmt::Display for SwitchError {
impl Error for SwitchError {}
#[async_trait]
pub trait SwitchClient: Send + Sync {
pub trait SwitchClient: Debug + Send + Sync {
/// Executes essential, idempotent, one-time initial configuration steps.
///
/// This is an opiniated procedure that setups a switch to provide high availability

View File

@ -21,6 +21,7 @@ pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
@ -30,6 +31,7 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
debug!("hit sender configure for AlertingInterpret");
self.sender.configure(inventory, topology).await?;
for receiver in self.receivers.iter() {
receiver.install(&self.sender).await?;
@ -38,6 +40,12 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
debug!("installing rule: {:#?}", rule);
rule.install(&self.sender).await?;
}
if let Some(targets) = &self.scrape_targets {
for target in targets.iter() {
debug!("installing scrape_target: {:#?}", target);
target.install(&self.sender).await?;
}
}
self.sender.ensure_installed(inventory, topology).await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
@ -77,6 +85,7 @@ pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
}
#[async_trait]
pub trait ScrapeTarget<S: AlertSender> {
async fn install(&self, sender: &S) -> Result<(), InterpretError>;
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
fn clone_box(&self) -> Box<dyn ScrapeTarget<S>>;
}

View File

@ -1,15 +1,14 @@
use async_trait::async_trait;
use brocade::{BrocadeClient, BrocadeOptions, InterSwitchLink, InterfaceStatus, PortOperatingMode};
use harmony_secret::Secret;
use harmony_types::{
net::{IpAddress, MacAddress},
switch::{PortDeclaration, PortLocation},
};
use option_ext::OptionExt;
use serde::{Deserialize, Serialize};
use crate::topology::{SwitchClient, SwitchError};
#[derive(Debug)]
pub struct BrocadeSwitchClient {
brocade: Box<dyn BrocadeClient + Send + Sync>,
}
@ -114,12 +113,6 @@ impl SwitchClient for BrocadeSwitchClient {
}
}
#[derive(Secret, Serialize, Deserialize, Debug)]
pub struct BrocadeSwitchAuth {
pub username: String,
pub password: String,
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, Mutex};
@ -235,7 +228,7 @@ mod tests {
assert_that!(*configured_interfaces).is_empty();
}
#[derive(Clone)]
#[derive(Debug, Clone)]
struct FakeBrocadeClient {
stack_topology: Vec<InterSwitchLink>,
interfaces: Vec<InterfaceInfo>,

View File

@ -26,19 +26,13 @@ impl LoadBalancer for OPNSenseFirewall {
}
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
warn!(
"TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
);
let mut config = self.opnsense_config.write().await;
let mut load_balancer = config.load_balancer();
let (frontend, backend, servers, healthcheck) =
harmony_load_balancer_service_to_haproxy_xml(service);
let mut load_balancer = config.load_balancer();
load_balancer.add_backend(backend);
load_balancer.add_frontend(frontend);
load_balancer.add_servers(servers);
if let Some(healthcheck) = healthcheck {
load_balancer.add_healthcheck(healthcheck);
}
load_balancer.configure_service(frontend, backend, servers, healthcheck);
Ok(())
}
@ -106,7 +100,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
.backends
.backends
.iter()
.find(|b| b.uuid == frontend.default_backend);
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
let mut health_check = None;
match matching_backend {
@ -116,8 +110,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
}
None => {
warn!(
"HAProxy config could not find a matching backend for frontend {:?}",
frontend
"HAProxy config could not find a matching backend for frontend {frontend:?}"
);
}
}
@ -152,11 +145,11 @@ pub(crate) fn get_servers_for_backend(
.servers
.iter()
.filter_map(|server| {
let address = server.address.clone()?;
let port = server.port?;
if backend_servers.contains(&server.uuid.as_str()) {
return Some(BackendServer {
address: server.address.clone(),
port: server.port,
});
return Some(BackendServer { address, port });
}
None
})
@ -347,7 +340,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
name: format!("frontend_{}", service.listening_port),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: backend.uuid.clone(),
default_backend: Some(backend.uuid.clone()),
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
@ -361,8 +354,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
uuid: Uuid::new_v4().to_string(),
name: format!("{}_{}", &server.address, &server.port),
enabled: 1,
address: server.address.clone(),
port: server.port,
address: Some(server.address.clone()),
port: Some(server.port),
mode: "active".to_string(),
server_type: "static".to_string(),
..Default::default()
@ -385,8 +378,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@ -411,8 +404,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@ -431,8 +424,8 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
address: Some("192.168.1.1".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
@ -453,16 +446,16 @@ mod tests {
let mut haproxy = HAProxy::default();
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "some-hostname.test.mcd".to_string(),
port: 80,
address: Some("some-hostname.test.mcd".to_string()),
port: Some(80),
..Default::default()
};
haproxy.servers.servers.push(server);
let server = HAProxyServer {
uuid: "server2".to_string(),
address: "192.168.1.2".to_string(),
port: 8080,
address: Some("192.168.1.2".to_string()),
port: Some(8080),
..Default::default()
};
haproxy.servers.servers.push(server);

View File

@ -2,7 +2,11 @@ use crate::modules::application::{
Application, ApplicationFeature, InstallationError, InstallationOutcome,
};
use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore;
use crate::modules::monitoring::grafana::grafana::Grafana;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus;
use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
ServiceMonitor, ServiceMonitorSpec,
};
use crate::topology::MultiTargetTopology;
use crate::topology::ingress::Ingress;
use crate::{
@ -14,7 +18,7 @@ use crate::{
topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager},
};
use crate::{
modules::prometheus::prometheus::PrometheusApplicationMonitoring,
modules::prometheus::prometheus::PrometheusMonitoring,
topology::oberservability::monitoring::AlertReceiver,
};
use async_trait::async_trait;
@ -22,6 +26,7 @@ use base64::{Engine as _, engine::general_purpose};
use harmony_secret::SecretManager;
use harmony_secret_derive::Secret;
use harmony_types::net::Url;
use kube::api::ObjectMeta;
use log::{debug, info};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
@ -40,7 +45,8 @@ impl<
+ TenantManager
+ K8sclient
+ MultiTargetTopology
+ PrometheusApplicationMonitoring<CRDPrometheus>
+ PrometheusMonitoring<CRDPrometheus>
+ Grafana
+ Ingress
+ std::fmt::Debug,
> ApplicationFeature<T> for Monitoring
@ -57,10 +63,20 @@ impl<
.unwrap_or_else(|| self.application.name());
let domain = topology.get_domain("ntfy").await.unwrap();
let app_service_monitor = ServiceMonitor {
metadata: ObjectMeta {
name: Some(self.application.name()),
namespace: Some(namespace.clone()),
..Default::default()
},
spec: ServiceMonitorSpec::default(),
};
let mut alerting_score = ApplicationMonitoringScore {
sender: CRDPrometheus {
namespace: namespace.clone(),
client: topology.k8s_client().await.unwrap(),
service_monitor: vec![app_service_monitor],
},
application: self.application.clone(),
receivers: self.alert_receiver.clone(),

View File

@ -18,7 +18,7 @@ use crate::{
topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager},
};
use crate::{
modules::prometheus::prometheus::PrometheusApplicationMonitoring,
modules::prometheus::prometheus::PrometheusMonitoring,
topology::oberservability::monitoring::AlertReceiver,
};
use async_trait::async_trait;
@ -42,7 +42,7 @@ impl<
+ MultiTargetTopology
+ Ingress
+ std::fmt::Debug
+ PrometheusApplicationMonitoring<RHOBObservability>,
+ PrometheusMonitoring<RHOBObservability>,
> ApplicationFeature<T> for Monitoring
{
async fn ensure_installed(

View File

@ -0,0 +1,209 @@
use std::sync::Arc;
use async_trait::async_trait;
use harmony_types::id::Id;
use kube::{CustomResource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]
pub struct ClusterIssuerScore {
email: String,
server: String,
issuer_name: String,
namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
fn name(&self) -> String {
"ClusterIssuerScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ClusterIssuerInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct ClusterIssuerInterpret {
score: ClusterIssuerScore,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
.await
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("ClusterIssuer")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl ClusterIssuerInterpret {
async fn validate_cert_manager(
&self,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let cert_manager = "cert-manager".to_string();
let operator_namespace = "openshift-operators".to_string();
match client
.get_deployment(&cert_manager, Some(&operator_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&cert_manager, ready_count
)));
} else {
return Err(InterpretError::new(
"cert-manager operator not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {} in ns {}",
&cert_manager, &operator_namespace
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&cert_manager, &operator_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&cert_manager, e
))),
}
}
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
let issuer_name = &self.score.issuer_name;
let email = &self.score.email;
let server = &self.score.server;
let namespace = &self.score.namespace;
let cluster_issuer = ClusterIssuer {
metadata: ObjectMeta {
name: Some(issuer_name.to_string()),
namespace: Some(namespace.to_string()),
..Default::default()
},
spec: ClusterIssuerSpec {
acme: AcmeSpec {
email: email.to_string(),
private_key_secret_ref: PrivateKeySecretRef {
name: issuer_name.to_string(),
},
server: server.to_string(),
solvers: vec![SolverSpec {
http01: Some(Http01Solver {
ingress: Http01Ingress {
class: "nginx".to_string(),
},
}),
}],
},
},
};
Ok(cluster_issuer)
}
pub async fn apply_cluster_issuer(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let namespace = self.score.namespace.clone();
self.validate_cert_manager(&client).await?;
let cluster_issuer = self.build_cluster_issuer().unwrap();
client
.apply_yaml(
&serde_yaml::to_value(cluster_issuer).unwrap(),
Some(&namespace),
)
.await?;
Ok(Outcome::success(format!(
"successfully deployed cluster operator: {} in namespace: {}",
self.score.issuer_name, self.score.namespace
)))
}
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "cert-manager.io",
version = "v1",
kind = "ClusterIssuer",
plural = "clusterissuers"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIssuerSpec {
pub acme: AcmeSpec,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AcmeSpec {
pub email: String,
pub private_key_secret_ref: PrivateKeySecretRef,
pub server: String,
pub solvers: Vec<SolverSpec>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PrivateKeySecretRef {
pub name: String,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SolverSpec {
pub http01: Option<Http01Solver>,
// Other solver types (e.g., dns01) would go here as Options
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Solver {
pub ingress: Http01Ingress,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Ingress {
pub class: String,
}

View File

@ -1,2 +1,3 @@
pub mod cluster_issuer;
mod helm;
pub use helm::*;

View File

@ -1,21 +1,23 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
interpret::Interpret,
modules::{
application::Application,
monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus,
prometheus::prometheus::PrometheusApplicationMonitoring,
monitoring::{
grafana::grafana::Grafana, kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus,
},
prometheus::prometheus::PrometheusMonitoring,
},
score::Score,
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver},
topology::{
K8sclient, Topology,
oberservability::monitoring::{AlertReceiver, AlertingInterpret, ScrapeTarget},
},
};
use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct ApplicationMonitoringScore {
@ -24,12 +26,16 @@ pub struct ApplicationMonitoringScore {
pub receivers: Vec<Box<dyn AlertReceiver<CRDPrometheus>>>,
}
impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
impl<T: Topology + PrometheusMonitoring<CRDPrometheus> + K8sclient + Grafana> Score<T>
for ApplicationMonitoringScore
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ApplicationMonitoringInterpret {
score: self.clone(),
debug!("creating alerting interpret");
Box::new(AlertingInterpret {
sender: self.sender.clone(),
receivers: self.receivers.clone(),
rules: vec![],
scrape_targets: None,
})
}
@ -40,55 +46,3 @@ impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
)
}
}
#[derive(Debug)]
pub struct ApplicationMonitoringInterpret {
score: ApplicationMonitoringScore,
}
#[async_trait]
impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T>
for ApplicationMonitoringInterpret
{
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let result = topology
.install_prometheus(
&self.score.sender,
inventory,
Some(self.score.receivers.clone()),
)
.await;
match result {
Ok(outcome) => match outcome {
PreparationOutcome::Success { details: _ } => {
Ok(Outcome::success("Prometheus installed".into()))
}
PreparationOutcome::Noop => {
Ok(Outcome::noop("Prometheus installation skipped".into()))
}
},
Err(err) => Err(InterpretError::from(err)),
}
}
fn get_name(&self) -> InterpretName {
InterpretName::ApplicationMonitoring
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@ -12,7 +12,7 @@ use crate::{
monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, rhob_alertmanager_config::RHOBObservability,
},
prometheus::prometheus::PrometheusApplicationMonitoring,
prometheus::prometheus::PrometheusMonitoring,
},
score::Score,
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver},
@ -26,7 +26,7 @@ pub struct ApplicationRHOBMonitoringScore {
pub receivers: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
}
impl<T: Topology + PrometheusApplicationMonitoring<RHOBObservability>> Score<T>
impl<T: Topology + PrometheusMonitoring<RHOBObservability>> Score<T>
for ApplicationRHOBMonitoringScore
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
@ -49,7 +49,7 @@ pub struct ApplicationRHOBMonitoringInterpret {
}
#[async_trait]
impl<T: Topology + PrometheusApplicationMonitoring<RHOBObservability>> Interpret<T>
impl<T: Topology + PrometheusMonitoring<RHOBObservability>> Interpret<T>
for ApplicationRHOBMonitoringInterpret
{
async fn execute(

View File

@ -0,0 +1,17 @@
use async_trait::async_trait;
use k8s_openapi::Resource;
use crate::{
inventory::Inventory,
topology::{PreparationError, PreparationOutcome},
};
#[async_trait]
pub trait Grafana {
async fn ensure_grafana_operator(
&self,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError>;
async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError>;
}

View File

@ -1,27 +1,28 @@
use harmony_macros::hurl;
use non_blank_string_rs::NonBlankString;
use std::str::FromStr;
use std::{collections::HashMap, str::FromStr};
use crate::modules::helm::chart::HelmChartScore;
pub fn grafana_helm_chart_score(ns: &str) -> HelmChartScore {
let values = r#"
rbac:
namespaced: true
sidecar:
dashboards:
enabled: true
"#
.to_string();
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
pub fn grafana_helm_chart_score(ns: &str, namespace_scope: bool) -> HelmChartScore {
let mut values_overrides = HashMap::new();
values_overrides.insert(
NonBlankString::from_str("namespaceScope").unwrap(),
namespace_scope.to_string(),
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(ns).unwrap()),
release_name: NonBlankString::from_str("grafana").unwrap(),
chart_name: NonBlankString::from_str("oci://ghcr.io/grafana/helm-charts/grafana").unwrap(),
release_name: NonBlankString::from_str("grafana-operator").unwrap(),
chart_name: NonBlankString::from_str("grafana/grafana-operator").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(values.to_string()),
values_overrides: Some(values_overrides),
values_yaml: None,
create_namespace: true,
install_only: true,
repository: None,
repository: Some(HelmRepository::new(
"grafana".to_string(),
hurl!("https://grafana.github.io/helm-charts"),
true,
)),
}
}

View File

@ -1 +1,2 @@
pub mod grafana;
pub mod helm;

View File

@ -1,12 +1,25 @@
use std::sync::Arc;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::topology::{
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertSender},
use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
modules::{
monitoring::{
grafana::grafana::Grafana, kube_prometheus::crd::service_monitor::ServiceMonitor,
},
prometheus::prometheus::PrometheusMonitoring,
},
topology::{
K8sclient, Topology,
installable::Installable,
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertSender, ScrapeTarget},
},
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
@ -26,6 +39,7 @@ pub struct AlertmanagerConfigSpec {
pub struct CRDPrometheus {
pub namespace: String,
pub client: Arc<K8sClient>,
pub service_monitor: Vec<ServiceMonitor>,
}
impl AlertSender for CRDPrometheus {
@ -40,6 +54,12 @@ impl Clone for Box<dyn AlertReceiver<CRDPrometheus>> {
}
}
impl Clone for Box<dyn ScrapeTarget<CRDPrometheus>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl Serialize for Box<dyn AlertReceiver<CRDPrometheus>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
@ -48,3 +68,24 @@ impl Serialize for Box<dyn AlertReceiver<CRDPrometheus>> {
todo!()
}
}
#[async_trait]
impl<T: Topology + K8sclient + PrometheusMonitoring<CRDPrometheus> + Grafana> Installable<T>
for CRDPrometheus
{
async fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError> {
topology.ensure_grafana_operator(inventory).await?;
topology.ensure_prometheus_operator(self, inventory).await?;
Ok(())
}
async fn ensure_installed(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
topology.install_grafana().await?;
topology.install_prometheus(&self, inventory, None).await?;
Ok(())
}
}

View File

@ -103,9 +103,34 @@ pub struct GrafanaDashboardSpec {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub resync_period: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub datasources: Option<Vec<GrafanaDashboardDatasource>>,
pub instance_selector: LabelSelector,
pub json: String,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub json: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub grafana_com: Option<GrafanaCom>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDashboardDatasource {
pub input_name: String,
pub datasource_name: String,
}
// ------------------------------------------------------------------------------------------------
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaCom {
pub id: u32,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub revision: Option<u32>,
}
// ------------------------------------------------------------------------------------------------
@ -126,20 +151,79 @@ pub struct GrafanaDatasourceSpec {
pub allow_cross_namespace_import: Option<bool>,
pub datasource: GrafanaDatasourceConfig,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub values_from: Option<Vec<GrafanaValueFrom>>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaValueFrom {
pub target_path: String,
pub value_from: GrafanaValueSource,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaValueSource {
pub secret_key_ref: GrafanaSecretKeyRef,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaSecretKeyRef {
pub name: String,
pub key: String,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceConfig {
pub access: String,
pub database: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub json_data: Option<BTreeMap<String, String>>,
pub database: Option<String>,
pub name: String,
pub r#type: String,
pub url: String,
/// Represents jsonData in the GrafanaDatasource spec
#[serde(default, skip_serializing_if = "Option::is_none")]
pub json_data: Option<GrafanaDatasourceJsonData>,
/// Represents secureJsonData (secrets)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub secure_json_data: Option<GrafanaDatasourceSecureJsonData>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub is_default: Option<bool>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub editable: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceJsonData {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub time_interval: Option<String>,
#[serde(default, skip_serializing_if = "Option::is_none")]
pub http_header_name1: Option<String>,
/// Disable TLS skip verification (false = verify)
#[serde(default, skip_serializing_if = "Option::is_none")]
pub tls_skip_verify: Option<bool>,
/// Auth type - set to "forward" for OpenShift OAuth identity
#[serde(default, skip_serializing_if = "Option::is_none")]
pub oauth_pass_thru: Option<bool>,
}
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrafanaDatasourceSecureJsonData {
#[serde(default, skip_serializing_if = "Option::is_none")]
pub http_header_value1: Option<String>,
}
// ------------------------------------------------------------------------------------------------
#[derive(Serialize, Deserialize, Debug, Clone, JsonSchema, Default)]

View File

@ -0,0 +1,187 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector,
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1alpha1",
kind = "ScrapeConfig",
plural = "scrapeconfigs",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ScrapeConfigSpec {
/// List of static configurations.
pub static_configs: Option<Vec<StaticConfig>>,
/// Kubernetes service discovery.
pub kubernetes_sd_configs: Option<Vec<KubernetesSDConfig>>,
/// HTTP-based service discovery.
pub http_sd_configs: Option<Vec<HttpSDConfig>>,
/// File-based service discovery.
pub file_sd_configs: Option<Vec<FileSDConfig>>,
/// DNS-based service discovery.
pub dns_sd_configs: Option<Vec<DnsSDConfig>>,
/// Consul service discovery.
pub consul_sd_configs: Option<Vec<ConsulSDConfig>>,
/// Relabeling configuration applied to discovered targets.
pub relabel_configs: Option<Vec<RelabelConfig>>,
/// Metric relabeling configuration applied to scraped samples.
pub metric_relabel_configs: Option<Vec<RelabelConfig>>,
/// Path to scrape metrics from (defaults to `/metrics`).
pub metrics_path: Option<String>,
/// Interval at which Prometheus scrapes targets (e.g., "30s").
pub scrape_interval: Option<String>,
/// Timeout for scraping (e.g., "10s").
pub scrape_timeout: Option<String>,
/// Optional job name override.
pub job_name: Option<String>,
/// Optional scheme (http or https).
pub scheme: Option<String>,
/// Authorization paramaters for snmp walk
pub params: Option<Params>,
}
/// Static configuration section of a ScrapeConfig.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct StaticConfig {
pub targets: Vec<String>,
pub labels: Option<LabelSelector>,
}
/// Relabeling configuration for target or metric relabeling.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct RelabelConfig {
pub source_labels: Option<Vec<String>>,
pub separator: Option<String>,
pub target_label: Option<String>,
pub regex: Option<String>,
pub modulus: Option<u64>,
pub replacement: Option<String>,
pub action: Option<String>,
}
/// Kubernetes service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct KubernetesSDConfig {
///"pod", "service", "endpoints"pub role: String,
pub namespaces: Option<NamespaceSelector>,
pub selectors: Option<Vec<LabelSelector>>,
pub api_server: Option<String>,
pub bearer_token_file: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Namespace selector for Kubernetes service discovery.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct NamespaceSelector {
pub any: Option<bool>,
pub match_names: Option<Vec<String>>,
}
/// HTTP-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct HttpSDConfig {
pub url: String,
pub refresh_interval: Option<String>,
pub basic_auth: Option<BasicAuth>,
pub authorization: Option<Authorization>,
pub tls_config: Option<TLSConfig>,
}
/// File-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct FileSDConfig {
pub files: Vec<String>,
pub refresh_interval: Option<String>,
}
/// DNS-based service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct DnsSDConfig {
pub names: Vec<String>,
pub refresh_interval: Option<String>,
pub type_: Option<String>, // SRV, A, AAAA
pub port: Option<u16>,
}
/// Consul service discovery configuration.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ConsulSDConfig {
pub server: String,
pub services: Option<Vec<String>>,
pub scheme: Option<String>,
pub datacenter: Option<String>,
pub tag_separator: Option<String>,
pub refresh_interval: Option<String>,
pub tls_config: Option<TLSConfig>,
}
/// Basic authentication credentials.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct BasicAuth {
pub username: String,
pub password: Option<String>,
pub password_file: Option<String>,
}
/// Bearer token or other auth mechanisms.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Authorization {
pub credentials: Option<String>,
pub credentials_file: Option<String>,
pub type_: Option<String>,
}
/// TLS configuration for secure scraping.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct TLSConfig {
pub ca_file: Option<String>,
pub cert_file: Option<String>,
pub key_file: Option<String>,
pub server_name: Option<String>,
pub insecure_skip_verify: Option<bool>,
}
/// Authorization parameters for SNMP walk.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Params {
pub auth: Option<Vec<String>>,
pub module: Option<Vec<String>>,
}

View File

@ -4,6 +4,7 @@ pub mod crd_default_rules;
pub mod crd_grafana;
pub mod crd_prometheus_rules;
pub mod crd_prometheuses;
pub mod crd_scrape_config;
pub mod grafana_default_dashboard;
pub mod grafana_operator;
pub mod prometheus_operator;

View File

@ -31,6 +31,7 @@ impl<T: Topology + HelmCommand + TenantManager> Score<T> for HelmPrometheusAlert
sender: KubePrometheus { config },
receivers: self.receivers.clone(),
rules: self.rules.clone(),
scrape_targets: None,
})
}
fn name(&self) -> String {

View File

@ -6,3 +6,4 @@ pub mod kube_prometheus;
pub mod ntfy;
pub mod okd;
pub mod prometheus;
pub mod scrape_target;

View File

@ -114,7 +114,7 @@ impl Prometheus {
};
if let Some(ns) = namespace.as_deref() {
grafana_helm_chart_score(ns)
grafana_helm_chart_score(ns, false)
.interpret(inventory, topology)
.await
} else {

View File

@ -0,0 +1 @@
pub mod server;

View File

@ -0,0 +1,80 @@
use std::net::IpAddr;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use serde::Serialize;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_scrape_config::{Params, RelabelConfig, ScrapeConfig, ScrapeConfigSpec, StaticConfig},
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(Debug, Clone, Serialize)]
pub struct Server {
pub name: String,
pub ip: IpAddr,
pub auth: String,
pub module: String,
pub domain: String,
}
#[async_trait]
impl ScrapeTarget<CRDPrometheus> for Server {
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let scrape_config_spec = ScrapeConfigSpec {
static_configs: Some(vec![StaticConfig {
targets: vec![self.ip.to_string()],
labels: None,
}]),
scrape_interval: Some("2m".to_string()),
kubernetes_sd_configs: None,
http_sd_configs: None,
file_sd_configs: None,
dns_sd_configs: None,
params: Some(Params {
auth: Some(vec![self.auth.clone()]),
module: Some(vec![self.module.clone()]),
}),
consul_sd_configs: None,
relabel_configs: Some(vec![RelabelConfig {
action: None,
source_labels: Some(vec!["__address__".to_string()]),
separator: None,
target_label: Some("__param_target".to_string()),
regex: None,
replacement: Some(format!("snmp.{}:31080", self.domain.clone())),
modulus: None,
}]),
metric_relabel_configs: None,
metrics_path: Some("/snmp".to_string()),
scrape_timeout: Some("2m".to_string()),
job_name: Some(format!("snmp_exporter/cloud/{}", self.name.clone())),
scheme: None,
};
let scrape_config = ScrapeConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec: scrape_config_spec,
};
sender
.client
.apply(&scrape_config, Some(&sender.namespace.clone()))
.await?;
Ok(Outcome::success(format!(
"installed scrape target {}",
self.name.clone()
)))
}
fn clone_box(&self) -> Box<dyn ScrapeTarget<CRDPrometheus>> {
Box::new(self.clone())
}
}

View File

@ -215,7 +215,7 @@ impl OKDSetup03ControlPlaneInterpret {
) -> Result<(), InterpretError> {
info!("[ControlPlane] Ensuring persistent bonding");
let score = HostNetworkConfigurationScore {
hosts: hosts.clone(), // FIXME: Avoid clone if possible
hosts: hosts.clone(),
};
score.interpret(inventory, topology).await?;

View File

@ -77,6 +77,8 @@ impl OKDBootstrapLoadBalancerScore {
address: topology.bootstrap_host.ip.to_string(),
port,
});
backend.dedup();
backend
}
}

View File

@ -240,7 +240,7 @@ pub struct OvsPortSpec {
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]
#[serde(rename_all = "kebab-case")]
pub struct EthtoolSpec {
// FIXME: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
// TODO: Properly describe this spec (https://nmstate.io/devel/yaml_api.html#ethtool)
}
#[derive(Deserialize, Serialize, Clone, Debug, Default, JsonSchema)]

View File

@ -50,6 +50,7 @@ impl HostNetworkConfigurationInterpret {
Ok(())
}
async fn collect_switch_ports_for_host<T: Topology + Switch>(
&self,
topology: &T,
@ -125,7 +126,6 @@ impl<T: Topology + Switch> Interpret<T> for HostNetworkConfigurationInterpret {
let mut configured_host_count = 0;
for host in &self.score.hosts {
// FIXME: Clear the previous config for host
self.configure_network_for_host(topology, host).await?;
configured_host_count += 1;
}
@ -283,7 +283,6 @@ mod tests {
#[tokio::test]
async fn port_not_found_for_mac_address_should_not_configure_interface() {
// FIXME: Should it still configure an empty bond/port channel?
let score = given_score(vec![given_host(&HOST_ID, vec![UNKNOWN_INTERFACE.clone()])]);
let topology = TopologyWithSwitch::new_port_not_found();

View File

@ -12,7 +12,8 @@ use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::C
use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules;
use crate::modules::monitoring::kube_prometheus::crd::crd_grafana::{
Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig,
GrafanaDatasourceSpec, GrafanaSpec,
GrafanaDatasourceJsonData, GrafanaDatasourceSpec, GrafanaSecretKeyRef, GrafanaSpec,
GrafanaValueFrom, GrafanaValueSource,
};
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::{
PrometheusRule, PrometheusRuleSpec, RuleGroup,
@ -39,7 +40,7 @@ use crate::{
};
use harmony_types::id::Id;
use super::prometheus::PrometheusApplicationMonitoring;
use super::prometheus::PrometheusMonitoring;
#[derive(Clone, Debug, Serialize)]
pub struct K8sPrometheusCRDAlertingScore {
@ -49,7 +50,7 @@ pub struct K8sPrometheusCRDAlertingScore {
pub prometheus_rules: Vec<RuleGroup>,
}
impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
impl<T: Topology + K8sclient + PrometheusMonitoring<CRDPrometheus>> Score<T>
for K8sPrometheusCRDAlertingScore
{
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
@ -75,7 +76,7 @@ pub struct K8sPrometheusCRDAlertingInterpret {
}
#[async_trait]
impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T>
impl<T: Topology + K8sclient + PrometheusMonitoring<CRDPrometheus>> Interpret<T>
for K8sPrometheusCRDAlertingInterpret
{
async fn execute(
@ -466,10 +467,13 @@ impl K8sPrometheusCRDAlertingInterpret {
match_labels: label.clone(),
match_expressions: vec![],
};
let mut json_data = BTreeMap::new();
json_data.insert("timeInterval".to_string(), "5s".to_string());
let namespace = self.sender.namespace.clone();
let json_data = GrafanaDatasourceJsonData {
time_interval: Some("5s".to_string()),
http_header_name1: None,
tls_skip_verify: Some(true),
oauth_pass_thru: Some(true),
};
let json = build_default_dashboard(&namespace);
let graf_data_source = GrafanaDatasource {
@ -495,7 +499,11 @@ impl K8sPrometheusCRDAlertingInterpret {
"http://prometheus-operated.{}.svc.cluster.local:9090",
self.sender.namespace.clone()
),
secure_json_data: None,
is_default: None,
editable: None,
},
values_from: None,
},
};
@ -516,7 +524,9 @@ impl K8sPrometheusCRDAlertingInterpret {
spec: GrafanaDashboardSpec {
resync_period: Some("30s".to_string()),
instance_selector: labels.clone(),
json,
json: Some(json),
grafana_com: None,
datasources: None,
},
};

View File

@ -9,11 +9,17 @@ use crate::{
};
#[async_trait]
pub trait PrometheusApplicationMonitoring<S: AlertSender> {
pub trait PrometheusMonitoring<S: AlertSender> {
async fn install_prometheus(
&self,
sender: &S,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
) -> Result<PreparationOutcome, PreparationError>;
async fn ensure_prometheus_operator(
&self,
sender: &S,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError>;
}

View File

@ -38,7 +38,7 @@ use crate::{
};
use harmony_types::id::Id;
use super::prometheus::PrometheusApplicationMonitoring;
use super::prometheus::PrometheusMonitoring;
#[derive(Clone, Debug, Serialize)]
pub struct RHOBAlertingScore {
@ -48,8 +48,8 @@ pub struct RHOBAlertingScore {
pub prometheus_rules: Vec<RuleGroup>,
}
impl<T: Topology + K8sclient + Ingress + PrometheusApplicationMonitoring<RHOBObservability>>
Score<T> for RHOBAlertingScore
impl<T: Topology + K8sclient + Ingress + PrometheusMonitoring<RHOBObservability>> Score<T>
for RHOBAlertingScore
{
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
Box::new(RHOBAlertingInterpret {
@ -74,8 +74,8 @@ pub struct RHOBAlertingInterpret {
}
#[async_trait]
impl<T: Topology + K8sclient + Ingress + PrometheusApplicationMonitoring<RHOBObservability>>
Interpret<T> for RHOBAlertingInterpret
impl<T: Topology + K8sclient + Ingress + PrometheusMonitoring<RHOBObservability>> Interpret<T>
for RHOBAlertingInterpret
{
async fn execute(
&self,

View File

@ -4,7 +4,7 @@ use std::{
};
use async_trait::async_trait;
use log::{info, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@ -19,8 +19,8 @@ use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
osd_deployment_name: String,
rook_ceph_namespace: String,
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
self.purge_ceph_osd(client.clone()).await?;
self.verify_ceph_osd_removal(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::CephRemoveOsd
}
fn get_version(&self) -> Version {
@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
self.score.osd_deployment_name
))
})?;
Ok(osd_id_numeric.to_string())
}
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
debug!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("verifying toolbox exists");
let toolbox_dep = "rook-ceph-tools".to_string();
match client
@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
if status.replicas == None && status.ready_replicas == None {
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Verifying OSD deployment deleted");
loop {
let dep = client
.get_deployment(
@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
.await?;
if dep.is_none() {
info!(
debug!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
"sh",
"-c",
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
],
)
.await?;
@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
vec!["sh", "-c", "ceph osd tree -f json"],
)
.await?;
let tree =

View File

@ -1,2 +1,2 @@
pub mod ceph_osd_replacement_score;
pub mod ceph_remove_osd_score;
pub mod ceph_validate_health_score;

View File

@ -77,7 +77,7 @@ impl YaSerializeTrait for HAProxyId {
}
}
#[derive(PartialEq, Debug)]
#[derive(PartialEq, Debug, Clone)]
pub struct HAProxyId(String);
impl Default for HAProxyId {
@ -297,7 +297,7 @@ pub struct HAProxyFrontends {
pub frontend: Vec<Frontend>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct Frontend {
#[yaserde(attribute = true)]
pub uuid: String,
@ -310,7 +310,7 @@ pub struct Frontend {
pub bind_options: MaybeString,
pub mode: String,
#[yaserde(rename = "defaultBackend")]
pub default_backend: String,
pub default_backend: Option<String>,
pub ssl_enabled: i32,
pub ssl_certificates: MaybeString,
pub ssl_default_certificate: MaybeString,
@ -416,7 +416,7 @@ pub struct HAProxyBackends {
pub backends: Vec<HAProxyBackend>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyBackend {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@ -535,7 +535,7 @@ pub struct HAProxyServers {
pub servers: Vec<HAProxyServer>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyServer {
#[yaserde(attribute = true, rename = "uuid")]
pub uuid: String,
@ -543,8 +543,8 @@ pub struct HAProxyServer {
pub enabled: u8,
pub name: String,
pub description: MaybeString,
pub address: String,
pub port: u16,
pub address: Option<String>,
pub port: Option<u16>,
pub checkport: MaybeString,
pub mode: String,
pub multiplexer_protocol: MaybeString,
@ -589,7 +589,7 @@ pub struct HAProxyHealthChecks {
pub healthchecks: Vec<HAProxyHealthCheck>,
}
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
#[derive(Clone, Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
pub struct HAProxyHealthCheck {
#[yaserde(attribute = true)]
pub uuid: String,

View File

@ -25,6 +25,7 @@ sha2 = "0.10.9"
[dev-dependencies]
pretty_assertions.workspace = true
assertor.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] }

View File

@ -30,8 +30,7 @@ impl SshConfigManager {
self.opnsense_shell
.exec(&format!(
"cp /conf/config.xml /conf/backup/{}",
backup_filename
"cp /conf/config.xml /conf/backup/{backup_filename}"
))
.await
}

View File

@ -1,9 +1,7 @@
mod ssh;
pub use ssh::*;
use async_trait::async_trait;
use crate::Error;
use async_trait::async_trait;
pub use ssh::*;
#[async_trait]
pub trait OPNsenseShell: std::fmt::Debug + Send + Sync {

View File

@ -1,11 +1,8 @@
use std::sync::Arc;
use log::warn;
use crate::{config::OPNsenseShell, Error};
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyHealthCheck, HAProxyServer, OPNsense,
};
use crate::{config::OPNsenseShell, Error};
use std::{collections::HashSet, sync::Arc};
pub struct LoadBalancerConfig<'a> {
opnsense: &'a mut OPNsense,
@ -31,7 +28,7 @@ impl<'a> LoadBalancerConfig<'a> {
match &mut self.opnsense.opnsense.haproxy.as_mut() {
Some(haproxy) => f(haproxy),
None => unimplemented!(
"Adding a backend is not supported when haproxy config does not exist yet"
"Cannot configure load balancer when haproxy config does not exist yet"
),
}
}
@ -40,21 +37,67 @@ impl<'a> LoadBalancerConfig<'a> {
self.with_haproxy(|haproxy| haproxy.general.enabled = enabled as i32);
}
pub fn add_backend(&mut self, backend: HAProxyBackend) {
warn!("TODO make sure this new backend does not refer non-existing entities like servers or health checks");
self.with_haproxy(|haproxy| haproxy.backends.backends.push(backend));
/// Configures a service by removing any existing service on the same port
/// and then adding the new definition. This ensures idempotency.
pub fn configure_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.remove_service_by_bind_address(&frontend.bind);
self.remove_servers(&servers);
self.add_new_service(frontend, backend, servers, healthcheck);
}
pub fn add_frontend(&mut self, frontend: Frontend) {
self.with_haproxy(|haproxy| haproxy.frontends.frontend.push(frontend));
// Remove the corresponding real servers based on their name if they already exist.
fn remove_servers(&mut self, servers: &[HAProxyServer]) {
let server_names: HashSet<_> = servers.iter().map(|s| s.name.clone()).collect();
self.with_haproxy(|haproxy| {
haproxy
.servers
.servers
.retain(|s| !server_names.contains(&s.name));
});
}
pub fn add_healthcheck(&mut self, healthcheck: HAProxyHealthCheck) {
self.with_haproxy(|haproxy| haproxy.healthchecks.healthchecks.push(healthcheck));
/// Removes a service and its dependent components based on the frontend's bind address.
/// This performs a cascading delete of the frontend, backend, servers, and health check.
fn remove_service_by_bind_address(&mut self, bind_address: &str) {
self.with_haproxy(|haproxy| {
let Some(old_frontend) = remove_frontend_by_bind_address(haproxy, bind_address) else {
return;
};
let Some(old_backend) = remove_backend(haproxy, old_frontend) else {
return;
};
remove_healthcheck(haproxy, &old_backend);
remove_linked_servers(haproxy, &old_backend);
});
}
pub fn add_servers(&mut self, mut servers: Vec<HAProxyServer>) {
self.with_haproxy(|haproxy| haproxy.servers.servers.append(&mut servers));
/// Adds the components of a new service to the HAProxy configuration.
/// This function de-duplicates servers by name to prevent configuration errors.
fn add_new_service(
&mut self,
frontend: Frontend,
backend: HAProxyBackend,
servers: Vec<HAProxyServer>,
healthcheck: Option<HAProxyHealthCheck>,
) {
self.with_haproxy(|haproxy| {
if let Some(check) = healthcheck {
haproxy.healthchecks.healthchecks.push(check);
}
haproxy.servers.servers.extend(servers);
haproxy.backends.backends.push(backend);
haproxy.frontends.frontend.push(frontend);
});
}
pub async fn reload_restart(&self) -> Result<(), Error> {
@ -82,3 +125,262 @@ impl<'a> LoadBalancerConfig<'a> {
Ok(())
}
}
fn remove_frontend_by_bind_address(haproxy: &mut HAProxy, bind_address: &str) -> Option<Frontend> {
let pos = haproxy
.frontends
.frontend
.iter()
.position(|f| f.bind == bind_address);
match pos {
Some(pos) => Some(haproxy.frontends.frontend.remove(pos)),
None => None,
}
}
fn remove_backend(haproxy: &mut HAProxy, old_frontend: Frontend) -> Option<HAProxyBackend> {
let default_backend = old_frontend.default_backend?;
let pos = haproxy
.backends
.backends
.iter()
.position(|b| b.uuid == default_backend);
match pos {
Some(pos) => Some(haproxy.backends.backends.remove(pos)),
None => None, // orphaned frontend, shouldn't happen
}
}
fn remove_healthcheck(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(uuid) = &backend.health_check.content {
haproxy
.healthchecks
.healthchecks
.retain(|h| h.uuid != *uuid);
}
}
/// Remove the backend's servers. This assumes servers are not shared between services.
fn remove_linked_servers(haproxy: &mut HAProxy, backend: &HAProxyBackend) {
if let Some(server_uuids_str) = &backend.linked_servers.content {
let server_uuids_to_remove: HashSet<_> = server_uuids_str.split(',').collect();
haproxy
.servers
.servers
.retain(|s| !server_uuids_to_remove.contains(s.uuid.as_str()));
}
}
#[cfg(test)]
mod tests {
use crate::config::DummyOPNSenseShell;
use assertor::*;
use opnsense_config_xml::{
Frontend, HAProxy, HAProxyBackend, HAProxyBackends, HAProxyFrontends, HAProxyHealthCheck,
HAProxyHealthChecks, HAProxyId, HAProxyServer, HAProxyServers, MaybeString, OPNsense,
};
use std::sync::Arc;
use super::LoadBalancerConfig;
static SERVICE_BIND_ADDRESS: &str = "192.168.1.1:80";
static OTHER_SERVICE_BIND_ADDRESS: &str = "192.168.1.1:443";
static SERVER_ADDRESS: &str = "1.1.1.1:80";
static OTHER_SERVER_ADDRESS: &str = "1.1.1.1:443";
#[test]
fn configure_service_should_add_all_service_components_to_haproxy() {
let mut opnsense = given_opnsense();
let mut load_balancer = given_load_balancer(&mut opnsense);
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
load_balancer.configure_service(
frontend.clone(),
backend.clone(),
servers.clone(),
Some(healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend],
vec![backend],
servers,
vec![healthcheck],
);
}
#[test]
fn configure_service_should_replace_service_on_same_bind_address() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
let (updated_healthcheck, updated_servers, updated_backend, updated_frontend) =
given_service(SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
load_balancer.configure_service(
updated_frontend.clone(),
updated_backend.clone(),
updated_servers.clone(),
Some(updated_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![updated_frontend],
vec![updated_backend],
updated_servers,
vec![updated_healthcheck],
);
}
#[test]
fn configure_service_should_keep_existing_service_on_different_bind_addresses() {
let (healthcheck, servers, backend, frontend) =
given_service(SERVICE_BIND_ADDRESS, SERVER_ADDRESS);
let (other_healthcheck, other_servers, other_backend, other_frontend) =
given_service(OTHER_SERVICE_BIND_ADDRESS, OTHER_SERVER_ADDRESS);
let mut opnsense = given_opnsense_with(given_haproxy(
vec![frontend.clone()],
vec![backend.clone()],
servers.clone(),
vec![healthcheck.clone()],
));
let mut load_balancer = given_load_balancer(&mut opnsense);
load_balancer.configure_service(
other_frontend.clone(),
other_backend.clone(),
other_servers.clone(),
Some(other_healthcheck.clone()),
);
assert_haproxy_configured_with(
opnsense,
vec![frontend, other_frontend],
vec![backend, other_backend],
[servers, other_servers].concat(),
vec![healthcheck, other_healthcheck],
);
}
fn assert_haproxy_configured_with(
opnsense: OPNsense,
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) {
let haproxy = opnsense.opnsense.haproxy.as_ref().unwrap();
assert_that!(haproxy.frontends.frontend).contains_exactly(frontends);
assert_that!(haproxy.backends.backends).contains_exactly(backends);
assert_that!(haproxy.servers.servers).is_equal_to(servers);
assert_that!(haproxy.healthchecks.healthchecks).contains_exactly(healthchecks);
}
fn given_opnsense() -> OPNsense {
OPNsense::default()
}
fn given_opnsense_with(haproxy: HAProxy) -> OPNsense {
let mut opnsense = OPNsense::default();
opnsense.opnsense.haproxy = Some(haproxy);
opnsense
}
fn given_load_balancer<'a>(opnsense: &'a mut OPNsense) -> LoadBalancerConfig<'a> {
let opnsense_shell = Arc::new(DummyOPNSenseShell {});
if opnsense.opnsense.haproxy.is_none() {
opnsense.opnsense.haproxy = Some(HAProxy::default());
}
LoadBalancerConfig::new(opnsense, opnsense_shell)
}
fn given_service(
bind_address: &str,
server_address: &str,
) -> (
HAProxyHealthCheck,
Vec<HAProxyServer>,
HAProxyBackend,
Frontend,
) {
let healthcheck = given_healthcheck();
let servers = vec![given_server(server_address)];
let backend = given_backend();
let frontend = given_frontend(bind_address);
(healthcheck, servers, backend, frontend)
}
fn given_haproxy(
frontends: Vec<Frontend>,
backends: Vec<HAProxyBackend>,
servers: Vec<HAProxyServer>,
healthchecks: Vec<HAProxyHealthCheck>,
) -> HAProxy {
HAProxy {
frontends: HAProxyFrontends {
frontend: frontends,
},
backends: HAProxyBackends { backends },
servers: HAProxyServers { servers },
healthchecks: HAProxyHealthChecks { healthchecks },
..Default::default()
}
}
fn given_frontend(bind_address: &str) -> Frontend {
Frontend {
uuid: "uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: format!("frontend_{bind_address}"),
bind: bind_address.into(),
default_backend: Some("backend-uuid".into()),
..Default::default()
}
}
fn given_backend() -> HAProxyBackend {
HAProxyBackend {
uuid: "backend-uuid".into(),
id: HAProxyId::default(),
enabled: 1,
name: "backend_192.168.1.1:80".into(),
linked_servers: MaybeString::from("server-uuid"),
health_check_enabled: 1,
health_check: MaybeString::from("healthcheck-uuid"),
..Default::default()
}
}
fn given_server(address: &str) -> HAProxyServer {
HAProxyServer {
uuid: "server-uuid".into(),
id: HAProxyId::default(),
name: address.into(),
address: Some(address.into()),
..Default::default()
}
}
fn given_healthcheck() -> HAProxyHealthCheck {
HAProxyHealthCheck {
uuid: "healthcheck-uuid".into(),
name: "healthcheck".into(),
..Default::default()
}
}
}