391 lines
14 KiB
Rust
391 lines
14 KiB
Rust
use std::sync::Arc;
|
|
|
|
use crate::{
|
|
executors::ExecutorError,
|
|
topology::k8s::{ApplyStrategy, K8sClient},
|
|
};
|
|
use async_trait::async_trait;
|
|
use derive_new::new;
|
|
use k8s_openapi::{
|
|
api::{
|
|
core::v1::{LimitRange, Namespace, ResourceQuota},
|
|
networking::v1::{
|
|
NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyIngressRule, NetworkPolicyPort,
|
|
},
|
|
},
|
|
apimachinery::pkg::util::intstr::IntOrString,
|
|
};
|
|
use kube::Resource;
|
|
use log::{debug, info, warn};
|
|
use serde::de::DeserializeOwned;
|
|
use serde_json::json;
|
|
|
|
use super::{TenantConfig, TenantManager};
|
|
|
|
#[derive(new, Clone, Debug)]
|
|
pub struct K8sTenantManager {
|
|
k8s_client: Arc<K8sClient>,
|
|
}
|
|
|
|
impl K8sTenantManager {
|
|
fn get_namespace_name(&self, config: &TenantConfig) -> String {
|
|
config.name.clone()
|
|
}
|
|
|
|
fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> {
|
|
warn!("Validate that when tenant already exists (by id) that name has not changed");
|
|
warn!("Make sure other Tenant constraints are respected by this k8s implementation");
|
|
Ok(())
|
|
}
|
|
|
|
async fn apply_resource<
|
|
K: Resource + std::fmt::Debug + Sync + DeserializeOwned + Default + serde::Serialize + Clone,
|
|
>(
|
|
&self,
|
|
mut resource: K,
|
|
config: &TenantConfig,
|
|
) -> Result<K, ExecutorError>
|
|
where
|
|
<K as kube::Resource>::DynamicType: Default,
|
|
<K as kube::Resource>::Scope: ApplyStrategy<K>,
|
|
{
|
|
self.apply_labels(&mut resource, config);
|
|
self.k8s_client
|
|
.apply(&resource, Some(&self.get_namespace_name(config)))
|
|
.await
|
|
.map_err(|e| {
|
|
ExecutorError::UnexpectedError(format!("Could not create Tenant resource : {e}"))
|
|
})
|
|
}
|
|
|
|
fn apply_labels<K: Resource>(&self, resource: &mut K, config: &TenantConfig) {
|
|
let labels = resource.meta_mut().labels.get_or_insert_default();
|
|
labels.insert(
|
|
"app.kubernetes.io/managed-by".to_string(),
|
|
"harmony".to_string(),
|
|
);
|
|
labels.insert("harmony/tenant-id".to_string(), config.id.to_string());
|
|
labels.insert("harmony/tenant-name".to_string(), config.name.clone());
|
|
}
|
|
|
|
fn build_namespace(&self, config: &TenantConfig) -> Result<Namespace, ExecutorError> {
|
|
let namespace = json!(
|
|
{
|
|
"apiVersion": "v1",
|
|
"kind": "Namespace",
|
|
"metadata": {
|
|
"labels": {
|
|
"harmony.nationtech.io/tenant.id": config.id.to_string(),
|
|
"harmony.nationtech.io/tenant.name": config.name,
|
|
},
|
|
"name": self.get_namespace_name(config),
|
|
},
|
|
}
|
|
);
|
|
serde_json::from_value(namespace).map_err(|e| {
|
|
ExecutorError::ConfigurationError(format!(
|
|
"Could not build TenantManager Namespace. {}",
|
|
e
|
|
))
|
|
})
|
|
}
|
|
|
|
fn build_resource_quota(&self, config: &TenantConfig) -> Result<ResourceQuota, ExecutorError> {
|
|
let resource_quota = json!(
|
|
{
|
|
"apiVersion": "v1",
|
|
"kind": "ResourceQuota",
|
|
"metadata": {
|
|
"name": format!("{}-quota", config.name),
|
|
"labels": {
|
|
"harmony.nationtech.io/tenant.id": config.id.to_string(),
|
|
"harmony.nationtech.io/tenant.name": config.name,
|
|
},
|
|
"namespace": self.get_namespace_name(config),
|
|
},
|
|
"spec": {
|
|
"hard": {
|
|
"limits.cpu": format!("{:.0}",config.resource_limits.cpu_limit_cores),
|
|
"limits.memory": format!("{:.3}Gi", config.resource_limits.memory_limit_gb),
|
|
"requests.cpu": format!("{:.0}",config.resource_limits.cpu_request_cores),
|
|
"requests.memory": format!("{:.3}Gi", config.resource_limits.memory_request_gb),
|
|
"requests.storage": format!("{:.3}Gi", config.resource_limits.storage_total_gb),
|
|
"pods": "20",
|
|
"services": "10",
|
|
"configmaps": "30",
|
|
"secrets": "30",
|
|
"persistentvolumeclaims": "15",
|
|
"services.loadbalancers": "2",
|
|
"services.nodeports": "5",
|
|
"limits.ephemeral-storage": "10Gi",
|
|
|
|
}
|
|
}
|
|
}
|
|
|
|
);
|
|
serde_json::from_value(resource_quota).map_err(|e| {
|
|
ExecutorError::ConfigurationError(format!(
|
|
"Could not build TenantManager ResourceQuota. {}",
|
|
e
|
|
))
|
|
})
|
|
}
|
|
|
|
fn build_limit_range(&self, config: &TenantConfig) -> Result<LimitRange, ExecutorError> {
|
|
let limit_range = json!({
|
|
"apiVersion": "v1",
|
|
"kind": "LimitRange",
|
|
"metadata": {
|
|
"name": format!("{}-defaults", config.name),
|
|
"labels": {
|
|
"harmony.nationtech.io/tenant.id": config.id.to_string(),
|
|
"harmony.nationtech.io/tenant.name": config.name,
|
|
},
|
|
"namespace": self.get_namespace_name(config),
|
|
},
|
|
"spec": {
|
|
"limits": [
|
|
{
|
|
"type": "Container",
|
|
"default": {
|
|
"cpu": "500m",
|
|
"memory": "500Mi"
|
|
},
|
|
"defaultRequest": {
|
|
"cpu": "100m",
|
|
"memory": "100Mi"
|
|
},
|
|
}
|
|
]
|
|
}
|
|
});
|
|
|
|
serde_json::from_value(limit_range).map_err(|e| {
|
|
ExecutorError::ConfigurationError(format!(
|
|
"Could not build TenantManager LimitRange. {}",
|
|
e
|
|
))
|
|
})
|
|
}
|
|
|
|
fn build_network_policy(&self, config: &TenantConfig) -> Result<NetworkPolicy, ExecutorError> {
|
|
let network_policy = json!({
|
|
"apiVersion": "networking.k8s.io/v1",
|
|
"kind": "NetworkPolicy",
|
|
"metadata": {
|
|
"name": format!("{}-network-policy", config.name),
|
|
"namespace": self.get_namespace_name(config),
|
|
},
|
|
"spec": {
|
|
"podSelector": {},
|
|
"egress": [
|
|
{ "to": [ {"podSelector": {}}]},
|
|
{ "to":
|
|
[
|
|
{
|
|
"podSelector": {},
|
|
"namespaceSelector": {
|
|
"matchLabels": {
|
|
"kubernetes.io/metadata.name":"openshift-dns"
|
|
}
|
|
}
|
|
},
|
|
]
|
|
},
|
|
{ "to": [
|
|
{
|
|
"ipBlock": {
|
|
|
|
"cidr": "0.0.0.0/0",
|
|
// See https://en.wikipedia.org/wiki/Reserved_IP_addresses
|
|
"except": [
|
|
"10.0.0.0/8",
|
|
"172.16.0.0/12",
|
|
"192.168.0.0/16",
|
|
"192.0.0.0/24",
|
|
"192.0.2.0/24",
|
|
"192.88.99.0/24",
|
|
"192.18.0.0/15",
|
|
"198.51.100.0/24",
|
|
"169.254.0.0/16",
|
|
"203.0.113.0/24",
|
|
"127.0.0.0/8",
|
|
|
|
// Not sure we should block this one as it is
|
|
// used for multicast. But better block more than less.
|
|
"224.0.0.0/4",
|
|
"240.0.0.0/4",
|
|
"100.64.0.0/10",
|
|
"233.252.0.0/24",
|
|
"0.0.0.0/8",
|
|
],
|
|
}
|
|
}
|
|
]
|
|
},
|
|
],
|
|
"ingress": [
|
|
{ "from": [ {"podSelector": {}}]}
|
|
],
|
|
"policyTypes": [
|
|
"Ingress", "Egress",
|
|
]
|
|
}
|
|
});
|
|
|
|
let mut network_policy: NetworkPolicy =
|
|
serde_json::from_value(network_policy).map_err(|e| {
|
|
ExecutorError::ConfigurationError(format!(
|
|
"Could not build TenantManager NetworkPolicy. {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
config
|
|
.network_policy
|
|
.additional_allowed_cidr_ingress
|
|
.iter()
|
|
.try_for_each(|c| -> Result<(), ExecutorError> {
|
|
let cidr_list: Vec<serde_json::Value> =
|
|
c.0.iter()
|
|
.map(|ci| {
|
|
json!({
|
|
"ipBlock": {
|
|
"cidr": ci.to_string(),
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
let ports: Option<Vec<NetworkPolicyPort>> =
|
|
c.1.as_ref().map(|spec| match &spec.data {
|
|
super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort {
|
|
port: Some(IntOrString::Int(port.clone().into())),
|
|
..Default::default()
|
|
}],
|
|
super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort {
|
|
port: Some(IntOrString::Int(start.clone().into())),
|
|
end_port: Some(end.clone().into()),
|
|
protocol: None, // Not currently supported by Harmony
|
|
}],
|
|
|
|
super::PortSpecData::ListOfPorts(items) => items
|
|
.iter()
|
|
.map(|i| NetworkPolicyPort {
|
|
port: Some(IntOrString::Int(i.clone().into())),
|
|
..Default::default()
|
|
})
|
|
.collect(),
|
|
});
|
|
let rule = serde_json::from_value::<NetworkPolicyIngressRule>(json!({
|
|
"from": cidr_list,
|
|
"ports": ports,
|
|
}))
|
|
.map_err(|e| {
|
|
ExecutorError::ConfigurationError(format!(
|
|
"Could not build TenantManager NetworkPolicyIngressRule. {}",
|
|
e
|
|
))
|
|
})?;
|
|
|
|
network_policy
|
|
.spec
|
|
.as_mut()
|
|
.unwrap()
|
|
.ingress
|
|
.as_mut()
|
|
.unwrap()
|
|
.push(rule);
|
|
Ok(())
|
|
})?;
|
|
|
|
config
|
|
.network_policy
|
|
.additional_allowed_cidr_egress
|
|
.iter()
|
|
.try_for_each(|c| -> Result<(), ExecutorError> {
|
|
let cidr_list: Vec<serde_json::Value> =
|
|
c.0.iter()
|
|
.map(|ci| {
|
|
json!({
|
|
"ipBlock": {
|
|
"cidr": ci.to_string(),
|
|
}
|
|
})
|
|
})
|
|
.collect();
|
|
let ports: Option<Vec<NetworkPolicyPort>> =
|
|
c.1.as_ref().map(|spec| match &spec.data {
|
|
super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort {
|
|
port: Some(IntOrString::Int(port.clone().into())),
|
|
..Default::default()
|
|
}],
|
|
super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort {
|
|
port: Some(IntOrString::Int(start.clone().into())),
|
|
end_port: Some(end.clone().into()),
|
|
protocol: None, // Not currently supported by Harmony
|
|
}],
|
|
|
|
super::PortSpecData::ListOfPorts(items) => items
|
|
.iter()
|
|
.map(|i| NetworkPolicyPort {
|
|
port: Some(IntOrString::Int(i.clone().into())),
|
|
..Default::default()
|
|
})
|
|
.collect(),
|
|
});
|
|
let rule = serde_json::from_value::<NetworkPolicyEgressRule>(json!({
|
|
"to": cidr_list,
|
|
"ports": ports,
|
|
}))
|
|
.map_err(|e| {
|
|
ExecutorError::ConfigurationError(format!(
|
|
"Could not build TenantManager NetworkPolicyEgressRule. {}",
|
|
e
|
|
))
|
|
})?;
|
|
network_policy
|
|
.spec
|
|
.as_mut()
|
|
.unwrap()
|
|
.egress
|
|
.as_mut()
|
|
.unwrap()
|
|
.push(rule);
|
|
Ok(())
|
|
})?;
|
|
|
|
Ok(network_policy)
|
|
}
|
|
}
|
|
|
|
#[async_trait]
|
|
impl TenantManager for K8sTenantManager {
|
|
async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
|
|
let namespace = self.build_namespace(config)?;
|
|
let resource_quota = self.build_resource_quota(config)?;
|
|
let network_policy = self.build_network_policy(config)?;
|
|
let resource_limit_range = self.build_limit_range(config)?;
|
|
|
|
self.ensure_constraints(&namespace)?;
|
|
|
|
debug!("Creating namespace for tenant {}", config.name);
|
|
self.apply_resource(namespace, config).await?;
|
|
|
|
debug!("Creating resource_quota for tenant {}", config.name);
|
|
self.apply_resource(resource_quota, config).await?;
|
|
|
|
debug!("Creating limit_range for tenant {}", config.name);
|
|
self.apply_resource(resource_limit_range, config).await?;
|
|
|
|
debug!("Creating network_policy for tenant {}", config.name);
|
|
self.apply_resource(network_policy, config).await?;
|
|
|
|
info!(
|
|
"Success provisionning K8s tenant id {} name {}",
|
|
config.id, config.name
|
|
);
|
|
Ok(())
|
|
}
|
|
}
|