Compare commits

..

1 Commits

Author SHA1 Message Date
063a4d4f5c wip: improving inventory discovery 2026-03-04 07:18:20 -05:00
30 changed files with 89 additions and 1688 deletions

177
Cargo.lock generated
View File

@@ -1008,7 +1008,7 @@ dependencies = [
"anstream",
"anstyle",
"clap_lex",
"strsim 0.11.1",
"strsim",
]
[[package]]
@@ -1375,38 +1375,14 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "darling"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b750cb3417fd1b327431a470f388520309479ab0bf5e323505daf0290cd3850"
dependencies = [
"darling_core 0.14.4",
"darling_macro 0.14.4",
]
[[package]]
name = "darling"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc7f46116c46ff9ab3eb1597a45688b6715c6e628b5c133e288e709a29bcb4ee"
dependencies = [
"darling_core 0.20.11",
"darling_macro 0.20.11",
]
[[package]]
name = "darling_core"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "109c1ca6e6b7f82cc233a97004ea8ed7ca123a9af07a8230878fcfda9b158bf0"
dependencies = [
"fnv",
"ident_case",
"proc-macro2",
"quote",
"strsim 0.10.0",
"syn 1.0.109",
"darling_core",
"darling_macro",
]
[[package]]
@@ -1419,28 +1395,17 @@ dependencies = [
"ident_case",
"proc-macro2",
"quote",
"strsim 0.11.1",
"strsim",
"syn 2.0.106",
]
[[package]]
name = "darling_macro"
version = "0.14.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a4aab4dbc9f7611d8b55048a3a16d2d010c2c8334e46304b40ac1cc14bf3b48e"
dependencies = [
"darling_core 0.14.4",
"quote",
"syn 1.0.109",
]
[[package]]
name = "darling_macro"
version = "0.20.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fc34b93ccb385b40dc71c6fceac4b2ad23662c7eeb248cf10d529b7e055b6ead"
dependencies = [
"darling_core 0.20.11",
"darling_core",
"quote",
"syn 2.0.106",
]
@@ -1483,37 +1448,6 @@ dependencies = [
"syn 2.0.106",
]
[[package]]
name = "derive_builder"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d67778784b508018359cbc8696edb3db78160bab2c2a28ba7f56ef6932997f8"
dependencies = [
"derive_builder_macro",
]
[[package]]
name = "derive_builder_core"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c11bdc11a0c47bc7d37d582b5285da6849c96681023680b906673c5707af7b0f"
dependencies = [
"darling 0.14.4",
"proc-macro2",
"quote",
"syn 1.0.109",
]
[[package]]
name = "derive_builder_macro"
version = "0.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebcda35c7a396850a55ffeac740804b40ffec779b98fffbb1738f4033f0ee79e"
dependencies = [
"derive_builder_core",
"syn 1.0.109",
]
[[package]]
name = "derive_more"
version = "2.0.1"
@@ -2047,19 +1981,6 @@ dependencies = [
"url",
]
[[package]]
name = "example-node-health"
version = "0.1.0"
dependencies = [
"env_logger",
"harmony",
"harmony_cli",
"harmony_macros",
"harmony_types",
"log",
"tokio",
]
[[package]]
name = "example-ntfy"
version = "0.1.0"
@@ -2774,23 +2695,6 @@ dependencies = [
"walkdir",
]
[[package]]
name = "harmony-node-readiness-endpoint"
version = "0.1.0"
dependencies = [
"actix-web",
"chrono",
"env_logger",
"k8s-openapi",
"kube",
"log",
"reqwest 0.12.23",
"serde",
"serde_json",
"tokio",
"tower",
]
[[package]]
name = "harmony_agent"
version = "0.1.0"
@@ -2943,7 +2847,6 @@ dependencies = [
"tempfile",
"thiserror 2.0.16",
"tokio",
"vaultrs",
]
[[package]]
@@ -3642,7 +3545,7 @@ version = "0.3.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "435d80800b936787d62688c927b6490e887c7ef5ff9ce922c6c6050fca75eb9a"
dependencies = [
"darling 0.20.11",
"darling",
"indoc",
"proc-macro2",
"quote",
@@ -3914,7 +3817,7 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "079fc8c1c397538628309cfdee20696ebdcc26745f9fb17f89b78782205bd995"
dependencies = [
"darling 0.20.11",
"darling",
"proc-macro2",
"quote",
"serde",
@@ -5429,40 +5332,6 @@ dependencies = [
"semver",
]
[[package]]
name = "rustify"
version = "0.6.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "759a090a17ce545d1adcffcc48207d5136c8984d8153bd8247b1ad4a71e49f5f"
dependencies = [
"anyhow",
"async-trait",
"bytes",
"http 1.3.1",
"reqwest 0.12.23",
"rustify_derive",
"serde",
"serde_json",
"serde_urlencoded",
"thiserror 1.0.69",
"tracing",
"url",
]
[[package]]
name = "rustify_derive"
version = "0.5.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f07d43b2dbdbd99aaed648192098f0f413b762f0f352667153934ef3955f1793"
dependencies = [
"proc-macro2",
"quote",
"regex",
"serde_urlencoded",
"syn 1.0.109",
"synstructure 0.12.6",
]
[[package]]
name = "rustix"
version = "0.38.44"
@@ -5941,7 +5810,7 @@ version = "3.14.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "de90945e6565ce0d9a25098082ed4ee4002e047cb59892c318d66821e14bb30f"
dependencies = [
"darling 0.20.11",
"darling",
"proc-macro2",
"quote",
"syn 2.0.106",
@@ -6416,12 +6285,6 @@ dependencies = [
"unicode-properties",
]
[[package]]
name = "strsim"
version = "0.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623"
[[package]]
name = "strsim"
version = "0.11.1"
@@ -6917,9 +6780,9 @@ checksum = "5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801"
[[package]]
name = "tower"
version = "0.5.3"
version = "0.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4"
checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9"
dependencies = [
"futures-core",
"futures-util",
@@ -7235,26 +7098,6 @@ version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "vaultrs"
version = "0.7.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f81eb4d9221ca29bad43d4b6871b6d2e7656e1af2cfca624a87e5d17880d831d"
dependencies = [
"async-trait",
"bytes",
"derive_builder",
"http 1.3.1",
"reqwest 0.12.23",
"rustify",
"rustify_derive",
"serde",
"serde_json",
"thiserror 1.0.69",
"tracing",
"url",
]
[[package]]
name = "vcpkg"
version = "0.2.15"

View File

@@ -2,6 +2,7 @@
resolver = "2"
members = [
"private_repos/*",
"examples/*",
"harmony",
"harmony_types",
"harmony_macros",
@@ -18,8 +19,7 @@ members = [
"adr/agent_discovery/mdns",
"brocade",
"harmony_agent",
"harmony_agent/deploy", "harmony_node_readiness",
"examples/*",
"harmony_agent/deploy",
]
[workspace.package]

View File

@@ -11,7 +11,7 @@ async fn main() {
role: HostRole::Worker,
number_desired_hosts: 3,
discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
cidr: cidrv4!("192.168.0.1/25"),
cidr: cidrv4!("192.168.2.0/24"),
port: 25000,
},
};
@@ -20,7 +20,7 @@ async fn main() {
role: HostRole::ControlPlane,
number_desired_hosts: 3,
discovery_strategy: HarmonyDiscoveryStrategy::SUBNET {
cidr: cidrv4!("192.168.0.1/25"),
cidr: cidrv4!("192.168.2.0/24"),
port: 25000,
},
};
@@ -28,7 +28,8 @@ async fn main() {
harmony_cli::run(
Inventory::autoload(),
LocalhostTopology::new(),
vec![Box::new(discover_worker), Box::new(discover_control_plane)],
vec![Box::new(discover_worker)],
//vec![Box::new(discover_worker), Box::new(discover_control_plane)],
None,
)
.await

View File

@@ -1,16 +0,0 @@
[package]
name = "example-node-health"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
tokio = { workspace = true }
harmony_macros = { path = "../../harmony_macros" }
log = { workspace = true }
env_logger = { workspace = true }

View File

@@ -1,17 +0,0 @@
use harmony::{
inventory::Inventory, modules::node_health::NodeHealthScore, topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let node_health = NodeHealthScore {};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(node_health)],
None,
)
.await
.unwrap();
}

View File

@@ -1,13 +1,63 @@
use std::str::FromStr;
use harmony::{
inventory::Inventory, modules::openbao::OpenbaoScore, topology::K8sAnywhereTopology,
inventory::Inventory,
modules::helm::chart::{HelmChartScore, HelmRepository, NonBlankString},
topology::K8sAnywhereTopology,
};
use harmony_macros::hurl;
#[tokio::main]
async fn main() {
let openbao = OpenbaoScore {
host: String::new(),
let values_yaml = Some(
r#"server:
standalone:
enabled: true
config: |
listener "tcp" {
tls_disable = true
address = "[::]:8200"
cluster_address = "[::]:8201"
}
storage "file" {
path = "/openbao/data"
}
service:
enabled: true
dataStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
auditStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce"#
.to_string(),
);
let openbao = HelmChartScore {
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
release_name: NonBlankString::from_str("openbao").unwrap(),
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: true,
repository: Some(HelmRepository::new(
"openbao".to_string(),
hurl!("https://openbao.github.io/openbao-helm"),
true,
)),
};
// TODO exec pod commands to initialize secret store if not already done
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),

View File

@@ -8,7 +8,7 @@ use harmony_types::{
use log::debug;
use log::info;
use crate::topology::{HelmCommand, PxeOptions};
use crate::topology::PxeOptions;
use crate::{data::FileContent, executors::ExecutorError, topology::node_exporter::NodeExporter};
use crate::{infra::network_manager::OpenShiftNmStateNetworkManager, topology::PortConfig};
@@ -18,10 +18,7 @@ use super::{
NetworkManager, PreparationError, PreparationOutcome, Router, Switch, SwitchClient,
SwitchError, TftpServer, Topology, k8s::K8sClient,
};
use std::{
process::Command,
sync::{Arc, OnceLock},
};
use std::sync::{Arc, OnceLock};
#[derive(Debug, Clone)]
pub struct HAClusterTopology {
@@ -55,30 +52,6 @@ impl Topology for HAClusterTopology {
}
}
impl HelmCommand for HAClusterTopology {
fn get_helm_command(&self) -> Command {
let mut cmd = Command::new("helm");
if let Some(k) = &self.kubeconfig {
cmd.args(["--kubeconfig", k]);
}
// FIXME we should support context anywhere there is a k8sclient
// This likely belongs in the k8sclient itself and should be extracted to a separate
// crate
//
// I feel like helm could very well be a feature of this external k8s client.
//
// Same for kustomize
//
// if let Some(c) = &self.k8s_context {
// cmd.args(["--kube-context", c]);
// }
info!("Using helm command {cmd:?}");
cmd
}
}
#[async_trait]
impl K8sclient for HAClusterTopology {
async fn k8s_client(&self) -> Result<Arc<K8sClient>, String> {

View File

@@ -10,10 +10,8 @@ use k8s_openapi::api::core::v1::{
};
use k8s_openapi::api::rbac::v1::{ClusterRoleBinding, RoleRef, Subject};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use kube::api::DynamicObject;
use kube::error::DiscoveryError;
use log::{debug, error, info, warn};
use serde::de::DeserializeOwned;
#[derive(Debug)]
pub struct PrivilegedPodConfig {
@@ -281,16 +279,6 @@ pub fn prompt_drain_timeout_action(
}
}
/// JSON round-trip: DynamicObject → K
///
/// Safe because the DynamicObject was produced by the apiserver from a
/// payload that was originally serialized from K, so the schema is identical.
pub(crate) fn dyn_to_typed<K: DeserializeOwned>(obj: DynamicObject) -> Result<K, kube::Error> {
serde_json::to_value(obj)
.and_then(serde_json::from_value)
.map_err(kube::Error::SerdeError)
}
#[cfg(test)]
mod tests {
use super::*;

View File

@@ -832,6 +832,7 @@ impl K8sClient {
pub async fn apply<K>(&self, resource: &K, namespace: Option<&str>) -> Result<K, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>,
<K as kube::Resource>::DynamicType: Default,
{
debug!(
@@ -844,34 +845,9 @@ impl K8sClient {
serde_json::to_value(resource).unwrap_or(serde_json::Value::Null)
);
// ── 1. Extract GVK from compile-time type info ──────────────────────────
let dyntype = K::DynamicType::default();
let gvk = GroupVersionKind {
group: K::group(&dyntype).to_string(),
version: K::version(&dyntype).to_string(),
kind: K::kind(&dyntype).to_string(),
};
// ── 2. Resolve scope at runtime via discovery ────────────────────────────
let discovery = self.discovery().await?;
let (ar, caps) = discovery.resolve_gvk(&gvk).ok_or_else(|| {
Error::Discovery(DiscoveryError::MissingResource(format!(
"Cannot resolve GVK: {:?}",
gvk
)))
})?;
let effective_namespace = if caps.scope == Scope::Cluster {
None
} else {
// Prefer the caller-supplied namespace, fall back to the resource's own
namespace.or_else(|| resource.meta().namespace.as_deref())
};
// ── 3. Determine the effective namespace based on the discovered scope ───
let api: Api<DynamicObject> =
get_dynamic_api(ar, caps, self.client.clone(), effective_namespace, false);
let api: Api<K> =
<<K as Resource>::Scope as ApplyStrategy<K>>::get_api(&self.client, namespace);
// api.create(&PostParams::default(), &resource).await
let patch_params = PatchParams::apply("harmony");
let name = resource
.meta()
@@ -907,7 +883,7 @@ impl K8sClient {
if current_yaml == new_yaml {
println!("No changes detected.");
// Return the current resource state as there are no changes.
return helper::dyn_to_typed(current);
return Ok(current);
}
println!("Changes detected:");
@@ -954,19 +930,13 @@ impl K8sClient {
.patch(name, &patch_params, &Patch::Apply(resource))
.await
{
Ok(obj) => helper::dyn_to_typed(obj),
Ok(obj) => Ok(obj),
Err(Error::Api(ErrorResponse { code: 404, .. })) => {
// Resource doesn't exist, server-side apply should create it
// This can happen with some API servers, so we explicitly create
debug!("Resource '{}' not found, creating via POST", name);
let dyn_resource: DynamicObject = serde_json::from_value(
serde_json::to_value(resource).map_err(Error::SerdeError)?,
)
.map_err(Error::SerdeError)?;
api.create(&PostParams::default(), &dyn_resource)
api.create(&PostParams::default(), resource)
.await
.and_then(helper::dyn_to_typed)
.map_err(|e| {
error!("Failed to create resource '{}': {}", name, e);
e
@@ -983,6 +953,7 @@ impl K8sClient {
pub async fn apply_many<K>(&self, resource: &[K], ns: Option<&str>) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>,
<K as Resource>::DynamicType: Default,
{
let mut result = Vec::new();
@@ -1936,7 +1907,7 @@ impl K8sClient {
);
// ── 3. Evict & wait loop ──────────────────────────────────────
let mut start = tokio::time::Instant::now();
let start = tokio::time::Instant::now();
let poll_interval = Duration::from_secs(5);
let mut pending = evictable;
@@ -2019,7 +1990,7 @@ impl K8sClient {
}
helper::DrainTimeoutAction::Retry => {
// Reset the start time to retry for another full timeout period
start = tokio::time::Instant::now();
let start = tokio::time::Instant::now();
continue;
}
helper::DrainTimeoutAction::Abort => {

View File

@@ -44,12 +44,6 @@ pub struct BrocadeSwitchAuth {
pub password: String,
}
impl BrocadeSwitchAuth {
pub fn user_pass(username: String, password: String) -> Self {
Self { username, password }
}
}
#[derive(Secret, Clone, Debug, JsonSchema, Serialize, Deserialize)]
pub struct BrocadeSnmpAuth {
pub username: String,

View File

@@ -45,7 +45,7 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
topology: &T,
) -> Result<Outcome, InterpretError> {
info!(
"Launching discovery agent, make sure that your nodes are successfully PXE booted and running inventory agent. They should answer on `http://<node_ip>:8080/inventory`"
"Launching discovery agent, make sure that your nodes are successfully PXE booted and running inventory agent. They should answer on `http://<node_ip>:25000/inventory`"
);
LaunchDiscoverInventoryAgentScore {
discovery_timeout: None,
@@ -58,6 +58,8 @@ impl<T: Topology> Interpret<T> for DiscoverHostForRoleInterpret {
let host_repo = InventoryRepositoryFactory::build().await?;
let mut assigned_hosts = 0;
// let hosts_for_role = host_repo.get_hosts_for_role(&self.score.role);
loop {
let all_hosts = host_repo.get_all_hosts().await?;

View File

@@ -54,12 +54,6 @@ pub enum HarmonyDiscoveryStrategy {
SUBNET { cidr: cidr::Ipv4Cidr, port: u16 },
}
impl Default for HarmonyDiscoveryStrategy {
fn default() -> Self {
HarmonyDiscoveryStrategy::MDNS
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for DiscoverInventoryAgentInterpret {
async fn execute(

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use k8s_openapi::{NamespaceResourceScope, ResourceScope};
use k8s_openapi::NamespaceResourceScope;
use kube::Resource;
use log::info;
use serde::{Serialize, de::DeserializeOwned};
@@ -29,7 +29,7 @@ impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
}
impl<
K: Resource<Scope: ResourceScope>
K: Resource<Scope = NamespaceResourceScope>
+ std::fmt::Debug
+ Sync
+ DeserializeOwned
@@ -61,7 +61,7 @@ pub struct K8sResourceInterpret<K: Resource + std::fmt::Debug + Sync + Send> {
#[async_trait]
impl<
K: Resource<Scope: ResourceScope>
K: Resource<Scope = NamespaceResourceScope>
+ Clone
+ std::fmt::Debug
+ DeserializeOwned

View File

@@ -15,13 +15,10 @@ pub mod load_balancer;
pub mod monitoring;
pub mod nats;
pub mod network;
pub mod node_health;
pub mod okd;
pub mod openbao;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;
pub mod zitadel;

View File

@@ -1,260 +0,0 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use k8s_openapi::api::{
apps::v1::{DaemonSet, DaemonSetSpec},
core::v1::{
Container, ContainerPort, EnvVar, EnvVarSource, Namespace, ObjectFieldSelector, PodSpec,
PodTemplateSpec, ResourceRequirements, ServiceAccount, Toleration,
},
rbac::v1::{ClusterRole, ClusterRoleBinding, PolicyRule, Role, RoleBinding, RoleRef, Subject},
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::ObjectMeta;
use serde::Serialize;
use std::collections::BTreeMap;
use crate::{
data::Version,
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
modules::k8s::resource::K8sResourceScore,
score::Score,
topology::{K8sclient, Topology},
};
#[derive(Clone, Debug, Serialize)]
pub struct NodeHealthScore {}
impl<T: Topology + K8sclient> Score<T> for NodeHealthScore {
fn name(&self) -> String {
format!("NodeHealthScore")
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NodeHealthInterpret {})
}
}
#[derive(Debug, Clone)]
pub struct NodeHealthInterpret {}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for NodeHealthInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let namespace_name = "harmony-node-healthcheck".to_string();
// Namespace
let mut labels = BTreeMap::new();
labels.insert("name".to_string(), namespace_name.clone());
let namespace = Namespace {
metadata: ObjectMeta {
name: Some(namespace_name.clone()),
labels: Some(labels),
..ObjectMeta::default()
},
..Namespace::default()
};
// ServiceAccount
let service_account_name = "node-healthcheck-sa".to_string();
let service_account = ServiceAccount {
metadata: ObjectMeta {
name: Some(service_account_name.clone()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
..ServiceAccount::default()
};
// ClusterRole
let cluster_role = ClusterRole {
metadata: ObjectMeta {
name: Some("node-healthcheck-role".to_string()),
..ObjectMeta::default()
},
rules: Some(vec![PolicyRule {
api_groups: Some(vec!["".to_string()]),
resources: Some(vec!["nodes".to_string()]),
verbs: vec!["get".to_string(), "list".to_string()],
..PolicyRule::default()
}]),
..ClusterRole::default()
};
// Role
let role = Role {
metadata: ObjectMeta {
name: Some("allow-hostnetwork-scc".to_string()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
rules: Some(vec![PolicyRule {
api_groups: Some(vec!["security.openshift.io".to_string()]),
resources: Some(vec!["securitycontextconstraints".to_string()]),
resource_names: Some(vec!["hostnetwork".to_string()]),
verbs: vec!["use".to_string()],
..PolicyRule::default()
}]),
..Role::default()
};
// RoleBinding
let role_binding = RoleBinding {
metadata: ObjectMeta {
name: Some("node-status-querier-scc-binding".to_string()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".to_string(),
name: service_account_name.clone(),
namespace: Some(namespace_name.clone()),
..Subject::default()
}]),
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".to_string(),
kind: "Role".to_string(),
name: "allow-hostnetwork-scc".to_string(),
},
};
// ClusterRoleBinding
let cluster_role_binding = ClusterRoleBinding {
metadata: ObjectMeta {
name: Some("read-nodes-binding".to_string()),
..ObjectMeta::default()
},
subjects: Some(vec![Subject {
kind: "ServiceAccount".to_string(),
name: service_account_name.clone(),
namespace: Some(namespace_name.clone()),
..Subject::default()
}]),
role_ref: RoleRef {
api_group: "rbac.authorization.k8s.io".to_string(),
kind: "ClusterRole".to_string(),
name: "node-healthcheck-role".to_string(),
},
};
// DaemonSet
let mut daemonset_labels = BTreeMap::new();
daemonset_labels.insert("app".to_string(), "node-healthcheck".to_string());
let daemon_set = DaemonSet {
metadata: ObjectMeta {
name: Some("node-healthcheck".to_string()),
namespace: Some(namespace_name.clone()),
labels: Some(daemonset_labels.clone()),
..ObjectMeta::default()
},
spec: Some(DaemonSetSpec {
selector: LabelSelector {
match_labels: Some(daemonset_labels.clone()),
..LabelSelector::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(daemonset_labels),
..ObjectMeta::default()
}),
spec: Some(PodSpec {
service_account_name: Some(service_account_name.clone()),
host_network: Some(true),
tolerations: Some(vec![Toleration {
operator: Some("Exists".to_string()),
..Toleration::default()
}]),
containers: vec![Container {
name: "checker".to_string(),
image: Some(
"hub.nationtech.io/harmony/harmony-node-readiness-endpoint:latest"
.to_string(),
),
env: Some(vec![EnvVar {
name: "NODE_NAME".to_string(),
value_from: Some(EnvVarSource {
field_ref: Some(ObjectFieldSelector {
field_path: "spec.nodeName".to_string(),
..ObjectFieldSelector::default()
}),
..EnvVarSource::default()
}),
..EnvVar::default()
}]),
ports: Some(vec![ContainerPort {
container_port: 25001,
host_port: Some(25001),
name: Some("health-port".to_string()),
..ContainerPort::default()
}]),
resources: Some(ResourceRequirements {
requests: Some({
let mut requests = BTreeMap::new();
requests.insert("cpu".to_string(), Quantity("10m".to_string()));
requests
.insert("memory".to_string(), Quantity("50Mi".to_string()));
requests
}),
..ResourceRequirements::default()
}),
..Container::default()
}],
..PodSpec::default()
}),
},
..DaemonSetSpec::default()
}),
..DaemonSet::default()
};
K8sResourceScore::single(namespace, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(service_account, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(cluster_role, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(role, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(role_binding, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(cluster_role_binding, None)
.interpret(inventory, topology)
.await?;
K8sResourceScore::single(daemon_set, Some(namespace_name.clone()))
.interpret(inventory, topology)
.await?;
Ok(Outcome::success(
"Harmony node health successfully deployed".to_string(),
))
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("NodeHealth")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}

View File

@@ -1,88 +0,0 @@
use std::str::FromStr;
use harmony_macros::hurl;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::helm::chart::{HelmChartScore, HelmRepository},
score::Score,
topology::{HelmCommand, K8sclient, Topology},
};
#[derive(Debug, Serialize, Clone)]
pub struct OpenbaoScore {
/// Host used for external access (ingress)
pub host: String,
}
impl<T: Topology + K8sclient + HelmCommand> Score<T> for OpenbaoScore {
fn name(&self) -> String {
"OpenbaoScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
// TODO exec pod commands to initialize secret store if not already done
let host = &self.host;
let values_yaml = Some(format!(
r#"global:
openshift: true
server:
standalone:
enabled: true
config: |
ui = true
listener "tcp" {{
tls_disable = true
address = "[::]:8200"
cluster_address = "[::]:8201"
}}
storage "file" {{
path = "/openbao/data"
}}
service:
enabled: true
ingress:
enabled: true
hosts:
- host: {host}
dataStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
auditStorage:
enabled: true
size: 10Gi
storageClass: null
accessMode: ReadWriteOnce
ui:
enabled: true"#
));
HelmChartScore {
namespace: Some(NonBlankString::from_str("openbao").unwrap()),
release_name: NonBlankString::from_str("openbao").unwrap(),
chart_name: NonBlankString::from_str("openbao/openbao").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"openbao".to_string(),
hurl!("https://openbao.github.io/openbao-helm"),
true,
)),
}
.create_interpret()
}
}

View File

@@ -1,51 +0,0 @@
use std::str::FromStr;
use harmony_macros::hurl;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use crate::{
interpret::Interpret,
modules::helm::chart::{HelmChartScore, HelmRepository},
score::Score,
topology::{HelmCommand, K8sclient, Topology},
};
#[derive(Debug, Serialize, Clone)]
pub struct ZitadelScore {
/// Host used for external access (ingress)
pub host: String,
}
impl<T: Topology + K8sclient + HelmCommand> Score<T> for ZitadelScore {
fn name(&self) -> String {
"ZitadelScore".to_string()
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
// TODO exec pod commands to initialize secret store if not already done
let host = &self.host;
let values_yaml = Some(format!(r#""#));
todo!("This is not complete yet");
HelmChartScore {
namespace: Some(NonBlankString::from_str("zitadel").unwrap()),
release_name: NonBlankString::from_str("zitadel").unwrap(),
chart_name: NonBlankString::from_str("zitadel/zitadel").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml,
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"zitadel".to_string(),
hurl!("https://charts.zitadel.com"),
true,
)),
}
.create_interpret()
}
}

View File

@@ -1,17 +0,0 @@
[package]
name = "harmony-node-readiness-endpoint"
version = "0.1.0"
edition = "2024"
[dependencies]
actix-web = "4"
kube.workspace = true
k8s-openapi.workspace = true
serde.workspace = true
serde_json.workspace = true
env_logger.workspace = true
log.workspace = true
tokio.workspace = true
reqwest.workspace = true
chrono.workspace = true
tower = "0.5.3"

View File

@@ -1,13 +0,0 @@
FROM debian:13-slim
# RUN apt-get update && apt-get install -y --no-install-recommends \
# ca-certificates \
# && rm -rf /var/lib/apt/lists/*
COPY harmony-node-readiness-endpoint /usr/local/bin/harmony-node-readiness-endpoint
ENV RUST_LOG=info
EXPOSE 25001
CMD ["harmony-node-readiness-endpoint"]

View File

@@ -1,197 +0,0 @@
# harmony-node-readiness-endpoint
**A lightweight, standalone Rust service for Kubernetes node health checking.**
Designed for **bare-metal Kubernetes clusters** with external load balancers (HAProxy, OPNsense, F5, etc.).
Exposes a simple HTTP endpoint (`/health`) on each node:
- **200 OK** — node is healthy and ready to receive traffic
- **503 Service Unavailable** — node should be removed from the load balancer pool
- **500 Internal Server Error** — misconfiguration (e.g. `NODE_NAME` not set)
This project is **not dependent on Harmony**, but is commonly used as part of Harmony bare-metal Kubernetes deployments.
## Why this project exists
In bare-metal environments, external load balancers often rely on pod-level or router-level checks that can lag behind the authoritative Kubernetes `Node.status.conditions[Ready]`.
This service provides the true source-of-truth with fast reaction time.
## Available checks
| Check name | Description | Status |
|--------------------|-------------------------------------------------------------|-------------------|
| `node_ready` | Queries `Node.status.conditions[Ready]` via Kubernetes API | Implemented |
| `okd_router_1936` | Probes OpenShift router `/healthz/ready` on port 1936 | Implemented |
| `filesystem_ro` | Detects read-only mounts via `/proc/mounts` | To be implemented |
| `kubelet` | Local probe to kubelet `/healthz` (port 10248) | To be implemented |
| `container_runtime`| Socket check + runtime status | To be implemented |
| `disk_pressure` | Threshold checks on key filesystems | To be implemented |
| `network` | DNS resolution + gateway connectivity | To be implemented |
| `custom_conditions`| Reacts to extra conditions (NPD, etc.) | To be implemented |
All checks are combined with logical **AND** — any single failure results in 503.
## Behavior
### `node_ready` check — fail-open design
The `node_ready` check queries the Kubernetes API server to read `Node.status.conditions[Ready]`.
Because this service runs on the node it is checking, there are scenarios where the API server is temporarily
unreachable (e.g. during a control-plane restart). To avoid incorrectly draining a healthy node in such cases,
the check is **fail-open**: it passes (reports ready) whenever the Kubernetes API is unavailable.
| Situation | Result | HTTP status |
|------------------------------------------------------|-------------------|-------------|
| `Node.conditions[Ready] == True` | Pass | 200 |
| `Node.conditions[Ready] == False` | Fail | 503 |
| `Ready` condition absent | Fail | 503 |
| API server unreachable or timed out (1 s timeout) | Pass (assumes ready) | 200 |
| Kubernetes client initialization failed | Pass (assumes ready) | 200 |
| `NODE_NAME` env var not set | Hard error | 500 |
A warning is logged whenever the API is unavailable and the check falls back to assuming ready.
### `okd_router_1936` check
Sends `GET http://127.0.0.1:1936/healthz/ready` with a 5-second timeout.
Returns pass on any 2xx response, fail otherwise.
### Unknown check names
Requesting an unknown check name (e.g. `check=bogus`) results in that check returning `passed: false`
with reason `"Unknown check: bogus"`, and the overall response is 503.
## How it works
### Node name discovery
The service reads the `NODE_NAME` environment variable, which must be injected via the Kubernetes Downward API:
```yaml
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
```
### Kubernetes API authentication
- Uses standard **in-cluster configuration** — no external credentials needed.
- The ServiceAccount token and CA certificate are automatically mounted at `/var/run/secrets/kubernetes.io/serviceaccount/`.
- Requires only minimal RBAC: `get` and `list` on the `nodes` resource (see `deploy/resources.yaml`).
- Connect and write timeouts are set to **1 second** to keep checks fast.
## Deploy
All Kubernetes resources (Namespace, ServiceAccount, ClusterRole, ClusterRoleBinding, and an OpenShift SCC RoleBinding for `hostnetwork`) are in a single file.
```bash
kubectl apply -f deploy/resources.yaml
kubectl apply -f deploy/daemonset.yaml
```
The DaemonSet uses `hostNetwork: true` and `hostPort: 25001`, so the endpoint is reachable directly on the node's IP at port 25001.
It tolerates all taints, ensuring it runs even on nodes marked unschedulable.
### Configure your external load balancer
**Example for HAProxy / OPNsense:**
- Check type: **HTTP**
- URI: `/health`
- Port: `25001` (configurable via `LISTEN_PORT` env var)
- Interval: 5–10 s
- Rise: 2
- Fall: 3
- Expect: `2xx`
## Endpoint usage
### Query parameter
Use the `check` query parameter to select which checks to run (comma-separated).
When omitted, only `node_ready` runs.
| Request | Checks run |
|------------------------------------------------|-----------------------------------|
| `GET /health` | `node_ready` |
| `GET /health?check=okd_router_1936` | `okd_router_1936` only |
| `GET /health?check=node_ready,okd_router_1936` | `node_ready` and `okd_router_1936`|
> **Note:** specifying `check=` replaces the default. Include `node_ready` explicitly if you need it alongside other checks.
### Response format
```json
{
"status": "ready" | "not-ready",
"checks": [
{
"name": "<check-name>",
"passed": true | false,
"reason": "<failure reason, omitted on success>",
"duration_ms": 42
}
],
"total_duration_ms": 42
}
```
**Healthy node (default)**
```http
HTTP/1.1 200 OK
{
"status": "ready",
"checks": [{ "name": "node_ready", "passed": true, "duration_ms": 42 }],
"total_duration_ms": 42
}
```
**Unhealthy node**
```http
HTTP/1.1 503 Service Unavailable
{
"status": "not-ready",
"checks": [
{ "name": "node_ready", "passed": false, "reason": "KubeletNotReady", "duration_ms": 35 }
],
"total_duration_ms": 35
}
```
**API server unreachable (fail-open)**
```http
HTTP/1.1 200 OK
{
"status": "ready",
"checks": [{ "name": "node_ready", "passed": true, "duration_ms": 1001 }],
"total_duration_ms": 1001
}
```
*(A warning is logged: `Kubernetes API appears to be down … Assuming node is ready.`)*
## Configuration
| Env var | Default | Description |
|---------------|----------|--------------------------------------|
| `NODE_NAME` | required | Node name, injected via Downward API |
| `LISTEN_PORT` | `25001` | TCP port the HTTP server binds to |
| `RUST_LOG` | — | Log level (e.g. `info`, `debug`) |
## Development
```bash
# Run locally
NODE_NAME=my-test-node cargo run
# Run tests
cargo test
```
---
*Minimal, auditable, and built for production bare-metal Kubernetes environments.*

View File

@@ -1,13 +0,0 @@
#!/bin/bash
# TODO
# This is meant to be run on a machine with harmony development tools installed (cargo, etc)

# Fail fast: without this, a failed `cargo build` or `cp` would silently
# docker-build a stale (or missing) binary.
set -euo pipefail

DOCKER_TAG="${DOCKER_TAG:-dev}"

cargo build --release
cp ../target/release/harmony-node-readiness-endpoint .
docker build . -t "hub.nationtech.io/harmony/harmony-node-readiness-endpoint:${DOCKER_TAG}"

View File

@@ -1,36 +0,0 @@
apiVersion: apps/v1
kind: DaemonSet
metadata:
name: node-healthcheck
namespace: harmony-node-healthcheck
spec:
selector:
matchLabels:
app: node-healthcheck
template:
metadata:
labels:
app: node-healthcheck
spec:
serviceAccountName: node-healthcheck-sa
hostNetwork: true
# This ensures the pod runs even if the node is already "unschedulable"
# so it can report the status correctly.
tolerations:
- operator: Exists
containers:
- name: checker
image: hub.nationtech.io/harmony/harmony-node-readiness-endpoint:latest
env:
- name: NODE_NAME
valueFrom:
fieldRef:
fieldPath: spec.nodeName
ports:
- containerPort: 25001
hostPort: 25001
name: health-port
resources:
requests:
cpu: 10m
memory: 50Mi

View File

@@ -1,64 +0,0 @@
apiVersion: v1
kind: Namespace
metadata:
name: harmony-node-healthcheck
labels:
name: harmony-node-healthcheck
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: node-healthcheck-sa
namespace: harmony-node-healthcheck
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
name: node-healthcheck-role
rules:
- apiGroups: [""]
resources: ["nodes"]
verbs: ["get", "list"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: allow-hostnetwork-scc
namespace: harmony-node-healthcheck
rules:
- apiGroups: ["security.openshift.io"]
resources: ["securitycontextconstraints"]
resourceNames: ["hostnetwork"]
verbs: ["use"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: node-status-querier-scc-binding
namespace: harmony-node-healthcheck
subjects:
- kind: ServiceAccount
name: node-healthcheck-sa
namespace: harmony-node-healthcheck
roleRef:
kind: Role
name: allow-hostnetwork-scc
apiGroup: rbac.authorization.k8s.io
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: read-nodes-binding
subjects:
- kind: ServiceAccount
name: node-healthcheck-sa
namespace: harmony-node-healthcheck
roleRef:
kind: ClusterRole
name: node-healthcheck-role
apiGroup: rbac.authorization.k8s.io

View File

@@ -1,282 +0,0 @@
use actix_web::{App, HttpResponse, HttpServer, Responder, get, web};
use k8s_openapi::api::core::v1::Node;
use kube::{Api, Client, Config};
use log::{debug, error, info, warn};
use reqwest;
use serde::{Deserialize, Serialize};
use std::env;
use std::time::{Duration, Instant};
use tokio::task::JoinSet;
// Connect/write timeout for Kubernetes API calls; keeps the /health endpoint fast.
const K8S_CLIENT_TIMEOUT: Duration = Duration::from_secs(1);

/// JSON body returned by `/health` for both 200 and 503 responses.
#[derive(Serialize, Deserialize)]
struct HealthStatus {
    // "ready" or "not-ready"
    status: String,
    // One entry per requested check.
    checks: Vec<CheckResult>,
    total_duration_ms: u128,
}

/// Outcome of a single named health check.
#[derive(Serialize, Deserialize)]
struct CheckResult {
    name: String,
    passed: bool,
    // Failure reason; omitted from the serialized JSON on success.
    #[serde(skip_serializing_if = "Option::is_none")]
    reason: Option<String>,
    duration_ms: u128,
}

/// JSON body returned with HTTP 500 on misconfiguration (e.g. NODE_NAME unset).
#[derive(Serialize, Deserialize)]
struct HealthError {
    status: String,
    error: String,
}

/// Query parameters accepted by `/health`.
#[derive(Deserialize)]
struct HealthQuery {
    // Comma-separated check names, e.g. `?check=node_ready,okd_router_1936`.
    #[serde(rename = "check")]
    checks: Option<String>,
}
/// Check if the node's Ready condition is true via Kubernetes API.
///
/// Fail-open: if the API server cannot be reached, the node is assumed
/// healthy rather than being drained from the load balancer.
async fn check_node_ready(client: Client, node_name: &str) -> Result<(), String> {
    let api: Api<Node> = Api::all(client);

    let node = match api.get(node_name).await {
        Ok(node) => node,
        Err(err) => {
            warn!(
                "Kubernetes API appears to be down, unreachable, or timed out for node '{}': {}. Assuming node is ready.",
                node_name, err
            );
            return Ok(());
        }
    };

    // Locate the authoritative Ready condition among the node's conditions.
    let conditions = node.status.and_then(|s| s.conditions).unwrap_or_default();
    match conditions.iter().find(|c| c.type_ == "Ready") {
        Some(c) if c.status == "True" => Ok(()),
        Some(c) => Err(c.reason.clone().unwrap_or_else(|| "Unknown".to_string())),
        None => Err("Ready condition not found".to_string()),
    }
}
/// Check OKD router health endpoint on port 1936.
///
/// Any 2xx response counts as healthy; everything else (including
/// connection errors) is a failure.
async fn check_okd_router_1936() -> Result<(), String> {
    debug!("Checking okd router 1936");

    // Short timeout keeps the overall /health response fast even if the router hangs.
    let http = reqwest::Client::builder()
        .timeout(std::time::Duration::from_secs(5))
        .build()
        .map_err(|e| format!("Failed to build HTTP client: {}", e))?;

    let response = http
        .get("http://127.0.0.1:1936/healthz/ready")
        .send()
        .await
        .map_err(|e| format!("Failed to connect to OKD router: {}", e))?;

    let status = response.status();
    debug!("okd router 1936 response status {}", status);

    if status.is_success() {
        return Ok(());
    }
    Err(format!("OKD router returned status: {}", status))
}
/// Parse comma-separated check names from query parameter.
///
/// Returns the list of check names to run. When the parameter is absent,
/// empty, or contains only separators/whitespace (e.g. `check=,,`), this
/// falls back to the default `node_ready` check instead of producing empty
/// check names that would each be reported as an "Unknown check" failure.
fn parse_checks(checks_param: Option<&str>) -> Vec<String> {
    let checks: Vec<String> = checks_param
        .unwrap_or_default()
        .split(',')
        .map(str::trim)
        // Drop empty segments produced by stray/trailing commas.
        .filter(|c| !c.is_empty())
        .map(str::to_string)
        .collect();

    if checks.is_empty() {
        vec!["node_ready".to_string()]
    } else {
        checks
    }
}
/// Run a single health check by name and return the result.
///
/// Unknown check names yield a failed `CheckResult` with an explanatory
/// reason; `duration_ms` is always the wall-clock time of the check.
async fn run_check(check_name: &str, client: Option<Client>, node_name: &str) -> CheckResult {
    let started = Instant::now();

    // Dispatch to the concrete check implementation.
    let outcome = match check_name {
        "node_ready" => {
            if let Some(c) = client {
                check_node_ready(c, node_name).await
            } else {
                // Fail-open: without a Kubernetes client we cannot verify readiness.
                warn!(
                    "Kubernetes client not available for node '{}'. Assuming node is ready.",
                    node_name
                );
                Ok(())
            }
        }
        "okd_router_1936" => check_okd_router_1936().await,
        other => Err(format!("Unknown check: {}", other)),
    };

    let duration_ms = started.elapsed().as_millis();
    let (passed, reason) = match outcome {
        Ok(()) => (true, None),
        Err(why) => (false, Some(why)),
    };

    CheckResult {
        name: check_name.to_string(),
        passed,
        reason,
        duration_ms,
    }
}
/// HTTP handler for `/health`: runs the requested checks in parallel and maps
/// the aggregate outcome to 200 (all passed), 503 (any failed), or
/// 500 (NODE_NAME not configured).
#[get("/health")]
async fn health(query: web::Query<HealthQuery>) -> impl Responder {
    // NODE_NAME must be injected via the Downward API; without it we cannot
    // know which Node object to query, so this is a hard 500.
    let node_name = match env::var("NODE_NAME") {
        Ok(name) => name,
        Err(_) => {
            error!("NODE_NAME environment variable not set");
            return HttpResponse::InternalServerError().json(HealthError {
                status: "error".to_string(),
                error: "NODE_NAME environment variable not set".to_string(),
            });
        }
    };
    // Parse requested checks from query parameter
    let requested_checks = parse_checks(query.checks.as_deref());
    // Check if node_ready check requires Kubernetes client
    let needs_k8s_client = requested_checks.contains(&"node_ready".to_string());
    // Initialize Kubernetes client only if needed.
    // Fail-open: any failure to infer config or build the client is logged and
    // collapses to `None`, which run_check treats as "assume ready".
    let k8s_client = if needs_k8s_client {
        match Config::infer().await {
            Ok(mut config) => {
                // Tight timeouts keep the LB health probe responsive even when
                // the API server is slow or unreachable.
                config.write_timeout = Some(K8S_CLIENT_TIMEOUT);
                config.connect_timeout = Some(K8S_CLIENT_TIMEOUT);
                Some(Client::try_from(config).map_err(|e| e.to_string()))
            }
            Err(e) => {
                warn!(
                    "Failed to infer Kubernetes config for node '{}': {}. Assuming node_ready is healthy.",
                    node_name, e
                );
                None
            }
        }
        .and_then(|result| match result {
            Ok(client) => Some(client),
            Err(e) => {
                warn!(
                    "Failed to create Kubernetes client for node '{}': {}. Assuming node_ready is healthy.",
                    node_name, e
                );
                None
            }
        })
    } else {
        None
    };
    // Run all requested checks in parallel
    let start = Instant::now();
    let mut join_set = JoinSet::new();
    debug!("Running checks {requested_checks:?}");
    for check_name in requested_checks {
        let client = k8s_client.clone();
        let node_name = node_name.clone();
        join_set.spawn(async move { run_check(&check_name, client, &node_name).await });
    }
    let mut check_results = Vec::new();
    while let Some(result) = join_set.join_next().await {
        match result {
            Ok(check) => check_results.push(check),
            // A panicked/cancelled task is logged and skipped; its check simply
            // does not appear in the response body.
            Err(e) => error!("Check task failed: {}", e),
        }
    }
    let total_duration_ms = start.elapsed().as_millis();
    // Determine overall status: all checks are combined with logical AND.
    let all_passed = check_results.iter().all(|c| c.passed);
    if all_passed {
        info!(
            "All health checks passed for node '{}' in {}ms",
            node_name, total_duration_ms
        );
        HttpResponse::Ok().json(HealthStatus {
            status: "ready".to_string(),
            checks: check_results,
            total_duration_ms,
        })
    } else {
        let failed_checks: Vec<&str> = check_results
            .iter()
            .filter(|c| !c.passed)
            .map(|c| c.name.as_str())
            .collect();
        warn!(
            "Health checks failed for node '{}' in {}ms: {:?}",
            node_name, total_duration_ms, failed_checks
        );
        HttpResponse::ServiceUnavailable().json(HealthStatus {
            status: "not-ready".to_string(),
            checks: check_results,
            total_duration_ms,
        })
    }
}
#[actix_web::main]
async fn main() -> std::io::Result<()> {
    env_logger::init();

    // Listen port comes from LISTEN_PORT, defaulting to 25001.
    let raw_port = env::var("LISTEN_PORT").unwrap_or_else(|_| "25001".to_string());
    let port: u16 = match raw_port.parse() {
        Ok(p) => p,
        Err(_) => panic!("Invalid port number: {}", raw_port),
    };

    let bind_addr = format!("0.0.0.0:{}", port);
    info!("Starting harmony-node-readiness-endpoint on {}", bind_addr);

    HttpServer::new(|| App::new().service(health))
        .workers(3)
        .bind(&bind_addr)?
        .run()
        .await
}
#[cfg(test)]
mod tests {
    // Removed unused `use kube::error::ErrorResponse;` — nothing in this
    // module referenced it and it produced an unused-import warning.
    use super::*;

    /// Absent or empty `check` parameter selects the default check.
    #[test]
    fn parse_checks_defaults_to_node_ready() {
        assert_eq!(parse_checks(None), vec!["node_ready"]);
        assert_eq!(parse_checks(Some("")), vec!["node_ready"]);
    }

    /// Comma-separated values are split and whitespace-trimmed.
    #[test]
    fn parse_checks_splits_and_trims_values() {
        assert_eq!(
            parse_checks(Some("node_ready, okd_router_1936 ")),
            vec!["node_ready", "okd_router_1936"]
        );
    }
}

View File

@@ -21,7 +21,6 @@ http.workspace = true
inquire.workspace = true
interactive-parse = "0.1.5"
schemars = "0.8"
vaultrs = "0.7.4"
[dev-dependencies]
pretty_assertions.workspace = true

View File

@@ -1,5 +1,4 @@
use lazy_static::lazy_static;
use std::env;
lazy_static! {
pub static ref SECRET_NAMESPACE: String =
@@ -17,16 +16,3 @@ lazy_static! {
pub static ref INFISICAL_CLIENT_SECRET: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_SECRET").ok();
}
lazy_static! {
    // Openbao/Vault configuration
    // Server URL; VAULT_ADDR is accepted as a fallback for compatibility with Vault tooling.
    pub static ref OPENBAO_URL: Option<String> =
        env::var("OPENBAO_URL").or(env::var("VAULT_ADDR")).ok();
    // Direct token auth; takes precedence over username/password when set.
    pub static ref OPENBAO_TOKEN: Option<String> = env::var("OPENBAO_TOKEN").ok();
    pub static ref OPENBAO_USERNAME: Option<String> = env::var("OPENBAO_USERNAME").ok();
    pub static ref OPENBAO_PASSWORD: Option<String> = env::var("OPENBAO_PASSWORD").ok();
    // Only the exact string "true" disables TLS verification; anything else keeps it on.
    pub static ref OPENBAO_SKIP_TLS: bool =
        env::var("OPENBAO_SKIP_TLS").map(|v| v == "true").unwrap_or(false);
    // KV engine mount point; defaults to "secret".
    pub static ref OPENBAO_KV_MOUNT: String =
        env::var("OPENBAO_KV_MOUNT").unwrap_or_else(|_| "secret".to_string());
}

View File

@@ -8,12 +8,6 @@ use config::INFISICAL_CLIENT_SECRET;
use config::INFISICAL_ENVIRONMENT;
use config::INFISICAL_PROJECT_ID;
use config::INFISICAL_URL;
use config::OPENBAO_KV_MOUNT;
use config::OPENBAO_PASSWORD;
use config::OPENBAO_SKIP_TLS;
use config::OPENBAO_TOKEN;
use config::OPENBAO_URL;
use config::OPENBAO_USERNAME;
use config::SECRET_STORE;
use interactive_parse::InteractiveParseObj;
use log::debug;
@@ -23,7 +17,6 @@ use serde::{Serialize, de::DeserializeOwned};
use std::fmt;
use store::InfisicalSecretStore;
use store::LocalFileSecretStore;
use store::OpenbaoSecretStore;
use thiserror::Error;
use tokio::sync::OnceCell;
@@ -76,24 +69,11 @@ async fn get_secret_manager() -> &'static SecretManager {
/// The async initialization function for the SecretManager.
async fn init_secret_manager() -> SecretManager {
let default_secret_store = "infisical".to_string();
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_store);
let default_secret_score = "infisical".to_string();
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_score);
let store: Box<dyn SecretStore> = match store_type.as_str() {
"file" => Box::new(LocalFileSecretStore::default()),
"openbao" | "vault" => {
let store = OpenbaoSecretStore::new(
OPENBAO_URL.clone().expect("Openbao/Vault URL must be set, see harmony_secret config for ways to provide it. You can try with OPENBAO_URL or VAULT_ADDR"),
OPENBAO_KV_MOUNT.clone(),
*OPENBAO_SKIP_TLS,
OPENBAO_TOKEN.clone(),
OPENBAO_USERNAME.clone(),
OPENBAO_PASSWORD.clone(),
)
.await
.expect("Failed to initialize Openbao/Vault secret store");
Box::new(store)
}
"infisical" | _ => {
let store = InfisicalSecretStore::new(
INFISICAL_URL.clone().expect("Infisical url must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_URL"),

View File

@@ -1,9 +1,4 @@
mod infisical;
mod local_file;
mod openbao;
pub use infisical::InfisicalSecretStore;
pub use infisical::*;
pub use local_file::LocalFileSecretStore;
pub use local_file::*;
pub use openbao::OpenbaoSecretStore;

View File

@@ -1,317 +0,0 @@
use crate::{SecretStore, SecretStoreError};
use async_trait::async_trait;
use log::{debug, info, warn};
use serde::{Deserialize, Serialize};
use std::fmt::Debug;
use std::fs;
use std::path::PathBuf;
use vaultrs::auth;
use vaultrs::client::{Client, VaultClient, VaultClientSettingsBuilder};
use vaultrs::kv2;
/// Token response from Vault/Openbao auth endpoints
// NOTE(review): this type is not referenced anywhere in this file — it looks
// like dead code left over from an earlier hand-rolled auth implementation.
// Candidate for removal once confirmed unused crate-wide.
#[derive(Debug, Deserialize)]
struct TokenResponse {
    auth: AuthInfo,
}
/// Minimal auth payload persisted to the on-disk token cache.
#[derive(Debug, Serialize, Deserialize)]
struct AuthInfo {
    // Bearer token used for subsequent API calls.
    client_token: String,
    // Lease duration (presumably seconds — TODO confirm against vaultrs docs);
    // may be absent in cached payloads, hence the serde default.
    #[serde(default)]
    lease_duration: Option<u64>,
    token_type: String,
}
impl From<vaultrs::api::AuthInfo> for AuthInfo {
    /// Convert the vaultrs auth payload into our serializable cache format.
    fn from(info: vaultrs::api::AuthInfo) -> Self {
        Self {
            client_token: info.client_token,
            token_type: info.token_type,
            lease_duration: Some(info.lease_duration),
        }
    }
}
/// Secret store backed by an Openbao/Vault KV engine.
pub struct OpenbaoSecretStore {
    // Authenticated vaultrs client.
    client: VaultClient,
    // Mount point of the KV engine (e.g. "secret").
    kv_mount: String,
}
impl Debug for OpenbaoSecretStore {
    // Manual impl — presumably because VaultClient does not derive Debug; we
    // expose only its settings, which also avoids dumping the raw token struct.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("OpenbaoSecretStore")
            .field("client", &self.client.settings)
            .field("kv_mount", &self.kv_mount)
            .finish()
    }
}
impl OpenbaoSecretStore {
/// Creates a new Openbao/Vault secret store with authentication
pub async fn new(
base_url: String,
kv_mount: String,
skip_tls: bool,
token: Option<String>,
username: Option<String>,
password: Option<String>,
) -> Result<Self, SecretStoreError> {
info!("OPENBAO_STORE: Initializing client for URL: {base_url}");
// 1. If token is provided via env var, use it directly
if let Some(t) = token {
debug!("OPENBAO_STORE: Using token from environment variable");
return Self::with_token(&base_url, skip_tls, &t, &kv_mount);
}
// 2. Try to load cached token
let cache_path = Self::get_token_cache_path(&base_url);
if let Ok(cached_token) = Self::load_cached_token(&cache_path) {
debug!("OPENBAO_STORE: Found cached token, validating...");
if Self::validate_token(&base_url, skip_tls, &cached_token.client_token).await {
info!("OPENBAO_STORE: Cached token is valid");
return Self::with_token(
&base_url,
skip_tls,
&cached_token.client_token,
&kv_mount,
);
}
warn!("OPENBAO_STORE: Cached token is invalid or expired");
}
// 3. Authenticate with username/password
let (user, pass) = match (username, password) {
(Some(u), Some(p)) => (u, p),
_ => {
return Err(SecretStoreError::Store(
"No valid token found and username/password not provided. \
Set OPENBAO_TOKEN or OPENBAO_USERNAME/OPENBAO_PASSWORD environment variables."
.into(),
));
}
};
let token =
Self::authenticate_userpass(&base_url, &kv_mount, skip_tls, &user, &pass).await?;
// Cache the token
if let Err(e) = Self::cache_token(&cache_path, &token) {
warn!("OPENBAO_STORE: Failed to cache token: {e}");
}
Self::with_token(&base_url, skip_tls, &token.client_token, &kv_mount)
}
/// Create a client with an existing token
fn with_token(
base_url: &str,
skip_tls: bool,
token: &str,
kv_mount: &str,
) -> Result<Self, SecretStoreError> {
let mut settings = VaultClientSettingsBuilder::default();
settings.address(base_url).token(token);
if skip_tls {
warn!("OPENBAO_STORE: Skipping TLS verification - not recommended for production!");
settings.verify(false);
}
let client = VaultClient::new(
settings
.build()
.map_err(|e| SecretStoreError::Store(Box::new(e)))?,
)
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
Ok(Self {
client,
kv_mount: kv_mount.to_string(),
})
}
/// Get the cache file path for a given base URL
fn get_token_cache_path(base_url: &str) -> PathBuf {
let hash = Self::hash_url(base_url);
directories::BaseDirs::new()
.map(|dirs| {
dirs.data_dir()
.join("harmony")
.join("secrets")
.join(format!("openbao_token_{hash}"))
})
.unwrap_or_else(|| PathBuf::from(format!("/tmp/openbao_token_{hash}")))
}
/// Create a simple hash of the URL for unique cache files
fn hash_url(url: &str) -> String {
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
let mut hasher = DefaultHasher::new();
url.hash(&mut hasher);
format!("{:016x}", hasher.finish())
}
/// Load cached token from file
fn load_cached_token(path: &PathBuf) -> Result<AuthInfo, String> {
serde_json::from_str(
&fs::read_to_string(path)
.map_err(|e| format!("Could not load token from file {path:?} : {e}"))?,
)
.map_err(|e| format!("Could not deserialize token from file {path:?} : {e}"))
}
/// Cache token to file
fn cache_token(path: &PathBuf, token: &AuthInfo) -> Result<(), std::io::Error> {
if let Some(parent) = path.parent() {
fs::create_dir_all(parent)?;
}
// Set file permissions to 0600 (owner read/write only)
#[cfg(unix)]
{
use std::os::unix::fs::OpenOptionsExt;
let mut file = fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.mode(0o600)
.open(path)?;
use std::io::Write;
file.write_all(serde_json::to_string(token)?.as_bytes())?;
}
#[cfg(not(unix))]
{
fs::write(path, token)?;
}
Ok(())
}
/// Validate if a token is still valid using vaultrs
async fn validate_token(base_url: &str, skip_tls: bool, token: &str) -> bool {
let mut settings = VaultClientSettingsBuilder::default();
settings.address(base_url).token(token);
if skip_tls {
settings.verify(false);
}
if let Some(settings) = settings.build().ok() {
let client = match VaultClient::new(settings) {
Ok(s) => s,
Err(_) => return false,
};
return vaultrs::token::lookup(&client, token).await.is_ok();
}
false
}
/// Authenticate using username/password (userpass auth method)
async fn authenticate_userpass(
base_url: &str,
kv_mount: &str,
skip_tls: bool,
username: &str,
password: &str,
) -> Result<AuthInfo, SecretStoreError> {
info!("OPENBAO_STORE: Authenticating with username/password");
// Create a client without a token for authentication
let mut settings = VaultClientSettingsBuilder::default();
settings.address(base_url);
if skip_tls {
settings.verify(false);
}
let client = VaultClient::new(
settings
.build()
.map_err(|e| SecretStoreError::Store(Box::new(e)))?,
)
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
// Authenticate using userpass method
let token = auth::userpass::login(&client, kv_mount, username, password)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
Ok(token.into())
}
}
#[async_trait]
impl SecretStore for OpenbaoSecretStore {
    /// Read the secret at `<kv_mount>/<namespace>/<key>` and return the raw
    /// bytes of its "value" field.
    async fn get_raw(&self, namespace: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
        let path = format!("{}/{}", namespace, key);
        info!("OPENBAO_STORE: Getting key '{key}' from namespace '{namespace}'");
        debug!("OPENBAO_STORE: Request path: {path}");
        let data: serde_json::Value = kv2::read(&self.client, &self.kv_mount, &path)
            .await
            .map_err(|e| {
                // Check for not found error
                // NOTE(review): relies on string matching of the error message;
                // fragile if vaultrs changes wording — confirm no typed 404 exists.
                if e.to_string().contains("does not exist") || e.to_string().contains("404") {
                    SecretStoreError::NotFound {
                        namespace: namespace.to_string(),
                        key: key.to_string(),
                    }
                } else {
                    SecretStoreError::Store(Box::new(e))
                }
            })?;
        // Extract the actual secret value stored under the "value" key
        let value = data.get("value").and_then(|v| v.as_str()).ok_or_else(|| {
            SecretStoreError::Store("Secret does not contain expected 'value' field".into())
        })?;
        Ok(value.as_bytes().to_vec())
    }

    /// Store raw bytes (must be valid UTF-8) under the "value" field at
    /// `<kv_mount>/<namespace>/<key>`.
    async fn set_raw(
        &self,
        namespace: &str,
        key: &str,
        val: &[u8],
    ) -> Result<(), SecretStoreError> {
        let path = format!("{}/{}", namespace, key);
        info!("OPENBAO_STORE: Setting key '{key}' in namespace '{namespace}'");
        debug!("OPENBAO_STORE: Request path: {path}");
        let value_str =
            String::from_utf8(val.to_vec()).map_err(|e| SecretStoreError::Store(Box::new(e)))?;
        // Create the data structure expected by our format
        let data = serde_json::json!({
            "value": value_str
        });
        kv2::set(&self.client, &self.kv_mount, &path, &data)
            .await
            .map_err(|e| SecretStoreError::Store(Box::new(e)))?;
        info!("OPENBAO_STORE: Successfully stored secret '{key}'");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Hashing the same URL twice must yield the same 16-hex-digit value.
    #[test]
    fn test_hash_url_consistency() {
        let url = "https://vault.example.com:8200";
        let first = OpenbaoSecretStore::hash_url(url);
        let second = OpenbaoSecretStore::hash_url(url);
        assert_eq!(first, second);
        assert_eq!(first.len(), 16);
    }

    /// Distinct URLs should map to distinct cache-file hashes.
    #[test]
    fn test_hash_url_uniqueness() {
        let a = OpenbaoSecretStore::hash_url("https://vault1.example.com");
        let b = OpenbaoSecretStore::hash_url("https://vault2.example.com");
        assert_ne!(a, b);
    }
}

View File

@@ -1408,7 +1408,6 @@ pub struct Account {
pub hostnames: String,
pub wildcard: i32,
pub zone: MaybeString,
pub dynipv6host: Option<MaybeString>,
pub checkip: String,
#[yaserde(rename = "checkip_timeout")]
pub checkip_timeout: i32,