## Description

* Replace the CatalogSource approach (installing the OperatorHub.io catalog) with a simpler, more straightforward way to install NMState
* Improve logging
* Add a report summarizing the host network configuration that was applied (which host, bonds, port-channels)
* Fix the command that finds the next available port-channel id (see the sketch below)

## Extra info

Using the `apply_url` approach to install the NMState operator isn't ideal: it's harder to maintain and upgrade. But it achieves what we wanted for now: install the NMState Operator to configure bonds on a host.

The preferred approach, installing the operator from the OperatorHub.io catalog, didn't work for now. We hit a timeout error with DeadlineExceeded, probably caused by insufficient CPU/memory allocated to query such a big catalog, even though we tweaked the RAM allocation (we couldn't find a way to do it for CPU). We spent too much time on this, so we stopped these efforts for now. It would be good to come back to it when we need to install something else from a custom catalog.

Reviewed-on: #175
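As a rough illustration of the port-channel fix, the idea is to pick the smallest channel id that no existing interface already uses. The helper below is hypothetical (not the actual command shipped in this PR), shown only to clarify the selection logic:

```rust
use std::collections::BTreeSet;

/// Hypothetical sketch: pick the smallest port-channel id that is not already in use.
/// `existing` would come from the host/switch inventory, e.g. names like "Port-channel10".
fn next_available_port_channel_id(existing: &[String]) -> u32 {
    let used: BTreeSet<u32> = existing
        .iter()
        .filter_map(|name| {
            name.to_lowercase()
                .strip_prefix("port-channel")
                .and_then(|id| id.parse().ok())
        })
        .collect();

    // The smallest positive id with no existing interface.
    (1u32..).find(|id| !used.contains(id)).unwrap()
}
```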
1017 lines
36 KiB
Rust
use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};

use async_trait::async_trait;
use base64::{Engine, engine::general_purpose};
use k8s_openapi::api::{
    core::v1::Secret,
    rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
};
use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta};
use log::{debug, info, warn};
use serde::Serialize;
use tokio::sync::OnceCell;

use crate::{
    executors::ExecutorError,
    interpret::InterpretStatus,
    inventory::Inventory,
    modules::{
        k3d::K3DInstallationScore,
        k8s::ingress::{K8sIngressScore, PathType},
        monitoring::{
            grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score},
            kube_prometheus::crd::{
                crd_alertmanager_config::CRDPrometheus,
                crd_grafana::{
                    Grafana as GrafanaCRD, GrafanaCom, GrafanaDashboard,
                    GrafanaDashboardDatasource, GrafanaDashboardSpec, GrafanaDatasource,
                    GrafanaDatasourceConfig, GrafanaDatasourceJsonData,
                    GrafanaDatasourceSecureJsonData, GrafanaDatasourceSpec, GrafanaSpec,
                },
                crd_prometheuses::LabelSelector,
                prometheus_operator::prometheus_operator_helm_chart_score,
                rhob_alertmanager_config::RHOBObservability,
                service_monitor::ServiceMonitor,
            },
        },
        prometheus::{
            k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
            prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
        },
    },
    score::Score,
    topology::ingress::Ingress,
};

use super::{
    DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
    PreparationOutcome, Topology,
    k8s::K8sClient,
    oberservability::monitoring::AlertReceiver,
    tenant::{
        TenantConfig, TenantManager,
        k8s::K8sTenantManager,
        network_policy::{
            K3dNetworkPolicyStrategy, NetworkPolicyStrategy, NoopNetworkPolicyStrategy,
        },
    },
};

#[derive(Clone, Debug)]
struct K8sState {
    client: Arc<K8sClient>,
    source: K8sSource,
    message: String,
}

#[derive(Debug, Clone)]
pub enum KubernetesDistribution {
    OpenshiftFamily,
    K3sFamily,
    Default,
}

#[derive(Debug, Clone)]
enum K8sSource {
    LocalK3d,
    Kubeconfig,
}

#[derive(Clone, Debug)]
pub struct K8sAnywhereTopology {
    k8s_state: Arc<OnceCell<Option<K8sState>>>,
    tenant_manager: Arc<OnceCell<K8sTenantManager>>,
    k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
    config: Arc<K8sAnywhereConfig>,
}

#[async_trait]
impl K8sclient for K8sAnywhereTopology {
    async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
        let state = match self.k8s_state.get() {
            Some(state) => state,
            None => return Err("K8s state not initialized yet".to_string()),
        };

        let state = match state {
            Some(state) => state,
            None => return Err("K8s client initialized but empty".to_string()),
        };

        Ok(state.client.clone())
    }
}

#[async_trait]
impl Grafana for K8sAnywhereTopology {
    async fn ensure_grafana_operator(
        &self,
        inventory: &Inventory,
    ) -> Result<PreparationOutcome, PreparationError> {
        debug!("ensure grafana operator");
        let client = self.k8s_client().await?;
        let grafana_gvk = GroupVersionKind {
            group: "grafana.integreatly.org".to_string(),
            version: "v1beta1".to_string(),
            kind: "Grafana".to_string(),
        };
        let name = "grafanas.grafana.integreatly.org";
        let ns = "grafana";

        let grafana_crd = client
            .get_resource_json_value(name, Some(ns), &grafana_gvk)
            .await;

        match grafana_crd {
            Ok(_) => Ok(PreparationOutcome::Success {
                details: "Found grafana CRDs in cluster".to_string(),
            }),
            Err(_) => {
                self.install_grafana_operator(inventory, Some("grafana"))
                    .await
            }
        }
    }

    async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError> {
        let ns = "grafana";

        let mut label = BTreeMap::new();
        label.insert("dashboards".to_string(), "grafana".to_string());

        let label_selector = LabelSelector {
            match_labels: label.clone(),
            match_expressions: vec![],
        };

        let client = self.k8s_client().await?;

        let grafana = self.build_grafana(ns, &label);

        client.apply(&grafana, Some(ns)).await?;
        // TODO: change this to an ensure-ready check or something better than just a timeout
        client
            .wait_until_deployment_ready(
                "grafana-grafana-deployment",
                Some("grafana"),
                Some(Duration::from_secs(30)),
            )
            .await?;

        let sa_name = "grafana-grafana-sa";
        let token_secret_name = "grafana-sa-token-secret";

        let sa_token_secret = self.build_sa_token_secret(token_secret_name, sa_name, ns);

        client.apply(&sa_token_secret, Some(ns)).await?;
        let secret_gvk = GroupVersionKind {
            group: "".to_string(),
            version: "v1".to_string(),
            kind: "Secret".to_string(),
        };

        let secret = client
            .get_resource_json_value(token_secret_name, Some(ns), &secret_gvk)
            .await?;

        // build_grafana_datasource adds the "Bearer " prefix itself, so keep the raw token here.
        let token = self.extract_and_normalize_token(&secret).ok_or_else(|| {
            PreparationError::new("could not extract grafana service account token".to_string())
        })?;

        debug!("creating grafana clusterrole binding");

        let clusterrolebinding =
            self.build_cluster_rolebinding(sa_name, "cluster-monitoring-view", ns);

        client.apply(&clusterrolebinding, Some(ns)).await?;

        debug!("creating grafana datasource crd");

        let thanos_url = format!(
            "https://{}",
            self.get_domain("thanos-querier-openshift-monitoring").await?
        );

        let thanos_openshift_datasource = self.build_grafana_datasource(
            "thanos-openshift-monitoring",
            ns,
            &label_selector,
            &thanos_url,
            &token,
        );

        client.apply(&thanos_openshift_datasource, Some(ns)).await?;

        debug!("creating grafana dashboard crd");
        let dashboard = self.build_grafana_dashboard(ns, &label_selector);

        client.apply(&dashboard, Some(ns)).await?;

        debug!("creating grafana ingress");
        let grafana_ingress = self.build_grafana_ingress(ns).await;

        grafana_ingress
            .interpret(&Inventory::empty(), self)
            .await
            .map_err(|e| PreparationError::new(e.to_string()))?;

        Ok(PreparationOutcome::Success {
            details: "Installed grafana components".to_string(),
        })
    }
}

#[async_trait]
impl PrometheusMonitoring<CRDPrometheus> for K8sAnywhereTopology {
    async fn install_prometheus(
        &self,
        sender: &CRDPrometheus,
        _inventory: &Inventory,
        _receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
    ) -> Result<PreparationOutcome, PreparationError> {
        let client = self.k8s_client().await?;

        for monitor in sender.service_monitor.iter() {
            client
                .apply(monitor, Some(&sender.namespace))
                .await
                .map_err(|e| PreparationError::new(e.to_string()))?;
        }
        Ok(PreparationOutcome::Success {
            details: "successfully installed prometheus components".to_string(),
        })
    }

    async fn ensure_prometheus_operator(
        &self,
        sender: &CRDPrometheus,
        _inventory: &Inventory,
    ) -> Result<PreparationOutcome, PreparationError> {
        let po_result = self.ensure_prometheus_operator(sender).await?;

        match po_result {
            PreparationOutcome::Success { details: _ } => {
                debug!("Detected prometheus operator CRDs present in cluster.");
                Ok(po_result)
            }
            PreparationOutcome::Noop => {
                debug!("Skipping Prometheus CR installation due to missing operator.");
                Ok(po_result)
            }
        }
    }
}

#[async_trait]
impl PrometheusMonitoring<RHOBObservability> for K8sAnywhereTopology {
    async fn install_prometheus(
        &self,
        sender: &RHOBObservability,
        inventory: &Inventory,
        receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
    ) -> Result<PreparationOutcome, PreparationError> {
        let po_result = self.ensure_cluster_observability_operator(sender).await?;

        if po_result == PreparationOutcome::Noop {
            debug!("Skipping Prometheus CR installation due to missing operator.");
            return Ok(po_result);
        }

        let result = self
            .get_cluster_observability_operator_prometheus_application_score(
                sender.clone(),
                receivers,
            )
            .await
            .interpret(inventory, self)
            .await;

        match result {
            Ok(outcome) => match outcome.status {
                InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
                    details: outcome.message,
                }),
                InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
                _ => Err(PreparationError::new(outcome.message)),
            },
            Err(err) => Err(PreparationError::new(err.to_string())),
        }
    }

    async fn ensure_prometheus_operator(
        &self,
        _sender: &RHOBObservability,
        _inventory: &Inventory,
    ) -> Result<PreparationOutcome, PreparationError> {
        todo!()
    }
}

impl Serialize for K8sAnywhereTopology {
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        todo!()
    }
}

impl K8sAnywhereTopology {
    pub fn from_env() -> Self {
        Self {
            k8s_state: Arc::new(OnceCell::new()),
            tenant_manager: Arc::new(OnceCell::new()),
            k8s_distribution: Arc::new(OnceCell::new()),
            config: Arc::new(K8sAnywhereConfig::from_env()),
        }
    }

    pub fn with_config(config: K8sAnywhereConfig) -> Self {
        Self {
            k8s_state: Arc::new(OnceCell::new()),
            tenant_manager: Arc::new(OnceCell::new()),
            k8s_distribution: Arc::new(OnceCell::new()),
            config: Arc::new(config),
        }
    }

    pub async fn get_k8s_distribution(&self) -> Result<&KubernetesDistribution, PreparationError> {
        self.k8s_distribution
            .get_or_try_init(async || {
                let client = self.k8s_client().await?;

                let discovery = client.discovery().await.map_err(|e| {
                    PreparationError::new(format!("Could not discover API groups: {}", e))
                })?;

                let version = client.get_apiserver_version().await.map_err(|e| {
                    PreparationError::new(format!("Could not get server version: {}", e))
                })?;

                // OpenShift / OKD
                if discovery
                    .groups()
                    .any(|g| g.name() == "project.openshift.io")
                {
                    return Ok(KubernetesDistribution::OpenshiftFamily);
                }

                // K3d / K3s
                if version.git_version.contains("k3s") {
                    return Ok(KubernetesDistribution::K3sFamily);
                }

                Ok(KubernetesDistribution::Default)
            })
            .await
    }

    fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option<String> {
        let token_b64 = secret
            .data
            .get("token")
            .or_else(|| secret.data.get("data").and_then(|d| d.get("token")))
            .and_then(|v| v.as_str())?;

        let bytes = general_purpose::STANDARD.decode(token_b64).ok()?;

        let s = String::from_utf8(bytes).ok()?;

        let cleaned = s
            .trim_matches(|c: char| c.is_whitespace() || c == '\0')
            .to_string();
        Some(cleaned)
    }

    pub fn build_cluster_rolebinding(
        &self,
        service_account_name: &str,
        clusterrole_name: &str,
        ns: &str,
    ) -> ClusterRoleBinding {
        ClusterRoleBinding {
            metadata: ObjectMeta {
                name: Some(format!("{}-view-binding", service_account_name)),
                ..Default::default()
            },
            role_ref: RoleRef {
                api_group: "rbac.authorization.k8s.io".into(),
                kind: "ClusterRole".into(),
                name: clusterrole_name.into(),
            },
            subjects: Some(vec![Subject {
                kind: "ServiceAccount".into(),
                name: service_account_name.into(),
                namespace: Some(ns.into()),
                ..Default::default()
            }]),
        }
    }

    pub fn build_sa_token_secret(
        &self,
        secret_name: &str,
        service_account_name: &str,
        ns: &str,
    ) -> Secret {
        let mut annotations = BTreeMap::new();
        annotations.insert(
            "kubernetes.io/service-account.name".to_string(),
            service_account_name.to_string(),
        );

        Secret {
            metadata: ObjectMeta {
                name: Some(secret_name.into()),
                namespace: Some(ns.into()),
                annotations: Some(annotations),
                ..Default::default()
            },
            type_: Some("kubernetes.io/service-account-token".to_string()),
            ..Default::default()
        }
    }

    fn build_grafana_datasource(
        &self,
        name: &str,
        ns: &str,
        label_selector: &LabelSelector,
        url: &str,
        token: &str,
    ) -> GrafanaDatasource {
        GrafanaDatasource {
            metadata: ObjectMeta {
                name: Some(name.to_string()),
                namespace: Some(ns.to_string()),
                ..Default::default()
            },
            spec: GrafanaDatasourceSpec {
                instance_selector: label_selector.clone(),
                allow_cross_namespace_import: Some(true),
                values_from: None,
                datasource: GrafanaDatasourceConfig {
                    access: "proxy".to_string(),
                    name: name.to_string(),
                    r#type: "prometheus".to_string(),
                    url: url.to_string(),
                    database: None,
                    json_data: Some(GrafanaDatasourceJsonData {
                        time_interval: Some("60s".to_string()),
                        http_header_name1: Some("Authorization".to_string()),
                        tls_skip_verify: Some(true),
                        oauth_pass_thru: Some(true),
                    }),
                    secure_json_data: Some(GrafanaDatasourceSecureJsonData {
                        http_header_value1: Some(format!("Bearer {token}")),
                    }),
                    is_default: Some(false),
                    editable: Some(true),
                },
            },
        }
    }

    fn build_grafana_dashboard(
        &self,
        ns: &str,
        label_selector: &LabelSelector,
    ) -> GrafanaDashboard {
        GrafanaDashboard {
            metadata: ObjectMeta {
                name: Some(format!("grafana-dashboard-{}", ns)),
                namespace: Some(ns.to_string()),
                ..Default::default()
            },
            spec: GrafanaDashboardSpec {
                resync_period: Some("30s".to_string()),
                instance_selector: label_selector.clone(),
                datasources: Some(vec![GrafanaDashboardDatasource {
                    input_name: "DS_PROMETHEUS".to_string(),
                    datasource_name: "thanos-openshift-monitoring".to_string(),
                }]),
                json: None,
                grafana_com: Some(GrafanaCom {
                    id: 17406,
                    revision: None,
                }),
            },
        }
    }

    fn build_grafana(&self, ns: &str, labels: &BTreeMap<String, String>) -> GrafanaCRD {
        GrafanaCRD {
            metadata: ObjectMeta {
                name: Some(format!("grafana-{}", ns)),
                namespace: Some(ns.to_string()),
                labels: Some(labels.clone()),
                ..Default::default()
            },
            spec: GrafanaSpec {
                config: None,
                admin_user: None,
                admin_password: None,
                ingress: None,
                persistence: None,
                resources: None,
            },
        }
    }

    async fn build_grafana_ingress(&self, ns: &str) -> K8sIngressScore {
        let domain = self.get_domain(&format!("grafana-{}", ns)).await.unwrap();
        let name = format!("{}-grafana", ns);
        let backend_service = format!("grafana-{}-service", ns);

        K8sIngressScore {
            name: fqdn::fqdn!(&name),
            host: fqdn::fqdn!(&domain),
            backend_service: fqdn::fqdn!(&backend_service),
            port: 3000,
            path: Some("/".to_string()),
            path_type: Some(PathType::Prefix),
            namespace: Some(fqdn::fqdn!(&ns)),
            ingress_class_name: Some("openshift-default".to_string()),
        }
    }

    async fn get_cluster_observability_operator_prometheus_application_score(
        &self,
        sender: RHOBObservability,
        receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
    ) -> RHOBAlertingScore {
        RHOBAlertingScore {
            sender,
            receivers: receivers.unwrap_or_default(),
            service_monitors: vec![],
            prometheus_rules: vec![],
        }
    }

    async fn get_k8s_prometheus_application_score(
        &self,
        sender: CRDPrometheus,
        receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
        service_monitors: Option<Vec<ServiceMonitor>>,
    ) -> K8sPrometheusCRDAlertingScore {
        K8sPrometheusCRDAlertingScore {
            sender,
            receivers: receivers.unwrap_or_default(),
            service_monitors: service_monitors.unwrap_or_default(),
            prometheus_rules: vec![],
        }
    }

    async fn openshift_ingress_operator_available(&self) -> Result<(), PreparationError> {
        let client = self.k8s_client().await?;
        let gvk = GroupVersionKind {
            group: "operator.openshift.io".into(),
            version: "v1".into(),
            kind: "IngressController".into(),
        };
        let ic = client
            .get_resource_json_value("default", Some("openshift-ingress-operator"), &gvk)
            .await?;
        let ready_replicas = ic.data["status"]["availableReplicas"].as_i64().unwrap_or(0);
        if ready_replicas >= 1 {
            Ok(())
        } else {
            Err(PreparationError::new(
                "openshift-ingress-operator not available".to_string(),
            ))
        }
    }

    fn is_helm_available(&self) -> Result<(), String> {
        let version_result = Command::new("helm")
            .arg("version")
            .output()
            .map_err(|e| format!("Failed to execute 'helm version': {}", e))?;

        if !version_result.status.success() {
            return Err("Failed to run 'helm version'".to_string());
        }

        let version_output = String::from_utf8_lossy(&version_result.stdout);
        debug!("Helm version: {}", version_output.trim());

        Ok(())
    }

    async fn try_load_system_kubeconfig(&self) -> Option<K8sClient> {
        todo!("Use kube-rs default behavior to load system kubeconfig");
    }

    async fn try_load_kubeconfig(&self, path: &str) -> Option<K8sClient> {
        K8sClient::from_kubeconfig(path).await
    }

    fn get_k3d_installation_score(&self) -> K3DInstallationScore {
        K3DInstallationScore::default()
    }

    async fn try_install_k3d(&self) -> Result<(), PreparationError> {
        let result = self
            .get_k3d_installation_score()
            .interpret(&Inventory::empty(), self)
            .await;

        match result {
            Ok(outcome) => match outcome.status {
                InterpretStatus::SUCCESS | InterpretStatus::NOOP => Ok(()),
                _ => Err(PreparationError::new(outcome.message)),
            },
            Err(err) => Err(PreparationError::new(err.to_string())),
        }
    }

    async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, PreparationError> {
        let k8s_anywhere_config = &self.config;

        // TODO this deserves some refactoring, it is becoming a bit hard to figure out
        // be careful when making modifications here
        if k8s_anywhere_config.use_local_k3d {
            debug!("Using local k3d cluster because use_local_k3d is set to true");
        } else {
            if let Some(kubeconfig) = &k8s_anywhere_config.kubeconfig {
                debug!("Loading kubeconfig {kubeconfig}");
                match self.try_load_kubeconfig(kubeconfig).await {
                    Some(client) => {
                        return Ok(Some(K8sState {
                            client: Arc::new(client),
                            source: K8sSource::Kubeconfig,
                            message: format!("Loaded k8s client from kubeconfig {kubeconfig}"),
                        }));
                    }
                    None => {
                        return Err(PreparationError::new(format!(
                            "Failed to load kubeconfig from {kubeconfig}"
                        )));
                    }
                }
            }

            if k8s_anywhere_config.use_system_kubeconfig {
                debug!("Loading system kubeconfig");
                match self.try_load_system_kubeconfig().await {
                    Some(_client) => todo!(),
                    None => todo!(),
                }
            }

            info!("No kubernetes configuration found");
        }

        if !k8s_anywhere_config.autoinstall {
            warn!(
                "Installation cancelled, K8sAnywhere could not initialize a valid Kubernetes client"
            );
            return Ok(None);
        }

        debug!("Starting K8sAnywhere installation");
        self.try_install_k3d().await?;
        let k3d_score = self.get_k3d_installation_score();
        // I feel like having to rely on the k3d_rs crate here is a smell
        // I think we should have a way to interact more deeply with scores/interpret. Maybe the
        // K3DInstallationScore should expose a method to get_client ? Not too sure what would be a
        // good implementation due to the stateful nature of the k3d thing. Which is why I went
        // with this solution for now
        let k3d = k3d_rs::K3d::new(k3d_score.installation_path, Some(k3d_score.cluster_name));
        let state = match k3d.get_client().await {
            Ok(client) => K8sState {
                client: Arc::new(K8sClient::new(client)),
                source: K8sSource::LocalK3d,
                message: "K8s client ready".to_string(),
            },
            Err(_) => todo!(),
        };

        Ok(Some(state))
    }

    async fn ensure_k8s_tenant_manager(&self, k8s_state: &K8sState) -> Result<(), String> {
        if self.tenant_manager.get().is_some() {
            return Ok(());
        }

        self.tenant_manager
            .get_or_try_init(async || -> Result<K8sTenantManager, String> {
                let k8s_client = self.k8s_client().await?;
                let network_policy_strategy: Box<dyn NetworkPolicyStrategy> = match k8s_state.source
                {
                    K8sSource::LocalK3d => Box::new(K3dNetworkPolicyStrategy::new()),
                    K8sSource::Kubeconfig => Box::new(NoopNetworkPolicyStrategy::new()),
                };

                Ok(K8sTenantManager::new(k8s_client, network_policy_strategy))
            })
            .await?;

        Ok(())
    }

    fn get_k8s_tenant_manager(&self) -> Result<&K8sTenantManager, ExecutorError> {
        match self.tenant_manager.get() {
            Some(t) => Ok(t),
            None => Err(ExecutorError::UnexpectedError(
                "K8sTenantManager not available".to_string(),
            )),
        }
    }

    async fn ensure_cluster_observability_operator(
        &self,
        _sender: &RHOBObservability,
    ) -> Result<PreparationOutcome, PreparationError> {
        let status = Command::new("sh")
            .args(["-c", "kubectl get crd -A | grep -i rhobs"])
            .status()
            .map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;

        if !status.success() {
            if let Some(Some(k8s_state)) = self.k8s_state.get() {
                match k8s_state.source {
                    K8sSource::LocalK3d => {
                        warn!(
                            "Installing observability operator is not supported on LocalK3d source"
                        );
                        return Ok(PreparationOutcome::Noop);
                    }
                    K8sSource::Kubeconfig => {
                        debug!(
                            "unable to install cluster observability operator, contact cluster admin"
                        );
                        return Ok(PreparationOutcome::Noop);
                    }
                }
            } else {
                warn!(
                    "Unable to detect k8s_state. Skipping Cluster Observability Operator install."
                );
                return Ok(PreparationOutcome::Noop);
            }
        }

        debug!("Cluster Observability Operator is already present, skipping install");

        Ok(PreparationOutcome::Success {
            details: "cluster observability operator present in cluster".into(),
        })
    }

    async fn ensure_prometheus_operator(
        &self,
        sender: &CRDPrometheus,
    ) -> Result<PreparationOutcome, PreparationError> {
        let status = Command::new("sh")
            .args(["-c", "kubectl get crd -A | grep -i prometheuses"])
            .status()
            .map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;

        if !status.success() {
            if let Some(Some(k8s_state)) = self.k8s_state.get() {
                match k8s_state.source {
                    K8sSource::LocalK3d => {
                        debug!("installing prometheus operator");
                        let op_score =
                            prometheus_operator_helm_chart_score(sender.namespace.clone());
                        let result = op_score.interpret(&Inventory::empty(), self).await;

                        return match result {
                            Ok(outcome) => match outcome.status {
                                InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
                                    details: "installed prometheus operator".into(),
                                }),
                                InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
                                _ => Err(PreparationError::new(
                                    "failed to install prometheus operator (unknown error)".into(),
                                )),
                            },
                            Err(err) => Err(PreparationError::new(err.to_string())),
                        };
                    }
                    K8sSource::Kubeconfig => {
                        debug!("unable to install prometheus operator, contact cluster admin");
                        return Ok(PreparationOutcome::Noop);
                    }
                }
            } else {
                warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
                return Ok(PreparationOutcome::Noop);
            }
        }

        debug!("Prometheus operator is already present, skipping install");

        Ok(PreparationOutcome::Success {
            details: "prometheus operator present in cluster".into(),
        })
    }

    async fn install_grafana_operator(
        &self,
        inventory: &Inventory,
        ns: Option<&str>,
    ) -> Result<PreparationOutcome, PreparationError> {
        let namespace = ns.unwrap_or("grafana");
        info!("installing grafana operator in ns {namespace}");
        let tenant = self.get_k8s_tenant_manager()?.get_tenant_config().await;
        let namespace_scope = tenant.is_some();
        grafana_helm_chart_score(namespace, namespace_scope)
            .interpret(inventory, self)
            .await
            .map_err(|e| PreparationError::new(e.to_string()))?;
        Ok(PreparationOutcome::Success {
            details: format!("Successfully installed grafana operator in ns {namespace}"),
        })
    }
}

#[derive(Clone, Debug)]
pub struct K8sAnywhereConfig {
    /// The path of the KUBECONFIG file that Harmony should use to interact with the Kubernetes
    /// cluster
    ///
    /// Default: None
    pub kubeconfig: Option<String>,

    /// Whether to use the system KUBECONFIG, either the environment variable or the file in the
    /// default or configured location
    ///
    /// Default: false
    pub use_system_kubeconfig: bool,

    /// Whether to automatically install a Kubernetes cluster
    ///
    /// When enabled, autoinstall will set up a K3D cluster on the localhost. https://k3d.io/stable/
    ///
    /// Default: true
    pub autoinstall: bool,

    /// Whether to use a local k3d cluster.
    ///
    /// Takes precedence over other options, useful to avoid messing up a remote cluster by mistake
    ///
    /// Default: true
    pub use_local_k3d: bool,
    pub harmony_profile: String,
}

impl K8sAnywhereConfig {
    fn from_env() -> Self {
        Self {
            kubeconfig: std::env::var("KUBECONFIG").ok(),
            use_system_kubeconfig: std::env::var("HARMONY_USE_SYSTEM_KUBECONFIG")
                .map_or_else(|_| false, |v| v.parse().ok().unwrap_or(false)),
            autoinstall: std::env::var("HARMONY_AUTOINSTALL")
                .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(false)),
            // TODO harmony_profile should be managed at a more core level than this
            harmony_profile: std::env::var("HARMONY_PROFILE").map_or_else(
                |_| "dev".to_string(),
                |v| v.parse().ok().unwrap_or("dev".to_string()),
            ),
            use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
                .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
        }
    }
}

#[async_trait]
impl Topology for K8sAnywhereTopology {
    fn name(&self) -> &str {
        "K8sAnywhereTopology"
    }

    async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
        let k8s_state = self
            .k8s_state
            .get_or_try_init(|| self.try_get_or_install_k8s_client())
            .await?;

        let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new(
            "no K8s client could be found or installed".to_string(),
        ))?;

        self.ensure_k8s_tenant_manager(k8s_state)
            .await
            .map_err(PreparationError::new)?;

        match self.is_helm_available() {
            Ok(()) => Ok(PreparationOutcome::Success {
                details: format!("{} + helm available", k8s_state.message.clone()),
            }),
            Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))),
        }
    }
}

impl MultiTargetTopology for K8sAnywhereTopology {
    fn current_target(&self) -> DeploymentTarget {
        if self.config.use_local_k3d {
            return DeploymentTarget::LocalDev;
        }

        match self.config.harmony_profile.to_lowercase().as_str() {
            "staging" => DeploymentTarget::Staging,
            "production" => DeploymentTarget::Production,
            _ => todo!("HARMONY_PROFILE must be set when use_local_k3d is false"),
        }
    }
}

impl HelmCommand for K8sAnywhereTopology {}

#[async_trait]
impl TenantManager for K8sAnywhereTopology {
    async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
        self.get_k8s_tenant_manager()?
            .provision_tenant(config)
            .await
    }

    async fn get_tenant_config(&self) -> Option<TenantConfig> {
        self.get_k8s_tenant_manager()
            .ok()?
            .get_tenant_config()
            .await
    }
}

#[async_trait]
impl Ingress for K8sAnywhereTopology {
    // TODO this is specifically for openshift/okd which violates the k8sanywhere idea
    async fn get_domain(&self, service: &str) -> Result<String, PreparationError> {
        let client = self.k8s_client().await?;

        if let Some(Some(k8s_state)) = self.k8s_state.get() {
            match k8s_state.source {
                K8sSource::LocalK3d => Ok(format!("{service}.local.k3d")),
                K8sSource::Kubeconfig => {
                    self.openshift_ingress_operator_available().await?;

                    let gvk = GroupVersionKind {
                        group: "operator.openshift.io".into(),
                        version: "v1".into(),
                        kind: "IngressController".into(),
                    };
                    let ic = client
                        .get_resource_json_value(
                            "default",
                            Some("openshift-ingress-operator"),
                            &gvk,
                        )
                        .await
                        .map_err(|_| {
                            PreparationError::new("Failed to fetch IngressController".to_string())
                        })?;

                    match ic.data["status"]["domain"].as_str() {
                        Some(domain) => Ok(format!("{service}.{domain}")),
                        None => Err(PreparationError::new("Could not find domain".to_string())),
                    }
                }
            }
        } else {
            Err(PreparationError::new(
                "Cannot get domain: unable to detect K8s state".to_string(),
            ))
        }
    }
}
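For reviewers who want to drive this topology end to end, a minimal usage sketch. The crate and module paths are assumptions; only `K8sAnywhereConfig`, `with_config`, `from_env` and `ensure_ready` are taken from the file above, and tokio's `macros` feature is assumed for `#[tokio::main]`:

```rust
// NOTE: the `harmony::topology` paths below are assumptions for illustration only.
use harmony::topology::{K8sAnywhereConfig, K8sAnywhereTopology, Topology};

#[tokio::main]
async fn main() {
    // Equivalent in spirit to K8sAnywhereTopology::from_env(), but with the options spelled out
    // (from_env reads KUBECONFIG, HARMONY_USE_SYSTEM_KUBECONFIG, HARMONY_AUTOINSTALL,
    // HARMONY_USE_LOCAL_K3D and HARMONY_PROFILE).
    let config = K8sAnywhereConfig {
        kubeconfig: None,
        use_system_kubeconfig: false,
        autoinstall: true,
        use_local_k3d: true,
        harmony_profile: "dev".to_string(),
    };
    let topology = K8sAnywhereTopology::with_config(config);

    // Finds an existing cluster or installs a local k3d one, then checks that helm is available.
    match topology.ensure_ready().await {
        Ok(_) => println!("K8sAnywhereTopology is ready"),
        Err(_) => eprintln!("K8sAnywhereTopology preparation failed"),
    }
}
```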