feat(fleet-operator): aggregator recovery signal + orphan GC + recovery e2e (Ch2) #328
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -4131,6 +4131,7 @@ dependencies = [
|
||||
"anyhow",
|
||||
"async-nats",
|
||||
"async-trait",
|
||||
"chrono",
|
||||
"directories",
|
||||
"futures-util",
|
||||
"harmony",
|
||||
|
||||
219
docs/fleet-operator-recovery-scenarios.md
Normal file
219
docs/fleet-operator-recovery-scenarios.md
Normal file
@@ -0,0 +1,219 @@
|
||||
# Fleet operator recovery scenarios
|
||||
|
||||
The operator is **stateless across restarts**. It can be killed, upgraded, or
|
||||
rescheduled at any time. On restart it cold-rebuilds from two durable sources —
|
||||
kube CRs and NATS KV — with no customer-visible "unknown state" window.
|
||||
|
||||
## Architecture
|
||||
|
||||
```
|
||||
┌─────────────────────────────────────────────────────────────────┐
|
||||
│ Operator process │
|
||||
│ │
|
||||
│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────────┐ │
|
||||
│ │ Deployment │ │ Device │ │ Device-state KV │ │
|
||||
│ │ CR watcher │ │ CR watcher │ │ watcher (history) │ │
|
||||
│ └──────┬───────┘ └──────┬───────┘ └──────────┬───────────┘ │
|
||||
│ │ │ │ │
|
||||
│ ▼ ▼ ▼ │
|
||||
│ ┌──────────────────────────────────────────────────────────┐ │
|
||||
│ │ FleetState (mutex) │ │
|
||||
│ │ deployments: {key → CachedDeployment} │ │
|
||||
│ │ devices: {name → labels} │ │
|
||||
│ │ states: {(device,deployment) → DeploymentState} │ │
|
||||
│ │ owned_targets: {deployment → {device…}} │ │
|
||||
│ │ dirty: {key…} ──► patch_tick (1 Hz) │ │
|
||||
│ └──────────────────────────────────────────────────────────┘ │
|
||||
│ │ │
|
||||
│ ▼ │
|
||||
│ ┌──────────────────┐ ┌────────────────────────────────┐ │
|
||||
│ │ desired-state KV │ │ Deployment.status.aggregate │ │
|
||||
│ │ put / delete │ │ (patched to kube at 1 Hz) │ │
|
||||
│ └──────────────────┘ └────────────────────────────────┘ │
|
||||
│ │
|
||||
│ ┌──────────────────────────────────────────────────────────┐ │
|
||||
│ │ OperatorLiveness (3 atomic latches, shared w/ dashboard) │ │
|
||||
│ │ deployments_ready ─┐ │ │
|
||||
│ │ devices_ready ─────┼──► all three → Converged │ │
|
||||
│ │ states_ready ──────┘ (else: Recovering → banner) │ │
|
||||
│ └──────────────────────────────────────────────────────────┘ │
|
||||
└─────────────────────────────────────────────────────────────────┘
|
||||
```
|
||||
|
||||
### Cold-start sequence
|
||||
|
||||
```
|
||||
Operator starts
|
||||
│
|
||||
├─ seed_owned_targets() reads desired-state KV keys
|
||||
│ → populates owned_targets
|
||||
│ (so first reconcile diffs correctly)
|
||||
│
|
||||
├─ spawn 3 watchers (JoinSet) │
|
||||
│ ├─ Deployment CR watcher │ replay → InitDone → mark_deployments_ready
|
||||
│ │ │ + gc_orphaned_desired_state
|
||||
│ ├─ Device CR watcher │ replay → InitDone → mark_devices_ready
|
||||
│ └─ Device-state KV watch │ replay → seen_current → mark_states_ready
|
||||
│ │ (or: empty bucket → mark immediately)
|
||||
│
|
||||
└─ spawn patch_tick (1 Hz) flushes dirty CR status patches
|
||||
```
|
||||
|
||||
### Convergence banner
|
||||
|
||||
```
|
||||
Dashboard reads OperatorLiveness.phase():
|
||||
|
||||
Recovering ──────► Converged
|
||||
┌──────────────┐ ┌─────┐
|
||||
│ ⚠ rebuilding │ │ │ (empty div, polling stops)
|
||||
│ polls /2s │ │ │
|
||||
└──────────────┘ └─────┘
|
||||
```
|
||||
|
||||
The banner self-clears when all three latches trip. Typically sub-second.
|
||||
|
||||
### Key invariants
|
||||
|
||||
| Invariant | Mechanism |
|
||||
|---|---|
|
||||
| Writes are byte-deterministic | Same CR → same serialized score JSON. Two operators write identical bytes. |
|
||||
| No leader election needed | Idempotent multi-writer at current fleet size. (HA = D3, deferred.) |
|
||||
| Cancel = clean teardown | `JoinSet` owns all watchers; dropping `run`'s future aborts children. |
|
||||
| `owned_targets` seeded before first reconcile | `seed_owned_targets()` reads KV keys so diff doesn't orphan entries. |
|
||||
| Orphan GC on `InitDone` | Deployment CRs deleted while operator was down → purged at convergence. |
|
||||
| Out-of-order state dedup | `apply_state` rejects entries with older `last_event_at`. |
|
||||
|
||||
---
|
||||
|
||||
## Scenarios
|
||||
|
||||
### 1. Cold restart, healthy fleet
|
||||
|
||||
Operator killed and restarted. Devices kept reporting to KV throughout.
|
||||
|
||||
```
|
||||
Time ──────────────────────────────────────────────────►
|
||||
|
||||
Operator: [running]─── KILL ───[restart]──────────────────►
|
||||
│
|
||||
├─ seed_owned_targets (from KV keys)
|
||||
├─ replay CRs → InitDone
|
||||
├─ replay device-state → seen_current
|
||||
└─ Converged
|
||||
|
||||
KV: [desired-state ✓] [device-state ✓] (untouched)
|
||||
|
||||
Expected: desired-state re-written identically (agents byte-compare → no-op)
|
||||
health counts rebuilt from device-state KV
|
||||
liveness → Converged
|
||||
```
|
||||
|
||||
**Test:** `aggregator_converges_from_kv_after_restart`
|
||||
|
||||
### 2. Stale KV — CR deleted while operator down
|
||||
|
||||
A Deployment CR is force-deleted (finalizer bypassed: namespace teardown,
|
||||
`--force`) while no operator is running. The desired-state KV entry is orphaned.
|
||||
|
||||
```
|
||||
Time ──────────────────────────────────────────────────►
|
||||
|
||||
Operator: [running]─── KILL ──────────────[restart]──────────►
|
||||
│ │
|
||||
│ ├─ seed_owned_targets
|
||||
│ │ (picks up orphan key)
|
||||
│ │
|
||||
│ ├─ CR replay: InitDone
|
||||
│ │ deployment NOT in cache
|
||||
│ │
|
||||
│ └─ gc_orphaned_desired_state
|
||||
│ deletes orphan from KV
|
||||
│
|
||||
CR: [exists]─── force-delete ──[gone]─────────────────►
|
||||
KV: [desired-state ✓]──────────[orphan]───[purged]────►
|
||||
|
||||
Expected: orphan desired-state deleted at convergence
|
||||
agents stop running the dead deployment
|
||||
```
|
||||
|
||||
**Test:** `aggregator_gcs_desired_state_for_deleted_cr`
|
||||
|
||||
### 3. Two operators racing (rolling deploy overlap)
|
||||
|
||||
Brief period where two operator replicas run simultaneously.
|
||||
|
||||
```
|
||||
Time ──────────────────────────────────────────────────►
|
||||
|
||||
Operator A: [running]────────────────────────────────────────►
|
||||
Operator B: [start]───────────────────────────────────►
|
||||
│
|
||||
├─ both seed_owned_targets (same KV keys)
|
||||
├─ both replay same CRs
|
||||
├─ both compute same matched_devices
|
||||
└─ both put identical score_json to same KV keys
|
||||
|
||||
KV: [desired-state: bytes_A] == [desired-state: bytes_B]
|
||||
|
||||
Expected: KV value stable regardless of write order
|
||||
no orphan, no flapping, no leader election needed
|
||||
```
|
||||
|
||||
**Test:** `two_aggregators_produce_identical_desired_state`
|
||||
|
||||
### 4. Partial KV — device offline during restart
|
||||
|
||||
A device hasn't reported state (no `device-state` entry) when the operator
|
||||
restarts.
|
||||
|
||||
```
|
||||
Time ──────────────────────────────────────────────────►
|
||||
|
||||
Operator: [running]─── KILL ───[restart]──────────────────►
|
||||
|
||||
Device A: [reporting ✓]────────────────────────────────────►
|
||||
Device B: [offline]────────────────────────────────────────►
|
||||
(no device-state entry in KV)
|
||||
|
||||
Expected: Device A → Running (rebuilt from KV)
|
||||
Device B → Pending (no state entry = pending in aggregate)
|
||||
Device B recovers to Running when it reports
|
||||
```
|
||||
|
||||
**Test:** `device_offline_during_restart_counts_as_pending`
|
||||
|
||||
### 5. Chaos — kill under write load
|
||||
|
||||
Operator repeatedly killed and restarted while deployments are being created.
|
||||
|
||||
```
|
||||
Time ──────────────────────────────────────────────────►
|
||||
|
||||
Deployments: [create d0] [create d1] [create d2] [create d3] [create d4]
|
||||
▲
|
||||
Operator: [running]──────── KILL ──────┘──[restart]──── KILL ──[final]──►
|
||||
│
|
||||
└─ converge
|
||||
all 5
|
||||
|
||||
Expected: final operator converges full desired-state set within 30s
|
||||
```
|
||||
|
||||
**Test:** `chaos_kill_under_write_load_converges`
|
||||
|
||||
---
|
||||
|
||||
## Running the regression tests
|
||||
|
||||
```bash
|
||||
HARMONY_FLEET_E2E=1 cargo test -p harmony-fleet-e2e --test operator_recovery
|
||||
```
|
||||
|
||||
**Prerequisites:** k3d + podman on PATH. The shared harness brings up NATS in
|
||||
a fresh namespace.
|
||||
|
||||
**How it works:** tests drive `fleet_aggregator::run` in-process against the
|
||||
real NATS + k3d. `spawn_aggregator` returns a `JoinHandle`; `kill` aborts it
|
||||
and awaits (ensuring the `JoinSet` children tear down before the next instance
|
||||
starts).
|
||||
@@ -60,3 +60,4 @@ uuid = { version = "1", features = ["v4"] }
|
||||
|
||||
[dev-dependencies]
|
||||
tokio = { workspace = true, features = ["full"] }
|
||||
chrono = { workspace = true }
|
||||
|
||||
@@ -172,6 +172,22 @@ impl AdminKv {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Publish a `device-state` entry directly, simulating an agent report.
|
||||
/// Recovery tests use this to assert the operator rebuilds health counts
|
||||
/// from KV alone after a restart.
|
||||
pub async fn put_device_state(&self, state: &DeploymentState) -> Result<(), AdminKvError> {
|
||||
|
|
||||
let key = device_state_key(&state.device_id.to_string(), &state.deployment);
|
||||
let payload = serde_json::to_vec(state).map_err(AdminKvError::Serialize)?;
|
||||
self.device_state
|
||||
.put(&key, payload.into())
|
||||
.await
|
||||
.map_err(|e| AdminKvError::Put {
|
||||
key,
|
||||
source: Box::new(e),
|
||||
})?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Delete a desired-state entry. Triggers the agent's tombstone
|
||||
/// reconcile path: containers for that deployment are removed
|
||||
/// and the corresponding device-state entry is dropped.
|
||||
|
||||
568
fleet/harmony-fleet-e2e/tests/operator_recovery.rs
Normal file
568
fleet/harmony-fleet-e2e/tests/operator_recovery.rs
Normal file
@@ -0,0 +1,568 @@
|
||||
//! Operator restart + aggregator recovery regression tests (v0.3 Ch2).
|
||||
//!
|
||||
//! Each test maps to a scenario in `docs/fleet-operator-recovery-scenarios.md`.
|
||||
//! Structure per test:
|
||||
//!
|
||||
//! 1. SETUP — create devices + deployments, start first aggregator
|
||||
//! 2. ACTION — kill/restart/delete (the failure being tested)
|
||||
//! 3. ASSERT — verify convergence + expected end state
|
||||
//!
|
||||
//! Gated behind `HARMONY_FLEET_E2E=1` (needs k3d + podman on PATH).
|
||||
|
||||
use std::collections::BTreeMap;
|
||||
use std::time::{Duration, Instant};
|
||||
|
||||
use async_nats::jetstream;
|
||||
use chrono::Utc;
|
||||
use harmony::modules::fleet::operator::crd::{
|
||||
Deployment, DeploymentSpec, Device, DeviceSpec, PodmanService, PodmanV0Score, ReconcileScore,
|
||||
Rollout, RolloutStrategy,
|
||||
};
|
||||
use harmony_fleet_e2e::{AdminKv, StackOptions, shared_stack};
|
||||
use harmony_fleet_operator::fleet_aggregator;
|
||||
use harmony_fleet_operator::liveness::OperatorLiveness;
|
||||
use harmony_reconciler_contracts::{
|
||||
BUCKET_DESIRED_STATE, DeploymentName, DeploymentState, Id, Phase, desired_state_key,
|
||||
};
|
||||
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
||||
use kube::Client;
|
||||
use kube::api::{Api, DeleteParams, ObjectMeta, PostParams};
|
||||
use tokio::task::JoinHandle;
|
||||
|
||||
const E2E_ENV: &str = "HARMONY_FLEET_E2E";
|
||||
const LABEL_KEY: &str = "recovery-grp";
|
||||
|
||||
fn e2e_enabled() -> bool {
|
||||
matches!(std::env::var(E2E_ENV).as_deref(), Ok("1" | "true"))
|
||||
}
|
||||
|
||||
fn skip() {
|
||||
eprintln!("skipping {E2E_ENV}-gated test (set {E2E_ENV}=1; needs k3d + podman)");
|
||||
}
|
||||
|
||||
// ── Fixtures ────────────────────────────────────────────────────────────
|
||||
|
||||
async fn recovery_stack() -> anyhow::Result<std::sync::Arc<harmony_fleet_e2e::Stack>> {
|
||||
let _ = tracing_subscriber::fmt()
|
||||
.with_env_filter(
|
||||
tracing_subscriber::EnvFilter::try_from_default_env()
|
||||
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("warn")),
|
||||
)
|
||||
.try_init();
|
||||
|
||||
let stack = shared_stack(StackOptions {
|
||||
deploy_agent: false,
|
||||
deploy_operator: false,
|
||||
..StackOptions::default()
|
||||
})
|
||||
.await?;
|
||||
|
||||
harmony_fleet_deploy::operator::install_crds().await?;
|
||||
wait_crds_ready().await?;
|
||||
Ok(stack)
|
||||
}
|
||||
|
||||
async fn wait_crds_ready() -> anyhow::Result<()> {
|
||||
let client = Client::try_default().await?;
|
||||
let deployments: Api<Deployment> = Api::all(client.clone());
|
||||
let devices: Api<Device> = Api::all(client);
|
||||
let deadline = Instant::now() + Duration::from_secs(30);
|
||||
loop {
|
||||
let ok = deployments.list(&Default::default()).await.is_ok()
|
||||
&& devices.list(&Default::default()).await.is_ok();
|
||||
if ok {
|
||||
return Ok(());
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("CRDs not Established within 30s");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(500)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn admin_nats(stack: &harmony_fleet_e2e::Stack) -> anyhow::Result<async_nats::Client> {
|
||||
Ok(async_nats::ConnectOptions::new()
|
||||
.user_and_password(stack.admin_user.clone(), stack.admin_pass.clone())
|
||||
.connect(&stack.nats_url)
|
||||
.await?)
|
||||
}
|
||||
|
||||
async fn spawn_aggregator(
|
||||
stack: &harmony_fleet_e2e::Stack,
|
||||
) -> anyhow::Result<(JoinHandle<()>, OperatorLiveness)> {
|
||||
let kube = Client::try_default().await?;
|
||||
let js = jetstream::new(admin_nats(stack).await?);
|
||||
let liveness = OperatorLiveness::new();
|
||||
let handle = {
|
||||
let liveness = liveness.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = fleet_aggregator::run(kube, js, liveness).await {
|
||||
tracing::warn!(error = %e, "test aggregator exited");
|
||||
}
|
||||
})
|
||||
};
|
||||
Ok((handle, liveness))
|
||||
}
|
||||
|
||||
async fn kill(handle: JoinHandle<()>) {
|
||||
handle.abort();
|
||||
let _ = handle.await;
|
||||
}
|
||||
|
||||
fn podman_score() -> PodmanV0Score {
|
||||
PodmanV0Score {
|
||||
services: vec![PodmanService {
|
||||
name: "hello".to_string(),
|
||||
image: "docker.io/library/hello-world:latest".to_string(),
|
||||
ports: vec![],
|
||||
env: vec![],
|
||||
volumes: vec![],
|
||||
restart_policy: Default::default(),
|
||||
}],
|
||||
}
|
||||
}
|
||||
|
||||
fn device_with_label(name: &str, group: &str) -> Device {
|
||||
let mut labels = BTreeMap::new();
|
||||
labels.insert(LABEL_KEY.to_string(), group.to_string());
|
||||
Device {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.to_string()),
|
||||
labels: Some(labels),
|
||||
..Default::default()
|
||||
},
|
||||
spec: DeviceSpec { inventory: None },
|
||||
status: None,
|
||||
}
|
||||
}
|
||||
|
||||
fn deployment_matching(name: &str, group: &str) -> Deployment {
|
||||
let mut ml = BTreeMap::new();
|
||||
ml.insert(LABEL_KEY.to_string(), group.to_string());
|
||||
Deployment {
|
||||
metadata: ObjectMeta {
|
||||
name: Some(name.to_string()),
|
||||
..Default::default()
|
||||
},
|
||||
spec: DeploymentSpec {
|
||||
target_selector: LabelSelector {
|
||||
match_labels: Some(ml),
|
||||
match_expressions: None,
|
||||
},
|
||||
score: ReconcileScore::PodmanV0(podman_score()),
|
||||
rollout: Rollout {
|
||||
strategy: RolloutStrategy::Immediate,
|
||||
},
|
||||
},
|
||||
status: None,
|
||||
}
|
||||
}
|
||||
|
||||
async fn wait_until<F, Fut>(budget: Duration, mut f: F) -> anyhow::Result<()>
|
||||
where
|
||||
F: FnMut() -> Fut,
|
||||
Fut: std::future::Future<Output = bool>,
|
||||
{
|
||||
let deadline = Instant::now() + budget;
|
||||
loop {
|
||||
if f().await {
|
||||
return Ok(());
|
||||
}
|
||||
if Instant::now() >= deadline {
|
||||
anyhow::bail!("condition not met within {budget:?}");
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(300)).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn desired_state_present(
|
||||
stack: &harmony_fleet_e2e::Stack,
|
||||
device: &str,
|
||||
deployment: &str,
|
||||
) -> anyhow::Result<bool> {
|
||||
let js = jetstream::new(admin_nats(stack).await?);
|
||||
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
||||
let key = desired_state_key(device, &DeploymentName::try_new(deployment)?);
|
||||
Ok(bucket.get(&key).await?.is_some())
|
||||
}
|
||||
|
||||
async fn cleanup(stack: &harmony_fleet_e2e::Stack, devices: &[&str], deployments: &[&str]) {
|
||||
if let Ok(client) = Client::try_default().await {
|
||||
let dev_api: Api<Device> = Api::all(client.clone());
|
||||
let dep_api: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
||||
for d in devices {
|
||||
let _ = dev_api.delete(d, &DeleteParams::default()).await;
|
||||
}
|
||||
for d in deployments {
|
||||
let _ = dep_api.delete(d, &DeleteParams::default()).await;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// ── Scenario 1: cold restart, healthy fleet ─────────────────────────────
|
||||
//
|
||||
// SETUP: 1 device (recov1-dev, group=g1)
|
||||
// 1 deployment (recov1-depl, selector: g1)
|
||||
// aggregator converges, agent reports Running
|
||||
//
|
||||
// ACTION: kill aggregator, start a new one
|
||||
//
|
||||
// ASSERT: desired-state survives restart (idempotent re-write)
|
||||
// health counts rebuilt: succeeded=1, matched=1
|
||||
// liveness → Converged
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn aggregator_converges_from_kv_after_restart() -> anyhow::Result<()> {
|
||||
if !e2e_enabled() {
|
||||
skip();
|
||||
return Ok(());
|
||||
}
|
||||
let stack = recovery_stack().await?;
|
||||
let client = Client::try_default().await?;
|
||||
let devices: Api<Device> = Api::all(client.clone());
|
||||
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
||||
|
||||
let (dev, depl, grp) = ("recov1-dev", "recov1-depl", "g1");
|
||||
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
||||
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
||||
|
||||
// SETUP: create device + deployment, run first aggregator to convergence
|
||||
devices
|
||||
.create(&PostParams::default(), &device_with_label(dev, grp))
|
||||
.await?;
|
||||
deployments
|
||||
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
||||
.await?;
|
||||
|
||||
let (h1, l1) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
l1.is_converged()
|
||||
&& desired_state_present(&stack, dev, depl)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Simulate agent having reported Running
|
||||
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
|
||||
let depl_name = DeploymentName::try_new(depl)?;
|
||||
kv.put_device_state(&DeploymentState {
|
||||
device_id: Id::from(dev),
|
||||
deployment: depl_name.clone(),
|
||||
phase: Phase::Running,
|
||||
last_event_at: Utc::now(),
|
||||
last_error: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// ACTION: kill and restart aggregator
|
||||
kill(h1).await;
|
||||
let (h2, l2) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
|
||||
|
||||
// ASSERT: desired-state survives, health counts rebuilt from KV
|
||||
assert!(
|
||||
desired_state_present(&stack, dev, depl).await?,
|
||||
"desired-state must persist across restart"
|
||||
);
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
deployments
|
||||
.get(depl)
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|cr| cr.status?.aggregate)
|
||||
.map(|a| a.succeeded == 1 && a.matched_device_count == 1)
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.await?;
|
||||
|
||||
kill(h2).await;
|
||||
cleanup(&stack, &[dev], &[depl]).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Scenario 2: CR deleted while operator down ──────────────────────────
|
||||
//
|
||||
// SETUP: 1 device (recov2-dev, group=g2)
|
||||
// 1 deployment (recov2-depl, selector: g2)
|
||||
// aggregator converges → desired-state written
|
||||
//
|
||||
// ACTION: kill aggregator
|
||||
// force-delete the deployment CR (bypasses finalizer)
|
||||
// start new aggregator
|
||||
//
|
||||
// ASSERT: orphan desired-state is GC'd at convergence
|
||||
// (deployment no longer exists → no targets → entries purged)
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn aggregator_gcs_desired_state_for_deleted_cr() -> anyhow::Result<()> {
|
||||
if !e2e_enabled() {
|
||||
skip();
|
||||
return Ok(());
|
||||
}
|
||||
let stack = recovery_stack().await?;
|
||||
let client = Client::try_default().await?;
|
||||
let devices: Api<Device> = Api::all(client.clone());
|
||||
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
||||
|
||||
let (dev, depl, grp) = ("recov2-dev", "recov2-depl", "g2");
|
||||
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
||||
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
||||
|
||||
// SETUP
|
||||
devices
|
||||
.create(&PostParams::default(), &device_with_label(dev, grp))
|
||||
.await?;
|
||||
deployments
|
||||
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
||||
.await?;
|
||||
|
||||
let (h1, l1) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
l1.is_converged()
|
||||
&& desired_state_present(&stack, dev, depl)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.await?;
|
||||
kill(h1).await;
|
||||
|
||||
// ACTION: delete CR while operator is down, then restart
|
||||
deployments.delete(depl, &DeleteParams::default()).await?;
|
||||
assert!(
|
||||
desired_state_present(&stack, dev, depl).await?,
|
||||
"orphan desired-state should still be present before recovery"
|
||||
);
|
||||
|
||||
let (h2, _l2) = spawn_aggregator(&stack).await?;
|
||||
|
||||
// ASSERT: orphan is GC'd
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
!desired_state_present(&stack, dev, depl)
|
||||
.await
|
||||
.unwrap_or(true)
|
||||
})
|
||||
.await?;
|
||||
|
||||
kill(h2).await;
|
||||
cleanup(&stack, &[dev], &[]).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Scenario 3: two operators racing ────────────────────────────────────
|
||||
//
|
||||
// SETUP: 1 device (recov3-dev, group=g3)
|
||||
// 1 deployment (recov3-depl, selector: g3)
|
||||
//
|
||||
// ACTION: start TWO aggregators simultaneously (rolling-deploy overlap)
|
||||
//
|
||||
// ASSERT: both converge
|
||||
// desired-state KV value is byte-identical to the serialized score
|
||||
// (idempotent multi-writer, no leader election needed)
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn two_aggregators_produce_identical_desired_state() -> anyhow::Result<()> {
|
||||
if !e2e_enabled() {
|
||||
skip();
|
||||
return Ok(());
|
||||
}
|
||||
let stack = recovery_stack().await?;
|
||||
let client = Client::try_default().await?;
|
||||
let devices: Api<Device> = Api::all(client.clone());
|
||||
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
||||
|
||||
let (dev, depl, grp) = ("recov3-dev", "recov3-depl", "g3");
|
||||
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
||||
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
||||
|
||||
// SETUP
|
||||
devices
|
||||
.create(&PostParams::default(), &device_with_label(dev, grp))
|
||||
.await?;
|
||||
deployments
|
||||
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
||||
.await?;
|
||||
|
||||
// ACTION: two operators at once
|
||||
let (h_a, l_a) = spawn_aggregator(&stack).await?;
|
||||
let (h_b, l_b) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
l_a.is_converged() && l_b.is_converged()
|
||||
})
|
||||
.await?;
|
||||
|
||||
// ASSERT: KV value is deterministic regardless of writer
|
||||
let js = jetstream::new(admin_nats(&stack).await?);
|
||||
let bucket = js.get_key_value(BUCKET_DESIRED_STATE).await?;
|
||||
let key = desired_state_key(dev, &DeploymentName::try_new(depl)?);
|
||||
let value = bucket.get(&key).await?.expect("desired-state present");
|
||||
let expected = serde_json::to_vec(&ReconcileScore::PodmanV0(podman_score()))?;
|
||||
assert_eq!(
|
||||
value.to_vec(),
|
||||
expected,
|
||||
"racing writers must agree byte-for-byte"
|
||||
);
|
||||
|
||||
kill(h_a).await;
|
||||
kill(h_b).await;
|
||||
cleanup(&stack, &[dev], &[depl]).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Scenario 4: device offline during restart ───────────────────────────
|
||||
//
|
||||
// SETUP: 2 devices (recov4-dev-a reporting, recov4-dev-b silent)
|
||||
// 1 deployment (recov4-depl, selector: g4, matches both)
|
||||
// aggregator converges
|
||||
// dev-a reports Running; dev-b never reports
|
||||
//
|
||||
// ACTION: kill aggregator, start a new one
|
||||
//
|
||||
// ASSERT: dev-a → succeeded (rebuilt from device-state KV)
|
||||
// dev-b → pending (no state entry = pending)
|
||||
// matched_device_count = 2
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn device_offline_during_restart_counts_as_pending() -> anyhow::Result<()> {
|
||||
if !e2e_enabled() {
|
||||
skip();
|
||||
return Ok(());
|
||||
}
|
||||
let stack = recovery_stack().await?;
|
||||
let client = Client::try_default().await?;
|
||||
let devices: Api<Device> = Api::all(client.clone());
|
||||
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
||||
|
||||
let (dev_a, dev_b, depl, grp) = ("recov4-dev-a", "recov4-dev-b", "recov4-depl", "g4");
|
||||
for d in [dev_a, dev_b] {
|
||||
let _ = devices.delete(d, &DeleteParams::default()).await;
|
||||
}
|
||||
let _ = deployments.delete(depl, &DeleteParams::default()).await;
|
||||
|
||||
// SETUP: 2 devices, 1 deployment matching both
|
||||
devices
|
||||
.create(&PostParams::default(), &device_with_label(dev_a, grp))
|
||||
.await?;
|
||||
devices
|
||||
.create(&PostParams::default(), &device_with_label(dev_b, grp))
|
||||
.await?;
|
||||
deployments
|
||||
.create(&PostParams::default(), &deployment_matching(depl, grp))
|
||||
.await?;
|
||||
|
||||
let (h1, l1) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
l1.is_converged()
|
||||
&& desired_state_present(&stack, dev_a, depl)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
&& desired_state_present(&stack, dev_b, depl)
|
||||
.await
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.await?;
|
||||
|
||||
// Only dev-a reports Running; dev-b stays silent
|
||||
let kv = AdminKv::connect(&admin_nats(&stack).await?).await?;
|
||||
let depl_name = DeploymentName::try_new(depl)?;
|
||||
kv.put_device_state(&DeploymentState {
|
||||
device_id: Id::from(dev_a),
|
||||
deployment: depl_name.clone(),
|
||||
phase: Phase::Running,
|
||||
last_event_at: Utc::now(),
|
||||
last_error: None,
|
||||
})
|
||||
.await?;
|
||||
|
||||
// ACTION: kill and restart aggregator
|
||||
kill(h1).await;
|
||||
let (h2, l2) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async { l2.is_converged() }).await?;
|
||||
|
||||
// ASSERT: dev-a = succeeded, dev-b = pending, matched = 2
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
deployments
|
||||
.get(depl)
|
||||
.await
|
||||
.ok()
|
||||
.and_then(|cr| cr.status?.aggregate)
|
||||
.map(|a| {
|
||||
a.matched_device_count == 2 && a.succeeded == 1 && a.pending == 1
|
||||
})
|
||||
.unwrap_or(false)
|
||||
})
|
||||
.await?;
|
||||
|
||||
kill(h2).await;
|
||||
cleanup(&stack, &[dev_a, dev_b], &[depl]).await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ── Scenario 5: chaos kill under write load ─────────────────────────────
|
||||
//
|
||||
// SETUP: 1 device (recov5-dev, group=g5)
|
||||
// 5 deployments created incrementally (recov5-depl-0..4)
|
||||
//
|
||||
// ACTION: create deployments one by one
|
||||
// kill + restart aggregator mid-stream (after depl-2)
|
||||
// kill again after all created
|
||||
// start final aggregator
|
||||
//
|
||||
// ASSERT: final aggregator converges all 5 desired-state entries within 30s
|
||||
|
||||
#[tokio::test(flavor = "multi_thread", worker_threads = 4)]
|
||||
async fn chaos_kill_under_write_load_converges() -> anyhow::Result<()> {
|
||||
if !e2e_enabled() {
|
||||
skip();
|
||||
return Ok(());
|
||||
}
|
||||
let stack = recovery_stack().await?;
|
||||
let client = Client::try_default().await?;
|
||||
let devices: Api<Device> = Api::all(client.clone());
|
||||
let deployments: Api<Deployment> = Api::namespaced(client, &stack.namespace);
|
||||
|
||||
let (dev, grp) = ("recov5-dev", "g5");
|
||||
let names: Vec<String> = (0..5).map(|i| format!("recov5-depl-{i}")).collect();
|
||||
let _ = devices.delete(dev, &DeleteParams::default()).await;
|
||||
for n in &names {
|
||||
let _ = deployments.delete(n, &DeleteParams::default()).await;
|
||||
}
|
||||
|
||||
// SETUP: 1 device
|
||||
devices
|
||||
.create(&PostParams::default(), &device_with_label(dev, grp))
|
||||
.await?;
|
||||
|
||||
// ACTION: create deployments while killing/restarting the operator
|
||||
let (mut handle, _) = spawn_aggregator(&stack).await?;
|
||||
for (i, n) in names.iter().enumerate() {
|
||||
deployments
|
||||
.create(&PostParams::default(), &deployment_matching(n, grp))
|
||||
.await?;
|
||||
if i == 2 {
|
||||
kill(handle).await;
|
||||
(handle, _) = spawn_aggregator(&stack).await?;
|
||||
}
|
||||
}
|
||||
kill(handle).await;
|
||||
|
||||
// Final aggregator must converge all 5
|
||||
let (h_final, l_final) = spawn_aggregator(&stack).await?;
|
||||
wait_until(Duration::from_secs(30), || async { l_final.is_converged() }).await?;
|
||||
|
||||
// ASSERT: all 5 desired-state entries present
|
||||
wait_until(Duration::from_secs(30), || async {
|
||||
for n in &names {
|
||||
if !desired_state_present(&stack, dev, n).await.unwrap_or(false) {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
true
|
||||
})
|
||||
.await?;
|
||||
|
||||
kill(h_final).await;
|
||||
let dep_refs: Vec<&str> = names.iter().map(String::as_str).collect();
|
||||
cleanup(&stack, &[dev], &dep_refs).await;
|
||||
Ok(())
|
||||
}
|
||||
@@ -38,6 +38,8 @@ use harmony::modules::fleet::operator::{
|
||||
};
|
||||
use tracing::warn;
|
||||
|
||||
use crate::liveness::OperatorLiveness;
|
||||
|
||||
const PATCH_TICK: Duration = Duration::from_secs(1);
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
@@ -149,7 +151,11 @@ fn matched_devices(
|
||||
// Top-level run
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> {
|
||||
pub async fn run(
|
||||
client: Client,
|
||||
js: async_nats::jetstream::Context,
|
||||
liveness: OperatorLiveness,
|
||||
) -> anyhow::Result<()> {
|
||||
let state_bucket = js
|
||||
.create_key_value(async_nats::jetstream::kv::Config {
|
||||
bucket: BUCKET_DEVICE_STATE.to_string(),
|
||||
@@ -173,81 +179,110 @@ pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::
|
||||
let devices_api: Api<Device> = Api::all(client.clone());
|
||||
let patch_api: Api<Deployment> = Api::all(client);
|
||||
|
||||
tracing::info!(
|
||||
owned = state
|
||||
.lock()
|
||||
.await
|
||||
.owned_targets
|
||||
.values()
|
||||
.map(|s| s.len())
|
||||
.sum::<usize>(),
|
||||
"aggregator: startup complete"
|
||||
);
|
||||
// Compute before logging: a `.await` inside the macro would hold the
|
||||
// event's non-Send format temporaries across it, making `run` non-Send.
|
||||
let owned_count: usize = {
|
||||
let guard = state.lock().await;
|
||||
guard.owned_targets.values().map(|s| s.len()).sum()
|
||||
};
|
||||
tracing::info!(owned = owned_count, "aggregator: startup complete");
|
||||
|
||||
let state_watcher_handle = {
|
||||
// A JoinSet so cancelling `run` (its future dropped) aborts every child
|
||||
// task — no orphan watchers outliving the aggregator. The whole point of
|
||||
// recovery is a clean stop/start, so this cancellation property matters
|
||||
// both in tests (restart simulation) and in process teardown.
|
||||
let mut tasks = tokio::task::JoinSet::new();
|
||||
|
||||
{
|
||||
let state = state.clone();
|
||||
let bucket = state_bucket.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_state_kv_watcher(bucket, state).await {
|
||||
let liveness = liveness.clone();
|
||||
tasks.spawn(async move {
|
||||
if let Err(e) = run_state_kv_watcher(bucket, state, liveness).await {
|
||||
tracing::warn!(error = %e, "aggregator: state watcher exited");
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let deployment_watcher_handle = {
|
||||
});
|
||||
}
|
||||
{
|
||||
let state = state.clone();
|
||||
let desired = desired_bucket.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_deployment_watcher(deployments_api.clone(), state, desired).await {
|
||||
let liveness = liveness.clone();
|
||||
tasks.spawn(async move {
|
||||
if let Err(e) =
|
||||
run_deployment_watcher(deployments_api.clone(), state, desired, liveness).await
|
||||
{
|
||||
tracing::warn!(error = %e, "aggregator: deployment watcher exited");
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let device_watcher_handle = {
|
||||
});
|
||||
}
|
||||
{
|
||||
let state = state.clone();
|
||||
let desired = desired_bucket.clone();
|
||||
tokio::spawn(async move {
|
||||
if let Err(e) = run_device_watcher(devices_api, state, desired).await {
|
||||
let liveness = liveness.clone();
|
||||
tasks.spawn(async move {
|
||||
if let Err(e) = run_device_watcher(devices_api, state, desired, liveness).await {
|
||||
tracing::warn!(error = %e, "aggregator: device watcher exited");
|
||||
}
|
||||
})
|
||||
};
|
||||
|
||||
let patch_state = state.clone();
|
||||
let patch_loop = async move {
|
||||
let mut ticker = tokio::time::interval(PATCH_TICK);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
if let Err(e) = patch_tick(&patch_api, &patch_state).await {
|
||||
tracing::warn!(error = %e, "aggregator: patch tick failed");
|
||||
}
|
||||
}
|
||||
};
|
||||
|
||||
tokio::select! {
|
||||
_ = patch_loop => Ok(()),
|
||||
_ = state_watcher_handle => Ok(()),
|
||||
_ = deployment_watcher_handle => Ok(()),
|
||||
_ = device_watcher_handle => Ok(()),
|
||||
});
|
||||
}
|
||||
{
|
||||
let patch_state = state.clone();
|
||||
tasks.spawn(async move {
|
||||
let mut ticker = tokio::time::interval(PATCH_TICK);
|
||||
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
||||
loop {
|
||||
ticker.tick().await;
|
||||
if let Err(e) = patch_tick(&patch_api, &patch_state).await {
|
||||
tracing::warn!(error = %e, "aggregator: patch tick failed");
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
// Return when the first task exits (mirrors the old `select!`); dropping
|
||||
// `tasks` here aborts the rest.
|
||||
tasks.join_next().await;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Device-state KV watcher (unchanged path)
|
||||
// ---------------------------------------------------------------------------
|
||||
|
||||
/// True when the bucket currently holds no keys. Used to short-circuit the
|
||||
/// caught-up signal: an empty bucket's watch yields no entry to carry
|
||||
/// `seen_current`.
|
||||
async fn bucket_is_empty(bucket: &Store) -> bool {
|
||||
match bucket.keys().await {
|
||||
Ok(mut keys) => keys.next().await.is_none(),
|
||||
Err(e) => {
|
||||
tracing::warn!(error = %e, "aggregator: keys() probe failed; assuming non-empty");
|
||||
false
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn parse_state_key(key: &str) -> Option<DevicePair> {
|
||||
let rest = key.strip_prefix("state.")?;
|
||||
let (device, deployment) = rest.split_once('.')?;
|
||||
let (device, deployment) = rest.rsplit_once('.')?;
|
||||
Some(DevicePair {
|
||||
device_id: device.to_string(),
|
||||
deployment: DeploymentName::try_new(deployment).ok()?,
|
||||
})
|
||||
}
|
||||
|
||||
async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> {
|
||||
async fn run_state_kv_watcher(
|
||||
bucket: Store,
|
||||
state: SharedFleetState,
|
||||
liveness: OperatorLiveness,
|
||||
) -> anyhow::Result<()> {
|
||||
// An empty bucket replays nothing, so `seen_current` never surfaces on an
|
||||
// entry — mark the state source caught-up up front when there's nothing to
|
||||
// replay. (A write landing just after this is replayed by the watch below
|
||||
// and re-marks; the mark is an idempotent latch.)
|
||||
if bucket_is_empty(&bucket).await {
|
||||
liveness.mark_states_ready();
|
||||
}
|
||||
let mut watch = bucket.watch_with_history(">").await?;
|
||||
while let Some(entry_res) = watch.next().await {
|
||||
let entry = match entry_res {
|
||||
@@ -257,6 +292,11 @@ async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow:
|
||||
continue;
|
||||
}
|
||||
};
|
||||
// First entry that has caught up to the bucket head means the cold
|
||||
// replay is done.
|
||||
if entry.seen_current {
|
||||
liveness.mark_states_ready();
|
||||
}
|
||||
let Some(pair) = parse_state_key(&entry.key) else {
|
||||
continue;
|
||||
};
|
||||
@@ -359,6 +399,7 @@ async fn run_deployment_watcher(
|
||||
api: Api<Deployment>,
|
||||
state: SharedFleetState,
|
||||
desired: Store,
|
||||
liveness: OperatorLiveness,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed();
|
||||
while let Some(event) = stream.try_next().await? {
|
||||
@@ -369,12 +410,60 @@ async fn run_deployment_watcher(
|
||||
Event::Delete(cr) => {
|
||||
on_deployment_delete(&state, &desired, cr).await;
|
||||
}
|
||||
Event::Init | Event::InitDone => {}
|
||||
Event::Init => {}
|
||||
Event::InitDone => {
|
||||
// The initial CR list is fully replayed; the deployment cache is
|
||||
// now authoritative. GC desired-state for any deployment that
|
||||
// was seeded from KV but has no surviving CR (deleted while we
|
||||
// were down — finalizer bypassed, e.g. namespace teardown),
|
||||
// then declare this source converged.
|
||||
gc_orphaned_desired_state(&state, &desired).await;
|
||||
liveness.mark_deployments_ready();
|
||||
}
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
/// Purge desired-state entries whose Deployment CR no longer exists. Runs once,
|
||||
/// after the initial CR list is loaded: `owned_targets` was seeded from the KV
|
||||
/// (so it includes deployments the previous operator wrote), and any name with
|
||||
/// no CR in the now-complete cache is an orphan. Belt-and-suspenders with the
|
||||
/// controller finalizer, which covers deletes that happen while we're *up*.
|
||||
async fn gc_orphaned_desired_state(state: &SharedFleetState, desired: &Store) {
|
||||
let orphans: Vec<(DeploymentName, HashSet<String>)> = {
|
||||
let mut guard = state.lock().await;
|
||||
let live: HashSet<DeploymentName> = guard
|
||||
.deployments
|
||||
.values()
|
||||
.map(|d| d.deployment_name.clone())
|
||||
.collect();
|
||||
let orphan_names: Vec<DeploymentName> = guard
|
||||
.owned_targets
|
||||
.keys()
|
||||
.filter(|n| !live.contains(*n))
|
||||
.cloned()
|
||||
.collect();
|
||||
orphan_names
|
||||
.into_iter()
|
||||
.filter_map(|n| guard.owned_targets.remove(&n).map(|set| (n, set)))
|
||||
.collect()
|
||||
};
|
||||
for (deployment_name, devices) in orphans {
|
||||
for device in &devices {
|
||||
let k = desired_state_key(device, &deployment_name);
|
||||
if let Err(e) = desired.delete(&k).await {
|
||||
tracing::debug!(key = %k, error = %e, "aggregator: orphan desired-state delete failed");
|
||||
}
|
||||
}
|
||||
tracing::info!(
|
||||
deployment = ?deployment_name,
|
||||
count = devices.len(),
|
||||
"aggregator: GC'd orphaned desired-state for deleted deployment"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) {
|
||||
let Some(key) = DeploymentKey::from_cr(&cr) else {
|
||||
return;
|
||||
@@ -467,6 +556,7 @@ async fn run_device_watcher(
|
||||
api: Api<Device>,
|
||||
state: SharedFleetState,
|
||||
desired: Store,
|
||||
liveness: OperatorLiveness,
|
||||
) -> anyhow::Result<()> {
|
||||
let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed();
|
||||
while let Some(event) = stream.try_next().await? {
|
||||
@@ -477,7 +567,9 @@ async fn run_device_watcher(
|
||||
Event::Delete(dev) => {
|
||||
on_device_delete(&state, &desired, dev).await;
|
||||
}
|
||||
Event::Init | Event::InitDone => {}
|
||||
Event::Init => {}
|
||||
// Initial CR list fully replayed into the cache.
|
||||
Event::InitDone => liveness.mark_devices_ready(),
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
@@ -613,10 +705,10 @@ async fn seed_owned_targets(bucket: &Store, state: &SharedFleetState) -> anyhow:
|
||||
let mut keys = bucket.keys().await?;
|
||||
while let Some(key_res) = keys.next().await {
|
||||
let key = key_res?;
|
||||
// Keys are `<device>.<deployment>`. The KV key space carries
|
||||
// no namespace — names are globally unique at this layer —
|
||||
// which is exactly why `owned_targets` keys by DeploymentName.
|
||||
let Some((device, deployment)) = key.split_once('.') else {
|
||||
// Keys are `<device>.<deployment>`. DeploymentName is dot-free
|
||||
// (RFC1123), so rsplit_once is unambiguous even if device ids
|
||||
// contain dots.
|
||||
let Some((device, deployment)) = key.rsplit_once('.') else {
|
||||
warn!("Could not read device.deployment for key {key}");
|
||||
continue;
|
||||
};
|
||||
@@ -838,4 +930,26 @@ mod tests {
|
||||
assert_eq!(matched.len(), 1);
|
||||
assert!(matched.contains("pi-01"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn apply_state_rejects_older_timestamp() {
|
||||
let cached = cached("ns", "hello", "group", "edge");
|
||||
let mut s = FleetState::default();
|
||||
s.deployments.insert(cached.key.clone(), cached.clone());
|
||||
s.owned_targets.insert(
|
||||
cached.deployment_name.clone(),
|
||||
["pi-01"].iter().map(|s| s.to_string()).collect(),
|
||||
);
|
||||
|
||||
let p = pair("pi-01", "hello");
|
||||
apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Running, 10));
|
||||
assert_eq!(s.states[&p].phase, Phase::Running);
|
||||
|
||||
apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Failed, 5));
|
||||
assert_eq!(
|
||||
s.states[&p].phase,
|
||||
Phase::Running,
|
||||
"older event must not overwrite newer state"
|
||||
);
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1,5 +1,7 @@
|
||||
use maud::{DOCTYPE, Markup, PreEscaped, html};
|
||||
|
||||
use harmony_fleet_operator::liveness::RecoveryPhase;
|
||||
|
||||
use crate::frontend::auth::DashboardSession;
|
||||
|
||||
// ── Inline SVG icons ────────────────────────────────────────────────────
|
||||
@@ -41,6 +43,9 @@ pub fn page(
|
||||
(sidebar(current_path, session, unacked_alerts))
|
||||
main class="flex-1 min-w-0 flex flex-col overflow-hidden" {
|
||||
(topbar(title, unacked_alerts))
|
||||
// Bootstrap the recovery banner; the fragment self-polls
|
||||
// while recovering and goes inert once converged.
|
||||
div id="recovery-banner" hx-get="/__recovery" hx-trigger="load" hx-swap="outerHTML" {}
|
||||
div class="flex-1 overflow-y-auto grid-bg" { (content) }
|
||||
}
|
||||
}
|
||||
@@ -50,6 +55,28 @@ pub fn page(
|
||||
}
|
||||
}
|
||||
|
||||
/// Recovery-state banner fragment swapped into `#recovery-banner`.
|
||||
///
|
||||
/// `Recovering` → a visible banner that re-requests itself every 2s.
|
||||
/// `Converged` → an inert element with no trigger, so polling stops and the
|
||||
/// banner disappears. Same element id either way so HTMX `outerHTML` swaps cleanly.
|
||||
pub fn recovery_banner(phase: RecoveryPhase) -> Markup {
|
||||
match phase {
|
||||
RecoveryPhase::Recovering => html! {
|
||||
div id="recovery-banner"
|
||||
hx-get="/__recovery" hx-trigger="load delay:2s" hx-swap="outerHTML"
|
||||
class="flex items-center gap-2 px-6 py-2 text-[12px] border-b"
|
||||
style="background:rgba(234,179,8,0.10); border-color:rgba(234,179,8,0.25); color:#fde68a" {
|
||||
span class="inline-block w-2 h-2 rounded-full animate-pulse" style="background:#eab308" {}
|
||||
"Operator recovering — rebuilding fleet state from NATS. Counts may be briefly stale."
|
||||
}
|
||||
},
|
||||
RecoveryPhase::Converged => html! {
|
||||
div id="recovery-banner" {}
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
fn sidebar(
|
||||
current_path: &str,
|
||||
session: Option<&DashboardSession>,
|
||||
|
||||
@@ -121,6 +121,8 @@ pub fn router(state: AppState) -> Router {
|
||||
// Settings
|
||||
.route("/settings", get(settings_handler))
|
||||
.route("/settings/toggle/{key}", post(settings_toggle_handler))
|
||||
// Operator recovery banner (HTMX self-poll while recovering)
|
||||
.route("/__recovery", get(recovery_banner_handler))
|
||||
// Logout
|
||||
.route("/logout", get(auth::logout_handler))
|
||||
.route_layer(middleware::from_fn_with_state(state.clone(), csrf_protect))
|
||||
@@ -615,6 +617,13 @@ async fn device_logs_handler(
|
||||
Ok(devices_view::logs_modal(&id, q.deployment.as_deref()))
|
||||
}
|
||||
|
||||
/// Recovery banner fragment. While the aggregator replays KV/CR history this
|
||||
/// returns a banner that re-fetches itself every 2s; once converged it returns
|
||||
/// an inert empty element, so the poll naturally stops.
|
||||
async fn recovery_banner_handler(State(s): State<AppState>) -> Markup {
|
||||
crate::frontend::layout::recovery_banner(s.fleet.recovery_phase())
|
||||
}
|
||||
|
||||
/// One-shot `podman logs` tail of the device's deployment container,
|
||||
/// fetched over the NATS command channel. Rendered as the log panel
|
||||
/// body (HTMX swaps it in on `load` and on Refresh).
|
||||
|
||||
@@ -16,3 +16,4 @@ pub mod commands;
|
||||
pub mod device_reconciler;
|
||||
pub mod device_status;
|
||||
pub mod fleet_aggregator;
|
||||
pub mod liveness;
|
||||
|
||||
103
fleet/harmony-fleet-operator/src/liveness.rs
Normal file
103
fleet/harmony-fleet-operator/src/liveness.rs
Normal file
@@ -0,0 +1,103 @@
|
||||
//! Operator recovery liveness.
|
||||
//!
|
||||
//! After a restart the aggregator cold-rebuilds its caches from NATS KV +
|
||||
//! kube CR watches. Until that replay completes, the dashboard's counts can
|
||||
//! be stale — so we expose a coarse [`RecoveryPhase`] the UI shows as a banner
|
||||
//! ("recovering" → "converged"), and the customer sees progress instead of a
|
||||
//! silent stale view.
|
||||
//!
|
||||
//! Convergence = all three cold-start sources have replayed their current
|
||||
//! contents at least once: Deployment CRs, Device CRs, and the device-state KV.
|
||||
//! Each is a one-way latch (`Recovering` → `Converged`, never back), set from
|
||||
//! the natural caught-up signals (`watcher::Event::InitDone`, KV
|
||||
//! `Entry::seen_current`).
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::sync::atomic::{AtomicBool, Ordering};
|
||||
|
||||
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
|
||||
pub enum RecoveryPhase {
|
||||
/// Still replaying KV / CR history; counts may be stale.
|
||||
Recovering,
|
||||
/// All cold-start sources replayed; the picture is current.
|
||||
Converged,
|
||||
}
|
||||
|
||||
/// Cheap-to-clone handle shared between the aggregator (writer) and the
|
||||
/// in-process dashboard (reader). Both live in one operator process, so a
|
||||
/// shared atomic is all the coordination needed.
|
||||
#[derive(Clone, Default)]
|
||||
pub struct OperatorLiveness {
|
||||
inner: Arc<Inner>,
|
||||
}
|
||||
|
||||
#[derive(Default)]
|
||||
struct Inner {
|
||||
deployments_ready: AtomicBool,
|
||||
devices_ready: AtomicBool,
|
||||
states_ready: AtomicBool,
|
||||
}
|
||||
|
||||
impl OperatorLiveness {
|
||||
pub fn new() -> Self {
|
||||
Self::default()
|
||||
}
|
||||
|
||||
pub fn mark_deployments_ready(&self) {
|
||||
self.inner.deployments_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn mark_devices_ready(&self) {
|
||||
self.inner.devices_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn mark_states_ready(&self) {
|
||||
self.inner.states_ready.store(true, Ordering::Relaxed);
|
||||
}
|
||||
|
||||
pub fn phase(&self) -> RecoveryPhase {
|
||||
let i = &self.inner;
|
||||
if i.deployments_ready.load(Ordering::Relaxed)
|
||||
&& i.devices_ready.load(Ordering::Relaxed)
|
||||
&& i.states_ready.load(Ordering::Relaxed)
|
||||
{
|
||||
RecoveryPhase::Converged
|
||||
} else {
|
||||
RecoveryPhase::Recovering
|
||||
}
|
||||
}
|
||||
|
||||
pub fn is_converged(&self) -> bool {
|
||||
self.phase() == RecoveryPhase::Converged
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn starts_recovering_and_needs_all_three_sources() {
|
||||
let l = OperatorLiveness::new();
|
||||
assert_eq!(l.phase(), RecoveryPhase::Recovering);
|
||||
l.mark_deployments_ready();
|
||||
l.mark_devices_ready();
|
||||
assert_eq!(l.phase(), RecoveryPhase::Recovering, "states still pending");
|
||||
l.mark_states_ready();
|
||||
assert_eq!(l.phase(), RecoveryPhase::Converged);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn converged_is_a_one_way_latch_and_clones_share_state() {
|
||||
let a = OperatorLiveness::new();
|
||||
let b = a.clone();
|
||||
a.mark_deployments_ready();
|
||||
a.mark_devices_ready();
|
||||
a.mark_states_ready();
|
||||
// The reader clone observes the writer's marks.
|
||||
assert!(b.is_converged());
|
||||
// Marking again is idempotent — no way back to Recovering.
|
||||
b.mark_states_ready();
|
||||
assert!(a.is_converged());
|
||||
}
|
||||
}
|
||||
@@ -129,7 +129,11 @@ async fn serve_web(
|
||||
// Standalone serve-web has no NATS, so device exec/log tailing is
|
||||
// unavailable here — those features need the deployed operator
|
||||
// (`run`), which threads in the command channel.
|
||||
Arc::new(RealFleetService::new(Client::try_default().await?, None))
|
||||
Arc::new(RealFleetService::new(
|
||||
Client::try_default().await?,
|
||||
None,
|
||||
None,
|
||||
))
|
||||
};
|
||||
serve_dashboard(fleet, addr, css_from, live_reload).await
|
||||
}
|
||||
@@ -192,6 +196,7 @@ async fn serve_dashboard(
|
||||
fn spawn_dashboard(
|
||||
client: Client,
|
||||
commands: harmony_fleet_operator::commands::FleetCommandsClient,
|
||||
liveness: harmony_fleet_operator::liveness::OperatorLiveness,
|
||||
) {
|
||||
use std::net::SocketAddr;
|
||||
use std::sync::Arc;
|
||||
@@ -200,7 +205,11 @@ fn spawn_dashboard(
|
||||
|
||||
let addr = SocketAddr::from(([0, 0, 0, 0], frontend::server::DEFAULT_PORT));
|
||||
tokio::spawn(async move {
|
||||
let fleet = Arc::new(RealFleetService::new(client, Some(commands)));
|
||||
let fleet = Arc::new(RealFleetService::new(
|
||||
client,
|
||||
Some(commands),
|
||||
Some(liveness),
|
||||
));
|
||||
if let Err(e) = serve_dashboard(fleet, addr, None, false).await {
|
||||
tracing::error!(error = %e, "dashboard server exited; reconcile continues");
|
||||
}
|
||||
@@ -221,6 +230,10 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
||||
|
||||
let client = Client::try_default().await?;
|
||||
|
||||
// Shared recovery signal: the aggregator latches it to Converged once it
|
||||
// has replayed KV + CR history; the dashboard reads it for the banner.
|
||||
let liveness = harmony_fleet_operator::liveness::OperatorLiveness::new();
|
||||
|
||||
// Serve the read-only dashboard in the same process (best-effort).
|
||||
// It reads CRs for state and uses the NATS command channel for
|
||||
// device exec / log tailing. Built only with the web-frontend
|
||||
@@ -229,6 +242,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
||||
spawn_dashboard(
|
||||
client.clone(),
|
||||
harmony_fleet_operator::commands::FleetCommandsClient::new(nats.clone()),
|
||||
liveness.clone(),
|
||||
);
|
||||
|
||||
// Concurrent tasks:
|
||||
@@ -248,7 +262,7 @@ async fn run(nats_url: &str, bucket: &str, credentials_toml: &str) -> Result<()>
|
||||
r = controller::run(ctl_client, desired_state_kv) => r,
|
||||
r = device_reconciler::run(dr_client, dr_js) => r,
|
||||
r = device_status::run(ds_client, ds_js) => r,
|
||||
r = fleet_aggregator::run(client, js) => r,
|
||||
r = fleet_aggregator::run(client, js, liveness) => r,
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -45,6 +45,12 @@ pub trait FleetService: Send + Sync + 'static {
|
||||
device_id: &str,
|
||||
deployment: Option<&str>,
|
||||
) -> anyhow::Result<String>;
|
||||
/// Coarse operator recovery state for the dashboard banner. Sync — a cheap
|
||||
/// in-memory read, not a CR/NATS round-trip. Defaults to `Converged`, which
|
||||
/// the mock and the standalone `serve-web` path (no aggregator) inherit.
|
||||
fn recovery_phase(&self) -> harmony_fleet_operator::liveness::RecoveryPhase {
|
||||
harmony_fleet_operator::liveness::RecoveryPhase::Converged
|
||||
}
|
||||
}
|
||||
|
||||
// ── Device ─────────────────────────────────────────────────────────────
|
||||
|
||||
@@ -25,6 +25,7 @@ use harmony::modules::fleet::operator::{
|
||||
use harmony::modules::podman::ReconcileScore;
|
||||
use harmony_fleet_operator::commands::FleetCommandsClient;
|
||||
use harmony_fleet_operator::fleet_aggregator::selector_matches;
|
||||
use harmony_fleet_operator::liveness::{OperatorLiveness, RecoveryPhase};
|
||||
use harmony_reconciler_contracts::ExecReply;
|
||||
|
||||
use super::{
|
||||
@@ -46,14 +47,23 @@ pub struct RealFleetService {
|
||||
/// In-memory ack set. Alerts are derived from live CR state and
|
||||
/// have no store of their own, so acks don't survive a restart.
|
||||
acked_alerts: Mutex<HashSet<String>>,
|
||||
/// Aggregator recovery signal, shared in-process. `None` in the
|
||||
/// standalone `serve-web` path (no aggregator running) → reported as
|
||||
/// `Converged` so the banner never shows there.
|
||||
liveness: Option<OperatorLiveness>,
|
||||
}
|
||||
|
||||
impl RealFleetService {
|
||||
pub fn new(kube: Client, commands: Option<FleetCommandsClient>) -> Self {
|
||||
pub fn new(
|
||||
kube: Client,
|
||||
commands: Option<FleetCommandsClient>,
|
||||
liveness: Option<OperatorLiveness>,
|
||||
) -> Self {
|
||||
Self {
|
||||
kube,
|
||||
commands,
|
||||
acked_alerts: Mutex::new(HashSet::new()),
|
||||
liveness,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -360,6 +370,13 @@ fn derive_alerts(
|
||||
|
||||
#[async_trait]
|
||||
impl FleetService for RealFleetService {
|
||||
fn recovery_phase(&self) -> RecoveryPhase {
|
||||
self.liveness
|
||||
.as_ref()
|
||||
.map(|l| l.phase())
|
||||
.unwrap_or(RecoveryPhase::Converged)
|
||||
}
|
||||
|
||||
async fn dashboard_detail(&self) -> anyhow::Result<DashboardDetail> {
|
||||
let devices = self.devices().await?;
|
||||
let mut deployments = self.deployments().await?;
|
||||
|
||||
Reference in New Issue
Block a user
This should not exist on a live code path. At the very least use cfg test so it does not make it in the release binary.