Files
harmony/fleet/harmony-fleet-operator/src/service/mock.rs
2026-05-20 12:03:19 -04:00

707 lines
22 KiB
Rust

use std::collections::{HashMap, HashSet};
use std::sync::Mutex;
use async_trait::async_trait;
use super::{
Activity, Alert, AlertSeverity, DashboardDetail, DeploymentDetail, DeploymentStatus,
DeviceDetail, DeviceStatus, FleetService, TaskGraph, TaskNode, TaskStatus,
};
/// In-memory [`FleetService`] implementation backed by seeded demo data.
///
/// Each collection sits behind its own [`Mutex`] so the `&self` trait methods
/// can mutate it (for example blacklisting a device or acknowledging an alert).
pub struct MockFleetService {
/// Device inventory, populated from `seed_devices` by `with_seeded_data`.
devices: Mutex<Vec<DeviceDetail>>,
/// Deployment summaries, populated from `seed_deployments` by `with_seeded_data`.
deployments: Mutex<Vec<DeploymentDetail>>,
/// Alert list, populated from `seed_alerts` by `with_seeded_data`.
alerts: Mutex<Vec<Alert>>,
}
impl Default for MockFleetService {
fn default() -> Self {
Self::with_seeded_data()
}
}
impl MockFleetService {
pub fn with_seeded_data() -> Self {
let devices = seed_devices();
let deployments = seed_deployments();
let alerts = seed_alerts();
Self {
devices: Mutex::new(devices),
deployments: Mutex::new(deployments),
alerts: Mutex::new(alerts),
}
}
}
// ── Seeded PRNG ────────────────────────────────────────────────────────
/// Small deterministic linear congruential generator used to seed the mock data.
///
/// The wrapped `u32` is the generator state; it is masked down to 31 bits after
/// every step.
struct Rng(u32);

impl Iterator for Rng {
    type Item = f64;

