Compare commits
	
		
			2 Commits
		
	
	
		
			78b80c2169
			...
			b857412151
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
| b857412151 | |||
| 7bb3602ab8 | 
							
								
								
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							
							
						
						
									
										2
									
								
								Cargo.lock
									
									
									
										generated
									
									
									
								
							| @ -1732,7 +1732,6 @@ dependencies = [ | ||||
| name = "example-pxe" | ||||
| version = "0.1.0" | ||||
| dependencies = [ | ||||
|  "askama", | ||||
|  "cidr", | ||||
|  "env_logger", | ||||
|  "harmony", | ||||
| @ -2169,6 +2168,7 @@ dependencies = [ | ||||
| name = "harmony" | ||||
| version = "0.1.0" | ||||
| dependencies = [ | ||||
|  "askama", | ||||
|  "async-trait", | ||||
|  "base64 0.22.1", | ||||
|  "bollard", | ||||
|  | ||||
| @ -18,5 +18,4 @@ harmony_macros = { path = "../../harmony_macros" } | ||||
| log = { workspace = true } | ||||
| env_logger = { workspace = true } | ||||
| url = { workspace = true } | ||||
| askama = "0.14.0" | ||||
| serde.workspace = true | ||||
|  | ||||
| @ -1,97 +1,24 @@ | ||||
| mod topology; | ||||
| 
 | ||||
| use std::net::IpAddr; | ||||
| 
 | ||||
| use askama::Template; | ||||
| use harmony::{ | ||||
|     data::{FileContent, FilePath}, | ||||
|     modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore}, | ||||
|     score::Score, | ||||
|     topology::{HAClusterTopology, Url}, | ||||
| }; | ||||
| 
 | ||||
| use crate::topology::{get_inventory, get_topology}; | ||||
| use harmony::modules::okd::ipxe::OkdIpxeScore; | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() { | ||||
|     let inventory = get_inventory(); | ||||
|     let topology = get_topology().await; | ||||
|     let gateway_ip = &topology.router.get_gateway(); | ||||
| 
 | ||||
|     let kickstart_filename = "inventory.kickstart"; | ||||
|     let cluster_pubkey_filename = "cluster_ssh_key.pub"; | ||||
|     let harmony_inventory_agent = "harmony_inventory_agent"; | ||||
|     let kickstart_filename = "inventory.kickstart".to_string(); | ||||
|     let cluster_pubkey_filename = "cluster_ssh_key.pub".to_string(); | ||||
|     let harmony_inventory_agent = "harmony_inventory_agent".to_string(); | ||||
| 
 | ||||
|     // TODO: this should be a single IPXEScore instead of having the user do this step by step
 | ||||
|     let scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![ | ||||
|         Box::new(DhcpScore { | ||||
|             host_binding: vec![], | ||||
|             next_server: Some(topology.router.get_gateway()), | ||||
|             boot_filename: None, | ||||
|             filename: Some("undionly.kpxe".to_string()), | ||||
|             filename64: Some("ipxe.efi".to_string()), | ||||
|             filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()), | ||||
|         }), | ||||
|         Box::new(TftpScore { | ||||
|             files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()), | ||||
|         }), | ||||
|         Box::new(StaticFilesHttpScore { | ||||
|             // TODO The current russh based copy is way too slow, check for a lib update or use scp
 | ||||
|             // when available
 | ||||
|             //
 | ||||
|             // For now just run :
 | ||||
|             // scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
 | ||||
|             //
 | ||||
|             folder_to_serve: None, | ||||
|             // folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
 | ||||
|             files: vec![ | ||||
|                 FileContent { | ||||
|                     path: FilePath::Relative("boot.ipxe".to_string()), | ||||
|                     content: BootIpxeTpl { gateway_ip }.to_string(), | ||||
|                 }, | ||||
|                 FileContent { | ||||
|                     path: FilePath::Relative(kickstart_filename.to_string()), | ||||
|                     content: InventoryKickstartTpl { | ||||
|                         gateway_ip, | ||||
|     let ipxe_score = OkdIpxeScore { | ||||
|         kickstart_filename, | ||||
|         harmony_inventory_agent, | ||||
|         cluster_pubkey_filename, | ||||
|                     } | ||||
|                     .to_string(), | ||||
|                 }, | ||||
|                 FileContent { | ||||
|                     path: FilePath::Relative("fallback.ipxe".to_string()), | ||||
|                     content: FallbackIpxeTpl { | ||||
|                         gateway_ip, | ||||
|                         kickstart_filename, | ||||
|                     } | ||||
|                     .to_string(), | ||||
|                 }, | ||||
|             ], | ||||
|         }), | ||||
|     ]; | ||||
|     }; | ||||
| 
 | ||||
|     harmony_cli::run(inventory, topology, scores, None) | ||||
|     harmony_cli::run(inventory, topology, vec![Box::new(ipxe_score)], None) | ||||
|         .await | ||||
|         .unwrap(); | ||||
| } | ||||
| 
 | ||||
| #[derive(Template)] | ||||
| #[template(path = "boot.ipxe.j2")] | ||||
| struct BootIpxeTpl<'a> { | ||||
|     gateway_ip: &'a IpAddr, | ||||
| } | ||||
| 
 | ||||
