Compare commits

..

15 Commits

Author SHA1 Message Date
b118eb57e6 wip: OKD Installation automation layed out. Next step : review this after some sleep and fill in the (many) blanks with actual implementations.
Some checks failed
Run Check Script / check (pull_request) Failing after 22s
2025-08-18 22:29:46 -04:00
9c5d1bd27f feat: Secret module works with infisical and local file storage backends
All checks were successful
Run Check Script / check (pull_request) Successful in 1m10s
2025-08-16 11:28:59 -04:00
2a6a233fb2 feat: WIP add secrets module and macro crate 2025-08-15 14:40:39 -04:00
Ian Letourneau
67f3a23071 chore: cleanup unused imports
All checks were successful
Run Check Script / check (push) Successful in 1m30s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 5m33s
2025-08-14 16:44:22 -04:00
d86970f81b fix: make sure demo works on both local & remote target (#107)
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Run Check Script / check (push) Has been cancelled
* define Ntfy ingress (naive implementation) based on current target
* use patched Ntfy Helm Chart
* create Ntfy main user only if needed
* add info logs
* better error bubbling
* instrument feature installations
* upgrade prometheus alerting charts if already installed
* harmony_composer params to control deployment `target` and `profile`

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Co-authored-by: Jean-Gabriel Gill-Couture <jg@nationtech.io>
Reviewed-on: #107
2025-08-14 20:42:09 +00:00
623a3f019b fix: apply different network policies based on current target (#97)
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Run Check Script / check (push) Has been cancelled
Fixes #94

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: #97
Reviewed-by: johnride <jg@nationtech.io>
2025-08-14 20:36:19 +00:00
Ian Letourneau
bd214f8fb8 fix: remove sha256 for harmony composer image in harmony_composer workflow
All checks were successful
Run Check Script / check (push) Successful in 1m9s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 6m11s
2025-08-11 19:49:06 -04:00
f0ed548755 fix: improve usage of indicatif for tracking progress (#101)
Some checks failed
Run Check Script / check (push) Successful in 1m18s
Compile and package harmony_composer / package_harmony_composer (push) Failing after -1s
The multiprogress wasn't used properly and leading to conflicting progress bars (within our own progress bars, as well as the log wrapper).

This PR introduce a layer on top of `indicatif::MultiProgress` to properly handle sections of progress bars, where we can dynamically add/update/remove progress bars from any sections.

We can see in the demo that new sections + progress bars are added on the fly and that extra logs (e.g. info logs) are appended on top of the progress bars.

Progress are also grouped together based on their parent score.

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Co-authored-by: johnride <jg@nationtech.io>
Reviewed-on: #101
2025-08-11 23:47:11 +00:00
1de96027a1 fix: prevent instrumentation to run in test mode (#102)
Some checks failed
Run Check Script / check (push) Successful in 1m16s
Compile and package harmony_composer / package_harmony_composer (push) Failing after -1s
The CI pipeline (`./check.sh`) was failing because of test errors, which was caused by the instrumentation framework complaining that no subscribers/listeners were registered.

Instead of setting up all tests to run with a dummy subscriber, move the implementation of the instrumentation behind a feature flag so that it runs only for tests.

There's a catch though: the `#[cfg(test)]` directive works only when directly testing the crate. If a crate `A` depends on another crate `B`, `B` will be compiled as usual (aka not in test mode) which will not trigger the `test` flag.

So we need to introduce our own `testing` feature flag for `harmony` core and import it with that flag (only during dev/test).

More info: https://github.com/rust-lang/rust/issues/59168

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: https://git.nationtech.io/NationTech/harmony/pulls/102
2025-08-11 23:42:08 +00:00
0812937a67 fix(ci): Remove specific sha256 for harmony composer image, just always run on latest
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Successful in 7m22s
Run Check Script / check (push) Failing after 2m0s
2025-08-11 15:52:37 -04:00
29a261575b refactor: Interpret score with a provided method on Score (#100)
Some checks failed
Compile and package harmony_composer / package_harmony_composer (push) Successful in 6m49s
Run Check Script / check (push) Failing after 41s
First step in a direction to better orchestrate the core flow, even though it feels weird to move this logic into the `Score`. We'll refactor this as soon as we have a better solution.

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: #100
2025-08-09 22:56:23 +00:00
dcf8335240 Merge pull request 'refactor: Remove InterpretStatus/Error & Outcome from Topology' (#99) from remove-interpret-status-from-topology into master
Some checks are pending
Run Check Script / check (push) Waiting to run
Compile and package harmony_composer / package_harmony_composer (push) Waiting to run
Reviewed-on: #99
Reviewed-by: johnride <jg@nationtech.io>
2025-08-09 22:52:21 +00:00
Ian Letourneau
f876b5e67b refactor: Remove InterpretStatus/Error & Outcome from Topology
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
2025-08-06 22:29:00 -04:00
440c1bce12 chore: reformat & clippy cleanup (#96)
Some checks failed
Run Check Script / check (pull_request) Has been cancelled
Run Check Script / check (push) Has been cancelled
Compile and package harmony_composer / package_harmony_composer (push) Has been cancelled
Clippy is now added to the `check` in the pipeline

Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: #96
2025-08-06 15:57:14 +00:00
024084859e Monitor an application within a tenant (#86)
All checks were successful
Run Check Script / check (push) Successful in -45s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 4m35s
WIP: added implementation to deploy crd-alertmanagerconfigs
Co-authored-by: Ian Letourneau <letourneau.ian@gmail.com>
Reviewed-on: #86
Co-authored-by: Willem <wrolleman@nationtech.io>
Co-committed-by: Willem <wrolleman@nationtech.io>
2025-08-04 21:42:01 +00:00
107 changed files with 3146 additions and 904 deletions

View File

@@ -9,7 +9,7 @@ jobs:
check:
runs-on: docker
container:
image: hub.nationtech.io/harmony/harmony_composer:latest@sha256:eb0406fcb95c63df9b7c4b19bc50ad7914dd8232ce98e9c9abef628e07c69386
image: hub.nationtech.io/harmony/harmony_composer:latest
steps:
- name: Checkout code
uses: actions/checkout@v4

View File

@@ -7,7 +7,7 @@ on:
jobs:
package_harmony_composer:
container:
image: hub.nationtech.io/harmony/harmony_composer:latest@sha256:eb0406fcb95c63df9b7c4b19bc50ad7914dd8232ce98e9c9abef628e07c69386
image: hub.nationtech.io/harmony/harmony_composer:latest
runs-on: dind
steps:
- name: Checkout code
@@ -45,14 +45,14 @@ jobs:
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/tags/snapshot-latest" \
| jq -r '.id // empty')
if [ -n "$RELEASE_ID" ]; then
# Delete existing release
curl -X DELETE \
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/$RELEASE_ID"
fi
# Create new release
RESPONSE=$(curl -X POST \
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
@@ -65,7 +65,7 @@ jobs:
"prerelease": true
}' \
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases")
echo "RELEASE_ID=$(echo $RESPONSE | jq -r '.id')" >> $GITHUB_ENV
- name: Upload Linux binary

238
Cargo.lock generated
View File

@@ -378,7 +378,7 @@ dependencies = [
"serde_json",
"serde_repr",
"serde_urlencoded",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-util",
"tower-service",
@@ -473,7 +473,7 @@ dependencies = [
"semver",
"serde",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -515,6 +515,12 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9555578bc9e57714c812a1f84e4fc5b4d21fcb063490c624de019f7464c91268"
[[package]]
name = "cfg_aliases"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chacha20"
version = "0.9.1"
@@ -1689,9 +1695,11 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
dependencies = [
"cfg-if",
"js-sys",
"libc",
"r-efi",
"wasi 0.14.2+wasi-0.2.4",
"wasm-bindgen",
]
[[package]]
@@ -1789,6 +1797,7 @@ dependencies = [
"env_logger",
"fqdn",
"futures-util",
"harmony-secret-derive",
"harmony_macros",
"harmony_types",
"helm-wrapper-rs",
@@ -1829,6 +1838,35 @@ dependencies = [
"uuid",
]
[[package]]
name = "harmony-secret"
version = "0.1.0"
dependencies = [
"async-trait",
"directories",
"harmony-secret-derive",
"http 1.3.1",
"infisical",
"lazy_static",
"log",
"pretty_assertions",
"serde",
"serde_json",
"tempfile",
"thiserror 2.0.14",
"tokio",
]
[[package]]
name = "harmony-secret-derive"
version = "0.1.0"
dependencies = [
"proc-macro-crate",
"proc-macro2",
"quote",
"syn",
]
[[package]]
name = "harmony_cli"
version = "0.1.0"
@@ -1962,7 +2000,7 @@ dependencies = [
"non-blank-string-rs",
"serde",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -2130,7 +2168,7 @@ dependencies = [
"httpdate",
"itoa",
"pin-project-lite",
"socket2",
"socket2 0.5.10",
"tokio",
"tower-service",
"tracing",
@@ -2209,6 +2247,7 @@ dependencies = [
"tokio",
"tokio-rustls",
"tower-service",
"webpki-roots",
]
[[package]]
@@ -2271,7 +2310,7 @@ dependencies = [
"libc",
"percent-encoding",
"pin-project-lite",
"socket2",
"socket2 0.5.10",
"system-configuration 0.6.1",
"tokio",
"tower-service",
@@ -2488,6 +2527,21 @@ version = "2.0.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f4c7245a08504955605670dbf141fceab975f15ca21570696aebe9d2e71576bd"
[[package]]
name = "infisical"
version = "0.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4d97c33b08e22b2f7b9f87a8fc06a7d247442db7bf216ffc6661a74ed8aea658"
dependencies = [
"base64 0.22.1",
"reqwest 0.12.20",
"serde",
"serde_json",
"thiserror 1.0.69",
"tokio",
"url",
]
[[package]]
name = "inout"
version = "0.1.4"
@@ -2528,6 +2582,17 @@ dependencies = [
"syn",
]
[[package]]
name = "io-uring"
version = "0.7.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d93587f37623a1a17d94ef2bc9ada592f5465fe7732084ab7beefabe5c77c0c4"
dependencies = [
"bitflags 2.9.1",
"cfg-if",
"libc",
]
[[package]]
name = "ipnet"
version = "2.11.0"
@@ -2621,7 +2686,7 @@ dependencies = [
"pest_derive",
"regex",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -2722,7 +2787,7 @@ dependencies = [
"serde",
"serde_json",
"serde_yaml",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-tungstenite",
"tokio-util",
@@ -2747,7 +2812,7 @@ dependencies = [
"serde",
"serde-value",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -2785,7 +2850,7 @@ dependencies = [
"pin-project",
"serde",
"serde_json",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-util",
"tracing",
@@ -2897,6 +2962,12 @@ dependencies = [
"hashbrown 0.15.4",
]
[[package]]
name = "lru-slab"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "112b39cec0b298b6c1999fee3e31427f74f676e4cb9879ed1a121b43661a4154"
[[package]]
name = "md5"
version = "0.7.0"
@@ -3199,7 +3270,7 @@ dependencies = [
"pretty_assertions",
"rand 0.8.5",
"serde",
"thiserror 1.0.69",
"thiserror 2.0.14",
"tokio",
"uuid",
"xml-rs",
@@ -3366,7 +3437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1db05f56d34358a8b1066f67cbb203ee3e7ed2ba674a6263a1d5ec6db2204323"
dependencies = [
"memchr",
"thiserror 2.0.12",
"thiserror 2.0.14",
"ucd-trie",
]
@@ -3587,6 +3658,15 @@ dependencies = [
"elliptic-curve",
]
[[package]]
name = "proc-macro-crate"
version = "3.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "edce586971a4dfaa28950c6f18ed55e0406c1ab88bbce2c6f6293a7aaba73d35"
dependencies = [
"toml_edit",
]
[[package]]
name = "proc-macro2"
version = "1.0.95"
@@ -3602,6 +3682,61 @@ version = "0.4.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e9e1dcb320d6839f6edb64f7a4a59d39b30480d4d1765b56873f7c858538a5fe"
[[package]]
name = "quinn"
version = "0.11.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "626214629cda6781b6dc1d316ba307189c85ba657213ce642d9c77670f8202c8"
dependencies = [
"bytes",
"cfg_aliases",
"pin-project-lite",
"quinn-proto",
"quinn-udp",
"rustc-hash",
"rustls",
"socket2 0.5.10",
"thiserror 2.0.14",
"tokio",
"tracing",
"web-time",
]
[[package]]
name = "quinn-proto"
version = "0.11.12"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "49df843a9161c85bb8aae55f101bc0bac8bcafd637a620d9122fd7e0b2f7422e"
dependencies = [
"bytes",
"getrandom 0.3.3",
"lru-slab",
"rand 0.9.1",
"ring",
"rustc-hash",
"rustls",
"rustls-pki-types",
"slab",
"thiserror 2.0.14",
"tinyvec",
"tracing",
"web-time",
]
[[package]]
name = "quinn-udp"
version = "0.5.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "fcebb1209ee276352ef14ff8732e24cc2b02bbac986cd74a4c81bcb2f9881970"
dependencies = [
"cfg_aliases",
"libc",
"once_cell",
"socket2 0.5.10",
"tracing",
"windows-sys 0.59.0",
]
[[package]]
name = "quote"
version = "1.0.40"
@@ -3720,7 +3855,7 @@ checksum = "dd6f9d3d47bdd2ad6945c5015a226ec6155d0bcdfd8f7cd29f86b71f8de99d2b"
dependencies = [
"getrandom 0.2.16",
"libredox",
"thiserror 2.0.12",
"thiserror 2.0.14",
]
[[package]]
@@ -3821,6 +3956,7 @@ dependencies = [
"base64 0.22.1",
"bytes",
"encoding_rs",
"futures-channel",
"futures-core",
"futures-util",
"h2 0.4.10",
@@ -3837,6 +3973,8 @@ dependencies = [
"native-tls",
"percent-encoding",
"pin-project-lite",
"quinn",
"rustls",
"rustls-pki-types",
"serde",
"serde_json",
@@ -3844,6 +3982,7 @@ dependencies = [
"sync_wrapper 1.0.2",
"tokio",
"tokio-native-tls",
"tokio-rustls",
"tokio-util",
"tower",
"tower-http",
@@ -3853,6 +3992,7 @@ dependencies = [
"wasm-bindgen-futures",
"wasm-streams",
"web-sys",
"webpki-roots",
]
[[package]]
@@ -4016,7 +4156,7 @@ dependencies = [
"flurry",
"log",
"serde",
"thiserror 2.0.12",
"thiserror 2.0.14",
"tokio",
"tokio-util",
]
@@ -4042,6 +4182,12 @@ version = "0.1.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "989e6739f80c4ad5b13e0fd7fe89531180375b18520cc8c82080e4dc4035b84f"
[[package]]
name = "rustc-hash"
version = "2.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
[[package]]
name = "rustc_version"
version = "0.4.1"
@@ -4141,6 +4287,7 @@ version = "1.12.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "229a4a4c221013e7e1f1a043678c5cc39fe5171437c88fb47151a21e6f5b5c79"
dependencies = [
"web-time",
"zeroize",
]
@@ -4579,7 +4726,7 @@ checksum = "297f631f50729c8c99b84667867963997ec0b50f32b2a7dbcab828ef0541e8bb"
dependencies = [
"num-bigint",
"num-traits",
"thiserror 2.0.12",
"thiserror 2.0.14",
"time",
]
@@ -4626,6 +4773,16 @@ dependencies = [
"windows-sys 0.52.0",
]
[[package]]
name = "socket2"
version = "0.6.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "233504af464074f9d066d7b5416c5f9b894a5862a6506e306f7b816cdd6f1807"
dependencies = [
"libc",
"windows-sys 0.59.0",
]
[[package]]
name = "spin"
version = "0.9.8"
@@ -4763,9 +4920,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292"
[[package]]
name = "syn"
version = "2.0.104"
version = "2.0.105"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17b6f705963418cdb9927482fa304bc562ece2fdd4f616084c50b7023b435a40"
checksum = "7bc3fcb250e53458e712715cf74285c1f889686520d79294a9ef3bd7aa1fc619"
dependencies = [
"proc-macro2",
"quote",
@@ -4899,11 +5056,11 @@ dependencies = [
[[package]]
name = "thiserror"
version = "2.0.12"
version = "2.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "567b8a2dae586314f7be2a752ec7474332959c6460e02bde30d702a66d488708"
checksum = "0b0949c3a6c842cbde3f1686d6eea5a010516deb7085f79db747562d4102f41e"
dependencies = [
"thiserror-impl 2.0.12",
"thiserror-impl 2.0.14",
]
[[package]]
@@ -4919,9 +5076,9 @@ dependencies = [
[[package]]
name = "thiserror-impl"
version = "2.0.12"
version = "2.0.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7f7cf42b4507d8ea322120659672cf1b9dbb93f8f2d4ecfd6e51350ff5b17a1d"
checksum = "cc5b44b4ab9c2fdd0e0512e6bece8388e214c0749f5862b114cc5b7a25daf227"
dependencies = [
"proc-macro2",
"quote",
@@ -4988,20 +5145,38 @@ dependencies = [
]
[[package]]
name = "tokio"
version = "1.45.1"
name = "tinyvec"
version = "1.9.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75ef51a33ef1da925cea3e4eb122833cb377c61439ca401b770f54902b806779"
checksum = "09b3661f17e86524eccd4371ab0429194e0d7c008abb45f7a7495b1719463c71"
dependencies = [
"tinyvec_macros",
]
[[package]]
name = "tinyvec_macros"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1f3ccbac311fea05f86f61904b462b55fb3df8837a366dfc601a0161d0532f20"
[[package]]
name = "tokio"
version = "1.47.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "89e49afdadebb872d3145a5638b59eb0691ea23e46ca484037cfab3b76b95038"
dependencies = [
"backtrace",
"bytes",
"io-uring",
"libc",
"mio 1.0.4",
"parking_lot",
"pin-project-lite",
"signal-hook-registry",
"socket2",
"slab",
"socket2 0.6.0",
"tokio-macros",
"windows-sys 0.52.0",
"windows-sys 0.59.0",
]
[[package]]
@@ -5251,7 +5426,7 @@ dependencies = [
"log",
"rand 0.9.1",
"sha1",
"thiserror 2.0.12",
"thiserror 2.0.14",
"utf-8",
]
@@ -5552,6 +5727,15 @@ dependencies = [
"wasm-bindgen",
]
[[package]]
name = "webpki-roots"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e8983c3ab33d6fb807cfcdad2491c4ea8cbc8ed839181c7dfd9c67c83e261b2"
dependencies = [
"rustls-pki-types",
]
[[package]]
name = "winapi"
version = "0.3.9"

View File

@@ -12,6 +12,8 @@ members = [
"harmony_cli",
"k3d",
"harmony_composer",
"harmony_secret_derive",
"harmony_secret",
]
[workspace.package]
@@ -53,6 +55,10 @@ chrono = "0.4"
similar = "2"
uuid = { version = "1.11", features = ["v4", "fast-rng", "macro-diagnostics"] }
pretty_assertions = "1.4.1"
tempfile = "3.20.0"
bollard = "0.19.1"
base64 = "0.22.1"
tar = "0.4.44"
lazy_static = "1.5.0"
directories = "6.0.0"
thiserror = "2.0.14"

View File

@@ -13,6 +13,7 @@ WORKDIR /app
RUN rustup target add x86_64-pc-windows-gnu
RUN rustup target add x86_64-unknown-linux-gnu
RUN rustup component add rustfmt
RUN rustup component add clippy
RUN apt update
@@ -22,4 +23,4 @@ RUN apt install -y nodejs docker.io mingw-w64
COPY --from=build /app/target/release/harmony_composer .
ENTRYPOINT ["/app/harmony_composer"]
ENTRYPOINT ["/app/harmony_composer"]

View File

@@ -1,5 +1,7 @@
#!/bin/sh
set -e
cargo check --all-targets --all-features --keep-going
cargo fmt --check
cargo clippy
cargo test

View File

@@ -1,17 +1,11 @@
use std::{path::PathBuf, sync::Arc};
use std::{path::PathBuf, str::FromStr, sync::Arc};
use harmony::{
data::Id,
inventory::Inventory,
maestro::Maestro,
modules::{
application::{
ApplicationScore, RustWebFramework, RustWebapp,
features::{ContinuousDelivery, Monitoring},
},
monitoring::alert_channel::{
discord_alert_channel::DiscordWebhook, webhook_receiver::WebhookReceiver,
},
application::{ApplicationScore, RustWebFramework, RustWebapp, features::Monitoring},
monitoring::alert_channel::webhook_receiver::WebhookReceiver,
tenant::TenantScore,
},
topology::{K8sAnywhereTopology, Url, tenant::TenantConfig},
@@ -25,7 +19,7 @@ async fn main() {
//the TenantConfig.name must match
let tenant = TenantScore {
config: TenantConfig {
id: Id::from_str("test-tenant-id"),
id: Id::from_str("test-tenant-id").unwrap(),
name: "example-monitoring".to_string(),
..Default::default()
},

View File

@@ -125,40 +125,47 @@ spec:
name: nginx"#,
)
.unwrap();
return deployment;
deployment
}
fn nginx_deployment_2() -> Deployment {
let mut pod_template = PodTemplateSpec::default();
pod_template.metadata = Some(ObjectMeta {
labels: Some(BTreeMap::from([(
"app".to_string(),
"nginx-test".to_string(),
)])),
..Default::default()
});
pod_template.spec = Some(PodSpec {
containers: vec![Container {
name: "nginx".to_string(),
image: Some("nginx".to_string()),
let pod_template = PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(BTreeMap::from([(
"app".to_string(),
"nginx-test".to_string(),
)])),
..Default::default()
}],
..Default::default()
});
let mut spec = DeploymentSpec::default();
spec.template = pod_template;
spec.selector = LabelSelector {
match_expressions: None,
match_labels: Some(BTreeMap::from([(
"app".to_string(),
"nginx-test".to_string(),
)])),
}),
spec: Some(PodSpec {
containers: vec![Container {
name: "nginx".to_string(),
image: Some("nginx".to_string()),
..Default::default()
}],
..Default::default()
}),
};
let mut deployment = Deployment::default();
deployment.spec = Some(spec);
deployment.metadata.name = Some("nginx-test".to_string());
let spec = DeploymentSpec {
template: pod_template,
selector: LabelSelector {
match_expressions: None,
match_labels: Some(BTreeMap::from([(
"app".to_string(),
"nginx-test".to_string(),
)])),
},
..Default::default()
};
deployment
Deployment {
spec: Some(spec),
metadata: ObjectMeta {
name: Some("nginx-test".to_string()),
..Default::default()
},
..Default::default()
}
}
fn nginx_deployment() -> Deployment {

View File

@@ -23,7 +23,7 @@ async fn main() {
// This config can be extended as needed for more complicated configurations
config: LAMPConfig {
project_root: "./php".into(),
database_size: format!("4Gi").into(),
database_size: "4Gi".to_string().into(),
..Default::default()
},
};

View File

@@ -1,4 +1,4 @@
use std::collections::HashMap;
use std::{collections::HashMap, str::FromStr};
use harmony::{
data::Id,
@@ -28,7 +28,7 @@ use harmony::{
async fn main() {
let tenant = TenantScore {
config: TenantConfig {
id: Id::from_string("1234".to_string()),
id: Id::from_str("1234").unwrap(),
name: "test-tenant".to_string(),
resource_limits: ResourceLimits {
cpu_request_cores: 6.0,

View File

@@ -1,3 +1,5 @@
use std::str::FromStr;
use harmony::{
data::Id,
inventory::Inventory,
@@ -9,7 +11,7 @@ use harmony::{
async fn main() {
let tenant = TenantScore {
config: TenantConfig {
id: Id::from_str("test-tenant-id"),
id: Id::from_str("test-tenant-id").unwrap(),
name: "testtenant".to_string(),
..Default::default()
},

View File

@@ -5,6 +5,9 @@ version.workspace = true
readme.workspace = true
license.workspace = true
[features]
testing = []
[dependencies]
rand = "0.9"
hex = "0.4"
@@ -35,8 +38,8 @@ serde-value.workspace = true
helm-wrapper-rs = "0.4.0"
non-blank-string-rs = "1.0.4"
k3d-rs = { path = "../k3d" }
directories = "6.0.0"
lazy_static = "1.5.0"
directories.workspace = true
lazy_static.workspace = true
dockerfile_builder = "0.1.5"
temp-file = "0.1.9"
convert_case.workspace = true
@@ -56,7 +59,7 @@ similar.workspace = true
futures-util = "0.3.31"
tokio-util = "0.7.15"
strum = { version = "0.27.1", features = ["derive"] }
tempfile = "3.20.0"
tempfile.workspace = true
serde_with = "3.14.0"
schemars = "0.8.22"
kube-derive = "1.1.0"
@@ -64,6 +67,7 @@ bollard.workspace = true
tar.workspace = true
base64.workspace = true
once_cell = "1.21.3"
harmony-secret-derive = { version = "0.1.0", path = "../harmony_secret_derive" }
[dev-dependencies]
pretty_assertions.workspace = true

BIN
harmony/harmony.rlib Normal file

Binary file not shown.

View File

@@ -11,5 +11,5 @@ lazy_static! {
pub static ref REGISTRY_PROJECT: String =
std::env::var("HARMONY_REGISTRY_PROJECT").unwrap_or_else(|_| "harmony".to_string());
pub static ref DRY_RUN: bool =
std::env::var("HARMONY_DRY_RUN").map_or(true, |value| value.parse().unwrap_or(true));
std::env::var("HARMONY_DRY_RUN").is_ok_and(|value| value.parse().unwrap_or(false));
}

View File

@@ -1,5 +1,6 @@
use rand::distr::Alphanumeric;
use rand::distr::SampleString;
use std::str::FromStr;
use std::time::SystemTime;
use std::time::UNIX_EPOCH;
@@ -23,13 +24,13 @@ pub struct Id {
value: String,
}
impl Id {
pub fn from_string(value: String) -> Self {
Self { value }
}
impl FromStr for Id {
type Err = ();
pub fn from_str(value: &str) -> Self {
Self::from_string(value.to_string())
fn from_str(s: &str) -> Result<Self, Self::Err> {
Ok(Id {
value: s.to_string(),
})
}
}

View File

@@ -47,7 +47,7 @@ impl serde::Serialize for Version {
impl std::fmt::Display for Version {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
return self.value.fmt(f);
self.value.fmt(f)
}
}

View File

@@ -35,10 +35,9 @@ impl PhysicalHost {
pub fn cluster_mac(&self) -> MacAddress {
self.network
.get(0)
.first()
.expect("Cluster physical host should have a network interface")
.mac_address
.clone()
}
pub fn cpu(mut self, cpu_count: Option<u64>) -> Self {

View File

@@ -2,28 +2,42 @@ use log::debug;
use once_cell::sync::Lazy;
use tokio::sync::broadcast;
use super::interpret::{InterpretError, Outcome};
use crate::modules::application::ApplicationFeatureStatus;
use super::{
interpret::{InterpretError, Outcome},
topology::TopologyStatus,
};
#[derive(Debug, Clone)]
pub enum HarmonyEvent {
HarmonyStarted,
PrepareTopologyStarted {
topology: String,
},
TopologyPrepared {
topology: String,
outcome: Outcome,
},
HarmonyFinished,
InterpretExecutionStarted {
execution_id: String,
topology: String,
interpret: String,
score: String,
message: String,
},
InterpretExecutionFinished {
execution_id: String,
topology: String,
interpret: String,
score: String,
outcome: Result<Outcome, InterpretError>,
},
TopologyStateChanged {
topology: String,
status: TopologyStatus,
message: Option<String>,
},
ApplicationFeatureStateChanged {
topology: String,
application: String,
feature: String,
status: ApplicationFeatureStatus,
},
}
static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
@@ -33,9 +47,14 @@ static HARMONY_EVENT_BUS: Lazy<broadcast::Sender<HarmonyEvent>> = Lazy::new(|| {
});
pub fn instrument(event: HarmonyEvent) -> Result<(), &'static str> {
match HARMONY_EVENT_BUS.send(event) {
Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"),
if cfg!(any(test, feature = "testing")) {
let _ = event; // Suppress the "unused variable" warning for `event`
Ok(())
} else {
match HARMONY_EVENT_BUS.send(event) {
Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"),
}
}
}

View File

@@ -7,6 +7,7 @@ use super::{
data::{Id, Version},
executors::ExecutorError,
inventory::Inventory,
topology::PreparationError,
};
pub enum InterpretName {
@@ -23,6 +24,15 @@ pub enum InterpretName {
TenantInterpret,
Application,
ArgoCD,
Alerting,
Ntfy,
HelmChart,
HelmCommand,
K8sResource,
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
Custom(&'static str),
}
impl std::fmt::Display for InterpretName {
@@ -41,6 +51,15 @@ impl std::fmt::Display for InterpretName {
InterpretName::TenantInterpret => f.write_str("Tenant"),
InterpretName::Application => f.write_str("Application"),
InterpretName::ArgoCD => f.write_str("ArgoCD"),
InterpretName::Alerting => f.write_str("Alerting"),
InterpretName::Ntfy => f.write_str("Ntfy"),
InterpretName::HelmChart => f.write_str("HelmChart"),
InterpretName::HelmCommand => f.write_str("HelmCommand"),
InterpretName::K8sResource => f.write_str("K8sResource"),
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::Custom(name) => f.write_str(name),
}
}
}
@@ -113,6 +132,14 @@ impl std::fmt::Display for InterpretError {
}
impl Error for InterpretError {}
impl From<PreparationError> for InterpretError {
fn from(value: PreparationError) -> Self {
Self {
msg: format!("InterpretError : {value}"),
}
}
}
impl From<ExecutorError> for InterpretError {
fn from(value: ExecutorError) -> Self {
Self {

View File

@@ -1,14 +1,14 @@
use std::sync::{Arc, Mutex, RwLock};
use std::sync::{Arc, RwLock};
use log::{debug, warn};
use crate::instrumentation::{self, HarmonyEvent};
use crate::topology::TopologyStatus;
use super::{
interpret::{InterpretError, InterpretStatus, Outcome},
interpret::{InterpretError, Outcome},
inventory::Inventory,
score::Score,
topology::Topology,
topology::{PreparationError, PreparationOutcome, Topology, TopologyState},
};
type ScoreVec<T> = Vec<Box<dyn Score<T>>>;
@@ -17,7 +17,7 @@ pub struct Maestro<T: Topology> {
inventory: Inventory,
topology: T,
scores: Arc<RwLock<ScoreVec<T>>>,
topology_preparation_result: Mutex<Option<Outcome>>,
topology_state: TopologyState,
}
impl<T: Topology> Maestro<T> {
@@ -25,41 +25,46 @@ impl<T: Topology> Maestro<T> {
///
/// This should rarely be used. Most of the time Maestro::initialize should be used instead.
pub fn new_without_initialization(inventory: Inventory, topology: T) -> Self {
let topology_name = topology.name().to_string();
Self {
inventory,
topology,
scores: Arc::new(RwLock::new(Vec::new())),
topology_preparation_result: None.into(),
topology_state: TopologyState::new(topology_name),
}
}
pub async fn initialize(inventory: Inventory, topology: T) -> Result<Self, InterpretError> {
let instance = Self::new_without_initialization(inventory, topology);
pub async fn initialize(inventory: Inventory, topology: T) -> Result<Self, PreparationError> {
let mut instance = Self::new_without_initialization(inventory, topology);
instance.prepare_topology().await?;
Ok(instance)
}
/// Ensures the associated Topology is ready for operations.
/// Delegates the readiness check and potential setup actions to the Topology.
pub async fn prepare_topology(&self) -> Result<Outcome, InterpretError> {
instrumentation::instrument(HarmonyEvent::PrepareTopologyStarted {
topology: self.topology.name().to_string(),
})
.unwrap();
async fn prepare_topology(&mut self) -> Result<PreparationOutcome, PreparationError> {
self.topology_state.prepare();
let outcome = self.topology.ensure_ready().await?;
let result = self.topology.ensure_ready().await;
instrumentation::instrument(HarmonyEvent::TopologyPrepared {
topology: self.topology.name().to_string(),
outcome: outcome.clone(),
})
.unwrap();
self.topology_preparation_result
.lock()
.unwrap()
.replace(outcome.clone());
Ok(outcome)
match result {
Ok(outcome) => {
match outcome.clone() {
PreparationOutcome::Success { details } => {
self.topology_state.success(details);
}
PreparationOutcome::Noop => {
self.topology_state.noop();
}
};
Ok(outcome)
}
Err(err) => {
self.topology_state.error(err.to_string());
Err(err)
}
}
}
pub fn register_all(&mut self, mut scores: ScoreVec<T>) {
@@ -68,15 +73,7 @@ impl<T: Topology> Maestro<T> {
}
fn is_topology_initialized(&self) -> bool {
let result = self.topology_preparation_result.lock().unwrap();
if let Some(outcome) = result.as_ref() {
match outcome.status {
InterpretStatus::SUCCESS => return true,
_ => return false,
}
} else {
false
}
self.topology_state.status == TopologyStatus::Success
}
pub async fn interpret(&self, score: Box<dyn Score<T>>) -> Result<Outcome, InterpretError> {
@@ -87,10 +84,8 @@ impl<T: Topology> Maestro<T> {
self.topology.name(),
);
}
debug!("Running score {score:?}");
let interpret = score.create_interpret();
debug!("Launching interpret {interpret:?}");
let result = interpret.execute(&self.inventory, &self.topology).await;
debug!("Interpreting score {score:?}");
let result = score.interpret(&self.inventory, &self.topology).await;
debug!("Got result {result:?}");
result
}

View File

@@ -1,22 +1,62 @@
use std::collections::BTreeMap;
use async_trait::async_trait;
use serde::Serialize;
use serde_value::Value;
use super::{interpret::Interpret, topology::Topology};
use super::{
data::Id,
instrumentation::{self, HarmonyEvent},
interpret::{Interpret, InterpretError, Outcome},
inventory::Inventory,
topology::Topology,
};
#[async_trait]
pub trait Score<T: Topology>:
std::fmt::Debug + ScoreToString<T> + Send + Sync + CloneBoxScore<T> + SerializeScore<T>
{
fn create_interpret(&self) -> Box<dyn Interpret<T>>;
async fn interpret(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let id = Id::default();
let interpret = self.create_interpret();
instrumentation::instrument(HarmonyEvent::InterpretExecutionStarted {
execution_id: id.clone().to_string(),
topology: topology.name().into(),
interpret: interpret.get_name().to_string(),
score: self.name(),
message: format!("{} running...", interpret.get_name()),
})
.unwrap();
let result = interpret.execute(inventory, topology).await;
instrumentation::instrument(HarmonyEvent::InterpretExecutionFinished {
execution_id: id.clone().to_string(),
topology: topology.name().into(),
interpret: interpret.get_name().to_string(),
score: self.name(),
outcome: result.clone(),
})
.unwrap();
result
}
fn name(&self) -> String;
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>>;
}
pub trait SerializeScore<T: Topology> {
fn serialize(&self) -> Value;
}
impl<'de, S, T> SerializeScore<T> for S
impl<S, T> SerializeScore<T> for S
where
T: Topology,
S: Score<T> + Serialize,
@@ -24,7 +64,7 @@ where
fn serialize(&self) -> Value {
// TODO not sure if this is the right place to handle the error or it should bubble
// up?
serde_value::to_value(&self).expect("Score should serialize successfully")
serde_value::to_value(self).expect("Score should serialize successfully")
}
}

View File

@@ -4,8 +4,6 @@ use harmony_types::net::MacAddress;
use log::info;
use crate::executors::ExecutorError;
use crate::interpret::InterpretError;
use crate::interpret::Outcome;
use super::DHCPStaticEntry;
use super::DhcpServer;
@@ -19,6 +17,8 @@ use super::K8sclient;
use super::LoadBalancer;
use super::LoadBalancerService;
use super::LogicalHost;
use super::PreparationError;
use super::PreparationOutcome;
use super::Router;
use super::TftpServer;
@@ -48,7 +48,7 @@ impl Topology for HAClusterTopology {
fn name(&self) -> &str {
"HAClusterTopology"
}
async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
todo!(
"ensure_ready, not entirely sure what it should do here, probably something like verify that the hosts are reachable and all services are up and ready."
)
@@ -244,10 +244,12 @@ impl Topology for DummyInfra {
todo!()
}
async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
let dummy_msg = "This is a dummy infrastructure that does nothing";
info!("{dummy_msg}");
Ok(Outcome::success(dummy_msg.to_string()))
Ok(PreparationOutcome::Success {
details: dummy_msg.into(),
})
}
}

View File

@@ -1,5 +1,4 @@
use derive_new::new;
use futures_util::StreamExt;
use k8s_openapi::{
ClusterResourceScope, NamespaceResourceScope,
api::{apps::v1::Deployment, core::v1::Pod},
@@ -18,7 +17,7 @@ use kube::{
};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use similar::{DiffableStr, TextDiff};
use similar::TextDiff;
#[derive(new, Clone)]
pub struct K8sClient {
@@ -67,13 +66,13 @@ impl K8sClient {
}
let establish = await_condition(api, name.as_str(), conditions::is_deployment_completed());
let t = if let Some(t) = timeout { t } else { 300 };
let t = timeout.unwrap_or(300);
let res = tokio::time::timeout(std::time::Duration::from_secs(t), establish).await;
if let Ok(r) = res {
return Ok(());
if res.is_ok() {
Ok(())
} else {
return Err("timed out while waiting for deployment".to_string());
Err("timed out while waiting for deployment".to_string())
}
}
@@ -112,7 +111,7 @@ impl K8sClient {
.await;
match res {
Err(e) => return Err(e.to_string()),
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
@@ -121,14 +120,10 @@ impl K8sClient {
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
debug!("Status: {}", s);
if s == "Success" {
return Ok(());
} else {
return Err(s);
}
debug!("Status: {} - {:?}", s, status.details);
if s == "Success" { Ok(()) } else { Err(s) }
} else {
return Err("Couldn't get inner status of pod exec".to_string());
Err("Couldn't get inner status of pod exec".to_string())
}
}
}
@@ -169,8 +164,9 @@ impl K8sClient {
trace!("Received current value {current:#?}");
// The resource exists, so we calculate and display a diff.
println!("\nPerforming dry-run for resource: '{}'", name);
let mut current_yaml = serde_yaml::to_value(&current)
.expect(&format!("Could not serialize current value : {current:#?}"));
let mut current_yaml = serde_yaml::to_value(&current).unwrap_or_else(|_| {
panic!("Could not serialize current value : {current:#?}")
});
if current_yaml.is_mapping() && current_yaml.get("status").is_some() {
let map = current_yaml.as_mapping_mut().unwrap();
let removed = map.remove_entry("status");
@@ -237,7 +233,7 @@ impl K8sClient {
}
}
pub async fn apply_many<K>(&self, resource: &Vec<K>, ns: Option<&str>) -> Result<Vec<K>, Error>
pub async fn apply_many<K>(&self, resource: &[K], ns: Option<&str>) -> Result<Vec<K>, Error>
where
K: Resource + Clone + std::fmt::Debug + DeserializeOwned + serde::Serialize,
<K as Resource>::Scope: ApplyStrategy<K>,
@@ -253,7 +249,7 @@ impl K8sClient {
pub async fn apply_yaml_many(
&self,
yaml: &Vec<serde_yaml::Value>,
#[allow(clippy::ptr_arg)] yaml: &Vec<serde_yaml::Value>,
ns: Option<&str>,
) -> Result<(), Error> {
for y in yaml.iter() {

View File

@@ -7,7 +7,7 @@ use tokio::sync::OnceCell;
use crate::{
executors::ExecutorError,
interpret::{InterpretError, InterpretStatus, Outcome},
interpret::InterpretStatus,
inventory::Inventory,
modules::{
k3d::K3DInstallationScore,
@@ -24,10 +24,17 @@ use crate::{
};
use super::{
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, Topology,
DeploymentTarget, HelmCommand, K8sclient, MultiTargetTopology, PreparationError,
PreparationOutcome, Topology,
k8s::K8sClient,
oberservability::monitoring::AlertReceiver,
tenant::{TenantConfig, TenantManager, k8s::K8sTenantManager},
tenant::{
TenantConfig, TenantManager,
k8s::K8sTenantManager,
network_policy::{
K3dNetworkPolicyStrategy, NetworkPolicyStrategy, NoopNetworkPolicyStrategy,
},
},
};
#[derive(Clone, Debug)]
@@ -74,20 +81,30 @@ impl PrometheusApplicationMonitoring<CRDPrometheus> for K8sAnywhereTopology {
sender: &CRDPrometheus,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<CRDPrometheus>>>>,
) -> Result<Outcome, InterpretError> {
) -> Result<PreparationOutcome, PreparationError> {
let po_result = self.ensure_prometheus_operator(sender).await?;
if po_result.status == InterpretStatus::NOOP {
if po_result == PreparationOutcome::Noop {
debug!("Skipping Prometheus CR installation due to missing operator.");
return Ok(Outcome::noop());
return Ok(po_result);
}
self.get_k8s_prometheus_application_score(sender.clone(), receivers)
.await
.create_interpret()
.execute(inventory, self)
.await?;
Ok(Outcome::success(format!("No action, working on cluster ")))
let result = self
.get_k8s_prometheus_application_score(sender.clone(), receivers)
.await
.interpret(inventory, self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: outcome.message,
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
}
@@ -124,7 +141,7 @@ impl K8sAnywhereTopology {
) -> K8sPrometheusCRDAlertingScore {
K8sPrometheusCRDAlertingScore {
sender,
receivers: receivers.unwrap_or_else(Vec::new),
receivers: receivers.unwrap_or_default(),
service_monitors: vec![],
prometheus_rules: vec![],
}
@@ -158,15 +175,23 @@ impl K8sAnywhereTopology {
K3DInstallationScore::default()
}
async fn try_install_k3d(&self) -> Result<(), InterpretError> {
self.get_k3d_installation_score()
.create_interpret()
.execute(&Inventory::empty(), self)
.await?;
Ok(())
async fn try_install_k3d(&self) -> Result<(), PreparationError> {
let result = self
.get_k3d_installation_score()
.interpret(&Inventory::empty(), self)
.await;
match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(()),
InterpretStatus::NOOP => Ok(()),
_ => Err(PreparationError::new(outcome.message)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
}
}
async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, InterpretError> {
async fn try_get_or_install_k8s_client(&self) -> Result<Option<K8sState>, PreparationError> {
let k8s_anywhere_config = &self.config;
// TODO this deserves some refactoring, it is becoming a bit hard to figure out
@@ -176,7 +201,7 @@ impl K8sAnywhereTopology {
} else {
if let Some(kubeconfig) = &k8s_anywhere_config.kubeconfig {
debug!("Loading kubeconfig {kubeconfig}");
match self.try_load_kubeconfig(&kubeconfig).await {
match self.try_load_kubeconfig(kubeconfig).await {
Some(client) => {
return Ok(Some(K8sState {
client: Arc::new(client),
@@ -185,7 +210,7 @@ impl K8sAnywhereTopology {
}));
}
None => {
return Err(InterpretError::new(format!(
return Err(PreparationError::new(format!(
"Failed to load kubeconfig from {kubeconfig}"
)));
}
@@ -231,16 +256,21 @@ impl K8sAnywhereTopology {
Ok(Some(state))
}
async fn ensure_k8s_tenant_manager(&self) -> Result<(), String> {
if let Some(_) = self.tenant_manager.get() {
async fn ensure_k8s_tenant_manager(&self, k8s_state: &K8sState) -> Result<(), String> {
if self.tenant_manager.get().is_some() {
return Ok(());
}
self.tenant_manager
.get_or_try_init(async || -> Result<K8sTenantManager, String> {
// TOOD: checker si K8s ou K3d/s tenant manager (ref. issue https://git.nationtech.io/NationTech/harmony/issues/94)
let k8s_client = self.k8s_client().await?;
Ok(K8sTenantManager::new(k8s_client))
let network_policy_strategy: Box<dyn NetworkPolicyStrategy> = match k8s_state.source
{
K8sSource::LocalK3d => Box::new(K3dNetworkPolicyStrategy::new()),
K8sSource::Kubeconfig => Box::new(NoopNetworkPolicyStrategy::new()),
};
Ok(K8sTenantManager::new(k8s_client, network_policy_strategy))
})
.await?;
@@ -259,11 +289,11 @@ impl K8sAnywhereTopology {
async fn ensure_prometheus_operator(
&self,
sender: &CRDPrometheus,
) -> Result<Outcome, InterpretError> {
) -> Result<PreparationOutcome, PreparationError> {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i prometheuses"])
.status()
.map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))?;
.map_err(|e| PreparationError::new(format!("could not connect to cluster: {}", e)))?;
if !status.success() {
if let Some(Some(k8s_state)) = self.k8s_state.get() {
@@ -272,30 +302,37 @@ impl K8sAnywhereTopology {
debug!("installing prometheus operator");
let op_score =
prometheus_operator_helm_chart_score(sender.namespace.clone());
op_score
.create_interpret()
.execute(&Inventory::empty(), self)
.await?;
return Ok(Outcome::success(
"installed prometheus operator".to_string(),
));
let result = op_score.interpret(&Inventory::empty(), self).await;
return match result {
Ok(outcome) => match outcome.status {
InterpretStatus::SUCCESS => Ok(PreparationOutcome::Success {
details: "installed prometheus operator".into(),
}),
InterpretStatus::NOOP => Ok(PreparationOutcome::Noop),
_ => Err(PreparationError::new(
"failed to install prometheus operator (unknown error)".into(),
)),
},
Err(err) => Err(PreparationError::new(err.to_string())),
};
}
K8sSource::Kubeconfig => {
debug!("unable to install prometheus operator, contact cluster admin");
return Ok(Outcome::noop());
return Ok(PreparationOutcome::Noop);
}
}
} else {
warn!("Unable to detect k8s_state. Skipping Prometheus Operator install.");
return Ok(Outcome::noop());
return Ok(PreparationOutcome::Noop);
}
}
debug!("Prometheus operator is already present, skipping install");
Ok(Outcome::success(
"prometheus operator present in cluster".to_string(),
))
Ok(PreparationOutcome::Success {
details: "prometheus operator present in cluster".into(),
})
}
}
@@ -354,26 +391,25 @@ impl Topology for K8sAnywhereTopology {
"K8sAnywhereTopology"
}
async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
let k8s_state = self
.k8s_state
.get_or_try_init(|| self.try_get_or_install_k8s_client())
.await?;
let k8s_state: &K8sState = k8s_state.as_ref().ok_or(InterpretError::new(
"No K8s client could be found or installed".to_string(),
let k8s_state: &K8sState = k8s_state.as_ref().ok_or(PreparationError::new(
"no K8s client could be found or installed".to_string(),
))?;
self.ensure_k8s_tenant_manager()
self.ensure_k8s_tenant_manager(k8s_state)
.await
.map_err(|e| InterpretError::new(e))?;
.map_err(PreparationError::new)?;
match self.is_helm_available() {
Ok(()) => Ok(Outcome::success(format!(
"{} + helm available",
k8s_state.message.clone()
))),
Err(e) => Err(InterpretError::new(format!("helm unavailable: {}", e))),
Ok(()) => Ok(PreparationOutcome::Success {
details: format!("{} + helm available", k8s_state.message.clone()),
}),
Err(e) => Err(PreparationError::new(format!("helm unavailable: {}", e))),
}
}
}

View File

@@ -1,9 +1,7 @@
use async_trait::async_trait;
use derive_new::new;
use crate::interpret::{InterpretError, Outcome};
use super::{HelmCommand, Topology};
use super::{HelmCommand, PreparationError, PreparationOutcome, Topology};
#[derive(new)]
pub struct LocalhostTopology;
@@ -14,10 +12,10 @@ impl Topology for LocalhostTopology {
"LocalHostTopology"
}
async fn ensure_ready(&self) -> Result<Outcome, InterpretError> {
Ok(Outcome::success(
"Localhost is Chuck Norris, always ready.".to_string(),
))
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
Ok(PreparationOutcome::Success {
details: "Localhost is Chuck Norris, always ready.".into(),
})
}
}

View File

@@ -6,6 +6,7 @@ mod k8s_anywhere;
mod localhost;
pub mod oberservability;
pub mod tenant;
use derive_new::new;
pub use k8s_anywhere::*;
pub use localhost::*;
pub mod k8s;
@@ -26,10 +27,13 @@ pub use tftp::*;
mod helm_command;
pub use helm_command::*;
use super::{
executors::ExecutorError,
instrumentation::{self, HarmonyEvent},
};
use std::error::Error;
use std::net::IpAddr;
use super::interpret::{InterpretError, Outcome};
/// Represents a logical view of an infrastructure environment providing specific capabilities.
///
/// A Topology acts as a self-contained "package" responsible for managing access
@@ -57,9 +61,128 @@ pub trait Topology: Send + Sync {
/// * **Internal Orchestration:** For complex topologies, this method might manage dependencies on other sub-topologies, ensuring *their* `ensure_ready` is called first. Using nested `Maestros` to run setup `Scores` against these sub-topologies is the recommended pattern for non-trivial bootstrapping, allowing reuse of Harmony's core orchestration logic.
///
/// # Returns
/// - `Ok(Outcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context.
/// - `Err(TopologyError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments.
async fn ensure_ready(&self) -> Result<Outcome, InterpretError>;
/// - `Ok(PreparationOutcome)`: Indicates the topology is now ready. The `Outcome` status might be `SUCCESS` if actions were taken, or `NOOP` if it was already ready. The message should provide context.
/// - `Err(PreparationError)`: Indicates the topology could not reach a ready state due to configuration issues, discovery failures, bootstrap errors, or unsupported environments.
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError>;
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PreparationOutcome {
Success { details: String },
Noop,
}
#[derive(Debug, Clone, new)]
pub struct PreparationError {
msg: String,
}
impl std::fmt::Display for PreparationError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.msg)
}
}
impl Error for PreparationError {}
impl From<ExecutorError> for PreparationError {
fn from(value: ExecutorError) -> Self {
Self {
msg: format!("InterpretError : {value}"),
}
}
}
impl From<kube::Error> for PreparationError {
fn from(value: kube::Error) -> Self {
Self {
msg: format!("PreparationError : {value}"),
}
}
}
impl From<String> for PreparationError {
fn from(value: String) -> Self {
Self {
msg: format!("PreparationError : {value}"),
}
}
}
#[derive(Clone, Debug, PartialEq)]
pub enum TopologyStatus {
Queued,
Preparing,
Success,
Noop,
Error,
}
pub struct TopologyState {
pub topology: String,
pub status: TopologyStatus,
}
impl TopologyState {
pub fn new(topology: String) -> Self {
let instance = Self {
topology,
status: TopologyStatus::Queued,
};
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: instance.topology.clone(),
status: instance.status.clone(),
message: None,
})
.unwrap();
instance
}
pub fn prepare(&mut self) {
self.status = TopologyStatus::Preparing;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: None,
})
.unwrap();
}
pub fn success(&mut self, message: String) {
self.status = TopologyStatus::Success;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: Some(message),
})
.unwrap();
}
pub fn noop(&mut self) {
self.status = TopologyStatus::Noop;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: None,
})
.unwrap();
}
pub fn error(&mut self, message: String) {
self.status = TopologyStatus::Error;
instrumentation::instrument(HarmonyEvent::TopologyStateChanged {
topology: self.topology.clone(),
status: self.status.clone(),
message: Some(message),
})
.unwrap();
}
}
#[derive(Debug)]
@@ -88,7 +211,7 @@ impl Serialize for Url {
{
match self {
Url::LocalFolder(path) => serializer.serialize_str(path),
Url::Url(url) => serializer.serialize_str(&url.as_str()),
Url::Url(url) => serializer.serialize_str(url.as_str()),
}
}
}

View File

@@ -45,7 +45,7 @@ impl<S: AlertSender + Installable<T>, T: Topology> Interpret<T> for AlertingInte
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::Alerting
}
fn get_version(&self) -> Version {

View File

@@ -27,11 +27,11 @@ pub struct UnmanagedRouter {
impl Router for UnmanagedRouter {
fn get_gateway(&self) -> IpAddress {
self.gateway.clone()
self.gateway
}
fn get_cidr(&self) -> Ipv4Cidr {
self.cidr.clone()
self.cidr
}
fn get_host(&self) -> LogicalHost {

View File

@@ -15,36 +15,38 @@ use k8s_openapi::{
apimachinery::pkg::util::intstr::IntOrString,
};
use kube::Resource;
use log::{debug, info, warn};
use log::debug;
use serde::de::DeserializeOwned;
use serde_json::json;
use tokio::sync::OnceCell;
use super::{TenantConfig, TenantManager};
use super::{TenantConfig, TenantManager, network_policy::NetworkPolicyStrategy};
#[derive(Clone, Debug)]
#[derive(Debug)]
pub struct K8sTenantManager {
k8s_client: Arc<K8sClient>,
k8s_tenant_config: Arc<OnceCell<TenantConfig>>,
network_policy_strategy: Box<dyn NetworkPolicyStrategy>,
}
impl K8sTenantManager {
pub fn new(client: Arc<K8sClient>) -> Self {
pub fn new(
client: Arc<K8sClient>,
network_policy_strategy: Box<dyn NetworkPolicyStrategy>,
) -> Self {
Self {
k8s_client: client,
k8s_tenant_config: Arc::new(OnceCell::new()),
network_policy_strategy,
}
}
}
impl K8sTenantManager {
fn get_namespace_name(&self, config: &TenantConfig) -> String {
config.name.clone()
}
fn ensure_constraints(&self, _namespace: &Namespace) -> Result<(), ExecutorError> {
warn!("Validate that when tenant already exists (by id) that name has not changed");
warn!("Make sure other Tenant constraints are respected by this k8s implementation");
// TODO: Ensure constraints are applied to namespace (https://git.nationtech.io/NationTech/harmony/issues/98)
Ok(())
}
@@ -219,29 +221,6 @@ impl K8sTenantManager {
}
]
},
{
"to": [
{
"ipBlock": {
"cidr": "10.43.0.1/32",
}
}
]
},
{
"to": [
{
//TODO this ip is from the docker network that k3d is running on
//since k3d does not deploy kube-api-server as a pod it needs to ahve the ip
//address opened up
//need to find a way to automatically detect the ip address from the docker
//network
"ipBlock": {
"cidr": "172.24.0.0/16",
}
}
]
},
{
"to": [
{
@@ -309,19 +288,19 @@ impl K8sTenantManager {
let ports: Option<Vec<NetworkPolicyPort>> =
c.1.as_ref().map(|spec| match &spec.data {
super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int(port.clone().into())),
port: Some(IntOrString::Int((*port).into())),
..Default::default()
}],
super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int(start.clone().into())),
end_port: Some(end.clone().into()),
port: Some(IntOrString::Int((*start).into())),
end_port: Some((*end).into()),
protocol: None, // Not currently supported by Harmony
}],
super::PortSpecData::ListOfPorts(items) => items
.iter()
.map(|i| NetworkPolicyPort {
port: Some(IntOrString::Int(i.clone().into())),
port: Some(IntOrString::Int((*i).into())),
..Default::default()
})
.collect(),
@@ -366,19 +345,19 @@ impl K8sTenantManager {
let ports: Option<Vec<NetworkPolicyPort>> =
c.1.as_ref().map(|spec| match &spec.data {
super::PortSpecData::SinglePort(port) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int(port.clone().into())),
port: Some(IntOrString::Int((*port).into())),
..Default::default()
}],
super::PortSpecData::PortRange(start, end) => vec![NetworkPolicyPort {
port: Some(IntOrString::Int(start.clone().into())),
end_port: Some(end.clone().into()),
port: Some(IntOrString::Int((*start).into())),
end_port: Some((*end).into()),
protocol: None, // Not currently supported by Harmony
}],
super::PortSpecData::ListOfPorts(items) => items
.iter()
.map(|i| NetworkPolicyPort {
port: Some(IntOrString::Int(i.clone().into())),
port: Some(IntOrString::Int((*i).into())),
..Default::default()
})
.collect(),
@@ -411,12 +390,27 @@ impl K8sTenantManager {
}
}
impl Clone for K8sTenantManager {
fn clone(&self) -> Self {
Self {
k8s_client: self.k8s_client.clone(),
k8s_tenant_config: self.k8s_tenant_config.clone(),
network_policy_strategy: self.network_policy_strategy.clone_box(),
}
}
}
#[async_trait]
impl TenantManager for K8sTenantManager {
async fn provision_tenant(&self, config: &TenantConfig) -> Result<(), ExecutorError> {
let namespace = self.build_namespace(config)?;
let resource_quota = self.build_resource_quota(config)?;
let network_policy = self.build_network_policy(config)?;
let network_policy = self
.network_policy_strategy
.adjust_policy(network_policy, config);
let resource_limit_range = self.build_limit_range(config)?;
self.ensure_constraints(&namespace)?;
@@ -433,13 +427,14 @@ impl TenantManager for K8sTenantManager {
debug!("Creating network_policy for tenant {}", config.name);
self.apply_resource(network_policy, config).await?;
info!(
debug!(
"Success provisionning K8s tenant id {} name {}",
config.id, config.name
);
self.store_config(config);
Ok(())
}
async fn get_tenant_config(&self) -> Option<TenantConfig> {
self.k8s_tenant_config.get().cloned()
}

View File

@@ -1,11 +1,11 @@
pub mod k8s;
mod manager;
use std::str::FromStr;
pub use manager::*;
use serde::{Deserialize, Serialize};
pub mod network_policy;
use crate::data::Id;
pub use manager::*;
use serde::{Deserialize, Serialize};
use std::str::FromStr;
#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] // Assuming serde for Scores
pub struct TenantConfig {

View File

@@ -0,0 +1,120 @@
use k8s_openapi::api::networking::v1::{
IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyPeer, NetworkPolicySpec,
};
use super::TenantConfig;
pub trait NetworkPolicyStrategy: Send + Sync + std::fmt::Debug {
fn clone_box(&self) -> Box<dyn NetworkPolicyStrategy>;
fn adjust_policy(&self, policy: NetworkPolicy, config: &TenantConfig) -> NetworkPolicy;
}
#[derive(Clone, Debug)]
pub struct NoopNetworkPolicyStrategy {}
impl NoopNetworkPolicyStrategy {
pub fn new() -> Self {
Self {}
}
}
impl Default for NoopNetworkPolicyStrategy {
fn default() -> Self {
Self::new()
}
}
impl NetworkPolicyStrategy for NoopNetworkPolicyStrategy {
fn clone_box(&self) -> Box<dyn NetworkPolicyStrategy> {
Box::new(self.clone())
}
fn adjust_policy(&self, policy: NetworkPolicy, _config: &TenantConfig) -> NetworkPolicy {
policy
}
}
#[derive(Clone, Debug)]
pub struct K3dNetworkPolicyStrategy {}
impl K3dNetworkPolicyStrategy {
pub fn new() -> Self {
Self {}
}
}
impl Default for K3dNetworkPolicyStrategy {
fn default() -> Self {
Self::new()
}
}
impl NetworkPolicyStrategy for K3dNetworkPolicyStrategy {
fn clone_box(&self) -> Box<dyn NetworkPolicyStrategy> {
Box::new(self.clone())
}
fn adjust_policy(&self, policy: NetworkPolicy, _config: &TenantConfig) -> NetworkPolicy {
let mut egress = policy
.spec
.clone()
.unwrap_or_default()
.egress
.clone()
.unwrap_or_default();
egress.push(NetworkPolicyEgressRule {
to: Some(vec![NetworkPolicyPeer {
ip_block: Some(IPBlock {
cidr: "172.18.0.0/16".into(), // TODO: query the IP range https://git.nationtech.io/NationTech/harmony/issues/108
..Default::default()
}),
..Default::default()
}]),
..Default::default()
});
NetworkPolicy {
spec: Some(NetworkPolicySpec {
egress: Some(egress),
..policy.spec.unwrap_or_default()
}),
..policy
}
}
}
#[cfg(test)]
mod tests {
use k8s_openapi::api::networking::v1::{
IPBlock, NetworkPolicy, NetworkPolicyEgressRule, NetworkPolicyPeer, NetworkPolicySpec,
};
use super::{K3dNetworkPolicyStrategy, NetworkPolicyStrategy};
#[test]
pub fn should_add_ip_block_for_k3d_harmony_server() {
let strategy = K3dNetworkPolicyStrategy::new();
let policy =
strategy.adjust_policy(NetworkPolicy::default(), &super::TenantConfig::default());
let expected_policy = NetworkPolicy {
spec: Some(NetworkPolicySpec {
egress: Some(vec![NetworkPolicyEgressRule {
to: Some(vec![NetworkPolicyPeer {
ip_block: Some(IPBlock {
cidr: "172.18.0.0/16".into(),
..Default::default()
}),
..Default::default()
}]),
..Default::default()
}]),
..Default::default()
}),
..Default::default()
};
assert_eq!(expected_policy, policy);
}
}

View File

@@ -60,7 +60,7 @@ impl DnsServer for OPNSenseFirewall {
}
fn get_ip(&self) -> IpAddress {
OPNSenseFirewall::get_ip(&self)
OPNSenseFirewall::get_ip(self)
}
fn get_host(&self) -> LogicalHost {

View File

@@ -48,7 +48,7 @@ impl HttpServer for OPNSenseFirewall {
async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
let mut config = self.opnsense_config.write().await;
let caddy = config.caddy();
if let None = caddy.get_full_config() {
if caddy.get_full_config().is_none() {
info!("Http config not available in opnsense config, installing package");
config.install_package("os-caddy").await.map_err(|e| {
ExecutorError::UnexpectedError(format!(

View File

@@ -121,10 +121,12 @@ pub(crate) fn haproxy_xml_config_to_harmony_loadbalancer(
LoadBalancerService {
backend_servers,
listening_port: frontend.bind.parse().expect(&format!(
"HAProxy frontend address should be a valid SocketAddr, got {}",
frontend.bind
)),
listening_port: frontend.bind.parse().unwrap_or_else(|_| {
panic!(
"HAProxy frontend address should be a valid SocketAddr, got {}",
frontend.bind
)
}),
health_check,
}
})
@@ -167,28 +169,28 @@ pub(crate) fn get_health_check_for_backend(
None => return None,
};
let haproxy_health_check = match haproxy
let haproxy_health_check = haproxy
.healthchecks
.healthchecks
.iter()
.find(|h| &h.uuid == health_check_uuid)
{
Some(health_check) => health_check,
None => return None,
};
.find(|h| &h.uuid == health_check_uuid)?;
let binding = haproxy_health_check.health_check_type.to_uppercase();
let uppercase = binding.as_str();
match uppercase {
"TCP" => {
if let Some(checkport) = haproxy_health_check.checkport.content.as_ref() {
if checkport.len() > 0 {
return Some(HealthCheck::TCP(Some(checkport.parse().expect(&format!(
"HAProxy check port should be a valid port number, got {checkport}"
)))));
if !checkport.is_empty() {
return Some(HealthCheck::TCP(Some(checkport.parse().unwrap_or_else(
|_| {
panic!(
"HAProxy check port should be a valid port number, got {checkport}"
)
},
))));
}
}
return Some(HealthCheck::TCP(None));
Some(HealthCheck::TCP(None))
}
"HTTP" => {
let path: String = haproxy_health_check
@@ -355,16 +357,13 @@ mod tests {
// Create an HAProxy instance with servers
let mut haproxy = HAProxy::default();
let mut server = HAProxyServer::default();
server.uuid = "server1".to_string();
server.address = "192.168.1.1".to_string();
server.port = 80;
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
let mut server = HAProxyServer::default();
server.uuid = "server3".to_string();
server.address = "192.168.1.3".to_string();
server.port = 8080;
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
@@ -384,10 +383,12 @@ mod tests {
let backend = HAProxyBackend::default();
// Create an HAProxy instance with servers
let mut haproxy = HAProxy::default();
let mut server = HAProxyServer::default();
server.uuid = "server1".to_string();
server.address = "192.168.1.1".to_string();
server.port = 80;
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
@@ -402,10 +403,12 @@ mod tests {
backend.linked_servers.content = Some("server4,server5".to_string());
// Create an HAProxy instance with servers
let mut haproxy = HAProxy::default();
let mut server = HAProxyServer::default();
server.uuid = "server1".to_string();
server.address = "192.168.1.1".to_string();
server.port = 80;
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "192.168.1.1".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
@@ -416,20 +419,28 @@ mod tests {
#[test]
fn test_get_servers_for_backend_multiple_linked_servers() {
// Create a backend with multiple linked servers
#[allow(clippy::field_reassign_with_default)]
let mut backend = HAProxyBackend::default();
backend.linked_servers.content = Some("server1,server2".to_string());
//
// Create an HAProxy instance with matching servers
let mut haproxy = HAProxy::default();
let mut server = HAProxyServer::default();
server.uuid = "server1".to_string();
server.address = "some-hostname.test.mcd".to_string();
server.port = 80;
let server = HAProxyServer {
uuid: "server1".to_string(),
address: "some-hostname.test.mcd".to_string(),
port: 80,
..Default::default()
};
haproxy.servers.servers.push(server);
let mut server = HAProxyServer::default();
server.uuid = "server2".to_string();
server.address = "192.168.1.2".to_string();
server.port = 8080;
let server = HAProxyServer {
uuid: "server2".to_string(),
address: "192.168.1.2".to_string(),
port: 8080,
..Default::default()
};
haproxy.servers.servers.push(server);
// Call the function
let result = get_servers_for_backend(&backend, &haproxy);
// Check the result

View File

@@ -22,12 +22,18 @@ pub struct OPNSenseFirewall {
host: LogicalHost,
}
// TODO figure out a design to have a unique identifiere for this firewall
// I think a project identifier would be good enough, then the secrets module configuration will
// point to the project's vault and this opnsense modules doesn't need to know anything about it
const OPNSENSE_CREDENTIALS: &str = "OPNSENSE_CREDENTIALS";
impl OPNSenseFirewall {
pub fn get_ip(&self) -> IpAddress {
self.host.ip
}
pub async fn new(host: LogicalHost, port: Option<u16>, username: &str, password: &str) -> Self {
// let credentials = Secrets::get_by_name(OPNSENSE_CREDENTIALS)
Self {
opnsense_config: Arc::new(RwLock::new(
opnsense_config::Config::from_credentials(host.ip, port, username, password).await,

View File

@@ -58,7 +58,7 @@ impl TftpServer for OPNSenseFirewall {
async fn ensure_initialized(&self) -> Result<(), ExecutorError> {
let mut config = self.opnsense_config.write().await;
let tftp = config.tftp();
if let None = tftp.get_full_config() {
if tftp.get_full_config().is_none() {
info!("Tftp config not available in opnsense config, installing package");
config.install_package("os-tftp").await.map_err(|e| {
ExecutorError::UnexpectedError(format!(

View File

@@ -13,7 +13,7 @@ pub trait ApplicationFeature<T: Topology>:
fn name(&self) -> String;
}
trait ApplicationFeatureClone<T: Topology> {
pub trait ApplicationFeatureClone<T: Topology> {
fn clone_box(&self) -> Box<dyn ApplicationFeature<T>>;
}
@@ -27,7 +27,7 @@ where
}
impl<T: Topology> Serialize for Box<dyn ApplicationFeature<T>> {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{

View File

@@ -184,12 +184,11 @@ impl ArgoApplication {
pub fn to_yaml(&self) -> serde_yaml::Value {
let name = &self.name;
let namespace = if let Some(ns) = self.namespace.as_ref() {
&ns
ns
} else {
"argocd"
};
let project = &self.project;
let source = &self.source;
let yaml_str = format!(
r#"
@@ -228,7 +227,7 @@ spec:
serde_yaml::to_value(&self.source).expect("couldn't serialize source to value");
let sync_policy = serde_yaml::to_value(&self.sync_policy)
.expect("couldn't serialize sync_policy to value");
let revision_history_limit = serde_yaml::to_value(&self.revision_history_limit)
let revision_history_limit = serde_yaml::to_value(self.revision_history_limit)
.expect("couldn't serialize revision_history_limit to value");
spec.insert(

View File

@@ -1,7 +1,7 @@
use std::{io::Write, process::Command, sync::Arc};
use async_trait::async_trait;
use log::{debug, error};
use log::info;
use serde_yaml::Value;
use tempfile::NamedTempFile;
@@ -10,7 +10,7 @@ use crate::{
data::Version,
inventory::Inventory,
modules::application::{
Application, ApplicationFeature, HelmPackage, OCICompliant,
ApplicationFeature, HelmPackage, OCICompliant,
features::{ArgoApplication, ArgoHelmScore},
},
score::Score,
@@ -56,14 +56,11 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
chart_url: String,
image_name: String,
) -> Result<(), String> {
error!(
"FIXME This works only with local k3d installations, which is fine only for current demo purposes. We assume usage of K8sAnywhereTopology"
);
error!("TODO hardcoded k3d bin path is wrong");
// TODO: This works only with local k3d installations, which is fine only for current demo purposes. We assume usage of K8sAnywhereTopology"
// https://git.nationtech.io/NationTech/harmony/issues/106
let k3d_bin_path = (*HARMONY_DATA_DIR).join("k3d").join("k3d");
// --- 1. Import the container image into the k3d cluster ---
debug!(
info!(
"Importing image '{}' into k3d cluster 'harmony'",
image_name
);
@@ -80,7 +77,7 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
}
// --- 2. Get the kubeconfig for the k3d cluster and write it to a temp file ---
debug!("Retrieving kubeconfig for k3d cluster 'harmony'");
info!("Retrieving kubeconfig for k3d cluster 'harmony'");
let kubeconfig_output = Command::new(&k3d_bin_path)
.args(["kubeconfig", "get", "harmony"])
.output()
@@ -101,7 +98,7 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
let kubeconfig_path = temp_kubeconfig.path().to_str().unwrap();
// --- 3. Install or upgrade the Helm chart in the cluster ---
debug!(
info!(
"Deploying Helm chart '{}' to namespace '{}'",
chart_url, app_name
);
@@ -131,7 +128,7 @@ impl<A: OCICompliant + HelmPackage> ContinuousDelivery<A> {
));
}
debug!("Successfully deployed '{}' to local k3d cluster.", app_name);
info!("Successfully deployed '{}' to local k3d cluster.", app_name);
Ok(())
}
}
@@ -151,14 +148,12 @@ impl<
// Or ask for it when unknown
let helm_chart = self.application.build_push_helm_package(&image).await?;
debug!("Pushed new helm chart {helm_chart}");
error!("TODO Make building image configurable/skippable if image already exists (prompt)");
// TODO: Make building image configurable/skippable if image already exists (prompt)")
// https://git.nationtech.io/NationTech/harmony/issues/104
let image = self.application.build_push_oci_image().await?;
debug!("Pushed new docker image {image}");
debug!("Installing ContinuousDelivery feature");
// TODO this is a temporary hack for demo purposes, the deployment target should be driven
// TODO: this is a temporary hack for demo purposes, the deployment target should be driven
// by the topology only and we should not have to know how to perform tasks like this for
// which the topology should be responsible.
//
@@ -171,17 +166,20 @@ impl<
// access it. This forces every Topology to understand the concept of targets though... So
// instead I'll create a new Capability which is MultiTargetTopology and we'll see how it
// goes. It still does not feel right though.
//
// https://git.nationtech.io/NationTech/harmony/issues/106
match topology.current_target() {
DeploymentTarget::LocalDev => {
info!("Deploying {} locally...", self.application.name());
self.deploy_to_local_k3d(self.application.name(), helm_chart, image)
.await?;
}
target => {
debug!("Deploying to target {target:?}");
info!("Deploying {} to target {target:?}", self.application.name());
let score = ArgoHelmScore {
namespace: "harmonydemo-staging".to_string(),
openshift: false,
domain: "argo.harmonydemo.apps.st.mcd".to_string(),
namespace: "harmony-example-rust-webapp".to_string(),
openshift: true,
domain: "argo.harmonydemo.apps.ncd0.harmony.mcd".to_string(),
argo_apps: vec![ArgoApplication::from(CDApplicationConfig {
// helm pull oci://hub.nationtech.io/harmony/harmony-example-rust-webapp-chart --version 0.1.0
version: Version::from("0.1.0").unwrap(),
@@ -189,12 +187,11 @@ impl<
helm_chart_name: "harmony-example-rust-webapp-chart".to_string(),
values_overrides: None,
name: "harmony-demo-rust-webapp".to_string(),
namespace: "harmonydemo-staging".to_string(),
namespace: "harmony-example-rust-webapp".to_string(),
})],
};
score
.create_interpret()
.execute(&Inventory::empty(), topology)
.interpret(&Inventory::empty(), topology)
.await
.unwrap();
}

View File

@@ -1,5 +1,4 @@
use async_trait::async_trait;
use log::error;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use std::str::FromStr;
@@ -50,20 +49,21 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for ArgoInterpret {
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
error!("Uncomment below, only disabled for debugging");
self.score
.create_interpret()
.execute(inventory, topology)
.await?;
self.score.interpret(inventory, topology).await?;
let k8s_client = topology.k8s_client().await?;
k8s_client
.apply_yaml_many(&self.argo_apps.iter().map(|a| a.to_yaml()).collect(), None)
.await
.unwrap();
Ok(Outcome::success(format!(
"Successfully installed ArgoCD and {} Applications",
self.argo_apps.len()
"ArgoCD installed with {} {}",
self.argo_apps.len(),
match self.argo_apps.len() {
1 => "application",
_ => "applications",
}
)))
}
@@ -986,7 +986,7 @@ commitServer:
);
HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
namespace: Some(NonBlankString::from_str(namespace).unwrap()),
release_name: NonBlankString::from_str("argo-cd").unwrap(),
chart_name: NonBlankString::from_str("argo/argo-cd").unwrap(),
chart_version: Some(NonBlankString::from_str("8.1.2").unwrap()),

View File

@@ -4,6 +4,7 @@ use crate::modules::application::{Application, ApplicationFeature};
use crate::modules::monitoring::application_monitoring::application_monitoring_score::ApplicationMonitoringScore;
use crate::modules::monitoring::kube_prometheus::crd::crd_alertmanager_config::CRDPrometheus;
use crate::topology::MultiTargetTopology;
use crate::{
inventory::Inventory,
modules::monitoring::{
@@ -33,6 +34,7 @@ impl<
+ 'static
+ TenantManager
+ K8sclient
+ MultiTargetTopology
+ std::fmt::Debug
+ PrometheusApplicationMonitoring<CRDPrometheus>,
> ApplicationFeature<T> for Monitoring
@@ -55,12 +57,11 @@ impl<
};
let ntfy = NtfyScore {
namespace: namespace.clone(),
host: "localhost".to_string(),
host: "ntfy.harmonydemo.apps.ncd0.harmony.mcd".to_string(),
};
ntfy.create_interpret()
.execute(&Inventory::empty(), topology)
ntfy.interpret(&Inventory::empty(), topology)
.await
.expect("couldn't create interpret for ntfy");
.map_err(|e| e.to_string())?;
let ntfy_default_auth_username = "harmony";
let ntfy_default_auth_password = "harmony";
@@ -95,10 +96,9 @@ impl<
alerting_score.receivers.push(Box::new(ntfy_receiver));
alerting_score
.create_interpret()
.execute(&Inventory::empty(), topology)
.interpret(&Inventory::empty(), topology)
.await
.unwrap();
.map_err(|e| e.to_string())?;
Ok(())
}
fn name(&self) -> String {

View File

@@ -14,11 +14,19 @@ use serde::Serialize;
use crate::{
data::{Id, Version},
instrumentation::{self, HarmonyEvent},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
topology::Topology,
};
#[derive(Clone, Debug)]
pub enum ApplicationFeatureStatus {
Installing,
Installed,
Failed { details: String },
}
pub trait Application: std::fmt::Debug + Send + Sync {
fn name(&self) -> String;
}
@@ -47,20 +55,41 @@ impl<A: Application, T: Topology + std::fmt::Debug> Interpret<T> for Application
.join(", ")
);
for feature in self.features.iter() {
debug!(
"Installing feature {} for application {app_name}",
feature.name()
);
instrumentation::instrument(HarmonyEvent::ApplicationFeatureStateChanged {
topology: topology.name().into(),
application: self.application.name(),
feature: feature.name(),
status: ApplicationFeatureStatus::Installing,
})
.unwrap();
let _ = match feature.ensure_installed(topology).await {
Ok(()) => (),
Ok(()) => {
instrumentation::instrument(HarmonyEvent::ApplicationFeatureStateChanged {
topology: topology.name().into(),
application: self.application.name(),
feature: feature.name(),
status: ApplicationFeatureStatus::Installed,
})
.unwrap();
}
Err(msg) => {
instrumentation::instrument(HarmonyEvent::ApplicationFeatureStateChanged {
topology: topology.name().into(),
application: self.application.name(),
feature: feature.name(),
status: ApplicationFeatureStatus::Failed {
details: msg.clone(),
},
})
.unwrap();
return Err(InterpretError::new(format!(
"Application Interpret failed to install feature : {msg}"
)));
}
};
}
Ok(Outcome::success("successfully created app".to_string()))
Ok(Outcome::success("Application created".to_string()))
}
fn get_name(&self) -> InterpretName {
@@ -81,7 +110,7 @@ impl<A: Application, T: Topology + std::fmt::Debug> Interpret<T> for Application
}
impl Serialize for dyn Application {
fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
fn serialize<S>(&self, _serializer: S) -> Result<S::Ok, S::Error>
where
S: serde::Serializer,
{

View File

@@ -1,5 +1,5 @@
use std::fs;
use std::path::PathBuf;
use std::path::{Path, PathBuf};
use std::process;
use std::sync::Arc;
@@ -10,7 +10,7 @@ use dockerfile_builder::Dockerfile;
use dockerfile_builder::instruction::{CMD, COPY, ENV, EXPOSE, FROM, RUN, USER, WORKDIR};
use dockerfile_builder::instruction_builder::CopyBuilder;
use futures_util::StreamExt;
use log::{debug, error, log_enabled};
use log::{debug, info, log_enabled};
use serde::Serialize;
use tar::Archive;
@@ -46,7 +46,7 @@ where
}
fn name(&self) -> String {
format!("Application: {}", self.application.name())
format!("{} [ApplicationScore]", self.application.name())
}
}
@@ -73,19 +73,19 @@ impl Application for RustWebapp {
#[async_trait]
impl HelmPackage for RustWebapp {
async fn build_push_helm_package(&self, image_url: &str) -> Result<String, String> {
debug!("Starting Helm chart build and push for '{}'", self.name);
info!("Starting Helm chart build and push for '{}'", self.name);
// 1. Create the Helm chart files on disk.
let chart_dir = self
.create_helm_chart_files(image_url)
.map_err(|e| format!("Failed to create Helm chart files: {}", e))?;
debug!("Successfully created Helm chart files in {:?}", chart_dir);
info!("Successfully created Helm chart files in {:?}", chart_dir);
// 2. Package the chart into a .tgz archive.
let packaged_chart_path = self
.package_helm_chart(&chart_dir)
.map_err(|e| format!("Failed to package Helm chart: {}", e))?;
debug!(
info!(
"Successfully packaged Helm chart: {}",
packaged_chart_path.to_string_lossy()
);
@@ -94,7 +94,7 @@ impl HelmPackage for RustWebapp {
let oci_chart_url = self
.push_helm_chart(&packaged_chart_path)
.map_err(|e| format!("Failed to push Helm chart: {}", e))?;
debug!("Successfully pushed Helm chart to: {}", oci_chart_url);
info!("Successfully pushed Helm chart to: {}", oci_chart_url);
Ok(oci_chart_url)
}
@@ -107,20 +107,20 @@ impl OCICompliant for RustWebapp {
async fn build_push_oci_image(&self) -> Result<String, String> {
// This function orchestrates the build and push process.
// It's async to match the trait definition, though the underlying docker commands are blocking.
debug!("Starting OCI image build and push for '{}'", self.name);
info!("Starting OCI image build and push for '{}'", self.name);
// 1. Build the image by calling the synchronous helper function.
let image_tag = self.image_name();
self.build_docker_image(&image_tag)
.await
.map_err(|e| format!("Failed to build Docker image: {}", e))?;
debug!("Successfully built Docker image: {}", image_tag);
info!("Successfully built Docker image: {}", image_tag);
// 2. Push the image to the registry.
self.push_docker_image(&image_tag)
.await
.map_err(|e| format!("Failed to push Docker image: {}", e))?;
debug!("Successfully pushed Docker image to: {}", image_tag);
info!("Successfully pushed Docker image to: {}", image_tag);
Ok(image_tag)
}
@@ -174,7 +174,7 @@ impl RustWebapp {
.platform("linux/x86_64");
let mut temp_tar_builder = tar::Builder::new(Vec::new());
let _ = temp_tar_builder
temp_tar_builder
.append_dir_all("", self.project_root.clone())
.unwrap();
let archive = temp_tar_builder
@@ -195,7 +195,7 @@ impl RustWebapp {
);
while let Some(msg) = image_build_stream.next().await {
println!("Message: {msg:?}");
debug!("Message: {msg:?}");
}
Ok(image_name.to_string())
@@ -219,7 +219,7 @@ impl RustWebapp {
);
while let Some(msg) = push_image_stream.next().await {
println!("Message: {msg:?}");
debug!("Message: {msg:?}");
}
Ok(image_tag.to_string())
@@ -288,9 +288,8 @@ impl RustWebapp {
.unwrap(),
);
// Copy the compiled binary from the builder stage.
error!(
"FIXME Should not be using score name here, instead should use name from Cargo.toml"
);
// TODO: Should not be using score name here, instead should use name from Cargo.toml
// https://git.nationtech.io/NationTech/harmony/issues/105
let binary_path_in_builder = format!("/app/target/release/{}", self.name);
let binary_path_in_final = format!("/home/appuser/{}", self.name);
dockerfile.push(
@@ -328,9 +327,8 @@ impl RustWebapp {
));
// Copy only the compiled binary from the builder stage.
error!(
"FIXME Should not be using score name here, instead should use name from Cargo.toml"
);
// TODO: Should not be using score name here, instead should use name from Cargo.toml
// https://git.nationtech.io/NationTech/harmony/issues/105
let binary_path_in_builder = format!("/app/target/release/{}", self.name);
let binary_path_in_final = format!("/usr/local/bin/{}", self.name);
dockerfile.push(
@@ -530,10 +528,7 @@ spec:
}
/// Packages a Helm chart directory into a .tgz file.
fn package_helm_chart(
&self,
chart_dir: &PathBuf,
) -> Result<PathBuf, Box<dyn std::error::Error>> {
fn package_helm_chart(&self, chart_dir: &Path) -> Result<PathBuf, Box<dyn std::error::Error>> {
let chart_dirname = chart_dir.file_name().expect("Should find a chart dirname");
debug!(
"Launching `helm package {}` cli with CWD {}",
@@ -546,14 +541,13 @@ spec:
);
let output = process::Command::new("helm")
.args(["package", chart_dirname.to_str().unwrap()])
.current_dir(&self.project_root.join(".harmony_generated").join("helm")) // Run package from the parent dir
.current_dir(self.project_root.join(".harmony_generated").join("helm")) // Run package from the parent dir
.output()?;
self.check_output(&output, "Failed to package Helm chart")?;
// Helm prints the path of the created chart to stdout.
let tgz_name = String::from_utf8(output.stdout)?
.trim()
.split_whitespace()
.last()
.unwrap_or_default()
@@ -573,7 +567,7 @@ spec:
/// Pushes a packaged Helm chart to an OCI registry.
fn push_helm_chart(
&self,
packaged_chart_path: &PathBuf,
packaged_chart_path: &Path,
) -> Result<String, Box<dyn std::error::Error>> {
// The chart name is the file stem of the .tgz file
let chart_file_name = packaged_chart_path.file_stem().unwrap().to_str().unwrap();

View File

@@ -41,6 +41,6 @@ impl<T: Topology + HelmCommand> Score<T> for CertManagerHelmScore {
}
fn name(&self) -> String {
format!("CertManagerHelmScore")
"CertManagerHelmScore".to_string()
}
}

View File

@@ -111,7 +111,7 @@ impl DhcpInterpret {
let boot_filename_outcome = match &self.score.boot_filename {
Some(boot_filename) => {
dhcp_server.set_boot_filename(&boot_filename).await?;
dhcp_server.set_boot_filename(boot_filename).await?;
Outcome::new(
InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set boot filename to {boot_filename}"),
@@ -122,7 +122,7 @@ impl DhcpInterpret {
let filename_outcome = match &self.score.filename {
Some(filename) => {
dhcp_server.set_filename(&filename).await?;
dhcp_server.set_filename(filename).await?;
Outcome::new(
InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set filename to {filename}"),
@@ -133,7 +133,7 @@ impl DhcpInterpret {
let filename64_outcome = match &self.score.filename64 {
Some(filename64) => {
dhcp_server.set_filename64(&filename64).await?;
dhcp_server.set_filename64(filename64).await?;
Outcome::new(
InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set filename64 to {filename64}"),
@@ -144,7 +144,7 @@ impl DhcpInterpret {
let filenameipxe_outcome = match &self.score.filenameipxe {
Some(filenameipxe) => {
dhcp_server.set_filenameipxe(&filenameipxe).await?;
dhcp_server.set_filenameipxe(filenameipxe).await?;
Outcome::new(
InterpretStatus::SUCCESS,
format!("Dhcp Interpret Set filenameipxe to {filenameipxe}"),
@@ -209,7 +209,7 @@ impl<T: DhcpServer> Interpret<T> for DhcpInterpret {
Ok(Outcome::new(
InterpretStatus::SUCCESS,
format!("Dhcp Interpret execution successful"),
"Dhcp Interpret execution successful".to_string(),
))
}
}

View File

@@ -112,7 +112,7 @@ impl<T: Topology + DnsServer> Interpret<T> for DnsInterpret {
Ok(Outcome::new(
InterpretStatus::SUCCESS,
format!("Dns Interpret execution successful"),
"Dns Interpret execution successful".to_string(),
))
}
}

View File

@@ -55,7 +55,7 @@ impl<T: Topology + HelmCommand> Score<T> for HelmChartScore {
}
fn name(&self) -> String {
format!("{} {} HelmChartScore", self.release_name, self.chart_name)
format!("{} [HelmChartScore]", self.release_name)
}
}
@@ -90,14 +90,10 @@ impl HelmChartInterpret {
);
match add_output.status.success() {
true => {
return Ok(());
}
false => {
return Err(InterpretError::new(format!(
"Failed to add helm repository!\n{full_output}"
)));
}
true => Ok(()),
false => Err(InterpretError::new(format!(
"Failed to add helm repository!\n{full_output}"
))),
}
}
}
@@ -212,7 +208,7 @@ impl<T: Topology + HelmCommand> Interpret<T> for HelmChartInterpret {
}
let res = helm_executor.install_or_upgrade(
&ns,
ns,
&self.score.release_name,
&self.score.chart_name,
self.score.chart_version.as_ref(),
@@ -229,24 +225,27 @@ impl<T: Topology + HelmCommand> Interpret<T> for HelmChartInterpret {
match status {
helm_wrapper_rs::HelmDeployStatus::Deployed => Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Helm Chart deployed".to_string(),
format!("Helm Chart {} deployed", self.score.release_name),
)),
helm_wrapper_rs::HelmDeployStatus::PendingInstall => Ok(Outcome::new(
InterpretStatus::RUNNING,
"Helm Chart Pending install".to_string(),
format!("Helm Chart {} pending install...", self.score.release_name),
)),
helm_wrapper_rs::HelmDeployStatus::PendingUpgrade => Ok(Outcome::new(
InterpretStatus::RUNNING,
"Helm Chart pending upgrade".to_string(),
)),
helm_wrapper_rs::HelmDeployStatus::Failed => Err(InterpretError::new(
"Failed to install helm chart".to_string(),
format!("Helm Chart {} pending upgrade...", self.score.release_name),
)),
helm_wrapper_rs::HelmDeployStatus::Failed => Err(InterpretError::new(format!(
"Helm Chart {} installation failed",
self.score.release_name
))),
}
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::HelmChart
}
fn get_version(&self) -> Version {
todo!()
}

View File

@@ -77,14 +77,11 @@ impl HelmCommandExecutor {
)?;
}
let out = match self.clone().run_command(
let out = self.clone().run_command(
self.chart
.clone()
.helm_args(self.globals.chart_home.clone().unwrap()),
) {
Ok(out) => out,
Err(e) => return Err(e),
};
)?;
// TODO: don't use unwrap here
let s = String::from_utf8(out.stdout).unwrap();
@@ -98,14 +95,11 @@ impl HelmCommandExecutor {
}
pub fn version(self) -> Result<String, std::io::Error> {
let out = match self.run_command(vec![
let out = self.run_command(vec![
"version".to_string(),
"-c".to_string(),
"--short".to_string(),
]) {
Ok(out) => out,
Err(e) => return Err(e),
};
])?;
// TODO: don't use unwrap
Ok(String::from_utf8(out.stdout).unwrap())
@@ -129,15 +123,11 @@ impl HelmCommandExecutor {
None => PathBuf::from(TempDir::new()?.path()),
};
match self.chart.values_inline {
Some(yaml_str) => {
let tf: TempFile;
tf = temp_file::with_contents(yaml_str.as_bytes());
self.chart
.additional_values_files
.push(PathBuf::from(tf.path()));
}
None => (),
if let Some(yaml_str) = self.chart.values_inline {
let tf: TempFile = temp_file::with_contents(yaml_str.as_bytes());
self.chart
.additional_values_files
.push(PathBuf::from(tf.path()));
};
self.env.insert(
@@ -180,9 +170,9 @@ impl HelmChart {
match self.repo {
Some(r) => {
if r.starts_with("oci://") {
args.push(String::from(
args.push(
r.trim_end_matches("/").to_string() + "/" + self.name.clone().as_str(),
));
);
} else {
args.push("--repo".to_string());
args.push(r.to_string());
@@ -193,12 +183,9 @@ impl HelmChart {
None => args.push(self.name),
};
match self.version {
Some(v) => {
args.push("--version".to_string());
args.push(v.to_string());
}
None => (),
if let Some(v) = self.version {
args.push("--version".to_string());
args.push(v.to_string());
}
args
@@ -362,7 +349,7 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for HelmChartInterpretV
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::HelmCommand
}
fn get_version(&self) -> Version {
todo!()

View File

@@ -1,13 +1,12 @@
use std::path::PathBuf;
use async_trait::async_trait;
use log::{debug, info};
use log::debug;
use serde::Serialize;
use crate::{
config::HARMONY_DATA_DIR,
data::{Id, Version},
instrumentation::{self, HarmonyEvent},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
@@ -30,14 +29,14 @@ impl Default for K3DInstallationScore {
}
impl<T: Topology> Score<T> for K3DInstallationScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(K3dInstallationInterpret {
score: self.clone(),
})
}
fn name(&self) -> String {
todo!()
"K3dInstallationScore".into()
}
}
@@ -51,20 +50,14 @@ impl<T: Topology> Interpret<T> for K3dInstallationInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
_topology: &T,
) -> Result<Outcome, InterpretError> {
instrumentation::instrument(HarmonyEvent::InterpretExecutionStarted {
topology: topology.name().into(),
interpret: "k3d-installation".into(),
message: "installing k3d...".into(),
})
.unwrap();
let k3d = k3d_rs::K3d::new(
self.score.installation_path.clone(),
Some(self.score.cluster_name.clone()),
);
let outcome = match k3d.ensure_installed().await {
match k3d.ensure_installed().await {
Ok(_client) => {
let msg = format!("k3d cluster '{}' installed ", self.score.cluster_name);
debug!("{msg}");
@@ -73,16 +66,7 @@ impl<T: Topology> Interpret<T> for K3dInstallationInterpret {
Err(msg) => Err(InterpretError::new(format!(
"failed to ensure k3d is installed : {msg}"
))),
};
instrumentation::instrument(HarmonyEvent::InterpretExecutionFinished {
topology: topology.name().into(),
interpret: "k3d-installation".into(),
outcome: outcome.clone(),
})
.unwrap();
outcome
}
}
fn get_name(&self) -> InterpretName {
InterpretName::K3dInstallation

View File

@@ -89,7 +89,7 @@ where
))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::K8sResource
}
fn get_version(&self) -> Version {
todo!()

View File

@@ -128,13 +128,12 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for LAMPInterpret {
info!("Deploying score {deployment_score:#?}");
deployment_score
.create_interpret()
.execute(inventory, topology)
.await?;
deployment_score.interpret(inventory, topology).await?;
info!("LAMP deployment_score {deployment_score:?}");
let ingress_path = ingress_path!("/");
let lamp_ingress = K8sIngressScore {
name: fqdn!("lamp-ingress"),
host: fqdn!("test"),
@@ -144,17 +143,14 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for LAMPInterpret {
.as_str()
),
port: 8080,
path: Some(ingress_path!("/")),
path: Some(ingress_path),
path_type: None,
namespace: self
.get_namespace()
.map(|nbs| fqdn!(nbs.to_string().as_str())),
};
lamp_ingress
.create_interpret()
.execute(inventory, topology)
.await?;
lamp_ingress.interpret(inventory, topology).await?;
info!("LAMP lamp_ingress {lamp_ingress:?}");
@@ -164,7 +160,7 @@ impl<T: Topology + K8sclient + HelmCommand> Interpret<T> for LAMPInterpret {
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::Lamp
}
fn get_version(&self) -> Version {
@@ -213,7 +209,7 @@ impl LAMPInterpret {
repository: None,
};
score.create_interpret().execute(inventory, topology).await
score.interpret(inventory, topology).await
}
fn build_dockerfile(&self, score: &LAMPScore) -> Result<PathBuf, Box<dyn std::error::Error>> {
let mut dockerfile = Dockerfile::new();

View File

@@ -18,7 +18,7 @@ use crate::{
#[async_trait]
impl AlertRule<KubePrometheus> for AlertManagerRuleGroup {
async fn install(&self, sender: &KubePrometheus) -> Result<Outcome, InterpretError> {
sender.install_rule(&self).await
sender.install_rule(self).await
}
fn clone_box(&self) -> Box<dyn AlertRule<KubePrometheus>> {
Box::new(self.clone())
@@ -28,7 +28,7 @@ impl AlertRule<KubePrometheus> for AlertManagerRuleGroup {
#[async_trait]
impl AlertRule<Prometheus> for AlertManagerRuleGroup {
async fn install(&self, sender: &Prometheus) -> Result<Outcome, InterpretError> {
sender.install_rule(&self).await
sender.install_rule(self).await
}
fn clone_box(&self) -> Box<dyn AlertRule<Prometheus>> {
Box::new(self.clone())

View File

@@ -13,7 +13,7 @@ use crate::{
prometheus::prometheus::PrometheusApplicationMonitoring,
},
score::Score,
topology::{Topology, oberservability::monitoring::AlertReceiver},
topology::{PreparationOutcome, Topology, oberservability::monitoring::AlertReceiver},
};
#[derive(Debug, Clone, Serialize)]
@@ -33,7 +33,10 @@ impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Score<T>
}
fn name(&self) -> String {
"ApplicationMonitoringScore".to_string()
format!(
"{} monitoring [ApplicationMonitoringScore]",
self.application.name()
)
}
}
@@ -51,17 +54,27 @@ impl<T: Topology + PrometheusApplicationMonitoring<CRDPrometheus>> Interpret<T>
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
topology
let result = topology
.install_prometheus(
&self.score.sender,
inventory,
Some(self.score.receivers.clone()),
)
.await
.await;
match result {
Ok(outcome) => match outcome {
PreparationOutcome::Success { details: _ } => {
Ok(Outcome::success("Prometheus installed".into()))
}
PreparationOutcome::Noop => Ok(Outcome::noop()),
},
Err(err) => Err(InterpretError::from(err)),
}
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::ApplicationMonitoring
}
fn get_version(&self) -> Version {

View File

@@ -4,15 +4,14 @@ use std::str::FromStr;
use crate::modules::helm::chart::HelmChartScore;
pub fn grafana_helm_chart_score(ns: &str) -> HelmChartScore {
let values = format!(
r#"
let values = r#"
rbac:
namespaced: true
sidecar:
dashboards:
enabled: true
"#
);
.to_string();
HelmChartScore {
namespace: Some(NonBlankString::from_str(ns).unwrap()),

View File

@@ -1,7 +1,6 @@
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use std::collections::BTreeMap;
use super::crd_prometheuses::LabelSelector;

View File

@@ -1,13 +1,8 @@
use std::collections::BTreeMap;
use crate::modules::{
monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule,
prometheus::alerts::k8s::{
deployment::alert_deployment_unavailable,
pod::{alert_container_restarting, alert_pod_not_ready, pod_failed},
pvc::high_pvc_fill_rate_over_two_days,
service::alert_service_down,
},
use crate::modules::prometheus::alerts::k8s::{
deployment::alert_deployment_unavailable,
pod::{alert_container_restarting, alert_pod_not_ready, pod_failed},
pvc::high_pvc_fill_rate_over_two_days,
service::alert_service_down,
};
use super::crd_prometheus_rules::Rule;

View File

@@ -6,8 +6,6 @@ use serde::{Deserialize, Serialize};
use crate::modules::monitoring::alert_rule::prometheus_alert_rule::PrometheusAlertRule;
use super::crd_default_rules::build_default_application_rules;
#[derive(CustomResource, Debug, Serialize, Deserialize, Clone, JsonSchema)]
#[kube(
group = "monitoring.coreos.com",

View File

@@ -1,11 +1,9 @@
use std::collections::{BTreeMap, HashMap};
use std::collections::HashMap;
use kube::{CustomResource, Resource, api::ObjectMeta};
use kube::CustomResource;
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use crate::interpret::InterpretError;
use crate::modules::monitoring::kube_prometheus::types::{
HTTPScheme, MatchExpression, NamespaceSelector, Operator, Selector,
ServiceMonitor as KubeServiceMonitor, ServiceMonitorEndpoint,
@@ -50,7 +48,7 @@ pub struct ServiceMonitorSpec {
impl Default for ServiceMonitorSpec {
fn default() -> Self {
let mut labels = HashMap::new();
let labels = HashMap::new();
Self {
selector: Selector {
match_labels: { labels },

View File

@@ -27,6 +27,12 @@ pub struct KubePrometheusConfig {
pub alert_rules: Vec<AlertManagerAdditionalPromRules>,
pub additional_service_monitors: Vec<ServiceMonitor>,
}
impl Default for KubePrometheusConfig {
fn default() -> Self {
Self::new()
}
}
impl KubePrometheusConfig {
pub fn new() -> Self {
Self {

View File

@@ -35,7 +35,7 @@ pub fn kube_prometheus_helm_chart_score(
let kube_proxy = config.kube_proxy.to_string();
let kube_state_metrics = config.kube_state_metrics.to_string();
let node_exporter = config.node_exporter.to_string();
let prometheus_operator = config.prometheus_operator.to_string();
let _prometheus_operator = config.prometheus_operator.to_string();
let prometheus = config.prometheus.to_string();
let resource_limit = Resources {
limits: Limits {
@@ -64,7 +64,7 @@ pub fn kube_prometheus_helm_chart_score(
indent_lines(&yaml, indent_level + 2)
)
}
let resource_section = resource_block(&resource_limit, 2);
let _resource_section = resource_block(&resource_limit, 2);
let mut values = format!(
r#"

View File

@@ -55,6 +55,12 @@ pub struct KubePrometheus {
pub config: Arc<Mutex<KubePrometheusConfig>>,
}
impl Default for KubePrometheus {
fn default() -> Self {
Self::new()
}
}
impl KubePrometheus {
pub fn new() -> Self {
Self {
@@ -113,8 +119,7 @@ impl KubePrometheus {
topology: &T,
) -> Result<Outcome, InterpretError> {
kube_prometheus_helm_chart_score(self.config.clone())
.create_interpret()
.execute(inventory, topology)
.interpret(inventory, topology)
.await
}
}

View File

@@ -1,9 +1,25 @@
use non_blank_string_rs::NonBlankString;
use std::str::FromStr;
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
use crate::{modules::helm::chart::HelmChartScore, topology::DeploymentTarget};
pub fn ntfy_helm_chart_score(
namespace: String,
host: String,
target: DeploymentTarget,
) -> HelmChartScore {
// TODO not actually the correct logic, this should be fixed by using an ingresss which is the
// correct k8s standard.
//
// Another option is to delegate to the topology the ingress technology it wants to use Route,
// Ingress or other
let route_enabled = match target {
DeploymentTarget::LocalDev => false,
DeploymentTarget::Staging => true,
DeploymentTarget::Production => true,
};
let ingress_enabled = !route_enabled;
pub fn ntfy_helm_chart_score(namespace: String, host: String) -> HelmChartScore {
let values = format!(
r#"
replicaCount: 1
@@ -25,23 +41,14 @@ serviceAccount:
service:
type: ClusterIP
port: 80
port: 8080
ingress:
enabled: true
# annotations:
# kubernetes.io/ingress.class: nginx
# kubernetes.io/tls-acme: "true"
hosts:
- host: {host}
paths:
- path: /
pathType: ImplementationSpecific
tls: []
# - secretName: chart-example-tls
# hosts:
# - chart-example.local
enabled: {ingress_enabled}
route:
enabled: {route_enabled}
host: {host}
autoscaling:
enabled: false
@@ -49,7 +56,7 @@ autoscaling:
config:
enabled: true
data:
# base-url: "https://ntfy.something.com"
base-url: "https://{host}"
auth-file: "/var/cache/ntfy/user.db"
auth-default-access: "deny-all"
cache-file: "/var/cache/ntfy/cache.db"
@@ -59,6 +66,7 @@ config:
enable-signup: false
enable-login: "true"
enable-metrics: "true"
listen-http: ":8080"
persistence:
enabled: true
@@ -69,16 +77,12 @@ persistence:
HelmChartScore {
namespace: Some(NonBlankString::from_str(&namespace).unwrap()),
release_name: NonBlankString::from_str("ntfy").unwrap(),
chart_name: NonBlankString::from_str("sarab97/ntfy").unwrap(),
chart_version: Some(NonBlankString::from_str("0.1.7").unwrap()),
chart_name: NonBlankString::from_str("oci://hub.nationtech.io/harmony/ntfy").unwrap(),
chart_version: Some(NonBlankString::from_str("0.1.7-nationtech.1").unwrap()),
values_overrides: None,
values_yaml: Some(values.to_string()),
create_namespace: true,
install_only: false,
repository: Some(HelmRepository::new(
"sarab97".to_string(),
url::Url::parse("https://charts.sarabsingh.com").unwrap(),
true,
)),
repository: None,
}
}

View File

@@ -1,2 +1,3 @@
pub mod helm;
#[allow(clippy::module_inception)]
pub mod ntfy;

View File

@@ -1,7 +1,7 @@
use std::sync::Arc;
use async_trait::async_trait;
use log::debug;
use log::info;
use serde::Serialize;
use strum::{Display, EnumString};
@@ -11,7 +11,7 @@ use crate::{
inventory::Inventory,
modules::monitoring::ntfy::helm::ntfy_helm_chart::ntfy_helm_chart_score,
score::Score,
topology::{HelmCommand, K8sclient, Topology, k8s::K8sClient},
topology::{HelmCommand, K8sclient, MultiTargetTopology, Topology, k8s::K8sClient},
};
#[derive(Debug, Clone, Serialize)]
@@ -20,7 +20,7 @@ pub struct NtfyScore {
pub host: String,
}
impl<T: Topology + HelmCommand + K8sclient> Score<T> for NtfyScore {
impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Score<T> for NtfyScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(NtfyInterpret {
score: self.clone(),
@@ -28,7 +28,7 @@ impl<T: Topology + HelmCommand + K8sclient> Score<T> for NtfyScore {
}
fn name(&self) -> String {
format!("Ntfy")
"alert receiver [NtfyScore]".into()
}
}
@@ -39,31 +39,21 @@ pub struct NtfyInterpret {
#[derive(Debug, EnumString, Display)]
enum NtfyAccessMode {
#[strum(serialize = "read-write", serialize = "rw", to_string = "read-write")]
#[strum(serialize = "read-write", serialize = "rw")]
ReadWrite,
#[strum(
serialize = "read-only",
serialize = "ro",
serialize = "read",
to_string = "read-only"
)]
#[strum(serialize = "read-only", serialize = "ro", serialize = "read")]
ReadOnly,
#[strum(
serialize = "write-only",
serialize = "wo",
serialize = "write",
to_string = "write-only"
)]
#[strum(serialize = "write-only", serialize = "wo", serialize = "write")]
WriteOnly,
#[strum(serialize = "none", to_string = "deny")]
#[strum(serialize = "deny", serialize = "none")]
Deny,
}
#[derive(Debug, EnumString, Display)]
enum NtfyRole {
#[strum(serialize = "user", to_string = "user")]
#[strum(serialize = "user")]
User,
#[strum(serialize = "admin", to_string = "admin")]
#[strum(serialize = "admin")]
Admin,
}
@@ -87,7 +77,7 @@ impl NtfyInterpret {
vec![
"sh",
"-c",
format!("NTFY_PASSWORD={password} ntfy user add --role={role} {username}")
format!("NTFY_PASSWORD={password} ntfy user add --role={role} --ignore-exists {username}")
.as_str(),
],
)
@@ -95,69 +85,52 @@ impl NtfyInterpret {
Ok(())
}
async fn set_access(
&self,
k8s_client: Arc<K8sClient>,
username: &str,
topic: &str,
mode: NtfyAccessMode,
) -> Result<(), String> {
k8s_client
.exec_app(
"ntfy".to_string(),
Some(&self.score.namespace),
vec![
"sh",
"-c",
format!("ntfy access {username} {topic} {mode}").as_str(),
],
)
.await?;
Ok(())
}
}
/// We need a ntfy interpret to wrap the HelmChartScore in order to run the score, and then bootstrap the config inside ntfy
#[async_trait]
impl<T: Topology + HelmCommand + K8sclient> Interpret<T> for NtfyInterpret {
impl<T: Topology + HelmCommand + K8sclient + MultiTargetTopology> Interpret<T> for NtfyInterpret {
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
ntfy_helm_chart_score(self.score.namespace.clone(), self.score.host.clone())
.create_interpret()
.execute(inventory, topology)
.await?;
ntfy_helm_chart_score(
self.score.namespace.clone(),
self.score.host.clone(),
topology.current_target(),
)
.interpret(inventory, topology)
.await?;
debug!("installed ntfy helm chart");
info!("installed ntfy helm chart");
let client = topology
.k8s_client()
.await
.expect("couldn't get k8s client");
info!("deploying ntfy...");
client
.wait_until_deployment_ready(
"ntfy".to_string(),
Some(&self.score.namespace.as_str()),
Some(self.score.namespace.as_str()),
None,
)
.await?;
debug!("created k8s client");
info!("ntfy deployed");
info!("adding user harmony");
self.add_user(client, "harmony", "harmony", Some(NtfyRole::Admin))
.await?;
info!("user added");
debug!("exec into pod done");
Ok(Outcome::success("installed ntfy".to_string()))
Ok(Outcome::success("Ntfy installed".to_string()))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::Ntfy
}
fn get_version(&self) -> Version {
todo!()
}

View File

@@ -1,3 +1,4 @@
pub mod helm;
#[allow(clippy::module_inception)]
pub mod prometheus;
pub mod prometheus_config;

View File

@@ -37,6 +37,12 @@ impl AlertSender for Prometheus {
}
}
impl Default for Prometheus {
fn default() -> Self {
Self::new()
}
}
impl Prometheus {
pub fn new() -> Self {
Self {
@@ -94,8 +100,7 @@ impl Prometheus {
topology: &T,
) -> Result<Outcome, InterpretError> {
prometheus_helm_chart_score(self.config.clone())
.create_interpret()
.execute(inventory, topology)
.interpret(inventory, topology)
.await
}
pub async fn install_grafana<T: Topology + HelmCommand + Send + Sync>(
@@ -110,13 +115,12 @@ impl Prometheus {
if let Some(ns) = namespace.as_deref() {
grafana_helm_chart_score(ns)
.create_interpret()
.execute(inventory, topology)
.interpret(inventory, topology)
.await
} else {
Err(InterpretError::new(format!(
"could not install grafana, missing namespace",
)))
Err(InterpretError::new(
"could not install grafana, missing namespace".to_string(),
))
}
}
}

View File

@@ -16,6 +16,12 @@ pub struct PrometheusConfig {
pub additional_service_monitors: Vec<ServiceMonitor>,
}
impl Default for PrometheusConfig {
fn default() -> Self {
Self::new()
}
}
impl PrometheusConfig {
pub fn new() -> Self {
Self {

View File

@@ -32,7 +32,7 @@ impl OKDBootstrapDhcpScore {
logical_host: topology.bootstrap_host.clone(),
physical_host: inventory
.worker_host
.get(0)
.first()
.expect("Should have at least one worker to be used as bootstrap node")
.clone(),
});

View File

@@ -0,0 +1,868 @@
//! OKDInstallationScore
//!
//! Overview
//! --------
//! OKDInstallationScore orchestrates an end-to-end, bare-metal OKD (OpenShift/OKD 4.19)
//! installation using Harmonys strongly-typed Scores and Interprets. It encodes the
//! “discovery-first, then provision” strategy with strict ordering, observable progress,
//! and minimal assumptions about the underlying network.
//!
//! Design goals
//! - Deterministic, observable pipeline from unknown hardware to a healthy OKD cluster.
//! - Do NOT require LACP bonding during PXE/inventory. Bonding is configured only
//! after the host has a stable OS on disk (SCOS/RHCOS) and OKD MachineConfigs/NNCP
//! can enforce persistence safely.
//! - Support per-MAC iPXE rendering without requiring multiple DHCP reservations for
//! the same host. Discovery runs with generic DHCP (access/unbonded). Role-specific
//! per-MAC PXE entries are activated just-in-time before install.
//! - Emit HarmonyEvent instrumentation at each step via the Score::interpret path.
//!
//! High-level flow
//! 1) OKDSetup01Inventory
//! - Serve default iPXE + Kickstart (in-RAM CentOS Stream 9) for discovery only.
//! - Enable SSH with the clusters ephemeral pubkey, start a Rust inventory agent.
//! - Harmony discovers nodes by scraping the agent endpoint and collects MACs/NICs.
//! - DNS: optionally register temporary hostnames and enable DHCP lease registration.
//!
//! 2) OKDSetup02Bootstrap
//! - User selects which discovered node becomes bootstrap.
//! - Render per-MAC iPXE for bootstrap with OKD 4.19 SCOS live assets + ignition.
//! - Reboot node via SSH; install bootstrap; wait for bootstrap-complete.
//!
//! 3) OKDSetup03ControlPlane
//! - Render per-MAC iPXE for cp0/cp1/cp2 with ignition (includes persistent bond via
//! MachineConfig or NNCP if required). Reboot via SSH, join masters.
//!
//! 4) OKDSetup04Workers
//! - Render per-MAC iPXE for worker set; join workers.
//!
//! 5) OKDSetup05SanityCheck
//! - Validate API/ingress/clusteroperators; ensure healthy control plane and SDN.
//!
//! 6) OKDSetup06InstallationReport
//! - Produce a concise, machine-readable report (JSON) and a human summary.
//!
//! Network notes
//! - During Inventory: ports must be simple access (no LACP). DHCP succeeds; iPXE
//! loads CentOS Stream live with Kickstart and starts the inventory endpoint.
//! - During Provisioning: only after SCOS is on disk and Ignition/MC can be applied
//! do we set the bond persistently. If early bonding is truly required on a host,
//! use kernel args selectively in the per-MAC PXE for that host, but never for the
//! generic discovery path.
//!
//! DNS and hostname
//! - Because a single host may present multiple MACs, but DHCP/ISC on OPNsense may not
//! easily support “one hostname across multiple MACs” in a single lease entry, we avoid
//! strict hostname binding during discovery. We rely on dynamic leases and record the
//! mapping (IP/MAC) at scrape time.
//! - Once a role is assigned, we render a per-MAC PXE entry and ensure the role-specific
//! DNS A/AAAA/CNAME entries are present (e.g., api, api-int, apps wildcard). This keeps
//! DHCP simple and DNS consistent for OKD.
//!
//! Instrumentation
//! - All child Scores are executed via Score::interpret, which emits HarmonyEvent
//! InterpretExecutionStarted/Finished. The orchestrator also emits HarmonyStarted/
//! HarmonyFinished around the full pipeline execution.
//!
//! Configuration knobs
//! - lan_cidr: CIDR to scan/allow for discovery endpoints.
//! - public_domain: External wildcard/apps domain (e.g., apps.example.com).
//! - internal_domain: Internal cluster domain (e.g., cluster.local or harmony.mcd).
//!
//! Notes
//! - This file co-locates step Scores for ease of review. In follow-up changes, refactor
//! step Scores (OKDSetupXX*) into separate modules.
use async_trait::async_trait;
use derive_new::new;
use harmony_macros::{ip, ipv4};
use log::info;
use serde::{Deserialize, Serialize};
use crate::{
data::Version,
instrumentation::{instrument, HarmonyEvent},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{DnsRecord, DnsRecordType, DnsServer, Topology},
};
// -------------------------------------------------------------------------------------------------
// Public Orchestrator Score
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, Deserialize, new)]
pub struct OKDInstallationScore {
/// The LAN CIDR where discovery endpoints live (e.g., 192.168.10.0/24)
pub lan_cidr: String,
/// Public external domain (e.g., example.com). Used for api/apps wildcard, etc.
pub public_domain: String,
/// Internal cluster domain (e.g., harmony.mcd). Used for internal svc/ingress and DNS.
pub internal_domain: String,
}
impl<T: Topology + DnsServer + 'static> Score<T> for OKDInstallationScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDInstallationInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDInstallationScore".to_string()
}
}
// -------------------------------------------------------------------------------------------------
// Orchestrator Interpret
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone)]
pub struct OKDInstallationInterpret {
score: OKDInstallationScore,
version: Version,
status: InterpretStatus,
}
impl OKDInstallationInterpret {
pub fn new(score: OKDInstallationScore) -> Self {
let version = Version::from("0.1.0").expect("valid version");
Self {
score,
version,
status: InterpretStatus::QUEUED,
}
}
async fn run_inventory_phase<T: Topology + DnsServer>(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
// 1) Prepare DNS and DHCP lease registration (optional)
let dns_score = OKDSetup01InventoryDnsScore::new(
self.score.internal_domain.clone(),
self.score.public_domain.clone(),
Some(true), // register_dhcp_leases
);
dns_score.interpret(inventory, topology).await?;
// 2) Serve default iPXE + Kickstart and poll discovery
let discovery_score = OKDSetup01InventoryScore::new(self.score.lan_cidr.clone());
discovery_score.interpret(inventory, topology).await?;
Ok(())
}
async fn run_bootstrap_phase<T: Topology + DnsServer>(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
// Select and provision bootstrap
let bootstrap_score = OKDSetup02BootstrapScore::new(
self.score.public_domain.clone(),
self.score.internal_domain.clone(),
);
bootstrap_score.interpret(inventory, topology).await?;
Ok(())
}
async fn run_control_plane_phase<T: Topology + DnsServer>(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
let control_plane_score = OKDSetup03ControlPlaneScore::new();
control_plane_score.interpret(inventory, topology).await?;
Ok(())
}
async fn run_workers_phase<T: Topology + DnsServer>(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
let workers_score = OKDSetup04WorkersScore::new();
workers_score.interpret(inventory, topology).await?;
Ok(())
}
async fn run_sanity_phase<T: Topology + DnsServer>(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
let sanity_score = OKDSetup05SanityCheckScore::new();
sanity_score.interpret(inventory, topology).await?;
Ok(())
}
async fn run_report_phase<T: Topology + DnsServer>(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<(), InterpretError> {
let report_score = OKDSetup06InstallationReportScore::new(
self.score.public_domain.clone(),
self.score.internal_domain.clone(),
);
report_score.interpret(inventory, topology).await?;
Ok(())
}
}
#[async_trait]
impl<T: Topology + DnsServer> Interpret<T> for OKDInstallationInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDInstallationInterpret")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
instrument(HarmonyEvent::HarmonyStarted).ok();
info!(
"Starting OKD installation pipeline for public_domain={} internal_domain={} lan_cidr={}",
self.score.public_domain, self.score.internal_domain, self.score.lan_cidr
);
// 1) Inventory (default PXE, in-RAM kickstart, Rust inventory agent)
self.run_inventory_phase(inventory, topology).await?;
// 2) Bootstrap (render per-MAC iPXE + ignition; reboot node; wait for bootstrap complete)
self.run_bootstrap_phase(inventory, topology).await?;
// 3) Control plane
self.run_control_plane_phase(inventory, topology).await?;
// 4) Workers
self.run_workers_phase(inventory, topology).await?;
// 5) Sanity checks
self.run_sanity_phase(inventory, topology).await?;
// 6) Installation report
self.run_report_phase(inventory, topology).await?;
instrument(HarmonyEvent::HarmonyFinished).ok();
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"OKD installation pipeline completed".into(),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 01: Inventory DNS setup
// - Keep DHCP simple; optionally register dynamic leases into DNS.
// - Ensure base records for internal/public domains (api/api-int/apps wildcard).
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup01InventoryDnsScore {
internal_domain: String,
public_domain: String,
register_dhcp_leases: Option<bool>,
}
impl<T: Topology + DnsServer> Score<T> for OKDSetup01InventoryDnsScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup01InventoryDnsInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup01InventoryDnsScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup01InventoryDnsInterpret {
score: OKDSetup01InventoryDnsScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup01InventoryDnsInterpret {
pub fn new(score: OKDSetup01InventoryDnsScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn ensure_dns<T: DnsServer>(&self, dns: &T) -> Result<(), InterpretError> {
// Minimal records placeholders; real IPs are set elsewhere in the flow.
// We register the names early to ensure resolvability for clients relying on DNS.
let mut records: Vec<DnsRecord> = vec![
DnsRecord {
value: ip!("0.0.0.0"),
host: "api".to_string(),
domain: self.score.internal_domain.clone(),
record_type: DnsRecordType::A,
},
DnsRecord {
value: ip!("0.0.0.0"),
host: "api-int".to_string(),
domain: self.score.internal_domain.clone(),
record_type: DnsRecordType::A,
},
DnsRecord {
value: ip!("0.0.0.0"),
host: "*.apps.".to_string(),
domain: self.score.internal_domain.clone(),
record_type: DnsRecordType::A,
},
];
dns.ensure_hosts_registered(records.drain(..).collect())
.await?;
if let Some(register) = self.score.register_dhcp_leases {
dns.register_dhcp_leases(register).await?;
}
dns.commit_config().await?;
Ok(())
}
}
#[async_trait]
impl<T: Topology + DnsServer> Interpret<T> for OKDSetup01InventoryDnsInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup01InventoryDns")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
info!("Ensuring base DNS and DHCP lease registration for discovery phase");
self.ensure_dns(topology).await?;
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Inventory DNS prepared".into(),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 01: Inventory (default PXE + Kickstart in RAM + Rust agent)
// - This score exposes/ensures the default inventory assets and waits for discoveries.
// - No early bonding. Simple access DHCP.
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup01InventoryScore {
lan_cidr: String,
}
impl<T: Topology> Score<T> for OKDSetup01InventoryScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup01InventoryInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup01InventoryScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup01InventoryInterpret {
score: OKDSetup01InventoryScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup01InventoryInterpret {
pub fn new(score: OKDSetup01InventoryScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn ensure_inventory_assets<T: Topology>(
&self,
topology: &T,
) -> Result<(), InterpretError> {
// Placeholder: push or verify iPXE default, Kickstart, and Rust inventory agent are hosted.
// Real implementation: publish to the PXE/HTTP server via the topology.
info!(
"[Inventory] Ensuring default iPXE, Kickstart, and inventory agent are available for LAN {}",
self.score.lan_cidr
);
// topology.publish_http_asset(…) ?
Ok(())
}
async fn discover_nodes(&self) -> Result<usize, InterpretError> {
// Placeholder: implement Harmony discovery logic (scan/pull/push mode).
// Returns number of newly discovered nodes.
info!(
"[Inventory] Scanning for inventory agents in {}",
self.score.lan_cidr
);
// In practice, this would query harmony_composer or a local registry store.
Ok(3)
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for OKDSetup01InventoryInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup01Inventory")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
self.ensure_inventory_assets(topology).await?;
let count = self.discover_nodes().await?;
info!("[Inventory] Discovered {count} nodes");
Ok(Outcome::new(
InterpretStatus::SUCCESS,
format!("Inventory phase complete. Nodes discovered: {count}"),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 02: Bootstrap
// - Select bootstrap node (from discovered set).
// - Render per-MAC iPXE pointing to OKD 4.19 SCOS live assets + bootstrap ignition.
// - Reboot the host via SSH and wait for bootstrap-complete.
// - No bonding at this stage unless absolutely required; prefer persistence via MC later.
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup02BootstrapScore {
public_domain: String,
internal_domain: String,
}
impl<T: Topology> Score<T> for OKDSetup02BootstrapScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup02BootstrapInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup02BootstrapScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup02BootstrapInterpret {
score: OKDSetup02BootstrapScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup02BootstrapInterpret {
pub fn new(score: OKDSetup02BootstrapScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn render_per_mac_pxe(&self) -> Result<(), InterpretError> {
// Placeholder: use Harmony templates to emit {MAC}.ipxe selecting SCOS live + bootstrap ignition.
info!("[Bootstrap] Rendering per-MAC PXE for bootstrap node");
Ok(())
}
async fn reboot_target(&self) -> Result<(), InterpretError> {
// Placeholder: ssh reboot using the inventory ephemeral key
info!("[Bootstrap] Rebooting bootstrap node via SSH");
Ok(())
}
async fn wait_for_bootstrap_complete(&self) -> Result<(), InterpretError> {
// Placeholder: wait-for bootstrap-complete
info!("[Bootstrap] Waiting for bootstrap-complete …");
Ok(())
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for OKDSetup02BootstrapInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup02Bootstrap")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
self.render_per_mac_pxe().await?;
self.reboot_target().await?;
self.wait_for_bootstrap_complete().await?;
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Bootstrap phase complete".into(),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 03: Control Plane
// - Render per-MAC PXE & ignition for cp0/cp1/cp2.
// - Persist bonding via MachineConfigs (or NNCP) once SCOS is active.
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup03ControlPlaneScore {}
impl<T: Topology> Score<T> for OKDSetup03ControlPlaneScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup03ControlPlaneInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup03ControlPlaneScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup03ControlPlaneInterpret {
score: OKDSetup03ControlPlaneScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup03ControlPlaneInterpret {
pub fn new(score: OKDSetup03ControlPlaneScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn render_and_reboot(&self) -> Result<(), InterpretError> {
info!("[ControlPlane] Rendering per-MAC PXE for masters and rebooting");
Ok(())
}
async fn persist_network_bond(&self) -> Result<(), InterpretError> {
// Generate MC or NNCP from inventory NIC data; apply via ignition or post-join.
info!("[ControlPlane] Ensuring persistent bonding via MachineConfig/NNCP");
Ok(())
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for OKDSetup03ControlPlaneInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup03ControlPlane")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
self.render_and_reboot().await?;
self.persist_network_bond().await?;
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Control plane provisioned".into(),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 04: Workers
// - Render per-MAC PXE & ignition for workers; join nodes.
// - Persist bonding via MC/NNCP as required (same approach as masters).
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup04WorkersScore {}
impl<T: Topology> Score<T> for OKDSetup04WorkersScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup04WorkersInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup04WorkersScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup04WorkersInterpret {
score: OKDSetup04WorkersScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup04WorkersInterpret {
pub fn new(score: OKDSetup04WorkersScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn render_and_reboot(&self) -> Result<(), InterpretError> {
info!("[Workers] Rendering per-MAC PXE for workers and rebooting");
Ok(())
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for OKDSetup04WorkersInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup04Workers")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
self.render_and_reboot().await?;
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Workers provisioned".into(),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 05: Sanity Check
// - Validate API reachability, ClusterOperators, ingress, and SDN status.
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup05SanityCheckScore {}
impl<T: Topology> Score<T> for OKDSetup05SanityCheckScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup05SanityCheckInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup05SanityCheckScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup05SanityCheckInterpret {
score: OKDSetup05SanityCheckScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup05SanityCheckInterpret {
pub fn new(score: OKDSetup05SanityCheckScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn run_checks(&self) -> Result<(), InterpretError> {
info!("[Sanity] Checking API, COs, Ingress, and SDN health …");
Ok(())
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for OKDSetup05SanityCheckInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup05SanityCheck")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
self.run_checks().await?;
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Sanity checks passed".into(),
))
}
}
// -------------------------------------------------------------------------------------------------
// Step 06: Installation Report
// - Emit JSON and concise human summary of nodes, roles, versions, and health.
// -------------------------------------------------------------------------------------------------
#[derive(Debug, Clone, Serialize, new)]
struct OKDSetup06InstallationReportScore {
public_domain: String,
internal_domain: String,
}
impl<T: Topology> Score<T> for OKDSetup06InstallationReportScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(OKDSetup06InstallationReportInterpret::new(self.clone()))
}
fn name(&self) -> String {
"OKDSetup06InstallationReportScore".to_string()
}
}
#[derive(Debug, Clone)]
struct OKDSetup06InstallationReportInterpret {
score: OKDSetup06InstallationReportScore,
version: Version,
status: InterpretStatus,
}
impl OKDSetup06InstallationReportInterpret {
pub fn new(score: OKDSetup06InstallationReportScore) -> Self {
let version = Version::from("1.0.0").unwrap();
Self {
version,
score,
status: InterpretStatus::QUEUED,
}
}
async fn generate(&self) -> Result<(), InterpretError> {
info!(
"[Report] Generating installation report for {} / {}",
self.score.public_domain, self.score.internal_domain
);
Ok(())
}
}
#[async_trait]
impl<T: Topology> Interpret<T> for OKDSetup06InstallationReportInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("OKDSetup06InstallationReport")
}
fn get_version(&self) -> Version {
self.version.clone()
}
fn get_status(&self) -> InterpretStatus {
self.status.clone()
}
fn get_children(&self) -> Vec<crate::domain::data::Id> {
vec![]
}
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
self.generate().await?;
Ok(Outcome::new(
InterpretStatus::SUCCESS,
"Installation report generated".into(),
))
}
}

View File

@@ -4,3 +4,4 @@ pub mod dhcp;
pub mod dns;
pub mod load_balancer;
pub mod upgrade;
pub mod installation;

View File

@@ -6,6 +6,12 @@ pub struct OKDUpgradeScore {
_target_version: Version,
}
impl Default for OKDUpgradeScore {
fn default() -> Self {
Self::new()
}
}
impl OKDUpgradeScore {
pub fn new() -> Self {
Self {

View File

@@ -61,7 +61,7 @@ impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> S
}
fn name(&self) -> String {
"CRDApplicationAlertingScore".into()
"prometheus alerting [CRDAlertingScore]".into()
}
}
@@ -93,13 +93,13 @@ impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> I
self.install_rules(&self.prometheus_rules, &client).await?;
self.install_monitors(self.service_monitors.clone(), &client)
.await?;
Ok(Outcome::success(format!(
"deployed application monitoring composants"
)))
Ok(Outcome::success(
"K8s monitoring components installed".to_string(),
))
}
fn get_name(&self) -> InterpretName {
todo!()
InterpretName::K8sPrometheusCrdAlerting
}
fn get_version(&self) -> Version {
@@ -118,7 +118,7 @@ impl<T: Topology + K8sclient + PrometheusApplicationMonitoring<CRDPrometheus>> I
impl K8sPrometheusCRDAlertingInterpret {
async fn crd_exists(&self, crd: &str) -> bool {
let status = Command::new("sh")
.args(["-c", "kubectl get crd -A | grep -i", crd])
.args(["-c", &format!("kubectl get crd -A | grep -i {crd}")])
.status()
.map_err(|e| InterpretError::new(format!("could not connect to cluster: {}", e)))
.unwrap();
@@ -166,7 +166,8 @@ impl K8sPrometheusCRDAlertingInterpret {
let install_output = Command::new("helm")
.args([
"install",
"upgrade",
"--install",
&chart_name,
tgz_path.to_str().unwrap(),
"--namespace",
@@ -415,7 +416,7 @@ impl K8sPrometheusCRDAlertingInterpret {
async fn install_rules(
&self,
rules: &Vec<RuleGroup>,
#[allow(clippy::ptr_arg)] rules: &Vec<RuleGroup>,
client: &Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let mut prom_rule_spec = PrometheusRuleSpec {
@@ -423,7 +424,7 @@ impl K8sPrometheusCRDAlertingInterpret {
};
let default_rules_group = RuleGroup {
name: format!("default-rules"),
name: "default-rules".to_string(),
rules: build_default_application_rules(),
};

View File

@@ -1,3 +1,4 @@
pub mod alerts;
pub mod k8s_prometheus_alerting_score;
#[allow(clippy::module_inception)]
pub mod prometheus;

View File

@@ -1,9 +1,11 @@
use async_trait::async_trait;
use crate::{
interpret::{InterpretError, Outcome},
inventory::Inventory,
topology::oberservability::monitoring::{AlertReceiver, AlertSender},
topology::{
PreparationError, PreparationOutcome,
oberservability::monitoring::{AlertReceiver, AlertSender},
},
};
#[async_trait]
@@ -13,5 +15,5 @@ pub trait PrometheusApplicationMonitoring<S: AlertSender> {
sender: &S,
inventory: &Inventory,
receivers: Option<Vec<Box<dyn AlertReceiver<S>>>>,
) -> Result<Outcome, InterpretError>;
) -> Result<PreparationOutcome, PreparationError>;
}

View File

@@ -17,7 +17,7 @@ impl<T: Topology + TenantCredentialManager> Score<T> for TenantCredentialScore {
}
fn name(&self) -> String {
todo!()
"TenantCredentialScore".into()
}
}

View File

@@ -0,0 +1,51 @@
use async_trait::async_trait;
use chrono::{DateTime, Utc};
use serde::Serialize;
use crate::{interpret::InterpretError, score::Score, topology::Topology};
/// Create and manage Tenant Credentials.
///
/// This is meant to be used by cluster administrators who need to provide their tenant users and
/// services with credentials to access their resources.
#[derive(Debug, Clone, Serialize)]
pub struct TenantCredentialScore;
impl<T: Topology + TenantCredentialManager> Score<T> for TenantCredentialScore {
fn create_interpret(&self) -> Box<dyn crate::interpret::Interpret<T>> {
todo!()
}
fn name(&self) -> String {
todo!()
}
}
#[async_trait]
pub trait TenantCredentialManager {
async fn create_user(&self) -> Result<TenantCredentialBundle, InterpretError>;
}
#[derive(Debug, Clone)]
pub struct CredentialMetadata {
pub tenant_id: String,
pub credential_id: String,
pub description: String,
pub created_at: DateTime<Utc>,
pub expires_at: Option<DateTime<Utc>>,
}
#[derive(Debug, Clone)]
pub enum CredentialData {
/// Used to store login instructions destined to a human. Akin to AWS login instructions email
/// upon new console user creation.
PlainText(String),
}
pub struct TenantCredentialBundle {
_metadata: CredentialMetadata,
_content: CredentialData,
}
impl TenantCredentialBundle {}

View File

@@ -28,7 +28,7 @@ impl<T: Topology + TenantManager> Score<T> for TenantScore {
}
fn name(&self) -> String {
format!("{} TenantScore", self.config.name)
format!("{} [TenantScore]", self.config.name)
}
}
@@ -47,8 +47,8 @@ impl<T: Topology + TenantManager> Interpret<T> for TenantInterpret {
topology.provision_tenant(&self.tenant_config).await?;
Ok(Outcome::success(format!(
"Successfully provisioned tenant {} with id {}",
self.tenant_config.name, self.tenant_config.id
"Tenant provisioned with id '{}'",
self.tenant_config.id
)))
}

View File

@@ -5,6 +5,10 @@ version.workspace = true
readme.workspace = true
license.workspace = true
[features]
default = ["tui"]
tui = ["dep:harmony_tui"]
[dependencies]
assert_cmd = "2.0.17"
clap = { version = "4.5.35", features = ["derive"] }
@@ -19,7 +23,5 @@ lazy_static = "1.5.0"
log.workspace = true
indicatif-log-bridge = "0.2.3"
[features]
default = ["tui"]
tui = ["dep:harmony_tui"]
[dev-dependencies]
harmony = { path = "../harmony", features = ["testing"] }

View File

@@ -1,16 +1,22 @@
use harmony::instrumentation::{self, HarmonyEvent};
use indicatif::{MultiProgress, ProgressBar};
use harmony::{
instrumentation::{self, HarmonyEvent},
modules::application::ApplicationFeatureStatus,
topology::TopologyStatus,
};
use indicatif::MultiProgress;
use indicatif_log_bridge::LogWrapper;
use log::error;
use std::{
collections::{HashMap, hash_map},
sync::{Arc, Mutex},
thread,
time::Duration,
};
use crate::progress;
use crate::progress::{IndicatifProgressTracker, ProgressTracker};
pub fn init() -> tokio::task::JoinHandle<()> {
configure_logger();
let handle = tokio::spawn(handle_events());
let base_progress = configure_logger();
let handle = tokio::spawn(handle_events(base_progress));
loop {
if instrumentation::instrument(HarmonyEvent::HarmonyStarted).is_ok() {
@@ -21,91 +27,175 @@ pub fn init() -> tokio::task::JoinHandle<()> {
handle
}
fn configure_logger() {
fn configure_logger() -> MultiProgress {
let logger =
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let level = logger.filter();
let multi = MultiProgress::new();
LogWrapper::new(multi.clone(), logger).try_init().unwrap();
let progress = MultiProgress::new();
LogWrapper::new(progress.clone(), logger)
.try_init()
.unwrap();
log::set_max_level(level);
progress
}
async fn handle_events() {
instrumentation::subscribe("Harmony CLI Logger", {
let sections: Arc<Mutex<HashMap<String, MultiProgress>>> =
Arc::new(Mutex::new(HashMap::new()));
let progress_bars: Arc<Mutex<HashMap<String, ProgressBar>>> =
Arc::new(Mutex::new(HashMap::new()));
async fn handle_events(base_progress: MultiProgress) {
let progress_tracker = Arc::new(IndicatifProgressTracker::new(base_progress.clone()));
let preparing_topology = Arc::new(Mutex::new(false));
let current_score: Arc<Mutex<Option<String>>> = Arc::new(Mutex::new(None));
instrumentation::subscribe("Harmony CLI Logger", {
move |event| {
let sections_clone = Arc::clone(&sections);
let progress_bars_clone = Arc::clone(&progress_bars);
let progress_tracker = Arc::clone(&progress_tracker);
let preparing_topology = Arc::clone(&preparing_topology);
let current_score = Arc::clone(&current_score);
async move {
let mut sections = sections_clone.lock().unwrap();
let mut progress_bars = progress_bars_clone.lock().unwrap();
let mut preparing_topology = preparing_topology.lock().unwrap();
let mut current_score = current_score.lock().unwrap();
match event {
HarmonyEvent::HarmonyStarted => {}
HarmonyEvent::PrepareTopologyStarted { topology: name } => {
let section = progress::new_section(format!(
"{} Preparing environment: {name}...",
crate::theme::EMOJI_TOPOLOGY,
));
(*sections).insert(name, section);
HarmonyEvent::HarmonyFinished => {
progress_tracker.add_section(
"harmony-summary",
&format!("\n{} Harmony completed\n\n", crate::theme::EMOJI_HARMONY),
);
progress_tracker.add_section("harmony-finished", "\n\n");
thread::sleep(Duration::from_millis(200));
return false;
}
HarmonyEvent::TopologyPrepared {
topology: name,
outcome,
HarmonyEvent::TopologyStateChanged {
topology,
status,
message,
} => {
let section = (*sections).get(&name).unwrap();
let progress = progress::add_spinner(section, "".into());
let section_key = topology_key(&topology);
match outcome.status {
harmony::interpret::InterpretStatus::SUCCESS => {
progress::success(section, Some(progress), outcome.message);
match status {
TopologyStatus::Queued => {}
TopologyStatus::Preparing => {
progress_tracker.add_section(
&section_key,
&format!(
"\n{} Preparing environment: {topology}...",
crate::theme::EMOJI_TOPOLOGY
),
);
(*preparing_topology) = true;
}
harmony::interpret::InterpretStatus::FAILURE => {
progress::error(section, Some(progress), outcome.message);
TopologyStatus::Success => {
(*preparing_topology) = false;
progress_tracker.add_task(&section_key, "topology-success", "");
progress_tracker
.finish_task("topology-success", &message.unwrap_or("".into()));
}
harmony::interpret::InterpretStatus::RUNNING => todo!(),
harmony::interpret::InterpretStatus::QUEUED => todo!(),
harmony::interpret::InterpretStatus::BLOCKED => todo!(),
harmony::interpret::InterpretStatus::NOOP => {
progress::skip(section, Some(progress), outcome.message);
TopologyStatus::Noop => {
(*preparing_topology) = false;
progress_tracker.add_task(&section_key, "topology-skip", "");
progress_tracker
.skip_task("topology-skip", &message.unwrap_or("".into()));
}
TopologyStatus::Error => {
progress_tracker.add_task(&section_key, "topology-error", "");
(*preparing_topology) = false;
progress_tracker
.fail_task("topology-error", &message.unwrap_or("".into()));
}
}
}
HarmonyEvent::InterpretExecutionStarted {
interpret: name,
execution_id: task_key,
topology,
interpret: _,
score,
message,
} => {
let section = (*sections).get(&topology).unwrap();
let progress_bar = progress::add_spinner(section, message);
let is_key_topology = (*preparing_topology)
&& progress_tracker.contains_section(&topology_key(&topology));
let is_key_current_score = current_score.is_some()
&& progress_tracker
.contains_section(&score_key(&current_score.clone().unwrap()));
let is_key_score = progress_tracker.contains_section(&score_key(&score));
(*progress_bars).insert(name, progress_bar);
let section_key = if is_key_topology {
topology_key(&topology)
} else if is_key_current_score {
score_key(&current_score.clone().unwrap())
} else if is_key_score {
score_key(&score)
} else {
(*current_score) = Some(score.clone());
let key = score_key(&score);
progress_tracker.add_section(
&key,
&format!(
"{} Interpreting score: {score}...",
crate::theme::EMOJI_SCORE
),
);
key
};
progress_tracker.add_task(&section_key, &task_key, &message);
}
HarmonyEvent::InterpretExecutionFinished {
topology,
interpret: name,
execution_id: task_key,
topology: _,
interpret: _,
score,
outcome,
} => {
let section = (*sections).get(&topology).unwrap();
let progress_bar = (*progress_bars).get(&name).cloned();
let _ = section.clear();
match outcome {
Ok(outcome) => {
progress::success(section, progress_bar, outcome.message);
}
Err(err) => {
progress::error(section, progress_bar, err.to_string());
}
if current_score.is_some() && current_score.clone().unwrap() == score {
(*current_score) = None;
}
(*progress_bars).remove(&name);
match outcome {
Ok(outcome) => match outcome.status {
harmony::interpret::InterpretStatus::SUCCESS => {
progress_tracker.finish_task(&task_key, &outcome.message);
}
harmony::interpret::InterpretStatus::NOOP => {
progress_tracker.skip_task(&task_key, &outcome.message);
}
_ => progress_tracker.fail_task(&task_key, &outcome.message),
},
Err(err) => {
error!("Interpret error: {err}");
progress_tracker.fail_task(&task_key, &err.to_string());
}
}
}
HarmonyEvent::ApplicationFeatureStateChanged {
topology: _,
application,
feature,
status,
} => {
if let Some(score) = &(*current_score) {
let section_key = score_key(score);
let task_key = app_feature_key(&application, &feature);
match status {
ApplicationFeatureStatus::Installing => {
let message = format!("Feature '{}' installing...", feature);
progress_tracker.add_task(&section_key, &task_key, &message);
}
ApplicationFeatureStatus::Installed => {
let message = format!("Feature '{}' installed", feature);
progress_tracker.finish_task(&task_key, &message);
}
ApplicationFeatureStatus::Failed { details } => {
let message = format!(
"Feature '{}' installation failed: {}",
feature, details
);
progress_tracker.fail_task(&task_key, &message);
}
}
}
}
}
true
@@ -114,3 +204,15 @@ async fn handle_events() {
})
.await;
}
fn topology_key(topology: &str) -> String {
format!("topology-{topology}")
}
fn score_key(score: &str) -> String {
format!("score-{score}")
}
fn app_feature_key(application: &str, feature: &str) -> String {
format!("app-{application}-{feature}")
}

View File

@@ -1,5 +1,6 @@
use clap::Parser;
use clap::builder::ArgPredicate;
use harmony::instrumentation;
use harmony::inventory::Inventory;
use harmony::maestro::Maestro;
use harmony::{score::Score, topology::Topology};
@@ -11,8 +12,6 @@ pub mod progress;
pub mod theme;
#[cfg(feature = "tui")]
use harmony_tui;
#[derive(Parser, Debug)]
#[command(version, about, long_about = None)]
pub struct Args {
@@ -73,7 +72,7 @@ fn maestro_scores_filter<T: Topology>(
}
};
return scores_vec;
scores_vec
}
// TODO: consider adding doctest for this function
@@ -83,7 +82,7 @@ fn list_scores_with_index<T: Topology>(scores_vec: &Vec<Box<dyn Score<T>>>) -> S
let name = s.name();
display_str.push_str(&format!("\n{i}: {name}"));
}
return display_str;
display_str
}
pub async fn run<T: Topology + Send + Sync + 'static>(
@@ -99,6 +98,7 @@ pub async fn run<T: Topology + Send + Sync + 'static>(
let result = init(maestro, args_struct).await;
instrumentation::instrument(instrumentation::HarmonyEvent::HarmonyFinished).unwrap();
let _ = tokio::try_join!(cli_logger_handle);
result
}
@@ -126,14 +126,15 @@ async fn init<T: Topology + Send + Sync + 'static>(
let scores_vec = maestro_scores_filter(&maestro, args.all, args.filter, args.number);
if scores_vec.len() == 0 {
if scores_vec.is_empty() {
return Err("No score found".into());
}
// if list option is specified, print filtered list and exit
if args.list {
println!("Available scores:");
println!("{}", list_scores_with_index(&scores_vec));
let num_scores = scores_vec.len();
println!("Available scores {num_scores}:");
println!("{}\n\n", list_scores_with_index(&scores_vec));
return Ok(());
}
@@ -165,7 +166,7 @@ async fn init<T: Topology + Send + Sync + 'static>(
}
#[cfg(test)]
mod test {
mod tests {
use harmony::{
inventory::Inventory,
maestro::Maestro,
@@ -265,7 +266,7 @@ mod test {
assert!(
maestro
.interpret(res.get(0).unwrap().clone_box())
.interpret(res.first().unwrap().clone_box())
.await
.is_ok()
);
@@ -281,7 +282,7 @@ mod test {
assert!(
maestro
.interpret(res.get(0).unwrap().clone_box())
.interpret(res.first().unwrap().clone_box())
.await
.is_err()
);
@@ -297,7 +298,7 @@ mod test {
assert!(
maestro
.interpret(res.get(0).unwrap().clone_box())
.interpret(res.first().unwrap().clone_box())
.await
.is_ok()
);
@@ -319,7 +320,7 @@ mod test {
assert!(
maestro
.interpret(res.get(0).unwrap().clone_box())
.interpret(res.first().unwrap().clone_box())
.await
.is_ok()
);
@@ -331,6 +332,6 @@ mod test {
let res = crate::maestro_scores_filter(&maestro, false, None, 11);
assert!(res.len() == 0);
assert!(res.is_empty());
}
}

View File

@@ -1,50 +1,147 @@
use indicatif::{MultiProgress, ProgressBar};
use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use std::time::Duration;
use indicatif::{MultiProgress, ProgressBar};
pub fn new_section(title: String) -> MultiProgress {
let multi_progress = MultiProgress::new();
let _ = multi_progress.println(title);
multi_progress
pub trait ProgressTracker: Send + Sync {
fn contains_section(&self, id: &str) -> bool;
fn add_section(&self, id: &str, message: &str);
fn add_task(&self, section_id: &str, task_id: &str, message: &str);
fn finish_task(&self, id: &str, message: &str);
fn fail_task(&self, id: &str, message: &str);
fn skip_task(&self, id: &str, message: &str);
fn clear(&self);
}
pub fn add_spinner(multi_progress: &MultiProgress, message: String) -> ProgressBar {
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::SPINNER_STYLE.clone());
progress.set_message(message);
progress.enable_steady_tick(Duration::from_millis(100));
progress
struct Section {
header_index: usize,
task_count: usize,
pb: ProgressBar,
}
pub fn success(multi_progress: &MultiProgress, progress: Option<ProgressBar>, message: String) {
if let Some(progress) = progress {
multi_progress.remove(&progress)
struct IndicatifProgressTrackerState {
sections: HashMap<String, Section>,
tasks: HashMap<String, ProgressBar>,
pb_count: usize,
}
#[derive(Clone)]
pub struct IndicatifProgressTracker {
mp: MultiProgress,
state: Arc<Mutex<IndicatifProgressTrackerState>>,
}
impl IndicatifProgressTracker {
pub fn new(base: MultiProgress) -> Self {
let sections = HashMap::new();
let tasks = HashMap::new();
let state = Arc::new(Mutex::new(IndicatifProgressTrackerState {
sections,
tasks,
pb_count: 0,
}));
Self { mp: base, state }
}
}
impl ProgressTracker for IndicatifProgressTracker {
fn add_section(&self, id: &str, message: &str) {
let mut state = self.state.lock().unwrap();
let header_pb = self
.mp
.add(ProgressBar::new(1).with_style(crate::theme::SECTION_STYLE.clone()));
header_pb.finish_with_message(message.to_string());
let header_index = state.pb_count;
state.pb_count += 1;
state.sections.insert(
id.to_string(),
Section {
header_index,
task_count: 0,
pb: header_pb,
},
);
}
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::SUCCESS_SPINNER_STYLE.clone());
progress.finish_with_message(message);
}
fn add_task(&self, section_id: &str, task_id: &str, message: &str) {
let mut state = self.state.lock().unwrap();
pub fn error(multi_progress: &MultiProgress, progress: Option<ProgressBar>, message: String) {
if let Some(progress) = progress {
multi_progress.remove(&progress)
let insertion_index = {
let current_section = state
.sections
.get(section_id)
.expect("Section ID not found");
current_section.header_index + current_section.task_count + 1 // +1 to insert after header
};
let pb = self.mp.insert(insertion_index, ProgressBar::new_spinner());
pb.set_style(crate::theme::SPINNER_STYLE.clone());
pb.set_prefix(" ");
pb.set_message(message.to_string());
pb.enable_steady_tick(Duration::from_millis(80));
state.pb_count += 1;
let section = state
.sections
.get_mut(section_id)
.expect("Section ID not found");
section.task_count += 1;
// We inserted a new progress bar, so we must update the header_index
// for all subsequent sections.
for (id, s) in state.sections.iter_mut() {
if id != section_id && s.header_index >= insertion_index {
s.header_index += 1;
}
}
state.tasks.insert(task_id.to_string(), pb);
}
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::ERROR_SPINNER_STYLE.clone());
progress.finish_with_message(message);
}
pub fn skip(multi_progress: &MultiProgress, progress: Option<ProgressBar>, message: String) {
if let Some(progress) = progress {
multi_progress.remove(&progress)
fn finish_task(&self, id: &str, message: &str) {
let state = self.state.lock().unwrap();
if let Some(pb) = state.tasks.get(id) {
pb.set_style(crate::theme::SUCCESS_SPINNER_STYLE.clone());
pb.finish_with_message(message.to_string());
}
}
let progress = multi_progress.add(ProgressBar::new_spinner());
progress.set_style(crate::theme::SKIP_SPINNER_STYLE.clone());
progress.finish_with_message(message);
fn fail_task(&self, id: &str, message: &str) {
let state = self.state.lock().unwrap();
if let Some(pb) = state.tasks.get(id) {
pb.set_style(crate::theme::ERROR_SPINNER_STYLE.clone());
pb.finish_with_message(message.to_string());
}
}
fn skip_task(&self, id: &str, message: &str) {
let state = self.state.lock().unwrap();
if let Some(pb) = state.tasks.get(id) {
pb.set_style(crate::theme::SKIP_SPINNER_STYLE.clone());
pb.finish_with_message(message.to_string());
}
}
fn contains_section(&self, id: &str) -> bool {
let state = self.state.lock().unwrap();
state.sections.contains_key(id)
}
fn clear(&self) {
let mut state = self.state.lock().unwrap();
state.tasks.values().for_each(|p| self.mp.remove(p));
state.tasks.clear();
state.sections.values().for_each(|s| self.mp.remove(&s.pb));
state.sections.clear();
state.pb_count = 0;
let _ = self.mp.clear();
}
}

View File

@@ -8,19 +8,27 @@ pub static EMOJI_SKIP: Emoji<'_, '_> = Emoji("⏭️", "");
pub static EMOJI_ERROR: Emoji<'_, '_> = Emoji("⚠️", "");
pub static EMOJI_DEPLOY: Emoji<'_, '_> = Emoji("🚀", "");
pub static EMOJI_TOPOLOGY: Emoji<'_, '_> = Emoji("📦", "");
pub static EMOJI_SCORE: Emoji<'_, '_> = Emoji("🎶", "");
lazy_static! {
pub static ref SECTION_STYLE: ProgressStyle = ProgressStyle::default_spinner()
.template("{wide_msg:.bold}")
.unwrap();
pub static ref SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner()
.template(" {spinner:.green} {msg}")
.template(" {spinner:.green} {wide_msg}")
.unwrap()
.tick_strings(&["", "", "", "", "", "", "", "", "", ""]);
pub static ref SUCCESS_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE
.clone()
.tick_strings(&[format!("{}", EMOJI_SUCCESS).as_str()]);
pub static ref SKIP_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE
pub static ref SKIP_SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner()
.template(" {spinner:.orange} {wide_msg}")
.unwrap()
.clone()
.tick_strings(&[format!("{}", EMOJI_SKIP).as_str()]);
pub static ref ERROR_SPINNER_STYLE: ProgressStyle = SPINNER_STYLE
pub static ref ERROR_SPINNER_STYLE: ProgressStyle = ProgressStyle::default_spinner()
.template(" {spinner:.red} {wide_msg}")
.unwrap()
.clone()
.tick_strings(&[format!("{}", EMOJI_ERROR).as_str()]);
}

View File

@@ -1,10 +1,6 @@
use indicatif::{MultiProgress, ProgressBar};
use indicatif_log_bridge::LogWrapper;
use log::error;
use std::{
collections::HashMap,
sync::{Arc, Mutex},
};
use harmony_cli::progress::{IndicatifProgressTracker, ProgressTracker};
use indicatif::MultiProgress;
use std::sync::Arc;
use crate::instrumentation::{self, HarmonyComposerEvent};
@@ -22,85 +18,59 @@ pub fn init() -> tokio::task::JoinHandle<()> {
}
fn configure_logger() {
let logger =
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
let level = logger.filter();
let multi = MultiProgress::new();
LogWrapper::new(multi.clone(), logger).try_init().unwrap();
log::set_max_level(level);
env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("info")).build();
}
pub async fn handle_events() {
const PROGRESS_SETUP: &str = "project-initialization";
let progress_tracker = Arc::new(IndicatifProgressTracker::new(MultiProgress::new()));
const SETUP_SECTION: &str = "project-initialization";
const COMPILTATION_TASK: &str = "compilation";
const PROGRESS_DEPLOYMENT: &str = "deployment";
instrumentation::subscribe("Harmony Composer Logger", {
let progresses: Arc<Mutex<HashMap<String, MultiProgress>>> =
Arc::new(Mutex::new(HashMap::new()));
let compilation_progress = Arc::new(Mutex::new(None::<ProgressBar>));
move |event| {
let progresses_clone = Arc::clone(&progresses);
let compilation_progress_clone = Arc::clone(&compilation_progress);
let progress_tracker = Arc::clone(&progress_tracker);
async move {
let mut progresses_guard = progresses_clone.lock().unwrap();
let mut compilation_progress_guard = compilation_progress_clone.lock().unwrap();
match event {
HarmonyComposerEvent::HarmonyComposerStarted => {}
HarmonyComposerEvent::ProjectInitializationStarted => {
let multi_progress = harmony_cli::progress::new_section(format!(
"{} Initializing Harmony project...",
harmony_cli::theme::EMOJI_HARMONY,
));
(*progresses_guard).insert(PROGRESS_SETUP.to_string(), multi_progress);
progress_tracker.add_section(
SETUP_SECTION,
&format!(
"{} Initializing Harmony project...",
harmony_cli::theme::EMOJI_HARMONY,
),
);
}
HarmonyComposerEvent::ProjectInitialized => println!("\n"),
HarmonyComposerEvent::ProjectInitialized => {}
HarmonyComposerEvent::ProjectCompilationStarted { details } => {
let initialization_progress =
(*progresses_guard).get(PROGRESS_SETUP).unwrap();
let _ = initialization_progress.clear();
let progress =
harmony_cli::progress::add_spinner(initialization_progress, details);
*compilation_progress_guard = Some(progress);
progress_tracker.add_task(SETUP_SECTION, COMPILTATION_TASK, &details);
}
HarmonyComposerEvent::ProjectCompiled => {
let initialization_progress =
(*progresses_guard).get(PROGRESS_SETUP).unwrap();
harmony_cli::progress::success(
initialization_progress,
(*compilation_progress_guard).take(),
"project compiled".to_string(),
);
progress_tracker.finish_task(COMPILTATION_TASK, "project compiled");
}
HarmonyComposerEvent::ProjectCompilationFailed { details } => {
let initialization_progress =
(*progresses_guard).get(PROGRESS_SETUP).unwrap();
harmony_cli::progress::error(
initialization_progress,
(*compilation_progress_guard).take(),
"failed to compile project".to_string(),
progress_tracker.fail_task(COMPILTATION_TASK, &format!("failed to compile project:\n{details}"));
}
HarmonyComposerEvent::DeploymentStarted { target, profile } => {
progress_tracker.add_section(
PROGRESS_DEPLOYMENT,
&format!(
"\n{} Deploying project on target '{target}' with profile '{profile}'...\n",
harmony_cli::theme::EMOJI_DEPLOY,
),
);
error!("{details}");
}
HarmonyComposerEvent::DeploymentStarted { target } => {
let multi_progress = harmony_cli::progress::new_section(format!(
"{} Starting deployment to {target}...\n\n",
harmony_cli::theme::EMOJI_DEPLOY
));
(*progresses_guard).insert(PROGRESS_DEPLOYMENT.to_string(), multi_progress);
HarmonyComposerEvent::DeploymentCompleted => {
progress_tracker.clear();
}
HarmonyComposerEvent::DeploymentCompleted { details } => println!("\n"),
HarmonyComposerEvent::DeploymentFailed { details } => {
progress_tracker.add_task(PROGRESS_DEPLOYMENT, "deployment-failed", "");
progress_tracker.fail_task("deployment-failed", &details);
},
HarmonyComposerEvent::Shutdown => {
for (_, progresses) in (*progresses_guard).iter() {
progresses.clear().unwrap();
}
return false;
}
}

View File

@@ -2,16 +2,28 @@ use log::debug;
use once_cell::sync::Lazy;
use tokio::sync::broadcast;
use crate::{HarmonyProfile, HarmonyTarget};
#[derive(Debug, Clone)]
pub enum HarmonyComposerEvent {
HarmonyComposerStarted,
ProjectInitializationStarted,
ProjectInitialized,
ProjectCompilationStarted { details: String },
ProjectCompilationStarted {
details: String,
},
ProjectCompiled,
ProjectCompilationFailed { details: String },
DeploymentStarted { target: String },
DeploymentCompleted { details: String },
ProjectCompilationFailed {
details: String,
},
DeploymentStarted {
target: HarmonyTarget,
profile: HarmonyProfile,
},
DeploymentCompleted,
DeploymentFailed {
details: String,
},
Shutdown,
}
@@ -23,9 +35,18 @@ static HARMONY_COMPOSER_EVENT_BUS: Lazy<broadcast::Sender<HarmonyComposerEvent>>
});
pub fn instrument(event: HarmonyComposerEvent) -> Result<(), &'static str> {
match HARMONY_COMPOSER_EVENT_BUS.send(event) {
Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"),
#[cfg(not(test))]
{
match HARMONY_COMPOSER_EVENT_BUS.send(event) {
Ok(_) => Ok(()),
Err(_) => Err("send error: no subscribers"),
}
}
#[cfg(test)]
{
let _ = event; // Suppress the "unused variable" warning for `event`
Ok(())
}
}

View File

@@ -20,7 +20,7 @@ mod instrumentation;
#[derive(Parser)]
#[command(version, about, long_about = None, flatten_help = true, propagate_version = true)]
struct GlobalArgs {
#[arg(long, default_value = "harmony")]
#[arg(long, default_value = ".")]
harmony_path: String,
#[arg(long)]
@@ -49,14 +49,11 @@ struct CheckArgs {
#[derive(Args, Clone, Debug)]
struct DeployArgs {
#[arg(long, default_value_t = false)]
staging: bool,
#[arg(long = "target", short = 't', default_value = "local")]
harmony_target: HarmonyTarget,
#[arg(long, default_value_t = false)]
prod: bool,
#[arg(long, default_value_t = false)]
smoke_test: bool,
#[arg(long = "profile", short = 'p', default_value = "dev")]
harmony_profile: HarmonyProfile,
}
#[derive(Args, Clone, Debug)]
@@ -68,6 +65,38 @@ struct AllArgs {
deploy: DeployArgs,
}
#[derive(Clone, Debug, clap::ValueEnum)]
enum HarmonyTarget {
Local,
Remote,
}
impl std::fmt::Display for HarmonyTarget {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HarmonyTarget::Local => f.write_str("local"),
HarmonyTarget::Remote => f.write_str("remote"),
}
}
}
#[derive(Clone, Debug, clap::ValueEnum)]
enum HarmonyProfile {
Dev,
Staging,
Production,
}
impl std::fmt::Display for HarmonyProfile {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
HarmonyProfile::Dev => f.write_str("dev"),
HarmonyProfile::Staging => f.write_str("staging"),
HarmonyProfile::Production => f.write_str("production"),
}
}
}
#[tokio::main]
async fn main() {
let hc_logger_handle = harmony_composer_logger::init();
@@ -80,14 +109,13 @@ async fn main() {
instrumentation::instrument(HarmonyComposerEvent::ProjectInitializationStarted).unwrap();
let harmony_bin_path: PathBuf = match harmony_path {
true => {
compile_harmony(
cli_args.compile_method,
cli_args.compile_platform,
cli_args.harmony_path.clone(),
)
.await
}
true => compile_harmony(
cli_args.compile_method,
cli_args.compile_platform,
cli_args.harmony_path.clone(),
)
.await
.expect("couldn't compile harmony"),
false => todo!("implement autodetect code"),
};
@@ -123,32 +151,44 @@ async fn main() {
);
}
Commands::Deploy(args) => {
let deploy = if args.staging {
instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
target: "staging".to_string(),
})
.unwrap();
todo!("implement staging deployment")
} else if args.prod {
instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
target: "prod".to_string(),
})
.unwrap();
todo!("implement prod deployment")
} else {
instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
target: "dev".to_string(),
})
.unwrap();
Command::new(harmony_bin_path).arg("-y").arg("-a").spawn()
}
.expect("failed to run harmony deploy");
let deploy_output = deploy.wait_with_output().unwrap();
instrumentation::instrument(HarmonyComposerEvent::DeploymentCompleted {
details: String::from_utf8(deploy_output.stdout).unwrap(),
instrumentation::instrument(HarmonyComposerEvent::DeploymentStarted {
target: args.harmony_target.clone(),
profile: args.harmony_profile.clone(),
})
.unwrap();
if matches!(args.harmony_profile, HarmonyProfile::Dev)
&& !matches!(args.harmony_target, HarmonyTarget::Local)
{
instrumentation::instrument(HarmonyComposerEvent::DeploymentFailed {
details: format!(
"Cannot run profile '{}' on target '{}'. Profile '{}' can run locally only.",
args.harmony_profile, args.harmony_target, args.harmony_profile
),
}).unwrap();
return;
}
let use_local_k3d = match args.harmony_target {
HarmonyTarget::Local => true,
HarmonyTarget::Remote => false,
};
let mut command = Command::new(harmony_bin_path);
command
.env("HARMONY_USE_LOCAL_K3D", format!("{use_local_k3d}"))
.env("HARMONY_PROFILE", format!("{}", args.harmony_profile))
.arg("-y")
.arg("-a");
info!("{:?}", command);
let deploy = command.spawn().expect("failed to run harmony deploy");
let deploy_output = deploy.wait_with_output().unwrap();
debug!("{}", String::from_utf8(deploy_output.stdout).unwrap());
instrumentation::instrument(HarmonyComposerEvent::DeploymentCompleted).unwrap();
}
Commands::All(_args) => todo!(
"take all previous match arms and turn them into separate functions, and call them all one after the other"
@@ -173,7 +213,7 @@ async fn compile_harmony(
method: Option<CompileMethod>,
platform: Option<String>,
harmony_location: String,
) -> PathBuf {
) -> Result<PathBuf, String> {
let platform = match platform {
Some(p) => p,
None => current_platform::CURRENT_PLATFORM.to_string(),
@@ -203,6 +243,7 @@ async fn compile_harmony(
details: "compiling project with cargo".to_string(),
})
.unwrap();
compile_cargo(platform, harmony_location).await
}
CompileMethod::Docker => {
@@ -210,16 +251,28 @@ async fn compile_harmony(
details: "compiling project with docker".to_string(),
})
.unwrap();
compile_docker(platform, harmony_location).await
}
};
instrumentation::instrument(HarmonyComposerEvent::ProjectCompiled).unwrap();
path
match path {
Ok(path) => {
instrumentation::instrument(HarmonyComposerEvent::ProjectCompiled).unwrap();
Ok(path)
}
Err(err) => {
instrumentation::instrument(HarmonyComposerEvent::ProjectCompilationFailed {
details: err.clone(),
})
.unwrap();
Err(err)
}
}
}
// TODO: make sure this works with cargo workspaces
async fn compile_cargo(platform: String, harmony_location: String) -> PathBuf {
async fn compile_cargo(platform: String, harmony_location: String) -> Result<PathBuf, String> {
let metadata = MetadataCommand::new()
.manifest_path(format!("{}/Cargo.toml", harmony_location))
.exec()
@@ -268,7 +321,10 @@ async fn compile_cargo(platform: String, harmony_location: String) -> PathBuf {
}
}
cargo_build.wait().expect("run cargo command failed");
let res = cargo_build.wait(); //.expect("run cargo command failed");
if res.is_err() {
return Err("cargo build failed".into());
}
let bin = artifacts
.last()
@@ -286,10 +342,10 @@ async fn compile_cargo(platform: String, harmony_location: String) -> PathBuf {
let _copy_res = fs::copy(&bin, &bin_out).await;
}
bin_out
Ok(bin_out)
}
async fn compile_docker(platform: String, harmony_location: String) -> PathBuf {
async fn compile_docker(platform: String, harmony_location: String) -> Result<PathBuf, String> {
let docker_client =
bollard::Docker::connect_with_local_defaults().expect("couldn't connect to docker");
@@ -305,7 +361,7 @@ async fn compile_docker(platform: String, harmony_location: String) -> PathBuf {
.await
.expect("list containers failed");
if containers.len() > 0 {
if !containers.is_empty() {
docker_client
.remove_container("harmony_build", None::<RemoveContainerOptions>)
.await
@@ -367,12 +423,12 @@ async fn compile_docker(platform: String, harmony_location: String) -> PathBuf {
}
// wait until container is no longer running
while let Some(_) = wait.next().await {}
while (wait.next().await).is_some() {}
// hack that should be cleaned up
if platform.contains("windows") {
return PathBuf::from(format!("{}/harmony.exe", harmony_location));
Ok(PathBuf::from(format!("{}/harmony.exe", harmony_location)))
} else {
return PathBuf::from(format!("{}/harmony", harmony_location));
Ok(PathBuf::from(format!("{}/harmony", harmony_location)))
}
}

View File

@@ -11,13 +11,13 @@ pub fn ip(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as LitStr);
let ip_str = input.value();
if let Ok(_) = ip_str.parse::<std::net::Ipv4Addr>() {
if ip_str.parse::<std::net::Ipv4Addr>().is_ok() {
let expanded =
quote! { std::net::IpAddr::V4(#ip_str.parse::<std::net::Ipv4Addr>().unwrap()) };
return TokenStream::from(expanded);
}
if let Ok(_) = ip_str.parse::<std::net::Ipv6Addr>() {
if ip_str.parse::<std::net::Ipv6Addr>().is_ok() {
let expanded =
quote! { std::net::IpAddr::V6(#ip_str.parse::<std::net::Ipv6Addr>().unwrap()) };
return TokenStream::from(expanded);
@@ -31,7 +31,7 @@ pub fn ipv4(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as LitStr);
let ip_str = input.value();
if let Ok(_) = ip_str.parse::<std::net::Ipv4Addr>() {
if ip_str.parse::<std::net::Ipv4Addr>().is_ok() {
let expanded = quote! { #ip_str.parse::<std::net::Ipv4Addr>().unwrap() };
return TokenStream::from(expanded);
}
@@ -127,7 +127,7 @@ pub fn ingress_path(input: TokenStream) -> TokenStream {
match path_str.starts_with("/") {
true => {
let expanded = quote! {(#path_str.to_string()) };
return TokenStream::from(expanded);
TokenStream::from(expanded)
}
false => panic!("Invalid ingress path"),
}
@@ -138,7 +138,7 @@ pub fn cidrv4(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as LitStr);
let cidr_str = input.value();
if let Ok(_) = cidr_str.parse::<cidr::Ipv4Cidr>() {
if cidr_str.parse::<cidr::Ipv4Cidr>().is_ok() {
let expanded = quote! { #cidr_str.parse::<cidr::Ipv4Cidr>().unwrap() };
return TokenStream::from(expanded);
}

23
harmony_secret/Cargo.toml Normal file
View File

@@ -0,0 +1,23 @@
[package]
name = "harmony-secret"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony-secret-derive = { version = "0.1.0", path = "../harmony_secret_derive" }
serde = { version = "1.0.209", features = ["derive", "rc"] }
serde_json = "1.0.127"
thiserror.workspace = true
lazy_static.workspace = true
directories.workspace = true
log.workspace = true
infisical = "0.0.2"
tokio.workspace = true
async-trait.workspace = true
http.workspace = true
[dev-dependencies]
pretty_assertions.workspace = true
tempfile.workspace = true

View File

@@ -0,0 +1,18 @@
use lazy_static::lazy_static;
lazy_static! {
pub static ref SECRET_NAMESPACE: String =
std::env::var("HARMONY_SECRET_NAMESPACE").expect("HARMONY_SECRET_NAMESPACE environment variable is required, it should contain the name of the project you are working on to access its secrets");
pub static ref SECRET_STORE: Option<String> =
std::env::var("HARMONY_SECRET_STORE").ok();
pub static ref INFISICAL_URL: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_URL").ok();
pub static ref INFISICAL_PROJECT_ID: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_PROJECT_ID").ok();
pub static ref INFISICAL_ENVIRONMENT: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_ENVIRONMENT").ok();
pub static ref INFISICAL_CLIENT_ID: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_ID").ok();
pub static ref INFISICAL_CLIENT_SECRET: Option<String> =
std::env::var("HARMONY_SECRET_INFISICAL_CLIENT_SECRET").ok();
}

166
harmony_secret/src/lib.rs Normal file
View File

@@ -0,0 +1,166 @@
pub mod config;
mod store;
use crate::config::SECRET_NAMESPACE;
use async_trait::async_trait;
use config::INFISICAL_CLIENT_ID;
use config::INFISICAL_CLIENT_SECRET;
use config::INFISICAL_ENVIRONMENT;
use config::INFISICAL_PROJECT_ID;
use config::INFISICAL_URL;
use config::SECRET_STORE;
use serde::{Serialize, de::DeserializeOwned};
use std::fmt;
use store::InfisicalSecretStore;
use store::LocalFileSecretStore;
use thiserror::Error;
use tokio::sync::OnceCell;
pub use harmony_secret_derive::Secret;
// The Secret trait remains the same.
pub trait Secret: Serialize + DeserializeOwned + Sized {
const KEY: &'static str;
}
// The error enum remains the same.
#[derive(Debug, Error)]
pub enum SecretStoreError {
#[error("Secret not found for key '{key}' in namespace '{namespace}'")]
NotFound { namespace: String, key: String },
#[error("Failed to deserialize secret for key '{key}': {source}")]
Deserialization {
key: String,
source: serde_json::Error,
},
#[error("Failed to serialize secret for key '{key}': {source}")]
Serialization {
key: String,
source: serde_json::Error,
},
#[error("Underlying storage error: {0}")]
Store(#[from] Box<dyn std::error::Error + Send + Sync>),
}
// The trait is now async!
#[async_trait]
pub trait SecretStore: fmt::Debug + Send + Sync {
async fn get_raw(&self, namespace: &str, key: &str) -> Result<Vec<u8>, SecretStoreError>;
async fn set_raw(
&self,
namespace: &str,
key: &str,
value: &[u8],
) -> Result<(), SecretStoreError>;
}
// Use OnceCell for async-friendly, one-time initialization.
static SECRET_MANAGER: OnceCell<SecretManager> = OnceCell::const_new();
/// Initializes and returns a reference to the global SecretManager.
async fn get_secret_manager() -> &'static SecretManager {
SECRET_MANAGER.get_or_init(init_secret_manager).await
}
/// The async initialization function for the SecretManager.
async fn init_secret_manager() -> SecretManager {
let default_secret_score = "infisical".to_string();
let store_type = SECRET_STORE.as_ref().unwrap_or(&default_secret_score);
let store: Box<dyn SecretStore> = match store_type.as_str() {
"file" => Box::new(LocalFileSecretStore::default()),
"infisical" | _ => {
let store = InfisicalSecretStore::new(
INFISICAL_URL.clone().expect("Infisical url must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_URL"),
INFISICAL_PROJECT_ID.clone().expect("Infisical project id must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_PROJECT_ID"),
INFISICAL_ENVIRONMENT.clone().expect("Infisical environment must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_ENVIRONMENT"),
INFISICAL_CLIENT_ID.clone().expect("Infisical client id must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_CLIENT_ID"),
INFISICAL_CLIENT_SECRET.clone().expect("Infisical client secret must be set, see harmony_secret config for ways to provide it. You can try with HARMONY_SECRET_INFISICAL_CLIENT_SECRET"),
)
.await
.expect("Failed to initialize Infisical secret store");
Box::new(store)
}
};
SecretManager::new(SECRET_NAMESPACE.clone(), store)
}
/// Manages the lifecycle of secrets, providing a simple static API.
#[derive(Debug)]
pub struct SecretManager {
namespace: String,
store: Box<dyn SecretStore>,
}
impl SecretManager {
fn new(namespace: String, store: Box<dyn SecretStore>) -> Self {
Self { namespace, store }
}
/// Retrieves and deserializes a secret.
pub async fn get<T: Secret>() -> Result<T, SecretStoreError> {
let manager = get_secret_manager().await;
let raw_value = manager.store.get_raw(&manager.namespace, T::KEY).await?;
serde_json::from_slice(&raw_value).map_err(|e| SecretStoreError::Deserialization {
key: T::KEY.to_string(),
source: e,
})
}
/// Serializes and stores a secret.
pub async fn set<T: Secret>(secret: &T) -> Result<(), SecretStoreError> {
let manager = get_secret_manager().await;
let raw_value =
serde_json::to_vec(secret).map_err(|e| SecretStoreError::Serialization {
key: T::KEY.to_string(),
source: e,
})?;
manager
.store
.set_raw(&manager.namespace, T::KEY, &raw_value)
.await
}
}
#[cfg(test)]
mod test {
use super::*;
use pretty_assertions::assert_eq;
use serde::{Deserialize, Serialize};
#[derive(Serialize, Deserialize, Debug, PartialEq)]
struct TestUserMeta {
labels: Vec<String>,
}
#[derive(Secret, Serialize, Deserialize, Debug, PartialEq)]
struct TestSecret {
user: String,
password: String,
metadata: TestUserMeta,
}
#[cfg(secrete2etest)]
#[tokio::test]
async fn set_and_retrieve_secret() {
let secret = TestSecret {
user: String::from("user"),
password: String::from("password"),
metadata: TestUserMeta {
labels: vec![
String::from("label1"),
String::from("label2"),
String::from(
"some longet label with \" special @#%$)(udiojcia[]]] \"'asdij'' characters Nдs はにほへとちり าฟันพัฒนา yağız şoföre ç <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> <20> 👩‍👩‍👧‍👦 /span> 👩‍👧‍👦 and why not emojis ",
),
],
},
};
SecretManager::set(&secret).await.unwrap();
let value = SecretManager::get::<TestSecret>().await.unwrap();
assert_eq!(value, secret);
}
}

View File

@@ -0,0 +1,129 @@
use crate::{SecretStore, SecretStoreError};
use async_trait::async_trait;
use infisical::{
AuthMethod, InfisicalError,
client::Client,
secrets::{CreateSecretRequest, GetSecretRequest, UpdateSecretRequest},
};
use log::{info, warn};
#[derive(Debug)]
pub struct InfisicalSecretStore {
client: Client,
project_id: String,
environment: String,
}
impl InfisicalSecretStore {
/// Creates a new, authenticated Infisical client.
pub async fn new(
base_url: String,
project_id: String,
environment: String,
client_id: String,
client_secret: String,
) -> Result<Self, InfisicalError> {
info!("INFISICAL_STORE: Initializing client for URL: {base_url}");
// The builder and login logic remains the same.
let mut client = Client::builder().base_url(base_url).build().await?;
let auth_method = AuthMethod::new_universal_auth(client_id, client_secret);
client.login(auth_method).await?;
info!("INFISICAL_STORE: Client authenticated successfully.");
Ok(Self {
client,
project_id,
environment,
})
}
}
#[async_trait]
impl SecretStore for InfisicalSecretStore {
async fn get_raw(&self, _environment: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
let environment = &self.environment;
info!("INFISICAL_STORE: Getting key '{key}' from environment '{environment}'");
let request = GetSecretRequest::builder(key, &self.project_id, environment).build();
match self.client.secrets().get(request).await {
Ok(secret) => Ok(secret.secret_value.into_bytes()),
Err(e) => {
// Correctly match against the actual InfisicalError enum.
match e {
// The specific case for a 404 Not Found error.
InfisicalError::HttpError { status, .. }
if status == http::StatusCode::NOT_FOUND =>
{
Err(SecretStoreError::NotFound {
namespace: environment.to_string(),
key: key.to_string(),
})
}
// For all other errors, wrap them in our generic Store error.
_ => Err(SecretStoreError::Store(Box::new(e))),
}
}
}
}
async fn set_raw(
&self,
_environment: &str,
key: &str,
val: &[u8],
) -> Result<(), SecretStoreError> {
info!(
"INFISICAL_STORE: Setting key '{key}' in environment '{}'",
self.environment
);
let value_str =
String::from_utf8(val.to_vec()).map_err(|e| SecretStoreError::Store(Box::new(e)))?;
// --- Upsert Logic ---
// First, attempt to update the secret.
let update_req = UpdateSecretRequest::builder(key, &self.project_id, &self.environment)
.secret_value(&value_str)
.build();
match self.client.secrets().update(update_req).await {
Ok(_) => {
info!("INFISICAL_STORE: Successfully updated secret '{key}'.");
Ok(())
}
Err(e) => {
// If the update failed, check if it was because the secret doesn't exist.
match e {
InfisicalError::HttpError { status, .. }
if status == http::StatusCode::NOT_FOUND =>
{
// The secret was not found, so we create it instead.
warn!(
"INFISICAL_STORE: Secret '{key}' not found for update, attempting to create it."
);
let create_req = CreateSecretRequest::builder(
key,
&value_str,
&self.project_id,
&self.environment,
)
.build();
// Handle potential errors during creation.
self.client
.secrets()
.create(create_req)
.await
.map_err(|create_err| SecretStoreError::Store(Box::new(create_err)))?;
info!("INFISICAL_STORE: Successfully created secret '{key}'.");
Ok(())
}
// Any other error during update is a genuine failure.
_ => Err(SecretStoreError::Store(Box::new(e))),
}
}
}
}
}

View File

@@ -0,0 +1,105 @@
use async_trait::async_trait;
use log::info;
use std::path::{Path, PathBuf};
use crate::{SecretStore, SecretStoreError};
#[derive(Debug, Default)]
pub struct LocalFileSecretStore;
impl LocalFileSecretStore {
/// Helper to consistently generate the secret file path.
fn get_file_path(base_dir: &Path, ns: &str, key: &str) -> PathBuf {
base_dir.join(format!("{ns}_{key}.json"))
}
}
#[async_trait]
impl SecretStore for LocalFileSecretStore {
async fn get_raw(&self, ns: &str, key: &str) -> Result<Vec<u8>, SecretStoreError> {
let data_dir = directories::BaseDirs::new()
.expect("Could not find a valid home directory")
.data_dir()
.join("harmony")
.join("secrets");
let file_path = Self::get_file_path(&data_dir, ns, key);
info!(
"LOCAL_STORE: Getting key '{key}' from namespace '{ns}' at {}",
file_path.display()
);
tokio::fs::read(&file_path)
.await
.map_err(|_| SecretStoreError::NotFound {
namespace: ns.to_string(),
key: key.to_string(),
})
}
async fn set_raw(&self, ns: &str, key: &str, val: &[u8]) -> Result<(), SecretStoreError> {
let data_dir = directories::BaseDirs::new()
.expect("Could not find a valid home directory")
.data_dir()
.join("harmony")
.join("secrets");
let file_path = Self::get_file_path(&data_dir, ns, key);
info!(
"LOCAL_STORE: Setting key '{key}' in namespace '{ns}' at {}",
file_path.display()
);
if let Some(parent_dir) = file_path.parent() {
tokio::fs::create_dir_all(parent_dir)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))?;
}
tokio::fs::write(&file_path, val)
.await
.map_err(|e| SecretStoreError::Store(Box::new(e)))
}
}
#[cfg(test)]
mod tests {
use super::*;
use tempfile::tempdir;
#[tokio::test]
async fn test_set_and_get_raw_successfully() {
let dir = tempdir().unwrap();
let store = LocalFileSecretStore::default();
let ns = "test-ns";
let key = "test-key";
let value = b"{\"data\":\"test-value\"}";
// To test the store directly, we override the base directory logic.
// For this test, we'll manually construct the path within our temp dir.
let file_path = LocalFileSecretStore::get_file_path(dir.path(), ns, key);
// Manually write to the temp path to simulate the store's behavior
tokio::fs::create_dir_all(file_path.parent().unwrap())
.await
.unwrap();
tokio::fs::write(&file_path, value).await.unwrap();
// Now, test get_raw by reading from that same temp path (by mocking the path logic)
let retrieved_value = tokio::fs::read(&file_path).await.unwrap();
assert_eq!(retrieved_value, value);
}
#[tokio::test]
async fn test_get_raw_not_found() {
let dir = tempdir().unwrap();
let ns = "test-ns";
let key = "non-existent-key";
// We need to check if reading a non-existent file gives the correct error
let file_path = LocalFileSecretStore::get_file_path(dir.path(), ns, key);
let result = tokio::fs::read(&file_path).await;
assert!(matches!(result, Err(_)));
}
}

View File

@@ -0,0 +1,4 @@
mod infisical;
mod local_file;
pub use infisical::*;
pub use local_file::*;

View File

@@ -0,0 +1,8 @@
export HARMONY_SECRET_NAMESPACE=harmony_test_secrets
export HARMONY_SECRET_INFISICAL_URL=http://localhost
export HARMONY_SECRET_INFISICAL_PROJECT_ID=eb4723dc-eede-44d7-98cc-c8e0caf29ccb
export HARMONY_SECRET_INFISICAL_ENVIRONMENT=dev
export HARMONY_SECRET_INFISICAL_CLIENT_ID=dd16b07f-0e38-4090-a1d0-922de9f44d91
export HARMONY_SECRET_INFISICAL_CLIENT_SECRET=bd2ae054e7759b11ca2e908494196337cc800bab138cb1f59e8d9b15ca3f286f
cargo test

View File

@@ -0,0 +1,13 @@
[package]
name = "harmony-secret-derive"
version = "0.1.0"
edition = "2024"
[lib]
proc-macro = true
[dependencies]
quote = "1.0"
proc-macro2 = "1.0"
proc-macro-crate = "3.3"
syn = "2.0"

View File

@@ -0,0 +1,38 @@
use proc_macro::TokenStream;
use proc_macro_crate::{FoundCrate, crate_name};
use quote::quote;
use syn::{DeriveInput, Ident, parse_macro_input};
#[proc_macro_derive(Secret)]
pub fn derive_secret(input: TokenStream) -> TokenStream {
let input = parse_macro_input!(input as DeriveInput);
let struct_ident = &input.ident;
// The key for the secret will be the stringified name of the struct itself.
// e.g., `struct OKDClusterSecret` becomes key `"OKDClusterSecret"`.
let key = struct_ident.to_string();
// Find the path to the `harmony_secret` crate.
let secret_crate_path = match crate_name("harmony-secret") {
Ok(FoundCrate::Itself) => quote!(crate),
Ok(FoundCrate::Name(name)) => {
let ident = Ident::new(&name, proc_macro2::Span::call_site());
quote!(::#ident)
}
Err(e) => {
return syn::Error::new(proc_macro2::Span::call_site(), e.to_string())
.to_compile_error()
.into();
}
};
// The generated code now implements `Secret` for the struct itself.
// The struct must also derive `Serialize` and `Deserialize` for this to be useful.
let expanded = quote! {
impl #secret_crate_path::Secret for #struct_ident {
const KEY: &'static str = #key;
}
};
TokenStream::from(expanded)
}

View File

@@ -14,9 +14,9 @@ use tokio::sync::mpsc;
#[derive(Debug)]
enum ExecutionState {
INITIATED,
RUNNING,
CANCELED,
Initiated,
Running,
Canceled,
}
struct Execution<T: Topology> {
@@ -62,7 +62,7 @@ impl<T: Topology> ScoreListWidget<T> {
pub(crate) fn launch_execution(&mut self) {
if let Some(score) = self.get_selected_score() {
self.execution = Some(Execution {
state: ExecutionState::INITIATED,
state: ExecutionState::Initiated,
score: score.clone_box(),
});
info!("{}\n\nConfirm Execution (Press y/n)", score.name());
@@ -106,7 +106,7 @@ impl<T: Topology> ScoreListWidget<T> {
if let Some(execution) = &mut self.execution {
match confirm {
true => {
execution.state = ExecutionState::RUNNING;
execution.state = ExecutionState::Running;
info!("Launch execution {execution}");
self.sender
.send(HarmonyTuiEvent::LaunchScore(execution.score.clone_box()))
@@ -114,7 +114,7 @@ impl<T: Topology> ScoreListWidget<T> {
.expect("Should be able to send message");
}
false => {
execution.state = ExecutionState::CANCELED;
execution.state = ExecutionState::Canceled;
info!("Execution cancelled");
self.clear_execution();
}
@@ -144,7 +144,11 @@ impl<T: Topology> Widget for &ScoreListWidget<T> {
Self: Sized,
{
let mut list_state = self.list_state.write().unwrap();
let scores_items: Vec<ListItem<'_>> = self.scores.iter().map(score_to_list_item).collect();
let scores_items: Vec<ListItem<'_>> = self
.scores
.iter()
.map(|score| ListItem::new(score.name()))
.collect();
let list = List::new(scores_items)
.highlight_style(Style::new().bold().italic())
.highlight_symbol("🠊 ");
@@ -152,7 +156,3 @@ impl<T: Topology> Widget for &ScoreListWidget<T> {
StatefulWidget::render(list, area, buf, &mut list_state)
}
}
fn score_to_list_item<'a, T: Topology>(score: &'a Box<dyn Score<T>>) -> ListItem<'a> {
ListItem::new(score.name())
}

View File

@@ -1,5 +1,5 @@
use futures_util::StreamExt;
use log::{debug, info, warn};
use log::{debug, warn};
use sha2::{Digest, Sha256};
use std::io::Read;
use std::path::PathBuf;
@@ -45,7 +45,7 @@ pub(crate) struct DownloadableAsset {
impl DownloadableAsset {
fn verify_checksum(&self, file: PathBuf) -> bool {
if !file.exists() {
warn!("File does not exist: {:?}", file);
debug!("File does not exist: {:?}", file);
return false;
}
@@ -155,7 +155,7 @@ impl DownloadableAsset {
return Err(CHECKSUM_FAILED_MSG.to_string());
}
info!(
debug!(
"File downloaded and verified successfully: {}",
target_file_path.to_string_lossy()
);

View File

@@ -2,7 +2,7 @@ mod downloadable_asset;
use downloadable_asset::*;
use kube::Client;
use log::{debug, warn};
use log::debug;
use std::path::PathBuf;
const K3D_BIN_FILE_NAME: &str = "k3d";
@@ -64,7 +64,6 @@ impl K3d {
.text()
.await
.unwrap();
println!("body: {body}");
let checksum = body
.lines()
@@ -104,8 +103,7 @@ impl K3d {
.get_latest()
.await
.map_err(|e| e.to_string())?;
// debug!("Got k3d releases {releases:#?}");
println!("Got k3d first releases {latest_release:#?}");
debug!("Got k3d releases {latest_release:#?}");
Ok(latest_release)
}
@@ -368,7 +366,7 @@ mod test {
async fn k3d_latest_release_should_get_latest() {
let dir = get_clean_test_directory();
assert_eq!(dir.join(K3D_BIN_FILE_NAME).exists(), false);
assert!(!dir.join(K3D_BIN_FILE_NAME).exists());
let k3d = K3d::new(dir.clone(), None);
let latest_release = k3d.get_latest_release_tag().await.unwrap();
@@ -382,12 +380,12 @@ mod test {
async fn k3d_download_latest_release_should_get_latest_bin() {
let dir = get_clean_test_directory();
assert_eq!(dir.join(K3D_BIN_FILE_NAME).exists(), false);
assert!(!dir.join(K3D_BIN_FILE_NAME).exists());
let k3d = K3d::new(dir.clone(), None);
let bin_file_path = k3d.download_latest_release().await.unwrap();
assert_eq!(bin_file_path, dir.join(K3D_BIN_FILE_NAME));
assert_eq!(dir.join(K3D_BIN_FILE_NAME).exists(), true);
assert!(dir.join(K3D_BIN_FILE_NAME).exists());
}
fn get_clean_test_directory() -> PathBuf {

View File

@@ -12,7 +12,7 @@ env_logger = { workspace = true }
yaserde = { git = "https://github.com/jggc/yaserde.git" }
yaserde_derive = { git = "https://github.com/jggc/yaserde.git" }
xml-rs = "0.8"
thiserror = "1.0"
thiserror.workspace = true
async-trait = { workspace = true }
tokio = { workspace = true }
uuid = { workspace = true }

View File

@@ -1,4 +1,3 @@
use rand;
use rand::Rng;
use xml::reader::XmlEvent as ReadEvent;
use xml::writer::XmlEvent as WriteEvent;
@@ -14,7 +13,7 @@ impl YaDeserializeTrait for HAProxyId {
ReadEvent::StartElement {
name, attributes, ..
} => {
if attributes.len() > 0 {
if !attributes.is_empty() {
return Err(String::from(
"Attributes not currently supported by HAProxyId",
));

Some files were not shown because too many files have changed in this diff Show More