feat/ceph-score #297

Open
stremblay wants to merge 19 commits from feat/ceph-score into master
16 changed files with 1883 additions and 6 deletions

10
Cargo.lock generated
View File

@@ -2685,6 +2685,16 @@ dependencies = [
"url",
]
[[package]]
name = "example-install-rook-ceph"
version = "0.1.0"
dependencies = [
"harmony",
"harmony_cli",
"k8s-openapi",
"tokio",
]
[[package]]
name = "example-k8s-drain-node"
version = "0.1.0"

View File

@@ -0,0 +1,13 @@
[package]
name = "example-install-rook-ceph"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
publish = false
[dependencies]
harmony = { path = "../../harmony" }
harmony_cli = { path = "../../harmony_cli" }
k8s-openapi = { workspace = true }
tokio = { workspace = true }

View File

@@ -0,0 +1,31 @@
# shellcheck shell=bash
# Environment for the `example-install-rook-ceph` example. Every variable below
# is exported into the calling shell, so this file must be sourced, not executed.
# Source this file before running the example:
# source env.sh && cargo run -p example-install-rook-ceph
# ---------- Target an existing cluster (no local K3D) ------------------------
# This example is meant to drive a real cluster (pico OKD, vanilla K8s with an
# ingress, etc.) — not a throwaway local one. Force K8sAnywhereTopology to use
# whatever cluster the kubeconfig points at.
export HARMONY_USE_LOCAL_K3D=false
# K8sAnywhereTopology resolves the kubeconfig via:
# 1. KUBECONFIG env var
# 2. fallback: $HOME/.kube/config
# Uncomment if your config isn't in the default location.
# export KUBECONFIG=/path/to/your/kubeconfig
# Pin to a specific kubectl context if your kubeconfig has more than one.
# export HARMONY_K8S_CONTEXT=my-pico-okd
# ---------- Harmony state storage (per-example boilerplate) ------------------
# This example doesn't actually touch harmony_secret or the SQLite DB, but
# other examples in the workspace use this same boilerplate and these vars
# don't cost anything when unread.
export HARMONY_SECRET_NAMESPACE=install-rook-ceph
export HARMONY_SECRET_STORE=file
export HARMONY_DATABASE_URL=sqlite://harmony_install_rook_ceph.sqlite
# ---------- Logging ----------------------------------------------------------
# The RookCephClusterScore wait loops log every `ceph health` transition at
# info level; debug shows polling cadence.
export RUST_LOG=harmony=debug

View File

