Compare commits
	
		
			2 Commits
		
	
	
		
			78b80c2169
			...
			b857412151
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b857412151 | |||
| 7bb3602ab8 | 
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -1732,7 +1732,6 @@ dependencies = [ | |||||||
| name = "example-pxe" | name = "example-pxe" | ||||||
| version = "0.1.0" | version = "0.1.0" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  "askama", |  | ||||||
|  "cidr", |  "cidr", | ||||||
|  "env_logger", |  "env_logger", | ||||||
|  "harmony", |  "harmony", | ||||||
| @ -2169,6 +2168,7 @@ dependencies = [ | |||||||
| name = "harmony" | name = "harmony" | ||||||
| version = "0.1.0" | version = "0.1.0" | ||||||
| dependencies = [ | dependencies = [ | ||||||
|  |  "askama", | ||||||
|  "async-trait", |  "async-trait", | ||||||
|  "base64 0.22.1", |  "base64 0.22.1", | ||||||
|  "bollard", |  "bollard", | ||||||
|  | |||||||
| @ -18,5 +18,4 @@ harmony_macros = { path = "../../harmony_macros" } | |||||||
| log = { workspace = true } | log = { workspace = true } | ||||||
| env_logger = { workspace = true } | env_logger = { workspace = true } | ||||||
| url = { workspace = true } | url = { workspace = true } | ||||||
| askama = "0.14.0" |  | ||||||
| serde.workspace = true | serde.workspace = true | ||||||
|  | |||||||
| @ -1,97 +1,24 @@ | |||||||
| mod topology; | mod topology; | ||||||
| 
 | 
 | ||||||
| use std::net::IpAddr; |  | ||||||
| 
 |  | ||||||
| use askama::Template; |  | ||||||
| use harmony::{ |  | ||||||
|     data::{FileContent, FilePath}, |  | ||||||
|     modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore}, |  | ||||||
|     score::Score, |  | ||||||
|     topology::{HAClusterTopology, Url}, |  | ||||||
| }; |  | ||||||
| 
 |  | ||||||
| use crate::topology::{get_inventory, get_topology}; | use crate::topology::{get_inventory, get_topology}; | ||||||
|  | use harmony::modules::okd::ipxe::OkdIpxeScore; | ||||||
| 
 | 
 | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() { | async fn main() { | ||||||
|     let inventory = get_inventory(); |     let inventory = get_inventory(); | ||||||
|     let topology = get_topology().await; |     let topology = get_topology().await; | ||||||
|     let gateway_ip = &topology.router.get_gateway(); |  | ||||||
| 
 | 
 | ||||||
|     let kickstart_filename = "inventory.kickstart"; |     let kickstart_filename = "inventory.kickstart".to_string(); | ||||||
|     let cluster_pubkey_filename = "cluster_ssh_key.pub"; |     let cluster_pubkey_filename = "cluster_ssh_key.pub".to_string(); | ||||||
|     let harmony_inventory_agent = "harmony_inventory_agent"; |     let harmony_inventory_agent = "harmony_inventory_agent".to_string(); | ||||||
| 
 | 
 | ||||||
|     // TODO: this should be a single IPXEScore instead of having the user do this step by step
 |     let ipxe_score = OkdIpxeScore { | ||||||
|     let scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![ |         kickstart_filename, | ||||||
|         Box::new(DhcpScore { |  | ||||||
|             host_binding: vec![], |  | ||||||
|             next_server: Some(topology.router.get_gateway()), |  | ||||||
|             boot_filename: None, |  | ||||||
|             filename: Some("undionly.kpxe".to_string()), |  | ||||||
|             filename64: Some("ipxe.efi".to_string()), |  | ||||||
|             filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()), |  | ||||||
|         }), |  | ||||||
|         Box::new(TftpScore { |  | ||||||
|             files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()), |  | ||||||
|         }), |  | ||||||
|         Box::new(StaticFilesHttpScore { |  | ||||||
|             // TODO The current russh based copy is way too slow, check for a lib update or use scp
 |  | ||||||
|             // when available
 |  | ||||||
|             //
 |  | ||||||
|             // For now just run :
 |  | ||||||
|             // scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
 |  | ||||||
|             //
 |  | ||||||
|             folder_to_serve: None, |  | ||||||
|             // folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
 |  | ||||||
|             files: vec![ |  | ||||||
|                 FileContent { |  | ||||||
|                     path: FilePath::Relative("boot.ipxe".to_string()), |  | ||||||
|                     content: BootIpxeTpl { gateway_ip }.to_string(), |  | ||||||
|                 }, |  | ||||||
|                 FileContent { |  | ||||||
|                     path: FilePath::Relative(kickstart_filename.to_string()), |  | ||||||
|                     content: InventoryKickstartTpl { |  | ||||||
|                         gateway_ip, |  | ||||||
|         harmony_inventory_agent, |         harmony_inventory_agent, | ||||||
|         cluster_pubkey_filename, |         cluster_pubkey_filename, | ||||||
|                     } |     }; | ||||||
|                     .to_string(), |  | ||||||
|                 }, |  | ||||||
|                 FileContent { |  | ||||||
|                     path: FilePath::Relative("fallback.ipxe".to_string()), |  | ||||||
|                     content: FallbackIpxeTpl { |  | ||||||
|                         gateway_ip, |  | ||||||
|                         kickstart_filename, |  | ||||||
|                     } |  | ||||||
|                     .to_string(), |  | ||||||
|                 }, |  | ||||||
|             ], |  | ||||||
|         }), |  | ||||||
|     ]; |  | ||||||
| 
 | 
 | ||||||
|     harmony_cli::run(inventory, topology, scores, None) |     harmony_cli::run(inventory, topology, vec![Box::new(ipxe_score)], None) | ||||||
|         .await |         .await | ||||||
|         .unwrap(); |         .unwrap(); | ||||||
| } | } | ||||||
| 
 |  | ||||||
| #[derive(Template)] |  | ||||||
| #[template(path = "boot.ipxe.j2")] |  | ||||||
| struct BootIpxeTpl<'a> { |  | ||||||
|     gateway_ip: &'a IpAddr, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[derive(Template)] |  | ||||||
| #[template(path = "fallback.ipxe.j2")] |  | ||||||
| struct FallbackIpxeTpl<'a> { |  | ||||||
|     gateway_ip: &'a IpAddr, |  | ||||||
|     kickstart_filename: &'a str, |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| #[derive(Template)] |  | ||||||
| #[template(path = "inventory.kickstart.j2")] |  | ||||||
| struct InventoryKickstartTpl<'a> { |  | ||||||
|     gateway_ip: &'a IpAddr, |  | ||||||
|     cluster_pubkey_filename: &'a str, |  | ||||||
|     harmony_inventory_agent: &'a str, |  | ||||||
| } |  | ||||||
|  | |||||||
| @ -1,5 +1,3 @@ | |||||||
| use std::{net::IpAddr, sync::Arc}; |  | ||||||
| 
 |  | ||||||
