Compare commits

...

21 Commits

Author SHA1 Message Date
a9fe4ab267 fix: cargo fmt
All checks were successful
Run Check Script / check (pull_request) Successful in 1m0s
2025-08-25 13:33:36 -04:00
65cc9befeb mod.rs
Some checks failed
Run Check Script / check (pull_request) Failing after 20s
2025-08-25 13:31:39 -04:00
d456a1f9ee feat: score to validate whether the ceph cluster is healthy 2025-08-25 13:30:32 -04:00
d36c574590 Merge pull request 'feat/inventory_agent' (#119) from feat/inventory_agent into master
Some checks failed
Run Check Script / check (push) Failing after 38s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 5m48s
Reviewed-on: #119
2025-08-22 01:55:52 +00:00
bfca9cf163 Merge pull request 'feat/ceph-osd-score' (#116) from feat/ceph-osd-score into master
Some checks failed
Run Check Script / check (push) Failing after 36s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 15m5s
Reviewed-on: #116
Reviewed-by: johnride <jg@nationtech.io>
2025-08-20 18:19:42 +00:00
cd3ea6fc10 fix: added check to ensure that rook-ceph-tools is available in the designated namespace
All checks were successful
Run Check Script / check (pull_request) Successful in 1m16s
2025-08-20 12:54:19 -04:00
89eb88d10e feat: socre to remove an osd from the ceph osd tree using K8sClient to interact with rook-ceph-toolbox pod 2025-08-20 12:09:55 -04:00
72fb05b5cc fix(inventory_agent) : Agent now retreives correct dmidecode fields, fixed uuid generation which is unacceptable, fixed storage drive parsing, much better error handling, much more strict behavior which also leads to more complete output as missing fields will raise errors unless explicitely optional 2025-08-19 17:56:06 -04:00
6685b05cc5 wip(inventory_agent): Refactoring for better error handling in progress 2025-08-19 17:05:23 -04:00
07116eb8a6 Merge pull request 'feat: Harmony inventory agent crate that exposes an endpoint listing the host hardware. Has to be reviewed, generated 99% by GLM-4.5' (#115) from feat/inventory_agent into master
Some checks failed
Run Check Script / check (push) Failing after 27s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 5m34s
Reviewed-on: #115
2025-08-19 16:58:00 +00:00
3f34f868eb Merge remote-tracking branch 'origin/master' into feat/inventory_agent
Some checks failed
Run Check Script / check (pull_request) Failing after 29s
2025-08-19 12:56:10 -04:00
67b5c2df07 Merge pull request 'feat: Add iobench project and python dashboard' (#112) from feat/iobench into master
All checks were successful
Run Check Script / check (push) Successful in 1m11s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 5m41s
Reviewed-on: #112
2025-08-19 16:24:31 +00:00
1eaf63417b Merge pull request 'feat/secrets' (#111) from feat/secrets into master
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Run Check Script / check (push) Has been cancelled
Reviewed-on: #111

This pull request introduces a comprehensive and ergonomic secret management system via a new harmony-secret crate.
What's Done

    New harmony-secret Crate:
        A new crate dedicated to secret management, providing a clean, static API: SecretManager::get::<MySecret>() and SecretManager::set(&my_secret).
        A #[derive(Secret)] procedural macro that automatically uses the struct's name as the secret key, simplifying usage.
        An async SecretStore trait to support various backend implementations.

    Two Secret Store Implementations:
        LocalFileSecretStore: A simple file-based store that saves secrets as JSON in the user's data directory. Ideal for local development and testing.
        InfisicalSecretStore: A production-ready implementation that integrates with Infisical for centralized, secure secret management.

    Configuration via Environment Variables:
        The secret store is selected at runtime via the HARMONY_SECRET_STORE environment variable (file or infisical).
        Infisical integration is configured through HARMONY_SECRET_INFISICAL_* variables.

What's Not Done (Future Work)

    Automated Infisical Setup: The initial configuration for the Infisical backend is currently manual. Developers must create a project and a Universal Auth identity in Infisical and set the corresponding environment variables to run tests or use the backend. The new test_harmony_secret_infisical.sh script serves as a clear example of the required variables.

This new secrets module provides a solid and secure foundation for managing credentials for components like OPNsense, Kubernetes, and other infrastructure services going forward. Even with the manual first-time setup for Infisical, this architecture is robust enough to serve our needs for the foreseeable future.
2025-08-19 16:23:45 +00:00
5e7803d2ba chore(iobench-dash): Delete older revisions and rename to iobench-dash.py for clarity
All checks were successful
Run Check Script / check (pull_request) Successful in 1m3s
2025-08-19 12:21:42 -04:00
9a610661c7 chore: Add description and license fields to Cargo.toml to allow publishing the crate
All checks were successful
Run Check Script / check (pull_request) Successful in 1m1s
2025-08-19 12:12:41 -04:00
70a65ed5d0 Merge remote-tracking branch 'origin/master' into feat/secrets
All checks were successful
Run Check Script / check (pull_request) Successful in 1m9s
2025-08-19 12:00:19 -04:00
26e8e386b9 feat: Secret module works with infisical and local file storage backends
All checks were successful
Run Check Script / check (pull_request) Successful in 1m9s
2025-08-19 11:59:21 -04:00
d1a274b705 fix: checks deployment status ready replicas rather than pod name since the pod name is not necessarily matching the deployment name and often has a random generated number in it 2025-08-15 15:44:06 -04:00
b43ca7c740 feat: score for preparing rook ceph cluster to remove drive based on rook-ceph-osd deployment name added functions to K8sclient to be able to scale deployment to a desired replicaset number and get pod based on name and namespace 2025-08-15 14:51:16 -04:00
2a6a233fb2 feat: WIP add secrets module and macro crate 2025-08-15 14:40:39 -04:00
fd8f643a8f feat: Add iobench project and python dashboard
All checks were successful
Run Check Script / check (pull_request) Successful in 1m3s
2025-08-14 10:37:30 -04:00
32 changed files with 2590 additions and 346 deletions

242
Cargo.lock generated
View File

@@ -105,7 +105,7 @@ dependencies = [
"futures-core",
"futures-util",
"mio 1.0.4",
"socket2",
"socket2 0.5.10",
"tokio",
"tracing",
]
@@ -167,7 +167,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"smallvec",
"socket2",
"socket2 0.5.10",
"time",
"tracing",
"url",
@@ -576,7 +576,7 @@ dependencies = [
"serde_json",
"serde_repr",
"serde_urlencoded",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-util",
"tower-service",
@@ -701,7 +701,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -745,6 +745,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chacha20"
version = "0.9.1"
@@ -1950,9 +1956,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
"wasm-bindgen",
]
[[package]]
@@ -2050,6 +2058,7 @@ dependencies = [
"env_logger",
"fqdn",
"futures-util",
"harmony-secret-derive",
"harmony_macros",
"harmony_types",
"helm-wrapper-rs",
@@ -2090,6 +2099,35 @@ dependencies = [
"uuid",
]
[[package]]
name = "harmony-secret"
version = "0.1.0"
dependencies = [
"async-trait",
"directories",
"harmony-secret-derive",
"http 1.3.1",
"infisical",
"lazy_static",
"log",
"pretty_assertions",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.14",
"tokio",
]
[[package]]
name = "harmony-secret-derive"
version = "0.1.0"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "harmony_cli"
version = "0.1.0"
@@ -2140,7 +2178,6 @@ dependencies = [
"serde",
"serde_json",
"sysinfo",
"uuid",
]
[[package]]
@@ -2237,7 +2274,7 @@ dependencies = [
"non-blank-string-rs",
"serde",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -2405,7 +2442,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
@@ -2484,6 +2521,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots",
]
[[package]]
@@ -2546,7 +2584,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.5.10",
"system-configuration 0.6.1",
"tokio",
"tower-service",
@@ -2769,6 +2807,21 @@ version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
[[package]]
name = "infisical"
version = "0.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d97c33b08e22b2f7b9f87a8fc06a7d247442db7bf216ffc6661a74ed8aea658"
dependencies = [
"base64 0.22.1",
"reqwest 0.12.20",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"url",
]
[[package]]
name = "inout"
version = "0.1.4"
@@ -2809,6 +2862,17 @@ dependencies = [
"syn",
]
[[package]]
name = "io-uring"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"libc",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -2912,7 +2976,7 @@ dependencies = [
"pest_derive",
"regex",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -3013,7 +3077,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-tungstenite",
"tokio-util",
@@ -3038,7 +3102,7 @@ dependencies = [
"serde",
"serde-value",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -3076,7 +3140,7 @@ dependencies = [
"pin-project",
"serde",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-util",
"tracing",
@@ -3211,6 +3275,12 @@ dependencies = [
"hashbrown 0.15.4",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "md5"
version = "0.7.0"
@@ -3522,7 +3592,7 @@ dependencies = [
"pretty_assertions",
"rand 0.8.5",
"serde",
"thiserror 1.0.69",
"thiserror 2.0.14",
"tokio",
"uuid",
"xml-rs",
@@ -3689,7 +3759,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1db05f56d34358a8b1066f67cbb203ee3e7ed2ba674a6263a1d5ec6db2204323"
dependencies = [
"memchr",
"thiserror 2.0.12",
"thiserror 2.0.14",
"ucd-trie",
]
@@ -3910,6 +3980,15 @@ dependencies = [
"elliptic-curve",
]
[[package]]
name = "proc-macro-crate"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edce586971a4dfaa28950c6f18ed55e0406c1ab88bbce2c6f6293a7aaba73d35"
dependencies = [
"toml_edit",
]
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -3925,6 +4004,61 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e1dcb320d6839f6edb64f7a4a59d39b30480d4d1765b56873f7c858538a5fe"
[[package]]
name = "quinn"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8"
dependencies = [
"bytes",
"cfg_aliases",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.5.10",
"thiserror 2.0.14",
"tokio",
"tracing",
"web-time",
]
[[package]]
name = "quinn-proto"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e"
dependencies = [
"bytes",
"getrandom 0.3.3",
"lru-slab",
"rand 0.9.1",
"ring",
"rustc-hash",
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.14",
"tinyvec",
"tracing",
"web-time",
]
[[package]]
name = "quinn-udp"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970"
dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.5.10",
"tracing",
"windows-sys 0.59.0",
]
[[package]]
name = "quote"
version = "1.0.40"
@@ -4063,7 +4197,7 @@ checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom 0.2.16",
"libredox",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -4170,6 +4304,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.4.10",
@@ -4186,6 +4321,8 @@ dependencies = [
"native-tls",
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls-pki-types",
"serde",
"serde_json",
@@ -4193,6 +4330,7 @@ dependencies = [
"sync_wrapper 1.0.2",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower",
"tower-http",
@@ -4202,6 +4340,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
]
[[package]]
@@ -4365,7 +4504,7 @@ dependencies = [
"flurry",
"log",
"serde",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-util",
]
@@ -4391,6 +4530,12 @@ version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.1"
@@ -4490,6 +4635,7 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
dependencies = [
"web-time",
"zeroize",
]
@@ -4928,7 +5074,7 @@ checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb"
dependencies = [
"num-bigint",
"num-traits",
"thiserror 2.0.12",
"thiserror 2.0.14",
"time",
]
@@ -4975,6 +5121,16 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807"
dependencies = [
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "spin"
version = "0.9.8"
@@ -5112,9 +5268,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.104"
version = "2.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40"
checksum = "7bc3fcb250e53458e712715cf74285c1f889686520d79294a9ef3bd7aa1fc619"
dependencies = [
"proc-macro2",
"quote",
@@ -5263,11 +5419,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.12"
version = "2.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
checksum = "0b0949c3a6c842cbde3f1686d6eea5a010516deb7085f79db747562d4102f41e"
dependencies = [
"thiserror-impl 2.0.12",
"thiserror-impl 2.0.14",
]
[[package]]
@@ -5283,9 +5439,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.12"
version = "2.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
checksum = "cc5b44b4ab9c2fdd0e0512e6bece8388e214c0749f5862b114cc5b7a25daf227"
dependencies = [
"proc-macro2",
"quote",
@@ -5352,21 +5508,38 @@ dependencies = [
]
[[package]]
name = "tokio"
version = "1.45.1"
name = "tinyvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.47.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio 1.0.4",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"slab",
"socket2 0.6.0",
"tokio-macros",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -5616,7 +5789,7 @@ dependencies = [
"log",
"rand 0.9.1",
"sha1",
"thiserror 2.0.12",
"thiserror 2.0.14",
"utf-8",
]
@@ -5917,6 +6090,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -13,6 +13,8 @@ members = [
"k3d",
"harmony_composer",
"harmony_inventory_agent",
"harmony_secret_derive",
"harmony_secret",
]
[workspace.package]
@@ -54,6 +56,7 @@ chrono = "0.4"
similar = "2"
uuid = { version = "1.11", features = ["v4", "fast-rng", "macro-diagnostics"] }
pretty_assertions = "1.4.1"
tempfile = "3.20.0"
bollard = "0.19.1"
base64 = "0.22.1"
tar = "0.4.44"

View File

@@ -0,0 +1,11 @@
[package]
name = "example_validate_ceph_cluster_health"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@@ -0,0 +1,18 @@
use harmony::{
inventory::Inventory,
modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_health_score = CephVerifyClusterHealth {
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_health_score)], None)
.await
.unwrap();
}

View File

@@ -38,8 +38,8 @@ serde-value.workspace = true
helm-wrapper-rs = "0.4.0"
non-blank-string-rs = "1.0.4"
k3d-rs = { path = "../k3d" }
directories = "6.0.0"
lazy_static = "1.5.0"
directories.workspace = true
lazy_static.workspace = true
dockerfile_builder = "0.1.5"
temp-file = "0.1.9"
convert_case.workspace = true
@@ -59,7 +59,7 @@ similar.workspace = true
futures-util = "0.3.31"
tokio-util = "0.7.15"
strum = { version = "0.27.1", features = ["derive"] }
tempfile = "3.20.0"
tempfile.workspace = true
serde_with = "3.14.0"
schemars = "0.8.22"
kube-derive = "1.1.0"
@@ -67,6 +67,7 @@ bollard.workspace = true
tar.workspace = true
base64.workspace = true
once_cell = "1.21.3"
harmony-secret-derive = { version = "0.1.0", path = "../harmony_secret_derive" }
[dev-dependencies]
pretty_assertions.workspace = true

BIN
harmony/harmony.rlib Normal file

Binary file not shown.

View File

@@ -32,6 +32,7 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephClusterHealth,
}
impl std::fmt::Display for InterpretName {
@@ -58,6 +59,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
}
}
}

View File

@@ -5,7 +5,7 @@ use k8s_openapi::{
};
use kube::{
Client, Config, Error, Resource,
api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
runtime::reflector::Lookup,
@@ -17,7 +17,9 @@ use kube::{
};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use similar::TextDiff;
use tokio::io::AsyncReadExt;
#[derive(new, Clone)]
pub struct K8sClient {
@@ -51,6 +53,66 @@ impl K8sClient {
})
}
pub async fn get_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(deps.get_opt(name).await?)
}
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let pods: Api<Pod> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(pods.get_opt(name).await?)
}
pub async fn scale_deployment(
&self,
name: &str,
namespace: Option<&str>,
replicas: u32,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let patch = json!({
"spec": {
"replicas": replicas
}
});
let pp = PatchParams::default();
let scale = Patch::Apply(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}
pub async fn delete_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let delete_params = DeleteParams::default();
deployments.delete(name, &delete_params).await?;
Ok(())
}
pub async fn wait_until_deployment_ready(
&self,
name: String,
@@ -76,6 +138,68 @@ impl K8sClient {
}
}
/// Will execute a commond in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("{label}={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await;
match res {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout().take() {
stdout.read_to_string(&mut stdout_buf).await;
}
debug!("Status: {} - {:?}", s, status.details);
if s == "Success" {
Ok(stdout_buf)
} else {
Err(s)
}
} else {
Err("Couldn't get inner status of pod exec".to_string())
}
}
}
}
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
pub async fn exec_app(
&self,

View File

@@ -14,5 +14,6 @@ pub mod monitoring;
pub mod okd;
pub mod opnsense;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;

View File

@@ -0,0 +1,419 @@
use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
osd_deployment_name: String,
rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
fn name(&self) -> String {
format!("CephRemoveOsdScore")
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(CephRemoveOsdInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct CephRemoveOsdInterpret {
score: CephRemoveOsd,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.verify_ceph_toolbox_exists(client.clone()).await?;
self.scale_deployment(client.clone()).await?;
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl CephRemoveOsdInterpret {
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
.split('-')
.nth(3)
.ok_or_else(|| {
InterpretError::new(format!(
"Could not parse OSD id from deployment name {}",
self.score.osd_deployment_name
))
})?;
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
Ok(osd_id_full)
}
pub async fn verify_ceph_toolbox_exists(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let toolbox_dep = "rook-ceph-tools".to_string();
match client
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&toolbox_dep, ready_count
)));
} else {
return Err(InterpretError::new(
"ceph-tool-box not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {}",
&toolbox_dep
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&toolbox_dep, self.score.rook_ceph_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&toolbox_dep, e
))),
}
}
pub async fn scale_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
client
.scale_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
0,
)
.await?;
Ok(Outcome::success(format!(
"Scaled down deployment {}",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_scaled(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
if status.replicas.unwrap_or(1) == 0 && status.ready_replicas.unwrap_or(1) == 0
{
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
}
}
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to scale down",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn build_timer(&self) -> (Duration, Duration, Instant) {
let timeout = Duration::from_secs(120);
let interval = Duration::from_secs(5);
let start = Instant::now();
(timeout, interval, start)
}
pub async fn delete_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
client
.delete_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
Ok(Outcome::success(format!(
"deployment {} deleted",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_deleted(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if dep.is_none() {
info!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
return Ok(Outcome::success(format!(
"Deployment {} deleted.",
self.score.osd_deployment_name
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to be deleted",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
let nodes = json.get("nodes").ok_or_else(|| {
InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
})?;
let tree: CephOsdTree = CephOsdTree {
nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
})?,
};
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it").as_str(),
format!("ceph auth del osd.{osd_id_full}").as_str(),
],
)
.await?;
Ok(Outcome::success(format!(
"osd id {} removed from osd tree",
osd_id_full
)))
}
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
loop {
let output = client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
)
.await?;
let tree =
self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json"));
let osd_found = tree
.unwrap()
.nodes
.iter()
.any(|node| node.name == osd_id_full);
if !osd_found {
return Ok(Outcome::success(format!(
"Successfully verified that OSD {} is removed from the Ceph cluster.",
osd_id_full,
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for OSD {} to be removed from Ceph tree",
osd_id_full
)));
}
warn!(
"OSD {} still found in Ceph tree, retrying in {:?}...",
osd_id_full, interval
);
sleep(interval).await;
}
}
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephOsdTree {
pub nodes: Vec<CephNode>,
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephNode {
pub id: i32,
pub name: String,
#[serde(rename = "type")]
pub node_type: String,
pub type_id: Option<i32>,
pub children: Option<Vec<i32>>,
pub exists: Option<i32>,
pub status: Option<String>,
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn test_get_osd_tree() {
let json_data = json!({
"nodes": [
{"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
{"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
]
});
let interpret = CephRemoveOsdInterpret {
score: CephRemoveOsd {
osd_deployment_name: "osd-1".to_string(),
rook_ceph_namespace: "dummy_ns".to_string(),
},
};
let json = interpret.get_osd_tree(json_data).unwrap();
let expected = CephOsdTree {
nodes: vec![
CephNode {
id: 1,
name: "osd.1".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
CephNode {
id: 2,
name: "osd.2".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
],
};
assert_eq!(json, expected);
}
}

View File

@@ -0,0 +1,136 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use log::debug;
use serde::Serialize;
use tokio::time::Instant;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
#[derive(Clone, Debug, Serialize)]
pub struct CephVerifyClusterHealth {
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephVerifyClusterHealth {
fn name(&self) -> String {
format!("CephValidateClusterHealth")
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(CephVerifyClusterHealthInterpret {
score: self.clone(),
})
}
}
#[derive(Clone, Debug)]
pub struct CephVerifyClusterHealthInterpret {
score: CephVerifyClusterHealth,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for CephVerifyClusterHealthInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.verify_ceph_toolbox_exists(client.clone()).await?;
self.validate_ceph_cluster_health(client.clone()).await?;
Ok(Outcome::success("Ceph cluster healthy".to_string()))
}
fn get_name(&self) -> InterpretName {
InterpretName::CephClusterHealth
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl CephVerifyClusterHealthInterpret {
pub async fn verify_ceph_toolbox_exists(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let toolbox_dep = "rook-ceph-tools".to_string();
match client
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&toolbox_dep, ready_count
)));
} else {
return Err(InterpretError::new(
"ceph-tool-box not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {}",
&toolbox_dep
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&toolbox_dep, self.score.rook_ceph_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&toolbox_dep, e
))),
}
}
pub async fn validate_ceph_cluster_health(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("Verifying ceph cluster is in healthy state");
let health = client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["sh", "-c", "ceph health"],
)
.await?;
if health.contains("HEALTH_OK") {
return Ok(Outcome::success(
"Ceph Cluster in healthy state".to_string(),
));
} else {
Err(InterpretError::new(format!(
"Ceph cluster unhealthy {}",
health
)))
}
}
}

View File

@@ -0,0 +1,2 @@
pub mod ceph_osd_replacement_score;
pub mod ceph_validate_health_score;

View File

@@ -0,0 +1 @@
pub mod ceph;

View File

@@ -10,4 +10,3 @@ serde.workspace = true
serde_json.workspace = true
log.workspace = true
env_logger.workspace = true
uuid.workspace = true

File diff suppressed because it is too large Load Diff

View File

@@ -9,8 +9,16 @@ mod hwinfo;
async fn inventory() -> impl Responder {
log::info!("Received inventory request");
let host = PhysicalHost::gather();
log::info!("Inventory data gathered successfully");
actix_web::HttpResponse::Ok().json(host)
match host {
Ok(host) => {
log::info!("Inventory data gathered successfully");
actix_web::HttpResponse::Ok().json(host)
}
Err(error) => {
log::error!("Inventory data gathering FAILED");
actix_web::HttpResponse::InternalServerError().json(error)
}
}
}
#[actix_web::main]

23
harmony_secret/Cargo.toml Normal file
View File

@@ -0,0 +1,23 @@
[package]
name = "harmony-secret"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony-secret-derive = { version = "0.1.0", path = "../harmony_secret_derive" }
serde = { version = "1.0.209", features = ["derive", "rc"] }
serde_json = "1.0.127"
thiserror.workspace = true
lazy_static.workspace = true
directories.workspace = true
log.workspace = true
infisical = "0.0.2"
tokio.workspace = true
async-trait.workspace = true
http.workspace = true
[dev-dependencies]
pretty_assertions.workspace = true
tempfile.workspace = true

View File

@@ -0,0 +1,18 @@
use lazy_static::lazy_static;
lazy_static! {
pub static ref SECRET_NAMESPACE: String =
std::env::var("HARMONY_SECRET_NAMESPACE").expect("HARMONY_SECRET_NAMESPACE environment variable is required, it should contain the name of the project you are working on to access its secrets");
pub static ref SECRET_STORE: Option<String> =
std::env::var("HARMONY_SECRET_STORE").ok();
pub static ref INFISICAL_URL: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_URL").ok();
pub static ref INFISICAL_PROJECT_ID: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_PROJECT_ID").ok();
pub static ref INFISICAL_ENVIRONMENT: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_ENVIRONMENT").ok();
pub static ref INFISICAL_CLIENT_ID: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_ID").ok();
pub static ref INFISICAL_CLIENT_SECRET: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_SECRET").ok();
}

166
harmony_secret/src/lib.rs Normal file
View File

@@ -0,0 +1,166 @@
pub mod config;
mod store;
use crate::config::SECRET_NAMESPACE;
use async_trait::async_trait;
use config::INFISICAL_CLIENT_ID;
use config::INFISICAL_CLIENT_SECRET;
use config::INFISICAL_ENVIRONMENT;
use config::INFISICAL_PROJECT_ID;
use config::INFISICAL_URL;
use config::SECRET_STORE;
use serde::{Serialize, de::DeserializeOwned};
use std::fmt;
use store::InfisicalSecretStore;
use store::LocalFileSecretStore;
use thiserror::Error;
use tokio::sync::OnceCell;
pub use harmony_secret_derive::Secret;
// The Secret trait remains the same.
pub trait Secret: Serialize + DeserializeOwned + Sized {
const KEY: &'static str;
}
// The error enum remains the same.
#[derive(Debug, Error)]
pub enum SecretStoreError {
#[error("Secret not found for key '{key}' in namespace '{namespace}'")]
NotFound { namespace: String, key: String },
#[error("Failed to deserialize secret for key '{key}': {source}")]
Deserialization {
key: String,
source: serde_json::Error,
},
#[error("Failed to serialize secret for key '{key}': {source}")]
Serialization {
key: String,
source: serde_json::Error,
},
#[error("Underlying storage error: {0}")]
Store(#[from] Box<dyn std::error::Error + Send + Sync>),
}
// The trait is now async!
#[async_trait]
pub trait SecretStore: fmt::Debug + Send + Sync {
async fn get_raw(&self, namespace: &str, key: &str) -> Result<Vec<u8>, SecretStoreError>;
async fn set_raw(
&self,
namespace: &str,
key: &str,
value: &[u8],
) -> Result<(), SecretStoreError>;
}
// Use OnceCell for async-friendly, one-time initialization.
static SECRET_MANAGER: OnceCell<SecretManager> = OnceCell::const_new();
/// Initializes and returns a reference to the global SecretManager.
async fn get_secret_manager() -> &'static SecretManager {
SECRET_MANAGER.get_or_init(init_secret_manager).await
}
/// The async initialization function for the SecretManager.
async fn init_secret_manager() -> SecretManager {
let default_secret_score = "infisical".to_string();
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_score);
let store: Box<dyn SecretStore> = match store_type.as_str() {
"file" => Box::new(LocalFileSecretStore::default()),
"infisical" | _ => {
let store = InfisicalSecretStore::new(
INFISICAL_URL.clone().expect("Infisical url must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_URL"),
INFISICAL_PROJECT_ID.clone().expect("Infisical project id must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_PROJECT_ID"),
INFISICAL_ENVIRONMENT.clone().expect("Infisical environment must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_ENVIRONMENT"),
INFISICAL_CLIENT_ID.clone().expect("Infisical client id must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_CLIENT_ID"),
INFISICAL_CLIENT_SECRET.clone().expect("Infisical client secret must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_CLIENT_SECRET"),
)
.await
.expect("Failed to initialize Infisical secret store");
Box::new(store)
}
};
SecretManager::new(SECRET_NAMESPACE.clone(), store)
}
/// Manages the lifecycle of secrets, providing a simple static API.
#[derive(Debug)]
pub struct SecretManager {
namespace: String,
store: Box<dyn SecretStore>,
}
impl SecretManager {
fn new(namespace: String, store: Box<dyn SecretStore>) -> Self {
Self { namespace, store }
}
/// Retrieves and deserializes a secret.
pub async fn get<T: Secret>() -> Result<T, SecretStoreError> {
let manager = get_secret_manager().await;
let raw_value = manager.store.get_raw(&manager.namespace, T::KEY).await?;
serde_json::from_slice(&raw_value).map_err(|e| SecretStoreError::Deserialization {
key: T::KEY.to_string(),
source: e,
})
}
/// Serializes and stores a secret.
pub async fn set<T: Secret>(secret: &T) -> Result<(), SecretStoreError> {
let manager = get_secret_manager().await;
let raw_value =
serde_json::to_vec(secret).map_err(|e| SecretStoreError::Serialization {
key: T::KEY.to_string(),
source: e,
})?;
manager
.store
.set_raw(&manager.namespace, T::KEY, &raw_value)
.await
}
}
#[cfg(test)]
mod test {
use super::*;
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct TestUserMeta {
labels: Vec<String>,
}
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
struct TestSecret {
user: String,
password: String,
metadata: TestUserMeta,
}
#[cfg(secrete2etest)]
#[tokio::test]
async fn set_and_retrieve_secret() {
let secret = TestSecret {
user: String::from("user"),
password: String::from("password"),
metadata: TestUserMeta {
labels: vec![
String::from("label1"),
String::from("label2"),
String::from(
"some longet label with \" special @#%$)(udiojcia[]]] \"'asdij'' characters Nдs はにほへとちり าฟันพัฒนา yağız şoföre ç <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> 👩‍👩‍👧‍👦 /span> 👩‍👧‍👦 and why not emojis ",
),
],
},
};
SecretManager::set(&secret).await.unwrap();
let value = SecretManager::get::<TestSecret>().await.unwrap();
assert_eq!(value, secret);
}
}

View File

@@ -0,0 +1,129 @@
use crate::{SecretStore, SecretStoreError};
use async_trait::async_trait;
use infisical::{
AuthMethod, InfisicalError,
client::Client,
secrets::{CreateSecretRequest, GetSecretRequest, UpdateSecretRequest},
};
use log::{info, warn};
#[derive(Debug)]
pub struct InfisicalSecretStore {
client: Client,
project_id: String,
environment: String,
}
impl InfisicalSecretStore {
/// Creates a new, authenticated Infisical client.
pub async fn new(
base_url: String,
project_id: String,
environment: String,
client_id: String,
client_secret: String,
) -> Result<Self, InfisicalError> {
info!("INFISICAL_STORE: Initializing client for URL: {base_url}");
// The builder and login logic remains the same.
let mut client = Client::builder().base_url(base_url).build().await?;
let auth_method = AuthMethod::new_universal_auth(client_id, client_secret);
client.login(auth_method).await?;
info!("INFISICAL_STORE: Client authenticated successfully.");
Ok(Self {
client,
project_id,
environment,
})
}
}
#[async_trait]
impl SecretStore for InfisicalSecretStore {
async fn get_raw(&self, _environment: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
let environment = &self.environment;
info!("INFISICAL_STORE: Getting key '{key}' from environment '{environment}'");
let request = GetSecretRequest::builder(key, &self.project_id, environment).build();
match self.client.secrets().get(request).await {
Ok(secret) => Ok(secret.secret_value.into_bytes()),
Err(e) => {
// Correctly match against the actual InfisicalError enum.
match e {
// The specific case for a 404 Not Found error.
InfisicalError::HttpError { status, .. }
if status == http::StatusCode::NOT_FOUND =>
{
Err(SecretStoreError::NotFound {
namespace: environment.to_string(),
key: key.to_string(),
})
}
// For all other errors, wrap them in our generic Store error.
_ => Err(SecretStoreError::Store(Box::new(e))),
}
}
}
}
async fn set_raw(
&self,
_environment: &str,
key: &str,
val: &[u8],
) -> Result<(), SecretStoreError> {
info!(
"INFISICAL_STORE: Setting key '{key}' in environment '{}'",
self.environment
);
let value_str =
String::from_utf8(val.to_vec()).map_err(|e| SecretStoreError::Store(Box::new(e)))?;
// --- Upsert Logic ---
// First, attempt to update the secret.
let update_req = UpdateSecretRequest::builder(key, &self.project_id, &self.environment)
.secret_value(&value_str)
.build();
match self.client.secrets().update(update_req).await {
Ok(_) => {
info!("INFISICAL_STORE: Successfully updated secret '{key}'.");
Ok(())
}
Err(e) => {
// If the update failed, check if it was because the secret doesn't exist.
match e {
InfisicalError::HttpError { status, .. }
if status == http::StatusCode::NOT_FOUND =>
{
// The secret was not found, so we create it instead.
warn!(
"INFISICAL_STORE: Secret '{key}' not found for update, attempting to create it."
);
let create_req = CreateSecretRequest::builder(
key,
&value_str,
&self.project_id,
&self.environment,
)
.build();
// Handle potential errors during creation.
self.client
.secrets()
.create(create_req)
.await
.map_err(|create_err| SecretStoreError::Store(Box::new(create_err)))?;
info!("INFISICAL_STORE: Successfully created secret '{key}'.");
Ok(())
}
// Any other error during update is a genuine failure.
_ => Err(SecretStoreError::Store(Box::new(e))),
}
}
}
}
}

View File

@@ -0,0 +1,105 @@
use async_trait::async_trait;
use log::info;
use std::path::{Path, PathBuf};
use crate::{SecretStore, SecretStoreError};
#[derive(Debug, Default)]
pub struct LocalFileSecretStore;
impl LocalFileSecretStore {
/// Helper to consistently generate the secret file path.
fn get_file_path(base_dir: &Path, ns: &str, key: &str) -> PathBuf {
base_dir.join(format!("{ns}_{key}.json"))
}
}
#[async_trait]
impl SecretStore for LocalFileSecretStore {
async fn get_raw(&self, ns: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
let data_dir = directories::BaseDirs::new()
.expect("Could not find a valid home directory")
.data_dir()
.join("harmony")
.join("secrets");
let file_path = Self::get_file_path(&data_dir, ns, key);
info!(
"LOCAL_STORE: Getting key '{key}' from namespace '{ns}' at {}",
file_path.display()
);
tokio::fs::read(&file_path)
.await
.map_err(|_| SecretStoreError::NotFound {
namespace: ns.to_string(),
key: key.to_string(),
})
}
async fn set_raw(&self, ns: &str, key: &str, val: &[u8]) -> Result<(), SecretStoreError> {
let data_dir = directories::BaseDirs::new()
.expect("Could not find a valid home directory")
.data_dir()
.join("harmony")
.join("secrets");
let file_path = Self::get_file_path(&data_dir, ns, key);
info!(
"LOCAL_STORE: Setting key '{key}' in namespace '{ns}' at {}",
file_path.display()
);
if let Some(parent_dir) = file_path.parent() {
tokio::fs::create_dir_all(parent_dir)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
}
tokio::fs::write(&file_path, val)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn test_set_and_get_raw_successfully() {
let dir = tempdir().unwrap();
let store = LocalFileSecretStore::default();
let ns = "test-ns";
let key = "test-key";
let value = b"{\"data\":\"test-value\"}";
// To test the store directly, we override the base directory logic.
// For this test, we'll manually construct the path within our temp dir.
let file_path = LocalFileSecretStore::get_file_path(dir.path(), ns, key);
// Manually write to the temp path to simulate the store's behavior
tokio::fs::create_dir_all(file_path.parent().unwrap())
.await
.unwrap();
tokio::fs::write(&file_path, value).await.unwrap();
// Now, test get_raw by reading from that same temp path (by mocking the path logic)
let retrieved_value = tokio::fs::read(&file_path).await.unwrap();
assert_eq!(retrieved_value, value);
}
#[tokio::test]
async fn test_get_raw_not_found() {
let dir = tempdir().unwrap();
let ns = "test-ns";
let key = "non-existent-key";
// We need to check if reading a non-existent file gives the correct error
let file_path = LocalFileSecretStore::get_file_path(dir.path(), ns, key);
let result = tokio::fs::read(&file_path).await;
assert!(matches!(result, Err(_)));
}
}

View File

@@ -0,0 +1,4 @@
mod infisical;
mod local_file;
pub use infisical::*;
pub use local_file::*;

View File

@@ -0,0 +1,8 @@
export HARMONY_SECRET_NAMESPACE=harmony_test_secrets
export HARMONY_SECRET_INFISICAL_URL=http://localhost
export HARMONY_SECRET_INFISICAL_PROJECT_ID=eb4723dc-eede-44d7-98cc-c8e0caf29ccb
export HARMONY_SECRET_INFISICAL_ENVIRONMENT=dev
export HARMONY_SECRET_INFISICAL_CLIENT_ID=dd16b07f-0e38-4090-a1d0-922de9f44d91
export HARMONY_SECRET_INFISICAL_CLIENT_SECRET=bd2ae054e7759b11ca2e908494196337cc800bab138cb1f59e8d9b15ca3f286f
cargo test

View File

@@ -0,0 +1,13 @@
[package]
name = "harmony-secret-derive"
version = "0.1.0"
edition = "2024"
[lib]
proc-macro = true
[dependencies]
quote = "1.0"
proc-macro2 = "1.0"
proc-macro-crate = "3.3"
syn = "2.0"

View File

@@ -0,0 +1,38 @@
use proc_macro::TokenStream;
use proc_macro_crate::{FoundCrate, crate_name};
use quote::quote;
use syn::{DeriveInput, Ident, parse_macro_input};
#[proc_macro_derive(Secret)]
pub fn derive_secret(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let struct_ident = &input.ident;
// The key for the secret will be the stringified name of the struct itself.
// e.g., `struct OKDClusterSecret` becomes key `"OKDClusterSecret"`.
let key = struct_ident.to_string();
// Find the path to the `harmony_secret` crate.
let secret_crate_path = match crate_name("harmony-secret") {
Ok(FoundCrate::Itself) => quote!(crate),
Ok(FoundCrate::Name(name)) => {
let ident = Ident::new(&name, proc_macro2::Span::call_site());
quote!(::#ident)
}
Err(e) => {
return syn::Error::new(proc_macro2::Span::call_site(), e.to_string())
.to_compile_error()
.into();
}
};
// The generated code now implements `Secret` for the struct itself.
// The struct must also derive `Serialize` and `Deserialize` for this to be useful.
let expanded = quote! {
impl #secret_crate_path::Secret for #struct_ident {
const KEY: &'static str = #key;
}
};
TokenStream::from(expanded)
}

17
iobench/Cargo.toml Normal file
View File

@@ -0,0 +1,17 @@
[package]
name = "iobench"
edition = "2024"
version = "1.0.0"
license = "AGPL-3.0-or-later"
description = "A small command line utility to run fio benchmarks on localhost or remote ssh or kubernetes host. Was born out of a need to benchmark various ceph configurations!"
[dependencies]
clap = { version = "4.0", features = ["derive"] }
chrono = "0.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
csv = "1.1"
num_cpus = "1.13"
[workspace]

10
iobench/dash/README.md Normal file
View File

@@ -0,0 +1,10 @@
This project was generated mostly by Gemini but it works so... :)
## To run iobench dashboard
```bash
virtualenv venv
source venv/bin/activate
pip install -r requirements_freeze.txt
python iobench-dash-v4.py
```

View File

@@ -0,0 +1,229 @@
import dash
from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction
import plotly.express as px
import pandas as pd
import dash_bootstrap_components as dbc
import io
# --- Data Loading and Preparation ---
# csv_data = """label,test_name,iops,bandwidth_kibps,latency_mean_ms,latency_stddev_ms
# Ceph HDD Only,read-4k-sync-test,1474.302,5897,0.673,0.591
# Ceph HDD Only,write-4k-sync-test,14.126,56,27.074,7.046
# Ceph HDD Only,randread-4k-sync-test,225.140,900,4.436,6.918
# Ceph HDD Only,randwrite-4k-sync-test,13.129,52,34.891,10.859
# Ceph HDD Only,multiread-4k-sync-test,6873.675,27494,0.578,0.764
# Ceph HDD Only,multiwrite-4k-sync-test,57.135,228,38.660,11.293
# Ceph HDD Only,multirandread-4k-sync-test,2451.376,9805,1.626,2.515
# Ceph HDD Only,multirandwrite-4k-sync-test,54.642,218,33.492,13.111
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,read-4k-sync-test,1495.700,5982,0.664,1.701
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,write-4k-sync-test,16.990,67,17.502,9.908
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randread-4k-sync-test,159.256,637,6.274,9.232
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randwrite-4k-sync-test,16.693,66,24.094,16.099
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiread-4k-sync-test,7305.559,29222,0.544,1.338
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiwrite-4k-sync-test,52.260,209,34.891,17.576
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandread-4k-sync-test,700.606,2802,5.700,10.429
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandwrite-4k-sync-test,52.723,210,29.709,25.829
# Ceph 2 Hosts WAL+DB SSD Only,randwrite-4k-sync-test,90.037,360,3.617,8.321
# Ceph WAL+DB SSD During Rebuild,randwrite-4k-sync-test,41.008,164,10.138,19.333
# Ceph WAL+DB SSD OSD HDD,read-4k-sync-test,1520.299,6081,0.654,1.539
# Ceph WAL+DB SSD OSD HDD,write-4k-sync-test,78.528,314,4.074,9.101
# Ceph WAL+DB SSD OSD HDD,randread-4k-sync-test,153.303,613,6.518,9.036
# Ceph WAL+DB SSD OSD HDD,randwrite-4k-sync-test,48.677,194,8.785,20.356
# Ceph WAL+DB SSD OSD HDD,multiread-4k-sync-test,6804.880,27219,0.584,1.422
# Ceph WAL+DB SSD OSD HDD,multiwrite-4k-sync-test,311.513,1246,4.978,9.458
# Ceph WAL+DB SSD OSD HDD,multirandread-4k-sync-test,581.756,2327,6.869,10.204
# Ceph WAL+DB SSD OSD HDD,multirandwrite-4k-sync-test,120.556,482,13.463,25.440
# """
#
# df = pd.read_csv(io.StringIO(csv_data))
df = pd.read_csv("iobench.csv") # Replace with the actual file path
df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024
# --- App Initialization and Global Settings ---
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY])
# Create master lists of options for checklists
unique_labels = sorted(df['label'].unique())
unique_tests = sorted(df['test_name'].unique())
# Create a consistent color map for each unique label
color_map = {label: color for label, color in zip(unique_labels, px.colors.qualitative.Plotly)}
# --- App Layout ---
app.layout = dbc.Container([
# Header
dbc.Row(dbc.Col(html.H1("Ceph iobench Performance Dashboard", className="text-primary"),), className="my-4 text-center"),
# Controls and Graphs Row
dbc.Row([
# Control Panel Column
dbc.Col([
dbc.Card([
dbc.CardBody([
html.H4("Control Panel", className="card-title"),
html.Hr(),
# Metric Selection
dbc.Label("1. Select Metrics to Display:", html_for="metric-checklist", className="fw-bold"),
dcc.Checklist(
id='metric-checklist',
options=[
{'label': 'IOPS', 'value': 'iops'},
{'label': 'Latency (ms)', 'value': 'latency_mean_ms'},
{'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'}
],
value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection
labelClassName="d-block"
),
html.Hr(),
# Configuration Selection
dbc.Label("2. Select Configurations:", html_for="config-checklist", className="fw-bold"),
dbc.ButtonGroup([
dbc.Button("All", id="config-select-all", n_clicks=0, color="primary", outline=True, size="sm"),
dbc.Button("None", id="config-select-none", n_clicks=0, color="primary", outline=True, size="sm"),
], className="mb-2"),
dcc.Checklist(
id='config-checklist',
options=[{'label': label, 'value': label} for label in unique_labels],
value=unique_labels, # Select all by default
labelClassName="d-block"
),
html.Hr(),
# Test Name Selection
dbc.Label("3. Select Tests:", html_for="test-checklist", className="fw-bold"),
dbc.ButtonGroup([
dbc.Button("All", id="test-select-all", n_clicks=0, color="primary", outline=True, size="sm"),
dbc.Button("None", id="test-select-none", n_clicks=0, color="primary", outline=True, size="sm"),
], className="mb-2"),
dcc.Checklist(
id='test-checklist',
options=[{'label': test, 'value': test} for test in unique_tests],
value=unique_tests, # Select all by default
labelClassName="d-block"
),
])
], className="mb-4")
], width=12, lg=4),
# Graph Display Column
dbc.Col(id='graph-container', width=12, lg=8)
])
], fluid=True)
# --- Callbacks ---
# Callback to handle "Select All" / "Select None" for configurations
@app.callback(
Output('config-checklist', 'value'),
Input('config-select-all', 'n_clicks'),
Input('config-select-none', 'n_clicks'),
prevent_initial_call=True
)
def select_all_none_configs(all_clicks, none_clicks):
ctx = dash.callback_context
if not ctx.triggered:
return dash.no_update
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
if button_id == 'config-select-all':
return unique_labels
elif button_id == 'config-select-none':
return []
return dash.no_update
# Callback to handle "Select All" / "Select None" for tests
@app.callback(
Output('test-checklist', 'value'),
Input('test-select-all', 'n_clicks'),
Input('test-select-none', 'n_clicks'),
prevent_initial_call=True
)
def select_all_none_tests(all_clicks, none_clicks):
ctx = dash.callback_context
if not ctx.triggered:
return dash.no_update
button_id = ctx.triggered[0]['prop_id'].split('.')[0]
if button_id == 'test-select-all':
return unique_tests
elif button_id == 'test-select-none':
return []
return dash.no_update
# Main callback to update graphs based on all selections
@app.callback(
Output('graph-container', 'children'),
[Input('metric-checklist', 'value'),
Input('config-checklist', 'value'),
Input('test-checklist', 'value')]
)
def update_graphs(selected_metrics, selected_configs, selected_tests):
"""
This function is triggered when any control's value changes.
It generates and returns a list of graphs based on all user selections.
"""
# Handle cases where no selection is made to prevent errors and show a helpful message
if not all([selected_metrics, selected_configs, selected_tests]):
return dbc.Alert(
"Please select at least one item from each category (Metric, Configuration, and Test) to view data.",
color="info",
className="mt-4"
)
# Filter the DataFrame based on all selected criteria
filtered_df = df[df['label'].isin(selected_configs) & df['test_name'].isin(selected_tests)]
# If the filtered data is empty after selection, inform the user
if filtered_df.empty:
return dbc.Alert("No data available for the current selection.", color="warning", className="mt-4")
graph_list = []
metric_titles = {
'iops': 'IOPS Comparison (Higher is Better)',
'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)',
'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)'
}
for metric in selected_metrics:
sort_order = 'total ascending' if metric == 'latency_mean_ms' else 'total descending'
error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None
fig = px.bar(
filtered_df,
x='test_name',
y=metric,
color='label',
barmode='group',
color_discrete_map=color_map,
error_y=error_y_param,
title=metric_titles.get(metric, metric),
labels={
"test_name": "Benchmark Test Name",
"iops": "IOPS",
"latency_mean_ms": "Mean Latency (ms)",
"bandwidth_mbps": "Bandwidth (MB/s)",
"label": "Cluster Configuration"
}
)
fig.update_layout(
height=500,
xaxis_title=None,
legend_title="Configuration",
title_x=0.5,
xaxis={'categoryorder': sort_order},
xaxis_tickangle=-45,
margin=dict(b=120) # Add bottom margin to prevent tick labels from being cut off
)
graph_list.append(dbc.Row(dbc.Col(dcc.Graph(figure=fig)), className="mb-4"))
return graph_list
# --- Run the App ---
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,29 @@
blinker==1.9.0
certifi==2025.7.14
charset-normalizer==3.4.2
click==8.2.1
dash==3.2.0
dash-bootstrap-components==2.0.3
Flask==3.1.1
idna==3.10
importlib_metadata==8.7.0
itsdangerous==2.2.0
Jinja2==3.1.6
MarkupSafe==3.0.2
narwhals==2.0.1
nest-asyncio==1.6.0
numpy==2.3.2
packaging==25.0
pandas==2.3.1
plotly==6.2.0
python-dateutil==2.9.0.post0
pytz==2025.2
requests==2.32.4
retrying==1.4.1
setuptools==80.9.0
six==1.17.0
typing_extensions==4.14.1
tzdata==2025.2
urllib3==2.5.0
Werkzeug==3.1.3
zipp==3.23.0

41
iobench/deployment.yaml Normal file
View File

@@ -0,0 +1,41 @@
apiVersion: apps/v1
kind: Deployment
metadata:
name: iobench
labels:
app: iobench
spec:
replicas: 1
selector:
matchLabels:
app: iobench
template:
metadata:
labels:
app: iobench
spec:
containers:
- name: fio
image: juicedata/fio:latest # Replace with your preferred fio image
imagePullPolicy: IfNotPresent
command: [ "sleep", "infinity" ] # Keeps the container running for kubectl exec
volumeMounts:
- name: iobench-pvc
mountPath: /data # Mount the PVC at /data
volumes:
- name: iobench-pvc
persistentVolumeClaim:
claimName: iobench-pvc # Matches your PVC name
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
name: iobench-pvc
spec:
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 5Gi
storageClassName: ceph-block

253
iobench/src/main.rs Normal file
View File

@@ -0,0 +1,253 @@
use std::fs;
use std::io::{self, Write};
use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;
use chrono::Local;
use clap::Parser;
use serde::{Deserialize, Serialize};
/// A simple yet powerful I/O benchmarking tool using fio.
#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
/// Target for the benchmark.
/// Formats:
/// - localhost (default)
/// - ssh/{user}@{host}
/// - ssh/{user}@{host}:{port}
/// - k8s/{namespace}/{pod}
#[arg(short, long, default_value = "localhost")]
target: String,
#[arg(short, long, default_value = ".")]
benchmark_dir: String,
/// Comma-separated list of tests to run.
/// Available tests: read, write, randread, randwrite,
/// multiread, multiwrite, multirandread, multirandwrite.
#[arg(long, default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite")]
tests: String,
/// Duration of each test in seconds.
#[arg(long, default_value_t = 15)]
duration: u64,
/// Output directory for results.
/// Defaults to ./iobench-{current_datetime}.
#[arg(long)]
output_dir: Option<String>,
/// The size of the test file for fio.
#[arg(long, default_value = "1G")]
size: String,
/// The block size for I/O operations.
#[arg(long, default_value = "4k")]
block_size: String,
}
#[derive(Debug, Serialize, Deserialize)]
struct FioOutput {
jobs: Vec<FioJobResult>,
}
#[derive(Debug, Serialize, Deserialize)]
struct FioJobResult {
jobname: String,
read: FioMetrics,
write: FioMetrics,
}
#[derive(Debug, Serialize, Deserialize)]
struct FioMetrics {
bw: f64,
iops: f64,
clat_ns: LatencyMetrics,
}
#[derive(Debug, Serialize, Deserialize)]
struct LatencyMetrics {
mean: f64,
stddev: f64,
}
#[derive(Debug, Serialize)]
struct BenchmarkResult {
test_name: String,
iops: f64,
bandwidth_kibps: f64,
latency_mean_ms: f64,
latency_stddev_ms: f64,
}
fn main() -> io::Result<()> {
let args = Args::parse();
let output_dir = args.output_dir.unwrap_or_else(|| {
format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S"))
});
fs::create_dir_all(&output_dir)?;
let tests_to_run: Vec<&str> = args.tests.split(',').collect();
let mut results = Vec::new();
for test in tests_to_run {
println!("--------------------------------------------------");
println!("Running test: {}", test);
let (rw, numjobs) = match test {
"read" => ("read", 1),
"write" => ("write", 1),
"randread" => ("randread", 1),
"randwrite" => ("randwrite", 1),
"multiread" => ("read", 4),
"multiwrite" => ("write", 4),
"multirandread" => ("randread", 4),
"multirandwrite" => ("randwrite", 4),
_ => {
eprintln!("Unknown test: {}. Skipping.", test);
continue;
}
};
let test_name = format!("{}-{}-sync-test", test, args.block_size);
let fio_command = format!(
"fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
);
println!("Executing command:\n{}\n", fio_command);
let output = match run_command(&args.target, &fio_command) {
Ok(out) => out,
Err(e) => {
eprintln!("Failed to execute command for test {}: {}", test, e);
continue;
}
};
let result = parse_fio_output(&output, &test_name, rw);
// TODO store raw fio output and print it
match result {
Ok(res) => {
results.push(res);
}
Err(e) => {
eprintln!("Error parsing fio output for test {}: {}", test, e);
eprintln!("Raw output:\n{}", output);
}
}
println!("{output}");
println!("Test {} completed.", test);
// A brief pause to let the system settle before the next test.
thread::sleep(Duration::from_secs(2));
}
// Cleanup the test file on the target
println!("--------------------------------------------------");
println!("Cleaning up test file on target...");
let cleanup_command = "rm -f ./iobench_testfile";
if let Err(e) = run_command(&args.target, cleanup_command) {
eprintln!("Warning: Failed to clean up test file on target: {}", e);
} else {
println!("Cleanup successful.");
}
if results.is_empty() {
println!("\nNo benchmark results to display.");
return Ok(());
}
// Output results to a CSV file for easy analysis
let csv_path = format!("{}/summary.csv", output_dir);
let mut wtr = csv::Writer::from_path(&csv_path)?;
for result in &results {
wtr.serialize(result)?;
}
wtr.flush()?;
println!("\nBenchmark summary saved to {}", csv_path);
println!("\n--- Benchmark Results Summary ---");
println!("{:<25} {:>10} {:>18} {:>20} {:>22}", "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)");
println!("{:-<98}", "");
for result in results {
println!("{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}", result.test_name, result.iops, result.bandwidth_kibps, result.latency_mean_ms, result.latency_stddev_ms);
}
Ok(())
}
fn run_command(target: &str, command: &str) -> io::Result<String> {
let (program, args) = if target == "localhost" {
("sudo", vec!["sh".to_string(), "-c".to_string(), command.to_string()])
} else if target.starts_with("ssh/") {
let target_str = target.strip_prefix("ssh/").unwrap();
let ssh_target;
let mut ssh_args = vec!["-o".to_string(), "StrictHostKeyChecking=no".to_string()];
let port_parts: Vec<&str> = target_str.split(':').collect();
if port_parts.len() == 2 {
ssh_target = port_parts[0].to_string();
ssh_args.push("-p".to_string());
ssh_args.push(port_parts[1].to_string());
} else {
ssh_target = target_str.to_string();
}
ssh_args.push(ssh_target);
ssh_args.push(format!("sudo sh -c '{}'", command));
("ssh", ssh_args)
} else if target.starts_with("k8s/") {
let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect();
if parts.len() != 2 {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid k8s target format. Expected k8s/{namespace}/{pod}"));
}
let namespace = parts[0];
let pod = parts[1];
("kubectl", vec!["exec".to_string(), "-n".to_string(), namespace.to_string(), pod.to_string(), "--".to_string(), "sh".to_string(), "-c".to_string(), command.to_string()])
} else {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid target format"));
};
let mut cmd = Command::new(program);
cmd.args(&args);
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
let child = cmd.spawn()?;
let output = child.wait_with_output()?;
if !output.status.success() {
eprintln!("Command failed with status: {}", output.status);
io::stderr().write_all(&output.stderr)?;
return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed"));
}
String::from_utf8(output.stdout)
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
fn parse_fio_output(output: &str, test_name: &str, rw: &str) -> Result<BenchmarkResult, String> {
let fio_data: FioOutput = serde_json::from_str(output)
.map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?;
let job_result = fio_data.jobs.iter()
.find(|j| j.jobname == test_name)
.ok_or_else(|| format!("Could not find job result for '{}' in fio output", test_name))?;
let metrics = if rw.contains("read") {
&job_result.read
} else {
&job_result.write
};
Ok(BenchmarkResult {
test_name: test_name.to_string(),
iops: metrics.iops,
bandwidth_kibps: metrics.bw,
latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0,
latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0,
})
}

View File

@@ -12,7 +12,7 @@ env_logger = { workspace = true }
yaserde = { git = "https://github.com/jggc/yaserde.git" }
yaserde_derive = { git = "https://github.com/jggc/yaserde.git" }
xml-rs = "0.8"
thiserror = "1.0"
thiserror.workspace = true
async-trait = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }