Compare commits

..

1 Commits

Author SHA1 Message Date
32d0c2aa1e feat: node readiness now supports a check query param with node_ready and okd_router_1936 options
Some checks failed
Run Check Script / check (pull_request) Failing after 1m39s
2026-03-02 14:53:22 -05:00
196 changed files with 6919 additions and 10972 deletions

2587
Cargo.lock generated

File diff suppressed because it is too large Load Diff

View File

@@ -2,6 +2,7 @@
resolver = "2"
members = [
"private_repos/*",
"examples/*",
"harmony",
"harmony_types",
"harmony_macros",
@@ -16,9 +17,9 @@ members = [
"harmony_secret_derive",
"harmony_secret",
"adr/agent_discovery/mdns",
"brocade",
"harmony_agent",
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s",
"brocade",
"harmony_agent",
"harmony_agent/deploy", "harmony_node_readiness",
]
[workspace.package]
@@ -37,8 +38,6 @@ tokio = { version = "1.40", features = [
"macros",
"rt-multi-thread",
] }
tokio-retry = "0.3.0"
tokio-util = "0.7.15"
cidr = { features = ["serde"], version = "0.2" }
russh = "0.45"
russh-keys = "0.45"

View File

@@ -1,318 +0,0 @@
# Architecture Decision Record: Monitoring and Alerting Architecture
Initial Author: Willem Rolleman, Jean-Gabriel Carrier
Initial Date: March 9, 2026
Last Updated Date: March 9, 2026
## Status
Accepted
Supersedes: [ADR-010](010-monitoring-and-alerting.md)
## Context
Harmony needs a unified approach to monitoring and alerting across different infrastructure targets:
1. **Cluster-level monitoring**: Administrators managing entire Kubernetes/OKD clusters need to define cluster-wide alerts, receivers, and scrape targets.
2. **Tenant-level monitoring**: Multi-tenant clusters where teams are confined to namespaces need monitoring scoped to their resources.
3. **Application-level monitoring**: Developers deploying applications want zero-config monitoring that "just works" for their services.
The monitoring landscape is fragmented:
- **OKD/OpenShift**: Built-in Prometheus with AlertmanagerConfig CRDs
- **KubePrometheus**: Helm-based stack with PrometheusRule CRDs
- **RHOB (Red Hat Observability)**: Operator-based with MonitoringStack CRDs
- **Standalone Prometheus**: Raw Prometheus deployments
Each system has different CRDs, different installation methods, and different configuration APIs.
## Decision
We implement a **trait-based architecture with compile-time capability verification** that provides:
1. **Type-safe abstractions** via parameterized traits: `AlertReceiver<S>`, `AlertRule<S>`, `ScrapeTarget<S>`
2. **Compile-time topology compatibility** via the `Observability<S>` capability bound
3. **Three levels of abstraction**: Cluster, Tenant, and Application monitoring
4. **Pre-built alert rules** as functions that return typed structs
### Core Traits
```rust
// domain/topology/monitoring.rs
/// Marker trait for systems that send alerts (Prometheus, etc.)
pub trait AlertSender: Send + Sync + std::fmt::Debug {
fn name(&self) -> String;
}
/// Defines how a receiver (Discord, Slack, etc.) builds its configuration
/// for a specific sender type
pub trait AlertReceiver<S: AlertSender>: std::fmt::Debug + Send + Sync {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError>;
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn AlertReceiver<S>>;
}
/// Defines how an alert rule builds its PrometheusRule configuration
pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
fn build_rule(&self) -> Result<serde_json::Value, InterpretError>;
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn AlertRule<S>>;
}
/// Capability that topologies implement to support monitoring
pub trait Observability<S: AlertSender> {
async fn install_alert_sender(&self, sender: &S, inventory: &Inventory)
-> Result<PreparationOutcome, PreparationError>;
async fn install_receivers(&self, sender: &S, inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>) -> Result<...>;
async fn install_rules(&self, sender: &S, inventory: &Inventory,
rules: Option<Vec<Box<dyn AlertRule<S>>>>) -> Result<...>;
async fn add_scrape_targets(&self, sender: &S, inventory: &Inventory,
scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>) -> Result<...>;
async fn ensure_monitoring_installed(&self, sender: &S, inventory: &Inventory)
-> Result<...>;
}
```
### Alert Sender Types
Each monitoring stack is a distinct `AlertSender`:
| Sender | Module | Use Case |
|--------|--------|----------|
| `OpenshiftClusterAlertSender` | `monitoring/okd/` | OKD/OpenShift built-in monitoring |
| `KubePrometheus` | `monitoring/kube_prometheus/` | Helm-deployed kube-prometheus-stack |
| `Prometheus` | `monitoring/prometheus/` | Standalone Prometheus via Helm |
| `RedHatClusterObservability` | `monitoring/red_hat_cluster_observability/` | RHOB operator |
| `Grafana` | `monitoring/grafana/` | Grafana-managed alerting |
### Three Levels of Monitoring
#### 1. Cluster-Level Monitoring
For cluster administrators. Full control over monitoring infrastructure.
```rust
// examples/okd_cluster_alerts/src/main.rs
OpenshiftClusterAlertScore {
sender: OpenshiftClusterAlertSender,
receivers: vec![Box::new(DiscordReceiver { ... })],
rules: vec![Box::new(alert_rules)],
scrape_targets: Some(vec![Box::new(external_exporters)]),
}
```
**Characteristics:**
- Cluster-scoped CRDs and resources
- Can add external scrape targets (outside cluster)
- Manages Alertmanager configuration
- Requires cluster-admin privileges
#### 2. Tenant-Level Monitoring
For teams confined to namespaces. The topology determines tenant context.
```rust
// The topology's Observability impl handles namespace scoping
impl Observability<KubePrometheus> for K8sAnywhereTopology {
async fn install_rules(&self, sender: &KubePrometheus, ...) {
// Topology knows if it's tenant-scoped
let namespace = self.get_tenant_config().await
.map(|t| t.name)
.unwrap_or("default");
// Install rules in tenant namespace
}
}
```
**Characteristics:**
- Namespace-scoped resources
- Cannot modify cluster-level monitoring config
- May have restricted receiver types
- Runtime validation of permissions (cannot be fully compile-time)
#### 3. Application-Level Monitoring
For developers. Zero-config, opinionated monitoring.
```rust
// modules/application/features/monitoring.rs
pub struct Monitoring {
pub application: Arc<dyn Application>,
pub alert_receiver: Vec<Box<dyn AlertReceiver<Prometheus>>>,
}
impl<T: Topology + Observability<Prometheus> + TenantManager + ...>
ApplicationFeature<T> for Monitoring
{
async fn ensure_installed(&self, topology: &T) -> Result<...> {
// Auto-creates ServiceMonitor
// Auto-installs Ntfy for notifications
// Handles tenant namespace automatically
// Wires up sensible defaults
}
}
```
**Characteristics:**
- Automatic ServiceMonitor creation
- Opinionated notification channel (Ntfy)
- Tenant-aware via topology
- Minimal configuration required
## Rationale
### Why Generic Traits Instead of Unified Types?
Each monitoring stack (OKD, KubePrometheus, RHOB) has fundamentally different CRDs:
```rust
// OKD uses AlertmanagerConfig with different structure
AlertmanagerConfig { spec: { receivers: [...] } }
// RHOB uses secret references for webhook URLs
MonitoringStack { spec: { alertmanagerConfig: { discordConfigs: [{ apiURL: { key: "..." } }] } } }
// KubePrometheus uses Alertmanager CRD with different field names
Alertmanager { spec: { config: { receivers: [...] } } }
```
A unified type would either:
1. Be a lowest-common-denominator (loses stack-specific features)
2. Be a complex union type (hard to use, easy to misconfigure)
Generic traits let each stack express its configuration naturally while providing a consistent interface.
### Why Compile-Time Capability Bounds?
```rust
impl<T: Topology + Observability<OpenshiftClusterAlertSender>> Score<T>
for OpenshiftClusterAlertScore { ... }
```
This fails at compile time if you try to use `OpenshiftClusterAlertScore` with a topology that doesn't support OKD monitoring. This prevents the "config-is-valid-but-platform-is-wrong" errors that Harmony was designed to eliminate.
### Why Not a MonitoringStack Abstraction (V2 Approach)?
The V2 approach proposed a unified `MonitoringStack` that hides sender selection:
```rust
// V2 approach - rejected
MonitoringStack::new(MonitoringApiVersion::V2CRD)
.add_alert_channel(discord)
```
**Problems:**
1. Hides which sender you're using, losing compile-time guarantees
2. "Version selection" actually chooses between fundamentally different systems
3. Would need to handle all stack-specific features through a generic interface
The current approach is explicit: you choose `OpenshiftClusterAlertSender` and the compiler verifies compatibility.
### Why Runtime Validation for Tenants?
Tenant confinement is determined at runtime by the topology and K8s RBAC. We cannot know at compile time whether a user has cluster-admin or namespace-only access.
Options considered:
1. **Compile-time tenant markers** - Would require modeling entire RBAC hierarchy in types. Over-engineering.
2. **Runtime validation** - Current approach. Fails with clear K8s permission errors if insufficient access.
3. **No tenant support** - Would exclude a major use case.
Runtime validation is the pragmatic choice. The failure mode is clear (K8s API error) and occurs early in execution.
> Note : we will eventually have compile time validation for such things. Rust macros are powerful and we could discover the actual capabilities we're dealing with, similar to sqlx approach in query! macros.
## Consequences
### Pros
1. **Type Safety**: Invalid configurations are caught at compile time
2. **Extensibility**: Adding a new monitoring stack requires implementing traits, not modifying core code
3. **Clear Separation**: Cluster/Tenant/Application levels have distinct entry points
4. **Reusable Rules**: Pre-built alert rules as functions (`high_pvc_fill_rate_over_two_days()`)
5. **CRD Accuracy**: Type definitions match actual Kubernetes CRDs exactly
### Cons
1. **Implementation Explosion**: `DiscordReceiver` implements `AlertReceiver<S>` for each sender type (3+ implementations)
2. **Learning Curve**: Understanding the trait hierarchy takes time
3. **clone_box Boilerplate**: Required for trait object cloning (3 lines per impl)
### Mitigations
- Implementation explosion is contained: each receiver type has O(senders) implementations, but receivers are rare compared to rules
- Learning curve is documented with examples at each level
- clone_box boilerplate is minimal and copy-paste
## Alternatives Considered
### Unified MonitoringStack Type
See "Why Not a MonitoringStack Abstraction" above. Rejected for losing compile-time safety.
### Helm-Only Approach
Use `HelmScore` directly for each monitoring deployment. Rejected because:
- No type safety for alert rules
- Cannot compose with application features
- No tenant awareness
### Separate Modules Per Use Case
Have `cluster_monitoring/`, `tenant_monitoring/`, `app_monitoring/` as separate modules. Rejected because:
- Massive code duplication
- No shared abstraction for receivers/rules
- Adding a feature requires three implementations
## Implementation Notes
### Module Structure
```
modules/monitoring/
├── mod.rs # Public exports
├── alert_channel/ # Receivers (Discord, Webhook)
├── alert_rule/ # Rules and pre-built alerts
│ ├── prometheus_alert_rule.rs
│ └── alerts/ # Library of pre-built rules
│ ├── k8s/ # K8s-specific (pvc, pod, memory)
│ └── infra/ # Infrastructure (opnsense, dell)
├── okd/ # OpenshiftClusterAlertSender
├── kube_prometheus/ # KubePrometheus
├── prometheus/ # Prometheus
├── red_hat_cluster_observability/ # RHOB
├── grafana/ # Grafana
├── application_monitoring/ # Application-level scores
└── scrape_target/ # External scrape targets
```
### Adding a New Alert Sender
1. Create sender type: `pub struct MySender; impl AlertSender for MySender { ... }`
2. Implement `Observability<MySender>` for topologies that support it
3. Create CRD types in `crd/` subdirectory
4. Implement `AlertReceiver<MySender>` for existing receivers
5. Implement `AlertRule<MySender>` for `AlertManagerRuleGroup`
### Adding a New Alert Rule
```rust
pub fn my_custom_alert() -> PrometheusAlertRule {
PrometheusAlertRule::new("MyAlert", "up == 0")
.for_duration("5m")
.label("severity", "critical")
.annotation("summary", "Service is down")
}
```
No trait implementation needed - `AlertManagerRuleGroup` already handles conversion.
## Related ADRs
- [ADR-013](013-monitoring-notifications.md): Notification channel selection (ntfy)
- [ADR-011](011-multi-tenant-cluster.md): Multi-tenant cluster architecture

View File

@@ -1,21 +0,0 @@
[package]
name = "example-monitoring-v2"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony-k8s = { path = "../../harmony-k8s" }
harmony_types = { path = "../../harmony_types" }
kube = { workspace = true }
schemars = "0.8"
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
serde_yaml = { workspace = true }
url = { workspace = true }
log = { workspace = true }
async-trait = { workspace = true }
k8s-openapi = { workspace = true }

View File

@@ -1,91 +0,0 @@
# Monitoring v2 - Improved Architecture
This example demonstrates the improved monitoring architecture that addresses the "WTF/minute" issues in the original design.
## Key Improvements
### 1. **Single AlertChannel Trait with Generic Sender**
The original design required 9-12 implementations for each alert channel (Discord, Webhook, etc.) - one for each sender type. The new design uses a single trait with generic sender parameterization:
pub trait AlertChannel<Sender: AlertSender> {
async fn install_config(&self, sender: &Sender) -> Result<Outcome, InterpretError>;
fn name(&self) -> String;
fn as_any(&self) -> &dyn std::any::Any;
}
**Benefits:**
- One Discord implementation works with all sender types
- Type safety at compile time
- No runtime dispatch overhead
### 2. **MonitoringStack Abstraction**
Instead of manually selecting CRDPrometheus vs KubePrometheus vs RHOBObservability, you now have a unified MonitoringStack that handles versioning:
let monitoring_stack = MonitoringStack::new(MonitoringApiVersion::V2CRD)
.set_namespace("monitoring")
.add_alert_channel(discord_receiver)
.set_scrape_targets(vec![...]);
**Benefits:**
- Single source of truth for monitoring configuration
- Easy to switch between monitoring versions
- Automatic version-specific configuration
### 3. **TenantMonitoringScore - True Composition**
The original monitoring_with_tenant example just put tenant and monitoring as separate items in a vec. The new design truly composes them:
let tenant_score = TenantMonitoringScore::new("test-tenant", monitoring_stack);
This creates a single score that:
- Has tenant context
- Has monitoring configuration
- Automatically installs monitoring scoped to tenant namespace
**Benefits:**
- No more "two separate things" confusion
- Automatic tenant namespace scoping
- Clear ownership: tenant owns its monitoring
### 4. **Versioned Monitoring APIs**
Clear versioning makes it obvious which monitoring stack you're using:
pub enum MonitoringApiVersion {
V1Helm, // Old Helm charts
V2CRD, // Current CRDs
V3RHOB, // RHOB (future)
}
**Benefits:**
- No guessing which API version you're using
- Easy to migrate between versions
- Backward compatibility path
## Comparison
### Original Design (monitoring_with_tenant)
- Manual selection of each component
- Manual installation of both components
- Need to remember to pass both to harmony_cli::run
- Monitoring not scoped to tenant automatically
### New Design (monitoring_v2)
- Single composed score
- One score does it all
## Usage
cd examples/monitoring_v2
cargo run
## Migration Path
To migrate from the old design to the new:
1. Replace individual alert channel implementations with AlertChannel<Sender>
2. Use MonitoringStack instead of manual *Prometheus selection
3. Use TenantMonitoringScore instead of separate TenantScore + monitoring scores
4. Select monitoring version via MonitoringApiVersion

View File

@@ -1,343 +0,0 @@
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use log::debug;
use serde::{Deserialize, Serialize};
use serde_yaml::{Mapping, Value};
use harmony::data::Version;
use harmony::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use harmony::inventory::Inventory;
use harmony::score::Score;
use harmony::topology::{Topology, tenant::TenantManager};
use harmony_k8s::K8sClient;
use harmony_types::k8s_name::K8sName;
use harmony_types::net::Url;
pub trait AlertSender: Send + Sync + std::fmt::Debug {
fn name(&self) -> String;
fn namespace(&self) -> String;
}
#[derive(Debug)]
pub struct CRDPrometheus {
pub namespace: String,
pub client: Arc<K8sClient>,
}
impl AlertSender for CRDPrometheus {
fn name(&self) -> String {
"CRDPrometheus".to_string()
}
fn namespace(&self) -> String {
self.namespace.clone()
}
}
#[derive(Debug)]
pub struct RHOBObservability {
pub namespace: String,
pub client: Arc<K8sClient>,
}
impl AlertSender for RHOBObservability {
fn name(&self) -> String {
"RHOBObservability".to_string()
}
fn namespace(&self) -> String {
self.namespace.clone()
}
}
#[derive(Debug)]
pub struct KubePrometheus {
pub config: Arc<Mutex<KubePrometheusConfig>>,
}
impl Default for KubePrometheus {
fn default() -> Self {
Self::new()
}
}
impl KubePrometheus {
pub fn new() -> Self {
Self {
config: Arc::new(Mutex::new(KubePrometheusConfig::new())),
}
}
}
impl AlertSender for KubePrometheus {
fn name(&self) -> String {
"KubePrometheus".to_string()
}
fn namespace(&self) -> String {
self.config.lock().unwrap().namespace.clone().unwrap_or_else(|| "monitoring".to_string())
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct KubePrometheusConfig {
pub namespace: Option<String>,
#[serde(skip)]
pub alert_receiver_configs: Vec<AlertManagerChannelConfig>,
}
impl KubePrometheusConfig {
pub fn new() -> Self {
Self {
namespace: None,
alert_receiver_configs: Vec::new(),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AlertManagerChannelConfig {
pub channel_receiver: serde_yaml::Value,
pub channel_route: serde_yaml::Value,
}
impl Default for AlertManagerChannelConfig {
fn default() -> Self {
Self {
channel_receiver: serde_yaml::Value::Mapping(Default::default()),
channel_route: serde_yaml::Value::Mapping(Default::default()),
}
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ScrapeTargetConfig {
pub service_name: String,
pub port: String,
pub path: String,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum MonitoringApiVersion {
V1Helm,
V2CRD,
V3RHOB,
}
#[derive(Debug, Clone)]
pub struct MonitoringStack {
pub version: MonitoringApiVersion,
pub namespace: String,
pub alert_channels: Vec<Arc<dyn AlertSender>>,
pub scrape_targets: Vec<ScrapeTargetConfig>,
}
impl MonitoringStack {
pub fn new(version: MonitoringApiVersion) -> Self {
Self {
version,
namespace: "monitoring".to_string(),
alert_channels: Vec::new(),
scrape_targets: Vec::new(),
}
}
pub fn set_namespace(mut self, namespace: &str) -> Self {
self.namespace = namespace.to_string();
self
}
pub fn add_alert_channel(mut self, channel: impl AlertSender + 'static) -> Self {
self.alert_channels.push(Arc::new(channel));
self
}
pub fn set_scrape_targets(mut self, targets: Vec<(&str, &str, String)>) -> Self {
self.scrape_targets = targets
.into_iter()
.map(|(name, port, path)| ScrapeTargetConfig {
service_name: name.to_string(),
port: port.to_string(),
path,
})
.collect();
self
}
}
pub trait AlertChannel<Sender: AlertSender> {
fn install_config(&self, sender: &Sender);
fn name(&self) -> String;
}
#[derive(Debug, Clone)]
pub struct DiscordWebhook {
pub name: K8sName,
pub url: Url,
pub selectors: Vec<HashMap<String, String>>,
}
impl DiscordWebhook {
fn get_config(&self) -> AlertManagerChannelConfig {
let mut route = Mapping::new();
route.insert(
Value::String("receiver".to_string()),
Value::String(self.name.to_string()),
);
route.insert(
Value::String("matchers".to_string()),
Value::Sequence(vec![Value::String("alertname!=Watchdog".to_string())]),
);
let mut receiver = Mapping::new();
receiver.insert(
Value::String("name".to_string()),
Value::String(self.name.to_string()),
);
let mut discord_config = Mapping::new();
discord_config.insert(
Value::String("webhook_url".to_string()),
Value::String(self.url.to_string()),
);
receiver.insert(
Value::String("discord_configs".to_string()),
Value::Sequence(vec![Value::Mapping(discord_config)]),
);
AlertManagerChannelConfig {
channel_receiver: Value::Mapping(receiver),
channel_route: Value::Mapping(route),
}
}
}
impl AlertChannel<CRDPrometheus> for DiscordWebhook {
fn install_config(&self, sender: &CRDPrometheus) {
debug!("Installing Discord webhook for CRDPrometheus in namespace: {}", sender.namespace());
debug!("Config: {:?}", self.get_config());
debug!("Installed!");
}
fn name(&self) -> String {
"discord-webhook".to_string()
}
}
impl AlertChannel<RHOBObservability> for DiscordWebhook {
fn install_config(&self, sender: &RHOBObservability) {
debug!("Installing Discord webhook for RHOBObservability in namespace: {}", sender.namespace());
debug!("Config: {:?}", self.get_config());
debug!("Installed!");
}
fn name(&self) -> String {
"webhook-receiver".to_string()
}
}
impl AlertChannel<KubePrometheus> for DiscordWebhook {
fn install_config(&self, sender: &KubePrometheus) {
debug!("Installing Discord webhook for KubePrometheus in namespace: {}", sender.namespace());
let config = sender.config.lock().unwrap();
let ns = config.namespace.clone().unwrap_or_else(|| "monitoring".to_string());
debug!("Namespace: {}", ns);
let mut config = sender.config.lock().unwrap();
config.alert_receiver_configs.push(self.get_config());
debug!("Installed!");
}
fn name(&self) -> String {
"discord-webhook".to_string()
}
}
fn default_monitoring_stack() -> MonitoringStack {
MonitoringStack::new(MonitoringApiVersion::V2CRD)
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct TenantMonitoringScore {
pub tenant_id: harmony_types::id::Id,
pub tenant_name: String,
#[serde(skip)]
#[serde(default = "default_monitoring_stack")]
pub monitoring_stack: MonitoringStack,
}
impl TenantMonitoringScore {
pub fn new(tenant_name: &str, monitoring_stack: MonitoringStack) -> Self {
Self {
tenant_id: harmony_types::id::Id::default(),
tenant_name: tenant_name.to_string(),
monitoring_stack,
}
}
}
impl<T: Topology + TenantManager> Score<T> for TenantMonitoringScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(TenantMonitoringInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
format!("{} monitoring [TenantMonitoringScore]", self.tenant_name)
}
}
#[derive(Debug)]
pub struct TenantMonitoringInterpret {
pub score: TenantMonitoringScore,
}
#[async_trait::async_trait]
impl<T: Topology + TenantManager> Interpret<T> for TenantMonitoringInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let tenant_config = topology.get_tenant_config().await.unwrap();
let tenant_ns = tenant_config.name.clone();
match self.score.monitoring_stack.version {
MonitoringApiVersion::V1Helm => {
debug!("Installing Helm monitoring for tenant {}", tenant_ns);
}
MonitoringApiVersion::V2CRD => {
debug!("Installing CRD monitoring for tenant {}", tenant_ns);
}
MonitoringApiVersion::V3RHOB => {
debug!("Installing RHOB monitoring for tenant {}", tenant_ns);
}
}
Ok(Outcome::success(format!(
"Installed monitoring stack for tenant {} with version {:?}",
self.score.tenant_name,
self.score.monitoring_stack.version
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("TenantMonitoringInterpret")
}
fn get_version(&self) -> Version {
Version::from("1.0.0").unwrap()
}
fn get_status(&self) -> InterpretStatus {
InterpretStatus::SUCCESS
}
fn get_children(&self) -> Vec<harmony_types::id::Id> {
Vec::new()
}
}

View File

@@ -1,7 +1,8 @@
use super::BrocadeClient;
use crate::{
BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo, MacAddressEntry,
PortChannelId, PortOperatingMode, parse_brocade_mac_address, shell::BrocadeShell,
PortChannelId, PortOperatingMode, SecurityLevel, parse_brocade_mac_address,
shell::BrocadeShell,
};
use async_trait::async_trait;

View File

@@ -8,7 +8,7 @@ use regex::Regex;
use crate::{
BrocadeClient, BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo,
InterfaceStatus, InterfaceType, MacAddressEntry, PortChannelId, PortOperatingMode,
parse_brocade_mac_address, shell::BrocadeShell,
SecurityLevel, parse_brocade_mac_address, shell::BrocadeShell,
};
#[derive(Debug)]

View File

@@ -31,16 +31,3 @@ Ready to build your own components? These guides show you how.
- [**Writing a Score**](./guides/writing-a-score.md): Learn how to create your own `Score` and `Interpret` logic to define a new desired state.
- [**Writing a Topology**](./guides/writing-a-topology.md): Learn how to model a new environment (like AWS, GCP, or custom hardware) as a `Topology`.
- [**Adding Capabilities**](./guides/adding-capabilities.md): See how to add a `Capability` to your custom `Topology`.
- [**Coding Guide**](./coding-guide.md): Conventions and best practices for writing Harmony code.
## 5. Module Documentation
Deep dives into specific Harmony modules and features.
- [**Monitoring and Alerting**](./monitoring.md): Comprehensive guide to cluster, tenant, and application-level monitoring with support for OKD, KubePrometheus, RHOB, and more.
## 6. Architecture Decision Records
Important architectural decisions are documented in the `adr/` directory:
- [Full ADR Index](../adr/)

View File

@@ -1,443 +0,0 @@
# Monitoring and Alerting in Harmony
Harmony provides a unified, type-safe approach to monitoring and alerting across Kubernetes, OpenShift, and bare-metal infrastructure. This guide explains the architecture and how to use it at different levels of abstraction.
## Overview
Harmony's monitoring module supports three distinct use cases:
| Level | Who Uses It | What It Provides |
|-------|-------------|------------------|
| **Cluster** | Cluster administrators | Full control over monitoring stack, cluster-wide alerts, external scrape targets |
| **Tenant** | Platform teams | Namespace-scoped monitoring in multi-tenant environments |
| **Application** | Application developers | Zero-config monitoring that "just works" |
Each level builds on the same underlying abstractions, ensuring consistency while providing appropriate complexity for each audience.
## Core Concepts
### AlertSender
An `AlertSender` represents the system that evaluates alert rules and sends notifications. Harmony supports multiple monitoring stacks:
| Sender | Description | Use When |
|--------|-------------|----------|
| `OpenshiftClusterAlertSender` | OKD/OpenShift built-in monitoring | Running on OKD/OpenShift |
| `KubePrometheus` | kube-prometheus-stack via Helm | Standard Kubernetes, need full stack |
| `Prometheus` | Standalone Prometheus | Custom Prometheus deployment |
| `RedHatClusterObservability` | RHOB operator | Red Hat managed clusters |
| `Grafana` | Grafana-managed alerting | Grafana as primary alerting layer |
### AlertReceiver
An `AlertReceiver` defines where alerts are sent (Discord, Slack, email, webhook, etc.). Receivers are parameterized by sender type because each monitoring stack has different configuration formats.
```rust
pub trait AlertReceiver<S: AlertSender> {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError>;
fn name(&self) -> String;
}
```
Built-in receivers:
- `DiscordReceiver` - Discord webhooks
- `WebhookReceiver` - Generic HTTP webhooks
### AlertRule
An `AlertRule` defines a Prometheus alert expression. Rules are also parameterized by sender to handle different CRD formats.
```rust
pub trait AlertRule<S: AlertSender> {
fn build_rule(&self) -> Result<serde_json::Value, InterpretError>;
fn name(&self) -> String;
}
```
### Observability Capability
Topologies implement `Observability<S>` to indicate they support a specific alert sender:
```rust
impl Observability<OpenshiftClusterAlertSender> for K8sAnywhereTopology {
async fn install_receivers(&self, sender, inventory, receivers) { ... }
async fn install_rules(&self, sender, inventory, rules) { ... }
// ...
}
```
This provides **compile-time verification**: if you try to use `OpenshiftClusterAlertScore` with a topology that doesn't implement `Observability<OpenshiftClusterAlertSender>`, the code won't compile.
---
## Level 1: Cluster Monitoring
Cluster monitoring is for administrators who need full control over the monitoring infrastructure. This includes:
- Installing/managing the monitoring stack
- Configuring cluster-wide alert receivers
- Defining cluster-level alert rules
- Adding external scrape targets (e.g., bare-metal servers, firewalls)
### Example: OKD Cluster Alerts
```rust
use harmony::{
modules::monitoring::{
alert_channel::discord_alert_channel::DiscordReceiver,
alert_rule::{alerts::k8s::pvc::high_pvc_fill_rate_over_two_days, prometheus_alert_rule::AlertManagerRuleGroup},
okd::openshift_cluster_alerting_score::OpenshiftClusterAlertScore,
scrape_target::prometheus_node_exporter::PrometheusNodeExporter,
},
topology::{K8sAnywhereTopology, monitoring::{AlertMatcher, AlertRoute, MatchOp}},
};
let severity_matcher = AlertMatcher {
label: "severity".to_string(),
operator: MatchOp::Eq,
value: "critical".to_string(),
};
let rule_group = AlertManagerRuleGroup::new(
"cluster-rules",
vec![high_pvc_fill_rate_over_two_days()],
);
let external_exporter = PrometheusNodeExporter {
job_name: "firewall".to_string(),
metrics_path: "/metrics".to_string(),
listen_address: ip!("192.168.1.1"),
port: 9100,
..Default::default()
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(OpenshiftClusterAlertScore {
sender: OpenshiftClusterAlertSender,
receivers: vec![Box::new(DiscordReceiver {
name: "critical-alerts".to_string(),
url: hurl!("https://discord.com/api/webhooks/..."),
route: AlertRoute {
matchers: vec![severity_matcher],
..AlertRoute::default("critical-alerts".to_string())
},
})],
rules: vec![Box::new(rule_group)],
scrape_targets: Some(vec![Box::new(external_exporter)]),
})],
None,
).await?;
```
### What This Does
1. **Enables cluster monitoring** - Activates OKD's built-in Prometheus
2. **Enables user workload monitoring** - Allows namespace-scoped rules
3. **Configures Alertmanager** - Adds Discord receiver with route matching
4. **Deploys alert rules** - Creates `AlertingRule` CRD with PVC fill rate alert
5. **Adds external scrape target** - Configures Prometheus to scrape the firewall
### Compile-Time Safety
The `OpenshiftClusterAlertScore` requires:
```rust
impl<T: Topology + Observability<OpenshiftClusterAlertSender>> Score<T>
for OpenshiftClusterAlertScore
```
If `K8sAnywhereTopology` didn't implement `Observability<OpenshiftClusterAlertSender>`, this code would fail to compile. You cannot accidentally deploy OKD alerts to a cluster that doesn't support them.
---
## Level 2: Tenant Monitoring
In multi-tenant clusters, teams are often confined to specific namespaces. Tenant monitoring adapts to this constraint:
- Resources are deployed in the tenant's namespace
- Cannot modify cluster-level monitoring configuration
- The topology determines namespace context at runtime
### How It Works
The topology's `Observability` implementation handles tenant scoping:
```rust
impl Observability<KubePrometheus> for K8sAnywhereTopology {
async fn install_rules(&self, sender, inventory, rules) {
// Topology knows if it's tenant-scoped
let namespace = self.get_tenant_config().await
.map(|t| t.name)
.unwrap_or_else(|| "monitoring".to_string());
// Rules are installed in the appropriate namespace
for rule in rules.unwrap_or_default() {
let score = KubePrometheusRuleScore {
sender: sender.clone(),
rule,
namespace: namespace.clone(), // Tenant namespace
};
score.create_interpret().execute(inventory, self).await?;
}
}
}
```
### Tenant vs Cluster Resources
| Resource | Cluster-Level | Tenant-Level |
|----------|---------------|--------------|
| Alertmanager config | Global receivers | Namespaced receivers (where supported) |
| PrometheusRules | Cluster-wide alerts | Namespace alerts only |
| ServiceMonitors | Any namespace | Own namespace only |
| External scrape targets | Can add | Cannot add (cluster config) |
### Runtime Validation
Tenant constraints are validated at runtime via Kubernetes RBAC. If a tenant-scoped deployment attempts cluster-level operations, it fails with a clear permission error from the Kubernetes API.
This cannot be fully compile-time because tenant context is determined by who's running the code and what permissions they have—information only available at runtime.
---
## Level 3: Application Monitoring
Application monitoring provides zero-config, opinionated monitoring for developers. Just add the `Monitoring` feature to your application and it works.
### Example
```rust
use harmony::modules::{
application::{Application, ApplicationFeature},
monitoring::alert_channel::webhook_receiver::WebhookReceiver,
};
// Define your application
let my_app = MyApplication::new();
// Add monitoring as a feature
let monitoring = Monitoring {
application: Arc::new(my_app),
alert_receiver: vec![], // Uses defaults
};
// Install with the application
my_app.add_feature(monitoring);
```
### What Application Monitoring Provides
1. **Automatic ServiceMonitor** - Creates a ServiceMonitor for your application's pods
2. **Ntfy Notification Channel** - Auto-installs and configures Ntfy for push notifications
3. **Tenant Awareness** - Automatically scopes to the correct namespace
4. **Sensible Defaults** - Pre-configured alert routes and receivers
### Under the Hood
```rust
impl<T: Topology + Observability<Prometheus> + TenantManager>
ApplicationFeature<T> for Monitoring
{
async fn ensure_installed(&self, topology: &T) -> Result<...> {
// 1. Get tenant namespace (or use app name)
let namespace = topology.get_tenant_config().await
.map(|ns| ns.name.clone())
.unwrap_or_else(|| self.application.name());
// 2. Create ServiceMonitor for the app
let app_service_monitor = ServiceMonitor {
metadata: ObjectMeta {
name: Some(self.application.name()),
namespace: Some(namespace.clone()),
..Default::default()
},
spec: ServiceMonitorSpec::default(),
};
// 3. Install Ntfy for notifications
let ntfy = NtfyScore { namespace, host };
ntfy.interpret(&Inventory::empty(), topology).await?;
// 4. Wire up webhook receiver to Ntfy
let ntfy_receiver = WebhookReceiver { ... };
// 5. Execute monitoring score
alerting_score.interpret(&Inventory::empty(), topology).await?;
}
}
```
---
## Pre-Built Alert Rules
Harmony provides a library of common alert rules in `modules/monitoring/alert_rule/alerts/`:
### Kubernetes Alerts (`alerts/k8s/`)
```rust
use harmony::modules::monitoring::alert_rule::alerts::k8s::{
pod::pod_failed,
pvc::high_pvc_fill_rate_over_two_days,
memory_usage::alert_high_memory_usage,
};
let rules = AlertManagerRuleGroup::new("k8s-rules", vec![
pod_failed(),
high_pvc_fill_rate_over_two_days(),
alert_high_memory_usage(),
]);
```
Available rules:
- `pod_failed()` - Pod in failed state
- `alert_container_restarting()` - Container restart loop
- `alert_pod_not_ready()` - Pod not ready for extended period
- `high_pvc_fill_rate_over_two_days()` - PVC will fill within 2 days
- `alert_high_memory_usage()` - Memory usage above threshold
- `alert_high_cpu_usage()` - CPU usage above threshold
### Infrastructure Alerts (`alerts/infra/`)
```rust
use harmony::modules::monitoring::alert_rule::alerts::infra::opnsense::high_http_error_rate;
let rules = AlertManagerRuleGroup::new("infra-rules", vec![
high_http_error_rate(),
]);
```
### Creating Custom Rules
```rust
use harmony::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule;
pub fn my_custom_alert() -> PrometheusAlertRule {
PrometheusAlertRule::new("MyServiceDown", "up{job=\"my-service\"} == 0")
.for_duration("5m")
.label("severity", "critical")
.annotation("summary", "My service is down")
.annotation("description", "The my-service job has been down for more than 5 minutes")
}
```
---
## Alert Receivers
### Discord Webhook
```rust
use harmony::modules::monitoring::alert_channel::discord_alert_channel::DiscordReceiver;
use harmony::topology::monitoring::{AlertRoute, AlertMatcher, MatchOp};
let discord = DiscordReceiver {
name: "ops-alerts".to_string(),
url: hurl!("https://discord.com/api/webhooks/123456/abcdef"),
route: AlertRoute {
receiver: "ops-alerts".to_string(),
matchers: vec![AlertMatcher {
label: "severity".to_string(),
operator: MatchOp::Eq,
value: "critical".to_string(),
}],
group_by: vec!["alertname".to_string()],
repeat_interval: Some("30m".to_string()),
continue_matching: false,
children: vec![],
},
};
```
### Generic Webhook
```rust
use harmony::modules::monitoring::alert_channel::webhook_receiver::WebhookReceiver;
let webhook = WebhookReceiver {
name: "custom-webhook".to_string(),
url: hurl!("https://api.example.com/alerts"),
route: AlertRoute::default("custom-webhook".to_string()),
};
```
---
## Adding a New Monitoring Stack
To add support for a new monitoring stack:
1. **Create the sender type** in `modules/monitoring/my_sender/mod.rs`:
```rust
#[derive(Debug, Clone)]
pub struct MySender;
impl AlertSender for MySender {
fn name(&self) -> String { "MySender".to_string() }
}
```
2. **Define CRD types** in `modules/monitoring/my_sender/crd/`:
```rust
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone)]
#[kube(group = "monitoring.example.com", version = "v1", kind = "MyAlertRule")]
pub struct MyAlertRuleSpec { ... }
```
3. **Implement Observability** in `domain/topology/k8s_anywhere/observability/my_sender.rs`:
```rust
impl Observability<MySender> for K8sAnywhereTopology {
async fn install_receivers(&self, sender, inventory, receivers) { ... }
async fn install_rules(&self, sender, inventory, rules) { ... }
// ...
}
```
4. **Implement receiver conversions** for existing receivers:
```rust
impl AlertReceiver<MySender> for DiscordReceiver {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError> {
// Convert DiscordReceiver to MySender's format
}
}
```
5. **Create score types**:
```rust
pub struct MySenderAlertScore {
pub sender: MySender,
pub receivers: Vec<Box<dyn AlertReceiver<MySender>>>,
pub rules: Vec<Box<dyn AlertRule<MySender>>>,
}
```
---
## Architecture Principles
### Type Safety Over Flexibility
Each monitoring stack has distinct CRDs and configuration formats. Rather than a unified "MonitoringStack" type that loses stack-specific features, we use generic traits that provide type safety while allowing each stack to express its unique configuration.
### Compile-Time Capability Verification
The `Observability<S>` bound ensures you can't deploy OKD alerts to a KubePrometheus cluster. The compiler catches platform mismatches before deployment.
### Explicit Over Implicit
Monitoring stacks are chosen explicitly (`OpenshiftClusterAlertSender` vs `KubePrometheus`). There's no "auto-detection" that could lead to surprising behavior.
### Three Levels, One Foundation
Cluster, tenant, and application monitoring all use the same traits (`AlertSender`, `AlertReceiver`, `AlertRule`). The difference is in how scores are constructed and how topologies interpret them.
---
## Related Documentation
- [ADR-020: Monitoring and Alerting Architecture](../adr/020-monitoring-alerting-architecture.md)
- [ADR-013: Monitoring Notifications (ntfy)](../adr/013-monitoring-notifications.md)
- [ADR-011: Multi-Tenant Cluster Architecture](../adr/011-multi-tenant-cluster.md)
- [Coding Guide](coding-guide.md)
- [Core Concepts](concepts.md)

View File

@@ -7,7 +7,7 @@ use harmony::{
monitoring::alert_channel::webhook_receiver::WebhookReceiver,
tenant::TenantScore,
},
topology::{K8sAnywhereTopology, monitoring::AlertRoute, tenant::TenantConfig},
topology::{K8sAnywhereTopology, tenant::TenantConfig},
};
use harmony_types::id::Id;
use harmony_types::net::Url;
@@ -33,14 +33,9 @@ async fn main() {
service_port: 3000,
});
let receiver_name = "sample-webhook-receiver".to_string();
let webhook_receiver = WebhookReceiver {
name: receiver_name.clone(),
name: "sample-webhook-receiver".to_string(),
url: Url::Url(url::Url::parse("https://webhook-doesnt-exist.com").unwrap()),
route: AlertRoute {
..AlertRoute::default(receiver_name)
},
};
let app = ApplicationScore {

View File

@@ -1,8 +1,8 @@
use harmony::{
inventory::Inventory,
modules::cert_manager::{
capability::CertificateManagementConfig, score_certificate::CertificateScore,
score_issuer::CertificateIssuerScore,
capability::CertificateManagementConfig, score_cert_management::CertificateManagementScore,
score_certificate::CertificateScore, score_issuer::CertificateIssuerScore,
},
topology::K8sAnywhereTopology,
};

View File

@@ -10,10 +10,9 @@ publish = false
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
harmony-k8s = { path = "../../harmony-k8s" }
cidr.workspace = true
tokio.workspace = true
harmony_macros = { path = "../../harmony_macros" }
log.workspace = true
env_logger.workspace = true
url.workspace = true

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use harmony_k8s::{DrainOptions, K8sClient};
use harmony::topology::k8s::{DrainOptions, K8sClient};
use log::{info, trace};
#[tokio::main]

View File

@@ -10,10 +10,9 @@ publish = false
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
harmony-k8s = { path = "../../harmony-k8s" }
cidr.workspace = true
tokio.workspace = true
harmony_macros = { path = "../../harmony_macros" }
log.workspace = true
env_logger.workspace = true
url.workspace = true

View File

@@ -1,4 +1,4 @@
use harmony_k8s::{K8sClient, NodeFile};
use harmony::topology::k8s::{DrainOptions, K8sClient, NodeFile};
use log::{info, trace};
#[tokio::main]

View File

@@ -1,45 +1,37 @@
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use std::collections::HashMap;
use harmony::{
inventory::Inventory,
modules::monitoring::{
alert_channel::discord_alert_channel::DiscordReceiver,
alert_rule::{
alerts::{
infra::dell_server::{
alert_global_storage_status_critical,
alert_global_storage_status_non_recoverable,
global_storage_status_degraded_non_critical,
modules::{
monitoring::{
alert_channel::discord_alert_channel::DiscordWebhook,
alert_rule::prometheus_alert_rule::AlertManagerRuleGroup,
kube_prometheus::{
helm_prometheus_alert_score::HelmPrometheusAlertingScore,
types::{
HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor,
ServiceMonitorEndpoint,
},
k8s::pvc::high_pvc_fill_rate_over_two_days,
},
prometheus_alert_rule::AlertManagerRuleGroup,
},
kube_prometheus::{
helm::config::KubePrometheusConfig,
kube_prometheus_alerting_score::KubePrometheusAlertingScore,
types::{
HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor,
ServiceMonitorEndpoint,
prometheus::alerts::{
infra::dell_server::{
alert_global_storage_status_critical, alert_global_storage_status_non_recoverable,
global_storage_status_degraded_non_critical,
},
k8s::pvc::high_pvc_fill_rate_over_two_days,
},
},
topology::{K8sAnywhereTopology, monitoring::AlertRoute},
topology::K8sAnywhereTopology,
};
use harmony_types::{k8s_name::K8sName, net::Url};
#[tokio::main]
async fn main() {
let receiver_name = "test-discord".to_string();
let discord_receiver = DiscordReceiver {
name: receiver_name.clone(),
let discord_receiver = DiscordWebhook {
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
route: AlertRoute {
..AlertRoute::default(receiver_name)
},
selectors: vec![],
};
let high_pvc_fill_rate_over_two_days_alert = high_pvc_fill_rate_over_two_days();
@@ -78,15 +70,10 @@ async fn main() {
endpoints: vec![service_monitor_endpoint],
..Default::default()
};
let config = Arc::new(Mutex::new(KubePrometheusConfig::new()));
let alerting_score = KubePrometheusAlertingScore {
let alerting_score = HelmPrometheusAlertingScore {
receivers: vec![Box::new(discord_receiver)],
rules: vec![Box::new(additional_rules), Box::new(additional_rules2)],
service_monitors: vec![service_monitor],
scrape_targets: None,
config,
};
harmony_cli::run(

View File

@@ -1,32 +1,24 @@
use std::{
collections::HashMap,
str::FromStr,
sync::{Arc, Mutex},
};
use std::{collections::HashMap, str::FromStr};
use harmony::{
inventory::Inventory,
modules::{
monitoring::{
alert_channel::discord_alert_channel::DiscordReceiver,
alert_rule::{
alerts::k8s::pvc::high_pvc_fill_rate_over_two_days,
prometheus_alert_rule::AlertManagerRuleGroup,
},
alert_channel::discord_alert_channel::DiscordWebhook,
alert_rule::prometheus_alert_rule::AlertManagerRuleGroup,
kube_prometheus::{
helm::config::KubePrometheusConfig,
kube_prometheus_alerting_score::KubePrometheusAlertingScore,
helm_prometheus_alert_score::HelmPrometheusAlertingScore,
types::{
HTTPScheme, MatchExpression, Operator, Selector, ServiceMonitor,
ServiceMonitorEndpoint,
},
},
},
prometheus::alerts::k8s::pvc::high_pvc_fill_rate_over_two_days,
tenant::TenantScore,
},
topology::{
K8sAnywhereTopology,
monitoring::AlertRoute,
tenant::{ResourceLimits, TenantConfig, TenantNetworkPolicy},
},
};
@@ -50,13 +42,10 @@ async fn main() {
},
};
let receiver_name = "test-discord".to_string();
let discord_receiver = DiscordReceiver {
name: receiver_name.clone(),
let discord_receiver = DiscordWebhook {
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
route: AlertRoute {
..AlertRoute::default(receiver_name)
},
selectors: vec![],
};
let high_pvc_fill_rate_over_two_days_alert = high_pvc_fill_rate_over_two_days();
@@ -85,14 +74,10 @@ async fn main() {
..Default::default()
};
let config = Arc::new(Mutex::new(KubePrometheusConfig::new()));
let alerting_score = KubePrometheusAlertingScore {
let alerting_score = HelmPrometheusAlertingScore {
receivers: vec![Box::new(discord_receiver)],
rules: vec![Box::new(additional_rules)],
service_monitors: vec![service_monitor],
scrape_targets: None,
config,
};
harmony_cli::run(

View File

@@ -1,16 +0,0 @@
[package]
name = "example-node-health"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }

View File

@@ -1,17 +0,0 @@
use harmony::{
inventory::Inventory, modules::node_health::NodeHealthScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let node_health = NodeHealthScore {};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(node_health)],
None,
)
.await
.unwrap();
}

View File

@@ -1,64 +1,35 @@
use std::collections::HashMap;
use harmony::{
inventory::Inventory,
modules::monitoring::{
alert_channel::discord_alert_channel::DiscordReceiver,
alert_rule::{
alerts::{
infra::opnsense::high_http_error_rate, k8s::pvc::high_pvc_fill_rate_over_two_days,
},
prometheus_alert_rule::AlertManagerRuleGroup,
},
okd::openshift_cluster_alerting_score::OpenshiftClusterAlertScore,
scrape_target::prometheus_node_exporter::PrometheusNodeExporter,
},
topology::{
K8sAnywhereTopology,
monitoring::{AlertMatcher, AlertRoute, MatchOp},
alert_channel::discord_alert_channel::DiscordWebhook,
okd::cluster_monitoring::OpenshiftClusterAlertScore,
},
topology::K8sAnywhereTopology,
};
use harmony_macros::{hurl, ip};
use harmony_macros::hurl;
use harmony_types::k8s_name::K8sName;
#[tokio::main]
async fn main() {
let platform_matcher = AlertMatcher {
label: "prometheus".to_string(),
operator: MatchOp::Eq,
value: "openshift-monitoring/k8s".to_string(),
};
let severity = AlertMatcher {
label: "severity".to_string(),
operator: MatchOp::Eq,
value: "critical".to_string(),
};
let high_http_error_rate = high_http_error_rate();
let additional_rules = AlertManagerRuleGroup::new("test-rule", vec![high_http_error_rate]);
let scrape_target = PrometheusNodeExporter {
job_name: "firewall".to_string(),
metrics_path: "/metrics".to_string(),
listen_address: ip!("192.168.1.1"),
port: 9100,
..Default::default()
};
let mut sel = HashMap::new();
sel.insert(
"openshift_io_alert_source".to_string(),
"platform".to_string(),
);
let mut sel2 = HashMap::new();
sel2.insert("openshift_io_alert_source".to_string(), "".to_string());
let selectors = vec![sel, sel2];
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(OpenshiftClusterAlertScore {
receivers: vec![Box::new(DiscordReceiver {
name: "crit-wills-discord-channel-example".to_string(),
url: hurl!("https://test.io"),
route: AlertRoute {
matchers: vec![severity],
..AlertRoute::default("crit-wills-discord-channel-example".to_string())
},
receivers: vec![Box::new(DiscordWebhook {
name: K8sName("wills-discord-webhook-example".to_string()),
url: hurl!("https://something.io"),
selectors: selectors,
})],
sender: harmony::modules::monitoring::okd::OpenshiftClusterAlertSender,
rules: vec![Box::new(additional_rules)],
scrape_targets: Some(vec![Box::new(scrape_target)]),
})],
None,
)

View File

@@ -1,13 +1,63 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory, modules::openbao::OpenbaoScore, topology::K8sAnywhereTopology,
inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
#[tokio::main]
async fn main() {
let openbao = OpenbaoScore {
host: "openbao.sebastien.sto1.nationtech.io".to_string(),
let values_yaml = Some(
r#"server:
standalone:
enabled: true
config: |
listener "tcp" {
tls_disable = true
address = "[::]:8200"
cluster_address = "[::]:8201"
}
storage "file" {
path = "/openbao/data"
}
service:
enabled: true
dataStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
auditStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce"#
.to_string(),
);
let openbao = HelmChartScore {
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
release_name: NonBlankString::from_str("openbao").unwrap(),
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: true,
repository: Some(HelmRepository::new(
"openbao".to_string(),
hurl!("https://openbao.github.io/openbao-helm"),
true,
)),
};
// TODO exec pod commands to initialize secret store if not already done
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),

View File

@@ -1,3 +1,5 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory,
modules::{k8s::apps::OperatorHubCatalogSourceScore, postgresql::CloudNativePgOperatorScore},
@@ -7,7 +9,7 @@ use harmony::{
#[tokio::main]
async fn main() {
let operatorhub_catalog = OperatorHubCatalogSourceScore::default();
let cnpg_operator = CloudNativePgOperatorScore::default_openshift();
let cnpg_operator = CloudNativePgOperatorScore::default();
harmony_cli::run(
Inventory::autoload(),

View File

@@ -1,13 +1,22 @@
use std::sync::Arc;
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
};
use async_trait::async_trait;
use cidr::Ipv4Cidr;
use harmony::{
executors::ExecutorError,
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
modules::opnsense::node_exporter::NodeExporterScore,
topology::{PreparationError, PreparationOutcome, Topology, node_exporter::NodeExporter},
topology::{
HAClusterTopology, LogicalHost, PreparationError, PreparationOutcome, Topology,
UnmanagedRouter, node_exporter::NodeExporter,
},
};
use harmony_macros::ip;
use harmony_macros::{ip, ipv4, mac_address};
#[derive(Debug)]
struct OpnSenseTopology {

View File

@@ -1,7 +1,8 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{
PostgreSQLConnectionScore, PublicPostgreSQLScore, capability::PostgreSQLConfig,
K8sPostgreSQLScore, PostgreSQLConnectionScore, PublicPostgreSQLScore,
capability::PostgreSQLConfig,
},
topology::K8sAnywhereTopology,
};

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,
@@ -6,9 +6,9 @@ use harmony::{
application::{
ApplicationScore, RustWebFramework, RustWebapp, features::rhob_monitoring::Monitoring,
},
monitoring::alert_channel::discord_alert_channel::DiscordReceiver,
monitoring::alert_channel::discord_alert_channel::DiscordWebhook,
},
topology::{K8sAnywhereTopology, monitoring::AlertRoute},
topology::K8sAnywhereTopology,
};
use harmony_types::{k8s_name::K8sName, net::Url};
@@ -22,21 +22,18 @@ async fn main() {
service_port: 3000,
});
let receiver_name = "test-discord".to_string();
let discord_receiver = DiscordReceiver {
name: receiver_name.clone(),
let discord_receiver = DiscordWebhook {
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
route: AlertRoute {
..AlertRoute::default(receiver_name)
},
selectors: vec![],
};
let app = ApplicationScore {
features: vec![
// Box::new(Monitoring {
// application: application.clone(),
// alert_receiver: vec![Box::new(discord_receiver)],
// }),
Box::new(Monitoring {
application: application.clone(),
alert_receiver: vec![Box::new(discord_receiver)],
}),
// TODO add backups, multisite ha, etc
],
application,

View File

@@ -1,4 +1,4 @@
use std::{path::PathBuf, sync::Arc};
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,
@@ -8,13 +8,13 @@ use harmony::{
features::{Monitoring, PackagingDeployment},
},
monitoring::alert_channel::{
discord_alert_channel::DiscordReceiver, webhook_receiver::WebhookReceiver,
discord_alert_channel::DiscordWebhook, webhook_receiver::WebhookReceiver,
},
},
topology::{K8sAnywhereTopology, monitoring::AlertRoute},
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
use harmony_types::{k8s_name::K8sName, net::Url};
use harmony_types::k8s_name::K8sName;
#[tokio::main]
async fn main() {
@@ -26,23 +26,15 @@ async fn main() {
service_port: 3000,
});
let receiver_name = "test-discord".to_string();
let discord_receiver = DiscordReceiver {
name: receiver_name.clone(),
url: Url::Url(url::Url::parse("https://discord.doesnt.exist.com").unwrap()),
route: AlertRoute {
..AlertRoute::default(receiver_name)
},
let discord_receiver = DiscordWebhook {
name: K8sName("test-discord".to_string()),
url: hurl!("https://discord.doesnt.exist.com"),
selectors: vec![],
};
let receiver_name = "sample-webhook-receiver".to_string();
let webhook_receiver = WebhookReceiver {
name: receiver_name.clone(),
name: "sample-webhook-receiver".to_string(),
url: hurl!("https://webhook-doesnt-exist.com"),
route: AlertRoute {
..AlertRoute::default(receiver_name)
},
};
let app = ApplicationScore {
@@ -50,10 +42,10 @@ async fn main() {
Box::new(PackagingDeployment {
application: application.clone(),
}),
// Box::new(Monitoring {
// application: application.clone(),
// alert_receiver: vec![Box::new(discord_receiver), Box::new(webhook_receiver)],
// }),
Box::new(Monitoring {
application: application.clone(),
alert_receiver: vec![Box::new(discord_receiver), Box::new(webhook_receiver)],
}),
// TODO add backups, multisite ha, etc
],
application,

View File

@@ -1,8 +1,11 @@
use harmony::{
inventory::Inventory,
modules::application::{
ApplicationScore, RustWebFramework, RustWebapp,
features::{Monitoring, PackagingDeployment},
modules::{
application::{
ApplicationScore, RustWebFramework, RustWebapp,
features::{Monitoring, PackagingDeployment},
},
monitoring::alert_channel::discord_alert_channel::DiscordWebhook,
},
topology::K8sAnywhereTopology,
};
@@ -27,14 +30,14 @@ async fn main() {
Box::new(PackagingDeployment {
application: application.clone(),
}),
// Box::new(Monitoring {
// application: application.clone(),
// alert_receiver: vec![Box::new(DiscordWebhook {
// name: K8sName("test-discord".to_string()),
// url: hurl!("https://discord.doesnt.exist.com"),
// selectors: vec![],
// })],
// }),
Box::new(Monitoring {
application: application.clone(),
alert_receiver: vec![Box::new(DiscordWebhook {
name: K8sName("test-discord".to_string()),
url: hurl!("https://discord.doesnt.exist.com"),
selectors: vec![],
})],
}),
],
application,
};

View File

@@ -1,14 +0,0 @@
[package]
name = "example-zitadel"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_macros = { path = "../../harmony_macros" }
harmony_types = { path = "../../harmony_types" }
tokio.workspace = true
url.workspace = true

View File

@@ -1,20 +0,0 @@
use harmony::{
inventory::Inventory, modules::zitadel::ZitadelScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let zitadel = ZitadelScore {
host: "sso.sto1.nationtech.io".to_string(),
zitadel_version: "v4.12.1".to_string(),
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(zitadel)],
None,
)
.await
.unwrap();
}

Binary file not shown.

View File

@@ -1,23 +0,0 @@
[package]
name = "harmony-k8s"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
kube.workspace = true
k8s-openapi.workspace = true
tokio.workspace = true
tokio-retry.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
log.workspace = true
similar.workspace = true
reqwest.workspace = true
url.workspace = true
inquire.workspace = true
[dev-dependencies]
pretty_assertions.workspace = true

View File

@@ -1,593 +0,0 @@
use kube::{
Client, Error, Resource,
api::{
Api, ApiResource, DynamicObject, GroupVersionKind, Patch, PatchParams, PostParams,
ResourceExt,
},
core::ErrorResponse,
discovery::Scope,
error::DiscoveryError,
};
use log::{debug, error, trace, warn};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use similar::TextDiff;
use url::Url;
use crate::client::K8sClient;
use crate::helper;
use crate::types::WriteMode;
/// The field-manager token sent with every server-side apply request.
pub const FIELD_MANAGER: &str = "harmony-k8s";
// ── Private helpers ──────────────────────────────────────────────────────────
/// Serialise any `Serialize` payload to a [`DynamicObject`] via JSON.
fn to_dynamic<T: Serialize>(payload: &T) -> Result<DynamicObject, Error> {
serde_json::from_value(serde_json::to_value(payload).map_err(Error::SerdeError)?)
.map_err(Error::SerdeError)
}
/// Fetch the current resource, display a unified diff against `payload`, and
/// return `()`. All output goes to stdout (same behaviour as before).
///
/// A 404 is treated as "resource would be created" — not an error.
async fn show_dry_run<T: Serialize>(
api: &Api<DynamicObject>,
name: &str,
payload: &T,
) -> Result<(), Error> {
let new_yaml = serde_yaml::to_string(payload)
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
match api.get(name).await {
Ok(current) => {
println!("\nDry-run for resource: '{name}'");
let mut current_val = serde_yaml::to_value(&current).unwrap_or(serde_yaml::Value::Null);
if let Some(map) = current_val.as_mapping_mut() {
map.remove(&serde_yaml::Value::String("status".to_string()));
}
let current_yaml = serde_yaml::to_string(&current_val)
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
if current_yaml == new_yaml {
println!("No changes detected.");
} else {
println!("Changes detected:");
let diff = TextDiff::from_lines(&current_yaml, &new_yaml);
for change in diff.iter_all_changes() {
let sign = match change.tag() {
similar::ChangeTag::Delete => "-",
similar::ChangeTag::Insert => "+",
similar::ChangeTag::Equal => " ",
};
print!("{sign}{change}");
}
}
Ok(())
}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
println!("\nDry-run for new resource: '{name}'");
println!("Resource does not exist. Would be created:");
for line in new_yaml.lines() {
println!("+{line}");
}
Ok(())
}
Err(e) => {
error!("Failed to fetch resource '{name}' for dry-run: {e}");
Err(e)
}
}
}
/// Execute the real (non-dry-run) apply, respecting [`WriteMode`].
async fn do_apply<T: Serialize + std::fmt::Debug>(
api: &Api<DynamicObject>,
name: &str,
payload: &T,
patch_params: &PatchParams,
write_mode: &WriteMode,
) -> Result<DynamicObject, Error> {
match write_mode {
WriteMode::CreateOrUpdate => {
// TODO refactor this arm to perform self.update and if fail with 404 self.create
// This will avoid the repetition of the api.patch and api.create calls within this
// function body. This makes the code more maintainable
match api.patch(name, patch_params, &Patch::Apply(payload)).await {
Ok(obj) => Ok(obj),
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
debug!("Resource '{name}' not found via SSA, falling back to POST");
let dyn_obj = to_dynamic(payload)?;
api.create(&PostParams::default(), &dyn_obj)
.await
.map_err(|e| {
error!("Failed to create '{name}': {e}");
e
})
}
Err(e) => {
error!("Failed to apply '{name}': {e}");
Err(e)
}
}
}
WriteMode::Create => {
let dyn_obj = to_dynamic(payload)?;
api.create(&PostParams::default(), &dyn_obj)
.await
.map_err(|e| {
error!("Failed to create '{name}': {e}");
e
})
}
WriteMode::Update => match api.patch(name, patch_params, &Patch::Apply(payload)).await {
Ok(obj) => Ok(obj),
Err(Error::Api(ErrorResponse { code: 404, .. })) => Err(Error::Api(ErrorResponse {
code: 404,
message: format!("Resource '{name}' not found and WriteMode is UpdateOnly"),
reason: "NotFound".to_string(),
status: "Failure".to_string(),
})),
Err(e) => {
error!("Failed to update '{name}': {e}");
Err(e)
}
},
}
}
// ── Public API ───────────────────────────────────────────────────────────────
impl K8sClient {
/// Server-side apply: create if absent, update if present.
/// Equivalent to `kubectl apply`.
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
self.apply_with_strategy(resource, namespace, WriteMode::CreateOrUpdate)
.await
}
/// POST only — returns an error if the resource already exists.
pub async fn create<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
self.apply_with_strategy(resource, namespace, WriteMode::Create)
.await
}
/// Server-side apply only — returns an error if the resource does not exist.
pub async fn update<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
self.apply_with_strategy(resource, namespace, WriteMode::Update)
.await
}
pub async fn apply_with_strategy<K>(
&self,
resource: &K,
namespace: Option<&str>,
write_mode: WriteMode,
) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
debug!(
"apply_with_strategy: {:?} ns={:?}",
resource.meta().name,
namespace
);
trace!("{:#}", serde_json::to_value(resource).unwrap_or_default());
let dyntype = K::DynamicType::default();
let gvk = GroupVersionKind {
group: K::group(&dyntype).to_string(),
version: K::version(&dyntype).to_string(),
kind: K::kind(&dyntype).to_string(),
};
let discovery = self.discovery().await?;
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Cannot resolve GVK: {gvk:?}"
)))
})?;
let effective_ns = if caps.scope == Scope::Cluster {
None
} else {
namespace.or_else(|| resource.meta().namespace.as_deref())
};
let api: Api<DynamicObject> =
get_dynamic_api(ar, caps, self.client.clone(), effective_ns, false);
let name = resource
.meta()
.name
.as_deref()
.expect("Kubernetes resource must have a name");
if self.dry_run {
show_dry_run(&api, name, resource).await?;
return Ok(resource.clone());
}
let patch_params = PatchParams::apply(FIELD_MANAGER);
do_apply(&api, name, resource, &patch_params, &write_mode)
.await
.and_then(helper::dyn_to_typed)
}
/// Applies resources in order, one at a time
pub async fn apply_many<K>(&self, resources: &[K], ns: Option<&str>) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
let mut result = Vec::new();
for r in resources.iter() {
let res = self.apply(r, ns).await;
if res.is_err() {
// NOTE: this may log sensitive data; downgrade to debug if needed.
warn!(
"Failed to apply k8s resource: {}",
serde_json::to_string_pretty(r).map_err(Error::SerdeError)?
);
}
result.push(res?);
}
Ok(result)
}
/// Apply a [`DynamicObject`] resource using server-side apply.
pub async fn apply_dynamic(
&self,
resource: &DynamicObject,
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<DynamicObject, Error> {
trace!("apply_dynamic {resource:#?} ns={namespace:?} force={force_conflicts}");
let discovery = self.discovery().await?;
let type_meta = resource.types.as_ref().ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"DynamicObject must have types (apiVersion and kind)".to_string(),
))
})?;
let gvk = GroupVersionKind::try_from(type_meta).map_err(|_| {
Error::BuildRequest(kube::core::request::Error::Validation(format!(
"Invalid GVK in DynamicObject: {type_meta:?}"
)))
})?;
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Cannot resolve GVK: {gvk:?}"
)))
})?;
let effective_ns = if caps.scope == Scope::Cluster {
None
} else {
namespace.or_else(|| resource.metadata.namespace.as_deref())
};
let api = get_dynamic_api(ar, caps, self.client.clone(), effective_ns, false);
let name = resource.metadata.name.as_deref().ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"DynamicObject must have metadata.name".to_string(),
))
})?;
debug!(
"apply_dynamic kind={:?} name='{name}' ns={effective_ns:?}",
resource.types.as_ref().map(|t| &t.kind),
);
// NOTE would be nice to improve cohesion between the dynamic and typed apis and avoid copy
// pasting the dry_run and some more logic
if self.dry_run {
show_dry_run(&api, name, resource).await?;
return Ok(resource.clone());
}
let mut patch_params = PatchParams::apply(FIELD_MANAGER);
patch_params.force = force_conflicts;
do_apply(
&api,
name,
resource,
&patch_params,
&WriteMode::CreateOrUpdate,
)
.await
}
pub async fn apply_dynamic_many(
&self,
resources: &[DynamicObject],
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<Vec<DynamicObject>, Error> {
let mut result = Vec::new();
for r in resources.iter() {
result.push(self.apply_dynamic(r, namespace, force_conflicts).await?);
}
Ok(result)
}
pub async fn apply_yaml_many(
&self,
#[allow(clippy::ptr_arg)] yaml: &Vec<serde_yaml::Value>,
ns: Option<&str>,
) -> Result<(), Error> {
for y in yaml.iter() {
self.apply_yaml(y, ns).await?;
}
Ok(())
}
pub async fn apply_yaml(
&self,
yaml: &serde_yaml::Value,
ns: Option<&str>,
) -> Result<(), Error> {
// NOTE wouldn't it be possible to parse this into a DynamicObject and simply call
// apply_dynamic instead of reimplementing api interactions?
let obj: DynamicObject =
serde_yaml::from_value(yaml.clone()).expect("YAML must deserialise to DynamicObject");
let name = obj.metadata.name.as_ref().expect("YAML must have a name");
let api_version = yaml["apiVersion"].as_str().expect("missing apiVersion");
let kind = yaml["kind"].as_str().expect("missing kind");
let mut it = api_version.splitn(2, '/');
let first = it.next().unwrap();
let (g, v) = match it.next() {
Some(second) => (first, second),
None => ("", first),
};
let api_resource = ApiResource::from_gvk(&GroupVersionKind::gvk(g, v, kind));
let namespace = ns.unwrap_or_else(|| {
obj.metadata
.namespace
.as_deref()
.expect("YAML must have a namespace when ns is not provided")
});
let api: Api<DynamicObject> =
Api::namespaced_with(self.client.clone(), namespace, &api_resource);
println!("Applying '{name}' in namespace '{namespace}'...");
let patch_params = PatchParams::apply(FIELD_MANAGER);
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
println!("Successfully applied '{}'.", result.name_any());
Ok(())
}
/// Equivalent to `kubectl apply -f <url>`.
pub async fn apply_url(&self, url: Url, ns: Option<&str>) -> Result<(), Error> {
let patch_params = PatchParams::apply(FIELD_MANAGER);
let discovery = self.discovery().await?;
let yaml = reqwest::get(url)
.await
.expect("Could not fetch URL")
.text()
.await
.expect("Could not read response body");
for doc in multidoc_deserialize(&yaml).expect("Failed to parse YAML from URL") {
let obj: DynamicObject =
serde_yaml::from_value(doc).expect("YAML document is not a valid object");
let namespace = obj.metadata.namespace.as_deref().or(ns);
let type_meta = obj.types.as_ref().expect("Object is missing TypeMeta");
let gvk =
GroupVersionKind::try_from(type_meta).expect("Object has invalid GroupVersionKind");
let name = obj.name_any();
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = get_dynamic_api(ar, caps, self.client.clone(), namespace, false);
trace!(
"Applying {}:\n{}",
gvk.kind,
serde_yaml::to_string(&obj).unwrap_or_default()
);
let data: Value = serde_json::to_value(&obj).expect("serialisation failed");
let _r = api.patch(&name, &patch_params, &Patch::Apply(data)).await?;
debug!("Applied {} '{name}'", gvk.kind);
} else {
warn!("Skipping document with unknown GVK: {gvk:?}");
}
}
Ok(())
}
/// Build a dynamic API client from a [`DynamicObject`]'s type metadata.
pub(crate) fn get_api_for_dynamic_object(
&self,
object: &DynamicObject,
ns: Option<&str>,
) -> Result<Api<DynamicObject>, Error> {
let ar = object
.types
.as_ref()
.and_then(|t| {
let parts: Vec<&str> = t.api_version.split('/').collect();
match parts.as_slice() {
[version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
"", version, &t.kind,
))),
[group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
group, version, &t.kind,
))),
_ => None,
}
})
.ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(format!(
"Invalid apiVersion in DynamicObject: {object:#?}"
)))
})?;
Ok(match ns {
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
None => Api::default_namespaced_with(self.client.clone(), &ar),
})
}
}
// ── Free functions ───────────────────────────────────────────────────────────
pub(crate) fn get_dynamic_api(
resource: kube::api::ApiResource,
capabilities: kube::discovery::ApiCapabilities,
client: Client,
ns: Option<&str>,
all: bool,
) -> Api<DynamicObject> {
if capabilities.scope == Scope::Cluster || all {
Api::all_with(client, &resource)
} else if let Some(namespace) = ns {
Api::namespaced_with(client, namespace, &resource)
} else {
Api::default_namespaced_with(client, &resource)
}
}
pub(crate) fn multidoc_deserialize(
data: &str,
) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
use serde::Deserialize;
let mut docs = vec![];
for de in serde_yaml::Deserializer::from_str(data) {
docs.push(serde_yaml::Value::deserialize(de)?);
}
Ok(docs)
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod apply_tests {
use std::collections::BTreeMap;
use std::time::{SystemTime, UNIX_EPOCH};
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::{DeleteParams, TypeMeta};
use super::*;
#[tokio::test]
#[ignore = "requires kubernetes cluster"]
async fn apply_creates_new_configmap() {
let client = K8sClient::try_default().await.unwrap();
let ns = "default";
let name = format!(
"test-cm-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: Some(BTreeMap::from([("key1".to_string(), "value1".to_string())])),
..Default::default()
};
assert!(client.apply(&cm, Some(ns)).await.is_ok());
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
let _ = api.delete(&name, &DeleteParams::default()).await;
}
#[tokio::test]
#[ignore = "requires kubernetes cluster"]
async fn apply_is_idempotent() {
let client = K8sClient::try_default().await.unwrap();
let ns = "default";
let name = format!(
"test-idem-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: Some(BTreeMap::from([("key".to_string(), "value".to_string())])),
..Default::default()
};
assert!(
client.apply(&cm, Some(ns)).await.is_ok(),
"first apply failed"
);
assert!(
client.apply(&cm, Some(ns)).await.is_ok(),
"second apply failed (not idempotent)"
);
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
let _ = api.delete(&name, &DeleteParams::default()).await;
}
#[tokio::test]
#[ignore = "requires kubernetes cluster"]
async fn apply_dynamic_creates_new_resource() {
let client = K8sClient::try_default().await.unwrap();
let ns = "default";
let name = format!(
"test-dyn-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
let obj = DynamicObject {
types: Some(TypeMeta {
api_version: "v1".to_string(),
kind: "ConfigMap".to_string(),
}),
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: serde_json::json!({}),
};
let result = client.apply_dynamic(&obj, Some(ns), false).await;
assert!(result.is_ok(), "apply_dynamic failed: {:?}", result.err());
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
let _ = api.delete(&name, &DeleteParams::default()).await;
}
}

View File

@@ -1,99 +0,0 @@
use std::sync::Arc;
use kube::config::{KubeConfigOptions, Kubeconfig};
use kube::{Client, Config, Discovery, Error};
use log::error;
use serde::Serialize;
use tokio::sync::OnceCell;
use crate::types::KubernetesDistribution;
// TODO not cool, should use a proper configuration mechanism
// cli arg, env var, config file
fn read_dry_run_from_env() -> bool {
std::env::var("DRY_RUN")
.map(|v| v == "true" || v == "1")
.unwrap_or(false)
}
#[derive(Clone)]
pub struct K8sClient {
pub(crate) client: Client,
/// When `true` no mutation is sent to the API server; diffs are printed
/// to stdout instead. Initialised from the `DRY_RUN` environment variable.
pub(crate) dry_run: bool,
pub(crate) k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
pub(crate) discovery: Arc<OnceCell<Discovery>>,
}
impl Serialize for K8sClient {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!("K8sClient serialization is not meaningful; remove this impl if unused")
}
}
impl std::fmt::Debug for K8sClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"K8sClient {{ namespace: {}, dry_run: {} }}",
self.client.default_namespace(),
self.dry_run,
))
}
}
impl K8sClient {
/// Create a client, reading `DRY_RUN` from the environment.
pub fn new(client: Client) -> Self {
Self {
dry_run: read_dry_run_from_env(),
client,
k8s_distribution: Arc::new(OnceCell::new()),
discovery: Arc::new(OnceCell::new()),
}
}
/// Create a client that always operates in dry-run mode, regardless of
/// the environment variable.
pub fn new_dry_run(client: Client) -> Self {
Self {
dry_run: true,
..Self::new(client)
}
}
/// Returns `true` if this client is operating in dry-run mode.
pub fn is_dry_run(&self) -> bool {
self.dry_run
}
pub async fn try_default() -> Result<Self, Error> {
Ok(Self::new(Client::try_default().await?))
}
pub async fn from_kubeconfig(path: &str) -> Option<Self> {
Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await
}
pub async fn from_kubeconfig_with_context(path: &str, context: Option<String>) -> Option<Self> {
let mut opts = KubeConfigOptions::default();
opts.context = context;
Self::from_kubeconfig_with_opts(path, &opts).await
}
pub async fn from_kubeconfig_with_opts(path: &str, opts: &KubeConfigOptions) -> Option<Self> {
let k = match Kubeconfig::read_from(path) {
Ok(k) => k,
Err(e) => {
error!("Failed to load kubeconfig from {path}: {e}");
return None;
}
};
Some(Self::new(
Client::try_from(Config::from_custom_kubeconfig(k, opts).await.unwrap()).unwrap(),
))
}
}

