feat/ceph-score #297
10
Cargo.lock
generated
10
Cargo.lock
generated
@@ -2685,6 +2685,16 @@ dependencies = [
|
||||
"url",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "example-install-rook-ceph"
|
||||
version = "0.1.0"
|
||||
dependencies = [
|
||||
"harmony",
|
||||
"harmony_cli",
|
||||
"k8s-openapi",
|
||||
"tokio",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "example-k8s-drain-node"
|
||||
version = "0.1.0"
|
||||
|
||||
13
examples/install_rook_ceph/Cargo.toml
Normal file
13
examples/install_rook_ceph/Cargo.toml
Normal file
@@ -0,0 +1,13 @@
|
||||
[package]
|
||||
name = "example-install-rook-ceph"
|
||||
edition = "2024"
|
||||
version.workspace = true
|
||||
readme.workspace = true
|
||||
license.workspace = true
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../../harmony" }
|
||||
harmony_cli = { path = "../../harmony_cli" }
|
||||
k8s-openapi = { workspace = true }
|
||||
tokio = { workspace = true }
|
||||
31
examples/install_rook_ceph/env.sh
Normal file
31
examples/install_rook_ceph/env.sh
Normal file
@@ -0,0 +1,31 @@
|
||||
# shellcheck shell=bash
|
||||
# Source this file before running the example:
|
||||
# source env.sh && cargo run -p example-install-rook-ceph
|
||||
|
||||
# ---------- Target an existing cluster (no local K3D) ------------------------
|
||||
# This example is meant to drive a real cluster (pico OKD, vanilla K8s with an
|
||||
# ingress, etc.) — not a throwaway local one. Force K8sAnywhereTopology to use
|
||||
# whatever cluster the kubeconfig points at.
|
||||
export HARMONY_USE_LOCAL_K3D=false
|
||||
|
||||
# K8sAnywhereTopology resolves the kubeconfig via:
|
||||
# 1. KUBECONFIG env var
|
||||
# 2. fallback: $HOME/.kube/config
|
||||
# Uncomment if your config isn't in the default location.
|
||||
# export KUBECONFIG=/path/to/your/kubeconfig
|
||||
|
||||
# Pin to a specific kubectl context if your kubeconfig has more than one.
|
||||
# export HARMONY_K8S_CONTEXT=my-pico-okd
|
||||
|
||||
# ---------- Harmony state storage (per-example boilerplate) ------------------
|
||||
# This example doesn't actually touch harmony_secret or the SQLite DB, but
|
||||
# other examples in the workspace use this same boilerplate and these vars
|
||||
# don't cost anything when unread.
|
||||
export HARMONY_SECRET_NAMESPACE=install-rook-ceph
|
||||
export HARMONY_SECRET_STORE=file
|
||||
export HARMONY_DATABASE_URL=sqlite://harmony_install_rook_ceph.sqlite
|
||||
|
||||
# ---------- Logging ----------------------------------------------------------
|
||||
# The RookCephClusterScore wait loops log every `ceph health` transition at
|
||||
# info level; debug shows polling cadence.
|
||||
export RUST_LOG=harmony=debug
|
||||
136
examples/install_rook_ceph/src/main.rs
Normal file
136
examples/install_rook_ceph/src/main.rs
Normal file
@@ -0,0 +1,136 @@
|
||||
use harmony::{
|
||||
inventory::Inventory,
|
||||
modules::storage::ceph::{
|
||||
RookCephClusterScore, RookCephOperatorScore,
|
||||
ceph_validate_health_score::CephVerifyClusterHealth,
|
||||
crd::{CephObjectStore, CephObjectStoreSpec, CephObjectStoreUser, GatewaySpec},
|
||||
},
|
||||
score::Score,
|
||||
topology::K8sAnywhereTopology,
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
|
||||
// Uncomment to enable the edge-TLS Ingress for the S3 endpoint.
|
||||
// use harmony::modules::k8s::resource::K8sResourceScore;
|
||||
// use k8s_openapi::api::networking::v1::{
|
||||
// HTTPIngressPath, HTTPIngressRuleValue, Ingress, IngressBackend, IngressRule,
|
||||
// IngressServiceBackend, IngressSpec, IngressTLS, ServiceBackendPort,
|
||||
// };
|
||||
|
||||
const NAMESPACE: &str = "rook-ceph";
|
||||
const OBJECTSTORE_NAME: &str = "ceph-objectstore";
|
||||
const S3_USER_NAME: &str = "harmony-s3";
|
||||
|
||||
// External S3 hostname for the Ingress. Edit this to match the DNS name
|
||||
// you'll point at your cluster's ingress LB.
|
||||
// const S3_HOSTNAME: &str = "s3.example.com";
|
||||
|
||||
// TLS Secret holding the cert+key for `S3_HOSTNAME`. The Secret must exist
|
||||
// in `NAMESPACE` *before* this example runs (e.g. via cert-manager, or a
|
||||
// manual `kubectl create secret tls`). The example does not create it —
|
||||
// real cert material can't be shipped in a repo.
|
||||
// const S3_TLS_SECRET: &str = "ceph-objectstore-tls";
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() {
|
||||
let topology = K8sAnywhereTopology::from_env();
|
||||
let inventory = Inventory::autoload();
|
||||
|
||||
let object_store = CephObjectStore {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(OBJECTSTORE_NAME.to_string()),
|
||||
namespace: Some(NAMESPACE.to_string()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
spec: CephObjectStoreSpec {
|
||||
gateway: GatewaySpec {
|
||||
instances: 2,
|
||||
..GatewaySpec::default()
|
||||
},
|
||||
..CephObjectStoreSpec::default()
|
||||
},
|
||||
};
|
||||
|
||||
let s3_user = CephObjectStoreUser::for_store(
|
||||
S3_USER_NAME,
|
||||
NAMESPACE,
|
||||
OBJECTSTORE_NAME,
|
||||
"Harmony default S3 user",
|
||||
);
|
||||
|
||||
let mut cluster = RookCephClusterScore::default_okd(NAMESPACE);
|
||||
cluster.object_stores = vec![object_store];
|
||||
cluster.object_store_users = vec![s3_user];
|
||||
|
||||
// Pico-OKD tuning: relax OSD backfill/recovery on a small cluster so
|
||||
// recovery storms don't starve client I/O. These are equivalent to running
|
||||
// `ceph config set osd <key> <value>` against the live cluster, but
|
||||
// declarative — Rook reconciles them after the MONs reach quorum and
|
||||
// re-applies on drift. Values are strings (Ceph parses them).
|
||||
//
|
||||
// Adjust or remove per your hardware. Not opinionated defaults — only
|
||||
// wired here because the user's first target is a pico OKD.
|
||||
cluster
|
||||
.cluster
|
||||
.spec
|
||||
.set_config("osd.*", "osd_max_backfills", "1")
|
||||
.set_config("osd.*", "osd_recovery_max_active", "1")
|
||||
.set_config("osd.*", "osd_recovery_op_priority", "1")
|
||||
.set_config("osd.*", "osd_mclock_profile", "high_client_ops");
|
||||
// Alternative — disable mclock entirely if you suspect it's the problem:
|
||||
// cluster.cluster.spec.set_config("osd.*", "osd_op_queue", "wpq");
|
||||
|
||||
// Edge-TLS Ingress in front of the RGW Service. Rook creates the Service
|
||||
// as `rook-ceph-rgw-<store-name>` on the gateway port (8080 here).
|
||||
// Uncomment this block (and its imports + consts above) to enable, then
|
||||
// add the Box::new(K8sResourceScore::single(...)) entry to the `scores`
|
||||
// vec below.
|
||||
//
|
||||
// let s3_ingress = Ingress {
|
||||
// metadata: ObjectMeta {
|
||||
// name: Some(format!("{OBJECTSTORE_NAME}-s3")),
|
||||
// namespace: Some(NAMESPACE.to_string()),
|
||||
// ..ObjectMeta::default()
|
||||
// },
|
||||
// spec: Some(IngressSpec {
|
||||
// rules: Some(vec![IngressRule {
|
||||
// host: Some(S3_HOSTNAME.to_string()),
|
||||
// http: Some(HTTPIngressRuleValue {
|
||||
// paths: vec![HTTPIngressPath {
|
||||
// path: Some("/".to_string()),
|
||||
// path_type: "Prefix".to_string(),
|
||||
// backend: IngressBackend {
|
||||
// service: Some(IngressServiceBackend {
|
||||
// name: format!("rook-ceph-rgw-{OBJECTSTORE_NAME}"),
|
||||
// port: Some(ServiceBackendPort {
|
||||
// number: Some(8080),
|
||||
// ..ServiceBackendPort::default()
|
||||
// }),
|
||||
// }),
|
||||
// ..IngressBackend::default()
|
||||
// },
|
||||
// }],
|
||||
// }),
|
||||
// }]),
|
||||
// tls: Some(vec![IngressTLS {
|
||||
// hosts: Some(vec![S3_HOSTNAME.to_string()]),
|
||||
// secret_name: Some(S3_TLS_SECRET.to_string()),
|
||||
// }]),
|
||||
// ..IngressSpec::default()
|
||||
// }),
|
||||
// ..Ingress::default()
|
||||
// };
|
||||
|
||||
let scores: Vec<Box<dyn Score<K8sAnywhereTopology>>> = vec![
|
||||
Box::new(RookCephOperatorScore::default_okd()),
|
||||
Box::new(cluster),
|
||||
Box::new(CephVerifyClusterHealth {
|
||||
rook_ceph_namespace: NAMESPACE.to_string(),
|
||||
}),
|
||||
// Box::new(K8sResourceScore::single(s3_ingress, Some(NAMESPACE.to_string()))),
|
||||
];
|
||||
|
||||
harmony_cli::run(inventory, topology, scores, None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
@@ -1,6 +1,7 @@
|
||||
use async_trait::async_trait;
|
||||
use k8s_openapi::ResourceScope;
|
||||
use kube::Resource;
|
||||
use kube::api::DynamicObject;
|
||||
use log::info;
|
||||
use serde::{Serialize, de::DeserializeOwned};
|
||||
|
||||
@@ -17,6 +18,12 @@ use harmony_types::id::Id;
|
||||
pub struct K8sResourceScore<K: Resource + std::fmt::Debug> {
|
||||
pub resource: Vec<K>,
|
||||
pub namespace: Option<String>,
|
||||
/// When `true`, server-side apply is performed with `force=true`
|
||||
/// (equivalent to `kubectl apply --force-conflicts`). Required when
|
||||
/// applying a resource whose fields an operator has taken ownership of
|
||||
/// (typical for CRs managed by reconciling operators like Rook). For
|
||||
/// generic Kubernetes resources owned only by Harmony, leave `false`.
|
||||
pub force_conflicts: bool,
|
||||
}
|
||||
|
||||
impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
|
||||
@@ -24,8 +31,16 @@ impl<K: Resource + std::fmt::Debug> K8sResourceScore<K> {
|
||||
Self {
|
||||
resource: vec![resource],
|
||||
namespace,
|
||||
force_conflicts: false,
|
||||
}
|
||||
}
|
||||
|
||||
/// Consume `self` and return it with `force_conflicts` set. Chainable
|
||||
/// after `single`: `K8sResourceScore::single(r, ns).with_force_conflicts(true)`.
|
||||
pub fn with_force_conflicts(mut self, force: bool) -> Self {
|
||||
self.force_conflicts = force;
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl<
|
||||
@@ -102,16 +117,47 @@ where
|
||||
.collect();
|
||||
|
||||
info!(
|
||||
"Applying {} resources : {}",
|
||||
"Applying {} resources : {} (force_conflicts={})",
|
||||
resource_names.len(),
|
||||
resource_names.join(", ")
|
||||
resource_names.join(", "),
|
||||
self.score.force_conflicts
|
||||
);
|
||||
topology
|
||||
let k8s = topology
|
||||
.k8s_client()
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?
|
||||
.apply_many(&self.score.resource, self.score.namespace.as_deref())
|
||||
.await?;
|
||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client : {e}")))?;
|
||||
|
||||
if self.score.force_conflicts {
|
||||
// apply_dynamic_many exposes the force_conflicts flag on
|
||||
// PatchParams that the typed apply path does not. Round-trip each
|
||||
// typed resource through JSON to land on a DynamicObject without
|
||||
// duplicating the harmony-k8s private `to_dynamic` helper.
|
||||
let dyn_objects: Vec<DynamicObject> = self
|
||||
.score
|
||||
.resource
|
||||
.iter()
|
||||
.map(|r| {
|
||||
serde_json::to_value(r)
|
||||
.map_err(|e| {
|
||||
InterpretError::new(format!(
|
||||
"Failed to serialize resource for force-apply: {e}"
|
||||
))
|
||||
})
|
||||
.and_then(|v| {
|
||||
serde_json::from_value::<DynamicObject>(v).map_err(|e| {
|
||||
InterpretError::new(format!(
|
||||
"Failed to convert resource to DynamicObject: {e}"
|
||||
))
|
||||
})
|
||||
})
|
||||
})
|
||||
.collect::<Result<Vec<_>, _>>()?;
|
||||
k8s.apply_dynamic_many(&dyn_objects, self.score.namespace.as_deref(), true)
|
||||
.await?;
|
||||
} else {
|
||||
k8s.apply_many(&self.score.resource, self.score.namespace.as_deref())
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Outcome::success(
|
||||
"Successfully applied resource".to_string(),
|
||||
|
||||
517
harmony/src/modules/storage/ceph/cluster_score.rs
Normal file
517
harmony/src/modules/storage/ceph/cluster_score.rs
Normal file
@@ -0,0 +1,517 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_trait::async_trait;
|
||||
use harmony_k8s::K8sClient;
|
||||
use k8s_openapi::api::storage::v1::StorageClass;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
use log::{debug, info, warn};
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::data::Version;
|
||||
use crate::interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome};
|
||||
use crate::inventory::Inventory;
|
||||
use crate::modules::k8s::resource::K8sResourceScore;
|
||||
use crate::score::Score;
|
||||
use crate::topology::{K8sclient, Topology};
|
||||
use harmony_types::id::Id;
|
||||
|
||||
/// Total time we'll wait for the `rook-ceph-operator` Deployment to be ready
|
||||
/// after `helm install` returns. Image pulls dominate the first run.
|
||||
const OPERATOR_READY_TIMEOUT: Duration = Duration::from_secs(300);
|
||||
|
||||
/// Total time we'll wait for the `cephclusters.ceph.rook.io` CRD to be
|
||||
/// registered in the API server's discovery surface. Helm 3 applies CRDs
|
||||
/// before other resources but the cache can lag.
|
||||
const CRD_READY_TIMEOUT: Duration = Duration::from_secs(60);
|
||||
|
||||
/// Total time we'll wait for the toolbox Deployment to come up before bailing.
|
||||
const TOOLBOX_READY_TIMEOUT: Duration = Duration::from_secs(600);
|
||||
|
||||
/// Total time we'll wait for the Ceph cluster to reach `HEALTH_OK` after the
|
||||
/// toolbox is up. Mons + mgrs + OSDs + initial PG peering on a small cluster
|
||||
/// typically lands in 5–15 min; 20 covers slower hardware.
|
||||
const HEALTH_OK_TIMEOUT: Duration = Duration::from_secs(1200);
|
||||
|
||||
/// Poll cadence for the toolbox + health waits. Cluster transitions on the
|
||||
/// order of seconds, not subsecond — no benefit to a tighter loop.
|
||||
const POLL_INTERVAL: Duration = Duration::from_secs(10);
|
||||
|
||||
use super::crd::{
|
||||
CephBlockPool, CephBlockPoolSpec, CephCluster, CephFilesystem, CephObjectStore,
|
||||
CephObjectStoreUser, FailureDomain, ReplicatedSpec,
|
||||
};
|
||||
use super::toolbox::toolbox_deployment;
|
||||
|
||||
/// Deploys a typed Rook-Ceph cluster: `CephCluster` + pools + filesystems +
|
||||
/// object stores + their consumer `StorageClass`es. Assumes the Rook operator
|
||||
/// is already installed (use [`super::RookCephOperatorScore`]).
|
||||
///
|
||||
/// Each Custom Resource is a real Rust type (`kube::CustomResource`-derived),
|
||||
/// applied via `K8sResourceScore::single` — the same apply path used by CNPG.
|
||||
///
|
||||
/// # Ordering
|
||||
/// Steps applied in this order:
|
||||
/// 1. **Wait** for the `rook-ceph-operator` Deployment to be Ready (Helm
|
||||
/// install returns before pods are up).
|
||||
/// 2. **Wait** for `cephclusters.ceph.rook.io` CRD to be discoverable and
|
||||
/// invalidate the kube-rs discovery cache.
|
||||
/// 3. Apply `CephCluster`.
|
||||
/// 4. Apply the `rook-ceph-tools` Deployment (typed, ported from the upstream
|
||||
/// `deploy/examples/toolbox.yaml`). Image is read from the CephCluster
|
||||
/// spec so it always matches the daemons.
|
||||
/// 5. **Wait** for the `rook-ceph-tools` Deployment to be Ready.
|
||||
/// 6. **Wait** for `ceph health` to return `HEALTH_OK` — mons in quorum, mgrs
|
||||
/// up, OSDs bootstrapped, initial PGs peered. Typically 5–15 min on a
|
||||
/// small cluster; capped at 20 min.
|
||||
/// 7. Apply `CephBlockPool` resources + their RBD `StorageClass`es.
|
||||
/// 8. Apply `CephFilesystem` resources + their CephFS `StorageClass`es.
|
||||
/// 9. Apply `CephObjectStore` resources.
|
||||
/// 10. Apply `CephObjectStoreUser` resources (Rook materializes their S3
|
||||
/// credentials into a `rook-ceph-object-user-<store>-<user>` Secret per
|
||||
/// user).
|
||||
///
|
||||
/// The waits in steps 1, 2, 5, and 6 mean this Score takes minutes, not
|
||||
/// seconds, to return — but downstream Scores like `CephVerifyClusterHealth`
|
||||
/// can rely on the cluster actually being ready when this returns.
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct RookCephClusterScore {
|
||||
pub namespace: String,
|
||||
pub cluster: CephCluster,
|
||||
pub block_pools: Vec<CephBlockPool>,
|
||||
pub filesystems: Vec<CephFilesystem>,
|
||||
pub object_stores: Vec<CephObjectStore>,
|
||||
pub object_store_users: Vec<CephObjectStoreUser>,
|
||||
/// Mark the first block-pool StorageClass as the cluster default.
|
||||
pub default_block_pool_storage_class: bool,
|
||||
}
|
||||
|
||||
impl RookCephClusterScore {
|
||||
/// OKD-friendly defaults: 3-mon, 2-mgr, dashboard on 8443/ssl, replicated
|
||||
/// pools size=3, useAllNodes + useAllDevices, dataDirHostPath=/var/lib/rook,
|
||||
/// one CephBlockPool named "replicapool", no filesystem or object store.
|
||||
pub fn default_okd(namespace: impl Into<String>) -> Self {
|
||||
let ns = namespace.into();
|
||||
|
||||
let mut cluster = CephCluster::default();
|
||||
cluster.metadata.name = Some(ns.clone());
|
||||
cluster.metadata.namespace = Some(ns.clone());
|
||||
cluster.spec.storage.use_all_nodes = true;
|
||||
cluster.spec.storage.use_all_devices = true;
|
||||
|
||||
let block_pool = CephBlockPool {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("replicapool".to_string()),
|
||||
namespace: Some(ns.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
spec: CephBlockPoolSpec {
|
||||
replicated: Some(ReplicatedSpec::default()),
|
||||
failure_domain: Some(FailureDomain::Host),
|
||||
..CephBlockPoolSpec::default()
|
||||
},
|
||||
};
|
||||
|
||||
Self {
|
||||
namespace: ns,
|
||||
cluster,
|
||||
block_pools: vec![block_pool],
|
||||
filesystems: vec![],
|
||||
object_stores: vec![],
|
||||
object_store_users: vec![],
|
||||
default_block_pool_storage_class: true,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + K8sclient + 'static> Score<T> for RookCephClusterScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
Box::new(RookCephClusterInterpret {
|
||||
score: self.clone(),
|
||||
})
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!("RookCephClusterScore({})", self.namespace)
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Debug)]
|
||||
struct RookCephClusterInterpret {
|
||||
score: RookCephClusterScore,
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
impl<T: Topology + K8sclient + 'static> Interpret<T> for RookCephClusterInterpret {
|
||||
async fn execute(
|
||||
&self,
|
||||
inventory: &Inventory,
|
||||
topology: &T,
|
||||
) -> Result<Outcome, InterpretError> {
|
||||
let ns = self.score.namespace.clone();
|
||||
|
||||
let k8s = topology
|
||||
.k8s_client()
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("Failed to get k8s client: {e}")))?;
|
||||
|
||||
// `helm install` (in RookCephOperatorScore) returns once resources are
|
||||
// created — not once the operator Deployment is Ready or the CRDs are
|
||||
// registered in the API server's discovery cache. Without these waits,
|
||||
// the CephCluster apply below races and typically fails with
|
||||
// "Cannot resolve GVK ceph.rook.io/v1/CephCluster".
|
||||
info!(
|
||||
"[Rook-Ceph] Waiting for rook-ceph-operator deployment in '{}'",
|
||||
ns
|
||||
);
|
||||
k8s.wait_until_deployment_ready(
|
||||
"rook-ceph-operator",
|
||||
Some(&ns),
|
||||
Some(OPERATOR_READY_TIMEOUT),
|
||||
)
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("rook-ceph-operator not ready: {e}")))?;
|
||||
|
||||
info!("[Rook-Ceph] Waiting for cephclusters.ceph.rook.io CRD registration");
|
||||
k8s.wait_for_crd("cephclusters.ceph.rook.io", Some(CRD_READY_TIMEOUT))
|
||||
.await
|
||||
.map_err(|e| InterpretError::new(format!("CephCluster CRD not registered: {e}")))?;
|
||||
// The discovery cache on our shared K8sClient was populated before the
|
||||
// operator install added the Rook CRDs. Force a refresh so the next
|
||||
// apply can resolve the new GVK.
|
||||
k8s.invalidate_discovery().await;
|
||||
|
||||
info!("[Rook-Ceph] Applying CephCluster '{}'", ns);
|
||||
K8sResourceScore::single(self.score.cluster.clone(), Some(ns.clone()))
|
||||
.with_force_conflicts(true)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
// Rook v1.19 no longer deploys the toolbox via the operator Helm chart;
|
||||
// we ship it as a typed Deployment ported from the upstream
|
||||
// `deploy/examples/toolbox.yaml`. Image follows whatever CephCluster
|
||||
// is configured with — keeps the toolbox in lockstep with the daemons.
|
||||
let toolbox_image = &self.score.cluster.spec.ceph_version.image;
|
||||
info!(
|
||||
"[Rook-Ceph] Applying rook-ceph-tools Deployment (image='{}')",
|
||||
toolbox_image
|
||||
);
|
||||
K8sResourceScore::single(
|
||||
toolbox_deployment(&ns, toolbox_image),
|
||||
Some(ns.clone()),
|
||||
)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
wait_for_toolbox_ready(&k8s, &ns).await?;
|
||||
wait_for_health_ok(&k8s, &ns).await?;
|
||||
|
||||
for (idx, pool) in self.score.block_pools.iter().enumerate() {
|
||||
let pool_name = pool
|
||||
.metadata
|
||||
.name
|
||||
.clone()
|
||||
.unwrap_or_else(|| format!("pool-{idx}"));
|
||||
info!("[Rook-Ceph] Applying CephBlockPool '{}'", pool_name);
|
||||
K8sResourceScore::single(pool.clone(), Some(ns.clone()))
|
||||
.with_force_conflicts(true)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
let sc = rbd_storage_class(
|
||||
&pool_name,
|
||||
&ns,
|
||||
idx == 0 && self.score.default_block_pool_storage_class,
|
||||
);
|
||||
info!(
|
||||
"[Rook-Ceph] Applying StorageClass '{}' (RBD on pool '{}')",
|
||||
sc.metadata.name.as_deref().unwrap_or("?"),
|
||||
pool_name
|
||||
);
|
||||
K8sResourceScore::single(sc, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
}
|
||||
|
||||
for fs in self.score.filesystems.iter() {
|
||||
let fs_name = fs
|
||||
.metadata
|
||||
.name
|
||||
.clone()
|
||||
.unwrap_or_else(|| "cephfs".to_string());
|
||||
info!("[Rook-Ceph] Applying CephFilesystem '{}'", fs_name);
|
||||
K8sResourceScore::single(fs.clone(), Some(ns.clone()))
|
||||
.with_force_conflicts(true)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
|
||||
let sc = cephfs_storage_class(&fs_name, &ns);
|
||||
info!(
|
||||
"[Rook-Ceph] Applying StorageClass '{}' (CephFS on filesystem '{}')",
|
||||
sc.metadata.name.as_deref().unwrap_or("?"),
|
||||
fs_name
|
||||
);
|
||||
K8sResourceScore::single(sc, None)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
}
|
||||
|
||||
for store in self.score.object_stores.iter() {
|
||||
let store_name = store
|
||||
.metadata
|
||||
.name
|
||||
.clone()
|
||||
.unwrap_or_else(|| "object-store".to_string());
|
||||
info!("[Rook-Ceph] Applying CephObjectStore '{}'", store_name);
|
||||
K8sResourceScore::single(store.clone(), Some(ns.clone()))
|
||||
.with_force_conflicts(true)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
}
|
||||
|
||||
for user in self.score.object_store_users.iter() {
|
||||
let user_name = user
|
||||
.metadata
|
||||
.name
|
||||
.clone()
|
||||
.unwrap_or_else(|| "object-store-user".to_string());
|
||||
info!(
|
||||
"[Rook-Ceph] Applying CephObjectStoreUser '{}' (store='{}'); \
|
||||
credentials will appear in Secret '{}'",
|
||||
user_name,
|
||||
user.spec.store,
|
||||
user.credentials_secret_name(),
|
||||
);
|
||||
K8sResourceScore::single(user.clone(), Some(ns.clone()))
|
||||
.with_force_conflicts(true)
|
||||
.interpret(inventory, topology)
|
||||
.await?;
|
||||
}
|
||||
|
||||
Ok(Outcome::success(format!(
|
||||
"Applied Rook-Ceph cluster '{}' ({} block-pool(s), {} filesystem(s), {} object-store(s), {} object-store-user(s))",
|
||||
ns,
|
||||
self.score.block_pools.len(),
|
||||
self.score.filesystems.len(),
|
||||
self.score.object_stores.len(),
|
||||
self.score.object_store_users.len(),
|
||||
)))
|
||||
}
|
||||
|
||||
fn get_name(&self) -> InterpretName {
|
||||
InterpretName::Custom("RookCephClusterInterpret")
|
||||
}
|
||||
|
||||
fn get_version(&self) -> Version {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_status(&self) -> InterpretStatus {
|
||||
todo!()
|
||||
}
|
||||
|
||||
fn get_children(&self) -> Vec<Id> {
|
||||
todo!()
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll the `rook-ceph-tools` Deployment until it has ≥1 ready replica. This
|
||||
/// is a prerequisite for any `ceph` CLI call against the cluster — including
|
||||
/// the HEALTH_OK wait below, and the existing `CephVerifyClusterHealth` Score.
|
||||
///
|
||||
/// `exec_app_capture_output` panics (`.expect("No matching pod")`) if called
|
||||
/// when no toolbox pod exists yet — so we must gate exec on this first.
|
||||
async fn wait_for_toolbox_ready(client: &Arc<K8sClient>, ns: &str) -> Result<(), InterpretError> {
|
||||
let toolbox = "rook-ceph-tools";
|
||||
info!(
|
||||
"[Rook-Ceph] Waiting for '{}' deployment in '{}' (up to {}s)",
|
||||
toolbox,
|
||||
ns,
|
||||
TOOLBOX_READY_TIMEOUT.as_secs()
|
||||
);
|
||||
let start = Instant::now();
|
||||
loop {
|
||||
match client.get_deployment(toolbox, Some(ns)).await {
|
||||
Ok(Some(dep)) => {
|
||||
let ready = dep
|
||||
.status
|
||||
.as_ref()
|
||||
.and_then(|s| s.ready_replicas)
|
||||
.unwrap_or(0);
|
||||
if ready >= 1 {
|
||||
info!("[Rook-Ceph] '{}' is ready ({} replica(s))", toolbox, ready);
|
||||
return Ok(());
|
||||
}
|
||||
debug!("[Rook-Ceph] '{}' present but 0 ready replicas", toolbox);
|
||||
}
|
||||
Ok(None) => debug!("[Rook-Ceph] '{}' deployment not yet created", toolbox),
|
||||
Err(e) => debug!("[Rook-Ceph] error checking '{}': {e}", toolbox),
|
||||
}
|
||||
if start.elapsed() > TOOLBOX_READY_TIMEOUT {
|
||||
return Err(InterpretError::new(format!(
|
||||
"Timed out after {}s waiting for '{}' deployment to be ready in '{}'. \
|
||||
Is the operator running with toolbox.enabled=true?",
|
||||
TOOLBOX_READY_TIMEOUT.as_secs(),
|
||||
toolbox,
|
||||
ns,
|
||||
)));
|
||||
}
|
||||
tokio::time::sleep(POLL_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
|
||||
/// Poll `ceph health` via the toolbox until it returns `HEALTH_OK`. Fresh
|
||||
/// clusters typically sit in `HEALTH_WARN` for a few minutes while mons reach
|
||||
/// quorum, mgrs come up, and OSDs bootstrap their PGs. Returning before
|
||||
/// `HEALTH_OK` would race with the subsequent block-pool / object-store /
|
||||
/// user applies and with `CephVerifyClusterHealth`.
|
||||
async fn wait_for_health_ok(client: &Arc<K8sClient>, ns: &str) -> Result<(), InterpretError> {
|
||||
info!(
|
||||
"[Rook-Ceph] Waiting for cluster to reach HEALTH_OK in '{}' (up to {}s)",
|
||||
ns,
|
||||
HEALTH_OK_TIMEOUT.as_secs()
|
||||
);
|
||||
let start = Instant::now();
|
||||
let mut last_status = String::new();
|
||||
loop {
|
||||
match client
|
||||
.exec_app_capture_output(
|
||||
"rook-ceph-tools".to_string(),
|
||||
"app".to_string(),
|
||||
Some(ns),
|
||||
vec!["sh", "-c", "ceph health"],
|
||||
)
|
||||
.await
|
||||
{
|
||||
Ok(out) => {
|
||||
let trimmed = out.trim().to_string();
|
||||
if trimmed.starts_with("HEALTH_OK") {
|
||||
info!("[Rook-Ceph] Cluster reached HEALTH_OK");
|
||||
return Ok(());
|
||||
}
|
||||
if trimmed != last_status {
|
||||
info!("[Rook-Ceph] ceph health: {}", trimmed);
|
||||
last_status = trimmed;
|
||||
}
|
||||
}
|
||||
Err(e) => debug!("[Rook-Ceph] ceph health exec failed: {e}"),
|
||||
}
|
||||
if start.elapsed() > HEALTH_OK_TIMEOUT {
|
||||
warn!(
|
||||
"[Rook-Ceph] Last ceph health output before timeout: {}",
|
||||
last_status
|
||||
);
|
||||
return Err(InterpretError::new(format!(
|
||||
"Timed out after {}s waiting for HEALTH_OK in '{}'. Last status: '{}'",
|
||||
HEALTH_OK_TIMEOUT.as_secs(),
|
||||
ns,
|
||||
last_status,
|
||||
)));
|
||||
}
|
||||
tokio::time::sleep(POLL_INTERVAL).await;
|
||||
}
|
||||
}
|
||||
|
||||
fn rbd_storage_class(pool_name: &str, ns: &str, is_default: bool) -> StorageClass {
|
||||
let mut params = BTreeMap::new();
|
||||
params.insert("clusterID".to_string(), ns.to_string());
|
||||
params.insert("pool".to_string(), pool_name.to_string());
|
||||
params.insert("imageFormat".to_string(), "2".to_string());
|
||||
params.insert("imageFeatures".to_string(), "layering".to_string());
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/provisioner-secret-name".to_string(),
|
||||
"rook-csi-rbd-provisioner".to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/provisioner-secret-namespace".to_string(),
|
||||
ns.to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/controller-expand-secret-name".to_string(),
|
||||
"rook-csi-rbd-provisioner".to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/controller-expand-secret-namespace".to_string(),
|
||||
ns.to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/node-stage-secret-name".to_string(),
|
||||
"rook-csi-rbd-node".to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/node-stage-secret-namespace".to_string(),
|
||||
ns.to_string(),
|
||||
);
|
||||
params.insert("csi.storage.k8s.io/fstype".to_string(), "ext4".to_string());
|
||||
|
||||
let mut annotations = BTreeMap::new();
|
||||
if is_default {
|
||||
annotations.insert(
|
||||
"storageclass.kubernetes.io/is-default-class".to_string(),
|
||||
"true".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
StorageClass {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(format!("rook-ceph-block-{pool_name}")),
|
||||
annotations: if annotations.is_empty() {
|
||||
None
|
||||
} else {
|
||||
Some(annotations)
|
||||
},
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
provisioner: format!("{ns}.rbd.csi.ceph.com"),
|
||||
parameters: Some(params),
|
||||
reclaim_policy: Some("Delete".to_string()),
|
||||
allow_volume_expansion: Some(true),
|
||||
volume_binding_mode: Some("Immediate".to_string()),
|
||||
..StorageClass::default()
|
||||
}
|
||||
}
|
||||
|
||||
fn cephfs_storage_class(fs_name: &str, ns: &str) -> StorageClass {
|
||||
let mut params = BTreeMap::new();
|
||||
params.insert("clusterID".to_string(), ns.to_string());
|
||||
params.insert("fsName".to_string(), fs_name.to_string());
|
||||
params.insert("pool".to_string(), format!("{fs_name}-data0"));
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/provisioner-secret-name".to_string(),
|
||||
"rook-csi-cephfs-provisioner".to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/provisioner-secret-namespace".to_string(),
|
||||
ns.to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/controller-expand-secret-name".to_string(),
|
||||
"rook-csi-cephfs-provisioner".to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/controller-expand-secret-namespace".to_string(),
|
||||
ns.to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/node-stage-secret-name".to_string(),
|
||||
"rook-csi-cephfs-node".to_string(),
|
||||
);
|
||||
params.insert(
|
||||
"csi.storage.k8s.io/node-stage-secret-namespace".to_string(),
|
||||
ns.to_string(),
|
||||
);
|
||||
|
||||
StorageClass {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(format!("rook-cephfs-{fs_name}")),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
provisioner: format!("{ns}.cephfs.csi.ceph.com"),
|
||||
parameters: Some(params),
|
||||
reclaim_policy: Some("Delete".to_string()),
|
||||
allow_volume_expansion: Some(true),
|
||||
volume_binding_mode: Some("Immediate".to_string()),
|
||||
..StorageClass::default()
|
||||
}
|
||||
}
|
||||
53
harmony/src/modules/storage/ceph/crd/block_pool.rs
Normal file
53
harmony/src/modules/storage/ceph/crd/block_pool.rs
Normal file
@@ -0,0 +1,53 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::shared::{ErasureCodedSpec, FailureDomain, ReplicatedSpec};
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
|
||||
#[kube(
|
||||
group = "ceph.rook.io",
|
||||
version = "v1",
|
||||
kind = "CephBlockPool",
|
||||
plural = "cephblockpools",
|
||||
namespaced = true,
|
||||
schema = "disabled"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CephBlockPoolSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub replicated: Option<ReplicatedSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub erasure_coded: Option<ErasureCodedSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub failure_domain: Option<FailureDomain>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub device_class: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub parameters: Option<BTreeMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub enable_rbd_stats: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for CephBlockPool {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata: ObjectMeta::default(),
|
||||
spec: CephBlockPoolSpec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CephBlockPoolSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
replicated: Some(ReplicatedSpec::default()),
|
||||
erasure_coded: None,
|
||||
failure_domain: Some(FailureDomain::Host),
|
||||
device_class: None,
|
||||
parameters: None,
|
||||
enable_rbd_stats: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
295
harmony/src/modules/storage/ceph/crd/cluster.rs
Normal file
295
harmony/src/modules/storage/ceph/crd/cluster.rs
Normal file
@@ -0,0 +1,295 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::shared::{PlacementSpec, VolumeClaimTemplate};
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
|
||||
#[kube(
|
||||
group = "ceph.rook.io",
|
||||
version = "v1",
|
||||
kind = "CephCluster",
|
||||
plural = "cephclusters",
|
||||
namespaced = true,
|
||||
schema = "disabled"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CephClusterSpec {
|
||||
pub ceph_version: CephVersionSpec,
|
||||
pub data_dir_host_path: String,
|
||||
pub mon: MonSpec,
|
||||
pub mgr: MgrSpec,
|
||||
pub dashboard: DashboardSpec,
|
||||
pub storage: StorageSpec,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub network: Option<NetworkSpec>,
|
||||
#[serde(skip_serializing_if = "BTreeMap::is_empty", default)]
|
||||
pub placement: BTreeMap<String, PlacementSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub crash_collector: Option<CrashCollectorSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub log_collector: Option<LogCollectorSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub remove_osds_if_out_and_safe_to_remove: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub continue_upgrade_after_checks_even_if_not_healthy: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub skip_upgrade_checks: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub wait_timeout_for_healthy_osd_in_minutes: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub disruption_management: Option<DisruptionManagementSpec>,
|
||||
/// Centralized Ceph config applied by the operator after MONs reach quorum.
|
||||
/// Outer key is the "WHO" target (e.g. `"global"`, `"osd.*"`, `"mgr.*"`,
|
||||
/// `"client.rgw.<store>"`, or a specific daemon like `"osd.0"`). Inner map
|
||||
/// is `option-name -> string-value` — Rook calls `ceph config set <who>
|
||||
/// <key> <value>` for each entry. Values must be strings even for numerics
|
||||
/// and booleans; Ceph parses them. Rook does not validate the keys.
|
||||
///
|
||||
/// Prefer [`CephClusterSpec::set_config`] for ergonomic insertion.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ceph_config: Option<BTreeMap<String, BTreeMap<String, String>>>,
|
||||
}
|
||||
|
||||
impl CephClusterSpec {
|
||||
/// Set a single centralized Ceph config entry (`ceph config set <who>
|
||||
/// <key> <value>`). Chainable. Creates the `ceph_config` map and the
|
||||
/// per-WHO sub-map on demand.
|
||||
///
|
||||
/// Common `who` values: `"global"`, `"osd.*"`, `"mon.*"`, `"mgr.*"`,
|
||||
/// `"client.rgw.<store-name>"`, or a specific daemon like `"osd.0"`.
|
||||
pub fn set_config(
|
||||
&mut self,
|
||||
who: impl Into<String>,
|
||||
key: impl Into<String>,
|
||||
value: impl Into<String>,
|
||||
) -> &mut Self {
|
||||
self.ceph_config
|
||||
.get_or_insert_with(BTreeMap::new)
|
||||
.entry(who.into())
|
||||
.or_default()
|
||||
.insert(key.into(), value.into());
|
||||
self
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CephCluster {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata: ObjectMeta::default(),
|
||||
spec: CephClusterSpec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CephClusterSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
ceph_version: CephVersionSpec::default(),
|
||||
data_dir_host_path: "/var/lib/rook".to_string(),
|
||||
mon: MonSpec::default(),
|
||||
mgr: MgrSpec::default(),
|
||||
dashboard: DashboardSpec::default(),
|
||||
storage: StorageSpec::default(),
|
||||
network: None,
|
||||
placement: BTreeMap::new(),
|
||||
crash_collector: None,
|
||||
log_collector: None,
|
||||
remove_osds_if_out_and_safe_to_remove: None,
|
||||
continue_upgrade_after_checks_even_if_not_healthy: None,
|
||||
skip_upgrade_checks: None,
|
||||
wait_timeout_for_healthy_osd_in_minutes: None,
|
||||
disruption_management: None,
|
||||
ceph_config: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CephVersionSpec {
|
||||
pub image: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub allow_unsupported: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub image_pull_policy: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for CephVersionSpec {
|
||||
fn default() -> Self {
|
||||
// Pinned to the full version+build tag explicitly recommended for
|
||||
// production by the Rook 1.19 docs (Upgrade/ceph-upgrade). The
|
||||
// date-stamped suffix locks the exact container image so the cluster
|
||||
// can't go heterogeneous across daemon restarts.
|
||||
Self {
|
||||
image: "quay.io/ceph/ceph:v19.2.3-20250717".to_string(),
|
||||
allow_unsupported: Some(false),
|
||||
image_pull_policy: Some("IfNotPresent".to_string()),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MonSpec {
|
||||
pub count: u32,
|
||||
pub allow_multiple_per_node: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub volume_claim_template: Option<VolumeClaimTemplate>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub failure_domain_label: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for MonSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
count: 3,
|
||||
allow_multiple_per_node: false,
|
||||
volume_claim_template: None,
|
||||
failure_domain_label: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MgrSpec {
|
||||
pub count: u32,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub modules: Vec<ModuleSpec>,
|
||||
}
|
||||
|
||||
impl Default for MgrSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
count: 2,
|
||||
modules: vec![ModuleSpec {
|
||||
name: "pg_autoscaler".to_string(),
|
||||
enabled: true,
|
||||
}],
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ModuleSpec {
|
||||
pub name: String,
|
||||
pub enabled: bool,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DashboardSpec {
|
||||
pub enabled: bool,
|
||||
pub ssl: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub port: Option<u16>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub url_prefix: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for DashboardSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
enabled: true,
|
||||
ssl: true,
|
||||
port: Some(8443),
|
||||
url_prefix: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StorageSpec {
|
||||
pub use_all_nodes: bool,
|
||||
pub use_all_devices: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub device_filter: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub device_path_filter: Option<String>,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub nodes: Vec<NodeSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<BTreeMap<String, String>>,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub storage_class_device_sets: Vec<StorageClassDeviceSet>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub allow_device_class_update: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub allow_osd_crush_weight_update: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NodeSpec {
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Vec::is_empty", default)]
|
||||
pub devices: Vec<DeviceSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DeviceSpec {
|
||||
pub name: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub config: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct StorageClassDeviceSet {
|
||||
pub name: String,
|
||||
pub count: u32,
|
||||
pub portable: bool,
|
||||
pub volume_claim_templates: Vec<VolumeClaimTemplate>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub placement: Option<PlacementSpec>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NetworkSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub provider: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub host_network: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub dual_stack: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ipv4: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ipv6: Option<bool>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CrashCollectorSpec {
|
||||
pub disable: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub days_to_retain: Option<u32>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct LogCollectorSpec {
|
||||
pub enabled: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub periodicity: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_log_size: Option<String>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct DisruptionManagementSpec {
|
||||
pub manage_pod_budgets: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub osd_maintenance_timeout: Option<u32>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pg_health_check_timeout: Option<u32>,
|
||||
}
|
||||
54
harmony/src/modules/storage/ceph/crd/filesystem.rs
Normal file
54
harmony/src/modules/storage/ceph/crd/filesystem.rs
Normal file
@@ -0,0 +1,54 @@
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::shared::{MetadataServerSpec, NamedPoolSpec, PoolSpec, ReplicatedSpec};
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
|
||||
#[kube(
|
||||
group = "ceph.rook.io",
|
||||
version = "v1",
|
||||
kind = "CephFilesystem",
|
||||
plural = "cephfilesystems",
|
||||
namespaced = true,
|
||||
schema = "disabled"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CephFilesystemSpec {
|
||||
pub metadata_pool: PoolSpec,
|
||||
pub data_pools: Vec<NamedPoolSpec>,
|
||||
pub metadata_server: MetadataServerSpec,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub preserve_filesystem_on_delete: Option<bool>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub preserve_pools_on_delete: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for CephFilesystem {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata: ObjectMeta::default(),
|
||||
spec: CephFilesystemSpec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CephFilesystemSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata_pool: PoolSpec {
|
||||
replicated: Some(ReplicatedSpec::default()),
|
||||
..PoolSpec::default()
|
||||
},
|
||||
data_pools: vec![NamedPoolSpec {
|
||||
name: "data0".to_string(),
|
||||
spec: PoolSpec {
|
||||
replicated: Some(ReplicatedSpec::default()),
|
||||
..PoolSpec::default()
|
||||
},
|
||||
}],
|
||||
metadata_server: MetadataServerSpec::default(),
|
||||
preserve_filesystem_on_delete: Some(true),
|
||||
preserve_pools_on_delete: Some(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
13
harmony/src/modules/storage/ceph/crd/mod.rs
Normal file
13
harmony/src/modules/storage/ceph/crd/mod.rs
Normal file
@@ -0,0 +1,13 @@
|
||||
pub mod block_pool;
|
||||
pub mod cluster;
|
||||
pub mod filesystem;
|
||||
pub mod object_store;
|
||||
pub mod object_store_user;
|
||||
pub mod shared;
|
||||
|
||||
pub use block_pool::*;
|
||||
pub use cluster::*;
|
||||
pub use filesystem::*;
|
||||
pub use object_store::*;
|
||||
pub use object_store_user::*;
|
||||
pub use shared::*;
|
||||
74
harmony/src/modules/storage/ceph/crd/object_store.rs
Normal file
74
harmony/src/modules/storage/ceph/crd/object_store.rs
Normal file
@@ -0,0 +1,74 @@
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::shared::{PlacementSpec, PoolSpec, ReplicatedSpec};
|
||||
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
|
||||
#[kube(
|
||||
group = "ceph.rook.io",
|
||||
version = "v1",
|
||||
kind = "CephObjectStore",
|
||||
plural = "cephobjectstores",
|
||||
namespaced = true,
|
||||
schema = "disabled"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CephObjectStoreSpec {
|
||||
pub metadata_pool: PoolSpec,
|
||||
pub data_pool: PoolSpec,
|
||||
pub gateway: GatewaySpec,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub preserve_pools_on_delete: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for CephObjectStore {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata: ObjectMeta::default(),
|
||||
spec: CephObjectStoreSpec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CephObjectStoreSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata_pool: PoolSpec {
|
||||
replicated: Some(ReplicatedSpec::default()),
|
||||
..PoolSpec::default()
|
||||
},
|
||||
data_pool: PoolSpec {
|
||||
replicated: Some(ReplicatedSpec::default()),
|
||||
..PoolSpec::default()
|
||||
},
|
||||
gateway: GatewaySpec::default(),
|
||||
preserve_pools_on_delete: Some(false),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct GatewaySpec {
|
||||
/// Non-secure gateway port. Defaults to 8080 to dodge OKD's <1024 bind restriction.
|
||||
pub port: u16,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub secure_port: Option<u16>,
|
||||
pub instances: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub placement: Option<PlacementSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ssl_certificate_ref: Option<String>,
|
||||
}
|
||||
|
||||
impl Default for GatewaySpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
port: 8080,
|
||||
secure_port: None,
|
||||
instances: 1,
|
||||
placement: None,
|
||||
ssl_certificate_ref: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
128
harmony/src/modules/storage/ceph/crd/object_store_user.rs
Normal file
128
harmony/src/modules/storage/ceph/crd/object_store_user.rs
Normal file
@@ -0,0 +1,128 @@
|
||||
use kube::{CustomResource, api::ObjectMeta};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
/// `CephObjectStoreUser` — provisions an RGW S3 user against a `CephObjectStore`.
|
||||
///
|
||||
/// Rook auto-creates a Kubernetes Secret named
|
||||
/// `rook-ceph-object-user-<store>-<user>` containing the user's access
|
||||
/// credentials, with these base64-encoded keys:
|
||||
/// - `AccessKey` — S3 access key ID (20 bytes)
|
||||
/// - `SecretKey` — S3 secret access key (40 bytes)
|
||||
///
|
||||
/// The Secret lives in the same namespace as the user.
|
||||
#[derive(CustomResource, Deserialize, Serialize, Clone, Debug)]
|
||||
#[kube(
|
||||
group = "ceph.rook.io",
|
||||
version = "v1",
|
||||
kind = "CephObjectStoreUser",
|
||||
plural = "cephobjectstoreusers",
|
||||
namespaced = true,
|
||||
schema = "disabled"
|
||||
)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct CephObjectStoreUserSpec {
|
||||
/// Name of the `CephObjectStore` this user belongs to.
|
||||
pub store: String,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub display_name: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub cluster_namespace: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub quotas: Option<UserQuotas>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub capabilities: Option<UserCapabilities>,
|
||||
}
|
||||
|
||||
impl Default for CephObjectStoreUser {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
metadata: ObjectMeta::default(),
|
||||
spec: CephObjectStoreUserSpec::default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for CephObjectStoreUserSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
store: String::new(),
|
||||
display_name: None,
|
||||
cluster_namespace: None,
|
||||
quotas: None,
|
||||
capabilities: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct UserQuotas {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_buckets: Option<i32>,
|
||||
/// Resource-quantity string, e.g. `"10G"`, `"500M"`.
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_size: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub max_objects: Option<i64>,
|
||||
}
|
||||
|
||||
/// RGW capability strings — each accepts `"read"`, `"write"`, `"read, write"`,
|
||||
/// or `"*"`.
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct UserCapabilities {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub user: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub bucket: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub usage: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub metadata: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub zone: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub roles: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub info: Option<String>,
|
||||
#[serde(rename = "amz-cache", skip_serializing_if = "Option::is_none")]
|
||||
pub amz_cache: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub bilog: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub mdlog: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub datalog: Option<String>,
|
||||
#[serde(rename = "user-policy", skip_serializing_if = "Option::is_none")]
|
||||
pub user_policy: Option<String>,
|
||||
#[serde(rename = "odic-provider", skip_serializing_if = "Option::is_none")]
|
||||
pub odic_provider: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub ratelimit: Option<String>,
|
||||
}
|
||||
|
||||
impl CephObjectStoreUser {
|
||||
/// Convenience constructor: a user attached to `store` with the given
|
||||
/// display name. Capabilities and quotas left unset (RGW defaults apply).
|
||||
pub fn for_store(name: &str, namespace: &str, store: &str, display_name: &str) -> Self {
|
||||
Self {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.to_string()),
|
||||
namespace: Some(namespace.to_string()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
spec: CephObjectStoreUserSpec {
|
||||
store: store.to_string(),
|
||||
display_name: Some(display_name.to_string()),
|
||||
..CephObjectStoreUserSpec::default()
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
/// Name of the auto-generated Secret carrying this user's `AccessKey` /
|
||||
/// `SecretKey` (base64-encoded).
|
||||
pub fn credentials_secret_name(&self) -> String {
|
||||
let user = self.metadata.name.as_deref().unwrap_or("");
|
||||
format!("rook-ceph-object-user-{}-{}", self.spec.store, user)
|
||||
}
|
||||
}
|
||||
117
harmony/src/modules/storage/ceph/crd/shared.rs
Normal file
117
harmony/src/modules/storage/ceph/crd/shared.rs
Normal file
@@ -0,0 +1,117 @@
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use k8s_openapi::api::core::v1::{PersistentVolumeClaim, Toleration};
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PoolSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub replicated: Option<ReplicatedSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub erasure_coded: Option<ErasureCodedSpec>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub failure_domain: Option<FailureDomain>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub device_class: Option<String>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub parameters: Option<BTreeMap<String, String>>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ReplicatedSpec {
|
||||
pub size: u32,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub require_safe_replica_size: Option<bool>,
|
||||
}
|
||||
|
||||
impl Default for ReplicatedSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
size: 3,
|
||||
require_safe_replica_size: Some(true),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct ErasureCodedSpec {
|
||||
pub data_chunks: u32,
|
||||
pub coding_chunks: u32,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "lowercase")]
|
||||
pub enum FailureDomain {
|
||||
Osd,
|
||||
Host,
|
||||
Rack,
|
||||
Zone,
|
||||
Region,
|
||||
}
|
||||
|
||||
impl Default for FailureDomain {
|
||||
fn default() -> Self {
|
||||
Self::Host
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct NamedPoolSpec {
|
||||
pub name: String,
|
||||
#[serde(flatten)]
|
||||
pub spec: PoolSpec,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct MetadataServerSpec {
|
||||
pub active_count: u32,
|
||||
pub active_standby: bool,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub placement: Option<PlacementSpec>,
|
||||
}
|
||||
|
||||
impl Default for MetadataServerSpec {
|
||||
fn default() -> Self {
|
||||
Self {
|
||||
active_count: 1,
|
||||
active_standby: true,
|
||||
placement: None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct PlacementSpec {
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub node_affinity: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pod_affinity: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub pod_anti_affinity: Option<serde_json::Value>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub tolerations: Option<Vec<Toleration>>,
|
||||
#[serde(skip_serializing_if = "Option::is_none")]
|
||||
pub topology_spread_constraints: Option<serde_json::Value>,
|
||||
}
|
||||
|
||||
#[derive(Deserialize, Serialize, Clone, Debug, Default)]
|
||||
#[serde(rename_all = "camelCase")]
|
||||
pub struct VolumeClaimTemplate {
|
||||
pub metadata: k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta,
|
||||
pub spec: k8s_openapi::api::core::v1::PersistentVolumeClaimSpec,
|
||||
}
|
||||
|
||||
impl From<PersistentVolumeClaim> for VolumeClaimTemplate {
|
||||
fn from(pvc: PersistentVolumeClaim) -> Self {
|
||||
Self {
|
||||
metadata: pvc.metadata,
|
||||
spec: pvc.spec.unwrap_or_default(),
|
||||
}
|
||||
}
|
||||
}
|
||||
@@ -1,2 +1,9 @@
|
||||
pub mod ceph_remove_osd_score;
|
||||
pub mod ceph_validate_health_score;
|
||||
pub mod cluster_score;
|
||||
pub mod crd;
|
||||
pub mod operator_score;
|
||||
pub mod toolbox;
|
||||
|
||||
pub use cluster_score::*;
|
||||
pub use operator_score::*;
|
||||
|
||||
128
harmony/src/modules/storage/ceph/operator_score.rs
Normal file
128
harmony/src/modules/storage/ceph/operator_score.rs
Normal file
@@ -0,0 +1,128 @@
|
||||
use std::collections::HashMap;
|
||||
use std::str::FromStr;
|
||||
|
||||
use non_blank_string_rs::NonBlankString;
|
||||
use serde::Serialize;
|
||||
|
||||
use crate::interpret::Interpret;
|
||||
use crate::modules::helm::chart::{HelmChartScore, HelmRepository};
|
||||
use crate::score::Score;
|
||||
use crate::topology::{HelmCommand, Topology};
|
||||
use harmony_macros::hurl;
|
||||
|
||||
/// Install the Rook-Ceph operator via its upstream Helm chart.
|
||||
///
|
||||
/// The Rook docs explicitly recommend Helm on OpenShift/OKD because the chart
|
||||
/// automatically creates the `SecurityContextConstraints` resources OKD requires.
|
||||
/// This Score wraps `HelmChartScore` against `https://charts.rook.io/release`.
|
||||
///
|
||||
/// The chart installs:
|
||||
/// - The `rook-ceph` operator Deployment
|
||||
/// - All `ceph.rook.io/v1` CRDs (CephCluster, CephBlockPool, CephFilesystem, CephObjectStore, ...)
|
||||
/// - RBAC (ServiceAccounts, Roles, ClusterRoles, RoleBindings, ClusterRoleBindings)
|
||||
/// - OpenShift SCC bindings when `hostpath_requires_privileged` is true
|
||||
///
|
||||
/// The chart does **not** install the `rook-ceph-tools` (toolbox) pod in
|
||||
/// Rook v1.19 — that's now a standalone manifest in `deploy/examples/`.
|
||||
/// `RookCephClusterScore` deploys it as a typed Rust Deployment via
|
||||
/// [`super::toolbox::toolbox_deployment`].
|
||||
///
|
||||
/// The CRs that consume these CRDs are deployed separately by
|
||||
/// `RookCephClusterScore` (typed Rust structs applied via `K8sResourceScore`),
|
||||
/// preserving compile-time type-safety on the user-facing surface.
|
||||
///
|
||||
/// # OKD requirements
|
||||
/// - `hostpath_requires_privileged` must be `true` — OpenShift's SELinux
|
||||
/// restricted-set blocks hostPath writes otherwise.
|
||||
///
|
||||
/// # Usage
|
||||
/// ```ignore
|
||||
/// use harmony::modules::storage::ceph::RookCephOperatorScore;
|
||||
/// let score = RookCephOperatorScore::default_okd();
|
||||
/// ```
|
||||
#[derive(Debug, Clone, Serialize)]
|
||||
pub struct RookCephOperatorScore {
|
||||
pub namespace: String,
|
||||
pub chart_version: Option<String>,
|
||||
pub hostpath_requires_privileged: bool,
|
||||
pub enable_rbd_driver: bool,
|
||||
pub enable_cephfs_driver: bool,
|
||||
}
|
||||
|
||||
impl RookCephOperatorScore {
|
||||
/// OKD-friendly defaults: `rook-ceph` namespace, both CSI drivers enabled,
|
||||
/// hostPath privileged mode on. Pinned to Rook 1.19.5 — the latest stable
|
||||
/// release at the time this Score was written, and the pair tested
|
||||
/// against the Ceph image default in
|
||||
/// [`super::crd::CephVersionSpec::default`].
|
||||
pub fn default_okd() -> Self {
|
||||
Self {
|
||||
namespace: "rook-ceph".to_string(),
|
||||
chart_version: Some("v1.19.5".to_string()),
|
||||
hostpath_requires_privileged: true,
|
||||
enable_rbd_driver: true,
|
||||
enable_cephfs_driver: true,
|
||||
}
|
||||
}
|
||||
|
||||
/// K3s / vanilla-K8s defaults — same as OKD but without the privileged flag.
|
||||
pub fn default_k8s() -> Self {
|
||||
Self {
|
||||
hostpath_requires_privileged: false,
|
||||
..Self::default_okd()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl Default for RookCephOperatorScore {
|
||||
fn default() -> Self {
|
||||
Self::default_okd()
|
||||
}
|
||||
}
|
||||
|
||||
impl<T: Topology + HelmCommand> Score<T> for RookCephOperatorScore {
|
||||
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
|
||||
let mut values: HashMap<NonBlankString, String> = HashMap::new();
|
||||
values.insert(
|
||||
NonBlankString::from_str("csi.enableRbdDriver").unwrap(),
|
||||
self.enable_rbd_driver.to_string(),
|
||||
);
|
||||
values.insert(
|
||||
NonBlankString::from_str("csi.enableCephfsDriver").unwrap(),
|
||||
self.enable_cephfs_driver.to_string(),
|
||||
);
|
||||
if self.hostpath_requires_privileged {
|
||||
values.insert(
|
||||
NonBlankString::from_str("hostpathRequiresPrivileged").unwrap(),
|
||||
"true".to_string(),
|
||||
);
|
||||
}
|
||||
|
||||
let chart_version = self
|
||||
.chart_version
|
||||
.as_ref()
|
||||
.map(|v| NonBlankString::from_str(v).expect("chart_version must be non-blank"));
|
||||
|
||||
let helm_score = HelmChartScore {
|
||||
namespace: Some(NonBlankString::from_str(&self.namespace).unwrap()),
|
||||
release_name: NonBlankString::from_str("rook-ceph").unwrap(),
|
||||
chart_name: NonBlankString::from_str("rook-release/rook-ceph").unwrap(),
|
||||
chart_version,
|
||||
values_overrides: Some(values),
|
||||
values_yaml: None,
|
||||
create_namespace: true,
|
||||
install_only: true,
|
||||
repository: Some(HelmRepository::new(
|
||||
"rook-release".to_string(),
|
||||
hurl!("https://charts.rook.io/release"),
|
||||
true,
|
||||
)),
|
||||
};
|
||||
|
||||
helm_score.create_interpret()
|
||||
}
|
||||
|
||||
fn name(&self) -> String {
|
||||
format!("RookCephOperatorScore({})", self.namespace)
|
||||
}
|
||||
}
|
||||
255
harmony/src/modules/storage/ceph/toolbox.rs
Normal file
255
harmony/src/modules/storage/ceph/toolbox.rs
Normal file
@@ -0,0 +1,255 @@
|
||||
//! Typed `rook-ceph-tools` Deployment.
|
||||
//!
|
||||
//! In Rook v1.19 the toolbox is *not* installed by the operator Helm chart —
|
||||
//! it's a standalone manifest at `deploy/examples/toolbox.yaml` in the rook
|
||||
//! repo. This module ports that manifest to a typed `k8s_openapi::Deployment`
|
||||
//! so [`super::RookCephClusterScore`] can apply it via `K8sResourceScore`
|
||||
//! without dropping back to raw YAML.
|
||||
//!
|
||||
//! The container image is sourced from the CephCluster spec's `cephVersion`
|
||||
//! so the toolbox stays in lockstep with the cluster's Ceph version.
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
|
||||
use k8s_openapi::api::apps::v1::{Deployment, DeploymentSpec};
|
||||
use k8s_openapi::api::core::v1::{
|
||||
Capabilities, ConfigMapVolumeSource, Container, EmptyDirVolumeSource, EnvVar, EnvVarSource,
|
||||
KeyToPath, PodSpec, PodTemplateSpec, SecretKeySelector, SecretVolumeSource, SecurityContext,
|
||||
Toleration, Volume, VolumeMount,
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta;
|
||||
|
||||
/// Inline replication of Rook's `toolbox.sh` — re-renders `/etc/ceph/ceph.conf`
|
||||
/// and keyring on mon-endpoint changes so the bundled `quay.io/ceph/ceph`
|
||||
/// image (which lacks the rook tooling) can be used directly.
|
||||
///
|
||||
/// Verbatim from `deploy/examples/toolbox.yaml` in rook/rook@v1.19.5.
|
||||
const TOOLBOX_SCRIPT: &str = r#"# Replicate the script from toolbox.sh inline so the ceph image
|
||||
# can be run directly, instead of requiring the rook toolbox
|
||||
CEPH_CONFIG="/etc/ceph/ceph.conf"
|
||||
MON_CONFIG="/etc/rook/mon-endpoints"
|
||||
KEYRING_FILE="/etc/ceph/keyring"
|
||||
CONFIG_OVERRIDE="/etc/rook-config-override/config"
|
||||
|
||||
# create a ceph config file in its default location so ceph/rados tools can be used
|
||||
# without specifying any arguments
|
||||
write_endpoints() {
|
||||
endpoints=$(cat ${MON_CONFIG})
|
||||
|
||||
# filter out the mon names
|
||||
# external cluster can have numbers or hyphens in mon names, handling them in regex
|
||||
# shellcheck disable=SC2001
|
||||
mon_endpoints=$(echo "${endpoints}"| sed 's/[a-z0-9_-]\+=//g')
|
||||
|
||||
DATE=$(date)
|
||||
echo "$DATE writing mon endpoints to ${CEPH_CONFIG}: ${endpoints}"
|
||||
cat <<EOF > ${CEPH_CONFIG}
|
||||
[global]
|
||||
mon_host = ${mon_endpoints}
|
||||
|
||||
[client.admin]
|
||||
keyring = ${KEYRING_FILE}
|
||||
EOF
|
||||
|
||||
# Merge the config override if it exists and is not empty
|
||||
if [ -f "${CONFIG_OVERRIDE}" ] && [ -s "${CONFIG_OVERRIDE}" ]; then
|
||||
echo "$DATE merging config override from ${CONFIG_OVERRIDE}"
|
||||
echo "" >> ${CEPH_CONFIG}
|
||||
cat ${CONFIG_OVERRIDE} >> ${CEPH_CONFIG}
|
||||
fi
|
||||
}
|
||||
|
||||
# watch the endpoints config file and update if the mon endpoints ever change
|
||||
watch_endpoints() {
|
||||
# get the timestamp for the target of the soft link
|
||||
real_path=$(realpath ${MON_CONFIG})
|
||||
initial_time=$(stat -c %Z "${real_path}")
|
||||
while true; do
|
||||
real_path=$(realpath ${MON_CONFIG})
|
||||
latest_time=$(stat -c %Z "${real_path}")
|
||||
|
||||
if [[ "${latest_time}" != "${initial_time}" ]]; then
|
||||
write_endpoints
|
||||
initial_time=${latest_time}
|
||||
fi
|
||||
|
||||
sleep 10
|
||||
done
|
||||
}
|
||||
|
||||
# read the secret from an env var (for backward compatibility), or from the secret file
|
||||
ceph_secret=${ROOK_CEPH_SECRET}
|
||||
if [[ "$ceph_secret" == "" ]]; then
|
||||
ceph_secret=$(cat /var/lib/rook-ceph-mon/secret.keyring)
|
||||
fi
|
||||
|
||||
# create the keyring file
|
||||
cat <<EOF > ${KEYRING_FILE}
|
||||
[${ROOK_CEPH_USERNAME}]
|
||||
key = ${ceph_secret}
|
||||
EOF
|
||||
|
||||
# write the initial config file
|
||||
write_endpoints
|
||||
|
||||
# continuously update the mon endpoints if they fail over
|
||||
watch_endpoints
|
||||
"#;
|
||||
|
||||
const TOOLBOX_NAME: &str = "rook-ceph-tools";
|
||||
const TOOLBOX_LABEL: &str = "rook-ceph-tools";
|
||||
|
||||
/// Build the canonical `rook-ceph-tools` Deployment for the given namespace
|
||||
/// and Ceph container image. Ported verbatim from rook/rook@v1.19.5's
|
||||
/// `deploy/examples/toolbox.yaml`.
|
||||
pub fn toolbox_deployment(namespace: &str, image: &str) -> Deployment {
|
||||
let labels = {
|
||||
let mut m = BTreeMap::new();
|
||||
m.insert("app".to_string(), TOOLBOX_LABEL.to_string());
|
||||
m
|
||||
};
|
||||
|
||||
let container = Container {
|
||||
name: TOOLBOX_NAME.to_string(),
|
||||
image: Some(image.to_string()),
|
||||
image_pull_policy: Some("IfNotPresent".to_string()),
|
||||
command: Some(vec![
|
||||
"/bin/bash".to_string(),
|
||||
"-c".to_string(),
|
||||
TOOLBOX_SCRIPT.to_string(),
|
||||
]),
|
||||
tty: Some(true),
|
||||
security_context: Some(SecurityContext {
|
||||
run_as_non_root: Some(true),
|
||||
run_as_user: Some(2016),
|
||||
run_as_group: Some(2016),
|
||||
capabilities: Some(Capabilities {
|
||||
drop: Some(vec!["ALL".to_string()]),
|
||||
..Capabilities::default()
|
||||
}),
|
||||
..SecurityContext::default()
|
||||
}),
|
||||
env: Some(vec![EnvVar {
|
||||
name: "ROOK_CEPH_USERNAME".to_string(),
|
||||
value_from: Some(EnvVarSource {
|
||||
secret_key_ref: Some(SecretKeySelector {
|
||||
name: "rook-ceph-mon".to_string(),
|
||||
key: "ceph-username".to_string(),
|
||||
optional: None,
|
||||
}),
|
||||
..EnvVarSource::default()
|
||||
}),
|
||||
..EnvVar::default()
|
||||
}]),
|
||||
volume_mounts: Some(vec![
|
||||
VolumeMount {
|
||||
name: "ceph-config".to_string(),
|
||||
mount_path: "/etc/ceph".to_string(),
|
||||
..VolumeMount::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "mon-endpoint-volume".to_string(),
|
||||
mount_path: "/etc/rook".to_string(),
|
||||
..VolumeMount::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "ceph-admin-secret".to_string(),
|
||||
mount_path: "/var/lib/rook-ceph-mon".to_string(),
|
||||
read_only: Some(true),
|
||||
..VolumeMount::default()
|
||||
},
|
||||
VolumeMount {
|
||||
name: "rook-config-override".to_string(),
|
||||
mount_path: "/etc/rook-config-override".to_string(),
|
||||
read_only: Some(true),
|
||||
..VolumeMount::default()
|
||||
},
|
||||
]),
|
||||
..Container::default()
|
||||
};
|
||||
|
||||
let volumes = vec![
|
||||
Volume {
|
||||
name: "ceph-admin-secret".to_string(),
|
||||
secret: Some(SecretVolumeSource {
|
||||
secret_name: Some("rook-ceph-mon".to_string()),
|
||||
optional: Some(false),
|
||||
items: Some(vec![KeyToPath {
|
||||
key: "ceph-secret".to_string(),
|
||||
path: "secret.keyring".to_string(),
|
||||
..KeyToPath::default()
|
||||
}]),
|
||||
..SecretVolumeSource::default()
|
||||
}),
|
||||
..Volume::default()
|
||||
},
|
||||
Volume {
|
||||
name: "mon-endpoint-volume".to_string(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: "rook-ceph-mon-endpoints".to_string(),
|
||||
items: Some(vec![KeyToPath {
|
||||
key: "data".to_string(),
|
||||
path: "mon-endpoints".to_string(),
|
||||
..KeyToPath::default()
|
||||
}]),
|
||||
..ConfigMapVolumeSource::default()
|
||||
}),
|
||||
..Volume::default()
|
||||
},
|
||||
Volume {
|
||||
name: "rook-config-override".to_string(),
|
||||
config_map: Some(ConfigMapVolumeSource {
|
||||
name: "rook-config-override".to_string(),
|
||||
optional: Some(true),
|
||||
..ConfigMapVolumeSource::default()
|
||||
}),
|
||||
..Volume::default()
|
||||
},
|
||||
Volume {
|
||||
name: "ceph-config".to_string(),
|
||||
empty_dir: Some(EmptyDirVolumeSource::default()),
|
||||
..Volume::default()
|
||||
},
|
||||
];
|
||||
|
||||
let pod_spec = PodSpec {
|
||||
dns_policy: Some("ClusterFirstWithHostNet".to_string()),
|
||||
service_account_name: Some("rook-ceph-default".to_string()),
|
||||
containers: vec![container],
|
||||
volumes: Some(volumes),
|
||||
tolerations: Some(vec![Toleration {
|
||||
key: Some("node.kubernetes.io/unreachable".to_string()),
|
||||
operator: Some("Exists".to_string()),
|
||||
effect: Some("NoExecute".to_string()),
|
||||
toleration_seconds: Some(5),
|
||||
..Toleration::default()
|
||||
}]),
|
||||
..PodSpec::default()
|
||||
};
|
||||
|
||||
Deployment {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(TOOLBOX_NAME.to_string()),
|
||||
namespace: Some(namespace.to_string()),
|
||||
labels: Some(labels.clone()),
|
||||
..ObjectMeta::default()
|
||||
},
|
||||
spec: Some(DeploymentSpec {
|
||||
replicas: Some(1),
|
||||
selector: LabelSelector {
|
||||
match_labels: Some(labels.clone()),
|
||||
..LabelSelector::default()
|
||||
},
|
||||
template: PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
labels: Some(labels),
|
||||
..ObjectMeta::default()
|
||||
}),
|
||||
spec: Some(pod_spec),
|
||||
},
|
||||
..DeploymentSpec::default()
|
||||
}),
|
||||
..Deployment::default()
|
||||
}
|
||||
}
|
||||
Reference in New Issue
Block a user