diff --git a/.gitea/workflows/check.yml b/.gitea/workflows/check.yml index ae2d001..2014633 100644 --- a/.gitea/workflows/check.yml +++ b/.gitea/workflows/check.yml @@ -9,7 +9,7 @@ jobs: check: runs-on: docker container: - image: hub.nationtech.io/harmony/harmony_composer:latest@sha256:eb0406fcb95c63df9b7c4b19bc50ad7914dd8232ce98e9c9abef628e07c69386 + image: hub.nationtech.io/harmony/harmony_composer:latest steps: - name: Checkout code uses: actions/checkout@v4 diff --git a/.gitea/workflows/harmony_composer.yaml b/.gitea/workflows/harmony_composer.yaml index fbb809b..273922a 100644 --- a/.gitea/workflows/harmony_composer.yaml +++ b/.gitea/workflows/harmony_composer.yaml @@ -7,7 +7,7 @@ on: jobs: package_harmony_composer: container: - image: hub.nationtech.io/harmony/harmony_composer:latest@sha256:eb0406fcb95c63df9b7c4b19bc50ad7914dd8232ce98e9c9abef628e07c69386 + image: hub.nationtech.io/harmony/harmony_composer:latest runs-on: dind steps: - name: Checkout code @@ -45,14 +45,14 @@ jobs: -H "Authorization: token ${{ secrets.GITEATOKEN }}" \ "https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/tags/snapshot-latest" \ | jq -r '.id // empty') - + if [ -n "$RELEASE_ID" ]; then # Delete existing release curl -X DELETE \ -H "Authorization: token ${{ secrets.GITEATOKEN }}" \ "https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/$RELEASE_ID" fi - + # Create new release RESPONSE=$(curl -X POST \ -H "Authorization: token ${{ secrets.GITEATOKEN }}" \ @@ -65,7 +65,7 @@ jobs: "prerelease": true }' \ "https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases") - + echo "RELEASE_ID=$(echo $RESPONSE | jq -r '.id')" >> $GITHUB_ENV - name: Upload Linux binary diff --git a/harmony/Cargo.toml b/harmony/Cargo.toml index 3d53bba..5a42cf7 100644 --- a/harmony/Cargo.toml +++ b/harmony/Cargo.toml @@ -5,6 +5,9 @@ version.workspace = true readme.workspace = true license.workspace = true +[features] +testing = [] + [dependencies] rand = "0.9" hex = "0.4" diff --git a/harmony/src/domain/config.rs b/harmony/src/domain/config.rs index 20f08a2..62f612f 100644 --- a/harmony/src/domain/config.rs +++ b/harmony/src/domain/config.rs @@ -11,5 +11,5 @@ lazy_static! { pub static ref REGISTRY_PROJECT: String = std::env::var("HARMONY_REGISTRY_PROJECT").unwrap_or_else(|_| "harmony".to_string()); pub static ref DRY_RUN: bool = - std::env::var("HARMONY_DRY_RUN").map_or(true, |value| value.parse().unwrap_or(true)); + std::env::var("HARMONY_DRY_RUN").is_ok_and(|value| value.parse().unwrap_or(false)); } diff --git a/harmony/src/domain/instrumentation.rs b/harmony/src/domain/instrumentation.rs index 3d45722..2e113a3 100644 --- a/harmony/src/domain/instrumentation.rs +++ b/harmony/src/domain/instrumentation.rs @@ -2,28 +2,34 @@ use log::debug; use once_cell::sync::Lazy; use tokio::sync::broadcast; -use super::interpret::{InterpretError, Outcome}; +use super::{ + interpret::{InterpretError, Outcome}, + topology::TopologyStatus, +}; #[derive(Debug, Clone)] pub enum HarmonyEvent { HarmonyStarted, - PrepareTopologyStarted { - topology: String, - }, - TopologyPrepared { - topology: String, - outcome: Outcome, - }, + HarmonyFinished, InterpretExecutionStarted { + execution_id: String, topology: String, interpret: String, + score: String, message: String, }, InterpretExecutionFinished { + execution_id: String, topology: String, interpret: String, + score: String, outcome: Result, }, + TopologyStateChanged { + topology: String, + status: TopologyStatus, + message: Option, + }, } static HARMONY_EVENT_BUS: Lazy> = Lazy::new(|| { @@ -33,9 +39,14 @@ static HARMONY_EVENT_BUS: Lazy> = Lazy::new(|| { }); pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { - match HARMONY_EVENT_BUS.send(event) { - Ok(_) => Ok(()), - Err(_) => Err("send error: no subscribers"), + if cfg!(any(test, feature = "testing")) { + let _ = event; // Suppress the "unused variable" warning for `event` + Ok(()) + } else { + match HARMONY_EVENT_BUS.send(event) { + Ok(_) => Ok(()), + Err(_) => Err("send error: no subscribers"), + } } } diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs index add7b70..cfbf2b5 100644 --- a/harmony/src/domain/interpret/mod.rs +++ b/harmony/src/domain/interpret/mod.rs @@ -7,6 +7,7 @@ use super::{ data::{Id, Version}, executors::ExecutorError, inventory::Inventory, + topology::PreparationError, }; pub enum InterpretName { @@ -23,6 +24,14 @@ pub enum InterpretName { TenantInterpret, Application, ArgoCD, + Alerting, + Ntfy, + HelmChart, + HelmCommand, + K8sResource, + Lamp, + ApplicationMonitoring, + K8sPrometheusCrdAlerting, } impl std::fmt::Display for InterpretName { @@ -41,6 +50,14 @@ impl std::fmt::Display for InterpretName { InterpretName::TenantInterpret => f.write_str("Tenant"), InterpretName::Application => f.write_str("Application"), InterpretName::ArgoCD => f.write_str("ArgoCD"), + InterpretName::Alerting => f.write_str("Alerting"), + InterpretName::Ntfy => f.write_str("Ntfy"), + InterpretName::HelmChart => f.write_str("HelmChart"), + InterpretName::HelmCommand => f.write_str("HelmCommand"), + InterpretName::K8sResource => f.write_str("K8sResource"), + InterpretName::Lamp => f.write_str("LAMP"), + InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"), + InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"), } } } @@ -113,6 +130,14 @@ impl std::fmt::Display for InterpretError { } impl Error for InterpretError {} +impl From for InterpretError { + fn from(value: PreparationError) -> Self { + Self { + msg: format!("InterpretError : {value}"), + } + } +} + impl From for InterpretError { fn from(value: ExecutorError) -> Self { Self { diff --git a/harmony/src/domain/maestro/mod.rs b/harmony/src/domain/maestro/mod.rs index ceab27a..d9587c8 100644 --- a/harmony/src/domain/maestro/mod.rs +++ b/harmony/src/domain/maestro/mod.rs @@ -1,14 +1,14 @@ -use std::sync::{Arc, Mutex, RwLock}; +use std::sync::{Arc, RwLock}; use log::{debug, warn}; -use crate::instrumentation::{self, HarmonyEvent}; +use crate::topology::TopologyStatus; use super::{ - interpret::{InterpretError, InterpretStatus, Outcome}, + interpret::{InterpretError, Outcome}, inventory::Inventory, score::Score, - topology::Topology, + topology::{PreparationError, PreparationOutcome, Topology, TopologyState}, }; type ScoreVec = Vec>>; @@ -17,7 +17,7 @@ pub struct Maestro { inventory: Inventory, topology: T, scores: Arc>>, - topology_preparation_result: Mutex>, + topology_state: TopologyState, } impl Maestro { @@ -25,41 +25,46 @@ impl Maestro { /// /// This should rarely be used. Most of the time Maestro::initialize should be used instead. pub fn new_without_initialization(inventory: Inventory, topology: T) -> Self { + let topology_name = topology.name().to_string(); + Self { inventory, topology, scores: Arc::new(RwLock::new(Vec::new())), - topology_preparation_result: None.into(), + topology_state: TopologyState::new(topology_name), } } - pub async fn initialize(inventory: Inventory, topology: T) -> Result { - let instance = Self::new_without_initialization(inventory, topology); + pub async fn initialize(inventory: Inventory, topology: T) -> Result { + let mut instance = Self::new_without_initialization(inventory, topology); instance.prepare_topology().await?; Ok(instance) } /// Ensures the associated Topology is ready for operations. /// Delegates the readiness check and potential setup actions to the Topology. - pub async fn prepare_topology(&self) -> Result { - instrumentation::instrument(HarmonyEvent::PrepareTopologyStarted { - topology: self.topology.name().to_string(), - }) - .unwrap(); + async fn prepare_topology(&mut self) -> Result { + self.topology_state.prepare(); - let outcome = self.topology.ensure_ready().await?; + let result = self.topology.ensure_ready().await; - instrumentation::instrument(HarmonyEvent::TopologyPrepared { - topology: self.topology.name().to_string(), - outcome: outcome.clone(), - }) - .unwrap(); - - self.topology_preparation_result - .lock() - .unwrap() - .replace(outcome.clone()); - Ok(outcome) + match result { + Ok(outcome) => { + match outcome.clone() { + PreparationOutcome::Success { details } => { + self.topology_state.success(details); + } + PreparationOutcome::Noop => { + self.topology_state.noop(); + } + }; + Ok(outcome) + } + Err(err) => { + self.topology_state.error(err.to_string()); + Err(err) + } + } } pub fn register_all(&mut self, mut scores: ScoreVec) { @@ -68,12 +73,7 @@ impl Maestro { } fn is_topology_initialized(&self) -> bool { - let result = self.topology_preparation_result.lock().unwrap(); - if let Some(outcome) = result.as_ref() { - matches!(outcome.status, InterpretStatus::SUCCESS) - } else { - false - } + self.topology_state.status == TopologyStatus::Success } pub async fn interpret(&self, score: Box>) -> Result { @@ -84,10 +84,8 @@ impl Maestro { self.topology.name(), ); } - debug!("Running score {score:?}"); - let interpret = score.create_interpret(); - debug!("Launching interpret {interpret:?}"); - let result = interpret.execute(&self.inventory, &self.topology).await; + debug!("Interpreting score {score:?}"); + let result = score.interpret(&self.inventory, &self.topology).await; debug!("Got result {result:?}"); result } diff --git a/harmony/src/domain/score.rs b/harmony/src/domain/score.rs index a7eb9c7..d2585fe 100644 --- a/harmony/src/domain/score.rs +++ b/harmony/src/domain/score.rs @@ -1,15 +1,55 @@ use std::collections::BTreeMap; +use async_trait::async_trait; use serde::Serialize; use serde_value::Value; -use super::{interpret::Interpret, topology::Topology}; +use super::{ + data::Id, + instrumentation::{self, HarmonyEvent}, + interpret::{Interpret, InterpretError, Outcome}, + inventory::Inventory, + topology::Topology, +}; +#[async_trait] pub trait Score: std::fmt::Debug + ScoreToString + Send + Sync + CloneBoxScore + SerializeScore { - fn create_interpret(&self) -> Box>; + async fn interpret( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + let id = Id::default(); + let interpret = self.create_interpret(); + + instrumentation::instrument(HarmonyEvent::InterpretExecutionStarted { + execution_id: id.clone().to_string(), + topology: topology.name().into(), + interpret: interpret.get_name().to_string(), + score: self.name(), + message: format!("{} running...", interpret.get_name()), + }) + .unwrap(); + let result = interpret.execute(inventory, topology).await; + + instrumentation::instrument(HarmonyEvent::InterpretExecutionFinished { + execution_id: id.clone().to_string(), + topology: topology.name().into(), + interpret: interpret.get_name().to_string(), + score: self.name(), + outcome: result.clone(), + }) + .unwrap(); + + result + } + fn name(&self) -> String; + + #[doc(hidden)] + fn create_interpret(&self) -> Box>; } pub trait SerializeScore { diff --git a/harmony/src/domain/topology/ha_cluster.rs b/harmony/src/domain/topology/ha_cluster.rs index a114e18..02ee66e 100644 --- a/harmony/src/domain/topology/ha_cluster.rs +++ b/harmony/src/domain/topology/ha_cluster.rs @@ -4,8 +4,6 @@ use harmony_types::net::MacAddress; use log::info; use crate::executors::ExecutorError; -use crate::interpret::InterpretError; -use crate::interpret::Outcome; use super::DHCPStaticEntry; use super::DhcpServer; @@ -19,6 +17,8 @@ use super::K8sclient; use super::LoadBalancer; use super::LoadBalancerService; use super::LogicalHost; +use super::PreparationError; +use super::PreparationOutcome; use super::Router; use super::TftpServer; @@ -48,7 +48,7 @@ impl Topology for HAClusterTopology { fn name(&self) -> &str { "HAClusterTopology" } - async fn ensure_ready(&self) -> Result { + async fn ensure_ready(&self) -> Result { todo!( "ensure_ready, not entirely sure what it should do here, probably something like verify that the hosts are reachable and all services are up and ready." ) @@ -244,10 +244,12 @@ impl Topology for DummyInfra { todo!() } - async fn ensure_ready(&self) -> Result { + async fn ensure_ready(&self) -> Result { let dummy_msg = "This is a dummy infrastructure that does nothing"; info!("{dummy_msg}"); - Ok(Outcome::success(dummy_msg.to_string())) + Ok(PreparationOutcome::Success { + details: dummy_msg.into(), + }) } } diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index 65a226e..f81bef4 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -7,7 +7,7 @@ use tokio::sync::OnceCell; use crate::{ executors::ExecutorError, - interpret::{InterpretError, InterpretStatus, Outcome}, + interpret::InterpretStatus, inventory::Inventory, modules::{ k3d::K3DInstallationScore, @@ -24,7 +24,8 @@ use crate::{ }; use super::{ - DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, Topology, + DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError, + PreparationOutcome, Topology, k8s::K8sClient, oberservability::monitoring::AlertReceiver, tenant::{ @@ -80,22 +81,30 @@ impl PrometheusApplicationMonitoring for K8sAnywhereTopology { sender: &CRDPrometheus, inventory: &Inventory, receivers: Option>>>, - ) -> Result { + ) -> Result { let po_result = self.ensure_prometheus_operator(sender).await?; - if po_result.status == InterpretStatus::NOOP { + if po_result == PreparationOutcome::Noop { debug!("Skipping Prometheus CR installation due to missing operator."); - return Ok(Outcome::noop()); + return Ok(po_result); } - self.get_k8s_prometheus_application_score(sender.clone(), receivers) - .await - .create_interpret() - .execute(inventory, self) - .await?; - Ok(Outcome::success( - "No action, working on cluster ".to_string(), - )) + let result = self + .get_k8s_prometheus_application_score(sender.clone(), receivers) + .await + .interpret(inventory, self) + .await; + + match result { + Ok(outcome) => match outcome.status { + InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success { + details: outcome.message, + }), + InterpretStatus::NOOP => Ok(PreparationOutcome::Noop), + _ => Err(PreparationError::new(outcome.message)), + }, + Err(err) => Err(PreparationError::new(err.to_string())), + } } } @@ -166,15 +175,23 @@ impl K8sAnywhereTopology { K3DInstallationScore::default() } - async fn try_install_k3d(&self) -> Result<(), InterpretError> { - self.get_k3d_installation_score() - .create_interpret() - .execute(&Inventory::empty(), self) - .await?; - Ok(()) + async fn try_install_k3d(&self) -> Result<(), PreparationError> { + let result = self + .get_k3d_installation_score() + .interpret(&Inventory::empty(), self) + .await; + + match result { + Ok(outcome) => match outcome.status { + InterpretStatus::SUCCESS => Ok(()), + InterpretStatus::NOOP => Ok(()), + _ => Err(PreparationError::new(outcome.message)), + }, + Err(err) => Err(PreparationError::new(err.to_string())), + } } - async fn try_get_or_install_k8s_client(&self) -> Result, InterpretError> { + async fn try_get_or_install_k8s_client(&self) -> Result, PreparationError> { let k8s_anywhere_config = &self.config; // TODO this deserves some refactoring, it is becoming a bit hard to figure out @@ -193,7 +210,7 @@ impl K8sAnywhereTopology { })); } None => { - return Err(InterpretError::new(format!( + return Err(PreparationError::new(format!( "Failed to load kubeconfig from {kubeconfig}" ))); } @@ -272,11 +289,11 @@ impl K8sAnywhereTopology { async fn ensure_prometheus_operator( &self, sender: &CRDPrometheus, - ) -> Result { + ) -> Result { let status = Command::new("sh") .args(["-c", "kubectl get crd -A | grep -i prometheuses"]) .status() - .map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))?; + .map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?; if !status.success() { if let Some(Some(k8s_state)) = self.k8s_state.get() { @@ -285,30 +302,37 @@ impl K8sAnywhereTopology { debug!("installing prometheus operator"); let op_score = prometheus_operator_helm_chart_score(sender.namespace.clone()); - op_score - .create_interpret() - .execute(&Inventory::empty(), self) - .await?; - return Ok(Outcome::success( - "installed prometheus operator".to_string(), - )); + let result = op_score.interpret(&Inventory::empty(), self).await; + + return match result { + Ok(outcome) => match outcome.status { + InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success { + details: "installed prometheus operator".into(), + }), + InterpretStatus::NOOP => Ok(PreparationOutcome::Noop), + _ => Err(PreparationError::new( + "failed to install prometheus operator (unknown error)".into(), + )), + }, + Err(err) => Err(PreparationError::new(err.to_string())), + }; } K8sSource::Kubeconfig => { debug!("unable to install prometheus operator, contact cluster admin"); - return Ok(Outcome::noop()); + return Ok(PreparationOutcome::Noop); } } } else { warn!("Unable to detect k8s_state. Skipping Prometheus Operator install."); - return Ok(Outcome::noop()); + return Ok(PreparationOutcome::Noop); } } debug!("Prometheus operator is already present, skipping install"); - Ok(Outcome::success( - "prometheus operator present in cluster".to_string(), - )) + Ok(PreparationOutcome::Success { + details: "prometheus operator present in cluster".into(), + }) } } @@ -367,26 +391,25 @@ impl Topology for K8sAnywhereTopology { "K8sAnywhereTopology" } - async fn ensure_ready(&self) -> Result { + async fn ensure_ready(&self) -> Result { let k8s_state = self .k8s_state .get_or_try_init(|| self.try_get_or_install_k8s_client()) .await?; - let k8s_state: &K8sState = k8s_state.as_ref().ok_or(InterpretError::new( - "No K8s client could be found or installed".to_string(), + let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new( + "no K8s client could be found or installed".to_string(), ))?; self.ensure_k8s_tenant_manager(k8s_state) .await - .map_err(InterpretError::new)?; + .map_err(PreparationError::new)?; match self.is_helm_available() { - Ok(()) => Ok(Outcome::success(format!( - "{} + helm available", - k8s_state.message.clone() - ))), - Err(e) => Err(InterpretError::new(format!("helm unavailable: {}", e))), + Ok(()) => Ok(PreparationOutcome::Success { + details: format!("{} + helm available", k8s_state.message.clone()), + }), + Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))), } } } diff --git a/harmony/src/domain/topology/localhost.rs b/harmony/src/domain/topology/localhost.rs index c5dcc75..71a8b93 100644 --- a/harmony/src/domain/topology/localhost.rs +++ b/harmony/src/domain/topology/localhost.rs @@ -1,9 +1,7 @@ use async_trait::async_trait; use derive_new::new; -use crate::interpret::{InterpretError, Outcome}; - -use super::{HelmCommand, Topology}; +use super::{HelmCommand, PreparationError, PreparationOutcome, Topology}; #[derive(new)] pub struct LocalhostTopology; @@ -14,10 +12,10 @@ impl Topology for LocalhostTopology { "LocalHostTopology" } - async fn ensure_ready(&self) -> Result { - Ok(Outcome::success( - "Localhost is Chuck Norris, always ready.".to_string(), - )) + async fn ensure_ready(&self) -> Result { + Ok(PreparationOutcome::Success { + details: "Localhost is Chuck Norris, always ready.".into(), + }) } } diff --git a/harmony/src/domain/topology/mod.rs b/harmony/src/domain/topology/mod.rs index 72c6c3f..81b3f12 100644 --- a/harmony/src/domain/topology/mod.rs +++ b/harmony/src/domain/topology/mod.rs @@ -6,6 +6,7 @@ mod k8s_anywhere; mod localhost; pub mod oberservability; pub mod tenant; +use derive_new::new; pub use k8s_anywhere::*; pub use localhost::*; pub mod k8s; @@ -26,10 +27,13 @@ pub use tftp::*; mod helm_command; pub use helm_command::*; +use super::{ + executors::ExecutorError, + instrumentation::{self, HarmonyEvent}, +}; +use std::error::Error; use std::net::IpAddr; -use super::interpret::{InterpretError, Outcome}; - /// Represents a logical view of an infrastructure environment providing specific capabilities. /// /// A Topology acts as a self-contained "package" responsible for managing access @@ -57,9 +61,128 @@ pub trait Topology: Send + Sync { /// * **Internal Orchestration:** For complex topologies, this method might manage dependencies on other sub-topologies, ensuring *their* `ensure_ready` is called first. Using nested `Maestros` to run setup `Scores` against these sub-topologies is the recommended pattern for non-trivial bootstrapping, allowing reuse of Harmony's core orchestration logic. /// /// # Returns - /// - `Ok(Outcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context. - /// - `Err(TopologyError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments. - async fn ensure_ready(&self) -> Result; + /// - `Ok(PreparationOutcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context. + /// - `Err(PreparationError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments. + async fn ensure_ready(&self) -> Result; +} + +#[derive(Debug, Clone, PartialEq, Eq)] +pub enum PreparationOutcome { + Success { details: String }, + Noop, +} + +#[derive(Debug, Clone, new)] +pub struct PreparationError { + msg: String, +} + +impl std::fmt::Display for PreparationError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.write_str(&self.msg) + } +} + +impl Error for PreparationError {} + +impl From for PreparationError { + fn from(value: ExecutorError) -> Self { + Self { + msg: format!("InterpretError : {value}"), + } + } +} + +impl From for PreparationError { + fn from(value: kube::Error) -> Self { + Self { + msg: format!("PreparationError : {value}"), + } + } +} + +impl From for PreparationError { + fn from(value: String) -> Self { + Self { + msg: format!("PreparationError : {value}"), + } + } +} + +#[derive(Clone, Debug, PartialEq)] +pub enum TopologyStatus { + Queued, + Preparing, + Success, + Noop, + Error, +} + +pub struct TopologyState { + pub topology: String, + pub status: TopologyStatus, +} + +impl TopologyState { + pub fn new(topology: String) -> Self { + let instance = Self { + topology, + status: TopologyStatus::Queued, + }; + + instrumentation::instrument(HarmonyEvent::TopologyStateChanged { + topology: instance.topology.clone(), + status: instance.status.clone(), + message: None, + }) + .unwrap(); + + instance + } + + pub fn prepare(&mut self) { + self.status = TopologyStatus::Preparing; + + instrumentation::instrument(HarmonyEvent::TopologyStateChanged { + topology: self.topology.clone(), + status: self.status.clone(), + message: None, + }) + .unwrap(); + } + + pub fn success(&mut self, message: String) { + self.status = TopologyStatus::Success; + + instrumentation::instrument(HarmonyEvent::TopologyStateChanged { + topology: self.topology.clone(), + status: self.status.clone(), + message: Some(message), + }) + .unwrap(); + } + + pub fn noop(&mut self) { + self.status = TopologyStatus::Noop; + + instrumentation::instrument(HarmonyEvent::TopologyStateChanged { + topology: self.topology.clone(), + status: self.status.clone(), + message: None, + }) + .unwrap(); + } + + pub fn error(&mut self, message: String) { + self.status = TopologyStatus::Error; + + instrumentation::instrument(HarmonyEvent::TopologyStateChanged { + topology: self.topology.clone(), + status: self.status.clone(), + message: Some(message), + }) + .unwrap(); + } } #[derive(Debug)] diff --git a/harmony/src/domain/topology/oberservability/monitoring.rs b/harmony/src/domain/topology/oberservability/monitoring.rs index 7fa6eb4..c2e93d6 100644 --- a/harmony/src/domain/topology/oberservability/monitoring.rs +++ b/harmony/src/domain/topology/oberservability/monitoring.rs @@ -45,7 +45,7 @@ impl, T: Topology> Interpret for AlertingInte } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::Alerting } fn get_version(&self) -> Version { diff --git a/harmony/src/domain/topology/tenant/k8s.rs b/harmony/src/domain/topology/tenant/k8s.rs index 0d74a62..3570d83 100644 --- a/harmony/src/domain/topology/tenant/k8s.rs +++ b/harmony/src/domain/topology/tenant/k8s.rs @@ -15,7 +15,7 @@ use k8s_openapi::{ apimachinery::pkg::util::intstr::IntOrString, }; use kube::Resource; -use log::{debug, info, warn}; +use log::debug; use serde::de::DeserializeOwned; use serde_json::json; use tokio::sync::OnceCell; @@ -46,8 +46,7 @@ impl K8sTenantManager { } fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> { - warn!("Validate that when tenant already exists (by id) that name has not changed"); - warn!("Make sure other Tenant constraints are respected by this k8s implementation"); + // TODO: Ensure constraints are applied to namespace (https://git.nationtech.io/NationTech/harmony/issues/98) Ok(()) } @@ -437,13 +436,14 @@ impl TenantManager for K8sTenantManager { debug!("Creating network_policy for tenant {}", config.name); self.apply_resource(network_policy, config).await?; - info!( + debug!( "Success provisionning K8s tenant id {} name {}", config.id, config.name ); self.store_config(config); Ok(()) } + async fn get_tenant_config(&self) -> Option { self.k8s_tenant_config.get().cloned() } diff --git a/harmony/src/modules/application/features/continuous_delivery.rs b/harmony/src/modules/application/features/continuous_delivery.rs index de778fb..e53bd36 100644 --- a/harmony/src/modules/application/features/continuous_delivery.rs +++ b/harmony/src/modules/application/features/continuous_delivery.rs @@ -193,8 +193,7 @@ impl< })], }; score - .create_interpret() - .execute(&Inventory::empty(), topology) + .interpret(&Inventory::empty(), topology) .await .unwrap(); } diff --git a/harmony/src/modules/application/features/helm_argocd_score.rs b/harmony/src/modules/application/features/helm_argocd_score.rs index ff79740..5a91798 100644 --- a/harmony/src/modules/application/features/helm_argocd_score.rs +++ b/harmony/src/modules/application/features/helm_argocd_score.rs @@ -51,10 +51,7 @@ impl Interpret for ArgoInterpret { topology: &T, ) -> Result { error!("Uncomment below, only disabled for debugging"); - self.score - .create_interpret() - .execute(inventory, topology) - .await?; + self.score.interpret(inventory, topology).await?; let k8s_client = topology.k8s_client().await?; k8s_client @@ -62,7 +59,7 @@ impl Interpret for ArgoInterpret { .await .unwrap(); Ok(Outcome::success(format!( - "Successfully installed ArgoCD and {} Applications", + "ArgoCD installed with {} applications", self.argo_apps.len() ))) } diff --git a/harmony/src/modules/application/features/monitoring.rs b/harmony/src/modules/application/features/monitoring.rs index 4c7632c..1ffdace 100644 --- a/harmony/src/modules/application/features/monitoring.rs +++ b/harmony/src/modules/application/features/monitoring.rs @@ -57,8 +57,7 @@ impl< namespace: namespace.clone(), host: "localhost".to_string(), }; - ntfy.create_interpret() - .execute(&Inventory::empty(), topology) + ntfy.interpret(&Inventory::empty(), topology) .await .expect("couldn't create interpret for ntfy"); @@ -95,8 +94,7 @@ impl< alerting_score.receivers.push(Box::new(ntfy_receiver)); alerting_score - .create_interpret() - .execute(&Inventory::empty(), topology) + .interpret(&Inventory::empty(), topology) .await .unwrap(); Ok(()) diff --git a/harmony/src/modules/application/mod.rs b/harmony/src/modules/application/mod.rs index 3e70e78..4ca9c54 100644 --- a/harmony/src/modules/application/mod.rs +++ b/harmony/src/modules/application/mod.rs @@ -60,7 +60,7 @@ impl Interpret for Application } }; } - Ok(Outcome::success("successfully created app".to_string())) + Ok(Outcome::success("Application created".to_string())) } fn get_name(&self) -> InterpretName { diff --git a/harmony/src/modules/application/rust.rs b/harmony/src/modules/application/rust.rs index 4f30e3e..22a1c42 100644 --- a/harmony/src/modules/application/rust.rs +++ b/harmony/src/modules/application/rust.rs @@ -46,7 +46,7 @@ where } fn name(&self) -> String { - format!("Application: {}", self.application.name()) + format!("{} [ApplicationScore]", self.application.name()) } } diff --git a/harmony/src/modules/helm/chart.rs b/harmony/src/modules/helm/chart.rs index c4321d4..dd94678 100644 --- a/harmony/src/modules/helm/chart.rs +++ b/harmony/src/modules/helm/chart.rs @@ -55,7 +55,7 @@ impl Score for HelmChartScore { } fn name(&self) -> String { - format!("{} {} HelmChartScore", self.release_name, self.chart_name) + format!("{} [HelmChartScore]", self.release_name) } } @@ -225,24 +225,27 @@ impl Interpret for HelmChartInterpret { match status { helm_wrapper_rs::HelmDeployStatus::Deployed => Ok(Outcome::new( InterpretStatus::SUCCESS, - "Helm Chart deployed".to_string(), + format!("Helm Chart {} deployed", self.score.release_name), )), helm_wrapper_rs::HelmDeployStatus::PendingInstall => Ok(Outcome::new( InterpretStatus::RUNNING, - "Helm Chart Pending install".to_string(), + format!("Helm Chart {} pending install...", self.score.release_name), )), helm_wrapper_rs::HelmDeployStatus::PendingUpgrade => Ok(Outcome::new( InterpretStatus::RUNNING, - "Helm Chart pending upgrade".to_string(), - )), - helm_wrapper_rs::HelmDeployStatus::Failed => Err(InterpretError::new( - "Failed to install helm chart".to_string(), + format!("Helm Chart {} pending upgrade...", self.score.release_name), )), + helm_wrapper_rs::HelmDeployStatus::Failed => Err(InterpretError::new(format!( + "Helm Chart {} installation failed", + self.score.release_name + ))), } } + fn get_name(&self) -> InterpretName { - todo!() + InterpretName::HelmChart } + fn get_version(&self) -> Version { todo!() } diff --git a/harmony/src/modules/helm/command.rs b/harmony/src/modules/helm/command.rs index 60c6fb6..149d6c6 100644 --- a/harmony/src/modules/helm/command.rs +++ b/harmony/src/modules/helm/command.rs @@ -349,7 +349,7 @@ impl Interpret for HelmChartInterpretV } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::HelmCommand } fn get_version(&self) -> Version { todo!() diff --git a/harmony/src/modules/k3d/install.rs b/harmony/src/modules/k3d/install.rs index 9490dee..245cf41 100644 --- a/harmony/src/modules/k3d/install.rs +++ b/harmony/src/modules/k3d/install.rs @@ -7,7 +7,6 @@ use serde::Serialize; use crate::{ config::HARMONY_DATA_DIR, data::{Id, Version}, - instrumentation::{self, HarmonyEvent}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, inventory::Inventory, score::Score, @@ -30,14 +29,14 @@ impl Default for K3DInstallationScore { } impl Score for K3DInstallationScore { - fn create_interpret(&self) -> Box> { + fn create_interpret(&self) -> Box> { Box::new(K3dInstallationInterpret { score: self.clone(), }) } fn name(&self) -> String { - todo!() + "K3dInstallationScore".into() } } @@ -51,20 +50,14 @@ impl Interpret for K3dInstallationInterpret { async fn execute( &self, _inventory: &Inventory, - topology: &T, + _topology: &T, ) -> Result { - instrumentation::instrument(HarmonyEvent::InterpretExecutionStarted { - topology: topology.name().into(), - interpret: "k3d-installation".into(), - message: "installing k3d...".into(), - }) - .unwrap(); - let k3d = k3d_rs::K3d::new( self.score.installation_path.clone(), Some(self.score.cluster_name.clone()), ); - let outcome = match k3d.ensure_installed().await { + + match k3d.ensure_installed().await { Ok(_client) => { let msg = format!("k3d cluster '{}' installed ", self.score.cluster_name); debug!("{msg}"); @@ -73,16 +66,7 @@ impl Interpret for K3dInstallationInterpret { Err(msg) => Err(InterpretError::new(format!( "failed to ensure k3d is installed : {msg}" ))), - }; - - instrumentation::instrument(HarmonyEvent::InterpretExecutionFinished { - topology: topology.name().into(), - interpret: "k3d-installation".into(), - outcome: outcome.clone(), - }) - .unwrap(); - - outcome + } } fn get_name(&self) -> InterpretName { InterpretName::K3dInstallation diff --git a/harmony/src/modules/k8s/resource.rs b/harmony/src/modules/k8s/resource.rs index 3c0b2bf..b6709ea 100644 --- a/harmony/src/modules/k8s/resource.rs +++ b/harmony/src/modules/k8s/resource.rs @@ -89,7 +89,7 @@ where )) } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::K8sResource } fn get_version(&self) -> Version { todo!() diff --git a/harmony/src/modules/lamp.rs b/harmony/src/modules/lamp.rs index 29228d8..1a853ea 100644 --- a/harmony/src/modules/lamp.rs +++ b/harmony/src/modules/lamp.rs @@ -128,10 +128,7 @@ impl Interpret for LAMPInterpret { info!("Deploying score {deployment_score:#?}"); - deployment_score - .create_interpret() - .execute(inventory, topology) - .await?; + deployment_score.interpret(inventory, topology).await?; info!("LAMP deployment_score {deployment_score:?}"); @@ -153,10 +150,7 @@ impl Interpret for LAMPInterpret { .map(|nbs| fqdn!(nbs.to_string().as_str())), }; - lamp_ingress - .create_interpret() - .execute(inventory, topology) - .await?; + lamp_ingress.interpret(inventory, topology).await?; info!("LAMP lamp_ingress {lamp_ingress:?}"); @@ -166,7 +160,7 @@ impl Interpret for LAMPInterpret { } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::Lamp } fn get_version(&self) -> Version { @@ -215,7 +209,7 @@ impl LAMPInterpret { repository: None, }; - score.create_interpret().execute(inventory, topology).await + score.interpret(inventory, topology).await } fn build_dockerfile(&self, score: &LAMPScore) -> Result> { let mut dockerfile = Dockerfile::new(); diff --git a/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs b/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs index f888b19..51c8ff9 100644 --- a/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs +++ b/harmony/src/modules/monitoring/application_monitoring/application_monitoring_score.rs @@ -13,7 +13,7 @@ use crate::{ prometheus::prometheus::PrometheusApplicationMonitoring, }, score::Score, - topology::{Topology, oberservability::monitoring::AlertReceiver}, + topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver}, }; #[derive(Debug, Clone, Serialize)] @@ -33,7 +33,10 @@ impl> Score } fn name(&self) -> String { - "ApplicationMonitoringScore".to_string() + format!( + "{} monitoring [ApplicationMonitoringScore]", + self.application.name() + ) } } @@ -51,17 +54,27 @@ impl> Interpret inventory: &Inventory, topology: &T, ) -> Result { - topology + let result = topology .install_prometheus( &self.score.sender, inventory, Some(self.score.receivers.clone()), ) - .await + .await; + + match result { + Ok(outcome) => match outcome { + PreparationOutcome::Success { details: _ } => { + Ok(Outcome::success("Prometheus installed".into())) + } + PreparationOutcome::Noop => Ok(Outcome::noop()), + }, + Err(err) => Err(InterpretError::from(err)), + } } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::ApplicationMonitoring } fn get_version(&self) -> Version { diff --git a/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs b/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs index 4cf2b47..98970e6 100644 --- a/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs +++ b/harmony/src/modules/monitoring/kube_prometheus/prometheus.rs @@ -119,8 +119,7 @@ impl KubePrometheus { topology: &T, ) -> Result { kube_prometheus_helm_chart_score(self.config.clone()) - .create_interpret() - .execute(inventory, topology) + .interpret(inventory, topology) .await } } diff --git a/harmony/src/modules/monitoring/ntfy/ntfy.rs b/harmony/src/modules/monitoring/ntfy/ntfy.rs index a640bf4..8ad3230 100644 --- a/harmony/src/modules/monitoring/ntfy/ntfy.rs +++ b/harmony/src/modules/monitoring/ntfy/ntfy.rs @@ -28,7 +28,7 @@ impl Score for NtfyScore { } fn name(&self) -> String { - "Ntfy".to_string() + "alert receiver [NtfyScore]".into() } } @@ -96,8 +96,7 @@ impl Interpret for NtfyInterpret { topology: &T, ) -> Result { ntfy_helm_chart_score(self.score.namespace.clone(), self.score.host.clone()) - .create_interpret() - .execute(inventory, topology) + .interpret(inventory, topology) .await?; debug!("installed ntfy helm chart"); @@ -120,12 +119,13 @@ impl Interpret for NtfyInterpret { debug!("exec into pod done"); - Ok(Outcome::success("installed ntfy".to_string())) + Ok(Outcome::success("Ntfy installed".to_string())) } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::Ntfy } + fn get_version(&self) -> Version { todo!() } diff --git a/harmony/src/modules/monitoring/prometheus/prometheus.rs b/harmony/src/modules/monitoring/prometheus/prometheus.rs index 934f1ae..a207d5a 100644 --- a/harmony/src/modules/monitoring/prometheus/prometheus.rs +++ b/harmony/src/modules/monitoring/prometheus/prometheus.rs @@ -100,8 +100,7 @@ impl Prometheus { topology: &T, ) -> Result { prometheus_helm_chart_score(self.config.clone()) - .create_interpret() - .execute(inventory, topology) + .interpret(inventory, topology) .await } pub async fn install_grafana( @@ -116,8 +115,7 @@ impl Prometheus { if let Some(ns) = namespace.as_deref() { grafana_helm_chart_score(ns) - .create_interpret() - .execute(inventory, topology) + .interpret(inventory, topology) .await } else { Err(InterpretError::new( diff --git a/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs b/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs index e806c63..c70f5e5 100644 --- a/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs +++ b/harmony/src/modules/prometheus/k8s_prometheus_alerting_score.rs @@ -61,7 +61,7 @@ impl> S } fn name(&self) -> String { - "CRDApplicationAlertingScore".into() + "prometheus alerting [CRDAlertingScore]".into() } } @@ -94,12 +94,12 @@ impl> I self.install_monitors(self.service_monitors.clone(), &client) .await?; Ok(Outcome::success( - "deployed application monitoring composants".to_string(), + "K8s monitoring components installed".to_string(), )) } fn get_name(&self) -> InterpretName { - todo!() + InterpretName::K8sPrometheusCrdAlerting } fn get_version(&self) -> Version { @@ -118,7 +118,7 @@ impl> I impl K8sPrometheusCRDAlertingInterpret { async fn crd_exists(&self, crd: &str) -> bool { let status = Command::new("sh") - .args(["-c", "kubectl get crd -A | grep -i", crd]) + .args(["-c", &format!("kubectl get crd -A | grep -i {crd}")]) .status() .map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e))) .unwrap(); diff --git a/harmony/src/modules/prometheus/prometheus.rs b/harmony/src/modules/prometheus/prometheus.rs index 865b9ca..d3940c7 100644 --- a/harmony/src/modules/prometheus/prometheus.rs +++ b/harmony/src/modules/prometheus/prometheus.rs @@ -1,9 +1,11 @@ use async_trait::async_trait; use crate::{ - interpret::{InterpretError, Outcome}, inventory::Inventory, - topology::oberservability::monitoring::{AlertReceiver, AlertSender}, + topology::{ + PreparationError, PreparationOutcome, + oberservability::monitoring::{AlertReceiver, AlertSender}, + }, }; #[async_trait] @@ -13,5 +15,5 @@ pub trait PrometheusApplicationMonitoring { sender: &S, inventory: &Inventory, receivers: Option>>>, - ) -> Result; + ) -> Result; } diff --git a/harmony/src/modules/tenant/credentials.rs b/harmony/src/modules/tenant/credentials.rs index 0e5917a..2ee24ec 100644 --- a/harmony/src/modules/tenant/credentials.rs +++ b/harmony/src/modules/tenant/credentials.rs @@ -17,7 +17,7 @@ impl Score for TenantCredentialScore { } fn name(&self) -> String { - todo!() + "TenantCredentialScore".into() } } diff --git a/harmony/src/modules/tenant/mod.rs b/harmony/src/modules/tenant/mod.rs index 1803b69..b1a49c2 100644 --- a/harmony/src/modules/tenant/mod.rs +++ b/harmony/src/modules/tenant/mod.rs @@ -28,7 +28,7 @@ impl Score for TenantScore { } fn name(&self) -> String { - format!("{} TenantScore", self.config.name) + format!("{} [TenantScore]", self.config.name) } } @@ -47,8 +47,8 @@ impl Interpret for TenantInterpret { topology.provision_tenant(&self.tenant_config).await?; Ok(Outcome::success(format!( - "Successfully provisioned tenant {} with id {}", - self.tenant_config.name, self.tenant_config.id + "Tenant provisioned with id '{}'", + self.tenant_config.id ))) } diff --git a/harmony_cli/Cargo.toml b/harmony_cli/Cargo.toml index 227b39e..a887b60 100644 --- a/harmony_cli/Cargo.toml +++ b/harmony_cli/Cargo.toml @@ -5,6 +5,10 @@ version.workspace = true readme.workspace = true license.workspace = true +[features] +default = ["tui"] +tui = ["dep:harmony_tui"] + [dependencies] assert_cmd = "2.0.17" clap = { version = "4.5.35", features = ["derive"] } @@ -19,7 +23,5 @@ lazy_static = "1.5.0" log.workspace = true indicatif-log-bridge = "0.2.3" - -[features] -default = ["tui"] -tui = ["dep:harmony_tui"] +[dev-dependencies] +harmony = { path = "../harmony", features = ["testing"] } diff --git a/harmony_cli/src/cli_logger.rs b/harmony_cli/src/cli_logger.rs index a35f450..c2ed79d 100644 --- a/harmony_cli/src/cli_logger.rs +++ b/harmony_cli/src/cli_logger.rs @@ -1,16 +1,16 @@ -use harmony::instrumentation::{self, HarmonyEvent}; -use indicatif::{MultiProgress, ProgressBar}; -use indicatif_log_bridge::LogWrapper; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, +use harmony::{ + instrumentation::{self, HarmonyEvent}, + topology::TopologyStatus, }; +use indicatif::MultiProgress; +use indicatif_log_bridge::LogWrapper; +use std::sync::{Arc, Mutex}; -use crate::progress; +use crate::progress::{IndicatifProgressTracker, ProgressTracker}; pub fn init() -> tokio::task::JoinHandle<()> { - configure_logger(); - let handle = tokio::spawn(handle_events()); + let base_progress = configure_logger(); + let handle = tokio::spawn(handle_events(base_progress)); loop { if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() { @@ -21,91 +21,144 @@ pub fn init() -> tokio::task::JoinHandle<()> { handle } -fn configure_logger() { +fn configure_logger() -> MultiProgress { let logger = env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); let level = logger.filter(); - let multi = MultiProgress::new(); - LogWrapper::new(multi.clone(), logger).try_init().unwrap(); + let progress = MultiProgress::new(); + + LogWrapper::new(progress.clone(), logger) + .try_init() + .unwrap(); log::set_max_level(level); + + progress } -async fn handle_events() { - instrumentation::subscribe("Harmony CLI Logger", { - let sections: Arc>> = - Arc::new(Mutex::new(HashMap::new())); - let progress_bars: Arc>> = - Arc::new(Mutex::new(HashMap::new())); +async fn handle_events(base_progress: MultiProgress) { + let progress_tracker = Arc::new(IndicatifProgressTracker::new(base_progress.clone())); + let preparing_topology = Arc::new(Mutex::new(false)); + let current_score: Arc>> = Arc::new(Mutex::new(None)); + instrumentation::subscribe("Harmony CLI Logger", { move |event| { - let sections_clone = Arc::clone(§ions); - let progress_bars_clone = Arc::clone(&progress_bars); + let progress_tracker = Arc::clone(&progress_tracker); + let preparing_topology = Arc::clone(&preparing_topology); + let current_score = Arc::clone(¤t_score); async move { - let mut sections = sections_clone.lock().unwrap(); - let mut progress_bars = progress_bars_clone.lock().unwrap(); + let mut preparing_topology = preparing_topology.lock().unwrap(); + let mut current_score = current_score.lock().unwrap(); match event { HarmonyEvent::HarmonyStarted => {} - HarmonyEvent::PrepareTopologyStarted { topology: name } => { - let section = progress::new_section(format!( - "{} Preparing environment: {name}...", - crate::theme::EMOJI_TOPOLOGY, - )); - (*sections).insert(name, section); + HarmonyEvent::HarmonyFinished => { + progress_tracker.add_section( + "harmony-summary", + &format!("\n{} Harmony completed\n\n", crate::theme::EMOJI_HARMONY), + ); + progress_tracker.add_section("harmony-finished", "\n\n"); + return false; } - HarmonyEvent::TopologyPrepared { - topology: name, - outcome, + HarmonyEvent::TopologyStateChanged { + topology, + status, + message, } => { - let section = (*sections).get(&name).unwrap(); - let progress = progress::add_spinner(section, "".into()); + let section_key = topology_key(&topology); - match outcome.status { - harmony::interpret::InterpretStatus::SUCCESS => { - progress::success(section, Some(progress), outcome.message); + match status { + TopologyStatus::Queued => {} + TopologyStatus::Preparing => { + progress_tracker.add_section( + §ion_key, + &format!( + "\n{} Preparing environment: {topology}...", + crate::theme::EMOJI_TOPOLOGY + ), + ); + (*preparing_topology) = true; } - harmony::interpret::InterpretStatus::FAILURE => { - progress::error(section, Some(progress), outcome.message); + TopologyStatus::Success => { + (*preparing_topology) = false; + progress_tracker.add_task(§ion_key, "topology-success", ""); + progress_tracker + .finish_task("topology-success", &message.unwrap_or("".into())); } - harmony::interpret::InterpretStatus::RUNNING => todo!(), - harmony::interpret::InterpretStatus::QUEUED => todo!(), - harmony::interpret::InterpretStatus::BLOCKED => todo!(), - harmony::interpret::InterpretStatus::NOOP => { - progress::skip(section, Some(progress), outcome.message); + TopologyStatus::Noop => { + (*preparing_topology) = false; + progress_tracker.add_task(§ion_key, "topology-skip", ""); + progress_tracker + .skip_task("topology-skip", &message.unwrap_or("".into())); + } + TopologyStatus::Error => { + progress_tracker.add_task(§ion_key, "topology-error", ""); + (*preparing_topology) = false; + progress_tracker + .fail_task("topology-error", &message.unwrap_or("".into())); } } } HarmonyEvent::InterpretExecutionStarted { - interpret: name, + execution_id: task_key, topology, + interpret: _, + score, message, } => { - let section = (*sections).get(&topology).unwrap(); - let progress_bar = progress::add_spinner(section, message); + let is_key_topology = (*preparing_topology) + && progress_tracker.contains_section(&topology_key(&topology)); + let is_key_current_score = current_score.is_some() + && progress_tracker + .contains_section(&score_key(¤t_score.clone().unwrap())); + let is_key_score = progress_tracker.contains_section(&score_key(&score)); - (*progress_bars).insert(name, progress_bar); + let section_key = if is_key_topology { + topology_key(&topology) + } else if is_key_current_score { + score_key(¤t_score.clone().unwrap()) + } else if is_key_score { + score_key(&score) + } else { + (*current_score) = Some(score.clone()); + let key = score_key(&score); + progress_tracker.add_section( + &key, + &format!( + "{} Interpreting score: {score}...", + crate::theme::EMOJI_SCORE + ), + ); + key + }; + + progress_tracker.add_task(§ion_key, &task_key, &message); } HarmonyEvent::InterpretExecutionFinished { - topology, - interpret: name, + execution_id: task_key, + topology: _, + interpret: _, + score, outcome, } => { - let section = (*sections).get(&topology).unwrap(); - let progress_bar = (*progress_bars).get(&name).cloned(); - - let _ = section.clear(); - - match outcome { - Ok(outcome) => { - progress::success(section, progress_bar, outcome.message); - } - Err(err) => { - progress::error(section, progress_bar, err.to_string()); - } + if current_score.is_some() && current_score.clone().unwrap() == score { + (*current_score) = None; } - (*progress_bars).remove(&name); + match outcome { + Ok(outcome) => match outcome.status { + harmony::interpret::InterpretStatus::SUCCESS => { + progress_tracker.finish_task(&task_key, &outcome.message); + } + harmony::interpret::InterpretStatus::NOOP => { + progress_tracker.skip_task(&task_key, &outcome.message); + } + _ => progress_tracker.fail_task(&task_key, &outcome.message), + }, + Err(err) => { + progress_tracker.fail_task(&task_key, &err.to_string()); + } + } } } true @@ -114,3 +167,11 @@ async fn handle_events() { }) .await; } + +fn topology_key(topology: &str) -> String { + format!("topology-{topology}") +} + +fn score_key(score: &str) -> String { + format!("score-{score}") +} diff --git a/harmony_cli/src/lib.rs b/harmony_cli/src/lib.rs index b6cf885..11ac572 100644 --- a/harmony_cli/src/lib.rs +++ b/harmony_cli/src/lib.rs @@ -1,5 +1,6 @@ use clap::Parser; use clap::builder::ArgPredicate; +use harmony::instrumentation; use harmony::inventory::Inventory; use harmony::maestro::Maestro; use harmony::{score::Score, topology::Topology}; @@ -97,6 +98,7 @@ pub async fn run( let result = init(maestro, args_struct).await; + instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap(); let _ = tokio::try_join!(cli_logger_handle); result } @@ -163,7 +165,7 @@ async fn init( } #[cfg(test)] -mod test { +mod tests { use harmony::{ inventory::Inventory, maestro::Maestro, diff --git a/harmony_cli/src/progress.rs b/harmony_cli/src/progress.rs index 4008bc8..eee8adc 100644 --- a/harmony_cli/src/progress.rs +++ b/harmony_cli/src/progress.rs @@ -1,50 +1,163 @@ +use indicatif::{MultiProgress, ProgressBar}; +use std::collections::HashMap; +use std::sync::{Arc, Mutex}; use std::time::Duration; -use indicatif::{MultiProgress, ProgressBar}; - -pub fn new_section(title: String) -> MultiProgress { - let multi_progress = MultiProgress::new(); - let _ = multi_progress.println(title); - - multi_progress +pub trait ProgressTracker: Send + Sync { + fn contains_section(&self, id: &str) -> bool; + fn add_section(&self, id: &str, message: &str); + fn add_task(&self, section_id: &str, task_id: &str, message: &str); + fn finish_task(&self, id: &str, message: &str); + fn fail_task(&self, id: &str, message: &str); + fn skip_task(&self, id: &str, message: &str); + fn clear(&self); } -pub fn add_spinner(multi_progress: &MultiProgress, message: String) -> ProgressBar { - let progress = multi_progress.add(ProgressBar::new_spinner()); - - progress.set_style(crate::theme::SPINNER_STYLE.clone()); - progress.set_message(message); - progress.enable_steady_tick(Duration::from_millis(100)); - - progress +struct Section { + header_index: usize, + task_count: usize, + pb: ProgressBar, } -pub fn success(multi_progress: &MultiProgress, progress: Option, message: String) { - if let Some(progress) = progress { - multi_progress.remove(&progress) +struct IndicatifProgressTrackerState { + sections: HashMap, + tasks: HashMap, + pb_count: usize, +} + +#[derive(Clone)] +pub struct IndicatifProgressTracker { + mp: MultiProgress, + state: Arc>, +} + +impl IndicatifProgressTracker { + pub fn new(base: MultiProgress) -> Self { + // The indicatif log bridge will insert a progress bar at the top. + // To prevent our first section from being erased, we need to create + // a dummy progress bar as our first progress bar. + let _ = base.clear(); + let log_pb = base.add(ProgressBar::new(1)); + + let mut sections = HashMap::new(); + sections.insert( + "__log__".into(), + Section { + header_index: 0, + task_count: 0, + pb: log_pb.clone(), + }, + ); + + let mut tasks = HashMap::new(); + tasks.insert("__log__".into(), log_pb); + + let state = Arc::new(Mutex::new(IndicatifProgressTrackerState { + sections, + tasks, + pb_count: 1, + })); + + Self { mp: base, state } + } +} + +impl ProgressTracker for IndicatifProgressTracker { + fn add_section(&self, id: &str, message: &str) { + let mut state = self.state.lock().unwrap(); + + let header_pb = self + .mp + .add(ProgressBar::new(1).with_style(crate::theme::SECTION_STYLE.clone())); + header_pb.finish_with_message(message.to_string()); + + let header_index = state.pb_count; + state.pb_count += 1; + + state.sections.insert( + id.to_string(), + Section { + header_index, + task_count: 0, + pb: header_pb, + }, + ); } - let progress = multi_progress.add(ProgressBar::new_spinner()); - progress.set_style(crate::theme::SUCCESS_SPINNER_STYLE.clone()); - progress.finish_with_message(message); -} + fn add_task(&self, section_id: &str, task_id: &str, message: &str) { + let mut state = self.state.lock().unwrap(); -pub fn error(multi_progress: &MultiProgress, progress: Option, message: String) { - if let Some(progress) = progress { - multi_progress.remove(&progress) + let insertion_index = { + let current_section = state + .sections + .get(section_id) + .expect("Section ID not found"); + current_section.header_index + current_section.task_count + 1 // +1 to insert after header + }; + + let pb = self.mp.insert(insertion_index, ProgressBar::new_spinner()); + pb.set_style(crate::theme::SPINNER_STYLE.clone()); + pb.set_prefix(" "); + pb.set_message(message.to_string()); + pb.enable_steady_tick(Duration::from_millis(80)); + + state.pb_count += 1; + + let section = state + .sections + .get_mut(section_id) + .expect("Section ID not found"); + section.task_count += 1; + + // We inserted a new progress bar, so we must update the header_index + // for all subsequent sections. + for (id, s) in state.sections.iter_mut() { + if id != section_id && s.header_index >= insertion_index { + s.header_index += 1; + } + } + + state.tasks.insert(task_id.to_string(), pb); } - let progress = multi_progress.add(ProgressBar::new_spinner()); - progress.set_style(crate::theme::ERROR_SPINNER_STYLE.clone()); - progress.finish_with_message(message); -} - -pub fn skip(multi_progress: &MultiProgress, progress: Option, message: String) { - if let Some(progress) = progress { - multi_progress.remove(&progress) + fn finish_task(&self, id: &str, message: &str) { + let state = self.state.lock().unwrap(); + if let Some(pb) = state.tasks.get(id) { + pb.set_style(crate::theme::SUCCESS_SPINNER_STYLE.clone()); + pb.finish_with_message(message.to_string()); + } } - let progress = multi_progress.add(ProgressBar::new_spinner()); - progress.set_style(crate::theme::SKIP_SPINNER_STYLE.clone()); - progress.finish_with_message(message); + fn fail_task(&self, id: &str, message: &str) { + let state = self.state.lock().unwrap(); + if let Some(pb) = state.tasks.get(id) { + pb.set_style(crate::theme::ERROR_SPINNER_STYLE.clone()); + pb.finish_with_message(message.to_string()); + } + } + + fn skip_task(&self, id: &str, message: &str) { + let state = self.state.lock().unwrap(); + if let Some(pb) = state.tasks.get(id) { + pb.set_style(crate::theme::SKIP_SPINNER_STYLE.clone()); + pb.finish_with_message(message.to_string()); + } + } + + fn contains_section(&self, id: &str) -> bool { + let state = self.state.lock().unwrap(); + state.sections.contains_key(id) + } + + fn clear(&self) { + let mut state = self.state.lock().unwrap(); + + state.tasks.values().for_each(|p| self.mp.remove(p)); + state.tasks.clear(); + state.sections.values().for_each(|s| self.mp.remove(&s.pb)); + state.sections.clear(); + state.pb_count = 0; + + let _ = self.mp.clear(); + } } diff --git a/harmony_cli/src/theme.rs b/harmony_cli/src/theme.rs index ee25077..d86e194 100644 --- a/harmony_cli/src/theme.rs +++ b/harmony_cli/src/theme.rs @@ -8,10 +8,14 @@ pub static EMOJI_SKIP: Emoji<'_, '_> = Emoji("⏭️", ""); pub static EMOJI_ERROR: Emoji<'_, '_> = Emoji("⚠️", ""); pub static EMOJI_DEPLOY: Emoji<'_, '_> = Emoji("🚀", ""); pub static EMOJI_TOPOLOGY: Emoji<'_, '_> = Emoji("📦", ""); +pub static EMOJI_SCORE: Emoji<'_, '_> = Emoji("🎶", ""); lazy_static! { + pub static ref SECTION_STYLE: ProgressStyle = ProgressStyle::default_spinner() + .template("{wide_msg:.bold}") + .unwrap(); pub static ref SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner() - .template(" {spinner:.green} {msg}") + .template(" {spinner:.green} {wide_msg}") .unwrap() .tick_strings(&["⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"]); pub static ref SUCCESS_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE diff --git a/harmony_composer/src/harmony_composer_logger.rs b/harmony_composer/src/harmony_composer_logger.rs index 00e0fb7..e05cb58 100644 --- a/harmony_composer/src/harmony_composer_logger.rs +++ b/harmony_composer/src/harmony_composer_logger.rs @@ -1,10 +1,7 @@ -use indicatif::{MultiProgress, ProgressBar}; -use indicatif_log_bridge::LogWrapper; +use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; +use indicatif::MultiProgress; use log::error; -use std::{ - collections::HashMap, - sync::{Arc, Mutex}, -}; +use std::sync::Arc; use crate::instrumentation::{self, HarmonyComposerEvent}; @@ -22,85 +19,57 @@ pub fn init() -> tokio::task::JoinHandle<()> { } fn configure_logger() { - let logger = - env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); - let level = logger.filter(); - let multi = MultiProgress::new(); - LogWrapper::new(multi.clone(), logger).try_init().unwrap(); - log::set_max_level(level); + env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); } pub async fn handle_events() { - const PROGRESS_SETUP: &str = "project-initialization"; + let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new())); + + const SETUP_SECTION: &str = "project-initialization"; + const COMPILTATION_TASK: &str = "compilation"; const PROGRESS_DEPLOYMENT: &str = "deployment"; instrumentation::subscribe("Harmony Composer Logger", { - let progresses: Arc>> = - Arc::new(Mutex::new(HashMap::new())); - let compilation_progress = Arc::new(Mutex::new(None::)); - move |event| { - let progresses_clone = Arc::clone(&progresses); - let compilation_progress_clone = Arc::clone(&compilation_progress); + let progress_tracker = Arc::clone(&progress_tracker); async move { - let mut progresses_guard = progresses_clone.lock().unwrap(); - let mut compilation_progress_guard = compilation_progress_clone.lock().unwrap(); - match event { HarmonyComposerEvent::HarmonyComposerStarted => {} HarmonyComposerEvent::ProjectInitializationStarted => { - let multi_progress = harmony_cli::progress::new_section(format!( - "{} Initializing Harmony project...", - harmony_cli::theme::EMOJI_HARMONY, - )); - (*progresses_guard).insert(PROGRESS_SETUP.to_string(), multi_progress); + progress_tracker.add_section( + SETUP_SECTION, + &format!( + "{} Initializing Harmony project...", + harmony_cli::theme::EMOJI_HARMONY, + ), + ); } - HarmonyComposerEvent::ProjectInitialized => println!("\n"), + HarmonyComposerEvent::ProjectInitialized => {} HarmonyComposerEvent::ProjectCompilationStarted { details } => { - let initialization_progress = - (*progresses_guard).get(PROGRESS_SETUP).unwrap(); - let _ = initialization_progress.clear(); - - let progress = - harmony_cli::progress::add_spinner(initialization_progress, details); - *compilation_progress_guard = Some(progress); + progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details); } HarmonyComposerEvent::ProjectCompiled => { - let initialization_progress = - (*progresses_guard).get(PROGRESS_SETUP).unwrap(); - - harmony_cli::progress::success( - initialization_progress, - (*compilation_progress_guard).take(), - "project compiled".to_string(), - ); + progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); } HarmonyComposerEvent::ProjectCompilationFailed { details } => { - let initialization_progress = - (*progresses_guard).get(PROGRESS_SETUP).unwrap(); - - harmony_cli::progress::error( - initialization_progress, - (*compilation_progress_guard).take(), - "failed to compile project".to_string(), - ); + progress_tracker.fail_task(COMPILTATION_TASK, "failed to compile project"); error!("{details}"); } HarmonyComposerEvent::DeploymentStarted { target } => { - let multi_progress = harmony_cli::progress::new_section(format!( - "{} Starting deployment to {target}...\n\n", - harmony_cli::theme::EMOJI_DEPLOY - )); - (*progresses_guard).insert(PROGRESS_DEPLOYMENT.to_string(), multi_progress); + progress_tracker.add_section( + PROGRESS_DEPLOYMENT, + &format!( + "\n{} Deploying project to {target}...\n", + harmony_cli::theme::EMOJI_DEPLOY, + ), + ); + } + HarmonyComposerEvent::DeploymentCompleted => { + progress_tracker.clear(); } - HarmonyComposerEvent::DeploymentCompleted => println!("\n"), HarmonyComposerEvent::Shutdown => { - for (_, progresses) in (*progresses_guard).iter() { - progresses.clear().unwrap(); - } - return false; } } diff --git a/harmony_composer/src/instrumentation.rs b/harmony_composer/src/instrumentation.rs index eafa4d0..b2e9c99 100644 --- a/harmony_composer/src/instrumentation.rs +++ b/harmony_composer/src/instrumentation.rs @@ -23,9 +23,18 @@ static HARMONY_COMPOSER_EVENT_BUS: Lazy> }); pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { - match HARMONY_COMPOSER_EVENT_BUS.send(event) { - Ok(_) => Ok(()), - Err(_) => Err("send error: no subscribers"), + #[cfg(not(test))] + { + match HARMONY_COMPOSER_EVENT_BUS.send(event) { + Ok(_) => Ok(()), + Err(_) => Err("send error: no subscribers"), + } + } + + #[cfg(test)] + { + let _ = event; // Suppress the "unused variable" warning for `event` + Ok(()) } } diff --git a/harmony_composer/src/main.rs b/harmony_composer/src/main.rs index 8866b4a..9f2c9ea 100644 --- a/harmony_composer/src/main.rs +++ b/harmony_composer/src/main.rs @@ -20,7 +20,7 @@ mod instrumentation; #[derive(Parser)] #[command(version, about, long_about = None, flatten_help = true, propagate_version = true)] struct GlobalArgs { - #[arg(long, default_value = "harmony")] + #[arg(long, default_value = ".")] harmony_path: String, #[arg(long)] diff --git a/k3d/src/downloadable_asset.rs b/k3d/src/downloadable_asset.rs index ababc77..085d382 100644 --- a/k3d/src/downloadable_asset.rs +++ b/k3d/src/downloadable_asset.rs @@ -1,5 +1,5 @@ use futures_util::StreamExt; -use log::{debug, info, warn}; +use log::{debug, warn}; use sha2::{Digest, Sha256}; use std::io::Read; use std::path::PathBuf; @@ -45,7 +45,7 @@ pub(crate) struct DownloadableAsset { impl DownloadableAsset { fn verify_checksum(&self, file: PathBuf) -> bool { if !file.exists() { - warn!("File does not exist: {:?}", file); + debug!("File does not exist: {:?}", file); return false; } @@ -155,7 +155,7 @@ impl DownloadableAsset { return Err(CHECKSUM_FAILED_MSG.to_string()); } - info!( + debug!( "File downloaded and verified successfully: {}", target_file_path.to_string_lossy() ); diff --git a/k3d/src/lib.rs b/k3d/src/lib.rs index 7733ee6..7117d72 100644 --- a/k3d/src/lib.rs +++ b/k3d/src/lib.rs @@ -64,7 +64,6 @@ impl K3d { .text() .await .unwrap(); - println!("body: {body}"); let checksum = body .lines() @@ -104,8 +103,7 @@ impl K3d { .get_latest() .await .map_err(|e| e.to_string())?; - // debug!("Got k3d releases {releases:#?}"); - println!("Got k3d first releases {latest_release:#?}"); + debug!("Got k3d releases {latest_release:#?}"); Ok(latest_release) }