feat(fleet-operator): aggregator recovery signal + orphan GC + recovery e2e (Ch2) #328

Open
johnride wants to merge 3 commits from feat/fleet-ch2-operator-recovery into feat/fleet-device-exec-logs
13 changed files with 1154 additions and 58 deletions

1
Cargo.lock generated
View File

@@ -4131,6 +4131,7 @@ dependencies = [
"anyhow",
"async-nats",
"async-trait",
"chrono",
"directories",
"futures-util",
"harmony",

View File

@@ -0,0 +1,219 @@
# Fleet operator recovery scenarios
The operator is **stateless across restarts**. It can be killed, upgraded, or
rescheduled at any time. On restart it cold-rebuilds from two durable sources —
kube CRs and NATS KV — with no customer-visible "unknown state" window.
## Architecture
```
┌─────────────────────────────────────────────────────────────────┐
│ Operator process │
│ │
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
│ │ Deployment │ │ Device │ │ Device-state KV │ │
│ │ CR watcher │ │ CR watcher │ │ watcher (history) │ │
│ └──────┬───────┘ └──────┬───────┘ └──────────┬───────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ FleetState (mutex) │ │
│ │ deployments: {key → CachedDeployment} │ │
│ │ devices: {name → labels} │ │
│ │ states: {(device,deployment) → DeploymentState} │ │
│ │ owned_targets: {deployment → {device…}} │ │
│ │ dirty: {key…} ──► patch_tick (1 Hz) │ │
│ └──────────────────────────────────────────────────────────┘ │
│ │ │
│ ▼ │
│ ┌──────────────────┐ ┌────────────────────────────────┐ │
│ │ desired-state KV │ │ Deployment.status.aggregate │ │
│ │ put / delete │ │ (patched to kube at 1 Hz) │ │
│ └──────────────────┘ └────────────────────────────────┘ │
│ │
│ ┌──────────────────────────────────────────────────────────┐ │
│ │ OperatorLiveness (3 atomic latches, shared w/ dashboard) │ │
│ │ deployments_ready ─┐ │ │
│ │ devices_ready ─────┼──► all three → Converged │ │
│ │ states_ready ──────┘ (else: Recovering → banner) │ │
│ └──────────────────────────────────────────────────────────┘ │
└─────────────────────────────────────────────────────────────────┘
```
### Cold-start sequence
```
Operator starts
├─ seed_owned_targets() reads desired-state KV keys
│ → populates owned_targets
│ (so first reconcile diffs correctly)
├─ spawn 3 watchers (JoinSet) │
│ ├─ Deployment CR watcher │ replay → InitDone → mark_deployments_ready
│ │ │ + gc_orphaned_desired_state
│ ├─ Device CR watcher │ replay → InitDone → mark_devices_ready
│ └─ Device-state KV watch │ replay → seen_current → mark_states_ready
│ │ (or: empty bucket → mark immediately)
└─ spawn patch_tick (1 Hz) flushes dirty CR status patches
```
### Convergence banner
```
Dashboard reads OperatorLiveness.phase():
Recovering ──────► Converged
┌──────────────┐ ┌─────┐
│ ⚠ rebuilding │ │ │ (empty div, polling stops)
│ polls /2s │ │ │
└──────────────┘ └─────┘
```
The banner self-clears when all three latches trip. Typically sub-second.
### Key invariants
| Invariant | Mechanism |
|---|---|
| Writes are byte-deterministic | Same CR → same serialized score JSON. Two operators write identical bytes. |
| No leader election needed | Idempotent multi-writer at current fleet size. (HA = D3, deferred.) |
| Cancel = clean teardown | `JoinSet` owns all watchers; dropping `run`'s future aborts children. |
| `owned_targets` seeded before first reconcile | `seed_owned_targets()` reads KV keys so diff doesn't orphan entries. |
| Orphan GC on `InitDone` | Deployment CRs deleted while operator was down → purged at convergence. |
| Out-of-order state dedup | `apply_state` rejects entries with older `last_event_at`. |
---
## Scenarios
### 1. Cold restart, healthy fleet
Operator killed and restarted. Devices kept reporting to KV throughout.
```
Time ──────────────────────────────────────────────────►
Operator: [running]─── KILL ───[restart]──────────────────►
├─ seed_owned_targets (from KV keys)
├─ replay CRs → InitDone
├─ replay device-state → seen_current
└─ Converged
KV: [desired-state ✓] [device-state ✓] (untouched)
Expected: desired-state re-written identically (agents byte-compare → no-op)
health counts rebuilt from device-state KV
liveness → Converged
```
**Test:** `aggregator_converges_from_kv_after_restart`
### 2. Stale KV — CR deleted while operator down
A Deployment CR is force-deleted (finalizer bypassed: namespace teardown,
`--force`) while no operator is running. The desired-state KV entry is orphaned.
```
Time ──────────────────────────────────────────────────►
Operator: [running]─── KILL ──────────────[restart]──────────►
│ │
│ ├─ seed_owned_targets
│ │ (picks up orphan key)
│ │
│ ├─ CR replay: InitDone
│ │ deployment NOT in cache
│ │
│ └─ gc_orphaned_desired_state
│ deletes orphan from KV
CR: [exists]─── force-delete ──[gone]─────────────────►
KV: [desired-state ✓]──────────[orphan]───[purged]────►
Expected: orphan desired-state deleted at convergence
agents stop running the dead deployment
```
**Test:** `aggregator_gcs_desired_state_for_deleted_cr`
### 3. Two operators racing (rolling deploy overlap)
Brief period where two operator replicas run simultaneously.
```
Time ──────────────────────────────────────────────────►
Operator A: [running]────────────────────────────────────────►
Operator B: [start]───────────────────────────────────►
├─ both seed_owned_targets (same KV keys)
├─ both replay same CRs
├─ both compute same matched_devices
└─ both put identical score_json to same KV keys
KV: [desired-state: bytes_A] == [desired-state: bytes_B]
Expected: KV value stable regardless of write order
no orphan, no flapping, no leader election needed
```
**Test:** `two_aggregators_produce_identical_desired_state`
### 4. Partial KV — device offline during restart
A device hasn't reported state (no `device-state` entry) when the operator
restarts.
```
Time ──────────────────────────────────────────────────►
Operator: [running]─── KILL ───[restart]──────────────────►
Device A: [reporting ✓]────────────────────────────────────►
Device B: [offline]────────────────────────────────────────►
(no device-state entry in KV)
Expected: Device A → Running (rebuilt from KV)
Device B → Pending (no state entry = pending in aggregate)
Device B recovers to Running when it reports
```
**Test:** `device_offline_during_restart_counts_as_pending`
### 5. Chaos — kill under write load
Operator repeatedly killed and restarted while deployments are being created.
```
Time ──────────────────────────────────────────────────►
Deployments: [create d0] [create d1] [create d2] [create d3] [create d4]
Operator: [running]──────── KILL ──────┘──[restart]──── KILL ──[final]──►
└─ converge
all 5
Expected: final operator converges full desired-state set within 30s
```
**Test:** `chaos_kill_under_write_load_converges`
---
## Running the regression tests
```bash
HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e --test operator_recovery
```
**Prerequisites:** k3d + podman on PATH. The shared harness brings up NATS in
a fresh namespace.
**How it works:** tests drive `fleet_aggregator::run` in-process against the
real NATS + k3d. `spawn_aggregator` returns a `JoinHandle`; `kill` aborts it
and awaits (ensuring the `JoinSet` children tear down before the next instance
starts).

View File

@@ -60,3 +60,4 @@ uuid = { version = "1", features = ["v4"] }
[dev-dependencies]
tokio = { workspace = true, features = ["full"] }
chrono = { workspace = true }

View File

@@ -172,6 +172,22 @@ impl AdminKv {
Ok(())
}
/// Publish a `device-state` entry directly, simulating an agent report.
/// Recovery tests use this to assert the operator rebuilds health counts
/// from KV alone after a restart.
pub async fn put_device_state(&self, state: &DeploymentState) -> Result<(), AdminKvError> {
Review

This should not exist on a live code path. At the very least use cfg test so it does not make it in the release binary.

This should not exist on a live code path. At the very least use cfg test so it does not make it in the release binary.
let key = device_state_key(&state.device_id.to_string(), &state.deployment);
let payload = serde_json::to_vec(state).map_err(AdminKvError::Serialize)?;
self.device_state
.put(&key, payload.into())
.await
.map_err(|e| AdminKvError::Put {
key,
source: Box::new(e),
})?;
Ok(())
}
/// Delete a desired-state entry. Triggers the agent's tombstone
/// reconcile path: containers for that deployment are removed
/// and the corresponding device-state entry is dropped.

View File

@@ -0,0 +1,568 @@
//! Operator restart + aggregator recovery regression tests (v0.3 Ch2).
//!
//! Each test maps to a scenario in `docs/fleet-operator-recovery-scenarios.md`.
//! Structure per test:
//!
//! 1. SETUP — create devices + deployments, start first aggregator
//! 2. ACTION — kill/restart/delete (the failure being tested)
//! 3. ASSERT — verify convergence + expected end state
//!
//! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH).
use std::collections::BTreeMap;
use std::time::{Duration, Instant};
use async_nats::jetstream;
use chrono::Utc;
use harmony::modules::fleet::operator::crd::{
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
Rollout, RolloutStrategy,
};
use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack};
use harmony_fleet_operator::fleet_aggregator;
use harmony_fleet_operator::liveness::OperatorLiveness;
use harmony_reconciler_contracts::{
BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::Client;
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
use tokio::task::JoinHandle;
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
const LABEL_KEY: &str = "recovery-grp";
fn e2e_enabled() -> bool {
matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
}
fn skip() {
eprintln!("skipping {E2E_ENV}-gated test (set {E2E_ENV}=1; needs k3d + podman)");
}
// ── Fixtures ────────────────────────────────────────────────────────────
async fn recovery_stack() -> anyhow::Result<std::sync::Arc<harmony_fleet_e2e::Stack>> {
let _ = tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
)
.try_init();
let stack = shared_stack(StackOptions {
deploy_agent: false,
deploy_operator: false,
..StackOptions::default()
})
.await?;
harmony_fleet_deploy::operator::install_crds().await?;
wait_crds_ready().await?;
Ok(stack)
}
async fn wait_crds_ready() -> anyhow::Result<()> {
let client = Client::try_default().await?;
let deployments: Api<Deployment> = Api::all(client.clone());
let devices: Api<Device> = Api::all(client);
let deadline = Instant::now() + Duration::from_secs(30);
loop {
let ok = deployments.list(&Default::default()).await.is_ok()
&& devices.list(&Default::default()).await.is_ok();
if ok {
return Ok(());
}
if Instant::now() >= deadline {
anyhow::bail!("CRDs not Established within 30s");
}
tokio::time::sleep(Duration::from_millis(500)).await;
}
}
async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result<async_nats::Client> {
Ok(async_nats::ConnectOptions::new()
.user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
.connect(&stack.nats_url)
.await?)
}
async fn spawn_aggregator(
stack: &harmony_fleet_e2e::Stack,
) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> {
let kube = Client::try_default().await?;
let js = jetstream::new(admin_nats(stack).await?);
let liveness = OperatorLiveness::new();
let handle = {
let liveness = liveness.clone();
tokio::spawn(async move {
if let Err(e) = fleet_aggregator::run(kube, js, liveness).await {
tracing::warn!(error = %e, "test aggregator exited");
}
})
};
Ok((handle, liveness))
}
async fn kill(handle: JoinHandle<()>) {
handle.abort();
let _ = handle.await;
}
fn podman_score() -> PodmanV0Score {
PodmanV0Score {
services: vec![PodmanService {
name: "hello".to_string(),
image: "docker.io/library/hello-world:latest".to_string(),
ports: vec![],
env: vec![],
volumes: vec![],
restart_policy: Default::default(),
}],
}
}
fn device_with_label(name: &str, group: &str) -> Device {
let mut labels = BTreeMap::new();
labels.insert(LABEL_KEY.to_string(), group.to_string());
Device {
metadata: ObjectMeta {
name: Some(name.to_string()),
labels: Some(labels),
..Default::default()
},
spec: DeviceSpec { inventory: None },
status: None,
}
}
fn deployment_matching(name: &str, group: &str) -> Deployment {
let mut ml = BTreeMap::new();
ml.insert(LABEL_KEY.to_string(), group.to_string());
Deployment {
metadata: ObjectMeta {
name: Some(name.to_string()),
..Default::default()
},
spec: DeploymentSpec {
target_selector: LabelSelector {
match_labels: Some(ml),
match_expressions: None,
},
score: ReconcileScore::PodmanV0(podman_score()),
rollout: Rollout {
strategy: RolloutStrategy::Immediate,
},
},
status: None,
}
}
async fn wait_until<F, Fut>(budget: Duration, mut f: F) -> anyhow::Result<()>
where
F: FnMut() -> Fut,
Fut: std::future::Future<Output = bool>,
{
let deadline = Instant::now() + budget;
loop {
if f().await {
return Ok(());
}
if Instant::now() >= deadline {
anyhow::bail!("condition not met within {budget:?}");
}
tokio::time::sleep(Duration::from_millis(300)).await;
}
}
async fn desired_state_present(
stack: &harmony_fleet_e2e::Stack,
device: &str,
deployment: &str,
) -> anyhow::Result<bool> {
let js = jetstream::new(admin_nats(stack).await?);
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
let key = desired_state_key(device, &DeploymentName::try_new(deployment)?);
Ok(bucket.get(&key).await?.is_some())
}
async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) {
if let Ok(client) = Client::try_default().await {
let dev_api: Api<Device> = Api::all(client.clone());
let dep_api: Api<Deployment> = Api::namespaced(client, &stack.namespace);
for d in devices {
let _ = dev_api.delete(d, &DeleteParams::default()).await;
}
for d in deployments {
let _ = dep_api.delete(d, &DeleteParams::default()).await;
}
}
}
// ── Scenario 1: cold restart, healthy fleet ─────────────────────────────
//
// SETUP: 1 device (recov1-dev, group=g1)
// 1 deployment (recov1-depl, selector: g1)
// aggregator converges, agent reports Running
//
// ACTION: kill aggregator, start a new one
//
// ASSERT: desired-state survives restart (idempotent re-write)
// health counts rebuilt: succeeded=1, matched=1
// liveness → Converged
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1");
let _ = devices.delete(dev, &DeleteParams::default()).await;
let _ = deployments.delete(depl, &DeleteParams::default()).await;
// SETUP: create device + deployment, run first aggregator to convergence
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
let (h1, l1) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l1.is_converged()
&& desired_state_present(&stack, dev, depl)
.await
.unwrap_or(false)
})
.await?;
// Simulate agent having reported Running
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
let depl_name = DeploymentName::try_new(depl)?;
kv.put_device_state(&DeploymentState {
device_id: Id::from(dev),
deployment: depl_name.clone(),
phase: Phase::Running,
last_event_at: Utc::now(),
last_error: None,
})
.await?;
// ACTION: kill and restart aggregator
kill(h1).await;
let (h2, l2) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
// ASSERT: desired-state survives, health counts rebuilt from KV
assert!(
desired_state_present(&stack, dev, depl).await?,
"desired-state must persist across restart"
);
wait_until(Duration::from_secs(30), || async {
deployments
.get(depl)
.await
.ok()
.and_then(|cr| cr.status?.aggregate)
.map(|a| a.succeeded == 1 && a.matched_device_count == 1)
.unwrap_or(false)
})
.await?;
kill(h2).await;
cleanup(&stack, &[dev], &[depl]).await;
Ok(())
}
// ── Scenario 2: CR deleted while operator down ──────────────────────────
//
// SETUP: 1 device (recov2-dev, group=g2)
// 1 deployment (recov2-depl, selector: g2)
// aggregator converges → desired-state written
//
// ACTION: kill aggregator
// force-delete the deployment CR (bypasses finalizer)
// start new aggregator
//
// ASSERT: orphan desired-state is GC'd at convergence
// (deployment no longer exists → no targets → entries purged)
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2");
let _ = devices.delete(dev, &DeleteParams::default()).await;
let _ = deployments.delete(depl, &DeleteParams::default()).await;
// SETUP
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
let (h1, l1) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l1.is_converged()
&& desired_state_present(&stack, dev, depl)
.await
.unwrap_or(false)
})
.await?;
kill(h1).await;
// ACTION: delete CR while operator is down, then restart
deployments.delete(depl, &DeleteParams::default()).await?;
assert!(
desired_state_present(&stack, dev, depl).await?,
"orphan desired-state should still be present before recovery"
);
let (h2, _l2) = spawn_aggregator(&stack).await?;
// ASSERT: orphan is GC'd
wait_until(Duration::from_secs(30), || async {
!desired_state_present(&stack, dev, depl)
.await
.unwrap_or(true)
})
.await?;
kill(h2).await;
cleanup(&stack, &[dev], &[]).await;
Ok(())
}
// ── Scenario 3: two operators racing ────────────────────────────────────
//
// SETUP: 1 device (recov3-dev, group=g3)
// 1 deployment (recov3-depl, selector: g3)
//
// ACTION: start TWO aggregators simultaneously (rolling-deploy overlap)
//
// ASSERT: both converge
// desired-state KV value is byte-identical to the serialized score
// (idempotent multi-writer, no leader election needed)
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3");
let _ = devices.delete(dev, &DeleteParams::default()).await;
let _ = deployments.delete(depl, &DeleteParams::default()).await;
// SETUP
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
// ACTION: two operators at once
let (h_a, l_a) = spawn_aggregator(&stack).await?;
let (h_b, l_b) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l_a.is_converged() && l_b.is_converged()
})
.await?;
// ASSERT: KV value is deterministic regardless of writer
let js = jetstream::new(admin_nats(&stack).await?);
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
let key = desired_state_key(dev, &DeploymentName::try_new(depl)?);
let value = bucket.get(&key).await?.expect("desired-state present");
let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?;
assert_eq!(
value.to_vec(),
expected,
"racing writers must agree byte-for-byte"
);
kill(h_a).await;
kill(h_b).await;
cleanup(&stack, &[dev], &[depl]).await;
Ok(())
}
// ── Scenario 4: device offline during restart ───────────────────────────
//
// SETUP: 2 devices (recov4-dev-a reporting, recov4-dev-b silent)
// 1 deployment (recov4-depl, selector: g4, matches both)
// aggregator converges
// dev-a reports Running; dev-b never reports
//
// ACTION: kill aggregator, start a new one
//
// ASSERT: dev-a → succeeded (rebuilt from device-state KV)
// dev-b → pending (no state entry = pending)
// matched_device_count = 2
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn device_offline_during_restart_counts_as_pending() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev_a, dev_b, depl, grp) = ("recov4-dev-a", "recov4-dev-b", "recov4-depl", "g4");
for d in [dev_a, dev_b] {
let _ = devices.delete(d, &DeleteParams::default()).await;
}
let _ = deployments.delete(depl, &DeleteParams::default()).await;
// SETUP: 2 devices, 1 deployment matching both
devices
.create(&PostParams::default(), &device_with_label(dev_a, grp))
.await?;
devices
.create(&PostParams::default(), &device_with_label(dev_b, grp))
.await?;
deployments
.create(&PostParams::default(), &deployment_matching(depl, grp))
.await?;
let (h1, l1) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async {
l1.is_converged()
&& desired_state_present(&stack, dev_a, depl)
.await
.unwrap_or(false)
&& desired_state_present(&stack, dev_b, depl)
.await
.unwrap_or(false)
})
.await?;
// Only dev-a reports Running; dev-b stays silent
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
let depl_name = DeploymentName::try_new(depl)?;
kv.put_device_state(&DeploymentState {
device_id: Id::from(dev_a),
deployment: depl_name.clone(),
phase: Phase::Running,
last_event_at: Utc::now(),
last_error: None,
})
.await?;
// ACTION: kill and restart aggregator
kill(h1).await;
let (h2, l2) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
// ASSERT: dev-a = succeeded, dev-b = pending, matched = 2
wait_until(Duration::from_secs(30), || async {
deployments
.get(depl)
.await
.ok()
.and_then(|cr| cr.status?.aggregate)
.map(|a| {
a.matched_device_count == 2 && a.succeeded == 1 && a.pending == 1
})
.unwrap_or(false)
})
.await?;
kill(h2).await;
cleanup(&stack, &[dev_a, dev_b], &[depl]).await;
Ok(())
}
// ── Scenario 5: chaos kill under write load ─────────────────────────────
//
// SETUP: 1 device (recov5-dev, group=g5)
// 5 deployments created incrementally (recov5-depl-0..4)
//
// ACTION: create deployments one by one
// kill + restart aggregator mid-stream (after depl-2)
// kill again after all created
// start final aggregator
//
// ASSERT: final aggregator converges all 5 desired-state entries within 30s
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> {
if !e2e_enabled() {
skip();
return Ok(());
}
let stack = recovery_stack().await?;
let client = Client::try_default().await?;
let devices: Api<Device> = Api::all(client.clone());
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
let (dev, grp) = ("recov5-dev", "g5");
let names: Vec<String> = (0..5).map(|i| format!("recov5-depl-{i}")).collect();
let _ = devices.delete(dev, &DeleteParams::default()).await;
for n in &names {
let _ = deployments.delete(n, &DeleteParams::default()).await;
}
// SETUP: 1 device
devices
.create(&PostParams::default(), &device_with_label(dev, grp))
.await?;
// ACTION: create deployments while killing/restarting the operator
let (mut handle, _) = spawn_aggregator(&stack).await?;
for (i, n) in names.iter().enumerate() {
deployments
.create(&PostParams::default(), &deployment_matching(n, grp))
.await?;
if i == 2 {
kill(handle).await;
(handle, _) = spawn_aggregator(&stack).await?;
}
}
kill(handle).await;
// Final aggregator must converge all 5
let (h_final, l_final) = spawn_aggregator(&stack).await?;
wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?;
// ASSERT: all 5 desired-state entries present
wait_until(Duration::from_secs(30), || async {
for n in &names {
if !desired_state_present(&stack, dev, n).await.unwrap_or(false) {
return false;
}
}
true
})
.await?;
kill(h_final).await;
let dep_refs: Vec<&str> = names.iter().map(String::as_str).collect();
cleanup(&stack, &[dev], &dep_refs).await;
Ok(())
}

