diff --git a/harmony/src/domain/topology/ha_cluster.rs b/harmony/src/domain/topology/ha_cluster.rs index 5e9a567..4107953 100644 --- a/harmony/src/domain/topology/ha_cluster.rs +++ b/harmony/src/domain/topology/ha_cluster.rs @@ -29,9 +29,9 @@ use super::{ Topology, k8s::K8sClient, }; -use std::collections::BTreeMap; use std::net::IpAddr; use std::sync::Arc; +use std::{collections::BTreeMap, time::Duration}; #[derive(Debug, Clone)] pub struct HAClusterTopology { @@ -102,13 +102,13 @@ impl HAClusterTopology { } async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> { - // FIXME: Find a way to check nmstate is already available (get pod -n openshift-nmstate) + // FIXME: Find a way to check nmstate is already available (get pod -n nmstate) debug!("Installing NMState operator..."); let k8s_client = self.k8s_client().await?; let nmstate_namespace = Namespace { metadata: ObjectMeta { - name: Some("openshift-nmstate".to_string()), + name: Some("nmstate".to_string()), finalizers: Some(vec!["kubernetes".to_string()]), ..Default::default() }, @@ -122,24 +122,24 @@ impl HAClusterTopology { let nmstate_operator_group = OperatorGroup { metadata: ObjectMeta { - name: Some("openshift-nmstate".to_string()), - namespace: Some("openshift-nmstate".to_string()), + name: Some("nmstate".to_string()), + namespace: Some("nmstate".to_string()), ..Default::default() }, spec: OperatorGroupSpec { - target_namespaces: vec!["openshift-nmstate".to_string()], + target_namespaces: vec!["nmstate".to_string()], }, }; debug!("Creating NMState operator group: {nmstate_operator_group:#?}"); k8s_client - .apply(&nmstate_operator_group, Some("openshift-nmstate")) + .apply(&nmstate_operator_group, Some("nmstate")) .await .map_err(|e| e.to_string())?; let nmstate_subscription = Subscription { metadata: ObjectMeta { name: Some("kubernetes-nmstate-operator".to_string()), - namespace: Some("openshift-nmstate".to_string()), + namespace: Some("nmstate".to_string()), ..Default::default() }, spec: SubscriptionSpec { @@ -147,25 +147,34 @@ impl HAClusterTopology { name: "kubernetes-nmstate-operator".to_string(), source: "operatorhubio-catalog".to_string(), source_namespace: "openshift-marketplace".to_string(), + install_plan_approval: Some(InstallPlanApproval::Automatic), }, }; debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}"); k8s_client - .apply(&nmstate_subscription, Some("openshift-nmstate")) + .apply(&nmstate_subscription, Some("nmstate")) .await .map_err(|e| e.to_string())?; + k8s_client + .wait_for_operator( + "kubernetes-nmstate-operator", + Some("nmstate"), + Some(Duration::from_secs(30)), + ) + .await?; + let nmstate = NMState { metadata: ObjectMeta { name: Some("nmstate".to_string()), - namespace: Some("openshift-nmstate".to_string()), + namespace: Some("nmstate".to_string()), ..Default::default() }, ..Default::default() }; debug!("Creating NMState: {nmstate:#?}"); k8s_client - .apply(&nmstate, Some("openshift-nmstate")) + .apply(&nmstate, Some("nmstate")) .await .map_err(|e| e.to_string())?; diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 5a1e6ec..22c1799 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -10,11 +10,13 @@ use k8s_openapi::{ }; use kube::{ Client, Config, Error, Resource, - api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt}, + api::{ + Api, AttachParams, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, ResourceExt, + }, config::{KubeConfigOptions, Kubeconfig}, core::ErrorResponse, error::DiscoveryError, - runtime::reflector::Lookup, + runtime::{reflector::Lookup, wait::Condition}, }; use kube::{api::DynamicObject, runtime::conditions}; use kube::{ @@ -22,11 +24,13 @@ use kube::{ runtime::wait::await_condition, }; use log::{debug, error, trace}; -use serde::{Serialize, de::DeserializeOwned}; +use serde::{Deserialize, Serialize, de::DeserializeOwned}; use serde_json::{Value, json}; use similar::TextDiff; use tokio::{io::AsyncReadExt, time::sleep}; +use crate::modules::okd::crd::ClusterServiceVersion; + #[derive(new, Clone)] pub struct K8sClient { client: Client, @@ -194,6 +198,33 @@ impl K8sClient { } } + pub async fn wait_for_operator( + &self, + operator_name: &str, + namespace: Option<&str>, + timeout: Option, + ) -> Result<(), String> { + let api: Api; + + if let Some(ns) = namespace { + api = Api::namespaced(self.client.clone(), ns); + } else { + api = Api::default_namespaced(self.client.clone()); + } + + let establish = await_condition(api, operator_name, is_operator_ready()); + let t = timeout.unwrap_or(Duration::from_secs(5)); + let res = tokio::time::timeout(t, establish).await; + + if res.is_ok() { + Ok(()) + } else { + Err(format!( + "timed out while waiting for operator {operator_name}" + )) + } + } + /// Will execute a commond in the first pod found that matches the specified label /// '{label}={name}' pub async fn exec_app_capture_output( @@ -547,3 +578,14 @@ where } } } + +fn is_operator_ready() -> impl Condition { + |obj: Option<&ClusterServiceVersion>| { + if let Some(csv) = obj { + if let Some(status) = &csv.spec.status { + return status.phase == "Succeeded"; + } + } + false + } +} diff --git a/harmony/src/modules/okd/crd/mod.rs b/harmony/src/modules/okd/crd/mod.rs index c6b3416..b26ad8d 100644 --- a/harmony/src/modules/okd/crd/mod.rs +++ b/harmony/src/modules/okd/crd/mod.rs @@ -28,7 +28,10 @@ pub struct SubscriptionSpec { pub name: String, pub source: String, pub source_namespace: String, + #[serde(skip_serializing_if = "Option::is_none")] pub channel: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub install_plan_approval: Option, } #[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] @@ -38,3 +41,22 @@ pub enum InstallPlanApproval { #[serde(rename = "Manual")] Manual, } + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[kube( + group = "operators.coreos.com", + version = "v1alpha1", + kind = "ClusterServiceVersion", + namespaced +)] +#[serde(rename_all = "camelCase")] +pub struct ClusterServiceVersionSpec { + pub status: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)] +#[serde(rename_all = "camelCase")] +pub struct ClusterServiceVersionStatus { + pub phase: String, + pub reason: String, +}