feat: iobench redpanda profile to run the recommended fio settings by redpanda on a k8s storage backend #281

Open
johnride wants to merge 2 commits from feat/iobench-redpanda-profile into master
12 changed files with 1893 additions and 208 deletions

2
.gitignore vendored
View File

@@ -32,3 +32,5 @@ ignore
# Generated book # Generated book
book book
__pycache__

35
Cargo.lock generated
View File

@@ -1932,6 +1932,27 @@ dependencies = [
"typenum", "typenum",
] ]
[[package]]
name = "csv"
version = "1.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938"
dependencies = [
"csv-core",
"itoa",
"ryu",
"serde_core",
]
[[package]]
name = "csv-core"
version = "0.1.13"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782"
dependencies = [
"memchr",
]
[[package]] [[package]]
name = "ctr" name = "ctr"
version = "0.9.2" version = "0.9.2"
@@ -4648,6 +4669,20 @@ dependencies = [
"thiserror 1.0.69", "thiserror 1.0.69",
] ]
[[package]]
name = "iobench"
version = "1.0.0"
dependencies = [
"chrono",
"clap",
"csv",
"k8s-openapi",
"num_cpus",
"serde",
"serde_json",
"serde_yaml",
]
[[package]] [[package]]
name = "ipnet" name = "ipnet"
version = "2.12.0" version = "2.12.0"

View File

@@ -28,6 +28,7 @@ members = [
"harmony_node_readiness", "harmony_node_readiness",
"harmony-k8s", "harmony-k8s",
"harmony_assets", "opnsense-codegen", "opnsense-api", "harmony_assets", "opnsense-codegen", "opnsense-api",
"iobench",
] ]
[workspace.package] [workspace.package]

View File

@@ -11,7 +11,8 @@ clap = { version = "4.0", features = ["derive"] }
chrono = "0.4" chrono = "0.4"
serde = { version = "1.0", features = ["derive"] } serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0" serde_json = "1.0"
serde_yaml = { workspace = true }
csv = "1.1" csv = "1.1"
num_cpus = "1.13" num_cpus = "1.13"
k8s-openapi = { workspace = true }
[workspace]

View File

@@ -0,0 +1,68 @@
# Redpanda iobench preflight checklist
Cluster inspection performed: 2026-05-03
Context: maintenance window, other workloads turned off, Ceph pool at ~0 IOPS idle.
---
## Cluster topology
- [x] **PASS** — 3 worker nodes available (wk0, wk1, wk2). Matches `--replicas 3` default.
- [x] **PASS** — All 3 workers are Ready, no DiskPressure / MemoryPressure / PIDPressure.
- [x] **PASS** — Control plane nodes (cp3, cp4, cp5) have `NoSchedule` taint. iobench pods cannot land there.
- [x] **PASS** — All nodes carry `kubernetes.io/hostname` label. Anti-affinity topology key will work.
- [x] **PASS** — Worker nodes are untainted and have no scheduling restrictions.
## Storage
- [x] **PASS** — StorageClass `ceph-block` exists, `volumeBindingMode: Immediate`, `reclaimPolicy: Delete`.
- [x] **PASS** — Ceph pool `ceph-blockpool` replication: size=3, min_size=2.
- [x] **PASS** — 3 OSDs all Running (osd-0, osd-1, osd-2).
- [x] **PASS** — Raw capacity: 503 GiB available. 3x50GiB PVCs = 150 GiB usable = 450 GiB raw. Leaves 53 GiB raw headroom (~10.5%). Tight but sufficient for a benchmark run.
- [x] **PASS** — Largest single-workload disk footprint: `throughput` with numjobs=4 x size=10G = 40 GiB per PVC. Fits in 50 GiB PVC with 10 GiB headroom for logs and filesystem overhead.
- [x] **PASS** — Pool is idle ("nothing is going on"), OSD commit/apply latency showing 0 ms. Confirms maintenance window baseline.
- [x] **PASS** — All 3 disks are actually Intel SSDs (misidentified as HDD by Ceph due to HP RAID controller passthrough). No mixed-media concern.
- [x] **PASS (non-blocking)** — Ceph health is `HEALTH_WARN: too many PGs per OSD (265 > max 250)`. This is a pre-existing tuning issue unrelated to the benchmark. Does not affect correctness of results. Note: if this were a data-integrity warning (e.g. `HEALTH_WARN: degraded PGs`), the benchmark must not proceed.
## Namespace and resource constraints
- [x] **PASS** — `default` namespace exists.
- [x] **PASS** — No ResourceQuota or LimitRange in `default` namespace. Pod creation won't be blocked.
- [x] **PASS** — No leftover `iobench-redpanda` StatefulSet or PVCs in `default` namespace. Clean slate.
## Container image
- [x] **PASS** — `juicedata/fio:latest` pulls successfully and runs on this cluster.
- [x] **PASS** — fio version 3.18 confirmed. Supports `fdatasync=1`, `log_avg_msec`, `write_lat_log`, `write_iops_log`, `--output-format=json`.
- [x] **PASS** — Image contains `sh`, `date` (needed for wall-clock barrier), and `tar` (needed for `kubectl cp`).
## Clock synchronization (barrier correctness)
- [x] **PASS** — Node heartbeat timestamps are within ~5s of each other. The 60-second barrier (`start_at = now_epoch() + 60`) provides ample margin. Barrier only fails if clock skew exceeds 60s.
## Code safety review
- [x] **PASS** — All fio workloads use `direct=1` (bypass page cache). Benchmark measures Ceph, not RAM.
- [x] **PASS** — All writes target `/data/iobench_testfile` on the mounted PVC filesystem, not a raw block device. No risk of corrupting the node OS disk.
- [x] **PASS** — Pods run with default security context (no privileged, no hostPath, no hostNetwork). Blast radius is limited to the PVCs.
- [x] **PASS** — `reclaimPolicy: Delete` means PVCs and their backing RBD images are fully cleaned up when deleted. No storage leak after `undeploy`.
- [x] **PASS** — `delete_resources` targets only `statefulset,pvc` with label `app=iobench-redpanda`. Cannot accidentally delete unrelated resources.
- [x] **PASS** — Parallel mode prints a clear warning before starting.
- [x] **PASS** — `--keep-deployment` flag available for debugging without re-provisioning.
- [x] **PASS** — Workloads run sequentially within each mode (throughput, then fsync_hot_path, then selftest_512k, then selftest_4k_qd1). Disk usage doesn't accumulate across workloads since they reuse the same filename.
## Resource consumption during run
- [x] **PASS** — Workers have ample CPU headroom (3-6% current usage). fio with 4 jobs + iodepth 16 is not CPU-intensive on modern hardware.
- [x] **PASS** — Workers have ample memory headroom (5-10% current usage). fio with `direct=1` uses minimal RAM.
- [x] **PASS** — No CPU/memory resource limits or requests set on the fio container. This is intentional — resource limits would throttle the benchmark and distort results. Acceptable during a maintenance window with other workloads off.
## Risks acknowledged
- **Ceph pool impact**: Parallel mode will saturate the Ceph pool. This is the point of the test. Confirmed other workloads are off and pool is at ~0 IOPS.
- **Capacity headroom is tight**: 53 GiB raw remaining after PVC provisioning (~10.5%). If the cluster has background operations that consume space (e.g., snapshots, compaction), this could trigger a `HEALTH_ERR: full` condition. Mitigated by: maintenance window, no other workloads, and `reclaimPolicy: Delete` ensuring cleanup.
- **`--wait=false` on delete**: `delete_resources` uses `--wait=false`, so `undeploy` returns before PVCs are fully reclaimed. This is fine — Ceph handles RBD image deletion asynchronously. But if re-running immediately after undeploy, PVCs from the previous run may still be terminating. Mitigated by: the deploy step uses `kubectl apply` which is idempotent.
## Verdict
All checks pass. Safe to proceed with `iobench redpanda --storage-class ceph-block` during the current maintenance window.

146
iobench/README.md Normal file
View File

@@ -0,0 +1,146 @@
# iobench
A command-line I/O benchmarking tool using fio. Runs locally, over SSH, or on Kubernetes pods. Includes a Redpanda storage characterization profile for validating Ceph RBD suitability.
## Build
```bash
cargo build -p iobench --release
```
## Simple profile (default)
Runs standard fio benchmarks (sequential/random read/write, single/multi-job) against a local or remote target.
```bash
# Local
iobench
# Over SSH
iobench --target ssh/user@host
# On a Kubernetes pod
iobench --target k8s/namespace/pod-name
# Custom parameters
iobench --duration 60 --size 4G --block-size 128k --tests read,write,randwrite
```
Available tests: `read`, `write`, `randread`, `randwrite`, `multiread`, `multiwrite`, `multirandread`, `multirandwrite`.
Results are saved to `./iobench-{timestamp}/summary.csv`.
## Redpanda profile
Characterizes whether a Kubernetes storage backend (e.g. Ceph RBD) can sustain Redpanda's I/O patterns. Deploys a 3-pod StatefulSet with per-node PVCs and runs four fio workloads that model Redpanda's actual storage behavior.
### Quick start
```bash
# Run the full profile (sequential baselines, then parallel contention test)
iobench redpanda --storage-class ceph-block
# Sequential baselines only (single-node, no cluster impact)
iobench redpanda --mode sequential --storage-class ceph-block
# Parallel contention test only
iobench redpanda --mode parallel --storage-class ceph-block
# Keep pods alive after the run (useful for re-running or debugging)
iobench redpanda --storage-class ceph-block --keep-deployment
# Deploy pods without running any workloads
iobench deploy --storage-class ceph-block
# Remove all iobench pods and PVCs
iobench undeploy
```
### Options
| Flag | Default | Description |
|------|---------|-------------|
| `--mode` | `both` | `sequential`, `parallel`, or `both` |
| `--storage-class` | `ceph-block` | Kubernetes StorageClass for the PVCs |
| `--pvc-size` | `50Gi` | Size of each PVC |
| `--replicas` | `3` | Number of pods (should match cluster node count) |
| `--namespace` | `default` | Kubernetes namespace |
| `--output-dir` | auto-timestamped | Directory for results |
| `--keep-deployment` | `false` | Don't delete pods/PVCs after the run |
### Workloads
The profile runs four workloads, each targeting a different aspect of Redpanda's I/O:
| Workload | What it models | Key params | Runtime |
|----------|---------------|------------|---------|
| `throughput` | Bulk segment writes (optimistic upper bound) | 16K seq write, no fsync, 4 jobs, iodepth=16 | 5 min |
| `fsync_hot_path` | Raft commit path with `acks=all` | 16K seq write, fdatasync per op, 4 jobs, iodepth=4 | 10 min |
| `selftest_512k` | `rpk cluster self-test` 512K phase | 512K seq write, fdatasync, 1 job, iodepth=4 | 2 min |
| `selftest_4k_qd1` | Worst-case single-stream commit | 4K seq write, fdatasync, 1 job, iodepth=1 | 2 min |
All workloads use `direct=1` (bypass page cache) and `ioengine=libaio`.
### Execution modes
- **`sequential`** -- Runs all four workloads back-to-back on a single node. Clean per-pattern baselines, comparable to Redpanda's published hardware requirements (16,000 IOPS minimum per broker).
- **`parallel`** -- Runs each of the four workloads on all nodes simultaneously (the workloads themselves still run one after another). Each node writes to its own PVC. A wall-clock barrier synchronizes start times across pods so contention windows overlap. This is the production-shape test that exposes the Ceph OSD fan-in pattern.
- **`both`** (default) -- Sequential first, then parallel.
### Interpreting results
The **headline metric** is the worst p99.9 fdatasync latency on the `fsync_hot_path` workload during parallel execution:
- **<= 100 ms**: PASS. Storage can sustain Raft heartbeats under load.
- **> 100 ms**: FAIL. Raft heartbeats (150 ms interval, 1.5 s election timeout) will not survive load spikes, leading to election storms.
Reference values from healthy NVMe (from `rpk cluster self-test`):
- `selftest_512k`: ~1182 IOPS, ~591 MiB/s
- `selftest_4k_qd1`: ~406 IOPS
### Output
Results are saved to `./iobench-redpanda-{timestamp}/`:
```
iobench-redpanda-2026-05-03-143022/
redpanda_summary.csv # Full results: all metrics, all nodes, all workloads
iobench.csv # Dashboard-compatible format
sequential/
throughput_node-0.json # Raw fio JSON per workload per node
fsync_hot_path_node-0.json
...
node-0/ # Per-node time-series logs
throughput_lat.1.log
throughput_iops.1.log
...
parallel/
throughput_node-0.json
throughput_node-1.json
throughput_node-2.json
...
```
The CSV includes per-workload per-node: IOPS, bandwidth, completion latency percentiles (p50-p99.99, max), and fdatasync latency percentiles (p50-p99.99, max).
Per-second time-series logs (`*_lat.*.log`, `*_iops.*.log`) capture the bimodal/spiky behavior of Ceph under load that summary statistics miss.
### Warning
Parallel mode generates heavy fdatasync workloads across all nodes simultaneously. This **will impact other workloads on the same Ceph pool**. Run during a maintenance window or off-peak.
## Dashboard
A Plotly Dash app for visualizing results. Supports both simple and Redpanda profile data.
```bash
cd iobench/dash
virtualenv venv
source venv/bin/activate
pip install -r requirements_freeze.txt
# Copy result CSVs into the dash directory, then:
python iobench-dash.py
```
The dashboard reads `iobench.csv` and/or `redpanda_summary.csv` from its working directory.

