diff --git a/Cargo.lock b/Cargo.lock index 8f3e86d..99822cb 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2193,7 +2193,6 @@ dependencies = [ "serde", "serde_json", "sysinfo", - "uuid", ] [[package]] diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 388ff9d..1887c33 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -5,7 +5,7 @@ use k8s_openapi::{ }; use kube::{ Client, Config, Error, Resource, - api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt}, + api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, config::{KubeConfigOptions, Kubeconfig}, core::ErrorResponse, runtime::reflector::Lookup, @@ -17,7 +17,9 @@ use kube::{ }; use log::{debug, error, trace}; use serde::{Serialize, de::DeserializeOwned}; +use serde_json::json; use similar::TextDiff; +use tokio::io::AsyncReadExt; #[derive(new, Clone)] pub struct K8sClient { @@ -51,6 +53,66 @@ impl K8sClient { }) } + pub async fn get_deployment( + &self, + name: &str, + namespace: Option<&str>, + ) -> Result, Error> { + let deps: Api = if let Some(ns) = namespace { + Api::namespaced(self.client.clone(), ns) + } else { + Api::default_namespaced(self.client.clone()) + }; + Ok(deps.get_opt(name).await?) + } + + pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result, Error> { + let pods: Api = if let Some(ns) = namespace { + Api::namespaced(self.client.clone(), ns) + } else { + Api::default_namespaced(self.client.clone()) + }; + Ok(pods.get_opt(name).await?) + } + + pub async fn scale_deployment( + &self, + name: &str, + namespace: Option<&str>, + replicas: u32, + ) -> Result<(), Error> { + let deployments: Api = if let Some(ns) = namespace { + Api::namespaced(self.client.clone(), ns) + } else { + Api::default_namespaced(self.client.clone()) + }; + + let patch = json!({ + "spec": { + "replicas": replicas + } + }); + let pp = PatchParams::default(); + let scale = Patch::Apply(&patch); + deployments.patch_scale(name, &pp, &scale).await?; + Ok(()) + } + + pub async fn delete_deployment( + &self, + name: &str, + namespace: Option<&str>, + ) -> Result<(), Error> { + let deployments: Api = if let Some(ns) = namespace { + Api::namespaced(self.client.clone(), ns) + } else { + Api::default_namespaced(self.client.clone()) + }; + let delete_params = DeleteParams::default(); + deployments.delete(name, &delete_params).await?; + Ok(()) + } + pub async fn wait_until_deployment_ready( &self, name: String, @@ -76,6 +138,68 @@ impl K8sClient { } } + /// Will execute a commond in the first pod found that matches the specified label + /// '{label}={name}' + pub async fn exec_app_capture_output( + &self, + name: String, + label: String, + namespace: Option<&str>, + command: Vec<&str>, + ) -> Result { + let api: Api; + + if let Some(ns) = namespace { + api = Api::namespaced(self.client.clone(), ns); + } else { + api = Api::default_namespaced(self.client.clone()); + } + let pod_list = api + .list(&ListParams::default().labels(format!("{label}={name}").as_str())) + .await + .expect("couldn't get list of pods"); + + let res = api + .exec( + pod_list + .items + .first() + .expect("couldn't get pod") + .name() + .expect("couldn't get pod name") + .into_owned() + .as_str(), + command, + &AttachParams::default().stdout(true).stderr(true), + ) + .await; + match res { + Err(e) => Err(e.to_string()), + Ok(mut process) => { + let status = process + .take_status() + .expect("Couldn't get status") + .await + .expect("Couldn't unwrap status"); + + if let Some(s) = status.status { + let mut stdout_buf = String::new(); + if let Some(mut stdout) = process.stdout().take() { + stdout.read_to_string(&mut stdout_buf).await; + } + debug!("Status: {} - {:?}", s, status.details); + if s == "Success" { + Ok(stdout_buf) + } else { + Err(s) + } + } else { + Err("Couldn't get inner status of pod exec".to_string()) + } + } + } + } + /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}` pub async fn exec_app( &self, diff --git a/harmony/src/modules/mod.rs b/harmony/src/modules/mod.rs index e9b6c52..6df5c41 100644 --- a/harmony/src/modules/mod.rs +++ b/harmony/src/modules/mod.rs @@ -14,5 +14,6 @@ pub mod monitoring; pub mod okd; pub mod opnsense; pub mod prometheus; +pub mod storage; pub mod tenant; pub mod tftp; diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs new file mode 100644 index 0000000..a4b0cb0 --- /dev/null +++ b/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs @@ -0,0 +1,419 @@ +use std::{ + process::Command, + sync::Arc, + time::{Duration, Instant}, +}; + +use async_trait::async_trait; +use log::{info, warn}; +use serde::{Deserialize, Serialize}; +use tokio::time::sleep; + +use crate::{ + data::{Id, Version}, + interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, + inventory::Inventory, + score::Score, + topology::{K8sclient, Topology, k8s::K8sClient}, +}; + +#[derive(Debug, Clone, Serialize)] +pub struct CephRemoveOsd { + osd_deployment_name: String, + rook_ceph_namespace: String, +} + +impl Score for CephRemoveOsd { + fn name(&self) -> String { + format!("CephRemoveOsdScore") + } + + #[doc(hidden)] + fn create_interpret(&self) -> Box> { + Box::new(CephRemoveOsdInterpret { + score: self.clone(), + }) + } +} + +#[derive(Debug, Clone)] +pub struct CephRemoveOsdInterpret { + score: CephRemoveOsd, +} + +#[async_trait] +impl Interpret for CephRemoveOsdInterpret { + async fn execute( + &self, + _inventory: &Inventory, + topology: &T, + ) -> Result { + let client = topology.k8s_client().await.unwrap(); + self.verify_ceph_toolbox_exists(client.clone()).await?; + self.scale_deployment(client.clone()).await?; + self.verify_deployment_scaled(client.clone()).await?; + self.delete_deployment(client.clone()).await?; + self.verify_deployment_deleted(client.clone()).await?; + let osd_id_full = self.get_ceph_osd_id().unwrap(); + self.purge_ceph_osd(client.clone(), &osd_id_full).await?; + self.verify_ceph_osd_removal(client.clone(), &osd_id_full) + .await?; + + Ok(Outcome::success(format!( + "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}", + osd_id_full, self.score.osd_deployment_name + ))) + } + fn get_name(&self) -> InterpretName { + todo!() + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} + +impl CephRemoveOsdInterpret { + pub fn get_ceph_osd_id(&self) -> Result { + let osd_id_numeric = self + .score + .osd_deployment_name + .split('-') + .nth(3) + .ok_or_else(|| { + InterpretError::new(format!( + "Could not parse OSD id from deployment name {}", + self.score.osd_deployment_name + )) + })?; + let osd_id_full = format!("osd.{}", osd_id_numeric); + + info!( + "Targeting Ceph OSD: {} (parsed from deployment {})", + osd_id_full, self.score.osd_deployment_name + ); + + Ok(osd_id_full) + } + + pub async fn verify_ceph_toolbox_exists( + &self, + client: Arc, + ) -> Result { + let toolbox_dep = "rook-ceph-tools".to_string(); + + match client + .get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace)) + .await + { + Ok(Some(deployment)) => { + if let Some(status) = deployment.status { + let ready_count = status.ready_replicas.unwrap_or(0); + if ready_count >= 1 { + return Ok(Outcome::success(format!( + "'{}' is ready with {} replica(s).", + &toolbox_dep, ready_count + ))); + } else { + return Err(InterpretError::new( + "ceph-tool-box not ready in cluster".to_string(), + )); + } + } else { + Err(InterpretError::new(format!( + "failed to get deployment status {}", + &toolbox_dep + ))) + } + } + Ok(None) => Err(InterpretError::new(format!( + "Deployment '{}' not found in namespace '{}'.", + &toolbox_dep, self.score.rook_ceph_namespace + ))), + Err(e) => Err(InterpretError::new(format!( + "Failed to query for deployment '{}': {}", + &toolbox_dep, e + ))), + } + } + + pub async fn scale_deployment( + &self, + client: Arc, + ) -> Result { + info!( + "Scaling down OSD deployment: {}", + self.score.osd_deployment_name + ); + client + .scale_deployment( + &self.score.osd_deployment_name, + Some(&self.score.rook_ceph_namespace), + 0, + ) + .await?; + Ok(Outcome::success(format!( + "Scaled down deployment {}", + self.score.osd_deployment_name + ))) + } + + pub async fn verify_deployment_scaled( + &self, + client: Arc, + ) -> Result { + let (timeout, interval, start) = self.build_timer(); + + info!("Waiting for OSD deployment to scale down to 0 replicas"); + loop { + let dep = client + .get_deployment( + &self.score.osd_deployment_name, + Some(&self.score.rook_ceph_namespace), + ) + .await?; + + if let Some(deployment) = dep { + if let Some(status) = deployment.status { + if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0 + { + return Ok(Outcome::success( + "Deployment successfully scaled down.".to_string(), + )); + } + } + } + + if start.elapsed() > timeout { + return Err(InterpretError::new(format!( + "Timed out waiting for deployment {} to scale down", + self.score.osd_deployment_name + ))); + } + sleep(interval).await; + } + } + + fn build_timer(&self) -> (Duration, Duration, Instant) { + let timeout = Duration::from_secs(120); + let interval = Duration::from_secs(5); + let start = Instant::now(); + (timeout, interval, start) + } + pub async fn delete_deployment( + &self, + client: Arc, + ) -> Result { + info!( + "Deleting OSD deployment: {}", + self.score.osd_deployment_name + ); + client + .delete_deployment( + &self.score.osd_deployment_name, + Some(&self.score.rook_ceph_namespace), + ) + .await?; + Ok(Outcome::success(format!( + "deployment {} deleted", + self.score.osd_deployment_name + ))) + } + + pub async fn verify_deployment_deleted( + &self, + client: Arc, + ) -> Result { + let (timeout, interval, start) = self.build_timer(); + + info!("Waiting for OSD deployment to scale down to 0 replicas"); + loop { + let dep = client + .get_deployment( + &self.score.osd_deployment_name, + Some(&self.score.rook_ceph_namespace), + ) + .await?; + + if dep.is_none() { + info!( + "Deployment {} successfully deleted.", + self.score.osd_deployment_name + ); + return Ok(Outcome::success(format!( + "Deployment {} deleted.", + self.score.osd_deployment_name + ))); + } + + if start.elapsed() > timeout { + return Err(InterpretError::new(format!( + "Timed out waiting for deployment {} to be deleted", + self.score.osd_deployment_name + ))); + } + sleep(interval).await; + } + } + + fn get_osd_tree(&self, json: serde_json::Value) -> Result { + let nodes = json.get("nodes").ok_or_else(|| { + InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string()) + })?; + let tree: CephOsdTree = CephOsdTree { + nodes: serde_json::from_value(nodes.clone()).map_err(|e| { + InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e)) + })?, + }; + Ok(tree) + } + + pub async fn purge_ceph_osd( + &self, + client: Arc, + osd_id_full: &str, + ) -> Result { + info!( + "Purging OSD {} from Ceph cluster and removing its auth key", + osd_id_full + ); + client + .exec_app_capture_output( + "rook-ceph-tools".to_string(), + "app".to_string(), + Some(&self.score.rook_ceph_namespace), + vec![ + format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(), + format!("ceph auth del osd.{osd_id_full}").as_str(), + ], + ) + .await?; + Ok(Outcome::success(format!( + "osd id {} removed from osd tree", + osd_id_full + ))) + } + + pub async fn verify_ceph_osd_removal( + &self, + client: Arc, + osd_id_full: &str, + ) -> Result { + let (timeout, interval, start) = self.build_timer(); + info!( + "Verifying OSD {} has been removed from the Ceph tree...", + osd_id_full + ); + loop { + let output = client + .exec_app_capture_output( + "rook-ceph-tools".to_string(), + "app".to_string(), + Some(&self.score.rook_ceph_namespace), + vec!["ceph osd tree -f json"], + ) + .await?; + let tree = + self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json")); + + let osd_found = tree + .unwrap() + .nodes + .iter() + .any(|node| node.name == osd_id_full); + + if !osd_found { + return Ok(Outcome::success(format!( + "Successfully verified that OSD {} is removed from the Ceph cluster.", + osd_id_full, + ))); + } + + if start.elapsed() > timeout { + return Err(InterpretError::new(format!( + "Timed out waiting for OSD {} to be removed from Ceph tree", + osd_id_full + ))); + } + + warn!( + "OSD {} still found in Ceph tree, retrying in {:?}...", + osd_id_full, interval + ); + sleep(interval).await; + } + } +} +#[derive(Debug, Deserialize, PartialEq)] +pub struct CephOsdTree { + pub nodes: Vec, +} + +#[derive(Debug, Deserialize, PartialEq)] +pub struct CephNode { + pub id: i32, + pub name: String, + #[serde(rename = "type")] + pub node_type: String, + pub type_id: Option, + pub children: Option>, + pub exists: Option, + pub status: Option, +} + +#[cfg(test)] +mod tests { + use serde_json::json; + + use super::*; + + #[test] + fn test_get_osd_tree() { + let json_data = json!({ + "nodes": [ + {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"}, + {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344} + ] + }); + let interpret = CephRemoveOsdInterpret { + score: CephRemoveOsd { + osd_deployment_name: "osd-1".to_string(), + rook_ceph_namespace: "dummy_ns".to_string(), + }, + }; + let json = interpret.get_osd_tree(json_data).unwrap(); + + let expected = CephOsdTree { + nodes: vec![ + CephNode { + id: 1, + name: "osd.1".to_string(), + node_type: "osd".to_string(), + type_id: None, + children: None, + exists: None, + status: None, + }, + CephNode { + id: 2, + name: "osd.2".to_string(), + node_type: "osd".to_string(), + type_id: None, + children: None, + exists: None, + status: None, + }, + ], + }; + + assert_eq!(json, expected); + } +} diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs new file mode 100644 index 0000000..a993c3d --- /dev/null +++ b/harmony/src/modules/storage/ceph/mod.rs @@ -0,0 +1 @@ +pub mod ceph_osd_replacement_score; diff --git a/harmony/src/modules/storage/mod.rs b/harmony/src/modules/storage/mod.rs new file mode 100644 index 0000000..ee3e235 --- /dev/null +++ b/harmony/src/modules/storage/mod.rs @@ -0,0 +1 @@ +pub mod ceph; diff --git a/harmony_inventory_agent/Cargo.toml b/harmony_inventory_agent/Cargo.toml index a299ca0..3b1be2c 100644 --- a/harmony_inventory_agent/Cargo.toml +++ b/harmony_inventory_agent/Cargo.toml @@ -10,4 +10,3 @@ serde.workspace = true serde_json.workspace = true log.workspace = true env_logger.workspace = true -uuid.workspace = true diff --git a/harmony_inventory_agent/src/hwinfo.rs b/harmony_inventory_agent/src/hwinfo.rs index 0a628b0..21045cc 100644 --- a/harmony_inventory_agent/src/hwinfo.rs +++ b/harmony_inventory_agent/src/hwinfo.rs @@ -1,3 +1,4 @@ +use log::debug; use serde::{Deserialize, Serialize}; use serde_json::Value; use std::fs; @@ -83,27 +84,108 @@ pub struct ManagementInterface { } impl PhysicalHost { - pub fn gather() -> Self { + pub fn gather() -> Result { let mut sys = System::new_all(); sys.refresh_all(); - Self { - storage_drives: Self::gather_storage_drives(), - storage_controller: Self::gather_storage_controller(), - memory_modules: Self::gather_memory_modules(), - cpus: Self::gather_cpus(&sys), - chipset: Self::gather_chipset(), - network_interfaces: Self::gather_network_interfaces(), - management_interface: Self::gather_management_interface(), - host_uuid: Self::get_host_uuid(), + Self::all_tools_available()?; + + Ok(Self { + storage_drives: Self::gather_storage_drives()?, + storage_controller: Self::gather_storage_controller()?, + memory_modules: Self::gather_memory_modules()?, + cpus: Self::gather_cpus(&sys)?, + chipset: Self::gather_chipset()?, + network_interfaces: Self::gather_network_interfaces()?, + management_interface: Self::gather_management_interface()?, + host_uuid: Self::get_host_uuid()?, + }) + } + + fn all_tools_available() -> Result<(), String> { + let required_tools = [ + ("lsblk", "--version"), + ("lspci", "--version"), + ("lsmod", "--version"), + ("dmidecode", "--version"), + ("smartctl", "--version"), + ("ip", "route"), // No version flag available + ]; + + let mut missing_tools = Vec::new(); + + for (tool, tool_arg) in required_tools.iter() { + // First check if tool exists in PATH using which(1) + let exists = if let Ok(output) = Command::new("which").arg(tool).output() { + output.status.success() + } else { + // Fallback: manual PATH search if which(1) is unavailable + if let Ok(path_var) = std::env::var("PATH") { + path_var.split(':').any(|dir| { + let tool_path = std::path::Path::new(dir).join(tool); + tool_path.exists() && Self::is_executable(&tool_path) + }) + } else { + false + } + }; + + if !exists { + missing_tools.push(*tool); + continue; + } + + // Verify tool is functional by checking version/help output + let mut cmd = Command::new(tool); + cmd.arg(tool_arg); + cmd.stdout(std::process::Stdio::null()); + cmd.stderr(std::process::Stdio::null()); + + if let Ok(status) = cmd.status() { + if !status.success() { + missing_tools.push(*tool); + } + } else { + missing_tools.push(*tool); + } + } + + if !missing_tools.is_empty() { + let missing_str = missing_tools + .iter() + .map(|s| s.to_string()) + .collect::>() + .join(", "); + return Err(format!( + "The following required tools are not available: {}. Please install these tools to use PhysicalHost::gather()", + missing_str + )); + } + + Ok(()) + } + + #[cfg(unix)] + fn is_executable(path: &std::path::Path) -> bool { + use std::os::unix::fs::PermissionsExt; + + match std::fs::metadata(path) { + Ok(meta) => meta.permissions().mode() & 0o111 != 0, + Err(_) => false, } } - fn gather_storage_drives() -> Vec { + #[cfg(not(unix))] + fn is_executable(_path: &std::path::Path) -> bool { + // On non-Unix systems, we assume existence implies executability + true + } + + fn gather_storage_drives() -> Result, String> { let mut drives = Vec::new(); // Use lsblk with JSON output for robust parsing - if let Ok(output) = Command::new("lsblk") + let output = Command::new("lsblk") .args([ "-d", "-o", @@ -114,132 +196,165 @@ impl PhysicalHost { "--json", ]) .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(blockdevices) = json.get("blockdevices").and_then(|v| v.as_array()) - { - for device in blockdevices { - let name = device - .get("name") - .and_then(|v| v.as_str()) - .unwrap_or("") - .to_string(); - if name.is_empty() { - continue; - } + .map_err(|e| format!("Failed to execute lsblk: {}", e))?; - let model = device - .get("model") - .and_then(|v| v.as_str()) - .map(|s| s.trim().to_string()) - .unwrap_or_default(); - - let serial = device - .get("serial") - .and_then(|v| v.as_str()) - .map(|s| s.trim().to_string()) - .unwrap_or_default(); - - let size_str = device.get("size").and_then(|v| v.as_str()).unwrap_or("0"); - let size_bytes = Self::parse_size(size_str).unwrap_or(0); - - let rotational = device - .get("rota") - .and_then(|v| v.as_bool()) - .unwrap_or(false); - - let wwn = device - .get("wwn") - .and_then(|v| v.as_str()) - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty() && s != "null"); - - let device_path = Path::new("/sys/block").join(&name); - - let mut drive = StorageDrive { - name: name.clone(), - model, - serial, - size_bytes, - logical_block_size: Self::read_sysfs_u32( - &device_path.join("queue/logical_block_size"), - ) - .unwrap_or(512), - physical_block_size: Self::read_sysfs_u32( - &device_path.join("queue/physical_block_size"), - ) - .unwrap_or(512), - rotational, - wwn, - interface_type: Self::get_interface_type(&name, &device_path), - smart_status: Self::get_smart_status(&name), - }; - - // Enhance with additional sysfs info if available - if device_path.exists() { - if drive.model.is_empty() { - drive.model = Self::read_sysfs_string(&device_path.join("device/model")); - } - if drive.serial.is_empty() { - drive.serial = Self::read_sysfs_string(&device_path.join("device/serial")); - } - } - - drives.push(drive); - } + if !output.status.success() { + return Err(format!( + "lsblk command failed: {}", + String::from_utf8_lossy(&output.stderr) + )); } - drives + let json: Value = serde_json::from_slice(&output.stdout) + .map_err(|e| format!("Failed to parse lsblk JSON output: {}", e))?; + + let blockdevices = json + .get("blockdevices") + .and_then(|v| v.as_array()) + .ok_or("Invalid lsblk JSON: missing 'blockdevices' array")?; + + for device in blockdevices { + let name = device + .get("name") + .and_then(|v| v.as_str()) + .ok_or("Missing 'name' in lsblk device")? + .to_string(); + + if name.is_empty() { + continue; + } + + let model = device + .get("model") + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .unwrap_or_default(); + + let serial = device + .get("serial") + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .unwrap_or_default(); + + let size_str = device + .get("size") + .and_then(|v| v.as_str()) + .ok_or("Missing 'size' in lsblk device")?; + let size_bytes = Self::parse_size(size_str)?; + + let rotational = device + .get("rota") + .and_then(|v| v.as_bool()) + .ok_or("Missing 'rota' in lsblk device")?; + + let wwn = device + .get("wwn") + .and_then(|v| v.as_str()) + .map(|s| s.trim().to_string()) + .filter(|s| !s.is_empty() && s != "null"); + + let device_path = Path::new("/sys/block").join(&name); + + let logical_block_size = Self::read_sysfs_u32( + &device_path.join("queue/logical_block_size"), + ) + .map_err(|e| format!("Failed to read logical block size for {}: {}", name, e))?; + + let physical_block_size = Self::read_sysfs_u32( + &device_path.join("queue/physical_block_size"), + ) + .map_err(|e| format!("Failed to read physical block size for {}: {}", name, e))?; + + let interface_type = Self::get_interface_type(&name, &device_path)?; + let smart_status = Self::get_smart_status(&name)?; + + let mut drive = StorageDrive { + name: name.clone(), + model, + serial, + size_bytes, + logical_block_size, + physical_block_size, + rotational, + wwn, + interface_type, + smart_status, + }; + + // Enhance with additional sysfs info if available + if device_path.exists() { + if drive.model.is_empty() { + drive.model = Self::read_sysfs_string(&device_path.join("device/model")) + .map_err(|e| format!("Failed to read model for {}: {}", name, e))?; + } + if drive.serial.is_empty() { + drive.serial = Self::read_sysfs_string(&device_path.join("device/serial")) + .map_err(|e| format!("Failed to read serial for {}: {}", name, e))?; + } + } + + drives.push(drive); + } + + Ok(drives) } - fn gather_storage_controller() -> StorageController { + fn gather_storage_controller() -> Result { let mut controller = StorageController { name: "Unknown".to_string(), driver: "Unknown".to_string(), }; // Use lspci with JSON output if available - if let Ok(output) = Command::new("lspci") + let output = Command::new("lspci") .args(["-nn", "-d", "::0100", "-J"]) // Storage controllers class with JSON .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(devices) = json.as_array() - { - for device in devices { - if let Some(device_info) = device.as_object() - && let Some(name) = device_info - .get("device") - .and_then(|v| v.as_object()) - .and_then(|v| v.get("name")) - .and_then(|v| v.as_str()) - { - controller.name = name.to_string(); - break; + .map_err(|e| format!("Failed to execute lspci: {}", e))?; + + if output.status.success() { + let json: Value = serde_json::from_slice(&output.stdout) + .map_err(|e| format!("Failed to parse lspci JSON output: {}", e))?; + + if let Some(devices) = json.as_array() { + for device in devices { + if let Some(device_info) = device.as_object() + && let Some(name) = device_info + .get("device") + .and_then(|v| v.as_object()) + .and_then(|v| v.get("name")) + .and_then(|v| v.as_str()) + { + controller.name = name.to_string(); + break; + } } } } - // Fallback to text output if JSON fails - if controller.name == "Unknown" - && let Ok(output) = Command::new("lspci") + // Fallback to text output if JSON fails or no device found + if controller.name == "Unknown" { + let output = Command::new("lspci") .args(["-nn", "-d", "::0100"]) // Storage controllers class .output() - && output.status.success() - { - let output_str = String::from_utf8_lossy(&output.stdout); - if let Some(line) = output_str.lines().next() { - let parts: Vec<&str> = line.split(':').collect(); - if parts.len() > 2 { - controller.name = parts[2].trim().to_string(); + .map_err(|e| format!("Failed to execute lspci (fallback): {}", e))?; + + if output.status.success() { + let output_str = String::from_utf8_lossy(&output.stdout); + if let Some(line) = output_str.lines().next() { + let parts: Vec<&str> = line.split(':').collect(); + if parts.len() > 2 { + controller.name = parts[2].trim().to_string(); + } } } } // Try to get driver info from lsmod - if let Ok(output) = Command::new("lsmod").output() - && output.status.success() - { + let output = Command::new("lsmod") + .output() + .map_err(|e| format!("Failed to execute lsmod: {}", e))?; + + if output.status.success() { let output_str = String::from_utf8_lossy(&output.stdout); for line in output_str.lines() { if line.contains("ahci") @@ -256,67 +371,78 @@ impl PhysicalHost { } } - controller + Ok(controller) } - fn gather_memory_modules() -> Vec { + fn gather_memory_modules() -> Result, String> { let mut modules = Vec::new(); - if let Ok(output) = Command::new("dmidecode").arg("--type").arg("17").output() - && output.status.success() - { - let output_str = String::from_utf8_lossy(&output.stdout); - let sections: Vec<&str> = output_str.split("Memory Device").collect(); + let output = Command::new("dmidecode") + .arg("--type") + .arg("17") + .output() + .map_err(|e| format!("Failed to execute dmidecode: {}", e))?; - for section in sections.into_iter().skip(1) { - let mut module = MemoryModule { - size_bytes: 0, - speed_mhz: None, - manufacturer: None, - part_number: None, - serial_number: None, - rank: None, - }; + if !output.status.success() { + return Err(format!( + "dmidecode command failed: {}", + String::from_utf8_lossy(&output.stderr) + )); + } - for line in section.lines() { - let line = line.trim(); - if let Some(size_str) = line.strip_prefix("Size: ") { - if size_str != "No Module Installed" - && let Some((num, unit)) = size_str.split_once(' ') - && let Ok(num) = num.parse::() - { - module.size_bytes = match unit { - "MB" => num * 1024 * 1024, - "GB" => num * 1024 * 1024 * 1024, - "KB" => num * 1024, - _ => 0, - }; - } - } else if let Some(speed_str) = line.strip_prefix("Speed: ") { - if let Some((num, _unit)) = speed_str.split_once(' ') { - module.speed_mhz = num.parse().ok(); - } - } else if let Some(man) = line.strip_prefix("Manufacturer: ") { - module.manufacturer = Some(man.to_string()); - } else if let Some(part) = line.strip_prefix("Part Number: ") { - module.part_number = Some(part.to_string()); - } else if let Some(serial) = line.strip_prefix("Serial Number: ") { - module.serial_number = Some(serial.to_string()); - } else if let Some(rank) = line.strip_prefix("Rank: ") { - module.rank = rank.parse().ok(); + let output_str = String::from_utf8(output.stdout) + .map_err(|e| format!("Failed to parse dmidecode output: {}", e))?; + + let sections: Vec<&str> = output_str.split("Memory Device").collect(); + + for section in sections.into_iter().skip(1) { + let mut module = MemoryModule { + size_bytes: 0, + speed_mhz: None, + manufacturer: None, + part_number: None, + serial_number: None, + rank: None, + }; + + for line in section.lines() { + let line = line.trim(); + if let Some(size_str) = line.strip_prefix("Size: ") { + if size_str != "No Module Installed" + && let Some((num, unit)) = size_str.split_once(' ') + && let Ok(num) = num.parse::() + { + module.size_bytes = match unit { + "MB" => num * 1024 * 1024, + "GB" => num * 1024 * 1024 * 1024, + "KB" => num * 1024, + _ => 0, + }; } + } else if let Some(speed_str) = line.strip_prefix("Speed: ") { + if let Some((num, _unit)) = speed_str.split_once(' ') { + module.speed_mhz = num.parse().ok(); + } + } else if let Some(man) = line.strip_prefix("Manufacturer: ") { + module.manufacturer = Some(man.to_string()); + } else if let Some(part) = line.strip_prefix("Part Number: ") { + module.part_number = Some(part.to_string()); + } else if let Some(serial) = line.strip_prefix("Serial Number: ") { + module.serial_number = Some(serial.to_string()); + } else if let Some(rank) = line.strip_prefix("Rank: ") { + module.rank = rank.parse().ok(); } + } - if module.size_bytes > 0 { - modules.push(module); - } + if module.size_bytes > 0 { + modules.push(module); } } - modules + Ok(modules) } - fn gather_cpus(sys: &System) -> Vec { + fn gather_cpus(sys: &System) -> Result, String> { let mut cpus = Vec::new(); let global_cpu = sys.global_cpu_info(); @@ -328,201 +454,310 @@ impl PhysicalHost { frequency_mhz: global_cpu.frequency(), }); - cpus + Ok(cpus) } - fn gather_chipset() -> Chipset { - Chipset { - name: Self::read_dmi("board-product-name").unwrap_or_else(|| "Unknown".to_string()), - vendor: Self::read_dmi("board-manufacturer").unwrap_or_else(|| "Unknown".to_string()), - } + fn gather_chipset() -> Result { + Ok(Chipset { + name: Self::read_dmi("baseboard-product-name")?, + vendor: Self::read_dmi("baseboard-manufacturer")?, + }) } - fn gather_network_interfaces() -> Vec { + fn gather_network_interfaces() -> Result, String> { let mut interfaces = Vec::new(); let sys_net_path = Path::new("/sys/class/net"); - if let Ok(entries) = fs::read_dir(sys_net_path) { - for entry in entries.flatten() { - let iface_name = entry.file_name().into_string().unwrap_or_default(); - let iface_path = entry.path(); + let entries = fs::read_dir(sys_net_path) + .map_err(|e| format!("Failed to read /sys/class/net: {}", e))?; - // Skip virtual interfaces - if iface_name.starts_with("lo") - || iface_name.starts_with("docker") - || iface_name.starts_with("virbr") - || iface_name.starts_with("veth") - || iface_name.starts_with("br-") - || iface_name.starts_with("tun") - || iface_name.starts_with("wg") - { - continue; - } + for entry in entries { + let entry = entry.map_err(|e| format!("Failed to read directory entry: {}", e))?; + let iface_name = entry + .file_name() + .into_string() + .map_err(|_| "Invalid UTF-8 in interface name")?; + let iface_path = entry.path(); - // Check if it's a physical interface by looking for device directory - if !iface_path.join("device").exists() { - continue; - } - - let mac_address = Self::read_sysfs_string(&iface_path.join("address")); - let speed_mbps = Self::read_sysfs_u32(&iface_path.join("speed")); - let operstate = Self::read_sysfs_string(&iface_path.join("operstate")); - let mtu = Self::read_sysfs_u32(&iface_path.join("mtu")).unwrap_or(1500); - let driver = Self::read_sysfs_string(&iface_path.join("device/driver/module")); - let firmware_version = - Self::read_sysfs_opt_string(&iface_path.join("device/firmware_version")); - - // Get IP addresses using ip command with JSON output - let (ipv4_addresses, ipv6_addresses) = Self::get_interface_ips_json(&iface_name); - - interfaces.push(NetworkInterface { - name: iface_name, - mac_address, - speed_mbps, - is_up: operstate == "up", - mtu, - ipv4_addresses, - ipv6_addresses, - driver, - firmware_version, - }); + // Skip virtual interfaces + if iface_name.starts_with("lo") + || iface_name.starts_with("docker") + || iface_name.starts_with("virbr") + || iface_name.starts_with("veth") + || iface_name.starts_with("br-") + || iface_name.starts_with("tun") + || iface_name.starts_with("wg") + { + continue; } + + // Check if it's a physical interface by looking for device directory + if !iface_path.join("device").exists() { + continue; + } + + let mac_address = Self::read_sysfs_string(&iface_path.join("address")) + .map_err(|e| format!("Failed to read MAC address for {}: {}", iface_name, e))?; + + let speed_mbps = if iface_path.join("speed").exists() { + match Self::read_sysfs_u32(&iface_path.join("speed")) { + Ok(speed) => Some(speed), + Err(e) => { + debug!( + "Failed to read speed for {}: {} . This is expected to fail on wifi interfaces.", + iface_name, e + ); + None + } + } + } else { + None + }; + + let operstate = Self::read_sysfs_string(&iface_path.join("operstate")) + .map_err(|e| format!("Failed to read operstate for {}: {}", iface_name, e))?; + + let mtu = Self::read_sysfs_u32(&iface_path.join("mtu")) + .map_err(|e| format!("Failed to read MTU for {}: {}", iface_name, e))?; + + let driver = + Self::read_sysfs_symlink_basename(&iface_path.join("device/driver/module")) + .map_err(|e| format!("Failed to read driver for {}: {}", iface_name, e))?; + + let firmware_version = Self::read_sysfs_opt_string( + &iface_path.join("device/firmware_version"), + ) + .map_err(|e| format!("Failed to read firmware version for {}: {}", iface_name, e))?; + + // Get IP addresses using ip command with JSON output + let (ipv4_addresses, ipv6_addresses) = Self::get_interface_ips_json(&iface_name) + .map_err(|e| format!("Failed to get IP addresses for {}: {}", iface_name, e))?; + + interfaces.push(NetworkInterface { + name: iface_name, + mac_address, + speed_mbps, + is_up: operstate == "up", + mtu, + ipv4_addresses, + ipv6_addresses, + driver, + firmware_version, + }); } - interfaces + Ok(interfaces) } - fn gather_management_interface() -> Option { - // Try to detect common management interfaces + fn gather_management_interface() -> Result, String> { if Path::new("/dev/ipmi0").exists() { - Some(ManagementInterface { + Ok(Some(ManagementInterface { kind: "IPMI".to_string(), address: None, - firmware: Self::read_dmi("bios-version"), - }) + firmware: Some(Self::read_dmi("bios-version")?), + })) } else if Path::new("/sys/class/misc/mei").exists() { - Some(ManagementInterface { + Ok(Some(ManagementInterface { kind: "Intel ME".to_string(), address: None, firmware: None, - }) + })) } else { - None + Ok(None) } } - fn get_host_uuid() -> String { - Self::read_dmi("system-uuid").unwrap() + fn get_host_uuid() -> Result { + Self::read_dmi("system-uuid") } // Helper methods - fn read_sysfs_string(path: &Path) -> String { + fn read_sysfs_string(path: &Path) -> Result { fs::read_to_string(path) - .unwrap_or_default() - .trim() - .to_string() - } - - fn read_sysfs_opt_string(path: &Path) -> Option { - fs::read_to_string(path) - .ok() .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) + .map_err(|e| format!("Failed to read {}: {}", path.display(), e)) } - fn read_sysfs_u32(path: &Path) -> Option { + fn read_sysfs_opt_string(path: &Path) -> Result, String> { + match fs::read_to_string(path) { + Ok(s) => { + let s = s.trim().to_string(); + Ok(if s.is_empty() { None } else { Some(s) }) + } + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None), + Err(e) => Err(format!("Failed to read {}: {}", path.display(), e)), + } + } + + fn read_sysfs_u32(path: &Path) -> Result { fs::read_to_string(path) - .ok() - .and_then(|s| s.trim().parse().ok()) + .map_err(|e| format!("Failed to read {}: {}", path.display(), e))? + .trim() + .parse() + .map_err(|e| format!("Failed to parse {}: {}", path.display(), e)) } - fn read_dmi(field: &str) -> Option { - Command::new("dmidecode") + fn read_sysfs_symlink_basename(path: &Path) -> Result { + match fs::read_link(path) { + Ok(target_path) => match target_path.file_name() { + Some(name_osstr) => match name_osstr.to_str() { + Some(name_str) => Ok(name_str.to_string()), + None => Err(format!( + "Symlink target basename is not valid UTF-8: {}", + target_path.display() + )), + }, + None => Err(format!( + "Symlink target has no basename: {} -> {}", + path.display(), + target_path.display() + )), + }, + Err(e) if e.kind() == std::io::ErrorKind::NotFound => Err(format!( + "Could not resolve symlink for path : {}", + path.display() + )), + Err(e) => Err(format!("Failed to read symlink {}: {}", path.display(), e)), + } + } + + fn read_dmi(field: &str) -> Result { + let output = Command::new("dmidecode") .arg("-s") .arg(field) .output() - .ok() - .filter(|output| output.status.success()) - .and_then(|output| String::from_utf8(output.stdout).ok()) - .map(|s| s.trim().to_string()) - .filter(|s| !s.is_empty()) - } + .map_err(|e| format!("Failed to execute dmidecode for field {}: {}", field, e))?; - fn get_interface_type(device_name: &str, device_path: &Path) -> String { - if device_name.starts_with("nvme") { - "NVMe".to_string() - } else if device_name.starts_with("sd") { - "SATA".to_string() - } else if device_name.starts_with("hd") { - "IDE".to_string() - } else if device_name.starts_with("vd") { - "VirtIO".to_string() - } else { - // Try to determine from device path - Self::read_sysfs_string(&device_path.join("device/subsystem")) - .split('/') - .next_back() - .unwrap_or("Unknown") - .to_string() + if !output.status.success() { + return Err(format!( + "dmidecode command failed for field {}: {}", + field, + String::from_utf8_lossy(&output.stderr) + )); } - } - fn get_smart_status(device_name: &str) -> Option { - Command::new("smartctl") - .arg("-H") - .arg(format!("/dev/{}", device_name)) - .output() - .ok() - .filter(|output| output.status.success()) - .and_then(|output| String::from_utf8(output.stdout).ok()) - .and_then(|s| { - s.lines() - .find(|line| line.contains("SMART overall-health self-assessment")) - .and_then(|line| line.split(':').nth(1)) - .map(|s| s.trim().to_string()) + String::from_utf8(output.stdout) + .map(|s| s.trim().to_string()) + .map_err(|e| { + format!( + "Failed to parse dmidecode output for field {}: {}", + field, e + ) }) } - fn parse_size(size_str: &str) -> Option { - if size_str.ends_with('T') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|t| t * 1024 * 1024 * 1024 * 1024) - } else if size_str.ends_with('G') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|g| g * 1024 * 1024 * 1024) - } else if size_str.ends_with('M') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|m| m * 1024 * 1024) - } else if size_str.ends_with('K') { - size_str[..size_str.len() - 1] - .parse::() - .ok() - .map(|k| k * 1024) - } else if size_str.ends_with('B') { - size_str[..size_str.len() - 1].parse::().ok() + fn get_interface_type(device_name: &str, device_path: &Path) -> Result { + if device_name.starts_with("nvme") { + Ok("NVMe".to_string()) + } else if device_name.starts_with("sd") { + Ok("SATA".to_string()) + } else if device_name.starts_with("hd") { + Ok("IDE".to_string()) + } else if device_name.starts_with("vd") { + Ok("VirtIO".to_string()) } else { - size_str.parse::().ok() + // Try to determine from device path + let subsystem = Self::read_sysfs_string(&device_path.join("device/subsystem"))?; + Ok(subsystem + .split('/') + .next_back() + .unwrap_or("Unknown") + .to_string()) } } - fn get_interface_ips_json(iface_name: &str) -> (Vec, Vec) { + fn get_smart_status(device_name: &str) -> Result, String> { + let output = Command::new("smartctl") + .arg("-H") + .arg(format!("/dev/{}", device_name)) + .output() + .map_err(|e| format!("Failed to execute smartctl for {}: {}", device_name, e))?; + + if !output.status.success() { + return Ok(None); + } + + let stdout = String::from_utf8(output.stdout) + .map_err(|e| format!("Failed to parse smartctl output for {}: {}", device_name, e))?; + + for line in stdout.lines() { + if line.contains("SMART overall-health self-assessment") { + if let Some(status) = line.split(':').nth(1) { + return Ok(Some(status.trim().to_string())); + } + } + } + + Ok(None) + } + + fn parse_size(size_str: &str) -> Result { + debug!("Parsing size_str '{size_str}'"); + let size; + if size_str.ends_with('T') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|t| t * 1024.0 * 1024.0 * 1024.0 * 1024.0) + .map_err(|e| format!("Failed to parse T size '{}': {}", size_str, e)) + } else if size_str.ends_with('G') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|g| g * 1024.0 * 1024.0 * 1024.0) + .map_err(|e| format!("Failed to parse G size '{}': {}", size_str, e)) + } else if size_str.ends_with('M') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|m| m * 1024.0 * 1024.0) + .map_err(|e| format!("Failed to parse M size '{}': {}", size_str, e)) + } else if size_str.ends_with('K') { + size = size_str[..size_str.len() - 1] + .parse::() + .map(|k| k * 1024.0) + .map_err(|e| format!("Failed to parse K size '{}': {}", size_str, e)) + } else if size_str.ends_with('B') { + size = size_str[..size_str.len() - 1] + .parse::() + .map_err(|e| format!("Failed to parse B size '{}': {}", size_str, e)) + } else { + size = size_str + .parse::() + .map_err(|e| format!("Failed to parse size '{}': {}", size_str, e)) + } + + size.map(|s| s as u64) + } + + fn get_interface_ips_json(iface_name: &str) -> Result<(Vec, Vec), String> { let mut ipv4 = Vec::new(); let mut ipv6 = Vec::new(); // Get IPv4 addresses using JSON output - if let Ok(output) = Command::new("ip") + let output = Command::new("ip") .args(["-j", "-4", "addr", "show", iface_name]) .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(addrs) = json.as_array() - { + .map_err(|e| { + format!( + "Failed to execute ip command for IPv4 on {}: {}", + iface_name, e + ) + })?; + + if !output.status.success() { + return Err(format!( + "ip command for IPv4 on {} failed: {}", + iface_name, + String::from_utf8_lossy(&output.stderr) + )); + } + + let json: Value = serde_json::from_slice(&output.stdout).map_err(|e| { + format!( + "Failed to parse ip JSON output for IPv4 on {}: {}", + iface_name, e + ) + })?; + + if let Some(addrs) = json.as_array() { for addr_info in addrs { if let Some(addr_info_obj) = addr_info.as_object() && let Some(addr_info) = @@ -540,13 +775,32 @@ impl PhysicalHost { } // Get IPv6 addresses using JSON output - if let Ok(output) = Command::new("ip") + let output = Command::new("ip") .args(["-j", "-6", "addr", "show", iface_name]) .output() - && output.status.success() - && let Ok(json) = serde_json::from_slice::(&output.stdout) - && let Some(addrs) = json.as_array() - { + .map_err(|e| { + format!( + "Failed to execute ip command for IPv6 on {}: {}", + iface_name, e + ) + })?; + + if !output.status.success() { + return Err(format!( + "ip command for IPv6 on {} failed: {}", + iface_name, + String::from_utf8_lossy(&output.stderr) + )); + } + + let json: Value = serde_json::from_slice(&output.stdout).map_err(|e| { + format!( + "Failed to parse ip JSON output for IPv6 on {}: {}", + iface_name, e + ) + })?; + + if let Some(addrs) = json.as_array() { for addr_info in addrs { if let Some(addr_info_obj) = addr_info.as_object() && let Some(addr_info) = @@ -566,6 +820,6 @@ impl PhysicalHost { } } - (ipv4, ipv6) + Ok((ipv4, ipv6)) } } diff --git a/harmony_inventory_agent/src/main.rs b/harmony_inventory_agent/src/main.rs index 9421056..8784b00 100644 --- a/harmony_inventory_agent/src/main.rs +++ b/harmony_inventory_agent/src/main.rs @@ -9,8 +9,16 @@ mod hwinfo; async fn inventory() -> impl Responder { log::info!("Received inventory request"); let host = PhysicalHost::gather(); - log::info!("Inventory data gathered successfully"); - actix_web::HttpResponse::Ok().json(host) + match host { + Ok(host) => { + log::info!("Inventory data gathered successfully"); + actix_web::HttpResponse::Ok().json(host) + } + Err(error) => { + log::error!("Inventory data gathering FAILED"); + actix_web::HttpResponse::InternalServerError().json(error) + } + } } #[actix_web::main]