// harmony/harmony/src/domain/topology/k8s_anywhere/k8s_anywhere.rs
use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose};
use harmony_types::rfc1123::Rfc1123Name;
use k8s_openapi::api::{
core::v1::Secret,
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
};
use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta};
use log::{debug, info, warn};
use serde::Serialize;
use tokio::sync::OnceCell;
use crate::{
executors::ExecutorError,
interpret::InterpretStatus,
inventory::Inventory,
modules::{
k3d::K3DInstallationScore,
k8s::ingress::{K8sIngressScore, PathType},
monitoring::{
grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score},
kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_grafana::{
Grafana as GrafanaCRD, GrafanaCom, GrafanaDashboard,
GrafanaDashboardDatasource, GrafanaDashboardSpec, GrafanaDatasource,
GrafanaDatasourceConfig, GrafanaDatasourceJsonData,
GrafanaDatasourceSecureJsonData, GrafanaDatasourceSpec, GrafanaSpec,
},
crd_prometheuses::LabelSelector,
prometheus_operator::prometheus_operator_helm_chart_score,
rhob_alertmanager_config::RHOBObservability,
service_monitor::ServiceMonitor,
},
},
okd::route::OKDTlsPassthroughScore,
prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
},
},
score::Score,
topology::{TlsRoute, TlsRouter, ingress::Ingress},
};
use super::super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,
oberservability::monitoring::AlertReceiver,
tenant::{
TenantConfig, TenantManager,
k8s::K8sTenantManager,
network_policy::{
K3dNetworkPolicyStrategy, NetworkPolicyStrategy, NoopNetworkPolicyStrategy,
},
},
};
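/// Resolved Kubernetes connection state: the client to use, where it came from
/// ([`K8sSource`]) and a human readable message describing how it was obtained.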
#[derive(Clone, Debug)]
struct K8sState {
client: Arc<K8sClient>,
source: K8sSource,
message: String,
}
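/// Kubernetes distribution family detected on the target cluster, used to pick
/// distribution specific behaviour (for example OpenShift routes vs plain ingress).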
#[derive(Debug, Clone)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
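/// Where the Kubernetes client came from: a Harmony-managed local k3d cluster or an
/// existing cluster reached through a kubeconfig file.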
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
Kubeconfig,
}
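/// Topology that targets "any" Kubernetes cluster: an existing cluster reached through a
/// kubeconfig, or a local k3d cluster that Harmony provisions on demand (see
/// [`K8sAnywhereConfig`]). Client, tenant manager and detected distribution are initialized
/// lazily and cached in `OnceCell`s during `ensure_ready`.
///
/// A minimal construction sketch (not compiled as a doctest; the env var name below is
/// illustrative only):
///
/// ```ignore
/// // Pick up KUBECONFIG / HARMONY_* variables from the environment:
/// let topology = K8sAnywhereTopology::from_env();
/// // Or target a specific remote cluster described by an env var:
/// let topology = K8sAnywhereTopology::with_config(
///     K8sAnywhereConfig::remote_k8s_from_env_var("MY_REMOTE_K8S"),
/// );
/// ```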
#[derive(Clone, Debug)]
pub struct K8sAnywhereTopology {
k8s_state: Arc<OnceCell<Option<K8sState>>>,
tenant_manager: Arc<OnceCell<K8sTenantManager>>,
k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
config: Arc<K8sAnywhereConfig>,
}
#[async_trait]
impl K8sclient for K8sAnywhereTopology {
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
let state = match self.k8s_state.get() {
Some(state) => state,
None => return Err("K8s state not initialized yet".to_string()),
};
let state = match state {
Some(state) => state,
None => return Err("K8s client initialized but empty".to_string()),
};
Ok(state.client.clone())
}
}
#[async_trait]
impl TlsRouter for K8sAnywhereTopology {
async fn get_wildcard_domain(&self) -> Result<Option<String>, String> {
todo!()
}
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16 {
// TODO un-hardcode this :)
443
}
async fn install_route(&self, route: TlsRoute) -> Result<(), String> {
let distro = self
.get_k8s_distribution()
.await
.map_err(|e| format!("Could not get k8s distribution {e}"))?;
match distro {
KubernetesDistribution::OpenshiftFamily => {
OKDTlsPassthroughScore {
name: Rfc1123Name::try_from(route.backend_info_string().as_str())?,
route,
}
.interpret(&Inventory::empty(), self)
.await?;
Ok(())
}
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => Err(format!(
"Distribution not supported yet for TlsRouter {distro:?}"
)),
}
}
}
#[async_trait]
impl Grafana for K8sAnywhereTopology {
async fn ensure_grafana_operator(
&self,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
debug!("ensure grafana operator");
let client = self.k8s_client().await.map_err(PreparationError::new)?;
let grafana_gvk = GroupVersionKind {
group: "grafana.integreatly.org".to_string(),
version: "v1beta1".to_string(),
kind: "Grafana".to_string(),
};
let name = "grafanas.grafana.integreatly.org";
let ns = "grafana";
let grafana_crd = client
.get_resource_json_value(name, Some(ns), &grafana_gvk)
.await;
match grafana_crd {
Ok(_) => Ok(PreparationOutcome::Success {
details: "Found grafana CRDs in cluster".to_string(),
}),
Err(_) => {
self.install_grafana_operator(inventory, Some("grafana"))
.await
}
}
}
async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError> {
let ns = "grafana";
let mut label = BTreeMap::new();
label.insert("dashboards".to_string(), "grafana".to_string());
let label_selector = LabelSelector {
match_labels: label.clone(),
match_expressions: vec![],
};
let client = self.k8s_client().await?;
let grafana = self.build_grafana(ns, &label);
client.apply(&grafana, Some(ns)).await?;
//TODO change this to a ensure ready or something better than just a timeout
client
.wait_until_deployment_ready(
"grafana-grafana-deployment",
Some("grafana"),
Some(Duration::from_secs(30)),
)
.await?;
let sa_name = "grafana-grafana-sa";
let token_secret_name = "grafana-sa-token-secret";
let sa_token_secret = self.build_sa_token_secret(token_secret_name, sa_name, ns);
client.apply(&sa_token_secret, Some(ns)).await?;
let secret_gvk = GroupVersionKind {
group: "".to_string(),
version: "v1".to_string(),
kind: "Secret".to_string(),
};
let secret = client
.get_resource_json_value(token_secret_name, Some(ns), &secret_gvk)
.await?;
let token = self
.extract_and_normalize_token(&secret)
.ok_or_else(|| PreparationError::new("could not extract token from secret".to_string()))?;
debug!("creating grafana clusterrole binding");
let clusterrolebinding =
self.build_cluster_rolebinding(sa_name, "cluster-monitoring-view", ns);
client.apply(&clusterrolebinding, Some(ns)).await?;
debug!("creating grafana datasource crd");
let thanos_url = format!(
"https://{}",
self.get_domain("thanos-querier-openshift-monitoring").await?
);
let thanos_openshift_datasource = self.build_grafana_datasource(
"thanos-openshift-monitoring",
ns,
&label_selector,
&thanos_url,
&token,
);
client.apply(&thanos_openshift_datasource, Some(ns)).await?;
debug!("creating grafana dashboard crd");
let dashboard = self.build_grafana_dashboard(ns, &label_selector);
client.apply(&dashboard, Some(ns)).await?;
debug!("creating grafana ingress");
let grafana_ingress = self.build_grafana_ingress(ns).await;
grafana_ingress
.interpret(&Inventory::empty(), self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
Ok(PreparationOutcome::Success {
details: "Installed grafana composants".to_string(),
})
}
}
#[async_trait]
impl PrometheusMonitoring<CRDPrometheus> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &CRDPrometheus,
_inventory: &Inventory,
_receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let client = self.k8s_client().await?;
for monitor in sender.service_monitor.iter() {
client
.apply(monitor, Some(&sender.namespace))
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
}
Ok(PreparationOutcome::Success {
details: "successfuly installed prometheus components".to_string(),
})
}
async fn ensure_prometheus_operator(
&self,
sender: &CRDPrometheus,
_inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_prometheus_operator(sender).await?;
match po_result {
PreparationOutcome::Success { details: _ } => {
debug!("Detected prometheus crds operator present in cluster.");
return Ok(po_result);
}
PreparationOutcome::Noop => {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
}
}
}
#[async_trait]
impl PrometheusMonitoring<RHOBObservability> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &RHOBObservability,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_cluster_observability_operator(sender).await?;
if po_result == PreparationOutcome::Noop {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
let result = self
.get_cluster_observability_operator_prometheus_application_score(
sender.clone(),
receivers,
)
.await
.interpret(inventory, self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: outcome.message,
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
async fn ensure_prometheus_operator(
&self,
_sender: &RHOBObservability,
_inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}
impl Serialize for K8sAnywhereTopology {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}
impl K8sAnywhereTopology {
pub fn from_env() -> Self {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(K8sAnywhereConfig::from_env()),
}
}
pub fn with_config(config: K8sAnywhereConfig) -> Self {
Self {
k8s_state: Arc::new(OnceCell::new()),
tenant_manager: Arc::new(OnceCell::new()),
k8s_distribution: Arc::new(OnceCell::new()),
config: Arc::new(config),
}
}
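/// Detects which Kubernetes distribution the connected cluster belongs to (OpenShift/OKD
/// family, k3s/k3d family, or a generic default) and caches the result for later calls.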
pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
self.k8s_distribution
.get_or_try_init(async || {
debug!("Trying to detect k8s distribution");
let client = self.k8s_client().await.map_err(PreparationError::new)?;
let discovery = client.discovery().await.map_err(|e| {
PreparationError::new(format!("Could not discover API groups: {}", e))
})?;
let version = client.get_apiserver_version().await.map_err(|e| {
PreparationError::new(format!("Could not get server version: {}", e))
})?;
// OpenShift / OKD
if discovery
.groups()
.any(|g| g.name() == "project.openshift.io")
{
info!("Found KubernetesDistribution OpenshiftFamily");
return Ok(KubernetesDistribution::OpenshiftFamily);
}
// K3d / K3s
if version.git_version.contains("k3s") {
info!("Found KubernetesDistribution K3sFamily");
return Ok(KubernetesDistribution::K3sFamily);
}
info!("Could not identify KubernetesDistribution, using Default");
return Ok(KubernetesDistribution::Default);
})
.await
}
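/// Extracts the `token` field from a service account token secret, decodes it from base64
/// and trims surrounding whitespace and NUL bytes. Returns `None` when the field is missing
/// or cannot be decoded.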
fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option<String> {
let token_b64 = secret
.data
.get("token")
.or_else(|| secret.data.get("data").and_then(|d| d.get("token")))
.and_then(|v| v.as_str())?;
let bytes = general_purpose::STANDARD.decode(token_b64).ok()?;
let s = String::from_utf8(bytes).ok()?;
let cleaned = s
.trim_matches(|c: char| c.is_whitespace() || c == '\0')
.to_string();
Some(cleaned)
}
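/// Builds a `ClusterRoleBinding` named `{service_account_name}-view-binding` granting the
/// given cluster role to the service account in namespace `ns`.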
pub fn build_cluster_rolebinding(
&self,
service_account_name: &str,
clusterrole_name: &str,
ns: &str,
) -> ClusterRoleBinding {
ClusterRoleBinding {
metadata: ObjectMeta {
name: Some(format!("{}-view-binding", service_account_name)),
..Default::default()
},
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".into(),
kind: "ClusterRole".into(),
name: clusterrole_name.into(),
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".into(),
name: service_account_name.into(),
namespace: Some(ns.into()),
..Default::default()
}]),
}
}
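/// Builds a `kubernetes.io/service-account-token` secret annotated so the API server
/// populates it with a token for `service_account_name`.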
pub fn build_sa_token_secret(
&self,
secret_name: &str,
service_account_name: &str,
ns: &str,
) -> Secret {
let mut annotations = BTreeMap::new();
annotations.insert(
"kubernetes.io/service-account.name".to_string(),
service_account_name.to_string(),
);
Secret {
metadata: ObjectMeta {
name: Some(secret_name.into()),
namespace: Some(ns.into()),
annotations: Some(annotations),
..Default::default()
},
type_: Some("kubernetes.io/service-account-token".to_string()),
..Default::default()
}
}
fn build_grafana_datasource(
&self,
name: &str,
ns: &str,
label_selector: &LabelSelector,
url: &str,
token: &str,
) -> GrafanaDatasource {
let mut json_data = BTreeMap::new();
json_data.insert("timeInterval".to_string(), "5s".to_string());
GrafanaDatasource {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: GrafanaDatasourceSpec {
instance_selector: label_selector.clone(),
allow_cross_namespace_import: Some(true),
values_from: None,
datasource: GrafanaDatasourceConfig {
access: "proxy".to_string(),
name: name.to_string(),
r#type: "prometheus".to_string(),
url: url.to_string(),
database: None,
json_data: Some(GrafanaDatasourceJsonData {
time_interval: Some("60s".to_string()),
http_header_name1: Some("Authorization".to_string()),
tls_skip_verify: Some(true),
oauth_pass_thru: Some(true),
}),
secure_json_data: Some(GrafanaDatasourceSecureJsonData {
http_header_value1: Some(format!("Bearer {token}")),
}),
is_default: Some(false),
editable: Some(true),
},
},
}
}
fn build_grafana_dashboard(
&self,
ns: &str,
label_selector: &LabelSelector,
) -> GrafanaDashboard {
let graf_dashboard = GrafanaDashboard {
metadata: ObjectMeta {
name: Some(format!("grafana-dashboard-{}", ns)),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: GrafanaDashboardSpec {
resync_period: Some("30s".to_string()),
instance_selector: label_selector.clone(),
datasources: Some(vec![GrafanaDashboardDatasource {
input_name: "DS_PROMETHEUS".to_string(),
datasource_name: "thanos-openshift-monitoring".to_string(),
}]),
json: None,
grafana_com: Some(GrafanaCom {
id: 17406,
revision: None,
}),
},
};
graf_dashboard
}
fn build_grafana(&self, ns: &str, labels: &BTreeMap<String, String>) -> GrafanaCRD {
let grafana = GrafanaCRD {
metadata: ObjectMeta {
name: Some(format!("grafana-{}", ns)),
namespace: Some(ns.to_string()),
labels: Some(labels.clone()),
..Default::default()
},
spec: GrafanaSpec {
config: None,
admin_user: None,
admin_password: None,
ingress: None,
persistence: None,
resources: None,
},
};
grafana
}
async fn build_grafana_ingress(&self, ns: &str) -> K8sIngressScore {
let domain = self.get_domain(&format!("grafana-{}", ns)).await.unwrap();
let name = format!("{}-grafana", ns);
let backend_service = format!("grafana-{}-service", ns);
K8sIngressScore {
name: fqdn::fqdn!(&name),
host: fqdn::fqdn!(&domain),
backend_service: fqdn::fqdn!(&backend_service),
port: 3000,
path: Some("/".to_string()),
path_type: Some(PathType::Prefix),
namespace: Some(fqdn::fqdn!(&ns)),
ingress_class_name: Some("openshift-default".to_string()),
}
}
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,
receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
) -> RHOBAlertingScore {
RHOBAlertingScore {
sender,
receivers: receivers.unwrap_or_default(),
service_monitors: vec![],
prometheus_rules: vec![],
}
}
async fn get_k8s_prometheus_application_score(
&self,
sender: CRDPrometheus,
receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
service_monitors: Option<Vec<ServiceMonitor>>,
) -> K8sPrometheusCRDAlertingScore {
K8sPrometheusCRDAlertingScore {
sender,
receivers: receivers.unwrap_or_default(),
service_monitors: service_monitors.unwrap_or_default(),
prometheus_rules: vec![],
}
}
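/// Checks that the default OpenShift `IngressController` in `openshift-ingress-operator`
/// reports at least one available replica.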
async fn openshift_ingress_operator_available(&self) -> Result<(), PreparationError> {
let client = self.k8s_client().await?;
let gvk = GroupVersionKind {
group: "operator.openshift.io".into(),
version: "v1".into(),
kind: "IngressController".into(),
};
let ic = client
.get_resource_json_value("default", Some("openshift-ingress-operator"), &gvk)
.await?;
let available_replicas = ic.data["status"]["availableReplicas"].as_i64().unwrap_or(0);
if available_replicas >= 1 {
Ok(())
} else {
Err(PreparationError::new(
"openshift-ingress-operator not available".to_string(),
))
}
}
fn is_helm_available(&self) -> Result<(), String> {
let version_result = Command::new("helm")
.arg("version")
.output()
.map_err(|e| format!("Failed to execute 'helm -version': {}", e))?;
if !version_result.status.success() {
return Err("Failed to run 'helm -version'".to_string());
}
let version_output = String::from_utf8_lossy(&version_result.stdout);
debug!("Helm version: {}", version_output.trim());
Ok(())
}
async fn try_load_system_kubeconfig(&self) -> Option<K8sClient> {
todo!("Use kube-rs default behavior to load system kubeconfig");
}
async fn try_load_kubeconfig(&self, path: &str) -> Option<K8sClient> {
K8sClient::from_kubeconfig_with_context(path, self.config.k8s_context.clone()).await
}
fn get_k3d_installation_score(&self) -> K3DInstallationScore {
K3DInstallationScore::default()
}
async fn try_install_k3d(&self) -> Result<(), PreparationError> {
let result = self
.get_k3d_installation_score()
.interpret(&Inventory::empty(), self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(()),
InterpretStatus::NOOP => Ok(()),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
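/// Resolves a Kubernetes client according to the configuration: an explicit kubeconfig, the
/// system kubeconfig, or a locally provisioned k3d cluster when `autoinstall` is enabled.
/// Returns `Ok(None)` when no client could be obtained and autoinstall is disabled.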
async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, PreparationError> {
let k8s_anywhere_config = &self.config;
// TODO this deserves some refactoring, it is becoming a bit hard to figure out
// be careful when making modifications here
if k8s_anywhere_config.use_local_k3d {
debug!("Using local k3d cluster because of use_local_k3d set to true");
} else {
if let Some(kubeconfig) = &k8s_anywhere_config.kubeconfig {
debug!("Loading kubeconfig {kubeconfig}");
match self.try_load_kubeconfig(kubeconfig).await {
Some(client) => {
return Ok(Some(K8sState {
client: Arc::new(client),
source: K8sSource::Kubeconfig,
message: format!(
"Loaded k8s client from kubeconfig {kubeconfig} using context {}",
self.config
.k8s_context
.as_ref()
.map(|s| s.clone())
.unwrap_or_default()
),
}));
}
None => {
return Err(PreparationError::new(format!(
"Failed to load kubeconfig from {kubeconfig}"
)));
}
}
}
if k8s_anywhere_config.use_system_kubeconfig {
debug!("Loading system kubeconfig");
match self.try_load_system_kubeconfig().await {
Some(_client) => todo!(),
None => todo!(),
}
}
info!("No kubernetes configuration found");
}
if !k8s_anywhere_config.autoinstall {
warn!(
"Installation cancelled, K8sAnywhere could not initialize a valid Kubernetes client"
);
return Ok(None);
}
debug!("Starting K8sAnywhere installation");
self.try_install_k3d().await?;
let k3d_score = self.get_k3d_installation_score();
// I feel like having to rely on the k3d_rs crate here is a smell
// I think we should have a way to interact more deeply with scores/interpret. Maybe the
// K3DInstallationScore should expose a method to get_client ? Not too sure what would be a
// good implementation due to the stateful nature of the k3d thing. Which is why I went
// with this solution for now
let k3d = k3d_rs::K3d::new(k3d_score.installation_path, Some(k3d_score.cluster_name));
let state = match k3d.get_client().await {
Ok(client) => K8sState {
client: Arc::new(K8sClient::new(client)),
source: K8sSource::LocalK3d,
message: "K8s client ready".to_string(),
},
Err(_) => todo!(),
};
Ok(Some(state))
}
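/// Initializes the [`K8sTenantManager`] once, picking the network policy strategy based on
/// where the Kubernetes client came from.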
async fn ensure_k8s_tenant_manager(&self, k8s_state: &K8sState) -> Result<(), String> {
if self.tenant_manager.get().is_some() {
return Ok(());
}
self.tenant_manager
.get_or_try_init(async || -> Result<K8sTenantManager, String> {
let k8s_client = self.k8s_client().await?;
let network_policy_strategy: Box<dyn NetworkPolicyStrategy> = match k8s_state.source
{
K8sSource::LocalK3d => Box::new(K3dNetworkPolicyStrategy::new()),
K8sSource::Kubeconfig => Box::new(NoopNetworkPolicyStrategy::new()),
};
Ok(K8sTenantManager::new(k8s_client, network_policy_strategy))
})
.await?;
Ok(())
}
fn get_k8s_tenant_manager(&self) -> Result<&K8sTenantManager, ExecutorError> {
match self.tenant_manager.get() {
Some(t) => Ok(t),
None => Err(ExecutorError::UnexpectedError(
"K8sTenantManager not available".to_string(),
)),
}
}
async fn ensure_cluster_observability_operator(
&self,
_sender: &RHOBObservability,
) -> Result<PreparationOutcome, PreparationError> {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i rhobs"])
.status()
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() {
match k8s_state.source {
K8sSource::LocalK3d => {
// TODO: installing the cluster observability operator on a local k3d
// cluster is not implemented yet.
warn!(
"Installing observability operator is not supported on LocalK3d source"
);
return Ok(PreparationOutcome::Noop);
}
K8sSource::Kubeconfig => {
debug!(
"unable to install cluster observability operator, contact cluster admin"
);
return Ok(PreparationOutcome::Noop);
}
}
} else {
warn!(
"Unable to detect k8s_state. Skipping Cluster Observability Operator install."
);
return Ok(PreparationOutcome::Noop);
}
}
debug!("Cluster Observability Operator is already present, skipping install");
Ok(PreparationOutcome::Success {
details: "cluster observability operator present in cluster".into(),
})
}
async fn ensure_prometheus_operator(
&self,
sender: &CRDPrometheus,
) -> Result<PreparationOutcome, PreparationError> {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i prometheuses"])
.status()
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() {
match k8s_state.source {
K8sSource::LocalK3d => {
debug!("installing prometheus operator");
let op_score =
prometheus_operator_helm_chart_score(sender.namespace.clone());
let result = op_score.interpret(&Inventory::empty(), self).await;
return match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: "installed prometheus operator".into(),
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(
"failed to install prometheus operator (unknown error)".into(),
)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
};
}
K8sSource::Kubeconfig => {
debug!("unable to install prometheus operator, contact cluster admin");
return Ok(PreparationOutcome::Noop);
}
}
} else {
warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
return Ok(PreparationOutcome::Noop);
}
}
debug!("Prometheus operator is already present, skipping install");
Ok(PreparationOutcome::Success {
details: "prometheus operator present in cluster".into(),
})
}
async fn install_grafana_operator(
&self,
inventory: &Inventory,
ns: Option<&str>,
) -> Result<PreparationOutcome, PreparationError> {
let namespace = ns.unwrap_or("grafana");
info!("installing grafana operator in ns {namespace}");
let tenant = self.get_k8s_tenant_manager()?.get_tenant_config().await;
let namespace_scope = tenant.is_some();
grafana_helm_chart_score(namespace, namespace_scope)
.interpret(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
Ok(PreparationOutcome::Success {
details: format!("Successfully installed grafana operator in ns {namespace}"),
})
}
}
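/// Configuration describing how a [`K8sAnywhereTopology`] should locate or provision its
/// Kubernetes cluster.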
#[derive(Clone, Debug)]
pub struct K8sAnywhereConfig {
/// The path of the KUBECONFIG file that Harmony should use to interact with the Kubernetes
/// cluster
///
/// Default : None
pub kubeconfig: Option<String>,
/// Whether to use the system KUBECONFIG, either the environment variable or the file in the
/// default or configured location
///
/// Default : false
pub use_system_kubeconfig: bool,
/// Whether to install automatically a kubernetes cluster
///
/// When enabled, autoinstall will set up a K3D cluster on the localhost. https://k3d.io/stable/
///
/// Default: true
pub autoinstall: bool,
/// Whether to use a local k3d cluster.
///
/// Takes precedence over the other options; useful to avoid messing up a remote cluster by mistake.
///
/// Default: true
pub use_local_k3d: bool,
pub harmony_profile: String,
/// Name of the kubeconfig context to use.
///
/// If None, it will use the current context.
///
/// If the context name is not found, it will fail to initialize.
pub k8s_context: Option<String>,
}
impl K8sAnywhereConfig {
/// Reads an environment variable `env_var` and parses its content :
/// Comma-separated `key=value` pairs, e.g.,
/// `kubeconfig=/path/to/primary.kubeconfig,context=primary-ctx`
///
/// Then creates a K8sAnywhereConfig from it, with local installs disabled (`use_local_k3d=false`,
/// `autoinstall=false`, `use_system_kubeconfig=false`).
/// `harmony_profile` is read from `HARMONY_PROFILE` env or defaults to `"dev"`.
///
/// If no kubeconfig path is provided, it falls back to the system kubeconfig.
///
/// Panics if `env_var` is missing or malformed.
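///
/// A usage sketch (the env var name below is illustrative, and the snippet is not compiled
/// as a doctest):
///
/// ```ignore
/// // MY_REMOTE_K8S="kubeconfig=/path/to/cluster.kubeconfig,context=my-context"
/// let cfg = K8sAnywhereConfig::remote_k8s_from_env_var("MY_REMOTE_K8S");
/// assert_eq!(cfg.k8s_context.as_deref(), Some("my-context"));
/// ```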
pub fn remote_k8s_from_env_var(env_var: &str) -> Self {
Self::remote_k8s_from_env_var_with_profile(env_var, "HARMONY_PROFILE")
}
pub fn remote_k8s_from_env_var_with_profile(env_var: &str, profile_env_var: &str) -> Self {
debug!("Looking for env var named : {env_var}");
let env_var_value = std::env::var(env_var)
.map_err(|e| format!("Missing required env var {env_var} : {e}"))
.unwrap();
info!("Initializing remote k8s from env var value : {env_var_value}");
let mut kubeconfig: Option<String> = None;
let mut k8s_context: Option<String> = None;
for part in env_var_value.split(',') {
let kv: Vec<&str> = part.splitn(2, '=').collect();
if kv.len() == 2 {
match kv[0].trim() {
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
"context" => k8s_context = Some(kv[1].trim().to_string()),
_ => {}
}
}
}
debug!("Found in {env_var} : kubeconfig {kubeconfig:?} and context {k8s_context:?}");
let use_system_kubeconfig = kubeconfig.is_none();
if let Ok(kubeconfig_value) = std::env::var("KUBECONFIG") {
kubeconfig.get_or_insert(kubeconfig_value);
}
info!("Loading k8s environment with kubeconfig {kubeconfig:?} and context {k8s_context:?}");
K8sAnywhereConfig {
kubeconfig,
k8s_context,
use_system_kubeconfig,
autoinstall: false,
use_local_k3d: false,
harmony_profile: std::env::var(profile_env_var).unwrap_or_else(|_| "dev".to_string()),
}
}
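/// Builds a configuration from the `KUBECONFIG`, `HARMONY_USE_SYSTEM_KUBECONFIG`,
/// `HARMONY_AUTOINSTALL`, `HARMONY_PROFILE`, `HARMONY_USE_LOCAL_K3D` and
/// `HARMONY_K8S_CONTEXT` environment variables, defaulting to a local k3d cluster with
/// autoinstall enabled.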
fn from_env() -> Self {
Self {
kubeconfig: std::env::var("KUBECONFIG").ok(),
use_system_kubeconfig: std::env::var("HARMONY_USE_SYSTEM_KUBECONFIG")
.map_or_else(|_| false, |v| v.parse().ok().unwrap_or(false)),
autoinstall: std::env::var("HARMONY_AUTOINSTALL")
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(false)),
// TODO harmony_profile should be managed at a more core level than this
harmony_profile: std::env::var("HARMONY_PROFILE").map_or_else(
|_| "dev".to_string(),
|v| v.parse().ok().unwrap_or("dev".to_string()),
),
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
}
}
}
#[async_trait]
impl Topology for K8sAnywhereTopology {
fn name(&self) -> &str {
"K8sAnywhereTopology"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
let k8s_state = self
.k8s_state
.get_or_try_init(|| self.try_get_or_install_k8s_client())
.await?;
let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new(
"no K8s client could be found or installed".to_string(),
))?;
self.ensure_k8s_tenant_manager(k8s_state)
.await
.map_err(PreparationError::new)?;
match self.is_helm_available() {
Ok(()) => Ok(PreparationOutcome::Success {
details: format!("{} + helm available", k8s_state.message.clone()),
}),
Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))),
}
}
}
impl MultiTargetTopology for K8sAnywhereTopology {
fn current_target(&self) -> DeploymentTarget {
if self.config.use_local_k3d {
return DeploymentTarget::LocalDev;
}
match self.config.harmony_profile.to_lowercase().as_str() {
"staging" => DeploymentTarget::Staging,
"production" => DeploymentTarget::Production,
_ => todo!("HARMONY_PROFILE must be set when use_local_k3d is false"),
}
}
}
impl HelmCommand for K8sAnywhereTopology {}
#[async_trait]
impl TenantManager for K8sAnywhereTopology {
async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
self.get_k8s_tenant_manager()?
.provision_tenant(config)
.await
}
async fn get_tenant_config(&self) -> Option<TenantConfig> {
self.get_k8s_tenant_manager()
.ok()?
.get_tenant_config()
.await
}
}
#[async_trait]
impl Ingress for K8sAnywhereTopology {
//TODO this is specifically for openshift/okd which violates the k8sanywhere idea
async fn get_domain(&self, service: &str) -> Result<String, PreparationError> {
let client = self.k8s_client().await?;
if let Some(Some(k8s_state)) = self.k8s_state.get() {
match k8s_state.source {
K8sSource::LocalK3d => Ok(format!("{service}.local.k3d")),
K8sSource::Kubeconfig => {
self.openshift_ingress_operator_available().await?;
let gvk = GroupVersionKind {
group: "operator.openshift.io".into(),
version: "v1".into(),
kind: "IngressController".into(),
};
let ic = client
.get_resource_json_value(
"default",
Some("openshift-ingress-operator"),
&gvk,
)
.await
.map_err(|_| {
PreparationError::new("Failed to fetch IngressController".to_string())
})?;
match ic.data["status"]["domain"].as_str() {
Some(domain) => Ok(format!("{service}.{domain}")),
None => Err(PreparationError::new("Could not find domain".to_string())),
}
}
}
} else {
Err(PreparationError::new(
"Cannot get domain: unable to detect K8s state".to_string(),
))
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::sync::atomic::{AtomicUsize, Ordering};
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
/// Sets environment variables with unique names to avoid concurrency issues between tests.
/// Returns the names of the (config_var, profile_var) used.
fn setup_env_vars(config_value: Option<&str>, profile_value: Option<&str>) -> (String, String) {
let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let config_var = format!("TEST_VAR_{}", id);
let profile_var = format!("TEST_PROFILE_{}", id);
unsafe {
if let Some(v) = config_value {
std::env::set_var(&config_var, v);
} else {
std::env::remove_var(&config_var);
}
if let Some(v) = profile_value {
std::env::set_var(&profile_var, v);
} else {
std::env::remove_var(&profile_var);
}
}
(config_var, profile_var)
}
/// Runs a test in a separate thread to avoid polluting the process environment.
fn run_in_isolated_env<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let handle = std::thread::spawn(f);
handle.join().expect("Test thread panicked");
}
#[test]
fn test_remote_k8s_from_env_var_full() {
let (config_var, profile_var) =
setup_env_vars(Some("kubeconfig=/foo.kc,context=bar"), Some("testprof"));
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
assert_eq!(cfg.harmony_profile, "testprof");
assert!(!cfg.use_local_k3d);
assert!(!cfg.autoinstall);
assert!(!cfg.use_system_kubeconfig);
}
#[test]
fn test_remote_k8s_from_env_var_only_kubeconfig() {
let (config_var, profile_var) = setup_env_vars(Some("kubeconfig=/foo.kc"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context, None);
assert_eq!(cfg.harmony_profile, "dev");
}
#[test]
fn test_remote_k8s_from_env_var_only_context() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(Some("context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_unknown_key_trim() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(
Some(" unknown=bla , kubeconfig= /foo.kc ,context= bar "),
None,
);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/foo.kc"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_empty_malformed() {
run_in_isolated_env(|| {
unsafe {
std::env::remove_var("KUBECONFIG");
}
let (config_var, profile_var) = setup_env_vars(Some("malformed,no=,equal"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
// Unknown/malformed ignored, defaults to None
assert_eq!(cfg.kubeconfig, None);
assert_eq!(cfg.k8s_context, None);
});
}
#[test]
fn test_remote_k8s_from_env_var_kubeconfig_fallback() {
run_in_isolated_env(|| {
unsafe {
std::env::set_var("KUBECONFIG", "/fallback/path");
}
let (config_var, profile_var) = setup_env_vars(Some("context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(cfg.kubeconfig.as_deref(), Some("/fallback/path"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
fn test_remote_k8s_from_env_var_kubeconfig_no_fallback_if_provided() {
run_in_isolated_env(|| {
unsafe {
std::env::set_var("KUBECONFIG", "/fallback/path");
}
let (config_var, profile_var) =
setup_env_vars(Some("kubeconfig=/primary/path,context=bar"), None);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
// Primary path should take precedence
assert_eq!(cfg.kubeconfig.as_deref(), Some("/primary/path"));
assert_eq!(cfg.k8s_context.as_deref(), Some("bar"));
});
}
#[test]
#[should_panic(expected = "Missing required env var")]
fn test_remote_k8s_from_env_var_missing() {
let (config_var, profile_var) = setup_env_vars(None, None);
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
}
#[test]
fn test_remote_k8s_from_env_var_context_key() {
let (config_var, profile_var) = setup_env_vars(
Some("context=default/api-sto1-harmony-mcd:6443/kube:admin"),
None,
);
let cfg =
K8sAnywhereConfig::remote_k8s_from_env_var_with_profile(&config_var, &profile_var);
assert_eq!(
cfg.k8s_context.as_deref(),
Some("default/api-sto1-harmony-mcd:6443/kube:admin")
);
}
}