@@ -0,0 +1,136 @@
use harmony::{
inventory::Inventory,
modules::storage::ceph::{
RookCephClusterScore, RookCephOperatorScore,
ceph_validate_health_score::CephVerifyClusterHealth,
crd::{CephObjectStore, CephObjectStoreSpec, CephObjectStoreUser, GatewaySpec},
},
score::Score,
topology::K8sAnywhereTopology,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
// Uncomment to enable the edge-TLS Ingress for the S3 endpoint.
// use harmony::modules::k8s::resource::K8sResourceScore;
// use k8s_openapi::api::networking::v1::{
// HTTPIngressPath, HTTPIngressRuleValue, Ingress, IngressBackend, IngressRule,
// IngressServiceBackend, IngressSpec, IngressTLS, ServiceBackendPort,
// };
// Namespace used for every namespaced resource in this example: the
// CephObjectStore, its S3 user, the cluster Score, and the health check.
const NAMESPACE: &str = "rook-ceph";
// Name of the CephObjectStore built in `main`; the S3 user is bound to it.
const OBJECTSTORE_NAME: &str = "ceph-objectstore";
// Name handed to `CephObjectStoreUser::for_store` for the example's S3 user.
const S3_USER_NAME: &str = "harmony-s3";
// External S3 hostname for the Ingress. Edit this to match the DNS name
// you'll point at your cluster's ingress LB.
// const S3_HOSTNAME: &str = "s3.example.com";
// TLS Secret holding the cert+key for `S3_HOSTNAME`. The Secret must exist
// in `NAMESPACE` *before* this example runs (e.g. via cert-manager, or a
// manual `kubectl create secret tls`). The example does not create it —
// real cert material can't be shipped in a repo.
// const S3_TLS_SECRET: &str = "ceph-objectstore-tls";
/// Example entry point: installs the Rook-Ceph operator, deploys a Ceph
/// cluster carrying one object store and one S3 user, then runs the cluster
/// health verification — all through `harmony_cli::run`.
///
/// Panics if `harmony_cli::run` returns an error.
#[tokio::main]
async fn main() {
    let topology = K8sAnywhereTopology::from_env();
    let inventory = Inventory::autoload();

    // Object store with two gateway instances; everything else stays at the
    // spec defaults.
    let gateway = GatewaySpec {
        instances: 2,
        ..GatewaySpec::default()
    };
    let rgw_store = CephObjectStore {
        metadata: ObjectMeta {
            name: Some(OBJECTSTORE_NAME.to_string()),
            namespace: Some(NAMESPACE.to_string()),
            ..ObjectMeta::default()
        },
        spec: CephObjectStoreSpec {
            gateway,
            ..CephObjectStoreSpec::default()
        },
    };

    let default_user = CephObjectStoreUser::for_store(
        S3_USER_NAME,
        NAMESPACE,
        OBJECTSTORE_NAME,
        "Harmony default S3 user",
    );

    let mut cluster = RookCephClusterScore::default_okd(NAMESPACE);
    cluster.object_stores = vec![rgw_store];
    cluster.object_store_users = vec![default_user];

    // Pico-OKD tuning: relax OSD backfill/recovery on a small cluster so
    // recovery storms don't starve client I/O. These are equivalent to running
    // `ceph config set osd <key> <value>` against the live cluster, but
    // declarative — Rook reconciles them after the MONs reach quorum and
    // re-applies on drift. Values are strings (Ceph parses them).
    //
    // Adjust or remove per your hardware. Not opinionated defaults — only
    // wired here because the user's first target is a pico OKD.
    for (option, value) in [
        ("osd_max_backfills", "1"),
        ("osd_recovery_max_active", "1"),
        ("osd_recovery_op_priority", "1"),
        ("osd_mclock_profile", "high_client_ops"),
    ] {
        cluster.cluster.spec.set_config("osd.*", option, value);
    }
    // Alternative — disable mclock entirely if you suspect it's the problem:
    // cluster.cluster.spec.set_config("osd.*", "osd_op_queue", "wpq");

    // Edge-TLS Ingress in front of the RGW Service. Rook creates the Service
    // as `rook-ceph-rgw-<store-name>` on the gateway port (8080 here).
    // Uncomment this block (and its imports + consts above) to enable, then
    // add the Box::new(K8sResourceScore::single(...)) entry to the `scores`
    // vec below.
    //
    // let s3_ingress = Ingress {
    //     metadata: ObjectMeta {
    //         name: Some(format!("{OBJECTSTORE_NAME}-s3")),
    //         namespace: Some(NAMESPACE.to_string()),
    //         ..ObjectMeta::default()
    //     },
    //     spec: Some(IngressSpec {
    //         rules: Some(vec![IngressRule {
    //             host: Some(S3_HOSTNAME.to_string()),
    //             http: Some(HTTPIngressRuleValue {
    //                 paths: vec![HTTPIngressPath {
    //                     path: Some("/".to_string()),
    //                     path_type: "Prefix".to_string(),
    //                     backend: IngressBackend {
    //                         service: Some(IngressServiceBackend {
    //                             name: format!("rook-ceph-rgw-{OBJECTSTORE_NAME}"),
    //                             port: Some(ServiceBackendPort {
    //                                 number: Some(8080),
    //                                 ..ServiceBackendPort::default()
    //                             }),
    //                         }),
    //                         ..IngressBackend::default()
    //                     },
    //                 }],
    //             }),
    //         }]),
    //         tls: Some(vec![IngressTLS {
    //             hosts: Some(vec![S3_HOSTNAME.to_string()]),
    //             secret_name: Some(S3_TLS_SECRET.to_string()),
    //         }]),
    //         ..IngressSpec::default()
    //     }),
    //     ..Ingress::default()
    // };

    // Order matters: operator first, then the cluster, then the health check.
    let health_check = CephVerifyClusterHealth {
        rook_ceph_namespace: NAMESPACE.to_string(),
    };
    let scores: Vec<Box<dyn Score<K8sAnywhereTopology>>> = vec![
        Box::new(RookCephOperatorScore::default_okd()),
        Box::new(cluster),
        Box::new(health_check),
        // Box::new(K8sResourceScore::single(s3_ingress, Some(NAMESPACE.to_string()))),
    ];

    harmony_cli::run(inventory, topology, scores, None)
        .await
        .unwrap();
}

View File

@@ -1,6 +1,7 @@
use async_trait::async_trait;
use k8s_openapi::ResourceScope;
use kube::Resource;
use kube::api::DynamicObject;
use log::info;
use serde::{Serialize, de::DeserializeOwned};
@@ -17,6 +18,12 @@ use harmony_types::id::Id;
/// Score that applies a list of Kubernetes resources of type `K`, optionally
/// scoped to a namespace, with an opt-in forced server-side apply.
pub struct K8sResourceScore<K: Resource + std::fmt::Debug> {
/// Resources handed to the cluster client when the score is interpreted.
pub resource: Vec<K>,
/// Namespace passed alongside the resources on apply; `None` passes no
/// namespace.
pub namespace: Option<String>,
/// When `true`, server-side apply is performed with `force=true`
/// (equivalent to `kubectl apply --force-conflicts`). Required when
/// applying a resource whose fields an operator has taken ownership of
/// (typical for CRs managed by reconciling operators like Rook). For
/// generic Kubernetes resources owned only by Harmony, leave `false`.
pub force_conflicts: bool,
}
impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
@@ -24,8 +31,16 @@ impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
Self {
resource: vec![resource],
namespace,
force_conflicts: false,
}
}
/// Consume `self` and return it with `force_conflicts` set. Chainable
/// after `single`: `K8sResourceScore::single(r, ns).with_force_conflicts(true)`.
pub fn with_force_conflicts(mut self, force: bool) -> Self {
self.force_conflicts = force;
self
}
}
impl<
@@ -102,16 +117,47 @@ where
.collect();
info!(
"Applying {} resources : {}",
"Applying {} resources : {} (force_conflicts={})",
resource_names.len(),
resource_names.join(", ")
resource_names.join(", "),
self.score.force_conflicts
);
topology
let k8s = topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?
.apply_many(&self.score.resource, self.score.namespace.as_deref())
.await?;
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?;
if self.score.force_conflicts {
// apply_dynamic_many exposes the force_conflicts flag on
// PatchParams that the typed apply path does not. Round-trip each
// typed resource through JSON to land on a DynamicObject without
// duplicating the harmony-k8s private `to_dynamic` helper.
let dyn_objects: Vec<DynamicObject> = self
.score
.resource
.iter()
.map(|r| {
serde_json::to_value(r)
.map_err(|e| {
InterpretError::new(format!(
"Failed to serialize resource for force-apply: {e}"
))
})
.and_then(|v| {
serde_json::from_value::<DynamicObject>(v).map_err(|e| {
InterpretError::new(format!(
"Failed to convert resource to DynamicObject: {e}"
))
})
})
})
.collect::<Result<Vec<_>, _>>()?;
k8s.apply_dynamic_many(&dyn_objects, self.score.namespace.as_deref(), true)
.await?;
} else {
k8s.apply_many(&self.score.resource, self.score.namespace.as_deref())
.await?;
}
Ok(Outcome::success(
"Successfully applied resource".to_string(),

View File

@@ -0,0 +1,517 @@
use std::collections::BTreeMap;
use std::sync::Arc;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use harmony_k8s::K8sClient;
use k8s_openapi::api::storage::v1::StorageClass;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
use log::{debug, info, warn};
use serde::Serialize;
use crate::data::Version;
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
use crate::inventory::Inventory;
use crate::modules::k8s::resource::K8sResourceScore;
use crate::score::Score;
use crate::topology::{K8sclient, Topology};
use harmony_types::id::Id;
/// Total time we'll wait for the `rook-ceph-operator` Deployment to be ready
/// after `helm install` returns. Image pulls dominate the first run.
const OPERATOR_READY_TIMEOUT: Duration = Duration::from_secs(300);
/// Total time we'll wait for the `cephclusters.ceph.rook.io` CRD to be
/// registered in the API server's discovery surface. Helm 3 applies CRDs
/// before other resources but the cache can lag.
const CRD_READY_TIMEOUT: Duration = Duration::from_secs(60);
/// Total time we'll wait for the toolbox Deployment to come up before bailing.
const TOOLBOX_READY_TIMEOUT: Duration = Duration::from_secs(600);
/// Total time we'll wait for the Ceph cluster to reach `HEALTH_OK` after the
/// toolbox is up. Mons + mgrs + OSDs + initial PG peering on a small cluster
/// typically lands in 5–15 min; 20 min covers slower hardware.
const HEALTH_OK_TIMEOUT: Duration = Duration::from_secs(1200);
/// Poll cadence for the toolbox + health waits. Cluster transitions happen on
/// the order of seconds, not subsecond — no benefit to a tighter loop.
const POLL_INTERVAL: Duration = Duration::from_secs(10);
use super::crd::{
CephBlockPool, CephBlockPoolSpec, CephCluster, CephFilesystem, CephObjectStore,
CephObjectStoreUser, FailureDomain, ReplicatedSpec,
};
use super::toolbox::toolbox_deployment;
/// Deploys a typed Rook-Ceph cluster: `CephCluster` + pools + filesystems +
/// object stores + their consumer `StorageClass`es. Assumes the Rook operator
/// is already installed (use [`super::RookCephOperatorScore`]).
///
/// Each Custom Resource is a real Rust type (`kube::CustomResource`-derived),
/// applied via `K8sResourceScore::single` — the same apply path used by CNPG.
///
/// # Ordering
/// Steps applied in this order:
/// 1. **Wait** for the `rook-ceph-operator` Deployment to be Ready (Helm
/// install returns before pods are up).
/// 2. **Wait** for `cephclusters.ceph.rook.io` CRD to be discoverable and
/// invalidate the kube-rs discovery cache.
/// 3. Apply `CephCluster`.
/// 4. Apply the `rook-ceph-tools` Deployment (typed, ported from the upstream
/// `deploy/examples/toolbox.yaml`). Image is read from the CephCluster
/// spec so it always matches the daemons.
/// 5. **Wait** for the `rook-ceph-tools` Deployment to be Ready.
/// 6. **Wait** for `ceph health` to return `HEALTH_OK` — mons in quorum, mgrs
/// up, OSDs bootstrapped, initial PGs peered. Typically 5–15 min on a
/// small cluster; capped at 20 min.
/// 7. Apply `CephBlockPool` resources + their RBD `StorageClass`es.
/// 8. Apply `CephFilesystem` resources + their CephFS `StorageClass`es.
/// 9. Apply `CephObjectStore` resources.
/// 10. Apply `CephObjectStoreUser` resources (Rook materializes their S3
/// credentials into a `rook-ceph-object-user-<store>-<user>` Secret per
/// user).
///
/// The waits in steps 1, 2, 5, and 6 mean this Score takes minutes, not
/// seconds, to return — but downstream Scores like `CephVerifyClusterHealth`
/// can rely on the cluster actually being ready when this returns.
#[derive(Debug, Clone, Serialize)]
pub struct RookCephClusterScore {
/// Namespace every namespaced resource of this Score is applied into; also
/// used as the `clusterID` and provisioner prefix of the StorageClasses.
pub namespace: String,
/// The `CephCluster` resource to apply (step 3). Its `ceph_version.image`
/// is reused for the toolbox Deployment.
pub cluster: CephCluster,
/// Block pools to apply; each one also gets an RBD `StorageClass`.
pub block_pools: Vec<CephBlockPool>,
/// Filesystems to apply; each one also gets a CephFS `StorageClass`.
pub filesystems: Vec<CephFilesystem>,
/// Object stores to apply (no StorageClass is created for them).
pub object_stores: Vec<CephObjectStore>,
/// Object-store users to apply after the object stores.
pub object_store_users: Vec<CephObjectStoreUser>,
/// Mark the first block-pool StorageClass as the cluster default.
pub default_block_pool_storage_class: bool,
}
impl RookCephClusterScore {
    /// Build a Score preloaded with OKD-oriented defaults.
    ///
    /// The `CephCluster` starts from `CephCluster::default()`, is named after
    /// the namespace it lives in, and has `useAllNodes` + `useAllDevices`
    /// switched on. One replicated `CephBlockPool` called `"replicapool"`
    /// (failure domain: host) is included and its StorageClass is flagged as
    /// the cluster default. No filesystem, object store, or object-store user
    /// is configured.
    pub fn default_okd(namespace: impl Into<String>) -> Self {
        let namespace: String = namespace.into();

        // The cluster resource shares its name with the namespace.
        let cluster = {
            let mut ceph = CephCluster::default();
            ceph.metadata.name = Some(namespace.clone());
            ceph.metadata.namespace = Some(namespace.clone());
            ceph.spec.storage.use_all_nodes = true;
            ceph.spec.storage.use_all_devices = true;
            ceph
        };

        let pool_metadata = ObjectMeta {
            name: Some("replicapool".to_string()),
            namespace: Some(namespace.clone()),
            ..ObjectMeta::default()
        };
        let pool_spec = CephBlockPoolSpec {
            replicated: Some(ReplicatedSpec::default()),
            failure_domain: Some(FailureDomain::Host),
            ..CephBlockPoolSpec::default()
        };
        let replica_pool = CephBlockPool {
            metadata: pool_metadata,
            spec: pool_spec,
        };

        Self {
            namespace,
            cluster,
            block_pools: vec![replica_pool],
            filesystems: vec![],
            object_stores: vec![],
            object_store_users: vec![],
            default_block_pool_storage_class: true,
        }
    }
}
impl<T: Topology + K8sclient + 'static> Score<T> for RookCephClusterScore {
    /// Hand out an interpret that owns a snapshot (clone) of this Score.
    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
        let score = self.clone();
        Box::new(RookCephClusterInterpret { score })
    }

    /// Display name of the Score, tagged with the target namespace.
    fn name(&self) -> String {
        let namespace = &self.namespace;
        format!("RookCephClusterScore({namespace})")
    }
}
/// Interpret created by `RookCephClusterScore::create_interpret`; it owns a
/// clone of the Score and carries out the waits and applies it describes.
#[derive(Debug)]
struct RookCephClusterInterpret {
/// Snapshot of the Score taken when the interpret was created.
score: RookCephClusterScore,
}
#[async_trait]
impl<T: Topology + K8sclient + 'static> Interpret<T> for RookCephClusterInterpret {
/// Run the full deployment sequence: wait for the operator and the
/// CephCluster CRD, apply the CephCluster and the toolbox Deployment, wait
/// for the toolbox and for `HEALTH_OK`, then apply block pools, filesystems,
/// object stores and object-store users (with StorageClasses for pools and
/// filesystems).
///
/// Every step short-circuits with `?`: the first failing wait or apply aborts
/// the sequence and its error is returned. On success the Outcome message
/// summarizes how many resources of each kind were applied.
async fn execute(
&self,
inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let ns = self.score.namespace.clone();
let k8s = topology
.k8s_client()
.await
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {e}")))?;
// `helm install` (in RookCephOperatorScore) returns once resources are
// created — not once the operator Deployment is Ready or the CRDs are
// registered in the API server's discovery cache. Without these waits,
// the CephCluster apply below races and typically fails with
// "Cannot resolve GVK ceph.rook.io/v1/CephCluster".
info!(
"[Rook-Ceph] Waiting for rook-ceph-operator deployment in '{}'",
ns
);
k8s.wait_until_deployment_ready(
"rook-ceph-operator",
Some(&ns),
Some(OPERATOR_READY_TIMEOUT),
)
.await
.map_err(|e| InterpretError::new(format!("rook-ceph-operator not ready: {e}")))?;
info!("[Rook-Ceph] Waiting for cephclusters.ceph.rook.io CRD registration");
k8s.wait_for_crd("cephclusters.ceph.rook.io", Some(CRD_READY_TIMEOUT))
.await
.map_err(|e| InterpretError::new(format!("CephCluster CRD not registered: {e}")))?;
// The discovery cache on our shared K8sClient was populated before the
// operator install added the Rook CRDs. Force a refresh so the next
// apply can resolve the new GVK.
k8s.invalidate_discovery().await;
info!("[Rook-Ceph] Applying CephCluster '{}'", ns);
// Custom resources are applied with force_conflicts=true throughout this
// method; the StorageClasses and the toolbox Deployment are not.
K8sResourceScore::single(self.score.cluster.clone(), Some(ns.clone()))
.with_force_conflicts(true)
.interpret(inventory, topology)
.await?;
// Rook v1.19 no longer deploys the toolbox via the operator Helm chart;
// we ship it as a typed Deployment ported from the upstream
// `deploy/examples/toolbox.yaml`. Image follows whatever CephCluster
// is configured with — keeps the toolbox in lockstep with the daemons.
let toolbox_image = &self.score.cluster.spec.ceph_version.image;
info!(
"[Rook-Ceph] Applying rook-ceph-tools Deployment (image='{}')",
toolbox_image
);
K8sResourceScore::single(
toolbox_deployment(&ns, toolbox_image),
Some(ns.clone()),
)
.interpret(inventory, topology)
.await?;
// Both waits must pass before any pool / filesystem / object-store apply.
wait_for_toolbox_ready(&k8s, &ns).await?;
wait_for_health_ok(&k8s, &ns).await?;
for (idx, pool) in self.score.block_pools.iter().enumerate() {
// Unnamed pools fall back to a positional name for logging and for the
// StorageClass name.
let pool_name = pool
.metadata
.name
.clone()
.unwrap_or_else(|| format!("pool-{idx}"));
info!("[Rook-Ceph] Applying CephBlockPool '{}'", pool_name);
K8sResourceScore::single(pool.clone(), Some(ns.clone()))
.with_force_conflicts(true)
.interpret(inventory, topology)
.await?;
// Only the first pool's StorageClass can be marked default, and only when
// the Score asks for it.
let sc = rbd_storage_class(
&pool_name,
&ns,
idx == 0 && self.score.default_block_pool_storage_class,
);
info!(
"[Rook-Ceph] Applying StorageClass '{}' (RBD on pool '{}')",
sc.metadata.name.as_deref().unwrap_or("?"),
pool_name
);
// No namespace is passed for the StorageClass apply.
K8sResourceScore::single(sc, None)
.interpret(inventory, topology)
.await?;
}
for fs in self.score.filesystems.iter() {
let fs_name = fs
.metadata
.name
.clone()
.unwrap_or_else(|| "cephfs".to_string());
info!("[Rook-Ceph] Applying CephFilesystem '{}'", fs_name);
K8sResourceScore::single(fs.clone(), Some(ns.clone()))
.with_force_conflicts(true)
.interpret(inventory, topology)
.await?;
let sc = cephfs_storage_class(&fs_name, &ns);
info!(
"[Rook-Ceph] Applying StorageClass '{}' (CephFS on filesystem '{}')",
sc.metadata.name.as_deref().unwrap_or("?"),
fs_name
);
K8sResourceScore::single(sc, None)
.interpret(inventory, topology)
.await?;
}
for store in self.score.object_stores.iter() {
// The fallback name is only used in the log line below.
let store_name = store
.metadata
.name
.clone()
.unwrap_or_else(|| "object-store".to_string());
info!("[Rook-Ceph] Applying CephObjectStore '{}'", store_name);
K8sResourceScore::single(store.clone(), Some(ns.clone()))
.with_force_conflicts(true)
.interpret(inventory, topology)
.await?;
}
for user in self.score.object_store_users.iter() {
// The fallback name is only used in the log line below.
let user_name = user
.metadata
.name
.clone()
.unwrap_or_else(|| "object-store-user".to_string());
info!(
"[Rook-Ceph] Applying CephObjectStoreUser '{}' (store='{}'); \
credentials will appear in Secret '{}'",
user_name,
user.spec.store,
user.credentials_secret_name(),
);
K8sResourceScore::single(user.clone(), Some(ns.clone()))
.with_force_conflicts(true)
.interpret(inventory, topology)
.await?;
}
Ok(Outcome::success(format!(
"Applied Rook-Ceph cluster '{}' ({} block-pool(s), {} filesystem(s), {} object-store(s), {} object-store-user(s))",
ns,
self.score.block_pools.len(),
self.score.filesystems.len(),
self.score.object_stores.len(),
self.score.object_store_users.len(),
)))
}
/// Fixed custom interpret name.
fn get_name(&self) -> InterpretName {
InterpretName::Custom("RookCephClusterInterpret")
}
/// Not implemented: calling this panics via `todo!()`.
fn get_version(&self) -> Version {
todo!()
}
/// Not implemented: calling this panics via `todo!()`.
fn get_status(&self) -> InterpretStatus {
todo!()
}
/// Not implemented: calling this panics via `todo!()`.
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
/// Poll the `rook-ceph-tools` Deployment until it has ≥1 ready replica. This
/// is a prerequisite for any `ceph` CLI call against the cluster — including
/// the HEALTH_OK wait below, and the existing `CephVerifyClusterHealth` Score.
///
/// `exec_app_capture_output` panics (`.expect("No matching pod")`) if called
/// when no toolbox pod exists yet — so we must gate exec on this first.
async fn wait_for_toolbox_ready(client: &Arc<K8sClient>, ns: &str) -> Result<(), InterpretError> {
let toolbox = "rook-ceph-tools";
info!(
"[Rook-Ceph] Waiting for '{}' deployment in '{}' (up to {}s)",
toolbox,
ns,
TOOLBOX_READY_TIMEOUT.as_secs()
);
let start = Instant::now();
loop {
match client.get_deployment(toolbox, Some(ns)).await {
Ok(Some(dep)) => {
let ready = dep
.status
.as_ref()
.and_then(|s| s.ready_replicas)
.unwrap_or(0);
if ready >= 1 {
info!("[Rook-Ceph] '{}' is ready ({} replica(s))", toolbox, ready);
return Ok(());
}
debug!("[Rook-Ceph] '{}' present but 0 ready replicas", toolbox);
}
Ok(None) => debug!("[Rook-Ceph] '{}' deployment not yet created", toolbox),
Err(e) => debug!("[Rook-Ceph] error checking '{}': {e}", toolbox),
}
if start.elapsed() > TOOLBOX_READY_TIMEOUT {
return Err(InterpretError::new(format!(
"Timed out after {}s waiting for '{}' deployment to be ready in '{}'. \
Is the operator running with toolbox.enabled=true?",
TOOLBOX_READY_TIMEOUT.as_secs(),
toolbox,
ns,
)));
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
/// Poll `ceph health` via the toolbox until it returns `HEALTH_OK`. Fresh
/// clusters typically sit in `HEALTH_WARN` for a few minutes while mons reach
/// quorum, mgrs come up, and OSDs bootstrap their PGs. Returning before
/// `HEALTH_OK` would race with the subsequent block-pool / object-store /
/// user applies and with `CephVerifyClusterHealth`.
async fn wait_for_health_ok(client: &Arc<K8sClient>, ns: &str) -> Result<(), InterpretError> {
info!(
"[Rook-Ceph] Waiting for cluster to reach HEALTH_OK in '{}' (up to {}s)",
ns,
HEALTH_OK_TIMEOUT.as_secs()
);
let start = Instant::now();
let mut last_status = String::new();
loop {
match client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(ns),
vec!["sh", "-c", "ceph health"],
)
.await
{
Ok(out) => {
let trimmed = out.trim().to_string();
if trimmed.starts_with("HEALTH_OK") {
info!("[Rook-Ceph] Cluster reached HEALTH_OK");
return Ok(());
}
if trimmed != last_status {
info!("[Rook-Ceph] ceph health: {}", trimmed);
last_status = trimmed;
}
}
Err(e) => debug!("[Rook-Ceph] ceph health exec failed: {e}"),
}
if start.elapsed() > HEALTH_OK_TIMEOUT {
warn!(
"[Rook-Ceph] Last ceph health output before timeout: {}",
last_status
);
return Err(InterpretError::new(format!(
"Timed out after {}s waiting for HEALTH_OK in '{}'. Last status: '{}'",
HEALTH_OK_TIMEOUT.as_secs(),
ns,
last_status,
)));
}
tokio::time::sleep(POLL_INTERVAL).await;
}
}
/// Build the RBD `StorageClass` that consumes block pool `pool_name`.
///
/// The class is named `rook-ceph-block-<pool_name>` and uses the
/// `<ns>.rbd.csi.ceph.com` provisioner. `ns` doubles as the `clusterID`
/// parameter and as the namespace of every referenced CSI secret. When
/// `is_default` is set, the `storageclass.kubernetes.io/is-default-class`
/// annotation is added; otherwise the class carries no annotations at all.
fn rbd_storage_class(pool_name: &str, ns: &str, is_default: bool) -> StorageClass {
    let provisioner_secret = "rook-csi-rbd-provisioner";
    let node_secret = "rook-csi-rbd-node";

    let parameters: BTreeMap<String, String> = [
        ("clusterID", ns),
        ("pool", pool_name),
        ("imageFormat", "2"),
        ("imageFeatures", "layering"),
        ("csi.storage.k8s.io/provisioner-secret-name", provisioner_secret),
        ("csi.storage.k8s.io/provisioner-secret-namespace", ns),
        ("csi.storage.k8s.io/controller-expand-secret-name", provisioner_secret),
        ("csi.storage.k8s.io/controller-expand-secret-namespace", ns),
        ("csi.storage.k8s.io/node-stage-secret-name", node_secret),
        ("csi.storage.k8s.io/node-stage-secret-namespace", ns),
        ("csi.storage.k8s.io/fstype", "ext4"),
    ]
    .iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();

    // `None` (rather than an empty map) when the class is not the default.
    let annotations = is_default.then(|| {
        BTreeMap::from([(
            "storageclass.kubernetes.io/is-default-class".to_string(),
            "true".to_string(),
        )])
    });

    StorageClass {
        metadata: ObjectMeta {
            name: Some(format!("rook-ceph-block-{pool_name}")),
            annotations,
            ..ObjectMeta::default()
        },
        provisioner: format!("{ns}.rbd.csi.ceph.com"),
        parameters: Some(parameters),
        reclaim_policy: Some("Delete".to_string()),
        allow_volume_expansion: Some(true),
        volume_binding_mode: Some("Immediate".to_string()),
        ..StorageClass::default()
    }
}
/// Build the CephFS `StorageClass` that consumes filesystem `fs_name`.
///
/// The class is named `rook-cephfs-<fs_name>`, uses the
/// `<ns>.cephfs.csi.ceph.com` provisioner, and targets the data pool
/// `<fs_name>-data0`. `ns` doubles as the `clusterID` parameter and as the
/// namespace of every referenced CSI secret.
fn cephfs_storage_class(fs_name: &str, ns: &str) -> StorageClass {
    let provisioner_secret = "rook-csi-cephfs-provisioner";
    let node_secret = "rook-csi-cephfs-node";
    let data_pool = format!("{fs_name}-data0");

    let parameters: BTreeMap<String, String> = [
        ("clusterID", ns),
        ("fsName", fs_name),
        ("pool", data_pool.as_str()),
        ("csi.storage.k8s.io/provisioner-secret-name", provisioner_secret),
        ("csi.storage.k8s.io/provisioner-secret-namespace", ns),
        ("csi.storage.k8s.io/controller-expand-secret-name", provisioner_secret),
        ("csi.storage.k8s.io/controller-expand-secret-namespace", ns),
        ("csi.storage.k8s.io/node-stage-secret-name", node_secret),
        ("csi.storage.k8s.io/node-stage-secret-namespace", ns),
    ]
    .iter()
    .map(|(key, value)| (key.to_string(), value.to_string()))
    .collect();

    StorageClass {
        metadata: ObjectMeta {
            name: Some(format!("rook-cephfs-{fs_name}")),
            ..ObjectMeta::default()
        },
        provisioner: format!("{ns}.cephfs.csi.ceph.com"),
        parameters: Some(parameters),
        reclaim_policy: Some("Delete".to_string()),
        allow_volume_expansion: Some(true),
        volume_binding_mode: Some("Immediate".to_string()),
        ..StorageClass::default()
    }
}

View File

@@ -0,0 +1,53 @@
use std::collections::BTreeMap;
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
use super::shared::{ErasureCodedSpec, FailureDomain, ReplicatedSpec};
/// Spec of the namespaced `CephBlockPool` custom resource
/// (`ceph.rook.io/v1`, plural `cephblockpools`). Schema generation is
/// disabled in the `kube` attribute below.
///
/// Field names serialize as camelCase, and every field is optional: a `None`
/// value is left out of the serialized object entirely.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "ceph.rook.io",
version = "v1",
kind = "CephBlockPool",
plural = "cephblockpools",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CephBlockPoolSpec {
/// Replication settings (serialized as `replicated`).
#[serde(skip_serializing_if = "Option::is_none")]
pub replicated: Option<ReplicatedSpec>,
/// Erasure-coding settings (serialized as `erasureCoded`).
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure_coded: Option<ErasureCodedSpec>,
/// Failure domain for the pool (serialized as `failureDomain`).
#[serde(skip_serializing_if = "Option::is_none")]
pub failure_domain: Option<FailureDomain>,
/// Serialized as `deviceClass`.
#[serde(skip_serializing_if = "Option::is_none")]
pub device_class: Option<String>,
/// Free-form string key/value map, serialized as `parameters`.
#[serde(skip_serializing_if = "Option::is_none")]
pub parameters: Option<BTreeMap<String, String>>,
/// Serialized as `enableRbdStats`.
#[serde(skip_serializing_if = "Option::is_none")]
pub enable_rbd_stats: Option<bool>,
}
impl Default for CephBlockPool {
    /// An unnamed pool resource: empty metadata paired with the default spec.
    fn default() -> Self {
        let metadata = ObjectMeta::default();
        let spec = CephBlockPoolSpec::default();
        Self { metadata, spec }
    }
}
impl Default for CephBlockPoolSpec {
    /// A replicated pool (default replication settings) with a host-level
    /// failure domain; every other option is left unset.
    fn default() -> Self {
        let replicated = Some(ReplicatedSpec::default());
        let failure_domain = Some(FailureDomain::Host);
        Self {
            replicated,
            failure_domain,
            erasure_coded: None,
            device_class: None,
            parameters: None,
            enable_rbd_stats: None,
        }
    }
}

View File

@@ -0,0 +1,295 @@
use std::collections::BTreeMap;
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
use super::shared::{PlacementSpec, VolumeClaimTemplate};
/// Spec of the namespaced `CephCluster` custom resource (`ceph.rook.io/v1`,
/// plural `cephclusters`). Schema generation is disabled in the `kube`
/// attribute below.
///
/// Field names serialize as camelCase. The first six fields are always
/// serialized; `Option` fields are omitted when `None`, and `placement` is
/// omitted when empty.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "ceph.rook.io",
version = "v1",
kind = "CephCluster",
plural = "cephclusters",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CephClusterSpec {
/// Ceph container image selection.
pub ceph_version: CephVersionSpec,
/// Serialized as `dataDirHostPath`.
pub data_dir_host_path: String,
/// Monitor settings.
pub mon: MonSpec,
/// Manager settings.
pub mgr: MgrSpec,
/// Dashboard settings.
pub dashboard: DashboardSpec,
/// Storage (node/device selection) settings.
pub storage: StorageSpec,
#[serde(skip_serializing_if = "Option::is_none")]
pub network: Option<NetworkSpec>,
/// Placement rules keyed by string; defaults to an empty map when absent
/// on deserialization.
#[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
pub placement: BTreeMap<String, PlacementSpec>,
#[serde(skip_serializing_if = "Option::is_none")]
pub crash_collector: Option<CrashCollectorSpec>,
#[serde(skip_serializing_if = "Option::is_none")]
pub log_collector: Option<LogCollectorSpec>,
#[serde(skip_serializing_if = "Option::is_none")]
pub remove_osds_if_out_and_safe_to_remove: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub continue_upgrade_after_checks_even_if_not_healthy: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub skip_upgrade_checks: Option<bool>,
#[serde(skip_serializing_if = "Option::is_none")]
pub wait_timeout_for_healthy_osd_in_minutes: Option<u32>,
#[serde(skip_serializing_if = "Option::is_none")]
pub disruption_management: Option<DisruptionManagementSpec>,
/// Centralized Ceph config applied by the operator after MONs reach quorum.
/// Outer key is the "WHO" target (e.g. `"global"`, `"osd.*"`, `"mgr.*"`,
/// `"client.rgw.<store>"`, or a specific daemon like `"osd.0"`). Inner map
/// is `option-name -> string-value` — Rook calls `ceph config set <who>
/// <key> <value>` for each entry. Values must be strings even for numerics
/// and booleans; Ceph parses them. Rook does not validate the keys.
///
/// Prefer [`CephClusterSpec::set_config`] for ergonomic insertion.
#[serde(skip_serializing_if = "Option::is_none")]
pub ceph_config: Option<BTreeMap<String, BTreeMap<String, String>>>,
}
impl CephClusterSpec {
    /// Record one centralized Ceph config entry, i.e. the declarative form of
    /// `ceph config set <who> <key> <value>`.
    ///
    /// `ceph_config` and the per-`who` sub-map are created lazily the first
    /// time they are needed. Setting the same `who`/`key` pair again
    /// overwrites the earlier value. Returns `&mut Self` so calls can be
    /// chained.
    ///
    /// Typical `who` targets: `"global"`, `"osd.*"`, `"mon.*"`, `"mgr.*"`,
    /// `"client.rgw.<store-name>"`, or a single daemon such as `"osd.0"`.
    pub fn set_config(
        &mut self,
        who: impl Into<String>,
        key: impl Into<String>,
        value: impl Into<String>,
    ) -> &mut Self {
        let all_targets = self.ceph_config.get_or_insert_with(BTreeMap::new);
        let target_options = all_targets.entry(who.into()).or_default();
        target_options.insert(key.into(), value.into());
        self
    }
}
impl Default for CephCluster {
    /// An unnamed cluster resource: empty metadata paired with the default
    /// spec.
    fn default() -> Self {
        let metadata = ObjectMeta::default();
        let spec = CephClusterSpec::default();
        Self { metadata, spec }
    }
}
impl Default for CephClusterSpec {
    /// Baseline spec: data directory `/var/lib/rook`, each required section
    /// at its own default, no placement rules, and every optional setting
    /// unset.
    fn default() -> Self {
        let data_dir_host_path = String::from("/var/lib/rook");
        Self {
            // Always-serialized sections.
            ceph_version: CephVersionSpec::default(),
            data_dir_host_path,
            mon: MonSpec::default(),
            mgr: MgrSpec::default(),
            dashboard: DashboardSpec::default(),
            storage: StorageSpec::default(),
            placement: BTreeMap::new(),
            // Optional settings, all left out by default.
            network: None,
            crash_collector: None,
            log_collector: None,
            remove_osds_if_out_and_safe_to_remove: None,
            continue_upgrade_after_checks_even_if_not_healthy: None,
            skip_upgrade_checks: None,
            wait_timeout_for_healthy_osd_in_minutes: None,
            disruption_management: None,
            ceph_config: None,
        }
    }
}
/// Ceph image selection for a cluster (the `ceph_version` field of
/// `CephClusterSpec`). Serialized with camelCase field names; `None` options
/// are omitted.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct CephVersionSpec {
/// Container image reference, always serialized.
pub image: String,
/// Serialized as `allowUnsupported`.
#[serde(skip_serializing_if = "Option::is_none")]
pub allow_unsupported: Option<bool>,
/// Serialized as `imagePullPolicy`.
#[serde(skip_serializing_if = "Option::is_none")]
pub image_pull_policy: Option<String>,
}
impl Default for CephVersionSpec {
    /// Default image: a fully pinned Ceph tag, unsupported versions
    /// disallowed, pull policy `IfNotPresent`.
    fn default() -> Self {
        // The tag carries both the version and the date-stamped build suffix,
        // which locks the exact container image so the cluster can't go
        // heterogeneous across daemon restarts. Per the original author, this
        // is the form the Rook 1.19 upgrade docs recommend for production.
        let image = String::from("quay.io/ceph/ceph:v19.2.3-20250717");
        let image_pull_policy = Some(String::from("IfNotPresent"));
        Self {
            image,
            allow_unsupported: Some(false),
            image_pull_policy,
        }
    }
}
/// Monitor settings (the `mon` field of `CephClusterSpec`). Serialized with
/// camelCase field names; `None` options are omitted.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MonSpec {
/// Number of monitors, always serialized.
pub count: u32,
/// Serialized as `allowMultiplePerNode`, always present.
pub allow_multiple_per_node: bool,
/// Serialized as `volumeClaimTemplate`.
#[serde(skip_serializing_if = "Option::is_none")]
pub volume_claim_template: Option<VolumeClaimTemplate>,
/// Serialized as `failureDomainLabel`.
#[serde(skip_serializing_if = "Option::is_none")]
pub failure_domain_label: Option<String>,
}
impl Default for MonSpec {
fn default() -> Self {
Self {
count: 3,
allow_multiple_per_node: false,
volume_claim_template: None,
failure_domain_label: None,
}
}
}
/// Manager settings (the `mgr` field of `CephClusterSpec`). Serialized with
/// camelCase field names.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MgrSpec {
/// Number of managers, always serialized.
pub count: u32,
/// Manager modules; omitted from the output when empty and defaulted to an
/// empty list when absent on deserialization.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub modules: Vec<ModuleSpec>,
}
impl Default for MgrSpec {
    /// Two managers with the `pg_autoscaler` module enabled.
    fn default() -> Self {
        let pg_autoscaler = ModuleSpec {
            name: String::from("pg_autoscaler"),
            enabled: true,
        };
        Self {
            count: 2,
            modules: vec![pg_autoscaler],
        }
    }
}
/// One entry of `MgrSpec::modules`: a module name plus an on/off flag.
///
/// The derived `Default` is an empty name with `enabled = false`.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct ModuleSpec {
/// Module name, e.g. `pg_autoscaler`.
pub name: String,
/// Whether the module is enabled.
pub enabled: bool,
}
/// Settings for the cluster spec's `dashboard` section.
///
/// Serialized with camelCase keys; `port` and `urlPrefix` are omitted when
/// `None`.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct DashboardSpec {
/// Whether the dashboard is enabled; the `Default` impl uses `true`.
pub enabled: bool,
/// Whether SSL is used; the `Default` impl uses `true`.
pub ssl: bool,
/// Listening port; the `Default` impl uses 8443.
#[serde(skip_serializing_if = "Option::is_none")]
pub port: Option<u16>,
/// Optional URL prefix; unset by default.
#[serde(skip_serializing_if = "Option::is_none")]
pub url_prefix: Option<String>,
}
impl Default for DashboardSpec {
    /// Dashboard enabled over SSL on port 8443, with no URL prefix.
    fn default() -> Self {
        let port = Some(8443);
        Self {
            enabled: true,
            ssl: true,
            port,
            url_prefix: None,
        }
    }
}
/// Settings for the cluster spec's `storage` section.
///
/// Serialized with camelCase keys. `Option` fields are omitted when `None`
/// and `Vec` fields are omitted when empty (and default to empty on input).
/// The derived `Default` sets both booleans to `false` and leaves everything
/// else unset/empty.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct StorageSpec {
/// Serialized as `useAllNodes`.
pub use_all_nodes: bool,
/// Serialized as `useAllDevices`.
pub use_all_devices: bool,
/// NOTE(review): presumably a regex over device names — confirm against the CRD.
#[serde(skip_serializing_if = "Option::is_none")]
pub device_filter: Option<String>,
/// NOTE(review): presumably a regex over device paths — confirm against the CRD.
#[serde(skip_serializing_if = "Option::is_none")]
pub device_path_filter: Option<String>,
/// Explicit per-node entries.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub nodes: Vec<NodeSpec>,
/// Free-form string key/value settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<BTreeMap<String, String>>,
/// PVC-backed device sets.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub storage_class_device_sets: Vec<StorageClassDeviceSet>,
/// Serialized as `allowDeviceClassUpdate`.
#[serde(skip_serializing_if = "Option::is_none")]
pub allow_device_class_update: Option<bool>,
/// Serialized as `allowOsdCrushWeightUpdate`.
#[serde(skip_serializing_if = "Option::is_none")]
pub allow_osd_crush_weight_update: Option<bool>,
}
/// One entry of `StorageSpec::nodes`: a named node with optional device list
/// and string key/value settings.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct NodeSpec {
/// Node name.
pub name: String,
/// Devices on this node; omitted from the output when empty.
#[serde(skip_serializing_if = "Vec::is_empty", default)]
pub devices: Vec<DeviceSpec>,
/// Node-level settings; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<BTreeMap<String, String>>,
}
/// One entry of `NodeSpec::devices`: a named device with optional string
/// key/value settings.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct DeviceSpec {
/// Device name.
pub name: String,
/// Device-level settings; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub config: Option<BTreeMap<String, String>>,
}
/// One entry of `StorageSpec::storage_class_device_sets`.
///
/// Unlike the other list fields in this file, `volume_claim_templates` has no
/// serde `default`, so it is always written and must be present on input.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct StorageClassDeviceSet {
/// Name of the device set.
pub name: String,
/// Number of devices in the set.
pub count: u32,
/// NOTE(review): presumably whether the set's OSDs may move between nodes — confirm against the CRD.
pub portable: bool,
/// PVC templates (metadata + spec) for the set.
pub volume_claim_templates: Vec<VolumeClaimTemplate>,
/// Optional scheduling constraints; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub placement: Option<PlacementSpec>,
}
/// Network settings. Every field is optional and omitted from the serialized
/// output when `None`; keys are camelCase (`hostNetwork`, `dualStack`, ...).
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct NetworkSpec {
/// Network provider name.
#[serde(skip_serializing_if = "Option::is_none")]
pub provider: Option<String>,
/// Serialized as `hostNetwork`.
#[serde(skip_serializing_if = "Option::is_none")]
pub host_network: Option<bool>,
/// Serialized as `dualStack`.
#[serde(skip_serializing_if = "Option::is_none")]
pub dual_stack: Option<bool>,
/// Serialized as `ipv4`.
#[serde(skip_serializing_if = "Option::is_none")]
pub ipv4: Option<bool>,
/// Serialized as `ipv6`.
#[serde(skip_serializing_if = "Option::is_none")]
pub ipv6: Option<bool>,
}
/// Crash collector settings. The derived `Default` is `disable = false` with
/// no retention override.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct CrashCollectorSpec {
/// Set to `true` to turn the crash collector off.
pub disable: bool,
/// Serialized as `daysToRetain`; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub days_to_retain: Option<u32>,
}
/// Log collector settings. The derived `Default` is `enabled = false` with
/// both optional strings unset.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct LogCollectorSpec {
/// Whether the log collector is enabled.
pub enabled: bool,
/// NOTE(review): presumably a rotation interval string — confirm accepted values against the CRD.
#[serde(skip_serializing_if = "Option::is_none")]
pub periodicity: Option<String>,
/// Serialized as `maxLogSize`; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_log_size: Option<String>,
}
/// Disruption management settings. The derived `Default` is
/// `manage_pod_budgets = false` with both timeouts unset.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct DisruptionManagementSpec {
/// Serialized as `managePodBudgets`.
pub manage_pod_budgets: bool,
/// Serialized as `osdMaintenanceTimeout`; units are not established here — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub osd_maintenance_timeout: Option<u32>,
/// Serialized as `pgHealthCheckTimeout`; units are not established here — TODO confirm.
#[serde(skip_serializing_if = "Option::is_none")]
pub pg_health_check_timeout: Option<u32>,
}

