diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 388ff9d..1887c33 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -5,7 +5,7 @@ use k8s_openapi::{
 };
 use kube::{
     Client, Config, Error, Resource,
-    api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
+    api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
     runtime::reflector::Lookup,
@@ -17,7 +17,9 @@ use kube::{
 };
 use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
+use serde_json::json;
 use similar::TextDiff;
+use tokio::io::AsyncReadExt;
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -51,6 +53,66 @@ impl K8sClient {
         })
     }
 
+    pub async fn get_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<Option<Deployment>, Error> {
+        let deps: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(deps.get_opt(name).await?)
+    }
+
+    pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
+        let pods: Api<Pod> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(pods.get_opt(name).await?)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+        replicas: u32,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+
+        let patch = json!({
+            "spec": {
+                "replicas": replicas
+            }
+        });
+        // A merge patch needs no field manager, unlike server-side apply.
+        let pp = PatchParams::default();
+        let scale = Patch::Merge(&patch);
+        deployments.patch_scale(name, &pp, &scale).await?;
+        Ok(())
+    }
+
+    pub async fn delete_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        let delete_params = DeleteParams::default();
+        deployments.delete(name, &delete_params).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
@@ -76,6 +138,68 @@ impl K8sClient {
         }
     }
 
+    /// Will execute a command in the first pod found that matches the specified label
+    /// `{label}={name}` and capture its stdout.
+    pub async fn exec_app_capture_output(
+        &self,
+        name: String,
+        label: String,
+        namespace: Option<&str>,
+        command: Vec<&str>,
+    ) -> Result<String, String> {
+        let api: Api<Pod>;
+
+        if let Some(ns) = namespace {
+            api = Api::namespaced(self.client.clone(), ns);
+        } else {
+            api = Api::default_namespaced(self.client.clone());
+        }
+        let pod_list = api
+            .list(&ListParams::default().labels(format!("{label}={name}").as_str()))
+            .await
+            .expect("couldn't get list of pods");
+
+        let res = api
+            .exec(
+                pod_list
+                    .items
+                    .first()
+                    .expect("couldn't get pod")
+                    .name()
+                    .expect("couldn't get pod name")
+                    .into_owned()
+                    .as_str(),
+                command,
+                &AttachParams::default().stdout(true).stderr(true),
+            )
+            .await;
+        match res {
+            Err(e) => Err(e.to_string()),
+            Ok(mut process) => {
+                let status = process
+                    .take_status()
+                    .expect("Couldn't get status")
+                    .await
+                    .expect("Couldn't unwrap status");
+
+                if let Some(s) = status.status {
+                    let mut stdout_buf = String::new();
+                    if let Some(mut stdout) = process.stdout().take() {
+                        let _ = stdout.read_to_string(&mut stdout_buf).await;
+                    }
+                    debug!("Status: {} - {:?}", s, status.details);
+                    if s == "Success" {
+                        Ok(stdout_buf)
+                    } else {
+                        Err(s)
+                    }
+                } else {
+                    Err("Couldn't get inner status of pod exec".to_string())
+                }
+            }
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
     pub async fn exec_app(
         &self,
diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs
index e9b6c52..6df5c41 100644
--- a/harmony/src/modules/mod.rs
+++ b/harmony/src/modules/mod.rs
@@ -14,5 +14,6 @@ pub mod monitoring;
 pub mod okd;
 pub mod opnsense;
 pub mod prometheus;
+pub mod storage;
 pub mod tenant;
 pub mod tftp;
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
new file mode 100644
index 0000000..a4b0cb0
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -0,0 +1,419 @@
+use std::{
+    sync::Arc,
+    time::{Duration, Instant},
+};
+
+use async_trait::async_trait;
+use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
+
+use crate::{
+    data::{Id, Version},
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology, k8s::K8sClient},
+};
+
+#[derive(Debug, Clone, Serialize)]
+pub struct CephRemoveOsd {
+    osd_deployment_name: String,
+    rook_ceph_namespace: String,
+}
+
+impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
+    fn name(&self) -> String {
+        "CephRemoveOsdScore".to_string()
+    }
+
+    #[doc(hidden)]
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(CephRemoveOsdInterpret {
+            score: self.clone(),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct CephRemoveOsdInterpret {
+    score: CephRemoveOsd,
+}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        let client = topology.k8s_client().await.unwrap();
+        self.verify_ceph_toolbox_exists(client.clone()).await?;
+        self.scale_deployment(client.clone()).await?;
+        self.verify_deployment_scaled(client.clone()).await?;
+        self.delete_deployment(client.clone()).await?;
+        self.verify_deployment_deleted(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
+        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
+            .await?;
+
+        Ok(Outcome::success(format!(
+            "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
+            osd_id_full, self.score.osd_deployment_name
+        )))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        todo!()
+    }
+
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl CephRemoveOsdInterpret {
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self
+            .score
+            .osd_deployment_name
+            .split('-')
+            .nth(3)
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Could not parse OSD id from deployment name {}",
+                    self.score.osd_deployment_name
+                ))
+            })?;
+        let osd_id_full = format!("osd.{}", osd_id_numeric);
+
+        info!(
+            "Targeting Ceph OSD: {} (parsed from deployment {})",
+            osd_id_full, self.score.osd_deployment_name
+        );
+
+        Ok(osd_id_full)
+    }
+
+    pub async fn verify_ceph_toolbox_exists(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let toolbox_dep = "rook-ceph-tools".to_string();
+
+        match client
+            .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
+            .await
+        {
+            Ok(Some(deployment)) => {
+                if let Some(status) = deployment.status {
+                    let ready_count = status.ready_replicas.unwrap_or(0);
+                    if ready_count >= 1 {
+                        Ok(Outcome::success(format!(
+                            "'{}' is ready with {} replica(s).",
+                            &toolbox_dep, ready_count
+                        )))
+                    } else {
+                        Err(InterpretError::new(
+                            "rook-ceph-tools deployment is not ready in the cluster".to_string(),
+                        ))
+                    }
+                } else {
+                    Err(InterpretError::new(format!(
+                        "failed to get deployment status {}",
+                        &toolbox_dep
+                    )))
+                }
+            }
+            Ok(None) => Err(InterpretError::new(format!(
+                "Deployment '{}' not found in namespace '{}'.",
+                &toolbox_dep, self.score.rook_ceph_namespace
+            ))),
+            Err(e) => Err(InterpretError::new(format!(
+                "Failed to query for deployment '{}': {}",
+                &toolbox_dep, e
+            ))),
+        }
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Scaling down OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .scale_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "Scaled down deployment {}",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_scaled(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if let Some(deployment) = dep {
+                if let Some(status) = deployment.status {
+                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
+                    {
+                        return Ok(Outcome::success(
+                            "Deployment successfully scaled down.".to_string(),
+                        ));
+                    }
+                }
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to scale down",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    fn build_timer(&self) -> (Duration, Duration, Instant) {
+        let timeout = Duration::from_secs(120);
+        let interval = Duration::from_secs(5);
+        let start = Instant::now();
+        (timeout, interval, start)
+    }
+
+    pub async fn delete_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Deleting OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .delete_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "deployment {} deleted",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_deleted(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to be deleted");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if dep.is_none() {
+                info!(
+                    "Deployment {} successfully deleted.",
+                    self.score.osd_deployment_name
+                );
+                return Ok(Outcome::success(format!(
+                    "Deployment {} deleted.",
+                    self.score.osd_deployment_name
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to be deleted",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
+        let nodes = json.get("nodes").ok_or_else(|| {
+            InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
+        })?;
+        let tree: CephOsdTree = CephOsdTree {
+            nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
+                InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
+            })?,
+        };
+        Ok(tree)
+    }
+
+    pub async fn purge_ceph_osd(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Purging OSD {} from Ceph cluster and removing its auth key",
+            osd_id_full
+        );
+        // Run both commands through a shell so each is executed in turn;
+        // `osd_id_full` is already of the form `osd.<id>`.
+        let purge_cmd = format!(
+            "ceph osd purge {osd_id_full} --yes-i-really-mean-it && ceph auth del {osd_id_full}"
+        );
+        client
+            .exec_app_capture_output(
+                "rook-ceph-tools".to_string(),
+                "app".to_string(),
+                Some(&self.score.rook_ceph_namespace),
+                vec!["sh", "-c", purge_cmd.as_str()],
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "osd id {} removed from osd tree",
+            osd_id_full
+        )))
+    }
+
+    pub async fn verify_ceph_osd_removal(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+        info!(
+            "Verifying OSD {} has been removed from the Ceph tree...",
+            osd_id_full
+        );
+        loop {
+            let output = client
+                .exec_app_capture_output(
+                    "rook-ceph-tools".to_string(),
+                    "app".to_string(),
+                    Some(&self.score.rook_ceph_namespace),
+                    vec!["ceph", "osd", "tree", "-f", "json"],
+                )
+                .await?;
+            let json = serde_json::from_str(&output).map_err(|e| {
+                InterpretError::new(format!("Could not parse ceph osd tree output: {}", e))
+            })?;
+            let tree = self.get_osd_tree(json)?;
+
+            let osd_found = tree.nodes.iter().any(|node| node.name == osd_id_full);
+
+            if !osd_found {
+                return Ok(Outcome::success(format!(
+                    "Successfully verified that OSD {} is removed from the Ceph cluster.",
+                    osd_id_full,
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for OSD {} to be removed from Ceph tree",
+                    osd_id_full
+                )));
+            }
+
+            warn!(
+                "OSD {} still found in Ceph tree, retrying in {:?}...",
+                osd_id_full, interval
+            );
+            sleep(interval).await;
+        }
+    }
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct CephOsdTree {
+    pub nodes: Vec<CephNode>,
+}
+
+#[derive(Debug, Deserialize, PartialEq)]
+pub struct CephNode {
+    pub id: i32,
+    pub name: String,
+    #[serde(rename = "type")]
+    pub node_type: String,
+    pub type_id: Option<i32>,
+    pub children: Option<Vec<i32>>,
+    pub exists: Option<i32>,
+    pub status: Option<String>,
+}
+
+#[cfg(test)]
+mod tests {
+    use serde_json::json;
+
+    use super::*;
+
+    #[test]
+    fn test_get_osd_tree() {
+        let json_data = json!({
+            "nodes": [
+                {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity": "1"},
+                {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
+            ]
+        });
+        let interpret = CephRemoveOsdInterpret {
+            score: CephRemoveOsd {
+                osd_deployment_name: "osd-1".to_string(),
+                rook_ceph_namespace: "dummy_ns".to_string(),
+            },
+        };
+        let json = interpret.get_osd_tree(json_data).unwrap();
+
+        let expected = CephOsdTree {
+            nodes: vec![
+                CephNode {
+                    id: 1,
+                    name: "osd.1".to_string(),
+                    node_type: "osd".to_string(),
+                    type_id: None,
+                    children: None,
+                    exists: None,
+                    status: None,
+                },
+                CephNode {
+                    id: 2,
+                    name: "osd.2".to_string(),
+                    node_type: "osd".to_string(),
+                    type_id: None,
+                    children: None,
+                    exists: None,
+                    status: None,
+                },
+            ],
+        };
+
+        assert_eq!(json, expected);
+    }
+}
diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs
new file mode 100644
index 0000000..a993c3d
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/mod.rs
@@ -0,0 +1 @@
+pub mod ceph_osd_replacement_score;
diff --git a/harmony/src/modules/storage/mod.rs b/harmony/src/modules/storage/mod.rs
new file mode 100644
index 0000000..ee3e235
--- /dev/null
+++ b/harmony/src/modules/storage/mod.rs
@@ -0,0 +1 @@
+pub mod ceph;