View File

@@ -1,83 +0,0 @@
use std::time::Duration;
use kube::{Discovery, Error};
use log::{debug, error, info, trace, warn};
use tokio::sync::Mutex;
use tokio_retry::{Retry, strategy::ExponentialBackoff};
use crate::client::K8sClient;
use crate::types::KubernetesDistribution;
impl K8sClient {
pub async fn get_apiserver_version(
&self,
) -> Result<k8s_openapi::apimachinery::pkg::version::Info, Error> {
self.client.clone().apiserver_version().await
}
/// Runs (and caches) Kubernetes API discovery with exponential-backoff retries.
pub async fn discovery(&self) -> Result<&Discovery, Error> {
let retry_strategy = ExponentialBackoff::from_millis(1000)
.max_delay(Duration::from_secs(32))
.take(6);
let attempt = Mutex::new(0u32);
Retry::spawn(retry_strategy, || async {
let mut n = attempt.lock().await;
*n += 1;
match self
.discovery
.get_or_try_init(async || {
debug!("Running Kubernetes API discovery (attempt {})", *n);
let d = Discovery::new(self.client.clone()).run().await?;
debug!("Kubernetes API discovery completed");
Ok(d)
})
.await
{
Ok(d) => Ok(d),
Err(e) => {
warn!("Kubernetes API discovery failed (attempt {}): {}", *n, e);
Err(e)
}
}
})
.await
.map_err(|e| {
error!("Kubernetes API discovery failed after all retries: {}", e);
e
})
}
/// Detect which Kubernetes distribution is running. Result is cached for
/// the lifetime of the client.
pub async fn get_k8s_distribution(&self) -> Result<KubernetesDistribution, Error> {
self.k8s_distribution
.get_or_try_init(async || {
debug!("Detecting Kubernetes distribution");
let api_groups = self.client.list_api_groups().await?;
trace!("list_api_groups: {:?}", api_groups);
let version = self.get_apiserver_version().await?;
if api_groups
.groups
.iter()
.any(|g| g.name == "project.openshift.io")
{
info!("Detected distribution: OpenshiftFamily");
return Ok(KubernetesDistribution::OpenshiftFamily);
}
if version.git_version.contains("k3s") {
info!("Detected distribution: K3sFamily");
return Ok(KubernetesDistribution::K3sFamily);
}
info!("Distribution not identified, using Default");
Ok(KubernetesDistribution::Default)
})
.await
.cloned()
}
}