View File

@@ -0,0 +1,54 @@
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
use super::shared::{MetadataServerSpec, NamedPoolSpec, PoolSpec, ReplicatedSpec};
/// Spec of the namespaced `CephFilesystem` custom resource
/// (`ceph.rook.io/v1`, plural `cephfilesystems`).
///
/// The `CustomResource` derive generates the `CephFilesystem` wrapper type
/// around this spec. Fields serialize with camelCase keys; the two
/// `preserve_*` options are omitted when `None`.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "ceph.rook.io",
version = "v1",
kind = "CephFilesystem",
plural = "cephfilesystems",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CephFilesystemSpec {
/// Pool settings for filesystem metadata.
pub metadata_pool: PoolSpec,
/// Named data pools; the `Default` impl creates a single pool `data0`.
pub data_pools: Vec<NamedPoolSpec>,
/// Metadata server settings.
pub metadata_server: MetadataServerSpec,
/// Serialized as `preserveFilesystemOnDelete`; the `Default` impl uses `Some(true)`.
#[serde(skip_serializing_if = "Option::is_none")]
pub preserve_filesystem_on_delete: Option<bool>,
/// Serialized as `preservePoolsOnDelete`; the `Default` impl uses `Some(false)`.
#[serde(skip_serializing_if = "Option::is_none")]
pub preserve_pools_on_delete: Option<bool>,
}
impl Default for CephFilesystem {
    /// Empty object metadata wrapped around the default
    /// `CephFilesystemSpec`; callers are expected to fill in name/namespace.
    fn default() -> Self {
        let metadata = ObjectMeta::default();
        let spec = CephFilesystemSpec::default();
        Self { metadata, spec }
    }
}
impl Default for CephFilesystemSpec {
    /// A filesystem with a replicated metadata pool, one replicated data pool
    /// named `data0`, default metadata-server settings, the filesystem
    /// preserved on delete and the pools not preserved.
    fn default() -> Self {
        // Both pools use the same shape: default pool settings with
        // replication switched on at its default size.
        let replicated_pool = || PoolSpec {
            replicated: Some(ReplicatedSpec::default()),
            ..PoolSpec::default()
        };
        let data0 = NamedPoolSpec {
            name: String::from("data0"),
            spec: replicated_pool(),
        };
        Self {
            metadata_pool: replicated_pool(),
            data_pools: vec![data0],
            metadata_server: MetadataServerSpec::default(),
            preserve_filesystem_on_delete: Some(true),
            preserve_pools_on_delete: Some(false),
        }
    }
}

