feat/discover_inventory #127

Merged
letian merged 8 commits from feat/discover_inventory into refact/harmony_types 2025-08-31 22:45:09 +00:00
19 changed files with 442 additions and 304 deletions

3
Cargo.lock generated
View File

@@ -2366,9 +2366,12 @@ version = "0.1.0"
dependencies = [
"actix-web",
"env_logger",
"harmony_macros",
"harmony_types",
"local-ip-address",
"log",
"mdns-sd 0.14.1 (git+https://github.com/jggc/mdns-sd.git?branch=patch-1)",
"reqwest 0.12.20",
"serde",
"serde_json",
"sysinfo",

View File

@@ -67,4 +67,4 @@ serde = { version = "1.0.209", features = ["derive", "rc"] }
serde_json = "1.0.127"
askama = "0.14"
sqlx = { version = "0.8", features = ["runtime-tokio", "sqlite" ] }
reqwest = { version = "0.12", features = ["stream", "rustls-tls", "http2"], default-features = false }
reqwest = { version = "0.12", features = ["blocking", "stream", "rustls-tls", "http2", "json"], default-features = false }

Binary file not shown.

View File

@@ -87,8 +87,7 @@ async fn main() {
let inventory = Inventory {
location: Location::new("I am mobile".to_string(), "earth".to_string()),
switch: SwitchGroup::from([]),
firewall: FirewallGroup::from([PhysicalHost::empty(HostCategory::Firewall)
.management(Arc::new(OPNSenseManagementInterface::new()))]),
firewall_mgmt: Box::new(OPNSenseManagementInterface::new()),
storage_host: vec![],
worker_host: vec![
PhysicalHost::empty(HostCategory::Server)

View File

@@ -69,8 +69,7 @@ pub fn get_inventory() -> Inventory {
"testopnsense".to_string(),
),
switch: SwitchGroup::from([]),
firewall: FirewallGroup::from([PhysicalHost::empty(HostCategory::Firewall)
.management(Arc::new(OPNSenseManagementInterface::new()))]),
firewall_mgmt: Box::new(OPNSenseManagementInterface::new()),
storage_host: vec![],
worker_host: vec![],
control_plane_host: vec![],

View File

@@ -63,8 +63,7 @@ async fn main() {
"wk".to_string(),
),
switch: SwitchGroup::from([]),
firewall: FirewallGroup::from([PhysicalHost::empty(HostCategory::Firewall)
.management(Arc::new(OPNSenseManagementInterface::new()))]),
firewall_mgmt: Box::new(OPNSenseManagementInterface::new()),
storage_host: vec![],
worker_host: vec![],
control_plane_host: vec![

View File

@@ -12,4 +12,12 @@ lazy_static! {
std::env::var("HARMONY_REGISTRY_PROJECT").unwrap_or_else(|_| "harmony".to_string());
pub static ref DRY_RUN: bool =
std::env::var("HARMONY_DRY_RUN").is_ok_and(|value| value.parse().unwrap_or(false));
pub static ref DEFAULT_DATABASE_URL: String = "sqlite://harmony.sqlite".to_string();
pub static ref DATABASE_URL: String = std::env::var("HARMONY_DATABASE_URL")
.map(|value| if value.is_empty() {
(*DEFAULT_DATABASE_URL).clone()
} else {
value
})
.unwrap_or((*DEFAULT_DATABASE_URL).clone());
}

View File

@@ -1,24 +1,24 @@
use std::{str::FromStr, sync::Arc};
use std::sync::Arc;
use derive_new::new;
use harmony_inventory_agent::hwinfo::{CPU, MemoryModule, NetworkInterface, StorageDrive};
use harmony_types::net::MacAddress;
use serde::{Deserialize, Serialize, Serializer, ser::SerializeStruct};
use serde::{Deserialize, Serialize};
use serde_value::Value;
pub type HostGroup = Vec<PhysicalHost>;
pub type SwitchGroup = Vec<Switch>;
pub type FirewallGroup = Vec<PhysicalHost>;
#[derive(Debug, Clone)]
#[derive(Debug, Clone, Serialize)]
pub struct PhysicalHost {
pub id: Id,
pub category: HostCategory,
pub network: Vec<NetworkInterface>,
pub management: Arc<dyn ManagementInterface>,
pub storage: Vec<Storage>,
pub storage: Vec<StorageDrive>,
pub labels: Vec<Label>,
pub memory_size: Option<u64>,
pub cpu_count: Option<u64>,
pub memory_modules: Vec<MemoryModule>,
pub cpus: Vec<CPU>,
}
impl PhysicalHost {
@@ -29,12 +29,128 @@ impl PhysicalHost {
network: vec![],
storage: vec![],
labels: vec![],
management: Arc::new(ManualManagementInterface {}),
memory_size: None,
cpu_count: None,
memory_modules: vec![],
cpus: vec![],
}
}
pub fn summary(&self) -> String {
Review

A garder un oeil si on aurait besoin de presenter ce summary de manieres differentes (e.g. fichier json). A ce moment-la, il pourrait etre interessant d'y aller avec le pattern de "presenter":

fn summary(&self, presenter: PhysicalHostPresenter) {
  presenter.present_model(self.labels, self.category);
  presenter.present_cpus(self.cpus);
  presenter.present_memory(self.memory_modules);
  presenter.present_storage(self.storage);
  // ...
}
A garder un oeil si on aurait besoin de presenter ce summary de manieres differentes (e.g. fichier json). A ce moment-la, il pourrait etre interessant d'y aller avec le pattern de "presenter": ```rs fn summary(&self, presenter: PhysicalHostPresenter) { presenter.present_model(self.labels, self.category); presenter.present_cpus(self.cpus); presenter.present_memory(self.memory_modules); presenter.present_storage(self.storage); // ... } ```
let mut parts = Vec::new();
// Part 1: System Model (from labels) or Category as a fallback
let model = self
.labels
.iter()
.find(|l| l.name == "system-product-name" || l.name == "model")
.map(|l| l.value.clone())
.unwrap_or_else(|| self.category.to_string());
parts.push(model);
// Part 2: CPU Information
if !self.cpus.is_empty() {
let cpu_count = self.cpus.len();
let total_cores = self.cpus.iter().map(|c| c.cores).sum::<u32>();
let total_threads = self.cpus.iter().map(|c| c.threads).sum::<u32>();
let model_name = &self.cpus[0].model;
let cpu_summary = if cpu_count > 1 {
format!(
"{}x {} ({}c/{}t)",
cpu_count, model_name, total_cores, total_threads
)
} else {
format!("{} ({}c/{}t)", model_name, total_cores, total_threads)
};
parts.push(cpu_summary);
}
// Part 3: Memory Information
if !self.memory_modules.is_empty() {
let total_mem_bytes = self
.memory_modules
.iter()
.map(|m| m.size_bytes)
.sum::<u64>();
let total_mem_gb = (total_mem_bytes as f64 / (1024.0 * 1024.0 * 1024.0)).round() as u64;
// Find the most common speed among modules
let mut speeds = std::collections::HashMap::new();
for module in &self.memory_modules {
if let Some(speed) = module.speed_mhz {
*speeds.entry(speed).or_insert(0) += 1;
}
}
let common_speed = speeds
.into_iter()
.max_by_key(|&(_, count)| count)
.map(|(speed, _)| speed);
if let Some(speed) = common_speed {
parts.push(format!("{} GB RAM @ {}MHz", total_mem_gb, speed));
} else {
parts.push(format!("{} GB RAM", total_mem_gb));
}
}
// Part 4: Storage Information
if !self.storage.is_empty() {
let total_storage_bytes = self.storage.iter().map(|d| d.size_bytes).sum::<u64>();
let drive_count = self.storage.len();
let first_drive_model = &self.storage[0].model;
// Helper to format bytes into TB or GB
let format_storage = |bytes: u64| {
let tb = bytes as f64 / (1024.0 * 1024.0 * 1024.0 * 1024.0);
if tb >= 1.0 {
format!("{:.2} TB", tb)
} else {
let gb = bytes as f64 / (1024.0 * 1024.0 * 1024.0);
format!("{:.0} GB", gb)
}
};
let storage_summary = if drive_count > 1 {
format!(
"{} Storage ({}x {})",
format_storage(total_storage_bytes),
drive_count,
first_drive_model
)
} else {
format!(
"{} Storage ({})",
format_storage(total_storage_bytes),
first_drive_model
)
};
parts.push(storage_summary);
}
// Part 5: Network Information
// Prioritize an "up" interface with an IPv4 address
let best_nic = self
.network
.iter()
.find(|n| n.is_up && !n.ipv4_addresses.is_empty())
.or_else(|| self.network.first());
if let Some(nic) = best_nic {
let speed = nic
.speed_mbps
.map(|s| format!("{}Gbps", s / 1000))
.unwrap_or_else(|| "N/A".to_string());
let mac = nic.mac_address.to_string();
let nic_summary = if let Some(ip) = nic.ipv4_addresses.first() {
format!("NIC: {} ({}, {})", speed, ip, mac)
} else {
format!("NIC: {} ({})", speed, mac)
};
parts.push(nic_summary);
}
parts.join(" | ")
}
pub fn cluster_mac(&self) -> MacAddress {
self.network
.first()
@@ -42,37 +158,17 @@ impl PhysicalHost {
.mac_address
}
pub fn cpu(mut self, cpu_count: Option<u64>) -> Self {
self.cpu_count = cpu_count;
self
}
pub fn memory_size(mut self, memory_size: Option<u64>) -> Self {
self.memory_size = memory_size;
self
}
pub fn storage(
mut self,
connection: StorageConnectionType,
kind: StorageKind,
size: u64,
serial: String,
) -> Self {
self.storage.push(Storage {
connection,
kind,
size,
serial,
});
self
}
pub fn mac_address(mut self, mac_address: MacAddress) -> Self {
self.network.push(NetworkInterface {
name: None,
name: String::new(),
mac_address,
speed: None,
speed_mbps: None,
is_up: false,
mtu: 0,
ipv4_addresses: vec![],
ipv6_addresses: vec![],
driver: String::new(),
firmware_version: None,
});
self
}
@@ -81,57 +177,52 @@ impl PhysicalHost {
self.labels.push(Label { name, value });
self
}
pub fn management(mut self, management: Arc<dyn ManagementInterface>) -> Self {
self.management = management;
self
}
}
// Custom Serialize implementation for PhysicalHost
impl Serialize for PhysicalHost {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
// Determine the number of fields
let mut num_fields = 5; // category, network, storage, labels, management
if self.memory_size.is_some() {
num_fields += 1;
}
if self.cpu_count.is_some() {
num_fields += 1;
}
// Create a serialization structure
let mut state = serializer.serialize_struct("PhysicalHost", num_fields)?;
// Serialize the standard fields
state.serialize_field("category", &self.category)?;
state.serialize_field("network", &self.network)?;
state.serialize_field("storage", &self.storage)?;
state.serialize_field("labels", &self.labels)?;
// Serialize optional fields
if let Some(memory) = self.memory_size {
state.serialize_field("memory_size", &memory)?;
}
if let Some(cpu) = self.cpu_count {
state.serialize_field("cpu_count", &cpu)?;
}
let mgmt_data = self.management.serialize_management();
// pub management: Arc<dyn ManagementInterface>,
// Handle management interface - either as a field or flattened
state.serialize_field("management", &mgmt_data)?;
state.end()
}
}
// impl Serialize for PhysicalHost {
// fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
// where
// S: Serializer,
// {
// // Determine the number of fields
// let mut num_fields = 5; // category, network, storage, labels, management
// if self.memory_modules.is_some() {
// num_fields += 1;
// }
// if self.cpus.is_some() {
// num_fields += 1;
// }
//
// // Create a serialization structure
// let mut state = serializer.serialize_struct("PhysicalHost", num_fields)?;
//
// // Serialize the standard fields
// state.serialize_field("category", &self.category)?;
// state.serialize_field("network", &self.network)?;
// state.serialize_field("storage", &self.storage)?;
// state.serialize_field("labels", &self.labels)?;
//
// // Serialize optional fields
// if let Some(memory) = self.memory_modules {
// state.serialize_field("memory_size", &memory)?;
// }
// if let Some(cpu) = self.cpus {
// state.serialize_field("cpu_count", &cpu)?;
// }
//
// let mgmt_data = self.management.serialize_management();
// // pub management: Arc<dyn ManagementInterface>,
//
// // Handle management interface - either as a field or flattened
// state.serialize_field("management", &mgmt_data)?;
//
// state.end()
// }
// }
impl<'de> Deserialize<'de> for PhysicalHost {
fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
fn deserialize<D>(_deserializer: D) -> Result<Self, D::Error>
where
D: serde::Deserializer<'de>,
{
@@ -189,61 +280,10 @@ pub enum HostCategory {
Switch,
}
#[derive(Debug, new, Clone, Serialize)]
pub struct NetworkInterface {
pub name: Option<String>,
pub mac_address: MacAddress,
pub speed: Option<u64>,
}
#[cfg(test)]
use harmony_macros::mac_address;
use harmony_types::id::Id;
#[cfg(test)]
impl NetworkInterface {
pub fn dummy() -> Self {
Self {
name: Some(String::new()),
mac_address: mac_address!("00:00:00:00:00:00"),
speed: Some(0),
}
}
}
#[derive(Debug, new, Clone, Serialize)]
pub enum StorageConnectionType {
Sata3g,
Sata6g,
Sas6g,
Sas12g,
PCIE,
}
#[derive(Debug, Clone, Serialize)]
pub enum StorageKind {
SSD,
NVME,
HDD,
}
#[derive(Debug, new, Clone, Serialize)]
pub struct Storage {
pub connection: StorageConnectionType,
pub kind: StorageKind,
pub size: u64,
pub serial: String,
}
#[cfg(test)]
impl Storage {
pub fn dummy() -> Self {
Self {
connection: StorageConnectionType::Sata3g,
kind: StorageKind::SSD,
size: 0,
serial: String::new(),
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct Switch {
@@ -274,117 +314,43 @@ impl Location {
}
}
impl std::fmt::Display for HostCategory {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HostCategory::Server => write!(f, "Server"),
HostCategory::Firewall => write!(f, "Firewall"),
HostCategory::Switch => write!(f, "Switch"),
}
}
}
impl std::fmt::Display for Label {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}: {}", self.name, self.value)
}
}
impl std::fmt::Display for Location {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Address: {}, Name: {}", self.address, self.name)
}
}
impl std::fmt::Display for PhysicalHost {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.summary())
}
}
impl std::fmt::Display for Switch {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "Switch with {} interfaces", self._interface.len())
}
}
#[cfg(test)]
mod tests {
use super::*;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
// Mock implementation of ManagementInterface
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MockHPIlo {
ip: String,
username: String,
password: String,
firmware_version: String,
}
impl ManagementInterface for MockHPIlo {
fn boot_to_pxe(&self) {}
fn get_supported_protocol_names(&self) -> String {
String::new()
}
}
// Another mock implementation
#[derive(Debug, Clone, Serialize, Deserialize)]
struct MockDellIdrac {
hostname: String,
port: u16,
api_token: String,
}
impl ManagementInterface for MockDellIdrac {
fn boot_to_pxe(&self) {}
fn get_supported_protocol_names(&self) -> String {
String::new()
}
}
#[test]
fn test_serialize_physical_host_with_hp_ilo() {
// Create a PhysicalHost with HP iLO management
let host = PhysicalHost {
id: Id::empty(),
category: HostCategory::Server,
network: vec![NetworkInterface::dummy()],
management: Arc::new(MockHPIlo {
ip: "192.168.1.100".to_string(),
username: "admin".to_string(),
password: "password123".to_string(),
firmware_version: "2.5.0".to_string(),
}),
storage: vec![Storage::dummy()],
labels: vec![Label::new("datacenter".to_string(), "us-east".to_string())],
memory_size: Some(64_000_000),
cpu_count: Some(16),
};
// Serialize to JSON
let json = serde_json::to_string(&host).expect("Failed to serialize host");
// Check that the serialized JSON contains the HP iLO details
assert!(json.contains("192.168.1.100"));
assert!(json.contains("admin"));
assert!(json.contains("password123"));
assert!(json.contains("firmware_version"));
assert!(json.contains("2.5.0"));
// Parse back to verify structure (not the exact management interface)
let parsed: serde_json::Value = serde_json::from_str(&json).expect("Failed to parse JSON");
// Verify basic structure
assert_eq!(parsed["cpu_count"], 16);
assert_eq!(parsed["memory_size"], 64_000_000);
assert_eq!(parsed["network"][0]["name"], "");
}
#[test]
fn test_serialize_physical_host_with_dell_idrac() {
// Create a PhysicalHost with Dell iDRAC management
let host = PhysicalHost {
id: Id::empty(),
category: HostCategory::Server,
network: vec![NetworkInterface::dummy()],
management: Arc::new(MockDellIdrac {
hostname: "idrac-server01".to_string(),
port: 443,
api_token: "abcdef123456".to_string(),
}),
storage: vec![Storage::dummy()],
labels: vec![Label::new("env".to_string(), "production".to_string())],
memory_size: Some(128_000_000),
cpu_count: Some(32),
};
// Serialize to JSON
let json = serde_json::to_string(&host).expect("Failed to serialize host");
// Check that the serialized JSON contains the Dell iDRAC details
assert!(json.contains("idrac-server01"));
assert!(json.contains("443"));
assert!(json.contains("abcdef123456"));
// Parse back to verify structure
let parsed: serde_json::Value = serde_json::from_str(&json).expect("Failed to parse JSON");
// Verify basic structure
assert_eq!(parsed["cpu_count"], 32);
assert_eq!(parsed["memory_size"], 128_000_000);
assert_eq!(parsed["storage"][0]["path"], serde_json::Value::Null);
}
#[test]
fn test_different_management_implementations_produce_valid_json() {
@@ -393,31 +359,20 @@ mod tests {
id: Id::empty(),
category: HostCategory::Server,
network: vec![],
management: Arc::new(MockHPIlo {
ip: "10.0.0.1".to_string(),
username: "root".to_string(),
password: "secret".to_string(),
firmware_version: "3.0.0".to_string(),
}),
storage: vec![],
labels: vec![],
memory_size: None,
cpu_count: None,
memory_modules: vec![],
cpus: vec![],
};
let host2 = PhysicalHost {
id: Id::empty(),
category: HostCategory::Server,
network: vec![],
management: Arc::new(MockDellIdrac {
hostname: "server02-idrac".to_string(),
port: 8443,
api_token: "token123".to_string(),
}),
storage: vec![],
labels: vec![],
memory_size: None,
cpu_count: None,
memory_modules: vec![],
cpus: vec![],
};
// Both should serialize successfully
@@ -427,8 +382,5 @@ mod tests {
// Both JSONs should be valid and parseable
let _: serde_json::Value = serde_json::from_str(&json1).expect("Invalid JSON for host1");
let _: serde_json::Value = serde_json::from_str(&json2).expect("Invalid JSON for host2");
// The JSONs should be different because they contain different management interfaces
assert_ne!(json1, json2);
}
}

View File

@@ -18,6 +18,8 @@ impl InventoryFilter {
use derive_new::new;
use log::info;
use crate::hardware::{ManagementInterface, ManualManagementInterface};
use super::{
filter::Filter,
hardware::{FirewallGroup, HostGroup, Location, SwitchGroup},
@@ -30,7 +32,7 @@ pub struct Inventory {
// Firewall is really just a host but with somewhat specialized hardware
// I'm not entirely sure it belongs to its own category but it helps make things easier and
// clearer for now so let's try it this way.
pub firewall: FirewallGroup,
pub firewall_mgmt: Box<dyn ManagementInterface>,
pub worker_host: HostGroup,
pub storage_host: HostGroup,
pub control_plane_host: HostGroup,
@@ -41,7 +43,7 @@ impl Inventory {
Self {
location: Location::new("Empty".to_string(), "location".to_string()),
switch: vec![],
firewall: vec![],
firewall_mgmt: Box::new(ManualManagementInterface {}),
worker_host: vec![],
storage_host: vec![],
control_plane_host: vec![],
@@ -52,7 +54,7 @@ impl Inventory {
Self {
location: Location::test_building(),
switch: SwitchGroup::new(),
firewall: FirewallGroup::new(),
firewall_mgmt: Box::new(ManualManagementInterface {}),
worker_host: HostGroup::new(),
storage_host: HostGroup::new(),
control_plane_host: HostGroup::new(),

View File

@@ -1 +1,17 @@
mod sqlite;
use crate::{
config::DATABASE_URL,
infra::inventory::sqlite::SqliteInventoryRepository,
inventory::{InventoryRepository, RepoError},
};
pub mod sqlite;
pub struct InventoryRepositoryFactory;
impl InventoryRepositoryFactory {
pub async fn build() -> Result<Box<dyn InventoryRepository>, RepoError> {
Ok(Box::new(
SqliteInventoryRepository::new(&(*DATABASE_URL)).await?,
Review

Just sharing a thought even if it's not the most representative situation here (considering we're in the infra). Though it still indirectly highlights the underlying issue.

I understand the simplicity of having a config crate and using it when needed, but it actually makes things harder to reuse/test in isolation. All of this because in the end we depend on an env var that forces anyone using the harmony crate to use that as config mechanism instead of other options.

It's fine to use this config crate in the CLI (or TUI or any frontend), but for the actual harmony lib it would be a better practice to simply pass the params in the functions.

Now here, we're kinda "forced" to do this because the InventoryRepository is created in the middle of the execution of an Interpret. It would have been better to properly inject it, but that would mean its Score won't be stateless anymore (as it would be our only injection mechanism as of now).

I'm not saying we should fix this now because this is a bigger architectural issue, but we should keep an eye on it and tackle this sooner than later.

Extra note: despite the appearances using a static factory doesn't really save us from the OCP violation that we would introduce if we were directly instantiating the concrete class. Yes we can switch to another instance without modifying existing code, but it will force everyone using it to switch as well. Which might not be expected and will make it harder in the end to fix in a bigger codebase (it leaves scars 😢). Again, it's ok here, but we should be very careful about using this "pattern" (it's actually an anti-pattern).

Just sharing a thought even if it's not the most representative situation here (considering we're in the infra). Though it still indirectly highlights the underlying issue. I understand the simplicity of having a `config` crate and using it when needed, but it actually makes things harder to reuse/test in isolation. All of this because in the end we depend on an env var that forces anyone using the `harmony` crate to use that as config mechanism instead of other options. It's fine to use this `config` crate in the `CLI` (or `TUI` or any frontend), but for the actual `harmony` lib it would be a better practice to simply pass the params in the functions. Now here, we're kinda "forced" to do this because the `InventoryRepository` is created in the middle of the execution of an `Interpret`. It would have been better to properly inject it, but that would mean its `Score` won't be stateless anymore (as it would be our only injection mechanism as of now). I'm not saying we should fix this now because this is a bigger architectural issue, but we should keep an eye on it and tackle this sooner than later. _Extra note: despite the appearances using a static factory doesn't really save us from the OCP violation that we would introduce if we were directly instantiating the concrete class. Yes we can switch to another instance without modifying existing code, but it will force everyone using it to switch as well. Which might not be expected and will make it harder in the end to fix in a bigger codebase (it leaves scars 😢). Again, it's ok here, but we should be very careful about using this "pattern" (it's actually an anti-pattern)._
Review

I'll create an issue about this dependency injection problem we have in our Interprets (and at some extent the Scores statelessness).

I'll create an issue about this dependency injection problem we have in our `Interprets` (and at some extent the `Scores` statelessness).
Review

Yeah the factory doesn't bring much value here, no time to do better but at least it allows a minimum of decoupling if we want to switch repository type for the entire application instance, or provide more methods that do take parameters and can apply additional logic when building the repo.

Yeah the factory doesn't bring much value here, no time to do better but at least it allows a minimum of decoupling if we want to switch repository type for the entire application instance, or provide more methods that do take parameters and can apply additional logic when building the repo.
))
}
}

View File

@@ -19,11 +19,7 @@ impl SqliteInventoryRepository {
.await
.map_err(|e| RepoError::ConnectionFailed(e.to_string()))?;
todo!("make sure migrations are up to date");
info!(
"SQLite inventory repository initialized at '{}'",
database_url,
);
info!("SQLite inventory repository initialized at '{database_url}'");
Ok(Self { pool })
}
}
@@ -50,7 +46,7 @@ impl InventoryRepository for SqliteInventoryRepository {
}
async fn get_latest_by_id(&self, host_id: &str) -> Result<Option<PhysicalHost>, RepoError> {
let row = sqlx::query_as!(
let _row = sqlx::query_as!(
DbHost,
r#"SELECT id, version_id, data as "data: Json<PhysicalHost>" FROM physical_hosts WHERE id = ? ORDER BY version_id DESC LIMIT 1"#,
host_id

View File

@@ -1,10 +1,12 @@
use async_trait::async_trait;
use harmony_inventory_agent::local_presence::DiscoveryEvent;
use log::{debug, info};
use log::{debug, info, trace};
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
hardware::{HostCategory, Label, PhysicalHost},
infra::inventory::InventoryRepositoryFactory,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
@@ -41,20 +43,89 @@ struct DiscoverInventoryAgentInterpret {
impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
harmony_inventory_agent::local_presence::discover_agents(
self.score.discovery_timeout,
|event: DiscoveryEvent| {
println!("Discovery event {event:?}");
|event: DiscoveryEvent| -> Result<(), String> {
debug!("Discovery event {event:?}");
match event {
DiscoveryEvent::ServiceResolved(service) => info!("Found instance {service:?}"),
DiscoveryEvent::ServiceResolved(service) => {
let service_name = service.fullname.clone();
info!("Found service {service_name}");
let address = match service.get_addresses().iter().next() {
Some(address) => address,
None => {
return Err(format!(
"Could not find address for service {service_name}"
));
}
};
let address = address.to_string();
let port = service.get_port();
tokio::task::spawn(async move {
info!("Getting inventory for host {address} at port {port}");
let host =
harmony_inventory_agent::client::get_host_inventory(&address, port)
.await
letian marked this conversation as resolved
Review

This log and the one above are very similar

This log and the one above are very similar
.unwrap();
trace!("Found host information {host:?}");
// TODO its useless to have two distinct host types but requires a bit much
// refactoring to do it now
let harmony_inventory_agent::hwinfo::PhysicalHost {
storage_drives,
Review

It seems useless but I'm not sure they should actually be merged: to me it feels like two different concepts with a different lifecycle. It is highlighted by the fact that the harmony_inventory_agent is a standalone service that could be consumed by other programs and that this inventory module here is just one of them. The inventory has its own representation of a PhysicalHost that might become different than the one from the harmony_inventory_agent. At least it's my hypothesis for now.

It seems useless but I'm not sure they should actually be merged: to me it feels like two different concepts with a different lifecycle. It is highlighted by the fact that the `harmony_inventory_agent` is a standalone service that could be consumed by other programs and that this `inventory` module here is just one of them. The `inventory` has its own representation of a `PhysicalHost` that might become different than the one from the `harmony_inventory_agent`. At least it's my hypothesis for now.
storage_controller,
memory_modules,
cpus,
chipset,
network_interfaces,
management_interface,
host_uuid,
} = host;
let host = PhysicalHost {
id: Id::from(host_uuid),
category: HostCategory::Server,
network: network_interfaces,
storage: storage_drives,
labels: vec![Label {
name: "discovered-by".to_string(),
value: "harmony-inventory-agent".to_string(),
}],
memory_modules,
cpus,
};
let repo = InventoryRepositoryFactory::build()
.await
.map_err(|e| format!("Could not build repository : {e}"))
.unwrap();
repo.save(&host)
.await
.map_err(|e| format!("Could not save host : {e}"))
.unwrap();
info!(
"Saved new host id {}, summary : {}",
host.id,
host.summary()
);
});
}
_ => debug!("Unhandled event {event:?}"),
}
};
Ok(())
},
);
todo!()
)
.await;
Ok(Outcome {
status: InterpretStatus::SUCCESS,
message: "Discovery process completed successfully".to_string(),

There's an edge case here that might break this score:

discover_agents is awaited, but the spawned task inside the callback are not awaited at all. So in some edge cases, discover_agents might finish executing before the task itself.

And because we don't await for the task inside the callback, error handling is a bit awkward. Currently it says "all good" even though there could be an error:

[INFO ] Getting inventory for host 192.168.5.232 at port 8080

thread 'tokio-runtime-worker' panicked at /home/ian/Projects/nation-tech/harmony/harmony/src/modules/inventory/mod.rs:105:34:
called `Result::unwrap()` on an `Err` value: "Could not build repository : Could not connect to the database: error returned from database: (code: 14) unable to open database file"
note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace
[INFO ] ✅ Discovery process completed successfully
There's an edge case here that might break this score: `discover_agents` is awaited, but the _spawned_ task inside the callback are not awaited at all. So in some edge cases, `discover_agents` might finish executing before the task itself. And because we don't _await_ for the task inside the callback, error handling is a bit awkward. Currently it says "all good" even though there could be an error: ``` [INFO ] Getting inventory for host 192.168.5.232 at port 8080 thread 'tokio-runtime-worker' panicked at /home/ian/Projects/nation-tech/harmony/harmony/src/modules/inventory/mod.rs:105:34: called `Result::unwrap()` on an `Err` value: "Could not build repository : Could not connect to the database: error returned from database: (code: 14) unable to open database file" note: run with `RUST_BACKTRACE=1` environment variable to display a backtrace [INFO ] ✅ Discovery process completed successfully ```
})
}
fn get_name(&self) -> InterpretName {

View File

@@ -12,6 +12,9 @@ log.workspace = true
env_logger.workspace = true
tokio.workspace = true
thiserror.workspace = true
reqwest.workspace = true
# mdns-sd = "0.14.1"
mdns-sd = { git = "https://github.com/jggc/mdns-sd.git", branch = "patch-1" }
local-ip-address = "0.6.5"
harmony_types = { path = "../harmony_types" }
harmony_macros = { path = "../harmony_macros" }

View File

@@ -0,0 +1,15 @@
use crate::hwinfo::PhysicalHost;
pub async fn get_host_inventory(host: &str, port: u16) -> Result<PhysicalHost, String> {
let url = format!("http://{host}:{port}/inventory");
let client = reqwest::Client::new();
let response = client
.get(url)
.send()
.await
.map_err(|e| format!("Failed to download file: {e}"))?;
let host = response.json().await.map_err(|e| e.to_string())?;
Ok(host)
}

View File

@@ -1,3 +1,4 @@
use harmony_types::net::MacAddress;
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use serde_json::Value;
@@ -18,7 +19,7 @@ pub struct PhysicalHost {
pub host_uuid: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct StorageDrive {
pub name: String,
pub model: String,
@@ -32,13 +33,30 @@ pub struct StorageDrive {
pub smart_status: Option<String>,
}
impl StorageDrive {
pub fn dummy() -> Self {
Self {
name: String::new(),
model: String::new(),
serial: String::new(),
size_bytes: 0,
logical_block_size: 0,
physical_block_size: 0,
rotational: false,
wwn: None,
interface_type: String::new(),
smart_status: None,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct StorageController {
pub name: String,
pub driver: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct MemoryModule {
pub size_bytes: u64,
pub speed_mhz: Option<u32>,
@@ -48,7 +66,7 @@ pub struct MemoryModule {
pub rank: Option<u8>,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct CPU {
pub model: String,
pub vendor: String,
@@ -63,10 +81,10 @@ pub struct Chipset {
pub vendor: String,
}
#[derive(Serialize, Deserialize, Debug)]
#[derive(Serialize, Deserialize, Debug, Clone)]
pub struct NetworkInterface {
pub name: String,
pub mac_address: String,
pub mac_address: MacAddress,
pub speed_mbps: Option<u32>,
pub is_up: bool,
pub mtu: u32,
@@ -76,6 +94,24 @@ pub struct NetworkInterface {
pub firmware_version: Option<String>,
}
impl NetworkInterface {
pub fn dummy() -> Self {
use harmony_macros::mac_address;
Self {
name: String::new(),
mac_address: mac_address!("00:00:00:00:00:00"),
speed_mbps: Some(0),
is_up: false,
mtu: 0,
ipv4_addresses: vec![],
ipv6_addresses: vec![],
driver: String::new(),
firmware_version: None,
}
}
}
#[derive(Serialize, Deserialize, Debug)]
pub struct ManagementInterface {
pub kind: String,
@@ -509,6 +545,7 @@ impl PhysicalHost {
let mac_address = Self::read_sysfs_string(&iface_path.join("address"))
.map_err(|e| format!("Failed to read MAC address for {}: {}", iface_name, e))?;
let mac_address = MacAddress::try_from(mac_address).map_err(|e| e.to_string())?;
let speed_mbps = if iface_path.join("speed").exists() {
match Self::read_sysfs_u32(&iface_path.join("speed")) {

View File

@@ -1,2 +1,3 @@
mod hwinfo;
pub mod client;
pub mod hwinfo;
pub mod local_presence;

View File

@@ -1,10 +1,14 @@
use log::{debug, error};
use mdns_sd::{ServiceDaemon, ServiceEvent};
use crate::local_presence::SERVICE_NAME;
pub type DiscoveryEvent = ServiceEvent;
pub fn discover_agents(timeout: Option<u64>, on_event: impl Fn(DiscoveryEvent) + Send + 'static) {
pub async fn discover_agents<F>(timeout: Option<u64>, on_event: F)
where
F: FnOnce(DiscoveryEvent) -> Result<(), String> + Send + 'static + Copy,
{
// Create a new mDNS daemon.
let mdns = ServiceDaemon::new().expect("Failed to create mDNS daemon");
@@ -12,23 +16,24 @@ pub fn discover_agents(timeout: Option<u64>, on_event: impl Fn(DiscoveryEvent) +
// The receiver will be a stream of events.
let receiver = mdns.browse(SERVICE_NAME).expect("Failed to browse");
std::thread::spawn(move || {
tokio::task::spawn_blocking(move || {
while let Ok(event) = receiver.recv() {
on_event(event.clone());
if let Err(e) = on_event(event.clone()) {
error!("Event callback failed : {e}");
}
match event {
ServiceEvent::ServiceResolved(resolved) => {
println!("Resolved a new service: {}", resolved.fullname);
debug!("Resolved a new service: {}", resolved.fullname);
}
other_event => {
println!("Received other event: {:?}", &other_event);
debug!("Received other event: {:?}", &other_event);
}
}
}
});
if let Some(timeout) = timeout {
// Gracefully shutdown the daemon.
std::thread::sleep(std::time::Duration::from_secs(timeout));
tokio::time::sleep(std::time::Duration::from_secs(timeout)).await;
mdns.shutdown().unwrap();
}
}

View File

@@ -1,6 +1,6 @@
use serde::Serialize;
use serde::{Deserialize, Serialize};
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize)]
#[derive(Copy, Clone, Debug, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MacAddress(pub [u8; 6]);
impl MacAddress {
@@ -25,6 +25,30 @@ impl std::fmt::Display for MacAddress {
}
}
impl TryFrom<String> for MacAddress {
type Error = std::io::Error;
fn try_from(value: String) -> Result<Self, Self::Error> {
let parts: Vec<&str> = value.split(':').collect();
if parts.len() != 6 {
return Err(std::io::Error::new(
std::io::ErrorKind::InvalidInput,
"Invalid MAC address format: expected 6 colon-separated hex pairs",
));
}
let mut bytes = [0u8; 6];
for (i, part) in parts.iter().enumerate() {
bytes[i] = u8::from_str_radix(part, 16).map_err(|_| {
std::io::Error::new(
std::io::ErrorKind::InvalidInput,
format!("Invalid hex value in part {}: '{}'", i, part),
)
})?;
}
Ok(MacAddress(bytes))
}
}
pub type IpAddress = std::net::IpAddr;
#[derive(Debug, Clone)]

View File

@@ -0,0 +1,8 @@
-- Add migration script here
CREATE TABLE IF NOT EXISTS physical_hosts (
version_id TEXT PRIMARY KEY NOT NULL,
id TEXT NOT NULL,
data JSON NOT NULL
);
CREATE INDEX IF NOT EXISTS idx_host_id_time
ON physical_hosts (id, version_id DESC);