From b43ca7c7408ffa0087e6229db2c04cc8be40d175 Mon Sep 17 00:00:00 2001
From: Willem
Date: Fri, 15 Aug 2025 14:51:16 -0400
Subject: [PATCH 1/6] feat: score for preparing rook ceph cluster to remove
 drive based on rook-ceph-osd deployment name

Added functions to K8sClient to scale a deployment to a desired replica
count and to get a pod by name and namespace.
---
 harmony/src/domain/topology/k8s.rs            | 33 ++++++++
 harmony/src/modules/mod.rs                    |  1 +
 .../ceph/ceph_osd_replacement_score.rs        | 79 +++++++++++++++++++
 harmony/src/modules/storage/ceph/mod.rs       |  1 +
 harmony/src/modules/storage/mod.rs            |  1 +
 5 files changed, 115 insertions(+)
 create mode 100644 harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
 create mode 100644 harmony/src/modules/storage/ceph/mod.rs
 create mode 100644 harmony/src/modules/storage/mod.rs

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 388ff9d..1eae139 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -17,6 +17,7 @@ use kube::{
 };
 use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
+use serde_json::json;
 use similar::TextDiff;
 
 #[derive(new, Clone)]
@@ -51,6 +52,38 @@ impl K8sClient {
         })
     }
 
+    pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
+        let pods: Api<Pod> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(pods.get_opt(name).await?)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+        replicas: u32,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+
+        let patch = json!({
+            "spec": {
+                "replicas": replicas
+            }
+        });
+        let pp = PatchParams::default();
+        let scale = Patch::Merge(&patch); // Patch::Apply would require a field manager
+        deployments.patch_scale(name, &pp, &scale).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs
index e9b6c52..6df5c41 100644
--- a/harmony/src/modules/mod.rs
+++ b/harmony/src/modules/mod.rs
@@ -14,5 +14,6 @@ pub mod monitoring;
 pub mod okd;
 pub mod opnsense;
 pub mod prometheus;
