forked from NationTech/harmony

Merge pull request 'feat/ceph-osd-score' (#116) from feat/ceph-osd-score into master

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/116
Reviewed-by: johnride <jg@nationtech.io>

commit bfca9cf163
@@ -5,7 +5,7 @@ use k8s_openapi::{
 };
 use kube::{
     Client, Config, Error, Resource,
-    api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
+    api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
     runtime::reflector::Lookup,
@@ -17,7 +17,9 @@ use kube::{
 };
 use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
+use serde_json::json;
 use similar::TextDiff;
+use tokio::io::AsyncReadExt;
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -51,6 +53,66 @@ impl K8sClient {
         })
     }
 
+    pub async fn get_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<Option<Deployment>, Error> {
+        let deps: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(deps.get_opt(name).await?)
+    }
+
+    pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
+        let pods: Api<Pod> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(pods.get_opt(name).await?)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+        replicas: u32,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+
+        let patch = json!({
+            "spec": {
+                "replicas": replicas
+            }
+        });
+        let pp = PatchParams::default();
+        // Use a merge patch: a server-side Apply of the scale subresource
+        // would be rejected without a field manager in the PatchParams.
+        let scale = Patch::Merge(&patch);
+        deployments.patch_scale(name, &pp, &scale).await?;
+        Ok(())
+    }
+
+    pub async fn delete_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        let delete_params = DeleteParams::default();
+        deployments.delete(name, &delete_params).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
@@ -76,6 +138,68 @@ impl K8sClient {
         }
     }
 
+    /// Will execute a command in the first pod found that matches the label
+    /// `{label}={name}` and capture its output.
+    pub async fn exec_app_capture_output(
+        &self,
+        name: String,
+        label: String,
+        namespace: Option<&str>,
+        command: Vec<&str>,
+    ) -> Result<String, String> {
+        let api: Api<Pod> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        let pod_list = api
+            .list(&ListParams::default().labels(format!("{label}={name}").as_str()))
+            .await
+            .expect("couldn't get list of pods");
+
+        let res = api
+            .exec(
+                pod_list
+                    .items
+                    .first()
+                    .expect("couldn't get pod")
+                    .name()
+                    .expect("couldn't get pod name")
+                    .into_owned()
+                    .as_str(),
+                command,
+                &AttachParams::default().stdout(true).stderr(true),
+            )
+            .await;
+        match res {
+            Err(e) => Err(e.to_string()),
+            Ok(mut process) => {
+                let status = process
+                    .take_status()
+                    .expect("Couldn't get status")
+                    .await
+                    .expect("Couldn't unwrap status");
+
+                if let Some(s) = status.status {
+                    let mut stdout_buf = String::new();
+                    if let Some(mut stdout) = process.stdout().take() {
+                        stdout
+                            .read_to_string(&mut stdout_buf)
+                            .await
+                            .map_err(|e| e.to_string())?;
+                    }
+                    debug!("Status: {} - {:?}", s, status.details);
+                    if s == "Success" {
+                        Ok(stdout_buf)
+                    } else {
+                        Err(s)
+                    }
+                } else {
+                    Err("Couldn't get inner status of pod exec".to_string())
+                }
+            }
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
     pub async fn exec_app(
         &self,
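The helpers added above are thin wrappers over kube's typed Api. A minimal usage sketch (not part of this PR; `client` is an already-built K8sClient, and the deployment and namespace names are illustrative):

    // Hypothetical caller; names are illustrative, error handling simplified.
    async fn drain_osd_deployment(client: &K8sClient) -> Result<(), String> {
        client
            .scale_deployment("rook-ceph-osd-3", Some("rook-ceph"), 0)
            .await
            .map_err(|e| e.to_string())?;
        client
            .delete_deployment("rook-ceph-osd-3", Some("rook-ceph"))
            .await
            .map_err(|e| e.to_string())?;
        // Picks the first pod labelled `app=rook-ceph-tools` and runs an argv in it.
        let tree_json = client
            .exec_app_capture_output(
                "rook-ceph-tools".to_string(),
                "app".to_string(),
                Some("rook-ceph"),
                vec!["ceph", "osd", "tree", "-f", "json"],
            )
            .await?;
        println!("{tree_json}");
        Ok(())
    }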
@@ -14,5 +14,6 @@ pub mod monitoring;
 pub mod okd;
 pub mod opnsense;
 pub mod prometheus;
+pub mod storage;
 pub mod tenant;
 pub mod tftp;
harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs (new file, 419 lines)

@@ -0,0 +1,419 @@
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use async_trait::async_trait;
+use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
+
+use crate::{
+    data::{Id, Version},
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology, k8s::K8sClient},
+};
+
+#[derive(Debug, Clone, Serialize)]
+pub struct CephRemoveOsd {
+    osd_deployment_name: String,
+    rook_ceph_namespace: String,
+}
+
+impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
+    fn name(&self) -> String {
+        "CephRemoveOsdScore".to_string()
+    }
+
+    #[doc(hidden)]
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(CephRemoveOsdInterpret {
+            score: self.clone(),
+        })
+    }
+}
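The score is plain data; `create_interpret` hands the actual work to CephRemoveOsdInterpret. A hedged construction sketch (the fields are private and the PR adds no public constructor, so this only works from within the module; the values are illustrative):

    let score = CephRemoveOsd {
        osd_deployment_name: "rook-ceph-osd-3".to_string(),
        rook_ceph_namespace: "rook-ceph".to_string(),
    };
    // The harness is then expected to call score.create_interpret()
    // and drive the returned Interpret's execute() against a topology.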
+
+#[derive(Debug, Clone)]
+pub struct CephRemoveOsdInterpret {
+    score: CephRemoveOsd,
+}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        let client = topology.k8s_client().await.unwrap();
+        self.verify_ceph_toolbox_exists(client.clone()).await?;
+        self.scale_deployment(client.clone()).await?;
+        self.verify_deployment_scaled(client.clone()).await?;
+        self.delete_deployment(client.clone()).await?;
+        self.verify_deployment_deleted(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
+        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
+            .await?;
+
+        Ok(Outcome::success(format!(
+            "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
+            osd_id_full, self.score.osd_deployment_name
+        )))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        todo!()
+    }
+
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl CephRemoveOsdInterpret {
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        // Relies on Rook's OSD deployment naming convention "rook-ceph-osd-<id>":
+        // the numeric id is the fourth '-'-separated field.
+        let osd_id_numeric = self
+            .score
+            .osd_deployment_name
+            .split('-')
+            .nth(3)
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Could not parse OSD id from deployment name {}",
+                    self.score.osd_deployment_name
+                ))
+            })?;
+        let osd_id_full = format!("osd.{}", osd_id_numeric);
+
+        info!(
+            "Targeting Ceph OSD: {} (parsed from deployment {})",
+            osd_id_full, self.score.osd_deployment_name
+        );
+
+        Ok(osd_id_full)
+    }
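Concretely, the parsing above behaves like this (a hedged illustration; note that a short name such as the "osd-1" used in the unit test below would not parse):

    assert_eq!("rook-ceph-osd-3".split('-').nth(3), Some("3")); // becomes "osd.3"
    assert_eq!("osd-1".split('-').nth(3), None);                // -> InterpretError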
+
+    pub async fn verify_ceph_toolbox_exists(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let toolbox_dep = "rook-ceph-tools".to_string();
+
+        match client
+            .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
+            .await
+        {
+            Ok(Some(deployment)) => {
+                if let Some(status) = deployment.status {
+                    let ready_count = status.ready_replicas.unwrap_or(0);
+                    if ready_count >= 1 {
+                        Ok(Outcome::success(format!(
+                            "'{}' is ready with {} replica(s).",
+                            &toolbox_dep, ready_count
+                        )))
+                    } else {
+                        Err(InterpretError::new(
+                            "rook-ceph-tools deployment not ready in cluster".to_string(),
+                        ))
+                    }
+                } else {
+                    Err(InterpretError::new(format!(
+                        "failed to get deployment status {}",
+                        &toolbox_dep
+                    )))
+                }
+            }
+            Ok(None) => Err(InterpretError::new(format!(
+                "Deployment '{}' not found in namespace '{}'.",
+                &toolbox_dep, self.score.rook_ceph_namespace
+            ))),
+            Err(e) => Err(InterpretError::new(format!(
+                "Failed to query for deployment '{}': {}",
+                &toolbox_dep, e
+            ))),
+        }
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Scaling down OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .scale_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "Scaled down deployment {}",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_scaled(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if let Some(deployment) = dep {
+                if let Some(status) = deployment.status {
+                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
+                    {
+                        return Ok(Outcome::success(
+                            "Deployment successfully scaled down.".to_string(),
+                        ));
+                    }
+                }
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to scale down",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    fn build_timer(&self) -> (Duration, Duration, Instant) {
+        let timeout = Duration::from_secs(120);
+        let interval = Duration::from_secs(5);
+        let start = Instant::now();
+        (timeout, interval, start)
+    }
+
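build_timer feeds the same 120-second timeout / 5-second interval loop used by verify_deployment_scaled above and by verify_deployment_deleted and verify_ceph_osd_removal below. A minimal sketch of how that shared loop could be factored out (a hypothetical helper, not part of this PR):

    use std::time::{Duration, Instant};
    use tokio::time::sleep;

    /// Poll `check` until it returns true or `timeout` elapses.
    async fn poll_until<F, Fut>(timeout: Duration, interval: Duration, mut check: F) -> bool
    where
        F: FnMut() -> Fut,
        Fut: std::future::Future<Output = bool>,
    {
        let start = Instant::now();
        loop {
            if check().await {
                return true;
            }
            if start.elapsed() > timeout {
                return false;
            }
            sleep(interval).await;
        }
    }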
+    pub async fn delete_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Deleting OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .delete_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "deployment {} deleted",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_deleted(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to be deleted");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if dep.is_none() {
+                info!(
+                    "Deployment {} successfully deleted.",
+                    self.score.osd_deployment_name
+                );
+                return Ok(Outcome::success(format!(
+                    "Deployment {} deleted.",
+                    self.score.osd_deployment_name
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to be deleted",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
+        let nodes = json.get("nodes").ok_or_else(|| {
+            InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
+        })?;
+        let tree: CephOsdTree = CephOsdTree {
+            nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
+                InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
+            })?,
+        };
+        Ok(tree)
+    }
+
+    pub async fn purge_ceph_osd(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Purging OSD {} from Ceph cluster and removing its auth key",
+            osd_id_full
+        );
+        client
+            .exec_app_capture_output(
+                "rook-ceph-tools".to_string(),
+                "app".to_string(),
+                Some(&self.score.rook_ceph_namespace),
+                vec![
+                    "sh",
+                    "-c",
+                    // Run both commands in one shell invocation: exec takes an
+                    // argv, not a list of commands. osd_id_full is already of
+                    // the form "osd.<id>", so it is passed to `ceph auth del` as-is.
+                    format!(
+                        "ceph osd purge {osd_id_full} --yes-i-really-mean-it && ceph auth del {osd_id_full}"
+                    )
+                    .as_str(),
+                ],
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "osd id {} removed from osd tree",
+            osd_id_full
+        )))
+    }
+
+    pub async fn verify_ceph_osd_removal(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+        info!(
+            "Verifying OSD {} has been removed from the Ceph tree...",
+            osd_id_full
+        );
+        loop {
+            let output = client
+                .exec_app_capture_output(
+                    "rook-ceph-tools".to_string(),
+                    "app".to_string(),
+                    Some(&self.score.rook_ceph_namespace),
+                    vec!["ceph", "osd", "tree", "-f", "json"],
+                )
+                .await?;
+            let parsed = serde_json::from_str(&output).map_err(|e| {
+                InterpretError::new(format!("Could not parse ceph osd tree output: {}", e))
+            })?;
+            let tree = self.get_osd_tree(parsed)?;
+
+            let osd_found = tree.nodes.iter().any(|node| node.name == osd_id_full);
+
+            if !osd_found {
+                return Ok(Outcome::success(format!(
+                    "Successfully verified that OSD {} is removed from the Ceph cluster.",
+                    osd_id_full,
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for OSD {} to be removed from Ceph tree",
+                    osd_id_full
+                )));
+            }
+
+            warn!(
+                "OSD {} still found in Ceph tree, retrying in {:?}...",
+                osd_id_full, interval
+            );
+            sleep(interval).await;
+        }
+    }
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct CephOsdTree {
+    pub nodes: Vec<CephNode>,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct CephNode {
+    pub id: i32,
+    pub name: String,
+    #[serde(rename = "type")]
+    pub node_type: String,
+    pub type_id: Option<i32>,
+    pub children: Option<Vec<i32>>,
+    pub exists: Option<i32>,
+    pub status: Option<String>,
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use super::*;
+
+    #[test]
+    fn test_get_osd_tree() {
+        let json_data = json!({
+            "nodes": [
+                {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity": "1"},
+                {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
+            ]
+        });
+        let interpret = CephRemoveOsdInterpret {
+            score: CephRemoveOsd {
+                osd_deployment_name: "osd-1".to_string(),
+                rook_ceph_namespace: "dummy_ns".to_string(),
+            },
+        };
+        let tree = interpret.get_osd_tree(json_data).unwrap();
+
+        let expected = CephOsdTree {
+            nodes: vec![
+                CephNode {
+                    id: 1,
+                    name: "osd.1".to_string(),
+                    node_type: "osd".to_string(),
+                    type_id: None,
+                    children: None,
+                    exists: None,
+                    status: None,
+                },
+                CephNode {
+                    id: 2,
+                    name: "osd.2".to_string(),
+                    node_type: "osd".to_string(),
+                    type_id: None,
+                    children: None,
+                    exists: None,
+                    status: None,
+                },
+            ],
+        };
+
+        assert_eq!(tree, expected);
+    }
+}
harmony/src/modules/storage/ceph/mod.rs (new file, 1 line)

@@ -0,0 +1 @@
+pub mod ceph_osd_replacement_score;
harmony/src/modules/storage/mod.rs (new file, 1 line)

@@ -0,0 +1 @@
+pub mod ceph;