remove unused wait_for_operator

This commit is contained in:
Ian Letourneau 2025-10-23 14:27:34 -04:00
parent 0184e18c66
commit 2fe1c5d147
3 changed files with 23 additions and 129 deletions

View File

@ -6,7 +6,6 @@ use harmony_types::{
net::{MacAddress, Url}, net::{MacAddress, Url},
switch::PortLocation, switch::PortLocation,
}; };
use k8s_openapi::api::core::v1::Namespace;
use kube::api::ObjectMeta; use kube::api::ObjectMeta;
use log::debug; use log::debug;
use log::info; use log::info;
@ -16,9 +15,8 @@ use crate::executors::ExecutorError;
use crate::hardware::PhysicalHost; use crate::hardware::PhysicalHost;
use crate::infra::brocade::BrocadeSwitchAuth; use crate::infra::brocade::BrocadeSwitchAuth;
use crate::infra::brocade::BrocadeSwitchClient; use crate::infra::brocade::BrocadeSwitchClient;
use crate::modules::okd::crd::{ use crate::modules::okd::crd::nmstate::{
InstallPlanApproval, OperatorGroup, OperatorGroupSpec, Subscription, SubscriptionSpec, self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec,
nmstate::{self, NMState, NodeNetworkConfigurationPolicy, NodeNetworkConfigurationPolicySpec},
}; };
use crate::topology::PxeOptions; use crate::topology::PxeOptions;
@ -29,9 +27,9 @@ use super::{
Topology, k8s::K8sClient, Topology, k8s::K8sClient,
}; };
use std::collections::BTreeMap;
use std::net::IpAddr; use std::net::IpAddr;
use std::sync::Arc; use std::sync::Arc;
use std::{collections::BTreeMap, time::Duration};
#[derive(Debug, Clone)] #[derive(Debug, Clone)]
pub struct HAClusterTopology { pub struct HAClusterTopology {

View File

@ -3,37 +3,29 @@ use std::time::Duration;
use derive_new::new; use derive_new::new;
use k8s_openapi::{ use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope, ClusterResourceScope, NamespaceResourceScope,
api::{ api::{apps::v1::Deployment, core::v1::Pod},
apps::v1::Deployment,
core::v1::{Pod, PodStatus},
},
}; };
use kube::{ use kube::{
Client, Config, Error, Resource, Client, Config, Error, Resource,
api::{ api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
Api, AttachParams, DeleteParams, ListParams, ObjectMeta, Patch, PatchParams, ResourceExt,
},
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
discovery::{ApiCapabilities, Scope}, discovery::{ApiCapabilities, Scope},
error::DiscoveryError, error::DiscoveryError,
runtime::{reflector::Lookup, wait::Condition}, runtime::reflector::Lookup,
}; };
use kube::{api::DynamicObject, runtime::conditions}; use kube::{api::DynamicObject, runtime::conditions};
use kube::{ use kube::{
api::{ApiResource, GroupVersionKind}, api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition, runtime::wait::await_condition,
}; };
use log::{debug, error, info, trace, warn}; use log::{debug, error, trace, warn};
use serde::{Deserialize, Serialize, de::DeserializeOwned}; use serde::{Serialize, de::DeserializeOwned};
use serde_json::{Value, json}; use serde_json::json;
use serde_value::DeserializerError;
use similar::TextDiff; use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep}; use tokio::{io::AsyncReadExt, time::sleep};
use url::Url; use url::Url;
use crate::modules::okd::crd::ClusterServiceVersion;
#[derive(new, Clone)] #[derive(new, Clone)]
pub struct K8sClient { pub struct K8sClient {
client: Client, client: Client,
@ -78,7 +70,8 @@ impl K8sClient {
} else { } else {
Api::default_namespaced_with(self.client.clone(), &gvk) Api::default_namespaced_with(self.client.clone(), &gvk)
}; };
Ok(resource.get(name).await?)
resource.get(name).await
} }
pub async fn get_deployment( pub async fn get_deployment(
@ -91,7 +84,8 @@ impl K8sClient {
} else { } else {
Api::default_namespaced(self.client.clone()) Api::default_namespaced(self.client.clone())
}; };
Ok(deps.get_opt(name).await?)
deps.get_opt(name).await
} }
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> { pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
@ -100,7 +94,8 @@ impl K8sClient {
} else { } else {
Api::default_namespaced(self.client.clone()) Api::default_namespaced(self.client.clone())
}; };
Ok(pods.get_opt(name).await?)
pods.get_opt(name).await
} }
pub async fn scale_deployment( pub async fn scale_deployment(
@ -201,33 +196,6 @@ impl K8sClient {
} }
} }
pub async fn wait_for_operator(
&self,
operator_name: &str,
namespace: Option<&str>,
timeout: Option<Duration>,
) -> Result<(), String> {
let api: Api<ClusterServiceVersion>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let establish = await_condition(api, operator_name, is_operator_ready());
let t = timeout.unwrap_or(Duration::from_secs(5));
let res = tokio::time::timeout(t, establish).await;
if res.is_ok() {
Ok(())
} else {
Err(format!(
"timed out while waiting for operator {operator_name}"
))
}
}
/// Will execute a commond in the first pod found that matches the specified label /// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}' /// '{label}={name}'
pub async fn exec_app_capture_output( pub async fn exec_app_capture_output(
@ -274,7 +242,7 @@ impl K8sClient {
if let Some(s) = status.status { if let Some(s) = status.status {
let mut stdout_buf = String::new(); let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout().take() { if let Some(mut stdout) = process.stdout() {
stdout stdout
.read_to_string(&mut stdout_buf) .read_to_string(&mut stdout_buf)
.await .await
@ -380,14 +348,14 @@ impl K8sClient {
Ok(current) => { Ok(current) => {
trace!("Received current value {current:#?}"); trace!("Received current value {current:#?}");
// The resource exists, so we calculate and display a diff. // The resource exists, so we calculate and display a diff.
println!("\nPerforming dry-run for resource: '{}'", name); println!("\nPerforming dry-run for resource: '{name}'");
let mut current_yaml = serde_yaml::to_value(&current).unwrap_or_else(|_| { let mut current_yaml = serde_yaml::to_value(&current).unwrap_or_else(|_| {
panic!("Could not serialize current value : {current:#?}") panic!("Could not serialize current value : {current:#?}")
}); });
if current_yaml.is_mapping() && current_yaml.get("status").is_some() { if current_yaml.is_mapping() && current_yaml.get("status").is_some() {
let map = current_yaml.as_mapping_mut().unwrap(); let map = current_yaml.as_mapping_mut().unwrap();
let removed = map.remove_entry("status"); let removed = map.remove_entry("status");
trace!("Removed status {:?}", removed); trace!("Removed status {removed:?}");
} else { } else {
trace!( trace!(
"Did not find status entry for current object {}/{}", "Did not find status entry for current object {}/{}",
@ -416,14 +384,14 @@ impl K8sClient {
similar::ChangeTag::Insert => "+", similar::ChangeTag::Insert => "+",
similar::ChangeTag::Equal => " ", similar::ChangeTag::Equal => " ",
}; };
print!("{}{}", sign, change); print!("{sign}{change}");
} }
// In a dry run, we return the new resource state that would have been applied. // In a dry run, we return the new resource state that would have been applied.
Ok(resource.clone()) Ok(resource.clone())
} }
Err(Error::Api(ErrorResponse { code: 404, .. })) => { Err(Error::Api(ErrorResponse { code: 404, .. })) => {
// The resource does not exist, so the "diff" is the entire new resource. // The resource does not exist, so the "diff" is the entire new resource.
println!("\nPerforming dry-run for new resource: '{}'", name); println!("\nPerforming dry-run for new resource: '{name}'");
println!( println!(
"Resource does not exist. It would be created with the following content:" "Resource does not exist. It would be created with the following content:"
); );
@ -432,14 +400,14 @@ impl K8sClient {
// Print each line of the new resource with a '+' prefix. // Print each line of the new resource with a '+' prefix.
for line in new_yaml.lines() { for line in new_yaml.lines() {
println!("+{}", line); println!("+{line}");
} }
// In a dry run, we return the new resource state that would have been created. // In a dry run, we return the new resource state that would have been created.
Ok(resource.clone()) Ok(resource.clone())
} }
Err(e) => { Err(e) => {
// Another API error occurred. // Another API error occurred.
error!("Failed to get resource '{}': {}", name, e); error!("Failed to get resource '{name}': {e}");
Err(e) Err(e)
} }
} }
@ -519,7 +487,7 @@ impl K8sClient {
// 6. Apply the object to the cluster using Server-Side Apply. // 6. Apply the object to the cluster using Server-Side Apply.
// This will create the resource if it doesn't exist, or update it if it does. // This will create the resource if it doesn't exist, or update it if it does.
println!("Applying Argo Application '{name}' in namespace '{namespace}'...",); println!("Applying '{name}' in namespace '{namespace}'...",);
let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?; let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
@ -648,14 +616,3 @@ where
} }
} }
} }
fn is_operator_ready() -> impl Condition<ClusterServiceVersion> {
|obj: Option<&ClusterServiceVersion>| {
if let Some(csv) = obj {
if let Some(status) = &csv.spec.status {
return status.phase == "Succeeded";
}
}
false
}
}

View File

@ -1,62 +1 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
pub mod nmstate; pub mod nmstate;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1",
kind = "OperatorGroup",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct OperatorGroupSpec {
pub target_namespaces: Vec<String>,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "Subscription",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct SubscriptionSpec {
pub name: String,
pub source: String,
pub source_namespace: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub channel: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub install_plan_approval: Option<InstallPlanApproval>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
pub enum InstallPlanApproval {
#[serde(rename = "Automatic")]
Automatic,
#[serde(rename = "Manual")]
Manual,
}
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "ClusterServiceVersion",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct ClusterServiceVersionSpec {
pub status: Option<ClusterServiceVersionStatus>,
}
#[derive(Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct ClusterServiceVersionStatus {
pub phase: String,
pub reason: String,
}