Compare commits
6 Commits
master
...
feat/disco
| Author | SHA1 | Date | |
|---|---|---|---|
| b5b77cf1ac | |||
| dda8e29843 | |||
| b33650e9d5 | |||
| ea4709a409 | |||
| cd40660350 | |||
| 2ca732cecd |
@ -1,10 +1,11 @@
|
|||||||
use std::{process::Command, sync::Arc};
|
use std::{collections::HashMap, process::Command, sync::Arc};
|
||||||
|
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use inquire::Confirm;
|
use inquire::Confirm;
|
||||||
use log::{info, warn};
|
use log::{info, warn};
|
||||||
use tokio::sync::OnceCell;
|
use tokio::sync::{Mutex, OnceCell};
|
||||||
|
|
||||||
|
use crate::score::Score;
|
||||||
use crate::{
|
use crate::{
|
||||||
executors::ExecutorError,
|
executors::ExecutorError,
|
||||||
interpret::{InterpretError, Outcome},
|
interpret::{InterpretError, Outcome},
|
||||||
@ -17,6 +18,7 @@ use crate::{
|
|||||||
use super::{
|
use super::{
|
||||||
HelmCommand, K8sclient, Topology,
|
HelmCommand, K8sclient, Topology,
|
||||||
k8s::K8sClient,
|
k8s::K8sClient,
|
||||||
|
oberservability::monitoring::{AlertReceiver, AlertReceiverProvision},
|
||||||
tenant::{
|
tenant::{
|
||||||
ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager,
|
ResourceLimits, TenantConfig, TenantManager, TenantNetworkPolicy, k8s::K8sTenantManager,
|
||||||
},
|
},
|
||||||
@ -37,6 +39,7 @@ enum K8sSource {
|
|||||||
pub struct K8sAnywhereTopology {
|
pub struct K8sAnywhereTopology {
|
||||||
k8s_state: OnceCell<Option<K8sState>>,
|
k8s_state: OnceCell<Option<K8sState>>,
|
||||||
tenant_manager: OnceCell<K8sTenantManager>,
|
tenant_manager: OnceCell<K8sTenantManager>,
|
||||||
|
pub alert_receivers: Mutex<HashMap<String, OnceCell<AlertReceiver>>>,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@ -61,9 +64,29 @@ impl K8sAnywhereTopology {
|
|||||||
Self {
|
Self {
|
||||||
k8s_state: OnceCell::new(),
|
k8s_state: OnceCell::new(),
|
||||||
tenant_manager: OnceCell::new(),
|
tenant_manager: OnceCell::new(),
|
||||||
|
alert_receivers: Mutex::new(HashMap::new()),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub async fn initialize_alert_receiver<C>(
|
||||||
|
&self,
|
||||||
|
config: &C,
|
||||||
|
inventory: &Inventory,
|
||||||
|
) -> Result<AlertReceiver, InterpretError>
|
||||||
|
where
|
||||||
|
Self: Topology + HelmCommand,
|
||||||
|
C: AlertReceiverProvision<Self> + Send + Sync,
|
||||||
|
{
|
||||||
|
let score = config.get_deployment_score();
|
||||||
|
let interpret = score.create_interpret();
|
||||||
|
interpret.execute(inventory, self).await?;
|
||||||
|
|
||||||
|
Ok(AlertReceiver {
|
||||||
|
receiver_id: config.alert_receiver_id(),
|
||||||
|
receiver_installed: true,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
fn is_helm_available(&self) -> Result<(), String> {
|
fn is_helm_available(&self) -> Result<(), String> {
|
||||||
let version_result = Command::new("helm")
|
let version_result = Command::new("helm")
|
||||||
.arg("version")
|
.arg("version")
|
||||||
|
|||||||
@ -1,14 +1,18 @@
|
|||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
|
use dyn_clone::DynClone;
|
||||||
|
use serde::Serialize;
|
||||||
|
|
||||||
use std::fmt::Debug;
|
use std::fmt::Debug;
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
use crate::interpret::InterpretError;
|
use crate::interpret::InterpretError;
|
||||||
|
|
||||||
|
use crate::inventory::Inventory;
|
||||||
|
use crate::score::Score;
|
||||||
|
use crate::topology::HelmCommand;
|
||||||
use crate::{interpret::Outcome, topology::Topology};
|
use crate::{interpret::Outcome, topology::Topology};
|
||||||
|
|
||||||
/// Represents an entity responsible for collecting and organizing observability data
|
/// Represents an entity responsible for collecting and organizing observability data
|
||||||
/// from various telemetry sources
|
/// from various telemetry sources such as Prometheus or Datadog
|
||||||
/// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
|
/// A `Monitor` abstracts the logic required to scrape, aggregate, and structure
|
||||||
/// monitoring data, enabling consistent processing regardless of the underlying data source.
|
/// monitoring data, enabling consistent processing regardless of the underlying data source.
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
@ -26,6 +30,26 @@ pub trait Monitor<T: Topology>: Debug + Send + Sync {
|
|||||||
) -> Result<Outcome, InterpretError>;
|
) -> Result<Outcome, InterpretError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait EnsureAlertReceiver<T: Topology>: Debug + DynClone + Send + Sync {
|
||||||
|
async fn ensure_alert_receiver(
|
||||||
|
&self,
|
||||||
|
inventory: Inventory,
|
||||||
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError>;
|
||||||
|
}
|
||||||
|
|
||||||
|
dyn_clone::clone_trait_object!(<T> EnsureAlertReceiver<T>);
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct AlertReceiver {
|
pub struct AlertReceiver {
|
||||||
pub receiver_id: String,
|
pub receiver_id: String,
|
||||||
|
pub receiver_installed: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Provides the ability to turn an alert config into an executable score
|
||||||
|
/// for the topology
|
||||||
|
pub trait AlertReceiverProvision<T: Topology + HelmCommand> {
|
||||||
|
fn get_deployment_score(&self) -> Box<dyn Score<T>>;
|
||||||
|
fn alert_receiver_id(&self) -> String;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,32 +1,104 @@
|
|||||||
|
use super::discord_alert_manager::discord_alert_manager_score;
|
||||||
use async_trait::async_trait;
|
use async_trait::async_trait;
|
||||||
use serde_json::Value;
|
use serde::Serialize;
|
||||||
|
use serde_yaml::Value;
|
||||||
|
use tokio::sync::OnceCell;
|
||||||
use url::Url;
|
use url::Url;
|
||||||
|
|
||||||
use crate::{
|
use crate::{
|
||||||
interpret::{InterpretError, Outcome},
|
interpret::{Interpret, InterpretError, Outcome},
|
||||||
topology::K8sAnywhereTopology,
|
inventory::Inventory,
|
||||||
|
score::Score,
|
||||||
|
topology::{
|
||||||
|
HelmCommand, K8sAnywhereTopology, Topology,
|
||||||
|
oberservability::monitoring::{AlertReceiverProvision, EnsureAlertReceiver},
|
||||||
|
},
|
||||||
};
|
};
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
pub struct DiscordWebhookConfig {
|
pub struct DiscordWebhookConfig {
|
||||||
pub webhook_url: Url,
|
pub webhook_url: Url,
|
||||||
pub name: String,
|
pub name: String,
|
||||||
pub send_resolved_notifications: bool,
|
pub send_resolved_notifications: bool,
|
||||||
}
|
}
|
||||||
|
|
||||||
pub trait DiscordWebhookReceiver {
|
#[async_trait]
|
||||||
fn deploy_discord_webhook_receiver(
|
impl<T: Topology + DiscordWebhookReceiver> EnsureAlertReceiver<T> for DiscordWebhookConfig {
|
||||||
|
async fn ensure_alert_receiver(
|
||||||
&self,
|
&self,
|
||||||
_notification_adapter_id: &str,
|
inventory: Inventory,
|
||||||
) -> Result<Outcome, InterpretError>;
|
topology: &T,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
topology
|
||||||
|
.ensure_discord_webhook_receiver(&inventory, self.clone())
|
||||||
|
.await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Topology + HelmCommand> AlertReceiverProvision<T> for DiscordWebhookConfig {
|
||||||
|
fn get_deployment_score(&self) -> Box<dyn Score<T>> {
|
||||||
|
Box::new(DiscordWebhookReceiverScore {
|
||||||
|
config: self.clone(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
fn alert_receiver_id(&self) -> String {
|
||||||
|
self.name.clone()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[async_trait]
|
||||||
|
pub trait DiscordWebhookReceiver {
|
||||||
|
async fn ensure_discord_webhook_receiver(
|
||||||
|
&self,
|
||||||
|
inventory: &Inventory,
|
||||||
|
config: DiscordWebhookConfig,
|
||||||
|
) -> Result<Outcome, InterpretError>;
|
||||||
fn delete_discord_webhook_receiver(
|
fn delete_discord_webhook_receiver(
|
||||||
&self,
|
&self,
|
||||||
_notification_adapter_id: &str,
|
config: DiscordWebhookConfig,
|
||||||
) -> Result<Outcome, InterpretError>;
|
) -> Result<Outcome, InterpretError>;
|
||||||
}
|
}
|
||||||
|
|
||||||
// trait used to generate alert manager config values impl<T: Topology + AlertManagerConfig> Monitor for KubePrometheus
|
#[async_trait]
|
||||||
|
impl DiscordWebhookReceiver for K8sAnywhereTopology {
|
||||||
|
async fn ensure_discord_webhook_receiver(
|
||||||
|
&self,
|
||||||
|
inventory: &Inventory,
|
||||||
|
config: DiscordWebhookConfig,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
let receiver_key = config.name.clone();
|
||||||
|
let mut adapters_map_guard = self.alert_receivers.lock().await;
|
||||||
|
|
||||||
|
let cell = adapters_map_guard
|
||||||
|
.entry(receiver_key.clone())
|
||||||
|
.or_insert_with(OnceCell::new);
|
||||||
|
|
||||||
|
if let Some(initialized_receiver) = cell.get() {
|
||||||
|
return Ok(Outcome::success(format!(
|
||||||
|
"Discord Webhook adapter for '{}' already initialized.",
|
||||||
|
initialized_receiver.receiver_id
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let final_state = cell
|
||||||
|
.get_or_try_init(|| async { self.initialize_alert_receiver(&config, inventory).await })
|
||||||
|
.await?;
|
||||||
|
|
||||||
|
Ok(Outcome::success(format!(
|
||||||
|
"Discord Webhook Receiver for '{}' ensured/initialized.",
|
||||||
|
final_state.receiver_id
|
||||||
|
)))
|
||||||
|
}
|
||||||
|
|
||||||
|
fn delete_discord_webhook_receiver(
|
||||||
|
&self,
|
||||||
|
_config: DiscordWebhookConfig,
|
||||||
|
) -> Result<Outcome, InterpretError> {
|
||||||
|
todo!()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
pub trait AlertManagerConfig<T> {
|
pub trait AlertManagerConfig<T> {
|
||||||
fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
|
fn get_alert_manager_config(&self) -> Result<Value, InterpretError>;
|
||||||
}
|
}
|
||||||
@ -38,18 +110,22 @@ impl<T: DiscordWebhookReceiver> AlertManagerConfig<T> for DiscordWebhookConfig {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[derive(Debug, Clone, Serialize)]
|
||||||
impl DiscordWebhookReceiver for K8sAnywhereTopology {
|
struct DiscordWebhookReceiverScore {
|
||||||
fn deploy_discord_webhook_receiver(
|
config: DiscordWebhookConfig,
|
||||||
&self,
|
}
|
||||||
_notification_adapter_id: &str,
|
|
||||||
) -> Result<Outcome, InterpretError> {
|
impl<T: Topology + HelmCommand> Score<T> for DiscordWebhookReceiverScore {
|
||||||
todo!()
|
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||||
|
discord_alert_manager_score(
|
||||||
|
self.config.webhook_url.clone(),
|
||||||
|
self.config.name.clone(),
|
||||||
|
self.config.name.clone(),
|
||||||
|
)
|
||||||
|
.create_interpret()
|
||||||
}
|
}
|
||||||
fn delete_discord_webhook_receiver(
|
|
||||||
&self,
|
fn name(&self) -> String {
|
||||||
_notification_adapter_id: &str,
|
"DiscordWebhookReceiverScore".to_string()
|
||||||
) -> Result<Outcome, InterpretError> {
|
|
||||||
todo!()
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user