Files
harmony/fleet/harmony-fleet-operator/src/fleet_aggregator.rs

842 lines
30 KiB
Rust

//! Operator-side aggregator + desired-state writer.
//!
//! Maintains three in-memory caches driven by watches:
//! - Deployment CRs (kube watch) → what we want to run
//! - Device CRs (kube watch) → where we could run it
//! - DeploymentState KV (NATS watch) → what's actually running
//!
//! Outputs:
//! - Writes `desired-state.<device>.<deployment>` KV entries when a
//! Deployment's selector matches a Device. Deletes them when the
//! match goes away.
//! - Patches `Deployment.status.aggregate` at 1 Hz for every CR
//! whose matched-device set or phase counts changed.
//!
//! No separate event stream, no per-key revision tracking: KV watches
//! are ordered and last-writer-wins, and the dirty set naturally
//! coalesces high-frequency state churn into one patch per tick.
use std::collections::{BTreeMap, HashMap, HashSet};
use std::sync::Arc;
use std::time::Duration;
use async_nats::jetstream::kv::{Operation, Store};
use futures_util::{StreamExt, TryStreamExt};
use harmony_reconciler_contracts::{
BUCKET_DESIRED_STATE, BUCKET_DEVICE_STATE, DeploymentName, DeploymentState, Phase,
desired_state_key,
};
use k8s_openapi::apimachinery::pkg::apis::meta::v1::LabelSelector;
use kube::api::{Api, Patch, PatchParams};
use kube::runtime::watcher::{self, Config as WatcherConfig, Event};
use kube::{Client, ResourceExt};
use serde_json::json;
use tokio::sync::Mutex;
use harmony::modules::fleet::operator::{
AggregateLastError, Deployment, DeploymentAggregate, Device,
};
use tracing::warn;
/// Interval between status-patch passes: `run` drives `patch_tick` from a
/// `tokio::time::interval` with this period.
const PATCH_TICK: Duration = Duration::from_secs(1);
// ---------------------------------------------------------------------------
// State
// ---------------------------------------------------------------------------
/// Identity of a Deployment CR inside the cluster: its `(namespace, name)`.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct DeploymentKey {
    pub namespace: String,
    pub name: String,
}

