Compare commits

...

65 Commits

Author SHA1 Message Date
304490977c wip: vllm example
Some checks failed
Run Check Script / check (pull_request) Failing after 36s
2026-03-23 08:40:29 -04:00
8499f4d1b7 Merge pull request 'fix: small details were preventing re-saving frontends, backends and healthchecks in opnsense UI' (#248) from fix/load-balancer-xml into master
Some checks failed
Run Check Script / check (push) Has been cancelled
Compile and package harmony_composer / package_harmony_composer (push) Has been cancelled
Reviewed-on: #248
2026-03-17 14:38:35 +00:00
aa07f4c8ad Merge pull request 'fix/dynamically_get_public_domain' (#234) from fix/dynamically_get_public_domain into master
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Failing after 1m48s
Run Check Script / check (push) Failing after 11m1s
Reviewed-on: #234
Reviewed-by: johnride <jg@nationtech.io>
2026-03-15 14:07:25 +00:00
77bb138497 Merge remote-tracking branch 'origin/master' into fix/dynamically_get_public_domain
All checks were successful
Run Check Script / check (pull_request) Successful in 1m20s
2026-03-15 09:54:36 -04:00
a16879b1b6 Merge pull request 'fix: readded tokio retry to get ca cert for a nats cluster which was accidentally removed during a refactor' (#229) from fix/nats-ca-cert-retry into master
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m1s
Run Check Script / check (push) Failing after 12m23s
Reviewed-on: #229
2026-03-15 12:36:05 +00:00
f57e6f5957 Merge pull request 'feat: add priorityClass to node_health daemonset' (#249) from feat/health_endpoint_priority_class into master
Some checks failed
Run Check Script / check (push) Successful in 1m28s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 1m59s
Reviewed-on: #249
2026-03-14 18:53:30 +00:00
7605d05de3 fix: opnsense fixes for st-mcd (cb1)
All checks were successful
Run Check Script / check (pull_request) Successful in 1m29s
2026-03-13 13:13:37 -04:00
b244127843 feat: add priorityClass to node_health daemonset
All checks were successful
Run Check Script / check (pull_request) Successful in 1m27s
2026-03-13 11:18:18 -04:00
67c3265286 fix: small details were preventing re-saving frontends, backends and healthchecks in opnsense UI
All checks were successful
Run Check Script / check (pull_request) Successful in 2m12s
2026-03-13 10:31:17 -04:00
d10598d01e Merge pull request 'okd load balancer using port 1936 http healthcheck' (#240) from feat/okd_loadbalancer_betterhealthcheck into master
Some checks failed
Run Check Script / check (push) Successful in 1m26s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 1m56s
Reviewed-on: #240
2026-03-10 17:45:51 +00:00
61ba7257d0 fix: remove broken test
All checks were successful
Run Check Script / check (pull_request) Successful in 1m22s
2026-03-10 13:40:24 -04:00
b0e9594d92 Merge branch 'master' into feat/okd_loadbalancer_betterhealthcheck
Some checks failed
Run Check Script / check (pull_request) Failing after 47s
2026-03-07 23:06:50 +00:00
2a7fa466cc Merge pull request 'reafactor/k8sclient' (#243) from reafactor/k8sclient into master
Some checks failed
Run Check Script / check (push) Successful in 2m57s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m2s
Reviewed-on: #243
2026-03-07 23:05:09 +00:00
f463cd1e94 Fix merge conflict between master and refactor/k8sclient
All checks were successful
Run Check Script / check (pull_request) Successful in 1m28s
2026-03-07 17:56:26 -05:00
e1da7949ec Merge pull request 'okd: add worker nodes to load balancer backend pool' (#246) from feat/okd-load-balancer-include-workers into master
Some checks failed
Run Check Script / check (push) Successful in 1m30s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 1m53s
Reviewed-on: #246
2026-03-07 22:42:14 +00:00
d0a1a73710 doc: fix example code to use ignore instead of no_run
All checks were successful
Run Check Script / check (pull_request) Successful in 1m43s
- `no_run` fails because it cannot be used at module level
- Use `ignore` to skip doc compilation while keeping the example visible
2026-03-07 17:30:24 -05:00
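For context, the rustdoc distinction behind this commit: `no_run` compiles a doc example without executing it, while `ignore` skips compilation entirely. A minimal sketch (the `answer` function is invented for illustration):

````rust
/// Example kept visible in the docs but excluded from doc-test compilation.
///
/// ```ignore
/// // rustdoc keeps this snippet visible but never compiles or runs it;
/// // useful when it needs external state (a live cluster, credentials).
/// assert_eq!(answer(), 42);
/// ```
fn answer() -> u32 {
    42
}

fn main() {
    // Regular code still compiles and runs normally.
    assert_eq!(answer(), 42);
}
````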
bc2b328296 okd: include workers in load balancer backend pool + add tests and docs
Some checks failed
Run Check Script / check (pull_request) Failing after 24s
- Add nodes_to_backend_server() function to include both control plane and worker nodes
- Update public services (ports 80, 443) to use worker-inclusive backend pool
- Add comprehensive tests covering all backend configurations
- Add documentation with OKD reference link and usage examples
2026-03-07 17:15:24 -05:00
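The backend-pool change described above can be sketched as follows. This is a hypothetical illustration of the `nodes_to_backend_server()` idea from the commit message; the `BackendServer` type and function shape are invented here and differ from Harmony's real types.

```rust
// Illustrative only: build a load balancer backend pool from BOTH control
// plane and worker nodes, instead of control plane nodes only.
#[derive(Debug, Clone, PartialEq)]
struct BackendServer {
    host: String,
    port: u16,
}

fn nodes_to_backend_servers(control_plane: &[&str], workers: &[&str], port: u16) -> Vec<BackendServer> {
    control_plane
        .iter()
        .chain(workers.iter())
        .map(|host| BackendServer { host: host.to_string(), port })
        .collect()
}

fn main() {
    // Public services (ports 80/443) now include workers as well.
    let pool = nodes_to_backend_servers(&["cp0.example"], &["w0.example", "w1.example"], 443);
    assert_eq!(pool.len(), 3);
    println!("{pool:?}");
}
```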
a93896707f okd: add worker nodes to load balancer backend pool
All checks were successful
Run Check Script / check (pull_request) Successful in 1m29s
Include both control plane and worker nodes in ports 80 and 443 backend pools
2026-03-07 16:46:47 -05:00
0e9b23a320 Merge branch 'feat/change-node-readiness-strategy'
Some checks failed
Run Check Script / check (push) Successful in 1m26s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m11s
2026-03-07 16:35:14 -05:00
f532ba2b40 doc: Update node readiness readme and deployed port to 25001
All checks were successful
Run Check Script / check (pull_request) Successful in 1m27s
2026-03-07 16:33:28 -05:00
fafca31798 fix: formatting and check script
All checks were successful
Run Check Script / check (pull_request) Successful in 1m28s
2026-03-07 16:08:52 -05:00
5412c34957 Merge pull request 'fix: change vlan definition from MaybeString to RawXml' (#245) from feat/opnsense-config-xml-support-vlan into master
Some checks failed
Run Check Script / check (push) Successful in 1m47s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m7s
Reviewed-on: #245
2026-03-07 20:59:28 +00:00
787cc8feab Fix doc tests for harmony-k8s crate refactoring
All checks were successful
Run Check Script / check (pull_request) Successful in 2m6s
- Updated harmony-k8s doc tests to import from harmony_k8s instead of harmony
- Changed CloudNativePgOperatorScore::default() to default_openshift()

This ensures doc tests work correctly after moving K8sClient to the harmony-k8s crate.
2026-03-07 15:50:39 -05:00
ce041f495b fix(zitadel): include admin@zitadel.{host} username, secure password with symbol/number, and cert-manager TLS configuration
Some checks failed
Run Check Script / check (pull_request) Failing after 26s
Update Zitadel deployment to use correct username format (admin@zitadel.{host}), generate secure passwords with required complexity (uppercase, lowercase, digit, symbol), configure edge TLS termination for OpenShift, and add cert-manager annotations. Also refactor password generation to ensure all complexity requirements are met.
2026-03-07 15:29:26 -05:00
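The complexity guarantee described above (uppercase, lowercase, digit, symbol all present) can be sketched like this. This is NOT Harmony's actual implementation; the function shape, character classes, and the `next` RNG stand-in are all invented for illustration, and real code would use a cryptographically secure RNG.

```rust
// Sketch: draw one character from each required class first so every class
// is guaranteed, fill the rest from the combined alphabet, then shuffle.
// `next(bound)` stands in for an RNG returning a value in 0..bound.
fn generate_password(len: usize, mut next: impl FnMut(usize) -> usize) -> String {
    let classes: [&[u8]; 4] = [
        b"ABCDEFGHJKLMNPQRSTUVWXYZ", // uppercase
        b"abcdefghijkmnopqrstuvwxyz", // lowercase
        b"23456789",                  // digits
        b"!@#$%^&*",                  // symbols
    ];
    // One guaranteed character per class.
    let mut out: Vec<u8> = classes.iter().map(|c| c[next(c.len())]).collect();
    let all: Vec<u8> = classes.concat();
    while out.len() < len {
        out.push(all[next(all.len())]);
    }
    // Fisher-Yates shuffle so the guaranteed characters are not always first.
    for i in (1..out.len()).rev() {
        out.swap(i, next(i + 1));
    }
    String::from_utf8(out).expect("all classes are ASCII")
}

fn main() {
    // Toy deterministic sequence standing in for a real CSPRNG.
    let mut state: usize = 7;
    let pw = generate_password(16, move |bound| {
        state = state.wrapping_mul(31).wrapping_add(11);
        state % bound
    });
    assert!(pw.bytes().any(|b| b.is_ascii_uppercase()));
    assert!(pw.bytes().any(|b| b.is_ascii_digit()));
    assert!(pw.bytes().any(|b| !b.is_ascii_alphanumeric()));
    println!("{pw}");
}
```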
bfb86f63ce fix: xml field for vlan
All checks were successful
Run Check Script / check (pull_request) Successful in 1m31s
2026-03-07 11:29:44 -05:00
55de206523 fix: change vlan definition from MaybeString to RawXml
All checks were successful
Run Check Script / check (pull_request) Successful in 1m29s
2026-03-07 10:03:03 -05:00
64893a84f5 fix(node health endpoint): Set up sane timeouts for use as a load balancer health check. The default k8s client timeout of 30 seconds caused the haproxy health check to fail even though we still returned 200 OK after 30 seconds
Some checks failed
Run Check Script / check (pull_request) Failing after 25s
2026-03-06 16:28:13 -05:00
f941672662 fix: Node readiness always fails open when kube api call fails on node status check
Some checks failed
Run Check Script / check (pull_request) Failing after 1m54s
2026-03-06 15:45:38 -05:00
a98113dd40 wip: zitadel ingress https not working yet
Some checks failed
Run Check Script / check (pull_request) Failing after 28s
2026-03-06 15:28:21 -05:00
5db1a31d33 ... 2026-03-06 15:24:33 -05:00
f5aac67af8 feat: k8s client works fine, added version config in zitadel and fix master key secret existence handling
Some checks failed
Run Check Script / check (pull_request) Failing after 32s
2026-03-06 15:15:35 -05:00
d7e5bf11d5 removing bad stuff I did this morning and trying to make it simple, and adding a couple tests 2026-03-06 14:41:08 -05:00
2e1f1b8447 feat: Refactor K8sClient into separate, publishable crate, and add zitadel example 2026-03-06 14:21:15 -05:00
2b157ad7fd feat: add a background loop checking the node status every X seconds. If NotReady for Y seconds, kill the router pod if there's one 2026-03-06 11:57:39 -05:00
a0c0905c3b wip: zitadel deployment 2026-03-06 10:56:48 -05:00
d920de34cf fix: configure health_check: None for public_services
All checks were successful
Run Check Script / check (pull_request) Successful in 1m35s
2026-03-05 14:55:00 -05:00
4276b9137b fix: put the hc on private_services, not public_services
All checks were successful
Run Check Script / check (pull_request) Successful in 1m32s
2026-03-05 14:35:33 -05:00
6ab88ab8d9 Merge branch 'master' into feat/okd_loadbalancer_betterhealthcheck 2026-03-04 10:46:57 -05:00
fe52f69473 Merge pull request 'feat/openbao_secret_manager' (#239) from feat/openbao_secret_manager into master
Some checks failed
Run Check Script / check (push) Successful in 1m35s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m36s
Reviewed-on: #239
Reviewed-by: stremblay <stremblay@nationtech.io>
2026-03-04 15:06:15 +00:00
d8338ad12c wip(sso): Openbao deploys fine, not fully tested yet, zitadel wip
All checks were successful
Run Check Script / check (pull_request) Successful in 1m40s
2026-03-04 09:53:33 -05:00
ac9fedf853 wip(secret store): Fix openbao, refactor with rust client 2026-03-04 09:33:21 -05:00
fd3705e382 wip(secret store): openbao/vault store implementation 2026-03-04 09:33:21 -05:00
4840c7fdc2 Merge pull request 'feat/node-health-score' (#242) from feat/node-health-score into master
Some checks failed
Run Check Script / check (push) Successful in 1m51s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 3m16s
Reviewed-on: #242
Reviewed-by: johnride <jg@nationtech.io>
2026-03-04 14:31:44 +00:00
20172a7801 removing another useless commented line
All checks were successful
Run Check Script / check (pull_request) Successful in 2m17s
2026-03-04 09:31:02 -05:00
6bb33c5845 remove useless comment
All checks were successful
Run Check Script / check (pull_request) Successful in 1m43s
2026-03-04 09:29:49 -05:00
d9357adad3 format code, fix interpret name
All checks were successful
Run Check Script / check (pull_request) Successful in 1m33s
2026-03-04 09:28:32 -05:00
a25ca86bdf wip: happy path is working
Some checks failed
Run Check Script / check (pull_request) Failing after 29s
2026-03-04 08:21:08 -05:00
646c5e723e feat: implementing node_health 2026-03-04 07:16:25 -05:00
69c382e8c6 Merge pull request 'feat(k8s): Can now apply resources of any scope. Kind of a hack leveraging the dynamic type under the hood but this is due to a limitation of kube-rs' (#241) from feat/k8s_apply_any_scope into master
Some checks failed
Run Check Script / check (push) Successful in 2m42s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 4m15s
Reviewed-on: #241
Reviewed-by: stremblay <stremblay@nationtech.io>
2026-03-03 20:06:03 +00:00
dca764395d feat(k8s): Can now apply resources of any scope. Kind of a hack leveraging the dynamic type under the hood but this is due to a limitation of kube-rs
Some checks failed
Run Check Script / check (pull_request) Failing after 38s
2026-03-03 14:37:52 -05:00
53d0704a35 wip: okd load balancer using port 1936 http healthcheck
Some checks failed
Run Check Script / check (pull_request) Failing after 31s
2026-03-02 20:47:41 -05:00
2738985edb Merge pull request 'feat: New harmony node readiness mini project that exposes health of a node on port 25001' (#237) from feat/harmony-node-health-endpoint into master
Some checks failed
Run Check Script / check (push) Successful in 1m36s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 3m35s
Reviewed-on: #237
2026-03-02 19:56:39 +00:00
d9a21bf94b feat: node readiness now supports a check query param with node_ready and okd_router_1936 options
All checks were successful
Run Check Script / check (pull_request) Successful in 1m51s
2026-03-02 14:55:28 -05:00
8f8bd34168 feat: Deployment is now happening in harmony-node-healthcheck namespace
All checks were successful
Run Check Script / check (pull_request) Successful in 1m42s
2026-02-26 16:39:26 -05:00
b5e971b3b6 feat: adding yaml to deploy k8s resources
All checks were successful
Run Check Script / check (pull_request) Successful in 1m37s
2026-02-26 16:36:16 -05:00
a1c0e0e246 fix: build docker default value
All checks were successful
Run Check Script / check (pull_request) Successful in 1m38s
2026-02-26 16:35:38 -05:00
d084cee8d5 doc(node-readiness): Fix README
All checks were successful
Run Check Script / check (pull_request) Successful in 1m38s
2026-02-26 16:33:12 -05:00
63ef1c0ea7 feat: New harmony node readiness mini project that exposes health of a node on port 25001
All checks were successful
Run Check Script / check (pull_request) Successful in 1m36s
2026-02-26 16:23:27 -05:00
d8ab9d52a4 fix: broken test
All checks were successful
Run Check Script / check (pull_request) Successful in 1m0s
2026-02-17 15:34:42 -05:00
2cb7aeefc0 fix: deploys replicated postgresql with site 2 as standby
Some checks failed
Run Check Script / check (pull_request) Failing after 1m8s
2026-02-17 15:02:00 -05:00
16016febcf wip: adding impl details for deploying connected replica cluster 2026-02-16 16:22:30 -05:00
e709de531d fix: added route building to failover topology 2026-02-13 16:08:05 -05:00
6ab0f3a6ab wip 2026-02-13 15:48:24 -05:00
724ab0b888 wip: removed hardcoding and added fn to trait tlsrouter 2026-02-13 15:18:23 -05:00
8b6ce8d069 fix: readded tokio retry to get ca cert for a nats cluster which was accidentally removed during a refactor
All checks were successful
Run Check Script / check (pull_request) Successful in 1m9s
2026-02-06 09:09:01 -05:00
98 changed files with 6774 additions and 4128 deletions

Cargo.lock (generated, 2616 changed lines)

File diff suppressed because it is too large

View File

@@ -2,7 +2,6 @@
resolver = "2"
members = [
"private_repos/*",
"examples/*",
"harmony",
"harmony_types",
"harmony_macros",
@@ -19,7 +18,7 @@ members = [
"adr/agent_discovery/mdns",
"brocade",
"harmony_agent",
"harmony_agent/deploy",
"harmony_agent/deploy", "harmony_node_readiness", "harmony-k8s", "examples/vllm",
]
[workspace.package]
@@ -38,6 +37,8 @@ tokio = { version = "1.40", features = [
"macros",
"rt-multi-thread",
] }
tokio-retry = "0.3.0"
tokio-util = "0.7.15"
cidr = { features = ["serde"], version = "0.2" }
russh = "0.45"
russh-keys = "0.45"

View File

@@ -1,8 +1,7 @@
use super::BrocadeClient;
use crate::{
BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo, MacAddressEntry,
PortChannelId, PortOperatingMode, SecurityLevel, parse_brocade_mac_address,
shell::BrocadeShell,
PortChannelId, PortOperatingMode, parse_brocade_mac_address, shell::BrocadeShell,
};
use async_trait::async_trait;

View File

@@ -8,7 +8,7 @@ use regex::Regex;
use crate::{
BrocadeClient, BrocadeInfo, Error, ExecutionMode, InterSwitchLink, InterfaceInfo,
InterfaceStatus, InterfaceType, MacAddressEntry, PortChannelId, PortOperatingMode,
SecurityLevel, parse_brocade_mac_address, shell::BrocadeShell,
parse_brocade_mac_address, shell::BrocadeShell,
};
#[derive(Debug)]

View File

@@ -1,8 +1,8 @@
use harmony::{
inventory::Inventory,
modules::cert_manager::{
capability::CertificateManagementConfig, score_cert_management::CertificateManagementScore,
score_certificate::CertificateScore, score_issuer::CertificateIssuerScore,
capability::CertificateManagementConfig, score_certificate::CertificateScore,
score_issuer::CertificateIssuerScore,
},
topology::K8sAnywhereTopology,
};

View File

@@ -10,9 +10,10 @@ publish = false
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
harmony-k8s = { path = "../../harmony-k8s" }
cidr.workspace = true
tokio.workspace = true
harmony_macros = { path = "../../harmony_macros" }
log.workspace = true
env_logger.workspace = true
url.workspace = true

View File

@@ -1,6 +1,6 @@
use std::time::Duration;
use harmony::topology::k8s::{DrainOptions, K8sClient};
use harmony_k8s::{DrainOptions, K8sClient};
use log::{info, trace};
#[tokio::main]

View File

@@ -10,9 +10,10 @@ publish = false
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
harmony_macros = { path = "../../harmony_macros" }
harmony-k8s = { path = "../../harmony-k8s" }
cidr.workspace = true
tokio.workspace = true
harmony_macros = { path = "../../harmony_macros" }
log.workspace = true
env_logger.workspace = true
url.workspace = true

View File

@@ -1,4 +1,4 @@
use harmony::topology::k8s::{DrainOptions, K8sClient, NodeFile};
use harmony_k8s::{K8sClient, NodeFile};
use log::{info, trace};
#[tokio::main]

View File

@@ -14,7 +14,6 @@ async fn main() {
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// "default" namespace, 1 instance, 1Gi storage
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
harmony_cli::run(

View File

@@ -0,0 +1,16 @@
[package]
name = "example-node-health"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }

View File

@@ -0,0 +1,17 @@
use harmony::{
inventory::Inventory, modules::node_health::NodeHealthScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let node_health = NodeHealthScore {};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(node_health)],
None,
)
.await
.unwrap();
}

View File

@@ -6,7 +6,10 @@ use harmony::{
data::{FileContent, FilePath},
modules::{
inventory::HarmonyDiscoveryStrategy,
okd::{installation::OKDInstallationPipeline, ipxe::OKDIpxeScore},
okd::{
installation::OKDInstallationPipeline, ipxe::OKDIpxeScore,
load_balancer::OKDLoadBalancerScore,
},
},
score::Score,
topology::HAClusterTopology,
@@ -32,6 +35,7 @@ async fn main() {
scores
.append(&mut OKDInstallationPipeline::get_all_scores(HarmonyDiscoveryStrategy::MDNS).await);
scores.push(Box::new(OKDLoadBalancerScore::new(&topology)));
harmony_cli::run(inventory, topology, scores, None)
.await
.unwrap();

View File

@@ -1,63 +1,13 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
topology::K8sAnywhereTopology,
inventory::Inventory, modules::openbao::OpenbaoScore, topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
#[tokio::main]
async fn main() {
let values_yaml = Some(
r#"server:
standalone:
enabled: true
config: |
listener "tcp" {
tls_disable = true
address = "[::]:8200"
cluster_address = "[::]:8201"
}
storage "file" {
path = "/openbao/data"
}
service:
enabled: true
dataStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
auditStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce"#
.to_string(),
);
let openbao = HelmChartScore {
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
release_name: NonBlankString::from_str("openbao").unwrap(),
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: true,
repository: Some(HelmRepository::new(
"openbao".to_string(),
hurl!("https://openbao.github.io/openbao-helm"),
true,
)),
let openbao = OpenbaoScore {
host: "openbao.sebastien.sto1.nationtech.io".to_string(),
};
// TODO exec pod commands to initialize secret store if not already done
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),

View File

@@ -1,5 +1,3 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory,
modules::{k8s::apps::OperatorHubCatalogSourceScore, postgresql::CloudNativePgOperatorScore},
@@ -9,7 +7,7 @@ use harmony::{
#[tokio::main]
async fn main() {
let operatorhub_catalog = OperatorHubCatalogSourceScore::default();
let cnpg_operator = CloudNativePgOperatorScore::default();
let cnpg_operator = CloudNativePgOperatorScore::default_openshift();
harmony_cli::run(
Inventory::autoload(),

View File

@@ -1,22 +1,13 @@
use std::{
net::{IpAddr, Ipv4Addr},
sync::Arc,
};
use std::sync::Arc;
use async_trait::async_trait;
use cidr::Ipv4Cidr;
use harmony::{
executors::ExecutorError,
hardware::{HostCategory, Location, PhysicalHost, SwitchGroup},
infra::opnsense::OPNSenseManagementInterface,
inventory::Inventory,
modules::opnsense::node_exporter::NodeExporterScore,
topology::{
HAClusterTopology, LogicalHost, PreparationError, PreparationOutcome, Topology,
UnmanagedRouter, node_exporter::NodeExporter,
},
topology::{PreparationError, PreparationOutcome, Topology, node_exporter::NodeExporter},
};
use harmony_macros::{ip, ipv4, mac_address};
use harmony_macros::ip;
#[derive(Debug)]
struct OpnSenseTopology {

View File

@@ -1,8 +1,7 @@
use harmony::{
inventory::Inventory,
modules::postgresql::{
K8sPostgreSQLScore, PostgreSQLConnectionScore, PublicPostgreSQLScore,
capability::PostgreSQLConfig,
PostgreSQLConnectionScore, PublicPostgreSQLScore, capability::PostgreSQLConfig,
},
topology::K8sAnywhereTopology,
};
@@ -16,7 +15,6 @@ async fn main() {
..Default::default() // Use harmony defaults, they are based on CNPG's default values :
// 1 instance, 1Gi storage
},
hostname: "postgrestest.sto1.nationtech.io".to_string(),
};
let test_connection = PostgreSQLConnectionScore {

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,

View File

@@ -1,4 +1,4 @@
use std::{collections::HashMap, path::PathBuf, sync::Arc};
use std::{path::PathBuf, sync::Arc};
use harmony::{
inventory::Inventory,

View File

@@ -44,6 +44,7 @@ fn build_large_score() -> LoadBalancerScore {
],
listening_port: SocketAddr::V4(SocketAddrV4::new(ipv4!("192.168.0.0"), 49387)),
health_check: Some(HealthCheck::HTTP(
Some(1993),
"/some_long_ass_path_to_see_how_it_is_displayed_but_it_has_to_be_even_longer"
.to_string(),
HttpMethod::GET,

examples/vllm/Cargo.toml (new file, 15 lines)
View File

@@ -0,0 +1,15 @@
[package]
name = "vllm"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
k8s-openapi = { workspace = true }
tokio = { workspace = true }
log = { workspace = true }
env_logger = { workspace = true }

examples/vllm/src/main.rs (new file, 523 lines)
View File

@@ -0,0 +1,523 @@
//! vLLM Deployment Example for Qwen3.5-27B-FP8 on NVIDIA RTX 5090
//!
//! This example deploys vLLM serving Qwen3.5-27B with FP8 quantization,
//! optimized for single RTX 5090 (32GB VRAM) with tool calling support.
//!
//! # Architecture & Memory Constraints
//!
//! **Model Details:**
//! - Parameters: 27B (dense, not sparse/MoE)
//! - Quantization: FP8 (8-bit weights)
//! - Model size: ~27-28GB in memory
//! - Native context: 262,144 tokens (will NOT fit in 32GB VRAM)
//!
//! **VRAM Budget for RTX 5090 (32GB):**
//! - Model weights (FP8): ~27GB
//! - Framework overhead: ~1-2GB
//! - KV cache: ~2-3GB (for 16k context)
//! - CUDA context: ~500MB
//! - Temporary buffers: ~500MB
//! - **Total: ~31-33GB** (tight fit, leaves minimal headroom)
//!
//! # OpenShift/OKD Requirements
//!
//! **SCC (Security Context Constraint) Setup:**
//!
//! The official vLLM container runs as root and writes to `/root/.cache/huggingface`.
//! On OpenShift/OKD with the default restricted SCC, containers run as arbitrary UIDs
//! and cannot write to `/root`. For testing, grant the `anyuid` SCC:
//!
//! ```bash
//! # As cluster admin, grant anyuid SCC to the namespace's service account:
//! oc adm policy add-scc-to-user anyuid -z default -n vllm-qwen
//! ```
//!
//! This allows pods in the `vllm-qwen` namespace to run as root (UID 0).
//! For production, consider building a custom vLLM image that runs as non-root.
//!
//! # Critical Configuration Notes
//!
//! 1. **GPU_MEMORY_UTILIZATION=1.0**: Maximum GPU memory allocation.
//! NEVER decrease this for dense models - CPU offloading destroys performance
//! (100-1000x slower) for models where every parameter is used during inference.
//!
//! 2. **MAX_MODEL_LEN=16384**: Conservative context length that fits in available VRAM.
//! Agentic workflows with long tool call histories will need careful context management.
//!
//! 3. **--language-model-only**: Skips loading the vision encoder, saving ~1-2GB VRAM.
//! Essential for fitting the model in 32GB VRAM.
//!
//! 4. **PVC Size**: 50Gi for HuggingFace cache. Qwen3.5-27B-FP8 is ~30GB.
//!
//! # Performance Expectations
//!
//! - Single token latency: ~50-100ms (no CPU offloading)
//! - With CPU offloading: ~5-50 seconds per token (unusable for real-time inference)
//! - Throughput: ~10-20 tokens/second (single stream, no batching)
//!
//! # Next Steps for Production
//!
//! To increase context length:
//! 1. Monitor GPU memory: `kubectl exec -it deployment/qwen3-5-27b -- nvidia-smi dmon -s u`
//! 2. If stable, increase MAX_MODEL_LEN (try 32768, then 65536)
//! 3. If OOM: revert to lower value
//!
//! For full 262k context, consider:
//! - Multi-GPU setup with tensor parallelism (--tensor-parallel-size 8)
//! - Or use a smaller model (Qwen3.5-7B-FP8)
use std::collections::BTreeMap;
use harmony::{
inventory::Inventory,
modules::{
k8s::resource::K8sResourceScore,
okd::{
crd::route::{RoutePort, RouteSpec, RouteTargetReference, TLSConfig},
route::OKDRouteScore,
},
},
score::Score,
topology::{K8sAnywhereTopology, TlsRouter},
};
use k8s_openapi::{
api::{
apps::v1::{Deployment, DeploymentSpec, DeploymentStrategy},
core::v1::{
Container, ContainerPort, EmptyDirVolumeSource, EnvVar, EnvVarSource,
HTTPGetAction, PersistentVolumeClaim, PersistentVolumeClaimSpec,
PersistentVolumeClaimVolumeSource, PodSpec, PodTemplateSpec, Probe,
ResourceRequirements, Secret, SecretKeySelector, SecretVolumeSource, Service,
ServicePort, ServiceSpec, Volume, VolumeMount, VolumeResourceRequirements,
},
},
apimachinery::pkg::{
api::resource::Quantity,
apis::meta::v1::{LabelSelector, ObjectMeta},
util::intstr::IntOrString,
},
ByteString,
};
use log::info;
const NAMESPACE: &str = "vllm-qwen";
const MODEL_NAME: &str = "Qwen/Qwen3.5-27B-FP8";
const DEPLOYMENT_NAME: &str = "qwen3-5-27b";
const SERVICE_NAME: &str = DEPLOYMENT_NAME;
const ROUTE_NAME: &str = DEPLOYMENT_NAME;
const PVC_NAME: &str = "huggingface-cache";
const SECRET_NAME: &str = "hf-token-secret";
const VLLM_IMAGE: &str = "vllm/vllm-openai:latest";
const SERVICE_PORT: u16 = 8000;
const TARGET_PORT: u16 = 8000;
/// Maximum context length for the model (in tokens).
///
/// **Impact on VRAM:**
/// - Qwen3.5-27B uses per-token KV cache storage for the context window
/// - Larger context = more KV cache memory required
/// - Approximate KV cache per token: ~32KB for FP8 (very rough estimate)
/// - 16k tokens ≈ 0.5-1GB KV cache
/// - 262k tokens ≈ 8-16GB KV cache (native context length - will NOT fit in 32GB VRAM)
///
/// **Performance Impact:**
/// - Context length directly impacts memory for storing conversation history
/// - Agentic workflows with long tool call histories benefit from more context
/// - If context > available VRAM, vLLM will OOM and fail to start
///
/// **Recommendations for RTX 5090 (32GB):**
/// - Start with 16384 (conservative, should work)
/// - If no OOM, try 32768 (better for agentic workflows)
/// - Monitor GPU memory with `nvidia-smi` during operation
const MAX_MODEL_LEN: i64 = 16384;
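A quick sanity check of the KV-cache estimates above, as a standalone editorial sketch. It uses the rough ~32 KB/token figure quoted in the doc comment, not vLLM's actual memory accounting:

```rust
// Back-of-envelope KV-cache sizing from the ~32 KB/token estimate above.
fn kv_cache_bytes(tokens: u64, bytes_per_token: u64) -> u64 {
    tokens * bytes_per_token
}

fn main() {
    const BYTES_PER_TOKEN: u64 = 32 * 1024; // rough FP8 estimate from the docs
    let gib = |bytes: u64| bytes as f64 / f64::powi(1024.0, 3);
    // 16k context: ~0.5 GiB, the low end of the "0.5-1GB" range above.
    println!("16k:  {:.1} GiB", gib(kv_cache_bytes(16_384, BYTES_PER_TOKEN)));
    // Native 262k context: ~8 GiB of KV cache on top of ~27 GB of weights.
    println!("262k: {:.1} GiB", gib(kv_cache_bytes(262_144, BYTES_PER_TOKEN)));
}
```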
/// Fraction of GPU memory to allocate for the model (0.0 to 1.0).
///
/// **CRITICAL WARNING: This is a dense model!**
/// Qwen3.5-27B-FP8 is NOT a sparse/mixture-of-experts model. All 27B parameters
/// are active during inference. CPU offloading will DESTROY performance.
///
/// **What this parameter controls:**
/// - Controls how much of GPU memory vLLM pre-allocates for:
/// 1. Model weights (~27GB for FP8 quantization)
/// 2. KV cache for context window
/// 3. Activation buffers for inference
/// 4. Runtime overhead
///
/// **VRAM Allocation Example:**
/// - GPU: 32GB RTX 5090
/// - GPU_MEMORY_UTILIZATION: 0.95
/// - vLLM will try to use: 32GB * 0.95 = 30.4GB
/// - Model weights: ~27-28GB
/// - Remaining for KV cache + runtime: ~2-3GB
///
/// **If set too LOW (e.g., 0.7):**
/// - vLLM restricts itself to 32GB * 0.7 = 22.4GB
/// - Model weights alone need ~27GB
/// - vLLM will OFFLOAD model weights to CPU memory
/// - Performance: **100-1000x slower** (single token generation can take seconds instead of milliseconds)
/// - This is catastrophic for a dense model where every layer needs all parameters
///
/// **If set too HIGH (e.g., 0.99):**
/// - vLLM tries to allocate nearly all GPU memory
/// - Risk: CUDA OOM if any other process needs GPU memory
/// - Risk: KV cache allocation fails during inference
/// - System instability
///
/// **Current Setting: 1.0** (matches the constant below and the module docs above)
/// - Allocates the full 32GB for model weights, KV cache, and runtime
/// - CUDA context (~500MB) and temporary buffers (~500MB) must fit inside
/// this budget; no explicit safety margin is reserved
/// - Only safe when no other process uses the GPU
///
/// **How to tune:**
/// 1. Start at 1.0 (current setting)
/// 2. Monitor with `nvidia-smi dmon -s u` during operation
/// 3. If OOM during inference: reduce MAX_MODEL_LEN first
/// 4. If stable: try increasing MAX_MODEL_LEN
/// 5. Drop toward 0.95 only if other processes must share the GPU
///
/// **NEVER decrease this for dense models!**
/// If model doesn't fit, use a smaller model or quantization, not CPU offloading.
const GPU_MEMORY_UTILIZATION: f32 = 1.0;
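The budget arithmetic from the comment above, as a standalone editorial sketch. The 32 GB GPU and ~27.5 GB FP8 weight figures are the doc's rough estimates, not measured values:

```rust
// VRAM headroom left for KV cache and buffers after loading weights,
// following the utilization formula documented above.
fn headroom_gb(gpu_gb: f64, utilization: f64, weights_gb: f64) -> f64 {
    gpu_gb * utilization - weights_gb
}

fn main() {
    // At 0.95: 32 * 0.95 - 27.5 = ~2.9 GB left for KV cache + runtime.
    println!("0.95 -> {:.1} GB", headroom_gb(32.0, 0.95, 27.5));
    // At 0.70 the budget (22.4 GB) is below the weight size: vLLM would
    // offload weights to CPU, the catastrophic case described above.
    println!("0.70 -> {:.1} GB", headroom_gb(32.0, 0.70, 27.5));
}
```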
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
env_logger::init();
info!("Deploying vLLM with Qwen3.5-27B-FP8 model");
info!("Configuration:");
info!(" Model: {}", MODEL_NAME);
info!(" Max context length: {} tokens", MAX_MODEL_LEN);
info!(" GPU memory utilization: {}", GPU_MEMORY_UTILIZATION);
info!(" Language model only: true");
info!(" Tool calling enabled: true");
let topology = K8sAnywhereTopology::from_env();
let domain = topology
.get_internal_domain()
.await
.ok()
.flatten()
.unwrap_or_else(|| "cluster.local".to_string());
let host = format!("{}-{}.apps.{}", SERVICE_NAME, NAMESPACE, domain);
info!("Creating route with host: {}", host);
let scores: Vec<Box<dyn Score<K8sAnywhereTopology>>> = vec![
create_namespace(),
create_pvc(),
create_secret(),
create_deployment(),
create_service(),
create_route(&host),
];
harmony_cli::run(Inventory::autoload(), topology, scores, None)
.await
.map_err(|e| format!("Failed to deploy: {}", e))?;
info!("Successfully deployed vLLM with Qwen3.5-27B-FP8");
info!("Access the API at: {}", host);
Ok(())
}
fn create_namespace() -> Box<dyn Score<K8sAnywhereTopology>> {
use k8s_openapi::api::core::v1::Namespace;
let namespace = Namespace {
metadata: ObjectMeta {
name: Some(NAMESPACE.to_string()),
..Default::default()
},
spec: None,
status: None,
};
Box::new(K8sResourceScore::single(namespace, None))
}
fn create_pvc() -> Box<dyn Score<K8sAnywhereTopology>> {
let pvc = PersistentVolumeClaim {
metadata: ObjectMeta {
name: Some(PVC_NAME.to_string()),
namespace: Some(NAMESPACE.to_string()),
..Default::default()
},
spec: Some(PersistentVolumeClaimSpec {
access_modes: Some(vec!["ReadWriteOnce".to_string()]),
resources: Some(VolumeResourceRequirements {
requests: Some(BTreeMap::from([(
"storage".to_string(),
Quantity("50Gi".to_string()),
)])),
limits: None,
}),
..Default::default()
}),
status: None,
};
Box::new(K8sResourceScore::single(
pvc,
Some(NAMESPACE.to_string()),
))
}
fn create_secret() -> Box<dyn Score<K8sAnywhereTopology>> {
let mut data = BTreeMap::new();
data.insert(
"token".to_string(),
// Empty placeholder; supply a real Hugging Face token for gated models.
ByteString(Vec::new()),
);
let secret = Secret {
metadata: ObjectMeta {
name: Some(SECRET_NAME.to_string()),
namespace: Some(NAMESPACE.to_string()),
..Default::default()
},
data: Some(data),
immutable: Some(false),
type_: Some("Opaque".to_string()),
string_data: None,
};
Box::new(K8sResourceScore::single(
secret,
Some(NAMESPACE.to_string()),
))
}
fn create_deployment() -> Box<dyn Score<K8sAnywhereTopology>> {
let deployment = Deployment {
metadata: ObjectMeta {
name: Some(DEPLOYMENT_NAME.to_string()),
namespace: Some(NAMESPACE.to_string()),
labels: Some(BTreeMap::from([(
"app".to_string(),
DEPLOYMENT_NAME.to_string(),
)])),
..Default::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_labels: Some(BTreeMap::from([(
"app".to_string(),
DEPLOYMENT_NAME.to_string(),
)])),
..Default::default()
},
strategy: Some(DeploymentStrategy {
type_: Some("Recreate".to_string()),
..Default::default()
}),
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([(
"app".to_string(),
DEPLOYMENT_NAME.to_string(),
)])),
..Default::default()
}),
spec: Some(PodSpec {
node_selector: Some(BTreeMap::from([(
"nvidia.com/gpu.product".to_string(),
"NVIDIA-GeForce-RTX-5090".to_string(),
)])),
volumes: Some(vec![
Volume {
name: "cache-volume".to_string(),
persistent_volume_claim: Some(PersistentVolumeClaimVolumeSource {
claim_name: PVC_NAME.to_string(),
read_only: Some(false),
}),
..Default::default()
},
Volume {
name: "shm".to_string(),
empty_dir: Some(EmptyDirVolumeSource {
medium: Some("Memory".to_string()),
size_limit: Some(Quantity("4Gi".to_string())),
}),
..Default::default()
},
Volume {
name: "hf-token".to_string(),
secret: Some(SecretVolumeSource {
secret_name: Some(SECRET_NAME.to_string()),
optional: Some(true),
..Default::default()
}),
..Default::default()
},
]),
containers: vec![Container {
name: DEPLOYMENT_NAME.to_string(),
image: Some(VLLM_IMAGE.to_string()),
command: Some(vec!["/bin/sh".to_string(), "-c".to_string()]),
args: Some(vec![build_vllm_command()]),
env: Some(vec![
EnvVar {
name: "HF_TOKEN".to_string(),
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
key: "token".to_string(),
name: SECRET_NAME.to_string(),
optional: Some(true),
}),
..Default::default()
}),
value: None,
},
EnvVar {
name: "VLLM_WORKER_MULTIPROC_METHOD".to_string(),
value: Some("spawn".to_string()),
value_from: None,
},
]),
ports: Some(vec![ContainerPort {
container_port: SERVICE_PORT as i32,
protocol: Some("TCP".to_string()),
..Default::default()
}]),
resources: Some(ResourceRequirements {
limits: Some(BTreeMap::from([
("cpu".to_string(), Quantity("10".to_string())),
("memory".to_string(), Quantity("30Gi".to_string())),
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
])),
requests: Some(BTreeMap::from([
("cpu".to_string(), Quantity("2".to_string())),
("memory".to_string(), Quantity("10Gi".to_string())),
("nvidia.com/gpu".to_string(), Quantity("1".to_string())),
])),
claims: None,
}),
volume_mounts: Some(vec![
VolumeMount {
name: "cache-volume".to_string(),
mount_path: "/root/.cache/huggingface".to_string(),
read_only: Some(false),
..Default::default()
},
VolumeMount {
name: "shm".to_string(),
mount_path: "/dev/shm".to_string(),
..Default::default()
},
VolumeMount {
name: "hf-token".to_string(),
mount_path: "/etc/secrets/hf-token".to_string(),
read_only: Some(true),
..Default::default()
},
]),
liveness_probe: Some(Probe {
http_get: Some(HTTPGetAction {
path: Some("/health".to_string()),
port: IntOrString::Int(SERVICE_PORT as i32),
..Default::default()
}),
initial_delay_seconds: Some(300),
period_seconds: Some(30),
..Default::default()
}),
readiness_probe: Some(Probe {
http_get: Some(HTTPGetAction {
path: Some("/health".to_string()),
port: IntOrString::Int(SERVICE_PORT as i32),
..Default::default()
}),
initial_delay_seconds: Some(120),
period_seconds: Some(10),
..Default::default()
}),
..Default::default()
}],
..Default::default()
}),
},
..Default::default()
}),
status: None,
};
Box::new(K8sResourceScore::single(
deployment,
Some(NAMESPACE.to_string()),
))
}
fn build_vllm_command() -> String {
format!(
"vllm serve {} \
--port {} \
--max-model-len {} \
--gpu-memory-utilization {} \
--reasoning-parser qwen3 \
--enable-auto-tool-choice \
--tool-call-parser qwen3_coder \
--language-model-only",
MODEL_NAME, SERVICE_PORT, MAX_MODEL_LEN, GPU_MEMORY_UTILIZATION
)
}
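For illustration, the command assembly can be exercised standalone. The constant values below are hypothetical stand-ins for the ones defined at the top of this example (the real `MODEL_NAME` and ports are set earlier in the file):

```rust
// Standalone sketch of build_vllm_command; constant values here are
// hypothetical stand-ins, not the example's real configuration.
const MODEL_NAME: &str = "Qwen/Qwen3-example";
const SERVICE_PORT: u32 = 8000;
const MAX_MODEL_LEN: u32 = 32768;
const GPU_MEMORY_UTILIZATION: f32 = 1.0;

fn build_vllm_command() -> String {
    // The trailing "\" continuations collapse into a single shell string,
    // which the container hands to `/bin/sh -c`.
    format!(
        "vllm serve {} \
         --port {} \
         --max-model-len {} \
         --gpu-memory-utilization {} \
         --reasoning-parser qwen3 \
         --enable-auto-tool-choice \
         --tool-call-parser qwen3_coder \
         --language-model-only",
        MODEL_NAME, SERVICE_PORT, MAX_MODEL_LEN, GPU_MEMORY_UTILIZATION
    )
}

fn main() {
    let cmd = build_vllm_command();
    assert!(cmd.starts_with("vllm serve Qwen/Qwen3-example"));
    assert!(cmd.contains("--port 8000"));
    println!("{cmd}");
}
```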
fn create_service() -> Box<dyn Score<K8sAnywhereTopology>> {
let service = Service {
metadata: ObjectMeta {
name: Some(SERVICE_NAME.to_string()),
namespace: Some(NAMESPACE.to_string()),
..Default::default()
},
spec: Some(ServiceSpec {
ports: Some(vec![ServicePort {
name: Some("http".to_string()),
port: SERVICE_PORT as i32,
protocol: Some("TCP".to_string()),
target_port: Some(IntOrString::Int(TARGET_PORT as i32)),
..Default::default()
}]),
selector: Some(BTreeMap::from([(
"app".to_string(),
DEPLOYMENT_NAME.to_string(),
)])),
type_: Some("ClusterIP".to_string()),
..Default::default()
}),
status: None,
};
Box::new(K8sResourceScore::single(
service,
Some(NAMESPACE.to_string()),
))
}
fn create_route(host: &str) -> Box<dyn Score<K8sAnywhereTopology>> {
let route_spec = RouteSpec {
to: RouteTargetReference {
kind: "Service".to_string(),
name: SERVICE_NAME.to_string(),
weight: Some(100),
},
host: Some(host.to_string()),
port: Some(RoutePort {
target_port: SERVICE_PORT as u16,
}),
tls: Some(TLSConfig {
termination: "edge".to_string(),
insecure_edge_termination_policy: Some("Redirect".to_string()),
..Default::default()
}),
wildcard_policy: None,
..Default::default()
};
Box::new(OKDRouteScore::new(ROUTE_NAME, NAMESPACE, route_spec))
}


@@ -0,0 +1,14 @@
[package]
name = "example-zitadel"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_macros = { path = "../../harmony_macros" }
harmony_types = { path = "../../harmony_types" }
tokio.workspace = true
url.workspace = true


@@ -0,0 +1,20 @@
use harmony::{
inventory::Inventory, modules::zitadel::ZitadelScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let zitadel = ZitadelScore {
host: "sso.sto1.nationtech.io".to_string(),
zitadel_version: "v4.12.1".to_string(),
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(zitadel)],
None,
)
.await
.unwrap();
}

Binary file not shown.

harmony-k8s/Cargo.toml Normal file

@@ -0,0 +1,23 @@
[package]
name = "harmony-k8s"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
kube.workspace = true
k8s-openapi.workspace = true
tokio.workspace = true
tokio-retry.workspace = true
serde.workspace = true
serde_json.workspace = true
serde_yaml.workspace = true
log.workspace = true
similar.workspace = true
reqwest.workspace = true
url.workspace = true
inquire.workspace = true
[dev-dependencies]
pretty_assertions.workspace = true

harmony-k8s/src/apply.rs Normal file

@@ -0,0 +1,593 @@
use kube::{
Client, Error, Resource,
api::{
Api, ApiResource, DynamicObject, GroupVersionKind, Patch, PatchParams, PostParams,
ResourceExt,
},
core::ErrorResponse,
discovery::Scope,
error::DiscoveryError,
};
use log::{debug, error, trace, warn};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::Value;
use similar::TextDiff;
use url::Url;
use crate::client::K8sClient;
use crate::helper;
use crate::types::WriteMode;
/// The field-manager token sent with every server-side apply request.
pub const FIELD_MANAGER: &str = "harmony-k8s";
// ── Private helpers ──────────────────────────────────────────────────────────
/// Serialise any `Serialize` payload to a [`DynamicObject`] via JSON.
fn to_dynamic<T: Serialize>(payload: &T) -> Result<DynamicObject, Error> {
serde_json::from_value(serde_json::to_value(payload).map_err(Error::SerdeError)?)
.map_err(Error::SerdeError)
}
/// Fetch the current resource, display a unified diff against `payload`, and
/// return `()`. All output goes to stdout (same behaviour as before).
///
/// A 404 is treated as "resource would be created" — not an error.
async fn show_dry_run<T: Serialize>(
api: &Api<DynamicObject>,
name: &str,
payload: &T,
) -> Result<(), Error> {
let new_yaml = serde_yaml::to_string(payload)
.unwrap_or_else(|_| "Failed to serialize new resource".to_string());
match api.get(name).await {
Ok(current) => {
println!("\nDry-run for resource: '{name}'");
let mut current_val = serde_yaml::to_value(&current).unwrap_or(serde_yaml::Value::Null);
if let Some(map) = current_val.as_mapping_mut() {
map.remove(&serde_yaml::Value::String("status".to_string()));
}
let current_yaml = serde_yaml::to_string(&current_val)
.unwrap_or_else(|_| "Failed to serialize current resource".to_string());
if current_yaml == new_yaml {
println!("No changes detected.");
} else {
println!("Changes detected:");
let diff = TextDiff::from_lines(&current_yaml, &new_yaml);
for change in diff.iter_all_changes() {
let sign = match change.tag() {
similar::ChangeTag::Delete => "-",
similar::ChangeTag::Insert => "+",
similar::ChangeTag::Equal => " ",
};
print!("{sign}{change}");
}
}
Ok(())
}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
println!("\nDry-run for new resource: '{name}'");
println!("Resource does not exist. Would be created:");
for line in new_yaml.lines() {
println!("+{line}");
}
Ok(())
}
Err(e) => {
error!("Failed to fetch resource '{name}' for dry-run: {e}");
Err(e)
}
}
}
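The diff printing in `show_dry_run` relies on the `similar` crate's `TextDiff`. A greatly simplified std-only stand-in (naive positional comparison, not a real LCS diff) shows the output shape the dry-run produces:

```rust
// Naive line-by-line comparison mimicking the dry-run output format:
// " " for unchanged, "-" for current-only, "+" for new-only lines.
// The real code uses `similar::TextDiff`, which computes a proper diff.
fn naive_diff(current: &str, new: &str) -> Vec<String> {
    let (a, b): (Vec<_>, Vec<_>) = (current.lines().collect(), new.lines().collect());
    let mut out = Vec::new();
    for i in 0..a.len().max(b.len()) {
        match (a.get(i), b.get(i)) {
            (Some(x), Some(y)) if x == y => out.push(format!(" {x}")),
            (Some(x), Some(y)) => {
                out.push(format!("-{x}"));
                out.push(format!("+{y}"));
            }
            (Some(x), None) => out.push(format!("-{x}")),
            (None, Some(y)) => out.push(format!("+{y}")),
            (None, None) => unreachable!(),
        }
    }
    out
}

fn main() {
    let d = naive_diff("replicas: 1\nimage: a", "replicas: 2\nimage: a");
    assert_eq!(d[0], "-replicas: 1");
    assert_eq!(d[1], "+replicas: 2");
    assert_eq!(d[2], " image: a");
}
```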
/// Execute the real (non-dry-run) apply, respecting [`WriteMode`].
async fn do_apply<T: Serialize + std::fmt::Debug>(
api: &Api<DynamicObject>,
name: &str,
payload: &T,
patch_params: &PatchParams,
write_mode: &WriteMode,
) -> Result<DynamicObject, Error> {
match write_mode {
WriteMode::CreateOrUpdate => {
// TODO refactor this arm to perform self.update and if fail with 404 self.create
// This will avoid the repetition of the api.patch and api.create calls within this
// function body. This makes the code more maintainable
match api.patch(name, patch_params, &Patch::Apply(payload)).await {
Ok(obj) => Ok(obj),
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
debug!("Resource '{name}' not found via SSA, falling back to POST");
let dyn_obj = to_dynamic(payload)?;
api.create(&PostParams::default(), &dyn_obj)
.await
.map_err(|e| {
error!("Failed to create '{name}': {e}");
e
})
}
Err(e) => {
error!("Failed to apply '{name}': {e}");
Err(e)
}
}
}
WriteMode::Create => {
let dyn_obj = to_dynamic(payload)?;
api.create(&PostParams::default(), &dyn_obj)
.await
.map_err(|e| {
error!("Failed to create '{name}': {e}");
e
})
}
WriteMode::Update => match api.patch(name, patch_params, &Patch::Apply(payload)).await {
Ok(obj) => Ok(obj),
Err(Error::Api(ErrorResponse { code: 404, .. })) => Err(Error::Api(ErrorResponse {
code: 404,
message: format!("Resource '{name}' not found and WriteMode is UpdateOnly"),
reason: "NotFound".to_string(),
status: "Failure".to_string(),
})),
Err(e) => {
error!("Failed to update '{name}': {e}");
Err(e)
}
},
}
}
// ── Public API ───────────────────────────────────────────────────────────────
impl K8sClient {
/// Server-side apply: create if absent, update if present.
/// Equivalent to `kubectl apply`.
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
self.apply_with_strategy(resource, namespace, WriteMode::CreateOrUpdate)
.await
}
/// POST only — returns an error if the resource already exists.
pub async fn create<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
self.apply_with_strategy(resource, namespace, WriteMode::Create)
.await
}
/// Server-side apply only — returns an error if the resource does not exist.
pub async fn update<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
self.apply_with_strategy(resource, namespace, WriteMode::Update)
.await
}
pub async fn apply_with_strategy<K>(
&self,
resource: &K,
namespace: Option<&str>,
write_mode: WriteMode,
) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
debug!(
"apply_with_strategy: {:?} ns={:?}",
resource.meta().name,
namespace
);
trace!("{:#}", serde_json::to_value(resource).unwrap_or_default());
let dyntype = K::DynamicType::default();
let gvk = GroupVersionKind {
group: K::group(&dyntype).to_string(),
version: K::version(&dyntype).to_string(),
kind: K::kind(&dyntype).to_string(),
};
let discovery = self.discovery().await?;
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Cannot resolve GVK: {gvk:?}"
)))
})?;
let effective_ns = if caps.scope == Scope::Cluster {
None
} else {
namespace.or_else(|| resource.meta().namespace.as_deref())
};
let api: Api<DynamicObject> =
get_dynamic_api(ar, caps, self.client.clone(), effective_ns, false);
let name = resource
.meta()
.name
.as_deref()
.expect("Kubernetes resource must have a name");
if self.dry_run {
show_dry_run(&api, name, resource).await?;
return Ok(resource.clone());
}
let patch_params = PatchParams::apply(FIELD_MANAGER);
do_apply(&api, name, resource, &patch_params, &write_mode)
.await
.and_then(helper::dyn_to_typed)
}
/// Applies resources in order, one at a time
pub async fn apply_many<K>(&self, resources: &[K], ns: Option<&str>) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + Serialize,
<K as Resource>::DynamicType: Default,
{
let mut result = Vec::new();
for r in resources.iter() {
let res = self.apply(r, ns).await;
if res.is_err() {
// NOTE: this may log sensitive data; downgrade to debug if needed.
warn!(
"Failed to apply k8s resource: {}",
serde_json::to_string_pretty(r).map_err(Error::SerdeError)?
);
}
result.push(res?);
}
Ok(result)
}
/// Apply a [`DynamicObject`] resource using server-side apply.
pub async fn apply_dynamic(
&self,
resource: &DynamicObject,
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<DynamicObject, Error> {
trace!("apply_dynamic {resource:#?} ns={namespace:?} force={force_conflicts}");
let discovery = self.discovery().await?;
let type_meta = resource.types.as_ref().ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"DynamicObject must have types (apiVersion and kind)".to_string(),
))
})?;
let gvk = GroupVersionKind::try_from(type_meta).map_err(|_| {
Error::BuildRequest(kube::core::request::Error::Validation(format!(
"Invalid GVK in DynamicObject: {type_meta:?}"
)))
})?;
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Cannot resolve GVK: {gvk:?}"
)))
})?;
let effective_ns = if caps.scope == Scope::Cluster {
None
} else {
namespace.or_else(|| resource.metadata.namespace.as_deref())
};
let api = get_dynamic_api(ar, caps, self.client.clone(), effective_ns, false);
let name = resource.metadata.name.as_deref().ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(
"DynamicObject must have metadata.name".to_string(),
))
})?;
debug!(
"apply_dynamic kind={:?} name='{name}' ns={effective_ns:?}",
resource.types.as_ref().map(|t| &t.kind),
);
// NOTE would be nice to improve cohesion between the dynamic and typed apis and avoid copy
// pasting the dry_run and some more logic
if self.dry_run {
show_dry_run(&api, name, resource).await?;
return Ok(resource.clone());
}
let mut patch_params = PatchParams::apply(FIELD_MANAGER);
patch_params.force = force_conflicts;
do_apply(
&api,
name,
resource,
&patch_params,
&WriteMode::CreateOrUpdate,
)
.await
}
pub async fn apply_dynamic_many(
&self,
resources: &[DynamicObject],
namespace: Option<&str>,
force_conflicts: bool,
) -> Result<Vec<DynamicObject>, Error> {
let mut result = Vec::new();
for r in resources.iter() {
result.push(self.apply_dynamic(r, namespace, force_conflicts).await?);
}
Ok(result)
}
pub async fn apply_yaml_many(
&self,
yaml: &[serde_yaml::Value],
ns: Option<&str>,
) -> Result<(), Error> {
for y in yaml.iter() {
self.apply_yaml(y, ns).await?;
}
Ok(())
}
pub async fn apply_yaml(
&self,
yaml: &serde_yaml::Value,
ns: Option<&str>,
) -> Result<(), Error> {
// NOTE wouldn't it be possible to parse this into a DynamicObject and simply call
// apply_dynamic instead of reimplementing api interactions?
let obj: DynamicObject =
serde_yaml::from_value(yaml.clone()).expect("YAML must deserialise to DynamicObject");
let name = obj.metadata.name.as_ref().expect("YAML must have a name");
let api_version = yaml["apiVersion"].as_str().expect("missing apiVersion");
let kind = yaml["kind"].as_str().expect("missing kind");
let mut it = api_version.splitn(2, '/');
let first = it.next().unwrap();
let (g, v) = match it.next() {
Some(second) => (first, second),
None => ("", first),
};
let api_resource = ApiResource::from_gvk(&GroupVersionKind::gvk(g, v, kind));
let namespace = ns.unwrap_or_else(|| {
obj.metadata
.namespace
.as_deref()
.expect("YAML must have a namespace when ns is not provided")
});
let api: Api<DynamicObject> =
Api::namespaced_with(self.client.clone(), namespace, &api_resource);
println!("Applying '{name}' in namespace '{namespace}'...");
let patch_params = PatchParams::apply(FIELD_MANAGER);
let result = api.patch(name, &patch_params, &Patch::Apply(&obj)).await?;
println!("Successfully applied '{}'.", result.name_any());
Ok(())
}
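The `apiVersion` parsing inside `apply_yaml` distinguishes the core group (bare `"v1"`) from grouped resources (`"apps/v1"`). Extracted as a standalone sketch:

```rust
// Std-only extraction of the splitn logic in `apply_yaml`:
// "<group>/<version>" yields both parts, a bare "<version>" means
// the core API group (empty group string).
fn split_api_version(api_version: &str) -> (&str, &str) {
    let mut it = api_version.splitn(2, '/');
    let first = it.next().unwrap();
    match it.next() {
        Some(second) => (first, second), // "<group>/<version>"
        None => ("", first),             // core group: bare "<version>"
    }
}

fn main() {
    assert_eq!(split_api_version("v1"), ("", "v1"));
    assert_eq!(split_api_version("apps/v1"), ("apps", "v1"));
    assert_eq!(
        split_api_version("route.openshift.io/v1"),
        ("route.openshift.io", "v1")
    );
}
```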
/// Equivalent to `kubectl apply -f <url>`.
pub async fn apply_url(&self, url: Url, ns: Option<&str>) -> Result<(), Error> {
let patch_params = PatchParams::apply(FIELD_MANAGER);
let discovery = self.discovery().await?;
let yaml = reqwest::get(url)
.await
.expect("Could not fetch URL")
.text()
.await
.expect("Could not read response body");
for doc in multidoc_deserialize(&yaml).expect("Failed to parse YAML from URL") {
let obj: DynamicObject =
serde_yaml::from_value(doc).expect("YAML document is not a valid object");
let namespace = obj.metadata.namespace.as_deref().or(ns);
let type_meta = obj.types.as_ref().expect("Object is missing TypeMeta");
let gvk =
GroupVersionKind::try_from(type_meta).expect("Object has invalid GroupVersionKind");
let name = obj.name_any();
if let Some((ar, caps)) = discovery.resolve_gvk(&gvk) {
let api = get_dynamic_api(ar, caps, self.client.clone(), namespace, false);
trace!(
"Applying {}:\n{}",
gvk.kind,
serde_yaml::to_string(&obj).unwrap_or_default()
);
let data: Value = serde_json::to_value(&obj).expect("serialisation failed");
let _r = api.patch(&name, &patch_params, &Patch::Apply(data)).await?;
debug!("Applied {} '{name}'", gvk.kind);
} else {
warn!("Skipping document with unknown GVK: {gvk:?}");
}
}
Ok(())
}
/// Build a dynamic API client from a [`DynamicObject`]'s type metadata.
pub(crate) fn get_api_for_dynamic_object(
&self,
object: &DynamicObject,
ns: Option<&str>,
) -> Result<Api<DynamicObject>, Error> {
let ar = object
.types
.as_ref()
.and_then(|t| {
let parts: Vec<&str> = t.api_version.split('/').collect();
match parts.as_slice() {
[version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
"", version, &t.kind,
))),
[group, version] => Some(ApiResource::from_gvk(&GroupVersionKind::gvk(
group, version, &t.kind,
))),
_ => None,
}
})
.ok_or_else(|| {
Error::BuildRequest(kube::core::request::Error::Validation(format!(
"Invalid apiVersion in DynamicObject: {object:#?}"
)))
})?;
Ok(match ns {
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
None => Api::default_namespaced_with(self.client.clone(), &ar),
})
}
}
// ── Free functions ───────────────────────────────────────────────────────────
pub(crate) fn get_dynamic_api(
resource: kube::api::ApiResource,
capabilities: kube::discovery::ApiCapabilities,
client: Client,
ns: Option<&str>,
all: bool,
) -> Api<DynamicObject> {
if capabilities.scope == Scope::Cluster || all {
Api::all_with(client, &resource)
} else if let Some(namespace) = ns {
Api::namespaced_with(client, namespace, &resource)
} else {
Api::default_namespaced_with(client, &resource)
}
}
pub(crate) fn multidoc_deserialize(
data: &str,
) -> Result<Vec<serde_yaml::Value>, serde_yaml::Error> {
use serde::Deserialize;
let mut docs = vec![];
for de in serde_yaml::Deserializer::from_str(data) {
docs.push(serde_yaml::Value::deserialize(de)?);
}
Ok(docs)
}
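`multidoc_deserialize` uses serde_yaml's streaming `Deserializer` to walk a multi-document stream. A rough std-only approximation of the idea (note: the real implementation also handles `---` sequences appearing inside block scalars, which this naive split does not):

```rust
// Naive approximation of multi-document YAML splitting: break on the
// "---" document separator. Only for illustration; the real code uses
// serde_yaml::Deserializer::from_str, which parses the stream properly.
fn split_yaml_docs(data: &str) -> Vec<String> {
    data.split("\n---\n") // assumes "---" appears only as a separator
        .map(|d| d.trim().to_string())
        .filter(|d| !d.is_empty())
        .collect()
}

fn main() {
    let stream = "kind: Namespace\n---\nkind: ConfigMap\n";
    let docs = split_yaml_docs(stream);
    assert_eq!(docs.len(), 2);
    assert_eq!(docs[1], "kind: ConfigMap");
}
```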
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod apply_tests {
use std::collections::BTreeMap;
use std::time::{SystemTime, UNIX_EPOCH};
use k8s_openapi::api::core::v1::ConfigMap;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::{DeleteParams, TypeMeta};
use super::*;
#[tokio::test]
#[ignore = "requires kubernetes cluster"]
async fn apply_creates_new_configmap() {
let client = K8sClient::try_default().await.unwrap();
let ns = "default";
let name = format!(
"test-cm-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: Some(BTreeMap::from([("key1".to_string(), "value1".to_string())])),
..Default::default()
};
assert!(client.apply(&cm, Some(ns)).await.is_ok());
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
let _ = api.delete(&name, &DeleteParams::default()).await;
}
#[tokio::test]
#[ignore = "requires kubernetes cluster"]
async fn apply_is_idempotent() {
let client = K8sClient::try_default().await.unwrap();
let ns = "default";
let name = format!(
"test-idem-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: Some(BTreeMap::from([("key".to_string(), "value".to_string())])),
..Default::default()
};
assert!(
client.apply(&cm, Some(ns)).await.is_ok(),
"first apply failed"
);
assert!(
client.apply(&cm, Some(ns)).await.is_ok(),
"second apply failed (not idempotent)"
);
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
let _ = api.delete(&name, &DeleteParams::default()).await;
}
#[tokio::test]
#[ignore = "requires kubernetes cluster"]
async fn apply_dynamic_creates_new_resource() {
let client = K8sClient::try_default().await.unwrap();
let ns = "default";
let name = format!(
"test-dyn-{}",
SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis()
);
let obj = DynamicObject {
types: Some(TypeMeta {
api_version: "v1".to_string(),
kind: "ConfigMap".to_string(),
}),
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: serde_json::json!({}),
};
let result = client.apply_dynamic(&obj, Some(ns), false).await;
assert!(result.is_ok(), "apply_dynamic failed: {:?}", result.err());
let api: Api<ConfigMap> = Api::namespaced(client.client.clone(), ns);
let _ = api.delete(&name, &DeleteParams::default()).await;
}
}
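The three `WriteMode` variants dispatched in `do_apply` map onto familiar kubectl semantics. A std-only sketch with the kube API mocked down to an `exists` flag (404 handling becomes a branch):

```rust
// Simplified model of the WriteMode dispatch in `do_apply`; the
// apiserver is mocked as a boolean `exists`, and 404s become branches.
#[derive(Debug, PartialEq)]
enum WriteMode { Create, Update, CreateOrUpdate }

#[derive(Debug, PartialEq)]
enum Outcome { Created, Updated, AlreadyExists, NotFound }

fn apply_with(mode: WriteMode, exists: bool) -> Outcome {
    match mode {
        // POST only: fails if the resource is already present.
        WriteMode::Create => {
            if exists { Outcome::AlreadyExists } else { Outcome::Created }
        }
        // Server-side apply only: 404 if the resource is absent.
        WriteMode::Update => {
            if exists { Outcome::Updated } else { Outcome::NotFound }
        }
        // `kubectl apply` semantics: patch, fall back to create on 404.
        WriteMode::CreateOrUpdate => {
            if exists { Outcome::Updated } else { Outcome::Created }
        }
    }
}

fn main() {
    assert_eq!(apply_with(WriteMode::Create, true), Outcome::AlreadyExists);
    assert_eq!(apply_with(WriteMode::Update, false), Outcome::NotFound);
    assert_eq!(apply_with(WriteMode::CreateOrUpdate, false), Outcome::Created);
}
```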


@@ -25,9 +25,9 @@
//!
//! ## Example
//!
-//! ```rust,no_run
-//! use harmony::topology::k8s::{K8sClient, helper};
-//! use harmony::topology::KubernetesDistribution;
+//! ```
+//! use harmony_k8s::{K8sClient, helper};
+//! use harmony_k8s::KubernetesDistribution;
//!
//! async fn write_network_config(client: &K8sClient, node: &str) {
//! // Create a bundle with platform-specific RBAC
@@ -56,7 +56,7 @@ use kube::{Error, Resource, ResourceExt, api::DynamicObject};
use serde::Serialize;
use serde_json;
-use crate::domain::topology::k8s::K8sClient;
+use crate::K8sClient;
/// A ResourceBundle represents a logical unit of work consisting of multiple
/// Kubernetes resources that should be applied or deleted together.

harmony-k8s/src/client.rs Normal file

@@ -0,0 +1,99 @@
use std::sync::Arc;
use kube::config::{KubeConfigOptions, Kubeconfig};
use kube::{Client, Config, Discovery, Error};
use log::error;
use serde::Serialize;
use tokio::sync::OnceCell;
use crate::types::KubernetesDistribution;
// TODO not cool, should use a proper configuration mechanism
// cli arg, env var, config file
fn read_dry_run_from_env() -> bool {
std::env::var("DRY_RUN")
.map(|v| v == "true" || v == "1")
.unwrap_or(false)
}
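As the TODO above notes, this is a stopgap. The parsing itself is easy to pin down when refactored to take the raw value, which also makes its case-sensitivity explicit:

```rust
// Same acceptance rule as read_dry_run_from_env, refactored to take
// the variable's value directly so it can be tested without touching
// the process environment. Only the exact strings "true" and "1"
// enable dry-run; everything else (including unset) is a real run.
fn parse_dry_run(v: Option<&str>) -> bool {
    v.map(|v| v == "true" || v == "1").unwrap_or(false)
}

fn main() {
    assert!(parse_dry_run(Some("true")));
    assert!(parse_dry_run(Some("1")));
    assert!(!parse_dry_run(Some("TRUE"))); // case-sensitive
    assert!(!parse_dry_run(None));
}
```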
#[derive(Clone)]
pub struct K8sClient {
pub(crate) client: Client,
/// When `true` no mutation is sent to the API server; diffs are printed
/// to stdout instead. Initialised from the `DRY_RUN` environment variable.
pub(crate) dry_run: bool,
pub(crate) k8s_distribution: Arc<OnceCell<KubernetesDistribution>>,
pub(crate) discovery: Arc<OnceCell<Discovery>>,
}
impl Serialize for K8sClient {
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{
todo!("K8sClient serialization is not meaningful; remove this impl if unused")
}
}
impl std::fmt::Debug for K8sClient {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_fmt(format_args!(
"K8sClient {{ namespace: {}, dry_run: {} }}",
self.client.default_namespace(),
self.dry_run,
))
}
}
impl K8sClient {
/// Create a client, reading `DRY_RUN` from the environment.
pub fn new(client: Client) -> Self {
Self {
dry_run: read_dry_run_from_env(),
client,
k8s_distribution: Arc::new(OnceCell::new()),
discovery: Arc::new(OnceCell::new()),
}
}
/// Create a client that always operates in dry-run mode, regardless of
/// the environment variable.
pub fn new_dry_run(client: Client) -> Self {
Self {
dry_run: true,
..Self::new(client)
}
}
/// Returns `true` if this client is operating in dry-run mode.
pub fn is_dry_run(&self) -> bool {
self.dry_run
}
pub async fn try_default() -> Result<Self, Error> {
Ok(Self::new(Client::try_default().await?))
}
pub async fn from_kubeconfig(path: &str) -> Option<Self> {
Self::from_kubeconfig_with_opts(path, &KubeConfigOptions::default()).await
}
pub async fn from_kubeconfig_with_context(path: &str, context: Option<String>) -> Option<Self> {
let opts = KubeConfigOptions {
context,
..Default::default()
};
Self::from_kubeconfig_with_opts(path, &opts).await
}
pub async fn from_kubeconfig_with_opts(path: &str, opts: &KubeConfigOptions) -> Option<Self> {
let k = match Kubeconfig::read_from(path) {
Ok(k) => k,
Err(e) => {
error!("Failed to load kubeconfig from {path}: {e}");
return None;
}
};
let config = Config::from_custom_kubeconfig(k, opts)
.await
.map_err(|e| error!("Failed to build client config from {path}: {e}"))
.ok()?;
Client::try_from(config)
.map_err(|e| error!("Failed to create client from {path}: {e}"))
.ok()
.map(Self::new)
}
}


@@ -0,0 +1,83 @@
use std::time::Duration;
use kube::{Discovery, Error};
use log::{debug, error, info, trace, warn};
use tokio::sync::Mutex;
use tokio_retry::{Retry, strategy::ExponentialBackoff};
use crate::client::K8sClient;
use crate::types::KubernetesDistribution;
impl K8sClient {
pub async fn get_apiserver_version(
&self,
) -> Result<k8s_openapi::apimachinery::pkg::version::Info, Error> {
self.client.clone().apiserver_version().await
}
/// Runs (and caches) Kubernetes API discovery with exponential-backoff retries.
pub async fn discovery(&self) -> Result<&Discovery, Error> {
let retry_strategy = ExponentialBackoff::from_millis(1000)
.max_delay(Duration::from_secs(32))
.take(6);
let attempt = Mutex::new(0u32);
Retry::spawn(retry_strategy, || async {
let mut n = attempt.lock().await;
*n += 1;
match self
.discovery
.get_or_try_init(async || {
debug!("Running Kubernetes API discovery (attempt {})", *n);
let d = Discovery::new(self.client.clone()).run().await?;
debug!("Kubernetes API discovery completed");
Ok(d)
})
.await
{
Ok(d) => Ok(d),
Err(e) => {
warn!("Kubernetes API discovery failed (attempt {}): {}", *n, e);
Err(e)
}
}
})
.await
.map_err(|e| {
error!("Kubernetes API discovery failed after all retries: {}", e);
e
})
}
/// Detect which Kubernetes distribution is running. Result is cached for
/// the lifetime of the client.
pub async fn get_k8s_distribution(&self) -> Result<KubernetesDistribution, Error> {
self.k8s_distribution
.get_or_try_init(async || {
debug!("Detecting Kubernetes distribution");
let api_groups = self.client.list_api_groups().await?;
trace!("list_api_groups: {:?}", api_groups);
let version = self.get_apiserver_version().await?;
if api_groups
.groups
.iter()
.any(|g| g.name == "project.openshift.io")
{
info!("Detected distribution: OpenshiftFamily");
return Ok(KubernetesDistribution::OpenshiftFamily);
}
if version.git_version.contains("k3s") {
info!("Detected distribution: K3sFamily");
return Ok(KubernetesDistribution::K3sFamily);
}
info!("Distribution not identified, using Default");
Ok(KubernetesDistribution::Default)
})
.await
.cloned()
}
}
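If I read tokio_retry's `ExponentialBackoff` correctly, `from_millis(base)` raises the base itself to the attempt number (1000¹ ms, 1000² ms, …), so `from_millis(1000).max_delay(32s)` jumps from 1 s straight to the 32 s cap rather than doubling; worth verifying against the crate docs. A std-only sketch of that reading:

```rust
// Sketch of the delay schedule under the base^n interpretation of
// tokio_retry's ExponentialBackoff (an assumption to verify, not the
// crate's documented contract).
fn backoff_schedule(base_ms: u64, max_ms: u64, attempts: u32) -> Vec<u64> {
    (1..=attempts)
        .map(|n| base_ms.saturating_pow(n).min(max_ms))
        .collect()
}

fn main() {
    // from_millis(1000).max_delay(32s).take(6): 1s, then pinned at the cap.
    assert_eq!(
        backoff_schedule(1000, 32_000, 6),
        vec![1000, 32_000, 32_000, 32_000, 32_000, 32_000]
    );
}
```

If roughly doubling delays are the intent, `from_millis(2).factor(500)` (or similar) may be closer to it.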


@@ -1,7 +1,7 @@
use std::collections::BTreeMap;
use std::time::Duration;
-use crate::topology::KubernetesDistribution;
+use crate::KubernetesDistribution;
use super::bundle::ResourceBundle;
use super::config::PRIVILEGED_POD_IMAGE;
@@ -10,8 +10,10 @@ use k8s_openapi::api::core::v1::{
};
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::DynamicObject;
use kube::error::DiscoveryError;
use log::{debug, error, info, warn};
use serde::de::DeserializeOwned;
#[derive(Debug)]
pub struct PrivilegedPodConfig {
@@ -131,9 +133,9 @@ pub fn host_root_volume() -> (Volume, VolumeMount) {
///
/// # Example
///
-/// ```rust,no_run
-/// # use harmony::topology::k8s::helper::{build_privileged_bundle, PrivilegedPodConfig};
-/// # use harmony::topology::KubernetesDistribution;
+/// ```
+/// use harmony_k8s::helper::{build_privileged_bundle, PrivilegedPodConfig};
+/// use harmony_k8s::KubernetesDistribution;
/// let bundle = build_privileged_bundle(
/// PrivilegedPodConfig {
/// name: "network-setup".to_string(),
@@ -279,6 +281,16 @@ pub fn prompt_drain_timeout_action(
}
}
/// JSON round-trip: DynamicObject → K
///
/// Safe because the DynamicObject was produced by the apiserver from a
/// payload that was originally serialized from K, so the schema is identical.
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
serde_json::to_value(obj)
.and_then(serde_json::from_value)
.map_err(kube::Error::SerdeError)
}
#[cfg(test)]
mod tests {
use super::*;

harmony-k8s/src/lib.rs Normal file

@@ -0,0 +1,13 @@
pub mod apply;
pub mod bundle;
pub mod client;
pub mod config;
pub mod discovery;
pub mod helper;
pub mod node;
pub mod pod;
pub mod resources;
pub mod types;
pub use client::K8sClient;
pub use types::{DrainOptions, KubernetesDistribution, NodeFile, ScopeResolver, WriteMode};

harmony-k8s/src/main.rs Normal file

@@ -0,0 +1,3 @@
fn main() {
println!("Hello, world!");
}

harmony-k8s/src/node.rs Normal file

@@ -0,0 +1,722 @@
use std::collections::BTreeMap;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use k8s_openapi::api::core::v1::{
ConfigMap, ConfigMapVolumeSource, Node, Pod, Volume, VolumeMount,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::{
Error,
api::{Api, DeleteParams, EvictParams, ListParams, PostParams},
core::ErrorResponse,
error::DiscoveryError,
};
use log::{debug, error, info, warn};
use tokio::time::sleep;
use crate::client::K8sClient;
use crate::helper::{self, PrivilegedPodConfig};
use crate::types::{DrainOptions, NodeFile};
impl K8sClient {
pub async fn cordon_node(&self, node_name: &str) -> Result<(), Error> {
Api::<Node>::all(self.client.clone())
.cordon(node_name)
.await?;
Ok(())
}
pub async fn uncordon_node(&self, node_name: &str) -> Result<(), Error> {
Api::<Node>::all(self.client.clone())
.uncordon(node_name)
.await?;
Ok(())
}
pub async fn wait_for_node_ready(&self, node_name: &str) -> Result<(), Error> {
self.wait_for_node_ready_with_timeout(node_name, Duration::from_secs(600))
.await
}
async fn wait_for_node_ready_with_timeout(
&self,
node_name: &str,
timeout: Duration,
) -> Result<(), Error> {
let api: Api<Node> = Api::all(self.client.clone());
let start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not become Ready within {timeout:?}"
))));
}
match api.get(node_name).await {
Ok(node) => {
if node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.any(|c| c.type_ == "Ready" && c.status == "True")
})
.unwrap_or(false)
{
debug!("Node '{node_name}' is Ready");
return Ok(());
}
}
Err(e) => debug!("Error polling node '{node_name}': {e}"),
}
sleep(poll).await;
}
}
async fn wait_for_node_not_ready(
&self,
node_name: &str,
timeout: Duration,
) -> Result<(), Error> {
let api: Api<Node> = Api::all(self.client.clone());
let start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
loop {
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not become NotReady within {timeout:?}"
))));
}
match api.get(node_name).await {
Ok(node) => {
let is_ready = node
.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|conds| {
conds
.iter()
.any(|c| c.type_ == "Ready" && c.status == "True")
})
.unwrap_or(false);
if !is_ready {
debug!("Node '{node_name}' is NotReady");
return Ok(());
}
}
Err(e) => debug!("Error polling node '{node_name}': {e}"),
}
sleep(poll).await;
}
}
async fn list_pods_on_node(&self, node_name: &str) -> Result<Vec<Pod>, Error> {
let api: Api<Pod> = Api::all(self.client.clone());
Ok(api
.list(&ListParams::default().fields(&format!("spec.nodeName={node_name}")))
.await?
.items)
}
fn is_mirror_pod(pod: &Pod) -> bool {
pod.metadata
.annotations
.as_ref()
.map(|a| a.contains_key("kubernetes.io/config.mirror"))
.unwrap_or(false)
}
fn is_daemonset_pod(pod: &Pod) -> bool {
pod.metadata
.owner_references
.as_ref()
.map(|refs| refs.iter().any(|r| r.kind == "DaemonSet"))
.unwrap_or(false)
}
fn has_emptydir_volume(pod: &Pod) -> bool {
pod.spec
.as_ref()
.and_then(|s| s.volumes.as_ref())
.map(|vols| vols.iter().any(|v| v.empty_dir.is_some()))
.unwrap_or(false)
}
fn is_completed_pod(pod: &Pod) -> bool {
pod.status
.as_ref()
.and_then(|s| s.phase.as_deref())
.map(|phase| phase == "Succeeded" || phase == "Failed")
.unwrap_or(false)
}
fn classify_pods_for_drain(
pods: &[Pod],
options: &DrainOptions,
) -> Result<(Vec<Pod>, Vec<String>), String> {
let mut evictable = Vec::new();
let mut skipped = Vec::new();
let mut blocking = Vec::new();
for pod in pods {
let name = pod.metadata.name.as_deref().unwrap_or("<unknown>");
let ns = pod.metadata.namespace.as_deref().unwrap_or("<unknown>");
let qualified = format!("{ns}/{name}");
if Self::is_mirror_pod(pod) {
skipped.push(format!("{qualified} (mirror pod)"));
continue;
}
if Self::is_completed_pod(pod) {
skipped.push(format!("{qualified} (completed)"));
continue;
}
if Self::is_daemonset_pod(pod) {
if options.ignore_daemonsets {
skipped.push(format!("{qualified} (DaemonSet-managed)"));
} else {
blocking.push(format!(
"{qualified} is managed by a DaemonSet (set ignore_daemonsets to skip)"
));
}
continue;
}
if Self::has_emptydir_volume(pod) && !options.delete_emptydir_data {
blocking.push(format!(
"{qualified} uses emptyDir volumes (set delete_emptydir_data to allow eviction)"
));
continue;
}
evictable.push(pod.clone());
}
if !blocking.is_empty() {
return Err(format!(
"Cannot drain node — the following pods block eviction:\n - {}",
blocking.join("\n - ")
));
}
Ok((evictable, skipped))
}
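The classification precedence above (mirror > completed > DaemonSet-managed > emptyDir > evictable) can be sketched with a std-only stand-in; `PodInfo` and the result strings here are illustrative, not the real kube types:

```rust
// Simplified stand-in for classify_pods_for_drain's per-pod decision.
struct PodInfo {
    mirror: bool,
    phase: &'static str,
    daemonset: bool,
    emptydir: bool,
}

fn classify(p: &PodInfo, ignore_daemonsets: bool, delete_emptydir_data: bool) -> &'static str {
    if p.mirror {
        "skip: mirror"
    } else if p.phase == "Succeeded" || p.phase == "Failed" {
        "skip: completed"
    } else if p.daemonset {
        if ignore_daemonsets { "skip: daemonset" } else { "block" }
    } else if p.emptydir && !delete_emptydir_data {
        "block"
    } else {
        "evict"
    }
}

fn main() {
    let running = PodInfo { mirror: false, phase: "Running", daemonset: false, emptydir: false };
    assert_eq!(classify(&running, true, false), "evict");

    // A mirror pod is skipped even if it also looks completed.
    let static_pod = PodInfo { mirror: true, phase: "Succeeded", ..running };
    assert_eq!(classify(&static_pod, true, false), "skip: mirror");

    let ds = PodInfo { daemonset: true, ..running };
    assert_eq!(classify(&ds, true, false), "skip: daemonset");
    assert_eq!(classify(&ds, false, false), "block");

    let cache = PodInfo { emptydir: true, ..running };
    assert_eq!(classify(&cache, true, false), "block");
    assert_eq!(classify(&cache, true, true), "evict");
}
```

The ordering matters: checking mirror before completed is exactly what the `mirror_checked_before_completed` test below pins down.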
async fn evict_pod(&self, pod: &Pod) -> Result<(), Error> {
let name = pod.metadata.name.as_deref().unwrap_or_default();
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
debug!("Evicting pod {ns}/{name}");
Api::<Pod>::namespaced(self.client.clone(), ns)
.evict(name, &EvictParams::default())
.await
.map(|_| ())
}
/// Drains a node: cordon → classify → evict & wait.
pub async fn drain_node(&self, node_name: &str, options: &DrainOptions) -> Result<(), Error> {
debug!("Cordoning '{node_name}'");
self.cordon_node(node_name).await?;
let pods = self.list_pods_on_node(node_name).await?;
debug!("Found {} pod(s) on '{node_name}'", pods.len());
let (evictable, skipped) =
Self::classify_pods_for_drain(&pods, options).map_err(|msg| {
error!("{msg}");
Error::Discovery(DiscoveryError::MissingResource(msg))
})?;
for s in &skipped {
info!("Skipping pod: {s}");
}
if evictable.is_empty() {
info!("No pods to evict on '{node_name}'");
return Ok(());
}
info!("Evicting {} pod(s) from '{node_name}'", evictable.len());
let mut start = tokio::time::Instant::now();
let poll = Duration::from_secs(5);
let mut pending = evictable;
loop {
for pod in &pending {
match self.evict_pod(pod).await {
Ok(()) => {}
Err(Error::Api(ErrorResponse { code: 404, .. })) => {}
Err(Error::Api(ErrorResponse { code: 429, .. })) => {
warn!(
"PDB blocked eviction of {}/{}; will retry",
pod.metadata.namespace.as_deref().unwrap_or(""),
pod.metadata.name.as_deref().unwrap_or("")
);
}
Err(e) => {
error!(
"Failed to evict {}/{}: {e}",
pod.metadata.namespace.as_deref().unwrap_or(""),
pod.metadata.name.as_deref().unwrap_or("")
);
return Err(e);
}
}
}
sleep(poll).await;
let mut still_present = Vec::new();
for pod in pending {
let ns = pod.metadata.namespace.as_deref().unwrap_or_default();
let name = pod.metadata.name.as_deref().unwrap_or_default();
match self.get_pod(name, Some(ns)).await? {
Some(_) => still_present.push(pod),
None => debug!("Pod {ns}/{name} evicted"),
}
}
pending = still_present;
if pending.is_empty() {
break;
}
if start.elapsed() > options.timeout {
match helper::prompt_drain_timeout_action(
node_name,
pending.len(),
options.timeout,
)? {
helper::DrainTimeoutAction::Accept => break,
helper::DrainTimeoutAction::Retry => {
start = tokio::time::Instant::now();
continue;
}
helper::DrainTimeoutAction::Abort => {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Drain aborted. {} pod(s) remaining on '{node_name}'",
pending.len()
))));
}
}
}
debug!("Waiting for {} pod(s) on '{node_name}'", pending.len());
}
debug!("'{node_name}' drained successfully");
Ok(())
}
/// Safely reboots a node: drain → reboot → wait for Ready → uncordon.
pub async fn reboot_node(
&self,
node_name: &str,
drain_options: &DrainOptions,
timeout: Duration,
) -> Result<(), Error> {
info!("Starting reboot for '{node_name}'");
let node_api: Api<Node> = Api::all(self.client.clone());
let boot_id_before = node_api
.get(node_name)
.await?
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|ni| ni.boot_id.clone())
.ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' has no boot_id in status"
)))
})?;
info!("Draining '{node_name}'");
self.drain_node(node_name, drain_options).await?;
let start = tokio::time::Instant::now();
info!("Scheduling reboot for '{node_name}'");
let reboot_cmd =
"echo rebooting ; nohup bash -c 'sleep 5 && nsenter -t 1 -m -- systemctl reboot'";
match self
.run_privileged_command_on_node(node_name, reboot_cmd)
.await
{
Ok(_) => debug!("Reboot command dispatched"),
Err(e) => debug!("Reboot command error (expected if node began shutdown): {e}"),
}
info!("Waiting for '{node_name}' to begin shutdown");
self.wait_for_node_not_ready(node_name, timeout.saturating_sub(start.elapsed()))
.await?;
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timeout during reboot of '{node_name}' (shutdown phase)"
))));
}
info!("Waiting for '{node_name}' to come back online");
self.wait_for_node_ready_with_timeout(node_name, timeout.saturating_sub(start.elapsed()))
.await?;
if start.elapsed() > timeout {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timeout during reboot of '{node_name}' (ready phase)"
))));
}
let boot_id_after = node_api
.get(node_name)
.await?
.status
.as_ref()
.and_then(|s| s.node_info.as_ref())
.map(|ni| ni.boot_id.clone())
.ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' has no boot_id after reboot"
)))
})?;
if boot_id_before == boot_id_after {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Node '{node_name}' did not actually reboot (boot_id unchanged: {boot_id_before})"
))));
}
info!("'{node_name}' rebooted ({boot_id_before} → {boot_id_after})");
self.uncordon_node(node_name).await?;
info!("'{node_name}' reboot complete ({:?})", start.elapsed());
Ok(())
}
/// Write a set of files to a node's filesystem via a privileged ephemeral pod.
pub async fn write_files_to_node(
&self,
node_name: &str,
files: &[NodeFile],
) -> Result<String, Error> {
let ns = self.client.default_namespace();
let suffix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let name = format!("harmony-k8s-writer-{suffix}");
debug!("Writing {} file(s) to '{node_name}'", files.len());
let mut data = BTreeMap::new();
let mut script = String::from("set -e\n");
for (i, file) in files.iter().enumerate() {
let key = format!("f{i}");
data.insert(key.clone(), file.content.clone());
script.push_str(&format!("mkdir -p \"$(dirname \"/host{}\")\"\n", file.path));
script.push_str(&format!("cp \"/payload/{key}\" \"/host{}\"\n", file.path));
script.push_str(&format!("chmod {:o} \"/host{}\"\n", file.mode, file.path));
}
let cm = ConfigMap {
metadata: ObjectMeta {
name: Some(name.clone()),
namespace: Some(ns.to_string()),
..Default::default()
},
data: Some(data),
..Default::default()
};
let cm_api: Api<ConfigMap> = Api::namespaced(self.client.clone(), ns);
cm_api.create(&PostParams::default(), &cm).await?;
debug!("Created ConfigMap '{name}'");
let (host_vol, host_mount) = helper::host_root_volume();
let payload_vol = Volume {
name: "payload".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: name.clone(),
..Default::default()
}),
..Default::default()
};
let payload_mount = VolumeMount {
name: "payload".to_string(),
mount_path: "/payload".to_string(),
..Default::default()
};
let bundle = helper::build_privileged_bundle(
PrivilegedPodConfig {
name: name.clone(),
namespace: ns.to_string(),
node_name: node_name.to_string(),
container_name: "writer".to_string(),
command: vec!["/bin/bash".to_string(), "-c".to_string(), script],
volumes: vec![payload_vol, host_vol],
volume_mounts: vec![payload_mount, host_mount],
host_pid: false,
host_network: false,
},
&self.get_k8s_distribution().await?,
);
bundle.apply(self).await?;
debug!("Created privileged pod bundle '{name}'");
let result = self.wait_for_pod_completion(&name, ns).await;
debug!("Cleaning up '{name}'");
let _ = bundle.delete(self).await;
let _ = cm_api.delete(&name, &DeleteParams::default()).await;
result
}
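The writer script assembled inside `write_files_to_node` can be reproduced in a std-only sketch (the `NodeFile` here mirrors the crate's type; the sample path is illustrative):

```rust
// Rebuilds the mkdir/cp/chmod sequence the writer pod executes for
// each file; content itself travels via the ConfigMap, not the script.
#[allow(dead_code)]
struct NodeFile {
    path: String,
    content: String,
    mode: u32,
}

fn build_script(files: &[NodeFile]) -> String {
    let mut script = String::from("set -e\n");
    for (i, file) in files.iter().enumerate() {
        let key = format!("f{i}");
        script.push_str(&format!("mkdir -p \"$(dirname \"/host{}\")\"\n", file.path));
        script.push_str(&format!("cp \"/payload/{key}\" \"/host{}\"\n", file.path));
        // {:o} renders the mode in octal, e.g. 0o600 -> "600".
        script.push_str(&format!("chmod {:o} \"/host{}\"\n", file.mode, file.path));
    }
    script
}

fn main() {
    let files = [NodeFile {
        path: "/etc/motd".into(),
        content: "hello".into(),
        mode: 0o600,
    }];
    let s = build_script(&files);
    assert!(s.starts_with("set -e\n"));
    assert!(s.contains("cp \"/payload/f0\" \"/host/etc/motd\""));
    assert!(s.contains("chmod 600 \"/host/etc/motd\""));
    println!("{s}");
}
```

Every host path is prefixed with `/host`, the mount point of the node's root filesystem inside the privileged pod.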
/// Run a privileged command on a node via an ephemeral pod.
pub async fn run_privileged_command_on_node(
&self,
node_name: &str,
command: &str,
) -> Result<String, Error> {
let namespace = self.client.default_namespace();
let suffix = SystemTime::now()
.duration_since(UNIX_EPOCH)
.unwrap()
.as_millis();
let name = format!("harmony-k8s-cmd-{suffix}");
debug!("Running privileged command on '{node_name}': {command}");
let (host_vol, host_mount) = helper::host_root_volume();
let bundle = helper::build_privileged_bundle(
PrivilegedPodConfig {
name: name.clone(),
namespace: namespace.to_string(),
node_name: node_name.to_string(),
container_name: "runner".to_string(),
command: vec![
"/bin/bash".to_string(),
"-c".to_string(),
command.to_string(),
],
volumes: vec![host_vol],
volume_mounts: vec![host_mount],
host_pid: true,
host_network: true,
},
&self.get_k8s_distribution().await?,
);
bundle.apply(self).await?;
debug!("Privileged pod '{name}' created");
let result = self.wait_for_pod_completion(&name, namespace).await;
debug!("Cleaning up '{name}'");
let _ = bundle.delete(self).await;
result
}
}
// ── Tests ────────────────────────────────────────────────────────────────────
#[cfg(test)]
mod tests {
use k8s_openapi::api::core::v1::{EmptyDirVolumeSource, PodSpec, PodStatus, Volume};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ObjectMeta, OwnerReference};
use super::*;
fn base_pod(name: &str, ns: &str) -> Pod {
Pod {
metadata: ObjectMeta {
name: Some(name.to_string()),
namespace: Some(ns.to_string()),
..Default::default()
},
spec: Some(PodSpec::default()),
status: Some(PodStatus {
phase: Some("Running".to_string()),
..Default::default()
}),
}
}
fn mirror_pod(name: &str, ns: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.metadata.annotations = Some(std::collections::BTreeMap::from([(
"kubernetes.io/config.mirror".to_string(),
"abc123".to_string(),
)]));
pod
}
fn daemonset_pod(name: &str, ns: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.metadata.owner_references = Some(vec![OwnerReference {
api_version: "apps/v1".to_string(),
kind: "DaemonSet".to_string(),
name: "some-ds".to_string(),
uid: "uid-ds".to_string(),
..Default::default()
}]);
pod
}
fn emptydir_pod(name: &str, ns: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.spec = Some(PodSpec {
volumes: Some(vec![Volume {
name: "scratch".to_string(),
empty_dir: Some(EmptyDirVolumeSource::default()),
..Default::default()
}]),
..Default::default()
});
pod
}
fn completed_pod(name: &str, ns: &str, phase: &str) -> Pod {
let mut pod = base_pod(name, ns);
pod.status = Some(PodStatus {
phase: Some(phase.to_string()),
..Default::default()
});
pod
}
fn default_opts() -> DrainOptions {
DrainOptions::default()
}
// All test bodies are identical to the original — only the module path changed.
#[test]
fn empty_pod_list_returns_empty_vecs() {
let (e, s) = K8sClient::classify_pods_for_drain(&[], &default_opts()).unwrap();
assert!(e.is_empty());
assert!(s.is_empty());
}
#[test]
fn normal_pod_is_evictable() {
let pods = vec![base_pod("web", "default")];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
assert_eq!(e.len(), 1);
assert!(s.is_empty());
}
#[test]
fn mirror_pod_is_skipped() {
let pods = vec![mirror_pod("kube-apiserver", "kube-system")];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
assert!(e.is_empty());
assert!(s[0].contains("mirror pod"));
}
#[test]
fn completed_pods_are_skipped() {
for phase in ["Succeeded", "Failed"] {
let pods = vec![completed_pod("job", "batch", phase)];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
assert!(e.is_empty());
assert!(s[0].contains("completed"));
}
}
#[test]
fn daemonset_skipped_when_ignored() {
let pods = vec![daemonset_pod("fluentd", "logging")];
let opts = DrainOptions {
ignore_daemonsets: true,
..default_opts()
};
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
assert!(e.is_empty());
assert!(s[0].contains("DaemonSet-managed"));
}
#[test]
fn daemonset_blocks_when_not_ignored() {
let pods = vec![daemonset_pod("fluentd", "logging")];
let opts = DrainOptions {
ignore_daemonsets: false,
..default_opts()
};
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
assert!(err.contains("DaemonSet") && err.contains("logging/fluentd"));
}
#[test]
fn emptydir_blocks_without_flag() {
let pods = vec![emptydir_pod("cache", "default")];
let opts = DrainOptions {
delete_emptydir_data: false,
..default_opts()
};
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
assert!(err.contains("emptyDir") && err.contains("default/cache"));
}
#[test]
fn emptydir_evictable_with_flag() {
let pods = vec![emptydir_pod("cache", "default")];
let opts = DrainOptions {
delete_emptydir_data: true,
..default_opts()
};
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap();
assert_eq!(e.len(), 1);
assert!(s.is_empty());
}
#[test]
fn multiple_blocking_all_reported() {
let pods = vec![daemonset_pod("ds", "ns1"), emptydir_pod("ed", "ns2")];
let opts = DrainOptions {
ignore_daemonsets: false,
delete_emptydir_data: false,
..default_opts()
};
let err = K8sClient::classify_pods_for_drain(&pods, &opts).unwrap_err();
assert!(err.contains("ns1/ds") && err.contains("ns2/ed"));
}
#[test]
fn mixed_pods_classified_correctly() {
let pods = vec![
base_pod("web", "default"),
mirror_pod("kube-apiserver", "kube-system"),
daemonset_pod("fluentd", "logging"),
completed_pod("job", "batch", "Succeeded"),
base_pod("api", "default"),
];
let (e, s) = K8sClient::classify_pods_for_drain(&pods, &default_opts()).unwrap();
let names: Vec<&str> = e
.iter()
.map(|p| p.metadata.name.as_deref().unwrap())
.collect();
assert_eq!(names, vec!["web", "api"]);
assert_eq!(s.len(), 3);
}
#[test]
fn mirror_checked_before_completed() {
let mut pod = mirror_pod("static-etcd", "kube-system");
pod.status = Some(PodStatus {
phase: Some("Succeeded".to_string()),
..Default::default()
});
let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
assert!(s[0].contains("mirror pod"), "got: {}", s[0]);
}
#[test]
fn completed_checked_before_daemonset() {
let mut pod = daemonset_pod("collector", "monitoring");
pod.status = Some(PodStatus {
phase: Some("Failed".to_string()),
..Default::default()
});
let (_, s) = K8sClient::classify_pods_for_drain(&[pod], &default_opts()).unwrap();
assert!(s[0].contains("completed"), "got: {}", s[0]);
}
}

harmony-k8s/src/pod.rs Normal file

@@ -0,0 +1,193 @@
use std::time::Duration;
use k8s_openapi::api::core::v1::Pod;
use kube::{
Error,
api::{Api, AttachParams, ListParams},
error::DiscoveryError,
runtime::reflector::Lookup,
};
use log::debug;
use tokio::io::AsyncReadExt;
use tokio::time::sleep;
use crate::client::K8sClient;
impl K8sClient {
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
api.get_opt(name).await
}
pub async fn wait_for_pod_ready(
&self,
pod_name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let mut elapsed = 0u64;
let interval = 5u64;
let timeout_secs = 120u64;
loop {
if let Some(p) = self.get_pod(pod_name, namespace).await? {
if let Some(phase) = p.status.and_then(|s| s.phase) {
if phase.to_lowercase() == "running" {
return Ok(());
}
}
}
if elapsed >= timeout_secs {
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Pod '{}' in '{}' did not become ready within {timeout_secs}s",
pod_name,
namespace.unwrap_or("<default>"),
))));
}
sleep(Duration::from_secs(interval)).await;
elapsed += interval;
}
}
/// Polls a pod until it reaches `Succeeded` or `Failed`, then returns its
/// logs. Used internally by node operations.
pub(crate) async fn wait_for_pod_completion(
&self,
name: &str,
namespace: &str,
) -> Result<String, Error> {
let api: Api<Pod> = Api::namespaced(self.client.clone(), namespace);
let poll_interval = Duration::from_secs(2);
for _ in 0..60 {
sleep(poll_interval).await;
let p = api.get(name).await?;
match p.status.and_then(|s| s.phase).as_deref() {
Some("Succeeded") => {
let logs = api
.logs(name, &Default::default())
.await
.unwrap_or_default();
debug!("Pod {namespace}/{name} succeeded. Logs: {logs}");
return Ok(logs);
}
Some("Failed") => {
let logs = api
.logs(name, &Default::default())
.await
.unwrap_or_default();
debug!("Pod {namespace}/{name} failed. Logs: {logs}");
return Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Pod '{name}' failed.\n{logs}"
))));
}
_ => {}
}
}
Err(Error::Discovery(DiscoveryError::MissingResource(format!(
"Timed out waiting for pod '{name}'"
))))
}
/// Execute a command in the first pod matching `{label}={name}`.
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
let pod_list = api
.list(&ListParams::default().labels(&format!("{label}={name}")))
.await
.expect("Failed to list pods");
let pod_name = pod_list
.items
.first()
.expect("No matching pod")
.name()
.expect("Pod has no name")
.into_owned();
match api
.exec(
&pod_name,
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await
{
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("No status handle")
.await
.expect("Status channel closed");
if let Some(s) = status.status {
let mut buf = String::new();
if let Some(mut stdout) = process.stdout() {
stdout
.read_to_string(&mut buf)
.await
.map_err(|e| format!("Failed to read stdout: {e}"))?;
}
debug!("exec status: {} - {:?}", s, status.details);
if s == "Success" { Ok(buf) } else { Err(s) }
} else {
Err("No inner status from pod exec".to_string())
}
}
}
}
/// Execute a command in the first pod matching
/// `app.kubernetes.io/name={name}`.
pub async fn exec_app(
&self,
name: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<(), String> {
let api: Api<Pod> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
let pod_list = api
.list(&ListParams::default().labels(&format!("app.kubernetes.io/name={name}")))
.await
.expect("Failed to list pods");
let pod_name = pod_list
.items
.first()
.expect("No matching pod")
.name()
.expect("Pod has no name")
.into_owned();
match api.exec(&pod_name, command, &AttachParams::default()).await {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("No status handle")
.await
.expect("Status channel closed");
if let Some(s) = status.status {
debug!("exec status: {} - {:?}", s, status.details);
if s == "Success" { Ok(()) } else { Err(s) }
} else {
Err("No inner status from pod exec".to_string())
}
}
}
}
}


@@ -0,0 +1,316 @@
use std::collections::HashMap;
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{Node, ServiceAccount},
};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::ApiResource;
use kube::{
Error, Resource,
api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList},
runtime::conditions,
runtime::wait::await_condition,
};
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::Value;
use std::time::Duration;
use crate::client::K8sClient;
use crate::types::ScopeResolver;
impl K8sClient {
pub async fn has_healthy_deployment_with_label(
&self,
namespace: &str,
label_selector: &str,
) -> Result<bool, Error> {
let api: Api<Deployment> = Api::namespaced(self.client.clone(), namespace);
let list = api
.list(&ListParams::default().labels(label_selector))
.await?;
for d in list.items {
let available = d
.status
.as_ref()
.and_then(|s| s.available_replicas)
.unwrap_or(0);
if available > 0 {
return Ok(true);
}
if let Some(conds) = d.status.as_ref().and_then(|s| s.conditions.as_ref()) {
if conds
.iter()
.any(|c| c.type_ == "Available" && c.status == "True")
{
return Ok(true);
}
}
}
Ok(false)
}
pub async fn list_namespaces_with_healthy_deployments(
&self,
label_selector: &str,
) -> Result<Vec<String>, Error> {
let api: Api<Deployment> = Api::all(self.client.clone());
let list = api
.list(&ListParams::default().labels(label_selector))
.await?;
let mut healthy_ns: HashMap<String, bool> = HashMap::new();
for d in list.items {
let ns = match d.metadata.namespace.clone() {
Some(n) => n,
None => continue,
};
let available = d
.status
.as_ref()
.and_then(|s| s.available_replicas)
.unwrap_or(0);
let is_healthy = if available > 0 {
true
} else {
d.status
.as_ref()
.and_then(|s| s.conditions.as_ref())
.map(|c| {
c.iter()
.any(|c| c.type_ == "Available" && c.status == "True")
})
.unwrap_or(false)
};
if is_healthy {
healthy_ns.insert(ns, true);
}
}
Ok(healthy_ns.into_keys().collect())
}
pub async fn get_controller_service_account_name(
&self,
ns: &str,
) -> Result<Option<String>, Error> {
let api: Api<Deployment> = Api::namespaced(self.client.clone(), ns);
let list = api
.list(&ListParams::default().labels("app.kubernetes.io/component=controller"))
.await?;
if let Some(dep) = list.items.first() {
if let Some(sa) = dep
.spec
.as_ref()
.and_then(|s| s.template.spec.as_ref())
.and_then(|s| s.service_account_name.clone())
{
return Ok(Some(sa));
}
}
Ok(None)
}
pub async fn list_clusterrolebindings_json(&self) -> Result<Vec<Value>, Error> {
let gvk = GroupVersionKind::gvk("rbac.authorization.k8s.io", "v1", "ClusterRoleBinding");
let ar = ApiResource::from_gvk(&gvk);
let api: Api<DynamicObject> = Api::all_with(self.client.clone(), &ar);
let list = api.list(&ListParams::default()).await?;
Ok(list
.items
.into_iter()
.map(|o| serde_json::to_value(&o).unwrap_or(Value::Null))
.collect())
}
pub async fn is_service_account_cluster_wide(&self, sa: &str, ns: &str) -> Result<bool, Error> {
let sa_user = format!("system:serviceaccount:{ns}:{sa}");
for crb in self.list_clusterrolebindings_json().await? {
if let Some(subjects) = crb.get("subjects").and_then(|s| s.as_array()) {
for subj in subjects {
let kind = subj.get("kind").and_then(|v| v.as_str()).unwrap_or("");
let name = subj.get("name").and_then(|v| v.as_str()).unwrap_or("");
let subj_ns = subj.get("namespace").and_then(|v| v.as_str()).unwrap_or("");
if (kind == "ServiceAccount" && name == sa && subj_ns == ns)
|| (kind == "User" && name == sa_user)
{
return Ok(true);
}
}
}
}
Ok(false)
}
pub async fn has_crd(&self, name: &str) -> Result<bool, Error> {
let api: Api<CustomResourceDefinition> = Api::all(self.client.clone());
let crds = api
.list(&ListParams::default().fields(&format!("metadata.name={name}")))
.await?;
Ok(!crds.items.is_empty())
}
pub async fn service_account_api(&self, namespace: &str) -> Api<ServiceAccount> {
Api::namespaced(self.client.clone(), namespace)
}
pub async fn get_resource_json_value(
&self,
name: &str,
namespace: Option<&str>,
gvk: &GroupVersionKind,
) -> Result<DynamicObject, Error> {
let ar = ApiResource::from_gvk(gvk);
let api: Api<DynamicObject> = match namespace {
Some(ns) => Api::namespaced_with(self.client.clone(), ns, &ar),
None => Api::default_namespaced_with(self.client.clone(), &ar),
};
api.get(name).await
}
pub async fn get_secret_json_value(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<DynamicObject, Error> {
self.get_resource_json_value(
name,
namespace,
&GroupVersionKind {
group: String::new(),
version: "v1".to_string(),
kind: "Secret".to_string(),
},
)
.await
}
pub async fn get_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let api: Api<Deployment> = match namespace {
Some(ns) => {
debug!("Getting namespaced deployment '{name}' in '{ns}'");
Api::namespaced(self.client.clone(), ns)
}
None => {
debug!("Getting deployment '{name}' in default namespace");
Api::default_namespaced(self.client.clone())
}
};
api.get_opt(name).await
}
pub async fn scale_deployment(
&self,
name: &str,
namespace: Option<&str>,
replicas: u32,
) -> Result<(), Error> {
let api: Api<Deployment> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
use kube::api::{Patch, PatchParams};
use serde_json::json;
let patch = json!({ "spec": { "replicas": replicas } });
api.patch_scale(name, &PatchParams::default(), &Patch::Merge(&patch))
.await?;
Ok(())
}
pub async fn delete_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let api: Api<Deployment> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
api.delete(name, &kube::api::DeleteParams::default())
.await?;
Ok(())
}
pub async fn wait_until_deployment_ready(
&self,
name: &str,
namespace: Option<&str>,
timeout: Option<Duration>,
) -> Result<(), String> {
let api: Api<Deployment> = match namespace {
Some(ns) => Api::namespaced(self.client.clone(), ns),
None => Api::default_namespaced(self.client.clone()),
};
let timeout = timeout.unwrap_or(Duration::from_secs(120));
let establish = await_condition(api, name, conditions::is_deployment_completed());
tokio::time::timeout(timeout, establish)
.await
.map(|_| ())
.map_err(|_| "Timed out waiting for deployment".to_string())
}
/// Gets a single named resource, using the correct API scope for `K`.
pub async fn get_resource<K>(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ScopeResolver<K>,
<K as Resource>::DynamicType: Default,
{
let api: Api<K> =
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
api.get_opt(name).await
}
pub async fn list_resources<K>(
&self,
namespace: Option<&str>,
list_params: Option<ListParams>,
) -> Result<ObjectList<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ScopeResolver<K>,
<K as Resource>::DynamicType: Default,
{
let api: Api<K> =
<<K as Resource>::Scope as ScopeResolver<K>>::get_api(&self.client, namespace);
api.list(&list_params.unwrap_or_default()).await
}
pub async fn list_all_resources_with_labels<K>(&self, labels: &str) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::DynamicType: Default,
{
Api::<K>::all(self.client.clone())
.list(&ListParams::default().labels(labels))
.await
.map(|l| l.items)
}
pub async fn get_all_resource_in_all_namespace<K>(&self) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned,
<K as Resource>::Scope: ScopeResolver<K>,
<K as Resource>::DynamicType: Default,
{
Api::<K>::all(self.client.clone())
.list(&Default::default())
.await
.map(|l| l.items)
}
pub async fn get_nodes(
&self,
list_params: Option<ListParams>,
) -> Result<ObjectList<Node>, Error> {
self.list_resources(None, list_params).await
}
}

harmony-k8s/src/types.rs Normal file

@@ -0,0 +1,100 @@
use std::time::Duration;
use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope};
use kube::{Api, Client, Resource};
use serde::Serialize;
/// Which Kubernetes distribution is running. Detected once at runtime via
/// [`crate::K8sClient::get_k8s_distribution`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub enum KubernetesDistribution {
Default,
OpenshiftFamily,
K3sFamily,
}
/// A file to be written to a node's filesystem.
#[derive(Debug, Clone)]
pub struct NodeFile {
/// Absolute path on the host where the file should be written.
pub path: String,
/// Content of the file.
pub content: String,
/// UNIX permissions (e.g. `0o600`).
pub mode: u32,
}
/// Options controlling the behaviour of a [`crate::K8sClient::drain_node`] operation.
#[derive(Debug, Clone)]
pub struct DrainOptions {
/// Evict pods that use `emptyDir` volumes (ephemeral data is lost).
/// Equivalent to `kubectl drain --delete-emptydir-data`.
pub delete_emptydir_data: bool,
/// Silently skip DaemonSet-managed pods instead of blocking the drain.
/// Equivalent to `kubectl drain --ignore-daemonsets`.
pub ignore_daemonsets: bool,
/// Maximum wall-clock time to wait for all evictions to complete.
pub timeout: Duration,
}
impl Default for DrainOptions {
fn default() -> Self {
Self {
delete_emptydir_data: false,
ignore_daemonsets: true,
timeout: Duration::from_secs(1),
}
}
}
impl DrainOptions {
pub fn default_ignore_daemonset_delete_emptydir_data() -> Self {
Self {
delete_emptydir_data: true,
ignore_daemonsets: true,
..Self::default()
}
}
}
/// Controls how [`crate::K8sClient::apply_with_strategy`] behaves when the
/// resource already exists (or does not).
pub enum WriteMode {
/// Server-side apply; create if absent, update if present (default).
CreateOrUpdate,
/// POST only; return an error if the resource already exists.
Create,
/// Server-side apply only; return an error if the resource does not exist.
Update,
}
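The three `WriteMode` variants differ only in how they treat a pre-existing resource. A dependency-free sketch of that decision table (the `resolve_action` helper is hypothetical, not part of the crate):

```rust
// Local mirror of WriteMode, for illustration.
enum WriteMode {
    CreateOrUpdate,
    Create,
    Update,
}

/// Decide what an apply should do given whether the resource already exists,
/// matching the semantics described in the enum's doc comments.
fn resolve_action(mode: &WriteMode, exists: bool) -> Result<&'static str, &'static str> {
    match (mode, exists) {
        (WriteMode::CreateOrUpdate, false) => Ok("create"),
        (WriteMode::CreateOrUpdate, true) => Ok("update"),
        (WriteMode::Create, false) => Ok("create"),
        (WriteMode::Create, true) => Err("already exists"),
        (WriteMode::Update, true) => Ok("update"),
        (WriteMode::Update, false) => Err("not found"),
    }
}

fn main() {
    // Only the two strict modes can fail; the default mode never does.
    assert_eq!(resolve_action(&WriteMode::Create, true), Err("already exists"));
    assert_eq!(resolve_action(&WriteMode::Update, false), Err("not found"));
    assert_eq!(resolve_action(&WriteMode::CreateOrUpdate, true), Ok("update"));
    assert_eq!(resolve_action(&WriteMode::CreateOrUpdate, false), Ok("create"));
}
```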
// ── Scope resolution trait ───────────────────────────────────────────────────
/// Resolves the correct [`kube::Api`] for a resource type based on its scope
/// (cluster-wide vs. namespace-scoped).
pub trait ScopeResolver<K: Resource> {
fn get_api(client: &Client, ns: Option<&str>) -> Api<K>;
}
impl<K> ScopeResolver<K> for ClusterResourceScope
where
K: Resource<Scope = ClusterResourceScope>,
<K as Resource>::DynamicType: Default,
{
fn get_api(client: &Client, _ns: Option<&str>) -> Api<K> {
Api::all(client.clone())
}
}
impl<K> ScopeResolver<K> for NamespaceResourceScope
where
K: Resource<Scope = NamespaceResourceScope>,
<K as Resource>::DynamicType: Default,
{
fn get_api(client: &Client, ns: Option<&str>) -> Api<K> {
match ns {
Some(ns) => Api::namespaced(client.clone(), ns),
None => Api::default_namespaced(client.clone()),
}
}
}
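The two impls above statically select the right `Api` constructor through the resource's scope marker type. A dependency-free sketch of the same dispatch pattern, with local marker types standing in for `ClusterResourceScope` / `NamespaceResourceScope` (the URL paths are illustrative only, not kube's real API surface):

```rust
// Marker types playing the role of the k8s_openapi scope markers.
struct ClusterScope;
struct NamespacedScope;

// Analogue of ScopeResolver: each scope marker knows how to address a resource.
trait ScopeResolver {
    fn api_path(ns: Option<&str>) -> String;
}

impl ScopeResolver for ClusterScope {
    fn api_path(_ns: Option<&str>) -> String {
        // Cluster-scoped resources ignore any namespace argument.
        "/api/v1/<resource>".to_string()
    }
}

impl ScopeResolver for NamespacedScope {
    fn api_path(ns: Option<&str>) -> String {
        // Fall back to a default namespace when none is given,
        // mirroring Api::default_namespaced above.
        format!("/api/v1/namespaces/{}/<resource>", ns.unwrap_or("default"))
    }
}

fn main() {
    assert_eq!(ClusterScope::api_path(Some("ignored")), "/api/v1/<resource>");
    assert_eq!(
        NamespacedScope::api_path(Some("team-a")),
        "/api/v1/namespaces/team-a/<resource>"
    );
    assert_eq!(
        NamespacedScope::api_path(None),
        "/api/v1/namespaces/default/<resource>"
    );
}
```

The point of the pattern is that scope resolution happens at compile time: a caller generic over `K: Resource` never has to branch on the scope at runtime.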

View File

@@ -21,6 +21,8 @@ semver = "1.0.23"
serde.workspace = true
serde_json.workspace = true
tokio.workspace = true
tokio-retry.workspace = true
tokio-util.workspace = true
derive-new.workspace = true
log.workspace = true
env_logger.workspace = true
@@ -31,6 +33,7 @@ opnsense-config-xml = { path = "../opnsense-config-xml" }
harmony_macros = { path = "../harmony_macros" }
harmony_types = { path = "../harmony_types" }
harmony_execution = { path = "../harmony_execution" }
harmony-k8s = { path = "../harmony-k8s" }
uuid.workspace = true
url.workspace = true
kube = { workspace = true, features = ["derive"] }
@@ -60,7 +63,6 @@ temp-dir = "0.1.14"
dyn-clone = "1.0.19"
similar.workspace = true
futures-util = "0.3.31"
tokio-util = "0.7.15"
strum = { version = "0.27.1", features = ["derive"] }
tempfile.workspace = true
serde_with = "3.14.0"
@@ -80,7 +82,7 @@ sqlx.workspace = true
inquire.workspace = true
brocade = { path = "../brocade" }
option-ext = "0.2.0"
tokio-retry = "0.3.0"
rand.workspace = true
[dev-dependencies]
pretty_assertions.workspace = true

View File

@@ -4,8 +4,6 @@ use std::error::Error;
use async_trait::async_trait;
use derive_new::new;
use crate::inventory::HostRole;
use super::{
data::Version, executors::ExecutorError, inventory::Inventory, topology::PreparationError,
};

View File

@@ -1,4 +1,5 @@
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use harmony_macros::ip;
use harmony_types::{
id::Id,
@@ -8,7 +9,7 @@ use harmony_types::{
use log::debug;
use log::info;
use crate::topology::PxeOptions;
use crate::topology::{HelmCommand, PxeOptions};
use crate::{data::FileContent, executors::ExecutorError, topology::node_exporter::NodeExporter};
use crate::{infra::network_manager::OpenShiftNmStateNetworkManager, topology::PortConfig};
@@ -16,9 +17,12 @@ use super::{
DHCPStaticEntry, DhcpServer, DnsRecord, DnsRecordType, DnsServer, Firewall, HostNetworkConfig,
HttpServer, IpAddress, K8sclient, LoadBalancer, LoadBalancerService, LogicalHost, NetworkError,
NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
SwitchError, TftpServer, Topology, k8s::K8sClient,
SwitchError, TftpServer, Topology,
};
use std::{
process::Command,
sync::{Arc, OnceLock},
};
use std::sync::{Arc, OnceLock};
#[derive(Debug, Clone)]
pub struct HAClusterTopology {
@@ -52,6 +56,30 @@ impl Topology for HAClusterTopology {
}
}
impl HelmCommand for HAClusterTopology {
fn get_helm_command(&self) -> Command {
let mut cmd = Command::new("helm");
if let Some(k) = &self.kubeconfig {
cmd.args(["--kubeconfig", k]);
}
// FIXME we should support context anywhere there is a k8sclient
// This likely belongs in the k8sclient itself and should be extracted to a separate
// crate
//
// I feel like helm could very well be a feature of this external k8s client.
//
// Same for kustomize
//
// if let Some(c) = &self.k8s_context {
// cmd.args(["--kube-context", c]);
// }
info!("Using helm command {cmd:?}");
cmd
}
}
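The `get_helm_command` implementation above conditionally appends `--kubeconfig`. A minimal standalone sketch of that logic using only `std::process::Command` (the free function is hypothetical; the real method lives on `HAClusterTopology`):

```rust
use std::process::Command;

// Build a `helm` invocation, appending `--kubeconfig` only when one is set.
fn helm_command(kubeconfig: Option<&str>) -> Command {
    let mut cmd = Command::new("helm");
    if let Some(k) = kubeconfig {
        cmd.args(["--kubeconfig", k]);
    }
    cmd
}

fn main() {
    let cmd = helm_command(Some("/tmp/kubeconfig"));
    // get_args lets us inspect the accumulated arguments without spawning.
    let args: Vec<&str> = cmd.get_args().map(|a| a.to_str().unwrap()).collect();
    assert_eq!(args, ["--kubeconfig", "/tmp/kubeconfig"]);

    let bare = helm_command(None);
    assert_eq!(bare.get_args().count(), 0);
}
```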
#[async_trait]
impl K8sclient for HAClusterTopology {
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {

File diff suppressed because it is too large

View File

@@ -2,6 +2,7 @@ use std::{collections::BTreeMap, process::Command, sync::Arc, time::Duration};
use async_trait::async_trait;
use base64::{Engine, engine::general_purpose};
use harmony_k8s::{K8sClient, KubernetesDistribution};
use harmony_types::rfc1123::Rfc1123Name;
use k8s_openapi::api::{
core::v1::{Pod, Secret},
@@ -58,7 +59,6 @@ use crate::{
use super::super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,
oberservability::monitoring::AlertReceiver,
tenant::{
TenantConfig, TenantManager,
@@ -76,13 +76,6 @@ struct K8sState {
message: String,
}
#[derive(Debug, Clone, Serialize)]
pub enum KubernetesDistribution {
OpenshiftFamily,
K3sFamily,
Default,
}
#[derive(Debug, Clone)]
enum K8sSource {
LocalK3d,
@@ -116,6 +109,13 @@ impl K8sclient for K8sAnywhereTopology {
#[async_trait]
impl TlsRouter for K8sAnywhereTopology {
async fn get_public_domain(&self) -> Result<String, String> {
match &self.config.public_domain {
Some(public_domain) => Ok(public_domain.to_string()),
None => Err("Public domain not available".to_string()),
}
}
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
match self.get_k8s_distribution().await.map_err(|e| {
format!(
@@ -1131,6 +1131,7 @@ pub struct K8sAnywhereConfig {
///
/// If the context name is not found, it will fail to initialize.
pub k8s_context: Option<String>,
public_domain: Option<String>,
}
impl K8sAnywhereConfig {
@@ -1158,6 +1159,7 @@ impl K8sAnywhereConfig {
let mut kubeconfig: Option<String> = None;
let mut k8s_context: Option<String> = None;
let mut public_domain: Option<String> = None;
for part in env_var_value.split(',') {
let kv: Vec<&str> = part.splitn(2, '=').collect();
@@ -1165,6 +1167,7 @@ impl K8sAnywhereConfig {
match kv[0].trim() {
"kubeconfig" => kubeconfig = Some(kv[1].trim().to_string()),
"context" => k8s_context = Some(kv[1].trim().to_string()),
"public_domain" => public_domain = Some(kv[1].trim().to_string()),
_ => {}
}
}
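The loop above parses a comma-separated `key=value` config string, now including `public_domain`. A standalone sketch of that parsing (collecting into a map rather than named fields, and reading from a literal instead of the environment):

```rust
use std::collections::HashMap;

// Parse "k=v,k=v" pairs; `splitn(2, '=')` keeps any '=' inside the value.
fn parse_config(env_var_value: &str) -> HashMap<String, String> {
    let mut out = HashMap::new();
    for part in env_var_value.split(',') {
        let kv: Vec<&str> = part.splitn(2, '=').collect();
        if kv.len() == 2 {
            out.insert(kv[0].trim().to_string(), kv[1].trim().to_string());
        }
    }
    out
}

fn main() {
    let cfg = parse_config("kubeconfig=/tmp/kc, context=prod, public_domain=example.com");
    assert_eq!(cfg.get("kubeconfig").map(String::as_str), Some("/tmp/kc"));
    assert_eq!(cfg.get("context").map(String::as_str), Some("prod"));
    assert_eq!(cfg.get("public_domain").map(String::as_str), Some("example.com"));
    // Malformed parts (no '=') are skipped, matching the `kv.len() == 2`-style guard.
    assert!(parse_config("oops").is_empty());
}
```

In the real code the `match` on the key additionally drops unrecognized keys; this sketch keeps them all for simplicity.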
@@ -1182,6 +1185,7 @@ impl K8sAnywhereConfig {
K8sAnywhereConfig {
kubeconfig,
k8s_context,
public_domain,
use_system_kubeconfig,
autoinstall: false,
use_local_k3d: false,
@@ -1224,6 +1228,7 @@ impl K8sAnywhereConfig {
use_local_k3d: std::env::var("HARMONY_USE_LOCAL_K3D")
.map_or_else(|_| true, |v| v.parse().ok().unwrap_or(true)),
k8s_context: std::env::var("HARMONY_K8S_CONTEXT").ok(),
public_domain: std::env::var("HARMONY_PUBLIC_DOMAIN").ok(),
}
}
}

View File

@@ -1,7 +1,6 @@
use async_trait::async_trait;
use crate::{
interpret::Outcome,
inventory::Inventory,
modules::postgresql::{
K8sPostgreSQLScore,

View File

@@ -106,6 +106,7 @@ pub enum SSL {
#[derive(Debug, Clone, PartialEq, Serialize)]
pub enum HealthCheck {
HTTP(String, HttpMethod, HttpStatusCode, SSL),
/// e.g. `HTTP(None, "/healthz/ready", HttpMethod::GET, HttpStatusCode::Success2xx, SSL::Disabled)`
HTTP(Option<u16>, String, HttpMethod, HttpStatusCode, SSL),
TCP(Option<u16>),
}

View File

@@ -16,7 +16,6 @@ pub mod tenant;
use derive_new::new;
pub use k8s_anywhere::*;
pub use localhost::*;
pub mod k8s;
mod load_balancer;
pub mod router;
mod tftp;

View File

@@ -9,6 +9,7 @@ use std::{
use async_trait::async_trait;
use brocade::PortOperatingMode;
use derive_new::new;
use harmony_k8s::K8sClient;
use harmony_types::{
id::Id,
net::{IpAddress, MacAddress},
@@ -18,7 +19,7 @@ use serde::Serialize;
use crate::executors::ExecutorError;
use super::{LogicalHost, k8s::K8sClient};
use super::LogicalHost;
#[derive(Debug)]
pub struct DHCPStaticEntry {

View File

@@ -122,4 +122,6 @@ pub trait TlsRouter: Send + Sync {
/// Returns the port that this router exposes externally.
async fn get_router_port(&self) -> u16;
async fn get_public_domain(&self) -> Result<String, String>;
}

View File

@@ -1,10 +1,8 @@
use std::sync::Arc;
use crate::{
executors::ExecutorError,
topology::k8s::{ApplyStrategy, K8sClient},
};
use crate::executors::ExecutorError;
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use k8s_openapi::{
api::{
core::v1::{LimitRange, Namespace, ResourceQuota},
@@ -14,7 +12,7 @@ use k8s_openapi::{
},
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::{Resource, api::DynamicObject};
use kube::Resource;
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::json;
@@ -59,7 +57,6 @@ impl K8sTenantManager {
) -> Result<K, ExecutorError>
where
<K as kube::Resource>::DynamicType: Default,
<K as kube::Resource>::Scope: ApplyStrategy<K>,
{
self.apply_labels(&mut resource, config);
self.k8s_client

View File

@@ -5,6 +5,7 @@ use std::{
use askama::Template;
use async_trait::async_trait;
use harmony_k8s::{DrainOptions, K8sClient, NodeFile};
use harmony_types::id::Id;
use k8s_openapi::api::core::v1::Node;
use kube::{
@@ -15,10 +16,7 @@ use log::{debug, info, warn};
use crate::{
modules::okd::crd::nmstate,
topology::{
HostNetworkConfig, NetworkError, NetworkManager,
k8s::{DrainOptions, K8sClient, NodeFile},
},
topology::{HostNetworkConfig, NetworkError, NetworkManager},
};
/// NetworkManager bond configuration template

View File

@@ -216,7 +216,15 @@ pub(crate) fn get_health_check_for_backend(
SSL::Other(other.to_string())
}
};
Some(HealthCheck::HTTP(path, method, status_code, ssl))
let port = haproxy_health_check
.checkport
.content_string()
.parse::<u16>()
.ok();
debug!("Found haproxy healthcheck port {port:?}");
Some(HealthCheck::HTTP(port, path, method, status_code, ssl))
}
_ => panic!("Received unsupported health check type {}", uppercase),
}
@@ -251,7 +259,7 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
// frontend points to backend
let healthcheck = if let Some(health_check) = &service.health_check {
match health_check {
HealthCheck::HTTP(path, http_method, _http_status_code, ssl) => {
HealthCheck::HTTP(port, path, http_method, _http_status_code, ssl) => {
let ssl: MaybeString = match ssl {
SSL::SSL => "ssl".into(),
SSL::SNI => "sslni".into(),
@@ -259,14 +267,21 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
SSL::Default => "".into(),
SSL::Other(other) => other.as_str().into(),
};
let path_without_query = path.split_once('?').map_or(path.as_str(), |(p, _)| p);
let (port, port_name) = match port {
Some(port) => (Some(port.to_string()), port.to_string()),
None => (None, "serverport".to_string()),
};
let haproxy_check = HAProxyHealthCheck {
name: format!("HTTP_{http_method}_{path}"),
name: format!("HTTP_{http_method}_{path_without_query}_{port_name}"),
uuid: Uuid::new_v4().to_string(),
http_method: http_method.to_string().into(),
http_method: http_method.to_string().to_lowercase().into(),
health_check_type: "http".to_string(),
http_uri: path.clone().into(),
interval: "2s".to_string(),
ssl,
checkport: MaybeString::from(port.map(|p| p.to_string())),
..Default::default()
};
@@ -305,7 +320,10 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
let mut backend = HAProxyBackend {
uuid: Uuid::new_v4().to_string(),
enabled: 1,
name: format!("backend_{}", service.listening_port),
name: format!(
"backend_{}",
service.listening_port.to_string().replace(':', "_")
),
algorithm: "roundrobin".to_string(),
random_draws: Some(2),
stickiness_expire: "30m".to_string(),
@@ -337,10 +355,22 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
let frontend = Frontend {
uuid: uuid::Uuid::new_v4().to_string(),
enabled: 1,
name: format!("frontend_{}", service.listening_port),
name: format!(
"frontend_{}",
service.listening_port.to_string().replace(':', "_")
),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: Some(backend.uuid.clone()),
stickiness_expire: "30m".to_string().into(),
stickiness_size: "50k".to_string().into(),
stickiness_conn_rate_period: "10s".to_string().into(),
stickiness_sess_rate_period: "10s".to_string().into(),
stickiness_http_req_rate_period: "10s".to_string().into(),
stickiness_http_err_rate_period: "10s".to_string().into(),
stickiness_bytes_in_rate_period: "1m".to_string().into(),
stickiness_bytes_out_rate_period: "1m".to_string().into(),
ssl_hsts_max_age: 15768000,
..Default::default()
};
info!("HAProxy frontend and backend mode currently hardcoded to tcp");
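The naming fixes in this hunk do two things: strip the query string from the path before embedding it in the health-check name, and replace `:` in the listening socket address so it is safe in a frontend/backend name. A standalone sketch of both rules (the helper functions are hypothetical; the real code builds these strings inline):

```rust
// Health-check name: drop any "?query" suffix, and fall back to the
// literal "serverport" when no explicit check port is configured.
fn health_check_name(method: &str, path: &str, port: Option<u16>) -> String {
    let path_without_query = path.split_once('?').map_or(path, |(p, _)| p);
    let port_name = port.map_or("serverport".to_string(), |p| p.to_string());
    format!("HTTP_{method}_{path_without_query}_{port_name}")
}

// Backend/frontend name: a SocketAddr like "1.2.3.4:443" contains ':',
// which is replaced to keep the generated name well-formed.
fn backend_name(listening: &str) -> String {
    format!("backend_{}", listening.replace(':', "_"))
}

fn main() {
    assert_eq!(
        health_check_name("GET", "/health?check=node_ready", Some(25001)),
        "HTTP_GET_/health_25001"
    );
    assert_eq!(
        health_check_name("GET", "/readyz", None),
        "HTTP_GET_/readyz_serverport"
    );
    assert_eq!(backend_name("192.168.1.1:443"), "backend_192.168.1.1_443");
}
```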

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use log::{debug, info, trace};
use log::{debug, info};
use serde::Serialize;
use std::path::PathBuf;

View File

@@ -1,4 +1,5 @@
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use harmony_macros::hurl;
use log::{debug, info, trace, warn};
use non_blank_string_rs::NonBlankString;
@@ -14,7 +15,7 @@ use crate::{
helm::chart::{HelmChartScore, HelmRepository},
},
score::Score,
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress, k8s::K8sClient},
topology::{HelmCommand, K8sclient, Topology, ingress::Ingress},
};
use harmony_types::id::Id;

View File

@@ -1,6 +1,5 @@
use std::fs::{self};
use std::path::{Path, PathBuf};
use std::process;
use std::path::PathBuf;
use std::sync::Arc;
use async_trait::async_trait;
@@ -65,6 +64,7 @@ pub struct RustWebapp {
///
/// This is the place to put the public host name if this is a public facing webapp.
pub dns: String,
pub version: String,
}
impl Application for RustWebapp {
@@ -465,6 +465,7 @@ impl RustWebapp {
let app_name = &self.name;
let service_port = self.service_port;
let version = &self.version;
// Create Chart.yaml
let chart_yaml = format!(
r#"
@@ -472,7 +473,7 @@ apiVersion: v2
name: {chart_name}
description: A Helm chart for the {app_name} web application.
type: application
version: 0.2.1
version: {version}
appVersion: "{image_tag}"
"#,
);

View File

@@ -1,8 +1,9 @@
use std::sync::Arc;
use harmony_k8s::K8sClient;
use log::{debug, info};
use crate::{interpret::InterpretError, topology::k8s::K8sClient};
use crate::interpret::InterpretError;
#[derive(Clone, Debug, PartialEq, Eq)]
pub enum ArgoScope {

View File

@@ -44,6 +44,12 @@ pub struct BrocadeSwitchAuth {
pub password: String,
}
impl BrocadeSwitchAuth {
pub fn user_pass(username: String, password: String) -> Self {
Self { username, password }
}
}
#[derive(Secret, Clone, Debug, JsonSchema, Serialize, Deserialize)]
pub struct BrocadeSnmpAuth {
pub username: String,

View File

@@ -1,3 +1,4 @@
use harmony_k8s::K8sClient;
use std::sync::Arc;
use async_trait::async_trait;
@@ -11,7 +12,7 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
topology::{K8sclient, Topology},
};
#[derive(Clone, Debug, Serialize)]

View File

@@ -54,6 +54,12 @@ pub enum HarmonyDiscoveryStrategy {
SUBNET { cidr: cidr::Ipv4Cidr, port: u16 },
}
impl Default for HarmonyDiscoveryStrategy {
fn default() -> Self {
HarmonyDiscoveryStrategy::MDNS
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
async fn execute(

View File

@@ -3,7 +3,8 @@ use std::sync::Arc;
use async_trait::async_trait;
use log::warn;
use crate::topology::{FailoverTopology, K8sclient, k8s::K8sClient};
use crate::topology::{FailoverTopology, K8sclient};
use harmony_k8s::K8sClient;
#[async_trait]
impl<T: K8sclient> K8sclient for FailoverTopology<T> {

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use k8s_openapi::NamespaceResourceScope;
use k8s_openapi::ResourceScope;
use kube::Resource;
use log::info;
use serde::{Serialize, de::DeserializeOwned};
@@ -29,7 +29,7 @@ impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
}
impl<
K: Resource<Scope = NamespaceResourceScope>
K: Resource<Scope: ResourceScope>
+ std::fmt::Debug
+ Sync
+ DeserializeOwned
@@ -61,7 +61,7 @@ pub struct K8sResourceInterpret<K: Resource + std::fmt::Debug + Sync + Send> {
#[async_trait]
impl<
K: Resource<Scope = NamespaceResourceScope>
K: Resource<Scope: ResourceScope>
+ Clone
+ std::fmt::Debug
+ DeserializeOwned
@@ -109,7 +109,7 @@ where
topology
.k8s_client()
.await
.expect("Environment should provide enough information to instanciate a client")
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {e}")))?
.apply_many(&self.score.resource, self.score.namespace.as_deref())
.await?;

View File

@@ -15,10 +15,13 @@ pub mod load_balancer;
pub mod monitoring;
pub mod nats;
pub mod network;
pub mod node_health;
pub mod okd;
pub mod openbao;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;
pub mod zitadel;

View File

@@ -6,7 +6,7 @@ use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::{
interpret::{InterpretError, Outcome},
interpret::InterpretError,
inventory::Inventory,
modules::{
monitoring::{
@@ -17,10 +17,10 @@ use crate::{
topology::{
K8sclient, Topology,
installable::Installable,
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertSender, ScrapeTarget},
},
};
use harmony_k8s::K8sClient;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(

View File

@@ -4,10 +4,8 @@ use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::topology::{
k8s::K8sClient,
oberservability::monitoring::{AlertReceiver, AlertSender},
};
use crate::topology::oberservability::monitoring::{AlertReceiver, AlertSender};
use harmony_k8s::K8sClient;
#[derive(CustomResource, Serialize, Deserialize, Debug, Clone, JsonSchema)]
#[kube(

View File

@@ -11,8 +11,9 @@ use crate::{
inventory::Inventory,
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
score::Score,
topology::{HelmCommand, K8sclient, MultiTargetTopology, Topology, k8s::K8sClient},
topology::{HelmCommand, K8sclient, MultiTargetTopology, Topology},
};
use harmony_k8s::K8sClient;
use harmony_types::id::Id;
#[derive(Debug, Clone, Serialize)]

View File

@@ -1,9 +1,7 @@
use std::{collections::BTreeMap, sync::Arc};
use crate::{
interpret::{InterpretError, Outcome},
topology::k8s::K8sClient,
};
use crate::interpret::{InterpretError, Outcome};
use harmony_k8s::K8sClient;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::api::ObjectMeta;

View File

@@ -1,3 +1,7 @@
use std::time::Duration;
use tokio_retry::{Retry, strategy::ExponentialBackoff};
use crate::modules::{
cert_manager::{
capability::{CertificateManagement, CertificateManagementConfig},
@@ -69,9 +73,28 @@ where
.await
.map_err(|e| e.to_string())?;
self.topology
let strategy = ExponentialBackoff::from_millis(250)
.factor(2)
.max_delay(Duration::from_millis(1000))
.take(10);
Retry::spawn(strategy, || async {
log::debug!("Attempting CA cert fetch");
let res = self
.topology
.get_ca_certificate(root_ca_cert_name.into(), &root_ca_config)
.await;
match res {
Ok(cert) => Ok(cert),
Err(e) => {
log::warn!("Retryable error: {:?}", e);
Err(e)
}
}
})
.await
.map_err(|e| e.to_string())
.map_err(|e| format!("Retries exhausted: {:?}", e))
}
}
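The retry above bounds both the per-attempt delay (1s cap) and the attempt count (`take(10)`). A dependency-free sketch of a capped doubling schedule with those parameters; note this is an illustration of the shape of the schedule, since tokio-retry's `ExponentialBackoff` derives its delays from powers of the base rather than plain doubling, so the exact values may differ:

```rust
use std::time::Duration;

// Capped geometric backoff: delay grows by `factor` each attempt,
// clamped to `max_ms`, for a fixed number of attempts.
fn backoff_schedule(base_ms: u64, factor: u64, max_ms: u64, attempts: usize) -> Vec<Duration> {
    let mut delay = base_ms;
    let mut out = Vec::with_capacity(attempts);
    for _ in 0..attempts {
        out.push(Duration::from_millis(delay.min(max_ms)));
        delay = delay.saturating_mul(factor);
    }
    out
}

fn main() {
    let sched = backoff_schedule(250, 2, 1000, 10);
    assert_eq!(sched.len(), 10);
    assert_eq!(sched[0], Duration::from_millis(250));
    assert_eq!(sched[1], Duration::from_millis(500));
    // Every later delay is clamped to the 1s maximum, so total wait stays bounded.
    assert!(sched[3..].iter().all(|d| *d == Duration::from_millis(1000)));
}
```

Bounding both the delay and the attempt count keeps the worst-case wait predictable, which matters here because the CA-cert fetch blocks the rest of the NATS cluster setup.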

View File

@@ -1,6 +1,7 @@
use std::{collections::BTreeMap, str::FromStr};
use async_trait::async_trait;
use harmony_k8s::KubernetesDistribution;
use harmony_macros::hurl;
use harmony_secret::{Secret, SecretManager};
use harmony_types::id::Id;
@@ -25,7 +26,7 @@ use crate::{
},
},
score::Score,
topology::{HelmCommand, K8sclient, KubernetesDistribution, TlsRouter, Topology},
topology::{HelmCommand, K8sclient, TlsRouter, Topology},
};
#[derive(Debug, Clone, Serialize)]

View File

@@ -4,7 +4,20 @@ use log::warn;
use crate::topology::{FailoverTopology, TlsRoute, TlsRouter};
#[async_trait]
impl<T: TlsRouter> TlsRouter for FailoverTopology<T> {
impl<T: TlsRouter + Send> TlsRouter for FailoverTopology<T> {
async fn get_public_domain(&self) -> Result<String, String> {
/*
let primary_domain = self
.primary
.get_public_domain()
.await
.map_err(|e| e.to_string())?;
Ok(primary_domain)
*/
todo!()
}
async fn get_internal_domain(&self) -> Result<Option<String>, String> {
todo!()
}

View File

@@ -0,0 +1,279 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::scheduling::v1::PriorityClass;
use k8s_openapi::api::{
apps::v1::{DaemonSet, DaemonSetSpec},
core::v1::{
Container, ContainerPort, EnvVar, EnvVarSource, Namespace, ObjectFieldSelector, PodSpec,
PodTemplateSpec, ResourceRequirements, ServiceAccount, Toleration,
},
rbac::v1::{ClusterRole, ClusterRoleBinding, PolicyRule, Role, RoleBinding, RoleRef, Subject},
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::ObjectMeta;
use serde::Serialize;
use std::collections::BTreeMap;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::k8s::resource::K8sResourceScore,
score::Score,
topology::{K8sclient, Topology},
};
#[derive(Clone, Debug, Serialize)]
pub struct NodeHealthScore {}
impl<T: Topology + K8sclient> Score<T> for NodeHealthScore {
fn name(&self) -> String {
"NodeHealthScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NodeHealthInterpret {})
}
}
#[derive(Debug, Clone)]
pub struct NodeHealthInterpret {}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let namespace_name = "harmony-node-healthcheck".to_string();
// Namespace
let mut labels = BTreeMap::new();
labels.insert("name".to_string(), namespace_name.clone());
let namespace = Namespace {
metadata: ObjectMeta {
name: Some(namespace_name.clone()),
labels: Some(labels),
..ObjectMeta::default()
},
..Namespace::default()
};
// ServiceAccount
let service_account_name = "node-healthcheck-sa".to_string();
let service_account = ServiceAccount {
metadata: ObjectMeta {
name: Some(service_account_name.clone()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
..ServiceAccount::default()
};
// ClusterRole
let cluster_role = ClusterRole {
metadata: ObjectMeta {
name: Some("node-healthcheck-role".to_string()),
..ObjectMeta::default()
},
rules: Some(vec![PolicyRule {
api_groups: Some(vec!["".to_string()]),
resources: Some(vec!["nodes".to_string()]),
verbs: vec!["get".to_string(), "list".to_string()],
..PolicyRule::default()
}]),
..ClusterRole::default()
};
// Role
let role = Role {
metadata: ObjectMeta {
name: Some("allow-hostnetwork-scc".to_string()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
rules: Some(vec![PolicyRule {
api_groups: Some(vec!["security.openshift.io".to_string()]),
resources: Some(vec!["securitycontextconstraints".to_string()]),
resource_names: Some(vec!["hostnetwork".to_string()]),
verbs: vec!["use".to_string()],
..PolicyRule::default()
}]),
..Role::default()
};
// RoleBinding
let role_binding = RoleBinding {
metadata: ObjectMeta {
name: Some("node-status-querier-scc-binding".to_string()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".to_string(),
name: service_account_name.clone(),
namespace: Some(namespace_name.clone()),
..Subject::default()
}]),
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".to_string(),
kind: "Role".to_string(),
name: "allow-hostnetwork-scc".to_string(),
},
};
// ClusterRoleBinding
let cluster_role_binding = ClusterRoleBinding {
metadata: ObjectMeta {
name: Some("read-nodes-binding".to_string()),
..ObjectMeta::default()
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".to_string(),
name: service_account_name.clone(),
namespace: Some(namespace_name.clone()),
..Subject::default()
}]),
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".to_string(),
kind: "ClusterRole".to_string(),
name: "node-healthcheck-role".to_string(),
},
};
// PriorityClass
let priority_class_name = "node-healthcheck-critical".to_string();
let priority_class = PriorityClass {
metadata: ObjectMeta {
name: Some(priority_class_name.clone()),
..ObjectMeta::default()
},
value: 1000000000,
global_default: Some(false),
preemption_policy: Some("PreemptLowerPriority".to_string()),
description: Some("Highest priority for node health check daemonset - can preempt lower priority pods".to_string()),
};
// DaemonSet
let mut daemonset_labels = BTreeMap::new();
daemonset_labels.insert("app".to_string(), "node-healthcheck".to_string());
let daemon_set = DaemonSet {
metadata: ObjectMeta {
name: Some("node-healthcheck".to_string()),
namespace: Some(namespace_name.clone()),
labels: Some(daemonset_labels.clone()),
..ObjectMeta::default()
},
spec: Some(DaemonSetSpec {
selector: LabelSelector {
match_labels: Some(daemonset_labels.clone()),
..LabelSelector::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(daemonset_labels),
..ObjectMeta::default()
}),
spec: Some(PodSpec {
service_account_name: Some(service_account_name.clone()),
host_network: Some(true),
priority_class_name: Some(priority_class_name),
tolerations: Some(vec![Toleration {
operator: Some("Exists".to_string()),
..Toleration::default()
}]),
containers: vec![Container {
name: "checker".to_string(),
image: Some(
"hub.nationtech.io/harmony/harmony-node-readiness-endpoint:latest"
.to_string(),
),
env: Some(vec![EnvVar {
name: "NODE_NAME".to_string(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
api_version: Some("v1".to_string()),
field_path: "spec.nodeName".to_string(),
..ObjectFieldSelector::default()
}),
..EnvVarSource::default()
}),
..EnvVar::default()
}]),
ports: Some(vec![ContainerPort {
container_port: 25001,
host_port: Some(25001),
name: Some("health-port".to_string()),
..ContainerPort::default()
}]),
resources: Some(ResourceRequirements {
requests: Some({
let mut requests = BTreeMap::new();
requests.insert("cpu".to_string(), Quantity("10m".to_string()));
requests
.insert("memory".to_string(), Quantity("50Mi".to_string()));
requests
}),
..ResourceRequirements::default()
}),
..Container::default()
}],
..PodSpec::default()
}),
},
..DaemonSetSpec::default()
}),
..DaemonSet::default()
};
K8sResourceScore::single(namespace, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(service_account, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(cluster_role, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(role, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(role_binding, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(cluster_role_binding, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(priority_class, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(daemon_set, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
Ok(Outcome::success(
"Harmony node health successfully deployed".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("NodeHealth")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -41,6 +41,7 @@ impl OKDBootstrapLoadBalancerScore {
backend_servers: Self::topology_to_backend_server(topology, 6443),
listening_port: SocketAddr::new(private_ip, 6443),
health_check: Some(HealthCheck::HTTP(
None,
"/readyz".to_string(),
HttpMethod::GET,
HttpStatusCode::Success2xx,

View File

@@ -8,7 +8,7 @@ use crate::{
score::Score,
topology::{
BackendServer, HAClusterTopology, HealthCheck, HttpMethod, HttpStatusCode, LoadBalancer,
LoadBalancerService, SSL, Topology,
LoadBalancerService, LogicalHost, Router, SSL, Topology,
},
};
@@ -23,32 +23,72 @@ pub struct OKDLoadBalancerScore {
load_balancer_score: LoadBalancerScore,
}
/// OKD Load Balancer Score configuration
///
/// This module configures the load balancer for OKD (OpenShift Kubernetes Distribution)
/// bare metal installations.
///
/// # Backend Server Configuration
///
/// For ports 80 and 443 (ingress traffic), the load balancer includes both control plane
/// and worker nodes in the backend pool. This is consistent with OKD's requirement that
/// ingress traffic should be load balanced across all nodes that may run ingress router pods.
///
/// For ports 22623 (Ignition API) and 6443 (Kubernetes API), only control plane nodes
/// are included as backends, as these services are control plane specific.
///
/// # References
///
/// - [OKD Bare Metal Installation - External Load Balancer Configuration]
/// (<https://docs.okd.io/latest/installing/installing_bare_metal/ipi/ipi-install-installation-workflow.html#nw-osp-configuring-external-load-balancer_ipi-install-installation-workflow>)
///
/// # Example
///
/// ```ignore
/// use harmony::topology::HAClusterTopology;
/// use harmony::modules::okd::OKDLoadBalancerScore;
///
/// let topology: HAClusterTopology = /* get topology from your infrastructure */;
/// let score = OKDLoadBalancerScore::new(&topology);
/// ```
impl OKDLoadBalancerScore {
pub fn new(topology: &HAClusterTopology) -> Self {
let public_ip = topology.router.get_gateway();
let public_services = vec![
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 80),
backend_servers: Self::nodes_to_backend_server(topology, 80),
listening_port: SocketAddr::new(public_ip, 80),
health_check: Some(HealthCheck::TCP(None)),
health_check: None,
},
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 443),
backend_servers: Self::nodes_to_backend_server(topology, 443),
listening_port: SocketAddr::new(public_ip, 443),
health_check: Some(HealthCheck::TCP(None)),
health_check: None,
},
];
let private_services = vec![
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 80),
backend_servers: Self::nodes_to_backend_server(topology, 80),
listening_port: SocketAddr::new(public_ip, 80),
health_check: Some(HealthCheck::TCP(None)),
health_check: Some(HealthCheck::HTTP(
Some(25001),
"/health?check=okd_router_1936,node_ready".to_string(),
HttpMethod::GET,
HttpStatusCode::Success2xx,
SSL::Default,
)),
},
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 443),
backend_servers: Self::nodes_to_backend_server(topology, 443),
listening_port: SocketAddr::new(public_ip, 443),
health_check: Some(HealthCheck::TCP(None)),
health_check: Some(HealthCheck::HTTP(
Some(25001),
"/health?check=okd_router_1936,node_ready".to_string(),
HttpMethod::GET,
HttpStatusCode::Success2xx,
SSL::Default,
)),
},
LoadBalancerService {
backend_servers: Self::control_plane_to_backend_server(topology, 22623),
@@ -59,6 +99,7 @@ impl OKDLoadBalancerScore {
backend_servers: Self::control_plane_to_backend_server(topology, 6443),
listening_port: SocketAddr::new(public_ip, 6443),
health_check: Some(HealthCheck::HTTP(
None,
"/readyz".to_string(),
HttpMethod::GET,
HttpStatusCode::Success2xx,
@@ -74,6 +115,11 @@ impl OKDLoadBalancerScore {
}
}
/// Creates backend servers list for control plane nodes only
///
/// Use this for control plane-specific services like:
/// - Port 22623: Ignition API (machine configuration during bootstrap)
/// - Port 6443: Kubernetes API server
fn control_plane_to_backend_server(
topology: &HAClusterTopology,
port: u16,
@@ -87,6 +133,194 @@ impl OKDLoadBalancerScore {
})
.collect()
}
/// Creates backend servers list for all nodes (control plane + workers)
///
/// Use this for ingress traffic that should be distributed across all nodes:
/// - Port 80: HTTP ingress traffic
/// - Port 443: HTTPS ingress traffic
///
/// In OKD, ingress router pods can run on any node, so both control plane
/// and worker nodes should be included in the load balancer backend pool.
fn nodes_to_backend_server(topology: &HAClusterTopology, port: u16) -> Vec<BackendServer> {
let mut nodes = Vec::new();
for cp in &topology.control_plane {
nodes.push(BackendServer {
address: cp.ip.to_string(),
port,
});
}
for worker in &topology.workers {
nodes.push(BackendServer {
address: worker.ip.to_string(),
port,
});
}
nodes
}
}
#[cfg(test)]
mod tests {
use std::sync::{Arc, OnceLock};
use super::*;
use crate::topology::DummyInfra;
use harmony_macros::ip;
use harmony_types::net::IpAddress;
fn create_test_topology() -> HAClusterTopology {
let router = Arc::new(DummyRouter {
gateway: ip!("192.168.1.1"),
});
HAClusterTopology {
domain_name: "test.example.com".to_string(),
router,
load_balancer: Arc::new(DummyInfra),
firewall: Arc::new(DummyInfra),
dhcp_server: Arc::new(DummyInfra),
tftp_server: Arc::new(DummyInfra),
http_server: Arc::new(DummyInfra),
dns_server: Arc::new(DummyInfra),
node_exporter: Arc::new(DummyInfra),
switch_client: Arc::new(DummyInfra),
bootstrap_host: LogicalHost {
ip: ip!("192.168.1.100"),
name: "bootstrap".to_string(),
},
control_plane: vec![
LogicalHost {
ip: ip!("192.168.1.10"),
name: "control-plane-0".to_string(),
},
LogicalHost {
ip: ip!("192.168.1.11"),
name: "control-plane-1".to_string(),
},
LogicalHost {
ip: ip!("192.168.1.12"),
name: "control-plane-2".to_string(),
},
],
workers: vec![
LogicalHost {
ip: ip!("192.168.1.20"),
name: "worker-0".to_string(),
},
LogicalHost {
ip: ip!("192.168.1.21"),
name: "worker-1".to_string(),
},
],
kubeconfig: None,
network_manager: OnceLock::new(),
}
}
struct DummyRouter {
gateway: IpAddress,
}
impl Router for DummyRouter {
fn get_gateway(&self) -> IpAddress {
self.gateway
}
fn get_cidr(&self) -> cidr::Ipv4Cidr {
let ipv4 = match self.gateway {
IpAddress::V4(ip) => ip,
IpAddress::V6(_) => panic!("IPv6 not supported"),
};
cidr::Ipv4Cidr::new(ipv4, 24).unwrap()
}
fn get_host(&self) -> LogicalHost {
LogicalHost {
ip: self.gateway,
name: "router".to_string(),
}
}
}
#[test]
fn test_nodes_to_backend_server_includes_control_plane_and_workers() {
let topology = create_test_topology();
let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 80);
assert_eq!(backend_servers.len(), 5);
let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
assert!(addresses.contains(&"192.168.1.10"));
assert!(addresses.contains(&"192.168.1.11"));
assert!(addresses.contains(&"192.168.1.12"));
assert!(addresses.contains(&"192.168.1.20"));
assert!(addresses.contains(&"192.168.1.21"));
}
#[test]
fn test_control_plane_to_backend_server_only_includes_control_plane() {
let topology = create_test_topology();
let backend_servers = OKDLoadBalancerScore::control_plane_to_backend_server(&topology, 80);
assert_eq!(backend_servers.len(), 3);
let addresses: Vec<&str> = backend_servers.iter().map(|s| s.address.as_str()).collect();
assert!(addresses.contains(&"192.168.1.10"));
assert!(addresses.contains(&"192.168.1.11"));
assert!(addresses.contains(&"192.168.1.12"));
assert!(!addresses.contains(&"192.168.1.20"));
assert!(!addresses.contains(&"192.168.1.21"));
}
#[test]
fn test_public_services_include_all_nodes_on_port_80_and_443() {
let topology = create_test_topology();
let score = OKDLoadBalancerScore::new(&topology);
let public_service_80 = score
.load_balancer_score
.public_services
.iter()
.find(|s| s.listening_port.port() == 80)
.expect("Public service on port 80 not found");
let public_service_443 = score
.load_balancer_score
.public_services
.iter()
.find(|s| s.listening_port.port() == 443)
.expect("Public service on port 443 not found");
assert_eq!(public_service_80.backend_servers.len(), 5);
assert_eq!(public_service_443.backend_servers.len(), 5);
}
#[test]
fn test_private_service_port_22623_only_control_plane() {
let topology = create_test_topology();
let score = OKDLoadBalancerScore::new(&topology);
let private_service_22623 = score
.load_balancer_score
.private_services
.iter()
.find(|s| s.listening_port.port() == 22623)
.expect("Private service on port 22623 not found");
assert_eq!(private_service_22623.backend_servers.len(), 3);
}
#[test]
fn test_all_backend_servers_have_correct_port() {
let topology = create_test_topology();
let backend_servers = OKDLoadBalancerScore::nodes_to_backend_server(&topology, 443);
for server in backend_servers {
assert_eq!(server.port, 443);
}
}
}
impl<T: Topology + LoadBalancer> Score<T> for OKDLoadBalancerScore {

View File
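The pooling rule the score above encodes (ingress ports 80/443 pool every node, while the Ignition and API ports pool only the control plane) can be sketched standalone. This is a simplified stand-in, not the project's types; the addresses mirror the illustrative test fixtures:

```rust
// Simplified stand-in for the BackendServer used above.
#[derive(Debug, PartialEq)]
struct BackendServer {
    address: String,
    port: u16,
}

// Every address in `addresses` becomes a backend on `port`.
fn pool(addresses: &[&str], port: u16) -> Vec<BackendServer> {
    addresses
        .iter()
        .map(|a| BackendServer { address: a.to_string(), port })
        .collect()
}

fn main() {
    let control_plane = ["192.168.1.10", "192.168.1.11", "192.168.1.12"];
    let workers = ["192.168.1.20", "192.168.1.21"];
    // Ingress (80/443) balances across every node...
    let all: Vec<&str> = control_plane.iter().chain(workers.iter()).copied().collect();
    assert_eq!(pool(&all, 443).len(), 5);
    // ...while Ignition (22623) and the API server (6443) stay control-plane only.
    assert_eq!(pool(&control_plane, 6443).len(), 3);
}
```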

@@ -0,0 +1,88 @@
use std::str::FromStr;
use harmony_macros::hurl;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::helm::chart::{HelmChartScore, HelmRepository},
score::Score,
topology::{HelmCommand, K8sclient, Topology},
};
#[derive(Debug, Serialize, Clone)]
pub struct OpenbaoScore {
/// Host used for external access (ingress)
pub host: String,
}
impl<T: Topology + K8sclient + HelmCommand> Score<T> for OpenbaoScore {
fn name(&self) -> String {
"OpenbaoScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
// TODO exec pod commands to initialize secret store if not already done
let host = &self.host;
let values_yaml = Some(format!(
r#"global:
openshift: true
server:
standalone:
enabled: true
config: |
ui = true
listener "tcp" {{
tls_disable = true
address = "[::]:8200"
cluster_address = "[::]:8201"
}}
storage "file" {{
path = "/openbao/data"
}}
service:
enabled: true
ingress:
enabled: true
hosts:
- host: {host}
dataStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
auditStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
ui:
enabled: true"#
));
HelmChartScore {
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
release_name: NonBlankString::from_str("openbao").unwrap(),
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"openbao".to_string(),
hurl!("https://openbao.github.io/openbao-helm"),
true,
)),
}
.create_interpret()
}
}

View File

@@ -37,6 +37,7 @@ pub struct PostgreSQLConfig {
/// settings incompatible with the default CNPG behavior.
pub namespace: String,
}
impl PostgreSQLConfig {
pub fn with_namespace(&self, namespace: &str) -> PostgreSQLConfig {
let mut new = self.clone();

View File

@@ -1,3 +1,6 @@
use std::collections::BTreeMap;
use std::collections::HashMap;
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
@@ -13,9 +16,18 @@ use serde::{Deserialize, Serialize};
#[serde(rename_all = "camelCase")]
pub struct ClusterSpec {
pub instances: u32,
#[serde(skip_serializing_if = "Option::is_none")]
pub image_name: Option<String>,
pub storage: Storage,
pub bootstrap: Bootstrap,
#[serde(skip_serializing_if = "Option::is_none")]
pub external_clusters: Option<Vec<ExternalCluster>>,
#[serde(skip_serializing_if = "Option::is_none")]
pub replica: Option<ReplicaSpec>,
/// This must be set to None if you want cnpg to generate a superuser secret
#[serde(skip_serializing_if = "Option::is_none")]
pub superuser_secret: Option<BTreeMap<String, String>>,
pub enable_superuser_access: bool,
}
impl Default for Cluster {
@@ -34,6 +46,10 @@ impl Default for ClusterSpec {
image_name: None,
storage: Storage::default(),
bootstrap: Bootstrap::default(),
external_clusters: None,
replica: None,
superuser_secret: None,
enable_superuser_access: false,
}
}
}
@@ -47,7 +63,13 @@ pub struct Storage {
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct Bootstrap {
#[serde(skip_serializing_if = "Option::is_none")]
pub initdb: Option<Initdb>,
#[serde(skip_serializing_if = "Option::is_none")]
pub recovery: Option<Recovery>,
#[serde(rename = "pg_basebackup")]
#[serde(skip_serializing_if = "Option::is_none")]
pub pg_basebackup: Option<PgBaseBackup>,
}
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
@@ -56,3 +78,50 @@ pub struct Initdb {
pub database: String,
pub owner: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct Recovery {
pub source: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
pub struct PgBaseBackup {
pub source: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ExternalCluster {
pub name: String,
pub connection_parameters: HashMap<String, String>,
pub ssl_key: Option<SecretKeySelector>,
pub ssl_cert: Option<SecretKeySelector>,
pub ssl_root_cert: Option<SecretKeySelector>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ConnectionParameters {
pub host: String,
pub user: String,
pub dbname: String,
pub sslmode: String,
pub sslnegotiation: String,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReplicaSpec {
pub enabled: bool,
pub source: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub primary: Option<String>,
}
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct SecretKeySelector {
pub name: String,
pub key: String,
}
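Taken together, the serde attributes above (`rename_all = "camelCase"`, `skip_serializing_if = "Option::is_none"`, and the explicit `pg_basebackup` rename) yield a CNPG manifest shaped roughly like this sketch; names and values are illustrative, not taken from the code:

```yaml
apiVersion: postgresql.cnpg.io/v1
kind: Cluster
metadata:
  name: app-replica          # illustrative
  namespace: databases       # illustrative
spec:
  instances: 2
  storage:
    size: 10Gi
  bootstrap:
    pg_basebackup:           # stays snake_case via #[serde(rename = "pg_basebackup")]
      source: app-primary
  externalClusters:
    - name: app-primary
      connectionParameters:
        host: app-primary-rw
        user: streaming_replica
        sslmode: verify-full
      sslKey:
        name: streaming-replica-certs
        key: tls.key
  replica:
    enabled: true
    source: app-primary
  enableSuperuserAccess: false
```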

View File

@@ -3,6 +3,8 @@ use log::debug;
use log::info;
use std::collections::HashMap;
use crate::interpret::InterpretError;
use crate::topology::TlsRoute;
use crate::topology::TlsRouter;
use crate::{
modules::postgresql::capability::{
@@ -49,8 +51,18 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
// TODO we should be getting the public endpoint for a service by calling a method on
// TlsRouter capability.
// Something along the lines of `TlsRouter::get_hostname_for_service(...).await?;`
let host = format!(
"{}.{}.{}",
config.cluster_name,
config.namespace,
self.primary
.get_public_domain()
.await
.expect("failed to retrieve public domain")
.to_string()
);
let endpoint = PostgreSQLEndpoint {
host,
port: self.primary.get_router_port().await,
};
@@ -59,6 +71,46 @@ impl<T: PostgreSQL + TlsRouter> PostgreSQL for FailoverTopology<T> {
endpoint.host, endpoint.port
);
info!("installing primary postgres route");
let prim_hostname = format!(
"{}.{}.{}",
config.cluster_name,
config.namespace,
self.primary.get_public_domain().await?
);
let rw_backend = format!("{}-rw", config.cluster_name);
let tls_route = TlsRoute {
hostname: prim_hostname,
backend: rw_backend,
target_port: 5432,
namespace: config.namespace.clone(),
};
// Expose RW publicly via TLS passthrough
self.primary
.install_route(tls_route.clone())
.await
.map_err(|e| InterpretError::new(e))?;
info!("installing replica postgres route");
let rep_hostname = format!(
"{}.{}.{}",
config.cluster_name,
config.namespace,
self.replica.get_public_domain().await?
);
let rw_backend = format!("{}-rw", config.cluster_name);
let tls_route = TlsRoute {
hostname: rep_hostname,
backend: rw_backend,
target_port: 5432,
namespace: config.namespace.clone(),
};
// Expose the replica cluster's RW service publicly via TLS passthrough
self.replica
.install_route(tls_route.clone())
.await
.map_err(|e| InterpretError::new(e))?;
info!("Configuring replica connection parameters and bootstrap");
let mut connection_parameters = HashMap::new();

View File

@@ -20,7 +20,7 @@ use crate::topology::{K8sclient, Topology};
/// # Usage
/// ```
/// use harmony::modules::postgresql::CloudNativePgOperatorScore;
/// let score = CloudNativePgOperatorScore::default_openshift();
/// ```
///
/// Or you can take control of the most relevant fields this way:
@@ -52,8 +52,8 @@ pub struct CloudNativePgOperatorScore {
pub source_namespace: String,
}
impl CloudNativePgOperatorScore {
pub fn default_openshift() -> Self {
Self {
namespace: "openshift-operators".to_string(),
channel: "stable-v1".to_string(),
@@ -68,7 +68,7 @@ impl CloudNativePgOperatorScore {
pub fn new(namespace: &str) -> Self {
Self {
namespace: namespace.to_string(),
..Self::default_openshift()
}
}
}

View File

@@ -1,12 +1,21 @@
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::capability::PostgreSQLConfig;
use crate::modules::postgresql::cnpg::{
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
SecretKeySelector, Storage,
};
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
///
@@ -49,6 +58,30 @@ impl K8sPostgreSQLScore {
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(K8sPostgreSQLInterpret {
config: self.config.clone(),
})
}
fn name(&self) -> String {
format!("PostgreSQLScore({})", self.config.namespace)
}
}
#[derive(Debug)]
pub struct K8sPostgreSQLInterpret {
config: PostgreSQLConfig,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
match &self.config.role {
super::capability::PostgreSQLClusterRole::Primary => {
let metadata = ObjectMeta {
name: Some(self.config.cluster_name.clone()),
namespace: Some(self.config.namespace.clone()),
@@ -61,20 +94,148 @@ impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
size: self.config.storage_size.to_string(),
},
bootstrap: Bootstrap {
initdb: Some(Initdb {
database: "app".to_string(),
owner: "app".to_string(),
}),
recovery: None,
pg_basebackup: None,
},
enable_superuser_access: true,
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
Ok(
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
.create_interpret()
.execute(inventory, topology)
.await?,
)
}
super::capability::PostgreSQLClusterRole::Replica(replica_config) => {
let metadata = ObjectMeta {
name: Some("streaming-replica-certs".to_string()),
namespace: Some(self.config.namespace.clone()),
..ObjectMeta::default()
};
// Secret `data` values are base64-encoded on the wire; `ByteString` takes raw
// bytes and kube's serializer handles the encoding, so PEM strings pass as-is:
let mut data = std::collections::BTreeMap::new();
data.insert(
"tls.key".to_string(),
ByteString(
replica_config
.replication_certs
.streaming_replica_key_pem
.as_bytes()
.to_vec(),
),
);
data.insert(
"tls.crt".to_string(),
ByteString(
replica_config
.replication_certs
.streaming_replica_cert_pem
.as_bytes()
.to_vec(),
),
);
data.insert(
"ca.crt".to_string(),
ByteString(
replica_config
.replication_certs
.ca_cert_pem
.as_bytes()
.to_vec(),
),
);
let secret = Secret {
metadata,
data: Some(data),
string_data: None, // You could use string_data if you prefer raw strings
type_: Some("Opaque".to_string()),
..Secret::default()
};
K8sResourceScore::single(secret, Some(self.config.namespace.clone()))
.create_interpret()
.execute(inventory, topology)
.await?;
let metadata = ObjectMeta {
name: Some(self.config.cluster_name.clone()),
namespace: Some(self.config.namespace.clone()),
..ObjectMeta::default()
};
let spec = ClusterSpec {
instances: self.config.instances,
storage: Storage {
size: self.config.storage_size.to_string(),
},
bootstrap: Bootstrap {
initdb: None,
recovery: None,
pg_basebackup: Some(PgBaseBackup {
source: replica_config.primary_cluster_name.clone(),
}),
},
external_clusters: Some(vec![ExternalCluster {
name: replica_config.primary_cluster_name.clone(),
connection_parameters: replica_config
.external_cluster
.connection_parameters
.clone(),
ssl_key: Some(SecretKeySelector {
name: "streaming-replica-certs".to_string(),
key: "tls.key".to_string(),
}),
ssl_cert: Some(SecretKeySelector {
name: "streaming-replica-certs".to_string(),
key: "tls.crt".to_string(),
}),
ssl_root_cert: Some(SecretKeySelector {
name: "streaming-replica-certs".to_string(),
key: "ca.crt".to_string(),
}),
}]),
replica: Some(ReplicaSpec {
enabled: true,
source: replica_config.primary_cluster_name.clone(),
primary: None,
}),
..ClusterSpec::default()
};
let cluster = Cluster { metadata, spec };
Ok(
K8sResourceScore::single(cluster, Some(self.config.namespace.clone()))
.create_interpret()
.execute(inventory, topology)
.await?,
)
}
}
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("K8sPostgreSQLInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -18,46 +18,31 @@ use crate::topology::Topology;
/// # Usage
/// ```
/// use harmony::modules::postgresql::PublicPostgreSQLScore;
/// let score = PublicPostgreSQLScore::new("harmony");
/// ```
#[derive(Debug, Clone, Serialize)]
pub struct PublicPostgreSQLScore {
/// Inner non-public Postgres cluster config.
pub config: PostgreSQLConfig,
}
impl PublicPostgreSQLScore {
pub fn new(namespace: &str) -> Self {
Self {
config: PostgreSQLConfig::default().with_namespace(namespace),
}
}
}
impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(PublicPostgreSQLInterpret {
config: self.config.clone(),
})
}
fn name(&self) -> String {
format!("PublicPostgreSQLScore({})", self.config.namespace)
}
}
@@ -65,7 +50,6 @@ impl<T: Topology + PostgreSQL + TlsRouter> Score<T> for PublicPostgreSQLScore {
#[derive(Debug, Clone)]
struct PublicPostgreSQLInterpret {
config: PostgreSQLConfig,
}
#[async_trait]
@@ -76,15 +60,28 @@ impl<T: Topology + PostgreSQL + TlsRouter> Interpret<T> for PublicPostgreSQLInte
.await
.map_err(|e| InterpretError::new(e))?;
let hostname = format!(
"{}.{}.{}",
self.config.cluster_name,
self.config.namespace,
topo.get_public_domain().await?
);
let rw_backend = format!("{}-rw", self.config.cluster_name);
let tls_route = TlsRoute {
hostname,
backend: rw_backend,
target_port: 5432,
namespace: self.config.namespace.clone(),
};
// Expose RW publicly via TLS passthrough
topo.install_route(tls_route.clone())
.await
.map_err(|e| InterpretError::new(e))?;
Ok(Outcome::success(format!(
"Public CNPG cluster '{}' deployed with TLS passthrough route '{}'",
self.config.cluster_name.clone(),
tls_route.hostname
)))
}
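The `<cluster>.<namespace>.<public domain>` naming used when building the route above can be sketched as a standalone helper (the function name is illustrative, not from the codebase):

```rust
// Builds the public TLS-passthrough hostname: <cluster>.<namespace>.<domain>.
fn public_hostname(cluster: &str, namespace: &str, domain: &str) -> String {
    format!("{cluster}.{namespace}.{domain}")
}

fn main() {
    assert_eq!(
        public_hostname("pg", "harmony", "example.com"),
        "pg.harmony.example.com"
    );
}
```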

View File

@@ -12,8 +12,7 @@ use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::C
use crate::modules::monitoring::kube_prometheus::crd::crd_default_rules::build_default_application_rules;
use crate::modules::monitoring::kube_prometheus::crd::crd_grafana::{
Grafana, GrafanaDashboard, GrafanaDashboardSpec, GrafanaDatasource, GrafanaDatasourceConfig,
GrafanaDatasourceJsonData, GrafanaDatasourceSpec, GrafanaSpec,
};
use crate::modules::monitoring::kube_prometheus::crd::crd_prometheus_rules::{
PrometheusRule, PrometheusRuleSpec, RuleGroup,
@@ -23,7 +22,7 @@ use crate::modules::monitoring::kube_prometheus::crd::service_monitor::{
ServiceMonitor, ServiceMonitorSpec,
};
use crate::topology::oberservability::monitoring::AlertReceiver;
use crate::topology::{K8sclient, Topology};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
@@ -38,6 +37,7 @@ use crate::{
},
score::Score,
};
use harmony_k8s::K8sClient;
use harmony_types::id::Id;
use super::prometheus::PrometheusMonitoring;

View File

@@ -30,12 +30,13 @@ use crate::modules::monitoring::kube_prometheus::crd::rhob_service_monitor::{
use crate::score::Score;
use crate::topology::ingress::Ingress;
use crate::topology::oberservability::monitoring::AlertReceiver;
use crate::topology::{K8sclient, Topology};
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
};
use harmony_k8s::K8sClient;
use harmony_types::id::Id;
use super::prometheus::PrometheusMonitoring;

View File

@@ -4,6 +4,7 @@ use std::{
};
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use log::{debug, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
@@ -13,7 +14,7 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology},
};
use harmony_types::id::Id;

View File

@@ -9,8 +9,9 @@ use crate::{
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology},
};
use harmony_k8s::K8sClient;
use harmony_types::id::Id;
#[derive(Clone, Debug, Serialize)]

View File

@@ -0,0 +1,518 @@
use k8s_openapi::api::core::v1::Namespace;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::{ByteString, api::core::v1::Secret};
use kube::{Error as KubeError, core::ErrorResponse};
use rand::distr::Distribution;
use rand::{Rng, rng, seq::SliceRandom};
use std::collections::BTreeMap;
use std::str::FromStr;
use async_trait::async_trait;
use harmony_macros::hurl;
use harmony_types::id::Id;
use harmony_types::storage::StorageSize;
use log::{debug, error, info, trace, warn};
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository},
modules::k8s::resource::K8sResourceScore,
modules::postgresql::capability::{PostgreSQL, PostgreSQLClusterRole, PostgreSQLConfig},
score::Score,
topology::{HelmCommand, K8sclient, Topology},
};
const NAMESPACE: &str = "zitadel";
const PG_CLUSTER_NAME: &str = "zitadel-pg";
const MASTERKEY_SECRET_NAME: &str = "zitadel-masterkey";
/// Opinionated Zitadel deployment score.
///
/// Deploys a PostgreSQL cluster (via the [`PostgreSQL`] trait) and the Zitadel
/// Helm chart into the same namespace. Intended as a central multi-tenant IdP
/// with SSO for OKD/OpenShift, OpenBao, Harbor, Grafana, Nextcloud, Ente
/// Photos, and others.
///
/// # Ingress annotations
/// No controller-specific ingress annotations are set by default. On
/// OKD/OpenShift, the ingress should request TLS so the generated Route is
/// edge-terminated instead of HTTP-only. Optional cert-manager annotations are
/// included for clusters that have cert-manager installed; clusters without
/// cert-manager will ignore them.
/// Add or adjust annotations via `values_overrides` depending on your
/// distribution:
/// - NGINX: `nginx.ingress.kubernetes.io/backend-protocol: GRPC`
/// - OpenShift HAProxy: `route.openshift.io/termination: edge`
/// - AWS ALB: set `ingress.controller: aws`
///
/// # Database credentials
/// CNPG creates a `<cluster>-superuser` secret with key `password`. Because
/// `envVarsSecret` injects secret keys verbatim as env var names and the CNPG
/// key (`password`) does not match ZITADEL's expected name
/// (`ZITADEL_DATABASE_POSTGRES_USER_PASSWORD`), individual `env` entries with
/// `valueFrom.secretKeyRef` are used instead. For environments with an
/// External Secrets Operator or similar, create a dedicated secret with the
/// correct ZITADEL env var names and switch to `envVarsSecret`.
#[derive(Debug, Serialize, Clone)]
pub struct ZitadelScore {
/// External domain (e.g. `"auth.example.com"`).
pub host: String,
pub zitadel_version: String,
}
impl<T: Topology + K8sclient + HelmCommand + PostgreSQL> Score<T> for ZitadelScore {
fn name(&self) -> String {
"ZitadelScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(ZitadelInterpret {
host: self.host.clone(),
zitadel_version: self.zitadel_version.clone(),
})
}
}
// ---------------------------------------------------------------------------
#[derive(Debug, Clone)]
struct ZitadelInterpret {
host: String,
zitadel_version: String,
}
#[async_trait]
impl<T: Topology + K8sclient + HelmCommand + PostgreSQL> Interpret<T> for ZitadelInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"[Zitadel] Starting full deployment — namespace: '{NAMESPACE}', host: '{}'",
self.host
);
info!("Creating namespace {NAMESPACE} if it does not exist");
K8sResourceScore::single(
Namespace {
metadata: ObjectMeta {
name: Some(NAMESPACE.to_string()),
..Default::default()
},
..Default::default()
},
None,
)
.interpret(inventory, topology)
.await?;
// --- Step 1: PostgreSQL -------------------------------------------
let pg_config = PostgreSQLConfig {
cluster_name: PG_CLUSTER_NAME.to_string(),
instances: 2,
storage_size: StorageSize::gi(10),
role: PostgreSQLClusterRole::Primary,
namespace: NAMESPACE.to_string(),
};
debug!(
"[Zitadel] Deploying PostgreSQL cluster '{}' — instances: {}, storage: 10Gi, namespace: '{}'",
pg_config.cluster_name, pg_config.instances, pg_config.namespace
);
topology.deploy(&pg_config).await.map_err(|e| {
let msg = format!(
"[Zitadel] PostgreSQL deployment failed for '{}': {e}",
pg_config.cluster_name
);
error!("{msg}");
InterpretError::new(msg)
})?;
info!(
"[Zitadel] PostgreSQL cluster '{}' deployed",
pg_config.cluster_name
);
// --- Step 2: Resolve internal DB endpoint -------------------------
debug!(
"[Zitadel] Resolving internal endpoint for cluster '{}'",
pg_config.cluster_name
);
let endpoint = topology.get_endpoint(&pg_config).await.map_err(|e| {
let msg = format!(
"[Zitadel] Failed to resolve endpoint for cluster '{}': {e}",
pg_config.cluster_name
);
error!("{msg}");
InterpretError::new(msg)
})?;
info!(
"[Zitadel] DB endpoint resolved — host: '{}', port: {}",
endpoint.host, endpoint.port
);
// The CNPG-managed superuser secret contains 'password', 'username',
// 'host', 'port', 'dbname', 'uri'. We reference 'password' directly
// via env.valueFrom.secretKeyRef because CNPG's key names do not
// match ZITADEL's required env var names.
let pg_user_secret = format!("{PG_CLUSTER_NAME}-app");
let pg_superuser_secret = format!("{PG_CLUSTER_NAME}-superuser");
let db_host = &endpoint.host;
let db_port = endpoint.port;
let host = &self.host;
debug!("[Zitadel] DB credentials source — secret: '{pg_user_secret}', key: 'password'");
debug!(
"[Zitadel] DB credentials source — superuser secret: '{pg_superuser_secret}', key: 'password'"
);
// Zitadel's password policy requires at least one uppercase letter, one
// lowercase letter, one digit and one symbol, so guarantee each category.
fn generate_secure_password(length: usize) -> String {
const ALPHA_UPPER: &[u8] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZ";
const ALPHA_LOWER: &[u8] = b"abcdefghijklmnopqrstuvwxyz";
const DIGITS: &[u8] = b"0123456789";
const SYMBOLS: &[u8] = b"!@#$%^&*()_+-=[]{}|;:',.<>?/";
let mut rng = rand::rng();
let uniform_alpha_upper = rand::distr::Uniform::new(0, ALPHA_UPPER.len())
.expect("Failed to create distribution");
let uniform_alpha_lower = rand::distr::Uniform::new(0, ALPHA_LOWER.len())
.expect("Failed to create distribution");
let uniform_digits =
rand::distr::Uniform::new(0, DIGITS.len()).expect("Failed to create distribution");
let uniform_symbols =
rand::distr::Uniform::new(0, SYMBOLS.len()).expect("Failed to create distribution");
let mut chars: Vec<char> = Vec::with_capacity(length);
// Ensure at least one of each: upper, lower, digit, symbol
chars.push(ALPHA_UPPER[uniform_alpha_upper.sample(&mut rng)] as char);
chars.push(ALPHA_LOWER[uniform_alpha_lower.sample(&mut rng)] as char);
chars.push(DIGITS[uniform_digits.sample(&mut rng)] as char);
chars.push(SYMBOLS[uniform_symbols.sample(&mut rng)] as char);
// Fill remaining with random from all categories
let all_chars: Vec<u8> = [ALPHA_UPPER, ALPHA_LOWER, DIGITS, SYMBOLS].concat();
let uniform_all = rand::distr::Uniform::new(0, all_chars.len())
.expect("Failed to create distribution");
for _ in 0..(length - 4) {
chars.push(all_chars[uniform_all.sample(&mut rng)] as char);
}
// Shuffle
let mut shuffled = chars;
shuffled.shuffle(&mut rng);
shuffled.iter().collect()
}
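// Sketch (hypothetical helper, not part of this change): a validator that
// mirrors what generate_secure_password guarantees; the SYMBOLS set above is
// all ASCII punctuation, so is_ascii_punctuation() covers the symbol class.
fn satisfies_password_policy(p: &str) -> bool {
    p.chars().any(|c| c.is_ascii_uppercase())
        && p.chars().any(|c| c.is_ascii_lowercase())
        && p.chars().any(|c| c.is_ascii_digit())
        && p.chars().any(|c| c.is_ascii_punctuation())
}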
let admin_password = generate_secure_password(16);
// --- Step 3: Create masterkey secret ------------------------------------
debug!(
"[Zitadel] Creating masterkey secret '{}' in namespace '{}'",
MASTERKEY_SECRET_NAME, NAMESPACE
);
// Masterkey for symmetric encryption — must be exactly 32 ASCII bytes (alphanumeric only).
let masterkey = rng()
.sample_iter(&rand::distr::Alphanumeric)
.take(32)
.map(char::from)
.collect::<String>();
debug!(
"[Zitadel] Generated masterkey material for secret '{}' in namespace '{}'",
MASTERKEY_SECRET_NAME, NAMESPACE
);
let mut masterkey_data: BTreeMap<String, ByteString> = BTreeMap::new();
masterkey_data.insert("masterkey".to_string(), ByteString(masterkey.into()));
let masterkey_secret = Secret {
metadata: ObjectMeta {
name: Some(MASTERKEY_SECRET_NAME.to_string()),
namespace: Some(NAMESPACE.to_string()),
..ObjectMeta::default()
},
data: Some(masterkey_data),
..Secret::default()
};
match topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?
.create(&masterkey_secret, Some(NAMESPACE))
.await
{
Ok(_) => {
info!(
"[Zitadel] Masterkey secret '{}' created",
MASTERKEY_SECRET_NAME
);
}
Err(KubeError::Api(ErrorResponse { code: 409, .. })) => {
info!(
"[Zitadel] Masterkey secret '{}' already exists, leaving it untouched",
MASTERKEY_SECRET_NAME
);
}
Err(other) => {
let msg = format!(
"[Zitadel] Failed to create masterkey secret '{}': {other}",
MASTERKEY_SECRET_NAME
);
error!("{msg}");
return Err(InterpretError::new(msg));
}
};
debug!(
"[Zitadel] Masterkey secret '{}' is in place",
MASTERKEY_SECRET_NAME
);
// --- Step 4: Build Helm values ------------------------------------
warn!(
"[Zitadel] Applying TLS-enabled ingress defaults for OKD/OpenShift. \
cert-manager annotations are included as optional hints and are \
ignored on clusters without cert-manager."
);
let values_yaml = format!(
r#"image:
tag: {zitadel_version}
zitadel:
masterkeySecretName: "{MASTERKEY_SECRET_NAME}"
configmapConfig:
ExternalDomain: "{host}"
ExternalSecure: true
FirstInstance:
Org:
Human:
UserName: "admin"
Password: "{admin_password}"
FirstName: "Zitadel"
LastName: "Admin"
Email: "admin@zitadel.example.com"
PasswordChangeRequired: true
TLS:
Enabled: false
Database:
Postgres:
Host: "{db_host}"
Port: {db_port}
Database: zitadel
MaxOpenConns: 20
MaxIdleConns: 10
User:
Username: postgres
SSL:
Mode: require
Admin:
Username: postgres
SSL:
Mode: require
# Directly import credentials from the postgres secret
# TODO : use a less privileged postgres user
env:
- name: ZITADEL_DATABASE_POSTGRES_USER_USERNAME
valueFrom:
secretKeyRef:
name: "{pg_superuser_secret}"
key: user
- name: ZITADEL_DATABASE_POSTGRES_USER_PASSWORD
valueFrom:
secretKeyRef:
name: "{pg_superuser_secret}"
key: password
- name: ZITADEL_DATABASE_POSTGRES_ADMIN_USERNAME
valueFrom:
secretKeyRef:
name: "{pg_superuser_secret}"
key: user
- name: ZITADEL_DATABASE_POSTGRES_ADMIN_PASSWORD
valueFrom:
secretKeyRef:
name: "{pg_superuser_secret}"
key: password
# Security context for OpenShift restricted PSA compliance
podSecurityContext:
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
# Init job security context (runs before main deployment)
initJob:
podSecurityContext:
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
# Setup job security context
setupJob:
podSecurityContext:
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
ingress:
enabled: true
annotations:
cert-manager.io/cluster-issuer: letsencrypt-prod
route.openshift.io/termination: edge
hosts:
- host: "{host}"
paths:
- path: /
pathType: Prefix
tls:
- hosts:
- "{host}"
secretName: "{host}-tls"
login:
enabled: true
podSecurityContext:
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
securityContext:
allowPrivilegeEscalation: false
capabilities:
drop:
- ALL
runAsNonRoot: true
runAsUser: null
fsGroup: null
seccompProfile:
type: RuntimeDefault
ingress:
enabled: true
annotations:
cert-manager.io/cluster-issuer: letsencrypt-prod
route.openshift.io/termination: edge
hosts:
- host: "{host}"
paths:
- path: /ui/v2/login
pathType: Prefix
tls:
- hosts:
- "{host}"
secretName: "{host}-tls""#,
zitadel_version = self.zitadel_version
);
trace!("[Zitadel] Helm values YAML:\n{values_yaml}");
// --- Step 5: Deploy Helm chart ------------------------------------
info!(
"[Zitadel] Deploying Helm chart 'zitadel/zitadel' as release 'zitadel' in namespace '{NAMESPACE}'"
);
let result = HelmChartScore {
namespace: Some(NonBlankString::from_str(NAMESPACE).unwrap()),
release_name: NonBlankString::from_str("zitadel").unwrap(),
chart_name: NonBlankString::from_str("zitadel/zitadel").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(values_yaml),
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"zitadel".to_string(),
hurl!("https://charts.zitadel.com"),
true,
)),
}
.interpret(inventory, topology)
.await;
match &result {
Ok(_) => info!(
"[Zitadel] Helm chart deployed successfully\n\n\
===== ZITADEL DEPLOYMENT COMPLETE =====\n\
Login URL: https://{host}\n\
Username: admin@zitadel.{host}\n\
Password: {admin_password}\n\n\
IMPORTANT: The password is saved in ConfigMap 'zitadel-config-yaml'\n\
and must be changed on first login. Save the credentials in a\n\
secure location after changing them.\n\
========================================="
),
Err(e) => error!("[Zitadel] Helm chart deployment failed: {e}"),
}
result
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("Zitadel")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
}

View File

@@ -0,0 +1,17 @@
[package]
name = "harmony-node-readiness-endpoint"
version = "0.1.0"
edition = "2024"
[dependencies]
actix-web = "4"
kube.workspace = true
k8s-openapi.workspace = true
serde.workspace = true
serde_json.workspace = true
env_logger.workspace = true
log.workspace = true
tokio.workspace = true
reqwest.workspace = true
chrono.workspace = true
tower = "0.5.3"

View File

@@ -0,0 +1,13 @@
FROM debian:13-slim
# RUN apt-get update && apt-get install -y --no-install-recommends \
# ca-certificates \
# && rm -rf /var/lib/apt/lists/*
COPY harmony-node-readiness-endpoint /usr/local/bin/harmony-node-readiness-endpoint
ENV RUST_LOG=info
EXPOSE 25001
CMD ["harmony-node-readiness-endpoint"]

View File

@@ -0,0 +1,197 @@
# harmony-node-readiness-endpoint
**A lightweight, standalone Rust service for Kubernetes node health checking.**
Designed for **bare-metal Kubernetes clusters** with external load balancers (HAProxy, OPNsense, F5, etc.).
Exposes a simple HTTP endpoint (`/health`) on each node:
- **200 OK** — node is healthy and ready to receive traffic
- **503 Service Unavailable** — node should be removed from the load balancer pool
- **500 Internal Server Error** — misconfiguration (e.g. `NODE_NAME` not set)
This project is **not dependent on Harmony**, but is commonly used as part of Harmony bare-metal Kubernetes deployments.
## Why this project exists
In bare-metal environments, external load balancers often rely on pod-level or router-level checks that can lag behind the authoritative Kubernetes `Node.status.conditions[Ready]`.
This service exposes that authoritative state directly, with fast reaction time.
## Available checks
| Check name | Description | Status |
|--------------------|-------------------------------------------------------------|-------------------|
| `node_ready` | Queries `Node.status.conditions[Ready]` via Kubernetes API | Implemented |
| `okd_router_1936` | Probes OpenShift router `/healthz/ready` on port 1936 | Implemented |
| `filesystem_ro` | Detects read-only mounts via `/proc/mounts` | To be implemented |
| `kubelet` | Local probe to kubelet `/healthz` (port 10248) | To be implemented |
| `container_runtime`| Socket check + runtime status | To be implemented |
| `disk_pressure` | Threshold checks on key filesystems | To be implemented |
| `network` | DNS resolution + gateway connectivity | To be implemented |
| `custom_conditions`| Reacts to extra conditions (NPD, etc.) | To be implemented |
All checks are combined with logical **AND** — any single failure results in 503.
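The AND aggregation can be sketched in a few lines of Rust (a simplified stand-in for the `CheckResult` struct and status logic in `main.rs`, not the actual implementation):
```rust
// Simplified model of the service's result aggregation: every check
// must pass for the endpoint to answer 200; any failure yields 503.
struct CheckResult {
    name: String,
    passed: bool,
}

fn overall_ready(checks: &[CheckResult]) -> bool {
    // Logical AND across all requested checks.
    checks.iter().all(|c| c.passed)
}

fn main() {
    let checks = vec![
        CheckResult { name: "node_ready".into(), passed: true },
        CheckResult { name: "okd_router_1936".into(), passed: false },
    ];
    let status = if overall_ready(&checks) { 200 } else { 503 };
    let failed: Vec<&str> = checks
        .iter()
        .filter(|c| !c.passed)
        .map(|c| c.name.as_str())
        .collect();
    println!("HTTP {status}, failed: {failed:?}");
    // → HTTP 503, failed: ["okd_router_1936"]
}
```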
## Behavior
### `node_ready` check — fail-open design
The `node_ready` check queries the Kubernetes API server to read `Node.status.conditions[Ready]`.
Because this service runs on the node it is checking, there are scenarios where the API server is temporarily
unreachable (e.g. during a control-plane restart). To avoid incorrectly draining a healthy node in such cases,
the check is **fail-open**: it passes (reports ready) whenever the Kubernetes API is unavailable.
| Situation | Result | HTTP status |
|------------------------------------------------------|-------------------|-------------|
| `Node.conditions[Ready] == True` | Pass | 200 |
| `Node.conditions[Ready] == False` | Fail | 503 |
| `Ready` condition absent | Fail | 503 |
| API server unreachable or timed out (1 s timeout) | Pass (assumes ready) | 200 |
| Kubernetes client initialization failed | Pass (assumes ready) | 200 |
| `NODE_NAME` env var not set | Hard error | 500 |
A warning is logged whenever the API is unavailable and the check falls back to assuming ready.
### `okd_router_1936` check
Sends `GET http://127.0.0.1:1936/healthz/ready` with a 5-second timeout.
Returns pass on any 2xx response, fail otherwise.
### Unknown check names
Requesting an unknown check name (e.g. `check=bogus`) results in that check returning `passed: false`
with reason `"Unknown check: bogus"`, and the overall response is 503.
## How it works
### Node name discovery
The service reads the `NODE_NAME` environment variable, which must be injected via the Kubernetes Downward API:
```yaml
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
```
### Kubernetes API authentication
- Uses standard **in-cluster configuration** — no external credentials needed.
- The ServiceAccount token and CA certificate are automatically mounted at `/var/run/secrets/kubernetes.io/serviceaccount/`.
- Requires only minimal RBAC: `get` and `list` on the `nodes` resource (see `deploy/resources.yaml`).
- Connect and write timeouts are set to **1 second** to keep checks fast.
## Deploy
All Kubernetes resources (Namespace, ServiceAccount, ClusterRole, ClusterRoleBinding, and an OpenShift SCC RoleBinding for `hostnetwork`) are in a single file.
```bash
kubectl apply -f deploy/resources.yaml
kubectl apply -f deploy/daemonset.yaml
```
The DaemonSet uses `hostNetwork: true` and `hostPort: 25001`, so the endpoint is reachable directly on the node's IP at port 25001.
It tolerates all taints, ensuring it runs even on nodes marked unschedulable.
### Configure your external load balancer
**Example for HAProxy / OPNsense:**
- Check type: **HTTP**
- URI: `/health`
- Port: `25001` (configurable via `LISTEN_PORT` env var)
- Interval: 5–10 s
- Rise: 2
- Fall: 3
- Expect: `2xx`
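For raw HAProxy, the settings above translate roughly to the following backend fragment (the backend name and server addresses are placeholders, not part of this project):
```
backend k8s-ingress
    balance roundrobin
    option httpchk GET /health
    http-check expect status 200
    server node1 192.0.2.11:443 check port 25001 inter 5s rise 2 fall 3
    server node2 192.0.2.12:443 check port 25001 inter 5s rise 2 fall 3
```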
## Endpoint usage
### Query parameter
Use the `check` query parameter to select which checks to run (comma-separated).
When omitted, only `node_ready` runs.
| Request | Checks run |
|------------------------------------------------|-----------------------------------|
| `GET /health` | `node_ready` |
| `GET /health?check=okd_router_1936` | `okd_router_1936` only |
| `GET /health?check=node_ready,okd_router_1936` | `node_ready` and `okd_router_1936`|
> **Note:** specifying `check=` replaces the default. Include `node_ready` explicitly if you need it alongside other checks.
### Response format
```json
{
"status": "ready" | "not-ready",
"checks": [
{
"name": "<check-name>",
"passed": true | false,
"reason": "<failure reason, omitted on success>",
"duration_ms": 42
}
],
"total_duration_ms": 42
}
```
**Healthy node (default)**
```http
HTTP/1.1 200 OK
{
"status": "ready",
"checks": [{ "name": "node_ready", "passed": true, "duration_ms": 42 }],
"total_duration_ms": 42
}
```
**Unhealthy node**
```http
HTTP/1.1 503 Service Unavailable
{
"status": "not-ready",
"checks": [
{ "name": "node_ready", "passed": false, "reason": "KubeletNotReady", "duration_ms": 35 }
],
"total_duration_ms": 35
}
```
**API server unreachable (fail-open)**
```http
HTTP/1.1 200 OK
{
"status": "ready",
"checks": [{ "name": "node_ready", "passed": true, "duration_ms": 1001 }],
"total_duration_ms": 1001
}
```
*(A warning is logged: `Kubernetes API appears to be down … Assuming node is ready.`)*
## Configuration
| Env var | Default | Description |
|---------------|----------|--------------------------------------|
| `NODE_NAME` | required | Node name, injected via Downward API |
| `LISTEN_PORT` | `25001` | TCP port the HTTP server binds to |
| `RUST_LOG` | — | Log level (e.g. `info`, `debug`) |
## Development
```bash
# Run locally
NODE_NAME=my-test-node cargo run
# Run tests
cargo test
```
---
*Minimal, auditable, and built for production bare-metal Kubernetes environments.*

View File

@@ -0,0 +1,13 @@
#!/bin/bash
# TODO
# This is meant to be run on a machine with harmony development tools installed (cargo, etc)
DOCKER_TAG="${DOCKER_TAG:-dev}"
cargo build --release
cp ../target/release/harmony-node-readiness-endpoint .
docker build . -t hub.nationtech.io/harmony/harmony-node-readiness-endpoint:${DOCKER_TAG}

View File

@@ -0,0 +1,36 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: node-healthcheck
namespace: harmony-node-healthcheck
spec:
selector:
matchLabels:
app: node-healthcheck
template:
metadata:
labels:
app: node-healthcheck
spec:
serviceAccountName: node-healthcheck-sa
hostNetwork: true
# This ensures the pod runs even if the node is already "unschedulable"
# so it can report the status correctly.
tolerations:
- operator: Exists
containers:
- name: checker
image: hub.nationtech.io/harmony/harmony-node-readiness-endpoint:latest
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
ports:
- containerPort: 25001
hostPort: 25001
name: health-port
resources:
requests:
cpu: 10m
memory: 50Mi

View File

@@ -0,0 +1,64 @@
apiVersion: v1
kind: Namespace
metadata:
name: harmony-node-healthcheck
labels:
name: harmony-node-healthcheck
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: node-healthcheck-sa
namespace: harmony-node-healthcheck
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: node-healthcheck-role
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: allow-hostnetwork-scc
namespace: harmony-node-healthcheck
rules:
- apiGroups: ["security.openshift.io"]
resources: ["securitycontextconstraints"]
resourceNames: ["hostnetwork"]
verbs: ["use"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: node-status-querier-scc-binding
namespace: harmony-node-healthcheck
subjects:
- kind: ServiceAccount
name: node-healthcheck-sa
namespace: harmony-node-healthcheck
roleRef:
kind: Role
name: allow-hostnetwork-scc
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: read-nodes-binding
subjects:
- kind: ServiceAccount
name: node-healthcheck-sa
namespace: harmony-node-healthcheck
roleRef:
kind: ClusterRole
name: node-healthcheck-role
apiGroup: rbac.authorization.k8s.io

View File

@@ -0,0 +1,282 @@
use actix_web::{App, HttpResponse, HttpServer, Responder, get, web};
use k8s_openapi::api::core::v1::Node;
use kube::{Api, Client, Config};
use log::{debug, error, info, warn};
use serde::{Deserialize, Serialize};
use std::env;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
const K8S_CLIENT_TIMEOUT: Duration = Duration::from_secs(1);
#[derive(Serialize, Deserialize)]
struct HealthStatus {
status: String,
checks: Vec<CheckResult>,
total_duration_ms: u128,
}
#[derive(Serialize, Deserialize)]
struct CheckResult {
name: String,
passed: bool,
#[serde(skip_serializing_if = "Option::is_none")]
reason: Option<String>,
duration_ms: u128,
}
#[derive(Serialize, Deserialize)]
struct HealthError {
status: String,
error: String,
}
#[derive(Deserialize)]
struct HealthQuery {
#[serde(rename = "check")]
checks: Option<String>,
}
/// Check if the node's Ready condition is true via Kubernetes API
async fn check_node_ready(client: Client, node_name: &str) -> Result<(), String> {
let nodes: Api<Node> = Api::all(client);
let node = match nodes.get(node_name).await {
Ok(n) => n,
Err(e) => {
warn!(
"Kubernetes API appears to be down, unreachable, or timed out for node '{}': {}. Assuming node is ready.",
node_name, e
);
return Ok(());
}
};
let conditions = node.status.and_then(|s| s.conditions).unwrap_or_default();
for condition in conditions {
if condition.type_ == "Ready" {
let is_ready = condition.status == "True";
let reason = condition
.reason
.clone()
.unwrap_or_else(|| "Unknown".to_string());
if !is_ready {
return Err(reason);
}
return Ok(());
}
}
Err("Ready condition not found".to_string())
}
/// Check OKD router health endpoint on port 1936
async fn check_okd_router_1936() -> Result<(), String> {
debug!("Checking okd router 1936");
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(5))
.build()
.map_err(|e| format!("Failed to build HTTP client: {}", e))?;
let response = client
.get("http://127.0.0.1:1936/healthz/ready")
.send()
.await
.map_err(|e| format!("Failed to connect to OKD router: {}", e))?;
debug!("okd router 1936 response status {}", response.status());
if response.status().is_success() {
Ok(())
} else {
Err(format!("OKD router returned status: {}", response.status()))
}
}
/// Parse comma-separated check names from query parameter
fn parse_checks(checks_param: Option<&str>) -> Vec<String> {
match checks_param {
None => vec!["node_ready".to_string()],
Some(s) if s.is_empty() => vec!["node_ready".to_string()],
Some(s) => s.split(',').map(|c| c.trim().to_string()).collect(),
}
}
/// Run a single health check by name and return the result
async fn run_check(check_name: &str, client: Option<Client>, node_name: &str) -> CheckResult {
let start = Instant::now();
let result = match check_name {
"node_ready" => match client {
Some(c) => check_node_ready(c, node_name).await,
None => {
warn!(
"Kubernetes client not available for node '{}'. Assuming node is ready.",
node_name
);
Ok(())
}
},
"okd_router_1936" => check_okd_router_1936().await,
_ => Err(format!("Unknown check: {}", check_name)),
};
let duration_ms = start.elapsed().as_millis();
match result {
Ok(()) => CheckResult {
name: check_name.to_string(),
passed: true,
reason: None,
duration_ms,
},
Err(reason) => CheckResult {
name: check_name.to_string(),
passed: false,
reason: Some(reason),
duration_ms,
},
}
}
#[get("/health")]
async fn health(query: web::Query<HealthQuery>) -> impl Responder {
let node_name = match env::var("NODE_NAME") {
Ok(name) => name,
Err(_) => {
error!("NODE_NAME environment variable not set");
return HttpResponse::InternalServerError().json(HealthError {
status: "error".to_string(),
error: "NODE_NAME environment variable not set".to_string(),
});
}
};
// Parse requested checks from query parameter
let requested_checks = parse_checks(query.checks.as_deref());
// Check if node_ready check requires Kubernetes client
let needs_k8s_client = requested_checks.contains(&"node_ready".to_string());
// Initialize Kubernetes client only if needed
let k8s_client = if needs_k8s_client {
match Config::infer().await {
Ok(mut config) => {
config.write_timeout = Some(K8S_CLIENT_TIMEOUT);
config.connect_timeout = Some(K8S_CLIENT_TIMEOUT);
Some(Client::try_from(config).map_err(|e| e.to_string()))
}
Err(e) => {
warn!(
"Failed to infer Kubernetes config for node '{}': {}. Assuming node_ready is healthy.",
node_name, e
);
None
}
}
.and_then(|result| match result {
Ok(client) => Some(client),
Err(e) => {
warn!(
"Failed to create Kubernetes client for node '{}': {}. Assuming node_ready is healthy.",
node_name, e
);
None
}
})
} else {
None
};
// Run all requested checks in parallel
let start = Instant::now();
let mut join_set = JoinSet::new();
debug!("Running checks {requested_checks:?}");
for check_name in requested_checks {
let client = k8s_client.clone();
let node_name = node_name.clone();
join_set.spawn(async move { run_check(&check_name, client, &node_name).await });
}
let mut check_results = Vec::new();
while let Some(result) = join_set.join_next().await {
match result {
Ok(check) => check_results.push(check),
Err(e) => error!("Check task failed: {}", e),
}
}
let total_duration_ms = start.elapsed().as_millis();
// Determine overall status
let all_passed = check_results.iter().all(|c| c.passed);
if all_passed {
info!(
"All health checks passed for node '{}' in {}ms",
node_name, total_duration_ms
);
HttpResponse::Ok().json(HealthStatus {
status: "ready".to_string(),
checks: check_results,
total_duration_ms,
})
} else {
let failed_checks: Vec<&str> = check_results
.iter()
.filter(|c| !c.passed)
.map(|c| c.name.as_str())
.collect();
warn!(
"Health checks failed for node '{}' in {}ms: {:?}",
node_name, total_duration_ms, failed_checks
);
HttpResponse::ServiceUnavailable().json(HealthStatus {
status: "not-ready".to_string(),
checks: check_results,
total_duration_ms,
})
}
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
env_logger::init();
let port = env::var("LISTEN_PORT").unwrap_or_else(|_| "25001".to_string());
let port = port
.parse::<u16>()
.unwrap_or_else(|_| panic!("Invalid port number: {}", port));
let bind_addr = format!("0.0.0.0:{}", port);
info!("Starting harmony-node-readiness-endpoint on {}", bind_addr);
HttpServer::new(|| App::new().service(health))
.workers(3)
.bind(&bind_addr)?
.run()
.await
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_checks_defaults_to_node_ready() {
assert_eq!(parse_checks(None), vec!["node_ready"]);
assert_eq!(parse_checks(Some("")), vec!["node_ready"]);
}
#[test]
fn parse_checks_splits_and_trims_values() {
assert_eq!(
parse_checks(Some("node_ready, okd_router_1936 ")),
vec!["node_ready", "okd_router_1936"]
);
}
}

View File

@@ -21,6 +21,7 @@ http.workspace = true
inquire.workspace = true
interactive-parse = "0.1.5"
schemars = "0.8"
vaultrs = "0.7.4"
[dev-dependencies]
pretty_assertions.workspace = true

View File

@@ -1,4 +1,5 @@
use lazy_static::lazy_static;
use std::env;
lazy_static! {
pub static ref SECRET_NAMESPACE: String =
@@ -16,3 +17,16 @@ lazy_static! {
pub static ref INFISICAL_CLIENT_SECRET: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_SECRET").ok();
}
lazy_static! {
// Openbao/Vault configuration
pub static ref OPENBAO_URL: Option<String> =
env::var("OPENBAO_URL").or(env::var("VAULT_ADDR")).ok();
pub static ref OPENBAO_TOKEN: Option<String> = env::var("OPENBAO_TOKEN").ok();
pub static ref OPENBAO_USERNAME: Option<String> = env::var("OPENBAO_USERNAME").ok();
pub static ref OPENBAO_PASSWORD: Option<String> = env::var("OPENBAO_PASSWORD").ok();
pub static ref OPENBAO_SKIP_TLS: bool =
env::var("OPENBAO_SKIP_TLS").map(|v| v == "true").unwrap_or(false);
pub static ref OPENBAO_KV_MOUNT: String =
env::var("OPENBAO_KV_MOUNT").unwrap_or_else(|_| "secret".to_string());
}

View File

@@ -8,6 +8,12 @@ use config::INFISICAL_CLIENT_SECRET;
use config::INFISICAL_ENVIRONMENT;
use config::INFISICAL_PROJECT_ID;
use config::INFISICAL_URL;
use config::OPENBAO_KV_MOUNT;
use config::OPENBAO_PASSWORD;
use config::OPENBAO_SKIP_TLS;
use config::OPENBAO_TOKEN;
use config::OPENBAO_URL;
use config::OPENBAO_USERNAME;
use config::SECRET_STORE;
use interactive_parse::InteractiveParseObj;
use log::debug;
@@ -17,6 +23,7 @@ use serde::{Serialize, de::DeserializeOwned};
use std::fmt;
use store::InfisicalSecretStore;
use store::LocalFileSecretStore;
use store::OpenbaoSecretStore;
use thiserror::Error;
use tokio::sync::OnceCell;
@@ -69,11 +76,24 @@ async fn get_secret_manager() -> &'static SecretManager {
/// The async initialization function for the SecretManager.
async fn init_secret_manager() -> SecretManager {
let default_secret_score = "infisical".to_string();
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_score);
let default_secret_store = "infisical".to_string();
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_store);
let store: Box<dyn SecretStore> = match store_type.as_str() {
"file" => Box::new(LocalFileSecretStore::default()),
"openbao" | "vault" => {
let store = OpenbaoSecretStore::new(
OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"),
OPENBAO_KV_MOUNT.clone(),
*OPENBAO_SKIP_TLS,
OPENBAO_TOKEN.clone(),
OPENBAO_USERNAME.clone(),
OPENBAO_PASSWORD.clone(),
)
.await
.expect("Failed to initialize Openbao/Vault secret store");
Box::new(store)
}
"infisical" | _ => {
let store = InfisicalSecretStore::new(
INFISICAL_URL.clone().expect("Infisical url must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_URL"),

View File

@@ -1,4 +1,9 @@
mod infisical;
mod local_file;
mod openbao;
pub use infisical::InfisicalSecretStore;
pub use infisical::*;
pub use local_file::LocalFileSecretStore;
pub use local_file::*;
pub use openbao::OpenbaoSecretStore;

View File

@@ -0,0 +1,317 @@
use crate::{SecretStore, SecretStoreError};
use async_trait::async_trait;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::fs;
use std::path::PathBuf;
use vaultrs::auth;
use vaultrs::client::{Client, VaultClient, VaultClientSettingsBuilder};
use vaultrs::kv2;
/// Token response from Vault/Openbao auth endpoints
#[derive(Debug, Deserialize)]
struct TokenResponse {
auth: AuthInfo,
}
#[derive(Debug, Serialize, Deserialize)]
struct AuthInfo {
client_token: String,
#[serde(default)]
lease_duration: Option<u64>,
token_type: String,
}
impl From<vaultrs::api::AuthInfo> for AuthInfo {
fn from(value: vaultrs::api::AuthInfo) -> Self {
AuthInfo {
client_token: value.client_token,
token_type: value.token_type,
lease_duration: Some(value.lease_duration),
}
}
}
pub struct OpenbaoSecretStore {
client: VaultClient,
kv_mount: String,
}
impl Debug for OpenbaoSecretStore {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("OpenbaoSecretStore")
.field("client", &self.client.settings)
.field("kv_mount", &self.kv_mount)
.finish()
}
}
impl OpenbaoSecretStore {
/// Creates a new Openbao/Vault secret store with authentication
pub async fn new(
base_url: String,
kv_mount: String,
skip_tls: bool,
token: Option<String>,
username: Option<String>,
password: Option<String>,
) -> Result<Self, SecretStoreError> {
info!("OPENBAO_STORE: Initializing client for URL: {base_url}");
// 1. If token is provided via env var, use it directly
if let Some(t) = token {
debug!("OPENBAO_STORE: Using token from environment variable");
return Self::with_token(&base_url, skip_tls, &t, &kv_mount);
}
// 2. Try to load cached token
let cache_path = Self::get_token_cache_path(&base_url);
if let Ok(cached_token) = Self::load_cached_token(&cache_path) {
debug!("OPENBAO_STORE: Found cached token, validating...");
if Self::validate_token(&base_url, skip_tls, &cached_token.client_token).await {
info!("OPENBAO_STORE: Cached token is valid");
return Self::with_token(
&base_url,
skip_tls,
&cached_token.client_token,
&kv_mount,
);
}
warn!("OPENBAO_STORE: Cached token is invalid or expired");
}
// 3. Authenticate with username/password
let (user, pass) = match (username, password) {
(Some(u), Some(p)) => (u, p),
_ => {
return Err(SecretStoreError::Store(
"No valid token found and username/password not provided. \
Set OPENBAO_TOKEN or OPENBAO_USERNAME/OPENBAO_PASSWORD environment variables."
.into(),
));
}
};
let token =
Self::authenticate_userpass(&base_url, &kv_mount, skip_tls, &user, &pass).await?;
// Cache the token
if let Err(e) = Self::cache_token(&cache_path, &token) {
warn!("OPENBAO_STORE: Failed to cache token: {e}");
}
Self::with_token(&base_url, skip_tls, &token.client_token, &kv_mount)
}
/// Create a client with an existing token
fn with_token(
base_url: &str,
skip_tls: bool,
token: &str,
kv_mount: &str,
) -> Result<Self, SecretStoreError> {
let mut settings = VaultClientSettingsBuilder::default();
settings.address(base_url).token(token);
if skip_tls {
warn!("OPENBAO_STORE: Skipping TLS verification - not recommended for production!");
settings.verify(false);
}
let client = VaultClient::new(
settings
.build()
.map_err(|e| SecretStoreError::Store(Box::new(e)))?,
)
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
Ok(Self {
client,
kv_mount: kv_mount.to_string(),
})
}
/// Get the cache file path for a given base URL
fn get_token_cache_path(base_url: &str) -> PathBuf {
let hash = Self::hash_url(base_url);
directories::BaseDirs::new()
.map(|dirs| {
dirs.data_dir()
.join("harmony")
.join("secrets")
.join(format!("openbao_token_{hash}"))
})
.unwrap_or_else(|| PathBuf::from(format!("/tmp/openbao_token_{hash}")))
}
/// Create a simple hash of the URL for unique cache files
fn hash_url(url: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
url.hash(&mut hasher);
format!("{:016x}", hasher.finish())
}
/// Load cached token from file
fn load_cached_token(path: &PathBuf) -> Result<AuthInfo, String> {
serde_json::from_str(
&fs::read_to_string(path)
.map_err(|e| format!("Could not load token from file {path:?} : {e}"))?,
)
.map_err(|e| format!("Could not deserialize token from file {path:?} : {e}"))
}
/// Cache token to file
fn cache_token(path: &PathBuf, token: &AuthInfo) -> Result<(), std::io::Error> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
// Set file permissions to 0600 (owner read/write only)
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
let mut file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(path)?;
use std::io::Write;
file.write_all(serde_json::to_string(token)?.as_bytes())?;
}
#[cfg(not(unix))]
{
fs::write(path, serde_json::to_string(token)?)?;
}
Ok(())
}
/// Validate if a token is still valid using vaultrs
async fn validate_token(base_url: &str, skip_tls: bool, token: &str) -> bool {
let mut settings = VaultClientSettingsBuilder::default();
settings.address(base_url).token(token);
if skip_tls {
settings.verify(false);
}
let Ok(settings) = settings.build() else {
return false;
};
let Ok(client) = VaultClient::new(settings) else {
return false;
};
vaultrs::token::lookup(&client, token).await.is_ok()
}
/// Authenticate using username/password (userpass auth method)
async fn authenticate_userpass(
base_url: &str,
kv_mount: &str,
skip_tls: bool,
username: &str,
password: &str,
) -> Result<AuthInfo, SecretStoreError> {
info!("OPENBAO_STORE: Authenticating with username/password");
// Create a client without a token for authentication
let mut settings = VaultClientSettingsBuilder::default();
settings.address(base_url);
if skip_tls {
settings.verify(false);
}
let client = VaultClient::new(
settings
.build()
.map_err(|e| SecretStoreError::Store(Box::new(e)))?,
)
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
// Authenticate using the userpass method. Note: this passes the KV
// mount name as the userpass auth mount path; adjust if userpass is
// mounted elsewhere (commonly at "userpass").
let token = auth::userpass::login(&client, kv_mount, username, password)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
Ok(token.into())
}
}
#[async_trait]
impl SecretStore for OpenbaoSecretStore {
async fn get_raw(&self, namespace: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
let path = format!("{}/{}", namespace, key);
info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'");
debug!("OPENBAO_STORE: Request path: {path}");
let data: serde_json::Value = kv2::read(&self.client, &self.kv_mount, &path)
.await
.map_err(|e| {
// Check for not found error
if e.to_string().contains("does not exist") || e.to_string().contains("404") {
SecretStoreError::NotFound {
namespace: namespace.to_string(),
key: key.to_string(),
}
} else {
SecretStoreError::Store(Box::new(e))
}
})?;
// Extract the actual secret value stored under the "value" key
let value = data.get("value").and_then(|v| v.as_str()).ok_or_else(|| {
SecretStoreError::Store("Secret does not contain expected 'value' field".into())
})?;
Ok(value.as_bytes().to_vec())
}
async fn set_raw(
&self,
namespace: &str,
key: &str,
val: &[u8],
) -> Result<(), SecretStoreError> {
let path = format!("{}/{}", namespace, key);
info!("OPENBAO_STORE: Setting key '{key}' in namespace '{namespace}'");
debug!("OPENBAO_STORE: Request path: {path}");
let value_str =
String::from_utf8(val.to_vec()).map_err(|e| SecretStoreError::Store(Box::new(e)))?;
// Create the data structure expected by our format
let data = serde_json::json!({
"value": value_str
});
kv2::set(&self.client, &self.kv_mount, &path, &data)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
info!("OPENBAO_STORE: Successfully stored secret '{key}'");
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_hash_url_consistency() {
let url = "https://vault.example.com:8200";
let hash1 = OpenbaoSecretStore::hash_url(url);
let hash2 = OpenbaoSecretStore::hash_url(url);
assert_eq!(hash1, hash2);
assert_eq!(hash1.len(), 16);
}
#[test]
fn test_hash_url_uniqueness() {
let hash1 = OpenbaoSecretStore::hash_url("https://vault1.example.com");
let hash2 = OpenbaoSecretStore::hash_url("https://vault2.example.com");
assert_ne!(hash1, hash2);
}
}

View File

@@ -344,7 +344,7 @@ pub struct StaticMap {
pub mac: String,
pub ipaddr: String,
pub cid: Option<MaybeString>,
pub hostname: String,
pub hostname: Option<String>,
pub descr: Option<MaybeString>,
pub winsserver: MaybeString,
pub dnsserver: MaybeString,
@@ -383,24 +383,24 @@ pub struct Outbound {
 #[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
 pub struct NatRule {
-    pub protocol: String,
-    pub interface: String,
-    pub category: MaybeString,
-    pub ipprotocol: String,
-    pub descr: MaybeString,
-    pub tag: MaybeString,
+    pub protocol: Option<String>,
+    pub interface: Option<String>,
+    pub category: Option<MaybeString>,
+    pub ipprotocol: Option<String>,
+    pub descr: Option<MaybeString>,
+    pub tag: Option<MaybeString>,
     pub tagged: Option<MaybeString>,
-    pub poolopts: PoolOpts,
+    pub poolopts: Option<PoolOpts>,
     #[yaserde(rename = "associated-rule-id")]
     pub associated_rule_id: Option<MaybeString>,
     pub disabled: Option<u8>,
-    pub target: String,
+    pub target: Option<String>,
     #[yaserde(rename = "local-port")]
-    pub local_port: i32,
-    pub source: Source,
-    pub destination: Destination,
-    pub updated: Updated,
-    pub created: Created,
+    pub local_port: Option<i32>,
+    pub source: Option<Source>,
+    pub destination: Option<Destination>,
+    pub updated: Option<Updated>,
+    pub created: Option<Created>,
 }
 #[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
@@ -1408,6 +1408,7 @@ pub struct Account {
     pub hostnames: String,
     pub wildcard: i32,
     pub zone: MaybeString,
+    pub dynipv6host: Option<MaybeString>,
     pub checkip: String,
     #[yaserde(rename = "checkip_timeout")]
     pub checkip_timeout: i32,
@@ -1539,12 +1540,12 @@ pub struct Dyndns {
 pub struct Vlans {
     #[yaserde(attribute = true)]
     pub version: String,
-    pub vlan: MaybeString,
+    pub vlan: RawXml,
 }
 #[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]
 pub struct Bridges {
-    pub bridged: Option<MaybeString>,
+    pub bridged: Option<RawXml>,
 }
 #[derive(Default, PartialEq, Debug, YaSerialize, YaDeserialize)]


@@ -48,7 +48,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
         hostname: &str,
     ) -> Result<(), DhcpError> {
         let mac = mac.to_string();
-        let hostname = hostname.to_string();
+        let hostname = Some(hostname.to_string());
         let lan_dhcpd = self.get_lan_dhcpd();
         let existing_mappings: &mut Vec<StaticMap> = &mut lan_dhcpd.staticmaps;
@@ -121,7 +121,7 @@ impl<'a> DhcpConfigLegacyISC<'a> {
             .map(|entry| StaticMap {
                 mac: entry["mac"].as_str().unwrap_or_default().to_string(),
                 ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
-                hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
+                hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
                 descr: entry["descr"].as_str().map(MaybeString::from),
                 ..Default::default()
             })


@@ -213,7 +213,7 @@ impl<'a> DhcpConfigDnsMasq<'a> {
             .map(|entry| StaticMap {
                 mac: entry["mac"].as_str().unwrap_or_default().to_string(),
                 ipaddr: entry["ipaddr"].as_str().unwrap_or_default().to_string(),
-                hostname: entry["hostname"].as_str().unwrap_or_default().to_string(),
+                hostname: Some(entry["hostname"].as_str().unwrap_or_default().to_string()),
                 descr: entry["descr"].as_str().map(MaybeString::from),
                 ..Default::default()
             })