View File

@@ -1,3 +1,5 @@
import os
import dash import dash
from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction
import plotly.express as px import plotly.express as px
@@ -6,38 +8,29 @@ import dash_bootstrap_components as dbc
import io import io
# --- Data Loading and Preparation --- # --- Data Loading and Preparation ---
# csv_data = """label,test_name,iops,bandwidth_kibps,latency_mean_ms,latency_stddev_ms dataframes = []
# Ceph HDD Only,read-4k-sync-test,1474.302,5897,0.673,0.591
# Ceph HDD Only,write-4k-sync-test,14.126,56,27.074,7.046 if os.path.exists("iobench.csv"):
# Ceph HDD Only,randread-4k-sync-test,225.140,900,4.436,6.918 df_simple = pd.read_csv("iobench.csv")
# Ceph HDD Only,randwrite-4k-sync-test,13.129,52,34.891,10.859 dataframes.append(df_simple)
# Ceph HDD Only,multiread-4k-sync-test,6873.675,27494,0.578,0.764
# Ceph HDD Only,multiwrite-4k-sync-test,57.135,228,38.660,11.293 if os.path.exists("redpanda_summary.csv"):
# Ceph HDD Only,multirandread-4k-sync-test,2451.376,9805,1.626,2.515 df_red = pd.read_csv("redpanda_summary.csv")
# Ceph HDD Only,multirandwrite-4k-sync-test,54.642,218,33.492,13.111 df_red['label'] = df_red['mode'] + '-' + df_red['workload']
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,read-4k-sync-test,1495.700,5982,0.664,1.701 df_red['test_name'] = df_red['workload']
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,write-4k-sync-test,16.990,67,17.502,9.908 cols = ['label', 'test_name', 'iops', 'bandwidth_kibps', 'latency_mean_ms', 'latency_stddev_ms', 'fdatasync_p99_9_ms']
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randread-4k-sync-test,159.256,637,6.274,9.232 df_red = df_red[[c for c in cols if c in df_red.columns]]
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randwrite-4k-sync-test,16.693,66,24.094,16.099 dataframes.append(df_red)
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiread-4k-sync-test,7305.559,29222,0.544,1.338
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiwrite-4k-sync-test,52.260,209,34.891,17.576 if not dataframes:
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandread-4k-sync-test,700.606,2802,5.700,10.429 raise FileNotFoundError("No iobench.csv or redpanda_summary.csv found in working directory.")
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandwrite-4k-sync-test,52.723,210,29.709,25.829
# Ceph 2 Hosts WAL+DB SSD Only,randwrite-4k-sync-test,90.037,360,3.617,8.321 df = pd.concat(dataframes, ignore_index=True)
# Ceph WAL+DB SSD During Rebuild,randwrite-4k-sync-test,41.008,164,10.138,19.333
# Ceph WAL+DB SSD OSD HDD,read-4k-sync-test,1520.299,6081,0.654,1.539
# Ceph WAL+DB SSD OSD HDD,write-4k-sync-test,78.528,314,4.074,9.101
# Ceph WAL+DB SSD OSD HDD,randread-4k-sync-test,153.303,613,6.518,9.036
# Ceph WAL+DB SSD OSD HDD,randwrite-4k-sync-test,48.677,194,8.785,20.356
# Ceph WAL+DB SSD OSD HDD,multiread-4k-sync-test,6804.880,27219,0.584,1.422
# Ceph WAL+DB SSD OSD HDD,multiwrite-4k-sync-test,311.513,1246,4.978,9.458
# Ceph WAL+DB SSD OSD HDD,multirandread-4k-sync-test,581.756,2327,6.869,10.204
# Ceph WAL+DB SSD OSD HDD,multirandwrite-4k-sync-test,120.556,482,13.463,25.440
# """
#
# df = pd.read_csv(io.StringIO(csv_data))
df = pd.read_csv("iobench.csv") # Replace with the actual file path
df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024 df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024
if 'fdatasync_p99_9_ms' not in df.columns:
df['fdatasync_p99_9_ms'] = 0.0
else:
df['fdatasync_p99_9_ms'] = df['fdatasync_p99_9_ms'].fillna(0.0)
# --- App Initialization and Global Settings --- # --- App Initialization and Global Settings ---
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY]) app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY])
@@ -70,7 +63,8 @@ app.layout = dbc.Container([
options=[ options=[
{'label': 'IOPS', 'value': 'iops'}, {'label': 'IOPS', 'value': 'iops'},
{'label': 'Latency (ms)', 'value': 'latency_mean_ms'}, {'label': 'Latency (ms)', 'value': 'latency_mean_ms'},
{'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'} {'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'},
{'label': 'fdatasync p99.9 (ms)', 'value': 'fdatasync_p99_9_ms'},
], ],
value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection
labelClassName="d-block" labelClassName="d-block"
@@ -185,11 +179,12 @@ def update_graphs(selected_metrics, selected_configs, selected_tests):
metric_titles = { metric_titles = {
'iops': 'IOPS Comparison (Higher is Better)', 'iops': 'IOPS Comparison (Higher is Better)',
'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)', 'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)',
'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)' 'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)',
'fdatasync_p99_9_ms': 'fdatasync p99.9 Latency (ms) Comparison (Lower is Better)',
} }
for metric in selected_metrics: for metric in selected_metrics:
sort_order = 'total ascending' if metric == 'latency_mean_ms' else 'total descending' sort_order = 'total ascending' if metric in ('latency_mean_ms', 'fdatasync_p99_9_ms') else 'total descending'
error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None
fig = px.bar( fig = px.bar(
@@ -206,7 +201,8 @@ def update_graphs(selected_metrics, selected_configs, selected_tests):
"iops": "IOPS", "iops": "IOPS",
"latency_mean_ms": "Mean Latency (ms)", "latency_mean_ms": "Mean Latency (ms)",
"bandwidth_mbps": "Bandwidth (MB/s)", "bandwidth_mbps": "Bandwidth (MB/s)",
"label": "Cluster Configuration" "fdatasync_p99_9_ms": "fdatasync p99.9 (ms)",
"label": "Configuration"
} }
) )

224
iobench/src/fio.rs Normal file
View File

