diff --git a/docs/adr/021-agent-desired-state-convergence.md b/docs/adr/021-agent-desired-state-convergence.md new file mode 100644 index 0000000..527da8c --- /dev/null +++ b/docs/adr/021-agent-desired-state-convergence.md @@ -0,0 +1,117 @@ +# ADR-021: Agent Desired-State Convergence — Problem Statement and Initial Proposal + +**Status:** Proposed (under review — see ADR-022 for alternatives) +**Date:** 2026-04-09 + +> This document was originally drafted as an "Accepted" ADR describing a shell-command executor. On review, the team was not convinced that the shell-executor shape is the right one. It has been re-framed as a **problem statement + one candidate proposal (Alternative A)**. Alternative designs — including a mini-kubelet model and an embedded-Score model — are explored in [ADR-022](./022-agent-desired-state-alternatives.md). A final decision has **not** been made. + +## Context + +The Harmony Agent (ADR-016) currently handles a single use case: PostgreSQL HA failover via `DeploymentConfig::FailoverPostgreSQL`. For the IoT fleet management platform (Raspberry Pi clusters deployed in homes, offices, and community spaces), we need the agent to become a general-purpose desired-state convergence engine. + +Concretely, the central Harmony control plane must be able to: + +1. Express the *desired state* of an individual Pi (or a class of Pis) in a typed, serializable form. +2. Ship that desired state to the device over the existing NATS JetStream mesh (ADR-017-1). +3. Have the on-device agent reconcile toward it — idempotently, observably, and without manual intervention. +4. Read back an authoritative, typed *actual state* so the control plane can report convergence, surface errors, and drive a fleet dashboard. + +The existing heartbeat / failover machinery (ADR-017-3) remains valuable — it proves the agent can maintain persistent NATS connections, do CAS writes against KV, and react to state changes. 
Whatever desired-state mechanism we add **extends** that foundation rather than replacing it.
+
+### Design forces
+
+- **Coherence with the rest of Harmony.** Harmony's entire identity is Score-Topology-Interpret with compile-time safety. A desired-state mechanism that reintroduces stringly-typed, runtime-validated blobs on the edge would be a regression from our own design rules (see `CLAUDE.md`: "Capabilities are industry concepts, not tools", "Scores encapsulate operational complexity", "Scores must be idempotent").
+- **The "mini-kubelet" framing.** The team is converging on a mental model where the agent is a stripped-down kubelet: it owns a set of local reconcilers, maintains a PLEG-like state machine per managed resource, and converges toward a declarative manifest. ADR-017-3 is already explicitly Kubernetes-inspired for staleness detection. This framing should inform the desired-state design, not fight it.
+- **Speed to IoT MVP.** We need something shippable soon enough that real Pi fleets can be demoed. Over-engineering the v1 risks never shipping; under-engineering it risks a rewrite once the wrong abstraction is entrenched on hundreds of devices in the field.
+- **Security.** Whatever lands on the device is, by construction, running with the agent's privileges. A mechanism that reduces to "run this shell string as root" has a very wide blast radius.
+- **Serializability.** Today, Harmony Scores are *not* uniformly serializable across the wire — many hold trait objects, closures, or references to live topologies. Any design that assumes "just send a Score" needs to confront this.
+
+## Initial Proposal (Alternative A — Shell Command Executor)
+
+This is the first-pass design, implemented as a happy-path scaffold on this branch. **It is presented here for critique, not as a settled decision.**
+
+### Desired-State Model
+
+Each agent watches a NATS KV key `desired-state.<agent_id>` for its workload definition.
When the value changes, the agent executes the workload and reports the result to `actual-state.<agent_id>`. This is a pull-based convergence loop: the control plane writes intent, the agent converges, the control plane reads the result.
+
+A `DesiredState` is a serializable description of what should be running on the device. For this first iteration, it is a shell command plus a monotonic generation counter.
+
+```rust
+enum DeploymentConfig {
+    FailoverPostgreSQL(FailoverCNPGConfig), // existing
+    DesiredState(DesiredStateConfig),       // new
+}
+
+struct DesiredStateConfig {
+    command: String,
+    generation: u64,
+}
+```
+
+### Config Flow
+
+```
+ Central Platform              NATS JetStream                Agent (Pi)
+ ================              ==============                ==========
+ 1. Write desired state -----> KV: desired-state.<agent_id>
+                                                             2. Watch detects change
+                                                             3. Execute workload
+                               KV: actual-state.<agent_id> <------ 4. Write result
+ 5. Read actual state <------- KV: actual-state.<agent_id>
+```
+
+The agent's heartbeat loop continues independently. The desired-state watcher runs as a separate async task, sharing the same NATS connection. This separation means a slow command execution does not block heartbeats.
+
+### State Reporting
+
+```rust
+struct ActualState {
+    agent_id: Id,
+    generation: u64,          // mirrors the desired-state generation
+    status: ExecutionStatus,  // Success, Failed, Running
+    stdout: String,
+    stderr: String,
+    exit_code: Option<i32>,
+    executed_at: u64,
+}
+```
+
+The control plane reads this key to determine convergence. If `actual_state.generation == desired_state.generation` and `status == Success`, the device has converged.
+
+### Why this shape was chosen first
+
+- Dirt cheap to implement (≈200 lines, done on this branch).
+- Works for literally any task a human would type into a Pi shell.
+- Reuses the existing NATS KV infrastructure and CAS write idiom already proven by the heartbeat loop.
+- Provides an end-to-end demo path in under a day.
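The convergence test described above reduces to a single predicate. A minimal sketch (struct fields trimmed to what the check needs; `has_converged` is a hypothetical helper for illustration, not code on this branch):

```rust
// Sketch: the control-plane convergence check for Alternative A.
// `DesiredStateConfig` / `ActualState` mirror the structs above, trimmed
// to the fields the check reads. `has_converged` is hypothetical.

#[derive(Debug, PartialEq)]
enum ExecutionStatus {
    Success,
    Failed,
    Running,
}

struct DesiredStateConfig {
    generation: u64,
}

struct ActualState {
    generation: u64,
    status: ExecutionStatus,
}

/// Converged when the agent has executed the latest generation and succeeded.
fn has_converged(desired: &DesiredStateConfig, actual: &ActualState) -> bool {
    actual.generation == desired.generation && actual.status == ExecutionStatus::Success
}
```

Note that a stale-but-successful `ActualState` (old generation) correctly reads as "not converged yet", which is all the dashboard semantics this alternative can offer.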
+
+## Open Questions and Concerns
+
+The following concerns block promoting this to an "Accepted" decision:
+
+1. **Wrong abstraction level.** `sh -c "<arbitrary string>"` is the *opposite* of what Harmony stands for. Harmony exists because IaC tools drown in stringly-typed, runtime-validated config. Shipping arbitrary shell to the edge recreates that problem inside our own agent — at the worst possible place (the device).
+2. **No idempotency.** Shell commands are not idempotent by themselves — `systemctl start foo` happens to be, but `apt install foo && echo "done" >> /etc/state` run twice is broken. Every Score in Harmony is required to be idempotent. A shell executor pushes that burden onto whoever writes the commands, where we cannot check it.
+3. **No resource model.** There is no notion of "this manifest owns this systemd unit". When desired state changes, we cannot compute a diff, we cannot garbage-collect the old resource, and we cannot surface "drift" meaningfully. We know generation N was "run"; we do not know what it left behind.
+4. **No typed status.** `stdout`/`stderr`/`exit_code` is not enough to drive a fleet dashboard. We want typed `Status { container: Running { since, restarts }, unit: Active, file: PresentAt(sha256) }`.
+5. **No lifecycle.** Shell commands are fire-and-forget. A kubelet-shaped agent needs to know whether a resource is *still* healthy after it was created — liveness and readiness are first-class concerns, not a post-hoc `exit_code` check.
+6. **Security.** The ADR hand-waves "NATS ACLs + future signing". In practice, v1 lets anyone with write access to the KV bucket execute anything as the agent user. Even with NATS ACLs, the *shape* of the API invites abuse; a typed manifest with an allowlist of resource types has a much narrower attack surface by construction.
+7. **Generational model is too coarse.** A single `generation: u64` per agent means we can only describe one monolithic "job". Real fleet state is a *set* of resources (this container, this unit, this file).
We need per-resource generations, or a manifest-level generation with a sub-resource status map. +8. **Incoherent with ADR-017-3's kubelet framing.** That ADR deliberately borrowed K8s vocabulary (staleness, fencing, leader promotion) because kubelet-like semantics are the right ones for resilient edge workloads. Shell-exec abandons that lineage at the first opportunity. +9. **Coherence with the Score-Topology-Interpret pattern.** Today's proposal introduces a parallel concept ("DesiredStateConfig") that has nothing to do with Score or Topology. If a Pi is just "a topology with a small capability set" (systemd, podman, files, network), then the right thing to ship is a Score, not a shell string. + +## Status of the Implementation on this Branch + +The happy-path code in `harmony_agent/src/desired_state.rs` (≈250 lines, fully tested) implements Alternative A. It is **scaffolding**, not a committed design: + +- It is useful as a vehicle to prove out the NATS KV watch + typed `ActualState` CAS write pattern, both of which are reusable regardless of which alternative we pick. +- It should **not** be wired into user-facing tooling until the architectural decision in ADR-022 is made. +- If we adopt Alternative B (mini-kubelet) or C (embedded Scores), the shell executor either becomes one *variant* of a typed `Resource` enum (a `ShellJob` resource, clearly labeled as an escape hatch) or is deleted outright. + +## Next Steps + +1. Review ADR-022 (alternatives + recommendation). +2. Pick a target design. +3. Either: + - Rework `desired_state.rs` to match the chosen target, **or** + - Keep it behind a feature flag as a demo fallback while the real design is built. +4. Re-file this ADR as "Superseded by ADR-022" or update it in place with the accepted design. 
diff --git a/docs/adr/022-agent-desired-state-alternatives.md b/docs/adr/022-agent-desired-state-alternatives.md new file mode 100644 index 0000000..35da656 --- /dev/null +++ b/docs/adr/022-agent-desired-state-alternatives.md @@ -0,0 +1,218 @@ +# ADR-022: Agent Desired-State — Alternatives and Recommendation + +**Status:** Proposed +**Date:** 2026-04-09 +**Supersedes (candidate):** ADR-021 shell-executor proposal + +## Context + +ADR-021 drafted a first-pass "desired-state convergence" mechanism for the Harmony Agent (ADR-016) in the form of a shell-command executor. On review, that shape raised serious concerns (see ADR-021 §"Open Questions and Concerns"): it is incoherent with Harmony's Score-Topology-Interpret pattern, it is not idempotent, it has no resource model, no typed status, no lifecycle, and it weakens the agent's security posture. + +Separately, the team has been converging on a **"mini-kubelet" framing** for the IoT agent: + +- The agent owns a small, fixed set of *reconcilers*, one per resource type it can manage (systemd unit, container, file, network interface, overlay config...). +- The desired state is a *typed manifest* — a bag of resources with identities, generations, and typed status. +- The agent runs reconcile loops similar to kubelet's Pod Lifecycle Event Generator (PLEG): for each managed resource, observe actual, compare to desired, apply the minimum delta, update typed status. +- Failure and drift are first-class. "I tried, it failed, here is why" is a valid steady state. + +ADR-017-3 already borrows Kubernetes vocabulary (staleness, fencing, promotion) on purpose. Doubling down on the kubelet metaphor at the desired-state layer is the natural continuation, not a tangent. + +This ADR enumerates the candidate designs, argues their tradeoffs honestly, and recommends a path. 
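The reconcile loop sketched in the bullets above (observe actual, compare to desired, apply the minimum delta) can be illustrated as a tiny decision function. All names here are illustrative assumptions for this ADR, not code from the branch:

```rust
// Minimal sketch of a PLEG-style reconcile decision for one managed
// resource. `Observed` and `Action` are hypothetical illustration types;
// the desired spec is represented only by a content hash.

#[derive(Debug, PartialEq)]
enum Observed {
    Absent,
    Present { spec_hash: u64 }, // hash of the spec the resource was built from
}

#[derive(Debug, PartialEq)]
enum Action {
    Create,
    Update,
    Delete,
    Noop,
}

/// Compare desired spec (`None` = removed from the manifest) against the
/// observed state and return the minimum delta to apply.
fn plan(desired: Option<u64>, observed: &Observed) -> Action {
    match (desired, observed) {
        (Some(_), Observed::Absent) => Action::Create,
        (Some(want), Observed::Present { spec_hash }) if *spec_hash != want => Action::Update,
        (Some(_), Observed::Present { .. }) => Action::Noop,
        (None, Observed::Present { .. }) => Action::Delete,
        (None, Observed::Absent) => Action::Noop,
    }
}
```

Running `plan` periodically, not just on manifest change, is what turns this into drift detection: a resource someone stopped by hand re-observes as `Absent` and yields `Create` again.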
+ +## Alternatives + +### Alternative A — Shell Command Executor (ADR-021 as-is) + +**Shape:** `DesiredState { command: String, generation: u64 }`, agent does `sh -c $command`, pipes stdout/stderr/exit into `ActualState`. + +**Pros:** +- Trivial to implement. ~200 LOC, already on this branch. +- Works for any task that can be expressed as a shell pipeline — maximum flexibility at v1. +- Zero new abstractions: reuses existing NATS KV watch + CAS patterns. +- End-to-end demo-able in an afternoon. + +**Cons:** +- **Wrong abstraction level.** Harmony's entire thesis is "no more stringly-typed YAML/shell mud pits". This design ships that mud pit *to the edge*. +- **Not idempotent.** The burden of idempotency falls on whoever writes the command string. `systemctl start foo` run twice is fine; `apt install foo && echo "done" >> /etc/state` run twice is broken. We cannot enforce correctness. +- **No resource model.** No concept of "this manifest owns X". No diffing, no GC, no drift detection, no "what does this agent currently run?". +- **No typed status.** stdout/stderr/exit_code does not tell a fleet dashboard "container nginx is running, restarted 3 times, last healthy 2s ago". It tells it "this bash ran and exited 0 once, three minutes ago". +- **No lifecycle.** Fire-and-forget; post-exit the agent has no notion of whether the resource is still healthy. +- **Security.** Even with NATS ACLs, the API's *shape* invites abuse. Any bug in the control plane that lets a user influence a desired-state write equals RCE on every Pi. +- **Incoherent with ADR-017-3 and Score-Topology-Interpret.** Introduces a parallel concept that has nothing to do with the rest of Harmony. + +**Verdict:** Acceptable only as a *named escape hatch* inside a richer design (a `ShellJob` resource variant, explicitly labeled as such and audited). Not acceptable as the whole design. 
+
+---
+
+### Alternative B — Mini-Kubelet with Typed Resource Manifests
+
+**Shape:** The agent owns a fixed set of `Resource` variants and one reconciler per variant.
+
+```rust
+/// The unit of desired state shipped to an agent.
+/// Serialized to JSON, pushed via NATS KV to `desired-state.<agent_id>`.
+struct AgentManifest {
+    generation: u64,                  // monotonic, control-plane assigned
+    resources: Vec<ManagedResource>,
+}
+
+struct ManagedResource {
+    /// Stable, manifest-unique identity. Used for diffing across generations.
+    id: ResourceId,
+    spec: ResourceSpec,
+}
+
+enum ResourceSpec {
+    SystemdUnit(SystemdUnitSpec),     // ensure unit exists, enabled, active
+    Container(ContainerSpec),         // podman/docker run with image, env, volumes
+    File(FileSpec),                   // path, mode, owner, content (hash or inline)
+    NetworkConfig(NetworkConfigSpec), // interface, addresses, routes
+    ShellJob(ShellJobSpec),           // explicit escape hatch, audited separately
+    // ...extend carefully
+}
+
+/// What the agent reports back.
+struct AgentStatus {
+    manifest_generation: u64,         // which desired-state gen this reflects
+    observed_generation: u64,         // highest gen the agent has *processed*
+    resources: HashMap<ResourceId, ResourceStatus>,
+    conditions: Vec<Condition>,       // Ready, Degraded, Reconciling, ...
+}
+
+enum ResourceStatus {
+    Pending,
+    Reconciling { since: Timestamp },
+    Ready { since: Timestamp, details: ResourceReadyDetails },
+    Failed { since: Timestamp, error: String, retry_after: Option<Duration> },
+}
+```
+
+Each reconciler implements a small trait:
+
+```rust
+trait Reconciler {
+    type Spec;
+    type Status;
+    async fn observe(&self, id: &ResourceId) -> Result<Self::Status>;
+    async fn reconcile(&self, id: &ResourceId, spec: &Self::Spec) -> Result<Self::Status>;
+    async fn delete(&self, id: &ResourceId) -> Result<()>;
+}
+```
+
+The agent loop becomes:
+
+1. Watch `desired-state.<agent_id>` for the latest `AgentManifest`.
+2. On change, compute diff vs. observed set: additions, updates, deletions.
+3. Dispatch each resource to its reconciler. Reconcilers are idempotent by contract.
+4. Aggregate per-resource status into `AgentStatus`, write to `actual-state.<agent_id>` via CAS.
+5. Re-run periodically to detect drift even when desired state has not changed (PLEG-equivalent).
+
+**Pros:**
+- **Declarative and idempotent by construction.** Reconcilers are required to be idempotent; the contract is pinned to the `Reconciler` trait and testable per implementation, not scattered across docs.
+- **Typed status.** Dashboards, alerts, and the control plane get structured data.
+- **Drift detection.** Periodic re-observation catches "someone SSH'd in and stopped the service".
+- **Lifecycle.** Each resource has a clear state machine; health is a first-class concept.
+- **Coherent with ADR-017-3.** The kubelet framing becomes literal, not metaphorical.
+- **Narrow attack surface.** The agent only knows how to do a handful of well-audited things. Adding a new capability is an explicit code change, not a new shell string.
+- **Composable with Harmony's existing philosophy.** `ManagedResource` is to the agent what a Score is to a Topology, at a smaller scale.
+
+**Cons:**
+- More upfront design. Each reconciler needs to be written and tested.
+- Requires us to *commit* to a resource type set and its status schema. Adding a new kind is a versioned change to the wire format.
+- Duplicates, at the edge, some of the vocabulary already present in Harmony's Score layer (e.g., `FileDeployment`, container deployments). Risk of two parallel abstractions evolving in tension.
+- Harder to demo in a single afternoon.
+
+**Verdict:** Strong candidate. Matches the team's mini-kubelet intuition directly.
+
+---
+
+### Alternative C — Embedded Score Interpreter on the Agent
+
+**Shape:** The desired state *is* a Harmony Score (or a set of Scores), serialized and pushed via NATS. The agent hosts a local `PiTopology` that exposes a small, carefully chosen set of capabilities (`SystemdHost`, `ContainerRuntime`, `FileSystemHost`, `NetworkConfigurator`, ...). The agent runs the Score's `interpret` against that local topology.
+ +```rust +// On the control plane: +let score = SystemdServiceScore { ... }; +let wire: SerializedScore = score.to_wire()?; +nats.put(format!("desired-state.{agent_id}"), wire).await?; + +// On the agent: +let score = SerializedScore::decode(payload)?; +let topology = PiTopology::new(); +let outcome = score.interpret(&inventory, &topology).await?; +// Outcome is already a typed Harmony result (SUCCESS/NOOP/FAILURE/RUNNING/...). +``` + +**Pros:** +- **Zero new abstractions.** The agent becomes "a Harmony executor that happens to run on a Pi". Everything we already know how to do in Harmony works, for free. +- **Maximum coherence.** There is exactly one way to describe desired state in the whole system: a Score. The type system enforces that a score requesting `K8sclient` cannot be shipped to a Pi topology that does not offer it — at compile time on the control plane, at deserialization time on the agent. +- **Composability.** Higher-order topologies (ADR-015) work unchanged: `FailoverTopology` gets you HA at the edge for free. +- **Single mental model for the whole team.** "Write a Score" is already the Harmony primitive; no one needs to learn a second one. + +**Cons:** +- **Serializability.** This is the hard one. Harmony Scores today hold trait objects, references to live topology state, and embedded closures in places. Making them uniformly serde-serializable is a non-trivial refactor that touches dozens of modules. We would be gating the IoT MVP on a cross-cutting refactor. +- **Agent binary size.** If "the agent can run any Score", it links every module. On a Pi Zero 2 W, that matters. We can mitigate with feature flags, but then we are back to "which scores does *this* agent support?" — i.e., we have reinvented resource-type registration, just spelled differently. +- **Capability scoping is subtle.** We have to be extremely careful about which capabilities `PiTopology` exposes. "A Pi can run containers" is true; "a Pi can run arbitrary k8s clusters" is not. 
Getting that boundary wrong opens the same attack surface as Alternative A, just hidden behind a Score. +- **Control-plane UX.** The central platform now needs to instantiate Scores for specific Pis, handle their inventories, and ship them. That is heavier than "push a JSON blob". + +**Verdict:** The principled end state, almost certainly where we want to be in 18 months. Not shippable for the IoT MVP. + +--- + +### Alternative D — Hybrid: Typed Manifests Now, Scores Later + +**Shape:** Ship Alternative B (typed `AgentManifest` with a fixed set of reconcilers). Keep the Score ambition (Alternative C) as an explicit roadmap item. When Scores become uniformly wire-serializable and `PiTopology` is mature, migrate by adding a `ResourceSpec::Score(SerializedScore)` variant. Eventually that variant may subsume the others. + +**Pros:** +- **Shippable soon.** Alternative B is the implementable core; we can have a fleet demo in weeks, not months. +- **On a path to the ideal.** We do not dead-end. The `ResourceSpec` enum becomes the migration seam. +- **De-risks the Score serialization refactor.** We learn what resource types we *actually* need on the edge before we refactor the Score layer. +- **Lets us delete Alternative A cleanly.** The shell executor either disappears or survives as a narrow, explicitly-audited `ResourceSpec::ShellJob` variant that documents itself as an escape hatch. + +**Cons:** +- Temporarily maintains two vocabularies (`ResourceSpec` at the edge, `Score` in the core). There is a risk they drift before they reconverge. +- Requires team discipline to actually do the C migration and not leave B as the permanent design. + +**Verdict:** Recommended. + +--- + +## Recommendation + +**Adopt Alternative D (Hybrid: typed manifests now, Scores later).** + +Reasoning: + +1. **Speed to IoT MVP** is real. Alternative C is a 3-6 month refactor of the Score layer before we can deploy anything; Alternative B can ship within the current iteration. +2. 
**Long-term coherence with Harmony's design philosophy** is preserved because D has an explicit migration seam to C. We do not paint ourselves into a corner. +3. **The mini-kubelet framing is directly satisfied by B.** Typed resources, reconciler loops, observed-generation pattern, PLEG-style drift detection. This is exactly what the team has been describing. +4. **Capability-trait discipline carries over cleanly.** `Reconciler` is the agent-side analog of a capability trait (`DnsServer`, `K8sclient`, etc.). The rule "capabilities are industry concepts, not tools" applies to `ResourceSpec` too: we name it `Container`, not `Podman`; `SystemdUnit`, not `Systemctl`. +5. **The shell executor is not wasted work.** It proved the NATS KV watch + typed CAS write pattern that Alternative B will also need. It becomes either `ResourceSpec::ShellJob` (audited escape hatch) or gets deleted. +6. **Security posture improves immediately.** A fixed resource-type allowlist is dramatically tighter than "run any shell", even before we add signing or sandboxing. +7. **The IoT product use case actually is "deploy simple workloads to Pi fleets".** Containers, systemd services, config files, network config. That is a short list, and it maps to four or five resource types. We do not need the full expressive power of a Score layer to hit the product milestone. + +## Specific Findings on the Current Implementation + +`harmony_agent/src/desired_state.rs` (≈250 lines, implemented on this branch): + +- **Keep as scaffolding**, do not wire into user tooling. +- The NATS KV watch loop, the `ActualState` CAS write, and the generation-tracking skeleton are all reusable by Alternative B. They are the only parts worth keeping. +- The `execute_command` function (shelling out via `Command::new("sh").arg("-c")`) is the part that bakes in the wrong abstraction. It should be: + 1. 
**Moved behind a `ResourceSpec::ShellJob` reconciler** if we decide to keep shell as an explicit, audited escape hatch, **or** + 2. **Deleted** when the first two real reconcilers (Container, SystemdUnit) land. +- The `DesiredStateConfig` / `ActualState` types in `harmony_agent/src/agent/config.rs` are too narrow. They should be replaced by `AgentManifest` / `AgentStatus` as sketched above. `generation: u64` at the manifest level stays; per-resource status is added. +- The existing tests (`executes_command_and_reports_result`, `reports_failure_for_bad_command`) are testing the shell executor specifically; they will be deleted or repurposed when the resource model lands. + +## Open Questions (to resolve before implementing B) + +1. **What is the minimum viable resource type set for the IoT MVP?** Proposal: `Container`, `SystemdUnit`, `File`. Defer `NetworkConfig`, `ShellJob` until a concrete use case appears. +2. **Where does `AgentManifest` live in the crate graph?** It is consumed by both the control plane and the agent. Likely `harmony_agent_types` (new) or an existing shared types crate. +3. **How are images, files, and secrets referenced?** By content hash + asset store URL (ADR: `harmony_assets`)? By inline payload under a size cap? +4. **What is the reconcile cadence?** On NATS KV change + periodic drift check every N seconds? What is N on a Pi? +5. **How does `AgentStatus` interact with the heartbeat loop?** Is the status written on every reconcile, or aggregated into the heartbeat payload? The heartbeat cares about liveness; the status cares about workload health. They are probably separate KV keys, coupled by generation. +6. **How do we handle partial failures and retry?** Exponential backoff per resource? Global pause on repeated failures? Surface to the control plane via `conditions`? +7. **Can the agent refuse a manifest it does not understand?** (Forward compatibility: new `ResourceSpec` variant rolled out before the agent upgrade.) 
Proposal: fail loudly and report a typed `UnknownResource` status so the control plane can detect version skew. + +## Decision + +**None yet.** This ADR is explicitly a proposal to adopt **Alternative D**, pending team review. If approved, a follow-up ADR-023 will specify the concrete `AgentManifest` / `AgentStatus` schema and the initial reconciler set. diff --git a/harmony_agent/src/agent/config.rs b/harmony_agent/src/agent/config.rs index 86b731c..dd750ea 100644 --- a/harmony_agent/src/agent/config.rs +++ b/harmony_agent/src/agent/config.rs @@ -2,6 +2,7 @@ use std::time::Duration; use harmony_types::id::Id; use log::info; +use serde::{Deserialize, Serialize}; use super::heartbeat::HeartbeatFailure; use super::role::AgentRole; @@ -46,6 +47,9 @@ pub struct AgentConfig { #[derive(Debug, Clone)] pub enum DeploymentConfig { FailoverPostgreSQL(FailoverCNPGConfig), + /// Desired-state convergence mode: agent watches a NATS KV key for commands + /// to execute and reports results back. See ADR-021. + DesiredState, } #[derive(Debug, Clone)] @@ -53,6 +57,38 @@ pub struct FailoverCNPGConfig { pub cnpg_cluster_name: String, } +/// The desired state pushed by the central platform to an agent via NATS KV. +/// Key: `desired-state.` +#[derive(Debug, Clone, Serialize, Deserialize)] +pub struct DesiredStateConfig { + /// Shell command to execute (passed to `sh -c`) + pub command: String, + /// Monotonic generation counter. The agent only executes when this changes. + pub generation: u64, +} + +/// The actual state reported by the agent after executing a desired-state command. 
+/// Key: `actual-state.<agent_id>`
+#[derive(Debug, Clone, Serialize, Deserialize)]
+pub struct ActualState {
+    pub agent_id: String,
+    /// Mirrors the generation from the DesiredStateConfig that was executed
+    pub generation: u64,
+    pub status: ExecutionStatus,
+    pub stdout: String,
+    pub stderr: String,
+    pub exit_code: Option<i32>,
+    /// Timestamp (ms since epoch) when the command was executed, from agent clock (informational)
+    pub executed_at: u64,
+}
+
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
+pub enum ExecutionStatus {
+    Success,
+    Failed,
+    Running,
+}
+
 impl DeploymentConfig {
     /// The actual "work" of the heartbeat (e.g., write to NATS, check Postgres)
     pub async fn perform_heartbeat(&self) -> Result<(), HeartbeatFailure> {
@@ -62,6 +98,11 @@ impl DeploymentConfig {
                 // TODO: Implement actual PG check / NATS write here
                 Ok(())
             }
+            DeploymentConfig::DesiredState => {
+                // In desired-state mode, heartbeat just proves liveness.
+                // Workload execution happens in the separate desired-state watcher task.
+                Ok(())
+            }
         }
     }
diff --git a/harmony_agent/src/agent/mod.rs b/harmony_agent/src/agent/mod.rs
index 3291aea..5b0e7fc 100644
--- a/harmony_agent/src/agent/mod.rs
+++ b/harmony_agent/src/agent/mod.rs
@@ -18,7 +18,10 @@ pub mod heartbeat;
 mod role;
 
 // Re-exports for backwards compatibility
-pub use config::{AgentConfig, DeploymentConfig, FailoverCNPGConfig};
+pub use config::{
+    ActualState, AgentConfig, DeploymentConfig, DesiredStateConfig, ExecutionStatus,
+    FailoverCNPGConfig,
+};
 pub use heartbeat::{AgentHeartbeat, AgentInfo, ClusterStateData, HeartbeatFailure};
 pub use role::AgentRole;
diff --git a/harmony_agent/src/desired_state.rs b/harmony_agent/src/desired_state.rs
new file mode 100644
index 0000000..454ccd8
--- /dev/null
+++ b/harmony_agent/src/desired_state.rs
@@ -0,0 +1,246 @@
+//! Desired-state convergence watcher (ADR-021).
+//!
+//! Polls a NATS KV key `desired-state.<agent_id>` for workload definitions.
+//! When the generation changes, executes the command and writes the result
+//! to `actual-state.<agent_id>`.
+
+use std::sync::Arc;
+use std::time::{Duration, SystemTime, UNIX_EPOCH};
+
+use log::{debug, error, info, warn};
+use tokio::process::Command;
+
+use crate::agent::{ActualState, DesiredStateConfig, ExecutionStatus};
+use crate::store::{KvStore, KvStoreError};
+
+/// Runs the desired-state convergence loop for a single agent.
+///
+/// This task polls the KV store for changes to the desired state and executes
+/// commands when the generation counter advances. It is designed to run as a
+/// separate tokio task alongside the heartbeat loop.
+pub async fn run_desired_state_watcher<S>(
+    agent_id: String,
+    kv: Arc<S>,
+    poll_interval: Duration,
+) where
+    S: KvStore + Send + Sync + 'static,
+{
+    let desired_key = format!("desired-state.{agent_id}");
+    let actual_key = format!("actual-state.{agent_id}");
+    let mut last_generation: Option<u64> = None;
+
+    info!("Desired-state watcher started for agent {agent_id}, polling key: {desired_key}");
+
+    loop {
+        match fetch_desired_state(&kv, &desired_key).await {
+            Ok(Some(desired)) => {
+                if last_generation == Some(desired.generation) {
+                    debug!(
+                        "Desired state generation {} already executed, skipping",
+                        desired.generation
+                    );
+                } else {
+                    info!(
+                        "New desired state detected: generation={}, command='{}'",
+                        desired.generation, desired.command
+                    );
+
+                    let result = execute_command(&agent_id, &desired).await;
+
+                    match store_actual_state(&kv, &actual_key, &result).await {
+                        Ok(_) => {
+                            info!(
+                                "Actual state stored: generation={}, status={:?}, exit_code={:?}",
+                                result.generation, result.status, result.exit_code
+                            );
+                        }
+                        Err(e) => {
+                            error!("Failed to store actual state: {e}");
+                        }
+                    }
+
+                    last_generation = Some(desired.generation);
+                }
+            }
+            Ok(None) => {
+                debug!("No desired state found for {agent_id}, waiting...");
+            }
+            Err(e) => {
+                warn!("Error fetching desired state for {agent_id}: {e}");
+            }
+        }
+
+        tokio::time::sleep(poll_interval).await;
+    }
+}
+
+async fn fetch_desired_state<S>(
+    kv: &Arc<S>,
+    key: &str,
+) -> Result<Option<DesiredStateConfig>, KvStoreError>
+where
+    S: KvStore + Send + Sync,
+{
+    match kv.get(key).await {
+        Ok(result) => {
+            if let Some(value) = result.value {
+                let config: DesiredStateConfig = serde_json::from_value(value).map_err(|e| {
+                    KvStoreError::DeserializationFailed {
+                        deserialization_error: e.to_string(),
+                        value: String::new(),
+                    }
+                })?;
+                Ok(Some(config))
+            } else {
+                Ok(None)
+            }
+        }
+        Err(KvStoreError::KeyNotAvailable(_)) => Ok(None),
+        Err(e) => Err(e),
+    }
+}
+
+async fn execute_command(agent_id: &str, desired: &DesiredStateConfig) -> ActualState {
+    let now = SystemTime::now()
+        .duration_since(UNIX_EPOCH)
+        .expect("Time went backwards")
+        .as_millis() as u64;
+
+    info!("Executing command for generation {}: sh -c '{}'", desired.generation, desired.command);
+
+    let output = Command::new("sh")
+        .arg("-c")
+        .arg(&desired.command)
+        .output()
+        .await;
+
+    match output {
+        Ok(output) => {
+            let exit_code = output.status.code();
+            let status = if output.status.success() {
+                ExecutionStatus::Success
+            } else {
+                ExecutionStatus::Failed
+            };
+
+            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
+            let stderr = String::from_utf8_lossy(&output.stderr).to_string();
+
+            if !stdout.is_empty() {
+                debug!("stdout: {stdout}");
+            }
+            if !stderr.is_empty() {
+                warn!("stderr: {stderr}");
+            }
+
+            ActualState {
+                agent_id: agent_id.to_string(),
+                generation: desired.generation,
+                status,
+                stdout,
+                stderr,
+                exit_code,
+                executed_at: now,
+            }
+        }
+        Err(e) => {
+            error!("Failed to spawn command: {e}");
+            ActualState {
+                agent_id: agent_id.to_string(),
+                generation: desired.generation,
+                status: ExecutionStatus::Failed,
+                stdout: String::new(),
+                stderr: format!("Failed to spawn command: {e}"),
+                exit_code: None,
+                executed_at: now,
+            }
+        }
+    }
+}
+
+async fn store_actual_state<S>(
+    kv: &Arc<S>,
+    key: &str,
+    state: &ActualState,
+) -> Result<(), KvStoreError>
+where + S: KvStore + Send + Sync, +{ + let value = serde_json::to_value(state).map_err(|e| KvStoreError::DeserializationFailed { + deserialization_error: e.to_string(), + value: format!("{state:?}"), + })?; + + // Use get to find current sequence, then set_strict for safe writes. + // If the key doesn't exist yet, use sequence 0 for the first write. + let expected_seq = match kv.get(key).await { + Ok(result) => result.metadata.sequence, + Err(KvStoreError::KeyNotAvailable(_)) => 0, + Err(e) => return Err(e), + }; + + kv.set_strict(key, value, expected_seq).await?; + Ok(()) +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::store::InMemoryKvStore; + use serde_json::json; + + #[tokio::test] + async fn executes_command_and_reports_result() { + let kv = Arc::new(InMemoryKvStore::new()); + let agent_id = "test-agent"; + let desired_key = format!("desired-state.{agent_id}"); + let actual_key = format!("actual-state.{agent_id}"); + + // Write a desired state + let desired = DesiredStateConfig { + command: "echo hello".to_string(), + generation: 1, + }; + let value = serde_json::to_value(&desired).unwrap(); + kv.set_strict(&desired_key, value, 0).await.unwrap(); + + // Fetch and execute + let fetched = fetch_desired_state(&kv, &desired_key).await.unwrap().unwrap(); + assert_eq!(fetched.generation, 1); + assert_eq!(fetched.command, "echo hello"); + + let result = execute_command(agent_id, &fetched).await; + assert_eq!(result.status, ExecutionStatus::Success); + assert_eq!(result.stdout.trim(), "hello"); + assert_eq!(result.generation, 1); + assert_eq!(result.exit_code, Some(0)); + + // Store actual state + store_actual_state(&kv, &actual_key, &result).await.unwrap(); + + // Verify stored value + let stored = kv.get(&actual_key).await.unwrap(); + let actual: ActualState = serde_json::from_value(stored.value.unwrap()).unwrap(); + assert_eq!(actual.generation, 1); + assert_eq!(actual.status, ExecutionStatus::Success); + } + + #[tokio::test] + async fn 
reports_failure_for_bad_command() { + let desired = DesiredStateConfig { + command: "exit 42".to_string(), + generation: 2, + }; + + let result = execute_command("test-agent", &desired).await; + assert_eq!(result.status, ExecutionStatus::Failed); + assert_eq!(result.exit_code, Some(42)); + assert_eq!(result.generation, 2); + } + + #[tokio::test] + async fn returns_none_when_no_desired_state() { + let kv = Arc::new(InMemoryKvStore::new()); + let result = fetch_desired_state(&kv, "desired-state.nonexistent").await.unwrap(); + assert!(result.is_none()); + } +} diff --git a/harmony_agent/src/main.rs b/harmony_agent/src/main.rs index 2218758..583d048 100644 --- a/harmony_agent/src/main.rs +++ b/harmony_agent/src/main.rs @@ -9,6 +9,7 @@ use crate::{ // mod agent_loop; mod agent; +pub mod desired_state; pub mod store; mod workflow;