Compare commits
	
		
			No commits in common. "cd3ea6fc10bbaf1f20e8e1645094903022aa8ef5" and "d1a274b7054d1821d233bde5396339e7329d6e10" have entirely different histories.
		
	
	
		
			cd3ea6fc10
			...
			d1a274b705
		
	
		
| @ -5,7 +5,7 @@ use k8s_openapi::{ | |||||||
| }; | }; | ||||||
| use kube::{ | use kube::{ | ||||||
|     Client, Config, Error, Resource, |     Client, Config, Error, Resource, | ||||||
|     api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, |     api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt}, | ||||||
|     config::{KubeConfigOptions, Kubeconfig}, |     config::{KubeConfigOptions, Kubeconfig}, | ||||||
|     core::ErrorResponse, |     core::ErrorResponse, | ||||||
|     runtime::reflector::Lookup, |     runtime::reflector::Lookup, | ||||||
| @ -19,7 +19,6 @@ use log::{debug, error, trace}; | |||||||
| use serde::{Serialize, de::DeserializeOwned}; | use serde::{Serialize, de::DeserializeOwned}; | ||||||
| use serde_json::json; | use serde_json::json; | ||||||
| use similar::TextDiff; | use similar::TextDiff; | ||||||
| use tokio::io::AsyncReadExt; |  | ||||||
| 
 | 
 | ||||||
| #[derive(new, Clone)] | #[derive(new, Clone)] | ||||||
| pub struct K8sClient { | pub struct K8sClient { | ||||||
| @ -98,21 +97,6 @@ impl K8sClient { | |||||||
|         Ok(()) |         Ok(()) | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn delete_deployment( |  | ||||||
|         &self, |  | ||||||
|         name: &str, |  | ||||||
|         namespace: Option<&str>, |  | ||||||
|     ) -> Result<(), Error> { |  | ||||||
|         let deployments: Api<Deployment> = if let Some(ns) = namespace { |  | ||||||
|             Api::namespaced(self.client.clone(), ns) |  | ||||||
|         } else { |  | ||||||
|             Api::default_namespaced(self.client.clone()) |  | ||||||
|         }; |  | ||||||
|         let delete_params = DeleteParams::default(); |  | ||||||
|         deployments.delete(name, &delete_params).await?; |  | ||||||
|         Ok(()) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn wait_until_deployment_ready( |     pub async fn wait_until_deployment_ready( | ||||||
|         &self, |         &self, | ||||||
|         name: String, |         name: String, | ||||||
| @ -138,68 +122,6 @@ impl K8sClient { | |||||||
|         } |         } | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     /// Will execute a commond in the first pod found that matches the specified label
 |  | ||||||
|     /// '{label}={name}'
 |  | ||||||
|     pub async fn exec_app_capture_output( |  | ||||||
|         &self, |  | ||||||
|         name: String, |  | ||||||
|         label: String, |  | ||||||
|         namespace: Option<&str>, |  | ||||||
|         command: Vec<&str>, |  | ||||||
|     ) -> Result<String, String> { |  | ||||||
|         let api: Api<Pod>; |  | ||||||
| 
 |  | ||||||
|         if let Some(ns) = namespace { |  | ||||||
|             api = Api::namespaced(self.client.clone(), ns); |  | ||||||
|         } else { |  | ||||||
|             api = Api::default_namespaced(self.client.clone()); |  | ||||||
|         } |  | ||||||
|         let pod_list = api |  | ||||||
|             .list(&ListParams::default().labels(format!("{label}={name}").as_str())) |  | ||||||
|             .await |  | ||||||
|             .expect("couldn't get list of pods"); |  | ||||||
| 
 |  | ||||||
|         let res = api |  | ||||||
|             .exec( |  | ||||||
|                 pod_list |  | ||||||
|                     .items |  | ||||||
|                     .first() |  | ||||||
|                     .expect("couldn't get pod") |  | ||||||
|                     .name() |  | ||||||
|                     .expect("couldn't get pod name") |  | ||||||
|                     .into_owned() |  | ||||||
|                     .as_str(), |  | ||||||
|                 command, |  | ||||||
|                 &AttachParams::default().stdout(true).stderr(true), |  | ||||||
|             ) |  | ||||||
|             .await; |  | ||||||
|         match res { |  | ||||||
|             Err(e) => Err(e.to_string()), |  | ||||||
|             Ok(mut process) => { |  | ||||||
|                 let status = process |  | ||||||
|                     .take_status() |  | ||||||
|                     .expect("Couldn't get status") |  | ||||||
|                     .await |  | ||||||
|                     .expect("Couldn't unwrap status"); |  | ||||||
| 
 |  | ||||||
|                 if let Some(s) = status.status { |  | ||||||
|                     let mut stdout_buf = String::new(); |  | ||||||
|                     if let Some(mut stdout) = process.stdout().take() { |  | ||||||
|                         stdout.read_to_string(&mut stdout_buf).await; |  | ||||||
|                     } |  | ||||||
|                     debug!("Status: {} - {:?}", s, status.details); |  | ||||||
|                     if s == "Success" { |  | ||||||
|                         Ok(stdout_buf) |  | ||||||
|                     } else { |  | ||||||
|                         Err(s) |  | ||||||
|                     } |  | ||||||
|                 } else { |  | ||||||
|                     Err("Couldn't get inner status of pod exec".to_string()) |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
 |     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
 | ||||||
|     pub async fn exec_app( |     pub async fn exec_app( | ||||||
|         &self, |         &self, | ||||||
|  | |||||||
| @ -1,69 +1,72 @@ | |||||||
| use std::{ | use std::process::Command; | ||||||
|     process::Command, |  | ||||||
|     sync::Arc, |  | ||||||
|     time::{Duration, Instant}, |  | ||||||
| }; |  | ||||||
| 
 | 
 | ||||||
| use async_trait::async_trait; | use async_trait::async_trait; | ||||||
| use log::{info, warn}; | use serde::Serialize; | ||||||
| use serde::{Deserialize, Serialize}; |  | ||||||
| use tokio::time::sleep; |  | ||||||
| 
 | 
 | ||||||
| use crate::{ | use crate::{ | ||||||
|     data::{Id, Version}, |     data::{Id, Version}, | ||||||
|     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, |     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, | ||||||
|     inventory::Inventory, |     inventory::Inventory, | ||||||
|     score::Score, |     score::Score, | ||||||
|     topology::{K8sclient, Topology, k8s::K8sClient}, |     topology::{K8sclient, Topology}, | ||||||
| }; | }; | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Clone, Serialize)] | #[derive(Debug, Clone, Serialize)] | ||||||
| pub struct CephRemoveOsd { | pub struct PrepCephOsdReplacement { | ||||||
|     osd_deployment_name: String, |     osd_name: String, | ||||||
|     rook_ceph_namespace: String, |     rook_ceph_namespace: String, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd { | impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement { | ||||||
|     fn name(&self) -> String { |     fn name(&self) -> String { | ||||||
|         format!("CephRemoveOsdScore") |         format!("CephOsdReplacementScore") | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     #[doc(hidden)] |     #[doc(hidden)] | ||||||
|     fn create_interpret(&self) -> Box<dyn Interpret<T>> { |     fn create_interpret(&self) -> Box<dyn Interpret<T>> { | ||||||
|         Box::new(CephRemoveOsdInterpret { |         Box::new(PrepCephOsdReplacementInterpret { | ||||||
|             score: self.clone(), |             score: self.clone(), | ||||||
|         }) |         }) | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Debug, Clone)] | #[derive(Debug, Clone)] | ||||||
| pub struct CephRemoveOsdInterpret { | pub struct PrepCephOsdReplacementInterpret { | ||||||
|     score: CephRemoveOsd, |     score: PrepCephOsdReplacement, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[async_trait] | #[async_trait] | ||||||
| impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret { | impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret { | ||||||
|     async fn execute( |     async fn execute( | ||||||
|         &self, |         &self, | ||||||
|         _inventory: &Inventory, |         _inventory: &Inventory, | ||||||
|         topology: &T, |         topology: &T, | ||||||
|     ) -> Result<Outcome, InterpretError> { |     ) -> Result<Outcome, InterpretError> { | ||||||
|         let client = topology.k8s_client().await.unwrap(); |         let client = topology.k8s_client().await.unwrap(); | ||||||
|         self.verify_ceph_toolbox_exists(client.clone()).await?; |         client | ||||||
|         self.scale_deployment(client.clone()).await?; |             .scale_deployment( | ||||||
|         self.verify_deployment_scaled(client.clone()).await?; |                 &self.score.osd_name, | ||||||
|         self.delete_deployment(client.clone()).await?; |                 Some(&self.score.rook_ceph_namespace), | ||||||
|         self.verify_deployment_deleted(client.clone()).await?; |                 0, | ||||||
|         let osd_id_full = self.get_ceph_osd_id().unwrap(); |             ) | ||||||
|         self.purge_ceph_osd(client.clone(), &osd_id_full).await?; |             .await?; | ||||||
|         self.verify_ceph_osd_removal(client.clone(), &osd_id_full) |         let dep = client | ||||||
|  |             .get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace)) | ||||||
|             .await?; |             .await?; | ||||||
| 
 | 
 | ||||||
|         Ok(Outcome::success(format!( |         match dep.unwrap().status.and_then(|s| s.ready_replicas) { | ||||||
|             "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}", |             Some(0) => Ok(Outcome::success( | ||||||
|             osd_id_full, self.score.osd_deployment_name |                 "Successfully prepared rook-ceph-cluster for disk replacement".to_string(), | ||||||
|         ))) |             )), | ||||||
|  |             Some(_) => Err(InterpretError::new({ | ||||||
|  |                 "Deployment still has ready replicas".to_string() | ||||||
|  |             })), | ||||||
|  |             None => Err(InterpretError::new({ | ||||||
|  |                 "Failed to get rook-ceph-cluster status".to_string() | ||||||
|  |             })), | ||||||
|  |         } | ||||||
|     } |     } | ||||||
|  | 
 | ||||||
|     fn get_name(&self) -> InterpretName { |     fn get_name(&self) -> InterpretName { | ||||||
|         todo!() |         todo!() | ||||||
|     } |     } | ||||||
| @ -80,340 +83,3 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret { | |||||||
|         todo!() |         todo!() | ||||||
|     } |     } | ||||||
| } | } | ||||||
| 
 |  | ||||||
| impl CephRemoveOsdInterpret { |  | ||||||
|     pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> { |  | ||||||
|         let osd_id_numeric = self |  | ||||||
|             .score |  | ||||||
|             .osd_deployment_name |  | ||||||
|             .split('-') |  | ||||||
|             .nth(3) |  | ||||||
|             .ok_or_else(|| { |  | ||||||
|                 InterpretError::new(format!( |  | ||||||
|                     "Could not parse OSD id from deployment name {}", |  | ||||||
|                     self.score.osd_deployment_name |  | ||||||
|                 )) |  | ||||||
|             })?; |  | ||||||
|         let osd_id_full = format!("osd.{}", osd_id_numeric); |  | ||||||
| 
 |  | ||||||
|         info!( |  | ||||||
|             "Targeting Ceph OSD: {} (parsed from deployment {})", |  | ||||||
|             osd_id_full, self.score.osd_deployment_name |  | ||||||
|         ); |  | ||||||
| 
 |  | ||||||
|         Ok(osd_id_full) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn verify_ceph_toolbox_exists( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         let toolbox_dep = "rook-ceph-tools".to_string(); |  | ||||||
| 
 |  | ||||||
|         match client |  | ||||||
|             .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace)) |  | ||||||
|             .await |  | ||||||
|         { |  | ||||||
|             Ok(Some(deployment)) => { |  | ||||||
|                 if let Some(status) = deployment.status { |  | ||||||
|                     let ready_count = status.ready_replicas.unwrap_or(0); |  | ||||||
|                     if ready_count >= 1 { |  | ||||||
|                         return Ok(Outcome::success(format!( |  | ||||||
|                             "'{}' is ready with {} replica(s).", |  | ||||||
|                             &toolbox_dep, ready_count |  | ||||||
|                         ))); |  | ||||||
|                     } else { |  | ||||||
|                         return Err(InterpretError::new( |  | ||||||
|                             "ceph-tool-box not ready in cluster".to_string(), |  | ||||||
|                         )); |  | ||||||
|                     } |  | ||||||
|                 } else { |  | ||||||
|                     Err(InterpretError::new(format!( |  | ||||||
|                         "failed to get deployment status {}", |  | ||||||
|                         &toolbox_dep |  | ||||||
|                     ))) |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
|             Ok(None) => Err(InterpretError::new(format!( |  | ||||||
|                 "Deployment '{}' not found in namespace '{}'.", |  | ||||||
|                 &toolbox_dep, self.score.rook_ceph_namespace |  | ||||||
|             ))), |  | ||||||
|             Err(e) => Err(InterpretError::new(format!( |  | ||||||
|                 "Failed to query for deployment '{}': {}", |  | ||||||
|                 &toolbox_dep, e |  | ||||||
|             ))), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn scale_deployment( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         info!( |  | ||||||
|             "Scaling down OSD deployment: {}", |  | ||||||
|             self.score.osd_deployment_name |  | ||||||
|         ); |  | ||||||
|         client |  | ||||||
|             .scale_deployment( |  | ||||||
|                 &self.score.osd_deployment_name, |  | ||||||
|                 Some(&self.score.rook_ceph_namespace), |  | ||||||
|                 0, |  | ||||||
|             ) |  | ||||||
|             .await?; |  | ||||||
|         Ok(Outcome::success(format!( |  | ||||||
|             "Scaled down deployment {}", |  | ||||||
|             self.score.osd_deployment_name |  | ||||||
|         ))) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn verify_deployment_scaled( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         let (timeout, interval, start) = self.build_timer(); |  | ||||||
| 
 |  | ||||||
|         info!("Waiting for OSD deployment to scale down to 0 replicas"); |  | ||||||
|         loop { |  | ||||||
|             let dep = client |  | ||||||
|                 .get_deployment( |  | ||||||
|                     &self.score.osd_deployment_name, |  | ||||||
|                     Some(&self.score.rook_ceph_namespace), |  | ||||||
|                 ) |  | ||||||
|                 .await?; |  | ||||||
| 
 |  | ||||||
|             if let Some(deployment) = dep { |  | ||||||
|                 if let Some(status) = deployment.status { |  | ||||||
|                     if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0 |  | ||||||
|                     { |  | ||||||
|                         return Ok(Outcome::success( |  | ||||||
|                             "Deployment successfully scaled down.".to_string(), |  | ||||||
|                         )); |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|             } |  | ||||||
| 
 |  | ||||||
|             if start.elapsed() > timeout { |  | ||||||
|                 return Err(InterpretError::new(format!( |  | ||||||
|                     "Timed out waiting for deployment {} to scale down", |  | ||||||
|                     self.score.osd_deployment_name |  | ||||||
|                 ))); |  | ||||||
|             } |  | ||||||
|             sleep(interval).await; |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     fn build_timer(&self) -> (Duration, Duration, Instant) { |  | ||||||
|         let timeout = Duration::from_secs(120); |  | ||||||
|         let interval = Duration::from_secs(5); |  | ||||||
|         let start = Instant::now(); |  | ||||||
|         (timeout, interval, start) |  | ||||||
|     } |  | ||||||
|     pub async fn delete_deployment( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         info!( |  | ||||||
|             "Deleting OSD deployment: {}", |  | ||||||
|             self.score.osd_deployment_name |  | ||||||
|         ); |  | ||||||
|         client |  | ||||||
|             .delete_deployment( |  | ||||||
|                 &self.score.osd_deployment_name, |  | ||||||
|                 Some(&self.score.rook_ceph_namespace), |  | ||||||
|             ) |  | ||||||
|             .await?; |  | ||||||
|         Ok(Outcome::success(format!( |  | ||||||
|             "deployment {} deleted", |  | ||||||
|             self.score.osd_deployment_name |  | ||||||
|         ))) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn verify_deployment_deleted( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         let (timeout, interval, start) = self.build_timer(); |  | ||||||
| 
 |  | ||||||
|         info!("Waiting for OSD deployment to scale down to 0 replicas"); |  | ||||||
|         loop { |  | ||||||
|             let dep = client |  | ||||||
|                 .get_deployment( |  | ||||||
|                     &self.score.osd_deployment_name, |  | ||||||
|                     Some(&self.score.rook_ceph_namespace), |  | ||||||
|                 ) |  | ||||||
|                 .await?; |  | ||||||
| 
 |  | ||||||
|             if dep.is_none() { |  | ||||||
|                 info!( |  | ||||||
|                     "Deployment {} successfully deleted.", |  | ||||||
|                     self.score.osd_deployment_name |  | ||||||
|                 ); |  | ||||||
|                 return Ok(Outcome::success(format!( |  | ||||||
|                     "Deployment {} deleted.", |  | ||||||
|                     self.score.osd_deployment_name |  | ||||||
|                 ))); |  | ||||||
|             } |  | ||||||
| 
 |  | ||||||
|             if start.elapsed() > timeout { |  | ||||||
|                 return Err(InterpretError::new(format!( |  | ||||||
|                     "Timed out waiting for deployment {} to be deleted", |  | ||||||
|                     self.score.osd_deployment_name |  | ||||||
|                 ))); |  | ||||||
|             } |  | ||||||
|             sleep(interval).await; |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> { |  | ||||||
|         let nodes = json.get("nodes").ok_or_else(|| { |  | ||||||
|             InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string()) |  | ||||||
|         })?; |  | ||||||
|         let tree: CephOsdTree = CephOsdTree { |  | ||||||
|             nodes: serde_json::from_value(nodes.clone()).map_err(|e| { |  | ||||||
|                 InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e)) |  | ||||||
|             })?, |  | ||||||
|         }; |  | ||||||
|         Ok(tree) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn purge_ceph_osd( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|         osd_id_full: &str, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         info!( |  | ||||||
|             "Purging OSD {} from Ceph cluster and removing its auth key", |  | ||||||
|             osd_id_full |  | ||||||
|         ); |  | ||||||
|         client |  | ||||||
|             .exec_app_capture_output( |  | ||||||
|                 "rook-ceph-tools".to_string(), |  | ||||||
|                 "app".to_string(), |  | ||||||
|                 Some(&self.score.rook_ceph_namespace), |  | ||||||
|                 vec![ |  | ||||||
|                     format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(), |  | ||||||
|                     format!("ceph auth del osd.{osd_id_full}").as_str(), |  | ||||||
|                 ], |  | ||||||
|             ) |  | ||||||
|             .await?; |  | ||||||
|         Ok(Outcome::success(format!( |  | ||||||
|             "osd id {} removed from osd tree", |  | ||||||
|             osd_id_full |  | ||||||
|         ))) |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     pub async fn verify_ceph_osd_removal( |  | ||||||
|         &self, |  | ||||||
|         client: Arc<K8sClient>, |  | ||||||
|         osd_id_full: &str, |  | ||||||
|     ) -> Result<Outcome, InterpretError> { |  | ||||||
|         let (timeout, interval, start) = self.build_timer(); |  | ||||||
|         info!( |  | ||||||
|             "Verifying OSD {} has been removed from the Ceph tree...", |  | ||||||
|             osd_id_full |  | ||||||
|         ); |  | ||||||
|         loop { |  | ||||||
|             let output = client |  | ||||||
|                 .exec_app_capture_output( |  | ||||||
|                     "rook-ceph-tools".to_string(), |  | ||||||
|                     "app".to_string(), |  | ||||||
|                     Some(&self.score.rook_ceph_namespace), |  | ||||||
|                     vec!["ceph osd tree -f json"], |  | ||||||
|                 ) |  | ||||||
|                 .await?; |  | ||||||
|             let tree = |  | ||||||
|                 self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json")); |  | ||||||
| 
 |  | ||||||
|             let osd_found = tree |  | ||||||
|                 .unwrap() |  | ||||||
|                 .nodes |  | ||||||
|                 .iter() |  | ||||||
|                 .any(|node| node.name == osd_id_full); |  | ||||||
| 
 |  | ||||||
|             if !osd_found { |  | ||||||
|                 return Ok(Outcome::success(format!( |  | ||||||
|                     "Successfully verified that OSD {} is removed from the Ceph cluster.", |  | ||||||
|                     osd_id_full, |  | ||||||
|                 ))); |  | ||||||
|             } |  | ||||||
| 
 |  | ||||||
|             if start.elapsed() > timeout { |  | ||||||
|                 return Err(InterpretError::new(format!( |  | ||||||
|                     "Timed out waiting for OSD {} to be removed from Ceph tree", |  | ||||||
|                     osd_id_full |  | ||||||
|                 ))); |  | ||||||
|             } |  | ||||||
| 
 |  | ||||||
|             warn!( |  | ||||||
|                 "OSD {} still found in Ceph tree, retrying in {:?}...", |  | ||||||
|                 osd_id_full, interval |  | ||||||
|             ); |  | ||||||
|             sleep(interval).await; |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| #[derive(Debug, Deserialize, PartialEq)] |  | ||||||
| pub struct CephOsdTree { |  | ||||||
|     pub nodes: Vec<CephNode>, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[derive(Debug, Deserialize, PartialEq)] |  | ||||||
| pub struct CephNode { |  | ||||||
|     pub id: i32, |  | ||||||
|     pub name: String, |  | ||||||
|     #[serde(rename = "type")] |  | ||||||
|     pub node_type: String, |  | ||||||
|     pub type_id: Option<i32>, |  | ||||||
|     pub children: Option<Vec<i32>>, |  | ||||||
|     pub exists: Option<i32>, |  | ||||||
|     pub status: Option<String>, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[cfg(test)] |  | ||||||
| mod tests { |  | ||||||
|     use serde_json::json; |  | ||||||
| 
 |  | ||||||
|     use super::*; |  | ||||||
| 
 |  | ||||||
|     #[test] |  | ||||||
|     fn test_get_osd_tree() { |  | ||||||
|         let json_data = json!({ |  | ||||||
|             "nodes": [ |  | ||||||
|                 {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"}, |  | ||||||
|                 {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344} |  | ||||||
|             ] |  | ||||||
|         }); |  | ||||||
|         let interpret = CephRemoveOsdInterpret { |  | ||||||
|             score: CephRemoveOsd { |  | ||||||
|                 osd_deployment_name: "osd-1".to_string(), |  | ||||||
|                 rook_ceph_namespace: "dummy_ns".to_string(), |  | ||||||
|             }, |  | ||||||
|         }; |  | ||||||
|         let json = interpret.get_osd_tree(json_data).unwrap(); |  | ||||||
| 
 |  | ||||||
|         let expected = CephOsdTree { |  | ||||||
|             nodes: vec![ |  | ||||||
|                 CephNode { |  | ||||||
|                     id: 1, |  | ||||||
|                     name: "osd.1".to_string(), |  | ||||||
|                     node_type: "osd".to_string(), |  | ||||||
|                     type_id: None, |  | ||||||
|                     children: None, |  | ||||||
|                     exists: None, |  | ||||||
|                     status: None, |  | ||||||
|                 }, |  | ||||||
|                 CephNode { |  | ||||||
|                     id: 2, |  | ||||||
|                     name: "osd.2".to_string(), |  | ||||||
|                     node_type: "osd".to_string(), |  | ||||||
|                     type_id: None, |  | ||||||
|                     children: None, |  | ||||||
|                     exists: None, |  | ||||||
|                     status: None, |  | ||||||
|                 }, |  | ||||||
|             ], |  | ||||||
|         }; |  | ||||||
| 
 |  | ||||||
|         assert_eq!(json, expected); |  | ||||||
|     } |  | ||||||
| } |  | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user