diff --git a/harmony/src/domain/instrumentation.rs b/harmony/src/domain/instrumentation.rs index 6f0497e..a41b70d 100644 --- a/harmony/src/domain/instrumentation.rs +++ b/harmony/src/domain/instrumentation.rs @@ -1,6 +1,5 @@ -use log::debug; use once_cell::sync::Lazy; -use tokio::sync::broadcast; +use std::{collections::HashMap, sync::Mutex}; use crate::modules::application::ApplicationFeatureStatus; @@ -40,43 +39,43 @@ pub enum HarmonyEvent { }, } -static HARMONY_EVENT_BUS: Lazy> = Lazy::new(|| { - // TODO: Adjust channel capacity - let (tx, _rx) = broadcast::channel(100); - tx -}); +type Subscriber = Box; -pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { - if cfg!(any(test, feature = "testing")) { - let _ = event; // Suppress the "unused variable" warning for `event` - Ok(()) - } else { - match HARMONY_EVENT_BUS.send(event) { - Ok(_) => Ok(()), - Err(_) => Err("send error: no subscribers"), - } - } -} +static SUBSCRIBERS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); -pub async fn subscribe(name: &str, mut handler: F) +/// Subscribes a listener to all instrumentation events. +/// +/// Simply provide a unique name and a closure to run when an event happens. +/// +/// # Example +/// ``` +/// instrumentation::subscribe("my_logger", |event| { +/// println!("Event occurred: {:?}", event); +/// }); +/// ``` +pub fn subscribe(name: &str, callback: F) where - F: FnMut(HarmonyEvent) -> Fut + Send + 'static, - Fut: Future + Send, + F: Fn(&HarmonyEvent) + Send + Sync + 'static, { - let mut rx = HARMONY_EVENT_BUS.subscribe(); - debug!("[{name}] Service started. Listening for events..."); - loop { - match rx.recv().await { - Ok(event) => { - if !handler(event).await { - debug!("[{name}] Handler requested exit."); - break; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - debug!("[{name}] Lagged behind by {n} messages."); - } - Err(_) => break, - } - } + let mut subs = SUBSCRIBERS.lock().unwrap(); + subs.insert(name.to_string(), Box::new(callback)); +} + +/// Instruments an event, notifying all subscribers. +/// +/// This will call every closure that was registered with `subscribe`. +/// +/// # Example +/// ``` +/// instrumentation::instrument(HarmonyEvent::HarmonyStarted); +/// ``` +pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { + let subs = SUBSCRIBERS.lock().unwrap(); + + for callback in subs.values() { + callback(&event); + } + + Ok(()) } diff --git a/harmony/src/domain/maestro/mod.rs b/harmony/src/domain/maestro/mod.rs index d9587c8..3469ea3 100644 --- a/harmony/src/domain/maestro/mod.rs +++ b/harmony/src/domain/maestro/mod.rs @@ -74,6 +74,7 @@ impl Maestro { fn is_topology_initialized(&self) -> bool { self.topology_state.status == TopologyStatus::Success + || self.topology_state.status == TopologyStatus::Noop } pub async fn interpret(&self, score: Box>) -> Result { diff --git a/harmony_cli/src/cli_logger.rs b/harmony_cli/src/cli_logger.rs index c2fc8d0..be61c2a 100644 --- a/harmony_cli/src/cli_logger.rs +++ b/harmony_cli/src/cli_logger.rs @@ -7,19 +7,11 @@ use harmony::{ }; use log::{error, info, log_enabled}; use std::io::Write; -use std::sync::{Arc, Mutex}; +use std::sync::Mutex; -pub fn init() -> tokio::task::JoinHandle<()> { +pub fn init() { configure_logger(); - let handle = tokio::spawn(handle_events()); - - loop { - if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() { - break; - } - } - - handle + handle_events(); } fn configure_logger() { @@ -86,119 +78,114 @@ fn configure_logger() { .init(); } -async fn handle_events() { - let preparing_topology = Arc::new(Mutex::new(false)); - let current_score: Arc>> = Arc::new(Mutex::new(None)); +fn handle_events() { + let preparing_topology = Mutex::new(false); + let current_score: Mutex> = Mutex::new(None); instrumentation::subscribe("Harmony CLI Logger", { move |event| { - let preparing_topology = Arc::clone(&preparing_topology); - let current_score = Arc::clone(¤t_score); + let mut preparing_topology = preparing_topology.lock().unwrap(); + let mut current_score = current_score.lock().unwrap(); - async move { - let mut preparing_topology = preparing_topology.lock().unwrap(); - let mut current_score = current_score.lock().unwrap(); - - match event { - HarmonyEvent::HarmonyStarted => {} - HarmonyEvent::HarmonyFinished => { - let emoji = crate::theme::EMOJI_HARMONY.to_string(); - info!(emoji = emoji.as_str(); "Harmony completed"); - return false; - } - HarmonyEvent::TopologyStateChanged { - topology, - status, - message, - } => match status { - TopologyStatus::Queued => {} - TopologyStatus::Preparing => { - let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow()); - info!(emoji = emoji.as_str(); "Preparing environment: {topology}..."); - (*preparing_topology) = true; - } - TopologyStatus::Success => { - (*preparing_topology) = false; - if let Some(message) = message { - info!(status = "finished"; "{message}"); - } - } - TopologyStatus::Noop => { - (*preparing_topology) = false; - if let Some(message) = message { - info!(status = "skipped"; "{message}"); - } - } - TopologyStatus::Error => { - (*preparing_topology) = false; - if let Some(message) = message { - error!(status = "failed"; "{message}"); - } - } - }, - HarmonyEvent::InterpretExecutionStarted { - execution_id: _, - topology: _, - interpret: _, - score, - message, - } => { - if *preparing_topology || current_score.is_some() { - info!("{message}"); - } else { - (*current_score) = Some(score.clone()); - let emoji = format!("{}", style(crate::theme::EMOJI_SCORE).blue()); - info!(emoji = emoji.as_str(); "Interpreting score: {score}..."); - } - } - HarmonyEvent::InterpretExecutionFinished { - execution_id: _, - topology: _, - interpret: _, - score, - outcome, - } => { - if current_score.is_some() && current_score.clone().unwrap() == score { - (*current_score) = None; - } - - match outcome { - Ok(outcome) => match outcome.status { - harmony::interpret::InterpretStatus::SUCCESS => { - info!(status = "finished"; "{}", outcome.message); - } - harmony::interpret::InterpretStatus::NOOP => { - info!(status = "skipped"; "{}", outcome.message); - } - _ => { - error!(status = "failed"; "{}", outcome.message); - } - }, - Err(err) => { - error!(status = "failed"; "{}", err); - } - } - } - HarmonyEvent::ApplicationFeatureStateChanged { - topology: _, - application, - feature, - status, - } => match status { - ApplicationFeatureStatus::Installing => { - info!("Installing feature '{}' for '{}'...", feature, application); - } - ApplicationFeatureStatus::Installed => { - info!(status = "finished"; "Feature '{}' installed", feature); - } - ApplicationFeatureStatus::Failed { details } => { - error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details); - } - }, + match event { + HarmonyEvent::HarmonyStarted => {} + HarmonyEvent::HarmonyFinished => { + let emoji = crate::theme::EMOJI_HARMONY.to_string(); + info!(emoji = emoji.as_str(); "Harmony completed"); } - true + HarmonyEvent::TopologyStateChanged { + topology, + status, + message, + } => match status { + TopologyStatus::Queued => {} + TopologyStatus::Preparing => { + let emoji = format!( + "{}", + style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow() + ); + info!(emoji = emoji.as_str(); "Preparing environment: {topology}..."); + (*preparing_topology) = true; + } + TopologyStatus::Success => { + (*preparing_topology) = false; + if let Some(message) = message { + info!(status = "finished"; "{message}"); + } + } + TopologyStatus::Noop => { + (*preparing_topology) = false; + if let Some(message) = message { + info!(status = "skipped"; "{message}"); + } + } + TopologyStatus::Error => { + (*preparing_topology) = false; + if let Some(message) = message { + error!(status = "failed"; "{message}"); + } + } + }, + HarmonyEvent::InterpretExecutionStarted { + execution_id: _, + topology: _, + interpret: _, + score, + message, + } => { + if *preparing_topology || current_score.is_some() { + info!("{message}"); + } else { + (*current_score) = Some(score.clone()); + let emoji = format!("{}", style(crate::theme::EMOJI_SCORE).blue()); + info!(emoji = emoji.as_str(); "Interpreting score: {score}..."); + } + } + HarmonyEvent::InterpretExecutionFinished { + execution_id: _, + topology: _, + interpret: _, + score, + outcome, + } => { + if current_score.is_some() && ¤t_score.clone().unwrap() == score { + (*current_score) = None; + } + + match outcome { + Ok(outcome) => match outcome.status { + harmony::interpret::InterpretStatus::SUCCESS => { + info!(status = "finished"; "{}", outcome.message); + } + harmony::interpret::InterpretStatus::NOOP => { + info!(status = "skipped"; "{}", outcome.message); + } + _ => { + error!(status = "failed"; "{}", outcome.message); + } + }, + Err(err) => { + error!(status = "failed"; "{err}"); + } + } + } + HarmonyEvent::ApplicationFeatureStateChanged { + topology: _, + application, + feature, + status, + } => match status { + ApplicationFeatureStatus::Installing => { + info!("Installing feature '{feature}' for '{application}'..."); + } + ApplicationFeatureStatus::Installed => { + info!(status = "finished"; "Feature '{feature}' installed"); + } + ApplicationFeatureStatus::Failed { details } => { + error!(status = "failed"; "Feature '{feature}' installation failed: {details}"); + } + }, } } - }) - .await; + }); } diff --git a/harmony_cli/src/lib.rs b/harmony_cli/src/lib.rs index 53de86e..0bfb1e7 100644 --- a/harmony_cli/src/lib.rs +++ b/harmony_cli/src/lib.rs @@ -115,7 +115,7 @@ pub async fn run_cli( scores: Vec>>, args: Args, ) -> Result<(), Box> { - let cli_logger_handle = cli_logger::init(); + cli_logger::init(); let mut maestro = Maestro::initialize(inventory, topology).await.unwrap(); maestro.register_all(scores); @@ -123,7 +123,6 @@ pub async fn run_cli( let result = init(maestro, args).await; instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap(); - let _ = tokio::try_join!(cli_logger_handle); result } diff --git a/harmony_composer/src/harmony_composer_logger.rs b/harmony_composer/src/harmony_composer_logger.rs index 5e0261f..040a167 100644 --- a/harmony_composer/src/harmony_composer_logger.rs +++ b/harmony_composer/src/harmony_composer_logger.rs @@ -1,82 +1,66 @@ use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; use indicatif::MultiProgress; -use std::sync::Arc; use crate::instrumentation::{self, HarmonyComposerEvent}; -pub fn init() -> tokio::task::JoinHandle<()> { +pub fn init() { configure_logger(); - let handle = tokio::spawn(handle_events()); - - loop { - if instrumentation::instrument(HarmonyComposerEvent::HarmonyComposerStarted).is_ok() { - break; - } - } - - handle + handle_events(); } fn configure_logger() { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); } -pub async fn handle_events() { - let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new())); +pub fn handle_events() { + let progress_tracker = IndicatifProgressTracker::new(MultiProgress::new()); const SETUP_SECTION: &str = "project-initialization"; const COMPILTATION_TASK: &str = "compilation"; const PROGRESS_DEPLOYMENT: &str = "deployment"; instrumentation::subscribe("Harmony Composer Logger", { - move |event| { - let progress_tracker = Arc::clone(&progress_tracker); - - async move { - match event { - HarmonyComposerEvent::HarmonyComposerStarted => {} - HarmonyComposerEvent::ProjectInitializationStarted => { - progress_tracker.add_section( - SETUP_SECTION, - &format!( - "{} Initializing Harmony project...", - harmony_cli::theme::EMOJI_HARMONY, - ), - ); - } - HarmonyComposerEvent::ProjectInitialized => {} - HarmonyComposerEvent::ProjectCompilationStarted { details } => { - progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details); - } - HarmonyComposerEvent::ProjectCompiled => { - progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); - } - HarmonyComposerEvent::ProjectCompilationFailed { details } => { - progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}")); - } - HarmonyComposerEvent::DeploymentStarted { target, profile } => { - progress_tracker.add_section( - PROGRESS_DEPLOYMENT, - &format!( - "\n{} Deploying project on target '{target}' with profile '{profile}'...\n", - harmony_cli::theme::EMOJI_DEPLOY, - ), - ); - } - HarmonyComposerEvent::DeploymentCompleted => { - progress_tracker.clear(); - } - HarmonyComposerEvent::DeploymentFailed { details } => { - progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", ""); - progress_tracker.fail_task("deployment-failed", &details); - }, - HarmonyComposerEvent::Shutdown => { - return false; - } - } - true + move |event| match event { + HarmonyComposerEvent::HarmonyComposerStarted => {} + HarmonyComposerEvent::ProjectInitializationStarted => { + progress_tracker.add_section( + SETUP_SECTION, + &format!( + "{} Initializing Harmony project...", + harmony_cli::theme::EMOJI_HARMONY, + ), + ); } + HarmonyComposerEvent::ProjectInitialized => {} + HarmonyComposerEvent::ProjectCompilationStarted { details } => { + progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, details); + } + HarmonyComposerEvent::ProjectCompiled => { + progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); + } + HarmonyComposerEvent::ProjectCompilationFailed { details } => { + progress_tracker.fail_task( + COMPILTATION_TASK, + &format!("failed to compile project:\n{details}"), + ); + } + HarmonyComposerEvent::DeploymentStarted { target, profile } => { + progress_tracker.add_section( + PROGRESS_DEPLOYMENT, + &format!( + "\n{} Deploying project on target '{target}' with profile '{profile}'...\n", + harmony_cli::theme::EMOJI_DEPLOY, + ), + ); + } + HarmonyComposerEvent::DeploymentCompleted => { + progress_tracker.clear(); + } + HarmonyComposerEvent::DeploymentFailed { details } => { + progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", ""); + progress_tracker.fail_task("deployment-failed", details); + } + HarmonyComposerEvent::Shutdown => {} } }) - .await } diff --git a/harmony_composer/src/instrumentation.rs b/harmony_composer/src/instrumentation.rs index 6f2fa01..b9164b7 100644 --- a/harmony_composer/src/instrumentation.rs +++ b/harmony_composer/src/instrumentation.rs @@ -1,6 +1,5 @@ -use log::debug; use once_cell::sync::Lazy; -use tokio::sync::broadcast; +use std::{collections::HashMap, sync::Mutex}; use crate::{HarmonyProfile, HarmonyTarget}; @@ -27,48 +26,43 @@ pub enum HarmonyComposerEvent { Shutdown, } -static HARMONY_COMPOSER_EVENT_BUS: Lazy> = - Lazy::new(|| { - // TODO: Adjust channel capacity - let (tx, _rx) = broadcast::channel(16); - tx - }); +type Subscriber = Box; -pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { - #[cfg(not(test))] - { - match HARMONY_COMPOSER_EVENT_BUS.send(event) { - Ok(_) => Ok(()), - Err(_) => Err("send error: no subscribers"), - } - } +static SUBSCRIBERS: Lazy>> = + Lazy::new(|| Mutex::new(HashMap::new())); - #[cfg(test)] - { - let _ = event; // Suppress the "unused variable" warning for `event` - Ok(()) - } -} - -pub async fn subscribe(name: &str, mut handler: F) +/// Subscribes a listener to all instrumentation events. +/// +/// Simply provide a unique name and a closure to run when an event happens. +/// +/// # Example +/// ``` +/// instrumentation::subscribe("my_logger", |event| { +/// println!("Event occurred: {:?}", event); +/// }); +/// ``` +pub fn subscribe(name: &str, callback: F) where - F: FnMut(HarmonyComposerEvent) -> Fut + Send + 'static, - Fut: Future + Send, + F: Fn(&HarmonyComposerEvent) + Send + Sync + 'static, { - let mut rx = HARMONY_COMPOSER_EVENT_BUS.subscribe(); - debug!("[{name}] Service started. Listening for events..."); - loop { - match rx.recv().await { - Ok(event) => { - if !handler(event).await { - debug!("[{name}] Handler requested exit."); - break; - } - } - Err(broadcast::error::RecvError::Lagged(n)) => { - debug!("[{name}] Lagged behind by {n} messages."); - } - Err(_) => break, - } - } + let mut subs = SUBSCRIBERS.lock().unwrap(); + subs.insert(name.to_string(), Box::new(callback)); +} + +/// Instruments an event, notifying all subscribers. +/// +/// This will call every closure that was registered with `subscribe`. +/// +/// # Example +/// ``` +/// instrumentation::instrument(HarmonyEvent::HarmonyStarted); +/// ``` +pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { + let subs = SUBSCRIBERS.lock().unwrap(); + + for callback in subs.values() { + callback(&event); + } + + Ok(()) } diff --git a/harmony_composer/src/main.rs b/harmony_composer/src/main.rs index f0a8513..4119460 100644 --- a/harmony_composer/src/main.rs +++ b/harmony_composer/src/main.rs @@ -99,7 +99,7 @@ impl std::fmt::Display for HarmonyProfile { #[tokio::main] async fn main() { - let hc_logger_handle = harmony_composer_logger::init(); + harmony_composer_logger::init(); let cli_args = GlobalArgs::parse(); let harmony_path = Path::new(&cli_args.harmony_path) @@ -199,8 +199,6 @@ async fn main() { } instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap(); - - let _ = tokio::try_join!(hc_logger_handle); } #[derive(Clone, Debug, clap::ValueEnum)] diff --git a/harmony_tui/src/lib.rs b/harmony_tui/src/lib.rs index 4fb4591..35b5dea 100644 --- a/harmony_tui/src/lib.rs +++ b/harmony_tui/src/lib.rs @@ -94,13 +94,9 @@ async fn init_instrumentation() -> tokio::task::JoinHandle<()> { } async fn handle_harmony_events() { - instrumentation::subscribe("Harmony TUI Logger", async |event| { - if let HarmonyEvent::HarmonyFinished = event { - return false; - }; - true - }) - .await; + instrumentation::subscribe("Harmony TUI Logger", |_| { + // TODO: Display events in the TUI + }); } pub struct HarmonyTUI {