Compare commits

..

No commits in common. "b85741215186f2aa94054e48bc4c8ff60a4fc2c1" and "78b80c2169805a26c80bce0b1b23c708e8e57c2c" have entirely different histories.

18 changed files with 352 additions and 385 deletions

2
Cargo.lock generated
View File

@ -1732,6 +1732,7 @@ dependencies = [
name = "example-pxe" name = "example-pxe"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"askama",
"cidr", "cidr",
"env_logger", "env_logger",
"harmony", "harmony",
@ -2168,7 +2169,6 @@ dependencies = [
name = "harmony" name = "harmony"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"askama",
"async-trait", "async-trait",
"base64 0.22.1", "base64 0.22.1",
"bollard", "bollard",

View File

@ -18,4 +18,5 @@ harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true } log = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
url = { workspace = true } url = { workspace = true }
askama = "0.14.0"
serde.workspace = true serde.workspace = true

View File

@ -1,24 +1,97 @@
mod topology; mod topology;
use std::net::IpAddr;
use askama::Template;
use harmony::{
data::{FileContent, FilePath},
modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore},
score::Score,
topology::{HAClusterTopology, Url},
};
use crate::topology::{get_inventory, get_topology}; use crate::topology::{get_inventory, get_topology};
use harmony::modules::okd::ipxe::OkdIpxeScore;
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let inventory = get_inventory(); let inventory = get_inventory();
let topology = get_topology().await; let topology = get_topology().await;
let gateway_ip = &topology.router.get_gateway();
let kickstart_filename = "inventory.kickstart".to_string(); let kickstart_filename = "inventory.kickstart";
let cluster_pubkey_filename = "cluster_ssh_key.pub".to_string(); let cluster_pubkey_filename = "cluster_ssh_key.pub";
let harmony_inventory_agent = "harmony_inventory_agent".to_string(); let harmony_inventory_agent = "harmony_inventory_agent";
let ipxe_score = OkdIpxeScore { // TODO: this should be a single IPXEScore instead of having the user do this step by step
kickstart_filename, let scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![
harmony_inventory_agent, Box::new(DhcpScore {
cluster_pubkey_filename, host_binding: vec![],
}; next_server: Some(topology.router.get_gateway()),
boot_filename: None,
filename: Some("undionly.kpxe".to_string()),
filename64: Some("ipxe.efi".to_string()),
filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()),
}),
Box::new(TftpScore {
files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()),
}),
Box::new(StaticFilesHttpScore {
// TODO The current russh based copy is way too slow, check for a lib update or use scp
// when available
//
// For now just run :
// scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
//
folder_to_serve: None,
// folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
files: vec![
FileContent {
path: FilePath::Relative("boot.ipxe".to_string()),
content: BootIpxeTpl { gateway_ip }.to_string(),
},
FileContent {
path: FilePath::Relative(kickstart_filename.to_string()),
content: InventoryKickstartTpl {
gateway_ip,
harmony_inventory_agent,
cluster_pubkey_filename,
}
.to_string(),
},
FileContent {
path: FilePath::Relative("fallback.ipxe".to_string()),
content: FallbackIpxeTpl {
gateway_ip,
kickstart_filename,
}
.to_string(),
},
],
}),
];
harmony_cli::run(inventory, topology, vec![Box::new(ipxe_score)], None) harmony_cli::run(inventory, topology, scores, None)
.await .await
.unwrap(); .unwrap();
} }
#[derive(Template)]
#[template(path = "boot.ipxe.j2")]
struct BootIpxeTpl<'a> {
gateway_ip: &'a IpAddr,
}
#[derive(Template)]
#[template(path = "fallback.ipxe.j2")]
struct FallbackIpxeTpl<'a> {
gateway_ip: &'a IpAddr,
kickstart_filename: &'a str,
}
#[derive(Template)]
#[template(path = "inventory.kickstart.j2")]
struct InventoryKickstartTpl<'a> {
gateway_ip: &'a IpAddr,
cluster_pubkey_filename: &'a str,
harmony_inventory_agent: &'a str,
}

View File

