Compare commits
2 Commits
78b80c2169
...
b857412151
| Author | SHA1 | Date | |
|---|---|---|---|
| b857412151 | |||
| 7bb3602ab8 |
2
Cargo.lock
generated
2
Cargo.lock
generated
@ -1732,7 +1732,6 @@ dependencies = [
|
|||||||
name = "example-pxe"
|
name = "example-pxe"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
"askama",
|
|
||||||
"cidr",
|
"cidr",
|
||||||
"env_logger",
|
"env_logger",
|
||||||
"harmony",
|
"harmony",
|
||||||
@ -2169,6 +2168,7 @@ dependencies = [
|
|||||||
name = "harmony"
|
name = "harmony"
|
||||||
version = "0.1.0"
|
version = "0.1.0"
|
||||||
dependencies = [
|
dependencies = [
|
||||||
|
"askama",
|
||||||
"async-trait",
|
"async-trait",
|
||||||
"base64 0.22.1",
|
"base64 0.22.1",
|
||||||
"bollard",
|
"bollard",
|
||||||
|
|||||||
@ -18,5 +18,4 @@ harmony_macros = { path = "../../harmony_macros" }
|
|||||||
log = { workspace = true }
|
log = { workspace = true }
|
||||||
env_logger = { workspace = true }
|
env_logger = { workspace = true }
|
||||||
url = { workspace = true }
|
url = { workspace = true }
|
||||||
askama = "0.14.0"
|
|
||||||
serde.workspace = true
|
serde.workspace = true
|
||||||
|
|||||||
@ -1,97 +1,24 @@
|
|||||||
mod topology;
|
mod topology;
|
||||||
|
|
||||||
use std::net::IpAddr;
|
|
||||||
|
|
||||||
use askama::Template;
|
|
||||||
use harmony::{
|
|
||||||
data::{FileContent, FilePath},
|
|
||||||
modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore},
|
|
||||||
score::Score,
|
|
||||||
topology::{HAClusterTopology, Url},
|
|
||||||
};
|
|
||||||
|
|
||||||
use crate::topology::{get_inventory, get_topology};
|
use crate::topology::{get_inventory, get_topology};
|
||||||
|
use harmony::modules::okd::ipxe::OkdIpxeScore;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let inventory = get_inventory();
|
let inventory = get_inventory();
|
||||||
let topology = get_topology().await;
|
let topology = get_topology().await;
|
||||||
let gateway_ip = &topology.router.get_gateway();
|
|
||||||
|
|
||||||
let kickstart_filename = "inventory.kickstart";
|
let kickstart_filename = "inventory.kickstart".to_string();
|
||||||
let cluster_pubkey_filename = "cluster_ssh_key.pub";
|
let cluster_pubkey_filename = "cluster_ssh_key.pub".to_string();
|
||||||
let harmony_inventory_agent = "harmony_inventory_agent";
|
let harmony_inventory_agent = "harmony_inventory_agent".to_string();
|
||||||
|
|
||||||
// TODO: this should be a single IPXEScore instead of having the user do this step by step
|
let ipxe_score = OkdIpxeScore {
|
||||||
let scores: Vec<Box<dyn Score<HAClusterTopology>>> = vec![
|
kickstart_filename,
|
||||||
Box::new(DhcpScore {
|
|
||||||
host_binding: vec![],
|
|
||||||
next_server: Some(topology.router.get_gateway()),
|
|
||||||
boot_filename: None,
|
|
||||||
filename: Some("undionly.kpxe".to_string()),
|
|
||||||
filename64: Some("ipxe.efi".to_string()),
|
|
||||||
filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()),
|
|
||||||
}),
|
|
||||||
Box::new(TftpScore {
|
|
||||||
files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()),
|
|
||||||
}),
|
|
||||||
Box::new(StaticFilesHttpScore {
|
|
||||||
// TODO The current russh based copy is way too slow, check for a lib update or use scp
|
|
||||||
// when available
|
|
||||||
//
|
|
||||||
// For now just run :
|
|
||||||
// scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
|
|
||||||
//
|
|
||||||
folder_to_serve: None,
|
|
||||||
// folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
|
|
||||||
files: vec![
|
|
||||||
FileContent {
|
|
||||||
path: FilePath::Relative("boot.ipxe".to_string()),
|
|
||||||
content: BootIpxeTpl { gateway_ip }.to_string(),
|
|
||||||
},
|
|
||||||
FileContent {
|
|
||||||
path: FilePath::Relative(kickstart_filename.to_string()),
|
|
||||||
content: InventoryKickstartTpl {
|
|
||||||
gateway_ip,
|
|
||||||
harmony_inventory_agent,
|
harmony_inventory_agent,
|
||||||
cluster_pubkey_filename,
|
cluster_pubkey_filename,
|
||||||
}
|
};
|
||||||
.to_string(),
|
|
||||||
},
|
|
||||||
FileContent {
|
|
||||||
path: FilePath::Relative("fallback.ipxe".to_string()),
|
|
||||||
content: FallbackIpxeTpl {
|
|
||||||
gateway_ip,
|
|
||||||
kickstart_filename,
|
|
||||||
}
|
|
||||||
.to_string(),
|
|
||||||
},
|
|
||||||
],
|
|
||||||
}),
|
|
||||||
];
|
|
||||||
|
|
||||||
harmony_cli::run(inventory, topology, scores, None)
|
harmony_cli::run(inventory, topology, vec![Box::new(ipxe_score)], None)
|
||||||
.await
|
.await
|
||||||
.unwrap();
|
.unwrap();
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Template)]
|
|
||||||
#[template(path = "boot.ipxe.j2")]
|
|
||||||
struct BootIpxeTpl<'a> {
|
|
||||||
gateway_ip: &'a IpAddr,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Template)]
|
|
||||||
#[template(path = "fallback.ipxe.j2")]
|
|
||||||
struct FallbackIpxeTpl<'a> {
|
|
||||||
gateway_ip: &'a IpAddr,
|
|
||||||
kickstart_filename: &'a str,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Template)]
|
|
||||||
#[template(path = "inventory.kickstart.j2")]
|
|
||||||
struct InventoryKickstartTpl<'a> {
|
|
||||||
gateway_ip: &'a IpAddr,
|
|
||||||
cluster_pubkey_filename: &'a str,
|
|
||||||
harmony_inventory_agent: &'a str,
|
|
||||||
}
|
|
||||||
|
|||||||
@ -1,5 +1,3 @@
|
|||||||
use std::{net::IpAddr, sync::Arc};
|
|
||||||
|
|
||||||
use cidr::Ipv4Cidr;
|
use cidr::Ipv4Cidr;
|
||||||
use harmony::{
|
use harmony::{
|
||||||
hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
|
hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
|
||||||
@ -10,6 +8,7 @@ use harmony::{
|
|||||||
use harmony_macros::{ip, ipv4};
|
use harmony_macros::{ip, ipv4};
|
||||||
use harmony_secret::{Secret, SecretManager};
|
use harmony_secret::{Secret, SecretManager};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::{net::IpAddr, sync::Arc};
|
||||||
|
|
||||||
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
|
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
|
||||||
struct OPNSenseFirewallConfig {
|
struct OPNSenseFirewallConfig {
|
||||||
|
|||||||
@ -69,6 +69,7 @@ base64.workspace = true
|
|||||||
once_cell = "1.21.3"
|
once_cell = "1.21.3"
|
||||||
harmony_inventory_agent = { path = "../harmony_inventory_agent" }
|
harmony_inventory_agent = { path = "../harmony_inventory_agent" }
|
||||||
harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" }
|
harmony_secret_derive = { version = "0.1.0", path = "../harmony_secret_derive" }
|
||||||
|
askama = "0.14.0"
|
||||||
|
|
||||||
[dev-dependencies]
|
[dev-dependencies]
|
||||||
pretty_assertions.workspace = true
|
pretty_assertions.workspace = true
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
use log::debug;
|
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use tokio::sync::broadcast;
|
use std::{collections::HashMap, sync::Mutex};
|
||||||
|
|
||||||
use crate::modules::application::ApplicationFeatureStatus;
|
use crate::modules::application::ApplicationFeatureStatus;
|
||||||
|
|
||||||
@ -40,43 +39,43 @@ pub enum HarmonyEvent {
|
|||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
|
type Subscriber = Box<dyn Fn(&HarmonyEvent) + Send + Sync>;
|
||||||
// TODO: Adjust channel capacity
|
|
||||||
let (tx, _rx) = broadcast::channel(100);
|
|
||||||
tx
|
|
||||||
});
|
|
||||||
|
|
||||||
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
|
static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> =
|
||||||
if cfg!(any(test, feature = "testing")) {
|
Lazy::new(|| Mutex::new(HashMap::new()));
|
||||||
let _ = event; // Suppress the "unused variable" warning for `event`
|
|
||||||
Ok(())
|
|
||||||
} else {
|
|
||||||
match HARMONY_EVENT_BUS.send(event) {
|
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(_) => Err("send error: no subscribers"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn subscribe<F, Fut>(name: &str, mut handler: F)
|
/// Subscribes a listener to all instrumentation events.
|
||||||
|
///
|
||||||
|
/// Simply provide a unique name and a closure to run when an event happens.
|
||||||
|
///
|
||||||
|
/// # Example
|
||||||
|
/// ```
|
||||||
|
/// instrumentation::subscribe("my_logger", |event| {
|
||||||
|
/// println!("Event occurred: {:?}", event);
|
||||||
|
/// });
|
||||||
|
/// ```
|
||||||
|
pub fn subscribe<F>(name: &str, callback: F)
|
||||||
where
|
where
|
||||||
F: FnMut(HarmonyEvent) -> Fut + Send + 'static,
|
F: Fn(&HarmonyEvent) + Send + Sync + 'static,
|
||||||
Fut: Future<Output = bool> + Send,
|
|
||||||
{
|
{
|
||||||
let mut rx = HARMONY_EVENT_BUS.subscribe();
|
let mut subs = SUBSCRIBERS.lock().unwrap();
|
||||||
debug!("[{name}] Service started. Listening for events...");
|
subs.insert(name.to_string(), Box::new(callback));
|
||||||
loop {
|
}
|
||||||
match rx.recv().await {
|
|
||||||
Ok(event) => {
|
/// Instruments an event, notifying all subscribers.
|
||||||
if !handler(event).await {
|
///
|
||||||
debug!("[{name}] Handler requested exit.");
|
/// This will call every closure that was registered with `subscribe`.
|
||||||
break;
|
///
|
||||||
}
|
/// # Example
|
||||||
}
|
/// ```
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
/// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
|
||||||
debug!("[{name}] Lagged behind by {n} messages.");
|
/// ```
|
||||||
}
|
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
|
||||||
Err(_) => break,
|
let subs = SUBSCRIBERS.lock().unwrap();
|
||||||
}
|
|
||||||
}
|
for callback in subs.values() {
|
||||||
|
callback(&event);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -74,6 +74,7 @@ impl<T: Topology> Maestro<T> {
|
|||||||
|
|
||||||
fn is_topology_initialized(&self) -> bool {
|
fn is_topology_initialized(&self) -> bool {
|
||||||
self.topology_state.status == TopologyStatus::Success
|
self.topology_state.status == TopologyStatus::Success
|
||||||
|
|| self.topology_state.status == TopologyStatus::Noop
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> {
|
pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> {
|
||||||
|
|||||||
148
harmony/src/modules/okd/ipxe.rs
Normal file
148
harmony/src/modules/okd/ipxe.rs
Normal file
@ -0,0 +1,148 @@
|
|||||||
|
use askama::Template;
|
||||||
|
use async_trait::async_trait;
|
||||||
|
use derive_new::new;
|
||||||
|
use serde::Serialize;
|
||||||
|
use std::net::IpAddr;
|
||||||
|
|
||||||
|
use crate::{
|
||||||
|
data::{FileContent, FilePath, Id, Version},
|
||||||
|
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||||
|
inventory::Inventory,
|
||||||
|
modules::{dhcp::DhcpScore, http::StaticFilesHttpScore, tftp::TftpScore},
|
||||||
|
score::Score,
|
||||||
|
topology::{DhcpServer, HttpServer, Router, TftpServer, Topology, Url},
|
||||||
|
};
|
||||||
|
|
||||||
|
#[derive(Debug, new, Clone, Serialize)]
|
||||||
|
pub struct OkdIpxeScore {
|
||||||
|
pub kickstart_filename: String,
|
||||||
|
pub harmony_inventory_agent: String,
|
||||||
|
pub cluster_pubkey_filename: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Score<T> for OkdIpxeScore {
|
||||||
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
|
Box::new(IpxeInterpret::new(self.clone()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn name(&self) -> String {
|
||||||
|
"OkdIpxeScore".to_string()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, new, Clone)]
|
||||||
|
pub struct IpxeInterpret {
|
||||||
|
score: OkdIpxeScore,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
impl<T: Topology + DhcpServer + TftpServer + HttpServer + Router> Interpret<T> for IpxeInterpret {
|
||||||
|
async fn execute(
|
||||||
|
&self,
|
||||||
|
inventory: &Inventory,
|
||||||
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
let gateway_ip = topology.get_gateway();
|
||||||
|
|
||||||
|
let scores: Vec<Box<dyn Score<T>>> = vec![
|
||||||
|
Box::new(DhcpScore {
|
||||||
|
host_binding: vec![],
|
||||||
|
next_server: Some(topology.get_gateway()),
|
||||||
|
boot_filename: None,
|
||||||
|
filename: Some("undionly.kpxe".to_string()),
|
||||||
|
filename64: Some("ipxe.efi".to_string()),
|
||||||
|
filenameipxe: Some(format!("http://{gateway_ip}:8080/boot.ipxe").to_string()),
|
||||||
|
}),
|
||||||
|
Box::new(TftpScore {
|
||||||
|
files_to_serve: Url::LocalFolder("./data/pxe/okd/tftpboot/".to_string()),
|
||||||
|
}),
|
||||||
|
Box::new(StaticFilesHttpScore {
|
||||||
|
// TODO The current russh based copy is way too slow, check for a lib update or use scp
|
||||||
|
// when available
|
||||||
|
//
|
||||||
|
// For now just run :
|
||||||
|
// scp -r data/pxe/okd/http_files/* root@192.168.1.1:/usr/local/http/
|
||||||
|
//
|
||||||
|
folder_to_serve: None,
|
||||||
|
// folder_to_serve: Some(Url::LocalFolder("./data/pxe/okd/http_files/".to_string())),
|
||||||
|
files: vec![
|
||||||
|
FileContent {
|
||||||
|
path: FilePath::Relative("boot.ipxe".to_string()),
|
||||||
|
content: BootIpxeTpl {
|
||||||
|
gateway_ip: &gateway_ip,
|
||||||
|
}
|
||||||
|
.to_string(),
|
||||||
|
},
|
||||||
|
FileContent {
|
||||||
|
path: FilePath::Relative(self.score.kickstart_filename.clone()),
|
||||||
|
content: InventoryKickstartTpl {
|
||||||
|
gateway_ip: &gateway_ip,
|
||||||
|
harmony_inventory_agent: &self.score.harmony_inventory_agent,
|
||||||
|
cluster_pubkey_filename: &self.score.cluster_pubkey_filename,
|
||||||
|
}
|
||||||
|
.to_string(),
|
||||||
|
},
|
||||||
|
FileContent {
|
||||||
|
path: FilePath::Relative("fallback.ipxe".to_string()),
|
||||||
|
content: FallbackIpxeTpl {
|
||||||
|
gateway_ip: &gateway_ip,
|
||||||
|
kickstart_filename: &self.score.kickstart_filename,
|
||||||
|
}
|
||||||
|
.to_string(),
|
||||||
|
},
|
||||||
|
],
|
||||||
|
}),
|
||||||
|
];
|
||||||
|
|
||||||
|
for score in scores {
|
||||||
|
let result = score.interpret(inventory, topology).await;
|
||||||
|
match result {
|
||||||
|
Ok(outcome) => match outcome.status {
|
||||||
|
InterpretStatus::SUCCESS => continue,
|
||||||
|
InterpretStatus::NOOP => continue,
|
||||||
|
_ => return Err(InterpretError::new(outcome.message)),
|
||||||
|
},
|
||||||
|
Err(e) => return Err(e),
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(Outcome::success("Ipxe installed".to_string()))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_name(&self) -> InterpretName {
|
||||||
|
InterpretName::Ipxe
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_version(&self) -> Version {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_status(&self) -> InterpretStatus {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_children(&self) -> Vec<Id> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Template)]
|
||||||
|
#[template(path = "boot.ipxe.j2")]
|
||||||
|
struct BootIpxeTpl<'a> {
|
||||||
|
gateway_ip: &'a IpAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Template)]
|
||||||
|
#[template(path = "fallback.ipxe.j2")]
|
||||||
|
struct FallbackIpxeTpl<'a> {
|
||||||
|
gateway_ip: &'a IpAddr,
|
||||||
|
kickstart_filename: &'a str,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Template)]
|
||||||
|
#[template(path = "inventory.kickstart.j2")]
|
||||||
|
struct InventoryKickstartTpl<'a> {
|
||||||
|
gateway_ip: &'a IpAddr,
|
||||||
|
cluster_pubkey_filename: &'a str,
|
||||||
|
harmony_inventory_agent: &'a str,
|
||||||
|
}
|
||||||
@ -2,5 +2,6 @@ pub mod bootstrap_dhcp;
|
|||||||
pub mod bootstrap_load_balancer;
|
pub mod bootstrap_load_balancer;
|
||||||
pub mod dhcp;
|
pub mod dhcp;
|
||||||
pub mod dns;
|
pub mod dns;
|
||||||
|
pub mod ipxe;
|
||||||
pub mod load_balancer;
|
pub mod load_balancer;
|
||||||
pub mod upgrade;
|
pub mod upgrade;
|
||||||
|
|||||||
@ -7,19 +7,11 @@ use harmony::{
|
|||||||
};
|
};
|
||||||
use log::{error, info, log_enabled};
|
use log::{error, info, log_enabled};
|
||||||
use std::io::Write;
|
use std::io::Write;
|
||||||
use std::sync::{Arc, Mutex};
|
use std::sync::Mutex;
|
||||||
|
|
||||||
pub fn init() -> tokio::task::JoinHandle<()> {
|
pub fn init() {
|
||||||
configure_logger();
|
configure_logger();
|
||||||
let handle = tokio::spawn(handle_events());
|
handle_events();
|
||||||
|
|
||||||
loop {
|
|
||||||
if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
handle
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn configure_logger() {
|
fn configure_logger() {
|
||||||
@ -86,16 +78,12 @@ fn configure_logger() {
|
|||||||
.init();
|
.init();
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_events() {
|
fn handle_events() {
|
||||||
let preparing_topology = Arc::new(Mutex::new(false));
|
let preparing_topology = Mutex::new(false);
|
||||||
let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
|
let current_score: Mutex<Option<String>> = Mutex::new(None);
|
||||||
|
|
||||||
instrumentation::subscribe("Harmony CLI Logger", {
|
instrumentation::subscribe("Harmony CLI Logger", {
|
||||||
move |event| {
|
move |event| {
|
||||||
let preparing_topology = Arc::clone(&preparing_topology);
|
|
||||||
let current_score = Arc::clone(¤t_score);
|
|
||||||
|
|
||||||
async move {
|
|
||||||
let mut preparing_topology = preparing_topology.lock().unwrap();
|
let mut preparing_topology = preparing_topology.lock().unwrap();
|
||||||
let mut current_score = current_score.lock().unwrap();
|
let mut current_score = current_score.lock().unwrap();
|
||||||
|
|
||||||
@ -104,7 +92,6 @@ async fn handle_events() {
|
|||||||
HarmonyEvent::HarmonyFinished => {
|
HarmonyEvent::HarmonyFinished => {
|
||||||
let emoji = crate::theme::EMOJI_HARMONY.to_string();
|
let emoji = crate::theme::EMOJI_HARMONY.to_string();
|
||||||
info!(emoji = emoji.as_str(); "Harmony completed");
|
info!(emoji = emoji.as_str(); "Harmony completed");
|
||||||
return false;
|
|
||||||
}
|
}
|
||||||
HarmonyEvent::TopologyStateChanged {
|
HarmonyEvent::TopologyStateChanged {
|
||||||
topology,
|
topology,
|
||||||
@ -113,7 +100,10 @@ async fn handle_events() {
|
|||||||
} => match status {
|
} => match status {
|
||||||
TopologyStatus::Queued => {}
|
TopologyStatus::Queued => {}
|
||||||
TopologyStatus::Preparing => {
|
TopologyStatus::Preparing => {
|
||||||
let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow());
|
let emoji = format!(
|
||||||
|
"{}",
|
||||||
|
style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow()
|
||||||
|
);
|
||||||
info!(emoji = emoji.as_str(); "Preparing environment: {topology}...");
|
info!(emoji = emoji.as_str(); "Preparing environment: {topology}...");
|
||||||
(*preparing_topology) = true;
|
(*preparing_topology) = true;
|
||||||
}
|
}
|
||||||
@ -158,7 +148,7 @@ async fn handle_events() {
|
|||||||
score,
|
score,
|
||||||
outcome,
|
outcome,
|
||||||
} => {
|
} => {
|
||||||
if current_score.is_some() && current_score.clone().unwrap() == score {
|
if current_score.is_some() && ¤t_score.clone().unwrap() == score {
|
||||||
(*current_score) = None;
|
(*current_score) = None;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -175,7 +165,7 @@ async fn handle_events() {
|
|||||||
}
|
}
|
||||||
},
|
},
|
||||||
Err(err) => {
|
Err(err) => {
|
||||||
error!(status = "failed"; "{}", err);
|
error!(status = "failed"; "{err}");
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -186,19 +176,16 @@ async fn handle_events() {
|
|||||||
status,
|
status,
|
||||||
} => match status {
|
} => match status {
|
||||||
ApplicationFeatureStatus::Installing => {
|
ApplicationFeatureStatus::Installing => {
|
||||||
info!("Installing feature '{}' for '{}'...", feature, application);
|
info!("Installing feature '{feature}' for '{application}'...");
|
||||||
}
|
}
|
||||||
ApplicationFeatureStatus::Installed => {
|
ApplicationFeatureStatus::Installed => {
|
||||||
info!(status = "finished"; "Feature '{}' installed", feature);
|
info!(status = "finished"; "Feature '{feature}' installed");
|
||||||
}
|
}
|
||||||
ApplicationFeatureStatus::Failed { details } => {
|
ApplicationFeatureStatus::Failed { details } => {
|
||||||
error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details);
|
error!(status = "failed"; "Feature '{feature}' installation failed: {details}");
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
true
|
|
||||||
}
|
}
|
||||||
}
|
});
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -115,7 +115,7 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>(
|
|||||||
scores: Vec<Box<dyn Score<T>>>,
|
scores: Vec<Box<dyn Score<T>>>,
|
||||||
args: Args,
|
args: Args,
|
||||||
) -> Result<(), Box<dyn std::error::Error>> {
|
) -> Result<(), Box<dyn std::error::Error>> {
|
||||||
let cli_logger_handle = cli_logger::init();
|
cli_logger::init();
|
||||||
|
|
||||||
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
|
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
|
||||||
maestro.register_all(scores);
|
maestro.register_all(scores);
|
||||||
@ -123,7 +123,6 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>(
|
|||||||
let result = init(maestro, args).await;
|
let result = init(maestro, args).await;
|
||||||
|
|
||||||
instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
|
instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
|
||||||
let _ = tokio::try_join!(cli_logger_handle);
|
|
||||||
result
|
result
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@ -1,39 +1,26 @@
|
|||||||
use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker};
|
use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker};
|
||||||
use indicatif::MultiProgress;
|
use indicatif::MultiProgress;
|
||||||
use std::sync::Arc;
|
|
||||||
|
|
||||||
use crate::instrumentation::{self, HarmonyComposerEvent};
|
use crate::instrumentation::{self, HarmonyComposerEvent};
|
||||||
|
|
||||||
pub fn init() -> tokio::task::JoinHandle<()> {
|
pub fn init() {
|
||||||
configure_logger();
|
configure_logger();
|
||||||
let handle = tokio::spawn(handle_events());
|
handle_events();
|
||||||
|
|
||||||
loop {
|
|
||||||
if instrumentation::instrument(HarmonyComposerEvent::HarmonyComposerStarted).is_ok() {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
handle
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn configure_logger() {
|
fn configure_logger() {
|
||||||
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
|
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn handle_events() {
|
pub fn handle_events() {
|
||||||
let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new()));
|
let progress_tracker = IndicatifProgressTracker::new(MultiProgress::new());
|
||||||
|
|
||||||
const SETUP_SECTION: &str = "project-initialization";
|
const SETUP_SECTION: &str = "project-initialization";
|
||||||
const COMPILTATION_TASK: &str = "compilation";
|
const COMPILTATION_TASK: &str = "compilation";
|
||||||
const PROGRESS_DEPLOYMENT: &str = "deployment";
|
const PROGRESS_DEPLOYMENT: &str = "deployment";
|
||||||
|
|
||||||
instrumentation::subscribe("Harmony Composer Logger", {
|
instrumentation::subscribe("Harmony Composer Logger", {
|
||||||
move |event| {
|
move |event| match event {
|
||||||
let progress_tracker = Arc::clone(&progress_tracker);
|
|
||||||
|
|
||||||
async move {
|
|
||||||
match event {
|
|
||||||
HarmonyComposerEvent::HarmonyComposerStarted => {}
|
HarmonyComposerEvent::HarmonyComposerStarted => {}
|
||||||
HarmonyComposerEvent::ProjectInitializationStarted => {
|
HarmonyComposerEvent::ProjectInitializationStarted => {
|
||||||
progress_tracker.add_section(
|
progress_tracker.add_section(
|
||||||
@ -46,13 +33,16 @@ pub async fn handle_events() {
|
|||||||
}
|
}
|
||||||
HarmonyComposerEvent::ProjectInitialized => {}
|
HarmonyComposerEvent::ProjectInitialized => {}
|
||||||
HarmonyComposerEvent::ProjectCompilationStarted { details } => {
|
HarmonyComposerEvent::ProjectCompilationStarted { details } => {
|
||||||
progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details);
|
progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, details);
|
||||||
}
|
}
|
||||||
HarmonyComposerEvent::ProjectCompiled => {
|
HarmonyComposerEvent::ProjectCompiled => {
|
||||||
progress_tracker.finish_task(COMPILTATION_TASK, "project compiled");
|
progress_tracker.finish_task(COMPILTATION_TASK, "project compiled");
|
||||||
}
|
}
|
||||||
HarmonyComposerEvent::ProjectCompilationFailed { details } => {
|
HarmonyComposerEvent::ProjectCompilationFailed { details } => {
|
||||||
progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}"));
|
progress_tracker.fail_task(
|
||||||
|
COMPILTATION_TASK,
|
||||||
|
&format!("failed to compile project:\n{details}"),
|
||||||
|
);
|
||||||
}
|
}
|
||||||
HarmonyComposerEvent::DeploymentStarted { target, profile } => {
|
HarmonyComposerEvent::DeploymentStarted { target, profile } => {
|
||||||
progress_tracker.add_section(
|
progress_tracker.add_section(
|
||||||
@ -68,15 +58,9 @@ pub async fn handle_events() {
|
|||||||
}
|
}
|
||||||
HarmonyComposerEvent::DeploymentFailed { details } => {
|
HarmonyComposerEvent::DeploymentFailed { details } => {
|
||||||
progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", "");
|
progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", "");
|
||||||
progress_tracker.fail_task("deployment-failed", &details);
|
progress_tracker.fail_task("deployment-failed", details);
|
||||||
},
|
|
||||||
HarmonyComposerEvent::Shutdown => {
|
|
||||||
return false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
true
|
|
||||||
}
|
}
|
||||||
|
HarmonyComposerEvent::Shutdown => {}
|
||||||
}
|
}
|
||||||
})
|
})
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,6 +1,5 @@
|
|||||||
use log::debug;
|
|
||||||
use once_cell::sync::Lazy;
|
use once_cell::sync::Lazy;
|
||||||
use tokio::sync::broadcast;
|
use std::{collections::HashMap, sync::Mutex};
|
||||||
|
|
||||||
use crate::{HarmonyProfile, HarmonyTarget};
|
use crate::{HarmonyProfile, HarmonyTarget};
|
||||||
|
|
||||||
@ -27,48 +26,43 @@ pub enum HarmonyComposerEvent {
|
|||||||
Shutdown,
|
Shutdown,
|
||||||
}
|
}
|
||||||
|
|
||||||
static HARMONY_COMPOSER_EVENT_BUS: Lazy<broadcast::Sender<HarmonyComposerEvent>> =
|
type Subscriber = Box<dyn Fn(&HarmonyComposerEvent) + Send + Sync>;
|
||||||
Lazy::new(|| {
|
|
||||||
// TODO: Adjust channel capacity
|
|
||||||
let (tx, _rx) = broadcast::channel(16);
|
|
||||||
tx
|
|
||||||
});
|
|
||||||
|
|
||||||
pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> {
|
static SUBSCRIBERS: Lazy<Mutex<HashMap<String, Subscriber>>> =
|
||||||
#[cfg(not(test))]
|
Lazy::new(|| Mutex::new(HashMap::new()));
|
||||||
{
|
|
||||||
match HARMONY_COMPOSER_EVENT_BUS.send(event) {
|
|
||||||
Ok(_) => Ok(()),
|
|
||||||
Err(_) => Err("send error: no subscribers"),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
/// Subscribes a listener to all instrumentation events.
|
||||||
{
|
///
|
||||||
let _ = event; // Suppress the "unused variable" warning for `event`
|
/// Simply provide a unique name and a closure to run when an event happens.
|
||||||
Ok(())
|
///
|
||||||
}
|
/// # Example
|
||||||
}
|
/// ```
|
||||||
|
/// instrumentation::subscribe("my_logger", |event| {
|
||||||
pub async fn subscribe<F, Fut>(name: &str, mut handler: F)
|
/// println!("Event occurred: {:?}", event);
|
||||||
|
/// });
|
||||||
|
/// ```
|
||||||
|
pub fn subscribe<F>(name: &str, callback: F)
|
||||||
where
|
where
|
||||||
F: FnMut(HarmonyComposerEvent) -> Fut + Send + 'static,
|
F: Fn(&HarmonyComposerEvent) + Send + Sync + 'static,
|
||||||
Fut: Future<Output = bool> + Send,
|
|
||||||
{
|
{
|
||||||
let mut rx = HARMONY_COMPOSER_EVENT_BUS.subscribe();
|
let mut subs = SUBSCRIBERS.lock().unwrap();
|
||||||
debug!("[{name}] Service started. Listening for events...");
|
subs.insert(name.to_string(), Box::new(callback));
|
||||||
loop {
|
}
|
||||||
match rx.recv().await {
|
|
||||||
Ok(event) => {
|
/// Instruments an event, notifying all subscribers.
|
||||||
if !handler(event).await {
|
///
|
||||||
debug!("[{name}] Handler requested exit.");
|
/// This will call every closure that was registered with `subscribe`.
|
||||||
break;
|
///
|
||||||
}
|
/// # Example
|
||||||
}
|
/// ```
|
||||||
Err(broadcast::error::RecvError::Lagged(n)) => {
|
/// instrumentation::instrument(HarmonyEvent::HarmonyStarted);
|
||||||
debug!("[{name}] Lagged behind by {n} messages.");
|
/// ```
|
||||||
}
|
pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> {
|
||||||
Err(_) => break,
|
let subs = SUBSCRIBERS.lock().unwrap();
|
||||||
}
|
|
||||||
}
|
for callback in subs.values() {
|
||||||
|
callback(&event);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
@ -99,7 +99,7 @@ impl std::fmt::Display for HarmonyProfile {
|
|||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() {
|
async fn main() {
|
||||||
let hc_logger_handle = harmony_composer_logger::init();
|
harmony_composer_logger::init();
|
||||||
let cli_args = GlobalArgs::parse();
|
let cli_args = GlobalArgs::parse();
|
||||||
|
|
||||||
let harmony_path = Path::new(&cli_args.harmony_path)
|
let harmony_path = Path::new(&cli_args.harmony_path)
|
||||||
@ -199,8 +199,6 @@ async fn main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap();
|
instrumentation::instrument(HarmonyComposerEvent::Shutdown).unwrap();
|
||||||
|
|
||||||
let _ = tokio::try_join!(hc_logger_handle);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Clone, Debug, clap::ValueEnum)]
|
#[derive(Clone, Debug, clap::ValueEnum)]
|
||||||
|
|||||||
@ -94,13 +94,9 @@ async fn init_instrumentation() -> tokio::task::JoinHandle<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
async fn handle_harmony_events() {
|
async fn handle_harmony_events() {
|
||||||
instrumentation::subscribe("Harmony TUI Logger", async |event| {
|
instrumentation::subscribe("Harmony TUI Logger", |_| {
|
||||||
if let HarmonyEvent::HarmonyFinished = event {
|
// TODO: Display events in the TUI
|
||||||
return false;
|
});
|
||||||
};
|
|
||||||
true
|
|
||||||
})
|
|
||||||
.await;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct HarmonyTUI<T: Topology> {
|
pub struct HarmonyTUI<T: Topology> {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user