View File

@@ -1,13 +0,0 @@
pub mod apply;
pub mod bundle;
pub mod client;
pub mod config;
pub mod discovery;
pub mod helper;
pub mod node;
pub mod pod;
pub mod resources;
pub mod types;
pub use client::K8sClient;
pub use types::{DrainOptions, KubernetesDistribution, NodeFile, ScopeResolver, WriteMode};

View File

@@ -1,3 +0,0 @@
fn main() {
println!("Hello, world!");
}

View File

@@ -1,722 +0,0 @@
use std::collections::BTreeMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use k8s_openapi::api::core::v1::{
ConfigMap, ConfigMapVolumeSource, Node, Pod, Volume, VolumeMount,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
Error,
api::{Api, DeleteParams, EvictParams, ListParams, PostParams},
core::ErrorResponse,
error::DiscoveryError,
};
use log::{debug, error, info, warn};
use tokio::time::sleep;
use crate::client::K8sClient;
use crate::helper::{self, PrivilegedPodConfig};
use crate::types::{DrainOptions, NodeFile};
impl K8sClient {
pub async fn cordon_node(&self, node_name: &str) -> Result<(), Error> {
Api::<Node>::all(self.client.clone())
.cordon(node_name)
.await?;
Ok(())
}
pub async fn uncordon_node(&self, node_name: &str) -> Result<(), Error> {
Api::<Node>::all(self.client.clone())
.uncordon(node_name)
.await?;
Ok(())
}
pub async fn wait_for_node_ready(&self, node_name: &str) -> Result<(), Error> {
self.wait_for_node_ready_with_timeout(node_name, Duration::from_secs(600))
.await
}
async fn wait_for_node_ready_with_timeout(
&self,
node_name: &str,
timeout: Duration,
) -> Result<(), Error> {
let api: Api<Node> = Api::all(self.client.clone());
let start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not become Ready within {timeout:?}"
))));
}
match api.get(node_name).await {
Ok(node) => {
if node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.any(|c| c.type_ == "Ready" && c.status == "True")
})
.unwrap_or(false)
{
debug!("Node '{node_name}' is Ready");
return Ok(());
}
}
Err(e) => debug!("Error polling node '{node_name}': {e}"),
}
sleep(poll).await;
}
}
async fn wait_for_node_not_ready(
&self,
node_name: &str,
timeout: Duration,
) -> Result<(), Error> {
let api: Api<Node> = Api::all(self.client.clone());
let start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not become NotReady within {timeout:?}"
))));
}
match api.get(node_name).await {
Ok(node) => {
let is_ready = node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.any(|c| c.type_ == "Ready" && c.status == "True")
})
.unwrap_or(false);
if !is_ready {
debug!("Node '{node_name}' is NotReady");
return Ok(());
}
}
Err(e) => debug!("Error polling node '{node_name}': {e}"),
}
sleep(poll).await;
}
}
async fn list_pods_on_node(&self, node_name: &str) -> Result<Vec<Pod>, Error> {
let api: Api<Pod> = Api::all(self.client.clone());
Ok(api
.list(&ListParams::default().fields(&format!("spec.nodeName={node_name}")))
.await?
.items)
}
fn is_mirror_pod(pod: &Pod) -> bool {
pod.metadata
.annotations
.as_ref()
.map(|a| a.contains_key("kubernetes.io/config.mirror"))
.unwrap_or(false)
}
fn is_daemonset_pod(pod: &Pod) -> bool {
pod.metadata
.owner_references
.as_ref()
.map(|refs| refs.iter().any(|r| r.kind == "DaemonSet"))
.unwrap_or(false)
}
fn has_emptydir_volume(pod: &Pod) -> bool {
pod.spec
.as_ref()
.and_then(|s| s.volumes.as_ref())
.map(|vols| vols.iter().any(|v| v.empty_dir.is_some()))
.unwrap_or(false)
}
fn is_completed_pod(pod: &Pod) -> bool {
pod.status
.as_ref()
.and_then(|s| s.phase.as_deref())
.map(|phase| phase == "Succeeded" || phase == "Failed")
.unwrap_or(false)
}
fn classify_pods_for_drain(
pods: &[Pod],
options: &DrainOptions,
) -> Result<(Vec<Pod>, Vec<String>), String> {
let mut evictable = Vec::new();
let mut skipped = Vec::new();
let mut blocking = Vec::new();
for pod in pods {
let name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
let ns = pod.metadata.namespace.as_deref().unwrap_or("<unknown>");
let qualified = format!("{ns}/{name}");
if Self::is_mirror_pod(pod) {
skipped.push(format!("{qualified} (mirror pod)"));
continue;
}
if Self::is_completed_pod(pod) {
skipped.push(format!("{qualified} (completed)"));
continue;
}
if Self::is_daemonset_pod(pod) {
if options.ignore_daemonsets {
skipped.push(format!("{qualified} (DaemonSet-managed)"));
} else {
blocking.push(format!(
"{qualified} is managed by a DaemonSet (set ignore_daemonsets to skip)"
));
}
continue;
}
if Self::has_emptydir_volume(pod) && !options.delete_emptydir_data {
blocking.push(format!(
"{qualified} uses emptyDir volumes (set delete_emptydir_data to allow eviction)"
));
continue;
}
evictable.push(pod.clone());
}
if !blocking.is_empty() {
return Err(format!(
"Cannot drain node — the following pods block eviction:\n - {}",
blocking.join("\n - ")
));
}
Ok((evictable, skipped))
}
async fn evict_pod(&self, pod: &Pod) -> Result<(), Error> {
let name = pod.metadata.name.as_deref().unwrap_or_default();
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
debug!("Evicting pod {ns}/{name}");
Api::<Pod>::namespaced(self.client.clone(), ns)
.evict(name, &EvictParams::default())
.await
.map(|_| ())
}
/// Drains a node: cordon → classify → evict & wait.
pub async fn drain_node(&self, node_name: &str, options: &DrainOptions) -> Result<(), Error> {
debug!("Cordoning '{node_name}'");
self.cordon_node(node_name).await?;
let pods = self.list_pods_on_node(node_name).await?;
debug!("Found {} pod(s) on '{node_name}'", pods.len());
let (evictable, skipped) =
Self::classify_pods_for_drain(&pods, options).map_err(|msg| {
error!("{msg}");
Error::Discovery(DiscoveryError::MissingResource(msg))
})?;
for s in &skipped {
info!("Skipping pod: {s}");
}
if evictable.is_empty() {
info!("No pods to evict on '{node_name}'");
return Ok(());
}
info!("Evicting {} pod(s) from '{node_name}'", evictable.len());
let mut start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
let mut pending = evictable;
loop {
for pod in &pending {
match self.evict_pod(pod).await {
Ok(()) => {}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {}
Err(Error::Api(ErrorResponse { code: 429, .. })) => {
warn!(
"PDB blocked eviction of {}/{}; will retry",
pod.metadata.namespace.as_deref().unwrap_or(""),
pod.metadata.name.as_deref().unwrap_or("")
);
}
Err(e) => {
error!(
"Failed to evict {}/{}: {e}",
pod.metadata.namespace.as_deref().unwrap_or(""),
pod.metadata.name.as_deref().unwrap_or("")
);
return Err(e);
}
}
}
sleep(poll).await;
let mut still_present = Vec::new();
for pod in pending {
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
let name = pod.metadata.name.as_deref().unwrap_or_default();
match self.get_pod(name, Some(ns)).await? {
Some(_) => still_present.push(pod),
None => debug!("Pod {ns}/{name} evicted"),
}
}
pending = still_present;
if pending.is_empty() {
break;
}
if start.elapsed() > options.timeout {
match helper::prompt_drain_timeout_action(
node_name,
pending.len(),
options.timeout,
)? {
helper::DrainTimeoutAction::Accept => break,
helper::DrainTimeoutAction::Retry => {
start = tokio::time::Instant::now();
continue;
}
helper::DrainTimeoutAction::Abort => {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Drain aborted. {} pod(s) remaining on '{node_name}'",
pending.len()
))));
}
}
}
debug!("Waiting for {} pod(s) on '{node_name}'", pending.len());
}
debug!("'{node_name}' drained successfully");
Ok(())
}
/// Safely reboots a node: drain → reboot → wait for Ready → uncordon.
pub async fn reboot_node(
&self,
node_name: &str,
drain_options: &DrainOptions,
timeout: Duration,
) -> Result<(), Error> {
info!("Starting reboot for '{node_name}'");
let node_api: Api<Node> = Api::all(self.client.clone());
let boot_id_before = node_api
.get(node_name)
.await?
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|ni| ni.boot_id.clone())
.ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' has no boot_id in status"
)))
})?;
info!("Draining '{node_name}'");
self.drain_node(node_name, drain_options).await?;
let start = tokio::time::Instant::now();
info!("Scheduling reboot for '{node_name}'");
let reboot_cmd =
"echo rebooting ; nohup bash -c 'sleep 5 && nsenter -t 1 -m -- systemctl reboot'";
match self
.run_privileged_command_on_node(node_name, reboot_cmd)
.await
{
Ok(_) => debug!("Reboot command dispatched"),
Err(e) => debug!("Reboot command error (expected if node began shutdown): {e}"),
}
info!("Waiting for '{node_name}' to begin shutdown");
self.wait_for_node_not_ready(node_name, timeout.saturating_sub(start.elapsed()))
.await?;
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timeout during reboot of '{node_name}' (shutdown phase)"
))));
}
info!("Waiting for '{node_name}' to come back online");
self.wait_for_node_ready_with_timeout(node_name, timeout.saturating_sub(start.elapsed()))
.await?;
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timeout during reboot of '{node_name}' (ready phase)"
))));
}
let boot_id_after = node_api
.get(node_name)
.await?
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|ni| ni.boot_id.clone())
.ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' has no boot_id after reboot"
)))
})?;
if boot_id_before == boot_id_after {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not actually reboot (boot_id unchanged: {boot_id_before})"
))));
}
info!("'{node_name}' rebooted ({boot_id_before} → {boot_id_after})");
self.uncordon_node(node_name).await?;
info!("'{node_name}' reboot complete ({:?})", start.elapsed());
Ok(())
}
/// Write a set of files to a node's filesystem via a privileged ephemeral pod.
pub async fn write_files_to_node(
&self,
node_name: &str,
files: &[NodeFile],
) -> Result<String, Error> {
let ns = self.client.default_namespace();
let suffix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let name = format!("harmony-k8s-writer-{suffix}");
debug!("Writing {} file(s) to '{node_name}'", files.len());
let mut data = BTreeMap::new();
let mut script = String::from("set -e\n");
for (i, file) in files.iter().enumerate() {
let key = format!("f{i}");
data.insert(key.clone(), file.content.clone());
script.push_str(&format!("mkdir -p \"$(dirname \"/host{}\")\"\n", file.path));
script.push_str(&format!("cp \"/payload/{key}\" \"/host{}\"\n", file.path));
script.push_str(&format!("chmod {:o} \"/host{}\"\n", file.mode, file.path));
}
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
let cm_api: Api<ConfigMap> = Api::namespaced(self.client.clone(), ns);
cm_api.create(&PostParams::default(), &cm).await?;
debug!("Created ConfigMap '{name}'");
let (host_vol, host_mount) = helper::host_root_volume();
let payload_vol = Volume {
name: "payload".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: name.clone(),
..Default::default()
}),
..Default::default()
};
let payload_mount = VolumeMount {
name: "payload".to_string(),
mount_path: "/payload".to_string(),
..Default::default()
};
let bundle = helper::build_privileged_bundle(
PrivilegedPodConfig {
name: name.clone(),
namespace: ns.to_string(),
node_name: node_name.to_string(),
container_name: "writer".to_string(),
command: vec!["/bin/bash".to_string(), "-c".to_string(), script],
volumes: vec![payload_vol, host_vol],
volume_mounts: vec![payload_mount, host_mount],
host_pid: false,
host_network: false,
},
&self.get_k8s_distribution().await?,
);
bundle.apply(self).await?;
debug!("Created privileged pod bundle '{name}'");
let result = self.wait_for_pod_completion(&name, ns).await;
debug!("Cleaning up '{name}'");
let _ = bundle.delete(self).await;
let _ = cm_api.delete(&name, &DeleteParams::default()).await;
result
}
/// Run a privileged command on a node via an ephemeral pod.
pub async fn run_privileged_command_on_node(
&self,
node_name: &str,
command: &str,
) -> Result<String, Error> {
let namespace = self.client.default_namespace();
let suffix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let name = format!("harmony-k8s-cmd-{suffix}");
debug!("Running privileged command on '{node_name}': {command}");
let (host_vol, host_mount) = helper::host_root_volume();
let bundle = helper::build_privileged_bundle(
PrivilegedPodConfig {
name: name.clone(),
namespace: namespace.to_string(),
node_name: node_name.to_string(),
container_name: "runner".to_string(),
command: vec![
"/bin/bash".to_string(),
"-c".to_string(),
command.to_string(),
],
volumes: vec![host_vol],
volume_mounts: vec![host_mount],
host_pid: true,
host_network: true,
},
&self.get_k8s_distribution().await?,
);
bundle.apply(self).await?;
debug!("Privileged pod '{name}' created");
let result = self.wait_for_pod_completion(&name, namespace).await;
debug!("Cleaning up '{name}'");
let _ = bundle.delete(self).await;
result
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, PodSpec, PodStatus, Volume};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
use super::*;
fn base_pod(name: &str, ns: &str) -> Pod {
Pod {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: Some(PodSpec::default()),
status: Some(PodStatus {
phase: Some("Running".to_string()),
..Default::default()
}),
}
}
fn mirror_pod(name: &str, ns: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.metadata.annotations = Some(std::collections::BTreeMap::from([(
"kubernetes.io/config.mirror".to_string(),
"abc123".to_string(),
)]));
pod
}
fn daemonset_pod(name: &str, ns: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.metadata.owner_references = Some(vec![OwnerReference {
api_version: "apps/v1".to_string(),
kind: "DaemonSet".to_string(),
name: "some-ds".to_string(),
uid: "uid-ds".to_string(),
..Default::default()
}]);
pod
}
fn emptydir_pod(name: &str, ns: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.spec = Some(PodSpec {
volumes: Some(vec![Volume {
name: "scratch".to_string(),
empty_dir: Some(EmptyDirVolumeSource::default()),
..Default::default()
}]),
..Default::default()
});
pod
}
fn completed_pod(name: &str, ns: &str, phase: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.status = Some(PodStatus {
phase: Some(phase.to_string()),
..Default::default()
});
pod
}
fn default_opts() -> DrainOptions {
DrainOptions::default()
}
// All test bodies are identical to the original — only the module path changed.
#[test]
fn empty_pod_list_returns_empty_vecs() {
let (e, s) = K8sClient::classify_pods_for_drain(&[], &default_opts()).unwrap();
assert!(e.is_empty());
assert!(s.is_empty());
}
#[test]
fn normal_pod_is_evictable() {
let pods = vec![base_pod("web", "default")];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
assert_eq!(e.len(), 1);
assert!(s.is_empty());
}
#[test]
fn mirror_pod_is_skipped() {
let pods = vec![mirror_pod("kube-apiserver", "kube-system")];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
assert!(e.is_empty());
assert!(s[0].contains("mirror pod"));
}
#[test]
fn completed_pods_are_skipped() {
for phase in ["Succeeded", "Failed"] {
let pods = vec![completed_pod("job", "batch", phase)];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
assert!(e.is_empty());
assert!(s[0].contains("completed"));
}
}
#[test]
fn daemonset_skipped_when_ignored() {
let pods = vec![daemonset_pod("fluentd", "logging")];
let opts = DrainOptions {
ignore_daemonsets: true,
..default_opts()
};
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
assert!(e.is_empty());
assert!(s[0].contains("DaemonSet-managed"));
}
#[test]
fn daemonset_blocks_when_not_ignored() {
let pods = vec![daemonset_pod("fluentd", "logging")];
let opts = DrainOptions {
ignore_daemonsets: false,
..default_opts()
};
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
assert!(err.contains("DaemonSet") && err.contains("logging/fluentd"));
}
#[test]
fn emptydir_blocks_without_flag() {
let pods = vec![emptydir_pod("cache", "default")];
let opts = DrainOptions {
delete_emptydir_data: false,
..default_opts()
};
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
assert!(err.contains("emptyDir") && err.contains("default/cache"));
}
#[test]
fn emptydir_evictable_with_flag() {
let pods = vec![emptydir_pod("cache", "default")];
let opts = DrainOptions {
delete_emptydir_data: true,
..default_opts()
};
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
assert_eq!(e.len(), 1);
assert!(s.is_empty());
}
#[test]
fn multiple_blocking_all_reported() {
let pods = vec![daemonset_pod("ds", "ns1"), emptydir_pod("ed", "ns2")];
let opts = DrainOptions {
ignore_daemonsets: false,
delete_emptydir_data: false,
..default_opts()
};
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
assert!(err.contains("ns1/ds") && err.contains("ns2/ed"));
}
#[test]
fn mixed_pods_classified_correctly() {
let pods = vec![
base_pod("web", "default"),
mirror_pod("kube-apiserver", "kube-system"),
daemonset_pod("fluentd", "logging"),
completed_pod("job", "batch", "Succeeded"),
base_pod("api", "default"),
];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
let names: Vec<&str> = e
.iter()
.map(|p| p.metadata.name.as_deref().unwrap())
.collect();
assert_eq!(names, vec!["web", "api"]);
assert_eq!(s.len(), 3);
}
#[test]
fn mirror_checked_before_completed() {
let mut pod = mirror_pod("static-etcd", "kube-system");
pod.status = Some(PodStatus {
phase: Some("Succeeded".to_string()),
..Default::default()
});
let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
assert!(s[0].contains("mirror pod"), "got: {}", s[0]);
}
#[test]
fn completed_checked_before_daemonset() {
let mut pod = daemonset_pod("collector", "monitoring");
pod.status = Some(PodStatus {
phase: Some("Failed".to_string()),
..Default::default()
});
let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
assert!(s[0].contains("completed"), "got: {}", s[0]);
}
}