View File

@@ -0,0 +1,13 @@
pub mod block_pool;
pub mod cluster;
pub mod filesystem;
pub mod object_store;
pub mod object_store_user;
pub mod shared;
pub use block_pool::*;
pub use cluster::*;
pub use filesystem::*;
pub use object_store::*;
pub use object_store_user::*;
pub use shared::*;

View File

@@ -0,0 +1,74 @@
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
use super::shared::{PlacementSpec, PoolSpec, ReplicatedSpec};
/// Spec of the namespaced `CephObjectStore` custom resource
/// (`ceph.rook.io/v1`, plural `cephobjectstores`).
///
/// The `CustomResource` derive generates the `CephObjectStore` wrapper type
/// around this spec. Fields serialize with camelCase keys.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "ceph.rook.io",
version = "v1",
kind = "CephObjectStore",
plural = "cephobjectstores",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CephObjectStoreSpec {
/// Pool settings for object-store metadata.
pub metadata_pool: PoolSpec,
/// Pool settings for object data.
pub data_pool: PoolSpec,
/// Gateway settings (ports, instance count, placement).
pub gateway: GatewaySpec,
/// Serialized as `preservePoolsOnDelete`; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub preserve_pools_on_delete: Option<bool>,
}
impl Default for CephObjectStore {
    /// Empty object metadata wrapped around the default
    /// `CephObjectStoreSpec`; callers are expected to fill in name/namespace.
    fn default() -> Self {
        let metadata = ObjectMeta::default();
        let spec = CephObjectStoreSpec::default();
        Self { metadata, spec }
    }
}
impl Default for CephObjectStoreSpec {
    /// An object store with replicated metadata and data pools, the default
    /// gateway, and pools not preserved on delete.
    fn default() -> Self {
        // Metadata and data pools share one shape: default pool settings
        // with replication switched on at its default size.
        let replicated_pool = || PoolSpec {
            replicated: Some(ReplicatedSpec::default()),
            ..PoolSpec::default()
        };
        Self {
            metadata_pool: replicated_pool(),
            data_pool: replicated_pool(),
            gateway: GatewaySpec::default(),
            preserve_pools_on_delete: Some(false),
        }
    }
}
/// Gateway section of `CephObjectStoreSpec`.
///
/// Serialized with camelCase keys; `securePort`, `placement` and
/// `sslCertificateRef` are omitted when `None`.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct GatewaySpec {
/// Non-secure gateway port. Defaults to 8080 to dodge OKD's <1024 bind restriction.
pub port: u16,
/// Optional secure port; unset by default.
#[serde(skip_serializing_if = "Option::is_none")]
pub secure_port: Option<u16>,
/// Number of gateway instances; the `Default` impl uses 1.
pub instances: u32,
/// Optional scheduling constraints for the gateway pods.
#[serde(skip_serializing_if = "Option::is_none")]
pub placement: Option<PlacementSpec>,
/// NOTE(review): presumably the name of a Secret holding the TLS certificate — confirm against the CRD.
#[serde(skip_serializing_if = "Option::is_none")]
pub ssl_certificate_ref: Option<String>,
}
impl Default for GatewaySpec {
    /// A single gateway instance on plain port 8080, with no secure port,
    /// placement or certificate reference.
    fn default() -> Self {
        let (port, instances) = (8080, 1);
        Self {
            port,
            secure_port: None,
            instances,
            placement: None,
            ssl_certificate_ref: None,
        }
    }
}

