From 89eb88d10ee257899df51d96f24463135f756c33 Mon Sep 17 00:00:00 2001
From: Willem
Date: Wed, 20 Aug 2025 12:09:55 -0400
Subject: [PATCH] feat: score to remove an osd from the ceph osd tree using
 K8sClient to interact with rook-ceph-toolbox pod

---
 harmony/src/domain/topology/k8s.rs            |  80 +++-
 .../ceph/ceph_osd_replacement_score.rs        | 355 ++++++++++++++++--
 2 files changed, 402 insertions(+), 33 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 30b335f..1887c33 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -5,7 +5,7 @@ use k8s_openapi::{
 };
 use kube::{
     Client, Config, Error, Resource,
-    api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
+    api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
     runtime::reflector::Lookup,
@@ -19,6 +19,7 @@ use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::json;
 use similar::TextDiff;
+use tokio::io::AsyncReadExt;
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -97,6 +98,21 @@ impl K8sClient {
         Ok(())
     }
 
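+    /// Deletes the named `Deployment`, using the client's default namespace when
+    /// `namespace` is `None`. The delete is issued with default `DeleteParams`;
+    /// callers that need to confirm removal can poll `get_deployment` until it
+    /// returns `None`.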
+    pub async fn delete_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        let delete_params = DeleteParams::default();
+        deployments.delete(name, &delete_params).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
@@ -122,6 +138,68 @@ impl K8sClient {
         }
     }
 
+    /// Will execute a command in the first pod found that matches the specified label
+    /// `{label}={name}` and capture its stdout.
+    pub async fn exec_app_capture_output(
+        &self,
+        name: String,
+        label: String,
+        namespace: Option<&str>,
+        command: Vec<&str>,
+    ) -> Result<String, String> {
+        let api: Api<Pod>;
+
+        if let Some(ns) = namespace {
+            api = Api::namespaced(self.client.clone(), ns);
+        } else {
+            api = Api::default_namespaced(self.client.clone());
+        }
+        let pod_list = api
+            .list(&ListParams::default().labels(format!("{label}={name}").as_str()))
+            .await
+            .expect("couldn't get list of pods");
+
+        let res = api
+            .exec(
+                pod_list
+                    .items
+                    .first()
+                    .expect("couldn't get pod")
+                    .name()
+                    .expect("couldn't get pod name")
+                    .into_owned()
+                    .as_str(),
+                command,
+                &AttachParams::default().stdout(true).stderr(true),
+            )
+            .await;
+        match res {
+            Err(e) => Err(e.to_string()),
+            Ok(mut process) => {
+                let status = process
+                    .take_status()
+                    .expect("Couldn't get status")
+                    .await
+                    .expect("Couldn't unwrap status");
+
+                if let Some(s) = status.status {
+                    let mut stdout_buf = String::new();
+                    if let Some(mut stdout) = process.stdout().take() {
+                        let _ = stdout.read_to_string(&mut stdout_buf).await;
+                    }
+                    debug!("Status: {} - {:?}", s, status.details);
+                    if s == "Success" {
+                        Ok(stdout_buf)
+                    } else {
+                        Err(s)
+                    }
+                } else {
+                    Err("Couldn't get inner status of pod exec".to_string())
+                }
+            }
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
     pub async fn exec_app(
         &self,
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
index 173cca4..a6a85f5 100644
--- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -1,72 +1,68 @@
-use std::process::Command;
+use std::{
+    process::Command,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use async_trait::async_trait;
-use serde::Serialize;
+use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
 
 use crate::{
     data::{Id, Version},
     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
     inventory::Inventory,
     score::Score,
-    topology::{K8sclient, Topology},
+    topology::{K8sclient, Topology, k8s::K8sClient},
 };
 
 #[derive(Debug, Clone, Serialize)]
-pub struct PrepCephOsdReplacement {
-    osd_name: String,
+pub struct CephRemoveOsd {
+    osd_deployment_name: String,
     rook_ceph_namespace: String,
 }
 
-impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement {
+impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
     fn name(&self) -> String {
-        format!("CephOsdReplacementScore")
+        format!("CephRemoveOsdScore")
     }
 
     #[doc(hidden)]
     fn create_interpret(&self) -> Box<dyn Interpret<T>> {
-        Box::new(PrepCephOsdReplacementInterpret {
+        Box::new(CephRemoveOsdInterpret {
             score: self.clone(),
         })
     }
 }
 
 #[derive(Debug, Clone)]
-pub struct PrepCephOsdReplacementInterpret {
-    score: PrepCephOsdReplacement,
+pub struct CephRemoveOsdInterpret {
+    score: CephRemoveOsd,
 }
 
 #[async_trait]
-impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
+impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
     async fn execute(
         &self,
         _inventory: &Inventory,
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
-        client
-            .scale_deployment(
-                &self.score.osd_name,
-                Some(&self.score.rook_ceph_namespace),
-                0,
-            )
-            .await?;
-        let dep = client
-            .get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
+        self.scale_deployment(client.clone()).await?;
+        self.verify_deployment_scaled(client.clone()).await?;
+        self.delete_deployment(client.clone()).await?;
+        self.verify_deployment_deleted(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
+        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
            .await?;
 
-        match dep.unwrap().status.and_then(|s| s.ready_replicas) {
-            Some(0) => Ok(Outcome::success(
-                "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
-            )),
-            Some(_) => Err(InterpretError::new({
-                "Deployment still has ready replicas".to_string()
-            })),
-            None => Err(InterpretError::new({
-                "Failed to get rook-ceph-cluster status".to_string()
-            })),
-        }
+        Ok(Outcome::success(format!(
+            "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
+            osd_id_full, self.score.osd_deployment_name
+        )))
     }
-
     fn get_name(&self) -> InterpretName {
         todo!()
     }
@@ -83,3 +79,298 @@ impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
         todo!()
     }
 }
+
+impl CephRemoveOsdInterpret {
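+    /// Derives the Ceph OSD id (e.g. `osd.1`) from the deployment name by taking
+    /// the fourth `-`-separated segment, which assumes Rook's usual
+    /// `rook-ceph-osd-<id>` deployment naming.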
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self
+            .score
+            .osd_deployment_name
+            .split('-')
+            .nth(3)
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Could not parse OSD id from deployment name {}",
+                    self.score.osd_deployment_name
+                ))
+            })?;
+        let osd_id_full = format!("osd.{}", osd_id_numeric);
+
+        info!(
+            "Targeting Ceph OSD: {} (parsed from deployment {})",
+            osd_id_full, self.score.osd_deployment_name
+        );
+
+        Ok(osd_id_full)
+    }
+
+    fn build_timer(&self) -> (Duration, Duration, Instant) {
+        let timeout = Duration::from_secs(120);
+        let interval = Duration::from_secs(5);
+        let start = Instant::now();
+        (timeout, interval, start)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Scaling down OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .scale_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "Scaled down deployment {}",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_scaled(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if let Some(deployment) = dep {
+                if let Some(status) = deployment.status {
+                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
+                    {
+                        return Ok(Outcome::success(
+                            "Deployment successfully scaled down.".to_string(),
+                        ));
+                    }
+                }
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to scale down",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    pub async fn delete_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Deleting OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .delete_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "Deployment {} deleted",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_deleted(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to be deleted");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if dep.is_none() {
+                info!(
+                    "Deployment {} successfully deleted.",
+                    self.score.osd_deployment_name
+                );
+                return Ok(Outcome::success(format!(
+                    "Deployment {} deleted.",
+                    self.score.osd_deployment_name
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to be deleted",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
+        let nodes = json.get("nodes").ok_or_else(|| {
+            InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
+        })?;
+        let tree = CephOsdTree {
+            nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
+                InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
+            })?,
+        };
+        Ok(tree)
+    }
+
+    pub async fn purge_ceph_osd(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Purging OSD {} from Ceph cluster and removing its auth key",
+            osd_id_full
+        );
+        // `osd_id_full` is already in the `osd.<id>` form expected by both ceph
+        // commands; run them as a single `sh -c` invocation in the toolbox pod.
+        let purge_cmd = format!(
+            "ceph osd purge {osd_id_full} --yes-i-really-mean-it && ceph auth del {osd_id_full}"
+        );
+        client
+            .exec_app_capture_output(
+                "rook-ceph-tools".to_string(),
+                "app".to_string(),
+                Some(&self.score.rook_ceph_namespace),
+                vec!["sh", "-c", purge_cmd.as_str()],
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "OSD {} removed from the Ceph osd tree",
+            osd_id_full
+        )))
+    }
+
+    pub async fn verify_ceph_osd_removal(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+        info!(
+            "Verifying OSD {} has been removed from the Ceph tree...",
+            osd_id_full
+        );
+        loop {
+            let output = client
+                .exec_app_capture_output(
+                    "rook-ceph-tools".to_string(),
+                    "app".to_string(),
+                    Some(&self.score.rook_ceph_namespace),
+                    vec!["ceph", "osd", "tree", "-f", "json"],
+                )
+                .await?;
+            let json: serde_json::Value = serde_json::from_str(&output).map_err(|e| {
+                InterpretError::new(format!("Could not parse ceph osd tree output: {}", e))
+            })?;
+            let tree = self.get_osd_tree(json)?;
+
+            let osd_found = tree.nodes.iter().any(|node| node.name == osd_id_full);
+
+            if !osd_found {
+                return Ok(Outcome::success(format!(
+                    "Successfully verified that OSD {} is removed from the Ceph cluster.",
+                    osd_id_full,
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for OSD {} to be removed from Ceph tree",
+                    osd_id_full
+                )));
+            }
+
+            warn!(
+                "OSD {} still found in Ceph tree, retrying in {:?}...",
+                osd_id_full, interval
+            );
+            sleep(interval).await;
+        }
+    }
+}
+
+/// Minimal model of the `ceph osd tree -f json` output: only the fields needed
+/// to check whether an OSD is still present are deserialized; any other fields
+/// in the JSON are ignored.
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct CephOsdTree {
+    pub nodes: Vec<CephNode>,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct CephNode {
+    pub id: i32,
+    pub name: String,
+    #[serde(rename = "type")]
+    pub node_type: String,
+    pub type_id: Option<i32>,
+    pub children: Option<Vec<i32>>,
+    pub exists: Option<i32>,
+    pub status: Option<String>,
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use super::*;
+
+    #[test]
+    fn test_get_osd_tree() {
+        let json_data = json!({
+            "nodes": [
+                {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
+                {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
+            ]
+        });
+        let interpret = CephRemoveOsdInterpret {
+            score: CephRemoveOsd {
+                osd_deployment_name: "osd-1".to_string(),
+                rook_ceph_namespace: "dummy_ns".to_string(),
+            },
+        };
+        let json = interpret.get_osd_tree(json_data).unwrap();
+
+        let expected = CephOsdTree {
+            nodes: vec![
+                CephNode {
+                    id: 1,
+                    name: "osd.1".to_string(),
+                    node_type: "osd".to_string(),
+                    type_id: None,
+                    children: None,
+                    exists: None,
+                    status: None,
+                },
+                CephNode {
+                    id: 2,
+                    name: "osd.2".to_string(),
+                    node_type: "osd".to_string(),
+                    type_id: None,
+                    children: None,
+                    exists: None,
+                    status: None,
+                },
+            ],
+        };
+
+        assert_eq!(json, expected);
+    }
+}