refactor: Remove InterpretStatus/Error & Outcome from Topology #99
| @ -11,5 +11,5 @@ lazy_static! { | ||||
|     pub static ref REGISTRY_PROJECT: String = | ||||
|         std::env::var("HARMONY_REGISTRY_PROJECT").unwrap_or_else(|_| "harmony".to_string()); | ||||
|     pub static ref DRY_RUN: bool = | ||||
|         std::env::var("HARMONY_DRY_RUN").map_or(true, |value| value.parse().unwrap_or(true)); | ||||
|         std::env::var("HARMONY_DRY_RUN").is_ok_and(|value| value.parse().unwrap_or(false)); | ||||
| } | ||||
|  | ||||
| @ -2,18 +2,14 @@ use log::debug; | ||||
| use once_cell::sync::Lazy; | ||||
| use tokio::sync::broadcast; | ||||
| 
 | ||||
| use super::interpret::{InterpretError, Outcome}; | ||||
| use super::{ | ||||
|     interpret::{InterpretError, Outcome}, | ||||
|     topology::TopologyStatus, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Debug, Clone)] | ||||
| pub enum HarmonyEvent { | ||||
|     HarmonyStarted, | ||||
|     PrepareTopologyStarted { | ||||
|         topology: String, | ||||
|     }, | ||||
|     TopologyPrepared { | ||||
|         topology: String, | ||||
|         outcome: Outcome, | ||||
|     }, | ||||
|     InterpretExecutionStarted { | ||||
|         topology: String, | ||||
|         interpret: String, | ||||
| @ -24,6 +20,11 @@ pub enum HarmonyEvent { | ||||
|         interpret: String, | ||||
|         outcome: Result<Outcome, InterpretError>, | ||||
|     }, | ||||
|     TopologyStateChanged { | ||||
|         topology: String, | ||||
|         status: TopologyStatus, | ||||
|         message: Option<String>, | ||||
|     }, | ||||
| } | ||||
| 
 | ||||
| static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| { | ||||
|  | ||||
| @ -7,6 +7,7 @@ use super::{ | ||||
|     data::{Id, Version}, | ||||
|     executors::ExecutorError, | ||||
|     inventory::Inventory, | ||||
|     topology::PreparationError, | ||||
| }; | ||||
| 
 | ||||
| pub enum InterpretName { | ||||
| @ -113,6 +114,14 @@ impl std::fmt::Display for InterpretError { | ||||
| } | ||||
| impl Error for InterpretError {} | ||||
| 
 | ||||
| impl From<PreparationError> for InterpretError { | ||||
|     fn from(value: PreparationError) -> Self { | ||||
|         Self { | ||||
|             msg: format!("InterpretError : {value}"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<ExecutorError> for InterpretError { | ||||
|     fn from(value: ExecutorError) -> Self { | ||||
|         Self { | ||||
|  | ||||
| @ -1,14 +1,14 @@ | ||||
| use std::sync::{Arc, Mutex, RwLock}; | ||||
| use std::sync::{Arc, RwLock}; | ||||
| 
 | ||||
| use log::{debug, warn}; | ||||
| 
 | ||||
| use crate::instrumentation::{self, HarmonyEvent}; | ||||
| use crate::topology::TopologyStatus; | ||||
| 
 | ||||
| use super::{ | ||||
|     interpret::{InterpretError, InterpretStatus, Outcome}, | ||||
|     interpret::{InterpretError, Outcome}, | ||||
|     inventory::Inventory, | ||||
|     score::Score, | ||||
|     topology::Topology, | ||||
|     topology::{PreparationError, PreparationOutcome, Topology, TopologyState}, | ||||
| }; | ||||
| 
 | ||||
| type ScoreVec<T> = Vec<Box<dyn Score<T>>>; | ||||
| @ -17,7 +17,7 @@ pub struct Maestro<T: Topology> { | ||||
|     inventory: Inventory, | ||||
|     topology: T, | ||||
|     scores: Arc<RwLock<ScoreVec<T>>>, | ||||
|     topology_preparation_result: Mutex<Option<Outcome>>, | ||||
|     topology_state: TopologyState, | ||||
| } | ||||
| 
 | ||||
| impl<T: Topology> Maestro<T> { | ||||
| @ -25,42 +25,47 @@ impl<T: Topology> Maestro<T> { | ||||
|     ///
 | ||||
|     /// This should rarely be used. Most of the time Maestro::initialize should be used instead.
 | ||||
|     pub fn new_without_initialization(inventory: Inventory, topology: T) -> Self { | ||||
|         let topology_name = topology.name().to_string(); | ||||
| 
 | ||||
|         Self { | ||||
|             inventory, | ||||
|             topology, | ||||
|             scores: Arc::new(RwLock::new(Vec::new())), | ||||
|             topology_preparation_result: None.into(), | ||||
|             topology_state: TopologyState::new(topology_name), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub async fn initialize(inventory: Inventory, topology: T) -> Result<Self, InterpretError> { | ||||
|         let instance = Self::new_without_initialization(inventory, topology); | ||||
|     pub async fn initialize(inventory: Inventory, topology: T) -> Result<Self, PreparationError> { | ||||
|         let mut instance = Self::new_without_initialization(inventory, topology); | ||||
|         instance.prepare_topology().await?; | ||||
|         Ok(instance) | ||||
|     } | ||||
| 
 | ||||
|     /// Ensures the associated Topology is ready for operations.
 | ||||
|     /// Delegates the readiness check and potential setup actions to the Topology.
 | ||||
|     pub async fn prepare_topology(&self) -> Result<Outcome, InterpretError> { | ||||
|         instrumentation::instrument(HarmonyEvent::PrepareTopologyStarted { | ||||
|             topology: self.topology.name().to_string(), | ||||
|         }) | ||||
|         .unwrap(); | ||||
|     async fn prepare_topology(&mut self) -> Result<PreparationOutcome, PreparationError> { | ||||
|         self.topology_state.prepare(); | ||||
| 
 | ||||
|         let outcome = self.topology.ensure_ready().await?; | ||||
|         let result = self.topology.ensure_ready().await; | ||||
| 
 | ||||
|         instrumentation::instrument(HarmonyEvent::TopologyPrepared { | ||||
|             topology: self.topology.name().to_string(), | ||||
|             outcome: outcome.clone(), | ||||
|         }) | ||||
|         .unwrap(); | ||||
| 
 | ||||
|         self.topology_preparation_result | ||||
|             .lock() | ||||
|             .unwrap() | ||||
|             .replace(outcome.clone()); | ||||
|         match result { | ||||
|             Ok(outcome) => { | ||||
|                 match outcome.clone() { | ||||
|                     PreparationOutcome::Success { details } => { | ||||
|                         self.topology_state.success(details); | ||||
|                     } | ||||
|                     PreparationOutcome::Noop => { | ||||
|                         self.topology_state.noop(); | ||||
|                     } | ||||
|                 }; | ||||
|                 Ok(outcome) | ||||
|             } | ||||
|             Err(err) => { | ||||
|                 self.topology_state.error(err.to_string()); | ||||
|                 Err(err) | ||||
|             } | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     pub fn register_all(&mut self, mut scores: ScoreVec<T>) { | ||||
|         let mut score_mut = self.scores.write().expect("Should acquire lock"); | ||||
| @ -68,12 +73,7 @@ impl<T: Topology> Maestro<T> { | ||||
|     } | ||||
| 
 | ||||
|     fn is_topology_initialized(&self) -> bool { | ||||
|         let result = self.topology_preparation_result.lock().unwrap(); | ||||
|         if let Some(outcome) = result.as_ref() { | ||||
|             matches!(outcome.status, InterpretStatus::SUCCESS) | ||||
|         } else { | ||||
|             false | ||||
|         } | ||||
|         self.topology_state.status == TopologyStatus::Success | ||||
|     } | ||||
| 
 | ||||
|     pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> { | ||||
|  | ||||
| @ -4,8 +4,6 @@ use harmony_types::net::MacAddress; | ||||
| use log::info; | ||||
| 
 | ||||
| use crate::executors::ExecutorError; | ||||
| use crate::interpret::InterpretError; | ||||
| use crate::interpret::Outcome; | ||||
| 
 | ||||
| use super::DHCPStaticEntry; | ||||
| use super::DhcpServer; | ||||
| @ -19,6 +17,8 @@ use super::K8sclient; | ||||
| use super::LoadBalancer; | ||||
| use super::LoadBalancerService; | ||||
| use super::LogicalHost; | ||||
| use super::PreparationError; | ||||
| use super::PreparationOutcome; | ||||
| use super::Router; | ||||
| use super::TftpServer; | ||||
| 
 | ||||
| @ -48,7 +48,7 @@ impl Topology for HAClusterTopology { | ||||
|     fn name(&self) -> &str { | ||||
|         "HAClusterTopology" | ||||
|     } | ||||
|     async fn ensure_ready(&self) -> Result<Outcome, InterpretError> { | ||||
|     async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { | ||||
|         todo!( | ||||
|             "ensure_ready, not entirely sure what it should do here, probably something like verify that the hosts are reachable and all services are up and ready." | ||||
|         ) | ||||
| @ -244,10 +244,12 @@ impl Topology for DummyInfra { | ||||
|         todo!() | ||||
|     } | ||||
| 
 | ||||
|     async fn ensure_ready(&self) -> Result<Outcome, InterpretError> { | ||||
|     async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { | ||||
|         let dummy_msg = "This is a dummy infrastructure that does nothing"; | ||||
|         info!("{dummy_msg}"); | ||||
|         Ok(Outcome::success(dummy_msg.to_string())) | ||||
|         Ok(PreparationOutcome::Success { | ||||
|             details: dummy_msg.into(), | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -7,7 +7,7 @@ use tokio::sync::OnceCell; | ||||
| 
 | ||||
| use crate::{ | ||||
|     executors::ExecutorError, | ||||
|     interpret::{InterpretError, InterpretStatus, Outcome}, | ||||
|     interpret::InterpretStatus, | ||||
|     inventory::Inventory, | ||||
|     modules::{ | ||||
|         k3d::K3DInstallationScore, | ||||
| @ -24,7 +24,8 @@ use crate::{ | ||||
| }; | ||||
| 
 | ||||
| use super::{ | ||||
|     DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, Topology, | ||||
|     DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError, | ||||
|     PreparationOutcome, Topology, | ||||
|     k8s::K8sClient, | ||||
|     oberservability::monitoring::AlertReceiver, | ||||
|     tenant::{TenantConfig, TenantManager, k8s::K8sTenantManager}, | ||||
| @ -74,22 +75,31 @@ impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology { | ||||
|         sender: &CRDPrometheus, | ||||
|         inventory: &Inventory, | ||||
|         receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>, | ||||
|     ) -> Result<Outcome, InterpretError> { | ||||
|     ) -> Result<PreparationOutcome, PreparationError> { | ||||
|         let po_result = self.ensure_prometheus_operator(sender).await?; | ||||
| 
 | ||||
|         if po_result.status == InterpretStatus::NOOP { | ||||
|         if po_result == PreparationOutcome::Noop { | ||||
|             debug!("Skipping Prometheus CR installation due to missing operator."); | ||||
|             return Ok(Outcome::noop()); | ||||
|             return Ok(po_result); | ||||
|         } | ||||
|         self.get_k8s_prometheus_application_score(sender.clone(), receivers) | ||||
| 
 | ||||
|         let result = self | ||||
|             .get_k8s_prometheus_application_score(sender.clone(), receivers) | ||||
|             .await | ||||
|             .create_interpret() | ||||
|             .execute(inventory, self) | ||||
|             .await?; | ||||
|             .await; | ||||
| 
 | ||||
|         Ok(Outcome::success( | ||||
|             "No action, working on cluster  ".to_string(), | ||||
|         )) | ||||
|         match result { | ||||
|             Ok(outcome) => match outcome.status { | ||||
|                 InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success { | ||||
|                     details: outcome.message, | ||||
|                 }), | ||||
|                 InterpretStatus::NOOP => Ok(PreparationOutcome::Noop), | ||||
|                 _ => Err(PreparationError::new(outcome.message)), | ||||
|             }, | ||||
|             Err(err) => Err(PreparationError::new(err.to_string())), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -160,15 +170,24 @@ impl K8sAnywhereTopology { | ||||
|         K3DInstallationScore::default() | ||||
|     } | ||||
| 
 | ||||
|     async fn try_install_k3d(&self) -> Result<(), InterpretError> { | ||||
|         self.get_k3d_installation_score() | ||||
|     async fn try_install_k3d(&self) -> Result<(), PreparationError> { | ||||
|         let result = self | ||||
|             .get_k3d_installation_score() | ||||
|             .create_interpret() | ||||
|             .execute(&Inventory::empty(), self) | ||||
|             .await?; | ||||
|         Ok(()) | ||||
|             .await; | ||||
| 
 | ||||
|         match result { | ||||
|             Ok(outcome) => match outcome.status { | ||||
|                 InterpretStatus::SUCCESS => Ok(()), | ||||
|                 InterpretStatus::NOOP => Ok(()), | ||||
|                 _ => Err(PreparationError::new(outcome.message)), | ||||
|             }, | ||||
|             Err(err) => Err(PreparationError::new(err.to_string())), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, InterpretError> { | ||||
|     async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, PreparationError> { | ||||
|         let k8s_anywhere_config = &self.config; | ||||
| 
 | ||||
|         // TODO this deserves some refactoring, it is becoming a bit hard to figure out
 | ||||
| @ -187,7 +206,7 @@ impl K8sAnywhereTopology { | ||||
|                         })); | ||||
|                     } | ||||
|                     None => { | ||||
|                         return Err(InterpretError::new(format!( | ||||
|                         return Err(PreparationError::new(format!( | ||||
|                             "Failed to load kubeconfig from {kubeconfig}" | ||||
|                         ))); | ||||
|                     } | ||||
| @ -261,11 +280,11 @@ impl K8sAnywhereTopology { | ||||
|     async fn ensure_prometheus_operator( | ||||
|         &self, | ||||
|         sender: &CRDPrometheus, | ||||
|     ) -> Result<Outcome, InterpretError> { | ||||
|     ) -> Result<PreparationOutcome, PreparationError> { | ||||
|         let status = Command::new("sh") | ||||
|             .args(["-c", "kubectl get crd -A | grep -i prometheuses"]) | ||||
|             .status() | ||||
|             .map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))?; | ||||
|             .map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?; | ||||
| 
 | ||||
|         if !status.success() { | ||||
|             if let Some(Some(k8s_state)) = self.k8s_state.get() { | ||||
| @ -274,30 +293,40 @@ impl K8sAnywhereTopology { | ||||
|                         debug!("installing prometheus operator"); | ||||
|                         let op_score = | ||||
|                             prometheus_operator_helm_chart_score(sender.namespace.clone()); | ||||
|                         op_score | ||||
|                         let result = op_score | ||||
|                             .create_interpret() | ||||
|                             .execute(&Inventory::empty(), self) | ||||
|                             .await?; | ||||
|                         return Ok(Outcome::success( | ||||
|                             "installed prometheus operator".to_string(), | ||||
|                         )); | ||||
|                             .await; | ||||
| 
 | ||||
|                         return match result { | ||||
|                             Ok(outcome) => match outcome.status { | ||||
|                                 InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success { | ||||
|                                     details: "installed prometheus operator".into(), | ||||
|                                 }), | ||||
|                                 InterpretStatus::NOOP => Ok(PreparationOutcome::Noop), | ||||
|                                 _ => Err(PreparationError::new( | ||||
|                                     "failed to install prometheus operator (unknown error)".into(), | ||||
|                                 )), | ||||
|                             }, | ||||
|                             Err(err) => Err(PreparationError::new(err.to_string())), | ||||
|                         }; | ||||
|                     } | ||||
|                     K8sSource::Kubeconfig => { | ||||
|                         debug!("unable to install prometheus operator, contact cluster admin"); | ||||
|                         return Ok(Outcome::noop()); | ||||
|                         return Ok(PreparationOutcome::Noop); | ||||
|                     } | ||||
|                 } | ||||
|             } else { | ||||
|                 warn!("Unable to detect k8s_state. Skipping Prometheus Operator install."); | ||||
|                 return Ok(Outcome::noop()); | ||||
|                 return Ok(PreparationOutcome::Noop); | ||||
|             } | ||||
|         } | ||||
| 
 | ||||
|         debug!("Prometheus operator is already present, skipping install"); | ||||
| 
 | ||||
|         Ok(Outcome::success( | ||||
|             "prometheus operator present in cluster".to_string(), | ||||
|         )) | ||||
|         Ok(PreparationOutcome::Success { | ||||
|             details: "prometheus operator present in cluster".into(), | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| @ -356,26 +385,25 @@ impl Topology for K8sAnywhereTopology { | ||||
|         "K8sAnywhereTopology" | ||||
|     } | ||||
| 
 | ||||
|     async fn ensure_ready(&self) -> Result<Outcome, InterpretError> { | ||||
|     async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { | ||||
|         let k8s_state = self | ||||
|             .k8s_state | ||||
|             .get_or_try_init(|| self.try_get_or_install_k8s_client()) | ||||
|             .await?; | ||||
| 
 | ||||
|         let k8s_state: &K8sState = k8s_state.as_ref().ok_or(InterpretError::new( | ||||
|             "No K8s client could be found or installed".to_string(), | ||||
|         let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new( | ||||
|             "no K8s client could be found or installed".to_string(), | ||||
|         ))?; | ||||
| 
 | ||||
|         self.ensure_k8s_tenant_manager() | ||||
|             .await | ||||
|             .map_err(InterpretError::new)?; | ||||
|             .map_err(PreparationError::new)?; | ||||
| 
 | ||||
|         match self.is_helm_available() { | ||||
|             Ok(()) => Ok(Outcome::success(format!( | ||||
|                 "{} + helm available", | ||||
|                 k8s_state.message.clone() | ||||
|             ))), | ||||
|             Err(e) => Err(InterpretError::new(format!("helm unavailable: {}", e))), | ||||
|             Ok(()) => Ok(PreparationOutcome::Success { | ||||
|                 details: format!("{} + helm available", k8s_state.message.clone()), | ||||
|             }), | ||||
|             Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))), | ||||
|         } | ||||
|     } | ||||
| } | ||||
|  | ||||
| @ -1,9 +1,7 @@ | ||||
| use async_trait::async_trait; | ||||
| use derive_new::new; | ||||
| 
 | ||||
| use crate::interpret::{InterpretError, Outcome}; | ||||
| 
 | ||||
| use super::{HelmCommand, Topology}; | ||||
| use super::{HelmCommand, PreparationError, PreparationOutcome, Topology}; | ||||
| 
 | ||||
| #[derive(new)] | ||||
| pub struct LocalhostTopology; | ||||
| @ -14,10 +12,10 @@ impl Topology for LocalhostTopology { | ||||
|         "LocalHostTopology" | ||||
|     } | ||||
| 
 | ||||
|     async fn ensure_ready(&self) -> Result<Outcome, InterpretError> { | ||||
|         Ok(Outcome::success( | ||||
|             "Localhost is Chuck Norris, always ready.".to_string(), | ||||
|         )) | ||||
|     async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { | ||||
|         Ok(PreparationOutcome::Success { | ||||
|             details: "Localhost is Chuck Norris, always ready.".into(), | ||||
|         }) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
|  | ||||
| @ -6,6 +6,7 @@ mod k8s_anywhere; | ||||
| mod localhost; | ||||
| pub mod oberservability; | ||||
| pub mod tenant; | ||||
| use derive_new::new; | ||||
| pub use k8s_anywhere::*; | ||||
| pub use localhost::*; | ||||
| pub mod k8s; | ||||
| @ -26,10 +27,13 @@ pub use tftp::*; | ||||
| mod helm_command; | ||||
| pub use helm_command::*; | ||||
| 
 | ||||
| use super::{ | ||||
|     executors::ExecutorError, | ||||
|     instrumentation::{self, HarmonyEvent}, | ||||
| }; | ||||
| use std::error::Error; | ||||
| use std::net::IpAddr; | ||||
| 
 | ||||
| use super::interpret::{InterpretError, Outcome}; | ||||
| 
 | ||||
| /// Represents a logical view of an infrastructure environment providing specific capabilities.
 | ||||
| ///
 | ||||
| /// A Topology acts as a self-contained "package" responsible for managing access
 | ||||
| @ -57,9 +61,128 @@ pub trait Topology: Send + Sync { | ||||
|     ///     *   **Internal Orchestration:** For complex topologies, this method might manage dependencies on other sub-topologies, ensuring *their* `ensure_ready` is called first. Using nested `Maestros` to run setup `Scores` against these sub-topologies is the recommended pattern for non-trivial bootstrapping, allowing reuse of Harmony's core orchestration logic.
 | ||||
|     ///
 | ||||
|     /// # Returns
 | ||||
|     /// - `Ok(Outcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context.
 | ||||
|     /// - `Err(TopologyError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments.
 | ||||
|     async fn ensure_ready(&self) -> Result<Outcome, InterpretError>; | ||||
|     /// - `Ok(PreparationOutcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context.
 | ||||
|     /// - `Err(PreparationError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments.
 | ||||
|     async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError>; | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Clone, PartialEq, Eq)] | ||||
| pub enum PreparationOutcome { | ||||
|     Success { details: String }, | ||||
|     Noop, | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug, Clone, new)] | ||||
| pub struct PreparationError { | ||||
|  | ||||
|     msg: String, | ||||
| } | ||||
| 
 | ||||
| impl std::fmt::Display for PreparationError { | ||||
|     fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { | ||||
|         f.write_str(&self.msg) | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl Error for PreparationError {} | ||||
| 
 | ||||
| impl From<ExecutorError> for PreparationError { | ||||
|     fn from(value: ExecutorError) -> Self { | ||||
|         Self { | ||||
|             msg: format!("InterpretError : {value}"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<kube::Error> for PreparationError { | ||||
|     fn from(value: kube::Error) -> Self { | ||||
|         Self { | ||||
|             msg: format!("PreparationError : {value}"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| impl From<String> for PreparationError { | ||||
|     fn from(value: String) -> Self { | ||||
|         Self { | ||||
|             msg: format!("PreparationError : {value}"), | ||||
|         } | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Clone, Debug, PartialEq)] | ||||
| pub enum TopologyStatus { | ||||
|     Queued, | ||||
|     Preparing, | ||||
|     Success, | ||||
|     Noop, | ||||
|     Error, | ||||
| } | ||||
| 
 | ||||
| pub struct TopologyState { | ||||
|     pub topology: String, | ||||
|     pub status: TopologyStatus, | ||||
| } | ||||
| 
 | ||||
| impl TopologyState { | ||||
|     pub fn new(topology: String) -> Self { | ||||
|         let instance = Self { | ||||
|             topology, | ||||
|             status: TopologyStatus::Queued, | ||||
|         }; | ||||
| 
 | ||||
|         instrumentation::instrument(HarmonyEvent::TopologyStateChanged { | ||||
|             topology: instance.topology.clone(), | ||||
|             status: instance.status.clone(), | ||||
|             message: None, | ||||
|         }) | ||||
|         .unwrap(); | ||||
| 
 | ||||
|         instance | ||||
|     } | ||||
| 
 | ||||
|     pub fn prepare(&mut self) { | ||||
|         self.status = TopologyStatus::Preparing; | ||||
| 
 | ||||
|         instrumentation::instrument(HarmonyEvent::TopologyStateChanged { | ||||
|             topology: self.topology.clone(), | ||||
|             status: self.status.clone(), | ||||
|             message: None, | ||||
|         }) | ||||
|         .unwrap(); | ||||
|     } | ||||
| 
 | ||||
|     pub fn success(&mut self, message: String) { | ||||
|         self.status = TopologyStatus::Success; | ||||
| 
 | ||||
|         instrumentation::instrument(HarmonyEvent::TopologyStateChanged { | ||||
|             topology: self.topology.clone(), | ||||
|             status: self.status.clone(), | ||||
|             message: Some(message), | ||||
|         }) | ||||
|         .unwrap(); | ||||
|     } | ||||
| 
 | ||||
|     pub fn noop(&mut self) { | ||||
|         self.status = TopologyStatus::Noop; | ||||
| 
 | ||||
|         instrumentation::instrument(HarmonyEvent::TopologyStateChanged { | ||||
|             topology: self.topology.clone(), | ||||
|             status: self.status.clone(), | ||||
|             message: None, | ||||
|         }) | ||||
|         .unwrap(); | ||||
|     } | ||||
| 
 | ||||
|     pub fn error(&mut self, message: String) { | ||||
|         self.status = TopologyStatus::Error; | ||||
| 
 | ||||
|         instrumentation::instrument(HarmonyEvent::TopologyStateChanged { | ||||
|             topology: self.topology.clone(), | ||||
|             status: self.status.clone(), | ||||
|             message: Some(message), | ||||
|         }) | ||||
|         .unwrap(); | ||||
|     } | ||||
| } | ||||
| 
 | ||||
| #[derive(Debug)] | ||||
|  | ||||
| @ -15,7 +15,7 @@ use k8s_openapi::{ | ||||
|     apimachinery::pkg::util::intstr::IntOrString, | ||||
| }; | ||||
| use kube::Resource; | ||||
| use log::{debug, info, warn}; | ||||
| use log::debug; | ||||
| use serde::de::DeserializeOwned; | ||||
| use serde_json::json; | ||||
| use tokio::sync::OnceCell; | ||||
| @ -43,8 +43,7 @@ impl K8sTenantManager { | ||||
|     } | ||||
| 
 | ||||
|     fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> { | ||||
|         warn!("Validate that when tenant already exists (by id) that name has not changed"); | ||||
|         warn!("Make sure other Tenant constraints are respected by this k8s implementation"); | ||||
|         // TODO: Ensure constraints are applied to namespace (https://git.nationtech.io/NationTech/harmony/issues/98)
 | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
| @ -433,13 +432,14 @@ impl TenantManager for K8sTenantManager { | ||||
|         debug!("Creating network_policy for tenant {}", config.name); | ||||
|         self.apply_resource(network_policy, config).await?; | ||||
| 
 | ||||
|         info!( | ||||
|         debug!( | ||||
|             "Success provisionning K8s tenant id {} name {}", | ||||
|             config.id, config.name | ||||
|         ); | ||||
|         self.store_config(config); | ||||
|         Ok(()) | ||||
|     } | ||||
| 
 | ||||
|     async fn get_tenant_config(&self) -> Option<TenantConfig> { | ||||
|         self.k8s_tenant_config.get().cloned() | ||||
|     } | ||||
|  | ||||
| @ -13,7 +13,7 @@ use crate::{ | ||||
|         prometheus::prometheus::PrometheusApplicationMonitoring, | ||||
|     }, | ||||
|     score::Score, | ||||
|     topology::{Topology, oberservability::monitoring::AlertReceiver}, | ||||
|     topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver}, | ||||
| }; | ||||
| 
 | ||||
| #[derive(Debug, Clone, Serialize)] | ||||
| @ -51,13 +51,21 @@ impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T> | ||||
|         inventory: &Inventory, | ||||
|         topology: &T, | ||||
|     ) -> Result<Outcome, InterpretError> { | ||||
|         topology | ||||
|         let result = topology | ||||
|             .install_prometheus( | ||||
|                 &self.score.sender, | ||||
|                 inventory, | ||||
|                 Some(self.score.receivers.clone()), | ||||
|             ) | ||||
|             .await | ||||
|             .await; | ||||
| 
 | ||||
|         match result { | ||||
|             Ok(outcome) => match outcome { | ||||
|                 PreparationOutcome::Success { details } => Ok(Outcome::success(details)), | ||||
|                 PreparationOutcome::Noop => Ok(Outcome::noop()), | ||||
|             }, | ||||
|             Err(err) => Err(InterpretError::from(err)), | ||||
|         } | ||||
|     } | ||||
| 
 | ||||
|     fn get_name(&self) -> InterpretName { | ||||
|  | ||||
| @ -1,9 +1,11 @@ | ||||
| use async_trait::async_trait; | ||||
| 
 | ||||
| use crate::{ | ||||
|     interpret::{InterpretError, Outcome}, | ||||
|     inventory::Inventory, | ||||
|     topology::oberservability::monitoring::{AlertReceiver, AlertSender}, | ||||
|     topology::{ | ||||
|         PreparationError, PreparationOutcome, | ||||
|         oberservability::monitoring::{AlertReceiver, AlertSender}, | ||||
|     }, | ||||
| }; | ||||
| 
 | ||||
| #[async_trait] | ||||
| @ -13,5 +15,5 @@ pub trait PrometheusApplicationMonitoring<S: AlertSender> { | ||||
|         sender: &S, | ||||
|         inventory: &Inventory, | ||||
|         receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>, | ||||
|     ) -> Result<Outcome, InterpretError>; | ||||
|     ) -> Result<PreparationOutcome, PreparationError>; | ||||
| } | ||||
|  | ||||
| @ -1,4 +1,7 @@ | ||||
| use harmony::instrumentation::{self, HarmonyEvent}; | ||||
| use harmony::{ | ||||
|     instrumentation::{self, HarmonyEvent}, | ||||
|     topology::TopologyStatus, | ||||
| }; | ||||
| use indicatif::{MultiProgress, ProgressBar}; | ||||
| use indicatif_log_bridge::LogWrapper; | ||||
| use std::{ | ||||
| @ -47,52 +50,90 @@ async fn handle_events() { | ||||
| 
 | ||||
|                 match event { | ||||
|                     HarmonyEvent::HarmonyStarted => {} | ||||
|                     HarmonyEvent::PrepareTopologyStarted { topology: name } => { | ||||
|                     HarmonyEvent::TopologyStateChanged { | ||||
|                         topology, | ||||
|                         status, | ||||
|                         message, | ||||
|                     } => { | ||||
|                         let section_key = topology_key(&topology); | ||||
| 
 | ||||
|                         match status { | ||||
|                             TopologyStatus::Queued => {} | ||||
|                             TopologyStatus::Preparing => { | ||||
|                                 let section = progress::new_section(format!( | ||||
|                             "{} Preparing environment: {name}...", | ||||
|                                     "{} Preparing environment: {topology}...", | ||||
|                                     crate::theme::EMOJI_TOPOLOGY, | ||||
|                                 )); | ||||
|                         (*sections).insert(name, section); | ||||
|                                 (*sections).insert(section_key, section); | ||||
|                             } | ||||
|                     HarmonyEvent::TopologyPrepared { | ||||
|                         topology: name, | ||||
|                         outcome, | ||||
|                     } => { | ||||
|                         let section = (*sections).get(&name).unwrap(); | ||||
|                             TopologyStatus::Success => { | ||||
|                                 let section = (*sections).get(§ion_key).unwrap(); | ||||
|                                 let progress = progress::add_spinner(section, "".into()); | ||||
| 
 | ||||
|                         match outcome.status { | ||||
|                             harmony::interpret::InterpretStatus::SUCCESS => { | ||||
|                                 progress::success(section, Some(progress), outcome.message); | ||||
|                                 progress::success( | ||||
|                                     section, | ||||
|                                     Some(progress), | ||||
|                                     message.unwrap_or("".into()), | ||||
|                                 ); | ||||
| 
 | ||||
|                                 (*sections).remove(§ion_key); | ||||
|                             } | ||||
|                             harmony::interpret::InterpretStatus::FAILURE => { | ||||
|                                 progress::error(section, Some(progress), outcome.message); | ||||
|                             TopologyStatus::Noop => { | ||||
|                                 let section = (*sections).get(§ion_key).unwrap(); | ||||
|                                 let progress = progress::add_spinner(section, "".into()); | ||||
| 
 | ||||
|                                 progress::skip( | ||||
|                                     section, | ||||
|                                     Some(progress), | ||||
|                                     message.unwrap_or("".into()), | ||||
|                                 ); | ||||
| 
 | ||||
|                                 (*sections).remove(§ion_key); | ||||
|                             } | ||||
|                             harmony::interpret::InterpretStatus::RUNNING => todo!(), | ||||
|                             harmony::interpret::InterpretStatus::QUEUED => todo!(), | ||||
|                             harmony::interpret::InterpretStatus::BLOCKED => todo!(), | ||||
|                             harmony::interpret::InterpretStatus::NOOP => { | ||||
|                                 progress::skip(section, Some(progress), outcome.message); | ||||
|                             TopologyStatus::Error => { | ||||
|                                 let section = (*sections).get(§ion_key).unwrap(); | ||||
|                                 let progress = progress::add_spinner(section, "".into()); | ||||
| 
 | ||||
|                                 progress::error( | ||||
|                                     section, | ||||
|                                     Some(progress), | ||||
|                                     message.unwrap_or("".into()), | ||||
|                                 ); | ||||
| 
 | ||||
|                                 (*sections).remove(§ion_key); | ||||
|                             } | ||||
|                         } | ||||
|                     } | ||||
|                     HarmonyEvent::InterpretExecutionStarted { | ||||
|                         interpret: name, | ||||
|                         topology, | ||||
|                         interpret, | ||||
|                         message, | ||||
|                     } => { | ||||
|                         let section = (*sections).get(&topology).unwrap(); | ||||
|                         let section_key = if (*sections).contains_key(&topology_key(&topology)) { | ||||
|                             topology_key(&topology) | ||||
|                         } else { | ||||
|                             interpret_key(&interpret) | ||||
|                         }; | ||||
|                         let section = (*sections).get(§ion_key).unwrap(); | ||||
|                         let progress_bar = progress::add_spinner(section, message); | ||||
| 
 | ||||
|                         (*progress_bars).insert(name, progress_bar); | ||||
|                         (*progress_bars).insert(interpret_key(&interpret), progress_bar); | ||||
|                     } | ||||
|                     HarmonyEvent::InterpretExecutionFinished { | ||||
|                         topology, | ||||
|                         interpret: name, | ||||
|                         interpret, | ||||
|                         outcome, | ||||
|                     } => { | ||||
|                         let section = (*sections).get(&topology).unwrap(); | ||||
|                         let progress_bar = (*progress_bars).get(&name).cloned(); | ||||
|                         let has_topology = (*sections).contains_key(&topology_key(&topology)); | ||||
|                         let section_key = if has_topology { | ||||
|                             topology_key(&topology) | ||||
|                         } else { | ||||
|                             interpret_key(&interpret) | ||||
|                         }; | ||||
| 
 | ||||
|                         let section = (*sections).get(§ion_key).unwrap(); | ||||
|                         let progress_bar = | ||||
|                             (*progress_bars).get(&interpret_key(&interpret)).cloned(); | ||||
| 
 | ||||
|                         let _ = section.clear(); | ||||
| 
 | ||||
| @ -105,7 +146,9 @@ async fn handle_events() { | ||||
|                             } | ||||
|                         } | ||||
| 
 | ||||
|                         (*progress_bars).remove(&name); | ||||
|                         if !has_topology { | ||||
|                             (*progress_bars).remove(§ion_key); | ||||
|                         } | ||||
|                     } | ||||
|                 } | ||||
|                 true | ||||
| @ -114,3 +157,11 @@ async fn handle_events() { | ||||
|     }) | ||||
|     .await; | ||||
| } | ||||
| 
 | ||||
| fn topology_key(topology: &str) -> String { | ||||
|     format!("topology-{topology}") | ||||
| } | ||||
| 
 | ||||
| fn interpret_key(interpret: &str) -> String { | ||||
|     format!("interpret-{interpret}") | ||||
| } | ||||
|  | ||||
| @ -20,7 +20,7 @@ mod instrumentation; | ||||
| #[derive(Parser)] | ||||
| #[command(version, about, long_about = None, flatten_help = true, propagate_version = true)] | ||||
| struct GlobalArgs { | ||||
|     #[arg(long, default_value = "harmony")] | ||||
|     #[arg(long, default_value = ".")] | ||||
|     harmony_path: String, | ||||
| 
 | ||||
|     #[arg(long)] | ||||
|  | ||||
| @ -1,5 +1,5 @@ | ||||
| use futures_util::StreamExt; | ||||
| use log::{debug, info, warn}; | ||||
| use log::{debug, warn}; | ||||
| use sha2::{Digest, Sha256}; | ||||
| use std::io::Read; | ||||
| use std::path::PathBuf; | ||||
| @ -45,7 +45,7 @@ pub(crate) struct DownloadableAsset { | ||||
| impl DownloadableAsset { | ||||
|     fn verify_checksum(&self, file: PathBuf) -> bool { | ||||
|         if !file.exists() { | ||||
|             warn!("File does not exist: {:?}", file); | ||||
|             debug!("File does not exist: {:?}", file); | ||||
|             return false; | ||||
|         } | ||||
| 
 | ||||
| @ -155,7 +155,7 @@ impl DownloadableAsset { | ||||
|             return Err(CHECKSUM_FAILED_MSG.to_string()); | ||||
|         } | ||||
| 
 | ||||
|         info!( | ||||
|         debug!( | ||||
|             "File downloaded and verified successfully: {}", | ||||
|             target_file_path.to_string_lossy() | ||||
|         ); | ||||
|  | ||||
| @ -64,7 +64,6 @@ impl K3d { | ||||
|             .text() | ||||
|             .await | ||||
|             .unwrap(); | ||||
|         println!("body: {body}"); | ||||
| 
 | ||||
|         let checksum = body | ||||
|             .lines() | ||||
| @ -104,8 +103,7 @@ impl K3d { | ||||
|             .get_latest() | ||||
|             .await | ||||
|             .map_err(|e| e.to_string())?; | ||||
|         // debug!("Got k3d releases {releases:#?}");
 | ||||
|         println!("Got k3d first releases {latest_release:#?}"); | ||||
|         debug!("Got k3d releases {latest_release:#?}"); | ||||
| 
 | ||||
|         Ok(latest_release) | ||||
|     } | ||||
|  | ||||
		Loading…
	
		Reference in New Issue
	
	Block a user
	
I love this! Feels natural and clean.
Also very naturally outlines a trend to eventually avoid the boilerplate of having each module define its own set of Outcome, Error. We will then refactor with a standardized approach when time comes.