@@ -0,0 +1,224 @@
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
/// Top-level document deserialized from fio's JSON report.
///
/// Only the `jobs` array is modelled; any other top-level keys in the
/// input are not captured by this type.
#[derive(Debug, Serialize, Deserialize)]
pub struct FioOutput {
/// One entry per reported fio job.
pub jobs: Vec<FioJobResult>,
}
/// Result of a single fio job as it appears in the `jobs` array.
#[derive(Debug, Serialize, Deserialize)]
pub struct FioJobResult {
/// Job name; `parse_fio_output` matches this against the requested test name.
pub jobname: String,
/// Metrics for the read direction.
pub read: FioMetrics,
/// Metrics for the write direction.
pub write: FioMetrics,
/// Optional `sync` block; deserializes to `None` when the key is absent.
#[serde(default)]
pub sync: Option<SyncMetrics>,
}
/// Per-direction (read or write) metrics of one fio job.
#[derive(Debug, Serialize, Deserialize)]
pub struct FioMetrics {
/// Bandwidth; copied verbatim into `BenchmarkResult::bandwidth_kibps`.
pub bw: f64,
/// I/O operations per second.
pub iops: f64,
/// Completion latency statistics, in nanoseconds.
pub clat_ns: LatencyMetrics,
}
/// Contents of a job's optional `sync` block.
// NOTE(review): presumably fio's fsync/fdatasync statistics — confirm against
// the fio JSON produced by the workloads that use it.
#[derive(Debug, Serialize, Deserialize)]
pub struct SyncMetrics {
/// Number of sync operations reported by fio.
pub total_ios: u64,
/// Latency statistics for those operations, in nanoseconds.
pub lat_ns: LatencyMetrics,
}
/// A latency distribution summary as emitted by fio (values in nanoseconds).
#[derive(Debug, Serialize, Deserialize)]
pub struct LatencyMetrics {
/// Mean latency.
pub mean: f64,
/// Standard deviation of the latency.
pub stddev: f64,
/// Percentile table keyed by strings such as `"50.000000"` or `"99.900000"`;
/// `None` when the input has no `percentile` key.
#[serde(default)]
pub percentile: Option<HashMap<String, f64>>,
}
/// Flattened metrics for one benchmark, produced by `parse_fio_output`.
///
/// All latency fields are in milliseconds (converted from fio's nanoseconds).
/// Percentile fields are `0.0` when fio did not report that percentile.
#[derive(Debug, Serialize)]
pub struct BenchmarkResult {
/// Name of the fio job these numbers were taken from.
pub test_name: String,
/// I/O operations per second.
pub iops: f64,
/// Bandwidth, taken directly from fio's `bw` field.
pub bandwidth_kibps: f64,
/// Mean completion latency.
pub latency_mean_ms: f64,
/// Standard deviation of the completion latency.
pub latency_stddev_ms: f64,
/// Completion latency, 50th percentile.
pub clat_p50_ms: f64,
/// Completion latency, 95th percentile.
pub clat_p95_ms: f64,
/// Completion latency, 99th percentile.
pub clat_p99_ms: f64,
/// Completion latency, 99.9th percentile.
pub clat_p99_9_ms: f64,
/// Completion latency, 99.99th percentile.
pub clat_p99_99_ms: f64,
/// Completion latency read from the `"100.000000"` percentile key.
pub clat_max_ms: f64,
}
/// Returns the percentile stored under `key`, converted from nanoseconds to
/// milliseconds.
///
/// Yields `0.0` when there is no percentile table at all or when the table
/// has no entry for `key`.
fn percentile_ns_to_ms(p: &Option<HashMap<String, f64>>, key: &str) -> f64 {
    let nanos = match p {
        Some(table) => table.get(key).copied().unwrap_or(0.0),
        None => 0.0,
    };
    nanos / 1_000_000.0
}
/// Extracts a [`BenchmarkResult`] for the job called `test_name` from fio's
/// JSON report.
///
/// `rw` picks the direction to report: if it contains the substring `"read"`
/// the job's `read` block is used, otherwise the `write` block.
///
/// # Errors
///
/// Returns a descriptive `String` when `output` is not valid fio JSON or when
/// no job named `test_name` is present.
pub fn parse_fio_output(
    output: &str,
    test_name: &str,
    rw: &str,
) -> Result<BenchmarkResult, String> {
    let report = match serde_json::from_str::<FioOutput>(output) {
        Ok(parsed) => parsed,
        Err(e) => return Err(format!("Failed to deserialize fio JSON: {}", e)),
    };

    let job = match report.jobs.iter().find(|j| j.jobname == test_name) {
        Some(found) => found,
        None => {
            return Err(format!(
                "Could not find job result for '{}' in fio output",
                test_name
            ));
        }
    };

    let side = if rw.contains("read") {
        &job.read
    } else {
        &job.write
    };
    let clat = &side.clat_ns;
    // Shorthand for a percentile lookup on the selected direction.
    let pct_ms = |key: &str| percentile_ns_to_ms(&clat.percentile, key);

    Ok(BenchmarkResult {
        test_name: test_name.to_string(),
        iops: side.iops,
        bandwidth_kibps: side.bw,
        latency_mean_ms: clat.mean / 1_000_000.0,
        latency_stddev_ms: clat.stddev / 1_000_000.0,
        clat_p50_ms: pct_ms("50.000000"),
        clat_p95_ms: pct_ms("95.000000"),
        clat_p99_ms: pct_ms("99.000000"),
        clat_p99_9_ms: pct_ms("99.900000"),
        clat_p99_99_ms: pct_ms("99.990000"),
        // NOTE(review): fio's default percentile list appears to stop at
        // 99.99, so this key is presumably only present when the job sets a
        // custom `percentile_list` — confirm, otherwise this is always 0.
        clat_max_ms: pct_ms("100.000000"),
    })
}
// Unit tests for the fio JSON model and `parse_fio_output`. Inputs are built
// with `serde_json::json!`, so no fio binary is needed.
#[cfg(test)]
mod tests {
use super::*;
// A stored nanosecond value comes back in milliseconds.
#[test]
fn percentile_ns_to_ms_converts_correctly() {
let mut m = HashMap::new();
m.insert("50.000000".to_string(), 4_000_000.0); // 4ms in ns
let p = Some(m);
assert!((percentile_ns_to_ms(&p, "50.000000") - 4.0).abs() < 0.001);
}
// A table without the requested key falls back to 0.
#[test]
fn percentile_ns_to_ms_missing_key_returns_zero() {
let m = HashMap::new();
assert_eq!(percentile_ns_to_ms(&Some(m), "99.000000"), 0.0);
}
// No table at all also falls back to 0.
#[test]
fn percentile_ns_to_ms_none_returns_zero() {
assert_eq!(percentile_ns_to_ms(&None, "50.000000"), 0.0);
}
// `rw = "read"` must report numbers from the job's `read` block.
#[test]
fn parse_fio_output_selects_read_metrics() {
let json = serde_json::json!({
"jobs": [{
"jobname": "readtest",
"read": {
"bw": 100000.0, "iops": 5000.0,
"clat_ns": { "mean": 200_000.0, "stddev": 50_000.0 }
},
"write": {
"bw": 0.0, "iops": 0.0,
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
}
}]
})
.to_string();
let result = parse_fio_output(&json, "readtest", "read").unwrap();
assert!((result.iops - 5000.0).abs() < 0.001);
assert!((result.bandwidth_kibps - 100000.0).abs() < 0.001);
}
// `rw = "write"` must report numbers from the job's `write` block.
#[test]
fn parse_fio_output_selects_write_metrics() {
let json = serde_json::json!({
"jobs": [{
"jobname": "writetest",
"read": {
"bw": 0.0, "iops": 0.0,
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
},
"write": {
"bw": 51200.0, "iops": 3200.0,
"clat_ns": { "mean": 5_000_000.0, "stddev": 1_000_000.0 }
}
}]
})
.to_string();
let result = parse_fio_output(&json, "writetest", "write").unwrap();
assert!((result.iops - 3200.0).abs() < 0.001);
assert!((result.latency_mean_ms - 5.0).abs() < 0.001);
}
// Without a `percentile` table every percentile field is 0.
#[test]
fn parse_fio_output_no_percentiles_returns_zeros() {
let json = serde_json::json!({
"jobs": [{
"jobname": "test",
"read": {
"bw": 0.0, "iops": 0.0,
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
},
"write": {
"bw": 1000.0, "iops": 100.0,
"clat_ns": { "mean": 10_000_000.0, "stddev": 2_000_000.0 }
}
}]
})
.to_string();
let result = parse_fio_output(&json, "test", "write").unwrap();
assert_eq!(result.clat_p50_ms, 0.0);
assert_eq!(result.clat_p99_ms, 0.0);
assert_eq!(result.clat_max_ms, 0.0);
}
// Malformed input is reported as an error rather than a panic.
#[test]
fn parse_fio_output_invalid_json_errors() {
let result = parse_fio_output("{not json", "test", "write");
assert!(result.is_err());
}
// The `sync` block is optional: absent -> `None`, present -> parsed.
#[test]
fn fio_output_deserializes_with_optional_sync() {
// Without sync field
let json = serde_json::json!({
"jobs": [{
"jobname": "test",
"read": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } },
"write": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } }
}]
});
let parsed: FioOutput = serde_json::from_value(json).unwrap();
assert!(parsed.jobs[0].sync.is_none());
// With sync field
let json = serde_json::json!({
"jobs": [{
"jobname": "test",
"read": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } },
"write": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } },
"sync": { "total_ios": 100, "lat_ns": { "mean": 500.0, "stddev": 100.0 } }
}]
});
let parsed: FioOutput = serde_json::from_value(json).unwrap();
assert_eq!(parsed.jobs[0].sync.as_ref().unwrap().total_ios, 100);
}
}

193
iobench/src/k8s.rs Normal file
View File

@@ -0,0 +1,193 @@
use std::collections::BTreeMap;
use std::io;
use std::io::Write;
use std::process::{Command, Stdio};
use k8s_openapi::api::apps::v1::StatefulSet;
use k8s_openapi::api::core::v1::{
Affinity, Container, PersistentVolumeClaim, PersistentVolumeClaimSpec, PodAffinityTerm,
PodAntiAffinity, PodSpec, PodTemplateSpec, VolumeMount, VolumeResourceRequirements,
};
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
/// Builds the StatefulSet manifest used by the Redpanda benchmark profile.
///
/// The manifest is only constructed here; nothing is sent to the cluster.
///
/// * `name` – StatefulSet name; also used as the `app` label value and as
///   the `serviceName`.
/// * `namespace` – namespace recorded in the StatefulSet metadata.
/// * `replicas` – number of pods.
/// * `storage_class` / `size` – StorageClass and requested capacity of the
///   per-pod `data` volume claim.
/// * `image` – container image for the single `fio` container.
///
/// Each pod mounts its own `ReadWriteOnce` claim at `/data` and runs
/// `sleep infinity`. A required pod anti-affinity on
/// `kubernetes.io/hostname` keeps two pods carrying the same `app` label off
/// the same host.
pub fn redpanda_statefulset(
    name: &str,
    namespace: &str,
    replicas: i32,
    storage_class: &str,
    size: &str,
    image: &str,
) -> StatefulSet {
    // A single `app=<name>` label ties together the StatefulSet, its
    // selector, the pod template and the anti-affinity rule.
    let app_labels: BTreeMap<String, String> =
        BTreeMap::from([("app".to_string(), name.to_string())]);
    let app_selector = LabelSelector {
        match_labels: Some(app_labels.clone()),
        ..Default::default()
    };

    // Volume claim template: one `data` PVC per pod.
    let storage_request = BTreeMap::from([("storage".to_string(), Quantity(size.to_string()))]);
    let data_claim = PersistentVolumeClaim {
        metadata: ObjectMeta {
            name: Some("data".to_string()),
            ..Default::default()
        },
        spec: Some(PersistentVolumeClaimSpec {
            access_modes: Some(vec!["ReadWriteOnce".to_string()]),
            resources: Some(VolumeResourceRequirements {
                requests: Some(storage_request),
                ..Default::default()
            }),
            storage_class_name: Some(storage_class.to_string()),
            ..Default::default()
        }),
        ..Default::default()
    };

    // The container idles; it only needs the volume mounted at /data.
    let data_mount = VolumeMount {
        name: "data".to_string(),
        mount_path: "/data".to_string(),
        ..Default::default()
    };
    let fio_container = Container {
        name: "fio".to_string(),
        image: Some(image.to_string()),
        image_pull_policy: Some("IfNotPresent".to_string()),
        command: Some(vec!["sleep".to_string(), "infinity".to_string()]),
        volume_mounts: Some(vec![data_mount]),
        ..Default::default()
    };

    // Hard requirement: never co-locate two pods of this app on one host.
    let one_pod_per_host = PodAffinityTerm {
        label_selector: Some(app_selector.clone()),
        topology_key: "kubernetes.io/hostname".to_string(),
        ..Default::default()
    };
    let anti_affinity = PodAntiAffinity {
        required_during_scheduling_ignored_during_execution: Some(vec![one_pod_per_host]),
        ..Default::default()
    };

    let pod_template = PodTemplateSpec {
        metadata: Some(ObjectMeta {
            labels: Some(app_labels.clone()),
            ..Default::default()
        }),
        spec: Some(PodSpec {
            containers: vec![fio_container],
            affinity: Some(Affinity {
                pod_anti_affinity: Some(anti_affinity),
                ..Default::default()
            }),
            ..Default::default()
        }),
    };

    StatefulSet {
        metadata: ObjectMeta {
            name: Some(name.to_string()),
            namespace: Some(namespace.to_string()),
            labels: Some(app_labels),
            ..Default::default()
        },
        spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec {
            replicas: Some(replicas),
            selector: app_selector,
            service_name: Some(name.to_string()),
            pod_management_policy: Some("Parallel".to_string()),
            template: pod_template,
            volume_claim_templates: Some(vec![data_claim]),
            ..Default::default()
        }),
        ..Default::default()
    }
}
/// Runs `kubectl` with `args` and returns its captured stdout.
///
/// # Errors
///
/// Fails if the process cannot be started, or — when kubectl exits with a
/// non-zero status — with an error carrying kubectl's stderr and the
/// arguments that were used.
pub fn run_kubectl(args: &[&str]) -> io::Result<String> {
    let finished = Command::new("kubectl")
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .output()?;

    if finished.status.success() {
        return Ok(String::from_utf8_lossy(&finished.stdout).into_owned());
    }

    let stderr = String::from_utf8_lossy(&finished.stderr);
    Err(io::Error::other(format!(
        "kubectl failed: {} (args: {:?})",
        stderr, args
    )))
}
/// Pipes `yaml` into `kubectl apply -f -`.
///
/// # Errors
///
/// Fails if kubectl cannot be started, if the manifest cannot be written to
/// its stdin, or if kubectl exits with a non-zero status. In the last case
/// the error carries kubectl's stderr.
pub fn apply_yaml(yaml: &str) -> io::Result<()> {
    let mut child = Command::new("kubectl")
        .args(["apply", "-f", "-"])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // Do not bail out on a write error yet: when kubectl exits early the write
    // fails with a broken pipe, and the useful diagnostic is in kubectl's
    // stderr, which we only get by waiting for the child. Taking the handle
    // also closes the pipe as soon as the write is done, signalling EOF.
    let write_result = match child.stdin.take() {
        Some(mut stdin) => stdin.write_all(yaml.as_bytes()),
        None => Err(io::Error::other("kubectl stdin was not captured")),
    };

    // Always reap the child, even if the write failed.
    let output = child.wait_with_output()?;
    if !output.status.success() {
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(io::Error::other(format!(
            "kubectl apply failed: {}",
            stderr
        )));
    }

    // kubectl reported success; surface any write failure that still occurred.
    write_result
}
/// Blocks on `kubectl rollout status` for the StatefulSet `name` in
/// `namespace`, with a 300-second timeout passed to kubectl.
///
/// # Errors
///
/// Propagates any error from [`run_kubectl`].
pub fn wait_for_pods(namespace: &str, name: &str) -> io::Result<()> {
    let argv = [
        "rollout",
        "status",
        "statefulset",
        name,
        "-n",
        namespace,
        "--timeout",
        "300s",
    ];
    run_kubectl(&argv).map(|_| ())
}
/// Lists the names of the pods in `namespace` whose `app` label equals
/// `label`.
///
/// The names are taken from kubectl's jsonpath output, split on whitespace.
///
/// # Errors
///
/// Propagates any error from [`run_kubectl`].
pub fn get_pod_names(namespace: &str, label: &str) -> io::Result<Vec<String>> {
    let selector = format!("app={}", label);
    let listing = run_kubectl(&[
        "get",
        "pods",
        "-n",
        namespace,
        "-l",
        &selector,
        "-o",
        "jsonpath={.items[*].metadata.name}",
    ])?;

    let names = listing.split_whitespace().map(String::from).collect();
    Ok(names)
}
/// Runs `command` through `sh -c` inside `pod` (via `kubectl exec`) and
/// returns the captured stdout.
///
/// # Errors
///
/// Propagates any error from [`run_kubectl`].
pub fn exec(namespace: &str, pod: &str, command: &str) -> io::Result<String> {
    let argv = ["exec", "-n", namespace, pod, "--", "sh", "-c", command];
    run_kubectl(&argv)
}
/// Copies the path `remote` out of `pod` to the local path `local` using
/// `kubectl cp`.
///
/// # Errors
///
/// Propagates any error from [`run_kubectl`].
pub fn cp_from(namespace: &str, pod: &str, remote: &str, local: &str) -> io::Result<()> {
    let source = format!("{}:{}", pod, remote);
    run_kubectl(&["cp", "-n", namespace, &source, local]).map(|_| ())
}
/// Deletes the StatefulSets and PVCs in `namespace` whose `app` label equals
/// `label`.
///
/// kubectl is invoked with `--wait=false`, so it is not asked to wait for the
/// deletion to complete.
///
/// # Errors
///
/// Propagates any error from [`run_kubectl`].
pub fn delete_resources(namespace: &str, label: &str) -> io::Result<()> {
    let selector = format!("app={}", label);
    let argv = [
        "delete",
        "statefulset,pvc",
        "-n",
        namespace,
        "-l",
        &selector,
        "--wait=false",
    ];
    run_kubectl(&argv).map(|_| ())
}

