Compare commits
2 Commits
feat/arm-c
...
feat/agent
| Author | SHA1 | Date | |
|---|---|---|---|
| 51b39505bb | |||
| 9cd1713788 |
@@ -3,6 +3,3 @@ rustflags = ["-C", "link-arg=/STACK:8000000"]
|
||||
|
||||
[target.x86_64-pc-windows-gnu]
|
||||
rustflags = ["-C", "link-arg=-Wl,--stack,8000000"]
|
||||
|
||||
[target.aarch64-unknown-linux-gnu]
|
||||
linker = "aarch64-linux-gnu-gcc"
|
||||
|
||||
@@ -1,78 +0,0 @@
|
||||
name: Build ARM agent binaries
|
||||
on:
|
||||
push:
|
||||
tags:
|
||||
- 'v*'
|
||||
- 'snapshot-*'
|
||||
workflow_dispatch:
|
||||
|
||||
jobs:
|
||||
build_arm_agents:
|
||||
container:
|
||||
image: hub.nationtech.io/harmony/harmony_composer:latest
|
||||
runs-on: docker
|
||||
steps:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
with:
|
||||
submodules: 'recursive'
|
||||
fetch-depth: 1
|
||||
|
||||
- name: Install ARM cross-compilation toolchain
|
||||
run: |
|
||||
apt-get update -qq
|
||||
apt-get install -y -qq gcc-aarch64-linux-gnu
|
||||
rustup target add aarch64-unknown-linux-gnu
|
||||
|
||||
- name: Build agent crates for aarch64
|
||||
run: |
|
||||
cargo build --release --target aarch64-unknown-linux-gnu \
|
||||
-p harmony_agent \
|
||||
-p harmony_inventory_agent
|
||||
|
||||
- name: Install jq
|
||||
run: apt-get install -y -qq jq
|
||||
|
||||
- name: Get or create release
|
||||
run: |
|
||||
TAG_NAME="${GITHUB_REF_NAME}"
|
||||
|
||||
# Try to get existing release
|
||||
RELEASE_ID=$(curl -s -X GET \
|
||||
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
|
||||
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/tags/${TAG_NAME}" \
|
||||
| jq -r '.id // empty')
|
||||
|
||||
if [ -z "$RELEASE_ID" ]; then
|
||||
# Create new release
|
||||
RESPONSE=$(curl -s -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
|
||||
-H "Content-Type: application/json" \
|
||||
-d "{
|
||||
\"tag_name\": \"${TAG_NAME}\",
|
||||
\"name\": \"${TAG_NAME}\",
|
||||
\"body\": \"Release ${TAG_NAME}\",
|
||||
\"draft\": false,
|
||||
\"prerelease\": true
|
||||
}" \
|
||||
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases")
|
||||
RELEASE_ID=$(echo "$RESPONSE" | jq -r '.id')
|
||||
fi
|
||||
|
||||
echo "RELEASE_ID=$RELEASE_ID" >> $GITHUB_ENV
|
||||
|
||||
- name: Upload harmony_agent ARM binary
|
||||
run: |
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
|
||||
-H "Content-Type: application/octet-stream" \
|
||||
--data-binary "@target/aarch64-unknown-linux-gnu/release/harmony_agent" \
|
||||
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/${{ env.RELEASE_ID }}/assets?name=harmony_agent-aarch64-linux"
|
||||
|
||||
- name: Upload harmony_inventory_agent ARM binary
|
||||
run: |
|
||||
curl -X POST \
|
||||
-H "Authorization: token ${{ secrets.GITEATOKEN }}" \
|
||||
-H "Content-Type: application/octet-stream" \
|
||||
--data-binary "@target/aarch64-unknown-linux-gnu/release/harmony_inventory_agent" \
|
||||
"https://git.nationtech.io/api/v1/repos/nationtech/harmony/releases/${{ env.RELEASE_ID }}/assets?name=harmony_inventory_agent-aarch64-linux"
|
||||
1
Cargo.lock
generated
1
Cargo.lock
generated
@@ -3640,6 +3640,7 @@ dependencies = [
|
||||
"cidr",
|
||||
"env_logger",
|
||||
"getrandom 0.3.4",
|
||||
"harmony",
|
||||
"harmony_macros",
|
||||
"harmony_types",
|
||||
"log",
|
||||
|
||||
@@ -1,43 +0,0 @@
|
||||
#!/bin/sh
|
||||
set -e
|
||||
|
||||
cd "$(dirname "$0")/.."
|
||||
|
||||
# Cross-compile agent crates for aarch64 (Raspberry Pi)
|
||||
#
|
||||
# Prerequisites (Debian/Ubuntu):
|
||||
# sudo apt install gcc-aarch64-linux-gnu
|
||||
# rustup target add aarch64-unknown-linux-gnu
|
||||
#
|
||||
# Prerequisites (Arch Linux):
|
||||
# sudo pacman -S aarch64-linux-gnu-gcc
|
||||
# rustup target add aarch64-unknown-linux-gnu
|
||||
|
||||
TARGET="aarch64-unknown-linux-gnu"
|
||||
|
||||
echo "=== Cross-compiling for $TARGET ==="
|
||||
|
||||
# Check prerequisites
|
||||
if ! rustup target list --installed | grep -q "$TARGET"; then
|
||||
echo "ERROR: Rust target $TARGET not installed. Run: rustup target add $TARGET"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
if ! command -v aarch64-linux-gnu-gcc > /dev/null 2>&1; then
|
||||
echo "ERROR: aarch64-linux-gnu-gcc not found. Install the cross-compilation toolchain."
|
||||
echo " Debian/Ubuntu: sudo apt install gcc-aarch64-linux-gnu"
|
||||
echo " Arch Linux: sudo pacman -S aarch64-linux-gnu-gcc"
|
||||
exit 1
|
||||
fi
|
||||
|
||||
echo "--- Building harmony_agent ---"
|
||||
cargo build --release --target "$TARGET" -p harmony_agent
|
||||
|
||||
echo "--- Building harmony_inventory_agent ---"
|
||||
cargo build --release --target "$TARGET" -p harmony_inventory_agent
|
||||
|
||||
echo ""
|
||||
echo "=== Build complete ==="
|
||||
echo "Binaries:"
|
||||
echo " target/$TARGET/release/harmony_agent"
|
||||
echo " target/$TARGET/release/harmony_inventory_agent"
|
||||
117
docs/adr/021-agent-desired-state-convergence.md
Normal file
117
docs/adr/021-agent-desired-state-convergence.md
Normal file
@@ -0,0 +1,117 @@
|
||||
# ADR-021: Agent Desired-State Convergence — Problem Statement and Initial Proposal
|
||||
|
||||
**Status:** Proposed (under review — see ADR-022 for alternatives)
|
||||
**Date:** 2026-04-09
|
||||
|
||||
> This document was originally drafted as an "Accepted" ADR describing a shell-command executor. On review, the team was not convinced that the shell-executor shape is the right one. It has been re-framed as a **problem statement + one candidate proposal (Alternative A)**. Alternative designs — including a mini-kubelet model and an embedded-Score model — are explored in [ADR-022](./022-agent-desired-state-alternatives.md). A final decision has **not** been made.
|
||||
|
||||
## Context
|
||||
|
||||
The Harmony Agent (ADR-016) currently handles a single use case: PostgreSQL HA failover via `DeploymentConfig::FailoverPostgreSQL`. For the IoT fleet management platform (Raspberry Pi clusters deployed in homes, offices, and community spaces), we need the agent to become a general-purpose desired-state convergence engine.
|
||||
|
||||
Concretely, the central Harmony control plane must be able to:
|
||||
|
||||
1. Express the *desired state* of an individual Pi (or a class of Pis) in a typed, serializable form.
|
||||
2. Ship that desired state to the device over the existing NATS JetStream mesh (ADR-017-1).
|
||||
3. Have the on-device agent reconcile toward it — idempotently, observably, and without manual intervention.
|
||||
4. Read back an authoritative, typed *actual state* so the control plane can report convergence, surface errors, and drive a fleet dashboard.
|
||||
|
||||
The existing heartbeat / failover machinery (ADR-017-3) remains valuable — it proves the agent can maintain persistent NATS connections, do CAS writes against KV, and react to state changes. Whatever desired-state mechanism we add **extends** that foundation rather than replacing it.
|
||||
|
||||
### Design forces
|
||||
|
||||
- **Coherence with the rest of Harmony.** Harmony's entire identity is Score-Topology-Interpret with compile-time safety. A desired-state mechanism that reintroduces stringly-typed, runtime-validated blobs on the edge would be a regression from our own design rules (see `CLAUDE.md`: "Capabilities are industry concepts, not tools", "Scores encapsulate operational complexity", "Scores must be idempotent").
|
||||
- **The "mini-kubelet" framing.** The team is converging on a mental model where the agent is a stripped-down kubelet: it owns a set of local reconcilers, maintains a PLEG-like state machine per managed resource, and converges toward a declarative manifest. ADR-017-3 is already explicitly Kubernetes-inspired for staleness detection. This framing should inform the desired-state design, not fight it.
|
||||
- **Speed to IoT MVP.** We need something shippable soon enough that real Pi fleets can be demoed. Over-engineering the v1 risks never shipping; under-engineering it risks a rewrite once the wrong abstraction is entrenched on hundreds of devices in the field.
|
||||
- **Security.** Whatever lands on the device is, by construction, running with the agent's privileges. A mechanism that reduces to "run this shell string as root" is a very wide blast radius.
|
||||
- **Serializability.** Today, Harmony Scores are *not* uniformly serializable across the wire — many hold trait objects, closures, or references to live topologies. Any design that assumes "just send a Score" needs to confront this.
|
||||
|
||||
## Initial Proposal (Alternative A — Shell Command Executor)
|
||||
|
||||
This is the first-pass design, implemented as a happy-path scaffold on this branch. **It is presented here for critique, not as a settled decision.**
|
||||
|
||||
### Desired-State Model
|
||||
|
||||
Each agent watches a NATS KV key `desired-state.<agent-id>` for its workload definition. When the value changes, the agent executes the workload and reports the result to `actual-state.<agent-id>`. This is a pull-based convergence loop: the control plane writes intent, the agent converges, the control plane reads the result.
|
||||
|
||||
A `DesiredState` is a serializable description of what should be running on the device. For this first iteration, it is a shell command plus a monotonic generation counter.
|
||||
|
||||
```rust
|
||||
enum DeploymentConfig {
|
||||
FailoverPostgreSQL(FailoverCNPGConfig), // existing
|
||||
DesiredState(DesiredStateConfig), // new
|
||||
}
|
||||
|
||||
struct DesiredStateConfig {
|
||||
command: String,
|
||||
generation: u64,
|
||||
}
|
||||
```
|
||||
|
||||
### Config Flow
|
||||
|
||||
```
|
||||
Central Platform NATS JetStream Agent (Pi)
|
||||
================ ============== ==========
|
||||
1. Write desired state -------> KV: desired-state.<agent-id>
|
||||
2. Watch detects change
|
||||
3. Execute workload
|
||||
4. Write result --------> KV: actual-state.<agent-id>
|
||||
5. Read actual state <------- KV: actual-state.<agent-id>
|
||||
```
|
||||
|
||||
The agent's heartbeat loop continues independently. The desired-state watcher runs as a separate async task, sharing the same NATS connection. This separation means a slow command execution does not block heartbeats.
|
||||
|
||||
### State Reporting
|
||||
|
||||
```rust
|
||||
struct ActualState {
|
||||
agent_id: Id,
|
||||
generation: u64, // mirrors the desired-state generation
|
||||
status: ExecutionStatus, // Success, Failed, Running
|
||||
stdout: String,
|
||||
stderr: String,
|
||||
exit_code: Option<i32>,
|
||||
executed_at: u64,
|
||||
}
|
||||
```
|
||||
|
||||
The control plane reads this key to determine convergence. If `actual_state.generation == desired_state.generation` and `status == Success`, the device has converged.
|
||||
|
||||
### Why this shape was chosen first
|
||||
|
||||
- Dirt cheap to implement (≈200 lines, done on this branch).
|
||||
- Works for literally any task a human would type into a Pi shell.
|
||||
- Reuses the existing NATS KV infrastructure and CAS write idiom already proven by the heartbeat loop.
|
||||
- Provides an end-to-end demo path in under a day.
|
||||
|
||||
## Open Questions and Concerns
|
||||
|
||||
The following concerns block promoting this to an "Accepted" decision:
|
||||
|
||||
1. **Wrong abstraction level.** `sh -c "<string>"` is the *opposite* of what Harmony stands for. Harmony exists because IaC tools drown in stringly-typed, runtime-validated config. Shipping arbitrary shell to the edge recreates that problem inside our own agent — at the worst possible place (the device).
|
||||
2. **No idempotency.** `systemctl start foo` and `apt install foo` are not idempotent by themselves. Every Score in Harmony is required to be idempotent. A shell executor pushes that burden onto whoever writes the commands, where we cannot check it.
|
||||
3. **No resource model.** There is no notion of "this manifest owns this systemd unit". When desired state changes, we cannot compute a diff, we cannot garbage-collect the old resource, and we cannot surface "drift" meaningfully. We know generation N was "run"; we do not know what it left behind.
|
||||
4. **No typed status.** `stdout`/`stderr`/`exit_code` is not enough to drive a fleet dashboard. We want typed `Status { container: Running { since, restarts }, unit: Active, file: PresentAt(sha256) }`.
|
||||
5. **No lifecycle.** Shell commands are fire-and-forget. A kubelet-shaped agent needs to know whether a resource is *still* healthy after it was created — liveness and readiness are first-class concerns, not a post-hoc `exit_code` check.
|
||||
6. **Security.** The ADR hand-waves "NATS ACLs + future signing". In practice, v1 lets anyone with write access to the KV bucket execute anything as the agent user. Even with NATS ACLs, the *shape* of the API invites abuse; a typed manifest with an allowlist of resource types has a much narrower attack surface by construction.
|
||||
7. **Generational model is too coarse.** A single `generation: u64` per agent means we can only describe one monolithic "job". Real fleet state is a *set* of resources (this container, this unit, this file). We need per-resource generations, or a manifest-level generation with a sub-resource status map.
|
||||
8. **Incoherent with ADR-017-3's kubelet framing.** That ADR deliberately borrowed K8s vocabulary (staleness, fencing, leader promotion) because kubelet-like semantics are the right ones for resilient edge workloads. Shell-exec abandons that lineage at the first opportunity.
|
||||
9. **Coherence with the Score-Topology-Interpret pattern.** Today's proposal introduces a parallel concept ("DesiredStateConfig") that has nothing to do with Score or Topology. If a Pi is just "a topology with a small capability set" (systemd, podman, files, network), then the right thing to ship is a Score, not a shell string.
|
||||
|
||||
## Status of the Implementation on this Branch
|
||||
|
||||
The happy-path code in `harmony_agent/src/desired_state.rs` (≈250 lines, fully tested) implements Alternative A. It is **scaffolding**, not a committed design:
|
||||
|
||||
- It is useful as a vehicle to prove out the NATS KV watch + typed `ActualState` CAS write pattern, both of which are reusable regardless of which alternative we pick.
|
||||
- It should **not** be wired into user-facing tooling until the architectural decision in ADR-022 is made.
|
||||
- If we adopt Alternative B (mini-kubelet) or C (embedded Scores), the shell executor either becomes one *variant* of a typed `Resource` enum (a `ShellJob` resource, clearly labeled as an escape hatch) or is deleted outright.
|
||||
|
||||
## Next Steps
|
||||
|
||||
1. Review ADR-022 (alternatives + recommendation).
|
||||
2. Pick a target design.
|
||||
3. Either:
|
||||
- Rework `desired_state.rs` to match the chosen target, **or**
|
||||
- Keep it behind a feature flag as a demo fallback while the real design is built.
|
||||
4. Re-file this ADR as "Superseded by ADR-022" or update it in place with the accepted design.
|
||||
218
docs/adr/022-agent-desired-state-alternatives.md
Normal file
218
docs/adr/022-agent-desired-state-alternatives.md
Normal file
@@ -0,0 +1,218 @@
|
||||
# ADR-022: Agent Desired-State — Alternatives and Recommendation
|
||||
|
||||
**Status:** Proposed
|
||||
**Date:** 2026-04-09
|
||||
**Supersedes (candidate):** ADR-021 shell-executor proposal
|
||||
|
||||
## Context
|
||||
|
||||
ADR-021 drafted a first-pass "desired-state convergence" mechanism for the Harmony Agent (ADR-016) in the form of a shell-command executor. On review, that shape raised serious concerns (see ADR-021 §"Open Questions and Concerns"): it is incoherent with Harmony's Score-Topology-Interpret pattern, it is not idempotent, it has no resource model, no typed status, no lifecycle, and it weakens the agent's security posture.
|
||||
|
||||
Separately, the team has been converging on a **"mini-kubelet" framing** for the IoT agent:
|
||||
|
||||
- The agent owns a small, fixed set of *reconcilers*, one per resource type it can manage (systemd unit, container, file, network interface, overlay config...).
|
||||
- The desired state is a *typed manifest* — a bag of resources with identities, generations, and typed status.
|
||||
- The agent runs reconcile loops similar to kubelet's Pod Lifecycle Event Generator (PLEG): for each managed resource, observe actual, compare to desired, apply the minimum delta, update typed status.
|
||||
- Failure and drift are first-class. "I tried, it failed, here is why" is a valid steady state.
|
||||
|
||||
ADR-017-3 already borrows Kubernetes vocabulary (staleness, fencing, promotion) on purpose. Doubling down on the kubelet metaphor at the desired-state layer is the natural continuation, not a tangent.
|
||||
|
||||
This ADR enumerates the candidate designs, argues their tradeoffs honestly, and recommends a path.
|
||||
|
||||
## Alternatives
|
||||
|
||||
### Alternative A — Shell Command Executor (ADR-021 as-is)
|
||||
|
||||
**Shape:** `DesiredState { command: String, generation: u64 }`, agent does `sh -c $command`, pipes stdout/stderr/exit into `ActualState`.
|
||||
|
||||
**Pros:**
|
||||
- Trivial to implement. ~200 LOC, already on this branch.
|
||||
- Works for any task that can be expressed as a shell pipeline — maximum flexibility at v1.
|
||||
- Zero new abstractions: reuses existing NATS KV watch + CAS patterns.
|
||||
- End-to-end demo-able in an afternoon.
|
||||
|
||||
**Cons:**
|
||||
- **Wrong abstraction level.** Harmony's entire thesis is "no more stringly-typed YAML/shell mud pits". This design ships that mud pit *to the edge*.
|
||||
- **Not idempotent.** The burden of idempotency falls on whoever writes the command string. `systemctl start foo` run twice is fine; `apt install foo && echo "done" >> /etc/state` run twice is broken. We cannot enforce correctness.
|
||||
- **No resource model.** No concept of "this manifest owns X". No diffing, no GC, no drift detection, no "what does this agent currently run?".
|
||||
- **No typed status.** stdout/stderr/exit_code does not tell a fleet dashboard "container nginx is running, restarted 3 times, last healthy 2s ago". It tells it "this bash ran and exited 0 once, three minutes ago".
|
||||
- **No lifecycle.** Fire-and-forget; post-exit the agent has no notion of whether the resource is still healthy.
|
||||
- **Security.** Even with NATS ACLs, the API's *shape* invites abuse. Any bug in the control plane that lets a user influence a desired-state write equals RCE on every Pi.
|
||||
- **Incoherent with ADR-017-3 and Score-Topology-Interpret.** Introduces a parallel concept that has nothing to do with the rest of Harmony.
|
||||
|
||||
**Verdict:** Acceptable only as a *named escape hatch* inside a richer design (a `ShellJob` resource variant, explicitly labeled as such and audited). Not acceptable as the whole design.
|
||||
|
||||
---
|
||||
|
||||
### Alternative B — Mini-Kubelet with Typed Resource Manifests
|
||||
|
||||
**Shape:** The agent owns a fixed set of `Resource` variants and one reconciler per variant.
|
||||
|
||||
```rust
|
||||
/// The unit of desired state shipped to an agent.
|
||||
/// Serialized to JSON, pushed via NATS KV to `desired-state.<agent-id>`.
|
||||
struct AgentManifest {
|
||||
generation: u64, // monotonic, control-plane assigned
|
||||
resources: Vec<ManagedResource>,
|
||||
}
|
||||
|
||||
struct ManagedResource {
|
||||
/// Stable, manifest-unique identity. Used for diffing across generations.
|
||||
id: ResourceId,
|
||||
spec: ResourceSpec,
|
||||
}
|
||||
|
||||
enum ResourceSpec {
|
||||
SystemdUnit(SystemdUnitSpec), // ensure unit exists, enabled, active
|
||||
Container(ContainerSpec), // podman/docker run with image, env, volumes
|
||||
File(FileSpec), // path, mode, owner, content (hash or inline)
|
||||
NetworkConfig(NetworkConfigSpec), // interface, addresses, routes
|
||||
ShellJob(ShellJobSpec), // explicit escape hatch, audited separately
|
||||
// ...extend carefully
|
||||
}
|
||||
|
||||
/// What the agent reports back.
|
||||
struct AgentStatus {
|
||||
manifest_generation: u64, // which desired-state gen this reflects
|
||||
observed_generation: u64, // highest gen the agent has *processed*
|
||||
resources: HashMap<ResourceId, ResourceStatus>,
|
||||
conditions: Vec<AgentCondition>, // Ready, Degraded, Reconciling, ...
|
||||
}
|
||||
|
||||
enum ResourceStatus {
|
||||
Pending,
|
||||
Reconciling { since: Timestamp },
|
||||
Ready { since: Timestamp, details: ResourceReadyDetails },
|
||||
Failed { since: Timestamp, error: String, retry_after: Option<Timestamp> },
|
||||
}
|
||||
```
|
||||
|
||||
Each reconciler implements a small trait:
|
||||
|
||||
```rust
|
||||
trait Reconciler {
|
||||
type Spec;
|
||||
type Status;
|
||||
async fn observe(&self, id: &ResourceId) -> Result<Self::Status>;
|
||||
async fn reconcile(&self, id: &ResourceId, spec: &Self::Spec) -> Result<Self::Status>;
|
||||
async fn delete(&self, id: &ResourceId) -> Result<()>;
|
||||
}
|
||||
```
|
||||
|
||||
The agent loop becomes:
|
||||
|
||||
1. Watch `desired-state.<agent-id>` for the latest `AgentManifest`.
|
||||
2. On change, compute diff vs. observed set: additions, updates, deletions.
|
||||
3. Dispatch each resource to its reconciler. Reconcilers are idempotent by contract.
|
||||
4. Aggregate per-resource status into `AgentStatus`, write to `actual-state.<agent-id>` via CAS.
|
||||
5. Re-run periodically to detect drift even when desired state has not changed (PLEG-equivalent).
|
||||
|
||||
**Pros:**
|
||||
- **Declarative and idempotent by construction.** Reconcilers are required to be idempotent; the contract is enforced in Rust traits, not in docs.
|
||||
- **Typed status.** Dashboards, alerts, and the control plane get structured data.
|
||||
- **Drift detection.** Periodic re-observation catches "someone SSH'd in and stopped the service".
|
||||
- **Lifecycle.** Each resource has a clear state machine; health is a first-class concept.
|
||||
- **Coherent with ADR-017-3.** The kubelet framing becomes literal, not metaphorical.
|
||||
- **Narrow attack surface.** The agent only knows how to do a handful of well-audited things. Adding a new capability is an explicit code change, not a new shell string.
|
||||
- **Composable with Harmony's existing philosophy.** `ManagedResource` is to the agent what a Score is to a Topology, at a smaller scale.
|
||||
|
||||
**Cons:**
|
||||
- More upfront design. Each reconciler needs to be written and tested.
|
||||
- Requires us to *commit* to a resource type set and its status schema. Adding a new kind is a versioned change to the wire format.
|
||||
- Duplicates, at the edge, some of the vocabulary already present in Harmony's Score layer (e.g., `FileDeployment`, container deployments). Risk of two parallel abstractions evolving in tension.
|
||||
- Harder to demo in a single afternoon.
|
||||
|
||||
**Verdict:** Strong candidate. Matches the team's mini-kubelet intuition directly.
|
||||
|
||||
---
|
||||
|
||||
### Alternative C — Embedded Score Interpreter on the Agent
|
||||
|
||||
**Shape:** The desired state *is* a Harmony Score (or a set of Scores), serialized and pushed via NATS. The agent hosts a local `PiTopology` that exposes a small, carefully chosen set of capabilities (`SystemdHost`, `ContainerRuntime`, `FileSystemHost`, `NetworkConfigurator`, ...). The agent runs the Score's `interpret` against that local topology.
|
||||
|
||||
```rust
|
||||
// On the control plane:
|
||||
let score = SystemdServiceScore { ... };
|
||||
let wire: SerializedScore = score.to_wire()?;
|
||||
nats.put(format!("desired-state.{agent_id}"), wire).await?;
|
||||
|
||||
// On the agent:
|
||||
let score = SerializedScore::decode(payload)?;
|
||||
let topology = PiTopology::new();
|
||||
let outcome = score.interpret(&inventory, &topology).await?;
|
||||
// Outcome is already a typed Harmony result (SUCCESS/NOOP/FAILURE/RUNNING/...).
|
||||
```
|
||||
|
||||
**Pros:**
|
||||
- **Zero new abstractions.** The agent becomes "a Harmony executor that happens to run on a Pi". Everything we already know how to do in Harmony works, for free.
|
||||
- **Maximum coherence.** There is exactly one way to describe desired state in the whole system: a Score. The type system enforces that a score requesting `K8sclient` cannot be shipped to a Pi topology that does not offer it — at compile time on the control plane, at deserialization time on the agent.
|
||||
- **Composability.** Higher-order topologies (ADR-015) work unchanged: `FailoverTopology<PiTopology>` gets you HA at the edge for free.
|
||||
- **Single mental model for the whole team.** "Write a Score" is already the Harmony primitive; no one needs to learn a second one.
|
||||
|
||||
**Cons:**
|
||||
- **Serializability.** This is the hard one. Harmony Scores today hold trait objects, references to live topology state, and embedded closures in places. Making them uniformly serde-serializable is a non-trivial refactor that touches dozens of modules. We would be gating the IoT MVP on a cross-cutting refactor.
|
||||
- **Agent binary size.** If "the agent can run any Score", it links every module. On a Pi Zero 2 W, that matters. We can mitigate with feature flags, but then we are back to "which scores does *this* agent support?" — i.e., we have reinvented resource-type registration, just spelled differently.
|
||||
- **Capability scoping is subtle.** We have to be extremely careful about which capabilities `PiTopology` exposes. "A Pi can run containers" is true; "a Pi can run arbitrary k8s clusters" is not. Getting that boundary wrong opens the same attack surface as Alternative A, just hidden behind a Score.
|
||||
- **Control-plane UX.** The central platform now needs to instantiate Scores for specific Pis, handle their inventories, and ship them. That is heavier than "push a JSON blob".
|
||||
|
||||
**Verdict:** The principled end state, almost certainly where we want to be in 18 months. Not shippable for the IoT MVP.
|
||||
|
||||
---
|
||||
|
||||
### Alternative D — Hybrid: Typed Manifests Now, Scores Later
|
||||
|
||||
**Shape:** Ship Alternative B (typed `AgentManifest` with a fixed set of reconcilers). Keep the Score ambition (Alternative C) as an explicit roadmap item. When Scores become uniformly wire-serializable and `PiTopology` is mature, migrate by adding a `ResourceSpec::Score(SerializedScore)` variant. Eventually that variant may subsume the others.
|
||||
|
||||
**Pros:**
|
||||
- **Shippable soon.** Alternative B is the implementable core; we can have a fleet demo in weeks, not months.
|
||||
- **On a path to the ideal.** We do not dead-end. The `ResourceSpec` enum becomes the migration seam.
|
||||
- **De-risks the Score serialization refactor.** We learn what resource types we *actually* need on the edge before we refactor the Score layer.
|
||||
- **Lets us delete Alternative A cleanly.** The shell executor either disappears or survives as a narrow, explicitly-audited `ResourceSpec::ShellJob` variant that documents itself as an escape hatch.
|
||||
|
||||
**Cons:**
|
||||
- Temporarily maintains two vocabularies (`ResourceSpec` at the edge, `Score` in the core). There is a risk they drift before they reconverge.
|
||||
- Requires team discipline to actually do the C migration and not leave B as the permanent design.
|
||||
|
||||
**Verdict:** Recommended.
|
||||
|
||||
---
|
||||
|
||||
## Recommendation
|
||||
|
||||
**Adopt Alternative D (Hybrid: typed manifests now, Scores later).**
|
||||
|
||||
Reasoning:
|
||||
|
||||
1. **Speed to IoT MVP** is real. Alternative C is a 3-6 month refactor of the Score layer before we can deploy anything; Alternative B can ship within the current iteration.
|
||||
2. **Long-term coherence with Harmony's design philosophy** is preserved because D has an explicit migration seam to C. We do not paint ourselves into a corner.
|
||||
3. **The mini-kubelet framing is directly satisfied by B.** Typed resources, reconciler loops, observed-generation pattern, PLEG-style drift detection. This is exactly what the team has been describing.
|
||||
4. **Capability-trait discipline carries over cleanly.** `Reconciler` is the agent-side analog of a capability trait (`DnsServer`, `K8sclient`, etc.). The rule "capabilities are industry concepts, not tools" applies to `ResourceSpec` too: we name it `Container`, not `Podman`; `SystemdUnit`, not `Systemctl`.
|
||||
5. **The shell executor is not wasted work.** It proved the NATS KV watch + typed CAS write pattern that Alternative B will also need. It becomes either `ResourceSpec::ShellJob` (audited escape hatch) or gets deleted.
|
||||
6. **Security posture improves immediately.** A fixed resource-type allowlist is dramatically tighter than "run any shell", even before we add signing or sandboxing.
|
||||
7. **The IoT product use case actually is "deploy simple workloads to Pi fleets".** Containers, systemd services, config files, network config. That is a short list, and it maps to four or five resource types. We do not need the full expressive power of a Score layer to hit the product milestone.
|
||||
|
||||
## Specific Findings on the Current Implementation
|
||||
|
||||
`harmony_agent/src/desired_state.rs` (≈250 lines, implemented on this branch):
|
||||
|
||||
- **Keep as scaffolding**, do not wire into user tooling.
|
||||
- The NATS KV watch loop, the `ActualState` CAS write, and the generation-tracking skeleton are all reusable by Alternative B. They are the only parts worth keeping.
|
||||
- The `execute_command` function (shelling out via `Command::new("sh").arg("-c")`) is the part that bakes in the wrong abstraction. It should be:
|
||||
1. **Moved behind a `ResourceSpec::ShellJob` reconciler** if we decide to keep shell as an explicit, audited escape hatch, **or**
|
||||
2. **Deleted** when the first two real reconcilers (Container, SystemdUnit) land.
|
||||
- The `DesiredStateConfig` / `ActualState` types in `harmony_agent/src/agent/config.rs` are too narrow. They should be replaced by `AgentManifest` / `AgentStatus` as sketched above. `generation: u64` at the manifest level stays; per-resource status is added.
|
||||
- The existing tests (`executes_command_and_reports_result`, `reports_failure_for_bad_command`) are testing the shell executor specifically; they will be deleted or repurposed when the resource model lands.
|
||||
|
||||
## Open Questions (to resolve before implementing B)
|
||||
|
||||
1. **What is the minimum viable resource type set for the IoT MVP?** Proposal: `Container`, `SystemdUnit`, `File`. Defer `NetworkConfig`, `ShellJob` until a concrete use case appears.
|
||||
2. **Where does `AgentManifest` live in the crate graph?** It is consumed by both the control plane and the agent. Likely `harmony_agent_types` (new) or an existing shared types crate.
|
||||
3. **How are images, files, and secrets referenced?** By content hash + asset store URL (ADR: `harmony_assets`)? By inline payload under a size cap?
|
||||
4. **What is the reconcile cadence?** On NATS KV change + periodic drift check every N seconds? What is N on a Pi?
|
||||
5. **How does `AgentStatus` interact with the heartbeat loop?** Is the status written on every reconcile, or aggregated into the heartbeat payload? The heartbeat cares about liveness; the status cares about workload health. They are probably separate KV keys, coupled by generation.
|
||||
6. **How do we handle partial failures and retry?** Exponential backoff per resource? Global pause on repeated failures? Surface to the control plane via `conditions`?
|
||||
7. **Can the agent refuse a manifest it does not understand?** (Forward compatibility: new `ResourceSpec` variant rolled out before the agent upgrade.) Proposal: fail loudly and report a typed `UnknownResource` status so the control plane can detect version skew.
|
||||
|
||||
## Decision
|
||||
|
||||
**None yet.** This ADR is explicitly a proposal to adopt **Alternative D**, pending team review. If approved, a follow-up ADR-023 will specify the concrete `AgentManifest` / `AgentStatus` schema and the initial reconciler set.
|
||||
@@ -6,6 +6,7 @@ readme.workspace = true
|
||||
license.workspace = true
|
||||
|
||||
[dependencies]
|
||||
harmony = { path = "../harmony" }
|
||||
# harmony_cli = { path = "../harmony_cli" }
|
||||
harmony_types = { path = "../harmony_types" }
|
||||
harmony_macros = { path = "../harmony_macros" }
|
||||
|
||||
@@ -2,6 +2,7 @@ use std::time::Duration;
|
||||
|
||||
use harmony_types::id::Id;
|
||||
use log::info;
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use super::heartbeat::HeartbeatFailure;
|
||||
use super::role::AgentRole;
|
||||
@@ -46,6 +47,9 @@ pub struct AgentConfig {
|
||||
#[derive(Debug, Clone)]
|
||||
pub enum DeploymentConfig {
|
||||
FailoverPostgreSQL(FailoverCNPGConfig),
|
||||
/// Desired-state convergence mode: agent watches a NATS KV key for commands
|
||||
/// to execute and reports results back. See ADR-021.
|
||||
DesiredState,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone)]
|
||||
@@ -53,6 +57,38 @@ pub struct FailoverCNPGConfig {
|
||||
pub cnpg_cluster_name: String,
|
||||
}
|
||||
|
||||
/// The desired state pushed by the central platform to an agent via NATS KV.
|
||||
/// Key: `desired-state.<agent-id>`
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct DesiredStateConfig {
|
||||
/// Shell command to execute (passed to `sh -c`)
|
||||
pub command: String,
|
||||
/// Monotonic generation counter. The agent only executes when this changes.
|
||||
pub generation: u64,
|
||||
}
|
||||
|
||||
/// The actual state reported by the agent after executing a desired-state command.
|
||||
/// Key: `actual-state.<agent-id>`
|
||||
#[derive(Debug, Clone, Serialize, Deserialize)]
|
||||
pub struct ActualState {
|
||||
pub agent_id: String,
|
||||
/// Mirrors the generation from the DesiredStateConfig that was executed
|
||||
pub generation: u64,
|
||||
pub status: ExecutionStatus,
|
||||
pub stdout: String,
|
||||
pub stderr: String,
|
||||
pub exit_code: Option<i32>,
|
||||
/// Timestamp (ms since epoch) when the command was executed, from agent clock (informational)
|
||||
pub executed_at: u64,
|
||||
}
|
||||
|
||||
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
|
||||
pub enum ExecutionStatus {
|
||||
Success,
|
||||
Failed,
|
||||
Running,
|
||||
}
|
||||
|
||||
impl DeploymentConfig {
|
||||
/// The actual "work" of the heartbeat (e.g., write to NATS, check Postgres)
|
||||
pub async fn perform_heartbeat(&self) -> Result<(), HeartbeatFailure> {
|
||||
@@ -62,6 +98,11 @@ impl DeploymentConfig {
|
||||
// TODO: Implement actual PG check / NATS write here
|
||||
Ok(())
|
||||
}
|
||||
DeploymentConfig::DesiredState => {
|
||||
// In desired-state mode, heartbeat just proves liveness.
|
||||
// Workload execution happens in the separate desired-state watcher task.
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@@ -18,7 +18,10 @@ pub mod heartbeat;
|
||||
mod role;
|
||||
|
||||
// Re-exports for backwards compatibility
|
||||
pub use config::{AgentConfig, DeploymentConfig, FailoverCNPGConfig};
|
||||
pub use config::{
|
||||
ActualState, AgentConfig, DeploymentConfig, DesiredStateConfig, ExecutionStatus,
|
||||
FailoverCNPGConfig,
|
||||
};
|
||||
pub use heartbeat::{AgentHeartbeat, AgentInfo, ClusterStateData, HeartbeatFailure};
|
||||
pub use role::AgentRole;
|
||||
|
||||
|
||||
246
harmony_agent/src/desired_state.rs
Normal file
246
harmony_agent/src/desired_state.rs
Normal file
@@ -0,0 +1,246 @@
|
||||
//! Desired-state convergence watcher (ADR-021).
|
||||
//!
|
||||
//! Polls a NATS KV key `desired-state.<agent-id>` for workload definitions.
|
||||
//! When the generation changes, executes the command and writes the result
|
||||
//! to `actual-state.<agent-id>`.
|
||||
|
||||
use std::sync::Arc;
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use log::{debug, error, info, warn};
|
||||
use tokio::process::Command;
|
||||
|
||||
use crate::agent::{ActualState, DesiredStateConfig, ExecutionStatus};
|
||||
use crate::store::{KvStore, KvStoreError};
|
||||
|
||||
/// Runs the desired-state convergence loop for a single agent.
|
||||
///
|
||||
/// This task polls the KV store for changes to the desired state and executes
|
||||
/// commands when the generation counter advances. It is designed to run as a
|
||||
/// separate tokio task alongside the heartbeat loop.
|
||||
pub async fn run_desired_state_watcher<S>(
|
||||
agent_id: String,
|
||||
kv: Arc<S>,
|
||||
poll_interval: Duration,
|
||||
) where
|
||||
S: KvStore + Send + Sync + 'static,
|
||||
{
|
||||
let desired_key = format!("desired-state.{agent_id}");
|
||||
let actual_key = format!("actual-state.{agent_id}");
|
||||
let mut last_generation: Option<u64> = None;
|
||||
|
||||
info!("Desired-state watcher started for agent {agent_id}, polling key: {desired_key}");
|
||||
|
||||
loop {
|
||||
match fetch_desired_state(&kv, &desired_key).await {
|
||||
Ok(Some(desired)) => {
|
||||
if last_generation == Some(desired.generation) {
|
||||
debug!(
|
||||
"Desired state generation {} already executed, skipping",
|
||||
desired.generation
|
||||
);
|
||||
} else {
|
||||
info!(
|
||||
"New desired state detected: generation={}, command='{}'",
|
||||
desired.generation, desired.command
|
||||
);
|
||||
|
||||
let result = execute_command(&agent_id, &desired).await;
|
||||
|
||||
match store_actual_state(&kv, &actual_key, &result).await {
|
||||
Ok(_) => {
|
||||
info!(
|
||||
"Actual state stored: generation={}, status={:?}, exit_code={:?}",
|
||||
result.generation, result.status, result.exit_code
|
||||
);
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to store actual state: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
last_generation = Some(desired.generation);
|
||||
}
|
||||
}
|
||||
Ok(None) => {
|
||||
debug!("No desired state found for {agent_id}, waiting...");
|
||||
}
|
||||
Err(e) => {
|
||||
warn!("Error fetching desired state for {agent_id}: {e}");
|
||||
}
|
||||
}
|
||||
|
||||
tokio::time::sleep(poll_interval).await;
|
||||
}
|
||||
}
|
||||
|
||||
async fn fetch_desired_state<S>(
|
||||
kv: &Arc<S>,
|
||||
key: &str,
|
||||
) -> Result<Option<DesiredStateConfig>, KvStoreError>
|
||||
where
|
||||
S: KvStore + Send + Sync,
|
||||
{
|
||||
match kv.get(key).await {
|
||||
Ok(result) => {
|
||||
if let Some(value) = result.value {
|
||||
let config: DesiredStateConfig =
|
||||
serde_json::from_value(value).map_err(|e| KvStoreError::DeserializationFailed {
|
||||
deserialization_error: e.to_string(),
|
||||
value: String::new(),
|
||||
})?;
|
||||
Ok(Some(config))
|
||||
} else {
|
||||
Ok(None)
|
||||
}
|
||||
}
|
||||
Err(KvStoreError::KeyNotAvailable(_)) => Ok(None),
|
||||
Err(e) => Err(e),
|
||||
}
|
||||
}
|
||||
|
||||
async fn execute_command(agent_id: &str, desired: &DesiredStateConfig) -> ActualState {
|
||||
let now = SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.expect("Time went backwards")
|
||||
.as_millis() as u64;
|
||||
|
||||
info!("Executing command for generation {}: sh -c '{}'", desired.generation, desired.command);
|
||||
|
||||
let output = Command::new("sh")
|
||||
.arg("-c")
|
||||
.arg(&desired.command)
|
||||
.output()
|
||||
.await;
|
||||
|
||||
match output {
|
||||
Ok(output) => {
|
||||
let exit_code = output.status.code();
|
||||
let status = if output.status.success() {
|
||||
ExecutionStatus::Success
|
||||
} else {
|
||||
ExecutionStatus::Failed
|
||||
};
|
||||
|
||||
let stdout = String::from_utf8_lossy(&output.stdout).to_string();
|
||||
let stderr = String::from_utf8_lossy(&output.stderr).to_string();
|
||||
|
||||
if !stdout.is_empty() {
|
||||
debug!("stdout: {stdout}");
|
||||
}
|
||||
if !stderr.is_empty() {
|
||||
warn!("stderr: {stderr}");
|
||||
}
|
||||
|
||||
ActualState {
|
||||
agent_id: agent_id.to_string(),
|
||||
generation: desired.generation,
|
||||
status,
|
||||
stdout,
|
||||
stderr,
|
||||
exit_code,
|
||||
executed_at: now,
|
||||
}
|
||||
}
|
||||
Err(e) => {
|
||||
error!("Failed to spawn command: {e}");
|
||||
ActualState {
|
||||
agent_id: agent_id.to_string(),
|
||||
generation: desired.generation,
|
||||
status: ExecutionStatus::Failed,
|
||||
stdout: String::new(),
|
||||
stderr: format!("Failed to spawn command: {e}"),
|
||||
exit_code: None,
|
||||
executed_at: now,
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
async fn store_actual_state<S>(
|
||||
kv: &Arc<S>,
|
||||
key: &str,
|
||||
state: &ActualState,
|
||||
) -> Result<(), KvStoreError>
|
||||
where
|
||||
S: KvStore + Send + Sync,
|
||||
{
|
||||
let value = serde_json::to_value(state).map_err(|e| KvStoreError::DeserializationFailed {
|
||||
deserialization_error: e.to_string(),
|
||||
value: format!("{state:?}"),
|
||||
})?;
|
||||
|
||||
// Use get to find current sequence, then set_strict for safe writes.
|
||||
// If the key doesn't exist yet, use sequence 0 for the first write.
|
||||
let expected_seq = match kv.get(key).await {
|
||||
Ok(result) => result.metadata.sequence,
|
||||
Err(KvStoreError::KeyNotAvailable(_)) => 0,
|
||||
Err(e) => return Err(e),
|
||||
};
|
||||
|
||||
kv.set_strict(key, value, expected_seq).await?;
|
||||
Ok(())
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
use crate::store::InMemoryKvStore;
|
||||
use serde_json::json;
|
||||
|
||||
#[tokio::test]
|
||||
async fn executes_command_and_reports_result() {
|
||||
let kv = Arc::new(InMemoryKvStore::new());
|
||||
let agent_id = "test-agent";
|
||||
let desired_key = format!("desired-state.{agent_id}");
|
||||
let actual_key = format!("actual-state.{agent_id}");
|
||||
|
||||
// Write a desired state
|
||||
let desired = DesiredStateConfig {
|
||||
command: "echo hello".to_string(),
|
||||
generation: 1,
|
||||
};
|
||||
let value = serde_json::to_value(&desired).unwrap();
|
||||
kv.set_strict(&desired_key, value, 0).await.unwrap();
|
||||
|
||||
// Fetch and execute
|
||||
let fetched = fetch_desired_state(&kv, &desired_key).await.unwrap().unwrap();
|
||||
assert_eq!(fetched.generation, 1);
|
||||
assert_eq!(fetched.command, "echo hello");
|
||||
|
||||
let result = execute_command(agent_id, &fetched).await;
|
||||
assert_eq!(result.status, ExecutionStatus::Success);
|
||||
assert_eq!(result.stdout.trim(), "hello");
|
||||
assert_eq!(result.generation, 1);
|
||||
assert_eq!(result.exit_code, Some(0));
|
||||
|
||||
// Store actual state
|
||||
store_actual_state(&kv, &actual_key, &result).await.unwrap();
|
||||
|
||||
// Verify stored value
|
||||
let stored = kv.get(&actual_key).await.unwrap();
|
||||
let actual: ActualState = serde_json::from_value(stored.value.unwrap()).unwrap();
|
||||
assert_eq!(actual.generation, 1);
|
||||
assert_eq!(actual.status, ExecutionStatus::Success);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn reports_failure_for_bad_command() {
|
||||
let desired = DesiredStateConfig {
|
||||
command: "exit 42".to_string(),
|
||||
generation: 2,
|
||||
};
|
||||
|
||||
let result = execute_command("test-agent", &desired).await;
|
||||
assert_eq!(result.status, ExecutionStatus::Failed);
|
||||
assert_eq!(result.exit_code, Some(42));
|
||||
assert_eq!(result.generation, 2);
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn returns_none_when_no_desired_state() {
|
||||
let kv = Arc::new(InMemoryKvStore::new());
|
||||
let result = fetch_desired_state(&kv, "desired-state.nonexistent").await.unwrap();
|
||||
assert!(result.is_none());
|
||||
}
|
||||
}
|
||||
@@ -9,6 +9,7 @@ use crate::{
|
||||
|
||||
// mod agent_loop;
|
||||
mod agent;
|
||||
pub mod desired_state;
|
||||
pub mod store;
|
||||
mod workflow;
|
||||
|
||||
|
||||
Reference in New Issue
Block a user