Files
harmony/fleet/harmony-fleet-agent/src/command_server.rs
Jean-Gabriel Gill-Couture 020ebcb1f9 refactor(fleet): deploy-architecture cleanup per ADR-023 — Scores everywhere, deploy crate, principles in CLAUDE.md
The previous e2e harness handrolled k8s manifests in `stack.rs`,
bypassing the Score-Topology-Interpret machinery harmony exists to
provide. This commit:

1. **ADR-023** codifies the rules: deploy with Scores (not
   manifests), e2e uses the same Scores as production, one Score
   per component, deploy blocks on smoke-test success, deploy logic
   lives in `*-deploy` crates, topologies are compile-time,
   thiserror over anyhow. CLAUDE.md mirrors the principles.

2. **New `fleet/harmony-fleet-deploy` crate** is the canonical home
   for fleet-component Scores:
   - `FleetOperatorScore` + helm-chart generator + `install_crds`
     moved out of `harmony::modules::fleet::operator` (they should
     never have lived in `harmony` core). `FleetServerScore`
     (composite of NATS + operator + Zitadel + callout) moved too.
   - New `FleetNatsScore` (preset over `NatsHelmChartScore` with
     fleet's required values; v1 supports `UserPass` auth, callout
     mode reserved on the public API for PR 1.5).
   - New `FleetAgentScore` with `FleetAgentTarget::Pod`; `Vm`
     target is a future variant that absorbs `FleetDeviceSetupScore`.
   - `harmony-fleet-deploy` binary built on the existing
     `harmony_cli` crate — no new CLI scaffolding.

3. **Operator runtime binary trimmed**: `Install` and `Chart`
   subcommands removed; both jobs now belong to
   `harmony-fleet-deploy`. The runtime binary becomes leaner.

4. **E2E harness rewritten** as a thin Score composer:
   `harmony-fleet-e2e/src/stack.rs` deploys the stack via
   `FleetNatsScore` + `FleetAgentScore`. The inline NATS manifest
   factory and the bespoke agent Pod renderer are gone.
   - Bring-up runs once per test binary via `shared_stack` +
     `tokio::sync::OnceCell` (matches the `fleet_e2e_demo` pattern).
   - Stale `e2e-*` namespaces from prior runs get pruned at
     startup so the leaks the OnceCell creates don't compound.

5. **`thiserror` for the agent's `CommandServer`** — replaces the
   anyhow-based surface with typed `CommandError` /
   `CommandServerError`.

6. **Memory** captures eight load-bearing principles (saved to
   `~/.claude/projects/.../memory/`) so future sessions don't drift
   back into manifest-handrolling.

Verified: `cargo test -p harmony-fleet-e2e --test ping` green
end-to-end against k3d in 25s warm.
2026-05-18 22:54:50 -04:00

154 lines
5.4 KiB
Rust