@ -1,3 +1,5 @@
use std::{net::IpAddr, sync::Arc};
use cidr::Ipv4Cidr; use cidr::Ipv4Cidr;
use harmony::{ use harmony::{
hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup}, hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
@ -8,7 +10,6 @@ use harmony::{
use harmony_macros::{ip, ipv4}; use harmony_macros::{ip, ipv4};
use harmony_secret::{Secret, SecretManager}; use harmony_secret::{Secret, SecretManager};
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::{net::IpAddr, sync::Arc};
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)] #[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
struct OPNSenseFirewallConfig { struct OPNSenseFirewallConfig {

View File

@ -69,7 +69,6 @@ base64.workspace = true
once_cell = "1.21.3" once_cell = "1.21.3"
harmony_inventory_agent = { path = "../harmony_inventory_agent" } harmony_inventory_agent = { path = "../harmony_inventory_agent" }
harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" } harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" }
askama = "0.14.0"
[dev-dependencies] [dev-dependencies]
pretty_assertions.workspace = true pretty_assertions.workspace = true

View File

@ -1,5 +1,6 @@
use log::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{collections::HashMap, sync::Mutex}; use tokio::sync::broadcast;
use crate::modules::application::ApplicationFeatureStatus; use crate::modules::application::ApplicationFeatureStatus;
@ -39,43 +40,43 @@ pub enum HarmonyEvent {
}, },
} }
type Subscriber = Box<dyn Fn(&HarmonyEvent) + Send + Sync>; static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
// TODO: Adjust channel capacity
let (tx, _rx) = broadcast::channel(100);
tx
});
static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
/// Subscribes a listener to all instrumentation events.
///
/// Simply provide a unique name and a closure to run when an event happens.
///
/// # Example
/// ```
/// instrumentation::subscribe("my_logger", |event| {
/// println!("Event occurred: {:?}", event);
/// });
/// ```
pub fn subscribe<F>(name: &str, callback: F)
where
F: Fn(&HarmonyEvent) + Send + Sync + 'static,
{
let mut subs = SUBSCRIBERS.lock().unwrap();
subs.insert(name.to_string(), Box::new(callback));
}
/// Instruments an event, notifying all subscribers.
///
/// This will call every closure that was registered with `subscribe`.
///
/// # Example
/// ```
/// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
/// ```
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
let subs = SUBSCRIBERS.lock().unwrap(); if cfg!(any(test, feature = "testing")) {
let _ = event; // Suppress the "unused variable" warning for `event`
for callback in subs.values() { Ok(())
callback(&event); } else {
match HARMONY_EVENT_BUS.send(event) {
Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"),
}
}
}
pub async fn subscribe<F, Fut>(name: &str, mut handler: F)
where
F: FnMut(HarmonyEvent) -> Fut + Send + 'static,
Fut: Future<Output = bool> + Send,
{
let mut rx = HARMONY_EVENT_BUS.subscribe();
debug!("[{name}] Service started. Listening for events...");
loop {
match rx.recv().await {
Ok(event) => {
if !handler(event).await {
debug!("[{name}] Handler requested exit.");
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("[{name}] Lagged behind by {n} messages.");
}
Err(_) => break,
}
} }
Ok(())
} }

View File