    /// Advances the state and returns it scaled by `0x7fffffff`.
    ///
    /// The iterator never ends (always `Some`). The result lies in `0.0..=1.0`;
    /// note that `1.0` itself is reachable when the masked state equals
    /// `0x7fffffff`, so callers that turn the value into an index must clamp.
    fn next(&mut self) -> Option<f64> {
        const MULTIPLIER: u32 = 1_103_515_245;
        const INCREMENT: u32 = 12_345;
        const MASK: u32 = 0x7fff_ffff;

        let stepped = self.0.wrapping_mul(MULTIPLIER).wrapping_add(INCREMENT);
        self.0 = stepped & MASK;
        Some(f64::from(self.0) / f64::from(MASK))
    }
}
/// Returns a pseudo-randomly chosen element of `arr`, consuming one draw from `rng`.
///
/// The draw is scaled by the slice length and truncated to an index. Because the
/// generator can yield exactly `1.0`, the index is clamped to the last element.
///
/// # Panics
///
/// Panics if `arr` is empty.
fn pick<'a, T>(rng: &mut Rng, arr: &'a [T]) -> &'a T {
    let draw = rng.next().unwrap();
    let scaled = (draw * arr.len() as f64) as usize;
    let last = arr.len() - 1;
    &arr[scaled.min(last)]
}
// ── Device seed ────────────────────────────────────────────────────────
/// Builds the 100-device demo fleet from a fixed-seed PRNG.
///
/// Every generated attribute comes from the same `Rng(1337)` stream, so the
/// order of the draws below is part of the output: reordering them changes
/// the generated ids, addresses, tags and so on. Only `last_seen` differs
/// between calls, because it is anchored to the current UTC time.
fn seed_devices() -> Vec<DeviceDetail> {
    const FLEET_SIZE: usize = 100;

    let mut rng = Rng(1337);

    let host_kinds = ["edge", "sensor", "gw", "cam", "relay", "meter", "hub"];
    let region_pool = [
        "eu-paris-1",
        "eu-paris-2",
        "us-east-1",
        "us-west-2",
        "apac-tokyo-1",
    ];
    let model_pool = [
        "HF-Edge-2",
        "HF-Edge-3",
        "HF-Sensor-S1",
        "HF-Gateway-G2",
        "HF-Cam-V1",
    ];
    let tag_pool = [
        "prod", "staging", "lab", "eu", "us", "apac", "gpu", "lowpower", "thermal", "pilot",
    ];
    // Cumulative-probability table used for every device after the first six.
    let status_weights: [(DeviceStatus, f64); 5] = [
        (DeviceStatus::Healthy, 0.72),
        (DeviceStatus::Pending, 0.10),
        (DeviceStatus::Stale, 0.10),
        (DeviceStatus::Blacklisted, 0.04),
        (DeviceStatus::Unknown, 0.04),
    ];
    // Consecutive runs of devices per deployment. The counts add up to more
    // than FLEET_SIZE, so the tail of this table is cut off by `take` below.
    let rollout_sizes: [(&str, usize); 7] = [
        ("edge-gateway", 32),
        ("sensor-firmware", 41),
        ("ingest-pipeline", 8),
        ("control-plane", 6),
        ("telemetry-collector", 12),
        ("gateway-proxy", 4),
        ("media-relay", 9),
    ];
    // Exactly FLEET_SIZE slots: the runs above first, padded with `None` if
    // the table were ever to cover fewer devices than the fleet size.
    let slots = rollout_sizes
        .iter()
        .flat_map(|&(name, count)| std::iter::repeat(Some(name)).take(count))
        .chain(std::iter::repeat(None))
        .take(FLEET_SIZE);

    let now = chrono::Utc::now();
    let mut devices = Vec::with_capacity(FLEET_SIZE);

    for (i, slot) in slots.enumerate() {
        // Draw 1: host kind, which becomes part of the id.
        let host = pick(&mut rng, &host_kinds);
        let id = format!("hf-{}-{:03}", host, i + 1);

        // The first six devices have pinned statuses (and consume no draw);
        // everyone else rolls against the weight table.
        let status = match i {
            0..=3 => DeviceStatus::Healthy,
            4 => DeviceStatus::Failing,
            5 => DeviceStatus::Pending,
            _ => {
                let roll = rng.next().unwrap();
                let mut cumulative = 0.0;
                status_weights
                    .iter()
                    .find_map(|&(candidate, weight)| {
                        cumulative += weight;
                        if roll < cumulative {
                            Some(candidate)
                        } else {
                            None
                        }
                    })
                    .unwrap_or(DeviceStatus::Healthy)
            }
        };

        // How long ago the device was last seen depends on its status.
        let (floor, span) = match status {
            DeviceStatus::Stale => (60, 4000.0),
            DeviceStatus::Pending => (0, 5.0),
            DeviceStatus::Blacklisted => (600, 8000.0),
            _ => (0, 12.0),
        };
        let minutes_ago = floor + (rng.next().unwrap() * span) as i64;
        let last_seen = now - chrono::Duration::minutes(minutes_ago);

        // Three draws for the address, in octet order.
        let second_octet = 20 + (rng.next().unwrap() * 4.0) as u8;
        let third_octet = (rng.next().unwrap() * 256.0) as u8;
        let fourth_octet = (rng.next().unwrap() * 256.0) as u8;
        let ip = format!("10.{}.{}.{}", second_octet, third_octet, fourth_octet);

        // One draw for the number of tag picks, then one pick per attempt;
        // duplicate picks are dropped, so a device may end up with fewer tags.
        let tag_draws = 1 + (rng.next().unwrap() * 3.0) as usize;
        let mut seen = HashSet::new();
        let tags: Vec<_> = (0..tag_draws)
            .map(|_| pick(&mut rng, &tag_pool).to_string())
            .filter(|tag| seen.insert(tag.clone()))
            .collect();

        let model = pick(&mut rng, &model_pool).to_string();
        let region = pick(&mut rng, &region_pool).to_string();
        let deployment = slot.map(str::to_string);

        // Three draws for the firmware version, major first.
        let fw_major = 1 + (rng.next().unwrap() * 3.0) as u8;
        let fw_minor = (rng.next().unwrap() * 20.0) as u8;
        let fw_patch = (rng.next().unwrap() * 10.0) as u8;
        let fw = format!("v{}.{}.{}", fw_major, fw_minor, fw_patch);

        // Stale devices report zero uptime and do not consume a draw here.
        let uptime_h = match status {
            DeviceStatus::Stale => 0,
            _ => (rng.next().unwrap() * 4200.0) as u32,
        };
        let cpu = 5 + (rng.next().unwrap() * 70.0) as u8;
        let mem = 15 + (rng.next().unwrap() * 70.0) as u8;

        devices.push(DeviceDetail {
            id,
            status,
            last_seen,
            minutes_ago,
            deployment,
            ip: Some(ip),
            region,
            model,
            fw,
            tags,
            uptime_h,
            cpu,
            mem,
        });
    }

    // Force the first two control-plane devices to failing.
    devices
        .iter_mut()
        .filter(|device| device.deployment.as_deref() == Some("control-plane"))
        .take(2)
        .for_each(|device| device.status = DeviceStatus::Failing);

    devices
}
// ── Deployment seed ────────────────────────────────────────────────────
/// Returns the seven hard-coded demo deployments, in display order.
fn seed_deployments() -> Vec<DeploymentDetail> {
    // Row builder. The tuple argument is (target, healthy, failing, pending).
    let rollout = |name: &str,
                   version: &str,
                   status,
                   (target, healthy, failing, pending),
                   updated_at: &str,
                   author: &str| DeploymentDetail {
        name: name.into(),
        version: version.into(),
        status,
        target,
        healthy,
        failing,
        pending,
        updated_at: updated_at.into(),
        author: author.into(),
    };

    vec![
        rollout(
            "edge-gateway",
            "v2.14.1",
            DeploymentStatus::Active,
            (32, 31, 0, 1),
            "2026-05-19 04:12",
            "r.tarzalt",
        ),
        rollout(
            "sensor-firmware",
            "v0.9.3",
            DeploymentStatus::Rolling,
            (41, 28, 1, 12),
            "2026-05-19 06:48",
            "m.lavoie",
        ),
        rollout(
            "ingest-pipeline",
            "v1.7.0",
            DeploymentStatus::Active,
            (8, 8, 0, 0),
            "2026-05-15 11:30",
            "r.tarzalt",
        ),
        rollout(
            "control-plane",
            "v3.2.0",
            DeploymentStatus::Failing,
            (6, 3, 2, 1),
            "2026-05-19 07:01",
            "a.singh",
        ),
        rollout(
            "telemetry-collector",
            "v0.4.12",
            DeploymentStatus::Active,
            (12, 12, 0, 0),
            "2026-05-12 09:22",
            "m.lavoie",
        ),
        rollout(
            "gateway-proxy",
            "v1.0.5",
            DeploymentStatus::Paused,
            (4, 0, 0, 4),
            "2026-05-18 18:14",
            "r.tarzalt",
        ),
        rollout(
            "media-relay",
            "v2.0.0-rc.3",
            DeploymentStatus::Rolling,
            (9, 5, 0, 4),
            "2026-05-19 06:55",
            "a.singh",
        ),
    ]
}
// ── Alerts seed ────────────────────────────────────────────────────────
/// Returns the five hard-coded demo alerts; only the last one starts acknowledged.
fn seed_alerts() -> Vec<Alert> {
    // Row builder: `deployment` and `device` are optional references by name/id.
    let alert = |id: &str,
                 severity,
                 title: &str,
                 deployment: Option<&str>,
                 device: Option<&str>,
                 at: &str,
                 acked| Alert {
        id: id.into(),
        severity,
        title: title.into(),
        deployment: deployment.map(|name| name.into()),
        device: device.map(|name| name.into()),
        at: at.into(),
        acked,
    };

    vec![
        alert(
            "al-1",
            AlertSeverity::Critical,
            "control-plane rollout failing on 2 devices",
            Some("control-plane"),
            Some("hf-gw-018"),
            "2 min ago",
            false,
        ),
        alert(
            "al-2",
            AlertSeverity::Critical,
            "hf-sensor-042 unreachable for 14 minutes",
            Some("sensor-firmware"),
            Some("hf-sensor-042"),
            "14 min ago",
            false,
        ),
        alert(
            "al-3",
            AlertSeverity::Warning,
            "sensor-firmware rollout stalled at 68%",
            Some("sensor-firmware"),
            None,
            "22 min ago",
            false,
        ),
        alert(
            "al-4",
            AlertSeverity::Warning,
            "hf-cam-011 reporting elevated thermal (78°C)",
            None,
            Some("hf-cam-011"),
            "1h ago",
            false,
        ),
        alert(
            "al-5",
            AlertSeverity::Info,
            "edge-gateway v2.14.1 deployed to 31 devices",
            Some("edge-gateway"),
            None,
            "3h ago",
            true,
        ),
    ]
}
// ── Activity feed ─────────────────────────────────────────────────────
/// Returns the static activity feed shown on the dashboard, newest entry first.
fn activity_feed() -> Vec<Activity> {
    // (who, verb, target, at); an empty target means the verb has no object.
    let entries = [
        ("r.tarzalt", "started rollout", "sensor-firmware v0.9.3", "07:24"),
        ("system", "auto-blacklisted", "hf-sensor-091", "07:18"),
        ("a.singh", "paused deployment", "gateway-proxy", "07:02"),
        ("system", "detected failure", "hf-gw-018 (control-plane)", "06:51"),
        ("m.lavoie", "updated task graph", "telemetry-collector", "06:14"),
        ("r.tarzalt", "logged in", "", "06:02"),
    ];

    entries
        .iter()
        .map(|&(who, verb, target, at)| Activity {
            who: who.into(),
            verb: verb.into(),
            target: target.into(),
            at: at.into(),
        })
        .collect()
}
// ── Trend generation ──────────────────────────────────────────────────
/// Generates the 48-point ingest-rate series: a sine wave around 38 with
/// seeded jitter, floored at 2 and rounded to whole units.
///
/// The generator is re-seeded on every call, so the series is identical each time.
fn ingest_trend() -> Vec<u32> {
    let mut noise = Rng(1337);
    let mut samples = Vec::with_capacity(48);
    for slot in 0..48 {
        let wave = (slot as f64 / 4.0).sin() * 8.0;
        let jitter = noise.next().unwrap() * 6.0;
        let level = 38.0 + wave + jitter;
        samples.push(level.max(2.0).round() as u32);
    }
    samples
}
/// Generates the 48-point fleet-health series: a gentle sine wave around 96
/// with seeded jitter and a 6-point dip for slots 39 through 43, rounded to
/// one decimal place.
///
/// The generator is re-seeded on every call, so the series is identical each time.
fn health_trend() -> Vec<f64> {
    let mut noise = Rng(1337);
    let mut samples = Vec::with_capacity(48);
    for slot in 0..48 {
        let wave = (slot as f64 / 6.0).sin() * 1.4;
        let dip = if (39..=43).contains(&slot) { 6.0 } else { 0.0 };
        let jitter = noise.next().unwrap() * 0.4;
        let value = 96.0 + wave - dip + jitter;
        samples.push((value * 10.0).round() / 10.0);
    }
    samples
}
// ── Task graph ─────────────────────────────────────────────────────────
/// Returns the static eight-step rollout task graph: a linear chain `t1..t5`
/// that fans out to `t6`/`t7` and joins again at `t8`.
fn task_graph() -> TaskGraph {
    // Layout cell for each node id.
    let layout = [
        ("t1", (0, 1)),
        ("t2", (1, 1)),
        ("t3", (2, 1)),
        ("t4", (3, 1)),
        ("t5", (4, 1)),
        ("t6", (5, 0)),
        ("t7", (5, 2)),
        ("t8", (6, 1)),
    ];
    let mut positions = HashMap::new();
    for &(id, cell) in &layout {
        positions.insert(id.into(), cell);
    }

    // Node builder; an empty duration means the task has not produced one yet.
    let node = |id: &str, label: &str, status, duration: &str| TaskNode {
        id: id.into(),
        label: label.into(),
        status,
        duration: duration.into(),
    };

    // Directed edges as (from, to) node ids.
    let links = [
        ("t1", "t2"),
        ("t2", "t3"),
        ("t3", "t4"),
        ("t4", "t5"),
        ("t5", "t6"),
        ("t5", "t7"),
        ("t6", "t8"),
        ("t7", "t8"),
    ];

    TaskGraph {
        nodes: vec![
            node("t1", "fetch artifact", TaskStatus::Done, "2s"),
            node("t2", "verify signature", TaskStatus::Done, "0.4s"),
            node("t3", "stop services", TaskStatus::Done, "1.1s"),
            node("t4", "install deps", TaskStatus::Running, "12s"),
            node("t5", "mount volumes", TaskStatus::Pending, ""),
            node("t6", "launch sensord", TaskStatus::Pending, ""),
            node("t7", "launch relayd", TaskStatus::Pending, ""),
            node("t8", "health probe", TaskStatus::Pending, ""),
        ],
        edges: links
            .iter()
            .map(|&(from, to)| (from.into(), to.into()))
            .collect(),
        positions,
    }
}
// ── FleetService impl ─────────────────────────────────────────────────
/// [`FleetService`] backed entirely by the in-memory seeded collections.
///
/// Every method locks the mutex(es) it needs for the duration of the call and
/// never awaits while holding a guard. All methods panic if a mutex has been
/// poisoned by an earlier panic.
#[async_trait]
impl FleetService for MockFleetService {
    /// Assembles the dashboard summary from the current in-memory state.
    ///
    /// Counts devices per status, derives the healthy percentage, and collects
    /// up to 12 devices needing attention (failing, stale or pending), up to
    /// 10 unacknowledged alerts, and the first 4 deployments.
    async fn dashboard_detail(&self) -> anyhow::Result<DashboardDetail> {
        let devices = self.devices.lock().unwrap();
        let deployments = self.deployments.lock().unwrap();
        let alerts = self.alerts.lock().unwrap();

        // Generate the ingest series once and reuse it for both the trend and
        // the current rate (its last sample).
        let ingest_series = ingest_trend();
        let ingest_rate = ingest_series.last().copied().unwrap_or(0);

        let mut detail = DashboardDetail {
            devices_total: devices.len() as u32,
            devices_healthy: 0,
            devices_pending: 0,
            devices_failing: 0,
            devices_stale: 0,
            devices_blacklisted: 0,
            devices_unknown: 0,
            deployments_total: deployments.len(),
            health_pct: 0,
            health_trend: health_trend(),
            ingest_rate,
            ingest_trend: ingest_series,
            attention_devices: devices
                .iter()
                .filter(|dev| {
                    matches!(
                        dev.status,
                        DeviceStatus::Failing | DeviceStatus::Stale | DeviceStatus::Pending
                    )
                })
                .take(12)
                .cloned()
                .collect(),
            activity_feed: activity_feed(),
            // Clone only the deployments that are actually shown.
            top_deployments: deployments.iter().take(4).cloned().collect(),
            active_alerts: alerts
                .iter()
                .filter(|alert| !alert.acked)
                .take(10)
                .cloned()
                .collect(),
            rolling_count: 0,
            failing_count: 0,
        };

        for dev in devices.iter() {
            match dev.status {
                DeviceStatus::Healthy => detail.devices_healthy += 1,
                DeviceStatus::Pending => detail.devices_pending += 1,
                DeviceStatus::Stale => detail.devices_stale += 1,
                DeviceStatus::Failing => detail.devices_failing += 1,
                DeviceStatus::Blacklisted => detail.devices_blacklisted += 1,
                DeviceStatus::Unknown => detail.devices_unknown += 1,
            }
        }

        // An empty fleet reports 0% rather than relying on the NaN -> 0 cast.
        detail.health_pct = if detail.devices_total == 0 {
            0
        } else {
            let ratio = detail.devices_healthy as f64 / detail.devices_total as f64;
            (ratio * 100.0).round() as u32
        };

        for dep in deployments.iter() {
            match dep.status {
                DeploymentStatus::Rolling => detail.rolling_count += 1,
                DeploymentStatus::Failing => detail.failing_count += 1,
                _ => {}
            }
        }

        Ok(detail)
    }

