Compare commits

..

2 Commits

Author SHA1 Message Date
51b39505bb docs(adr): reframe desired-state ADR as proposal and explore alternatives
Re-frame ADR-021 from an accepted shell-executor decision into an
explicit problem statement plus one candidate proposal (Alternative A),
with an Open Questions section capturing the concerns raised during
review: wrong abstraction level, no idempotency, no resource model, no
typed status, incoherence with the Score-Topology-Interpret pattern,
and weak security posture.

Add ADR-022 enumerating four alternatives:
- A: shell command executor (current scaffold)
- B: mini-kubelet with typed resource manifests and reconcilers
- C: embedded Score interpreter on the agent
- D: hybrid — typed manifests now, Scores later

Recommends Alternative D: ship typed AgentManifest/AgentStatus with a
small fixed reconciler set for the IoT MVP, keeping an explicit
migration seam to the Score-based end state once Scores become
uniformly wire-serializable.

Also documents what specifically is wrong with the happy-path shell
executor in harmony_agent/src/desired_state.rs and clarifies that the
NATS KV watch + typed CAS write skeleton is reusable, while the
execute_command shell-out should be gated behind an audited ShellJob
variant or deleted once real reconcilers land.
2026-04-10 07:13:38 -04:00
9cd1713788 feat(agent): desired-state convergence happy path
Adds a proof-of-concept desired-state convergence mechanism for
harmony_agent: the central platform writes a DesiredStateConfig
to NATS KV at desired-state.<agent-id>; the agent watches the key,
executes the command via sh -c, and writes the result to
actual-state.<agent-id>.

- New module: harmony_agent/src/desired_state.rs (~150 lines)
- New types: DeploymentConfig::DesiredState, DesiredStateConfig,
  ActualState, ExecutionStatus
- ADR: docs/adr/021-agent-desired-state-convergence.md

This is an initial happy path for review. The architecture
needs further evaluation against the mini-kubelet vision before
hardening (security, sandboxing, signing, resource limits).
2026-04-10 07:08:53 -04:00
19 changed files with 685 additions and 83 deletions

View File