@ -74,7 +74,6 @@ impl<T: Topology> Maestro<T> {
fn is_topology_initialized(&self) -> bool { fn is_topology_initialized(&self) -> bool {
self.topology_state.status == TopologyStatus::Success self.topology_state.status == TopologyStatus::Success
|| self.topology_state.status == TopologyStatus::Noop
} }
pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> { pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> {

View File

@ -1,148 +0,0 @@
use askama::Template;
use async_trait::async_trait;
use derive_new::new;
use serde::Serialize;
use std::net::IpAddr;
use crate::{
data::{FileContent, FilePath, Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore},
score::Score,
topology::{DhcpServer, HttpServer, Router, TftpServer, Topology, Url},
};
#[derive(Debug, new, Clone, Serialize)]
pub struct OkdIpxeScore {
pub kickstart_filename: String,
pub harmony_inventory_agent: String,
pub cluster_pubkey_filename: String,
}
impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Score<T> for OkdIpxeScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(IpxeInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OkdIpxeScore".to_string()
}
}
#[derive(Debug, new, Clone)]
pub struct IpxeInterpret {
score: OkdIpxeScore,
}
#[async_trait]
impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Interpret<T> for IpxeInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let gateway_ip = topology.get_gateway();
let scores: Vec<Box<dyn Score<T>>> = vec![
Box::new(DhcpScore {
host_binding: vec![],
next_server: Some(topology.get_gateway()),
boot_filename: None,
filename: Some("undionly.kpxe".to_string()),
filename64: Some("ipxe.efi".to_string()),
filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()),
}),
Box::new(TftpScore {
files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()),
}),
Box::new(StaticFilesHttpScore {
// TODO The current russh based copy is way too slow, check for a lib update or use scp
// when available
//
// For now just run :
// scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
//
folder_to_serve: None,
// folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
files: vec![
FileContent {
path: FilePath::Relative("boot.ipxe".to_string()),
content: BootIpxeTpl {
gateway_ip: &gateway_ip,
}
.to_string(),
},
FileContent {
path: FilePath::Relative(self.score.kickstart_filename.clone()),
content: InventoryKickstartTpl {
gateway_ip: &gateway_ip,
harmony_inventory_agent: &self.score.harmony_inventory_agent,
cluster_pubkey_filename: &self.score.cluster_pubkey_filename,
}
.to_string(),
},
FileContent {
path: FilePath::Relative("fallback.ipxe".to_string()),
content: FallbackIpxeTpl {
gateway_ip: &gateway_ip,
kickstart_filename: &self.score.kickstart_filename,
}
.to_string(),
},
],
}),
];
for score in scores {
let result = score.interpret(inventory, topology).await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => continue,
InterpretStatus::NOOP => continue,
_ => return Err(InterpretError::new(outcome.message)),
},
Err(e) => return Err(e),
};
}
Ok(Outcome::success("Ipxe installed".to_string()))
}
fn get_name(&self) -> InterpretName {
InterpretName::Ipxe
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
#[derive(Template)]
#[template(path = "boot.ipxe.j2")]
struct BootIpxeTpl<'a> {
gateway_ip: &'a IpAddr,
}
#[derive(Template)]
#[template(path = "fallback.ipxe.j2")]
struct FallbackIpxeTpl<'a> {
gateway_ip: &'a IpAddr,
kickstart_filename: &'a str,
}
#[derive(Template)]
#[template(path = "inventory.kickstart.j2")]
struct InventoryKickstartTpl<'a> {
gateway_ip: &'a IpAddr,
cluster_pubkey_filename: &'a str,
harmony_inventory_agent: &'a str,
}

View File

@ -2,6 +2,5 @@ pub mod bootstrap_dhcp;
pub mod bootstrap_load_balancer; pub mod bootstrap_load_balancer;
pub mod dhcp; pub mod dhcp;
pub mod dns; pub mod dns;
pub mod ipxe;
pub mod load_balancer; pub mod load_balancer;
pub mod upgrade; pub mod upgrade;

View File