View File

@@ -1,23 +1,22 @@
use std::fs; use std::io;
use std::io::{self, Write}; use std::io::Write;
use std::process::{Command, Stdio}; use std::process::{Command, Stdio};
use std::thread;
use std::time::Duration;
use chrono::Local; use clap::{Parser, Subcommand};
use clap::Parser;
use serde::{Deserialize, Serialize}; mod fio;
mod k8s;
mod redpanda;
mod simple;
/// A simple yet powerful I/O benchmarking tool using fio. /// A simple yet powerful I/O benchmarking tool using fio.
#[derive(Parser, Debug)] #[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)] #[command(author, version, about, long_about = None)]
struct Args { struct Cli {
/// Target for the benchmark. #[command(subcommand)]
/// Formats: command: Option<Commands>,
/// - localhost (default)
/// - ssh/{user}@{host} // Simple profile args (used when no subcommand is given)
/// - ssh/{user}@{host}:{port}
/// - k8s/{namespace}/{pod}
#[arg(short, long, default_value = "localhost")] #[arg(short, long, default_value = "localhost")]
target: String, target: String,
@@ -27,7 +26,10 @@ struct Args {
/// Comma-separated list of tests to run. /// Comma-separated list of tests to run.
/// Available tests: read, write, randread, randwrite, /// Available tests: read, write, randread, randwrite,
/// multiread, multiwrite, multirandread, multirandwrite. /// multiread, multiwrite, multirandread, multirandwrite.
#[arg(long, default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite")] #[arg(
long,
default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite"
)]
tests: String, tests: String,
/// Duration of each test in seconds. /// Duration of each test in seconds.
@@ -48,143 +50,49 @@ struct Args {
block_size: String, block_size: String,
} }
#[derive(Debug, Serialize, Deserialize)] #[derive(Subcommand, Debug)]
struct FioOutput { enum Commands {
jobs: Vec<FioJobResult>, /// Run the Redpanda storage characterization profile.
} Redpanda(redpanda::RedpandaArgs),
/// Deploy the Redpanda iobench resources and exit.
#[derive(Debug, Serialize, Deserialize)] Deploy(redpanda::RedpandaArgs),
struct FioJobResult { /// Remove the Redpanda iobench resources.
jobname: String, Undeploy(redpanda::RedpandaArgs),
read: FioMetrics,
write: FioMetrics,
}
#[derive(Debug, Serialize, Deserialize)]
struct FioMetrics {
bw: f64,
iops: f64,
clat_ns: LatencyMetrics,
}
#[derive(Debug, Serialize, Deserialize)]
struct LatencyMetrics {
mean: f64,
stddev: f64,
}
#[derive(Debug, Serialize)]
struct BenchmarkResult {
test_name: String,
iops: f64,
bandwidth_kibps: f64,
latency_mean_ms: f64,
latency_stddev_ms: f64,
} }
fn main() -> io::Result<()> { fn main() -> io::Result<()> {
let args = Args::parse(); let cli = Cli::parse();
let output_dir = args.output_dir.unwrap_or_else(|| { match cli.command {
format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S")) None => {
}); // Backward-compatible simple benchmark
fs::create_dir_all(&output_dir)?; let args = simple::SimpleArgs {
target: cli.target,
let tests_to_run: Vec<&str> = args.tests.split(',').collect(); benchmark_dir: cli.benchmark_dir,
let mut results = Vec::new(); tests: cli.tests,
duration: cli.duration,
for test in tests_to_run { output_dir: cli.output_dir,
println!("--------------------------------------------------"); size: cli.size,
println!("Running test: {}", test); block_size: cli.block_size,
let (rw, numjobs) = match test {
"read" => ("read", 1),
"write" => ("write", 1),
"randread" => ("randread", 1),
"randwrite" => ("randwrite", 1),
"multiread" => ("read", 4),
"multiwrite" => ("write", 4),
"multirandread" => ("randread", 4),
"multirandwrite" => ("randwrite", 4),
_ => {
eprintln!("Unknown test: {}. Skipping.", test);
continue;
}
}; };
simple::run(&args)
let test_name = format!("{}-{}-sync-test", test, args.block_size);
let fio_command = format!(
"fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
);
println!("Executing command:\n{}\n", fio_command);
let output = match run_command(&args.target, &fio_command) {
Ok(out) => out,
Err(e) => {
eprintln!("Failed to execute command for test {}: {}", test, e);
continue;
} }
}; Some(Commands::Redpanda(args)) => redpanda::run(&args),
Some(Commands::Deploy(args)) => {
let mut a = args.clone();
let result = parse_fio_output(&output, &test_name, rw); a.deploy_only = true;
// TODO store raw fio output and print it redpanda::run(&a)
match result {
Ok(res) => {
results.push(res);
} }
Err(e) => { Some(Commands::Undeploy(args)) => redpanda::undeploy(&args),
eprintln!("Error parsing fio output for test {}: {}", test, e);
eprintln!("Raw output:\n{}", output);
} }
}
println!("{output}");
println!("Test {} completed.", test);
// A brief pause to let the system settle before the next test.
thread::sleep(Duration::from_secs(2));
}
// Cleanup the test file on the target
println!("--------------------------------------------------");
println!("Cleaning up test file on target...");
let cleanup_command = "rm -f ./iobench_testfile";
if let Err(e) = run_command(&args.target, cleanup_command) {
eprintln!("Warning: Failed to clean up test file on target: {}", e);
} else {
println!("Cleanup successful.");
}
if results.is_empty() {
println!("\nNo benchmark results to display.");
return Ok(());
}
// Output results to a CSV file for easy analysis
let csv_path = format!("{}/summary.csv", output_dir);
let mut wtr = csv::Writer::from_path(&csv_path)?;
for result in &results {
wtr.serialize(result)?;
}
wtr.flush()?;
println!("\nBenchmark summary saved to {}", csv_path);
println!("\n--- Benchmark Results Summary ---");
println!("{:<25} {:>10} {:>18} {:>20} {:>22}", "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)");
println!("{:-<98}", "");
for result in results {
println!("{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}", result.test_name, result.iops, result.bandwidth_kibps, result.latency_mean_ms, result.latency_stddev_ms);
}
Ok(())
} }
fn run_command(target: &str, command: &str) -> io::Result<String> { pub fn run_command(target: &str, command: &str) -> io::Result<String> {
let (program, args) = if target == "localhost" { let (program, args) = if target == "localhost" {
("sudo", vec!["sh".to_string(), "-c".to_string(), command.to_string()]) (
"sudo",
vec!["sh".to_string(), "-c".to_string(), command.to_string()],
)
} else if target.starts_with("ssh/") { } else if target.starts_with("ssh/") {
let target_str = target.strip_prefix("ssh/").unwrap(); let target_str = target.strip_prefix("ssh/").unwrap();
let ssh_target; let ssh_target;
@@ -203,13 +111,31 @@ fn run_command(target: &str, command: &str) -> io::Result<String> {
} else if target.starts_with("k8s/") { } else if target.starts_with("k8s/") {
let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect(); let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect();
if parts.len() != 2 { if parts.len() != 2 {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid k8s target format. Expected k8s/{namespace}/{pod}")); return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid k8s target format. Expected k8s/{namespace}/{pod}",
));
} }
let namespace = parts[0]; let namespace = parts[0];
let pod = parts[1]; let pod = parts[1];
("kubectl", vec!["exec".to_string(), "-n".to_string(), namespace.to_string(), pod.to_string(), "--".to_string(), "sh".to_string(), "-c".to_string(), command.to_string()]) (
"kubectl",
vec![
"exec".to_string(),
"-n".to_string(),
namespace.to_string(),
pod.to_string(),
"--".to_string(),
"sh".to_string(),
"-c".to_string(),
command.to_string(),
],
)
} else { } else {
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid target format")); return Err(io::Error::new(
io::ErrorKind::InvalidInput,
"Invalid target format",
));
}; };
let mut cmd = Command::new(program); let mut cmd = Command::new(program);
@@ -222,32 +148,8 @@ fn run_command(target: &str, command: &str) -> io::Result<String> {
if !output.status.success() { if !output.status.success() {
eprintln!("Command failed with status: {}", output.status); eprintln!("Command failed with status: {}", output.status);
io::stderr().write_all(&output.stderr)?; io::stderr().write_all(&output.stderr)?;
return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed")); return Err(io::Error::other("Command execution failed"));
} }
String::from_utf8(output.stdout) String::from_utf8(output.stdout).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
}
fn parse_fio_output(output: &str, test_name: &str, rw: &str) -> Result<BenchmarkResult, String> {
let fio_data: FioOutput = serde_json::from_str(output)
.map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?;
let job_result = fio_data.jobs.iter()
.find(|j| j.jobname == test_name)
.ok_or_else(|| format!("Could not find job result for '{}' in fio output", test_name))?;
let metrics = if rw.contains("read") {
&job_result.read
} else {
&job_result.write
};
Ok(BenchmarkResult {
test_name: test_name.to_string(),
iops: metrics.iops,
bandwidth_kibps: metrics.bw,
latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0,
latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0,
})
} }