View File

@@ -1,193 +0,0 @@
use std::time::Duration;
use k8s_openapi::api::core::v1::Pod;
use kube::{
Error,
api::{Api, AttachParams, ListParams},
error::DiscoveryError,
runtime::reflector::Lookup,
};
use log::debug;
use tokio::io::AsyncReadExt;
use tokio::time::sleep;
use crate::client::K8sClient;
impl K8sClient {
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
api.get_opt(name).await
}
pub async fn wait_for_pod_ready(
&self,
pod_name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let mut elapsed = 0u64;
let interval = 5u64;
let timeout_secs = 120u64;
loop {
if let Some(p) = self.get_pod(pod_name, namespace).await? {
if let Some(phase) = p.status.and_then(|s| s.phase) {
if phase.to_lowercase() == "running" {
return Ok(());
}
}
}
if elapsed >= timeout_secs {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Pod '{}' in '{}' did not become ready within {timeout_secs}s",
pod_name,
namespace.unwrap_or("<default>"),
))));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
}
/// Polls a pod until it reaches `Succeeded` or `Failed`, then returns its
/// logs. Used internally by node operations.
pub(crate) async fn wait_for_pod_completion(
&self,
name: &str,
namespace: &str,
) -> Result<String, Error> {
let api: Api<Pod> = Api::namespaced(self.client.clone(), namespace);
let poll_interval = Duration::from_secs(2);
for _ in 0..60 {
sleep(poll_interval).await;
let p = api.get(name).await?;
match p.status.and_then(|s| s.phase).as_deref() {
Some("Succeeded") => {
let logs = api
.logs(name, &Default::default())
.await
.unwrap_or_default();
debug!("Pod {namespace}/{name} succeeded. Logs: {logs}");
return Ok(logs);
}
Some("Failed") => {
let logs = api
.logs(name, &Default::default())
.await
.unwrap_or_default();
debug!("Pod {namespace}/{name} failed. Logs: {logs}");
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Pod '{name}' failed.\n{logs}"
))));
}
_ => {}
}
}
Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timed out waiting for pod '{name}'"
))))
}
/// Execute a command in the first pod matching `{label}={name}`.
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
let pod_list = api
.list(&ListParams::default().labels(&format!("{label}={name}")))
.await
.expect("Failed to list pods");
let pod_name = pod_list
.items
.first()
.expect("No matching pod")
.name()
.expect("Pod has no name")
.into_owned();
match api
.exec(
&pod_name,
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await
{
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("No status handle")
.await
.expect("Status channel closed");
if let Some(s) = status.status {
let mut buf = String::new();
if let Some(mut stdout) = process.stdout() {
stdout
.read_to_string(&mut buf)
.await
.map_err(|e| format!("Failed to read stdout: {e}"))?;
}
debug!("exec status: {} - {:?}", s, status.details);
if s == "Success" { Ok(buf) } else { Err(s) }
} else {
Err("No inner status from pod exec".to_string())
}
}
}
}
/// Execute a command in the first pod matching
/// `app.kubernetes.io/name={name}`.
pub async fn exec_app(
&self,
name: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<(), String> {
let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
let pod_list = api
.list(&ListParams::default().labels(&format!("app.kubernetes.io/name={name}")))
.await
.expect("Failed to list pods");
let pod_name = pod_list
.items
.first()
.expect("No matching pod")
.name()
.expect("Pod has no name")
.into_owned();
match api.exec(&pod_name, command, &AttachParams::default()).await {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("No status handle")
.await
.expect("Status channel closed");
if let Some(s) = status.status {
debug!("exec status: {} - {:?}", s, status.details);
if s == "Success" { Ok(()) } else { Err(s) }
} else {
Err("No inner status from pod exec".to_string())
}
}
}
}
}

View File

@@ -1,316 +0,0 @@
use std::collections::HashMap;
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{Node, ServiceAccount},
};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::ApiResource;
use kube::{
Error, Resource,
api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList},
runtime::conditions,
runtime::wait::await_condition,
};
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::time::Duration;
use crate::client::K8sClient;
use crate::types::ScopeResolver;
impl K8sClient {
pub async fn has_healthy_deployment_with_label(
&self,
namespace: &str,
label_selector: &str,
) -> Result<bool, Error> {
let api: Api<Deployment> = Api::namespaced(self.client.clone(), namespace);
let list = api
.list(&ListParams::default().labels(label_selector))
.await?;
for d in list.items {
let available = d
.status
.as_ref()
.and_then(|s| s.available_replicas)
.unwrap_or(0);
if available > 0 {
return Ok(true);
}
if let Some(conds) = d.status.as_ref().and_then(|s| s.conditions.as_ref()) {
if conds
.iter()
.any(|c| c.type_ == "Available" && c.status == "True")
{
return Ok(true);
}
}
}
Ok(false)
}
pub async fn list_namespaces_with_healthy_deployments(
&self,
label_selector: &str,
) -> Result<Vec<String>, Error> {
let api: Api<Deployment> = Api::all(self.client.clone());
let list = api
.list(&ListParams::default().labels(label_selector))
.await?;
let mut healthy_ns: HashMap<String, bool> = HashMap::new();
for d in list.items {
let ns = match d.metadata.namespace.clone() {
Some(n) => n,
None => continue,
};
let available = d
.status
.as_ref()
.and_then(|s| s.available_replicas)
.unwrap_or(0);
let is_healthy = if available > 0 {
true
} else {
d.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|c| {
c.iter()
.any(|c| c.type_ == "Available" && c.status == "True")
})
.unwrap_or(false)
};
if is_healthy {
healthy_ns.insert(ns, true);
}
}
Ok(healthy_ns.into_keys().collect())
}
pub async fn get_controller_service_account_name(
&self,
ns: &str,
) -> Result<Option<String>, Error> {
let api: Api<Deployment> = Api::namespaced(self.client.clone(), ns);
let list = api
.list(&ListParams::default().labels("app.kubernetes.io/component=controller"))
.await?;
if let Some(dep) = list.items.first() {
if let Some(sa) = dep
.spec
.as_ref()
.and_then(|s| s.template.spec.as_ref())
.and_then(|s| s.service_account_name.clone())
{
return Ok(Some(sa));
}
}
Ok(None)
}
pub async fn list_clusterrolebindings_json(&self) -> Result<Vec<Value>, Error> {
let gvk = GroupVersionKind::gvk("rbac.authorization.k8s.io", "v1", "ClusterRoleBinding");
let ar = ApiResource::from_gvk(&gvk);
let api: Api<DynamicObject> = Api::all_with(self.client.clone(), &ar);
let list = api.list(&ListParams::default()).await?;
Ok(list
.items
.into_iter()
.map(|o| serde_json::to_value(&o).unwrap_or(Value::Null))
.collect())
}
pub async fn is_service_account_cluster_wide(&self, sa: &str, ns: &str) -> Result<bool, Error> {
let sa_user = format!("system:serviceaccount:{ns}:{sa}");
for crb in self.list_clusterrolebindings_json().await? {
if let Some(subjects) = crb.get("subjects").and_then(|s| s.as_array()) {
for subj in subjects {
let kind = subj.get("kind").and_then(|v| v.as_str()).unwrap_or("");
let name = subj.get("name").and_then(|v| v.as_str()).unwrap_or("");
let subj_ns = subj.get("namespace").and_then(|v| v.as_str()).unwrap_or("");
if (kind == "ServiceAccount" && name == sa && subj_ns == ns)
|| (kind == "User" && name == sa_user)
{
return Ok(true);
}
}
}
}
Ok(false)
}
pub async fn has_crd(&self, name: &str) -> Result<bool, Error> {
let api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
let crds = api
.list(&ListParams::default().fields(&format!("metadata.name={name}")))
.await?;
Ok(!crds.items.is_empty())
}
pub async fn service_account_api(&self, namespace: &str) -> Api<ServiceAccount> {
Api::namespaced(self.client.clone(), namespace)
}
pub async fn get_resource_json_value(
&self,
name: &str,
namespace: Option<&str>,
gvk: &GroupVersionKind,
) -> Result<DynamicObject, Error> {
let ar = ApiResource::from_gvk(gvk);
let api: Api<DynamicObject> = match namespace {
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
None => Api::default_namespaced_with(self.client.clone(), &ar),
};
api.get(name).await
}
pub async fn get_secret_json_value(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<DynamicObject, Error> {
self.get_resource_json_value(
name,
namespace,
&GroupVersionKind {
group: String::new(),
version: "v1".to_string(),
kind: "Secret".to_string(),
},
)
.await
}
pub async fn get_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let api: Api<Deployment> = match namespace {
Some(ns) => {
debug!("Getting namespaced deployment '{name}' in '{ns}'");
Api::namespaced(self.client.clone(), ns)
}
None => {
debug!("Getting deployment '{name}' in default namespace");
Api::default_namespaced(self.client.clone())
}
};
api.get_opt(name).await
}
pub async fn scale_deployment(
&self,
name: &str,
namespace: Option<&str>,
replicas: u32,
) -> Result<(), Error> {
let api: Api<Deployment> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
use kube::api::{Patch, PatchParams};
use serde_json::json;
let patch = json!({ "spec": { "replicas": replicas } });
api.patch_scale(name, &PatchParams::default(), &Patch::Merge(&patch))
.await?;
Ok(())
}
pub async fn delete_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let api: Api<Deployment> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
api.delete(name, &kube::api::DeleteParams::default())
.await?;
Ok(())
}
pub async fn wait_until_deployment_ready(
&self,
name: &str,
namespace: Option<&str>,
timeout: Option<Duration>,
) -> Result<(), String> {
let api: Api<Deployment> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
let timeout = timeout.unwrap_or(Duration::from_secs(120));
let establish = await_condition(api, name, conditions::is_deployment_completed());
tokio::time::timeout(timeout, establish)
.await
.map(|_| ())
.map_err(|_| "Timed out waiting for deployment".to_string())
}
/// Gets a single named resource, using the correct API scope for `K`.
pub async fn get_resource<K>(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ScopeResolver<K>,
<K as Resource>::DynamicType: Default,
{
let api: Api<K> =
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
api.get_opt(name).await
}
pub async fn list_resources<K>(
&self,
namespace: Option<&str>,
list_params: Option<ListParams>,
) -> Result<ObjectList<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ScopeResolver<K>,
<K as Resource>::DynamicType: Default,
{
let api: Api<K> =
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
api.list(&list_params.unwrap_or_default()).await
}
pub async fn list_all_resources_with_labels<K>(&self, labels: &str) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::DynamicType: Default,
{
Api::<K>::all(self.client.clone())
.list(&ListParams::default().labels(labels))
.await
.map(|l| l.items)
}
pub async fn get_all_resource_in_all_namespace<K>(&self) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ScopeResolver<K>,
<K as Resource>::DynamicType: Default,
{
Api::<K>::all(self.client.clone())
.list(&Default::default())
.await
.map(|l| l.items)
}
pub async fn get_nodes(
&self,
list_params: Option<ListParams>,
) -> Result<ObjectList<Node>, Error> {
self.list_resources(None, list_params).await
}
}

