Compare commits

..

22 Commits

Author SHA1 Message Date
Ian Letourneau
5cc93d3107 use new harmony_cli::run
All checks were successful
Run Check Script / check (pull_request) Successful in -44s
2025-08-04 17:22:08 -04:00
Ian Letourneau
569839bf66 Merge branch 'master' into feat/crd-alertmanager-configs 2025-08-04 17:04:50 -04:00
Ian Letourneau
e078f5c062 quick cleanup
All checks were successful
Run Check Script / check (pull_request) Successful in -44s
2025-08-04 15:28:01 -04:00
Ian Letourneau
a8394cda47 simpler check to see if a crd exists + cleanup
All checks were successful
Run Check Script / check (pull_request) Successful in -41s
2025-08-02 11:58:39 -04:00
Ian Letourneau
064f6d88ba quick cleanup
All checks were successful
Run Check Script / check (pull_request) Successful in -42s
2025-08-02 11:10:50 -04:00
Ian Letourneau
9403581be5 fix check to ensure prometheus operator is installed 2025-08-02 11:10:20 -04:00
Ian Letourneau
056152a1e5 remove comment
All checks were successful
Run Check Script / check (pull_request) Successful in -40s
2025-08-01 23:52:26 -04:00
Ian Letourneau
c6b255d0bd merge configure_receiver with AlertReceiver::install & cleanup unused stuff
All checks were successful
Run Check Script / check (pull_request) Successful in -39s
2025-08-01 23:09:12 -04:00
Ian Letourneau
4b6bebcaf1 remove unnecessary configure_receivers method from trait 2025-08-01 18:26:05 -04:00
Ian Letourneau
961a300154 cleanup unused k3d prometheus monitoring score & simplify design 2025-08-01 17:59:18 -04:00
a5deda647b wip: need to convert the generic type AlertReceiver<CRDPrometheus> to CRDAlertManagerReceiver in k8sAnywhereTopology which extends AlertReceiver<CRDPrometheus> in order to be able to configure and install the receiver and its associated crd-alertmanagerconfigs to the cluster 2025-07-31 16:17:30 -04:00
0b965b6570 Merge remote-tracking branch 'origin/master' into feat/crd-alertmanager-configs
All checks were successful
Run Check Script / check (pull_request) Successful in -37s
2025-07-28 15:22:24 -04:00
d7bce37b69 fix: cargo fmt
All checks were successful
Run Check Script / check (pull_request) Successful in -37s
2025-07-28 15:18:46 -04:00
b56a30de3c fix: prometheus operator and grafana operator deploy application namespace on local k3d
Some checks failed
Run Check Script / check (pull_request) Failing after -1m5s
if kube-prometheus-operator is present installation of prometheus-operator will skip
outside of local k3d installation installation of operator is skipped
2025-07-28 15:15:10 -04:00
b9e208f4cf feat: added default prometheus rules and grafana dashboard for application monitoring
All checks were successful
Run Check Script / check (pull_request) Successful in -32s
2025-07-22 13:26:03 -04:00
1d8b503bd2 Xwip: uses a helm chart to deploy a prometheus operator if crd are ont present in cluster, and deploys a grafana operator.
All checks were successful
Run Check Script / check (pull_request) Successful in -32s
added a sample dashboard and prometheus data source to grafana
2025-07-21 17:59:35 -04:00
114219385f wip:added impl for prometheuses, alertmanagers, prometheusrules, servicemonitors, and some default rules that are deployed for application monitor
All checks were successful
Run Check Script / check (pull_request) Successful in 2m19s
working on implementing grafana crds via grafana operator
need to link prometheus rules and alert managers in prometheus, testing it shows that prometheus isnt detecting them automatically
2025-07-16 15:56:00 -04:00
1525ac2226 fix: git conflict
All checks were successful
Run Check Script / check (pull_request) Successful in -19s
2025-07-14 14:34:53 -04:00
55a4e79ec4 fix: added updated Cargo
All checks were successful
Run Check Script / check (pull_request) Successful in 1m52s
2025-07-14 14:18:32 -04:00
7b91088828 feat: added impl for webhook receiver for crd alertmanagerconfigs
Some checks failed
Run Check Script / check (pull_request) Failing after 49s
2025-07-14 13:41:48 -04:00
e61ec015ab feat: added impl for Discordwebhook receiver to receive application alerts from namespaces from application feature
Some checks failed
Run Check Script / check (pull_request) Failing after 49s
2025-07-14 13:06:47 -04:00
819f4a32fd wip: added an implementation of CRDalertmanagerconfigs that can be used to add a discord webhook receiver, currently the namespace is hard coded and there are a bunch of todos!() that need to be cleaned up, and flags need to be added so that alertmanager will automatically register the crd 2025-07-11 16:01:52 -04:00
96 changed files with 871 additions and 2066 deletions

View File

@@ -9,7 +9,7 @@ jobs:
check: check:
runs-on: docker runs-on: docker
container: container:
image: hub.nationtech.io/harmony/harmony_composer:latest image: hub.nationtech.io/harmony/harmony_composer:latest@sha256:eb0406fcb95c63df9b7c4b19bc50ad7914dd8232ce98e9c9abef628e07c69386
steps: steps:
- name: Checkout code - name: Checkout code
uses: actions/checkout@v4 uses: actions/checkout@v4

View File

@@ -7,7 +7,7 @@ on:
jobs: jobs:
package_harmony_composer: package_harmony_composer:
container: container:
image: hub.nationtech.io/harmony/harmony_composer:latest image: hub.nationtech.io/harmony/harmony_composer:latest@sha256:eb0406fcb95c63df9b7c4b19bc50ad7914dd8232ce98e9c9abef628e07c69386
runs-on: dind runs-on: dind
steps: steps:
- name: Checkout code - name: Checkout code

View File

@@ -13,7 +13,6 @@ WORKDIR /app
RUN rustup target add x86_64-pc-windows-gnu RUN rustup target add x86_64-pc-windows-gnu
RUN rustup target add x86_64-unknown-linux-gnu RUN rustup target add x86_64-unknown-linux-gnu
RUN rustup component add rustfmt RUN rustup component add rustfmt
RUN rustup component add clippy
RUN apt update RUN apt update

View File

@@ -1,7 +1,5 @@
#!/bin/sh #!/bin/sh
set -e set -e
cargo check --all-targets --all-features --keep-going cargo check --all-targets --all-features --keep-going
cargo fmt --check cargo fmt --check
cargo clippy
cargo test cargo test

View File

