fix: remove ceph osd deletes and purges osd from ceph osd tree #120
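Adds a CephRemoveOsd score and interpret that removes an OSD from a Rook-managed Ceph cluster: it scales the OSD deployment to zero, verifies the scale-down, deletes the deployment, then purges the OSD and its auth key from the Ceph OSD tree via the rook-ceph-tools pod and verifies the removal. It also adds a runnable example under examples/remove_rook_osd, switches K8sClient::patch_scale from server-side apply to a merge patch, and runs toolbox commands through sh -c so chained ceph commands execute correctly.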
Cargo.lock (generated, +9)
@@ -4613,6 +4613,15 @@ version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
 
+[[package]]
+name = "remove_rook_osd"
+version = "0.1.0"
+dependencies = [
+ "harmony",
+ "harmony_cli",
+ "tokio",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.27"
examples/remove_rook_osd/Cargo.toml (new file, +11)
@@ -0,0 +1,11 @@
+[package]
+name = "example-remove-rook-osd"
+edition = "2024"
+version.workspace = true
+readme.workspace = true
+license.workspace = true
+
+[dependencies]
+harmony = { version = "0.1.0", path = "../../harmony" }
+harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
+tokio.workspace = true
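The version.workspace, readme.workspace, and license.workspace entries inherit those fields from the workspace's [workspace.package] table, and tokio.workspace = true reuses the workspace-level tokio dependency, presumably matching the repository's other example crates.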
examples/remove_rook_osd/src/main.rs (new file, +18)
@@ -0,0 +1,18 @@
+use harmony::{
+    inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
+    topology::K8sAnywhereTopology,
+};
+
+#[tokio::main]
+async fn main() {
+    let ceph_score = CephRemoveOsd {
+        osd_deployment_name: "rook-ceph-osd-2".to_string(),
+        rook_ceph_namespace: "rook-ceph".to_string(),
+    };
+
+    let topology = K8sAnywhereTopology::from_env();
+    let inventory = Inventory::autoload();
+    harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
+        .await
+        .unwrap();
+}
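Assuming the crate is registered as a workspace member like the other examples, it should run with cargo run -p example-remove-rook-osd (the package name declared in its Cargo.toml), with osd_deployment_name and rook_ceph_namespace adjusted to the OSD being removed.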
@@ -30,6 +30,7 @@ pub enum InterpretName {
     Lamp,
     ApplicationMonitoring,
     K8sPrometheusCrdAlerting,
+    CephRemoveOsd,
     DiscoverInventoryAgent,
     CephClusterHealth,
     Custom(&'static str),
@@ -61,7 +62,8 @@ impl std::fmt::Display for InterpretName {
             InterpretName::Lamp => f.write_str("LAMP"),
             InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
             InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
+            InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
             InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
             InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
             InterpretName::Custom(name) => f.write_str(name),
             InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),
@@ -21,7 +21,7 @@ use kube::{
     api::{ApiResource, GroupVersionKind},
     runtime::wait::await_condition,
 };
-use log::{debug, error, trace};
+use log::{debug, error, info, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::{Value, json};
 use similar::TextDiff;
@@ -80,10 +80,13 @@ impl K8sClient {
         namespace: Option<&str>,
     ) -> Result<Option<Deployment>, Error> {
         let deps: Api<Deployment> = if let Some(ns) = namespace {
+            debug!("getting namespaced deployment");
             Api::namespaced(self.client.clone(), ns)
         } else {
+            debug!("getting default namespace deployment");
             Api::default_namespaced(self.client.clone())
         };
+        debug!("getting deployment {} in ns {}", name, namespace.unwrap_or("default"));
         Ok(deps.get_opt(name).await?)
     }
 
@@ -114,7 +117,7 @@
             }
         });
         let pp = PatchParams::default();
-        let scale = Patch::Apply(&patch);
+        let scale = Patch::Merge(&patch);
         deployments.patch_scale(name, &pp, &scale).await?;
         Ok(())
     }
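For context, Patch::Apply performs a server-side apply, which requires a field manager in PatchParams and can hit ownership conflicts on fields another controller already manages; a merge patch simply overwrites spec.replicas. A minimal sketch of the call with kube-rs, as a standalone helper (names and error handling are illustrative, not from this repo):

use k8s_openapi::api::apps::v1::Deployment;
use kube::{
    Client,
    api::{Api, Patch, PatchParams},
};
use serde_json::json;

// Sketch: scale a deployment to zero replicas with a JSON merge patch.
async fn scale_to_zero(client: Client, ns: &str, name: &str) -> Result<(), kube::Error> {
    let deployments: Api<Deployment> = Api::namespaced(client, ns);
    // A merge patch only needs to carry the fields being changed.
    let patch = json!({ "spec": { "replicas": 0 } });
    deployments
        .patch_scale(name, &PatchParams::default(), &Patch::Merge(&patch))
        .await?; // the returned Scale object is not needed here
    Ok(())
}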
@@ -4,7 +4,7 @@ use std::{
 };
 
 use async_trait::async_trait;
-use log::{info, warn};
+use log::{debug, warn};
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
 
@@ -19,8 +19,8 @@ use harmony_types::id::Id;
 
 #[derive(Debug, Clone, Serialize)]
 pub struct CephRemoveOsd {
-    osd_deployment_name: String,
-    rook_ceph_namespace: String,
+    pub osd_deployment_name: String,
+    pub rook_ceph_namespace: String,
 }
 
 impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
         self.verify_deployment_scaled(client.clone()).await?;
         self.delete_deployment(client.clone()).await?;
         self.verify_deployment_deleted(client.clone()).await?;
-        let osd_id_full = self.get_ceph_osd_id().unwrap();
-        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
-        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
-            .await?;
+        self.purge_ceph_osd(client.clone()).await?;
+        self.verify_ceph_osd_removal(client.clone()).await?;
 
+        let osd_id_full = self.get_ceph_osd_id()?;
         Ok(Outcome::success(format!(
             "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
             osd_id_full, self.score.osd_deployment_name
         )))
     }
     fn get_name(&self) -> InterpretName {
-        todo!()
+        InterpretName::CephRemoveOsd
     }
 
     fn get_version(&self) -> Version {
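Ordering matters here: the deployment is scaled down and confirmed deleted before the purge, since purging while the OSD daemon is still running could let it re-register in the CRUSH map; osd_id_full is recomputed afterwards only for the success message.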
@@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
 }
 
 impl CephRemoveOsdInterpret {
-    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+    pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
         let osd_id_numeric = self
             .score
             .osd_deployment_name
@@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
                     self.score.osd_deployment_name
                 ))
             })?;
+        Ok(osd_id_numeric.to_string())
+    }
+
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self.get_ceph_osd_id_numeric()?;
         let osd_id_full = format!("osd.{}", osd_id_numeric);
 
-        info!(
+        debug!(
             "Targeting Ceph OSD: {} (parsed from deployment {})",
             osd_id_full, self.score.osd_deployment_name
         );
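A sketch of the parsing now split across these two helpers, as a hypothetical standalone function assuming Rook's rook-ceph-osd-<n> deployment naming:

// Hypothetical sketch: derive the numeric and full Ceph OSD ids from a
// Rook OSD deployment name such as "rook-ceph-osd-2".
fn osd_ids(deployment_name: &str) -> Option<(String, String)> {
    // The numeric id is the suffix after the last '-'.
    let numeric = deployment_name.rsplit('-').next()?;
    numeric.parse::<u32>().ok()?; // reject names without a numeric suffix
    Some((numeric.to_string(), format!("osd.{numeric}")))
}

// osd_ids("rook-ceph-osd-2") yields Some(("2", "osd.2")).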
@@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
+        debug!("verifying toolbox exists");
         let toolbox_dep = "rook-ceph-tools".to_string();
 
         match client
@@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        info!(
+        debug!(
             "Scaling down OSD deployment: {}",
             self.score.osd_deployment_name
         );
@@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
 
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        debug!("Waiting for OSD deployment to scale down to 0 replicas");
         loop {
             let dep = client
                 .get_deployment(
@@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
                     Some(&self.score.rook_ceph_namespace),
                 )
                 .await?;
-
             if let Some(deployment) = dep {
                 if let Some(status) = deployment.status {
-                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
-                    {
+                    if status.replicas.is_none() && status.ready_replicas.is_none() {
                         return Ok(Outcome::success(
                             "Deployment successfully scaled down.".to_string(),
                         ));
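The reworked condition leans on the fact that DeploymentStatus's replicas and ready_replicas are optional fields serialized with omitempty semantics: once the deployment reaches zero replicas the API server typically omits them, so they deserialize as None, and the previous unwrap_or(1) == 0 test could never pass once the fields were absent.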
@@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        info!(
+        debug!(
             "Deleting OSD deployment: {}",
             self.score.osd_deployment_name
         );
@@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
 
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        debug!("Verifying OSD deployment deleted");
         loop {
             let dep = client
                 .get_deployment(
@@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
                 .await?;
 
             if dep.is_none() {
-                info!(
+                debug!(
                     "Deployment {} successfully deleted.",
                     self.score.osd_deployment_name
                 );
@@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
         Ok(tree)
     }
 
-    pub async fn purge_ceph_osd(
-        &self,
-        client: Arc<K8sClient>,
-        osd_id_full: &str,
-    ) -> Result<Outcome, InterpretError> {
-        info!(
+    pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
+        let osd_id_numeric = self.get_ceph_osd_id_numeric()?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        debug!(
             "Purging OSD {} from Ceph cluster and removing its auth key",
             osd_id_full
         );
@@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
                 "app".to_string(),
                 Some(&self.score.rook_ceph_namespace),
                 vec![
-                    format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
-                    format!("ceph auth del osd.{osd_id_full}").as_str(),
+                    "sh",
+                    "-c",
+                    format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
                 ],
             )
             .await?;
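The sh -c wrapper matters because the toolbox exec runs an argv directly rather than a shell line: the two format!ed strings were previously passed as separate argv entries, making the second command an argument of the first instead of a second invocation. Folding them into one shell command chained with && also fixes the doubled prefix in the old ceph auth del osd.{osd_id_full}, which expanded to osd.osd.<n>.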
@@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
     pub async fn verify_ceph_osd_removal(
         &self,
         client: Arc<K8sClient>,
-        osd_id_full: &str,
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
-        info!(
+        let osd_id_full = self.get_ceph_osd_id()?;
+        debug!(
             "Verifying OSD {} has been removed from the Ceph tree...",
             osd_id_full
         );
@@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
                     "rook-ceph-tools".to_string(),
                     "app".to_string(),
                     Some(&self.score.rook_ceph_namespace),
-                    vec!["ceph osd tree -f json"],
+                    vec!["sh", "-c", "ceph osd tree -f json"],
                 )
                 .await?;
             let tree =
@@ -1,2 +1,2 @@
-pub mod ceph_osd_replacement_score;
+pub mod ceph_remove_osd_score;
 pub mod ceph_validate_health_score;