impl DeploymentKey {
    /// Derive the key from a Deployment CR.
    ///
    /// Returns `None` when the CR reports no namespace.
    pub fn from_cr(cr: &Deployment) -> Option<Self> {
        let namespace = cr.namespace()?;
        let name = cr.name_any();
        Some(Self { namespace, name })
    }
}
/// One `(device, deployment)` pair.
///
/// Used as the key of `FleetState::states`; produced by `parse_state_key`
/// from `state.<device>.<deployment>` KV keys.
#[derive(Debug, Clone, Hash, PartialEq, Eq)]
pub struct DevicePair {
/// Device identifier. `compute_aggregate` builds pairs from the device
/// names stored in `owned_targets`, so this is expected to equal the
/// Device CR's `metadata.name`.
pub device_id: String,
/// Validated name of the deployment the state entry belongs to.
pub deployment: DeploymentName,
}
/// Thin projection of a Deployment CR — everything we need for
/// selector evaluation + desired-state writes + status aggregation,
/// without borrowing the full kube object.
#[derive(Debug, Clone)]
pub struct CachedDeployment {
/// (namespace, name) of the source CR; used to address status patches.
key: DeploymentKey,
/// The CR name validated as a `DeploymentName` (see `on_deployment_upsert`).
deployment_name: DeploymentName,
/// Copy of the CR's `spec.target_selector`, evaluated against Device labels.
selector: LabelSelector,
/// JSON-serialized score payload ready to `put` into
/// desired-state. Cached because the same bytes are written to
/// every matched device's KV entry.
score_json: Vec<u8>,
}
/// In-memory caches shared (behind `SharedFleetState`) by the three
/// watchers and the status patch loop started in `run`.
#[derive(Debug, Default)]
pub struct FleetState {
/// Cached Deployment CRs, keyed by (namespace, name).
deployments: HashMap<DeploymentKey, CachedDeployment>,
/// Cached Device labels, keyed by `metadata.name`.
devices: HashMap<String, BTreeMap<String, String>>,
/// Latest DeploymentState per (device, deployment) pair.
states: HashMap<DevicePair, DeploymentState>,
/// Which devices have we pushed desired-state for, per deployment?
/// Diff against recomputed targets on any change. Keyed by
/// `DeploymentName` (not `DeploymentKey`) because the
/// `desired-state` KV key space doesn't carry namespace —
/// deployment names are globally unique at the NATS level. This
/// lets cold-start seeding from the KV populate the map
/// correctly without having to guess namespaces.
owned_targets: HashMap<DeploymentName, HashSet<String>>,
/// Per-deployment latest-failure surface for the CR status.
/// Set by `apply_state` on a `Failed` report that carries a message;
/// cleared when the same device later reports `Running` or its state
/// entry is dropped.
last_error: HashMap<DeploymentKey, AggregateLastError>,
/// CR keys whose status needs re-patching on the next tick.
/// Drained by `patch_tick`.
dirty: HashSet<DeploymentKey>,
}
/// `FleetState` behind an `Arc<tokio::sync::Mutex<_>>` so the watcher tasks
/// and the patch loop can share it.
pub type SharedFleetState = Arc<Mutex<FleetState>>;
// ---------------------------------------------------------------------------
// Selector evaluation
// ---------------------------------------------------------------------------
/// Does `selector` match this label set?
///
/// Only `matchLabels` is supported for the MVP: every `(key, value)` in
/// `matchLabels` must be present in `labels` with an equal value. A selector
/// with no `matchLabels` (or an empty map) matches every label set.
///
/// A selector carrying a non-empty `matchExpressions` is treated as "no
/// match". The accompanying warning is emitted at most once per process:
/// this function runs for every (deployment, device) combination, so
/// warning on each call would flood the log.
pub fn selector_matches(selector: &LabelSelector, labels: &BTreeMap<String, String>) -> bool {
    // Guards the "unsupported matchExpressions" warning so it fires once.
    static MATCH_EXPRESSIONS_WARNING: std::sync::Once = std::sync::Once::new();

    if let Some(match_labels) = &selector.match_labels {
        if match_labels.iter().any(|(k, v)| labels.get(k) != Some(v)) {
            return false;
        }
    }

    let has_expressions = selector
        .match_expressions
        .as_ref()
        .is_some_and(|exprs| !exprs.is_empty());
    if has_expressions {
        MATCH_EXPRESSIONS_WARNING.call_once(|| {
            tracing::warn!(
                "LabelSelector.matchExpressions is not yet supported; treating CR as empty-selector (matches nothing)"
            );
        });
        return false;
    }

    true
}
/// Names of all cached devices whose labels satisfy `selector`.
fn matched_devices(
    selector: &LabelSelector,
    devices: &HashMap<String, BTreeMap<String, String>>,
) -> HashSet<String> {
    let mut matched = HashSet::new();
    for (device_name, device_labels) in devices {
        if selector_matches(selector, device_labels) {
            matched.insert(device_name.clone());
        }
    }
    matched
}
// ---------------------------------------------------------------------------
// Top-level run
// ---------------------------------------------------------------------------
/// Run the aggregator: open the KV buckets, seed `owned_targets` from the
/// desired-state bucket, then drive the three watchers and the status
/// patch loop until one of them stops.
///
/// # Errors
///
/// Fails if either KV bucket cannot be created/opened or if seeding
/// `owned_targets` fails. Watcher failures are logged by the watcher tasks
/// themselves; once any task ends this function returns `Ok(())`.
///
/// The watcher tasks that are still running when the first one finishes
/// are aborted before returning, so no detached task keeps mutating the
/// KV store or the shared state after `run` has returned.
pub async fn run(client: Client, js: async_nats::jetstream::Context) -> anyhow::Result<()> {
    let state_bucket = js
        .create_key_value(async_nats::jetstream::kv::Config {
            bucket: BUCKET_DEVICE_STATE.to_string(),
            ..Default::default()
        })
        .await?;
    let desired_bucket = js
        .create_key_value(async_nats::jetstream::kv::Config {
            bucket: BUCKET_DESIRED_STATE.to_string(),
            ..Default::default()
        })
        .await?;

    // Cold-start: initialize owned_targets from the current contents
    // of the desired-state bucket so we don't orphan entries written
    // by a previous operator run.
    let state: SharedFleetState = Arc::new(Mutex::new(FleetState::default()));
    seed_owned_targets(&desired_bucket, &state).await?;

    let deployments_api: Api<Deployment> = Api::all(client.clone());
    let devices_api: Api<Device> = Api::all(client.clone());
    let patch_api: Api<Deployment> = Api::all(client);

    tracing::info!(
        owned = state
            .lock()
            .await
            .owned_targets
            .values()
            .map(|s| s.len())
            .sum::<usize>(),
        "aggregator: startup complete"
    );

    let mut state_watcher_handle = {
        let state = state.clone();
        let bucket = state_bucket.clone();
        tokio::spawn(async move {
            if let Err(e) = run_state_kv_watcher(bucket, state).await {
                tracing::warn!(error = %e, "aggregator: state watcher exited");
            }
        })
    };
    let mut deployment_watcher_handle = {
        let state = state.clone();
        let desired = desired_bucket.clone();
        tokio::spawn(async move {
            if let Err(e) = run_deployment_watcher(deployments_api, state, desired).await {
                tracing::warn!(error = %e, "aggregator: deployment watcher exited");
            }
        })
    };
    let mut device_watcher_handle = {
        let state = state.clone();
        let desired = desired_bucket.clone();
        tokio::spawn(async move {
            if let Err(e) = run_device_watcher(devices_api, state, desired).await {
                tracing::warn!(error = %e, "aggregator: device watcher exited");
            }
        })
    };

    let patch_state = state.clone();
    let patch_loop = async move {
        let mut ticker = tokio::time::interval(PATCH_TICK);
        ticker.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Delay);
        loop {
            ticker.tick().await;
            if let Err(e) = patch_tick(&patch_api, &patch_state).await {
                tracing::warn!(error = %e, "aggregator: patch tick failed");
            }
        }
    };

    // Poll the handles by `&mut` so they remain usable for `abort` below.
    tokio::select! {
        _ = patch_loop => {},
        _ = &mut state_watcher_handle => {},
        _ = &mut deployment_watcher_handle => {},
        _ = &mut device_watcher_handle => {},
    }

    // Dropping a JoinHandle only detaches the task; abort explicitly.
    // Aborting an already-finished task is a no-op.
    state_watcher_handle.abort();
    deployment_watcher_handle.abort();
    device_watcher_handle.abort();

    Ok(())
}
// ---------------------------------------------------------------------------
// Device-state KV watcher
// ---------------------------------------------------------------------------
/// Split a `state.<device>.<deployment>` KV key into its `DevicePair`.
///
/// The device id is everything between the `state.` prefix and the next
/// `.`; the remainder must be a valid `DeploymentName`. Returns `None` for
/// keys that do not have this shape.
fn parse_state_key(key: &str) -> Option<DevicePair> {
    let (device, deployment) = key.strip_prefix("state.")?.split_once('.')?;
    let deployment = DeploymentName::try_new(deployment).ok()?;
    Some(DevicePair {
        device_id: device.to_owned(),
        deployment,
    })
}
/// Follow the device-state bucket (history included) and mirror every
/// entry into the shared `FleetState`.
///
/// Puts are decoded as `DeploymentState` and applied via `apply_state`;
/// deletes and purges go through `drop_state`. Delivery errors, keys that
/// do not parse, and undecodable payloads are skipped. Returns `Ok(())`
/// when the watch stream ends; fails only if the watch cannot be opened.
async fn run_state_kv_watcher(bucket: Store, state: SharedFleetState) -> anyhow::Result<()> {
    let mut entries = bucket.watch_with_history(">").await?;
    while let Some(delivery) = entries.next().await {
        let entry = match delivery {
            Err(e) => {
                tracing::warn!(error = %e, "aggregator: state watch delivery error");
                continue;
            }
            Ok(entry) => entry,
        };
        let pair = match parse_state_key(&entry.key) {
            Some(pair) => pair,
            None => continue,
        };
        match entry.operation {
            Operation::Delete | Operation::Purge => {
                let mut fleet = state.lock().await;
                drop_state(&mut fleet, &pair);
            }
            Operation::Put => {
                // Decode before taking the lock so bad payloads never contend.
                match serde_json::from_slice::<DeploymentState>(&entry.value) {
                    Ok(reported) => {
                        let mut fleet = state.lock().await;
                        apply_state(&mut fleet, pair, reported);
                    }
                    Err(e) => {
                        tracing::warn!(key = %entry.key, error = %e, "aggregator: bad device_state payload");
                    }
                }
            }
        }
    }
    Ok(())
}
/// Store the newest state reported for `pair` and refresh derived data.
///
/// A report strictly older (by `last_event_at`) than the one already held
/// is ignored. Otherwise the report replaces the stored one and, for every
/// cached Deployment CR carrying the same deployment name:
/// - `Failed` with an error message records that message as the CR's
///   last error;
/// - `Running` clears the CR's last error if it came from this device;
/// - `Pending` leaves the last error untouched;
/// and the CR is marked dirty for the next status patch.
pub fn apply_state(state: &mut FleetState, pair: DevicePair, ds: DeploymentState) {
    let is_stale = state
        .states
        .get(&pair)
        .is_some_and(|held| held.last_event_at > ds.last_event_at);
    if is_stale {
        return;
    }

    // Pull out what the loop below needs before `ds` moves into the map.
    let phase = ds.phase;
    let reporter = ds.device_id.to_string();
    let failure = ds.last_error.clone();
    let stamp = ds.last_event_at.to_rfc3339();
    let deployment = pair.deployment.clone();
    state.states.insert(pair, ds);

    for key in matching_deployment_keys(state, &deployment) {
        match phase {
            Phase::Pending => {}
            Phase::Running => {
                let from_reporter = state
                    .last_error
                    .get(&key)
                    .is_some_and(|err| err.device_id == reporter);
                if from_reporter {
                    state.last_error.remove(&key);
                }
            }
            Phase::Failed => {
                if let Some(msg) = failure.as_deref() {
                    let err = AggregateLastError {
                        device_id: reporter.clone(),
                        message: msg.to_string(),
                        at: stamp.clone(),
                    };
                    state.last_error.insert(key.clone(), err);
                }
            }
        }
        state.dirty.insert(key);
    }
}
/// Forget the stored state for `pair`.
///
/// Does nothing when no state was held. Otherwise every cached Deployment
/// CR with the same deployment name is marked dirty, and its last error is
/// cleared if that error was reported by the removed entry's device.
pub fn drop_state(state: &mut FleetState, pair: &DevicePair) {
    let gone = match state.states.remove(pair) {
        Some(gone) => gone,
        None => return,
    };
    let reporter = gone.device_id.to_string();

    for key in matching_deployment_keys(state, &pair.deployment) {
        let from_reporter = state
            .last_error
            .get(&key)
            .is_some_and(|err| err.device_id == reporter);
        if from_reporter {
            state.last_error.remove(&key);
        }
        state.dirty.insert(key);
    }
}
/// Keys of all cached Deployment CRs whose name equals `deployment`.
///
/// Deployment names are meant to be globally unique at the KV level, so
/// this normally yields zero or one key; returning a `Vec` keeps the
/// callers working (every duplicate is updated) if a cluster has the same
/// name in several namespaces.
fn matching_deployment_keys(state: &FleetState, deployment: &DeploymentName) -> Vec<DeploymentKey> {
    let mut keys = Vec::new();
    for cached in state.deployments.values() {
        if cached.deployment_name == *deployment {
            keys.push(cached.key.clone());
        }
    }
    keys
}
// ---------------------------------------------------------------------------
// Deployment CR watcher
// ---------------------------------------------------------------------------
/// Follow Deployment CRs through the Kubernetes watch API and keep the
/// NATS desired-state entries in step with them.
///
/// Applied objects (initial listing included) go to
/// `on_deployment_upsert`, deletions to `on_deployment_delete`. Returns
/// `Ok(())` when the stream ends and the stream's error if it yields one.
async fn run_deployment_watcher(
    api: Api<Deployment>,
    state: SharedFleetState,
    desired: Store,
) -> anyhow::Result<()> {
    let mut events = watcher::watcher(api, WatcherConfig::default()).boxed();
    loop {
        match events.try_next().await? {
            None => return Ok(()),
            Some(Event::Init | Event::InitDone) => {}
            Some(Event::Delete(cr)) => on_deployment_delete(&state, &desired, cr).await,
            Some(Event::Apply(cr) | Event::InitApply(cr)) => {
                on_deployment_upsert(&state, &desired, cr).await
            }
        }
    }
}
/// Handle a created or updated Deployment CR.
///
/// Caches the CR's projection, recomputes which devices its selector
/// matches, replaces the deployment's `owned_targets` entry with that set,
/// marks the CR dirty, and then — outside the lock — writes the score to
/// every matched device and deletes the entries of devices that are no
/// longer matched. CRs without a namespace, with a name that is not a
/// valid `DeploymentName`, or with an unserializable score are skipped.
async fn on_deployment_upsert(state: &SharedFleetState, desired: &Store, cr: Deployment) {
    let key = match DeploymentKey::from_cr(&cr) {
        Some(key) => key,
        None => return,
    };
    let deployment_name = match DeploymentName::try_new(&key.name) {
        Ok(name) => name,
        Err(_) => {
            tracing::warn!(name = %key.name, "aggregator: CR name is not a valid DeploymentName, skipping");
            return;
        }
    };
    let selector = cr.spec.target_selector.clone();
    let score_json = match serde_json::to_vec(&cr.spec.score) {
        Ok(bytes) => bytes,
        Err(e) => {
            tracing::warn!(namespace = %key.namespace, name = %key.name, error = %e, "aggregator: score payload not serializable");
            return;
        }
    };

    let (new_targets, previous_targets) = {
        let mut fleet = state.lock().await;
        let matched = matched_devices(&selector, &fleet.devices);
        let projection = CachedDeployment {
            key: key.clone(),
            deployment_name: deployment_name.clone(),
            selector,
            score_json: score_json.clone(),
        };
        fleet.deployments.insert(key.clone(), projection);
        // `insert` hands back the set we owned before this event.
        let replaced = fleet
            .owned_targets
            .insert(deployment_name.clone(), matched.clone())
            .unwrap_or_default();
        fleet.dirty.insert(key);
        (matched, replaced)
    };

    reconcile_kv(
        desired,
        &deployment_name,
        &new_targets,
        &previous_targets,
        &score_json,
    )
    .await;
}
async fn on_deployment_delete(state: &SharedFleetState, desired: &Store, cr: Deployment) {
let Some(key) = DeploymentKey::from_cr(&cr) else {
return;
};
let Ok(deployment_name) = DeploymentName::try_new(&key.name) else {
return;
};
let previous = {
let mut guard = state.lock().await;
guard.deployments.remove(&key);
guard.last_error.remove(&key);
guard.dirty.remove(&key);
guard
.owned_targets
.remove(&deployment_name)
.unwrap_or_default()
};
// Every previously-owned target becomes a KV delete. Controller
// finalizer does a belt-and-suspenders scan, but we pull our own
// entries here too so agents react immediately.
for device in &previous {
let k = desired_state_key(device, &deployment_name);
if let Err(e) = desired.delete(&k).await {
tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on CR delete failed");
}
}
}
// ---------------------------------------------------------------------------
// Device CR watcher
// ---------------------------------------------------------------------------
/// Follow Device CRs through the Kubernetes watch API and adjust the
/// desired state accordingly.
///
/// For instance, when a device gains or loses a label, its desired state
/// is updated to hold exactly the deployments matching its new labels.
/// Applied objects (initial listing included) go to `on_device_upsert`,
/// deletions to `on_device_delete`. Returns `Ok(())` when the stream ends
/// and the stream's error if it yields one.
async fn run_device_watcher(
    api: Api<Device>,
    state: SharedFleetState,
    desired: Store,
) -> anyhow::Result<()> {
    let mut events = watcher::watcher(api, WatcherConfig::default()).boxed();
    loop {
        match events.try_next().await? {
            None => return Ok(()),
            Some(Event::Init | Event::InitDone) => {}
            Some(Event::Delete(dev)) => on_device_delete(&state, &desired, dev).await,
            Some(Event::Apply(dev) | Event::InitApply(dev)) => {
                on_device_upsert(&state, &desired, dev).await
            }
        }
    }
}
/// Handle a created or updated Device CR.
///
/// Under the lock: records the device's labels and, for each cached
/// deployment, compares "was this device an owned target?" with "does the
/// selector match its labels now?". On a change the device is added to or
/// removed from the deployment's `owned_targets` and the CR is marked
/// dirty. After the lock is released the matching KV operation is issued:
/// a put of the cached score for a new match, a delete for a lost match.
/// KV failures are logged at debug level only.
async fn on_device_upsert(state: &SharedFleetState, desired: &Store, dev: Device) {
    let name = dev.name_any();
    let labels: BTreeMap<String, String> = dev.metadata.labels.clone().unwrap_or_default();

    let transitions: Vec<(CachedDeployment, bool, bool)> = {
        let mut fleet = state.lock().await;
        let deployments: Vec<CachedDeployment> = fleet.deployments.values().cloned().collect();

        // Ownership is snapshotted per deployment name *before* any
        // mutation, so every entry is judged against the pre-event state.
        let mut owned_before: HashMap<DeploymentName, bool> = HashMap::new();
        for d in &deployments {
            let owned = match fleet.owned_targets.get(&d.deployment_name) {
                Some(devices) => devices.contains(&name),
                None => false,
            };
            owned_before.insert(d.deployment_name.clone(), owned);
        }

        fleet.devices.insert(name.clone(), labels.clone());

        let mut transitions = Vec::with_capacity(deployments.len());
        for d in deployments {
            let was = matches!(owned_before.get(&d.deployment_name), Some(true));
            let now = selector_matches(&d.selector, &labels);
            match (was, now) {
                (false, true) => {
                    fleet
                        .owned_targets
                        .entry(d.deployment_name.clone())
                        .or_default()
                        .insert(name.clone());
                    fleet.dirty.insert(d.key.clone());
                }
                (true, false) => {
                    fleet
                        .owned_targets
                        .entry(d.deployment_name.clone())
                        .or_default()
                        .remove(&name);
                    fleet.dirty.insert(d.key.clone());
                }
                _ => {}
            }
            transitions.push((d, was, now));
        }
        transitions
    };

    for (cached, was, now) in transitions {
        if was == now {
            continue;
        }
        let k = desired_state_key(&name, &cached.deployment_name);
        if now {
            if let Err(e) = desired.put(&k, cached.score_json.clone().into()).await {
                tracing::debug!(key = %k, error = %e, "aggregator: desired-state put failed");
            }
        } else if let Err(e) = desired.delete(&k).await {
            tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete failed");
        }
    }
}
async fn on_device_delete(state: &SharedFleetState, desired: &Store, dev: Device) {
let name = dev.name_any();
let removed_from: Vec<DeploymentName> = {
let mut guard = state.lock().await;
guard.devices.remove(&name);
let mut out = Vec::new();
let deployments_snapshot: Vec<CachedDeployment> =
guard.deployments.values().cloned().collect();
for cached in deployments_snapshot {
if let Some(set) = guard.owned_targets.get_mut(&cached.deployment_name) {
if set.remove(&name) {
out.push(cached.deployment_name.clone());
guard.dirty.insert(cached.key.clone());
}
}
}
out
};
for deployment_name in removed_from {
let k = desired_state_key(&name, &deployment_name);
if let Err(e) = desired.delete(&k).await {
tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete on device delete failed");
}
}
}
// ---------------------------------------------------------------------------
// Diff helper: write/delete desired-state entries for one deployment
// ---------------------------------------------------------------------------
/// Bring the desired-state entries of one deployment in line with
/// `new_targets`.
///
/// Every device in `new_targets` gets `score_json` written, whether or not
/// it was a target before. Every device in `previous_targets` that is
/// absent from `new_targets` has its entry deleted. Individual KV failures
/// are logged at debug level and do not stop the pass.
async fn reconcile_kv(
    desired: &Store,
    deployment_name: &DeploymentName,
    new_targets: &HashSet<String>,
    previous_targets: &HashSet<String>,
    score_json: &[u8],
) {
    // Puts are unconditional: they are idempotent, and agents compare the
    // bytes and ignore content that has not changed.
    for device in new_targets.iter() {
        let k = desired_state_key(device, deployment_name);
        let payload = score_json.to_vec();
        if let Err(e) = desired.put(&k, payload.into()).await {
            tracing::debug!(key = %k, error = %e, "aggregator: desired-state put failed");
        }
    }

    // Whatever we used to own but no longer target must go.
    let dropped = previous_targets
        .iter()
        .filter(|device| !new_targets.contains(*device));
    for device in dropped {
        let k = desired_state_key(device, deployment_name);
        if let Err(e) = desired.delete(&k).await {
            tracing::debug!(key = %k, error = %e, "aggregator: desired-state delete failed");
        }
    }
}
/// Rebuild `owned_targets` from the keys currently present in the
/// `desired-state` bucket.
///
/// After a restart the operator has to know which entries an earlier run
/// wrote; otherwise the first watch-driven reconcile could not delete the
/// entries of devices a deployment no longer targets. Keys that cannot be
/// split at the first `.` or whose deployment part is not a valid
/// `DeploymentName` are logged and skipped.
///
/// NOTE(review): assumes `desired_state_key` produces
/// `<device>.<deployment>` with no bucket prefix — confirm against the
/// contracts crate.
///
/// # Errors
///
/// Fails if the key listing cannot be started or yields an error.
async fn seed_owned_targets(bucket: &Store, state: &SharedFleetState) -> anyhow::Result<()> {
    let mut fleet = state.lock().await;
    let mut keys = bucket.keys().await?;
    while let Some(key) = keys.try_next().await? {
        // The key space has no namespace component — names are globally
        // unique at this layer — hence `owned_targets` is keyed by
        // DeploymentName.
        let (device, deployment) = match key.split_once('.') {
            Some(parts) => parts,
            None => {
                warn!("Could not read device.deployment for key {key}");
                continue;
            }
        };
        match DeploymentName::try_new(deployment) {
            Ok(deployment_name) => {
                fleet
                    .owned_targets
                    .entry(deployment_name)
                    .or_default()
                    .insert(device.to_string());
            }
            Err(_) => {
                warn!("Invalid deployment name for key {key}");
            }
        }
    }
    Ok(())
}
// ---------------------------------------------------------------------------
// Patch tick
// ---------------------------------------------------------------------------
/// One pass of the status patch loop.
///
/// Drains the dirty set, computes the aggregate for every dirty CR that is
/// still cached (under the lock), then — with the lock released —
/// merge-patches `status.aggregate` on each of them.
///
/// A CR whose patch fails is put back into the dirty set (provided it is
/// still cached) so the next tick retries it with a freshly computed
/// aggregate. Without that, a transient API error would leave the status
/// stale until some unrelated change dirtied the CR again.
///
/// Always returns `Ok(())`; per-CR patch failures are logged as warnings.
async fn patch_tick(api: &Api<Deployment>, state: &SharedFleetState) -> anyhow::Result<()> {
    let pending: Vec<(DeploymentKey, DeploymentAggregate)> = {
        let mut guard = state.lock().await;
        let keys: Vec<DeploymentKey> = guard.dirty.drain().collect();
        keys.into_iter()
            .filter_map(|key| {
                // CRs deleted since they were marked dirty are skipped.
                let cached = guard.deployments.get(&key)?;
                let aggregate = compute_aggregate(&guard, cached);
                Some((key, aggregate))
            })
            .collect()
    };

    let client = api.clone().into_client();
    let mut failed: Vec<DeploymentKey> = Vec::new();
    for (key, aggregate) in pending {
        let ns_api: Api<Deployment> = Api::namespaced(client.clone(), &key.namespace);
        let status = json!({ "status": { "aggregate": aggregate } });
        match ns_api
            .patch_status(&key.name, &PatchParams::default(), &Patch::Merge(&status))
            .await
        {
            Err(e) => {
                tracing::warn!(
                    namespace = %key.namespace,
                    name = %key.name,
                    error = %e,
                    "aggregator: status patch failed"
                );
                failed.push(key);
            }
            Ok(_) => {
                tracing::debug!(
                    namespace = %key.namespace,
                    name = %key.name,
                    matched = aggregate.matched_device_count,
                    succeeded = aggregate.succeeded,
                    failed = aggregate.failed,
                    pending = aggregate.pending,
                    "aggregator: status patched"
                );
            }
        }
    }

    if !failed.is_empty() {
        let mut guard = state.lock().await;
        for key in failed {
            // Only retry CRs that still exist in the cache; a deleted CR
            // must not be resurrected in the dirty set.
            if guard.deployments.contains_key(&key) {
                guard.dirty.insert(key);
            }
        }
    }

    Ok(())
}
/// Build the status aggregate of one Deployment from the current caches.
///
/// The deployment's `owned_targets` entry is taken as the set of matched
/// devices. Each target is counted by the phase of its stored state:
/// `Running` → succeeded, `Failed` → failed, `Pending` or no state at all
/// → pending. The CR's recorded last error, if any, is copied in.
pub fn compute_aggregate(state: &FleetState, cached: &CachedDeployment) -> DeploymentAggregate {
    let targets = state.owned_targets.get(&cached.deployment_name);

    let mut agg = DeploymentAggregate {
        matched_device_count: targets.map_or(0, |devices| devices.len()) as u32,
        ..Default::default()
    };

    for device_id in targets.into_iter().flatten() {
        let pair = DevicePair {
            device_id: device_id.clone(),
            deployment: cached.deployment_name.clone(),
        };
        match state.states.get(&pair) {
            None => agg.pending += 1,
            Some(reported) => match reported.phase {
                Phase::Running => agg.succeeded += 1,
                Phase::Failed => agg.failed += 1,
                Phase::Pending => agg.pending += 1,
            },
        }
    }

    agg.last_error = state.last_error.get(&cached.key).cloned();
    agg
}
#[cfg(test)]
mod tests {
    use super::*;
    use chrono::{TimeZone, Utc};
    use harmony_reconciler_contracts::Id;