994
iobench/src/redpanda.rs Normal file
View File

@@ -0,0 +1,994 @@
use std::fs;
use std::io;
use std::path::Path;
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
use std::thread;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
#[cfg(test)]
use std::collections::HashMap;
use chrono::Local;
use clap::Args;
use crate::fio::{FioOutput, parse_fio_output};
use crate::k8s;
/// Name handed to the `k8s` helpers when creating, looking up and deleting the benchmark
/// resources. Log messages refer to it as the StatefulSet name and as the `app` label value.
const NAME: &str = "iobench-redpanda";
/// Container image passed to `k8s::redpanda_statefulset` for the benchmark pods.
/// NOTE(review): `latest` is a floating tag — consider pinning for reproducible runs.
const IMAGE: &str = "juicedata/fio:latest";
/// Delay, in seconds, between scheduling a workload and the wall-clock instant at which fio
/// is started on the pod(s). It is also counted once per workload in the runtime estimate.
const BARRIER_SECS: u64 = 60;
// The constants below are fio job files. Each one is written verbatim to the pod and run
// with `fio <file>`. The `runtime=` value of every template must equal the `runtime_secs`
// of the matching `Workload` entry (checked by `workload_runtime_matches_fio_template`).

/// fio job `throughput`: 16K sequential writes, 4 jobs, iodepth 16, 300 s, no per-write sync.
const WORKLOAD_THROUGHPUT: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=10G
[throughput]
rw=write
bs=16K
numjobs=4
iodepth=16
runtime=300
"#;
/// fio job `fsync_hot_path`: 16K sequential writes with `fdatasync=1`, 4 jobs, iodepth 4, 600 s.
const WORKLOAD_FSYNC_HOT_PATH: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=5G
[fsync_hot_path]
rw=write
bs=16K
fdatasync=1
numjobs=4
iodepth=4
runtime=600
"#;
/// fio job `selftest_512k`: 512K sequential writes with `fdatasync=1`, 1 job, iodepth 4, 120 s.
const WORKLOAD_SELFTEST_512K: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=10G
[selftest_512k]
rw=write
bs=512K
fdatasync=1
numjobs=1
iodepth=4
runtime=120
"#;
/// fio job `selftest_4k_qd1`: 4K sequential writes with `fdatasync=1`, 1 job, iodepth 1, 120 s.
const WORKLOAD_SELFTEST_4K_QD1: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=2G
[selftest_4k_qd1]
rw=write
bs=4K
fdatasync=1
numjobs=1
iodepth=1
runtime=120
"#;
/// One entry of the Redpanda benchmark profile.
struct Workload {
/// Workload identifier. It is used for the remote job/result file names and is the job
/// name looked up in the fio JSON report, so it must match the `[section]` of `template`.
name: &'static str,
/// fio job file contents written to the pod before the run.
template: &'static str,
/// Expected fio runtime in seconds; used for progress and time estimates only.
/// Must equal the `runtime=` value inside `template`.
runtime_secs: u64,
/// Human-readable summary printed before the workload starts.
description: &'static str,
}
/// Returns the workloads of the Redpanda profile, in execution order.
fn workloads() -> Vec<Workload> {
    // (name, fio job template, runtime in seconds, description)
    const PROFILE: [(&str, &str, u64, &str); 4] = [
        (
            "throughput",
            WORKLOAD_THROUGHPUT,
            300,
            "16K seq write, no fsync, 4 jobs — bulk segment write upper bound",
        ),
        (
            "fsync_hot_path",
            WORKLOAD_FSYNC_HOT_PATH,
            600,
            "16K seq write, fdatasync/op, 4 jobs — Raft acks=all commit path",
        ),
        (
            "selftest_512k",
            WORKLOAD_SELFTEST_512K,
            120,
            "512K seq write, fdatasync, 1 job — rpk self-test 512K phase",
        ),
        (
            "selftest_4k_qd1",
            WORKLOAD_SELFTEST_4K_QD1,
            120,
            "4K seq write, fdatasync, iodepth=1 — worst-case single-stream commit",
        ),
    ];

    PROFILE
        .iter()
        .map(|&(name, template, runtime_secs, description)| Workload {
            name,
            template,
            runtime_secs,
            description,
        })
        .collect()
}
/// Estimated wall-clock seconds needed to run `workloads` once: every workload costs its
/// fio runtime plus one start barrier.
fn total_runtime_secs(workloads: &[Workload]) -> u64 {
    let mut total = 0u64;
    for workload in workloads {
        total += BARRIER_SECS + workload.runtime_secs;
    }
    total
}
/// Formats a number of seconds compactly: `42s`, `2m05s` or `1h30m`.
///
/// From one hour upwards the seconds are dropped (truncated, not rounded).
fn fmt_duration(secs: u64) -> String {
    let hours = secs / 3600;
    let minutes = (secs % 3600) / 60;
    let seconds = secs % 60;
    match secs {
        0..=59 => format!("{}s", seconds),
        60..=3599 => format!("{}m{:02}s", minutes, seconds),
        _ => format!("{}h{:02}m", hours, minutes),
    }
}
// Arguments shared by the `redpanda`, `deploy` and `undeploy` subcommands.
// The field doc comments below are picked up by clap and shown as the `--help` text of
// each flag, so keep them short and free of markup.
#[derive(Args, Debug, Clone)]
pub struct RedpandaArgs {
    /// Execution mode: sequential (first pod only), parallel (all pods at once) or both
    #[arg(long, default_value = "both")]
    pub mode: String,
    /// StorageClass backing the benchmark PVCs
    #[arg(long, default_value = "ceph-block")]
    pub storage_class: String,
    /// Size of the PVC requested for each pod
    #[arg(long, default_value = "50Gi")]
    pub pvc_size: String,
    /// Number of StatefulSet replicas (benchmark pods)
    #[arg(long, default_value_t = 3)]
    pub replicas: i32,
    /// Kubernetes namespace holding the benchmark resources
    #[arg(long, default_value = "default")]
    pub namespace: String,
    /// Local directory for results; defaults to ./iobench-redpanda-<timestamp>
    #[arg(long)]
    pub output_dir: Option<String>,
    /// Keep the StatefulSet and PVCs after the run instead of deleting them
    #[arg(long)]
    pub keep_deployment: bool,
    /// Deploy the benchmark resources and exit without running any workload
    #[arg(long)]
    pub deploy_only: bool,
}
pub fn deploy(args: &RedpandaArgs) -> io::Result<()> {
println!(
"[deploy] Creating StatefulSet '{}' in namespace '{}' ({} replicas, {} PVCs on {})",
NAME, args.namespace, args.replicas, args.pvc_size, args.storage_class
);
let ss = k8s::redpanda_statefulset(
NAME,
&args.namespace,
args.replicas,
&args.storage_class,
&args.pvc_size,
IMAGE,
);
let yaml = serde_yaml::to_string(&ss)
.map_err(|e| io::Error::other(format!("YAML serialization failed: {}", e)))?;
k8s::apply_yaml(&yaml)?;
println!(
"[deploy] StatefulSet applied, waiting for {} pods to be ready (timeout 300s)...",
args.replicas
);
k8s::wait_for_pods(&args.namespace, NAME)?;
let pods = k8s::get_pod_names(&args.namespace, NAME)?;
println!("[deploy] All pods ready: {}", pods.join(", "));
Ok(())
}
/// Requests deletion of the benchmark resources in `args.namespace`.
///
/// Only the delete request is issued here; this function does not wait for the resources
/// to disappear.
pub fn undeploy(args: &RedpandaArgs) -> io::Result<()> {
    println!("[cleanup] Deleting StatefulSet and PVCs with label app={NAME}...");
    k8s::delete_resources(&args.namespace, NAME)?;
    println!("[cleanup] Delete issued (async). Resources will be fully removed shortly.");
    Ok(())
}
/// Resolves the `--mode` value into the ordered list of execution modes to run.
///
/// # Errors
/// Returns an error for anything other than `sequential`, `parallel` or `both`.
fn parse_modes(mode: &str) -> io::Result<Vec<&'static str>> {
    match mode {
        "sequential" => Ok(vec!["sequential"]),
        "parallel" => Ok(vec!["parallel"]),
        "both" => Ok(vec!["sequential", "parallel"]),
        other => Err(io::Error::other(format!(
            "Unknown mode: {}. Use sequential, parallel, or both.",
            other
        ))),
    }
}

