The previous e2e harness handrolled k8s manifests in `stack.rs`,
bypassing the Score-Topology-Interpret machinery harmony exists to
provide. This commit:
1. **ADR-023** codifies the rules: deploy with Scores (not
manifests), e2e uses the same Scores as production, one Score
per component, deploy blocks on smoke-test success, deploy logic
lives in `*-deploy` crates, topologies are compile-time,
thiserror over anyhow. CLAUDE.md mirrors the principles.
2. **New `fleet/harmony-fleet-deploy` crate** is the canonical home
for fleet-component Scores:
- `FleetOperatorScore` + helm-chart generator + `install_crds`
moved out of `harmony::modules::fleet::operator` (they should
never have lived in `harmony` core). `FleetServerScore`
(composite of NATS + operator + Zitadel + callout) moved too.
- New `FleetNatsScore` (preset over `NatsHelmChartScore` with
fleet's required values; v1 supports `UserPass` auth, callout
mode reserved on the public API for PR 1.5).
- New `FleetAgentScore` with `FleetAgentTarget::Pod`; `Vm`
target is a future variant that absorbs `FleetDeviceSetupScore`.
- `harmony-fleet-deploy` binary built on the existing
`harmony_cli` crate — no new CLI scaffolding.
3. **Operator runtime binary trimmed**: `Install` and `Chart`
subcommands removed; both jobs now belong to
`harmony-fleet-deploy`. The runtime binary becomes leaner.
4. **E2E harness rewritten** as a thin Score composer:
`harmony-fleet-e2e/src/stack.rs` deploys the stack via
`FleetNatsScore` + `FleetAgentScore`. The inline NATS manifest
factory and the bespoke agent Pod renderer are gone.
- Bring-up runs once per test binary via `shared_stack` +
`tokio::sync::OnceCell` (matches the `fleet_e2e_demo` pattern).
- Stale `e2e-*` namespaces from prior runs get pruned at
startup so the leaks the OnceCell creates don't compound.
5. **`thiserror` for the agent's `CommandServer`** — replaces the
anyhow-based surface with typed `CommandError` /
`CommandServerError`.
6. **Memory** captures eight load-bearing principles (saved to
`~/.claude/projects/.../memory/`) so future sessions don't drift
back into manifest-handrolling.
Verified: `cargo test -p harmony-fleet-e2e --test ping` green
end-to-end against k3d in 25s warm.
154 lines
5.4 KiB
Rust
154 lines
5.4 KiB
Rust
//! Agent-side request/reply command server.
//!
//! Subscribes to `device-commands.<device_id>.>` and dispatches one
//! handler per verb. Single-shot replies for v1; streaming verbs
//! (logs, exec follow-up) will reuse this loop and write multiple
//! frames to the inbox, terminating with the `X-Harmony-Final`
//! header.
//!
//! Runs alongside the KV reconciler in the agent's top-level
//! `tokio::select!`. Independent of the podman runtime: when
//! `[agent] runtime_enabled = false`, the reconciler is skipped but
//! the command server still runs (ping is useful for "is this device
//! online" health-checks regardless).
|
|
use std::sync::Arc;
|
|
use std::time::Instant;
|
|
|
|
use async_nats::Client;
|
|
use async_nats::Subject;
|
|
use futures_util::StreamExt;
|
|
use harmony_reconciler_contracts::{
|
|
HDR_REQUEST_ID, Id, PingReply, Verb, device_command_subscription,
|
|
};
|
|
use serde::Serialize;
|
|
use thiserror::Error;
|
|
|
|
pub struct CommandServer {
|
|
device_id: Id,
|
|
client: Client,
|
|
agent_version: &'static str,
|
|
started_at: Instant,
|
|
}
|
|
|
|
impl CommandServer {
|
|
pub fn new(device_id: Id, client: Client) -> Self {
|
|
Self {
|
|
device_id,
|
|
client,
|
|
agent_version: env!("CARGO_PKG_VERSION"),
|
|
started_at: Instant::now(),
|
|
}
|
|
}
|
|
|
|
pub async fn run(self: Arc<Self>) -> Result<(), CommandServerError> {
|
|
let subject = device_command_subscription(&self.device_id.to_string());
|
|
tracing::info!(subject = %subject, "command server subscribing");
|
|
let mut sub = self.client.subscribe(subject.clone()).await.map_err(|e| {
|
|
CommandServerError::Subscribe {
|
|
subject: subject.clone(),
|
|
source: e,
|
|
}
|
|
})?;
|
|
while let Some(msg) = sub.next().await {
|
|
let me = self.clone();
|
|
tokio::spawn(async move {
|
|
match me.dispatch(msg).await {
|
|
Ok(()) => tracing::debug!("command handled"),
|
|
Err(e) => {
|
|
tracing::error!(command_error = %e, "failed to handle command")
|
|
}
|
|
};
|
|
});
|
|
}
|
|
tracing::warn!("command server subscription ended");
|
|
Ok(())
|
|
}
|
|
|
|
async fn dispatch(&self, msg: async_nats::Message) -> Result<(), CommandError> {
|
|
// Subject token after the device id is the verb. Pattern is
|
|
// `device-commands.<id>.<verb>` — we own both ends so this
|
|
// unwrap shape is safe under normal routing.
|
|
// FIXME do not unwrap here, we cannot affoard to crash an entire fleet because a verb is
|
|
// added or removed or format changed. Log an error and move on maybe we could list supported verbs.
|
|
let verb_token = if let Some(verb) = msg.subject.rsplit('.').next() {
|
|
verb
|
|
} else {
|
|
return Err(CommandError::InvalidFormat(msg.subject.to_string()));
|
|
};
|
|
let request_id = msg
|
|
.headers
|
|
.as_ref()
|
|
.and_then(|h| h.get(HDR_REQUEST_ID))
|
|
.map(|v| v.as_str().to_string());
|
|
tracing::debug!(
|
|
subject = %msg.subject,
|
|
verb = %verb_token,
|
|
request_id = ?request_id,
|
|
"command dispatch",
|
|
);
|
|
|
|
let reply_to = match msg.reply.clone() {
|
|
Some(inbox) => inbox,
|
|
None => {
|
|
tracing::warn!(verb = %verb_token, "command without reply inbox; ignoring");
|
|
return Err(CommandError::MissingReplyInbox);
|
|
}
|
|
};
|
|
|
|
if verb_token == Verb::Ping.as_subject_token() {
|
|
self.reply_ping(reply_to).await?;
|
|
Ok(())
|
|
} else {
|
|
tracing::warn!(verb = %verb_token, "unknown command verb");
|
|
Err(CommandError::UnknownVerb(verb_token.to_string()))
|
|
}
|
|
}
|
|
|
|
async fn reply_ping(&self, reply_to: Subject) -> Result<(), CommandError> {
|
|
let reply = PingReply {
|
|
device_id: self.device_id.clone(),
|
|
agent_version: self.agent_version.to_string(),
|
|
uptime_s: self.started_at.elapsed().as_secs(),
|
|
};
|
|
let payload = serde_json::to_vec(&reply).map_err(CommandError::SerializeReply)?;
|
|
self.client
|
|
.publish(reply_to, payload.into())
|
|
.await
|
|
.map_err(|e| CommandError::PublishReply(e.to_string()))
|
|
}
|
|
}
|
|
|
|
/// Failure modes the per-message dispatcher can report. Stays
|
|
/// `pub(crate)` for now — the run loop logs and continues on each
|
|
/// variant rather than surfacing them to a caller.
|
|
#[derive(Debug, Error, Serialize)]
|
|
pub(crate) enum CommandError {
|
|
#[error("invalid command subject: {0}")]
|
|
InvalidFormat(String),
|
|
#[error("unknown verb: {0}")]
|
|
UnknownVerb(String),
|
|
#[error("command message had no reply inbox")]
|
|
MissingReplyInbox,
|
|
#[error("serializing reply: {0}")]
|
|
// `serde_json::Error` is not `Serialize`, so flatten on the
|
|
// serialize-out path. The original error stays in `Display`.
|
|
#[serde(skip)]
|
|
SerializeReply(serde_json::Error),
|
|
#[error("publishing reply: {0}")]
|
|
PublishReply(String),
|
|
}
|
|
|
|
/// Surface returned by [`CommandServer::run`]. The only currently
|
|
/// failing operation is the initial subscribe; per-message errors
|
|
/// stay inside the loop and are logged.
|
|
#[derive(Debug, Error)]
|
|
pub enum CommandServerError {
|
|
#[error("subscribing to {subject}")]
|
|
Subscribe {
|
|
subject: String,
|
|
#[source]
|
|
source: async_nats::SubscribeError,
|
|
},
|
|
}
|