From b43ca7c7408ffa0087e6229db2c04cc8be40d175 Mon Sep 17 00:00:00 2001
From: Willem
Date: Fri, 15 Aug 2025 14:51:16 -0400
Subject: [PATCH 1/4] feat: score for preparing a rook ceph cluster to remove a drive based on the rook-ceph-osd deployment name

Added functions to K8sClient to scale a deployment to a desired replica count
and to get a pod by name and namespace.
---
 harmony/src/domain/topology/k8s.rs            | 33 ++++++++
 harmony/src/modules/mod.rs                    |  1 +
 .../ceph/ceph_osd_replacement_score.rs        | 79 +++++++++++++++++++
 harmony/src/modules/storage/ceph/mod.rs       |  1 +
 harmony/src/modules/storage/mod.rs            |  1 +
 5 files changed, 115 insertions(+)
 create mode 100644 harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
 create mode 100644 harmony/src/modules/storage/ceph/mod.rs
 create mode 100644 harmony/src/modules/storage/mod.rs

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 388ff9d..1eae139 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -17,6 +17,7 @@ use kube::{
 };
 use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
+use serde_json::json;
 use similar::TextDiff;
 
 #[derive(new, Clone)]
@@ -51,6 +52,38 @@ impl K8sClient {
         })
     }
 
+    pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
+        let pods: Api<Pod> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(pods.get_opt(name).await?)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+        replicas: u32,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+
+        let patch = json!({
+            "spec": {
+                "replicas": replicas
+            }
+        });
+        let pp = PatchParams::default();
+        let scale = Patch::Apply(&patch);
+        deployments.patch_scale(name, &pp, &scale).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs
index e9b6c52..6df5c41 100644
--- a/harmony/src/modules/mod.rs
+++ b/harmony/src/modules/mod.rs
@@ -14,5 +14,6 @@ pub mod monitoring;
 pub mod okd;
 pub mod opnsense;
 pub mod prometheus;
+pub mod storage;
 pub mod tenant;
 pub mod tftp;
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
new file mode 100644
index 0000000..0223e62
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -0,0 +1,79 @@
+use std::process::Command;
+
+use async_trait::async_trait;
+use serde::Serialize;
+
+use crate::{
+    data::{Id, Version},
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology},
+};
+
+#[derive(Debug, Clone, Serialize)]
+pub struct PrepCephOsdReplacement {
+    osd_name: String,
+    rook_ceph_namespace: String,
+}
+
+impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement {
+    fn name(&self) -> String {
+        format!("CephOsdReplacementScore")
+    }
+
+    #[doc(hidden)]
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(PrepCephOsdReplacementInterpret {
+            score: self.clone(),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct PrepCephOsdReplacementInterpret {
+    score: PrepCephOsdReplacement,
+}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        let client = topology.k8s_client().await.unwrap();
+        client
+            .scale_deployment(
+                &self.score.osd_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        client
+            .get_pod(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
+            .await?;
+
+        Ok(Outcome::success(
+            "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
+        ))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        todo!()
+    }
+
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl PrepCephOsdReplacementInterpret {}
diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs
new file mode 100644
index 0000000..a993c3d
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/mod.rs
@@ -0,0 +1 @@
+pub mod ceph_osd_replacement_score;
diff --git a/harmony/src/modules/storage/mod.rs b/harmony/src/modules/storage/mod.rs
new file mode 100644
index 0000000..ee3e235
--- /dev/null
+++ b/harmony/src/modules/storage/mod.rs
@@ -0,0 +1 @@
+pub mod ceph;

From d1a274b7054d1821d233bde5396339e7329d6e10 Mon Sep 17 00:00:00 2001
From: Willem
Date: Fri, 15 Aug 2025 15:44:06 -0400
Subject: [PATCH 2/4] fix: check the deployment status ready replicas rather than the pod name

The pod name does not necessarily match the deployment name and usually
carries a randomly generated suffix.
---
 harmony/src/domain/topology/k8s.rs            | 13 ++++++++++++
 .../ceph/ceph_osd_replacement_score.rs        | 20 ++++++++++++-------
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 1eae139..30b335f 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -52,6 +52,19 @@ impl K8sClient {
         })
     }
 
+    pub async fn get_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<Option<Deployment>, Error> {
+        let deps: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(deps.get_opt(name).await?)
+    }
+
     pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
         let pods: Api<Pod> = if let Some(ns) = namespace {
             Api::namespaced(self.client.clone(), ns)
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
index 0223e62..173cca4 100644
--- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -50,13 +50,21 @@ impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret
                 0,
             )
             .await?;
-        client
-            .get_pod(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
+        let dep = client
+            .get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
             .await?;
 
-        Ok(Outcome::success(
-            "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
-        ))
+        match dep.unwrap().status.and_then(|s| s.ready_replicas) {
+            Some(0) => Ok(Outcome::success(
+                "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
+            )),
+            Some(_) => Err(InterpretError::new({
+                "Deployment still has ready replicas".to_string()
+            })),
+            None => Err(InterpretError::new({
+                "Failed to get rook-ceph-cluster status".to_string()
+            })),
+        }
     }
 
     fn get_name(&self) -> InterpretName {
@@ -75,5 +83,3 @@ impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret
         todo!()
     }
 }
-
-impl PrepCephOsdReplacementInterpret {}

From 89eb88d10ee257899df51d96f24463135f756c33 Mon Sep 17 00:00:00 2001
From: Willem
Date: Wed, 20 Aug 2025 12:09:55 -0400
Subject: [PATCH 3/4] feat: score to remove an osd from the ceph osd tree using K8sClient to interact with the rook-ceph-toolbox pod
---
 harmony/src/domain/topology/k8s.rs            |  80 +++-
 .../ceph/ceph_osd_replacement_score.rs        | 355 ++++++++++++++++--
 2 files changed, 402 insertions(+), 33 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 30b335f..1887c33 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -5,7 +5,7 @@ use k8s_openapi::{
 };
 use kube::{
     Client, Config, Error, Resource,
-    api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
+    api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
     runtime::reflector::Lookup,
@@ -19,6 +19,7 @@ use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::json;
 use similar::TextDiff;
+use tokio::io::AsyncReadExt;
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -97,6 +98,21 @@ impl K8sClient {
         Ok(())
     }
 
+    pub async fn delete_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        let delete_params = DeleteParams::default();
+        deployments.delete(name, &delete_params).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
@@ -122,6 +138,68 @@ impl K8sClient {
         }
     }
 
+    /// Will execute a command in the first pod found that matches the specified label
+    /// '{label}={name}'
+    pub async fn exec_app_capture_output(
+        &self,
+        name: String,
+        label: String,
+        namespace: Option<&str>,
+        command: Vec<&str>,
+    ) -> Result<String, String> {
+        let api: Api<Pod>;
+
+        if let Some(ns) = namespace {
+            api = Api::namespaced(self.client.clone(), ns);
+        } else {
+            api = Api::default_namespaced(self.client.clone());
+        }
+        let pod_list = api
+            .list(&ListParams::default().labels(format!("{label}={name}").as_str()))
+            .await
+            .expect("couldn't get list of pods");
+
+        let res = api
+            .exec(
+                pod_list
+                    .items
+                    .first()
+                    .expect("couldn't get pod")
+                    .name()
+                    .expect("couldn't get pod name")
+                    .into_owned()
+                    .as_str(),
+                command,
+                &AttachParams::default().stdout(true).stderr(true),
+            )
+            .await;
+        match res {
+            Err(e) => Err(e.to_string()),
+            Ok(mut process) => {
+                let status = process
+                    .take_status()
+                    .expect("Couldn't get status")
+                    .await
+                    .expect("Couldn't unwrap status");
+
+                if let Some(s) = status.status {
+                    let mut stdout_buf = String::new();
+                    if let Some(mut stdout) = process.stdout().take() {
+                        let _ = stdout.read_to_string(&mut stdout_buf).await;
+                    }
+                    debug!("Status: {} - {:?}", s, status.details);
+                    if s == "Success" {
+                        Ok(stdout_buf)
+                    } else {
+                        Err(s)
+                    }
+                } else {
+                    Err("Couldn't get inner status of pod exec".to_string())
+                }
+            }
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
     pub async fn exec_app(
         &self,
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
index 173cca4..a6a85f5 100644
--- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -1,72 +1,68 @@
-use std::process::Command;
+use std::{
+    process::Command,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use async_trait::async_trait;
-use serde::Serialize;
+use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
 
 use crate::{
     data::{Id, Version},
     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
     inventory::Inventory,
     score::Score,
-    topology::{K8sclient, Topology},
+    topology::{K8sclient, Topology, k8s::K8sClient},
 };
 
 #[derive(Debug, Clone, Serialize)]
-pub struct PrepCephOsdReplacement {
-    osd_name: String,
+pub struct CephRemoveOsd {
+    osd_deployment_name: String,
     rook_ceph_namespace: String,
 }
 
-impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement {
+impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
     fn name(&self) -> String {
-        format!("CephOsdReplacementScore")
+        format!("CephRemoveOsdScore")
     }
 
     #[doc(hidden)]
     fn create_interpret(&self) -> Box<dyn Interpret<T>> {
-        Box::new(PrepCephOsdReplacementInterpret {
+        Box::new(CephRemoveOsdInterpret {
             score: self.clone(),
         })
     }
 }
 
 #[derive(Debug, Clone)]
-pub struct PrepCephOsdReplacementInterpret {
-    score: PrepCephOsdReplacement,
+pub struct CephRemoveOsdInterpret {
+    score: CephRemoveOsd,
 }
 
 #[async_trait]
-impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
+impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
     async fn execute(
         &self,
         _inventory: &Inventory,
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
-        client
-            .scale_deployment(
-                &self.score.osd_name,
-                Some(&self.score.rook_ceph_namespace),
-                0,
-            )
-            .await?;
-        let dep = client
-            .get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
-            .await?;
+        self.scale_deployment(client.clone()).await?;
+        self.verify_deployment_scaled(client.clone()).await?;
+        self.delete_deployment(client.clone()).await?;
+        self.verify_deployment_deleted(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
+        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
+            .await?;
 
-        match dep.unwrap().status.and_then(|s| s.ready_replicas) {
-            Some(0) => Ok(Outcome::success(
-                "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
-            )),
-            Some(_) => Err(InterpretError::new({
-                "Deployment still has ready replicas".to_string()
-            })),
-            None => Err(InterpretError::new({
-                "Failed to get rook-ceph-cluster status".to_string()
-            })),
-        }
+        Ok(Outcome::success(format!(
+            "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
+            osd_id_full, self.score.osd_deployment_name
+        )))
     }
-
     fn get_name(&self) -> InterpretName {
         todo!()
     }
@@ -83,3 +79,298 @@ impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret
         todo!()
     }
 }
+
+impl CephRemoveOsdInterpret {
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self
+            .score
+            .osd_deployment_name
+            .split('-')
+            .nth(3)
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Could not parse OSD id from deployment name {}",
+                    self.score.osd_deployment_name
+                ))
+            })?;
+        let osd_id_full = format!("osd.{}", osd_id_numeric);
+
+        info!(
+            "Targeting Ceph OSD: {} (parsed from deployment {})",
+            osd_id_full, self.score.osd_deployment_name
+        );
+
+        Ok(osd_id_full)
+    }
+
+    fn build_timer(&self) -> (Duration, Duration, Instant) {
+        let timeout = Duration::from_secs(120);
+        let interval = Duration::from_secs(5);
+        let start = Instant::now();
+        (timeout, interval, start)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Scaling down OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .scale_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "Scaled down deployment {}",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_scaled(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if let Some(deployment) = dep {
+                if let Some(status) = deployment.status {
+                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
+                    {
+                        return Ok(Outcome::success(
+                            "Deployment successfully scaled down.".to_string(),
+                        ));
+                    }
+                }
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to scale down",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    pub async fn delete_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Deleting OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .delete_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "deployment {} deleted",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_deleted(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to be deleted");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if dep.is_none() {
+                info!(
+                    "Deployment {} successfully deleted.",
+                    self.score.osd_deployment_name
+                );
+                return Ok(Outcome::success(format!(
+                    "Deployment {} deleted.",
+                    self.score.osd_deployment_name
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to be deleted",
self.score.osd_deployment_name + ))); + } + sleep(interval).await; + } + } + + fn get_osd_tree(&self, json: serde_json::Value) -> Result { + let nodes = json.get("nodes").ok_or_else(|| { + InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string()) + })?; + let tree: CephOsdTree = CephOsdTree{ nodes: serde_json::from_value(nodes.clone()).map_err(|e| { + InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e)) + })?,}; + Ok(tree) + } + + pub async fn purge_ceph_osd( + &self, + client: Arc, + osd_id_full: &str, + ) -> Result { + info!( + "Purging OSD {} from Ceph cluster and removing its auth key", + osd_id_full + ); + client + .exec_app_capture_output( + "rook-ceph-tools".to_string(), + "app".to_string(), + Some(&self.score.rook_ceph_namespace), + vec![ + format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(), + format!("ceph auth del osd.{osd_id_full}").as_str(), + ], + ) + .await?; + Ok(Outcome::success(format!( + "osd id {} removed from osd tree", + osd_id_full + ))) + } + + pub async fn verify_ceph_osd_removal( + &self, + client: Arc, + osd_id_full: &str, + ) -> Result { + let (timeout, interval, start) = self.build_timer(); + info!( + "Verifying OSD {} has been removed from the Ceph tree...", + osd_id_full + ); + loop { + let output = client + .exec_app_capture_output( + "rook-ceph-tools".to_string(), + "app".to_string(), + Some(&self.score.rook_ceph_namespace), + vec!["ceph osd tree -f json"], + ) + .await?; + let tree = + self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json")); + + let osd_found = tree + .unwrap() + .nodes + .iter() + .any(|node| node.name == osd_id_full); + + if !osd_found { + return Ok(Outcome::success(format!( + "Successfully verified that OSD {} is removed from the Ceph cluster.", + osd_id_full, + ))); + } + + if start.elapsed() > timeout { + return Err(InterpretError::new(format!( + "Timed out waiting for OSD {} to be removed from Ceph tree", + osd_id_full + ))); + } + + warn!( + "OSD {} still found in Ceph tree, retrying in {:?}...", + osd_id_full, interval + ); + sleep(interval).await; + } + } +} +#[derive(Debug, Deserialize, PartialEq)] +pub struct CephOsdTree { + pub nodes: Vec, +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct CephNode { + pub id: i32, + pub name: String, + #[serde(rename = "type")] + pub node_type: String, + pub type_id: Option, + pub children: Option>, + pub exists: Option, + pub status: Option, +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_get_osd_tree() { + let json_data = json!({ + "nodes": [ + {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"}, + {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344} + ] + }); + let interpret = CephRemoveOsdInterpret { + score: CephRemoveOsd { + osd_deployment_name: "osd-1".to_string(), + rook_ceph_namespace: "dummy_ns".to_string(), + }, + }; + let json = interpret.get_osd_tree(json_data).unwrap(); + + let expected = CephOsdTree { + nodes: vec![ + CephNode { + id: 1, + name: "osd.1".to_string(), + node_type: "osd".to_string(), + type_id: None, + children: None, + exists: None, + status: None, + }, + CephNode { + id: 2, + name: "osd.2".to_string(), + node_type: "osd".to_string(), + type_id: None, + children: None, + exists: None, + status: None, + }, + ], + }; + + assert_eq!(json, expected); + } +} From cd3ea6fc10bbaf1f20e8e1645094903022aa8ef5 Mon Sep 17 00:00:00 2001 From: Willem Date: Wed, 20 Aug 2025 12:54:19 -0400 Subject: [PATCH 4/4] 
---
 .../ceph/ceph_osd_replacement_score.rs        | 59 ++++++++++++++++---
 1 file changed, 51 insertions(+), 8 deletions(-)

diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
index a6a85f5..a4b0cb0 100644
--- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -49,6 +49,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
+        self.verify_ceph_toolbox_exists(client.clone()).await?;
         self.scale_deployment(client.clone()).await?;
         self.verify_deployment_scaled(client.clone()).await?;
         self.delete_deployment(client.clone()).await?;
@@ -103,11 +104,45 @@ impl CephRemoveOsdInterpret {
         Ok(osd_id_full)
     }
 
-    fn build_timer(&self) -> (Duration, Duration, Instant) {
-        let timeout = Duration::from_secs(120);
-        let interval = Duration::from_secs(5);
-        let start = Instant::now();
-        (timeout, interval, start)
+    pub async fn verify_ceph_toolbox_exists(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let toolbox_dep = "rook-ceph-tools".to_string();
+
+        match client
+            .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
+            .await
+        {
+            Ok(Some(deployment)) => {
+                if let Some(status) = deployment.status {
+                    let ready_count = status.ready_replicas.unwrap_or(0);
+                    if ready_count >= 1 {
+                        return Ok(Outcome::success(format!(
+                            "'{}' is ready with {} replica(s).",
+                            &toolbox_dep, ready_count
+                        )));
+                    } else {
+                        return Err(InterpretError::new(
+                            "rook-ceph-tools deployment is not ready in the cluster".to_string(),
+                        ));
+                    }
+                } else {
+                    Err(InterpretError::new(format!(
+                        "failed to get deployment status {}",
+                        &toolbox_dep
+                    )))
+                }
+            }
+            Ok(None) => Err(InterpretError::new(format!(
+                "Deployment '{}' not found in namespace '{}'.",
+                &toolbox_dep, self.score.rook_ceph_namespace
+            ))),
+            Err(e) => Err(InterpretError::new(format!(
+                "Failed to query for deployment '{}': {}",
+                &toolbox_dep, e
+            ))),
+        }
     }
 
     pub async fn scale_deployment(
         &self,
         client: Arc<K8sClient>,
@@ -167,6 +202,12 @@
         }
     }
 
+    fn build_timer(&self) -> (Duration, Duration, Instant) {
+        let timeout = Duration::from_secs(120);
+        let interval = Duration::from_secs(5);
+        let start = Instant::now();
+        (timeout, interval, start)
+    }
     pub async fn delete_deployment(
         &self,
         client: Arc<K8sClient>,
@@ -227,9 +268,11 @@
         let nodes = json.get("nodes").ok_or_else(|| {
             InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
         })?;
-        let tree: CephOsdTree = CephOsdTree{ nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
-            InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
-        })?,};
+        let tree: CephOsdTree = CephOsdTree {
+            nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
+                InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
+            })?,
+        };
         Ok(tree)
     }
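
For reference, a minimal usage sketch of the new score, written as if it lived inside the harmony crate and reusing the crate paths and trait bounds seen in these patches. The deployment and namespace values are illustrative, and building CephRemoveOsd with a struct literal like this assumes its fields are visible to the caller (or that a constructor is added), which the patches themselves do not provide:

use crate::{
    interpret::{Interpret, InterpretError, Outcome},
    inventory::Inventory,
    modules::storage::ceph::ceph_osd_replacement_score::CephRemoveOsd,
    score::Score,
    topology::{K8sclient, Topology},
};

// Illustrative driver: builds the score for one OSD deployment and runs its interpret.
// "rook-ceph-osd-1" and "rook-ceph" are example values, not taken from the patches.
async fn remove_osd<T: Topology + K8sclient>(
    inventory: &Inventory,
    topology: &T,
) -> Result<Outcome, InterpretError> {
    let score = CephRemoveOsd {
        osd_deployment_name: "rook-ceph-osd-1".to_string(),
        rook_ceph_namespace: "rook-ceph".to_string(),
    };
    // create_interpret() yields the CephRemoveOsdInterpret defined in the patches; execute()
    // checks the toolbox, scales down and deletes the OSD deployment, then purges the OSD
    // from the Ceph tree via the rook-ceph-tools pod.
    score.create_interpret().execute(inventory, topology).await
}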