feat: iobench redpanda profile to run the fio settings recommended by Redpanda on a k8s storage backend #281
2
.gitignore
vendored
2
.gitignore
vendored
@@ -32,3 +32,5 @@ ignore
|
||||
|
||||
# Generated book
|
||||
book
|
||||
|
||||
__pycache__
|
||||
|
||||
35
Cargo.lock
generated
35
Cargo.lock
generated
@@ -1932,6 +1932,27 @@ dependencies = [
|
||||
"typenum",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv"
|
||||
version = "1.4.0"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "52cd9d68cf7efc6ddfaaee42e7288d3a99d613d4b50f76ce9827ae0c6e14f938"
|
||||
dependencies = [
|
||||
"csv-core",
|
||||
"itoa",
|
||||
"ryu",
|
||||
"serde_core",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "csv-core"
|
||||
version = "0.1.13"
|
||||
source = "registry+https://github.com/rust-lang/crates.io-index"
|
||||
checksum = "704a3c26996a80471189265814dbc2c257598b96b8a7feae2d31ace646bb9782"
|
||||
dependencies = [
|
||||
"memchr",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ctr"
|
||||
version = "0.9.2"
|
||||
@@ -4648,6 +4669,20 @@ dependencies = [
|
||||
"thiserror 1.0.69",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "iobench"
|
||||
version = "1.0.0"
|
||||
dependencies = [
|
||||
"chrono",
|
||||
"clap",
|
||||
"csv",
|
||||
"k8s-openapi",
|
||||
"num_cpus",
|
||||
"serde",
|
||||
"serde_json",
|
||||
"serde_yaml",
|
||||
]
|
||||
|
||||
[[package]]
|
||||
name = "ipnet"
|
||||
version = "2.12.0"
|
||||
|
||||
@@ -28,6 +28,7 @@ members = [
|
||||
"harmony_node_readiness",
|
||||
"harmony-k8s",
|
||||
"harmony_assets", "opnsense-codegen", "opnsense-api",
|
||||
"iobench",
|
||||
]
|
||||
|
||||
[workspace.package]
|
||||
|
||||
@@ -11,7 +11,8 @@ clap = { version = "4.0", features = ["derive"] }
|
||||
chrono = "0.4"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
serde_yaml = { workspace = true }
|
||||
csv = "1.1"
|
||||
num_cpus = "1.13"
|
||||
k8s-openapi = { workspace = true }
|
||||
|
||||
[workspace]
|
||||
|
||||
68
iobench/PREFLIGHT_CHECKS.md
Normal file
68
iobench/PREFLIGHT_CHECKS.md
Normal file
@@ -0,0 +1,68 @@
|
||||
# Redpanda iobench preflight checklist
|
||||
|
||||
Cluster inspection performed: 2026-05-03
|
||||
Context: maintenance window, other workloads turned off, Ceph pool at ~0 IOPS idle.
|
||||
|
||||
---
|
||||
|
||||
## Cluster topology
|
||||
|
||||
- [x] **PASS** — 3 worker nodes available (wk0, wk1, wk2). Matches `--replicas 3` default.
|
||||
- [x] **PASS** — All 3 workers are Ready, no DiskPressure / MemoryPressure / PIDPressure.
|
||||
- [x] **PASS** — Control plane nodes (cp3, cp4, cp5) have `NoSchedule` taint. iobench pods cannot land there.
|
||||
- [x] **PASS** — All nodes carry `kubernetes.io/hostname` label. Anti-affinity topology key will work.
|
||||
- [x] **PASS** — Worker nodes are untainted and have no scheduling restrictions.
|
||||
|
||||
## Storage
|
||||
|
||||
- [x] **PASS** — StorageClass `ceph-block` exists, `volumeBindingMode: Immediate`, `reclaimPolicy: Delete`.
|
||||
- [x] **PASS** — Ceph pool `ceph-blockpool` replication: size=3, min_size=2.
|
||||
- [x] **PASS** — 3 OSDs all Running (osd-0, osd-1, osd-2).
|
||||
- [x] **PASS** — Raw capacity: 503 GiB available. 3x50GiB PVCs = 150 GiB usable = 450 GiB raw. Leaves 53 GiB raw headroom (~10.5%). Tight but sufficient for a benchmark run.
|
||||
- [x] **PASS** — Largest single-workload disk footprint: `throughput` with numjobs=4 x size=10G = 40 GiB per PVC. Fits in 50 GiB PVC with 10 GiB headroom for logs and filesystem overhead.
|
||||
- [x] **PASS** — Pool is idle ("nothing is going on"), OSD commit/apply latency showing 0 ms. Confirms maintenance window baseline.
|
||||
- [x] **PASS** — All 3 disks are actually Intel SSDs (misidentified as HDD by Ceph due to HP RAID controller passthrough). No mixed-media concern.
|
||||
- [x] **PASS (non-blocking)** — Ceph health is `HEALTH_WARN: too many PGs per OSD (265 > max 250)`. This is a pre-existing tuning issue unrelated to the benchmark. Does not affect correctness of results. Note: if this were a data-integrity warning (e.g. `HEALTH_WARN: degraded PGs`), the benchmark must not proceed.
|
||||
|
||||
## Namespace and resource constraints
|
||||
|
||||
- [x] **PASS** — `default` namespace exists.
|
||||
- [x] **PASS** — No ResourceQuota or LimitRange in `default` namespace. Pod creation won't be blocked.
|
||||
- [x] **PASS** — No leftover `iobench-redpanda` StatefulSet or PVCs in `default` namespace. Clean slate.
|
||||
|
||||
## Container image
|
||||
|
||||
- [x] **PASS** — `juicedata/fio:latest` pulls successfully and runs on this cluster.
|
||||
- [x] **PASS** — fio version 3.18 confirmed. Supports `fdatasync=1`, `log_avg_msec`, `write_lat_log`, `write_iops_log`, `--output-format=json`.
|
||||
- [x] **PASS** — Image contains `sh`, `date` (needed for wall-clock barrier), and `tar` (needed for `kubectl cp`).
|
||||
|
||||
## Clock synchronization (barrier correctness)
|
||||
|
||||
- [x] **PASS** — Node heartbeat timestamps are within ~5s of each other. The 60-second barrier (`start_at = now_epoch() + 60`) provides ample margin. Barrier only fails if clock skew exceeds 60s.
|
||||
|
||||
## Code safety review
|
||||
|
||||
- [x] **PASS** — All fio workloads use `direct=1` (bypass page cache). Benchmark measures Ceph, not RAM.
|
||||
- [x] **PASS** — All writes target `/data/iobench_testfile` on the mounted PVC filesystem, not a raw block device. No risk of corrupting the node OS disk.
|
||||
- [x] **PASS** — Pods run with default security context (no privileged, no hostPath, no hostNetwork). Blast radius is limited to the PVCs.
|
||||
- [x] **PASS** — `reclaimPolicy: Delete` means PVCs and their backing RBD images are fully cleaned up when deleted. No storage leak after `undeploy`.
|
||||
- [x] **PASS** — `delete_resources` targets only `statefulset,pvc` with label `app=iobench-redpanda`. Cannot accidentally delete unrelated resources.
|
||||
- [x] **PASS** — Parallel mode prints a clear warning before starting.
|
||||
- [x] **PASS** — `--keep-deployment` flag available for debugging without re-provisioning.
|
||||
- [x] **PASS** — Workloads run sequentially within each mode (throughput, then fsync_hot_path, then selftest_512k, then selftest_4k_qd1). Disk usage doesn't accumulate across workloads since they reuse the same filename.
|
||||
|
||||
## Resource consumption during run
|
||||
|
||||
- [x] **PASS** — Workers have ample CPU headroom (3-6% current usage). fio with 4 jobs + iodepth 16 is not CPU-intensive on modern hardware.
|
||||
- [x] **PASS** — Workers have ample memory headroom (5-10% current usage). fio with `direct=1` uses minimal RAM.
|
||||
- [x] **PASS** — No CPU/memory resource limits or requests set on the fio container. This is intentional — resource limits would throttle the benchmark and distort results. Acceptable during a maintenance window with other workloads off.
|
||||
|
||||
## Risks acknowledged
|
||||
|
||||
- **Ceph pool impact**: Parallel mode will saturate the Ceph pool. This is the point of the test. Confirmed other workloads are off and pool is at ~0 IOPS.
|
||||
- **Capacity headroom is tight**: 53 GiB raw remaining after PVC provisioning (~10.5%). If the cluster has background operations that consume space (e.g., snapshots, compaction), this could trigger a `HEALTH_ERR: full` condition. Mitigated by: maintenance window, no other workloads, and `reclaimPolicy: Delete` ensuring cleanup.
|
||||
- **`--wait=false` on delete**: `delete_resources` uses `--wait=false`, so `undeploy` returns before PVCs are fully reclaimed. This is fine — Ceph handles RBD image deletion asynchronously. But if re-running immediately after undeploy, PVCs from the previous run may still be terminating. Mitigated by: the deploy step uses `kubectl apply` which is idempotent.
|
||||
|
||||
## Verdict
|
||||
|
||||
All checks pass. Safe to proceed with `iobench redpanda --storage-class ceph-block` during the current maintenance window.
|
||||
146
iobench/README.md
Normal file
146
iobench/README.md
Normal file
@@ -0,0 +1,146 @@
|
||||
# iobench
|
||||
|
||||
A command-line I/O benchmarking tool using fio. Runs locally, over SSH, or on Kubernetes pods. Includes a Redpanda storage characterization profile for validating Ceph RBD suitability.
|
||||
|
||||
## Build
|
||||
|
||||
```bash
|
||||
cargo build -p iobench --release
|
||||
```
|
||||
|
||||
## Simple profile (default)
|
||||
|
||||
Runs standard fio benchmarks (sequential/random read/write, single/multi-job) against a local or remote target.
|
||||
|
||||
```bash
|
||||
# Local
|
||||
iobench
|
||||
|
||||
# Over SSH
|
||||
iobench --target ssh/user@host
|
||||
|
||||
# On a Kubernetes pod
|
||||
iobench --target k8s/namespace/pod-name
|
||||
|
||||
# Custom parameters
|
||||
iobench --duration 60 --size 4G --block-size 128k --tests read,write,randwrite
|
||||
```
|
||||
|
||||
Available tests: `read`, `write`, `randread`, `randwrite`, `multiread`, `multiwrite`, `multirandread`, `multirandwrite`.
|
||||
|
||||
Results are saved to `./iobench-{timestamp}/summary.csv`.
|
||||
|
||||
## Redpanda profile
|
||||
|
||||
Characterizes whether a Kubernetes storage backend (e.g. Ceph RBD) can sustain Redpanda's I/O patterns. Deploys a 3-pod StatefulSet with per-node PVCs and runs four fio workloads that model Redpanda's actual storage behavior.
|
||||
|
||||
### Quick start
|
||||
|
||||
```bash
|
||||
# Run the full profile (sequential baselines, then parallel contention test)
|
||||
iobench redpanda --storage-class ceph-block
|
||||
|
||||
# Sequential baselines only (single-node, no cluster impact)
|
||||
iobench redpanda --mode sequential --storage-class ceph-block
|
||||
|
||||
# Parallel contention test only
|
||||
iobench redpanda --mode parallel --storage-class ceph-block
|
||||
|
||||
# Keep pods alive after the run (useful for re-running or debugging)
|
||||
iobench redpanda --storage-class ceph-block --keep-deployment
|
||||
|
||||
# Deploy pods without running any workloads
|
||||
iobench deploy --storage-class ceph-block
|
||||
|
||||
# Remove all iobench pods and PVCs
|
||||
iobench undeploy
|
||||
```
|
||||
|
||||
### Options
|
||||
|
||||
| Flag | Default | Description |
|
||||
|------|---------|-------------|
|
||||
| `--mode` | `both` | `sequential`, `parallel`, or `both` |
|
||||
| `--storage-class` | `ceph-block` | Kubernetes StorageClass for the PVCs |
|
||||
| `--pvc-size` | `50Gi` | Size of each PVC |
|
||||
| `--replicas` | `3` | Number of pods, one per node (should match the number of schedulable worker nodes) |
|
||||
| `--namespace` | `default` | Kubernetes namespace |
|
||||
| `--output-dir` | auto-timestamped | Directory for results |
|
||||
| `--keep-deployment` | `false` | Don't delete pods/PVCs after the run |
|
||||
|
||||
### Workloads
|
||||
|
||||
The profile runs four workloads, each targeting a different aspect of Redpanda's I/O:
|
||||
|
||||
| Workload | What it models | Key params | Runtime |
|
||||
|----------|---------------|------------|---------|
|
||||
| `throughput` | Bulk segment writes (optimistic upper bound) | 16K seq write, no fsync, 4 jobs, iodepth=16 | 5 min |
|
||||
| `fsync_hot_path` | Raft commit path with `acks=all` | 16K seq write, fdatasync per op, 4 jobs, iodepth=4 | 10 min |
|
||||
| `selftest_512k` | `rpk cluster self-test` 512K phase | 512K seq write, fdatasync, 1 job, iodepth=4 | 2 min |
|
||||
| `selftest_4k_qd1` | Worst-case single-stream commit | 4K seq write, fdatasync, 1 job, iodepth=1 | 2 min |
|
||||
|
||||
All workloads use `direct=1` (bypass page cache) and `ioengine=libaio`.
|
||||
|
||||
### Execution modes
|
||||
|
||||
- **`sequential`** -- Runs all four workloads back-to-back on a single node. Clean per-pattern baselines, comparable to Redpanda's published hardware requirements (16,000 IOPS minimum per broker).
|
||||
- **`parallel`** -- Runs the same four workloads one after another, with every node executing each workload at the same time. Each node writes to its own PVC. A wall-clock barrier synchronizes start times across pods so contention windows overlap. This is the production-shape test that exposes the Ceph OSD fan-in pattern.
|
||||
- **`both`** (default) -- Sequential first, then parallel.
|
||||
|
||||
### Interpreting results
|
||||
|
||||
The **headline metric** is the worst p99.9 fdatasync latency on the `fsync_hot_path` workload during parallel execution:
|
||||
|
||||
- **<= 100 ms**: PASS. Storage can sustain Raft heartbeats under load.
|
||||
- **> 100 ms**: FAIL. Raft heartbeats (150 ms interval, 1.5 s election timeout) will not survive load spikes, leading to election storms.
|
||||
|
||||
Reference values for healthy NVMe, as reported by `rpk cluster self-test`:
|
||||
- `selftest_512k`: ~1182 IOPS, ~591 MiB/s
|
||||
- `selftest_4k_qd1`: ~406 IOPS
|
||||
|
||||
### Output
|
||||
|
||||
Results are saved to `./iobench-redpanda-{timestamp}/`:
|
||||
|
||||
```
|
||||
iobench-redpanda-2026-05-03-143022/
|
||||
redpanda_summary.csv # Full results: all metrics, all nodes, all workloads
|
||||
iobench.csv # Dashboard-compatible format
|
||||
sequential/
|
||||
throughput_node-0.json # Raw fio JSON per workload per node
|
||||
fsync_hot_path_node-0.json
|
||||
...
|
||||
node-0/ # Per-node time-series logs
|
||||
throughput_lat.1.log
|
||||
throughput_iops.1.log
|
||||
...
|
||||
parallel/
|
||||
throughput_node-0.json
|
||||
throughput_node-1.json
|
||||
throughput_node-2.json
|
||||
...
|
||||
```
|
||||
|
||||
The CSV includes per-workload per-node: IOPS, bandwidth, completion latency percentiles (p50-p99.99, max), and fdatasync latency percentiles (p50-p99.99, max).
|
||||
|
||||
Per-second time-series logs (`*_lat.*.log`, `*_iops.*.log`) capture the bimodal/spiky behavior of Ceph under load that summary statistics miss.
|
||||
|
||||
### Warning
|
||||
|
||||
Parallel mode generates heavy fdatasync workloads across all nodes simultaneously. This **will impact other workloads on the same Ceph pool**. Run during a maintenance window or off-peak.
|
||||
|
||||
## Dashboard
|
||||
|
||||
A Plotly Dash app for visualizing results. Supports both simple and Redpanda profile data.
|
||||
|
||||
```bash
|
||||
cd iobench/dash
|
||||
virtualenv venv
|
||||
source venv/bin/activate
|
||||
pip install -r requirements_freeze.txt
|
||||
|
||||
# Copy result CSVs into the dash directory, then:
|
||||
python iobench-dash.py
|
||||
```
|
||||
|
||||
The dashboard reads `iobench.csv` and/or `redpanda_summary.csv` from its working directory.
|
||||
@@ -1,3 +1,5 @@
|
||||
import os
|
||||
|
||||
import dash
|
||||
from dash import dcc, html, Input, Output, State, clientside_callback, ClientsideFunction
|
||||
import plotly.express as px
|
||||
@@ -6,38 +8,29 @@ import dash_bootstrap_components as dbc
|
||||
import io
|
||||
|
||||
# --- Data Loading and Preparation ---
|
||||
# csv_data = """label,test_name,iops,bandwidth_kibps,latency_mean_ms,latency_stddev_ms
|
||||
# Ceph HDD Only,read-4k-sync-test,1474.302,5897,0.673,0.591
|
||||
# Ceph HDD Only,write-4k-sync-test,14.126,56,27.074,7.046
|
||||
# Ceph HDD Only,randread-4k-sync-test,225.140,900,4.436,6.918
|
||||
# Ceph HDD Only,randwrite-4k-sync-test,13.129,52,34.891,10.859
|
||||
# Ceph HDD Only,multiread-4k-sync-test,6873.675,27494,0.578,0.764
|
||||
# Ceph HDD Only,multiwrite-4k-sync-test,57.135,228,38.660,11.293
|
||||
# Ceph HDD Only,multirandread-4k-sync-test,2451.376,9805,1.626,2.515
|
||||
# Ceph HDD Only,multirandwrite-4k-sync-test,54.642,218,33.492,13.111
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,read-4k-sync-test,1495.700,5982,0.664,1.701
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,write-4k-sync-test,16.990,67,17.502,9.908
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randread-4k-sync-test,159.256,637,6.274,9.232
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,randwrite-4k-sync-test,16.693,66,24.094,16.099
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiread-4k-sync-test,7305.559,29222,0.544,1.338
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multiwrite-4k-sync-test,52.260,209,34.891,17.576
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandread-4k-sync-test,700.606,2802,5.700,10.429
|
||||
# Ceph 2 Hosts WAL+DB SSD and 1 Host HDD,multirandwrite-4k-sync-test,52.723,210,29.709,25.829
|
||||
# Ceph 2 Hosts WAL+DB SSD Only,randwrite-4k-sync-test,90.037,360,3.617,8.321
|
||||
# Ceph WAL+DB SSD During Rebuild,randwrite-4k-sync-test,41.008,164,10.138,19.333
|
||||
# Ceph WAL+DB SSD OSD HDD,read-4k-sync-test,1520.299,6081,0.654,1.539
|
||||
# Ceph WAL+DB SSD OSD HDD,write-4k-sync-test,78.528,314,4.074,9.101
|
||||
# Ceph WAL+DB SSD OSD HDD,randread-4k-sync-test,153.303,613,6.518,9.036
|
||||
# Ceph WAL+DB SSD OSD HDD,randwrite-4k-sync-test,48.677,194,8.785,20.356
|
||||
# Ceph WAL+DB SSD OSD HDD,multiread-4k-sync-test,6804.880,27219,0.584,1.422
|
||||
# Ceph WAL+DB SSD OSD HDD,multiwrite-4k-sync-test,311.513,1246,4.978,9.458
|
||||
# Ceph WAL+DB SSD OSD HDD,multirandread-4k-sync-test,581.756,2327,6.869,10.204
|
||||
# Ceph WAL+DB SSD OSD HDD,multirandwrite-4k-sync-test,120.556,482,13.463,25.440
|
||||
# """
|
||||
#
|
||||
# df = pd.read_csv(io.StringIO(csv_data))
|
||||
df = pd.read_csv("iobench.csv") # Replace with the actual file path
|
||||
# Gather every results file found in the working directory. Each source is
# appended to `dataframes` and the lot is concatenated into the single `df`
# that the rest of the app reads.
dataframes = []

if os.path.exists("iobench.csv"):
    df_simple = pd.read_csv("iobench.csv")
    dataframes.append(df_simple)

if os.path.exists("redpanda_summary.csv"):
    df_red = pd.read_csv("redpanda_summary.csv")
    # Reshape the Redpanda summary onto the simple-profile columns: the
    # label is "<mode>-<workload>" and the workload doubles as the test name.
    df_red = df_red.assign(
        label=df_red['mode'] + '-' + df_red['workload'],
        test_name=df_red['workload'],
    )
    cols = ['label', 'test_name', 'iops', 'bandwidth_kibps', 'latency_mean_ms', 'latency_stddev_ms', 'fdatasync_p99_9_ms']
    # Keep only the wanted columns that the file actually provides.
    present = [c for c in cols if c in df_red.columns]
    df_red = df_red[present]
    dataframes.append(df_red)

if len(dataframes) == 0:
    raise FileNotFoundError("No iobench.csv or redpanda_summary.csv found in working directory.")

df = pd.concat(dataframes, ignore_index=True)
df['bandwidth_mbps'] = df['bandwidth_kibps'] / 1024

# Rows without an fdatasync figure get 0.0 so the metric can always be
# selected; if no source supplied the column at all, create it.
if 'fdatasync_p99_9_ms' in df.columns:
    df['fdatasync_p99_9_ms'] = df['fdatasync_p99_9_ms'].fillna(0.0)
else:
    df['fdatasync_p99_9_ms'] = 0.0
|
||||
|
||||
# --- App Initialization and Global Settings ---
|
||||
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.FLATLY])
|
||||
@@ -70,7 +63,8 @@ app.layout = dbc.Container([
|
||||
options=[
|
||||
{'label': 'IOPS', 'value': 'iops'},
|
||||
{'label': 'Latency (ms)', 'value': 'latency_mean_ms'},
|
||||
{'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'}
|
||||
{'label': 'Bandwidth (MB/s)', 'value': 'bandwidth_mbps'},
|
||||
{'label': 'fdatasync p99.9 (ms)', 'value': 'fdatasync_p99_9_ms'},
|
||||
],
|
||||
value=['iops', 'latency_mean_ms', 'bandwidth_mbps'], # Default selection
|
||||
labelClassName="d-block"
|
||||
@@ -185,11 +179,12 @@ def update_graphs(selected_metrics, selected_configs, selected_tests):
|
||||
metric_titles = {
|
||||
'iops': 'IOPS Comparison (Higher is Better)',
|
||||
'latency_mean_ms': 'Mean Latency (ms) Comparison (Lower is Better)',
|
||||
'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)'
|
||||
'bandwidth_mbps': 'Bandwidth (MB/s) Comparison (Higher is Better)',
|
||||
'fdatasync_p99_9_ms': 'fdatasync p99.9 Latency (ms) Comparison (Lower is Better)',
|
||||
}
|
||||
|
||||
for metric in selected_metrics:
|
||||
sort_order = 'total ascending' if metric == 'latency_mean_ms' else 'total descending'
|
||||
sort_order = 'total ascending' if metric in ('latency_mean_ms', 'fdatasync_p99_9_ms') else 'total descending'
|
||||
error_y_param = 'latency_stddev_ms' if metric == 'latency_mean_ms' else None
|
||||
|
||||
fig = px.bar(
|
||||
@@ -206,7 +201,8 @@ def update_graphs(selected_metrics, selected_configs, selected_tests):
|
||||
"iops": "IOPS",
|
||||
"latency_mean_ms": "Mean Latency (ms)",
|
||||
"bandwidth_mbps": "Bandwidth (MB/s)",
|
||||
"label": "Cluster Configuration"
|
||||
"fdatasync_p99_9_ms": "fdatasync p99.9 (ms)",
|
||||
"label": "Configuration"
|
||||
}
|
||||
)
|
||||
|
||||
|
||||
224
iobench/src/fio.rs
Normal file
224
iobench/src/fio.rs
Normal file
@@ -0,0 +1,224 @@
|
||||
use serde::{Deserialize, Serialize};
|
||||
use std::collections::HashMap;
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FioOutput {
|
||||
pub jobs: Vec<FioJobResult>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FioJobResult {
|
||||
pub jobname: String,
|
||||
pub read: FioMetrics,
|
||||
pub write: FioMetrics,
|
||||
#[serde(default)]
|
||||
pub sync: Option<SyncMetrics>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct FioMetrics {
|
||||
pub bw: f64,
|
||||
pub iops: f64,
|
||||
pub clat_ns: LatencyMetrics,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct SyncMetrics {
|
||||
pub total_ios: u64,
|
||||
pub lat_ns: LatencyMetrics,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
pub struct LatencyMetrics {
|
||||
pub mean: f64,
|
||||
pub stddev: f64,
|
||||
#[serde(default)]
|
||||
pub percentile: Option<HashMap<String, f64>>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
pub struct BenchmarkResult {
|
||||
pub test_name: String,
|
||||
pub iops: f64,
|
||||
pub bandwidth_kibps: f64,
|
||||
pub latency_mean_ms: f64,
|
||||
pub latency_stddev_ms: f64,
|
||||
pub clat_p50_ms: f64,
|
||||
pub clat_p95_ms: f64,
|
||||
pub clat_p99_ms: f64,
|
||||
pub clat_p99_9_ms: f64,
|
||||
pub clat_p99_99_ms: f64,
|
||||
pub clat_max_ms: f64,
|
||||
}
|
||||
|
||||
/// Look up `key` in an optional percentile table and convert the value
/// from nanoseconds to milliseconds.
///
/// Returns `0.0` when the table is absent or does not contain `key`.
fn percentile_ns_to_ms(p: &Option<HashMap<String, f64>>, key: &str) -> f64 {
    const NS_PER_MS: f64 = 1_000_000.0;

    let nanos = match p {
        Some(table) => table.get(key).copied().unwrap_or(0.0),
        None => 0.0,
    };
    nanos / NS_PER_MS
}
|
||||
|
||||
/// Parse fio JSON output and extract metrics for the named job.
|
||||
/// The `rw` parameter selects whether to read from the `read` or `write`
|
||||
/// metrics block — any value containing "read" uses the read block,
|
||||
/// everything else uses write.
|
||||
pub fn parse_fio_output(
|
||||
output: &str,
|
||||
test_name: &str,
|
||||
rw: &str,
|
||||
) -> Result<BenchmarkResult, String> {
|
||||
let fio_data: FioOutput = serde_json::from_str(output)
|
||||
.map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?;
|
||||
|
||||
let job_result = fio_data
|
||||
.jobs
|
||||
.iter()
|
||||
.find(|j| j.jobname == test_name)
|
||||
.ok_or_else(|| {
|
||||
format!(
|
||||
"Could not find job result for '{}' in fio output",
|
||||
test_name
|
||||
)
|
||||
})?;
|
||||
|
||||
let metrics = if rw.contains("read") {
|
||||
&job_result.read
|
||||
} else {
|
||||
&job_result.write
|
||||
};
|
||||
|
||||
let p = &metrics.clat_ns.percentile;
|
||||
Ok(BenchmarkResult {
|
||||
test_name: test_name.to_string(),
|
||||
iops: metrics.iops,
|
||||
bandwidth_kibps: metrics.bw,
|
||||
latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0,
|
||||
latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0,
|
||||
clat_p50_ms: percentile_ns_to_ms(p, "50.000000"),
|
||||
clat_p95_ms: percentile_ns_to_ms(p, "95.000000"),
|
||||
clat_p99_ms: percentile_ns_to_ms(p, "99.000000"),
|
||||
clat_p99_9_ms: percentile_ns_to_ms(p, "99.900000"),
|
||||
clat_p99_99_ms: percentile_ns_to_ms(p, "99.990000"),
|
||||
clat_max_ms: percentile_ns_to_ms(p, "100.000000"),
|
||||
})
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;

    /// One read/write metrics object without a percentile table.
    fn metrics(bw: f64, iops: f64, mean: f64, stddev: f64) -> serde_json::Value {
        serde_json::json!({
            "bw": bw, "iops": iops,
            "clat_ns": { "mean": mean, "stddev": stddev }
        })
    }

    /// Metrics object for a direction that saw no I/O.
    fn idle() -> serde_json::Value {
        metrics(0.0, 0.0, 0.0, 0.0)
    }

    /// A single job entry as it appears inside the `jobs` array.
    fn job(jobname: &str, read: serde_json::Value, write: serde_json::Value) -> serde_json::Value {
        serde_json::json!({ "jobname": jobname, "read": read, "write": write })
    }

    /// A complete report holding exactly one job, serialized to text.
    fn report(jobname: &str, read: serde_json::Value, write: serde_json::Value) -> String {
        serde_json::json!({ "jobs": [job(jobname, read, write)] }).to_string()
    }

    #[test]
    fn percentile_ns_to_ms_converts_correctly() {
        // 4ms expressed in ns
        let table = HashMap::from([("50.000000".to_string(), 4_000_000.0)]);
        let converted = percentile_ns_to_ms(&Some(table), "50.000000");
        assert!((converted - 4.0).abs() < 0.001);
    }

    #[test]
    fn percentile_ns_to_ms_missing_key_returns_zero() {
        let empty: HashMap<String, f64> = HashMap::new();
        assert_eq!(percentile_ns_to_ms(&Some(empty), "99.000000"), 0.0);
    }

    #[test]
    fn percentile_ns_to_ms_none_returns_zero() {
        assert_eq!(percentile_ns_to_ms(&None, "50.000000"), 0.0);
    }

    #[test]
    fn parse_fio_output_selects_read_metrics() {
        let json = report(
            "readtest",
            metrics(100000.0, 5000.0, 200_000.0, 50_000.0),
            idle(),
        );

        let result = parse_fio_output(&json, "readtest", "read").unwrap();
        assert!((result.iops - 5000.0).abs() < 0.001);
        assert!((result.bandwidth_kibps - 100000.0).abs() < 0.001);
    }

    #[test]
    fn parse_fio_output_selects_write_metrics() {
        let json = report(
            "writetest",
            idle(),
            metrics(51200.0, 3200.0, 5_000_000.0, 1_000_000.0),
        );

        let result = parse_fio_output(&json, "writetest", "write").unwrap();
        assert!((result.iops - 3200.0).abs() < 0.001);
        assert!((result.latency_mean_ms - 5.0).abs() < 0.001);
    }

    #[test]
    fn parse_fio_output_no_percentiles_returns_zeros() {
        let json = report(
            "test",
            idle(),
            metrics(1000.0, 100.0, 10_000_000.0, 2_000_000.0),
        );

        let result = parse_fio_output(&json, "test", "write").unwrap();
        assert_eq!(result.clat_p50_ms, 0.0);
        assert_eq!(result.clat_p99_ms, 0.0);
        assert_eq!(result.clat_max_ms, 0.0);
    }

    #[test]
    fn parse_fio_output_invalid_json_errors() {
        assert!(parse_fio_output("{not json", "test", "write").is_err());
    }

    #[test]
    fn fio_output_deserializes_with_optional_sync() {
        // Without sync field
        let without_sync = serde_json::json!({ "jobs": [job("test", idle(), idle())] });
        let parsed: FioOutput = serde_json::from_value(without_sync).unwrap();
        assert!(parsed.jobs[0].sync.is_none());

        // With sync field
        let mut synced_job = job("test", idle(), idle());
        synced_job["sync"] = serde_json::json!({
            "total_ios": 100,
            "lat_ns": { "mean": 500.0, "stddev": 100.0 }
        });
        let with_sync = serde_json::json!({ "jobs": [synced_job] });
        let parsed: FioOutput = serde_json::from_value(with_sync).unwrap();
        assert_eq!(parsed.jobs[0].sync.as_ref().unwrap().total_ios, 100);
    }
}
|
||||
193
iobench/src/k8s.rs
Normal file
193
iobench/src/k8s.rs
Normal file
@@ -0,0 +1,193 @@
|
||||
use std::collections::BTreeMap;
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::process::{Command, Stdio};
|
||||
|
||||
use k8s_openapi::api::apps::v1::StatefulSet;
|
||||
use k8s_openapi::api::core::v1::{
|
||||
Affinity, Container, PersistentVolumeClaim, PersistentVolumeClaimSpec, PodAffinityTerm,
|
||||
PodAntiAffinity, PodSpec, PodTemplateSpec, VolumeMount, VolumeResourceRequirements,
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::api::resource::Quantity;
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::{LabelSelector, ObjectMeta};
|
||||
|
||||
pub fn redpanda_statefulset(
|
||||
name: &str,
|
||||
namespace: &str,
|
||||
replicas: i32,
|
||||
storage_class: &str,
|
||||
size: &str,
|
||||
image: &str,
|
||||
) -> StatefulSet {
|
||||
let mut labels = BTreeMap::new();
|
||||
labels.insert("app".to_string(), name.to_string());
|
||||
|
||||
let pvc = PersistentVolumeClaim {
|
||||
metadata: ObjectMeta {
|
||||
name: Some("data".to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(PersistentVolumeClaimSpec {
|
||||
access_modes: Some(vec!["ReadWriteOnce".to_string()]),
|
||||
resources: Some(VolumeResourceRequirements {
|
||||
requests: Some({
|
||||
let mut m = BTreeMap::new();
|
||||
m.insert("storage".to_string(), Quantity(size.to_string()));
|
||||
m
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
storage_class_name: Some(storage_class.to_string()),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let container = Container {
|
||||
name: "fio".to_string(),
|
||||
image: Some(image.to_string()),
|
||||
image_pull_policy: Some("IfNotPresent".to_string()),
|
||||
command: Some(vec!["sleep".to_string(), "infinity".to_string()]),
|
||||
volume_mounts: Some(vec![VolumeMount {
|
||||
name: "data".to_string(),
|
||||
mount_path: "/data".to_string(),
|
||||
..Default::default()
|
||||
}]),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
let pod_spec = PodSpec {
|
||||
containers: vec![container],
|
||||
affinity: Some(Affinity {
|
||||
pod_anti_affinity: Some(PodAntiAffinity {
|
||||
required_during_scheduling_ignored_during_execution: Some(vec![PodAffinityTerm {
|
||||
label_selector: Some(LabelSelector {
|
||||
match_labels: Some(labels.clone()),
|
||||
..Default::default()
|
||||
}),
|
||||
topology_key: "kubernetes.io/hostname".to_string(),
|
||||
..Default::default()
|
||||
}]),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
};
|
||||
|
||||
StatefulSet {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.to_string()),
|
||||
namespace: Some(namespace.to_string()),
|
||||
labels: Some(labels.clone()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: Some(k8s_openapi::api::apps::v1::StatefulSetSpec {
|
||||
replicas: Some(replicas),
|
||||
selector: LabelSelector {
|
||||
match_labels: Some(labels.clone()),
|
||||
..Default::default()
|
||||
},
|
||||
service_name: Some(name.to_string()),
|
||||
pod_management_policy: Some("Parallel".to_string()),
|
||||
template: PodTemplateSpec {
|
||||
metadata: Some(ObjectMeta {
|
||||
labels: Some(labels),
|
||||
..Default::default()
|
||||
}),
|
||||
spec: Some(pod_spec),
|
||||
},
|
||||
volume_claim_templates: Some(vec![pvc]),
|
||||
..Default::default()
|
||||
}),
|
||||
..Default::default()
|
||||
}
|
||||
}
|
||||
|
||||
/// Run `kubectl` with `args` and return everything it printed on stdout.
///
/// Output is decoded lossily as UTF-8.
///
/// # Errors
///
/// Fails if the process cannot be started, or — with an error carrying
/// kubectl's stderr and the argument list — if it exits unsuccessfully.
pub fn run_kubectl(args: &[&str]) -> io::Result<String> {
    let output = Command::new("kubectl")
        .args(args)
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .output()?;

    if output.status.success() {
        return Ok(String::from_utf8_lossy(&output.stdout).to_string());
    }

    let stderr = String::from_utf8_lossy(&output.stderr);
    Err(io::Error::other(format!(
        "kubectl failed: {} (args: {:?})",
        stderr, args
    )))
}
|
||||
|
||||
/// Feed `yaml` to `kubectl apply -f -` and wait for the command to finish.
///
/// # Errors
///
/// Fails if `kubectl` cannot be started, if it exits unsuccessfully (the
/// error carries kubectl's stderr), or if the manifest could not be
/// written to its stdin.
pub fn apply_yaml(yaml: &str) -> io::Result<()> {
    let mut child = Command::new("kubectl")
        .args(["apply", "-f", "-"])
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .stderr(Stdio::piped())
        .spawn()?;

    // Move stdin out of the child so the pipe is closed (EOF) as soon as the
    // manifest has been written. The write result is kept rather than
    // propagated with `?`: returning here would skip the wait below, leaving
    // the child unreaped and throwing away whatever kubectl said on stderr.
    let write_result = match child.stdin.take() {
        Some(mut stdin) => stdin.write_all(yaml.as_bytes()),
        None => Err(io::Error::other("kubectl stdin was not captured")),
    };

    // Always reap the child and collect its output.
    let output = child.wait_with_output()?;

    if !output.status.success() {
        // kubectl's own message is more useful than a bare broken-pipe error,
        // so it takes precedence over a failed write.
        let stderr = String::from_utf8_lossy(&output.stderr);
        return Err(io::Error::other(format!(
            "kubectl apply failed: {}",
            stderr
        )));
    }

    write_result
}
|
||||
|
||||
pub fn wait_for_pods(namespace: &str, name: &str) -> io::Result<()> {
|
||||
run_kubectl(&[
|
||||
"rollout",
|
||||
"status",
|
||||
"statefulset",
|
||||
name,
|
||||
"-n",
|
||||
namespace,
|
||||
"--timeout",
|
||||
"300s",
|
||||
])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn get_pod_names(namespace: &str, label: &str) -> io::Result<Vec<String>> {
|
||||
let out = run_kubectl(&[
|
||||
"get",
|
||||
"pods",
|
||||
"-n",
|
||||
namespace,
|
||||
"-l",
|
||||
&format!("app={}", label),
|
||||
"-o",
|
||||
"jsonpath={.items[*].metadata.name}",
|
||||
])?;
|
||||
Ok(out.split_whitespace().map(|s| s.to_string()).collect())
|
||||
}
|
||||
|
||||
pub fn exec(namespace: &str, pod: &str, command: &str) -> io::Result<String> {
|
||||
run_kubectl(&["exec", "-n", namespace, pod, "--", "sh", "-c", command])
|
||||
}
|
||||
|
||||
pub fn cp_from(namespace: &str, pod: &str, remote: &str, local: &str) -> io::Result<()> {
|
||||
run_kubectl(&["cp", "-n", namespace, &format!("{}:{}", pod, remote), local])?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn delete_resources(namespace: &str, label: &str) -> io::Result<()> {
|
||||
run_kubectl(&[
|
||||
"delete",
|
||||
"statefulset,pvc",
|
||||
"-n",
|
||||
namespace,
|
||||
"-l",
|
||||
&format!("app={}", label),
|
||||
"--wait=false",
|
||||
])?;
|
||||
Ok(())
|
||||
}
|
||||
@@ -1,23 +1,22 @@
|
||||
use std::fs;
|
||||
use std::io::{self, Write};
|
||||
use std::io;
|
||||
use std::io::Write;
|
||||
use std::process::{Command, Stdio};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::Local;
|
||||
use clap::Parser;
|
||||
use serde::{Deserialize, Serialize};
|
||||
use clap::{Parser, Subcommand};
|
||||
|
||||
mod fio;
|
||||
mod k8s;
|
||||
mod redpanda;
|
||||
mod simple;
|
||||
|
||||
/// A simple yet powerful I/O benchmarking tool using fio.
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(author, version, about, long_about = None)]
|
||||
struct Args {
|
||||
/// Target for the benchmark.
|
||||
/// Formats:
|
||||
/// - localhost (default)
|
||||
/// - ssh/{user}@{host}
|
||||
/// - ssh/{user}@{host}:{port}
|
||||
/// - k8s/{namespace}/{pod}
|
||||
struct Cli {
|
||||
#[command(subcommand)]
|
||||
command: Option<Commands>,
|
||||
|
||||
// Simple profile args (used when no subcommand is given)
|
||||
#[arg(short, long, default_value = "localhost")]
|
||||
target: String,
|
||||
|
||||
@@ -27,7 +26,10 @@ struct Args {
|
||||
/// Comma-separated list of tests to run.
|
||||
/// Available tests: read, write, randread, randwrite,
|
||||
/// multiread, multiwrite, multirandread, multirandwrite.
|
||||
#[arg(long, default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite")]
|
||||
#[arg(
|
||||
long,
|
||||
default_value = "read,write,randread,randwrite,multiread,multiwrite,multirandread,multirandwrite"
|
||||
)]
|
||||
tests: String,
|
||||
|
||||
/// Duration of each test in seconds.
|
||||
@@ -48,143 +50,49 @@ struct Args {
|
||||
block_size: String,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct FioOutput {
|
||||
jobs: Vec<FioJobResult>,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct FioJobResult {
|
||||
jobname: String,
|
||||
read: FioMetrics,
|
||||
write: FioMetrics,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct FioMetrics {
|
||||
bw: f64,
|
||||
iops: f64,
|
||||
clat_ns: LatencyMetrics,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize, Deserialize)]
|
||||
struct LatencyMetrics {
|
||||
mean: f64,
|
||||
stddev: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Serialize)]
|
||||
struct BenchmarkResult {
|
||||
test_name: String,
|
||||
iops: f64,
|
||||
bandwidth_kibps: f64,
|
||||
latency_mean_ms: f64,
|
||||
latency_stddev_ms: f64,
|
||||
#[derive(Subcommand, Debug)]
|
||||
enum Commands {
|
||||
/// Run the Redpanda storage characterization profile.
|
||||
Redpanda(redpanda::RedpandaArgs),
|
||||
/// Deploy the Redpanda iobench resources and exit.
|
||||
Deploy(redpanda::RedpandaArgs),
|
||||
/// Remove the Redpanda iobench resources.
|
||||
Undeploy(redpanda::RedpandaArgs),
|
||||
}
|
||||
|
||||
fn main() -> io::Result<()> {
|
||||
let args = Args::parse();
|
||||
let cli = Cli::parse();
|
||||
|
||||
let output_dir = args.output_dir.unwrap_or_else(|| {
|
||||
format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S"))
|
||||
});
|
||||
fs::create_dir_all(&output_dir)?;
|
||||
|
||||
let tests_to_run: Vec<&str> = args.tests.split(',').collect();
|
||||
let mut results = Vec::new();
|
||||
|
||||
for test in tests_to_run {
|
||||
println!("--------------------------------------------------");
|
||||
println!("Running test: {}", test);
|
||||
|
||||
let (rw, numjobs) = match test {
|
||||
"read" => ("read", 1),
|
||||
"write" => ("write", 1),
|
||||
"randread" => ("randread", 1),
|
||||
"randwrite" => ("randwrite", 1),
|
||||
"multiread" => ("read", 4),
|
||||
"multiwrite" => ("write", 4),
|
||||
"multirandread" => ("randread", 4),
|
||||
"multirandwrite" => ("randwrite", 4),
|
||||
_ => {
|
||||
eprintln!("Unknown test: {}. Skipping.", test);
|
||||
continue;
|
||||
}
|
||||
match cli.command {
|
||||
None => {
|
||||
// Backward-compatible simple benchmark
|
||||
let args = simple::SimpleArgs {
|
||||
target: cli.target,
|
||||
benchmark_dir: cli.benchmark_dir,
|
||||
tests: cli.tests,
|
||||
duration: cli.duration,
|
||||
output_dir: cli.output_dir,
|
||||
size: cli.size,
|
||||
block_size: cli.block_size,
|
||||
};
|
||||
|
||||
let test_name = format!("{}-{}-sync-test", test, args.block_size);
|
||||
let fio_command = format!(
|
||||
"fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
|
||||
args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
|
||||
);
|
||||
|
||||
println!("Executing command:\n{}\n", fio_command);
|
||||
|
||||
let output = match run_command(&args.target, &fio_command) {
|
||||
Ok(out) => out,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to execute command for test {}: {}", test, e);
|
||||
continue;
|
||||
simple::run(&args)
|
||||
}
|
||||
};
|
||||
|
||||
|
||||
let result = parse_fio_output(&output, &test_name, rw);
|
||||
// TODO store raw fio output and print it
|
||||
match result {
|
||||
Ok(res) => {
|
||||
results.push(res);
|
||||
Some(Commands::Redpanda(args)) => redpanda::run(&args),
|
||||
Some(Commands::Deploy(args)) => {
|
||||
let mut a = args.clone();
|
||||
a.deploy_only = true;
|
||||
redpanda::run(&a)
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error parsing fio output for test {}: {}", test, e);
|
||||
eprintln!("Raw output:\n{}", output);
|
||||
Some(Commands::Undeploy(args)) => redpanda::undeploy(&args),
|
||||
}
|
||||
}
|
||||
|
||||
println!("{output}");
|
||||
println!("Test {} completed.", test);
|
||||
// A brief pause to let the system settle before the next test.
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
}
|
||||
|
||||
// Cleanup the test file on the target
|
||||
println!("--------------------------------------------------");
|
||||
println!("Cleaning up test file on target...");
|
||||
let cleanup_command = "rm -f ./iobench_testfile";
|
||||
if let Err(e) = run_command(&args.target, cleanup_command) {
|
||||
eprintln!("Warning: Failed to clean up test file on target: {}", e);
|
||||
} else {
|
||||
println!("Cleanup successful.");
|
||||
}
|
||||
|
||||
|
||||
if results.is_empty() {
|
||||
println!("\nNo benchmark results to display.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
// Output results to a CSV file for easy analysis
|
||||
let csv_path = format!("{}/summary.csv", output_dir);
|
||||
let mut wtr = csv::Writer::from_path(&csv_path)?;
|
||||
for result in &results {
|
||||
wtr.serialize(result)?;
|
||||
}
|
||||
wtr.flush()?;
|
||||
|
||||
println!("\nBenchmark summary saved to {}", csv_path);
|
||||
println!("\n--- Benchmark Results Summary ---");
|
||||
println!("{:<25} {:>10} {:>18} {:>20} {:>22}", "Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)");
|
||||
println!("{:-<98}", "");
|
||||
for result in results {
|
||||
println!("{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}", result.test_name, result.iops, result.bandwidth_kibps, result.latency_mean_ms, result.latency_stddev_ms);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_command(target: &str, command: &str) -> io::Result<String> {
|
||||
pub fn run_command(target: &str, command: &str) -> io::Result<String> {
|
||||
let (program, args) = if target == "localhost" {
|
||||
("sudo", vec!["sh".to_string(), "-c".to_string(), command.to_string()])
|
||||
(
|
||||
"sudo",
|
||||
vec!["sh".to_string(), "-c".to_string(), command.to_string()],
|
||||
)
|
||||
} else if target.starts_with("ssh/") {
|
||||
let target_str = target.strip_prefix("ssh/").unwrap();
|
||||
let ssh_target;
|
||||
@@ -203,13 +111,31 @@ fn run_command(target: &str, command: &str) -> io::Result<String> {
|
||||
} else if target.starts_with("k8s/") {
|
||||
let parts: Vec<&str> = target.strip_prefix("k8s/").unwrap().split('/').collect();
|
||||
if parts.len() != 2 {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid k8s target format. Expected k8s/{namespace}/{pod}"));
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"Invalid k8s target format. Expected k8s/{namespace}/{pod}",
|
||||
));
|
||||
}
|
||||
let namespace = parts[0];
|
||||
let pod = parts[1];
|
||||
("kubectl", vec!["exec".to_string(), "-n".to_string(), namespace.to_string(), pod.to_string(), "--".to_string(), "sh".to_string(), "-c".to_string(), command.to_string()])
|
||||
(
|
||||
"kubectl",
|
||||
vec![
|
||||
"exec".to_string(),
|
||||
"-n".to_string(),
|
||||
namespace.to_string(),
|
||||
pod.to_string(),
|
||||
"--".to_string(),
|
||||
"sh".to_string(),
|
||||
"-c".to_string(),
|
||||
command.to_string(),
|
||||
],
|
||||
)
|
||||
} else {
|
||||
return Err(io::Error::new(io::ErrorKind::InvalidInput, "Invalid target format"));
|
||||
return Err(io::Error::new(
|
||||
io::ErrorKind::InvalidInput,
|
||||
"Invalid target format",
|
||||
));
|
||||
};
|
||||
|
||||
let mut cmd = Command::new(program);
|
||||
@@ -222,32 +148,8 @@ fn run_command(target: &str, command: &str) -> io::Result<String> {
|
||||
if !output.status.success() {
|
||||
eprintln!("Command failed with status: {}", output.status);
|
||||
io::stderr().write_all(&output.stderr)?;
|
||||
return Err(io::Error::new(io::ErrorKind::Other, "Command execution failed"));
|
||||
return Err(io::Error::other("Command execution failed"));
|
||||
}
|
||||
|
||||
String::from_utf8(output.stdout)
|
||||
.map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
||||
}
|
||||
|
||||
fn parse_fio_output(output: &str, test_name: &str, rw: &str) -> Result<BenchmarkResult, String> {
|
||||
let fio_data: FioOutput = serde_json::from_str(output)
|
||||
.map_err(|e| format!("Failed to deserialize fio JSON: {}", e))?;
|
||||
|
||||
let job_result = fio_data.jobs.iter()
|
||||
.find(|j| j.jobname == test_name)
|
||||
.ok_or_else(|| format!("Could not find job result for '{}' in fio output", test_name))?;
|
||||
|
||||
let metrics = if rw.contains("read") {
|
||||
&job_result.read
|
||||
} else {
|
||||
&job_result.write
|
||||
};
|
||||
|
||||
Ok(BenchmarkResult {
|
||||
test_name: test_name.to_string(),
|
||||
iops: metrics.iops,
|
||||
bandwidth_kibps: metrics.bw,
|
||||
latency_mean_ms: metrics.clat_ns.mean / 1_000_000.0,
|
||||
latency_stddev_ms: metrics.clat_ns.stddev / 1_000_000.0,
|
||||
})
|
||||
String::from_utf8(output.stdout).map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e))
|
||||
}
|
||||
|
||||
994
iobench/src/redpanda.rs
Normal file
994
iobench/src/redpanda.rs
Normal file
@@ -0,0 +1,994 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::path::Path;
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
use std::thread;
|
||||
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};
|
||||
|
||||
#[cfg(test)]
|
||||
use std::collections::HashMap;
|
||||
|
||||
use chrono::Local;
|
||||
use clap::Args;
|
||||
|
||||
use crate::fio::{FioOutput, parse_fio_output};
|
||||
use crate::k8s;
|
||||
|
||||
/// Name of the StatefulSet; also used as the `app` label value for lookups
/// and cleanup.
const NAME: &str = "iobench-redpanda";

/// Container image the benchmark pods are created from.
const IMAGE: &str = "juicedata/fio:latest";

/// Delay between scheduling a workload and the moment fio is told to start.
/// Every pod sleeps until the same wall-clock instant, so this acts as a
/// start barrier.
const BARRIER_SECS: u64 = 60;

/// fio job: 16K sequential writes, 4 jobs, iodepth 16, no fdatasync, 300 s.
const WORKLOAD_THROUGHPUT: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=10G

[throughput]
rw=write
bs=16K
numjobs=4
iodepth=16
runtime=300
"#;

/// fio job: 16K sequential writes with `fdatasync=1`, 4 jobs, iodepth 4, 600 s.
const WORKLOAD_FSYNC_HOT_PATH: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=5G

[fsync_hot_path]
rw=write
bs=16K
fdatasync=1
numjobs=4
iodepth=4
runtime=600
"#;

/// fio job: 512K sequential writes with `fdatasync=1`, 1 job, iodepth 4, 120 s.
const WORKLOAD_SELFTEST_512K: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=10G

[selftest_512k]
rw=write
bs=512K
fdatasync=1
numjobs=1
iodepth=4
runtime=120
"#;

/// fio job: 4K sequential writes with `fdatasync=1`, 1 job, iodepth 1, 120 s.
const WORKLOAD_SELFTEST_4K_QD1: &str = r#"[global]
direct=1
ioengine=libaio
group_reporting
time_based
log_avg_msec=1000
filename=/data/iobench_testfile
size=2G

[selftest_4k_qd1]
rw=write
bs=4K
fdatasync=1
numjobs=1
iodepth=1
runtime=120
"#;
|
||||
|
||||
/// One fio workload of the Redpanda profile.
struct Workload {
    /// Identifier; used for remote and local file names and matched against
    /// the fio job name when results are parsed.
    name: &'static str,
    /// Complete fio job file contents written to the pod before the run.
    template: &'static str,
    /// Expected fio runtime in seconds; used for time estimates and progress
    /// output only.
    runtime_secs: u64,
    /// Human-readable one-line summary printed before the workload starts.
    description: &'static str,
}
|
||||
|
||||
fn workloads() -> Vec<Workload> {
|
||||
vec![
|
||||
Workload {
|
||||
name: "throughput",
|
||||
template: WORKLOAD_THROUGHPUT,
|
||||
runtime_secs: 300,
|
||||
description: "16K seq write, no fsync, 4 jobs — bulk segment write upper bound",
|
||||
},
|
||||
Workload {
|
||||
name: "fsync_hot_path",
|
||||
template: WORKLOAD_FSYNC_HOT_PATH,
|
||||
runtime_secs: 600,
|
||||
description: "16K seq write, fdatasync/op, 4 jobs — Raft acks=all commit path",
|
||||
},
|
||||
Workload {
|
||||
name: "selftest_512k",
|
||||
template: WORKLOAD_SELFTEST_512K,
|
||||
runtime_secs: 120,
|
||||
description: "512K seq write, fdatasync, 1 job — rpk self-test 512K phase",
|
||||
},
|
||||
Workload {
|
||||
name: "selftest_4k_qd1",
|
||||
template: WORKLOAD_SELFTEST_4K_QD1,
|
||||
runtime_secs: 120,
|
||||
description: "4K seq write, fdatasync, iodepth=1 — worst-case single-stream commit",
|
||||
},
|
||||
]
|
||||
}
|
||||
|
||||
fn total_runtime_secs(workloads: &[Workload]) -> u64 {
|
||||
workloads
|
||||
.iter()
|
||||
.map(|w| BARRIER_SECS + w.runtime_secs)
|
||||
.sum()
|
||||
}
|
||||
|
||||
/// Formats a number of seconds compactly for progress output.
///
/// * below one minute: `"42s"`
/// * below one hour: `"2m05s"`
/// * otherwise: `"1h30m"` (seconds are dropped)
fn fmt_duration(secs: u64) -> String {
    match secs {
        0..=59 => format!("{}s", secs),
        60..=3599 => {
            let minutes = secs / 60;
            let seconds = secs % 60;
            format!("{}m{:02}s", minutes, seconds)
        }
        _ => {
            let hours = secs / 3600;
            let minutes = (secs % 3600) / 60;
            format!("{}h{:02}m", hours, minutes)
        }
    }
}
|
||||
|
||||
#[derive(Args, Debug, Clone)]
|
||||
pub struct RedpandaArgs {
|
||||
#[arg(long, default_value = "both")]
|
||||
pub mode: String,
|
||||
#[arg(long, default_value = "ceph-block")]
|
||||
pub storage_class: String,
|
||||
#[arg(long, default_value = "50Gi")]
|
||||
pub pvc_size: String,
|
||||
#[arg(long, default_value_t = 3)]
|
||||
pub replicas: i32,
|
||||
#[arg(long, default_value = "default")]
|
||||
pub namespace: String,
|
||||
#[arg(long)]
|
||||
pub output_dir: Option<String>,
|
||||
#[arg(long)]
|
||||
pub keep_deployment: bool,
|
||||
#[arg(long)]
|
||||
pub deploy_only: bool,
|
||||
}
|
||||
|
||||
pub fn deploy(args: &RedpandaArgs) -> io::Result<()> {
|
||||
println!(
|
||||
"[deploy] Creating StatefulSet '{}' in namespace '{}' ({} replicas, {} PVCs on {})",
|
||||
NAME, args.namespace, args.replicas, args.pvc_size, args.storage_class
|
||||
);
|
||||
let ss = k8s::redpanda_statefulset(
|
||||
NAME,
|
||||
&args.namespace,
|
||||
args.replicas,
|
||||
&args.storage_class,
|
||||
&args.pvc_size,
|
||||
IMAGE,
|
||||
);
|
||||
let yaml = serde_yaml::to_string(&ss)
|
||||
.map_err(|e| io::Error::other(format!("YAML serialization failed: {}", e)))?;
|
||||
k8s::apply_yaml(&yaml)?;
|
||||
println!(
|
||||
"[deploy] StatefulSet applied, waiting for {} pods to be ready (timeout 300s)...",
|
||||
args.replicas
|
||||
);
|
||||
k8s::wait_for_pods(&args.namespace, NAME)?;
|
||||
let pods = k8s::get_pod_names(&args.namespace, NAME)?;
|
||||
println!("[deploy] All pods ready: {}", pods.join(", "));
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn undeploy(args: &RedpandaArgs) -> io::Result<()> {
|
||||
println!(
|
||||
"[cleanup] Deleting StatefulSet and PVCs with label app={}...",
|
||||
NAME
|
||||
);
|
||||
k8s::delete_resources(&args.namespace, NAME)?;
|
||||
println!("[cleanup] Delete issued (async). Resources will be fully removed shortly.");
|
||||
Ok(())
|
||||
}
|
||||
|
||||
pub fn run(args: &RedpandaArgs) -> io::Result<()> {
|
||||
if args.deploy_only {
|
||||
return deploy(args);
|
||||
}
|
||||
|
||||
let output_dir = args.output_dir.clone().unwrap_or_else(|| {
|
||||
format!(
|
||||
"./iobench-redpanda-{}",
|
||||
Local::now().format("%Y-%m-%d-%H%M%S")
|
||||
)
|
||||
});
|
||||
fs::create_dir_all(&output_dir)?;
|
||||
|
||||
let pod_names = match k8s::get_pod_names(&args.namespace, NAME) {
|
||||
Ok(pods) if pods.len() >= args.replicas as usize => {
|
||||
println!("[setup] Found existing pods: {}", pods.join(", "));
|
||||
pods
|
||||
}
|
||||
_ => {
|
||||
deploy(args)?;
|
||||
k8s::get_pod_names(&args.namespace, NAME)?
|
||||
}
|
||||
};
|
||||
|
||||
if pod_names.len() < args.replicas as usize {
|
||||
return Err(io::Error::other(format!(
|
||||
"Expected {} pods, found {}",
|
||||
args.replicas,
|
||||
pod_names.len()
|
||||
)));
|
||||
}
|
||||
|
||||
let modes: Vec<&str> = match args.mode.as_str() {
|
||||
"sequential" => vec!["sequential"],
|
||||
"parallel" => vec!["parallel"],
|
||||
"both" => vec!["sequential", "parallel"],
|
||||
other => {
|
||||
return Err(io::Error::other(format!(
|
||||
"Unknown mode: {}. Use sequential, parallel, or both.",
|
||||
other
|
||||
)));
|
||||
}
|
||||
};
|
||||
|
||||
let wl = workloads();
|
||||
let per_mode_secs = total_runtime_secs(&wl);
|
||||
let total_secs = per_mode_secs * modes.len() as u64;
|
||||
|
||||
println!();
|
||||
println!("============================================================");
|
||||
println!(" Redpanda I/O characterization profile");
|
||||
println!(
|
||||
" Modes: {:?} | Workloads: {} | Pods: {}",
|
||||
modes,
|
||||
wl.len(),
|
||||
pod_names.len()
|
||||
);
|
||||
println!(" Estimated total runtime: {}", fmt_duration(total_secs));
|
||||
println!(" Output: {}", output_dir);
|
||||
println!("============================================================");
|
||||
|
||||
let run_start = Instant::now();
|
||||
let mut all_results: Vec<RedpandaCsvRow> = Vec::new();
|
||||
|
||||
for (mode_idx, mode) in modes.iter().enumerate() {
|
||||
println!();
|
||||
if *mode == "parallel" {
|
||||
println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||
println!(" WARNING: Parallel mode — intensive fdatasync across all");
|
||||
println!(" nodes simultaneously. Will impact other Ceph pool users.");
|
||||
println!("!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!");
|
||||
}
|
||||
println!(
|
||||
"=== Mode: {} ({}/{}) — {} per-mode est. ===",
|
||||
mode,
|
||||
mode_idx + 1,
|
||||
modes.len(),
|
||||
fmt_duration(per_mode_secs)
|
||||
);
|
||||
|
||||
for (wl_idx, workload) in wl.iter().enumerate() {
|
||||
let remaining_in_mode: u64 = wl[wl_idx..]
|
||||
.iter()
|
||||
.map(|w| BARRIER_SECS + w.runtime_secs)
|
||||
.sum();
|
||||
|
||||
println!();
|
||||
println!(
|
||||
"--- [{}/{}] {} ({}s barrier + {}s fio = {}) ---",
|
||||
wl_idx + 1,
|
||||
wl.len(),
|
||||
workload.name,
|
||||
BARRIER_SECS,
|
||||
workload.runtime_secs,
|
||||
fmt_duration(BARRIER_SECS + workload.runtime_secs)
|
||||
);
|
||||
println!(" {}", workload.description);
|
||||
println!(
|
||||
" Remaining in this mode: {} | Total elapsed: {}",
|
||||
fmt_duration(remaining_in_mode),
|
||||
fmt_duration(run_start.elapsed().as_secs())
|
||||
);
|
||||
|
||||
let mode_dir = format!("{}/{}", output_dir, mode);
|
||||
fs::create_dir_all(&mode_dir)?;
|
||||
|
||||
let start_at = now_epoch() + BARRIER_SECS;
|
||||
|
||||
let results = if *mode == "sequential" {
|
||||
let pod = &pod_names[0];
|
||||
println!(" Running on: {}", pod);
|
||||
let result = run_workload_on_pod(
|
||||
&args.namespace,
|
||||
pod,
|
||||
workload.name,
|
||||
workload.template,
|
||||
workload.runtime_secs,
|
||||
start_at,
|
||||
&mode_dir,
|
||||
)?;
|
||||
vec![(pod.clone(), result)]
|
||||
} else {
|
||||
println!(
|
||||
" Running on {} pods: {}",
|
||||
pod_names.len(),
|
||||
pod_names.join(", ")
|
||||
);
|
||||
let mut handles = Vec::new();
|
||||
for pod in &pod_names {
|
||||
let ns = args.namespace.clone();
|
||||
let pod_for_thread = pod.clone();
|
||||
let wl_name = workload.name.to_string();
|
||||
let template = workload.template.to_string();
|
||||
let dir = mode_dir.clone();
|
||||
let rt = workload.runtime_secs;
|
||||
let handle = thread::spawn(move || {
|
||||
run_workload_on_pod(
|
||||
&ns,
|
||||
&pod_for_thread,
|
||||
&wl_name,
|
||||
&template,
|
||||
rt,
|
||||
start_at,
|
||||
&dir,
|
||||
)
|
||||
});
|
||||
handles.push((pod.clone(), handle));
|
||||
}
|
||||
let mut collected = Vec::new();
|
||||
for (pod, handle) in handles {
|
||||
match handle.join() {
|
||||
Ok(Ok(result)) => collected.push((pod, result)),
|
||||
Ok(Err(e)) => {
|
||||
eprintln!(" ERROR on pod {}: {}", pod, e);
|
||||
}
|
||||
Err(_) => {
|
||||
eprintln!(" PANIC on pod {}", pod);
|
||||
}
|
||||
}
|
||||
}
|
||||
collected
|
||||
};
|
||||
|
||||
println!(
|
||||
" {} complete ({} elapsed total)",
|
||||
workload.name,
|
||||
fmt_duration(run_start.elapsed().as_secs())
|
||||
);
|
||||
|
||||
for (pod, raw_json) in &results {
|
||||
let pod_slug = pod.replace(NAME, "node");
|
||||
let json_path = format!("{}/{}_{}.json", mode_dir, workload.name, pod_slug);
|
||||
fs::write(&json_path, raw_json)?;
|
||||
|
||||
let result = parse_fio_output(raw_json, workload.name, "write").map_err(|e| {
|
||||
io::Error::other(format!("{} on {}: {}", workload.name, pod, e))
|
||||
})?;
|
||||
|
||||
let sync = parse_sync_latencies(raw_json, workload.name);
|
||||
|
||||
// Print per-pod summary inline
|
||||
let bw_mib = result.bandwidth_kibps / 1024.0;
|
||||
if sync.total_ios > 0 {
|
||||
println!(
|
||||
" {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms | fsync_p99.9={:.2}ms",
|
||||
pod_slug, result.iops, bw_mib, result.clat_p99_ms, sync.p99_9_ms
|
||||
);
|
||||
} else {
|
||||
println!(
|
||||
" {} => {:.0} IOPS | {:.1} MiB/s | clat_p99={:.2}ms",
|
||||
pod_slug, result.iops, bw_mib, result.clat_p99_ms
|
||||
);
|
||||
}
|
||||
|
||||
all_results.push(RedpandaCsvRow {
|
||||
label: "redpanda".to_string(),
|
||||
mode: mode.to_string(),
|
||||
workload: workload.name.to_string(),
|
||||
node: pod.clone(),
|
||||
test_name: result.test_name.clone(),
|
||||
iops: result.iops,
|
||||
bandwidth_kibps: result.bandwidth_kibps,
|
||||
latency_mean_ms: result.latency_mean_ms,
|
||||
latency_stddev_ms: result.latency_stddev_ms,
|
||||
clat_p50_ms: result.clat_p50_ms,
|
||||
clat_p95_ms: result.clat_p95_ms,
|
||||
clat_p99_ms: result.clat_p99_ms,
|
||||
clat_p99_9_ms: result.clat_p99_9_ms,
|
||||
clat_p99_99_ms: result.clat_p99_99_ms,
|
||||
clat_max_ms: result.clat_max_ms,
|
||||
fdatasync_total_ios: sync.total_ios,
|
||||
fdatasync_mean_ms: sync.mean_ms,
|
||||
fdatasync_p50_ms: sync.p50_ms,
|
||||
fdatasync_p95_ms: sync.p95_ms,
|
||||
fdatasync_p99_ms: sync.p99_ms,
|
||||
fdatasync_p99_9_ms: sync.p99_9_ms,
|
||||
fdatasync_p99_99_ms: sync.p99_99_ms,
|
||||
fdatasync_max_ms: sync.max_ms,
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
println!();
|
||||
println!("============================================================");
|
||||
println!(
|
||||
" All workloads complete. Total runtime: {}",
|
||||
fmt_duration(run_start.elapsed().as_secs())
|
||||
);
|
||||
println!("============================================================");
|
||||
|
||||
let csv_path = format!("{}/redpanda_summary.csv", output_dir);
|
||||
let mut wtr = csv::Writer::from_path(&csv_path)?;
|
||||
for row in &all_results {
|
||||
wtr.serialize(row)?;
|
||||
}
|
||||
wtr.flush()?;
|
||||
println!("[output] Redpanda summary: {}", csv_path);
|
||||
|
||||
let dash_path = format!("{}/iobench.csv", output_dir);
|
||||
let mut wtr2 = csv::Writer::from_path(&dash_path)?;
|
||||
for row in &all_results {
|
||||
wtr2.serialize(DashRow::from(row))?;
|
||||
}
|
||||
wtr2.flush()?;
|
||||
println!("[output] Dashboard CSV: {}", dash_path);
|
||||
|
||||
if let Some(worst) = all_results
|
||||
.iter()
|
||||
.filter(|r| r.mode == "parallel" && r.workload == "fsync_hot_path")
|
||||
.max_by(|a, b| {
|
||||
a.fdatasync_p99_9_ms
|
||||
.partial_cmp(&b.fdatasync_p99_9_ms)
|
||||
.unwrap_or(std::cmp::Ordering::Equal)
|
||||
})
|
||||
{
|
||||
println!();
|
||||
println!("=== HEADLINE ===");
|
||||
println!(
|
||||
"Worst p99.9 fdatasync latency (parallel fsync_hot_path): {:.3} ms on {}",
|
||||
worst.fdatasync_p99_9_ms, worst.node
|
||||
);
|
||||
if worst.fdatasync_p99_9_ms > 100.0 {
|
||||
println!("RESULT: FAIL (>100 ms). Raft heartbeats may not survive load spikes.");
|
||||
} else {
|
||||
println!("RESULT: PASS (<=100 ms).");
|
||||
}
|
||||
}
|
||||
|
||||
if !args.keep_deployment {
|
||||
println!();
|
||||
undeploy(args)?;
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Current wall-clock time as whole seconds since the Unix epoch.
///
/// # Panics
/// Panics if the system clock reports a time before the Unix epoch.
fn now_epoch() -> u64 {
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH).unwrap();
    since_epoch.as_secs()
}
|
||||
|
||||
fn run_workload_on_pod(
|
||||
namespace: &str,
|
||||
pod: &str,
|
||||
workload_name: &str,
|
||||
job_template: &str,
|
||||
runtime_secs: u64,
|
||||
start_at: u64,
|
||||
local_mode_dir: &str,
|
||||
) -> io::Result<String> {
|
||||
let pod_slug = pod.replace(NAME, "node");
|
||||
let remote_job_path = format!("/data/results/{}.fio", workload_name);
|
||||
let remote_out_path = format!("/data/results/{}.json", workload_name);
|
||||
let remote_results_dir = "/data/results";
|
||||
|
||||
let write_job_cmd = format!(
|
||||
"mkdir -p {} && cat > {} <<'EOF'\n{}\nEOF",
|
||||
remote_results_dir, remote_job_path, job_template
|
||||
);
|
||||
k8s::exec(namespace, pod, &write_job_cmd)?;
|
||||
|
||||
let fio_cmd = format!(
|
||||
"NOW=$(date +%s); if [ $NOW -lt {start_at} ]; then sleep $(({start_at} - NOW)); fi; fio {remote_job_path} --output={remote_out_path} --output-format=json --write_lat_log={remote_results_dir}/{workload_name}_lat --write_iops_log={remote_results_dir}/{workload_name}_iops",
|
||||
start_at = start_at,
|
||||
remote_job_path = remote_job_path,
|
||||
remote_out_path = remote_out_path,
|
||||
remote_results_dir = remote_results_dir,
|
||||
workload_name = workload_name,
|
||||
);
|
||||
|
||||
// Progress ticker — prints every 10s until fio exec returns
|
||||
let done = Arc::new(AtomicBool::new(false));
|
||||
let done_clone = done.clone();
|
||||
let ticker_label = pod_slug.clone();
|
||||
let wl = workload_name.to_string();
|
||||
let total_expected = BARRIER_SECS + runtime_secs;
|
||||
let progress_handle = thread::spawn(move || {
|
||||
let start = Instant::now();
|
||||
loop {
|
||||
thread::sleep(Duration::from_secs(10));
|
||||
if done_clone.load(Ordering::Acquire) {
|
||||
break;
|
||||
}
|
||||
let elapsed = start.elapsed().as_secs();
|
||||
let phase = if elapsed < BARRIER_SECS {
|
||||
format!("barrier sync {}s/{}s", elapsed, BARRIER_SECS)
|
||||
} else {
|
||||
let fio_elapsed = elapsed - BARRIER_SECS;
|
||||
format!("fio running {}s/{}s", fio_elapsed, runtime_secs)
|
||||
};
|
||||
let remaining = total_expected.saturating_sub(elapsed);
|
||||
eprintln!(
|
||||
" [{}:{}] {} ({} remaining)",
|
||||
ticker_label,
|
||||
wl,
|
||||
phase,
|
||||
fmt_duration(remaining)
|
||||
);
|
||||
}
|
||||
});
|
||||
|
||||
let output = k8s::exec(namespace, pod, &fio_cmd);
|
||||
|
||||
done.store(true, Ordering::Release);
|
||||
let _ = progress_handle.join();
|
||||
|
||||
let output = output?;
|
||||
|
||||
let local_pod_dir = format!("{}/{}", local_mode_dir, pod_slug);
|
||||
fs::create_dir_all(&local_pod_dir)?;
|
||||
eprintln!(" [{}] Copying results from pod...", pod_slug);
|
||||
if let Err(e) = k8s::cp_from(namespace, pod, remote_results_dir, &local_pod_dir) {
|
||||
eprintln!(" [{}] Warning: failed to copy results: {}", pod_slug, e);
|
||||
}
|
||||
|
||||
let local_json = format!("{}/{}.json", local_pod_dir, workload_name);
|
||||
if Path::new(&local_json).exists() {
|
||||
fs::read_to_string(&local_json)
|
||||
} else {
|
||||
Ok(output)
|
||||
}
|
||||
}
|
||||
|
||||
/// Sync-operation latency figures extracted from one fio job, in
/// milliseconds. The `Default` value (all zeros) stands for "no sync data".
#[derive(Debug, Clone, Default)]
struct SyncLatencies {
    /// Number of sync operations fio reported.
    total_ios: u64,
    /// Mean sync latency.
    mean_ms: f64,
    /// 50th percentile.
    p50_ms: f64,
    /// 95th percentile.
    p95_ms: f64,
    /// 99th percentile.
    p99_ms: f64,
    /// 99.9th percentile.
    p99_9_ms: f64,
    /// 99.99th percentile.
    p99_99_ms: f64,
    /// Value stored under the 100th-percentile key.
    max_ms: f64,
}
|
||||
|
||||
fn parse_sync_latencies(json: &str, workload_name: &str) -> SyncLatencies {
|
||||
let fio_data: FioOutput = match serde_json::from_str(json) {
|
||||
Ok(d) => d,
|
||||
Err(_) => return SyncLatencies::default(),
|
||||
};
|
||||
let job = match fio_data.jobs.iter().find(|j| j.jobname == workload_name) {
|
||||
Some(j) => j,
|
||||
None => return SyncLatencies::default(),
|
||||
};
|
||||
let sync = match &job.sync {
|
||||
Some(s) => s,
|
||||
None => return SyncLatencies::default(),
|
||||
};
|
||||
|
||||
let p = sync.lat_ns.percentile.as_ref();
|
||||
SyncLatencies {
|
||||
total_ios: sync.total_ios,
|
||||
mean_ms: sync.lat_ns.mean / 1_000_000.0,
|
||||
p50_ms: get_percentile(p, "50.000000"),
|
||||
p95_ms: get_percentile(p, "95.000000"),
|
||||
p99_ms: get_percentile(p, "99.000000"),
|
||||
p99_9_ms: get_percentile(p, "99.900000"),
|
||||
p99_99_ms: get_percentile(p, "99.990000"),
|
||||
max_ms: get_percentile(p, "100.000000"),
|
||||
}
|
||||
}
|
||||
|
||||
/// Looks up percentile `key` in an optional percentile map and converts the
/// value from nanoseconds to milliseconds. A missing map or key yields `0.0`.
fn get_percentile(p: Option<&std::collections::HashMap<String, f64>>, key: &str) -> f64 {
    let nanos = match p {
        Some(table) => table.get(key).copied().unwrap_or(0.0),
        None => 0.0,
    };
    nanos / 1_000_000.0
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
struct RedpandaCsvRow {
|
||||
label: String,
|
||||
mode: String,
|
||||
workload: String,
|
||||
node: String,
|
||||
test_name: String,
|
||||
iops: f64,
|
||||
bandwidth_kibps: f64,
|
||||
latency_mean_ms: f64,
|
||||
latency_stddev_ms: f64,
|
||||
clat_p50_ms: f64,
|
||||
clat_p95_ms: f64,
|
||||
clat_p99_ms: f64,
|
||||
clat_p99_9_ms: f64,
|
||||
clat_p99_99_ms: f64,
|
||||
clat_max_ms: f64,
|
||||
fdatasync_total_ios: u64,
|
||||
fdatasync_mean_ms: f64,
|
||||
fdatasync_p50_ms: f64,
|
||||
fdatasync_p95_ms: f64,
|
||||
fdatasync_p99_ms: f64,
|
||||
fdatasync_p99_9_ms: f64,
|
||||
fdatasync_p99_99_ms: f64,
|
||||
fdatasync_max_ms: f64,
|
||||
}
|
||||
|
||||
#[derive(Debug, serde::Serialize)]
|
||||
struct DashRow {
|
||||
label: String,
|
||||
test_name: String,
|
||||
iops: f64,
|
||||
bandwidth_kibps: f64,
|
||||
latency_mean_ms: f64,
|
||||
latency_stddev_ms: f64,
|
||||
fdatasync_p99_9_ms: f64,
|
||||
}
|
||||
|
||||
impl From<&RedpandaCsvRow> for DashRow {
|
||||
fn from(row: &RedpandaCsvRow) -> Self {
|
||||
DashRow {
|
||||
label: format!("{}-{}-{}", row.label, row.mode, row.workload),
|
||||
test_name: row.test_name.clone(),
|
||||
iops: row.iops,
|
||||
bandwidth_kibps: row.bandwidth_kibps,
|
||||
latency_mean_ms: row.latency_mean_ms,
|
||||
latency_stddev_ms: row.latency_stddev_ms,
|
||||
fdatasync_p99_9_ms: row.fdatasync_p99_9_ms,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
mod tests {
    use super::*;
    // Imported explicitly so the tests do not depend on what the parent module
    // happens to import.
    use std::collections::HashMap;

    /// Absolute tolerance used when comparing millisecond values.
    const TOLERANCE_MS: f64 = 0.001;

    /// Column names of `RedpandaCsvRow` in declaration order, which is the
    /// order the CSV writer emits them in.
    const REDPANDA_CSV_COLUMNS: &[&str] = &[
        "label",
        "mode",
        "workload",
        "node",
        "test_name",
        "iops",
        "bandwidth_kibps",
        "latency_mean_ms",
        "latency_stddev_ms",
        "clat_p50_ms",
        "clat_p95_ms",
        "clat_p99_ms",
        "clat_p99_9_ms",
        "clat_p99_99_ms",
        "clat_max_ms",
        "fdatasync_total_ios",
        "fdatasync_mean_ms",
        "fdatasync_p50_ms",
        "fdatasync_p95_ms",
        "fdatasync_p99_ms",
        "fdatasync_p99_9_ms",
        "fdatasync_p99_99_ms",
        "fdatasync_max_ms",
    ];

    // -- shared helpers --

    /// Asserts that `actual` is within `TOLERANCE_MS` of `expected`.
    #[track_caller]
    fn assert_close(actual: f64, expected: f64) {
        assert!(
            (actual - expected).abs() < TOLERANCE_MS,
            "expected {expected}, got {actual}"
        );
    }

    /// Asserts that every field of `sync` carries its zero default.
    #[track_caller]
    fn assert_sync_is_default(sync: &SyncLatencies) {
        assert_eq!(sync.total_ios, 0);
        for value in [
            sync.mean_ms,
            sync.p50_ms,
            sync.p95_ms,
            sync.p99_ms,
            sync.p99_9_ms,
            sync.p99_99_ms,
            sync.max_ms,
        ] {
            assert_eq!(value, 0.0);
        }
    }

    /// Builds a single-job fio JSON document.
    ///
    /// The `read` and `write` sections are identical for every fixture; the
    /// `sync` section is attached only when one is supplied.
    fn sample_fio_json(jobname: &str, sync: Option<serde_json::Value>) -> String {
        let mut job = serde_json::json!({
            "jobname": jobname,
            "read": {
                "bw": 0.0, "iops": 0.0,
                "clat_ns": { "mean": 0.0, "stddev": 0.0 }
            },
            "write": {
                "bw": 51200.0, "iops": 3200.0,
                "clat_ns": {
                    "mean": 5_000_000.0, "stddev": 1_000_000.0,
                    "percentile": {
                        "50.000000": 4_000_000.0,
                        "95.000000": 8_000_000.0,
                        "99.000000": 12_000_000.0,
                        "99.900000": 20_000_000.0,
                        "99.990000": 30_000_000.0,
                        "100.000000": 50_000_000.0
                    }
                }
            }
        });
        if let Some(sync) = sync {
            job["sync"] = sync;
        }
        serde_json::json!({ "jobs": [job] }).to_string()
    }

    /// fio output for the `fsync_hot_path` job, including a `sync` section.
    fn sample_fio_json_with_sync() -> String {
        let sync = serde_json::json!({
            "total_ios": 50000_u64,
            "lat_ns": {
                "mean": 2_000_000.0, "stddev": 500_000.0,
                "percentile": {
                    "50.000000": 1_500_000.0,
                    "95.000000": 3_000_000.0,
                    "99.000000": 5_000_000.0,
                    "99.900000": 10_000_000.0,
                    "99.990000": 20_000_000.0,
                    "100.000000": 50_000_000.0
                }
            }
        });
        sample_fio_json("fsync_hot_path", Some(sync))
    }

    /// fio output for the `throughput` job, which has no `sync` section.
    fn sample_fio_json_no_sync() -> String {
        sample_fio_json("throughput", None)
    }

    /// A fully populated row for a workload that issues fdatasync calls.
    fn sample_row() -> RedpandaCsvRow {
        RedpandaCsvRow {
            label: "redpanda".to_string(),
            mode: "parallel".to_string(),
            workload: "fsync_hot_path".to_string(),
            node: "iobench-redpanda-0".to_string(),
            test_name: "fsync_hot_path".to_string(),
            iops: 1000.0,
            bandwidth_kibps: 16000.0,
            latency_mean_ms: 5.0,
            latency_stddev_ms: 1.0,
            clat_p50_ms: 4.0,
            clat_p95_ms: 8.0,
            clat_p99_ms: 12.0,
            clat_p99_9_ms: 20.0,
            clat_p99_99_ms: 30.0,
            clat_max_ms: 50.0,
            fdatasync_total_ios: 50000,
            fdatasync_mean_ms: 2.0,
            fdatasync_p50_ms: 1.5,
            fdatasync_p95_ms: 3.0,
            fdatasync_p99_ms: 5.0,
            fdatasync_p99_9_ms: 10.0,
            fdatasync_p99_99_ms: 20.0,
            fdatasync_max_ms: 50.0,
        }
    }

    // -- fmt_duration --

    #[test]
    fn fmt_duration_seconds_only() {
        for (secs, expected) in [(0, "0s"), (1, "1s"), (59, "59s")] {
            assert_eq!(fmt_duration(secs), expected, "fmt_duration({secs})");
        }
    }

    #[test]
    fn fmt_duration_minutes_and_seconds() {
        for (secs, expected) in [
            (60, "1m00s"),
            (61, "1m01s"),
            (125, "2m05s"),
            (3599, "59m59s"),
        ] {
            assert_eq!(fmt_duration(secs), expected, "fmt_duration({secs})");
        }
    }

    #[test]
    fn fmt_duration_hours() {
        for (secs, expected) in [
            (3600, "1h00m"),
            (3660, "1h01m"),
            (7200, "2h00m"),
            (5432, "1h30m"),
        ] {
            assert_eq!(fmt_duration(secs), expected, "fmt_duration({secs})");
        }
    }

    // -- total_runtime_secs --

    #[test]
    fn total_runtime_secs_matches_workload_sum() {
        let wl = workloads();
        let expected: u64 = wl.iter().map(|w| BARRIER_SECS + w.runtime_secs).sum();
        assert_eq!(total_runtime_secs(&wl), expected);
        // 4 workloads: (60+300) + (60+600) + (60+120) + (60+120) = 1380
        assert_eq!(expected, 1380);
    }

    // -- Workload runtime consistency --
    // Verify that each Workload.runtime_secs matches the runtime= value
    // in its fio template. Prevents the two sources of truth from diverging.

    #[test]
    fn workload_runtime_matches_fio_template() {
        for w in workloads() {
            let runtime_from_template = w
                .template
                .lines()
                .find_map(|line| line.trim().strip_prefix("runtime=")?.parse::<u64>().ok())
                .unwrap_or_else(|| {
                    panic!("Could not find runtime= in fio template for '{}'", w.name)
                });
            assert_eq!(
                w.runtime_secs, runtime_from_template,
                "Workload '{}': struct says {}s but template says runtime={}",
                w.name, w.runtime_secs, runtime_from_template
            );
        }
    }

    // -- parse_sync_latencies --

    #[test]
    fn parse_sync_latencies_extracts_correctly() {
        let json = sample_fio_json_with_sync();
        let sync = parse_sync_latencies(&json, "fsync_hot_path");

        assert_eq!(sync.total_ios, 50000);
        assert_close(sync.mean_ms, 2.0);
        assert_close(sync.p50_ms, 1.5);
        assert_close(sync.p95_ms, 3.0);
        assert_close(sync.p99_ms, 5.0);
        assert_close(sync.p99_9_ms, 10.0);
        assert_close(sync.p99_99_ms, 20.0);
        assert_close(sync.max_ms, 50.0);
    }

    #[test]
    fn parse_sync_latencies_wrong_job_returns_default() {
        let json = sample_fio_json_with_sync();
        assert_sync_is_default(&parse_sync_latencies(&json, "nonexistent"));
    }

    #[test]
    fn parse_sync_latencies_invalid_json_returns_default() {
        assert_sync_is_default(&parse_sync_latencies("not json", "fsync_hot_path"));
    }

    #[test]
    fn parse_sync_latencies_no_sync_field_returns_default() {
        let json = sample_fio_json_no_sync();
        assert_sync_is_default(&parse_sync_latencies(&json, "throughput"));
    }

    // -- get_percentile --

    #[test]
    fn get_percentile_extracts_and_converts_ns_to_ms() {
        let mut m: HashMap<String, f64> = HashMap::new();
        m.insert("99.000000".to_string(), 12_000_000.0);
        assert_close(get_percentile(Some(&m), "99.000000"), 12.0);
    }

    #[test]
    fn get_percentile_missing_key_returns_zero() {
        let m: HashMap<String, f64> = HashMap::new();
        assert_eq!(get_percentile(Some(&m), "99.000000"), 0.0);
    }

    #[test]
    fn get_percentile_none_map_returns_zero() {
        assert_eq!(get_percentile(None, "99.000000"), 0.0);
    }

    // -- parse_fio_output integration --

    #[test]
    fn parse_fio_output_with_sync_workload() {
        let json = sample_fio_json_with_sync();
        let result = parse_fio_output(&json, "fsync_hot_path", "write").unwrap();

        assert_close(result.iops, 3200.0);
        assert_close(result.bandwidth_kibps, 51200.0);
        assert_close(result.latency_mean_ms, 5.0);
        assert_close(result.clat_p50_ms, 4.0);
        assert_close(result.clat_p99_ms, 12.0);
        assert_close(result.clat_p99_9_ms, 20.0);
        assert_close(result.clat_max_ms, 50.0);
    }

    #[test]
    fn parse_fio_output_no_sync_workload() {
        let json = sample_fio_json_no_sync();
        let result = parse_fio_output(&json, "throughput", "write").unwrap();

        assert_close(result.iops, 3200.0);
        assert_close(result.clat_p99_ms, 12.0);
    }

    #[test]
    fn parse_fio_output_wrong_job_name_errors() {
        let json = sample_fio_json_no_sync();
        let result = parse_fio_output(&json, "nonexistent", "write");
        assert!(result.is_err());
        assert!(result.unwrap_err().contains("nonexistent"));
    }

    // -- DashRow label formatting --

    #[test]
    fn dash_row_label_format() {
        let row = sample_row();
        let dash = DashRow::from(&row);
        assert_eq!(dash.label, "redpanda-parallel-fsync_hot_path");
        assert_close(dash.fdatasync_p99_9_ms, 10.0);
    }

    // -- CSV serialization round-trip --

    #[test]
    fn redpanda_csv_row_serializes_all_columns() {
        // A workload without fdatasync activity: all sync columns are zero.
        let row = RedpandaCsvRow {
            mode: "sequential".to_string(),
            workload: "throughput".to_string(),
            node: "node-0".to_string(),
            test_name: "throughput".to_string(),
            iops: 3200.0,
            bandwidth_kibps: 51200.0,
            fdatasync_total_ios: 0,
            fdatasync_mean_ms: 0.0,
            fdatasync_p50_ms: 0.0,
            fdatasync_p95_ms: 0.0,
            fdatasync_p99_ms: 0.0,
            fdatasync_p99_9_ms: 0.0,
            fdatasync_p99_99_ms: 0.0,
            fdatasync_max_ms: 0.0,
            ..sample_row()
        };

        let mut wtr = csv::Writer::from_writer(Vec::new());
        wtr.serialize(&row).unwrap();
        wtr.flush().unwrap();
        let bytes = wtr.into_inner().unwrap();

        // Read the output back with a real CSV parser instead of splitting on
        // commas, so quoting can never skew the field count.
        let mut rdr = csv::Reader::from_reader(bytes.as_slice());

        // The header must list exactly the expected columns, in order. A
        // substring check would miss reordered columns.
        let headers = rdr.headers().unwrap().clone();
        let names: Vec<&str> = headers.iter().collect();
        assert_eq!(names, REDPANDA_CSV_COLUMNS);

        // Exactly one data record, with one field per column.
        let record = rdr
            .records()
            .next()
            .expect("CSV output is missing the data row")
            .unwrap();
        assert_eq!(record.len(), headers.len());
        assert_eq!(record.get(0), Some("redpanda"));
        assert!(rdr.records().next().is_none());
    }
}
|
||||
123
iobench/src/simple.rs
Normal file
123
iobench/src/simple.rs
Normal file
@@ -0,0 +1,123 @@
|
||||
use std::fs;
|
||||
use std::io;
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use chrono::Local;
|
||||
|
||||
use crate::fio::parse_fio_output;
|
||||
use crate::run_command;
|
||||
|
||||
/// Settings for the simple fio benchmark driven by [`run`].
#[derive(Debug, Clone)]
pub struct SimpleArgs {
    /// Target handed to `run_command` for every command that is executed.
    pub target: String,
    /// Directory in which fio creates its `iobench_testfile`.
    pub benchmark_dir: String,
    /// Comma-separated list of test names to run.
    pub tests: String,
    /// Value passed to fio as `--runtime`.
    pub duration: u64,
    /// Where `summary.csv` is written; when `None`, a timestamped
    /// `./iobench-<date>` directory is used.
    pub output_dir: Option<String>,
    /// Value passed to fio as `--size`.
    pub size: String,
    /// Value passed to fio as `--bs`; also part of each fio job name.
    pub block_size: String,
}
|
||||
|
||||
pub fn run(args: &SimpleArgs) -> io::Result<()> {
|
||||
let output_dir = args
|
||||
.output_dir
|
||||
.clone()
|
||||
.unwrap_or_else(|| format!("./iobench-{}", Local::now().format("%Y-%m-%d-%H%M%S")));
|
||||
fs::create_dir_all(&output_dir)?;
|
||||
|
||||
let tests_to_run: Vec<&str> = args.tests.split(',').collect();
|
||||
let mut results = Vec::new();
|
||||
|
||||
for test in tests_to_run {
|
||||
println!("--------------------------------------------------");
|
||||
println!("Running test: {}", test);
|
||||
|
||||
let (rw, numjobs) = match test {
|
||||
"read" => ("read", 1),
|
||||
"write" => ("write", 1),
|
||||
"randread" => ("randread", 1),
|
||||
"randwrite" => ("randwrite", 1),
|
||||
"multiread" => ("read", 4),
|
||||
"multiwrite" => ("write", 4),
|
||||
"multirandread" => ("randread", 4),
|
||||
"multirandwrite" => ("randwrite", 4),
|
||||
_ => {
|
||||
eprintln!("Unknown test: {}. Skipping.", test);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let test_name = format!("{}-{}-sync-test", test, args.block_size);
|
||||
let fio_command = format!(
|
||||
"fio --filename={}/iobench_testfile --direct=1 --fsync=1 --rw={} --bs={} --numjobs={} --iodepth=1 --runtime={} --time_based --group_reporting --name={} --size={} --output-format=json",
|
||||
args.benchmark_dir, rw, args.block_size, numjobs, args.duration, test_name, args.size
|
||||
);
|
||||
|
||||
println!("Executing command:\n{}\n", fio_command);
|
||||
|
||||
let output = match run_command(&args.target, &fio_command) {
|
||||
Ok(out) => out,
|
||||
Err(e) => {
|
||||
eprintln!("Failed to execute command for test {}: {}", test, e);
|
||||
continue;
|
||||
}
|
||||
};
|
||||
|
||||
let result = parse_fio_output(&output, &test_name, rw);
|
||||
match result {
|
||||
Ok(res) => {
|
||||
results.push(res);
|
||||
}
|
||||
Err(e) => {
|
||||
eprintln!("Error parsing fio output for test {}: {}", test, e);
|
||||
eprintln!("Raw output:\n{}", output);
|
||||
}
|
||||
}
|
||||
|
||||
println!("{output}");
|
||||
println!("Test {} completed.", test);
|
||||
thread::sleep(Duration::from_secs(2));
|
||||
}
|
||||
|
||||
println!("--------------------------------------------------");
|
||||
println!("Cleaning up test file on target...");
|
||||
let cleanup_command = "rm -f ./iobench_testfile";
|
||||
if let Err(e) = run_command(&args.target, cleanup_command) {
|
||||
eprintln!("Warning: Failed to clean up test file on target: {}", e);
|
||||
} else {
|
||||
println!("Cleanup successful.");
|
||||
}
|
||||
|
||||
if results.is_empty() {
|
||||
println!("\nNo benchmark results to display.");
|
||||
return Ok(());
|
||||
}
|
||||
|
||||
let csv_path = format!("{}/summary.csv", output_dir);
|
||||
let mut wtr = csv::Writer::from_path(&csv_path)?;
|
||||
for result in &results {
|
||||
wtr.serialize(result)?;
|
||||
}
|
||||
wtr.flush()?;
|
||||
|
||||
println!("\nBenchmark summary saved to {}", csv_path);
|
||||
println!("\n--- Benchmark Results Summary ---");
|
||||
println!(
|
||||
"{:<25} {:>10} {:>18} {:>20} {:>22}",
|
||||
"Test Name", "IOPS", "Bandwidth (KiB/s)", "Latency Mean (ms)", "Latency StdDev (ms)"
|
||||
);
|
||||
println!("{:-<98}", "");
|
||||
for result in results {
|
||||
println!(
|
||||
"{:<25} {:>10.2} {:>18.2} {:>20.4} {:>22.4}",
|
||||
result.test_name,
|
||||
result.iops,
|
||||
result.bandwidth_kibps,
|
||||
result.latency_mean_ms,
|
||||
result.latency_stddev_ms
|
||||
);
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
Reference in New Issue
Block a user