Merge branch 'master' into feat/detect_k8s_flavour
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
This commit is contained in:
commit
2a520a1d7c
9
Cargo.lock
generated
9
Cargo.lock
generated
@ -4613,6 +4613,15 @@ version = "0.8.6"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "caf4aa5b0f434c91fe5c7f1ecb6a5ece2130b02ad2a590589dda5146df959001"
|
||||
|
||||
[[package]]
|
||||
name = "remove_rook_osd"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"harmony",
|
||||
"harmony_cli",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "reqwest"
|
||||
version = "0.11.27"
|
||||
|
11
examples/remove_rook_osd/Cargo.toml
Normal file
11
examples/remove_rook_osd/Cargo.toml
Normal file
@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "example-remove-rook-osd"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
harmony = { version = "0.1.0", path = "../../harmony" }
|
||||
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
|
||||
tokio.workspace = true
|
18
examples/remove_rook_osd/src/main.rs
Normal file
18
examples/remove_rook_osd/src/main.rs
Normal file
@ -0,0 +1,18 @@
|
||||
use harmony::{
|
||||
inventory::Inventory, modules::storage::ceph::ceph_remove_osd_score::CephRemoveOsd,
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let ceph_score = CephRemoveOsd {
|
||||
osd_deployment_name: "rook-ceph-osd-2".to_string(),
|
||||
rook_ceph_namespace: "rook-ceph".to_string(),
|
||||
};
|
||||
|
||||
let topology = K8sAnywhereTopology::from_env();
|
||||
let inventory = Inventory::autoload();
|
||||
harmony_cli::run(inventory, topology, vec![Box::new(ceph_score)], None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
@ -30,6 +30,7 @@ pub enum InterpretName {
|
||||
Lamp,
|
||||
ApplicationMonitoring,
|
||||
K8sPrometheusCrdAlerting,
|
||||
CephRemoveOsd,
|
||||
DiscoverInventoryAgent,
|
||||
CephClusterHealth,
|
||||
Custom(&'static str),
|
||||
@ -61,7 +62,11 @@ impl std::fmt::Display for InterpretName {
|
||||
InterpretName::Lamp => f.write_str("LAMP"),
|
||||
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
|
||||
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
|
||||
<<<<<<< HEAD
|
||||
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
|
||||
=======
|
||||
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
|
||||
>>>>>>> origin/master
|
||||
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
|
||||
InterpretName::Custom(name) => f.write_str(name),
|
||||
InterpretName::RHOBAlerting => f.write_str("RHOBAlerting"),
|
||||
|
@ -19,7 +19,7 @@ use kube::{
|
||||
api::{ApiResource, GroupVersionKind},
|
||||
runtime::wait::await_condition,
|
||||
};
|
||||
use log::{debug, error, trace};
|
||||
use log::{debug, error, info, trace};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use serde_json::{Value, json};
|
||||
use similar::TextDiff;
|
||||
@ -89,10 +89,13 @@ impl K8sClient {
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Option<Deployment>, Error> {
|
||||
let deps: Api<Deployment> = if let Some(ns) = namespace {
|
||||
debug!("getting namespaced deployment");
|
||||
Api::namespaced(self.client.clone(), ns)
|
||||
} else {
|
||||
debug!("getting default namespace deployment");
|
||||
Api::default_namespaced(self.client.clone())
|
||||
};
|
||||
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
|
||||
Ok(deps.get_opt(name).await?)
|
||||
}
|
||||
|
||||
@ -123,7 +126,7 @@ impl K8sClient {
|
||||
}
|
||||
});
|
||||
let pp = PatchParams::default();
|
||||
let scale = Patch::Apply(&patch);
|
||||
let scale = Patch::Merge(&patch);
|
||||
deployments.patch_scale(name, &pp, &scale).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
209
harmony/src/modules/cert_manager/cluster_issuer.rs
Normal file
209
harmony/src/modules/cert_manager/cluster_issuer.rs
Normal file
@ -0,0 +1,209 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct ClusterIssuerScore {
|
||||
email: String,
|
||||
server: String,
|
||||
issuer_name: String,
|
||||
namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
|
||||
fn name(&self) -> String {
|
||||
"ClusterIssuerScore".to_string()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(ClusterIssuerInterpret {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ClusterIssuerInterpret {
|
||||
score: ClusterIssuerScore,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
|
||||
.await
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("ClusterIssuer")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl ClusterIssuerInterpret {
|
||||
async fn validate_cert_manager(
|
||||
&self,
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let cert_manager = "cert-manager".to_string();
|
||||
let operator_namespace = "openshift-operators".to_string();
|
||||
match client
|
||||
.get_deployment(&cert_manager, Some(&operator_namespace))
|
||||
.await
|
||||
{
|
||||
Ok(Some(deployment)) => {
|
||||
if let Some(status) = deployment.status {
|
||||
let ready_count = status.ready_replicas.unwrap_or(0);
|
||||
if ready_count >= 1 {
|
||||
return Ok(Outcome::success(format!(
|
||||
"'{}' is ready with {} replica(s).",
|
||||
&cert_manager, ready_count
|
||||
)));
|
||||
} else {
|
||||
return Err(InterpretError::new(
|
||||
"cert-manager operator not ready in cluster".to_string(),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
Err(InterpretError::new(format!(
|
||||
"failed to get deployment status {} in ns {}",
|
||||
&cert_manager, &operator_namespace
|
||||
)))
|
||||
}
|
||||
}
|
||||
Ok(None) => Err(InterpretError::new(format!(
|
||||
"Deployment '{}' not found in namespace '{}'.",
|
||||
&cert_manager, &operator_namespace
|
||||
))),
|
||||
Err(e) => Err(InterpretError::new(format!(
|
||||
"Failed to query for deployment '{}': {}",
|
||||
&cert_manager, e
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
|
||||
let issuer_name = &self.score.issuer_name;
|
||||
let email = &self.score.email;
|
||||
let server = &self.score.server;
|
||||
let namespace = &self.score.namespace;
|
||||
let cluster_issuer = ClusterIssuer {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(issuer_name.to_string()),
|
||||
namespace: Some(namespace.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: ClusterIssuerSpec {
|
||||
acme: AcmeSpec {
|
||||
email: email.to_string(),
|
||||
private_key_secret_ref: PrivateKeySecretRef {
|
||||
name: issuer_name.to_string(),
|
||||
},
|
||||
server: server.to_string(),
|
||||
solvers: vec![SolverSpec {
|
||||
http01: Some(Http01Solver {
|
||||
ingress: Http01Ingress {
|
||||
class: "nginx".to_string(),
|
||||
},
|
||||
}),
|
||||
}],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
Ok(cluster_issuer)
|
||||
}
|
||||
|
||||
pub async fn apply_cluster_issuer(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let namespace = self.score.namespace.clone();
|
||||
self.validate_cert_manager(&client).await?;
|
||||
let cluster_issuer = self.build_cluster_issuer().unwrap();
|
||||
client
|
||||
.apply_yaml(
|
||||
&serde_yaml::to_value(cluster_issuer).unwrap(),
|
||||
Some(&namespace),
|
||||
)
|
||||
.await?;
|
||||
Ok(Outcome::success(format!(
|
||||
"successfully deployed cluster operator: {} in namespace: {}",
|
||||
self.score.issuer_name, self.score.namespace
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[kube(
|
||||
group = "cert-manager.io",
|
||||
version = "v1",
|
||||
kind = "ClusterIssuer",
|
||||
plural = "clusterissuers"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ClusterIssuerSpec {
|
||||
pub acme: AcmeSpec,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct AcmeSpec {
|
||||
pub email: String,
|
||||
pub private_key_secret_ref: PrivateKeySecretRef,
|
||||
pub server: String,
|
||||
pub solvers: Vec<SolverSpec>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PrivateKeySecretRef {
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SolverSpec {
|
||||
pub http01: Option<Http01Solver>,
|
||||
// Other solver types (e.g., dns01) would go here as Options
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Http01Solver {
|
||||
pub ingress: Http01Ingress,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Http01Ingress {
|
||||
pub class: String,
|
||||
}
|
@ -1,2 +1,3 @@
|
||||
pub mod cluster_issuer;
|
||||
mod helm;
|
||||
pub use helm::*;
|
||||
|
@ -4,7 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{info, warn};
|
||||
use log::{debug, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::sleep;
|
||||
|
||||
@ -19,8 +19,8 @@ use harmony_types::id::Id;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct CephRemoveOsd {
|
||||
osd_deployment_name: String,
|
||||
rook_ceph_namespace: String,
|
||||
pub osd_deployment_name: String,
|
||||
pub rook_ceph_namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
|
||||
@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
||||
self.verify_deployment_scaled(client.clone()).await?;
|
||||
self.delete_deployment(client.clone()).await?;
|
||||
self.verify_deployment_deleted(client.clone()).await?;
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
|
||||
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
|
||||
.await?;
|
||||
self.purge_ceph_osd(client.clone()).await?;
|
||||
self.verify_ceph_osd_removal(client.clone()).await?;
|
||||
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
Ok(Outcome::success(format!(
|
||||
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
|
||||
osd_id_full, self.score.osd_deployment_name
|
||||
)))
|
||||
}
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
InterpretName::CephRemoveOsd
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
||||
}
|
||||
|
||||
impl CephRemoveOsdInterpret {
|
||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
|
||||
let osd_id_numeric = self
|
||||
.score
|
||||
.osd_deployment_name
|
||||
@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
|
||||
self.score.osd_deployment_name
|
||||
))
|
||||
})?;
|
||||
Ok(osd_id_numeric.to_string())
|
||||
}
|
||||
|
||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||
let osd_id_full = format!("osd.{}", osd_id_numeric);
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
"Targeting Ceph OSD: {} (parsed from deployment {})",
|
||||
osd_id_full, self.score.osd_deployment_name
|
||||
);
|
||||
@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
debug!("verifying toolbox exists");
|
||||
let toolbox_dep = "rook-ceph-tools".to_string();
|
||||
|
||||
match client
|
||||
@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
debug!(
|
||||
"Scaling down OSD deployment: {}",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
|
||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
debug!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
loop {
|
||||
let dep = client
|
||||
.get_deployment(
|
||||
@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(deployment) = dep {
|
||||
if let Some(status) = deployment.status {
|
||||
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
|
||||
{
|
||||
if status.replicas == None && status.ready_replicas == None {
|
||||
return Ok(Outcome::success(
|
||||
"Deployment successfully scaled down.".to_string(),
|
||||
));
|
||||
@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
debug!(
|
||||
"Deleting OSD deployment: {}",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
|
||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
debug!("Verifying OSD deployment deleted");
|
||||
loop {
|
||||
let dep = client
|
||||
.get_deployment(
|
||||
@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
|
||||
.await?;
|
||||
|
||||
if dep.is_none() {
|
||||
info!(
|
||||
debug!(
|
||||
"Deployment {} successfully deleted.",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
|
||||
Ok(tree)
|
||||
}
|
||||
|
||||
pub async fn purge_ceph_osd(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
osd_id_full: &str,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
|
||||
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
debug!(
|
||||
"Purging OSD {} from Ceph cluster and removing its auth key",
|
||||
osd_id_full
|
||||
);
|
||||
@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec![
|
||||
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
|
||||
format!("ceph auth del osd.{osd_id_full}").as_str(),
|
||||
"sh",
|
||||
"-c",
|
||||
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
|
||||
pub async fn verify_ceph_osd_removal(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
osd_id_full: &str,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
info!(
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
debug!(
|
||||
"Verifying OSD {} has been removed from the Ceph tree...",
|
||||
osd_id_full
|
||||
);
|
||||
@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
|
||||
"rook-ceph-tools".to_string(),
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec!["ceph osd tree -f json"],
|
||||
vec!["sh", "-c", "ceph osd tree -f json"],
|
||||
)
|
||||
.await?;
|
||||
let tree =
|
@ -1,2 +1,2 @@
|
||||
pub mod ceph_osd_replacement_score;
|
||||
pub mod ceph_remove_osd_score;
|
||||
pub mod ceph_validate_health_score;
|
||||
|
Loading…
Reference in New Issue
Block a user