Merge branch 'master' into feat/scrape_target
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
This commit is contained in:
@@ -30,6 +30,7 @@ pub enum InterpretName {
|
||||
Lamp,
|
||||
ApplicationMonitoring,
|
||||
K8sPrometheusCrdAlerting,
|
||||
CephRemoveOsd,
|
||||
DiscoverInventoryAgent,
|
||||
CephClusterHealth,
|
||||
Custom(&'static str),
|
||||
@@ -61,6 +62,7 @@ impl std::fmt::Display for InterpretName {
|
||||
InterpretName::Lamp => f.write_str("LAMP"),
|
||||
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
|
||||
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
|
||||
InterpretName::CephRemoveOsd => f.write_str("CephRemoveOsd"),
|
||||
InterpretName::DiscoverInventoryAgent => f.write_str("DiscoverInventoryAgent"),
|
||||
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
|
||||
InterpretName::Custom(name) => f.write_str(name),
|
||||
|
||||
@@ -3,13 +3,11 @@ use std::time::Duration;
|
||||
use derive_new::new;
|
||||
use k8s_openapi::{
|
||||
ClusterResourceScope, NamespaceResourceScope,
|
||||
api::{
|
||||
apps::v1::Deployment,
|
||||
core::v1::{Pod, PodStatus},
|
||||
},
|
||||
api::{apps::v1::Deployment, core::v1::Pod},
|
||||
apimachinery::pkg::version::Info,
|
||||
};
|
||||
use kube::{
|
||||
Client, Config, Error, Resource,
|
||||
Client, Config, Discovery, Error, Resource,
|
||||
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
|
||||
config::{KubeConfigOptions, Kubeconfig},
|
||||
core::ErrorResponse,
|
||||
@@ -21,7 +19,7 @@ use kube::{
|
||||
api::{ApiResource, GroupVersionKind},
|
||||
runtime::wait::await_condition,
|
||||
};
|
||||
use log::{debug, error, trace};
|
||||
use log::{debug, error, info, trace};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use serde_json::{Value, json};
|
||||
use similar::TextDiff;
|
||||
@@ -59,6 +57,17 @@ impl K8sClient {
|
||||
})
|
||||
}
|
||||
|
||||
pub async fn get_apiserver_version(&self) -> Result<Info, Error> {
|
||||
let client: Client = self.client.clone();
|
||||
let version_info: Info = client.apiserver_version().await?;
|
||||
Ok(version_info)
|
||||
}
|
||||
|
||||
pub async fn discovery(&self) -> Result<Discovery, Error> {
|
||||
let discovery: Discovery = Discovery::new(self.client.clone()).run().await?;
|
||||
Ok(discovery)
|
||||
}
|
||||
|
||||
pub async fn get_resource_json_value(
|
||||
&self,
|
||||
name: &str,
|
||||
@@ -80,10 +89,13 @@ impl K8sClient {
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Option<Deployment>, Error> {
|
||||
let deps: Api<Deployment> = if let Some(ns) = namespace {
|
||||
debug!("getting namespaced deployment");
|
||||
Api::namespaced(self.client.clone(), ns)
|
||||
} else {
|
||||
debug!("getting default namespace deployment");
|
||||
Api::default_namespaced(self.client.clone())
|
||||
};
|
||||
debug!("getting deployment {} in ns {}", name, namespace.unwrap());
|
||||
Ok(deps.get_opt(name).await?)
|
||||
}
|
||||
|
||||
@@ -114,7 +126,7 @@ impl K8sClient {
|
||||
}
|
||||
});
|
||||
let pp = PatchParams::default();
|
||||
let scale = Patch::Apply(&patch);
|
||||
let scale = Patch::Merge(&patch);
|
||||
deployments.patch_scale(name, &pp, &scale).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
@@ -47,6 +47,13 @@ struct K8sState {
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum KubernetesDistribution {
|
||||
OpenshiftFamily,
|
||||
K3sFamily,
|
||||
Default,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum K8sSource {
|
||||
LocalK3d,
|
||||
@@ -57,6 +64,7 @@ enum K8sSource {
|
||||
pub struct K8sAnywhereTopology {
|
||||
k8s_state: Arc<OnceCell<Option<K8sState>>>,
|
||||
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
|
||||
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
|
||||
config: Arc<K8sAnywhereConfig>,
|
||||
}
|
||||
|
||||
@@ -162,6 +170,7 @@ impl K8sAnywhereTopology {
|
||||
Self {
|
||||
k8s_state: Arc::new(OnceCell::new()),
|
||||
tenant_manager: Arc::new(OnceCell::new()),
|
||||
k8s_distribution: Arc::new(OnceCell::new()),
|
||||
config: Arc::new(K8sAnywhereConfig::from_env()),
|
||||
}
|
||||
}
|
||||
@@ -170,10 +179,42 @@ impl K8sAnywhereTopology {
|
||||
Self {
|
||||
k8s_state: Arc::new(OnceCell::new()),
|
||||
tenant_manager: Arc::new(OnceCell::new()),
|
||||
k8s_distribution: Arc::new(OnceCell::new()),
|
||||
config: Arc::new(config),
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
|
||||
self.k8s_distribution
|
||||
.get_or_try_init(async || {
|
||||
let client = self.k8s_client().await.unwrap();
|
||||
|
||||
let discovery = client.discovery().await.map_err(|e| {
|
||||
PreparationError::new(format!("Could not discover API groups: {}", e))
|
||||
})?;
|
||||
|
||||
let version = client.get_apiserver_version().await.map_err(|e| {
|
||||
PreparationError::new(format!("Could not get server version: {}", e))
|
||||
})?;
|
||||
|
||||
// OpenShift / OKD
|
||||
if discovery
|
||||
.groups()
|
||||
.any(|g| g.name() == "project.openshift.io")
|
||||
{
|
||||
return Ok(KubernetesDistribution::OpenshiftFamily);
|
||||
}
|
||||
|
||||
// K3d / K3s
|
||||
if version.git_version.contains("k3s") {
|
||||
return Ok(KubernetesDistribution::K3sFamily);
|
||||
}
|
||||
|
||||
return Ok(KubernetesDistribution::Default);
|
||||
})
|
||||
.await
|
||||
}
|
||||
|
||||
async fn get_cluster_observability_operator_prometheus_application_score(
|
||||
&self,
|
||||
sender: RHOBObservability,
|
||||
|
||||
@@ -28,13 +28,7 @@ pub trait LoadBalancer: Send + Sync {
|
||||
&self,
|
||||
service: &LoadBalancerService,
|
||||
) -> Result<(), ExecutorError> {
|
||||
debug!(
|
||||
"Listing LoadBalancer services {:?}",
|
||||
self.list_services().await
|
||||
);
|
||||
if !self.list_services().await.contains(service) {
|
||||
self.add_service(service).await?;
|
||||
}
|
||||
self.add_service(service).await?;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -26,19 +26,13 @@ impl LoadBalancer for OPNSenseFirewall {
|
||||
}
|
||||
|
||||
async fn add_service(&self, service: &LoadBalancerService) -> Result<(), ExecutorError> {
|
||||
warn!(
|
||||
"TODO : the current implementation does not check / cleanup / merge with existing haproxy services properly. Make sure to manually verify that the configuration is correct after executing any operation here"
|
||||
);
|
||||
let mut config = self.opnsense_config.write().await;
|
||||
let mut load_balancer = config.load_balancer();
|
||||
|
||||
let (frontend, backend, servers, healthcheck) =
|
||||
harmony_load_balancer_service_to_haproxy_xml(service);
|
||||
let mut load_balancer = config.load_balancer();
|
||||
load_balancer.add_backend(backend);
|
||||
load_balancer.add_frontend(frontend);
|
||||
load_balancer.add_servers(servers);
|
||||
if let Some(healthcheck) = healthcheck {
|
||||
load_balancer.add_healthcheck(healthcheck);
|
||||
}
|
||||
|
||||
load_balancer.configure_service(frontend, backend, servers, healthcheck);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
@@ -106,7 +100,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
|
||||
.backends
|
||||
.backends
|
||||
.iter()
|
||||
.find(|b| b.uuid == frontend.default_backend);
|
||||
.find(|b| Some(b.uuid.clone()) == frontend.default_backend);
|
||||
|
||||
let mut health_check = None;
|
||||
match matching_backend {
|
||||
@@ -116,8 +110,7 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
|
||||
}
|
||||
None => {
|
||||
warn!(
|
||||
"HAProxy config could not find a matching backend for frontend {:?}",
|
||||
frontend
|
||||
"HAProxy config could not find a matching backend for frontend {frontend:?}"
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -152,11 +145,11 @@ pub(crate) fn get_servers_for_backend(
|
||||
.servers
|
||||
.iter()
|
||||
.filter_map(|server| {
|
||||
let address = server.address.clone()?;
|
||||
let port = server.port?;
|
||||
|
||||
if backend_servers.contains(&server.uuid.as_str()) {
|
||||
return Some(BackendServer {
|
||||
address: server.address.clone(),
|
||||
port: server.port,
|
||||
});
|
||||
return Some(BackendServer { address, port });
|
||||
}
|
||||
None
|
||||
})
|
||||
@@ -347,7 +340,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
name: format!("frontend_{}", service.listening_port),
|
||||
bind: service.listening_port.to_string(),
|
||||
mode: "tcp".to_string(), // TODO do not depend on health check here
|
||||
default_backend: backend.uuid.clone(),
|
||||
default_backend: Some(backend.uuid.clone()),
|
||||
..Default::default()
|
||||
};
|
||||
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
|
||||
@@ -361,8 +354,8 @@ fn server_to_haproxy_server(server: &BackendServer) -> HAProxyServer {
|
||||
uuid: Uuid::new_v4().to_string(),
|
||||
name: format!("{}_{}", &server.address, &server.port),
|
||||
enabled: 1,
|
||||
address: server.address.clone(),
|
||||
port: server.port,
|
||||
address: Some(server.address.clone()),
|
||||
port: Some(server.port),
|
||||
mode: "active".to_string(),
|
||||
server_type: "static".to_string(),
|
||||
..Default::default()
|
||||
@@ -385,8 +378,8 @@ mod tests {
|
||||
let mut haproxy = HAProxy::default();
|
||||
let server = HAProxyServer {
|
||||
uuid: "server1".to_string(),
|
||||
address: "192.168.1.1".to_string(),
|
||||
port: 80,
|
||||
address: Some("192.168.1.1".to_string()),
|
||||
port: Some(80),
|
||||
..Default::default()
|
||||
};
|
||||
haproxy.servers.servers.push(server);
|
||||
@@ -411,8 +404,8 @@ mod tests {
|
||||
let mut haproxy = HAProxy::default();
|
||||
let server = HAProxyServer {
|
||||
uuid: "server1".to_string(),
|
||||
address: "192.168.1.1".to_string(),
|
||||
port: 80,
|
||||
address: Some("192.168.1.1".to_string()),
|
||||
port: Some(80),
|
||||
..Default::default()
|
||||
};
|
||||
haproxy.servers.servers.push(server);
|
||||
@@ -431,8 +424,8 @@ mod tests {
|
||||
let mut haproxy = HAProxy::default();
|
||||
let server = HAProxyServer {
|
||||
uuid: "server1".to_string(),
|
||||
address: "192.168.1.1".to_string(),
|
||||
port: 80,
|
||||
address: Some("192.168.1.1".to_string()),
|
||||
port: Some(80),
|
||||
..Default::default()
|
||||
};
|
||||
haproxy.servers.servers.push(server);
|
||||
@@ -453,16 +446,16 @@ mod tests {
|
||||
let mut haproxy = HAProxy::default();
|
||||
let server = HAProxyServer {
|
||||
uuid: "server1".to_string(),
|
||||
address: "some-hostname.test.mcd".to_string(),
|
||||
port: 80,
|
||||
address: Some("some-hostname.test.mcd".to_string()),
|
||||
port: Some(80),
|
||||
..Default::default()
|
||||
};
|
||||
haproxy.servers.servers.push(server);
|
||||
|
||||
let server = HAProxyServer {
|
||||
uuid: "server2".to_string(),
|
||||
address: "192.168.1.2".to_string(),
|
||||
port: 8080,
|
||||
address: Some("192.168.1.2".to_string()),
|
||||
port: Some(8080),
|
||||
..Default::default()
|
||||
};
|
||||
haproxy.servers.servers.push(server);
|
||||
|
||||
209
harmony/src/modules/cert_manager/cluster_issuer.rs
Normal file
209
harmony/src/modules/cert_manager/cluster_issuer.rs
Normal file
@@ -0,0 +1,209 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct ClusterIssuerScore {
|
||||
email: String,
|
||||
server: String,
|
||||
issuer_name: String,
|
||||
namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for ClusterIssuerScore {
|
||||
fn name(&self) -> String {
|
||||
"ClusterIssuerScore".to_string()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(ClusterIssuerInterpret {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct ClusterIssuerInterpret {
|
||||
score: ClusterIssuerScore,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for ClusterIssuerInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
_inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
self.apply_cluster_issuer(topology.k8s_client().await.unwrap())
|
||||
.await
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("ClusterIssuer")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
impl ClusterIssuerInterpret {
|
||||
async fn validate_cert_manager(
|
||||
&self,
|
||||
client: &Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let cert_manager = "cert-manager".to_string();
|
||||
let operator_namespace = "openshift-operators".to_string();
|
||||
match client
|
||||
.get_deployment(&cert_manager, Some(&operator_namespace))
|
||||
.await
|
||||
{
|
||||
Ok(Some(deployment)) => {
|
||||
if let Some(status) = deployment.status {
|
||||
let ready_count = status.ready_replicas.unwrap_or(0);
|
||||
if ready_count >= 1 {
|
||||
return Ok(Outcome::success(format!(
|
||||
"'{}' is ready with {} replica(s).",
|
||||
&cert_manager, ready_count
|
||||
)));
|
||||
} else {
|
||||
return Err(InterpretError::new(
|
||||
"cert-manager operator not ready in cluster".to_string(),
|
||||
));
|
||||
}
|
||||
} else {
|
||||
Err(InterpretError::new(format!(
|
||||
"failed to get deployment status {} in ns {}",
|
||||
&cert_manager, &operator_namespace
|
||||
)))
|
||||
}
|
||||
}
|
||||
Ok(None) => Err(InterpretError::new(format!(
|
||||
"Deployment '{}' not found in namespace '{}'.",
|
||||
&cert_manager, &operator_namespace
|
||||
))),
|
||||
Err(e) => Err(InterpretError::new(format!(
|
||||
"Failed to query for deployment '{}': {}",
|
||||
&cert_manager, e
|
||||
))),
|
||||
}
|
||||
}
|
||||
|
||||
fn build_cluster_issuer(&self) -> Result<ClusterIssuer, InterpretError> {
|
||||
let issuer_name = &self.score.issuer_name;
|
||||
let email = &self.score.email;
|
||||
let server = &self.score.server;
|
||||
let namespace = &self.score.namespace;
|
||||
let cluster_issuer = ClusterIssuer {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(issuer_name.to_string()),
|
||||
namespace: Some(namespace.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: ClusterIssuerSpec {
|
||||
acme: AcmeSpec {
|
||||
email: email.to_string(),
|
||||
private_key_secret_ref: PrivateKeySecretRef {
|
||||
name: issuer_name.to_string(),
|
||||
},
|
||||
server: server.to_string(),
|
||||
solvers: vec![SolverSpec {
|
||||
http01: Some(Http01Solver {
|
||||
ingress: Http01Ingress {
|
||||
class: "nginx".to_string(),
|
||||
},
|
||||
}),
|
||||
}],
|
||||
},
|
||||
},
|
||||
};
|
||||
|
||||
Ok(cluster_issuer)
|
||||
}
|
||||
|
||||
pub async fn apply_cluster_issuer(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let namespace = self.score.namespace.clone();
|
||||
self.validate_cert_manager(&client).await?;
|
||||
let cluster_issuer = self.build_cluster_issuer().unwrap();
|
||||
client
|
||||
.apply_yaml(
|
||||
&serde_yaml::to_value(cluster_issuer).unwrap(),
|
||||
Some(&namespace),
|
||||
)
|
||||
.await?;
|
||||
Ok(Outcome::success(format!(
|
||||
"successfully deployed cluster operator: {} in namespace: {}",
|
||||
self.score.issuer_name, self.score.namespace
|
||||
)))
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[kube(
|
||||
group = "cert-manager.io",
|
||||
version = "v1",
|
||||
kind = "ClusterIssuer",
|
||||
plural = "clusterissuers"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ClusterIssuerSpec {
|
||||
pub acme: AcmeSpec,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct AcmeSpec {
|
||||
pub email: String,
|
||||
pub private_key_secret_ref: PrivateKeySecretRef,
|
||||
pub server: String,
|
||||
pub solvers: Vec<SolverSpec>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PrivateKeySecretRef {
|
||||
pub name: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SolverSpec {
|
||||
pub http01: Option<Http01Solver>,
|
||||
// Other solver types (e.g., dns01) would go here as Options
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Http01Solver {
|
||||
pub ingress: Http01Ingress,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Http01Ingress {
|
||||
pub class: String,
|
||||
}
|
||||
@@ -1,2 +1,3 @@
|
||||
pub mod cluster_issuer;
|
||||
mod helm;
|
||||
pub use helm::*;
|
||||
|
||||
@@ -77,6 +77,8 @@ impl OKDBootstrapLoadBalancerScore {
|
||||
address: topology.bootstrap_host.ip.to_string(),
|
||||
port,
|
||||
});
|
||||
|
||||
backend.dedup();
|
||||
backend
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,7 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use log::{info, warn};
|
||||
use log::{debug, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::sleep;
|
||||
|
||||
@@ -19,8 +19,8 @@ use harmony_types::id::Id;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct CephRemoveOsd {
|
||||
osd_deployment_name: String,
|
||||
rook_ceph_namespace: String,
|
||||
pub osd_deployment_name: String,
|
||||
pub rook_ceph_namespace: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
|
||||
@@ -54,18 +54,17 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
||||
self.verify_deployment_scaled(client.clone()).await?;
|
||||
self.delete_deployment(client.clone()).await?;
|
||||
self.verify_deployment_deleted(client.clone()).await?;
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
|
||||
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
|
||||
.await?;
|
||||
self.purge_ceph_osd(client.clone()).await?;
|
||||
self.verify_ceph_osd_removal(client.clone()).await?;
|
||||
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
Ok(Outcome::success(format!(
|
||||
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
|
||||
osd_id_full, self.score.osd_deployment_name
|
||||
)))
|
||||
}
|
||||
fn get_name(&self) -> InterpretName {
|
||||
todo!()
|
||||
InterpretName::CephRemoveOsd
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
@@ -82,7 +81,7 @@ impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
|
||||
}
|
||||
|
||||
impl CephRemoveOsdInterpret {
|
||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||
pub fn get_ceph_osd_id_numeric(&self) -> Result<String, InterpretError> {
|
||||
let osd_id_numeric = self
|
||||
.score
|
||||
.osd_deployment_name
|
||||
@@ -94,9 +93,14 @@ impl CephRemoveOsdInterpret {
|
||||
self.score.osd_deployment_name
|
||||
))
|
||||
})?;
|
||||
Ok(osd_id_numeric.to_string())
|
||||
}
|
||||
|
||||
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
|
||||
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||
let osd_id_full = format!("osd.{}", osd_id_numeric);
|
||||
|
||||
info!(
|
||||
debug!(
|
||||
"Targeting Ceph OSD: {} (parsed from deployment {})",
|
||||
osd_id_full, self.score.osd_deployment_name
|
||||
);
|
||||
@@ -108,6 +112,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
debug!("verifying toolbox exists");
|
||||
let toolbox_dep = "rook-ceph-tools".to_string();
|
||||
|
||||
match client
|
||||
@@ -149,7 +154,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
debug!(
|
||||
"Scaling down OSD deployment: {}",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@@ -172,7 +177,7 @@ impl CephRemoveOsdInterpret {
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
|
||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
debug!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
loop {
|
||||
let dep = client
|
||||
.get_deployment(
|
||||
@@ -180,11 +185,9 @@ impl CephRemoveOsdInterpret {
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
)
|
||||
.await?;
|
||||
|
||||
if let Some(deployment) = dep {
|
||||
if let Some(status) = deployment.status {
|
||||
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
|
||||
{
|
||||
if status.replicas == None && status.ready_replicas == None {
|
||||
return Ok(Outcome::success(
|
||||
"Deployment successfully scaled down.".to_string(),
|
||||
));
|
||||
@@ -212,7 +215,7 @@ impl CephRemoveOsdInterpret {
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
debug!(
|
||||
"Deleting OSD deployment: {}",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@@ -234,7 +237,7 @@ impl CephRemoveOsdInterpret {
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
|
||||
info!("Waiting for OSD deployment to scale down to 0 replicas");
|
||||
debug!("Verifying OSD deployment deleted");
|
||||
loop {
|
||||
let dep = client
|
||||
.get_deployment(
|
||||
@@ -244,7 +247,7 @@ impl CephRemoveOsdInterpret {
|
||||
.await?;
|
||||
|
||||
if dep.is_none() {
|
||||
info!(
|
||||
debug!(
|
||||
"Deployment {} successfully deleted.",
|
||||
self.score.osd_deployment_name
|
||||
);
|
||||
@@ -276,12 +279,10 @@ impl CephRemoveOsdInterpret {
|
||||
Ok(tree)
|
||||
}
|
||||
|
||||
pub async fn purge_ceph_osd(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
osd_id_full: &str,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
pub async fn purge_ceph_osd(&self, client: Arc<K8sClient>) -> Result<Outcome, InterpretError> {
|
||||
let osd_id_numeric = self.get_ceph_osd_id_numeric().unwrap();
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
debug!(
|
||||
"Purging OSD {} from Ceph cluster and removing its auth key",
|
||||
osd_id_full
|
||||
);
|
||||
@@ -291,8 +292,9 @@ impl CephRemoveOsdInterpret {
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec![
|
||||
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
|
||||
format!("ceph auth del osd.{osd_id_full}").as_str(),
|
||||
"sh",
|
||||
"-c",
|
||||
format!("ceph osd purge {osd_id_numeric} --yes-i-really-mean-it && ceph auth del {osd_id_full}").as_str(),
|
||||
],
|
||||
)
|
||||
.await?;
|
||||
@@ -305,10 +307,10 @@ impl CephRemoveOsdInterpret {
|
||||
pub async fn verify_ceph_osd_removal(
|
||||
&self,
|
||||
client: Arc<K8sClient>,
|
||||
osd_id_full: &str,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let (timeout, interval, start) = self.build_timer();
|
||||
info!(
|
||||
let osd_id_full = self.get_ceph_osd_id().unwrap();
|
||||
debug!(
|
||||
"Verifying OSD {} has been removed from the Ceph tree...",
|
||||
osd_id_full
|
||||
);
|
||||
@@ -318,7 +320,7 @@ impl CephRemoveOsdInterpret {
|
||||
"rook-ceph-tools".to_string(),
|
||||
"app".to_string(),
|
||||
Some(&self.score.rook_ceph_namespace),
|
||||
vec!["ceph osd tree -f json"],
|
||||
vec!["sh", "-c", "ceph osd tree -f json"],
|
||||
)
|
||||
.await?;
|
||||
let tree =
|
||||
@@ -1,2 +1,2 @@
|
||||
pub mod ceph_osd_replacement_score;
|
||||
pub mod ceph_remove_osd_score;
|
||||
pub mod ceph_validate_health_score;
|
||||
|
||||
Reference in New Issue
Block a user