diff --git a/harmony/src/domain/topology/k8s.rs b/harmony/src/domain/topology/k8s.rs index 08868e8..b969cad 100644 --- a/harmony/src/domain/topology/k8s.rs +++ b/harmony/src/domain/topology/k8s.rs @@ -1,11 +1,11 @@ use derive_new::new; -use k8s_openapi::NamespaceResourceScope; +use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope}; use kube::{ Api, Client, Config, Error, Resource, api::PostParams, config::{KubeConfigOptions, Kubeconfig}, }; -use log::error; +use log::{debug, error, trace}; use serde::de::DeserializeOwned; #[derive(new)] @@ -20,52 +20,31 @@ impl K8sClient { }) } - pub async fn apply_all< - K: Resource - + std::fmt::Debug - + Sync - + DeserializeOwned - + Default - + serde::Serialize - + Clone, - >( - &self, - resource: &Vec, - ) -> Result, kube::Error> + pub async fn apply(&self, resource: &K, ns: Option<&str>) -> Result where + K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, + ::Scope: ApplyStrategy, ::DynamicType: Default, { - let mut result = vec![]; - for r in resource.iter() { - let api: Api = Api::all(self.client.clone()); - result.push(api.create(&PostParams::default(), &r).await?); - } - Ok(result) + debug!("Applying resource {:?} with ns {:?}", resource.meta().name, ns); + trace!("{:#?}", serde_json::to_string(resource)); + + let api: Api = <::Scope as ApplyStrategy>::get_api(&self.client, ns); + api.create(&PostParams::default(), &resource).await } - pub async fn apply_namespaced( - &self, - resource: &Vec, - ns: Option<&str>, - ) -> Result, Error> + pub async fn apply_many(&self, resource: &Vec, ns: Option<&str>) -> Result, Error> where - K: Resource - + Clone - + std::fmt::Debug - + DeserializeOwned - + serde::Serialize - + Default, + K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize, + ::Scope: ApplyStrategy, ::DynamicType: Default, { - let mut resources = Vec::new(); + let mut result = Vec::new(); for r in resource.iter() { - let api: Api = match ns { - Some(ns) => Api::namespaced(self.client.clone(), ns), - None => Api::default_namespaced(self.client.clone()), - }; - resources.push(api.create(&PostParams::default(), &r).await?); + result.push(self.apply(r, ns).await?); } - Ok(resources) + + Ok(result) } pub(crate) async fn from_kubeconfig(path: &str) -> Option { @@ -86,3 +65,35 @@ impl K8sClient { )) } } + +pub trait ApplyStrategy { + fn get_api(client: &Client, ns: Option<&str>) -> Api; +} + +/// Implementation for all resources that are cluster-scoped. +/// It will always use `Api::all` and ignore the namespace parameter. +impl ApplyStrategy for ClusterResourceScope +where + K: Resource, + ::DynamicType: Default, +{ + fn get_api(client: &Client, _ns: Option<&str>) -> Api { + Api::all(client.clone()) + } +} + +/// Implementation for all resources that are namespace-scoped. +/// It will use `Api::namespaced` if a namespace is provided, otherwise +/// it falls back to the default namespace configured in your kubeconfig. +impl ApplyStrategy for NamespaceResourceScope +where + K: Resource, + ::DynamicType: Default, +{ + fn get_api(client: &Client, ns: Option<&str>) -> Api { + match ns { + Some(ns) => Api::namespaced(client.clone(), ns), + None => Api::default_namespaced(client.clone()), + } + } +} diff --git a/harmony/src/domain/topology/k8s_anywhere.rs b/harmony/src/domain/topology/k8s_anywhere.rs index bd2f261..fd0685d 100644 --- a/harmony/src/domain/topology/k8s_anywhere.rs +++ b/harmony/src/domain/topology/k8s_anywhere.rs @@ -6,6 +6,7 @@ use log::{info, warn}; use tokio::sync::OnceCell; use crate::{ + data::Id, executors::ExecutorError, interpret::{InterpretError, Outcome}, inventory::Inventory, @@ -259,27 +260,27 @@ impl TenantManager for K8sAnywhereTopology { async fn update_tenant_resource_limits( &self, - tenant_name: &str, + tenant_id: &Id, new_limits: &ResourceLimits, ) -> Result<(), ExecutorError> { self.get_k8s_tenant_manager()? - .update_tenant_resource_limits(tenant_name, new_limits) + .update_tenant_resource_limits(tenant_id, new_limits) .await } async fn update_tenant_network_policy( &self, - tenant_name: &str, + tenant_id: &Id, new_policy: &TenantNetworkPolicy, ) -> Result<(), ExecutorError> { self.get_k8s_tenant_manager()? - .update_tenant_network_policy(tenant_name, new_policy) + .update_tenant_network_policy(tenant_id, new_policy) .await } - async fn deprovision_tenant(&self, tenant_name: &str) -> Result<(), ExecutorError> { + async fn deprovision_tenant(&self, tenant_id: &Id) -> Result<(), ExecutorError> { self.get_k8s_tenant_manager()? - .deprovision_tenant(tenant_name) + .deprovision_tenant(tenant_id) .await } } diff --git a/harmony/src/modules/k8s/resource.rs b/harmony/src/modules/k8s/resource.rs index 6880292..3c0b2bf 100644 --- a/harmony/src/modules/k8s/resource.rs +++ b/harmony/src/modules/k8s/resource.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use k8s_openapi::NamespaceResourceScope; use kube::Resource; +use log::info; use serde::{Serialize, de::DeserializeOwned}; use crate::{ @@ -75,11 +76,12 @@ where _inventory: &Inventory, topology: &T, ) -> Result { + info!("Applying {} resources", self.score.resource.len()); topology .k8s_client() .await .expect("Environment should provide enough information to instanciate a client") - .apply_namespaced(&self.score.resource, self.score.namespace.as_deref()) + .apply_many(&self.score.resource, self.score.namespace.as_deref()) .await?; Ok(Outcome::success(