    /// Validated deployment name; panics on invalid test input.
    fn dn(s: &str) -> DeploymentName {
        DeploymentName::try_new(s).expect("valid test name")
    }

    /// Owned label map from borrowed `(key, value)` pairs.
    fn labels(pairs: &[(&str, &str)]) -> BTreeMap<String, String> {
        pairs
            .iter()
            .map(|(k, v)| (k.to_string(), v.to_string()))
            .collect()
    }

    /// Selector with a single `matchLabels` entry and no expressions.
    fn single_label_selector(match_key: &str, match_val: &str) -> LabelSelector {
        LabelSelector {
            match_labels: Some(labels(&[(match_key, match_val)])),
            match_expressions: None,
        }
    }

    /// Device state stamped `seconds` after a fixed base instant.
    fn state(device: &str, deployment: &str, phase: Phase, seconds: i64) -> DeploymentState {
        let last_event_at = Utc.timestamp_opt(1_700_000_000 + seconds, 0).unwrap();
        DeploymentState {
            device_id: Id::from(device.to_string()),
            deployment: dn(deployment),
            phase,
            last_event_at,
            last_error: None,
        }
    }

    /// Cached deployment with a one-label selector and an empty score.
    fn cached(namespace: &str, name: &str, match_key: &str, match_val: &str) -> CachedDeployment {
        let key = DeploymentKey {
            namespace: namespace.to_string(),
            name: name.to_string(),
        };
        CachedDeployment {
            key,
            deployment_name: dn(name),
            selector: single_label_selector(match_key, match_val),
            score_json: b"{}".to_vec(),
        }
    }

