Compare commits
65 Commits
feat/impro
...
example/vl
| Author | SHA1 | Date | |
|---|---|---|---|
| 304490977c | |||
| 8499f4d1b7 | |||
| aa07f4c8ad | |||
| 77bb138497 | |||
| a16879b1b6 | |||
| f57e6f5957 | |||
| 7605d05de3 | |||
| b244127843 | |||
| 67c3265286 | |||
| d10598d01e | |||
| 61ba7257d0 | |||
| b0e9594d92 | |||
| 2a7fa466cc | |||
| f463cd1e94 | |||
| e1da7949ec | |||
| d0a1a73710 | |||
| bc2b328296 | |||
| a93896707f | |||
| 0e9b23a320 | |||
| f532ba2b40 | |||
| fafca31798 | |||
| 5412c34957 | |||
| 787cc8feab | |||
| ce041f495b | |||
| bfb86f63ce | |||
| 55de206523 | |||
| 64893a84f5 | |||
| f941672662 | |||
| a98113dd40 | |||
| 5db1a31d33 | |||
| f5aac67af8 | |||
| d7e5bf11d5 | |||
| 2e1f1b8447 | |||
| 2b157ad7fd | |||
| a0c0905c3b | |||
| d920de34cf | |||
| 4276b9137b | |||
| 6ab88ab8d9 | |||
| fe52f69473 | |||
| d8338ad12c | |||
| ac9fedf853 | |||
| fd3705e382 | |||
| 4840c7fdc2 | |||
| 20172a7801 | |||
| 6bb33c5845 | |||
| d9357adad3 | |||
| a25ca86bdf | |||
| 646c5e723e | |||
| 69c382e8c6 | |||
| dca764395d | |||
| 53d0704a35 | |||
| 2738985edb | |||
| d9a21bf94b | |||
| 8f8bd34168 | |||
| b5e971b3b6 | |||
| a1c0e0e246 | |||
| d084cee8d5 | |||
| 63ef1c0ea7 | |||
| d8ab9d52a4 | |||
| 2cb7aeefc0 | |||
| 16016febcf | |||
| e709de531d | |||
| 6ab0f3a6ab | |||
| 724ab0b888 | |||
| 8b6ce8d069 |
2616
Cargo.lock
generated
2616
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -2,7 +2,6 @@
|
||||
resolver = "2"
|
||||
members = [
|
||||
"private_repos/*",
|
||||
"examples/*",
|
||||
"harmony",
|
||||
"harmony_types",
|
||||
"harmony_macros",
|
||||
@@ -19,7 +18,7 @@ members = [
|
||||
"adr/agent_discovery/mdns",
|
||||
"brocade",
|
||||
"harmony_agent",
|
||||
"harmony_agent/deploy",
|
||||
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s", "examples/vllm",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
@@ -38,6 +37,8 @@ tokio = { version = "1.40", features = [
|
||||
"macros",
|
||||
"rt-multi-thread",
|
||||
] }
|
||||
tokio-retry = "0.3.0"
|
||||
tokio-util = "0.7.15"
|
||||
cidr = { features = ["serde"], version = "0.2" }
|
||||
russh = "0.45"
|
||||
russh-keys = "0.45"
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use super::BrocadeClient;
|
||||
use crate::{
|
||||
BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo, MacAddressEntry,
|
||||
PortChannelId, PortOperatingMode, SecurityLevel, parse_brocade_mac_address,
|
||||
shell::BrocadeShell,
|
||||
PortChannelId, PortOperatingMode, parse_brocade_mac_address, shell::BrocadeShell,
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
|
||||
@@ -8,7 +8,7 @@ use regex::Regex;
|
||||
use crate::{
|
||||
BrocadeClient, BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo,
|
||||
InterfaceStatus, InterfaceType, MacAddressEntry, PortChannelId, PortOperatingMode,
|
||||
SecurityLevel, parse_brocade_mac_address, shell::BrocadeShell,
|
||||
parse_brocade_mac_address, shell::BrocadeShell,
|
||||
};
|
||||
|
||||
#[derive(Debug)]
|
||||
|
||||
@@ -1,8 +1,8 @@
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::cert_manager::{
|
||||
capability::CertificateManagementConfig, score_cert_management::CertificateManagementScore,
|
||||
score_certificate::CertificateScore, score_issuer::CertificateIssuerScore,
|
||||
capability::CertificateManagementConfig, score_certificate::CertificateScore,
|
||||
score_issuer::CertificateIssuerScore,
|
||||
},
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
|
||||
@@ -10,9 +10,10 @@ publish = false
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
harmony_types = { path = "../../harmony_types" }
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
harmony-k8s = { path = "../../harmony-k8s" }
|
||||
cidr.workspace = true
|
||||
tokio.workspace = true
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
log.workspace = true
|
||||
env_logger.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use harmony::topology::k8s::{DrainOptions, K8sClient};
|
||||
use harmony_k8s::{DrainOptions, K8sClient};
|
||||
use log::{info, trace};
|
||||
|
||||
#[tokio::main]
|
||||
|
||||
@@ -10,9 +10,10 @@ publish = false
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
harmony_types = { path = "../../harmony_types" }
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
harmony-k8s = { path = "../../harmony-k8s" }
|
||||
cidr.workspace = true
|
||||
tokio.workspace = true
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
log.workspace = true
|
||||
env_logger.workspace = true
|
||||
url.workspace = true
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use harmony::topology::k8s::{DrainOptions, K8sClient, NodeFile};
|
||||
use harmony_k8s::{K8sClient, NodeFile};
|
||||
use log::{info, trace};
|
||||
|
||||
#[tokio::main]
|
||||
|
||||
@@ -14,7 +14,6 @@ async fn main() {
|
||||
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
||||
// "default" namespace, 1 instance, 1Gi storage
|
||||
},
|
||||
hostname: "postgrestest.sto1.nationtech.io".to_string(),
|
||||
};
|
||||
|
||||
harmony_cli::run(
|
||||
|
||||
16
examples/node_health/Cargo.toml
Normal file
16
examples/node_health/Cargo.toml
Normal file
@@ -0,0 +1,16 @@
|
||||
[package]
|
||||
name = "example-node-health"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
harmony_types = { path = "../../harmony_types" }
|
||||
tokio = { workspace = true }
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
log = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
17
examples/node_health/src/main.rs
Normal file
17
examples/node_health/src/main.rs
Normal file
@@ -0,0 +1,17 @@
|
||||
use harmony::{
|
||||
inventory::Inventory, modules::node_health::NodeHealthScore, topology::K8sAnywhereTopology,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let node_health = NodeHealthScore {};
|
||||
|
||||
harmony_cli::run(
|
||||
Inventory::autoload(),
|
||||
K8sAnywhereTopology::from_env(),
|
||||
vec![Box::new(node_health)],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -6,7 +6,10 @@ use harmony::{
|
||||
data::{FileContent, FilePath},
|
||||
modules::{
|
||||
inventory::HarmonyDiscoveryStrategy,
|
||||
okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
|
||||
okd::{
|
||||
installation::OKDInstallationPipeline, ipxe::OKDIpxeScore,
|
||||
load_balancer::OKDLoadBalancerScore,
|
||||
},
|
||||
},
|
||||
score::Score,
|
||||
topology::HAClusterTopology,
|
||||
@@ -32,6 +35,7 @@ async fn main() {
|
||||
scores
|
||||
.append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
|
||||
|
||||
scores.push(Box::new(OKDLoadBalancerScore::new(&topology)));
|
||||
harmony_cli::run(inventory, topology, scores, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
@@ -1,63 +1,13 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
|
||||
topology::K8sAnywhereTopology,
|
||||
inventory::Inventory, modules::openbao::OpenbaoScore, topology::K8sAnywhereTopology,
|
||||
};
|
||||
use harmony_macros::hurl;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let values_yaml = Some(
|
||||
r#"server:
|
||||
standalone:
|
||||
enabled: true
|
||||
config: |
|
||||
listener "tcp" {
|
||||
tls_disable = true
|
||||
address = "[::]:8200"
|
||||
cluster_address = "[::]:8201"
|
||||
}
|
||||
|
||||
storage "file" {
|
||||
path = "/openbao/data"
|
||||
}
|
||||
|
||||
service:
|
||||
enabled: true
|
||||
|
||||
dataStorage:
|
||||
enabled: true
|
||||
size: 10Gi
|
||||
storageClass: null
|
||||
accessMode: ReadWriteOnce
|
||||
|
||||
auditStorage:
|
||||
enabled: true
|
||||
size: 10Gi
|
||||
storageClass: null
|
||||
accessMode: ReadWriteOnce"#
|
||||
.to_string(),
|
||||
);
|
||||
let openbao = HelmChartScore {
|
||||
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
|
||||
release_name: NonBlankString::from_str("openbao").unwrap(),
|
||||
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
|
||||
chart_version: None,
|
||||
values_overrides: None,
|
||||
values_yaml,
|
||||
create_namespace: true,
|
||||
install_only: true,
|
||||
repository: Some(HelmRepository::new(
|
||||
"openbao".to_string(),
|
||||
hurl!("https://openbao.github.io/openbao-helm"),
|
||||
true,
|
||||
)),
|
||||
let openbao = OpenbaoScore {
|
||||
host: "openbao.sebastien.sto1.nationtech.io".to_string(),
|
||||
};
|
||||
|
||||
// TODO exec pod commands to initialize secret store if not already done
|
||||
|
||||
harmony_cli::run(
|
||||
Inventory::autoload(),
|
||||
K8sAnywhereTopology::from_env(),
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::{k8s::apps::OperatorHubCatalogSourceScore, postgresql::CloudNativePgOperatorScore},
|
||||
@@ -9,7 +7,7 @@ use harmony::{
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let operatorhub_catalog = OperatorHubCatalogSourceScore::default();
|
||||
let cnpg_operator = CloudNativePgOperatorScore::default();
|
||||
let cnpg_operator = CloudNativePgOperatorScore::default_openshift();
|
||||
|
||||
harmony_cli::run(
|
||||
Inventory::autoload(),
|
||||
|
||||
@@ -1,22 +1,13 @@
|
||||
use std::{
|
||||
net::{IpAddr, Ipv4Addr},
|
||||
sync::Arc,
|
||||
};
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use cidr::Ipv4Cidr;
|
||||
use harmony::{
|
||||
executors::ExecutorError,
|
||||
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
|
||||
infra::opnsense::OPNSenseManagementInterface,
|
||||
inventory::Inventory,
|
||||
modules::opnsense::node_exporter::NodeExporterScore,
|
||||
topology::{
|
||||
HAClusterTopology, LogicalHost, PreparationError, PreparationOutcome, Topology,
|
||||
UnmanagedRouter, node_exporter::NodeExporter,
|
||||
},
|
||||
topology::{PreparationError, PreparationOutcome, Topology, node_exporter::NodeExporter},
|
||||
};
|
||||
use harmony_macros::{ip, ipv4, mac_address};
|
||||
use harmony_macros::ip;
|
||||
|
||||
#[derive(Debug)]
|
||||
struct OpnSenseTopology {
|
||||
|
||||
@@ -1,8 +1,7 @@
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::postgresql::{
|
||||
K8sPostgreSQLScore, PostgreSQLConnectionScore, PublicPostgreSQLScore,
|
||||
capability::PostgreSQLConfig,
|
||||
PostgreSQLConnectionScore, PublicPostgreSQLScore, capability::PostgreSQLConfig,
|
||||
},
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
@@ -16,7 +15,6 @@ async fn main() {
|
||||
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
|
||||
// 1 instance, 1Gi storage
|
||||
},
|
||||
hostname: "postgrestest.sto1.nationtech.io".to_string(),
|
||||
};
|
||||
|
||||
let test_connection = PostgreSQLConnectionScore {
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Arc};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
|
||||
@@ -1,4 +1,4 @@
|
||||
use std::{collections::HashMap, path::PathBuf, sync::Arc};
|
||||
use std::{path::PathBuf, sync::Arc};
|
||||
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
|
||||
@@ -44,6 +44,7 @@ fn build_large_score() -> LoadBalancerScore {
|
||||
],
|
||||
listening_port: SocketAddr::V4(SocketAddrV4::new(ipv4!("192.168.0.0"), 49387)),
|
||||
health_check: Some(HealthCheck::HTTP(
|
||||
Some(1993),
|
||||
"/some_long_ass_path_to_see_how_it_is_displayed_but_it_has_to_be_even_longer"
|
||||
.to_string(),
|
||||
HttpMethod::GET,
|
||||
|
||||
15
examples/vllm/Cargo.toml
Normal file
15
examples/vllm/Cargo.toml
Normal file
@@ -0,0 +1,15 @@
|
||||
[package]
|
||||
name = "vllm"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
k8s-openapi = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
log = { workspace = true }
|
||||
env_logger = { workspace = true }
|
||||
523
examples/vllm/src/main.rs
Normal file
523
examples/vllm/src/main.rs
Normal file
@@ -0,0 +1,523 @@
|
||||
//! vLLM Deployment Example for Qwen3.5-27B-FP8 on NVIDIA RTX 5090
|
||||
//!
|
||||
//! This example deploys vLLM serving Qwen3.5-27B with FP8 quantization,
|
||||
//! optimized for single RTX 5090 (32GB VRAM) with tool calling support.
|
||||
//!
|
||||
//! # Architecture & Memory Constraints
|
||||
//!
|
||||
//! **Model Details:**
|
||||
//! - Parameters: 27B (dense, not sparse/MoE)
|
||||
//! - Quantization: FP8 (8-bit weights)
|
||||
//! - Model size: ~27-28GB in memory
|
||||
//! - Native context: 262,144 tokens (will NOT fit in 32GB VRAM)
|
||||
//!
|
||||
//! **VRAM Budget for RTX 5090 (32GB):**
|
||||
//! - Model weights (FP8): ~27GB
|
||||
//! - Framework overhead: ~1-2GB
|
||||
//! - KV cache: ~2-3GB (for 16k context)
|
||||
//! - CUDA context: ~500MB
|
||||
//! - Temporary buffers: ~500MB
|
||||
//! - **Total: ~31-33GB** (tight fit, leaves minimal headroom)
|
||||
//!
|
||||
//! # OpenShift/OKD Requirements
|
||||
//!
|
||||
//! **SCC (Security Context Constraint) Setup:**
|
||||
//!
|
||||
//! The official vLLM container runs as root and writes to `/root/.cache/huggingface`.
|
||||
//! On OpenShift/OKD with the default restricted SCC, containers run as arbitrary UIDs
|
||||
//! and cannot write to `/root`. For testing, grant the `anyuid` SCC:
|
||||
//!
|
||||
//! ```bash
|
||||
//! # As cluster admin, grant anyuid SCC to the namespace's service account:
|
||||
//! oc adm policy add-scc-to-user anyuid -z default -n vllm-qwen
|
||||
//! ```
|
||||
//!
|
||||
//! This allows pods in the `vllm-qwen` namespace to run as root (UID 0).
|
||||
//! For production, consider building a custom vLLM image that runs as non-root.
|
||||
//!
|
||||
//! # Critical Configuration Notes
|
||||
//!
|
||||
//! 1. **GPU_MEMORY_UTILIZATION=1.0**: Maximum GPU memory allocation.
|
||||
//! NEVER decrease this for dense models - CPU offloading destroys performance
|
||||
//! (100-1000x slower) for models where every parameter is used during inference.
|
||||
//!
|
||||
//! 2. **MAX_MODEL_LEN=16384**: Conservative context length that fits in available VRAM.
|
||||
//! Agentic workflows with long tool call histories will need careful context management.
|
||||
//!
|
||||
//! 3. **--language-model-only**: Skips loading the vision encoder, saving ~1-2GB VRAM.
|
||||
//! Essential for fitting the model in 32GB VRAM.
|
||||
//!
|
||||
//! 4. **PVC Size**: 50Gi for HuggingFace cache. Qwen3.5-27B-FP8 is ~30GB.
|
||||
//!
|
||||
//! # Performance Expectations
|
||||
//!
|
||||
//! - Single token latency: ~50-100ms (no CPU offloading)
|
||||
//! - With CPU offloading: ~5-50 seconds per token (unusable for real-time inference)
|
||||
//! - Throughput: ~10-20 tokens/second (single stream, no batching)
|
||||
//!
|
||||
//! # Next Steps for Production
|
||||
//!
|
||||
//! To increase context length:
|
||||
//! 1. Monitor GPU memory: `kubectl exec -it deployment/qwen3-5-27b -- nvidia-smi dmon -s u`
|
||||
//! 2. If stable, increase MAX_MODEL_LEN (try 32768, then 65536)
|
||||
//! 3. If OOM: revert to lower value
|
||||
//!
|
||||
//! For full 262k context, consider:
|
||||
//! - Multi-GPU setup with tensor parallelism (--tensor-parallel-size 8)
|
||||
//! - Or use a smaller model (Qwen3.5-7B-FP8)
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::{
|
||||
k8s::resource::K8sResourceScore,
|
||||
okd::{
|
||||
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
|
||||
route::OKDRouteScore,
|
||||
},
|
||||
},
|
||||
score::Score,
|
||||
topology::{K8sAnywhereTopology, TlsRouter},
|
||||
};
|
||||
use k8s_openapi::{
|
||||
api::{
|
||||
apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy},
|
||||
core::v1::{
|
||||
Container, ContainerPort, EmptyDirVolumeSource, EnvVar, EnvVarSource,
|
||||
HTTPGetAction, PersistentVolumeClaim, PersistentVolumeClaimSpec,
|
||||
PersistentVolumeClaimVolumeSource, PodSpec, PodTemplateSpec, Probe,
|
||||
ResourceRequirements, Secret, SecretKeySelector, SecretVolumeSource, Service,
|
||||
ServicePort, ServiceSpec, Volume, VolumeMount, VolumeResourceRequirements,
|
||||
},
|
||||
},
|
||||
apimachinery::pkg::{
|
||||
api::resource::Quantity,
|
||||
apis::meta::v1::{LabelSelector, ObjectMeta},
|
||||
util::intstr::IntOrString,
|
||||
},
|
||||
ByteString,
|
||||
};
|
||||
use log::info;
|
||||
|
||||
const NAMESPACE: &str = "vllm-qwen";
|
||||
const MODEL_NAME: &str = "Qwen/Qwen3.5-27B-FP8";
|
||||
const DEPLOYMENT_NAME: &str = "qwen3-5-27b";
|
||||
const SERVICE_NAME: &str = DEPLOYMENT_NAME;
|
||||
const ROUTE_NAME: &str = DEPLOYMENT_NAME;
|
||||
const PVC_NAME: &str = "huggingface-cache";
|
||||
const SECRET_NAME: &str = "hf-token-secret";
|
||||
|
||||
const VLLM_IMAGE: &str = "vllm/vllm-openai:latest";
|
||||
const SERVICE_PORT: u16 = 8000;
|
||||
const TARGET_PORT: u16 = 8000;
|
||||
|
||||
/// Maximum context length for the model (in tokens).
|
||||
///
|
||||
/// **Impact on VRAM:**
|
||||
/// - Qwen3.5-27B uses per-token KV cache storage for the context window
|
||||
/// - Larger context = more KV cache memory required
|
||||
/// - Approximate KV cache per token: ~32KB for FP8 (very rough estimate)
|
||||
/// - 16k tokens ≈ 0.5-1GB KV cache
|
||||
/// - 262k tokens ≈ 8-16GB KV cache (native context length - will NOT fit in 32GB VRAM)
|
||||
///
|
||||
/// **Performance Impact:**
|
||||
/// - Context length directly impacts memory for storing conversation history
|
||||
/// - Agentic workflows with long tool call histories benefit from more context
|
||||
/// - If context > available VRAM, vLLM will OOM and fail to start
|
||||
///
|
||||
/// **Recommendations for RTX 5090 (32GB):**
|
||||
/// - Start with 16384 (conservative, should work)
|
||||
/// - If no OOM, try 32768 (better for agentic workflows)
|
||||
/// - Monitor GPU memory with `nvidia-smi` during operation
|
||||
const MAX_MODEL_LEN: i64 = 16384;
|
||||
|
||||
/// Fraction of GPU memory to allocate for the model (0.0 to 1.0).
|
||||
///
|
||||
/// **CRITICAL WARNING: This is a dense model!**
|
||||
/// Qwen3.5-27B-FP8 is NOT a sparse/mixture-of-experts model. All 27B parameters
|
||||
/// are active during inference. CPU offloading will DESTROY performance.
|
||||
///
|
||||
/// **What this parameter controls:**
|
||||
/// - Controls how much of GPU memory vLLM pre-allocates for:
|
||||
/// 1. Model weights (~27GB for FP8 quantization)
|
||||
/// 2. KV cache for context window
|
||||
/// 3. Activation buffers for inference
|
||||
/// 4. Runtime overhead
|
||||
///
|
||||
/// **VRAM Allocation Example:**
|
||||
/// - GPU: 32GB RTX 5090
|
||||
/// - GPU_MEMORY_UTILIZATION: 0.95
|
||||
/// - vLLM will try to use: 32GB * 0.95 = 30.4GB
|
||||
/// - Model weights: ~27-28GB
|
||||
/// - Remaining for KV cache + runtime: ~2-3GB
|
||||
///
|
||||
/// **If set too LOW (e.g., 0.7):**
|
||||
/// - vLLM restricts itself to 32GB * 0.7 = 22.4GB
|
||||
/// - Model weights alone need ~27GB
|
||||
/// - vLLM will OFFLOAD model weights to CPU memory
|
||||
/// - Performance: **100-1000x slower** (single token generation can take seconds instead of milliseconds)
|
||||
/// - This is catastrophic for a dense model where every layer needs all parameters
|
||||
///
|
||||
/// **If set too HIGH (e.g., 0.99):**
|
||||
/// - vLLM tries to allocate nearly all GPU memory
|
||||
/// - Risk: CUDA OOM if any other process needs GPU memory
|
||||
/// - Risk: KV cache allocation fails during inference
|
||||
/// - System instability
|
||||
///
|
||||
/// **Current Setting: 0.95**
|
||||
/// - Leaves 5% buffer (1.6GB) for CUDA overhead, system processes
|
||||
/// - Maximum allocation for model + KV cache: ~30.4GB
|
||||
/// - Should leave enough headroom for:
|
||||
/// - CUDA context: ~500MB
|
||||
/// - Temporary buffers: ~500MB
|
||||
/// - Safety margin: ~600MB
|
||||
///
|
||||
/// **How to tune:**
|
||||
/// 1. Start with 0.95 (current setting)
|
||||
/// 2. Monitor with `nvidia-smi dmon -s u` during operation
|
||||
/// 3. If OOM during inference: reduce MAX_MODEL_LEN first
|
||||
/// 4. If stable: try increasing MAX_MODEL_LEN before increasing this
|
||||
/// 5. Only increase this if you're certain no other GPU processes run
|
||||
///
|
||||
/// **NEVER decrease this for dense models!**
|
||||
/// If model doesn't fit, use a smaller model or quantization, not CPU offloading.
|
||||
const GPU_MEMORY_UTILIZATION : f32 = 1.0;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
||||
env_logger::init();
|
||||
|
||||
info!("Deploying vLLM with Qwen3.5-27B-FP8 model");
|
||||
info!("Configuration:");
|
||||
info!(" Model: {}", MODEL_NAME);
|
||||
info!(" Max context length: {} tokens", MAX_MODEL_LEN);
|
||||
info!(" GPU memory utilization: {}", GPU_MEMORY_UTILIZATION);
|
||||
info!(" Language model only: true");
|
||||
info!(" Tool calling enabled: true");
|
||||
|
||||
let topology = K8sAnywhereTopology::from_env();
|
||||
let domain = topology
|
||||
.get_internal_domain()
|
||||
.await
|
||||
.ok()
|
||||
.flatten()
|
||||
.unwrap_or_else(|| "cluster.local".to_string());
|
||||
|
||||
let host = format!("{}-{}.apps.{}", SERVICE_NAME, NAMESPACE, domain);
|
||||
info!("Creating route with host: {}", host);
|
||||
|
||||
let scores: Vec<Box<dyn Score<K8sAnywhereTopology>>> = vec![
|
||||
create_namespace(),
|
||||
create_pvc(),
|
||||
create_secret(),
|
||||
create_deployment(),
|
||||
create_service(),
|
||||
create_route(&host),
|
||||
];
|
||||
|
||||
harmony_cli::run(Inventory::autoload(), topology, scores, None)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to deploy: {}", e))?;
|
||||
|
||||
info!("Successfully deployed vLLM with Qwen3.5-27B-FP8");
|
||||
info!("Access the API at: http://{}.apps.<cluster-domain>", SERVICE_NAME);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn create_namespace() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
use k8s_openapi::api::core::v1::Namespace;
|
||||
|
||||
let namespace = Namespace {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: None,
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(namespace, None))
|
||||
}
|
||||
|
||||
fn create_pvc() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let pvc = PersistentVolumeClaim {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(PVC_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(PersistentVolumeClaimSpec {
|
||||
access_modes: Some(vec!["ReadWriteOnce".to_string()]),
|
||||
resources: Some(VolumeResourceRequirements {
|
||||
requests: Some(BTreeMap::from([(
|
||||
"storage".to_string(),
|
||||
Quantity("50Gi".to_string()),
|
||||
)])),
|
||||
limits: None,
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
pvc,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_secret() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let mut data = BTreeMap::new();
|
||||
data.insert(
|
||||
"token".to_string(),
|
||||
ByteString("".to_string().into_bytes()),
|
||||
);
|
||||
|
||||
let secret = Secret {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(SECRET_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
immutable: Some(false),
|
||||
type_: Some("Opaque".to_string()),
|
||||
string_data: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
secret,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_deployment() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let deployment = Deployment {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(DEPLOYMENT_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
labels: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(DeploymentSpec {
|
||||
replicas: Some(1),
|
||||
selector: LabelSelector {
|
||||
match_labels: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
},
|
||||
strategy: Some(DeploymentStrategy {
|
||||
type_: Some("Recreate".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
template: PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
labels: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
..Default::default()
|
||||
}),
|
||||
spec: Some(PodSpec {
|
||||
node_selector: Some(BTreeMap::from([(
|
||||
"nvidia.com/gpu.product".to_string(),
|
||||
"NVIDIA-GeForce-RTX-5090".to_string(),
|
||||
)])),
|
||||
volumes: Some(vec![
|
||||
Volume {
|
||||
name: "cache-volume".to_string(),
|
||||
persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource {
|
||||
claim_name: PVC_NAME.to_string(),
|
||||
read_only: Some(false),
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
Volume {
|
||||
name: "shm".to_string(),
|
||||
empty_dir: Some(EmptyDirVolumeSource {
|
||||
medium: Some("Memory".to_string()),
|
||||
size_limit: Some(Quantity("4Gi".to_string())),
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
Volume {
|
||||
name: "hf-token".to_string(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some(SECRET_NAME.to_string()),
|
||||
optional: Some(true),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
containers: vec![Container {
|
||||
name: DEPLOYMENT_NAME.to_string(),
|
||||
image: Some(VLLM_IMAGE.to_string()),
|
||||
command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]),
|
||||
args: Some(vec![build_vllm_command()]),
|
||||
env: Some(vec![
|
||||
EnvVar {
|
||||
name: "HF_TOKEN".to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
secret_key_ref: Some(SecretKeySelector {
|
||||
key: "token".to_string(),
|
||||
name: SECRET_NAME.to_string(),
|
||||
optional: Some(true),
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
value: None,
|
||||
},
|
||||
EnvVar {
|
||||
name: "VLLM_WORKER_MULTIPROC_METHOD".to_string(),
|
||||
value: Some("spawn".to_string()),
|
||||
value_from: None,
|
||||
},
|
||||
]),
|
||||
ports: Some(vec![ContainerPort {
|
||||
container_port: SERVICE_PORT as i32,
|
||||
protocol: Some("TCP".to_string()),
|
||||
..Default::default()
|
||||
}]),
|
||||
resources: Some(ResourceRequirements {
|
||||
limits: Some(BTreeMap::from([
|
||||
("cpu".to_string(), Quantity("10".to_string())),
|
||||
("memory".to_string(), Quantity("30Gi".to_string())),
|
||||
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
|
||||
])),
|
||||
requests: Some(BTreeMap::from([
|
||||
("cpu".to_string(), Quantity("2".to_string())),
|
||||
("memory".to_string(), Quantity("10Gi".to_string())),
|
||||
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
|
||||
])),
|
||||
claims: None,
|
||||
}),
|
||||
volume_mounts: Some(vec![
|
||||
VolumeMount {
|
||||
name: "cache-volume".to_string(),
|
||||
mount_path: "/root/.cache/huggingface".to_string(),
|
||||
read_only: Some(false),
|
||||
..Default::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "shm".to_string(),
|
||||
mount_path: "/dev/shm".to_string(),
|
||||
..Default::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "hf-token".to_string(),
|
||||
mount_path: "/etc/secrets/hf-token".to_string(),
|
||||
read_only: Some(true),
|
||||
..Default::default()
|
||||
},
|
||||
]),
|
||||
liveness_probe: Some(Probe {
|
||||
http_get: Some(HTTPGetAction {
|
||||
path: Some("/health".to_string()),
|
||||
port: IntOrString::Int(SERVICE_PORT as i32),
|
||||
..Default::default()
|
||||
}),
|
||||
initial_delay_seconds: Some(300),
|
||||
period_seconds: Some(30),
|
||||
..Default::default()
|
||||
}),
|
||||
readiness_probe: Some(Probe {
|
||||
http_get: Some(HTTPGetAction {
|
||||
path: Some("/health".to_string()),
|
||||
port: IntOrString::Int(SERVICE_PORT as i32),
|
||||
..Default::default()
|
||||
}),
|
||||
initial_delay_seconds: Some(120),
|
||||
period_seconds: Some(10),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}],
|
||||
..Default::default()
|
||||
}),
|
||||
},
|
||||
..Default::default()
|
||||
}),
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
deployment,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn build_vllm_command() -> String {
|
||||
format!(
|
||||
"vllm serve {} \
|
||||
--port {} \
|
||||
--max-model-len {} \
|
||||
--gpu-memory-utilization {} \
|
||||
--reasoning-parser qwen3 \
|
||||
--enable-auto-tool-choice \
|
||||
--tool-call-parser qwen3_coder \
|
||||
--language-model-only",
|
||||
MODEL_NAME, SERVICE_PORT, MAX_MODEL_LEN, GPU_MEMORY_UTILIZATION
|
||||
)
|
||||
}
|
||||
|
||||
fn create_service() -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let service = Service {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(SERVICE_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(ServiceSpec {
|
||||
ports: Some(vec![ServicePort {
|
||||
name: Some("http".to_string()),
|
||||
port: SERVICE_PORT as i32,
|
||||
protocol: Some("TCP".to_string()),
|
||||
target_port: Some(IntOrString::Int(TARGET_PORT as i32)),
|
||||
..Default::default()
|
||||
}]),
|
||||
selector: Some(BTreeMap::from([(
|
||||
"app".to_string(),
|
||||
DEPLOYMENT_NAME.to_string(),
|
||||
)])),
|
||||
type_: Some("ClusterIP".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
status: None,
|
||||
};
|
||||
|
||||
Box::new(K8sResourceScore::single(
|
||||
service,
|
||||
Some(NAMESPACE.to_string()),
|
||||
))
|
||||
}
|
||||
|
||||
fn create_route(host: &str) -> Box<dyn Score<K8sAnywhereTopology>> {
|
||||
let route_spec = RouteSpec {
|
||||
to: RouteTargetReference {
|
||||
kind: "Service".to_string(),
|
||||
name: SERVICE_NAME.to_string(),
|
||||
weight: Some(100),
|
||||
},
|
||||
host: Some(host.to_string()),
|
||||
port: Some(RoutePort {
|
||||
target_port: SERVICE_PORT as u16,
|
||||
}),
|
||||
tls: Some(TLSConfig {
|
||||
termination: "edge".to_string(),
|
||||
insecure_edge_termination_policy: Some("Redirect".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
wildcard_policy: None,
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
Box::new(OKDRouteScore::new(ROUTE_NAME, NAMESPACE, route_spec))
|
||||
}
|
||||
14
examples/zitadel/Cargo.toml
Normal file
14
examples/zitadel/Cargo.toml
Normal file
@@ -0,0 +1,14 @@
|
||||
[package]
|
||||
name = "example-zitadel"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
harmony_macros = { path = "../../harmony_macros" }
|
||||
harmony_types = { path = "../../harmony_types" }
|
||||
tokio.workspace = true
|
||||
url.workspace = true
|
||||
20
examples/zitadel/src/main.rs
Normal file
20
examples/zitadel/src/main.rs
Normal file
@@ -0,0 +1,20 @@
|
||||
use harmony::{
|
||||
inventory::Inventory, modules::zitadel::ZitadelScore, topology::K8sAnywhereTopology,
|
||||
};
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let zitadel = ZitadelScore {
|
||||
host: "sso.sto1.nationtech.io".to_string(),
|
||||
zitadel_version: "v4.12.1".to_string(),
|
||||
};
|
||||
|
||||
harmony_cli::run(
|
||||
Inventory::autoload(),
|
||||
K8sAnywhereTopology::from_env(),
|
||||
vec![Box::new(zitadel)],
|
||||
None,
|
||||
)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
BIN
examples/zitadel/zitadel-9.24.0.tgz
Normal file
BIN
examples/zitadel/zitadel-9.24.0.tgz
Normal file
Binary file not shown.
23
harmony-k8s/Cargo.toml
Normal file
23
harmony-k8s/Cargo.toml
Normal file
@@ -0,0 +1,23 @@
|
||||
[package]
|
||||
name = "harmony-k8s"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
kube.workspace = true
|
||||
k8s-openapi.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-retry.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
serde_yaml.workspace = true
|
||||
log.workspace = true
|
||||
similar.workspace = true
|
||||
reqwest.workspace = true
|
||||
url.workspace = true
|
||||
inquire.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions.workspace = true
|
||||
593
harmony-k8s/src/apply.rs
Normal file
593
harmony-k8s/src/apply.rs
Normal file
@@ -0,0 +1,593 @@
|
||||
use kube::{
|
||||
Client, Error, Resource,
|
||||
api::{
|
||||
Api, ApiResource, DynamicObject, GroupVersionKind, Patch, PatchParams, PostParams,
|
||||
ResourceExt,
|
||||
},
|
||||
core::ErrorResponse,
|
||||
discovery::Scope,
|
||||
error::DiscoveryError,
|
||||
};
|
||||
use log::{debug, error, trace, warn};
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
use serde_json::Value;
|
||||
use similar::TextDiff;
|
||||
use url::Url;
|
||||
|
||||
use crate::client::K8sClient;
|
||||
use crate::helper;
|
||||
use crate::types::WriteMode;
|
||||
|
||||
/// The field-manager token sent with every server-side apply request.
|
||||
pub const FIELD_MANAGER: &str = "harmony-k8s";
|
||||
|
||||
// ── Private helpers ──────────────────────────────────────────────────────────
|
||||
|
||||
/// Serialise any `Serialize` payload to a [`DynamicObject`] via JSON.
|
||||
fn to_dynamic<T: Serialize>(payload: &T) -> Result<DynamicObject, Error> {
|
||||
serde_json::from_value(serde_json::to_value(payload).map_err(Error::SerdeError)?)
|
||||
.map_err(Error::SerdeError)
|
||||
}
|
||||
|
||||
/// Fetch the current resource, display a unified diff against `payload`, and
|
||||
/// return `()`. All output goes to stdout (same behaviour as before).
|
||||
///
|
||||
/// A 404 is treated as "resource would be created" — not an error.
|
||||
async fn show_dry_run<T: Serialize>(
|
||||
api: &Api<DynamicObject>,
|
||||
name: &str,
|
||||
payload: &T,
|
||||
) -> Result<(), Error> {
|
||||
let new_yaml = serde_yaml::to_string(payload)
|
||||
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
|
||||
|
||||
match api.get(name).await {
|
||||
Ok(current) => {
|
||||
println!("\nDry-run for resource: '{name}'");
|
||||
let mut current_val = serde_yaml::to_value(¤t).unwrap_or(serde_yaml::Value::Null);
|
||||
if let Some(map) = current_val.as_mapping_mut() {
|
||||
map.remove(&serde_yaml::Value::String("status".to_string()));
|
||||
}
|
||||
let current_yaml = serde_yaml::to_string(¤t_val)
|
||||
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
|
||||
|
||||
if current_yaml == new_yaml {
|
||||
println!("No changes detected.");
|
||||
} else {
|
||||
println!("Changes detected:");
|
||||
let diff = TextDiff::from_lines(¤t_yaml, &new_yaml);
|
||||
for change in diff.iter_all_changes() {
|
||||
let sign = match change.tag() {
|
||||
similar::ChangeTag::Delete => "-",
|
||||
similar::ChangeTag::Insert => "+",
|
||||
similar::ChangeTag::Equal => " ",
|
||||
};
|
||||
print!("{sign}{change}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
||||
println!("\nDry-run for new resource: '{name}'");
|
||||
println!("Resource does not exist. Would be created:");
|
||||
for line in new_yaml.lines() {
|
||||
println!("+{line}");
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to fetch resource '{name}' for dry-run: {e}");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute the real (non-dry-run) apply, respecting [`WriteMode`].
|
||||
async fn do_apply<T: Serialize + std::fmt::Debug>(
|
||||
api: &Api<DynamicObject>,
|
||||
name: &str,
|
||||
payload: &T,
|
||||
patch_params: &PatchParams,
|
||||
write_mode: &WriteMode,
|
||||
) -> Result<DynamicObject, Error> {
|
||||
match write_mode {
|
||||
WriteMode::CreateOrUpdate => {
|
||||
// TODO refactor this arm to perform self.update and if fail with 404 self.create
|
||||
// This will avoid the repetition of the api.patch and api.create calls within this
|
||||
// function body. This makes the code more maintainable
|
||||
match api.patch(name, patch_params, &Patch::Apply(payload)).await {
|
||||
Ok(obj) => Ok(obj),
|
||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
|
||||
debug!("Resource '{name}' not found via SSA, falling back to POST");
|
||||
let dyn_obj = to_dynamic(payload)?;
|
||||
api.create(&PostParams::default(), &dyn_obj)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to create '{name}': {e}");
|
||||
e
|
||||
})
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to apply '{name}': {e}");
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
}
|
||||
WriteMode::Create => {
|
||||
let dyn_obj = to_dynamic(payload)?;
|
||||
api.create(&PostParams::default(), &dyn_obj)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Failed to create '{name}': {e}");
|
||||
e
|
||||
})
|
||||
}
|
||||
WriteMode::Update => match api.patch(name, patch_params, &Patch::Apply(payload)).await {
|
||||
Ok(obj) => Ok(obj),
|
||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => Err(Error::Api(ErrorResponse {
|
||||
code: 404,
|
||||
message: format!("Resource '{name}' not found and WriteMode is UpdateOnly"),
|
||||
reason: "NotFound".to_string(),
|
||||
status: "Failure".to_string(),
|
||||
})),
|
||||
Err(e) => {
|
||||
error!("Failed to update '{name}': {e}");
|
||||
Err(e)
|
||||
}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// ── Public API ───────────────────────────────────────────────────────────────
|
||||
|
||||
impl K8sClient {
|
||||
/// Server-side apply: create if absent, update if present.
|
||||
/// Equivalent to `kubectl apply`.
|
||||
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
self.apply_with_strategy(resource, namespace, WriteMode::CreateOrUpdate)
|
||||
.await
|
||||
}
|
||||
|
||||
/// POST only — returns an error if the resource already exists.
|
||||
pub async fn create<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
self.apply_with_strategy(resource, namespace, WriteMode::Create)
|
||||
.await
|
||||
}
|
||||
|
||||
/// Server-side apply only — returns an error if the resource does not exist.
|
||||
pub async fn update<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
self.apply_with_strategy(resource, namespace, WriteMode::Update)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn apply_with_strategy<K>(
|
||||
&self,
|
||||
resource: &K,
|
||||
namespace: Option<&str>,
|
||||
write_mode: WriteMode,
|
||||
) -> Result<K, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
debug!(
|
||||
"apply_with_strategy: {:?} ns={:?}",
|
||||
resource.meta().name,
|
||||
namespace
|
||||
);
|
||||
trace!("{:#}", serde_json::to_value(resource).unwrap_or_default());
|
||||
|
||||
let dyntype = K::DynamicType::default();
|
||||
let gvk = GroupVersionKind {
|
||||
group: K::group(&dyntype).to_string(),
|
||||
version: K::version(&dyntype).to_string(),
|
||||
kind: K::kind(&dyntype).to_string(),
|
||||
};
|
||||
|
||||
let discovery = self.discovery().await?;
|
||||
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
|
||||
Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Cannot resolve GVK: {gvk:?}"
|
||||
)))
|
||||
})?;
|
||||
|
||||
let effective_ns = if caps.scope == Scope::Cluster {
|
||||
None
|
||||
} else {
|
||||
namespace.or_else(|| resource.meta().namespace.as_deref())
|
||||
};
|
||||
|
||||
let api: Api<DynamicObject> =
|
||||
get_dynamic_api(ar, caps, self.client.clone(), effective_ns, false);
|
||||
|
||||
let name = resource
|
||||
.meta()
|
||||
.name
|
||||
.as_deref()
|
||||
.expect("Kubernetes resource must have a name");
|
||||
|
||||
if self.dry_run {
|
||||
show_dry_run(&api, name, resource).await?;
|
||||
return Ok(resource.clone());
|
||||
}
|
||||
|
||||
let patch_params = PatchParams::apply(FIELD_MANAGER);
|
||||
do_apply(&api, name, resource, &patch_params, &write_mode)
|
||||
.await
|
||||
.and_then(helper::dyn_to_typed)
|
||||
}
|
||||
|
||||
/// Applies resources in order, one at a time
|
||||
pub async fn apply_many<K>(&self, resources: &[K], ns: Option<&str>) -> Result<Vec<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
let mut result = Vec::new();
|
||||
for r in resources.iter() {
|
||||
let res = self.apply(r, ns).await;
|
||||
if res.is_err() {
|
||||
// NOTE: this may log sensitive data; downgrade to debug if needed.
|
||||
warn!(
|
||||
"Failed to apply k8s resource: {}",
|
||||
serde_json::to_string_pretty(r).map_err(Error::SerdeError)?
|
||||
);
|
||||
}
|
||||
result.push(res?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
/// Apply a [`DynamicObject`] resource using server-side apply.
|
||||
pub async fn apply_dynamic(
|
||||
&self,
|
||||
resource: &DynamicObject,
|
||||
namespace: Option<&str>,
|
||||
force_conflicts: bool,
|
||||
) -> Result<DynamicObject, Error> {
|
||||
trace!("apply_dynamic {resource:#?} ns={namespace:?} force={force_conflicts}");
|
||||
|
||||
let discovery = self.discovery().await?;
|
||||
let type_meta = resource.types.as_ref().ok_or_else(|| {
|
||||
Error::BuildRequest(kube::core::request::Error::Validation(
|
||||
"DynamicObject must have types (apiVersion and kind)".to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
let gvk = GroupVersionKind::try_from(type_meta).map_err(|_| {
|
||||
Error::BuildRequest(kube::core::request::Error::Validation(format!(
|
||||
"Invalid GVK in DynamicObject: {type_meta:?}"
|
||||
)))
|
||||
})?;
|
||||
|
||||
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
|
||||
Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Cannot resolve GVK: {gvk:?}"
|
||||
)))
|
||||
})?;
|
||||
|
||||
let effective_ns = if caps.scope == Scope::Cluster {
|
||||
None
|
||||
} else {
|
||||
namespace.or_else(|| resource.metadata.namespace.as_deref())
|
||||
};
|
||||
|
||||
let api = get_dynamic_api(ar, caps, self.client.clone(), effective_ns, false);
|
||||
let name = resource.metadata.name.as_deref().ok_or_else(|| {
|
||||
Error::BuildRequest(kube::core::request::Error::Validation(
|
||||
"DynamicObject must have metadata.name".to_string(),
|
||||
))
|
||||
})?;
|
||||
|
||||
debug!(
|
||||
"apply_dynamic kind={:?} name='{name}' ns={effective_ns:?}",
|
||||
resource.types.as_ref().map(|t| &t.kind),
|
||||
);
|
||||
|
||||
// NOTE would be nice to improve cohesion between the dynamic and typed apis and avoid copy
|
||||
// pasting the dry_run and some more logic
|
||||
if self.dry_run {
|
||||
show_dry_run(&api, name, resource).await?;
|
||||
return Ok(resource.clone());
|
||||
}
|
||||
|
||||
let mut patch_params = PatchParams::apply(FIELD_MANAGER);
|
||||
patch_params.force = force_conflicts;
|
||||
|
||||
do_apply(
|
||||
&api,
|
||||
name,
|
||||
resource,
|
||||
&patch_params,
|
||||
&WriteMode::CreateOrUpdate,
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn apply_dynamic_many(
|
||||
&self,
|
||||
resources: &[DynamicObject],
|
||||
namespace: Option<&str>,
|
||||
force_conflicts: bool,
|
||||
) -> Result<Vec<DynamicObject>, Error> {
|
||||
let mut result = Vec::new();
|
||||
for r in resources.iter() {
|
||||
result.push(self.apply_dynamic(r, namespace, force_conflicts).await?);
|
||||
}
|
||||
Ok(result)
|
||||
}
|
||||
|
||||
pub async fn apply_yaml_many(
|
||||
&self,
|
||||
#[allow(clippy::ptr_arg)] yaml: &Vec<serde_yaml::Value>,
|
||||
ns: Option<&str>,
|
||||
) -> Result<(), Error> {
|
||||
for y in yaml.iter() {
|
||||
self.apply_yaml(y, ns).await?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn apply_yaml(
|
||||
&self,
|
||||
yaml: &serde_yaml::Value,
|
||||
ns: Option<&str>,
|
||||
) -> Result<(), Error> {
|
||||
// NOTE wouldn't it be possible to parse this into a DynamicObject and simply call
|
||||
// apply_dynamic instead of reimplementing api interactions?
|
||||
let obj: DynamicObject =
|
||||
serde_yaml::from_value(yaml.clone()).expect("YAML must deserialise to DynamicObject");
|
||||
let name = obj.metadata.name.as_ref().expect("YAML must have a name");
|
||||
|
||||
let api_version = yaml["apiVersion"].as_str().expect("missing apiVersion");
|
||||
let kind = yaml["kind"].as_str().expect("missing kind");
|
||||
|
||||
let mut it = api_version.splitn(2, '/');
|
||||
let first = it.next().unwrap();
|
||||
let (g, v) = match it.next() {
|
||||
Some(second) => (first, second),
|
||||
None => ("", first),
|
||||
};
|
||||
|
||||
let api_resource = ApiResource::from_gvk(&GroupVersionKind::gvk(g, v, kind));
|
||||
let namespace = ns.unwrap_or_else(|| {
|
||||
obj.metadata
|
||||
.namespace
|
||||
.as_deref()
|
||||
.expect("YAML must have a namespace when ns is not provided")
|
||||
});
|
||||
|
||||
let api: Api<DynamicObject> =
|
||||
Api::namespaced_with(self.client.clone(), namespace, &api_resource);
|
||||
|
||||
println!("Applying '{name}' in namespace '{namespace}'...");
|
||||
let patch_params = PatchParams::apply(FIELD_MANAGER);
|
||||
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
|
||||
println!("Successfully applied '{}'.", result.name_any());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Equivalent to `kubectl apply -f <url>`.
|
||||
pub async fn apply_url(&self, url: Url, ns: Option<&str>) -> Result<(), Error> {
|
||||
let patch_params = PatchParams::apply(FIELD_MANAGER);
|
||||
let discovery = self.discovery().await?;
|
||||
|
||||
let yaml = reqwest::get(url)
|
||||
.await
|
||||
.expect("Could not fetch URL")
|
||||
.text()
|
||||
.await
|
||||
.expect("Could not read response body");
|
||||
|
||||
for doc in multidoc_deserialize(&yaml).expect("Failed to parse YAML from URL") {
|
||||
let obj: DynamicObject =
|
||||
serde_yaml::from_value(doc).expect("YAML document is not a valid object");
|
||||
let namespace = obj.metadata.namespace.as_deref().or(ns);
|
||||
let type_meta = obj.types.as_ref().expect("Object is missing TypeMeta");
|
||||
let gvk =
|
||||
GroupVersionKind::try_from(type_meta).expect("Object has invalid GroupVersionKind");
|
||||
let name = obj.name_any();
|
||||
|
||||
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
|
||||
let api = get_dynamic_api(ar, caps, self.client.clone(), namespace, false);
|
||||
trace!(
|
||||
"Applying {}:\n{}",
|
||||
gvk.kind,
|
||||
serde_yaml::to_string(&obj).unwrap_or_default()
|
||||
);
|
||||
let data: Value = serde_json::to_value(&obj).expect("serialisation failed");
|
||||
let _r = api.patch(&name, &patch_params, &Patch::Apply(data)).await?;
|
||||
debug!("Applied {} '{name}'", gvk.kind);
|
||||
} else {
|
||||
warn!("Skipping document with unknown GVK: {gvk:?}");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Build a dynamic API client from a [`DynamicObject`]'s type metadata.
|
||||
pub(crate) fn get_api_for_dynamic_object(
|
||||
&self,
|
||||
object: &DynamicObject,
|
||||
ns: Option<&str>,
|
||||
) -> Result<Api<DynamicObject>, Error> {
|
||||
let ar = object
|
||||
.types
|
||||
.as_ref()
|
||||
.and_then(|t| {
|
||||
let parts: Vec<&str> = t.api_version.split('/').collect();
|
||||
match parts.as_slice() {
|
||||
[version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
|
||||
"", version, &t.kind,
|
||||
))),
|
||||
[group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
|
||||
group, version, &t.kind,
|
||||
))),
|
||||
_ => None,
|
||||
}
|
||||
})
|
||||
.ok_or_else(|| {
|
||||
Error::BuildRequest(kube::core::request::Error::Validation(format!(
|
||||
"Invalid apiVersion in DynamicObject: {object:#?}"
|
||||
)))
|
||||
})?;
|
||||
|
||||
Ok(match ns {
|
||||
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
|
||||
None => Api::default_namespaced_with(self.client.clone(), &ar),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ── Free functions ───────────────────────────────────────────────────────────
|
||||
|
||||
pub(crate) fn get_dynamic_api(
|
||||
resource: kube::api::ApiResource,
|
||||
capabilities: kube::discovery::ApiCapabilities,
|
||||
client: Client,
|
||||
ns: Option<&str>,
|
||||
all: bool,
|
||||
) -> Api<DynamicObject> {
|
||||
if capabilities.scope == Scope::Cluster || all {
|
||||
Api::all_with(client, &resource)
|
||||
} else if let Some(namespace) = ns {
|
||||
Api::namespaced_with(client, namespace, &resource)
|
||||
} else {
|
||||
Api::default_namespaced_with(client, &resource)
|
||||
}
|
||||
}
|
||||
|
||||
pub(crate) fn multidoc_deserialize(
|
||||
data: &str,
|
||||
) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
|
||||
use serde::Deserialize;
|
||||
let mut docs = vec![];
|
||||
for de in serde_yaml::Deserializer::from_str(data) {
|
||||
docs.push(serde_yaml::Value::deserialize(de)?);
|
||||
}
|
||||
Ok(docs)
|
||||
}
|
||||
|
||||
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
|
||||
mod apply_tests {
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::{SystemTime, UNIX_EPOCH};
|
||||
|
||||
use k8s_openapi::api::core::v1::ConfigMap;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use kube::api::{DeleteParams, TypeMeta};
|
||||
|
||||
use super::*;
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "requires kubernetes cluster"]
|
||||
async fn apply_creates_new_configmap() {
|
||||
let client = K8sClient::try_default().await.unwrap();
|
||||
let ns = "default";
|
||||
let name = format!(
|
||||
"test-cm-{}",
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
);
|
||||
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.clone()),
|
||||
namespace: Some(ns.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(BTreeMap::from([("key1".to_string(), "value1".to_string())])),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert!(client.apply(&cm, Some(ns)).await.is_ok());
|
||||
|
||||
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
|
||||
let _ = api.delete(&name, &DeleteParams::default()).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "requires kubernetes cluster"]
|
||||
async fn apply_is_idempotent() {
|
||||
let client = K8sClient::try_default().await.unwrap();
|
||||
let ns = "default";
|
||||
let name = format!(
|
||||
"test-idem-{}",
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
);
|
||||
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.clone()),
|
||||
namespace: Some(ns.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(BTreeMap::from([("key".to_string(), "value".to_string())])),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
assert!(
|
||||
client.apply(&cm, Some(ns)).await.is_ok(),
|
||||
"first apply failed"
|
||||
);
|
||||
assert!(
|
||||
client.apply(&cm, Some(ns)).await.is_ok(),
|
||||
"second apply failed (not idempotent)"
|
||||
);
|
||||
|
||||
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
|
||||
let _ = api.delete(&name, &DeleteParams::default()).await;
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
#[ignore = "requires kubernetes cluster"]
|
||||
async fn apply_dynamic_creates_new_resource() {
|
||||
let client = K8sClient::try_default().await.unwrap();
|
||||
let ns = "default";
|
||||
let name = format!(
|
||||
"test-dyn-{}",
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis()
|
||||
);
|
||||
|
||||
let obj = DynamicObject {
|
||||
types: Some(TypeMeta {
|
||||
api_version: "v1".to_string(),
|
||||
kind: "ConfigMap".to_string(),
|
||||
}),
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.clone()),
|
||||
namespace: Some(ns.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: serde_json::json!({}),
|
||||
};
|
||||
|
||||
let result = client.apply_dynamic(&obj, Some(ns), false).await;
|
||||
assert!(result.is_ok(), "apply_dynamic failed: {:?}", result.err());
|
||||
|
||||
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
|
||||
let _ = api.delete(&name, &DeleteParams::default()).await;
|
||||
}
|
||||
}
|
||||
@@ -25,9 +25,9 @@
|
||||
//!
|
||||
//! ## Example
|
||||
//!
|
||||
//! ```rust,no_run
|
||||
//! use harmony::topology::k8s::{K8sClient, helper};
|
||||
//! use harmony::topology::KubernetesDistribution;
|
||||
//! ```
|
||||
//! use harmony_k8s::{K8sClient, helper};
|
||||
//! use harmony_k8s::KubernetesDistribution;
|
||||
//!
|
||||
//! async fn write_network_config(client: &K8sClient, node: &str) {
|
||||
//! // Create a bundle with platform-specific RBAC
|
||||
@@ -56,7 +56,7 @@ use kube::{Error, Resource, ResourceExt, api::DynamicObject};
|
||||
use serde::Serialize;
|
||||
use serde_json;
|
||||
|
||||
use crate::domain::topology::k8s::K8sClient;
|
||||
use crate::K8sClient;
|
||||
|
||||
/// A ResourceBundle represents a logical unit of work consisting of multiple
|
||||
/// Kubernetes resources that should be applied or deleted together.
|
||||
99
harmony-k8s/src/client.rs
Normal file
99
harmony-k8s/src/client.rs
Normal file
@@ -0,0 +1,99 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use kube::config::{KubeConfigOptions, Kubeconfig};
|
||||
use kube::{Client, Config, Discovery, Error};
|
||||
use log::error;
|
||||
use serde::Serialize;
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
use crate::types::KubernetesDistribution;
|
||||
|
||||
// TODO not cool, should use a proper configuration mechanism
|
||||
// cli arg, env var, config file
|
||||
fn read_dry_run_from_env() -> bool {
|
||||
std::env::var("DRY_RUN")
|
||||
.map(|v| v == "true" || v == "1")
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
#[derive(Clone)]
|
||||
pub struct K8sClient {
|
||||
pub(crate) client: Client,
|
||||
/// When `true` no mutation is sent to the API server; diffs are printed
|
||||
/// to stdout instead. Initialised from the `DRY_RUN` environment variable.
|
||||
pub(crate) dry_run: bool,
|
||||
pub(crate) k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
|
||||
pub(crate) discovery: Arc<OnceCell<Discovery>>,
|
||||
}
|
||||
|
||||
impl Serialize for K8sClient {
|
||||
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
|
||||
where
|
||||
S: serde::Serializer,
|
||||
{
|
||||
todo!("K8sClient serialization is not meaningful; remove this impl if unused")
|
||||
}
|
||||
}
|
||||
|
||||
impl std::fmt::Debug for K8sClient {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.write_fmt(format_args!(
|
||||
"K8sClient {{ namespace: {}, dry_run: {} }}",
|
||||
self.client.default_namespace(),
|
||||
self.dry_run,
|
||||
))
|
||||
}
|
||||
}
|
||||
|
||||
impl K8sClient {
|
||||
/// Create a client, reading `DRY_RUN` from the environment.
|
||||
pub fn new(client: Client) -> Self {
|
||||
Self {
|
||||
dry_run: read_dry_run_from_env(),
|
||||
client,
|
||||
k8s_distribution: Arc::new(OnceCell::new()),
|
||||
discovery: Arc::new(OnceCell::new()),
|
||||
}
|
||||
}
|
||||
|
||||
/// Create a client that always operates in dry-run mode, regardless of
|
||||
/// the environment variable.
|
||||
pub fn new_dry_run(client: Client) -> Self {
|
||||
Self {
|
||||
dry_run: true,
|
||||
..Self::new(client)
|
||||
}
|
||||
}
|
||||
|
||||
/// Returns `true` if this client is operating in dry-run mode.
|
||||
pub fn is_dry_run(&self) -> bool {
|
||||
self.dry_run
|
||||
}
|
||||
|
||||
pub async fn try_default() -> Result<Self, Error> {
|
||||
Ok(Self::new(Client::try_default().await?))
|
||||
}
|
||||
|
||||
pub async fn from_kubeconfig(path: &str) -> Option<Self> {
|
||||
Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await
|
||||
}
|
||||
|
||||
pub async fn from_kubeconfig_with_context(path: &str, context: Option<String>) -> Option<Self> {
|
||||
let mut opts = KubeConfigOptions::default();
|
||||
opts.context = context;
|
||||
Self::from_kubeconfig_with_opts(path, &opts).await
|
||||
}
|
||||
|
||||
pub async fn from_kubeconfig_with_opts(path: &str, opts: &KubeConfigOptions) -> Option<Self> {
|
||||
let k = match Kubeconfig::read_from(path) {
|
||||
Ok(k) => k,
|
||||
Err(e) => {
|
||||
error!("Failed to load kubeconfig from {path}: {e}");
|
||||
return None;
|
||||
}
|
||||
};
|
||||
Some(Self::new(
|
||||
Client::try_from(Config::from_custom_kubeconfig(k, opts).await.unwrap()).unwrap(),
|
||||
))
|
||||
}
|
||||
}
|
||||
83
harmony-k8s/src/discovery.rs
Normal file
83
harmony-k8s/src/discovery.rs
Normal file
@@ -0,0 +1,83 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use kube::{Discovery, Error};
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use tokio::sync::Mutex;
|
||||
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
||||
|
||||
use crate::client::K8sClient;
|
||||
use crate::types::KubernetesDistribution;
|
||||
|
||||
impl K8sClient {
|
||||
pub async fn get_apiserver_version(
|
||||
&self,
|
||||
) -> Result<k8s_openapi::apimachinery::pkg::version::Info, Error> {
|
||||
self.client.clone().apiserver_version().await
|
||||
}
|
||||
|
||||
/// Runs (and caches) Kubernetes API discovery with exponential-backoff retries.
|
||||
pub async fn discovery(&self) -> Result<&Discovery, Error> {
|
||||
let retry_strategy = ExponentialBackoff::from_millis(1000)
|
||||
.max_delay(Duration::from_secs(32))
|
||||
.take(6);
|
||||
|
||||
let attempt = Mutex::new(0u32);
|
||||
Retry::spawn(retry_strategy, || async {
|
||||
let mut n = attempt.lock().await;
|
||||
*n += 1;
|
||||
match self
|
||||
.discovery
|
||||
.get_or_try_init(async || {
|
||||
debug!("Running Kubernetes API discovery (attempt {})", *n);
|
||||
let d = Discovery::new(self.client.clone()).run().await?;
|
||||
debug!("Kubernetes API discovery completed");
|
||||
Ok(d)
|
||||
})
|
||||
.await
|
||||
{
|
||||
Ok(d) => Ok(d),
|
||||
Err(e) => {
|
||||
warn!("Kubernetes API discovery failed (attempt {}): {}", *n, e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| {
|
||||
error!("Kubernetes API discovery failed after all retries: {}", e);
|
||||
e
|
||||
})
|
||||
}
|
||||
|
||||
/// Detect which Kubernetes distribution is running. Result is cached for
|
||||
/// the lifetime of the client.
|
||||
pub async fn get_k8s_distribution(&self) -> Result<KubernetesDistribution, Error> {
|
||||
self.k8s_distribution
|
||||
.get_or_try_init(async || {
|
||||
debug!("Detecting Kubernetes distribution");
|
||||
let api_groups = self.client.list_api_groups().await?;
|
||||
trace!("list_api_groups: {:?}", api_groups);
|
||||
|
||||
let version = self.get_apiserver_version().await?;
|
||||
|
||||
if api_groups
|
||||
.groups
|
||||
.iter()
|
||||
.any(|g| g.name == "project.openshift.io")
|
||||
{
|
||||
info!("Detected distribution: OpenshiftFamily");
|
||||
return Ok(KubernetesDistribution::OpenshiftFamily);
|
||||
}
|
||||
|
||||
if version.git_version.contains("k3s") {
|
||||
info!("Detected distribution: K3sFamily");
|
||||
return Ok(KubernetesDistribution::K3sFamily);
|
||||
}
|
||||
|
||||
info!("Distribution not identified, using Default");
|
||||
Ok(KubernetesDistribution::Default)
|
||||
})
|
||||
.await
|
||||
.cloned()
|
||||
}
|
||||
}
|
||||
@@ -1,7 +1,7 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::topology::KubernetesDistribution;
|
||||
use crate::KubernetesDistribution;
|
||||
|
||||
use super::bundle::ResourceBundle;
|
||||
use super::config::PRIVILEGED_POD_IMAGE;
|
||||
@@ -10,8 +10,10 @@ use k8s_openapi::api::core::v1::{
|
||||
};
|
||||
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use kube::api::DynamicObject;
|
||||
use kube::error::DiscoveryError;
|
||||
use log::{debug, error, info, warn};
|
||||
use serde::de::DeserializeOwned;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct PrivilegedPodConfig {
|
||||
@@ -131,9 +133,9 @@ pub fn host_root_volume() -> (Volume, VolumeMount) {
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```rust,no_run
|
||||
/// # use harmony::topology::k8s::helper::{build_privileged_bundle, PrivilegedPodConfig};
|
||||
/// # use harmony::topology::KubernetesDistribution;
|
||||
/// ```
|
||||
/// use harmony_k8s::helper::{build_privileged_bundle, PrivilegedPodConfig};
|
||||
/// use harmony_k8s::KubernetesDistribution;
|
||||
/// let bundle = build_privileged_bundle(
|
||||
/// PrivilegedPodConfig {
|
||||
/// name: "network-setup".to_string(),
|
||||
@@ -279,6 +281,16 @@ pub fn prompt_drain_timeout_action(
|
||||
}
|
||||
}
|
||||
|
||||
/// JSON round-trip: DynamicObject → K
|
||||
///
|
||||
/// Safe because the DynamicObject was produced by the apiserver from a
|
||||
/// payload that was originally serialized from K, so the schema is identical.
|
||||
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
|
||||
serde_json::to_value(obj)
|
||||
.and_then(serde_json::from_value)
|
||||
.map_err(kube::Error::SerdeError)
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
13
harmony-k8s/src/lib.rs
Normal file
13
harmony-k8s/src/lib.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
pub mod apply;
|
||||
pub mod bundle;
|
||||
pub mod client;
|
||||
pub mod config;
|
||||
pub mod discovery;
|
||||
pub mod helper;
|
||||
pub mod node;
|
||||
pub mod pod;
|
||||
pub mod resources;
|
||||
pub mod types;
|
||||
|
||||
pub use client::K8sClient;
|
||||
pub use types::{DrainOptions, KubernetesDistribution, NodeFile, ScopeResolver, WriteMode};
|
||||
3
harmony-k8s/src/main.rs
Normal file
3
harmony-k8s/src/main.rs
Normal file
@@ -0,0 +1,3 @@
|
||||
fn main() {
|
||||
println!("Hello, world!");
|
||||
}
|
||||
722
harmony-k8s/src/node.rs
Normal file
722
harmony-k8s/src/node.rs
Normal file
@@ -0,0 +1,722 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use k8s_openapi::api::core::v1::{
|
||||
ConfigMap, ConfigMapVolumeSource, Node, Pod, Volume, VolumeMount,
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use kube::{
|
||||
Error,
|
||||
api::{Api, DeleteParams, EvictParams, ListParams, PostParams},
|
||||
core::ErrorResponse,
|
||||
error::DiscoveryError,
|
||||
};
|
||||
use log::{debug, error, info, warn};
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::client::K8sClient;
|
||||
use crate::helper::{self, PrivilegedPodConfig};
|
||||
use crate::types::{DrainOptions, NodeFile};
|
||||
|
||||
impl K8sClient {
|
||||
pub async fn cordon_node(&self, node_name: &str) -> Result<(), Error> {
|
||||
Api::<Node>::all(self.client.clone())
|
||||
.cordon(node_name)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn uncordon_node(&self, node_name: &str) -> Result<(), Error> {
|
||||
Api::<Node>::all(self.client.clone())
|
||||
.uncordon(node_name)
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait_for_node_ready(&self, node_name: &str) -> Result<(), Error> {
|
||||
self.wait_for_node_ready_with_timeout(node_name, Duration::from_secs(600))
|
||||
.await
|
||||
}
|
||||
|
||||
async fn wait_for_node_ready_with_timeout(
|
||||
&self,
|
||||
node_name: &str,
|
||||
timeout: Duration,
|
||||
) -> Result<(), Error> {
|
||||
let api: Api<Node> = Api::all(self.client.clone());
|
||||
let start = tokio::time::Instant::now();
|
||||
let poll = Duration::from_secs(5);
|
||||
loop {
|
||||
if start.elapsed() > timeout {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Node '{node_name}' did not become Ready within {timeout:?}"
|
||||
))));
|
||||
}
|
||||
match api.get(node_name).await {
|
||||
Ok(node) => {
|
||||
if node
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.conditions.as_ref())
|
||||
.map(|conds| {
|
||||
conds
|
||||
.iter()
|
||||
.any(|c| c.type_ == "Ready" && c.status == "True")
|
||||
})
|
||||
.unwrap_or(false)
|
||||
{
|
||||
debug!("Node '{node_name}' is Ready");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => debug!("Error polling node '{node_name}': {e}"),
|
||||
}
|
||||
sleep(poll).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_for_node_not_ready(
|
||||
&self,
|
||||
node_name: &str,
|
||||
timeout: Duration,
|
||||
) -> Result<(), Error> {
|
||||
let api: Api<Node> = Api::all(self.client.clone());
|
||||
let start = tokio::time::Instant::now();
|
||||
let poll = Duration::from_secs(5);
|
||||
loop {
|
||||
if start.elapsed() > timeout {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Node '{node_name}' did not become NotReady within {timeout:?}"
|
||||
))));
|
||||
}
|
||||
match api.get(node_name).await {
|
||||
Ok(node) => {
|
||||
let is_ready = node
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.conditions.as_ref())
|
||||
.map(|conds| {
|
||||
conds
|
||||
.iter()
|
||||
.any(|c| c.type_ == "Ready" && c.status == "True")
|
||||
})
|
||||
.unwrap_or(false);
|
||||
if !is_ready {
|
||||
debug!("Node '{node_name}' is NotReady");
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
Err(e) => debug!("Error polling node '{node_name}': {e}"),
|
||||
}
|
||||
sleep(poll).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn list_pods_on_node(&self, node_name: &str) -> Result<Vec<Pod>, Error> {
|
||||
let api: Api<Pod> = Api::all(self.client.clone());
|
||||
Ok(api
|
||||
.list(&ListParams::default().fields(&format!("spec.nodeName={node_name}")))
|
||||
.await?
|
||||
.items)
|
||||
}
|
||||
|
||||
fn is_mirror_pod(pod: &Pod) -> bool {
|
||||
pod.metadata
|
||||
.annotations
|
||||
.as_ref()
|
||||
.map(|a| a.contains_key("kubernetes.io/config.mirror"))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn is_daemonset_pod(pod: &Pod) -> bool {
|
||||
pod.metadata
|
||||
.owner_references
|
||||
.as_ref()
|
||||
.map(|refs| refs.iter().any(|r| r.kind == "DaemonSet"))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn has_emptydir_volume(pod: &Pod) -> bool {
|
||||
pod.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.volumes.as_ref())
|
||||
.map(|vols| vols.iter().any(|v| v.empty_dir.is_some()))
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn is_completed_pod(pod: &Pod) -> bool {
|
||||
pod.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.phase.as_deref())
|
||||
.map(|phase| phase == "Succeeded" || phase == "Failed")
|
||||
.unwrap_or(false)
|
||||
}
|
||||
|
||||
fn classify_pods_for_drain(
|
||||
pods: &[Pod],
|
||||
options: &DrainOptions,
|
||||
) -> Result<(Vec<Pod>, Vec<String>), String> {
|
||||
let mut evictable = Vec::new();
|
||||
let mut skipped = Vec::new();
|
||||
let mut blocking = Vec::new();
|
||||
|
||||
for pod in pods {
|
||||
let name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
|
||||
let ns = pod.metadata.namespace.as_deref().unwrap_or("<unknown>");
|
||||
let qualified = format!("{ns}/{name}");
|
||||
|
||||
if Self::is_mirror_pod(pod) {
|
||||
skipped.push(format!("{qualified} (mirror pod)"));
|
||||
continue;
|
||||
}
|
||||
if Self::is_completed_pod(pod) {
|
||||
skipped.push(format!("{qualified} (completed)"));
|
||||
continue;
|
||||
}
|
||||
if Self::is_daemonset_pod(pod) {
|
||||
if options.ignore_daemonsets {
|
||||
skipped.push(format!("{qualified} (DaemonSet-managed)"));
|
||||
} else {
|
||||
blocking.push(format!(
|
||||
"{qualified} is managed by a DaemonSet (set ignore_daemonsets to skip)"
|
||||
));
|
||||
}
|
||||
continue;
|
||||
}
|
||||
if Self::has_emptydir_volume(pod) && !options.delete_emptydir_data {
|
||||
blocking.push(format!(
|
||||
"{qualified} uses emptyDir volumes (set delete_emptydir_data to allow eviction)"
|
||||
));
|
||||
continue;
|
||||
}
|
||||
evictable.push(pod.clone());
|
||||
}
|
||||
|
||||
if !blocking.is_empty() {
|
||||
return Err(format!(
|
||||
"Cannot drain node — the following pods block eviction:\n - {}",
|
||||
blocking.join("\n - ")
|
||||
));
|
||||
}
|
||||
Ok((evictable, skipped))
|
||||
}
|
||||
|
||||
async fn evict_pod(&self, pod: &Pod) -> Result<(), Error> {
|
||||
let name = pod.metadata.name.as_deref().unwrap_or_default();
|
||||
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
|
||||
debug!("Evicting pod {ns}/{name}");
|
||||
Api::<Pod>::namespaced(self.client.clone(), ns)
|
||||
.evict(name, &EvictParams::default())
|
||||
.await
|
||||
.map(|_| ())
|
||||
}
|
||||
|
||||
/// Drains a node: cordon → classify → evict & wait.
|
||||
pub async fn drain_node(&self, node_name: &str, options: &DrainOptions) -> Result<(), Error> {
|
||||
debug!("Cordoning '{node_name}'");
|
||||
self.cordon_node(node_name).await?;
|
||||
|
||||
let pods = self.list_pods_on_node(node_name).await?;
|
||||
debug!("Found {} pod(s) on '{node_name}'", pods.len());
|
||||
|
||||
let (evictable, skipped) =
|
||||
Self::classify_pods_for_drain(&pods, options).map_err(|msg| {
|
||||
error!("{msg}");
|
||||
Error::Discovery(DiscoveryError::MissingResource(msg))
|
||||
})?;
|
||||
|
||||
for s in &skipped {
|
||||
info!("Skipping pod: {s}");
|
||||
}
|
||||
if evictable.is_empty() {
|
||||
info!("No pods to evict on '{node_name}'");
|
||||
return Ok(());
|
||||
}
|
||||
info!("Evicting {} pod(s) from '{node_name}'", evictable.len());
|
||||
|
||||
let mut start = tokio::time::Instant::now();
|
||||
let poll = Duration::from_secs(5);
|
||||
let mut pending = evictable;
|
||||
|
||||
loop {
|
||||
for pod in &pending {
|
||||
match self.evict_pod(pod).await {
|
||||
Ok(()) => {}
|
||||
Err(Error::Api(ErrorResponse { code: 404, .. })) => {}
|
||||
Err(Error::Api(ErrorResponse { code: 429, .. })) => {
|
||||
warn!(
|
||||
"PDB blocked eviction of {}/{}; will retry",
|
||||
pod.metadata.namespace.as_deref().unwrap_or(""),
|
||||
pod.metadata.name.as_deref().unwrap_or("")
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
error!(
|
||||
"Failed to evict {}/{}: {e}",
|
||||
pod.metadata.namespace.as_deref().unwrap_or(""),
|
||||
pod.metadata.name.as_deref().unwrap_or("")
|
||||
);
|
||||
return Err(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
sleep(poll).await;
|
||||
|
||||
let mut still_present = Vec::new();
|
||||
for pod in pending {
|
||||
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
|
||||
let name = pod.metadata.name.as_deref().unwrap_or_default();
|
||||
match self.get_pod(name, Some(ns)).await? {
|
||||
Some(_) => still_present.push(pod),
|
||||
None => debug!("Pod {ns}/{name} evicted"),
|
||||
}
|
||||
}
|
||||
pending = still_present;
|
||||
|
||||
if pending.is_empty() {
|
||||
break;
|
||||
}
|
||||
|
||||
if start.elapsed() > options.timeout {
|
||||
match helper::prompt_drain_timeout_action(
|
||||
node_name,
|
||||
pending.len(),
|
||||
options.timeout,
|
||||
)? {
|
||||
helper::DrainTimeoutAction::Accept => break,
|
||||
helper::DrainTimeoutAction::Retry => {
|
||||
start = tokio::time::Instant::now();
|
||||
continue;
|
||||
}
|
||||
helper::DrainTimeoutAction::Abort => {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Drain aborted. {} pod(s) remaining on '{node_name}'",
|
||||
pending.len()
|
||||
))));
|
||||
}
|
||||
}
|
||||
}
|
||||
debug!("Waiting for {} pod(s) on '{node_name}'", pending.len());
|
||||
}
|
||||
|
||||
debug!("'{node_name}' drained successfully");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Safely reboots a node: drain → reboot → wait for Ready → uncordon.
|
||||
pub async fn reboot_node(
|
||||
&self,
|
||||
node_name: &str,
|
||||
drain_options: &DrainOptions,
|
||||
timeout: Duration,
|
||||
) -> Result<(), Error> {
|
||||
info!("Starting reboot for '{node_name}'");
|
||||
let node_api: Api<Node> = Api::all(self.client.clone());
|
||||
|
||||
let boot_id_before = node_api
|
||||
.get(node_name)
|
||||
.await?
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.node_info.as_ref())
|
||||
.map(|ni| ni.boot_id.clone())
|
||||
.ok_or_else(|| {
|
||||
Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Node '{node_name}' has no boot_id in status"
|
||||
)))
|
||||
})?;
|
||||
|
||||
info!("Draining '{node_name}'");
|
||||
self.drain_node(node_name, drain_options).await?;
|
||||
|
||||
let start = tokio::time::Instant::now();
|
||||
|
||||
info!("Scheduling reboot for '{node_name}'");
|
||||
let reboot_cmd =
|
||||
"echo rebooting ; nohup bash -c 'sleep 5 && nsenter -t 1 -m -- systemctl reboot'";
|
||||
match self
|
||||
.run_privileged_command_on_node(node_name, reboot_cmd)
|
||||
.await
|
||||
{
|
||||
Ok(_) => debug!("Reboot command dispatched"),
|
||||
Err(e) => debug!("Reboot command error (expected if node began shutdown): {e}"),
|
||||
}
|
||||
|
||||
info!("Waiting for '{node_name}' to begin shutdown");
|
||||
self.wait_for_node_not_ready(node_name, timeout.saturating_sub(start.elapsed()))
|
||||
.await?;
|
||||
|
||||
if start.elapsed() > timeout {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Timeout during reboot of '{node_name}' (shutdown phase)"
|
||||
))));
|
||||
}
|
||||
|
||||
info!("Waiting for '{node_name}' to come back online");
|
||||
self.wait_for_node_ready_with_timeout(node_name, timeout.saturating_sub(start.elapsed()))
|
||||
.await?;
|
||||
|
||||
if start.elapsed() > timeout {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Timeout during reboot of '{node_name}' (ready phase)"
|
||||
))));
|
||||
}
|
||||
|
||||
let boot_id_after = node_api
|
||||
.get(node_name)
|
||||
.await?
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.node_info.as_ref())
|
||||
.map(|ni| ni.boot_id.clone())
|
||||
.ok_or_else(|| {
|
||||
Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Node '{node_name}' has no boot_id after reboot"
|
||||
)))
|
||||
})?;
|
||||
|
||||
if boot_id_before == boot_id_after {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Node '{node_name}' did not actually reboot (boot_id unchanged: {boot_id_before})"
|
||||
))));
|
||||
}
|
||||
|
||||
info!("'{node_name}' rebooted ({boot_id_before} → {boot_id_after})");
|
||||
self.uncordon_node(node_name).await?;
|
||||
info!("'{node_name}' reboot complete ({:?})", start.elapsed());
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Write a set of files to a node's filesystem via a privileged ephemeral pod.
|
||||
pub async fn write_files_to_node(
|
||||
&self,
|
||||
node_name: &str,
|
||||
files: &[NodeFile],
|
||||
) -> Result<String, Error> {
|
||||
let ns = self.client.default_namespace();
|
||||
let suffix = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
let name = format!("harmony-k8s-writer-{suffix}");
|
||||
|
||||
debug!("Writing {} file(s) to '{node_name}'", files.len());
|
||||
|
||||
let mut data = BTreeMap::new();
|
||||
let mut script = String::from("set -e\n");
|
||||
for (i, file) in files.iter().enumerate() {
|
||||
let key = format!("f{i}");
|
||||
data.insert(key.clone(), file.content.clone());
|
||||
script.push_str(&format!("mkdir -p \"$(dirname \"/host{}\")\"\n", file.path));
|
||||
script.push_str(&format!("cp \"/payload/{key}\" \"/host{}\"\n", file.path));
|
||||
script.push_str(&format!("chmod {:o} \"/host{}\"\n", file.mode, file.path));
|
||||
}
|
||||
|
||||
let cm = ConfigMap {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.clone()),
|
||||
namespace: Some(ns.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
data: Some(data),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let cm_api: Api<ConfigMap> = Api::namespaced(self.client.clone(), ns);
|
||||
cm_api.create(&PostParams::default(), &cm).await?;
|
||||
debug!("Created ConfigMap '{name}'");
|
||||
|
||||
let (host_vol, host_mount) = helper::host_root_volume();
|
||||
let payload_vol = Volume {
|
||||
name: "payload".to_string(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: name.clone(),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
let payload_mount = VolumeMount {
|
||||
name: "payload".to_string(),
|
||||
mount_path: "/payload".to_string(),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let bundle = helper::build_privileged_bundle(
|
||||
PrivilegedPodConfig {
|
||||
name: name.clone(),
|
||||
namespace: ns.to_string(),
|
||||
node_name: node_name.to_string(),
|
||||
container_name: "writer".to_string(),
|
||||
command: vec!["/bin/bash".to_string(), "-c".to_string(), script],
|
||||
volumes: vec![payload_vol, host_vol],
|
||||
volume_mounts: vec![payload_mount, host_mount],
|
||||
host_pid: false,
|
||||
host_network: false,
|
||||
},
|
||||
&self.get_k8s_distribution().await?,
|
||||
);
|
||||
|
||||
bundle.apply(self).await?;
|
||||
debug!("Created privileged pod bundle '{name}'");
|
||||
|
||||
let result = self.wait_for_pod_completion(&name, ns).await;
|
||||
|
||||
debug!("Cleaning up '{name}'");
|
||||
let _ = bundle.delete(self).await;
|
||||
let _ = cm_api.delete(&name, &DeleteParams::default()).await;
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
/// Run a privileged command on a node via an ephemeral pod.
|
||||
pub async fn run_privileged_command_on_node(
|
||||
&self,
|
||||
node_name: &str,
|
||||
command: &str,
|
||||
) -> Result<String, Error> {
|
||||
let namespace = self.client.default_namespace();
|
||||
let suffix = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap()
|
||||
.as_millis();
|
||||
let name = format!("harmony-k8s-cmd-{suffix}");
|
||||
|
||||
debug!("Running privileged command on '{node_name}': {command}");
|
||||
|
||||
let (host_vol, host_mount) = helper::host_root_volume();
|
||||
let bundle = helper::build_privileged_bundle(
|
||||
PrivilegedPodConfig {
|
||||
name: name.clone(),
|
||||
namespace: namespace.to_string(),
|
||||
node_name: node_name.to_string(),
|
||||
container_name: "runner".to_string(),
|
||||
command: vec![
|
||||
"/bin/bash".to_string(),
|
||||
"-c".to_string(),
|
||||
command.to_string(),
|
||||
],
|
||||
volumes: vec![host_vol],
|
||||
volume_mounts: vec![host_mount],
|
||||
host_pid: true,
|
||||
host_network: true,
|
||||
},
|
||||
&self.get_k8s_distribution().await?,
|
||||
);
|
||||
|
||||
bundle.apply(self).await?;
|
||||
debug!("Privileged pod '{name}' created");
|
||||
|
||||
let result = self.wait_for_pod_completion(&name, namespace).await;
|
||||
|
||||
debug!("Cleaning up '{name}'");
|
||||
let _ = bundle.delete(self).await;
|
||||
|
||||
result
|
||||
}
|
||||
}
|
||||
|
||||
// ── Tests ────────────────────────────────────────────────────────────────────
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, PodSpec, PodStatus, Volume};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
|
||||
|
||||
use super::*;
|
||||
|
||||
fn base_pod(name: &str, ns: &str) -> Pod {
|
||||
Pod {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.to_string()),
|
||||
namespace: Some(ns.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(PodSpec::default()),
|
||||
status: Some(PodStatus {
|
||||
phase: Some("Running".to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
}
|
||||
}
|
||||
|
||||
fn mirror_pod(name: &str, ns: &str) -> Pod {
|
||||
let mut pod = base_pod(name, ns);
|
||||
pod.metadata.annotations = Some(std::collections::BTreeMap::from([(
|
||||
"kubernetes.io/config.mirror".to_string(),
|
||||
"abc123".to_string(),
|
||||
)]));
|
||||
pod
|
||||
}
|
||||
|
||||
fn daemonset_pod(name: &str, ns: &str) -> Pod {
|
||||
let mut pod = base_pod(name, ns);
|
||||
pod.metadata.owner_references = Some(vec![OwnerReference {
|
||||
api_version: "apps/v1".to_string(),
|
||||
kind: "DaemonSet".to_string(),
|
||||
name: "some-ds".to_string(),
|
||||
uid: "uid-ds".to_string(),
|
||||
..Default::default()
|
||||
}]);
|
||||
pod
|
||||
}
|
||||
|
||||
fn emptydir_pod(name: &str, ns: &str) -> Pod {
|
||||
let mut pod = base_pod(name, ns);
|
||||
pod.spec = Some(PodSpec {
|
||||
volumes: Some(vec![Volume {
|
||||
name: "scratch".to_string(),
|
||||
empty_dir: Some(EmptyDirVolumeSource::default()),
|
||||
..Default::default()
|
||||
}]),
|
||||
..Default::default()
|
||||
});
|
||||
pod
|
||||
}
|
||||
|
||||
fn completed_pod(name: &str, ns: &str, phase: &str) -> Pod {
|
||||
let mut pod = base_pod(name, ns);
|
||||
pod.status = Some(PodStatus {
|
||||
phase: Some(phase.to_string()),
|
||||
..Default::default()
|
||||
});
|
||||
pod
|
||||
}
|
||||
|
||||
fn default_opts() -> DrainOptions {
|
||||
DrainOptions::default()
|
||||
}
|
||||
|
||||
// All test bodies are identical to the original — only the module path changed.
|
||||
|
||||
#[test]
|
||||
fn empty_pod_list_returns_empty_vecs() {
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&[], &default_opts()).unwrap();
|
||||
assert!(e.is_empty());
|
||||
assert!(s.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn normal_pod_is_evictable() {
|
||||
let pods = vec![base_pod("web", "default")];
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
|
||||
assert_eq!(e.len(), 1);
|
||||
assert!(s.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mirror_pod_is_skipped() {
|
||||
let pods = vec![mirror_pod("kube-apiserver", "kube-system")];
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
|
||||
assert!(e.is_empty());
|
||||
assert!(s[0].contains("mirror pod"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn completed_pods_are_skipped() {
|
||||
for phase in ["Succeeded", "Failed"] {
|
||||
let pods = vec![completed_pod("job", "batch", phase)];
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
|
||||
assert!(e.is_empty());
|
||||
assert!(s[0].contains("completed"));
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn daemonset_skipped_when_ignored() {
|
||||
let pods = vec![daemonset_pod("fluentd", "logging")];
|
||||
let opts = DrainOptions {
|
||||
ignore_daemonsets: true,
|
||||
..default_opts()
|
||||
};
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
|
||||
assert!(e.is_empty());
|
||||
assert!(s[0].contains("DaemonSet-managed"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn daemonset_blocks_when_not_ignored() {
|
||||
let pods = vec![daemonset_pod("fluentd", "logging")];
|
||||
let opts = DrainOptions {
|
||||
ignore_daemonsets: false,
|
||||
..default_opts()
|
||||
};
|
||||
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
|
||||
assert!(err.contains("DaemonSet") && err.contains("logging/fluentd"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emptydir_blocks_without_flag() {
|
||||
let pods = vec![emptydir_pod("cache", "default")];
|
||||
let opts = DrainOptions {
|
||||
delete_emptydir_data: false,
|
||||
..default_opts()
|
||||
};
|
||||
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
|
||||
assert!(err.contains("emptyDir") && err.contains("default/cache"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn emptydir_evictable_with_flag() {
|
||||
let pods = vec![emptydir_pod("cache", "default")];
|
||||
let opts = DrainOptions {
|
||||
delete_emptydir_data: true,
|
||||
..default_opts()
|
||||
};
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
|
||||
assert_eq!(e.len(), 1);
|
||||
assert!(s.is_empty());
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn multiple_blocking_all_reported() {
|
||||
let pods = vec![daemonset_pod("ds", "ns1"), emptydir_pod("ed", "ns2")];
|
||||
let opts = DrainOptions {
|
||||
ignore_daemonsets: false,
|
||||
delete_emptydir_data: false,
|
||||
..default_opts()
|
||||
};
|
||||
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
|
||||
assert!(err.contains("ns1/ds") && err.contains("ns2/ed"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mixed_pods_classified_correctly() {
|
||||
let pods = vec![
|
||||
base_pod("web", "default"),
|
||||
mirror_pod("kube-apiserver", "kube-system"),
|
||||
daemonset_pod("fluentd", "logging"),
|
||||
completed_pod("job", "batch", "Succeeded"),
|
||||
base_pod("api", "default"),
|
||||
];
|
||||
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
|
||||
let names: Vec<&str> = e
|
||||
.iter()
|
||||
.map(|p| p.metadata.name.as_deref().unwrap())
|
||||
.collect();
|
||||
assert_eq!(names, vec!["web", "api"]);
|
||||
assert_eq!(s.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn mirror_checked_before_completed() {
|
||||
let mut pod = mirror_pod("static-etcd", "kube-system");
|
||||
pod.status = Some(PodStatus {
|
||||
phase: Some("Succeeded".to_string()),
|
||||
..Default::default()
|
||||
});
|
||||
let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
|
||||
assert!(s[0].contains("mirror pod"), "got: {}", s[0]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn completed_checked_before_daemonset() {
|
||||
let mut pod = daemonset_pod("collector", "monitoring");
|
||||
pod.status = Some(PodStatus {
|
||||
phase: Some("Failed".to_string()),
|
||||
..Default::default()
|
||||
});
|
||||
let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
|
||||
assert!(s[0].contains("completed"), "got: {}", s[0]);
|
||||
}
|
||||
}
|
||||
193
harmony-k8s/src/pod.rs
Normal file
193
harmony-k8s/src/pod.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use k8s_openapi::api::core::v1::Pod;
|
||||
use kube::{
|
||||
Error,
|
||||
api::{Api, AttachParams, ListParams},
|
||||
error::DiscoveryError,
|
||||
runtime::reflector::Lookup,
|
||||
};
|
||||
use log::debug;
|
||||
use tokio::io::AsyncReadExt;
|
||||
use tokio::time::sleep;
|
||||
|
||||
use crate::client::K8sClient;
|
||||
|
||||
impl K8sClient {
|
||||
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
|
||||
let api: Api<Pod> = match namespace {
|
||||
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
||||
None => Api::default_namespaced(self.client.clone()),
|
||||
};
|
||||
api.get_opt(name).await
|
||||
}
|
||||
|
||||
pub async fn wait_for_pod_ready(
|
||||
&self,
|
||||
pod_name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<(), Error> {
|
||||
let mut elapsed = 0u64;
|
||||
let interval = 5u64;
|
||||
let timeout_secs = 120u64;
|
||||
loop {
|
||||
if let Some(p) = self.get_pod(pod_name, namespace).await? {
|
||||
if let Some(phase) = p.status.and_then(|s| s.phase) {
|
||||
if phase.to_lowercase() == "running" {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
if elapsed >= timeout_secs {
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Pod '{}' in '{}' did not become ready within {timeout_secs}s",
|
||||
pod_name,
|
||||
namespace.unwrap_or("<default>"),
|
||||
))));
|
||||
}
|
||||
sleep(Duration::from_secs(interval)).await;
|
||||
elapsed += interval;
|
||||
}
|
||||
}
|
||||
|
||||
/// Polls a pod until it reaches `Succeeded` or `Failed`, then returns its
|
||||
/// logs. Used internally by node operations.
|
||||
pub(crate) async fn wait_for_pod_completion(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: &str,
|
||||
) -> Result<String, Error> {
|
||||
let api: Api<Pod> = Api::namespaced(self.client.clone(), namespace);
|
||||
let poll_interval = Duration::from_secs(2);
|
||||
for _ in 0..60 {
|
||||
sleep(poll_interval).await;
|
||||
let p = api.get(name).await?;
|
||||
match p.status.and_then(|s| s.phase).as_deref() {
|
||||
Some("Succeeded") => {
|
||||
let logs = api
|
||||
.logs(name, &Default::default())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
debug!("Pod {namespace}/{name} succeeded. Logs: {logs}");
|
||||
return Ok(logs);
|
||||
}
|
||||
Some("Failed") => {
|
||||
let logs = api
|
||||
.logs(name, &Default::default())
|
||||
.await
|
||||
.unwrap_or_default();
|
||||
debug!("Pod {namespace}/{name} failed. Logs: {logs}");
|
||||
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Pod '{name}' failed.\n{logs}"
|
||||
))));
|
||||
}
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
Err(Error::Discovery(DiscoveryError::MissingResource(format!(
|
||||
"Timed out waiting for pod '{name}'"
|
||||
))))
|
||||
}
|
||||
|
||||
/// Execute a command in the first pod matching `{label}={name}`.
|
||||
pub async fn exec_app_capture_output(
|
||||
&self,
|
||||
name: String,
|
||||
label: String,
|
||||
namespace: Option<&str>,
|
||||
command: Vec<&str>,
|
||||
) -> Result<String, String> {
|
||||
let api: Api<Pod> = match namespace {
|
||||
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
||||
None => Api::default_namespaced(self.client.clone()),
|
||||
};
|
||||
let pod_list = api
|
||||
.list(&ListParams::default().labels(&format!("{label}={name}")))
|
||||
.await
|
||||
.expect("Failed to list pods");
|
||||
|
||||
let pod_name = pod_list
|
||||
.items
|
||||
.first()
|
||||
.expect("No matching pod")
|
||||
.name()
|
||||
.expect("Pod has no name")
|
||||
.into_owned();
|
||||
|
||||
match api
|
||||
.exec(
|
||||
&pod_name,
|
||||
command,
|
||||
&AttachParams::default().stdout(true).stderr(true),
|
||||
)
|
||||
.await
|
||||
{
|
||||
Err(e) => Err(e.to_string()),
|
||||
Ok(mut process) => {
|
||||
let status = process
|
||||
.take_status()
|
||||
.expect("No status handle")
|
||||
.await
|
||||
.expect("Status channel closed");
|
||||
|
||||
if let Some(s) = status.status {
|
||||
let mut buf = String::new();
|
||||
if let Some(mut stdout) = process.stdout() {
|
||||
stdout
|
||||
.read_to_string(&mut buf)
|
||||
.await
|
||||
.map_err(|e| format!("Failed to read stdout: {e}"))?;
|
||||
}
|
||||
debug!("exec status: {} - {:?}", s, status.details);
|
||||
if s == "Success" { Ok(buf) } else { Err(s) }
|
||||
} else {
|
||||
Err("No inner status from pod exec".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Execute a command in the first pod matching
|
||||
/// `app.kubernetes.io/name={name}`.
|
||||
pub async fn exec_app(
|
||||
&self,
|
||||
name: String,
|
||||
namespace: Option<&str>,
|
||||
command: Vec<&str>,
|
||||
) -> Result<(), String> {
|
||||
let api: Api<Pod> = match namespace {
|
||||
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
||||
None => Api::default_namespaced(self.client.clone()),
|
||||
};
|
||||
let pod_list = api
|
||||
.list(&ListParams::default().labels(&format!("app.kubernetes.io/name={name}")))
|
||||
.await
|
||||
.expect("Failed to list pods");
|
||||
|
||||
let pod_name = pod_list
|
||||
.items
|
||||
.first()
|
||||
.expect("No matching pod")
|
||||
.name()
|
||||
.expect("Pod has no name")
|
||||
.into_owned();
|
||||
|
||||
match api.exec(&pod_name, command, &AttachParams::default()).await {
|
||||
Err(e) => Err(e.to_string()),
|
||||
Ok(mut process) => {
|
||||
let status = process
|
||||
.take_status()
|
||||
.expect("No status handle")
|
||||
.await
|
||||
.expect("Status channel closed");
|
||||
|
||||
if let Some(s) = status.status {
|
||||
debug!("exec status: {} - {:?}", s, status.details);
|
||||
if s == "Success" { Ok(()) } else { Err(s) }
|
||||
} else {
|
||||
Err("No inner status from pod exec".to_string())
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
316
harmony-k8s/src/resources.rs
Normal file
316
harmony-k8s/src/resources.rs
Normal file
@@ -0,0 +1,316 @@
|
||||
use std::collections::HashMap;
|
||||
|
||||
use k8s_openapi::api::{
|
||||
apps::v1::Deployment,
|
||||
core::v1::{Node, ServiceAccount},
|
||||
};
|
||||
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
|
||||
use kube::api::ApiResource;
|
||||
use kube::{
|
||||
Error, Resource,
|
||||
api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList},
|
||||
runtime::conditions,
|
||||
runtime::wait::await_condition,
|
||||
};
|
||||
use log::debug;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::Value;
|
||||
use std::time::Duration;
|
||||
|
||||
use crate::client::K8sClient;
|
||||
use crate::types::ScopeResolver;
|
||||
|
||||
impl K8sClient {
|
||||
pub async fn has_healthy_deployment_with_label(
|
||||
&self,
|
||||
namespace: &str,
|
||||
label_selector: &str,
|
||||
) -> Result<bool, Error> {
|
||||
let api: Api<Deployment> = Api::namespaced(self.client.clone(), namespace);
|
||||
let list = api
|
||||
.list(&ListParams::default().labels(label_selector))
|
||||
.await?;
|
||||
for d in list.items {
|
||||
let available = d
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.available_replicas)
|
||||
.unwrap_or(0);
|
||||
if available > 0 {
|
||||
return Ok(true);
|
||||
}
|
||||
if let Some(conds) = d.status.as_ref().and_then(|s| s.conditions.as_ref()) {
|
||||
if conds
|
||||
.iter()
|
||||
.any(|c| c.type_ == "Available" && c.status == "True")
|
||||
{
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub async fn list_namespaces_with_healthy_deployments(
|
||||
&self,
|
||||
label_selector: &str,
|
||||
) -> Result<Vec<String>, Error> {
|
||||
let api: Api<Deployment> = Api::all(self.client.clone());
|
||||
let list = api
|
||||
.list(&ListParams::default().labels(label_selector))
|
||||
.await?;
|
||||
|
||||
let mut healthy_ns: HashMap<String, bool> = HashMap::new();
|
||||
for d in list.items {
|
||||
let ns = match d.metadata.namespace.clone() {
|
||||
Some(n) => n,
|
||||
None => continue,
|
||||
};
|
||||
let available = d
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.available_replicas)
|
||||
.unwrap_or(0);
|
||||
let is_healthy = if available > 0 {
|
||||
true
|
||||
} else {
|
||||
d.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.conditions.as_ref())
|
||||
.map(|c| {
|
||||
c.iter()
|
||||
.any(|c| c.type_ == "Available" && c.status == "True")
|
||||
})
|
||||
.unwrap_or(false)
|
||||
};
|
||||
if is_healthy {
|
||||
healthy_ns.insert(ns, true);
|
||||
}
|
||||
}
|
||||
Ok(healthy_ns.into_keys().collect())
|
||||
}
|
||||
|
||||
pub async fn get_controller_service_account_name(
|
||||
&self,
|
||||
ns: &str,
|
||||
) -> Result<Option<String>, Error> {
|
||||
let api: Api<Deployment> = Api::namespaced(self.client.clone(), ns);
|
||||
let list = api
|
||||
.list(&ListParams::default().labels("app.kubernetes.io/component=controller"))
|
||||
.await?;
|
||||
if let Some(dep) = list.items.first() {
|
||||
if let Some(sa) = dep
|
||||
.spec
|
||||
.as_ref()
|
||||
.and_then(|s| s.template.spec.as_ref())
|
||||
.and_then(|s| s.service_account_name.clone())
|
||||
{
|
||||
return Ok(Some(sa));
|
||||
}
|
||||
}
|
||||
Ok(None)
|
||||
}
|
||||
|
||||
pub async fn list_clusterrolebindings_json(&self) -> Result<Vec<Value>, Error> {
|
||||
let gvk = GroupVersionKind::gvk("rbac.authorization.k8s.io", "v1", "ClusterRoleBinding");
|
||||
let ar = ApiResource::from_gvk(&gvk);
|
||||
let api: Api<DynamicObject> = Api::all_with(self.client.clone(), &ar);
|
||||
let list = api.list(&ListParams::default()).await?;
|
||||
Ok(list
|
||||
.items
|
||||
.into_iter()
|
||||
.map(|o| serde_json::to_value(&o).unwrap_or(Value::Null))
|
||||
.collect())
|
||||
}
|
||||
|
||||
pub async fn is_service_account_cluster_wide(&self, sa: &str, ns: &str) -> Result<bool, Error> {
|
||||
let sa_user = format!("system:serviceaccount:{ns}:{sa}");
|
||||
for crb in self.list_clusterrolebindings_json().await? {
|
||||
if let Some(subjects) = crb.get("subjects").and_then(|s| s.as_array()) {
|
||||
for subj in subjects {
|
||||
let kind = subj.get("kind").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let name = subj.get("name").and_then(|v| v.as_str()).unwrap_or("");
|
||||
let subj_ns = subj.get("namespace").and_then(|v| v.as_str()).unwrap_or("");
|
||||
if (kind == "ServiceAccount" && name == sa && subj_ns == ns)
|
||||
|| (kind == "User" && name == sa_user)
|
||||
{
|
||||
return Ok(true);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(false)
|
||||
}
|
||||
|
||||
pub async fn has_crd(&self, name: &str) -> Result<bool, Error> {
|
||||
let api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
|
||||
let crds = api
|
||||
.list(&ListParams::default().fields(&format!("metadata.name={name}")))
|
||||
.await?;
|
||||
Ok(!crds.items.is_empty())
|
||||
}
|
||||
|
||||
pub async fn service_account_api(&self, namespace: &str) -> Api<ServiceAccount> {
|
||||
Api::namespaced(self.client.clone(), namespace)
|
||||
}
|
||||
|
||||
pub async fn get_resource_json_value(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
gvk: &GroupVersionKind,
|
||||
) -> Result<DynamicObject, Error> {
|
||||
let ar = ApiResource::from_gvk(gvk);
|
||||
let api: Api<DynamicObject> = match namespace {
|
||||
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
|
||||
None => Api::default_namespaced_with(self.client.clone(), &ar),
|
||||
};
|
||||
api.get(name).await
|
||||
}
|
||||
|
||||
pub async fn get_secret_json_value(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<DynamicObject, Error> {
|
||||
self.get_resource_json_value(
|
||||
name,
|
||||
namespace,
|
||||
&GroupVersionKind {
|
||||
group: String::new(),
|
||||
version: "v1".to_string(),
|
||||
kind: "Secret".to_string(),
|
||||
},
|
||||
)
|
||||
.await
|
||||
}
|
||||
|
||||
pub async fn get_deployment(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Option<Deployment>, Error> {
|
||||
let api: Api<Deployment> = match namespace {
|
||||
Some(ns) => {
|
||||
debug!("Getting namespaced deployment '{name}' in '{ns}'");
|
||||
Api::namespaced(self.client.clone(), ns)
|
||||
}
|
||||
None => {
|
||||
debug!("Getting deployment '{name}' in default namespace");
|
||||
Api::default_namespaced(self.client.clone())
|
||||
}
|
||||
};
|
||||
api.get_opt(name).await
|
||||
}
|
||||
|
||||
pub async fn scale_deployment(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
replicas: u32,
|
||||
) -> Result<(), Error> {
|
||||
let api: Api<Deployment> = match namespace {
|
||||
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
||||
None => Api::default_namespaced(self.client.clone()),
|
||||
};
|
||||
use kube::api::{Patch, PatchParams};
|
||||
use serde_json::json;
|
||||
let patch = json!({ "spec": { "replicas": replicas } });
|
||||
api.patch_scale(name, &PatchParams::default(), &Patch::Merge(&patch))
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn delete_deployment(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<(), Error> {
|
||||
let api: Api<Deployment> = match namespace {
|
||||
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
||||
None => Api::default_namespaced(self.client.clone()),
|
||||
};
|
||||
api.delete(name, &kube::api::DeleteParams::default())
|
||||
.await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub async fn wait_until_deployment_ready(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
timeout: Option<Duration>,
|
||||
) -> Result<(), String> {
|
||||
let api: Api<Deployment> = match namespace {
|
||||
Some(ns) => Api::namespaced(self.client.clone(), ns),
|
||||
None => Api::default_namespaced(self.client.clone()),
|
||||
};
|
||||
let timeout = timeout.unwrap_or(Duration::from_secs(120));
|
||||
let establish = await_condition(api, name, conditions::is_deployment_completed());
|
||||
tokio::time::timeout(timeout, establish)
|
||||
.await
|
||||
.map(|_| ())
|
||||
.map_err(|_| "Timed out waiting for deployment".to_string())
|
||||
}
|
||||
|
||||
/// Gets a single named resource, using the correct API scope for `K`.
|
||||
pub async fn get_resource<K>(
|
||||
&self,
|
||||
name: &str,
|
||||
namespace: Option<&str>,
|
||||
) -> Result<Option<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
||||
<K as Resource>::Scope: ScopeResolver<K>,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
let api: Api<K> =
|
||||
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
|
||||
api.get_opt(name).await
|
||||
}
|
||||
|
||||
pub async fn list_resources<K>(
|
||||
&self,
|
||||
namespace: Option<&str>,
|
||||
list_params: Option<ListParams>,
|
||||
) -> Result<ObjectList<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
||||
<K as Resource>::Scope: ScopeResolver<K>,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
let api: Api<K> =
|
||||
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
|
||||
api.list(&list_params.unwrap_or_default()).await
|
||||
}
|
||||
|
||||
pub async fn list_all_resources_with_labels<K>(&self, labels: &str) -> Result<Vec<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
Api::<K>::all(self.client.clone())
|
||||
.list(&ListParams::default().labels(labels))
|
||||
.await
|
||||
.map(|l| l.items)
|
||||
}
|
||||
|
||||
pub async fn get_all_resource_in_all_namespace<K>(&self) -> Result<Vec<K>, Error>
|
||||
where
|
||||
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
|
||||
<K as Resource>::Scope: ScopeResolver<K>,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
Api::<K>::all(self.client.clone())
|
||||
.list(&Default::default())
|
||||
.await
|
||||
.map(|l| l.items)
|
||||
}
|
||||
|
||||
pub async fn get_nodes(
|
||||
&self,
|
||||
list_params: Option<ListParams>,
|
||||
) -> Result<ObjectList<Node>, Error> {
|
||||
self.list_resources(None, list_params).await
|
||||
}
|
||||
}
|
||||
100
harmony-k8s/src/types.rs
Normal file
100
harmony-k8s/src/types.rs
Normal file
@@ -0,0 +1,100 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
|
||||
use kube::{Api, Client, Resource};
|
||||
use serde::Serialize;
|
||||
|
||||
/// Which Kubernetes distribution is running. Detected once at runtime via
|
||||
/// [`crate::discovery::K8sClient::get_k8s_distribution`].
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
|
||||
pub enum KubernetesDistribution {
|
||||
Default,
|
||||
OpenshiftFamily,
|
||||
K3sFamily,
|
||||
}
|
||||
|
||||
/// A file to be written to a node's filesystem.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NodeFile {
|
||||
/// Absolute path on the host where the file should be written.
|
||||
pub path: String,
|
||||
/// Content of the file.
|
||||
pub content: String,
|
||||
/// UNIX permissions (e.g. `0o600`).
|
||||
pub mode: u32,
|
||||
}
|
||||
|
||||
/// Options controlling the behaviour of a [`crate::K8sClient::drain_node`] operation.
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct DrainOptions {
|
||||
/// Evict pods that use `emptyDir` volumes (ephemeral data is lost).
|
||||
/// Equivalent to `kubectl drain --delete-emptydir-data`.
|
||||
pub delete_emptydir_data: bool,
|
||||
/// Silently skip DaemonSet-managed pods instead of blocking the drain.
|
||||
/// Equivalent to `kubectl drain --ignore-daemonsets`.
|
||||
pub ignore_daemonsets: bool,
|
||||
/// Maximum wall-clock time to wait for all evictions to complete.
|
||||
pub timeout: Duration,
|
||||
}
|
||||
|
||||
impl Default for DrainOptions {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
delete_emptydir_data: false,
|
||||
ignore_daemonsets: true,
|
||||
timeout: Duration::from_secs(1),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl DrainOptions {
|
||||
pub fn default_ignore_daemonset_delete_emptydir_data() -> Self {
|
||||
Self {
|
||||
delete_emptydir_data: true,
|
||||
ignore_daemonsets: true,
|
||||
..Self::default()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// Controls how [`crate::K8sClient::apply_with_strategy`] behaves when the
|
||||
/// resource already exists (or does not).
|
||||
pub enum WriteMode {
|
||||
/// Server-side apply; create if absent, update if present (default).
|
||||
CreateOrUpdate,
|
||||
/// POST only; return an error if the resource already exists.
|
||||
Create,
|
||||
/// Server-side apply only; return an error if the resource does not exist.
|
||||
Update,
|
||||
}
|
||||
|
||||
// ── Scope resolution trait ───────────────────────────────────────────────────
|
||||
|
||||
/// Resolves the correct [`kube::Api`] for a resource type based on its scope
|
||||
/// (cluster-wide vs. namespace-scoped).
|
||||
pub trait ScopeResolver<K: Resource> {
|
||||
fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
|
||||
}
|
||||
|
||||
impl<K> ScopeResolver<K> for ClusterResourceScope
|
||||
where
|
||||
K: Resource<Scope = ClusterResourceScope>,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
fn get_api(client: &Client, _ns: Option<&str>) -> Api<K> {
|
||||
Api::all(client.clone())
|
||||
}
|
||||
}
|
||||
|
||||
impl<K> ScopeResolver<K> for NamespaceResourceScope
|
||||
where
|
||||
K: Resource<Scope = NamespaceResourceScope>,
|
||||
<K as Resource>::DynamicType: Default,
|
||||
{
|
||||
fn get_api(client: &Client, ns: Option<&str>) -> Api<K> {
|
||||
match ns {
|
||||
Some(ns) => Api::namespaced(client.clone(), ns),
|
||||
None => Api::default_namespaced(client.clone()),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,8 @@ semver = "1.0.23"
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
tokio.workspace = true
|
||||
tokio-retry.workspace = true
|
||||
tokio-util.workspace = true
|
||||
derive-new.workspace = true
|
||||
log.workspace = true
|
||||
env_logger.workspace = true
|
||||
@@ -31,6 +33,7 @@ opnsense-config-xml = { path = "../opnsense-config-xml" }
|
||||
harmony_macros = { path = "../harmony_macros" }
|
||||
harmony_types = { path = "../harmony_types" }
|
||||
harmony_execution = { path = "../harmony_execution" }
|
||||
harmony-k8s = { path = "../harmony-k8s" }
|
||||
uuid.workspace = true
|
||||
url.workspace = true
|
||||
kube = { workspace = true, features = ["derive"] }
|
||||
@@ -60,7 +63,6 @@ temp-dir = "0.1.14"
|
||||
dyn-clone = "1.0.19"
|
||||
similar.workspace = true
|
||||
futures-util = "0.3.31"
|
||||
tokio-util = "0.7.15"
|
||||
strum = { version = "0.27.1", features = ["derive"] }
|
||||
tempfile.workspace = true
|
||||
serde_with = "3.14.0"
|
||||
@@ -80,7 +82,7 @@ sqlx.workspace = true
|
||||
inquire.workspace = true
|
||||
brocade = { path = "../brocade" }
|
||||
option-ext = "0.2.0"
|
||||
tokio-retry = "0.3.0"
|
||||
rand.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions.workspace = true
|
||||
|
||||
@@ -4,8 +4,6 @@ use std::error::Error;
|
||||
use async_trait::async_trait;
|
||||
use derive_new::new;
|
||||
|
||||
use crate::inventory::HostRole;
|
||||
|
||||
use super::{
|
||||
data::Version, executors::ExecutorError, inventory::Inventory, topology::PreparationError,
|
||||
};
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_macros::ip;
|
||||
use harmony_types::{
|
||||
id::Id,
|
||||
@@ -8,7 +9,7 @@ use harmony_types::{
|
||||
use log::debug;
|
||||
use log::info;
|
||||
|
||||
use crate::topology::PxeOptions;
|
||||
use crate::topology::{HelmCommand, PxeOptions};
|
||||
use crate::{data::FileContent, executors::ExecutorError, topology::node_exporter::NodeExporter};
|
||||
use crate::{infra::network_manager::OpenShiftNmStateNetworkManager, topology::PortConfig};
|
||||
|
||||
@@ -16,9 +17,12 @@ use super::{
|
||||
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
|
||||
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
|
||||
NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
|
||||
SwitchError, TftpServer, Topology, k8s::K8sClient,
|
||||
SwitchError, TftpServer, Topology,
|
||||
};
|
||||
use std::{
|
||||
process::Command,
|
||||
sync::{Arc, OnceLock},
|
||||
};
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct HAClusterTopology {
|
||||
@@ -52,6 +56,30 @@ impl Topology for HAClusterTopology {
|
||||
}
|
||||
}
|
||||
|
||||
impl HelmCommand for HAClusterTopology {
|
||||
fn get_helm_command(&self) -> Command {
|
||||
let mut cmd = Command::new("helm");
|
||||
if let Some(k) = &self.kubeconfig {
|
||||
cmd.args(["--kubeconfig", k]);
|
||||
}
|
||||
|
||||
// FIXME we should support context anywhere there is a k8sclient
|
||||
// This likely belongs in the k8sclient itself and should be extracted to a separate
|
||||
// crate
|
||||
//
|
||||
// I feel like helm could very well be a feature of this external k8s client.
|
||||
//
|
||||
// Same for kustomize
|
||||
//
|
||||
// if let Some(c) = &self.k8s_context {
|
||||
// cmd.args(["--kube-context", c]);
|
||||
// }
|
||||
|
||||
info!("Using helm command {cmd:?}");
|
||||
cmd
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl K8sclient for HAClusterTopology {
|
||||
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {
|
||||
|
||||
File diff suppressed because it is too large
Load Diff
@@ -2,6 +2,7 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use base64::{Engine, engine::general_purpose};
|
||||
use harmony_k8s::{K8sClient, KubernetesDistribution};
|
||||
use harmony_types::rfc1123::Rfc1123Name;
|
||||
use k8s_openapi::api::{
|
||||
core::v1::{Pod, Secret},
|
||||
@@ -58,7 +59,6 @@ use crate::{
|
||||
use super::super::{
|
||||
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
|
||||
PreparationOutcome, Topology,
|
||||
k8s::K8sClient,
|
||||
oberservability::monitoring::AlertReceiver,
|
||||
tenant::{
|
||||
TenantConfig, TenantManager,
|
||||
@@ -76,13 +76,6 @@ struct K8sState {
|
||||
message: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub enum KubernetesDistribution {
|
||||
OpenshiftFamily,
|
||||
K3sFamily,
|
||||
Default,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
enum K8sSource {
|
||||
LocalK3d,
|
||||
@@ -116,6 +109,13 @@ impl K8sclient for K8sAnywhereTopology {
|
||||
|
||||
#[async_trait]
|
||||
impl TlsRouter for K8sAnywhereTopology {
|
||||
async fn get_public_domain(&self) -> Result<String, String> {
|
||||
match &self.config.public_domain {
|
||||
Some(public_domain) => Ok(public_domain.to_string()),
|
||||
None => Err("Public domain not available".to_string()),
|
||||
}
|
||||
}
|
||||
|
||||
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
||||
match self.get_k8s_distribution().await.map_err(|e| {
|
||||
format!(
|
||||
@@ -1131,6 +1131,7 @@ pub struct K8sAnywhereConfig {
|
||||
///
|
||||
/// If the context name is not found, it will fail to initialize.
|
||||
pub k8s_context: Option<String>,
|
||||
public_domain: Option<String>,
|
||||
}
|
||||
|
||||
impl K8sAnywhereConfig {
|
||||
@@ -1158,6 +1159,7 @@ impl K8sAnywhereConfig {
|
||||
|
||||
let mut kubeconfig: Option<String> = None;
|
||||
let mut k8s_context: Option<String> = None;
|
||||
let mut public_domain: Option<String> = None;
|
||||
|
||||
for part in env_var_value.split(',') {
|
||||
let kv: Vec<&str> = part.splitn(2, '=').collect();
|
||||
@@ -1165,6 +1167,7 @@ impl K8sAnywhereConfig {
|
||||
match kv[0].trim() {
|
||||
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
|
||||
"context" => k8s_context = Some(kv[1].trim().to_string()),
|
||||
"public_domain" => public_domain = Some(kv[1].trim().to_string()),
|
||||
_ => {}
|
||||
}
|
||||
}
|
||||
@@ -1182,6 +1185,7 @@ impl K8sAnywhereConfig {
|
||||
K8sAnywhereConfig {
|
||||
kubeconfig,
|
||||
k8s_context,
|
||||
public_domain,
|
||||
use_system_kubeconfig,
|
||||
autoinstall: false,
|
||||
use_local_k3d: false,
|
||||
@@ -1224,6 +1228,7 @@ impl K8sAnywhereConfig {
|
||||
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
|
||||
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
|
||||
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
|
||||
public_domain: std::env::var("HARMONY_PUBLIC_DOMAIN").ok(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,7 +1,6 @@
|
||||
use async_trait::async_trait;
|
||||
|
||||
use crate::{
|
||||
interpret::Outcome,
|
||||
inventory::Inventory,
|
||||
modules::postgresql::{
|
||||
K8sPostgreSQLScore,
|
||||
|
||||
@@ -106,6 +106,7 @@ pub enum SSL {
|
||||
|
||||
#[derive(Debug, Clone, PartialEq, Serialize)]
|
||||
pub enum HealthCheck {
|
||||
HTTP(String, HttpMethod, HttpStatusCode, SSL),
|
||||
/// HTTP(None, "/healthz/ready", HttpMethod::GET, HttpStatusCode::Success2xx, SSL::Disabled)
|
||||
HTTP(Option<u16>, String, HttpMethod, HttpStatusCode, SSL),
|
||||
TCP(Option<u16>),
|
||||
}
|
||||
|
||||
@@ -16,7 +16,6 @@ pub mod tenant;
|
||||
use derive_new::new;
|
||||
pub use k8s_anywhere::*;
|
||||
pub use localhost::*;
|
||||
pub mod k8s;
|
||||
mod load_balancer;
|
||||
pub mod router;
|
||||
mod tftp;
|
||||
|
||||
@@ -9,6 +9,7 @@ use std::{
|
||||
use async_trait::async_trait;
|
||||
use brocade::PortOperatingMode;
|
||||
use derive_new::new;
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_types::{
|
||||
id::Id,
|
||||
net::{IpAddress, MacAddress},
|
||||
@@ -18,7 +19,7 @@ use serde::Serialize;
|
||||
|
||||
use crate::executors::ExecutorError;
|
||||
|
||||
use super::{LogicalHost, k8s::K8sClient};
|
||||
use super::LogicalHost;
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct DHCPStaticEntry {
|
||||
|
||||
@@ -122,4 +122,6 @@ pub trait TlsRouter: Send + Sync {
|
||||
|
||||
/// Returns the port that this router exposes externally.
|
||||
async fn get_router_port(&self) -> u16;
|
||||
|
||||
async fn get_public_domain(&self) -> Result<String, String>;
|
||||
}
|
||||
|
||||
@@ -1,10 +1,8 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use crate::{
|
||||
executors::ExecutorError,
|
||||
topology::k8s::{ApplyStrategy, K8sClient},
|
||||
};
|
||||
use crate::executors::ExecutorError;
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::K8sClient;
|
||||
use k8s_openapi::{
|
||||
api::{
|
||||
core::v1::{LimitRange, Namespace, ResourceQuota},
|
||||
@@ -14,7 +12,7 @@ use k8s_openapi::{
|
||||
},
|
||||
apimachinery::pkg::util::intstr::IntOrString,
|
||||
};
|
||||
use kube::{Resource, api::DynamicObject};
|
||||
use kube::Resource;
|
||||
use log::debug;
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::json;
|
||||
@@ -59,7 +57,6 @@ impl K8sTenantManager {
|
||||
) -> Result<K, ExecutorError>
|
||||
where
|
||||
<K as kube::Resource>::DynamicType: Default,
|
||||
<K as kube::Resource>::Scope: ApplyStrategy<K>,
|
||||
{
|
||||
self.apply_labels(&mut resource, config);
|
||||
self.k8s_client
|
||||
|
||||
@@ -5,6 +5,7 @@ use std::{
|
||||
|
||||
use askama::Template;
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::{DrainOptions, K8sClient, NodeFile};
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::api::core::v1::Node;
|
||||
use kube::{
|
||||
@@ -15,10 +16,7 @@ use log::{debug, info, warn};
|
||||
|
||||
use crate::{
|
||||
modules::okd::crd::nmstate,
|
||||
topology::{
|
||||
HostNetworkConfig, NetworkError, NetworkManager,
|
||||
k8s::{DrainOptions, K8sClient, NodeFile},
|
||||
},
|
||||
topology::{HostNetworkConfig, NetworkError, NetworkManager},
|
||||
};
|
||||
|
||||
/// NetworkManager bond configuration template
|
||||
|
||||
@@ -216,7 +216,15 @@ pub(crate) fn get_health_check_for_backend(
|
||||
SSL::Other(other.to_string())
|
||||
}
|
||||
};
|
||||
Some(HealthCheck::HTTP(path, method, status_code, ssl))
|
||||
|
||||
let port = haproxy_health_check
|
||||
.checkport
|
||||
.content_string()
|
||||
.parse::<u16>()
|
||||
.ok();
|
||||
debug!("Found haproxy healthcheck port {port:?}");
|
||||
|
||||
Some(HealthCheck::HTTP(port, path, method, status_code, ssl))
|
||||
}
|
||||
_ => panic!("Received unsupported health check type {}", uppercase),
|
||||
}
|
||||
@@ -251,7 +259,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
// frontend points to backend
|
||||
let healthcheck = if let Some(health_check) = &service.health_check {
|
||||
match health_check {
|
||||
HealthCheck::HTTP(path, http_method, _http_status_code, ssl) => {
|
||||
HealthCheck::HTTP(port, path, http_method, _http_status_code, ssl) => {
|
||||
let ssl: MaybeString = match ssl {
|
||||
SSL::SSL => "ssl".into(),
|
||||
SSL::SNI => "sslni".into(),
|
||||
@@ -259,14 +267,21 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
SSL::Default => "".into(),
|
||||
SSL::Other(other) => other.as_str().into(),
|
||||
};
|
||||
let path_without_query = path.split_once('?').map_or(path.as_str(), |(p, _)| p);
|
||||
let (port, port_name) = match port {
|
||||
Some(port) => (Some(port.to_string()), port.to_string()),
|
||||
None => (None, "serverport".to_string()),
|
||||
};
|
||||
|
||||
let haproxy_check = HAProxyHealthCheck {
|
||||
name: format!("HTTP_{http_method}_{path}"),
|
||||
name: format!("HTTP_{http_method}_{path_without_query}_{port_name}"),
|
||||
uuid: Uuid::new_v4().to_string(),
|
||||
http_method: http_method.to_string().into(),
|
||||
http_method: http_method.to_string().to_lowercase().into(),
|
||||
health_check_type: "http".to_string(),
|
||||
http_uri: path.clone().into(),
|
||||
interval: "2s".to_string(),
|
||||
ssl,
|
||||
checkport: MaybeString::from(port.map(|p| p.to_string())),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
@@ -305,7 +320,10 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
let mut backend = HAProxyBackend {
|
||||
uuid: Uuid::new_v4().to_string(),
|
||||
enabled: 1,
|
||||
name: format!("backend_{}", service.listening_port),
|
||||
name: format!(
|
||||
"backend_{}",
|
||||
service.listening_port.to_string().replace(':', "_")
|
||||
),
|
||||
algorithm: "roundrobin".to_string(),
|
||||
random_draws: Some(2),
|
||||
stickiness_expire: "30m".to_string(),
|
||||
@@ -337,10 +355,22 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
|
||||
let frontend = Frontend {
|
||||
uuid: uuid::Uuid::new_v4().to_string(),
|
||||
enabled: 1,
|
||||
name: format!("frontend_{}", service.listening_port),
|
||||
name: format!(
|
||||
"frontend_{}",
|
||||
service.listening_port.to_string().replace(':', "_")
|
||||
),
|
||||
bind: service.listening_port.to_string(),
|
||||
mode: "tcp".to_string(), // TODO do not depend on health check here
|
||||
default_backend: Some(backend.uuid.clone()),
|
||||
stickiness_expire: "30m".to_string().into(),
|
||||
stickiness_size: "50k".to_string().into(),
|
||||
stickiness_conn_rate_period: "10s".to_string().into(),
|
||||
stickiness_sess_rate_period: "10s".to_string().into(),
|
||||
stickiness_http_req_rate_period: "10s".to_string().into(),
|
||||
stickiness_http_err_rate_period: "10s".to_string().into(),
|
||||
stickiness_bytes_in_rate_period: "1m".to_string().into(),
|
||||
stickiness_bytes_out_rate_period: "1m".to_string().into(),
|
||||
ssl_hsts_max_age: 15768000,
|
||||
..Default::default()
|
||||
};
|
||||
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, info, trace};
|
||||
use log::{debug, info};
|
||||
use serde::Serialize;
|
||||
use std::path::PathBuf;
|
||||
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_macros::hurl;
|
||||
use log::{debug, info, trace, warn};
|
||||
use non_blank_string_rs::NonBlankString;
|
||||
@@ -14,7 +15,7 @@ use crate::{
|
||||
helm::chart::{HelmChartScore, HelmRepository},
|
||||
},
|
||||
score::Score,
|
||||
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress, k8s::K8sClient},
|
||||
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress},
|
||||
};
|
||||
use harmony_types::id::Id;
|
||||
|
||||
|
||||
@@ -1,6 +1,5 @@
|
||||
use std::fs::{self};
|
||||
use std::path::{Path, PathBuf};
|
||||
use std::process;
|
||||
use std::path::PathBuf;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -65,6 +64,7 @@ pub struct RustWebapp {
|
||||
///
|
||||
/// This is the place to put the public host name if this is a public facing webapp.
|
||||
pub dns: String,
|
||||
pub version: String,
|
||||
}
|
||||
|
||||
impl Application for RustWebapp {
|
||||
@@ -465,6 +465,7 @@ impl RustWebapp {
|
||||
|
||||
let app_name = &self.name;
|
||||
let service_port = self.service_port;
|
||||
let version = &self.version;
|
||||
// Create Chart.yaml
|
||||
let chart_yaml = format!(
|
||||
r#"
|
||||
@@ -472,7 +473,7 @@ apiVersion: v2
|
||||
name: {chart_name}
|
||||
description: A Helm chart for the {app_name} web application.
|
||||
type: application
|
||||
version: 0.2.1
|
||||
version: {version}
|
||||
appVersion: "{image_tag}"
|
||||
"#,
|
||||
);
|
||||
|
||||
@@ -1,8 +1,9 @@
|
||||
use std::sync::Arc;
|
||||
|
||||
use harmony_k8s::K8sClient;
|
||||
use log::{debug, info};
|
||||
|
||||
use crate::{interpret::InterpretError, topology::k8s::K8sClient};
|
||||
use crate::interpret::InterpretError;
|
||||
|
||||
#[derive(Clone, Debug, PartialEq, Eq)]
|
||||
pub enum ArgoScope {
|
||||
|
||||
@@ -44,6 +44,12 @@ pub struct BrocadeSwitchAuth {
|
||||
pub password: String,
|
||||
}
|
||||
|
||||
impl BrocadeSwitchAuth {
|
||||
pub fn user_pass(username: String, password: String) -> Self {
|
||||
Self { username, password }
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Secret, Clone, Debug, JsonSchema, Serialize, Deserialize)]
|
||||
pub struct BrocadeSnmpAuth {
|
||||
pub username: String,
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
use harmony_k8s::K8sClient;
|
||||
use std::sync::Arc;
|
||||
|
||||
use async_trait::async_trait;
|
||||
@@ -11,7 +12,7 @@ use crate::{
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
topology::{K8sclient, Topology},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
|
||||
@@ -54,6 +54,12 @@ pub enum HarmonyDiscoveryStrategy {
|
||||
SUBNET { cidr: cidr::Ipv4Cidr, port: u16 },
|
||||
}
|
||||
|
||||
impl Default for HarmonyDiscoveryStrategy {
|
||||
fn default() -> Self {
|
||||
HarmonyDiscoveryStrategy::MDNS
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
|
||||
async fn execute(
|
||||
|
||||
@@ -3,7 +3,8 @@ use std::sync::Arc;
|
||||
use async_trait::async_trait;
|
||||
use log::warn;
|
||||
|
||||
use crate::topology::{FailoverTopology, K8sclient, k8s::K8sClient};
|
||||
use crate::topology::{FailoverTopology, K8sclient};
|
||||
use harmony_k8s::K8sClient;
|
||||
|
||||
#[async_trait]
|
||||
impl<T: K8sclient> K8sclient for FailoverTopology<T> {
|
||||
|
||||
@@ -1,5 +1,5 @@
|
||||
use async_trait::async_trait;
|
||||
use k8s_openapi::NamespaceResourceScope;
|
||||
use k8s_openapi::ResourceScope;
|
||||
use kube::Resource;
|
||||
use log::info;
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
@@ -29,7 +29,7 @@ impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
|
||||
}
|
||||
|
||||
impl<
|
||||
K: Resource<Scope = NamespaceResourceScope>
|
||||
K: Resource<Scope: ResourceScope>
|
||||
+ std::fmt::Debug
|
||||
+ Sync
|
||||
+ DeserializeOwned
|
||||
@@ -61,7 +61,7 @@ pub struct K8sResourceInterpret<K: Resource + std::fmt::Debug + Sync + Send> {
|
||||
|
||||
#[async_trait]
|
||||
impl<
|
||||
K: Resource<Scope = NamespaceResourceScope>
|
||||
K: Resource<Scope: ResourceScope>
|
||||
+ Clone
|
||||
+ std::fmt::Debug
|
||||
+ DeserializeOwned
|
||||
@@ -109,7 +109,7 @@ where
|
||||
topology
|
||||
.k8s_client()
|
||||
.await
|
||||
.expect("Environment should provide enough information to instanciate a client")
|
||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?
|
||||
.apply_many(&self.score.resource, self.score.namespace.as_deref())
|
||||
.await?;
|
||||
|
||||
|
||||
@@ -15,10 +15,13 @@ pub mod load_balancer;
|
||||
pub mod monitoring;
|
||||
pub mod nats;
|
||||
pub mod network;
|
||||
pub mod node_health;
|
||||
pub mod okd;
|
||||
pub mod openbao;
|
||||
pub mod opnsense;
|
||||
pub mod postgresql;
|
||||
pub mod prometheus;
|
||||
pub mod storage;
|
||||
pub mod tenant;
|
||||
pub mod tftp;
|
||||
pub mod zitadel;
|
||||
|
||||
@@ -6,7 +6,7 @@ use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::{
|
||||
interpret::{InterpretError, Outcome},
|
||||
interpret::InterpretError,
|
||||
inventory::Inventory,
|
||||
modules::{
|
||||
monitoring::{
|
||||
@@ -17,10 +17,10 @@ use crate::{
|
||||
topology::{
|
||||
K8sclient, Topology,
|
||||
installable::Installable,
|
||||
k8s::K8sClient,
|
||||
oberservability::monitoring::{AlertReceiver, AlertSender, ScrapeTarget},
|
||||
},
|
||||
};
|
||||
use harmony_k8s::K8sClient;
|
||||
|
||||
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
|
||||
#[kube(
|
||||
|
||||
@@ -4,10 +4,8 @@ use kube::CustomResource;
|
||||
use schemars::JsonSchema;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::topology::{
|
||||
k8s::K8sClient,
|
||||
oberservability::monitoring::{AlertReceiver, AlertSender},
|
||||
};
|
||||
use crate::topology::oberservability::monitoring::{AlertReceiver, AlertSender};
|
||||
use harmony_k8s::K8sClient;
|
||||
|
||||
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
|
||||
#[kube(
|
||||
|
||||
@@ -11,8 +11,9 @@ use crate::{
|
||||
inventory::Inventory,
|
||||
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
|
||||
score::Score,
|
||||
topology::{HelmCommand, K8sclient, MultiTargetTopology, Topology, k8s::K8sClient},
|
||||
topology::{HelmCommand, K8sclient, MultiTargetTopology, Topology},
|
||||
};
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_types::id::Id;
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
||||
@@ -1,9 +1,7 @@
|
||||
use std::{collections::BTreeMap, sync::Arc};
|
||||
|
||||
use crate::{
|
||||
interpret::{InterpretError, Outcome},
|
||||
topology::k8s::K8sClient,
|
||||
};
|
||||
use crate::interpret::{InterpretError, Outcome};
|
||||
use harmony_k8s::K8sClient;
|
||||
use k8s_openapi::api::core::v1::ConfigMap;
|
||||
use kube::api::ObjectMeta;
|
||||
|
||||
|
||||
@@ -1,3 +1,7 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use tokio_retry::{Retry, strategy::ExponentialBackoff};
|
||||
|
||||
use crate::modules::{
|
||||
cert_manager::{
|
||||
capability::{CertificateManagement, CertificateManagementConfig},
|
||||
@@ -69,9 +73,28 @@ where
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
self.topology
|
||||
let strategy = ExponentialBackoff::from_millis(250)
|
||||
.factor(2)
|
||||
.max_delay(Duration::from_millis(1000))
|
||||
.take(10);
|
||||
|
||||
Retry::spawn(strategy, || async {
|
||||
log::debug!("Attempting CA cert fetch");
|
||||
|
||||
let res = self
|
||||
.topology
|
||||
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
|
||||
.await;
|
||||
|
||||
match res {
|
||||
Ok(cert) => Ok(cert),
|
||||
Err(e) => {
|
||||
log::warn!("Retryable error: {:?}", e);
|
||||
Err(e)
|
||||
}
|
||||
}
|
||||
})
|
||||
.await
|
||||
.map_err(|e| e.to_string())
|
||||
.map_err(|e| format!("Retries exhausted: {:?}", e))
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,6 +1,7 @@
|
||||
use std::{collections::BTreeMap, str::FromStr};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::KubernetesDistribution;
|
||||
use harmony_macros::hurl;
|
||||
use harmony_secret::{Secret, SecretManager};
|
||||
use harmony_types::id::Id;
|
||||
@@ -25,7 +26,7 @@ use crate::{
|
||||
},
|
||||
},
|
||||
score::Score,
|
||||
topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology},
|
||||
topology::{HelmCommand, K8sclient, TlsRouter, Topology},
|
||||
};
|
||||
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
|
||||
@@ -4,7 +4,20 @@ use log::warn;
|
||||
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
|
||||
|
||||
#[async_trait]
|
||||
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
|
||||
impl<T: TlsRouter + Send> TlsRouter for FailoverTopology<T> {
|
||||
async fn get_public_domain(&self) -> Result<String, String> {
|
||||
/*
|
||||
let primary_domain = self
|
||||
.primary
|
||||
.get_public_domain()
|
||||
.await
|
||||
.map_err(|e| e.to_string())?;
|
||||
|
||||
Ok(primary_domain)
|
||||
*/
|
||||
todo!()
|
||||
}
|
||||
|
||||
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
|
||||
todo!()
|
||||
}
|
||||
|
||||
279
harmony/src/modules/node_health/mod.rs
Normal file
279
harmony/src/modules/node_health/mod.rs
Normal file
@@ -0,0 +1,279 @@
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::api::scheduling::v1::PriorityClass;
|
||||
use k8s_openapi::api::{
|
||||
apps::v1::{DaemonSet, DaemonSetSpec},
|
||||
core::v1::{
|
||||
Container, ContainerPort, EnvVar, EnvVarSource, Namespace, ObjectFieldSelector, PodSpec,
|
||||
PodTemplateSpec, ResourceRequirements, ServiceAccount, Toleration,
|
||||
},
|
||||
rbac::v1::{ClusterRole, ClusterRoleBinding, PolicyRule, Role, RoleBinding, RoleRef, Subject},
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use kube::api::ObjectMeta;
|
||||
use serde::Serialize;
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::k8s::resource::K8sResourceScore,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology},
|
||||
};
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
pub struct NodeHealthScore {}
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for NodeHealthScore {
|
||||
fn name(&self) -> String {
|
||||
format!("NodeHealthScore")
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(NodeHealthInterpret {})
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
pub struct NodeHealthInterpret {}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let namespace_name = "harmony-node-healthcheck".to_string();
|
||||
|
||||
// Namespace
|
||||
let mut labels = BTreeMap::new();
|
||||
labels.insert("name".to_string(), namespace_name.clone());
|
||||
|
||||
let namespace = Namespace {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(namespace_name.clone()),
|
||||
labels: Some(labels),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
..Namespace::default()
|
||||
};
|
||||
|
||||
// ServiceAccount
|
||||
let service_account_name = "node-healthcheck-sa".to_string();
|
||||
let service_account = ServiceAccount {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(service_account_name.clone()),
|
||||
namespace: Some(namespace_name.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
..ServiceAccount::default()
|
||||
};
|
||||
|
||||
// ClusterRole
|
||||
let cluster_role = ClusterRole {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("node-healthcheck-role".to_string()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
rules: Some(vec![PolicyRule {
|
||||
api_groups: Some(vec!["".to_string()]),
|
||||
resources: Some(vec!["nodes".to_string()]),
|
||||
verbs: vec!["get".to_string(), "list".to_string()],
|
||||
..PolicyRule::default()
|
||||
}]),
|
||||
..ClusterRole::default()
|
||||
};
|
||||
|
||||
// Role
|
||||
let role = Role {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("allow-hostnetwork-scc".to_string()),
|
||||
namespace: Some(namespace_name.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
rules: Some(vec![PolicyRule {
|
||||
api_groups: Some(vec!["security.openshift.io".to_string()]),
|
||||
resources: Some(vec!["securitycontextconstraints".to_string()]),
|
||||
resource_names: Some(vec!["hostnetwork".to_string()]),
|
||||
verbs: vec!["use".to_string()],
|
||||
..PolicyRule::default()
|
||||
}]),
|
||||
..Role::default()
|
||||
};
|
||||
|
||||
// RoleBinding
|
||||
let role_binding = RoleBinding {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("node-status-querier-scc-binding".to_string()),
|
||||
namespace: Some(namespace_name.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
subjects: Some(vec![Subject {
|
||||
kind: "ServiceAccount".to_string(),
|
||||
name: service_account_name.clone(),
|
||||
namespace: Some(namespace_name.clone()),
|
||||
..Subject::default()
|
||||
}]),
|
||||
role_ref: RoleRef {
|
||||
api_group: "rbac.authorization.k8s.io".to_string(),
|
||||
kind: "Role".to_string(),
|
||||
name: "allow-hostnetwork-scc".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
// ClusterRoleBinding
|
||||
let cluster_role_binding = ClusterRoleBinding {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("read-nodes-binding".to_string()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
subjects: Some(vec![Subject {
|
||||
kind: "ServiceAccount".to_string(),
|
||||
name: service_account_name.clone(),
|
||||
namespace: Some(namespace_name.clone()),
|
||||
..Subject::default()
|
||||
}]),
|
||||
role_ref: RoleRef {
|
||||
api_group: "rbac.authorization.k8s.io".to_string(),
|
||||
kind: "ClusterRole".to_string(),
|
||||
name: "node-healthcheck-role".to_string(),
|
||||
},
|
||||
};
|
||||
|
||||
// PriorityClass
|
||||
let priority_class_name = "node-healthcheck-critical".to_string();
|
||||
let priority_class = PriorityClass {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(priority_class_name.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
value: 1000000000,
|
||||
global_default: Some(false),
|
||||
preemption_policy: Some("PreemptLowerPriority".to_string()),
|
||||
description: Some("Highest priority for node health check daemonset - can preempt lower priority pods".to_string()),
|
||||
};
|
||||
|
||||
// DaemonSet
|
||||
let mut daemonset_labels = BTreeMap::new();
|
||||
daemonset_labels.insert("app".to_string(), "node-healthcheck".to_string());
|
||||
|
||||
let daemon_set = DaemonSet {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("node-healthcheck".to_string()),
|
||||
namespace: Some(namespace_name.clone()),
|
||||
labels: Some(daemonset_labels.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
spec: Some(DaemonSetSpec {
|
||||
selector: LabelSelector {
|
||||
match_labels: Some(daemonset_labels.clone()),
|
||||
..LabelSelector::default()
|
||||
},
|
||||
template: PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
labels: Some(daemonset_labels),
|
||||
..ObjectMeta::default()
|
||||
}),
|
||||
spec: Some(PodSpec {
|
||||
service_account_name: Some(service_account_name.clone()),
|
||||
host_network: Some(true),
|
||||
priority_class_name: Some(priority_class_name),
|
||||
tolerations: Some(vec![Toleration {
|
||||
operator: Some("Exists".to_string()),
|
||||
..Toleration::default()
|
||||
}]),
|
||||
containers: vec![Container {
|
||||
name: "checker".to_string(),
|
||||
image: Some(
|
||||
"hub.nationtech.io/harmony/harmony-node-readiness-endpoint:latest"
|
||||
.to_string(),
|
||||
),
|
||||
env: Some(vec![EnvVar {
|
||||
name: "NODE_NAME".to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
field_ref: Some(ObjectFieldSelector {
|
||||
api_version: Some("v1".to_string()),
|
||||
field_path: "spec.nodeName".to_string(),
|
||||
..ObjectFieldSelector::default()
|
||||
}),
|
||||
..EnvVarSource::default()
|
||||
}),
|
||||
..EnvVar::default()
|
||||
}]),
|
||||
ports: Some(vec![ContainerPort {
|
||||
container_port: 25001,
|
||||
host_port: Some(25001),
|
||||
name: Some("health-port".to_string()),
|
||||
..ContainerPort::default()
|
||||
}]),
|
||||
resources: Some(ResourceRequirements {
|
||||
requests: Some({
|
||||
let mut requests = BTreeMap::new();
|
||||
requests.insert("cpu".to_string(), Quantity("10m".to_string()));
|
||||
requests
|
||||
.insert("memory".to_string(), Quantity("50Mi".to_string()));
|
||||
requests
|
||||
}),
|
||||
..ResourceRequirements::default()
|
||||
}),
|
||||
..Container::default()
|
||||
}],
|
||||
..PodSpec::default()
|
||||
}),
|
||||
},
|
||||
..DaemonSetSpec::default()
|
||||
}),
|
||||
..DaemonSet::default()
|
||||
};
|
||||
|
||||
K8sResourceScore::single(namespace, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(service_account, Some(namespace_name.clone()))
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(cluster_role, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(role, Some(namespace_name.clone()))
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(role_binding, Some(namespace_name.clone()))
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(cluster_role_binding, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(priority_class, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
K8sResourceScore::single(daemon_set, Some(namespace_name.clone()))
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
Ok(Outcome::success(
|
||||
"Harmony node health successfully deployed".to_string(),
|
||||
))
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("NodeHealth")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
@@ -41,6 +41,7 @@ impl OKDBootstrapLoadBalancerScore {
|
||||
backend_servers: Self::topology_to_backend_server(topology, 6443),
|
||||
listening_port: SocketAddr::new(private_ip, 6443),
|
||||
health_check: Some(HealthCheck::HTTP(
|
||||
None,
|
||||
"/readyz".to_string(),
|
||||
HttpMethod::GET,
|
||||
HttpStatusCode::Success2xx,
|
||||
|
||||
@@ -8,7 +8,7 @@ use crate::{
|
||||
score::Score,
|
||||
topology::{
|
||||
BackendServer, HAClusterTopology, HealthCheck, HttpMethod, HttpStatusCode, LoadBalancer,
|
||||
LoadBalancerService, SSL, Topology,
|
||||
LoadBalancerService, LogicalHost, Router, SSL, Topology,
|
||||
},
|
||||
};
|
||||
|
||||
@@ -23,32 +23,72 @@ pub struct OKDLoadBalancerScore {
|
||||
load_balancer_score: LoadBalancerScore,
|
||||
}
|
||||
|
||||
/// OKD Load Balancer Score configuration
|
||||
///
|
||||
/// This module configures the load balancer for OKD (OpenShift Kubernetes Distribution)
|
||||
/// bare metal installations.
|
||||
///
|
||||
/// # Backend Server Configuration
|
||||
///
|
||||
/// For ports 80 and 443 (ingress traffic), the load balancer includes both control plane
|
||||
/// and worker nodes in the backend pool. This is consistent with OKD's requirement that
|
||||
/// ingress traffic should be load balanced across all nodes that may run ingress router pods.
|
||||
///
|
||||
/// For ports 22623 (Ignition API) and 6443 (Kubernetes API), only control plane nodes
|
||||
/// are included as backends, as these services are control plane specific.
|
||||
///
|
||||
/// # References
|
||||
///
|
||||
/// - [OKD Bare Metal Installation - External Load Balancer Configuration]
|
||||
/// (<https://docs.okd.io/latest/installing/installing_bare_metal/ipi/ipi-install-installation-workflow.html#nw-osp-configuring-external-load-balancer_ipi-install-installation-workflow>)
|
||||
///
|
||||
/// # Example
|
||||
///
|
||||
/// ```ignore
|
||||
/// use harmony::topology::HAClusterTopology;
|
||||
/// use harmony::modules::okd::OKDLoadBalancerScore;
|
||||
///
|
||||
/// let topology: HAClusterTopology = /* get topology from your infrastructure */;
|
||||
/// let score = OKDLoadBalancerScore::new(&topology);
|
||||
/// ```
|
||||
impl OKDLoadBalancerScore {
|
||||
pub fn new(topology: &HAClusterTopology) -> Self {
|
||||
let public_ip = topology.router.get_gateway();
|
||||
let public_services = vec![
|
||||
LoadBalancerService {
|
||||
backend_servers: Self::control_plane_to_backend_server(topology, 80),
|
||||
backend_servers: Self::nodes_to_backend_server(topology, 80),
|
||||
listening_port: SocketAddr::new(public_ip, 80),
|
||||
health_check: Some(HealthCheck::TCP(None)),
|
||||
health_check: None,
|
||||
},
|
||||
LoadBalancerService {
|
||||
backend_servers: Self::control_plane_to_backend_server(topology, 443),
|
||||
backend_servers: Self::nodes_to_backend_server(topology, 443),
|
||||
listening_port: SocketAddr::new(public_ip, 443),
|
||||
health_check: Some(HealthCheck::TCP(None)),
|
||||
health_check: None,
|
||||
},
|
||||
];
|
||||
|
||||
let private_services = vec![
|
||||
LoadBalancerService {
|
||||
backend_servers: Self::control_plane_to_backend_server(topology, 80),
|
||||
backend_servers: Self::nodes_to_backend_server(topology, 80),
|
||||
listening_port: SocketAddr::new(public_ip, 80),
|
||||
health_check: Some(HealthCheck::TCP(None)),
|
||||
health_check: Some(HealthCheck::HTTP(
|
||||
Some(25001),
|
||||
"/health?check=okd_router_1936,node_ready".to_string(),
|
||||
HttpMethod::GET,
|
||||
HttpStatusCode::Success2xx,
|
||||
SSL::Default,
|
||||
)),
|
||||
},
|
||||
LoadBalancerService {
|
||||
backend_servers: Self::control_plane_to_backend_server(topology, 443),
|
||||
backend_servers: Self::nodes_to_backend_server(topology, 443),
|
||||
listening_port: SocketAddr::new(public_ip, 443),
|
||||
health_check: Some(HealthCheck::TCP(None)),
|
||||
health_check: Some(HealthCheck::HTTP(
|
||||
Some(25001),
|
||||
"/health?check=okd_router_1936,node_ready".to_string(),
|
||||
HttpMethod::GET,
|
||||
HttpStatusCode::Success2xx,
|
||||
SSL::Default,
|
||||
)),
|
||||
},
|
||||
LoadBalancerService {
|
||||
backend_servers: Self::control_plane_to_backend_server(topology, 22623),
|
||||
@@ -59,6 +99,7 @@ impl OKDLoadBalancerScore {
|
||||
backend_servers: Self::control_plane_to_backend_server(topology, 6443),
|
||||
listening_port: SocketAddr::new(public_ip, 6443),
|
||||
health_check: Some(HealthCheck::HTTP(
|
||||
None,
|
||||
"/readyz".to_string(),
|
||||
HttpMethod::GET,
|
||||
HttpStatusCode::Success2xx,
|
||||
@@ -74,6 +115,11 @@ impl OKDLoadBalancerScore {
|
||||
}
|
||||
}
|
||||
|
||||
/// Creates backend servers list for control plane nodes only
|
||||
///
|
||||
/// Use this for control plane-specific services like:
|
||||
/// - Port 22623: Ignition API (machine configuration during bootstrap)
|
||||
/// - Port 6443: Kubernetes API server
|
||||
fn control_plane_to_backend_server(
|
||||
topology: &HAClusterTopology,
|
||||
port: u16,
|
||||
@@ -87,6 +133,194 @@ impl OKDLoadBalancerScore {
|
||||
})
|
||||
.collect()
|
||||
}
|
||||
|
||||
/// Creates backend servers list for all nodes (control plane + workers)
|
||||
///
|
||||
/// Use this for ingress traffic that should be distributed across all nodes:
|
||||
/// - Port 80: HTTP ingress traffic
|
||||
/// - Port 443: HTTPS ingress traffic
|
||||
///
|
||||
/// In OKD, ingress router pods can run on any node, so both control plane
|
||||
/// and worker nodes should be included in the load balancer backend pool.
|
||||
fn nodes_to_backend_server(topology: &HAClusterTopology, port: u16) -> Vec<BackendServer> {
|
||||
let mut nodes = Vec::new();
|
||||
for cp in &topology.control_plane {
|
||||
nodes.push(BackendServer {
|
||||
address: cp.ip.to_string(),
|
||||
port,
|
||||
});
|
||||
}
|
||||
for worker in &topology.workers {
|
||||
nodes.push(BackendServer {
|
||||
address: worker.ip.to_string(),
|
||||
port,
|
||||
});
|
||||
}
|
||||
nodes
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use std::sync::{Arc, OnceLock};
|
||||
|
||||
use super::*;
|
||||
use crate::topology::DummyInfra;
|
||||
use harmony_macros::ip;
|
||||
use harmony_types::net::IpAddress;
|
||||
|
||||
fn create_test_topology() -> HAClusterTopology {
|
||||
let router = Arc::new(DummyRouter {
|
||||
gateway: ip!("192.168.1.1"),
|
||||
});
|
||||
|
||||
HAClusterTopology {
|
||||
domain_name: "test.example.com".to_string(),
|
||||
router,
|
||||
load_balancer: Arc::new(DummyInfra),
|
||||
firewall: Arc::new(DummyInfra),
|
||||
dhcp_server: Arc::new(DummyInfra),
|
||||
tftp_server: Arc::new(DummyInfra),
|
||||
http_server: Arc::new(DummyInfra),
|
||||
dns_server: Arc::new(DummyInfra),
|
||||
node_exporter: Arc::new(DummyInfra),
|
||||
switch_client: Arc::new(DummyInfra),
|
||||
bootstrap_host: LogicalHost {
|
||||
ip: ip!("192.168.1.100"),
|
||||
name: "bootstrap".to_string(),
|
||||
},
|
||||
control_plane: vec![
|
||||
LogicalHost {
|
||||
ip: ip!("192.168.1.10"),
|
||||
name: "control-plane-0".to_string(),
|
||||
},
|
||||
LogicalHost {
|
||||
ip: ip!("192.168.1.11"),
|
||||
name: "control-plane-1".to_string(),
|
||||
},
|
||||
LogicalHost {
|
||||
ip: ip!("192.168.1.12"),
|
||||
name: "control-plane-2".to_string(),
|
||||
},
|
||||
],
|
||||
workers: vec![
|
||||
LogicalHost {
|
||||
ip: ip!("192.168.1.20"),
|
||||
name: "worker-0".to_string(),
|
||||
},
|
||||
LogicalHost {
|
||||
ip: ip!("192.168.1.21"),
|
||||
name: "worker-1".to_string(),
|
||||
},
|
||||
],
|
||||
kubeconfig: None,
|
||||
network_manager: OnceLock::new(),
|
||||
}
|
||||
}
|
||||
|
||||
struct DummyRouter {
|
||||
gateway: IpAddress,
|
||||
}
|
||||
|
||||
impl Router for DummyRouter {
|
||||
fn get_gateway(&self) -> IpAddress {
|
||||
self.gateway
|
||||
}
|
||||
fn get_cidr(&self) -> cidr::Ipv4Cidr {
|
||||
let ipv4 = match self.gateway {
|
||||
IpAddress::V4(ip) => ip,
|
||||
IpAddress::V6(_) => panic!("IPv6 not supported"),
|
||||
};
|
||||
cidr::Ipv4Cidr::new(ipv4, 24).unwrap()
|
||||
}
|
||||
fn get_host(&self) -> LogicalHost {
|
||||
LogicalHost {
|
||||
ip: self.gateway,
|
||||
name: "router".to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_nodes_to_backend_server_includes_control_plane_and_workers() {
|
||||
let topology = create_test_topology();
|
||||
|
||||
let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 80);
|
||||
|
||||
assert_eq!(backend_servers.len(), 5);
|
||||
|
||||
let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
|
||||
assert!(addresses.contains(&"192.168.1.10"));
|
||||
assert!(addresses.contains(&"192.168.1.11"));
|
||||
assert!(addresses.contains(&"192.168.1.12"));
|
||||
assert!(addresses.contains(&"192.168.1.20"));
|
||||
assert!(addresses.contains(&"192.168.1.21"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_control_plane_to_backend_server_only_includes_control_plane() {
|
||||
let topology = create_test_topology();
|
||||
|
||||
let backend_servers = OKDLoadBalancerScore::control_plane_to_backend_server(&topology, 80);
|
||||
|
||||
assert_eq!(backend_servers.len(), 3);
|
||||
|
||||
let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
|
||||
assert!(addresses.contains(&"192.168.1.10"));
|
||||
assert!(addresses.contains(&"192.168.1.11"));
|
||||
assert!(addresses.contains(&"192.168.1.12"));
|
||||
assert!(!addresses.contains(&"192.168.1.20"));
|
||||
assert!(!addresses.contains(&"192.168.1.21"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_public_services_include_all_nodes_on_port_80_and_443() {
|
||||
let topology = create_test_topology();
|
||||
let score = OKDLoadBalancerScore::new(&topology);
|
||||
|
||||
let public_service_80 = score
|
||||
.load_balancer_score
|
||||
.public_services
|
||||
.iter()
|
||||
.find(|s| s.listening_port.port() == 80)
|
||||
.expect("Public service on port 80 not found");
|
||||
|
||||
let public_service_443 = score
|
||||
.load_balancer_score
|
||||
.public_services
|
||||
.iter()
|
||||
.find(|s| s.listening_port.port() == 443)
|
||||
.expect("Public service on port 443 not found");
|
||||
|
||||
assert_eq!(public_service_80.backend_servers.len(), 5);
|
||||
assert_eq!(public_service_443.backend_servers.len(), 5);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_private_service_port_22623_only_control_plane() {
|
||||
let topology = create_test_topology();
|
||||
let score = OKDLoadBalancerScore::new(&topology);
|
||||
|
||||
let private_service_22623 = score
|
||||
.load_balancer_score
|
||||
.private_services
|
||||
.iter()
|
||||
.find(|s| s.listening_port.port() == 22623)
|
||||
.expect("Private service on port 22623 not found");
|
||||
|
||||
assert_eq!(private_service_22623.backend_servers.len(), 3);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_all_backend_servers_have_correct_port() {
|
||||
let topology = create_test_topology();
|
||||
|
||||
let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 443);
|
||||
|
||||
for server in backend_servers {
|
||||
assert_eq!(server.port, 443);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + LoadBalancer> Score<T> for OKDLoadBalancerScore {
|
||||
|
||||
88
harmony/src/modules/openbao/mod.rs
Normal file
88
harmony/src/modules/openbao/mod.rs
Normal file
@@ -0,0 +1,88 @@
|
||||
use std::str::FromStr;
|
||||
|
||||
use harmony_macros::hurl;
|
||||
use non_blank_string_rs::NonBlankString;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
interpret::Interpret,
|
||||
modules::helm::chart::{HelmChartScore, HelmRepository},
|
||||
score::Score,
|
||||
topology::{HelmCommand, K8sclient, Topology},
|
||||
};
|
||||
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct OpenbaoScore {
|
||||
/// Host used for external access (ingress)
|
||||
pub host: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient + HelmCommand> Score<T> for OpenbaoScore {
|
||||
fn name(&self) -> String {
|
||||
"OpenbaoScore".to_string()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
// TODO exec pod commands to initialize secret store if not already done
|
||||
let host = &self.host;
|
||||
|
||||
let values_yaml = Some(format!(
|
||||
r#"global:
|
||||
openshift: true
|
||||
server:
|
||||
standalone:
|
||||
enabled: true
|
||||
config: |
|
||||
ui = true
|
||||
|
||||
listener "tcp" {{
|
||||
tls_disable = true
|
||||
address = "[::]:8200"
|
||||
cluster_address = "[::]:8201"
|
||||
}}
|
||||
|
||||
storage "file" {{
|
||||
path = "/openbao/data"
|
||||
}}
|
||||
|
||||
service:
|
||||
enabled: true
|
||||
|
||||
ingress:
|
||||
enabled: true
|
||||
hosts:
|
||||
- host: {host}
|
||||
dataStorage:
|
||||
enabled: true
|
||||
size: 10Gi
|
||||
storageClass: null
|
||||
accessMode: ReadWriteOnce
|
||||
|
||||
auditStorage:
|
||||
enabled: true
|
||||
size: 10Gi
|
||||
storageClass: null
|
||||
accessMode: ReadWriteOnce
|
||||
ui:
|
||||
enabled: true"#
|
||||
));
|
||||
|
||||
HelmChartScore {
|
||||
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
|
||||
release_name: NonBlankString::from_str("openbao").unwrap(),
|
||||
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
|
||||
chart_version: None,
|
||||
values_overrides: None,
|
||||
values_yaml,
|
||||
create_namespace: true,
|
||||
install_only: false,
|
||||
repository: Some(HelmRepository::new(
|
||||
"openbao".to_string(),
|
||||
hurl!("https://openbao.github.io/openbao-helm"),
|
||||
true,
|
||||
)),
|
||||
}
|
||||
.create_interpret()
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,7 @@ pub struct PostgreSQLConfig {
|
||||
/// settings incompatible with the default CNPG behavior.
|
||||
pub namespace: String,
|
||||
}
|
||||
|
||||
impl PostgreSQLConfig {
|
||||
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
|
||||
let mut new = self.clone();
|
||||
|
||||
@@ -1,3 +1,6 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
@@ -13,9 +16,18 @@ use serde::{Deserialize, Serialize};
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ClusterSpec {
|
||||
pub instances: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_name: Option<String>,
|
||||
pub storage: Storage,
|
||||
pub bootstrap: Bootstrap,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub external_clusters: Option<Vec<ExternalCluster>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub replica: Option<ReplicaSpec>,
|
||||
/// This must be set to None if you want cnpg to generate a superuser secret
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub superuser_secret: Option<BTreeMap<String, String>>,
|
||||
pub enable_superuser_access: bool,
|
||||
}
|
||||
|
||||
impl Default for Cluster {
|
||||
@@ -34,6 +46,10 @@ impl Default for ClusterSpec {
|
||||
image_name: None,
|
||||
storage: Storage::default(),
|
||||
bootstrap: Bootstrap::default(),
|
||||
external_clusters: None,
|
||||
replica: None,
|
||||
superuser_secret: None,
|
||||
enable_superuser_access: false,
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -47,7 +63,13 @@ pub struct Storage {
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Bootstrap {
|
||||
pub initdb: Initdb,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub initdb: Option<Initdb>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub recovery: Option<Recovery>,
|
||||
#[serde(rename = "pg_basebackup")]
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pg_basebackup: Option<PgBaseBackup>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
@@ -56,3 +78,50 @@ pub struct Initdb {
|
||||
pub database: String,
|
||||
pub owner: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct Recovery {
|
||||
pub source: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
pub struct PgBaseBackup {
|
||||
pub source: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ExternalCluster {
|
||||
pub name: String,
|
||||
pub connection_parameters: HashMap<String, String>,
|
||||
pub ssl_key: Option<SecretKeySelector>,
|
||||
pub ssl_cert: Option<SecretKeySelector>,
|
||||
pub ssl_root_cert: Option<SecretKeySelector>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ConnectionParameters {
|
||||
pub host: String,
|
||||
pub user: String,
|
||||
pub dbname: String,
|
||||
pub sslmode: String,
|
||||
pub sslnegotiation: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ReplicaSpec {
|
||||
pub enabled: bool,
|
||||
pub source: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub primary: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct SecretKeySelector {
|
||||
pub name: String,
|
||||
pub key: String,
|
||||
}
|
||||
|
||||
@@ -3,6 +3,8 @@ use log::debug;
|
||||
use log::info;
|
||||
use std::collections::HashMap;
|
||||
|
||||
use crate::interpret::InterpretError;
|
||||
use crate::topology::TlsRoute;
|
||||
use crate::topology::TlsRouter;
|
||||
use crate::{
|
||||
modules::postgresql::capability::{
|
||||
@@ -49,8 +51,18 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
|
||||
// TODO we should be getting the public endpoint for a service by calling a method on
|
||||
// TlsRouter capability.
|
||||
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
|
||||
let host = format!(
|
||||
"{}.{}.{}",
|
||||
config.cluster_name,
|
||||
config.namespace,
|
||||
self.primary
|
||||
.get_public_domain()
|
||||
.await
|
||||
.expect("failed to retrieve public domain")
|
||||
.to_string()
|
||||
);
|
||||
let endpoint = PostgreSQLEndpoint {
|
||||
host: "postgrestest.sto1.nationtech.io".to_string(),
|
||||
host,
|
||||
port: self.primary.get_router_port().await,
|
||||
};
|
||||
|
||||
@@ -59,6 +71,46 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
|
||||
endpoint.host, endpoint.port
|
||||
);
|
||||
|
||||
info!("installing primary postgres route");
|
||||
let prim_hostname = format!(
|
||||
"{}.{}.{}",
|
||||
config.cluster_name,
|
||||
config.namespace,
|
||||
self.primary.get_public_domain().await?
|
||||
);
|
||||
let rw_backend = format!("{}-rw", config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
hostname: prim_hostname,
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
namespace: config.namespace.clone(),
|
||||
};
|
||||
// Expose RW publicly via TLS passthrough
|
||||
self.primary
|
||||
.install_route(tls_route.clone())
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
|
||||
info!("installing replica postgres route");
|
||||
let rep_hostname = format!(
|
||||
"{}.{}.{}",
|
||||
config.cluster_name,
|
||||
config.namespace,
|
||||
self.replica.get_public_domain().await?
|
||||
);
|
||||
let rw_backend = format!("{}-rw", config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
hostname: rep_hostname,
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
namespace: config.namespace.clone(),
|
||||
};
|
||||
|
||||
// Expose RW publicly via TLS passthrough
|
||||
self.replica
|
||||
.install_route(tls_route.clone())
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
info!("Configuring replica connection parameters and bootstrap");
|
||||
|
||||
let mut connection_parameters = HashMap::new();
|
||||
|
||||
@@ -20,7 +20,7 @@ use crate::topology::{K8sclient, Topology};
|
||||
/// # Usage
|
||||
/// ```
|
||||
/// use harmony::modules::postgresql::CloudNativePgOperatorScore;
|
||||
/// let score = CloudNativePgOperatorScore::default();
|
||||
/// let score = CloudNativePgOperatorScore::default_openshift();
|
||||
/// ```
|
||||
///
|
||||
/// Or, you can take control of most relevant fiedls this way :
|
||||
@@ -52,8 +52,8 @@ pub struct CloudNativePgOperatorScore {
|
||||
pub source_namespace: String,
|
||||
}
|
||||
|
||||
impl Default for CloudNativePgOperatorScore {
|
||||
fn default() -> Self {
|
||||
impl CloudNativePgOperatorScore {
|
||||
pub fn default_openshift() -> Self {
|
||||
Self {
|
||||
namespace: "openshift-operators".to_string(),
|
||||
channel: "stable-v1".to_string(),
|
||||
@@ -68,7 +68,7 @@ impl CloudNativePgOperatorScore {
|
||||
pub fn new(namespace: &str) -> Self {
|
||||
Self {
|
||||
namespace: namespace.to_string(),
|
||||
..Default::default()
|
||||
..Self::default_openshift()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,12 +1,21 @@
|
||||
use serde::Serialize;
|
||||
use crate::data::Version;
|
||||
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
|
||||
use crate::inventory::Inventory;
|
||||
|
||||
use crate::interpret::Interpret;
|
||||
use crate::modules::k8s::resource::K8sResourceScore;
|
||||
use crate::modules::postgresql::capability::PostgreSQLConfig;
|
||||
use crate::modules::postgresql::cnpg::{Bootstrap, Cluster, ClusterSpec, Initdb, Storage};
|
||||
use crate::modules::postgresql::cnpg::{
|
||||
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
|
||||
SecretKeySelector, Storage,
|
||||
};
|
||||
use crate::score::Score;
|
||||
use crate::topology::{K8sclient, Topology};
|
||||
use async_trait::async_trait;
|
||||
use harmony_types::id::Id;
|
||||
use k8s_openapi::ByteString;
|
||||
use k8s_openapi::api::core::v1::Secret;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use serde::Serialize;
|
||||
|
||||
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
|
||||
///
|
||||
@@ -49,6 +58,30 @@ impl K8sPostgreSQLScore {
|
||||
|
||||
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(K8sPostgreSQLInterpret {
|
||||
config: self.config.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!("PostgreSQLScore({})", self.config.namespace)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
pub struct K8sPostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
match &self.config.role {
|
||||
super::capability::PostgreSQLClusterRole::Primary => {
|
||||
let metadata = ObjectMeta {
|
||||
name: Some(self.config.cluster_name.clone()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
@@ -61,20 +94,148 @@ impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
|
||||
size: self.config.storage_size.to_string(),
|
||||
},
|
||||
bootstrap: Bootstrap {
|
||||
initdb: Initdb {
|
||||
initdb: Some(Initdb {
|
||||
database: "app".to_string(),
|
||||
owner: "app".to_string(),
|
||||
}),
|
||||
recovery: None,
|
||||
pg_basebackup: None,
|
||||
},
|
||||
enable_superuser_access: true,
|
||||
..ClusterSpec::default()
|
||||
};
|
||||
let cluster = Cluster { metadata, spec };
|
||||
|
||||
Ok(
|
||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
super::capability::PostgreSQLClusterRole::Replica(replica_config) => {
|
||||
let metadata = ObjectMeta {
|
||||
name: Some("streaming-replica-certs".to_string()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
..ObjectMeta::default()
|
||||
};
|
||||
|
||||
// The data must be base64-encoded. If you already have PEM strings in your config, encode them:
|
||||
let mut data = std::collections::BTreeMap::new();
|
||||
data.insert(
|
||||
"tls.key".to_string(),
|
||||
ByteString(
|
||||
replica_config
|
||||
.replication_certs
|
||||
.streaming_replica_key_pem
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
),
|
||||
);
|
||||
data.insert(
|
||||
"tls.crt".to_string(),
|
||||
ByteString(
|
||||
replica_config
|
||||
.replication_certs
|
||||
.streaming_replica_cert_pem
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
),
|
||||
);
|
||||
data.insert(
|
||||
"ca.crt".to_string(),
|
||||
ByteString(
|
||||
replica_config
|
||||
.replication_certs
|
||||
.ca_cert_pem
|
||||
.as_bytes()
|
||||
.to_vec(),
|
||||
),
|
||||
);
|
||||
|
||||
let secret = Secret {
|
||||
metadata,
|
||||
data: Some(data),
|
||||
string_data: None, // You could use string_data if you prefer raw strings
|
||||
type_: Some("Opaque".to_string()),
|
||||
..Secret::default()
|
||||
};
|
||||
|
||||
K8sResourceScore::single(secret, Some(self.config.namespace.clone()))
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?;
|
||||
|
||||
let metadata = ObjectMeta {
|
||||
name: Some(self.config.cluster_name.clone()),
|
||||
namespace: Some(self.config.namespace.clone()),
|
||||
..ObjectMeta::default()
|
||||
};
|
||||
|
||||
let spec = ClusterSpec {
|
||||
instances: self.config.instances,
|
||||
storage: Storage {
|
||||
size: self.config.storage_size.to_string(),
|
||||
},
|
||||
bootstrap: Bootstrap {
|
||||
initdb: None,
|
||||
recovery: None,
|
||||
pg_basebackup: Some(PgBaseBackup {
|
||||
source: replica_config.primary_cluster_name.clone(),
|
||||
}),
|
||||
},
|
||||
external_clusters: Some(vec![ExternalCluster {
|
||||
name: replica_config.primary_cluster_name.clone(),
|
||||
connection_parameters: replica_config
|
||||
.external_cluster
|
||||
.connection_parameters
|
||||
.clone(),
|
||||
ssl_key: Some(SecretKeySelector {
|
||||
name: "streaming-replica-certs".to_string(),
|
||||
key: "tls.key".to_string(),
|
||||
}),
|
||||
ssl_cert: Some(SecretKeySelector {
|
||||
name: "streaming-replica-certs".to_string(),
|
||||
key: "tls.crt".to_string(),
|
||||
}),
|
||||
ssl_root_cert: Some(SecretKeySelector {
|
||||
name: "streaming-replica-certs".to_string(),
|
||||
key: "ca.crt".to_string(),
|
||||
}),
|
||||
}]),
|
||||
replica: Some(ReplicaSpec {
|
||||
enabled: true,
|
||||
source: replica_config.primary_cluster_name.clone(),
|
||||
primary: None,
|
||||
}),
|
||||
..ClusterSpec::default()
|
||||
};
|
||||
|
||||
let cluster = Cluster { metadata, spec };
|
||||
|
||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone())).create_interpret()
|
||||
Ok(
|
||||
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
|
||||
.create_interpret()
|
||||
.execute(inventory, topology)
|
||||
.await?,
|
||||
)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!("PostgreSQLScore({})", self.config.namespace)
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("K8sPostgreSQLInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
@@ -18,46 +18,31 @@ use crate::topology::Topology;
|
||||
/// # Usage
|
||||
/// ```
|
||||
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
|
||||
/// let score = PublicPostgreSQLScore::new("harmony", "pg-rw.example.com");
|
||||
/// let score = PublicPostgreSQLScore::new("harmony");
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct PublicPostgreSQLScore {
|
||||
/// Inner non-public Postgres cluster config.
|
||||
pub config: PostgreSQLConfig,
|
||||
/// Public hostname for RW TLS passthrough (port 443 → cluster-rw:5432).
|
||||
pub hostname: String,
|
||||
}
|
||||
|
||||
impl PublicPostgreSQLScore {
|
||||
pub fn new(namespace: &str, hostname: &str) -> Self {
|
||||
pub fn new(namespace: &str) -> Self {
|
||||
Self {
|
||||
config: PostgreSQLConfig::default().with_namespace(namespace),
|
||||
hostname: hostname.to_string(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
let rw_backend = format!("{}-rw", self.config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
namespace: self.config.namespace.clone(),
|
||||
hostname: self.hostname.clone(),
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
};
|
||||
|
||||
Box::new(PublicPostgreSQLInterpret {
|
||||
config: self.config.clone(),
|
||||
tls_route,
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!(
|
||||
"PublicPostgreSQLScore({}:{})",
|
||||
self.config.namespace, self.hostname
|
||||
)
|
||||
format!("PublicPostgreSQLScore({})", self.config.namespace)
|
||||
}
|
||||
}
|
||||
|
||||
@@ -65,7 +50,6 @@ impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
|
||||
#[derive(Debug, Clone)]
|
||||
struct PublicPostgreSQLInterpret {
|
||||
config: PostgreSQLConfig,
|
||||
tls_route: TlsRoute,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
@@ -76,15 +60,28 @@ impl<T: Topology + PostgreSQL + TlsRouter> Interpret<T> for PublicPostgreSQLInte
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
|
||||
let hostname = format!(
|
||||
"{}.{}.{}",
|
||||
self.config.cluster_name,
|
||||
self.config.namespace,
|
||||
topo.get_public_domain().await?
|
||||
);
|
||||
let rw_backend = format!("{}-rw", self.config.cluster_name);
|
||||
let tls_route = TlsRoute {
|
||||
hostname,
|
||||
backend: rw_backend,
|
||||
target_port: 5432,
|
||||
namespace: self.config.namespace.clone(),
|
||||
};
|
||||
// Expose RW publicly via TLS passthrough
|
||||
topo.install_route(self.tls_route.clone())
|
||||
topo.install_route(tls_route.clone())
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(e))?;
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
|
||||
self.config.cluster_name.clone(),
|
||||
self.tls_route.hostname
|
||||
tls_route.hostname
|
||||
)))
|
||||
}
|
||||
|
||||
|
||||
@@ -12,8 +12,7 @@ use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::C
|
||||
use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules;
|
||||
use crate::modules::monitoring::kube_prometheus::crd::crd_grafana::{
|
||||
Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig,
|
||||
GrafanaDatasourceJsonData, GrafanaDatasourceSpec, GrafanaSecretKeyRef, GrafanaSpec,
|
||||
GrafanaValueFrom, GrafanaValueSource,
|
||||
GrafanaDatasourceJsonData, GrafanaDatasourceSpec, GrafanaSpec,
|
||||
};
|
||||
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::{
|
||||
PrometheusRule, PrometheusRuleSpec, RuleGroup,
|
||||
@@ -23,7 +22,7 @@ use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
|
||||
ServiceMonitor, ServiceMonitorSpec,
|
||||
};
|
||||
use crate::topology::oberservability::monitoring::AlertReceiver;
|
||||
use crate::topology::{K8sclient, Topology, k8s::K8sClient};
|
||||
use crate::topology::{K8sclient, Topology};
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
@@ -38,6 +37,7 @@ use crate::{
|
||||
},
|
||||
score::Score,
|
||||
};
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_types::id::Id;
|
||||
|
||||
use super::prometheus::PrometheusMonitoring;
|
||||
|
||||
@@ -30,12 +30,13 @@ use crate::modules::monitoring::kube_prometheus::crd::rhob_service_monitor::{
|
||||
use crate::score::Score;
|
||||
use crate::topology::ingress::Ingress;
|
||||
use crate::topology::oberservability::monitoring::AlertReceiver;
|
||||
use crate::topology::{K8sclient, Topology, k8s::K8sClient};
|
||||
use crate::topology::{K8sclient, Topology};
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
};
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_types::id::Id;
|
||||
|
||||
use super::prometheus::PrometheusMonitoring;
|
||||
|
||||
@@ -4,6 +4,7 @@ use std::{
|
||||
};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::K8sClient;
|
||||
use log::{debug, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use tokio::time::sleep;
|
||||
@@ -13,7 +14,7 @@ use crate::{
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
topology::{K8sclient, Topology},
|
||||
};
|
||||
use harmony_types::id::Id;
|
||||
|
||||
|
||||
@@ -9,8 +9,9 @@ use crate::{
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
score::Score,
|
||||
topology::{K8sclient, Topology, k8s::K8sClient},
|
||||
topology::{K8sclient, Topology},
|
||||
};
|
||||
use harmony_k8s::K8sClient;
|
||||
use harmony_types::id::Id;
|
||||
|
||||
#[derive(Clone, Debug, Serialize)]
|
||||
|
||||
518
harmony/src/modules/zitadel/mod.rs
Normal file
518
harmony/src/modules/zitadel/mod.rs
Normal file
@@ -0,0 +1,518 @@
|
||||
use k8s_openapi::api::core::v1::Namespace;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use k8s_openapi::{ByteString, api::core::v1::Secret};
|
||||
use kube::{Error as KubeError, core::ErrorResponse};
|
||||
use rand::distr::Distribution;
|
||||
use rand::{Rng, rng, seq::SliceRandom};
|
||||
use std::collections::BTreeMap;
|
||||
use std::str::FromStr;
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_macros::hurl;
|
||||
use harmony_types::id::Id;
|
||||
use harmony_types::storage::StorageSize;
|
||||
use log::{debug, error, info, trace, warn};
|
||||
use non_blank_string_rs::NonBlankString;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::{
|
||||
data::Version,
|
||||
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
|
||||
inventory::Inventory,
|
||||
modules::helm::chart::{HelmChartScore, HelmRepository},
|
||||
modules::k8s::resource::K8sResourceScore,
|
||||
modules::postgresql::capability::{PostgreSQL, PostgreSQLClusterRole, PostgreSQLConfig},
|
||||
score::Score,
|
||||
topology::{HelmCommand, K8sclient, Topology},
|
||||
};
|
||||
|
||||
const NAMESPACE: &str = "zitadel";
|
||||
const PG_CLUSTER_NAME: &str = "zitadel-pg";
|
||||
const MASTERKEY_SECRET_NAME: &str = "zitadel-masterkey";
|
||||
|
||||
/// Opinionated Zitadel deployment score.
|
||||
///
|
||||
/// Deploys a PostgreSQL cluster (via the [`PostgreSQL`] trait) and the Zitadel
|
||||
/// Helm chart into the same namespace. Intended as a central multi-tenant IdP
|
||||
/// with SSO for OKD/OpenShift, OpenBao, Harbor, Grafana, Nextcloud, Ente
|
||||
/// Photos, and others.
|
||||
///
|
||||
/// # Ingress annotations
|
||||
/// No controller-specific ingress annotations are set by default. On
|
||||
/// OKD/OpenShift, the ingress should request TLS so the generated Route is
|
||||
/// edge-terminated instead of HTTP-only. Optional cert-manager annotations are
|
||||
/// included for clusters that have cert-manager installed; clusters without
|
||||
/// cert-manager will ignore them.
|
||||
/// Add or adjust annotations via `values_overrides` depending on your
|
||||
/// distribution:
|
||||
/// - NGINX: `nginx.ingress.kubernetes.io/backend-protocol: GRPC`
|
||||
/// - OpenShift HAProxy: `route.openshift.io/termination: edge`
|
||||
/// - AWS ALB: set `ingress.controller: aws`
|
||||
|
||||
///
|
||||
/// # Database credentials
|
||||
/// CNPG creates a `<cluster>-superuser` secret with key `password`. Because
|
||||
/// `envVarsSecret` injects secret keys verbatim as env var names and the CNPG
|
||||
/// key (`password`) does not match ZITADEL's expected name
|
||||
/// (`ZITADEL_DATABASE_POSTGRES_USER_PASSWORD`), individual `env` entries with
|
||||
/// `valueFrom.secretKeyRef` are used instead. For environments with an
|
||||
/// External Secrets Operator or similar, create a dedicated secret with the
|
||||
/// correct ZITADEL env var names and switch to `envVarsSecret`.
|
||||
#[derive(Debug, Serialize, Clone)]
|
||||
pub struct ZitadelScore {
|
||||
/// External domain (e.g. `"auth.example.com"`).
|
||||
pub host: String,
|
||||
pub zitadel_version: String,
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient + HelmCommand + PostgreSQL> Score<T> for ZitadelScore {
|
||||
fn name(&self) -> String {
|
||||
"ZitadelScore".to_string()
|
||||
}
|
||||
|
||||
#[doc(hidden)]
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(ZitadelInterpret {
|
||||
host: self.host.clone(),
|
||||
zitadel_version: self.zitadel_version.clone(),
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
struct ZitadelInterpret {
|
||||
host: String,
|
||||
zitadel_version: String,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient + HelmCommand + PostgreSQL> Interpret<T> for ZitadelInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
info!(
|
||||
"[Zitadel] Starting full deployment — namespace: '{NAMESPACE}', host: '{}'",
|
||||
self.host
|
||||
);
|
||||
|
||||
info!("Creating namespace {NAMESPACE} if it does not exist");
|
||||
K8sResourceScore::single(
|
||||
Namespace {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(NAMESPACE.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
..Default::default()
|
||||
},
|
||||
None,
|
||||
)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
// --- Step 1: PostgreSQL -------------------------------------------
|
||||
|
||||
let pg_config = PostgreSQLConfig {
|
||||
cluster_name: PG_CLUSTER_NAME.to_string(),
|
||||
instances: 2,
|
||||
storage_size: StorageSize::gi(10),
|
||||
role: PostgreSQLClusterRole::Primary,
|
||||
namespace: NAMESPACE.to_string(),
|
||||
};
|
||||
|
||||
debug!(
|
||||
"[Zitadel] Deploying PostgreSQL cluster '{}' — instances: {}, storage: 10Gi, namespace: '{}'",
|
||||
pg_config.cluster_name, pg_config.instances, pg_config.namespace
|
||||
);
|
||||
|
||||
topology.deploy(&pg_config).await.map_err(|e| {
|
||||
let msg = format!(
|
||||
"[Zitadel] PostgreSQL deployment failed for '{}': {e}",
|
||||
pg_config.cluster_name
|
||||
);
|
||||
error!("{msg}");
|
||||
InterpretError::new(msg)
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"[Zitadel] PostgreSQL cluster '{}' deployed",
|
||||
pg_config.cluster_name
|
||||
);
|
||||
|
||||
// --- Step 2: Resolve internal DB endpoint -------------------------
|
||||
|
||||
debug!(
|
||||
"[Zitadel] Resolving internal endpoint for cluster '{}'",
|
||||
pg_config.cluster_name
|
||||
);
|
||||
|
||||
let endpoint = topology.get_endpoint(&pg_config).await.map_err(|e| {
|
||||
let msg = format!(
|
||||
"[Zitadel] Failed to resolve endpoint for cluster '{}': {e}",
|
||||
pg_config.cluster_name
|
||||
);
|
||||
error!("{msg}");
|
||||
InterpretError::new(msg)
|
||||
})?;
|
||||
|
||||
info!(
|
||||
"[Zitadel] DB endpoint resolved — host: '{}', port: {}",
|
||||
endpoint.host, endpoint.port
|
||||
);
|
||||
|
||||
// The CNPG-managed superuser secret contains 'password', 'username',
|
||||
// 'host', 'port', 'dbname', 'uri'. We reference 'password' directly
|
||||
// via env.valueFrom.secretKeyRef because CNPG's key names do not
|
||||
// match ZITADEL's required env var names.
|
||||
let pg_user_secret = format!("{PG_CLUSTER_NAME}-app");
|
||||
let pg_superuser_secret = format!("{PG_CLUSTER_NAME}-superuser");
|
||||
let db_host = &endpoint.host;
|
||||
let db_port = endpoint.port;
|
||||
let host = &self.host;
|
||||
|
||||
debug!("[Zitadel] DB credentials source — secret: '{pg_user_secret}', key: 'password'");
|
||||
debug!(
|
||||
"[Zitadel] DB credentials source — superuser secret: '{pg_superuser_secret}', key: 'password'"
|
||||
);
|
||||
|
||||
// Zitadel requires one symbol, one number and more. So let's force it.
|
||||
fn generate_secure_password(length: usize) -> String {
|
||||
const ALPHA_UPPER: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
|
||||
const ALPHA_LOWER: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
|
||||
const DIGITS: &[u8] = b"0123456789";
|
||||
const SYMBOLS: &[u8] = b"!@#$%^&*()_+-=[]{}|;:',.<>?/";
|
||||
|
||||
let mut rng = rand::rng();
|
||||
let uniform_alpha_upper = rand::distr::Uniform::new(0, ALPHA_UPPER.len())
|
||||
.expect("Failed to create distribution");
|
||||
let uniform_alpha_lower = rand::distr::Uniform::new(0, ALPHA_LOWER.len())
|
||||
.expect("Failed to create distribution");
|
||||
let uniform_digits =
|
||||
rand::distr::Uniform::new(0, DIGITS.len()).expect("Failed to create distribution");
|
||||
let uniform_symbols =
|
||||
rand::distr::Uniform::new(0, SYMBOLS.len()).expect("Failed to create distribution");
|
||||
|
||||
let mut chars: Vec<char> = Vec::with_capacity(length);
|
||||
|
||||
// Ensure at least one of each: upper, lower, digit, symbol
|
||||
chars.push(ALPHA_UPPER[uniform_alpha_upper.sample(&mut rng)] as char);
|
||||
chars.push(ALPHA_LOWER[uniform_alpha_lower.sample(&mut rng)] as char);
|
||||
chars.push(DIGITS[uniform_digits.sample(&mut rng)] as char);
|
||||
chars.push(SYMBOLS[uniform_symbols.sample(&mut rng)] as char);
|
||||
|
||||
// Fill remaining with random from all categories
|
||||
let all_chars: Vec<u8> = [ALPHA_UPPER, ALPHA_LOWER, DIGITS, SYMBOLS].concat();
|
||||
|
||||
let uniform_all = rand::distr::Uniform::new(0, all_chars.len())
|
||||
.expect("Failed to create distribution");
|
||||
|
||||
for _ in 0..(length - 4) {
|
||||
chars.push(all_chars[uniform_all.sample(&mut rng)] as char);
|
||||
}
|
||||
|
||||
// Shuffle
|
||||
let mut shuffled = chars;
|
||||
shuffled.shuffle(&mut rng);
|
||||
|
||||
return shuffled.iter().collect();
|
||||
}
|
||||
|
||||
let admin_password = generate_secure_password(16);
|
||||
|
||||
// --- Step 3: Create masterkey secret ------------------------------------
|
||||
|
||||
debug!(
|
||||
"[Zitadel] Creating masterkey secret '{}' in namespace '{}'",
|
||||
MASTERKEY_SECRET_NAME, NAMESPACE
|
||||
);
|
||||
|
||||
// Masterkey for symmetric encryption — must be exactly 32 ASCII bytes (alphanumeric only).
|
||||
let masterkey = rng()
|
||||
.sample_iter(&rand::distr::Alphanumeric)
|
||||
.take(32)
|
||||
.map(char::from)
|
||||
.collect::<String>();
|
||||
|
||||
debug!(
|
||||
"[Zitadel] Created masterkey secret '{}' in namespace '{}'",
|
||||
MASTERKEY_SECRET_NAME, NAMESPACE
|
||||
);
|
||||
|
||||
let mut masterkey_data: BTreeMap<String, ByteString> = BTreeMap::new();
|
||||
masterkey_data.insert("masterkey".to_string(), ByteString(masterkey.into()));
|
||||
|
||||
let masterkey_secret = Secret {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(MASTERKEY_SECRET_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
data: Some(masterkey_data),
|
||||
..Secret::default()
|
||||
};
|
||||
|
||||
match topology
|
||||
.k8s_client()
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?
|
||||
.create(&masterkey_secret, Some(NAMESPACE))
|
||||
.await
|
||||
{
|
||||
Ok(_) => {
|
||||
info!(
|
||||
"[Zitadel] Masterkey secret '{}' created",
|
||||
MASTERKEY_SECRET_NAME
|
||||
);
|
||||
}
|
||||
Err(KubeError::Api(ErrorResponse { code: 409, .. })) => {
|
||||
info!(
|
||||
"[Zitadel] Masterkey secret '{}' already exists, leaving it untouched",
|
||||
MASTERKEY_SECRET_NAME
|
||||
);
|
||||
}
|
||||
Err(other) => {
|
||||
let msg = format!(
|
||||
"[Zitadel] Failed to create masterkey secret '{}': {other}",
|
||||
MASTERKEY_SECRET_NAME
|
||||
);
|
||||
error!("{msg}");
|
||||
return Err(InterpretError::new(msg));
|
||||
}
|
||||
};
|
||||
|
||||
debug!(
|
||||
"[Zitadel] Masterkey secret '{}' created successfully",
|
||||
MASTERKEY_SECRET_NAME
|
||||
);
|
||||
|
||||
// --- Step 4: Build Helm values ------------------------------------
|
||||
|
||||
warn!(
|
||||
"[Zitadel] Applying TLS-enabled ingress defaults for OKD/OpenShift. \
|
||||
cert-manager annotations are included as optional hints and are \
|
||||
ignored on clusters without cert-manager."
|
||||
);
|
||||
|
||||
let values_yaml = format!(
|
||||
r#"image:
|
||||
tag: {zitadel_version}
|
||||
zitadel:
|
||||
masterkeySecretName: "{MASTERKEY_SECRET_NAME}"
|
||||
configmapConfig:
|
||||
ExternalDomain: "{host}"
|
||||
ExternalSecure: true
|
||||
FirstInstance:
|
||||
Org:
|
||||
Human:
|
||||
UserName: "admin"
|
||||
Password: "{admin_password}"
|
||||
FirstName: "Zitadel"
|
||||
LastName: "Admin"
|
||||
Email: "admin@zitadel.example.com"
|
||||
PasswordChangeRequired: true
|
||||
TLS:
|
||||
Enabled: false
|
||||
Database:
|
||||
Postgres:
|
||||
Host: "{db_host}"
|
||||
Port: {db_port}
|
||||
Database: zitadel
|
||||
MaxOpenConns: 20
|
||||
MaxIdleConns: 10
|
||||
User:
|
||||
Username: postgres
|
||||
SSL:
|
||||
Mode: require
|
||||
Admin:
|
||||
Username: postgres
|
||||
SSL:
|
||||
Mode: require
|
||||
# Directly import credentials from the postgres secret
|
||||
# TODO : use a less privileged postgres user
|
||||
env:
|
||||
- name: ZITADEL_DATABASE_POSTGRES_USER_USERNAME
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: "{pg_superuser_secret}"
|
||||
key: user
|
||||
- name: ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: "{pg_superuser_secret}"
|
||||
key: password
|
||||
- name: ZITADEL_DATABASE_POSTGRES_ADMIN_USERNAME
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: "{pg_superuser_secret}"
|
||||
key: user
|
||||
- name: ZITADEL_DATABASE_POSTGRES_ADMIN_PASSWORD
|
||||
valueFrom:
|
||||
secretKeyRef:
|
||||
name: "{pg_superuser_secret}"
|
||||
key: password
|
||||
# Security context for OpenShift restricted PSA compliance
|
||||
podSecurityContext:
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
securityContext:
|
||||
allowPrivilegeEscalation: false
|
||||
capabilities:
|
||||
drop:
|
||||
- ALL
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
# Init job security context (runs before main deployment)
|
||||
initJob:
|
||||
podSecurityContext:
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
securityContext:
|
||||
allowPrivilegeEscalation: false
|
||||
capabilities:
|
||||
drop:
|
||||
- ALL
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
# Setup job security context
|
||||
setupJob:
|
||||
podSecurityContext:
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
securityContext:
|
||||
allowPrivilegeEscalation: false
|
||||
capabilities:
|
||||
drop:
|
||||
- ALL
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
ingress:
|
||||
enabled: true
|
||||
annotations:
|
||||
cert-manager.io/cluster-issuer: letsencrypt-prod
|
||||
route.openshift.io/termination: edge
|
||||
hosts:
|
||||
- host: "{host}"
|
||||
paths:
|
||||
- path: /
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- "{host}"
|
||||
secretName: "{host}-tls"
|
||||
|
||||
login:
|
||||
enabled: true
|
||||
podSecurityContext:
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
securityContext:
|
||||
allowPrivilegeEscalation: false
|
||||
capabilities:
|
||||
drop:
|
||||
- ALL
|
||||
runAsNonRoot: true
|
||||
runAsUser: null
|
||||
fsGroup: null
|
||||
seccompProfile:
|
||||
type: RuntimeDefault
|
||||
ingress:
|
||||
enabled: true
|
||||
annotations:
|
||||
cert-manager.io/cluster-issuer: letsencrypt-prod
|
||||
route.openshift.io/termination: edge
|
||||
hosts:
|
||||
- host: "{host}"
|
||||
paths:
|
||||
- path: /ui/v2/login
|
||||
pathType: Prefix
|
||||
tls:
|
||||
- hosts:
|
||||
- "{host}"
|
||||
secretName: "{host}-tls""#,
|
||||
zitadel_version = self.zitadel_version
|
||||
);
|
||||
|
||||
trace!("[Zitadel] Helm values YAML:\n{values_yaml}");
|
||||
|
||||
// --- Step 5: Deploy Helm chart ------------------------------------
|
||||
|
||||
info!(
|
||||
"[Zitadel] Deploying Helm chart 'zitadel/zitadel' as release 'zitadel' in namespace '{NAMESPACE}'"
|
||||
);
|
||||
|
||||
let result = HelmChartScore {
|
||||
namespace: Some(NonBlankString::from_str(NAMESPACE).unwrap()),
|
||||
release_name: NonBlankString::from_str("zitadel").unwrap(),
|
||||
chart_name: NonBlankString::from_str("zitadel/zitadel").unwrap(),
|
||||
chart_version: None,
|
||||
values_overrides: None,
|
||||
values_yaml: Some(values_yaml),
|
||||
create_namespace: true,
|
||||
install_only: false,
|
||||
repository: Some(HelmRepository::new(
|
||||
"zitadel".to_string(),
|
||||
hurl!("https://charts.zitadel.com"),
|
||||
true,
|
||||
)),
|
||||
}
|
||||
.interpret(inventory, topology)
|
||||
.await;
|
||||
|
||||
match &result {
|
||||
Ok(_) => info!(
|
||||
"[Zitadel] Helm chart deployed successfully\n\n\
|
||||
===== ZITADEL DEPLOYMENT COMPLETE =====\n\
|
||||
Login URL: https://{host}\n\
|
||||
Username: admin@zitadel.{host}\n\
|
||||
Password: {admin_password}\n\n\
|
||||
IMPORTANT: The password is saved in ConfigMap 'zitadel-config-yaml'\n\
|
||||
and must be changed on first login. Save the credentials in a\n\
|
||||
secure location after changing them.\n\
|
||||
========================================="
|
||||
),
|
||||
Err(e) => error!("[Zitadel] Helm chart deployment failed: {e}"),
|
||||
}
|
||||
|
||||
result
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("Zitadel")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
vec![]
|
||||
}
|
||||
}
|
||||
17
harmony_node_readiness/Cargo.toml
Normal file
17
harmony_node_readiness/Cargo.toml
Normal file
@@ -0,0 +1,17 @@
|
||||
[package]
|
||||
name = "harmony-node-readiness-endpoint"
|
||||
version = "0.1.0"
|
||||
edition = "2024"
|
||||
|
||||
[dependencies]
|
||||
actix-web = "4"
|
||||
kube.workspace = true
|
||||
k8s-openapi.workspace = true
|
||||
serde.workspace = true
|
||||
serde_json.workspace = true
|
||||
env_logger.workspace = true
|
||||
log.workspace = true
|
||||
tokio.workspace = true
|
||||
reqwest.workspace = true
|
||||
chrono.workspace = true
|
||||
tower = "0.5.3"
|
||||
13
harmony_node_readiness/Dockerfile
Normal file
13
harmony_node_readiness/Dockerfile
Normal file
@@ -0,0 +1,13 @@
|
||||
FROM debian:13-slim
|
||||
|
||||
# RUN apt-get update && apt-get install -y --no-install-recommends \
|
||||
# ca-certificates \
|
||||
# && rm -rf /var/lib/apt/lists/*
|
||||
|
||||
COPY harmony-node-readiness-endpoint /usr/local/bin/harmony-node-readiness-endpoint
|
||||
|
||||
ENV RUST_LOG=info
|
||||
|
||||
EXPOSE 25001
|
||||
|
||||
CMD ["harmony-node-readiness-endpoint"]
|
||||
197
harmony_node_readiness/README.md
Normal file
197
harmony_node_readiness/README.md
Normal file
@@ -0,0 +1,197 @@
|
||||
# harmony-node-readiness-endpoint
|
||||
|
||||
**A lightweight, standalone Rust service for Kubernetes node health checking.**
|
||||
|
||||
Designed for **bare-metal Kubernetes clusters** with external load balancers (HAProxy, OPNsense, F5, etc.).
|
||||
|
||||
Exposes a simple HTTP endpoint (`/health`) on each node:
|
||||
|
||||
- **200 OK** — node is healthy and ready to receive traffic
|
||||
- **503 Service Unavailable** — node should be removed from the load balancer pool
|
||||
- **500 Internal Server Error** — misconfiguration (e.g. `NODE_NAME` not set)
|
||||
|
||||
This project is **not dependent on Harmony**, but is commonly used as part of Harmony bare-metal Kubernetes deployments.
|
||||
|
||||
## Why this project exists
|
||||
|
||||
In bare-metal environments, external load balancers often rely on pod-level or router-level checks that can lag behind the authoritative Kubernetes `Node.status.conditions[Ready]`.
|
||||
This service provides the true source-of-truth with fast reaction time.
|
||||
|
||||
## Available checks
|
||||
|
||||
| Check name | Description | Status |
|
||||
|--------------------|-------------------------------------------------------------|-------------------|
|
||||
| `node_ready` | Queries `Node.status.conditions[Ready]` via Kubernetes API | Implemented |
|
||||
| `okd_router_1936` | Probes OpenShift router `/healthz/ready` on port 1936 | Implemented |
|
||||
| `filesystem_ro` | Detects read-only mounts via `/proc/mounts` | To be implemented |
|
||||
| `kubelet` | Local probe to kubelet `/healthz` (port 10248) | To be implemented |
|
||||
| `container_runtime`| Socket check + runtime status | To be implemented |
|
||||
| `disk_pressure` | Threshold checks on key filesystems | To be implemented |
|
||||
| `network` | DNS resolution + gateway connectivity | To be implemented |
|
||||
| `custom_conditions`| Reacts to extra conditions (NPD, etc.) | To be implemented |
|
||||
|
||||
All checks are combined with logical **AND** — any single failure results in 503.
|
||||
|
||||
## Behavior
|
||||
|
||||
### `node_ready` check — fail-open design
|
||||
|
||||
The `node_ready` check queries the Kubernetes API server to read `Node.status.conditions[Ready]`.
|
||||
Because this service runs on the node it is checking, there are scenarios where the API server is temporarily
|
||||
unreachable (e.g. during a control-plane restart). To avoid incorrectly draining a healthy node in such cases,
|
||||
the check is **fail-open**: it passes (reports ready) whenever the Kubernetes API is unavailable.
|
||||
|
||||
| Situation | Result | HTTP status |
|
||||
|------------------------------------------------------|-------------------|-------------|
|
||||
| `Node.conditions[Ready] == True` | Pass | 200 |
|
||||
| `Node.conditions[Ready] == False` | Fail | 503 |
|
||||
| `Ready` condition absent | Fail | 503 |
|
||||
| API server unreachable or timed out (1 s timeout) | Pass (assumes ready) | 200 |
|
||||
| Kubernetes client initialization failed | Pass (assumes ready) | 200 |
|
||||
| `NODE_NAME` env var not set | Hard error | 500 |
|
||||
|
||||
A warning is logged whenever the API is unavailable and the check falls back to assuming ready.
|
||||
|
||||
### `okd_router_1936` check
|
||||
|
||||
Sends `GET http://127.0.0.1:1936/healthz/ready` with a 5-second timeout.
|
||||
Returns pass on any 2xx response, fail otherwise.
|
||||
|
||||
### Unknown check names
|
||||
|
||||
Requesting an unknown check name (e.g. `check=bogus`) results in that check returning `passed: false`
|
||||
with reason `"Unknown check: bogus"`, and the overall response is 503.
|
||||
|
||||
## How it works
|
||||
|
||||
### Node name discovery
|
||||
|
||||
The service reads the `NODE_NAME` environment variable, which must be injected via the Kubernetes Downward API:
|
||||
|
||||
```yaml
|
||||
env:
|
||||
- name: NODE_NAME
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
```
|
||||
|
||||
### Kubernetes API authentication
|
||||
|
||||
- Uses standard **in-cluster configuration** — no external credentials needed.
|
||||
- The ServiceAccount token and CA certificate are automatically mounted at `/var/run/secrets/kubernetes.io/serviceaccount/`.
|
||||
- Requires only minimal RBAC: `get` and `list` on the `nodes` resource (see `deploy/resources.yaml`).
|
||||
- Connect and write timeouts are set to **1 second** to keep checks fast.
|
||||
|
||||
## Deploy
|
||||
|
||||
All Kubernetes resources (Namespace, ServiceAccount, ClusterRole, ClusterRoleBinding, and an OpenShift SCC RoleBinding for `hostnetwork`) are in a single file.
|
||||
|
||||
```bash
|
||||
kubectl apply -f deploy/resources.yaml
|
||||
kubectl apply -f deploy/daemonset.yaml
|
||||
```
|
||||
|
||||
The DaemonSet uses `hostNetwork: true` and `hostPort: 25001`, so the endpoint is reachable directly on the node's IP at port 25001.
|
||||
It tolerates all taints, ensuring it runs even on nodes marked unschedulable.
|
||||
|
||||
### Configure your external load balancer
|
||||
|
||||
**Example for HAProxy / OPNsense:**
|
||||
- Check type: **HTTP**
|
||||
- URI: `/health`
|
||||
- Port: `25001` (configurable via `LISTEN_PORT` env var)
|
||||
- Interval: 5–10 s
|
||||
- Rise: 2
|
||||
- Fall: 3
|
||||
- Expect: `2xx`
|
||||
|
||||
## Endpoint usage
|
||||
|
||||
### Query parameter
|
||||
|
||||
Use the `check` query parameter to select which checks to run (comma-separated).
|
||||
When omitted, only `node_ready` runs.
|
||||
|
||||
| Request | Checks run |
|
||||
|------------------------------------------------|-----------------------------------|
|
||||
| `GET /health` | `node_ready` |
|
||||
| `GET /health?check=okd_router_1936` | `okd_router_1936` only |
|
||||
| `GET /health?check=node_ready,okd_router_1936` | `node_ready` and `okd_router_1936`|
|
||||
|
||||
> **Note:** specifying `check=` replaces the default. Include `node_ready` explicitly if you need it alongside other checks.
|
||||
|
||||
### Response format
|
||||
|
||||
```json
|
||||
{
|
||||
"status": "ready" | "not-ready",
|
||||
"checks": [
|
||||
{
|
||||
"name": "<check-name>",
|
||||
"passed": true | false,
|
||||
"reason": "<failure reason, omitted on success>",
|
||||
"duration_ms": 42
|
||||
}
|
||||
],
|
||||
"total_duration_ms": 42
|
||||
}
|
||||
```
|
||||
|
||||
**Healthy node (default)**
|
||||
```http
|
||||
HTTP/1.1 200 OK
|
||||
|
||||
{
|
||||
"status": "ready",
|
||||
"checks": [{ "name": "node_ready", "passed": true, "duration_ms": 42 }],
|
||||
"total_duration_ms": 42
|
||||
}
|
||||
```
|
||||
|
||||
**Unhealthy node**
|
||||
```http
|
||||
HTTP/1.1 503 Service Unavailable
|
||||
|
||||
{
|
||||
"status": "not-ready",
|
||||
"checks": [
|
||||
{ "name": "node_ready", "passed": false, "reason": "KubeletNotReady", "duration_ms": 35 }
|
||||
],
|
||||
"total_duration_ms": 35
|
||||
}
|
||||
```
|
||||
|
||||
**API server unreachable (fail-open)**
|
||||
```http
|
||||
HTTP/1.1 200 OK
|
||||
|
||||
{
|
||||
"status": "ready",
|
||||
"checks": [{ "name": "node_ready", "passed": true, "duration_ms": 1001 }],
|
||||
"total_duration_ms": 1001
|
||||
}
|
||||
```
|
||||
*(A warning is logged: `Kubernetes API appears to be down … Assuming node is ready.`)*
|
||||
|
||||
## Configuration
|
||||
|
||||
| Env var | Default | Description |
|
||||
|---------------|----------|--------------------------------------|
|
||||
| `NODE_NAME` | required | Node name, injected via Downward API |
|
||||
| `LISTEN_PORT` | `25001` | TCP port the HTTP server binds to |
|
||||
| `RUST_LOG` | — | Log level (e.g. `info`, `debug`) |
|
||||
|
||||
## Development
|
||||
|
||||
```bash
|
||||
# Run locally
|
||||
NODE_NAME=my-test-node cargo run
|
||||
|
||||
# Run tests
|
||||
cargo test
|
||||
```
|
||||
|
||||
---
|
||||
|
||||
*Minimal, auditable, and built for production bare-metal Kubernetes environments.*
|
||||
13
harmony_node_readiness/build-docker.sh
Executable file
13
harmony_node_readiness/build-docker.sh
Executable file
@@ -0,0 +1,13 @@
|
||||
#!/bin/bash
|
||||
|
||||
# TODO
|
||||
# This is meant to be run on a machine with harmony development tools installed (cargo, etc)
|
||||
|
||||
DOCKER_TAG="${DOCKER_TAG:-dev}"
|
||||
|
||||
cargo build --release
|
||||
|
||||
cp ../target/release/harmony-node-readiness-endpoint .
|
||||
|
||||
docker build . -t hub.nationtech.io/harmony/harmony-node-readiness-endpoint:${DOCKER_TAG}
|
||||
|
||||
36
harmony_node_readiness/deploy/daemonset.yaml
Normal file
36
harmony_node_readiness/deploy/daemonset.yaml
Normal file
@@ -0,0 +1,36 @@
|
||||
apiVersion: apps/v1
|
||||
kind: DaemonSet
|
||||
metadata:
|
||||
name: node-healthcheck
|
||||
namespace: harmony-node-healthcheck
|
||||
spec:
|
||||
selector:
|
||||
matchLabels:
|
||||
app: node-healthcheck
|
||||
template:
|
||||
metadata:
|
||||
labels:
|
||||
app: node-healthcheck
|
||||
spec:
|
||||
serviceAccountName: node-healthcheck-sa
|
||||
hostNetwork: true
|
||||
# This ensures the pod runs even if the node is already "unschedulable"
|
||||
# so it can report the status correctly.
|
||||
tolerations:
|
||||
- operator: Exists
|
||||
containers:
|
||||
- name: checker
|
||||
image: hub.nationtech.io/harmony/harmony-node-readiness-endpoint:latest
|
||||
env:
|
||||
- name: NODE_NAME
|
||||
valueFrom:
|
||||
fieldRef:
|
||||
fieldPath: spec.nodeName
|
||||
ports:
|
||||
- containerPort: 25001
|
||||
hostPort: 25001
|
||||
name: health-port
|
||||
resources:
|
||||
requests:
|
||||
cpu: 10m
|
||||
memory: 50Mi
|
||||
64
harmony_node_readiness/deploy/resources.yaml
Normal file
64
harmony_node_readiness/deploy/resources.yaml
Normal file
@@ -0,0 +1,64 @@
|
||||
apiVersion: v1
|
||||
kind: Namespace
|
||||
metadata:
|
||||
name: harmony-node-healthcheck
|
||||
labels:
|
||||
name: harmony-node-healthcheck
|
||||
|
||||
---
|
||||
apiVersion: v1
|
||||
kind: ServiceAccount
|
||||
metadata:
|
||||
name: node-healthcheck-sa
|
||||
namespace: harmony-node-healthcheck
|
||||
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRole
|
||||
metadata:
|
||||
name: node-healthcheck-role
|
||||
rules:
|
||||
- apiGroups: [""]
|
||||
resources: ["nodes"]
|
||||
verbs: ["get", "list"]
|
||||
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: Role
|
||||
metadata:
|
||||
name: allow-hostnetwork-scc
|
||||
namespace: harmony-node-healthcheck
|
||||
rules:
|
||||
- apiGroups: ["security.openshift.io"]
|
||||
resources: ["securitycontextconstraints"]
|
||||
resourceNames: ["hostnetwork"]
|
||||
verbs: ["use"]
|
||||
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: RoleBinding
|
||||
metadata:
|
||||
name: node-status-querier-scc-binding
|
||||
namespace: harmony-node-healthcheck
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: node-healthcheck-sa
|
||||
namespace: harmony-node-healthcheck
|
||||
roleRef:
|
||||
kind: Role
|
||||
name: allow-hostnetwork-scc
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
|
||||
---
|
||||
apiVersion: rbac.authorization.k8s.io/v1
|
||||
kind: ClusterRoleBinding
|
||||
metadata:
|
||||
name: read-nodes-binding
|
||||
subjects:
|
||||
- kind: ServiceAccount
|
||||
name: node-healthcheck-sa
|
||||
namespace: harmony-node-healthcheck
|
||||
roleRef:
|
||||
kind: ClusterRole
|
||||
name: node-healthcheck-role
|
||||
apiGroup: rbac.authorization.k8s.io
|
||||
282
harmony_node_readiness/src/main.rs
Normal file
282
harmony_node_readiness/src/main.rs
Normal file
@@ -0,0 +1,282 @@
|
||||
use actix_web::{App, HttpResponse, HttpServer, Responder, get, web};
|
||||
use k8s_openapi::api::core::v1::Node;
|
||||
use kube::{Api, Client, Config};
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use reqwest;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::env;
|
||||
use std::time::{Duration, Instant};
|
||||
use tokio::task::JoinSet;
|
||||
|
||||
const K8S_CLIENT_TIMEOUT: Duration = Duration::from_secs(1);
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct HealthStatus {
|
||||
status: String,
|
||||
checks: Vec<CheckResult>,
|
||||
total_duration_ms: u128,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct CheckResult {
|
||||
name: String,
|
||||
passed: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
reason: Option<String>,
|
||||
duration_ms: u128,
|
||||
}
|
||||
|
||||
#[derive(Serialize, Deserialize)]
|
||||
struct HealthError {
|
||||
status: String,
|
||||
error: String,
|
||||
}
|
||||
|
||||
#[derive(Deserialize)]
|
||||
struct HealthQuery {
|
||||
#[serde(rename = "check")]
|
||||
checks: Option<String>,
|
||||
}
|
||||
|
||||
/// Check if the node's Ready condition is true via Kubernetes API
|
||||
async fn check_node_ready(client: Client, node_name: &str) -> Result<(), String> {
|
||||
let nodes: Api<Node> = Api::all(client);
|
||||
|
||||
let node = match nodes.get(node_name).await {
|
||||
Ok(n) => n,
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Kubernetes API appears to be down, unreachable, or timed out for node '{}': {}. Assuming node is ready.",
|
||||
node_name, e
|
||||
);
|
||||
return Ok(());
|
||||
}
|
||||
};
|
||||
|
||||
let conditions = node.status.and_then(|s| s.conditions).unwrap_or_default();
|
||||
|
||||
for condition in conditions {
|
||||
if condition.type_ == "Ready" {
|
||||
let is_ready = condition.status == "True";
|
||||
let reason = condition
|
||||
.reason
|
||||
.clone()
|
||||
.unwrap_or_else(|| "Unknown".to_string());
|
||||
|
||||
if !is_ready {
|
||||
return Err(reason);
|
||||
}
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
|
||||
Err("Ready condition not found".to_string())
|
||||
}
|
||||
|
||||
/// Check OKD router health endpoint on port 1936
|
||||
async fn check_okd_router_1936() -> Result<(), String> {
|
||||
debug!("Checking okd router 1936");
|
||||
let client = reqwest::Client::builder()
|
||||
.timeout(std::time::Duration::from_secs(5))
|
||||
.build()
|
||||
.map_err(|e| format!("Failed to build HTTP client: {}", e))?;
|
||||
|
||||
let response = client
|
||||
.get("http://127.0.0.1:1936/healthz/ready")
|
||||
.send()
|
||||
.await
|
||||
.map_err(|e| format!("Failed to connect to OKD router: {}", e))?;
|
||||
|
||||
debug!("okd router 1936 response status {}", response.status());
|
||||
|
||||
if response.status().is_success() {
|
||||
Ok(())
|
||||
} else {
|
||||
Err(format!("OKD router returned status: {}", response.status()))
|
||||
}
|
||||
}
|
||||
|
||||
/// Parse comma-separated check names from query parameter
|
||||
fn parse_checks(checks_param: Option<&str>) -> Vec<String> {
|
||||
match checks_param {
|
||||
None => vec!["node_ready".to_string()],
|
||||
Some(s) if s.is_empty() => vec!["node_ready".to_string()],
|
||||
Some(s) => s.split(',').map(|c| c.trim().to_string()).collect(),
|
||||
}
|
||||
}
|
||||
|
||||
/// Run a single health check by name and return the result
|
||||
async fn run_check(check_name: &str, client: Option<Client>, node_name: &str) -> CheckResult {
|
||||
let start = Instant::now();
|
||||
|
||||
let result = match check_name {
|
||||
"node_ready" => match client {
|
||||
Some(c) => check_node_ready(c, node_name).await,
|
||||
None => {
|
||||
warn!(
|
||||
"Kubernetes client not available for node '{}'. Assuming node is ready.",
|
||||
node_name
|
||||
);
|
||||
Ok(())
|
||||
}
|
||||
},
|
||||
"okd_router_1936" => check_okd_router_1936().await,
|
||||
_ => Err(format!("Unknown check: {}", check_name)),
|
||||
};
|
||||
|
||||
let duration_ms = start.elapsed().as_millis();
|
||||
|
||||
match result {
|
||||
Ok(()) => CheckResult {
|
||||
name: check_name.to_string(),
|
||||
passed: true,
|
||||
reason: None,
|
||||
duration_ms,
|
||||
},
|
||||
Err(reason) => CheckResult {
|
||||
name: check_name.to_string(),
|
||||
passed: false,
|
||||
reason: Some(reason),
|
||||
duration_ms,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
#[get("/health")]
|
||||
async fn health(query: web::Query<HealthQuery>) -> impl Responder {
|
||||
let node_name = match env::var("NODE_NAME") {
|
||||
Ok(name) => name,
|
||||
Err(_) => {
|
||||
error!("NODE_NAME environment variable not set");
|
||||
return HttpResponse::InternalServerError().json(HealthError {
|
||||
status: "error".to_string(),
|
||||
error: "NODE_NAME environment variable not set".to_string(),
|
||||
});
|
||||
}
|
||||
};
|
||||
|
||||
// Parse requested checks from query parameter
|
||||
let requested_checks = parse_checks(query.checks.as_deref());
|
||||
|
||||
// Check if node_ready check requires Kubernetes client
|
||||
let needs_k8s_client = requested_checks.contains(&"node_ready".to_string());
|
||||
|
||||
// Initialize Kubernetes client only if needed
|
||||
let k8s_client = if needs_k8s_client {
|
||||
match Config::infer().await {
|
||||
Ok(mut config) => {
|
||||
config.write_timeout = Some(K8S_CLIENT_TIMEOUT);
|
||||
config.connect_timeout = Some(K8S_CLIENT_TIMEOUT);
|
||||
Some(Client::try_from(config).map_err(|e| e.to_string()))
|
||||
}
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to infer Kubernetes config for node '{}': {}. Assuming node_ready is healthy.",
|
||||
node_name, e
|
||||
);
|
||||
None
|
||||
}
|
||||
}
|
||||
.and_then(|result| match result {
|
||||
Ok(client) => Some(client),
|
||||
Err(e) => {
|
||||
warn!(
|
||||
"Failed to create Kubernetes client for node '{}': {}. Assuming node_ready is healthy.",
|
||||
node_name, e
|
||||
);
|
||||
None
|
||||
}
|
||||
})
|
||||
} else {
|
||||
None
|
||||
};
|
||||
|
||||
// Run all requested checks in parallel
|
||||
let start = Instant::now();
|
||||
let mut join_set = JoinSet::new();
|
||||
debug!("Running checks {requested_checks:?}");
|
||||
|
||||
for check_name in requested_checks {
|
||||
let client = k8s_client.clone();
|
||||
let node_name = node_name.clone();
|
||||
join_set.spawn(async move { run_check(&check_name, client, &node_name).await });
|
||||
}
|
||||
let mut check_results = Vec::new();
|
||||
while let Some(result) = join_set.join_next().await {
|
||||
match result {
|
||||
Ok(check) => check_results.push(check),
|
||||
Err(e) => error!("Check task failed: {}", e),
|
||||
}
|
||||
}
|
||||
let total_duration_ms = start.elapsed().as_millis();
|
||||
|
||||
// Determine overall status
|
||||
let all_passed = check_results.iter().all(|c| c.passed);
|
||||
|
||||
if all_passed {
|
||||
info!(
|
||||
"All health checks passed for node '{}' in {}ms",
|
||||
node_name, total_duration_ms
|
||||
);
|
||||
HttpResponse::Ok().json(HealthStatus {
|
||||
status: "ready".to_string(),
|
||||
checks: check_results,
|
||||
total_duration_ms,
|
||||
})
|
||||
} else {
|
||||
let failed_checks: Vec<&str> = check_results
|
||||
.iter()
|
||||
.filter(|c| !c.passed)
|
||||
.map(|c| c.name.as_str())
|
||||
.collect();
|
||||
warn!(
|
||||
"Health checks failed for node '{}' in {}ms: {:?}",
|
||||
node_name, total_duration_ms, failed_checks
|
||||
);
|
||||
HttpResponse::ServiceUnavailable().json(HealthStatus {
|
||||
status: "not-ready".to_string(),
|
||||
checks: check_results,
|
||||
total_duration_ms,
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
#[actix_web::main]
|
||||
async fn main() -> std::io::Result<()> {
|
||||
env_logger::init();
|
||||
|
||||
let port = env::var("LISTEN_PORT").unwrap_or_else(|_| "25001".to_string());
|
||||
let port = port
|
||||
.parse::<u16>()
|
||||
.unwrap_or_else(|_| panic!("Invalid port number: {}", port));
|
||||
let bind_addr = format!("0.0.0.0:{}", port);
|
||||
|
||||
info!("Starting harmony-node-readiness-endpoint on {}", bind_addr);
|
||||
|
||||
HttpServer::new(|| App::new().service(health))
|
||||
.workers(3)
|
||||
.bind(&bind_addr)?
|
||||
.run()
|
||||
.await
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use kube::error::ErrorResponse;
|
||||
|
||||
#[test]
|
||||
fn parse_checks_defaults_to_node_ready() {
|
||||
assert_eq!(parse_checks(None), vec!["node_ready"]);
|
||||
assert_eq!(parse_checks(Some("")), vec!["node_ready"]);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn parse_checks_splits_and_trims_values() {
|
||||
assert_eq!(
|
||||
parse_checks(Some("node_ready, okd_router_1936 ")),
|
||||
vec!["node_ready", "okd_router_1936"]
|
||||
);
|
||||
}
|
||||
}
|
||||
@@ -21,6 +21,7 @@ http.workspace = true
|
||||
inquire.workspace = true
|
||||
interactive-parse = "0.1.5"
|
||||
schemars = "0.8"
|
||||
vaultrs = "0.7.4"
|
||||
|
||||
[dev-dependencies]
|
||||
pretty_assertions.workspace = true
|
||||
|
||||
@@ -1,4 +1,5 @@
|
||||
use lazy_static::lazy_static;
|
||||
use std::env;
|
||||
|
||||
lazy_static! {
|
||||
pub static ref SECRET_NAMESPACE: String =
|
||||
@@ -16,3 +17,16 @@ lazy_static! {
|
||||
pub static ref INFISICAL_CLIENT_SECRET: Option<String> =
|
||||
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_SECRET").ok();
|
||||
}
|
||||
|
||||
lazy_static! {
|
||||
// Openbao/Vault configuration
|
||||
pub static ref OPENBAO_URL: Option<String> =
|
||||
env::var("OPENBAO_URL").or(env::var("VAULT_ADDR")).ok();
|
||||
pub static ref OPENBAO_TOKEN: Option<String> = env::var("OPENBAO_TOKEN").ok();
|
||||
pub static ref OPENBAO_USERNAME: Option<String> = env::var("OPENBAO_USERNAME").ok();
|
||||
pub static ref OPENBAO_PASSWORD: Option<String> = env::var("OPENBAO_PASSWORD").ok();
|
||||
pub static ref OPENBAO_SKIP_TLS: bool =
|
||||
env::var("OPENBAO_SKIP_TLS").map(|v| v == "true").unwrap_or(false);
|
||||
pub static ref OPENBAO_KV_MOUNT: String =
|
||||
env::var("OPENBAO_KV_MOUNT").unwrap_or_else(|_| "secret".to_string());
|
||||
}
|
||||
|
||||
@@ -8,6 +8,12 @@ use config::INFISICAL_CLIENT_SECRET;
|
||||
use config::INFISICAL_ENVIRONMENT;
|
||||
use config::INFISICAL_PROJECT_ID;
|
||||
use config::INFISICAL_URL;
|
||||
use config::OPENBAO_KV_MOUNT;
|
||||
use config::OPENBAO_PASSWORD;
|
||||
use config::OPENBAO_SKIP_TLS;
|
||||
use config::OPENBAO_TOKEN;
|
||||
use config::OPENBAO_URL;
|
||||
use config::OPENBAO_USERNAME;
|
||||
use config::SECRET_STORE;
|
||||
use interactive_parse::InteractiveParseObj;
|
||||
use log::debug;
|
||||
@@ -17,6 +23,7 @@ use serde::{Serialize, de::DeserializeOwned};
|
||||
use std::fmt;
|
||||
use store::InfisicalSecretStore;
|
||||
use store::LocalFileSecretStore;
|
||||
use store::OpenbaoSecretStore;
|
||||
use thiserror::Error;
|
||||
use tokio::sync::OnceCell;
|
||||
|
||||
@@ -69,11 +76,24 @@ async fn get_secret_manager() -> &'static SecretManager {
|
||||
|
||||
/// The async initialization function for the SecretManager.
|
||||
async fn init_secret_manager() -> SecretManager {
|
||||
let default_secret_score = "infisical".to_string();
|
||||
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_score);
|
||||
let default_secret_store = "infisical".to_string();
|
||||
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_store);
|
||||
|
||||
let store: Box<dyn SecretStore> = match store_type.as_str() {
|
||||
"file" => Box::new(LocalFileSecretStore::default()),
|
||||
"openbao" | "vault" => {
|
||||
let store = OpenbaoSecretStore::new(
|
||||
OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"),
|
||||
OPENBAO_KV_MOUNT.clone(),
|
||||
*OPENBAO_SKIP_TLS,
|
||||
OPENBAO_TOKEN.clone(),
|
||||
OPENBAO_USERNAME.clone(),
|
||||
OPENBAO_PASSWORD.clone(),
|
||||
)
|
||||
.await
|
||||
.expect("Failed to initialize Openbao/Vault secret store");
|
||||
Box::new(store)
|
||||
}
|
||||
"infisical" | _ => {
|
||||
let store = InfisicalSecretStore::new(
|
||||
INFISICAL_URL.clone().expect("Infisical url must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_URL"),
|
||||
|
||||
@@ -1,4 +1,9 @@
|
||||
mod infisical;
|
||||
mod local_file;
|
||||
mod openbao;
|
||||
|
||||
pub use infisical::InfisicalSecretStore;
|
||||
pub use infisical::*;
|
||||
pub use local_file::LocalFileSecretStore;
|
||||
pub use local_file::*;
|
||||
pub use openbao::OpenbaoSecretStore;
|
||||
|
||||
317
harmony_secret/src/store/openbao.rs
Normal file
317
harmony_secret/src/store/openbao.rs
Normal file
@@ -0,0 +1,317 @@
|
||||
use crate::{SecretStore, SecretStoreError};
|
||||
use async_trait::async_trait;
|
||||
use log::{debug, info, warn};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::fmt::Debug;
|
||||
use std::fs;
|
||||
use std::path::PathBuf;
|
||||
use vaultrs::auth;
|
||||
use vaultrs::client::{Client, VaultClient, VaultClientSettingsBuilder};
|
||||
use vaultrs::kv2;
|
||||
|
||||
/// Token response from Vault/Openbao auth endpoints
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct TokenResponse {
|
||||
auth: AuthInfo,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct AuthInfo {
|
||||
client_token: String,
|
||||
#[serde(default)]
|
||||
lease_duration: Option<u64>,
|
||||
token_type: String,
|
||||
}
|
||||
|
||||
impl From<vaultrs::api::AuthInfo> for AuthInfo {
|
||||
fn from(value: vaultrs::api::AuthInfo) -> Self {
|
||||
AuthInfo {
|
||||
client_token: value.client_token,
|
||||
token_type: value.token_type,
|
||||
lease_duration: Some(value.lease_duration),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub struct OpenbaoSecretStore {
|
||||
client: VaultClient,
|
||||
kv_mount: String,
|
||||
}
|
||||
|
||||
impl Debug for OpenbaoSecretStore {
|
||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
||||
f.debug_struct("OpenbaoSecretStore")
|
||||
.field("client", &self.client.settings)
|
||||
.field("kv_mount", &self.kv_mount)
|
||||
.finish()
|
||||
}
|
||||
}
|
||||
|
||||
impl OpenbaoSecretStore {
|
||||
/// Creates a new Openbao/Vault secret store with authentication
|
||||
pub async fn new(
|
||||
base_url: String,
|
||||
kv_mount: String,
|
||||
skip_tls: bool,
|
||||
token: Option<String>,
|
||||
username: Option<String>,
|
||||
password: Option<String>,
|
||||
) -> Result<Self, SecretStoreError> {
|
||||
info!("OPENBAO_STORE: Initializing client for URL: {base_url}");
|
||||
|
||||
// 1. If token is provided via env var, use it directly
|
||||
if let Some(t) = token {
|
||||
debug!("OPENBAO_STORE: Using token from environment variable");
|
||||
return Self::with_token(&base_url, skip_tls, &t, &kv_mount);
|
||||
}
|
||||
|
||||
// 2. Try to load cached token
|
||||
let cache_path = Self::get_token_cache_path(&base_url);
|
||||
if let Ok(cached_token) = Self::load_cached_token(&cache_path) {
|
||||
debug!("OPENBAO_STORE: Found cached token, validating...");
|
||||
if Self::validate_token(&base_url, skip_tls, &cached_token.client_token).await {
|
||||
info!("OPENBAO_STORE: Cached token is valid");
|
||||
return Self::with_token(
|
||||
&base_url,
|
||||
skip_tls,
|
||||
&cached_token.client_token,
|
||||
&kv_mount,
|
||||
);
|
||||
}
|
||||
warn!("OPENBAO_STORE: Cached token is invalid or expired");
|
||||
}
|
||||
|
||||
// 3. Authenticate with username/password
|
||||
let (user, pass) = match (username, password) {
|
||||
(Some(u), Some(p)) => (u, p),
|
||||
_ => {
|
||||
return Err(SecretStoreError::Store(
|
||||
"No valid token found and username/password not provided. \
|
||||
Set OPENBAO_TOKEN or OPENBAO_USERNAME/OPENBAO_PASSWORD environment variables."
|
||||
.into(),
|
||||
));
|
||||
}
|
||||
};
|
||||
|
||||
let token =
|
||||
Self::authenticate_userpass(&base_url, &kv_mount, skip_tls, &user, &pass).await?;
|
||||
|
||||
// Cache the token
|
||||
if let Err(e) = Self::cache_token(&cache_path, &token) {
|
||||
warn!("OPENBAO_STORE: Failed to cache token: {e}");
|
||||
}
|
||||
|
||||
Self::with_token(&base_url, skip_tls, &token.client_token, &kv_mount)
|
||||
}
|
||||
|
||||
/// Create a client with an existing token
|
||||
fn with_token(
|
||||
base_url: &str,
|
||||
skip_tls: bool,
|
||||
token: &str,
|
||||
kv_mount: &str,
|
||||
) -> Result<Self, SecretStoreError> {
|
||||
let mut settings = VaultClientSettingsBuilder::default();
|
||||
settings.address(base_url).token(token);
|
||||
|
||||
if skip_tls {
|
||||
warn!("OPENBAO_STORE: Skipping TLS verification - not recommended for production!");
|
||||
settings.verify(false);
|
||||
}
|
||||
|
||||
let client = VaultClient::new(
|
||||
settings
|
||||
.build()
|
||||
.map_err(|e| SecretStoreError::Store(Box::new(e)))?,
|
||||
)
|
||||
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
|
||||
|
||||
Ok(Self {
|
||||
client,
|
||||
kv_mount: kv_mount.to_string(),
|
||||
})
|
||||
}
|
||||
|
||||
/// Get the cache file path for a given base URL
|
||||
fn get_token_cache_path(base_url: &str) -> PathBuf {
|
||||
let hash = Self::hash_url(base_url);
|
||||
directories::BaseDirs::new()
|
||||
.map(|dirs| {
|
||||
dirs.data_dir()
|
||||
.join("harmony")
|
||||
.join("secrets")
|
||||
.join(format!("openbao_token_{hash}"))
|
||||
})
|
||||
.unwrap_or_else(|| PathBuf::from(format!("/tmp/openbao_token_{hash}")))
|
||||
}
|
||||
|
||||
/// Create a simple hash of the URL for unique cache files
|
||||
fn hash_url(url: &str) -> String {
|
||||
use std::collections::hash_map::DefaultHasher;
|
||||
use std::hash::{Hash, Hasher};
|
||||
let mut hasher = DefaultHasher::new();
|
||||
url.hash(&mut hasher);
|
||||
format!("{:016x}", hasher.finish())
|
||||
}
|
||||
|
||||
/// Load cached token from file
|
||||
fn load_cached_token(path: &PathBuf) -> Result<AuthInfo, String> {
|
||||
serde_json::from_str(
|
||||
&fs::read_to_string(path)
|
||||
.map_err(|e| format!("Could not load token from file {path:?} : {e}"))?,
|
||||
)
|
||||
.map_err(|e| format!("Could not deserialize token from file {path:?} : {e}"))
|
||||
}
|
||||
|
||||
/// Cache token to file
|
||||
fn cache_token(path: &PathBuf, token: &AuthInfo) -> Result<(), std::io::Error> {
|
||||
if let Some(parent) = path.parent() {
|
||||
fs::create_dir_all(parent)?;
|
||||
}
|
||||
// Set file permissions to 0600 (owner read/write only)
|
||||
#[cfg(unix)]
|
||||
{
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
let mut file = fs::OpenOptions::new()
|
||||
.write(true)
|
||||
.create(true)
|
||||
.truncate(true)
|
||||
.mode(0o600)
|
||||
.open(path)?;
|
||||
use std::io::Write;
|
||||
file.write_all(serde_json::to_string(token)?.as_bytes())?;
|
||||
}
|
||||
#[cfg(not(unix))]
|
||||
{
|
||||
fs::write(path, token)?;
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Validate if a token is still valid using vaultrs
|
||||
async fn validate_token(base_url: &str, skip_tls: bool, token: &str) -> bool {
|
||||
let mut settings = VaultClientSettingsBuilder::default();
|
||||
settings.address(base_url).token(token);
|
||||
if skip_tls {
|
||||
settings.verify(false);
|
||||
}
|
||||
|
||||
if let Some(settings) = settings.build().ok() {
|
||||
let client = match VaultClient::new(settings) {
|
||||
Ok(s) => s,
|
||||
Err(_) => return false,
|
||||
};
|
||||
return vaultrs::token::lookup(&client, token).await.is_ok();
|
||||
}
|
||||
false
|
||||
}
|
||||
|
||||
/// Authenticate using username/password (userpass auth method)
|
||||
async fn authenticate_userpass(
|
||||
base_url: &str,
|
||||
kv_mount: &str,
|
||||
skip_tls: bool,
|
||||
username: &str,
|
||||
password: &str,
|
||||
) -> Result<AuthInfo, SecretStoreError> {
|
||||
info!("OPENBAO_STORE: Authenticating with username/password");
|
||||
|
||||
// Create a client without a token for authentication
|
||||
let mut settings = VaultClientSettingsBuilder::default();
|
||||
settings.address(base_url);
|
||||
if skip_tls {
|
||||
settings.verify(false);
|
||||
}
|
||||
|
||||
let client = VaultClient::new(
|
||||
settings
|
||||
.build()
|
||||
.map_err(|e| SecretStoreError::Store(Box::new(e)))?,
|
||||
)
|
||||
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
|
||||
|
||||
// Authenticate using userpass method
|
||||
let token = auth::userpass::login(&client, kv_mount, username, password)
|
||||
.await
|
||||
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
|
||||
|
||||
Ok(token.into())
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl SecretStore for OpenbaoSecretStore {
|
||||
async fn get_raw(&self, namespace: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
|
||||
let path = format!("{}/{}", namespace, key);
|
||||
info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'");
|
||||
debug!("OPENBAO_STORE: Request path: {path}");
|
||||
|
||||
let data: serde_json::Value = kv2::read(&self.client, &self.kv_mount, &path)
|
||||
.await
|
||||
.map_err(|e| {
|
||||
// Check for not found error
|
||||
if e.to_string().contains("does not exist") || e.to_string().contains("404") {
|
||||
SecretStoreError::NotFound {
|
||||
namespace: namespace.to_string(),
|
||||
key: key.to_string(),
|
||||
}
|
||||
} else {
|
||||
SecretStoreError::Store(Box::new(e))
|
||||
}
|
||||
})?;
|
||||
|
||||
// Extract the actual secret value stored under the "value" key
|
||||
let value = data.get("value").and_then(|v| v.as_str()).ok_or_else(|| {
|
||||
SecretStoreError::Store("Secret does not contain expected 'value' field".into())
|
||||
})?;
|
||||
|
||||
Ok(value.as_bytes().to_vec())
|
||||
}
|
||||
|
||||
async fn set_raw(
|
||||
&self,
|
||||
namespace: &str,
|
||||
key: &str,
|
||||
val: &[u8],
|
||||
) -> Result<(), SecretStoreError> {
|
||||
let path = format!("{}/{}", namespace, key);
|
||||
info!("OPENBAO_STORE: Setting key '{key}' in namespace '{namespace}'");
|
||||
debug!("OPENBAO_STORE: Request path: {path}");
|
||||
|
||||
let value_str =
|
||||
String::from_utf8(val.to_vec()).map_err(|e| SecretStoreError::Store(Box::new(e)))?;
|
||||
|
||||
// Create the data structure expected by our format
|
||||
let data = serde_json::json!({
|
||||
"value": value_str
|
||||
});
|
||||
|
||||
kv2::set(&self.client, &self.kv_mount, &path, &data)
|
||||
.await
|
||||
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
|
||||
|
||||
info!("OPENBAO_STORE: Successfully stored secret '{key}'");
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn test_hash_url_consistency() {
|
||||
let url = "https://vault.example.com:8200";
|
||||
let hash1 = OpenbaoSecretStore::hash_url(url);
|
||||
let hash2 = OpenbaoSecretStore::hash_url(url);
|
||||
assert_eq!(hash1, hash2);
|
||||
assert_eq!(hash1.len(), 16);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn test_hash_url_uniqueness() {
|
||||
let hash1 = OpenbaoSecretStore::hash_url("https://vault1.example.com");
|
||||
let hash2 = OpenbaoSecretStore::hash_url("https://vault2.example.com");
|
||||
assert_ne!(hash1, hash2);
|
||||
}
|
||||
}
|
||||
@@ -344,7 +344,7 @@ pub struct StaticMap {
|
||||
pub mac: String,
|
||||
pub ipaddr: String,
|
||||
pub cid: Option<MaybeString>,
|
||||
pub hostname: String,
|
||||
pub hostname: Option<String>,
|
||||
pub descr: Option<MaybeString>,
|
||||
pub winsserver: MaybeString,
|
||||
pub dnsserver: MaybeString,
|
||||
@@ -383,24 +383,24 @@ pub struct Outbound {
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
pub struct NatRule {
|
||||
pub protocol: String,
|
||||
pub interface: String,
|
||||
pub category: MaybeString,
|
||||
pub ipprotocol: String,
|
||||
pub descr: MaybeString,
|
||||
pub tag: MaybeString,
|
||||
pub protocol: Option<String>,
|
||||
pub interface: Option<String>,
|
||||
pub category: Option<MaybeString>,
|
||||
pub ipprotocol: Option<String>,
|
||||
pub descr: Option<MaybeString>,
|
||||
pub tag: Option<MaybeString>,
|
||||
pub tagged: Option<MaybeString>,
|
||||
pub poolopts: PoolOpts,
|
||||
pub poolopts: Option<PoolOpts>,
|
||||
#[yaserde(rename = "associated-rule-id")]
|
||||
pub associated_rule_id: Option<MaybeString>,
|
||||
pub disabled: Option<u8>,
|
||||
pub target: String,
|
||||
pub target: Option<String>,
|
||||
#[yaserde(rename = "local-port")]
|
||||
pub local_port: i32,
|
||||
pub source: Source,
|
||||
pub destination: Destination,
|
||||
pub updated: Updated,
|
||||
pub created: Created,
|
||||
pub local_port: Option<i32>,
|
||||
pub source: Option<Source>,
|
||||
pub destination: Option<Destination>,
|
||||
pub updated: Option<Updated>,
|
||||
pub created: Option<Created>,
|
||||
}
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
@@ -1408,6 +1408,7 @@ pub struct Account {
|
||||
pub hostnames: String,
|
||||
pub wildcard: i32,
|
||||
pub zone: MaybeString,
|
||||
pub dynipv6host: Option<MaybeString>,
|
||||
pub checkip: String,
|
||||
#[yaserde(rename = "checkip_timeout")]
|
||||
pub checkip_timeout: i32,
|
||||
@@ -1539,12 +1540,12 @@ pub struct Dyndns {
|
||||
pub struct Vlans {
|
||||
#[yaserde(attribute = true)]
|
||||
pub version: String,
|
||||
pub vlan: MaybeString,
|
||||
pub vlan: RawXml,
|
||||
}
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
pub struct Bridges {
|
||||
pub bridged: Option<MaybeString>,
|
||||
pub bridged: Option<RawXml>,
|
||||
}
|
||||
|
||||
#[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
|
||||
|
||||
@@ -48,7 +48,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
|
||||
hostname: &str,
|
||||
) -> Result<(), DhcpError> {
|
||||
let mac = mac.to_string();
|
||||
let hostname = hostname.to_string();
|
||||
let hostname = Some(hostname.to_string());
|
||||
let lan_dhcpd = self.get_lan_dhcpd();
|
||||
let existing_mappings: &mut Vec<StaticMap> = &mut lan_dhcpd.staticmaps;
|
||||
|
||||
@@ -121,7 +121,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
|
||||
.map(|entry| StaticMap {
|
||||
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
||||
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
|
||||
descr: entry["descr"].as_str().map(MaybeString::from),
|
||||
..Default::default()
|
||||
})
|
||||
|
||||
@@ -213,7 +213,7 @@ impl<'a> DhcpConfigDnsMasq<'a> {
|
||||
.map(|entry| StaticMap {
|
||||
mac: entry["mac"].as_str().unwrap_or_default().to_string(),
|
||||
ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
|
||||
hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
|
||||
descr: entry["descr"].as_str().map(MaybeString::from),
|
||||
..Default::default()
|
||||
})
|
||||
|
||||
Reference in New Issue
Block a user