View File

@@ -38,6 +38,8 @@ use harmony::modules::fleet::operator::{
};
use tracing::warn;
use crate::liveness::OperatorLiveness;
const PATCH_TICK: Duration = Duration::from_secs(1);
// ---------------------------------------------------------------------------
@@ -149,7 +151,11 @@ fn matched_devices(
// Top-level run
// ---------------------------------------------------------------------------
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> {
pub async fn run(
client: Client,
js: async_nats::jetstream::Context,
liveness: OperatorLiveness,
) -> anyhow::Result<()> {
let state_bucket = js
.create_key_value(async_nats::jetstream::kv::Config {
bucket: BUCKET_DEVICE_STATE.to_string(),
@@ -173,81 +179,110 @@ pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::
let devices_api: Api<Device> = Api::all(client.clone());
let patch_api: Api<Deployment> = Api::all(client);
tracing::info!(
owned = state
.lock()
.await
.owned_targets
.values()
.map(|s| s.len())
.sum::<usize>(),
"aggregator: startup complete"
);
// Compute before logging: a `.await` inside the macro would hold the
// event's non-Send format temporaries across it, making `run` non-Send.
let owned_count: usize = {
let guard = state.lock().await;
guard.owned_targets.values().map(|s| s.len()).sum()
};
tracing::info!(owned = owned_count, "aggregator: startup complete");
let state_watcher_handle = {
// A JoinSet so cancelling `run` (its future dropped) aborts every child
// task — no orphan watchers outliving the aggregator. The whole point of
// recovery is a clean stop/start, so this cancellation property matters
// both in tests (restart simulation) and in process teardown.
let mut tasks = tokio::task::JoinSet::new();
{
let state = state.clone();
let bucket = state_bucket.clone();
tokio::spawn(async move {
if let Err(e) = run_state_kv_watcher(bucket, state).await {
let liveness = liveness.clone();
tasks.spawn(async move {
if let Err(e) = run_state_kv_watcher(bucket, state, liveness).await {
tracing::warn!(error = %e, "aggregator: state watcher exited");
}
})
};
let deployment_watcher_handle = {
});
}
{
let state = state.clone();
let desired = desired_bucket.clone();
tokio::spawn(async move {
if let Err(e) = run_deployment_watcher(deployments_api.clone(), state, desired).await {
let liveness = liveness.clone();
tasks.spawn(async move {
if let Err(e) =
run_deployment_watcher(deployments_api.clone(), state, desired, liveness).await
{
tracing::warn!(error = %e, "aggregator: deployment watcher exited");
}
})
};
let device_watcher_handle = {
});
}
{
let state = state.clone();
let desired = desired_bucket.clone();
tokio::spawn(async move {
if let Err(e) = run_device_watcher(devices_api, state, desired).await {
let liveness = liveness.clone();
tasks.spawn(async move {
if let Err(e) = run_device_watcher(devices_api, state, desired, liveness).await {
tracing::warn!(error = %e, "aggregator: device watcher exited");
}
})
};
let patch_state = state.clone();
let patch_loop = async move {
let mut ticker = tokio::time::interval(PATCH_TICK);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if let Err(e) = patch_tick(&patch_api, &patch_state).await {
tracing::warn!(error = %e, "aggregator: patch tick failed");
}
}
};
tokio::select! {
_ = patch_loop => Ok(()),
_ = state_watcher_handle => Ok(()),
_ = deployment_watcher_handle => Ok(()),
_ = device_watcher_handle => Ok(()),
});
}
{
let patch_state = state.clone();
tasks.spawn(async move {
let mut ticker = tokio::time::interval(PATCH_TICK);
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
loop {
ticker.tick().await;
if let Err(e) = patch_tick(&patch_api, &patch_state).await {
tracing::warn!(error = %e, "aggregator: patch tick failed");
}
}
});
}
// Return when the first task exits (mirrors the old `select!`); dropping
// `tasks` here aborts the rest.
tasks.join_next().await;
Ok(())
}
// ---------------------------------------------------------------------------
// Device-state KV watcher (unchanged path)
// ---------------------------------------------------------------------------
/// True when the bucket currently holds no keys. Used to short-circuit the
/// caught-up signal: an empty bucket's watch yields no entry to carry
/// `seen_current`.
async fn bucket_is_empty(bucket: &Store) -> bool {
match bucket.keys().await {
Ok(mut keys) => keys.next().await.is_none(),
Err(e) => {
tracing::warn!(error = %e, "aggregator: keys() probe failed; assuming non-empty");
false
}
}
}
fn parse_state_key(key: &str) -> Option<DevicePair> {
let rest = key.strip_prefix("state.")?;
let (device, deployment) = rest.split_once('.')?;
let (device, deployment) = rest.rsplit_once('.')?;
Some(DevicePair {
device_id: device.to_string(),
deployment: DeploymentName::try_new(deployment).ok()?,
})
}
async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> {
async fn run_state_kv_watcher(
bucket: Store,
state: SharedFleetState,
liveness: OperatorLiveness,
) -> anyhow::Result<()> {
// An empty bucket replays nothing, so `seen_current` never surfaces on an
// entry — mark the state source caught-up up front when there's nothing to
// replay. (A write landing just after this is replayed by the watch below
// and re-marks; the mark is an idempotent latch.)
if bucket_is_empty(&bucket).await {
liveness.mark_states_ready();
}
let mut watch = bucket.watch_with_history(">").await?;
while let Some(entry_res) = watch.next().await {
let entry = match entry_res {
@@ -257,6 +292,11 @@ async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow:
continue;
}
};
// First entry that has caught up to the bucket head means the cold
// replay is done.
if entry.seen_current {
liveness.mark_states_ready();
}
let Some(pair) = parse_state_key(&entry.key) else {
continue;
};
@@ -359,6 +399,7 @@ async fn run_deployment_watcher(
api: Api<Deployment>,
state: SharedFleetState,
desired: Store,
liveness: OperatorLiveness,
) -> anyhow::Result<()> {
let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed();
while let Some(event) = stream.try_next().await? {
@@ -369,12 +410,60 @@ async fn run_deployment_watcher(
Event::Delete(cr) => {
on_deployment_delete(&state, &desired, cr).await;
}
Event::Init | Event::InitDone => {}
Event::Init => {}
Event::InitDone => {
// The initial CR list is fully replayed; the deployment cache is
// now authoritative. GC desired-state for any deployment that
// was seeded from KV but has no surviving CR (deleted while we
// were down — finalizer bypassed, e.g. namespace teardown),
// then declare this source converged.
gc_orphaned_desired_state(&state, &desired).await;
liveness.mark_deployments_ready();
}
}
}
Ok(())
}
/// Purge desired-state entries whose Deployment CR no longer exists. Runs once,
/// after the initial CR list is loaded: `owned_targets` was seeded from the KV
/// (so it includes deployments the previous operator wrote), and any name with
/// no CR in the now-complete cache is an orphan. Belt-and-suspenders with the
/// controller finalizer, which covers deletes that happen while we're *up*.
async fn gc_orphaned_desired_state(state: &SharedFleetState, desired: &Store) {
let orphans: Vec<(DeploymentName, HashSet<String>)> = {
let mut guard = state.lock().await;
let live: HashSet<DeploymentName> = guard
.deployments
.values()
.map(|d| d.deployment_name.clone())
.collect();
let orphan_names: Vec<DeploymentName> = guard
.owned_targets
.keys()
.filter(|n| !live.contains(*n))
.cloned()
.collect();
orphan_names
.into_iter()
.filter_map(|n| guard.owned_targets.remove(&n).map(|set| (n, set)))
.collect()
};
for (deployment_name, devices) in orphans {
for device in &devices {
let k = desired_state_key(device, &deployment_name);
if let Err(e) = desired.delete(&k).await {
tracing::debug!(key = %k, error = %e, "aggregator: orphan desired-state delete failed");
}
}
tracing::info!(
deployment = ?deployment_name,
count = devices.len(),
"aggregator: GC'd orphaned desired-state for deleted deployment"
);
}
}
async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) {
let Some(key) = DeploymentKey::from_cr(&cr) else {
return;
@@ -467,6 +556,7 @@ async fn run_device_watcher(
api: Api<Device>,
state: SharedFleetState,
desired: Store,
liveness: OperatorLiveness,
) -> anyhow::Result<()> {
let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed();
while let Some(event) = stream.try_next().await? {
@@ -477,7 +567,9 @@ async fn run_device_watcher(
Event::Delete(dev) => {
on_device_delete(&state, &desired, dev).await;
}
Event::Init | Event::InitDone => {}
Event::Init => {}
// Initial CR list fully replayed into the cache.
Event::InitDone => liveness.mark_devices_ready(),
}
}
Ok(())
@@ -613,10 +705,10 @@ async fn seed_owned_targets(bucket: &Store, state: &SharedFleetState) -> anyhow:
let mut keys = bucket.keys().await?;
while let Some(key_res) = keys.next().await {
let key = key_res?;
// Keys are `<device>.<deployment>`. The KV key space carries
// no namespace — names are globally unique at this layer —
// which is exactly why `owned_targets` keys by DeploymentName.
let Some((device, deployment)) = key.split_once('.') else {
// Keys are `<device>.<deployment>`. DeploymentName is dot-free
// (RFC1123), so rsplit_once is unambiguous even if device ids
// contain dots.
let Some((device, deployment)) = key.rsplit_once('.') else {
warn!("Could not read device.deployment for key {key}");
continue;
};
@@ -838,4 +930,26 @@ mod tests {
assert_eq!(matched.len(), 1);
assert!(matched.contains("pi-01"));
}
#[test]
fn apply_state_rejects_older_timestamp() {
let cached = cached("ns", "hello", "group", "edge");
let mut s = FleetState::default();
s.deployments.insert(cached.key.clone(), cached.clone());
s.owned_targets.insert(
cached.deployment_name.clone(),
["pi-01"].iter().map(|s| s.to_string()).collect(),
);
let p = pair("pi-01", "hello");
apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Running, 10));
assert_eq!(s.states[&p].phase, Phase::Running);
apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Failed, 5));
assert_eq!(
s.states[&p].phase,
Phase::Running,
"older event must not overwrite newer state"
);
}
}

View File

@@ -1,5 +1,7 @@
use maud::{DOCTYPE, Markup, PreEscaped, html};
use harmony_fleet_operator::liveness::RecoveryPhase;
use crate::frontend::auth::DashboardSession;
// ── Inline SVG icons ────────────────────────────────────────────────────
@@ -41,6 +43,9 @@ pub fn page(
(sidebar(current_path, session, unacked_alerts))
main class="flex-1 min-w-0 flex flex-col overflow-hidden" {
(topbar(title, unacked_alerts))
// Bootstrap the recovery banner; the fragment self-polls
// while recovering and goes inert once converged.
div id="recovery-banner" hx-get="/__recovery" hx-trigger="load" hx-swap="outerHTML" {}
div class="flex-1 overflow-y-auto grid-bg" { (content) }
}
}
@@ -50,6 +55,28 @@ pub fn page(
}
}
/// Recovery-state banner fragment swapped into `#recovery-banner`.
///
/// `Recovering` → a visible banner that re-requests itself every 2s.
/// `Converged` → an inert element with no trigger, so polling stops and the
/// banner disappears. Same element id either way so HTMX `outerHTML` swaps cleanly.
pub fn recovery_banner(phase: RecoveryPhase) -> Markup {
match phase {
RecoveryPhase::Recovering => html! {
div id="recovery-banner"
hx-get="/__recovery" hx-trigger="load delay:2s" hx-swap="outerHTML"
class="flex items-center gap-2 px-6 py-2 text-[12px] border-b"
style="background:rgba(234,179,8,0.10); border-color:rgba(234,179,8,0.25); color:#fde68a" {
span class="inline-block w-2 h-2 rounded-full animate-pulse" style="background:#eab308" {}
"Operator recovering — rebuilding fleet state from NATS. Counts may be briefly stale."
}
},
RecoveryPhase::Converged => html! {
div id="recovery-banner" {}
},
}
}
fn sidebar(
current_path: &str,
session: Option<&DashboardSession>,

View File

@@ -121,6 +121,8 @@ pub fn router(state: AppState) -> Router {
// Settings
.route("/settings", get(settings_handler))
.route("/settings/toggle/{key}", post(settings_toggle_handler))
// Operator recovery banner (HTMX self-poll while recovering)
.route("/__recovery", get(recovery_banner_handler))
// Logout
.route("/logout", get(auth::logout_handler))
.route_layer(middleware::from_fn_with_state(state.clone(), csrf_protect))
@@ -615,6 +617,13 @@ async fn device_logs_handler(
Ok(devices_view::logs_modal(&id, q.deployment.as_deref()))
}
/// Recovery banner fragment. While the aggregator replays KV/CR history this
/// returns a banner that re-fetches itself every 2s; once converged it returns
/// an inert empty element, so the poll naturally stops.
async fn recovery_banner_handler(State(s): State<AppState>) -> Markup {
crate::frontend::layout::recovery_banner(s.fleet.recovery_phase())
}
/// One-shot `podman logs` tail of the device's deployment container,
/// fetched over the NATS command channel. Rendered as the log panel
/// body (HTMX swaps it in on `load` and on Refresh).

View File

@@ -16,3 +16,4 @@ pub mod commands;
pub mod device_reconciler;
pub mod device_status;
pub mod fleet_aggregator;
pub mod liveness;

View File

@@ -0,0 +1,103 @@
//! Operator recovery liveness.
//!
//! After a restart the aggregator cold-rebuilds its caches from NATS KV +
//! kube CR watches. Until that replay completes, the dashboard's counts can
//! be stale — so we expose a coarse [`RecoveryPhase`] the UI shows as a banner
//! ("recovering" → "converged"), and the customer sees progress instead of a
//! silent stale view.
//!
//! Convergence = all three cold-start sources have replayed their current
//! contents at least once: Deployment CRs, Device CRs, and the device-state KV.
//! Each is a one-way latch (`Recovering` → `Converged`, never back), set from
//! the natural caught-up signals (`watcher::Event::InitDone`, KV
//! `Entry::seen_current`).
use std::sync::Arc;
use std::sync::atomic::{AtomicBool, Ordering};
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RecoveryPhase {
/// Still replaying KV / CR history; counts may be stale.
Recovering,
/// All cold-start sources replayed; the picture is current.
Converged,
}
/// Cheap-to-clone handle shared between the aggregator (writer) and the
/// in-process dashboard (reader). Both live in one operator process, so a
/// shared atomic is all the coordination needed.
#[derive(Clone, Default)]
pub struct OperatorLiveness {
inner: Arc<Inner>,
}
#[derive(Default)]
struct Inner {
deployments_ready: AtomicBool,
devices_ready: AtomicBool,
states_ready: AtomicBool,
}
impl OperatorLiveness {
pub fn new() -> Self {
Self::default()
}
pub fn mark_deployments_ready(&self) {
self.inner.deployments_ready.store(true, Ordering::Relaxed);
}
pub fn mark_devices_ready(&self) {
self.inner.devices_ready.store(true, Ordering::Relaxed);
}
pub fn mark_states_ready(&self) {
self.inner.states_ready.store(true, Ordering::Relaxed);
}
pub fn phase(&self) -> RecoveryPhase {
let i = &self.inner;
if i.deployments_ready.load(Ordering::Relaxed)
&& i.devices_ready.load(Ordering::Relaxed)
&& i.states_ready.load(Ordering::Relaxed)
{
RecoveryPhase::Converged
} else {
RecoveryPhase::Recovering
}
}
pub fn is_converged(&self) -> bool {
self.phase() == RecoveryPhase::Converged
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn starts_recovering_and_needs_all_three_sources() {
let l = OperatorLiveness::new();
assert_eq!(l.phase(), RecoveryPhase::Recovering);
l.mark_deployments_ready();
l.mark_devices_ready();
assert_eq!(l.phase(), RecoveryPhase::Recovering, "states still pending");
l.mark_states_ready();
assert_eq!(l.phase(), RecoveryPhase::Converged);
}
#[test]
fn converged_is_a_one_way_latch_and_clones_share_state() {
let a = OperatorLiveness::new();
let b = a.clone();
a.mark_deployments_ready();
a.mark_devices_ready();
a.mark_states_ready();
// The reader clone observes the writer's marks.
assert!(b.is_converged());
// Marking again is idempotent — no way back to Recovering.
b.mark_states_ready();
assert!(a.is_converged());
}
}

View File

@@ -129,7 +129,11 @@ async fn serve_web(
// Standalone serve-web has no NATS, so device exec/log tailing is
// unavailable here — those features need the deployed operator
// (`run`), which threads in the command channel.
Arc::new(RealFleetService::new(Client::try_default().await?, None))
Arc::new(RealFleetService::new(
Client::try_default().await?,
None,
None,
))
};
serve_dashboard(fleet, addr, css_from, live_reload).await
}
@@ -192,6 +196,7 @@ async fn serve_dashboard(
fn spawn_dashboard(
client: Client,
commands: harmony_fleet_operator::commands::FleetCommandsClient,
liveness: harmony_fleet_operator::liveness::OperatorLiveness,
) {
use std::net::SocketAddr;
use std::sync::Arc;
@@ -200,7 +205,11 @@ fn spawn_dashboard(
let addr = SocketAddr::from(([0, 0, 0, 0], frontend::server::DEFAULT_PORT));
tokio::spawn(async move {
let fleet = Arc::new(RealFleetService::new(client, Some(commands)));
let fleet = Arc::new(RealFleetService::new(
client,
Some(commands),
Some(liveness),
));
if let Err(e) = serve_dashboard(fleet, addr, None, false).await {
tracing::error!(error = %e, "dashboard server exited; reconcile continues");
}
@@ -221,6 +230,10 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
let client = Client::try_default().await?;
// Shared recovery signal: the aggregator latches it to Converged once it
// has replayed KV + CR history; the dashboard reads it for the banner.
let liveness = harmony_fleet_operator::liveness::OperatorLiveness::new();
// Serve the read-only dashboard in the same process (best-effort).
// It reads CRs for state and uses the NATS command channel for
// device exec / log tailing. Built only with the web-frontend
@@ -229,6 +242,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
spawn_dashboard(
client.clone(),
harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone()),
liveness.clone(),
);
// Concurrent tasks:
@@ -248,7 +262,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
r = controller::run(ctl_client, desired_state_kv) => r,
r = device_reconciler::run(dr_client, dr_js) => r,
r = device_status::run(ds_client, ds_js) => r,
r = fleet_aggregator::run(client, js) => r,
r = fleet_aggregator::run(client, js, liveness) => r,
}
}

View File

@@ -45,6 +45,12 @@ pub trait FleetService: Send + Sync + 'static {
device_id: &str,
deployment: Option<&str>,
) -> anyhow::Result<String>;
/// Coarse operator recovery state for the dashboard banner. Sync — a cheap
/// in-memory read, not a CR/NATS round-trip. Defaults to `Converged`, which
/// the mock and the standalone `serve-web` path (no aggregator) inherit.
fn recovery_phase(&self) -> harmony_fleet_operator::liveness::RecoveryPhase {
harmony_fleet_operator::liveness::RecoveryPhase::Converged
}
}
// ── Device ─────────────────────────────────────────────────────────────

View File

@@ -25,6 +25,7 @@ use harmony::modules::fleet::operator::{
use harmony::modules::podman::ReconcileScore;
use harmony_fleet_operator::commands::FleetCommandsClient;
use harmony_fleet_operator::fleet_aggregator::selector_matches;
use harmony_fleet_operator::liveness::{OperatorLiveness, RecoveryPhase};
use harmony_reconciler_contracts::ExecReply;
use super::{
@@ -46,14 +47,23 @@ pub struct RealFleetService {
/// In-memory ack set. Alerts are derived from live CR state and
/// have no store of their own, so acks don't survive a restart.
acked_alerts: Mutex<HashSet<String>>,
/// Aggregator recovery signal, shared in-process. `None` in the
/// standalone `serve-web` path (no aggregator running) → reported as
/// `Converged` so the banner never shows there.
liveness: Option<OperatorLiveness>,
}
impl RealFleetService {
pub fn new(kube: Client, commands: Option<FleetCommandsClient>) -> Self {
pub fn new(
kube: Client,
commands: Option<FleetCommandsClient>,
liveness: Option<OperatorLiveness>,
) -> Self {
Self {
kube,
commands,
acked_alerts: Mutex::new(HashSet::new()),
liveness,
}
}
@@ -360,6 +370,13 @@ fn derive_alerts(
#[async_trait]
impl FleetService for RealFleetService {
fn recovery_phase(&self) -> RecoveryPhase {
self.liveness
.as_ref()
.map(|l| l.phase())
.unwrap_or(RecoveryPhase::Converged)
}
async fn dashboard_detail(&self) -> anyhow::Result<DashboardDetail> {
let devices = self.devices().await?;
let mut deployments = self.deployments().await?;