diff --git a/Cargo.lock b/Cargo.lock
index 429f09b..666fe3a 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -4613,6 +4613,15 @@ version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
 
+[[package]]
+name = "remove_rook_osd"
+version = "0.1.0"
+dependencies = [
+ "harmony",
+ "harmony_cli",
+ "tokio",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.27"
diff --git a/examples/remove_rook_osd/Cargo.toml b/examples/remove_rook_osd/Cargo.toml
new file mode 100644
index 0000000..6e35ac0
--- /dev/null
+++ b/examples/remove_rook_osd/Cargo.toml
@@ -0,0 +1,11 @@
+[package]
+name = "remove_rook_osd"
+edition = "2024"
+version.workspace = true
+readme.workspace = true
+license.workspace = true
+
+[dependencies]
+harmony = { version = "0.1.0", path = "../../harmony" }
+harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
+tokio.workspace = true
diff --git a/examples/remove_rook_osd/src/main.rs b/examples/remove_rook_osd/src/main.rs
new file mode 100644
index 0000000..2794927
--- /dev/null
+++ b/examples/remove_rook_osd/src/main.rs
@@ -0,0 +1,18 @@
+use harmony::{
+    inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
+    topology::K8sAnywhereTopology,
+};
+
+#[tokio::main]
+async fn main() {
+    let ceph_score = CephRemoveOsd {
+        osd_deployment_name: "rook-ceph-osd-2".to_string(),
+        rook_ceph_namespace: "rook-ceph".to_string(),
+    };
+
+    let topology = K8sAnywhereTopology::from_env();
+    let inventory = Inventory::autoload();
+    harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
+        .await
+        .unwrap();
+}
diff --git a/harmony/src/domain/interpret/mod.rs b/harmony/src/domain/interpret/mod.rs
index d555d9e..ad3dac3 100644
--- a/harmony/src/domain/interpret/mod.rs
+++ b/harmony/src/domain/interpret/mod.rs
@@ -30,6 +30,7 @@ pub enum InterpretName {
     Lamp,
     ApplicationMonitoring,
     K8sPrometheusCrdAlerting,
+    CephRemoveOsd,
     DiscoverInventoryAgent,
     CephClusterHealth,
     Custom(&'static str),
@@ -61,7 +62,8 @@ impl std::fmt::Display for InterpretName {
             InterpretName::Lamp => f.write_str("LAMP"),
             InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
             InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
+            InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
             InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
             InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
             InterpretName::Custom(name) => f.write_str(name),
             InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),
diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs
index 4f99eda..fc96d76 100644
--- a/harmony/src/domain/topology/k8s.rs
+++ b/harmony/src/domain/topology/k8s.rs
@@ -19,7 +19,7 @@ use kube::{
     api::{ApiResource, GroupVersionKind},
     runtime::wait::await_condition,
 };
-use log::{debug, error, trace};
+use log::{debug, error, info, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::{Value, json};
 use similar::TextDiff;
@@ -89,10 +89,13 @@ impl K8sClient {
         namespace: Option<&str>,
     ) -> Result<Option<Deployment>, Error> {
         let deps: Api<Deployment> = if let Some(ns) = namespace {
+            debug!("getting namespaced deployment");
             Api::namespaced(self.client.clone(), ns)
         } else {
+            debug!("getting default namespace deployment");
             Api::default_namespaced(self.client.clone())
         };
+        debug!("getting deployment {} in ns {:?}", name, namespace);
         Ok(deps.get_opt(name).await?)
     }
 
@@ -123,7 +126,7 @@ impl K8sClient {
             }
         });
         let pp = PatchParams::default();
-        let scale = Patch::Apply(&patch);
+        let scale = Patch::Merge(&patch);
         deployments.patch_scale(name, &pp, &scale).await?;
         Ok(())
     }
diff --git a/harmony/src/modules/cert_manager/cluster_issuer.rs b/harmony/src/modules/cert_manager/cluster_issuer.rs
new file mode 100644
index 0000000..70294fe
--- /dev/null
+++ b/harmony/src/modules/cert_manager/cluster_issuer.rs
@@ -0,0 +1,206 @@
+use std::sync::Arc;
+
+use async_trait::async_trait;
+use harmony_types::id::Id;
+use kube::{CustomResource, api::ObjectMeta};
+use schemars::JsonSchema;
+use serde::{Deserialize, Serialize};
+
+use crate::{
+    data::Version,
+    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
+    inventory::Inventory,
+    score::Score,
+    topology::{K8sclient, Topology, k8s::K8sClient},
+};
+
+#[derive(Clone, Debug, Serialize)]
+pub struct ClusterIssuerScore {
+    email: String,
+    server: String,
+    issuer_name: String,
+    namespace: String,
+}
+
+impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
+    fn name(&self) -> String {
+        "ClusterIssuerScore".to_string()
+    }
+
+    #[doc(hidden)]
+    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
+        Box::new(ClusterIssuerInterpret {
+            score: self.clone(),
+        })
+    }
+}
+
+#[derive(Debug, Clone)]
+pub struct ClusterIssuerInterpret {
+    score: ClusterIssuerScore,
+}
+
+#[async_trait]
+impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
+    async fn execute(
+        &self,
+        _inventory: &Inventory,
+        topology: &T,
+    ) -> Result<Outcome, InterpretError> {
+        self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
+            .await
+    }
+
+    fn get_name(&self) -> InterpretName {
+        InterpretName::Custom("ClusterIssuer")
+    }
+
+    fn get_version(&self) -> Version {
+        todo!()
+    }
+
+    fn get_status(&self) -> InterpretStatus {
+        todo!()
+    }
+
+    fn get_children(&self) -> Vec<Id> {
+        todo!()
+    }
+}
+
+impl ClusterIssuerInterpret {
+    async fn validate_cert_manager(
+        &self,
+        client: &Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let cert_manager = "cert-manager".to_string();
+        let operator_namespace = "openshift-operators".to_string();
+        match client
+            .get_deployment(&cert_manager, Some(&operator_namespace))
+            .await
+        {
+            Ok(Some(deployment)) => {
+                if let Some(status) = deployment.status {
+                    let ready_count = status.ready_replicas.unwrap_or(0);
+                    if ready_count >= 1 {
+                        Ok(Outcome::success(format!(
+                            "'{}' is ready with {} replica(s).",
+                            &cert_manager, ready_count
+                        )))
+                    } else {
+                        Err(InterpretError::new(
+                            "cert-manager operator not ready in cluster".to_string(),
+                        ))
+                    }
+                } else {
+                    Err(InterpretError::new(format!(
+                        "failed to get deployment status for {} in ns {}",
+                        &cert_manager, &operator_namespace
+                    )))
+                }
+            }
+            Ok(None) => Err(InterpretError::new(format!(
+                "Deployment '{}' not found in namespace '{}'.",
+                &cert_manager, &operator_namespace
+            ))),
+            Err(e) => Err(InterpretError::new(format!(
+                "Failed to query for deployment '{}': {}",
+                &cert_manager, e
+            ))),
+        }
+    }
+
+    fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
+        let issuer_name = &self.score.issuer_name;
+        let email = &self.score.email;
+        let server = &self.score.server;
+        let namespace = &self.score.namespace;
+        let cluster_issuer = ClusterIssuer {
+            metadata: ObjectMeta {
+                name: Some(issuer_name.to_string()),
+                namespace: Some(namespace.to_string()),
+                ..Default::default()
+            },
+            spec: ClusterIssuerSpec {
+                acme: AcmeSpec {
+                    email: email.to_string(),
+                    private_key_secret_ref: PrivateKeySecretRef {
+                        name: issuer_name.to_string(),
+                    },
+                    server: server.to_string(),
+                    solvers: vec![SolverSpec {
+                        http01: Some(Http01Solver {
+                            ingress: Http01Ingress {
+                                class: "nginx".to_string(),
+                            },
+                        }),
+                    }],
+                },
+            },
+        };
+
+        Ok(cluster_issuer)
+    }
+
+    pub async fn apply_cluster_issuer(
+        &self,
+        client: Arc<K8sClient>,
+    ) -> Result<Outcome, InterpretError> {
+        let namespace = self.score.namespace.clone();
+        self.validate_cert_manager(&client).await?;
+        let cluster_issuer = self.build_cluster_issuer()?;
+        let yaml = serde_yaml::to_value(&cluster_issuer)
+            .map_err(|e| InterpretError::new(format!("failed to serialize ClusterIssuer: {e}")))?;
+        client.apply_yaml(&yaml, Some(&namespace)).await?;
+        Ok(Outcome::success(format!(
+            "successfully deployed cluster issuer: {} in namespace: {}",
+            self.score.issuer_name, self.score.namespace
+        )))
+    }
+}
+
+#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
+#[kube(
+    group = "cert-manager.io",
+    version = "v1",
+    kind = "ClusterIssuer",
+    plural = "clusterissuers"
+)]
+#[serde(rename_all = "camelCase")]
+pub struct ClusterIssuerSpec {
+    pub acme: AcmeSpec,
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct AcmeSpec {
+    pub email: String,
+    pub private_key_secret_ref: PrivateKeySecretRef,
+    pub server: String,
+    pub solvers: Vec<SolverSpec>,
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct PrivateKeySecretRef {
+    pub name: String,
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct SolverSpec {
+    pub http01: Option<Http01Solver>,
+    // Other solver types (e.g., dns01) would go here as Options
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Http01Solver {
+    pub ingress: Http01Ingress,
+}
+
+#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
+#[serde(rename_all = "camelCase")]
+pub struct Http01Ingress {
+    pub class: String,
+}
diff --git a/harmony/src/modules/cert_manager/mod.rs b/harmony/src/modules/cert_manager/mod.rs
index 8fd309a..032439e 100644
--- a/harmony/src/modules/cert_manager/mod.rs
+++ b/harmony/src/modules/cert_manager/mod.rs
@@ -1,2 +1,3 @@
+pub mod cluster_issuer;
 mod helm;
 pub use helm::*;
diff --git a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs b/harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs
similarity index 90%
rename from harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
rename to harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs
index 77dd24a..787f9cc 100644
--- a/harmony/src/modules/storage/ceph/ceph_osd_replacement_score.rs
+++ b/harmony/src/modules/storage/ceph/ceph_remove_osd_score.rs
@@ -4,7 +4,7 @@ use std::{
 };
 
 use async_trait::async_trait;
-use log::{info, warn};
+use log::{debug, warn};
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
 
@@ -19,8 +19,8 @@ use harmony_types::id::Id;
 
 #[derive(Debug, Clone, Serialize)]
 pub struct CephRemoveOsd {
-    osd_deployment_name: String,
-    rook_ceph_namespace: String,
+    pub osd_deployment_name: String,
+    pub rook_ceph_namespace: String,
 }
 
 impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
         self.verify_deployment_scaled(client.clone()).await?;
         self.delete_deployment(client.clone()).await?;
         self.verify_deployment_deleted(client.clone()).await?;
-        let osd_id_full = self.get_ceph_osd_id().unwrap();
-        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
-        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
-            .await?;
+        self.purge_ceph_osd(client.clone()).await?;
+        self.verify_ceph_osd_removal(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id()?;
         Ok(Outcome::success(format!(
             "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
             osd_id_full, self.score.osd_deployment_name
         )))
     }
 
     fn get_name(&self) -> InterpretName {
-        todo!()
+        InterpretName::CephRemoveOsd
     }
 
     fn get_version(&self) -> Version {
@@ -82,7 +81,7 @@
 }
 
 impl CephRemoveOsdInterpret {
-    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+    pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
         let osd_id_numeric = self
             .score
             .osd_deployment_name
@@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
                 self.score.osd_deployment_name
             ))
         })?;
+        Ok(osd_id_numeric.to_string())
+    }
+
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self.get_ceph_osd_id_numeric()?;
         let osd_id_full = format!("osd.{}", osd_id_numeric);
-        info!(
+        debug!(
             "Targeting Ceph OSD: {} (parsed from deployment {})",
             osd_id_full, self.score.osd_deployment_name
         );
 
@@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
+        debug!("verifying toolbox exists");
         let toolbox_dep = "rook-ceph-tools".to_string();
 
         match client
@@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        info!(
+        debug!(
             "Scaling down OSD deployment: {}",
             self.score.osd_deployment_name
         );
@@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
 
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        debug!("Waiting for OSD deployment to scale down to 0 replicas");
         loop {
             let dep = client
                 .get_deployment(
@@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
                     Some(&self.score.rook_ceph_namespace),
                 )
                 .await?;
-
             if let Some(deployment) = dep {
                 if let Some(status) = deployment.status {
-                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
-                    {
+                    if status.replicas.is_none() && status.ready_replicas.is_none() {
                         return Ok(Outcome::success(
                             "Deployment successfully scaled down.".to_string(),
                         ));
@@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        info!(
+        debug!(
             "Deleting OSD deployment: {}",
             self.score.osd_deployment_name
         );
@@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
 
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        debug!("Verifying OSD deployment deleted");
         loop {
             let dep = client
                 .get_deployment(
@@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
                 .await?;
 
             if dep.is_none() {
-                info!(
+                debug!(
                     "Deployment {} successfully deleted.",
                     self.score.osd_deployment_name
                 );
@@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
         Ok(tree)
     }
 
-    pub async fn purge_ceph_osd(
-        &self,
-        client: Arc<K8sClient>,
-        osd_id_full: &str,
-    ) -> Result<Outcome, InterpretError> {
-        info!(
+    pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
+        let osd_id_numeric = self.get_ceph_osd_id_numeric()?;
+        let osd_id_full = self.get_ceph_osd_id()?;
+        debug!(
             "Purging OSD {} from Ceph cluster and removing its auth key",
             osd_id_full
         );
@@ -291,9 +292,10 @@ impl CephRemoveOsdInterpret {
                 "app".to_string(),
                 Some(&self.score.rook_ceph_namespace),
                 vec![
-                    format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
-                    format!("ceph auth del osd.{osd_id_full}").as_str(),
+                    "sh",
+                    "-c",
+                    format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
                 ],
             )
             .await?;
 
         Ok(Outcome::success(format!(
@@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
     pub async fn verify_ceph_osd_removal(
         &self,
         client: Arc<K8sClient>,
-        osd_id_full: &str,
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
-        info!(
+        let osd_id_full = self.get_ceph_osd_id()?;
+        debug!(
             "Verifying OSD {} has been removed from the Ceph tree...",
             osd_id_full
         );
@@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
                 "rook-ceph-tools".to_string(),
                 "app".to_string(),
                 Some(&self.score.rook_ceph_namespace),
-                vec!["ceph osd tree -f json"],
+                vec!["sh", "-c", "ceph osd tree -f json"],
             )
             .await?;
             let tree =
diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs
index 3e3250e..0a3dcec 100644
--- a/harmony/src/modules/storage/ceph/mod.rs
+++ b/harmony/src/modules/storage/ceph/mod.rs
@@ -1,2 +1,2 @@
-pub mod ceph_osd_replacement_score;
+pub mod ceph_remove_osd_score;
 pub mod ceph_validate_health_score;
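
Reviewer note: a minimal usage sketch of the new ClusterIssuerScore, mirroring the remove_rook_osd example above. This is not part of the diff: the field values are placeholder assumptions, and it presumes ClusterIssuerScore's fields are made pub (as was done for CephRemoveOsd), since they are private as written.

    // Hypothetical example binary, not included in this change.
    // Assumes ClusterIssuerScore { pub email, pub server, pub issuer_name, pub namespace }.
    use harmony::{
        inventory::Inventory, modules::cert_manager::cluster_issuer::ClusterIssuerScore,
        topology::K8sAnywhereTopology,
    };

    #[tokio::main]
    async fn main() {
        let issuer_score = ClusterIssuerScore {
            email: "admin@example.com".to_string(), // assumed value
            server: "https://acme-v02.api.letsencrypt.org/directory".to_string(), // assumed value
            issuer_name: "letsencrypt-prod".to_string(), // assumed value
            namespace: "cert-manager".to_string(),   // assumed value
        };

        // Same harness as the remove_rook_osd example in this diff.
        let topology = K8sAnywhereTopology::from_env();
        let inventory = Inventory::autoload();
        harmony_cli::run(inventory, topology, vec![Box::new(issuer_score)], None)
            .await
            .unwrap();
    }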