    /// Returns a snapshot of every device, in seed order.
    async fn list_devices(&self) -> anyhow::Result<Vec<DeviceDetail>> {
        Ok(self.devices.lock().unwrap().clone())
    }

    /// Looks up a single device by exact id; `Ok(None)` when it does not exist.
    async fn get_device(&self, id: &str) -> anyhow::Result<Option<DeviceDetail>> {
        let devices = self.devices.lock().unwrap();
        Ok(devices.iter().find(|dev| dev.id == id).cloned())
    }

    /// Returns a snapshot of every deployment, in seed order.
    async fn list_deployments(&self) -> anyhow::Result<Vec<DeploymentDetail>> {
        Ok(self.deployments.lock().unwrap().clone())
    }

    /// Looks up a single deployment by exact name; `Ok(None)` when it does not exist.
    async fn get_deployment(&self, name: &str) -> anyhow::Result<Option<DeploymentDetail>> {
        let deployments = self.deployments.lock().unwrap();
        Ok(deployments.iter().find(|dep| dep.name == name).cloned())
    }

    /// Returns the devices currently assigned to the deployment called `name`.
    ///
    /// An unknown name yields an empty list, not an error.
    async fn get_deployment_devices(&self, name: &str) -> anyhow::Result<Vec<DeviceDetail>> {
        let devices = self.devices.lock().unwrap();
        Ok(devices
            .iter()
            .filter(|dev| dev.deployment.as_deref() == Some(name))
            .cloned()
            .collect())
    }

