From d456a1f9eeb2373405abd82f55138b274a556dc0 Mon Sep 17 00:00:00 2001 From: Willem Date: Mon, 25 Aug 2025 13:30:32 -0400 Subject: [PATCH 1/3] feat: score to validate whether the ceph cluster is healthy --- .../validate_ceph_cluster_health/Cargo.toml | 11 ++ .../validate_ceph_cluster_health/src/main.rs | 14 ++ harmony/src/domain/interpret/mod.rs | 2 + .../ceph/ceph_validate_health_score.rs | 130 ++++++++++++++++++ 4 files changed, 157 insertions(+) create mode 100644 examples/validate_ceph_cluster_health/Cargo.toml create mode 100644 examples/validate_ceph_cluster_health/src/main.rs create mode 100644 harmony/src/modules/storage/ceph/ceph_validate_health_score.rs diff --git a/examples/validate_ceph_cluster_health/Cargo.toml b/examples/validate_ceph_cluster_health/Cargo.toml new file mode 100644 index 0000000..89a74b5 --- /dev/null +++ b/examples/validate_ceph_cluster_health/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "example_validate_ceph_cluster_health" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true + +[dependencies] +harmony = { version = "0.1.0", path = "../../harmony" } +harmony_cli = { version = "0.1.0", path = "../../harmony_cli" } +tokio.workspace = true diff --git a/examples/validate_ceph_cluster_health/src/main.rs b/examples/validate_ceph_cluster_health/src/main.rs new file mode 100644 index 0000000..21383a8 --- /dev/null +++ b/examples/validate_ceph_cluster_health/src/main.rs @@ -0,0 +1,14 @@ +use harmony::{inventory::Inventory, modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth, topology::K8sAnywhereTopology}; + +#[tokio::main] +async fn main() { + let ceph_health_score = CephVerifyClusterHealth { + rook_ceph_namespace: "rook-ceph".to_string(), + }; + + let topology = K8sAnywhereTopology::from_env(); + let inventory = Inventory::autoload(); + harmony_cli::run(inventory, topology, vec![Box::new(ceph_health_score)], None) + .await + .unwrap(); +} diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs index cfbf2b5..0e66a95 100644 --- a/harmony/src/domain/interpret/mod.rs +++ b/harmony/src/domain/interpret/mod.rs @@ -32,6 +32,7 @@ pub enum InterpretName { Lamp, ApplicationMonitoring, K8sPrometheusCrdAlerting, + CephClusterHealth, } impl std::fmt::Display for InterpretName { @@ -58,6 +59,7 @@ impl std::fmt::Display for InterpretName { InterpretName::Lamp => f.write_str("LAMP"), InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"), InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"), + InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"), } } } diff --git a/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs b/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs new file mode 100644 index 0000000..d28a488 --- /dev/null +++ b/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs @@ -0,0 +1,130 @@ +use std::{sync::Arc, time::Duration}; + +use async_trait::async_trait; +use log::debug; +use serde::Serialize; +use tokio::time::Instant; + +use crate::{ + data::{Id, Version}, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + score::Score, + topology::{K8sclient, Topology, k8s::K8sClient}, +}; + +#[derive(Clone, Debug, Serialize)] +pub struct CephVerifyClusterHealth { + pub rook_ceph_namespace: String, +} + +impl Score for CephVerifyClusterHealth { + fn name(&self) -> String { + format!("CephValidateClusterHealth") + } + + fn create_interpret(&self) -> Box> { + Box::new(CephVerifyClusterHealthInterpret { + score: self.clone(), + }) + } +} + +#[derive(Clone, Debug)] +pub struct CephVerifyClusterHealthInterpret { + score: CephVerifyClusterHealth, +} + +#[async_trait] +impl Interpret for CephVerifyClusterHealthInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + let client = topology.k8s_client().await.unwrap(); + self.verify_ceph_toolbox_exists(client.clone()).await?; + self.validate_ceph_cluster_health(client.clone()).await?; + Ok(Outcome::success("Ceph cluster healthy".to_string())) + } + + fn get_name(&self) -> InterpretName { + InterpretName::CephClusterHealth + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} + +impl CephVerifyClusterHealthInterpret { + pub async fn verify_ceph_toolbox_exists( + &self, + client: Arc, + ) -> Result { + let toolbox_dep = "rook-ceph-tools".to_string(); + + match client + .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace)) + .await + { + Ok(Some(deployment)) => { + if let Some(status) = deployment.status { + let ready_count = status.ready_replicas.unwrap_or(0); + if ready_count >= 1 { + return Ok(Outcome::success(format!( + "'{}' is ready with {} replica(s).", + &toolbox_dep, ready_count + ))); + } else { + return Err(InterpretError::new( + "ceph-tool-box not ready in cluster".to_string(), + )); + } + } else { + Err(InterpretError::new(format!( + "failed to get deployment status {}", + &toolbox_dep + ))) + } + } + Ok(None) => Err(InterpretError::new(format!( + "Deployment '{}' not found in namespace '{}'.", + &toolbox_dep, self.score.rook_ceph_namespace + ))), + Err(e) => Err(InterpretError::new(format!( + "Failed to query for deployment '{}': {}", + &toolbox_dep, e + ))), + } + } + + pub async fn validate_ceph_cluster_health( + &self, + client: Arc, + ) -> Result { + debug!("Verifying ceph cluster is in healthy state"); + + let health = client + .exec_app_capture_output( + "rook-ceph-tools".to_string(), + "app".to_string(), + Some(&self.score.rook_ceph_namespace), + vec!["sh", "-c", "ceph health"], + ) + .await?; + + if health.contains("HEALTH_OK") { + return Ok(Outcome::success("Ceph Cluster in healthy state".to_string())) + } else { + Err(InterpretError::new(format!("Ceph cluster unhealthy {}", health)))} + } +} From 65cc9befebba3a635825b9f6809045813494f5e7 Mon Sep 17 00:00:00 2001 From: Willem Date: Mon, 25 Aug 2025 13:31:39 -0400 Subject: [PATCH 2/3] mod.rs --- harmony/src/modules/storage/ceph/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs index a993c3d..3e3250e 100644 --- a/harmony/src/modules/storage/ceph/mod.rs +++ b/harmony/src/modules/storage/ceph/mod.rs @@ -1 +1,2 @@ pub mod ceph_osd_replacement_score; +pub mod ceph_validate_health_score; From a9fe4ab2673bf081b61a388eb81388faf14e5fa7 Mon Sep 17 00:00:00 2001 From: Willem Date: Mon, 25 Aug 2025 13:33:36 -0400 Subject: [PATCH 3/3] fix: cargo fmt --- examples/validate_ceph_cluster_health/src/main.rs | 6 +++++- .../storage/ceph/ceph_validate_health_score.rs | 12 +++++++++--- 2 files changed, 14 insertions(+), 4 deletions(-) diff --git a/examples/validate_ceph_cluster_health/src/main.rs b/examples/validate_ceph_cluster_health/src/main.rs index 21383a8..a7b8f57 100644 --- a/examples/validate_ceph_cluster_health/src/main.rs +++ b/examples/validate_ceph_cluster_health/src/main.rs @@ -1,4 +1,8 @@ -use harmony::{inventory::Inventory, modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth, topology::K8sAnywhereTopology}; +use harmony::{ + inventory::Inventory, + modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth, + topology::K8sAnywhereTopology, +}; #[tokio::main] async fn main() { diff --git a/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs b/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs index d28a488..2f7f87c 100644 --- a/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs +++ b/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs @@ -49,7 +49,7 @@ impl Interpret for CephVerifyClusterHealthInterpret } fn get_name(&self) -> InterpretName { - InterpretName::CephClusterHealth + InterpretName::CephClusterHealth } fn get_version(&self) -> Version { @@ -123,8 +123,14 @@ impl CephVerifyClusterHealthInterpret { .await?; if health.contains("HEALTH_OK") { - return Ok(Outcome::success("Ceph Cluster in healthy state".to_string())) + return Ok(Outcome::success( + "Ceph Cluster in healthy state".to_string(), + )); } else { - Err(InterpretError::new(format!("Ceph cluster unhealthy {}", health)))} + Err(InterpretError::new(format!( + "Ceph cluster unhealthy {}", + health + ))) + } } }