fix: remove ceph osd deletes and purges osd from ceph osd tree\ (#120)
k8s returns None rather than zero when checking deployment for replicas exec_app requires commands 's' and '-c' to run correctly Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/120 Co-authored-by: Willem <wrolleman@nationtech.io> Co-committed-by: Willem <wrolleman@nationtech.io>
This commit is contained in:
parent
ed7f81aa1f
commit
14d1823d15
9
Cargo.lock
generated
9
Cargo.lock
generated
@ -4613,6 +4613,15 @@ version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
|
||||
|
||||
[[package]]
|
||||
name = "remove_rook_osd"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"harmony",
|
||||
"harmony_cli",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reqwest"
|
||||
version = "0.11.27"
|
||||
|
11
examples/remove_rook_osd/Cargo.toml
Normal file
11
examples/remove_rook_osd/Cargo.toml
Normal file
@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "example-remove-rook-osd"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
harmony = { version = "0.1.0", path = "../../harmony" }
|
||||
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
|
||||
tokio.workspace = true
|
18
examples/remove_rook_osd/src/main.rs
Normal file
18
examples/remove_rook_osd/src/main.rs
Normal file
@ -0,0 +1,18 @@
|
||||
use harmony::{
|
||||
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let ceph_score = CephRemoveOsd {
|
||||
osd_deployment_name: "rook-ceph-osd-2".to_string(),
|
||||
rook_ceph_namespace: "rook-ceph".to_string(),
|
||||
};
|
||||
|
||||
let topology = K8sAnywhereTopology::from_env();
|
||||
let inventory = Inventory::autoload();
|
||||
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
@ -30,6 +30,7 @@ pub enum InterpretName {
|
||||
Lamp,
|
||||
ApplicationMonitoring,
|
||||
K8sPrometheusCrdAlerting,
|
||||
CephRemoveOsd,
|
||||
DiscoverInventoryAgent,
|
||||
CephClusterHealth,
|
||||
Custom(&'static str),
|
||||
@ -61,7 +62,11 @@ impl std::fmt::Display for InterpretName {
|
||||
InterpretName::Lamp => f.write_str("LAMP"),
|
||||
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
|
||||
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
|
||||
<<<<<<< HEAD
|
||||
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
|
||||
=======
|
||||
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
|
||||
>>>>>>> origin/master
|
||||
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
|
||||
InterpretName::Custom(name) => f.write_str(name),
|
||||
InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),
|
||||
|
@ -21,7 +21,7 @@ use kube::{
|
||||
api::{ApiResource, GroupVersionKind},
|
||||
runtime::wait::await_condition,
|
||||
};
|
||||
use log::{debug, error, trace};
|
||||
use log::{debug, error, info, trace};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use serde_json::{Value, json};
|
||||
use similar::TextDiff;
|
||||
@ -80,10 +80,13 @@ impl K8sClient {
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Option<Deployment>, Error> {
|
||||
let deps: Api<Deployment> = if let Some(ns) = namespace {
|
||||
debug!("getting namespaced deployment");
|
||||
Api::namespaced(self.client.clone(), ns)
|
||||
} else {
|
||||
debug!("getting default namespace deployment");
|
||||
Api::default_namespaced(self.client.clone())
|
||||
};
|
||||
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
|
||||
Ok(deps.get_opt(name).await?)
|
||||
}
|
||||
|
||||
@ -114,7 +117,7 @@ impl K8sClient {
|
||||
}
|
||||
});
|
||||
let pp = PatchParams::default();
|
||||
let scale = Patch::Apply(&patch);
|
||||
let scale = Patch::Merge(&patch);
|
||||
deployments.patch_scale(name, &pp, &scale).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
@ -4,7 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{info, warn};
|
||||
use log::{debug, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::sleep;
|
||||
|
||||
@ -19,8 +19,8 @@ use harmony_types::id::Id;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct CephRemoveOsd {
|
||||
osd_deployment_name: String,
|
||||
rook_ceph_namespace: String,
|
||||
pub osd_deployment_name: String,
|
||||
pub rook_ceph_namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
|
||||
@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
||||
self.verify_deployment_scaled(client.clone()).await?;
|
||||
self.delete_deployment(client.clone()).await?;
|
||||
self.verify_deployment_deleted(client.clone()).await?;
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
|
||||
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
|
||||
.await?;
|
||||
self.purge_ceph_osd(client.clone()).await?;
|
||||
self.verify_ceph_osd_removal(client.clone()).await?;
|
||||
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
Ok(Outcome::success(format!(
|
||||
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
|
||||
osd_id_full, self.score.osd_deployment_name
|
||||
)))
|
||||
}
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
InterpretName::CephRemoveOsd
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
||||
}
|
||||
|
||||
impl CephRemoveOsdInterpret {
|
||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
|
||||
let osd_id_numeric = self
|
||||
.score
|
||||
.osd_deployment_name
|
||||
@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
|
||||
self.score.osd_deployment_name
|
||||
))
|
||||
})?;
|
||||
Ok(osd_id_numeric.to_string())
|
||||
}
|
||||
|
||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||
let osd_id_full = format!("osd.{}", osd_id_numeric);
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
"Targeting Ceph OSD: {} (parsed from deployment {})",
|
||||
osd_id_full, self.score.osd_deployment_name
|
||||
);
|
||||
@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
debug!("verifying toolbox exists");
|
||||
let toolbox_dep = "rook-ceph-tools".to_string();
|
||||
|
||||
match client
|
||||
@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
debug!(
|
||||
"Scaling down OSD deployment: {}",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
|
||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
debug!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
loop {
|
||||
let dep = client
|
||||
.get_deployment(
|
||||
@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(deployment) = dep {
|
||||
if let Some(status) = deployment.status {
|
||||
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
|
||||
{
|
||||
if status.replicas == None && status.ready_replicas == None {
|
||||
return Ok(Outcome::success(
|
||||
"Deployment successfully scaled down.".to_string(),
|
||||
));
|
||||
@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
debug!(
|
||||
"Deleting OSD deployment: {}",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
|
||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
debug!("Verifying OSD deployment deleted");
|
||||
loop {
|
||||
let dep = client
|
||||
.get_deployment(
|
||||
@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
|
||||
.await?;
|
||||
|
||||
if dep.is_none() {
|
||||
info!(
|
||||
debug!(
|
||||
"Deployment {} successfully deleted.",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
|
||||
Ok(tree)
|
||||
}
|
||||
|
||||
pub async fn purge_ceph_osd(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
osd_id_full: &str,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
|
||||
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
debug!(
|
||||
"Purging OSD {} from Ceph cluster and removing its auth key",
|
||||
osd_id_full
|
||||
);
|
||||
@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec![
|
||||
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
|
||||
format!("ceph auth del osd.{osd_id_full}").as_str(),
|
||||
"sh",
|
||||
"-c",
|
||||
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
|
||||
pub async fn verify_ceph_osd_removal(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
osd_id_full: &str,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
info!(
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
debug!(
|
||||
"Verifying OSD {} has been removed from the Ceph tree...",
|
||||
osd_id_full
|
||||
);
|
||||
@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
|
||||
"rook-ceph-tools".to_string(),
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec!["ceph osd tree -f json"],
|
||||
vec!["sh", "-c", "ceph osd tree -f json"],
|
||||
)
|
||||
.await?;
|
||||
let tree =
|
@ -1,2 +1,2 @@
|
||||
pub mod ceph_osd_replacement_score;
|
||||
pub mod ceph_remove_osd_score;
|
||||
pub mod ceph_validate_health_score;
|
||||
|
Loading…
Reference in New Issue
Block a user