| #[derive(Template)] | ||||
| #[template(path = "fallback.ipxe.j2")] | ||||
| struct FallbackIpxeTpl<'a> { | ||||
|     gateway_ip: &'a IpAddr, | ||||
|     kickstart_filename: &'a str, | ||||
| } | ||||
| 
 | ||||
| #[derive(Template)] | ||||
| #[template(path = "inventory.kickstart.j2")] | ||||
| struct InventoryKickstartTpl<'a> { | ||||
|     gateway_ip: &'a IpAddr, | ||||
|     cluster_pubkey_filename: &'a str, | ||||
|     harmony_inventory_agent: &'a str, | ||||
| } | ||||
|  | ||||
| @ -1,5 +1,3 @@ | ||||
| use std::{net::IpAddr, sync::Arc}; | ||||
| 
 | ||||
| use cidr::Ipv4Cidr; | ||||
| use harmony::{ | ||||
|     hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup}, | ||||
| @ -10,6 +8,7 @@ use harmony::{ | ||||
| use harmony_macros::{ip, ipv4}; | ||||
| use harmony_secret::{Secret, SecretManager}; | ||||
| use serde::{Deserialize, Serialize}; | ||||
| use std::{net::IpAddr, sync::Arc}; | ||||
| 
 | ||||
| #[derive(Secret, Serialize, Deserialize, Debug, PartialEq)] | ||||
| struct OPNSenseFirewallConfig { | ||||
|  | ||||
| @ -69,6 +69,7 @@ base64.workspace = true | ||||
| once_cell = "1.21.3" | ||||
| harmony_inventory_agent = { path = "../harmony_inventory_agent" } | ||||
| harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } | ||||
| askama = "0.14.0" | ||||
| 
 | ||||
| [dev-dependencies] | ||||
| pretty_assertions.workspace = true | ||||
|  | ||||
| @ -1,6 +1,5 @@ | ||||
| use log::debug; | ||||
| use once_cell::sync::Lazy; | ||||
| use tokio::sync::broadcast; | ||||
| use std::{collections::HashMap, sync::Mutex}; | ||||
| 
 | ||||
| use crate::modules::application::ApplicationFeatureStatus; | ||||
| 
 | ||||
| @ -40,43 +39,43 @@ pub enum HarmonyEvent { | ||||
|     }, | ||||
| } | ||||
| 
 | ||||
| static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| { | ||||
|     // TODO: Adjust channel capacity
 | ||||
|     let (tx, _rx) = broadcast::channel(100); | ||||
|     tx | ||||
| }); | ||||
| type Subscriber = Box<dyn Fn(&HarmonyEvent) + Send + Sync>; | ||||
| 
 | ||||
| pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { | ||||
|     if cfg!(any(test, feature = "testing")) { | ||||
|         let _ = event; // Suppress the "unused variable" warning for `event`
 | ||||
|         Ok(()) | ||||
|     } else { | ||||
|         match HARMONY_EVENT_BUS.send(event) { | ||||
|             Ok(_) => Ok(()), | ||||
|             Err(_) => Err("send error: no subscribers"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> = | ||||
|     Lazy::new(|| Mutex::new(HashMap::new())); | ||||
| 
 | ||||
| pub async fn subscribe<F, Fut>(name: &str, mut handler: F) | ||||
| /// Subscribes a listener to all instrumentation events.
 | ||||
| ///
 | ||||
| /// Simply provide a unique name and a closure to run when an event happens.
 | ||||
| ///
 | ||||
| /// # Example
 | ||||
| /// ```
 | ||||
| /// instrumentation::subscribe("my_logger", |event| {
 | ||||
| ///   println!("Event occurred: {:?}", event);
 | ||||
| /// });
 | ||||
| /// ```
 | ||||
| pub fn subscribe<F>(name: &str, callback: F) | ||||
| where | ||||
|     F: FnMut(HarmonyEvent) -> Fut + Send + 'static, | ||||
|     Fut: Future<Output = bool> + Send, | ||||
|     F: Fn(&HarmonyEvent) + Send + Sync + 'static, | ||||
| { | ||||
|     let mut rx = HARMONY_EVENT_BUS.subscribe(); | ||||
|     debug!("[{name}] Service started. Listening for events..."); | ||||
|     loop { | ||||
|         match rx.recv().await { | ||||
|             Ok(event) => { | ||||
|                 if !handler(event).await { | ||||
|                     debug!("[{name}] Handler requested exit."); | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|             Err(broadcast::error::RecvError::Lagged(n)) => { | ||||
|                 debug!("[{name}] Lagged behind by {n} messages."); | ||||
|             } | ||||
|             Err(_) => break, | ||||
|     let mut subs = SUBSCRIBERS.lock().unwrap(); | ||||
|     subs.insert(name.to_string(), Box::new(callback)); | ||||
| } | ||||
| 
 | ||||
| /// Instruments an event, notifying all subscribers.
 | ||||
| ///
 | ||||
| /// This will call every closure that was registered with `subscribe`.
 | ||||
| ///
 | ||||
| /// # Example
 | ||||
| /// ```
 | ||||
| /// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
 | ||||
| /// ```
 | ||||
| pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { | ||||
|     let subs = SUBSCRIBERS.lock().unwrap(); | ||||
| 
 | ||||
|     for callback in subs.values() { | ||||
|         callback(&event); | ||||
|     } | ||||
| 
 | ||||
|     Ok(()) | ||||
| } | ||||
|  | ||||
| @ -74,6 +74,7 @@ impl<T: Topology> Maestro<T> { | ||||
| 
 | ||||
|     fn is_topology_initialized(&self) -> bool { | ||||
|         self.topology_state.status == TopologyStatus::Success | ||||
|             || self.topology_state.status == TopologyStatus::Noop | ||||
|     } | ||||
| 
 | ||||
|     pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> { | ||||
|  | ||||
							
								
								
									
										148
									
								
								harmony/src/modules/okd/ipxe.rs
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										148
									
								
								harmony/src/modules/okd/ipxe.rs
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,148 @@ | ||||
| use askama::Template; | ||||
| use async_trait::async_trait; | ||||
| use derive_new::new; | ||||
| use serde::Serialize; | ||||
| use std::net::IpAddr; | ||||
| 
 | ||||
| use crate::{ | ||||
|     data::{FileContent, FilePath, Id, Version}, | ||||
|     interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, | ||||
|     inventory::Inventory, | ||||
|     modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore}, | ||||
|     score::Score, | ||||
|     topology::{DhcpServer, HttpServer, Router, TftpServer, Topology, Url}, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Debug, new, Clone, Serialize)] | ||||
| pub struct OkdIpxeScore { | ||||
|     pub kickstart_filename: String, | ||||
|     pub harmony_inventory_agent: String, | ||||
|     pub cluster_pubkey_filename: String, | ||||
| } | ||||
| 
 | ||||
| impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Score<T> for OkdIpxeScore { | ||||
|     fn create_interpret(&self) -> Box<dyn Interpret<T>> { | ||||
|         Box::new(IpxeInterpret::new(self.clone())) | ||||
|     } | ||||
| 
 | ||||
|     fn name(&self) -> String { | ||||
|         "OkdIpxeScore".to_string() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, new, Clone)] | ||||
| pub struct IpxeInterpret { | ||||
|     score: OkdIpxeScore, | ||||
| } | ||||
| 
 | ||||
| #[async_trait] | ||||
| impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Interpret<T> for IpxeInterpret { | ||||
|     async fn execute( | ||||
|         &self, | ||||
|         inventory: &Inventory, | ||||
|         topology: &T, | ||||
|     ) -> Result<Outcome, InterpretError> { | ||||
|         let gateway_ip = topology.get_gateway(); | ||||
| 
 | ||||
|         let scores: Vec<Box<dyn Score<T>>> = vec![ | ||||
|             Box::new(DhcpScore { | ||||
|                 host_binding: vec![], | ||||
|                 next_server: Some(topology.get_gateway()), | ||||
|                 boot_filename: None, | ||||
|                 filename: Some("undionly.kpxe".to_string()), | ||||
|                 filename64: Some("ipxe.efi".to_string()), | ||||
|                 filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()), | ||||
|             }), | ||||
|             Box::new(TftpScore { | ||||
|                 files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()), | ||||
|             }), | ||||
|             Box::new(StaticFilesHttpScore { | ||||
|                 // TODO The current russh based copy is way too slow, check for a lib update or use scp
 | ||||
|                 // when available
 | ||||
|                 //
 | ||||
|                 // For now just run :
 | ||||
|                 // scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
 | ||||
|                 //
 | ||||
|                 folder_to_serve: None, | ||||
|                 // folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
 | ||||
|                 files: vec![ | ||||
|                     FileContent { | ||||
|                         path: FilePath::Relative("boot.ipxe".to_string()), | ||||
|                         content: BootIpxeTpl { | ||||
|                             gateway_ip: &gateway_ip, | ||||
|                         } | ||||
|                         .to_string(), | ||||
|                     }, | ||||
|                     FileContent { | ||||
|                         path: FilePath::Relative(self.score.kickstart_filename.clone()), | ||||
|                         content: InventoryKickstartTpl { | ||||
|                             gateway_ip: &gateway_ip, | ||||
|                             harmony_inventory_agent: &self.score.harmony_inventory_agent, | ||||
|                             cluster_pubkey_filename: &self.score.cluster_pubkey_filename, | ||||
|                         } | ||||
|                         .to_string(), | ||||
|                     }, | ||||
|                     FileContent { | ||||
|                         path: FilePath::Relative("fallback.ipxe".to_string()), | ||||
|                         content: FallbackIpxeTpl { | ||||
|                             gateway_ip: &gateway_ip, | ||||
|                             kickstart_filename: &self.score.kickstart_filename, | ||||
|                         } | ||||
|                         .to_string(), | ||||
|                     }, | ||||
|                 ], | ||||
|             }), | ||||
|         ]; | ||||
| 
 | ||||
|         for score in scores { | ||||
|             let result = score.interpret(inventory, topology).await; | ||||
|             match result { | ||||
|                 Ok(outcome) => match outcome.status { | ||||
|                     InterpretStatus::SUCCESS => continue, | ||||
|                     InterpretStatus::NOOP => continue, | ||||
|                     _ => return Err(InterpretError::new(outcome.message)), | ||||
|                 }, | ||||
|                 Err(e) => return Err(e), | ||||
|             }; | ||||
|         } | ||||
| 
 | ||||
|         Ok(Outcome::success("Ipxe installed".to_string())) | ||||
|     } | ||||
| 
 | ||||
|     fn get_name(&self) -> InterpretName { | ||||
|         InterpretName::Ipxe | ||||
|     } | ||||
| 
 | ||||
|     fn get_version(&self) -> Version { | ||||
|         todo!() | ||||
|     } | ||||
| 
 | ||||
|     fn get_status(&self) -> InterpretStatus { | ||||
|         todo!() | ||||
|     } | ||||
| 
 | ||||
|     fn get_children(&self) -> Vec<Id> { | ||||
|         todo!() | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Template)] | ||||
| #[template(path = "boot.ipxe.j2")] | ||||
| struct BootIpxeTpl<'a> { | ||||
|     gateway_ip: &'a IpAddr, | ||||
| } | ||||
| 
 | ||||
| #[derive(Template)] | ||||
| #[template(path = "fallback.ipxe.j2")] | ||||
| struct FallbackIpxeTpl<'a> { | ||||
|     gateway_ip: &'a IpAddr, | ||||
|     kickstart_filename: &'a str, | ||||
| } | ||||
| 
 | ||||
| #[derive(Template)] | ||||
| #[template(path = "inventory.kickstart.j2")] | ||||
| struct InventoryKickstartTpl<'a> { | ||||
|     gateway_ip: &'a IpAddr, | ||||
|     cluster_pubkey_filename: &'a str, | ||||
|     harmony_inventory_agent: &'a str, | ||||
| } | ||||
| @ -2,5 +2,6 @@ pub mod bootstrap_dhcp; | ||||
| pub mod bootstrap_load_balancer; | ||||
| pub mod dhcp; | ||||
| pub mod dns; | ||||
| pub mod ipxe; | ||||
| pub mod load_balancer; | ||||
| pub mod upgrade; | ||||
|  | ||||
| @ -7,19 +7,11 @@ use harmony::{ | ||||
| }; | ||||
| use log::{error, info, log_enabled}; | ||||
| use std::io::Write; | ||||
| use std::sync::{Arc, Mutex}; | ||||
| use std::sync::Mutex; | ||||
| 
 | ||||
| pub fn init() -> tokio::task::JoinHandle<()> { | ||||
| pub fn init() { | ||||
|     configure_logger(); | ||||
|     let handle = tokio::spawn(handle_events()); | ||||
| 
 | ||||
|     loop { | ||||
|         if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() { | ||||
|             break; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     handle | ||||
|     handle_events(); | ||||
| } | ||||
| 
 | ||||
| fn configure_logger() { | ||||
| @ -86,16 +78,12 @@ fn configure_logger() { | ||||
|         .init(); | ||||
| } | ||||
| 
 | ||||
| async fn handle_events() { | ||||
|     let preparing_topology = Arc::new(Mutex::new(false)); | ||||
|     let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None)); | ||||
| fn handle_events() { | ||||
|     let preparing_topology = Mutex::new(false); | ||||
|     let current_score: Mutex<Option<String>> = Mutex::new(None); | ||||
| 
 | ||||
|     instrumentation::subscribe("Harmony CLI Logger", { | ||||
|         move |event| { | ||||
|             let preparing_topology = Arc::clone(&preparing_topology); | ||||
|             let current_score = Arc::clone(¤t_score); | ||||
| 
 | ||||
|             async move { | ||||
|             let mut preparing_topology = preparing_topology.lock().unwrap(); | ||||
|             let mut current_score = current_score.lock().unwrap(); | ||||
| 
 | ||||
| @ -104,7 +92,6 @@ async fn handle_events() { | ||||
|                 HarmonyEvent::HarmonyFinished => { | ||||
|                     let emoji = crate::theme::EMOJI_HARMONY.to_string(); | ||||
|                     info!(emoji = emoji.as_str(); "Harmony completed"); | ||||
|                         return false; | ||||
|                 } | ||||
|                 HarmonyEvent::TopologyStateChanged { | ||||
|                     topology, | ||||
| @ -113,7 +100,10 @@ async fn handle_events() { | ||||
|                 } => match status { | ||||
|                     TopologyStatus::Queued => {} | ||||
|                     TopologyStatus::Preparing => { | ||||
|                             let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow()); | ||||
|                         let emoji = format!( | ||||
|                             "{}", | ||||
|                             style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow() | ||||
|                         ); | ||||
|                         info!(emoji = emoji.as_str(); "Preparing environment: {topology}..."); | ||||
|                         (*preparing_topology) = true; | ||||
|                     } | ||||
| @ -158,7 +148,7 @@ async fn handle_events() { | ||||
|                     score, | ||||
|                     outcome, | ||||
|                 } => { | ||||
|                         if current_score.is_some() && current_score.clone().unwrap() == score { | ||||
|                     if current_score.is_some() && ¤t_score.clone().unwrap() == score { | ||||
|                         (*current_score) = None; | ||||
|                     } | ||||
| 
 | ||||
| @ -175,7 +165,7 @@ async fn handle_events() { | ||||
|                             } | ||||
|                         }, | ||||
|                         Err(err) => { | ||||
|                                 error!(status = "failed"; "{}", err); | ||||
|                             error!(status = "failed"; "{err}"); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
| @ -186,19 +176,16 @@ async fn handle_events() { | ||||
|                     status, | ||||
|                 } => match status { | ||||
|                     ApplicationFeatureStatus::Installing => { | ||||
|                             info!("Installing feature '{}' for '{}'...", feature, application); | ||||
|                         info!("Installing feature '{feature}' for '{application}'..."); | ||||
|                     } | ||||
|                     ApplicationFeatureStatus::Installed => { | ||||
|                             info!(status = "finished"; "Feature '{}' installed", feature); | ||||
|                         info!(status = "finished"; "Feature '{feature}' installed"); | ||||
|                     } | ||||
|                     ApplicationFeatureStatus::Failed { details } => { | ||||
|                             error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details); | ||||
|                         error!(status = "failed"; "Feature '{feature}' installation failed: {details}"); | ||||
|                     } | ||||
|                 }, | ||||
|             } | ||||
|                 true | ||||
|         } | ||||
|         } | ||||
|     }) | ||||
|     .await; | ||||
|     }); | ||||
| } | ||||
|  | ||||
| @ -115,7 +115,7 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>( | ||||
|     scores: Vec<Box<dyn Score<T>>>, | ||||
|     args: Args, | ||||
| ) -> Result<(), Box<dyn std::error::Error>> { | ||||
|     let cli_logger_handle = cli_logger::init(); | ||||
|     cli_logger::init(); | ||||
| 
 | ||||
|     let mut maestro = Maestro::initialize(inventory, topology).await.unwrap(); | ||||
|     maestro.register_all(scores); | ||||
| @ -123,7 +123,6 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>( | ||||
|     let result = init(maestro, args).await; | ||||
| 
 | ||||
|     instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap(); | ||||
|     let _ = tokio::try_join!(cli_logger_handle); | ||||
|     result | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -1,39 +1,26 @@ | ||||
| use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; | ||||
| use indicatif::MultiProgress; | ||||
| use std::sync::Arc; | ||||
| 
 | ||||
| use crate::instrumentation::{self, HarmonyComposerEvent}; | ||||
| 
 | ||||
| pub fn init() -> tokio::task::JoinHandle<()> { | ||||
| pub fn init() { | ||||
|     configure_logger(); | ||||
|     let handle = tokio::spawn(handle_events()); | ||||
| 
 | ||||
|     loop { | ||||
|         if instrumentation::instrument(HarmonyComposerEvent::HarmonyComposerStarted).is_ok() { | ||||
|             break; | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     handle | ||||
|     handle_events(); | ||||
| } | ||||
| 
 | ||||
| fn configure_logger() { | ||||
|     env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); | ||||
| } | ||||
| 
 | ||||
| pub async fn handle_events() { | ||||
|     let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new())); | ||||
| pub fn handle_events() { | ||||
|     let progress_tracker = IndicatifProgressTracker::new(MultiProgress::new()); | ||||
| 
 | ||||
|     const SETUP_SECTION: &str = "project-initialization"; | ||||
|     const COMPILTATION_TASK: &str = "compilation"; | ||||
|     const PROGRESS_DEPLOYMENT: &str = "deployment"; | ||||
| 
 | ||||
|     instrumentation::subscribe("Harmony Composer Logger", { | ||||
|         move |event| { | ||||
|             let progress_tracker = Arc::clone(&progress_tracker); | ||||
| 
 | ||||
|             async move { | ||||
|                 match event { | ||||
|         move |event| match event { | ||||
|             HarmonyComposerEvent::HarmonyComposerStarted => {} | ||||
|             HarmonyComposerEvent::ProjectInitializationStarted => { | ||||
|                 progress_tracker.add_section( | ||||
| @ -46,13 +33,16 @@ pub async fn handle_events() { | ||||
|             } | ||||
|             HarmonyComposerEvent::ProjectInitialized => {} | ||||
|             HarmonyComposerEvent::ProjectCompilationStarted { details } => { | ||||
|                         progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details); | ||||
|                 progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, details); | ||||
|             } | ||||
|             HarmonyComposerEvent::ProjectCompiled => { | ||||
|                 progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); | ||||
|             } | ||||
|             HarmonyComposerEvent::ProjectCompilationFailed { details } => { | ||||
|                         progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}")); | ||||
|                 progress_tracker.fail_task( | ||||
|                     COMPILTATION_TASK, | ||||
|                     &format!("failed to compile project:\n{details}"), | ||||
|                 ); | ||||
|             } | ||||
|             HarmonyComposerEvent::DeploymentStarted { target, profile } => { | ||||
|                 progress_tracker.add_section( | ||||
| @ -68,15 +58,9 @@ pub async fn handle_events() { | ||||
|             } | ||||
|             HarmonyComposerEvent::DeploymentFailed { details } => { | ||||
|                 progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", ""); | ||||
|                         progress_tracker.fail_task("deployment-failed", &details); | ||||
|                     }, | ||||
|                     HarmonyComposerEvent::Shutdown => { | ||||
|                         return false; | ||||
|                     } | ||||
|                 } | ||||
|                 true | ||||
|                 progress_tracker.fail_task("deployment-failed", details); | ||||
|             } | ||||
|             HarmonyComposerEvent::Shutdown => {} | ||||
|         } | ||||
|     }) | ||||
|     .await | ||||
| } | ||||
|  | ||||
| @ -1,6 +1,5 @@ | ||||
| use log::debug; | ||||
| use once_cell::sync::Lazy; | ||||
| use tokio::sync::broadcast; | ||||
| use std::{collections::HashMap, sync::Mutex}; | ||||
| 
 | ||||
| use crate::{HarmonyProfile, HarmonyTarget}; | ||||
| 
 | ||||
| @ -27,48 +26,43 @@ pub enum HarmonyComposerEvent { | ||||
|     Shutdown, | ||||
| } | ||||
| 
 | ||||
| static HARMONY_COMPOSER_EVENT_BUS: Lazy<broadcast::Sender<HarmonyComposerEvent>> = | ||||
|     Lazy::new(|| { | ||||
|         // TODO: Adjust channel capacity
 | ||||
|         let (tx, _rx) = broadcast::channel(16); | ||||
|         tx | ||||
|     }); | ||||
| type Subscriber = Box<dyn Fn(&HarmonyComposerEvent) + Send + Sync>; | ||||
| 
 | ||||
| static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> = | ||||
|     Lazy::new(|| Mutex::new(HashMap::new())); | ||||
| 
 | ||||
| /// Subscribes a listener to all instrumentation events.
 | ||||
| ///
 | ||||
| /// Simply provide a unique name and a closure to run when an event happens.
 | ||||
| ///
 | ||||
| /// # Example
 | ||||
| /// ```
 | ||||
| /// instrumentation::subscribe("my_logger", |event| {
 | ||||
| ///   println!("Event occurred: {:?}", event);
 | ||||
| /// });
 | ||||
| /// ```
 | ||||
| pub fn subscribe<F>(name: &str, callback: F) | ||||
| where | ||||
|     F: Fn(&HarmonyComposerEvent) + Send + Sync + 'static, | ||||
| { | ||||
|     let mut subs = SUBSCRIBERS.lock().unwrap(); | ||||
|     subs.insert(name.to_string(), Box::new(callback)); | ||||
| } | ||||
| 
 | ||||
| /// Instruments an event, notifying all subscribers.
 | ||||
| ///
 | ||||
| /// This will call every closure that was registered with `subscribe`.
 | ||||
| ///
 | ||||
| /// # Example
 | ||||
| /// ```
 | ||||
| /// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
 | ||||
| /// ```
 | ||||
| pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { | ||||
|     #[cfg(not(test))] | ||||
|     { | ||||
|         match HARMONY_COMPOSER_EVENT_BUS.send(event) { | ||||
|             Ok(_) => Ok(()), | ||||
|             Err(_) => Err("send error: no subscribers"), | ||||
|         } | ||||
|     let subs = SUBSCRIBERS.lock().unwrap(); | ||||
| 
 | ||||
|     for callback in subs.values() { | ||||
|         callback(&event); | ||||
|     } | ||||
| 
 | ||||
|     #[cfg(test)] | ||||
|     { | ||||
|         let _ = event; // Suppress the "unused variable" warning for `event`
 | ||||
|     Ok(()) | ||||
| } | ||||
| } | ||||
| 
 | ||||
| pub async fn subscribe<F, Fut>(name: &str, mut handler: F) | ||||
| where | ||||
|     F: FnMut(HarmonyComposerEvent) -> Fut + Send + 'static, | ||||
|     Fut: Future<Output = bool> + Send, | ||||
| { | ||||
|     let mut rx = HARMONY_COMPOSER_EVENT_BUS.subscribe(); | ||||
|     debug!("[{name}] Service started. Listening for events..."); | ||||
|     loop { | ||||
|         match rx.recv().await { | ||||
|             Ok(event) => { | ||||
|                 if !handler(event).await { | ||||
|                     debug!("[{name}] Handler requested exit."); | ||||
|                     break; | ||||
|                 } | ||||
|             } | ||||
|             Err(broadcast::error::RecvError::Lagged(n)) => { | ||||
|                 debug!("[{name}] Lagged behind by {n} messages."); | ||||
|             } | ||||
|             Err(_) => break, | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -99,7 +99,7 @@ impl std::fmt::Display for HarmonyProfile { | ||||
| 
 | ||||
| #[tokio::main] | ||||
| async fn main() { | ||||
|     let hc_logger_handle = harmony_composer_logger::init(); | ||||
|     harmony_composer_logger::init(); | ||||
|     let cli_args = GlobalArgs::parse(); | ||||
| 
 | ||||
|     let harmony_path = Path::new(&cli_args.harmony_path) | ||||
| @ -199,8 +199,6 @@ async fn main() { | ||||
|     } | ||||
| 
 | ||||
|     instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap(); | ||||
| 
 | ||||
|     let _ = tokio::try_join!(hc_logger_handle); | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug, clap::ValueEnum)] | ||||
|  | ||||
| @ -94,13 +94,9 @@ async fn init_instrumentation() -> tokio::task::JoinHandle<()> { | ||||
| } | ||||
| 
 | ||||
| async fn handle_harmony_events() { | ||||
|     instrumentation::subscribe("Harmony TUI Logger", async |event| { | ||||
|         if let HarmonyEvent::HarmonyFinished = event { | ||||
|             return false; | ||||
|         }; | ||||
|         true | ||||
|     }) | ||||
|     .await; | ||||
|     instrumentation::subscribe("Harmony TUI Logger", |_| { | ||||
|         // TODO: Display events in the TUI
 | ||||
|     }); | ||||
| } | ||||
| 
 | ||||
| pub struct HarmonyTUI<T: Topology> { | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user