| use cidr::Ipv4Cidr; | use cidr::Ipv4Cidr; | ||||||
| use harmony::{ | use harmony::{ | ||||||
|     hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup}, |     hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup}, | ||||||
| @ -10,6 +8,7 @@ use harmony::{ | |||||||
| use harmony_macros::{ip, ipv4}; | use harmony_macros::{ip, ipv4}; | ||||||
| use harmony_secret::{Secret, SecretManager}; | use harmony_secret::{Secret, SecretManager}; | ||||||
| use serde::{Deserialize, Serialize}; | use serde::{Deserialize, Serialize}; | ||||||
|  | use std::{net::IpAddr, sync::Arc}; | ||||||
| 
 | 
 | ||||||
| #[derive(Secret, Serialize, Deserialize, Debug, PartialEq)] | #[derive(Secret, Serialize, Deserialize, Debug, PartialEq)] | ||||||
| struct OPNSenseFirewallConfig { | struct OPNSenseFirewallConfig { | ||||||
|  | |||||||
| @ -69,6 +69,7 @@ base64.workspace = true | |||||||
| once_cell = "1.21.3" | once_cell = "1.21.3" | ||||||
| harmony_inventory_agent = { path = "../harmony_inventory_agent" } | harmony_inventory_agent = { path = "../harmony_inventory_agent" } | ||||||
| harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } | harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } | ||||||
|  | askama = "0.14.0" | ||||||
| 
 | 
 | ||||||
| [dev-dependencies] | [dev-dependencies] | ||||||
| pretty_assertions.workspace = true | pretty_assertions.workspace = true | ||||||
|  | |||||||
| @ -1,6 +1,5 @@ | |||||||
| use log::debug; |  | ||||||
| use once_cell::sync::Lazy; | use once_cell::sync::Lazy; | ||||||
| use tokio::sync::broadcast; | use std::{collections::HashMap, sync::Mutex}; | ||||||
| 
 | 
 | ||||||
| use crate::modules::application::ApplicationFeatureStatus; | use crate::modules::application::ApplicationFeatureStatus; | ||||||
| 
 | 
 | ||||||
| @ -40,43 +39,43 @@ pub enum HarmonyEvent { | |||||||
|     }, |     }, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| { | type Subscriber = Box<dyn Fn(&HarmonyEvent) + Send + Sync>; | ||||||
|     // TODO: Adjust channel capacity
 |  | ||||||
|     let (tx, _rx) = broadcast::channel(100); |  | ||||||
|     tx |  | ||||||
| }); |  | ||||||
| 
 | 
 | ||||||
| pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { | static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> = | ||||||
|     if cfg!(any(test, feature = "testing")) { |     Lazy::new(|| Mutex::new(HashMap::new())); | ||||||
|         let _ = event; // Suppress the "unused variable" warning for `event`
 |  | ||||||
|         Ok(()) |  | ||||||
|     } else { |  | ||||||
|         match HARMONY_EVENT_BUS.send(event) { |  | ||||||
|             Ok(_) => Ok(()), |  | ||||||
|             Err(_) => Err("send error: no subscribers"), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| } |  | ||||||
| 
 | 
 | ||||||
| pub async fn subscribe<F, Fut>(name: &str, mut handler: F) | /// Subscribes a listener to all instrumentation events.
 | ||||||
|  | ///
 | ||||||
|  | /// Simply provide a unique name and a closure to run when an event happens.
 | ||||||
|  | ///
 | ||||||
|  | /// # Example
 | ||||||
|  | /// ```
 | ||||||
|  | /// instrumentation::subscribe("my_logger", |event| {
 | ||||||
|  | ///   println!("Event occurred: {:?}", event);
 | ||||||
|  | /// });
 | ||||||
|  | /// ```
 | ||||||
|  | pub fn subscribe<F>(name: &str, callback: F) | ||||||
| where | where | ||||||
|     F: FnMut(HarmonyEvent) -> Fut + Send + 'static, |     F: Fn(&HarmonyEvent) + Send + Sync + 'static, | ||||||
|     Fut: Future<Output = bool> + Send, |  | ||||||
| { | { | ||||||
|     let mut rx = HARMONY_EVENT_BUS.subscribe(); |     let mut subs = SUBSCRIBERS.lock().unwrap(); | ||||||
|     debug!("[{name}] Service started. Listening for events..."); |     subs.insert(name.to_string(), Box::new(callback)); | ||||||
|     loop { | } | ||||||
|         match rx.recv().await { | 
 | ||||||
|             Ok(event) => { | /// Instruments an event, notifying all subscribers.
 | ||||||
|                 if !handler(event).await { | ///
 | ||||||
|                     debug!("[{name}] Handler requested exit."); | /// This will call every closure that was registered with `subscribe`.
 | ||||||
|                     break; | ///
 | ||||||
|                 } | /// # Example
 | ||||||
|             } | /// ```
 | ||||||
|             Err(broadcast::error::RecvError::Lagged(n)) => { | /// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
 | ||||||
|                 debug!("[{name}] Lagged behind by {n} messages."); | /// ```
 | ||||||
|             } | pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { | ||||||
|             Err(_) => break, |     let subs = SUBSCRIBERS.lock().unwrap(); | ||||||
|         } | 
 | ||||||
|     } |     for callback in subs.values() { | ||||||
|  |         callback(&event); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -74,6 +74,7 @@ impl<T: Topology> Maestro<T> { | |||||||
| 
 | 
 | ||||||
|     fn is_topology_initialized(&self) -> bool { |     fn is_topology_initialized(&self) -> bool { | ||||||
|         self.topology_state.status == TopologyStatus::Success |         self.topology_state.status == TopologyStatus::Success | ||||||
|  |             || self.topology_state.status == TopologyStatus::Noop | ||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> { |     pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> { | ||||||
|  | |||||||
							
								
								
									
										148
									
								
								harmony/src/modules/okd/ipxe.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										148
									
								
								harmony/src/modules/okd/ipxe.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,148 @@ | |||||||
|  | use askama::Template; | ||||||
|  | use async_trait::async_trait; | ||||||
|  | use derive_new::new; | ||||||
|  | use serde::Serialize; | ||||||
|  | use std::net::IpAddr; | ||||||
|  | 
 | ||||||
|  | use crate::{ | ||||||
|  |     data::{FileContent, FilePath, Id, Version}, | ||||||
|  |     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, | ||||||
|  |     inventory::Inventory, | ||||||
|  |     modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore}, | ||||||
|  |     score::Score, | ||||||
|  |     topology::{DhcpServer, HttpServer, Router, TftpServer, Topology, Url}, | ||||||
|  | }; | ||||||
|  | 
 | ||||||
|  | #[derive(Debug, new, Clone, Serialize)] | ||||||
|  | pub struct OkdIpxeScore { | ||||||
|  |     pub kickstart_filename: String, | ||||||
|  |     pub harmony_inventory_agent: String, | ||||||
|  |     pub cluster_pubkey_filename: String, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Score<T> for OkdIpxeScore { | ||||||
|  |     fn create_interpret(&self) -> Box<dyn Interpret<T>> { | ||||||
|  |         Box::new(IpxeInterpret::new(self.clone())) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn name(&self) -> String { | ||||||
|  |         "OkdIpxeScore".to_string() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Debug, new, Clone)] | ||||||
|  | pub struct IpxeInterpret { | ||||||
|  |     score: OkdIpxeScore, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[async_trait] | ||||||
|  | impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Interpret<T> for IpxeInterpret { | ||||||
|  |     async fn execute( | ||||||
|  |         &self, | ||||||
|  |         inventory: &Inventory, | ||||||
|  |         topology: &T, | ||||||
|  |     ) -> Result<Outcome, InterpretError> { | ||||||
|  |         let gateway_ip = topology.get_gateway(); | ||||||
|  | 
 | ||||||
|  |         let scores: Vec<Box<dyn Score<T>>> = vec![ | ||||||
|  |             Box::new(DhcpScore { | ||||||
|  |                 host_binding: vec![], | ||||||
|  |                 next_server: Some(topology.get_gateway()), | ||||||
|  |                 boot_filename: None, | ||||||
|  |                 filename: Some("undionly.kpxe".to_string()), | ||||||
|  |                 filename64: Some("ipxe.efi".to_string()), | ||||||
|  |                 filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()), | ||||||
|  |             }), | ||||||
|  |             Box::new(TftpScore { | ||||||
|  |                 files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()), | ||||||
|  |             }), | ||||||
|  |             Box::new(StaticFilesHttpScore { | ||||||
|  |                 // TODO The current russh based copy is way too slow, check for a lib update or use scp
 | ||||||
|  |                 // when available
 | ||||||
|  |                 //
 | ||||||
|  |                 // For now just run :
 | ||||||
|  |                 // scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
 | ||||||
|  |                 //
 | ||||||
|  |                 folder_to_serve: None, | ||||||
|  |                 // folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
 | ||||||
|  |                 files: vec![ | ||||||
|  |                     FileContent { | ||||||
|  |                         path: FilePath::Relative("boot.ipxe".to_string()), | ||||||
|  |                         content: BootIpxeTpl { | ||||||
|  |                             gateway_ip: &gateway_ip, | ||||||
|  |                         } | ||||||
|  |                         .to_string(), | ||||||
|  |                     }, | ||||||
|  |                     FileContent { | ||||||
|  |                         path: FilePath::Relative(self.score.kickstart_filename.clone()), | ||||||
|  |                         content: InventoryKickstartTpl { | ||||||
|  |                             gateway_ip: &gateway_ip, | ||||||
|  |                             harmony_inventory_agent: &self.score.harmony_inventory_agent, | ||||||
|  |                             cluster_pubkey_filename: &self.score.cluster_pubkey_filename, | ||||||
|  |                         } | ||||||
|  |                         .to_string(), | ||||||
|  |                     }, | ||||||
|  |                     FileContent { | ||||||
|  |                         path: FilePath::Relative("fallback.ipxe".to_string()), | ||||||
|  |                         content: FallbackIpxeTpl { | ||||||
|  |                             gateway_ip: &gateway_ip, | ||||||
|  |                             kickstart_filename: &self.score.kickstart_filename, | ||||||
|  |                         } | ||||||
|  |                         .to_string(), | ||||||
|  |                     }, | ||||||
|  |                 ], | ||||||
|  |             }), | ||||||
|  |         ]; | ||||||
|  | 
 | ||||||
|  |         for score in scores { | ||||||
|  |             let result = score.interpret(inventory, topology).await; | ||||||
|  |             match result { | ||||||
|  |                 Ok(outcome) => match outcome.status { | ||||||
|  |                     InterpretStatus::SUCCESS => continue, | ||||||
|  |                     InterpretStatus::NOOP => continue, | ||||||
|  |                     _ => return Err(InterpretError::new(outcome.message)), | ||||||
|  |                 }, | ||||||
|  |                 Err(e) => return Err(e), | ||||||
|  |             }; | ||||||
|  |         } | ||||||
|  | 
 | ||||||
|  |         Ok(Outcome::success("Ipxe installed".to_string())) | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn get_name(&self) -> InterpretName { | ||||||
|  |         InterpretName::Ipxe | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn get_version(&self) -> Version { | ||||||
|  |         todo!() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn get_status(&self) -> InterpretStatus { | ||||||
|  |         todo!() | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     fn get_children(&self) -> Vec<Id> { | ||||||
|  |         todo!() | ||||||
|  |     } | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Template)] | ||||||
|  | #[template(path = "boot.ipxe.j2")] | ||||||
|  | struct BootIpxeTpl<'a> { | ||||||
|  |     gateway_ip: &'a IpAddr, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Template)] | ||||||
|  | #[template(path = "fallback.ipxe.j2")] | ||||||
|  | struct FallbackIpxeTpl<'a> { | ||||||
|  |     gateway_ip: &'a IpAddr, | ||||||
|  |     kickstart_filename: &'a str, | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | #[derive(Template)] | ||||||
|  | #[template(path = "inventory.kickstart.j2")] | ||||||
|  | struct InventoryKickstartTpl<'a> { | ||||||
|  |     gateway_ip: &'a IpAddr, | ||||||
|  |     cluster_pubkey_filename: &'a str, | ||||||
|  |     harmony_inventory_agent: &'a str, | ||||||
|  | } | ||||||
| @ -2,5 +2,6 @@ pub mod bootstrap_dhcp; | |||||||
| pub mod bootstrap_load_balancer; | pub mod bootstrap_load_balancer; | ||||||
| pub mod dhcp; | pub mod dhcp; | ||||||
| pub mod dns; | pub mod dns; | ||||||
|  | pub mod ipxe; | ||||||
| pub mod load_balancer; | pub mod load_balancer; | ||||||
| pub mod upgrade; | pub mod upgrade; | ||||||
|  | |||||||
| @ -7,19 +7,11 @@ use harmony::{ | |||||||
| }; | }; | ||||||
| use log::{error, info, log_enabled}; | use log::{error, info, log_enabled}; | ||||||
| use std::io::Write; | use std::io::Write; | ||||||
| use std::sync::{Arc, Mutex}; | use std::sync::Mutex; | ||||||
| 
 | 
 | ||||||
| pub fn init() -> tokio::task::JoinHandle<()> { | pub fn init() { | ||||||
|     configure_logger(); |     configure_logger(); | ||||||
|     let handle = tokio::spawn(handle_events()); |     handle_events(); | ||||||
| 
 |  | ||||||
|     loop { |  | ||||||
|         if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() { |  | ||||||
|             break; |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     handle |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| fn configure_logger() { | fn configure_logger() { | ||||||
| @ -86,16 +78,12 @@ fn configure_logger() { | |||||||
|         .init(); |         .init(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn handle_events() { | fn handle_events() { | ||||||
|     let preparing_topology = Arc::new(Mutex::new(false)); |     let preparing_topology = Mutex::new(false); | ||||||
|     let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); |     let current_score: Mutex<Option<String>> = Mutex::new(None); | ||||||
| 
 | 
 | ||||||
|     instrumentation::subscribe("Harmony CLI Logger", { |     instrumentation::subscribe("Harmony CLI Logger", { | ||||||
|         move |event| { |         move |event| { | ||||||
|             let preparing_topology = Arc::clone(&preparing_topology); |  | ||||||
|             let current_score = Arc::clone(¤t_score); |  | ||||||
| 
 |  | ||||||
|             async move { |  | ||||||
|             let mut preparing_topology = preparing_topology.lock().unwrap(); |             let mut preparing_topology = preparing_topology.lock().unwrap(); | ||||||
|             let mut current_score = current_score.lock().unwrap(); |             let mut current_score = current_score.lock().unwrap(); | ||||||
| 
 | 
 | ||||||
| @ -104,7 +92,6 @@ async fn handle_events() { | |||||||
|                 HarmonyEvent::HarmonyFinished => { |                 HarmonyEvent::HarmonyFinished => { | ||||||
|                     let emoji = crate::theme::EMOJI_HARMONY.to_string(); |                     let emoji = crate::theme::EMOJI_HARMONY.to_string(); | ||||||
|                     info!(emoji = emoji.as_str(); "Harmony completed"); |                     info!(emoji = emoji.as_str(); "Harmony completed"); | ||||||
|                         return false; |  | ||||||
|                 } |                 } | ||||||
|                 HarmonyEvent::TopologyStateChanged { |                 HarmonyEvent::TopologyStateChanged { | ||||||
|                     topology, |                     topology, | ||||||
| @ -113,7 +100,10 @@ async fn handle_events() { | |||||||
|                 } => match status { |                 } => match status { | ||||||
|                     TopologyStatus::Queued => {} |                     TopologyStatus::Queued => {} | ||||||
|                     TopologyStatus::Preparing => { |                     TopologyStatus::Preparing => { | ||||||
|                             let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow()); |                         let emoji = format!( | ||||||
|  |                             "{}", | ||||||
|  |                             style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow() | ||||||
|  |                         ); | ||||||
|                         info!(emoji = emoji.as_str(); "Preparing environment: {topology}..."); |                         info!(emoji = emoji.as_str(); "Preparing environment: {topology}..."); | ||||||
|                         (*preparing_topology) = true; |                         (*preparing_topology) = true; | ||||||
|                     } |                     } | ||||||
| @ -158,7 +148,7 @@ async fn handle_events() { | |||||||
|                     score, |                     score, | ||||||
|                     outcome, |                     outcome, | ||||||
|                 } => { |                 } => { | ||||||
|                         if current_score.is_some() && current_score.clone().unwrap() == score { |                     if current_score.is_some() && ¤t_score.clone().unwrap() == score { | ||||||
|                         (*current_score) = None; |                         (*current_score) = None; | ||||||
|                     } |                     } | ||||||
| 
 | 
 | ||||||
| @ -175,7 +165,7 @@ async fn handle_events() { | |||||||
|                             } |                             } | ||||||
|                         }, |                         }, | ||||||
|                         Err(err) => { |                         Err(err) => { | ||||||
|                                 error!(status = "failed"; "{}", err); |                             error!(status = "failed"; "{err}"); | ||||||
|                         } |                         } | ||||||
|                     } |                     } | ||||||
|                 } |                 } | ||||||
| @ -186,19 +176,16 @@ async fn handle_events() { | |||||||
|                     status, |                     status, | ||||||
|                 } => match status { |                 } => match status { | ||||||
|                     ApplicationFeatureStatus::Installing => { |                     ApplicationFeatureStatus::Installing => { | ||||||
|                             info!("Installing feature '{}' for '{}'...", feature, application); |                         info!("Installing feature '{feature}' for '{application}'..."); | ||||||
|                     } |                     } | ||||||
|                     ApplicationFeatureStatus::Installed => { |                     ApplicationFeatureStatus::Installed => { | ||||||
|                             info!(status = "finished"; "Feature '{}' installed", feature); |                         info!(status = "finished"; "Feature '{feature}' installed"); | ||||||
|                     } |                     } | ||||||
|                     ApplicationFeatureStatus::Failed { details } => { |                     ApplicationFeatureStatus::Failed { details } => { | ||||||
|                             error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details); |                         error!(status = "failed"; "Feature '{feature}' installation failed: {details}"); | ||||||
|                     } |                     } | ||||||
|                 }, |                 }, | ||||||
|             } |             } | ||||||
|                 true |  | ||||||
|         } |         } | ||||||
|         } |     }); | ||||||
|     }) |  | ||||||
|     .await; |  | ||||||
| } | } | ||||||
|  | |||||||
| @ -115,7 +115,7 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>( | |||||||
|     scores: Vec<Box<dyn Score<T>>>, |     scores: Vec<Box<dyn Score<T>>>, | ||||||
|     args: Args, |     args: Args, | ||||||
| ) -> Result<(), Box<dyn std::error::Error>> { | ) -> Result<(), Box<dyn std::error::Error>> { | ||||||
|     let cli_logger_handle = cli_logger::init(); |     cli_logger::init(); | ||||||
| 
 | 
 | ||||||
|     let mut maestro = Maestro::initialize(inventory, topology).await.unwrap(); |     let mut maestro = Maestro::initialize(inventory, topology).await.unwrap(); | ||||||
|     maestro.register_all(scores); |     maestro.register_all(scores); | ||||||
| @ -123,7 +123,6 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>( | |||||||
|     let result = init(maestro, args).await; |     let result = init(maestro, args).await; | ||||||
| 
 | 
 | ||||||
|     instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap(); |     instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap(); | ||||||
|     let _ = tokio::try_join!(cli_logger_handle); |  | ||||||
|     result |     result | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
|  | |||||||
| @ -1,39 +1,26 @@ | |||||||
| use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; | use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; | ||||||
| use indicatif::MultiProgress; | use indicatif::MultiProgress; | ||||||
| use std::sync::Arc; |  | ||||||
| 
 | 
 | ||||||
| use crate::instrumentation::{self, HarmonyComposerEvent}; | use crate::instrumentation::{self, HarmonyComposerEvent}; | ||||||
| 
 | 
 | ||||||
| pub fn init() -> tokio::task::JoinHandle<()> { | pub fn init() { | ||||||
|     configure_logger(); |     configure_logger(); | ||||||
|     let handle = tokio::spawn(handle_events()); |     handle_events(); | ||||||
| 
 |  | ||||||
|     loop { |  | ||||||
|         if instrumentation::instrument(HarmonyComposerEvent::HarmonyComposerStarted).is_ok() { |  | ||||||
|             break; |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 |  | ||||||
|     handle |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| fn configure_logger() { | fn configure_logger() { | ||||||
|     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); |     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub async fn handle_events() { | pub fn handle_events() { | ||||||
|     let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new())); |     let progress_tracker = IndicatifProgressTracker::new(MultiProgress::new()); | ||||||
| 
 | 
 | ||||||
|     const SETUP_SECTION: &str = "project-initialization"; |     const SETUP_SECTION: &str = "project-initialization"; | ||||||
|     const COMPILTATION_TASK: &str = "compilation"; |     const COMPILTATION_TASK: &str = "compilation"; | ||||||
|     const PROGRESS_DEPLOYMENT: &str = "deployment"; |     const PROGRESS_DEPLOYMENT: &str = "deployment"; | ||||||
| 
 | 
 | ||||||
|     instrumentation::subscribe("Harmony Composer Logger", { |     instrumentation::subscribe("Harmony Composer Logger", { | ||||||
|         move |event| { |         move |event| match event { | ||||||
|             let progress_tracker = Arc::clone(&progress_tracker); |  | ||||||
| 
 |  | ||||||
|             async move { |  | ||||||
|                 match event { |  | ||||||
|             HarmonyComposerEvent::HarmonyComposerStarted => {} |             HarmonyComposerEvent::HarmonyComposerStarted => {} | ||||||
|             HarmonyComposerEvent::ProjectInitializationStarted => { |             HarmonyComposerEvent::ProjectInitializationStarted => { | ||||||
|                 progress_tracker.add_section( |                 progress_tracker.add_section( | ||||||
| @ -46,13 +33,16 @@ pub async fn handle_events() { | |||||||
|             } |             } | ||||||
|             HarmonyComposerEvent::ProjectInitialized => {} |             HarmonyComposerEvent::ProjectInitialized => {} | ||||||
|             HarmonyComposerEvent::ProjectCompilationStarted { details } => { |             HarmonyComposerEvent::ProjectCompilationStarted { details } => { | ||||||
|                         progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details); |                 progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, details); | ||||||
|             } |             } | ||||||
|             HarmonyComposerEvent::ProjectCompiled => { |             HarmonyComposerEvent::ProjectCompiled => { | ||||||
|                 progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); |                 progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); | ||||||
|             } |             } | ||||||
|             HarmonyComposerEvent::ProjectCompilationFailed { details } => { |             HarmonyComposerEvent::ProjectCompilationFailed { details } => { | ||||||
|                         progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}")); |                 progress_tracker.fail_task( | ||||||
|  |                     COMPILTATION_TASK, | ||||||
|  |                     &format!("failed to compile project:\n{details}"), | ||||||
|  |                 ); | ||||||
|             } |             } | ||||||
|             HarmonyComposerEvent::DeploymentStarted { target, profile } => { |             HarmonyComposerEvent::DeploymentStarted { target, profile } => { | ||||||
|                 progress_tracker.add_section( |                 progress_tracker.add_section( | ||||||
| @ -68,15 +58,9 @@ pub async fn handle_events() { | |||||||
|             } |             } | ||||||
|             HarmonyComposerEvent::DeploymentFailed { details } => { |             HarmonyComposerEvent::DeploymentFailed { details } => { | ||||||
|                 progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", ""); |                 progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", ""); | ||||||
|                         progress_tracker.fail_task("deployment-failed", &details); |                 progress_tracker.fail_task("deployment-failed", details); | ||||||
|                     }, |  | ||||||
|                     HarmonyComposerEvent::Shutdown => { |  | ||||||
|                         return false; |  | ||||||
|                     } |  | ||||||
|                 } |  | ||||||
|                 true |  | ||||||
|             } |             } | ||||||
|  |             HarmonyComposerEvent::Shutdown => {} | ||||||
|         } |         } | ||||||
|     }) |     }) | ||||||
|     .await |  | ||||||
| } | } | ||||||
|  | |||||||
| @ -1,6 +1,5 @@ | |||||||
| use log::debug; |  | ||||||
| use once_cell::sync::Lazy; | use once_cell::sync::Lazy; | ||||||
| use tokio::sync::broadcast; | use std::{collections::HashMap, sync::Mutex}; | ||||||
| 
 | 
 | ||||||
| use crate::{HarmonyProfile, HarmonyTarget}; | use crate::{HarmonyProfile, HarmonyTarget}; | ||||||
| 
 | 
 | ||||||
| @ -27,48 +26,43 @@ pub enum HarmonyComposerEvent { | |||||||
|     Shutdown, |     Shutdown, | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| static HARMONY_COMPOSER_EVENT_BUS: Lazy<broadcast::Sender<HarmonyComposerEvent>> = | type Subscriber = Box<dyn Fn(&HarmonyComposerEvent) + Send + Sync>; | ||||||
|     Lazy::new(|| { |  | ||||||
|         // TODO: Adjust channel capacity
 |  | ||||||
|         let (tx, _rx) = broadcast::channel(16); |  | ||||||
|         tx |  | ||||||
|     }); |  | ||||||
| 
 | 
 | ||||||
| pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { | static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> = | ||||||
|     #[cfg(not(test))] |     Lazy::new(|| Mutex::new(HashMap::new())); | ||||||
|     { |  | ||||||
|         match HARMONY_COMPOSER_EVENT_BUS.send(event) { |  | ||||||
|             Ok(_) => Ok(()), |  | ||||||
|             Err(_) => Err("send error: no subscribers"), |  | ||||||
|         } |  | ||||||
|     } |  | ||||||
| 
 | 
 | ||||||
|     #[cfg(test)] | /// Subscribes a listener to all instrumentation events.
 | ||||||
|     { | ///
 | ||||||
|         let _ = event; // Suppress the "unused variable" warning for `event`
 | /// Simply provide a unique name and a closure to run when an event happens.
 | ||||||
|         Ok(()) | ///
 | ||||||
|     } | /// # Example
 | ||||||
| } | /// ```
 | ||||||
| 
 | /// instrumentation::subscribe("my_logger", |event| {
 | ||||||
| pub async fn subscribe<F, Fut>(name: &str, mut handler: F) | ///   println!("Event occurred: {:?}", event);
 | ||||||
|  | /// });
 | ||||||
|  | /// ```
 | ||||||
|  | pub fn subscribe<F>(name: &str, callback: F) | ||||||
| where | where | ||||||
|     F: FnMut(HarmonyComposerEvent) -> Fut + Send + 'static, |     F: Fn(&HarmonyComposerEvent) + Send + Sync + 'static, | ||||||
|     Fut: Future<Output = bool> + Send, |  | ||||||
| { | { | ||||||
|     let mut rx = HARMONY_COMPOSER_EVENT_BUS.subscribe(); |     let mut subs = SUBSCRIBERS.lock().unwrap(); | ||||||
|     debug!("[{name}] Service started. Listening for events..."); |     subs.insert(name.to_string(), Box::new(callback)); | ||||||
|     loop { | } | ||||||
|         match rx.recv().await { | 
 | ||||||
|             Ok(event) => { | /// Instruments an event, notifying all subscribers.
 | ||||||
|                 if !handler(event).await { | ///
 | ||||||
|                     debug!("[{name}] Handler requested exit."); | /// This will call every closure that was registered with `subscribe`.
 | ||||||
|                     break; | ///
 | ||||||
|                 } | /// # Example
 | ||||||
|             } | /// ```
 | ||||||
|             Err(broadcast::error::RecvError::Lagged(n)) => { | /// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
 | ||||||
|                 debug!("[{name}] Lagged behind by {n} messages."); | /// ```
 | ||||||
|             } | pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { | ||||||
|             Err(_) => break, |     let subs = SUBSCRIBERS.lock().unwrap(); | ||||||
|         } | 
 | ||||||
|     } |     for callback in subs.values() { | ||||||
|  |         callback(&event); | ||||||
|  |     } | ||||||
|  | 
 | ||||||
|  |     Ok(()) | ||||||
| } | } | ||||||
|  | |||||||
| @ -99,7 +99,7 @@ impl std::fmt::Display for HarmonyProfile { | |||||||
| 
 | 
 | ||||||
| #[tokio::main] | #[tokio::main] | ||||||
| async fn main() { | async fn main() { | ||||||
|     let hc_logger_handle = harmony_composer_logger::init(); |     harmony_composer_logger::init(); | ||||||
|     let cli_args = GlobalArgs::parse(); |     let cli_args = GlobalArgs::parse(); | ||||||
| 
 | 
 | ||||||
|     let harmony_path = Path::new(&cli_args.harmony_path) |     let harmony_path = Path::new(&cli_args.harmony_path) | ||||||
| @ -199,8 +199,6 @@ async fn main() { | |||||||
|     } |     } | ||||||
| 
 | 
 | ||||||
|     instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap(); |     instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap(); | ||||||
| 
 |  | ||||||
|     let _ = tokio::try_join!(hc_logger_handle); |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| #[derive(Clone, Debug, clap::ValueEnum)] | #[derive(Clone, Debug, clap::ValueEnum)] | ||||||
|  | |||||||
| @ -94,13 +94,9 @@ async fn init_instrumentation() -> tokio::task::JoinHandle<()> { | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| async fn handle_harmony_events() { | async fn handle_harmony_events() { | ||||||
|     instrumentation::subscribe("Harmony TUI Logger", async |event| { |     instrumentation::subscribe("Harmony TUI Logger", |_| { | ||||||
|         if let HarmonyEvent::HarmonyFinished = event { |         // TODO: Display events in the TUI
 | ||||||
|             return false; |     }); | ||||||
|         }; |  | ||||||
|         true |  | ||||||
|     }) |  | ||||||
|     .await; |  | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| pub struct HarmonyTUI<T: Topology> { | pub struct HarmonyTUI<T: Topology> { | ||||||
|  | |||||||
		Loading…
	
		Reference in New Issue
	
	Block a user