use std::{process::Command, sync::Arc};

use async_trait::async_trait;
use kube::api::GroupVersionKind;
use log::{debug, info, warn};
use serde::Serialize;
use tokio::sync::OnceCell;

use crate::{
    executors::ExecutorError,
    interpret::InterpretStatus,
    inventory::Inventory,
    modules::{
        k3d::K3DInstallationScore,
        monitoring::kube_prometheus::crd::{
            crd_alertmanager_config::CRDPrometheus,
            prometheus_operator::prometheus_operator_helm_chart_score,
            rhob_alertmanager_config::RHOBObservability,
        },
        prometheus::{
            k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
            prometheus::PrometheusApplicationMonitoring, rhob_alerting_score::RHOBAlertingScore,
        },
    },
    score::Score,
    topology::ingress::Ingress,
};

use super::{
    DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
    PreparationOutcome, Topology,
    k8s::K8sClient,
    oberservability::monitoring::AlertReceiver,
    tenant::{
        TenantConfig, TenantManager,
        k8s::K8sTenantManager,
        network_policy::{
            K3dNetworkPolicyStrategy, NetworkPolicyStrategy, NoopNetworkPolicyStrategy,
        },
    },
};
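/// Resolved Kubernetes connection state: the ready-to-use client, where it came from, and a
/// human-readable message describing how it was obtained.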
#[derive(Clone, Debug)]
struct K8sState {
    client: Arc<K8sClient>,
    source: K8sSource,
    message: String,
}
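/// Where the Kubernetes client was obtained from: a locally managed k3d cluster or an
/// externally provided kubeconfig.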
#[derive(Debug, Clone)]
enum K8sSource {
    LocalK3d,
    Kubeconfig,
}
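/// A topology that can run "anywhere": it lazily resolves a Kubernetes client (local k3d, an
/// explicit or system kubeconfig, or an auto-installed k3d cluster), a tenant manager, and the
/// configuration derived from the environment.
///
/// A minimal usage sketch (illustrative; actual call sites in Harmony may differ):
///
/// ```ignore
/// let topology = K8sAnywhereTopology::from_env();
/// // Resolves or installs a cluster, sets up the tenant manager and checks for helm.
/// let outcome = topology.ensure_ready().await?;
/// ```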
#[derive(Clone, Debug)]
pub struct K8sAnywhereTopology {
    k8s_state: Arc<OnceCell<Option<K8sState>>>,
    tenant_manager: Arc<OnceCell<K8sTenantManager>>,
    config: Arc<K8sAnywhereConfig>,
}
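// Accessor for the lazily initialized client; `ensure_ready` must have populated the state
// before this can succeed.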
#[async_trait]
impl K8sclient for K8sAnywhereTopology {
    async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
        let state = match self.k8s_state.get() {
            Some(state) => state,
            None => return Err("K8s state not initialized yet".to_string()),
        };

        let state = match state {
            Some(state) => state,
            None => return Err("K8s client initialized but empty".to_string()),
        };

        Ok(state.client.clone())
    }
}
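// Application monitoring backed by the community Prometheus operator CRDs: make sure the
// operator is present (installing it on local k3d when missing), then interpret the
// K8sPrometheusCRDAlertingScore built from the sender and receivers.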
#[async_trait]
impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology {
    async fn install_prometheus(
        &self,
        sender: &CRDPrometheus,
        inventory: &Inventory,
        receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
    ) -> Result<PreparationOutcome, PreparationError> {
        let po_result = self.ensure_prometheus_operator(sender).await?;

        if po_result == PreparationOutcome::Noop {
            debug!("Skipping Prometheus CR installation due to missing operator.");
            return Ok(po_result);
        }

        let result = self
            .get_k8s_prometheus_application_score(sender.clone(), receivers)
            .await
            .interpret(inventory, self)
            .await;

        match result {
            Ok(outcome) => match outcome.status {
                InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
                    details: outcome.message,
                }),
                InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
                _ => Err(PreparationError::new(outcome.message)),
            },
            Err(err) => Err(PreparationError::new(err.to_string())),
        }
    }
}
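// Same flow as the CRDPrometheus implementation above, but for the Cluster Observability
// Operator stack: ensure the operator, then interpret an RHOBAlertingScore.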
#[async_trait]
impl PrometheusApplicationMonitoring<RHOBObservability> for K8sAnywhereTopology {
    async fn install_prometheus(
        &self,
        sender: &RHOBObservability,
        inventory: &Inventory,
        receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
    ) -> Result<PreparationOutcome, PreparationError> {
        let po_result = self.ensure_cluster_observability_operator(sender).await?;

        if po_result == PreparationOutcome::Noop {
            debug!("Skipping Prometheus CR installation due to missing operator.");
            return Ok(po_result);
        }

        let result = self
            .get_cluster_observability_operator_prometheus_application_score(
                sender.clone(),
                receivers,
            )
            .await
            .interpret(inventory, self)
            .await;

        match result {
            Ok(outcome) => match outcome.status {
                InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
                    details: outcome.message,
                }),
                InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
                _ => Err(PreparationError::new(outcome.message)),
            },
            Err(err) => Err(PreparationError::new(err.to_string())),
        }
    }
}
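// Serialization is not implemented yet: serialize() currently panics via todo!().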
impl Serialize for K8sAnywhereTopology {
    fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
    where
        S: serde::Serializer,
    {
        todo!()
    }
}
impl K8sAnywhereTopology {
    pub fn from_env() -> Self {
        Self {
            k8s_state: Arc::new(OnceCell::new()),
            tenant_manager: Arc::new(OnceCell::new()),
            config: Arc::new(K8sAnywhereConfig::from_env()),
        }
    }

    pub fn with_config(config: K8sAnywhereConfig) -> Self {
        Self {
            k8s_state: Arc::new(OnceCell::new()),
            tenant_manager: Arc::new(OnceCell::new()),
            config: Arc::new(config),
        }
    }

    async fn get_cluster_observability_operator_prometheus_application_score(
        &self,
        sender: RHOBObservability,
        receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
    ) -> RHOBAlertingScore {
        RHOBAlertingScore {
            sender,
            receivers: receivers.unwrap_or_default(),
            service_monitors: vec![],
            prometheus_rules: vec![],
        }
    }

    async fn get_k8s_prometheus_application_score(
        &self,
        sender: CRDPrometheus,
        receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
    ) -> K8sPrometheusCRDAlertingScore {
        K8sPrometheusCRDAlertingScore {
            sender,
            receivers: receivers.unwrap_or_default(),
            service_monitors: vec![],
            prometheus_rules: vec![],
        }
    }
    async fn openshift_ingress_operator_available(&self) -> Result<(), PreparationError> {
        let client = self.k8s_client().await?;
        let gvk = GroupVersionKind {
            group: "operator.openshift.io".into(),
            version: "v1".into(),
            kind: "IngressController".into(),
        };
        let ic = client
            .get_resource_json_value("default", Some("openshift-ingress-operator"), &gvk)
            .await?;
        let ready_replicas = ic.data["status"]["availableReplicas"].as_i64().unwrap_or(0);
        if ready_replicas >= 1 {
            Ok(())
        } else {
            Err(PreparationError::new(
                "openshift-ingress-operator not available".to_string(),
            ))
        }
    }
    fn is_helm_available(&self) -> Result<(), String> {
        let version_result = Command::new("helm")
            .arg("version")
            .output()
            .map_err(|e| format!("Failed to execute 'helm version': {}", e))?;

        if !version_result.status.success() {
            return Err("Failed to run 'helm version'".to_string());
        }

        let version_output = String::from_utf8_lossy(&version_result.stdout);
        debug!("Helm version: {}", version_output.trim());

        Ok(())
    }
    async fn try_load_system_kubeconfig(&self) -> Option<K8sClient> {
        todo!("Use kube-rs default behavior to load system kubeconfig");
    }

    async fn try_load_kubeconfig(&self, path: &str) -> Option<K8sClient> {
        K8sClient::from_kubeconfig(path).await
    }

    fn get_k3d_installation_score(&self) -> K3DInstallationScore {
        K3DInstallationScore::default()
    }

    async fn try_install_k3d(&self) -> Result<(), PreparationError> {
        let result = self
            .get_k3d_installation_score()
            .interpret(&Inventory::empty(), self)
            .await;

        match result {
            Ok(outcome) => match outcome.status {
                InterpretStatus::SUCCESS => Ok(()),
                InterpretStatus::NOOP => Ok(()),
                _ => Err(PreparationError::new(outcome.message)),
            },
            Err(err) => Err(PreparationError::new(err.to_string())),
        }
    }
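    // Client resolution order: `use_local_k3d` short-circuits to a local k3d cluster;
    // otherwise an explicit kubeconfig path is tried, then the system kubeconfig. If nothing
    // is found and `autoinstall` is disabled we bail out; otherwise a local k3d cluster is
    // installed and its client is used.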
    async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, PreparationError> {
        let k8s_anywhere_config = &self.config;

        // TODO this deserves some refactoring, it is becoming a bit hard to figure out
        // be careful when making modifications here
        if k8s_anywhere_config.use_local_k3d {
            debug!("Using local k3d cluster because use_local_k3d is set to true");
        } else {
            if let Some(kubeconfig) = &k8s_anywhere_config.kubeconfig {
                debug!("Loading kubeconfig {kubeconfig}");
                match self.try_load_kubeconfig(kubeconfig).await {
                    Some(client) => {
                        return Ok(Some(K8sState {
                            client: Arc::new(client),
                            source: K8sSource::Kubeconfig,
                            message: format!("Loaded k8s client from kubeconfig {kubeconfig}"),
                        }));
                    }
                    None => {
                        return Err(PreparationError::new(format!(
                            "Failed to load kubeconfig from {kubeconfig}"
                        )));
                    }
                }
            }

            if k8s_anywhere_config.use_system_kubeconfig {
                debug!("Loading system kubeconfig");
                match self.try_load_system_kubeconfig().await {
                    Some(_client) => todo!(),
                    None => todo!(),
                }
            }

            info!("No Kubernetes configuration found");
        }

        if !k8s_anywhere_config.autoinstall {
            warn!(
                "Installation cancelled, K8sAnywhere could not initialize a valid Kubernetes client"
            );
            return Ok(None);
        }

        debug!("Starting K8sAnywhere installation");
        self.try_install_k3d().await?;
        let k3d_score = self.get_k3d_installation_score();
        // I feel like having to rely on the k3d_rs crate here is a smell.
        // I think we should have a way to interact more deeply with scores/interpret. Maybe the
        // K3DInstallationScore should expose a method to get_client? Not too sure what would be a
        // good implementation due to the stateful nature of the k3d thing, which is why I went
        // with this solution for now.
        let k3d = k3d_rs::K3d::new(k3d_score.installation_path, Some(k3d_score.cluster_name));
        let state = match k3d.get_client().await {
            Ok(client) => K8sState {
                client: Arc::new(K8sClient::new(client)),
                source: K8sSource::LocalK3d,
                message: "K8s client ready".to_string(),
            },
            Err(_) => todo!(),
        };

        Ok(Some(state))
    }
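    // Lazily initializes the shared K8sTenantManager, choosing a network policy strategy
    // based on where the cluster came from: the k3d-specific strategy for a local k3d
    // cluster, a no-op strategy for a cluster reached through a kubeconfig.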
    async fn ensure_k8s_tenant_manager(&self, k8s_state: &K8sState) -> Result<(), String> {
        if self.tenant_manager.get().is_some() {
            return Ok(());
        }

        self.tenant_manager
            .get_or_try_init(async || -> Result<K8sTenantManager, String> {
                let k8s_client = self.k8s_client().await?;
                let network_policy_strategy: Box<dyn NetworkPolicyStrategy> = match k8s_state.source
                {
                    K8sSource::LocalK3d => Box::new(K3dNetworkPolicyStrategy::new()),
                    K8sSource::Kubeconfig => Box::new(NoopNetworkPolicyStrategy::new()),
                };

                Ok(K8sTenantManager::new(k8s_client, network_policy_strategy))
            })
            .await?;

        Ok(())
    }

    fn get_k8s_tenant_manager(&self) -> Result<&K8sTenantManager, ExecutorError> {
        match self.tenant_manager.get() {
            Some(t) => Ok(t),
            None => Err(ExecutorError::UnexpectedError(
                "K8sTenantManager not available".to_string(),
            )),
        }
    }
    async fn ensure_cluster_observability_operator(
        &self,
        _sender: &RHOBObservability,
    ) -> Result<PreparationOutcome, PreparationError> {
        let status = Command::new("sh")
            .args(["-c", "kubectl get crd -A | grep -i rhobs"])
            .status()
            .map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;

        if !status.success() {
            if let Some(Some(k8s_state)) = self.k8s_state.get() {
                match k8s_state.source {
                    K8sSource::LocalK3d => {
                        warn!(
                            "Installing the observability operator is not supported on the LocalK3d source"
                        );
                        return Ok(PreparationOutcome::Noop);
                    }
                    K8sSource::Kubeconfig => {
                        debug!(
                            "unable to install cluster observability operator, contact cluster admin"
                        );
                        return Ok(PreparationOutcome::Noop);
                    }
                }
            } else {
                warn!(
                    "Unable to detect k8s_state. Skipping Cluster Observability Operator install."
                );
                return Ok(PreparationOutcome::Noop);
            }
        }

        debug!("Cluster Observability Operator is already present, skipping install");

        Ok(PreparationOutcome::Success {
            details: "cluster observability operator present in cluster".into(),
        })
    }
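    // Probes for the Prometheus CRDs with kubectl. If they are missing and the cluster is a
    // local k3d install, the operator is installed through its Helm chart score; on a
    // cluster reached via kubeconfig, installation is left to the cluster admin and the
    // method returns a Noop outcome.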
    async fn ensure_prometheus_operator(
        &self,
        sender: &CRDPrometheus,
    ) -> Result<PreparationOutcome, PreparationError> {
        let status = Command::new("sh")
            .args(["-c", "kubectl get crd -A | grep -i prometheuses"])
            .status()
            .map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;

        if !status.success() {
            if let Some(Some(k8s_state)) = self.k8s_state.get() {
                match k8s_state.source {
                    K8sSource::LocalK3d => {
                        debug!("installing prometheus operator");
                        let op_score =
                            prometheus_operator_helm_chart_score(sender.namespace.clone());
                        let result = op_score.interpret(&Inventory::empty(), self).await;

                        return match result {
                            Ok(outcome) => match outcome.status {
                                InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
                                    details: "installed prometheus operator".into(),
                                }),
                                InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
                                _ => Err(PreparationError::new(
                                    "failed to install prometheus operator (unknown error)".into(),
                                )),
                            },
                            Err(err) => Err(PreparationError::new(err.to_string())),
                        };
                    }
                    K8sSource::Kubeconfig => {
                        debug!("unable to install prometheus operator, contact cluster admin");
                        return Ok(PreparationOutcome::Noop);
                    }
                }
            } else {
                warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
                return Ok(PreparationOutcome::Noop);
            }
        }

        debug!("Prometheus operator is already present, skipping install");

        Ok(PreparationOutcome::Success {
            details: "prometheus operator present in cluster".into(),
        })
    }
}
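/// Configuration for [`K8sAnywhereTopology`], normally built from environment variables by
/// `from_env()`:
///
/// - `KUBECONFIG` -> `kubeconfig`
/// - `HARMONY_USE_SYSTEM_KUBECONFIG` -> `use_system_kubeconfig`
/// - `HARMONY_AUTOINSTALL` -> `autoinstall`
/// - `HARMONY_USE_LOCAL_K3D` -> `use_local_k3d`
/// - `HARMONY_PROFILE` -> `harmony_profile`
///
/// A minimal construction sketch for tests or local tooling (illustrative only, not taken
/// from existing call sites):
///
/// ```ignore
/// let topology = K8sAnywhereTopology::with_config(K8sAnywhereConfig {
///     kubeconfig: None,
///     use_system_kubeconfig: false,
///     autoinstall: true,
///     use_local_k3d: true,
///     harmony_profile: "dev".to_string(),
/// });
/// ```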
#[derive(Clone, Debug)]
pub struct K8sAnywhereConfig {
    /// The path of the KUBECONFIG file that Harmony should use to interact with the Kubernetes
    /// cluster
    ///
    /// Default: None
    pub kubeconfig: Option<String>,

    /// Whether to use the system KUBECONFIG, either the environment variable or the file in the
    /// default or configured location
    ///
    /// Default: false
    pub use_system_kubeconfig: bool,

    /// Whether to automatically install a Kubernetes cluster
    ///
    /// When enabled, autoinstall will set up a K3D cluster on the localhost. https://k3d.io/stable/
    ///
    /// Default: true
    pub autoinstall: bool,

    /// Whether to use a local k3d cluster.
    ///
    /// Takes precedence over the other options, which is useful to avoid messing up a remote
    /// cluster by mistake.
    ///
    /// Default: true
    pub use_local_k3d: bool,

    /// The Harmony profile (e.g. "dev", "staging", "production") used to pick the deployment
    /// target when not running against a local k3d cluster.
    pub harmony_profile: String,
}
impl K8sAnywhereConfig {
    fn from_env() -> Self {
        Self {
            kubeconfig: std::env::var("KUBECONFIG").ok(),
            use_system_kubeconfig: std::env::var("HARMONY_USE_SYSTEM_KUBECONFIG")
                .map_or_else(|_| false, |v| v.parse().ok().unwrap_or(false)),
            autoinstall: std::env::var("HARMONY_AUTOINSTALL")
                .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(false)),
            // TODO harmony_profile should be managed at a more core level than this
            harmony_profile: std::env::var("HARMONY_PROFILE")
                .unwrap_or_else(|_| "dev".to_string()),
            use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
                .map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
        }
    }
}
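// Topology preparation: resolve (or install) a Kubernetes client, set up the tenant manager,
// then verify that the `helm` binary can be executed.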
#[async_trait]
impl Topology for K8sAnywhereTopology {
    fn name(&self) -> &str {
        "K8sAnywhereTopology"
    }

    async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
        let k8s_state = self
            .k8s_state
            .get_or_try_init(|| self.try_get_or_install_k8s_client())
            .await?;

        let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new(
            "no K8s client could be found or installed".to_string(),
        ))?;

        self.ensure_k8s_tenant_manager(k8s_state)
            .await
            .map_err(PreparationError::new)?;

        match self.is_helm_available() {
            Ok(()) => Ok(PreparationOutcome::Success {
                details: format!("{} + helm available", k8s_state.message.clone()),
            }),
            Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))),
        }
    }
}
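// Maps the runtime configuration to a deployment target: a local k3d cluster is always
// LocalDev, otherwise HARMONY_PROFILE selects Staging or Production (anything else is
// currently unhandled and hits the todo!()).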
impl MultiTargetTopology for K8sAnywhereTopology {
    fn current_target(&self) -> DeploymentTarget {
        if self.config.use_local_k3d {
            return DeploymentTarget::LocalDev;
        }

        match self.config.harmony_profile.to_lowercase().as_str() {
            "staging" => DeploymentTarget::Staging,
            "production" => DeploymentTarget::Production,
            _ => todo!("HARMONY_PROFILE must be set when use_local_k3d is false"),
        }
    }
}
impl HelmCommand for K8sAnywhereTopology {}

#[async_trait]
impl TenantManager for K8sAnywhereTopology {
    async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
        self.get_k8s_tenant_manager()?
            .provision_tenant(config)
            .await
    }

    async fn get_tenant_config(&self) -> Option<TenantConfig> {
        self.get_k8s_tenant_manager()
            .ok()?
            .get_tenant_config()
            .await
    }
}
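// Ingress domains: a local k3d cluster gets `<service>.local.k3d`; a kubeconfig-backed
// cluster is assumed to be OpenShift/OKD and the domain is read from the default
// IngressController's `status.domain`.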
#[async_trait]
impl Ingress for K8sAnywhereTopology {
    //TODO this is specifically for openshift/okd which violates the k8sanywhere idea
    async fn get_domain(&self, service: &str) -> Result<String, PreparationError> {
        let client = self.k8s_client().await?;

        if let Some(Some(k8s_state)) = self.k8s_state.get() {
            match k8s_state.source {
                K8sSource::LocalK3d => Ok(format!("{service}.local.k3d")),
                K8sSource::Kubeconfig => {
                    self.openshift_ingress_operator_available().await?;

                    let gvk = GroupVersionKind {
                        group: "operator.openshift.io".into(),
                        version: "v1".into(),
                        kind: "IngressController".into(),
                    };
                    let ic = client
                        .get_resource_json_value(
                            "default",
                            Some("openshift-ingress-operator"),
                            &gvk,
                        )
                        .await
                        .map_err(|_| {
                            PreparationError::new("Failed to fetch IngressController".to_string())
                        })?;

                    match ic.data["status"]["domain"].as_str() {
                        Some(domain) => Ok(format!("{service}.{domain}")),
                        None => Err(PreparationError::new("Could not find domain".to_string())),
                    }
                }
            }
        } else {
            Err(PreparationError::new(
                "Cannot get domain: unable to detect K8s state".to_string(),
            ))
        }
    }
}