@ -7,11 +7,19 @@ use harmony::{
}; };
use log::{error, info, log_enabled}; use log::{error, info, log_enabled};
use std::io::Write; use std::io::Write;
use std::sync::Mutex; use std::sync::{Arc, Mutex};
pub fn init() { pub fn init() -> tokio::task::JoinHandle<()> {
configure_logger(); configure_logger();
handle_events(); let handle = tokio::spawn(handle_events());
loop {
if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
break;
}
}
handle
} }
fn configure_logger() { fn configure_logger() {
@ -78,114 +86,119 @@ fn configure_logger() {
.init(); .init();
} }
fn handle_events() { async fn handle_events() {
let preparing_topology = Mutex::new(false); let preparing_topology = Arc::new(Mutex::new(false));
let current_score: Mutex<Option<String>> = Mutex::new(None); let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
instrumentation::subscribe("Harmony CLI Logger", { instrumentation::subscribe("Harmony CLI Logger", {
move |event| { move |event| {
let mut preparing_topology = preparing_topology.lock().unwrap(); let preparing_topology = Arc::clone(&preparing_topology);
let mut current_score = current_score.lock().unwrap(); let current_score = Arc::clone(&current_score);
match event { async move {
HarmonyEvent::HarmonyStarted => {} let mut preparing_topology = preparing_topology.lock().unwrap();
HarmonyEvent::HarmonyFinished => { let mut current_score = current_score.lock().unwrap();
let emoji = crate::theme::EMOJI_HARMONY.to_string();
info!(emoji = emoji.as_str(); "Harmony completed");
}
HarmonyEvent::TopologyStateChanged {
topology,
status,
message,
} => match status {
TopologyStatus::Queued => {}
TopologyStatus::Preparing => {
let emoji = format!(
"{}",
style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow()
);
info!(emoji = emoji.as_str(); "Preparing environment: {topology}...");
(*preparing_topology) = true;
}
TopologyStatus::Success => {
(*preparing_topology) = false;
if let Some(message) = message {
info!(status = "finished"; "{message}");
}
}
TopologyStatus::Noop => {
(*preparing_topology) = false;
if let Some(message) = message {
info!(status = "skipped"; "{message}");
}
}
TopologyStatus::Error => {
(*preparing_topology) = false;
if let Some(message) = message {
error!(status = "failed"; "{message}");
}
}
},
HarmonyEvent::InterpretExecutionStarted {
execution_id: _,
topology: _,
interpret: _,
score,
message,
} => {
if *preparing_topology || current_score.is_some() {
info!("{message}");
} else {
(*current_score) = Some(score.clone());
let emoji = format!("{}", style(crate::theme::EMOJI_SCORE).blue());
info!(emoji = emoji.as_str(); "Interpreting score: {score}...");
}
}
HarmonyEvent::InterpretExecutionFinished {
execution_id: _,
topology: _,
interpret: _,
score,
outcome,
} => {
if current_score.is_some() && &current_score.clone().unwrap() == score {
(*current_score) = None;
}
match outcome { match event {
Ok(outcome) => match outcome.status { HarmonyEvent::HarmonyStarted => {}
harmony::interpret::InterpretStatus::SUCCESS => { HarmonyEvent::HarmonyFinished => {
info!(status = "finished"; "{}", outcome.message); let emoji = crate::theme::EMOJI_HARMONY.to_string();
info!(emoji = emoji.as_str(); "Harmony completed");
return false;
}
HarmonyEvent::TopologyStateChanged {
topology,
status,
message,
} => match status {
TopologyStatus::Queued => {}
TopologyStatus::Preparing => {
let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow());
info!(emoji = emoji.as_str(); "Preparing environment: {topology}...");
(*preparing_topology) = true;
}
TopologyStatus::Success => {
(*preparing_topology) = false;
if let Some(message) = message {
info!(status = "finished"; "{message}");
} }
harmony::interpret::InterpretStatus::NOOP => { }
info!(status = "skipped"; "{}", outcome.message); TopologyStatus::Noop => {
(*preparing_topology) = false;
if let Some(message) = message {
info!(status = "skipped"; "{message}");
} }
_ => { }
error!(status = "failed"; "{}", outcome.message); TopologyStatus::Error => {
(*preparing_topology) = false;
if let Some(message) = message {
error!(status = "failed"; "{message}");
} }
}, }
Err(err) => { },
error!(status = "failed"; "{err}"); HarmonyEvent::InterpretExecutionStarted {
execution_id: _,
topology: _,
interpret: _,
score,
message,
} => {
if *preparing_topology || current_score.is_some() {
info!("{message}");
} else {
(*current_score) = Some(score.clone());
let emoji = format!("{}", style(crate::theme::EMOJI_SCORE).blue());
info!(emoji = emoji.as_str(); "Interpreting score: {score}...");
} }
} }
HarmonyEvent::InterpretExecutionFinished {
execution_id: _,
topology: _,
interpret: _,
score,
outcome,
} => {
if current_score.is_some() && current_score.clone().unwrap() == score {
(*current_score) = None;
}
match outcome {
Ok(outcome) => match outcome.status {
harmony::interpret::InterpretStatus::SUCCESS => {
info!(status = "finished"; "{}", outcome.message);
}
harmony::interpret::InterpretStatus::NOOP => {
info!(status = "skipped"; "{}", outcome.message);
}
_ => {
error!(status = "failed"; "{}", outcome.message);
}
},
Err(err) => {
error!(status = "failed"; "{}", err);
}
}
}
HarmonyEvent::ApplicationFeatureStateChanged {
topology: _,
application,
feature,
status,
} => match status {
ApplicationFeatureStatus::Installing => {
info!("Installing feature '{}' for '{}'...", feature, application);
}
ApplicationFeatureStatus::Installed => {
info!(status = "finished"; "Feature '{}' installed", feature);
}
ApplicationFeatureStatus::Failed { details } => {
error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details);
}
},
} }
HarmonyEvent::ApplicationFeatureStateChanged { true
topology: _,
application,
feature,
status,
} => match status {
ApplicationFeatureStatus::Installing => {
info!("Installing feature '{feature}' for '{application}'...");
}
ApplicationFeatureStatus::Installed => {
info!(status = "finished"; "Feature '{feature}' installed");
}
ApplicationFeatureStatus::Failed { details } => {
error!(status = "failed"; "Feature '{feature}' installation failed: {details}");
}
},
} }
} }
}); })
.await;
} }

