Compare commits

...

4 Commits

Author SHA1 Message Date
fb0fc50eac Merge remote-tracking branch 'origin/master' into fix/ceph_remove_osd 2025-08-25 16:12:04 -04:00
63ae213c35 fix: cargo fmt 2025-08-25 16:06:59 -04:00
3567d0f420 fix: added example use case, fixed exec app command to use && 2025-08-25 16:03:31 -04:00
3867d436ad fix: remove ceph osd deletes and purges osd from ceph osd tree\
k8s returns None rather than zero when checking deployment for replicas
exec_app requires commands 's' and '-c' to run correctly
2025-08-25 11:17:14 -04:00
7 changed files with 77 additions and 33 deletions

9
Cargo.lock generated
View File

@ -4255,6 +4255,15 @@ version = "0.8.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c"
[[package]]
name = "remove_rook_osd"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"tokio",
]
[[package]]
name = "reqwest"
version = "0.11.27"

View File

@ -0,0 +1,11 @@
[package]
name = "example-remove-rook-osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@ -0,0 +1,18 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@ -32,6 +32,7 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephRemoveOsd,
CephClusterHealth,
}
@ -59,6 +60,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
}
}

View File

@ -15,7 +15,7 @@ use kube::{
api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition,
};
use log::{debug, error, trace};
use log::{debug, error, info, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use similar::TextDiff;
@ -59,10 +59,13 @@ impl K8sClient {
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
debug!("getting namespaced deployment");
Api::namespaced(self.client.clone(), ns)
} else {
debug!("getting default namespace deployment");
Api::default_namespaced(self.client.clone())
};
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
Ok(deps.get_opt(name).await?)
}
@ -93,7 +96,7 @@ impl K8sClient {
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
let scale = Patch::Merge(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}

View File

@ -1,11 +1,10 @@
use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
use log::{info, warn};
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@ -19,8 +18,8 @@ use crate::{
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
osd_deployment_name: String,
rook_ceph_namespace: String,
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@ -54,18 +53,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
self.purge_ceph_osd(client.clone()).await?;
self.verify_ceph_osd_removal(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::CephRemoveOsd
}
fn get_version(&self) -> Version {
@ -82,7 +80,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
@ -94,9 +92,14 @@ impl CephRemoveOsdInterpret {
self.score.osd_deployment_name
))
})?;
Ok(osd_id_numeric.to_string())
}
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
debug!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
@ -108,6 +111,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("verifying toolbox exists");
let toolbox_dep = "rook-ceph-tools".to_string();
match client
@ -149,7 +153,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
@ -172,7 +176,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
@ -180,11 +184,9 @@ impl CephRemoveOsdInterpret {
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
if status.replicas == None && status.ready_replicas == None {
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
@ -212,7 +214,7 @@ impl CephRemoveOsdInterpret {
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
debug!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
@ -234,7 +236,7 @@ impl CephRemoveOsdInterpret {
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
debug!("Verifying OSD deployment deleted");
loop {
let dep = client
.get_deployment(
@ -244,7 +246,7 @@ impl CephRemoveOsdInterpret {
.await?;
if dep.is_none() {
info!(
debug!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
@ -276,12 +278,10 @@ impl CephRemoveOsdInterpret {
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
@ -291,8 +291,9 @@ impl CephRemoveOsdInterpret {
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
"sh",
"-c",
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
],
)
.await?;
@ -305,10 +306,10 @@ impl CephRemoveOsdInterpret {
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
let osd_id_full = self.get_ceph_osd_id().unwrap();
debug!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
@ -318,7 +319,7 @@ impl CephRemoveOsdInterpret {
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
vec!["sh", "-c", "ceph osd tree -f json"],
)
.await?;
let tree =

View File

@ -1,2 +1,2 @@
pub mod ceph_osd_replacement_score;
pub mod ceph_remove_osd_score;
pub mod ceph_validate_health_score;