Compare commits

2 Commits

| SHA1 | Message | Date |
|------|---------|------|
| `3cf4fe4855` | feat: rustfs | 2026-03-21 11:36:50 -04:00 |
| `ccc26e07eb` | feat: harmony_asset crate to manage assets, local, s3, http urls, etc (Run Check Script / check (pull_request): failing after 17s) | 2026-03-21 11:10:51 -04:00 |
31 changed files with 2621 additions and 539 deletions

`Cargo.lock` (generated, 878 changed lines) — diff suppressed because it is too large.


@@ -23,6 +23,7 @@ members = [
     "harmony_agent/deploy",
     "harmony_node_readiness",
     "harmony-k8s",
+    "harmony_assets",
 ]

 [workspace.package]
@@ -37,6 +38,7 @@ derive-new = "0.7"
 async-trait = "0.1"
 tokio = { version = "1.40", features = [
     "io-std",
+    "io-util",
     "fs",
     "macros",
     "rt-multi-thread",
@@ -73,6 +75,7 @@ base64 = "0.22.1"
 tar = "0.4.44"
 lazy_static = "1.5.0"
 directories = "6.0.0"
+futures-util = "0.3"
 thiserror = "2.0.14"
 serde = { version = "1.0.209", features = ["derive", "rc"] }
 serde_json = "1.0.127"
@@ -86,3 +89,4 @@ reqwest = { version = "0.12", features = [
     "json",
 ], default-features = false }
 assertor = "0.0.4"
+tokio-test = "0.4"


@@ -1,124 +0,0 @@
# ADR014 · Port Allocation Policy
**Date:** 2025-11-03
**Status:** Proposed
**Authors:** Infrastructure Team / NationTech Harmony Core
---
## 1 · Context
Harmony orchestrates heterogeneous clusters — from developer laptops to multi-tenant production environments — through a single, typed Rust codebase.
To enable deterministic, automatable service exposure across clusters, we need a **stable, conflict-free range of TCP/UDP ports** reserved for Harmony-managed workloads and host-network services.
Today, various manual operations (e.g. Ceph RGW on host network) occasionally use ad-hoc local ports such as `80` or `8080`. This leads to unpredictable overlaps with NodePorts, ephemeral ports, or other daemons.
Since Harmony aims to unify *manual* and *automated* infrastructure under a single declarative model, we formalize a **reserved port block** that should be used consistently — even for out-of-band operations — so everything remains alignment-ready for future orchestration.
---
## 2 · Problem Statement
We must select a contiguous range of approximately 1000 ports that:
1. Avoids conflict with:
- System/well-known ports (`0–1023`)
- Common registered application ports (`1024–49151`)
- Kubernetes NodePort default range (`30000–32767`)
- Ephemeral port ranges (typically `32768–60999` on Linux)
2. Feels deterministic and maintainable for long-term automation.
3. Can be mirrored across multiple clusters and bare-metal hosts.
---
## 3 · Decision
We designate:
> **Harmony Reserved Port Range:** `25000–25999`
### Rationale
| Criterion | Reasoning |
|------------|------------|
| **Safety** | Lies below both the Kubernetes NodePort range (`30000–32767`) and the default ephemeral port start (`32768`). |
| **Conflicts** | Very few historical IANA registrations in this span; practically unused in modern systems. |
| **Predictability** | Easy to recognize and communicate (the `25k` block belongs to Harmony). |
| **Ergonomics** | Mirrors production service conventions (`25080` for HTTP-like, `25443` for HTTPS-like, etc.). |
| **Cross-cluster consistency** | Portable across OKD, K3D, and bare metal; allows static firewall rules. |
This range is *registered* (per the IANA definition, `1024–49151`) but not *occupied*. Choosing such a mid-registered band is intentional: it guarantees good inter-platform compatibility without interfering with OS-managed ephemeral allocations.
---
## 4 · Integration in Harmony
Harmony's topology and orchestration layers should expose a built-in **`PortAllocator`** resource:
```rust
let orchestrator_ports = harmony::network::PortAllocator::new(25_000, 25_999);
let rados_gateway_port = orchestrator_ports.allocate("ceph-rgw")?;
```
In TOML configuration:
```toml
[harmony.network.defaults]
orchestrator_port_range = "25000-25999"
```
All `Score`/`Interpret` modules that deploy host-network or externally visible services should use this allocator to pick ports deterministically.
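For illustration, a minimal sketch of how such an allocator might behave. This is not the actual Harmony implementation; the struct layout and error handling are assumptions, but it shows the intended contract: deterministic, name-stable allocation within the reserved block.

```rust
use std::collections::HashMap;
use std::ops::RangeInclusive;

/// Illustrative sketch only; the real Harmony `PortAllocator` may differ.
pub struct PortAllocator {
    range: RangeInclusive<u16>,
    next: u16,
    assigned: HashMap<String, u16>,
}

impl PortAllocator {
    pub fn new(start: u16, end: u16) -> Self {
        Self { range: start..=end, next: start, assigned: HashMap::new() }
    }

    /// Hand out ports deterministically; repeated calls with the same
    /// service name return the same port, so re-running a deployment
    /// never shuffles assignments.
    pub fn allocate(&mut self, service: &str) -> Result<u16, String> {
        if let Some(&port) = self.assigned.get(service) {
            return Ok(port);
        }
        if !self.range.contains(&self.next) {
            return Err(format!("Harmony port range exhausted: {:?}", self.range));
        }
        let port = self.next;
        self.next += 1;
        self.assigned.insert(service.to_string(), port);
        Ok(port)
    }
}
```

The idempotent lookup is the important property: a `Score` can be interpreted repeatedly and always receive the same port for `"ceph-rgw"`.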
---
## 5 · Operational Guidance
- **Manual Operations:** Even manually configured host-network services on clusters we manage (e.g. Ceph RGW, ntfy, reverse-proxy tests) should adopt ports within `25000–25999`.
- **Automation Alignment:** Any such ports are considered *Harmony-manageable* and may be introspected later by the orchestrator.
- **Firewall Rules:** Cluster and site firewalls may pre-whitelist this block as “Harmony Infra Ports”.
---
## 6 · Initial Allocation Table
| Service / Component | Port | Notes |
|----------------------|------|-------|
| **Ceph RADOS Gateway** | **25080** | Replacement for `:80`; public HTTP frontend via RGW (host network). |
| **Internal HTTPS Services** | 25443 | Equivalent of `:443` for secure endpoints. |
| **Prometheus / Metrics Endpoints** | 25090–25099 | Internal observability. |
| **Harmony Control Plane (gRPC)** | 25500–25519 | Inter-process messaging, API, TUI backend. |
| **Reserved for Future Scores** | 25550–25599 | Dedicated expansion slots. |
These values shall be managed centrally in a `harmony.network.ports` registry file (YAML/TOML) within the repository.
---
## 7 · Consequences
### Positive
- Eliminates accidental collisions with NodePorts and ephemeral connections.
- Provides a memorable, human-readable convention suitable for documentation and automation.
- Enables Harmony to treat host-network configuration as first-class code.
### Negative / Trade-offs
- Occupies part of the IANA “registered” band (must remain consistent across environments).
- Requires operators to migrate legacy services away from lower ports (`80`, `443`) for compliance with this policy.
---
## 8 · Future Work
- Extend the Harmony CLI to display currently allocated ports and detect conflicts.
- Expose a compile-time capability check ensuring clusters in a topology do not reuse the same port twice.
- Consider parallel UDP range reservation when we start exposing datagram services.
---
## 9 · References
- [IANA Service Name and Transport Protocol Port Number Registry](https://www.iana.org/assignments/service-names-port-numbers/service-names-port-numbers.xhtml)
- [Kubernetes NodePort default range documentation](https://kubernetes.io/docs/concepts/services-networking/service/)
- Harmony ADR 001 · Why Rust
- Harmony ADR 003 · Infrastructure Abstractions
---
*Made with 🦀 by the NationTech Harmony team — ensuring your infrastructure behaves like code.*


@@ -1,107 +0,0 @@
### Context & Use Case
We are implementing a High Availability (HA) Failover Strategy for decentralized "Micro Data Centers." The core challenge is managing stateful workloads (PostgreSQL) over unreliable networks.
We solve this using a **Local Fencing First** approach, backed by **NATS JetStream Strict Ordering** for the final promotion authority.
In CAP theorem terms, we are developing a CP system, intentionally sacrificing availability. In practical terms, we expect an average of two primary outages per year, with a failover delay of around 2 minutes. This translates to an uptime of over five nines. To be precise, 2 outages * 2 minutes = 4 minutes per year = 99.99924% uptime.
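The downtime arithmetic above can be checked directly. This throwaway helper is not part of Harmony; it simply reproduces the stated calculation (2 outages × 2 minutes over a 525,600-minute year).

```rust
/// Yearly uptime percentage, given the expected outage profile from the text.
pub fn uptime_percent(outages_per_year: f64, minutes_per_outage: f64) -> f64 {
    let minutes_per_year = 365.0 * 24.0 * 60.0; // 525,600
    100.0 * (1.0 - (outages_per_year * minutes_per_outage) / minutes_per_year)
}
```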
### The Algorithm: Local Fencing & Remote Promotion
The safety (data consistency) of the system relies on the time gap between the **Primary giving up (Fencing)** and the **Replica taking over (Promotion)**.
To avoid clock skew issues between agents and the datastore (NATS), all timestamp comparisons are done using JetStream metadata. That is, a Harmony agent never uses `Instant::now()` to get a timestamp; it uses `my_last_heartbeat.metadata.timestamp` (conceptually).
#### 1. Configuration
* `heartbeat_timeout` (e.g., 1s): Max time allowed for a NATS write/DB check.
* `failure_threshold` (e.g., 2): Consecutive failures before self-fencing.
* `failover_timeout` (e.g., 5s): Time since last NATS update of Primary heartbeat before Replica promotes.
* This timeout must be carefully configured to allow enough time for the primary to fence itself (after `heartbeat_timeout * failure_threshold`) BEFORE the replica gets promoted to avoid a split brain with two primaries.
* Implementing this will rely on the actual deployment configuration. For example, a CNPG based PostgreSQL cluster might require a longer gap (such as 30s) than other technologies.
* Expires when `replica_heartbeat.metadata.timestamp - primary_heartbeat.metadata.timestamp > failover_timeout`
#### 2. The Primary (Self-Preservation)
The Primary is aggressive about killing itself.
* It attempts a heartbeat.
* If the network latency > `heartbeat_timeout`, the attempt is **cancelled locally** because the heartbeat did not make it back in time.
* This counts as a failure and increments the `consecutive_failures` counter.
* If `consecutive_failures` hits the threshold, **FENCING occurs immediately**. The database is stopped.
This means that the Primary will fence itself after `heartbeat_timeout * failure_threshold`.
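The self-fencing counter described above reduces to a small piece of pure logic. This is an illustrative sketch (type and method names are assumptions); the real agent would wire it into its heartbeat loop.

```rust
/// Sketch of the Primary's self-fencing decision (names are assumptions).
#[derive(Debug, PartialEq)]
pub enum PrimaryAction {
    Continue,
    Fence,
}

pub struct FencingState {
    consecutive_failures: u32,
    failure_threshold: u32,
}

impl FencingState {
    pub fn new(failure_threshold: u32) -> Self {
        Self { consecutive_failures: 0, failure_threshold }
    }

    /// Record one heartbeat attempt. A timed-out attempt counts as a
    /// failure; any success resets the counter, which is what lets the
    /// system ride out flapping (Case 4 below).
    pub fn record_heartbeat(&mut self, succeeded: bool) -> PrimaryAction {
        if succeeded {
            self.consecutive_failures = 0;
            return PrimaryAction::Continue;
        }
        self.consecutive_failures += 1;
        if self.consecutive_failures >= self.failure_threshold {
            PrimaryAction::Fence
        } else {
            PrimaryAction::Continue
        }
    }
}
```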
#### 3. The Replica (The Watchdog)
The Replica is patient.
* It watches the NATS stream to measure if `replica_heartbeat.metadata.timestamp - primary_heartbeat.metadata.timestamp > failover_timeout`
* It only attempts promotion if the `failover_timeout` (5s) has passed.
* **Crucial:** Careful configuration of the `failover_timeout` is required. This is the only way to avoid a split brain in case of a network partition where the Primary cannot write its heartbeats in time anymore.
* In short, `failover_timeout` should be tuned to be `heartbeat_timeout * failure_threshold + safety_margin`. This `safety_margin` will vary by use case. For example, a CNPG cluster may need 30 seconds to demote a Primary to Replica when fencing is triggered, so `safety_margin` should be at least 30s in that setup.
Since we forcibly fail timeouts after `heartbeat_timeout`, we are guaranteed that the primary will have **started** the fencing process after `heartbeat_timeout * failure_threshold`.
But, in a network split scenario where the failed primary is still accessible by clients but cannot write its heartbeat successfully, there is no way to know if the demotion has actually **completed**.
For example, in a CNPG cluster, the failed Primary agent will attempt to change the CNPG cluster state to read-only. But if anything fails after that attempt (permission error, k8s api failure, CNPG bug, etc) it is possible that the PostgreSQL instance keeps accepting writes.
While this is not a theoretical failure of the agent's algorithm, this is a practical failure where data corruption occurs.
This can be fixed by detecting the demotion failure and escalating the aggressiveness of the fencing procedure. Harmony being an infrastructure orchestrator, it can exert radical measures if given the proper credentials, such as forcibly powering off a server, disconnecting its network in the switch configuration, or forcibly killing a pod/container/process.
However, these details are out of scope of this algorithm, as they simply fall under the "fencing procedure".
The implementation of the fencing procedure itself is not relevant. This algorithm's responsibility stops at calling the fencing procedure in the appropriate situation.
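The replica's staleness check described in this section collapses to a single comparison on JetStream metadata timestamps. A sketch, assuming timestamps have been extracted to milliseconds (the field access and units are assumptions, not the real API):

```rust
/// Replica watchdog check over JetStream metadata timestamps (ms).
/// Saturating subtraction: a primary timestamp newer than the replica's
/// own heartbeat can never trigger promotion.
pub fn should_promote(replica_ts_ms: u64, primary_ts_ms: u64, failover_timeout_ms: u64) -> bool {
    replica_ts_ms.saturating_sub(primary_ts_ms) > failover_timeout_ms
}
```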
#### 4. The Demotion Handshake (Return to Normalcy)
When the original Primary recovers:
1. It becomes healthy locally but sees `current_primary = Replica`. It waits.
2. The Replica (current leader) detects the Original Primary is back (via NATS heartbeats).
3. Replica performs a **Clean Demotion**:
* Stops DB.
* Writes `current_primary = None` to NATS.
4. Original Primary sees `current_primary = None` and can launch the promotion procedure.
Depending on the implementation, the promotion procedure may require a transition phase. Typically, for a PostgreSQL use case the promoting primary will make sure it has caught up on WAL replication before starting to accept writes.
---
### Failure Modes & Behavior Analysis
#### Case 1: Immediate Outage (Power Cut)
* **Primary:** Dies instantly. Fencing is implicit (machine is off).
* **Replica:** Waits for `failover_timeout` (5s). Sees staleness. Promotes self.
* **Outcome:** Clean failover after 5s.
// TODO detail what happens when the primary comes back up. We will likely have to tie PostgreSQL's lifecycle (liveness/readiness probes) with the agent to ensure it does not come back up as primary.
#### Case 2: High Network Latency on the Primary (The "Split Brain" Trap)
* **Scenario:** Network latency spikes to 5s on the Primary, still below `heartbeat_timeout` on the Replica.
* **T=0 to T=2 (Primary):** Tries to write. Latency (5s) > Timeout (1s). Fails twice.
* **T=2 (Primary):** `consecutive_failures` = 2. **Primary Fences Self.** (Service is DOWN).
* **T=2 to T=5 (Cluster):** **Read-Only Phase.** No Primary exists.
* **T=5 (Replica):** `failover_timeout` reached. Replica promotes self.
* **Outcome:** Safe failover. The "Read-Only Gap" (T=2 to T=5) ensures no Split Brain occurred.
#### Case 3: Replica Network Lag (False Positive)
* **Scenario:** Replica has high latency, greater than `failover_timeout`; Primary is fine.
* **Replica:** Thinks Primary is dead. Tries to promote by setting `cluster_state.current_primary = replica_id`.
* **NATS:** Rejects the write because the Primary is still updating the sequence numbers successfully.
* **Outcome:** Promotion denied. Primary stays leader.
#### Case 4: Network Instability (Flapping)
* **Scenario:** Intermittent packet loss.
* **Primary:** Fails 1 heartbeat, succeeds the next. `consecutive_failures` resets.
* **Replica:** Sees a slight delay in updates, but never reaches `failover_timeout`.
* **Outcome:** No Fencing, No Promotion. System rides out the noise.
## Contextual notes
* Clock skew: Tokio relies on monotonic clocks. This means that `tokio::time::sleep(...)` will not be affected by system clock corrections (such as NTP). But monotonic clocks are known to jump forward in some cases, such as VM live migrations. This could mean a false timeout of a single heartbeat. If `failure_threshold = 1`, this can mean a false negative on the node's health, and a potentially needless demotion.



@@ -1,84 +0,0 @@
# ADR 017: Multi-Cluster Federation and Granular Authorization
* **Status:** Accepted
* **Date:** 2025-12-20
* **Context:** ADR-016 Harmony Agent and Global Mesh For Decentralized Workload Management
## Context and Problem Statement
As Harmony expands to manage workloads across multiple independent clusters, we face two specific requirements regarding network topology and security:
1. **Granular Authorization:** We need the ability to authorize a specific node (Leaf Node) from the decentralized mesh to access only a specific subset of topics within a cluster. For example, a collaborator connecting to our Public Cloud should not have visibility into other collaborators' data or our internal system events. The same applies in the other direction: when we connect to a collaborator's cluster, we must not gain access to their internal data.
2. **Hybrid/Failover Topologies:** Users need to run their own private meshes (e.g., 5 interconnected office locations) with full internal trust, while simultaneously maintaining a connection to the Harmony Public Cloud for failover or bursting. In this scenario, the Public Cloud must not have access to the private mesh's internal data, and the private mesh should only access specific "public" topics on the cloud.
We need to decide how the Harmony Agent and the underlying NATS infrastructure will handle these "Split-Horizon" and "Zero-Trust" scenarios without complicating the application logic within the Agent itself.
## Decision Drivers
* **Security:** Default-deny posture is required for cross-domain connections.
* **Isolation:** Private on-premise clusters must remain private; data leakage to the public cloud is unacceptable.
* **Simplicity:** The Harmony Agent code should not be burdened with managing multiple complex TCP connections or application-layer routing logic.
* **Efficiency:** Failover traffic should only occur when necessary; network chatter (gossip) from the Public Cloud should not propagate to private nodes.
## Considered Options
1. **Application-Layer Multi-Homing:** The Harmony Agent maintains two distinct active connections (one to Local, one to Cloud) and handles routing logic in application code.
2. **VPN/VPC Peering:** Connect private networks to the public cloud at the network layer (IPSec/WireGuard).
3. **NATS Account Federation & Leaf Nodes:** Utilize NATS native multi-tenancy (Accounts) and topology bridging (Leaf Nodes) to handle routing and permissions at the protocol layer.
## Decision Outcome
We will adopt **Option 3: NATS Account Federation & Leaf Nodes**.
We will leverage NATS's built-in "Accounts" to create logical isolation boundaries and "Service Imports/Exports" to bridge specific data streams between private and public clusters.
### 1. Authorization Strategy (Subject-Based Permissions)
We will not implement custom authorization logic inside the Harmony Agent for topic access. Instead, we will enforce **Subject-Based Permissions** at the NATS server level.
* **Mechanism:** When a Leaf Node (e.g., a Customer Site) connects to the Public Cloud, it must authenticate using a scoped credential (JWT/NKey).
* **Policy:** This credential will carry a permission policy that strictly defines:
* `PUB`: Allowed subjects (e.g., `harmony.public.requests`).
* `SUB`: Allowed subjects (e.g., `harmony.public.responses`).
* `DENY`: Implicitly everything else (including `_SYS.>`, other customer streams).
* **Enforcement:** The NATS server rejects unauthorized subscriptions or publications at the protocol level before they reach the application layer.
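For illustration, a permission policy of this shape could look like the following NATS server configuration fragment. The user name and subjects are placeholders, and static config-file users are shown only for readability; as stated above, production connections would use scoped JWT/NKey credentials carrying the equivalent policy.

```
authorization {
  users: [
    {
      user: "leaf-customer-a"
      permissions: {
        publish: ["harmony.public.requests"]
        subscribe: ["harmony.public.responses"]
        # everything else, including _SYS.>, is implicitly denied
      }
    }
  ]
}
```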
### 2. Hybrid Topology Strategy (Federation)
We will use **NATS Account Federation** to solve the "Hybrid Cloud" requirement. We will treat the Private Mesh and Public Cloud as separate "Accounts."
* **The Private Mesh (Business Account):**
* Consists of the customer's internal nodes (e.g., 5 locations).
* Nodes share full trust and visibility of `local.*` subjects.
* Configured as a **Leaf Node** connection to the Public Cloud.
* **The Public Cloud (SaaS Account):**
* **Exports** a specific stream/service (e.g., `public.compute.rentals`).
* Does *not* join the customer's cluster mesh. It acts as a hub.
* **The Bridge (Import):**
* The Private Mesh **Imports** the public stream.
* The Private Mesh maps this import to a local subject, e.g., `external.cloud`.
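A sketch of the corresponding account federation in server configuration (account names and subjects are illustrative, not final). Note that with a stream-import `prefix`, NATS surfaces the imported subject locally under that prefix (e.g. `external.cloud.public.compute.rentals`), which is one way to realize the `external.cloud` mapping described above:

```
accounts {
  SAAS: {
    exports: [
      { stream: "public.compute.rentals" }   # the only thing the cloud offers
    ]
  }
  BUSINESS: {
    imports: [
      { stream: { account: SAAS, subject: "public.compute.rentals" }, prefix: "external.cloud" }
    ]
    # local.* subjects are never exported, so the cloud cannot see them
  }
}
```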
### 3. Failover Logic
The Harmony Agent will implement failover by selecting the target subject, relying on NATS to route the message physically.
1. **Primary Attempt:** Agent publishes to `local.compute`. NATS routes this only to the 5 internal sites.
2. **Failover Condition:** If internal capacity is exhausted (determined by Agent heuristics), the Agent publishes to `external.cloud`.
3. **Routing:** NATS transparently routes `external.cloud` messages over the Leaf Node connection to the Public Cloud.
4. **Security:** Because the Public Cloud does not import `local.compute`, it never sees internal traffic.
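On the agent side, the failover logic above collapses to subject selection. A hedged sketch (the capacity heuristic itself is out of scope here, and the function name is an assumption):

```rust
/// Sketch of the agent's routing decision. The subjects come from this
/// ADR; NATS does the physical routing based on which one is chosen.
pub fn target_subject(local_capacity_exhausted: bool) -> &'static str {
    if local_capacity_exhausted {
        "external.cloud" // routed over the Leaf Node connection to the cloud
    } else {
        "local.compute" // visible only to the private mesh
    }
}
```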
## Consequences
### Positive
* **Zero-Trust by Design:** The Public Cloud cannot spy on the Private Mesh because no streams are exported from Private to Public.
* **Reduced Complexity:** The Harmony Agent remains simple; it just publishes to different topics. It does not need to manage connection pools or complex authentication handshakes for multiple clouds.
* **Bandwidth Efficiency:** Gossip protocol traffic (cluster topology updates) is contained within the Accounts. The Private Mesh does not receive heartbeat traffic from the massive Public Cloud.
### Negative
* **Configuration Overhead:** Setting up NATS Accounts, Imports, and Exports requires more complex configuration files (server.conf) and understanding of JWT/NKeys compared to a flat mesh.
* **Observability:** Tracing a message as it crosses Account boundaries (from Private `external.cloud` to Public `public.compute`) requires centralized monitoring that understands Federation.
## References
* [NATS Documentation: Subject-Based Access Control](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/auth_intro/subject_access)
* [NATS Documentation: Leaf Nodes](https://docs.nats.io/running-a-nats-service/configuration/leafnodes)
* [NATS Documentation: Services & Streams (Imports/Exports)](https://docs.nats.io/running-a-nats-service/configuration/securing_nats/accounts)


@@ -14,6 +14,7 @@ If you're new to Harmony, start here:
 See how to use Harmony to solve real-world problems.
 - [**PostgreSQL on Local K3D**](./use-cases/postgresql-on-local-k3d.md): Deploy a production-grade PostgreSQL cluster on a local K3D cluster. The fastest way to get started.
+- [**RustFS on Local K3D**](./use-cases/rustfs-on-local-k3d.md): Deploy a RustFS S3-compatible object store on a local K3D cluster.
 - [**OKD on Bare Metal**](./use-cases/okd-on-bare-metal.md): A detailed walkthrough of bootstrapping a high-availability OKD cluster from physical hardware.
 ## 3. Component Catalogs


@@ -0,0 +1,151 @@
# Use Case: RustFS (S3-Compatible Store) on Local K3D
Deploy a RustFS object store on a local Kubernetes cluster (K3D) using Harmony. RustFS is a Rust-based S3-compatible storage server, a modern alternative to MinIO for local development.
## What you'll have at the end
A fully operational S3-compatible object store with:
- 1 standalone instance with 1 GiB of storage
- S3 API endpoint on port 9000
- Web console on port 9001
- Ingress-based access at `http://rustfs.local`
- Default credentials: `rustfsadmin` / `rustfsadmin`
## Prerequisites
- Rust 2024 edition
- Docker running locally
- ~5 minutes
## The Score
The entire deployment is expressed in ~20 lines of Rust:
```rust
use harmony::{
inventory::Inventory,
modules::rustfs::{K8sRustFsScore, RustFsConfig},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let rustfs = K8sRustFsScore {
config: RustFsConfig {
release_name: "harmony-rustfs".to_string(),
namespace: "harmony-rustfs".to_string(),
..Default::default()
},
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(rustfs)],
None,
)
.await
.unwrap();
}
```
## What Harmony does
When you run this, Harmony:
1. **Connects to K8sAnywhereTopology** — auto-provisions a K3D cluster if none exists
2. **Creates a namespace** — `harmony-rustfs` (or your custom namespace)
3. **Creates credentials secret** — stores the access/secret keys securely
4. **Deploys via Helm** — installs the RustFS chart in standalone mode
5. **Configures Ingress** — sets up routing at `rustfs.local`
## Running it
```bash
cargo run -p example-rustfs
```
## Verifying the deployment
```bash
# Check pods
kubectl get pods -n harmony-rustfs
# Check ingress
kubectl get ingress -n harmony-rustfs
# Access the S3 API
# Add rustfs.local to your /etc/hosts
echo "127.0.0.1 rustfs.local" | sudo tee -a /etc/hosts
# Use the AWS CLI or any S3 client
AWS_ACCESS_KEY_ID=rustfsadmin \
AWS_SECRET_ACCESS_KEY=rustfsadmin \
aws s3 ls --endpoint-url http://rustfs.local:9000
# Or via the web console
open http://rustfs.local:9001
```
## Customizing the deployment
The `RustFsConfig` struct supports:
| Field | Default | Description |
|-------|---------|-------------|
| `release_name` | `rustfs` | Helm release name |
| `namespace` | `harmony-rustfs` | Kubernetes namespace |
| `storage_size` | `1Gi` | Data storage size |
| `mode` | `Standalone` | Deployment mode (standalone only for now) |
| `access_key` | `None` | S3 access key (default: `rustfsadmin`) |
| `secret_key` | `None` | S3 secret key (default: `rustfsadmin`) |
| `ingress_class` | `None` | Ingress class to use (default: `nginx`) |
Example with custom credentials:
```rust
let rustfs = K8sRustFsScore {
config: RustFsConfig {
release_name: "my-rustfs".to_string(),
namespace: "storage".to_string(),
access_key: Some("myaccess".to_string()),
secret_key: Some("mysecret".to_string()),
ingress_class: Some("traefik".to_string()),
..Default::default()
},
};
```
## Architecture
The RustFS module follows the same pattern as PostgreSQL:
```
┌─────────────────────────────────────────────────────────────┐
│ K8sRustFsScore (user-facing) │
│ └── K8sRustFsInterpret │
│ ├── ensure_namespace() │
│ ├── ensure_secret() → K8sResourceScore │
│ └── HelmChartScore → HelmChartInterpret │
│ └── Installs rustfs/rustfs chart │
└─────────────────────────────────────────────────────────────┘
```
## Future: Unified S3 Capability
This is the first step toward a unified S3 capability that will work with:
- **RustFS** — local development (this example)
- **Ceph RGW** — production S3 via Rook/Ceph
- **AWS S3** — cloud-native S3
The pattern will be:
```rust
// Future: unified S3 interface
trait S3Store: Send + Sync {
async fn deploy_bucket(&self, config: &BucketConfig) -> Result<(), String>;
async fn get_endpoint(&self) -> Result<S3Endpoint, String>;
}
```
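To make the intent concrete, here is a self-contained sketch of a trivial implementation (hypothetical: the async plumbing is dropped for brevity, and the `BucketConfig`/`S3Endpoint` shapes are assumptions, not a final API):

```rust
// Hypothetical sketch: an in-memory S3Store implementation, sync for brevity.
// BucketConfig and S3Endpoint are assumed shapes, not the final API.
struct BucketConfig { name: String }
struct S3Endpoint { url: String }

trait S3Store {
    fn deploy_bucket(&self, config: &BucketConfig) -> Result<(), String>;
    fn get_endpoint(&self) -> Result<S3Endpoint, String>;
}

/// Backed by a local RustFS deployment like the one in this example.
struct LocalRustFs;

impl S3Store for LocalRustFs {
    fn deploy_bucket(&self, config: &BucketConfig) -> Result<(), String> {
        // A real implementation would call the S3 CreateBucket API.
        println!("would create bucket '{}'", config.name);
        Ok(())
    }
    fn get_endpoint(&self) -> Result<S3Endpoint, String> {
        Ok(S3Endpoint { url: "http://rustfs.local:9000".to_string() })
    }
}

fn main() {
    let store = LocalRustFs;
    store.deploy_bucket(&BucketConfig { name: "artifacts".into() }).unwrap();
    println!("{}", store.get_endpoint().unwrap().url);
}
```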
See the [Scores Catalog](../catalogs/scores.md) for related components.


@@ -7,6 +7,7 @@ This directory contains runnable examples demonstrating Harmony's capabilities.
| Example | Description | Local K3D | Existing Cluster | Hardware Needed |
|---------|-------------|:---------:|:----------------:|:---------------:|
| `postgresql` | Deploy a PostgreSQL cluster | ✅ | ✅ | — |
| `rustfs` | Deploy a RustFS S3-compatible store | ✅ | ✅ | — |
| `ntfy` | Deploy ntfy notification server | ✅ | ✅ | — |
| `tenant` | Create a multi-tenant namespace | ✅ | ✅ | — |
| `cert_manager` | Provision TLS certificates | ✅ | ✅ | — |
@@ -52,6 +53,7 @@ This directory contains runnable examples demonstrating Harmony's capabilities.
- **`postgresql`** — Deploy a PostgreSQL cluster via CloudNativePG
- **`multisite_postgres`** — Multi-site PostgreSQL with failover
- **`public_postgres`** — Public-facing PostgreSQL (⚠️ uses NationTech DNS)
- **`rustfs`** — Deploy a RustFS S3-compatible object store
### Kubernetes Utilities
- **`node_health`** — Check node health in a cluster


@@ -0,0 +1,13 @@
[package]
name = "example-rustfs"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
harmony_types = { path = "../../harmony_types" }
tokio = { workspace = true }


@@ -0,0 +1,25 @@
use harmony::{
inventory::Inventory,
modules::rustfs::{K8sRustFsScore, RustFsConfig},
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let rustfs = K8sRustFsScore {
config: RustFsConfig {
release_name: "harmony-rustfs".to_string(),
namespace: "harmony-rustfs".to_string(),
..Default::default()
},
};
harmony_cli::run(
Inventory::autoload(),
K8sAnywhereTopology::from_env(),
vec![Box::new(rustfs)],
None,
)
.await
.unwrap();
}


@@ -21,6 +21,7 @@ pub mod openbao;
pub mod opnsense;
pub mod postgresql;
pub mod prometheus;
pub mod rustfs;
pub mod storage;
pub mod tenant;
pub mod tftp;


@@ -0,0 +1,47 @@
use async_trait::async_trait;
use harmony_types::storage::StorageSize;
use serde::Serialize;
#[derive(Clone, Debug, Serialize)]
pub struct RustFsConfig {
pub release_name: String,
pub namespace: String,
pub storage_size: StorageSize,
pub mode: RustFsMode,
pub access_key: Option<String>,
pub secret_key: Option<String>,
pub ingress_class: Option<String>,
}
impl Default for RustFsConfig {
fn default() -> Self {
Self {
release_name: "rustfs".to_string(),
namespace: "harmony-rustfs".to_string(),
storage_size: StorageSize::gi(1),
mode: RustFsMode::Standalone,
access_key: None,
secret_key: None,
ingress_class: None,
}
}
}
#[derive(Clone, Debug, Serialize)]
pub enum RustFsMode {
Standalone,
}
#[async_trait]
pub trait RustFs: Send + Sync {
async fn deploy(&self, config: &RustFsConfig) -> Result<String, String>;
async fn get_endpoint(&self, config: &RustFsConfig) -> Result<RustFsEndpoint, String>;
}
#[derive(Clone, Debug)]
pub struct RustFsEndpoint {
pub s3_endpoint: String,
pub console_endpoint: String,
pub access_key: String,
pub secret_key: String,
}


@@ -0,0 +1,6 @@
pub mod capability;
mod score;
mod score_k8s;
pub use capability::*;
pub use score::*;
pub use score_k8s::*;


@@ -0,0 +1,85 @@
use async_trait::async_trait;
use harmony_types::id::Id;
use serde::Serialize;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::rustfs::capability::{RustFs, RustFsConfig};
use crate::score::Score;
use crate::topology::Topology;
#[derive(Debug, Clone, Serialize)]
pub struct RustFsScore {
pub config: RustFsConfig,
}
impl Default for RustFsScore {
fn default() -> Self {
Self {
config: RustFsConfig::default(),
}
}
}
impl RustFsScore {
pub fn new(namespace: &str) -> Self {
Self {
config: RustFsConfig {
namespace: namespace.to_string(),
..Default::default()
},
}
}
}
impl<T: Topology + RustFs + Send + Sync> Score<T> for RustFsScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(RustFsInterpret {
config: self.config.clone(),
})
}
fn name(&self) -> String {
format!(
"RustFsScore({}:{})",
self.config.namespace, self.config.release_name
)
}
}
#[derive(Debug, Clone)]
struct RustFsInterpret {
config: RustFsConfig,
}
#[async_trait]
impl<T: Topology + RustFs + Send + Sync> Interpret<T> for RustFsInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("RustFsInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
let release_name = topo
.deploy(&self.config)
.await
.map_err(|e| InterpretError::new(e))?;
Ok(Outcome::success(format!(
"RustFS '{}' deployed in namespace '{}'",
release_name, self.config.namespace
)))
}
}


@@ -0,0 +1,285 @@
use std::str::FromStr;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
use crate::modules::k8s::resource::K8sResourceScore;
use crate::modules::rustfs::capability::{RustFs, RustFsConfig, RustFsEndpoint, RustFsMode};
use crate::score::Score;
use crate::topology::{HelmCommand, K8sclient, Topology};
use async_trait::async_trait;
use harmony_types::id::Id;
use harmony_types::net::Url;
use k8s_openapi::api::core::v1::Secret;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use k8s_openapi::ByteString;
use log::info;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
#[derive(Debug, Clone, Serialize)]
pub struct K8sRustFsScore {
pub config: RustFsConfig,
}
impl Default for K8sRustFsScore {
fn default() -> Self {
Self {
config: RustFsConfig::default(),
}
}
}
impl K8sRustFsScore {
pub fn new(namespace: &str) -> Self {
Self {
config: RustFsConfig {
namespace: namespace.to_string(),
..Default::default()
},
}
}
}
impl<T: Topology + K8sclient + HelmCommand + 'static> Score<T> for K8sRustFsScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(K8sRustFsInterpret {
config: self.config.clone(),
})
}
fn name(&self) -> String {
format!("K8sRustFsScore({})", self.config.namespace)
}
}
#[derive(Debug)]
pub struct K8sRustFsInterpret {
config: RustFsConfig,
}
impl K8sRustFsInterpret {
async fn ensure_namespace<T: Topology + K8sclient>(
&self,
topology: &T,
) -> Result<(), InterpretError> {
let k8s_client = topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
let namespace_name = &self.config.namespace;
if k8s_client
.namespace_exists(namespace_name)
.await
.map_err(|e| {
InterpretError::new(format!(
"Failed to check namespace '{}': {}",
namespace_name, e
))
})?
{
info!("Namespace '{}' already exists", namespace_name);
return Ok(());
}
info!("Creating namespace '{}'", namespace_name);
k8s_client
.create_namespace(namespace_name)
.await
.map_err(|e| {
InterpretError::new(format!(
"Failed to create namespace '{}': {}",
namespace_name, e
))
})?;
k8s_client
.wait_for_namespace(namespace_name, Some(std::time::Duration::from_secs(30)))
.await
.map_err(|e| {
InterpretError::new(format!("Namespace '{}' not ready: {}", namespace_name, e))
})?;
info!("Namespace '{}' is ready", namespace_name);
Ok(())
}
async fn ensure_secret<T: Topology + K8sclient>(
&self,
topology: &T,
) -> Result<(), InterpretError> {
let access_key = self.config.access_key.as_deref().unwrap_or("rustfsadmin");
let secret_key = self.config.secret_key.as_deref().unwrap_or("rustfsadmin");
let k8s_client = topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
let namespace_name = &self.config.namespace;
let secret_name = format!("{}-credentials", self.config.release_name);
let secret_exists = k8s_client
.get_secret_json_value(&secret_name, Some(namespace_name))
.await
.is_ok();
if secret_exists {
info!(
"Secret '{}' already exists in namespace '{}'",
secret_name, namespace_name
);
return Ok(());
}
info!("Creating secret '{}' in namespace '{}'", secret_name, namespace_name);
let mut data = std::collections::BTreeMap::new();
data.insert(
"access_key".to_string(),
ByteString(access_key.as_bytes().to_vec()),
);
data.insert(
"secret_key".to_string(),
ByteString(secret_key.as_bytes().to_vec()),
);
let secret = Secret {
metadata: ObjectMeta {
name: Some(secret_name.clone()),
namespace: Some(namespace_name.clone()),
..ObjectMeta::default()
},
data: Some(data),
string_data: None,
type_: Some("Opaque".to_string()),
..Secret::default()
};
K8sResourceScore::single(secret, Some(namespace_name.clone()))
.create_interpret()
.execute(&Inventory::empty(), topology)
.await?;
Ok(())
}
fn to_values_yaml(&self) -> String {
let storage_size = self.config.storage_size.to_string();
let ingress_class = self.config.ingress_class.as_deref().unwrap_or("nginx");
let mode_yaml = match self.config.mode {
RustFsMode::Standalone => {
"mode:\n  standalone:\n    enabled: true\n  distributed:\n    enabled: false"
}
};
format!(
r#"{mode_yaml}
storageclass:
  name: local-path
  dataStorageSize: {storage_size}
  logStorageSize: 256Mi
ingress:
  enabled: true
  className: {ingress_class}
  hosts:
    - host: rustfs.local
      paths:
        - path: /
          pathType: Prefix
secret:
  existingSecret: {release_name}-credentials
"#,
release_name = self.config.release_name
)
}
}
#[async_trait]
impl<T: Topology + K8sclient + HelmCommand + 'static> Interpret<T> for K8sRustFsInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.ensure_namespace(topology).await?;
self.ensure_secret(topology).await?;
let helm_score = HelmChartScore {
namespace: Some(NonBlankString::from_str(&self.config.namespace).unwrap()),
release_name: NonBlankString::from_str(&self.config.release_name).unwrap(),
chart_name: NonBlankString::from_str("rustfs/rustfs").unwrap(),
chart_version: None,
values_overrides: None,
values_yaml: Some(self.to_values_yaml()),
create_namespace: false,
install_only: false,
repository: Some(HelmRepository::new(
"rustfs".to_string(),
Url::Url(url::Url::parse("https://charts.rustfs.com").unwrap()),
true,
)),
};
helm_score
.create_interpret()
.execute(&Inventory::empty(), topology)
.await
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("K8sRustFsInterpret")
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
pub struct K8sAnywhereRustFs;
impl K8sAnywhereRustFs {
pub fn new() -> Self {
Self
}
}
impl Default for K8sAnywhereRustFs {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl RustFs for K8sAnywhereRustFs {
async fn deploy(&self, config: &RustFsConfig) -> Result<String, String> {
Ok(config.release_name.clone())
}
async fn get_endpoint(&self, config: &RustFsConfig) -> Result<RustFsEndpoint, String> {
Ok(RustFsEndpoint {
s3_endpoint: "http://rustfs.local:9000".to_string(),
console_endpoint: "http://rustfs.local:9001".to_string(),
access_key: config
.access_key
.clone()
.unwrap_or_else(|| "rustfsadmin".to_string()),
secret_key: config
.secret_key
.clone()
.unwrap_or_else(|| "rustfsadmin".to_string()),
})
}
}

harmony_assets/Cargo.toml Normal file

@@ -0,0 +1,56 @@
[package]
name = "harmony_assets"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[lib]
name = "harmony_assets"
[[bin]]
name = "harmony_assets"
path = "src/cli/mod.rs"
required-features = ["cli"]
[features]
default = ["blake3"]
sha256 = ["dep:sha2"]
blake3 = ["dep:blake3"]
s3 = [
"dep:aws-sdk-s3",
"dep:aws-config",
]
cli = [
"dep:clap",
"dep:indicatif",
"dep:inquire",
]
reqwest = ["dep:reqwest"]
[dependencies]
log.workspace = true
tokio.workspace = true
thiserror.workspace = true
directories.workspace = true
sha2 = { version = "0.10", optional = true }
blake3 = { version = "1.5", optional = true }
reqwest = { version = "0.12", optional = true, default-features = false, features = ["stream", "rustls-tls"] }
futures-util.workspace = true
async-trait.workspace = true
url.workspace = true
# CLI only
clap = { version = "4.5", features = ["derive"], optional = true }
indicatif = { version = "0.18", optional = true }
inquire = { version = "0.7", optional = true }
# S3 only
aws-sdk-s3 = { version = "1", optional = true }
aws-config = { version = "1", optional = true }
[dev-dependencies]
tempfile.workspace = true
httptest = "0.16"
pretty_assertions.workspace = true
tokio-test.workspace = true


@@ -0,0 +1,80 @@
use crate::hash::ChecksumAlgo;
use std::path::PathBuf;
use url::Url;
#[derive(Debug, Clone)]
pub struct Asset {
pub url: Url,
pub checksum: String,
pub checksum_algo: ChecksumAlgo,
pub file_name: String,
pub size: Option<u64>,
}
impl Asset {
pub fn new(url: Url, checksum: String, checksum_algo: ChecksumAlgo, file_name: String) -> Self {
Self {
url,
checksum,
checksum_algo,
file_name,
size: None,
}
}
pub fn with_size(mut self, size: u64) -> Self {
self.size = Some(size);
self
}
pub fn formatted_checksum(&self) -> String {
crate::hash::format_checksum(&self.checksum, self.checksum_algo.clone())
}
}
#[derive(Debug, Clone)]
pub struct LocalCache {
pub base_dir: PathBuf,
}
impl LocalCache {
pub fn new(base_dir: PathBuf) -> Self {
Self { base_dir }
}
pub fn path_for(&self, asset: &Asset) -> PathBuf {
let prefix = &asset.checksum[..16.min(asset.checksum.len())];
self.base_dir.join(prefix).join(&asset.file_name)
}
pub fn cache_key_dir(&self, asset: &Asset) -> PathBuf {
let prefix = &asset.checksum[..16.min(asset.checksum.len())];
self.base_dir.join(prefix)
}
pub async fn ensure_dir(&self, asset: &Asset) -> Result<(), crate::errors::AssetError> {
let dir = self.cache_key_dir(asset);
tokio::fs::create_dir_all(&dir)
.await
.map_err(|e| crate::errors::AssetError::IoError(e))?;
Ok(())
}
}
impl Default for LocalCache {
fn default() -> Self {
let base_dir = directories::ProjectDirs::from("io", "NationTech", "Harmony")
.map(|dirs| dirs.cache_dir().join("assets"))
.unwrap_or_else(|| PathBuf::from("/tmp/harmony_assets"));
Self::new(base_dir)
}
}
#[derive(Debug, Clone)]
pub struct StoredAsset {
pub url: Url,
pub checksum: String,
pub checksum_algo: ChecksumAlgo,
pub size: u64,
pub key: String,
}


@@ -0,0 +1,25 @@
use clap::Parser;
#[derive(Parser, Debug)]
pub struct ChecksumArgs {
pub path: String,
#[arg(short, long, default_value = "blake3")]
pub algo: String,
}
pub async fn execute(args: ChecksumArgs) -> Result<(), Box<dyn std::error::Error>> {
use harmony_assets::{ChecksumAlgo, checksum_for_path};
let path = std::path::Path::new(&args.path);
if !path.exists() {
eprintln!("Error: File not found: {}", args.path);
std::process::exit(1);
}
let algo = ChecksumAlgo::from_str(&args.algo)?;
let checksum = checksum_for_path(path, algo.clone()).await?;
println!("{}:{} {}", algo.name(), checksum, args.path);
Ok(())
}


@@ -0,0 +1,82 @@
use clap::Parser;
#[derive(Parser, Debug)]
pub struct DownloadArgs {
pub url: String,
pub checksum: String,
#[arg(short, long)]
pub output: Option<String>,
#[arg(short, long, default_value = "blake3")]
pub algo: String,
}
pub async fn execute(args: DownloadArgs) -> Result<(), Box<dyn std::error::Error>> {
use harmony_assets::{
Asset, AssetStore, ChecksumAlgo, LocalCache, LocalStore, verify_checksum,
};
use indicatif::{ProgressBar, ProgressStyle};
use url::Url;
let url = Url::parse(&args.url).map_err(|e| format!("Invalid URL: {}", e))?;
let file_name = args
.output
.or_else(|| {
std::path::Path::new(&args.url)
.file_name()
.and_then(|n| n.to_str())
.map(|s| s.to_string())
})
.unwrap_or_else(|| "download".to_string());
let algo = ChecksumAlgo::from_str(&args.algo)?;
let asset = Asset::new(url, args.checksum.clone(), algo.clone(), file_name);
let cache = LocalCache::default();
println!("Downloading: {}", asset.url);
println!("Checksum: {}:{}", algo.name(), args.checksum);
println!("Cache dir: {:?}", cache.base_dir);
let total_size = asset.size.unwrap_or(0);
let pb = if total_size > 0 {
let pb = ProgressBar::new(total_size);
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec})")?
.progress_chars("=>-"),
);
Some(pb)
} else {
None
};
let progress_fn: Box<dyn Fn(u64, Option<u64>) + Send> = Box::new({
let pb = pb.clone();
move |bytes, _total| {
if let Some(ref pb) = pb {
pb.set_position(bytes);
}
}
});
let store = LocalStore::default();
let result = store.fetch(&asset, &cache, Some(progress_fn)).await;
if let Some(pb) = pb {
pb.finish();
}
match result {
Ok(path) => {
verify_checksum(&path, &args.checksum, algo).await?;
println!("\nDownloaded to: {:?}", path);
println!("Checksum verified OK");
Ok(())
}
Err(e) => {
eprintln!("Download failed: {}", e);
std::process::exit(1);
}
}
}


@@ -0,0 +1,49 @@
pub mod checksum;
pub mod download;
pub mod upload;
pub mod verify;
use clap::{Parser, Subcommand};
#[derive(Parser, Debug)]
#[command(
name = "harmony_assets",
version,
about = "Asset management CLI for downloading, uploading, and verifying large binary assets"
)]
pub struct Cli {
#[command(subcommand)]
pub command: Commands,
}
#[derive(Subcommand, Debug)]
pub enum Commands {
Upload(upload::UploadArgs),
Download(download::DownloadArgs),
Checksum(checksum::ChecksumArgs),
Verify(verify::VerifyArgs),
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
log::info!("Starting harmony_assets CLI");
let cli = Cli::parse();
match cli.command {
Commands::Upload(args) => {
upload::execute(args).await?;
}
Commands::Download(args) => {
download::execute(args).await?;
}
Commands::Checksum(args) => {
checksum::execute(args).await?;
}
Commands::Verify(args) => {
verify::execute(args).await?;
}
}
Ok(())
}


@@ -0,0 +1,166 @@
use clap::Parser;
use harmony_assets::{S3Config, S3Store, checksum_for_path_with_progress};
use indicatif::{ProgressBar, ProgressStyle};
use std::path::Path;
#[derive(Parser, Debug)]
pub struct UploadArgs {
pub source: String,
pub key: Option<String>,
#[arg(short, long)]
pub content_type: Option<String>,
#[arg(short, long, default_value_t = true)]
pub public_read: bool,
#[arg(short, long)]
pub endpoint: Option<String>,
#[arg(short, long)]
pub bucket: Option<String>,
#[arg(short, long)]
pub region: Option<String>,
#[arg(short, long)]
pub access_key_id: Option<String>,
#[arg(short, long)]
pub secret_access_key: Option<String>,
#[arg(short, long, default_value = "blake3")]
pub algo: String,
#[arg(short, long, default_value_t = false)]
pub yes: bool,
}
pub async fn execute(args: UploadArgs) -> Result<(), Box<dyn std::error::Error>> {
let source_path = Path::new(&args.source);
if !source_path.exists() {
eprintln!("Error: File not found: {}", args.source);
std::process::exit(1);
}
let key = args.key.unwrap_or_else(|| {
source_path
.file_name()
.and_then(|n| n.to_str())
.unwrap_or("upload")
.to_string()
});
let metadata = tokio::fs::metadata(source_path)
.await
.map_err(|e| format!("Failed to read file metadata: {}", e))?;
let total_size = metadata.len();
let endpoint = args
.endpoint
.or_else(|| std::env::var("S3_ENDPOINT").ok())
.unwrap_or_default();
let bucket = args
.bucket
.or_else(|| std::env::var("S3_BUCKET").ok())
.unwrap_or_else(|| {
inquire::Text::new("S3 Bucket name:")
.with_default("harmony-assets")
.prompt()
.unwrap()
});
let region = args
.region
.or_else(|| std::env::var("S3_REGION").ok())
.unwrap_or_else(|| {
inquire::Text::new("S3 Region:")
.with_default("us-east-1")
.prompt()
.unwrap()
});
let access_key_id = args
.access_key_id
.or_else(|| std::env::var("AWS_ACCESS_KEY_ID").ok());
let secret_access_key = args
.secret_access_key
.or_else(|| std::env::var("AWS_SECRET_ACCESS_KEY").ok());
let config = S3Config {
endpoint: if endpoint.is_empty() {
None
} else {
Some(endpoint)
},
bucket: bucket.clone(),
region: region.clone(),
access_key_id,
secret_access_key,
public_read: args.public_read,
};
println!("Upload Configuration:");
println!(" Source: {}", args.source);
println!(" S3 Key: {}", key);
println!(" Bucket: {}", bucket);
println!(" Region: {}", region);
println!(
" Size: {} bytes ({} MB)",
total_size,
total_size as f64 / 1024.0 / 1024.0
);
println!();
if !args.yes {
let confirm = inquire::Confirm::new("Proceed with upload?")
.with_default(true)
.prompt()?;
if !confirm {
println!("Upload cancelled.");
return Ok(());
}
}
let store = S3Store::new(config)
.await
.map_err(|e| format!("Failed to initialize S3 client: {}", e))?;
println!("Computing checksum while uploading...\n");
let pb = ProgressBar::new(total_size);
pb.set_style(
ProgressStyle::default_bar()
.template("{spinner:.green} [{elapsed_precise}] [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec})")?
.progress_chars("=>-"),
);
{
let algo = harmony_assets::ChecksumAlgo::from_str(&args.algo)?;
let pb_clone = pb.clone();
// Handle::block_on panics when called from within a runtime; await the future directly.
let _checksum = checksum_for_path_with_progress(source_path, algo, |read, _total| {
pb_clone.set_position(read);
})
.await?;
}
pb.set_position(total_size);
let result = store
.store(source_path, &key, args.content_type.as_deref())
.await;
pb.finish();
match result {
Ok(asset) => {
println!("\nUpload complete!");
println!(" URL: {}", asset.url);
println!(
" Checksum: {}:{}",
asset.checksum_algo.name(),
asset.checksum
);
println!(" Size: {} bytes", asset.size);
println!(" Key: {}", asset.key);
Ok(())
}
Err(e) => {
eprintln!("Upload failed: {}", e);
std::process::exit(1);
}
}
}


@@ -0,0 +1,32 @@
use clap::Parser;
#[derive(Parser, Debug)]
pub struct VerifyArgs {
pub path: String,
pub expected: String,
#[arg(short, long, default_value = "blake3")]
pub algo: String,
}
pub async fn execute(args: VerifyArgs) -> Result<(), Box<dyn std::error::Error>> {
use harmony_assets::{ChecksumAlgo, verify_checksum};
let path = std::path::Path::new(&args.path);
if !path.exists() {
eprintln!("Error: File not found: {}", args.path);
std::process::exit(1);
}
let algo = ChecksumAlgo::from_str(&args.algo)?;
match verify_checksum(path, &args.expected, algo).await {
Ok(()) => {
println!("Checksum verified OK");
Ok(())
}
Err(e) => {
eprintln!("Verification FAILED: {}", e);
std::process::exit(1);
}
}
}


@@ -0,0 +1,37 @@
use std::path::PathBuf;
use thiserror::Error;
#[derive(Debug, Error)]
pub enum AssetError {
#[error("File not found: {0}")]
FileNotFound(PathBuf),
#[error("Checksum mismatch for '{path}': expected {expected}, got {actual}")]
ChecksumMismatch {
path: PathBuf,
expected: String,
actual: String,
},
#[error("Checksum algorithm not available: {0}. Enable the corresponding feature flag.")]
ChecksumAlgoNotAvailable(String),
#[error("Download failed: {0}")]
DownloadFailed(String),
#[error("S3 error: {0}")]
S3Error(String),
#[error("IO error: {0}")]
IoError(#[from] std::io::Error),
#[cfg(feature = "reqwest")]
#[error("HTTP error: {0}")]
HttpError(#[from] reqwest::Error),
#[error("Store error: {0}")]
StoreError(String),
#[error("Configuration error: {0}")]
ConfigError(String),
}

harmony_assets/src/hash.rs Normal file

@@ -0,0 +1,233 @@
use crate::errors::AssetError;
use std::path::Path;
#[cfg(feature = "blake3")]
use blake3::Hasher as B3Hasher;
#[cfg(feature = "sha256")]
use sha2::{Digest, Sha256};
#[derive(Debug, Clone)]
pub enum ChecksumAlgo {
BLAKE3,
SHA256,
}
impl Default for ChecksumAlgo {
fn default() -> Self {
#[cfg(feature = "blake3")]
return ChecksumAlgo::BLAKE3;
#[cfg(not(feature = "blake3"))]
return ChecksumAlgo::SHA256;
}
}
impl ChecksumAlgo {
pub fn name(&self) -> &'static str {
match self {
ChecksumAlgo::BLAKE3 => "blake3",
ChecksumAlgo::SHA256 => "sha256",
}
}
pub fn from_str(s: &str) -> Result<Self, AssetError> {
match s.to_lowercase().as_str() {
"blake3" | "b3" => Ok(ChecksumAlgo::BLAKE3),
"sha256" | "sha-256" => Ok(ChecksumAlgo::SHA256),
_ => Err(AssetError::ChecksumAlgoNotAvailable(s.to_string())),
}
}
}
impl std::fmt::Display for ChecksumAlgo {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.name())
}
}
pub async fn checksum_for_file<R>(reader: R, algo: ChecksumAlgo) -> Result<String, AssetError>
where
R: tokio::io::AsyncRead + Unpin,
{
match algo {
#[cfg(feature = "blake3")]
ChecksumAlgo::BLAKE3 => {
let mut hasher = B3Hasher::new();
let mut reader = reader;
let mut buf = vec![0u8; 65536];
loop {
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
}
Ok(hasher.finalize().to_hex().to_string())
}
#[cfg(not(feature = "blake3"))]
ChecksumAlgo::BLAKE3 => Err(AssetError::ChecksumAlgoNotAvailable("blake3".to_string())),
#[cfg(feature = "sha256")]
ChecksumAlgo::SHA256 => {
let mut hasher = Sha256::new();
let mut reader = reader;
let mut buf = vec![0u8; 65536];
loop {
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
}
Ok(format!("{:x}", hasher.finalize()))
}
#[cfg(not(feature = "sha256"))]
ChecksumAlgo::SHA256 => Err(AssetError::ChecksumAlgoNotAvailable("sha256".to_string())),
}
}
pub async fn checksum_for_path(path: &Path, algo: ChecksumAlgo) -> Result<String, AssetError> {
let file = tokio::fs::File::open(path)
.await
.map_err(|e| AssetError::IoError(e))?;
let reader = tokio::io::BufReader::with_capacity(65536, file);
checksum_for_file(reader, algo).await
}
pub async fn checksum_for_path_with_progress<F>(
path: &Path,
algo: ChecksumAlgo,
mut progress: F,
) -> Result<String, AssetError>
where
F: FnMut(u64, Option<u64>) + Send,
{
let file = tokio::fs::File::open(path)
.await
.map_err(|e| AssetError::IoError(e))?;
let metadata = file.metadata().await.map_err(|e| AssetError::IoError(e))?;
let total = Some(metadata.len());
let reader = tokio::io::BufReader::with_capacity(65536, file);
match algo {
#[cfg(feature = "blake3")]
ChecksumAlgo::BLAKE3 => {
let mut hasher = B3Hasher::new();
let mut reader = reader;
let mut buf = vec![0u8; 65536];
let mut read: u64 = 0;
loop {
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
read += n as u64;
progress(read, total);
}
Ok(hasher.finalize().to_hex().to_string())
}
#[cfg(not(feature = "blake3"))]
ChecksumAlgo::BLAKE3 => Err(AssetError::ChecksumAlgoNotAvailable("blake3".to_string())),
#[cfg(feature = "sha256")]
ChecksumAlgo::SHA256 => {
let mut hasher = Sha256::new();
let mut reader = reader;
let mut buf = vec![0u8; 65536];
let mut read: u64 = 0;
loop {
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
if n == 0 {
break;
}
hasher.update(&buf[..n]);
read += n as u64;
progress(read, total);
}
Ok(format!("{:x}", hasher.finalize()))
}
#[cfg(not(feature = "sha256"))]
ChecksumAlgo::SHA256 => Err(AssetError::ChecksumAlgoNotAvailable("sha256".to_string())),
}
}
pub async fn verify_checksum(
path: &Path,
expected: &str,
algo: ChecksumAlgo,
) -> Result<(), AssetError> {
let actual = checksum_for_path(path, algo).await?;
let expected_clean = expected
.trim_start_matches("blake3:")
.trim_start_matches("sha256:")
.trim_start_matches("b3:")
.trim_start_matches("sha-256:");
if actual != expected_clean {
return Err(AssetError::ChecksumMismatch {
path: path.to_path_buf(),
expected: expected_clean.to_string(),
actual,
});
}
Ok(())
}
pub fn format_checksum(checksum: &str, algo: ChecksumAlgo) -> String {
format!("{}:{}", algo.name(), checksum)
}
#[cfg(test)]
mod tests {
use super::*;
use std::io::Write;
use tempfile::NamedTempFile;
async fn create_temp_file(content: &[u8]) -> NamedTempFile {
let mut file = NamedTempFile::new().unwrap();
file.write_all(content).unwrap();
file.flush().unwrap();
file
}
#[tokio::test]
async fn test_checksum_blake3() {
let file = create_temp_file(b"hello world").await;
let checksum = checksum_for_path(file.path(), ChecksumAlgo::BLAKE3)
.await
.unwrap();
assert_eq!(
checksum,
"d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24"
);
}
#[tokio::test]
async fn test_verify_checksum_success() {
let file = create_temp_file(b"hello world").await;
let checksum = checksum_for_path(file.path(), ChecksumAlgo::BLAKE3)
.await
.unwrap();
let result = verify_checksum(file.path(), &checksum, ChecksumAlgo::BLAKE3).await;
assert!(result.is_ok());
}
#[tokio::test]
async fn test_verify_checksum_failure() {
let file = create_temp_file(b"hello world").await;
let result = verify_checksum(
file.path(),
"blake3:0000000000000000000000000000000000000000000000000000000000000000",
ChecksumAlgo::BLAKE3,
)
.await;
assert!(matches!(result, Err(AssetError::ChecksumMismatch { .. })));
}
#[tokio::test]
async fn test_checksum_with_prefix() {
let file = create_temp_file(b"hello world").await;
let checksum = checksum_for_path(file.path(), ChecksumAlgo::BLAKE3)
.await
.unwrap();
let formatted = format_checksum(&checksum, ChecksumAlgo::BLAKE3);
assert!(formatted.starts_with("blake3:"));
}
}

harmony_assets/src/lib.rs Normal file

@@ -0,0 +1,14 @@
pub mod asset;
pub mod errors;
pub mod hash;
pub mod store;
pub use asset::{Asset, LocalCache, StoredAsset};
pub use errors::AssetError;
pub use hash::{ChecksumAlgo, checksum_for_path, checksum_for_path_with_progress, verify_checksum};
pub use store::AssetStore;
#[cfg(feature = "s3")]
pub use store::{S3Config, S3Store};
pub use store::local::LocalStore;


@@ -0,0 +1,137 @@
use crate::asset::{Asset, LocalCache};
use crate::errors::AssetError;
use crate::store::AssetStore;
use async_trait::async_trait;
use std::path::PathBuf;
use url::Url;
#[cfg(feature = "reqwest")]
use crate::hash::verify_checksum;
#[derive(Debug, Clone)]
pub struct LocalStore {
base_dir: PathBuf,
}
impl LocalStore {
pub fn new(base_dir: PathBuf) -> Self {
Self { base_dir }
}
pub fn with_cache(cache: LocalCache) -> Self {
Self {
base_dir: cache.base_dir.clone(),
}
}
pub fn base_dir(&self) -> &PathBuf {
&self.base_dir
}
}
impl Default for LocalStore {
fn default() -> Self {
Self::new(LocalCache::default().base_dir)
}
}
#[async_trait]
impl AssetStore for LocalStore {
#[cfg(feature = "reqwest")]
async fn fetch(
&self,
asset: &Asset,
cache: &LocalCache,
progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
) -> Result<PathBuf, AssetError> {
use futures_util::StreamExt;
let dest_path = cache.path_for(asset);
if dest_path.exists() {
let verification =
verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone()).await;
if verification.is_ok() {
log::debug!("Asset already cached at {:?}", dest_path);
return Ok(dest_path);
} else {
log::warn!("Cached file failed checksum verification, re-downloading");
tokio::fs::remove_file(&dest_path)
.await
.map_err(|e| AssetError::IoError(e))?;
}
}
cache.ensure_dir(asset).await?;
log::info!("Downloading asset from {}", asset.url);
let client = reqwest::Client::new();
let response = client
.get(asset.url.as_str())
.send()
.await
.map_err(|e| AssetError::DownloadFailed(e.to_string()))?;
if !response.status().is_success() {
return Err(AssetError::DownloadFailed(format!(
"HTTP {}: {}",
response.status(),
asset.url
)));
}
let total_size = response.content_length();
let mut file = tokio::fs::File::create(&dest_path)
.await
.map_err(|e| AssetError::IoError(e))?;
let mut stream = response.bytes_stream();
let mut downloaded: u64 = 0;
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.map_err(|e| AssetError::DownloadFailed(e.to_string()))?;
tokio::io::AsyncWriteExt::write_all(&mut file, &chunk)
.await
.map_err(|e| AssetError::IoError(e))?;
downloaded += chunk.len() as u64;
if let Some(ref p) = progress {
p(downloaded, total_size);
}
}
tokio::io::AsyncWriteExt::flush(&mut file)
.await
.map_err(|e| AssetError::IoError(e))?;
drop(file);
verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone()).await?;
log::info!("Asset downloaded and verified: {:?}", dest_path);
Ok(dest_path)
}
#[cfg(not(feature = "reqwest"))]
async fn fetch(
&self,
_asset: &Asset,
_cache: &LocalCache,
_progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
) -> Result<PathBuf, AssetError> {
Err(AssetError::DownloadFailed(
"HTTP downloads not available. Enable the 'reqwest' feature.".to_string(),
))
}
async fn exists(&self, key: &str) -> Result<bool, AssetError> {
let path = self.base_dir.join(key);
Ok(path.exists())
}
fn url_for(&self, key: &str) -> Result<Url, AssetError> {
let path = self.base_dir.join(key);
Url::from_file_path(&path)
.map_err(|_| AssetError::StoreError("Could not convert path to file URL".to_string()))
}
}
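The `fetch` implementation above streams the HTTP response chunk by chunk and, after each write, invokes the optional progress callback with `(bytes_downloaded, total_size)`. A standalone, synchronous sketch of that callback pattern (names hypothetical; async I/O and error handling omitted):

```rust
use std::cell::Cell;

// Hypothetical stand-in for fetch's progress reporting: the callback is
// invoked after every chunk with the running byte count and optional total.
fn stream_chunks(chunks: &[&[u8]], total: Option<u64>, progress: &dyn Fn(u64, Option<u64>)) -> u64 {
    let mut downloaded: u64 = 0;
    for chunk in chunks {
        downloaded += chunk.len() as u64;
        progress(downloaded, total); // mirrors the call inside the async download loop
    }
    downloaded
}

fn main() {
    let chunks: [&[u8]; 2] = [b"hel", b"lo"];
    let last = Cell::new(0u64);
    let written = stream_chunks(&chunks, Some(5), &|done, _| last.set(done));
    assert_eq!(written, 5);
    assert_eq!(last.get(), 5);
}
```

The `Option<u64>` total matches `reqwest::Response::content_length()`, which is `None` when the server sends no `Content-Length` header.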


@@ -0,0 +1,27 @@
use crate::asset::{Asset, LocalCache};
use crate::errors::AssetError;
use async_trait::async_trait;
use std::path::PathBuf;
use url::Url;
pub mod local;
#[cfg(feature = "s3")]
pub mod s3;
#[async_trait]
pub trait AssetStore: Send + Sync {
async fn fetch(
&self,
asset: &Asset,
cache: &LocalCache,
progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
) -> Result<PathBuf, AssetError>;
async fn exists(&self, key: &str) -> Result<bool, AssetError>;
fn url_for(&self, key: &str) -> Result<Url, AssetError>;
}
#[cfg(feature = "s3")]
pub use s3::{S3Config, S3Store};


@@ -0,0 +1,235 @@
use crate::asset::StoredAsset;
use crate::errors::AssetError;
use crate::hash::ChecksumAlgo;
use async_trait::async_trait;
use aws_sdk_s3::Client as S3Client;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::types::ObjectCannedAcl;
use std::path::Path;
use url::Url;
#[derive(Debug, Clone)]
pub struct S3Config {
pub endpoint: Option<String>,
pub bucket: String,
pub region: String,
pub access_key_id: Option<String>,
pub secret_access_key: Option<String>,
pub public_read: bool,
}
impl Default for S3Config {
fn default() -> Self {
Self {
endpoint: None,
bucket: String::new(),
region: String::from("us-east-1"),
access_key_id: None,
secret_access_key: None,
public_read: true,
}
}
}
#[derive(Debug, Clone)]
pub struct S3Store {
client: S3Client,
config: S3Config,
}
impl S3Store {
pub async fn new(config: S3Config) -> Result<Self, AssetError> {
let mut cfg_builder = aws_config::defaults(aws_config::BehaviorVersion::latest());
if let Some(ref endpoint) = config.endpoint {
cfg_builder = cfg_builder.endpoint_url(endpoint);
}
let cfg = cfg_builder.load().await;
let client = S3Client::new(&cfg);
Ok(Self { client, config })
}
pub fn config(&self) -> &S3Config {
&self.config
}
fn public_url(&self, key: &str) -> Result<Url, AssetError> {
let url_str = if let Some(ref endpoint) = self.config.endpoint {
format!(
"{}/{}/{}",
endpoint.trim_end_matches('/'),
self.config.bucket,
key
)
} else {
format!(
"https://{}.s3.{}.amazonaws.com/{}",
self.config.bucket, self.config.region, key
)
};
Url::parse(&url_str).map_err(|e| AssetError::S3Error(e.to_string()))
}
pub async fn store(
&self,
source: &Path,
key: &str,
content_type: Option<&str>,
) -> Result<StoredAsset, AssetError> {
let metadata = tokio::fs::metadata(source)
.await
.map_err(|e| AssetError::IoError(e))?;
let size = metadata.len();
let checksum = crate::checksum_for_path(source, ChecksumAlgo::default())
.await
.map_err(|e| AssetError::StoreError(e.to_string()))?;
let body = ByteStream::from_path(source).await.map_err(|e| {
AssetError::IoError(std::io::Error::new(
std::io::ErrorKind::Other,
e.to_string(),
))
})?;
let mut put_builder = self
.client
.put_object()
.bucket(&self.config.bucket)
.key(key)
.body(body)
.content_length(size as i64)
.metadata("checksum", &checksum);
if self.config.public_read {
put_builder = put_builder.acl(ObjectCannedAcl::PublicRead);
}
if let Some(ct) = content_type {
put_builder = put_builder.content_type(ct);
}
put_builder
.send()
.await
.map_err(|e| AssetError::S3Error(e.to_string()))?;
Ok(StoredAsset {
url: self.public_url(key)?,
checksum,
checksum_algo: ChecksumAlgo::default(),
size,
key: key.to_string(),
})
}
}
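`public_url` above switches between a path-style URL when a custom endpoint (e.g. a local rustfs or MinIO instance) is configured and the virtual-hosted AWS form otherwise. A minimal sketch of that branching on plain strings (`url::Url` parsing omitted):

```rust
// Mirrors public_url's two URL shapes; the real method parses the result into url::Url.
fn public_url(endpoint: Option<&str>, bucket: &str, region: &str, key: &str) -> String {
    match endpoint {
        // Path-style: <endpoint>/<bucket>/<key>, used by S3-compatible stores.
        Some(ep) => format!("{}/{}/{}", ep.trim_end_matches('/'), bucket, key),
        // Virtual-hosted style for AWS proper.
        None => format!("https://{}.s3.{}.amazonaws.com/{}", bucket, region, key),
    }
}

fn main() {
    assert_eq!(
        public_url(Some("http://localhost:9000/"), "assets", "us-east-1", "a/b.tar.gz"),
        "http://localhost:9000/assets/a/b.tar.gz"
    );
    assert_eq!(
        public_url(None, "assets", "us-east-1", "a/b.tar.gz"),
        "https://assets.s3.us-east-1.amazonaws.com/a/b.tar.gz"
    );
}
```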
use crate::store::AssetStore;
use crate::{Asset, LocalCache};
#[async_trait]
impl AssetStore for S3Store {
async fn fetch(
&self,
asset: &Asset,
cache: &LocalCache,
progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
) -> Result<std::path::PathBuf, AssetError> {
let dest_path = cache.path_for(asset);
if dest_path.exists() {
let verification =
crate::verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone())
.await;
if verification.is_ok() {
log::debug!("Asset already cached at {:?}", dest_path);
return Ok(dest_path);
}
}
cache.ensure_dir(asset).await?;
let key = extract_s3_key(&asset.url, &self.config.bucket)?;
log::info!("Downloading asset from s3://{}/{}", self.config.bucket, key);
let obj = self
.client
.get_object()
.bucket(&self.config.bucket)
.key(&key)
.send()
.await
.map_err(|e| AssetError::S3Error(e.to_string()))?;
// None when S3 does not report a length, matching the HTTP store's callback contract.
let total_size = obj.content_length.map(|len| len as u64);
let mut file = tokio::fs::File::create(&dest_path)
.await
.map_err(|e| AssetError::IoError(e))?;
let mut stream = obj.body;
let mut downloaded: u64 = 0;
while let Some(chunk_result) = stream.next().await {
let chunk = chunk_result.map_err(|e| AssetError::S3Error(e.to_string()))?;
tokio::io::AsyncWriteExt::write_all(&mut file, &chunk)
.await
.map_err(|e| AssetError::IoError(e))?;
downloaded += chunk.len() as u64;
if let Some(ref p) = progress {
p(downloaded, total_size);
}
}
tokio::io::AsyncWriteExt::flush(&mut file)
.await
.map_err(|e| AssetError::IoError(e))?;
drop(file);
crate::verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone()).await?;
Ok(dest_path)
}
async fn exists(&self, key: &str) -> Result<bool, AssetError> {
match self
.client
.head_object()
.bucket(&self.config.bucket)
.key(key)
.send()
.await
{
Ok(_) => Ok(true),
Err(e) => {
let err_str = e.to_string();
if err_str.contains("NoSuchKey") || err_str.contains("NotFound") {
Ok(false)
} else {
Err(AssetError::S3Error(err_str))
}
}
}
}
fn url_for(&self, key: &str) -> Result<Url, AssetError> {
self.public_url(key)
}
}
fn extract_s3_key(url: &Url, bucket: &str) -> Result<String, AssetError> {
let path = url.path().trim_start_matches('/');
if let Some(stripped) = path.strip_prefix(&format!("{}/", bucket)) {
Ok(stripped.to_string())
} else if path == bucket {
Ok(String::new())
} else {
Ok(path.to_string())
}
}
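`extract_s3_key` above accepts both path-style URLs (where the first path segment is the bucket) and key-only paths. The same logic on a plain path string (`url::Url` omitted):

```rust
// Mirrors extract_s3_key's path handling; the real function takes a url::Url.
fn extract_key(path: &str, bucket: &str) -> String {
    let path = path.trim_start_matches('/');
    if let Some(stripped) = path.strip_prefix(&format!("{}/", bucket)) {
        stripped.to_string() // "/bucket/key" -> "key"
    } else if path == bucket {
        String::new() // bare "/bucket" -> empty key
    } else {
        path.to_string() // already a key, pass through unchanged
    }
}

fn main() {
    assert_eq!(extract_key("/assets/tools/fio.tar.gz", "assets"), "tools/fio.tar.gz");
    assert_eq!(extract_key("/assets", "assets"), "");
    assert_eq!(extract_key("tools/fio.tar.gz", "assets"), "tools/fio.tar.gz");
}
```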


@@ -183,7 +183,7 @@ impl OpenbaoSecretStore {
        }
        #[cfg(not(unix))]
        {
-            fs::write(path, serde_json::to_string(token)?.as_bytes())?;
+            fs::write(path, token)?;
        }
        Ok(())
    }


@@ -1,65 +0,0 @@
apiVersion: v1
kind: Namespace
metadata:
name: harmony-iobench
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
name: iobench-privileged-scc
subjects:
- kind: ServiceAccount
name: iobench
namespace: harmony-iobench # Change to your namespace/project
roleRef:
kind: ClusterRole
name: system:openshift:scc:privileged
apiGroup: rbac.authorization.k8s.io
---
apiVersion: v1
kind: ServiceAccount
metadata:
name: iobench
namespace: harmony-iobench # Change to your namespace/project
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: iobench
namespace: harmony-iobench # Change to your namespace/project
labels:
app: iobench
spec:
replicas: 1
selector:
matchLabels:
app: iobench
template:
metadata:
labels:
app: iobench
spec:
nodeSelector:
#iobench: "true" # Tune to match your target node's label(s), e.g.,
kubernetes.io/hostname: worker3
tolerations:
- key: node.kubernetes.io/unschedulable
operator: Exists
effect: NoSchedule
serviceAccountName: iobench
containers:
- name: fio
image: juicedata/fio:latest # Replace with your preferred fio image
imagePullPolicy: IfNotPresent
command: [ "sleep", "infinity" ] # Keeps the container running for kubectl exec
securityContext:
runAsUser: 0
privileged: true
volumeMounts:
- name: iobench-pvc
mountPath: /data # Mount the PVC at /data
volumes:
- name: iobench-pvc
hostPath:
path: /var/lib/harmony-iobench # Tune this to your desired host node path
type: DirectoryOrCreate