842 lines
30 KiB
Rust
842 lines
30 KiB
Rust
//! Operator-side aggregator + desired-state writer.
//!
//! Maintains three in-memory caches driven by watches:
//! - Deployment CRs (kube watch) → what we want to run
//! - Device CRs (kube watch) → where we could run it
//! - DeploymentState KV (NATS watch) → what's actually running
//!
//! Outputs:
//! - Writes `desired-state.<device>.<deployment>` KV entries when a
//!   Deployment's selector matches a Device. Deletes them when the
//!   match goes away.
//! - Patches `Deployment.status.aggregate` once per 1 s tick for every
//!   CR marked dirty since the previous tick (its matched-device set or
//!   one of its devices' reported states changed).
//!
//! No separate event stream, no per-key revision tracking: KV watches
//! are ordered and last-writer-wins, and the dirty set naturally
//! coalesces high-frequency state churn into one patch per tick.
|
|
|
|
use std::collections::{BTreeMap, HashMap, HashSet};
|
|
use std::sync::Arc;
|
|
use std::time::Duration;
|
|
|
|
use async_nats::jetstream::kv::{Operation, Store};
|
|
use futures_util::{StreamExt, TryStreamExt};
|
|
use harmony_reconciler_contracts::{
|
|
BUCKET_DESIRED_STATE, BUCKET_DEVICE_STATE, DeploymentName, DeploymentState, Phase,
|
|
desired_state_key,
|
|
};
|
|
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
|
|
use kube::api::{Api, Patch, PatchParams};
|
|
use kube::runtime::watcher::{self, Config as WatcherConfig, Event};
|
|
use kube::{Client, ResourceExt};
|
|
use serde_json::json;
|
|
use tokio::sync::Mutex;
|
|
|
|
use harmony::modules::fleet::operator::{
|
|
AggregateLastError, Deployment, DeploymentAggregate, Device,
|
|
};
|
|
use tracing::warn;
|
|
|
|
/// Interval between two status-patch passes over the dirty set.
const PATCH_TICK: Duration = Duration::from_millis(1_000);
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// State
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Identity of a Deployment CR: its namespace together with its
/// `metadata.name`.
///
/// Used as the key for per-CR bookkeeping (cached projection, last
/// error, dirty set) and to address the CR when patching its status.
#[derive(Clone, Debug, Eq, Hash, PartialEq)]
pub struct DeploymentKey {
    /// Namespace the CR lives in.
    pub namespace: String,
    /// `metadata.name` of the CR.
    pub name: String,
}
|
|
|
|
impl DeploymentKey {
|
|
pub fn from_cr(cr: &Deployment) -> Option<Self> {
|
|
Some(Self {
|
|
namespace: cr.namespace()?,
|
|
name: cr.name_any(),
|
|
})
|
|
}
|
|
}
|
|
|
|
/// One `(device, deployment)` pair.
|
|
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
|
|
pub struct DevicePair {
|
|
pub device_id: String,
|
|
pub deployment: DeploymentName,
|
|
}
|
|
|
|
/// Thin projection of a Deployment CR — everything we need for
|
|
/// selector evaluation + desired-state writes + status aggregation,
|
|
/// without borrowing the full kube object.
|
|
#[derive(Debug, Clone)]
|
|
pub struct CachedDeployment {
|
|
key: DeploymentKey,
|
|
deployment_name: DeploymentName,
|
|
selector: LabelSelector,
|
|
/// JSON-serialized score payload ready to `put` into
|
|
/// desired-state. Cached because the same bytes are written to
|
|
/// every matched device's KV entry.
|
|
score_json: Vec<u8>,
|
|
}
|
|
|
|
#[derive(Debug, Default)]
|
|
pub struct FleetState {
|
|
/// Cached Deployment CRs, keyed by (namespace, name).
|
|
deployments: HashMap<DeploymentKey, CachedDeployment>,
|
|
/// Cached Device labels, keyed by `metadata.name`.
|
|
devices: HashMap<String, BTreeMap<String, String>>,
|
|
/// Latest DeploymentState per (device, deployment) pair.
|
|
states: HashMap<DevicePair, DeploymentState>,
|
|
/// Which devices have we pushed desired-state for, per deployment?
|
|
/// Diff against recomputed targets on any change. Keyed by
|
|
/// `DeploymentName` (not `DeploymentKey`) because the
|
|
/// `desired-state` KV key space doesn't carry namespace —
|
|
/// deployment names are globally unique at the NATS level. This
|
|
/// lets cold-start seeding from the KV populate the map
|
|
/// correctly without having to guess namespaces.
|
|
owned_targets: HashMap<DeploymentName, HashSet<String>>,
|
|
/// Per-deployment latest-failure surface for the CR status.
|
|
last_error: HashMap<DeploymentKey, AggregateLastError>,
|
|
/// CR keys whose status needs re-patching on the next tick.
|
|
dirty: HashSet<DeploymentKey>,
|
|
}
|
|
|
|
pub type SharedFleetState = Arc<Mutex<FleetState>>;
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Selector evaluation
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Does `selector` match this label set? matchLabels only for MVP —
|
|
/// matchExpressions logs a warning once and is treated as "no match"
|
|
/// until we need it.
|
|
pub fn selector_matches(selector: &LabelSelector, labels: &BTreeMap<String, String>) -> bool {
|
|
if let Some(match_labels) = &selector.match_labels {
|
|
for (k, v) in match_labels {
|
|
if labels.get(k) != Some(v) {
|
|
return false;
|
|
}
|
|
}
|
|
}
|
|
if selector
|
|
.match_expressions
|
|
.as_ref()
|
|
.is_some_and(|v| !v.is_empty())
|
|
{
|
|
tracing::warn!(
|
|
"LabelSelector.matchExpressions is not yet supported; treating CR as empty-selector (matches nothing)"
|
|
);
|
|
return false;
|
|
}
|
|
true
|
|
}
|
|
|
|
/// Set of Device names currently matching `selector`.
|
|
fn matched_devices(
|
|
selector: &LabelSelector,
|
|
devices: &HashMap<String, BTreeMap<String, String>>,
|
|
) -> HashSet<String> {
|
|
devices
|
|
.iter()
|
|
.filter(|(_, labels)| selector_matches(selector, labels))
|
|
.map(|(name, _)| name.clone())
|
|
.collect()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Top-level run
|
|
// ---------------------------------------------------------------------------
|
|
|
|
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> {
|
|
let state_bucket = js
|
|
.create_key_value(async_nats::jetstream::kv::Config {
|
|
bucket: BUCKET_DEVICE_STATE.to_string(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
let desired_bucket = js
|
|
.create_key_value(async_nats::jetstream::kv::Config {
|
|
bucket: BUCKET_DESIRED_STATE.to_string(),
|
|
..Default::default()
|
|
})
|
|
.await?;
|
|
|
|
// Cold-start: initialize owned_targets from the current contents
|
|
// of the desired-state bucket so we don't orphan entries written
|
|
// by a previous operator run.
|
|
let state: SharedFleetState = Arc::new(Mutex::new(FleetState::default()));
|
|
seed_owned_targets(&desired_bucket, &state).await?;
|
|
|
|
let deployments_api: Api<Deployment> = Api::all(client.clone());
|
|
let devices_api: Api<Device> = Api::all(client.clone());
|
|
let patch_api: Api<Deployment> = Api::all(client);
|
|
|
|
tracing::info!(
|
|
owned = state
|
|
.lock()
|
|
.await
|
|
.owned_targets
|
|
.values()
|
|
.map(|s| s.len())
|
|
.sum::<usize>(),
|
|
"aggregator: startup complete"
|
|
);
|
|
|
|
let state_watcher_handle = {
|
|
let state = state.clone();
|
|
let bucket = state_bucket.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = run_state_kv_watcher(bucket, state).await {
|
|
tracing::warn!(error = %e, "aggregator: state watcher exited");
|
|
}
|
|
})
|
|
};
|
|
|
|
let deployment_watcher_handle = {
|
|
let state = state.clone();
|
|
let desired = desired_bucket.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = run_deployment_watcher(deployments_api.clone(), state, desired).await {
|
|
tracing::warn!(error = %e, "aggregator: deployment watcher exited");
|
|
}
|
|
})
|
|
};
|
|
|
|
let device_watcher_handle = {
|
|
let state = state.clone();
|
|
let desired = desired_bucket.clone();
|
|
tokio::spawn(async move {
|
|
if let Err(e) = run_device_watcher(devices_api, state, desired).await {
|
|
tracing::warn!(error = %e, "aggregator: device watcher exited");
|
|
}
|
|
})
|
|
};
|
|
|
|
let patch_state = state.clone();
|
|
let patch_loop = async move {
|
|
let mut ticker = tokio::time::interval(PATCH_TICK);
|
|
ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
|
|
loop {
|
|
ticker.tick().await;
|
|
if let Err(e) = patch_tick(&patch_api, &patch_state).await {
|
|
tracing::warn!(error = %e, "aggregator: patch tick failed");
|
|
}
|
|
}
|
|
};
|
|
|
|
tokio::select! {
|
|
_ = patch_loop => Ok(()),
|
|
_ = state_watcher_handle => Ok(()),
|
|
_ = deployment_watcher_handle => Ok(()),
|
|
_ = device_watcher_handle => Ok(()),
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Device-state KV watcher
|
|
// ---------------------------------------------------------------------------
|
|
|
|
fn parse_state_key(key: &str) -> Option<DevicePair> {
|
|
let rest = key.strip_prefix("state.")?;
|
|
let (device, deployment) = rest.split_once('.')?;
|
|
Some(DevicePair {
|
|
device_id: device.to_string(),
|
|
deployment: DeploymentName::try_new(deployment).ok()?,
|
|
})
|
|
}
|
|
|
|
async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> {
|
|
let mut watch = bucket.watch_with_history(">").await?;
|
|
while let Some(entry_res) = watch.next().await {
|
|
let entry = match entry_res {
|
|
Ok(e) => e,
|
|
Err(e) => {
|
|
tracing::warn!(error = %e, "aggregator: state watch delivery error");
|
|
continue;
|
|
}
|
|
};
|
|
let Some(pair) = parse_state_key(&entry.key) else {
|
|
continue;
|
|
};
|
|
match entry.operation {
|
|
Operation::Put => {
|
|
let ds: DeploymentState = match serde_json::from_slice(&entry.value) {
|
|
Ok(d) => d,
|
|
Err(e) => {
|
|
tracing::warn!(key = %entry.key, error = %e, "aggregator: bad device_state payload");
|
|
continue;
|
|
}
|
|
};
|
|
let mut guard = state.lock().await;
|
|
apply_state(&mut guard, pair, ds);
|
|
}
|
|
Operation::Delete | Operation::Purge => {
|
|
let mut guard = state.lock().await;
|
|
drop_state(&mut guard, &pair);
|
|
}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Record a device's latest state, dedup against older timestamps,
|
|
/// maintain last_error, mark the deployment dirty.
|
|
pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) {
|
|
if let Some(prev) = state.states.get(&pair) {
|
|
if prev.last_event_at > ds.last_event_at {
|
|
return;
|
|
}
|
|
}
|
|
let phase = ds.phase;
|
|
let device_id = ds.device_id.to_string();
|
|
let last_error_msg = ds.last_error.clone();
|
|
let at = ds.last_event_at.to_rfc3339();
|
|
state.states.insert(pair.clone(), ds);
|
|
|
|
for key in matching_deployment_keys(state, &pair.deployment) {
|
|
match phase {
|
|
Phase::Failed => {
|
|
if let Some(msg) = last_error_msg.as_deref() {
|
|
state.last_error.insert(
|
|
key.clone(),
|
|
AggregateLastError {
|
|
device_id: device_id.clone(),
|
|
message: msg.to_string(),
|
|
at: at.clone(),
|
|
},
|
|
);
|
|
}
|
|
}
|
|
Phase::Running => {
|
|
if let Some(existing) = state.last_error.get(&key) {
|
|
if existing.device_id == device_id {
|
|
state.last_error.remove(&key);
|
|
}
|
|
}
|
|
}
|
|
Phase::Pending => {}
|
|
}
|
|
state.dirty.insert(key);
|
|
}
|
|
}
|
|
|
|
pub fn drop_state(state: &mut FleetState, pair: &DevicePair) {
|
|
let Some(removed) = state.states.remove(pair) else {
|
|
return;
|
|
};
|
|
let device_id = removed.device_id.to_string();
|
|
for key in matching_deployment_keys(state, &pair.deployment) {
|
|
if let Some(existing) = state.last_error.get(&key) {
|
|
if existing.device_id == device_id {
|
|
state.last_error.remove(&key);
|
|
}
|
|
}
|
|
state.dirty.insert(key);
|
|
}
|
|
}
|
|
|
|
/// CR keys that carry a given deployment name. Deployment names are
|
|
/// globally unique at the KV level, so typically 0 or 1 entry here;
|
|
/// Vec lets us surface a warning rather than panic if a misconfigured
|
|
/// cluster has duplicates across namespaces.
|
|
fn matching_deployment_keys(state: &FleetState, deployment: &DeploymentName) -> Vec<DeploymentKey> {
|
|
state
|
|
.deployments
|
|
.values()
|
|
.filter(|d| &d.deployment_name == deployment)
|
|
.map(|d| d.key.clone())
|
|
.collect()
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Deployment CR watcher
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Watch Deployment CRD in k8s API and update nats desired state accordingly
|
|
async fn run_deployment_watcher(
|
|
api: Api<Deployment>,
|
|
state: SharedFleetState,
|
|
desired: Store,
|
|
) -> anyhow::Result<()> {
|
|
let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed();
|
|
while let Some(event) = stream.try_next().await? {
|
|
match event {
|
|
Event::Apply(cr) | Event::InitApply(cr) => {
|
|
on_deployment_upsert(&state, &desired, cr).await;
|
|
}
|
|
Event::Delete(cr) => {
|
|
on_deployment_delete(&state, &desired, cr).await;
|
|
}
|
|
Event::Init | Event::InitDone => {}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) {
|
|
let Some(key) = DeploymentKey::from_cr(&cr) else {
|
|
return;
|
|
};
|
|
let Ok(deployment_name) = DeploymentName::try_new(&key.name) else {
|
|
tracing::warn!(name = %key.name, "aggregator: CR name is not a valid DeploymentName, skipping");
|
|
return;
|
|
};
|
|
let selector = cr.spec.target_selector.clone();
|
|
let score_json = match serde_json::to_vec(&cr.spec.score) {
|
|
Ok(v) => v,
|
|
Err(e) => {
|
|
tracing::warn!(namespace = %key.namespace, name = %key.name, error = %e, "aggregator: score payload not serializable");
|
|
return;
|
|
}
|
|
};
|
|
|
|
let (new_targets, previous_targets) = {
|
|
let mut guard = state.lock().await;
|
|
let new_targets = matched_devices(&selector, &guard.devices);
|
|
guard.deployments.insert(
|
|
key.clone(),
|
|
CachedDeployment {
|
|
key: key.clone(),
|
|
deployment_name: deployment_name.clone(),
|
|
selector: selector.clone(),
|
|
score_json: score_json.clone(),
|
|
},
|
|
);
|
|
let previous = guard
|
|
.owned_targets
|
|
.remove(&deployment_name)
|
|
.unwrap_or_default();
|
|
guard
|
|
.owned_targets
|
|
.insert(deployment_name.clone(), new_targets.clone());
|
|
guard.dirty.insert(key.clone());
|
|
(new_targets, previous)
|
|
};
|
|
|
|
reconcile_kv(
|
|
desired,
|
|
&deployment_name,
|
|
&new_targets,
|
|
&previous_targets,
|
|
&score_json,
|
|
)
|
|
.await;
|
|
}
|
|
|
|
async fn on_deployment_delete(state: &SharedFleetState, desired: &Store, cr: Deployment) {
|
|
let Some(key) = DeploymentKey::from_cr(&cr) else {
|
|
return;
|
|
};
|
|
let Ok(deployment_name) = DeploymentName::try_new(&key.name) else {
|
|
return;
|
|
};
|
|
|
|
let previous = {
|
|
let mut guard = state.lock().await;
|
|
guard.deployments.remove(&key);
|
|
guard.last_error.remove(&key);
|
|
guard.dirty.remove(&key);
|
|
guard
|
|
.owned_targets
|
|
.remove(&deployment_name)
|
|
.unwrap_or_default()
|
|
};
|
|
|
|
// Every previously-owned target becomes a KV delete. Controller
|
|
// finalizer does a belt-and-suspenders scan, but we pull our own
|
|
// entries here too so agents react immediately.
|
|
for device in &previous {
|
|
let k = desired_state_key(device, &deployment_name);
|
|
if let Err(e) = desired.delete(&k).await {
|
|
tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on CR delete failed");
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Device CR watcher
|
|
// ---------------------------------------------------------------------------
|
|
|
|
/// Watch k8s Device CRD and adjust desired state.
|
|
///
|
|
/// For example, if a device adds or deletes a label, its desired state will contain deployments matching
|
|
/// the device's new set of labels.
|
|
async fn run_device_watcher(
|
|
api: Api<Device>,
|
|
state: SharedFleetState,
|
|
desired: Store,
|
|
) -> anyhow::Result<()> {
|
|
let mut stream = watcher::watcher(api, WatcherConfig::default()).boxed();
|
|
while let Some(event) = stream.try_next().await? {
|
|
match event {
|
|
Event::Apply(dev) | Event::InitApply(dev) => {
|
|
on_device_upsert(&state, &desired, dev).await;
|
|
}
|
|
Event::Delete(dev) => {
|
|
on_device_delete(&state, &desired, dev).await;
|
|
}
|
|
Event::Init | Event::InitDone => {}
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
async fn on_device_upsert(state: &SharedFleetState, desired: &Store, dev: Device) {
|
|
let name = dev.name_any();
|
|
let labels: BTreeMap<String, String> = dev.metadata.labels.clone().unwrap_or_default();
|
|
|
|
// For every deployment, compute whether this single device now
|
|
// matches vs. previously matched; diff against owned_targets;
|
|
// collect the KV writes/deletes to perform after the lock is
|
|
// released.
|
|
let per_deployment: Vec<(CachedDeployment, bool, bool)> = {
|
|
let mut guard = state.lock().await;
|
|
let snapshot: Vec<CachedDeployment> = guard.deployments.values().cloned().collect();
|
|
let previously_matched: HashMap<DeploymentName, bool> = snapshot
|
|
.iter()
|
|
.map(|d| {
|
|
let was = guard
|
|
.owned_targets
|
|
.get(&d.deployment_name)
|
|
.is_some_and(|set| set.contains(&name));
|
|
(d.deployment_name.clone(), was)
|
|
})
|
|
.collect();
|
|
guard.devices.insert(name.clone(), labels.clone());
|
|
|
|
let mut out = Vec::with_capacity(snapshot.len());
|
|
for d in snapshot {
|
|
let was = previously_matched
|
|
.get(&d.deployment_name)
|
|
.copied()
|
|
.unwrap_or(false);
|
|
let now = selector_matches(&d.selector, &labels);
|
|
if was != now {
|
|
let targets = guard
|
|
.owned_targets
|
|
.entry(d.deployment_name.clone())
|
|
.or_default();
|
|
if now {
|
|
targets.insert(name.clone());
|
|
} else {
|
|
targets.remove(&name);
|
|
}
|
|
guard.dirty.insert(d.key.clone());
|
|
}
|
|
out.push((d, was, now));
|
|
}
|
|
out
|
|
};
|
|
|
|
for (cached, was, now) in per_deployment {
|
|
match (was, now) {
|
|
(false, true) => {
|
|
let k = desired_state_key(&name, &cached.deployment_name);
|
|
if let Err(e) = desired.put(&k, cached.score_json.clone().into()).await {
|
|
tracing::debug!(key = %k, error = %e, "aggregator: desired-state put failed");
|
|
}
|
|
}
|
|
(true, false) => {
|
|
let k = desired_state_key(&name, &cached.deployment_name);
|
|
if let Err(e) = desired.delete(&k).await {
|
|
tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete failed");
|
|
}
|
|
}
|
|
_ => {}
|
|
}
|
|
}
|
|
}
|
|
|
|
async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device) {
|
|
let name = dev.name_any();
|
|
let removed_from: Vec<DeploymentName> = {
|
|
let mut guard = state.lock().await;
|
|
guard.devices.remove(&name);
|
|
let mut out = Vec::new();
|
|
let deployments_snapshot: Vec<CachedDeployment> =
|
|
guard.deployments.values().cloned().collect();
|
|
for cached in deployments_snapshot {
|
|
if let Some(set) = guard.owned_targets.get_mut(&cached.deployment_name) {
|
|
if set.remove(&name) {
|
|
out.push(cached.deployment_name.clone());
|
|
guard.dirty.insert(cached.key.clone());
|
|
}
|
|
}
|
|
}
|
|
out
|
|
};
|
|
for deployment_name in removed_from {
|
|
let k = desired_state_key(&name, &deployment_name);
|
|
if let Err(e) = desired.delete(&k).await {
|
|
tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on device delete failed");
|
|
}
|
|
}
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Diff helper: write/delete desired-state entries for one deployment
|
|
// ---------------------------------------------------------------------------
|
|
|
|
async fn reconcile_kv(
|
|
desired: &Store,
|
|
deployment_name: &DeploymentName,
|
|
new_targets: &HashSet<String>,
|
|
previous_targets: &HashSet<String>,
|
|
score_json: &[u8],
|
|
) {
|
|
// Writes: new_targets, unconditionally — idempotent put; agents
|
|
// byte-compare and no-op on unchanged content.
|
|
for device in new_targets {
|
|
let k = desired_state_key(device, deployment_name);
|
|
if let Err(e) = desired.put(&k, score_json.to_vec().into()).await {
|
|
tracing::debug!(key = %k, error = %e, "aggregator: desired-state put failed");
|
|
}
|
|
}
|
|
// Deletes: anything we owned previously but no longer target.
|
|
for device in previous_targets.difference(new_targets) {
|
|
let k = desired_state_key(device, deployment_name);
|
|
if let Err(e) = desired.delete(&k).await {
|
|
tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete failed");
|
|
}
|
|
}
|
|
}
|
|
|
|
/// Initialize `owned_targets` from the current contents of the
|
|
/// `desired-state` KV. After a restart, we need to know what was
|
|
/// previously written so we can diff correctly on the first
|
|
/// watch-driven reconcile (otherwise we'd leak orphans when a
|
|
/// selector change causes a deployment to stop targeting a device).
|
|
async fn seed_owned_targets(bucket: &Store, state: &SharedFleetState) -> anyhow::Result<()> {
|
|
let mut guard = state.lock().await;
|
|
let mut keys = bucket.keys().await?;
|
|
while let Some(key_res) = keys.next().await {
|
|
let key = key_res?;
|
|
// Keys are `<device>.<deployment>`. The KV key space carries
|
|
// no namespace — names are globally unique at this layer —
|
|
// which is exactly why `owned_targets` keys by DeploymentName.
|
|
let Some((device, deployment)) = key.split_once('.') else {
|
|
warn!("Could not read device.deployment for key {key}");
|
|
continue;
|
|
};
|
|
let Ok(deployment_name) = DeploymentName::try_new(deployment) else {
|
|
warn!("Invalid deployment name for key {key}");
|
|
continue;
|
|
};
|
|
guard
|
|
.owned_targets
|
|
.entry(deployment_name)
|
|
.or_default()
|
|
.insert(device.to_string());
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
// ---------------------------------------------------------------------------
|
|
// Patch tick
|
|
// ---------------------------------------------------------------------------
|
|
|
|
async fn patch_tick(api: &Api<Deployment>, state: &SharedFleetState) -> anyhow::Result<()> {
|
|
let dirty: Vec<(DeploymentKey, DeploymentAggregate)> = {
|
|
let mut guard = state.lock().await;
|
|
let keys: Vec<DeploymentKey> = guard.dirty.drain().collect();
|
|
keys.iter()
|
|
.filter_map(|k| {
|
|
let cached = guard.deployments.get(k)?.clone();
|
|
let agg = compute_aggregate(&guard, &cached);
|
|
Some((k.clone(), agg))
|
|
})
|
|
.collect()
|
|
};
|
|
|
|
for (key, aggregate) in dirty {
|
|
let ns_api: Api<Deployment> = Api::namespaced(api.clone().into_client(), &key.namespace);
|
|
let status = json!({ "status": { "aggregate": aggregate } });
|
|
if let Err(e) = ns_api
|
|
.patch_status(&key.name, &PatchParams::default(), &Patch::Merge(&status))
|
|
.await
|
|
{
|
|
tracing::warn!(
|
|
namespace = %key.namespace,
|
|
name = %key.name,
|
|
error = %e,
|
|
"aggregator: status patch failed"
|
|
);
|
|
} else {
|
|
tracing::debug!(
|
|
namespace = %key.namespace,
|
|
name = %key.name,
|
|
matched = aggregate.matched_device_count,
|
|
succeeded = aggregate.succeeded,
|
|
failed = aggregate.failed,
|
|
pending = aggregate.pending,
|
|
"aggregator: status patched"
|
|
);
|
|
}
|
|
}
|
|
Ok(())
|
|
}
|
|
|
|
/// Compute the aggregate for one Deployment from current caches.
|
|
/// `owned_targets` is the authoritative "currently selector-matched"
|
|
/// set for the deployment, as maintained by the watchers.
|
|
pub fn compute_aggregate(state: &FleetState, cached: &CachedDeployment) -> DeploymentAggregate {
|
|
let empty = HashSet::new();
|
|
let targets = state
|
|
.owned_targets
|
|
.get(&cached.deployment_name)
|
|
.unwrap_or(&empty);
|
|
|
|
let mut agg = DeploymentAggregate {
|
|
matched_device_count: targets.len() as u32,
|
|
..Default::default()
|
|
};
|
|
|
|
for device_id in targets {
|
|
let pair = DevicePair {
|
|
device_id: device_id.clone(),
|
|
deployment: cached.deployment_name.clone(),
|
|
};
|
|
match state.states.get(&pair).map(|s| s.phase) {
|
|
Some(Phase::Running) => agg.succeeded += 1,
|
|
Some(Phase::Failed) => agg.failed += 1,
|
|
Some(Phase::Pending) | None => agg.pending += 1,
|
|
}
|
|
}
|
|
|
|
agg.last_error = state.last_error.get(&cached.key).cloned();
|
|
agg
|
|
}
|
|
|
|
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use harmony_reconciler_contracts::Id;
    use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelectorRequirement;

    fn dn(s: &str) -> DeploymentName {
        DeploymentName::try_new(s).expect("valid test name")
    }

    fn state(device: &str, deployment: &str, phase: Phase, seconds: i64) -> DeploymentState {
        DeploymentState {
            device_id: Id::from(device.to_string()),
            deployment: dn(deployment),
            phase,
            last_event_at: Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap(),
            last_error: None,
        }
    }

    fn cached(namespace: &str, name: &str, match_key: &str, match_val: &str) -> CachedDeployment {
        let mut ml = BTreeMap::new();
        ml.insert(match_key.to_string(), match_val.to_string());
        CachedDeployment {
            key: DeploymentKey {
                namespace: namespace.to_string(),
                name: name.to_string(),
            },
            deployment_name: dn(name),
            selector: LabelSelector {
                match_labels: Some(ml),
                match_expressions: None,
            },
            score_json: b"{}".to_vec(),
        }
    }

    fn pair(device: &str, deployment: &str) -> DevicePair {
        DevicePair {
            device_id: device.to_string(),
            deployment: dn(deployment),
        }
    }

    /// Fleet state holding exactly one cached Deployment CR, plus its key.
    fn fleet_with(deployment: CachedDeployment) -> (FleetState, DeploymentKey) {
        let key = deployment.key.clone();
        let mut s = FleetState::default();
        s.deployments.insert(key.clone(), deployment);
        (s, key)
    }

    #[test]
    fn selector_match_labels_only() {
        let mut ml = BTreeMap::new();
        ml.insert("group".to_string(), "edge-a".to_string());
        let sel = LabelSelector {
            match_labels: Some(ml),
            match_expressions: None,
        };

        let mut matching = BTreeMap::new();
        matching.insert("group".to_string(), "edge-a".to_string());
        matching.insert("arch".to_string(), "aarch64".to_string());
        assert!(selector_matches(&sel, &matching));

        let mut non_matching = BTreeMap::new();
        non_matching.insert("group".to_string(), "edge-b".to_string());
        assert!(!selector_matches(&sel, &non_matching));

        let empty = BTreeMap::new();
        assert!(!selector_matches(&sel, &empty));
    }

    #[test]
    fn empty_selector_matches_everything() {
        let sel = LabelSelector::default();
        let mut labels = BTreeMap::new();
        labels.insert("anything".to_string(), "goes".to_string());
        assert!(selector_matches(&sel, &labels));
        assert!(selector_matches(&sel, &BTreeMap::new()));
    }

    #[test]
    fn selector_with_match_expressions_matches_nothing() {
        let sel = LabelSelector {
            match_labels: None,
            match_expressions: Some(vec![LabelSelectorRequirement {
                key: "group".to_string(),
                operator: "In".to_string(),
                values: Some(vec!["edge-a".to_string()]),
            }]),
        };
        let mut labels = BTreeMap::new();
        labels.insert("group".to_string(), "edge-a".to_string());
        assert!(!selector_matches(&sel, &labels));
        // Repeated calls keep returning false (the warning is one-shot).
        assert!(!selector_matches(&sel, &labels));
    }

    #[test]
    fn parse_state_key_requires_prefix_and_both_segments() {
        assert_eq!(
            parse_state_key("state.pi-01.hello"),
            Some(pair("pi-01", "hello"))
        );
        assert_eq!(parse_state_key("other.pi-01.hello"), None);
        assert_eq!(parse_state_key("state.pi-01"), None);
    }

    #[test]
    fn compute_aggregate_counts_matched_devices() {
        let cached = cached("fleet-demo", "hello", "group", "edge-a");
        let key = cached.key.clone();

        let mut s = FleetState::default();
        s.deployments.insert(key, cached.clone());
        // Three devices already in owned_targets (selector resolution
        // is separate from the aggregate; aggregate reads owned_targets).
        s.owned_targets.insert(
            cached.deployment_name.clone(),
            ["pi-01", "pi-02", "pi-03"]
                .iter()
                .map(|s| s.to_string())
                .collect(),
        );
        s.states.insert(
            pair("pi-01", "hello"),
            state("pi-01", "hello", Phase::Running, 0),
        );
        s.states.insert(
            pair("pi-02", "hello"),
            state("pi-02", "hello", Phase::Failed, 0),
        );
        // pi-03 has no state entry → pending

        let agg = compute_aggregate(&s, &cached);
        assert_eq!(agg.matched_device_count, 3);
        assert_eq!(agg.succeeded, 1);
        assert_eq!(agg.failed, 1);
        assert_eq!(agg.pending, 1);
    }

    #[test]
    fn apply_state_drops_stale_reports_and_tracks_last_error() {
        let (mut s, key) = fleet_with(cached("fleet-demo", "hello", "group", "edge-a"));
        let p = pair("pi-01", "hello");

        let mut failed = state("pi-01", "hello", Phase::Failed, 10);
        failed.last_error = Some("boom".into());
        apply_state(&mut s, p.clone(), failed);
        assert!(s.dirty.contains(&key));
        assert_eq!(
            s.last_error.get(&key).map(|e| e.message.as_str()),
            Some("boom")
        );

        // An older report must not overwrite the newer failure.
        apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Running, 5));
        assert!(matches!(
            s.states.get(&p).map(|d| d.phase),
            Some(Phase::Failed)
        ));
        assert!(s.last_error.contains_key(&key));

        // A newer Running report from the same device clears its error.
        apply_state(&mut s, p.clone(), state("pi-01", "hello", Phase::Running, 20));
        assert!(matches!(
            s.states.get(&p).map(|d| d.phase),
            Some(Phase::Running)
        ));
        assert!(!s.last_error.contains_key(&key));
    }

    #[test]
    fn drop_state_clears_that_devices_error_and_marks_dirty() {
        let (mut s, key) = fleet_with(cached("fleet-demo", "hello", "group", "edge-a"));
        let p = pair("pi-01", "hello");

        let mut failed = state("pi-01", "hello", Phase::Failed, 0);
        failed.last_error = Some("boom".into());
        apply_state(&mut s, p.clone(), failed);
        s.dirty.clear();

        drop_state(&mut s, &p);
        assert!(!s.states.contains_key(&p));
        assert!(!s.last_error.contains_key(&key));
        assert!(s.dirty.contains(&key));
    }

    #[test]
    fn matched_devices_picks_by_label() {
        let mut ml = BTreeMap::new();
        ml.insert("group".to_string(), "edge-a".to_string());
        let sel = LabelSelector {
            match_labels: Some(ml),
            match_expressions: None,
        };

        let mut devices: HashMap<String, BTreeMap<String, String>> = HashMap::new();
        let mut a = BTreeMap::new();
        a.insert("group".to_string(), "edge-a".to_string());
        devices.insert("pi-01".to_string(), a);
        let mut b = BTreeMap::new();
        b.insert("group".to_string(), "edge-b".to_string());
        devices.insert("pi-02".to_string(), b);

        let matched = matched_devices(&sel, &devices);
        assert_eq!(matched.len(), 1);
        assert!(matched.contains("pi-01"));
    }
}
|