diff --git a/Cargo.lock b/Cargo.lock index 97904ddf..11476bec 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2685,6 +2685,16 @@ dependencies = [ "url", ] +[[package]] +name = "example-install-rook-ceph" +version = "0.1.0" +dependencies = [ + "harmony", + "harmony_cli", + "k8s-openapi", + "tokio", +] + [[package]] name = "example-k8s-drain-node" version = "0.1.0" diff --git a/examples/install_rook_ceph/Cargo.toml b/examples/install_rook_ceph/Cargo.toml new file mode 100644 index 00000000..b79d88b6 --- /dev/null +++ b/examples/install_rook_ceph/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "example-install-rook-ceph" +edition = "2024" +version.workspace = true +readme.workspace = true +license.workspace = true +publish = false + +[dependencies] +harmony = { path = "../../harmony" } +harmony_cli = { path = "../../harmony_cli" } +k8s-openapi = { workspace = true } +tokio = { workspace = true } diff --git a/examples/install_rook_ceph/env.sh b/examples/install_rook_ceph/env.sh new file mode 100644 index 00000000..e07420e8 --- /dev/null +++ b/examples/install_rook_ceph/env.sh @@ -0,0 +1,31 @@ +# shellcheck shell=bash +# Source this file before running the example: +# source env.sh && cargo run -p example-install-rook-ceph + +# ---------- Target an existing cluster (no local K3D) ------------------------ +# This example is meant to drive a real cluster (pico OKD, vanilla K8s with an +# ingress, etc.) — not a throwaway local one. Force K8sAnywhereTopology to use +# whatever cluster the kubeconfig points at. +export HARMONY_USE_LOCAL_K3D=false + +# K8sAnywhereTopology resolves the kubeconfig via: +# 1. KUBECONFIG env var +# 2. fallback: $HOME/.kube/config +# Uncomment if your config isn't in the default location. +# export KUBECONFIG=/path/to/your/kubeconfig + +# Pin to a specific kubectl context if your kubeconfig has more than one. +# export HARMONY_K8S_CONTEXT=my-pico-okd + +# ---------- Harmony state storage (per-example boilerplate) ------------------ +# This example doesn't actually touch harmony_secret or the SQLite DB, but +# other examples in the workspace use this same boilerplate and these vars +# don't cost anything when unread. +export HARMONY_SECRET_NAMESPACE=install-rook-ceph +export HARMONY_SECRET_STORE=file +export HARMONY_DATABASE_URL=sqlite://harmony_install_rook_ceph.sqlite + +# ---------- Logging ---------------------------------------------------------- +# The RookCephClusterScore wait loops log every `ceph health` transition at +# info level; debug shows polling cadence. +export RUST_LOG=harmony=debug diff --git a/examples/install_rook_ceph/src/main.rs b/examples/install_rook_ceph/src/main.rs new file mode 100644 index 00000000..2f945382 --- /dev/null +++ b/examples/install_rook_ceph/src/main.rs @@ -0,0 +1,136 @@ +use harmony::{ + inventory::Inventory, + modules::storage::ceph::{ + RookCephClusterScore, RookCephOperatorScore, + ceph_validate_health_score::CephVerifyClusterHealth, + crd::{CephObjectStore, CephObjectStoreSpec, CephObjectStoreUser, GatewaySpec}, + }, + score::Score, + topology::K8sAnywhereTopology, +}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; + +// Uncomment to enable the edge-TLS Ingress for the S3 endpoint. +// use harmony::modules::k8s::resource::K8sResourceScore; +// use k8s_openapi::api::networking::v1::{ +// HTTPIngressPath, HTTPIngressRuleValue, Ingress, IngressBackend, IngressRule, +// IngressServiceBackend, IngressSpec, IngressTLS, ServiceBackendPort, +// }; + +const NAMESPACE: &str = "rook-ceph"; +const OBJECTSTORE_NAME: &str = "ceph-objectstore"; +const S3_USER_NAME: &str = "harmony-s3"; + +// External S3 hostname for the Ingress. Edit this to match the DNS name +// you'll point at your cluster's ingress LB. +// const S3_HOSTNAME: &str = "s3.example.com"; + +// TLS Secret holding the cert+key for `S3_HOSTNAME`. The Secret must exist +// in `NAMESPACE` *before* this example runs (e.g. via cert-manager, or a +// manual `kubectl create secret tls`). The example does not create it — +// real cert material can't be shipped in a repo. +// const S3_TLS_SECRET: &str = "ceph-objectstore-tls"; + +#[tokio::main] +async fn main() { + let topology = K8sAnywhereTopology::from_env(); + let inventory = Inventory::autoload(); + + let object_store = CephObjectStore { + metadata: ObjectMeta { + name: Some(OBJECTSTORE_NAME.to_string()), + namespace: Some(NAMESPACE.to_string()), + ..ObjectMeta::default() + }, + spec: CephObjectStoreSpec { + gateway: GatewaySpec { + instances: 2, + ..GatewaySpec::default() + }, + ..CephObjectStoreSpec::default() + }, + }; + + let s3_user = CephObjectStoreUser::for_store( + S3_USER_NAME, + NAMESPACE, + OBJECTSTORE_NAME, + "Harmony default S3 user", + ); + + let mut cluster = RookCephClusterScore::default_okd(NAMESPACE); + cluster.object_stores = vec![object_store]; + cluster.object_store_users = vec![s3_user]; + + // Pico-OKD tuning: relax OSD backfill/recovery on a small cluster so + // recovery storms don't starve client I/O. These are equivalent to running + // `ceph config set osd ` against the live cluster, but + // declarative — Rook reconciles them after the MONs reach quorum and + // re-applies on drift. Values are strings (Ceph parses them). + // + // Adjust or remove per your hardware. Not opinionated defaults — only + // wired here because the user's first target is a pico OKD. + cluster + .cluster + .spec + .set_config("osd.*", "osd_max_backfills", "1") + .set_config("osd.*", "osd_recovery_max_active", "1") + .set_config("osd.*", "osd_recovery_op_priority", "1") + .set_config("osd.*", "osd_mclock_profile", "high_client_ops"); + // Alternative — disable mclock entirely if you suspect it's the problem: + // cluster.cluster.spec.set_config("osd.*", "osd_op_queue", "wpq"); + + // Edge-TLS Ingress in front of the RGW Service. Rook creates the Service + // as `rook-ceph-rgw-` on the gateway port (8080 here). + // Uncomment this block (and its imports + consts above) to enable, then + // add the Box::new(K8sResourceScore::single(...)) entry to the `scores` + // vec below. + // + // let s3_ingress = Ingress { + // metadata: ObjectMeta { + // name: Some(format!("{OBJECTSTORE_NAME}-s3")), + // namespace: Some(NAMESPACE.to_string()), + // ..ObjectMeta::default() + // }, + // spec: Some(IngressSpec { + // rules: Some(vec![IngressRule { + // host: Some(S3_HOSTNAME.to_string()), + // http: Some(HTTPIngressRuleValue { + // paths: vec![HTTPIngressPath { + // path: Some("/".to_string()), + // path_type: "Prefix".to_string(), + // backend: IngressBackend { + // service: Some(IngressServiceBackend { + // name: format!("rook-ceph-rgw-{OBJECTSTORE_NAME}"), + // port: Some(ServiceBackendPort { + // number: Some(8080), + // ..ServiceBackendPort::default() + // }), + // }), + // ..IngressBackend::default() + // }, + // }], + // }), + // }]), + // tls: Some(vec![IngressTLS { + // hosts: Some(vec![S3_HOSTNAME.to_string()]), + // secret_name: Some(S3_TLS_SECRET.to_string()), + // }]), + // ..IngressSpec::default() + // }), + // ..Ingress::default() + // }; + + let scores: Vec>> = vec![ + Box::new(RookCephOperatorScore::default_okd()), + Box::new(cluster), + Box::new(CephVerifyClusterHealth { + rook_ceph_namespace: NAMESPACE.to_string(), + }), + // Box::new(K8sResourceScore::single(s3_ingress, Some(NAMESPACE.to_string()))), + ]; + + harmony_cli::run(inventory, topology, scores, None) + .await + .unwrap(); +} diff --git a/harmony/src/modules/k8s/resource.rs b/harmony/src/modules/k8s/resource.rs index bff8183f..1cbc1582 100644 --- a/harmony/src/modules/k8s/resource.rs +++ b/harmony/src/modules/k8s/resource.rs @@ -1,6 +1,7 @@ use async_trait::async_trait; use k8s_openapi::ResourceScope; use kube::Resource; +use kube::api::DynamicObject; use log::info; use serde::{Serialize, de::DeserializeOwned}; @@ -17,6 +18,12 @@ use harmony_types::id::Id; pub struct K8sResourceScore { pub resource: Vec, pub namespace: Option, + /// When `true`, server-side apply is performed with `force=true` + /// (equivalent to `kubectl apply --force-conflicts`). Required when + /// applying a resource whose fields an operator has taken ownership of + /// (typical for CRs managed by reconciling operators like Rook). For + /// generic Kubernetes resources owned only by Harmony, leave `false`. + pub force_conflicts: bool, } impl K8sResourceScore { @@ -24,8 +31,16 @@ impl K8sResourceScore { Self { resource: vec![resource], namespace, + force_conflicts: false, } } + + /// Consume `self` and return it with `force_conflicts` set. Chainable + /// after `single`: `K8sResourceScore::single(r, ns).with_force_conflicts(true)`. + pub fn with_force_conflicts(mut self, force: bool) -> Self { + self.force_conflicts = force; + self + } } impl< @@ -102,16 +117,47 @@ where .collect(); info!( - "Applying {} resources : {}", + "Applying {} resources : {} (force_conflicts={})", resource_names.len(), - resource_names.join(", ") + resource_names.join(", "), + self.score.force_conflicts ); - topology + let k8s = topology .k8s_client() .await - .map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))? - .apply_many(&self.score.resource, self.score.namespace.as_deref()) - .await?; + .map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?; + + if self.score.force_conflicts { + // apply_dynamic_many exposes the force_conflicts flag on + // PatchParams that the typed apply path does not. Round-trip each + // typed resource through JSON to land on a DynamicObject without + // duplicating the harmony-k8s private `to_dynamic` helper. + let dyn_objects: Vec = self + .score + .resource + .iter() + .map(|r| { + serde_json::to_value(r) + .map_err(|e| { + InterpretError::new(format!( + "Failed to serialize resource for force-apply: {e}" + )) + }) + .and_then(|v| { + serde_json::from_value::(v).map_err(|e| { + InterpretError::new(format!( + "Failed to convert resource to DynamicObject: {e}" + )) + }) + }) + }) + .collect::, _>>()?; + k8s.apply_dynamic_many(&dyn_objects, self.score.namespace.as_deref(), true) + .await?; + } else { + k8s.apply_many(&self.score.resource, self.score.namespace.as_deref()) + .await?; + } Ok(Outcome::success( "Successfully applied resource".to_string(), diff --git a/harmony/src/modules/storage/ceph/cluster_score.rs b/harmony/src/modules/storage/ceph/cluster_score.rs new file mode 100644 index 00000000..460d70dc --- /dev/null +++ b/harmony/src/modules/storage/ceph/cluster_score.rs @@ -0,0 +1,517 @@ +use std::collections::BTreeMap; +use std::sync::Arc; +use std::time::{Duration, Instant}; + +use async_trait::async_trait; +use harmony_k8s::K8sClient; +use k8s_openapi::api::storage::v1::StorageClass; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; +use log::{debug, info, warn}; +use serde::Serialize; + +use crate::data::Version; +use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome}; +use crate::inventory::Inventory; +use crate::modules::k8s::resource::K8sResourceScore; +use crate::score::Score; +use crate::topology::{K8sclient, Topology}; +use harmony_types::id::Id; + +/// Total time we'll wait for the `rook-ceph-operator` Deployment to be ready +/// after `helm install` returns. Image pulls dominate the first run. +const OPERATOR_READY_TIMEOUT: Duration = Duration::from_secs(300); + +/// Total time we'll wait for the `cephclusters.ceph.rook.io` CRD to be +/// registered in the API server's discovery surface. Helm 3 applies CRDs +/// before other resources but the cache can lag. +const CRD_READY_TIMEOUT: Duration = Duration::from_secs(60); + +/// Total time we'll wait for the toolbox Deployment to come up before bailing. +const TOOLBOX_READY_TIMEOUT: Duration = Duration::from_secs(600); + +/// Total time we'll wait for the Ceph cluster to reach `HEALTH_OK` after the +/// toolbox is up. Mons + mgrs + OSDs + initial PG peering on a small cluster +/// typically lands in 5–15 min; 20 covers slower hardware. +const HEALTH_OK_TIMEOUT: Duration = Duration::from_secs(1200); + +/// Poll cadence for the toolbox + health waits. Cluster transitions on the +/// order of seconds, not subsecond — no benefit to a tighter loop. +const POLL_INTERVAL: Duration = Duration::from_secs(10); + +use super::crd::{ + CephBlockPool, CephBlockPoolSpec, CephCluster, CephFilesystem, CephObjectStore, + CephObjectStoreUser, FailureDomain, ReplicatedSpec, +}; +use super::toolbox::toolbox_deployment; + +/// Deploys a typed Rook-Ceph cluster: `CephCluster` + pools + filesystems + +/// object stores + their consumer `StorageClass`es. Assumes the Rook operator +/// is already installed (use [`super::RookCephOperatorScore`]). +/// +/// Each Custom Resource is a real Rust type (`kube::CustomResource`-derived), +/// applied via `K8sResourceScore::single` — the same apply path used by CNPG. +/// +/// # Ordering +/// Steps applied in this order: +/// 1. **Wait** for the `rook-ceph-operator` Deployment to be Ready (Helm +/// install returns before pods are up). +/// 2. **Wait** for `cephclusters.ceph.rook.io` CRD to be discoverable and +/// invalidate the kube-rs discovery cache. +/// 3. Apply `CephCluster`. +/// 4. Apply the `rook-ceph-tools` Deployment (typed, ported from the upstream +/// `deploy/examples/toolbox.yaml`). Image is read from the CephCluster +/// spec so it always matches the daemons. +/// 5. **Wait** for the `rook-ceph-tools` Deployment to be Ready. +/// 6. **Wait** for `ceph health` to return `HEALTH_OK` — mons in quorum, mgrs +/// up, OSDs bootstrapped, initial PGs peered. Typically 5–15 min on a +/// small cluster; capped at 20 min. +/// 7. Apply `CephBlockPool` resources + their RBD `StorageClass`es. +/// 8. Apply `CephFilesystem` resources + their CephFS `StorageClass`es. +/// 9. Apply `CephObjectStore` resources. +/// 10. Apply `CephObjectStoreUser` resources (Rook materializes their S3 +/// credentials into a `rook-ceph-object-user--` Secret per +/// user). +/// +/// The waits in steps 1, 2, 5, and 6 mean this Score takes minutes, not +/// seconds, to return — but downstream Scores like `CephVerifyClusterHealth` +/// can rely on the cluster actually being ready when this returns. +#[derive(Debug, Clone, Serialize)] +pub struct RookCephClusterScore { + pub namespace: String, + pub cluster: CephCluster, + pub block_pools: Vec, + pub filesystems: Vec, + pub object_stores: Vec, + pub object_store_users: Vec, + /// Mark the first block-pool StorageClass as the cluster default. + pub default_block_pool_storage_class: bool, +} + +impl RookCephClusterScore { + /// OKD-friendly defaults: 3-mon, 2-mgr, dashboard on 8443/ssl, replicated + /// pools size=3, useAllNodes + useAllDevices, dataDirHostPath=/var/lib/rook, + /// one CephBlockPool named "replicapool", no filesystem or object store. + pub fn default_okd(namespace: impl Into) -> Self { + let ns = namespace.into(); + + let mut cluster = CephCluster::default(); + cluster.metadata.name = Some(ns.clone()); + cluster.metadata.namespace = Some(ns.clone()); + cluster.spec.storage.use_all_nodes = true; + cluster.spec.storage.use_all_devices = true; + + let block_pool = CephBlockPool { + metadata: ObjectMeta { + name: Some("replicapool".to_string()), + namespace: Some(ns.clone()), + ..ObjectMeta::default() + }, + spec: CephBlockPoolSpec { + replicated: Some(ReplicatedSpec::default()), + failure_domain: Some(FailureDomain::Host), + ..CephBlockPoolSpec::default() + }, + }; + + Self { + namespace: ns, + cluster, + block_pools: vec![block_pool], + filesystems: vec![], + object_stores: vec![], + object_store_users: vec![], + default_block_pool_storage_class: true, + } + } +} + +impl Score for RookCephClusterScore { + fn create_interpret(&self) -> Box> { + Box::new(RookCephClusterInterpret { + score: self.clone(), + }) + } + + fn name(&self) -> String { + format!("RookCephClusterScore({})", self.namespace) + } +} + +#[derive(Debug)] +struct RookCephClusterInterpret { + score: RookCephClusterScore, +} + +#[async_trait] +impl Interpret for RookCephClusterInterpret { + async fn execute( + &self, + inventory: &Inventory, + topology: &T, + ) -> Result { + let ns = self.score.namespace.clone(); + + let k8s = topology + .k8s_client() + .await + .map_err(|e| InterpretError::new(format!("Failed to get k8s client: {e}")))?; + + // `helm install` (in RookCephOperatorScore) returns once resources are + // created — not once the operator Deployment is Ready or the CRDs are + // registered in the API server's discovery cache. Without these waits, + // the CephCluster apply below races and typically fails with + // "Cannot resolve GVK ceph.rook.io/v1/CephCluster". + info!( + "[Rook-Ceph] Waiting for rook-ceph-operator deployment in '{}'", + ns + ); + k8s.wait_until_deployment_ready( + "rook-ceph-operator", + Some(&ns), + Some(OPERATOR_READY_TIMEOUT), + ) + .await + .map_err(|e| InterpretError::new(format!("rook-ceph-operator not ready: {e}")))?; + + info!("[Rook-Ceph] Waiting for cephclusters.ceph.rook.io CRD registration"); + k8s.wait_for_crd("cephclusters.ceph.rook.io", Some(CRD_READY_TIMEOUT)) + .await + .map_err(|e| InterpretError::new(format!("CephCluster CRD not registered: {e}")))?; + // The discovery cache on our shared K8sClient was populated before the + // operator install added the Rook CRDs. Force a refresh so the next + // apply can resolve the new GVK. + k8s.invalidate_discovery().await; + + info!("[Rook-Ceph] Applying CephCluster '{}'", ns); + K8sResourceScore::single(self.score.cluster.clone(), Some(ns.clone())) + .with_force_conflicts(true) + .interpret(inventory, topology) + .await?; + + // Rook v1.19 no longer deploys the toolbox via the operator Helm chart; + // we ship it as a typed Deployment ported from the upstream + // `deploy/examples/toolbox.yaml`. Image follows whatever CephCluster + // is configured with — keeps the toolbox in lockstep with the daemons. + let toolbox_image = &self.score.cluster.spec.ceph_version.image; + info!( + "[Rook-Ceph] Applying rook-ceph-tools Deployment (image='{}')", + toolbox_image + ); + K8sResourceScore::single( + toolbox_deployment(&ns, toolbox_image), + Some(ns.clone()), + ) + .interpret(inventory, topology) + .await?; + + wait_for_toolbox_ready(&k8s, &ns).await?; + wait_for_health_ok(&k8s, &ns).await?; + + for (idx, pool) in self.score.block_pools.iter().enumerate() { + let pool_name = pool + .metadata + .name + .clone() + .unwrap_or_else(|| format!("pool-{idx}")); + info!("[Rook-Ceph] Applying CephBlockPool '{}'", pool_name); + K8sResourceScore::single(pool.clone(), Some(ns.clone())) + .with_force_conflicts(true) + .interpret(inventory, topology) + .await?; + + let sc = rbd_storage_class( + &pool_name, + &ns, + idx == 0 && self.score.default_block_pool_storage_class, + ); + info!( + "[Rook-Ceph] Applying StorageClass '{}' (RBD on pool '{}')", + sc.metadata.name.as_deref().unwrap_or("?"), + pool_name + ); + K8sResourceScore::single(sc, None) + .interpret(inventory, topology) + .await?; + } + + for fs in self.score.filesystems.iter() { + let fs_name = fs + .metadata + .name + .clone() + .unwrap_or_else(|| "cephfs".to_string()); + info!("[Rook-Ceph] Applying CephFilesystem '{}'", fs_name); + K8sResourceScore::single(fs.clone(), Some(ns.clone())) + .with_force_conflicts(true) + .interpret(inventory, topology) + .await?; + + let sc = cephfs_storage_class(&fs_name, &ns); + info!( + "[Rook-Ceph] Applying StorageClass '{}' (CephFS on filesystem '{}')", + sc.metadata.name.as_deref().unwrap_or("?"), + fs_name + ); + K8sResourceScore::single(sc, None) + .interpret(inventory, topology) + .await?; + } + + for store in self.score.object_stores.iter() { + let store_name = store + .metadata + .name + .clone() + .unwrap_or_else(|| "object-store".to_string()); + info!("[Rook-Ceph] Applying CephObjectStore '{}'", store_name); + K8sResourceScore::single(store.clone(), Some(ns.clone())) + .with_force_conflicts(true) + .interpret(inventory, topology) + .await?; + } + + for user in self.score.object_store_users.iter() { + let user_name = user + .metadata + .name + .clone() + .unwrap_or_else(|| "object-store-user".to_string()); + info!( + "[Rook-Ceph] Applying CephObjectStoreUser '{}' (store='{}'); \ + credentials will appear in Secret '{}'", + user_name, + user.spec.store, + user.credentials_secret_name(), + ); + K8sResourceScore::single(user.clone(), Some(ns.clone())) + .with_force_conflicts(true) + .interpret(inventory, topology) + .await?; + } + + Ok(Outcome::success(format!( + "Applied Rook-Ceph cluster '{}' ({} block-pool(s), {} filesystem(s), {} object-store(s), {} object-store-user(s))", + ns, + self.score.block_pools.len(), + self.score.filesystems.len(), + self.score.object_stores.len(), + self.score.object_store_users.len(), + ))) + } + + fn get_name(&self) -> InterpretName { + InterpretName::Custom("RookCephClusterInterpret") + } + + fn get_version(&self) -> Version { + todo!() + } + + fn get_status(&self) -> InterpretStatus { + todo!() + } + + fn get_children(&self) -> Vec { + todo!() + } +} + +/// Poll the `rook-ceph-tools` Deployment until it has ≥1 ready replica. This +/// is a prerequisite for any `ceph` CLI call against the cluster — including +/// the HEALTH_OK wait below, and the existing `CephVerifyClusterHealth` Score. +/// +/// `exec_app_capture_output` panics (`.expect("No matching pod")`) if called +/// when no toolbox pod exists yet — so we must gate exec on this first. +async fn wait_for_toolbox_ready(client: &Arc, ns: &str) -> Result<(), InterpretError> { + let toolbox = "rook-ceph-tools"; + info!( + "[Rook-Ceph] Waiting for '{}' deployment in '{}' (up to {}s)", + toolbox, + ns, + TOOLBOX_READY_TIMEOUT.as_secs() + ); + let start = Instant::now(); + loop { + match client.get_deployment(toolbox, Some(ns)).await { + Ok(Some(dep)) => { + let ready = dep + .status + .as_ref() + .and_then(|s| s.ready_replicas) + .unwrap_or(0); + if ready >= 1 { + info!("[Rook-Ceph] '{}' is ready ({} replica(s))", toolbox, ready); + return Ok(()); + } + debug!("[Rook-Ceph] '{}' present but 0 ready replicas", toolbox); + } + Ok(None) => debug!("[Rook-Ceph] '{}' deployment not yet created", toolbox), + Err(e) => debug!("[Rook-Ceph] error checking '{}': {e}", toolbox), + } + if start.elapsed() > TOOLBOX_READY_TIMEOUT { + return Err(InterpretError::new(format!( + "Timed out after {}s waiting for '{}' deployment to be ready in '{}'. \ + Is the operator running with toolbox.enabled=true?", + TOOLBOX_READY_TIMEOUT.as_secs(), + toolbox, + ns, + ))); + } + tokio::time::sleep(POLL_INTERVAL).await; + } +} + +/// Poll `ceph health` via the toolbox until it returns `HEALTH_OK`. Fresh +/// clusters typically sit in `HEALTH_WARN` for a few minutes while mons reach +/// quorum, mgrs come up, and OSDs bootstrap their PGs. Returning before +/// `HEALTH_OK` would race with the subsequent block-pool / object-store / +/// user applies and with `CephVerifyClusterHealth`. +async fn wait_for_health_ok(client: &Arc, ns: &str) -> Result<(), InterpretError> { + info!( + "[Rook-Ceph] Waiting for cluster to reach HEALTH_OK in '{}' (up to {}s)", + ns, + HEALTH_OK_TIMEOUT.as_secs() + ); + let start = Instant::now(); + let mut last_status = String::new(); + loop { + match client + .exec_app_capture_output( + "rook-ceph-tools".to_string(), + "app".to_string(), + Some(ns), + vec!["sh", "-c", "ceph health"], + ) + .await + { + Ok(out) => { + let trimmed = out.trim().to_string(); + if trimmed.starts_with("HEALTH_OK") { + info!("[Rook-Ceph] Cluster reached HEALTH_OK"); + return Ok(()); + } + if trimmed != last_status { + info!("[Rook-Ceph] ceph health: {}", trimmed); + last_status = trimmed; + } + } + Err(e) => debug!("[Rook-Ceph] ceph health exec failed: {e}"), + } + if start.elapsed() > HEALTH_OK_TIMEOUT { + warn!( + "[Rook-Ceph] Last ceph health output before timeout: {}", + last_status + ); + return Err(InterpretError::new(format!( + "Timed out after {}s waiting for HEALTH_OK in '{}'. Last status: '{}'", + HEALTH_OK_TIMEOUT.as_secs(), + ns, + last_status, + ))); + } + tokio::time::sleep(POLL_INTERVAL).await; + } +} + +fn rbd_storage_class(pool_name: &str, ns: &str, is_default: bool) -> StorageClass { + let mut params = BTreeMap::new(); + params.insert("clusterID".to_string(), ns.to_string()); + params.insert("pool".to_string(), pool_name.to_string()); + params.insert("imageFormat".to_string(), "2".to_string()); + params.insert("imageFeatures".to_string(), "layering".to_string()); + params.insert( + "csi.storage.k8s.io/provisioner-secret-name".to_string(), + "rook-csi-rbd-provisioner".to_string(), + ); + params.insert( + "csi.storage.k8s.io/provisioner-secret-namespace".to_string(), + ns.to_string(), + ); + params.insert( + "csi.storage.k8s.io/controller-expand-secret-name".to_string(), + "rook-csi-rbd-provisioner".to_string(), + ); + params.insert( + "csi.storage.k8s.io/controller-expand-secret-namespace".to_string(), + ns.to_string(), + ); + params.insert( + "csi.storage.k8s.io/node-stage-secret-name".to_string(), + "rook-csi-rbd-node".to_string(), + ); + params.insert( + "csi.storage.k8s.io/node-stage-secret-namespace".to_string(), + ns.to_string(), + ); + params.insert("csi.storage.k8s.io/fstype".to_string(), "ext4".to_string()); + + let mut annotations = BTreeMap::new(); + if is_default { + annotations.insert( + "storageclass.kubernetes.io/is-default-class".to_string(), + "true".to_string(), + ); + } + + StorageClass { + metadata: ObjectMeta { + name: Some(format!("rook-ceph-block-{pool_name}")), + annotations: if annotations.is_empty() { + None + } else { + Some(annotations) + }, + ..ObjectMeta::default() + }, + provisioner: format!("{ns}.rbd.csi.ceph.com"), + parameters: Some(params), + reclaim_policy: Some("Delete".to_string()), + allow_volume_expansion: Some(true), + volume_binding_mode: Some("Immediate".to_string()), + ..StorageClass::default() + } +} + +fn cephfs_storage_class(fs_name: &str, ns: &str) -> StorageClass { + let mut params = BTreeMap::new(); + params.insert("clusterID".to_string(), ns.to_string()); + params.insert("fsName".to_string(), fs_name.to_string()); + params.insert("pool".to_string(), format!("{fs_name}-data0")); + params.insert( + "csi.storage.k8s.io/provisioner-secret-name".to_string(), + "rook-csi-cephfs-provisioner".to_string(), + ); + params.insert( + "csi.storage.k8s.io/provisioner-secret-namespace".to_string(), + ns.to_string(), + ); + params.insert( + "csi.storage.k8s.io/controller-expand-secret-name".to_string(), + "rook-csi-cephfs-provisioner".to_string(), + ); + params.insert( + "csi.storage.k8s.io/controller-expand-secret-namespace".to_string(), + ns.to_string(), + ); + params.insert( + "csi.storage.k8s.io/node-stage-secret-name".to_string(), + "rook-csi-cephfs-node".to_string(), + ); + params.insert( + "csi.storage.k8s.io/node-stage-secret-namespace".to_string(), + ns.to_string(), + ); + + StorageClass { + metadata: ObjectMeta { + name: Some(format!("rook-cephfs-{fs_name}")), + ..ObjectMeta::default() + }, + provisioner: format!("{ns}.cephfs.csi.ceph.com"), + parameters: Some(params), + reclaim_policy: Some("Delete".to_string()), + allow_volume_expansion: Some(true), + volume_binding_mode: Some("Immediate".to_string()), + ..StorageClass::default() + } +} diff --git a/harmony/src/modules/storage/ceph/crd/block_pool.rs b/harmony/src/modules/storage/ceph/crd/block_pool.rs new file mode 100644 index 00000000..397436a0 --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/block_pool.rs @@ -0,0 +1,53 @@ +use std::collections::BTreeMap; + +use kube::{CustomResource, api::ObjectMeta}; +use serde::{Deserialize, Serialize}; + +use super::shared::{ErasureCodedSpec, FailureDomain, ReplicatedSpec}; + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)] +#[kube( + group = "ceph.rook.io", + version = "v1", + kind = "CephBlockPool", + plural = "cephblockpools", + namespaced = true, + schema = "disabled" +)] +#[serde(rename_all = "camelCase")] +pub struct CephBlockPoolSpec { + #[serde(skip_serializing_if = "Option::is_none")] + pub replicated: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub erasure_coded: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub failure_domain: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub device_class: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub parameters: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub enable_rbd_stats: Option, +} + +impl Default for CephBlockPool { + fn default() -> Self { + Self { + metadata: ObjectMeta::default(), + spec: CephBlockPoolSpec::default(), + } + } +} + +impl Default for CephBlockPoolSpec { + fn default() -> Self { + Self { + replicated: Some(ReplicatedSpec::default()), + erasure_coded: None, + failure_domain: Some(FailureDomain::Host), + device_class: None, + parameters: None, + enable_rbd_stats: None, + } + } +} diff --git a/harmony/src/modules/storage/ceph/crd/cluster.rs b/harmony/src/modules/storage/ceph/crd/cluster.rs new file mode 100644 index 00000000..9bab64be --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/cluster.rs @@ -0,0 +1,295 @@ +use std::collections::BTreeMap; + +use kube::{CustomResource, api::ObjectMeta}; +use serde::{Deserialize, Serialize}; + +use super::shared::{PlacementSpec, VolumeClaimTemplate}; + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)] +#[kube( + group = "ceph.rook.io", + version = "v1", + kind = "CephCluster", + plural = "cephclusters", + namespaced = true, + schema = "disabled" +)] +#[serde(rename_all = "camelCase")] +pub struct CephClusterSpec { + pub ceph_version: CephVersionSpec, + pub data_dir_host_path: String, + pub mon: MonSpec, + pub mgr: MgrSpec, + pub dashboard: DashboardSpec, + pub storage: StorageSpec, + #[serde(skip_serializing_if = "Option::is_none")] + pub network: Option, + #[serde(skip_serializing_if = "BTreeMap::is_empty", default)] + pub placement: BTreeMap, + #[serde(skip_serializing_if = "Option::is_none")] + pub crash_collector: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub log_collector: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub remove_osds_if_out_and_safe_to_remove: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub continue_upgrade_after_checks_even_if_not_healthy: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub skip_upgrade_checks: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub wait_timeout_for_healthy_osd_in_minutes: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub disruption_management: Option, + /// Centralized Ceph config applied by the operator after MONs reach quorum. + /// Outer key is the "WHO" target (e.g. `"global"`, `"osd.*"`, `"mgr.*"`, + /// `"client.rgw."`, or a specific daemon like `"osd.0"`). Inner map + /// is `option-name -> string-value` — Rook calls `ceph config set + /// ` for each entry. Values must be strings even for numerics + /// and booleans; Ceph parses them. Rook does not validate the keys. + /// + /// Prefer [`CephClusterSpec::set_config`] for ergonomic insertion. + #[serde(skip_serializing_if = "Option::is_none")] + pub ceph_config: Option>>, +} + +impl CephClusterSpec { + /// Set a single centralized Ceph config entry (`ceph config set + /// `). Chainable. Creates the `ceph_config` map and the + /// per-WHO sub-map on demand. + /// + /// Common `who` values: `"global"`, `"osd.*"`, `"mon.*"`, `"mgr.*"`, + /// `"client.rgw."`, or a specific daemon like `"osd.0"`. + pub fn set_config( + &mut self, + who: impl Into, + key: impl Into, + value: impl Into, + ) -> &mut Self { + self.ceph_config + .get_or_insert_with(BTreeMap::new) + .entry(who.into()) + .or_default() + .insert(key.into(), value.into()); + self + } +} + +impl Default for CephCluster { + fn default() -> Self { + Self { + metadata: ObjectMeta::default(), + spec: CephClusterSpec::default(), + } + } +} + +impl Default for CephClusterSpec { + fn default() -> Self { + Self { + ceph_version: CephVersionSpec::default(), + data_dir_host_path: "/var/lib/rook".to_string(), + mon: MonSpec::default(), + mgr: MgrSpec::default(), + dashboard: DashboardSpec::default(), + storage: StorageSpec::default(), + network: None, + placement: BTreeMap::new(), + crash_collector: None, + log_collector: None, + remove_osds_if_out_and_safe_to_remove: None, + continue_upgrade_after_checks_even_if_not_healthy: None, + skip_upgrade_checks: None, + wait_timeout_for_healthy_osd_in_minutes: None, + disruption_management: None, + ceph_config: None, + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct CephVersionSpec { + pub image: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub allow_unsupported: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub image_pull_policy: Option, +} + +impl Default for CephVersionSpec { + fn default() -> Self { + // Pinned to the full version+build tag explicitly recommended for + // production by the Rook 1.19 docs (Upgrade/ceph-upgrade). The + // date-stamped suffix locks the exact container image so the cluster + // can't go heterogeneous across daemon restarts. + Self { + image: "quay.io/ceph/ceph:v19.2.3-20250717".to_string(), + allow_unsupported: Some(false), + image_pull_policy: Some("IfNotPresent".to_string()), + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MonSpec { + pub count: u32, + pub allow_multiple_per_node: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub volume_claim_template: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub failure_domain_label: Option, +} + +impl Default for MonSpec { + fn default() -> Self { + Self { + count: 3, + allow_multiple_per_node: false, + volume_claim_template: None, + failure_domain_label: None, + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MgrSpec { + pub count: u32, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub modules: Vec, +} + +impl Default for MgrSpec { + fn default() -> Self { + Self { + count: 2, + modules: vec![ModuleSpec { + name: "pg_autoscaler".to_string(), + enabled: true, + }], + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct ModuleSpec { + pub name: String, + pub enabled: bool, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct DashboardSpec { + pub enabled: bool, + pub ssl: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub port: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub url_prefix: Option, +} + +impl Default for DashboardSpec { + fn default() -> Self { + Self { + enabled: true, + ssl: true, + port: Some(8443), + url_prefix: None, + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct StorageSpec { + pub use_all_nodes: bool, + pub use_all_devices: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub device_filter: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub device_path_filter: Option, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub nodes: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub config: Option>, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub storage_class_device_sets: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub allow_device_class_update: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub allow_osd_crush_weight_update: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct NodeSpec { + pub name: String, + #[serde(skip_serializing_if = "Vec::is_empty", default)] + pub devices: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub config: Option>, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct DeviceSpec { + pub name: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub config: Option>, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct StorageClassDeviceSet { + pub name: String, + pub count: u32, + pub portable: bool, + pub volume_claim_templates: Vec, + #[serde(skip_serializing_if = "Option::is_none")] + pub placement: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct NetworkSpec { + #[serde(skip_serializing_if = "Option::is_none")] + pub provider: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub host_network: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub dual_stack: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ipv4: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ipv6: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct CrashCollectorSpec { + pub disable: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub days_to_retain: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct LogCollectorSpec { + pub enabled: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub periodicity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_log_size: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct DisruptionManagementSpec { + pub manage_pod_budgets: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub osd_maintenance_timeout: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pg_health_check_timeout: Option, +} diff --git a/harmony/src/modules/storage/ceph/crd/filesystem.rs b/harmony/src/modules/storage/ceph/crd/filesystem.rs new file mode 100644 index 00000000..5f87a7f7 --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/filesystem.rs @@ -0,0 +1,54 @@ +use kube::{CustomResource, api::ObjectMeta}; +use serde::{Deserialize, Serialize}; + +use super::shared::{MetadataServerSpec, NamedPoolSpec, PoolSpec, ReplicatedSpec}; + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)] +#[kube( + group = "ceph.rook.io", + version = "v1", + kind = "CephFilesystem", + plural = "cephfilesystems", + namespaced = true, + schema = "disabled" +)] +#[serde(rename_all = "camelCase")] +pub struct CephFilesystemSpec { + pub metadata_pool: PoolSpec, + pub data_pools: Vec, + pub metadata_server: MetadataServerSpec, + #[serde(skip_serializing_if = "Option::is_none")] + pub preserve_filesystem_on_delete: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub preserve_pools_on_delete: Option, +} + +impl Default for CephFilesystem { + fn default() -> Self { + Self { + metadata: ObjectMeta::default(), + spec: CephFilesystemSpec::default(), + } + } +} + +impl Default for CephFilesystemSpec { + fn default() -> Self { + Self { + metadata_pool: PoolSpec { + replicated: Some(ReplicatedSpec::default()), + ..PoolSpec::default() + }, + data_pools: vec![NamedPoolSpec { + name: "data0".to_string(), + spec: PoolSpec { + replicated: Some(ReplicatedSpec::default()), + ..PoolSpec::default() + }, + }], + metadata_server: MetadataServerSpec::default(), + preserve_filesystem_on_delete: Some(true), + preserve_pools_on_delete: Some(false), + } + } +} diff --git a/harmony/src/modules/storage/ceph/crd/mod.rs b/harmony/src/modules/storage/ceph/crd/mod.rs new file mode 100644 index 00000000..5a0b96aa --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/mod.rs @@ -0,0 +1,13 @@ +pub mod block_pool; +pub mod cluster; +pub mod filesystem; +pub mod object_store; +pub mod object_store_user; +pub mod shared; + +pub use block_pool::*; +pub use cluster::*; +pub use filesystem::*; +pub use object_store::*; +pub use object_store_user::*; +pub use shared::*; diff --git a/harmony/src/modules/storage/ceph/crd/object_store.rs b/harmony/src/modules/storage/ceph/crd/object_store.rs new file mode 100644 index 00000000..fc805099 --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/object_store.rs @@ -0,0 +1,74 @@ +use kube::{CustomResource, api::ObjectMeta}; +use serde::{Deserialize, Serialize}; + +use super::shared::{PlacementSpec, PoolSpec, ReplicatedSpec}; + +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)] +#[kube( + group = "ceph.rook.io", + version = "v1", + kind = "CephObjectStore", + plural = "cephobjectstores", + namespaced = true, + schema = "disabled" +)] +#[serde(rename_all = "camelCase")] +pub struct CephObjectStoreSpec { + pub metadata_pool: PoolSpec, + pub data_pool: PoolSpec, + pub gateway: GatewaySpec, + #[serde(skip_serializing_if = "Option::is_none")] + pub preserve_pools_on_delete: Option, +} + +impl Default for CephObjectStore { + fn default() -> Self { + Self { + metadata: ObjectMeta::default(), + spec: CephObjectStoreSpec::default(), + } + } +} + +impl Default for CephObjectStoreSpec { + fn default() -> Self { + Self { + metadata_pool: PoolSpec { + replicated: Some(ReplicatedSpec::default()), + ..PoolSpec::default() + }, + data_pool: PoolSpec { + replicated: Some(ReplicatedSpec::default()), + ..PoolSpec::default() + }, + gateway: GatewaySpec::default(), + preserve_pools_on_delete: Some(false), + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct GatewaySpec { + /// Non-secure gateway port. Defaults to 8080 to dodge OKD's <1024 bind restriction. + pub port: u16, + #[serde(skip_serializing_if = "Option::is_none")] + pub secure_port: Option, + pub instances: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub placement: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ssl_certificate_ref: Option, +} + +impl Default for GatewaySpec { + fn default() -> Self { + Self { + port: 8080, + secure_port: None, + instances: 1, + placement: None, + ssl_certificate_ref: None, + } + } +} diff --git a/harmony/src/modules/storage/ceph/crd/object_store_user.rs b/harmony/src/modules/storage/ceph/crd/object_store_user.rs new file mode 100644 index 00000000..b599f116 --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/object_store_user.rs @@ -0,0 +1,128 @@ +use kube::{CustomResource, api::ObjectMeta}; +use serde::{Deserialize, Serialize}; + +/// `CephObjectStoreUser` — provisions an RGW S3 user against a `CephObjectStore`. +/// +/// Rook auto-creates a Kubernetes Secret named +/// `rook-ceph-object-user--` containing the user's access +/// credentials, with these base64-encoded keys: +/// - `AccessKey` — S3 access key ID (20 bytes) +/// - `SecretKey` — S3 secret access key (40 bytes) +/// +/// The Secret lives in the same namespace as the user. +#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)] +#[kube( + group = "ceph.rook.io", + version = "v1", + kind = "CephObjectStoreUser", + plural = "cephobjectstoreusers", + namespaced = true, + schema = "disabled" +)] +#[serde(rename_all = "camelCase")] +pub struct CephObjectStoreUserSpec { + /// Name of the `CephObjectStore` this user belongs to. + pub store: String, + #[serde(skip_serializing_if = "Option::is_none")] + pub display_name: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub cluster_namespace: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub quotas: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub capabilities: Option, +} + +impl Default for CephObjectStoreUser { + fn default() -> Self { + Self { + metadata: ObjectMeta::default(), + spec: CephObjectStoreUserSpec::default(), + } + } +} + +impl Default for CephObjectStoreUserSpec { + fn default() -> Self { + Self { + store: String::new(), + display_name: None, + cluster_namespace: None, + quotas: None, + capabilities: None, + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct UserQuotas { + #[serde(skip_serializing_if = "Option::is_none")] + pub max_buckets: Option, + /// Resource-quantity string, e.g. `"10G"`, `"500M"`. + #[serde(skip_serializing_if = "Option::is_none")] + pub max_size: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub max_objects: Option, +} + +/// RGW capability strings — each accepts `"read"`, `"write"`, `"read, write"`, +/// or `"*"`. +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct UserCapabilities { + #[serde(skip_serializing_if = "Option::is_none")] + pub user: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub bucket: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub usage: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub metadata: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub zone: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub roles: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub info: Option, + #[serde(rename = "amz-cache", skip_serializing_if = "Option::is_none")] + pub amz_cache: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub bilog: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub mdlog: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub datalog: Option, + #[serde(rename = "user-policy", skip_serializing_if = "Option::is_none")] + pub user_policy: Option, + #[serde(rename = "odic-provider", skip_serializing_if = "Option::is_none")] + pub odic_provider: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub ratelimit: Option, +} + +impl CephObjectStoreUser { + /// Convenience constructor: a user attached to `store` with the given + /// display name. Capabilities and quotas left unset (RGW defaults apply). + pub fn for_store(name: &str, namespace: &str, store: &str, display_name: &str) -> Self { + Self { + metadata: ObjectMeta { + name: Some(name.to_string()), + namespace: Some(namespace.to_string()), + ..ObjectMeta::default() + }, + spec: CephObjectStoreUserSpec { + store: store.to_string(), + display_name: Some(display_name.to_string()), + ..CephObjectStoreUserSpec::default() + }, + } + } + + /// Name of the auto-generated Secret carrying this user's `AccessKey` / + /// `SecretKey` (base64-encoded). + pub fn credentials_secret_name(&self) -> String { + let user = self.metadata.name.as_deref().unwrap_or(""); + format!("rook-ceph-object-user-{}-{}", self.spec.store, user) + } +} diff --git a/harmony/src/modules/storage/ceph/crd/shared.rs b/harmony/src/modules/storage/ceph/crd/shared.rs new file mode 100644 index 00000000..dd6995c6 --- /dev/null +++ b/harmony/src/modules/storage/ceph/crd/shared.rs @@ -0,0 +1,117 @@ +use std::collections::BTreeMap; + +use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Toleration}; +use serde::{Deserialize, Serialize}; + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct PoolSpec { + #[serde(skip_serializing_if = "Option::is_none")] + pub replicated: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub erasure_coded: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub failure_domain: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub device_class: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub parameters: Option>, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct ReplicatedSpec { + pub size: u32, + #[serde(skip_serializing_if = "Option::is_none")] + pub require_safe_replica_size: Option, +} + +impl Default for ReplicatedSpec { + fn default() -> Self { + Self { + size: 3, + require_safe_replica_size: Some(true), + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct ErasureCodedSpec { + pub data_chunks: u32, + pub coding_chunks: u32, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "lowercase")] +pub enum FailureDomain { + Osd, + Host, + Rack, + Zone, + Region, +} + +impl Default for FailureDomain { + fn default() -> Self { + Self::Host + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct NamedPoolSpec { + pub name: String, + #[serde(flatten)] + pub spec: PoolSpec, +} + +#[derive(Deserialize, Serialize, Clone, Debug)] +#[serde(rename_all = "camelCase")] +pub struct MetadataServerSpec { + pub active_count: u32, + pub active_standby: bool, + #[serde(skip_serializing_if = "Option::is_none")] + pub placement: Option, +} + +impl Default for MetadataServerSpec { + fn default() -> Self { + Self { + active_count: 1, + active_standby: true, + placement: None, + } + } +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct PlacementSpec { + #[serde(skip_serializing_if = "Option::is_none")] + pub node_affinity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pod_affinity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub pod_anti_affinity: Option, + #[serde(skip_serializing_if = "Option::is_none")] + pub tolerations: Option>, + #[serde(skip_serializing_if = "Option::is_none")] + pub topology_spread_constraints: Option, +} + +#[derive(Deserialize, Serialize, Clone, Debug, Default)] +#[serde(rename_all = "camelCase")] +pub struct VolumeClaimTemplate { + pub metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta, + pub spec: k8s_openapi::api::core::v1::PersistentVolumeClaimSpec, +} + +impl From for VolumeClaimTemplate { + fn from(pvc: PersistentVolumeClaim) -> Self { + Self { + metadata: pvc.metadata, + spec: pvc.spec.unwrap_or_default(), + } + } +} diff --git a/harmony/src/modules/storage/ceph/mod.rs b/harmony/src/modules/storage/ceph/mod.rs index 0a3dcecf..cddb8428 100644 --- a/harmony/src/modules/storage/ceph/mod.rs +++ b/harmony/src/modules/storage/ceph/mod.rs @@ -1,2 +1,9 @@ pub mod ceph_remove_osd_score; pub mod ceph_validate_health_score; +pub mod cluster_score; +pub mod crd; +pub mod operator_score; +pub mod toolbox; + +pub use cluster_score::*; +pub use operator_score::*; diff --git a/harmony/src/modules/storage/ceph/operator_score.rs b/harmony/src/modules/storage/ceph/operator_score.rs new file mode 100644 index 00000000..fafc0e4c --- /dev/null +++ b/harmony/src/modules/storage/ceph/operator_score.rs @@ -0,0 +1,128 @@ +use std::collections::HashMap; +use std::str::FromStr; + +use non_blank_string_rs::NonBlankString; +use serde::Serialize; + +use crate::interpret::Interpret; +use crate::modules::helm::chart::{HelmChartScore, HelmRepository}; +use crate::score::Score; +use crate::topology::{HelmCommand, Topology}; +use harmony_macros::hurl; + +/// Install the Rook-Ceph operator via its upstream Helm chart. +/// +/// The Rook docs explicitly recommend Helm on OpenShift/OKD because the chart +/// automatically creates the `SecurityContextConstraints` resources OKD requires. +/// This Score wraps `HelmChartScore` against `https://charts.rook.io/release`. +/// +/// The chart installs: +/// - The `rook-ceph` operator Deployment +/// - All `ceph.rook.io/v1` CRDs (CephCluster, CephBlockPool, CephFilesystem, CephObjectStore, ...) +/// - RBAC (ServiceAccounts, Roles, ClusterRoles, RoleBindings, ClusterRoleBindings) +/// - OpenShift SCC bindings when `hostpath_requires_privileged` is true +/// +/// The chart does **not** install the `rook-ceph-tools` (toolbox) pod in +/// Rook v1.19 — that's now a standalone manifest in `deploy/examples/`. +/// `RookCephClusterScore` deploys it as a typed Rust Deployment via +/// [`super::toolbox::toolbox_deployment`]. +/// +/// The CRs that consume these CRDs are deployed separately by +/// `RookCephClusterScore` (typed Rust structs applied via `K8sResourceScore`), +/// preserving compile-time type-safety on the user-facing surface. +/// +/// # OKD requirements +/// - `hostpath_requires_privileged` must be `true` — OpenShift's SELinux +/// restricted-set blocks hostPath writes otherwise. +/// +/// # Usage +/// ```ignore +/// use harmony::modules::storage::ceph::RookCephOperatorScore; +/// let score = RookCephOperatorScore::default_okd(); +/// ``` +#[derive(Debug, Clone, Serialize)] +pub struct RookCephOperatorScore { + pub namespace: String, + pub chart_version: Option, + pub hostpath_requires_privileged: bool, + pub enable_rbd_driver: bool, + pub enable_cephfs_driver: bool, +} + +impl RookCephOperatorScore { + /// OKD-friendly defaults: `rook-ceph` namespace, both CSI drivers enabled, + /// hostPath privileged mode on. Pinned to Rook 1.19.5 — the latest stable + /// release at the time this Score was written, and the pair tested + /// against the Ceph image default in + /// [`super::crd::CephVersionSpec::default`]. + pub fn default_okd() -> Self { + Self { + namespace: "rook-ceph".to_string(), + chart_version: Some("v1.19.5".to_string()), + hostpath_requires_privileged: true, + enable_rbd_driver: true, + enable_cephfs_driver: true, + } + } + + /// K3s / vanilla-K8s defaults — same as OKD but without the privileged flag. + pub fn default_k8s() -> Self { + Self { + hostpath_requires_privileged: false, + ..Self::default_okd() + } + } +} + +impl Default for RookCephOperatorScore { + fn default() -> Self { + Self::default_okd() + } +} + +impl Score for RookCephOperatorScore { + fn create_interpret(&self) -> Box> { + let mut values: HashMap = HashMap::new(); + values.insert( + NonBlankString::from_str("csi.enableRbdDriver").unwrap(), + self.enable_rbd_driver.to_string(), + ); + values.insert( + NonBlankString::from_str("csi.enableCephfsDriver").unwrap(), + self.enable_cephfs_driver.to_string(), + ); + if self.hostpath_requires_privileged { + values.insert( + NonBlankString::from_str("hostpathRequiresPrivileged").unwrap(), + "true".to_string(), + ); + } + + let chart_version = self + .chart_version + .as_ref() + .map(|v| NonBlankString::from_str(v).expect("chart_version must be non-blank")); + + let helm_score = HelmChartScore { + namespace: Some(NonBlankString::from_str(&self.namespace).unwrap()), + release_name: NonBlankString::from_str("rook-ceph").unwrap(), + chart_name: NonBlankString::from_str("rook-release/rook-ceph").unwrap(), + chart_version, + values_overrides: Some(values), + values_yaml: None, + create_namespace: true, + install_only: true, + repository: Some(HelmRepository::new( + "rook-release".to_string(), + hurl!("https://charts.rook.io/release"), + true, + )), + }; + + helm_score.create_interpret() + } + + fn name(&self) -> String { + format!("RookCephOperatorScore({})", self.namespace) + } +} diff --git a/harmony/src/modules/storage/ceph/toolbox.rs b/harmony/src/modules/storage/ceph/toolbox.rs new file mode 100644 index 00000000..d58917e3 --- /dev/null +++ b/harmony/src/modules/storage/ceph/toolbox.rs @@ -0,0 +1,255 @@ +//! Typed `rook-ceph-tools` Deployment. +//! +//! In Rook v1.19 the toolbox is *not* installed by the operator Helm chart — +//! it's a standalone manifest at `deploy/examples/toolbox.yaml` in the rook +//! repo. This module ports that manifest to a typed `k8s_openapi::Deployment` +//! so [`super::RookCephClusterScore`] can apply it via `K8sResourceScore` +//! without dropping back to raw YAML. +//! +//! The container image is sourced from the CephCluster spec's `cephVersion` +//! so the toolbox stays in lockstep with the cluster's Ceph version. + +use std::collections::BTreeMap; + +use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec}; +use k8s_openapi::api::core::v1::{ + Capabilities, ConfigMapVolumeSource, Container, EmptyDirVolumeSource, EnvVar, EnvVarSource, + KeyToPath, PodSpec, PodTemplateSpec, SecretKeySelector, SecretVolumeSource, SecurityContext, + Toleration, Volume, VolumeMount, +}; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; + +/// Inline replication of Rook's `toolbox.sh` — re-renders `/etc/ceph/ceph.conf` +/// and keyring on mon-endpoint changes so the bundled `quay.io/ceph/ceph` +/// image (which lacks the rook tooling) can be used directly. +/// +/// Verbatim from `deploy/examples/toolbox.yaml` in rook/rook@v1.19.5. +const TOOLBOX_SCRIPT: &str = r#"# Replicate the script from toolbox.sh inline so the ceph image +# can be run directly, instead of requiring the rook toolbox +CEPH_CONFIG="/etc/ceph/ceph.conf" +MON_CONFIG="/etc/rook/mon-endpoints" +KEYRING_FILE="/etc/ceph/keyring" +CONFIG_OVERRIDE="/etc/rook-config-override/config" + +# create a ceph config file in its default location so ceph/rados tools can be used +# without specifying any arguments +write_endpoints() { + endpoints=$(cat ${MON_CONFIG}) + + # filter out the mon names + # external cluster can have numbers or hyphens in mon names, handling them in regex + # shellcheck disable=SC2001 + mon_endpoints=$(echo "${endpoints}"| sed 's/[a-z0-9_-]\+=//g') + + DATE=$(date) + echo "$DATE writing mon endpoints to ${CEPH_CONFIG}: ${endpoints}" + cat < ${CEPH_CONFIG} +[global] +mon_host = ${mon_endpoints} + +[client.admin] +keyring = ${KEYRING_FILE} +EOF + + # Merge the config override if it exists and is not empty + if [ -f "${CONFIG_OVERRIDE}" ] && [ -s "${CONFIG_OVERRIDE}" ]; then + echo "$DATE merging config override from ${CONFIG_OVERRIDE}" + echo "" >> ${CEPH_CONFIG} + cat ${CONFIG_OVERRIDE} >> ${CEPH_CONFIG} + fi +} + +# watch the endpoints config file and update if the mon endpoints ever change +watch_endpoints() { + # get the timestamp for the target of the soft link + real_path=$(realpath ${MON_CONFIG}) + initial_time=$(stat -c %Z "${real_path}") + while true; do + real_path=$(realpath ${MON_CONFIG}) + latest_time=$(stat -c %Z "${real_path}") + + if [[ "${latest_time}" != "${initial_time}" ]]; then + write_endpoints + initial_time=${latest_time} + fi + + sleep 10 + done +} + +# read the secret from an env var (for backward compatibility), or from the secret file +ceph_secret=${ROOK_CEPH_SECRET} +if [[ "$ceph_secret" == "" ]]; then + ceph_secret=$(cat /var/lib/rook-ceph-mon/secret.keyring) +fi + +# create the keyring file +cat < ${KEYRING_FILE} +[${ROOK_CEPH_USERNAME}] +key = ${ceph_secret} +EOF + +# write the initial config file +write_endpoints + +# continuously update the mon endpoints if they fail over +watch_endpoints +"#; + +const TOOLBOX_NAME: &str = "rook-ceph-tools"; +const TOOLBOX_LABEL: &str = "rook-ceph-tools"; + +/// Build the canonical `rook-ceph-tools` Deployment for the given namespace +/// and Ceph container image. Ported verbatim from rook/rook@v1.19.5's +/// `deploy/examples/toolbox.yaml`. +pub fn toolbox_deployment(namespace: &str, image: &str) -> Deployment { + let labels = { + let mut m = BTreeMap::new(); + m.insert("app".to_string(), TOOLBOX_LABEL.to_string()); + m + }; + + let container = Container { + name: TOOLBOX_NAME.to_string(), + image: Some(image.to_string()), + image_pull_policy: Some("IfNotPresent".to_string()), + command: Some(vec![ + "/bin/bash".to_string(), + "-c".to_string(), + TOOLBOX_SCRIPT.to_string(), + ]), + tty: Some(true), + security_context: Some(SecurityContext { + run_as_non_root: Some(true), + run_as_user: Some(2016), + run_as_group: Some(2016), + capabilities: Some(Capabilities { + drop: Some(vec!["ALL".to_string()]), + ..Capabilities::default() + }), + ..SecurityContext::default() + }), + env: Some(vec![EnvVar { + name: "ROOK_CEPH_USERNAME".to_string(), + value_from: Some(EnvVarSource { + secret_key_ref: Some(SecretKeySelector { + name: "rook-ceph-mon".to_string(), + key: "ceph-username".to_string(), + optional: None, + }), + ..EnvVarSource::default() + }), + ..EnvVar::default() + }]), + volume_mounts: Some(vec![ + VolumeMount { + name: "ceph-config".to_string(), + mount_path: "/etc/ceph".to_string(), + ..VolumeMount::default() + }, + VolumeMount { + name: "mon-endpoint-volume".to_string(), + mount_path: "/etc/rook".to_string(), + ..VolumeMount::default() + }, + VolumeMount { + name: "ceph-admin-secret".to_string(), + mount_path: "/var/lib/rook-ceph-mon".to_string(), + read_only: Some(true), + ..VolumeMount::default() + }, + VolumeMount { + name: "rook-config-override".to_string(), + mount_path: "/etc/rook-config-override".to_string(), + read_only: Some(true), + ..VolumeMount::default() + }, + ]), + ..Container::default() + }; + + let volumes = vec![ + Volume { + name: "ceph-admin-secret".to_string(), + secret: Some(SecretVolumeSource { + secret_name: Some("rook-ceph-mon".to_string()), + optional: Some(false), + items: Some(vec![KeyToPath { + key: "ceph-secret".to_string(), + path: "secret.keyring".to_string(), + ..KeyToPath::default() + }]), + ..SecretVolumeSource::default() + }), + ..Volume::default() + }, + Volume { + name: "mon-endpoint-volume".to_string(), + config_map: Some(ConfigMapVolumeSource { + name: "rook-ceph-mon-endpoints".to_string(), + items: Some(vec![KeyToPath { + key: "data".to_string(), + path: "mon-endpoints".to_string(), + ..KeyToPath::default() + }]), + ..ConfigMapVolumeSource::default() + }), + ..Volume::default() + }, + Volume { + name: "rook-config-override".to_string(), + config_map: Some(ConfigMapVolumeSource { + name: "rook-config-override".to_string(), + optional: Some(true), + ..ConfigMapVolumeSource::default() + }), + ..Volume::default() + }, + Volume { + name: "ceph-config".to_string(), + empty_dir: Some(EmptyDirVolumeSource::default()), + ..Volume::default() + }, + ]; + + let pod_spec = PodSpec { + dns_policy: Some("ClusterFirstWithHostNet".to_string()), + service_account_name: Some("rook-ceph-default".to_string()), + containers: vec![container], + volumes: Some(volumes), + tolerations: Some(vec![Toleration { + key: Some("node.kubernetes.io/unreachable".to_string()), + operator: Some("Exists".to_string()), + effect: Some("NoExecute".to_string()), + toleration_seconds: Some(5), + ..Toleration::default() + }]), + ..PodSpec::default() + }; + + Deployment { + metadata: ObjectMeta { + name: Some(TOOLBOX_NAME.to_string()), + namespace: Some(namespace.to_string()), + labels: Some(labels.clone()), + ..ObjectMeta::default() + }, + spec: Some(DeploymentSpec { + replicas: Some(1), + selector: LabelSelector { + match_labels: Some(labels.clone()), + ..LabelSelector::default() + }, + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels), + ..ObjectMeta::default() + }), + spec: Some(pod_spec), + }, + ..DeploymentSpec::default() + }), + ..Deployment::default() + } +}