Compare commits
1 Commits
feat/rustf
...
snapshot-l
| Author | SHA1 | Date | |
|---|---|---|---|
| 7cb5237fdd |
878
Cargo.lock
generated
878
Cargo.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -23,7 +23,6 @@ members = [
|
|||||||
"harmony_agent/deploy",
|
"harmony_agent/deploy",
|
||||||
"harmony_node_readiness",
|
"harmony_node_readiness",
|
||||||
"harmony-k8s",
|
"harmony-k8s",
|
||||||
"harmony_assets",
|
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
@@ -38,7 +37,6 @@ derive-new = "0.7"
|
|||||||
async-trait = "0.1"
|
async-trait = "0.1"
|
||||||
tokio = { version = "1.40", features = [
|
tokio = { version = "1.40", features = [
|
||||||
"io-std",
|
"io-std",
|
||||||
"io-util",
|
|
||||||
"fs",
|
"fs",
|
||||||
"macros",
|
"macros",
|
||||||
"rt-multi-thread",
|
"rt-multi-thread",
|
||||||
@@ -75,7 +73,6 @@ base64 = "0.22.1"
|
|||||||
tar = "0.4.44"
|
tar = "0.4.44"
|
||||||
lazy_static = "1.5.0"
|
lazy_static = "1.5.0"
|
||||||
directories = "6.0.0"
|
directories = "6.0.0"
|
||||||
futures-util = "0.3"
|
|
||||||
thiserror = "2.0.14"
|
thiserror = "2.0.14"
|
||||||
serde = { version = "1.0.209", features = ["derive", "rc"] }
|
serde = { version = "1.0.209", features = ["derive", "rc"] }
|
||||||
serde_json = "1.0.127"
|
serde_json = "1.0.127"
|
||||||
@@ -89,4 +86,3 @@ reqwest = { version = "0.12", features = [
|
|||||||
"json",
|
"json",
|
||||||
], default-features = false }
|
], default-features = false }
|
||||||
assertor = "0.0.4"
|
assertor = "0.0.4"
|
||||||
tokio-test = "0.4"
|
|
||||||
|
|||||||
@@ -14,7 +14,6 @@ If you're new to Harmony, start here:
|
|||||||
See how to use Harmony to solve real-world problems.
|
See how to use Harmony to solve real-world problems.
|
||||||
|
|
||||||
- [**PostgreSQL on Local K3D**](./use-cases/postgresql-on-local-k3d.md): Deploy a production-grade PostgreSQL cluster on a local K3D cluster. The fastest way to get started.
|
- [**PostgreSQL on Local K3D**](./use-cases/postgresql-on-local-k3d.md): Deploy a production-grade PostgreSQL cluster on a local K3D cluster. The fastest way to get started.
|
||||||
- [**RustFS on Local K3D**](./use-cases/rustfs-on-local-k3d.md): Deploy a RustFS S3-compatible object store on a local K3D cluster.
|
|
||||||
- [**OKD on Bare Metal**](./use-cases/okd-on-bare-metal.md): A detailed walkthrough of bootstrapping a high-availability OKD cluster from physical hardware.
|
- [**OKD on Bare Metal**](./use-cases/okd-on-bare-metal.md): A detailed walkthrough of bootstrapping a high-availability OKD cluster from physical hardware.
|
||||||
|
|
||||||
## 3. Component Catalogs
|
## 3. Component Catalogs
|
||||||
|
|||||||
@@ -1,151 +0,0 @@
|
|||||||
# Use Case: RustFS (S3-Compatible Store) on Local K3D
|
|
||||||
|
|
||||||
Deploy a RustFS object store on a local Kubernetes cluster (K3D) using Harmony. RustFS is a Rust-based S3-compatible storage server, a modern alternative to MinIO for local development.
|
|
||||||
|
|
||||||
## What you'll have at the end
|
|
||||||
|
|
||||||
A fully operational S3-compatible object store with:
|
|
||||||
- 1 standalone instance with 1 GiB of storage
|
|
||||||
- S3 API endpoint on port 9000
|
|
||||||
- Web console on port 9001
|
|
||||||
- Ingress-based access at `http://rustfs.local`
|
|
||||||
- Default credentials: `rustfsadmin` / `rustfsadmin`
|
|
||||||
|
|
||||||
## Prerequisites
|
|
||||||
|
|
||||||
- Rust 2024 edition
|
|
||||||
- Docker running locally
|
|
||||||
- ~5 minutes
|
|
||||||
|
|
||||||
## The Score
|
|
||||||
|
|
||||||
The entire deployment is expressed in ~20 lines of Rust:
|
|
||||||
|
|
||||||
```rust
|
|
||||||
use harmony::{
|
|
||||||
inventory::Inventory,
|
|
||||||
modules::rustfs::{K8sRustFsScore, RustFsConfig},
|
|
||||||
topology::K8sAnywhereTopology,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() {
|
|
||||||
let rustfs = K8sRustFsScore {
|
|
||||||
config: RustFsConfig {
|
|
||||||
release_name: "harmony-rustfs".to_string(),
|
|
||||||
namespace: "harmony-rustfs".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
harmony_cli::run(
|
|
||||||
Inventory::autoload(),
|
|
||||||
K8sAnywhereTopology::from_env(),
|
|
||||||
vec![Box::new(rustfs)],
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
## What Harmony does
|
|
||||||
|
|
||||||
When you run this, Harmony:
|
|
||||||
|
|
||||||
1. **Connects to K8sAnywhereTopology** — auto-provisions a K3D cluster if none exists
|
|
||||||
2. **Creates a namespace** — `harmony-rustfs` (or your custom namespace)
|
|
||||||
3. **Creates credentials secret** — stores the access/secret keys securely
|
|
||||||
4. **Deploys via Helm** — installs the RustFS chart in standalone mode
|
|
||||||
5. **Configures Ingress** — sets up routing at `rustfs.local`
|
|
||||||
|
|
||||||
## Running it
|
|
||||||
|
|
||||||
```bash
|
|
||||||
cargo run -p example-rustfs
|
|
||||||
```
|
|
||||||
|
|
||||||
## Verifying the deployment
|
|
||||||
|
|
||||||
```bash
|
|
||||||
# Check pods
|
|
||||||
kubectl get pods -n harmony-rustfs
|
|
||||||
|
|
||||||
# Check ingress
|
|
||||||
kubectl get ingress -n harmony-rustfs
|
|
||||||
|
|
||||||
# Access the S3 API
|
|
||||||
# Add rustfs.local to your /etc/hosts
|
|
||||||
echo "127.0.0.1 rustfs.local" | sudo tee -a /etc/hosts
|
|
||||||
|
|
||||||
# Use the AWS CLI or any S3 client
|
|
||||||
AWS_ACCESS_KEY_ID=rustfsadmin \
|
|
||||||
AWS_SECRET_ACCESS_KEY=rustfsadmin \
|
|
||||||
aws s3 ls --endpoint-url http://rustfs.local:9000
|
|
||||||
|
|
||||||
# Or via the web console
|
|
||||||
open http://rustfs.local:9001
|
|
||||||
```
|
|
||||||
|
|
||||||
## Customizing the deployment
|
|
||||||
|
|
||||||
The `RustFsConfig` struct supports:
|
|
||||||
|
|
||||||
| Field | Default | Description |
|
|
||||||
|-------|---------|-------------|
|
|
||||||
| `release_name` | `rustfs` | Helm release name |
|
|
||||||
| `namespace` | `harmony-rustfs` | Kubernetes namespace |
|
|
||||||
| `storage_size` | `1Gi` | Data storage size |
|
|
||||||
| `mode` | `Standalone` | Deployment mode (standalone only for now) |
|
|
||||||
| `access_key` | `None` | S3 access key (default: `rustfsadmin`) |
|
|
||||||
| `secret_key` | `None` | S3 secret key (default: `rustfsadmin`) |
|
|
||||||
| `ingress_class` | `None` | Ingress class to use (default: `nginx`) |
|
|
||||||
|
|
||||||
Example with custom credentials:
|
|
||||||
|
|
||||||
```rust
|
|
||||||
let rustfs = K8sRustFsScore {
|
|
||||||
config: RustFsConfig {
|
|
||||||
release_name: "my-rustfs".to_string(),
|
|
||||||
namespace: "storage".to_string(),
|
|
||||||
access_key: Some("myaccess".to_string()),
|
|
||||||
secret_key: Some("mysecret".to_string()),
|
|
||||||
ingress_class: Some("traefik".to_string()),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
```
|
|
||||||
|
|
||||||
## Architecture
|
|
||||||
|
|
||||||
The RustFS module follows the same pattern as PostgreSQL:
|
|
||||||
|
|
||||||
```
|
|
||||||
┌─────────────────────────────────────────────────────────────┐
|
|
||||||
│ K8sRustFsScore (user-facing) │
|
|
||||||
│ └── K8sRustFsInterpret │
|
|
||||||
│ ├── ensure_namespace() │
|
|
||||||
│ ├── ensure_secret() → K8sResourceScore │
|
|
||||||
│ └── HelmChartScore → HelmChartInterpret │
|
|
||||||
│ └── Installs rustfs/rustfs chart │
|
|
||||||
└─────────────────────────────────────────────────────────────┘
|
|
||||||
```
|
|
||||||
|
|
||||||
## Future: Unified S3 Capability
|
|
||||||
|
|
||||||
This is the first step toward a unified S3 capability that will work with:
|
|
||||||
- **RustFS** — local development (this example)
|
|
||||||
- **Ceph RGW** — production S3 via Rook/Ceph
|
|
||||||
- **AWS S3** — cloud-native S3
|
|
||||||
|
|
||||||
The pattern will be:
|
|
||||||
|
|
||||||
```rust
|
|
||||||
// Future: unified S3 interface
|
|
||||||
trait S3Store: Send + Sync {
|
|
||||||
async fn deploy_bucket(&self, config: &BucketConfig) -> Result<(), String>;
|
|
||||||
async fn get_endpoint(&self) -> Result<S3Endpoint, String>;
|
|
||||||
}
|
|
||||||
```
|
|
||||||
|
|
||||||
See the [Scores Catalog](../catalogs/scores.md) for related components.
|
|
||||||
@@ -7,7 +7,6 @@ This directory contains runnable examples demonstrating Harmony's capabilities.
|
|||||||
| Example | Description | Local K3D | Existing Cluster | Hardware Needed |
|
| Example | Description | Local K3D | Existing Cluster | Hardware Needed |
|
||||||
|---------|-------------|:---------:|:----------------:|:---------------:|
|
|---------|-------------|:---------:|:----------------:|:---------------:|
|
||||||
| `postgresql` | Deploy a PostgreSQL cluster | ✅ | ✅ | — |
|
| `postgresql` | Deploy a PostgreSQL cluster | ✅ | ✅ | — |
|
||||||
| `rustfs` | Deploy a RustFS S3-compatible store | ✅ | ✅ | — |
|
|
||||||
| `ntfy` | Deploy ntfy notification server | ✅ | ✅ | — |
|
| `ntfy` | Deploy ntfy notification server | ✅ | ✅ | — |
|
||||||
| `tenant` | Create a multi-tenant namespace | ✅ | ✅ | — |
|
| `tenant` | Create a multi-tenant namespace | ✅ | ✅ | — |
|
||||||
| `cert_manager` | Provision TLS certificates | ✅ | ✅ | — |
|
| `cert_manager` | Provision TLS certificates | ✅ | ✅ | — |
|
||||||
@@ -53,7 +52,6 @@ This directory contains runnable examples demonstrating Harmony's capabilities.
|
|||||||
- **`postgresql`** — Deploy a PostgreSQL cluster via CloudNativePG
|
- **`postgresql`** — Deploy a PostgreSQL cluster via CloudNativePG
|
||||||
- **`multisite_postgres`** — Multi-site PostgreSQL with failover
|
- **`multisite_postgres`** — Multi-site PostgreSQL with failover
|
||||||
- **`public_postgres`** — Public-facing PostgreSQL (⚠️ uses NationTech DNS)
|
- **`public_postgres`** — Public-facing PostgreSQL (⚠️ uses NationTech DNS)
|
||||||
- **`rustfs`** — Deploy a RustFS S3-compatible object store
|
|
||||||
|
|
||||||
### Kubernetes Utilities
|
### Kubernetes Utilities
|
||||||
- **`node_health`** — Check node health in a cluster
|
- **`node_health`** — Check node health in a cluster
|
||||||
|
|||||||
@@ -1,13 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "example-rustfs"
|
|
||||||
edition = "2024"
|
|
||||||
version.workspace = true
|
|
||||||
readme.workspace = true
|
|
||||||
license.workspace = true
|
|
||||||
publish = false
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
harmony = { path = "../../harmony" }
|
|
||||||
harmony_cli = { path = "../../harmony_cli" }
|
|
||||||
harmony_types = { path = "../../harmony_types" }
|
|
||||||
tokio = { workspace = true }
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
use harmony::{
|
|
||||||
inventory::Inventory,
|
|
||||||
modules::rustfs::{K8sRustFsScore, RustFsConfig},
|
|
||||||
topology::K8sAnywhereTopology,
|
|
||||||
};
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() {
|
|
||||||
let rustfs = K8sRustFsScore {
|
|
||||||
config: RustFsConfig {
|
|
||||||
release_name: "harmony-rustfs".to_string(),
|
|
||||||
namespace: "harmony-rustfs".to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
};
|
|
||||||
|
|
||||||
harmony_cli::run(
|
|
||||||
Inventory::autoload(),
|
|
||||||
K8sAnywhereTopology::from_env(),
|
|
||||||
vec![Box::new(rustfs)],
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
}
|
|
||||||
@@ -21,7 +21,6 @@ pub mod openbao;
|
|||||||
pub mod opnsense;
|
pub mod opnsense;
|
||||||
pub mod postgresql;
|
pub mod postgresql;
|
||||||
pub mod prometheus;
|
pub mod prometheus;
|
||||||
pub mod rustfs;
|
|
||||||
pub mod storage;
|
pub mod storage;
|
||||||
pub mod tenant;
|
pub mod tenant;
|
||||||
pub mod tftp;
|
pub mod tftp;
|
||||||
|
|||||||
@@ -1,47 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use harmony_types::storage::StorageSize;
|
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize)]
|
|
||||||
pub struct RustFsConfig {
|
|
||||||
pub release_name: String,
|
|
||||||
pub namespace: String,
|
|
||||||
pub storage_size: StorageSize,
|
|
||||||
pub mode: RustFsMode,
|
|
||||||
pub access_key: Option<String>,
|
|
||||||
pub secret_key: Option<String>,
|
|
||||||
pub ingress_class: Option<String>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for RustFsConfig {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
release_name: "rustfs".to_string(),
|
|
||||||
namespace: "harmony-rustfs".to_string(),
|
|
||||||
storage_size: StorageSize::gi(1),
|
|
||||||
mode: RustFsMode::Standalone,
|
|
||||||
access_key: None,
|
|
||||||
secret_key: None,
|
|
||||||
ingress_class: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug, Serialize)]
|
|
||||||
pub enum RustFsMode {
|
|
||||||
Standalone,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait RustFs: Send + Sync {
|
|
||||||
async fn deploy(&self, config: &RustFsConfig) -> Result<String, String>;
|
|
||||||
async fn get_endpoint(&self, config: &RustFsConfig) -> Result<RustFsEndpoint, String>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Clone, Debug)]
|
|
||||||
pub struct RustFsEndpoint {
|
|
||||||
pub s3_endpoint: String,
|
|
||||||
pub console_endpoint: String,
|
|
||||||
pub access_key: String,
|
|
||||||
pub secret_key: String,
|
|
||||||
}
|
|
||||||
@@ -1,6 +0,0 @@
|
|||||||
pub mod capability;
|
|
||||||
mod score;
|
|
||||||
mod score_k8s;
|
|
||||||
pub use capability::*;
|
|
||||||
pub use score::*;
|
|
||||||
pub use score_k8s::*;
|
|
||||||
@@ -1,85 +0,0 @@
|
|||||||
use async_trait::async_trait;
|
|
||||||
use harmony_types::id::Id;
|
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
use crate::data::Version;
|
|
||||||
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
|
|
||||||
use crate::inventory::Inventory;
|
|
||||||
use crate::modules::rustfs::capability::{RustFs, RustFsConfig};
|
|
||||||
use crate::score::Score;
|
|
||||||
use crate::topology::Topology;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
pub struct RustFsScore {
|
|
||||||
pub config: RustFsConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for RustFsScore {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
config: RustFsConfig::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl RustFsScore {
|
|
||||||
pub fn new(namespace: &str) -> Self {
|
|
||||||
Self {
|
|
||||||
config: RustFsConfig {
|
|
||||||
namespace: namespace.to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Topology + RustFs + Send + Sync> Score<T> for RustFsScore {
|
|
||||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
|
||||||
Box::new(RustFsInterpret {
|
|
||||||
config: self.config.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn name(&self) -> String {
|
|
||||||
format!(
|
|
||||||
"RustFsScore({}:{})",
|
|
||||||
self.config.namespace, self.config.release_name
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
struct RustFsInterpret {
|
|
||||||
config: RustFsConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T: Topology + RustFs + Send + Sync> Interpret<T> for RustFsInterpret {
|
|
||||||
fn get_name(&self) -> InterpretName {
|
|
||||||
InterpretName::Custom("RustFsInterpret")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_version(&self) -> Version {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_status(&self) -> InterpretStatus {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_children(&self) -> Vec<Id> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn execute(&self, _inventory: &Inventory, topo: &T) -> Result<Outcome, InterpretError> {
|
|
||||||
let release_name = topo
|
|
||||||
.deploy(&self.config)
|
|
||||||
.await
|
|
||||||
.map_err(|e| InterpretError::new(e))?;
|
|
||||||
|
|
||||||
Ok(Outcome::success(format!(
|
|
||||||
"RustFS '{}' deployed in namespace '{}'",
|
|
||||||
release_name, self.config.namespace
|
|
||||||
)))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,285 +0,0 @@
|
|||||||
use std::str::FromStr;
|
|
||||||
|
|
||||||
use crate::data::Version;
|
|
||||||
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
|
|
||||||
use crate::inventory::Inventory;
|
|
||||||
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
|
|
||||||
use crate::modules::k8s::resource::K8sResourceScore;
|
|
||||||
use crate::modules::rustfs::capability::{RustFs, RustFsConfig, RustFsEndpoint, RustFsMode};
|
|
||||||
use crate::score::Score;
|
|
||||||
use crate::topology::{HelmCommand, K8sclient, Topology};
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use harmony_types::id::Id;
|
|
||||||
use harmony_types::net::Url;
|
|
||||||
use k8s_openapi::api::core::v1::Secret;
|
|
||||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
|
||||||
use k8s_openapi::ByteString;
|
|
||||||
use log::info;
|
|
||||||
use non_blank_string_rs::NonBlankString;
|
|
||||||
use serde::Serialize;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone, Serialize)]
|
|
||||||
pub struct K8sRustFsScore {
|
|
||||||
pub config: RustFsConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for K8sRustFsScore {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
config: RustFsConfig::default(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl K8sRustFsScore {
|
|
||||||
pub fn new(namespace: &str) -> Self {
|
|
||||||
Self {
|
|
||||||
config: RustFsConfig {
|
|
||||||
namespace: namespace.to_string(),
|
|
||||||
..Default::default()
|
|
||||||
},
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl<T: Topology + K8sclient + HelmCommand + 'static> Score<T> for K8sRustFsScore {
|
|
||||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
|
||||||
Box::new(K8sRustFsInterpret {
|
|
||||||
config: self.config.clone(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
fn name(&self) -> String {
|
|
||||||
format!("K8sRustFsScore({})", self.config.namespace)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct K8sRustFsInterpret {
|
|
||||||
config: RustFsConfig,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl K8sRustFsInterpret {
|
|
||||||
async fn ensure_namespace<T: Topology + K8sclient>(
|
|
||||||
&self,
|
|
||||||
topology: &T,
|
|
||||||
) -> Result<(), InterpretError> {
|
|
||||||
let k8s_client = topology
|
|
||||||
.k8s_client()
|
|
||||||
.await
|
|
||||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
|
|
||||||
|
|
||||||
let namespace_name = &self.config.namespace;
|
|
||||||
|
|
||||||
if k8s_client
|
|
||||||
.namespace_exists(namespace_name)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
InterpretError::new(format!(
|
|
||||||
"Failed to check namespace '{}': {}",
|
|
||||||
namespace_name, e
|
|
||||||
))
|
|
||||||
})?
|
|
||||||
{
|
|
||||||
info!("Namespace '{}' already exists", namespace_name);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Creating namespace '{}'", namespace_name);
|
|
||||||
k8s_client
|
|
||||||
.create_namespace(namespace_name)
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
InterpretError::new(format!(
|
|
||||||
"Failed to create namespace '{}': {}",
|
|
||||||
namespace_name, e
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
k8s_client
|
|
||||||
.wait_for_namespace(namespace_name, Some(std::time::Duration::from_secs(30)))
|
|
||||||
.await
|
|
||||||
.map_err(|e| {
|
|
||||||
InterpretError::new(format!("Namespace '{}' not ready: {}", namespace_name, e))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
info!("Namespace '{}' is ready", namespace_name);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn ensure_secret<T: Topology + K8sclient>(
|
|
||||||
&self,
|
|
||||||
topology: &T,
|
|
||||||
) -> Result<(), InterpretError> {
|
|
||||||
let access_key = self.config.access_key.as_deref().unwrap_or("rustfsadmin");
|
|
||||||
let secret_key = self.config.secret_key.as_deref().unwrap_or("rustfsadmin");
|
|
||||||
|
|
||||||
let k8s_client = topology
|
|
||||||
.k8s_client()
|
|
||||||
.await
|
|
||||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {}", e)))?;
|
|
||||||
|
|
||||||
let namespace_name = &self.config.namespace;
|
|
||||||
let secret_name = format!("{}-credentials", self.config.release_name);
|
|
||||||
|
|
||||||
let secret_exists = k8s_client
|
|
||||||
.get_secret_json_value(&secret_name, Some(namespace_name))
|
|
||||||
.await
|
|
||||||
.is_ok();
|
|
||||||
|
|
||||||
if secret_exists {
|
|
||||||
info!(
|
|
||||||
"Secret '{}' already exists in namespace '{}'",
|
|
||||||
secret_name, namespace_name
|
|
||||||
);
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
info!("Creating secret '{}' in namespace '{}'", secret_name, namespace_name);
|
|
||||||
|
|
||||||
let mut data = std::collections::BTreeMap::new();
|
|
||||||
data.insert(
|
|
||||||
"access_key".to_string(),
|
|
||||||
ByteString(access_key.as_bytes().to_vec()),
|
|
||||||
);
|
|
||||||
data.insert(
|
|
||||||
"secret_key".to_string(),
|
|
||||||
ByteString(secret_key.as_bytes().to_vec()),
|
|
||||||
);
|
|
||||||
|
|
||||||
let secret = Secret {
|
|
||||||
metadata: ObjectMeta {
|
|
||||||
name: Some(secret_name.clone()),
|
|
||||||
namespace: Some(namespace_name.clone()),
|
|
||||||
..ObjectMeta::default()
|
|
||||||
},
|
|
||||||
data: Some(data),
|
|
||||||
string_data: None,
|
|
||||||
type_: Some("Opaque".to_string()),
|
|
||||||
..Secret::default()
|
|
||||||
};
|
|
||||||
|
|
||||||
K8sResourceScore::single(secret, Some(namespace_name.clone()))
|
|
||||||
.create_interpret()
|
|
||||||
.execute(&Inventory::empty(), topology)
|
|
||||||
.await?;
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn to_values_yaml(&self) -> String {
|
|
||||||
let storage_size = self.config.storage_size.to_string();
|
|
||||||
let ingress_class = self.config.ingress_class.as_deref().unwrap_or("nginx");
|
|
||||||
|
|
||||||
let mode_yaml = match self.config.mode {
|
|
||||||
RustFsMode::Standalone => {
|
|
||||||
"mode:\n standalone:\n enabled: true\n distributed:\n enabled: false"
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
format!(
|
|
||||||
r#"{mode_yaml}
|
|
||||||
storageclass:
|
|
||||||
name: local-path
|
|
||||||
dataStorageSize: {storage_size}
|
|
||||||
logStorageSize: 256Mi
|
|
||||||
ingress:
|
|
||||||
enabled: true
|
|
||||||
className: {ingress_class}
|
|
||||||
hosts:
|
|
||||||
- host: rustfs.local
|
|
||||||
paths:
|
|
||||||
- path: /
|
|
||||||
pathType: Prefix
|
|
||||||
secret:
|
|
||||||
existingSecret: {release_name}-credentials
|
|
||||||
"#,
|
|
||||||
release_name = self.config.release_name
|
|
||||||
)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl<T: Topology + K8sclient + HelmCommand + 'static> Interpret<T> for K8sRustFsInterpret {
|
|
||||||
async fn execute(
|
|
||||||
&self,
|
|
||||||
_inventory: &Inventory,
|
|
||||||
topology: &T,
|
|
||||||
) -> Result<Outcome, InterpretError> {
|
|
||||||
self.ensure_namespace(topology).await?;
|
|
||||||
self.ensure_secret(topology).await?;
|
|
||||||
|
|
||||||
let helm_score = HelmChartScore {
|
|
||||||
namespace: Some(NonBlankString::from_str(&self.config.namespace).unwrap()),
|
|
||||||
release_name: NonBlankString::from_str(&self.config.release_name).unwrap(),
|
|
||||||
chart_name: NonBlankString::from_str("rustfs/rustfs").unwrap(),
|
|
||||||
chart_version: None,
|
|
||||||
values_overrides: None,
|
|
||||||
values_yaml: Some(self.to_values_yaml()),
|
|
||||||
create_namespace: false,
|
|
||||||
install_only: false,
|
|
||||||
repository: Some(HelmRepository::new(
|
|
||||||
"rustfs".to_string(),
|
|
||||||
Url::Url(url::Url::parse("https://charts.rustfs.com").unwrap()),
|
|
||||||
true,
|
|
||||||
)),
|
|
||||||
};
|
|
||||||
|
|
||||||
helm_score
|
|
||||||
.create_interpret()
|
|
||||||
.execute(&Inventory::empty(), topology)
|
|
||||||
.await
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_name(&self) -> InterpretName {
|
|
||||||
InterpretName::Custom("K8sRustFsInterpret")
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_version(&self) -> Version {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_status(&self) -> InterpretStatus {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_children(&self) -> Vec<Id> {
|
|
||||||
todo!()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub struct K8sAnywhereRustFs;
|
|
||||||
|
|
||||||
impl K8sAnywhereRustFs {
|
|
||||||
pub fn new() -> Self {
|
|
||||||
Self
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for K8sAnywhereRustFs {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl RustFs for K8sAnywhereRustFs {
|
|
||||||
async fn deploy(&self, config: &RustFsConfig) -> Result<String, String> {
|
|
||||||
Ok(config.release_name.clone())
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_endpoint(&self, config: &RustFsConfig) -> Result<RustFsEndpoint, String> {
|
|
||||||
Ok(RustFsEndpoint {
|
|
||||||
s3_endpoint: "http://rustfs.local:9000".to_string(),
|
|
||||||
console_endpoint: "http://rustfs.local:9001".to_string(),
|
|
||||||
access_key: config
|
|
||||||
.access_key
|
|
||||||
.clone()
|
|
||||||
.unwrap_or_else(|| "rustfsadmin".to_string()),
|
|
||||||
secret_key: config
|
|
||||||
.secret_key
|
|
||||||
.clone()
|
|
||||||
.unwrap_or_else(|| "rustfsadmin".to_string()),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,56 +0,0 @@
|
|||||||
[package]
|
|
||||||
name = "harmony_assets"
|
|
||||||
edition = "2024"
|
|
||||||
version.workspace = true
|
|
||||||
readme.workspace = true
|
|
||||||
license.workspace = true
|
|
||||||
|
|
||||||
[lib]
|
|
||||||
name = "harmony_assets"
|
|
||||||
|
|
||||||
[[bin]]
|
|
||||||
name = "harmony_assets"
|
|
||||||
path = "src/cli/mod.rs"
|
|
||||||
required-features = ["cli"]
|
|
||||||
|
|
||||||
[features]
|
|
||||||
default = ["blake3"]
|
|
||||||
sha256 = ["dep:sha2"]
|
|
||||||
blake3 = ["dep:blake3"]
|
|
||||||
s3 = [
|
|
||||||
"dep:aws-sdk-s3",
|
|
||||||
"dep:aws-config",
|
|
||||||
]
|
|
||||||
cli = [
|
|
||||||
"dep:clap",
|
|
||||||
"dep:indicatif",
|
|
||||||
"dep:inquire",
|
|
||||||
]
|
|
||||||
reqwest = ["dep:reqwest"]
|
|
||||||
|
|
||||||
[dependencies]
|
|
||||||
log.workspace = true
|
|
||||||
tokio.workspace = true
|
|
||||||
thiserror.workspace = true
|
|
||||||
directories.workspace = true
|
|
||||||
sha2 = { version = "0.10", optional = true }
|
|
||||||
blake3 = { version = "1.5", optional = true }
|
|
||||||
reqwest = { version = "0.12", optional = true, default-features = false, features = ["stream", "rustls-tls"] }
|
|
||||||
futures-util.workspace = true
|
|
||||||
async-trait.workspace = true
|
|
||||||
url.workspace = true
|
|
||||||
|
|
||||||
# CLI only
|
|
||||||
clap = { version = "4.5", features = ["derive"], optional = true }
|
|
||||||
indicatif = { version = "0.18", optional = true }
|
|
||||||
inquire = { version = "0.7", optional = true }
|
|
||||||
|
|
||||||
# S3 only
|
|
||||||
aws-sdk-s3 = { version = "1", optional = true }
|
|
||||||
aws-config = { version = "1", optional = true }
|
|
||||||
|
|
||||||
[dev-dependencies]
|
|
||||||
tempfile.workspace = true
|
|
||||||
httptest = "0.16"
|
|
||||||
pretty_assertions.workspace = true
|
|
||||||
tokio-test.workspace = true
|
|
||||||
@@ -1,80 +0,0 @@
|
|||||||
use crate::hash::ChecksumAlgo;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct Asset {
|
|
||||||
pub url: Url,
|
|
||||||
pub checksum: String,
|
|
||||||
pub checksum_algo: ChecksumAlgo,
|
|
||||||
pub file_name: String,
|
|
||||||
pub size: Option<u64>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Asset {
|
|
||||||
pub fn new(url: Url, checksum: String, checksum_algo: ChecksumAlgo, file_name: String) -> Self {
|
|
||||||
Self {
|
|
||||||
url,
|
|
||||||
checksum,
|
|
||||||
checksum_algo,
|
|
||||||
file_name,
|
|
||||||
size: None,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_size(mut self, size: u64) -> Self {
|
|
||||||
self.size = Some(size);
|
|
||||||
self
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn formatted_checksum(&self) -> String {
|
|
||||||
crate::hash::format_checksum(&self.checksum, self.checksum_algo.clone())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct LocalCache {
|
|
||||||
pub base_dir: PathBuf,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LocalCache {
|
|
||||||
pub fn new(base_dir: PathBuf) -> Self {
|
|
||||||
Self { base_dir }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn path_for(&self, asset: &Asset) -> PathBuf {
|
|
||||||
let prefix = &asset.checksum[..16.min(asset.checksum.len())];
|
|
||||||
self.base_dir.join(prefix).join(&asset.file_name)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn cache_key_dir(&self, asset: &Asset) -> PathBuf {
|
|
||||||
let prefix = &asset.checksum[..16.min(asset.checksum.len())];
|
|
||||||
self.base_dir.join(prefix)
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn ensure_dir(&self, asset: &Asset) -> Result<(), crate::errors::AssetError> {
|
|
||||||
let dir = self.cache_key_dir(asset);
|
|
||||||
tokio::fs::create_dir_all(&dir)
|
|
||||||
.await
|
|
||||||
.map_err(|e| crate::errors::AssetError::IoError(e))?;
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for LocalCache {
|
|
||||||
fn default() -> Self {
|
|
||||||
let base_dir = directories::ProjectDirs::from("io", "NationTech", "Harmony")
|
|
||||||
.map(|dirs| dirs.cache_dir().join("assets"))
|
|
||||||
.unwrap_or_else(|| PathBuf::from("/tmp/harmony_assets"));
|
|
||||||
Self::new(base_dir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct StoredAsset {
|
|
||||||
pub url: Url,
|
|
||||||
pub checksum: String,
|
|
||||||
pub checksum_algo: ChecksumAlgo,
|
|
||||||
pub size: u64,
|
|
||||||
pub key: String,
|
|
||||||
}
|
|
||||||
@@ -1,25 +0,0 @@
|
|||||||
use clap::Parser;
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
pub struct ChecksumArgs {
|
|
||||||
pub path: String,
|
|
||||||
#[arg(short, long, default_value = "blake3")]
|
|
||||||
pub algo: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(args: ChecksumArgs) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
use harmony_assets::{ChecksumAlgo, checksum_for_path};
|
|
||||||
|
|
||||||
let path = std::path::Path::new(&args.path);
|
|
||||||
if !path.exists() {
|
|
||||||
eprintln!("Error: File not found: {}", args.path);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
let algo = ChecksumAlgo::from_str(&args.algo)?;
|
|
||||||
let checksum = checksum_for_path(path, algo.clone()).await?;
|
|
||||||
|
|
||||||
println!("{}:{} {}", algo.name(), checksum, args.path);
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -1,82 +0,0 @@
|
|||||||
use clap::Parser;
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
pub struct DownloadArgs {
|
|
||||||
pub url: String,
|
|
||||||
pub checksum: String,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub output: Option<String>,
|
|
||||||
#[arg(short, long, default_value = "blake3")]
|
|
||||||
pub algo: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(args: DownloadArgs) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
use harmony_assets::{
|
|
||||||
Asset, AssetStore, ChecksumAlgo, LocalCache, LocalStore, verify_checksum,
|
|
||||||
};
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
let url = Url::parse(&args.url).map_err(|e| format!("Invalid URL: {}", e))?;
|
|
||||||
|
|
||||||
let file_name = args
|
|
||||||
.output
|
|
||||||
.or_else(|| {
|
|
||||||
std::path::Path::new(&args.url)
|
|
||||||
.file_name()
|
|
||||||
.and_then(|n| n.to_str())
|
|
||||||
.map(|s| s.to_string())
|
|
||||||
})
|
|
||||||
.unwrap_or_else(|| "download".to_string());
|
|
||||||
|
|
||||||
let algo = ChecksumAlgo::from_str(&args.algo)?;
|
|
||||||
let asset = Asset::new(url, args.checksum.clone(), algo.clone(), file_name);
|
|
||||||
|
|
||||||
let cache = LocalCache::default();
|
|
||||||
|
|
||||||
println!("Downloading: {}", asset.url);
|
|
||||||
println!("Checksum: {}:{}", algo.name(), args.checksum);
|
|
||||||
println!("Cache dir: {:?}", cache.base_dir);
|
|
||||||
|
|
||||||
let total_size = asset.size.unwrap_or(0);
|
|
||||||
let pb = if total_size > 0 {
|
|
||||||
let pb = ProgressBar::new(total_size);
|
|
||||||
pb.set_style(
|
|
||||||
ProgressStyle::default_bar()
|
|
||||||
.template("{spinner:.green} [{elapsed_precise}] [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec})")?
|
|
||||||
.progress_chars("=>-"),
|
|
||||||
);
|
|
||||||
Some(pb)
|
|
||||||
} else {
|
|
||||||
None
|
|
||||||
};
|
|
||||||
|
|
||||||
let progress_fn: Box<dyn Fn(u64, Option<u64>) + Send> = Box::new({
|
|
||||||
let pb = pb.clone();
|
|
||||||
move |bytes, _total| {
|
|
||||||
if let Some(ref pb) = pb {
|
|
||||||
pb.set_position(bytes);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
});
|
|
||||||
|
|
||||||
let store = LocalStore::default();
|
|
||||||
let result = store.fetch(&asset, &cache, Some(progress_fn)).await;
|
|
||||||
|
|
||||||
if let Some(pb) = pb {
|
|
||||||
pb.finish();
|
|
||||||
}
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(path) => {
|
|
||||||
verify_checksum(&path, &args.checksum, algo).await?;
|
|
||||||
println!("\nDownloaded to: {:?}", path);
|
|
||||||
println!("Checksum verified OK");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Download failed: {}", e);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,49 +0,0 @@
|
|||||||
pub mod checksum;
|
|
||||||
pub mod download;
|
|
||||||
pub mod upload;
|
|
||||||
pub mod verify;
|
|
||||||
|
|
||||||
use clap::{Parser, Subcommand};
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
#[command(
|
|
||||||
name = "harmony_assets",
|
|
||||||
version,
|
|
||||||
about = "Asset management CLI for downloading, uploading, and verifying large binary assets"
|
|
||||||
)]
|
|
||||||
pub struct Cli {
|
|
||||||
#[command(subcommand)]
|
|
||||||
pub command: Commands,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Subcommand, Debug)]
|
|
||||||
pub enum Commands {
|
|
||||||
Upload(upload::UploadArgs),
|
|
||||||
Download(download::DownloadArgs),
|
|
||||||
Checksum(checksum::ChecksumArgs),
|
|
||||||
Verify(verify::VerifyArgs),
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::main]
|
|
||||||
async fn main() -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
log::info!("Starting harmony_assets CLI");
|
|
||||||
|
|
||||||
let cli = Cli::parse();
|
|
||||||
|
|
||||||
match cli.command {
|
|
||||||
Commands::Upload(args) => {
|
|
||||||
upload::execute(args).await?;
|
|
||||||
}
|
|
||||||
Commands::Download(args) => {
|
|
||||||
download::execute(args).await?;
|
|
||||||
}
|
|
||||||
Commands::Checksum(args) => {
|
|
||||||
checksum::execute(args).await?;
|
|
||||||
}
|
|
||||||
Commands::Verify(args) => {
|
|
||||||
verify::execute(args).await?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
@@ -1,166 +0,0 @@
|
|||||||
use clap::Parser;
|
|
||||||
use harmony_assets::{S3Config, S3Store, checksum_for_path_with_progress};
|
|
||||||
use indicatif::{ProgressBar, ProgressStyle};
|
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
pub struct UploadArgs {
|
|
||||||
pub source: String,
|
|
||||||
pub key: Option<String>,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub content_type: Option<String>,
|
|
||||||
#[arg(short, long, default_value_t = true)]
|
|
||||||
pub public_read: bool,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub endpoint: Option<String>,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub bucket: Option<String>,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub region: Option<String>,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub access_key_id: Option<String>,
|
|
||||||
#[arg(short, long)]
|
|
||||||
pub secret_access_key: Option<String>,
|
|
||||||
#[arg(short, long, default_value = "blake3")]
|
|
||||||
pub algo: String,
|
|
||||||
#[arg(short, long, default_value_t = false)]
|
|
||||||
pub yes: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(args: UploadArgs) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
let source_path = Path::new(&args.source);
|
|
||||||
if !source_path.exists() {
|
|
||||||
eprintln!("Error: File not found: {}", args.source);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
let key = args.key.unwrap_or_else(|| {
|
|
||||||
source_path
|
|
||||||
.file_name()
|
|
||||||
.and_then(|n| n.to_str())
|
|
||||||
.unwrap_or("upload")
|
|
||||||
.to_string()
|
|
||||||
});
|
|
||||||
|
|
||||||
let metadata = tokio::fs::metadata(source_path)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to read file metadata: {}", e))?;
|
|
||||||
let total_size = metadata.len();
|
|
||||||
|
|
||||||
let endpoint = args
|
|
||||||
.endpoint
|
|
||||||
.or_else(|| std::env::var("S3_ENDPOINT").ok())
|
|
||||||
.unwrap_or_default();
|
|
||||||
let bucket = args
|
|
||||||
.bucket
|
|
||||||
.or_else(|| std::env::var("S3_BUCKET").ok())
|
|
||||||
.unwrap_or_else(|| {
|
|
||||||
inquire::Text::new("S3 Bucket name:")
|
|
||||||
.with_default("harmony-assets")
|
|
||||||
.prompt()
|
|
||||||
.unwrap()
|
|
||||||
});
|
|
||||||
let region = args
|
|
||||||
.region
|
|
||||||
.or_else(|| std::env::var("S3_REGION").ok())
|
|
||||||
.unwrap_or_else(|| {
|
|
||||||
inquire::Text::new("S3 Region:")
|
|
||||||
.with_default("us-east-1")
|
|
||||||
.prompt()
|
|
||||||
.unwrap()
|
|
||||||
});
|
|
||||||
let access_key_id = args
|
|
||||||
.access_key_id
|
|
||||||
.or_else(|| std::env::var("AWS_ACCESS_KEY_ID").ok());
|
|
||||||
let secret_access_key = args
|
|
||||||
.secret_access_key
|
|
||||||
.or_else(|| std::env::var("AWS_SECRET_ACCESS_KEY").ok());
|
|
||||||
|
|
||||||
let config = S3Config {
|
|
||||||
endpoint: if endpoint.is_empty() {
|
|
||||||
None
|
|
||||||
} else {
|
|
||||||
Some(endpoint)
|
|
||||||
},
|
|
||||||
bucket: bucket.clone(),
|
|
||||||
region: region.clone(),
|
|
||||||
access_key_id,
|
|
||||||
secret_access_key,
|
|
||||||
public_read: args.public_read,
|
|
||||||
};
|
|
||||||
|
|
||||||
println!("Upload Configuration:");
|
|
||||||
println!(" Source: {}", args.source);
|
|
||||||
println!(" S3 Key: {}", key);
|
|
||||||
println!(" Bucket: {}", bucket);
|
|
||||||
println!(" Region: {}", region);
|
|
||||||
println!(
|
|
||||||
" Size: {} bytes ({} MB)",
|
|
||||||
total_size,
|
|
||||||
total_size as f64 / 1024.0 / 1024.0
|
|
||||||
);
|
|
||||||
println!();
|
|
||||||
|
|
||||||
if !args.yes {
|
|
||||||
let confirm = inquire::Confirm::new("Proceed with upload?")
|
|
||||||
.with_default(true)
|
|
||||||
.prompt()?;
|
|
||||||
if !confirm {
|
|
||||||
println!("Upload cancelled.");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
let store = S3Store::new(config)
|
|
||||||
.await
|
|
||||||
.map_err(|e| format!("Failed to initialize S3 client: {}", e))?;
|
|
||||||
|
|
||||||
println!("Computing checksum while uploading...\n");
|
|
||||||
|
|
||||||
let pb = ProgressBar::new(total_size);
|
|
||||||
pb.set_style(
|
|
||||||
ProgressStyle::default_bar()
|
|
||||||
.template("{spinner:.green} [{elapsed_precise}] [{bar:40}] {bytes}/{total_bytes} ({bytes_per_sec})")?
|
|
||||||
.progress_chars("=>-"),
|
|
||||||
);
|
|
||||||
|
|
||||||
{
|
|
||||||
let algo = harmony_assets::ChecksumAlgo::from_str(&args.algo)?;
|
|
||||||
let rt = tokio::runtime::Handle::current();
|
|
||||||
let pb_clone = pb.clone();
|
|
||||||
let _checksum = rt.block_on(checksum_for_path_with_progress(
|
|
||||||
source_path,
|
|
||||||
algo,
|
|
||||||
|read, _total| {
|
|
||||||
pb_clone.set_position(read);
|
|
||||||
},
|
|
||||||
))?;
|
|
||||||
}
|
|
||||||
|
|
||||||
pb.set_position(total_size);
|
|
||||||
|
|
||||||
let result = store
|
|
||||||
.store(source_path, &key, args.content_type.as_deref())
|
|
||||||
.await;
|
|
||||||
|
|
||||||
pb.finish();
|
|
||||||
|
|
||||||
match result {
|
|
||||||
Ok(asset) => {
|
|
||||||
println!("\nUpload complete!");
|
|
||||||
println!(" URL: {}", asset.url);
|
|
||||||
println!(
|
|
||||||
" Checksum: {}:{}",
|
|
||||||
asset.checksum_algo.name(),
|
|
||||||
asset.checksum
|
|
||||||
);
|
|
||||||
println!(" Size: {} bytes", asset.size);
|
|
||||||
println!(" Key: {}", asset.key);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Upload failed: {}", e);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,32 +0,0 @@
|
|||||||
use clap::Parser;
|
|
||||||
|
|
||||||
#[derive(Parser, Debug)]
|
|
||||||
pub struct VerifyArgs {
|
|
||||||
pub path: String,
|
|
||||||
pub expected: String,
|
|
||||||
#[arg(short, long, default_value = "blake3")]
|
|
||||||
pub algo: String,
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn execute(args: VerifyArgs) -> Result<(), Box<dyn std::error::Error>> {
|
|
||||||
use harmony_assets::{ChecksumAlgo, verify_checksum};
|
|
||||||
|
|
||||||
let path = std::path::Path::new(&args.path);
|
|
||||||
if !path.exists() {
|
|
||||||
eprintln!("Error: File not found: {}", args.path);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
|
|
||||||
let algo = ChecksumAlgo::from_str(&args.algo)?;
|
|
||||||
|
|
||||||
match verify_checksum(path, &args.expected, algo).await {
|
|
||||||
Ok(()) => {
|
|
||||||
println!("Checksum verified OK");
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Verification FAILED: {}", e);
|
|
||||||
std::process::exit(1);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,37 +0,0 @@
|
|||||||
use std::path::PathBuf;
|
|
||||||
use thiserror::Error;
|
|
||||||
|
|
||||||
#[derive(Debug, Error)]
|
|
||||||
pub enum AssetError {
|
|
||||||
#[error("File not found: {0}")]
|
|
||||||
FileNotFound(PathBuf),
|
|
||||||
|
|
||||||
#[error("Checksum mismatch for '{path}': expected {expected}, got {actual}")]
|
|
||||||
ChecksumMismatch {
|
|
||||||
path: PathBuf,
|
|
||||||
expected: String,
|
|
||||||
actual: String,
|
|
||||||
},
|
|
||||||
|
|
||||||
#[error("Checksum algorithm not available: {0}. Enable the corresponding feature flag.")]
|
|
||||||
ChecksumAlgoNotAvailable(String),
|
|
||||||
|
|
||||||
#[error("Download failed: {0}")]
|
|
||||||
DownloadFailed(String),
|
|
||||||
|
|
||||||
#[error("S3 error: {0}")]
|
|
||||||
S3Error(String),
|
|
||||||
|
|
||||||
#[error("IO error: {0}")]
|
|
||||||
IoError(#[from] std::io::Error),
|
|
||||||
|
|
||||||
#[cfg(feature = "reqwest")]
|
|
||||||
#[error("HTTP error: {0}")]
|
|
||||||
HttpError(#[from] reqwest::Error),
|
|
||||||
|
|
||||||
#[error("Store error: {0}")]
|
|
||||||
StoreError(String),
|
|
||||||
|
|
||||||
#[error("Configuration error: {0}")]
|
|
||||||
ConfigError(String),
|
|
||||||
}
|
|
||||||
@@ -1,233 +0,0 @@
|
|||||||
use crate::errors::AssetError;
|
|
||||||
use std::path::Path;
|
|
||||||
|
|
||||||
#[cfg(feature = "blake3")]
|
|
||||||
use blake3::Hasher as B3Hasher;
|
|
||||||
#[cfg(feature = "sha256")]
|
|
||||||
use sha2::{Digest, Sha256};
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub enum ChecksumAlgo {
|
|
||||||
BLAKE3,
|
|
||||||
SHA256,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for ChecksumAlgo {
|
|
||||||
fn default() -> Self {
|
|
||||||
#[cfg(feature = "blake3")]
|
|
||||||
return ChecksumAlgo::BLAKE3;
|
|
||||||
#[cfg(not(feature = "blake3"))]
|
|
||||||
return ChecksumAlgo::SHA256;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ChecksumAlgo {
|
|
||||||
pub fn name(&self) -> &'static str {
|
|
||||||
match self {
|
|
||||||
ChecksumAlgo::BLAKE3 => "blake3",
|
|
||||||
ChecksumAlgo::SHA256 => "sha256",
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn from_str(s: &str) -> Result<Self, AssetError> {
|
|
||||||
match s.to_lowercase().as_str() {
|
|
||||||
"blake3" | "b3" => Ok(ChecksumAlgo::BLAKE3),
|
|
||||||
"sha256" | "sha-256" => Ok(ChecksumAlgo::SHA256),
|
|
||||||
_ => Err(AssetError::ChecksumAlgoNotAvailable(s.to_string())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl std::fmt::Display for ChecksumAlgo {
|
|
||||||
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
|
|
||||||
write!(f, "{}", self.name())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn checksum_for_file<R>(reader: R, algo: ChecksumAlgo) -> Result<String, AssetError>
|
|
||||||
where
|
|
||||||
R: tokio::io::AsyncRead + Unpin,
|
|
||||||
{
|
|
||||||
match algo {
|
|
||||||
#[cfg(feature = "blake3")]
|
|
||||||
ChecksumAlgo::BLAKE3 => {
|
|
||||||
let mut hasher = B3Hasher::new();
|
|
||||||
let mut reader = reader;
|
|
||||||
let mut buf = vec![0u8; 65536];
|
|
||||||
loop {
|
|
||||||
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
hasher.update(&buf[..n]);
|
|
||||||
}
|
|
||||||
Ok(hasher.finalize().to_hex().to_string())
|
|
||||||
}
|
|
||||||
#[cfg(not(feature = "blake3"))]
|
|
||||||
ChecksumAlgo::BLAKE3 => Err(AssetError::ChecksumAlgoNotAvailable("blake3".to_string())),
|
|
||||||
#[cfg(feature = "sha256")]
|
|
||||||
ChecksumAlgo::SHA256 => {
|
|
||||||
let mut hasher = Sha256::new();
|
|
||||||
let mut reader = reader;
|
|
||||||
let mut buf = vec![0u8; 65536];
|
|
||||||
loop {
|
|
||||||
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
hasher.update(&buf[..n]);
|
|
||||||
}
|
|
||||||
Ok(format!("{:x}", hasher.finalize()))
|
|
||||||
}
|
|
||||||
#[cfg(not(feature = "sha256"))]
|
|
||||||
ChecksumAlgo::SHA256 => Err(AssetError::ChecksumAlgoNotAvailable("sha256".to_string())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn checksum_for_path(path: &Path, algo: ChecksumAlgo) -> Result<String, AssetError> {
|
|
||||||
let file = tokio::fs::File::open(path)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
let reader = tokio::io::BufReader::with_capacity(65536, file);
|
|
||||||
checksum_for_file(reader, algo).await
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn checksum_for_path_with_progress<F>(
|
|
||||||
path: &Path,
|
|
||||||
algo: ChecksumAlgo,
|
|
||||||
mut progress: F,
|
|
||||||
) -> Result<String, AssetError>
|
|
||||||
where
|
|
||||||
F: FnMut(u64, Option<u64>) + Send,
|
|
||||||
{
|
|
||||||
let file = tokio::fs::File::open(path)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
let metadata = file.metadata().await.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
let total = Some(metadata.len());
|
|
||||||
let reader = tokio::io::BufReader::with_capacity(65536, file);
|
|
||||||
|
|
||||||
match algo {
|
|
||||||
#[cfg(feature = "blake3")]
|
|
||||||
ChecksumAlgo::BLAKE3 => {
|
|
||||||
let mut hasher = B3Hasher::new();
|
|
||||||
let mut reader = reader;
|
|
||||||
let mut buf = vec![0u8; 65536];
|
|
||||||
let mut read: u64 = 0;
|
|
||||||
loop {
|
|
||||||
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
hasher.update(&buf[..n]);
|
|
||||||
read += n as u64;
|
|
||||||
progress(read, total);
|
|
||||||
}
|
|
||||||
Ok(hasher.finalize().to_hex().to_string())
|
|
||||||
}
|
|
||||||
#[cfg(not(feature = "blake3"))]
|
|
||||||
ChecksumAlgo::BLAKE3 => Err(AssetError::ChecksumAlgoNotAvailable("blake3".to_string())),
|
|
||||||
#[cfg(feature = "sha256")]
|
|
||||||
ChecksumAlgo::SHA256 => {
|
|
||||||
let mut hasher = Sha256::new();
|
|
||||||
let mut reader = reader;
|
|
||||||
let mut buf = vec![0u8; 65536];
|
|
||||||
let mut read: u64 = 0;
|
|
||||||
loop {
|
|
||||||
let n = tokio::io::AsyncReadExt::read(&mut reader, &mut buf).await?;
|
|
||||||
if n == 0 {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
hasher.update(&buf[..n]);
|
|
||||||
read += n as u64;
|
|
||||||
progress(read, total);
|
|
||||||
}
|
|
||||||
Ok(format!("{:x}", hasher.finalize()))
|
|
||||||
}
|
|
||||||
#[cfg(not(feature = "sha256"))]
|
|
||||||
ChecksumAlgo::SHA256 => Err(AssetError::ChecksumAlgoNotAvailable("sha256".to_string())),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn verify_checksum(
|
|
||||||
path: &Path,
|
|
||||||
expected: &str,
|
|
||||||
algo: ChecksumAlgo,
|
|
||||||
) -> Result<(), AssetError> {
|
|
||||||
let actual = checksum_for_path(path, algo).await?;
|
|
||||||
let expected_clean = expected
|
|
||||||
.trim_start_matches("blake3:")
|
|
||||||
.trim_start_matches("sha256:")
|
|
||||||
.trim_start_matches("b3:")
|
|
||||||
.trim_start_matches("sha-256:");
|
|
||||||
if actual != expected_clean {
|
|
||||||
return Err(AssetError::ChecksumMismatch {
|
|
||||||
path: path.to_path_buf(),
|
|
||||||
expected: expected_clean.to_string(),
|
|
||||||
actual,
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn format_checksum(checksum: &str, algo: ChecksumAlgo) -> String {
|
|
||||||
format!("{}:{}", algo.name(), checksum)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(test)]
|
|
||||||
mod tests {
|
|
||||||
use super::*;
|
|
||||||
use std::io::Write;
|
|
||||||
use tempfile::NamedTempFile;
|
|
||||||
|
|
||||||
async fn create_temp_file(content: &[u8]) -> NamedTempFile {
|
|
||||||
let mut file = NamedTempFile::new().unwrap();
|
|
||||||
file.write_all(content).unwrap();
|
|
||||||
file.flush().unwrap();
|
|
||||||
file
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_checksum_blake3() {
|
|
||||||
let file = create_temp_file(b"hello world").await;
|
|
||||||
let checksum = checksum_for_path(file.path(), ChecksumAlgo::BLAKE3)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
assert_eq!(
|
|
||||||
checksum,
|
|
||||||
"d74981efa70a0c880b8d8c1985d075dbcbf679b99a5f9914e5aaf96b831a9e24"
|
|
||||||
);
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_verify_checksum_success() {
|
|
||||||
let file = create_temp_file(b"hello world").await;
|
|
||||||
let checksum = checksum_for_path(file.path(), ChecksumAlgo::BLAKE3)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let result = verify_checksum(file.path(), &checksum, ChecksumAlgo::BLAKE3).await;
|
|
||||||
assert!(result.is_ok());
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_verify_checksum_failure() {
|
|
||||||
let file = create_temp_file(b"hello world").await;
|
|
||||||
let result = verify_checksum(
|
|
||||||
file.path(),
|
|
||||||
"blake3:0000000000000000000000000000000000000000000000000000000000000000",
|
|
||||||
ChecksumAlgo::BLAKE3,
|
|
||||||
)
|
|
||||||
.await;
|
|
||||||
assert!(matches!(result, Err(AssetError::ChecksumMismatch { .. })));
|
|
||||||
}
|
|
||||||
|
|
||||||
#[tokio::test]
|
|
||||||
async fn test_checksum_with_prefix() {
|
|
||||||
let file = create_temp_file(b"hello world").await;
|
|
||||||
let checksum = checksum_for_path(file.path(), ChecksumAlgo::BLAKE3)
|
|
||||||
.await
|
|
||||||
.unwrap();
|
|
||||||
let formatted = format_checksum(&checksum, ChecksumAlgo::BLAKE3);
|
|
||||||
assert!(formatted.starts_with("blake3:"));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,14 +0,0 @@
|
|||||||
pub mod asset;
|
|
||||||
pub mod errors;
|
|
||||||
pub mod hash;
|
|
||||||
pub mod store;
|
|
||||||
|
|
||||||
pub use asset::{Asset, LocalCache, StoredAsset};
|
|
||||||
pub use errors::AssetError;
|
|
||||||
pub use hash::{ChecksumAlgo, checksum_for_path, checksum_for_path_with_progress, verify_checksum};
|
|
||||||
pub use store::AssetStore;
|
|
||||||
|
|
||||||
#[cfg(feature = "s3")]
|
|
||||||
pub use store::{S3Config, S3Store};
|
|
||||||
|
|
||||||
pub use store::local::LocalStore;
|
|
||||||
@@ -1,137 +0,0 @@
|
|||||||
use crate::asset::{Asset, LocalCache};
|
|
||||||
use crate::errors::AssetError;
|
|
||||||
use crate::store::AssetStore;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
#[cfg(feature = "reqwest")]
|
|
||||||
use crate::hash::verify_checksum;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct LocalStore {
|
|
||||||
base_dir: PathBuf,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl LocalStore {
|
|
||||||
pub fn new(base_dir: PathBuf) -> Self {
|
|
||||||
Self { base_dir }
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn with_cache(cache: LocalCache) -> Self {
|
|
||||||
Self {
|
|
||||||
base_dir: cache.base_dir.clone(),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn base_dir(&self) -> &PathBuf {
|
|
||||||
&self.base_dir
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for LocalStore {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self::new(LocalCache::default().base_dir)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl AssetStore for LocalStore {
|
|
||||||
#[cfg(feature = "reqwest")]
|
|
||||||
async fn fetch(
|
|
||||||
&self,
|
|
||||||
asset: &Asset,
|
|
||||||
cache: &LocalCache,
|
|
||||||
progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
|
|
||||||
) -> Result<PathBuf, AssetError> {
|
|
||||||
use futures_util::StreamExt;
|
|
||||||
|
|
||||||
let dest_path = cache.path_for(asset);
|
|
||||||
|
|
||||||
if dest_path.exists() {
|
|
||||||
let verification =
|
|
||||||
verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone()).await;
|
|
||||||
if verification.is_ok() {
|
|
||||||
log::debug!("Asset already cached at {:?}", dest_path);
|
|
||||||
return Ok(dest_path);
|
|
||||||
} else {
|
|
||||||
log::warn!("Cached file failed checksum verification, re-downloading");
|
|
||||||
tokio::fs::remove_file(&dest_path)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.ensure_dir(asset).await?;
|
|
||||||
|
|
||||||
log::info!("Downloading asset from {}", asset.url);
|
|
||||||
let client = reqwest::Client::new();
|
|
||||||
let response = client
|
|
||||||
.get(asset.url.as_str())
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::DownloadFailed(e.to_string()))?;
|
|
||||||
|
|
||||||
if !response.status().is_success() {
|
|
||||||
return Err(AssetError::DownloadFailed(format!(
|
|
||||||
"HTTP {}: {}",
|
|
||||||
response.status(),
|
|
||||||
asset.url
|
|
||||||
)));
|
|
||||||
}
|
|
||||||
|
|
||||||
let total_size = response.content_length();
|
|
||||||
|
|
||||||
let mut file = tokio::fs::File::create(&dest_path)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
|
|
||||||
let mut stream = response.bytes_stream();
|
|
||||||
let mut downloaded: u64 = 0;
|
|
||||||
|
|
||||||
while let Some(chunk_result) = stream.next().await {
|
|
||||||
let chunk = chunk_result.map_err(|e| AssetError::DownloadFailed(e.to_string()))?;
|
|
||||||
tokio::io::AsyncWriteExt::write_all(&mut file, &chunk)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
downloaded += chunk.len() as u64;
|
|
||||||
if let Some(ref p) = progress {
|
|
||||||
p(downloaded, total_size);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::io::AsyncWriteExt::flush(&mut file)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
|
|
||||||
drop(file);
|
|
||||||
|
|
||||||
verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone()).await?;
|
|
||||||
|
|
||||||
log::info!("Asset downloaded and verified: {:?}", dest_path);
|
|
||||||
Ok(dest_path)
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(not(feature = "reqwest"))]
|
|
||||||
async fn fetch(
|
|
||||||
&self,
|
|
||||||
_asset: &Asset,
|
|
||||||
_cache: &LocalCache,
|
|
||||||
_progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
|
|
||||||
) -> Result<PathBuf, AssetError> {
|
|
||||||
Err(AssetError::DownloadFailed(
|
|
||||||
"HTTP downloads not available. Enable the 'reqwest' feature.".to_string(),
|
|
||||||
))
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn exists(&self, key: &str) -> Result<bool, AssetError> {
|
|
||||||
let path = self.base_dir.join(key);
|
|
||||||
Ok(path.exists())
|
|
||||||
}
|
|
||||||
|
|
||||||
fn url_for(&self, key: &str) -> Result<Url, AssetError> {
|
|
||||||
let path = self.base_dir.join(key);
|
|
||||||
Url::from_file_path(&path)
|
|
||||||
.map_err(|_| AssetError::StoreError("Could not convert path to file URL".to_string()))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -1,27 +0,0 @@
|
|||||||
use crate::asset::{Asset, LocalCache};
|
|
||||||
use crate::errors::AssetError;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use std::path::PathBuf;
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
pub mod local;
|
|
||||||
|
|
||||||
#[cfg(feature = "s3")]
|
|
||||||
pub mod s3;
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
pub trait AssetStore: Send + Sync {
|
|
||||||
async fn fetch(
|
|
||||||
&self,
|
|
||||||
asset: &Asset,
|
|
||||||
cache: &LocalCache,
|
|
||||||
progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
|
|
||||||
) -> Result<PathBuf, AssetError>;
|
|
||||||
|
|
||||||
async fn exists(&self, key: &str) -> Result<bool, AssetError>;
|
|
||||||
|
|
||||||
fn url_for(&self, key: &str) -> Result<Url, AssetError>;
|
|
||||||
}
|
|
||||||
|
|
||||||
#[cfg(feature = "s3")]
|
|
||||||
pub use s3::{S3Config, S3Store};
|
|
||||||
@@ -1,235 +0,0 @@
|
|||||||
use crate::asset::StoredAsset;
|
|
||||||
use crate::errors::AssetError;
|
|
||||||
use crate::hash::ChecksumAlgo;
|
|
||||||
use async_trait::async_trait;
|
|
||||||
use aws_sdk_s3::Client as S3Client;
|
|
||||||
use aws_sdk_s3::primitives::ByteStream;
|
|
||||||
use aws_sdk_s3::types::ObjectCannedAcl;
|
|
||||||
use std::path::Path;
|
|
||||||
use url::Url;
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct S3Config {
|
|
||||||
pub endpoint: Option<String>,
|
|
||||||
pub bucket: String,
|
|
||||||
pub region: String,
|
|
||||||
pub access_key_id: Option<String>,
|
|
||||||
pub secret_access_key: Option<String>,
|
|
||||||
pub public_read: bool,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Default for S3Config {
|
|
||||||
fn default() -> Self {
|
|
||||||
Self {
|
|
||||||
endpoint: None,
|
|
||||||
bucket: String::new(),
|
|
||||||
region: String::from("us-east-1"),
|
|
||||||
access_key_id: None,
|
|
||||||
secret_access_key: None,
|
|
||||||
public_read: true,
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Clone)]
|
|
||||||
pub struct S3Store {
|
|
||||||
client: S3Client,
|
|
||||||
config: S3Config,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl S3Store {
|
|
||||||
pub async fn new(config: S3Config) -> Result<Self, AssetError> {
|
|
||||||
let mut cfg_builder = aws_config::defaults(aws_config::BehaviorVersion::latest());
|
|
||||||
|
|
||||||
if let Some(ref endpoint) = config.endpoint {
|
|
||||||
cfg_builder = cfg_builder.endpoint_url(endpoint);
|
|
||||||
}
|
|
||||||
|
|
||||||
let cfg = cfg_builder.load().await;
|
|
||||||
let client = S3Client::new(&cfg);
|
|
||||||
|
|
||||||
Ok(Self { client, config })
|
|
||||||
}
|
|
||||||
|
|
||||||
pub fn config(&self) -> &S3Config {
|
|
||||||
&self.config
|
|
||||||
}
|
|
||||||
|
|
||||||
fn public_url(&self, key: &str) -> Result<Url, AssetError> {
|
|
||||||
let url_str = if let Some(ref endpoint) = self.config.endpoint {
|
|
||||||
format!(
|
|
||||||
"{}/{}/{}",
|
|
||||||
endpoint.trim_end_matches('/'),
|
|
||||||
self.config.bucket,
|
|
||||||
key
|
|
||||||
)
|
|
||||||
} else {
|
|
||||||
format!(
|
|
||||||
"https://{}.s3.{}.amazonaws.com/{}",
|
|
||||||
self.config.bucket, self.config.region, key
|
|
||||||
)
|
|
||||||
};
|
|
||||||
Url::parse(&url_str).map_err(|e| AssetError::S3Error(e.to_string()))
|
|
||||||
}
|
|
||||||
|
|
||||||
pub async fn store(
|
|
||||||
&self,
|
|
||||||
source: &Path,
|
|
||||||
key: &str,
|
|
||||||
content_type: Option<&str>,
|
|
||||||
) -> Result<StoredAsset, AssetError> {
|
|
||||||
let metadata = tokio::fs::metadata(source)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
let size = metadata.len();
|
|
||||||
|
|
||||||
let checksum = crate::checksum_for_path(source, ChecksumAlgo::default())
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::StoreError(e.to_string()))?;
|
|
||||||
|
|
||||||
let body = ByteStream::from_path(source).await.map_err(|e| {
|
|
||||||
AssetError::IoError(std::io::Error::new(
|
|
||||||
std::io::ErrorKind::Other,
|
|
||||||
e.to_string(),
|
|
||||||
))
|
|
||||||
})?;
|
|
||||||
|
|
||||||
let mut put_builder = self
|
|
||||||
.client
|
|
||||||
.put_object()
|
|
||||||
.bucket(&self.config.bucket)
|
|
||||||
.key(key)
|
|
||||||
.body(body)
|
|
||||||
.content_length(size as i64)
|
|
||||||
.metadata("checksum", &checksum);
|
|
||||||
|
|
||||||
if self.config.public_read {
|
|
||||||
put_builder = put_builder.acl(ObjectCannedAcl::PublicRead);
|
|
||||||
}
|
|
||||||
|
|
||||||
if let Some(ct) = content_type {
|
|
||||||
put_builder = put_builder.content_type(ct);
|
|
||||||
}
|
|
||||||
|
|
||||||
put_builder
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::S3Error(e.to_string()))?;
|
|
||||||
|
|
||||||
Ok(StoredAsset {
|
|
||||||
url: self.public_url(key)?,
|
|
||||||
checksum,
|
|
||||||
checksum_algo: ChecksumAlgo::default(),
|
|
||||||
size,
|
|
||||||
key: key.to_string(),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
use crate::store::AssetStore;
|
|
||||||
use crate::{Asset, LocalCache};
|
|
||||||
|
|
||||||
#[async_trait]
|
|
||||||
impl AssetStore for S3Store {
|
|
||||||
async fn fetch(
|
|
||||||
&self,
|
|
||||||
asset: &Asset,
|
|
||||||
cache: &LocalCache,
|
|
||||||
progress: Option<Box<dyn Fn(u64, Option<u64>) + Send>>,
|
|
||||||
) -> Result<std::path::PathBuf, AssetError> {
|
|
||||||
let dest_path = cache.path_for(asset);
|
|
||||||
|
|
||||||
if dest_path.exists() {
|
|
||||||
let verification =
|
|
||||||
crate::verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone())
|
|
||||||
.await;
|
|
||||||
if verification.is_ok() {
|
|
||||||
log::debug!("Asset already cached at {:?}", dest_path);
|
|
||||||
return Ok(dest_path);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
cache.ensure_dir(asset).await?;
|
|
||||||
|
|
||||||
log::info!(
|
|
||||||
"Downloading asset from s3://{}/{}",
|
|
||||||
self.config.bucket,
|
|
||||||
asset.url
|
|
||||||
);
|
|
||||||
|
|
||||||
let key = extract_s3_key(&asset.url, &self.config.bucket)?;
|
|
||||||
let obj = self
|
|
||||||
.client
|
|
||||||
.get_object()
|
|
||||||
.bucket(&self.config.bucket)
|
|
||||||
.key(&key)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::S3Error(e.to_string()))?;
|
|
||||||
|
|
||||||
let total_size = obj.content_length.unwrap_or(0) as u64;
|
|
||||||
let mut file = tokio::fs::File::create(&dest_path)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
|
|
||||||
let mut stream = obj.body;
|
|
||||||
let mut downloaded: u64 = 0;
|
|
||||||
|
|
||||||
while let Some(chunk_result) = stream.next().await {
|
|
||||||
let chunk = chunk_result.map_err(|e| AssetError::S3Error(e.to_string()))?;
|
|
||||||
tokio::io::AsyncWriteExt::write_all(&mut file, &chunk)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
downloaded += chunk.len() as u64;
|
|
||||||
if let Some(ref p) = progress {
|
|
||||||
p(downloaded, Some(total_size));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
tokio::io::AsyncWriteExt::flush(&mut file)
|
|
||||||
.await
|
|
||||||
.map_err(|e| AssetError::IoError(e))?;
|
|
||||||
|
|
||||||
drop(file);
|
|
||||||
|
|
||||||
crate::verify_checksum(&dest_path, &asset.checksum, asset.checksum_algo.clone()).await?;
|
|
||||||
|
|
||||||
Ok(dest_path)
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn exists(&self, key: &str) -> Result<bool, AssetError> {
|
|
||||||
match self
|
|
||||||
.client
|
|
||||||
.head_object()
|
|
||||||
.bucket(&self.config.bucket)
|
|
||||||
.key(key)
|
|
||||||
.send()
|
|
||||||
.await
|
|
||||||
{
|
|
||||||
Ok(_) => Ok(true),
|
|
||||||
Err(e) => {
|
|
||||||
let err_str = e.to_string();
|
|
||||||
if err_str.contains("NoSuchKey") || err_str.contains("NotFound") {
|
|
||||||
Ok(false)
|
|
||||||
} else {
|
|
||||||
Err(AssetError::S3Error(err_str))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn url_for(&self, key: &str) -> Result<Url, AssetError> {
|
|
||||||
self.public_url(key)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn extract_s3_key(url: &Url, bucket: &str) -> Result<String, AssetError> {
|
|
||||||
let path = url.path().trim_start_matches('/');
|
|
||||||
if let Some(stripped) = path.strip_prefix(&format!("{}/", bucket)) {
|
|
||||||
Ok(stripped.to_string())
|
|
||||||
} else if path == bucket {
|
|
||||||
Ok(String::new())
|
|
||||||
} else {
|
|
||||||
Ok(path.to_string())
|
|
||||||
}
|
|
||||||
}
|
|
||||||
@@ -183,7 +183,7 @@ impl OpenbaoSecretStore {
|
|||||||
}
|
}
|
||||||
#[cfg(not(unix))]
|
#[cfg(not(unix))]
|
||||||
{
|
{
|
||||||
fs::write(path, token)?;
|
fs::write(path, serde_json::to_string(token)?.as_bytes())?;
|
||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user