Compare commits

..

11 Commits

Author SHA1 Message Date
a9fe4ab267 fix: cargo fmt
All checks were successful
Run Check Script / check (pull_request) Successful in 1m0s
2025-08-25 13:33:36 -04:00
65cc9befeb mod.rs
Some checks failed
Run Check Script / check (pull_request) Failing after 20s
2025-08-25 13:31:39 -04:00
d456a1f9ee feat: score to validate whether the ceph cluster is healthy 2025-08-25 13:30:32 -04:00
d36c574590 Merge pull request 'feat/inventory_agent' (#119) from feat/inventory_agent into master
Some checks failed
Run Check Script / check (push) Failing after 38s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 5m48s
Reviewed-on: #119
2025-08-22 01:55:52 +00:00
bfca9cf163 Merge pull request 'feat/ceph-osd-score' (#116) from feat/ceph-osd-score into master
Some checks failed
Run Check Script / check (push) Failing after 36s
Compile and package harmony_composer / package_harmony_composer (push) Successful in 15m5s
Reviewed-on: #116
Reviewed-by: johnride <jg@nationtech.io>
2025-08-20 18:19:42 +00:00
cd3ea6fc10 fix: added check to ensure that rook-ceph-tools is available in the designated namespace
All checks were successful
Run Check Script / check (pull_request) Successful in 1m16s
2025-08-20 12:54:19 -04:00
89eb88d10e feat: score to remove an osd from the ceph osd tree using K8sClient to interact with rook-ceph-toolbox pod 2025-08-20 12:09:55 -04:00
72fb05b5cc fix(inventory_agent): Agent now retrieves correct dmidecode fields, fixed UUID generation (previous behavior was unacceptable), fixed storage drive parsing, much better error handling, and stricter behavior that also leads to more complete output since missing fields raise errors unless explicitly optional 2025-08-19 17:56:06 -04:00
6685b05cc5 wip(inventory_agent): Refactoring for better error handling in progress 2025-08-19 17:05:23 -04:00
d1a274b705 fix: check deployment status ready replicas rather than pod name, since the pod name does not necessarily match the deployment name and often contains a randomly generated suffix 2025-08-15 15:44:06 -04:00
b43ca7c740 feat: score for preparing a rook ceph cluster to remove a drive based on the rook-ceph-osd deployment name; added functions to K8sClient to scale a deployment to a desired replica count and to get a pod by name and namespace 2025-08-15 14:51:16 -04:00
15 changed files with 1289 additions and 614 deletions

Cargo.lock generated
View File

@@ -105,7 +105,7 @@ dependencies = [
"futures-core",
"futures-util",
"mio 1.0.4",
"socket2",
"socket2 0.5.10",
"tokio",
"tracing",
]
@@ -167,7 +167,7 @@ dependencies = [
"serde_json",
"serde_urlencoded",
"smallvec",
"socket2",
"socket2 0.5.10",
"time",
"tracing",
"url",
@@ -2178,7 +2178,6 @@ dependencies = [
"serde",
"serde_json",
"sysinfo",
"uuid",
]
[[package]]

View File

@@ -1,108 +0,0 @@
# OPNsense PXE Lab Environment
This project contains a script to automatically set up a virtual lab environment for testing PXE boot services managed by an OPNsense firewall.
## Overview
The `pxe_vm_lab_setup.sh` script will create the following resources using libvirt/KVM:
1. **A Virtual Network**: An isolated network named `harmonylan` (`virbr1`) for the lab.
2. **Two Virtual Machines**:
* `opnsense-pxe`: A firewall VM that will act as the gateway and PXE server.
* `pxe-node-1`: A client VM configured to boot from the network.
## Prerequisites
Ensure you have the following software installed on your Arch Linux host:
* `libvirt`
* `qemu`
* `virt-install` (from the `virt-install` package)
* `curl`
* `bzip2`
## Usage
### 1. Create the Environment
Run the `up` command to download the necessary images and create the network and VMs.
```bash
sudo ./pxe_vm_lab_setup.sh up
```
### 2. Install and Configure OPNsense
The OPNsense VM is created but the OS needs to be installed manually via the console.
1. **Connect to the VM console**:
```bash
sudo virsh console opnsense-pxe
```
2. **Log in as the installer**:
* Username: `installer`
* Password: `opnsense`
3. **Follow the on-screen installation wizard**. When prompted to assign network interfaces (`WAN` and `LAN`):
* Find the MAC address for the `harmonylan` interface by running this command in another terminal:
```bash
virsh domiflist opnsense-pxe
# Example output:
# Interface Type Source Model MAC
# ---------------------------------------------------------
# vnet18 network default virtio 52:54:00:b5:c4:6d
# vnet19 network harmonylan virtio 52:54:00:21:f9:ba
```
* Assign the interface connected to `harmonylan` (e.g., `vtnet1` with MAC `52:54:00:21:f9:ba`) as your **LAN**.
* Assign the other interface as your **WAN**.
4. After the installation is complete, **shut down** the VM from the console menu.
5. **Detach the installation media** by editing the VM's configuration:
```bash
sudo virsh edit opnsense-pxe
```
Find and **delete** the entire `<disk>` block corresponding to the `.img` file (the one with `<target ... bus='usb'/>`).
6. **Start the VM** to boot into the newly installed system:
```bash
sudo virsh start opnsense-pxe
```
### 3. Connect to OPNsense from Your Host
To configure OPNsense, you need to connect your host to the `harmonylan` network.
1. By default, OPNsense configures its LAN interface with the IP `192.168.1.1`.
2. Assign a compatible IP address to your host's `virbr1` bridge interface:
```bash
sudo ip addr add 192.168.1.5/24 dev virbr1
```
3. You can now access the OPNsense VM from your host:
* **SSH**: `ssh root@192.168.1.1` (password: `opnsense`)
* **Web UI**: `https://192.168.1.1`
### 4. Configure PXE Services with Harmony
With connectivity established, you can now use Harmony to configure the OPNsense firewall for PXE booting. Point your Harmony OPNsense scores to the firewall using these details:
* **Hostname/IP**: `192.168.1.1`
* **Credentials**: `root` / `opnsense`
### 5. Boot the PXE Client
Once your Harmony configuration has been applied and OPNsense is serving DHCP/TFTP, start the client VM. It will automatically attempt to boot from the network.
```bash
sudo virsh start pxe-node-1
sudo virsh console pxe-node-1
```
## Cleanup
To destroy all VMs and networks created by the script, run the `clean` command:
```bash
sudo ./pxe_vm_lab_setup.sh clean
```

View File

@@ -1,191 +0,0 @@
#!/usr/bin/env bash
set -euo pipefail
# --- Configuration ---
LAB_DIR="/var/lib/harmony_pxe_test"
IMG_DIR="${LAB_DIR}/images"
STATE_DIR="${LAB_DIR}/state"
VM_OPN="opnsense-pxe"
VM_PXE="pxe-node-1"
NET_HARMONYLAN="harmonylan"
# Network settings for the isolated LAN
VLAN_CIDR="192.168.150.0/24"
VLAN_GW="192.168.150.1"
VLAN_MASK="255.255.255.0"
# VM Specifications
RAM_OPN="2048"
VCPUS_OPN="2"
DISK_OPN_GB="10"
OS_VARIANT_OPN="freebsd14.0" # Updated to a more recent FreeBSD variant
RAM_PXE="4096"
VCPUS_PXE="2"
DISK_PXE_GB="40"
OS_VARIANT_LINUX="centos-stream9"
OPN_IMG_URL="https://mirror.ams1.nl.leaseweb.net/opnsense/releases/25.7/OPNsense-25.7-serial-amd64.img.bz2"
OPN_IMG_PATH="${IMG_DIR}/OPNsense-25.7-serial-amd64.img"
CENTOS_ISO_URL="https://mirror.stream.centos.org/9-stream/BaseOS/x86_64/os/images/boot.iso"
CENTOS_ISO_PATH="${IMG_DIR}/CentOS-Stream-9-latest-boot.iso"
CONNECT_URI="qemu:///system"
download_if_missing() {
local url="$1"
local dest="$2"
if [[ ! -f "$dest" ]]; then
echo "Downloading $url to $dest"
mkdir -p "$(dirname "$dest")"
local tmp
tmp="$(mktemp)"
curl -L --progress-bar "$url" -o "$tmp"
case "$url" in
*.bz2) bunzip2 -c "$tmp" > "$dest" && rm -f "$tmp" ;;
*) mv "$tmp" "$dest" ;;
esac
else
echo "Already present: $dest"
fi
}
# Ensures a libvirt network is defined and active
ensure_network() {
local net_name="$1"
local net_xml_path="$2"
if virsh --connect "${CONNECT_URI}" net-info "${net_name}" >/dev/null 2>&1; then
echo "Network ${net_name} already exists."
else
echo "Defining network ${net_name} from ${net_xml_path}"
virsh --connect "${CONNECT_URI}" net-define "${net_xml_path}"
fi
if ! virsh --connect "${CONNECT_URI}" net-info "${net_name}" | grep "Active: *yes"; then
echo "Starting network ${net_name}..."
virsh --connect "${CONNECT_URI}" net-start "${net_name}"
virsh --connect "${CONNECT_URI}" net-autostart "${net_name}"
fi
}
# Destroys a VM completely
destroy_vm() {
local vm_name="$1"
if virsh --connect "${CONNECT_URI}" dominfo "$vm_name" >/dev/null 2>&1; then
echo "Destroying and undefining VM: ${vm_name}"
virsh --connect "${CONNECT_URI}" destroy "$vm_name" || true
virsh --connect "${CONNECT_URI}" undefine "$vm_name" --nvram
fi
}
# Destroys a libvirt network
destroy_network() {
local net_name="$1"
if virsh --connect "${CONNECT_URI}" net-info "$net_name" >/dev/null 2>&1; then
echo "Destroying and undefining network: ${net_name}"
virsh --connect "${CONNECT_URI}" net-destroy "$net_name" || true
virsh --connect "${CONNECT_URI}" net-undefine "$net_name"
fi
}
# --- Main Logic ---
create_lab_environment() {
# Create network definition files
cat > "${STATE_DIR}/default.xml" <<EOF
<network>
<name>default</name>
<forward mode='nat'/>
<bridge name='virbr0' stp='on' delay='0'/>
<ip address='192.168.122.1' netmask='255.255.255.0'>
<dhcp>
<range start='192.168.122.100' end='192.168.122.200'/>
</dhcp>
</ip>
</network>
EOF
cat > "${STATE_DIR}/${NET_HARMONYLAN}.xml" <<EOF
<network>
<name>${NET_HARMONYLAN}</name>
<bridge name='virbr1' stp='on' delay='0'/>
</network>
EOF
# Ensure both networks exist and are active
ensure_network "default" "${STATE_DIR}/default.xml"
ensure_network "${NET_HARMONYLAN}" "${STATE_DIR}/${NET_HARMONYLAN}.xml"
# --- Create OPNsense VM (MODIFIED SECTION) ---
local disk_opn="${IMG_DIR}/${VM_OPN}.qcow2"
if [[ ! -f "$disk_opn" ]]; then
qemu-img create -f qcow2 "$disk_opn" "${DISK_OPN_GB}G"
fi
echo "Creating OPNsense VM using serial image..."
virt-install \
--connect "${CONNECT_URI}" \
--name "${VM_OPN}" \
--ram "${RAM_OPN}" \
--vcpus "${VCPUS_OPN}" \
--cpu host-passthrough \
--os-variant "${OS_VARIANT_OPN}" \
--graphics none \
--noautoconsole \
--disk path="${disk_opn}",device=disk,bus=virtio,boot.order=1 \
--disk path="${OPN_IMG_PATH}",device=disk,bus=usb,readonly=on,boot.order=2 \
--network network=default,model=virtio \
--network network="${NET_HARMONYLAN}",model=virtio \
--boot uefi,menu=on
echo "OPNsense VM created. Connect with: sudo virsh console ${VM_OPN}"
echo "The VM will boot from the serial installation image."
echo "Login with user 'installer' and password 'opnsense' to start the installation."
echo "Install onto the VirtIO disk (vtbd0)."
echo "After installation, shutdown the VM, then run 'sudo virsh edit ${VM_OPN}' and remove the USB disk block to boot from the installed system."
# --- Create PXE Client VM ---
local disk_pxe="${IMG_DIR}/${VM_PXE}.qcow2"
if [[ ! -f "$disk_pxe" ]]; then
qemu-img create -f qcow2 "$disk_pxe" "${DISK_PXE_GB}G"
fi
echo "Creating PXE client VM..."
virt-install \
--connect "${CONNECT_URI}" \
--name "${VM_PXE}" \
--ram "${RAM_PXE}" \
--vcpus "${VCPUS_PXE}" \
--cpu host-passthrough \
--os-variant "${OS_VARIANT_LINUX}" \
--graphics none \
--noautoconsole \
--disk path="${disk_pxe}",format=qcow2,bus=virtio \
--network network="${NET_HARMONYLAN}",model=virtio \
--pxe \
--boot uefi,menu=on
echo "PXE VM created. It will attempt to netboot on ${NET_HARMONYLAN}."
}
# --- Script Entrypoint ---
case "${1:-}" in
up)
mkdir -p "${IMG_DIR}" "${STATE_DIR}"
download_if_missing "$OPN_IMG_URL" "$OPN_IMG_PATH"
download_if_missing "$CENTOS_ISO_URL" "$CENTOS_ISO_PATH"
create_lab_environment
echo "Lab setup complete. Use 'sudo virsh list --all' to see VMs."
;;
clean)
destroy_vm "${VM_PXE}"
destroy_vm "${VM_OPN}"
destroy_network "${NET_HARMONYLAN}"
# Optionally destroy the default network if you want a full reset
# destroy_network "default"
echo "Cleanup complete."
;;
*)
echo "Usage: sudo $0 {up|clean}"
exit 1
;;
esac

View File

@@ -0,0 +1,11 @@
[package]
name = "example_validate_ceph_cluster_health"
edition = "2024"
version.workspace = true
readme.workspace = true
license.workspace = true
[dependencies]
harmony = { version = "0.1.0", path = "../../harmony" }
harmony_cli = { version = "0.1.0", path = "../../harmony_cli" }
tokio.workspace = true

View File

@@ -0,0 +1,18 @@
use harmony::{
inventory::Inventory,
modules::storage::ceph::ceph_validate_health_score::CephVerifyClusterHealth,
topology::K8sAnywhereTopology,
};
#[tokio::main]
async fn main() {
let ceph_health_score = CephVerifyClusterHealth {
rook_ceph_namespace: "rook-ceph".to_string(),
};
let topology = K8sAnywhereTopology::from_env();
let inventory = Inventory::autoload();
harmony_cli::run(inventory, topology, vec![Box::new(ceph_health_score)], None)
.await
.unwrap();
}

View File

@@ -32,6 +32,7 @@ pub enum InterpretName {
Lamp,
ApplicationMonitoring,
K8sPrometheusCrdAlerting,
CephClusterHealth,
}
impl std::fmt::Display for InterpretName {
@@ -58,6 +59,7 @@ impl std::fmt::Display for InterpretName {
InterpretName::Lamp => f.write_str("LAMP"),
InterpretName::ApplicationMonitoring => f.write_str("ApplicationMonitoring"),
InterpretName::K8sPrometheusCrdAlerting => f.write_str("K8sPrometheusCrdAlerting"),
InterpretName::CephClusterHealth => f.write_str("CephClusterHealth"),
}
}
}

View File

@@ -5,7 +5,7 @@ use k8s_openapi::{
};
use kube::{
Client, Config, Error, Resource,
api::{Api, AttachParams, ListParams, Patch, PatchParams, ResourceExt},
api::{Api, AttachParams, DeleteParams, ListParams, Patch, PatchParams, ResourceExt},
config::{KubeConfigOptions, Kubeconfig},
core::ErrorResponse,
runtime::reflector::Lookup,
@@ -17,7 +17,9 @@ use kube::{
};
use log::{debug, error, trace};
use serde::{Serialize, de::DeserializeOwned};
use serde_json::json;
use similar::TextDiff;
use tokio::io::AsyncReadExt;
#[derive(new, Clone)]
pub struct K8sClient {
@@ -51,6 +53,66 @@ impl K8sClient {
})
}
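/// Fetch a Deployment by name, returning `Ok(None)` if it does not exist.
/// Looks in the given namespace, or the client's default namespace when `None`.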
pub async fn get_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<Option<Deployment>, Error> {
let deps: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(deps.get_opt(name).await?)
}
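/// Fetch a Pod by name, returning `Ok(None)` if it does not exist.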
pub async fn get_pod(&self, name: &str, namespace: Option<&str>) -> Result<Option<Pod>, Error> {
let pods: Api<Pod> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
Ok(pods.get_opt(name).await?)
}
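/// Scale a Deployment to `replicas` by patching its Scale subresource.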
pub async fn scale_deployment(
&self,
name: &str,
namespace: Option<&str>,
replicas: u32,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let patch = json!({
"spec": {
"replicas": replicas
}
});
let pp = PatchParams::default();
// Merge patch: a server-side apply patch would require a field manager.
let scale = Patch::Merge(&patch);
deployments.patch_scale(name, &pp, &scale).await?;
Ok(())
}
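/// Delete a Deployment by name in the given (or default) namespace.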
pub async fn delete_deployment(
&self,
name: &str,
namespace: Option<&str>,
) -> Result<(), Error> {
let deployments: Api<Deployment> = if let Some(ns) = namespace {
Api::namespaced(self.client.clone(), ns)
} else {
Api::default_namespaced(self.client.clone())
};
let delete_params = DeleteParams::default();
deployments.delete(name, &delete_params).await?;
Ok(())
}
pub async fn wait_until_deployment_ready(
&self,
name: String,
@@ -76,6 +138,68 @@ impl K8sClient {
}
}
/// Will execute a command in the first pod found that matches the specified label
/// '{label}={name}'
pub async fn exec_app_capture_output(
&self,
name: String,
label: String,
namespace: Option<&str>,
command: Vec<&str>,
) -> Result<String, String> {
let api: Api<Pod>;
if let Some(ns) = namespace {
api = Api::namespaced(self.client.clone(), ns);
} else {
api = Api::default_namespaced(self.client.clone());
}
let pod_list = api
.list(&ListParams::default().labels(format!("{label}={name}").as_str()))
.await
.expect("couldn't get list of pods");
let res = api
.exec(
pod_list
.items
.first()
.expect("couldn't get pod")
.name()
.expect("couldn't get pod name")
.into_owned()
.as_str(),
command,
&AttachParams::default().stdout(true).stderr(true),
)
.await;
match res {
Err(e) => Err(e.to_string()),
Ok(mut process) => {
let status = process
.take_status()
.expect("Couldn't get status")
.await
.expect("Couldn't unwrap status");
if let Some(s) = status.status {
let mut stdout_buf = String::new();
if let Some(mut stdout) = process.stdout().take() {
let _ = stdout.read_to_string(&mut stdout_buf).await;
}
debug!("Status: {} - {:?}", s, status.details);
if s == "Success" {
Ok(stdout_buf)
} else {
Err(s)
}
} else {
Err("Couldn't get inner status of pod exec".to_string())
}
}
}
}
/// Will execute a command in the first pod found that matches the label `app.kubernetes.io/name={name}`
pub async fn exec_app(
&self,

View File

@@ -14,5 +14,6 @@ pub mod monitoring;
pub mod okd;
pub mod opnsense;
pub mod prometheus;
pub mod storage;
pub mod tenant;
pub mod tftp;

View File

@@ -0,0 +1,419 @@
use std::{
process::Command,
sync::Arc,
time::{Duration, Instant},
};
use async_trait::async_trait;
use log::{info, warn};
use serde::{Deserialize, Serialize};
use tokio::time::sleep;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
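/// Score that removes a single OSD from a Rook Ceph cluster: it scales the
/// targeted `rook-ceph-osd-<id>` Deployment to zero, deletes it, then purges
/// the OSD from the Ceph tree via the rook-ceph-tools pod.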
#[derive(Debug, Clone, Serialize)]
pub struct CephRemoveOsd {
osd_deployment_name: String,
rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephRemoveOsd {
fn name(&self) -> String {
format!("CephRemoveOsdScore")
}
#[doc(hidden)]
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(CephRemoveOsdInterpret {
score: self.clone(),
})
}
}
#[derive(Debug, Clone)]
pub struct CephRemoveOsdInterpret {
score: CephRemoveOsd,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for CephRemoveOsdInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.verify_ceph_toolbox_exists(client.clone()).await?;
self.scale_deployment(client.clone()).await?;
self.verify_deployment_scaled(client.clone()).await?;
self.delete_deployment(client.clone()).await?;
self.verify_deployment_deleted(client.clone()).await?;
let osd_id_full = self.get_ceph_osd_id().unwrap();
self.purge_ceph_osd(client.clone(), &osd_id_full).await?;
self.verify_ceph_osd_removal(client.clone(), &osd_id_full)
.await?;
Ok(Outcome::success(format!(
"Successfully removed OSD {} from rook-ceph cluster by deleting deployment {}",
osd_id_full, self.score.osd_deployment_name
)))
}
fn get_name(&self) -> InterpretName {
todo!()
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl CephRemoveOsdInterpret {
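/// Derive the Ceph OSD id (e.g. `osd.3`) from a deployment name of the form
/// `rook-ceph-osd-<id>`.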
pub fn get_ceph_osd_id(&self) -> Result<String, InterpretError> {
let osd_id_numeric = self
.score
.osd_deployment_name
.split('-')
.nth(3)
.ok_or_else(|| {
InterpretError::new(format!(
"Could not parse OSD id from deployment name {}",
self.score.osd_deployment_name
))
})?;
let osd_id_full = format!("osd.{}", osd_id_numeric);
info!(
"Targeting Ceph OSD: {} (parsed from deployment {})",
osd_id_full, self.score.osd_deployment_name
);
Ok(osd_id_full)
}
pub async fn verify_ceph_toolbox_exists(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let toolbox_dep = "rook-ceph-tools".to_string();
match client
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&toolbox_dep, ready_count
)));
} else {
return Err(InterpretError::new(
"ceph-tool-box not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {}",
&toolbox_dep
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&toolbox_dep, self.score.rook_ceph_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&toolbox_dep, e
))),
}
}
pub async fn scale_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Scaling down OSD deployment: {}",
self.score.osd_deployment_name
);
client
.scale_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
0,
)
.await?;
Ok(Outcome::success(format!(
"Scaled down deployment {}",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_scaled(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if let Some(deployment) = dep {
if let Some(status) = deployment.status {
// `status.replicas` is omitted once the deployment has scaled to zero,
// so treat a missing value as 0.
if status.replicas.unwrap_or(0) == 0 && status.ready_replicas.unwrap_or(0) == 0
{
return Ok(Outcome::success(
"Deployment successfully scaled down.".to_string(),
));
}
}
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to scale down",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
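/// Polling parameters shared by the verification loops: 120s timeout, 5s interval.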
fn build_timer(&self) -> (Duration, Duration, Instant) {
let timeout = Duration::from_secs(120);
let interval = Duration::from_secs(5);
let start = Instant::now();
(timeout, interval, start)
}
pub async fn delete_deployment(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
info!(
"Deleting OSD deployment: {}",
self.score.osd_deployment_name
);
client
.delete_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
Ok(Outcome::success(format!(
"deployment {} deleted",
self.score.osd_deployment_name
)))
}
pub async fn verify_deployment_deleted(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!("Waiting for OSD deployment to scale down to 0 replicas");
loop {
let dep = client
.get_deployment(
&self.score.osd_deployment_name,
Some(&self.score.rook_ceph_namespace),
)
.await?;
if dep.is_none() {
info!(
"Deployment {} successfully deleted.",
self.score.osd_deployment_name
);
return Ok(Outcome::success(format!(
"Deployment {} deleted.",
self.score.osd_deployment_name
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for deployment {} to be deleted",
self.score.osd_deployment_name
)));
}
sleep(interval).await;
}
}
fn get_osd_tree(&self, json: serde_json::Value) -> Result<CephOsdTree, InterpretError> {
let nodes = json.get("nodes").ok_or_else(|| {
InterpretError::new("Missing 'nodes' field in ceph osd tree JSON".to_string())
})?;
let tree: CephOsdTree = CephOsdTree {
nodes: serde_json::from_value(nodes.clone()).map_err(|e| {
InterpretError::new(format!("Failed to parse ceph osd tree JSON: {}", e))
})?,
};
Ok(tree)
}
pub async fn purge_ceph_osd(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
info!(
"Purging OSD {} from Ceph cluster and removing its auth key",
osd_id_full
);
client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec![
"sh",
"-c",
format!("ceph osd purge {osd_id_full} --yes-i-really-mean-it && ceph auth del {osd_id_full}")
.as_str(),
],
)
.await?;
Ok(Outcome::success(format!(
"osd id {} removed from osd tree",
osd_id_full
)))
}
pub async fn verify_ceph_osd_removal(
&self,
client: Arc<K8sClient>,
osd_id_full: &str,
) -> Result<Outcome, InterpretError> {
let (timeout, interval, start) = self.build_timer();
info!(
"Verifying OSD {} has been removed from the Ceph tree...",
osd_id_full
);
loop {
let output = client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["ceph osd tree -f json"],
)
.await?;
let tree =
self.get_osd_tree(serde_json::from_str(&output).expect("could not extract json"));
let osd_found = tree
.unwrap()
.nodes
.iter()
.any(|node| node.name == osd_id_full);
if !osd_found {
return Ok(Outcome::success(format!(
"Successfully verified that OSD {} is removed from the Ceph cluster.",
osd_id_full,
)));
}
if start.elapsed() > timeout {
return Err(InterpretError::new(format!(
"Timed out waiting for OSD {} to be removed from Ceph tree",
osd_id_full
)));
}
warn!(
"OSD {} still found in Ceph tree, retrying in {:?}...",
osd_id_full, interval
);
sleep(interval).await;
}
}
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephOsdTree {
pub nodes: Vec<CephNode>,
}
#[derive(Debug, Deserialize, PartialEq)]
pub struct CephNode {
pub id: i32,
pub name: String,
#[serde(rename = "type")]
pub node_type: String,
pub type_id: Option<i32>,
pub children: Option<Vec<i32>>,
pub exists: Option<i32>,
pub status: Option<String>,
}
#[cfg(test)]
mod tests {
use serde_json::json;
use super::*;
#[test]
fn test_get_osd_tree() {
let json_data = json!({
"nodes": [
{"id": 1, "name": "osd.1", "type": "osd", "primary_affinity":"1"},
{"id": 2, "name": "osd.2", "type": "osd", "crush_weight": 1.22344}
]
});
let interpret = CephRemoveOsdInterpret {
score: CephRemoveOsd {
osd_deployment_name: "osd-1".to_string(),
rook_ceph_namespace: "dummy_ns".to_string(),
},
};
let json = interpret.get_osd_tree(json_data).unwrap();
let expected = CephOsdTree {
nodes: vec![
CephNode {
id: 1,
name: "osd.1".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
CephNode {
id: 2,
name: "osd.2".to_string(),
node_type: "osd".to_string(),
type_id: None,
children: None,
exists: None,
status: None,
},
],
};
assert_eq!(json, expected);
}
}

View File

@@ -0,0 +1,136 @@
use std::{sync::Arc, time::Duration};
use async_trait::async_trait;
use log::debug;
use serde::Serialize;
use tokio::time::Instant;
use crate::{
data::{Id, Version},
interpret::{Interpret, InterpretError, InterpretName, InterpretStatus, Outcome},
inventory::Inventory,
score::Score,
topology::{K8sclient, Topology, k8s::K8sClient},
};
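/// Score that verifies the Rook Ceph cluster reports `HEALTH_OK`, using the
/// rook-ceph-tools deployment in the configured namespace.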
#[derive(Clone, Debug, Serialize)]
pub struct CephVerifyClusterHealth {
pub rook_ceph_namespace: String,
}
impl<T: Topology + K8sclient> Score<T> for CephVerifyClusterHealth {
fn name(&self) -> String {
format!("CephValidateClusterHealth")
}
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(CephVerifyClusterHealthInterpret {
score: self.clone(),
})
}
}
#[derive(Clone, Debug)]
pub struct CephVerifyClusterHealthInterpret {
score: CephVerifyClusterHealth,
}
#[async_trait]
impl<T: Topology + K8sclient> Interpret<T> for CephVerifyClusterHealthInterpret {
async fn execute(
&self,
_inventory: &Inventory,
topology: &T,
) -> Result<Outcome, InterpretError> {
let client = topology.k8s_client().await.unwrap();
self.verify_ceph_toolbox_exists(client.clone()).await?;
self.validate_ceph_cluster_health(client.clone()).await?;
Ok(Outcome::success("Ceph cluster healthy".to_string()))
}
fn get_name(&self) -> InterpretName {
InterpretName::CephClusterHealth
}
fn get_version(&self) -> Version {
todo!()
}
fn get_status(&self) -> InterpretStatus {
todo!()
}
fn get_children(&self) -> Vec<Id> {
todo!()
}
}
impl CephVerifyClusterHealthInterpret {
pub async fn verify_ceph_toolbox_exists(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
let toolbox_dep = "rook-ceph-tools".to_string();
match client
.get_deployment(&toolbox_dep, Some(&self.score.rook_ceph_namespace))
.await
{
Ok(Some(deployment)) => {
if let Some(status) = deployment.status {
let ready_count = status.ready_replicas.unwrap_or(0);
if ready_count >= 1 {
return Ok(Outcome::success(format!(
"'{}' is ready with {} replica(s).",
&toolbox_dep, ready_count
)));
} else {
return Err(InterpretError::new(
"ceph-tool-box not ready in cluster".to_string(),
));
}
} else {
Err(InterpretError::new(format!(
"failed to get deployment status {}",
&toolbox_dep
)))
}
}
Ok(None) => Err(InterpretError::new(format!(
"Deployment '{}' not found in namespace '{}'.",
&toolbox_dep, self.score.rook_ceph_namespace
))),
Err(e) => Err(InterpretError::new(format!(
"Failed to query for deployment '{}': {}",
&toolbox_dep, e
))),
}
}
pub async fn validate_ceph_cluster_health(
&self,
client: Arc<K8sClient>,
) -> Result<Outcome, InterpretError> {
debug!("Verifying ceph cluster is in healthy state");
let health = client
.exec_app_capture_output(
"rook-ceph-tools".to_string(),
"app".to_string(),
Some(&self.score.rook_ceph_namespace),
vec!["sh", "-c", "ceph health"],
)
.await?;
if health.contains("HEALTH_OK") {
return Ok(Outcome::success(
"Ceph Cluster in healthy state".to_string(),
));
} else {
Err(InterpretError::new(format!(
"Ceph cluster unhealthy {}",
health
)))
}
}
}

View File

@@ -0,0 +1,2 @@
pub mod ceph_osd_replacement_score;
pub mod ceph_validate_health_score;

View File

@@ -0,0 +1 @@
pub mod ceph;

View File

@@ -10,4 +10,3 @@ serde.workspace = true
serde_json.workspace = true
log.workspace = true
env_logger.workspace = true
uuid.workspace = true

File diff suppressed because it is too large

View File

@@ -9,8 +9,16 @@ mod hwinfo;
async fn inventory() -> impl Responder {
log::info!("Received inventory request");
let host = PhysicalHost::gather();
log::info!("Inventory data gathered successfully");
actix_web::HttpResponse::Ok().json(host)
match host {
Ok(host) => {
log::info!("Inventory data gathered successfully");
actix_web::HttpResponse::Ok().json(host)
}
Err(error) => {
log::error!("Inventory data gathering FAILED");
actix_web::HttpResponse::InternalServerError().json(error)
}
}
}
#[actix_web::main]