@@ -0,0 +1,117 @@
# ADR-021: Agent Desired-State Convergence — Problem Statement and Initial Proposal
**Status:** Proposed (under review — see ADR-022 for alternatives)
**Date:** 2026-04-09
> This document was originally drafted as an "Accepted" ADR describing a shell-command executor. On review, the team was not convinced that the shell-executor shape is the right one. It has been re-framed as a **problem statement + one candidate proposal (Alternative A)**. Alternative designs — including a mini-kubelet model and an embedded-Score model — are explored in [ADR-022](./022-agent-desired-state-alternatives.md). A final decision has **not** been made.
## Context
The Harmony Agent (ADR-016) currently handles a single use case: PostgreSQL HA failover via `DeploymentConfig::FailoverPostgreSQL`. For the IoT fleet management platform (Raspberry Pi clusters deployed in homes, offices, and community spaces), we need the agent to become a general-purpose desired-state convergence engine.
Concretely, the central Harmony control plane must be able to:
1. Express the *desired state* of an individual Pi (or a class of Pis) in a typed, serializable form.
2. Ship that desired state to the device over the existing NATS JetStream mesh (ADR-017-1).
3. Have the on-device agent reconcile toward it — idempotently, observably, and without manual intervention.
4. Read back an authoritative, typed *actual state* so the control plane can report convergence, surface errors, and drive a fleet dashboard.
The existing heartbeat / failover machinery (ADR-017-3) remains valuable — it proves the agent can maintain persistent NATS connections, do CAS writes against KV, and react to state changes. Whatever desired-state mechanism we add **extends** that foundation rather than replacing it.
### Design forces
- **Coherence with the rest of Harmony.** Harmony's entire identity is Score-Topology-Interpret with compile-time safety. A desired-state mechanism that reintroduces stringly-typed, runtime-validated blobs on the edge would be a regression from our own design rules (see `CLAUDE.md`: "Capabilities are industry concepts, not tools", "Scores encapsulate operational complexity", "Scores must be idempotent").
- **The "mini-kubelet" framing.** The team is converging on a mental model where the agent is a stripped-down kubelet: it owns a set of local reconcilers, maintains a PLEG-like state machine per managed resource, and converges toward a declarative manifest. ADR-017-3 is already explicitly Kubernetes-inspired for staleness detection. This framing should inform the desired-state design, not fight it.
- **Speed to IoT MVP.** We need something shippable soon enough that real Pi fleets can be demoed. Over-engineering the v1 risks never shipping; under-engineering it risks a rewrite once the wrong abstraction is entrenched on hundreds of devices in the field.
- **Security.** Whatever lands on the device is, by construction, running with the agent's privileges. A mechanism that reduces to "run this shell string as root" is a very wide blast radius.
- **Serializability.** Today, Harmony Scores are *not* uniformly serializable across the wire — many hold trait objects, closures, or references to live topologies. Any design that assumes "just send a Score" needs to confront this.
## Initial Proposal (Alternative A — Shell Command Executor)
This is the first-pass design, implemented as a happy-path scaffold on this branch. **It is presented here for critique, not as a settled decision.**
### Desired-State Model
Each agent watches a NATS KV key `desired-state.<agent-id>` for its workload definition. When the value changes, the agent executes the workload and reports the result to `actual-state.<agent-id>`. This is a pull-based convergence loop: the control plane writes intent, the agent converges, the control plane reads the result.
A `DesiredState` is a serializable description of what should be running on the device. For this first iteration, it is a shell command plus a monotonic generation counter.
```rust
enum DeploymentConfig {
FailoverPostgreSQL(FailoverCNPGConfig), // existing
DesiredState(DesiredStateConfig), // new
}
struct DesiredStateConfig {
command: String,
generation: u64,
}
```
### Config Flow
```
Central Platform NATS JetStream Agent (Pi)
================ ============== ==========
1. Write desired state -------> KV: desired-state.<agent-id>
2. Watch detects change
3. Execute workload
4. Write result --------> KV: actual-state.<agent-id>
5. Read actual state <------- KV: actual-state.<agent-id>
```
The agent's heartbeat loop continues independently. The desired-state watcher runs as a separate async task, sharing the same NATS connection. This separation means a slow command execution does not block heartbeats.
### State Reporting
```rust
struct ActualState {
agent_id: Id,
generation: u64, // mirrors the desired-state generation
status: ExecutionStatus, // Success, Failed, Running
stdout: String,
stderr: String,
exit_code: Option<i32>,
executed_at: u64,
}
```
The control plane reads this key to determine convergence. If `actual_state.generation == desired_state.generation` and `status == Success`, the device has converged.
### Why this shape was chosen first
- Dirt cheap to implement (≈200 lines, done on this branch).
- Works for literally any task a human would type into a Pi shell.
- Reuses the existing NATS KV infrastructure and CAS write idiom already proven by the heartbeat loop.
- Provides an end-to-end demo path in under a day.
## Open Questions and Concerns
The following concerns block promoting this to an "Accepted" decision:
1. **Wrong abstraction level.** `sh -c "<string>"` is the *opposite* of what Harmony stands for. Harmony exists because IaC tools drown in stringly-typed, runtime-validated config. Shipping arbitrary shell to the edge recreates that problem inside our own agent — at the worst possible place (the device).
2. **No idempotency.** `systemctl start foo` and `apt install foo` are not idempotent by themselves. Every Score in Harmony is required to be idempotent. A shell executor pushes that burden onto whoever writes the commands, where we cannot check it.
3. **No resource model.** There is no notion of "this manifest owns this systemd unit". When desired state changes, we cannot compute a diff, we cannot garbage-collect the old resource, and we cannot surface "drift" meaningfully. We know generation N was "run"; we do not know what it left behind.
4. **No typed status.** `stdout`/`stderr`/`exit_code` is not enough to drive a fleet dashboard. We want typed `Status { container: Running { since, restarts }, unit: Active, file: PresentAt(sha256) }`.
5. **No lifecycle.** Shell commands are fire-and-forget. A kubelet-shaped agent needs to know whether a resource is *still* healthy after it was created — liveness and readiness are first-class concerns, not a post-hoc `exit_code` check.
6. **Security.** The ADR hand-waves "NATS ACLs + future signing". In practice, v1 lets anyone with write access to the KV bucket execute anything as the agent user. Even with NATS ACLs, the *shape* of the API invites abuse; a typed manifest with an allowlist of resource types has a much narrower attack surface by construction.
7. **Generational model is too coarse.** A single `generation: u64` per agent means we can only describe one monolithic "job". Real fleet state is a *set* of resources (this container, this unit, this file). We need per-resource generations, or a manifest-level generation with a sub-resource status map.
8. **Incoherent with ADR-017-3's kubelet framing.** That ADR deliberately borrowed K8s vocabulary (staleness, fencing, leader promotion) because kubelet-like semantics are the right ones for resilient edge workloads. Shell-exec abandons that lineage at the first opportunity.
9. **Coherence with the Score-Topology-Interpret pattern.** Today's proposal introduces a parallel concept ("DesiredStateConfig") that has nothing to do with Score or Topology. If a Pi is just "a topology with a small capability set" (systemd, podman, files, network), then the right thing to ship is a Score, not a shell string.
## Status of the Implementation on this Branch
The happy-path code in `harmony_agent/src/desired_state.rs` (≈250 lines, fully tested) implements Alternative A. It is **scaffolding**, not a committed design:
- It is useful as a vehicle to prove out the NATS KV watch + typed `ActualState` CAS write pattern, both of which are reusable regardless of which alternative we pick.
- It should **not** be wired into user-facing tooling until the architectural decision in ADR-022 is made.
- If we adopt Alternative B (mini-kubelet) or C (embedded Scores), the shell executor either becomes one *variant* of a typed `Resource` enum (a `ShellJob` resource, clearly labeled as an escape hatch) or is deleted outright.
## Next Steps
1. Review ADR-022 (alternatives + recommendation).
2. Pick a target design.
3. Either:
- Rework `desired_state.rs` to match the chosen target, **or**
- Keep it behind a feature flag as a demo fallback while the real design is built.
4. Re-file this ADR as "Superseded by ADR-022" or update it in place with the accepted design.

View File