View File

@ -115,7 +115,7 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>(
scores: Vec<Box<dyn Score<T>>>, scores: Vec<Box<dyn Score<T>>>,
args: Args, args: Args,
) -> Result<(), Box<dyn std::error::Error>> { ) -> Result<(), Box<dyn std::error::Error>> {
cli_logger::init(); let cli_logger_handle = cli_logger::init();
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap(); let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
maestro.register_all(scores); maestro.register_all(scores);
@ -123,6 +123,7 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>(
let result = init(maestro, args).await; let result = init(maestro, args).await;
instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap(); instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
let _ = tokio::try_join!(cli_logger_handle);
result result
} }

View File

@ -1,66 +1,82 @@
use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker};
use indicatif::MultiProgress; use indicatif::MultiProgress;
use std::sync::Arc;
use crate::instrumentation::{self, HarmonyComposerEvent}; use crate::instrumentation::{self, HarmonyComposerEvent};
pub fn init() { pub fn init() -> tokio::task::JoinHandle<()> {
configure_logger(); configure_logger();
handle_events(); let handle = tokio::spawn(handle_events());
loop {
if instrumentation::instrument(HarmonyComposerEvent::HarmonyComposerStarted).is_ok() {
break;
}
}
handle
} }
fn configure_logger() { fn configure_logger() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
} }
pub fn handle_events() { pub async fn handle_events() {
let progress_tracker = IndicatifProgressTracker::new(MultiProgress::new()); let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new()));
const SETUP_SECTION: &str = "project-initialization"; const SETUP_SECTION: &str = "project-initialization";
const COMPILTATION_TASK: &str = "compilation"; const COMPILTATION_TASK: &str = "compilation";
const PROGRESS_DEPLOYMENT: &str = "deployment"; const PROGRESS_DEPLOYMENT: &str = "deployment";
instrumentation::subscribe("Harmony Composer Logger", { instrumentation::subscribe("Harmony Composer Logger", {
move |event| match event { move |event| {
HarmonyComposerEvent::HarmonyComposerStarted => {} let progress_tracker = Arc::clone(&progress_tracker);
HarmonyComposerEvent::ProjectInitializationStarted => {
progress_tracker.add_section( async move {
SETUP_SECTION, match event {
&format!( HarmonyComposerEvent::HarmonyComposerStarted => {}
"{} Initializing Harmony project...", HarmonyComposerEvent::ProjectInitializationStarted => {
harmony_cli::theme::EMOJI_HARMONY, progress_tracker.add_section(
), SETUP_SECTION,
); &format!(
"{} Initializing Harmony project...",
harmony_cli::theme::EMOJI_HARMONY,
),
);
}
HarmonyComposerEvent::ProjectInitialized => {}
HarmonyComposerEvent::ProjectCompilationStarted { details } => {
progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details);
}
HarmonyComposerEvent::ProjectCompiled => {
progress_tracker.finish_task(COMPILTATION_TASK, "project compiled");
}
HarmonyComposerEvent::ProjectCompilationFailed { details } => {
progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}"));
}
HarmonyComposerEvent::DeploymentStarted { target, profile } => {
progress_tracker.add_section(
PROGRESS_DEPLOYMENT,
&format!(
"\n{} Deploying project on target '{target}' with profile '{profile}'...\n",
harmony_cli::theme::EMOJI_DEPLOY,
),
);
}
HarmonyComposerEvent::DeploymentCompleted => {
progress_tracker.clear();
}
HarmonyComposerEvent::DeploymentFailed { details } => {
progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", "");
progress_tracker.fail_task("deployment-failed", &details);
},
HarmonyComposerEvent::Shutdown => {
return false;
}
}
true
} }
HarmonyComposerEvent::ProjectInitialized => {}
HarmonyComposerEvent::ProjectCompilationStarted { details } => {
progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, details);
}
HarmonyComposerEvent::ProjectCompiled => {
progress_tracker.finish_task(COMPILTATION_TASK, "project compiled");
}
HarmonyComposerEvent::ProjectCompilationFailed { details } => {
progress_tracker.fail_task(
COMPILTATION_TASK,
&format!("failed to compile project:\n{details}"),
);
}
HarmonyComposerEvent::DeploymentStarted { target, profile } => {
progress_tracker.add_section(
PROGRESS_DEPLOYMENT,
&format!(
"\n{} Deploying project on target '{target}' with profile '{profile}'...\n",
harmony_cli::theme::EMOJI_DEPLOY,
),
);
}
HarmonyComposerEvent::DeploymentCompleted => {
progress_tracker.clear();
}
HarmonyComposerEvent::DeploymentFailed { details } => {
progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", "");
progress_tracker.fail_task("deployment-failed", details);
}
HarmonyComposerEvent::Shutdown => {}
} }
}) })
.await
} }