View File

@@ -0,0 +1,128 @@
use kube::{CustomResource, api::ObjectMeta};
use serde::{Deserialize, Serialize};
/// `CephObjectStoreUser` — provisions an RGW S3 user against a `CephObjectStore`.
///
/// Rook auto-creates a Kubernetes Secret named
/// `rook-ceph-object-user-<store>-<user>` containing the user's access
/// credentials, with these base64-encoded keys:
/// - `AccessKey` — S3 access key ID (20 bytes)
/// - `SecretKey` — S3 secret access key (40 bytes)
///
/// The Secret lives in the same namespace as the user.
///
/// Fields serialize with camelCase keys; every optional field is omitted
/// when `None`.
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
#[kube(
group = "ceph.rook.io",
version = "v1",
kind = "CephObjectStoreUser",
plural = "cephobjectstoreusers",
namespaced = true,
schema = "disabled"
)]
#[serde(rename_all = "camelCase")]
pub struct CephObjectStoreUserSpec {
/// Name of the `CephObjectStore` this user belongs to.
pub store: String,
/// Human-readable name, serialized as `displayName`.
#[serde(skip_serializing_if = "Option::is_none")]
pub display_name: Option<String>,
/// NOTE(review): presumably the namespace of the Ceph cluster when it differs from the user's — confirm against the CRD.
#[serde(skip_serializing_if = "Option::is_none")]
pub cluster_namespace: Option<String>,
/// Optional quota limits.
#[serde(skip_serializing_if = "Option::is_none")]
pub quotas: Option<UserQuotas>,
/// Optional capability grants.
#[serde(skip_serializing_if = "Option::is_none")]
pub capabilities: Option<UserCapabilities>,
}
impl Default for CephObjectStoreUser {
    /// Empty object metadata wrapped around the default
    /// `CephObjectStoreUserSpec`.
    fn default() -> Self {
        let metadata = ObjectMeta::default();
        let spec = CephObjectStoreUserSpec::default();
        Self { metadata, spec }
    }
}
impl Default for CephObjectStoreUserSpec {
    /// An empty store name with every optional field unset.
    fn default() -> Self {
        Self {
            store: String::default(),
            display_name: Option::default(),
            cluster_namespace: Option::default(),
            quotas: Option::default(),
            capabilities: Option::default(),
        }
    }
}
/// Optional quota limits for an object-store user. Every field is omitted
/// from the serialized output when `None`; keys are camelCase.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct UserQuotas {
/// Serialized as `maxBuckets`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_buckets: Option<i32>,
/// Resource-quantity string, e.g. `"10G"`, `"500M"`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_size: Option<String>,
/// Serialized as `maxObjects`.
#[serde(skip_serializing_if = "Option::is_none")]
pub max_objects: Option<i64>,
}
/// RGW capability strings — each accepts `"read"`, `"write"`, `"read, write"`,
/// or `"*"`.
///
/// Every field is optional and omitted from the serialized output when
/// `None`. Most keys are the field name itself; the three hyphenated keys
/// (`amz-cache`, `user-policy`, `odic-provider`) are set by explicit
/// `rename` attributes because camelCase renaming would not produce them.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct UserCapabilities {
#[serde(skip_serializing_if = "Option::is_none")]
pub user: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bucket: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub usage: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub metadata: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub zone: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub roles: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub info: Option<String>,
// Hyphenated wire key, hence the explicit rename.
#[serde(rename = "amz-cache", skip_serializing_if = "Option::is_none")]
pub amz_cache: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub bilog: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub mdlog: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub datalog: Option<String>,
// Hyphenated wire key, hence the explicit rename.
#[serde(rename = "user-policy", skip_serializing_if = "Option::is_none")]
pub user_policy: Option<String>,
// NOTE(review): the key is spelled `odic-provider` (not `oidc-provider`).
// Verify the exact key in the CephObjectStoreUser CRD schema before
// "correcting" it — a mismatched key would be silently dropped.
#[serde(rename = "odic-provider", skip_serializing_if = "Option::is_none")]
pub odic_provider: Option<String>,
#[serde(skip_serializing_if = "Option::is_none")]
pub ratelimit: Option<String>,
}
impl CephObjectStoreUser {
    /// Build a user named `name` in `namespace`, bound to the object store
    /// `store` and labelled with `display_name`.
    ///
    /// Quotas, capabilities and the cluster namespace keep their defaults
    /// (unset), so RGW defaults apply.
    pub fn for_store(name: &str, namespace: &str, store: &str, display_name: &str) -> Self {
        let metadata = ObjectMeta {
            name: Some(name.to_owned()),
            namespace: Some(namespace.to_owned()),
            ..Default::default()
        };
        let spec = CephObjectStoreUserSpec {
            store: store.to_owned(),
            display_name: Some(display_name.to_owned()),
            ..Default::default()
        };
        Self { metadata, spec }
    }