/// Runs the Redpanda storage characterization profile end to end.
///
/// Steps:
/// 1. With `deploy_only`, just deploys and returns.
/// 2. Validates `mode` and `replicas` before anything is created locally or on the cluster.
/// 3. Reuses the existing benchmark pods when at least `replicas` are found, otherwise deploys.
/// 4. For every mode and workload, runs fio on the first pod (`sequential`) or on all pods
///    at once (`parallel`), stores the raw JSON next to the other results and collects one
///    summary row per pod.
/// 5. Writes `redpanda_summary.csv` and `iobench.csv`, prints the headline verdict for the
///    parallel `fsync_hot_path` workload and, unless `keep_deployment` is set, undeploys.
///
/// In parallel mode a pod whose run fails is reported on stderr and left out of the
/// results; in sequential mode the failure aborts the run.
///
/// # Errors
/// Returns an error on invalid arguments, on any failing `k8s` helper outside the parallel
/// per-pod runs, on local file-system failures and when a fio report cannot be parsed.
/// On such an early return the deployment is left in place.
pub fn run(args: &RedpandaArgs) -> io::Result<()> {
    if args.deploy_only {
        return deploy(args);
    }

    // Reject bad arguments up front: previously an unknown mode was only detected after the
    // output directory had been created and the StatefulSet possibly deployed, and
    // `--replicas 0` ended in an out-of-bounds panic on `pod_names[0]`.
    let modes = parse_modes(&args.mode)?;
    if args.replicas < 1 {
        return Err(io::Error::other(format!(
            "--replicas must be at least 1, got {}",
            args.replicas
        )));
    }
    // Lossless: `replicas` was just checked to be positive.
    let expected_pods = args.replicas as usize;

    let output_dir = args.output_dir.clone().unwrap_or_else(|| {
        format!(
            "./iobench-redpanda-{}",
            Local::now().format("%Y-%m-%d-%H%M%S")
        )
    });
    fs::create_dir_all(&output_dir)?;

    // Reuse pods from an earlier `deploy` when enough of them exist; any lookup failure or
    // shortfall triggers a (re)deploy.
    let pod_names = match k8s::get_pod_names(&args.namespace, NAME) {
        Ok(pods) if pods.len() >= expected_pods => {
            println!("[setup] Found existing pods: {}", pods.join(", "));
            pods
        }
        _ => {
            deploy(args)?;
            k8s::get_pod_names(&args.namespace, NAME)?
        }
    };
    if pod_names.len() < expected_pods {
        return Err(io::Error::other(format!(
            "Expected {} pods, found {}",
            args.replicas,
            pod_names.len()
        )));
    }

    let wl = workloads();
    let per_mode_secs = total_runtime_secs(&wl);
    let total_secs = per_mode_secs * modes.len() as u64;

    println!();
    println!("============================================================");
    println!(" Redpanda I/O characterization profile");
    println!(
        " Modes: {:?} | Workloads: {} | Pods: {}",
        modes,
        wl.len(),
        pod_names.len()
    );
    println!(" Estimated total runtime: {}", fmt_duration(total_secs));
    println!(" Output: {}", output_dir);
    println!("============================================================");

    let run_start = Instant::now();
    let mut all_results: Vec<RedpandaCsvRow> = Vec::new();

    for (mode_idx, mode) in modes.iter().enumerate() {
        println!();
        if *mode == "parallel" {
            println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
            println!(" WARNING: Parallel mode — intensive fdatasync across all");
            println!(" nodes simultaneously. Will impact other Ceph pool users.");
            println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
        }
        println!(
            "=== Mode: {} ({}/{}) — {} per-mode est. ===",
            mode,
            mode_idx + 1,
            modes.len(),
            fmt_duration(per_mode_secs)
        );

        // One result directory per mode, shared by all of its workloads.
        let mode_dir = format!("{}/{}", output_dir, mode);
        fs::create_dir_all(&mode_dir)?;

        for (wl_idx, workload) in wl.iter().enumerate() {
            let remaining_in_mode: u64 = wl[wl_idx..]
                .iter()
                .map(|w| BARRIER_SECS + w.runtime_secs)
                .sum();
            println!();
            println!(
                "--- [{}/{}] {} ({}s barrier + {}s fio = {}) ---",
                wl_idx + 1,
                wl.len(),
                workload.name,
                BARRIER_SECS,
                workload.runtime_secs,
                fmt_duration(BARRIER_SECS + workload.runtime_secs)
            );
            println!(" {}", workload.description);
            println!(
                " Remaining in this mode: {} | Total elapsed: {}",
                fmt_duration(remaining_in_mode),
                fmt_duration(run_start.elapsed().as_secs())
            );

            // Every pod waits until this epoch second before launching fio, so parallel
            // runs start together.
            let start_at = now_epoch() + BARRIER_SECS;

            let results = if *mode == "sequential" {
                let pod = &pod_names[0];
                println!(" Running on: {}", pod);
                let result = run_workload_on_pod(
                    &args.namespace,
                    pod,
                    workload.name,
                    workload.template,
                    workload.runtime_secs,
                    start_at,
                    &mode_dir,
                )?;
                vec![(pod.clone(), result)]
            } else {
                println!(
                    " Running on {} pods: {}",
                    pod_names.len(),
                    pod_names.join(", ")
                );
                let mut handles = Vec::new();
                for pod in &pod_names {
                    // Spawned threads need owned copies of everything they touch.
                    let ns = args.namespace.clone();
                    let pod_for_thread = pod.clone();
                    let wl_name = workload.name.to_string();
                    let template = workload.template.to_string();
                    let dir = mode_dir.clone();
                    let rt = workload.runtime_secs;
                    let handle = thread::spawn(move || {
                        run_workload_on_pod(
                            &ns,
                            &pod_for_thread,
                            &wl_name,
                            &template,
                            rt,
                            start_at,
                            &dir,
                        )
                    });
                    handles.push((pod.clone(), handle));
                }

                // Best effort: a failing pod is reported and skipped so the other pods'
                // measurements are still recorded.
                let mut collected = Vec::new();
                for (pod, handle) in handles {
                    match handle.join() {
                        Ok(Ok(result)) => collected.push((pod, result)),
                        Ok(Err(e)) => {
                            eprintln!(" ERROR on pod {}: {}", pod, e);
                        }
                        Err(_) => {
                            eprintln!(" PANIC on pod {}", pod);
                        }
                    }
                }
                collected
            };

            println!(
                " {} complete ({} elapsed total)",
                workload.name,
                fmt_duration(run_start.elapsed().as_secs())
            );

            for (pod, raw_json) in &results {
                let pod_slug = pod.replace(NAME, "node");
                let json_path = format!("{}/{}_{}.json", mode_dir, workload.name, pod_slug);
                fs::write(&json_path, raw_json)?;

                let result = parse_fio_output(raw_json, workload.name, "write").map_err(|e| {
                    io::Error::other(format!("{} on {}: {}", workload.name, pod, e))
                })?;
                let sync = parse_sync_latencies(raw_json, workload.name);

                // Print per-pod summary inline
                let bw_mib = result.bandwidth_kibps / 1024.0;
                if sync.total_ios > 0 {
                    println!(
                        " {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms | fsync_p99.9={:.2}ms",
                        pod_slug, result.iops, bw_mib, result.clat_p99_ms, sync.p99_9_ms
                    );
                } else {
                    println!(
                        " {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms",
                        pod_slug, result.iops, bw_mib, result.clat_p99_ms
                    );
                }

                all_results.push(RedpandaCsvRow {
                    label: "redpanda".to_string(),
                    mode: mode.to_string(),
                    workload: workload.name.to_string(),
                    node: pod.clone(),
                    test_name: result.test_name.clone(),
                    iops: result.iops,
                    bandwidth_kibps: result.bandwidth_kibps,
                    latency_mean_ms: result.latency_mean_ms,
                    latency_stddev_ms: result.latency_stddev_ms,
                    clat_p50_ms: result.clat_p50_ms,
                    clat_p95_ms: result.clat_p95_ms,
                    clat_p99_ms: result.clat_p99_ms,
                    clat_p99_9_ms: result.clat_p99_9_ms,
                    clat_p99_99_ms: result.clat_p99_99_ms,
                    clat_max_ms: result.clat_max_ms,
                    fdatasync_total_ios: sync.total_ios,
                    fdatasync_mean_ms: sync.mean_ms,
                    fdatasync_p50_ms: sync.p50_ms,
                    fdatasync_p95_ms: sync.p95_ms,
                    fdatasync_p99_ms: sync.p99_ms,
                    fdatasync_p99_9_ms: sync.p99_9_ms,
                    fdatasync_p99_99_ms: sync.p99_99_ms,
                    fdatasync_max_ms: sync.max_ms,
                });
            }
        }
    }

    println!();
    println!("============================================================");
    println!(
        " All workloads complete. Total runtime: {}",
        fmt_duration(run_start.elapsed().as_secs())
    );
    println!("============================================================");

    // Full per-pod summary.
    let csv_path = format!("{}/redpanda_summary.csv", output_dir);
    let mut wtr = csv::Writer::from_path(&csv_path)?;
    for row in &all_results {
        wtr.serialize(row)?;
    }
    wtr.flush()?;
    println!("[output] Redpanda summary: {}", csv_path);

    // Reduced column set for the dashboard.
    let dash_path = format!("{}/iobench.csv", output_dir);
    let mut wtr2 = csv::Writer::from_path(&dash_path)?;
    for row in &all_results {
        wtr2.serialize(DashRow::from(row))?;
    }
    wtr2.flush()?;
    println!("[output] Dashboard CSV: {}", dash_path);

    // Headline: the slowest pod's p99.9 fdatasync latency under the parallel commit-path
    // workload, checked against a 100 ms threshold.
    if let Some(worst) = all_results
        .iter()
        .filter(|r| r.mode == "parallel" && r.workload == "fsync_hot_path")
        .max_by(|a, b| {
            a.fdatasync_p99_9_ms
                .partial_cmp(&b.fdatasync_p99_9_ms)
                .unwrap_or(std::cmp::Ordering::Equal)
        })
    {
        println!();
        println!("=== HEADLINE ===");
        println!(
            "Worst p99.9 fdatasync latency (parallel fsync_hot_path): {:.3} ms on {}",
            worst.fdatasync_p99_9_ms, worst.node
        );
        if worst.fdatasync_p99_9_ms > 100.0 {
            println!("RESULT: FAIL (>100 ms). Raft heartbeats may not survive load spikes.");
        } else {
            println!("RESULT: PASS (<=100 ms).");
        }
    }

    if !args.keep_deployment {
        println!();
        undeploy(args)?;
    }
    Ok(())
}
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn now_epoch() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs()
}
/// Runs one fio workload inside `pod` and returns the raw fio JSON report.
///
/// The job file is written to `/data/results/<workload>.fio` on the pod. fio is then
/// launched through a shell command that first sleeps until the pod's clock reaches
/// `start_at` (epoch seconds). While the command runs, a ticker thread prints progress to
/// stderr every 10 seconds. Afterwards `/data/results` is copied into
/// `<local_mode_dir>/<pod slug>/`; a copy failure is only a warning.
///
/// * `runtime_secs` is used for the progress display only; the actual runtime comes from
///   `job_template`.
/// * Returns the contents of the copied `<workload>.json` when it exists locally, otherwise
///   whatever the fio command printed on stdout.
///
/// # Errors
/// Fails if writing the job file or running fio on the pod fails, or if the local result
/// directory or report cannot be created/read.
fn run_workload_on_pod(
    namespace: &str,
    pod: &str,
    workload_name: &str,
    job_template: &str,
    runtime_secs: u64,
    start_at: u64,
    local_mode_dir: &str,
) -> io::Result<String> {
    let pod_slug = pod.replace(NAME, "node");
    let remote_results_dir = "/data/results";
    let remote_job_path = format!("/data/results/{}.fio", workload_name);
    let remote_out_path = format!("/data/results/{}.json", workload_name);

    // Quoted heredoc: the template is written verbatim, without shell expansion.
    let write_job_cmd = format!(
        "mkdir -p {} && cat > {} <<'EOF'\n{}\nEOF",
        remote_results_dir, remote_job_path, job_template
    );
    k8s::exec(namespace, pod, &write_job_cmd)?;

    // The barrier is evaluated on the pod: sleep until `start_at`, then start fio.
    let fio_cmd = format!(
        "NOW=$(date +%s); if [ $NOW -lt {start_at} ]; then sleep $(({start_at} - NOW)); fi; fio {remote_job_path} --output={remote_out_path} --output-format=json --write_lat_log={remote_results_dir}/{workload_name}_lat --write_iops_log={remote_results_dir}/{workload_name}_iops"
    );

    // Progress ticker — prints every 10s until fio exec returns
    let done = Arc::new(AtomicBool::new(false));
    let ticker_done = Arc::clone(&done);
    let ticker_label = pod_slug.clone();
    let wl = workload_name.to_string();
    let total_expected = BARRIER_SECS + runtime_secs;
    let progress_handle = thread::spawn(move || {
        const TICK: Duration = Duration::from_secs(10);
        let start = Instant::now();
        loop {
            // Wait one tick, but wake up immediately when the main thread unparks us.
            // `park_timeout` may also return spuriously, hence the inner loop.
            let tick_start = Instant::now();
            while !ticker_done.load(Ordering::Acquire) {
                let waited = tick_start.elapsed();
                if waited >= TICK {
                    break;
                }
                thread::park_timeout(TICK - waited);
            }
            if ticker_done.load(Ordering::Acquire) {
                break;
            }

            let elapsed = start.elapsed().as_secs();
            let phase = if elapsed < BARRIER_SECS {
                format!("barrier sync {}s/{}s", elapsed, BARRIER_SECS)
            } else {
                let fio_elapsed = elapsed - BARRIER_SECS;
                format!("fio running {}s/{}s", fio_elapsed, runtime_secs)
            };
            let remaining = total_expected.saturating_sub(elapsed);
            eprintln!(
                " [{}:{}] {} ({} remaining)",
                ticker_label,
                wl,
                phase,
                fmt_duration(remaining)
            );
        }
    });

    let output = k8s::exec(namespace, pod, &fio_cmd);

    // Stop the ticker before looking at the result so it never outlives this call. The
    // unpark makes the join return right away instead of after the rest of a 10s sleep.
    done.store(true, Ordering::Release);
    progress_handle.thread().unpark();
    let _ = progress_handle.join();
    let output = output?;

    let local_pod_dir = format!("{}/{}", local_mode_dir, pod_slug);
    fs::create_dir_all(&local_pod_dir)?;
    eprintln!(" [{}] Copying results from pod...", pod_slug);
    if let Err(e) = k8s::cp_from(namespace, pod, remote_results_dir, &local_pod_dir) {
        eprintln!(" [{}] Warning: failed to copy results: {}", pod_slug, e);
    }

    // Prefer the report fio wrote to disk; fall back to the command's stdout.
    let local_json = format!("{}/{}.json", local_pod_dir, workload_name);
    if Path::new(&local_json).exists() {
        fs::read_to_string(&local_json)
    } else {
        Ok(output)
    }
}
/// Sync (fdatasync) latency figures taken from the `sync` section of one fio job.
///
/// All `*_ms` fields are milliseconds, converted from fio's nanosecond values. The
/// all-zero `Default` value is what `parse_sync_latencies` returns when the report cannot
/// be parsed, the job is missing or it has no `sync` section.
#[derive(Debug, Clone, Default)]
struct SyncLatencies {
// Copied from the `total_ios` value of the job's `sync` section.
total_ios: u64,
// Mean sync latency.
mean_ms: f64,
// Latency percentiles; a percentile missing from the report is stored as 0.0.
p50_ms: f64,
p95_ms: f64,
p99_ms: f64,
p99_9_ms: f64,
p99_99_ms: f64,
// Read from the "100.000000" percentile entry.
max_ms: f64,
}
/// Extracts the sync latency summary of job `workload_name` from an fio JSON report.
///
/// Never fails: invalid JSON, an unknown job name or a job without a `sync` section all
/// yield `SyncLatencies::default()` (all zeros).
fn parse_sync_latencies(json: &str, workload_name: &str) -> SyncLatencies {
    let Ok(report) = serde_json::from_str::<FioOutput>(json) else {
        return SyncLatencies::default();
    };
    let Some(sync) = report
        .jobs
        .iter()
        .find(|job| job.jobname == workload_name)
        .and_then(|job| job.sync.as_ref())
    else {
        return SyncLatencies::default();
    };

    let percentiles = sync.lat_ns.percentile.as_ref();
    // Looks up one percentile bucket and converts it from ns to ms.
    let ms = |key: &str| get_percentile(percentiles, key);

    SyncLatencies {
        total_ios: sync.total_ios,
        mean_ms: sync.lat_ns.mean / 1_000_000.0,
        p50_ms: ms("50.000000"),
        p95_ms: ms("95.000000"),
        p99_ms: ms("99.000000"),
        p99_9_ms: ms("99.900000"),
        p99_99_ms: ms("99.990000"),
        max_ms: ms("100.000000"),
    }
}
/// Looks up percentile `key` (e.g. `"99.900000"`) in an fio percentile table and converts
/// the nanosecond value to milliseconds. A missing table or key yields `0.0`.
fn get_percentile(p: Option<&std::collections::HashMap<String, f64>>, key: &str) -> f64 {
    let nanos = match p {
        Some(table) => table.get(key).copied().unwrap_or(0.0),
        None => 0.0,
    };
    nanos / 1_000_000.0
}
/// One row of `redpanda_summary.csv`: the result of one workload on one pod in one mode.
/// The field order is the CSV column order.
#[derive(Debug, serde::Serialize)]
struct RedpandaCsvRow {
// Profile label; `run` always sets it to "redpanda".
label: String,
// Execution mode, "sequential" or "parallel".
mode: String,
// Workload name from the profile, e.g. "fsync_hot_path".
workload: String,
// Name of the pod the workload ran on.
node: String,
// Test name as reported by `parse_fio_output`.
test_name: String,
// Write-side figures copied from the result of `parse_fio_output`.
iops: f64,
bandwidth_kibps: f64,
latency_mean_ms: f64,
latency_stddev_ms: f64,
// Completion-latency percentiles, copied from the result of `parse_fio_output`.
clat_p50_ms: f64,
clat_p95_ms: f64,
clat_p99_ms: f64,
clat_p99_9_ms: f64,
clat_p99_99_ms: f64,
clat_max_ms: f64,
// Sync latency figures from `parse_sync_latencies`; all zero when the job reported none.
fdatasync_total_ios: u64,
fdatasync_mean_ms: f64,
fdatasync_p50_ms: f64,
fdatasync_p95_ms: f64,
fdatasync_p99_ms: f64,
fdatasync_p99_9_ms: f64,
fdatasync_p99_99_ms: f64,
fdatasync_max_ms: f64,
}
/// One row of `iobench.csv`, the reduced column set written for the dashboard.
/// Built from a `RedpandaCsvRow` through `From`.
#[derive(Debug, serde::Serialize)]
struct DashRow {
// "<label>-<mode>-<workload>" of the source row.
label: String,
test_name: String,
iops: f64,
bandwidth_kibps: f64,
latency_mean_ms: f64,
latency_stddev_ms: f64,
// The only sync latency figure carried over to the dashboard.
fdatasync_p99_9_ms: f64,
}
impl From<&RedpandaCsvRow> for DashRow {
fn from(row: &RedpandaCsvRow) -> Self {
DashRow {
label: format!("{}-{}-{}", row.label, row.mode, row.workload),
test_name: row.test_name.clone(),
iops: row.iops,
bandwidth_kibps: row.bandwidth_kibps,
latency_mean_ms: row.latency_mean_ms,
latency_stddev_ms: row.latency_stddev_ms,
fdatasync_p99_9_ms: row.fdatasync_p99_9_ms,
}
}
}
// Unit tests for the pure helpers of this module (formatting, runtime estimation, fio
// report parsing, CSV row shapes). Nothing here talks to a cluster.
#[cfg(test)]
mod tests {
use super::*;
// -- fmt_duration --
#[test]
fn fmt_duration_seconds_only() {
assert_eq!(fmt_duration(0), "0s");
assert_eq!(fmt_duration(1), "1s");
assert_eq!(fmt_duration(59), "59s");
}
#[test]
fn fmt_duration_minutes_and_seconds() {
assert_eq!(fmt_duration(60), "1m00s");
assert_eq!(fmt_duration(61), "1m01s");
assert_eq!(fmt_duration(125), "2m05s");
assert_eq!(fmt_duration(3599), "59m59s");
}
#[test]
fn fmt_duration_hours() {
assert_eq!(fmt_duration(3600), "1h00m");
assert_eq!(fmt_duration(3660), "1h01m");
assert_eq!(fmt_duration(7200), "2h00m");
// 5432s = 1h30m32s; seconds are dropped in the hour format.
assert_eq!(fmt_duration(5432), "1h30m");
}
// -- total_runtime_secs --
#[test]
fn total_runtime_secs_matches_workload_sum() {
let wl = workloads();
let expected: u64 = wl.iter().map(|w| BARRIER_SECS + w.runtime_secs).sum();
assert_eq!(total_runtime_secs(&wl), expected);
// 4 workloads: (60+300) + (60+600) + (60+120) + (60+120) = 1380
assert_eq!(expected, 1380);
}
// -- Workload runtime consistency --
// Verify that each Workload.runtime_secs matches the runtime= value
// in its fio template. Prevents the two sources of truth from diverging.
#[test]
fn workload_runtime_matches_fio_template() {
for w in workloads() {
let runtime_from_template = w
.template
.lines()
.find_map(|line| {
let trimmed = line.trim();
trimmed
.strip_prefix("runtime=")
.and_then(|v| v.parse::<u64>().ok())
})
.unwrap_or_else(|| {
panic!("Could not find runtime= in fio template for '{}'", w.name)
});
assert_eq!(
w.runtime_secs, runtime_from_template,
"Workload '{}': struct says {}s but template says runtime={}",
w.name, w.runtime_secs, runtime_from_template
);
}
}
// -- parse_sync_latencies --
// Fixture: a one-job fio report for `fsync_hot_path` that includes a `sync` section.
// All latency values are in nanoseconds.
fn sample_fio_json_with_sync() -> String {
serde_json::json!({
"jobs": [{
"jobname": "fsync_hot_path",
"read": {
"bw": 0.0, "iops": 0.0,
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
},
"write": {
"bw": 51200.0, "iops": 3200.0,
"clat_ns": {
"mean": 5_000_000.0, "stddev": 1_000_000.0,
"percentile": {
"50.000000": 4_000_000.0,
"95.000000": 8_000_000.0,
"99.000000": 12_000_000.0,
"99.900000": 20_000_000.0,
"99.990000": 30_000_000.0,
"100.000000": 50_000_000.0
}
}
},
"sync": {
"total_ios": 50000_u64,
"lat_ns": {
"mean": 2_000_000.0, "stddev": 500_000.0,
"percentile": {
"50.000000": 1_500_000.0,
"95.000000": 3_000_000.0,
"99.000000": 5_000_000.0,
"99.900000": 10_000_000.0,
"99.990000": 20_000_000.0,
"100.000000": 50_000_000.0
}
}
}
}]
})
.to_string()
}
#[test]
fn parse_sync_latencies_extracts_correctly() {
let json = sample_fio_json_with_sync();
let sync = parse_sync_latencies(&json, "fsync_hot_path");
assert_eq!(sync.total_ios, 50000);
assert!((sync.mean_ms - 2.0).abs() < 0.001);
assert!((sync.p50_ms - 1.5).abs() < 0.001);
assert!((sync.p95_ms - 3.0).abs() < 0.001);
assert!((sync.p99_ms - 5.0).abs() < 0.001);
assert!((sync.p99_9_ms - 10.0).abs() < 0.001);
assert!((sync.p99_99_ms - 20.0).abs() < 0.001);
assert!((sync.max_ms - 50.0).abs() < 0.001);
}
#[test]
fn parse_sync_latencies_wrong_job_returns_default() {
let json = sample_fio_json_with_sync();
let sync = parse_sync_latencies(&json, "nonexistent");
assert_eq!(sync.total_ios, 0);
assert_eq!(sync.mean_ms, 0.0);
}
#[test]
fn parse_sync_latencies_invalid_json_returns_default() {
let sync = parse_sync_latencies("not json", "fsync_hot_path");
assert_eq!(sync.total_ios, 0);
}
// Fixture: a one-job fio report for `throughput` without a `sync` section.
fn sample_fio_json_no_sync() -> String {
serde_json::json!({
"jobs": [{
"jobname": "throughput",
"read": {
"bw": 0.0, "iops": 0.0,
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
},
"write": {
"bw": 51200.0, "iops": 3200.0,
"clat_ns": {
"mean": 5_000_000.0, "stddev": 1_000_000.0,
"percentile": {
"50.000000": 4_000_000.0,
"95.000000": 8_000_000.0,
"99.000000": 12_000_000.0,
"99.900000": 20_000_000.0,
"99.990000": 30_000_000.0,
"100.000000": 50_000_000.0
}
}
}
}]
})
.to_string()
}
#[test]
fn parse_sync_latencies_no_sync_field_returns_default() {
let json = sample_fio_json_no_sync();
let sync = parse_sync_latencies(&json, "throughput");
assert_eq!(sync.total_ios, 0);
}
// -- get_percentile --
#[test]
fn get_percentile_extracts_and_converts_ns_to_ms() {
let mut m = HashMap::new();
m.insert("99.000000".to_string(), 12_000_000.0);
let result = get_percentile(Some(&m), "99.000000");
assert!((result - 12.0).abs() < 0.001);
}
#[test]
fn get_percentile_missing_key_returns_zero() {
let m = HashMap::new();
assert_eq!(get_percentile(Some(&m), "99.000000"), 0.0);
}
#[test]
fn get_percentile_none_map_returns_zero() {
assert_eq!(get_percentile(None, "99.000000"), 0.0);
}
// -- parse_fio_output integration --
// These exercise `crate::fio::parse_fio_output` with the fixtures above, the same way
// `run` calls it (write side).
#[test]
fn parse_fio_output_with_sync_workload() {
let json = sample_fio_json_with_sync();
let result = parse_fio_output(&json, "fsync_hot_path", "write").unwrap();
assert!((result.iops - 3200.0).abs() < 0.001);
assert!((result.bandwidth_kibps - 51200.0).abs() < 0.001);
assert!((result.latency_mean_ms - 5.0).abs() < 0.001);
assert!((result.clat_p50_ms - 4.0).abs() < 0.001);
assert!((result.clat_p99_ms - 12.0).abs() < 0.001);
assert!((result.clat_p99_9_ms - 20.0).abs() < 0.001);
assert!((result.clat_max_ms - 50.0).abs() < 0.001);
}
#[test]
fn parse_fio_output_no_sync_workload() {
let json = sample_fio_json_no_sync();
let result = parse_fio_output(&json, "throughput", "write").unwrap();
assert!((result.iops - 3200.0).abs() < 0.001);
assert!((result.clat_p99_ms - 12.0).abs() < 0.001);
}
#[test]
fn parse_fio_output_wrong_job_name_errors() {
let json = sample_fio_json_no_sync();
let result = parse_fio_output(&json, "nonexistent", "write");
assert!(result.is_err());
assert!(result.unwrap_err().contains("nonexistent"));
}
// -- DashRow label formatting --
#[test]
fn dash_row_label_format() {
let row = RedpandaCsvRow {
label: "redpanda".to_string(),
mode: "parallel".to_string(),
workload: "fsync_hot_path".to_string(),
node: "iobench-redpanda-0".to_string(),
test_name: "fsync_hot_path".to_string(),
iops: 1000.0,
bandwidth_kibps: 16000.0,
latency_mean_ms: 5.0,
latency_stddev_ms: 1.0,
clat_p50_ms: 4.0,
clat_p95_ms: 8.0,
clat_p99_ms: 12.0,
clat_p99_9_ms: 20.0,
clat_p99_99_ms: 30.0,
clat_max_ms: 50.0,
fdatasync_total_ios: 50000,
fdatasync_mean_ms: 2.0,
fdatasync_p50_ms: 1.5,
fdatasync_p95_ms: 3.0,
fdatasync_p99_ms: 5.0,
fdatasync_p99_9_ms: 10.0,
fdatasync_p99_99_ms: 20.0,
fdatasync_max_ms: 50.0,
};
let dash = DashRow::from(&row);
assert_eq!(dash.label, "redpanda-parallel-fsync_hot_path");
assert!((dash.fdatasync_p99_9_ms - 10.0).abs() < 0.001);
}
// -- CSV serialization round-trip --
#[test]
fn redpanda_csv_row_serializes_all_columns() {
let row = RedpandaCsvRow {
label: "redpanda".to_string(),
mode: "sequential".to_string(),
workload: "throughput".to_string(),
node: "node-0".to_string(),
test_name: "throughput".to_string(),
iops: 3200.0,
bandwidth_kibps: 51200.0,
latency_mean_ms: 5.0,
latency_stddev_ms: 1.0,
clat_p50_ms: 4.0,
clat_p95_ms: 8.0,
clat_p99_ms: 12.0,
clat_p99_9_ms: 20.0,
clat_p99_99_ms: 30.0,
clat_max_ms: 50.0,
fdatasync_total_ios: 0,
fdatasync_mean_ms: 0.0,
fdatasync_p50_ms: 0.0,
fdatasync_p95_ms: 0.0,
fdatasync_p99_ms: 0.0,
fdatasync_p99_9_ms: 0.0,
fdatasync_p99_99_ms: 0.0,
fdatasync_max_ms: 0.0,
};
// Serialize into memory; the writer emits a header line followed by one data line.
let mut wtr = csv::Writer::from_writer(Vec::new());
wtr.serialize(&row).unwrap();
wtr.flush().unwrap();
let csv_output = String::from_utf8(wtr.into_inner().unwrap()).unwrap();
let lines: Vec<&str> = csv_output.lines().collect();
// Header line must contain all expected column names
// (substring check on the whole header line, not an exact per-column comparison).
let header = lines[0];
for col in [
"label",
"mode",
"workload",
"node",
"test_name",
"iops",
"bandwidth_kibps",
"latency_mean_ms",
"latency_stddev_ms",
"clat_p50_ms",
"clat_p95_ms",
"clat_p99_ms",
"clat_p99_9_ms",
"clat_p99_99_ms",
"clat_max_ms",
"fdatasync_total_ios",
"fdatasync_mean_ms",
"fdatasync_p50_ms",
"fdatasync_p95_ms",
"fdatasync_p99_ms",
"fdatasync_p99_9_ms",
"fdatasync_p99_99_ms",
"fdatasync_max_ms",
] {
assert!(header.contains(col), "CSV header missing column: {}", col);
}
// Data line should have the right number of fields
let data_fields: Vec<&str> = lines[1].split(',').collect();
let header_fields: Vec<&str> = header.split(',').collect();
assert_eq!(data_fields.len(), header_fields.len());
}
}