View File

@ -1,5 +1,6 @@
use log::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use std::{collections::HashMap, sync::Mutex}; use tokio::sync::broadcast;
use crate::{HarmonyProfile, HarmonyTarget}; use crate::{HarmonyProfile, HarmonyTarget};
@ -26,43 +27,48 @@ pub enum HarmonyComposerEvent {
Shutdown, Shutdown,
} }
type Subscriber = Box<dyn Fn(&HarmonyComposerEvent) + Send + Sync>; static HARMONY_COMPOSER_EVENT_BUS: Lazy<broadcast::Sender<HarmonyComposerEvent>> =
Lazy::new(|| {
// TODO: Adjust channel capacity
let (tx, _rx) = broadcast::channel(16);
tx
});
static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> =
Lazy::new(|| Mutex::new(HashMap::new()));
/// Subscribes a listener to all instrumentation events.
///
/// Simply provide a unique name and a closure to run when an event happens.
///
/// # Example
/// ```
/// instrumentation::subscribe("my_logger", |event| {
/// println!("Event occurred: {:?}", event);
/// });
/// ```
pub fn subscribe<F>(name: &str, callback: F)
where
F: Fn(&HarmonyComposerEvent) + Send + Sync + 'static,
{
let mut subs = SUBSCRIBERS.lock().unwrap();
subs.insert(name.to_string(), Box::new(callback));
}
/// Instruments an event, notifying all subscribers.
///
/// This will call every closure that was registered with `subscribe`.
///
/// # Example
/// ```
/// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
/// ```
pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> {
let subs = SUBSCRIBERS.lock().unwrap(); #[cfg(not(test))]
{
for callback in subs.values() { match HARMONY_COMPOSER_EVENT_BUS.send(event) {
callback(&event); Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"),
}
} }
Ok(()) #[cfg(test)]
{
let _ = event; // Suppress the "unused variable" warning for `event`
Ok(())
}
}
pub async fn subscribe<F, Fut>(name: &str, mut handler: F)
where
F: FnMut(HarmonyComposerEvent) -> Fut + Send + 'static,
Fut: Future<Output = bool> + Send,
{
let mut rx = HARMONY_COMPOSER_EVENT_BUS.subscribe();
debug!("[{name}] Service started. Listening for events...");
loop {
match rx.recv().await {
Ok(event) => {
if !handler(event).await {
debug!("[{name}] Handler requested exit.");
break;
}
}
Err(broadcast::error::RecvError::Lagged(n)) => {
debug!("[{name}] Lagged behind by {n} messages.");
}
Err(_) => break,
}
}
} }

View File

@ -99,7 +99,7 @@ impl std::fmt::Display for HarmonyProfile {
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
harmony_composer_logger::init(); let hc_logger_handle = harmony_composer_logger::init();
let cli_args = GlobalArgs::parse(); let cli_args = GlobalArgs::parse();
let harmony_path = Path::new(&cli_args.harmony_path) let harmony_path = Path::new(&cli_args.harmony_path)
@ -199,6 +199,8 @@ async fn main() {
} }
instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap(); instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap();
let _ = tokio::try_join!(hc_logger_handle);
} }
#[derive(Clone, Debug, clap::ValueEnum)] #[derive(Clone, Debug, clap::ValueEnum)]

View File

@ -94,9 +94,13 @@ async fn init_instrumentation() -> tokio::task::JoinHandle<()> {
} }
async fn handle_harmony_events() { async fn handle_harmony_events() {
instrumentation::subscribe("Harmony TUI Logger", |_| { instrumentation::subscribe("Harmony TUI Logger", async |event| {
// TODO: Display events in the TUI if let HarmonyEvent::HarmonyFinished = event {
}); return false;
};
true
})
.await;
} }
pub struct HarmonyTUI<T: Topology> { pub struct HarmonyTUI<T: Topology> {