+pub mod storage;
 pub mod tenant;
 pub mod tftp;
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
new file mode 100644
index 0000000..0223e62
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -0,0 +1,79 @@
+use std::process::Command;
+
+use async_trait::async_trait;
+use serde::Serialize;
+
+use crate::{
+    data::{Id, Version},
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology},
+};
+
+#[derive(Debug, Clone, Serialize)]
+pub struct PrepCephOsdReplacement {
+    osd_name: String,
+    rook_ceph_namespace: String,
+}
+
+impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement {
+    fn name(&self) -> String {
+        format!("CephOsdReplacementScore")
+    }
+
+    #[doc(hidden)]
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(PrepCephOsdReplacementInterpret {
+            score: self.clone(),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct PrepCephOsdReplacementInterpret {
+    score: PrepCephOsdReplacement,
+}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        let client = topology.k8s_client().await.unwrap();
+        client
+            .scale_deployment(
+                &self.score.osd_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        client
+            .get_pod(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
+            .await?;
+
+        Ok(Outcome::success(
+            "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
+        ))
+    }
+
+    fn get_name(&self) -> InterpretName {
+        todo!()
+    }
+
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl PrepCephOsdReplacementInterpret {}
diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs
new file mode 100644
index 0000000..a993c3d
--- /dev/null
+++ b/harmony/src/modules/storage/ceph/mod.rs
@@ -0,0 +1 @@
+pub mod ceph_osd_replacement_score;
diff --git a/harmony/src/modules/storage/mod.rs b/harmony/src/modules/storage/mod.rs
new file mode 100644
index 0000000..ee3e235
--- /dev/null
+++ b/harmony/src/modules/storage/mod.rs
@@ -0,0 +1 @@
+pub mod ceph;

From d1a274b7054d1821d233bde5396339e7329d6e10 Mon Sep 17 00:00:00 2001
From: Willem
Date: Fri, 15 Aug 2025 15:44:06 -0400
Subject: [PATCH 2/6] fix: check deployment status ready replicas rather than
 pod name

The pod name does not necessarily match the deployment name and often
has a randomly generated suffix.
---
 harmony/src/domain/topology/k8s.rs            | 13 ++++++++++++
 .../ceph/ceph_osd_replacement_score.rs        | 20 ++++++++++++-------
 2 files changed, 26 insertions(+), 7 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 1eae139..30b335f 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -52,6 +52,19 @@ impl K8sClient {
         })
     }
 
+    pub async fn get_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<Option<Deployment>, Error> {
+        let deps: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        Ok(deps.get_opt(name).await?)
+ } + pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result, Error> { let pods: Api = if let Some(ns) = namespace { Api::namespaced(self.client.clone(), ns) diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs index 0223e62..173cca4 100644 --- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs +++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs @@ -50,13 +50,21 @@ impl Interpret for PrepCephOsdReplacementInterpret { 0, ) .await?; - client - .get_pod(&self.score.osd_name, Some(&self.score.rook_ceph_namespace)) + let dep = client + .get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace)) .await?; - Ok(Outcome::success( - "Successfully prepared rook-ceph-cluster for disk replacement".to_string(), - )) + match dep.unwrap().status.and_then(|s| s.ready_replicas) { + Some(0) => Ok(Outcome::success( + "Successfully prepared rook-ceph-cluster for disk replacement".to_string(), + )), + Some(_) => Err(InterpretError::new({ + "Deployment still has ready replicas".to_string() + })), + None => Err(InterpretError::new({ + "Failed to get rook-ceph-cluster status".to_string() + })), + } } fn get_name(&self) -> InterpretName { @@ -75,5 +83,3 @@ impl Interpret for PrepCephOsdReplacementInterpret { todo!() } } - -impl PrepCephOsdReplacementInterpret {} From 6685b05cc5bd31f3d51f53a0caae18205d93058d Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Tue, 19 Aug 2025 17:05:23 -0400 Subject: [PATCH 3/6] wip(inventory_agent): Refactoring for better error handling in progress --- Cargo.lock | 4 +- harmony_inventory_agent/src/hwinfo.rs | 156 +++++++++++++++++++++----- harmony_inventory_agent/src/main.rs | 12 +- 3 files changed, 143 insertions(+), 29 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d08a31f..e02ce86 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -105,7 +105,7 @@ dependencies = [ "futures-core", "futures-util", "mio 1.0.4", - "socket2", + "socket2 0.5.10", "tokio", "tracing", ] @@ -167,7 +167,7 @@ dependencies = [ "serde_json", "serde_urlencoded", "smallvec", - "socket2", + "socket2 0.5.10", "time", "tracing", "url", diff --git a/harmony_inventory_agent/src/hwinfo.rs b/harmony_inventory_agent/src/hwinfo.rs index 0a628b0..51867fd 100644 --- a/harmony_inventory_agent/src/hwinfo.rs +++ b/harmony_inventory_agent/src/hwinfo.rs @@ -83,20 +83,95 @@ pub struct ManagementInterface { } impl PhysicalHost { - pub fn gather() -> Self { + pub fn gather() -> Result { let mut sys = System::new_all(); sys.refresh_all(); - Self { - storage_drives: Self::gather_storage_drives(), - storage_controller: Self::gather_storage_controller(), - memory_modules: Self::gather_memory_modules(), - cpus: Self::gather_cpus(&sys), - chipset: Self::gather_chipset(), - network_interfaces: Self::gather_network_interfaces(), - management_interface: Self::gather_management_interface(), - host_uuid: Self::get_host_uuid(), + Self::all_tools_available()?; + + Ok(Self { + storage_drives: Self::gather_storage_drives()?, + storage_controller: Self::gather_storage_controller()?, + memory_modules: Self::gather_memory_modules()?, + cpus: Self::gather_cpus(&sys)?, + chipset: Self::gather_chipset()?, + network_interfaces: Self::gather_network_interfaces()?, + management_interface: Self::gather_management_interface()?, + host_uuid: Self::get_host_uuid()?, + }) + } + + fn all_tools_available() -> Result<(), String>{ + let required_tools = [ + ("lsblk", "--version"), + ("lspci", 
"--version"), + ("lsmod", "--version"), + ("dmidecode", "--version"), + ("smartctl", "--version"), + ("ip", "route"), // No version flag available + ]; + + let mut missing_tools = Vec::new(); + + for (tool, tool_arg) in required_tools.iter() { + // First check if tool exists in PATH using which(1) + let exists = if let Ok(output) = Command::new("which").arg(tool).output() { + output.status.success() + } else { + // Fallback: manual PATH search if which(1) is unavailable + if let Ok(path_var) = std::env::var("PATH") { + path_var.split(':').any(|dir| { + let tool_path = std::path::Path::new(dir).join(tool); + tool_path.exists() && Self::is_executable(&tool_path) + }) + } else { + false + } + }; + + if !exists { + missing_tools.push(*tool); + continue; + } + + // Verify tool is functional by checking version/help output + let mut cmd = Command::new(tool); + cmd.arg(tool_arg); + cmd.stdout(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::null()); + + missing_tools.push(*tool); } + + if !missing_tools.is_empty() { + let missing_str = missing_tools + .iter() + .map(|s| s.to_string()) + .collect::>() + .join(", "); + return Err(format!( + "The following required tools are not available: {}. Please install these tools to use PhysicalHost::gather()", + missing_str + )); + } + + Ok(()) + } + + #[cfg(unix)] + fn is_executable(path: &std::path::Path) -> bool { + use std::os::unix::fs::PermissionsExt; + + match std::fs::metadata(path) { + Ok(meta) => meta.permissions().mode() & 0o111 != 0, + Err(_) => false, + } + } + + #[cfg(not(unix))] + fn is_executable(_path: &std::path::Path) -> bool { + // On non-Unix systems, we assume existence implies executability + true } fn gather_storage_drives() -> Vec { @@ -331,11 +406,11 @@ impl PhysicalHost { cpus } - fn gather_chipset() -> Chipset { - Chipset { - name: Self::read_dmi("board-product-name").unwrap_or_else(|| "Unknown".to_string()), - vendor: Self::read_dmi("board-manufacturer").unwrap_or_else(|| "Unknown".to_string()), - } + fn gather_chipset() -> Result { + Ok(Chipset { + name: Self::read_dmi("board-product-name")?, + vendor: Self::read_dmi("board-manufacturer")?, + }) } fn gather_network_interfaces() -> Vec { @@ -436,16 +511,47 @@ impl PhysicalHost { .and_then(|s| s.trim().parse().ok()) } - fn read_dmi(field: &str) -> Option { - Command::new("dmidecode") - .arg("-s") - .arg(field) - .output() - .ok() - .filter(|output| output.status.success()) - .and_then(|output| String::from_utf8(output.stdout).ok()) - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) + // Valid string keywords are: + // bios-vendor + // bios-version + // bios-release-date + // bios-revision + // firmware-revision + // system-manufacturer + // system-product-name + // system-version + // system-serial-number + // system-uuid + // system-sku-number + // system-family + // baseboard-manufacturer + // baseboard-product-name + // baseboard-version + // baseboard-serial-number + // baseboard-asset-tag + // chassis-manufacturer + // chassis-type + // chassis-version + // chassis-serial-number + // chassis-asset-tag + // processor-family + // processor-manufacturer + // processor-version + // processor-frequency + fn read_dmi(field: &str) -> Result { + match Command::new("dmidecode").arg("-s").arg(field).output() { + Ok(output) => { + let stdout = String::from_utf8(output.stdout).expect("Output should parse as utf8"); + if output.status.success() && stdout.is_empty() { + return Ok(stdout); + } else { + return Err(format!( + "dmidecode command failed for field 
{field} : {stdout}" + )); + } + } + Err(e) => Err(format!("dmidecode command failed for field {field} : {e}")), + } } fn get_interface_type(device_name: &str, device_path: &Path) -> String { diff --git a/harmony_inventory_agent/src/main.rs b/harmony_inventory_agent/src/main.rs index 9421056..8784b00 100644 --- a/harmony_inventory_agent/src/main.rs +++ b/harmony_inventory_agent/src/main.rs @@ -9,8 +9,16 @@ mod hwinfo; async fn inventory() -> impl Responder { log::info!("Received inventory request"); let host = PhysicalHost::gather(); - log::info!("Inventory data gathered successfully"); - actix_web::HttpResponse::Ok().json(host) + match host { + Ok(host) => { + log::info!("Inventory data gathered successfully"); + actix_web::HttpResponse::Ok().json(host) + } + Err(error) => { + log::error!("Inventory data gathering FAILED"); + actix_web::HttpResponse::InternalServerError().json(error) + } + } } #[actix_web::main] From 72fb05b5cc0bcfa1dd97b7eeb1dae4e2624cde15 Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Tue, 19 Aug 2025 17:56:06 -0400 Subject: [PATCH 4/6] fix(inventory_agent) : Agent now retreives correct dmidecode fields, fixed uuid generation which is unacceptable, fixed storage drive parsing, much better error handling, much more strict behavior which also leads to more complete output as missing fields will raise errors unless explicitely optional --- Cargo.lock | 1 - harmony_inventory_agent/Cargo.toml | 1 - harmony_inventory_agent/src/hwinfo.rs | 806 +++++++++++++++----------- 3 files changed, 477 insertions(+), 331 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e02ce86..b1295be 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2178,7 +2178,6 @@ dependencies = [ "serde", "serde_json", "sysinfo", - "uuid", ] [[package]] diff --git a/harmony_inventory_agent/Cargo.toml b/harmony_inventory_agent/Cargo.toml index a299ca0..3b1be2c 100644 --- a/harmony_inventory_agent/Cargo.toml +++ b/harmony_inventory_agent/Cargo.toml @@ -10,4 +10,3 @@ serde.workspace = true serde_json.workspace = true log.workspace = true env_logger.workspace = true -uuid.workspace = true diff --git a/harmony_inventory_agent/src/hwinfo.rs b/harmony_inventory_agent/src/hwinfo.rs index 51867fd..21045cc 100644 --- a/harmony_inventory_agent/src/hwinfo.rs +++ b/harmony_inventory_agent/src/hwinfo.rs @@ -1,3 +1,4 @@ +use log::debug; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fs; @@ -101,7 +102,7 @@ impl PhysicalHost { }) } - fn all_tools_available() -> Result<(), String>{ + fn all_tools_available() -> Result<(), String> { let required_tools = [ ("lsblk", "--version"), ("lspci", "--version"), @@ -140,7 +141,13 @@ impl PhysicalHost { cmd.stdout(std::process::Stdio::null()); cmd.stderr(std::process::Stdio::null()); - missing_tools.push(*tool); + if let Ok(status) = cmd.status() { + if !status.success() { + missing_tools.push(*tool); + } + } else { + missing_tools.push(*tool); + } } if !missing_tools.is_empty() { @@ -174,11 +181,11 @@ impl PhysicalHost { true } - fn gather_storage_drives() -> Vec { + fn gather_storage_drives() -> Result, String> { let mut drives = Vec::new(); // Use lsblk with JSON output for robust parsing - if let Ok(output) = Command::new("lsblk") + let output = Command::new("lsblk") .args([ "-d", "-o", @@ -189,132 +196,165 @@ impl PhysicalHost { "--json", ]) .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(blockdevices) = json.get("blockdevices").and_then(|v| v.as_array()) - { - for device in 
blockdevices { - let name = device - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - if name.is_empty() { - continue; - } + .map_err(|e| format!("Failed to execute lsblk: {}", e))?; - let model = device - .get("model") - .and_then(|v| v.as_str()) - .map(|s| s.trim().to_string()) - .unwrap_or_default(); - - let serial = device - .get("serial") - .and_then(|v| v.as_str()) - .map(|s| s.trim().to_string()) - .unwrap_or_default(); - - let size_str = device.get("size").and_then(|v| v.as_str()).unwrap_or("0"); - let size_bytes = Self::parse_size(size_str).unwrap_or(0); - - let rotational = device - .get("rota") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - - let wwn = device - .get("wwn") - .and_then(|v| v.as_str()) - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty() && s != "null"); - - let device_path = Path::new("/sys/block").join(&name); - - let mut drive = StorageDrive { - name: name.clone(), - model, - serial, - size_bytes, - logical_block_size: Self::read_sysfs_u32( - &device_path.join("queue/logical_block_size"), - ) - .unwrap_or(512), - physical_block_size: Self::read_sysfs_u32( - &device_path.join("queue/physical_block_size"), - ) - .unwrap_or(512), - rotational, - wwn, - interface_type: Self::get_interface_type(&name, &device_path), - smart_status: Self::get_smart_status(&name), - }; - - // Enhance with additional sysfs info if available - if device_path.exists() { - if drive.model.is_empty() { - drive.model = Self::read_sysfs_string(&device_path.join("device/model")); - } - if drive.serial.is_empty() { - drive.serial = Self::read_sysfs_string(&device_path.join("device/serial")); - } - } - - drives.push(drive); - } + if !output.status.success() { + return Err(format!( + "lsblk command failed: {}", + String::from_utf8_lossy(&output.stderr) + )); } - drives + let json: Value = serde_json::from_slice(&output.stdout) + .map_err(|e| format!("Failed to parse lsblk JSON output: {}", e))?; + + let blockdevices = json + .get("blockdevices") + .and_then(|v| v.as_array()) + .ok_or("Invalid lsblk JSON: missing 'blockdevices' array")?; + + for device in blockdevices { + let name = device + .get("name") + .and_then(|v| v.as_str()) + .ok_or("Missing 'name' in lsblk device")? 
+ .to_string(); + + if name.is_empty() { + continue; + } + + let model = device + .get("model") + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .unwrap_or_default(); + + let serial = device + .get("serial") + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .unwrap_or_default(); + + let size_str = device + .get("size") + .and_then(|v| v.as_str()) + .ok_or("Missing 'size' in lsblk device")?; + let size_bytes = Self::parse_size(size_str)?; + + let rotational = device + .get("rota") + .and_then(|v| v.as_bool()) + .ok_or("Missing 'rota' in lsblk device")?; + + let wwn = device + .get("wwn") + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty() && s != "null"); + + let device_path = Path::new("/sys/block").join(&name); + + let logical_block_size = Self::read_sysfs_u32( + &device_path.join("queue/logical_block_size"), + ) + .map_err(|e| format!("Failed to read logical block size for {}: {}", name, e))?; + + let physical_block_size = Self::read_sysfs_u32( + &device_path.join("queue/physical_block_size"), + ) + .map_err(|e| format!("Failed to read physical block size for {}: {}", name, e))?; + + let interface_type = Self::get_interface_type(&name, &device_path)?; + let smart_status = Self::get_smart_status(&name)?; + + let mut drive = StorageDrive { + name: name.clone(), + model, + serial, + size_bytes, + logical_block_size, + physical_block_size, + rotational, + wwn, + interface_type, + smart_status, + }; + + // Enhance with additional sysfs info if available + if device_path.exists() { + if drive.model.is_empty() { + drive.model = Self::read_sysfs_string(&device_path.join("device/model")) + .map_err(|e| format!("Failed to read model for {}: {}", name, e))?; + } + if drive.serial.is_empty() { + drive.serial = Self::read_sysfs_string(&device_path.join("device/serial")) + .map_err(|e| format!("Failed to read serial for {}: {}", name, e))?; + } + } + + drives.push(drive); + } + + Ok(drives) } - fn gather_storage_controller() -> StorageController { + fn gather_storage_controller() -> Result { let mut controller = StorageController { name: "Unknown".to_string(), driver: "Unknown".to_string(), }; // Use lspci with JSON output if available - if let Ok(output) = Command::new("lspci") + let output = Command::new("lspci") .args(["-nn", "-d", "::0100", "-J"]) // Storage controllers class with JSON .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(devices) = json.as_array() - { - for device in devices { - if let Some(device_info) = device.as_object() - && let Some(name) = device_info - .get("device") - .and_then(|v| v.as_object()) - .and_then(|v| v.get("name")) - .and_then(|v| v.as_str()) - { - controller.name = name.to_string(); - break; + .map_err(|e| format!("Failed to execute lspci: {}", e))?; + + if output.status.success() { + let json: Value = serde_json::from_slice(&output.stdout) + .map_err(|e| format!("Failed to parse lspci JSON output: {}", e))?; + + if let Some(devices) = json.as_array() { + for device in devices { + if let Some(device_info) = device.as_object() + && let Some(name) = device_info + .get("device") + .and_then(|v| v.as_object()) + .and_then(|v| v.get("name")) + .and_then(|v| v.as_str()) + { + controller.name = name.to_string(); + break; + } } } } - // Fallback to text output if JSON fails - if controller.name == "Unknown" - && let Ok(output) = Command::new("lspci") + // Fallback to text output if JSON fails or no device found + if controller.name == "Unknown" 
{ + let output = Command::new("lspci") .args(["-nn", "-d", "::0100"]) // Storage controllers class .output() - && output.status.success() - { - let output_str = String::from_utf8_lossy(&output.stdout); - if let Some(line) = output_str.lines().next() { - let parts: Vec<&str> = line.split(':').collect(); - if parts.len() > 2 { - controller.name = parts[2].trim().to_string(); + .map_err(|e| format!("Failed to execute lspci (fallback): {}", e))?; + + if output.status.success() { + let output_str = String::from_utf8_lossy(&output.stdout); + if let Some(line) = output_str.lines().next() { + let parts: Vec<&str> = line.split(':').collect(); + if parts.len() > 2 { + controller.name = parts[2].trim().to_string(); + } } } } // Try to get driver info from lsmod - if let Ok(output) = Command::new("lsmod").output() - && output.status.success() - { + let output = Command::new("lsmod") + .output() + .map_err(|e| format!("Failed to execute lsmod: {}", e))?; + + if output.status.success() { let output_str = String::from_utf8_lossy(&output.stdout); for line in output_str.lines() { if line.contains("ahci") @@ -331,67 +371,78 @@ impl PhysicalHost { } } - controller + Ok(controller) } - fn gather_memory_modules() -> Vec { + fn gather_memory_modules() -> Result, String> { let mut modules = Vec::new(); - if let Ok(output) = Command::new("dmidecode").arg("--type").arg("17").output() - && output.status.success() - { - let output_str = String::from_utf8_lossy(&output.stdout); - let sections: Vec<&str> = output_str.split("Memory Device").collect(); + let output = Command::new("dmidecode") + .arg("--type") + .arg("17") + .output() + .map_err(|e| format!("Failed to execute dmidecode: {}", e))?; - for section in sections.into_iter().skip(1) { - let mut module = MemoryModule { - size_bytes: 0, - speed_mhz: None, - manufacturer: None, - part_number: None, - serial_number: None, - rank: None, - }; + if !output.status.success() { + return Err(format!( + "dmidecode command failed: {}", + String::from_utf8_lossy(&output.stderr) + )); + } - for line in section.lines() { - let line = line.trim(); - if let Some(size_str) = line.strip_prefix("Size: ") { - if size_str != "No Module Installed" - && let Some((num, unit)) = size_str.split_once(' ') - && let Ok(num) = num.parse::() - { - module.size_bytes = match unit { - "MB" => num * 1024 * 1024, - "GB" => num * 1024 * 1024 * 1024, - "KB" => num * 1024, - _ => 0, - }; - } - } else if let Some(speed_str) = line.strip_prefix("Speed: ") { - if let Some((num, _unit)) = speed_str.split_once(' ') { - module.speed_mhz = num.parse().ok(); - } - } else if let Some(man) = line.strip_prefix("Manufacturer: ") { - module.manufacturer = Some(man.to_string()); - } else if let Some(part) = line.strip_prefix("Part Number: ") { - module.part_number = Some(part.to_string()); - } else if let Some(serial) = line.strip_prefix("Serial Number: ") { - module.serial_number = Some(serial.to_string()); - } else if let Some(rank) = line.strip_prefix("Rank: ") { - module.rank = rank.parse().ok(); + let output_str = String::from_utf8(output.stdout) + .map_err(|e| format!("Failed to parse dmidecode output: {}", e))?; + + let sections: Vec<&str> = output_str.split("Memory Device").collect(); + + for section in sections.into_iter().skip(1) { + let mut module = MemoryModule { + size_bytes: 0, + speed_mhz: None, + manufacturer: None, + part_number: None, + serial_number: None, + rank: None, + }; + + for line in section.lines() { + let line = line.trim(); + if let Some(size_str) = line.strip_prefix("Size: ") { + if 
size_str != "No Module Installed" + && let Some((num, unit)) = size_str.split_once(' ') + && let Ok(num) = num.parse::() + { + module.size_bytes = match unit { + "MB" => num * 1024 * 1024, + "GB" => num * 1024 * 1024 * 1024, + "KB" => num * 1024, + _ => 0, + }; } + } else if let Some(speed_str) = line.strip_prefix("Speed: ") { + if let Some((num, _unit)) = speed_str.split_once(' ') { + module.speed_mhz = num.parse().ok(); + } + } else if let Some(man) = line.strip_prefix("Manufacturer: ") { + module.manufacturer = Some(man.to_string()); + } else if let Some(part) = line.strip_prefix("Part Number: ") { + module.part_number = Some(part.to_string()); + } else if let Some(serial) = line.strip_prefix("Serial Number: ") { + module.serial_number = Some(serial.to_string()); + } else if let Some(rank) = line.strip_prefix("Rank: ") { + module.rank = rank.parse().ok(); } + } - if module.size_bytes > 0 { - modules.push(module); - } + if module.size_bytes > 0 { + modules.push(module); } } - modules + Ok(modules) } - fn gather_cpus(sys: &System) -> Vec { + fn gather_cpus(sys: &System) -> Result, String> { let mut cpus = Vec::new(); let global_cpu = sys.global_cpu_info(); @@ -403,232 +454,310 @@ impl PhysicalHost { frequency_mhz: global_cpu.frequency(), }); - cpus + Ok(cpus) } fn gather_chipset() -> Result { Ok(Chipset { - name: Self::read_dmi("board-product-name")?, - vendor: Self::read_dmi("board-manufacturer")?, + name: Self::read_dmi("baseboard-product-name")?, + vendor: Self::read_dmi("baseboard-manufacturer")?, }) } - fn gather_network_interfaces() -> Vec { + fn gather_network_interfaces() -> Result, String> { let mut interfaces = Vec::new(); let sys_net_path = Path::new("/sys/class/net"); - if let Ok(entries) = fs::read_dir(sys_net_path) { - for entry in entries.flatten() { - let iface_name = entry.file_name().into_string().unwrap_or_default(); - let iface_path = entry.path(); + let entries = fs::read_dir(sys_net_path) + .map_err(|e| format!("Failed to read /sys/class/net: {}", e))?; - // Skip virtual interfaces - if iface_name.starts_with("lo") - || iface_name.starts_with("docker") - || iface_name.starts_with("virbr") - || iface_name.starts_with("veth") - || iface_name.starts_with("br-") - || iface_name.starts_with("tun") - || iface_name.starts_with("wg") - { - continue; - } + for entry in entries { + let entry = entry.map_err(|e| format!("Failed to read directory entry: {}", e))?; + let iface_name = entry + .file_name() + .into_string() + .map_err(|_| "Invalid UTF-8 in interface name")?; + let iface_path = entry.path(); - // Check if it's a physical interface by looking for device directory - if !iface_path.join("device").exists() { - continue; - } - - let mac_address = Self::read_sysfs_string(&iface_path.join("address")); - let speed_mbps = Self::read_sysfs_u32(&iface_path.join("speed")); - let operstate = Self::read_sysfs_string(&iface_path.join("operstate")); - let mtu = Self::read_sysfs_u32(&iface_path.join("mtu")).unwrap_or(1500); - let driver = Self::read_sysfs_string(&iface_path.join("device/driver/module")); - let firmware_version = - Self::read_sysfs_opt_string(&iface_path.join("device/firmware_version")); - - // Get IP addresses using ip command with JSON output - let (ipv4_addresses, ipv6_addresses) = Self::get_interface_ips_json(&iface_name); - - interfaces.push(NetworkInterface { - name: iface_name, - mac_address, - speed_mbps, - is_up: operstate == "up", - mtu, - ipv4_addresses, - ipv6_addresses, - driver, - firmware_version, - }); + // Skip virtual interfaces + if 
iface_name.starts_with("lo") + || iface_name.starts_with("docker") + || iface_name.starts_with("virbr") + || iface_name.starts_with("veth") + || iface_name.starts_with("br-") + || iface_name.starts_with("tun") + || iface_name.starts_with("wg") + { + continue; } + + // Check if it's a physical interface by looking for device directory + if !iface_path.join("device").exists() { + continue; + } + + let mac_address = Self::read_sysfs_string(&iface_path.join("address")) + .map_err(|e| format!("Failed to read MAC address for {}: {}", iface_name, e))?; + + let speed_mbps = if iface_path.join("speed").exists() { + match Self::read_sysfs_u32(&iface_path.join("speed")) { + Ok(speed) => Some(speed), + Err(e) => { + debug!( + "Failed to read speed for {}: {} . This is expected to fail on wifi interfaces.", + iface_name, e + ); + None + } + } + } else { + None + }; + + let operstate = Self::read_sysfs_string(&iface_path.join("operstate")) + .map_err(|e| format!("Failed to read operstate for {}: {}", iface_name, e))?; + + let mtu = Self::read_sysfs_u32(&iface_path.join("mtu")) + .map_err(|e| format!("Failed to read MTU for {}: {}", iface_name, e))?; + + let driver = + Self::read_sysfs_symlink_basename(&iface_path.join("device/driver/module")) + .map_err(|e| format!("Failed to read driver for {}: {}", iface_name, e))?; + + let firmware_version = Self::read_sysfs_opt_string( + &iface_path.join("device/firmware_version"), + ) + .map_err(|e| format!("Failed to read firmware version for {}: {}", iface_name, e))?; + + // Get IP addresses using ip command with JSON output + let (ipv4_addresses, ipv6_addresses) = Self::get_interface_ips_json(&iface_name) + .map_err(|e| format!("Failed to get IP addresses for {}: {}", iface_name, e))?; + + interfaces.push(NetworkInterface { + name: iface_name, + mac_address, + speed_mbps, + is_up: operstate == "up", + mtu, + ipv4_addresses, + ipv6_addresses, + driver, + firmware_version, + }); } - interfaces + Ok(interfaces) } - fn gather_management_interface() -> Option { - // Try to detect common management interfaces + fn gather_management_interface() -> Result, String> { if Path::new("/dev/ipmi0").exists() { - Some(ManagementInterface { + Ok(Some(ManagementInterface { kind: "IPMI".to_string(), address: None, - firmware: Self::read_dmi("bios-version"), - }) + firmware: Some(Self::read_dmi("bios-version")?), + })) } else if Path::new("/sys/class/misc/mei").exists() { - Some(ManagementInterface { + Ok(Some(ManagementInterface { kind: "Intel ME".to_string(), address: None, firmware: None, - }) + })) } else { - None + Ok(None) } } - fn get_host_uuid() -> String { - Self::read_dmi("system-uuid").unwrap() + fn get_host_uuid() -> Result { + Self::read_dmi("system-uuid") } // Helper methods - fn read_sysfs_string(path: &Path) -> String { + fn read_sysfs_string(path: &Path) -> Result { fs::read_to_string(path) - .unwrap_or_default() - .trim() - .to_string() - } - - fn read_sysfs_opt_string(path: &Path) -> Option { - fs::read_to_string(path) - .ok() .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) + .map_err(|e| format!("Failed to read {}: {}", path.display(), e)) } - fn read_sysfs_u32(path: &Path) -> Option { - fs::read_to_string(path) - .ok() - .and_then(|s| s.trim().parse().ok()) - } - - // Valid string keywords are: - // bios-vendor - // bios-version - // bios-release-date - // bios-revision - // firmware-revision - // system-manufacturer - // system-product-name - // system-version - // system-serial-number - // system-uuid - // system-sku-number - // system-family - // 
baseboard-manufacturer - // baseboard-product-name - // baseboard-version - // baseboard-serial-number - // baseboard-asset-tag - // chassis-manufacturer - // chassis-type - // chassis-version - // chassis-serial-number - // chassis-asset-tag - // processor-family - // processor-manufacturer - // processor-version - // processor-frequency - fn read_dmi(field: &str) -> Result { - match Command::new("dmidecode").arg("-s").arg(field).output() { - Ok(output) => { - let stdout = String::from_utf8(output.stdout).expect("Output should parse as utf8"); - if output.status.success() && stdout.is_empty() { - return Ok(stdout); - } else { - return Err(format!( - "dmidecode command failed for field {field} : {stdout}" - )); - } + fn read_sysfs_opt_string(path: &Path) -> Result, String> { + match fs::read_to_string(path) { + Ok(s) => { + let s = s.trim().to_string(); + Ok(if s.is_empty() { None } else { Some(s) }) } - Err(e) => Err(format!("dmidecode command failed for field {field} : {e}")), + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(format!("Failed to read {}: {}", path.display(), e)), } } - fn get_interface_type(device_name: &str, device_path: &Path) -> String { - if device_name.starts_with("nvme") { - "NVMe".to_string() - } else if device_name.starts_with("sd") { - "SATA".to_string() - } else if device_name.starts_with("hd") { - "IDE".to_string() - } else if device_name.starts_with("vd") { - "VirtIO".to_string() - } else { - // Try to determine from device path - Self::read_sysfs_string(&device_path.join("device/subsystem")) - .split('/') - .next_back() - .unwrap_or("Unknown") - .to_string() + fn read_sysfs_u32(path: &Path) -> Result { + fs::read_to_string(path) + .map_err(|e| format!("Failed to read {}: {}", path.display(), e))? 
+ .trim() + .parse() + .map_err(|e| format!("Failed to parse {}: {}", path.display(), e)) + } + + fn read_sysfs_symlink_basename(path: &Path) -> Result { + match fs::read_link(path) { + Ok(target_path) => match target_path.file_name() { + Some(name_osstr) => match name_osstr.to_str() { + Some(name_str) => Ok(name_str.to_string()), + None => Err(format!( + "Symlink target basename is not valid UTF-8: {}", + target_path.display() + )), + }, + None => Err(format!( + "Symlink target has no basename: {} -> {}", + path.display(), + target_path.display() + )), + }, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(format!( + "Could not resolve symlink for path : {}", + path.display() + )), + Err(e) => Err(format!("Failed to read symlink {}: {}", path.display(), e)), } } - fn get_smart_status(device_name: &str) -> Option { - Command::new("smartctl") - .arg("-H") - .arg(format!("/dev/{}", device_name)) + fn read_dmi(field: &str) -> Result { + let output = Command::new("dmidecode") + .arg("-s") + .arg(field) .output() - .ok() - .filter(|output| output.status.success()) - .and_then(|output| String::from_utf8(output.stdout).ok()) - .and_then(|s| { - s.lines() - .find(|line| line.contains("SMART overall-health self-assessment")) - .and_then(|line| line.split(':').nth(1)) - .map(|s| s.trim().to_string()) + .map_err(|e| format!("Failed to execute dmidecode for field {}: {}", field, e))?; + + if !output.status.success() { + return Err(format!( + "dmidecode command failed for field {}: {}", + field, + String::from_utf8_lossy(&output.stderr) + )); + } + + String::from_utf8(output.stdout) + .map(|s| s.trim().to_string()) + .map_err(|e| { + format!( + "Failed to parse dmidecode output for field {}: {}", + field, e + ) }) } - fn parse_size(size_str: &str) -> Option { - if size_str.ends_with('T') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|t| t * 1024 * 1024 * 1024 * 1024) - } else if size_str.ends_with('G') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|g| g * 1024 * 1024 * 1024) - } else if size_str.ends_with('M') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|m| m * 1024 * 1024) - } else if size_str.ends_with('K') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|k| k * 1024) - } else if size_str.ends_with('B') { - size_str[..size_str.len() - 1].parse::().ok() + fn get_interface_type(device_name: &str, device_path: &Path) -> Result { + if device_name.starts_with("nvme") { + Ok("NVMe".to_string()) + } else if device_name.starts_with("sd") { + Ok("SATA".to_string()) + } else if device_name.starts_with("hd") { + Ok("IDE".to_string()) + } else if device_name.starts_with("vd") { + Ok("VirtIO".to_string()) } else { - size_str.parse::().ok() + // Try to determine from device path + let subsystem = Self::read_sysfs_string(&device_path.join("device/subsystem"))?; + Ok(subsystem + .split('/') + .next_back() + .unwrap_or("Unknown") + .to_string()) } } - fn get_interface_ips_json(iface_name: &str) -> (Vec, Vec) { + fn get_smart_status(device_name: &str) -> Result, String> { + let output = Command::new("smartctl") + .arg("-H") + .arg(format!("/dev/{}", device_name)) + .output() + .map_err(|e| format!("Failed to execute smartctl for {}: {}", device_name, e))?; + + if !output.status.success() { + return Ok(None); + } + + let stdout = String::from_utf8(output.stdout) + .map_err(|e| format!("Failed to parse smartctl output for {}: {}", device_name, e))?; + + for line in stdout.lines() { + if line.contains("SMART overall-health self-assessment") { + 
if let Some(status) = line.split(':').nth(1) { + return Ok(Some(status.trim().to_string())); + } + } + } + + Ok(None) + } + + fn parse_size(size_str: &str) -> Result { + debug!("Parsing size_str '{size_str}'"); + let size; + if size_str.ends_with('T') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|t| t * 1024.0 * 1024.0 * 1024.0 * 1024.0) + .map_err(|e| format!("Failed to parse T size '{}': {}", size_str, e)) + } else if size_str.ends_with('G') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|g| g * 1024.0 * 1024.0 * 1024.0) + .map_err(|e| format!("Failed to parse G size '{}': {}", size_str, e)) + } else if size_str.ends_with('M') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|m| m * 1024.0 * 1024.0) + .map_err(|e| format!("Failed to parse M size '{}': {}", size_str, e)) + } else if size_str.ends_with('K') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|k| k * 1024.0) + .map_err(|e| format!("Failed to parse K size '{}': {}", size_str, e)) + } else if size_str.ends_with('B') { + size = size_str[..size_str.len() - 1] + .parse::() + .map_err(|e| format!("Failed to parse B size '{}': {}", size_str, e)) + } else { + size = size_str + .parse::() + .map_err(|e| format!("Failed to parse size '{}': {}", size_str, e)) + } + + size.map(|s| s as u64) + } + + fn get_interface_ips_json(iface_name: &str) -> Result<(Vec, Vec), String> { let mut ipv4 = Vec::new(); let mut ipv6 = Vec::new(); // Get IPv4 addresses using JSON output - if let Ok(output) = Command::new("ip") + let output = Command::new("ip") .args(["-j", "-4", "addr", "show", iface_name]) .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(addrs) = json.as_array() - { + .map_err(|e| { + format!( + "Failed to execute ip command for IPv4 on {}: {}", + iface_name, e + ) + })?; + + if !output.status.success() { + return Err(format!( + "ip command for IPv4 on {} failed: {}", + iface_name, + String::from_utf8_lossy(&output.stderr) + )); + } + + let json: Value = serde_json::from_slice(&output.stdout).map_err(|e| { + format!( + "Failed to parse ip JSON output for IPv4 on {}: {}", + iface_name, e + ) + })?; + + if let Some(addrs) = json.as_array() { for addr_info in addrs { if let Some(addr_info_obj) = addr_info.as_object() && let Some(addr_info) = @@ -646,13 +775,32 @@ impl PhysicalHost { } // Get IPv6 addresses using JSON output - if let Ok(output) = Command::new("ip") + let output = Command::new("ip") .args(["-j", "-6", "addr", "show", iface_name]) .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(addrs) = json.as_array() - { + .map_err(|e| { + format!( + "Failed to execute ip command for IPv6 on {}: {}", + iface_name, e + ) + })?; + + if !output.status.success() { + return Err(format!( + "ip command for IPv6 on {} failed: {}", + iface_name, + String::from_utf8_lossy(&output.stderr) + )); + } + + let json: Value = serde_json::from_slice(&output.stdout).map_err(|e| { + format!( + "Failed to parse ip JSON output for IPv6 on {}: {}", + iface_name, e + ) + })?; + + if let Some(addrs) = json.as_array() { for addr_info in addrs { if let Some(addr_info_obj) = addr_info.as_object() && let Some(addr_info) = @@ -672,6 +820,6 @@ impl PhysicalHost { } } - (ipv4, ipv6) + Ok((ipv4, ipv6)) } } From 89eb88d10ee257899df51d96f24463135f756c33 Mon Sep 17 00:00:00 2001 From: Willem Date: Wed, 20 Aug 2025 12:09:55 -0400 Subject: [PATCH 5/6] feat: socre to remove an osd from the ceph osd 
tree using K8sClient to interact with rook-ceph-toolbox pod
---
 harmony/src/domain/topology/k8s.rs            |  80 +++-
 .../ceph/ceph_osd_replacement_score.rs        | 355 ++++++++++++++++--
 2 files changed, 402 insertions(+), 33 deletions(-)

diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 30b335f..1887c33 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -5,7 +5,7 @@ use k8s_openapi::{
 };
 use kube::{
     Client, Config, Error, Resource,
-    api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
+    api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
     config::{KubeConfigOptions, Kubeconfig},
     core::ErrorResponse,
     runtime::reflector::Lookup,
@@ -19,6 +19,7 @@ use log::{debug, error, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::json;
 use similar::TextDiff;
+use tokio::io::AsyncReadExt;
 
 #[derive(new, Clone)]
 pub struct K8sClient {
@@ -97,6 +98,21 @@
         Ok(())
     }
 
+    pub async fn delete_deployment(
+        &self,
+        name: &str,
+        namespace: Option<&str>,
+    ) -> Result<(), Error> {
+        let deployments: Api<Deployment> = if let Some(ns) = namespace {
+            Api::namespaced(self.client.clone(), ns)
+        } else {
+            Api::default_namespaced(self.client.clone())
+        };
+        let delete_params = DeleteParams::default();
+        deployments.delete(name, &delete_params).await?;
+        Ok(())
+    }
+
     pub async fn wait_until_deployment_ready(
         &self,
         name: String,
@@ -122,6 +138,68 @@
         }
     }
 
+    /// Will execute a command in the first pod found that matches the specified label
+    /// '{label}={name}'
+    pub async fn exec_app_capture_output(
+        &self,
+        name: String,
+        label: String,
+        namespace: Option<&str>,
+        command: Vec<&str>,
+    ) -> Result<String, String> {
+        let api: Api<Pod>;
+
+        if let Some(ns) = namespace {
+            api = Api::namespaced(self.client.clone(), ns);
+        } else {
+            api = Api::default_namespaced(self.client.clone());
+        }
+        let pod_list = api
+            .list(&ListParams::default().labels(format!("{label}={name}").as_str()))
+            .await
+            .expect("couldn't get list of pods");
+
+        let res = api
+            .exec(
+                pod_list
+                    .items
+                    .first()
+                    .expect("couldn't get pod")
+                    .name()
+                    .expect("couldn't get pod name")
+                    .into_owned()
+                    .as_str(),
+                command,
+                &AttachParams::default().stdout(true).stderr(true),
+            )
+            .await;
+        match res {
+            Err(e) => Err(e.to_string()),
+            Ok(mut process) => {
+                let status = process
+                    .take_status()
+                    .expect("Couldn't get status")
+                    .await
+                    .expect("Couldn't unwrap status");
+
+                if let Some(s) = status.status {
+                    let mut stdout_buf = String::new();
+                    if let Some(mut stdout) = process.stdout().take() {
+                        let _ = stdout.read_to_string(&mut stdout_buf).await;
+                    }
+                    debug!("Status: {} - {:?}", s, status.details);
+                    if s == "Success" {
+                        Ok(stdout_buf)
+                    } else {
+                        Err(s)
+                    }
+                } else {
+                    Err("Couldn't get inner status of pod exec".to_string())
+                }
+            }
+        }
+    }
+
     /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
     pub async fn exec_app(
         &self,
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
index 173cca4..a6a85f5 100644
--- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
+++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
@@ -1,72 +1,68 @@
-use std::process::Command;
+use std::{
+    process::Command,
+    sync::Arc,
+    time::{Duration, Instant},
+};
 
 use async_trait::async_trait;
-use serde::Serialize;
+use log::{info, warn};
+use serde::{Deserialize, Serialize};
+use tokio::time::sleep;
 
 use crate::{
     data::{Id, Version},
     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
     inventory::Inventory,
     score::Score,
-    topology::{K8sclient, Topology},
+    topology::{K8sclient, Topology, k8s::K8sClient},
 };
 
 #[derive(Debug, Clone, Serialize)]
-pub struct PrepCephOsdReplacement {
-    osd_name: String,
+pub struct CephRemoveOsd {
+    osd_deployment_name: String,
     rook_ceph_namespace: String,
 }
 
-impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement {
+impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
     fn name(&self) -> String {
-        format!("CephOsdReplacementScore")
+        format!("CephRemoveOsdScore")
     }
 
     #[doc(hidden)]
     fn create_interpret(&self) -> Box<dyn Interpret<T>> {
-        Box::new(PrepCephOsdReplacementInterpret {
+        Box::new(CephRemoveOsdInterpret {
             score: self.clone(),
         })
     }
 }
 
 #[derive(Debug, Clone)]
-pub struct PrepCephOsdReplacementInterpret {
-    score: PrepCephOsdReplacement,
+pub struct CephRemoveOsdInterpret {
+    score: CephRemoveOsd,
 }
 
 #[async_trait]
-impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
+impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
     async fn execute(
         &self,
         _inventory: &Inventory,
         topology: &T,
     ) -> Result<Outcome, InterpretError> {
         let client = topology.k8s_client().await.unwrap();
-        client
-            .scale_deployment(
-                &self.score.osd_name,
-                Some(&self.score.rook_ceph_namespace),
-                0,
-            )
-            .await?;
-        let dep = client
-            .get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
+        self.scale_deployment(client.clone()).await?;
+        self.verify_deployment_scaled(client.clone()).await?;
+        self.delete_deployment(client.clone()).await?;
+        self.verify_deployment_deleted(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
+        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
            .await?;
 
-        match dep.unwrap().status.and_then(|s| s.ready_replicas) {
-            Some(0) => Ok(Outcome::success(
-                "Successfully prepared rook-ceph-cluster for disk replacement".to_string(),
-            )),
-            Some(_) => Err(InterpretError::new({
-                "Deployment still has ready replicas".to_string()
-            })),
-            None => Err(InterpretError::new({
-                "Failed to get rook-ceph-cluster status".to_string()
-            })),
-        }
+        Ok(Outcome::success(format!(
+            "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
+            osd_id_full, self.score.osd_deployment_name
+        )))
     }
-
     fn get_name(&self) -> InterpretName {
         todo!()
     }
@@ -83,3 +79,298 @@ impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
         todo!()
     }
 }
+
+impl CephRemoveOsdInterpret {
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self
+            .score
+            .osd_deployment_name
+            .split('-')
+            .nth(3)
+            .ok_or_else(|| {
+                InterpretError::new(format!(
+                    "Could not parse OSD id from deployment name {}",
+                    self.score.osd_deployment_name
+                ))
+            })?;
+        let osd_id_full = format!("osd.{}", osd_id_numeric);
+
+        info!(
+            "Targeting Ceph OSD: {} (parsed from deployment {})",
+            osd_id_full, self.score.osd_deployment_name
+        );
+
+        Ok(osd_id_full)
+    }
+
+    fn build_timer(&self) -> (Duration, Duration, Instant) {
+        let timeout = Duration::from_secs(120);
+        let interval = Duration::from_secs(5);
+        let start = Instant::now();
+        (timeout, interval, start)
+    }
+
+    pub async fn scale_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Scaling down OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .scale_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+                0,
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "Scaled down deployment {}",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_scaled(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if let Some(deployment) = dep {
+                if let Some(status) = deployment.status {
+                    if status.replicas.unwrap_or(0) == 0 && status.ready_replicas.unwrap_or(0) == 0
+                    {
+                        return Ok(Outcome::success(
+                            "Deployment successfully scaled down.".to_string(),
+                        ));
+                    }
+                }
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to scale down",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    pub async fn delete_deployment(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Deleting OSD deployment: {}",
+            self.score.osd_deployment_name
+        );
+        client
+            .delete_deployment(
+                &self.score.osd_deployment_name,
+                Some(&self.score.rook_ceph_namespace),
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "deployment {} deleted",
+            self.score.osd_deployment_name
+        )))
+    }
+
+    pub async fn verify_deployment_deleted(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+
+        info!("Waiting for OSD deployment to be deleted");
+        loop {
+            let dep = client
+                .get_deployment(
+                    &self.score.osd_deployment_name,
+                    Some(&self.score.rook_ceph_namespace),
+                )
+                .await?;
+
+            if dep.is_none() {
+                info!(
+                    "Deployment {} successfully deleted.",
+                    self.score.osd_deployment_name
+                );
+                return Ok(Outcome::success(format!(
+                    "Deployment {} deleted.",
+                    self.score.osd_deployment_name
+                )));
+            }
+
+            if start.elapsed() > timeout {
+                return Err(InterpretError::new(format!(
+                    "Timed out waiting for deployment {} to be deleted",
+                    self.score.osd_deployment_name
+                )));
+            }
+            sleep(interval).await;
+        }
+    }
+
+    fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
+        let nodes = json.get("nodes").ok_or_else(|| {
+            InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
+        })?;
+        let tree: CephOsdTree = CephOsdTree{ nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
+            InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
+        })?,};
+        Ok(tree)
+    }
+
+    pub async fn purge_ceph_osd(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        info!(
+            "Purging OSD {} from Ceph cluster and removing its auth key",
+            osd_id_full
+        );
+        client
+            .exec_app_capture_output(
+                "rook-ceph-tools".to_string(),
+                "app".to_string(),
+                Some(&self.score.rook_ceph_namespace),
+                // exec passes argv straight to the container, so run both ceph commands in one shell
+                vec!["sh", "-c", format!(
+                    "ceph osd purge {osd_id_full} --yes-i-really-mean-it && ceph auth del {osd_id_full}"
+                ).as_str()],
+            )
+            .await?;
+        Ok(Outcome::success(format!(
+            "osd id {} removed from osd tree",
+            osd_id_full
+        )))
+    }
+
+    pub async fn verify_ceph_osd_removal(
+        &self,
+        client: Arc<K8sClient>,
+        osd_id_full: &str,
+    ) -> Result<Outcome, InterpretError> {
+        let (timeout, interval, start) = self.build_timer();
+        info!(
+            "Verifying OSD {} has been removed from the Ceph tree...",
+            osd_id_full
+        );
+        loop {
+            let output = client
+                .exec_app_capture_output(
+                    "rook-ceph-tools".to_string(),
+                    "app".to_string(),
+                    Some(&self.score.rook_ceph_namespace),
+                    vec!["ceph", "osd", "tree", "-f", "json"],
+                )
+                .await?;
+            let tree =
+                self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json"));
+
+            
let osd_found = tree + .unwrap() + .nodes + .iter() + .any(|node| node.name == osd_id_full); + + if !osd_found { + return Ok(Outcome::success(format!( + "Successfully verified that OSD {} is removed from the Ceph cluster.", + osd_id_full, + ))); + } + + if start.elapsed() > timeout { + return Err(InterpretError::new(format!( + "Timed out waiting for OSD {} to be removed from Ceph tree", + osd_id_full + ))); + } + + warn!( + "OSD {} still found in Ceph tree, retrying in {:?}...", + osd_id_full, interval + ); + sleep(interval).await; + } + } +} +#[derive(Debug, Deserialize, PartialEq)] +pub struct CephOsdTree { + pub nodes: Vec, +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct CephNode { + pub id: i32, + pub name: String, + #[serde(rename = "type")] + pub node_type: String, + pub type_id: Option, + pub children: Option>, + pub exists: Option, + pub status: Option, +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_get_osd_tree() { + let json_data = json!({ + "nodes": [ + {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"}, + {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344} + ] + }); + let interpret = CephRemoveOsdInterpret { + score: CephRemoveOsd { + osd_deployment_name: "osd-1".to_string(), + rook_ceph_namespace: "dummy_ns".to_string(), + }, + }; + let json = interpret.get_osd_tree(json_data).unwrap(); + + let expected = CephOsdTree { + nodes: vec![ + CephNode { + id: 1, + name: "osd.1".to_string(), + node_type: "osd".to_string(), + type_id: None, + children: None, + exists: None, + status: None, + }, + CephNode { + id: 2, + name: "osd.2".to_string(), + node_type: "osd".to_string(), + type_id: None, + children: None, + exists: None, + status: None, + }, + ], + }; + + assert_eq!(json, expected); + } +} From cd3ea6fc10bbaf1f20e8e1645094903022aa8ef5 Mon Sep 17 00:00:00 2001 From: Willem Date: Wed, 20 Aug 2025 12:54:19 -0400 Subject: [PATCH 6/6] fix: added check to ensure that rook-ceph-tools is available in the designated namespace --- .../ceph/ceph_osd_replacement_score.rs | 59 ++++++++++++++++--- 1 file changed, 51 insertions(+), 8 deletions(-) diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs index a6a85f5..a4b0cb0 100644 --- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs +++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs @@ -49,6 +49,7 @@ impl Interpret for CephRemoveOsdInterpret { topology: &T, ) -> Result { let client = topology.k8s_client().await.unwrap(); + self.verify_ceph_toolbox_exists(client.clone()).await?; self.scale_deployment(client.clone()).await?; self.verify_deployment_scaled(client.clone()).await?; self.delete_deployment(client.clone()).await?; @@ -103,11 +104,45 @@ impl CephRemoveOsdInterpret { Ok(osd_id_full) } - fn build_timer(&self) -> (Duration, Duration, Instant) { - let timeout = Duration::from_secs(120); - let interval = Duration::from_secs(5); - let start = Instant::now(); - (timeout, interval, start) + pub async fn verify_ceph_toolbox_exists( + &self, + client: Arc, + ) -> Result { + let toolbox_dep = "rook-ceph-tools".to_string(); + + match client + .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace)) + .await + { + Ok(Some(deployment)) => { + if let Some(status) = deployment.status { + let ready_count = status.ready_replicas.unwrap_or(0); + if ready_count >= 1 { + return Ok(Outcome::success(format!( + "'{}' is ready with {} 
replica(s).", + &toolbox_dep, ready_count + ))); + } else { + return Err(InterpretError::new( + "ceph-tool-box not ready in cluster".to_string(), + )); + } + } else { + Err(InterpretError::new(format!( + "failed to get deployment status {}", + &toolbox_dep + ))) + } + } + Ok(None) => Err(InterpretError::new(format!( + "Deployment '{}' not found in namespace '{}'.", + &toolbox_dep, self.score.rook_ceph_namespace + ))), + Err(e) => Err(InterpretError::new(format!( + "Failed to query for deployment '{}': {}", + &toolbox_dep, e + ))), + } } pub async fn scale_deployment( @@ -167,6 +202,12 @@ impl CephRemoveOsdInterpret { } } + fn build_timer(&self) -> (Duration, Duration, Instant) { + let timeout = Duration::from_secs(120); + let interval = Duration::from_secs(5); + let start = Instant::now(); + (timeout, interval, start) + } pub async fn delete_deployment( &self, client: Arc, @@ -227,9 +268,11 @@ impl CephRemoveOsdInterpret { let nodes = json.get("nodes").ok_or_else(|| { InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string()) })?; - let tree: CephOsdTree = CephOsdTree{ nodes: serde_json::from_value(nodes.clone()).map_err(|e| { - InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e)) - })?,}; + let tree: CephOsdTree = CephOsdTree { + nodes: serde_json::from_value(nodes.clone()).map_err(|e| { + InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e)) + })?, + }; Ok(tree) }