    /// Name of the Secret generated for this user, which carries its
    /// base64-encoded `AccessKey` / `SecretKey`.
    ///
    /// The result is `rook-ceph-object-user-<store>-<user>`; when the
    /// resource has no name yet, the `<user>` part is empty.
    pub fn credentials_secret_name(&self) -> String {
        let store = &self.spec.store;
        let user = match &self.metadata.name {
            Some(name) => name.as_str(),
            None => "",
        };
        format!("rook-ceph-object-user-{}-{}", store, user)
    }
}

View File

@@ -0,0 +1,117 @@
use std::collections::BTreeMap;
use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Toleration};
use serde::{Deserialize, Serialize};
/// Pool settings shared by the filesystem and object-store specs in this
/// chunk.
///
/// Every field is optional and omitted from the serialized output when
/// `None`; keys are camelCase. The derived `Default` leaves everything unset.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct PoolSpec {
/// Replication settings.
#[serde(skip_serializing_if = "Option::is_none")]
pub replicated: Option<ReplicatedSpec>,
/// Erasure-coding settings, serialized as `erasureCoded`.
#[serde(skip_serializing_if = "Option::is_none")]
pub erasure_coded: Option<ErasureCodedSpec>,
/// Failure domain, serialized as a lowercase string under `failureDomain`.
#[serde(skip_serializing_if = "Option::is_none")]
pub failure_domain: Option<FailureDomain>,
/// Serialized as `deviceClass`.
#[serde(skip_serializing_if = "Option::is_none")]
pub device_class: Option<String>,
/// Free-form string key/value parameters.
#[serde(skip_serializing_if = "Option::is_none")]
pub parameters: Option<BTreeMap<String, String>>,
}
/// Replication settings of a pool.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct ReplicatedSpec {
/// Replica count; the `Default` impl uses 3.
pub size: u32,
/// Serialized as `requireSafeReplicaSize`, omitted when `None`; the
/// `Default` impl uses `Some(true)`.
#[serde(skip_serializing_if = "Option::is_none")]
pub require_safe_replica_size: Option<bool>,
}
impl Default for ReplicatedSpec {
    /// Three replicas with the safe-replica-size requirement switched on.
    fn default() -> Self {
        let size = 3;
        Self {
            size,
            require_safe_replica_size: Some(true),
        }
    }
}
/// Erasure-coding settings of a pool, serialized as `dataChunks` /
/// `codingChunks`. The derived `Default` sets both counts to 0.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct ErasureCodedSpec {
/// Number of data chunks.
pub data_chunks: u32,
/// Number of coding chunks.
pub coding_chunks: u32,
}
/// Failure domain of a pool.
///
/// Variants (de)serialize as their lowercase names: `osd`, `host`, `rack`,
/// `zone`, `region`.
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "lowercase")]
pub enum FailureDomain {
Osd,
// Chosen by the `Default` impl.
Host,
Rack,
Zone,
Region,
}
impl Default for FailureDomain {
fn default() -> Self {
Self::Host
}
}
/// A `PoolSpec` with a name attached.
///
/// Because `spec` is flattened, the pool's keys appear next to `name` in the
/// serialized object rather than nested under a `spec` key.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct NamedPoolSpec {
/// Pool name.
pub name: String,
/// Pool settings, inlined into the parent object.
#[serde(flatten)]
pub spec: PoolSpec,
}
/// Metadata server section of `CephFilesystemSpec`, serialized with camelCase
/// keys (`activeCount`, `activeStandby`, `placement`).
#[derive(Deserialize, Serialize, Clone, Debug)]
#[serde(rename_all = "camelCase")]
pub struct MetadataServerSpec {
/// Number of active instances; the `Default` impl uses 1.
pub active_count: u32,
/// Serialized as `activeStandby`; the `Default` impl uses `true`.
pub active_standby: bool,
/// Optional scheduling constraints; omitted when `None`.
#[serde(skip_serializing_if = "Option::is_none")]
pub placement: Option<PlacementSpec>,
}
impl Default for MetadataServerSpec {
    /// One active metadata server with active-standby enabled and no
    /// placement constraints.
    fn default() -> Self {
        let (active_count, active_standby) = (1, true);
        Self {
            active_count,
            active_standby,
            placement: None,
        }
    }
}
/// Scheduling constraints attached to various specs in this module.
///
/// Only `tolerations` is strongly typed; the affinity and topology-spread
/// fields are carried as raw `serde_json::Value` and passed through as-is.
/// Every field is omitted from the serialized output when `None`.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct PlacementSpec {
/// Serialized as `nodeAffinity` (untyped JSON).
#[serde(skip_serializing_if = "Option::is_none")]
pub node_affinity: Option<serde_json::Value>,
/// Serialized as `podAffinity` (untyped JSON).
#[serde(skip_serializing_if = "Option::is_none")]
pub pod_affinity: Option<serde_json::Value>,
/// Serialized as `podAntiAffinity` (untyped JSON).
#[serde(skip_serializing_if = "Option::is_none")]
pub pod_anti_affinity: Option<serde_json::Value>,
/// Typed Kubernetes tolerations.
#[serde(skip_serializing_if = "Option::is_none")]
pub tolerations: Option<Vec<Toleration>>,
/// Serialized as `topologySpreadConstraints` (untyped JSON).
#[serde(skip_serializing_if = "Option::is_none")]
pub topology_spread_constraints: Option<serde_json::Value>,
}
/// A PVC template: object metadata plus a `PersistentVolumeClaimSpec`.
///
/// Can be built from a full `PersistentVolumeClaim` through the `From` impl.
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
#[serde(rename_all = "camelCase")]
pub struct VolumeClaimTemplate {
/// Metadata of the claim.
pub metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta,
/// Spec of the claim.
pub spec: k8s_openapi::api::core::v1::PersistentVolumeClaimSpec,
}
impl From<PersistentVolumeClaim> for VolumeClaimTemplate {
fn from(pvc: PersistentVolumeClaim) -> Self {
Self {
metadata: pvc.metadata,
spec: pvc.spec.unwrap_or_default(),
}
}
}

