make instrumentation sync instead of async to avoid concurrency issues
This commit is contained in:
@@ -1,6 +1,5 @@
|
||||
use log::debug;
|
||||
use once_cell::sync::Lazy;
|
||||
use tokio::sync::broadcast;
|
||||
use std::{collections::HashMap, sync::Mutex};
|
||||
|
||||
use crate::modules::application::ApplicationFeatureStatus;
|
||||
|
||||
@@ -40,43 +39,43 @@ pub enum HarmonyEvent {
|
||||
},
|
||||
}
|
||||
|
||||
static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
|
||||
// TODO: Adjust channel capacity
|
||||
let (tx, _rx) = broadcast::channel(100);
|
||||
tx
|
||||
});
|
||||
type Subscriber = Box<dyn Fn(&HarmonyEvent) + Send + Sync>;
|
||||
|
||||
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
|
||||
if cfg!(any(test, feature = "testing")) {
|
||||
let _ = event; // Suppress the "unused variable" warning for `event`
|
||||
Ok(())
|
||||
} else {
|
||||
match HARMONY_EVENT_BUS.send(event) {
|
||||
Ok(_) => Ok(()),
|
||||
Err(_) => Err("send error: no subscribers"),
|
||||
}
|
||||
}
|
||||
}
|
||||
static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> =
|
||||
Lazy::new(|| Mutex::new(HashMap::new()));
|
||||
|
||||
pub async fn subscribe<F, Fut>(name: &str, mut handler: F)
|
||||
/// Subscribes a listener to all instrumentation events.
|
||||
///
|
||||
/// Simply provide a unique name and a closure to run when an event happens.
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// instrumentation::subscribe("my_logger", |event| {
|
||||
/// println!("Event occurred: {:?}", event);
|
||||
/// });
|
||||
/// ```
|
||||
pub fn subscribe<F>(name: &str, callback: F)
|
||||
where
|
||||
F: FnMut(HarmonyEvent) -> Fut + Send + 'static,
|
||||
Fut: Future<Output = bool> + Send,
|
||||
F: Fn(&HarmonyEvent) + Send + Sync + 'static,
|
||||
{
|
||||
let mut rx = HARMONY_EVENT_BUS.subscribe();
|
||||
debug!("[{name}] Service started. Listening for events...");
|
||||
loop {
|
||||
match rx.recv().await {
|
||||
Ok(event) => {
|
||||
if !handler(event).await {
|
||||
debug!("[{name}] Handler requested exit.");
|
||||
break;
|
||||
}
|
||||
}
|
||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
||||
debug!("[{name}] Lagged behind by {n} messages.");
|
||||
}
|
||||
Err(_) => break,
|
||||
}
|
||||
}
|
||||
let mut subs = SUBSCRIBERS.lock().unwrap();
|
||||
subs.insert(name.to_string(), Box::new(callback));
|
||||
}
|
||||
|
||||
/// Instruments an event, notifying all subscribers.
|
||||
///
|
||||
/// This will call every closure that was registered with `subscribe`.
|
||||
///
|
||||
/// # Example
|
||||
/// ```
|
||||
/// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
|
||||
/// ```
|
||||
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
|
||||
let subs = SUBSCRIBERS.lock().unwrap();
|
||||
|
||||
for callback in subs.values() {
|
||||
callback(&event);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -74,6 +74,7 @@ impl<T: Topology> Maestro<T> {
|
||||
|
||||
fn is_topology_initialized(&self) -> bool {
|
||||
self.topology_state.status == TopologyStatus::Success
|
||||
|| self.topology_state.status == TopologyStatus::Noop
|
||||
}
|
||||
|
||||
pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> {
|
||||
|
||||
Reference in New Issue
Block a user