@@ -1,11 +1,17 @@
use std::{path::PathBuf, str::FromStr, sync::Arc}; use std::{path::PathBuf, sync::Arc};
use harmony::{ use harmony::{
data::Id, data::Id,
inventory::Inventory, inventory::Inventory,
maestro::Maestro,
modules::{ modules::{
application::{ApplicationScore, RustWebFramework, RustWebapp, features::Monitoring}, application::{
monitoring::alert_channel::webhook_receiver::WebhookReceiver, ApplicationScore, RustWebFramework, RustWebapp,
features::{ContinuousDelivery, Monitoring},
},
monitoring::alert_channel::{
discord_alert_channel::DiscordWebhook, webhook_receiver::WebhookReceiver,
},
tenant::TenantScore, tenant::TenantScore,
}, },
topology::{K8sAnywhereTopology, Url, tenant::TenantConfig}, topology::{K8sAnywhereTopology, Url, tenant::TenantConfig},
@@ -19,7 +25,7 @@ async fn main() {
//the TenantConfig.name must match //the TenantConfig.name must match
let tenant = TenantScore { let tenant = TenantScore {
config: TenantConfig { config: TenantConfig {
id: Id::from_str("test-tenant-id").unwrap(), id: Id::from_str("test-tenant-id"),
name: "example-monitoring".to_string(), name: "example-monitoring".to_string(),
..Default::default() ..Default::default()
}, },

View File

@@ -125,47 +125,40 @@ spec:
name: nginx"#, name: nginx"#,
) )
.unwrap(); .unwrap();
deployment return deployment;
} }
fn nginx_deployment_2() -> Deployment { fn nginx_deployment_2() -> Deployment {
let pod_template = PodTemplateSpec { let mut pod_template = PodTemplateSpec::default();
metadata: Some(ObjectMeta { pod_template.metadata = Some(ObjectMeta {
labels: Some(BTreeMap::from([( labels: Some(BTreeMap::from([(
"app".to_string(), "app".to_string(),
"nginx-test".to_string(), "nginx-test".to_string(),
)])), )])),
..Default::default() ..Default::default()
}), });
spec: Some(PodSpec { pod_template.spec = Some(PodSpec {
containers: vec![Container { containers: vec![Container {
name: "nginx".to_string(), name: "nginx".to_string(),
image: Some("nginx".to_string()), image: Some("nginx".to_string()),
..Default::default() ..Default::default()
}], }],
..Default::default() ..Default::default()
}), });
}; let mut spec = DeploymentSpec::default();
spec.template = pod_template;
let spec = DeploymentSpec { spec.selector = LabelSelector {
template: pod_template,
selector: LabelSelector {
match_expressions: None, match_expressions: None,
match_labels: Some(BTreeMap::from([( match_labels: Some(BTreeMap::from([(
"app".to_string(), "app".to_string(),
"nginx-test".to_string(), "nginx-test".to_string(),
)])), )])),
},
..Default::default()
}; };
Deployment { let mut deployment = Deployment::default();
spec: Some(spec), deployment.spec = Some(spec);
metadata: ObjectMeta { deployment.metadata.name = Some("nginx-test".to_string());
name: Some("nginx-test".to_string()),
..Default::default() deployment
},
..Default::default()
}
} }
fn nginx_deployment() -> Deployment { fn nginx_deployment() -> Deployment {

View File

@@ -23,7 +23,7 @@ async fn main() {
// This config can be extended as needed for more complicated configurations // This config can be extended as needed for more complicated configurations
config: LAMPConfig { config: LAMPConfig {
project_root: "./php".into(), project_root: "./php".into(),
database_size: "4Gi".to_string().into(), database_size: format!("4Gi").into(),
..Default::default() ..Default::default()
}, },
}; };

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, str::FromStr}; use std::collections::HashMap;
use harmony::{ use harmony::{
data::Id, data::Id,
@@ -28,7 +28,7 @@ use harmony::{
async fn main() { async fn main() {
let tenant = TenantScore { let tenant = TenantScore {
config: TenantConfig { config: TenantConfig {
id: Id::from_str("1234").unwrap(), id: Id::from_string("1234".to_string()),
name: "test-tenant".to_string(), name: "test-tenant".to_string(),
resource_limits: ResourceLimits { resource_limits: ResourceLimits {
cpu_request_cores: 6.0, cpu_request_cores: 6.0,

View File

@@ -1,12 +0,0 @@
[package]
name = "example_remove_rook_osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
harmony_tui = { version = "0.1.0", path = "../../harmony_tui" }
tokio.workspace = true

View File

@@ -1,18 +0,0 @@
use harmony::{
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_score = CephRemoveOsd {
osd_deployment_name: "rook-ceph-osd-2".to_string(),
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
.await
.unwrap();
}

View File

@@ -1,5 +1,3 @@
use std::str::FromStr;
use harmony::{ use harmony::{
data::Id, data::Id,
inventory::Inventory, inventory::Inventory,
@@ -11,7 +9,7 @@ use harmony::{
async fn main() { async fn main() {
let tenant = TenantScore { let tenant = TenantScore {
config: TenantConfig { config: TenantConfig {
id: Id::from_str("test-tenant-id").unwrap(), id: Id::from_str("test-tenant-id"),
name: "testtenant".to_string(), name: "testtenant".to_string(),
..Default::default() ..Default::default()
}, },

View File

@@ -5,9 +5,6 @@ version.workspace = true
readme.workspace = true readme.workspace = true
license.workspace = true license.workspace = true
[features]
testing = []
[dependencies] [dependencies]
rand = "0.9" rand = "0.9"
hex = "0.4" hex = "0.4"

View File

@@ -11,5 +11,5 @@ lazy_static! {
pub static ref REGISTRY_PROJECT: String = pub static ref REGISTRY_PROJECT: String =
std::env::var("HARMONY_REGISTRY_PROJECT").unwrap_or_else(|_| "harmony".to_string()); std::env::var("HARMONY_REGISTRY_PROJECT").unwrap_or_else(|_| "harmony".to_string());
pub static ref DRY_RUN: bool = pub static ref DRY_RUN: bool =
std::env::var("HARMONY_DRY_RUN").is_ok_and(|value| value.parse().unwrap_or(false)); std::env::var("HARMONY_DRY_RUN").map_or(true, |value| value.parse().unwrap_or(true));
} }

View File

@@ -1,6 +1,5 @@
use rand::distr::Alphanumeric; use rand::distr::Alphanumeric;
use rand::distr::SampleString; use rand::distr::SampleString;
use std::str::FromStr;
use std::time::SystemTime; use std::time::SystemTime;
use std::time::UNIX_EPOCH; use std::time::UNIX_EPOCH;
@@ -24,13 +23,13 @@ pub struct Id {
value: String, value: String,
} }
impl FromStr for Id { impl Id {
type Err = (); pub fn from_string(value: String) -> Self {
Self { value }
}
fn from_str(s: &str) -> Result<Self, Self::Err> { pub fn from_str(value: &str) -> Self {
Ok(Id { Self::from_string(value.to_string())
value: s.to_string(),
})
} }
} }

View File

@@ -47,7 +47,7 @@ impl serde::Serialize for Version {
impl std::fmt::Display for Version { impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
self.value.fmt(f) return self.value.fmt(f);
} }
} }

View File

@@ -35,9 +35,10 @@ impl PhysicalHost {
pub fn cluster_mac(&self) -> MacAddress { pub fn cluster_mac(&self) -> MacAddress {
self.network self.network
.first() .get(0)
.expect("Cluster physical host should have a network interface") .expect("Cluster physical host should have a network interface")
.mac_address .mac_address
.clone()
} }
pub fn cpu(mut self, cpu_count: Option<u64>) -> Self { pub fn cpu(mut self, cpu_count: Option<u64>) -> Self {

View File

@@ -2,42 +2,28 @@ use log::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use crate::modules::application::ApplicationFeatureStatus; use super::interpret::{InterpretError, Outcome};
use super::{
interpret::{InterpretError, Outcome},
topology::TopologyStatus,
};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum HarmonyEvent { pub enum HarmonyEvent {
HarmonyStarted, HarmonyStarted,
HarmonyFinished, PrepareTopologyStarted {
topology: String,
},
TopologyPrepared {
topology: String,
outcome: Outcome,
},
InterpretExecutionStarted { InterpretExecutionStarted {
execution_id: String,
topology: String, topology: String,
interpret: String, interpret: String,
score: String,
message: String, message: String,
}, },
InterpretExecutionFinished { InterpretExecutionFinished {
execution_id: String,
topology: String, topology: String,
interpret: String, interpret: String,
score: String,
outcome: Result<Outcome, InterpretError>, outcome: Result<Outcome, InterpretError>,
}, },
TopologyStateChanged {
topology: String,
status: TopologyStatus,
message: Option<String>,
},
ApplicationFeatureStateChanged {
topology: String,
application: String,
feature: String,
status: ApplicationFeatureStatus,
},
} }
static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| { static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
@@ -47,15 +33,10 @@ static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
}); });
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> { pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
if cfg!(any(test, feature = "testing")) {
let _ = event; // Suppress the "unused variable" warning for `event`
Ok(())
} else {
match HARMONY_EVENT_BUS.send(event) { match HARMONY_EVENT_BUS.send(event) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"), Err(_) => Err("send error: no subscribers"),
} }
}
} }
pub async fn subscribe<F, Fut>(name: &str, mut handler: F) pub async fn subscribe<F, Fut>(name: &str, mut handler: F)

View File

@@ -7,7 +7,6 @@ use super::{
data::{Id, Version}, data::{Id, Version},
executors::ExecutorError, executors::ExecutorError,
inventory::Inventory, inventory::Inventory,
topology::PreparationError,
}; };
pub enum InterpretName { pub enum InterpretName {
@@ -24,14 +23,6 @@ pub enum InterpretName {
TenantInterpret, TenantInterpret,
Application, Application,
ArgoCD, ArgoCD,
Alerting,
Ntfy,
HelmChart,
HelmCommand,
K8sResource,
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
} }
impl std::fmt::Display for InterpretName { impl std::fmt::Display for InterpretName {
@@ -50,14 +41,6 @@ impl std::fmt::Display for InterpretName {
InterpretName::TenantInterpret => f.write_str("Tenant"), InterpretName::TenantInterpret => f.write_str("Tenant"),
InterpretName::Application => f.write_str("Application"), InterpretName::Application => f.write_str("Application"),
InterpretName::ArgoCD => f.write_str("ArgoCD"), InterpretName::ArgoCD => f.write_str("ArgoCD"),
InterpretName::Alerting => f.write_str("Alerting"),
InterpretName::Ntfy => f.write_str("Ntfy"),
InterpretName::HelmChart => f.write_str("HelmChart"),
InterpretName::HelmCommand => f.write_str("HelmCommand"),
InterpretName::K8sResource => f.write_str("K8sResource"),
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
} }
} }
} }
@@ -130,14 +113,6 @@ impl std::fmt::Display for InterpretError {
} }
impl Error for InterpretError {} impl Error for InterpretError {}
impl From<PreparationError> for InterpretError {
fn from(value: PreparationError) -> Self {
Self {
msg: format!("InterpretError : {value}"),
}
}
}
impl From<ExecutorError> for InterpretError { impl From<ExecutorError> for InterpretError {
fn from(value: ExecutorError) -> Self { fn from(value: ExecutorError) -> Self {
Self { Self {

View File

@@ -1,14 +1,14 @@
use std::sync::{Arc, RwLock}; use std::sync::{Arc, Mutex, RwLock};
use log::{debug, warn}; use log::{debug, warn};
use crate::topology::TopologyStatus; use crate::instrumentation::{self, HarmonyEvent};
use super::{ use super::{
interpret::{InterpretError, Outcome}, interpret::{InterpretError, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
score::Score, score::Score,
topology::{PreparationError, PreparationOutcome, Topology, TopologyState}, topology::Topology,
}; };
type ScoreVec<T> = Vec<Box<dyn Score<T>>>; type ScoreVec<T> = Vec<Box<dyn Score<T>>>;
@@ -17,7 +17,7 @@ pub struct Maestro<T: Topology> {
inventory: Inventory, inventory: Inventory,
topology: T, topology: T,
scores: Arc<RwLock<ScoreVec<T>>>, scores: Arc<RwLock<ScoreVec<T>>>,
topology_state: TopologyState, topology_preparation_result: Mutex<Option<Outcome>>,
} }
impl<T: Topology> Maestro<T> { impl<T: Topology> Maestro<T> {
@@ -25,47 +25,42 @@ impl<T: Topology> Maestro<T> {
/// ///
/// This should rarely be used. Most of the time Maestro::initialize should be used instead. /// This should rarely be used. Most of the time Maestro::initialize should be used instead.
pub fn new_without_initialization(inventory: Inventory, topology: T) -> Self { pub fn new_without_initialization(inventory: Inventory, topology: T) -> Self {
let topology_name = topology.name().to_string();
Self { Self {
inventory, inventory,
topology, topology,
scores: Arc::new(RwLock::new(Vec::new())), scores: Arc::new(RwLock::new(Vec::new())),
topology_state: TopologyState::new(topology_name), topology_preparation_result: None.into(),
} }
} }
pub async fn initialize(inventory: Inventory, topology: T) -> Result<Self, PreparationError> { pub async fn initialize(inventory: Inventory, topology: T) -> Result<Self, InterpretError> {
let mut instance = Self::new_without_initialization(inventory, topology); let instance = Self::new_without_initialization(inventory, topology);
instance.prepare_topology().await?; instance.prepare_topology().await?;
Ok(instance) Ok(instance)
} }
/// Ensures the associated Topology is ready for operations. /// Ensures the associated Topology is ready for operations.
/// Delegates the readiness check and potential setup actions to the Topology. /// Delegates the readiness check and potential setup actions to the Topology.
async fn prepare_topology(&mut self) -> Result<PreparationOutcome, PreparationError> { pub async fn prepare_topology(&self) -> Result<Outcome, InterpretError> {
self.topology_state.prepare(); instrumentation::instrument(HarmonyEvent::PrepareTopologyStarted {
topology: self.topology.name().to_string(),
})
.unwrap();
let result = self.topology.ensure_ready().await; let outcome = self.topology.ensure_ready().await?;
match result { instrumentation::instrument(HarmonyEvent::TopologyPrepared {
Ok(outcome) => { topology: self.topology.name().to_string(),
match outcome.clone() { outcome: outcome.clone(),
PreparationOutcome::Success { details } => { })
self.topology_state.success(details); .unwrap();
}
PreparationOutcome::Noop => { self.topology_preparation_result
self.topology_state.noop(); .lock()
} .unwrap()
}; .replace(outcome.clone());
Ok(outcome) Ok(outcome)
} }
Err(err) => {
self.topology_state.error(err.to_string());
Err(err)
}
}
}
pub fn register_all(&mut self, mut scores: ScoreVec<T>) { pub fn register_all(&mut self, mut scores: ScoreVec<T>) {
let mut score_mut = self.scores.write().expect("Should acquire lock"); let mut score_mut = self.scores.write().expect("Should acquire lock");
@@ -73,7 +68,15 @@ impl<T: Topology> Maestro<T> {
} }
fn is_topology_initialized(&self) -> bool { fn is_topology_initialized(&self) -> bool {
self.topology_state.status == TopologyStatus::Success let result = self.topology_preparation_result.lock().unwrap();
if let Some(outcome) = result.as_ref() {
match outcome.status {
InterpretStatus::SUCCESS => return true,
_ => return false,
}
} else {
false
}
} }
pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> { pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> {
@@ -84,8 +87,10 @@ impl<T: Topology> Maestro<T> {
self.topology.name(), self.topology.name(),
); );
} }
debug!("Interpreting score {score:?}"); debug!("Running score {score:?}");
let result = score.interpret(&self.inventory, &self.topology).await; let interpret = score.create_interpret();
debug!("Launching interpret {interpret:?}");
let result = interpret.execute(&self.inventory, &self.topology).await;
debug!("Got result {result:?}"); debug!("Got result {result:?}");
result result
} }

View File

@@ -1,62 +1,22 @@
use std::collections::BTreeMap; use std::collections::BTreeMap;
use async_trait::async_trait;
use serde::Serialize; use serde::Serialize;
use serde_value::Value; use serde_value::Value;
use super::{ use super::{interpret::Interpret, topology::Topology};
data::Id,
instrumentation::{self, HarmonyEvent},
interpret::{Interpret, InterpretError, Outcome},
inventory::Inventory,
topology::Topology,
};
#[async_trait]
pub trait Score<T: Topology>: pub trait Score<T: Topology>:
std::fmt::Debug + ScoreToString<T> + Send + Sync + CloneBoxScore<T> + SerializeScore<T> std::fmt::Debug + ScoreToString<T> + Send + Sync + CloneBoxScore<T> + SerializeScore<T>
{ {
async fn interpret(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let id = Id::default();
let interpret = self.create_interpret();
instrumentation::instrument(HarmonyEvent::InterpretExecutionStarted {
execution_id: id.clone().to_string(),
topology: topology.name().into(),
interpret: interpret.get_name().to_string(),
score: self.name(),
message: format!("{} running...", interpret.get_name()),
})
.unwrap();
let result = interpret.execute(inventory, topology).await;
instrumentation::instrument(HarmonyEvent::InterpretExecutionFinished {
execution_id: id.clone().to_string(),
topology: topology.name().into(),
interpret: interpret.get_name().to_string(),
score: self.name(),
outcome: result.clone(),
})
.unwrap();
result
}
fn name(&self) -> String;
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>>; fn create_interpret(&self) -> Box<dyn Interpret<T>>;
fn name(&self) -> String;
} }
pub trait SerializeScore<T: Topology> { pub trait SerializeScore<T: Topology> {
fn serialize(&self) -> Value; fn serialize(&self) -> Value;
} }
impl<S, T> SerializeScore<T> for S impl<'de, S, T> SerializeScore<T> for S
where where
T: Topology, T: Topology,
S: Score<T> + Serialize, S: Score<T> + Serialize,
@@ -64,7 +24,7 @@ where
fn serialize(&self) -> Value { fn serialize(&self) -> Value {
// TODO not sure if this is the right place to handle the error or it should bubble // TODO not sure if this is the right place to handle the error or it should bubble
// up? // up?
serde_value::to_value(self).expect("Score should serialize successfully") serde_value::to_value(&self).expect("Score should serialize successfully")
} }
} }

View File

@@ -4,6 +4,8 @@ use harmony_types::net::MacAddress;
use log::info; use log::info;
use crate::executors::ExecutorError; use crate::executors::ExecutorError;
use crate::interpret::InterpretError;
use crate::interpret::Outcome;
use super::DHCPStaticEntry; use super::DHCPStaticEntry;
use super::DhcpServer; use super::DhcpServer;
@@ -17,8 +19,6 @@ use super::K8sclient;
use super::LoadBalancer; use super::LoadBalancer;
use super::LoadBalancerService; use super::LoadBalancerService;
use super::LogicalHost; use super::LogicalHost;
use super::PreparationError;
use super::PreparationOutcome;
use super::Router; use super::Router;
use super::TftpServer; use super::TftpServer;
@@ -48,7 +48,7 @@ impl Topology for HAClusterTopology {
fn name(&self) -> &str { fn name(&self) -> &str {
"HAClusterTopology" "HAClusterTopology"
} }
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
todo!( todo!(
"ensure_ready, not entirely sure what it should do here, probably something like verify that the hosts are reachable and all services are up and ready." "ensure_ready, not entirely sure what it should do here, probably something like verify that the hosts are reachable and all services are up and ready."
) )
@@ -244,12 +244,10 @@ impl Topology for DummyInfra {
todo!() todo!()
} }
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
let dummy_msg = "This is a dummy infrastructure that does nothing"; let dummy_msg = "This is a dummy infrastructure that does nothing";
info!("{dummy_msg}"); info!("{dummy_msg}");
Ok(PreparationOutcome::Success { Ok(Outcome::success(dummy_msg.to_string()))
details: dummy_msg.into(),
})
} }
} }

View File

@@ -1,11 +1,12 @@
use derive_new::new; use derive_new::new;
use futures_util::StreamExt;
use k8s_openapi::{ use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope, ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod}, api::{apps::v1::Deployment, core::v1::Pod},
}; };
use kube::{ use kube::{
Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
runtime::reflector::Lookup, runtime::reflector::Lookup,
@@ -17,9 +18,7 @@ use kube::{
}; };
use log::{debug, error, trace}; use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use serde_json::json; use similar::{DiffableStr, TextDiff};
use similar::TextDiff;
use tokio::io::AsyncReadExt;
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct K8sClient { pub struct K8sClient {
@@ -53,66 +52,6 @@ impl K8sClient {
}) })
} }
pub async fn get_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(deps.get_opt(name).await?)
}
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let pods: Api<Pod> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(pods.get_opt(name).await?)
}
pub async fn scale_deployment(
&self,
name: &str,
namespace: Option<&str>,
replicas: u32,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let patch = json!({
"spec": {
"replicas": replicas
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}
pub async fn delete_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let delete_params = DeleteParams::default();
deployments.delete(name, &delete_params).await?;
Ok(())
}
pub async fn wait_until_deployment_ready( pub async fn wait_until_deployment_ready(
&self, &self,
name: String, name: String,
@@ -128,75 +67,13 @@ impl K8sClient {
} }
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed()); let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
let t = timeout.unwrap_or(300); let t = if let Some(t) = timeout { t } else { 300 };
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await; let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
if res.is_ok() { if let Ok(r) = res {
Ok(()) return Ok(());
} else { } else {
Err("timed out while waiting for deployment".to_string()) return Err("timed out while waiting for deployment".to_string());
}
}
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("{label}={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await;
match res {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout().take() {
stdout.read_to_string(&mut stdout_buf).await;
}
debug!("Status: {} - {:?}", s, status.details);
if s == "Success" {
Ok(stdout_buf)
} else {
Err(s)
}
} else {
Err("Couldn't get inner status of pod exec".to_string())
}
}
} }
} }
@@ -235,7 +112,7 @@ impl K8sClient {
.await; .await;
match res { match res {
Err(e) => Err(e.to_string()), Err(e) => return Err(e.to_string()),
Ok(mut process) => { Ok(mut process) => {
let status = process let status = process
.take_status() .take_status()
@@ -244,10 +121,14 @@ impl K8sClient {
.expect("Couldn't unwrap status"); .expect("Couldn't unwrap status");
if let Some(s) = status.status { if let Some(s) = status.status {
debug!("Status: {} - {:?}", s, status.details); debug!("Status: {}", s);
if s == "Success" { Ok(()) } else { Err(s) } if s == "Success" {
return Ok(());
} else { } else {
Err("Couldn't get inner status of pod exec".to_string()) return Err(s);
}
} else {
return Err("Couldn't get inner status of pod exec".to_string());
} }
} }
} }
@@ -288,9 +169,8 @@ impl K8sClient {
trace!("Received current value {current:#?}"); trace!("Received current value {current:#?}");
// The resource exists, so we calculate and display a diff. // The resource exists, so we calculate and display a diff.
println!("\nPerforming dry-run for resource: '{}'", name); println!("\nPerforming dry-run for resource: '{}'", name);
let mut current_yaml = serde_yaml::to_value(&current).unwrap_or_else(|_| { let mut current_yaml = serde_yaml::to_value(&current)
panic!("Could not serialize current value : {current:#?}") .expect(&format!("Could not serialize current value : {current:#?}"));
});
if current_yaml.is_mapping() && current_yaml.get("status").is_some() { if current_yaml.is_mapping() && current_yaml.get("status").is_some() {
let map = current_yaml.as_mapping_mut().unwrap(); let map = current_yaml.as_mapping_mut().unwrap();
let removed = map.remove_entry("status"); let removed = map.remove_entry("status");
@@ -357,7 +237,7 @@ impl K8sClient {
} }
} }
pub async fn apply_many<K>(&self, resource: &[K], ns: Option<&str>) -> Result<Vec<K>, Error> pub async fn apply_many<K>(&self, resource: &Vec<K>, ns: Option<&str>) -> Result<Vec<K>, Error>
where where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>, <K as Resource>::Scope: ApplyStrategy<K>,
@@ -373,7 +253,7 @@ impl K8sClient {
pub async fn apply_yaml_many( pub async fn apply_yaml_many(
&self, &self,
#[allow(clippy::ptr_arg)] yaml: &Vec<serde_yaml::Value>, yaml: &Vec<serde_yaml::Value>,
ns: Option<&str>, ns: Option<&str>,
) -> Result<(), Error> { ) -> Result<(), Error> {
for y in yaml.iter() { for y in yaml.iter() {

View File

@@ -7,7 +7,7 @@ use tokio::sync::OnceCell;
use crate::{ use crate::{
executors::ExecutorError, executors::ExecutorError,
interpret::InterpretStatus, interpret::{InterpretError, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
modules::{ modules::{
k3d::K3DInstallationScore, k3d::K3DInstallationScore,
@@ -24,17 +24,10 @@ use crate::{
}; };
use super::{ use super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError, DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, Topology,
PreparationOutcome, Topology,
k8s::K8sClient, k8s::K8sClient,
oberservability::monitoring::AlertReceiver, oberservability::monitoring::AlertReceiver,
tenant::{ tenant::{TenantConfig, TenantManager, k8s::K8sTenantManager},
TenantConfig, TenantManager,
k8s::K8sTenantManager,
network_policy::{
K3dNetworkPolicyStrategy, NetworkPolicyStrategy, NoopNetworkPolicyStrategy,
},
},
}; };
#[derive(Clone, Debug)] #[derive(Clone, Debug)]
@@ -81,30 +74,20 @@ impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology {
sender: &CRDPrometheus, sender: &CRDPrometheus,
inventory: &Inventory, inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>, receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> { ) -> Result<Outcome, InterpretError> {
let po_result = self.ensure_prometheus_operator(sender).await?; let po_result = self.ensure_prometheus_operator(sender).await?;
if po_result == PreparationOutcome::Noop { if po_result.status == InterpretStatus::NOOP {
debug!("Skipping Prometheus CR installation due to missing operator."); debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result); return Ok(Outcome::noop());
} }
self.get_k8s_prometheus_application_score(sender.clone(), receivers)
let result = self
.get_k8s_prometheus_application_score(sender.clone(), receivers)
.await .await
.interpret(inventory, self) .create_interpret()
.await; .execute(inventory, self)
.await?;
match result { Ok(Outcome::success(format!("No action, working on cluster ")))
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: outcome.message,
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
} }
} }
@@ -141,7 +124,7 @@ impl K8sAnywhereTopology {
) -> K8sPrometheusCRDAlertingScore { ) -> K8sPrometheusCRDAlertingScore {
K8sPrometheusCRDAlertingScore { K8sPrometheusCRDAlertingScore {
sender, sender,
receivers: receivers.unwrap_or_default(), receivers: receivers.unwrap_or_else(Vec::new),
service_monitors: vec![], service_monitors: vec![],
prometheus_rules: vec![], prometheus_rules: vec![],
} }
@@ -175,23 +158,15 @@ impl K8sAnywhereTopology {
K3DInstallationScore::default() K3DInstallationScore::default()
} }
async fn try_install_k3d(&self) -> Result<(), PreparationError> { async fn try_install_k3d(&self) -> Result<(), InterpretError> {
let result = self self.get_k3d_installation_score()
.get_k3d_installation_score() .create_interpret()
.interpret(&Inventory::empty(), self) .execute(&Inventory::empty(), self)
.await; .await?;
Ok(())
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(()),
InterpretStatus::NOOP => Ok(()),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
} }
async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, PreparationError> { async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, InterpretError> {
let k8s_anywhere_config = &self.config; let k8s_anywhere_config = &self.config;
// TODO this deserves some refactoring, it is becoming a bit hard to figure out // TODO this deserves some refactoring, it is becoming a bit hard to figure out
@@ -201,7 +176,7 @@ impl K8sAnywhereTopology {
} else { } else {
if let Some(kubeconfig) = &k8s_anywhere_config.kubeconfig { if let Some(kubeconfig) = &k8s_anywhere_config.kubeconfig {
debug!("Loading kubeconfig {kubeconfig}"); debug!("Loading kubeconfig {kubeconfig}");
match self.try_load_kubeconfig(kubeconfig).await { match self.try_load_kubeconfig(&kubeconfig).await {
Some(client) => { Some(client) => {
return Ok(Some(K8sState { return Ok(Some(K8sState {
client: Arc::new(client), client: Arc::new(client),
@@ -210,7 +185,7 @@ impl K8sAnywhereTopology {
})); }));
} }
None => { None => {
return Err(PreparationError::new(format!( return Err(InterpretError::new(format!(
"Failed to load kubeconfig from {kubeconfig}" "Failed to load kubeconfig from {kubeconfig}"
))); )));
} }
@@ -256,21 +231,16 @@ impl K8sAnywhereTopology {
Ok(Some(state)) Ok(Some(state))
} }
async fn ensure_k8s_tenant_manager(&self, k8s_state: &K8sState) -> Result<(), String> { async fn ensure_k8s_tenant_manager(&self) -> Result<(), String> {
if self.tenant_manager.get().is_some() { if let Some(_) = self.tenant_manager.get() {
return Ok(()); return Ok(());
} }
self.tenant_manager self.tenant_manager
.get_or_try_init(async || -> Result<K8sTenantManager, String> { .get_or_try_init(async || -> Result<K8sTenantManager, String> {
// TOOD: checker si K8s ou K3d/s tenant manager (ref. issue https://git.nationtech.io/NationTech/harmony/issues/94)
let k8s_client = self.k8s_client().await?; let k8s_client = self.k8s_client().await?;
let network_policy_strategy: Box<dyn NetworkPolicyStrategy> = match k8s_state.source Ok(K8sTenantManager::new(k8s_client))
{
K8sSource::LocalK3d => Box::new(K3dNetworkPolicyStrategy::new()),
K8sSource::Kubeconfig => Box::new(NoopNetworkPolicyStrategy::new()),
};
Ok(K8sTenantManager::new(k8s_client, network_policy_strategy))
}) })
.await?; .await?;
@@ -289,11 +259,11 @@ impl K8sAnywhereTopology {
async fn ensure_prometheus_operator( async fn ensure_prometheus_operator(
&self, &self,
sender: &CRDPrometheus, sender: &CRDPrometheus,
) -> Result<PreparationOutcome, PreparationError> { ) -> Result<Outcome, InterpretError> {
let status = Command::new("sh") let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i prometheuses"]) .args(["-c", "kubectl get crd -A | grep -i prometheuses"])
.status() .status()
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?; .map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() { if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() { if let Some(Some(k8s_state)) = self.k8s_state.get() {
@@ -302,37 +272,30 @@ impl K8sAnywhereTopology {
debug!("installing prometheus operator"); debug!("installing prometheus operator");
let op_score = let op_score =
prometheus_operator_helm_chart_score(sender.namespace.clone()); prometheus_operator_helm_chart_score(sender.namespace.clone());
let result = op_score.interpret(&Inventory::empty(), self).await; op_score
.create_interpret()
return match result { .execute(&Inventory::empty(), self)
Ok(outcome) => match outcome.status { .await?;
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success { return Ok(Outcome::success(
details: "installed prometheus operator".into(), "installed prometheus operator".to_string(),
}), ));
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(
"failed to install prometheus operator (unknown error)".into(),
)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
};
} }
K8sSource::Kubeconfig => { K8sSource::Kubeconfig => {
debug!("unable to install prometheus operator, contact cluster admin"); debug!("unable to install prometheus operator, contact cluster admin");
return Ok(PreparationOutcome::Noop); return Ok(Outcome::noop());
} }
} }
} else { } else {
warn!("Unable to detect k8s_state. Skipping Prometheus Operator install."); warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
return Ok(PreparationOutcome::Noop); return Ok(Outcome::noop());
} }
} }
debug!("Prometheus operator is already present, skipping install"); debug!("Prometheus operator is already present, skipping install");
Ok(PreparationOutcome::Success { Ok(Outcome::success(
details: "prometheus operator present in cluster".into(), "prometheus operator present in cluster".to_string(),
}) ))
} }
} }
@@ -391,25 +354,26 @@ impl Topology for K8sAnywhereTopology {
"K8sAnywhereTopology" "K8sAnywhereTopology"
} }
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
let k8s_state = self let k8s_state = self
.k8s_state .k8s_state
.get_or_try_init(|| self.try_get_or_install_k8s_client()) .get_or_try_init(|| self.try_get_or_install_k8s_client())
.await?; .await?;
let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new( let k8s_state: &K8sState = k8s_state.as_ref().ok_or(InterpretError::new(
"no K8s client could be found or installed".to_string(), "No K8s client could be found or installed".to_string(),
))?; ))?;
self.ensure_k8s_tenant_manager(k8s_state) self.ensure_k8s_tenant_manager()
.await .await
.map_err(PreparationError::new)?; .map_err(|e| InterpretError::new(e))?;
match self.is_helm_available() { match self.is_helm_available() {
Ok(()) => Ok(PreparationOutcome::Success { Ok(()) => Ok(Outcome::success(format!(
details: format!("{} + helm available", k8s_state.message.clone()), "{} + helm available",
}), k8s_state.message.clone()
Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))), ))),
Err(e) => Err(InterpretError::new(format!("helm unavailable: {}", e))),
} }
} }
} }

View File

@@ -1,7 +1,9 @@
use async_trait::async_trait; use async_trait::async_trait;
use derive_new::new; use derive_new::new;
use super::{HelmCommand, PreparationError, PreparationOutcome, Topology}; use crate::interpret::{InterpretError, Outcome};
use super::{HelmCommand, Topology};
#[derive(new)] #[derive(new)]
pub struct LocalhostTopology; pub struct LocalhostTopology;
@@ -12,10 +14,10 @@ impl Topology for LocalhostTopology {
"LocalHostTopology" "LocalHostTopology"
} }
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> { async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
Ok(PreparationOutcome::Success { Ok(Outcome::success(
details: "Localhost is Chuck Norris, always ready.".into(), "Localhost is Chuck Norris, always ready.".to_string(),
}) ))
} }
} }

View File

@@ -6,7 +6,6 @@ mod k8s_anywhere;
mod localhost; mod localhost;
pub mod oberservability; pub mod oberservability;
pub mod tenant; pub mod tenant;
use derive_new::new;
pub use k8s_anywhere::*; pub use k8s_anywhere::*;
pub use localhost::*; pub use localhost::*;
pub mod k8s; pub mod k8s;
@@ -27,13 +26,10 @@ pub use tftp::*;
mod helm_command; mod helm_command;
pub use helm_command::*; pub use helm_command::*;
use super::{
executors::ExecutorError,
instrumentation::{self, HarmonyEvent},
};
use std::error::Error;
use std::net::IpAddr; use std::net::IpAddr;
use super::interpret::{InterpretError, Outcome};
/// Represents a logical view of an infrastructure environment providing specific capabilities. /// Represents a logical view of an infrastructure environment providing specific capabilities.
/// ///
/// A Topology acts as a self-contained "package" responsible for managing access /// A Topology acts as a self-contained "package" responsible for managing access
@@ -61,128 +57,9 @@ pub trait Topology: Send + Sync {
/// * **Internal Orchestration:** For complex topologies, this method might manage dependencies on other sub-topologies, ensuring *their* `ensure_ready` is called first. Using nested `Maestros` to run setup `Scores` against these sub-topologies is the recommended pattern for non-trivial bootstrapping, allowing reuse of Harmony's core orchestration logic. /// * **Internal Orchestration:** For complex topologies, this method might manage dependencies on other sub-topologies, ensuring *their* `ensure_ready` is called first. Using nested `Maestros` to run setup `Scores` against these sub-topologies is the recommended pattern for non-trivial bootstrapping, allowing reuse of Harmony's core orchestration logic.
/// ///
/// # Returns /// # Returns
/// - `Ok(PreparationOutcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context. /// - `Ok(Outcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context.
/// - `Err(PreparationError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments. /// - `Err(TopologyError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments.
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError>; async fn ensure_ready(&self) -> Result<Outcome, InterpretError>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PreparationOutcome {
Success { details: String },
Noop,
}
#[derive(Debug, Clone, new)]
pub struct PreparationError {
msg: String,
}
impl std::fmt::Display for PreparationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.msg)
}
}
impl Error for PreparationError {}
impl From<ExecutorError> for PreparationError {
fn from(value: ExecutorError) -> Self {
Self {
msg: format!("InterpretError : {value}"),
}
}
}
impl From<kube::Error> for PreparationError {
fn from(value: kube::Error) -> Self {
Self {
msg: format!("PreparationError : {value}"),
}
}
}
impl From<String> for PreparationError {
fn from(value: String) -> Self {
Self {
msg: format!("PreparationError : {value}"),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum TopologyStatus {
Queued,
Preparing,
Success,
Noop,
Error,
}
pub struct TopologyState {
pub topology: String,
pub status: TopologyStatus,
}
impl TopologyState {
pub fn new(topology: String) -> Self {
let instance = Self {
topology,
status: TopologyStatus::Queued,
};
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: instance.topology.clone(),
status: instance.status.clone(),
message: None,
})
.unwrap();
instance
}
pub fn prepare(&mut self) {
self.status = TopologyStatus::Preparing;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: None,
})
.unwrap();
}
pub fn success(&mut self, message: String) {
self.status = TopologyStatus::Success;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: Some(message),
})
.unwrap();
}
pub fn noop(&mut self) {
self.status = TopologyStatus::Noop;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: None,
})
.unwrap();
}
pub fn error(&mut self, message: String) {
self.status = TopologyStatus::Error;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: Some(message),
})
.unwrap();
}
} }
#[derive(Debug)] #[derive(Debug)]
@@ -211,7 +88,7 @@ impl Serialize for Url {
{ {
match self { match self {
Url::LocalFolder(path) => serializer.serialize_str(path), Url::LocalFolder(path) => serializer.serialize_str(path),
Url::Url(url) => serializer.serialize_str(url.as_str()), Url::Url(url) => serializer.serialize_str(&url.as_str()),
} }
} }
} }

View File

