fix: remove ceph osd deletes and purges osd from ceph osd tree #120
10
Cargo.lock
generated
10
Cargo.lock
generated
@ -1700,6 +1700,16 @@ dependencies = [
|
|||||||
"url",
|
"url",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "example_remove_rook_osd"
|
||||||
|
version = "0.1.0"
|
||||||
|
dependencies = [
|
||||||
|
"harmony",
|
||||||
|
"harmony_cli",
|
||||||
|
"harmony_tui",
|
||||||
|
"tokio",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "eyre"
|
name = "eyre"
|
||||||
version = "0.6.12"
|
version = "0.6.12"
|
||||||
|
@ -32,6 +32,7 @@ pub enum InterpretName {
|
|||||||
Lamp,
|
Lamp,
|
||||||
ApplicationMonitoring,
|
ApplicationMonitoring,
|
||||||
K8sPrometheusCrdAlerting,
|
K8sPrometheusCrdAlerting,
|
||||||
|
CephRemoveOsd,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl std::fmt::Display for InterpretName {
|
impl std::fmt::Display for InterpretName {
|
||||||
@ -58,6 +59,7 @@ impl std::fmt::Display for InterpretName {
|
|||||||
InterpretName::Lamp => f.write_str("LAMP"),
|
InterpretName::Lamp => f.write_str("LAMP"),
|
||||||
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
|
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
|
||||||
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
|
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
|
||||||
|
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -15,7 +15,7 @@ use kube::{
|
|||||||
api::{ApiResource, GroupVersionKind},
|
api::{ApiResource, GroupVersionKind},
|
||||||
runtime::wait::await_condition,
|
runtime::wait::await_condition,
|
||||||
};
|
};
|
||||||
use log::{debug, error, trace};
|
use log::{debug, error, info, trace};
|
||||||
use serde::{Serialize, de::DeserializeOwned};
|
use serde::{Serialize, de::DeserializeOwned};
|
||||||
use serde_json::json;
|
use serde_json::json;
|
||||||
use similar::TextDiff;
|
use similar::TextDiff;
|
||||||
@ -59,10 +59,17 @@ impl K8sClient {
|
|||||||
namespace: Option<&str>,
|
namespace: Option<&str>,
|
||||||
) -> Result<Option<Deployment>, Error> {
|
) -> Result<Option<Deployment>, Error> {
|
||||||
let deps: Api<Deployment> = if let Some(ns) = namespace {
|
let deps: Api<Deployment> = if let Some(ns) = namespace {
|
||||||
|
debug!("getting namespaced deployment");
|
||||||
Api::namespaced(self.client.clone(), ns)
|
Api::namespaced(self.client.clone(), ns)
|
||||||
} else {
|
} else {
|
||||||
|
debug!("getting default namespace deployment");
|
||||||
Api::default_namespaced(self.client.clone())
|
Api::default_namespaced(self.client.clone())
|
||||||
};
|
};
|
||||||
|
debug!(
|
||||||
|
"getting deployment {} in ns {}",
|
||||||
|
name,
|
||||||
|
namespace.unwrap()
|
||||||
|
);
|
||||||
Ok(deps.get_opt(name).await?)
|
Ok(deps.get_opt(name).await?)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -93,7 +100,7 @@ impl K8sClient {
|
|||||||
}
|
}
|
||||||
});
|
});
|
||||||
let pp = PatchParams::default();
|
let pp = PatchParams::default();
|
||||||
let scale = Patch::Apply(&patch);
|
let scale = Patch::Merge(&patch);
|
||||||
deployments.patch_scale(name, &pp, &scale).await?;
|
deployments.patch_scale(name, &pp, &scale).await?;
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,11 +1,10 @@
|
|||||||
use std::{
|
use std::{
|
||||||
process::Command,
|
|
||||||
sync::Arc,
|
sync::Arc,
|
||||||
time::{Duration, Instant},
|
time::{Duration, Instant},
|
||||||
};
|
};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use log::{info, warn};
|
use log::{debug, warn};
|
||||||
use serde::{Deserialize, Serialize};
|
use serde::{Deserialize, Serialize};
|
||||||
use tokio::time::sleep;
|
use tokio::time::sleep;
|
||||||
|
|
||||||
@ -19,8 +18,8 @@ use crate::{
|
|||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct CephRemoveOsd {
|
pub struct CephRemoveOsd {
|
||||||
osd_deployment_name: String,
|
pub osd_deployment_name: String,
|
||||||
rook_ceph_namespace: String,
|
pub rook_ceph_namespace: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
|
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
|
||||||
@ -54,18 +53,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
|||||||
self.verify_deployment_scaled(client.clone()).await?;
|
self.verify_deployment_scaled(client.clone()).await?;
|
||||||
self.delete_deployment(client.clone()).await?;
|
self.delete_deployment(client.clone()).await?;
|
||||||
self.verify_deployment_deleted(client.clone()).await?;
|
self.verify_deployment_deleted(client.clone()).await?;
|
||||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
self.purge_ceph_osd(client.clone()).await?;
|
||||||
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
|
self.verify_ceph_osd_removal(client.clone()).await?;
|
||||||
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
|
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||||
Ok(Outcome::success(format!(
|
Ok(Outcome::success(format!(
|
||||||
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
|
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
|
||||||
osd_id_full, self.score.osd_deployment_name
|
osd_id_full, self.score.osd_deployment_name
|
||||||
)))
|
)))
|
||||||
}
|
}
|
||||||
fn get_name(&self) -> InterpretName {
|
fn get_name(&self) -> InterpretName {
|
||||||
todo!()
|
InterpretName::CephRemoveOsd
|
||||||
}
|
}
|
||||||
|
|
||||||
fn get_version(&self) -> Version {
|
fn get_version(&self) -> Version {
|
||||||
@ -82,7 +80,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
|||||||
}
|
}
|
||||||
|
|
||||||
impl CephRemoveOsdInterpret {
|
impl CephRemoveOsdInterpret {
|
||||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
|
||||||
let osd_id_numeric = self
|
let osd_id_numeric = self
|
||||||
.score
|
.score
|
||||||
.osd_deployment_name
|
.osd_deployment_name
|
||||||
@ -94,9 +92,14 @@ impl CephRemoveOsdInterpret {
|
|||||||
self.score.osd_deployment_name
|
self.score.osd_deployment_name
|
||||||
))
|
))
|
||||||
})?;
|
})?;
|
||||||
|
Ok(osd_id_numeric.to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||||
|
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||||
let osd_id_full = format!("osd.{}", osd_id_numeric);
|
let osd_id_full = format!("osd.{}", osd_id_numeric);
|
||||||
|
|
||||||
info!(
|
debug!(
|
||||||
"Targeting Ceph OSD: {} (parsed from deployment {})",
|
"Targeting Ceph OSD: {} (parsed from deployment {})",
|
||||||
osd_id_full, self.score.osd_deployment_name
|
osd_id_full, self.score.osd_deployment_name
|
||||||
);
|
);
|
||||||
@ -108,6 +111,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
&self,
|
&self,
|
||||||
client: Arc<K8sClient>,
|
client: Arc<K8sClient>,
|
||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
debug!("verifying toolbox exists");
|
||||||
let toolbox_dep = "rook-ceph-tools".to_string();
|
let toolbox_dep = "rook-ceph-tools".to_string();
|
||||||
|
|
||||||
match client
|
match client
|
||||||
@ -149,7 +153,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
&self,
|
&self,
|
||||||
client: Arc<K8sClient>,
|
client: Arc<K8sClient>,
|
||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
info!(
|
debug!(
|
||||||
"Scaling down OSD deployment: {}",
|
"Scaling down OSD deployment: {}",
|
||||||
self.score.osd_deployment_name
|
self.score.osd_deployment_name
|
||||||
);
|
);
|
||||||
@ -172,7 +176,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
let (timeout, interval, start) = self.build_timer();
|
let (timeout, interval, start) = self.build_timer();
|
||||||
|
|
||||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
debug!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||||
loop {
|
loop {
|
||||||
let dep = client
|
let dep = client
|
||||||
.get_deployment(
|
.get_deployment(
|
||||||
@ -180,11 +184,9 @@ impl CephRemoveOsdInterpret {
|
|||||||
Some(&self.score.rook_ceph_namespace),
|
Some(&self.score.rook_ceph_namespace),
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if let Some(deployment) = dep {
|
if let Some(deployment) = dep {
|
||||||
if let Some(status) = deployment.status {
|
if let Some(status) = deployment.status {
|
||||||
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
|
if status.replicas == None && status.ready_replicas == None {
|
||||||
{
|
|
||||||
return Ok(Outcome::success(
|
return Ok(Outcome::success(
|
||||||
"Deployment successfully scaled down.".to_string(),
|
"Deployment successfully scaled down.".to_string(),
|
||||||
));
|
));
|
||||||
@ -212,7 +214,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
&self,
|
&self,
|
||||||
client: Arc<K8sClient>,
|
client: Arc<K8sClient>,
|
||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
info!(
|
debug!(
|
||||||
"Deleting OSD deployment: {}",
|
"Deleting OSD deployment: {}",
|
||||||
self.score.osd_deployment_name
|
self.score.osd_deployment_name
|
||||||
);
|
);
|
||||||
@ -234,7 +236,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
let (timeout, interval, start) = self.build_timer();
|
let (timeout, interval, start) = self.build_timer();
|
||||||
|
|
||||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
debug!("Verifying OSD deployment deleted");
|
||||||
loop {
|
loop {
|
||||||
let dep = client
|
let dep = client
|
||||||
.get_deployment(
|
.get_deployment(
|
||||||
@ -244,7 +246,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
.await?;
|
.await?;
|
||||||
|
|
||||||
if dep.is_none() {
|
if dep.is_none() {
|
||||||
info!(
|
debug!(
|
||||||
"Deployment {} successfully deleted.",
|
"Deployment {} successfully deleted.",
|
||||||
self.score.osd_deployment_name
|
self.score.osd_deployment_name
|
||||||
);
|
);
|
||||||
@ -276,12 +278,10 @@ impl CephRemoveOsdInterpret {
|
|||||||
Ok(tree)
|
Ok(tree)
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn purge_ceph_osd(
|
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
|
||||||
&self,
|
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||||
client: Arc<K8sClient>,
|
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||||
osd_id_full: &str,
|
debug!(
|
||||||
) -> Result<Outcome, InterpretError> {
|
|
||||||
info!(
|
|
||||||
"Purging OSD {} from Ceph cluster and removing its auth key",
|
"Purging OSD {} from Ceph cluster and removing its auth key",
|
||||||
osd_id_full
|
osd_id_full
|
||||||
);
|
);
|
||||||
@ -291,8 +291,10 @@ impl CephRemoveOsdInterpret {
|
|||||||
"app".to_string(),
|
"app".to_string(),
|
||||||
Some(&self.score.rook_ceph_namespace),
|
Some(&self.score.rook_ceph_namespace),
|
||||||
vec![
|
vec![
|
||||||
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
|
"sh",
|
||||||
format!("ceph auth del osd.{osd_id_full}").as_str(),
|
"-c",
|
||||||
|
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it").as_str(),
|
||||||
|
format!("ceph auth del {osd_id_full}").as_str(),
|
||||||
|
|||||||
],
|
],
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
|
|||||||
pub async fn verify_ceph_osd_removal(
|
pub async fn verify_ceph_osd_removal(
|
||||||
&self,
|
&self,
|
||||||
client: Arc<K8sClient>,
|
client: Arc<K8sClient>,
|
||||||
osd_id_full: &str,
|
|
||||||
) -> Result<Outcome, InterpretError> {
|
) -> Result<Outcome, InterpretError> {
|
||||||
let (timeout, interval, start) = self.build_timer();
|
let (timeout, interval, start) = self.build_timer();
|
||||||
info!(
|
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||||
|
debug!(
|
||||||
"Verifying OSD {} has been removed from the Ceph tree...",
|
"Verifying OSD {} has been removed from the Ceph tree...",
|
||||||
osd_id_full
|
osd_id_full
|
||||||
);
|
);
|
||||||
@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
|
|||||||
"rook-ceph-tools".to_string(),
|
"rook-ceph-tools".to_string(),
|
||||||
"app".to_string(),
|
"app".to_string(),
|
||||||
Some(&self.score.rook_ceph_namespace),
|
Some(&self.score.rook_ceph_namespace),
|
||||||
vec!["ceph osd tree -f json"],
|
vec!["sh", "-c", "ceph osd tree -f json"],
|
||||||
)
|
)
|
||||||
.await?;
|
.await?;
|
||||||
let tree =
|
let tree =
|
||||||
|
Loading…
Reference in New Issue
Block a user
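Not sure it works: `sh -c` only executes its first operand as the command string, so the `ceph osd purge …` string runs and the separate `ceph auth del …` argument is just passed as `$0` and never executed. Needs either both commands joined with `&&` in a single string or two exec calls.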