feat/discover_inventory #127

Merged
letian merged 8 commits from feat/discover_inventory into refact/harmony_types 2025-08-31 22:45:09 +00:00
18 changed files with 450 additions and 206 deletions
Showing only changes of commit 637ffde992 - Show all commits

View File

@@ -3,7 +3,7 @@ use std::sync::Arc;
use derive_new::new;
use harmony_inventory_agent::hwinfo::{CPU, MemoryModule, NetworkInterface, StorageDrive};
use harmony_types::net::MacAddress;
use serde::{Deserialize, Serialize, Serializer, ser::SerializeStruct};
use serde::{Deserialize, Serialize};
use serde_value::Value;
pub type HostGroup = Vec<PhysicalHost>;
@@ -35,7 +35,120 @@ impl PhysicalHost {
}
pub fn summary(&self) -> String {
Review

A garder un oeil si on aurait besoin de presenter ce summary de manieres differentes (e.g. fichier json). A ce moment-la, il pourrait etre interessant d'y aller avec le pattern de "presenter":

fn summary(&self, presenter: PhysicalHostPresenter) {
  presenter.present_model(self.labels, self.category);
  presenter.present_cpus(self.cpus);
  presenter.present_memory(self.memory_modules);
  presenter.present_storage(self.storage);
  // ...
}
A garder un oeil si on aurait besoin de presenter ce summary de manieres differentes (e.g. fichier json). A ce moment-la, il pourrait etre interessant d'y aller avec le pattern de "presenter": ```rs fn summary(&self, presenter: PhysicalHostPresenter) { presenter.present_model(self.labels, self.category); presenter.present_cpus(self.cpus); presenter.present_memory(self.memory_modules); presenter.present_storage(self.storage); // ... } ```
todo!();
let mut parts = Vec::new();
// Part 1: System Model (from labels) or Category as a fallback
let model = self
.labels
.iter()
.find(|l| l.name == "system-product-name" || l.name == "model")
.map(|l| l.value.clone())
.unwrap_or_else(|| self.category.to_string());
parts.push(model);
// Part 2: CPU Information
if !self.cpus.is_empty() {
let cpu_count = self.cpus.len();
let total_cores = self.cpus.iter().map(|c| c.cores).sum::<u32>();
let total_threads = self.cpus.iter().map(|c| c.threads).sum::<u32>();
let model_name = &self.cpus[0].model;
let cpu_summary = if cpu_count > 1 {
format!(
"{}x {} ({}c/{}t)",
cpu_count, model_name, total_cores, total_threads
)
} else {
format!("{} ({}c/{}t)", model_name, total_cores, total_threads)
};
parts.push(cpu_summary);
}
// Part 3: Memory Information
if !self.memory_modules.is_empty() {
let total_mem_bytes = self
.memory_modules
.iter()
.map(|m| m.size_bytes)
.sum::<u64>();
let total_mem_gb = (total_mem_bytes as f64 / (1024.0 * 1024.0 * 1024.0)).round() as u64;
// Find the most common speed among modules
let mut speeds = std::collections::HashMap::new();
for module in &self.memory_modules {
if let Some(speed) = module.speed_mhz {
*speeds.entry(speed).or_insert(0) += 1;
}
}
let common_speed = speeds
.into_iter()
.max_by_key(|&(_, count)| count)
.map(|(speed, _)| speed);
if let Some(speed) = common_speed {
parts.push(format!("{} GB RAM @ {}MHz", total_mem_gb, speed));
} else {
parts.push(format!("{} GB RAM", total_mem_gb));
}
}
// Part 4: Storage Information
if !self.storage.is_empty() {
let total_storage_bytes = self.storage.iter().map(|d| d.size_bytes).sum::<u64>();
let drive_count = self.storage.len();
let first_drive_model = &self.storage[0].model;
// Helper to format bytes into TB or GB
let format_storage = |bytes: u64| {
let tb = bytes as f64 / (1024.0 * 1024.0 * 1024.0 * 1024.0);
if tb >= 1.0 {
format!("{:.2} TB", tb)
} else {
let gb = bytes as f64 / (1024.0 * 1024.0 * 1024.0);
format!("{:.0} GB", gb)
}
};
let storage_summary = if drive_count > 1 {
format!(
"{} Storage ({}x {})",
format_storage(total_storage_bytes),
drive_count,
first_drive_model
)
} else {
format!(
"{} Storage ({})",
format_storage(total_storage_bytes),
first_drive_model
)
};
parts.push(storage_summary);
}
// Part 5: Network Information
// Prioritize an "up" interface with an IPv4 address
let best_nic = self
.network
.iter()
.find(|n| n.is_up && !n.ipv4_addresses.is_empty())
.or_else(|| self.network.first());
if let Some(nic) = best_nic {
let speed = nic
.speed_mbps
.map(|s| format!("{}Gbps", s / 1000))
.unwrap_or_else(|| "N/A".to_string());
let mac = nic.mac_address.to_string();
let nic_summary = if let Some(ip) = nic.ipv4_addresses.first() {
format!("NIC: {} ({}, {})", speed, ip, mac)
} else {
format!("NIC: {} ({})", speed, mac)
};
parts.push(nic_summary);
}
parts.join(" | ")
}
pub fn cluster_mac(&self) -> MacAddress {

View File

@@ -71,7 +71,6 @@ impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
info!("Getting host inventory on service at {address} port {port}");
tokio::task::spawn(async move {
todo!("are we here");
info!("Getting inventory for host {address} {port}");
letian marked this conversation as resolved
Review

This log and the one above are very similar

This log and the one above are very similar
let host =
harmony_inventory_agent::client::get_host_inventory(&address, port)
@@ -126,20 +125,9 @@ impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
},
)
.await;

There's an edge case here that might break this score:

discover_agents is awaited, but the spawned task inside the callback are not awaited at all. So in some edge cases, discover_agents might finish executing before the task itself.

And because we don't await for the task inside the callback, error handling is a bit awkward. Currently it says "all good" even though there could be an error:

[INFO ] Getting inventory for host 192.168.5.232 at port 8080

thread 'tokio-runtime-worker' panicked at /home/ian/Projects/nation-tech/harmony/harmony/src/modules/inventory/mod.rs:105:34:
called `Result::unwrap()` on an `Err` value: "Could not build repository : Could not connect to the database: error returned from database: (code: 14) unable to open database file"
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
[INFO ] ✅ Discovery process completed successfully
There's an edge case here that might break this score: `discover_agents` is awaited, but the _spawned_ task inside the callback are not awaited at all. So in some edge cases, `discover_agents` might finish executing before the task itself. And because we don't _await_ for the task inside the callback, error handling is a bit awkward. Currently it says "all good" even though there could be an error: ``` [INFO ] Getting inventory for host 192.168.5.232 at port 8080 thread 'tokio-runtime-worker' panicked at /home/ian/Projects/nation-tech/harmony/harmony/src/modules/inventory/mod.rs:105:34: called `Result::unwrap()` on an `Err` value: "Could not build repository : Could not connect to the database: error returned from database: (code: 14) unable to open database file" note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace [INFO ] ✅ Discovery process completed successfully ```
info!("Launched inventory host information gathering");
info!(
"tokio current {:?}",
tokio::runtime::Handle::try_current().unwrap()
);
tokio::spawn(async {
info!("Spawned a sleeper");
tokio::time::sleep(Duration::from_millis(100)).await;
info!("done a sleeper");
});
tokio::time::sleep(Duration::from_millis(1000)).await;
Ok(Outcome {
status: InterpretStatus::RUNNING,
message: "Launched discovery process".to_string(),
status: InterpretStatus::SUCCESS,
message: "Discovery process completed successfully".to_string(),
})
}

View File

@@ -16,7 +16,7 @@ where
// The receiver will be a stream of events.
let receiver = mdns.browse(SERVICE_NAME).expect("Failed to browse");
tokio::spawn(async move {
tokio::task::spawn_blocking(move || {
while let Ok(event) = receiver.recv() {
if let Err(e) = on_event(event.clone()) {
error!("Event callback failed : {e}");

View File

@@ -0,0 +1,8 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS physical_hosts (
version_id TEXT PRIMARY KEY NOT NULL,
id TEXT NOT NULL,
data JSON NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_host_id_time
ON physical_hosts (id, version_id DESC);