TenantManager_impl_k8s_anywhere #47
@ -1,11 +1,11 @@
 | 
				
			|||||||
use derive_new::new;
 | 
					use derive_new::new;
 | 
				
			||||||
use k8s_openapi::NamespaceResourceScope;
 | 
					use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
 | 
				
			||||||
use kube::{
 | 
					use kube::{
 | 
				
			||||||
    Api, Client, Config, Error, Resource,
 | 
					    Api, Client, Config, Error, Resource,
 | 
				
			||||||
    api::PostParams,
 | 
					    api::PostParams,
 | 
				
			||||||
    config::{KubeConfigOptions, Kubeconfig},
 | 
					    config::{KubeConfigOptions, Kubeconfig},
 | 
				
			||||||
};
 | 
					};
 | 
				
			||||||
use log::error;
 | 
					use log::{debug, error, trace};
 | 
				
			||||||
use serde::de::DeserializeOwned;
 | 
					use serde::de::DeserializeOwned;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
#[derive(new)]
 | 
					#[derive(new)]
 | 
				
			||||||
@ -20,52 +20,31 @@ impl K8sClient {
 | 
				
			|||||||
        })
 | 
					        })
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub async fn apply_all<
 | 
					    pub async fn apply<K>(&self, resource: &K, ns: Option<&str>) -> Result<K, Error>
 | 
				
			||||||
        K: Resource<Scope = NamespaceResourceScope>
 | 
					 | 
				
			||||||
            + std::fmt::Debug
 | 
					 | 
				
			||||||
            + Sync
 | 
					 | 
				
			||||||
            + DeserializeOwned
 | 
					 | 
				
			||||||
            + Default
 | 
					 | 
				
			||||||
            + serde::Serialize
 | 
					 | 
				
			||||||
            + Clone,
 | 
					 | 
				
			||||||
    >(
 | 
					 | 
				
			||||||
        &self,
 | 
					 | 
				
			||||||
        resource: &Vec<K>,
 | 
					 | 
				
			||||||
    ) -> Result<Vec<K>, kube::Error>
 | 
					 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
 | 
					        K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
 | 
				
			||||||
 | 
					        <K as Resource>::Scope: ApplyStrategy<K>,
 | 
				
			||||||
        <K as kube::Resource>::DynamicType: Default,
 | 
					        <K as kube::Resource>::DynamicType: Default,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        let mut result = vec![];
 | 
					        debug!("Applying resource {:?} with ns {:?}", resource.meta().name, ns);
 | 
				
			||||||
        for r in resource.iter() {
 | 
					        trace!("{:#?}", serde_json::to_string(resource));
 | 
				
			||||||
            let api: Api<K> = Api::all(self.client.clone());
 | 
					
 | 
				
			||||||
            result.push(api.create(&PostParams::default(), &r).await?);
 | 
					        let api: Api<K> = <<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, ns);
 | 
				
			||||||
        }
 | 
					        api.create(&PostParams::default(), &resource).await
 | 
				
			||||||
        Ok(result)
 | 
					 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub async fn apply_namespaced<K>(
 | 
					    pub async fn apply_many<K>(&self, resource: &Vec<K>, ns: Option<&str>) -> Result<Vec<K>, Error>
 | 
				
			||||||
        &self,
 | 
					 | 
				
			||||||
        resource: &Vec<K>,
 | 
					 | 
				
			||||||
        ns: Option<&str>,
 | 
					 | 
				
			||||||
    ) -> Result<Vec<K>, Error>
 | 
					 | 
				
			||||||
    where
 | 
					    where
 | 
				
			||||||
        K: Resource<Scope = NamespaceResourceScope>
 | 
					        K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
 | 
				
			||||||
            + Clone
 | 
					        <K as Resource>::Scope: ApplyStrategy<K>,
 | 
				
			||||||
            + std::fmt::Debug
 | 
					 | 
				
			||||||
            + DeserializeOwned
 | 
					 | 
				
			||||||
            + serde::Serialize
 | 
					 | 
				
			||||||
            + Default,
 | 
					 | 
				
			||||||
        <K as kube::Resource>::DynamicType: Default,
 | 
					        <K as kube::Resource>::DynamicType: Default,
 | 
				
			||||||
    {
 | 
					    {
 | 
				
			||||||
        let mut resources = Vec::new();
 | 
					        let mut result = Vec::new();
 | 
				
			||||||
        for r in resource.iter() {
 | 
					        for r in resource.iter() {
 | 
				
			||||||
            let api: Api<K> = match ns {
 | 
					            result.push(self.apply(r, ns).await?);
 | 
				
			||||||
                Some(ns) => Api::namespaced(self.client.clone(), ns),
 | 
					 | 
				
			||||||
                None => Api::default_namespaced(self.client.clone()),
 | 
					 | 
				
			||||||
            };
 | 
					 | 
				
			||||||
            resources.push(api.create(&PostParams::default(), &r).await?);
 | 
					 | 
				
			||||||
        }
 | 
					        }
 | 
				
			||||||
        Ok(resources)
 | 
					
 | 
				
			||||||
 | 
					        Ok(result)
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
 | 
					    pub(crate) async fn from_kubeconfig(path: &str) -> Option<K8sClient> {
 | 
				
			||||||
@ -86,3 +65,35 @@ impl K8sClient {
 | 
				
			|||||||
        ))
 | 
					        ))
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					pub trait ApplyStrategy<K: Resource> {
 | 
				
			||||||
 | 
					    fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/// Implementation for all resources that are cluster-scoped.
 | 
				
			||||||
 | 
					/// It will always use `Api::all` and ignore the namespace parameter.
 | 
				
			||||||
 | 
					impl<K> ApplyStrategy<K> for ClusterResourceScope
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    K: Resource<Scope = ClusterResourceScope>,
 | 
				
			||||||
 | 
					    <K as kube::Resource>::DynamicType: Default,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    fn get_api(client: &Client, _ns: Option<&str>) -> Api<K> {
 | 
				
			||||||
 | 
					        Api::all(client.clone())
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					/// Implementation for all resources that are namespace-scoped.
 | 
				
			||||||
 | 
					/// It will use `Api::namespaced` if a namespace is provided, otherwise
 | 
				
			||||||
 | 
					/// it falls back to the default namespace configured in your kubeconfig.
 | 
				
			||||||
 | 
					impl<K> ApplyStrategy<K> for NamespaceResourceScope
 | 
				
			||||||
 | 
					where
 | 
				
			||||||
 | 
					    K: Resource<Scope = NamespaceResourceScope>,
 | 
				
			||||||
 | 
					    <K as kube::Resource>::DynamicType: Default,
 | 
				
			||||||
 | 
					{
 | 
				
			||||||
 | 
					    fn get_api(client: &Client, ns: Option<&str>) -> Api<K> {
 | 
				
			||||||
 | 
					        match ns {
 | 
				
			||||||
 | 
					            Some(ns) => Api::namespaced(client.clone(), ns),
 | 
				
			||||||
 | 
					            None => Api::default_namespaced(client.clone()),
 | 
				
			||||||
 | 
					        }
 | 
				
			||||||
 | 
					    }
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -6,6 +6,7 @@ use log::{info, warn};
 | 
				
			|||||||
use tokio::sync::OnceCell;
 | 
					use tokio::sync::OnceCell;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::{
 | 
					use crate::{
 | 
				
			||||||
 | 
					    data::Id,
 | 
				
			||||||
    executors::ExecutorError,
 | 
					    executors::ExecutorError,
 | 
				
			||||||
    interpret::{InterpretError, Outcome},
 | 
					    interpret::{InterpretError, Outcome},
 | 
				
			||||||
    inventory::Inventory,
 | 
					    inventory::Inventory,
 | 
				
			||||||
@ -259,27 +260,27 @@ impl TenantManager for K8sAnywhereTopology {
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
    async fn update_tenant_resource_limits(
 | 
					    async fn update_tenant_resource_limits(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        tenant_name: &str,
 | 
					        tenant_id: &Id,
 | 
				
			||||||
        new_limits: &ResourceLimits,
 | 
					        new_limits: &ResourceLimits,
 | 
				
			||||||
    ) -> Result<(), ExecutorError> {
 | 
					    ) -> Result<(), ExecutorError> {
 | 
				
			||||||
        self.get_k8s_tenant_manager()?
 | 
					        self.get_k8s_tenant_manager()?
 | 
				
			||||||
            .update_tenant_resource_limits(tenant_name, new_limits)
 | 
					            .update_tenant_resource_limits(tenant_id, new_limits)
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async fn update_tenant_network_policy(
 | 
					    async fn update_tenant_network_policy(
 | 
				
			||||||
        &self,
 | 
					        &self,
 | 
				
			||||||
        tenant_name: &str,
 | 
					        tenant_id: &Id,
 | 
				
			||||||
        new_policy: &TenantNetworkPolicy,
 | 
					        new_policy: &TenantNetworkPolicy,
 | 
				
			||||||
    ) -> Result<(), ExecutorError> {
 | 
					    ) -> Result<(), ExecutorError> {
 | 
				
			||||||
        self.get_k8s_tenant_manager()?
 | 
					        self.get_k8s_tenant_manager()?
 | 
				
			||||||
            .update_tenant_network_policy(tenant_name, new_policy)
 | 
					            .update_tenant_network_policy(tenant_id, new_policy)
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
 | 
					
 | 
				
			||||||
    async fn deprovision_tenant(&self, tenant_name: &str) -> Result<(), ExecutorError> {
 | 
					    async fn deprovision_tenant(&self, tenant_id: &Id) -> Result<(), ExecutorError> {
 | 
				
			||||||
        self.get_k8s_tenant_manager()?
 | 
					        self.get_k8s_tenant_manager()?
 | 
				
			||||||
            .deprovision_tenant(tenant_name)
 | 
					            .deprovision_tenant(tenant_id)
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
    }
 | 
					    }
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
				
			|||||||
@ -1,6 +1,7 @@
 | 
				
			|||||||
use async_trait::async_trait;
 | 
					use async_trait::async_trait;
 | 
				
			||||||
use k8s_openapi::NamespaceResourceScope;
 | 
					use k8s_openapi::NamespaceResourceScope;
 | 
				
			||||||
use kube::Resource;
 | 
					use kube::Resource;
 | 
				
			||||||
 | 
					use log::info;
 | 
				
			||||||
use serde::{Serialize, de::DeserializeOwned};
 | 
					use serde::{Serialize, de::DeserializeOwned};
 | 
				
			||||||
 | 
					
 | 
				
			||||||
use crate::{
 | 
					use crate::{
 | 
				
			||||||
@ -75,11 +76,12 @@ where
 | 
				
			|||||||
        _inventory: &Inventory,
 | 
					        _inventory: &Inventory,
 | 
				
			||||||
        topology: &T,
 | 
					        topology: &T,
 | 
				
			||||||
    ) -> Result<Outcome, InterpretError> {
 | 
					    ) -> Result<Outcome, InterpretError> {
 | 
				
			||||||
 | 
					        info!("Applying {} resources", self.score.resource.len());
 | 
				
			||||||
        topology
 | 
					        topology
 | 
				
			||||||
            .k8s_client()
 | 
					            .k8s_client()
 | 
				
			||||||
            .await
 | 
					            .await
 | 
				
			||||||
            .expect("Environment should provide enough information to instanciate a client")
 | 
					            .expect("Environment should provide enough information to instanciate a client")
 | 
				
			||||||
            .apply_namespaced(&self.score.resource, self.score.namespace.as_deref())
 | 
					            .apply_many(&self.score.resource, self.score.namespace.as_deref())
 | 
				
			||||||
            .await?;
 | 
					            .await?;
 | 
				
			||||||
 | 
					
 | 
				
			||||||
        Ok(Outcome::success(
 | 
					        Ok(Outcome::success(
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
		Reference in New Issue
	
	Block a user