    /// Marks the device as blacklisted, detaches it from its deployment, and
    /// returns the updated record.
    ///
    /// # Errors
    ///
    /// Returns an error if no device has the given id.
    async fn blacklist_device(&self, id: &str) -> anyhow::Result<DeviceDetail> {
        let mut devices = self.devices.lock().unwrap();
        let dev = devices
            .iter_mut()
            .find(|dev| dev.id == id)
            .ok_or_else(|| anyhow::anyhow!("device {id} not found"))?;
        dev.status = DeviceStatus::Blacklisted;
        dev.deployment = None;
        Ok(dev.clone())
    }

    /// Returns a snapshot of every alert, acknowledged or not.
    async fn list_alerts(&self) -> anyhow::Result<Vec<Alert>> {
        Ok(self.alerts.lock().unwrap().clone())
    }

    /// Acknowledges the alert with the given id.
    ///
    /// Returns `Ok(true)` if the alert exists (even if it was already
    /// acknowledged) and `Ok(false)` if it does not.
    async fn ack_alert(&self, id: &str) -> anyhow::Result<bool> {
        let mut alerts = self.alerts.lock().unwrap();
        match alerts.iter_mut().find(|alert| alert.id == id) {
            Some(alert) => {
                alert.acked = true;
                Ok(true)
            }
            None => Ok(false),
        }
    }