@@ -0,0 +1,218 @@
# ADR-022: Agent Desired-State — Alternatives and Recommendation
**Status:** Proposed
**Date:** 2026-04-09
**Supersedes (candidate):** ADR-021 shell-executor proposal
## Context
ADR-021 drafted a first-pass "desired-state convergence" mechanism for the Harmony Agent (ADR-016) in the form of a shell-command executor. On review, that shape raised serious concerns (see ADR-021 §"Open Questions and Concerns"): it is incoherent with Harmony's Score-Topology-Interpret pattern, it is not idempotent, it has no resource model, no typed status, no lifecycle, and it weakens the agent's security posture.
Separately, the team has been converging on a **"mini-kubelet" framing** for the IoT agent:
- The agent owns a small, fixed set of *reconcilers*, one per resource type it can manage (systemd unit, container, file, network interface, overlay config...).
- The desired state is a *typed manifest* — a bag of resources with identities, generations, and typed status.
- The agent runs reconcile loops similar to kubelet's Pod Lifecycle Event Generator (PLEG): for each managed resource, observe actual, compare to desired, apply the minimum delta, update typed status.
- Failure and drift are first-class. "I tried, it failed, here is why" is a valid steady state.
ADR-017-3 already borrows Kubernetes vocabulary (staleness, fencing, promotion) on purpose. Doubling down on the kubelet metaphor at the desired-state layer is the natural continuation, not a tangent.
This ADR enumerates the candidate designs, argues their tradeoffs honestly, and recommends a path.
## Alternatives
### Alternative A — Shell Command Executor (ADR-021 as-is)
**Shape:** `DesiredState { command: String, generation: u64 }`, agent does `sh -c $command`, pipes stdout/stderr/exit into `ActualState`.
**Pros:**
- Trivial to implement. ~200 LOC, already on this branch.
- Works for any task that can be expressed as a shell pipeline — maximum flexibility at v1.
- Zero new abstractions: reuses existing NATS KV watch + CAS patterns.
- End-to-end demo-able in an afternoon.
**Cons:**
- **Wrong abstraction level.** Harmony's entire thesis is "no more stringly-typed YAML/shell mud pits". This design ships that mud pit *to the edge*.
- **Not idempotent.** The burden of idempotency falls on whoever writes the command string. `systemctl start foo` run twice is fine; `apt install foo && echo "done" >> /etc/state` run twice is broken. We cannot enforce correctness.
- **No resource model.** No concept of "this manifest owns X". No diffing, no GC, no drift detection, no "what does this agent currently run?".
- **No typed status.** stdout/stderr/exit_code does not tell a fleet dashboard "container nginx is running, restarted 3 times, last healthy 2s ago". It tells it "this bash ran and exited 0 once, three minutes ago".
- **No lifecycle.** Fire-and-forget; post-exit the agent has no notion of whether the resource is still healthy.
- **Security.** Even with NATS ACLs, the API's *shape* invites abuse. Any bug in the control plane that lets a user influence a desired-state write equals RCE on every Pi.
- **Incoherent with ADR-017-3 and Score-Topology-Interpret.** Introduces a parallel concept that has nothing to do with the rest of Harmony.
**Verdict:** Acceptable only as a *named escape hatch* inside a richer design (a `ShellJob` resource variant, explicitly labeled as such and audited). Not acceptable as the whole design.
---
### Alternative B — Mini-Kubelet with Typed Resource Manifests
**Shape:** The agent owns a fixed set of `Resource` variants and one reconciler per variant.
```rust
/// The unit of desired state shipped to an agent.
/// Serialized to JSON, pushed via NATS KV to `desired-state.<agent-id>`.
struct AgentManifest {
generation: u64, // monotonic, control-plane assigned
resources: Vec<ManagedResource>,
}
struct ManagedResource {
/// Stable, manifest-unique identity. Used for diffing across generations.
id: ResourceId,
spec: ResourceSpec,
}
enum ResourceSpec {
SystemdUnit(SystemdUnitSpec), // ensure unit exists, enabled, active
Container(ContainerSpec), // podman/docker run with image, env, volumes
File(FileSpec), // path, mode, owner, content (hash or inline)
NetworkConfig(NetworkConfigSpec), // interface, addresses, routes
ShellJob(ShellJobSpec), // explicit escape hatch, audited separately
// ...extend carefully
}
/// What the agent reports back.
struct AgentStatus {
manifest_generation: u64, // which desired-state gen this reflects
observed_generation: u64, // highest gen the agent has *processed*
resources: HashMap<ResourceId, ResourceStatus>,
conditions: Vec<AgentCondition>, // Ready, Degraded, Reconciling, ...
}
enum ResourceStatus {
Pending,
Reconciling { since: Timestamp },
Ready { since: Timestamp, details: ResourceReadyDetails },
Failed { since: Timestamp, error: String, retry_after: Option<Timestamp> },
}
```
Each reconciler implements a small trait:
```rust
trait Reconciler {
type Spec;
type Status;
async fn observe(&self, id: &ResourceId) -> Result<Self::Status>;
async fn reconcile(&self, id: &ResourceId, spec: &Self::Spec) -> Result<Self::Status>;
async fn delete(&self, id: &ResourceId) -> Result<()>;
}
```
The agent loop becomes:
1. Watch `desired-state.<agent-id>` for the latest `AgentManifest`.
2. On change, compute diff vs. observed set: additions, updates, deletions.
3. Dispatch each resource to its reconciler. Reconcilers are idempotent by contract.
4. Aggregate per-resource status into `AgentStatus`, write to `actual-state.<agent-id>` via CAS.
5. Re-run periodically to detect drift even when desired state has not changed (PLEG-equivalent).
**Pros:**
- **Declarative and idempotent by construction.** Reconcilers are required to be idempotent; the contract is enforced in Rust traits, not in docs.
- **Typed status.** Dashboards, alerts, and the control plane get structured data.
- **Drift detection.** Periodic re-observation catches "someone SSH'd in and stopped the service".
- **Lifecycle.** Each resource has a clear state machine; health is a first-class concept.
- **Coherent with ADR-017-3.** The kubelet framing becomes literal, not metaphorical.
- **Narrow attack surface.** The agent only knows how to do a handful of well-audited things. Adding a new capability is an explicit code change, not a new shell string.
- **Composable with Harmony's existing philosophy.** `ManagedResource` is to the agent what a Score is to a Topology, at a smaller scale.
**Cons:**
- More upfront design. Each reconciler needs to be written and tested.
- Requires us to *commit* to a resource type set and its status schema. Adding a new kind is a versioned change to the wire format.
- Duplicates, at the edge, some of the vocabulary already present in Harmony's Score layer (e.g., `FileDeployment`, container deployments). Risk of two parallel abstractions evolving in tension.
- Harder to demo in a single afternoon.
**Verdict:** Strong candidate. Matches the team's mini-kubelet intuition directly.
---
### Alternative C — Embedded Score Interpreter on the Agent
**Shape:** The desired state *is* a Harmony Score (or a set of Scores), serialized and pushed via NATS. The agent hosts a local `PiTopology` that exposes a small, carefully chosen set of capabilities (`SystemdHost`, `ContainerRuntime`, `FileSystemHost`, `NetworkConfigurator`, ...). The agent runs the Score's `interpret` against that local topology.
```rust
// On the control plane:
let score = SystemdServiceScore { ... };
let wire: SerializedScore = score.to_wire()?;
nats.put(format!("desired-state.{agent_id}"), wire).await?;
// On the agent:
let score = SerializedScore::decode(payload)?;
let topology = PiTopology::new();
let outcome = score.interpret(&inventory, &topology).await?;
// Outcome is already a typed Harmony result (SUCCESS/NOOP/FAILURE/RUNNING/...).
```
**Pros:**
- **Zero new abstractions.** The agent becomes "a Harmony executor that happens to run on a Pi". Everything we already know how to do in Harmony works, for free.
- **Maximum coherence.** There is exactly one way to describe desired state in the whole system: a Score. The type system enforces that a score requesting `K8sclient` cannot be shipped to a Pi topology that does not offer it — at compile time on the control plane, at deserialization time on the agent.
- **Composability.** Higher-order topologies (ADR-015) work unchanged: `FailoverTopology<PiTopology>` gets you HA at the edge for free.
- **Single mental model for the whole team.** "Write a Score" is already the Harmony primitive; no one needs to learn a second one.
**Cons:**
- **Serializability.** This is the hard one. Harmony Scores today hold trait objects, references to live topology state, and embedded closures in places. Making them uniformly serde-serializable is a non-trivial refactor that touches dozens of modules. We would be gating the IoT MVP on a cross-cutting refactor.
- **Agent binary size.** If "the agent can run any Score", it links every module. On a Pi Zero 2 W, that matters. We can mitigate with feature flags, but then we are back to "which scores does *this* agent support?" — i.e., we have reinvented resource-type registration, just spelled differently.
- **Capability scoping is subtle.** We have to be extremely careful about which capabilities `PiTopology` exposes. "A Pi can run containers" is true; "a Pi can run arbitrary k8s clusters" is not. Getting that boundary wrong opens the same attack surface as Alternative A, just hidden behind a Score.
- **Control-plane UX.** The central platform now needs to instantiate Scores for specific Pis, handle their inventories, and ship them. That is heavier than "push a JSON blob".
**Verdict:** The principled end state, almost certainly where we want to be in 18 months. Not shippable for the IoT MVP.
---
### Alternative D — Hybrid: Typed Manifests Now, Scores Later
**Shape:** Ship Alternative B (typed `AgentManifest` with a fixed set of reconcilers). Keep the Score ambition (Alternative C) as an explicit roadmap item. When Scores become uniformly wire-serializable and `PiTopology` is mature, migrate by adding a `ResourceSpec::Score(SerializedScore)` variant. Eventually that variant may subsume the others.
**Pros:**
- **Shippable soon.** Alternative B is the implementable core; we can have a fleet demo in weeks, not months.
- **On a path to the ideal.** We do not dead-end. The `ResourceSpec` enum becomes the migration seam.
- **De-risks the Score serialization refactor.** We learn what resource types we *actually* need on the edge before we refactor the Score layer.
- **Lets us delete Alternative A cleanly.** The shell executor either disappears or survives as a narrow, explicitly-audited `ResourceSpec::ShellJob` variant that documents itself as an escape hatch.
**Cons:**
- Temporarily maintains two vocabularies (`ResourceSpec` at the edge, `Score` in the core). There is a risk they drift before they reconverge.
- Requires team discipline to actually do the C migration and not leave B as the permanent design.
**Verdict:** Recommended.
---
## Recommendation
**Adopt Alternative D (Hybrid: typed manifests now, Scores later).**
Reasoning:
1. **Speed to IoT MVP** is real. Alternative C is a 3-6 month refactor of the Score layer before we can deploy anything; Alternative B can ship within the current iteration.
2. **Long-term coherence with Harmony's design philosophy** is preserved because D has an explicit migration seam to C. We do not paint ourselves into a corner.
3. **The mini-kubelet framing is directly satisfied by B.** Typed resources, reconciler loops, observed-generation pattern, PLEG-style drift detection. This is exactly what the team has been describing.
4. **Capability-trait discipline carries over cleanly.** `Reconciler` is the agent-side analog of a capability trait (`DnsServer`, `K8sclient`, etc.). The rule "capabilities are industry concepts, not tools" applies to `ResourceSpec` too: we name it `Container`, not `Podman`; `SystemdUnit`, not `Systemctl`.
5. **The shell executor is not wasted work.** It proved the NATS KV watch + typed CAS write pattern that Alternative B will also need. It becomes either `ResourceSpec::ShellJob` (audited escape hatch) or gets deleted.
6. **Security posture improves immediately.** A fixed resource-type allowlist is dramatically tighter than "run any shell", even before we add signing or sandboxing.
7. **The IoT product use case actually is "deploy simple workloads to Pi fleets".** Containers, systemd services, config files, network config. That is a short list, and it maps to four or five resource types. We do not need the full expressive power of a Score layer to hit the product milestone.
## Specific Findings on the Current Implementation
`harmony_agent/src/desired_state.rs` (≈250 lines, implemented on this branch):
- **Keep as scaffolding**, do not wire into user tooling.
- The NATS KV watch loop, the `ActualState` CAS write, and the generation-tracking skeleton are all reusable by Alternative B. They are the only parts worth keeping.
- The `execute_command` function (shelling out via `Command::new("sh").arg("-c")`) is the part that bakes in the wrong abstraction. It should be:
1. **Moved behind a `ResourceSpec::ShellJob` reconciler** if we decide to keep shell as an explicit, audited escape hatch, **or**
2. **Deleted** when the first two real reconcilers (Container, SystemdUnit) land.
- The `DesiredStateConfig` / `ActualState` types in `harmony_agent/src/agent/config.rs` are too narrow. They should be replaced by `AgentManifest` / `AgentStatus` as sketched above. `generation: u64` at the manifest level stays; per-resource status is added.
- The existing tests (`executes_command_and_reports_result`, `reports_failure_for_bad_command`) are testing the shell executor specifically; they will be deleted or repurposed when the resource model lands.
## Open Questions (to resolve before implementing B)
1. **What is the minimum viable resource type set for the IoT MVP?** Proposal: `Container`, `SystemdUnit`, `File`. Defer `NetworkConfig`, `ShellJob` until a concrete use case appears.
2. **Where does `AgentManifest` live in the crate graph?** It is consumed by both the control plane and the agent. Likely `harmony_agent_types` (new) or an existing shared types crate.
3. **How are images, files, and secrets referenced?** By content hash + asset store URL (ADR: `harmony_assets`)? By inline payload under a size cap?
4. **What is the reconcile cadence?** On NATS KV change + periodic drift check every N seconds? What is N on a Pi?
5. **How does `AgentStatus` interact with the heartbeat loop?** Is the status written on every reconcile, or aggregated into the heartbeat payload? The heartbeat cares about liveness; the status cares about workload health. They are probably separate KV keys, coupled by generation.
6. **How do we handle partial failures and retry?** Exponential backoff per resource? Global pause on repeated failures? Surface to the control plane via `conditions`?
7. **Can the agent refuse a manifest it does not understand?** (Forward compatibility: new `ResourceSpec` variant rolled out before the agent upgrade.) Proposal: fail loudly and report a typed `UnknownResource` status so the control plane can detect version skew.
## Decision
**None yet.** This ADR is explicitly a proposal to adopt **Alternative D**, pending team review. If approved, a follow-up ADR-023 will specify the concrete `AgentManifest` / `AgentStatus` schema and the initial reconciler set.