View File

@@ -1,2 +1,9 @@
pub mod ceph_remove_osd_score;
pub mod ceph_validate_health_score;
pub mod cluster_score;
pub mod crd;
pub mod operator_score;
pub mod toolbox;
pub use cluster_score::*;
pub use operator_score::*;

View File

@@ -0,0 +1,128 @@
use std::collections::HashMap;
use std::str::FromStr;
use non_blank_string_rs::NonBlankString;
use serde::Serialize;
use crate::interpret::Interpret;
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
use crate::score::Score;
use crate::topology::{HelmCommand, Topology};
use harmony_macros::hurl;
/// Install the Rook-Ceph operator via its upstream Helm chart.
///
/// The Rook docs explicitly recommend Helm on OpenShift/OKD because the chart
/// automatically creates the `SecurityContextConstraints` resources OKD requires.
/// This Score wraps `HelmChartScore` against `https://charts.rook.io/release`.
///
/// The chart installs:
/// - The `rook-ceph` operator Deployment
/// - All `ceph.rook.io/v1` CRDs (CephCluster, CephBlockPool, CephFilesystem, CephObjectStore, ...)
/// - RBAC (ServiceAccounts, Roles, ClusterRoles, RoleBindings, ClusterRoleBindings)
/// - OpenShift SCC bindings when `hostpath_requires_privileged` is true
///
/// The chart does **not** install the `rook-ceph-tools` (toolbox) pod in
/// Rook v1.19 — that's now a standalone manifest in `deploy/examples/`.
/// `RookCephClusterScore` deploys it as a typed Rust Deployment via
/// [`super::toolbox::toolbox_deployment`].
///
/// The CRs that consume these CRDs are deployed separately by
/// `RookCephClusterScore` (typed Rust structs applied via `K8sResourceScore`),
/// preserving compile-time type-safety on the user-facing surface.
///
/// # OKD requirements
/// - `hostpath_requires_privileged` must be `true` — OpenShift's SELinux
/// restricted-set blocks hostPath writes otherwise.
///
/// # Usage
/// ```ignore
/// use harmony::modules::storage::ceph::RookCephOperatorScore;
/// let score = RookCephOperatorScore::default_okd();
/// ```
#[derive(Debug, Clone, Serialize)]
pub struct RookCephOperatorScore {
/// Namespace the Helm release is installed into (created if missing).
/// Must be non-blank: it is converted to a `NonBlankString`.
pub namespace: String,
/// Chart version passed to Helm. `Some` values must be non-blank; `None`
/// is forwarded as-is to `HelmChartScore`.
pub chart_version: Option<String>,
/// When `true`, the chart value `hostpathRequiresPrivileged` is set to
/// `"true"`; when `false`, the value is not set at all.
pub hostpath_requires_privileged: bool,
/// Forwarded as the chart value `csi.enableRbdDriver`.
pub enable_rbd_driver: bool,
/// Forwarded as the chart value `csi.enableCephfsDriver`.
pub enable_cephfs_driver: bool,
}
impl RookCephOperatorScore {
    /// Defaults suited to OKD: the `rook-ceph` namespace, both CSI drivers
    /// on, and privileged hostPath mode enabled.
    ///
    /// The chart is pinned to Rook 1.19.5 — the latest stable release when
    /// this Score was written, and the version tested together with the
    /// default Ceph image in [`super::crd::CephVersionSpec::default`].
    pub fn default_okd() -> Self {
        let namespace = String::from("rook-ceph");
        let chart_version = Some(String::from("v1.19.5"));
        Self {
            namespace,
            chart_version,
            hostpath_requires_privileged: true,
            enable_rbd_driver: true,
            enable_cephfs_driver: true,
        }
    }

    /// Defaults for K3s / vanilla Kubernetes: identical to
    /// [`Self::default_okd`] except that the privileged hostPath flag is off.
    pub fn default_k8s() -> Self {
        let mut score = Self::default_okd();
        score.hostpath_requires_privileged = false;
        score
    }
}
impl Default for RookCephOperatorScore {
fn default() -> Self {
Self::default_okd()
}
}
impl<T: Topology + HelmCommand> Score<T> for RookCephOperatorScore {
    /// Translate this score into a `HelmChartScore` for the
    /// `rook-release/rook-ceph` chart and return that score's interpret.
    ///
    /// Chart values set here:
    /// - `csi.enableRbdDriver` / `csi.enableCephfsDriver` — always, from the
    ///   matching fields.
    /// - `hostpathRequiresPrivileged=true` — only when
    ///   `hostpath_requires_privileged` is set; otherwise the chart default
    ///   is left alone.
    ///
    /// The release is named `rook-ceph`, the namespace is created if needed,
    /// and the score is install-only.
    ///
    /// # Panics
    /// Panics if `namespace`, or a `Some` `chart_version`, cannot be
    /// converted to a `NonBlankString` (i.e. it is blank).
    fn create_interpret(&self) -> Box<dyn Interpret<T>> {
        let mut values: HashMap<NonBlankString, String> = HashMap::new();
        values.insert(
            NonBlankString::from_str("csi.enableRbdDriver").unwrap(),
            self.enable_rbd_driver.to_string(),
        );
        values.insert(
            NonBlankString::from_str("csi.enableCephfsDriver").unwrap(),
            self.enable_cephfs_driver.to_string(),
        );
        if self.hostpath_requires_privileged {
            values.insert(
                NonBlankString::from_str("hostpathRequiresPrivileged").unwrap(),
                "true".to_string(),
            );
        }

        // `namespace` and `chart_version` are caller-supplied, so a blank
        // value is a configuration error: fail with a message naming the
        // offending field instead of a bare unwrap.
        let namespace =
            NonBlankString::from_str(&self.namespace).expect("namespace must be non-blank");
        let chart_version = self
            .chart_version
            .as_ref()
            .map(|v| NonBlankString::from_str(v).expect("chart_version must be non-blank"));

        let helm_score = HelmChartScore {
            namespace: Some(namespace),
            release_name: NonBlankString::from_str("rook-ceph").unwrap(),
            chart_name: NonBlankString::from_str("rook-release/rook-ceph").unwrap(),
            chart_version,
            values_overrides: Some(values),
            values_yaml: None,
            create_namespace: true,
            install_only: true,
            repository: Some(HelmRepository::new(
                "rook-release".to_string(),
                hurl!("https://charts.rook.io/release"),
                true,
            )),
        };
        helm_score.create_interpret()
    }

