diff --git a/examples/validate_ceph_cluster_health/Cargo.toml b/examples/validate_ceph_cluster_health/Cargo.toml
new file mode 100644
index 0000000..89a74b5
--- /dev/null
+++ b/examples/validate_ceph_cluster_health/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "example_validate_ceph_cluster_health"
+edition = "2024"
+version.workspace = true
+readme.workspace = true
+license.workspace = true
+
+[dependencies]
+harmony = { version = "0.1.0", path = "../../harmony" }
+harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
+tokio.workspace = true
diff --git a/examples/validate_ceph_cluster_health/src/main.rs b/examples/validate_ceph_cluster_health/src/main.rs
new file mode 100644
index 0000000..a7b8f57
--- /dev/null
+++ b/examples/validate_ceph_cluster_health/src/main.rs
@@ -0,0 +1,18 @@
+use harmony::{
+    inventory::Inventory,
+    modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth,
+    topology::K8sAnywhereTopology,
+};
+
+#[tokio::main]
+async fn main() {
+    let ceph_health_score = CephVerifyClusterHealth {
+        rook_ceph_namespace: "rook-ceph".to_string(),
+    };
+
+    let topology = K8sAnywhereTopology::from_env();
+    let inventory = Inventory::autoload();
+    harmony_cli::run(inventory, topology, vec![Box::new(ceph_health_score)], None)
+        .await
+        .unwrap();
+}
diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs
index cfbf2b5..0e66a95 100644
--- a/harmony/src/domain/interpret/mod.rs
+++ b/harmony/src/domain/interpret/mod.rs
@@ -32,6 +32,7 @@ pub enum InterpretName {
     Lamp,
     ApplicationMonitoring,
     K8sPrometheusCrdAlerting,
+    CephClusterHealth,
 }
 
 impl std::fmt::Display for InterpretName {
@@ -58,6 +59,7 @@ impl std::fmt::Display for InterpretName {
             InterpretName::Lamp => f.write_str("LAMP"),
             InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
             InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
+            InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
         }
     }
 }
diff --git a/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs b/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs
new file mode 100644
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/ceph_validate_health_score.rs
@@ -0,0 +1,167 @@
+//! Score and interpret that check Ceph cluster health via the `rook-ceph-tools` deployment.
+
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use log::debug;
+use serde::Serialize;
+
+use crate::{
+    data::{Id, Version},
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology, k8s::K8sClient},
+};
+
+/// Name of the toolbox deployment checked for readiness and targeted by the `ceph health` call.
+const TOOLBOX_DEPLOYMENT: &str = "rook-ceph-tools";
+
+/// Score that fails unless the `ceph health` output contains `HEALTH_OK`.
+#[derive(Clone, Debug, Serialize)]
+pub struct CephVerifyClusterHealth {
+    /// Namespace in which the `rook-ceph-tools` deployment is looked up.
+    pub rook_ceph_namespace: String,
+}
+
+impl<T: Topology + K8sclient> Score<T> for CephVerifyClusterHealth {
+    fn name(&self) -> String {
+        "CephValidateClusterHealth".to_string()
+    }
+
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(CephVerifyClusterHealthInterpret {
+            score: self.clone(),
+        })
+    }
+}
+
+/// Interpret created by [`CephVerifyClusterHealth`]; holds a clone of the score.
+#[derive(Clone, Debug)]
+pub struct CephVerifyClusterHealthInterpret {
+    score: CephVerifyClusterHealth,
+}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for CephVerifyClusterHealthInterpret {
+    /// Checks the toolbox deployment first, then the reported cluster health.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`InterpretError`] if the k8s client is unavailable or either check fails.
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        // Surface a client failure as an error rather than panicking.
+        let client = topology
+            .k8s_client()
+            .await
+            .map_err(|e| InterpretError::new(format!("Failed to get k8s client: {e:?}")))?;
+
+        self.verify_ceph_toolbox_exists(Arc::clone(&client)).await?;
+        self.validate_ceph_cluster_health(client).await?;
+        Ok(Outcome::success("Ceph cluster healthy".to_string()))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        InterpretName::CephClusterHealth
+    }
+
+    // NOTE: the accessors below are still unimplemented and panic if called.
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl CephVerifyClusterHealthInterpret {
+    /// Checks that the `rook-ceph-tools` deployment has at least one ready replica.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`InterpretError`] if the query fails, the deployment is not found,
+    /// it carries no status, or it has no ready replica.
+    pub async fn verify_ceph_toolbox_exists(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let toolbox_dep = TOOLBOX_DEPLOYMENT.to_string();
+
+        let deployment = client
+            .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
+            .await
+            .map_err(|e| {
+                InterpretError::new(format!(
+                    "Failed to query for deployment '{}': {}",
+                    &toolbox_dep, e
+                ))
+            })?
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Deployment '{}' not found in namespace '{}'.",
+                    &toolbox_dep, self.score.rook_ceph_namespace
+                ))
+            })?;
+
+        let status = deployment.status.ok_or_else(|| {
+            InterpretError::new(format!(
+                "failed to get deployment status {}",
+                &toolbox_dep
+            ))
+        })?;
+
+        // A missing `ready_replicas` value is treated as zero ready replicas.
+        let ready_count = status.ready_replicas.unwrap_or(0);
+        if ready_count >= 1 {
+            Ok(Outcome::success(format!(
+                "'{}' is ready with {} replica(s).",
+                &toolbox_dep, ready_count
+            )))
+        } else {
+            Err(InterpretError::new(
+                "ceph-tool-box not ready in cluster".to_string(),
+            ))
+        }
+    }
+
+    /// Passes `sh -c "ceph health"` to `exec_app_capture_output` and checks for `HEALTH_OK`.
+    ///
+    /// # Errors
+    ///
+    /// Returns an [`InterpretError`] if the exec call fails or the output lacks `HEALTH_OK`.
+    pub async fn validate_ceph_cluster_health(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        debug!("Verifying ceph cluster is in healthy state");
+
+        let health = client
+            .exec_app_capture_output(
+                TOOLBOX_DEPLOYMENT.to_string(),
+                "app".to_string(),
+                Some(&self.score.rook_ceph_namespace),
+                vec!["sh", "-c", "ceph health"],
+            )
+            .await?;
+
+        if health.contains("HEALTH_OK") {
+            Ok(Outcome::success(
+                "Ceph Cluster in healthy state".to_string(),
+            ))
+        } else {
+            Err(InterpretError::new(format!(
+                "Ceph cluster unhealthy {}",
+                health
+            )))
+        }
+    }
+}
diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs
index a993c3d..3e3250e 100644
--- a/harmony/src/modules/storage/ceph/mod.rs
+++ b/harmony/src/modules/storage/ceph/mod.rs
@@ -1 +1,2 @@
 pub mod ceph_osd_replacement_score;
+pub mod ceph_validate_health_score;