feat/ceph_validate_health #121
11
examples/validate_ceph_cluster_health/Cargo.toml
Normal file
11
examples/validate_ceph_cluster_health/Cargo.toml
Normal file
@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "example_validate_ceph_cluster_health"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
harmony = { version = "0.1.0", path = "../../harmony" }
|
||||
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
|
||||
tokio.workspace = true
|
14
examples/validate_ceph_cluster_health/src/main.rs
Normal file
14
examples/validate_ceph_cluster_health/src/main.rs
Normal file
@ -0,0 +1,14 @@
|
||||
use harmony::{inventory::Inventory, modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth, topology::K8sAnywhereTopology};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let ceph_health_score = CephVerifyClusterHealth {
|
||||
rook_ceph_namespace: "rook-ceph".to_string(),
|
||||
};
|
||||
|
||||
let topology = K8sAnywhereTopology::from_env();
|
||||
let inventory = Inventory::autoload();
|
||||
harmony_cli::run(inventory, topology, vec![Box::new(ceph_health_score)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
@ -32,6 +32,7 @@ pub enum InterpretName {
|
||||
Lamp,
|
||||
ApplicationMonitoring,
|
||||
K8sPrometheusCrdAlerting,
|
||||
CephClusterHealth,
|
||||
}
|
||||
|
||||
impl std::fmt::Display for InterpretName {
|
||||
@ -58,6 +59,7 @@ impl std::fmt::Display for InterpretName {
|
||||
InterpretName::Lamp => f.write_str("LAMP"),
|
||||
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
|
||||
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
|
||||
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
130
harmony/src/modules/storage/ceph/ceph_validate_health_score.rs
Normal file
130
harmony/src/modules/storage/ceph/ceph_validate_health_score.rs
Normal file
@ -0,0 +1,130 @@
|
||||
use std::{sync::Arc, time::Duration};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::debug;
|
||||
use serde::Serialize;
|
||||
use tokio::time::Instant;
|
||||
|
||||
use crate::{
|
||||
data::{Id, Version},
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct CephVerifyClusterHealth {
|
||||
pub rook_ceph_namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for CephVerifyClusterHealth {
|
||||
fn name(&self) -> String {
|
||||
format!("CephValidateClusterHealth")
|
||||
}
|
||||
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(CephVerifyClusterHealthInterpret {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct CephVerifyClusterHealthInterpret {
|
||||
score: CephVerifyClusterHealth,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for CephVerifyClusterHealthInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let client = topology.k8s_client().await.unwrap();
|
||||
self.verify_ceph_toolbox_exists(client.clone()).await?;
|
||||
|
||||
self.validate_ceph_cluster_health(client.clone()).await?;
|
||||
Ok(Outcome::success("Ceph cluster healthy".to_string()))
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::CephClusterHealth
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl CephVerifyClusterHealthInterpret {
|
||||
pub async fn verify_ceph_toolbox_exists(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let toolbox_dep = "rook-ceph-tools".to_string();
|
||||
|
||||
match client
|
||||
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
|
||||
.await
|
||||
{
|
||||
Ok(Some(deployment)) => {
|
||||
if let Some(status) = deployment.status {
|
||||
let ready_count = status.ready_replicas.unwrap_or(0);
|
||||
if ready_count >= 1 {
|
||||
return Ok(Outcome::success(format!(
|
||||
"'{}' is ready with {} replica(s).",
|
||||
&toolbox_dep, ready_count
|
||||
)));
|
||||
} else {
|
||||
return Err(InterpretError::new(
|
||||
"ceph-tool-box not ready in cluster".to_string(),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
Err(InterpretError::new(format!(
|
||||
"failed to get deployment status {}",
|
||||
&toolbox_dep
|
||||
)))
|
||||
}
|
||||
}
|
||||
Ok(None) => Err(InterpretError::new(format!(
|
||||
"Deployment '{}' not found in namespace '{}'.",
|
||||
&toolbox_dep, self.score.rook_ceph_namespace
|
||||
))),
|
||||
Err(e) => Err(InterpretError::new(format!(
|
||||
"Failed to query for deployment '{}': {}",
|
||||
&toolbox_dep, e
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn validate_ceph_cluster_health(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
debug!("Verifying ceph cluster is in healthy state");
|
||||
|
||||
let health = client
|
||||
.exec_app_capture_output(
|
||||
"rook-ceph-tools".to_string(),
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec!["sh", "-c", "ceph health"],
|
||||
)
|
||||
.await?;
|
||||
|
||||
if health.contains("HEALTH_OK") {
|
||||
return Ok(Outcome::success("Ceph Cluster in healthy state".to_string()))
|
||||
} else {
|
||||
Err(InterpretError::new(format!("Ceph cluster unhealthy {}", health)))}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user
Ca me fait penser, et d'ailleurs ça intéressera @letian :
Je pense qu'on peut set du contexte dans le thread d'exécution Tokio. Ça pourrait être utile pour l'instrumentation.
Pourquoi j'y pense ici c'est qu'on pourrait transformer
verify_ceph_toolbox
enensure_ceph_toolbox
et lancer le score qui s'occupe de la lever. Cote UX actuellement on voit des logs apparaitre, et cote code nous on n'a pas la trace de qui a parti ca pourquoi. Mais en injectant du "metadata" dans le thread on pourrait faire en sorte qu'un score qui s'execute par une nouvelle task qui a un execution context qui a l'info suffisante pour de la bonne instrumentation (genre, c'est qui mon parent).Just food for thought.