123
iobench/src/simple.rs Normal file
View File

@@ -0,0 +1,123 @@
use std::fs;
use std::io;
use std::thread;
use std::time::Duration;
use chrono::Local;
use crate::fio::parse_fio_output;
use crate::run_command;
/// Arguments of the simple (non-Redpanda) benchmark.
#[derive(Debug, Clone)]
pub struct SimpleArgs {
/// Where to run fio; passed unchanged to `run_command` for every command.
pub target: String,
/// Directory on the target in which the `iobench_testfile` test file is created.
pub benchmark_dir: String,
/// Comma-separated list of test names; unknown names are reported and skipped.
pub tests: String,
/// Value of fio's `--runtime` for each test.
pub duration: u64,
/// Local directory for `summary.csv`; defaults to `./iobench-<timestamp>` when `None`.
pub output_dir: Option<String>,
/// Value of fio's `--size`.
pub size: String,
/// Value of fio's `--bs`; also part of each fio job name.
pub block_size: String,
}
/// Runs the simple fio benchmark: one fio invocation per requested test on `args.target`,
/// then a CSV summary and a results table.
///
/// For each name in `args.tests` (comma-separated; surrounding whitespace and empty entries
/// are ignored) fio is run against `<benchmark_dir>/iobench_testfile` with `--direct=1
/// --fsync=1 --iodepth=1`. A test that is unknown, fails to execute or whose output cannot
/// be parsed is reported on stderr and skipped; the remaining tests still run. Afterwards
/// the test file is removed from the target (best effort), the parsed results are written
/// to `<output_dir>/summary.csv` and printed as a table.
///
/// # Errors
/// Fails only on local problems: creating the output directory or writing the CSV file.
pub fn run(args: &SimpleArgs) -> io::Result<()> {
    let output_dir = args
        .output_dir
        .clone()
        .unwrap_or_else(|| format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S")));
    fs::create_dir_all(&output_dir)?;

    let mut results = Vec::new();

    // Trim each entry so that "read, write" selects both tests instead of skipping
    // " write" as unknown; empty entries (e.g. a trailing comma) are dropped silently.
    let tests_to_run = args
        .tests
        .split(',')
        .map(str::trim)
        .filter(|name| !name.is_empty());

    for test in tests_to_run {
        println!("--------------------------------------------------");
        println!("Running test: {}", test);

        // Map the test name to fio's access pattern and job count.
        let (rw, numjobs) = match test {
            "read" => ("read", 1),
            "write" => ("write", 1),
            "randread" => ("randread", 1),
            "randwrite" => ("randwrite", 1),
            "multiread" => ("read", 4),
            "multiwrite" => ("write", 4),
            "multirandread" => ("randread", 4),
            "multirandwrite" => ("randwrite", 4),
            _ => {
                eprintln!("Unknown test: {}. Skipping.", test);
                continue;
            }
        };

        let test_name = format!("{}-{}-sync-test", test, args.block_size);
        let fio_command = format!(
            "fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
            args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
        );
        println!("Executing command:\n{}\n", fio_command);

        let output = match run_command(&args.target, &fio_command) {
            Ok(out) => out,
            Err(e) => {
                eprintln!("Failed to execute command for test {}: {}", test, e);
                continue;
            }
        };

        match parse_fio_output(&output, &test_name, rw) {
            Ok(res) => {
                results.push(res);
            }
            Err(e) => {
                eprintln!("Error parsing fio output for test {}: {}", test, e);
                eprintln!("Raw output:\n{}", output);
            }
        }
        println!("{output}");
        println!("Test {} completed.", test);

        // Brief pause between tests.
        thread::sleep(Duration::from_secs(2));
    }

    println!("--------------------------------------------------");
    println!("Cleaning up test file on target...");
    // Remove the file fio actually wrote: it lives in `benchmark_dir`, not in the working
    // directory of the remote shell.
    let cleanup_command = format!("rm -f {}/iobench_testfile", args.benchmark_dir);
    if let Err(e) = run_command(&args.target, &cleanup_command) {
        eprintln!("Warning: Failed to clean up test file on target: {}", e);
    } else {
        println!("Cleanup successful.");
    }

    if results.is_empty() {
        println!("\nNo benchmark results to display.");
        return Ok(());
    }

    let csv_path = format!("{}/summary.csv", output_dir);
    let mut wtr = csv::Writer::from_path(&csv_path)?;
    for result in &results {
        wtr.serialize(result)?;
    }
    wtr.flush()?;
    println!("\nBenchmark summary saved to {}", csv_path);

    println!("\n--- Benchmark Results Summary ---");
    println!(
        "{:<25} {:>10} {:>18} {:>20} {:>22}",
        "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)"
    );
    println!("{:-<98}", "");
    for result in results {
        println!(
            "{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}",
            result.test_name,
            result.iops,
            result.bandwidth_kibps,
            result.latency_mean_ms,
            result.latency_stddev_ms
        );
    }
    Ok(())
}