Compare commits

...

13 Commits

Author SHA1 Message Date
cd555eaaf4 chore: iobench deployment node example 2026-03-24 14:29:42 -04:00
25970ef7aa ADR that have been moved around, needs to be cleaned up before merge 2026-03-24 14:28:07 -04:00
7cb5237fdd fix(secret): openbao implementation wrong fs write call on windows
All checks were successful
Run Check Script / check (push) Successful in 1m48s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 7m40s
2026-03-23 09:20:43 -04:00
9a67bcc96f Merge pull request 'fix/cnpgInstallation' (#251) from fix/cnpgInstallation into master
Some checks failed
Run Check Script / check (push) Successful in 1m45s
Compile and package harmony_composer / package_harmony_composer (push) Failing after 2m15s
Reviewed-on: #251
2026-03-20 21:02:53 +00:00
a377fc1404 Merge branch 'master' into fix/cnpgInstallation
All checks were successful
Run Check Script / check (pull_request) Successful in 1m44s
2026-03-20 20:56:30 +00:00
c9977fee12 fix: CI file moved
All checks were successful
Run Check Script / check (pull_request) Successful in 2m5s
2026-03-20 16:48:38 -04:00
64bf585e07 fix: remove check.sh with broken path handling and fix formatting
Some checks failed
Run Check Script / check (pull_request) Failing after 12s
2026-03-20 16:41:30 -04:00
44e2c45435 fix: flaky tests due to bad environment variables handling in harmony_config crate 2026-03-20 16:40:08 -04:00
cdccbc8939 fix: formatting and minor stuff 2026-03-20 16:34:48 -04:00
9830971d05 feat: Creat namespace in k8s client and wait for namespace ready utility functions 2026-03-20 16:15:51 -04:00
e1183ef6de feat: K8s postgresql score now ensures cnpg is installed 2026-03-20 07:02:26 -04:00
8499f4d1b7 Merge pull request 'fix: small details were preventing to re-save frontends,backends and healthchecks in opnsense UI' (#248) from fix/load-balancer-xml into master
Some checks failed
Run Check Script / check (push) Has been cancelled
Compile and package harmony_composer / package_harmony_composer (push) Has been cancelled
Reviewed-on: #248
2026-03-17 14:38:35 +00:00
67c3265286 fix: small details were preventing to re-save frontends,backends and healthchecks in opnsense UI
All checks were successful
Run Check Script / check (pull_request) Successful in 2m12s
2026-03-13 10:31:17 -04:00
19 changed files with 850 additions and 88 deletions

View File

@@ -15,4 +15,4 @@ jobs:
uses: actions/checkout@v4
- name: Run check script
run: bash check.sh
run: bash build/check.sh

View File

@@ -0,0 +1,124 @@
# ADR014 · Port Allocation Policy
**Date:**20251103
**Status:**Proposed
**Authors:**Infrastructure Team / NationTech Harmony Core
---
## 1·Context
Harmony orchestrates heterogeneous clusters — from developer laptops to multitenant production environments — through a single, typed Rust codebase.
To enable deterministic, automatable service exposure across clusters, we need a **stable, conflictfree range of TCP/UDP ports** reserved for Harmonymanaged workloads and hostnetwork services.
Today, various manual operations (e.g. CephRGW on hostnetwork) occasionally use adhoc local ports such as`80`or`8080`. This leads to unpredictable overlaps with NodePorts, ephemeral ports, or other daemons.
Since Harmony aims to unify *manual* and *automated* infrastructure under a single declarative model, we formalize a **reserved port block** that should be used consistently — even for outofband operations — so everything remains alignmentready for future orchestration.
---
## 2·Problem Statement
We must select a contiguous range of approximately1000ports that:
1. Avoids conflict with:
- System/wellknown ports(`01023`)
- Common registered application ports(`102449151`)
- KubernetesNodePortdefaultrange(`3000032767`)
- Ephemeral port ranges (typically`3276860999`onLinux)
2. Feels deterministic and maintainable for longterm automation.
3. Can be mirrored across multiple clusters and baremetal hosts.
---
## 3·Decision
We designate:
> **Harmony Reserved Port Range:**`2500025999`
### Rationale
| Criterion | Reasoning |
|------------|------------|
| **Safety** | Lies below the default ephemeral port start(`32768`)and above NodePort(`3000032767`). |
| **Conflicts** | Very few historical IANA registrations in this span, practically unused in modern systems. |
| **Predictability** | Easy to recognize and communicate (`25kblock`for Harmony). |
| **Ergonomics** | Mirrors production service conventions(`25080` for HTTPlike, `25443` for HTTPSlike, etc.). |
| **Crosscluster consistency** | Portable across OKD,K3D,baremetal allows static firewall rules. |
This range is *registered* (per IANA definition102449151) but not *occupied*. Choosing such a midregistered band is intentional: it guarantees good interplatform compatibility without interfering with OSmanaged ephemeral allocations.
---
## 4·Integration inHarmony
Harmonys topology and orchestration layers should expose a builtin **`PortAllocator`** resource:
```rust
let orchestrator_ports = harmony::network::PortAllocator::new(25_000, 25_999);
let rados_gateway_port = orchestrator_ports.allocate("ceph-rgw")?;
```
In TOML configuration:
```toml
[harmony.network.defaults]
orchestrator_port_range = "25000-25999"
```
All `Score`/`Interpret` modules that deploy hostnetwork or externally visible services should use this allocator to pick ports deterministically.
---
## 5·Operational Guidance
- **Manual Operations:**Even manually configured hostnetwork services on clusters we manage (e.g. CephRGW,ntfy,reverseproxytests) should adopt ports within`2500025999`.
- **Automation Alignment:**Any such ports are considered *Harmonymanageable* and may be introspected later by the orchestrator.
- **Firewall Rules:**Cluster and site firewalls may prewhitelist this block as “HarmonyInfraPorts”.
---
## 6·Initial Allocation Table
| Service / Component | Port | Notes |
|----------------------|------|-------|
| **CephRADOSGateway** |**25080**| Replacement for`:80`; publicHTTPfrontendviaRGW(hostnetwork). |
| **InternalHTTPSServices** |25443 | Equivalent of`:443`for secure endpoints. |
| **Prometheus/MetricsEndpoints** |2509025099 | Internal observability. |
| **HarmonyControlPlane(gRPC)** |2550025519 | Interprocess messaging, API,TUIbackend. |
| **ReservedforFutureScores** |2555025599 | Dedicated expansion slots. |
These values shall be managed centrally in a `harmony.network.ports` registry file (YAML/TOML) within the repository.
---
## 7·Consequences
### Positive
- Eliminates accidental collisions with NodePorts and ephemeral connections.
- Provides a memorable, humanreadable convention suitable for documentation and automation.
- Enables Harmony to treat hostnetwork configuration as firstclass code.
### Negative / Tradeoffs
- Occupies part of the IANA “registered” band (must remain consistent across environments).
- Requires operators to adapt legacy services away from lower ports(`80`,`443`)for compliance with this policy.
---
## 8·Future Work
- Extend HarmonyCLI to display currently allocated ports and detect conflicts.
- Expose a compiletime capability check ensuring clusters in a topology do not reuse the same port twice.
- Consider parallel UDP range reservation when we start exposing datagram services.
---
## 9·References
- [IANAServiceName and TransportProtocol PortNumber Registry](https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml)
- [KubernetesNodePortdefaultrange documentation](https://kubernetes.io/docs/concepts/services-networking/service/)
- HarmonyADR001·WhyRust
- HarmonyADR003·InfrastructureAbstractions
---
*Made with🦀by theNationTechHarmonyteam — ensuring your infrastructure behaves like code.*

View File

@@ -0,0 +1,107 @@
### Context & Use Case
We are implementing a High Availability (HA) Failover Strategy for decentralized "Micro Data Centers." The core challenge is managing stateful workloads (PostgreSQL) over unreliable networks.
We solve this using a **Local Fencing First** approach, backed by **NATS JetStream Strict Ordering** for the final promotion authority.
In CAP theorem terms, we are developing a CP system, intentionally sacrificing availability. In practical terms, we expect an average of two primary outages per year, with a failover delay of around 2 minutes. This translates to an uptime of over five nines. To be precise, 2 outages * 2 minutes = 4 minutes per year = 99.99924% uptime.
### The Algorithm: Local Fencing & Remote Promotion
The safety (data consistency) of the system relies on the time gap between the **Primary giving up (Fencing)** and the **Replica taking over (Promotion)**.
To avoid clock skew issues between agents and datastore (nats), all timestamps comparisons will be done using jetstream metadata. I.E. a harmony agent will never use `Instant::now()` to get a timestamp, it will use `my_last_heartbeat.metadata.timestamp` (conceptually).
#### 1. Configuration
* `heartbeat_timeout` (e.g., 1s): Max time allowed for a NATS write/DB check.
* `failure_threshold` (e.g., 2): Consecutive failures before self-fencing.
* `failover_timeout` (e.g., 5s): Time since last NATS update of Primary heartbeat before Replica promotes.
* This timeout must be carefully configured to allow enough time for the primary to fence itself (after `heartbeat_timeout * failure_threshold`) BEFORE the replica gets promoted to avoid a split brain with two primaries.
* Implementing this will rely on the actual deployment configuration. For example, a CNPG based PostgreSQL cluster might require a longer gap (such as 30s) than other technologies.
* Expires when `replica_heartbeat.metadata.timestamp - primary_heartbeat.metadata.timestamp > failover_timeout`
#### 2. The Primary (Self-Preservation)
The Primary is aggressive about killing itself.
* It attempts a heartbeat.
* If the network latency > `heartbeat_timeout`, the attempt is **cancelled locally** because the heartbeat did not make it back in time.
* This counts as a failure and increments the `consecutive_failures` counter.
* If `consecutive_failures` hit the threshold, **FENCING occurs immediately**. The database is stopped.
This means that the Primary will fence itself after `heartbeat_timeout * failure_threshold`.
#### 3. The Replica (The Watchdog)
The Replica is patient.
* It watches the NATS stream to measure if `replica_heartbeat.metadata.timestamp - primary_heartbeat.metadata.timestamp > failover_timeout`
* It only attempts promotion if the `failover_timeout` (5s) has passed.
* **Crucial:** Careful configuration of the failover_timeout is required. This is the only way to avoid a split brain in case of a network partition where the Primary cannot write its heartbeats in time anymore.
* In short, `failover_timeout` should be tuned to be `heartbeat_timeout * failure_threshold + safety_margin`. This `safety_margin` will vary by use case. For example, a CNPG cluster may need 30 seconds to demote a Primary to Replica when fencing is triggered, so `safety_margin` should be at least 30s in that setup.
Since we forcibly fail timeouts after `heartbeat_timeout`, we are guaranteed that the primary will have **started** the fencing process after `heartbeat_timeout * failure_threshold`.
But, in a network split scenario where the failed primary is still accessible by clients but cannot write its heartbeat successfully, there is no way to know if the demotion has actually **completed**.
For example, in a CNPG cluster, the failed Primary agent will attempt to change the CNPG cluster state to read-only. But if anything fails after that attempt (permission error, k8s api failure, CNPG bug, etc) it is possible that the PostgreSQL instance keeps accepting writes.
While this is not a theoretical failure of the agent's algorithm, this is a practical failure where data corruption occurs.
This can be fixed by detecting the demotion failure and escalating the fencing procedure aggressiveness. Harmony being an infrastructure orchestrator, it can easily exert radical measures if given the proper credentials, such as forcibly powering off a server, disconnecting its network in the switch configuration, forcibly kill a pod/container/process, etc.
However, these details are out of scope of this algorithm, as they simply fall under the "fencing procedure".
The implementation of the fencing procedure itself is not relevant. This algorithm's responsibility stops at calling the fencing procedure in the appropriate situation.
#### 4. The Demotion Handshake (Return to Normalcy)
When the original Primary recovers:
1. It becomes healthy locally but sees `current_primary = Replica`. It waits.
2. The Replica (current leader) detects the Original Primary is back (via NATS heartbeats).
3. Replica performs a **Clean Demotion**:
* Stops DB.
* Writes `current_primary = None` to NATS.
4. Original Primary sees `current_primary = None` and can launch the promotion procedure.
Depending on the implementation, the promotion procedure may require a transition phase. Typically, for a PostgreSQL use case the promoting primary will make sure it has caught up on WAL replication before starting to accept writes.
---
### Failure Modes & Behavior Analysis
#### Case 1: Immediate Outage (Power Cut)
* **Primary:** Dies instantly. Fencing is implicit (machine is off).
* **Replica:** Waits for `failover_timeout` (5s). Sees staleness. Promotes self.
* **Outcome:** Clean failover after 5s.
// TODO detail what happens when the primary comes back up. We will likely have to tie PostgreSQL's lifecycle (liveness/readiness probes) with the agent to ensure it does not come back up as primary.
#### Case 2: High Network Latency on the Primary (The "Split Brain" Trap)
* **Scenario:** Network latency spikes to 5s on the Primary, still below `heartbeat_timeout` on the Replica.
* **T=0 to T=2 (Primary):** Tries to write. Latency (5s) > Timeout (1s). Fails twice.
* **T=2 (Primary):** `consecutive_failures` = 2. **Primary Fences Self.** (Service is DOWN).
* **T=2 to T=5 (Cluster):** **Read-Only Phase.** No Primary exists.
* **T=5 (Replica):** `failover_timeout` reached. Replica promotes self.
* **Outcome:** Safe failover. The "Read-Only Gap" (T=2 to T=5) ensures no Split Brain occurred.
#### Case 3: Replica Network Lag (False Positive)
* **Scenario:** Replica has high latency, greater than `failover_timeout`; Primary is fine.
* **Replica:** Thinks Primary is dead. Tries to promote by setting `cluster_state.current_primary = replica_id`.
* **NATS:** Rejects the write because the Primary is still updating the sequence numbers successfully.
* **Outcome:** Promotion denied. Primary stays leader.
#### Case 4: Network Instability (Flapping)
* **Scenario:** Intermittent packet loss.
* **Primary:** Fails 1 heartbeat, succeeds the next. `consecutive_failures` resets.
* **Replica:** Sees a slight delay in updates, but never reaches `failover_timeout`.
* **Outcome:** No Fencing, No Promotion. System rides out the noise.
## Contextual notes
* Clock skew : Tokio relies on monotonic clocks. This means that `tokio::time::sleep(...)` will not be affected by system clock corrections (such as NTP). But monotonic clocks are known to jump forward in some cases such as VM live migrations. This could mean a false timeout of a single heartbeat. If `failure_threshold = 1`, this can mean a false negative on the nodes' health, and a potentially useless demotion.

View File

@@ -0,0 +1,107 @@
### Context & Use Case
We are implementing a High Availability (HA) Failover Strategy for decentralized "Micro Data Centers." The core challenge is managing stateful workloads (PostgreSQL) over unreliable networks.
We solve this using a **Local Fencing First** approach, backed by **NATS JetStream Strict Ordering** for the final promotion authority.
In CAP theorem terms, we are developing a CP system, intentionally sacrificing availability. In practical terms, we expect an average of two primary outages per year, with a failover delay of around 2 minutes. This translates to an uptime of over five nines. To be precise, 2 outages * 2 minutes = 4 minutes per year = 99.99924% uptime.
### The Algorithm: Local Fencing & Remote Promotion
The safety (data consistency) of the system relies on the time gap between the **Primary giving up (Fencing)** and the **Replica taking over (Promotion)**.
To avoid clock skew issues between agents and datastore (nats), all timestamps comparisons will be done using jetstream metadata. I.E. a harmony agent will never use `Instant::now()` to get a timestamp, it will use `my_last_heartbeat.metadata.timestamp` (conceptually).
#### 1. Configuration
* `heartbeat_timeout` (e.g., 1s): Max time allowed for a NATS write/DB check.
* `failure_threshold` (e.g., 2): Consecutive failures before self-fencing.
* `failover_timeout` (e.g., 5s): Time since last NATS update of Primary heartbeat before Replica promotes.
* This timeout must be carefully configured to allow enough time for the primary to fence itself (after `heartbeat_timeout * failure_threshold`) BEFORE the replica gets promoted to avoid a split brain with two primaries.
* Implementing this will rely on the actual deployment configuration. For example, a CNPG based PostgreSQL cluster might require a longer gap (such as 30s) than other technologies.
* Expires when `replica_heartbeat.metadata.timestamp - primary_heartbeat.metadata.timestamp > failover_timeout`
#### 2. The Primary (Self-Preservation)
The Primary is aggressive about killing itself.
* It attempts a heartbeat.
* If the network latency > `heartbeat_timeout`, the attempt is **cancelled locally** because the heartbeat did not make it back in time.
* This counts as a failure and increments the `consecutive_failures` counter.
* If `consecutive_failures` hit the threshold, **FENCING occurs immediately**. The database is stopped.
This means that the Primary will fence itself after `heartbeat_timeout * failure_threshold`.
#### 3. The Replica (The Watchdog)
The Replica is patient.
* It watches the NATS stream to measure if `replica_heartbeat.metadata.timestamp - primary_heartbeat.metadata.timestamp > failover_timeout`
* It only attempts promotion if the `failover_timeout` (5s) has passed.
* **Crucial:** Careful configuration of the failover_timeout is required. This is the only way to avoid a split brain in case of a network partition where the Primary cannot write its heartbeats in time anymore.
* In short, `failover_timeout` should be tuned to be `heartbeat_timeout * failure_threshold + safety_margin`. This `safety_margin` will vary by use case. For example, a CNPG cluster may need 30 seconds to demote a Primary to Replica when fencing is triggered, so `safety_margin` should be at least 30s in that setup.
Since we forcibly fail timeouts after `heartbeat_timeout`, we are guaranteed that the primary will have **started** the fencing process after `heartbeat_timeout * failure_threshold`.
But, in a network split scenario where the failed primary is still accessible by clients but cannot write its heartbeat successfully, there is no way to know if the demotion has actually **completed**.
For example, in a CNPG cluster, the failed Primary agent will attempt to change the CNPG cluster state to read-only. But if anything fails after that attempt (permission error, k8s api failure, CNPG bug, etc) it is possible that the PostgreSQL instance keeps accepting writes.
While this is not a theoretical failure of the agent's algorithm, this is a practical failure where data corruption occurs.
This can be fixed by detecting the demotion failure and escalating the fencing procedure aggressiveness. Harmony being an infrastructure orchestrator, it can easily exert radical measures if given the proper credentials, such as forcibly powering off a server, disconnecting its network in the switch configuration, forcibly kill a pod/container/process, etc.
However, these details are out of scope of this algorithm, as they simply fall under the "fencing procedure".
The implementation of the fencing procedure itself is not relevant. This algorithm's responsibility stops at calling the fencing procedure in the appropriate situation.
#### 4. The Demotion Handshake (Return to Normalcy)
When the original Primary recovers:
1. It becomes healthy locally but sees `current_primary = Replica`. It waits.
2. The Replica (current leader) detects the Original Primary is back (via NATS heartbeats).
3. Replica performs a **Clean Demotion**:
* Stops DB.
* Writes `current_primary = None` to NATS.
4. Original Primary sees `current_primary = None` and can launch the promotion procedure.
Depending on the implementation, the promotion procedure may require a transition phase. Typically, for a PostgreSQL use case the promoting primary will make sure it has caught up on WAL replication before starting to accept writes.
---
### Failure Modes & Behavior Analysis
#### Case 1: Immediate Outage (Power Cut)
* **Primary:** Dies instantly. Fencing is implicit (machine is off).
* **Replica:** Waits for `failover_timeout` (5s). Sees staleness. Promotes self.
* **Outcome:** Clean failover after 5s.
// TODO detail what happens when the primary comes back up. We will likely have to tie PostgreSQL's lifecycle (liveness/readiness probes) with the agent to ensure it does not come back up as primary.
#### Case 2: High Network Latency on the Primary (The "Split Brain" Trap)
* **Scenario:** Network latency spikes to 5s on the Primary, still below `heartbeat_timeout` on the Replica.
* **T=0 to T=2 (Primary):** Tries to write. Latency (5s) > Timeout (1s). Fails twice.
* **T=2 (Primary):** `consecutive_failures` = 2. **Primary Fences Self.** (Service is DOWN).
* **T=2 to T=5 (Cluster):** **Read-Only Phase.** No Primary exists.
* **T=5 (Replica):** `failover_timeout` reached. Replica promotes self.
* **Outcome:** Safe failover. The "Read-Only Gap" (T=2 to T=5) ensures no Split Brain occurred.
#### Case 3: Replica Network Lag (False Positive)
* **Scenario:** Replica has high latency, greater than `failover_timeout`; Primary is fine.
* **Replica:** Thinks Primary is dead. Tries to promote by setting `cluster_state.current_primary = replica_id`.
* **NATS:** Rejects the write because the Primary is still updating the sequence numbers successfully.
* **Outcome:** Promotion denied. Primary stays leader.
#### Case 4: Network Instability (Flapping)
* **Scenario:** Intermittent packet loss.
* **Primary:** Fails 1 heartbeat, succeeds the next. `consecutive_failures` resets.
* **Replica:** Sees a slight delay in updates, but never reaches `failover_timeout`.
* **Outcome:** No Fencing, No Promotion. System rides out the noise.
## Contextual notes
* Clock skew : Tokio relies on monotonic clocks. This means that `tokio::time::sleep(...)` will not be affected by system clock corrections (such as NTP). But monotonic clocks are known to jump forward in some cases such as VM live migrations. This could mean a false timeout of a single heartbeat. If `failure_threshold = 1`, this can mean a false negative on the nodes' health, and a potentially useless demotion.

View File

@@ -0,0 +1,84 @@
# ADR 017: Multi-Cluster Federation and Granular Authorization
* **Status:** Accepted
* **Date:** 2025-12-20
* **Context:** ADR-016 Harmony Agent and Global Mesh For Decentralized Workload Management
## Context and Problem Statement
As Harmony expands to manage workloads across multiple independent clusters, we face two specific requirements regarding network topology and security:
1. **Granular Authorization:** We need the ability to authorize a specific node (Leaf Node) from the decentralized mesh to access only a specific subset of topics within a cluster. For example, a collaborator connecting to our Public Cloud should not have visibility into other collaborator's data or our internal system events. Same goes the other way, when we connect to a collaborator cluster, we must not gain access to their internal data.
2. **Hybrid/Failover Topologies:** Users need to run their own private meshes (e.g., 5 interconnected office locations) with full internal trust, while simultaneously maintaining a connection to the Harmony Public Cloud for failover or bursting. In this scenario, the Public Cloud must not have access to the private mesh's internal data, and the private mesh should only access specific "public" topics on the cloud.
We need to decide how the Harmony Agent and the underlying NATS infrastructure will handle these "Split-Horizon" and "Zero-Trust" scenarios without complicating the application logic within the Agent itself.
## Decision Drivers
* **Security:** Default-deny posture is required for cross-domain connections.
* **Isolation:** Private on-premise clusters must remain private; data leakage to the public cloud is unacceptable.
* **Simplicity:** The Harmony Agent code should not be burdened with managing multiple complex TCP connections or application-layer routing logic.
* **Efficiency:** Failover traffic should only occur when necessary; network chatter (gossip) from the Public Cloud should not propagate to private nodes.
## Considered Options
1. **Application-Layer Multi-Homing:** The Harmony Agent maintains two distinct active connections (one to Local, one to Cloud) and handles routing logic in Go code.
2. **VPN/VPC Peering:** Connect private networks to the public cloud at the network layer (IPSec/WireGuard).
3. **NATS Account Federation & Leaf Nodes:** Utilize NATS native multi-tenancy (Accounts) and topology bridging (Leaf Nodes) to handle routing and permissions at the protocol layer.
## Decision Outcome
We will adopt **Option 3: NATS Account Federation & Leaf Nodes**.
We will leverage NATS built-in "Accounts" to create logical isolation boundaries and "Service Imports/Exports" to bridge specific data streams between private and public clusters.
### 1. Authorization Strategy (Subject-Based Permissions)
We will not implement custom authorization logic inside the Harmony Agent for topic access. Instead, we will enforce **Subject-Based Permissions** at the NATS server level.
* **Mechanism:** When a Leaf Node (e.g., a Customer Site) connects to the Public Cloud, it must authenticate using a scoped credential (JWT/NKey).
* **Policy:** This credential will carry a permission policy that strictly defines:
* `PUB`: Allowed subjects (e.g., `harmony.public.requests`).
* `SUB`: Allowed subjects (e.g., `harmony.public.responses`).
* `DENY`: Implicitly everything else (including `_SYS.>`, other customer streams).
* **Enforcement:** The NATS server rejects unauthorized subscriptions or publications at the protocol level before they reach the application layer.
### 2. Hybrid Topology Strategy (Federation)
We will use **NATS Account Federation** to solve the "Hybrid Cloud" requirement. We will treat the Private Mesh and Public Cloud as separate "Accounts."
* **The Private Mesh (Business Account):**
* Consists of the customer's internal nodes (e.g., 5 locations).
* Nodes share full trust and visibility of `local.*` subjects.
* Configured as a **Leaf Node** connection to the Public Cloud.
* **The Public Cloud (SaaS Account):**
* **Exports** a specific stream/service (e.g., `public.compute.rentals`).
* Does *not* join the customer's cluster mesh. It acts as a hub.
* **The Bridge (Import):**
* The Private Mesh **Imports** the public stream.
* The Private Mesh maps this import to a local subject, e.g., `external.cloud`.
### 3. Failover Logic
The Harmony Agent will implement failover by selecting the target subject, relying on NATS to route the message physically.
1. **Primary Attempt:** Agent publishes to `local.compute`. NATS routes this only to the 5 internal sites.
2. **Failover Condition:** If internal capacity is exhausted (determined by Agent heuristics), the Agent publishes to `external.cloud`.
3. **Routing:** NATS transparently routes `external.cloud` messages over the Leaf Node connection to the Public Cloud.
4. **Security:** Because the Public Cloud does not import `local.compute`, it never sees internal traffic.
## Consequences
### Positive
* **Zero-Trust by Design:** The Public Cloud cannot spy on the Private Mesh because no streams are exported from Private to Public.
* **Reduced Complexity:** The Harmony Agent remains simple; it just publishes to different topics. It does not need to manage connection pools or complex authentication handshakes for multiple clouds.
* **Bandwidth Efficiency:** Gossip protocol traffic (cluster topology updates) is contained within the Accounts. The Private Mesh does not receive heartbeat traffic from the massive Public Cloud.
### Negative
* **Configuration Overhead:** Setting up NATS Accounts, Imports, and Exports requires more complex configuration files (server.conf) and understanding of JWT/NKeys compared to a flat mesh.
* **Observability:** Tracing a message as it crosses Account boundaries (from Private `external.cloud` to Public `public.compute`) requires centralized monitoring that understands Federation.
## References
* [NATS Documentation: Subject-Based Access Control](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/subject_access)
* [NATS Documentation: Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes)
* [NATS Documentation: Services & Streams (Imports/Exports)](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/accounts)

View File

@@ -1 +0,0 @@
build/check.sh

View File

@@ -2,13 +2,14 @@ use std::collections::HashMap;
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{Node, ServiceAccount},
core::v1::{Namespace, Node, ServiceAccount},
};
use k8s_openapi::apiextensions_apiserver::pkg::apis::apiextensions::v1::CustomResourceDefinition;
use kube::api::ApiResource;
use kube::{
Error, Resource,
api::{Api, DynamicObject, GroupVersionKind, ListParams, ObjectList},
core::ErrorResponse,
runtime::conditions,
runtime::wait::await_condition,
};
@@ -313,4 +314,65 @@ impl K8sClient {
) -> Result<ObjectList<Node>, Error> {
self.list_resources(None, list_params).await
}
pub async fn namespace_exists(&self, name: &str) -> Result<bool, Error> {
let api: Api<Namespace> = Api::all(self.client.clone());
match api.get_opt(name).await? {
Some(_) => Ok(true),
None => Ok(false),
}
}
pub async fn create_namespace(&self, name: &str) -> Result<Namespace, Error> {
let namespace = Namespace {
metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta {
name: Some(name.to_string()),
..Default::default()
},
..Default::default()
};
let api: Api<Namespace> = Api::all(self.client.clone());
api.create(&kube::api::PostParams::default(), &namespace)
.await
}
pub async fn wait_for_namespace(
&self,
name: &str,
timeout: Option<Duration>,
) -> Result<(), Error> {
let api: Api<Namespace> = Api::all(self.client.clone());
let timeout = timeout.unwrap_or(Duration::from_secs(60));
let start = std::time::Instant::now();
loop {
if start.elapsed() > timeout {
return Err(Error::Api(ErrorResponse {
status: "Timeout".to_string(),
message: format!("Namespace '{}' not ready within timeout", name),
reason: "Timeout".to_string(),
code: 408,
}));
}
match api.get_opt(name).await? {
Some(ns) => {
if let Some(status) = ns.status {
if status.phase == Some("Active".to_string()) {
return Ok(());
}
}
}
None => {
return Err(Error::Api(ErrorResponse {
status: "NotFound".to_string(),
message: format!("Namespace '{}' not found", name),
reason: "NotFound".to_string(),
code: 404,
}));
}
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
}

View File

@@ -267,10 +267,16 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
SSL::Default => "".into(),
SSL::Other(other) => other.as_str().into(),
};
let path_without_query = path.split_once('?').map_or(path.as_str(), |(p, _)| p);
let (port, port_name) = match port {
Some(port) => (Some(port.to_string()), port.to_string()),
None => (None, "serverport".to_string()),
};
let haproxy_check = HAProxyHealthCheck {
name: format!("HTTP_{http_method}_{path}"),
name: format!("HTTP_{http_method}_{path_without_query}_{port_name}"),
uuid: Uuid::new_v4().to_string(),
http_method: http_method.to_string().into(),
http_method: http_method.to_string().to_lowercase().into(),
health_check_type: "http".to_string(),
http_uri: path.clone().into(),
interval: "2s".to_string(),
@@ -314,7 +320,10 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
let mut backend = HAProxyBackend {
uuid: Uuid::new_v4().to_string(),
enabled: 1,
name: format!("backend_{}", service.listening_port),
name: format!(
"backend_{}",
service.listening_port.to_string().replace(':', "_")
),
algorithm: "roundrobin".to_string(),
random_draws: Some(2),
stickiness_expire: "30m".to_string(),
@@ -346,10 +355,22 @@ pub(crate) fn harmony_load_balancer_service_to_haproxy_xml(
let frontend = Frontend {
uuid: uuid::Uuid::new_v4().to_string(),
enabled: 1,
name: format!("frontend_{}", service.listening_port),
name: format!(
"frontend_{}",
service.listening_port.to_string().replace(':', "_")
),
bind: service.listening_port.to_string(),
mode: "tcp".to_string(), // TODO do not depend on health check here
default_backend: Some(backend.uuid.clone()),
stickiness_expire: "30m".to_string().into(),
stickiness_size: "50k".to_string().into(),
stickiness_conn_rate_period: "10s".to_string().into(),
stickiness_sess_rate_period: "10s".to_string().into(),
stickiness_http_req_rate_period: "10s".to_string().into(),
stickiness_http_err_rate_period: "10s".to_string().into(),
stickiness_bytes_in_rate_period: "1m".to_string().into(),
stickiness_bytes_out_rate_period: "1m".to_string().into(),
ssl_hsts_max_age: 15768000,
..Default::default()
};
info!("HAPRoxy frontend and backend mode currently hardcoded to tcp");

View File

@@ -1,11 +1,15 @@
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use serde::Serialize;
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use crate::interpret::Interpret;
use crate::modules::helm::chart::HelmChartScore;
use crate::modules::k8s::apps::crd::{Subscription, SubscriptionSpec};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use crate::topology::{HelmCommand, K8sclient, Topology};
/// Install the CloudNativePg (CNPG) Operator via an OperatorHub `Subscription`.
///
@@ -100,3 +104,41 @@ impl<T: Topology + K8sclient> Score<T> for CloudNativePgOperatorScore {
format!("CloudNativePgOperatorScore({})", self.namespace)
}
}
#[derive(Debug, Clone, Serialize)]
pub struct CloudNativePgOperatorHelmScore {
pub namespace: String,
}
impl Default for CloudNativePgOperatorHelmScore {
fn default() -> Self {
Self {
namespace: "cnpg-system".to_string(),
}
}
}
impl<T: Topology + K8sclient + HelmCommand + 'static> Score<T> for CloudNativePgOperatorHelmScore {
fn name(&self) -> String {
format!("CloudNativePgOperatorHelmScore({})", self.namespace)
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
let cnpg_helm_score = HelmChartScore {
namespace: Some(NonBlankString::from_str(&self.namespace).unwrap()),
release_name: NonBlankString::from_str("cloudnative-pg").unwrap(),
chart_name: NonBlankString::from_str(
"oci://ghcr.io/cloudnative-pg/charts/cloudnative-pg",
)
.unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: None,
create_namespace: true,
install_only: true,
repository: None,
};
cnpg_helm_score.create_interpret()
}
}

View File

@@ -1,24 +1,35 @@
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::postgresql::capability::PostgreSQLConfig;
use crate::modules::postgresql::cnpg::{
Bootstrap, Cluster, ClusterSpec, ExternalCluster, Initdb, PgBaseBackup, ReplicaSpec,
SecretKeySelector, Storage,
};
use crate::modules::postgresql::operator::{
CloudNativePgOperatorHelmScore, CloudNativePgOperatorScore,
};
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use crate::topology::{HelmCommand, K8sclient, Topology};
use async_trait::async_trait;
use harmony_k8s::KubernetesDistribution;
use harmony_types::id::Id;
use k8s_openapi::ByteString;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::api::core::v1::{Pod, Secret};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use log::{info, warn};
use serde::Serialize;
/// Deploys an opinionated, highly available PostgreSQL cluster managed by CNPG.
///
/// This score automatically ensures the CloudNativePG (CNPG) operator is installed
/// before creating the Cluster CRD. The installation method depends on the Kubernetes
/// distribution:
///
/// - **OpenShift/OKD**: Uses OperatorHub Subscription via `CloudNativePgOperatorScore`
/// - **K3s/Other**: Uses Helm chart via `CloudNativePgOperatorHelmScore`
///
/// # Usage
/// ```
/// use harmony::modules::postgresql::PostgreSQLScore;
@@ -26,12 +37,7 @@ use serde::Serialize;
/// ```
///
/// # Limitations (Happy Path)
/// - Requires CNPG operator installed (use CloudNativePgOperatorScore).
/// - No backups, monitoring, extensions configured.
///
/// TODO : refactor this to declare a clean dependency on cnpg operator. Then cnpg operator will
/// self-deploy either using operatorhub or helm chart depending on k8s flavor. This is cnpg
/// specific behavior
#[derive(Debug, Clone, Serialize)]
pub struct K8sPostgreSQLScore {
pub config: PostgreSQLConfig,
@@ -56,7 +62,7 @@ impl K8sPostgreSQLScore {
}
}
impl<T: Topology + K8sclient> Score<T> for K8sPostgreSQLScore {
impl<T: Topology + K8sclient + HelmCommand + 'static> Score<T> for K8sPostgreSQLScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(K8sPostgreSQLInterpret {
config: self.config.clone(),
@@ -73,13 +79,127 @@ pub struct K8sPostgreSQLInterpret {
config: PostgreSQLConfig,
}
impl K8sPostgreSQLInterpret {
async fn ensure_namespace<T: Topology + K8sclient>(
&self,
topology: &T,
) -> Result<(), InterpretError> {
let k8s_client = topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
let namespace_name = &self.config.namespace;
if k8s_client
.namespace_exists(namespace_name)
.await
.map_err(|e| {
InterpretError::new(format!(
"Failed to check namespace '{}': {}",
namespace_name, e
))
})?
{
info!("Namespace '{}' already exists", namespace_name);
return Ok(());
}
info!("Creating namespace '{}'", namespace_name);
k8s_client
.create_namespace(namespace_name)
.await
.map_err(|e| {
InterpretError::new(format!(
"Failed to create namespace '{}': {}",
namespace_name, e
))
})?;
k8s_client
.wait_for_namespace(namespace_name, Some(std::time::Duration::from_secs(30)))
.await
.map_err(|e| {
InterpretError::new(format!("Namespace '{}' not ready: {}", namespace_name, e))
})?;
info!("Namespace '{}' is ready", namespace_name);
Ok(())
}
async fn ensure_cnpg_operator<T: Topology + K8sclient + HelmCommand + 'static>(
&self,
topology: &T,
) -> Result<(), InterpretError> {
let k8s_client = topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
let pods = k8s_client
.list_all_resources_with_labels::<Pod>("app.kubernetes.io/name=cloudnative-pg")
.await
.map_err(|e| {
InterpretError::new(format!("Failed to list CNPG operator pods: {}", e))
})?;
if !pods.is_empty() {
info!("CNPG operator is already installed");
return Ok(());
}
warn!("CNPG operator not found, installing...");
let distro = k8s_client.get_k8s_distribution().await.map_err(|e| {
InterpretError::new(format!("Failed to detect k8s distribution: {}", e))
})?;
match distro {
KubernetesDistribution::OpenshiftFamily => {
info!("Installing CNPG operator via OperatorHub Subscription");
let score = CloudNativePgOperatorScore::default_openshift();
score
.interpret(&Inventory::empty(), topology)
.await
.map_err(|e| {
InterpretError::new(format!("Failed to install CNPG operator: {}", e))
})?;
}
KubernetesDistribution::K3sFamily | KubernetesDistribution::Default => {
info!("Installing CNPG operator via Helm chart");
let score = CloudNativePgOperatorHelmScore::default();
score
.interpret(&Inventory::empty(), topology)
.await
.map_err(|e| {
InterpretError::new(format!("Failed to install CNPG operator: {}", e))
})?;
}
}
k8s_client
.wait_until_deployment_ready(
"cloudnative-pg",
Some("cnpg-system"),
Some(std::time::Duration::from_secs(120)),
)
.await
.map_err(|e| InterpretError::new(format!("CNPG operator not ready: {}", e)))?;
info!("CNPG operator is ready");
Ok(())
}
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for K8sPostgreSQLInterpret {
impl<T: Topology + K8sclient + HelmCommand + 'static> Interpret<T> for K8sPostgreSQLInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.ensure_cnpg_operator(topology).await?;
self.ensure_namespace(topology).await?;
match &self.config.role {
super::capability::PostgreSQLClusterRole::Primary => {
let metadata = ObjectMeta {

View File

@@ -5,7 +5,7 @@ use directories::ProjectDirs;
use interactive_parse::InteractiveParseObj;
use log::debug;
use schemars::JsonSchema;
use serde::{de::DeserializeOwned, Serialize};
use serde::{Serialize, de::DeserializeOwned};
use std::path::PathBuf;
use std::sync::Arc;
use thiserror::Error;
@@ -76,12 +76,11 @@ impl ConfigManager {
pub async fn get<T: Config>(&self) -> Result<T, ConfigError> {
for source in &self.sources {
if let Some(value) = source.get(T::KEY).await? {
let config: T = serde_json::from_value(value).map_err(|e| {
ConfigError::Deserialization {
let config: T =
serde_json::from_value(value).map_err(|e| ConfigError::Deserialization {
key: T::KEY.to_string(),
source: e,
}
})?;
})?;
debug!("Retrieved config for key {} from source", T::KEY);
return Ok(config);
}
@@ -95,17 +94,20 @@ impl ConfigManager {
match self.get::<T>().await {
Ok(config) => Ok(config),
Err(ConfigError::NotFound { .. }) => {
let config = T::parse_to_obj()
.map_err(|e| ConfigError::PromptError(e.to_string()))?;
let config =
T::parse_to_obj().map_err(|e| ConfigError::PromptError(e.to_string()))?;
for source in &self.sources {
if let Err(e) = source
.set(T::KEY, &serde_json::to_value(&config).map_err(|e| {
ConfigError::Serialization {
key: T::KEY.to_string(),
source: e,
}
})?)
.set(
T::KEY,
&serde_json::to_value(&config).map_err(|e| {
ConfigError::Serialization {
key: T::KEY.to_string(),
source: e,
}
})?,
)
.await
{
debug!("Failed to save config to source: {e}");
@@ -175,8 +177,35 @@ mod tests {
use super::*;
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
use std::sync::Mutex;
use std::sync::atomic::{AtomicUsize, Ordering};
static TEST_COUNTER: AtomicUsize = AtomicUsize::new(0);
static ENV_LOCK: Mutex<()> = Mutex::new(());
fn setup_env_vars(key: &str, value: Option<&str>) -> String {
let id = TEST_COUNTER.fetch_add(1, Ordering::SeqCst);
let env_var = format!("HARMONY_CONFIG_{}_{}", key, id);
unsafe {
if let Some(v) = value {
std::env::set_var(&env_var, v);
} else {
std::env::remove_var(&env_var);
}
}
env_var
}
fn run_in_isolated_env<F>(f: F)
where
F: FnOnce() + Send + 'static,
{
let handle = std::thread::spawn(f);
handle.join().expect("Test thread panicked");
}
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema, PartialEq)]
struct TestConfig {
name: String,
@@ -339,18 +368,14 @@ mod tests {
#[tokio::test]
async fn test_env_source_reads_from_environment() {
unsafe {
std::env::set_var(
"HARMONY_CONFIG_TestConfig",
r#"{"name":"from_env","count":7}"#,
);
}
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let env_var = setup_env_vars("TestConfig", Some(r#"{"name":"from_env","count":7}"#));
let source = EnvSource;
let result = source.get("TestConfig").await;
let result = source.get(&env_var.replace("HARMONY_CONFIG_", "")).await;
unsafe {
std::env::remove_var("HARMONY_CONFIG_TestConfig");
std::env::remove_var(&env_var);
}
let value = result.unwrap().unwrap();
@@ -361,26 +386,32 @@ mod tests {
#[tokio::test]
async fn test_env_source_returns_none_when_not_set() {
unsafe {
std::env::remove_var("HARMONY_CONFIG_TestConfig");
}
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
run_in_isolated_env(|| {
let env_var = setup_env_vars("TestConfig", None);
let source = EnvSource;
let result = source.get("TestConfig").await.unwrap();
assert!(result.is_none());
let rt = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap();
rt.block_on(async {
let source = EnvSource;
let result = source.get(&env_var.replace("HARMONY_CONFIG_", "")).await;
assert!(result.unwrap().is_none());
});
});
}
#[tokio::test]
async fn test_env_source_returns_error_for_invalid_json() {
unsafe {
std::env::set_var("HARMONY_CONFIG_TestConfig", "not valid json");
}
let _lock = ENV_LOCK.lock().unwrap_or_else(|e| e.into_inner());
let env_var = setup_env_vars("TestConfig", Some("not valid json"));
let source = EnvSource;
let result = source.get("TestConfig").await;
let result = source.get(&env_var.replace("HARMONY_CONFIG_", "")).await;
unsafe {
std::env::remove_var("HARMONY_CONFIG_TestConfig");
std::env::remove_var(&env_var);
}
assert!(result.is_err());
@@ -438,4 +469,4 @@ mod tests {
assert_eq!(parsed, config);
}
}
}

View File

@@ -1,5 +1,5 @@
use async_trait::async_trait;
use crate::{ConfigError, ConfigSource};
use async_trait::async_trait;
pub struct EnvSource;
@@ -11,16 +11,14 @@ fn env_key_for(config_key: &str) -> String {
impl ConfigSource for EnvSource {
async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, ConfigError> {
let env_key = env_key_for(key);
match std::env::var(&env_key) {
Ok(value) => {
serde_json::from_str(&value)
.map(Some)
.map_err(|e| ConfigError::EnvError(format!(
"Invalid JSON in environment variable {}: {}",
env_key, e
)))
}
Ok(value) => serde_json::from_str(&value).map(Some).map_err(|e| {
ConfigError::EnvError(format!(
"Invalid JSON in environment variable {}: {}",
env_key, e
))
}),
Err(std::env::VarError::NotPresent) => Ok(None),
Err(e) => Err(ConfigError::EnvError(format!(
"Failed to read environment variable {}: {}",
@@ -31,12 +29,11 @@ impl ConfigSource for EnvSource {
async fn set(&self, key: &str, value: &serde_json::Value) -> Result<(), ConfigError> {
let env_key = env_key_for(key);
let json_string = serde_json::to_string(value)
.map_err(|e| ConfigError::Serialization {
key: key.to_string(),
source: e,
})?;
let json_string = serde_json::to_string(value).map_err(|e| ConfigError::Serialization {
key: key.to_string(),
source: e,
})?;
// SAFETY: Setting environment variables is generally safe in single-threaded contexts.
// In multi-threaded contexts, this could cause races, but is acceptable for this use case
// as config is typically set once at startup.
@@ -45,4 +42,4 @@ impl ConfigSource for EnvSource {
}
Ok(())
}
}
}

View File

@@ -26,14 +26,15 @@ impl LocalFileSource {
impl ConfigSource for LocalFileSource {
async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, ConfigError> {
let path = self.file_path_for(key);
match fs::read(&path).await {
Ok(contents) => {
let value: serde_json::Value = serde_json::from_slice(&contents)
.map_err(|e| ConfigError::Deserialization {
let value: serde_json::Value = serde_json::from_slice(&contents).map_err(|e| {
ConfigError::Deserialization {
key: key.to_string(),
source: e,
})?;
}
})?;
Ok(Some(value))
}
Err(e) if e.kind() == std::io::ErrorKind::NotFound => Ok(None),
@@ -46,16 +47,16 @@ impl ConfigSource for LocalFileSource {
async fn set(&self, key: &str, value: &serde_json::Value) -> Result<(), ConfigError> {
fs::create_dir_all(&self.base_path).await?;
let path = self.file_path_for(key);
let contents = serde_json::to_string_pretty(value)
.map_err(|e| ConfigError::Serialization {
let contents =
serde_json::to_string_pretty(value).map_err(|e| ConfigError::Serialization {
key: key.to_string(),
source: e,
})?;
fs::write(&path, contents).await?;
Ok(())
}
}
}

View File

@@ -1,4 +1,4 @@
pub mod env;
pub mod local_file;
pub mod prompt;
pub mod store;
pub mod store;

View File

@@ -18,7 +18,9 @@ impl PromptSource {
#[allow(dead_code)]
pub fn with_writer(writer: Arc<dyn std::io::Write + Send + Sync>) -> Self {
Self { writer: Some(writer) }
Self {
writer: Some(writer),
}
}
}
@@ -45,4 +47,4 @@ where
{
let _guard = PROMPT_MUTEX.lock().await;
f.await
}
}

View File

@@ -19,8 +19,8 @@ impl<S: SecretStore + 'static> ConfigSource for StoreSource<S> {
async fn get(&self, key: &str) -> Result<Option<serde_json::Value>, ConfigError> {
match self.store.get_raw(&self.namespace, key).await {
Ok(bytes) => {
let value: serde_json::Value = serde_json::from_slice(&bytes)
.map_err(|e| ConfigError::Deserialization {
let value: serde_json::Value =
serde_json::from_slice(&bytes).map_err(|e| ConfigError::Deserialization {
key: key.to_string(),
source: e,
})?;
@@ -36,10 +36,10 @@ impl<S: SecretStore + 'static> ConfigSource for StoreSource<S> {
key: key.to_string(),
source: e,
})?;
self.store
.set_raw(&self.namespace, key, &bytes)
.await
.map_err(ConfigError::StoreError)
}
}
}

View File

@@ -1,7 +1,7 @@
use proc_macro::TokenStream;
use proc_macro_crate::{crate_name, FoundCrate};
use proc_macro_crate::{FoundCrate, crate_name};
use quote::quote;
use syn::{parse_macro_input, DeriveInput, Ident};
use syn::{DeriveInput, Ident, parse_macro_input};
#[proc_macro_derive(Config)]
pub fn derive_config(input: TokenStream) -> TokenStream {

View File

@@ -183,7 +183,7 @@ impl OpenbaoSecretStore {
}
#[cfg(not(unix))]
{
fs::write(path, token)?;
fs::write(path, serde_json::to_string(token)?.as_bytes())?;
}
Ok(())
}

View File

@@ -0,0 +1,65 @@
apiVersion: v1
kind: Namespace
metadata:
name: harmony-iobench
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: iobench-privileged-scc
subjects:
- kind: ServiceAccount
name: iobench
namespace: harmony-iobench # Change to your namespace/project
roleRef:
kind: ClusterRole
name: system:openshift:scc:privileged
apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: iobench
namespace: harmony-iobench # Change to your namespace/project
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: iobench
namespace: harmony-iobench # Change to your namespace/project
labels:
app: iobench
spec:
replicas: 1
selector:
matchLabels:
app: iobench
template:
metadata:
labels:
app: iobench
spec:
nodeSelector:
#iobench: "true" # Tune to match your target node's label(s), e.g.,
kubernetes.io/hostname: worker3
tolerations:
- key: node.kubernetes.io/unschedulable
operator: Exists
effect: NoSchedule
serviceAccountName: iobench
containers:
- name: fio
image: juicedata/fio:latest # Replace with your preferred fio image
imagePullPolicy: IfNotPresent
command: [ "sleep", "infinity" ] # Keeps the container running for kubectl exec
securityContext:
runAsUser: 0
privileged: true
volumeMounts:
- name: iobench-pvc
mountPath: /data # Mount the PVC at /data
volumes:
- name: iobench-pvc
hostPath:
path: /var/lib/harmony-iobench # Tune this to your desired host node path
type: DirectoryOrCreate