feat/ceph-osd-score #116

Merged
wjro merged 4 commits from feat/ceph-osd-score into master 2025-08-20 18:19:48 +00:00
2 changed files with 402 additions and 33 deletions
Showing only changes of commit 89eb88d10e - Show all commits

View File

@ -5,7 +5,7 @@ use k8s_openapi::{
}; };
use kube::{ use kube::{
Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt}, api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
runtime::reflector::Lookup, runtime::reflector::Lookup,
@ -19,6 +19,7 @@ use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use serde_json::json; use serde_json::json;
use similar::TextDiff; use similar::TextDiff;
use tokio::io::AsyncReadExt;
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct K8sClient { pub struct K8sClient {
@ -97,6 +98,21 @@ impl K8sClient {
Ok(()) Ok(())
} }
/// Deletes the deployment called `name` using default delete parameters.
///
/// When `namespace` is `None`, the client's default namespace is used.
///
/// # Errors
///
/// Propagates any error returned by the delete request.
pub async fn delete_deployment(
    &self,
    name: &str,
    namespace: Option<&str>,
) -> Result<(), Error> {
    let api: Api<Deployment> = match namespace {
        Some(ns) => Api::namespaced(self.client.clone(), ns),
        None => Api::default_namespaced(self.client.clone()),
    };
    // The response payload is not needed; only success or failure matters here.
    api.delete(name, &DeleteParams::default()).await.map(|_| ())
}
pub async fn wait_until_deployment_ready( pub async fn wait_until_deployment_ready(
&self, &self,
name: String, name: String,
@ -122,6 +138,68 @@ impl K8sClient {
} }
} }
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("{label}={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await;
match res {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout().take() {
stdout.read_to_string(&mut stdout_buf).await;
}
debug!("Status: {} - {:?}", s, status.details);
if s == "Success" {
Ok(stdout_buf)
} else {
Err(s)
}
} else {
Err("Couldn't get inner status of pod exec".to_string())
}
}
}
}
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}` /// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
pub async fn exec_app( pub async fn exec_app(
&self, &self,

View File

@ -1,72 +1,68 @@
use std::process::Command; use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait; use async_trait::async_trait;
use serde::Serialize; use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::{ use crate::{
data::{Id, Version}, data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}, interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory, inventory::Inventory,
score::Score, score::Score,
topology::{K8sclient, Topology}, topology::{K8sclient, Topology, k8s::K8sClient},
}; };
#[derive(Debug, Clone, Serialize)] #[derive(Debug, Clone, Serialize)]
pub struct PrepCephOsdReplacement { pub struct CephRemoveOsd {
osd_name: String, osd_deployment_name: String,
rook_ceph_namespace: String, rook_ceph_namespace: String,
} }
impl<T: Topology + K8sclient> Score<T> for PrepCephOsdReplacement { impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
fn name(&self) -> String { fn name(&self) -> String {
format!("CephOsdReplacementScore") format!("CephRemoveOsdScore")
} }
#[doc(hidden)] #[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> { fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PrepCephOsdReplacementInterpret { Box::new(CephRemoveOsdInterpret {
score: self.clone(), score: self.clone(),
}) })
} }
} }
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct PrepCephOsdReplacementInterpret { pub struct CephRemoveOsdInterpret {
score: PrepCephOsdReplacement, score: CephRemoveOsd,
} }
#[async_trait] #[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret { impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
async fn execute( async fn execute(
&self, &self,
_inventory: &Inventory, _inventory: &Inventory,
topology: &T, topology: &T,
) -> Result<Outcome, InterpretError> { ) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap(); let client = topology.k8s_client().await.unwrap();
client self.scale_deployment(client.clone()).await?;
Review

Nice, super lisible.

Éventuellement plusieurs de ces helper functions pourront aller dans un module ceph management ou quelque chose comme ça.

Nice, super lisible. Éventuellement plusieurs de ces helper functions pourront aller dans un module ceph management ou quelque chose comme ça.
.scale_deployment( self.verify_deployment_scaled(client.clone()).await?;
&self.score.osd_name, self.delete_deployment(client.clone()).await?;
Some(&self.score.rook_ceph_namespace), self.verify_deployment_deleted(client.clone()).await?;
0, let osd_id_full = self.get_ceph_osd_id().unwrap();
) self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
.await?; self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
let dep = client
.get_deployment(&self.score.osd_name, Some(&self.score.rook_ceph_namespace))
.await?; .await?;
match dep.unwrap().status.and_then(|s| s.ready_replicas) { Ok(Outcome::success(format!(
Some(0) => Ok(Outcome::success( "Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
"Successfully prepared rook-ceph-cluster for disk replacement".to_string(), osd_id_full, self.score.osd_deployment_name
)), )))
Some(_) => Err(InterpretError::new({
"Deployment still has ready replicas".to_string()
})),
None => Err(InterpretError::new({
"Failed to get rook-ceph-cluster status".to_string()
})),
} }
}
fn get_name(&self) -> InterpretName { fn get_name(&self) -> InterpretName {
todo!() todo!()
} }
@ -83,3 +79,298 @@ impl<T: Topology + K8sclient> Interpret<T> for PrepCephOsdReplacementInterpret {
todo!() todo!()
} }
} }
impl CephRemoveOsdInterpret {
/// Derives the Ceph OSD identifier (`osd.<id>`) from the configured deployment name.
///
/// The id is the fourth `-`-separated segment of the deployment name, so
/// `rook-ceph-osd-3` yields `osd.3`.
///
/// # Errors
///
/// Returns an [`InterpretError`] when the deployment name has fewer than four
/// segments, or when the fourth segment is not a non-empty run of ASCII digits.
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
    let osd_id_numeric = self
        .score
        .osd_deployment_name
        .split('-')
        .nth(3)
        // The id ends up in destructive Ceph commands, so anything that is not a
        // plain number is rejected instead of being passed along.
        .filter(|segment| {
            !segment.is_empty() && segment.bytes().all(|byte| byte.is_ascii_digit())
        })
        .ok_or_else(|| {
            InterpretError::new(format!(
                "Could not parse OSD id from deployment name {}",
                self.score.osd_deployment_name
            ))
        })?;
    let osd_id_full = format!("osd.{}", osd_id_numeric);
    info!(
        "Targeting Ceph OSD: {} (parsed from deployment {})",
        osd_id_full, self.score.osd_deployment_name
    );
    Ok(osd_id_full)
}
/// Returns the `(timeout, interval, start)` triple used by the polling loops:
/// a 120 second timeout, a 5 second interval, and the current instant.
fn build_timer(&self) -> (Duration, Duration, Instant) {
    (
        Duration::from_secs(120),
        Duration::from_secs(5),
        Instant::now(),
    )
}
/// Scales the OSD deployment named in the score down to 0 replicas in the
/// configured rook-ceph namespace.
///
/// Returns a success [`Outcome`] naming the deployment; any error from the
/// underlying `scale_deployment` call is propagated with `?`.
pub async fn scale_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
client
.scale_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
0,
)
// NOTE(review): the lines up to `.await?;` below are review-page text captured with the diff, not Rust code.
Review

Tres cool d'avoir ce check. Eventuellement on pourra etre plus intelligent et proposer au user de demarrer la toolbox mais c'est hors sujet pour l'instant 👍

Tres cool d'avoir ce check. Eventuellement on pourra etre plus intelligent et proposer au user de demarrer la toolbox mais c'est hors sujet pour l'instant 👍
.await?;
Ok(Outcome::success(format!(
"Scaled down deployment {}",
self.score.osd_deployment_name
)))
}
/// Polls the OSD deployment until it reports 0 replicas and 0 ready replicas.
///
/// The deployment is re-read every polling interval (see `build_timer`) until
/// the timeout elapses.
///
/// # Errors
///
/// Returns an [`InterpretError`] when reading the deployment fails or when the
/// deployment has not scaled down before the timeout.
pub async fn verify_deployment_scaled(
    &self,
    client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
    let (timeout, interval, start) = self.build_timer();
    info!("Waiting for OSD deployment to scale down to 0 replicas");
    loop {
        let dep = client
            .get_deployment(
                &self.score.osd_deployment_name,
                Some(&self.score.rook_ceph_namespace),
            )
            .await?;

        if let Some(deployment) = dep {
            let generation = deployment.metadata.generation.unwrap_or(0);
            if let Some(status) = deployment.status {
                // Only trust the counters once the controller has seen the latest spec.
                let up_to_date = status.observed_generation.unwrap_or(0) >= generation;
                // The API omits zero-valued counters from the status, so a missing
                // `replicas` / `readyReplicas` field means 0, not "unknown".
                let replicas = status.replicas.unwrap_or(0);
                let ready_replicas = status.ready_replicas.unwrap_or(0);
                if up_to_date && replicas == 0 && ready_replicas == 0 {
                    return Ok(Outcome::success(
                        "Deployment successfully scaled down.".to_string(),
                    ));
                }
            }
        }

        if start.elapsed() > timeout {
            return Err(InterpretError::new(format!(
                "Timed out waiting for deployment {} to scale down",
                self.score.osd_deployment_name
            )));
        }
        sleep(interval).await;
    }
}
/// Deletes the OSD deployment named in the score from the configured rook-ceph
/// namespace and reports the deletion as a success [`Outcome`].
///
/// Any error from the underlying delete call is propagated.
pub async fn delete_deployment(
    &self,
    client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
    let deployment_name = &self.score.osd_deployment_name;
    let namespace = Some(self.score.rook_ceph_namespace.as_str());

    info!("Deleting OSD deployment: {}", deployment_name);
    client.delete_deployment(deployment_name, namespace).await?;

    let message = format!("deployment {} deleted", deployment_name);
    Ok(Outcome::success(message))
}
/// Polls until the OSD deployment can no longer be found in the rook-ceph namespace.
///
/// The deployment is looked up every polling interval (see `build_timer`) until
/// the lookup returns `None` or the timeout elapses.
///
/// # Errors
///
/// Returns an [`InterpretError`] when the lookup fails or when the deployment
/// still exists after the timeout.
pub async fn verify_deployment_deleted(
    &self,
    client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
    let (timeout, interval, start) = self.build_timer();
    // The previous message was copied from the scale-down step and described the wrong wait.
    info!(
        "Waiting for OSD deployment {} to be deleted",
        self.score.osd_deployment_name
    );
    loop {
        let dep = client
            .get_deployment(
                &self.score.osd_deployment_name,
                Some(&self.score.rook_ceph_namespace),
            )
            .await?;

        if dep.is_none() {
            info!(
                "Deployment {} successfully deleted.",
                self.score.osd_deployment_name
            );
            return Ok(Outcome::success(format!(
                "Deployment {} deleted.",
                self.score.osd_deployment_name
            )));
        }

        if start.elapsed() > timeout {
            return Err(InterpretError::new(format!(
                "Timed out waiting for deployment {} to be deleted",
                self.score.osd_deployment_name
            )));
        }
        sleep(interval).await;
    }
}
/// Builds a [`CephOsdTree`] from the `nodes` array of a `ceph osd tree` JSON document.
///
/// # Errors
///
/// Returns an [`InterpretError`] when the `nodes` field is missing or when its
/// content cannot be deserialized into the node list.
fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
    let raw_nodes = match json.get("nodes") {
        Some(value) => value.clone(),
        None => {
            return Err(InterpretError::new(
                "Missing 'nodes' field in ceph osd tree JSON".to_string(),
            ));
        }
    };

    match serde_json::from_value(raw_nodes) {
        Ok(nodes) => Ok(CephOsdTree { nodes }),
        Err(e) => Err(InterpretError::new(format!(
            "Failed to parse ceph osd tree JSON: {}",
            e
        ))),
    }
}
/// Purges the OSD `osd_id_full` (for example `osd.3`) from the Ceph cluster and
/// then removes its auth key, by running the `ceph` CLI in the pod labelled
/// `app=rook-ceph-tools` of the configured namespace.
///
/// The two commands run as separate exec calls, each with one argv element per
/// argument. The auth key is addressed by `osd_id_full` itself, which already
/// carries the `osd.` prefix.
///
/// # Errors
///
/// Returns an [`InterpretError`] when either command cannot be executed or does
/// not report success. The auth key removal is not attempted if the purge fails.
pub async fn purge_ceph_osd(
    &self,
    client: Arc<K8sClient>,
    osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
    info!(
        "Purging OSD {} from Ceph cluster and removing its auth key",
        osd_id_full
    );

    let namespace = Some(self.score.rook_ceph_namespace.as_str());

    client
        .exec_app_capture_output(
            "rook-ceph-tools".to_string(),
            "app".to_string(),
            namespace,
            vec!["ceph", "osd", "purge", osd_id_full, "--yes-i-really-mean-it"],
        )
        .await?;

    client
        .exec_app_capture_output(
            "rook-ceph-tools".to_string(),
            "app".to_string(),
            namespace,
            vec!["ceph", "auth", "del", osd_id_full],
        )
        .await?;

    Ok(Outcome::success(format!(
        "osd id {} removed from osd tree",
        osd_id_full
    )))
}
/// Polls `ceph osd tree -f json` (run in the pod labelled `app=rook-ceph-tools`)
/// until no node named `osd_id_full` is listed.
///
/// The tree is re-read every polling interval (see `build_timer`) until the OSD
/// is gone or the timeout elapses.
///
/// # Errors
///
/// Returns an [`InterpretError`] when the command cannot be executed, when its
/// output is not valid JSON or lacks a parsable `nodes` array, or when the OSD is
/// still present after the timeout.
pub async fn verify_ceph_osd_removal(
    &self,
    client: Arc<K8sClient>,
    osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
    let (timeout, interval, start) = self.build_timer();
    info!(
        "Verifying OSD {} has been removed from the Ceph tree...",
        osd_id_full
    );
    loop {
        let output = client
            .exec_app_capture_output(
                "rook-ceph-tools".to_string(),
                "app".to_string(),
                Some(&self.score.rook_ceph_namespace),
                // One argv element per argument: the command is not run through a shell.
                vec!["ceph", "osd", "tree", "-f", "json"],
            )
            .await?;

        // Unparsable output is reported as an error instead of panicking.
        let json: serde_json::Value = serde_json::from_str(&output).map_err(|e| {
            InterpretError::new(format!(
                "Failed to parse ceph osd tree output as JSON: {}",
                e
            ))
        })?;
        let tree = self.get_osd_tree(json)?;

        let osd_found = tree.nodes.iter().any(|node| node.name == osd_id_full);
        if !osd_found {
            return Ok(Outcome::success(format!(
                "Successfully verified that OSD {} is removed from the Ceph cluster.",
                osd_id_full,
            )));
        }

        if start.elapsed() > timeout {
            return Err(InterpretError::new(format!(
                "Timed out waiting for OSD {} to be removed from Ceph tree",
                osd_id_full
            )));
        }

        warn!(
            "OSD {} still found in Ceph tree, retrying in {:?}...",
            osd_id_full, interval
        );
        sleep(interval).await;
    }
}
}
/// The list of nodes extracted from the `nodes` array of a `ceph osd tree` JSON
/// document (see `get_osd_tree`).
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephOsdTree {
/// Every entry of the `nodes` array, in document order.
pub nodes: Vec<CephNode>,
}
/// One entry of the `nodes` array in a `ceph osd tree` JSON document.
///
/// Only `id`, `name` and `type` are required when deserializing; the remaining
/// fields are optional. JSON keys that are not listed here are ignored, as the
/// unit test for `get_osd_tree` shows.
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephNode {
/// Numeric id of the node.
pub id: i32,
/// Node name; `verify_ceph_osd_removal` compares it against ids such as `osd.1`.
pub name: String,
/// Value of the JSON `type` key (renamed because `type` is a Rust keyword).
#[serde(rename = "type")]
pub node_type: String,
/// Value of the JSON `type_id` key, when present.
pub type_id: Option<i32>,
/// Value of the JSON `children` key, when present — presumably the ids of child nodes; TODO confirm.
pub children: Option<Vec<i32>>,
/// Value of the JSON `exists` key, when present.
pub exists: Option<i32>,
/// Value of the JSON `status` key, when present.
pub status: Option<String>,
}
#[cfg(test)]
mod tests {
    use serde_json::json;

    use super::*;

    /// Builds the node expected for `osd.<id>` when only the required keys are set.
    fn osd_node(id: i32) -> CephNode {
        CephNode {
            id,
            name: format!("osd.{id}"),
            node_type: "osd".to_string(),
            type_id: None,
            children: None,
            exists: None,
            status: None,
        }
    }

    /// Keys that `CephNode` does not declare are ignored and optional fields
    /// default to `None`.
    #[test]
    fn test_get_osd_tree() {
        let interpret = CephRemoveOsdInterpret {
            score: CephRemoveOsd {
                osd_deployment_name: "osd-1".to_string(),
                rook_ceph_namespace: "dummy_ns".to_string(),
            },
        };
        let raw_tree = json!({
            "nodes": [
                {"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
                {"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
            ]
        });

        let parsed = interpret.get_osd_tree(raw_tree).unwrap();

        let expected = CephOsdTree {
            nodes: vec![osd_node(1), osd_node(2)],
        };
        assert_eq!(parsed, expected);
    }
}