View File

@@ -1,100 +0,0 @@
use std::time::Duration;
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
use kube::{Api, Client, Resource};
use serde::Serialize;
/// Which Kubernetes distribution is running. Detected once at runtime via
/// [`crate::discovery::K8sClient::get_k8s_distribution`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum KubernetesDistribution {
Default,
OpenshiftFamily,
K3sFamily,
}
/// A file to be written to a node's filesystem.
#[derive(Debug, Clone)]
pub struct NodeFile {
/// Absolute path on the host where the file should be written.
pub path: String,
/// Content of the file.
pub content: String,
/// UNIX permissions (e.g. `0o600`).
pub mode: u32,
}
/// Options controlling the behaviour of a [`crate::K8sClient::drain_node`] operation.
#[derive(Debug, Clone)]
pub struct DrainOptions {
/// Evict pods that use `emptyDir` volumes (ephemeral data is lost).
/// Equivalent to `kubectl drain --delete-emptydir-data`.
pub delete_emptydir_data: bool,
/// Silently skip DaemonSet-managed pods instead of blocking the drain.
/// Equivalent to `kubectl drain --ignore-daemonsets`.
pub ignore_daemonsets: bool,
/// Maximum wall-clock time to wait for all evictions to complete.
pub timeout: Duration,
}
impl Default for DrainOptions {
fn default() -> Self {
Self {
delete_emptydir_data: false,
ignore_daemonsets: true,
timeout: Duration::from_secs(1),
}
}
}
impl DrainOptions {
pub fn default_ignore_daemonset_delete_emptydir_data() -> Self {
Self {
delete_emptydir_data: true,
ignore_daemonsets: true,
..Self::default()
}
}
}
/// Controls how [`crate::K8sClient::apply_with_strategy`] behaves when the
/// resource already exists (or does not).
pub enum WriteMode {
/// Server-side apply; create if absent, update if present (default).
CreateOrUpdate,
/// POST only; return an error if the resource already exists.
Create,
/// Server-side apply only; return an error if the resource does not exist.
Update,
}
// ── Scope resolution trait ───────────────────────────────────────────────────
/// Resolves the correct [`kube::Api`] for a resource type based on its scope
/// (cluster-wide vs. namespace-scoped).
pub trait ScopeResolver<K: Resource> {
fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
}
impl<K> ScopeResolver<K> for ClusterResourceScope
where
K: Resource<Scope = ClusterResourceScope>,
<K as Resource>::DynamicType: Default,
{
fn get_api(client: &Client, _ns: Option<&str>) -> Api<K> {
Api::all(client.clone())
}
}
impl<K> ScopeResolver<K> for NamespaceResourceScope
where
K: Resource<Scope = NamespaceResourceScope>,
<K as Resource>::DynamicType: Default,
{
fn get_api(client: &Client, ns: Option<&str>) -> Api<K> {
match ns {
Some(ns) => Api::namespaced(client.clone(), ns),
None => Api::default_namespaced(client.clone()),
}
}
}

View File

@@ -21,8 +21,6 @@ semver = "1.0.23"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-retry.workspace = true
tokio-util.workspace = true
derive-new.workspace = true
log.workspace = true
env_logger.workspace = true
@@ -33,7 +31,6 @@ opnsense-config-xml = { path = "../opnsense-config-xml" }
harmony_macros = { path = "../harmony_macros" }
harmony_types = { path = "../harmony_types" }
harmony_execution = { path = "../harmony_execution" }
harmony-k8s = { path = "../harmony-k8s" }
uuid.workspace = true
url.workspace = true
kube = { workspace = true, features = ["derive"] }
@@ -63,6 +60,7 @@ temp-dir = "0.1.14"
dyn-clone = "1.0.19"
similar.workspace = true
futures-util = "0.3.31"
tokio-util = "0.7.15"
strum = { version = "0.27.1", features = ["derive"] }
tempfile.workspace = true
serde_with = "3.14.0"
@@ -82,7 +80,7 @@ sqlx.workspace = true
inquire.workspace = true
brocade = { path = "../brocade" }
option-ext = "0.2.0"
rand.workspace = true
tokio-retry = "0.3.0"
[dev-dependencies]
pretty_assertions.workspace = true

View File

@@ -4,6 +4,8 @@ use std::error::Error;
use async_trait::async_trait;
use derive_new::new;
use crate::inventory::HostRole;
use super::{
data::Version, executors::ExecutorError, inventory::Inventory, topology::PreparationError,
};

View File

@@ -1,5 +1,4 @@
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use harmony_macros::ip;
use harmony_types::{
id::Id,
@@ -9,7 +8,7 @@ use harmony_types::{
use log::debug;
use log::info;
use crate::topology::{HelmCommand, PxeOptions};
use crate::topology::PxeOptions;
use crate::{data::FileContent, executors::ExecutorError, topology::node_exporter::NodeExporter};
use crate::{infra::network_manager::OpenShiftNmStateNetworkManager, topology::PortConfig};
@@ -17,12 +16,9 @@ use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
SwitchError, TftpServer, Topology,
};
use std::{
process::Command,
sync::{Arc, OnceLock},
SwitchError, TftpServer, Topology, k8s::K8sClient,
};
use std::sync::{Arc, OnceLock};
#[derive(Debug, Clone)]
pub struct HAClusterTopology {
@@ -56,30 +52,6 @@ impl Topology for HAClusterTopology {
}
}
impl HelmCommand for HAClusterTopology {
fn get_helm_command(&self) -> Command {
let mut cmd = Command::new("helm");
if let Some(k) = &self.kubeconfig {
cmd.args(["--kubeconfig", k]);
}
// FIXME we should support context anywhere there is a k8sclient
// This likely belongs in the k8sclient itself and should be extracted to a separate
// crate
//
// I feel like helm could very well be a feature of this external k8s client.
//
// Same for kustomize
//
// if let Some(c) = &self.k8s_context {
// cmd.args(["--kube-context", c]);
// }
info!("Using helm command {cmd:?}");
cmd
}
}
#[async_trait]
impl K8sclient for HAClusterTopology {
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {

View File

@@ -25,9 +25,9 @@
//!
//! ## Example
//!
//! ```
//! use harmony_k8s::{K8sClient, helper};
//! use harmony_k8s::KubernetesDistribution;
//! ```rust,no_run
//! use harmony::topology::k8s::{K8sClient, helper};
//! use harmony::topology::KubernetesDistribution;
//!
//! async fn write_network_config(client: &K8sClient, node: &str) {
//! // Create a bundle with platform-specific RBAC
@@ -56,7 +56,7 @@ use kube::{Error, Resource, ResourceExt, api::DynamicObject};
use serde::Serialize;
use serde_json;
use crate::K8sClient;
use crate::domain::topology::k8s::K8sClient;
/// A ResourceBundle represents a logical unit of work consisting of multiple
/// Kubernetes resources that should be applied or deleted together.

View File

@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use std::time::Duration;
use crate::KubernetesDistribution;
use crate::topology::KubernetesDistribution;
use super::bundle::ResourceBundle;
use super::config::PRIVILEGED_POD_IMAGE;
@@ -10,10 +10,8 @@ use k8s_openapi::api::core::v1::{
};
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::DynamicObject;
use kube::error::DiscoveryError;
use log::{debug, error, info, warn};
use serde::de::DeserializeOwned;
#[derive(Debug)]
pub struct PrivilegedPodConfig {
@@ -133,9 +131,9 @@ pub fn host_root_volume() -> (Volume, VolumeMount) {
///
/// # Example
///
/// ```
/// use harmony_k8s::helper::{build_privileged_bundle, PrivilegedPodConfig};
/// use harmony_k8s::KubernetesDistribution;
/// ```rust,no_run
/// # use harmony::topology::k8s::helper::{build_privileged_bundle, PrivilegedPodConfig};
/// # use harmony::topology::KubernetesDistribution;
/// let bundle = build_privileged_bundle(
/// PrivilegedPodConfig {
/// name: "network-setup".to_string(),
@@ -281,16 +279,6 @@ pub fn prompt_drain_timeout_action(
}
}
/// JSON round-trip: DynamicObject → K
///
/// Safe because the DynamicObject was produced by the apiserver from a
/// payload that was originally serialized from K, so the schema is identical.
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
serde_json::to_value(obj)
.and_then(serde_json::from_value)
.map_err(kube::Error::SerdeError)
}
#[cfg(test)]
mod tests {
use super::*;

File diff suppressed because it is too large Load Diff

View File

@@ -1,14 +1,13 @@
use std::{collections::BTreeMap, process::Command, sync::Arc};
use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose};
use harmony_k8s::{K8sClient, KubernetesDistribution};
use harmony_types::rfc1123::Rfc1123Name;
use k8s_openapi::api::{
core::v1::{Pod, Secret},
rbac::v1::{ClusterRoleBinding, RoleRef, Subject},
};
use kube::api::{GroupVersionKind, ObjectMeta};
use kube::api::{DynamicObject, GroupVersionKind, ObjectMeta};
use log::{debug, info, trace, warn};
use serde::Serialize;
use tokio::sync::OnceCell;
@@ -29,7 +28,28 @@ use crate::{
score_cert_management::CertificateManagementScore,
},
k3d::K3DInstallationScore,
k8s::ingress::{K8sIngressScore, PathType},
monitoring::{
grafana::{grafana::Grafana, helm::helm_grafana::grafana_helm_chart_score},
kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus,
crd_grafana::{
Grafana as GrafanaCRD, GrafanaCom, GrafanaDashboard,
GrafanaDashboardDatasource, GrafanaDashboardSpec, GrafanaDatasource,
GrafanaDatasourceConfig, GrafanaDatasourceJsonData,
GrafanaDatasourceSecureJsonData, GrafanaDatasourceSpec, GrafanaSpec,
},
crd_prometheuses::LabelSelector,
prometheus_operator::prometheus_operator_helm_chart_score,
rhob_alertmanager_config::RHOBObservability,
service_monitor::ServiceMonitor,
},
},
okd::{crd::ingresses_config::Ingress as IngressResource, route::OKDTlsPassthroughScore},
prometheus::{
k8s_prometheus_alerting_score::K8sPrometheusCRDAlertingScore,
prometheus::PrometheusMonitoring, rhob_alerting_score::RHOBAlertingScore,
},
},
score::Score,
topology::{TlsRoute, TlsRouter, ingress::Ingress},
@@ -38,6 +58,8 @@ use crate::{
use super::super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,
oberservability::monitoring::AlertReceiver,
tenant::{
TenantConfig, TenantManager,
k8s::K8sTenantManager,
@@ -54,6 +76,13 @@ struct K8sState {
message: String,
}
#[derive(Debug, Clone, Serialize)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
@@ -144,6 +173,216 @@ impl TlsRouter for K8sAnywhereTopology {
}
}
#[async_trait]
impl Grafana for K8sAnywhereTopology {
async fn ensure_grafana_operator(
&self,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
debug!("ensure grafana operator");
let client = self.k8s_client().await.unwrap();
let grafana_gvk = GroupVersionKind {
group: "grafana.integreatly.org".to_string(),
version: "v1beta1".to_string(),
kind: "Grafana".to_string(),
};
let name = "grafanas.grafana.integreatly.org";
let ns = "grafana";
let grafana_crd = client
.get_resource_json_value(name, Some(ns), &grafana_gvk)
.await;
match grafana_crd {
Ok(_) => {
return Ok(PreparationOutcome::Success {
details: "Found grafana CRDs in cluster".to_string(),
});
}
Err(_) => {
return self
.install_grafana_operator(inventory, Some("grafana"))
.await;
}
};
}
async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError> {
let ns = "grafana";
let mut label = BTreeMap::new();
label.insert("dashboards".to_string(), "grafana".to_string());
let label_selector = LabelSelector {
match_labels: label.clone(),
match_expressions: vec![],
};
let client = self.k8s_client().await?;
let grafana = self.build_grafana(ns, &label);
client.apply(&grafana, Some(ns)).await?;
//TODO change this to a ensure ready or something better than just a timeout
client
.wait_until_deployment_ready(
"grafana-grafana-deployment",
Some("grafana"),
Some(Duration::from_secs(30)),
)
.await?;
let sa_name = "grafana-grafana-sa";
let token_secret_name = "grafana-sa-token-secret";
let sa_token_secret = self.build_sa_token_secret(token_secret_name, sa_name, ns);
client.apply(&sa_token_secret, Some(ns)).await?;
let secret_gvk = GroupVersionKind {
group: "".to_string(),
version: "v1".to_string(),
kind: "Secret".to_string(),
};
let secret = client
.get_resource_json_value(token_secret_name, Some(ns), &secret_gvk)
.await?;
let token = format!(
"Bearer {}",
self.extract_and_normalize_token(&secret).unwrap()
);
debug!("creating grafana clusterrole binding");
let clusterrolebinding =
self.build_cluster_rolebinding(sa_name, "cluster-monitoring-view", ns);
client.apply(&clusterrolebinding, Some(ns)).await?;
debug!("creating grafana datasource crd");
let thanos_url = format!(
"https://{}",
self.get_domain("thanos-querier-openshift-monitoring")
.await
.unwrap()
);
let thanos_openshift_datasource = self.build_grafana_datasource(
"thanos-openshift-monitoring",
ns,
&label_selector,
&thanos_url,
&token,
);
client.apply(&thanos_openshift_datasource, Some(ns)).await?;
debug!("creating grafana dashboard crd");
let dashboard = self.build_grafana_dashboard(ns, &label_selector);
client.apply(&dashboard, Some(ns)).await?;
debug!("creating grafana ingress");
let grafana_ingress = self.build_grafana_ingress(ns).await;
grafana_ingress
.interpret(&Inventory::empty(), self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
Ok(PreparationOutcome::Success {
details: "Installed grafana composants".to_string(),
})
}
}
#[async_trait]
impl PrometheusMonitoring<CRDPrometheus> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &CRDPrometheus,
_inventory: &Inventory,
_receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let client = self.k8s_client().await?;
for monitor in sender.service_monitor.iter() {
client
.apply(monitor, Some(&sender.namespace))
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
}
Ok(PreparationOutcome::Success {
details: "successfuly installed prometheus components".to_string(),
})
}
async fn ensure_prometheus_operator(
&self,
sender: &CRDPrometheus,
_inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_prometheus_operator(sender).await?;
match po_result {
PreparationOutcome::Success { details: _ } => {
debug!("Detected prometheus crds operator present in cluster.");
return Ok(po_result);
}
PreparationOutcome::Noop => {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
}
}
}
#[async_trait]
impl PrometheusMonitoring<RHOBObservability> for K8sAnywhereTopology {
async fn install_prometheus(
&self,
sender: &RHOBObservability,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_cluster_observability_operator(sender).await?;
if po_result == PreparationOutcome::Noop {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(po_result);
}
let result = self
.get_cluster_observability_operator_prometheus_application_score(
sender.clone(),
receivers,
)
.await
.interpret(inventory, self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: outcome.message,
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
async fn ensure_prometheus_operator(
&self,
sender: &RHOBObservability,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}
impl Serialize for K8sAnywhereTopology {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
@@ -348,6 +587,23 @@ impl K8sAnywhereTopology {
}
}
fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option<String> {
let token_b64 = secret
.data
.get("token")
.or_else(|| secret.data.get("data").and_then(|d| d.get("token")))
.and_then(|v| v.as_str())?;
let bytes = general_purpose::STANDARD.decode(token_b64).ok()?;
let s = String::from_utf8(bytes).ok()?;
let cleaned = s
.trim_matches(|c: char| c.is_whitespace() || c == '\0')
.to_string();
Some(cleaned)
}
pub async fn get_k8s_distribution(&self) -> Result<KubernetesDistribution, PreparationError> {
self.k8s_client()
.await?
@@ -407,6 +663,141 @@ impl K8sAnywhereTopology {
}
}
fn build_grafana_datasource(
&self,
name: &str,
ns: &str,
label_selector: &LabelSelector,
url: &str,
token: &str,
) -> GrafanaDatasource {
let mut json_data = BTreeMap::new();
json_data.insert("timeInterval".to_string(), "5s".to_string());
GrafanaDatasource {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: GrafanaDatasourceSpec {
instance_selector: label_selector.clone(),
allow_cross_namespace_import: Some(true),
values_from: None,
datasource: GrafanaDatasourceConfig {
access: "proxy".to_string(),
name: name.to_string(),
r#type: "prometheus".to_string(),
url: url.to_string(),
database: None,
json_data: Some(GrafanaDatasourceJsonData {
time_interval: Some("60s".to_string()),
http_header_name1: Some("Authorization".to_string()),
tls_skip_verify: Some(true),
oauth_pass_thru: Some(true),
}),
secure_json_data: Some(GrafanaDatasourceSecureJsonData {
http_header_value1: Some(format!("Bearer {token}")),
}),
is_default: Some(false),
editable: Some(true),
},
},
}
}
fn build_grafana_dashboard(
&self,
ns: &str,
label_selector: &LabelSelector,
) -> GrafanaDashboard {
let graf_dashboard = GrafanaDashboard {
metadata: ObjectMeta {
name: Some(format!("grafana-dashboard-{}", ns)),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: GrafanaDashboardSpec {
resync_period: Some("30s".to_string()),
instance_selector: label_selector.clone(),
datasources: Some(vec![GrafanaDashboardDatasource {
input_name: "DS_PROMETHEUS".to_string(),
datasource_name: "thanos-openshift-monitoring".to_string(),
}]),
json: None,
grafana_com: Some(GrafanaCom {
id: 17406,
revision: None,
}),
},
};
graf_dashboard
}
fn build_grafana(&self, ns: &str, labels: &BTreeMap<String, String>) -> GrafanaCRD {
let grafana = GrafanaCRD {
metadata: ObjectMeta {
name: Some(format!("grafana-{}", ns)),
namespace: Some(ns.to_string()),
labels: Some(labels.clone()),
..Default::default()
},
spec: GrafanaSpec {
config: None,
admin_user: None,
admin_password: None,
ingress: None,
persistence: None,
resources: None,
},
};
grafana
}
async fn build_grafana_ingress(&self, ns: &str) -> K8sIngressScore {
let domain = self.get_domain(&format!("grafana-{}", ns)).await.unwrap();
let name = format!("{}-grafana", ns);
let backend_service = format!("grafana-{}-service", ns);
K8sIngressScore {
name: fqdn::fqdn!(&name),
host: fqdn::fqdn!(&domain),
backend_service: fqdn::fqdn!(&backend_service),
port: 3000,
path: Some("/".to_string()),
path_type: Some(PathType::Prefix),
namespace: Some(fqdn::fqdn!(&ns)),
ingress_class_name: Some("openshift-default".to_string()),
}
}
async fn get_cluster_observability_operator_prometheus_application_score(
&self,
sender: RHOBObservability,
receivers: Option<Vec<Box<dyn AlertReceiver<RHOBObservability>>>>,
) -> RHOBAlertingScore {
RHOBAlertingScore {
sender,
receivers: receivers.unwrap_or_default(),
service_monitors: vec![],
prometheus_rules: vec![],
}
}
async fn get_k8s_prometheus_application_score(
&self,
sender: CRDPrometheus,
receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
service_monitors: Option<Vec<ServiceMonitor>>,
) -> K8sPrometheusCRDAlertingScore {
return K8sPrometheusCRDAlertingScore {
sender,
receivers: receivers.unwrap_or_default(),
service_monitors: service_monitors.unwrap_or_default(),
prometheus_rules: vec![],
};
}
async fn openshift_ingress_operator_available(&self) -> Result<(), PreparationError> {
let client = self.k8s_client().await?;
let gvk = GroupVersionKind {
@@ -572,6 +963,137 @@ impl K8sAnywhereTopology {
)),
}
}
async fn ensure_cluster_observability_operator(
&self,
sender: &RHOBObservability,
) -> Result<PreparationOutcome, PreparationError> {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i rhobs"])
.status()
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() {
match k8s_state.source {
K8sSource::LocalK3d => {
warn!(
"Installing observability operator is not supported on LocalK3d source"
);
return Ok(PreparationOutcome::Noop);
debug!("installing cluster observability operator");
todo!();
let op_score =
prometheus_operator_helm_chart_score(sender.namespace.clone());
let result = op_score.interpret(&Inventory::empty(), self).await;
return match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: "installed cluster observability operator".into(),
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(
"failed to install cluster observability operator (unknown error)".into(),
)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
};
}
K8sSource::Kubeconfig => {
debug!(
"unable to install cluster observability operator, contact cluster admin"
);
return Ok(PreparationOutcome::Noop);
}
}
} else {
warn!(
"Unable to detect k8s_state. Skipping Cluster Observability Operator install."
);
return Ok(PreparationOutcome::Noop);
}
}
debug!("Cluster Observability Operator is already present, skipping install");
Ok(PreparationOutcome::Success {
details: "cluster observability operator present in cluster".into(),
})
}
async fn ensure_prometheus_operator(
&self,
sender: &CRDPrometheus,
) -> Result<PreparationOutcome, PreparationError> {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i prometheuses"])
.status()
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() {
match k8s_state.source {
K8sSource::LocalK3d => {
debug!("installing prometheus operator");
let op_score =
prometheus_operator_helm_chart_score(sender.namespace.clone());
let result = op_score.interpret(&Inventory::empty(), self).await;
return match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: "installed prometheus operator".into(),
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(
"failed to install prometheus operator (unknown error)".into(),
)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
};
}
K8sSource::Kubeconfig => {
debug!("unable to install prometheus operator, contact cluster admin");
return Ok(PreparationOutcome::Noop);
}
}
} else {
warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
return Ok(PreparationOutcome::Noop);
}
}
debug!("Prometheus operator is already present, skipping install");
Ok(PreparationOutcome::Success {
details: "prometheus operator present in cluster".into(),
})
}
async fn install_grafana_operator(
&self,
inventory: &Inventory,
ns: Option<&str>,
) -> Result<PreparationOutcome, PreparationError> {
let namespace = ns.unwrap_or("grafana");
info!("installing grafana operator in ns {namespace}");
let tenant = self.get_k8s_tenant_manager()?.get_tenant_config().await;
let mut namespace_scope = false;
if tenant.is_some() {
namespace_scope = true;
}
let _grafana_operator_score = grafana_helm_chart_score(namespace, namespace_scope)
.interpret(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()));
Ok(PreparationOutcome::Success {
details: format!(
"Successfully installed grafana operator in ns {}",
ns.unwrap()
),
})
}
}
#[derive(Clone, Debug)]

View File

@@ -1,5 +1,4 @@
mod k8s_anywhere;
pub mod nats;
pub mod observability;
mod postgres;
pub use k8s_anywhere::*;

View File