//! Agent-side request/reply command server.
//!
//! Subscribes to `device-commands.<device_id>.>` and dispatches one
//! handler per verb. Single-shot replies for v1; streaming verbs
//! (logs, exec follow-up) will reuse this loop and write multiple
//! frames to the inbox, terminating with the `X-Harmony-Final`
//! header.
//!
//! Runs alongside the KV reconciler in the agent's top-level
//! `tokio::select!`. Independent of the podman runtime: when
//! `[agent] runtime_enabled = false`, the reconciler is skipped but
//! the command server still runs (ping is useful for "is this device
//! online" health-checks regardless).
use std::sync::Arc;
use std::time::Instant;
use async_nats::Client;
use async_nats::Subject;
use futures_util::StreamExt;
use harmony_reconciler_contracts::{
HDR_REQUEST_ID, Id, PingReply, Verb, device_command_subscription,
};
use serde::Serialize;
use thiserror::Error;
pub struct CommandServer {
device_id: Id,
client: Client,
agent_version: &'static str,
started_at: Instant,
}
impl CommandServer {
pub fn new(device_id: Id, client: Client) -> Self {
Self {
device_id,
client,
agent_version: env!("CARGO_PKG_VERSION"),
started_at: Instant::now(),
}
}
pub async fn run(self: Arc<Self>) -> Result<(), CommandServerError> {
let subject = device_command_subscription(&self.device_id.to_string());
tracing::info!(subject = %subject, "command server subscribing");
let mut sub = self.client.subscribe(subject.clone()).await.map_err(|e| {
CommandServerError::Subscribe {
subject: subject.clone(),
source: e,
}
})?;
while let Some(msg) = sub.next().await {
let me = self.clone();
tokio::spawn(async move {
match me.dispatch(msg).await {
Ok(()) => tracing::debug!("command handled"),
Err(e) => {
tracing::error!(command_error = %e, "failed to handle command")
}
};
});
}
tracing::warn!("command server subscription ended");
Ok(())
}
async fn dispatch(&self, msg: async_nats::Message) -> Result<(), CommandError> {
// Subject token after the device id is the verb. Pattern is
// `device-commands.<id>.<verb>` — we own both ends so this
// unwrap shape is safe under normal routing.
// FIXME do not unwrap here, we cannot affoard to crash an entire fleet because a verb is
// added or removed or format changed. Log an error and move on maybe we could list supported verbs.
let verb_token = if let Some(verb) = msg.subject.rsplit('.').next() {
verb
} else {
return Err(CommandError::InvalidFormat(msg.subject.to_string()));
};
let request_id = msg
.headers
.as_ref()
.and_then(|h| h.get(HDR_REQUEST_ID))
.map(|v| v.as_str().to_string());
tracing::debug!(
subject = %msg.subject,
verb = %verb_token,
request_id = ?request_id,
"command dispatch",
);
let reply_to = match msg.reply.clone() {
Some(inbox) => inbox,
None => {
tracing::warn!(verb = %verb_token, "command without reply inbox; ignoring");
return Err(CommandError::MissingReplyInbox);
}
};
if verb_token == Verb::Ping.as_subject_token() {
self.reply_ping(reply_to).await?;
Ok(())
} else {
tracing::warn!(verb = %verb_token, "unknown command verb");
Err(CommandError::UnknownVerb(verb_token.to_string()))
}
}
async fn reply_ping(&self, reply_to: Subject) -> Result<(), CommandError> {
let reply = PingReply {
device_id: self.device_id.clone(),
agent_version: self.agent_version.to_string(),
uptime_s: self.started_at.elapsed().as_secs(),
};
let payload = serde_json::to_vec(&reply).map_err(CommandError::SerializeReply)?;
self.client
.publish(reply_to, payload.into())
.await
.map_err(|e| CommandError::PublishReply(e.to_string()))
}
}
/// Failure modes the per-message dispatcher can report. Stays
/// `pub(crate)` for now — the run loop logs and continues on each
/// variant rather than surfacing them to a caller.
#[derive(Debug, Error, Serialize)]
pub(crate) enum CommandError {
#[error("invalid command subject: {0}")]
InvalidFormat(String),
#[error("unknown verb: {0}")]
UnknownVerb(String),
#[error("command message had no reply inbox")]
MissingReplyInbox,
#[error("serializing reply: {0}")]
// `serde_json::Error` is not `Serialize`, so flatten on the
// serialize-out path. The original error stays in `Display`.
#[serde(skip)]
SerializeReply(serde_json::Error),
#[error("publishing reply: {0}")]
PublishReply(String),
}
/// Surface returned by [`CommandServer::run`]. The only currently
/// failing operation is the initial subscribe; per-message errors
/// stay inside the loop and are logged.
#[derive(Debug, Error)]
pub enum CommandServerError {
#[error("subscribing to {subject}")]
Subscribe {
subject: String,
#[source]
source: async_nats::SubscribeError,
},
}