feat: iobench redpanda profile to run the recommended fio settings by redpanda on a k8s storage backend #281
2
.gitignore
vendored
2
.gitignore
vendored
@@ -32,3 +32,5 @@ ignore
|
|||||||
|
|
||||||
# Generated book
|
# Generated book
|
||||||
book
|
book
|
||||||
|
|
||||||
|
__pycache__
|
||||||
|
|||||||
35
Cargo.lock
generated
35
Cargo.lock
generated
@@ -1932,6 +1932,27 @@ dependencies = [
|
|||||||
"typenum",
|
"typenum",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "csv"
|
||||||
|
version = "1.4.0"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938"
|
||||||
|
dependencies = [
|
||||||
|
"csv-core",
|
||||||
|
"itoa",
|
||||||
|
"ryu",
|
||||||
|
"serde_core",
|
||||||
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "csv-core"
|
||||||
|
version = "0.1.13"
|
||||||
|
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||||
|
checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782"
|
||||||
|
dependencies = [
|
||||||
|
"memchr",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ctr"
|
name = "ctr"
|
||||||
version = "0.9.2"
|
version = "0.9.2"
|
||||||
@@ -4648,6 +4669,20 @@ dependencies = [
|
|||||||
"thiserror 1.0.69",
|
"thiserror 1.0.69",
|
||||||
]
|
]
|
||||||
|
|
||||||
|
[[package]]
|
||||||
|
name = "iobench"
|
||||||
|
version = "1.0.0"
|
||||||
|
dependencies = [
|
||||||
|
"chrono",
|
||||||
|
"clap",
|
||||||
|
"csv",
|
||||||
|
"k8s-openapi",
|
||||||
|
"num_cpus",
|
||||||
|
"serde",
|
||||||
|
"serde_json",
|
||||||
|
"serde_yaml",
|
||||||
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "ipnet"
|
name = "ipnet"
|
||||||
version = "2.12.0"
|
version = "2.12.0"
|
||||||
|
|||||||
@@ -28,6 +28,7 @@ members = [
|
|||||||
"harmony_node_readiness",
|
"harmony_node_readiness",
|
||||||
"harmony-k8s",
|
"harmony-k8s",
|
||||||
"harmony_assets", "opnsense-codegen", "opnsense-api",
|
"harmony_assets", "opnsense-codegen", "opnsense-api",
|
||||||
|
"iobench",
|
||||||
]
|
]
|
||||||
|
|
||||||
[workspace.package]
|
[workspace.package]
|
||||||
|
|||||||
@@ -11,7 +11,8 @@ clap = { version = "4.0", features = ["derive"] }
|
|||||||
chrono = "0.4"
|
chrono = "0.4"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
serde_json = "1.0"
|
serde_json = "1.0"
|
||||||
|
serde_yaml = { workspace = true }
|
||||||
csv = "1.1"
|
csv = "1.1"
|
||||||
num_cpus = "1.13"
|
num_cpus = "1.13"
|
||||||
|
k8s-openapi = { workspace = true }
|
||||||
|
|
||||||
[workspace]
|
|
||||||
|
|||||||
68
iobench/PREFLIGHT_CHECKS.md
Normal file
68
iobench/PREFLIGHT_CHECKS.md
Normal file
@@ -0,0 +1,68 @@
|
|||||||
|
# Redpanda iobench preflight checklist
|
||||||
|
|
||||||
|
Cluster inspection performed: 2026-05-03
|
||||||
|
Context: maintenance window, other workloads turned off, Ceph pool at ~0 IOPS idle.
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
## Cluster topology
|
||||||
|
|
||||||
|
- [x] **PASS** — 3 worker nodes available (wk0, wk1, wk2). Matches `--replicas 3` default.
|
||||||
|
- [x] **PASS** — All 3 workers are Ready, no DiskPressure / MemoryPressure / PIDPressure.
|
||||||
|
- [x] **PASS** — Control plane nodes (cp3, cp4, cp5) have `NoSchedule` taint. iobench pods cannot land there.
|
||||||
|
- [x] **PASS** — All nodes carry `kubernetes.io/hostname` label. Anti-affinity topology key will work.
|
||||||
|
- [x] **PASS** — Worker nodes are untainted and have no scheduling restrictions.
|
||||||
|
|
||||||
|
## Storage
|
||||||
|
|
||||||
|
- [x] **PASS** — StorageClass `ceph-block` exists, `volumeBindingMode: Immediate`, `reclaimPolicy: Delete`.
|
||||||
|
- [x] **PASS** — Ceph pool `ceph-blockpool` replication: size=3, min_size=2.
|
||||||
|
- [x] **PASS** — 3 OSDs all Running (osd-0, osd-1, osd-2).
|
||||||
|
- [x] **PASS** — Raw capacity: 503 GiB available. 3x50GiB PVCs = 150 GiB usable = 450 GiB raw. Leaves 53 GiB raw headroom (~10.5%). Tight but sufficient for a benchmark run.
|
||||||
|
- [x] **PASS** — Largest single-workload disk footprint: `throughput` with numjobs=4 x size=10G = 40 GiB per PVC. Fits in 50 GiB PVC with 10 GiB headroom for logs and filesystem overhead.
|
||||||
|
- [x] **PASS** — Pool is idle ("nothing is going on"), OSD commit/apply latency showing 0 ms. Confirms maintenance window baseline.
|
||||||
|
- [x] **PASS** — All 3 disks are actually Intel SSDs (misidentified as HDD by Ceph due to HP RAID controller passthrough). No mixed-media concern.
|
||||||
|
- [x] **PASS (non-blocking)** — Ceph health is `HEALTH_WARN: too many PGs per OSD (265 > max 250)`. This is a pre-existing tuning issue unrelated to the benchmark. Does not affect correctness of results. Note: if this were a data-integrity warning (e.g. `HEALTH_WARN: degraded PGs`), the benchmark must not proceed.
|
||||||
|
|
||||||
|
## Namespace and resource constraints
|
||||||
|
|
||||||
|
- [x] **PASS** — `default` namespace exists.
|
||||||
|
- [x] **PASS** — No ResourceQuota or LimitRange in `default` namespace. Pod creation won't be blocked.
|
||||||
|
- [x] **PASS** — No leftover `iobench-redpanda` StatefulSet or PVCs in `default` namespace. Clean slate.
|
||||||
|
|
||||||
|
## Container image
|
||||||
|
|
||||||
|
- [x] **PASS** — `juicedata/fio:latest` pulls successfully and runs on this cluster.
|
||||||
|
- [x] **PASS** — fio version 3.18 confirmed. Supports `fdatasync=1`, `log_avg_msec`, `write_lat_log`, `write_iops_log`, `--output-format=json`.
|
||||||
|
- [x] **PASS** — Image contains `sh`, `date` (needed for wall-clock barrier), and `tar` (needed for `kubectl cp`).
|
||||||
|
|
||||||
|
## Clock synchronization (barrier correctness)
|
||||||
|
|
||||||
|
- [x] **PASS** — Node heartbeat timestamps are within ~5s of each other. The 60-second barrier (`start_at = now_epoch() + 60`) provides ample margin. Barrier only fails if clock skew exceeds 60s.
|
||||||
|
|
||||||
|
## Code safety review
|
||||||
|
|
||||||
|
- [x] **PASS** — All fio workloads use `direct=1` (bypass page cache). Benchmark measures Ceph, not RAM.
|
||||||
|
- [x] **PASS** — All writes target `/data/iobench_testfile` on the mounted PVC filesystem, not a raw block device. No risk of corrupting the node OS disk.
|
||||||
|
- [x] **PASS** — Pods run with default security context (no privileged, no hostPath, no hostNetwork). Blast radius is limited to the PVCs.
|
||||||
|
- [x] **PASS** — `reclaimPolicy: Delete` means PVCs and their backing RBD images are fully cleaned up when deleted. No storage leak after `undeploy`.
|
||||||
|
- [x] **PASS** — `delete_resources` targets only `statefulset,pvc` with label `app=iobench-redpanda`. Cannot accidentally delete unrelated resources.
|
||||||
|
- [x] **PASS** — Parallel mode prints a clear warning before starting.
|
||||||
|
- [x] **PASS** — `--keep-deployment` flag available for debugging without re-provisioning.
|
||||||
|
- [x] **PASS** — Workloads run sequentially within each mode (throughput, then fsync_hot_path, then selftest_512k, then selftest_4k_qd1). Disk usage doesn't accumulate across workloads since they reuse the same filename.
|
||||||
|
|
||||||
|
## Resource consumption during run
|
||||||
|
|
||||||
|
- [x] **PASS** — Workers have ample CPU headroom (3-6% current usage). fio with 4 jobs + iodepth 16 is not CPU-intensive on modern hardware.
|
||||||
|
- [x] **PASS** — Workers have ample memory headroom (5-10% current usage). fio with `direct=1` uses minimal RAM.
|
||||||
|
- [x] **PASS** — No CPU/memory resource limits or requests set on the fio container. This is intentional — resource limits would throttle the benchmark and distort results. Acceptable during a maintenance window with other workloads off.
|
||||||
|
|
||||||
|
## Risks acknowledged
|
||||||
|
|
||||||
|
- **Ceph pool impact**: Parallel mode will saturate the Ceph pool. This is the point of the test. Confirmed other workloads are off and pool is at ~0 IOPS.
|
||||||
|
- **Capacity headroom is tight**: 53 GiB raw remaining after PVC provisioning (~10.5%). If the cluster has background operations that consume space (e.g., snapshots, compaction), this could trigger a `HEALTH_ERR: full` condition. Mitigated by: maintenance window, no other workloads, and `reclaimPolicy: Delete` ensuring cleanup.
|
||||||
|
- **`--wait=false` on delete**: `delete_resources` uses `--wait=false`, so `undeploy` returns before PVCs are fully reclaimed. This is fine — Ceph handles RBD image deletion asynchronously. But if re-running immediately after undeploy, PVCs from the previous run may still be terminating. Mitigated by: the deploy step uses `kubectl apply` which is idempotent.
|
||||||
|
|
||||||
|
## Verdict
|
||||||
|
|
||||||
|
All checks pass. Safe to proceed with `iobench redpanda --storage-class ceph-block` during the current maintenance window.
|
||||||
146
iobench/README.md
Normal file
146
iobench/README.md
Normal file
@@ -0,0 +1,146 @@
|
|||||||
|
# iobench
|
||||||
|
|
||||||
|
A command-line I/O benchmarking tool using fio. Runs locally, over SSH, or on Kubernetes pods. Includes a Redpanda storage characterization profile for validating Ceph RBD suitability.
|
||||||
|
|
||||||
|
## Build
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cargo build -p iobench --release
|
||||||
|
```
|
||||||
|
|
||||||
|
## Simple profile (default)
|
||||||
|
|
||||||
|
Runs standard fio benchmarks (sequential/random read/write, single/multi-job) against a local or remote target.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Local
|
||||||
|
iobench
|
||||||
|
|
||||||
|
# Over SSH
|
||||||
|
iobench --target ssh/user@host
|
||||||
|
|
||||||
|
# On a Kubernetes pod
|
||||||
|
iobench --target k8s/namespace/pod-name
|
||||||
|
|
||||||
|
# Custom parameters
|
||||||
|
iobench --duration 60 --size 4G --block-size 128k --tests read,write,randwrite
|
||||||
|
```
|
||||||
|
|
||||||
|
Available tests: `read`, `write`, `randread`, `randwrite`, `multiread`, `multiwrite`, `multirandread`, `multirandwrite`.
|
||||||
|
|
||||||
|
Results are saved to `./iobench-{timestamp}/summary.csv`.
|
||||||
|
|
||||||
|
## Redpanda profile
|
||||||
|
|
||||||
|
Characterizes whether a Kubernetes storage backend (e.g. Ceph RBD) can sustain Redpanda's I/O patterns. Deploys a 3-pod StatefulSet with per-node PVCs and runs four fio workloads that model Redpanda's actual storage behavior.
|
||||||
|
|
||||||
|
### Quick start
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Run the full profile (sequential baselines, then parallel contention test)
|
||||||
|
iobench redpanda --storage-class ceph-block
|
||||||
|
|
||||||
|
# Sequential baselines only (single-node, no cluster impact)
|
||||||
|
iobench redpanda --mode sequential --storage-class ceph-block
|
||||||
|
|
||||||
|
# Parallel contention test only
|
||||||
|
iobench redpanda --mode parallel --storage-class ceph-block
|
||||||
|
|
||||||
|
# Keep pods alive after the run (useful for re-running or debugging)
|
||||||
|
iobench redpanda --storage-class ceph-block --keep-deployment
|
||||||
|
|
||||||
|
# Deploy pods without running any workloads
|
||||||
|
iobench deploy --storage-class ceph-block
|
||||||
|
|
||||||
|
# Remove all iobench pods and PVCs
|
||||||
|
iobench undeploy
|
||||||
|
```
|
||||||
|
|
||||||
|
### Options
|
||||||
|
|
||||||
|
| Flag | Default | Description |
|
||||||
|
|------|---------|-------------|
|
||||||
|
| `--mode` | `both` | `sequential`, `parallel`, or `both` |
|
||||||
|
| `--storage-class` | `ceph-block` | Kubernetes StorageClass for the PVCs |
|
||||||
|
| `--pvc-size` | `50Gi` | Size of each PVC |
|
||||||
|
| `--replicas` | `3` | Number of pods (should match cluster node count) |
|
||||||
|
| `--namespace` | `default` | Kubernetes namespace |
|
||||||
|
| `--output-dir` | auto-timestamped | Directory for results |
|
||||||
|
| `--keep-deployment` | `false` | Don't delete pods/PVCs after the run |
|
||||||
|
|
||||||
|
### Workloads
|
||||||
|
|
||||||
|
The profile runs four workloads, each targeting a different aspect of Redpanda's I/O:
|
||||||
|
|
||||||
|
| Workload | What it models | Key params | Runtime |
|
||||||
|
|----------|---------------|------------|---------|
|
||||||
|
| `throughput` | Bulk segment writes (optimistic upper bound) | 16K seq write, no fsync, 4 jobs, iodepth=16 | 5 min |
|
||||||
|
| `fsync_hot_path` | Raft commit path with `acks=all` | 16K seq write, fdatasync per op, 4 jobs, iodepth=4 | 10 min |
|
||||||
|
| `selftest_512k` | `rpk cluster self-test` 512K phase | 512K seq write, fdatasync, 1 job, iodepth=4 | 2 min |
|
||||||
|
| `selftest_4k_qd1` | Worst-case single-stream commit | 4K seq write, fdatasync, 1 job, iodepth=1 | 2 min |
|
||||||
|
|
||||||
|
All workloads use `direct=1` (bypass page cache) and `ioengine=libaio`.
|
||||||
|
|
||||||
|
### Execution modes
|
||||||
|
|
||||||
|
- **`sequential`** -- Runs all four workloads back-to-back on a single node. Clean per-pattern baselines, comparable to Redpanda's published hardware requirements (16,000 IOPS minimum per broker).
|
||||||
|
- **`parallel`** -- Runs all four workloads concurrently across all nodes simultaneously. Each node writes to its own PVC. A wall-clock barrier synchronizes start times across pods so contention windows overlap. This is the production-shape test that exposes the Ceph OSD fan-in pattern.
|
||||||
|
- **`both`** (default) -- Sequential first, then parallel.
|
||||||
|
|
||||||
|
### Interpreting results
|
||||||
|
|
||||||
|
The **headline metric** is the worst p99.9 fdatasync latency on the `fsync_hot_path` workload during parallel execution:
|
||||||
|
|
||||||
|
- **<= 100 ms**: PASS. Storage can sustain Raft heartbeats under load.
|
||||||
|
- **> 100 ms**: FAIL. Raft heartbeats (150 ms interval, 1.5 s election timeout) will not survive load spikes, leading to election storms.
|
||||||
|
|
||||||
|
Reference values from healthy NVMe (from `rpk cluster self-test`):
|
||||||
|
- `selftest_512k`: ~1182 IOPS, ~591 MiB/s
|
||||||
|
- `selftest_4k_qd1`: ~406 IOPS
|
||||||
|
|
||||||
|
### Output
|
||||||
|
|
||||||
|
Results are saved to `./iobench-redpanda-{timestamp}/`:
|
||||||
|
|
||||||
|
```
|
||||||
|
iobench-redpanda-2026-05-03-143022/
|
||||||
|
redpanda_summary.csv # Full results: all metrics, all nodes, all workloads
|
||||||
|
iobench.csv # Dashboard-compatible format
|
||||||
|
sequential/
|
||||||
|
throughput_node-0.json # Raw fio JSON per workload per node
|
||||||
|
fsync_hot_path_node-0.json
|
||||||
|
...
|
||||||
|
node-0/ # Per-node time-series logs
|
||||||
|
throughput_lat.1.log
|
||||||
|
throughput_iops.1.log
|
||||||
|
...
|
||||||
|
parallel/
|
||||||
|
throughput_node-0.json
|
||||||
|
throughput_node-1.json
|
||||||
|
throughput_node-2.json
|
||||||
|
...
|
||||||
|
```
|
||||||
|
|
||||||
|
The CSV includes per-workload per-node: IOPS, bandwidth, completion latency percentiles (p50-p99.99, max), and fdatasync latency percentiles (p50-p99.99, max).
|
||||||
|
|
||||||
|
Per-second time-series logs (`*_lat.*.log`, `*_iops.*.log`) capture the bimodal/spiky behavior of Ceph under load that summary statistics miss.
|
||||||
|
|
||||||
|
### Warning
|
||||||
|
|
||||||
|
Parallel mode generates heavy fdatasync workloads across all nodes simultaneously. This **will impact other workloads on the same Ceph pool**. Run during a maintenance window or off-peak.
|
||||||
|
|
||||||
|
## Dashboard
|
||||||
|
|
||||||
|
A Plotly Dash app for visualizing results. Supports both simple and Redpanda profile data.
|
||||||
|
|
||||||
|
```bash
|
||||||
|
cd iobench/dash
|
||||||
|
virtualenv venv
|
||||||
|
source venv/bin/activate
|
||||||
|
pip install -r requirements_freeze.txt
|
||||||
|
|
||||||
|
# Copy result CSVs into the dash directory, then:
|
||||||
|
python iobench-dash.py
|
||||||
|
```
|
||||||
|
|
||||||
|
The dashboard reads `iobench.csv` and/or `redpanda_summary.csv` from its working directory.
|
||||||
@@ -1,3 +1,5 @@
|
|||||||
|
import os
|
||||||
|
|
||||||
import dash
|
import dash
|
||||||
from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction
|
from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction
|
||||||
import plotly.express as px
|
import plotly.express as px
|
||||||
@@ -6,38 +8,29 @@ import dash_bootstrap_components as dbc
|
|||||||
import io
|
import io
|
||||||
|
|
||||||
# --- Data Loading and Preparation ---
|
# --- Data Loading and Preparation ---
|
||||||
# csv_data = """label,test_name,iops,bandwidth_kibps,latency_mean_ms,latency_stddev_ms
|
dataframes = []
|
||||||
# Ceph HDD Only,read-4k-sync-test,1474.302,5897,0.673,0.591
|
|
||||||
# Ceph HDD Only,write-4k-sync-test,14.126,56,27.074,7.046
|
if os.path.exists("iobench.csv"):
|
||||||
# Ceph HDD Only,randread-4k-sync-test,225.140,900,4.436,6.918
|
df_simple = pd.read_csv("iobench.csv")
|
||||||
# Ceph HDD Only,randwrite-4k-sync-test,13.129,52,34.891,10.859
|
dataframes.append(df_simple)
|
||||||
# Ceph HDD Only,multiread-4k-sync-test,6873.675,27494,0.578,0.764
|
|
||||||
# Ceph HDD Only,multiwrite-4k-sync-test,57.135,228,38.660,11.293
|
if os.path.exists("redpanda_summary.csv"):
|
||||||
# Ceph HDD Only,multirandread-4k-sync-test,2451.376,9805,1.626,2.515
|
df_red = pd.read_csv("redpanda_summary.csv")
|
||||||
# Ceph HDD Only,multirandwrite-4k-sync-test,54.642,218,33.492,13.111
|
df_red['label'] = df_red['mode'] + '-' + df_red['workload']
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,read-4k-sync-test,1495.700,5982,0.664,1.701
|
df_red['test_name'] = df_red['workload']
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,write-4k-sync-test,16.990,67,17.502,9.908
|
cols = ['label', 'test_name', 'iops', 'bandwidth_kibps', 'latency_mean_ms', 'latency_stddev_ms', 'fdatasync_p99_9_ms']
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randread-4k-sync-test,159.256,637,6.274,9.232
|
df_red = df_red[[c for c in cols if c in df_red.columns]]
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randwrite-4k-sync-test,16.693,66,24.094,16.099
|
dataframes.append(df_red)
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiread-4k-sync-test,7305.559,29222,0.544,1.338
|
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiwrite-4k-sync-test,52.260,209,34.891,17.576
|
if not dataframes:
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandread-4k-sync-test,700.606,2802,5.700,10.429
|
raise FileNotFoundError("No iobench.csv or redpanda_summary.csv found in working directory.")
|
||||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandwrite-4k-sync-test,52.723,210,29.709,25.829
|
|
||||||
# Ceph 2 Hosts WAL+DB SSD Only,randwrite-4k-sync-test,90.037,360,3.617,8.321
|
df = pd.concat(dataframes, ignore_index=True)
|
||||||
# Ceph WAL+DB SSD During Rebuild,randwrite-4k-sync-test,41.008,164,10.138,19.333
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,read-4k-sync-test,1520.299,6081,0.654,1.539
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,write-4k-sync-test,78.528,314,4.074,9.101
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,randread-4k-sync-test,153.303,613,6.518,9.036
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,randwrite-4k-sync-test,48.677,194,8.785,20.356
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,multiread-4k-sync-test,6804.880,27219,0.584,1.422
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,multiwrite-4k-sync-test,311.513,1246,4.978,9.458
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,multirandread-4k-sync-test,581.756,2327,6.869,10.204
|
|
||||||
# Ceph WAL+DB SSD OSD HDD,multirandwrite-4k-sync-test,120.556,482,13.463,25.440
|
|
||||||
# """
|
|
||||||
#
|
|
||||||
# df = pd.read_csv(io.StringIO(csv_data))
|
|
||||||
df = pd.read_csv("iobench.csv") # Replace with the actual file path
|
|
||||||
df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024
|
df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024
|
||||||
|
if 'fdatasync_p99_9_ms' not in df.columns:
|
||||||
|
df['fdatasync_p99_9_ms'] = 0.0
|
||||||
|
else:
|
||||||
|
df['fdatasync_p99_9_ms'] = df['fdatasync_p99_9_ms'].fillna(0.0)
|
||||||
|
|
||||||
# --- App Initialization and Global Settings ---
|
# --- App Initialization and Global Settings ---
|
||||||
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY])
|
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY])
|
||||||
@@ -70,7 +63,8 @@ app.layout = dbc.Container([
|
|||||||
options=[
|
options=[
|
||||||
{'label': 'IOPS', 'value': 'iops'},
|
{'label': 'IOPS', 'value': 'iops'},
|
||||||
{'label': 'Latency (ms)', 'value': 'latency_mean_ms'},
|
{'label': 'Latency (ms)', 'value': 'latency_mean_ms'},
|
||||||
{'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'}
|
{'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'},
|
||||||
|
{'label': 'fdatasync p99.9 (ms)', 'value': 'fdatasync_p99_9_ms'},
|
||||||
],
|
],
|
||||||
value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection
|
value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection
|
||||||
labelClassName="d-block"
|
labelClassName="d-block"
|
||||||
@@ -185,11 +179,12 @@ def update_graphs(selected_metrics, selected_configs, selected_tests):
|
|||||||
metric_titles = {
|
metric_titles = {
|
||||||
'iops': 'IOPS Comparison (Higher is Better)',
|
'iops': 'IOPS Comparison (Higher is Better)',
|
||||||
'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)',
|
'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)',
|
||||||
'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)'
|
'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)',
|
||||||
|
'fdatasync_p99_9_ms': 'fdatasync p99.9 Latency (ms) Comparison (Lower is Better)',
|
||||||
}
|
}
|
||||||
|
|
||||||
for metric in selected_metrics:
|
for metric in selected_metrics:
|
||||||
sort_order = 'total ascending' if metric == 'latency_mean_ms' else 'total descending'
|
sort_order = 'total ascending' if metric in ('latency_mean_ms', 'fdatasync_p99_9_ms') else 'total descending'
|
||||||
error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None
|
error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None
|
||||||
|
|
||||||
fig = px.bar(
|
fig = px.bar(
|
||||||
@@ -206,7 +201,8 @@ def update_graphs(selected_metrics, selected_configs, selected_tests):
|
|||||||
"iops": "IOPS",
|
"iops": "IOPS",
|
||||||
"latency_mean_ms": "Mean Latency (ms)",
|
"latency_mean_ms": "Mean Latency (ms)",
|
||||||
"bandwidth_mbps": "Bandwidth (MB/s)",
|
"bandwidth_mbps": "Bandwidth (MB/s)",
|
||||||
"label": "Cluster Configuration"
|
"fdatasync_p99_9_ms": "fdatasync p99.9 (ms)",
|
||||||
|
"label": "Configuration"
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|||||||
224
iobench/src/fio.rs
Normal file
224
iobench/src/fio.rs
Normal file
@@ -0,0 +1,224 @@
|
|||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct FioOutput {
|
||||||
|
pub jobs: Vec<FioJobResult>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct FioJobResult {
|
||||||
|
pub jobname: String,
|
||||||
|
pub read: FioMetrics,
|
||||||
|
pub write: FioMetrics,
|
||||||
|
#[serde(default)]
|
||||||
|
pub sync: Option<SyncMetrics>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct FioMetrics {
|
||||||
|
pub bw: f64,
|
||||||
|
pub iops: f64,
|
||||||
|
pub clat_ns: LatencyMetrics,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct SyncMetrics {
|
||||||
|
pub total_ios: u64,
|
||||||
|
pub lat_ns: LatencyMetrics,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize, Deserialize)]
|
||||||
|
pub struct LatencyMetrics {
|
||||||
|
pub mean: f64,
|
||||||
|
pub stddev: f64,
|
||||||
|
#[serde(default)]
|
||||||
|
pub percentile: Option<HashMap<String, f64>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Serialize)]
|
||||||
|
pub struct BenchmarkResult {
|
||||||
|
pub test_name: String,
|
||||||
|
pub iops: f64,
|
||||||
|
pub bandwidth_kibps: f64,
|
||||||
|
pub latency_mean_ms: f64,
|
||||||
|
pub latency_stddev_ms: f64,
|
||||||
|
pub clat_p50_ms: f64,
|
||||||
|
pub clat_p95_ms: f64,
|
||||||
|
pub clat_p99_ms: f64,
|
||||||
|
pub clat_p99_9_ms: f64,
|
||||||
|
pub clat_p99_99_ms: f64,
|
||||||
|
pub clat_max_ms: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn percentile_ns_to_ms(p: &Option<HashMap<String, f64>>, key: &str) -> f64 {
|
||||||
|
p.as_ref().and_then(|m| m.get(key).copied()).unwrap_or(0.0) / 1_000_000.0
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Parse fio JSON output and extract metrics for the named job.
|
||||||
|
/// The `rw` parameter selects whether to read from the `read` or `write`
|
||||||
|
/// metrics block — any value containing "read" uses the read block,
|
||||||
|
/// everything else uses write.
|
||||||
|
pub fn parse_fio_output(
|
||||||
|
output: &str,
|
||||||
|
test_name: &str,
|
||||||
|
rw: &str,
|
||||||
|
) -> Result<BenchmarkResult, String> {
|
||||||
|
let fio_data: FioOutput = serde_json::from_str(output)
|
||||||
|
.map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?;
|
||||||
|
|
||||||
|
let job_result = fio_data
|
||||||
|
.jobs
|
||||||
|
.iter()
|
||||||
|
.find(|j| j.jobname == test_name)
|
||||||
|
.ok_or_else(|| {
|
||||||
|
format!(
|
||||||
|
"Could not find job result for '{}' in fio output",
|
||||||
|
test_name
|
||||||
|
)
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let metrics = if rw.contains("read") {
|
||||||
|
&job_result.read
|
||||||
|
} else {
|
||||||
|
&job_result.write
|
||||||
|
};
|
||||||
|
|
||||||
|
let p = &metrics.clat_ns.percentile;
|
||||||
|
Ok(BenchmarkResult {
|
||||||
|
test_name: test_name.to_string(),
|
||||||
|
iops: metrics.iops,
|
||||||
|
bandwidth_kibps: metrics.bw,
|
||||||
|
latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0,
|
||||||
|
latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0,
|
||||||
|
clat_p50_ms: percentile_ns_to_ms(p, "50.000000"),
|
||||||
|
clat_p95_ms: percentile_ns_to_ms(p, "95.000000"),
|
||||||
|
clat_p99_ms: percentile_ns_to_ms(p, "99.000000"),
|
||||||
|
clat_p99_9_ms: percentile_ns_to_ms(p, "99.900000"),
|
||||||
|
clat_p99_99_ms: percentile_ns_to_ms(p, "99.990000"),
|
||||||
|
clat_max_ms: percentile_ns_to_ms(p, "100.000000"),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn percentile_ns_to_ms_converts_correctly() {
|
||||||
|
let mut m = HashMap::new();
|
||||||
|
m.insert("50.000000".to_string(), 4_000_000.0); // 4ms in ns
|
||||||
|
let p = Some(m);
|
||||||
|
assert!((percentile_ns_to_ms(&p, "50.000000") - 4.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn percentile_ns_to_ms_missing_key_returns_zero() {
|
||||||
|
let m = HashMap::new();
|
||||||
|
assert_eq!(percentile_ns_to_ms(&Some(m), "99.000000"), 0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn percentile_ns_to_ms_none_returns_zero() {
|
||||||
|
assert_eq!(percentile_ns_to_ms(&None, "50.000000"), 0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_selects_read_metrics() {
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "readtest",
|
||||||
|
"read": {
|
||||||
|
"bw": 100000.0, "iops": 5000.0,
|
||||||
|
"clat_ns": { "mean": 200_000.0, "stddev": 50_000.0 }
|
||||||
|
},
|
||||||
|
"write": {
|
||||||
|
"bw": 0.0, "iops": 0.0,
|
||||||
|
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let result = parse_fio_output(&json, "readtest", "read").unwrap();
|
||||||
|
assert!((result.iops - 5000.0).abs() < 0.001);
|
||||||
|
assert!((result.bandwidth_kibps - 100000.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_selects_write_metrics() {
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "writetest",
|
||||||
|
"read": {
|
||||||
|
"bw": 0.0, "iops": 0.0,
|
||||||
|
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
|
||||||
|
},
|
||||||
|
"write": {
|
||||||
|
"bw": 51200.0, "iops": 3200.0,
|
||||||
|
"clat_ns": { "mean": 5_000_000.0, "stddev": 1_000_000.0 }
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let result = parse_fio_output(&json, "writetest", "write").unwrap();
|
||||||
|
assert!((result.iops - 3200.0).abs() < 0.001);
|
||||||
|
assert!((result.latency_mean_ms - 5.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_no_percentiles_returns_zeros() {
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "test",
|
||||||
|
"read": {
|
||||||
|
"bw": 0.0, "iops": 0.0,
|
||||||
|
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
|
||||||
|
},
|
||||||
|
"write": {
|
||||||
|
"bw": 1000.0, "iops": 100.0,
|
||||||
|
"clat_ns": { "mean": 10_000_000.0, "stddev": 2_000_000.0 }
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
.to_string();
|
||||||
|
|
||||||
|
let result = parse_fio_output(&json, "test", "write").unwrap();
|
||||||
|
assert_eq!(result.clat_p50_ms, 0.0);
|
||||||
|
assert_eq!(result.clat_p99_ms, 0.0);
|
||||||
|
assert_eq!(result.clat_max_ms, 0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_invalid_json_errors() {
|
||||||
|
let result = parse_fio_output("{not json", "test", "write");
|
||||||
|
assert!(result.is_err());
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fio_output_deserializes_with_optional_sync() {
|
||||||
|
// Without sync field
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "test",
|
||||||
|
"read": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } },
|
||||||
|
"write": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } }
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
let parsed: FioOutput = serde_json::from_value(json).unwrap();
|
||||||
|
assert!(parsed.jobs[0].sync.is_none());
|
||||||
|
|
||||||
|
// With sync field
|
||||||
|
let json = serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "test",
|
||||||
|
"read": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } },
|
||||||
|
"write": { "bw": 0.0, "iops": 0.0, "clat_ns": { "mean": 0.0, "stddev": 0.0 } },
|
||||||
|
"sync": { "total_ios": 100, "lat_ns": { "mean": 500.0, "stddev": 100.0 } }
|
||||||
|
}]
|
||||||
|
});
|
||||||
|
let parsed: FioOutput = serde_json::from_value(json).unwrap();
|
||||||
|
assert_eq!(parsed.jobs[0].sync.as_ref().unwrap().total_ios, 100);
|
||||||
|
}
|
||||||
|
}
|
||||||
193
iobench/src/k8s.rs
Normal file
193
iobench/src/k8s.rs
Normal file
@@ -0,0 +1,193 @@
|
|||||||
|
use std::collections::BTreeMap;
|
||||||
|
use std::io;
|
||||||
|
use std::io::Write;
|
||||||
|
use std::process::{Command, Stdio};
|
||||||
|
|
||||||
|
use k8s_openapi::api::apps::v1::StatefulSet;
|
||||||
|
use k8s_openapi::api::core::v1::{
|
||||||
|
Affinity, Container, PersistentVolumeClaim, PersistentVolumeClaimSpec, PodAffinityTerm,
|
||||||
|
PodAntiAffinity, PodSpec, PodTemplateSpec, VolumeMount, VolumeResourceRequirements,
|
||||||
|
};
|
||||||
|
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
|
||||||
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
|
||||||
|
|
||||||
|
pub fn redpanda_statefulset(
|
||||||
|
name: &str,
|
||||||
|
namespace: &str,
|
||||||
|
replicas: i32,
|
||||||
|
storage_class: &str,
|
||||||
|
size: &str,
|
||||||
|
image: &str,
|
||||||
|
) -> StatefulSet {
|
||||||
|
let mut labels = BTreeMap::new();
|
||||||
|
labels.insert("app".to_string(), name.to_string());
|
||||||
|
|
||||||
|
let pvc = PersistentVolumeClaim {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some("data".to_string()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
spec: Some(PersistentVolumeClaimSpec {
|
||||||
|
access_modes: Some(vec!["ReadWriteOnce".to_string()]),
|
||||||
|
resources: Some(VolumeResourceRequirements {
|
||||||
|
requests: Some({
|
||||||
|
let mut m = BTreeMap::new();
|
||||||
|
m.insert("storage".to_string(), Quantity(size.to_string()));
|
||||||
|
m
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
storage_class_name: Some(storage_class.to_string()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let container = Container {
|
||||||
|
name: "fio".to_string(),
|
||||||
|
image: Some(image.to_string()),
|
||||||
|
image_pull_policy: Some("IfNotPresent".to_string()),
|
||||||
|
command: Some(vec!["sleep".to_string(), "infinity".to_string()]),
|
||||||
|
volume_mounts: Some(vec![VolumeMount {
|
||||||
|
name: "data".to_string(),
|
||||||
|
mount_path: "/data".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
}]),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
let pod_spec = PodSpec {
|
||||||
|
containers: vec![container],
|
||||||
|
affinity: Some(Affinity {
|
||||||
|
pod_anti_affinity: Some(PodAntiAffinity {
|
||||||
|
required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
|
||||||
|
label_selector: Some(LabelSelector {
|
||||||
|
match_labels: Some(labels.clone()),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
topology_key: "kubernetes.io/hostname".to_string(),
|
||||||
|
..Default::default()
|
||||||
|
}]),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
|
||||||
|
StatefulSet {
|
||||||
|
metadata: ObjectMeta {
|
||||||
|
name: Some(name.to_string()),
|
||||||
|
namespace: Some(namespace.to_string()),
|
||||||
|
labels: Some(labels.clone()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec {
|
||||||
|
replicas: Some(replicas),
|
||||||
|
selector: LabelSelector {
|
||||||
|
match_labels: Some(labels.clone()),
|
||||||
|
..Default::default()
|
||||||
|
},
|
||||||
|
service_name: Some(name.to_string()),
|
||||||
|
pod_management_policy: Some("Parallel".to_string()),
|
||||||
|
template: PodTemplateSpec {
|
||||||
|
metadata: Some(ObjectMeta {
|
||||||
|
labels: Some(labels),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
spec: Some(pod_spec),
|
||||||
|
},
|
||||||
|
volume_claim_templates: Some(vec![pvc]),
|
||||||
|
..Default::default()
|
||||||
|
}),
|
||||||
|
..Default::default()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run_kubectl(args: &[&str]) -> io::Result<String> {
|
||||||
|
let mut cmd = Command::new("kubectl");
|
||||||
|
cmd.args(args);
|
||||||
|
cmd.stdout(Stdio::piped()).stderr(Stdio::piped());
|
||||||
|
let output = cmd.output()?;
|
||||||
|
if !output.status.success() {
|
||||||
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
|
return Err(io::Error::other(format!(
|
||||||
|
"kubectl failed: {} (args: {:?})",
|
||||||
|
stderr, args
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Ok(String::from_utf8_lossy(&output.stdout).to_string())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn apply_yaml(yaml: &str) -> io::Result<()> {
|
||||||
|
let mut cmd = Command::new("kubectl");
|
||||||
|
cmd.args(["apply", "-f", "-"]);
|
||||||
|
cmd.stdin(Stdio::piped());
|
||||||
|
cmd.stdout(Stdio::piped());
|
||||||
|
cmd.stderr(Stdio::piped());
|
||||||
|
|
||||||
|
let mut child = cmd.spawn()?;
|
||||||
|
{
|
||||||
|
let stdin = child.stdin.as_mut().unwrap();
|
||||||
|
stdin.write_all(yaml.as_bytes())?;
|
||||||
|
}
|
||||||
|
let output = child.wait_with_output()?;
|
||||||
|
if !output.status.success() {
|
||||||
|
let stderr = String::from_utf8_lossy(&output.stderr);
|
||||||
|
return Err(io::Error::other(format!(
|
||||||
|
"kubectl apply failed: {}",
|
||||||
|
stderr
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn wait_for_pods(namespace: &str, name: &str) -> io::Result<()> {
|
||||||
|
run_kubectl(&[
|
||||||
|
"rollout",
|
||||||
|
"status",
|
||||||
|
"statefulset",
|
||||||
|
name,
|
||||||
|
"-n",
|
||||||
|
namespace,
|
||||||
|
"--timeout",
|
||||||
|
"300s",
|
||||||
|
])?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn get_pod_names(namespace: &str, label: &str) -> io::Result<Vec<String>> {
|
||||||
|
let out = run_kubectl(&[
|
||||||
|
"get",
|
||||||
|
"pods",
|
||||||
|
"-n",
|
||||||
|
namespace,
|
||||||
|
"-l",
|
||||||
|
&format!("app={}", label),
|
||||||
|
"-o",
|
||||||
|
"jsonpath={.items[*].metadata.name}",
|
||||||
|
])?;
|
||||||
|
Ok(out.split_whitespace().map(|s| s.to_string()).collect())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn exec(namespace: &str, pod: &str, command: &str) -> io::Result<String> {
|
||||||
|
run_kubectl(&["exec", "-n", namespace, pod, "--", "sh", "-c", command])
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn cp_from(namespace: &str, pod: &str, remote: &str, local: &str) -> io::Result<()> {
|
||||||
|
run_kubectl(&["cp", "-n", namespace, &format!("{}:{}", pod, remote), local])?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn delete_resources(namespace: &str, label: &str) -> io::Result<()> {
|
||||||
|
run_kubectl(&[
|
||||||
|
"delete",
|
||||||
|
"statefulset,pvc",
|
||||||
|
"-n",
|
||||||
|
namespace,
|
||||||
|
"-l",
|
||||||
|
&format!("app={}", label),
|
||||||
|
"--wait=false",
|
||||||
|
])?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
@@ -1,23 +1,22 @@
|
|||||||
use std::fs;
|
use std::io;
|
||||||
use std::io::{self, Write};
|
use std::io::Write;
|
||||||
use std::process::{Command, Stdio};
|
use std::process::{Command, Stdio};
|
||||||
use std::thread;
|
|
||||||
use std::time::Duration;
|
|
||||||
|
|
||||||
use chrono::Local;
|
use clap::{Parser, Subcommand};
|
||||||
use clap::Parser;
|
|
||||||
use serde::{Deserialize, Serialize};
|
mod fio;
|
||||||
|
mod k8s;
|
||||||
|
mod redpanda;
|
||||||
|
mod simple;
|
||||||
|
|
||||||
/// A simple yet powerful I/O benchmarking tool using fio.
|
/// A simple yet powerful I/O benchmarking tool using fio.
|
||||||
#[derive(Parser, Debug)]
|
#[derive(Parser, Debug)]
|
||||||
#[command(author, version, about, long_about = None)]
|
#[command(author, version, about, long_about = None)]
|
||||||
struct Args {
|
struct Cli {
|
||||||
/// Target for the benchmark.
|
#[command(subcommand)]
|
||||||
/// Formats:
|
command: Option<Commands>,
|
||||||
/// - localhost (default)
|
|
||||||
/// - ssh/{user}@{host}
|
// Simple profile args (used when no subcommand is given)
|
||||||
/// - ssh/{user}@{host}:{port}
|
|
||||||
/// - k8s/{namespace}/{pod}
|
|
||||||
#[arg(short, long, default_value = "localhost")]
|
#[arg(short, long, default_value = "localhost")]
|
||||||
target: String,
|
target: String,
|
||||||
|
|
||||||
@@ -27,7 +26,10 @@ struct Args {
|
|||||||
/// Comma-separated list of tests to run.
|
/// Comma-separated list of tests to run.
|
||||||
/// Available tests: read, write, randread, randwrite,
|
/// Available tests: read, write, randread, randwrite,
|
||||||
/// multiread, multiwrite, multirandread, multirandwrite.
|
/// multiread, multiwrite, multirandread, multirandwrite.
|
||||||
#[arg(long, default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite")]
|
#[arg(
|
||||||
|
long,
|
||||||
|
default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite"
|
||||||
|
)]
|
||||||
tests: String,
|
tests: String,
|
||||||
|
|
||||||
/// Duration of each test in seconds.
|
/// Duration of each test in seconds.
|
||||||
@@ -48,143 +50,49 @@ struct Args {
|
|||||||
block_size: String,
|
block_size: String,
|
||||||
}
|
}
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
#[derive(Subcommand, Debug)]
|
||||||
struct FioOutput {
|
enum Commands {
|
||||||
jobs: Vec<FioJobResult>,
|
/// Run the Redpanda storage characterization profile.
|
||||||
}
|
Redpanda(redpanda::RedpandaArgs),
|
||||||
|
/// Deploy the Redpanda iobench resources and exit.
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
Deploy(redpanda::RedpandaArgs),
|
||||||
struct FioJobResult {
|
/// Remove the Redpanda iobench resources.
|
||||||
jobname: String,
|
Undeploy(redpanda::RedpandaArgs),
|
||||||
read: FioMetrics,
|
|
||||||
write: FioMetrics,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
struct FioMetrics {
|
|
||||||
bw: f64,
|
|
||||||
iops: f64,
|
|
||||||
clat_ns: LatencyMetrics,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize, Deserialize)]
|
|
||||||
struct LatencyMetrics {
|
|
||||||
mean: f64,
|
|
||||||
stddev: f64,
|
|
||||||
}
|
|
||||||
|
|
||||||
#[derive(Debug, Serialize)]
|
|
||||||
struct BenchmarkResult {
|
|
||||||
test_name: String,
|
|
||||||
iops: f64,
|
|
||||||
bandwidth_kibps: f64,
|
|
||||||
latency_mean_ms: f64,
|
|
||||||
latency_stddev_ms: f64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn main() -> io::Result<()> {
|
fn main() -> io::Result<()> {
|
||||||
let args = Args::parse();
|
let cli = Cli::parse();
|
||||||
|
|
||||||
let output_dir = args.output_dir.unwrap_or_else(|| {
|
match cli.command {
|
||||||
format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S"))
|
None => {
|
||||||
});
|
// Backward-compatible simple benchmark
|
||||||
fs::create_dir_all(&output_dir)?;
|
let args = simple::SimpleArgs {
|
||||||
|
target: cli.target,
|
||||||
let tests_to_run: Vec<&str> = args.tests.split(',').collect();
|
benchmark_dir: cli.benchmark_dir,
|
||||||
let mut results = Vec::new();
|
tests: cli.tests,
|
||||||
|
duration: cli.duration,
|
||||||
for test in tests_to_run {
|
output_dir: cli.output_dir,
|
||||||
println!("--------------------------------------------------");
|
size: cli.size,
|
||||||
println!("Running test: {}", test);
|
block_size: cli.block_size,
|
||||||
|
|
||||||
let (rw, numjobs) = match test {
|
|
||||||
"read" => ("read", 1),
|
|
||||||
"write" => ("write", 1),
|
|
||||||
"randread" => ("randread", 1),
|
|
||||||
"randwrite" => ("randwrite", 1),
|
|
||||||
"multiread" => ("read", 4),
|
|
||||||
"multiwrite" => ("write", 4),
|
|
||||||
"multirandread" => ("randread", 4),
|
|
||||||
"multirandwrite" => ("randwrite", 4),
|
|
||||||
_ => {
|
|
||||||
eprintln!("Unknown test: {}. Skipping.", test);
|
|
||||||
continue;
|
|
||||||
}
|
|
||||||
};
|
};
|
||||||
|
simple::run(&args)
|
||||||
let test_name = format!("{}-{}-sync-test", test, args.block_size);
|
|
||||||
let fio_command = format!(
|
|
||||||
"fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
|
|
||||||
args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
|
|
||||||
);
|
|
||||||
|
|
||||||
println!("Executing command:\n{}\n", fio_command);
|
|
||||||
|
|
||||||
let output = match run_command(&args.target, &fio_command) {
|
|
||||||
Ok(out) => out,
|
|
||||||
Err(e) => {
|
|
||||||
eprintln!("Failed to execute command for test {}: {}", test, e);
|
|
||||||
continue;
|
|
||||||
}
|
}
|
||||||
};
|
Some(Commands::Redpanda(args)) => redpanda::run(&args),
|
||||||
|
Some(Commands::Deploy(args)) => {
|
||||||
|
let mut a = args.clone();
|
||||||
let result = parse_fio_output(&output, &test_name, rw);
|
a.deploy_only = true;
|
||||||
// TODO store raw fio output and print it
|
redpanda::run(&a)
|
||||||
match result {
|
|
||||||
Ok(res) => {
|
|
||||||
results.push(res);
|
|
||||||
}
|
}
|
||||||
Err(e) => {
|
Some(Commands::Undeploy(args)) => redpanda::undeploy(&args),
|
||||||
eprintln!("Error parsing fio output for test {}: {}", test, e);
|
|
||||||
eprintln!("Raw output:\n{}", output);
|
|
||||||
}
|
}
|
||||||
}
|
|
||||||
|
|
||||||
println!("{output}");
|
|
||||||
println!("Test {} completed.", test);
|
|
||||||
// A brief pause to let the system settle before the next test.
|
|
||||||
thread::sleep(Duration::from_secs(2));
|
|
||||||
}
|
|
||||||
|
|
||||||
// Cleanup the test file on the target
|
|
||||||
println!("--------------------------------------------------");
|
|
||||||
println!("Cleaning up test file on target...");
|
|
||||||
let cleanup_command = "rm -f ./iobench_testfile";
|
|
||||||
if let Err(e) = run_command(&args.target, cleanup_command) {
|
|
||||||
eprintln!("Warning: Failed to clean up test file on target: {}", e);
|
|
||||||
} else {
|
|
||||||
println!("Cleanup successful.");
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
if results.is_empty() {
|
|
||||||
println!("\nNo benchmark results to display.");
|
|
||||||
return Ok(());
|
|
||||||
}
|
|
||||||
|
|
||||||
// Output results to a CSV file for easy analysis
|
|
||||||
let csv_path = format!("{}/summary.csv", output_dir);
|
|
||||||
let mut wtr = csv::Writer::from_path(&csv_path)?;
|
|
||||||
for result in &results {
|
|
||||||
wtr.serialize(result)?;
|
|
||||||
}
|
|
||||||
wtr.flush()?;
|
|
||||||
|
|
||||||
println!("\nBenchmark summary saved to {}", csv_path);
|
|
||||||
println!("\n--- Benchmark Results Summary ---");
|
|
||||||
println!("{:<25} {:>10} {:>18} {:>20} {:>22}", "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)");
|
|
||||||
println!("{:-<98}", "");
|
|
||||||
for result in results {
|
|
||||||
println!("{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}", result.test_name, result.iops, result.bandwidth_kibps, result.latency_mean_ms, result.latency_stddev_ms);
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_command(target: &str, command: &str) -> io::Result<String> {
|
pub fn run_command(target: &str, command: &str) -> io::Result<String> {
|
||||||
let (program, args) = if target == "localhost" {
|
let (program, args) = if target == "localhost" {
|
||||||
("sudo", vec!["sh".to_string(), "-c".to_string(), command.to_string()])
|
(
|
||||||
|
"sudo",
|
||||||
|
vec!["sh".to_string(), "-c".to_string(), command.to_string()],
|
||||||
|
)
|
||||||
} else if target.starts_with("ssh/") {
|
} else if target.starts_with("ssh/") {
|
||||||
let target_str = target.strip_prefix("ssh/").unwrap();
|
let target_str = target.strip_prefix("ssh/").unwrap();
|
||||||
let ssh_target;
|
let ssh_target;
|
||||||
@@ -203,13 +111,31 @@ fn run_command(target: &str, command: &str) -> io::Result<String> {
|
|||||||
} else if target.starts_with("k8s/") {
|
} else if target.starts_with("k8s/") {
|
||||||
let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect();
|
let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect();
|
||||||
if parts.len() != 2 {
|
if parts.len() != 2 {
|
||||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid k8s target format. Expected k8s/{namespace}/{pod}"));
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidInput,
|
||||||
|
"Invalid k8s target format. Expected k8s/{namespace}/{pod}",
|
||||||
|
));
|
||||||
}
|
}
|
||||||
let namespace = parts[0];
|
let namespace = parts[0];
|
||||||
let pod = parts[1];
|
let pod = parts[1];
|
||||||
("kubectl", vec!["exec".to_string(), "-n".to_string(), namespace.to_string(), pod.to_string(), "--".to_string(), "sh".to_string(), "-c".to_string(), command.to_string()])
|
(
|
||||||
|
"kubectl",
|
||||||
|
vec![
|
||||||
|
"exec".to_string(),
|
||||||
|
"-n".to_string(),
|
||||||
|
namespace.to_string(),
|
||||||
|
pod.to_string(),
|
||||||
|
"--".to_string(),
|
||||||
|
"sh".to_string(),
|
||||||
|
"-c".to_string(),
|
||||||
|
command.to_string(),
|
||||||
|
],
|
||||||
|
)
|
||||||
} else {
|
} else {
|
||||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid target format"));
|
return Err(io::Error::new(
|
||||||
|
io::ErrorKind::InvalidInput,
|
||||||
|
"Invalid target format",
|
||||||
|
));
|
||||||
};
|
};
|
||||||
|
|
||||||
let mut cmd = Command::new(program);
|
let mut cmd = Command::new(program);
|
||||||
@@ -222,32 +148,8 @@ fn run_command(target: &str, command: &str) -> io::Result<String> {
|
|||||||
if !output.status.success() {
|
if !output.status.success() {
|
||||||
eprintln!("Command failed with status: {}", output.status);
|
eprintln!("Command failed with status: {}", output.status);
|
||||||
io::stderr().write_all(&output.stderr)?;
|
io::stderr().write_all(&output.stderr)?;
|
||||||
return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed"));
|
return Err(io::Error::other("Command execution failed"));
|
||||||
}
|
}
|
||||||
|
|
||||||
String::from_utf8(output.stdout)
|
String::from_utf8(output.stdout).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
||||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
|
||||||
}
|
|
||||||
|
|
||||||
fn parse_fio_output(output: &str, test_name: &str, rw: &str) -> Result<BenchmarkResult, String> {
|
|
||||||
let fio_data: FioOutput = serde_json::from_str(output)
|
|
||||||
.map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?;
|
|
||||||
|
|
||||||
let job_result = fio_data.jobs.iter()
|
|
||||||
.find(|j| j.jobname == test_name)
|
|
||||||
.ok_or_else(|| format!("Could not find job result for '{}' in fio output", test_name))?;
|
|
||||||
|
|
||||||
let metrics = if rw.contains("read") {
|
|
||||||
&job_result.read
|
|
||||||
} else {
|
|
||||||
&job_result.write
|
|
||||||
};
|
|
||||||
|
|
||||||
Ok(BenchmarkResult {
|
|
||||||
test_name: test_name.to_string(),
|
|
||||||
iops: metrics.iops,
|
|
||||||
bandwidth_kibps: metrics.bw,
|
|
||||||
latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0,
|
|
||||||
latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0,
|
|
||||||
})
|
|
||||||
}
|
}
|
||||||
|
|||||||
994
iobench/src/redpanda.rs
Normal file
994
iobench/src/redpanda.rs
Normal file
@@ -0,0 +1,994 @@
|
|||||||
|
use std::fs;
|
||||||
|
use std::io;
|
||||||
|
use std::path::Path;
|
||||||
|
use std::sync::Arc;
|
||||||
|
use std::sync::atomic::{AtomicBool, Ordering};
|
||||||
|
use std::thread;
|
||||||
|
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
use std::collections::HashMap;
|
||||||
|
|
||||||
|
use chrono::Local;
|
||||||
|
use clap::Args;
|
||||||
|
|
||||||
|
use crate::fio::{FioOutput, parse_fio_output};
|
||||||
|
use crate::k8s;
|
||||||
|
|
||||||
|
const NAME: &str = "iobench-redpanda";
|
||||||
|
const IMAGE: &str = "juicedata/fio:latest";
|
||||||
|
const BARRIER_SECS: u64 = 60;
|
||||||
|
|
||||||
|
const WORKLOAD_THROUGHPUT: &str = r#"[global]
|
||||||
|
direct=1
|
||||||
|
ioengine=libaio
|
||||||
|
group_reporting
|
||||||
|
time_based
|
||||||
|
log_avg_msec=1000
|
||||||
|
filename=/data/iobench_testfile
|
||||||
|
size=10G
|
||||||
|
|
||||||
|
[throughput]
|
||||||
|
rw=write
|
||||||
|
bs=16K
|
||||||
|
numjobs=4
|
||||||
|
iodepth=16
|
||||||
|
runtime=300
|
||||||
|
"#;
|
||||||
|
|
||||||
|
const WORKLOAD_FSYNC_HOT_PATH: &str = r#"[global]
|
||||||
|
direct=1
|
||||||
|
ioengine=libaio
|
||||||
|
group_reporting
|
||||||
|
time_based
|
||||||
|
log_avg_msec=1000
|
||||||
|
filename=/data/iobench_testfile
|
||||||
|
size=5G
|
||||||
|
|
||||||
|
[fsync_hot_path]
|
||||||
|
rw=write
|
||||||
|
bs=16K
|
||||||
|
fdatasync=1
|
||||||
|
numjobs=4
|
||||||
|
iodepth=4
|
||||||
|
runtime=600
|
||||||
|
"#;
|
||||||
|
|
||||||
|
const WORKLOAD_SELFTEST_512K: &str = r#"[global]
|
||||||
|
direct=1
|
||||||
|
ioengine=libaio
|
||||||
|
group_reporting
|
||||||
|
time_based
|
||||||
|
log_avg_msec=1000
|
||||||
|
filename=/data/iobench_testfile
|
||||||
|
size=10G
|
||||||
|
|
||||||
|
[selftest_512k]
|
||||||
|
rw=write
|
||||||
|
bs=512K
|
||||||
|
fdatasync=1
|
||||||
|
numjobs=1
|
||||||
|
iodepth=4
|
||||||
|
runtime=120
|
||||||
|
"#;
|
||||||
|
|
||||||
|
const WORKLOAD_SELFTEST_4K_QD1: &str = r#"[global]
|
||||||
|
direct=1
|
||||||
|
ioengine=libaio
|
||||||
|
group_reporting
|
||||||
|
time_based
|
||||||
|
log_avg_msec=1000
|
||||||
|
filename=/data/iobench_testfile
|
||||||
|
size=2G
|
||||||
|
|
||||||
|
[selftest_4k_qd1]
|
||||||
|
rw=write
|
||||||
|
bs=4K
|
||||||
|
fdatasync=1
|
||||||
|
numjobs=1
|
||||||
|
iodepth=1
|
||||||
|
runtime=120
|
||||||
|
"#;
|
||||||
|
|
||||||
|
struct Workload {
|
||||||
|
name: &'static str,
|
||||||
|
template: &'static str,
|
||||||
|
runtime_secs: u64,
|
||||||
|
description: &'static str,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn workloads() -> Vec<Workload> {
|
||||||
|
vec![
|
||||||
|
Workload {
|
||||||
|
name: "throughput",
|
||||||
|
template: WORKLOAD_THROUGHPUT,
|
||||||
|
runtime_secs: 300,
|
||||||
|
description: "16K seq write, no fsync, 4 jobs — bulk segment write upper bound",
|
||||||
|
},
|
||||||
|
Workload {
|
||||||
|
name: "fsync_hot_path",
|
||||||
|
template: WORKLOAD_FSYNC_HOT_PATH,
|
||||||
|
runtime_secs: 600,
|
||||||
|
description: "16K seq write, fdatasync/op, 4 jobs — Raft acks=all commit path",
|
||||||
|
},
|
||||||
|
Workload {
|
||||||
|
name: "selftest_512k",
|
||||||
|
template: WORKLOAD_SELFTEST_512K,
|
||||||
|
runtime_secs: 120,
|
||||||
|
description: "512K seq write, fdatasync, 1 job — rpk self-test 512K phase",
|
||||||
|
},
|
||||||
|
Workload {
|
||||||
|
name: "selftest_4k_qd1",
|
||||||
|
template: WORKLOAD_SELFTEST_4K_QD1,
|
||||||
|
runtime_secs: 120,
|
||||||
|
description: "4K seq write, fdatasync, iodepth=1 — worst-case single-stream commit",
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
|
||||||
|
fn total_runtime_secs(workloads: &[Workload]) -> u64 {
|
||||||
|
workloads
|
||||||
|
.iter()
|
||||||
|
.map(|w| BARRIER_SECS + w.runtime_secs)
|
||||||
|
.sum()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn fmt_duration(secs: u64) -> String {
|
||||||
|
if secs >= 3600 {
|
||||||
|
format!("{}h{:02}m", secs / 3600, (secs % 3600) / 60)
|
||||||
|
} else if secs >= 60 {
|
||||||
|
format!("{}m{:02}s", secs / 60, secs % 60)
|
||||||
|
} else {
|
||||||
|
format!("{}s", secs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Args, Debug, Clone)]
|
||||||
|
pub struct RedpandaArgs {
|
||||||
|
#[arg(long, default_value = "both")]
|
||||||
|
pub mode: String,
|
||||||
|
#[arg(long, default_value = "ceph-block")]
|
||||||
|
pub storage_class: String,
|
||||||
|
#[arg(long, default_value = "50Gi")]
|
||||||
|
pub pvc_size: String,
|
||||||
|
#[arg(long, default_value_t = 3)]
|
||||||
|
pub replicas: i32,
|
||||||
|
#[arg(long, default_value = "default")]
|
||||||
|
pub namespace: String,
|
||||||
|
#[arg(long)]
|
||||||
|
pub output_dir: Option<String>,
|
||||||
|
#[arg(long)]
|
||||||
|
pub keep_deployment: bool,
|
||||||
|
#[arg(long)]
|
||||||
|
pub deploy_only: bool,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn deploy(args: &RedpandaArgs) -> io::Result<()> {
|
||||||
|
println!(
|
||||||
|
"[deploy] Creating StatefulSet '{}' in namespace '{}' ({} replicas, {} PVCs on {})",
|
||||||
|
NAME, args.namespace, args.replicas, args.pvc_size, args.storage_class
|
||||||
|
);
|
||||||
|
let ss = k8s::redpanda_statefulset(
|
||||||
|
NAME,
|
||||||
|
&args.namespace,
|
||||||
|
args.replicas,
|
||||||
|
&args.storage_class,
|
||||||
|
&args.pvc_size,
|
||||||
|
IMAGE,
|
||||||
|
);
|
||||||
|
let yaml = serde_yaml::to_string(&ss)
|
||||||
|
.map_err(|e| io::Error::other(format!("YAML serialization failed: {}", e)))?;
|
||||||
|
k8s::apply_yaml(&yaml)?;
|
||||||
|
println!(
|
||||||
|
"[deploy] StatefulSet applied, waiting for {} pods to be ready (timeout 300s)...",
|
||||||
|
args.replicas
|
||||||
|
);
|
||||||
|
k8s::wait_for_pods(&args.namespace, NAME)?;
|
||||||
|
let pods = k8s::get_pod_names(&args.namespace, NAME)?;
|
||||||
|
println!("[deploy] All pods ready: {}", pods.join(", "));
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn undeploy(args: &RedpandaArgs) -> io::Result<()> {
|
||||||
|
println!(
|
||||||
|
"[cleanup] Deleting StatefulSet and PVCs with label app={}...",
|
||||||
|
NAME
|
||||||
|
);
|
||||||
|
k8s::delete_resources(&args.namespace, NAME)?;
|
||||||
|
println!("[cleanup] Delete issued (async). Resources will be fully removed shortly.");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(args: &RedpandaArgs) -> io::Result<()> {
|
||||||
|
if args.deploy_only {
|
||||||
|
return deploy(args);
|
||||||
|
}
|
||||||
|
|
||||||
|
let output_dir = args.output_dir.clone().unwrap_or_else(|| {
|
||||||
|
format!(
|
||||||
|
"./iobench-redpanda-{}",
|
||||||
|
Local::now().format("%Y-%m-%d-%H%M%S")
|
||||||
|
)
|
||||||
|
});
|
||||||
|
fs::create_dir_all(&output_dir)?;
|
||||||
|
|
||||||
|
let pod_names = match k8s::get_pod_names(&args.namespace, NAME) {
|
||||||
|
Ok(pods) if pods.len() >= args.replicas as usize => {
|
||||||
|
println!("[setup] Found existing pods: {}", pods.join(", "));
|
||||||
|
pods
|
||||||
|
}
|
||||||
|
_ => {
|
||||||
|
deploy(args)?;
|
||||||
|
k8s::get_pod_names(&args.namespace, NAME)?
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
if pod_names.len() < args.replicas as usize {
|
||||||
|
return Err(io::Error::other(format!(
|
||||||
|
"Expected {} pods, found {}",
|
||||||
|
args.replicas,
|
||||||
|
pod_names.len()
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
|
||||||
|
let modes: Vec<&str> = match args.mode.as_str() {
|
||||||
|
"sequential" => vec!["sequential"],
|
||||||
|
"parallel" => vec!["parallel"],
|
||||||
|
"both" => vec!["sequential", "parallel"],
|
||||||
|
other => {
|
||||||
|
return Err(io::Error::other(format!(
|
||||||
|
"Unknown mode: {}. Use sequential, parallel, or both.",
|
||||||
|
other
|
||||||
|
)));
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let wl = workloads();
|
||||||
|
let per_mode_secs = total_runtime_secs(&wl);
|
||||||
|
let total_secs = per_mode_secs * modes.len() as u64;
|
||||||
|
|
||||||
|
println!();
|
||||||
|
println!("============================================================");
|
||||||
|
println!(" Redpanda I/O characterization profile");
|
||||||
|
println!(
|
||||||
|
" Modes: {:?} | Workloads: {} | Pods: {}",
|
||||||
|
modes,
|
||||||
|
wl.len(),
|
||||||
|
pod_names.len()
|
||||||
|
);
|
||||||
|
println!(" Estimated total runtime: {}", fmt_duration(total_secs));
|
||||||
|
println!(" Output: {}", output_dir);
|
||||||
|
println!("============================================================");
|
||||||
|
|
||||||
|
let run_start = Instant::now();
|
||||||
|
let mut all_results: Vec<RedpandaCsvRow> = Vec::new();
|
||||||
|
|
||||||
|
for (mode_idx, mode) in modes.iter().enumerate() {
|
||||||
|
println!();
|
||||||
|
if *mode == "parallel" {
|
||||||
|
println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||||
|
println!(" WARNING: Parallel mode — intensive fdatasync across all");
|
||||||
|
println!(" nodes simultaneously. Will impact other Ceph pool users.");
|
||||||
|
println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||||
|
}
|
||||||
|
println!(
|
||||||
|
"=== Mode: {} ({}/{}) — {} per-mode est. ===",
|
||||||
|
mode,
|
||||||
|
mode_idx + 1,
|
||||||
|
modes.len(),
|
||||||
|
fmt_duration(per_mode_secs)
|
||||||
|
);
|
||||||
|
|
||||||
|
for (wl_idx, workload) in wl.iter().enumerate() {
|
||||||
|
let remaining_in_mode: u64 = wl[wl_idx..]
|
||||||
|
.iter()
|
||||||
|
.map(|w| BARRIER_SECS + w.runtime_secs)
|
||||||
|
.sum();
|
||||||
|
|
||||||
|
println!();
|
||||||
|
println!(
|
||||||
|
"--- [{}/{}] {} ({}s barrier + {}s fio = {}) ---",
|
||||||
|
wl_idx + 1,
|
||||||
|
wl.len(),
|
||||||
|
workload.name,
|
||||||
|
BARRIER_SECS,
|
||||||
|
workload.runtime_secs,
|
||||||
|
fmt_duration(BARRIER_SECS + workload.runtime_secs)
|
||||||
|
);
|
||||||
|
println!(" {}", workload.description);
|
||||||
|
println!(
|
||||||
|
" Remaining in this mode: {} | Total elapsed: {}",
|
||||||
|
fmt_duration(remaining_in_mode),
|
||||||
|
fmt_duration(run_start.elapsed().as_secs())
|
||||||
|
);
|
||||||
|
|
||||||
|
let mode_dir = format!("{}/{}", output_dir, mode);
|
||||||
|
fs::create_dir_all(&mode_dir)?;
|
||||||
|
|
||||||
|
let start_at = now_epoch() + BARRIER_SECS;
|
||||||
|
|
||||||
|
let results = if *mode == "sequential" {
|
||||||
|
let pod = &pod_names[0];
|
||||||
|
println!(" Running on: {}", pod);
|
||||||
|
let result = run_workload_on_pod(
|
||||||
|
&args.namespace,
|
||||||
|
pod,
|
||||||
|
workload.name,
|
||||||
|
workload.template,
|
||||||
|
workload.runtime_secs,
|
||||||
|
start_at,
|
||||||
|
&mode_dir,
|
||||||
|
)?;
|
||||||
|
vec![(pod.clone(), result)]
|
||||||
|
} else {
|
||||||
|
println!(
|
||||||
|
" Running on {} pods: {}",
|
||||||
|
pod_names.len(),
|
||||||
|
pod_names.join(", ")
|
||||||
|
);
|
||||||
|
let mut handles = Vec::new();
|
||||||
|
for pod in &pod_names {
|
||||||
|
let ns = args.namespace.clone();
|
||||||
|
let pod_for_thread = pod.clone();
|
||||||
|
let wl_name = workload.name.to_string();
|
||||||
|
let template = workload.template.to_string();
|
||||||
|
let dir = mode_dir.clone();
|
||||||
|
let rt = workload.runtime_secs;
|
||||||
|
let handle = thread::spawn(move || {
|
||||||
|
run_workload_on_pod(
|
||||||
|
&ns,
|
||||||
|
&pod_for_thread,
|
||||||
|
&wl_name,
|
||||||
|
&template,
|
||||||
|
rt,
|
||||||
|
start_at,
|
||||||
|
&dir,
|
||||||
|
)
|
||||||
|
});
|
||||||
|
handles.push((pod.clone(), handle));
|
||||||
|
}
|
||||||
|
let mut collected = Vec::new();
|
||||||
|
for (pod, handle) in handles {
|
||||||
|
match handle.join() {
|
||||||
|
Ok(Ok(result)) => collected.push((pod, result)),
|
||||||
|
Ok(Err(e)) => {
|
||||||
|
eprintln!(" ERROR on pod {}: {}", pod, e);
|
||||||
|
}
|
||||||
|
Err(_) => {
|
||||||
|
eprintln!(" PANIC on pod {}", pod);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
collected
|
||||||
|
};
|
||||||
|
|
||||||
|
println!(
|
||||||
|
" {} complete ({} elapsed total)",
|
||||||
|
workload.name,
|
||||||
|
fmt_duration(run_start.elapsed().as_secs())
|
||||||
|
);
|
||||||
|
|
||||||
|
for (pod, raw_json) in &results {
|
||||||
|
let pod_slug = pod.replace(NAME, "node");
|
||||||
|
let json_path = format!("{}/{}_{}.json", mode_dir, workload.name, pod_slug);
|
||||||
|
fs::write(&json_path, raw_json)?;
|
||||||
|
|
||||||
|
let result = parse_fio_output(raw_json, workload.name, "write").map_err(|e| {
|
||||||
|
io::Error::other(format!("{} on {}: {}", workload.name, pod, e))
|
||||||
|
})?;
|
||||||
|
|
||||||
|
let sync = parse_sync_latencies(raw_json, workload.name);
|
||||||
|
|
||||||
|
// Print per-pod summary inline
|
||||||
|
let bw_mib = result.bandwidth_kibps / 1024.0;
|
||||||
|
if sync.total_ios > 0 {
|
||||||
|
println!(
|
||||||
|
" {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms | fsync_p99.9={:.2}ms",
|
||||||
|
pod_slug, result.iops, bw_mib, result.clat_p99_ms, sync.p99_9_ms
|
||||||
|
);
|
||||||
|
} else {
|
||||||
|
println!(
|
||||||
|
" {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms",
|
||||||
|
pod_slug, result.iops, bw_mib, result.clat_p99_ms
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
all_results.push(RedpandaCsvRow {
|
||||||
|
label: "redpanda".to_string(),
|
||||||
|
mode: mode.to_string(),
|
||||||
|
workload: workload.name.to_string(),
|
||||||
|
node: pod.clone(),
|
||||||
|
test_name: result.test_name.clone(),
|
||||||
|
iops: result.iops,
|
||||||
|
bandwidth_kibps: result.bandwidth_kibps,
|
||||||
|
latency_mean_ms: result.latency_mean_ms,
|
||||||
|
latency_stddev_ms: result.latency_stddev_ms,
|
||||||
|
clat_p50_ms: result.clat_p50_ms,
|
||||||
|
clat_p95_ms: result.clat_p95_ms,
|
||||||
|
clat_p99_ms: result.clat_p99_ms,
|
||||||
|
clat_p99_9_ms: result.clat_p99_9_ms,
|
||||||
|
clat_p99_99_ms: result.clat_p99_99_ms,
|
||||||
|
clat_max_ms: result.clat_max_ms,
|
||||||
|
fdatasync_total_ios: sync.total_ios,
|
||||||
|
fdatasync_mean_ms: sync.mean_ms,
|
||||||
|
fdatasync_p50_ms: sync.p50_ms,
|
||||||
|
fdatasync_p95_ms: sync.p95_ms,
|
||||||
|
fdatasync_p99_ms: sync.p99_ms,
|
||||||
|
fdatasync_p99_9_ms: sync.p99_9_ms,
|
||||||
|
fdatasync_p99_99_ms: sync.p99_99_ms,
|
||||||
|
fdatasync_max_ms: sync.max_ms,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!();
|
||||||
|
println!("============================================================");
|
||||||
|
println!(
|
||||||
|
" All workloads complete. Total runtime: {}",
|
||||||
|
fmt_duration(run_start.elapsed().as_secs())
|
||||||
|
);
|
||||||
|
println!("============================================================");
|
||||||
|
|
||||||
|
let csv_path = format!("{}/redpanda_summary.csv", output_dir);
|
||||||
|
let mut wtr = csv::Writer::from_path(&csv_path)?;
|
||||||
|
for row in &all_results {
|
||||||
|
wtr.serialize(row)?;
|
||||||
|
}
|
||||||
|
wtr.flush()?;
|
||||||
|
println!("[output] Redpanda summary: {}", csv_path);
|
||||||
|
|
||||||
|
let dash_path = format!("{}/iobench.csv", output_dir);
|
||||||
|
let mut wtr2 = csv::Writer::from_path(&dash_path)?;
|
||||||
|
for row in &all_results {
|
||||||
|
wtr2.serialize(DashRow::from(row))?;
|
||||||
|
}
|
||||||
|
wtr2.flush()?;
|
||||||
|
println!("[output] Dashboard CSV: {}", dash_path);
|
||||||
|
|
||||||
|
if let Some(worst) = all_results
|
||||||
|
.iter()
|
||||||
|
.filter(|r| r.mode == "parallel" && r.workload == "fsync_hot_path")
|
||||||
|
.max_by(|a, b| {
|
||||||
|
a.fdatasync_p99_9_ms
|
||||||
|
.partial_cmp(&b.fdatasync_p99_9_ms)
|
||||||
|
.unwrap_or(std::cmp::Ordering::Equal)
|
||||||
|
})
|
||||||
|
{
|
||||||
|
println!();
|
||||||
|
println!("=== HEADLINE ===");
|
||||||
|
println!(
|
||||||
|
"Worst p99.9 fdatasync latency (parallel fsync_hot_path): {:.3} ms on {}",
|
||||||
|
worst.fdatasync_p99_9_ms, worst.node
|
||||||
|
);
|
||||||
|
if worst.fdatasync_p99_9_ms > 100.0 {
|
||||||
|
println!("RESULT: FAIL (>100 ms). Raft heartbeats may not survive load spikes.");
|
||||||
|
} else {
|
||||||
|
println!("RESULT: PASS (<=100 ms).");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if !args.keep_deployment {
|
||||||
|
println!();
|
||||||
|
undeploy(args)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn now_epoch() -> u64 {
|
||||||
|
SystemTime::now()
|
||||||
|
.duration_since(UNIX_EPOCH)
|
||||||
|
.unwrap()
|
||||||
|
.as_secs()
|
||||||
|
}
|
||||||
|
|
||||||
|
fn run_workload_on_pod(
|
||||||
|
namespace: &str,
|
||||||
|
pod: &str,
|
||||||
|
workload_name: &str,
|
||||||
|
job_template: &str,
|
||||||
|
runtime_secs: u64,
|
||||||
|
start_at: u64,
|
||||||
|
local_mode_dir: &str,
|
||||||
|
) -> io::Result<String> {
|
||||||
|
let pod_slug = pod.replace(NAME, "node");
|
||||||
|
let remote_job_path = format!("/data/results/{}.fio", workload_name);
|
||||||
|
let remote_out_path = format!("/data/results/{}.json", workload_name);
|
||||||
|
let remote_results_dir = "/data/results";
|
||||||
|
|
||||||
|
let write_job_cmd = format!(
|
||||||
|
"mkdir -p {} && cat > {} <<'EOF'\n{}\nEOF",
|
||||||
|
remote_results_dir, remote_job_path, job_template
|
||||||
|
);
|
||||||
|
k8s::exec(namespace, pod, &write_job_cmd)?;
|
||||||
|
|
||||||
|
let fio_cmd = format!(
|
||||||
|
"NOW=$(date +%s); if [ $NOW -lt {start_at} ]; then sleep $(({start_at} - NOW)); fi; fio {remote_job_path} --output={remote_out_path} --output-format=json --write_lat_log={remote_results_dir}/{workload_name}_lat --write_iops_log={remote_results_dir}/{workload_name}_iops",
|
||||||
|
start_at = start_at,
|
||||||
|
remote_job_path = remote_job_path,
|
||||||
|
remote_out_path = remote_out_path,
|
||||||
|
remote_results_dir = remote_results_dir,
|
||||||
|
workload_name = workload_name,
|
||||||
|
);
|
||||||
|
|
||||||
|
// Progress ticker — prints every 10s until fio exec returns
|
||||||
|
let done = Arc::new(AtomicBool::new(false));
|
||||||
|
let done_clone = done.clone();
|
||||||
|
let ticker_label = pod_slug.clone();
|
||||||
|
let wl = workload_name.to_string();
|
||||||
|
let total_expected = BARRIER_SECS + runtime_secs;
|
||||||
|
let progress_handle = thread::spawn(move || {
|
||||||
|
let start = Instant::now();
|
||||||
|
loop {
|
||||||
|
thread::sleep(Duration::from_secs(10));
|
||||||
|
if done_clone.load(Ordering::Acquire) {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
let elapsed = start.elapsed().as_secs();
|
||||||
|
let phase = if elapsed < BARRIER_SECS {
|
||||||
|
format!("barrier sync {}s/{}s", elapsed, BARRIER_SECS)
|
||||||
|
} else {
|
||||||
|
let fio_elapsed = elapsed - BARRIER_SECS;
|
||||||
|
format!("fio running {}s/{}s", fio_elapsed, runtime_secs)
|
||||||
|
};
|
||||||
|
let remaining = total_expected.saturating_sub(elapsed);
|
||||||
|
eprintln!(
|
||||||
|
" [{}:{}] {} ({} remaining)",
|
||||||
|
ticker_label,
|
||||||
|
wl,
|
||||||
|
phase,
|
||||||
|
fmt_duration(remaining)
|
||||||
|
);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
let output = k8s::exec(namespace, pod, &fio_cmd);
|
||||||
|
|
||||||
|
done.store(true, Ordering::Release);
|
||||||
|
let _ = progress_handle.join();
|
||||||
|
|
||||||
|
let output = output?;
|
||||||
|
|
||||||
|
let local_pod_dir = format!("{}/{}", local_mode_dir, pod_slug);
|
||||||
|
fs::create_dir_all(&local_pod_dir)?;
|
||||||
|
eprintln!(" [{}] Copying results from pod...", pod_slug);
|
||||||
|
if let Err(e) = k8s::cp_from(namespace, pod, remote_results_dir, &local_pod_dir) {
|
||||||
|
eprintln!(" [{}] Warning: failed to copy results: {}", pod_slug, e);
|
||||||
|
}
|
||||||
|
|
||||||
|
let local_json = format!("{}/{}.json", local_pod_dir, workload_name);
|
||||||
|
if Path::new(&local_json).exists() {
|
||||||
|
fs::read_to_string(&local_json)
|
||||||
|
} else {
|
||||||
|
Ok(output)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, Clone, Default)]
|
||||||
|
struct SyncLatencies {
|
||||||
|
total_ios: u64,
|
||||||
|
mean_ms: f64,
|
||||||
|
p50_ms: f64,
|
||||||
|
p95_ms: f64,
|
||||||
|
p99_ms: f64,
|
||||||
|
p99_9_ms: f64,
|
||||||
|
p99_99_ms: f64,
|
||||||
|
max_ms: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
fn parse_sync_latencies(json: &str, workload_name: &str) -> SyncLatencies {
|
||||||
|
let fio_data: FioOutput = match serde_json::from_str(json) {
|
||||||
|
Ok(d) => d,
|
||||||
|
Err(_) => return SyncLatencies::default(),
|
||||||
|
};
|
||||||
|
let job = match fio_data.jobs.iter().find(|j| j.jobname == workload_name) {
|
||||||
|
Some(j) => j,
|
||||||
|
None => return SyncLatencies::default(),
|
||||||
|
};
|
||||||
|
let sync = match &job.sync {
|
||||||
|
Some(s) => s,
|
||||||
|
None => return SyncLatencies::default(),
|
||||||
|
};
|
||||||
|
|
||||||
|
let p = sync.lat_ns.percentile.as_ref();
|
||||||
|
SyncLatencies {
|
||||||
|
total_ios: sync.total_ios,
|
||||||
|
mean_ms: sync.lat_ns.mean / 1_000_000.0,
|
||||||
|
p50_ms: get_percentile(p, "50.000000"),
|
||||||
|
p95_ms: get_percentile(p, "95.000000"),
|
||||||
|
p99_ms: get_percentile(p, "99.000000"),
|
||||||
|
p99_9_ms: get_percentile(p, "99.900000"),
|
||||||
|
p99_99_ms: get_percentile(p, "99.990000"),
|
||||||
|
max_ms: get_percentile(p, "100.000000"),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn get_percentile(p: Option<&std::collections::HashMap<String, f64>>, key: &str) -> f64 {
|
||||||
|
p.and_then(|m| m.get(key).copied()).unwrap_or(0.0) / 1_000_000.0
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Serialize)]
|
||||||
|
struct RedpandaCsvRow {
|
||||||
|
label: String,
|
||||||
|
mode: String,
|
||||||
|
workload: String,
|
||||||
|
node: String,
|
||||||
|
test_name: String,
|
||||||
|
iops: f64,
|
||||||
|
bandwidth_kibps: f64,
|
||||||
|
latency_mean_ms: f64,
|
||||||
|
latency_stddev_ms: f64,
|
||||||
|
clat_p50_ms: f64,
|
||||||
|
clat_p95_ms: f64,
|
||||||
|
clat_p99_ms: f64,
|
||||||
|
clat_p99_9_ms: f64,
|
||||||
|
clat_p99_99_ms: f64,
|
||||||
|
clat_max_ms: f64,
|
||||||
|
fdatasync_total_ios: u64,
|
||||||
|
fdatasync_mean_ms: f64,
|
||||||
|
fdatasync_p50_ms: f64,
|
||||||
|
fdatasync_p95_ms: f64,
|
||||||
|
fdatasync_p99_ms: f64,
|
||||||
|
fdatasync_p99_9_ms: f64,
|
||||||
|
fdatasync_p99_99_ms: f64,
|
||||||
|
fdatasync_max_ms: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Debug, serde::Serialize)]
|
||||||
|
struct DashRow {
|
||||||
|
label: String,
|
||||||
|
test_name: String,
|
||||||
|
iops: f64,
|
||||||
|
bandwidth_kibps: f64,
|
||||||
|
latency_mean_ms: f64,
|
||||||
|
latency_stddev_ms: f64,
|
||||||
|
fdatasync_p99_9_ms: f64,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl From<&RedpandaCsvRow> for DashRow {
|
||||||
|
fn from(row: &RedpandaCsvRow) -> Self {
|
||||||
|
DashRow {
|
||||||
|
label: format!("{}-{}-{}", row.label, row.mode, row.workload),
|
||||||
|
test_name: row.test_name.clone(),
|
||||||
|
iops: row.iops,
|
||||||
|
bandwidth_kibps: row.bandwidth_kibps,
|
||||||
|
latency_mean_ms: row.latency_mean_ms,
|
||||||
|
latency_stddev_ms: row.latency_stddev_ms,
|
||||||
|
fdatasync_p99_9_ms: row.fdatasync_p99_9_ms,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
// -- fmt_duration --
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fmt_duration_seconds_only() {
|
||||||
|
assert_eq!(fmt_duration(0), "0s");
|
||||||
|
assert_eq!(fmt_duration(1), "1s");
|
||||||
|
assert_eq!(fmt_duration(59), "59s");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fmt_duration_minutes_and_seconds() {
|
||||||
|
assert_eq!(fmt_duration(60), "1m00s");
|
||||||
|
assert_eq!(fmt_duration(61), "1m01s");
|
||||||
|
assert_eq!(fmt_duration(125), "2m05s");
|
||||||
|
assert_eq!(fmt_duration(3599), "59m59s");
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn fmt_duration_hours() {
|
||||||
|
assert_eq!(fmt_duration(3600), "1h00m");
|
||||||
|
assert_eq!(fmt_duration(3660), "1h01m");
|
||||||
|
assert_eq!(fmt_duration(7200), "2h00m");
|
||||||
|
assert_eq!(fmt_duration(5432), "1h30m");
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- total_runtime_secs --
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn total_runtime_secs_matches_workload_sum() {
|
||||||
|
let wl = workloads();
|
||||||
|
let expected: u64 = wl.iter().map(|w| BARRIER_SECS + w.runtime_secs).sum();
|
||||||
|
assert_eq!(total_runtime_secs(&wl), expected);
|
||||||
|
// 4 workloads: (60+300) + (60+600) + (60+120) + (60+120) = 1380
|
||||||
|
assert_eq!(expected, 1380);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- Workload runtime consistency --
|
||||||
|
// Verify that each Workload.runtime_secs matches the runtime= value
|
||||||
|
// in its fio template. Prevents the two sources of truth from diverging.
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn workload_runtime_matches_fio_template() {
|
||||||
|
for w in workloads() {
|
||||||
|
let runtime_from_template = w
|
||||||
|
.template
|
||||||
|
.lines()
|
||||||
|
.find_map(|line| {
|
||||||
|
let trimmed = line.trim();
|
||||||
|
trimmed
|
||||||
|
.strip_prefix("runtime=")
|
||||||
|
.and_then(|v| v.parse::<u64>().ok())
|
||||||
|
})
|
||||||
|
.unwrap_or_else(|| {
|
||||||
|
panic!("Could not find runtime= in fio template for '{}'", w.name)
|
||||||
|
});
|
||||||
|
assert_eq!(
|
||||||
|
w.runtime_secs, runtime_from_template,
|
||||||
|
"Workload '{}': struct says {}s but template says runtime={}",
|
||||||
|
w.name, w.runtime_secs, runtime_from_template
|
||||||
|
);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- parse_sync_latencies --
|
||||||
|
|
||||||
|
fn sample_fio_json_with_sync() -> String {
|
||||||
|
serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "fsync_hot_path",
|
||||||
|
"read": {
|
||||||
|
"bw": 0.0, "iops": 0.0,
|
||||||
|
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
|
||||||
|
},
|
||||||
|
"write": {
|
||||||
|
"bw": 51200.0, "iops": 3200.0,
|
||||||
|
"clat_ns": {
|
||||||
|
"mean": 5_000_000.0, "stddev": 1_000_000.0,
|
||||||
|
"percentile": {
|
||||||
|
"50.000000": 4_000_000.0,
|
||||||
|
"95.000000": 8_000_000.0,
|
||||||
|
"99.000000": 12_000_000.0,
|
||||||
|
"99.900000": 20_000_000.0,
|
||||||
|
"99.990000": 30_000_000.0,
|
||||||
|
"100.000000": 50_000_000.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"sync": {
|
||||||
|
"total_ios": 50000_u64,
|
||||||
|
"lat_ns": {
|
||||||
|
"mean": 2_000_000.0, "stddev": 500_000.0,
|
||||||
|
"percentile": {
|
||||||
|
"50.000000": 1_500_000.0,
|
||||||
|
"95.000000": 3_000_000.0,
|
||||||
|
"99.000000": 5_000_000.0,
|
||||||
|
"99.900000": 10_000_000.0,
|
||||||
|
"99.990000": 20_000_000.0,
|
||||||
|
"100.000000": 50_000_000.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_sync_latencies_extracts_correctly() {
|
||||||
|
let json = sample_fio_json_with_sync();
|
||||||
|
let sync = parse_sync_latencies(&json, "fsync_hot_path");
|
||||||
|
|
||||||
|
assert_eq!(sync.total_ios, 50000);
|
||||||
|
assert!((sync.mean_ms - 2.0).abs() < 0.001);
|
||||||
|
assert!((sync.p50_ms - 1.5).abs() < 0.001);
|
||||||
|
assert!((sync.p95_ms - 3.0).abs() < 0.001);
|
||||||
|
assert!((sync.p99_ms - 5.0).abs() < 0.001);
|
||||||
|
assert!((sync.p99_9_ms - 10.0).abs() < 0.001);
|
||||||
|
assert!((sync.p99_99_ms - 20.0).abs() < 0.001);
|
||||||
|
assert!((sync.max_ms - 50.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_sync_latencies_wrong_job_returns_default() {
|
||||||
|
let json = sample_fio_json_with_sync();
|
||||||
|
let sync = parse_sync_latencies(&json, "nonexistent");
|
||||||
|
assert_eq!(sync.total_ios, 0);
|
||||||
|
assert_eq!(sync.mean_ms, 0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_sync_latencies_invalid_json_returns_default() {
|
||||||
|
let sync = parse_sync_latencies("not json", "fsync_hot_path");
|
||||||
|
assert_eq!(sync.total_ios, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn sample_fio_json_no_sync() -> String {
|
||||||
|
serde_json::json!({
|
||||||
|
"jobs": [{
|
||||||
|
"jobname": "throughput",
|
||||||
|
"read": {
|
||||||
|
"bw": 0.0, "iops": 0.0,
|
||||||
|
"clat_ns": { "mean": 0.0, "stddev": 0.0 }
|
||||||
|
},
|
||||||
|
"write": {
|
||||||
|
"bw": 51200.0, "iops": 3200.0,
|
||||||
|
"clat_ns": {
|
||||||
|
"mean": 5_000_000.0, "stddev": 1_000_000.0,
|
||||||
|
"percentile": {
|
||||||
|
"50.000000": 4_000_000.0,
|
||||||
|
"95.000000": 8_000_000.0,
|
||||||
|
"99.000000": 12_000_000.0,
|
||||||
|
"99.900000": 20_000_000.0,
|
||||||
|
"99.990000": 30_000_000.0,
|
||||||
|
"100.000000": 50_000_000.0
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}]
|
||||||
|
})
|
||||||
|
.to_string()
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_sync_latencies_no_sync_field_returns_default() {
|
||||||
|
let json = sample_fio_json_no_sync();
|
||||||
|
let sync = parse_sync_latencies(&json, "throughput");
|
||||||
|
assert_eq!(sync.total_ios, 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- get_percentile --
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn get_percentile_extracts_and_converts_ns_to_ms() {
|
||||||
|
let mut m = HashMap::new();
|
||||||
|
m.insert("99.000000".to_string(), 12_000_000.0);
|
||||||
|
let result = get_percentile(Some(&m), "99.000000");
|
||||||
|
assert!((result - 12.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn get_percentile_missing_key_returns_zero() {
|
||||||
|
let m = HashMap::new();
|
||||||
|
assert_eq!(get_percentile(Some(&m), "99.000000"), 0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn get_percentile_none_map_returns_zero() {
|
||||||
|
assert_eq!(get_percentile(None, "99.000000"), 0.0);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- parse_fio_output integration --
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_with_sync_workload() {
|
||||||
|
let json = sample_fio_json_with_sync();
|
||||||
|
let result = parse_fio_output(&json, "fsync_hot_path", "write").unwrap();
|
||||||
|
|
||||||
|
assert!((result.iops - 3200.0).abs() < 0.001);
|
||||||
|
assert!((result.bandwidth_kibps - 51200.0).abs() < 0.001);
|
||||||
|
assert!((result.latency_mean_ms - 5.0).abs() < 0.001);
|
||||||
|
assert!((result.clat_p50_ms - 4.0).abs() < 0.001);
|
||||||
|
assert!((result.clat_p99_ms - 12.0).abs() < 0.001);
|
||||||
|
assert!((result.clat_p99_9_ms - 20.0).abs() < 0.001);
|
||||||
|
assert!((result.clat_max_ms - 50.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_no_sync_workload() {
|
||||||
|
let json = sample_fio_json_no_sync();
|
||||||
|
let result = parse_fio_output(&json, "throughput", "write").unwrap();
|
||||||
|
|
||||||
|
assert!((result.iops - 3200.0).abs() < 0.001);
|
||||||
|
assert!((result.clat_p99_ms - 12.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn parse_fio_output_wrong_job_name_errors() {
|
||||||
|
let json = sample_fio_json_no_sync();
|
||||||
|
let result = parse_fio_output(&json, "nonexistent", "write");
|
||||||
|
assert!(result.is_err());
|
||||||
|
assert!(result.unwrap_err().contains("nonexistent"));
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- DashRow label formatting --
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn dash_row_label_format() {
|
||||||
|
let row = RedpandaCsvRow {
|
||||||
|
label: "redpanda".to_string(),
|
||||||
|
mode: "parallel".to_string(),
|
||||||
|
workload: "fsync_hot_path".to_string(),
|
||||||
|
node: "iobench-redpanda-0".to_string(),
|
||||||
|
test_name: "fsync_hot_path".to_string(),
|
||||||
|
iops: 1000.0,
|
||||||
|
bandwidth_kibps: 16000.0,
|
||||||
|
latency_mean_ms: 5.0,
|
||||||
|
latency_stddev_ms: 1.0,
|
||||||
|
clat_p50_ms: 4.0,
|
||||||
|
clat_p95_ms: 8.0,
|
||||||
|
clat_p99_ms: 12.0,
|
||||||
|
clat_p99_9_ms: 20.0,
|
||||||
|
clat_p99_99_ms: 30.0,
|
||||||
|
clat_max_ms: 50.0,
|
||||||
|
fdatasync_total_ios: 50000,
|
||||||
|
fdatasync_mean_ms: 2.0,
|
||||||
|
fdatasync_p50_ms: 1.5,
|
||||||
|
fdatasync_p95_ms: 3.0,
|
||||||
|
fdatasync_p99_ms: 5.0,
|
||||||
|
fdatasync_p99_9_ms: 10.0,
|
||||||
|
fdatasync_p99_99_ms: 20.0,
|
||||||
|
fdatasync_max_ms: 50.0,
|
||||||
|
};
|
||||||
|
let dash = DashRow::from(&row);
|
||||||
|
assert_eq!(dash.label, "redpanda-parallel-fsync_hot_path");
|
||||||
|
assert!((dash.fdatasync_p99_9_ms - 10.0).abs() < 0.001);
|
||||||
|
}
|
||||||
|
|
||||||
|
// -- CSV serialization round-trip --
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn redpanda_csv_row_serializes_all_columns() {
|
||||||
|
let row = RedpandaCsvRow {
|
||||||
|
label: "redpanda".to_string(),
|
||||||
|
mode: "sequential".to_string(),
|
||||||
|
workload: "throughput".to_string(),
|
||||||
|
node: "node-0".to_string(),
|
||||||
|
test_name: "throughput".to_string(),
|
||||||
|
iops: 3200.0,
|
||||||
|
bandwidth_kibps: 51200.0,
|
||||||
|
latency_mean_ms: 5.0,
|
||||||
|
latency_stddev_ms: 1.0,
|
||||||
|
clat_p50_ms: 4.0,
|
||||||
|
clat_p95_ms: 8.0,
|
||||||
|
clat_p99_ms: 12.0,
|
||||||
|
clat_p99_9_ms: 20.0,
|
||||||
|
clat_p99_99_ms: 30.0,
|
||||||
|
clat_max_ms: 50.0,
|
||||||
|
fdatasync_total_ios: 0,
|
||||||
|
fdatasync_mean_ms: 0.0,
|
||||||
|
fdatasync_p50_ms: 0.0,
|
||||||
|
fdatasync_p95_ms: 0.0,
|
||||||
|
fdatasync_p99_ms: 0.0,
|
||||||
|
fdatasync_p99_9_ms: 0.0,
|
||||||
|
fdatasync_p99_99_ms: 0.0,
|
||||||
|
fdatasync_max_ms: 0.0,
|
||||||
|
};
|
||||||
|
let mut wtr = csv::Writer::from_writer(Vec::new());
|
||||||
|
wtr.serialize(&row).unwrap();
|
||||||
|
wtr.flush().unwrap();
|
||||||
|
let csv_output = String::from_utf8(wtr.into_inner().unwrap()).unwrap();
|
||||||
|
let lines: Vec<&str> = csv_output.lines().collect();
|
||||||
|
|
||||||
|
// Header line must contain all expected column names
|
||||||
|
let header = lines[0];
|
||||||
|
for col in [
|
||||||
|
"label",
|
||||||
|
"mode",
|
||||||
|
"workload",
|
||||||
|
"node",
|
||||||
|
"test_name",
|
||||||
|
"iops",
|
||||||
|
"bandwidth_kibps",
|
||||||
|
"latency_mean_ms",
|
||||||
|
"latency_stddev_ms",
|
||||||
|
"clat_p50_ms",
|
||||||
|
"clat_p95_ms",
|
||||||
|
"clat_p99_ms",
|
||||||
|
"clat_p99_9_ms",
|
||||||
|
"clat_p99_99_ms",
|
||||||
|
"clat_max_ms",
|
||||||
|
"fdatasync_total_ios",
|
||||||
|
"fdatasync_mean_ms",
|
||||||
|
"fdatasync_p50_ms",
|
||||||
|
"fdatasync_p95_ms",
|
||||||
|
"fdatasync_p99_ms",
|
||||||
|
"fdatasync_p99_9_ms",
|
||||||
|
"fdatasync_p99_99_ms",
|
||||||
|
"fdatasync_max_ms",
|
||||||
|
] {
|
||||||
|
assert!(header.contains(col), "CSV header missing column: {}", col);
|
||||||
|
}
|
||||||
|
|
||||||
|
// Data line should have the right number of fields
|
||||||
|
let data_fields: Vec<&str> = lines[1].split(',').collect();
|
||||||
|
let header_fields: Vec<&str> = header.split(',').collect();
|
||||||
|
assert_eq!(data_fields.len(), header_fields.len());
|
||||||
|
}
|
||||||
|
}
|
||||||
123
iobench/src/simple.rs
Normal file
123
iobench/src/simple.rs
Normal file
@@ -0,0 +1,123 @@
|
|||||||
|
use std::fs;
|
||||||
|
use std::io;
|
||||||
|
use std::thread;
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use chrono::Local;
|
||||||
|
|
||||||
|
use crate::fio::parse_fio_output;
|
||||||
|
use crate::run_command;
|
||||||
|
|
||||||
|
#[derive(Debug, Clone)]
|
||||||
|
pub struct SimpleArgs {
|
||||||
|
pub target: String,
|
||||||
|
pub benchmark_dir: String,
|
||||||
|
pub tests: String,
|
||||||
|
pub duration: u64,
|
||||||
|
pub output_dir: Option<String>,
|
||||||
|
pub size: String,
|
||||||
|
pub block_size: String,
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn run(args: &SimpleArgs) -> io::Result<()> {
|
||||||
|
let output_dir = args
|
||||||
|
.output_dir
|
||||||
|
.clone()
|
||||||
|
.unwrap_or_else(|| format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S")));
|
||||||
|
fs::create_dir_all(&output_dir)?;
|
||||||
|
|
||||||
|
let tests_to_run: Vec<&str> = args.tests.split(',').collect();
|
||||||
|
let mut results = Vec::new();
|
||||||
|
|
||||||
|
for test in tests_to_run {
|
||||||
|
println!("--------------------------------------------------");
|
||||||
|
println!("Running test: {}", test);
|
||||||
|
|
||||||
|
let (rw, numjobs) = match test {
|
||||||
|
"read" => ("read", 1),
|
||||||
|
"write" => ("write", 1),
|
||||||
|
"randread" => ("randread", 1),
|
||||||
|
"randwrite" => ("randwrite", 1),
|
||||||
|
"multiread" => ("read", 4),
|
||||||
|
"multiwrite" => ("write", 4),
|
||||||
|
"multirandread" => ("randread", 4),
|
||||||
|
"multirandwrite" => ("randwrite", 4),
|
||||||
|
_ => {
|
||||||
|
eprintln!("Unknown test: {}. Skipping.", test);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let test_name = format!("{}-{}-sync-test", test, args.block_size);
|
||||||
|
let fio_command = format!(
|
||||||
|
"fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
|
||||||
|
args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
|
||||||
|
);
|
||||||
|
|
||||||
|
println!("Executing command:\n{}\n", fio_command);
|
||||||
|
|
||||||
|
let output = match run_command(&args.target, &fio_command) {
|
||||||
|
Ok(out) => out,
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Failed to execute command for test {}: {}", test, e);
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
|
let result = parse_fio_output(&output, &test_name, rw);
|
||||||
|
match result {
|
||||||
|
Ok(res) => {
|
||||||
|
results.push(res);
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
eprintln!("Error parsing fio output for test {}: {}", test, e);
|
||||||
|
eprintln!("Raw output:\n{}", output);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("{output}");
|
||||||
|
println!("Test {} completed.", test);
|
||||||
|
thread::sleep(Duration::from_secs(2));
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("--------------------------------------------------");
|
||||||
|
println!("Cleaning up test file on target...");
|
||||||
|
let cleanup_command = "rm -f ./iobench_testfile";
|
||||||
|
if let Err(e) = run_command(&args.target, cleanup_command) {
|
||||||
|
eprintln!("Warning: Failed to clean up test file on target: {}", e);
|
||||||
|
} else {
|
||||||
|
println!("Cleanup successful.");
|
||||||
|
}
|
||||||
|
|
||||||
|
if results.is_empty() {
|
||||||
|
println!("\nNo benchmark results to display.");
|
||||||
|
return Ok(());
|
||||||
|
}
|
||||||
|
|
||||||
|
let csv_path = format!("{}/summary.csv", output_dir);
|
||||||
|
let mut wtr = csv::Writer::from_path(&csv_path)?;
|
||||||
|
for result in &results {
|
||||||
|
wtr.serialize(result)?;
|
||||||
|
}
|
||||||
|
wtr.flush()?;
|
||||||
|
|
||||||
|
println!("\nBenchmark summary saved to {}", csv_path);
|
||||||
|
println!("\n--- Benchmark Results Summary ---");
|
||||||
|
println!(
|
||||||
|
"{:<25} {:>10} {:>18} {:>20} {:>22}",
|
||||||
|
"Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)"
|
||||||
|
);
|
||||||
|
println!("{:-<98}", "");
|
||||||
|
for result in results {
|
||||||
|
println!(
|
||||||
|
"{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}",
|
||||||
|
result.test_name,
|
||||||
|
result.iops,
|
||||||
|
result.bandwidth_kibps,
|
||||||
|
result.latency_mean_ms,
|
||||||
|
result.latency_stddev_ms
|
||||||
|
);
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user