    /// Returns the static task graph; the deployment name is ignored, so every
    /// deployment (including unknown ones) gets the same graph.
    async fn get_task_graph(&self, _deployment: &str) -> anyhow::Result<TaskGraph> {
        Ok(task_graph())
    }

    /// Returns the devices matching every supplied filter, sorted by id.
    ///
    /// * `status`, `deployment`, `region` — exact matches; `None` disables the filter.
    /// * `search` — case-insensitive substring match against the device id,
    ///   deployment name and tags, plus a substring match against the IP address.
    async fn filtered_devices(
        &self,
        status: Option<DeviceStatus>,
        deployment: Option<String>,
        region: Option<String>,
        search: Option<String>,
    ) -> anyhow::Result<Vec<DeviceDetail>> {
        // Lower-case the query once instead of once per device.
        let needle = search.as_deref().map(str::to_lowercase);

        let devices = self.devices.lock().unwrap();
        let mut out: Vec<DeviceDetail> = devices
            .iter()
            .filter(|dev| {
                if let Some(wanted) = status {
                    if dev.status != wanted {
                        return false;
                    }
                }
                if let Some(wanted) = deployment.as_deref() {
                    if dev.deployment.as_deref() != Some(wanted) {
                        return false;
                    }
                }
                if let Some(wanted) = region.as_ref() {
                    if dev.region != *wanted {
                        return false;
                    }
                }
                match needle.as_deref() {
                    None => true,
                    Some(q) => {
                        dev.id.to_lowercase().contains(q)
                            || dev
                                .deployment
                                .as_deref()
                                .unwrap_or("")
                                .to_lowercase()
                                .contains(q)
                            || dev.ip.as_deref().unwrap_or("").contains(q)
                            || dev.tags.iter().any(|tag| tag.to_lowercase().contains(q))
                    }
                }
            })
            .cloned()
            .collect();
        out.sort_by(|a, b| a.id.cmp(&b.id));
        Ok(out)
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The seeded dashboard reports the full fleet and non-trivial health data.
    #[tokio::test]
    async fn dashboard_detail_counts() {
        let svc = MockFleetService::default();
        let d = svc.dashboard_detail().await.unwrap();
        assert_eq!(d.devices_total, 100);
        assert!(d.devices_healthy > 0);
        assert!(d.health_pct > 0);
        assert!(!d.activity_feed.is_empty());
    }

    /// Blacklisting a healthy device flips its status and detaches its deployment.
    #[tokio::test]
    async fn blacklist_flips_status() {
        let svc = MockFleetService::default();
        let dev = svc.get_device("hf-edge-001").await.unwrap().unwrap();
        assert_eq!(dev.status, DeviceStatus::Healthy);
        svc.blacklist_device("hf-edge-001").await.unwrap();
        let after = svc.get_device("hf-edge-001").await.unwrap().unwrap();
        assert_eq!(after.status, DeviceStatus::Blacklisted);
        assert!(after.deployment.is_none());
    }

    /// Filtering by status returns only devices with that status.
    #[tokio::test]
    async fn filtered_devices_by_status() {
        let svc = MockFleetService::default();
        let failing = svc
            .filtered_devices(Some(DeviceStatus::Failing), None, None, None)
            .await
            .unwrap();
        // The fifth seeded device is always Failing, so an empty result means
        // the filter is broken; without this check `all` would pass vacuously.
        assert!(!failing.is_empty());
        assert!(failing.iter().all(|d| d.status == DeviceStatus::Failing));
    }
}