View File

@@ -600,6 +600,7 @@ fn build_all_scores() -> Result<Vec<Box<dyn Score<OPNSenseFirewall>>>, Box<dyn s
},
],
private_services: vec![],
wan_firewall_ports: vec![],
};
let dhcp_score = DhcpScore::new(

View File

@@ -69,5 +69,6 @@ fn build_large_score() -> LoadBalancerScore {
lb_service.clone(),
lb_service.clone(),
],
wan_firewall_ports: vec![],
}
}

View File

@@ -15,7 +15,7 @@ use async_trait::async_trait;
use harmony_types::firewall::VipMode;
use harmony_types::id::Id;
use harmony_types::net::{IpAddress, MacAddress};
use log::{info, warn};
use log::info;
use serde::Serialize;
use crate::config::secret::{OPNSenseApiCredentials, OPNSenseFirewallCredentials};
@@ -176,24 +176,8 @@ impl DhcpServer for FirewallPairTopology {
}
async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)> {
let primary_mappings = self.primary.list_static_mappings().await;
let backup_mappings = self.backup.list_static_mappings().await;
let primary_set: std::collections::HashSet<_> = primary_mappings.iter().collect();
let backup_set: std::collections::HashSet<_> = backup_mappings.iter().collect();
let only_primary: Vec<_> = primary_set.difference(&backup_set).collect();
let only_backup: Vec<_> = backup_set.difference(&primary_set).collect();
if !only_primary.is_empty() || !only_backup.is_empty() {
warn!(
"DHCP static mapping mismatch between primary and backup firewalls! \
Only on primary: {:?}, Only on backup: {:?}",
only_primary, only_backup
);
}
primary_mappings
// Return primary's view — both should be identical
self.primary.list_static_mappings().await
}
/// Returns the primary firewall's IP. In a CARP setup, callers