    fn pair(device: &str, deployment: &str) -> DevicePair {
        DevicePair {
            device_id: device.to_string(),
            deployment: dn(deployment),
        }
    }

    #[test]
    fn selector_match_labels_only() {
        let sel = single_label_selector("group", "edge-a");

        // Extra labels on the device do not prevent a match.
        let matching = labels(&[("group", "edge-a"), ("arch", "aarch64")]);
        assert!(selector_matches(&sel, &matching));

        let non_matching = labels(&[("group", "edge-b")]);
        assert!(!selector_matches(&sel, &non_matching));

        assert!(!selector_matches(&sel, &BTreeMap::new()));
    }

    #[test]
    fn empty_selector_matches_everything() {
        let sel = LabelSelector::default();
        assert!(selector_matches(&sel, &labels(&[("anything", "goes")])));
        assert!(selector_matches(&sel, &BTreeMap::new()));
    }

    #[test]
    fn compute_aggregate_counts_matched_devices() {
        let deployment = cached("fleet-demo", "hello", "group", "edge-a");
        let mut s = FleetState::default();
        s.deployments
            .insert(deployment.key.clone(), deployment.clone());

        // The aggregate reads owned_targets directly; selector resolution
        // happens elsewhere, so seed three owned devices by hand.
        let owned: HashSet<String> = ["pi-01", "pi-02", "pi-03"]
            .into_iter()
            .map(String::from)
            .collect();
        s.owned_targets
            .insert(deployment.deployment_name.clone(), owned);

        // pi-03 deliberately has no state entry and must count as pending.
        for (device, phase) in [("pi-01", Phase::Running), ("pi-02", Phase::Failed)] {
            s.states
                .insert(pair(device, "hello"), state(device, "hello", phase, 0));
        }

        let agg = compute_aggregate(&s, &deployment);
        assert_eq!(agg.matched_device_count, 3);
        assert_eq!(agg.succeeded, 1);
        assert_eq!(agg.failed, 1);
        assert_eq!(agg.pending, 1);
    }

    #[test]
    fn matched_devices_picks_by_label() {
        let sel = single_label_selector("group", "edge-a");

        let mut devices: HashMap<String, BTreeMap<String, String>> = HashMap::new();
        devices.insert("pi-01".to_string(), labels(&[("group", "edge-a")]));
        devices.insert("pi-02".to_string(), labels(&[("group", "edge-b")]));

        let matched = matched_devices(&sel, &devices);
        assert_eq!(matched.len(), 1);
        assert!(matched.contains("pi-01"));
    }
}