Compare commits


3 Commits

Author SHA1 Message Date
2a520a1d7c Merge branch 'master' into feat/detect_k8s_flavour
2025-10-21 15:56:18 +00:00
987f195e2f feat(cert-manager): add cluster issuer to okd cluster score (#157)
added score to install OKD cluster issuer (a usage sketch follows the change summary below)

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/157
2025-10-21 15:55:55 +00:00
14d1823d15 fix: remove ceph osd deletes and purges osd from ceph osd tree (#120)
k8s returns None rather than zero when checking a deployment for replicas
exec_app requires the commands "sh" and "-c" to run correctly (both fixes are sketched after the change summary below)

Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/120
Co-authored-by: Willem <wrolleman@nationtech.io>
Co-committed-by: Willem <wrolleman@nationtech.io>
2025-10-21 15:54:51 +00:00
9 changed files with 290 additions and 32 deletions
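As referenced in the #157 commit message, here is a minimal sketch of how the new ClusterIssuerScore (full source further down this page) could be driven, mirroring the remove-rook-osd example below. This is illustrative, not code from the PR: it assumes the score's fields are public or gain a constructor (they are private as committed), assumes the module path modules::cert_manager::cluster_issuer, and uses placeholder ACME values.

// Hypothetical usage sketch; not part of this PR.
use harmony::{
    inventory::Inventory, modules::cert_manager::cluster_issuer::ClusterIssuerScore,
    topology::K8sAnywhereTopology,
};

#[tokio::main]
async fn main() {
    let issuer_score = ClusterIssuerScore {
        email: "admin@example.com".to_string(),
        server: "https://acme-v02.api.letsencrypt.org/directory".to_string(),
        issuer_name: "letsencrypt-prod".to_string(),
        namespace: "cert-manager".to_string(),
    };

    harmony_cli::run(
        Inventory::autoload(),
        K8sAnywhereTopology::from_env(),
        vec![Box::new(issuer_score)],
        None,
    )
    .await
    .unwrap();
}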
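And for #120, the two behaviors that commit works around, condensed into a self-contained sketch. These helpers are hypothetical, not code from the PR: a scaled-down deployment reports None for its replica counts rather than zero, and commands exec'd into the toolbox pod only run when wrapped in sh -c.

// Hypothetical helpers illustrating the two fixes in #120; not code from the PR.
fn is_scaled_down(replicas: Option<i32>, ready_replicas: Option<i32>) -> bool {
    // Kubernetes omits these status fields at zero, so None means scaled down.
    replicas.is_none() && ready_replicas.is_none()
}

fn purge_command(osd_id: u32) -> Vec<String> {
    // exec into the toolbox must go through a shell: ["sh", "-c", <script>].
    vec![
        "sh".into(),
        "-c".into(),
        format!("ceph osd purge {osd_id} --yes-i-really-mean-it && ceph auth del osd.{osd_id}"),
    ]
}

fn main() {
    assert!(is_scaled_down(None, None));
    assert!(!is_scaled_down(Some(1), Some(0)));
    assert_eq!(purge_command(2).first().map(String::as_str), Some("sh"));
}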

Cargo.lock (generated)

@@ -4613,6 +4613,15 @@ version = "0.8.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
 
+[[package]]
+name = "remove_rook_osd"
+version = "0.1.0"
+dependencies = [
+ "harmony",
+ "harmony_cli",
+ "tokio",
+]
+
 [[package]]
 name = "reqwest"
 version = "0.11.27"


@@ -0,0 +1,11 @@
[package]
name = "example-remove-rook-osd"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true

[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true


@@ -0,0 +1,18 @@
use harmony::{
    inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
    topology::K8sAnywhereTopology,
};

#[tokio::main]
async fn main() {
    let ceph_score = CephRemoveOsd {
        osd_deployment_name: "rook-ceph-osd-2".to_string(),
        rook_ceph_namespace: "rook-ceph".to_string(),
    };

    let topology = K8sAnywhereTopology::from_env();
    let inventory = Inventory::autoload();
    harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
        .await
        .unwrap();
}


@@ -30,6 +30,7 @@ pub enum InterpretName {
     Lamp,
     ApplicationMonitoring,
     K8sPrometheusCrdAlerting,
+    CephRemoveOsd,
     DiscoverInventoryAgent,
     CephClusterHealth,
     Custom(&'static str),
@@ -61,7 +62,8 @@ impl std::fmt::Display for InterpretName {
             InterpretName::Lamp => f.write_str("LAMP"),
             InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
             InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
+            InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
             InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
             InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
             InterpretName::Custom(name) => f.write_str(name),
             InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),


@@ -19,7 +19,7 @@ use kube::{
     api::{ApiResource, GroupVersionKind},
     runtime::wait::await_condition,
 };
-use log::{debug, error, trace};
+use log::{debug, error, info, trace};
 use serde::{Serialize, de::DeserializeOwned};
 use serde_json::{Value, json};
 use similar::TextDiff;
@@ -89,10 +89,13 @@ impl K8sClient {
         namespace: Option<&str>,
     ) -> Result<Option<Deployment>, Error> {
         let deps: Api<Deployment> = if let Some(ns) = namespace {
+            debug!("getting namespaced deployment");
             Api::namespaced(self.client.clone(), ns)
         } else {
+            debug!("getting default namespace deployment");
             Api::default_namespaced(self.client.clone())
         };
+        debug!("getting deployment {} in ns {:?}", name, namespace);
         Ok(deps.get_opt(name).await?)
     }
@@ -123,7 +126,7 @@
             }
         });
         let pp = PatchParams::default();
-        let scale = Patch::Apply(&patch);
+        let scale = Patch::Merge(&patch);
         deployments.patch_scale(name, &pp, &scale).await?;
         Ok(())
     }


@@ -0,0 +1,209 @@
use std::sync::Arc;

use async_trait::async_trait;
use harmony_types::id::Id;
use kube::{CustomResource, api::ObjectMeta};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};

use crate::{
    data::Version,
    interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
    inventory::Inventory,
    score::Score,
    topology::{K8sclient, Topology, k8s::K8sClient},
};

#[derive(Clone, Debug, Serialize)]
pub struct ClusterIssuerScore {
    email: String,
    server: String,
    issuer_name: String,
    namespace: String,
}

impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
    fn name(&self) -> String {
        "ClusterIssuerScore".to_string()
    }

    #[doc(hidden)]
    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
        Box::new(ClusterIssuerInterpret {
            score: self.clone(),
        })
    }
}

#[derive(Debug, Clone)]
pub struct ClusterIssuerInterpret {
    score: ClusterIssuerScore,
}

#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
    async fn execute(
        &self,
        _inventory: &Inventory,
        topology: &T,
    ) -> Result<Outcome, InterpretError> {
        self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
            .await
    }

    fn get_name(&self) -> InterpretName {
        InterpretName::Custom("ClusterIssuer")
    }

    fn get_version(&self) -> Version {
        todo!()
    }

    fn get_status(&self) -> InterpretStatus {
        todo!()
    }

    fn get_children(&self) -> Vec<Id> {
        todo!()
    }
}

impl ClusterIssuerInterpret {
    async fn validate_cert_manager(
        &self,
        client: &Arc<K8sClient>,
    ) -> Result<Outcome, InterpretError> {
        let cert_manager = "cert-manager".to_string();
        let operator_namespace = "openshift-operators".to_string();
        match client
            .get_deployment(&cert_manager, Some(&operator_namespace))
            .await
        {
            Ok(Some(deployment)) => {
                if let Some(status) = deployment.status {
                    let ready_count = status.ready_replicas.unwrap_or(0);
                    if ready_count >= 1 {
                        return Ok(Outcome::success(format!(
                            "'{}' is ready with {} replica(s).",
                            &cert_manager, ready_count
                        )));
                    } else {
                        return Err(InterpretError::new(
                            "cert-manager operator not ready in cluster".to_string(),
                        ));
                    }
                } else {
                    Err(InterpretError::new(format!(
                        "failed to get deployment status {} in ns {}",
                        &cert_manager, &operator_namespace
                    )))
                }
            }
            Ok(None) => Err(InterpretError::new(format!(
                "Deployment '{}' not found in namespace '{}'.",
                &cert_manager, &operator_namespace
            ))),
            Err(e) => Err(InterpretError::new(format!(
                "Failed to query for deployment '{}': {}",
                &cert_manager, e
            ))),
        }
    }

    fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
        let issuer_name = &self.score.issuer_name;
        let email = &self.score.email;
        let server = &self.score.server;
        let namespace = &self.score.namespace;
        let cluster_issuer = ClusterIssuer {
            metadata: ObjectMeta {
                name: Some(issuer_name.to_string()),
                namespace: Some(namespace.to_string()),
                ..Default::default()
            },
            spec: ClusterIssuerSpec {
                acme: AcmeSpec {
                    email: email.to_string(),
                    private_key_secret_ref: PrivateKeySecretRef {
                        name: issuer_name.to_string(),
                    },
                    server: server.to_string(),
                    solvers: vec![SolverSpec {
                        http01: Some(Http01Solver {
                            ingress: Http01Ingress {
                                class: "nginx".to_string(),
                            },
                        }),
                    }],
                },
            },
        };
        Ok(cluster_issuer)
    }

    pub async fn apply_cluster_issuer(
        &self,
        client: Arc<K8sClient>,
    ) -> Result<Outcome, InterpretError> {
        let namespace = self.score.namespace.clone();
        self.validate_cert_manager(&client).await?;
        let cluster_issuer = self.build_cluster_issuer().unwrap();
        client
            .apply_yaml(
                &serde_yaml::to_value(cluster_issuer).unwrap(),
                Some(&namespace),
            )
            .await?;
        Ok(Outcome::success(format!(
            "successfully deployed cluster issuer: {} in namespace: {}",
            self.score.issuer_name, self.score.namespace
        )))
    }
}

#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
    group = "cert-manager.io",
    version = "v1",
    kind = "ClusterIssuer",
    plural = "clusterissuers"
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterIssuerSpec {
    pub acme: AcmeSpec,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct AcmeSpec {
    pub email: String,
    pub private_key_secret_ref: PrivateKeySecretRef,
    pub server: String,
    pub solvers: Vec<SolverSpec>,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct PrivateKeySecretRef {
    pub name: String,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct SolverSpec {
    pub http01: Option<Http01Solver>,
    // Other solver types (e.g., dns01) would go here as Options
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Solver {
    pub ingress: Http01Ingress,
}

#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct Http01Ingress {
    pub class: String,
}
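Because the CRD structs above derive Serialize with camelCase renaming, the manifest they produce can be checked directly. A sketch with placeholder values (assumes the structs from this file are in scope, plus serde_yaml, which the interpret already uses):

fn main() {
    // Placeholder values; fields like privateKeySecretRef and http01 serialize camelCased.
    let spec = ClusterIssuerSpec {
        acme: AcmeSpec {
            email: "admin@example.com".to_string(),
            private_key_secret_ref: PrivateKeySecretRef {
                name: "letsencrypt-prod".to_string(),
            },
            server: "https://acme-v02.api.letsencrypt.org/directory".to_string(),
            solvers: vec![SolverSpec {
                http01: Some(Http01Solver {
                    ingress: Http01Ingress {
                        class: "nginx".to_string(),
                    },
                }),
            }],
        },
    };
    println!("{}", serde_yaml::to_string(&spec).unwrap());
}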


@@ -1,2 +1,3 @@
+pub mod cluster_issuer;
 mod helm;
 pub use helm::*;


@@ -4,7 +4,7 @@ use std::{
 };
 
 use async_trait::async_trait;
-use log::{info, warn};
+use log::{debug, warn};
 use serde::{Deserialize, Serialize};
 use tokio::time::sleep;
@@ -19,8 +19,8 @@ use harmony_types::id::Id;
 
 #[derive(Debug, Clone, Serialize)]
 pub struct CephRemoveOsd {
-    osd_deployment_name: String,
-    rook_ceph_namespace: String,
+    pub osd_deployment_name: String,
+    pub rook_ceph_namespace: String,
 }
 
 impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
@@ -54,18 +54,17 @@
         self.verify_deployment_scaled(client.clone()).await?;
         self.delete_deployment(client.clone()).await?;
         self.verify_deployment_deleted(client.clone()).await?;
-        let osd_id_full = self.get_ceph_osd_id().unwrap();
-        self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
-        self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
-            .await?;
+        self.purge_ceph_osd(client.clone()).await?;
+        self.verify_ceph_osd_removal(client.clone()).await?;
+        let osd_id_full = self.get_ceph_osd_id().unwrap();
 
         Ok(Outcome::success(format!(
             "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
             osd_id_full, self.score.osd_deployment_name
         )))
     }
 
     fn get_name(&self) -> InterpretName {
-        todo!()
+        InterpretName::CephRemoveOsd
     }
 
     fn get_version(&self) -> Version {
@@ -82,7 +81,7 @@
 }
 
 impl CephRemoveOsdInterpret {
-    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+    pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
         let osd_id_numeric = self
             .score
             .osd_deployment_name
@@ -94,9 +93,14 @@
                     self.score.osd_deployment_name
                 ))
             })?;
+        Ok(osd_id_numeric.to_string())
+    }
+
+    pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
+        let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
         let osd_id_full = format!("osd.{}", osd_id_numeric);
-        info!(
+        debug!(
             "Targeting Ceph OSD: {} (parsed from deployment {})",
             osd_id_full, self.score.osd_deployment_name
         );
@@ -108,6 +112,7 @@
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
+        debug!("verifying toolbox exists");
         let toolbox_dep = "rook-ceph-tools".to_string();
 
         match client
@@ -149,7 +154,7 @@
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        info!(
+        debug!(
             "Scaling down OSD deployment: {}",
             self.score.osd_deployment_name
         );
@@ -172,7 +177,7 @@
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
 
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        debug!("Waiting for OSD deployment to scale down to 0 replicas");
         loop {
             let dep = client
                 .get_deployment(
@@ -180,11 +185,9 @@
                     Some(&self.score.rook_ceph_namespace),
                 )
                 .await?;
-
             if let Some(deployment) = dep {
                 if let Some(status) = deployment.status {
-                    if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
-                    {
+                    if status.replicas.is_none() && status.ready_replicas.is_none() {
                         return Ok(Outcome::success(
                             "Deployment successfully scaled down.".to_string(),
                         ));
@@ -212,7 +215,7 @@
         &self,
         client: Arc<K8sClient>,
     ) -> Result<Outcome, InterpretError> {
-        info!(
+        debug!(
             "Deleting OSD deployment: {}",
             self.score.osd_deployment_name
         );
@@ -234,7 +237,7 @@
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
 
-        info!("Waiting for OSD deployment to scale down to 0 replicas");
+        debug!("Verifying OSD deployment deleted");
         loop {
             let dep = client
                 .get_deployment(
@@ -244,7 +247,7 @@
                 .await?;
 
             if dep.is_none() {
-                info!(
+                debug!(
                     "Deployment {} successfully deleted.",
                     self.score.osd_deployment_name
                 );
@@ -276,12 +279,10 @@
         Ok(tree)
     }
 
-    pub async fn purge_ceph_osd(
-        &self,
-        client: Arc<K8sClient>,
-        osd_id_full: &str,
-    ) -> Result<Outcome, InterpretError> {
-        info!(
+    pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
+        let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
+        let osd_id_full = self.get_ceph_osd_id().unwrap();
+        debug!(
             "Purging OSD {} from Ceph cluster and removing its auth key",
             osd_id_full
         );
@@ -291,8 +292,9 @@
                 "app".to_string(),
                 Some(&self.score.rook_ceph_namespace),
                 vec![
-                    format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
-                    format!("ceph auth del osd.{osd_id_full}").as_str(),
+                    "sh",
+                    "-c",
+                    format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
                 ],
             )
             .await?;
@@ -305,10 +307,10 @@
     pub async fn verify_ceph_osd_removal(
         &self,
         client: Arc<K8sClient>,
-        osd_id_full: &str,
     ) -> Result<Outcome, InterpretError> {
         let (timeout, interval, start) = self.build_timer();
-        info!(
+        let osd_id_full = self.get_ceph_osd_id().unwrap();
+        debug!(
             "Verifying OSD {} has been removed from the Ceph tree...",
             osd_id_full
         );
@@ -318,7 +320,7 @@
                 "rook-ceph-tools".to_string(),
                 "app".to_string(),
                 Some(&self.score.rook_ceph_namespace),
-                vec!["ceph osd tree -f json"],
+                vec!["sh", "-c", "ceph osd tree -f json"],
             )
             .await?;
 
         let tree =


@@ -1,2 +1,2 @@
-pub mod ceph_osd_replacement_score;
+pub mod ceph_remove_osd_score;
 pub mod ceph_validate_health_score;