Compare commits

..

5 Commits

18 changed files with 791 additions and 266 deletions

1
Cargo.lock generated
View File

@@ -1834,7 +1834,6 @@ name = "harmony_cli"
version = "0.1.0"
dependencies = [
"assert_cmd",
"chrono",
"clap",
"console",
"env_logger",

View File

@@ -20,7 +20,7 @@ readme = "README.md"
license = "GNU AGPL v3"
[workspace.dependencies]
log = { version = "0.4", features = ["kv"] }
log = "0.4"
env_logger = "0.11"
derive-new = "0.7"
async-trait = "0.1"

View File

@@ -8,6 +8,7 @@ use harmony::{
hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
maestro::Maestro,
modules::{
http::StaticFilesHttpScore,
ipxe::IpxeScore,
@@ -129,21 +130,16 @@ async fn main() {
"./data/watchguard/pxe-http-files".to_string(),
));
let ipxe_score = IpxeScore::new();
harmony_tui::run(
inventory,
topology,
vec![
Box::new(dns_score),
Box::new(bootstrap_dhcp_score),
Box::new(bootstrap_load_balancer_score),
Box::new(load_balancer_score),
Box::new(tftp_score),
Box::new(http_score),
Box::new(ipxe_score),
Box::new(dhcp_score),
],
)
.await
.unwrap();
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
maestro.register_all(vec![
Box::new(dns_score),
Box::new(bootstrap_dhcp_score),
Box::new(bootstrap_load_balancer_score),
Box::new(load_balancer_score),
Box::new(tftp_score),
Box::new(http_score),
Box::new(ipxe_score),
Box::new(dhcp_score),
]);
harmony_tui::init(maestro).await.unwrap();
}

View File

@@ -8,6 +8,7 @@ use harmony::{
hardware::{FirewallGroup, HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
maestro::Maestro,
modules::{
dummy::{ErrorScore, PanicScore, SuccessScore},
http::StaticFilesHttpScore,
@@ -83,25 +84,20 @@ async fn main() {
let http_score = StaticFilesHttpScore::new(Url::LocalFolder(
"./data/watchguard/pxe-http-files".to_string(),
));
harmony_tui::run(
inventory,
topology,
vec![
Box::new(dns_score),
Box::new(dhcp_score),
Box::new(load_balancer_score),
Box::new(tftp_score),
Box::new(http_score),
Box::new(OPNsenseShellCommandScore {
opnsense: opnsense.get_opnsense_config(),
command: "touch /tmp/helloharmonytouching".to_string(),
}),
Box::new(SuccessScore {}),
Box::new(ErrorScore {}),
Box::new(PanicScore {}),
],
)
.await
.unwrap();
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
maestro.register_all(vec![
Box::new(dns_score),
Box::new(dhcp_score),
Box::new(load_balancer_score),
Box::new(tftp_score),
Box::new(http_score),
Box::new(OPNsenseShellCommandScore {
opnsense: opnsense.get_opnsense_config(),
command: "touch /tmp/helloharmonytouching".to_string(),
}),
Box::new(SuccessScore {}),
Box::new(ErrorScore {}),
Box::new(PanicScore {}),
]);
harmony_tui::init(maestro).await.unwrap();
}

View File

@@ -0,0 +1,12 @@
[package]
name = "example_remove_rook_osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
harmony_tui = { version = "0.1.0", path = "../../harmony_tui" }
tokio.workspace = true

View File

@@ -0,0 +1,18 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@@ -2,6 +2,7 @@ use std::net::{SocketAddr, SocketAddrV4};
use harmony::{
inventory::Inventory,
maestro::Maestro,
modules::{
dns::DnsScore,
dummy::{ErrorScore, PanicScore, SuccessScore},
@@ -15,19 +16,18 @@ use harmony_macros::ipv4;
#[tokio::main]
async fn main() {
harmony_tui::run(
Inventory::autoload(),
DummyInfra {},
vec![
Box::new(SuccessScore {}),
Box::new(ErrorScore {}),
Box::new(PanicScore {}),
Box::new(DnsScore::new(vec![], None)),
Box::new(build_large_score()),
],
)
.await
.unwrap();
let inventory = Inventory::autoload();
let topology = DummyInfra {};
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
maestro.register_all(vec![
Box::new(SuccessScore {}),
Box::new(ErrorScore {}),
Box::new(PanicScore {}),
Box::new(DnsScore::new(vec![], None)),
Box::new(build_large_score()),
]);
harmony_tui::init(maestro).await.unwrap();
}
fn build_large_score() -> LoadBalancerScore {

View File

@@ -241,7 +241,7 @@ pub struct DummyInfra;
#[async_trait]
impl Topology for DummyInfra {
fn name(&self) -> &str {
"DummyInfra"
todo!()
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {

View File

@@ -5,7 +5,7 @@ use k8s_openapi::{
};
use kube::{
Client, Config, Error, Resource,
api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
runtime::reflector::Lookup,
@@ -17,7 +17,9 @@ use kube::{
};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use similar::TextDiff;
use tokio::io::AsyncReadExt;
#[derive(new, Clone)]
pub struct K8sClient {
@@ -51,6 +53,66 @@ impl K8sClient {
})
}
pub async fn get_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(deps.get_opt(name).await?)
}
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let pods: Api<Pod> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(pods.get_opt(name).await?)
}
pub async fn scale_deployment(
&self,
name: &str,
namespace: Option<&str>,
replicas: u32,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let patch = json!({
"spec": {
"replicas": replicas
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}
pub async fn delete_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let delete_params = DeleteParams::default();
deployments.delete(name, &delete_params).await?;
Ok(())
}
pub async fn wait_until_deployment_ready(
&self,
name: String,
@@ -76,6 +138,68 @@ impl K8sClient {
}
}
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("{label}={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await;
match res {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout().take() {
stdout.read_to_string(&mut stdout_buf).await;
}
debug!("Status: {} - {:?}", s, status.details);
if s == "Success" {
Ok(stdout_buf)
} else {
Err(s)
}
} else {
Err("Couldn't get inner status of pod exec".to_string())
}
}
}
}
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
pub async fn exec_app(
&self,

View File

@@ -14,5 +14,6 @@ pub mod monitoring;
pub mod okd;
pub mod opnsense;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;

View File

@@ -0,0 +1,419 @@
use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
fn name(&self) -> String {
format!("CephRemoveOsdScore")
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(CephRemoveOsdInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct CephRemoveOsdInterpret {
score: CephRemoveOsd,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.verify_ceph_toolbox_exists(client.clone()).await?;
self.scale_deployment(client.clone()).await?;
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
.split('-')
.nth(3)
.ok_or_else(|| {
InterpretError::new(format!(
"Could not parse OSD id from deployment name {}",
self.score.osd_deployment_name
))
})?;
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
Ok(osd_id_full)
}
pub async fn verify_ceph_toolbox_exists(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let toolbox_dep = "rook-ceph-tools".to_string();
match client
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&toolbox_dep, ready_count
)));
} else {
return Err(InterpretError::new(
"ceph-tool-box not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {}",
&toolbox_dep
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&toolbox_dep, self.score.rook_ceph_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&toolbox_dep, e
))),
}
}
pub async fn scale_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
client
.scale_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
0,
)
.await?;
Ok(Outcome::success(format!(
"Scaled down deployment {}",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_scaled(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
}
}
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to scale down",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn build_timer(&self) -> (Duration, Duration, Instant) {
let timeout = Duration::from_secs(120);
let interval = Duration::from_secs(5);
let start = Instant::now();
(timeout, interval, start)
}
pub async fn delete_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
client
.delete_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
Ok(Outcome::success(format!(
"deployment {} deleted",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_deleted(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if dep.is_none() {
info!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
return Ok(Outcome::success(format!(
"Deployment {} deleted.",
self.score.osd_deployment_name
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to be deleted",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
let nodes = json.get("nodes").ok_or_else(|| {
InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
})?;
let tree: CephOsdTree = CephOsdTree {
nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
})?,
};
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
],
)
.await?;
Ok(Outcome::success(format!(
"osd id {} removed from osd tree",
osd_id_full
)))
}
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
loop {
let output = client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
)
.await?;
let tree =
self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json"));
let osd_found = tree
.unwrap()
.nodes
.iter()
.any(|node| node.name == osd_id_full);
if !osd_found {
return Ok(Outcome::success(format!(
"Successfully verified that OSD {} is removed from the Ceph cluster.",
osd_id_full,
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for OSD {} to be removed from Ceph tree",
osd_id_full
)));
}
warn!(
"OSD {} still found in Ceph tree, retrying in {:?}...",
osd_id_full, interval
);
sleep(interval).await;
}
}
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephOsdTree {
pub nodes: Vec<CephNode>,
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephNode {
pub id: i32,
pub name: String,
#[serde(rename = "type")]
pub node_type: String,
pub type_id: Option<i32>,
pub children: Option<Vec<i32>>,
pub exists: Option<i32>,
pub status: Option<String>,
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn test_get_osd_tree() {
let json_data = json!({
"nodes": [
{"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
{"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
]
});
let interpret = CephRemoveOsdInterpret {
score: CephRemoveOsd {
osd_deployment_name: "osd-1".to_string(),
rook_ceph_namespace: "dummy_ns".to_string(),
},
};
let json = interpret.get_osd_tree(json_data).unwrap();
let expected = CephOsdTree {
nodes: vec![
CephNode {
id: 1,
name: "osd.1".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
CephNode {
id: 2,
name: "osd.2".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
],
};
assert_eq!(json, expected);
}
}

View File

@@ -0,0 +1 @@
pub mod ceph_remove_osd_score;

View File

@@ -0,0 +1 @@
pub mod ceph;

View File

@@ -22,7 +22,6 @@ indicatif = "0.18.0"
lazy_static = "1.5.0"
log.workspace = true
indicatif-log-bridge = "0.2.3"
chrono.workspace = true
[dev-dependencies]
harmony = { path = "../harmony", features = ["testing"] }

View File

@@ -1,17 +1,22 @@
use chrono::Local;
use console::style;
use harmony::{
instrumentation::{self, HarmonyEvent},
modules::application::ApplicationFeatureStatus,
topology::TopologyStatus,
};
use log::{error, info, log_enabled};
use std::io::Write;
use std::sync::{Arc, Mutex};
use indicatif::MultiProgress;
use indicatif_log_bridge::LogWrapper;
use log::error;
use std::{
sync::{Arc, Mutex},
thread,
time::Duration,
};
use crate::progress::{IndicatifProgressTracker, ProgressTracker};
pub fn init() -> tokio::task::JoinHandle<()> {
configure_logger();
let handle = tokio::spawn(handle_events());
let base_progress = configure_logger();
let handle = tokio::spawn(handle_events(base_progress));
loop {
if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
@@ -22,76 +27,28 @@ pub fn init() -> tokio::task::JoinHandle<()> {
handle
}
fn configure_logger() {
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info"))
.format(|buf, record| {
let debug_mode = log_enabled!(log::Level::Debug);
let timestamp = Local::now().format("%Y-%m-%d %H:%M:%S");
fn configure_logger() -> MultiProgress {
let logger =
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let level = logger.filter();
let progress = MultiProgress::new();
let level = match record.level() {
log::Level::Error => style("ERROR").red(),
log::Level::Warn => style("WARN").yellow(),
log::Level::Info => style("INFO").green(),
log::Level::Debug => style("DEBUG").blue(),
log::Level::Trace => style("TRACE").magenta(),
};
if let Some(status) = record.key_values().get(log::kv::Key::from("status")) {
let status = status.to_borrowed_str().unwrap();
let emoji = match status {
"finished" => style(crate::theme::EMOJI_SUCCESS.to_string()).green(),
"skipped" => style(crate::theme::EMOJI_SKIP.to_string()).yellow(),
"failed" => style(crate::theme::EMOJI_ERROR.to_string()).red(),
_ => style("".into()),
};
if debug_mode {
writeln!(
buf,
"[{} {:<5} {}] {} {}",
timestamp,
level,
record.target(),
emoji,
record.args()
)
} else {
writeln!(buf, "[{:<5}] {} {}", level, emoji, record.args())
}
} else if let Some(emoji) = record.key_values().get(log::kv::Key::from("emoji")) {
if debug_mode {
writeln!(
buf,
"[{} {:<5} {}] {} {}",
timestamp,
level,
record.target(),
emoji,
record.args()
)
} else {
writeln!(buf, "[{:<5}] {} {}", level, emoji, record.args())
}
} else if debug_mode {
writeln!(
buf,
"[{} {:<5} {}] {}",
timestamp,
level,
record.target(),
record.args()
)
} else {
writeln!(buf, "[{:<5}] {}", level, record.args())
}
})
.init();
LogWrapper::new(progress.clone(), logger)
.try_init()
.unwrap();
log::set_max_level(level);
progress
}
async fn handle_events() {
async fn handle_events(base_progress: MultiProgress) {
let progress_tracker = Arc::new(IndicatifProgressTracker::new(base_progress.clone()));
let preparing_topology = Arc::new(Mutex::new(false));
let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
instrumentation::subscribe("Harmony CLI Logger", {
move |event| {
let progress_tracker = Arc::clone(&progress_tracker);
let preparing_topology = Arc::clone(&preparing_topology);
let current_score = Arc::clone(&current_score);
@@ -102,57 +59,90 @@ async fn handle_events() {
match event {
HarmonyEvent::HarmonyStarted => {}
HarmonyEvent::HarmonyFinished => {
let emoji = crate::theme::EMOJI_HARMONY.to_string();
info!(emoji = emoji.as_str(); "Harmony completed");
progress_tracker.add_section(
"harmony-summary",
&format!("\n{} Harmony completed\n\n", crate::theme::EMOJI_HARMONY),
);
progress_tracker.add_section("harmony-finished", "\n\n");
thread::sleep(Duration::from_millis(200));
return false;
}
HarmonyEvent::TopologyStateChanged {
topology,
status,
message,
} => match status {
TopologyStatus::Queued => {}
TopologyStatus::Preparing => {
let emoji = format!("{}", style(crate::theme::EMOJI_TOPOLOGY.to_string()).yellow());
info!(emoji = emoji.as_str(); "Preparing environment: {topology}...");
(*preparing_topology) = true;
}
TopologyStatus::Success => {
(*preparing_topology) = false;
if let Some(message) = message {
info!(status = "finished"; "{message}");
} => {
let section_key = topology_key(&topology);
match status {
TopologyStatus::Queued => {}
TopologyStatus::Preparing => {
progress_tracker.add_section(
&section_key,
&format!(
"\n{} Preparing environment: {topology}...",
crate::theme::EMOJI_TOPOLOGY
),
);
(*preparing_topology) = true;
}
TopologyStatus::Success => {
(*preparing_topology) = false;
progress_tracker.add_task(&section_key, "topology-success", "");
progress_tracker
.finish_task("topology-success", &message.unwrap_or("".into()));
}
TopologyStatus::Noop => {
(*preparing_topology) = false;
progress_tracker.add_task(&section_key, "topology-skip", "");
progress_tracker
.skip_task("topology-skip", &message.unwrap_or("".into()));
}
TopologyStatus::Error => {
progress_tracker.add_task(&section_key, "topology-error", "");
(*preparing_topology) = false;
progress_tracker
.fail_task("topology-error", &message.unwrap_or("".into()));
}
}
TopologyStatus::Noop => {
(*preparing_topology) = false;
if let Some(message) = message {
info!(status = "skipped"; "{message}");
}
}
TopologyStatus::Error => {
(*preparing_topology) = false;
if let Some(message) = message {
error!(status = "failed"; "{message}");
}
}
},
}
HarmonyEvent::InterpretExecutionStarted {
execution_id: _,
topology: _,
execution_id: task_key,
topology,
interpret: _,
score,
message,
} => {
if *preparing_topology || current_score.is_some() {
info!("{message}");
let is_key_topology = (*preparing_topology)
&& progress_tracker.contains_section(&topology_key(&topology));
let is_key_current_score = current_score.is_some()
&& progress_tracker
.contains_section(&score_key(&current_score.clone().unwrap()));
let is_key_score = progress_tracker.contains_section(&score_key(&score));
let section_key = if is_key_topology {
topology_key(&topology)
} else if is_key_current_score {
score_key(&current_score.clone().unwrap())
} else if is_key_score {
score_key(&score)
} else {
(*current_score) = Some(score.clone());
let emoji = format!("{}", style(crate::theme::EMOJI_SCORE).blue());
info!(emoji = emoji.as_str(); "Interpreting score: {score}...");
}
let key = score_key(&score);
progress_tracker.add_section(
&key,
&format!(
"{} Interpreting score: {score}...",
crate::theme::EMOJI_SCORE
),
);
key
};
progress_tracker.add_task(&section_key, &task_key, &message);
}
HarmonyEvent::InterpretExecutionFinished {
execution_id: _,
execution_id: task_key,
topology: _,
interpret: _,
score,
@@ -165,17 +155,16 @@ async fn handle_events() {
match outcome {
Ok(outcome) => match outcome.status {
harmony::interpret::InterpretStatus::SUCCESS => {
info!(status = "finished"; "{}", outcome.message);
progress_tracker.finish_task(&task_key, &outcome.message);
}
harmony::interpret::InterpretStatus::NOOP => {
info!(status = "skipped"; "{}", outcome.message);
}
_ => {
error!(status = "failed"; "{}", outcome.message);
progress_tracker.skip_task(&task_key, &outcome.message);
}
_ => progress_tracker.fail_task(&task_key, &outcome.message),
},
Err(err) => {
error!(status = "failed"; "{}", err);
error!("Interpret error: {err}");
progress_tracker.fail_task(&task_key, &err.to_string());
}
}
}
@@ -184,17 +173,30 @@ async fn handle_events() {
application,
feature,
status,
} => match status {
ApplicationFeatureStatus::Installing => {
info!("Installing feature '{}' for '{}'...", feature, application);
} => {
if let Some(score) = &(*current_score) {
let section_key = score_key(score);
let task_key = app_feature_key(&application, &feature);
match status {
ApplicationFeatureStatus::Installing => {
let message = format!("Feature '{}' installing...", feature);
progress_tracker.add_task(&section_key, &task_key, &message);
}
ApplicationFeatureStatus::Installed => {
let message = format!("Feature '{}' installed", feature);
progress_tracker.finish_task(&task_key, &message);
}
ApplicationFeatureStatus::Failed { details } => {
let message = format!(
"Feature '{}' installation failed: {}",
feature, details
);
progress_tracker.fail_task(&task_key, &message);
}
}
}
ApplicationFeatureStatus::Installed => {
info!(status = "finished"; "Feature '{}' installed", feature);
}
ApplicationFeatureStatus::Failed { details } => {
error!(status = "failed"; "Feature '{}' installation failed: {}", feature, details);
}
},
}
}
true
}
@@ -202,3 +204,15 @@ async fn handle_events() {
})
.await;
}
fn topology_key(topology: &str) -> String {
format!("topology-{topology}")
}
fn score_key(score: &str) -> String {
format!("score-{score}")
}
fn app_feature_key(application: &str, feature: &str) -> String {
format!("app-{application}-{feature}")
}

View File

@@ -90,37 +90,13 @@ pub async fn run<T: Topology + Send + Sync + 'static>(
topology: T,
scores: Vec<Box<dyn Score<T>>>,
args_struct: Option<Args>,
) -> Result<(), Box<dyn std::error::Error>> {
let args = match args_struct {
Some(args) => args,
None => Args::parse(),
};
#[cfg(not(feature = "tui"))]
if args.interactive {
return Err("Not compiled with interactive support".into());
}
#[cfg(feature = "tui")]
if args.interactive {
return harmony_tui::run(inventory, topology, scores).await;
}
run_cli(inventory, topology, scores, args).await
}
pub async fn run_cli<T: Topology + Send + Sync + 'static>(
inventory: Inventory,
topology: T,
scores: Vec<Box<dyn Score<T>>>,
args: Args,
) -> Result<(), Box<dyn std::error::Error>> {
let cli_logger_handle = cli_logger::init();
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
maestro.register_all(scores);
let result = init(maestro, args).await;
let result = init(maestro, args_struct).await;
instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
let _ = tokio::try_join!(cli_logger_handle);
@@ -129,8 +105,23 @@ pub async fn run_cli<T: Topology + Send + Sync + 'static>(
async fn init<T: Topology + Send + Sync + 'static>(
maestro: harmony::maestro::Maestro<T>,
args: Args,
args_struct: Option<Args>,
) -> Result<(), Box<dyn std::error::Error>> {
let args = match args_struct {
Some(args) => args,
None => Args::parse(),
};
#[cfg(feature = "tui")]
if args.interactive {
return harmony_tui::init(maestro).await;
}
#[cfg(not(feature = "tui"))]
if args.interactive {
return Err("Not compiled with interactive support".into());
}
let _ = env_logger::builder().try_init();
let scores_vec = maestro_scores_filter(&maestro, args.all, args.filter, args.number);
@@ -202,14 +193,14 @@ mod tests {
let maestro = init_test_maestro();
let res = crate::init(
maestro,
crate::Args {
Some(crate::Args {
yes: true,
filter: Some("SuccessScore".to_owned()),
interactive: false,
all: true,
number: 0,
list: false,
},
}),
)
.await;
@@ -222,14 +213,14 @@ mod tests {
let res = crate::init(
maestro,
crate::Args {
Some(crate::Args {
yes: true,
filter: Some("ErrorScore".to_owned()),
interactive: false,
all: true,
number: 0,
list: false,
},
}),
)
.await;
@@ -242,14 +233,14 @@ mod tests {
let res = crate::init(
maestro,
crate::Args {
Some(crate::Args {
yes: true,
filter: None,
interactive: false,
all: false,
number: 0,
list: false,
},
}),
)
.await;

View File

@@ -9,13 +9,7 @@ use widget::{help::HelpWidget, score::ScoreListWidget};
use std::{panic, sync::Arc, time::Duration};
use crossterm::event::{Event, EventStream, KeyCode, KeyEventKind};
use harmony::{
instrumentation::{self, HarmonyEvent},
inventory::Inventory,
maestro::Maestro,
score::Score,
topology::Topology,
};
use harmony::{maestro::Maestro, score::Score, topology::Topology};
use ratatui::{
self, Frame,
layout::{Constraint, Layout, Position},
@@ -45,62 +39,22 @@ pub mod tui {
///
/// #[tokio::main]
/// async fn main() {
/// harmony_tui::run(
/// Inventory::autoload(),
/// HAClusterTopology::autoload(),
/// vec![
/// Box::new(SuccessScore {}),
/// Box::new(ErrorScore {}),
/// Box::new(PanicScore {}),
/// ]
/// ).await.unwrap();
/// let inventory = Inventory::autoload();
/// let topology = HAClusterTopology::autoload();
/// let mut maestro = Maestro::new_without_initialization(inventory, topology);
///
/// maestro.register_all(vec![
/// Box::new(SuccessScore {}),
/// Box::new(ErrorScore {}),
/// Box::new(PanicScore {}),
/// ]);
/// harmony_tui::init(maestro).await.unwrap();
/// }
/// ```
pub async fn run<T: Topology + Send + Sync + 'static>(
inventory: Inventory,
topology: T,
scores: Vec<Box<dyn Score<T>>>,
) -> Result<(), Box<dyn std::error::Error>> {
let handle = init_instrumentation().await;
let mut maestro = Maestro::initialize(inventory, topology).await.unwrap();
maestro.register_all(scores);
let result = init(maestro).await;
let _ = tokio::try_join!(handle);
result
}
async fn init<T: Topology + Send + Sync + 'static>(
pub async fn init<T: Topology + Send + Sync + 'static>(
maestro: Maestro<T>,
) -> Result<(), Box<dyn std::error::Error>> {
let result = HarmonyTUI::new(maestro).init().await;
instrumentation::instrument(HarmonyEvent::HarmonyFinished).unwrap();
result
}
async fn init_instrumentation() -> tokio::task::JoinHandle<()> {
let handle = tokio::spawn(handle_harmony_events());
loop {
if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
break;
}
}
handle
}
async fn handle_harmony_events() {
instrumentation::subscribe("Harmony TUI Logger", async |event| {
if let HarmonyEvent::HarmonyFinished = event {
return false;
};
true
})
.await;
HarmonyTUI::new(maestro).init().await
}
pub struct HarmonyTUI<T: Topology> {