From 756a010adcba6ee4a6daafa5610b0519bada8092 Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Sun, 3 May 2026 09:47:21 -0400 Subject: [PATCH 1/2] feat: iobench redpanda profile to run the recommended fio settings by redpanda on a k8s storage backend --- .gitignore | 2 + iobench/Cargo.toml | 3 +- iobench/PREFLIGHT_CHECKS.md | 68 +++++ iobench/README.md | 146 +++++++++++ iobench/dash/iobench-dash.py | 66 +++-- iobench/src/fio.rs | 100 ++++++++ iobench/src/k8s.rs | 193 ++++++++++++++ iobench/src/main.rs | 246 ++++++------------ iobench/src/redpanda.rs | 476 +++++++++++++++++++++++++++++++++++ iobench/src/simple.rs | 123 +++++++++ 10 files changed, 1215 insertions(+), 208 deletions(-) create mode 100644 iobench/PREFLIGHT_CHECKS.md create mode 100644 iobench/README.md create mode 100644 iobench/src/fio.rs create mode 100644 iobench/src/k8s.rs create mode 100644 iobench/src/redpanda.rs create mode 100644 iobench/src/simple.rs diff --git a/.gitignore b/.gitignore index 86ff3596..1d709c2d 100644 --- a/.gitignore +++ b/.gitignore @@ -32,3 +32,5 @@ ignore # Generated book book + +__pycache__ diff --git a/iobench/Cargo.toml b/iobench/Cargo.toml index 7b3daaa3..5876ba8c 100644 --- a/iobench/Cargo.toml +++ b/iobench/Cargo.toml @@ -11,7 +11,8 @@ clap = { version = "4.0", features = ["derive"] } chrono = "0.4" serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" +serde_yaml = { workspace = true } csv = "1.1" num_cpus = "1.13" +k8s-openapi = { workspace = true } -[workspace] diff --git a/iobench/PREFLIGHT_CHECKS.md b/iobench/PREFLIGHT_CHECKS.md new file mode 100644 index 00000000..da5eca73 --- /dev/null +++ b/iobench/PREFLIGHT_CHECKS.md @@ -0,0 +1,68 @@ +# Redpanda iobench preflight checklist + +Cluster inspection performed: 2026-05-03 +Context: maintenance window, other workloads turned off, Ceph pool at ~0 IOPS idle. + +--- + +## Cluster topology + +- [x] **PASS** — 3 worker nodes available (wk0, wk1, wk2). Matches `--replicas 3` default. 
+- [x] **PASS** — All 3 workers are Ready, no DiskPressure / MemoryPressure / PIDPressure. +- [x] **PASS** — Control plane nodes (cp3, cp4, cp5) have `NoSchedule` taint. iobench pods cannot land there. +- [x] **PASS** — All nodes carry `kubernetes.io/hostname` label. Anti-affinity topology key will work. +- [x] **PASS** — Worker nodes are untainted and have no scheduling restrictions. + +## Storage + +- [x] **PASS** — StorageClass `ceph-block` exists, `volumeBindingMode: Immediate`, `reclaimPolicy: Delete`. +- [x] **PASS** — Ceph pool `ceph-blockpool` replication: size=3, min_size=2. +- [x] **PASS** — 3 OSDs all Running (osd-0, osd-1, osd-2). +- [x] **PASS** — Raw capacity: 503 GiB available. 3x50GiB PVCs = 150 GiB usable = 450 GiB raw. Leaves 53 GiB raw headroom (~10.5%). Tight but sufficient for a benchmark run. +- [x] **PASS** — Largest single-workload disk footprint: `throughput` with numjobs=4 x size=10G = 40 GiB per PVC. Fits in 50 GiB PVC with 10 GiB headroom for logs and filesystem overhead. +- [x] **PASS** — Pool is idle ("nothing is going on"), OSD commit/apply latency showing 0 ms. Confirms maintenance window baseline. +- [x] **PASS** — All 3 disks are actually Intel SSDs (misidentified as HDD by Ceph due to HP RAID controller passthrough). No mixed-media concern. +- [x] **PASS (non-blocking)** — Ceph health is `HEALTH_WARN: too many PGs per OSD (265 > max 250)`. This is a pre-existing tuning issue unrelated to the benchmark. Does not affect correctness of results. Note: if this were a data-integrity warning (e.g. `HEALTH_WARN: degraded PGs`), the benchmark must not proceed. + +## Namespace and resource constraints + +- [x] **PASS** — `default` namespace exists. +- [x] **PASS** — No ResourceQuota or LimitRange in `default` namespace. Pod creation won't be blocked. +- [x] **PASS** — No leftover `iobench-redpanda` StatefulSet or PVCs in `default` namespace. Clean slate. 
+ +## Container image + +- [x] **PASS** — `juicedata/fio:latest` pulls successfully and runs on this cluster. +- [x] **PASS** — fio version 3.18 confirmed. Supports `fdatasync=1`, `log_avg_msec`, `write_lat_log`, `write_iops_log`, `--output-format=json`. +- [x] **PASS** — Image contains `sh`, `date` (needed for wall-clock barrier), and `tar` (needed for `kubectl cp`). + +## Clock synchronization (barrier correctness) + +- [x] **PASS** — Node heartbeat timestamps are within ~5s of each other. The 60-second barrier (`start_at = now_epoch() + 60`) provides ample margin. Barrier only fails if clock skew exceeds 60s. + +## Code safety review + +- [x] **PASS** — All fio workloads use `direct=1` (bypass page cache). Benchmark measures Ceph, not RAM. +- [x] **PASS** — All writes target `/data/iobench_testfile` on the mounted PVC filesystem, not a raw block device. No risk of corrupting the node OS disk. +- [x] **PASS** — Pods run with default security context (no privileged, no hostPath, no hostNetwork). Blast radius is limited to the PVCs. +- [x] **PASS** — `reclaimPolicy: Delete` means PVCs and their backing RBD images are fully cleaned up when deleted. No storage leak after `undeploy`. +- [x] **PASS** — `delete_resources` targets only `statefulset,pvc` with label `app=iobench-redpanda`. Cannot accidentally delete unrelated resources. +- [x] **PASS** — Parallel mode prints a clear warning before starting. +- [x] **PASS** — `--keep-deployment` flag available for debugging without re-provisioning. +- [x] **PASS** — Workloads run sequentially within each mode (throughput, then fsync_hot_path, then selftest_512k, then selftest_4k_qd1). Disk usage doesn't accumulate across workloads since they reuse the same filename. + +## Resource consumption during run + +- [x] **PASS** — Workers have ample CPU headroom (3-6% current usage). fio with 4 jobs + iodepth 16 is not CPU-intensive on modern hardware. +- [x] **PASS** — Workers have ample memory headroom (5-10% current usage). 
fio with `direct=1` uses minimal RAM. +- [x] **PASS** — No CPU/memory resource limits or requests set on the fio container. This is intentional — resource limits would throttle the benchmark and distort results. Acceptable during a maintenance window with other workloads off. + +## Risks acknowledged + +- **Ceph pool impact**: Parallel mode will saturate the Ceph pool. This is the point of the test. Confirmed other workloads are off and pool is at ~0 IOPS. +- **Capacity headroom is tight**: 53 GiB raw remaining after PVC provisioning (~10.5%). If the cluster has background operations that consume space (e.g., snapshots, compaction), this could trigger a `HEALTH_ERR: full` condition. Mitigated by: maintenance window, no other workloads, and `reclaimPolicy: Delete` ensuring cleanup. +- **`--wait=false` on delete**: `delete_resources` uses `--wait=false`, so `undeploy` returns before PVCs are fully reclaimed. This is fine — Ceph handles RBD image deletion asynchronously. But if re-running immediately after undeploy, PVCs from the previous run may still be terminating. Mitigated by: the deploy step uses `kubectl apply` which is idempotent. + +## Verdict + +All checks pass. Safe to proceed with `iobench redpanda --storage-class ceph-block` during the current maintenance window. diff --git a/iobench/README.md b/iobench/README.md new file mode 100644 index 00000000..f4cff5cc --- /dev/null +++ b/iobench/README.md @@ -0,0 +1,146 @@ +# iobench + +A command-line I/O benchmarking tool using fio. Runs locally, over SSH, or on Kubernetes pods. Includes a Redpanda storage characterization profile for validating Ceph RBD suitability. + +## Build + +```bash +cargo build -p iobench --release +``` + +## Simple profile (default) + +Runs standard fio benchmarks (sequential/random read/write, single/multi-job) against a local or remote target. 
+ +```bash +# Local +iobench + +# Over SSH +iobench --target ssh/user@host + +# On a Kubernetes pod +iobench --target k8s/namespace/pod-name + +# Custom parameters +iobench --duration 60 --size 4G --block-size 128k --tests read,write,randwrite +``` + +Available tests: `read`, `write`, `randread`, `randwrite`, `multiread`, `multiwrite`, `multirandread`, `multirandwrite`. + +Results are saved to `./iobench-{timestamp}/summary.csv`. + +## Redpanda profile + +Characterizes whether a Kubernetes storage backend (e.g. Ceph RBD) can sustain Redpanda's I/O patterns. Deploys a 3-pod StatefulSet with per-node PVCs and runs four fio workloads that model Redpanda's actual storage behavior. + +### Quick start + +```bash +# Run the full profile (sequential baselines, then parallel contention test) +iobench redpanda --storage-class ceph-block + +# Sequential baselines only (single-node, no cluster impact) +iobench redpanda --mode sequential --storage-class ceph-block + +# Parallel contention test only +iobench redpanda --mode parallel --storage-class ceph-block + +# Keep pods alive after the run (useful for re-running or debugging) +iobench redpanda --storage-class ceph-block --keep-deployment + +# Deploy pods without running any workloads +iobench deploy --storage-class ceph-block + +# Remove all iobench pods and PVCs +iobench undeploy +``` + +### Options + +| Flag | Default | Description | +|------|---------|-------------| +| `--mode` | `both` | `sequential`, `parallel`, or `both` | +| `--storage-class` | `ceph-block` | Kubernetes StorageClass for the PVCs | +| `--pvc-size` | `50Gi` | Size of each PVC | +| `--replicas` | `3` | Number of pods (should match cluster node count) | +| `--namespace` | `default` | Kubernetes namespace | +| `--output-dir` | auto-timestamped | Directory for results | +| `--keep-deployment` | `false` | Don't delete pods/PVCs after the run | + +### Workloads + +The profile runs four workloads, each targeting a different aspect of Redpanda's I/O: + +| 
Workload | What it models | Key params | Runtime | +|----------|---------------|------------|---------| +| `throughput` | Bulk segment writes (optimistic upper bound) | 16K seq write, no fsync, 4 jobs, iodepth=16 | 5 min | +| `fsync_hot_path` | Raft commit path with `acks=all` | 16K seq write, fdatasync per op, 4 jobs, iodepth=4 | 10 min | +| `selftest_512k` | `rpk cluster self-test` 512K phase | 512K seq write, fdatasync, 1 job, iodepth=4 | 2 min | +| `selftest_4k_qd1` | Worst-case single-stream commit | 4K seq write, fdatasync, 1 job, iodepth=1 | 2 min | + +All workloads use `direct=1` (bypass page cache) and `ioengine=libaio`. + +### Execution modes + +- **`sequential`** -- Runs all four workloads back-to-back on a single node. Clean per-pattern baselines, comparable to Redpanda's published hardware requirements (16,000 IOPS minimum per broker). +- **`parallel`** -- Runs the four workloads one after another, with every node executing the same workload at the same time. Each node writes to its own PVC. A wall-clock barrier synchronizes start times across pods so contention windows overlap. This is the production-shape test that exposes the Ceph OSD fan-in pattern. +- **`both`** (default) -- Sequential first, then parallel. + +### Interpreting results + +The **headline metric** is the worst p99.9 fdatasync latency on the `fsync_hot_path` workload during parallel execution: + +- **<= 100 ms**: PASS. Storage can sustain Raft heartbeats under load. +- **> 100 ms**: FAIL. Raft heartbeats (150 ms interval, 1.5 s election timeout) will not survive load spikes, leading to election storms. 
+ +Reference values from healthy NVMe (from `rpk cluster self-test`): +- `selftest_512k`: ~1182 IOPS, ~591 MiB/s +- `selftest_4k_qd1`: ~406 IOPS + +### Output + +Results are saved to `./iobench-redpanda-{timestamp}/`: + +``` +iobench-redpanda-2026-05-03-143022/ + redpanda_summary.csv # Full results: all metrics, all nodes, all workloads + iobench.csv # Dashboard-compatible format + sequential/ + throughput_node-0.json # Raw fio JSON per workload per node + fsync_hot_path_node-0.json + ... + node-0/ # Per-node time-series logs + throughput_lat.1.log + throughput_iops.1.log + ... + parallel/ + throughput_node-0.json + throughput_node-1.json + throughput_node-2.json + ... +``` + +The CSV includes per-workload per-node: IOPS, bandwidth, completion latency percentiles (p50-p99.99, max), and fdatasync latency percentiles (p50-p99.99, max). + +Per-second time-series logs (`*_lat.*.log`, `*_iops.*.log`) capture the bimodal/spiky behavior of Ceph under load that summary statistics miss. + +### Warning + +Parallel mode generates heavy fdatasync workloads across all nodes simultaneously. This **will impact other workloads on the same Ceph pool**. Run during a maintenance window or off-peak. + +## Dashboard + +A Plotly Dash app for visualizing results. Supports both simple and Redpanda profile data. + +```bash +cd iobench/dash +virtualenv venv +source venv/bin/activate +pip install -r requirements_freeze.txt + +# Copy result CSVs into the dash directory, then: +python iobench-dash.py +``` + +The dashboard reads `iobench.csv` and/or `redpanda_summary.csv` from its working directory. 
diff --git a/iobench/dash/iobench-dash.py b/iobench/dash/iobench-dash.py index ae896b4d..75271dca 100644 --- a/iobench/dash/iobench-dash.py +++ b/iobench/dash/iobench-dash.py @@ -1,3 +1,5 @@ +import os + import dash from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction import plotly.express as px @@ -6,38 +8,29 @@ import dash_bootstrap_components as dbc import io # --- Data Loading and Preparation --- -# csv_data = """label,test_name,iops,bandwidth_kibps,latency_mean_ms,latency_stddev_ms -# Ceph HDD Only,read-4k-sync-test,1474.302,5897,0.673,0.591 -# Ceph HDD Only,write-4k-sync-test,14.126,56,27.074,7.046 -# Ceph HDD Only,randread-4k-sync-test,225.140,900,4.436,6.918 -# Ceph HDD Only,randwrite-4k-sync-test,13.129,52,34.891,10.859 -# Ceph HDD Only,multiread-4k-sync-test,6873.675,27494,0.578,0.764 -# Ceph HDD Only,multiwrite-4k-sync-test,57.135,228,38.660,11.293 -# Ceph HDD Only,multirandread-4k-sync-test,2451.376,9805,1.626,2.515 -# Ceph HDD Only,multirandwrite-4k-sync-test,54.642,218,33.492,13.111 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,read-4k-sync-test,1495.700,5982,0.664,1.701 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,write-4k-sync-test,16.990,67,17.502,9.908 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randread-4k-sync-test,159.256,637,6.274,9.232 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randwrite-4k-sync-test,16.693,66,24.094,16.099 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiread-4k-sync-test,7305.559,29222,0.544,1.338 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiwrite-4k-sync-test,52.260,209,34.891,17.576 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandread-4k-sync-test,700.606,2802,5.700,10.429 -# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandwrite-4k-sync-test,52.723,210,29.709,25.829 -# Ceph 2 Hosts WAL+DB SSD Only,randwrite-4k-sync-test,90.037,360,3.617,8.321 -# Ceph WAL+DB SSD During Rebuild,randwrite-4k-sync-test,41.008,164,10.138,19.333 -# Ceph WAL+DB SSD OSD HDD,read-4k-sync-test,1520.299,6081,0.654,1.539 
-# Ceph WAL+DB SSD OSD HDD,write-4k-sync-test,78.528,314,4.074,9.101 -# Ceph WAL+DB SSD OSD HDD,randread-4k-sync-test,153.303,613,6.518,9.036 -# Ceph WAL+DB SSD OSD HDD,randwrite-4k-sync-test,48.677,194,8.785,20.356 -# Ceph WAL+DB SSD OSD HDD,multiread-4k-sync-test,6804.880,27219,0.584,1.422 -# Ceph WAL+DB SSD OSD HDD,multiwrite-4k-sync-test,311.513,1246,4.978,9.458 -# Ceph WAL+DB SSD OSD HDD,multirandread-4k-sync-test,581.756,2327,6.869,10.204 -# Ceph WAL+DB SSD OSD HDD,multirandwrite-4k-sync-test,120.556,482,13.463,25.440 -# """ -# -# df = pd.read_csv(io.StringIO(csv_data)) -df = pd.read_csv("iobench.csv") # Replace with the actual file path +dataframes = [] + +if os.path.exists("iobench.csv"): + df_simple = pd.read_csv("iobench.csv") + dataframes.append(df_simple) + +if os.path.exists("redpanda_summary.csv"): + df_red = pd.read_csv("redpanda_summary.csv") + df_red['label'] = df_red['mode'] + '-' + df_red['workload'] + df_red['test_name'] = df_red['workload'] + cols = ['label', 'test_name', 'iops', 'bandwidth_kibps', 'latency_mean_ms', 'latency_stddev_ms', 'fdatasync_p99_9_ms'] + df_red = df_red[[c for c in cols if c in df_red.columns]] + dataframes.append(df_red) + +if not dataframes: + raise FileNotFoundError("No iobench.csv or redpanda_summary.csv found in working directory.") + +df = pd.concat(dataframes, ignore_index=True) df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024 +if 'fdatasync_p99_9_ms' not in df.columns: + df['fdatasync_p99_9_ms'] = 0.0 +else: + df['fdatasync_p99_9_ms'] = df['fdatasync_p99_9_ms'].fillna(0.0) # --- App Initialization and Global Settings --- app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY]) @@ -70,7 +63,8 @@ app.layout = dbc.Container([ options=[ {'label': 'IOPS', 'value': 'iops'}, {'label': 'Latency (ms)', 'value': 'latency_mean_ms'}, - {'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'} + {'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'}, + {'label': 'fdatasync p99.9 (ms)', 'value': 
'fdatasync_p99_9_ms'}, ], value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection labelClassName="d-block" @@ -185,11 +179,12 @@ def update_graphs(selected_metrics, selected_configs, selected_tests): metric_titles = { 'iops': 'IOPS Comparison (Higher is Better)', 'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)', - 'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)' + 'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)', + 'fdatasync_p99_9_ms': 'fdatasync p99.9 Latency (ms) Comparison (Lower is Better)', } for metric in selected_metrics: - sort_order = 'total ascending' if metric == 'latency_mean_ms' else 'total descending' + sort_order = 'total ascending' if metric in ('latency_mean_ms', 'fdatasync_p99_9_ms') else 'total descending' error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None fig = px.bar( @@ -206,7 +201,8 @@ def update_graphs(selected_metrics, selected_configs, selected_tests): "iops": "IOPS", "latency_mean_ms": "Mean Latency (ms)", "bandwidth_mbps": "Bandwidth (MB/s)", - "label": "Cluster Configuration" + "fdatasync_p99_9_ms": "fdatasync p99.9 (ms)", + "label": "Configuration" } ) diff --git a/iobench/src/fio.rs b/iobench/src/fio.rs new file mode 100644 index 00000000..4b8957bd --- /dev/null +++ b/iobench/src/fio.rs @@ -0,0 +1,100 @@ +use serde::{Deserialize, Serialize}; +use std::collections::HashMap; + +#[derive(Debug, Serialize, Deserialize)] +pub struct FioOutput { + pub jobs: Vec, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct FioJobResult { + pub jobname: String, + pub read: FioMetrics, + pub write: FioMetrics, + #[serde(default)] + pub sync: Option, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct FioMetrics { + pub bw: f64, + pub iops: f64, + pub clat_ns: LatencyMetrics, +} + +#[derive(Debug, Serialize, Deserialize)] +pub struct SyncMetrics { + pub total_ios: u64, + pub lat_ns: LatencyMetrics, +} + +#[derive(Debug, Serialize, 
Deserialize)] +pub struct LatencyMetrics { + pub mean: f64, + pub stddev: f64, + #[serde(default)] + pub percentile: Option>, +} + +#[derive(Debug, Serialize)] +pub struct BenchmarkResult { + pub test_name: String, + pub iops: f64, + pub bandwidth_kibps: f64, + pub latency_mean_ms: f64, + pub latency_stddev_ms: f64, + pub clat_p50_ms: f64, + pub clat_p95_ms: f64, + pub clat_p99_ms: f64, + pub clat_p99_9_ms: f64, + pub clat_p99_99_ms: f64, + pub clat_max_ms: f64, +} + +fn percentile_ns_to_ms(p: &Option>, key: &str) -> f64 { + p.as_ref() + .and_then(|m| m.get(key).copied()) + .unwrap_or(0.0) + / 1_000_000.0 +} + +pub fn parse_fio_output( + output: &str, + test_name: &str, + rw: &str, +) -> Result { + let fio_data: FioOutput = serde_json::from_str(output) + .map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?; + + let job_result = fio_data + .jobs + .iter() + .find(|j| j.jobname == test_name) + .ok_or_else(|| { + format!( + "Could not find job result for '{}' in fio output", + test_name + ) + })?; + + let metrics = if rw.contains("read") { + &job_result.read + } else { + &job_result.write + }; + + let p = &metrics.clat_ns.percentile; + Ok(BenchmarkResult { + test_name: test_name.to_string(), + iops: metrics.iops, + bandwidth_kibps: metrics.bw, + latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0, + latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0, + clat_p50_ms: percentile_ns_to_ms(p, "50.000000"), + clat_p95_ms: percentile_ns_to_ms(p, "95.000000"), + clat_p99_ms: percentile_ns_to_ms(p, "99.000000"), + clat_p99_9_ms: percentile_ns_to_ms(p, "99.900000"), + clat_p99_99_ms: percentile_ns_to_ms(p, "99.990000"), + clat_max_ms: percentile_ns_to_ms(p, "100.000000"), + }) +} diff --git a/iobench/src/k8s.rs b/iobench/src/k8s.rs new file mode 100644 index 00000000..5a8279d1 --- /dev/null +++ b/iobench/src/k8s.rs @@ -0,0 +1,193 @@ +use std::collections::BTreeMap; +use std::io; +use std::io::Write; +use std::process::{Command, Stdio}; + +use 
k8s_openapi::api::apps::v1::StatefulSet; +use k8s_openapi::api::core::v1::{ + Affinity, Container, PersistentVolumeClaim, PersistentVolumeClaimSpec, PodAffinityTerm, + PodAntiAffinity, PodSpec, PodTemplateSpec, VolumeMount, VolumeResourceRequirements, +}; +use k8s_openapi::apimachinery::pkg::api::resource::Quantity; +use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta}; + +pub fn redpanda_statefulset( + name: &str, + namespace: &str, + replicas: i32, + storage_class: &str, + size: &str, + image: &str, +) -> StatefulSet { + let mut labels = BTreeMap::new(); + labels.insert("app".to_string(), name.to_string()); + + let pvc = PersistentVolumeClaim { + metadata: ObjectMeta { + name: Some("data".to_string()), + ..Default::default() + }, + spec: Some(PersistentVolumeClaimSpec { + access_modes: Some(vec!["ReadWriteOnce".to_string()]), + resources: Some(VolumeResourceRequirements { + requests: Some({ + let mut m = BTreeMap::new(); + m.insert("storage".to_string(), Quantity(size.to_string())); + m + }), + ..Default::default() + }), + storage_class_name: Some(storage_class.to_string()), + ..Default::default() + }), + ..Default::default() + }; + + let container = Container { + name: "fio".to_string(), + image: Some(image.to_string()), + image_pull_policy: Some("IfNotPresent".to_string()), + command: Some(vec!["sleep".to_string(), "infinity".to_string()]), + volume_mounts: Some(vec![VolumeMount { + name: "data".to_string(), + mount_path: "/data".to_string(), + ..Default::default() + }]), + ..Default::default() + }; + + let pod_spec = PodSpec { + containers: vec![container], + affinity: Some(Affinity { + pod_anti_affinity: Some(PodAntiAffinity { + required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm { + label_selector: Some(LabelSelector { + match_labels: Some(labels.clone()), + ..Default::default() + }), + topology_key: "kubernetes.io/hostname".to_string(), + ..Default::default() + }]), + ..Default::default() + }), + 
..Default::default() + }), + ..Default::default() + }; + + StatefulSet { + metadata: ObjectMeta { + name: Some(name.to_string()), + namespace: Some(namespace.to_string()), + labels: Some(labels.clone()), + ..Default::default() + }, + spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec { + replicas: Some(replicas), + selector: LabelSelector { + match_labels: Some(labels.clone()), + ..Default::default() + }, + service_name: Some(name.to_string()), + pod_management_policy: Some("Parallel".to_string()), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(labels), + ..Default::default() + }), + spec: Some(pod_spec), + }, + volume_claim_templates: Some(vec![pvc]), + ..Default::default() + }), + ..Default::default() + } +} + +pub fn run_kubectl(args: &[&str]) -> io::Result { + let mut cmd = Command::new("kubectl"); + cmd.args(args); + cmd.stdout(Stdio::piped()).stderr(Stdio::piped()); + let output = cmd.output()?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(io::Error::other(format!( + "kubectl failed: {} (args: {:?})", + stderr, args + ))); + } + Ok(String::from_utf8_lossy(&output.stdout).to_string()) +} + +pub fn apply_yaml(yaml: &str) -> io::Result<()> { + let mut cmd = Command::new("kubectl"); + cmd.args(["apply", "-f", "-"]); + cmd.stdin(Stdio::piped()); + cmd.stdout(Stdio::piped()); + cmd.stderr(Stdio::piped()); + + let mut child = cmd.spawn()?; + { + let stdin = child.stdin.as_mut().unwrap(); + stdin.write_all(yaml.as_bytes())?; + } + let output = child.wait_with_output()?; + if !output.status.success() { + let stderr = String::from_utf8_lossy(&output.stderr); + return Err(io::Error::other(format!( + "kubectl apply failed: {}", + stderr + ))); + } + Ok(()) +} + +pub fn wait_for_pods(namespace: &str, name: &str) -> io::Result<()> { + run_kubectl(&[ + "rollout", + "status", + "statefulset", + name, + "-n", + namespace, + "--timeout", + "300s", + ])?; + Ok(()) +} + +pub fn 
get_pod_names(namespace: &str, label: &str) -> io::Result> { + let out = run_kubectl(&[ + "get", + "pods", + "-n", + namespace, + "-l", + &format!("app={}", label), + "-o", + "jsonpath={.items[*].metadata.name}", + ])?; + Ok(out.split_whitespace().map(|s| s.to_string()).collect()) +} + +pub fn exec(namespace: &str, pod: &str, command: &str) -> io::Result { + run_kubectl(&["exec", "-n", namespace, pod, "--", "sh", "-c", command]) +} + +pub fn cp_from(namespace: &str, pod: &str, remote: &str, local: &str) -> io::Result<()> { + run_kubectl(&["cp", "-n", namespace, &format!("{}:{}", pod, remote), local])?; + Ok(()) +} + +pub fn delete_resources(namespace: &str, label: &str) -> io::Result<()> { + run_kubectl(&[ + "delete", + "statefulset,pvc", + "-n", + namespace, + "-l", + &format!("app={}", label), + "--wait=false", + ])?; + Ok(()) +} diff --git a/iobench/src/main.rs b/iobench/src/main.rs index 9429ed0b..754c4de6 100644 --- a/iobench/src/main.rs +++ b/iobench/src/main.rs @@ -1,23 +1,22 @@ -use std::fs; -use std::io::{self, Write}; +use std::io; +use std::io::Write; use std::process::{Command, Stdio}; -use std::thread; -use std::time::Duration; -use chrono::Local; -use clap::Parser; -use serde::{Deserialize, Serialize}; +use clap::{Parser, Subcommand}; + +mod fio; +mod k8s; +mod redpanda; +mod simple; /// A simple yet powerful I/O benchmarking tool using fio. #[derive(Parser, Debug)] #[command(author, version, about, long_about = None)] -struct Args { - /// Target for the benchmark. - /// Formats: - /// - localhost (default) - /// - ssh/{user}@{host} - /// - ssh/{user}@{host}:{port} - /// - k8s/{namespace}/{pod} +struct Cli { + #[command(subcommand)] + command: Option, + + // Simple profile args (used when no subcommand is given) #[arg(short, long, default_value = "localhost")] target: String, @@ -27,7 +26,10 @@ struct Args { /// Comma-separated list of tests to run. 
/// Available tests: read, write, randread, randwrite, /// multiread, multiwrite, multirandread, multirandwrite. - #[arg(long, default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite")] + #[arg( + long, + default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite" + )] tests: String, /// Duration of each test in seconds. @@ -48,143 +50,49 @@ struct Args { block_size: String, } -#[derive(Debug, Serialize, Deserialize)] -struct FioOutput { - jobs: Vec, -} - -#[derive(Debug, Serialize, Deserialize)] -struct FioJobResult { - jobname: String, - read: FioMetrics, - write: FioMetrics, -} - -#[derive(Debug, Serialize, Deserialize)] -struct FioMetrics { - bw: f64, - iops: f64, - clat_ns: LatencyMetrics, -} - -#[derive(Debug, Serialize, Deserialize)] -struct LatencyMetrics { - mean: f64, - stddev: f64, -} - -#[derive(Debug, Serialize)] -struct BenchmarkResult { - test_name: String, - iops: f64, - bandwidth_kibps: f64, - latency_mean_ms: f64, - latency_stddev_ms: f64, +#[derive(Subcommand, Debug)] +enum Commands { + /// Run the Redpanda storage characterization profile. + Redpanda(redpanda::RedpandaArgs), + /// Deploy the Redpanda iobench resources and exit. + Deploy(redpanda::RedpandaArgs), + /// Remove the Redpanda iobench resources. 
+ Undeploy(redpanda::RedpandaArgs), } fn main() -> io::Result<()> { - let args = Args::parse(); + let cli = Cli::parse(); - let output_dir = args.output_dir.unwrap_or_else(|| { - format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S")) - }); - fs::create_dir_all(&output_dir)?; - - let tests_to_run: Vec<&str> = args.tests.split(',').collect(); - let mut results = Vec::new(); - - for test in tests_to_run { - println!("--------------------------------------------------"); - println!("Running test: {}", test); - - let (rw, numjobs) = match test { - "read" => ("read", 1), - "write" => ("write", 1), - "randread" => ("randread", 1), - "randwrite" => ("randwrite", 1), - "multiread" => ("read", 4), - "multiwrite" => ("write", 4), - "multirandread" => ("randread", 4), - "multirandwrite" => ("randwrite", 4), - _ => { - eprintln!("Unknown test: {}. Skipping.", test); - continue; - } - }; - - let test_name = format!("{}-{}-sync-test", test, args.block_size); - let fio_command = format!( - "fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json", - args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size - ); - - println!("Executing command:\n{}\n", fio_command); - - let output = match run_command(&args.target, &fio_command) { - Ok(out) => out, - Err(e) => { - eprintln!("Failed to execute command for test {}: {}", test, e); - continue; - } - }; - - - let result = parse_fio_output(&output, &test_name, rw); - // TODO store raw fio output and print it - match result { - Ok(res) => { - results.push(res); - } - Err(e) => { - eprintln!("Error parsing fio output for test {}: {}", test, e); - eprintln!("Raw output:\n{}", output); - } + match cli.command { + None => { + // Backward-compatible simple benchmark + let args = simple::SimpleArgs { + target: cli.target, + benchmark_dir: cli.benchmark_dir, + tests: cli.tests, + 
duration: cli.duration, + output_dir: cli.output_dir, + size: cli.size, + block_size: cli.block_size, + }; + simple::run(&args) } - - println!("{output}"); - println!("Test {} completed.", test); - // A brief pause to let the system settle before the next test. - thread::sleep(Duration::from_secs(2)); + Some(Commands::Redpanda(args)) => redpanda::run(&args), + Some(Commands::Deploy(args)) => { + let mut a = args.clone(); + a.deploy_only = true; + redpanda::run(&a) + } + Some(Commands::Undeploy(args)) => redpanda::undeploy(&args), } - - // Cleanup the test file on the target - println!("--------------------------------------------------"); - println!("Cleaning up test file on target..."); - let cleanup_command = "rm -f ./iobench_testfile"; - if let Err(e) = run_command(&args.target, cleanup_command) { - eprintln!("Warning: Failed to clean up test file on target: {}", e); - } else { - println!("Cleanup successful."); - } - - - if results.is_empty() { - println!("\nNo benchmark results to display."); - return Ok(()); - } - - // Output results to a CSV file for easy analysis - let csv_path = format!("{}/summary.csv", output_dir); - let mut wtr = csv::Writer::from_path(&csv_path)?; - for result in &results { - wtr.serialize(result)?; - } - wtr.flush()?; - - println!("\nBenchmark summary saved to {}", csv_path); - println!("\n--- Benchmark Results Summary ---"); - println!("{:<25} {:>10} {:>18} {:>20} {:>22}", "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)"); - println!("{:-<98}", ""); - for result in results { - println!("{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}", result.test_name, result.iops, result.bandwidth_kibps, result.latency_mean_ms, result.latency_stddev_ms); - } - - Ok(()) } -fn run_command(target: &str, command: &str) -> io::Result { +pub fn run_command(target: &str, command: &str) -> io::Result { let (program, args) = if target == "localhost" { - ("sudo", vec!["sh".to_string(), "-c".to_string(), command.to_string()]) + 
( + "sudo", + vec!["sh".to_string(), "-c".to_string(), command.to_string()], + ) } else if target.starts_with("ssh/") { let target_str = target.strip_prefix("ssh/").unwrap(); let ssh_target; @@ -203,13 +111,31 @@ fn run_command(target: &str, command: &str) -> io::Result { } else if target.starts_with("k8s/") { let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect(); if parts.len() != 2 { - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid k8s target format. Expected k8s/{namespace}/{pod}")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid k8s target format. Expected k8s/{namespace}/{pod}", + )); } let namespace = parts[0]; let pod = parts[1]; - ("kubectl", vec!["exec".to_string(), "-n".to_string(), namespace.to_string(), pod.to_string(), "--".to_string(), "sh".to_string(), "-c".to_string(), command.to_string()]) + ( + "kubectl", + vec![ + "exec".to_string(), + "-n".to_string(), + namespace.to_string(), + pod.to_string(), + "--".to_string(), + "sh".to_string(), + "-c".to_string(), + command.to_string(), + ], + ) } else { - return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid target format")); + return Err(io::Error::new( + io::ErrorKind::InvalidInput, + "Invalid target format", + )); }; let mut cmd = Command::new(program); @@ -222,32 +148,8 @@ fn run_command(target: &str, command: &str) -> io::Result { if !output.status.success() { eprintln!("Command failed with status: {}", output.status); io::stderr().write_all(&output.stderr)?; - return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed")); + return Err(io::Error::other("Command execution failed")); } - String::from_utf8(output.stdout) - .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) -} - -fn parse_fio_output(output: &str, test_name: &str, rw: &str) -> Result { - let fio_data: FioOutput = serde_json::from_str(output) - .map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?; - - let job_result = 
fio_data.jobs.iter() - .find(|j| j.jobname == test_name) - .ok_or_else(|| format!("Could not find job result for '{}' in fio output", test_name))?; - - let metrics = if rw.contains("read") { - &job_result.read - } else { - &job_result.write - }; - - Ok(BenchmarkResult { - test_name: test_name.to_string(), - iops: metrics.iops, - bandwidth_kibps: metrics.bw, - latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0, - latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0, - }) + String::from_utf8(output.stdout).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e)) } diff --git a/iobench/src/redpanda.rs b/iobench/src/redpanda.rs new file mode 100644 index 00000000..b6854fb7 --- /dev/null +++ b/iobench/src/redpanda.rs @@ -0,0 +1,476 @@ +use std::fs; +use std::io; +use std::path::Path; +use std::thread; +use std::time::{SystemTime, UNIX_EPOCH}; + +use chrono::Local; +use clap::Args; + +use crate::fio::{FioOutput, parse_fio_output}; +use crate::k8s; + +const NAME: &str = "iobench-redpanda"; +const IMAGE: &str = "juicedata/fio:latest"; + +const WORKLOAD_THROUGHPUT: &str = r#"[global] +direct=1 +ioengine=libaio +group_reporting +time_based +log_avg_msec=1000 +filename=/data/iobench_testfile +size=10G + +[throughput] +rw=write +bs=16K +numjobs=4 +iodepth=16 +runtime=300 +"#; + +const WORKLOAD_FSYNC_HOT_PATH: &str = r#"[global] +direct=1 +ioengine=libaio +group_reporting +time_based +log_avg_msec=1000 +filename=/data/iobench_testfile +size=5G + +[fsync_hot_path] +rw=write +bs=16K +fdatasync=1 +numjobs=4 +iodepth=4 +runtime=600 +"#; + +const WORKLOAD_SELFTEST_512K: &str = r#"[global] +direct=1 +ioengine=libaio +group_reporting +time_based +log_avg_msec=1000 +filename=/data/iobench_testfile +size=10G + +[selftest_512k] +rw=write +bs=512K +fdatasync=1 +numjobs=1 +iodepth=4 +runtime=120 +"#; + +const WORKLOAD_SELFTEST_4K_QD1: &str = r#"[global] +direct=1 +ioengine=libaio +group_reporting +time_based +log_avg_msec=1000 +filename=/data/iobench_testfile +size=2G + 
+[selftest_4k_qd1] +rw=write +bs=4K +fdatasync=1 +numjobs=1 +iodepth=1 +runtime=120 +"#; + +fn workloads() -> Vec<(&'static str, &'static str)> { + vec![ + ("throughput", WORKLOAD_THROUGHPUT), + ("fsync_hot_path", WORKLOAD_FSYNC_HOT_PATH), + ("selftest_512k", WORKLOAD_SELFTEST_512K), + ("selftest_4k_qd1", WORKLOAD_SELFTEST_4K_QD1), + ] +} + +#[derive(Args, Debug, Clone)] +pub struct RedpandaArgs { + #[arg(long, default_value = "both")] + pub mode: String, + #[arg(long, default_value = "ceph-block")] + pub storage_class: String, + #[arg(long, default_value = "50Gi")] + pub pvc_size: String, + #[arg(long, default_value_t = 3)] + pub replicas: i32, + #[arg(long, default_value = "default")] + pub namespace: String, + #[arg(long)] + pub output_dir: Option, + #[arg(long)] + pub keep_deployment: bool, + #[arg(long)] + pub deploy_only: bool, +} + +pub fn deploy(args: &RedpandaArgs) -> io::Result<()> { + println!("Deploying Redpanda iobench StatefulSet..."); + let ss = k8s::redpanda_statefulset( + NAME, + &args.namespace, + args.replicas, + &args.storage_class, + &args.pvc_size, + IMAGE, + ); + let yaml = serde_yaml::to_string(&ss) + .map_err(|e| io::Error::other(format!("YAML serialization failed: {}", e)))?; + k8s::apply_yaml(&yaml)?; + println!("Waiting for pods to be ready..."); + k8s::wait_for_pods(&args.namespace, NAME)?; + println!("Deployment complete."); + Ok(()) +} + +pub fn undeploy(args: &RedpandaArgs) -> io::Result<()> { + println!("Removing Redpanda iobench resources..."); + k8s::delete_resources(&args.namespace, NAME)?; + println!("Cleanup complete."); + Ok(()) +} + +pub fn run(args: &RedpandaArgs) -> io::Result<()> { + if args.deploy_only { + return deploy(args); + } + + let output_dir = args.output_dir.clone().unwrap_or_else(|| { + format!( + "./iobench-redpanda-{}", + Local::now().format("%Y-%m-%d-%H%M%S") + ) + }); + fs::create_dir_all(&output_dir)?; + + let pod_names = match k8s::get_pod_names(&args.namespace, NAME) { + Ok(pods) if pods.len() >= 
args.replicas as usize => pods, + _ => { + deploy(args)?; + k8s::get_pod_names(&args.namespace, NAME)? + } + }; + + if pod_names.len() < args.replicas as usize { + return Err(io::Error::other(format!( + "Expected {} pods, found {}", + args.replicas, + pod_names.len() + ))); + } + + let modes: Vec<&str> = match args.mode.as_str() { + "sequential" => vec!["sequential"], + "parallel" => vec!["parallel"], + "both" => vec!["sequential", "parallel"], + other => { + return Err(io::Error::other(format!( + "Unknown mode: {}. Use sequential, parallel, or both.", + other + ))); + } + }; + + let mut all_results: Vec = Vec::new(); + + for mode in modes { + if mode == "parallel" { + println!("\n============================================================"); + println!("WARNING: Parallel mode will run intensive fdatasync workloads"); + println!("concurrently across all nodes. This can severely impact other"); + println!("workloads on the same Ceph pool. Run during a maintenance"); + println!("window or off-peak."); + println!("============================================================\n"); + } + + println!("--- Running {} mode ---", mode); + + for (workload_name, job_template) in workloads() { + println!("Workload: {}", workload_name); + + let mode_dir = format!("{}/{}", output_dir, mode); + fs::create_dir_all(&mode_dir)?; + + let start_at = now_epoch() + 60; + + let results = if mode == "sequential" { + let pod = &pod_names[0]; + let result = run_workload_on_pod( + &args.namespace, + pod, + workload_name, + job_template, + start_at, + &mode_dir, + )?; + vec![(pod.clone(), result)] + } else { + let mut handles = Vec::new(); + for pod in &pod_names { + let ns = args.namespace.clone(); + let pod_for_thread = pod.clone(); + let workload = workload_name.to_string(); + let template = job_template.to_string(); + let dir = mode_dir.clone(); + let handle = thread::spawn(move || { + run_workload_on_pod( + &ns, + &pod_for_thread, + &workload, + &template, + start_at, + &dir, + ) + }); + 
handles.push((pod.clone(), handle)); + } + let mut collected = Vec::new(); + for (pod, handle) in handles { + match handle.join() { + Ok(Ok(result)) => collected.push((pod, result)), + Ok(Err(e)) => { + eprintln!("Error on pod {}: {}", pod, e); + } + Err(_) => { + eprintln!("Thread panic on pod {}", pod); + } + } + } + collected + }; + + for (pod, raw_json) in results { + let pod_slug = pod.replace(NAME, "node"); + let json_path = format!("{}/{}_{}.json", mode_dir, workload_name, pod_slug); + fs::write(&json_path, &raw_json)?; + + let result = parse_fio_output(&raw_json, workload_name, "write") + .map_err(io::Error::other)?; + + let sync = parse_sync_latencies(&raw_json, workload_name); + all_results.push(RedpandaCsvRow { + label: "redpanda".to_string(), + mode: mode.to_string(), + workload: workload_name.to_string(), + node: pod, + test_name: result.test_name.clone(), + iops: result.iops, + bandwidth_kibps: result.bandwidth_kibps, + latency_mean_ms: result.latency_mean_ms, + latency_stddev_ms: result.latency_stddev_ms, + clat_p50_ms: result.clat_p50_ms, + clat_p95_ms: result.clat_p95_ms, + clat_p99_ms: result.clat_p99_ms, + clat_p99_9_ms: result.clat_p99_9_ms, + clat_p99_99_ms: result.clat_p99_99_ms, + clat_max_ms: result.clat_max_ms, + fdatasync_total_ios: sync.total_ios, + fdatasync_mean_ms: sync.mean_ms, + fdatasync_p50_ms: sync.p50_ms, + fdatasync_p95_ms: sync.p95_ms, + fdatasync_p99_ms: sync.p99_ms, + fdatasync_p99_9_ms: sync.p99_9_ms, + fdatasync_p99_99_ms: sync.p99_99_ms, + fdatasync_max_ms: sync.max_ms, + }); + } + } + } + + let csv_path = format!("{}/redpanda_summary.csv", output_dir); + let mut wtr = csv::Writer::from_path(&csv_path)?; + for row in &all_results { + wtr.serialize(row)?; + } + wtr.flush()?; + println!("Redpanda summary saved to {}", csv_path); + + let dash_path = format!("{}/iobench.csv", output_dir); + let mut wtr2 = csv::Writer::from_path(&dash_path)?; + for row in &all_results { + wtr2.serialize(DashRow::from(row))?; + } + 
/// Returns the current wall-clock time as whole seconds since the Unix epoch.
///
/// If the system clock reports a time before the epoch, this returns 0 instead
/// of panicking.
fn now_epoch() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default()
        .as_secs()
}
/// Looks up percentile `key` (e.g. `"99.900000"`) in a table of nanosecond
/// latencies and returns it in milliseconds.
///
/// Returns `0.0` when the table is absent or has no entry for `key`.
fn get_percentile(p: Option<&std::collections::HashMap<String, f64>>, key: &str) -> f64 {
    let nanos = p
        .and_then(|table| table.get(key))
        .copied()
        .unwrap_or(0.0);
    nanos / 1_000_000.0
}
f64, + fdatasync_p95_ms: f64, + fdatasync_p99_ms: f64, + fdatasync_p99_9_ms: f64, + fdatasync_p99_99_ms: f64, + fdatasync_max_ms: f64, +} + +#[derive(Debug, serde::Serialize)] +struct DashRow { + label: String, + test_name: String, + iops: f64, + bandwidth_kibps: f64, + latency_mean_ms: f64, + latency_stddev_ms: f64, + fdatasync_p99_9_ms: f64, +} + +impl From<&RedpandaCsvRow> for DashRow { + fn from(row: &RedpandaCsvRow) -> Self { + DashRow { + label: format!("{}-{}-{}", row.label, row.mode, row.workload), + test_name: row.test_name.clone(), + iops: row.iops, + bandwidth_kibps: row.bandwidth_kibps, + latency_mean_ms: row.latency_mean_ms, + latency_stddev_ms: row.latency_stddev_ms, + fdatasync_p99_9_ms: row.fdatasync_p99_9_ms, + } + } +} diff --git a/iobench/src/simple.rs b/iobench/src/simple.rs new file mode 100644 index 00000000..59dee2b9 --- /dev/null +++ b/iobench/src/simple.rs @@ -0,0 +1,123 @@ +use std::fs; +use std::io; +use std::thread; +use std::time::Duration; + +use chrono::Local; + +use crate::fio::parse_fio_output; +use crate::run_command; + +#[derive(Debug, Clone)] +pub struct SimpleArgs { + pub target: String, + pub benchmark_dir: String, + pub tests: String, + pub duration: u64, + pub output_dir: Option, + pub size: String, + pub block_size: String, +} + +pub fn run(args: &SimpleArgs) -> io::Result<()> { + let output_dir = args + .output_dir + .clone() + .unwrap_or_else(|| format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S"))); + fs::create_dir_all(&output_dir)?; + + let tests_to_run: Vec<&str> = args.tests.split(',').collect(); + let mut results = Vec::new(); + + for test in tests_to_run { + println!("--------------------------------------------------"); + println!("Running test: {}", test); + + let (rw, numjobs) = match test { + "read" => ("read", 1), + "write" => ("write", 1), + "randread" => ("randread", 1), + "randwrite" => ("randwrite", 1), + "multiread" => ("read", 4), + "multiwrite" => ("write", 4), + "multirandread" => ("randread", 
4), + "multirandwrite" => ("randwrite", 4), + _ => { + eprintln!("Unknown test: {}. Skipping.", test); + continue; + } + }; + + let test_name = format!("{}-{}-sync-test", test, args.block_size); + let fio_command = format!( + "fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json", + args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size + ); + + println!("Executing command:\n{}\n", fio_command); + + let output = match run_command(&args.target, &fio_command) { + Ok(out) => out, + Err(e) => { + eprintln!("Failed to execute command for test {}: {}", test, e); + continue; + } + }; + + let result = parse_fio_output(&output, &test_name, rw); + match result { + Ok(res) => { + results.push(res); + } + Err(e) => { + eprintln!("Error parsing fio output for test {}: {}", test, e); + eprintln!("Raw output:\n{}", output); + } + } + + println!("{output}"); + println!("Test {} completed.", test); + thread::sleep(Duration::from_secs(2)); + } + + println!("--------------------------------------------------"); + println!("Cleaning up test file on target..."); + let cleanup_command = "rm -f ./iobench_testfile"; + if let Err(e) = run_command(&args.target, cleanup_command) { + eprintln!("Warning: Failed to clean up test file on target: {}", e); + } else { + println!("Cleanup successful."); + } + + if results.is_empty() { + println!("\nNo benchmark results to display."); + return Ok(()); + } + + let csv_path = format!("{}/summary.csv", output_dir); + let mut wtr = csv::Writer::from_path(&csv_path)?; + for result in &results { + wtr.serialize(result)?; + } + wtr.flush()?; + + println!("\nBenchmark summary saved to {}", csv_path); + println!("\n--- Benchmark Results Summary ---"); + println!( + "{:<25} {:>10} {:>18} {:>20} {:>22}", + "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)" + ); + 
println!("{:-<98}", ""); + for result in results { + println!( + "{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}", + result.test_name, + result.iops, + result.bandwidth_kibps, + result.latency_mean_ms, + result.latency_stddev_ms + ); + } + + Ok(()) +} -- 2.39.5 From 520dbd04613df65a524a2850a45c3586373bf2af Mon Sep 17 00:00:00 2001 From: Jean-Gabriel Gill-Couture Date: Mon, 25 May 2026 08:21:56 -0400 Subject: [PATCH 2/2] wip: redpanda simulation fio benchmark --- Cargo.lock | 35 +++ Cargo.toml | 1 + iobench/src/fio.rs | 132 ++++++++- iobench/src/redpanda.rs | 610 +++++++++++++++++++++++++++++++++++++--- 4 files changed, 728 insertions(+), 50 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 86a77a4b..b8f9d18e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1932,6 +1932,27 @@ dependencies = [ "typenum", ] +[[package]] +name = "csv" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938" +dependencies = [ + "csv-core", + "itoa", + "ryu", + "serde_core", +] + +[[package]] +name = "csv-core" +version = "0.1.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782" +dependencies = [ + "memchr", +] + [[package]] name = "ctr" version = "0.9.2" @@ -4648,6 +4669,20 @@ dependencies = [ "thiserror 1.0.69", ] +[[package]] +name = "iobench" +version = "1.0.0" +dependencies = [ + "chrono", + "clap", + "csv", + "k8s-openapi", + "num_cpus", + "serde", + "serde_json", + "serde_yaml", +] + [[package]] name = "ipnet" version = "2.12.0" diff --git a/Cargo.toml b/Cargo.toml index ce0fdd18..a0122932 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -28,6 +28,7 @@ members = [ "harmony_node_readiness", "harmony-k8s", "harmony_assets", "opnsense-codegen", "opnsense-api", + "iobench", ] [workspace.package] diff --git a/iobench/src/fio.rs b/iobench/src/fio.rs index 4b8957bd..6ce77ccd 100644 --- 
/// Returns percentile `key` (e.g. `"99.000000"`) from an optional table of
/// nanosecond latencies, converted to milliseconds.
///
/// A missing table or a missing key yields `0.0`.
fn percentile_ns_to_ms(p: &Option<HashMap<String, f64>>, key: &str) -> f64 {
    let nanos = p
        .as_ref()
        .and_then(|table| table.get(key))
        .copied()
        .unwrap_or(0.0);
    nanos / 1_000_000.0
}
"clat_ns": { "mean": 0.0, "stddev": 0.0 } + }, + "write": { + "bw": 51200.0, "iops": 3200.0, + "clat_ns": { "mean": 5_000_000.0, "stddev": 1_000_000.0 } + } + }] + }) + .to_string(); + + let result = parse_fio_output(&json, "writetest", "write").unwrap(); + assert!((result.iops - 3200.0).abs() < 0.001); + assert!((result.latency_mean_ms - 5.0).abs() < 0.001); + } + + #[test] + fn parse_fio_output_no_percentiles_returns_zeros() { + let json = serde_json::json!({ + "jobs": [{ + "jobname": "test", + "read": { + "bw": 0.0, "iops": 0.0, + "clat_ns": { "mean": 0.0, "stddev": 0.0 } + }, + "write": { + "bw": 1000.0, "iops": 100.0, + "clat_ns": { "mean": 10_000_000.0, "stddev": 2_000_000.0 } + } + }] + }) + .to_string(); + + let result = parse_fio_output(&json, "test", "write").unwrap(); + assert_eq!(result.clat_p50_ms, 0.0); + assert_eq!(result.clat_p99_ms, 0.0); + assert_eq!(result.clat_max_ms, 0.0); + } + + #[test] + fn parse_fio_output_invalid_json_errors() { + let result = parse_fio_output("{not json", "test", "write"); + assert!(result.is_err()); + } + + #[test] + fn fio_output_deserializes_with_optional_sync() { + // Without sync field + let json = serde_json::json!({ + "jobs": [{ + "jobname": "test", + "read": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } }, + "write": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } } + }] + }); + let parsed: FioOutput = serde_json::from_value(json).unwrap(); + assert!(parsed.jobs[0].sync.is_none()); + + // With sync field + let json = serde_json::json!({ + "jobs": [{ + "jobname": "test", + "read": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } }, + "write": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } }, + "sync": { "total_ios": 100, "lat_ns": { "mean": 500.0, "stddev": 100.0 } } + }] + }); + let parsed: FioOutput = serde_json::from_value(json).unwrap(); + assert_eq!(parsed.jobs[0].sync.as_ref().unwrap().total_ios, 100); + } +} diff --git 
/// Renders a number of seconds as a short human-readable duration.
///
/// One hour or more is shown as `<h>h<mm>m` (seconds are dropped), one minute
/// or more as `<m>m<ss>s`, and anything shorter as `<s>s`.
fn fmt_duration(secs: u64) -> String {
    let hours = secs / 3600;
    let minutes = (secs % 3600) / 60;
    let seconds = secs % 60;

    match secs {
        3600.. => format!("{}h{:02}m", hours, minutes),
        // Below one hour `minutes` equals `secs / 60`.
        60.. => format!("{}m{:02}s", minutes, seconds),
        _ => format!("{}s", secs),
    }
}
Resources will be fully removed shortly."); Ok(()) } @@ -153,7 +213,10 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { fs::create_dir_all(&output_dir)?; let pod_names = match k8s::get_pod_names(&args.namespace, NAME) { - Ok(pods) if pods.len() >= args.replicas as usize => pods, + Ok(pods) if pods.len() >= args.replicas as usize => { + println!("[setup] Found existing pods: {}", pods.join(", ")); + pods + } _ => { deploy(args)?; k8s::get_pod_names(&args.namespace, NAME)? @@ -180,53 +243,104 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { } }; + let wl = workloads(); + let per_mode_secs = total_runtime_secs(&wl); + let total_secs = per_mode_secs * modes.len() as u64; + + println!(); + println!("============================================================"); + println!(" Redpanda I/O characterization profile"); + println!( + " Modes: {:?} | Workloads: {} | Pods: {}", + modes, + wl.len(), + pod_names.len() + ); + println!(" Estimated total runtime: {}", fmt_duration(total_secs)); + println!(" Output: {}", output_dir); + println!("============================================================"); + + let run_start = Instant::now(); let mut all_results: Vec<RedpandaCsvRow> = Vec::new(); - for mode in modes { - if mode == "parallel" { - println!("\n============================================================"); - println!("WARNING: Parallel mode will run intensive fdatasync workloads"); - println!("concurrently across all nodes. This can severely impact other"); - println!("workloads on the same Ceph pool. Run during a maintenance"); - println!("window or off-peak."); - println!("============================================================\n"); + for (mode_idx, mode) in modes.iter().enumerate() { + println!(); + if *mode == "parallel" { + println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); + println!(" WARNING: Parallel mode — intensive fdatasync across all"); + println!(" nodes simultaneously. 
Will impact other Ceph pool users."); + println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!"); } + println!( + "=== Mode: {} ({}/{}) — {} per-mode est. ===", + mode, + mode_idx + 1, + modes.len(), + fmt_duration(per_mode_secs) + ); - println!("--- Running {} mode ---", mode); + for (wl_idx, workload) in wl.iter().enumerate() { + let remaining_in_mode: u64 = wl[wl_idx..] + .iter() + .map(|w| BARRIER_SECS + w.runtime_secs) + .sum(); - for (workload_name, job_template) in workloads() { - println!("Workload: {}", workload_name); + println!(); + println!( + "--- [{}/{}] {} ({}s barrier + {}s fio = {}) ---", + wl_idx + 1, + wl.len(), + workload.name, + BARRIER_SECS, + workload.runtime_secs, + fmt_duration(BARRIER_SECS + workload.runtime_secs) + ); + println!(" {}", workload.description); + println!( + " Remaining in this mode: {} | Total elapsed: {}", + fmt_duration(remaining_in_mode), + fmt_duration(run_start.elapsed().as_secs()) + ); let mode_dir = format!("{}/{}", output_dir, mode); fs::create_dir_all(&mode_dir)?; - let start_at = now_epoch() + 60; + let start_at = now_epoch() + BARRIER_SECS; - let results = if mode == "sequential" { + let results = if *mode == "sequential" { let pod = &pod_names[0]; + println!(" Running on: {}", pod); let result = run_workload_on_pod( &args.namespace, pod, - workload_name, - job_template, + workload.name, + workload.template, + workload.runtime_secs, start_at, &mode_dir, )?; vec![(pod.clone(), result)] } else { + println!( + " Running on {} pods: {}", + pod_names.len(), + pod_names.join(", ") + ); let mut handles = Vec::new(); for pod in &pod_names { let ns = args.namespace.clone(); let pod_for_thread = pod.clone(); - let workload = workload_name.to_string(); - let template = job_template.to_string(); + let wl_name = workload.name.to_string(); + let template = workload.template.to_string(); let dir = mode_dir.clone(); + let rt = workload.runtime_secs; let handle = thread::spawn(move || { run_workload_on_pod( &ns, 
&pod_for_thread, - &workload, + &wl_name, &template, + rt, start_at, &dir, ) @@ -238,30 +352,52 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { match handle.join() { Ok(Ok(result)) => collected.push((pod, result)), Ok(Err(e)) => { - eprintln!("Error on pod {}: {}", pod, e); + eprintln!(" ERROR on pod {}: {}", pod, e); } Err(_) => { - eprintln!("Thread panic on pod {}", pod); + eprintln!(" PANIC on pod {}", pod); } } } collected }; - for (pod, raw_json) in results { + println!( + " {} complete ({} elapsed total)", + workload.name, + fmt_duration(run_start.elapsed().as_secs()) + ); + + for (pod, raw_json) in &results { let pod_slug = pod.replace(NAME, "node"); - let json_path = format!("{}/{}_{}.json", mode_dir, workload_name, pod_slug); - fs::write(&json_path, &raw_json)?; + let json_path = format!("{}/{}_{}.json", mode_dir, workload.name, pod_slug); + fs::write(&json_path, raw_json)?; - let result = parse_fio_output(&raw_json, workload_name, "write") - .map_err(io::Error::other)?; + let result = parse_fio_output(raw_json, workload.name, "write").map_err(|e| { + io::Error::other(format!("{} on {}: {}", workload.name, pod, e)) + })?; + + let sync = parse_sync_latencies(raw_json, workload.name); + + // Print per-pod summary inline + let bw_mib = result.bandwidth_kibps / 1024.0; + if sync.total_ios > 0 { + println!( + " {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms | fsync_p99.9={:.2}ms", + pod_slug, result.iops, bw_mib, result.clat_p99_ms, sync.p99_9_ms + ); + } else { + println!( + " {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms", + pod_slug, result.iops, bw_mib, result.clat_p99_ms + ); + } - let sync = parse_sync_latencies(&raw_json, workload_name); all_results.push(RedpandaCsvRow { label: "redpanda".to_string(), mode: mode.to_string(), - workload: workload_name.to_string(), - node: pod, + workload: workload.name.to_string(), + node: pod.clone(), test_name: result.test_name.clone(), iops: result.iops, bandwidth_kibps: result.bandwidth_kibps, @@ 
-286,13 +422,21 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { } } + println!(); + println!("============================================================"); + println!( + " All workloads complete. Total runtime: {}", + fmt_duration(run_start.elapsed().as_secs()) + ); + println!("============================================================"); + let csv_path = format!("{}/redpanda_summary.csv", output_dir); let mut wtr = csv::Writer::from_path(&csv_path)?; for row in &all_results { wtr.serialize(row)?; } wtr.flush()?; - println!("Redpanda summary saved to {}", csv_path); + println!("[output] Redpanda summary: {}", csv_path); let dash_path = format!("{}/iobench.csv", output_dir); let mut wtr2 = csv::Writer::from_path(&dash_path)?; @@ -300,7 +444,7 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { wtr2.serialize(DashRow::from(row))?; } wtr2.flush()?; - println!("Dashboard CSV saved to {}", dash_path); + println!("[output] Dashboard CSV: {}", dash_path); if let Some(worst) = all_results .iter() @@ -311,7 +455,8 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { .unwrap_or(std::cmp::Ordering::Equal) }) { - println!("\n=== HEADLINE ==="); + println!(); + println!("=== HEADLINE ==="); println!( "Worst p99.9 fdatasync latency (parallel fsync_hot_path): {:.3} ms on {}", worst.fdatasync_p99_9_ms, worst.node @@ -324,6 +469,7 @@ pub fn run(args: &RedpandaArgs) -> io::Result<()> { } if !args.keep_deployment { + println!(); undeploy(args)?; } @@ -342,9 +488,11 @@ fn run_workload_on_pod( pod: &str, workload_name: &str, job_template: &str, + runtime_secs: u64, start_at: u64, local_mode_dir: &str, ) -> io::Result<String> { + let pod_slug = pod.replace(NAME, "node"); let remote_job_path = format!("/data/results/{}.fio", workload_name); let remote_out_path = format!("/data/results/{}.json", workload_name); let remote_results_dir = "/data/results"; @@ -364,12 +512,49 @@ fn run_workload_on_pod( workload_name = workload_name, ); - let output = k8s::exec(namespace, pod, 
&fio_cmd)?; + // Progress ticker — prints every 10s until fio exec returns + let done = Arc::new(AtomicBool::new(false)); + let done_clone = done.clone(); + let ticker_label = pod_slug.clone(); + let wl = workload_name.to_string(); + let total_expected = BARRIER_SECS + runtime_secs; + let progress_handle = thread::spawn(move || { + let start = Instant::now(); + loop { + thread::sleep(Duration::from_secs(10)); + if done_clone.load(Ordering::Acquire) { + break; + } + let elapsed = start.elapsed().as_secs(); + let phase = if elapsed < BARRIER_SECS { + format!("barrier sync {}s/{}s", elapsed, BARRIER_SECS) + } else { + let fio_elapsed = elapsed - BARRIER_SECS; + format!("fio running {}s/{}s", fio_elapsed, runtime_secs) + }; + let remaining = total_expected.saturating_sub(elapsed); + eprintln!( + " [{}:{}] {} ({} remaining)", + ticker_label, + wl, + phase, + fmt_duration(remaining) + ); + } + }); - let local_pod_dir = format!("{}/{}", local_mode_dir, pod.replace(NAME, "node")); + let output = k8s::exec(namespace, pod, &fio_cmd); + + done.store(true, Ordering::Release); + let _ = progress_handle.join(); + + let output = output?; + + let local_pod_dir = format!("{}/{}", local_mode_dir, pod_slug); fs::create_dir_all(&local_pod_dir)?; + eprintln!(" [{}] Copying results from pod...", pod_slug); if let Err(e) = k8s::cp_from(namespace, pod, remote_results_dir, &local_pod_dir) { - eprintln!("Warning: failed to copy results from {}: {}", pod, e); + eprintln!(" [{}] Warning: failed to copy results: {}", pod_slug, e); } let local_json = format!("{}/{}.json", local_pod_dir, workload_name); @@ -474,3 +659,336 @@ impl From<&RedpandaCsvRow> for DashRow { } } } + +#[cfg(test)] +mod tests { + use super::*; + + // -- fmt_duration -- + + #[test] + fn fmt_duration_seconds_only() { + assert_eq!(fmt_duration(0), "0s"); + assert_eq!(fmt_duration(1), "1s"); + assert_eq!(fmt_duration(59), "59s"); + } + + #[test] + fn fmt_duration_minutes_and_seconds() { + assert_eq!(fmt_duration(60), "1m00s"); 
+ assert_eq!(fmt_duration(61), "1m01s"); + assert_eq!(fmt_duration(125), "2m05s"); + assert_eq!(fmt_duration(3599), "59m59s"); + } + + #[test] + fn fmt_duration_hours() { + assert_eq!(fmt_duration(3600), "1h00m"); + assert_eq!(fmt_duration(3660), "1h01m"); + assert_eq!(fmt_duration(7200), "2h00m"); + assert_eq!(fmt_duration(5432), "1h30m"); + } + + // -- total_runtime_secs -- + + #[test] + fn total_runtime_secs_matches_workload_sum() { + let wl = workloads(); + let expected: u64 = wl.iter().map(|w| BARRIER_SECS + w.runtime_secs).sum(); + assert_eq!(total_runtime_secs(&wl), expected); + // 4 workloads: (60+300) + (60+600) + (60+120) + (60+120) = 1380 + assert_eq!(expected, 1380); + } + + // -- Workload runtime consistency -- + // Verify that each Workload.runtime_secs matches the runtime= value + // in its fio template. Prevents the two sources of truth from diverging. + + #[test] + fn workload_runtime_matches_fio_template() { + for w in workloads() { + let runtime_from_template = w + .template + .lines() + .find_map(|line| { + let trimmed = line.trim(); + trimmed + .strip_prefix("runtime=") + .and_then(|v| v.parse::<u64>().ok()) + }) + .unwrap_or_else(|| { + panic!("Could not find runtime= in fio template for '{}'", w.name) + }); + assert_eq!( + w.runtime_secs, runtime_from_template, + "Workload '{}': struct says {}s but template says runtime={}", + w.name, w.runtime_secs, runtime_from_template + ); + } + } + + // -- parse_sync_latencies -- + + fn sample_fio_json_with_sync() -> String { + serde_json::json!({ + "jobs": [{ + "jobname": "fsync_hot_path", + "read": { + "bw": 0.0, "iops": 0.0, + "clat_ns": { "mean": 0.0, "stddev": 0.0 } + }, + "write": { + "bw": 51200.0, "iops": 3200.0, + "clat_ns": { + "mean": 5_000_000.0, "stddev": 1_000_000.0, + "percentile": { + "50.000000": 4_000_000.0, + "95.000000": 8_000_000.0, + "99.000000": 12_000_000.0, + "99.900000": 20_000_000.0, + "99.990000": 30_000_000.0, + "100.000000": 50_000_000.0 + } + } + }, + "sync": { + "total_ios": 
50000_u64, + "lat_ns": { + "mean": 2_000_000.0, "stddev": 500_000.0, + "percentile": { + "50.000000": 1_500_000.0, + "95.000000": 3_000_000.0, + "99.000000": 5_000_000.0, + "99.900000": 10_000_000.0, + "99.990000": 20_000_000.0, + "100.000000": 50_000_000.0 + } + } + } + }] + }) + .to_string() + } + + #[test] + fn parse_sync_latencies_extracts_correctly() { + let json = sample_fio_json_with_sync(); + let sync = parse_sync_latencies(&json, "fsync_hot_path"); + + assert_eq!(sync.total_ios, 50000); + assert!((sync.mean_ms - 2.0).abs() < 0.001); + assert!((sync.p50_ms - 1.5).abs() < 0.001); + assert!((sync.p95_ms - 3.0).abs() < 0.001); + assert!((sync.p99_ms - 5.0).abs() < 0.001); + assert!((sync.p99_9_ms - 10.0).abs() < 0.001); + assert!((sync.p99_99_ms - 20.0).abs() < 0.001); + assert!((sync.max_ms - 50.0).abs() < 0.001); + } + + #[test] + fn parse_sync_latencies_wrong_job_returns_default() { + let json = sample_fio_json_with_sync(); + let sync = parse_sync_latencies(&json, "nonexistent"); + assert_eq!(sync.total_ios, 0); + assert_eq!(sync.mean_ms, 0.0); + } + + #[test] + fn parse_sync_latencies_invalid_json_returns_default() { + let sync = parse_sync_latencies("not json", "fsync_hot_path"); + assert_eq!(sync.total_ios, 0); + } + + fn sample_fio_json_no_sync() -> String { + serde_json::json!({ + "jobs": [{ + "jobname": "throughput", + "read": { + "bw": 0.0, "iops": 0.0, + "clat_ns": { "mean": 0.0, "stddev": 0.0 } + }, + "write": { + "bw": 51200.0, "iops": 3200.0, + "clat_ns": { + "mean": 5_000_000.0, "stddev": 1_000_000.0, + "percentile": { + "50.000000": 4_000_000.0, + "95.000000": 8_000_000.0, + "99.000000": 12_000_000.0, + "99.900000": 20_000_000.0, + "99.990000": 30_000_000.0, + "100.000000": 50_000_000.0 + } + } + } + }] + }) + .to_string() + } + + #[test] + fn parse_sync_latencies_no_sync_field_returns_default() { + let json = sample_fio_json_no_sync(); + let sync = parse_sync_latencies(&json, "throughput"); + assert_eq!(sync.total_ios, 0); + } + + // -- 
get_percentile -- + + #[test] + fn get_percentile_extracts_and_converts_ns_to_ms() { + let mut m = HashMap::new(); + m.insert("99.000000".to_string(), 12_000_000.0); + let result = get_percentile(Some(&m), "99.000000"); + assert!((result - 12.0).abs() < 0.001); + } + + #[test] + fn get_percentile_missing_key_returns_zero() { + let m = HashMap::new(); + assert_eq!(get_percentile(Some(&m), "99.000000"), 0.0); + } + + #[test] + fn get_percentile_none_map_returns_zero() { + assert_eq!(get_percentile(None, "99.000000"), 0.0); + } + + // -- parse_fio_output integration -- + + #[test] + fn parse_fio_output_with_sync_workload() { + let json = sample_fio_json_with_sync(); + let result = parse_fio_output(&json, "fsync_hot_path", "write").unwrap(); + + assert!((result.iops - 3200.0).abs() < 0.001); + assert!((result.bandwidth_kibps - 51200.0).abs() < 0.001); + assert!((result.latency_mean_ms - 5.0).abs() < 0.001); + assert!((result.clat_p50_ms - 4.0).abs() < 0.001); + assert!((result.clat_p99_ms - 12.0).abs() < 0.001); + assert!((result.clat_p99_9_ms - 20.0).abs() < 0.001); + assert!((result.clat_max_ms - 50.0).abs() < 0.001); + } + + #[test] + fn parse_fio_output_no_sync_workload() { + let json = sample_fio_json_no_sync(); + let result = parse_fio_output(&json, "throughput", "write").unwrap(); + + assert!((result.iops - 3200.0).abs() < 0.001); + assert!((result.clat_p99_ms - 12.0).abs() < 0.001); + } + + #[test] + fn parse_fio_output_wrong_job_name_errors() { + let json = sample_fio_json_no_sync(); + let result = parse_fio_output(&json, "nonexistent", "write"); + assert!(result.is_err()); + assert!(result.unwrap_err().contains("nonexistent")); + } + + // -- DashRow label formatting -- + + #[test] + fn dash_row_label_format() { + let row = RedpandaCsvRow { + label: "redpanda".to_string(), + mode: "parallel".to_string(), + workload: "fsync_hot_path".to_string(), + node: "iobench-redpanda-0".to_string(), + test_name: "fsync_hot_path".to_string(), + iops: 1000.0, + 
bandwidth_kibps: 16000.0, + latency_mean_ms: 5.0, + latency_stddev_ms: 1.0, + clat_p50_ms: 4.0, + clat_p95_ms: 8.0, + clat_p99_ms: 12.0, + clat_p99_9_ms: 20.0, + clat_p99_99_ms: 30.0, + clat_max_ms: 50.0, + fdatasync_total_ios: 50000, + fdatasync_mean_ms: 2.0, + fdatasync_p50_ms: 1.5, + fdatasync_p95_ms: 3.0, + fdatasync_p99_ms: 5.0, + fdatasync_p99_9_ms: 10.0, + fdatasync_p99_99_ms: 20.0, + fdatasync_max_ms: 50.0, + }; + let dash = DashRow::from(&row); + assert_eq!(dash.label, "redpanda-parallel-fsync_hot_path"); + assert!((dash.fdatasync_p99_9_ms - 10.0).abs() < 0.001); + } + + // -- CSV serialization round-trip -- + + #[test] + fn redpanda_csv_row_serializes_all_columns() { + let row = RedpandaCsvRow { + label: "redpanda".to_string(), + mode: "sequential".to_string(), + workload: "throughput".to_string(), + node: "node-0".to_string(), + test_name: "throughput".to_string(), + iops: 3200.0, + bandwidth_kibps: 51200.0, + latency_mean_ms: 5.0, + latency_stddev_ms: 1.0, + clat_p50_ms: 4.0, + clat_p95_ms: 8.0, + clat_p99_ms: 12.0, + clat_p99_9_ms: 20.0, + clat_p99_99_ms: 30.0, + clat_max_ms: 50.0, + fdatasync_total_ios: 0, + fdatasync_mean_ms: 0.0, + fdatasync_p50_ms: 0.0, + fdatasync_p95_ms: 0.0, + fdatasync_p99_ms: 0.0, + fdatasync_p99_9_ms: 0.0, + fdatasync_p99_99_ms: 0.0, + fdatasync_max_ms: 0.0, + }; + let mut wtr = csv::Writer::from_writer(Vec::new()); + wtr.serialize(&row).unwrap(); + wtr.flush().unwrap(); + let csv_output = String::from_utf8(wtr.into_inner().unwrap()).unwrap(); + let lines: Vec<&str> = csv_output.lines().collect(); + + // Header line must contain all expected column names + let header = lines[0]; + for col in [ + "label", + "mode", + "workload", + "node", + "test_name", + "iops", + "bandwidth_kibps", + "latency_mean_ms", + "latency_stddev_ms", + "clat_p50_ms", + "clat_p95_ms", + "clat_p99_ms", + "clat_p99_9_ms", + "clat_p99_99_ms", + "clat_max_ms", + "fdatasync_total_ios", + "fdatasync_mean_ms", + "fdatasync_p50_ms", + "fdatasync_p95_ms", + 
"fdatasync_p99_ms", + "fdatasync_p99_9_ms", + "fdatasync_p99_99_ms", + "fdatasync_max_ms", + ] { + assert!(header.contains(col), "CSV header missing column: {}", col); + } + + // Data line should have the right number of fields + let data_fields: Vec<&str> = lines[1].split(',').collect(); + let header_fields: Vec<&str> = header.split(',').collect(); + assert_eq!(data_fields.len(), header_fields.len()); + } +} -- 2.39.5