@@ -45,7 +45,7 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::Alerting todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {

View File

@@ -27,11 +27,11 @@ pub struct UnmanagedRouter {
impl Router for UnmanagedRouter { impl Router for UnmanagedRouter {
fn get_gateway(&self) -> IpAddress { fn get_gateway(&self) -> IpAddress {
self.gateway self.gateway.clone()
} }
fn get_cidr(&self) -> Ipv4Cidr { fn get_cidr(&self) -> Ipv4Cidr {
self.cidr self.cidr.clone()
} }
fn get_host(&self) -> LogicalHost { fn get_host(&self) -> LogicalHost {

View File

@@ -15,38 +15,36 @@ use k8s_openapi::{
apimachinery::pkg::util::intstr::IntOrString, apimachinery::pkg::util::intstr::IntOrString,
}; };
use kube::Resource; use kube::Resource;
use log::debug; use log::{debug, info, warn};
use serde::de::DeserializeOwned; use serde::de::DeserializeOwned;
use serde_json::json; use serde_json::json;
use tokio::sync::OnceCell; use tokio::sync::OnceCell;
use super::{TenantConfig, TenantManager, network_policy::NetworkPolicyStrategy}; use super::{TenantConfig, TenantManager};
#[derive(Debug)] #[derive(Clone, Debug)]
pub struct K8sTenantManager { pub struct K8sTenantManager {
k8s_client: Arc<K8sClient>, k8s_client: Arc<K8sClient>,
k8s_tenant_config: Arc<OnceCell<TenantConfig>>, k8s_tenant_config: Arc<OnceCell<TenantConfig>>,
network_policy_strategy: Box<dyn NetworkPolicyStrategy>,
} }
impl K8sTenantManager { impl K8sTenantManager {
pub fn new( pub fn new(client: Arc<K8sClient>) -> Self {
client: Arc<K8sClient>,
network_policy_strategy: Box<dyn NetworkPolicyStrategy>,
) -> Self {
Self { Self {
k8s_client: client, k8s_client: client,
k8s_tenant_config: Arc::new(OnceCell::new()), k8s_tenant_config: Arc::new(OnceCell::new()),
network_policy_strategy,
} }
} }
}
impl K8sTenantManager {
fn get_namespace_name(&self, config: &TenantConfig) -> String { fn get_namespace_name(&self, config: &TenantConfig) -> String {
config.name.clone() config.name.clone()
} }
fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> { fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> {
// TODO: Ensure constraints are applied to namespace (https://git.nationtech.io/NationTech/harmony/issues/98) warn!("Validate that when tenant already exists (by id) that name has not changed");
warn!("Make sure other Tenant constraints are respected by this k8s implementation");
Ok(()) Ok(())
} }
@@ -221,6 +219,29 @@ impl K8sTenantManager {
} }
] ]
}, },
{
"to": [
{
"ipBlock": {
"cidr": "10.43.0.1/32",
}
}
]
},
{
"to": [
{
//TODO this ip is from the docker network that k3d is running on
//since k3d does not deploy kube-api-server as a pod it needs to ahve the ip
//address opened up
//need to find a way to automatically detect the ip address from the docker
//network
"ipBlock": {
"cidr": "172.24.0.0/16",
}
}
]
},
{ {
"to": [ "to": [
{ {
@@ -288,19 +309,19 @@ impl K8sTenantManager {
let ports: Option<Vec<NetworkPolicyPort>> = let ports: Option<Vec<NetworkPolicyPort>> =
c.1.as_ref().map(|spec| match &spec.data { c.1.as_ref().map(|spec| match &spec.data {
super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort { super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int((*port).into())), port: Some(IntOrString::Int(port.clone().into())),
..Default::default() ..Default::default()
}], }],
super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort { super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int((*start).into())), port: Some(IntOrString::Int(start.clone().into())),
end_port: Some((*end).into()), end_port: Some(end.clone().into()),
protocol: None, // Not currently supported by Harmony protocol: None, // Not currently supported by Harmony
}], }],
super::PortSpecData::ListOfPorts(items) => items super::PortSpecData::ListOfPorts(items) => items
.iter() .iter()
.map(|i| NetworkPolicyPort { .map(|i| NetworkPolicyPort {
port: Some(IntOrString::Int((*i).into())), port: Some(IntOrString::Int(i.clone().into())),
..Default::default() ..Default::default()
}) })
.collect(), .collect(),
@@ -345,19 +366,19 @@ impl K8sTenantManager {
let ports: Option<Vec<NetworkPolicyPort>> = let ports: Option<Vec<NetworkPolicyPort>> =
c.1.as_ref().map(|spec| match &spec.data { c.1.as_ref().map(|spec| match &spec.data {
super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort { super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int((*port).into())), port: Some(IntOrString::Int(port.clone().into())),
..Default::default() ..Default::default()
}], }],
super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort { super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int((*start).into())), port: Some(IntOrString::Int(start.clone().into())),
end_port: Some((*end).into()), end_port: Some(end.clone().into()),
protocol: None, // Not currently supported by Harmony protocol: None, // Not currently supported by Harmony
}], }],
super::PortSpecData::ListOfPorts(items) => items super::PortSpecData::ListOfPorts(items) => items
.iter() .iter()
.map(|i| NetworkPolicyPort { .map(|i| NetworkPolicyPort {
port: Some(IntOrString::Int((*i).into())), port: Some(IntOrString::Int(i.clone().into())),
..Default::default() ..Default::default()
}) })
.collect(), .collect(),
@@ -390,27 +411,12 @@ impl K8sTenantManager {
} }
} }
impl Clone for K8sTenantManager {
fn clone(&self) -> Self {
Self {
k8s_client: self.k8s_client.clone(),
k8s_tenant_config: self.k8s_tenant_config.clone(),
network_policy_strategy: self.network_policy_strategy.clone_box(),
}
}
}
#[async_trait] #[async_trait]
impl TenantManager for K8sTenantManager { impl TenantManager for K8sTenantManager {
async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> { async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
let namespace = self.build_namespace(config)?; let namespace = self.build_namespace(config)?;
let resource_quota = self.build_resource_quota(config)?; let resource_quota = self.build_resource_quota(config)?;
let network_policy = self.build_network_policy(config)?; let network_policy = self.build_network_policy(config)?;
let network_policy = self
.network_policy_strategy
.adjust_policy(network_policy, config);
let resource_limit_range = self.build_limit_range(config)?; let resource_limit_range = self.build_limit_range(config)?;
self.ensure_constraints(&namespace)?; self.ensure_constraints(&namespace)?;
@@ -427,14 +433,13 @@ impl TenantManager for K8sTenantManager {
debug!("Creating network_policy for tenant {}", config.name); debug!("Creating network_policy for tenant {}", config.name);
self.apply_resource(network_policy, config).await?; self.apply_resource(network_policy, config).await?;
debug!( info!(
"Success provisionning K8s tenant id {} name {}", "Success provisionning K8s tenant id {} name {}",
config.id, config.name config.id, config.name
); );
self.store_config(config); self.store_config(config);
Ok(()) Ok(())
} }
async fn get_tenant_config(&self) -> Option<TenantConfig> { async fn get_tenant_config(&self) -> Option<TenantConfig> {
self.k8s_tenant_config.get().cloned() self.k8s_tenant_config.get().cloned()
} }

View File

@@ -1,11 +1,11 @@
pub mod k8s; pub mod k8s;
mod manager; mod manager;
pub mod network_policy; use std::str::FromStr;
use crate::data::Id;
pub use manager::*; pub use manager::*;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::str::FromStr;
use crate::data::Id;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] // Assuming serde for Scores #[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] // Assuming serde for Scores
pub struct TenantConfig { pub struct TenantConfig {

View File

@@ -1,120 +0,0 @@
use k8s_openapi::api::networking::v1::{
IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyPeer, NetworkPolicySpec,
};
use super::TenantConfig;
pub trait NetworkPolicyStrategy: Send + Sync + std::fmt::Debug {
fn clone_box(&self) -> Box<dyn NetworkPolicyStrategy>;
fn adjust_policy(&self, policy: NetworkPolicy, config: &TenantConfig) -> NetworkPolicy;
}
#[derive(Clone, Debug)]
pub struct NoopNetworkPolicyStrategy {}
impl NoopNetworkPolicyStrategy {
pub fn new() -> Self {
Self {}
}
}
impl Default for NoopNetworkPolicyStrategy {
fn default() -> Self {
Self::new()
}
}
impl NetworkPolicyStrategy for NoopNetworkPolicyStrategy {
fn clone_box(&self) -> Box<dyn NetworkPolicyStrategy> {
Box::new(self.clone())
}
fn adjust_policy(&self, policy: NetworkPolicy, _config: &TenantConfig) -> NetworkPolicy {
policy
}
}
#[derive(Clone, Debug)]
pub struct K3dNetworkPolicyStrategy {}
impl K3dNetworkPolicyStrategy {
pub fn new() -> Self {
Self {}
}
}
impl Default for K3dNetworkPolicyStrategy {
fn default() -> Self {
Self::new()
}
}
impl NetworkPolicyStrategy for K3dNetworkPolicyStrategy {
fn clone_box(&self) -> Box<dyn NetworkPolicyStrategy> {
Box::new(self.clone())
}
fn adjust_policy(&self, policy: NetworkPolicy, _config: &TenantConfig) -> NetworkPolicy {
let mut egress = policy
.spec
.clone()
.unwrap_or_default()
.egress
.clone()
.unwrap_or_default();
egress.push(NetworkPolicyEgressRule {
to: Some(vec![NetworkPolicyPeer {
ip_block: Some(IPBlock {
cidr: "172.18.0.0/16".into(), // TODO: query the IP range https://git.nationtech.io/NationTech/harmony/issues/108
..Default::default()
}),
..Default::default()
}]),
..Default::default()
});
NetworkPolicy {
spec: Some(NetworkPolicySpec {
egress: Some(egress),
..policy.spec.unwrap_or_default()
}),
..policy
}
}
}
#[cfg(test)]
mod tests {
use k8s_openapi::api::networking::v1::{
IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyPeer, NetworkPolicySpec,
};
use super::{K3dNetworkPolicyStrategy, NetworkPolicyStrategy};
#[test]
pub fn should_add_ip_block_for_k3d_harmony_server() {
let strategy = K3dNetworkPolicyStrategy::new();
let policy =
strategy.adjust_policy(NetworkPolicy::default(), &super::TenantConfig::default());
let expected_policy = NetworkPolicy {
spec: Some(NetworkPolicySpec {
egress: Some(vec![NetworkPolicyEgressRule {
to: Some(vec![NetworkPolicyPeer {
ip_block: Some(IPBlock {
cidr: "172.18.0.0/16".into(),
..Default::default()
}),
..Default::default()
}]),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
assert_eq!(expected_policy, policy);
}
}

View File

@@ -60,7 +60,7 @@ impl DnsServer for OPNSenseFirewall {
} }
fn get_ip(&self) -> IpAddress { fn get_ip(&self) -> IpAddress {
OPNSenseFirewall::get_ip(self) OPNSenseFirewall::get_ip(&self)
} }
fn get_host(&self) -> LogicalHost { fn get_host(&self) -> LogicalHost {

View File

@@ -48,7 +48,7 @@ impl HttpServer for OPNSenseFirewall {
async fn ensure_initialized(&self) -> Result<(), ExecutorError> { async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
let mut config = self.opnsense_config.write().await; let mut config = self.opnsense_config.write().await;
let caddy = config.caddy(); let caddy = config.caddy();
if caddy.get_full_config().is_none() { if let None = caddy.get_full_config() {
info!("Http config not available in opnsense config, installing package"); info!("Http config not available in opnsense config, installing package");
config.install_package("os-caddy").await.map_err(|e| { config.install_package("os-caddy").await.map_err(|e| {
ExecutorError::UnexpectedError(format!( ExecutorError::UnexpectedError(format!(

View File

@@ -121,12 +121,10 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
LoadBalancerService { LoadBalancerService {
backend_servers, backend_servers,
listening_port: frontend.bind.parse().unwrap_or_else(|_| { listening_port: frontend.bind.parse().expect(&format!(
panic!(
"HAProxy frontend address should be a valid SocketAddr, got {}", "HAProxy frontend address should be a valid SocketAddr, got {}",
frontend.bind frontend.bind
) )),
}),
health_check, health_check,
} }
}) })
@@ -169,28 +167,28 @@ pub(crate) fn get_health_check_for_backend(
None => return None, None => return None,
}; };
let haproxy_health_check = haproxy let haproxy_health_check = match haproxy
.healthchecks .healthchecks
.healthchecks .healthchecks
.iter() .iter()
.find(|h| &h.uuid == health_check_uuid)?; .find(|h| &h.uuid == health_check_uuid)
{
Some(health_check) => health_check,
None => return None,
};
let binding = haproxy_health_check.health_check_type.to_uppercase(); let binding = haproxy_health_check.health_check_type.to_uppercase();
let uppercase = binding.as_str(); let uppercase = binding.as_str();
match uppercase { match uppercase {
"TCP" => { "TCP" => {
if let Some(checkport) = haproxy_health_check.checkport.content.as_ref() { if let Some(checkport) = haproxy_health_check.checkport.content.as_ref() {
if !checkport.is_empty() { if checkport.len() > 0 {
return Some(HealthCheck::TCP(Some(checkport.parse().unwrap_or_else( return Some(HealthCheck::TCP(Some(checkport.parse().expect(&format!(
|_| {
panic!(
"HAProxy check port should be a valid port number, got {checkport}" "HAProxy check port should be a valid port number, got {checkport}"
) )))));
},
))));
} }
} }
Some(HealthCheck::TCP(None)) return Some(HealthCheck::TCP(None));
} }
"HTTP" => { "HTTP" => {
let path: String = haproxy_health_check let path: String = haproxy_health_check
@@ -357,13 +355,16 @@ mod tests {
// Create an HAProxy instance with servers // Create an HAProxy instance with servers
let mut haproxy = HAProxy::default(); let mut haproxy = HAProxy::default();
let server = HAProxyServer { let mut server = HAProxyServer::default();
uuid: "server1".to_string(), server.uuid = "server1".to_string();
address: "192.168.1.1".to_string(), server.address = "192.168.1.1".to_string();
port: 80, server.port = 80;
..Default::default()
};
haproxy.servers.servers.push(server); haproxy.servers.servers.push(server);
let mut server = HAProxyServer::default();
server.uuid = "server3".to_string();
server.address = "192.168.1.3".to_string();
server.port = 8080;
// Call the function // Call the function
let result = get_servers_for_backend(&backend, &haproxy); let result = get_servers_for_backend(&backend, &haproxy);
@@ -383,12 +384,10 @@ mod tests {
let backend = HAProxyBackend::default(); let backend = HAProxyBackend::default();
// Create an HAProxy instance with servers // Create an HAProxy instance with servers
let mut haproxy = HAProxy::default(); let mut haproxy = HAProxy::default();
let server = HAProxyServer { let mut server = HAProxyServer::default();
uuid: "server1".to_string(), server.uuid = "server1".to_string();
address: "192.168.1.1".to_string(), server.address = "192.168.1.1".to_string();
port: 80, server.port = 80;
..Default::default()
};
haproxy.servers.servers.push(server); haproxy.servers.servers.push(server);
// Call the function // Call the function
let result = get_servers_for_backend(&backend, &haproxy); let result = get_servers_for_backend(&backend, &haproxy);
@@ -403,12 +402,10 @@ mod tests {
backend.linked_servers.content = Some("server4,server5".to_string()); backend.linked_servers.content = Some("server4,server5".to_string());
// Create an HAProxy instance with servers // Create an HAProxy instance with servers
let mut haproxy = HAProxy::default(); let mut haproxy = HAProxy::default();
let server = HAProxyServer { let mut server = HAProxyServer::default();
uuid: "server1".to_string(), server.uuid = "server1".to_string();
address: "192.168.1.1".to_string(), server.address = "192.168.1.1".to_string();
port: 80, server.port = 80;
..Default::default()
};
haproxy.servers.servers.push(server); haproxy.servers.servers.push(server);
// Call the function // Call the function
let result = get_servers_for_backend(&backend, &haproxy); let result = get_servers_for_backend(&backend, &haproxy);
@@ -419,28 +416,20 @@ mod tests {
#[test] #[test]
fn test_get_servers_for_backend_multiple_linked_servers() { fn test_get_servers_for_backend_multiple_linked_servers() {
// Create a backend with multiple linked servers // Create a backend with multiple linked servers
#[allow(clippy::field_reassign_with_default)]
let mut backend = HAProxyBackend::default(); let mut backend = HAProxyBackend::default();
backend.linked_servers.content = Some("server1,server2".to_string()); backend.linked_servers.content = Some("server1,server2".to_string());
//
// Create an HAProxy instance with matching servers // Create an HAProxy instance with matching servers
let mut haproxy = HAProxy::default(); let mut haproxy = HAProxy::default();
let server = HAProxyServer { let mut server = HAProxyServer::default();
uuid: "server1".to_string(), server.uuid = "server1".to_string();
address: "some-hostname.test.mcd".to_string(), server.address = "some-hostname.test.mcd".to_string();
port: 80, server.port = 80;
..Default::default()
};
haproxy.servers.servers.push(server); haproxy.servers.servers.push(server);
let mut server = HAProxyServer::default();
let server = HAProxyServer { server.uuid = "server2".to_string();
uuid: "server2".to_string(), server.address = "192.168.1.2".to_string();
address: "192.168.1.2".to_string(), server.port = 8080;
port: 8080,
..Default::default()
};
haproxy.servers.servers.push(server); haproxy.servers.servers.push(server);
// Call the function // Call the function
let result = get_servers_for_backend(&backend, &haproxy); let result = get_servers_for_backend(&backend, &haproxy);
// Check the result // Check the result

View File

@@ -58,7 +58,7 @@ impl TftpServer for OPNSenseFirewall {
async fn ensure_initialized(&self) -> Result<(), ExecutorError> { async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
let mut config = self.opnsense_config.write().await; let mut config = self.opnsense_config.write().await;
let tftp = config.tftp(); let tftp = config.tftp();
if tftp.get_full_config().is_none() { if let None = tftp.get_full_config() {
info!("Tftp config not available in opnsense config, installing package"); info!("Tftp config not available in opnsense config, installing package");
config.install_package("os-tftp").await.map_err(|e| { config.install_package("os-tftp").await.map_err(|e| {
ExecutorError::UnexpectedError(format!( ExecutorError::UnexpectedError(format!(

View File

@@ -13,7 +13,7 @@ pub trait ApplicationFeature<T: Topology>:
fn name(&self) -> String; fn name(&self) -> String;
} }
pub trait ApplicationFeatureClone<T: Topology> { trait ApplicationFeatureClone<T: Topology> {
fn clone_box(&self) -> Box<dyn ApplicationFeature<T>>; fn clone_box(&self) -> Box<dyn ApplicationFeature<T>>;
} }
@@ -27,7 +27,7 @@ where
} }
impl<T: Topology> Serialize for Box<dyn ApplicationFeature<T>> { impl<T: Topology> Serialize for Box<dyn ApplicationFeature<T>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
S: serde::Serializer, S: serde::Serializer,
{ {

View File

@@ -184,11 +184,12 @@ impl ArgoApplication {
pub fn to_yaml(&self) -> serde_yaml::Value { pub fn to_yaml(&self) -> serde_yaml::Value {
let name = &self.name; let name = &self.name;
let namespace = if let Some(ns) = self.namespace.as_ref() { let namespace = if let Some(ns) = self.namespace.as_ref() {
ns &ns
} else { } else {
"argocd" "argocd"
}; };
let project = &self.project; let project = &self.project;
let source = &self.source;
let yaml_str = format!( let yaml_str = format!(
r#" r#"
@@ -227,7 +228,7 @@ spec:
serde_yaml::to_value(&self.source).expect("couldn't serialize source to value"); serde_yaml::to_value(&self.source).expect("couldn't serialize source to value");
let sync_policy = serde_yaml::to_value(&self.sync_policy) let sync_policy = serde_yaml::to_value(&self.sync_policy)
.expect("couldn't serialize sync_policy to value"); .expect("couldn't serialize sync_policy to value");
let revision_history_limit = serde_yaml::to_value(self.revision_history_limit) let revision_history_limit = serde_yaml::to_value(&self.revision_history_limit)
.expect("couldn't serialize revision_history_limit to value"); .expect("couldn't serialize revision_history_limit to value");
spec.insert( spec.insert(

View File

@@ -1,7 +1,7 @@
use std::{io::Write, process::Command, sync::Arc}; use std::{io::Write, process::Command, sync::Arc};
use async_trait::async_trait; use async_trait::async_trait;
use log::info; use log::{debug, error};
use serde_yaml::Value; use serde_yaml::Value;
use tempfile::NamedTempFile; use tempfile::NamedTempFile;
@@ -10,7 +10,7 @@ use crate::{
data::Version, data::Version,
inventory::Inventory, inventory::Inventory,
modules::application::{ modules::application::{
ApplicationFeature, HelmPackage, OCICompliant, Application, ApplicationFeature, HelmPackage, OCICompliant,
features::{ArgoApplication, ArgoHelmScore}, features::{ArgoApplication, ArgoHelmScore},
}, },
score::Score, score::Score,
@@ -56,11 +56,14 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
chart_url: String, chart_url: String,
image_name: String, image_name: String,
) -> Result<(), String> { ) -> Result<(), String> {
// TODO: This works only with local k3d installations, which is fine only for current demo purposes. We assume usage of K8sAnywhereTopology" error!(
// https://git.nationtech.io/NationTech/harmony/issues/106 "FIXME This works only with local k3d installations, which is fine only for current demo purposes. We assume usage of K8sAnywhereTopology"
);
error!("TODO hardcoded k3d bin path is wrong");
let k3d_bin_path = (*HARMONY_DATA_DIR).join("k3d").join("k3d"); let k3d_bin_path = (*HARMONY_DATA_DIR).join("k3d").join("k3d");
// --- 1. Import the container image into the k3d cluster --- // --- 1. Import the container image into the k3d cluster ---
info!( debug!(
"Importing image '{}' into k3d cluster 'harmony'", "Importing image '{}' into k3d cluster 'harmony'",
image_name image_name
); );
@@ -77,7 +80,7 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
} }
// --- 2. Get the kubeconfig for the k3d cluster and write it to a temp file --- // --- 2. Get the kubeconfig for the k3d cluster and write it to a temp file ---
info!("Retrieving kubeconfig for k3d cluster 'harmony'"); debug!("Retrieving kubeconfig for k3d cluster 'harmony'");
let kubeconfig_output = Command::new(&k3d_bin_path) let kubeconfig_output = Command::new(&k3d_bin_path)
.args(["kubeconfig", "get", "harmony"]) .args(["kubeconfig", "get", "harmony"])
.output() .output()
@@ -98,7 +101,7 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
let kubeconfig_path = temp_kubeconfig.path().to_str().unwrap(); let kubeconfig_path = temp_kubeconfig.path().to_str().unwrap();
// --- 3. Install or upgrade the Helm chart in the cluster --- // --- 3. Install or upgrade the Helm chart in the cluster ---
info!( debug!(
"Deploying Helm chart '{}' to namespace '{}'", "Deploying Helm chart '{}' to namespace '{}'",
chart_url, app_name chart_url, app_name
); );
@@ -128,7 +131,7 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
)); ));
} }
info!("Successfully deployed '{}' to local k3d cluster.", app_name); debug!("Successfully deployed '{}' to local k3d cluster.", app_name);
Ok(()) Ok(())
} }
} }
@@ -148,12 +151,14 @@ impl<
// Or ask for it when unknown // Or ask for it when unknown
let helm_chart = self.application.build_push_helm_package(&image).await?; let helm_chart = self.application.build_push_helm_package(&image).await?;
debug!("Pushed new helm chart {helm_chart}");
// TODO: Make building image configurable/skippable if image already exists (prompt)") error!("TODO Make building image configurable/skippable if image already exists (prompt)");
// https://git.nationtech.io/NationTech/harmony/issues/104
let image = self.application.build_push_oci_image().await?; let image = self.application.build_push_oci_image().await?;
debug!("Pushed new docker image {image}");
// TODO: this is a temporary hack for demo purposes, the deployment target should be driven debug!("Installing ContinuousDelivery feature");
// TODO this is a temporary hack for demo purposes, the deployment target should be driven
// by the topology only and we should not have to know how to perform tasks like this for // by the topology only and we should not have to know how to perform tasks like this for
// which the topology should be responsible. // which the topology should be responsible.
// //
@@ -166,20 +171,17 @@ impl<
// access it. This forces every Topology to understand the concept of targets though... So // access it. This forces every Topology to understand the concept of targets though... So
// instead I'll create a new Capability which is MultiTargetTopology and we'll see how it // instead I'll create a new Capability which is MultiTargetTopology and we'll see how it
// goes. It still does not feel right though. // goes. It still does not feel right though.
//
// https://git.nationtech.io/NationTech/harmony/issues/106
match topology.current_target() { match topology.current_target() {
DeploymentTarget::LocalDev => { DeploymentTarget::LocalDev => {
info!("Deploying {} locally...", self.application.name());
self.deploy_to_local_k3d(self.application.name(), helm_chart, image) self.deploy_to_local_k3d(self.application.name(), helm_chart, image)
.await?; .await?;
} }
target => { target => {
info!("Deploying {} to target {target:?}", self.application.name()); debug!("Deploying to target {target:?}");
let score = ArgoHelmScore { let score = ArgoHelmScore {
namespace: "harmony-example-rust-webapp".to_string(), namespace: "harmonydemo-staging".to_string(),
openshift: true, openshift: false,
domain: "argo.harmonydemo.apps.ncd0.harmony.mcd".to_string(), domain: "argo.harmonydemo.apps.st.mcd".to_string(),
argo_apps: vec![ArgoApplication::from(CDApplicationConfig { argo_apps: vec![ArgoApplication::from(CDApplicationConfig {
// helm pull oci://hub.nationtech.io/harmony/harmony-example-rust-webapp-chart --version 0.1.0 // helm pull oci://hub.nationtech.io/harmony/harmony-example-rust-webapp-chart --version 0.1.0
version: Version::from("0.1.0").unwrap(), version: Version::from("0.1.0").unwrap(),
@@ -187,11 +189,12 @@ impl<
helm_chart_name: "harmony-example-rust-webapp-chart".to_string(), helm_chart_name: "harmony-example-rust-webapp-chart".to_string(),
values_overrides: None, values_overrides: None,
name: "harmony-demo-rust-webapp".to_string(), name: "harmony-demo-rust-webapp".to_string(),
namespace: "harmony-example-rust-webapp".to_string(), namespace: "harmonydemo-staging".to_string(),
})], })],
}; };
score score
.interpret(&Inventory::empty(), topology) .create_interpret()
.execute(&Inventory::empty(), topology)
.await .await
.unwrap(); .unwrap();
} }

View File

@@ -1,4 +1,5 @@
use async_trait::async_trait; use async_trait::async_trait;
use log::error;
use non_blank_string_rs::NonBlankString; use non_blank_string_rs::NonBlankString;
use serde::Serialize; use serde::Serialize;
use std::str::FromStr; use std::str::FromStr;
@@ -49,21 +50,20 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for ArgoInterpret {
inventory: &Inventory, inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
self.score.interpret(inventory, topology).await?; error!("Uncomment below, only disabled for debugging");
self.score
.create_interpret()
.execute(inventory, topology)
.await?;
let k8s_client = topology.k8s_client().await?; let k8s_client = topology.k8s_client().await?;
k8s_client k8s_client
.apply_yaml_many(&self.argo_apps.iter().map(|a| a.to_yaml()).collect(), None) .apply_yaml_many(&self.argo_apps.iter().map(|a| a.to_yaml()).collect(), None)
.await .await
.unwrap(); .unwrap();
Ok(Outcome::success(format!( Ok(Outcome::success(format!(
"ArgoCD installed with {} {}", "Successfully installed ArgoCD and {} Applications",
self.argo_apps.len(), self.argo_apps.len()
match self.argo_apps.len() {
1 => "application",
_ => "applications",
}
))) )))
} }
@@ -986,7 +986,7 @@ commitServer:
); );
HelmChartScore { HelmChartScore {
namespace: Some(NonBlankString::from_str(namespace).unwrap()), namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str("argo-cd").unwrap(), release_name: NonBlankString::from_str("argo-cd").unwrap(),
chart_name: NonBlankString::from_str("argo/argo-cd").unwrap(), chart_name: NonBlankString::from_str("argo/argo-cd").unwrap(),
chart_version: Some(NonBlankString::from_str("8.1.2").unwrap()), chart_version: Some(NonBlankString::from_str("8.1.2").unwrap()),

View File

@@ -4,7 +4,6 @@ use crate::modules::application::{Application, ApplicationFeature};
use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore; use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus; use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus;
use crate::topology::MultiTargetTopology;
use crate::{ use crate::{
inventory::Inventory, inventory::Inventory,
modules::monitoring::{ modules::monitoring::{
@@ -34,7 +33,6 @@ impl<
+ 'static + 'static
+ TenantManager + TenantManager
+ K8sclient + K8sclient
+ MultiTargetTopology
+ std::fmt::Debug + std::fmt::Debug
+ PrometheusApplicationMonitoring<CRDPrometheus>, + PrometheusApplicationMonitoring<CRDPrometheus>,
> ApplicationFeature<T> for Monitoring > ApplicationFeature<T> for Monitoring
@@ -57,11 +55,12 @@ impl<
}; };
let ntfy = NtfyScore { let ntfy = NtfyScore {
namespace: namespace.clone(), namespace: namespace.clone(),
host: "ntfy.harmonydemo.apps.ncd0.harmony.mcd".to_string(), host: "localhost".to_string(),
}; };
ntfy.interpret(&Inventory::empty(), topology) ntfy.create_interpret()
.execute(&Inventory::empty(), topology)
.await .await
.map_err(|e| e.to_string())?; .expect("couldn't create interpret for ntfy");
let ntfy_default_auth_username = "harmony"; let ntfy_default_auth_username = "harmony";
let ntfy_default_auth_password = "harmony"; let ntfy_default_auth_password = "harmony";
@@ -96,9 +95,10 @@ impl<
alerting_score.receivers.push(Box::new(ntfy_receiver)); alerting_score.receivers.push(Box::new(ntfy_receiver));
alerting_score alerting_score
.interpret(&Inventory::empty(), topology) .create_interpret()
.execute(&Inventory::empty(), topology)
.await .await
.map_err(|e| e.to_string())?; .unwrap();
Ok(()) Ok(())
} }
fn name(&self) -> String { fn name(&self) -> String {

View File

@@ -14,19 +14,11 @@ use serde::Serialize;
use crate::{ use crate::{
data::{Id, Version}, data::{Id, Version},
instrumentation::{self, HarmonyEvent},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
topology::Topology, topology::Topology,
}; };
#[derive(Clone, Debug)]
pub enum ApplicationFeatureStatus {
Installing,
Installed,
Failed { details: String },
}
pub trait Application: std::fmt::Debug + Send + Sync { pub trait Application: std::fmt::Debug + Send + Sync {
fn name(&self) -> String; fn name(&self) -> String;
} }
@@ -55,41 +47,20 @@ impl<A: Application, T: Topology + std::fmt::Debug> Interpret<T> for Application
.join(", ") .join(", ")
); );
for feature in self.features.iter() { for feature in self.features.iter() {
instrumentation::instrument(HarmonyEvent::ApplicationFeatureStateChanged { debug!(
topology: topology.name().into(), "Installing feature {} for application {app_name}",
application: self.application.name(), feature.name()
feature: feature.name(), );
status: ApplicationFeatureStatus::Installing,
})
.unwrap();
let _ = match feature.ensure_installed(topology).await { let _ = match feature.ensure_installed(topology).await {
Ok(()) => { Ok(()) => (),
instrumentation::instrument(HarmonyEvent::ApplicationFeatureStateChanged {
topology: topology.name().into(),
application: self.application.name(),
feature: feature.name(),
status: ApplicationFeatureStatus::Installed,
})
.unwrap();
}
Err(msg) => { Err(msg) => {
instrumentation::instrument(HarmonyEvent::ApplicationFeatureStateChanged {
topology: topology.name().into(),
application: self.application.name(),
feature: feature.name(),
status: ApplicationFeatureStatus::Failed {
details: msg.clone(),
},
})
.unwrap();
return Err(InterpretError::new(format!( return Err(InterpretError::new(format!(
"Application Interpret failed to install feature : {msg}" "Application Interpret failed to install feature : {msg}"
))); )));
} }
}; };
} }
Ok(Outcome::success("Application created".to_string())) Ok(Outcome::success("successfully created app".to_string()))
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
@@ -110,7 +81,7 @@ impl<A: Application, T: Topology + std::fmt::Debug> Interpret<T> for Application
} }
impl Serialize for dyn Application { impl Serialize for dyn Application {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error> fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where where
S: serde::Serializer, S: serde::Serializer,
{ {

View File

@@ -1,5 +1,5 @@
use std::fs; use std::fs;
use std::path::{Path, PathBuf}; use std::path::PathBuf;
use std::process; use std::process;
use std::sync::Arc; use std::sync::Arc;
@@ -10,7 +10,7 @@ use dockerfile_builder::Dockerfile;
use dockerfile_builder::instruction::{CMD, COPY, ENV, EXPOSE, FROM, RUN, USER, WORKDIR}; use dockerfile_builder::instruction::{CMD, COPY, ENV, EXPOSE, FROM, RUN, USER, WORKDIR};
use dockerfile_builder::instruction_builder::CopyBuilder; use dockerfile_builder::instruction_builder::CopyBuilder;
use futures_util::StreamExt; use futures_util::StreamExt;
use log::{debug, info, log_enabled}; use log::{debug, error, log_enabled};
use serde::Serialize; use serde::Serialize;
use tar::Archive; use tar::Archive;
@@ -46,7 +46,7 @@ where
} }
fn name(&self) -> String { fn name(&self) -> String {
format!("{} [ApplicationScore]", self.application.name()) format!("Application: {}", self.application.name())
} }
} }
@@ -73,19 +73,19 @@ impl Application for RustWebapp {
#[async_trait] #[async_trait]
impl HelmPackage for RustWebapp { impl HelmPackage for RustWebapp {
async fn build_push_helm_package(&self, image_url: &str) -> Result<String, String> { async fn build_push_helm_package(&self, image_url: &str) -> Result<String, String> {
info!("Starting Helm chart build and push for '{}'", self.name); debug!("Starting Helm chart build and push for '{}'", self.name);
// 1. Create the Helm chart files on disk. // 1. Create the Helm chart files on disk.
let chart_dir = self let chart_dir = self
.create_helm_chart_files(image_url) .create_helm_chart_files(image_url)
.map_err(|e| format!("Failed to create Helm chart files: {}", e))?; .map_err(|e| format!("Failed to create Helm chart files: {}", e))?;
info!("Successfully created Helm chart files in {:?}", chart_dir); debug!("Successfully created Helm chart files in {:?}", chart_dir);
// 2. Package the chart into a .tgz archive. // 2. Package the chart into a .tgz archive.
let packaged_chart_path = self let packaged_chart_path = self
.package_helm_chart(&chart_dir) .package_helm_chart(&chart_dir)
.map_err(|e| format!("Failed to package Helm chart: {}", e))?; .map_err(|e| format!("Failed to package Helm chart: {}", e))?;
info!( debug!(
"Successfully packaged Helm chart: {}", "Successfully packaged Helm chart: {}",
packaged_chart_path.to_string_lossy() packaged_chart_path.to_string_lossy()
); );
@@ -94,7 +94,7 @@ impl HelmPackage for RustWebapp {
let oci_chart_url = self let oci_chart_url = self
.push_helm_chart(&packaged_chart_path) .push_helm_chart(&packaged_chart_path)
.map_err(|e| format!("Failed to push Helm chart: {}", e))?; .map_err(|e| format!("Failed to push Helm chart: {}", e))?;
info!("Successfully pushed Helm chart to: {}", oci_chart_url); debug!("Successfully pushed Helm chart to: {}", oci_chart_url);
Ok(oci_chart_url) Ok(oci_chart_url)
} }
@@ -107,20 +107,20 @@ impl OCICompliant for RustWebapp {
async fn build_push_oci_image(&self) -> Result<String, String> { async fn build_push_oci_image(&self) -> Result<String, String> {
// This function orchestrates the build and push process. // This function orchestrates the build and push process.
// It's async to match the trait definition, though the underlying docker commands are blocking. // It's async to match the trait definition, though the underlying docker commands are blocking.
info!("Starting OCI image build and push for '{}'", self.name); debug!("Starting OCI image build and push for '{}'", self.name);
// 1. Build the image by calling the synchronous helper function. // 1. Build the image by calling the synchronous helper function.
let image_tag = self.image_name(); let image_tag = self.image_name();
self.build_docker_image(&image_tag) self.build_docker_image(&image_tag)
.await .await
.map_err(|e| format!("Failed to build Docker image: {}", e))?; .map_err(|e| format!("Failed to build Docker image: {}", e))?;
info!("Successfully built Docker image: {}", image_tag); debug!("Successfully built Docker image: {}", image_tag);
// 2. Push the image to the registry. // 2. Push the image to the registry.
self.push_docker_image(&image_tag) self.push_docker_image(&image_tag)
.await .await
.map_err(|e| format!("Failed to push Docker image: {}", e))?; .map_err(|e| format!("Failed to push Docker image: {}", e))?;
info!("Successfully pushed Docker image to: {}", image_tag); debug!("Successfully pushed Docker image to: {}", image_tag);
Ok(image_tag) Ok(image_tag)
} }
@@ -174,7 +174,7 @@ impl RustWebapp {
.platform("linux/x86_64"); .platform("linux/x86_64");
let mut temp_tar_builder = tar::Builder::new(Vec::new()); let mut temp_tar_builder = tar::Builder::new(Vec::new());
temp_tar_builder let _ = temp_tar_builder
.append_dir_all("", self.project_root.clone()) .append_dir_all("", self.project_root.clone())
.unwrap(); .unwrap();
let archive = temp_tar_builder let archive = temp_tar_builder
@@ -195,7 +195,7 @@ impl RustWebapp {
); );
while let Some(msg) = image_build_stream.next().await { while let Some(msg) = image_build_stream.next().await {
debug!("Message: {msg:?}"); println!("Message: {msg:?}");
} }
Ok(image_name.to_string()) Ok(image_name.to_string())
@@ -219,7 +219,7 @@ impl RustWebapp {
); );
while let Some(msg) = push_image_stream.next().await { while let Some(msg) = push_image_stream.next().await {
debug!("Message: {msg:?}"); println!("Message: {msg:?}");
} }
Ok(image_tag.to_string()) Ok(image_tag.to_string())
@@ -288,8 +288,9 @@ impl RustWebapp {
.unwrap(), .unwrap(),
); );
// Copy the compiled binary from the builder stage. // Copy the compiled binary from the builder stage.
// TODO: Should not be using score name here, instead should use name from Cargo.toml error!(
// https://git.nationtech.io/NationTech/harmony/issues/105 "FIXME Should not be using score name here, instead should use name from Cargo.toml"
);
let binary_path_in_builder = format!("/app/target/release/{}", self.name); let binary_path_in_builder = format!("/app/target/release/{}", self.name);
let binary_path_in_final = format!("/home/appuser/{}", self.name); let binary_path_in_final = format!("/home/appuser/{}", self.name);
dockerfile.push( dockerfile.push(
@@ -327,8 +328,9 @@ impl RustWebapp {
)); ));
// Copy only the compiled binary from the builder stage. // Copy only the compiled binary from the builder stage.
// TODO: Should not be using score name here, instead should use name from Cargo.toml error!(
// https://git.nationtech.io/NationTech/harmony/issues/105 "FIXME Should not be using score name here, instead should use name from Cargo.toml"
);
let binary_path_in_builder = format!("/app/target/release/{}", self.name); let binary_path_in_builder = format!("/app/target/release/{}", self.name);
let binary_path_in_final = format!("/usr/local/bin/{}", self.name); let binary_path_in_final = format!("/usr/local/bin/{}", self.name);
dockerfile.push( dockerfile.push(
@@ -528,7 +530,10 @@ spec:
} }
/// Packages a Helm chart directory into a .tgz file. /// Packages a Helm chart directory into a .tgz file.
fn package_helm_chart(&self, chart_dir: &Path) -> Result<PathBuf, Box<dyn std::error::Error>> { fn package_helm_chart(
&self,
chart_dir: &PathBuf,
) -> Result<PathBuf, Box<dyn std::error::Error>> {
let chart_dirname = chart_dir.file_name().expect("Should find a chart dirname"); let chart_dirname = chart_dir.file_name().expect("Should find a chart dirname");
debug!( debug!(
"Launching `helm package {}` cli with CWD {}", "Launching `helm package {}` cli with CWD {}",
@@ -541,13 +546,14 @@ spec:
); );
let output = process::Command::new("helm") let output = process::Command::new("helm")
.args(["package", chart_dirname.to_str().unwrap()]) .args(["package", chart_dirname.to_str().unwrap()])
.current_dir(self.project_root.join(".harmony_generated").join("helm")) // Run package from the parent dir .current_dir(&self.project_root.join(".harmony_generated").join("helm")) // Run package from the parent dir
.output()?; .output()?;
self.check_output(&output, "Failed to package Helm chart")?; self.check_output(&output, "Failed to package Helm chart")?;
// Helm prints the path of the created chart to stdout. // Helm prints the path of the created chart to stdout.
let tgz_name = String::from_utf8(output.stdout)? let tgz_name = String::from_utf8(output.stdout)?
.trim()
.split_whitespace() .split_whitespace()
.last() .last()
.unwrap_or_default() .unwrap_or_default()
@@ -567,7 +573,7 @@ spec:
/// Pushes a packaged Helm chart to an OCI registry. /// Pushes a packaged Helm chart to an OCI registry.
fn push_helm_chart( fn push_helm_chart(
&self, &self,
packaged_chart_path: &Path, packaged_chart_path: &PathBuf,
) -> Result<String, Box<dyn std::error::Error>> { ) -> Result<String, Box<dyn std::error::Error>> {
// The chart name is the file stem of the .tgz file // The chart name is the file stem of the .tgz file
let chart_file_name = packaged_chart_path.file_stem().unwrap().to_str().unwrap(); let chart_file_name = packaged_chart_path.file_stem().unwrap().to_str().unwrap();

View File

@@ -41,6 +41,6 @@ impl<T: Topology + HelmCommand> Score<T> for CertManagerHelmScore {
} }
fn name(&self) -> String { fn name(&self) -> String {
"CertManagerHelmScore".to_string() format!("CertManagerHelmScore")
} }
} }

View File

@@ -111,7 +111,7 @@ impl DhcpInterpret {
let boot_filename_outcome = match &self.score.boot_filename { let boot_filename_outcome = match &self.score.boot_filename {
Some(boot_filename) => { Some(boot_filename) => {
dhcp_server.set_boot_filename(boot_filename).await?; dhcp_server.set_boot_filename(&boot_filename).await?;
Outcome::new( Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set boot filename to {boot_filename}"), format!("Dhcp Interpret Set boot filename to {boot_filename}"),
@@ -122,7 +122,7 @@ impl DhcpInterpret {
let filename_outcome = match &self.score.filename { let filename_outcome = match &self.score.filename {
Some(filename) => { Some(filename) => {
dhcp_server.set_filename(filename).await?; dhcp_server.set_filename(&filename).await?;
Outcome::new( Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set filename to {filename}"), format!("Dhcp Interpret Set filename to {filename}"),
@@ -133,7 +133,7 @@ impl DhcpInterpret {
let filename64_outcome = match &self.score.filename64 { let filename64_outcome = match &self.score.filename64 {
Some(filename64) => { Some(filename64) => {
dhcp_server.set_filename64(filename64).await?; dhcp_server.set_filename64(&filename64).await?;
Outcome::new( Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set filename64 to {filename64}"), format!("Dhcp Interpret Set filename64 to {filename64}"),
@@ -144,7 +144,7 @@ impl DhcpInterpret {
let filenameipxe_outcome = match &self.score.filenameipxe { let filenameipxe_outcome = match &self.score.filenameipxe {
Some(filenameipxe) => { Some(filenameipxe) => {
dhcp_server.set_filenameipxe(filenameipxe).await?; dhcp_server.set_filenameipxe(&filenameipxe).await?;
Outcome::new( Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set filenameipxe to {filenameipxe}"), format!("Dhcp Interpret Set filenameipxe to {filenameipxe}"),
@@ -209,7 +209,7 @@ impl<T: DhcpServer> Interpret<T> for DhcpInterpret {
Ok(Outcome::new( Ok(Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
"Dhcp Interpret execution successful".to_string(), format!("Dhcp Interpret execution successful"),
)) ))
} }
} }

View File

@@ -112,7 +112,7 @@ impl<T: Topology + DnsServer> Interpret<T> for DnsInterpret {
Ok(Outcome::new( Ok(Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
"Dns Interpret execution successful".to_string(), format!("Dns Interpret execution successful"),
)) ))
} }
} }

View File

@@ -55,7 +55,7 @@ impl<T: Topology + HelmCommand> Score<T> for HelmChartScore {
} }
fn name(&self) -> String { fn name(&self) -> String {
format!("{} [HelmChartScore]", self.release_name) format!("{} {} HelmChartScore", self.release_name, self.chart_name)
} }
} }
@@ -90,10 +90,14 @@ impl HelmChartInterpret {
); );
match add_output.status.success() { match add_output.status.success() {
true => Ok(()), true => {
false => Err(InterpretError::new(format!( return Ok(());
}
false => {
return Err(InterpretError::new(format!(
"Failed to add helm repository!\n{full_output}" "Failed to add helm repository!\n{full_output}"
))), )));
}
} }
} }
} }
@@ -208,7 +212,7 @@ impl<T: Topology + HelmCommand> Interpret<T> for HelmChartInterpret {
} }
let res = helm_executor.install_or_upgrade( let res = helm_executor.install_or_upgrade(
ns, &ns,
&self.score.release_name, &self.score.release_name,
&self.score.chart_name, &self.score.chart_name,
self.score.chart_version.as_ref(), self.score.chart_version.as_ref(),
@@ -225,27 +229,24 @@ impl<T: Topology + HelmCommand> Interpret<T> for HelmChartInterpret {
match status { match status {
helm_wrapper_rs::HelmDeployStatus::Deployed => Ok(Outcome::new( helm_wrapper_rs::HelmDeployStatus::Deployed => Ok(Outcome::new(
InterpretStatus::SUCCESS, InterpretStatus::SUCCESS,
format!("Helm Chart {} deployed", self.score.release_name), "Helm Chart deployed".to_string(),
)), )),
helm_wrapper_rs::HelmDeployStatus::PendingInstall => Ok(Outcome::new( helm_wrapper_rs::HelmDeployStatus::PendingInstall => Ok(Outcome::new(
InterpretStatus::RUNNING, InterpretStatus::RUNNING,
format!("Helm Chart {} pending install...", self.score.release_name), "Helm Chart Pending install".to_string(),
)), )),
helm_wrapper_rs::HelmDeployStatus::PendingUpgrade => Ok(Outcome::new( helm_wrapper_rs::HelmDeployStatus::PendingUpgrade => Ok(Outcome::new(
InterpretStatus::RUNNING, InterpretStatus::RUNNING,
format!("Helm Chart {} pending upgrade...", self.score.release_name), "Helm Chart pending upgrade".to_string(),
)),
helm_wrapper_rs::HelmDeployStatus::Failed => Err(InterpretError::new(
"Failed to install helm chart".to_string(),
)), )),
helm_wrapper_rs::HelmDeployStatus::Failed => Err(InterpretError::new(format!(
"Helm Chart {} installation failed",
self.score.release_name
))),
} }
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::HelmChart todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
todo!() todo!()
} }

View File

@@ -77,11 +77,14 @@ impl HelmCommandExecutor {
)?; )?;
} }
let out = self.clone().run_command( let out = match self.clone().run_command(
self.chart self.chart
.clone() .clone()
.helm_args(self.globals.chart_home.clone().unwrap()), .helm_args(self.globals.chart_home.clone().unwrap()),
)?; ) {
Ok(out) => out,
Err(e) => return Err(e),
};
// TODO: don't use unwrap here // TODO: don't use unwrap here
let s = String::from_utf8(out.stdout).unwrap(); let s = String::from_utf8(out.stdout).unwrap();
@@ -95,11 +98,14 @@ impl HelmCommandExecutor {
} }
pub fn version(self) -> Result<String, std::io::Error> { pub fn version(self) -> Result<String, std::io::Error> {
let out = self.run_command(vec![ let out = match self.run_command(vec![
"version".to_string(), "version".to_string(),
"-c".to_string(), "-c".to_string(),
"--short".to_string(), "--short".to_string(),
])?; ]) {
Ok(out) => out,
Err(e) => return Err(e),
};
// TODO: don't use unwrap // TODO: don't use unwrap
Ok(String::from_utf8(out.stdout).unwrap()) Ok(String::from_utf8(out.stdout).unwrap())
@@ -123,11 +129,15 @@ impl HelmCommandExecutor {
None => PathBuf::from(TempDir::new()?.path()), None => PathBuf::from(TempDir::new()?.path()),
}; };
if let Some(yaml_str) = self.chart.values_inline { match self.chart.values_inline {
let tf: TempFile = temp_file::with_contents(yaml_str.as_bytes()); Some(yaml_str) => {
let tf: TempFile;
tf = temp_file::with_contents(yaml_str.as_bytes());
self.chart self.chart
.additional_values_files .additional_values_files
.push(PathBuf::from(tf.path())); .push(PathBuf::from(tf.path()));
}
None => (),
}; };
self.env.insert( self.env.insert(
@@ -170,9 +180,9 @@ impl HelmChart {
match self.repo { match self.repo {
Some(r) => { Some(r) => {
if r.starts_with("oci://") { if r.starts_with("oci://") {
args.push( args.push(String::from(
r.trim_end_matches("/").to_string() + "/" + self.name.clone().as_str(), r.trim_end_matches("/").to_string() + "/" + self.name.clone().as_str(),
); ));
} else { } else {
args.push("--repo".to_string()); args.push("--repo".to_string());
args.push(r.to_string()); args.push(r.to_string());
@@ -183,10 +193,13 @@ impl HelmChart {
None => args.push(self.name), None => args.push(self.name),
}; };
if let Some(v) = self.version { match self.version {
Some(v) => {
args.push("--version".to_string()); args.push("--version".to_string());
args.push(v.to_string()); args.push(v.to_string());
} }
None => (),
}
args args
} }
@@ -349,7 +362,7 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for HelmChartInterpretV
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::HelmCommand todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
todo!() todo!()

View File

@@ -1,12 +1,13 @@
use std::path::PathBuf; use std::path::PathBuf;
use async_trait::async_trait; use async_trait::async_trait;
use log::debug; use log::{debug, info};
use serde::Serialize; use serde::Serialize;
use crate::{ use crate::{
config::HARMONY_DATA_DIR, config::HARMONY_DATA_DIR,
data::{Id, Version}, data::{Id, Version},
instrumentation::{self, HarmonyEvent},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
score::Score, score::Score,
@@ -29,14 +30,14 @@ impl Default for K3DInstallationScore {
} }
impl<T: Topology> Score<T> for K3DInstallationScore { impl<T: Topology> Score<T> for K3DInstallationScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
Box::new(K3dInstallationInterpret { Box::new(K3dInstallationInterpret {
score: self.clone(), score: self.clone(),
}) })
} }
fn name(&self) -> String { fn name(&self) -> String {
"K3dInstallationScore".into() todo!()
} }
} }
@@ -50,14 +51,20 @@ impl<T: Topology> Interpret<T> for K3dInstallationInterpret {
async fn execute( async fn execute(
&self, &self,
_inventory: &Inventory, _inventory: &Inventory,
_topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
instrumentation::instrument(HarmonyEvent::InterpretExecutionStarted {
topology: topology.name().into(),
interpret: "k3d-installation".into(),
message: "installing k3d...".into(),
})
.unwrap();
let k3d = k3d_rs::K3d::new( let k3d = k3d_rs::K3d::new(
self.score.installation_path.clone(), self.score.installation_path.clone(),
Some(self.score.cluster_name.clone()), Some(self.score.cluster_name.clone()),
); );
let outcome = match k3d.ensure_installed().await {
match k3d.ensure_installed().await {
Ok(_client) => { Ok(_client) => {
let msg = format!("k3d cluster '{}' installed ", self.score.cluster_name); let msg = format!("k3d cluster '{}' installed ", self.score.cluster_name);
debug!("{msg}"); debug!("{msg}");
@@ -66,7 +73,16 @@ impl<T: Topology> Interpret<T> for K3dInstallationInterpret {
Err(msg) => Err(InterpretError::new(format!( Err(msg) => Err(InterpretError::new(format!(
"failed to ensure k3d is installed : {msg}" "failed to ensure k3d is installed : {msg}"
))), ))),
} };
instrumentation::instrument(HarmonyEvent::InterpretExecutionFinished {
topology: topology.name().into(),
interpret: "k3d-installation".into(),
outcome: outcome.clone(),
})
.unwrap();
outcome
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::K3dInstallation InterpretName::K3dInstallation

View File

@@ -89,7 +89,7 @@ where
)) ))
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::K8sResource todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
todo!() todo!()

View File

@@ -128,12 +128,13 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for LAMPInterpret {
info!("Deploying score {deployment_score:#?}"); info!("Deploying score {deployment_score:#?}");
deployment_score.interpret(inventory, topology).await?; deployment_score
.create_interpret()
.execute(inventory, topology)
.await?;
info!("LAMP deployment_score {deployment_score:?}"); info!("LAMP deployment_score {deployment_score:?}");
let ingress_path = ingress_path!("/");
let lamp_ingress = K8sIngressScore { let lamp_ingress = K8sIngressScore {
name: fqdn!("lamp-ingress"), name: fqdn!("lamp-ingress"),
host: fqdn!("test"), host: fqdn!("test"),
@@ -143,14 +144,17 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for LAMPInterpret {
.as_str() .as_str()
), ),
port: 8080, port: 8080,
path: Some(ingress_path), path: Some(ingress_path!("/")),
path_type: None, path_type: None,
namespace: self namespace: self
.get_namespace() .get_namespace()
.map(|nbs| fqdn!(nbs.to_string().as_str())), .map(|nbs| fqdn!(nbs.to_string().as_str())),
}; };
lamp_ingress.interpret(inventory, topology).await?; lamp_ingress
.create_interpret()
.execute(inventory, topology)
.await?;
info!("LAMP lamp_ingress {lamp_ingress:?}"); info!("LAMP lamp_ingress {lamp_ingress:?}");
@@ -160,7 +164,7 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for LAMPInterpret {
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::Lamp todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
@@ -209,7 +213,7 @@ impl LAMPInterpret {
repository: None, repository: None,
}; };
score.interpret(inventory, topology).await score.create_interpret().execute(inventory, topology).await
} }
fn build_dockerfile(&self, score: &LAMPScore) -> Result<PathBuf, Box<dyn std::error::Error>> { fn build_dockerfile(&self, score: &LAMPScore) -> Result<PathBuf, Box<dyn std::error::Error>> {
let mut dockerfile = Dockerfile::new(); let mut dockerfile = Dockerfile::new();

View File

@@ -14,6 +14,5 @@ pub mod monitoring;
pub mod okd; pub mod okd;
pub mod opnsense; pub mod opnsense;
pub mod prometheus; pub mod prometheus;
pub mod storage;
pub mod tenant; pub mod tenant;
pub mod tftp; pub mod tftp;

View File

@@ -18,7 +18,7 @@ use crate::{
#[async_trait] #[async_trait]
impl AlertRule<KubePrometheus> for AlertManagerRuleGroup { impl AlertRule<KubePrometheus> for AlertManagerRuleGroup {
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> { async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_rule(self).await sender.install_rule(&self).await
} }
fn clone_box(&self) -> Box<dyn AlertRule<KubePrometheus>> { fn clone_box(&self) -> Box<dyn AlertRule<KubePrometheus>> {
Box::new(self.clone()) Box::new(self.clone())
@@ -28,7 +28,7 @@ impl AlertRule<KubePrometheus> for AlertManagerRuleGroup {
#[async_trait] #[async_trait]
impl AlertRule<Prometheus> for AlertManagerRuleGroup { impl AlertRule<Prometheus> for AlertManagerRuleGroup {
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> { async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_rule(self).await sender.install_rule(&self).await
} }
fn clone_box(&self) -> Box<dyn AlertRule<Prometheus>> { fn clone_box(&self) -> Box<dyn AlertRule<Prometheus>> {
Box::new(self.clone()) Box::new(self.clone())

View File

@@ -13,7 +13,7 @@ use crate::{
prometheus::prometheus::PrometheusApplicationMonitoring, prometheus::prometheus::PrometheusApplicationMonitoring,
}, },
score::Score, score::Score,
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver}, topology::{Topology, oberservability::monitoring::AlertReceiver},
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@@ -33,10 +33,7 @@ impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
} }
fn name(&self) -> String { fn name(&self) -> String {
format!( "ApplicationMonitoringScore".to_string()
"{} monitoring [ApplicationMonitoringScore]",
self.application.name()
)
} }
} }
@@ -54,27 +51,17 @@ impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T>
inventory: &Inventory, inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let result = topology topology
.install_prometheus( .install_prometheus(
&self.score.sender, &self.score.sender,
inventory, inventory,
Some(self.score.receivers.clone()), Some(self.score.receivers.clone()),
) )
.await; .await
match result {
Ok(outcome) => match outcome {
PreparationOutcome::Success { details: _ } => {
Ok(Outcome::success("Prometheus installed".into()))
}
PreparationOutcome::Noop => Ok(Outcome::noop()),
},
Err(err) => Err(InterpretError::from(err)),
}
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::ApplicationMonitoring todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {

View File

@@ -4,14 +4,15 @@ use std::str::FromStr;
use crate::modules::helm::chart::HelmChartScore; use crate::modules::helm::chart::HelmChartScore;
pub fn grafana_helm_chart_score(ns: &str) -> HelmChartScore { pub fn grafana_helm_chart_score(ns: &str) -> HelmChartScore {
let values = r#" let values = format!(
r#"
rbac: rbac:
namespaced: true namespaced: true
sidecar: sidecar:
dashboards: dashboards:
enabled: true enabled: true
"# "#
.to_string(); );
HelmChartScore { HelmChartScore {
namespace: Some(NonBlankString::from_str(ns).unwrap()), namespace: Some(NonBlankString::from_str(ns).unwrap()),

View File

@@ -1,6 +1,7 @@
use kube::CustomResource; use kube::CustomResource;
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use super::crd_prometheuses::LabelSelector; use super::crd_prometheuses::LabelSelector;

View File

@@ -1,8 +1,13 @@
use crate::modules::prometheus::alerts::k8s::{ use std::collections::BTreeMap;
use crate::modules::{
monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule,
prometheus::alerts::k8s::{
deployment::alert_deployment_unavailable, deployment::alert_deployment_unavailable,
pod::{alert_container_restarting, alert_pod_not_ready, pod_failed}, pod::{alert_container_restarting, alert_pod_not_ready, pod_failed},
pvc::high_pvc_fill_rate_over_two_days, pvc::high_pvc_fill_rate_over_two_days,
service::alert_service_down, service::alert_service_down,
},
}; };
use super::crd_prometheus_rules::Rule; use super::crd_prometheus_rules::Rule;

View File

@@ -6,6 +6,8 @@ use serde::{Deserialize, Serialize};
use crate::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule; use crate::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule;
use super::crd_default_rules::build_default_application_rules;
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)] #[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube( #[kube(
group = "monitoring.coreos.com", group = "monitoring.coreos.com",

View File

@@ -1,9 +1,11 @@
use std::collections::HashMap; use std::collections::{BTreeMap, HashMap};
use kube::CustomResource; use kube::{CustomResource, Resource, api::ObjectMeta};
use schemars::JsonSchema; use schemars::JsonSchema;
use serde::{Deserialize, Serialize}; use serde::{Deserialize, Serialize};
use crate::interpret::InterpretError;
use crate::modules::monitoring::kube_prometheus::types::{ use crate::modules::monitoring::kube_prometheus::types::{
HTTPScheme, MatchExpression, NamespaceSelector, Operator, Selector, HTTPScheme, MatchExpression, NamespaceSelector, Operator, Selector,
ServiceMonitor as KubeServiceMonitor, ServiceMonitorEndpoint, ServiceMonitor as KubeServiceMonitor, ServiceMonitorEndpoint,
@@ -48,7 +50,7 @@ pub struct ServiceMonitorSpec {
impl Default for ServiceMonitorSpec { impl Default for ServiceMonitorSpec {
fn default() -> Self { fn default() -> Self {
let labels = HashMap::new(); let mut labels = HashMap::new();
Self { Self {
selector: Selector { selector: Selector {
match_labels: { labels }, match_labels: { labels },

View File

@@ -27,12 +27,6 @@ pub struct KubePrometheusConfig {
pub alert_rules: Vec<AlertManagerAdditionalPromRules>, pub alert_rules: Vec<AlertManagerAdditionalPromRules>,
pub additional_service_monitors: Vec<ServiceMonitor>, pub additional_service_monitors: Vec<ServiceMonitor>,
} }
impl Default for KubePrometheusConfig {
fn default() -> Self {
Self::new()
}
}
impl KubePrometheusConfig { impl KubePrometheusConfig {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {

View File

@@ -35,7 +35,7 @@ pub fn kube_prometheus_helm_chart_score(
let kube_proxy = config.kube_proxy.to_string(); let kube_proxy = config.kube_proxy.to_string();
let kube_state_metrics = config.kube_state_metrics.to_string(); let kube_state_metrics = config.kube_state_metrics.to_string();
let node_exporter = config.node_exporter.to_string(); let node_exporter = config.node_exporter.to_string();
let _prometheus_operator = config.prometheus_operator.to_string(); let prometheus_operator = config.prometheus_operator.to_string();
let prometheus = config.prometheus.to_string(); let prometheus = config.prometheus.to_string();
let resource_limit = Resources { let resource_limit = Resources {
limits: Limits { limits: Limits {
@@ -64,7 +64,7 @@ pub fn kube_prometheus_helm_chart_score(
indent_lines(&yaml, indent_level + 2) indent_lines(&yaml, indent_level + 2)
) )
} }
let _resource_section = resource_block(&resource_limit, 2); let resource_section = resource_block(&resource_limit, 2);
let mut values = format!( let mut values = format!(
r#" r#"

View File

@@ -55,12 +55,6 @@ pub struct KubePrometheus {
pub config: Arc<Mutex<KubePrometheusConfig>>, pub config: Arc<Mutex<KubePrometheusConfig>>,
} }
impl Default for KubePrometheus {
fn default() -> Self {
Self::new()
}
}
impl KubePrometheus { impl KubePrometheus {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@@ -119,7 +113,8 @@ impl KubePrometheus {
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
kube_prometheus_helm_chart_score(self.config.clone()) kube_prometheus_helm_chart_score(self.config.clone())
.interpret(inventory, topology) .create_interpret()
.execute(inventory, topology)
.await .await
} }
} }

View File

@@ -1,25 +1,9 @@
use non_blank_string_rs::NonBlankString; use non_blank_string_rs::NonBlankString;
use std::str::FromStr; use std::str::FromStr;
use crate::{modules::helm::chart::HelmChartScore, topology::DeploymentTarget}; use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
pub fn ntfy_helm_chart_score(
namespace: String,
host: String,
target: DeploymentTarget,
) -> HelmChartScore {
// TODO not actually the correct logic, this should be fixed by using an ingresss which is the
// correct k8s standard.
//
// Another option is to delegate to the topology the ingress technology it wants to use Route,
// Ingress or other
let route_enabled = match target {
DeploymentTarget::LocalDev => false,
DeploymentTarget::Staging => true,
DeploymentTarget::Production => true,
};
let ingress_enabled = !route_enabled;
pub fn ntfy_helm_chart_score(namespace: String, host: String) -> HelmChartScore {
let values = format!( let values = format!(
r#" r#"
replicaCount: 1 replicaCount: 1
@@ -41,14 +25,23 @@ serviceAccount:
service: service:
type: ClusterIP type: ClusterIP
port: 8080 port: 80
ingress: ingress:
enabled: {ingress_enabled} enabled: true
# annotations:
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: {host}
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
route:
enabled: {route_enabled}
host: {host}
autoscaling: autoscaling:
enabled: false enabled: false
@@ -56,7 +49,7 @@ autoscaling:
config: config:
enabled: true enabled: true
data: data:
base-url: "https://{host}" # base-url: "https://ntfy.something.com"
auth-file: "/var/cache/ntfy/user.db" auth-file: "/var/cache/ntfy/user.db"
auth-default-access: "deny-all" auth-default-access: "deny-all"
cache-file: "/var/cache/ntfy/cache.db" cache-file: "/var/cache/ntfy/cache.db"
@@ -66,7 +59,6 @@ config:
enable-signup: false enable-signup: false
enable-login: "true" enable-login: "true"
enable-metrics: "true" enable-metrics: "true"
listen-http: ":8080"
persistence: persistence:
enabled: true enabled: true
@@ -77,12 +69,16 @@ persistence:
HelmChartScore { HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()), namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str("ntfy").unwrap(), release_name: NonBlankString::from_str("ntfy").unwrap(),
chart_name: NonBlankString::from_str("oci://hub.nationtech.io/harmony/ntfy").unwrap(), chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(),
chart_version: Some(NonBlankString::from_str("0.1.7-nationtech.1").unwrap()), chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()),
values_overrides: None, values_overrides: None,
values_yaml: Some(values.to_string()), values_yaml: Some(values.to_string()),
create_namespace: true, create_namespace: true,
install_only: false, install_only: false,
repository: None, repository: Some(HelmRepository::new(
"sarab97".to_string(),
url::Url::parse("https://charts.sarabsingh.com").unwrap(),
true,
)),
} }
} }

View File

@@ -1,3 +1,2 @@
pub mod helm; pub mod helm;
#[allow(clippy::module_inception)]
pub mod ntfy; pub mod ntfy;

View File

@@ -1,7 +1,7 @@
use std::sync::Arc; use std::sync::Arc;
use async_trait::async_trait; use async_trait::async_trait;
use log::info; use log::debug;
use serde::Serialize; use serde::Serialize;
use strum::{Display, EnumString}; use strum::{Display, EnumString};
@@ -11,7 +11,7 @@ use crate::{
inventory::Inventory, inventory::Inventory,
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score, modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
score::Score, score::Score,
topology::{HelmCommand, K8sclient, MultiTargetTopology, Topology, k8s::K8sClient}, topology::{HelmCommand, K8sclient, Topology, k8s::K8sClient},
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
@@ -20,7 +20,7 @@ pub struct NtfyScore {
pub host: String, pub host: String,
} }
impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Score<T> for NtfyScore { impl<T: Topology + HelmCommand + K8sclient> Score<T> for NtfyScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NtfyInterpret { Box::new(NtfyInterpret {
score: self.clone(), score: self.clone(),
@@ -28,7 +28,7 @@ impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Score<T> for N
} }
fn name(&self) -> String { fn name(&self) -> String {
"alert receiver [NtfyScore]".into() format!("Ntfy")
} }
} }
@@ -39,21 +39,31 @@ pub struct NtfyInterpret {
#[derive(Debug, EnumString, Display)] #[derive(Debug, EnumString, Display)]
enum NtfyAccessMode { enum NtfyAccessMode {
#[strum(serialize = "read-write", serialize = "rw")] #[strum(serialize = "read-write", serialize = "rw", to_string = "read-write")]
ReadWrite, ReadWrite,
#[strum(serialize = "read-only", serialize = "ro", serialize = "read")] #[strum(
serialize = "read-only",
serialize = "ro",
serialize = "read",
to_string = "read-only"
)]
ReadOnly, ReadOnly,
#[strum(serialize = "write-only", serialize = "wo", serialize = "write")] #[strum(
serialize = "write-only",
serialize = "wo",
serialize = "write",
to_string = "write-only"
)]
WriteOnly, WriteOnly,
#[strum(serialize = "deny", serialize = "none")] #[strum(serialize = "none", to_string = "deny")]
Deny, Deny,
} }
#[derive(Debug, EnumString, Display)] #[derive(Debug, EnumString, Display)]
enum NtfyRole { enum NtfyRole {
#[strum(serialize = "user")] #[strum(serialize = "user", to_string = "user")]
User, User,
#[strum(serialize = "admin")] #[strum(serialize = "admin", to_string = "admin")]
Admin, Admin,
} }
@@ -77,7 +87,7 @@ impl NtfyInterpret {
vec![ vec![
"sh", "sh",
"-c", "-c",
format!("NTFY_PASSWORD={password} ntfy user add --role={role} --ignore-exists {username}") format!("NTFY_PASSWORD={password} ntfy user add --role={role} {username}")
.as_str(), .as_str(),
], ],
) )
@@ -85,52 +95,69 @@ impl NtfyInterpret {
Ok(()) Ok(())
} }
async fn set_access(
&self,
k8s_client: Arc<K8sClient>,
username: &str,
topic: &str,
mode: NtfyAccessMode,
) -> Result<(), String> {
k8s_client
.exec_app(
"ntfy".to_string(),
Some(&self.score.namespace),
vec![
"sh",
"-c",
format!("ntfy access {username} {topic} {mode}").as_str(),
],
)
.await?;
Ok(())
}
} }
/// We need a ntfy interpret to wrap the HelmChartScore in order to run the score, and then bootstrap the config inside ntfy /// We need a ntfy interpret to wrap the HelmChartScore in order to run the score, and then bootstrap the config inside ntfy
#[async_trait] #[async_trait]
impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Interpret<T> for NtfyInterpret { impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for NtfyInterpret {
async fn execute( async fn execute(
&self, &self,
inventory: &Inventory, inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
ntfy_helm_chart_score( ntfy_helm_chart_score(self.score.namespace.clone(), self.score.host.clone())
self.score.namespace.clone(), .create_interpret()
self.score.host.clone(), .execute(inventory, topology)
topology.current_target(),
)
.interpret(inventory, topology)
.await?; .await?;
info!("installed ntfy helm chart"); debug!("installed ntfy helm chart");
let client = topology let client = topology
.k8s_client() .k8s_client()
.await .await
.expect("couldn't get k8s client"); .expect("couldn't get k8s client");
info!("deploying ntfy...");
client client
.wait_until_deployment_ready( .wait_until_deployment_ready(
"ntfy".to_string(), "ntfy".to_string(),
Some(self.score.namespace.as_str()), Some(&self.score.namespace.as_str()),
None, None,
) )
.await?; .await?;
info!("ntfy deployed"); debug!("created k8s client");
info!("adding user harmony");
self.add_user(client, "harmony", "harmony", Some(NtfyRole::Admin)) self.add_user(client, "harmony", "harmony", Some(NtfyRole::Admin))
.await?; .await?;
info!("user added");
Ok(Outcome::success("Ntfy installed".to_string())) debug!("exec into pod done");
Ok(Outcome::success("installed ntfy".to_string()))
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::Ntfy todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
todo!() todo!()
} }

View File

@@ -1,4 +1,3 @@
pub mod helm; pub mod helm;
#[allow(clippy::module_inception)]
pub mod prometheus; pub mod prometheus;
pub mod prometheus_config; pub mod prometheus_config;

View File

@@ -37,12 +37,6 @@ impl AlertSender for Prometheus {
} }
} }
impl Default for Prometheus {
fn default() -> Self {
Self::new()
}
}
impl Prometheus { impl Prometheus {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {
@@ -100,7 +94,8 @@ impl Prometheus {
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
prometheus_helm_chart_score(self.config.clone()) prometheus_helm_chart_score(self.config.clone())
.interpret(inventory, topology) .create_interpret()
.execute(inventory, topology)
.await .await
} }
pub async fn install_grafana<T: Topology + HelmCommand + Send + Sync>( pub async fn install_grafana<T: Topology + HelmCommand + Send + Sync>(
@@ -115,12 +110,13 @@ impl Prometheus {
if let Some(ns) = namespace.as_deref() { if let Some(ns) = namespace.as_deref() {
grafana_helm_chart_score(ns) grafana_helm_chart_score(ns)
.interpret(inventory, topology) .create_interpret()
.execute(inventory, topology)
.await .await
} else { } else {
Err(InterpretError::new( Err(InterpretError::new(format!(
"could not install grafana, missing namespace".to_string(), "could not install grafana, missing namespace",
)) )))
} }
} }
} }

View File

@@ -16,12 +16,6 @@ pub struct PrometheusConfig {
pub additional_service_monitors: Vec<ServiceMonitor>, pub additional_service_monitors: Vec<ServiceMonitor>,
} }
impl Default for PrometheusConfig {
fn default() -> Self {
Self::new()
}
}
impl PrometheusConfig { impl PrometheusConfig {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {

View File

@@ -32,7 +32,7 @@ impl OKDBootstrapDhcpScore {
logical_host: topology.bootstrap_host.clone(), logical_host: topology.bootstrap_host.clone(),
physical_host: inventory physical_host: inventory
.worker_host .worker_host
.first() .get(0)
.expect("Should have at least one worker to be used as bootstrap node") .expect("Should have at least one worker to be used as bootstrap node")
.clone(), .clone(),
}); });

View File

@@ -6,12 +6,6 @@ pub struct OKDUpgradeScore {
_target_version: Version, _target_version: Version,
} }
impl Default for OKDUpgradeScore {
fn default() -> Self {
Self::new()
}
}
impl OKDUpgradeScore { impl OKDUpgradeScore {
pub fn new() -> Self { pub fn new() -> Self {
Self { Self {

View File

@@ -61,7 +61,7 @@ impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> S
} }
fn name(&self) -> String { fn name(&self) -> String {
"prometheus alerting [CRDAlertingScore]".into() "CRDApplicationAlertingScore".into()
} }
} }
@@ -93,13 +93,13 @@ impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> I
self.install_rules(&self.prometheus_rules, &client).await?; self.install_rules(&self.prometheus_rules, &client).await?;
self.install_monitors(self.service_monitors.clone(), &client) self.install_monitors(self.service_monitors.clone(), &client)
.await?; .await?;
Ok(Outcome::success( Ok(Outcome::success(format!(
"K8s monitoring components installed".to_string(), "deployed application monitoring composants"
)) )))
} }
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
InterpretName::K8sPrometheusCrdAlerting todo!()
} }
fn get_version(&self) -> Version { fn get_version(&self) -> Version {
@@ -118,7 +118,7 @@ impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> I
impl K8sPrometheusCRDAlertingInterpret { impl K8sPrometheusCRDAlertingInterpret {
async fn crd_exists(&self, crd: &str) -> bool { async fn crd_exists(&self, crd: &str) -> bool {
let status = Command::new("sh") let status = Command::new("sh")
.args(["-c", &format!("kubectl get crd -A | grep -i {crd}")]) .args(["-c", "kubectl get crd -A | grep -i", crd])
.status() .status()
.map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e))) .map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))
.unwrap(); .unwrap();
@@ -166,8 +166,7 @@ impl K8sPrometheusCRDAlertingInterpret {
let install_output = Command::new("helm") let install_output = Command::new("helm")
.args([ .args([
"upgrade", "install",
"--install",
&chart_name, &chart_name,
tgz_path.to_str().unwrap(), tgz_path.to_str().unwrap(),
"--namespace", "--namespace",
@@ -416,7 +415,7 @@ impl K8sPrometheusCRDAlertingInterpret {
async fn install_rules( async fn install_rules(
&self, &self,
#[allow(clippy::ptr_arg)] rules: &Vec<RuleGroup>, rules: &Vec<RuleGroup>,
client: &Arc<K8sClient>, client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let mut prom_rule_spec = PrometheusRuleSpec { let mut prom_rule_spec = PrometheusRuleSpec {
@@ -424,7 +423,7 @@ impl K8sPrometheusCRDAlertingInterpret {
}; };
let default_rules_group = RuleGroup { let default_rules_group = RuleGroup {
name: "default-rules".to_string(), name: format!("default-rules"),
rules: build_default_application_rules(), rules: build_default_application_rules(),
}; };

View File

@@ -1,4 +1,3 @@
pub mod alerts; pub mod alerts;
pub mod k8s_prometheus_alerting_score; pub mod k8s_prometheus_alerting_score;
#[allow(clippy::module_inception)]
pub mod prometheus; pub mod prometheus;

View File

@@ -1,11 +1,9 @@
use async_trait::async_trait; use async_trait::async_trait;
use crate::{ use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory, inventory::Inventory,
topology::{ topology::oberservability::monitoring::{AlertReceiver, AlertSender},
PreparationError, PreparationOutcome,
oberservability::monitoring::{AlertReceiver, AlertSender},
},
}; };
#[async_trait] #[async_trait]
@@ -15,5 +13,5 @@ pub trait PrometheusApplicationMonitoring<S: AlertSender> {
sender: &S, sender: &S,
inventory: &Inventory, inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>, receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
) -> Result<PreparationOutcome, PreparationError>; ) -> Result<Outcome, InterpretError>;
} }

View File

@@ -1,419 +0,0 @@
use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
pub osd_deployment_name: String,
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
fn name(&self) -> String {
format!("CephRemoveOsdScore")
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(CephRemoveOsdInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct CephRemoveOsdInterpret {
score: CephRemoveOsd,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.verify_ceph_toolbox_exists(client.clone()).await?;
self.scale_deployment(client.clone()).await?;
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
.split('-')
.nth(3)
.ok_or_else(|| {
InterpretError::new(format!(
"Could not parse OSD id from deployment name {}",
self.score.osd_deployment_name
))
})?;
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
Ok(osd_id_full)
}
pub async fn verify_ceph_toolbox_exists(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let toolbox_dep = "rook-ceph-tools".to_string();
match client
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&toolbox_dep, ready_count
)));
} else {
return Err(InterpretError::new(
"ceph-tool-box not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {}",
&toolbox_dep
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&toolbox_dep, self.score.rook_ceph_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&toolbox_dep, e
))),
}
}
pub async fn scale_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
client
.scale_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
0,
)
.await?;
Ok(Outcome::success(format!(
"Scaled down deployment {}",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_scaled(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
}
}
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to scale down",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn build_timer(&self) -> (Duration, Duration, Instant) {
let timeout = Duration::from_secs(120);
let interval = Duration::from_secs(5);
let start = Instant::now();
(timeout, interval, start)
}
pub async fn delete_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
client
.delete_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
Ok(Outcome::success(format!(
"deployment {} deleted",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_deleted(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if dep.is_none() {
info!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
return Ok(Outcome::success(format!(
"Deployment {} deleted.",
self.score.osd_deployment_name
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to be deleted",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
let nodes = json.get("nodes").ok_or_else(|| {
InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
})?;
let tree: CephOsdTree = CephOsdTree {
nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
})?,
};
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
],
)
.await?;
Ok(Outcome::success(format!(
"osd id {} removed from osd tree",
osd_id_full
)))
}
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
loop {
let output = client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
)
.await?;
let tree =
self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json"));
let osd_found = tree
.unwrap()
.nodes
.iter()
.any(|node| node.name == osd_id_full);
if !osd_found {
return Ok(Outcome::success(format!(
"Successfully verified that OSD {} is removed from the Ceph cluster.",
osd_id_full,
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for OSD {} to be removed from Ceph tree",
osd_id_full
)));
}
warn!(
"OSD {} still found in Ceph tree, retrying in {:?}...",
osd_id_full, interval
);
sleep(interval).await;
}
}
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephOsdTree {
pub nodes: Vec<CephNode>,
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephNode {
pub id: i32,
pub name: String,
#[serde(rename = "type")]
pub node_type: String,
pub type_id: Option<i32>,
pub children: Option<Vec<i32>>,
pub exists: Option<i32>,
pub status: Option<String>,
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn test_get_osd_tree() {
let json_data = json!({
"nodes": [
{"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
{"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
]
});
let interpret = CephRemoveOsdInterpret {
score: CephRemoveOsd {
osd_deployment_name: "osd-1".to_string(),
rook_ceph_namespace: "dummy_ns".to_string(),
},
};
let json = interpret.get_osd_tree(json_data).unwrap();
let expected = CephOsdTree {
nodes: vec![
CephNode {
id: 1,
name: "osd.1".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
CephNode {
id: 2,
name: "osd.2".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
],
};
assert_eq!(json, expected);
}
}

View File

@@ -1 +0,0 @@
pub mod ceph_remove_osd_score;

View File

@@ -1 +0,0 @@
pub mod ceph;

View File

@@ -17,7 +17,7 @@ impl<T: Topology + TenantCredentialManager> Score<T> for TenantCredentialScore {
} }
fn name(&self) -> String { fn name(&self) -> String {
"TenantCredentialScore".into() todo!()
} }
} }

View File

@@ -28,7 +28,7 @@ impl<T: Topology + TenantManager> Score<T> for TenantScore {
} }
fn name(&self) -> String { fn name(&self) -> String {
format!("{} [TenantScore]", self.config.name) format!("{} TenantScore", self.config.name)
} }
} }
@@ -47,8 +47,8 @@ impl<T: Topology + TenantManager> Interpret<T> for TenantInterpret {
topology.provision_tenant(&self.tenant_config).await?; topology.provision_tenant(&self.tenant_config).await?;
Ok(Outcome::success(format!( Ok(Outcome::success(format!(
"Tenant provisioned with id '{}'", "Successfully provisioned tenant {} with id {}",
self.tenant_config.id self.tenant_config.name, self.tenant_config.id
))) )))
} }

View File

@@ -5,10 +5,6 @@ version.workspace = true
readme.workspace = true readme.workspace = true
license.workspace = true license.workspace = true
[features]
default = ["tui"]
tui = ["dep:harmony_tui"]
[dependencies] [dependencies]
assert_cmd = "2.0.17" assert_cmd = "2.0.17"
clap = { version = "4.5.35", features = ["derive"] } clap = { version = "4.5.35", features = ["derive"] }
@@ -23,5 +19,7 @@ lazy_static = "1.5.0"
log.workspace = true log.workspace = true
indicatif-log-bridge = "0.2.3" indicatif-log-bridge = "0.2.3"
[dev-dependencies]
harmony = { path = "../harmony", features = ["testing"] } [features]
default = ["tui"]
tui = ["dep:harmony_tui"]

View File

@@ -1,22 +1,16 @@
use harmony::{ use harmony::instrumentation::{self, HarmonyEvent};
instrumentation::{self, HarmonyEvent}, use indicatif::{MultiProgress, ProgressBar};
modules::application::ApplicationFeatureStatus,
topology::TopologyStatus,
};
use indicatif::MultiProgress;
use indicatif_log_bridge::LogWrapper; use indicatif_log_bridge::LogWrapper;
use log::error;
use std::{ use std::{
collections::{HashMap, hash_map},
sync::{Arc, Mutex}, sync::{Arc, Mutex},
thread,
time::Duration,
}; };
use crate::progress::{IndicatifProgressTracker, ProgressTracker}; use crate::progress;
pub fn init() -> tokio::task::JoinHandle<()> { pub fn init() -> tokio::task::JoinHandle<()> {
let base_progress = configure_logger(); configure_logger();
let handle = tokio::spawn(handle_events(base_progress)); let handle = tokio::spawn(handle_events());
loop { loop {
if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() { if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
@@ -27,175 +21,91 @@ pub fn init() -> tokio::task::JoinHandle<()> {
handle handle
} }
fn configure_logger() -> MultiProgress { fn configure_logger() {
let logger = let logger =
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let level = logger.filter(); let level = logger.filter();
let progress = MultiProgress::new(); let multi = MultiProgress::new();
LogWrapper::new(multi.clone(), logger).try_init().unwrap();
LogWrapper::new(progress.clone(), logger)
.try_init()
.unwrap();
log::set_max_level(level); log::set_max_level(level);
progress
} }
async fn handle_events(base_progress: MultiProgress) { async fn handle_events() {
let progress_tracker = Arc::new(IndicatifProgressTracker::new(base_progress.clone()));
let preparing_topology = Arc::new(Mutex::new(false));
let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
instrumentation::subscribe("Harmony CLI Logger", { instrumentation::subscribe("Harmony CLI Logger", {
let sections: Arc<Mutex<HashMap<String, MultiProgress>>> =
Arc::new(Mutex::new(HashMap::new()));
let progress_bars: Arc<Mutex<HashMap<String, ProgressBar>>> =
Arc::new(Mutex::new(HashMap::new()));
move |event| { move |event| {
let progress_tracker = Arc::clone(&progress_tracker); let sections_clone = Arc::clone(&sections);
let preparing_topology = Arc::clone(&preparing_topology); let progress_bars_clone = Arc::clone(&progress_bars);
let current_score = Arc::clone(&current_score);
async move { async move {
let mut preparing_topology = preparing_topology.lock().unwrap(); let mut sections = sections_clone.lock().unwrap();
let mut current_score = current_score.lock().unwrap(); let mut progress_bars = progress_bars_clone.lock().unwrap();
match event { match event {
HarmonyEvent::HarmonyStarted => {} HarmonyEvent::HarmonyStarted => {}
HarmonyEvent::HarmonyFinished => { HarmonyEvent::PrepareTopologyStarted { topology: name } => {
progress_tracker.add_section( let section = progress::new_section(format!(
"harmony-summary", "{} Preparing environment: {name}...",
&format!("\n{} Harmony completed\n\n", crate::theme::EMOJI_HARMONY), crate::theme::EMOJI_TOPOLOGY,
); ));
progress_tracker.add_section("harmony-finished", "\n\n"); (*sections).insert(name, section);
thread::sleep(Duration::from_millis(200));
return false;
} }
HarmonyEvent::TopologyStateChanged { HarmonyEvent::TopologyPrepared {
topology, topology: name,
status, outcome,
message,
} => { } => {
let section_key = topology_key(&topology); let section = (*sections).get(&name).unwrap();
let progress = progress::add_spinner(section, "".into());
match status { match outcome.status {
TopologyStatus::Queued => {} harmony::interpret::InterpretStatus::SUCCESS => {
TopologyStatus::Preparing => { progress::success(section, Some(progress), outcome.message);
progress_tracker.add_section(
&section_key,
&format!(
"\n{} Preparing environment: {topology}...",
crate::theme::EMOJI_TOPOLOGY
),
);
(*preparing_topology) = true;
} }
TopologyStatus::Success => { harmony::interpret::InterpretStatus::FAILURE => {
(*preparing_topology) = false; progress::error(section, Some(progress), outcome.message);
progress_tracker.add_task(&section_key, "topology-success", "");
progress_tracker
.finish_task("topology-success", &message.unwrap_or("".into()));
} }
TopologyStatus::Noop => { harmony::interpret::InterpretStatus::RUNNING => todo!(),
(*preparing_topology) = false; harmony::interpret::InterpretStatus::QUEUED => todo!(),
progress_tracker.add_task(&section_key, "topology-skip", ""); harmony::interpret::InterpretStatus::BLOCKED => todo!(),
progress_tracker harmony::interpret::InterpretStatus::NOOP => {
.skip_task("topology-skip", &message.unwrap_or("".into())); progress::skip(section, Some(progress), outcome.message);
}
TopologyStatus::Error => {
progress_tracker.add_task(&section_key, "topology-error", "");
(*preparing_topology) = false;
progress_tracker
.fail_task("topology-error", &message.unwrap_or("".into()));
} }
} }
} }
HarmonyEvent::InterpretExecutionStarted { HarmonyEvent::InterpretExecutionStarted {
execution_id: task_key, interpret: name,
topology, topology,
interpret: _,
score,
message, message,
} => { } => {
let is_key_topology = (*preparing_topology) let section = (*sections).get(&topology).unwrap();
&& progress_tracker.contains_section(&topology_key(&topology)); let progress_bar = progress::add_spinner(section, message);
let is_key_current_score = current_score.is_some()
&& progress_tracker
.contains_section(&score_key(&current_score.clone().unwrap()));
let is_key_score = progress_tracker.contains_section(&score_key(&score));
let section_key = if is_key_topology { (*progress_bars).insert(name, progress_bar);
topology_key(&topology)
} else if is_key_current_score {
score_key(&current_score.clone().unwrap())
} else if is_key_score {
score_key(&score)
} else {
(*current_score) = Some(score.clone());
let key = score_key(&score);
progress_tracker.add_section(
&key,
&format!(
"{} Interpreting score: {score}...",
crate::theme::EMOJI_SCORE
),
);
key
};
progress_tracker.add_task(&section_key, &task_key, &message);
} }
HarmonyEvent::InterpretExecutionFinished { HarmonyEvent::InterpretExecutionFinished {
execution_id: task_key, topology,
topology: _, interpret: name,
interpret: _,
score,
outcome, outcome,
} => { } => {
if current_score.is_some() && current_score.clone().unwrap() == score { let section = (*sections).get(&topology).unwrap();
(*current_score) = None; let progress_bar = (*progress_bars).get(&name).cloned();
}
let _ = section.clear();
match outcome { match outcome {
Ok(outcome) => match outcome.status { Ok(outcome) => {
harmony::interpret::InterpretStatus::SUCCESS => { progress::success(section, progress_bar, outcome.message);
progress_tracker.finish_task(&task_key, &outcome.message);
} }
harmony::interpret::InterpretStatus::NOOP => {
progress_tracker.skip_task(&task_key, &outcome.message);
}
_ => progress_tracker.fail_task(&task_key, &outcome.message),
},
Err(err) => { Err(err) => {
error!("Interpret error: {err}"); progress::error(section, progress_bar, err.to_string());
progress_tracker.fail_task(&task_key, &err.to_string());
} }
} }
}
HarmonyEvent::ApplicationFeatureStateChanged {
topology: _,
application,
feature,
status,
} => {
if let Some(score) = &(*current_score) {
let section_key = score_key(score);
let task_key = app_feature_key(&application, &feature);
match status { (*progress_bars).remove(&name);
ApplicationFeatureStatus::Installing => {
let message = format!("Feature '{}' installing...", feature);
progress_tracker.add_task(&section_key, &task_key, &message);
}
ApplicationFeatureStatus::Installed => {
let message = format!("Feature '{}' installed", feature);
progress_tracker.finish_task(&task_key, &message);
}
ApplicationFeatureStatus::Failed { details } => {
let message = format!(
"Feature '{}' installation failed: {}",
feature, details
);
progress_tracker.fail_task(&task_key, &message);
}
}
}
} }
} }
true true
@@ -204,15 +114,3 @@ async fn handle_events(base_progress: MultiProgress) {
}) })
.await; .await;
} }
fn topology_key(topology: &str) -> String {
format!("topology-{topology}")
}
fn score_key(score: &str) -> String {
format!("score-{score}")
}
fn app_feature_key(application: &str, feature: &str) -> String {
format!("app-{application}-{feature}")
}

View File

@@ -1,6 +1,5 @@
use clap::Parser; use clap::Parser;
use clap::builder::ArgPredicate; use clap::builder::ArgPredicate;
use harmony::instrumentation;
use harmony::inventory::Inventory; use harmony::inventory::Inventory;
use harmony::maestro::Maestro; use harmony::maestro::Maestro;
use harmony::{score::Score, topology::Topology}; use harmony::{score::Score, topology::Topology};
@@ -12,6 +11,8 @@ pub mod progress;
pub mod theme; pub mod theme;
#[cfg(feature = "tui")] #[cfg(feature = "tui")]
use harmony_tui;
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(version, about, long_about = None)] #[command(version, about, long_about = None)]
pub struct Args { pub struct Args {
@@ -72,7 +73,7 @@ fn maestro_scores_filter<T: Topology>(
} }
}; };
scores_vec return scores_vec;
} }
// TODO: consider adding doctest for this function // TODO: consider adding doctest for this function
@@ -82,7 +83,7 @@ fn list_scores_with_index<T: Topology>(scores_vec: &Vec<Box<dyn Score<T>>>) -> S
let name = s.name(); let name = s.name();
display_str.push_str(&format!("\n{i}: {name}")); display_str.push_str(&format!("\n{i}: {name}"));
} }
display_str return display_str;
} }
pub async fn run<T: Topology + Send + Sync + 'static>( pub async fn run<T: Topology + Send + Sync + 'static>(
@@ -98,7 +99,6 @@ pub async fn run<T: Topology + Send + Sync + 'static>(
let result = init(maestro, args_struct).await; let result = init(maestro, args_struct).await;
instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
let _ = tokio::try_join!(cli_logger_handle); let _ = tokio::try_join!(cli_logger_handle);
result result
} }
@@ -126,15 +126,14 @@ async fn init<T: Topology + Send + Sync + 'static>(
let scores_vec = maestro_scores_filter(&maestro, args.all, args.filter, args.number); let scores_vec = maestro_scores_filter(&maestro, args.all, args.filter, args.number);
if scores_vec.is_empty() { if scores_vec.len() == 0 {
return Err("No score found".into()); return Err("No score found".into());
} }
// if list option is specified, print filtered list and exit // if list option is specified, print filtered list and exit
if args.list { if args.list {
let num_scores = scores_vec.len(); println!("Available scores:");
println!("Available scores {num_scores}:"); println!("{}", list_scores_with_index(&scores_vec));
println!("{}\n\n", list_scores_with_index(&scores_vec));
return Ok(()); return Ok(());
} }
@@ -166,7 +165,7 @@ async fn init<T: Topology + Send + Sync + 'static>(
} }
#[cfg(test)] #[cfg(test)]
mod tests { mod test {
use harmony::{ use harmony::{
inventory::Inventory, inventory::Inventory,
maestro::Maestro, maestro::Maestro,
@@ -266,7 +265,7 @@ mod tests {
assert!( assert!(
maestro maestro
.interpret(res.first().unwrap().clone_box()) .interpret(res.get(0).unwrap().clone_box())
.await .await
.is_ok() .is_ok()
); );
@@ -282,7 +281,7 @@ mod tests {
assert!( assert!(
maestro maestro
.interpret(res.first().unwrap().clone_box()) .interpret(res.get(0).unwrap().clone_box())
.await .await
.is_err() .is_err()
); );
@@ -298,7 +297,7 @@ mod tests {
assert!( assert!(
maestro maestro
.interpret(res.first().unwrap().clone_box()) .interpret(res.get(0).unwrap().clone_box())
.await .await
.is_ok() .is_ok()
); );
@@ -320,7 +319,7 @@ mod tests {
assert!( assert!(
maestro maestro
.interpret(res.first().unwrap().clone_box()) .interpret(res.get(0).unwrap().clone_box())
.await .await
.is_ok() .is_ok()
); );
@@ -332,6 +331,6 @@ mod tests {
let res = crate::maestro_scores_filter(&maestro, false, None, 11); let res = crate::maestro_scores_filter(&maestro, false, None, 11);
assert!(res.is_empty()); assert!(res.len() == 0);
} }
} }

View File

@@ -1,147 +1,50 @@
use indicatif::{MultiProgress, ProgressBar};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration; use std::time::Duration;
pub trait ProgressTracker: Send + Sync { use indicatif::{MultiProgress, ProgressBar};
fn contains_section(&self, id: &str) -> bool;
fn add_section(&self, id: &str, message: &str); pub fn new_section(title: String) -> MultiProgress {
fn add_task(&self, section_id: &str, task_id: &str, message: &str); let multi_progress = MultiProgress::new();
fn finish_task(&self, id: &str, message: &str); let _ = multi_progress.println(title);
fn fail_task(&self, id: &str, message: &str);
fn skip_task(&self, id: &str, message: &str); multi_progress
fn clear(&self);
} }
struct Section { pub fn add_spinner(multi_progress: &MultiProgress, message: String) -> ProgressBar {
header_index: usize, let progress = multi_progress.add(ProgressBar::new_spinner());
task_count: usize,
pb: ProgressBar, progress.set_style(crate::theme::SPINNER_STYLE.clone());
progress.set_message(message);
progress.enable_steady_tick(Duration::from_millis(100));
progress
} }
struct IndicatifProgressTrackerState { pub fn success(multi_progress: &MultiProgress, progress: Option<ProgressBar>, message: String) {
sections: HashMap<String, Section>, if let Some(progress) = progress {
tasks: HashMap<String, ProgressBar>, multi_progress.remove(&progress)
pb_count: usize, }
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::SUCCESS_SPINNER_STYLE.clone());
progress.finish_with_message(message);
} }
#[derive(Clone)] pub fn error(multi_progress: &MultiProgress, progress: Option<ProgressBar>, message: String) {
pub struct IndicatifProgressTracker { if let Some(progress) = progress {
mp: MultiProgress, multi_progress.remove(&progress)
state: Arc<Mutex<IndicatifProgressTrackerState>>, }
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::ERROR_SPINNER_STYLE.clone());
progress.finish_with_message(message);
} }
impl IndicatifProgressTracker { pub fn skip(multi_progress: &MultiProgress, progress: Option<ProgressBar>, message: String) {
pub fn new(base: MultiProgress) -> Self { if let Some(progress) = progress {
let sections = HashMap::new(); multi_progress.remove(&progress)
let tasks = HashMap::new();
let state = Arc::new(Mutex::new(IndicatifProgressTrackerState {
sections,
tasks,
pb_count: 0,
}));
Self { mp: base, state }
}
}
impl ProgressTracker for IndicatifProgressTracker {
fn add_section(&self, id: &str, message: &str) {
let mut state = self.state.lock().unwrap();
let header_pb = self
.mp
.add(ProgressBar::new(1).with_style(crate::theme::SECTION_STYLE.clone()));
header_pb.finish_with_message(message.to_string());
let header_index = state.pb_count;
state.pb_count += 1;
state.sections.insert(
id.to_string(),
Section {
header_index,
task_count: 0,
pb: header_pb,
},
);
}
fn add_task(&self, section_id: &str, task_id: &str, message: &str) {
let mut state = self.state.lock().unwrap();
let insertion_index = {
let current_section = state
.sections
.get(section_id)
.expect("Section ID not found");
current_section.header_index + current_section.task_count + 1 // +1 to insert after header
};
let pb = self.mp.insert(insertion_index, ProgressBar::new_spinner());
pb.set_style(crate::theme::SPINNER_STYLE.clone());
pb.set_prefix(" ");
pb.set_message(message.to_string());
pb.enable_steady_tick(Duration::from_millis(80));
state.pb_count += 1;
let section = state
.sections
.get_mut(section_id)
.expect("Section ID not found");
section.task_count += 1;
// We inserted a new progress bar, so we must update the header_index
// for all subsequent sections.
for (id, s) in state.sections.iter_mut() {
if id != section_id && s.header_index >= insertion_index {
s.header_index += 1;
}
}
state.tasks.insert(task_id.to_string(), pb);
}
fn finish_task(&self, id: &str, message: &str) {
let state = self.state.lock().unwrap();
if let Some(pb) = state.tasks.get(id) {
pb.set_style(crate::theme::SUCCESS_SPINNER_STYLE.clone());
pb.finish_with_message(message.to_string());
}
}
fn fail_task(&self, id: &str, message: &str) {
let state = self.state.lock().unwrap();
if let Some(pb) = state.tasks.get(id) {
pb.set_style(crate::theme::ERROR_SPINNER_STYLE.clone());
pb.finish_with_message(message.to_string());
}
}
fn skip_task(&self, id: &str, message: &str) {
let state = self.state.lock().unwrap();
if let Some(pb) = state.tasks.get(id) {
pb.set_style(crate::theme::SKIP_SPINNER_STYLE.clone());
pb.finish_with_message(message.to_string());
}
}
fn contains_section(&self, id: &str) -> bool {
let state = self.state.lock().unwrap();
state.sections.contains_key(id)
}
fn clear(&self) {
let mut state = self.state.lock().unwrap();
state.tasks.values().for_each(|p| self.mp.remove(p));
state.tasks.clear();
state.sections.values().for_each(|s| self.mp.remove(&s.pb));
state.sections.clear();
state.pb_count = 0;
let _ = self.mp.clear();
} }
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::SKIP_SPINNER_STYLE.clone());
progress.finish_with_message(message);
} }

View File

@@ -8,27 +8,19 @@ pub static EMOJI_SKIP: Emoji<'_, '_> = Emoji("⏭️", "");
pub static EMOJI_ERROR: Emoji<'_, '_> = Emoji("⚠️", ""); pub static EMOJI_ERROR: Emoji<'_, '_> = Emoji("⚠️", "");
pub static EMOJI_DEPLOY: Emoji<'_, '_> = Emoji("🚀", ""); pub static EMOJI_DEPLOY: Emoji<'_, '_> = Emoji("🚀", "");
pub static EMOJI_TOPOLOGY: Emoji<'_, '_> = Emoji("📦", ""); pub static EMOJI_TOPOLOGY: Emoji<'_, '_> = Emoji("📦", "");
pub static EMOJI_SCORE: Emoji<'_, '_> = Emoji("🎶", "");
lazy_static! { lazy_static! {
pub static ref SECTION_STYLE: ProgressStyle = ProgressStyle::default_spinner()
.template("{wide_msg:.bold}")
.unwrap();
pub static ref SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner() pub static ref SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner()
.template(" {spinner:.green} {wide_msg}") .template(" {spinner:.green} {msg}")
.unwrap() .unwrap()
.tick_strings(&["", "", "", "", "", "", "", "", "", ""]); .tick_strings(&["", "", "", "", "", "", "", "", "", ""]);
pub static ref SUCCESS_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE pub static ref SUCCESS_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE
.clone() .clone()
.tick_strings(&[format!("{}", EMOJI_SUCCESS).as_str()]); .tick_strings(&[format!("{}", EMOJI_SUCCESS).as_str()]);
pub static ref SKIP_SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner() pub static ref SKIP_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE
.template(" {spinner:.orange} {wide_msg}")
.unwrap()
.clone() .clone()
.tick_strings(&[format!("{}", EMOJI_SKIP).as_str()]); .tick_strings(&[format!("{}", EMOJI_SKIP).as_str()]);
pub static ref ERROR_SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner() pub static ref ERROR_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE
.template(" {spinner:.red} {wide_msg}")
.unwrap()
.clone() .clone()
.tick_strings(&[format!("{}", EMOJI_ERROR).as_str()]); .tick_strings(&[format!("{}", EMOJI_ERROR).as_str()]);
} }

View File

@@ -1,6 +1,10 @@
use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker}; use indicatif::{MultiProgress, ProgressBar};
use indicatif::MultiProgress; use indicatif_log_bridge::LogWrapper;
use std::sync::Arc; use log::error;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use crate::instrumentation::{self, HarmonyComposerEvent}; use crate::instrumentation::{self, HarmonyComposerEvent};
@@ -18,59 +22,85 @@ pub fn init() -> tokio::task::JoinHandle<()> {
} }
fn configure_logger() { fn configure_logger() {
let logger =
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build(); env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let level = logger.filter();
let multi = MultiProgress::new();
LogWrapper::new(multi.clone(), logger).try_init().unwrap();
log::set_max_level(level);
} }
pub async fn handle_events() { pub async fn handle_events() {
let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new())); const PROGRESS_SETUP: &str = "project-initialization";
const SETUP_SECTION: &str = "project-initialization";
const COMPILTATION_TASK: &str = "compilation";
const PROGRESS_DEPLOYMENT: &str = "deployment"; const PROGRESS_DEPLOYMENT: &str = "deployment";
instrumentation::subscribe("Harmony Composer Logger", { instrumentation::subscribe("Harmony Composer Logger", {
let progresses: Arc<Mutex<HashMap<String, MultiProgress>>> =
Arc::new(Mutex::new(HashMap::new()));
let compilation_progress = Arc::new(Mutex::new(None::<ProgressBar>));
move |event| { move |event| {
let progress_tracker = Arc::clone(&progress_tracker); let progresses_clone = Arc::clone(&progresses);
let compilation_progress_clone = Arc::clone(&compilation_progress);
async move { async move {
let mut progresses_guard = progresses_clone.lock().unwrap();
let mut compilation_progress_guard = compilation_progress_clone.lock().unwrap();
match event { match event {
HarmonyComposerEvent::HarmonyComposerStarted => {} HarmonyComposerEvent::HarmonyComposerStarted => {}
HarmonyComposerEvent::ProjectInitializationStarted => { HarmonyComposerEvent::ProjectInitializationStarted => {
progress_tracker.add_section( let multi_progress = harmony_cli::progress::new_section(format!(
SETUP_SECTION,
&format!(
"{} Initializing Harmony project...", "{} Initializing Harmony project...",
harmony_cli::theme::EMOJI_HARMONY, harmony_cli::theme::EMOJI_HARMONY,
), ));
); (*progresses_guard).insert(PROGRESS_SETUP.to_string(), multi_progress);
} }
HarmonyComposerEvent::ProjectInitialized => {} HarmonyComposerEvent::ProjectInitialized => println!("\n"),
HarmonyComposerEvent::ProjectCompilationStarted { details } => { HarmonyComposerEvent::ProjectCompilationStarted { details } => {
progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details); let initialization_progress =
(*progresses_guard).get(PROGRESS_SETUP).unwrap();
let _ = initialization_progress.clear();
let progress =
harmony_cli::progress::add_spinner(initialization_progress, details);
*compilation_progress_guard = Some(progress);
} }
HarmonyComposerEvent::ProjectCompiled => { HarmonyComposerEvent::ProjectCompiled => {
progress_tracker.finish_task(COMPILTATION_TASK, "project compiled"); let initialization_progress =
} (*progresses_guard).get(PROGRESS_SETUP).unwrap();
HarmonyComposerEvent::ProjectCompilationFailed { details } => {
progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}")); harmony_cli::progress::success(
} initialization_progress,
HarmonyComposerEvent::DeploymentStarted { target, profile } => { (*compilation_progress_guard).take(),
progress_tracker.add_section( "project compiled".to_string(),
PROGRESS_DEPLOYMENT,
&format!(
"\n{} Deploying project on target '{target}' with profile '{profile}'...\n",
harmony_cli::theme::EMOJI_DEPLOY,
),
); );
} }
HarmonyComposerEvent::DeploymentCompleted => { HarmonyComposerEvent::ProjectCompilationFailed { details } => {
progress_tracker.clear(); let initialization_progress =
(*progresses_guard).get(PROGRESS_SETUP).unwrap();
harmony_cli::progress::error(
initialization_progress,
(*compilation_progress_guard).take(),
"failed to compile project".to_string(),
);
error!("{details}");
} }
HarmonyComposerEvent::DeploymentFailed { details } => { HarmonyComposerEvent::DeploymentStarted { target } => {
progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", ""); let multi_progress = harmony_cli::progress::new_section(format!(
progress_tracker.fail_task("deployment-failed", &details); "{} Starting deployment to {target}...\n\n",
}, harmony_cli::theme::EMOJI_DEPLOY
));
(*progresses_guard).insert(PROGRESS_DEPLOYMENT.to_string(), multi_progress);
}
HarmonyComposerEvent::DeploymentCompleted { details } => println!("\n"),
HarmonyComposerEvent::Shutdown => { HarmonyComposerEvent::Shutdown => {
for (_, progresses) in (*progresses_guard).iter() {
progresses.clear().unwrap();
}
return false; return false;
} }
} }

View File

@@ -2,28 +2,16 @@ use log::debug;
use once_cell::sync::Lazy; use once_cell::sync::Lazy;
use tokio::sync::broadcast; use tokio::sync::broadcast;
use crate::{HarmonyProfile, HarmonyTarget};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub enum HarmonyComposerEvent { pub enum HarmonyComposerEvent {
HarmonyComposerStarted, HarmonyComposerStarted,
ProjectInitializationStarted, ProjectInitializationStarted,
ProjectInitialized, ProjectInitialized,
ProjectCompilationStarted { ProjectCompilationStarted { details: String },
details: String,
},
ProjectCompiled, ProjectCompiled,
ProjectCompilationFailed { ProjectCompilationFailed { details: String },
details: String, DeploymentStarted { target: String },
}, DeploymentCompleted { details: String },
DeploymentStarted {
target: HarmonyTarget,
profile: HarmonyProfile,
},
DeploymentCompleted,
DeploymentFailed {
details: String,
},
Shutdown, Shutdown,
} }
@@ -35,19 +23,10 @@ static HARMONY_COMPOSER_EVENT_BUS: Lazy<broadcast::Sender<HarmonyComposerEvent>>
}); });
pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> { pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> {
#[cfg(not(test))]
{
match HARMONY_COMPOSER_EVENT_BUS.send(event) { match HARMONY_COMPOSER_EVENT_BUS.send(event) {
Ok(_) => Ok(()), Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"), Err(_) => Err("send error: no subscribers"),
} }
}
#[cfg(test)]
{
let _ = event; // Suppress the "unused variable" warning for `event`
Ok(())
}
} }
pub async fn subscribe<F, Fut>(name: &str, mut handler: F) pub async fn subscribe<F, Fut>(name: &str, mut handler: F)

View File

@@ -20,7 +20,7 @@ mod instrumentation;
#[derive(Parser)] #[derive(Parser)]
#[command(version, about, long_about = None, flatten_help = true, propagate_version = true)] #[command(version, about, long_about = None, flatten_help = true, propagate_version = true)]
struct GlobalArgs { struct GlobalArgs {
#[arg(long, default_value = ".")] #[arg(long, default_value = "harmony")]
harmony_path: String, harmony_path: String,
#[arg(long)] #[arg(long)]
@@ -49,11 +49,14 @@ struct CheckArgs {
#[derive(Args, Clone, Debug)] #[derive(Args, Clone, Debug)]
struct DeployArgs { struct DeployArgs {
#[arg(long = "target", short = 't', default_value = "local")] #[arg(long, default_value_t = false)]
harmony_target: HarmonyTarget, staging: bool,
#[arg(long = "profile", short = 'p', default_value = "dev")] #[arg(long, default_value_t = false)]
harmony_profile: HarmonyProfile, prod: bool,
#[arg(long, default_value_t = false)]
smoke_test: bool,
} }
#[derive(Args, Clone, Debug)] #[derive(Args, Clone, Debug)]
@@ -65,38 +68,6 @@ struct AllArgs {
deploy: DeployArgs, deploy: DeployArgs,
} }
#[derive(Clone, Debug, clap::ValueEnum)]
enum HarmonyTarget {
Local,
Remote,
}
impl std::fmt::Display for HarmonyTarget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HarmonyTarget::Local => f.write_str("local"),
HarmonyTarget::Remote => f.write_str("remote"),
}
}
}
#[derive(Clone, Debug, clap::ValueEnum)]
enum HarmonyProfile {
Dev,
Staging,
Production,
}
impl std::fmt::Display for HarmonyProfile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HarmonyProfile::Dev => f.write_str("dev"),
HarmonyProfile::Staging => f.write_str("staging"),
HarmonyProfile::Production => f.write_str("production"),
}
}
}
#[tokio::main] #[tokio::main]
async fn main() { async fn main() {
let hc_logger_handle = harmony_composer_logger::init(); let hc_logger_handle = harmony_composer_logger::init();
@@ -109,13 +80,14 @@ async fn main() {
instrumentation::instrument(HarmonyComposerEvent::ProjectInitializationStarted).unwrap(); instrumentation::instrument(HarmonyComposerEvent::ProjectInitializationStarted).unwrap();
let harmony_bin_path: PathBuf = match harmony_path { let harmony_bin_path: PathBuf = match harmony_path {
true => compile_harmony( true => {
compile_harmony(
cli_args.compile_method, cli_args.compile_method,
cli_args.compile_platform, cli_args.compile_platform,
cli_args.harmony_path.clone(), cli_args.harmony_path.clone(),
) )
.await .await
.expect("couldn't compile harmony"), }
false => todo!("implement autodetect code"), false => todo!("implement autodetect code"),
}; };
@@ -151,44 +123,32 @@ async fn main() {
); );
} }
Commands::Deploy(args) => { Commands::Deploy(args) => {
let deploy = if args.staging {
instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted { instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
target: args.harmony_target.clone(), target: "staging".to_string(),
profile: args.harmony_profile.clone(),
}) })
.unwrap(); .unwrap();
todo!("implement staging deployment")
if matches!(args.harmony_profile, HarmonyProfile::Dev) } else if args.prod {
&& !matches!(args.harmony_target, HarmonyTarget::Local) instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
{ target: "prod".to_string(),
instrumentation::instrument(HarmonyComposerEvent::DeploymentFailed { })
details: format!( .unwrap();
"Cannot run profile '{}' on target '{}'. Profile '{}' can run locally only.", todo!("implement prod deployment")
args.harmony_profile, args.harmony_target, args.harmony_profile } else {
), instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
}).unwrap(); target: "dev".to_string(),
return; })
.unwrap();
Command::new(harmony_bin_path).arg("-y").arg("-a").spawn()
} }
.expect("failed to run harmony deploy");
let use_local_k3d = match args.harmony_target {
HarmonyTarget::Local => true,
HarmonyTarget::Remote => false,
};
let mut command = Command::new(harmony_bin_path);
command
.env("HARMONY_USE_LOCAL_K3D", format!("{use_local_k3d}"))
.env("HARMONY_PROFILE", format!("{}", args.harmony_profile))
.arg("-y")
.arg("-a");
info!("{:?}", command);
let deploy = command.spawn().expect("failed to run harmony deploy");
let deploy_output = deploy.wait_with_output().unwrap(); let deploy_output = deploy.wait_with_output().unwrap();
debug!("{}", String::from_utf8(deploy_output.stdout).unwrap()); instrumentation::instrument(HarmonyComposerEvent::DeploymentCompleted {
details: String::from_utf8(deploy_output.stdout).unwrap(),
instrumentation::instrument(HarmonyComposerEvent::DeploymentCompleted).unwrap(); })
.unwrap();
} }
Commands::All(_args) => todo!( Commands::All(_args) => todo!(
"take all previous match arms and turn them into separate functions, and call them all one after the other" "take all previous match arms and turn them into separate functions, and call them all one after the other"
@@ -213,7 +173,7 @@ async fn compile_harmony(
method: Option<CompileMethod>, method: Option<CompileMethod>,
platform: Option<String>, platform: Option<String>,
harmony_location: String, harmony_location: String,
) -> Result<PathBuf, String> { ) -> PathBuf {
let platform = match platform { let platform = match platform {
Some(p) => p, Some(p) => p,
None => current_platform::CURRENT_PLATFORM.to_string(), None => current_platform::CURRENT_PLATFORM.to_string(),
@@ -243,7 +203,6 @@ async fn compile_harmony(
details: "compiling project with cargo".to_string(), details: "compiling project with cargo".to_string(),
}) })
.unwrap(); .unwrap();
compile_cargo(platform, harmony_location).await compile_cargo(platform, harmony_location).await
} }
CompileMethod::Docker => { CompileMethod::Docker => {
@@ -251,28 +210,16 @@ async fn compile_harmony(
details: "compiling project with docker".to_string(), details: "compiling project with docker".to_string(),
}) })
.unwrap(); .unwrap();
compile_docker(platform, harmony_location).await compile_docker(platform, harmony_location).await
} }
}; };
match path {
Ok(path) => {
instrumentation::instrument(HarmonyComposerEvent::ProjectCompiled).unwrap(); instrumentation::instrument(HarmonyComposerEvent::ProjectCompiled).unwrap();
Ok(path) path
}
Err(err) => {
instrumentation::instrument(HarmonyComposerEvent::ProjectCompilationFailed {
details: err.clone(),
})
.unwrap();
Err(err)
}
}
} }
// TODO: make sure this works with cargo workspaces // TODO: make sure this works with cargo workspaces
async fn compile_cargo(platform: String, harmony_location: String) -> Result<PathBuf, String> { async fn compile_cargo(platform: String, harmony_location: String) -> PathBuf {
let metadata = MetadataCommand::new() let metadata = MetadataCommand::new()
.manifest_path(format!("{}/Cargo.toml", harmony_location)) .manifest_path(format!("{}/Cargo.toml", harmony_location))
.exec() .exec()
@@ -321,10 +268,7 @@ async fn compile_cargo(platform: String, harmony_location: String) -> Result<Pat
} }
} }
let res = cargo_build.wait(); //.expect("run cargo command failed"); cargo_build.wait().expect("run cargo command failed");
if res.is_err() {
return Err("cargo build failed".into());
}
let bin = artifacts let bin = artifacts
.last() .last()
@@ -342,10 +286,10 @@ async fn compile_cargo(platform: String, harmony_location: String) -> Result<Pat
let _copy_res = fs::copy(&bin, &bin_out).await; let _copy_res = fs::copy(&bin, &bin_out).await;
} }
Ok(bin_out) bin_out
} }
async fn compile_docker(platform: String, harmony_location: String) -> Result<PathBuf, String> { async fn compile_docker(platform: String, harmony_location: String) -> PathBuf {
let docker_client = let docker_client =
bollard::Docker::connect_with_local_defaults().expect("couldn't connect to docker"); bollard::Docker::connect_with_local_defaults().expect("couldn't connect to docker");
@@ -361,7 +305,7 @@ async fn compile_docker(platform: String, harmony_location: String) -> Result<Pa
.await .await
.expect("list containers failed"); .expect("list containers failed");
if !containers.is_empty() { if containers.len() > 0 {
docker_client docker_client
.remove_container("harmony_build", None::<RemoveContainerOptions>) .remove_container("harmony_build", None::<RemoveContainerOptions>)
.await .await
@@ -423,12 +367,12 @@ async fn compile_docker(platform: String, harmony_location: String) -> Result<Pa
} }
// wait until container is no longer running // wait until container is no longer running
while (wait.next().await).is_some() {} while let Some(_) = wait.next().await {}
// hack that should be cleaned up // hack that should be cleaned up
if platform.contains("windows") { if platform.contains("windows") {
Ok(PathBuf::from(format!("{}/harmony.exe", harmony_location))) return PathBuf::from(format!("{}/harmony.exe", harmony_location));
} else { } else {
Ok(PathBuf::from(format!("{}/harmony", harmony_location))) return PathBuf::from(format!("{}/harmony", harmony_location));
} }
} }

View File

@@ -11,13 +11,13 @@ pub fn ip(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as LitStr); let input = parse_macro_input!(input as LitStr);
let ip_str = input.value(); let ip_str = input.value();
if ip_str.parse::<std::net::Ipv4Addr>().is_ok() { if let Ok(_) = ip_str.parse::<std::net::Ipv4Addr>() {
let expanded = let expanded =
quote! { std::net::IpAddr::V4(#ip_str.parse::<std::net::Ipv4Addr>().unwrap()) }; quote! { std::net::IpAddr::V4(#ip_str.parse::<std::net::Ipv4Addr>().unwrap()) };
return TokenStream::from(expanded); return TokenStream::from(expanded);
} }
if ip_str.parse::<std::net::Ipv6Addr>().is_ok() { if let Ok(_) = ip_str.parse::<std::net::Ipv6Addr>() {
let expanded = let expanded =
quote! { std::net::IpAddr::V6(#ip_str.parse::<std::net::Ipv6Addr>().unwrap()) }; quote! { std::net::IpAddr::V6(#ip_str.parse::<std::net::Ipv6Addr>().unwrap()) };
return TokenStream::from(expanded); return TokenStream::from(expanded);
@@ -31,7 +31,7 @@ pub fn ipv4(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as LitStr); let input = parse_macro_input!(input as LitStr);
let ip_str = input.value(); let ip_str = input.value();
if ip_str.parse::<std::net::Ipv4Addr>().is_ok() { if let Ok(_) = ip_str.parse::<std::net::Ipv4Addr>() {
let expanded = quote! { #ip_str.parse::<std::net::Ipv4Addr>().unwrap() }; let expanded = quote! { #ip_str.parse::<std::net::Ipv4Addr>().unwrap() };
return TokenStream::from(expanded); return TokenStream::from(expanded);
} }
@@ -127,7 +127,7 @@ pub fn ingress_path(input: TokenStream) -> TokenStream {
match path_str.starts_with("/") { match path_str.starts_with("/") {
true => { true => {
let expanded = quote! {(#path_str.to_string()) }; let expanded = quote! {(#path_str.to_string()) };
TokenStream::from(expanded) return TokenStream::from(expanded);
} }
false => panic!("Invalid ingress path"), false => panic!("Invalid ingress path"),
} }
@@ -138,7 +138,7 @@ pub fn cidrv4(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as LitStr); let input = parse_macro_input!(input as LitStr);
let cidr_str = input.value(); let cidr_str = input.value();
if cidr_str.parse::<cidr::Ipv4Cidr>().is_ok() { if let Ok(_) = cidr_str.parse::<cidr::Ipv4Cidr>() {
let expanded = quote! { #cidr_str.parse::<cidr::Ipv4Cidr>().unwrap() }; let expanded = quote! { #cidr_str.parse::<cidr::Ipv4Cidr>().unwrap() };
return TokenStream::from(expanded); return TokenStream::from(expanded);
} }

View File

@@ -14,9 +14,9 @@ use tokio::sync::mpsc;
#[derive(Debug)] #[derive(Debug)]
enum ExecutionState { enum ExecutionState {
Initiated, INITIATED,
Running, RUNNING,
Canceled, CANCELED,
} }
struct Execution<T: Topology> { struct Execution<T: Topology> {
@@ -62,7 +62,7 @@ impl<T: Topology> ScoreListWidget<T> {
pub(crate) fn launch_execution(&mut self) { pub(crate) fn launch_execution(&mut self) {
if let Some(score) = self.get_selected_score() { if let Some(score) = self.get_selected_score() {
self.execution = Some(Execution { self.execution = Some(Execution {
state: ExecutionState::Initiated, state: ExecutionState::INITIATED,
score: score.clone_box(), score: score.clone_box(),
}); });
info!("{}\n\nConfirm Execution (Press y/n)", score.name()); info!("{}\n\nConfirm Execution (Press y/n)", score.name());
@@ -106,7 +106,7 @@ impl<T: Topology> ScoreListWidget<T> {
if let Some(execution) = &mut self.execution { if let Some(execution) = &mut self.execution {
match confirm { match confirm {
true => { true => {
execution.state = ExecutionState::Running; execution.state = ExecutionState::RUNNING;
info!("Launch execution {execution}"); info!("Launch execution {execution}");
self.sender self.sender
.send(HarmonyTuiEvent::LaunchScore(execution.score.clone_box())) .send(HarmonyTuiEvent::LaunchScore(execution.score.clone_box()))
@@ -114,7 +114,7 @@ impl<T: Topology> ScoreListWidget<T> {
.expect("Should be able to send message"); .expect("Should be able to send message");
} }
false => { false => {
execution.state = ExecutionState::Canceled; execution.state = ExecutionState::CANCELED;
info!("Execution cancelled"); info!("Execution cancelled");
self.clear_execution(); self.clear_execution();
} }
@@ -144,11 +144,7 @@ impl<T: Topology> Widget for &ScoreListWidget<T> {
Self: Sized, Self: Sized,
{ {
let mut list_state = self.list_state.write().unwrap(); let mut list_state = self.list_state.write().unwrap();
let scores_items: Vec<ListItem<'_>> = self let scores_items: Vec<ListItem<'_>> = self.scores.iter().map(score_to_list_item).collect();
.scores
.iter()
.map(|score| ListItem::new(score.name()))
.collect();
let list = List::new(scores_items) let list = List::new(scores_items)
.highlight_style(Style::new().bold().italic()) .highlight_style(Style::new().bold().italic())
.highlight_symbol("🠊 "); .highlight_symbol("🠊 ");
@@ -156,3 +152,7 @@ impl<T: Topology> Widget for &ScoreListWidget<T> {
StatefulWidget::render(list, area, buf, &mut list_state) StatefulWidget::render(list, area, buf, &mut list_state)
} }
} }
fn score_to_list_item<'a, T: Topology>(score: &'a Box<dyn Score<T>>) -> ListItem<'a> {
ListItem::new(score.name())
}

View File

@@ -1,5 +1,5 @@
use futures_util::StreamExt; use futures_util::StreamExt;
use log::{debug, warn}; use log::{debug, info, warn};
use sha2::{Digest, Sha256}; use sha2::{Digest, Sha256};
use std::io::Read; use std::io::Read;
use std::path::PathBuf; use std::path::PathBuf;
@@ -45,7 +45,7 @@ pub(crate) struct DownloadableAsset {
impl DownloadableAsset { impl DownloadableAsset {
fn verify_checksum(&self, file: PathBuf) -> bool { fn verify_checksum(&self, file: PathBuf) -> bool {
if !file.exists() { if !file.exists() {
debug!("File does not exist: {:?}", file); warn!("File does not exist: {:?}", file);
return false; return false;
} }
@@ -155,7 +155,7 @@ impl DownloadableAsset {
return Err(CHECKSUM_FAILED_MSG.to_string()); return Err(CHECKSUM_FAILED_MSG.to_string());
} }
debug!( info!(
"File downloaded and verified successfully: {}", "File downloaded and verified successfully: {}",
target_file_path.to_string_lossy() target_file_path.to_string_lossy()
); );

View File

@@ -2,7 +2,7 @@ mod downloadable_asset;
use downloadable_asset::*; use downloadable_asset::*;
use kube::Client; use kube::Client;
use log::debug; use log::{debug, warn};
use std::path::PathBuf; use std::path::PathBuf;
const K3D_BIN_FILE_NAME: &str = "k3d"; const K3D_BIN_FILE_NAME: &str = "k3d";
@@ -64,6 +64,7 @@ impl K3d {
.text() .text()
.await .await
.unwrap(); .unwrap();
println!("body: {body}");
let checksum = body let checksum = body
.lines() .lines()
@@ -103,7 +104,8 @@ impl K3d {
.get_latest() .get_latest()
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
debug!("Got k3d releases {latest_release:#?}"); // debug!("Got k3d releases {releases:#?}");
println!("Got k3d first releases {latest_release:#?}");
Ok(latest_release) Ok(latest_release)
} }
@@ -366,7 +368,7 @@ mod test {
async fn k3d_latest_release_should_get_latest() { async fn k3d_latest_release_should_get_latest() {
let dir = get_clean_test_directory(); let dir = get_clean_test_directory();
assert!(!dir.join(K3D_BIN_FILE_NAME).exists()); assert_eq!(dir.join(K3D_BIN_FILE_NAME).exists(), false);
let k3d = K3d::new(dir.clone(), None); let k3d = K3d::new(dir.clone(), None);
let latest_release = k3d.get_latest_release_tag().await.unwrap(); let latest_release = k3d.get_latest_release_tag().await.unwrap();
@@ -380,12 +382,12 @@ mod test {
async fn k3d_download_latest_release_should_get_latest_bin() { async fn k3d_download_latest_release_should_get_latest_bin() {
let dir = get_clean_test_directory(); let dir = get_clean_test_directory();
assert!(!dir.join(K3D_BIN_FILE_NAME).exists()); assert_eq!(dir.join(K3D_BIN_FILE_NAME).exists(), false);
let k3d = K3d::new(dir.clone(), None); let k3d = K3d::new(dir.clone(), None);
let bin_file_path = k3d.download_latest_release().await.unwrap(); let bin_file_path = k3d.download_latest_release().await.unwrap();
assert_eq!(bin_file_path, dir.join(K3D_BIN_FILE_NAME)); assert_eq!(bin_file_path, dir.join(K3D_BIN_FILE_NAME));
assert!(dir.join(K3D_BIN_FILE_NAME).exists()); assert_eq!(dir.join(K3D_BIN_FILE_NAME).exists(), true);
} }
fn get_clean_test_directory() -> PathBuf { fn get_clean_test_directory() -> PathBuf {

View File

@@ -1,3 +1,4 @@
use rand;
use rand::Rng; use rand::Rng;
use xml::reader::XmlEvent as ReadEvent; use xml::reader::XmlEvent as ReadEvent;
use xml::writer::XmlEvent as WriteEvent; use xml::writer::XmlEvent as WriteEvent;
@@ -13,7 +14,7 @@ impl YaDeserializeTrait for HAProxyId {
ReadEvent::StartElement { ReadEvent::StartElement {
name, attributes, .. name, attributes, ..
} => { } => {
if !attributes.is_empty() { if attributes.len() > 0 {
return Err(String::from( return Err(String::from(
"Attributes not currently supported by HAProxyId", "Attributes not currently supported by HAProxyId",
)); ));

View File

@@ -51,7 +51,7 @@ pub struct OPNsense {
impl From<String> for OPNsense { impl From<String> for OPNsense {
fn from(content: String) -> Self { fn from(content: String) -> Self {
yaserde::de::from_str(&content) yaserde::de::from_str(&content)
.map_err(|e| println!("{}", e)) .map_err(|e| println!("{}", e.to_string()))
.expect("OPNSense received invalid string, should be full XML") .expect("OPNSense received invalid string, should be full XML")
} }
} }
@@ -59,7 +59,7 @@ impl From<String> for OPNsense {
impl OPNsense { impl OPNsense {
pub fn to_xml(&self) -> String { pub fn to_xml(&self) -> String {
to_xml_str(self) to_xml_str(self)
.map_err(|e| error!("{}", e)) .map_err(|e| error!("{}", e.to_string()))
.expect("OPNSense could not serialize to XML") .expect("OPNSense could not serialize to XML")
} }
} }

View File

@@ -6,7 +6,7 @@ readme.workspace = true
license.workspace = true license.workspace = true
[dependencies] [dependencies]
serde = { version = "1.0.123", features = ["derive"] } serde = { version = "1.0.123", features = [ "derive" ] }
log = { workspace = true } log = { workspace = true }
env_logger = { workspace = true } env_logger = { workspace = true }
russh = { workspace = true } russh = { workspace = true }
@@ -18,11 +18,8 @@ opnsense-config-xml = { path = "../opnsense-config-xml" }
chrono = "0.4.38" chrono = "0.4.38"
russh-sftp = "2.0.6" russh-sftp = "2.0.6"
serde_json = "1.0.133" serde_json = "1.0.133"
tokio-util = { version = "0.7.13", features = ["codec"] } tokio-util = { version = "0.7.13", features = [ "codec" ] }
tokio-stream = "0.1.17" tokio-stream = "0.1.17"
[dev-dependencies] [dev-dependencies]
pretty_assertions.workspace = true pretty_assertions.workspace = true
[lints.rust]
unexpected_cfgs = { level = "warn", check-cfg = ['cfg(e2e_test)'] }

View File

@@ -210,7 +210,7 @@ mod tests {
#[tokio::test] #[tokio::test]
async fn test_load_config_from_local_file() { async fn test_load_config_from_local_file() {
for path in [ for path in vec![
"src/tests/data/config-opnsense-25.1.xml", "src/tests/data/config-opnsense-25.1.xml",
"src/tests/data/config-vm-test.xml", "src/tests/data/config-vm-test.xml",
"src/tests/data/config-structure.xml", "src/tests/data/config-structure.xml",
@@ -236,9 +236,9 @@ mod tests {
// Since the order of all fields is not always the same in opnsense config files // Since the order of all fields is not always the same in opnsense config files
// I think it is good enough to have exactly the same amount of the same lines // I think it is good enough to have exactly the same amount of the same lines
[config_file_str.lines().collect::<Vec<_>>()].sort(); let config_file_str_sorted = vec![config_file_str.lines().collect::<Vec<_>>()].sort();
[config_file_str.lines().collect::<Vec<_>>()].sort(); let serialized_sorted = vec![config_file_str.lines().collect::<Vec<_>>()].sort();
assert_eq!((), ()); assert_eq!(config_file_str_sorted, serialized_sorted);
} }
} }
@@ -292,7 +292,7 @@ mod tests {
/// ///
/// * `true` if the package name is found in the CSV string, `false` otherwise. /// * `true` if the package name is found in the CSV string, `false` otherwise.
fn is_package_in_csv(csv_string: &str, package_name: &str) -> bool { fn is_package_in_csv(csv_string: &str, package_name: &str) -> bool {
!package_name.is_empty() && csv_string.split(',').any(|pkg| pkg.trim() == package_name) package_name.len() > 0 && csv_string.split(',').any(|pkg| pkg.trim() == package_name)
} }
#[cfg(test)] #[cfg(test)]

View File

@@ -45,7 +45,7 @@ impl SshConfigManager {
async fn reload_all_services(&self) -> Result<String, Error> { async fn reload_all_services(&self) -> Result<String, Error> {
info!("Reloading all opnsense services"); info!("Reloading all opnsense services");
self.opnsense_shell self.opnsense_shell
.exec("configctl service reload all") .exec(&format!("configctl service reload all"))
.await .await
} }
} }

View File

@@ -1,4 +1,3 @@
#[allow(clippy::module_inception)]
mod config; mod config;
mod manager; mod manager;
mod shell; mod shell;

View File

@@ -107,7 +107,7 @@ impl OPNsenseShell for SshOPNSenseShell {
match result { match result {
Ok(bytes) => { Ok(bytes) => {
if !bytes.is_empty() { if !bytes.is_empty() {
AsyncWriteExt::write_all(&mut remote_file, &bytes).await?; remote_file.write(&bytes).await?;
} }
} }
Err(e) => todo!("Error unhandled {e}"), Err(e) => todo!("Error unhandled {e}"),
@@ -194,9 +194,9 @@ async fn wait_for_completion(channel: &mut Channel<Msg>) -> Result<String, Error
))); )));
} }
} }
russh::ChannelMsg::Success russh::ChannelMsg::Success { .. }
| russh::ChannelMsg::WindowAdjusted { .. } | russh::ChannelMsg::WindowAdjusted { .. }
| russh::ChannelMsg::Eof => {} | russh::ChannelMsg::Eof { .. } => {}
_ => { _ => {
return Err(Error::Unexpected(format!( return Err(Error::Unexpected(format!(
"Russh got unexpected msg {msg:?}" "Russh got unexpected msg {msg:?}"

View File

@@ -64,7 +64,7 @@ impl<'a> DhcpConfig<'a> {
.dhcpd .dhcpd
.elements .elements
.iter_mut() .iter_mut()
.find(|(name, _config)| name == "lan") .find(|(name, _config)| return name == "lan")
.expect("Interface lan should have dhcpd activated") .expect("Interface lan should have dhcpd activated")
.1 .1
} }
@@ -93,7 +93,11 @@ impl<'a> DhcpConfig<'a> {
== ipaddr == ipaddr
&& m.mac == mac && m.mac == mac
}) { }) {
info!("Mapping already exists for {} [{}], skipping", ipaddr, mac); info!(
"Mapping already exists for {} [{}], skipping",
ipaddr.to_string(),
mac
);
return Ok(()); return Ok(());
} }
@@ -141,8 +145,9 @@ impl<'a> DhcpConfig<'a> {
.exec("configctl dhcpd list static") .exec("configctl dhcpd list static")
.await?; .await?;
let value: serde_json::Value = serde_json::from_str(&list_static_output) let value: serde_json::Value = serde_json::from_str(&list_static_output).expect(&format!(
.unwrap_or_else(|_| panic!("Got invalid json from configctl {list_static_output}")); "Got invalid json from configctl {list_static_output}"
));
let static_maps = value["dhcpd"] let static_maps = value["dhcpd"]
.as_array() .as_array()
.ok_or(Error::Command(format!( .ok_or(Error::Command(format!(