@@ -1,147 +0,0 @@
use async_trait::async_trait;
use crate::{
inventory::Inventory,
modules::monitoring::grafana::{
grafana::Grafana,
k8s::{
score_ensure_grafana_ready::GrafanaK8sEnsureReadyScore,
score_grafana_alert_receiver::GrafanaK8sReceiverScore,
score_grafana_datasource::GrafanaK8sDatasourceScore,
score_grafana_rule::GrafanaK8sRuleScore, score_install_grafana::GrafanaK8sInstallScore,
},
},
score::Score,
topology::{
K8sAnywhereTopology, PreparationError, PreparationOutcome,
monitoring::{AlertReceiver, AlertRule, Observability, ScrapeTarget},
},
};
#[async_trait]
impl Observability<Grafana> for K8sAnywhereTopology {
async fn install_alert_sender(
&self,
sender: &Grafana,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let score = GrafanaK8sInstallScore {
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Grafana not installed {}", e)))?;
Ok(PreparationOutcome::Success {
details: "Successfully installed grafana alert sender".to_string(),
})
}
async fn install_receivers(
&self,
sender: &Grafana,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<Grafana>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let receivers = match receivers {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for receiver in receivers {
let score = GrafanaK8sReceiverScore {
receiver,
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install receiver: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All alert receivers installed successfully".to_string(),
})
}
async fn install_rules(
&self,
sender: &Grafana,
inventory: &Inventory,
rules: Option<Vec<Box<dyn AlertRule<Grafana>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let rules = match rules {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for rule in rules {
let score = GrafanaK8sRuleScore {
sender: sender.clone(),
rule,
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install rule: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All alert rules installed successfully".to_string(),
})
}
async fn add_scrape_targets(
&self,
sender: &Grafana,
inventory: &Inventory,
scrape_targets: Option<Vec<Box<dyn ScrapeTarget<Grafana>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let scrape_targets = match scrape_targets {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for scrape_target in scrape_targets {
let score = GrafanaK8sDatasourceScore {
scrape_target,
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to add DataSource: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All datasources installed successfully".to_string(),
})
}
async fn ensure_monitoring_installed(
&self,
sender: &Grafana,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let score = GrafanaK8sEnsureReadyScore {
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Grafana not ready {}", e)))?;
Ok(PreparationOutcome::Success {
details: "Grafana Ready".to_string(),
})
}
}

View File

@@ -1,142 +0,0 @@
use async_trait::async_trait;
use crate::{
inventory::Inventory,
modules::monitoring::kube_prometheus::{
KubePrometheus, helm::kube_prometheus_helm_chart::kube_prometheus_helm_chart_score,
score_kube_prometheus_alert_receivers::KubePrometheusReceiverScore,
score_kube_prometheus_ensure_ready::KubePrometheusEnsureReadyScore,
score_kube_prometheus_rule::KubePrometheusRuleScore,
score_kube_prometheus_scrape_target::KubePrometheusScrapeTargetScore,
},
score::Score,
topology::{
K8sAnywhereTopology, PreparationError, PreparationOutcome,
monitoring::{AlertReceiver, AlertRule, Observability, ScrapeTarget},
},
};
#[async_trait]
impl Observability<KubePrometheus> for K8sAnywhereTopology {
async fn install_alert_sender(
&self,
sender: &KubePrometheus,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
kube_prometheus_helm_chart_score(sender.config.clone())
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
Ok(PreparationOutcome::Success {
details: "Successfully installed kubeprometheus alert sender".to_string(),
})
}
async fn install_receivers(
&self,
sender: &KubePrometheus,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<KubePrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let receivers = match receivers {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for receiver in receivers {
let score = KubePrometheusReceiverScore {
receiver,
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install receiver: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All alert receivers installed successfully".to_string(),
})
}
async fn install_rules(
&self,
sender: &KubePrometheus,
inventory: &Inventory,
rules: Option<Vec<Box<dyn AlertRule<KubePrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let rules = match rules {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for rule in rules {
let score = KubePrometheusRuleScore {
sender: sender.clone(),
rule,
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install rule: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All alert rules installed successfully".to_string(),
})
}
async fn add_scrape_targets(
&self,
sender: &KubePrometheus,
inventory: &Inventory,
scrape_targets: Option<Vec<Box<dyn ScrapeTarget<KubePrometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let scrape_targets = match scrape_targets {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for scrape_target in scrape_targets {
let score = KubePrometheusScrapeTargetScore {
scrape_target,
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install rule: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All scrap targets installed successfully".to_string(),
})
}
async fn ensure_monitoring_installed(
&self,
sender: &KubePrometheus,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let score = KubePrometheusEnsureReadyScore {
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("KubePrometheus not ready {}", e)))?;
Ok(PreparationOutcome::Success {
details: "KubePrometheus Ready".to_string(),
})
}
}

View File

@@ -1,5 +0,0 @@
pub mod grafana;
pub mod kube_prometheus;
pub mod openshift_monitoring;
pub mod prometheus;
pub mod redhat_cluster_observability;

View File

@@ -1,142 +0,0 @@
use async_trait::async_trait;
use log::info;
use crate::score::Score;
use crate::{
inventory::Inventory,
modules::monitoring::okd::{
OpenshiftClusterAlertSender,
score_enable_cluster_monitoring::OpenshiftEnableClusterMonitoringScore,
score_openshift_alert_rule::OpenshiftAlertRuleScore,
score_openshift_receiver::OpenshiftReceiverScore,
score_openshift_scrape_target::OpenshiftScrapeTargetScore,
score_user_workload::OpenshiftUserWorkloadMonitoring,
score_verify_user_workload_monitoring::VerifyUserWorkload,
},
topology::{
K8sAnywhereTopology, PreparationError, PreparationOutcome,
monitoring::{AlertReceiver, AlertRule, Observability, ScrapeTarget},
},
};
#[async_trait]
impl Observability<OpenshiftClusterAlertSender> for K8sAnywhereTopology {
async fn install_alert_sender(
&self,
_sender: &OpenshiftClusterAlertSender,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
info!("enabling cluster monitoring");
let cluster_monitoring_score = OpenshiftEnableClusterMonitoringScore {};
cluster_monitoring_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError { msg: e.to_string() })?;
info!("enabling user workload monitoring");
let user_workload_score = OpenshiftUserWorkloadMonitoring {};
user_workload_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError { msg: e.to_string() })?;
Ok(PreparationOutcome::Success {
details: "Successfully configured cluster monitoring".to_string(),
})
}
async fn install_receivers(
&self,
_sender: &OpenshiftClusterAlertSender,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<OpenshiftClusterAlertSender>>>>,
) -> Result<PreparationOutcome, PreparationError> {
if let Some(receivers) = receivers {
for receiver in receivers {
info!("Installing receiver {}", receiver.name());
let receiver_score = OpenshiftReceiverScore { receiver };
receiver_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError { msg: e.to_string() })?;
}
Ok(PreparationOutcome::Success {
details: "Successfully installed receivers for OpenshiftClusterMonitoring"
.to_string(),
})
} else {
Ok(PreparationOutcome::Noop)
}
}
async fn install_rules(
&self,
_sender: &OpenshiftClusterAlertSender,
inventory: &Inventory,
rules: Option<Vec<Box<dyn AlertRule<OpenshiftClusterAlertSender>>>>,
) -> Result<PreparationOutcome, PreparationError> {
if let Some(rules) = rules {
for rule in rules {
info!("Installing rule ");
let rule_score = OpenshiftAlertRuleScore { rule: rule };
rule_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError { msg: e.to_string() })?;
}
Ok(PreparationOutcome::Success {
details: "Successfully installed rules for OpenshiftClusterMonitoring".to_string(),
})
} else {
Ok(PreparationOutcome::Noop)
}
}
async fn add_scrape_targets(
&self,
_sender: &OpenshiftClusterAlertSender,
inventory: &Inventory,
scrape_targets: Option<Vec<Box<dyn ScrapeTarget<OpenshiftClusterAlertSender>>>>,
) -> Result<PreparationOutcome, PreparationError> {
if let Some(scrape_targets) = scrape_targets {
for scrape_target in scrape_targets {
info!("Installing scrape target");
let scrape_target_score = OpenshiftScrapeTargetScore {
scrape_target: scrape_target,
};
scrape_target_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError { msg: e.to_string() })?;
}
Ok(PreparationOutcome::Success {
details: "Successfully added scrape targets for OpenshiftClusterMonitoring"
.to_string(),
})
} else {
Ok(PreparationOutcome::Noop)
}
}
async fn ensure_monitoring_installed(
&self,
_sender: &OpenshiftClusterAlertSender,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let verify_monitoring_score = VerifyUserWorkload {};
info!("Verifying user workload and cluster monitoring installed");
verify_monitoring_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError { msg: e.to_string() })?;
Ok(PreparationOutcome::Success {
details: "OpenshiftClusterMonitoring ready".to_string(),
})
}
}

View File

@@ -1,147 +0,0 @@
use async_trait::async_trait;
use crate::{
inventory::Inventory,
modules::monitoring::prometheus::{
Prometheus, score_prometheus_alert_receivers::PrometheusReceiverScore,
score_prometheus_ensure_ready::PrometheusEnsureReadyScore,
score_prometheus_install::PrometheusInstallScore,
score_prometheus_rule::PrometheusRuleScore,
score_prometheus_scrape_target::PrometheusScrapeTargetScore,
},
score::Score,
topology::{
K8sAnywhereTopology, PreparationError, PreparationOutcome,
monitoring::{AlertReceiver, AlertRule, Observability, ScrapeTarget},
},
};
#[async_trait]
impl Observability<Prometheus> for K8sAnywhereTopology {
async fn install_alert_sender(
&self,
sender: &Prometheus,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let score = PrometheusInstallScore {
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Prometheus not installed {}", e)))?;
Ok(PreparationOutcome::Success {
details: "Successfully installed kubeprometheus alert sender".to_string(),
})
}
async fn install_receivers(
&self,
sender: &Prometheus,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<Prometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let receivers = match receivers {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for receiver in receivers {
let score = PrometheusReceiverScore {
receiver,
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install receiver: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All alert receivers installed successfully".to_string(),
})
}
async fn install_rules(
&self,
sender: &Prometheus,
inventory: &Inventory,
rules: Option<Vec<Box<dyn AlertRule<Prometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let rules = match rules {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for rule in rules {
let score = PrometheusRuleScore {
sender: sender.clone(),
rule,
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install rule: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All alert rules installed successfully".to_string(),
})
}
async fn add_scrape_targets(
&self,
sender: &Prometheus,
inventory: &Inventory,
scrape_targets: Option<Vec<Box<dyn ScrapeTarget<Prometheus>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let scrape_targets = match scrape_targets {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for scrape_target in scrape_targets {
let score = PrometheusScrapeTargetScore {
scrape_target,
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Failed to install rule: {}", e)))?;
}
Ok(PreparationOutcome::Success {
details: "All scrap targets installed successfully".to_string(),
})
}
async fn ensure_monitoring_installed(
&self,
sender: &Prometheus,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
let score = PrometheusEnsureReadyScore {
sender: sender.clone(),
};
score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(format!("Prometheus not ready {}", e)))?;
Ok(PreparationOutcome::Success {
details: "Prometheus Ready".to_string(),
})
}
}

View File

@@ -1,116 +0,0 @@
use crate::{
modules::monitoring::red_hat_cluster_observability::{
score_alert_receiver::RedHatClusterObservabilityReceiverScore,
score_coo_monitoring_stack::RedHatClusterObservabilityMonitoringStackScore,
},
score::Score,
};
use async_trait::async_trait;
use log::info;
use crate::{
inventory::Inventory,
modules::monitoring::red_hat_cluster_observability::{
RedHatClusterObservability,
score_redhat_cluster_observability_operator::RedHatClusterObservabilityOperatorScore,
},
topology::{
K8sAnywhereTopology, PreparationError, PreparationOutcome,
monitoring::{AlertReceiver, AlertRule, Observability, ScrapeTarget},
},
};
#[async_trait]
impl Observability<RedHatClusterObservability> for K8sAnywhereTopology {
async fn install_alert_sender(
&self,
sender: &RedHatClusterObservability,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
info!("Verifying Redhat Cluster Observability Operator");
let coo_score = RedHatClusterObservabilityOperatorScore::default();
coo_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
info!(
"Installing Cluster Observability Operator Monitoring Stack in ns {}",
sender.namespace.clone()
);
let coo_monitoring_stack_score = RedHatClusterObservabilityMonitoringStackScore {
namespace: sender.namespace.clone(),
resource_selector: sender.resource_selector.clone(),
};
coo_monitoring_stack_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
Ok(PreparationOutcome::Success {
details: "Successfully installed RedHatClusterObservability Operator".to_string(),
})
}
async fn install_receivers(
&self,
sender: &RedHatClusterObservability,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<RedHatClusterObservability>>>>,
) -> Result<PreparationOutcome, PreparationError> {
let receivers = match receivers {
Some(r) if !r.is_empty() => r,
_ => return Ok(PreparationOutcome::Noop),
};
for receiver in receivers {
info!("Installing receiver {}", receiver.name());
let receiver_score = RedHatClusterObservabilityReceiverScore {
receiver,
sender: sender.clone(),
};
receiver_score
.create_interpret()
.execute(inventory, self)
.await
.map_err(|e| PreparationError::new(e.to_string()))?;
}
Ok(PreparationOutcome::Success {
details: "Successfully installed receivers for OpenshiftClusterMonitoring".to_string(),
})
}
async fn install_rules(
&self,
_sender: &RedHatClusterObservability,
_inventory: &Inventory,
_rules: Option<Vec<Box<dyn AlertRule<RedHatClusterObservability>>>>,
) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
async fn add_scrape_targets(
&self,
_sender: &RedHatClusterObservability,
_inventory: &Inventory,
_scrape_targets: Option<Vec<Box<dyn ScrapeTarget<RedHatClusterObservability>>>>,
) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
async fn ensure_monitoring_installed(
&self,
_sender: &RedHatClusterObservability,
_inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError> {
todo!()
}
}

View File

@@ -1,6 +1,7 @@
use async_trait::async_trait;
use crate::{
interpret::Outcome,
inventory::Inventory,
modules::postgresql::{
K8sPostgreSQLScore,

View File

@@ -2,7 +2,6 @@ pub mod decentralized;
mod failover;
mod ha_cluster;
pub mod ingress;
pub mod monitoring;
pub mod node_exporter;
pub mod opnsense;
pub use failover::*;
@@ -12,10 +11,12 @@ mod http;
pub mod installable;
mod k8s_anywhere;
mod localhost;
pub mod oberservability;
pub mod tenant;
use derive_new::new;
pub use k8s_anywhere::*;
pub use localhost::*;
pub mod k8s;
mod load_balancer;
pub mod router;
mod tftp;

View File

@@ -1,256 +0,0 @@
use std::{
any::Any,
collections::{BTreeMap, HashMap},
net::IpAddr,
};
use async_trait::async_trait;
use kube::api::DynamicObject;
use log::{debug, info};
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
topology::{PreparationError, PreparationOutcome, Topology, installable::Installable},
};
use harmony_types::id::Id;
/// Defines the application that sends the alerts to a receivers
/// for example prometheus
#[async_trait]
pub trait AlertSender: Send + Sync + std::fmt::Debug {
fn name(&self) -> String;
}
/// Trait which defines how an alert sender is impleneted for a specific topology
#[async_trait]
pub trait Observability<S: AlertSender> {
async fn install_alert_sender(
&self,
sender: &S,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError>;
async fn install_receivers(
&self,
sender: &S,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
) -> Result<PreparationOutcome, PreparationError>;
async fn install_rules(
&self,
sender: &S,
inventory: &Inventory,
rules: Option<Vec<Box<dyn AlertRule<S>>>>,
) -> Result<PreparationOutcome, PreparationError>;
async fn add_scrape_targets(
&self,
sender: &S,
inventory: &Inventory,
scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
) -> Result<PreparationOutcome, PreparationError>;
async fn ensure_monitoring_installed(
&self,
sender: &S,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError>;
}
/// Defines the entity that receives the alerts from a sender. For example Discord, Slack, etc
///
pub trait AlertReceiver<S: AlertSender>: std::fmt::Debug + Send + Sync {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError>;
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn AlertReceiver<S>>;
}
/// Defines a generic rule that can be applied to a sender, such as aprometheus alert rule
pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
fn build_rule(&self) -> Result<serde_json::Value, InterpretError>;
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn AlertRule<S>>;
}
/// A generic scrape target that can be added to a sender to scrape metrics from, for example a
/// server outside of the cluster
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
fn build_scrape_target(&self) -> Result<ExternalScrapeTarget, InterpretError>;
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn ScrapeTarget<S>>;
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ExternalScrapeTarget {
pub ip: IpAddr,
pub port: i32,
pub interval: Option<String>,
pub path: Option<String>,
pub labels: Option<BTreeMap<String, String>>,
}
/// Alerting interpret to install an alert sender on a given topology
#[derive(Debug)]
pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
impl<S: AlertSender, T: Topology + Observability<S>> Interpret<T> for AlertingInterpret<S> {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!("Configuring alert sender {}", self.sender.name());
topology
.install_alert_sender(&self.sender, inventory)
.await?;
info!("Installing receivers");
topology
.install_receivers(&self.sender, inventory, Some(self.receivers.clone()))
.await?;
info!("Installing rules");
topology
.install_rules(&self.sender, inventory, Some(self.rules.clone()))
.await?;
info!("Adding extra scrape targets");
topology
.add_scrape_targets(&self.sender, inventory, self.scrape_targets.clone())
.await?;
info!("Ensuring alert sender {} is ready", self.sender.name());
topology
.ensure_monitoring_installed(&self.sender, inventory)
.await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
self.sender.name()
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::Alerting
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl<S: AlertSender> Clone for Box<dyn AlertReceiver<S>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl<S: AlertSender> Clone for Box<dyn AlertRule<S>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl<S: AlertSender> Clone for Box<dyn ScrapeTarget<S>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
pub struct ReceiverInstallPlan {
pub install_operation: Option<Vec<InstallOperation>>,
pub route: Option<AlertRoute>,
pub receiver: Option<serde_yaml::Value>,
}
impl Default for ReceiverInstallPlan {
fn default() -> Self {
Self {
install_operation: None,
route: None,
receiver: None,
}
}
}
pub enum InstallOperation {
CreateSecret {
name: String,
data: BTreeMap<String, String>,
},
}
///Generic routing that can map to various alert sender backends
#[derive(Debug, Clone, Serialize)]
pub struct AlertRoute {
pub receiver: String,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub matchers: Vec<AlertMatcher>,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub group_by: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub repeat_interval: Option<String>,
#[serde(rename = "continue")]
pub continue_matching: bool,
#[serde(skip_serializing_if = "Vec::is_empty")]
pub children: Vec<AlertRoute>,
}
impl AlertRoute {
pub fn default(name: String) -> Self {
Self {
receiver: name,
matchers: vec![],
group_by: vec![],
repeat_interval: Some("30s".to_string()),
continue_matching: true,
children: vec![],
}
}
}
#[derive(Debug, Clone, Serialize)]
pub struct AlertMatcher {
pub label: String,
pub operator: MatchOp,
pub value: String,
}
#[derive(Debug, Clone)]
pub enum MatchOp {
Eq,
NotEq,
Regex,
}
impl Serialize for MatchOp {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
let op = match self {
MatchOp::Eq => "=",
MatchOp::NotEq => "!=",
MatchOp::Regex => "=~",
};
serializer.serialize_str(op)
}
}

View File

@@ -9,7 +9,6 @@ use std::{
use async_trait::async_trait;
use brocade::PortOperatingMode;
use derive_new::new;
use harmony_k8s::K8sClient;
use harmony_types::{
id::Id,
net::{IpAddress, MacAddress},
@@ -19,7 +18,7 @@ use serde::Serialize;
use crate::executors::ExecutorError;
use super::LogicalHost;
use super::{LogicalHost, k8s::K8sClient};
#[derive(Debug)]
pub struct DHCPStaticEntry {

View File

@@ -0,0 +1 @@
pub mod monitoring;

View File

@@ -0,0 +1,101 @@
use std::{any::Any, collections::HashMap};
use async_trait::async_trait;
use kube::api::DynamicObject;
use log::debug;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
topology::{Topology, installable::Installable},
};
use harmony_types::id::Id;
#[async_trait]
pub trait AlertSender: Send + Sync + std::fmt::Debug {
fn name(&self) -> String;
}
#[derive(Debug)]
pub struct AlertingInterpret<S: AlertSender> {
pub sender: S,
pub receivers: Vec<Box<dyn AlertReceiver<S>>>,
pub rules: Vec<Box<dyn AlertRule<S>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<S>>>>,
}
#[async_trait]
impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInterpret<S> {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
debug!("hit sender configure for AlertingInterpret");
self.sender.configure(inventory, topology).await?;
for receiver in self.receivers.iter() {
receiver.install(&self.sender).await?;
}
for rule in self.rules.iter() {
debug!("installing rule: {:#?}", rule);
rule.install(&self.sender).await?;
}
if let Some(targets) = &self.scrape_targets {
for target in targets.iter() {
debug!("installing scrape_target: {:#?}", target);
target.install(&self.sender).await?;
}
}
self.sender.ensure_installed(inventory, topology).await?;
Ok(Outcome::success(format!(
"successfully installed alert sender {}",
self.sender.name()
)))
}
fn get_name(&self) -> InterpretName {
InterpretName::Alerting
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
#[async_trait]
pub trait AlertReceiver<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
fn name(&self) -> String;
fn clone_box(&self) -> Box<dyn AlertReceiver<S>>;
fn as_any(&self) -> &dyn Any;
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String>;
}
#[derive(Debug)]
pub struct AlertManagerReceiver {
pub receiver_config: serde_json::Value,
// FIXME we should not leak k8s here. DynamicObject is k8s specific
pub additional_ressources: Vec<DynamicObject>,
pub route_config: serde_json::Value,
}
#[async_trait]
pub trait AlertRule<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
fn clone_box(&self) -> Box<dyn AlertRule<S>>;
}
#[async_trait]
pub trait ScrapeTarget<S: AlertSender>: std::fmt::Debug + Send + Sync {
async fn install(&self, sender: &S) -> Result<Outcome, InterpretError>;
fn clone_box(&self) -> Box<dyn ScrapeTarget<S>>;
}

View File

@@ -1,8 +1,10 @@
use std::sync::Arc;
use crate::executors::ExecutorError;
use crate::{
executors::ExecutorError,
topology::k8s::{ApplyStrategy, K8sClient},
};
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use k8s_openapi::{
api::{
core::v1::{LimitRange, Namespace, ResourceQuota},
@@ -12,7 +14,7 @@ use k8s_openapi::{
},
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::Resource;
use kube::{Resource, api::DynamicObject};
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::json;
@@ -57,6 +59,7 @@ impl K8sTenantManager {
) -> Result<K, ExecutorError>
where
<K as kube::Resource>::DynamicType: Default,
<K as kube::Resource>::Scope: ApplyStrategy<K>,
{
self.apply_labels(&mut resource, config);
self.k8s_client

View File

@@ -5,7 +5,6 @@ use std::{
use askama::Template;
use async_trait::async_trait;
use harmony_k8s::{DrainOptions, K8sClient, NodeFile};
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::Node;
use kube::{
@@ -16,7 +15,10 @@ use log::{debug, info, warn};
use crate::{
modules::okd::crd::nmstate,
topology::{HostNetworkConfig, NetworkError, NetworkManager},
topology::{
HostNetworkConfig, NetworkError, NetworkManager,
k8s::{DrainOptions, K8sClient, NodeFile},
},
};
/// NetworkManager bond configuration template

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use log::{debug, info};
use log::{debug, info, trace};
use serde::Serialize;
use std::path::PathBuf;

View File

@@ -1,5 +1,4 @@
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use harmony_macros::hurl;
use log::{debug, info, trace, warn};
use non_blank_string_rs::NonBlankString;
@@ -15,7 +14,7 @@ use crate::{
helm::chart::{HelmChartScore, HelmRepository},
},
score::Score,
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress},
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress, k8s::K8sClient},
};
use harmony_types::id::Id;

View File

@@ -2,15 +2,13 @@ use crate::modules::application::{
Application, ApplicationFeature, InstallationError, InstallationOutcome,
};
use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore;
use crate::modules::monitoring::grafana::grafana::Grafana;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus;
use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
ServiceMonitor, ServiceMonitorSpec,
};
use crate::modules::monitoring::prometheus::Prometheus;
use crate::modules::monitoring::prometheus::helm::prometheus_config::PrometheusConfig;
use crate::topology::MultiTargetTopology;
use crate::topology::ingress::Ingress;
use crate::topology::monitoring::Observability;
use crate::topology::monitoring::{AlertReceiver, AlertRoute};
use crate::{
inventory::Inventory,
modules::monitoring::{
@@ -19,6 +17,10 @@ use crate::{
score::Score,
topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager},
};
use crate::{
modules::prometheus::prometheus::PrometheusMonitoring,
topology::oberservability::monitoring::AlertReceiver,
};
use async_trait::async_trait;
use base64::{Engine as _, engine::general_purpose};
use harmony_secret::SecretManager;
@@ -28,13 +30,12 @@ use kube::api::ObjectMeta;
use log::{debug, info};
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::sync::{Arc, Mutex};
use std::sync::Arc;
//TODO test this
#[derive(Debug, Clone)]
pub struct Monitoring {
pub application: Arc<dyn Application>,
pub alert_receiver: Vec<Box<dyn AlertReceiver<Prometheus>>>,
pub alert_receiver: Vec<Box<dyn AlertReceiver<CRDPrometheus>>>,
}
#[async_trait]
@@ -45,7 +46,8 @@ impl<
+ TenantManager
+ K8sclient
+ MultiTargetTopology
+ Observability<Prometheus>
+ PrometheusMonitoring<CRDPrometheus>
+ Grafana
+ Ingress
+ std::fmt::Debug,
> ApplicationFeature<T> for Monitoring
@@ -72,15 +74,17 @@ impl<
};
let mut alerting_score = ApplicationMonitoringScore {
sender: Prometheus {
config: Arc::new(Mutex::new(PrometheusConfig::new())),
sender: CRDPrometheus {
namespace: namespace.clone(),
client: topology.k8s_client().await.unwrap(),
service_monitor: vec![app_service_monitor],
},
application: self.application.clone(),
receivers: self.alert_receiver.clone(),
};
let ntfy = NtfyScore {
namespace: namespace.clone(),
host: domain.clone(),
host: domain,
};
ntfy.interpret(&Inventory::empty(), topology)
.await
@@ -101,28 +105,20 @@ impl<
debug!("ntfy_default_auth_param: {ntfy_default_auth_param}");
debug!("ntfy_default_auth_param: {ntfy_default_auth_param}");
let ntfy_receiver = WebhookReceiver {
name: "ntfy-webhook".to_string(),
url: Url::Url(
url::Url::parse(
format!(
"http://{domain}/{}?auth={ntfy_default_auth_param}",
__self.application.name()
"http://ntfy.{}.svc.cluster.local/rust-web-app?auth={ntfy_default_auth_param}",
namespace.clone()
)
.as_str(),
)
.unwrap(),
),
route: AlertRoute {
..AlertRoute::default("ntfy-webhook".to_string())
},
};
debug!(
"ntfy webhook receiver \n{:#?}\nntfy topic: {}",
ntfy_receiver.clone(),
self.application.name()
);
alerting_score.receivers.push(Box::new(ntfy_receiver));
alerting_score
.interpret(&Inventory::empty(), topology)

View File

@@ -3,13 +3,11 @@ use std::sync::Arc;
use crate::modules::application::{
Application, ApplicationFeature, InstallationError, InstallationOutcome,
};
use crate::modules::monitoring::application_monitoring::rhobs_application_monitoring_score::ApplicationRHOBMonitoringScore;
use crate::modules::monitoring::red_hat_cluster_observability::RedHatClusterObservability;
use crate::modules::monitoring::red_hat_cluster_observability::redhat_cluster_observability::RedHatClusterObservabilityScore;
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
use crate::topology::MultiTargetTopology;
use crate::topology::ingress::Ingress;
use crate::topology::monitoring::Observability;
use crate::topology::monitoring::{AlertReceiver, AlertRoute};
use crate::{
inventory::Inventory,
modules::monitoring::{
@@ -18,6 +16,10 @@ use crate::{
score::Score,
topology::{HelmCommand, K8sclient, Topology, tenant::TenantManager},
};
use crate::{
modules::prometheus::prometheus::PrometheusMonitoring,
topology::oberservability::monitoring::AlertReceiver,
};
use async_trait::async_trait;
use base64::{Engine as _, engine::general_purpose};
use harmony_types::net::Url;
@@ -26,10 +28,9 @@ use log::{debug, info};
#[derive(Debug, Clone)]
pub struct Monitoring {
pub application: Arc<dyn Application>,
pub alert_receiver: Vec<Box<dyn AlertReceiver<RedHatClusterObservability>>>,
pub alert_receiver: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
}
///TODO TEST this
#[async_trait]
impl<
T: Topology
@@ -40,7 +41,7 @@ impl<
+ MultiTargetTopology
+ Ingress
+ std::fmt::Debug
+ Observability<RedHatClusterObservability>,
+ PrometheusMonitoring<RHOBObservability>,
> ApplicationFeature<T> for Monitoring
{
async fn ensure_installed(
@@ -54,14 +55,13 @@ impl<
.map(|ns| ns.name.clone())
.unwrap_or_else(|| self.application.name());
let mut alerting_score = RedHatClusterObservabilityScore {
sender: RedHatClusterObservability {
let mut alerting_score = ApplicationRHOBMonitoringScore {
sender: RHOBObservability {
namespace: namespace.clone(),
resource_selector: todo!(),
client: topology.k8s_client().await.unwrap(),
},
application: self.application.clone(),
receivers: self.alert_receiver.clone(),
rules: vec![],
scrape_targets: None,
};
let domain = topology
.get_domain("ntfy")
@@ -97,15 +97,12 @@ impl<
url::Url::parse(
format!(
"http://{domain}/{}?auth={ntfy_default_auth_param}",
__self.application.name()
self.application.name()
)
.as_str(),
)
.unwrap(),
),
route: AlertRoute {
..AlertRoute::default("ntfy-webhook".to_string())
},
};
debug!(
"ntfy webhook receiver \n{:#?}\nntfy topic: {}",

View File

@@ -1,9 +1,8 @@
use std::sync::Arc;
use harmony_k8s::K8sClient;
use log::{debug, info};
use crate::interpret::InterpretError;
use crate::{interpret::InterpretError, topology::k8s::K8sClient};
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ArgoScope {

View File

@@ -44,12 +44,6 @@ pub struct BrocadeSwitchAuth {
pub password: String,
}
impl BrocadeSwitchAuth {
pub fn user_pass(username: String, password: String) -> Self {
Self { username, password }
}
}
#[derive(Secret, Clone, Debug, JsonSchema, Serialize, Deserialize)]
pub struct BrocadeSnmpAuth {
pub username: String,

View File

@@ -1,4 +1,3 @@
use harmony_k8s::K8sClient;
use std::sync::Arc;
use async_trait::async_trait;
@@ -12,7 +11,7 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology},
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]

View File

@@ -54,12 +54,6 @@ pub enum HarmonyDiscoveryStrategy {
SUBNET { cidr: cidr::Ipv4Cidr, port: u16 },
}
impl Default for HarmonyDiscoveryStrategy {
fn default() -> Self {
HarmonyDiscoveryStrategy::MDNS
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
async fn execute(

View File

@@ -3,8 +3,7 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::warn;
use crate::topology::{FailoverTopology, K8sclient};
use harmony_k8s::K8sClient;
use crate::topology::{FailoverTopology, K8sclient, k8s::K8sClient};
#[async_trait]
impl<T: K8sclient> K8sclient for FailoverTopology<T> {

View File

@@ -1,4 +1,5 @@
use async_trait::async_trait;
use k8s_openapi::NamespaceResourceScope;
use kube::Resource;
use log::info;
use serde::{Serialize, de::DeserializeOwned};
@@ -28,7 +29,7 @@ impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
}
impl<
K: Resource
K: Resource<Scope = NamespaceResourceScope>
+ std::fmt::Debug
+ Sync
+ DeserializeOwned
@@ -60,7 +61,7 @@ pub struct K8sResourceInterpret<K: Resource + std::fmt::Debug + Sync + Send> {
#[async_trait]
impl<
K: Resource
K: Resource<Scope = NamespaceResourceScope>
+ Clone
+ std::fmt::Debug
+ DeserializeOwned
@@ -108,7 +109,7 @@ where
topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?
.expect("Environment should provide enough information to instanciate a client")
.apply_many(&self.score.resource, self.score.namespace.as_deref())
.await?;

View File

@@ -15,12 +15,10 @@ pub mod load_balancer;
pub mod monitoring;
pub mod nats;
pub mod network;
pub mod node_health;
pub mod okd;
pub mod openbao;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;
pub mod zitadel;

View File

@@ -1,38 +1,99 @@
use crate::modules::monitoring::kube_prometheus::KubePrometheus;
use crate::modules::monitoring::okd::OpenshiftClusterAlertSender;
use crate::modules::monitoring::red_hat_cluster_observability::RedHatClusterObservability;
use crate::topology::monitoring::{AlertRoute, InstallOperation, ReceiverInstallPlan};
use crate::{interpret::InterpretError, topology::monitoring::AlertReceiver};
use harmony_types::net::Url;
use std::any::Any;
use std::collections::{BTreeMap, HashMap};
use async_trait::async_trait;
use harmony_types::k8s_name::K8sName;
use k8s_openapi::api::core::v1::Secret;
use kube::Resource;
use kube::api::{DynamicObject, ObjectMeta};
use log::{debug, trace};
use serde::Serialize;
use serde_json::json;
use std::collections::BTreeMap;
use serde_yaml::{Mapping, Value};
use crate::infra::kube::kube_resource_to_dynamic;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::{
AlertmanagerConfig, AlertmanagerConfigSpec, CRDPrometheus,
};
use crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::RHOBObservability;
use crate::modules::monitoring::okd::OpenshiftClusterAlertSender;
use crate::topology::oberservability::monitoring::AlertManagerReceiver;
use crate::{
interpret::{InterpretError, Outcome},
modules::monitoring::{
kube_prometheus::{
prometheus::{KubePrometheus, KubePrometheusReceiver},
types::{AlertChannelConfig, AlertManagerChannelConfig},
},
prometheus::prometheus::{Prometheus, PrometheusReceiver},
},
topology::oberservability::monitoring::AlertReceiver,
};
use harmony_types::net::Url;
#[derive(Debug, Clone, Serialize)]
pub struct DiscordReceiver {
pub name: String,
pub struct DiscordWebhook {
pub name: K8sName,
pub url: Url,
pub route: AlertRoute,
pub selectors: Vec<HashMap<String, String>>,
}
impl AlertReceiver<OpenshiftClusterAlertSender> for DiscordReceiver {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError> {
let receiver_block = serde_yaml::to_value(json!({
"name": self.name,
"discord_configs": [{
"webhook_url": format!("{}", self.url),
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}]
}))
.map_err(|e| InterpretError::new(e.to_string()))?;
impl DiscordWebhook {
fn get_receiver_config(&self) -> Result<AlertManagerReceiver, String> {
let secret_name = format!("{}-secret", self.name.clone());
let webhook_key = format!("{}", self.url.clone());
Ok(ReceiverInstallPlan {
install_operation: None,
route: Some(self.route.clone()),
receiver: Some(receiver_block),
let mut string_data = BTreeMap::new();
string_data.insert("webhook-url".to_string(), webhook_key.clone());
let secret = Secret {
metadata: kube::core::ObjectMeta {
name: Some(secret_name.clone()),
..Default::default()
},
string_data: Some(string_data),
type_: Some("Opaque".to_string()),
..Default::default()
};
let mut matchers: Vec<String> = Vec::new();
for selector in &self.selectors {
trace!("selector: {:#?}", selector);
for (k, v) in selector {
matchers.push(format!("{} = {}", k, v));
}
}
Ok(AlertManagerReceiver {
additional_ressources: vec![kube_resource_to_dynamic(&secret)?],
receiver_config: json!({
"name": self.name,
"discord_configs": [
{
"webhook_url": self.url.clone(),
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}
]
}),
route_config: json!({
"receiver": self.name,
"matchers": matchers,
}),
})
}
}
#[async_trait]
impl AlertReceiver<OpenshiftClusterAlertSender> for DiscordWebhook {
async fn install(
&self,
sender: &OpenshiftClusterAlertSender,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn name(&self) -> String {
self.name.clone().to_string()
@@ -41,77 +102,309 @@ impl AlertReceiver<OpenshiftClusterAlertSender> for DiscordReceiver {
fn clone_box(&self) -> Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
todo!()
}
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
self.get_receiver_config()
}
}
impl AlertReceiver<RedHatClusterObservability> for DiscordReceiver {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError> {
#[async_trait]
impl AlertReceiver<RHOBObservability> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let ns = sender.namespace.clone();
let config = self.get_receiver_config()?;
for resource in config.additional_ressources.iter() {
todo!("can I apply a dynamicresource");
// sender.client.apply(resource, Some(&ns)).await;
}
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
config.receiver_config
]
}),
};
let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone().to_string()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec,
};
debug!(
"alertmanager_configs yaml:\n{:#?}",
serde_yaml::to_string(&alertmanager_configs)
);
debug!(
"alert manager configs: \n{:#?}",
alertmanager_configs.clone()
);
sender
.client
.apply(&alertmanager_configs, Some(&sender.namespace))
.await?;
Ok(Outcome::success(format!(
"installed rhob-alertmanagerconfigs for {}",
self.name
)))
}
fn name(&self) -> String {
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<RHOBObservability>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl AlertReceiver<CRDPrometheus> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let ns = sender.namespace.clone();
let secret_name = format!("{}-secret", self.name.clone());
let webhook_key = format!("{}", self.url.clone());
let mut string_data = BTreeMap::new();
string_data.insert("webhook-url".to_string(), webhook_key.clone());
let receiver_config = json!({
"name": self.name,
"discordConfigs": [
{
"apiURL": {
"key": "webhook-url",
"name": format!("{}-secret", self.name)
},
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}
]
});
let secret = Secret {
metadata: kube::core::ObjectMeta {
name: Some(secret_name.clone()),
..Default::default()
},
string_data: Some(string_data),
type_: Some("Opaque".to_string()),
..Default::default()
};
Ok(ReceiverInstallPlan {
install_operation: Some(vec![InstallOperation::CreateSecret {
name: secret_name,
data: string_data,
}]),
route: Some(self.route.clone()),
receiver: Some(
serde_yaml::to_value(receiver_config)
.map_err(|e| InterpretError::new(e.to_string()))
.expect("failed to build yaml value"),
),
})
let _ = sender.client.apply(&secret, Some(&ns)).await;
let spec = AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
{
"name": self.name,
"discordConfigs": [
{
"apiURL": {
"name": secret_name,
"key": "webhook-url",
},
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}
]
}
]
}),
};
let alertmanager_configs = AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone().to_string()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(ns),
..Default::default()
},
spec,
};
sender
.client
.apply(&alertmanager_configs, Some(&sender.namespace))
.await?;
Ok(Outcome::success(format!(
"installed crd-alertmanagerconfigs for {}",
self.name
)))
}
fn name(&self) -> String {
self.name.clone()
"discord-webhook".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<RedHatClusterObservability>> {
fn clone_box(&self) -> Box<dyn AlertReceiver<CRDPrometheus>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
}
impl AlertReceiver<KubePrometheus> for DiscordReceiver {
fn build(&self) -> Result<ReceiverInstallPlan, InterpretError> {
let receiver_block = serde_yaml::to_value(json!({
"name": self.name,
"discord_configs": [{
"webhook_url": format!("{}", self.url),
"title": "{{ template \"discord.default.title\" . }}",
"message": "{{ template \"discord.default.message\" . }}"
}]
}))
.map_err(|e| InterpretError::new(e.to_string()))?;
Ok(ReceiverInstallPlan {
install_operation: None,
route: Some(self.route.clone()),
receiver: Some(receiver_block),
})
#[async_trait]
impl AlertReceiver<Prometheus> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
fn name(&self) -> String {
self.name.clone()
"discord-webhook".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<Prometheus>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl PrometheusReceiver for DiscordWebhook {
fn name(&self) -> String {
self.name.clone().to_string()
}
async fn configure_receiver(&self) -> AlertManagerChannelConfig {
self.get_config().await
}
}
#[async_trait]
impl AlertReceiver<KubePrometheus> for DiscordWebhook {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
fn clone_box(&self) -> Box<dyn AlertReceiver<KubePrometheus>> {
Box::new(self.clone())
}
fn name(&self) -> String {
"discord-webhook".to_string()
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl KubePrometheusReceiver for DiscordWebhook {
fn name(&self) -> String {
self.name.clone().to_string()
}
async fn configure_receiver(&self) -> AlertManagerChannelConfig {
self.get_config().await
}
}
#[async_trait]
impl AlertChannelConfig for DiscordWebhook {
async fn get_config(&self) -> AlertManagerChannelConfig {
let channel_global_config = None;
let channel_receiver = self.alert_channel_receiver().await;
let channel_route = self.alert_channel_route().await;
AlertManagerChannelConfig {
channel_global_config,
channel_receiver,
channel_route,
}
}
}
impl DiscordWebhook {
async fn alert_channel_route(&self) -> serde_yaml::Value {
let mut route = Mapping::new();
route.insert(
Value::String("receiver".to_string()),
Value::String(self.name.clone().to_string()),
);
route.insert(
Value::String("matchers".to_string()),
Value::Sequence(vec![Value::String("alertname!=Watchdog".to_string())]),
);
route.insert(Value::String("continue".to_string()), Value::Bool(true));
Value::Mapping(route)
}
async fn alert_channel_receiver(&self) -> serde_yaml::Value {
let mut receiver = Mapping::new();
receiver.insert(
Value::String("name".to_string()),
Value::String(self.name.clone().to_string()),
);
let mut discord_config = Mapping::new();
discord_config.insert(
Value::String("webhook_url".to_string()),
Value::String(self.url.to_string()),
);
receiver.insert(
Value::String("discord_configs".to_string()),
Value::Sequence(vec![Value::Mapping(discord_config)]),
);
Value::Mapping(receiver)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn discord_serialize_should_match() {
let discord_receiver = DiscordWebhook {
name: K8sName("test-discord".to_string()),
url: Url::Url(url::Url::parse("https://discord.i.dont.exist.com").unwrap()),
selectors: vec![],
};
let discord_receiver_receiver =
serde_yaml::to_string(&discord_receiver.alert_channel_receiver().await).unwrap();
println!("receiver \n{:#}", discord_receiver_receiver);
let discord_receiver_receiver_yaml = r#"name: test-discord
discord_configs:
- webhook_url: https://discord.i.dont.exist.com/
"#
.to_string();
let discord_receiver_route =
serde_yaml::to_string(&discord_receiver.alert_channel_route().await).unwrap();
println!("route \n{:#}", discord_receiver_route);
let discord_receiver_route_yaml = r#"receiver: test-discord
matchers:
- alertname!=Watchdog
continue: true
"#
.to_string();
assert_eq!(discord_receiver_receiver, discord_receiver_receiver_yaml);
assert_eq!(discord_receiver_route, discord_receiver_route_yaml);
}
}

View File

@@ -1,13 +1,25 @@
use std::any::Any;
use async_trait::async_trait;
use kube::api::ObjectMeta;
use log::debug;
use serde::Serialize;
use serde_json::json;
use serde_yaml::{Mapping, Value};
use crate::{
interpret::InterpretError,
interpret::{InterpretError, Outcome},
modules::monitoring::{
kube_prometheus::KubePrometheus, okd::OpenshiftClusterAlertSender, prometheus::Prometheus,
red_hat_cluster_observability::RedHatClusterObservability,
kube_prometheus::{
crd::{
crd_alertmanager_config::CRDPrometheus, rhob_alertmanager_config::RHOBObservability,
},
prometheus::{KubePrometheus, KubePrometheusReceiver},
types::{AlertChannelConfig, AlertManagerChannelConfig},
},
prometheus::prometheus::{Prometheus, PrometheusReceiver},
},
topology::monitoring::{AlertReceiver, AlertRoute, ReceiverInstallPlan},
topology::oberservability::monitoring::{AlertManagerReceiver, AlertReceiver},
};
use harmony_types::net::Url;
@@ -15,115 +27,281 @@ use harmony_types::net::Url;
pub struct WebhookReceiver {
pub name: String,
pub url: Url,
pub route: AlertRoute,
}
impl WebhookReceiver {
fn build_receiver(&self) -> serde_json::Value {
json!({
"name": self.name,
"webhookConfigs": [
{
"url": self.url,
"httpConfig": {
"tlsConfig": {
"insecureSkipVerify": true
#[async_trait]
impl AlertReceiver<RHOBObservability> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &RHOBObservability) -> Result<Outcome, InterpretError> {
let spec = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
{
"name": self.name,
"webhookConfigs": [
{
"url": self.url,
"httpConfig": {
"tlsConfig": {
"insecureSkipVerify": true
}
}
}
]
}
}
}
]})
]
}),
};
let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::rhob_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec,
};
debug!(
"alert manager configs: \n{:#?}",
alertmanager_configs.clone()
);
sender
.client
.apply(&alertmanager_configs, Some(&sender.namespace))
.await?;
Ok(Outcome::success(format!(
"installed rhob-alertmanagerconfigs for {}",
self.name
)))
}
fn build_route(&self) -> serde_json::Value {
json!({
"name": self.name})
}
}
impl AlertReceiver<OpenshiftClusterAlertSender> for WebhookReceiver {
fn name(&self) -> String {
self.name.clone()
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<OpenshiftClusterAlertSender>> {
fn clone_box(&self) -> Box<dyn AlertReceiver<RHOBObservability>> {
Box::new(self.clone())
}
fn build(&self) -> Result<crate::topology::monitoring::ReceiverInstallPlan, InterpretError> {
let receiver = self.build_receiver();
let receiver =
serde_yaml::to_value(receiver).map_err(|e| InterpretError::new(e.to_string()))?;
Ok(ReceiverInstallPlan {
install_operation: None,
route: Some(self.route.clone()),
receiver: Some(receiver),
})
fn as_any(&self) -> &dyn Any {
self
}
}
impl AlertReceiver<RedHatClusterObservability> for WebhookReceiver {
fn name(&self) -> String {
self.name.clone()
#[async_trait]
impl AlertReceiver<CRDPrometheus> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &CRDPrometheus) -> Result<Outcome, InterpretError> {
let spec = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfigSpec {
data: json!({
"route": {
"receiver": self.name,
},
"receivers": [
{
"name": self.name,
"webhookConfigs": [
{
"url": self.url,
}
]
}
]
}),
};
let alertmanager_configs = crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::AlertmanagerConfig {
metadata: ObjectMeta {
name: Some(self.name.clone()),
labels: Some(std::collections::BTreeMap::from([(
"alertmanagerConfig".to_string(),
"enabled".to_string(),
)])),
namespace: Some(sender.namespace.clone()),
..Default::default()
},
spec,
};
debug!(
"alert manager configs: \n{:#?}",
alertmanager_configs.clone()
);
sender
.client
.apply(&alertmanager_configs, Some(&sender.namespace))
.await?;
Ok(Outcome::success(format!(
"installed crd-alertmanagerconfigs for {}",
self.name
)))
}
fn clone_box(&self) -> Box<dyn AlertReceiver<RedHatClusterObservability>> {
fn name(&self) -> String {
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<CRDPrometheus>> {
Box::new(self.clone())
}
fn build(&self) -> Result<crate::topology::monitoring::ReceiverInstallPlan, InterpretError> {
let receiver = self.build_receiver();
let receiver =
serde_yaml::to_value(receiver).map_err(|e| InterpretError::new(e.to_string()))?;
Ok(ReceiverInstallPlan {
install_operation: None,
route: Some(self.route.clone()),
receiver: Some(receiver),
})
}
}
impl AlertReceiver<KubePrometheus> for WebhookReceiver {
fn name(&self) -> String {
self.name.clone()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<KubePrometheus>> {
Box::new(self.clone())
}
fn build(&self) -> Result<crate::topology::monitoring::ReceiverInstallPlan, InterpretError> {
let receiver = self.build_receiver();
let receiver =
serde_yaml::to_value(receiver).map_err(|e| InterpretError::new(e.to_string()))?;
Ok(ReceiverInstallPlan {
install_operation: None,
route: Some(self.route.clone()),
receiver: Some(receiver),
})
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl AlertReceiver<Prometheus> for WebhookReceiver {
fn name(&self) -> String {
self.name.clone()
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
fn name(&self) -> String {
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<Prometheus>> {
Box::new(self.clone())
}
fn build(&self) -> Result<crate::topology::monitoring::ReceiverInstallPlan, InterpretError> {
let receiver = self.build_receiver();
let receiver =
serde_yaml::to_value(receiver).map_err(|e| InterpretError::new(e.to_string()))?;
Ok(ReceiverInstallPlan {
install_operation: None,
route: Some(self.route.clone()),
receiver: Some(receiver),
})
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl PrometheusReceiver for WebhookReceiver {
fn name(&self) -> String {
self.name.clone()
}
async fn configure_receiver(&self) -> AlertManagerChannelConfig {
self.get_config().await
}
}
#[async_trait]
impl AlertReceiver<KubePrometheus> for WebhookReceiver {
fn as_alertmanager_receiver(&self) -> Result<AlertManagerReceiver, String> {
todo!()
}
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_receiver(self).await
}
fn name(&self) -> String {
"webhook-receiver".to_string()
}
fn clone_box(&self) -> Box<dyn AlertReceiver<KubePrometheus>> {
Box::new(self.clone())
}
fn as_any(&self) -> &dyn Any {
self
}
}
#[async_trait]
impl KubePrometheusReceiver for WebhookReceiver {
fn name(&self) -> String {
self.name.clone()
}
async fn configure_receiver(&self) -> AlertManagerChannelConfig {
self.get_config().await
}
}
#[async_trait]
impl AlertChannelConfig for WebhookReceiver {
async fn get_config(&self) -> AlertManagerChannelConfig {
let channel_global_config = None;
let channel_receiver = self.alert_channel_receiver().await;
let channel_route = self.alert_channel_route().await;
AlertManagerChannelConfig {
channel_global_config,
channel_receiver,
channel_route,
}
}
}
impl WebhookReceiver {
async fn alert_channel_route(&self) -> serde_yaml::Value {
let mut route = Mapping::new();
route.insert(
Value::String("receiver".to_string()),
Value::String(self.name.clone()),
);
route.insert(
Value::String("matchers".to_string()),
Value::Sequence(vec![Value::String("alertname!=Watchdog".to_string())]),
);
route.insert(Value::String("continue".to_string()), Value::Bool(true));
Value::Mapping(route)
}
async fn alert_channel_receiver(&self) -> serde_yaml::Value {
let mut receiver = Mapping::new();
receiver.insert(
Value::String("name".to_string()),
Value::String(self.name.clone()),
);
let mut webhook_config = Mapping::new();
webhook_config.insert(
Value::String("url".to_string()),
Value::String(self.url.to_string()),
);
receiver.insert(
Value::String("webhook_configs".to_string()),
Value::Sequence(vec![Value::Mapping(webhook_config)]),
);
Value::Mapping(receiver)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[tokio::test]
async fn webhook_serialize_should_match() {
let webhook_receiver = WebhookReceiver {
name: "test-webhook".to_string(),
url: Url::Url(url::Url::parse("https://webhook.i.dont.exist.com").unwrap()),
};
let webhook_receiver_receiver =
serde_yaml::to_string(&webhook_receiver.alert_channel_receiver().await).unwrap();
println!("receiver \n{:#}", webhook_receiver_receiver);
let webhook_receiver_receiver_yaml = r#"name: test-webhook
webhook_configs:
- url: https://webhook.i.dont.exist.com/
"#
.to_string();
let webhook_receiver_route =
serde_yaml::to_string(&webhook_receiver.alert_channel_route().await).unwrap();
println!("route \n{:#}", webhook_receiver_route);
let webhook_receiver_route_yaml = r#"receiver: test-webhook
matchers:
- alertname!=Watchdog
continue: true
"#
.to_string();
assert_eq!(webhook_receiver_receiver, webhook_receiver_receiver_yaml);
assert_eq!(webhook_receiver_route, webhook_receiver_route_yaml);
}
}

View File

@@ -1,15 +0,0 @@
use crate::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule;
pub fn high_http_error_rate() -> PrometheusAlertRule {
let expression = r#"(
sum(rate(http_requests_total{status=~"5.."}[5m])) by (job, route, service)
/
sum(rate(http_requests_total[5m])) by (job, route, service)
) > 0.05 and sum(rate(http_requests_total[5m])) by (job, route, service) > 10"#;
PrometheusAlertRule::new("HighApplicationErrorRate", expression)
.for_duration("10m")
.label("severity", "warning")
.annotation("summary", "High HTTP error rate on {{ $labels.job }}")
.annotation("description", "Job {{ $labels.job }} (route {{ $labels.route }}) has an error rate > 5% over the last 10m.")
}

View File

@@ -1,2 +1 @@
pub mod alerts;
pub mod prometheus_alert_rule;

View File

@@ -1,13 +1,79 @@
use std::collections::HashMap;
use std::collections::{BTreeMap, HashMap};
use async_trait::async_trait;
use serde::Serialize;
use crate::{
interpret::InterpretError,
modules::monitoring::{kube_prometheus::KubePrometheus, okd::OpenshiftClusterAlertSender},
topology::monitoring::AlertRule,
interpret::{InterpretError, Outcome},
modules::monitoring::{
kube_prometheus::{
prometheus::{KubePrometheus, KubePrometheusRule},
types::{AlertGroup, AlertManagerAdditionalPromRules},
},
prometheus::prometheus::{Prometheus, PrometheusRule},
},
topology::oberservability::monitoring::AlertRule,
};
#[async_trait]
impl AlertRule<KubePrometheus> for AlertManagerRuleGroup {
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_rule(self).await
}
fn clone_box(&self) -> Box<dyn AlertRule<KubePrometheus>> {
Box::new(self.clone())
}
}
#[async_trait]
impl AlertRule<Prometheus> for AlertManagerRuleGroup {
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_rule(self).await
}
fn clone_box(&self) -> Box<dyn AlertRule<Prometheus>> {
Box::new(self.clone())
}
}
#[async_trait]
impl PrometheusRule for AlertManagerRuleGroup {
fn name(&self) -> String {
self.name.clone()
}
async fn configure_rule(&self) -> AlertManagerAdditionalPromRules {
let mut additional_prom_rules = BTreeMap::new();
additional_prom_rules.insert(
self.name.clone(),
AlertGroup {
groups: vec![self.clone()],
},
);
AlertManagerAdditionalPromRules {
rules: additional_prom_rules,
}
}
}
#[async_trait]
impl KubePrometheusRule for AlertManagerRuleGroup {
fn name(&self) -> String {
self.name.clone()
}
async fn configure_rule(&self) -> AlertManagerAdditionalPromRules {
let mut additional_prom_rules = BTreeMap::new();
additional_prom_rules.insert(
self.name.clone(),
AlertGroup {
groups: vec![self.clone()],
},
);
AlertManagerAdditionalPromRules {
rules: additional_prom_rules,
}
}
}
impl AlertManagerRuleGroup {
pub fn new(name: &str, rules: Vec<PrometheusAlertRule>) -> AlertManagerRuleGroup {
AlertManagerRuleGroup {
@@ -63,55 +129,3 @@ impl PrometheusAlertRule {
self
}
}
impl AlertRule<OpenshiftClusterAlertSender> for AlertManagerRuleGroup {
fn build_rule(&self) -> Result<serde_json::Value, InterpretError> {
let name = self.name.clone();
let mut rules: Vec<crate::modules::monitoring::okd::crd::alerting_rules::Rule> = vec![];
for rule in self.rules.clone() {
rules.push(rule.into())
}
let rule_groups =
vec![crate::modules::monitoring::okd::crd::alerting_rules::RuleGroup { name, rules }];
Ok(serde_json::to_value(rule_groups).map_err(|e| InterpretError::new(e.to_string()))?)
}
fn name(&self) -> String {
self.name.clone()
}
fn clone_box(&self) -> Box<dyn AlertRule<OpenshiftClusterAlertSender>> {
Box::new(self.clone())
}
}
impl AlertRule<KubePrometheus> for AlertManagerRuleGroup {
fn build_rule(&self) -> Result<serde_json::Value, InterpretError> {
let name = self.name.clone();
let mut rules: Vec<
crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::Rule,
> = vec![];
for rule in self.rules.clone() {
rules.push(rule.into())
}
let rule_groups = vec![
crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::RuleGroup {
name,
rules,
},
];
Ok(serde_json::to_value(rule_groups).map_err(|e| InterpretError::new(e.to_string()))?)
}
fn name(&self) -> String {
self.name.clone()
}
fn clone_box(&self) -> Box<dyn AlertRule<KubePrometheus>> {
Box::new(self.clone())
}
}

View File

@@ -5,26 +5,32 @@ use serde::Serialize;
use crate::{
interpret::Interpret,
modules::{application::Application, monitoring::prometheus::Prometheus},
modules::{
application::Application,
monitoring::{
grafana::grafana::Grafana, kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus,
},
prometheus::prometheus::PrometheusMonitoring,
},
score::Score,
topology::{
K8sclient, Topology,
monitoring::{AlertReceiver, AlertingInterpret, Observability, ScrapeTarget},
oberservability::monitoring::{AlertReceiver, AlertingInterpret, ScrapeTarget},
},
};
#[derive(Debug, Clone, Serialize)]
pub struct ApplicationMonitoringScore {
pub sender: Prometheus,
pub sender: CRDPrometheus,
pub application: Arc<dyn Application>,
pub receivers: Vec<Box<dyn AlertReceiver<Prometheus>>>,
pub receivers: Vec<Box<dyn AlertReceiver<CRDPrometheus>>>,
}
impl<T: Topology + Observability<Prometheus> + K8sclient> Score<T> for ApplicationMonitoringScore {
impl<T: Topology + PrometheusMonitoring<CRDPrometheus> + K8sclient + Grafana> Score<T>
for ApplicationMonitoringScore
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
debug!("creating alerting interpret");
//TODO will need to use k8sclient to apply service monitors or find a way to pass
//them to the AlertingInterpret potentially via Sender Prometheus
Box::new(AlertingInterpret {
sender: self.sender.clone(),
receivers: self.receivers.clone(),

View File

@@ -9,27 +9,28 @@ use crate::{
inventory::Inventory,
modules::{
application::Application,
monitoring::red_hat_cluster_observability::RedHatClusterObservability,
monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, rhob_alertmanager_config::RHOBObservability,
},
prometheus::prometheus::PrometheusMonitoring,
},
score::Score,
topology::{
Topology,
monitoring::{AlertReceiver, AlertingInterpret, Observability},
},
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver},
};
use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]
pub struct ApplicationRedHatClusterMonitoringScore {
pub sender: RedHatClusterObservability,
pub struct ApplicationRHOBMonitoringScore {
pub sender: RHOBObservability,
pub application: Arc<dyn Application>,
pub receivers: Vec<Box<dyn AlertReceiver<RedHatClusterObservability>>>,
pub receivers: Vec<Box<dyn AlertReceiver<RHOBObservability>>>,
}
impl<T: Topology + Observability<RedHatClusterObservability>> Score<T>
for ApplicationRedHatClusterMonitoringScore
impl<T: Topology + PrometheusMonitoring<RHOBObservability>> Score<T>
for ApplicationRHOBMonitoringScore
{
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ApplicationRedHatClusterMonitoringInterpret {
Box::new(ApplicationRHOBMonitoringInterpret {
score: self.clone(),
})
}
@@ -43,28 +44,38 @@ impl<T: Topology + Observability<RedHatClusterObservability>> Score<T>
}
#[derive(Debug)]
pub struct ApplicationRedHatClusterMonitoringInterpret {
score: ApplicationRedHatClusterMonitoringScore,
pub struct ApplicationRHOBMonitoringInterpret {
score: ApplicationRHOBMonitoringScore,
}
#[async_trait]
impl<T: Topology + Observability<RedHatClusterObservability>> Interpret<T>
for ApplicationRedHatClusterMonitoringInterpret
impl<T: Topology + PrometheusMonitoring<RHOBObservability>> Interpret<T>
for ApplicationRHOBMonitoringInterpret
{
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
//TODO will need to use k8sclient to apply crd ServiceMonitor or find a way to pass
//them to the AlertingInterpret potentially via Sender RedHatClusterObservability
let alerting_interpret = AlertingInterpret {
sender: self.score.sender.clone(),
receivers: self.score.receivers.clone(),
rules: vec![],
scrape_targets: None,
};
alerting_interpret.execute(inventory, topology).await
let result = topology
.install_prometheus(
&self.score.sender,
inventory,
Some(self.score.receivers.clone()),
)
.await;
match result {
Ok(outcome) => match outcome {
PreparationOutcome::Success { details: _ } => {
Ok(Outcome::success("Prometheus installed".into()))
}
PreparationOutcome::Noop => {
Ok(Outcome::noop("Prometheus installation skipped".into()))
}
},
Err(err) => Err(InterpretError::from(err)),
}
}
fn get_name(&self) -> InterpretName {

View File

@@ -1,41 +1,17 @@
use serde::Serialize;
use async_trait::async_trait;
use k8s_openapi::Resource;
use crate::topology::monitoring::{AlertReceiver, AlertRule, AlertSender, ScrapeTarget};
use crate::{
inventory::Inventory,
topology::{PreparationError, PreparationOutcome},
};
#[derive(Debug, Clone, Serialize)]
pub struct Grafana {
pub namespace: String,
}
impl AlertSender for Grafana {
fn name(&self) -> String {
"grafana".to_string()
}
}
impl Serialize for Box<dyn AlertReceiver<Grafana>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}
impl Serialize for Box<dyn AlertRule<Grafana>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}
impl Serialize for Box<dyn ScrapeTarget<Grafana>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
#[async_trait]
pub trait Grafana {
async fn ensure_grafana_operator(
&self,
inventory: &Inventory,
) -> Result<PreparationOutcome, PreparationError>;
async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError>;
}

View File

@@ -1,32 +0,0 @@
use serde::Serialize;
use crate::{
modules::monitoring::grafana::grafana::Grafana,
score::Score,
topology::{
Topology,
monitoring::{AlertReceiver, AlertRule, AlertingInterpret, Observability, ScrapeTarget},
},
};
#[derive(Clone, Debug, Serialize)]
pub struct GrafanaAlertingScore {
pub receivers: Vec<Box<dyn AlertReceiver<Grafana>>>,
pub rules: Vec<Box<dyn AlertRule<Grafana>>>,
pub scrape_targets: Option<Vec<Box<dyn ScrapeTarget<Grafana>>>>,
pub sender: Grafana,
}
impl<T: Topology + Observability<Grafana>> Score<T> for GrafanaAlertingScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
Box::new(AlertingInterpret {
sender: self.sender.clone(),
receivers: self.receivers.clone(),
rules: self.rules.clone(),
scrape_targets: self.scrape_targets.clone(),
})
}
fn name(&self) -> String {
"HelmPrometheusAlertingScore".to_string()
}
}

View File

@@ -0,0 +1,28 @@
use harmony_macros::hurl;
use non_blank_string_rs::NonBlankString;
use std::{collections::HashMap, str::FromStr};
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
pub fn grafana_helm_chart_score(ns: &str, namespace_scope: bool) -> HelmChartScore {
let mut values_overrides = HashMap::new();
values_overrides.insert(
NonBlankString::from_str("namespaceScope").unwrap(),
namespace_scope.to_string(),
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(ns).unwrap()),
release_name: NonBlankString::from_str("grafana-operator").unwrap(),
chart_name: NonBlankString::from_str("grafana/grafana-operator").unwrap(),
chart_version: None,
values_overrides: Some(values_overrides),
values_yaml: None,
create_namespace: true,
install_only: true,
repository: Some(HelmRepository::new(
"grafana".to_string(),
hurl!("https://grafana.github.io/helm-charts"),
true,
)),
}
}

View File

@@ -0,0 +1 @@
pub mod helm_grafana;

View File

@@ -1,3 +0,0 @@
pub mod crd_grafana;
pub mod grafana_default_dashboard;
pub mod rhob_grafana;

View File

@@ -1 +0,0 @@
pub mod grafana_operator;

View File

@@ -1,7 +0,0 @@
pub mod crd;
pub mod helm;
pub mod score_ensure_grafana_ready;
pub mod score_grafana_alert_receiver;
pub mod score_grafana_datasource;
pub mod score_grafana_rule;
pub mod score_install_grafana;

View File

@@ -1,54 +0,0 @@
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::monitoring::grafana::grafana::Grafana,
score::Score,
topology::{K8sclient, Topology},
};
#[derive(Debug, Clone, Serialize)]
pub struct GrafanaK8sEnsureReadyScore {
pub sender: Grafana,
}
impl<T: Topology + K8sclient> Score<T> for GrafanaK8sEnsureReadyScore {
fn name(&self) -> String {
"GrafanaK8sEnsureReadyScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
todo!()
}
}
// async fn ensure_ready(
// &self,
// inventory: &Inventory,
// ) -> Result<PreparationOutcome, PreparationError> {
// debug!("ensure grafana operator");
// let client = self.k8s_client().await.unwrap();
// let grafana_gvk = GroupVersionKind {
// group: "grafana.integreatly.org".to_string(),
// version: "v1beta1".to_string(),
// kind: "Grafana".to_string(),
// };
// let name = "grafanas.grafana.integreatly.org";
// let ns = "grafana";
//
// let grafana_crd = client
// .get_resource_json_value(name, Some(ns), &grafana_gvk)
// .await;
// match grafana_crd {
// Ok(_) => {
// return Ok(PreparationOutcome::Success {
// details: "Found grafana CRDs in cluster".to_string(),
// });
// }
//
// Err(_) => {
// return self
// .install_grafana_operator(inventory, Some("grafana"))
// .await;
// }
// };
// }

View File

@@ -1,24 +0,0 @@
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::monitoring::grafana::grafana::Grafana,
score::Score,
topology::{K8sclient, Topology, monitoring::AlertReceiver},
};
#[derive(Debug, Clone, Serialize)]
pub struct GrafanaK8sReceiverScore {
pub sender: Grafana,
pub receiver: Box<dyn AlertReceiver<Grafana>>,
}
impl<T: Topology + K8sclient> Score<T> for GrafanaK8sReceiverScore {
fn name(&self) -> String {
"GrafanaK8sReceiverScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
todo!()
}
}

View File

@@ -1,83 +0,0 @@
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::monitoring::grafana::grafana::Grafana,
score::Score,
topology::{K8sclient, Topology, monitoring::ScrapeTarget},
};
#[derive(Debug, Clone, Serialize)]
pub struct GrafanaK8sDatasourceScore {
pub sender: Grafana,
pub scrape_target: Box<dyn ScrapeTarget<Grafana>>,
}
impl<T: Topology + K8sclient> Score<T> for GrafanaK8sDatasourceScore {
fn name(&self) -> String {
"GrafanaK8sDatasourceScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
todo!()
}
}
// fn extract_and_normalize_token(&self, secret: &DynamicObject) -> Option<String> {
// let token_b64 = secret
// .data
// .get("token")
// .or_else(|| secret.data.get("data").and_then(|d| d.get("token")))
// .and_then(|v| v.as_str())?;
//
// let bytes = general_purpose::STANDARD.decode(token_b64).ok()?;
//
// let s = String::from_utf8(bytes).ok()?;
//
// let cleaned = s
// .trim_matches(|c: char| c.is_whitespace() || c == '\0')
// .to_string();
// Some(cleaned)
// }
// fn build_grafana_datasource(
// &self,
// name: &str,
// ns: &str,
// label_selector: &LabelSelector,
// url: &str,
// token: &str,
// ) -> GrafanaDatasource {
// let mut json_data = BTreeMap::new();
// json_data.insert("timeInterval".to_string(), "5s".to_string());
//
// GrafanaDatasource {
// metadata: ObjectMeta {
// name: Some(name.to_string()),
// namespace: Some(ns.to_string()),
// ..Default::default()
// },
// spec: GrafanaDatasourceSpec {
// instance_selector: label_selector.clone(),
// allow_cross_namespace_import: Some(true),
// values_from: None,
// datasource: GrafanaDatasourceConfig {
// access: "proxy".to_string(),
// name: name.to_string(),
// rype: "prometheus".to_string(),
// url: url.to_string(),
// database: None,
// json_data: Some(GrafanaDatasourceJsonData {
// time_interval: Some("60s".to_string()),
// http_header_name1: Some("Authorization".to_string()),
// tls_skip_verify: Some(true),
// oauth_pass_thru: Some(true),
// }),
// secure_json_data: Some(GrafanaDatasourceSecureJsonData {
// http_header_value1: Some(format!("Bearer {token}")),
// }),
// is_default: Some(false),
// editable: Some(true),
// },
// },
// }
// }

View File

@@ -1,67 +0,0 @@
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::monitoring::grafana::grafana::Grafana,
score::Score,
topology::{K8sclient, Topology, monitoring::AlertRule},
};
#[derive(Debug, Clone, Serialize)]
pub struct GrafanaK8sRuleScore {
pub sender: Grafana,
pub rule: Box<dyn AlertRule<Grafana>>,
}
impl<T: Topology + K8sclient> Score<T> for GrafanaK8sRuleScore {
fn name(&self) -> String {
"GrafanaK8sRuleScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
todo!()
}
}
// kind: Secret
// apiVersion: v1
// metadata:
// name: credentials
// namespace: grafana
// stringData:
// PROMETHEUS_USERNAME: root
// PROMETHEUS_PASSWORD: secret
// type: Opaque
// ---
// apiVersion: grafana.integreatly.org/v1beta1
// kind: GrafanaDatasource
// metadata:
// name: grafanadatasource-sample
// spec:
// valuesFrom:
// - targetPath: "basicAuthUser"
// valueFrom:
// secretKeyRef:
// name: "credentials"
// key: "PROMETHEUS_USERNAME"
// - targetPath: "secureJsonData.basicAuthPassword"
// valueFrom:
// secretKeyRef:
// name: "credentials"
// key: "PROMETHEUS_PASSWORD"
// instanceSelector:
// matchLabels:
// dashboards: "grafana"
// datasource:
// name: prometheus
// type: prometheus
// access: proxy
// basicAuth: true
// url: http://prometheus-service:9090
// isDefault: true
// basicAuthUser: ${PROMETHEUS_USERNAME}
// jsonData:
// "tlsSkipVerify": true
// "timeInterval": "5s"
// secureJsonData:
// "basicAuthPassword": ${PROMETHEUS_PASSWORD} #

View File

@@ -1,223 +0,0 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::monitoring::grafana::grafana::Grafana,
score::Score,
topology::{HelmCommand, K8sclient, Topology},
};
#[derive(Debug, Clone, Serialize)]
pub struct GrafanaK8sInstallScore {
pub sender: Grafana,
}
impl<T: Topology + K8sclient + HelmCommand> Score<T> for GrafanaK8sInstallScore {
fn name(&self) -> String {
"GrafanaK8sEnsureReadyScore".to_string()
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(GrafanaK8sInstallInterpret {})
}
}
#[derive(Debug, Clone, Serialize)]
pub struct GrafanaK8sInstallInterpret {}
#[async_trait]
impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for GrafanaK8sInstallInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
todo!()
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("GrafanaK8sInstallInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
// let score = grafana_operator_helm_chart_score(sender.namespace.clone());
//
// score
// .create_interpret()
// .execute(inventory, self)
// .await
// .map_err(|e| PreparationError::new(e.to_string()))?;
//
//
// fn build_grafana_dashboard(
// &self,
// ns: &str,
// label_selector: &LabelSelector,
// ) -> GrafanaDashboard {
// let graf_dashboard = GrafanaDashboard {
// metadata: ObjectMeta {
// name: Some(format!("grafana-dashboard-{}", ns)),
// namespace: Some(ns.to_string()),
// ..Default::default()
// },
// spec: GrafanaDashboardSpec {
// resync_period: Some("30s".to_string()),
// instance_selector: label_selector.clone(),
// datasources: Some(vec![GrafanaDashboardDatasource {
// input_name: "DS_PROMETHEUS".to_string(),
// datasource_name: "thanos-openshift-monitoring".to_string(),
// }]),
// json: None,
// grafana_com: Some(GrafanaCom {
// id: 17406,
// revision: None,
// }),
// },
// };
// graf_dashboard
// }
//
// fn build_grafana(&self, ns: &str, labels: &BTreeMap<String, String>) -> GrafanaCRD {
// let grafana = GrafanaCRD {
// metadata: ObjectMeta {
// name: Some(format!("grafana-{}", ns)),
// namespace: Some(ns.to_string()),
// labels: Some(labels.clone()),
// ..Default::default()
// },
// spec: GrafanaSpec {
// config: None,
// admin_user: None,
// admin_password: None,
// ingress: None,
// persistence: None,
// resources: None,
// },
// };
// grafana
// }
//
// async fn build_grafana_ingress(&self, ns: &str) -> K8sIngressScore {
// let domain = self.get_domain(&format!("grafana-{}", ns)).await.unwrap();
// let name = format!("{}-grafana", ns);
// let backend_service = format!("grafana-{}-service", ns);
//
// K8sIngressScore {
// name: fqdn::fqdn!(&name),
// host: fqdn::fqdn!(&domain),
// backend_service: fqdn::fqdn!(&backend_service),
// port: 3000,
// path: Some("/".to_string()),
// path_type: Some(PathType::Prefix),
// namespace: Some(fqdn::fqdn!(&ns)),
// ingress_class_name: Some("openshift-default".to_string()),
// }
// }
// #[async_trait]
// impl Grafana for K8sAnywhereTopology {
// async fn install_grafana(&self) -> Result<PreparationOutcome, PreparationError> {
// let ns = "grafana";
//
// let mut label = BTreeMap::new();
//
// label.insert("dashboards".to_string(), "grafana".to_string());
//
// let label_selector = LabelSelector {
// match_labels: label.clone(),
// match_expressions: vec![],
// };
//
// let client = self.k8s_client().await?;
//
// let grafana = self.build_grafana(ns, &label);
//
// client.apply(&grafana, Some(ns)).await?;
// //TODO change this to a ensure ready or something better than just a timeout
// client
// .wait_until_deployment_ready(
// "grafana-grafana-deployment",
// Some("grafana"),
// Some(Duration::from_secs(30)),
// )
// .await?;
//
// let sa_name = "grafana-grafana-sa";
// let token_secret_name = "grafana-sa-token-secret";
//
// let sa_token_secret = self.build_sa_token_secret(token_secret_name, sa_name, ns);
//
// client.apply(&sa_token_secret, Some(ns)).await?;
// let secret_gvk = GroupVersionKind {
// group: "".to_string(),
// version: "v1".to_string(),
// kind: "Secret".to_string(),
// };
//
// let secret = client
// .get_resource_json_value(token_secret_name, Some(ns), &secret_gvk)
// .await?;
//
// let token = format!(
// "Bearer {}",
// self.extract_and_normalize_token(&secret).unwrap()
// );
//
// debug!("creating grafana clusterrole binding");
//
// let clusterrolebinding =
// self.build_cluster_rolebinding(sa_name, "cluster-monitoring-view", ns);
//
// client.apply(&clusterrolebinding, Some(ns)).await?;
//
// debug!("creating grafana datasource crd");
//
// let thanos_url = format!(
// "https://{}",
// self.get_domain("thanos-querier-openshift-monitoring")
// .await
// .unwrap()
// );
//
// let thanos_openshift_datasource = self.build_grafana_datasource(
// "thanos-openshift-monitoring",
// ns,
// &label_selector,
// &thanos_url,
// &token,
// );
//
// client.apply(&thanos_openshift_datasource, Some(ns)).await?;
//
// debug!("creating grafana dashboard crd");
// let dashboard = self.build_grafana_dashboard(ns, &label_selector);
//
// client.apply(&dashboard, Some(ns)).await?;
// debug!("creating grafana ingress");
// let grafana_ingress = self.build_grafana_ingress(ns).await;
//
// grafana_ingress
// .interpret(&Inventory::empty(), self)
// .await
// .map_err(|e| PreparationError::new(e.to_string()))?;
//
// Ok(PreparationOutcome::Success {
// details: "Installed grafana composants".to_string(),
// })
// }
// }

View File

@@ -1,3 +1,2 @@
pub mod grafana;
pub mod grafana_alerting_score;
pub mod k8s;
pub mod helm;

View File

@@ -1,17 +1,91 @@
use std::sync::Arc;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
#[derive(CustomResource, Serialize, Deserialize, Default, Debug, Clone, JsonSchema)]
use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
modules::{
monitoring::{
grafana::grafana::Grafana, kube_prometheus::crd::service_monitor::ServiceMonitor,
},
prometheus::prometheus::PrometheusMonitoring,
},
topology::{
K8sclient, Topology,
installable::Installable,
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertSender, ScrapeTarget},
},
};
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1",
version = "v1alpha1",
kind = "AlertmanagerConfig",
plural = "alertmanagerconfigs",
namespaced,
derive = "Default"
namespaced
)]
pub struct AlertmanagerConfigSpec {
#[serde(flatten)]
pub data: serde_json::Value,
}
#[derive(Debug, Clone, Serialize)]
pub struct CRDPrometheus {
pub namespace: String,
pub client: Arc<K8sClient>,
pub service_monitor: Vec<ServiceMonitor>,
}
impl AlertSender for CRDPrometheus {
fn name(&self) -> String {
"CRDAlertManager".to_string()
}
}
impl Clone for Box<dyn AlertReceiver<CRDPrometheus>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl Clone for Box<dyn ScrapeTarget<CRDPrometheus>> {
fn clone(&self) -> Self {
self.clone_box()
}
}
impl Serialize for Box<dyn AlertReceiver<CRDPrometheus>> {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!()
}
}
#[async_trait]
impl<T: Topology + K8sclient + PrometheusMonitoring<CRDPrometheus> + Grafana> Installable<T>
for CRDPrometheus
{
async fn configure(&self, inventory: &Inventory, topology: &T) -> Result<(), InterpretError> {
topology.ensure_grafana_operator(inventory).await?;
topology.ensure_prometheus_operator(self, inventory).await?;
Ok(())
}
async fn ensure_installed(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
topology.install_grafana().await?;
topology.install_prometheus(&self, inventory, None).await?;
Ok(())
}
}

View File

@@ -1,4 +1,4 @@
use crate::modules::monitoring::alert_rule::alerts::k8s::{
use crate::modules::prometheus::alerts::k8s::{
deployment::alert_deployment_unavailable,
pod::{alert_container_restarting, alert_pod_not_ready, pod_failed},
pvc::high_pvc_fill_rate_over_two_days,

View File

@@ -4,7 +4,7 @@ use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheuses::LabelSelector;
use super::crd_prometheuses::LabelSelector;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(

View File

@@ -6,14 +6,13 @@ use serde::{Deserialize, Serialize};
use crate::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule;
#[derive(CustomResource, Default, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1",
kind = "PrometheusRule",
plural = "prometheusrules",
namespaced,
derive = "Default"
namespaced
)]
#[serde(rename_all = "camelCase")]
pub struct PrometheusRuleSpec {

View File

@@ -1,18 +1,23 @@
use std::collections::BTreeMap;
use std::net::IpAddr;
use async_trait::async_trait;
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheuses::LabelSelector;
use crate::{
modules::monitoring::kube_prometheus::crd::{
crd_alertmanager_config::CRDPrometheus, crd_prometheuses::LabelSelector,
},
topology::oberservability::monitoring::ScrapeTarget,
};
#[derive(CustomResource, Default, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",
version = "v1alpha1",
kind = "ScrapeConfig",
plural = "scrapeconfigs",
derive = "Default",
namespaced
)]
#[serde(rename_all = "camelCase")]
@@ -65,8 +70,8 @@ pub struct ScrapeConfigSpec {
#[serde(rename_all = "camelCase")]
pub struct StaticConfig {
pub targets: Vec<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub labels: Option<BTreeMap<String, String>>,
pub labels: Option<LabelSelector>,
}
/// Relabeling configuration for target or metric relabeling.

Some files were not shown because too many files have changed in this diff Show More