Compare commits: feat/remov... → feat/agent (2 commits: 51b39505bb, 9cd1713788)
docs/adr/021-agent-desired-state-convergence.md (new file, 117 lines)
@@ -0,0 +1,117 @@
# ADR-021: Agent Desired-State Convergence — Problem Statement and Initial Proposal

**Status:** Proposed (under review — see ADR-022 for alternatives)

**Date:** 2026-04-09

> This document was originally drafted as an "Accepted" ADR describing a shell-command executor. On review, the team was not convinced that the shell-executor shape is the right one. It has been re-framed as a **problem statement + one candidate proposal (Alternative A)**. Alternative designs — including a mini-kubelet model and an embedded-Score model — are explored in [ADR-022](./022-agent-desired-state-alternatives.md). A final decision has **not** been made.

## Context

The Harmony Agent (ADR-016) currently handles a single use case: PostgreSQL HA failover via `DeploymentConfig::FailoverPostgreSQL`. For the IoT fleet management platform (Raspberry Pi clusters deployed in homes, offices, and community spaces), we need the agent to become a general-purpose desired-state convergence engine.

Concretely, the central Harmony control plane must be able to:

1. Express the *desired state* of an individual Pi (or a class of Pis) in a typed, serializable form.
2. Ship that desired state to the device over the existing NATS JetStream mesh (ADR-017-1).
3. Have the on-device agent reconcile toward it — idempotently, observably, and without manual intervention.
4. Read back an authoritative, typed *actual state* so the control plane can report convergence, surface errors, and drive a fleet dashboard.

The existing heartbeat / failover machinery (ADR-017-3) remains valuable — it proves the agent can maintain persistent NATS connections, do CAS writes against KV, and react to state changes. Whatever desired-state mechanism we add **extends** that foundation rather than replacing it.

### Design forces

- **Coherence with the rest of Harmony.** Harmony's entire identity is Score-Topology-Interpret with compile-time safety. A desired-state mechanism that reintroduces stringly-typed, runtime-validated blobs on the edge would be a regression from our own design rules (see `CLAUDE.md`: "Capabilities are industry concepts, not tools", "Scores encapsulate operational complexity", "Scores must be idempotent").
- **The "mini-kubelet" framing.** The team is converging on a mental model where the agent is a stripped-down kubelet: it owns a set of local reconcilers, maintains a PLEG-like state machine per managed resource, and converges toward a declarative manifest. ADR-017-3 is already explicitly Kubernetes-inspired for staleness detection. This framing should inform the desired-state design, not fight it.
- **Speed to IoT MVP.** We need something shippable soon enough that real Pi fleets can be demoed. Over-engineering the v1 risks never shipping; under-engineering it risks a rewrite once the wrong abstraction is entrenched on hundreds of devices in the field.
- **Security.** Whatever lands on the device is, by construction, running with the agent's privileges. A mechanism that reduces to "run this shell string as root" is a very wide blast radius.
- **Serializability.** Today, Harmony Scores are *not* uniformly serializable across the wire — many hold trait objects, closures, or references to live topologies. Any design that assumes "just send a Score" needs to confront this.

## Initial Proposal (Alternative A — Shell Command Executor)

This is the first-pass design, implemented as a happy-path scaffold on this branch. **It is presented here for critique, not as a settled decision.**

### Desired-State Model

Each agent watches a NATS KV key `desired-state.<agent-id>` for its workload definition. When the value changes, the agent executes the workload and reports the result to `actual-state.<agent-id>`. This is a pull-based convergence loop: the control plane writes intent, the agent converges, the control plane reads the result.

A `DesiredState` is a serializable description of what should be running on the device. For this first iteration, it is a shell command plus a monotonic generation counter.

```rust
enum DeploymentConfig {
    FailoverPostgreSQL(FailoverCNPGConfig), // existing
    DesiredState(DesiredStateConfig),       // new
}

struct DesiredStateConfig {
    command: String,
    generation: u64,
}
```
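Because the generation counter is monotonic, a re-delivery of the same value should not re-run the command. A minimal sketch of that guard — an assumption about how the agent might gate execution; `GenerationGate` and `should_execute` are hypothetical names, not code from this branch:

```rust
/// Tracks the last desired-state generation this agent acted on.
struct GenerationGate {
    last_seen: u64,
}

impl GenerationGate {
    /// Returns true only when the incoming generation advances past the
    /// last one executed, so KV re-deliveries and watcher restarts do
    /// not re-run the same command.
    fn should_execute(&mut self, incoming: u64) -> bool {
        if incoming > self.last_seen {
            self.last_seen = incoming;
            true
        } else {
            false
        }
    }
}
```

Stale generations (a lagging replica replaying an old value) are rejected by the same comparison.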
### Config Flow

```
Central Platform             NATS JetStream                   Agent (Pi)
================             ==============                   ==========
1. Write desired state ----> KV: desired-state.<agent-id>
                                                              2. Watch detects change
                                                              3. Execute workload
                             KV: actual-state.<agent-id> <--- 4. Write result
5. Read actual state <------ KV: actual-state.<agent-id>
```

The agent's heartbeat loop continues independently. The desired-state watcher runs as a separate async task, sharing the same NATS connection. This separation means a slow command execution does not block heartbeats.
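The non-blocking claim can be illustrated with plain threads as a stand-in for the real async tasks (names and timings here are illustrative only):

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Heartbeats keep flowing on their own thread while a slow "command
/// execution" runs on the watcher thread; neither blocks the other.
/// Returns the number of heartbeats delivered.
fn run_demo() -> usize {
    let (tx, rx) = mpsc::channel();
    let hb_tx = tx.clone();

    let heartbeat = thread::spawn(move || {
        for _ in 0..5 {
            hb_tx.send("heartbeat").unwrap();
            thread::sleep(Duration::from_millis(10));
        }
    });
    let watcher = thread::spawn(move || {
        thread::sleep(Duration::from_millis(40)); // simulate slow command
        tx.send("converged").unwrap();
    });

    heartbeat.join().unwrap();
    watcher.join().unwrap();
    // All senders are dropped once the threads finish, so the iterator ends.
    rx.iter().filter(|m| *m == "heartbeat").count()
}
```

All five heartbeats arrive even though the "command" outlives several heartbeat intervals.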

### State Reporting

```rust
struct ActualState {
    agent_id: Id,
    generation: u64,         // mirrors the desired-state generation
    status: ExecutionStatus, // Success, Failed, Running
    stdout: String,
    stderr: String,
    exit_code: Option<i32>,
    executed_at: u64,
}
```

The control plane reads this key to determine convergence. If `actual_state.generation == desired_state.generation` and `status == Success`, the device has converged.
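The convergence predicate reduces to a one-line check. A sketch, assuming the types above (with `ExecutionStatus` simplified and the NATS plumbing omitted):

```rust
#[derive(Debug, PartialEq)]
enum ExecutionStatus {
    Success,
    Failed,
    Running,
}

struct DesiredStateConfig {
    command: String,
    generation: u64,
}

struct ActualState {
    generation: u64,
    status: ExecutionStatus,
}

/// A device has converged when its reported generation matches the
/// desired generation and the last execution succeeded.
fn has_converged(desired: &DesiredStateConfig, actual: &ActualState) -> bool {
    actual.generation == desired.generation && actual.status == ExecutionStatus::Success
}
```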

### Why this shape was chosen first

- Dirt cheap to implement (≈200 lines, done on this branch).
- Works for literally any task a human would type into a Pi shell.
- Reuses the existing NATS KV infrastructure and CAS write idiom already proven by the heartbeat loop.
- Provides an end-to-end demo path in under a day.

## Open Questions and Concerns

The following concerns block promoting this to an "Accepted" decision:

1. **Wrong abstraction level.** `sh -c "<string>"` is the *opposite* of what Harmony stands for. Harmony exists because IaC tools drown in stringly-typed, runtime-validated config. Shipping arbitrary shell to the edge recreates that problem inside our own agent — at the worst possible place (the device).
2. **No idempotency.** `systemctl start foo` and `apt install foo` are not idempotent by themselves. Every Score in Harmony is required to be idempotent. A shell executor pushes that burden onto whoever writes the commands, where we cannot check it.
3. **No resource model.** There is no notion of "this manifest owns this systemd unit". When desired state changes, we cannot compute a diff, we cannot garbage-collect the old resource, and we cannot surface "drift" meaningfully. We know generation N was "run"; we do not know what it left behind.
4. **No typed status.** `stdout`/`stderr`/`exit_code` is not enough to drive a fleet dashboard. We want typed `Status { container: Running { since, restarts }, unit: Active, file: PresentAt(sha256) }`.
5. **No lifecycle.** Shell commands are fire-and-forget. A kubelet-shaped agent needs to know whether a resource is *still* healthy after it was created — liveness and readiness are first-class concerns, not a post-hoc `exit_code` check.
6. **Security.** The ADR hand-waves "NATS ACLs + future signing". In practice, v1 lets anyone with write access to the KV bucket execute anything as the agent user. Even with NATS ACLs, the *shape* of the API invites abuse; a typed manifest with an allowlist of resource types has a much narrower attack surface by construction.
7. **Generational model is too coarse.** A single `generation: u64` per agent means we can only describe one monolithic "job". Real fleet state is a *set* of resources (this container, this unit, this file). We need per-resource generations, or a manifest-level generation with a sub-resource status map.
8. **Incoherent with ADR-017-3's kubelet framing.** That ADR deliberately borrowed K8s vocabulary (staleness, fencing, leader promotion) because kubelet-like semantics are the right ones for resilient edge workloads. Shell-exec abandons that lineage at the first opportunity.
9. **Coherence with the Score-Topology-Interpret pattern.** Today's proposal introduces a parallel concept ("DesiredStateConfig") that has nothing to do with Score or Topology. If a Pi is just "a topology with a small capability set" (systemd, podman, files, network), then the right thing to ship is a Score, not a shell string.

## Status of the Implementation on this Branch

The happy-path code in `harmony_agent/src/desired_state.rs` (≈250 lines, fully tested) implements Alternative A. It is **scaffolding**, not a committed design:

- It is useful as a vehicle to prove out the NATS KV watch + typed `ActualState` CAS write pattern, both of which are reusable regardless of which alternative we pick.
- It should **not** be wired into user-facing tooling until the architectural decision in ADR-022 is made.
- If we adopt Alternative B (mini-kubelet) or C (embedded Scores), the shell executor either becomes one *variant* of a typed `Resource` enum (a `ShellJob` resource, clearly labeled as an escape hatch) or is deleted outright.

## Next Steps

1. Review ADR-022 (alternatives + recommendation).
2. Pick a target design.
3. Either:
   - Rework `desired_state.rs` to match the chosen target, **or**
   - Keep it behind a feature flag as a demo fallback while the real design is built.
4. Re-file this ADR as "Superseded by ADR-022" or update it in place with the accepted design.
docs/adr/022-agent-desired-state-alternatives.md (new file, 218 lines)
@@ -0,0 +1,218 @@
# ADR-022: Agent Desired-State — Alternatives and Recommendation

**Status:** Proposed

**Date:** 2026-04-09

**Supersedes (candidate):** ADR-021 shell-executor proposal

## Context

ADR-021 drafted a first-pass "desired-state convergence" mechanism for the Harmony Agent (ADR-016) in the form of a shell-command executor. On review, that shape raised serious concerns (see ADR-021 §"Open Questions and Concerns"): it is incoherent with Harmony's Score-Topology-Interpret pattern, it is not idempotent, it has no resource model, no typed status, no lifecycle, and it weakens the agent's security posture.

Separately, the team has been converging on a **"mini-kubelet" framing** for the IoT agent:

- The agent owns a small, fixed set of *reconcilers*, one per resource type it can manage (systemd unit, container, file, network interface, overlay config...).
- The desired state is a *typed manifest* — a bag of resources with identities, generations, and typed status.
- The agent runs reconcile loops similar to kubelet's Pod Lifecycle Event Generator (PLEG): for each managed resource, observe actual, compare to desired, apply the minimum delta, update typed status.
- Failure and drift are first-class. "I tried, it failed, here is why" is a valid steady state.

ADR-017-3 already borrows Kubernetes vocabulary (staleness, fencing, promotion) on purpose. Doubling down on the kubelet metaphor at the desired-state layer is the natural continuation, not a tangent.

This ADR enumerates the candidate designs, argues their tradeoffs honestly, and recommends a path.
## Alternatives

### Alternative A — Shell Command Executor (ADR-021 as-is)

**Shape:** `DesiredState { command: String, generation: u64 }`, agent does `sh -c $command`, pipes stdout/stderr/exit into `ActualState`.

**Pros:**

- Trivial to implement. ~200 LOC, already on this branch.
- Works for any task that can be expressed as a shell pipeline — maximum flexibility at v1.
- Zero new abstractions: reuses existing NATS KV watch + CAS patterns.
- End-to-end demo-able in an afternoon.

**Cons:**

- **Wrong abstraction level.** Harmony's entire thesis is "no more stringly-typed YAML/shell mud pits". This design ships that mud pit *to the edge*.
- **Not idempotent.** The burden of idempotency falls on whoever writes the command string. `systemctl start foo` run twice is fine; `apt install foo && echo "done" >> /etc/state` run twice is broken. We cannot enforce correctness.
- **No resource model.** No concept of "this manifest owns X". No diffing, no GC, no drift detection, no "what does this agent currently run?".
- **No typed status.** `stdout`/`stderr`/`exit_code` does not tell a fleet dashboard "container nginx is running, restarted 3 times, last healthy 2s ago". It tells it "this bash ran and exited 0 once, three minutes ago".
- **No lifecycle.** Fire-and-forget; post-exit the agent has no notion of whether the resource is still healthy.
- **Security.** Even with NATS ACLs, the API's *shape* invites abuse. Any bug in the control plane that lets a user influence a desired-state write equals RCE on every Pi.
- **Incoherent with ADR-017-3 and Score-Topology-Interpret.** Introduces a parallel concept that has nothing to do with the rest of Harmony.

**Verdict:** Acceptable only as a *named escape hatch* inside a richer design (a `ShellJob` resource variant, explicitly labeled as such and audited). Not acceptable as the whole design.

---
### Alternative B — Mini-Kubelet with Typed Resource Manifests

**Shape:** The agent owns a fixed set of `Resource` variants and one reconciler per variant.

```rust
/// The unit of desired state shipped to an agent.
/// Serialized to JSON, pushed via NATS KV to `desired-state.<agent-id>`.
struct AgentManifest {
    generation: u64, // monotonic, control-plane assigned
    resources: Vec<ManagedResource>,
}

struct ManagedResource {
    /// Stable, manifest-unique identity. Used for diffing across generations.
    id: ResourceId,
    spec: ResourceSpec,
}

enum ResourceSpec {
    SystemdUnit(SystemdUnitSpec),     // ensure unit exists, enabled, active
    Container(ContainerSpec),         // podman/docker run with image, env, volumes
    File(FileSpec),                   // path, mode, owner, content (hash or inline)
    NetworkConfig(NetworkConfigSpec), // interface, addresses, routes
    ShellJob(ShellJobSpec),           // explicit escape hatch, audited separately
    // ...extend carefully
}

/// What the agent reports back.
struct AgentStatus {
    manifest_generation: u64, // which desired-state gen this reflects
    observed_generation: u64, // highest gen the agent has *processed*
    resources: HashMap<ResourceId, ResourceStatus>,
    conditions: Vec<AgentCondition>, // Ready, Degraded, Reconciling, ...
}

enum ResourceStatus {
    Pending,
    Reconciling { since: Timestamp },
    Ready { since: Timestamp, details: ResourceReadyDetails },
    Failed { since: Timestamp, error: String, retry_after: Option<Timestamp> },
}
```

Each reconciler implements a small trait:

```rust
trait Reconciler {
    type Spec;
    type Status;

    async fn observe(&self, id: &ResourceId) -> Result<Self::Status>;
    async fn reconcile(&self, id: &ResourceId, spec: &Self::Spec) -> Result<Self::Status>;
    async fn delete(&self, id: &ResourceId) -> Result<()>;
}
```
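To make the idempotency contract concrete, here is a minimal synchronous sketch of a file reconciler. This is a simplification: the real trait above is async and keyed by `ResourceId`, and the `FileSpec` fields shown are assumptions, not the branch's actual types.

```rust
use std::fs;

struct FileSpec {
    path: String,
    content: String,
}

#[derive(Debug, PartialEq)]
enum ResourceStatus {
    Ready,
    Reconciling,
    Failed(String),
}

struct FileReconciler;

impl FileReconciler {
    /// Observe: is the file present with the desired content?
    fn observe(&self, spec: &FileSpec) -> ResourceStatus {
        match fs::read_to_string(&spec.path) {
            Ok(actual) if actual == spec.content => ResourceStatus::Ready,
            Ok(_) | Err(_) => ResourceStatus::Reconciling,
        }
    }

    /// Reconcile: apply the minimum delta. Running this twice in a row
    /// is a no-op the second time — idempotent by construction.
    fn reconcile(&self, spec: &FileSpec) -> ResourceStatus {
        if self.observe(spec) == ResourceStatus::Ready {
            return ResourceStatus::Ready; // already converged, do nothing
        }
        match fs::write(&spec.path, &spec.content) {
            Ok(()) => ResourceStatus::Ready,
            Err(e) => ResourceStatus::Failed(e.to_string()),
        }
    }
}
```

The observe-then-apply shape is what makes the contract checkable in code review: a reconciler that mutates without observing first is visibly wrong.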

The agent loop becomes:

1. Watch `desired-state.<agent-id>` for the latest `AgentManifest`.
2. On change, compute diff vs. observed set: additions, updates, deletions.
3. Dispatch each resource to its reconciler. Reconcilers are idempotent by contract.
4. Aggregate per-resource status into `AgentStatus`, write to `actual-state.<agent-id>` via CAS.
5. Re-run periodically to detect drift even when desired state has not changed (PLEG-equivalent).
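Step 2 reduces to a set comparison on resource identities. A sketch with plain maps — `ResourceSpec` is collapsed to a placeholder here, and the function name is illustrative:

```rust
use std::collections::HashMap;

type ResourceId = String;

/// Stand-in for the real ResourceSpec enum.
#[derive(Clone, PartialEq, Debug)]
struct ResourceSpec(String);

/// Classify resources into additions, updates, and deletions by
/// comparing the desired manifest against the currently observed set.
fn diff_manifest(
    desired: &HashMap<ResourceId, ResourceSpec>,
    observed: &HashMap<ResourceId, ResourceSpec>,
) -> (Vec<ResourceId>, Vec<ResourceId>, Vec<ResourceId>) {
    let mut added = vec![];
    let mut updated = vec![];
    let mut deleted = vec![];
    for (id, spec) in desired {
        match observed.get(id) {
            None => added.push(id.clone()),
            Some(old) if old != spec => updated.push(id.clone()),
            _ => {} // unchanged: nothing to do
        }
    }
    for id in observed.keys() {
        if !desired.contains_key(id) {
            deleted.push(id.clone());
        }
    }
    (added, updated, deleted)
}
```

The `deleted` bucket is what enables garbage collection — exactly the capability Alternative A lacks.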

**Pros:**

- **Declarative and idempotent by construction.** Reconcilers are required to be idempotent; the contract is enforced in Rust traits, not in docs.
- **Typed status.** Dashboards, alerts, and the control plane get structured data.
- **Drift detection.** Periodic re-observation catches "someone SSH'd in and stopped the service".
- **Lifecycle.** Each resource has a clear state machine; health is a first-class concept.
- **Coherent with ADR-017-3.** The kubelet framing becomes literal, not metaphorical.
- **Narrow attack surface.** The agent only knows how to do a handful of well-audited things. Adding a new capability is an explicit code change, not a new shell string.
- **Composable with Harmony's existing philosophy.** `ManagedResource` is to the agent what a Score is to a Topology, at a smaller scale.

**Cons:**

- More upfront design. Each reconciler needs to be written and tested.
- Requires us to *commit* to a resource type set and its status schema. Adding a new kind is a versioned change to the wire format.
- Duplicates, at the edge, some of the vocabulary already present in Harmony's Score layer (e.g., `FileDeployment`, container deployments). Risk of two parallel abstractions evolving in tension.
- Harder to demo in a single afternoon.

**Verdict:** Strong candidate. Matches the team's mini-kubelet intuition directly.

---

### Alternative C — Embedded Score Interpreter on the Agent

**Shape:** The desired state *is* a Harmony Score (or a set of Scores), serialized and pushed via NATS. The agent hosts a local `PiTopology` that exposes a small, carefully chosen set of capabilities (`SystemdHost`, `ContainerRuntime`, `FileSystemHost`, `NetworkConfigurator`, ...). The agent runs the Score's `interpret` against that local topology.

```rust
// On the control plane:
let score = SystemdServiceScore { ... };
let wire: SerializedScore = score.to_wire()?;
nats.put(format!("desired-state.{agent_id}"), wire).await?;

// On the agent:
let score = SerializedScore::decode(payload)?;
let topology = PiTopology::new();
let outcome = score.interpret(&inventory, &topology).await?;
// Outcome is already a typed Harmony result (SUCCESS/NOOP/FAILURE/RUNNING/...).
```

**Pros:**

- **Zero new abstractions.** The agent becomes "a Harmony executor that happens to run on a Pi". Everything we already know how to do in Harmony works, for free.
- **Maximum coherence.** There is exactly one way to describe desired state in the whole system: a Score. The type system enforces that a score requesting `K8sclient` cannot be shipped to a Pi topology that does not offer it — at compile time on the control plane, at deserialization time on the agent.
- **Composability.** Higher-order topologies (ADR-015) work unchanged: `FailoverTopology<PiTopology>` gets you HA at the edge for free.
- **Single mental model for the whole team.** "Write a Score" is already the Harmony primitive; no one needs to learn a second one.

**Cons:**

- **Serializability.** This is the hard one. Harmony Scores today hold trait objects, references to live topology state, and embedded closures in places. Making them uniformly serde-serializable is a non-trivial refactor that touches dozens of modules. We would be gating the IoT MVP on a cross-cutting refactor.
- **Agent binary size.** If "the agent can run any Score", it links every module. On a Pi Zero 2 W, that matters. We can mitigate with feature flags, but then we are back to "which scores does *this* agent support?" — i.e., we have reinvented resource-type registration, just spelled differently.
- **Capability scoping is subtle.** We have to be extremely careful about which capabilities `PiTopology` exposes. "A Pi can run containers" is true; "a Pi can run arbitrary k8s clusters" is not. Getting that boundary wrong opens the same attack surface as Alternative A, just hidden behind a Score.
- **Control-plane UX.** The central platform now needs to instantiate Scores for specific Pis, handle their inventories, and ship them. That is heavier than "push a JSON blob".

**Verdict:** The principled end state, almost certainly where we want to be in 18 months. Not shippable for the IoT MVP.

---

### Alternative D — Hybrid: Typed Manifests Now, Scores Later

**Shape:** Ship Alternative B (typed `AgentManifest` with a fixed set of reconcilers). Keep the Score ambition (Alternative C) as an explicit roadmap item. When Scores become uniformly wire-serializable and `PiTopology` is mature, migrate by adding a `ResourceSpec::Score(SerializedScore)` variant. Eventually that variant may subsume the others.
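The migration seam could look like the following sketch. All spec types here are placeholders, and the `kind_name` helper is purely illustrative — the point is that the Score variant slots into the existing enum without disturbing the others:

```rust
// Placeholder specs for illustration only.
struct ContainerSpec;
struct SystemdUnitSpec;
struct FileSpec;
struct SerializedScore(Vec<u8>);

enum ResourceSpec {
    Container(ContainerSpec),
    SystemdUnit(SystemdUnitSpec),
    File(FileSpec),
    // Added later, once Scores are wire-serializable; may eventually
    // subsume the variants above.
    Score(SerializedScore),
}

/// Dispatching on the variant stays exhaustive: adding Score is a
/// compile-checked change at every match site, not a silent extension.
fn kind_name(spec: &ResourceSpec) -> &'static str {
    match spec {
        ResourceSpec::Container(_) => "container",
        ResourceSpec::SystemdUnit(_) => "systemd-unit",
        ResourceSpec::File(_) => "file",
        ResourceSpec::Score(_) => "score",
    }
}
```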

**Pros:**

- **Shippable soon.** Alternative B is the implementable core; we can have a fleet demo in weeks, not months.
- **On a path to the ideal.** We do not dead-end. The `ResourceSpec` enum becomes the migration seam.
- **De-risks the Score serialization refactor.** We learn what resource types we *actually* need on the edge before we refactor the Score layer.
- **Lets us delete Alternative A cleanly.** The shell executor either disappears or survives as a narrow, explicitly-audited `ResourceSpec::ShellJob` variant that documents itself as an escape hatch.

**Cons:**

- Temporarily maintains two vocabularies (`ResourceSpec` at the edge, `Score` in the core). There is a risk they drift before they reconverge.
- Requires team discipline to actually do the C migration and not leave B as the permanent design.

**Verdict:** Recommended.

---

## Recommendation

**Adopt Alternative D (Hybrid: typed manifests now, Scores later).**

Reasoning:

1. **Speed to IoT MVP** is real. Alternative C requires a 3-6 month refactor of the Score layer before we could deploy anything; Alternative B can ship within the current iteration.
2. **Long-term coherence with Harmony's design philosophy** is preserved because D has an explicit migration seam to C. We do not paint ourselves into a corner.
3. **The mini-kubelet framing is directly satisfied by B.** Typed resources, reconciler loops, observed-generation pattern, PLEG-style drift detection. This is exactly what the team has been describing.
4. **Capability-trait discipline carries over cleanly.** `Reconciler` is the agent-side analog of a capability trait (`DnsServer`, `K8sclient`, etc.). The rule "capabilities are industry concepts, not tools" applies to `ResourceSpec` too: we name it `Container`, not `Podman`; `SystemdUnit`, not `Systemctl`.
5. **The shell executor is not wasted work.** It proved the NATS KV watch + typed CAS write pattern that Alternative B will also need. It becomes either `ResourceSpec::ShellJob` (audited escape hatch) or gets deleted.
6. **Security posture improves immediately.** A fixed resource-type allowlist is dramatically tighter than "run any shell", even before we add signing or sandboxing.
7. **The IoT product use case actually is "deploy simple workloads to Pi fleets".** Containers, systemd services, config files, network config. That is a short list, and it maps to four or five resource types. We do not need the full expressive power of a Score layer to hit the product milestone.

## Specific Findings on the Current Implementation

`harmony_agent/src/desired_state.rs` (≈250 lines, implemented on this branch):

- **Keep as scaffolding**, do not wire into user tooling.
- The NATS KV watch loop, the `ActualState` CAS write, and the generation-tracking skeleton are all reusable by Alternative B. They are the only parts worth keeping.
- The `execute_command` function (shelling out via `Command::new("sh").arg("-c")`) is the part that bakes in the wrong abstraction. It should be:
  1. **Moved behind a `ResourceSpec::ShellJob` reconciler** if we decide to keep shell as an explicit, audited escape hatch, **or**
  2. **Deleted** when the first two real reconcilers (Container, SystemdUnit) land.
- The `DesiredStateConfig` / `ActualState` types in `harmony_agent/src/agent/config.rs` are too narrow. They should be replaced by `AgentManifest` / `AgentStatus` as sketched above. `generation: u64` at the manifest level stays; per-resource status is added.
- The existing tests (`executes_command_and_reports_result`, `reports_failure_for_bad_command`) test the shell executor specifically; they will be deleted or repurposed when the resource model lands.

## Open Questions (to resolve before implementing B)

1. **What is the minimum viable resource type set for the IoT MVP?** Proposal: `Container`, `SystemdUnit`, `File`. Defer `NetworkConfig` and `ShellJob` until a concrete use case appears.
2. **Where does `AgentManifest` live in the crate graph?** It is consumed by both the control plane and the agent. Likely `harmony_agent_types` (new) or an existing shared types crate.
3. **How are images, files, and secrets referenced?** By content hash + asset store URL (ADR: `harmony_assets`)? By inline payload under a size cap?
4. **What is the reconcile cadence?** On NATS KV change + periodic drift check every N seconds? What is N on a Pi?
5. **How does `AgentStatus` interact with the heartbeat loop?** Is the status written on every reconcile, or aggregated into the heartbeat payload? The heartbeat cares about liveness; the status cares about workload health. They are probably separate KV keys, coupled by generation.
6. **How do we handle partial failures and retry?** Exponential backoff per resource? Global pause on repeated failures? Surface to the control plane via `conditions`?
7. **Can the agent refuse a manifest it does not understand?** (Forward compatibility: a new `ResourceSpec` variant rolled out before the agent upgrade.) Proposal: fail loudly and report a typed `UnknownResource` status so the control plane can detect version skew.
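The fail-loudly proposal in question 7 might reduce to a fallback at decode time. Sketched here with a plain tag match — the real wire format would presumably go through serde, and every name below is an assumption:

```rust
#[derive(Debug, PartialEq)]
enum ResourceKind {
    Container,
    SystemdUnit,
    File,
    /// Version skew: the control plane sent a variant this agent
    /// build does not know. Reported, never silently dropped.
    Unknown(String),
}

/// Map a wire-format type tag to a known resource kind, falling back
/// to Unknown so the control plane can detect version skew instead of
/// the agent crashing or ignoring the resource.
fn parse_kind(tag: &str) -> ResourceKind {
    match tag {
        "Container" => ResourceKind::Container,
        "SystemdUnit" => ResourceKind::SystemdUnit,
        "File" => ResourceKind::File,
        other => ResourceKind::Unknown(other.to_string()),
    }
}
```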

## Decision

**None yet.** This ADR is explicitly a proposal to adopt **Alternative D**, pending team review. If approved, a follow-up ADR-023 will specify the concrete `AgentManifest` / `AgentStatus` schema and the initial reconciler set.
@@ -600,6 +600,7 @@ fn build_all_scores() -> Result<Vec<Box<dyn Score<OPNSenseFirewall>>>, Box<dyn s
             },
         ],
         private_services: vec![],
+        wan_firewall_ports: vec![],
     };

     let dhcp_score = DhcpScore::new(
@@ -69,5 +69,6 @@ fn build_large_score() -> LoadBalancerScore {
             lb_service.clone(),
             lb_service.clone(),
         ],
+        wan_firewall_ports: vec![],
     }
 }
@@ -15,7 +15,7 @@ use async_trait::async_trait;
 use harmony_types::firewall::VipMode;
 use harmony_types::id::Id;
 use harmony_types::net::{IpAddress, MacAddress};
-use log::{info, warn};
+use log::info;
 use serde::Serialize;

 use crate::config::secret::{OPNSenseApiCredentials, OPNSenseFirewallCredentials};
@@ -176,24 +176,8 @@ impl DhcpServer for FirewallPairTopology {
     }

     async fn list_static_mappings(&self) -> Vec<(MacAddress, IpAddress)> {
-        let primary_mappings = self.primary.list_static_mappings().await;
-        let backup_mappings = self.backup.list_static_mappings().await;
-
-        let primary_set: std::collections::HashSet<_> = primary_mappings.iter().collect();
-        let backup_set: std::collections::HashSet<_> = backup_mappings.iter().collect();
-
-        let only_primary: Vec<_> = primary_set.difference(&backup_set).collect();
-        let only_backup: Vec<_> = backup_set.difference(&primary_set).collect();
-
-        if !only_primary.is_empty() || !only_backup.is_empty() {
-            warn!(
-                "DHCP static mapping mismatch between primary and backup firewalls! \
-                Only on primary: {:?}, Only on backup: {:?}",
-                only_primary, only_backup
-            );
-        }
-
-        primary_mappings
+        // Return primary's view — both should be identical
+        self.primary.list_static_mappings().await
     }

     /// Returns the primary firewall's IP. In a CARP setup, callers
@@ -489,9 +489,6 @@ impl LoadBalancer for DummyInfra {
     async fn reload_restart(&self) -> Result<(), ExecutorError> {
         unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
     }
-    async fn ensure_wan_access(&self, _port: u16) -> Result<(), ExecutorError> {
-        unimplemented!("{}", UNIMPLEMENTED_DUMMY_INFRA)
-    }
 }

 #[async_trait]
@@ -37,9 +37,11 @@ pub trait LoadBalancer: Send + Sync {
     /// from the WAN interface. Used by load balancers that need to receive
     /// external traffic (e.g., OKD ingress on ports 80/443).
     ///
-    /// Topologies that don't manage firewall rules (e.g., cloud environments
-    /// with security groups) should return `Ok(())`.
-    async fn ensure_wan_access(&self, port: u16) -> Result<(), ExecutorError>;
+    /// Default implementation is a no-op for topologies that don't manage
+    /// firewall rules (e.g., cloud environments with security groups).
+    async fn ensure_wan_access(&self, _port: u16) -> Result<(), ExecutorError> {
+        Ok(())
+    }
 }

 #[derive(Debug, PartialEq, Clone, Serialize)]
@@ -111,16 +111,12 @@ impl LoadBalancer for OPNSenseFirewall {
 }

 fn haproxy_service_to_harmony(svc: &HaproxyService) -> Option<LoadBalancerService> {
-    let listening_port = match svc.bind.parse() {
-        Ok(addr) => addr,
-        Err(e) => {
-            warn!(
-                "Skipping HAProxy service: bind address '{}' is not a valid SocketAddr: {e}",
-                svc.bind
-            );
-            return None;
-        }
-    };
+    let listening_port = svc.bind.parse().unwrap_or_else(|_| {
+        panic!(
+            "HAProxy frontend address should be a valid SocketAddr, got {}",
+            svc.bind
+        )
+    });

     let backend_servers: Vec<BackendServer> = svc
         .servers
@@ -10,7 +10,6 @@ use virt::sys;
 use super::error::KvmError;
 use super::types::{CdromConfig, NetworkConfig, VmConfig, VmInterface, VmStatus};
 use super::xml;
-use harmony_types::net::MacAddress;

 /// A handle to a libvirt hypervisor.
 ///

@@ -375,15 +374,14 @@ impl KvmExecutor {
     pub async fn set_interface_link(
         &self,
         vm_name: &str,
-        mac: &MacAddress,
+        mac: &str,
         up: bool,
     ) -> Result<(), KvmError> {
         let state = if up { "up" } else { "down" };
-        let mac_str = mac.to_string();
-        info!("Setting {vm_name} interface {mac_str} link {state}");
+        info!("Setting {vm_name} interface {mac} link {state}");

         let output = tokio::process::Command::new("virsh")
-            .args(["-c", &self.uri, "domif-setlink", vm_name, &mac_str, state])
+            .args(["-c", &self.uri, "domif-setlink", vm_name, mac, state])
             .output()
             .await?;
@@ -422,18 +420,11 @@ impl KvmExecutor {
            // virsh domiflist columns: Interface, Type, Source, Model, MAC
            let parts: Vec<&str> = line.split_whitespace().collect();
            if parts.len() >= 5 {
                let mac = match MacAddress::try_from(parts[4].to_string()) {
                    Ok(m) => m,
                    Err(e) => {
                        warn!("Skipping interface with invalid MAC '{}': {e}", parts[4]);
                        continue;
                    }
                };
                interfaces.push(VmInterface {
                    interface_type: parts[1].to_string(),
                    source: parts[2].to_string(),
                    model: parts[3].to_string(),
                    mac,
                    mac: parts[4].to_string(),
                });
            }
        }
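The whitespace-column parsing above can be exercised in isolation. A runnable sketch, with a hypothetical `Iface` struct standing in for `VmInterface` and the MAC kept as a plain `String` (no `MacAddress` validation):

```rust
// Hypothetical stand-in for VmInterface; fields mirror the
// `virsh domiflist` columns: Interface, Type, Source, Model, MAC.
#[derive(Debug, PartialEq)]
struct Iface {
    interface_type: String,
    source: String,
    model: String,
    mac: String,
}

fn parse_domiflist_line(line: &str) -> Option<Iface> {
    let parts: Vec<&str> = line.split_whitespace().collect();
    if parts.len() >= 5 {
        Some(Iface {
            interface_type: parts[1].to_string(),
            source: parts[2].to_string(),
            model: parts[3].to_string(),
            mac: parts[4].to_string(),
        })
    } else {
        None // header rows, separators, and blank lines fall through here
    }
}

fn main() {
    let row = "vnet0      network   default   virtio   52:54:00:aa:bb:cc";
    let iface = parse_domiflist_line(row).unwrap();
    assert_eq!(iface.interface_type, "network");
    assert_eq!(iface.model, "virtio");
    assert_eq!(iface.mac, "52:54:00:aa:bb:cc");
    assert!(parse_domiflist_line("----------").is_none());
    println!("{iface:?}");
}
```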
@@ -1,4 +1,3 @@
use harmony_types::net::MacAddress;
use serde::{Deserialize, Serialize};

/// Information about a VM's network interface, as reported by `virsh domiflist`.

@@ -11,7 +10,7 @@ pub struct VmInterface {
    /// Device model (e.g. "virtio")
    pub model: String,
    /// MAC address
    pub mac: MacAddress,
    pub mac: String,
}

/// Specifies how a KVM host is accessed.

@@ -96,7 +95,7 @@ pub struct NetworkRef {
    pub name: String,
    /// Optional fixed MAC address for this interface. When `None`, libvirt
    /// assigns one automatically.
    pub mac: Option<MacAddress>,
    pub mac: Option<String>,
}

impl NetworkRef {

@@ -107,8 +106,8 @@ impl NetworkRef {
        }
    }

    pub fn with_mac(mut self, mac: MacAddress) -> Self {
        self.mac = Some(mac);
    pub fn with_mac(mut self, mac: impl Into<String>) -> Self {
        self.mac = Some(mac.into());
        self
    }
}
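The `impl Into<String>` signature above lets callers pass `&str` or `String` interchangeably. A minimal sketch of that builder ergonomic, using a hypothetical `NetRef` type rather than the Harmony one:

```rust
// Hypothetical builder mirroring NetworkRef::with_mac's
// `impl Into<String>` parameter style.
#[derive(Debug)]
struct NetRef {
    name: String,
    mac: Option<String>,
}

impl NetRef {
    fn named(name: impl Into<String>) -> Self {
        NetRef { name: name.into(), mac: None }
    }

    /// Accepts &str, String, or anything else convertible into String.
    fn with_mac(mut self, mac: impl Into<String>) -> Self {
        self.mac = Some(mac.into());
        self
    }
}

fn main() {
    let a = NetRef::named("mynet").with_mac("52:54:00:aa:bb:cc"); // &str
    let b = NetRef::named("mynet").with_mac(String::from("52:54:00:aa:bb:cc")); // String
    assert_eq!(a.mac, b.mac);
    assert_eq!(a.name, "mynet");
    println!("ok");
}
```

The trade-off versus the typed `MacAddress` parameter it replaces: callers get flexibility, but validation moves elsewhere (or disappears).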
@@ -261,7 +260,7 @@ impl VmConfigBuilder {
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DhcpHost {
    /// MAC address (e.g. `"52:54:00:00:50:01"`).
    pub mac: MacAddress,
    pub mac: String,
    /// IP to assign (e.g. `"10.50.0.2"`).
    pub ip: String,
    /// Optional hostname.

@@ -357,12 +356,12 @@ impl NetworkConfigBuilder {
    /// Add a static DHCP host entry (MAC → fixed IP).
    pub fn dhcp_host(
        mut self,
        mac: MacAddress,
        mac: impl Into<String>,
        ip: impl Into<String>,
        name: Option<String>,
    ) -> Self {
        self.dhcp_hosts.push(DhcpHost {
            mac,
            mac: mac.into(),
            ip: ip.into(),
            name,
        });

@@ -136,7 +136,7 @@ fn nic_devices(vm: &VmConfig) -> String {
        .map(|net| {
            let mac_line = net
                .mac
                .as_ref()
                .as_deref()
                .map(|m| format!("\n <mac address='{m}'/>"))
                .unwrap_or_default();
            format!(

@@ -221,7 +221,6 @@ mod tests {
    use crate::modules::kvm::types::{
        BootDevice, ForwardMode, NetworkConfig, NetworkRef, VmConfig,
    };
    use harmony_types::net::MacAddress;

    // ── Domain XML ──────────────────────────────────────────────────────
@@ -285,13 +284,12 @@ mod tests {

    #[test]
    fn domain_xml_nic_with_mac_address() {
        let mac: MacAddress = "52:54:00:aa:bb:cc".to_string().try_into().unwrap();
        let vm = VmConfig::builder("mac-test")
            .network(NetworkRef::named("mynet").with_mac(mac))
            .network(NetworkRef::named("mynet").with_mac("52:54:00:AA:BB:CC"))
            .build();

        let xml = domain_xml(&vm, "/tmp");
        assert!(xml.contains("mac address='52:54:00:aa:bb:cc'"));
        assert!(xml.contains("mac address='52:54:00:AA:BB:CC'"));
    }

    #[test]

@@ -456,11 +454,14 @@ mod tests {

    #[test]
    fn network_xml_with_dhcp_host() {
        let mac: MacAddress = "52:54:00:00:50:01".to_string().try_into().unwrap();
        let cfg = NetworkConfig::builder("hostnet")
            .subnet("10.50.0.1", 24)
            .dhcp_range("10.50.0.100", "10.50.0.200")
            .dhcp_host(mac, "10.50.0.2", Some("opnsense".to_string()))
            .dhcp_host(
                "52:54:00:00:50:01",
                "10.50.0.2",
                Some("opnsense".to_string()),
            )
            .build();

        let xml = network_xml(&cfg);
@@ -19,6 +19,12 @@ pub struct LoadBalancerScore {
    // (listen_interface, LoadBalancerService) tuples or something like that
    // I am not sure what to use as listen_interface, should it be interface name, ip address,
    // uuid?
    /// TCP ports that must be open for inbound WAN traffic.
    ///
    /// The load balancer interpret will call `ensure_wan_access` for each port
    /// before configuring services, so that the load balancer is reachable
    /// from outside the LAN.
    pub wan_firewall_ports: Vec<u16>,
}

impl<T: Topology + LoadBalancer> Score<T> for LoadBalancerScore {

@@ -60,6 +66,11 @@ impl<T: Topology + LoadBalancer> Interpret<T> for LoadBalancerInterpret {
        load_balancer.ensure_initialized().await?
    );

    for port in &self.score.wan_firewall_ports {
        info!("Ensuring WAN access for port {port}");
        load_balancer.ensure_wan_access(*port).await?;
    }

    for service in self.score.public_services.iter() {
        info!("Ensuring service exists {service:?}");
@@ -350,20 +350,13 @@ impl OKDSetup02BootstrapInterpret {
        &self,
        inventory: &Inventory,
    ) -> Result<(), InterpretError> {
        let timeout_minutes: u64 = std::env::var("HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES")
            .ok()
            .and_then(|v| v.parse().ok())
            .unwrap_or(90);

        info!(
            "[Stage 02/Bootstrap] Waiting for bootstrap to complete (timeout: {timeout_minutes}m)..."
        );
        info!("[Stage 02/Bootstrap] Waiting for bootstrap to complete...");
        info!("[Stage 02/Bootstrap] Running: openshift-install wait-for bootstrap-complete");

        let okd_installation_path =
            format!("./data/okd/installation_files_{}", inventory.location.name);

        let child = Command::new("./data/okd/bin/openshift-install")
        let output = Command::new("./data/okd/bin/openshift-install")
            .args([
                "wait-for",
                "bootstrap-complete",

@@ -371,17 +364,8 @@ impl OKDSetup02BootstrapInterpret {
                &okd_installation_path,
                "--log-level=info",
            ])
            .output();

        let timeout = std::time::Duration::from_secs(timeout_minutes * 60);
        let output = tokio::time::timeout(timeout, child)
            .output()
            .await
            .map_err(|_| {
                InterpretError::new(format!(
                    "[Stage 02/Bootstrap] bootstrap-complete timed out after {timeout_minutes} minutes. \
                    Set HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES to increase the timeout and retry."
                ))
            })?
            .map_err(|e| {
                InterpretError::new(format!(
                    "[Stage 02/Bootstrap] Failed to run openshift-install wait-for bootstrap-complete: {e}"
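One side of this hunk reads its timeout from `HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES` with a fallback of 90 via an `.ok().and_then(parse).unwrap_or(..)` chain. That parsing pattern in isolation, refactored over `Option<&str>` so the sketch is testable without touching the process environment:

```rust
/// Parse an optional raw env-var value into minutes, falling back to
/// `default` when the var is unset or not a valid u64 — the same chain
/// the diff uses on std::env::var(..).ok().
fn timeout_minutes(raw: Option<&str>, default: u64) -> u64 {
    raw.and_then(|v| v.parse().ok()).unwrap_or(default)
}

fn main() {
    // Valid value: used as-is.
    assert_eq!(timeout_minutes(Some("15"), 90), 15);
    // Garbage value: parse fails, fall back.
    assert_eq!(timeout_minutes(Some("not-a-number"), 90), 90);
    // Unset: fall back.
    assert_eq!(timeout_minutes(None, 90), 90);

    // In the real code the raw value would come from:
    // std::env::var("HARMONY_OKD_BOOTSTRAP_TIMEOUT_MINUTES").ok()
    println!("ok");
}
```

Note that a silent fallback means a typo in the variable quietly yields 90 minutes; logging the effective timeout (as the removed `info!` line does) is what makes that observable.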
@@ -56,6 +56,7 @@ impl OKDBootstrapLoadBalancerScore {
            load_balancer_score: LoadBalancerScore {
                public_services: vec![],
                private_services,
                wan_firewall_ports: vec![80, 443],
            },
        }
    }

@@ -114,6 +114,7 @@ impl OKDLoadBalancerScore {
            load_balancer_score: LoadBalancerScore {
                public_services,
                private_services,
                wan_firewall_ports: vec![80, 443],
            },
        }
    }

@@ -338,6 +339,13 @@ mod tests {
        assert_eq!(private_service_22623.backend_servers.len(), 3);
    }

    #[test]
    fn test_wan_firewall_ports_include_http_and_https() {
        let topology = create_test_topology();
        let score = OKDLoadBalancerScore::new(&topology);
        assert_eq!(score.load_balancer_score.wan_firewall_ports, vec![80, 443]);
    }

    #[test]
    fn test_all_backend_servers_have_correct_port() {
        let topology = create_test_topology();
@@ -2,6 +2,7 @@ use std::time::Duration;

use harmony_types::id::Id;
use log::info;
use serde::{Deserialize, Serialize};

use super::heartbeat::HeartbeatFailure;
use super::role::AgentRole;

@@ -46,6 +47,9 @@ pub struct AgentConfig {
#[derive(Debug, Clone)]
pub enum DeploymentConfig {
    FailoverPostgreSQL(FailoverCNPGConfig),
    /// Desired-state convergence mode: agent watches a NATS KV key for commands
    /// to execute and reports results back. See ADR-021.
    DesiredState,
}

#[derive(Debug, Clone)]

@@ -53,6 +57,38 @@ pub struct FailoverCNPGConfig {
    pub cnpg_cluster_name: String,
}

/// The desired state pushed by the central platform to an agent via NATS KV.
/// Key: `desired-state.<agent-id>`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct DesiredStateConfig {
    /// Shell command to execute (passed to `sh -c`)
    pub command: String,
    /// Monotonic generation counter. The agent only executes when this changes.
    pub generation: u64,
}

/// The actual state reported by the agent after executing a desired-state command.
/// Key: `actual-state.<agent-id>`
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ActualState {
    pub agent_id: String,
    /// Mirrors the generation from the DesiredStateConfig that was executed
    pub generation: u64,
    pub status: ExecutionStatus,
    pub stdout: String,
    pub stderr: String,
    pub exit_code: Option<i32>,
    /// Timestamp (ms since epoch) when the command was executed, from agent clock (informational)
    pub executed_at: u64,
}

#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub enum ExecutionStatus {
    Success,
    Failed,
    Running,
}

impl DeploymentConfig {
    /// The actual "work" of the heartbeat (e.g., write to NATS, check Postgres)
    pub async fn perform_heartbeat(&self) -> Result<(), HeartbeatFailure> {

@@ -62,6 +98,11 @@ impl DeploymentConfig {
            // TODO: Implement actual PG check / NATS write here
            Ok(())
        }
        DeploymentConfig::DesiredState => {
            // In desired-state mode, heartbeat just proves liveness.
            // Workload execution happens in the separate desired-state watcher task.
            Ok(())
        }
    }
}
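The `generation` field is the agent's only change-detection signal: a command runs only when the counter differs from the last one the agent executed. That gating logic, pulled out into a runnable sketch:

```rust
/// True when the incoming generation differs from the last one the
/// agent executed — the skip condition used by the watcher loop.
fn should_execute(last_executed: Option<u64>, incoming: u64) -> bool {
    last_executed != Some(incoming)
}

fn main() {
    let mut last: Option<u64> = None;

    // First observation of generation 1: execute.
    assert!(should_execute(last, 1));
    last = Some(1);

    // Same generation re-read on the next poll: skip.
    assert!(!should_execute(last, 1));

    // Control plane bumps to generation 2: execute again.
    assert!(should_execute(last, 2));

    // Note: the check is `!=`, not `<` — a reset to an *older*
    // generation also triggers execution.
    assert!(should_execute(last, 0));
    println!("ok");
}
```

Because the agent holds `last_executed` only in memory, a restart clears it and the current desired state is re-executed once — which is safe only if commands are idempotent, a point worth weighing against the alternatives in ADR-022.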
@@ -18,7 +18,10 @@ pub mod heartbeat;
mod role;

// Re-exports for backwards compatibility
pub use config::{AgentConfig, DeploymentConfig, FailoverCNPGConfig};
pub use config::{
    ActualState, AgentConfig, DeploymentConfig, DesiredStateConfig, ExecutionStatus,
    FailoverCNPGConfig,
};
pub use heartbeat::{AgentHeartbeat, AgentInfo, ClusterStateData, HeartbeatFailure};
pub use role::AgentRole;

246 harmony_agent/src/desired_state.rs (Normal file)
@@ -0,0 +1,246 @@
//! Desired-state convergence watcher (ADR-021).
//!
//! Polls a NATS KV key `desired-state.<agent-id>` for workload definitions.
//! When the generation changes, executes the command and writes the result
//! to `actual-state.<agent-id>`.

use std::sync::Arc;
use std::time::{Duration, SystemTime, UNIX_EPOCH};

use log::{debug, error, info, warn};
use tokio::process::Command;

use crate::agent::{ActualState, DesiredStateConfig, ExecutionStatus};
use crate::store::{KvStore, KvStoreError};

/// Runs the desired-state convergence loop for a single agent.
///
/// This task polls the KV store for changes to the desired state and executes
/// commands when the generation counter advances. It is designed to run as a
/// separate tokio task alongside the heartbeat loop.
pub async fn run_desired_state_watcher<S>(
    agent_id: String,
    kv: Arc<S>,
    poll_interval: Duration,
) where
    S: KvStore + Send + Sync + 'static,
{
    let desired_key = format!("desired-state.{agent_id}");
    let actual_key = format!("actual-state.{agent_id}");
    let mut last_generation: Option<u64> = None;

    info!("Desired-state watcher started for agent {agent_id}, polling key: {desired_key}");

    loop {
        match fetch_desired_state(&kv, &desired_key).await {
            Ok(Some(desired)) => {
                if last_generation == Some(desired.generation) {
                    debug!(
                        "Desired state generation {} already executed, skipping",
                        desired.generation
                    );
                } else {
                    info!(
                        "New desired state detected: generation={}, command='{}'",
                        desired.generation, desired.command
                    );

                    let result = execute_command(&agent_id, &desired).await;

                    match store_actual_state(&kv, &actual_key, &result).await {
                        Ok(_) => {
                            info!(
                                "Actual state stored: generation={}, status={:?}, exit_code={:?}",
                                result.generation, result.status, result.exit_code
                            );
                        }
                        Err(e) => {
                            error!("Failed to store actual state: {e}");
                        }
                    }

                    last_generation = Some(desired.generation);
                }
            }
            Ok(None) => {
                debug!("No desired state found for {agent_id}, waiting...");
            }
            Err(e) => {
                warn!("Error fetching desired state for {agent_id}: {e}");
            }
        }

        tokio::time::sleep(poll_interval).await;
    }
}
async fn fetch_desired_state<S>(
    kv: &Arc<S>,
    key: &str,
) -> Result<Option<DesiredStateConfig>, KvStoreError>
where
    S: KvStore + Send + Sync,
{
    match kv.get(key).await {
        Ok(result) => {
            if let Some(value) = result.value {
                let config: DesiredStateConfig =
                    serde_json::from_value(value).map_err(|e| KvStoreError::DeserializationFailed {
                        deserialization_error: e.to_string(),
                        value: String::new(),
                    })?;
                Ok(Some(config))
            } else {
                Ok(None)
            }
        }
        Err(KvStoreError::KeyNotAvailable(_)) => Ok(None),
        Err(e) => Err(e),
    }
}
async fn execute_command(agent_id: &str, desired: &DesiredStateConfig) -> ActualState {
    let now = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .expect("Time went backwards")
        .as_millis() as u64;

    info!("Executing command for generation {}: sh -c '{}'", desired.generation, desired.command);

    let output = Command::new("sh")
        .arg("-c")
        .arg(&desired.command)
        .output()
        .await;

    match output {
        Ok(output) => {
            let exit_code = output.status.code();
            let status = if output.status.success() {
                ExecutionStatus::Success
            } else {
                ExecutionStatus::Failed
            };

            let stdout = String::from_utf8_lossy(&output.stdout).to_string();
            let stderr = String::from_utf8_lossy(&output.stderr).to_string();

            if !stdout.is_empty() {
                debug!("stdout: {stdout}");
            }
            if !stderr.is_empty() {
                warn!("stderr: {stderr}");
            }

            ActualState {
                agent_id: agent_id.to_string(),
                generation: desired.generation,
                status,
                stdout,
                stderr,
                exit_code,
                executed_at: now,
            }
        }
        Err(e) => {
            error!("Failed to spawn command: {e}");
            ActualState {
                agent_id: agent_id.to_string(),
                generation: desired.generation,
                status: ExecutionStatus::Failed,
                stdout: String::new(),
                stderr: format!("Failed to spawn command: {e}"),
                exit_code: None,
                executed_at: now,
            }
        }
    }
}
async fn store_actual_state<S>(
    kv: &Arc<S>,
    key: &str,
    state: &ActualState,
) -> Result<(), KvStoreError>
where
    S: KvStore + Send + Sync,
{
    let value = serde_json::to_value(state).map_err(|e| KvStoreError::DeserializationFailed {
        deserialization_error: e.to_string(),
        value: format!("{state:?}"),
    })?;

    // Use get to find current sequence, then set_strict for safe writes.
    // If the key doesn't exist yet, use sequence 0 for the first write.
    let expected_seq = match kv.get(key).await {
        Ok(result) => result.metadata.sequence,
        Err(KvStoreError::KeyNotAvailable(_)) => 0,
        Err(e) => return Err(e),
    };

    kv.set_strict(key, value, expected_seq).await?;
    Ok(())
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::store::InMemoryKvStore;
    use serde_json::json;

    #[tokio::test]
    async fn executes_command_and_reports_result() {
        let kv = Arc::new(InMemoryKvStore::new());
        let agent_id = "test-agent";
        let desired_key = format!("desired-state.{agent_id}");
        let actual_key = format!("actual-state.{agent_id}");

        // Write a desired state
        let desired = DesiredStateConfig {
            command: "echo hello".to_string(),
            generation: 1,
        };
        let value = serde_json::to_value(&desired).unwrap();
        kv.set_strict(&desired_key, value, 0).await.unwrap();

        // Fetch and execute
        let fetched = fetch_desired_state(&kv, &desired_key).await.unwrap().unwrap();
        assert_eq!(fetched.generation, 1);
        assert_eq!(fetched.command, "echo hello");

        let result = execute_command(agent_id, &fetched).await;
        assert_eq!(result.status, ExecutionStatus::Success);
        assert_eq!(result.stdout.trim(), "hello");
        assert_eq!(result.generation, 1);
        assert_eq!(result.exit_code, Some(0));

        // Store actual state
        store_actual_state(&kv, &actual_key, &result).await.unwrap();

        // Verify stored value
        let stored = kv.get(&actual_key).await.unwrap();
        let actual: ActualState = serde_json::from_value(stored.value.unwrap()).unwrap();
        assert_eq!(actual.generation, 1);
        assert_eq!(actual.status, ExecutionStatus::Success);
    }

    #[tokio::test]
    async fn reports_failure_for_bad_command() {
        let desired = DesiredStateConfig {
            command: "exit 42".to_string(),
            generation: 2,
        };

        let result = execute_command("test-agent", &desired).await;
        assert_eq!(result.status, ExecutionStatus::Failed);
        assert_eq!(result.exit_code, Some(42));
        assert_eq!(result.generation, 2);
    }

    #[tokio::test]
    async fn returns_none_when_no_desired_state() {
        let kv = Arc::new(InMemoryKvStore::new());
        let result = fetch_desired_state(&kv, "desired-state.nonexistent").await.unwrap();
        assert!(result.is_none());
    }
}
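`store_actual_state` follows a read-then-compare-and-swap discipline: fetch the key's current sequence, then write with `set_strict`, which must fail if another writer advanced the sequence in between. A self-contained sketch of those semantics over a plain `HashMap` (hypothetical `MiniKv` type; the real `KvStore` wraps NATS JetStream KV and is async):

```rust
use std::collections::HashMap;

/// Hypothetical in-memory KV with set_strict-style CAS semantics:
/// each key carries a sequence that must match exactly on write.
struct MiniKv {
    entries: HashMap<String, (u64, String)>, // key -> (sequence, value)
}

impl MiniKv {
    fn new() -> Self {
        MiniKv { entries: HashMap::new() }
    }

    /// Current sequence for a key; 0 when the key does not exist yet,
    /// mirroring the "first write uses sequence 0" rule in the diff.
    fn sequence(&self, key: &str) -> u64 {
        self.entries.get(key).map(|(seq, _)| *seq).unwrap_or(0)
    }

    /// Write only if `expected_seq` matches the stored sequence;
    /// returns the new sequence on success.
    fn set_strict(&mut self, key: &str, value: &str, expected_seq: u64) -> Result<u64, String> {
        let current = self.sequence(key);
        if current != expected_seq {
            return Err(format!("sequence mismatch: expected {expected_seq}, found {current}"));
        }
        let next = current + 1;
        self.entries.insert(key.to_string(), (next, value.to_string()));
        Ok(next)
    }
}

fn main() {
    let mut kv = MiniKv::new();

    // First write: key absent, so sequence 0 is expected.
    assert_eq!(kv.set_strict("actual-state.agent-1", "gen1", 0), Ok(1));

    // Read-then-CAS succeeds with the freshly read sequence...
    let seq = kv.sequence("actual-state.agent-1");
    assert_eq!(kv.set_strict("actual-state.agent-1", "gen2", seq), Ok(2));

    // ...but a stale writer still holding the old sequence is rejected.
    assert!(kv.set_strict("actual-state.agent-1", "stale", seq).is_err());
    println!("ok");
}
```

Note the get-then-set pair in the real code is not atomic by itself; the safety comes from `set_strict` rejecting the write when the sequence moved, after which the caller can retry.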
@@ -9,6 +9,7 @@ use crate::{

// mod agent_loop;
mod agent;
pub mod desired_state;
pub mod store;
mod workflow;