This commit is contained in:
Ian Letourneau 2025-10-17 11:39:26 -04:00
parent dbd1f1b010
commit 83fcf9e8ac
3 changed files with 87 additions and 14 deletions

View File

@ -29,9 +29,9 @@ use super::{
Topology, k8s::K8sClient, Topology, k8s::K8sClient,
}; };
use std::collections::BTreeMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::sync::Arc; use std::sync::Arc;
use std::{collections::BTreeMap, time::Duration};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HAClusterTopology { pub struct HAClusterTopology {
@ -102,13 +102,13 @@ impl HAClusterTopology {
} }
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> { async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
// FIXME: Find a way to check nmstate is already available (get pod -n openshift-nmstate) // FIXME: Find a way to check nmstate is already available (get pod -n nmstate)
debug!("Installing NMState operator..."); debug!("Installing NMState operator...");
let k8s_client = self.k8s_client().await?; let k8s_client = self.k8s_client().await?;
let nmstate_namespace = Namespace { let nmstate_namespace = Namespace {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()), name: Some("nmstate".to_string()),
finalizers: Some(vec!["kubernetes".to_string()]), finalizers: Some(vec!["kubernetes".to_string()]),
..Default::default() ..Default::default()
}, },
@ -122,24 +122,24 @@ impl HAClusterTopology {
let nmstate_operator_group = OperatorGroup { let nmstate_operator_group = OperatorGroup {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some("openshift-nmstate".to_string()), name: Some("nmstate".to_string()),
namespace: Some("openshift-nmstate".to_string()), namespace: Some("nmstate".to_string()),
..Default::default() ..Default::default()
}, },
spec: OperatorGroupSpec { spec: OperatorGroupSpec {
target_namespaces: vec!["openshift-nmstate".to_string()], target_namespaces: vec!["nmstate".to_string()],
}, },
}; };
debug!("Creating NMState operator group: {nmstate_operator_group:#?}"); debug!("Creating NMState operator group: {nmstate_operator_group:#?}");
k8s_client k8s_client
.apply(&nmstate_operator_group, Some("openshift-nmstate")) .apply(&nmstate_operator_group, Some("nmstate"))
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let nmstate_subscription = Subscription { let nmstate_subscription = Subscription {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some("kubernetes-nmstate-operator".to_string()), name: Some("kubernetes-nmstate-operator".to_string()),
namespace: Some("openshift-nmstate".to_string()), namespace: Some("nmstate".to_string()),
..Default::default() ..Default::default()
}, },
spec: SubscriptionSpec { spec: SubscriptionSpec {
@ -147,25 +147,34 @@ impl HAClusterTopology {
name: "kubernetes-nmstate-operator".to_string(), name: "kubernetes-nmstate-operator".to_string(),
source: "operatorhubio-catalog".to_string(), source: "operatorhubio-catalog".to_string(),
source_namespace: "openshift-marketplace".to_string(), source_namespace: "openshift-marketplace".to_string(),
install_plan_approval: Some(InstallPlanApproval::Automatic),
}, },
}; };
debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}"); debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}");
k8s_client k8s_client
.apply(&nmstate_subscription, Some("openshift-nmstate")) .apply(&nmstate_subscription, Some("nmstate"))
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
k8s_client
.wait_for_operator(
"kubernetes-nmstate-operator",
Some("nmstate"),
Some(Duration::from_secs(30)),
)
.await?;
let nmstate = NMState { let nmstate = NMState {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some("nmstate".to_string()), name: Some("nmstate".to_string()),
namespace: Some("openshift-nmstate".to_string()), namespace: Some("nmstate".to_string()),
..Default::default() ..Default::default()
}, },
..Default::default() ..Default::default()
}; };
debug!("Creating NMState: {nmstate:#?}"); debug!("Creating NMState: {nmstate:#?}");
k8s_client k8s_client
.apply(&nmstate, Some("openshift-nmstate")) .apply(&nmstate, Some("nmstate"))
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;

View File

@ -10,11 +10,13 @@ use k8s_openapi::{
}; };
use kube::{ use kube::{
Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, api::{
Api, AttachParams, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, ResourceExt,
},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
error::DiscoveryError, error::DiscoveryError,
runtime::reflector::Lookup, runtime::{reflector::Lookup, wait::Condition},
}; };
use kube::{api::DynamicObject, runtime::conditions}; use kube::{api::DynamicObject, runtime::conditions};
use kube::{ use kube::{
@ -22,11 +24,13 @@ use kube::{
runtime::wait::await_condition, runtime::wait::await_condition,
}; };
use log::{debug, error, trace}; use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned}; use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::{Value, json}; use serde_json::{Value, json};
use similar::TextDiff; use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep}; use tokio::{io::AsyncReadExt, time::sleep};
use crate::modules::okd::crd::ClusterServiceVersion;
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct K8sClient { pub struct K8sClient {
client: Client, client: Client,
@ -194,6 +198,33 @@ impl K8sClient {
} }
} }
pub async fn wait_for_operator(
&self,
operator_name: &str,
namespace: Option<&str>,
timeout: Option<Duration>,
) -> Result<(), String> {
let api: Api<ClusterServiceVersion>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let establish = await_condition(api, operator_name, is_operator_ready());
let t = timeout.unwrap_or(Duration::from_secs(5));
let res = tokio::time::timeout(t, establish).await;
if res.is_ok() {
Ok(())
} else {
Err(format!(
"timed out while waiting for operator {operator_name}"
))
}
}
/// Will execute a commond in the first pod found that matches the specified label /// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}' /// '{label}={name}'
pub async fn exec_app_capture_output( pub async fn exec_app_capture_output(
@ -547,3 +578,14 @@ where
} }
} }
} }
fn is_operator_ready() -> impl Condition<ClusterServiceVersion> {
|obj: Option<&ClusterServiceVersion>| {
if let Some(csv) = obj {
if let Some(status) = &csv.spec.status {
return status.phase == "Succeeded";
}
}
false
}
}

View File

@ -28,7 +28,10 @@ pub struct SubscriptionSpec {
pub name: String, pub name: String,
pub source: String, pub source: String,
pub source_namespace: String, pub source_namespace: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub channel: Option<String>, pub channel: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub install_plan_approval: Option<InstallPlanApproval>,
} }
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] #[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
@ -38,3 +41,22 @@ pub enum InstallPlanApproval {
#[serde(rename = "Manual")] #[serde(rename = "Manual")]
Manual, Manual,
} }
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "ClusterServiceVersion",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterServiceVersionSpec {
pub status: Option<ClusterServiceVersionStatus>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterServiceVersionStatus {
pub phase: String,
pub reason: String,
}