    /// Display name of the score, including the target namespace.
    fn name(&self) -> String {
        format!("RookCephOperatorScore({})", self.namespace)
    }
}

View File

@@ -0,0 +1,255 @@
//! Typed `rook-ceph-tools` Deployment.
//!
//! In Rook v1.19 the toolbox is *not* installed by the operator Helm chart —
//! it's a standalone manifest at `deploy/examples/toolbox.yaml` in the rook
//! repo. This module ports that manifest to a typed `k8s_openapi::Deployment`
//! so [`super::RookCephClusterScore`] can apply it via `K8sResourceScore`
//! without dropping back to raw YAML.
//!
//! The container image is sourced from the CephCluster spec's `cephVersion`
//! so the toolbox stays in lockstep with the cluster's Ceph version.
use std::collections::BTreeMap;
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
use k8s_openapi::api::core::v1::{
Capabilities, ConfigMapVolumeSource, Container, EmptyDirVolumeSource, EnvVar, EnvVarSource,
KeyToPath, PodSpec, PodTemplateSpec, SecretKeySelector, SecretVolumeSource, SecurityContext,
Toleration, Volume, VolumeMount,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
/// Inline replication of Rook's `toolbox.sh` — re-renders `/etc/ceph/ceph.conf`
/// and keyring on mon-endpoint changes so the bundled `quay.io/ceph/ceph`
/// image (which lacks the rook tooling) can be used directly.
///
/// Verbatim from `deploy/examples/toolbox.yaml` in rook/rook@v1.19.5.
///
/// `toolbox_deployment` passes this text as the `-c` argument of `/bin/bash`.
/// The script reads `/etc/rook/mon-endpoints`, the `ROOK_CEPH_USERNAME` and
/// (optional) `ROOK_CEPH_SECRET` environment variables,
/// `/var/lib/rook-ceph-mon/secret.keyring` and the optional
/// `/etc/rook-config-override/config`; it writes `/etc/ceph/ceph.conf` and
/// `/etc/ceph/keyring`, then polls the endpoints file every 10 seconds.
/// Treat the string as runtime data: do not reformat it.
const TOOLBOX_SCRIPT: &str = r#"# Replicate the script from toolbox.sh inline so the ceph image
# can be run directly, instead of requiring the rook toolbox
CEPH_CONFIG="/etc/ceph/ceph.conf"
MON_CONFIG="/etc/rook/mon-endpoints"
KEYRING_FILE="/etc/ceph/keyring"
CONFIG_OVERRIDE="/etc/rook-config-override/config"
# create a ceph config file in its default location so ceph/rados tools can be used
# without specifying any arguments
write_endpoints() {
endpoints=$(cat ${MON_CONFIG})
# filter out the mon names
# external cluster can have numbers or hyphens in mon names, handling them in regex
# shellcheck disable=SC2001
mon_endpoints=$(echo "${endpoints}"| sed 's/[a-z0-9_-]\+=//g')
DATE=$(date)
echo "$DATE writing mon endpoints to ${CEPH_CONFIG}: ${endpoints}"
cat <<EOF > ${CEPH_CONFIG}
[global]
mon_host = ${mon_endpoints}
[client.admin]
keyring = ${KEYRING_FILE}
EOF
# Merge the config override if it exists and is not empty
if [ -f "${CONFIG_OVERRIDE}" ] && [ -s "${CONFIG_OVERRIDE}" ]; then
echo "$DATE merging config override from ${CONFIG_OVERRIDE}"
echo "" >> ${CEPH_CONFIG}
cat ${CONFIG_OVERRIDE} >> ${CEPH_CONFIG}
fi
}
# watch the endpoints config file and update if the mon endpoints ever change
watch_endpoints() {
# get the timestamp for the target of the soft link
real_path=$(realpath ${MON_CONFIG})
initial_time=$(stat -c %Z "${real_path}")
while true; do
real_path=$(realpath ${MON_CONFIG})
latest_time=$(stat -c %Z "${real_path}")
if [[ "${latest_time}" != "${initial_time}" ]]; then
write_endpoints
initial_time=${latest_time}
fi
sleep 10
done
}
# read the secret from an env var (for backward compatibility), or from the secret file
ceph_secret=${ROOK_CEPH_SECRET}
if [[ "$ceph_secret" == "" ]]; then
ceph_secret=$(cat /var/lib/rook-ceph-mon/secret.keyring)
fi
# create the keyring file
cat <<EOF > ${KEYRING_FILE}
[${ROOK_CEPH_USERNAME}]
key = ${ceph_secret}
EOF
# write the initial config file
write_endpoints
# continuously update the mon endpoints if they fail over
watch_endpoints
"#;
/// Name given to both the toolbox Deployment and its single container.
const TOOLBOX_NAME: &str = "rook-ceph-tools";
/// Value of the `app` label applied to the Deployment, its selector and its
/// pod template.
const TOOLBOX_LABEL: &str = "rook-ceph-tools";
/// Build the canonical `rook-ceph-tools` Deployment for the given namespace
/// and Ceph container image. Ported verbatim from rook/rook@v1.19.5's
/// `deploy/examples/toolbox.yaml`.
pub fn toolbox_deployment(namespace: &str, image: &str) -> Deployment {
let labels = {
let mut m = BTreeMap::new();
m.insert("app".to_string(), TOOLBOX_LABEL.to_string());
m
};
let container = Container {
name: TOOLBOX_NAME.to_string(),
image: Some(image.to_string()),
image_pull_policy: Some("IfNotPresent".to_string()),
command: Some(vec![
"/bin/bash".to_string(),
"-c".to_string(),
TOOLBOX_SCRIPT.to_string(),
]),
tty: Some(true),
security_context: Some(SecurityContext {
run_as_non_root: Some(true),
run_as_user: Some(2016),
run_as_group: Some(2016),
capabilities: Some(Capabilities {
drop: Some(vec!["ALL".to_string()]),
..Capabilities::default()
}),
..SecurityContext::default()
}),
env: Some(vec![EnvVar {
name: "ROOK_CEPH_USERNAME".to_string(),
value_from: Some(EnvVarSource {
secret_key_ref: Some(SecretKeySelector {
name: "rook-ceph-mon".to_string(),
key: "ceph-username".to_string(),
optional: None,
}),
..EnvVarSource::default()
}),
..EnvVar::default()
}]),
volume_mounts: Some(vec![
VolumeMount {
name: "ceph-config".to_string(),
mount_path: "/etc/ceph".to_string(),
..VolumeMount::default()
},
VolumeMount {
name: "mon-endpoint-volume".to_string(),
mount_path: "/etc/rook".to_string(),
..VolumeMount::default()
},
VolumeMount {
name: "ceph-admin-secret".to_string(),
mount_path: "/var/lib/rook-ceph-mon".to_string(),
read_only: Some(true),
..VolumeMount::default()
},
VolumeMount {
name: "rook-config-override".to_string(),
mount_path: "/etc/rook-config-override".to_string(),
read_only: Some(true),
..VolumeMount::default()
},
]),
..Container::default()
};
let volumes = vec![
Volume {
name: "ceph-admin-secret".to_string(),
secret: Some(SecretVolumeSource {
secret_name: Some("rook-ceph-mon".to_string()),
optional: Some(false),
items: Some(vec![KeyToPath {
key: "ceph-secret".to_string(),
path: "secret.keyring".to_string(),
..KeyToPath::default()
}]),
..SecretVolumeSource::default()
}),
..Volume::default()
},
Volume {
name: "mon-endpoint-volume".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: "rook-ceph-mon-endpoints".to_string(),
items: Some(vec![KeyToPath {
key: "data".to_string(),
path: "mon-endpoints".to_string(),
..KeyToPath::default()
}]),
..ConfigMapVolumeSource::default()
}),
..Volume::default()
},
Volume {
name: "rook-config-override".to_string(),
config_map: Some(ConfigMapVolumeSource {
name: "rook-config-override".to_string(),
optional: Some(true),
..ConfigMapVolumeSource::default()
}),
..Volume::default()
},
Volume {
name: "ceph-config".to_string(),
empty_dir: Some(EmptyDirVolumeSource::default()),
..Volume::default()
},
];
let pod_spec = PodSpec {
dns_policy: Some("ClusterFirstWithHostNet".to_string()),
service_account_name: Some("rook-ceph-default".to_string()),
containers: vec![container],
volumes: Some(volumes),
tolerations: Some(vec![Toleration {
key: Some("node.kubernetes.io/unreachable".to_string()),
operator: Some("Exists".to_string()),
effect: Some("NoExecute".to_string()),
toleration_seconds: Some(5),
..Toleration::default()
}]),
..PodSpec::default()
};
Deployment {
metadata: ObjectMeta {
name: Some(TOOLBOX_NAME.to_string()),
namespace: Some(namespace.to_string()),
labels: Some(labels.clone()),
..ObjectMeta::default()
},
spec: Some(DeploymentSpec {
replicas: Some(1),
selector: LabelSelector {
match_labels: Some(labels.clone()),
..LabelSelector::default()
},
template: PodTemplateSpec {
metadata: Some(ObjectMeta {
labels: Some(labels),
..ObjectMeta::default()
}),
spec: Some(pod_spec),
},
..DeploymentSpec::default()
}),
..Deployment::default()
}
}