use std::sync::Arc; use crate::{ executors::ExecutorError, topology::k8s::{ApplyStrategy, K8sClient}, }; use async_trait::async_trait; use k8s_openapi::{ api::{ core::v1::{LimitRange, Namespace, ResourceQuota}, networking::v1::{ NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule, NetworkPolicyPort, }, }, apimachinery::pkg::util::intstr::IntOrString, }; use kube::Resource; use log::debug; use serde::de::DeserializeOwned; use serde_json::json; use tokio::sync::OnceCell; use super::{TenantConfig, TenantManager}; #[derive(Clone, Debug)] pub struct K8sTenantManager { k8s_client: Arc, k8s_tenant_config: Arc>, } impl K8sTenantManager { pub fn new(client: Arc) -> Self { Self { k8s_client: client, k8s_tenant_config: Arc::new(OnceCell::new()), } } } impl K8sTenantManager { fn get_namespace_name(&self, config: &TenantConfig) -> String { config.name.clone() } fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> { // TODO: Ensure constraints are applied to namespace (https://git.nationtech.io/NationTech/harmony/issues/98) Ok(()) } async fn apply_resource< K: Resource + std::fmt::Debug + Sync + DeserializeOwned + Default + serde::Serialize + Clone, >( &self, mut resource: K, config: &TenantConfig, ) -> Result where ::DynamicType: Default, ::Scope: ApplyStrategy, { self.apply_labels(&mut resource, config); self.k8s_client .apply(&resource, Some(&self.get_namespace_name(config))) .await .map_err(|e| { ExecutorError::UnexpectedError(format!("Could not create Tenant resource : {e}")) }) } fn apply_labels(&self, resource: &mut K, config: &TenantConfig) { let labels = resource.meta_mut().labels.get_or_insert_default(); labels.insert( "app.kubernetes.io/managed-by".to_string(), "harmony".to_string(), ); labels.insert("harmony/tenant-id".to_string(), config.id.to_string()); labels.insert("harmony/tenant-name".to_string(), config.name.clone()); } fn build_namespace(&self, config: &TenantConfig) -> Result { let namespace = json!( { "apiVersion": "v1", "kind": "Namespace", "metadata": { "labels": { "harmony.nationtech.io/tenant.id": config.id.to_string(), "harmony.nationtech.io/tenant.name": config.name, }, "name": self.get_namespace_name(config), }, } ); serde_json::from_value(namespace).map_err(|e| { ExecutorError::ConfigurationError(format!( "Could not build TenantManager Namespace. {}", e )) }) } fn build_resource_quota(&self, config: &TenantConfig) -> Result { let resource_quota = json!( { "apiVersion": "v1", "kind": "ResourceQuota", "metadata": { "name": format!("{}-quota", config.name), "labels": { "harmony.nationtech.io/tenant.id": config.id.to_string(), "harmony.nationtech.io/tenant.name": config.name, }, "namespace": self.get_namespace_name(config), }, "spec": { "hard": { "limits.cpu": format!("{:.0}",config.resource_limits.cpu_limit_cores), "limits.memory": format!("{:.3}Gi", config.resource_limits.memory_limit_gb), "requests.cpu": format!("{:.0}",config.resource_limits.cpu_request_cores), "requests.memory": format!("{:.3}Gi", config.resource_limits.memory_request_gb), "requests.storage": format!("{:.3}Gi", config.resource_limits.storage_total_gb), "pods": "20", "services": "10", "configmaps": "60", "secrets": "60", "persistentvolumeclaims": "15", "services.loadbalancers": "2", "services.nodeports": "5", "limits.ephemeral-storage": "10Gi", } } } ); serde_json::from_value(resource_quota).map_err(|e| { ExecutorError::ConfigurationError(format!( "Could not build TenantManager ResourceQuota. {}", e )) }) } fn build_limit_range(&self, config: &TenantConfig) -> Result { let limit_range = json!({ "apiVersion": "v1", "kind": "LimitRange", "metadata": { "name": format!("{}-defaults", config.name), "labels": { "harmony.nationtech.io/tenant.id": config.id.to_string(), "harmony.nationtech.io/tenant.name": config.name, }, "namespace": self.get_namespace_name(config), }, "spec": { "limits": [ { "type": "Container", "default": { "cpu": "500m", "memory": "500Mi" }, "defaultRequest": { "cpu": "100m", "memory": "100Mi" }, } ] } }); serde_json::from_value(limit_range).map_err(|e| { ExecutorError::ConfigurationError(format!( "Could not build TenantManager LimitRange. {}", e )) }) } fn build_network_policy(&self, config: &TenantConfig) -> Result { let network_policy = json!({ "apiVersion": "networking.k8s.io/v1", "kind": "NetworkPolicy", "metadata": { "name": format!("{}-network-policy", config.name), "namespace": self.get_namespace_name(config), }, "spec": { "podSelector": {}, "egress": [ { "to": [ { "podSelector": {} } ] }, { "to": [ { "podSelector": {}, "namespaceSelector": { "matchLabels": { "kubernetes.io/metadata.name": "kube-system" } } } ] }, { "to": [ { "podSelector": {}, "namespaceSelector": { "matchLabels": { "kubernetes.io/metadata.name": "openshift-dns" } } } ] }, { "to": [ { "ipBlock": { "cidr": "10.43.0.1/32", } } ] }, { "to": [ { //TODO this ip is from the docker network that k3d is running on //since k3d does not deploy kube-api-server as a pod it needs to ahve the ip //address opened up //need to find a way to automatically detect the ip address from the docker //network "ipBlock": { "cidr": "172.24.0.0/16", } } ] }, { "to": [ { "ipBlock": { "cidr": "0.0.0.0/0", "except": [ "10.0.0.0/8", "172.16.0.0/12", "192.168.0.0/16", "192.0.0.0/24", "192.0.2.0/24", "192.88.99.0/24", "192.18.0.0/15", "198.51.100.0/24", "169.254.0.0/16", "203.0.113.0/24", "127.0.0.0/8", "224.0.0.0/4", "240.0.0.0/4", "100.64.0.0/10", "233.252.0.0/24", "0.0.0.0/8" ] } } ] } ], "ingress": [ { "from": [ { "podSelector": {} } ] } ], "policyTypes": [ "Ingress", "Egress" ] } }); let mut network_policy: NetworkPolicy = serde_json::from_value(network_policy).map_err(|e| { ExecutorError::ConfigurationError(format!( "Could not build TenantManager NetworkPolicy. {}", e )) })?; config .network_policy .additional_allowed_cidr_ingress .iter() .try_for_each(|c| -> Result<(), ExecutorError> { let cidr_list: Vec = c.0.iter() .map(|ci| { json!({ "ipBlock": { "cidr": ci.to_string(), } }) }) .collect(); let ports: Option> = c.1.as_ref().map(|spec| match &spec.data { super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort { port: Some(IntOrString::Int((*port).into())), ..Default::default() }], super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort { port: Some(IntOrString::Int((*start).into())), end_port: Some((*end).into()), protocol: None, // Not currently supported by Harmony }], super::PortSpecData::ListOfPorts(items) => items .iter() .map(|i| NetworkPolicyPort { port: Some(IntOrString::Int((*i).into())), ..Default::default() }) .collect(), }); let rule = serde_json::from_value::(json!({ "from": cidr_list, "ports": ports, })) .map_err(|e| { ExecutorError::ConfigurationError(format!( "Could not build TenantManager NetworkPolicyIngressRule. {}", e )) })?; network_policy .spec .as_mut() .unwrap() .ingress .as_mut() .unwrap() .push(rule); Ok(()) })?; config .network_policy .additional_allowed_cidr_egress .iter() .try_for_each(|c| -> Result<(), ExecutorError> { let cidr_list: Vec = c.0.iter() .map(|ci| { json!({ "ipBlock": { "cidr": ci.to_string(), } }) }) .collect(); let ports: Option> = c.1.as_ref().map(|spec| match &spec.data { super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort { port: Some(IntOrString::Int((*port).into())), ..Default::default() }], super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort { port: Some(IntOrString::Int((*start).into())), end_port: Some((*end).into()), protocol: None, // Not currently supported by Harmony }], super::PortSpecData::ListOfPorts(items) => items .iter() .map(|i| NetworkPolicyPort { port: Some(IntOrString::Int((*i).into())), ..Default::default() }) .collect(), }); let rule = serde_json::from_value::(json!({ "to": cidr_list, "ports": ports, })) .map_err(|e| { ExecutorError::ConfigurationError(format!( "Could not build TenantManager NetworkPolicyEgressRule. {}", e )) })?; network_policy .spec .as_mut() .unwrap() .egress .as_mut() .unwrap() .push(rule); Ok(()) })?; Ok(network_policy) } fn store_config(&self, config: &TenantConfig) { let _ = self.k8s_tenant_config.set(config.clone()); } } #[async_trait] impl TenantManager for K8sTenantManager { async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> { let namespace = self.build_namespace(config)?; let resource_quota = self.build_resource_quota(config)?; let network_policy = self.build_network_policy(config)?; let resource_limit_range = self.build_limit_range(config)?; self.ensure_constraints(&namespace)?; debug!("Creating namespace for tenant {}", config.name); self.apply_resource(namespace, config).await?; debug!("Creating resource_quota for tenant {}", config.name); self.apply_resource(resource_quota, config).await?; debug!("Creating limit_range for tenant {}", config.name); self.apply_resource(resource_limit_range, config).await?; debug!("Creating network_policy for tenant {}", config.name); self.apply_resource(network_policy, config).await?; debug!( "Success provisionning K8s tenant id {} name {}", config.id, config.name ); self.store_config(config); Ok(()) } async fn get_tenant_config(&self) -> Option { self.k8s_tenant_config.get().cloned() } }