fix(host_network): adjust bond & port-channel configuration (partial) #175

Merged
letian merged 12 commits from nmstate into master 2025-10-29 17:09:16 +00:00
7 changed files with 119 additions and 110 deletions
Showing only changes of commit 0184e18c66 - Show all commits

View File

@ -102,80 +102,58 @@ impl HAClusterTopology {
} }
async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> { async fn ensure_nmstate_operator_installed(&self) -> Result<(), String> {
// FIXME: Find a way to check nmstate is already available (get pod -n nmstate)
debug!("Installing NMState operator...");
let k8s_client = self.k8s_client().await?; let k8s_client = self.k8s_client().await?;
let nmstate_namespace = Namespace { debug!("Installing NMState controller...");
metadata: ObjectMeta { k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/nmstate.io_nmstates.yaml
name: Some("nmstate".to_string()), ").unwrap(), Some("nmstate"))
finalizers: Some(vec!["kubernetes".to_string()]),
..Default::default()
},
..Default::default()
};
debug!("Creating NMState namespace: {nmstate_namespace:#?}");
k8s_client
.apply(&nmstate_namespace, None)
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let nmstate_operator_group = OperatorGroup { debug!("Creating NMState namespace...");
metadata: ObjectMeta { k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/namespace.yaml
name: Some("nmstate".to_string()), ").unwrap(), Some("nmstate"))
namespace: Some("nmstate".to_string()),
..Default::default()
},
spec: OperatorGroupSpec {
target_namespaces: vec!["nmstate".to_string()],
},
};
debug!("Creating NMState operator group: {nmstate_operator_group:#?}");
k8s_client
.apply(&nmstate_operator_group, Some("nmstate"))
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
let nmstate_subscription = Subscription { debug!("Creating NMState service account...");
metadata: ObjectMeta { k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/service_account.yaml
name: Some("kubernetes-nmstate-operator".to_string()), ").unwrap(), Some("nmstate"))
namespace: Some("nmstate".to_string()), .await
..Default::default() .map_err(|e| e.to_string())?;
},
spec: SubscriptionSpec { debug!("Creating NMState role...");
channel: Some("alpha".to_string()), k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role.yaml
name: "kubernetes-nmstate-operator".to_string(), ").unwrap(), Some("nmstate"))
source: "operatorhubio-catalog".to_string(), .await
source_namespace: "openshift-marketplace".to_string(), .map_err(|e| e.to_string())?;
install_plan_approval: Some(InstallPlanApproval::Automatic),
}, debug!("Creating NMState role binding...");
}; k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/role_binding.yaml
debug!("Subscribing to NMState Operator: {nmstate_subscription:#?}"); ").unwrap(), Some("nmstate"))
k8s_client .await
.apply(&nmstate_subscription, Some("nmstate")) .map_err(|e| e.to_string())?;
debug!("Creating NMState operator...");
k8s_client.apply_url(url::Url::parse("https://github.com/nmstate/kubernetes-nmstate/releases/download/v0.84.0/operator.yaml
").unwrap(), Some("nmstate"))
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;
k8s_client k8s_client
.wait_for_operator( .wait_until_deployment_ready("nmstate-operator", Some("nmstate"), None)
"kubernetes-nmstate-operator",
Some("nmstate"),
Some(Duration::from_secs(30)),
)
.await?; .await?;
let nmstate = NMState { let nmstate = NMState {
metadata: ObjectMeta { metadata: ObjectMeta {
name: Some("nmstate".to_string()), name: Some("nmstate".to_string()),
namespace: Some("nmstate".to_string()),
..Default::default() ..Default::default()
}, },
..Default::default() ..Default::default()
}; };
debug!("Creating NMState: {nmstate:#?}"); debug!("Creating NMState: {nmstate:#?}");
k8s_client k8s_client
.apply(&nmstate, Some("nmstate")) .apply(&nmstate, None)
.await .await
.map_err(|e| e.to_string())?; .map_err(|e| e.to_string())?;

View File

@ -15,6 +15,7 @@ use kube::{
}, },
config::{KubeConfigOptions, Kubeconfig}, config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse, core::ErrorResponse,
discovery::{ApiCapabilities, Scope},
error::DiscoveryError, error::DiscoveryError,
runtime::{reflector::Lookup, wait::Condition}, runtime::{reflector::Lookup, wait::Condition},
}; };
@ -23,11 +24,13 @@ use kube::{
api::{ApiResource, GroupVersionKind}, api::{ApiResource, GroupVersionKind},
runtime::wait::await_condition, runtime::wait::await_condition,
}; };
use log::{debug, error, trace}; use log::{debug, error, info, trace, warn};
use serde::{Deserialize, Serialize, de::DeserializeOwned}; use serde::{Deserialize, Serialize, de::DeserializeOwned};
use serde_json::{Value, json}; use serde_json::{Value, json};
use serde_value::DeserializerError;
use similar::TextDiff; use similar::TextDiff;
use tokio::{io::AsyncReadExt, time::sleep}; use tokio::{io::AsyncReadExt, time::sleep};
use url::Url;
use crate::modules::okd::crd::ClusterServiceVersion; use crate::modules::okd::crd::ClusterServiceVersion;
@ -140,9 +143,9 @@ impl K8sClient {
pub async fn wait_until_deployment_ready( pub async fn wait_until_deployment_ready(
&self, &self,
name: String, name: &str,
namespace: Option<&str>, namespace: Option<&str>,
timeout: Option<u64>, timeout: Option<Duration>,
) -> Result<(), String> { ) -> Result<(), String> {
let api: Api<Deployment>; let api: Api<Deployment>;
@ -152,9 +155,9 @@ impl K8sClient {
api = Api::default_namespaced(self.client.clone()); api = Api::default_namespaced(self.client.clone());
} }
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed()); let establish = await_condition(api, name, conditions::is_deployment_completed());
let t = timeout.unwrap_or(300); let timeout = timeout.unwrap_or(Duration::from_secs(120));
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await; let res = tokio::time::timeout(timeout, establish).await;
if res.is_ok() { if res.is_ok() {
Ok(()) Ok(())
@ -451,7 +454,7 @@ impl K8sClient {
where where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>, <K as Resource>::Scope: ApplyStrategy<K>,
<K as kube::Resource>::DynamicType: Default, <K as Resource>::DynamicType: Default,
{ {
let mut result = Vec::new(); let mut result = Vec::new();
for r in resource.iter() { for r in resource.iter() {
@ -516,10 +519,7 @@ impl K8sClient {
// 6. Apply the object to the cluster using Server-Side Apply. // 6. Apply the object to the cluster using Server-Side Apply.
// This will create the resource if it doesn't exist, or update it if it does. // This will create the resource if it doesn't exist, or update it if it does.
println!( println!("Applying Argo Application '{name}' in namespace '{namespace}'...",);
"Applying Argo Application '{}' in namespace '{}'...",
name, namespace
);
let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name let patch_params = PatchParams::apply("harmony"); // Use a unique field manager name
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?; let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
@ -528,6 +528,51 @@ impl K8sClient {
Ok(()) Ok(())
} }
/// Apply a resource from a URL
///
/// It is the equivalent of `kubectl apply -f <url>`
pub async fn apply_url(&self, url: Url, ns: Option<&str>) -> Result<(), Error> {
let patch_params = PatchParams::apply("harmony");
let discovery = kube::Discovery::new(self.client.clone()).run().await?;
let yaml = reqwest::get(url)
.await
.expect("Could not get URL")
.text()
.await
.expect("Could not get content from URL");
for doc in multidoc_deserialize(&yaml).expect("failed to parse YAML from file") {
let obj: DynamicObject =
serde_yaml::from_value(doc).expect("cannot apply without valid YAML");
let namespace = obj.metadata.namespace.as_deref().or(ns);
let type_meta = obj
.types
.as_ref()
.expect("cannot apply object without valid TypeMeta");
let gvk = GroupVersionKind::try_from(type_meta)
.expect("cannot apply object without valid GroupVersionKind");
let name = obj.name_any();
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = get_dynamic_api(ar, caps, self.client.clone(), namespace, false);
trace!(
"Applying {}: \n{}",
gvk.kind,
serde_yaml::to_string(&obj).expect("Failed to serialize YAML")
);
let data: serde_json::Value =
serde_json::to_value(&obj).expect("Failed to serialize JSON");
let _r = api.patch(&name, &patch_params, &Patch::Apply(data)).await?;
debug!("applied {} {}", gvk.kind, name);
} else {
warn!("Cannot apply document for unknown {gvk:?}");
}
}
Ok(())
}
pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> { pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
let k = match Kubeconfig::read_from(path) { let k = match Kubeconfig::read_from(path) {
Ok(k) => k, Ok(k) => k,
@ -547,6 +592,31 @@ impl K8sClient {
} }
} }
fn get_dynamic_api(
resource: ApiResource,
capabilities: ApiCapabilities,
client: Client,
ns: Option<&str>,
all: bool,
) -> Api<DynamicObject> {
if capabilities.scope == Scope::Cluster || all {
Api::all_with(client, &resource)
} else if let Some(namespace) = ns {
Api::namespaced_with(client, namespace, &resource)
} else {
Api::default_namespaced_with(client, &resource)
}
}
fn multidoc_deserialize(data: &str) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
use serde::Deserialize;
let mut docs = vec![];
for de in serde_yaml::Deserializer::from_str(data) {
docs.push(serde_yaml::Value::deserialize(de)?);
}
Ok(docs)
}
pub trait ApplyStrategy<K: Resource> { pub trait ApplyStrategy<K: Resource> {
fn get_api(client: &Client, ns: Option<&str>) -> Api<K>; fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
} }

View File

@ -1 +0,0 @@
pub mod types;

View File

@ -1,40 +0,0 @@
use std::collections::BTreeMap;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Default, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(
group = "operators.coreos.com",
version = "v1alpha1",
kind = "CatalogSource",
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct CatalogSourceSpec {
pub source_type: String,
pub image: String,
pub display_name: String,
pub publisher: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub grpc_pod_config: Option<GrpcPodConfig>,
}
impl Default for CatalogSource {
fn default() -> Self {
Self {
metadata: Default::default(),
spec: Default::default(),
}
}
}
#[derive(Default, Serialize, Deserialize, Clone, Debug, JsonSchema)]
#[serde(rename_all = "camelCase")]
pub struct GrpcPodConfig {
#[serde(skip_serializing_if = "Option::is_none")]
pub memory_target: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub node_selector: Option<BTreeMap<String, String>>,
}

View File

@ -3,6 +3,5 @@ pub mod executors;
pub mod hp_ilo; pub mod hp_ilo;
pub mod intel_amt; pub mod intel_amt;
pub mod inventory; pub mod inventory;
pub mod kubers;
pub mod opnsense; pub mod opnsense;
mod sqlx; mod sqlx;

View File

@ -100,11 +100,7 @@ impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Interpret<T> f
info!("deploying ntfy..."); info!("deploying ntfy...");
client client
.wait_until_deployment_ready( .wait_until_deployment_ready("ntfy", Some(self.score.namespace.as_str()), None)
"ntfy".to_string(),
Some(self.score.namespace.as_str()),
None,
)
.await?; .await?;
info!("ntfy deployed"); info!("ntfy deployed");

View File

@ -6,9 +6,16 @@ use serde::{Deserialize, Serialize};
use serde_json::Value; use serde_json::Value;
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)] #[derive(CustomResource, Deserialize, Serialize, Clone, Debug, JsonSchema)]
#[kube(group = "nmstate.io", version = "v1", kind = "NMState", namespaced)] #[kube(
group = "nmstate.io",
version = "v1",
kind = "NMState",
plural = "nmstates",
namespaced = false
)]
#[serde(rename_all = "camelCase")] #[serde(rename_all = "camelCase")]
pub struct NMStateSpec { pub struct NMStateSpec {
#[serde(skip_serializing_if = "Option::is_none")]
pub probe_configuration: Option<ProbeConfig>, pub probe_configuration: Option<ProbeConfig>,
} }