View File

@@ -489,9 +489,6 @@ impl LoadBalancer for DummyInfra {
async fn reload_restart(&self) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
async fn ensure_wan_access(&self, _port: u16) -> Result<(), ExecutorError> {
unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
}
}
#[async_trait]

View File

@@ -37,9 +37,11 @@ pub trait LoadBalancer: Send + Sync {
/// from the WAN interface. Used by load balancers that need to receive
/// external traffic (e.g., OKD ingress on ports 80/443).
///
/// Topologies that don't manage firewall rules (e.g., cloud environments
/// with security groups) should return `Ok(())`.
async fn ensure_wan_access(&self, port: u16) -> Result<(), ExecutorError>;
/// Default implementation is a no-op for topologies that don't manage
/// firewall rules (e.g., cloud environments with security groups).
async fn ensure_wan_access(&self, _port: u16) -> Result<(), ExecutorError> {
Ok(())
}
}
#[derive(Debug, PartialEq, Clone, Serialize)]

View File

@@ -111,16 +111,12 @@ impl LoadBalancer for OPNSenseFirewall {
}
fn haproxy_service_to_harmony(svc: &HaproxyService) -> Option<LoadBalancerService> {
let listening_port = match svc.bind.parse() {
Ok(addr) => addr,
Err(e) => {
warn!(
"Skipping HAProxy service: bind address '{}' is not a valid SocketAddr: {e}",
svc.bind
);
return None;
}
};
let listening_port = svc.bind.parse().unwrap_or_else(|_| {
panic!(
"HAProxy frontend address should be a valid SocketAddr, got {}",
svc.bind
)
});
let backend_servers: Vec<BackendServer> = svc
.servers

View File

@@ -10,7 +10,6 @@ use virt::sys;
use super::error::KvmError;
use super::types::{CdromConfig, NetworkConfig, VmConfig, VmInterface, VmStatus};
use super::xml;
use harmony_types::net::MacAddress;
/// A handle to a libvirt hypervisor.
///
@@ -375,15 +374,14 @@ impl KvmExecutor {
pub async fn set_interface_link(
&self,
vm_name: &str,
mac: &MacAddress,
mac: &str,
up: bool,
) -> Result<(), KvmError> {
let state = if up { "up" } else { "down" };
let mac_str = mac.to_string();
info!("Setting {vm_name} interface {mac_str} link {state}");
info!("Setting {vm_name} interface {mac} link {state}");
let output = tokio::process::Command::new("virsh")
.args(["-c", &self.uri, "domif-setlink", vm_name, &mac_str, state])
.args(["-c", &self.uri, "domif-setlink", vm_name, mac, state])
.output()
.await?;
@@ -422,18 +420,11 @@ impl KvmExecutor {
// virsh domiflist columns: Interface, Type, Source, Model, MAC
let parts: Vec<&str> = line.split_whitespace().collect();
if parts.len() >= 5 {
let mac = match MacAddress::try_from(parts[4].to_string()) {
Ok(m) => m,
Err(e) => {
warn!("Skipping interface with invalid MAC '{}': {e}", parts[4]);
continue;
}
};
interfaces.push(VmInterface {
interface_type: parts[1].to_string(),
source: parts[2].to_string(),
model: parts[3].to_string(),
mac,
mac: parts[4].to_string(),
});
}
}

View File

@@ -1,4 +1,3 @@
use harmony_types::net::MacAddress;
use serde::{Deserialize, Serialize};
/// Information about a VM's network interface, as reported by `virsh domiflist`.
@@ -11,7 +10,7 @@ pub struct VmInterface {
/// Device model (e.g. "virtio")
pub model: String,
/// MAC address
pub mac: MacAddress,
pub mac: String,
}
/// Specifies how a KVM host is accessed.
@@ -96,7 +95,7 @@ pub struct NetworkRef {
pub name: String,
/// Optional fixed MAC address for this interface. When `None`, libvirt
/// assigns one automatically.
pub mac: Option<MacAddress>,
pub mac: Option<String>,
}
impl NetworkRef {
@@ -107,8 +106,8 @@ impl NetworkRef {
}
}
pub fn with_mac(mut self, mac: MacAddress) -> Self {
self.mac = Some(mac);
pub fn with_mac(mut self, mac: impl Into<String>) -> Self {
self.mac = Some(mac.into());
self
}
}
@@ -261,7 +260,7 @@ impl VmConfigBuilder {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhcpHost {
/// MAC address (e.g. `"52:54:00:00:50:01"`).
pub mac: MacAddress,
pub mac: String,
/// IP to assign (e.g. `"10.50.0.2"`).
pub ip: String,
/// Optional hostname.
@@ -357,12 +356,12 @@ impl NetworkConfigBuilder {
/// Add a static DHCP host entry (MAC → fixed IP).
pub fn dhcp_host(
mut self,
mac: MacAddress,
mac: impl Into<String>,
ip: impl Into<String>,
name: Option<String>,
) -> Self {
self.dhcp_hosts.push(DhcpHost {
mac,
mac: mac.into(),
ip: ip.into(),
name,
});

View File

@@ -136,7 +136,7 @@ fn nic_devices(vm: &VmConfig) -> String {
.map(|net| {
let mac_line = net
.mac
.as_ref()
.as_deref()
.map(|m| format!("\n <mac address='{m}'/>"))
.unwrap_or_default();
format!(
@@ -221,7 +221,6 @@ mod tests {
use crate::modules::kvm::types::{
BootDevice, ForwardMode, NetworkConfig, NetworkRef, VmConfig,
};
use harmony_types::net::MacAddress;
// ── Domain XML ──────────────────────────────────────────────────────
@@ -285,13 +284,12 @@ mod tests {
#[test]
fn domain_xml_nic_with_mac_address() {
let mac: MacAddress = "52:54:00:aa:bb:cc".to_string().try_into().unwrap();
let vm = VmConfig::builder("mac-test")
.network(NetworkRef::named("mynet").with_mac(mac))
.network(NetworkRef::named("mynet").with_mac("52:54:00:AA:BB:CC"))
.build();
let xml = domain_xml(&vm, "/tmp");
assert!(xml.contains("mac address='52:54:00:aa:bb:cc'"));
assert!(xml.contains("mac address='52:54:00:AA:BB:CC'"));
}
#[test]
@@ -456,11 +454,14 @@ mod tests {
#[test]
fn network_xml_with_dhcp_host() {
let mac: MacAddress = "52:54:00:00:50:01".to_string().try_into().unwrap();
let cfg = NetworkConfig::builder("hostnet")
.subnet("10.50.0.1", 24)
.dhcp_range("10.50.0.100", "10.50.0.200")
.dhcp_host(mac, "10.50.0.2", Some("opnsense".to_string()))
.dhcp_host(
"52:54:00:00:50:01",
"10.50.0.2",
Some("opnsense".to_string()),
)
.build();
let xml = network_xml(&cfg);

View File

@@ -19,6 +19,12 @@ pub struct LoadBalancerScore {
// (listen_interface, LoadBalancerService) tuples or something like that
// I am not sure what to use as listen_interface, should it be interface name, ip address,
// uuid?
/// TCP ports that must be open for inbound WAN traffic.
///
/// The load balancer interpret will call `ensure_wan_access` for each port
/// before configuring services, so that the load balancer is reachable
/// from outside the LAN.
pub wan_firewall_ports: Vec<u16>,
}
impl<T: Topology + LoadBalancer> Score<T> for LoadBalancerScore {
@@ -60,6 +66,11 @@ impl<T: Topology + LoadBalancer> Interpret<T> for LoadBalancerInterpret {
load_balancer.ensure_initialized().await?
);
for port in &self.score.wan_firewall_ports {
info!("Ensuring WAN access for port {port}");
load_balancer.ensure_wan_access(*port).await?;
}
for service in self.score.public_services.iter() {
info!("Ensuring service exists {service:?}");

View File

@@ -350,20 +350,13 @@ impl OKDSetup02BootstrapInterpret {
&self,
inventory: &Inventory,
) -> Result<(), InterpretError> {
let timeout_minutes: u64 = std::env::var("HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES")
.ok()
.and_then(|v| v.parse().ok())
.unwrap_or(90);
info!(
"[Stage 02/Bootstrap] Waiting for bootstrap to complete (timeout: {timeout_minutes}m)..."
);
info!("[Stage 02/Bootstrap] Waiting for bootstrap to complete...");
info!("[Stage 02/Bootstrap] Running: openshift-install wait-for bootstrap-complete");
let okd_installation_path =
format!("./data/okd/installation_files_{}", inventory.location.name);
let child = Command::new("./data/okd/bin/openshift-install")
let output = Command::new("./data/okd/bin/openshift-install")
.args([
"wait-for",
"bootstrap-complete",
@@ -371,17 +364,8 @@ impl OKDSetup02BootstrapInterpret {
&okd_installation_path,
"--log-level=info",
])
.output();
let timeout = std::time::Duration::from_secs(timeout_minutes * 60);
let output = tokio::time::timeout(timeout, child)
.output()
.await
.map_err(|_| {
InterpretError::new(format!(
"[Stage 02/Bootstrap] bootstrap-complete timed out after {timeout_minutes} minutes. \
Set HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES to increase the timeout and retry."
))
})?
.map_err(|e| {
InterpretError::new(format!(
"[Stage 02/Bootstrap] Failed to run openshift-install wait-for bootstrap-complete: {e}"

View File

@@ -56,6 +56,7 @@ impl OKDBootstrapLoadBalancerScore {
load_balancer_score: LoadBalancerScore {
public_services: vec![],
private_services,
wan_firewall_ports: vec![80, 443],
},
}
}

View File

@@ -114,6 +114,7 @@ impl OKDLoadBalancerScore {
load_balancer_score: LoadBalancerScore {
public_services,
private_services,
wan_firewall_ports: vec![80, 443],
},
}
}
@@ -338,6 +339,13 @@ mod tests {
assert_eq!(private_service_22623.backend_servers.len(), 3);
}
#[test]
fn test_wan_firewall_ports_include_http_and_https() {
let topology = create_test_topology();
let score = OKDLoadBalancerScore::new(&topology);
assert_eq!(score.load_balancer_score.wan_firewall_ports, vec![80, 443]);
}
#[test]
fn test_all_backend_servers_have_correct_port() {
let topology = create_test_topology();

View File

@@ -2,6 +2,7 @@ use std::time::Duration;
use harmony_types::id::Id;
use log::info;
use serde::{Deserialize, Serialize};
use super::heartbeat::HeartbeatFailure;
use super::role::AgentRole;
@@ -46,6 +47,9 @@ pub struct AgentConfig {
#[derive(Debug, Clone)]
pub enum DeploymentConfig {
FailoverPostgreSQL(FailoverCNPGConfig),
/// Desired-state convergence mode: agent watches a NATS KV key for commands
/// to execute and reports results back. See ADR-021.
DesiredState,
}
#[derive(Debug, Clone)]
@@ -53,6 +57,38 @@ pub struct FailoverCNPGConfig {
pub cnpg_cluster_name: String,
}
/// The desired state pushed by the central platform to an agent via NATS KV.
/// Key: `desired-state.<agent-id>`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DesiredStateConfig {
/// Shell command to execute (passed to `sh -c`)
pub command: String,
/// Monotonic generation counter. The agent only executes when this changes.
pub generation: u64,
}
/// The actual state reported by the agent after executing a desired-state command.
/// Key: `actual-state.<agent-id>`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActualState {
pub agent_id: String,
/// Mirrors the generation from the DesiredStateConfig that was executed
pub generation: u64,
pub status: ExecutionStatus,
pub stdout: String,
pub stderr: String,
pub exit_code: Option<i32>,
/// Timestamp (ms since epoch) when the command was executed, from agent clock (informational)
pub executed_at: u64,
}
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ExecutionStatus {
Success,
Failed,
Running,
}
impl DeploymentConfig {
/// The actual "work" of the heartbeat (e.g., write to NATS, check Postgres)
pub async fn perform_heartbeat(&self) -> Result<(), HeartbeatFailure> {
@@ -62,6 +98,11 @@ impl DeploymentConfig {
// TODO: Implement actual PG check / NATS write here
Ok(())
}
DeploymentConfig::DesiredState => {
// In desired-state mode, heartbeat just proves liveness.
// Workload execution happens in the separate desired-state watcher task.
Ok(())
}
}
}

View File

@@ -18,7 +18,10 @@ pub mod heartbeat;
mod role;
// Re-exports for backwards compatibility
pub use config::{AgentConfig, DeploymentConfig, FailoverCNPGConfig};
pub use config::{
ActualState, AgentConfig, DeploymentConfig, DesiredStateConfig, ExecutionStatus,
FailoverCNPGConfig,
};
pub use heartbeat::{AgentHeartbeat, AgentInfo, ClusterStateData, HeartbeatFailure};
pub use role::AgentRole;

View File

@@ -0,0 +1,246 @@
//! Desired-state convergence watcher (ADR-021).
//!
//! Polls a NATS KV key `desired-state.<agent-id>` for workload definitions.
//! When the generation changes, executes the command and writes the result
//! to `actual-state.<agent-id>`.
use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};
use log::{debug, error, info, warn};
use tokio::process::Command;
use crate::agent::{ActualState, DesiredStateConfig, ExecutionStatus};
use crate::store::{KvStore, KvStoreError};
/// Runs the desired-state convergence loop for a single agent.
///
/// This task polls the KV store for changes to the desired state and executes
/// commands when the generation counter advances. It is designed to run as a
/// separate tokio task alongside the heartbeat loop.
pub async fn run_desired_state_watcher<S>(
agent_id: String,
kv: Arc<S>,
poll_interval: Duration,
) where
S: KvStore + Send + Sync + 'static,
{
let desired_key = format!("desired-state.{agent_id}");
let actual_key = format!("actual-state.{agent_id}");
let mut last_generation: Option<u64> = None;
info!("Desired-state watcher started for agent {agent_id}, polling key: {desired_key}");
loop {
match fetch_desired_state(&kv, &desired_key).await {
Ok(Some(desired)) => {
if last_generation == Some(desired.generation) {
debug!(
"Desired state generation {} already executed, skipping",
desired.generation
);
} else {
info!(
"New desired state detected: generation={}, command='{}'",
desired.generation, desired.command
);
let result = execute_command(&agent_id, &desired).await;
match store_actual_state(&kv, &actual_key, &result).await {
Ok(_) => {
info!(
"Actual state stored: generation={}, status={:?}, exit_code={:?}",
result.generation, result.status, result.exit_code
);
}
Err(e) => {
error!("Failed to store actual state: {e}");
}
}
last_generation = Some(desired.generation);
}
}
Ok(None) => {
debug!("No desired state found for {agent_id}, waiting...");
}
Err(e) => {
warn!("Error fetching desired state for {agent_id}: {e}");
}
}
tokio::time::sleep(poll_interval).await;
}
}
async fn fetch_desired_state<S>(
kv: &Arc<S>,
key: &str,
) -> Result<Option<DesiredStateConfig>, KvStoreError>
where
S: KvStore + Send + Sync,
{
match kv.get(key).await {
Ok(result) => {
if let Some(value) = result.value {
let config: DesiredStateConfig =
serde_json::from_value(value).map_err(|e| KvStoreError::DeserializationFailed {
deserialization_error: e.to_string(),
value: String::new(),
})?;
Ok(Some(config))
} else {
Ok(None)
}
}
Err(KvStoreError::KeyNotAvailable(_)) => Ok(None),
Err(e) => Err(e),
}
}
async fn execute_command(agent_id: &str, desired: &DesiredStateConfig) -> ActualState {
let now = SystemTime::now()
.duration_since(UNIX_EPOCH)
.expect("Time went backwards")
.as_millis() as u64;
info!("Executing command for generation {}: sh -c '{}'", desired.generation, desired.command);
let output = Command::new("sh")
.arg("-c")
.arg(&desired.command)
.output()
.await;
match output {
Ok(output) => {
let exit_code = output.status.code();
let status = if output.status.success() {
ExecutionStatus::Success
} else {
ExecutionStatus::Failed
};
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
if !stdout.is_empty() {
debug!("stdout: {stdout}");
}
if !stderr.is_empty() {
warn!("stderr: {stderr}");
}
ActualState {
agent_id: agent_id.to_string(),
generation: desired.generation,
status,
stdout,
stderr,
exit_code,
executed_at: now,
}
}
Err(e) => {
error!("Failed to spawn command: {e}");
ActualState {
agent_id: agent_id.to_string(),
generation: desired.generation,
status: ExecutionStatus::Failed,
stdout: String::new(),
stderr: format!("Failed to spawn command: {e}"),
exit_code: None,
executed_at: now,
}
}
}
}
async fn store_actual_state<S>(
kv: &Arc<S>,
key: &str,
state: &ActualState,
) -> Result<(), KvStoreError>
where
S: KvStore + Send + Sync,
{
let value = serde_json::to_value(state).map_err(|e| KvStoreError::DeserializationFailed {
deserialization_error: e.to_string(),
value: format!("{state:?}"),
})?;
// Use get to find current sequence, then set_strict for safe writes.
// If the key doesn't exist yet, use sequence 0 for the first write.
let expected_seq = match kv.get(key).await {
Ok(result) => result.metadata.sequence,
Err(KvStoreError::KeyNotAvailable(_)) => 0,
Err(e) => return Err(e),
};
kv.set_strict(key, value, expected_seq).await?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
use crate::store::InMemoryKvStore;
use serde_json::json;
#[tokio::test]
async fn executes_command_and_reports_result() {
let kv = Arc::new(InMemoryKvStore::new());
let agent_id = "test-agent";
let desired_key = format!("desired-state.{agent_id}");
let actual_key = format!("actual-state.{agent_id}");
// Write a desired state
let desired = DesiredStateConfig {
command: "echo hello".to_string(),
generation: 1,
};
let value = serde_json::to_value(&desired).unwrap();
kv.set_strict(&desired_key, value, 0).await.unwrap();
// Fetch and execute
let fetched = fetch_desired_state(&kv, &desired_key).await.unwrap().unwrap();
assert_eq!(fetched.generation, 1);
assert_eq!(fetched.command, "echo hello");
let result = execute_command(agent_id, &fetched).await;
assert_eq!(result.status, ExecutionStatus::Success);
assert_eq!(result.stdout.trim(), "hello");
assert_eq!(result.generation, 1);
assert_eq!(result.exit_code, Some(0));
// Store actual state
store_actual_state(&kv, &actual_key, &result).await.unwrap();
// Verify stored value
let stored = kv.get(&actual_key).await.unwrap();
let actual: ActualState = serde_json::from_value(stored.value.unwrap()).unwrap();
assert_eq!(actual.generation, 1);
assert_eq!(actual.status, ExecutionStatus::Success);
}
#[tokio::test]
async fn reports_failure_for_bad_command() {
let desired = DesiredStateConfig {
command: "exit 42".to_string(),
generation: 2,
};
let result = execute_command("test-agent", &desired).await;
assert_eq!(result.status, ExecutionStatus::Failed);
assert_eq!(result.exit_code, Some(42));
assert_eq!(result.generation, 2);
}
#[tokio::test]
async fn returns_none_when_no_desired_state() {
let kv = Arc::new(InMemoryKvStore::new());
let result = fetch_desired_state(&kv, "desired-state.nonexistent").await.unwrap();
assert!(result.is_none());
}
}

View File

@@ -9,6 +9,7 @@ use crate::{
// mod agent_loop;
mod agent;
pub mod desired_state;
pub mod store;
mod workflow;