feat(fleet-deploy): log-tail contract as a Score companion #295

Open
johnride wants to merge 2 commits from feat/v0-3-logs-companion into feat/smoke-test-contract
18 changed files with 1570 additions and 1582 deletions

2
Cargo.lock generated
View File

@@ -4086,7 +4086,9 @@ name = "harmony-fleet-deploy"
version = "0.1.0"
dependencies = [
"anyhow",
"async-nats",
"async-trait",
"chrono",
"clap",
"env_logger",
"harmony",

View File

@@ -25,7 +25,9 @@ harmony-fleet-auth = { path = "../harmony-fleet-auth" }
harmony-reconciler-contracts = { path = "../../harmony-reconciler-contracts" }
anyhow = { workspace = true }
async-nats = { workspace = true }
async-trait = { workspace = true }
chrono = { workspace = true, features = ["serde"] }
clap = { workspace = true }
k8s-openapi = { workspace = true }
kube = { workspace = true, features = ["runtime", "derive"] }

View File

@@ -0,0 +1,220 @@
//! The value a [`LogQuery`](super::LogQuery) returns: a chunk of log
//! lines with the metadata the dashboard needs to render it honestly.
//!
//! Kept as a pure data type — no transport, no async, no IO. The
//! transport layer (the `PodmanLogQuery` impl, the agent's command
//! handler) constructs and consumes it; the dashboard renders it.
//! That separation makes the trait easy to mock for unit tests and
//! easy to evolve (e.g. add `level` per line) without churning every
//! call site.
use chrono::{DateTime, Utc};
use thiserror::Error;
/// Maximum byte length of a [`LogSource`] string. Bounded so a
/// misbehaving agent can't OOM the dashboard with a multi-megabyte
/// "source" field.
pub const LOG_SOURCE_MAX_BYTES: usize = 128;
/// Stable identifier for the log stream a [`LogChunk`] came from
/// (e.g. `podman:my-deployment/web`).
///
/// **Validated newtype**: the value originates on a NATS reply from
/// an untrusted-until-authenticated agent, so we enforce shape
/// constraints at the deserialization boundary. Rules:
///
/// - non-empty
/// - no ASCII control characters (`\0`..`\x1f`, `\x7f`)
/// - at most [`LOG_SOURCE_MAX_BYTES`] bytes (UTF-8 byte length)
///
/// A bad value is surfaced via [`LogQueryError::InvalidReply`], not
/// folded into a generic decode failure — the dashboard logs the
/// rejected string for triage.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct LogSource(String);
impl LogSource {
pub fn try_new(name: impl Into<String>) -> Result<Self, InvalidLogSource> {
let s = name.into();
if s.is_empty() {
return Err(InvalidLogSource::Empty);
}
if s.len() > LOG_SOURCE_MAX_BYTES {
return Err(InvalidLogSource::TooLong {
bytes: s.len(),
max: LOG_SOURCE_MAX_BYTES,
});
}
if let Some(c) = s.chars().find(|c| c.is_control()) {
return Err(InvalidLogSource::ControlChar(c));
}
Ok(LogSource(s))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for LogSource {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.write_str(&self.0)
}
}
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InvalidLogSource {
#[error("log source name must not be empty")]
Empty,
#[error("log source name is {bytes} bytes; max is {max}")]
TooLong { bytes: usize, max: usize },
#[error("log source name contains control char {0:?}")]
ControlChar(char),
}
/// A bounded chunk of log lines fetched from a device.
///
/// "Chunk" is deliberate: this isn't "the logs" — it's the last N
/// lines, captured at one moment, in their original ordering. The
/// dashboard renders the chunk; refreshing fetches a new chunk. v0.4
/// will add a streaming variant; for v0.3 the customer's mental
/// model is "show me what just happened", which a chunk answers
/// directly.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogChunk {
/// Stable identifier for the log stream this chunk came from
/// (e.g. `podman:my-deployment/web`). Validated by [`LogSource`]
/// at deserialization time — see that type's docs for the shape
/// constraints.
pub source: LogSource,
/// When the chunk was captured **on the device**, not when the
/// operator received it. Surfaces clock skew between operator and
/// device — "logs from 12 seconds ago" is honest, "logs as of
/// now" would be a lie.
pub captured_at: DateTime<Utc>,
/// Oldest-first. Empty when the container has produced no output
/// yet. Each entry is one container log line as the runtime
/// reported it; we do not split / merge / re-encode.
pub lines: Vec<String>,
/// `true` when the runtime had more output than fit in the chunk
/// (either the per-request size budget or the runtime's own tail
/// limit). Surfacing this lets the dashboard show a "showing last
/// N of more" hint instead of silently hiding lines.
pub truncated: bool,
}
impl LogChunk {
/// `true` when the device has produced no log lines yet for this
/// deployment. Distinct from "the device is offline" (that's a
/// transport error, not an empty chunk) — the dashboard renders
/// the two differently.
pub fn is_empty(&self) -> bool {
self.lines.is_empty()
}
/// How many lines this chunk carries.
pub fn len(&self) -> usize {
self.lines.len()
}
}
/// Why a log query failed.
///
/// Four arms, deliberately mapped to four different operator actions:
///
/// - [`LogQueryError::DeviceOffline`] — no agent listening. The
/// dashboard shows "device is offline"; retrying immediately is
/// pointless.
/// - [`LogQueryError::Timeout`] — agent didn't reply in time. Could
/// be a slow podman socket, a saturated network, or a hung agent.
/// The dashboard offers a retry button.
/// - [`LogQueryError::Agent`] — the agent replied but reported its
/// own failure (deployment not found, container has no log driver,
/// etc.). The dashboard renders the message verbatim — it's the
/// agent's responsibility to make these actionable.
/// - [`LogQueryError::BadReply`] — agent replied with bytes that
/// couldn't be deserialized. Indicates a version mismatch between
/// operator and agent; the dashboard shows "device protocol
/// mismatch — update the agent".
/// - [`LogQueryError::Transport`] — generic NATS transport failure
/// (publish failed, subscribe failed). Rare; usually a NATS server
/// problem, not a device problem.
///
/// We mirror the categorization that `FleetCommandsClient::CommandError`
/// uses for `Ping` — both share the same NATS request/reply substrate,
/// so callers benefit from a uniform error shape across verbs.
#[derive(Debug, Error)]
pub enum LogQueryError {
#[error("device offline (no agent listening on the logs subject)")]
DeviceOffline,
#[error("log query timed out after {0:?}")]
Timeout(std::time::Duration),
#[error("agent reported an error: {0}")]
Agent(String),
#[error("agent reply decode failed: {0}")]
BadReply(#[from] serde_json::Error),
#[error("nats transport: {0}")]
Transport(String),
/// The wire payload was syntactically valid but semantically
/// unusable — e.g. the agent returned a `source` string that
/// failed [`LogSource`] validation. We surface this as a
/// distinct arm (rather than folding into `BadReply`) so the
/// dashboard can log the bad value: it's almost always an agent
/// bug.
#[error("invalid reply contents: {0}")]
InvalidReply(String),
}
#[cfg(test)]
mod tests {
use super::*;
fn pn(s: &str) -> LogSource {
LogSource::try_new(s).expect("valid")
}
#[test]
fn empty_chunk_reports_is_empty() {
// "No lines yet" is a legal, common state — the dashboard
// renders it as "no output". A separate state from "device
// offline".
let chunk = LogChunk {
source: pn("podman:hello"),
captured_at: Utc::now(),
lines: vec![],
truncated: false,
};
assert!(chunk.is_empty());
assert_eq!(chunk.len(), 0);
}
#[test]
fn populated_chunk_preserves_line_order() {
// The trait contract is "oldest first". Reordering would
// misrender the dashboard (newest line at top of an
// oldest-first list is the *first* entry).
let chunk = LogChunk {
source: pn("podman:hello"),
captured_at: Utc::now(),
lines: vec!["a".into(), "b".into(), "c".into()],
truncated: false,
};
assert_eq!(chunk.lines, vec!["a", "b", "c"]);
assert_eq!(chunk.len(), 3);
assert!(!chunk.is_empty());
}
#[test]
fn truncated_flag_is_independent_of_emptiness() {
// A chunk can be both truncated and have lines (we cut the
// head) — assert both states coexist.
let chunk = LogChunk {
source: pn("podman:hello"),
captured_at: Utc::now(),
lines: vec!["latest".into()],
truncated: true,
};
assert!(!chunk.is_empty());
assert!(chunk.truncated);
}
}

View File

@@ -0,0 +1,125 @@
//! Log-tail contract — the third Score companion (ADR-023 P7).
//!
//! ## The big picture
//!
//! ```text
//! ┌────────────┐ pair ┌─────────────┐ request ┌──────────┐
//! │ *Score │◀─type─────│ LogQuery │──────────────▶│ Agent │
//! └────────────┘ locked └─────────────┘ (NATS r/r) └──────────┘
//! │ │
//! └─── deserialize reply ◀─────┘
//! │
//! ▼
//! LogChunk
//! ```
//!
//! - [`LogQuery`] is the companion trait: pair a [`Score`] with a way
//! to fetch the last N lines of *that specific Score's* container
//! logs from the device that's running it. The pairing is type-level
//! via the `Score` associated type — the compiler refuses to call
//! `log_query.last_lines(&podman_score, …)` with a query whose
//! `Score = FleetOperatorScore`. Same trick the smoke contract uses
//! (see [`super::smoke::SmokeTest`]).
//! - [`LogChunk`] is the value the trait returns. It's a pure data
//! type — easy to serialize for the dashboard, easy to assert
//! against in tests, no transport types leak through.
//! - [`PodmanLogQuery`] is the only implementor in v0.3: it sends a
//! `device-commands.<id>.logs` request over NATS, deserializes the
//! reply into [`LogChunk`]. Agent-side handler + dashboard wiring
//! ship in a follow-up PR (see "Deferred" below).
//!
//! ## Why a companion and not a method on `Score`?
//!
//! Per ADR-023 P7, framework capabilities that "wrap" a Score (smoke,
//! planning, observability, log tail) attach as companions rather
//! than as new methods on `Score`/`Interpret`. The base trait surface
//! stays one method (`create_interpret`). Log tail is *something
//! paired with* a Score, not *something on* a Score — and a Score
//! without a `LogQuery` should render "no logs view" on the
//! dashboard, not become a compilation error.
//!
//! The orthogonal benefits, which mirror smoke's:
//!
//! - A Score can be deployed with one of several log backends (podman,
//! journald, k8s) without rewriting the Score itself.
//! - Test harnesses substitute a mock `LogQuery` for an end-to-end
//! probe of the dashboard's "View logs" path, without spinning up a
//! real device.
//! - Adding a "live tail (stream)" verb in v0.4 is an additive trait
//! method on `LogQuery`, not a breaking change to `Score`.
//!
//! ## Why sync `last_lines` and not a stream?
//!
//! v0.3 ships **single-shot** log tail only. Customer's mental model
//! is "I clicked View logs, show me the last 100 lines". That's a
//! `Future<LogChunk>`, not a `Stream<Item = String>`. The
//! request/reply NATS pattern (one inbox, one reply, then close) maps
//! 1:1 to that mental model and reuses the same dispatcher the
//! `Ping` verb already runs through.
//!
//! Live tail is genuinely useful — but it's a separate UX (the
//! dashboard needs scrollback management, the agent needs to attach
//! to a running container's log stream, the wire format needs frame
//! framing + final-marker handling) — and shipping it half-baked
//! would degrade the "View logs" experience the customer is asking
//! for *today*. v0.4 adds a `tail_stream` method to this trait,
//! additive.
//!
//! ## Wire shape (locked in this PR)
//!
//! Subject: `device-commands.<device_id>.logs` — exactly the existing
//! callout-permitted prefix (see
//! `harmony_reconciler_contracts::SUBJECT_PREFIX`). The deployment
//! name and the line count live in the JSON request body, not in the
//! subject, because:
//!
//! 1. The agent already subscribes to `device-commands.<id>.>`; the
//! permission template covers this subject without any callout
//! JWT change (zero ops surface).
//! 2. Putting the deployment in the subject would alias the verb
//! token, breaking
//! [`harmony_reconciler_contracts::Verb::as_subject_token`]'s
//! "verb is the trailing token" invariant.
//!
//! The request body is [`harmony_reconciler_contracts::LogsRequest`],
//! the reply body is [`harmony_reconciler_contracts::LogsReply`].
//! Those types live in `reconciler-contracts` (not here) so the
//! agent build, which must not depend on harmony, can serialize them.
//! This companion is a thin wrapper around that contract — it owns
//! transport, the contract crate owns the bytes.
//!
//! ## Deferred to follow-up PRs (in scope of v0.3)
//!
//! - **Agent-side command handler** for `Verb::Logs` in
//! `fleet/harmony-fleet-agent/src/command_server.rs`: parse the
//! `LogsRequest`, look up the deployment's container, run
//! `podman logs --tail N`, serialize a `LogsReply`. The
//! deployment→container resolution must reject names that don't
//! match `[a-zA-Z0-9_.-]{1,128}` before invoking the runtime —
//! defense-in-depth on top of `Command::arg` (which already
//! avoids shell parsing). That regex is *stricter* than
//! [`harmony_reconciler_contracts::DeploymentName`] on the wire,
//! reflecting that the agent's resolved container name has tighter
//! constraints than a deployment identifier.
//! - **Dashboard handler** at
//! `/deployments/<name>/devices/<id>/logs?lines=N` in
//! `fleet/harmony-fleet-operator/src/frontend/server.rs`:
//! instantiate a `PodmanLogQuery`, call `last_lines`, render the
//! `LogChunk` as maud.
//! - **E2E integration test** that drives a real podman container
//! end-to-end through this trait.
//!
//! Splitting this is a scope-discipline call: the trait + value +
//! NATS-transport impl + unit tests are merge-worthy on their own
//! because they lock the contract. Once merged, the agent handler PR
//! is a focused diff against `command_server.rs` and the dashboard
//! handler PR is a focused diff against `server.rs` — both small,
//! both reviewable, both backed by this PR's existing tests.
pub mod chunk;
pub mod podman;
pub mod query;
pub use chunk::{LogChunk, LogQueryError};
pub use podman::PodmanLogQuery;
pub use query::LogQuery;

View File

@@ -0,0 +1,378 @@
//! `PodmanLogQuery` — the NATS request/reply implementation of
//! [`LogQuery`] for a [`PodmanV0Score`].
//!
//! Transport: NATS request on
//! `device-commands.<device_id>.logs`. Body: a JSON
//! [`LogsRequest`] carrying the deployment name and a clamped line
//! count. Reply: a JSON [`LogsReply`] which we re-wrap into a
//! [`LogChunk`].
//!
//! The query is constructed with the device id and deployment name
//! it targets — those are the routing parameters that the dashboard
//! handler already has on the URL path
//! (`/deployments/<name>/devices/<id>/logs`). The `LogQuery` trait
//! method takes `&Self::Score` for the compile-time pairing
//! guarantee, but the wire request doesn't need the score body
//! itself (the agent reads its own deployment KV).
//!
//! Why not let the trait method take `device_id`/`deployment` as
//! parameters? Because the trait must stay backend-agnostic. A
//! future `K8sLogQuery` doesn't have a device id; it has a pod name.
//! Routing parameters belong on the implementor, not on the trait.
//!
//! ## Split between routing and transport
//!
//! [`LogQueryRouting`] is the pure routing value: device id +
//! deployment + line cap, no transport. It owns `subject()` and
//! `request_body()` so unit tests can verify the wire bytes without
//! constructing a NATS client. [`PodmanLogQuery`] adds the
//! `async_nats::Client` on top and implements [`LogQuery`].
use std::time::Duration;
use async_nats::Client;
use async_nats::error::Error as NatsError;
use async_trait::async_trait;
use harmony::modules::podman::{PodmanTopology, PodmanV0Score};
use harmony_reconciler_contracts::{
CommandRequest, DeploymentName, ErrorReply, Id, LOGS_MAX_LINES, LogsReply, LogsRequest, Verb,
device_command_subject,
};
use super::chunk::{LogChunk, LogQueryError, LogSource};
use super::query::LogQuery;
/// Default reply timeout for a single log query. 10 s gives a slow
/// podman socket (cold-cached image, busy device) room to breathe;
/// offline devices get short-circuited earlier by NATS's
/// `no_responders` reply, so the timeout only matters when the agent
/// is alive but slow.
pub const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10);
/// Routing parameters for one log query. Pure data — no NATS client,
/// no IO. Lets unit tests verify the wire bytes without a connected
/// client.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct LogQueryRouting {
pub device_id: Id,
pub deployment: DeploymentName,
}
impl LogQueryRouting {
pub fn new(device_id: Id, deployment: DeploymentName) -> Self {
Self {
device_id,
deployment,
}
}
/// Build the NATS subject this query publishes to. Kept on the
/// routing value (not on the transport struct) so the agent-side
/// dispatcher tests in the follow-up PR can use the same helper
/// without pulling in a NATS client.
pub fn subject(&self) -> String {
device_command_subject(self.device_id.to_string().as_str(), Verb::Logs)
}
/// Serialize the wire body. Clamps `n` to [`LOGS_MAX_LINES`] so
/// a buggy "show all" dashboard button can't ask for an
/// unbounded transfer.
pub fn request_body(&self, n: usize) -> Vec<u8> {
let req = CommandRequest::Logs(LogsRequest {
deployment: self.deployment.clone(),
lines: clamp_lines(n),
});
serde_json::to_vec(&req).expect("CommandRequest is always serializable")
}
}
/// Pin the conversion `usize -> u32` to one place so the clamping
/// behavior is testable in isolation. We saturate at
/// [`LOGS_MAX_LINES`] both because (a) the agent will clamp anyway,
/// and (b) sending a request advertising a smaller `lines` value
/// than the agent's cap makes the protocol log easier to read.
fn clamp_lines(n: usize) -> u32 {
let n_u32 = u32::try_from(n).unwrap_or(LOGS_MAX_LINES);
n_u32.min(LOGS_MAX_LINES)
}
/// Operator-side log fetcher for a podman-runtime device.
///
/// Holds the [`LogQueryRouting`] for the device+deployment it
/// targets plus the connected NATS client; the dashboard builds one
/// per request from the URL parts, then calls
/// `last_lines(&score, &topology, n)`.
pub struct PodmanLogQuery {
nc: Client,
routing: LogQueryRouting,
timeout: Duration,
}
impl PodmanLogQuery {
pub fn new(nc: Client, device_id: Id, deployment: DeploymentName) -> Self {
Self {
nc,
routing: LogQueryRouting::new(device_id, deployment),
timeout: DEFAULT_TIMEOUT,
}
}
/// Override the default reply timeout. Tests use a very small
/// value to keep the suite fast; production sticks with
/// [`DEFAULT_TIMEOUT`].
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn routing(&self) -> &LogQueryRouting {
&self.routing
}
}
#[async_trait]
impl LogQuery<PodmanTopology> for PodmanLogQuery {
type Score = PodmanV0Score;
async fn last_lines(
&self,
_score: &Self::Score,
_topology: &PodmanTopology,
n: usize,
) -> Result<LogChunk, LogQueryError> {
let subject = self.routing.subject();
let payload = self.routing.request_body(n);
let fut = self.nc.request(subject, payload.into());
let resp = match tokio::time::timeout(self.timeout, fut).await {
Ok(Ok(msg)) => msg,
Ok(Err(err)) => return Err(map_request_error(err, self.timeout)),
Err(_) => return Err(LogQueryError::Timeout(self.timeout)),
};
decode_reply(&resp.payload)
}
}
/// Map an `async-nats` `RequestError` to our typed surface. Same
/// shape `FleetCommandsClient` uses for `Ping` — we propagate
/// `NoResponders` and `TimedOut` so the dashboard can render them
/// differently.
fn map_request_error(
err: NatsError<async_nats::client::RequestErrorKind>,
timeout: Duration,
) -> LogQueryError {
use async_nats::client::RequestErrorKind;
match err.kind() {
RequestErrorKind::NoResponders => LogQueryError::DeviceOffline,
RequestErrorKind::TimedOut => LogQueryError::Timeout(timeout),
_ => LogQueryError::Transport(err.to_string()),
}
}
/// Decode a reply payload. The agent can reply with either a
/// [`LogsReply`] (happy path) or an [`ErrorReply`] (the agent
/// successfully received the request but couldn't fulfil it). We try
/// `LogsReply` first because it's the common case; only if that
/// fails do we attempt `ErrorReply`. Falling all the way through
/// yields [`LogQueryError::InvalidReply`].
///
/// `pub(crate)` so unit tests can drive it without a live NATS
/// server. Not part of the public API — callers always go through
/// [`PodmanLogQuery::last_lines`].
pub(crate) fn decode_reply(bytes: &[u8]) -> Result<LogChunk, LogQueryError> {
// Happy path first — keeps the cold path cost on errors only.
if let Ok(reply) = serde_json::from_slice::<LogsReply>(bytes) {
let source = LogSource::try_new(reply.source.clone())
.map_err(|e| LogQueryError::InvalidReply(format!("invalid source: {e}")))?;
return Ok(LogChunk {
source,
captured_at: reply.captured_at,
lines: reply.lines,
truncated: reply.truncated,
});
}
if let Ok(err) = serde_json::from_slice::<ErrorReply>(bytes) {
return Err(LogQueryError::Agent(err.message));
}
// Neither shape matched — the agent is speaking a protocol we
// don't understand. The dashboard will render this as a version
// mismatch.
Err(LogQueryError::InvalidReply(format!(
"reply did not match LogsReply or ErrorReply ({} bytes)",
bytes.len()
)))
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::{DateTime, Utc};
use harmony_reconciler_contracts::{ErrorKind, ErrorReply};
fn dn(s: &str) -> DeploymentName {
DeploymentName::try_new(s).expect("valid")
}
fn routing() -> LogQueryRouting {
LogQueryRouting::new(Id::from("pi-42".to_string()), dn("hello-web"))
}
// -- clamp_lines ---------------------------------------------------------
#[test]
fn clamp_lines_caps_at_protocol_max() {
// Operators can call `last_lines(usize::MAX, …)` — the wire
// value must stay within the budget the agent enforces.
assert_eq!(clamp_lines(usize::MAX), LOGS_MAX_LINES);
assert_eq!(clamp_lines(LOGS_MAX_LINES as usize + 1), LOGS_MAX_LINES);
}
#[test]
fn clamp_lines_passes_through_small_values() {
assert_eq!(clamp_lines(0), 0);
assert_eq!(clamp_lines(1), 1);
assert_eq!(clamp_lines(100), 100);
}
// -- LogQueryRouting::subject() -----------------------------------------
#[test]
fn subject_matches_documented_format() {
// The subject the agent's existing `device-commands.<id>.>`
// subscription receives. If this drifts, the callout
// permission template stops covering the request — silent
// routing failure under JWT enforcement.
let r = routing();
assert_eq!(r.subject(), "device-commands.pi-42.logs");
}
// -- LogQueryRouting::request_body() ------------------------------------
#[test]
fn request_body_is_tagged_json_with_clamped_lines() {
// Locks the wire format. Older agents that learn `Verb::Logs`
// expect this exact shape on receipt.
let r = routing();
let body = r.request_body(50);
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(parsed["verb"], "logs");
assert_eq!(parsed["deployment"], "hello-web");
assert_eq!(parsed["lines"], 50);
}
#[test]
fn request_body_clamps_oversized_n() {
// A dashboard with a buggy "show all" button must not get
// through unchecked. We clamp on the operator side too —
// defense in depth, and a more honest log message ("asked
// for 1000") than "asked for usize::MAX, agent capped".
let r = routing();
let body = r.request_body(usize::MAX);
let parsed: serde_json::Value = serde_json::from_slice(&body).unwrap();
assert_eq!(parsed["lines"], LOGS_MAX_LINES);
}
// -- decode_reply() ------------------------------------------------------
#[test]
fn decode_reply_unwraps_logs_reply_into_log_chunk() {
let reply = LogsReply {
source: "podman:hello-web/api".to_string(),
captured_at: DateTime::parse_from_rfc3339("2026-01-15T12:34:56Z")
.unwrap()
.with_timezone(&Utc),
lines: vec!["line a".into(), "line b".into()],
truncated: false,
};
let bytes = serde_json::to_vec(&reply).unwrap();
let chunk = decode_reply(&bytes).expect("valid LogsReply must decode");
assert_eq!(chunk.source.as_str(), "podman:hello-web/api");
assert_eq!(chunk.lines, vec!["line a", "line b"]);
assert!(!chunk.truncated);
}
#[test]
fn decode_reply_carries_truncated_flag() {
// The dashboard renders "showing last N of more" off this
// flag. If we accidentally drop it on the wire, that hint
// disappears and customers think they have full output.
let reply = LogsReply {
source: "podman:hello-web/api".to_string(),
captured_at: Utc::now(),
lines: vec!["only".into()],
truncated: true,
};
let bytes = serde_json::to_vec(&reply).unwrap();
let chunk = decode_reply(&bytes).unwrap();
assert!(chunk.truncated);
}
#[test]
fn decode_reply_surfaces_agent_error_reply_as_agent_variant() {
// The agent can fail without crashing — e.g. the deployment
// exists in KV but the container hasn't started yet. That
// must surface as `Agent(message)` so the dashboard renders
// the agent's own description, not "device offline".
let err = ErrorReply {
kind: ErrorKind::BadRequest,
message: "deployment hello-web has no containers running".into(),
};
let bytes = serde_json::to_vec(&err).unwrap();
let result = decode_reply(&bytes);
match result {
Err(LogQueryError::Agent(msg)) => {
assert!(msg.contains("hello-web"));
}
other => panic!("expected Agent variant, got {other:?}"),
}
}
#[test]
fn decode_reply_rejects_invalid_source_name() {
// A malicious or buggy agent could return a `source` with
// control characters that would corrupt the dashboard.
// LogSource validation catches this — we surface it as
// InvalidReply, not a panic, and not a successful chunk.
let bad = serde_json::json!({
"source": "bad\nname",
"captured_at": "2026-01-15T12:00:00Z",
"lines": [],
"truncated": false,
});
let bytes = serde_json::to_vec(&bad).unwrap();
match decode_reply(&bytes) {
Err(LogQueryError::InvalidReply(_)) => {}
other => panic!("expected InvalidReply, got {other:?}"),
}
}
#[test]
fn decode_reply_falls_through_to_invalid_for_unknown_shape() {
// Neither LogsReply nor ErrorReply — a future verb's reply
// accidentally routed here, or an upgraded agent talking a
// newer protocol. The dashboard shows "protocol mismatch".
let bytes = br#"{"completely":"unrelated"}"#;
match decode_reply(bytes) {
Err(LogQueryError::InvalidReply(_)) => {}
other => panic!("expected InvalidReply, got {other:?}"),
}
}
// -- LogQuery pairing (compile-time) ------------------------------------
/// Compile-time test: `PodmanLogQuery::Score == PodmanV0Score`.
/// If a future refactor changes the associated type without
/// updating callers, this test fails to compile — which is
/// exactly the compile-time-safety guarantee the companion
/// pattern exists to provide.
#[test]
fn paired_score_type_is_podman_v0_score() {
fn assert_pair<Q>()
where
Q: LogQuery<PodmanTopology, Score = PodmanV0Score>,
{
}
assert_pair::<PodmanLogQuery>();
}
}

View File

@@ -0,0 +1,66 @@
//! The `LogQuery` trait — companion contract for "fetch the last N
//! lines of this Score's container logs".
//!
//! The trait pairs a [`Score`] with a way to query its logs, with the
//! pairing locked at the type level via the `Score` associated type.
//! Same mechanism the smoke contract uses; same compile-time
//! guarantee that callers can't pass a `PodmanLogQuery` to a
//! `FleetOperatorScore`.
//!
//! Object safety: kept *not* object-safe on purpose. Callers in v0.3
//! always know the concrete Score type at the call site (the
//! dashboard handler is `Score = PodmanV0Score` at compile time —
//! topologies are compile-time per ADR-023 P6). When a second Score
//! type starts shipping logs (e.g. k8s `Deployment`), we add another
//! impl, not a `Box<dyn LogQuery>`.
use async_trait::async_trait;
use harmony::score::Score;
use harmony::topology::Topology;
use super::chunk::{LogChunk, LogQueryError};
/// Pair a [`Score`] with a way to fetch the last N lines of its
/// container logs.
///
/// Implementors live next to the transport they use:
/// [`super::PodmanLogQuery`] for NATS request/reply against the fleet
/// agent's podman runtime. Future impls (`K8sLogQuery`,
/// `JournaldLogQuery`) follow the same shape.
///
/// Why a trait and not a free function? Because mocking is the point
/// of this companion. The dashboard handler takes
/// `&dyn LogQuery<…>` (statically dispatched in v0.3, but the trait
/// shape is mock-friendly): the e2e test substitutes a
/// `FixtureLogQuery` that returns a canned `LogChunk` without needing
/// a real device, a real NATS server, or a real podman socket. That
/// makes the dashboard's "View logs" path testable in seconds, not
/// minutes.
#[async_trait]
pub trait LogQuery<T: Topology>: Send + Sync {
/// The Score this query is paired with. Type-locked so the
/// compiler refuses `podman.last_lines(&operator_score, …)` —
/// the wrong Score type doesn't even compile.
type Score: Score<T>;
/// Fetch the most recent `n` lines of `score`'s container logs.
///
/// Semantics:
/// - Returns oldest-first lines, up to `n`.
/// - If the runtime has fewer than `n` lines, returns all of them
/// with `LogChunk::truncated = false`.
/// - If the per-request size budget cuts the response,
/// `LogChunk::truncated = true`.
/// - `n == 0` is legal and returns an empty chunk. Callers should
/// prefer a sensible default (e.g. 100) at the UI layer rather
/// than relying on this edge case.
///
/// Failure modes are categorized via [`LogQueryError`].
async fn last_lines(
&self,
score: &Self::Score,
topology: &T,
n: usize,
) -> Result<LogChunk, LogQueryError>;
}

View File

@@ -13,13 +13,19 @@
//! - [`AgentObservation`] — derived view of a fleet agent's KV watch
//! filter, computed from a typed `AgentConfig`. Used by the e2e
//! harness to write desired-state keys the agent will pick up.
//! - [`smoke`] — the smoke-test contract. `Probe` trait, `SmokeSuite`
//! composition, `SmokeTest` companion, and the `deploy`/
//! `deploy_with_smoke` wrappers that bind a Score's interpret to
//! an optional smoke run. Implements ADR-023 P4 ("deploy returns
//! only after smoke-test success") via the companion seam from P7.
//! - [`smoke`] — the smoke-test contract. One `SmokeTest` trait with
//! a single `verify` method returning a `SmokeReport`, plus
//! `deploy` / `deploy_with_smoke` free functions. Implements
//! ADR-023 P4 ("deploy returns only after smoke-test success") via
//! the companion seam from P7.
//! - [`logs`] — the log-tail contract. `LogQuery` companion trait,
//! `LogChunk` value, and the `PodmanLogQuery` NATS request/reply
//! implementation. Powers the dashboard's "View logs" path —
//! customer clicks the button, gets the last N lines from the
//! device. Live tail (streaming) is v0.4.
pub mod agent;
pub mod logs;
pub mod smoke;
pub use agent::AgentObservation;

View File

@@ -0,0 +1,633 @@
//! Smoke-test companion — minimal shape.
//!
//! Implements ADR-023 P4 ("deploy returns only after smoke-test
//! success") and P7 ("extend Scores with companions, not API
//! changes"). The earlier draft of this module decomposed smoke into
//! `Probe` / `ProbeAttempt` / `ProbeOutcome` / `ProbeFailure` /
//! `ProbeName` / `RetryPolicy` / `SmokeSuite` / `SmokeStage` /
//! `SmokeReport` / `SmokeTest` / `SmokeAssemblyError`. That was
//! cardinality matching gone overboard for what is, in the end, "run
//! an async function after deploy and surface a pipeline report."
//! This rewrite picks the smallest seam that still buys the things we
//! actually need.
//!
//! ## What we kept
//!
//! - [`SmokeTest`] — one trait, one async method. Implementer writes a
//! normal Rust function and returns a [`SmokeReport`].
//! - Type-level pairing via the associated `type Interpret`
//! (suggested in PR #292 review): one smoke can cover every Score
//! that shares an Interpret (e.g. `HelmChartScore` + any preset on
//! top of it). **Pairing is convention-only in v0.3** —
//! `Score::create_interpret` returns a `Box<dyn Interpret<T>>`, so
//! the `deploy_with_smoke` call site can't statically refuse a
//! mismatched smoke. Closing that gap is an additive refactor on
//! the `Score` trait (`type Interpret: Interpret<T>;`) and is
//! deliberately deferred — the API shape supports it.
//! - [`deploy`] / [`deploy_with_smoke`] — free functions. `deploy`
//! runs the Score; `deploy_with_smoke` runs the Score then blocks
//! on smoke.
//! - Pipeline data the dashboard renders top-to-bottom
//! ([`CheckReport`] entries on [`SmokeReport`]).
//!
//! ## What we dropped, and what to do instead
//!
//! | Old type | Replaced by |
//! |---------------------------------------|-----------------------------------------------------------------------------|
//! | `Probe`, `ProbeAttempt`, `ProbeOutcome`, `ProbeFailure` | Write a normal `async fn` inside `verify`; push `CheckReport`s into a `Vec`. |
//! | `ProbeName` (validated newtype) | `&'static str` — the names come from our own static code, not user input. |
//! | `RetryPolicy` value type | [`poll_until`] free function. Pass the budget + interval directly. |
//! | `SmokeSuite` builder | A `Vec<CheckReport>` collected inside `verify`. No intermediate type. |
//! | `TcpReachable` probe impl | [`tcp_reachable`] free function. Call it; it returns a `CheckReport`. |
//!
//! Reusability is preserved: the helpers ([`poll_until`],
//! [`tcp_reachable`]) compose by simple function call. A future
//! `http_healthy(url)` or `k8s_pod_ready(client, name)` belongs in
//! this module as another `async fn -> CheckReport`, not as another
//! trait.
use std::future::Future;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use harmony::interpret::{Interpret, InterpretError, Outcome};
use harmony::inventory::Inventory;
use harmony::score::Score;
use harmony::topology::Topology;
use thiserror::Error;
use tracing::{info, info_span};
/// Pair an [`Interpret`] type with an async smoke verification.
///
/// One method. Implementers compose their checks inline; no probe
/// trait, no suite builder, no retry-policy value type. If retries
/// are needed, wrap the closure with [`poll_until`]; otherwise just
/// `await` once.
///
/// Associating with `Interpret` (not the concrete `Score`) lets a
/// single smoke impl cover a family of Scores that share an
/// Interpret. Convention-only enforcement in v0.3 — see the
/// module-level docs.
#[async_trait]
pub trait SmokeTest<T: Topology>: Send + Sync {
/// The Interpret class this smoke verifies. Documentary in v0.3;
/// becomes compile-time enforceable once `Score` gains its own
/// `type Interpret`.
type Interpret: Interpret<T>;
/// Run all checks. Implementers are free to short-circuit on
/// first failure, or run every check to give the dashboard a
/// full per-step picture. The framework reads `report.passed()`
/// to gate the deploy and renders `report.checks` for the user.
async fn verify(&self, topology: &T) -> SmokeReport;
}
/// Aggregated smoke result. Dashboards render `checks` top-to-bottom.
///
/// A report is "passing" iff it has at least one check AND every
/// check passed. An empty report passes nothing — the framework
/// rejects empty reports the same way it rejects failing ones, so a
/// smoke impl that forgets to push any checks fails loudly instead
/// of letting a deploy through.
#[derive(Debug, Clone, Default)]
pub struct SmokeReport {
pub checks: Vec<CheckReport>,
}
impl SmokeReport {
pub fn passed(&self) -> bool {
!self.checks.is_empty() && self.checks.iter().all(|c| c.passed)
}
pub fn is_empty(&self) -> bool {
self.checks.is_empty()
}
/// Iterator over only the failures — handy for compact error
/// rendering without filtering at the call site.
pub fn failed(&self) -> impl Iterator<Item = &CheckReport> {
self.checks.iter().filter(|c| !c.passed)
}
}
/// One step's result, named for the dashboard and the operator's
/// log. `detail` is `Some` whenever there's something useful to show
/// (a connect error, the elapsed time, the response body's first
/// line) — leave it `None` for trivial passes.
#[derive(Debug, Clone)]
pub struct CheckReport {
pub name: &'static str,
pub passed: bool,
pub detail: Option<String>,
}
impl CheckReport {
pub fn pass(name: &'static str) -> Self {
Self {
name,
passed: true,
detail: None,
}
}
pub fn pass_with(name: &'static str, detail: impl Into<String>) -> Self {
Self {
name,
passed: true,
detail: Some(detail.into()),
}
}
pub fn fail(name: &'static str, detail: impl Into<String>) -> Self {
Self {
name,
passed: false,
detail: Some(detail.into()),
}
}
}
/// Poll a fallible async closure until it returns `Ok` or `budget`
/// elapses. Returns a single [`CheckReport`] capturing the outcome.
///
/// Use this inside [`SmokeTest::verify`] when you need retries.
/// Otherwise just `await` once and build a `CheckReport` from the
/// result — no helper required.
pub async fn poll_until<F, Fut>(
name: &'static str,
budget: Duration,
interval: Duration,
check: F,
) -> CheckReport
where
F: Fn() -> Fut,
Fut: Future<Output = Result<(), String>>,
{
let deadline = Instant::now() + budget;
loop {
match check().await {
Ok(()) => return CheckReport::pass(name),
Err(e) => {
if Instant::now() >= deadline {
return CheckReport::fail(name, format!("did not pass within {budget:?}: {e}"));
}
tokio::time::sleep(interval).await;
}
}
}
}
/// One TCP connect attempt against `addr`, gated by `timeout`.
///
/// Wrap with [`poll_until`] for retry semantics:
///
/// ```ignore
/// poll_until("nats-reachable", Duration::from_secs(30), Duration::from_secs(1), || async {
/// tcp_reachable("once", "nats:4222", Duration::from_secs(1))
/// .await
/// .passed
/// .then_some(())
/// .ok_or_else(|| "still not reachable".to_string())
/// }).await
/// ```
pub async fn tcp_reachable(name: &'static str, addr: &str, timeout: Duration) -> CheckReport {
match tokio::time::timeout(timeout, tokio::net::TcpStream::connect(addr)).await {
Ok(Ok(_)) => CheckReport::pass(name),
Ok(Err(e)) => CheckReport::fail(name, format!("connect failed: {e}")),
Err(_) => CheckReport::fail(name, format!("timeout after {timeout:?}")),
}
}
/// What `deploy*` returns on success. `smoke` is `None` for the
/// no-smoke variant, `Some` (with `passed() == true`) otherwise.
#[derive(Debug, Clone)]
pub struct DeployOutcome {
pub interpret: Outcome,
pub smoke: Option<SmokeReport>,
}
#[derive(Debug, Error)]
pub enum DeployError {
#[error("score interpret failed: {0}")]
Interpret(#[from] InterpretError),
#[error(
"smoke failed: {} of {} check(s) did not pass",
.report.failed().count(),
.report.checks.len(),
)]
SmokeFailed {
/// The Score deployed cleanly — preserved so the dashboard
/// can render the success message next to the smoke failure.
interpret: Outcome,
report: SmokeReport,
},
}
/// Deploy a Score; do **not** run any smoke. Equivalent to calling
/// `score.interpret(...)` directly — provided so callers can switch
/// between smoke / no-smoke at one site.
pub async fn deploy<S, T>(
score: S,
inventory: &Inventory,
topology: &T,
) -> Result<DeployOutcome, DeployError>
where
S: Score<T>,
T: Topology,
{
let span = info_span!("deploy", score = score.name());
let _g = span.enter();
info!("deploy: starting interpret (no smoke)");
let interpret = score.interpret(inventory, topology).await?;
info!(status = %interpret.status, "deploy: interpret returned");
Ok(DeployOutcome {
interpret,
smoke: None,
})
}
/// Deploy a Score and block on its smoke test (ADR-023 P4).
///
/// Returns `Ok` only when every check in the report passed. An empty
/// report is rejected the same way a failing one is — a smoke impl
/// that forgot to push checks fails the deploy rather than silently
/// passing.
pub async fn deploy_with_smoke<S, ST, T>(
score: S,
smoke: &ST,
inventory: &Inventory,
topology: &T,
) -> Result<DeployOutcome, DeployError>
where
S: Score<T>,
ST: SmokeTest<T>,
T: Topology,
{
let span = info_span!("deploy", score = score.name(), smoke = true);
let _g = span.enter();
info!("deploy: starting interpret");
let interpret = score.interpret(inventory, topology).await?;
info!(status = %interpret.status, "deploy: interpret returned; running smoke");
let report = smoke.verify(topology).await;
info!(
passed = report.passed(),
checks = report.checks.len(),
failed = report.failed().count(),
"deploy: smoke returned",
);
if report.passed() {
Ok(DeployOutcome {
interpret,
smoke: Some(report),
})
} else {
Err(DeployError::SmokeFailed { interpret, report })
}
}
#[cfg(test)]
mod tests {
//! Behavioural tests for the minimal smoke shape.
//!
//! These exercise the deploy ↔ smoke handoff against a trivial
//! in-process Score / Interpret / Topology so we don't depend on
//! any external infrastructure. The fixtures are intentionally
//! tiny — they only carry what each test actually touches.
use super::*;
use async_trait::async_trait;
use harmony::data::Version;
use harmony::interpret::{InterpretName, InterpretStatus};
use harmony::inventory::Inventory;
use harmony::topology::{PreparationError, PreparationOutcome};
use harmony_types::id::Id;
use serde::Serialize;
use std::net::SocketAddr;
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::{Arc, Mutex};
use tokio::net::TcpListener;
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(&self) -> Result<PreparationOutcome, PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
/// A `Score` that records execute() calls and returns a configured
/// outcome. Tracks invocations so a test can assert "interpret ran
/// exactly once".
#[derive(Debug, Clone)]
struct CountingScore {
executed: Arc<AtomicBool>,
fail: bool,
}
impl CountingScore {
fn new() -> Self {
Self {
executed: Arc::new(AtomicBool::new(false)),
fail: false,
}
}
fn failing() -> Self {
Self {
executed: Arc::new(AtomicBool::new(false)),
fail: true,
}
}
}
impl Serialize for CountingScore {
fn serialize<S: serde::Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
s.serialize_str("CountingScore")
}
}
#[async_trait]
impl Score<NoopTopology> for CountingScore {
fn create_interpret(&self) -> Box<dyn Interpret<NoopTopology>> {
Box::new(CountingInterpret {
executed: self.executed.clone(),
fail: self.fail,
})
}
fn name(&self) -> String {
"CountingScore".into()
}
}
#[derive(Debug)]
struct CountingInterpret {
executed: Arc<AtomicBool>,
fail: bool,
}
#[async_trait]
impl Interpret<NoopTopology> for CountingInterpret {
fn get_name(&self) -> InterpretName {
InterpretName::Custom("counting")
}
fn get_version(&self) -> Version {
Version::from("0.1.0").unwrap()
}
fn get_status(&self) -> InterpretStatus {
InterpretStatus::QUEUED
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
async fn execute(
&self,
_i: &Inventory,
_t: &NoopTopology,
) -> Result<Outcome, InterpretError> {
self.executed.store(true, Ordering::SeqCst);
if self.fail {
Err(InterpretError::new(
"counting: configured to fail".to_string(),
))
} else {
Ok(Outcome::success("counting: ok".to_string()))
}
}
}
/// A test smoke that returns a pre-recorded report and records
/// whether `verify` was called. Lets each test inject the exact
/// report shape it wants to drive `deploy_with_smoke` through.
struct FixedSmoke {
report: Mutex<Option<SmokeReport>>,
verify_called: Arc<AtomicBool>,
}
impl FixedSmoke {
fn new(report: SmokeReport) -> Self {
Self {
report: Mutex::new(Some(report)),
verify_called: Arc::new(AtomicBool::new(false)),
}
}
}
#[async_trait]
impl SmokeTest<NoopTopology> for FixedSmoke {
type Interpret = CountingInterpret;
async fn verify(&self, _t: &NoopTopology) -> SmokeReport {
self.verify_called.store(true, Ordering::SeqCst);
self.report
.lock()
.unwrap()
.take()
.expect("verify called more than once")
}
}
#[tokio::test]
async fn deploy_runs_interpret_and_returns_outcome() {
let score = CountingScore::new();
let executed = score.executed.clone();
let outcome = deploy(score, &Inventory::empty(), &NoopTopology)
.await
.expect("clean interpret should produce Ok");
assert!(executed.load(Ordering::SeqCst), "interpret must run");
assert!(outcome.smoke.is_none(), "no-smoke variant has no report");
assert_eq!(outcome.interpret.message, "counting: ok");
}
#[tokio::test]
async fn deploy_propagates_interpret_failure() {
let result = deploy(CountingScore::failing(), &Inventory::empty(), &NoopTopology).await;
assert!(matches!(result, Err(DeployError::Interpret(_))));
}
#[tokio::test]
async fn deploy_with_smoke_returns_report_on_success() {
let score = CountingScore::new();
let smoke = FixedSmoke::new(SmokeReport {
checks: vec![CheckReport::pass("a"), CheckReport::pass("b")],
});
let outcome = deploy_with_smoke(score, &smoke, &Inventory::empty(), &NoopTopology)
.await
.expect("passing smoke should produce Ok");
let report = outcome.smoke.expect("smoke report present on success");
assert!(report.passed());
assert_eq!(report.checks.len(), 2);
assert!(smoke.verify_called.load(Ordering::SeqCst));
}
#[tokio::test]
async fn deploy_with_smoke_fails_when_any_check_fails() {
// Pin the deploy-blocks-on-smoke contract: a single failed
// check fails the whole deploy, regardless of how many
// passed. The `interpret` field of the error must still be
// populated so the dashboard can show what succeeded
// alongside what didn't.
let smoke = FixedSmoke::new(SmokeReport {
checks: vec![
CheckReport::pass("a"),
CheckReport::fail("b", "bad"),
CheckReport::pass("c"),
],
});
let err = deploy_with_smoke(
CountingScore::new(),
&smoke,
&Inventory::empty(),
&NoopTopology,
)
.await
.expect_err("failing smoke must error");
match err {
DeployError::SmokeFailed { report, interpret } => {
assert!(!report.passed());
assert_eq!(report.failed().count(), 1);
assert_eq!(interpret.message, "counting: ok");
}
other => panic!("expected SmokeFailed, got {other:?}"),
}
}
#[tokio::test]
async fn deploy_with_smoke_rejects_empty_report() {
// A smoke impl that forgot to push any checks must fail
// loudly. Silent pass-through would defeat the entire point
// of the contract.
let smoke = FixedSmoke::new(SmokeReport::default());
let err = deploy_with_smoke(
CountingScore::new(),
&smoke,
&Inventory::empty(),
&NoopTopology,
)
.await
.expect_err("empty smoke must error");
assert!(matches!(err, DeployError::SmokeFailed { .. }));
}
#[tokio::test]
async fn deploy_with_smoke_skips_smoke_when_interpret_fails() {
// Interpret error short-circuits — running smoke against a
// half-deployed component would produce confusing cascade
// failures.
let smoke = FixedSmoke::new(SmokeReport {
checks: vec![CheckReport::pass("never-runs")],
});
let verify_called = smoke.verify_called.clone();
let err = deploy_with_smoke(
CountingScore::failing(),
&smoke,
&Inventory::empty(),
&NoopTopology,
)
.await
.expect_err("interpret failure must error");
assert!(matches!(err, DeployError::Interpret(_)));
assert!(
!verify_called.load(Ordering::SeqCst),
"smoke must NOT run when interpret fails",
);
}
#[tokio::test]
async fn smoke_report_passed_requires_nonempty_and_all_pass() {
assert!(!SmokeReport::default().passed());
assert!(
!SmokeReport {
checks: vec![CheckReport::pass("a"), CheckReport::fail("b", "x")],
}
.passed()
);
assert!(
SmokeReport {
checks: vec![CheckReport::pass("a"), CheckReport::pass("b")],
}
.passed()
);
}
#[tokio::test]
async fn poll_until_returns_pass_when_check_succeeds_within_budget() {
let calls = Arc::new(AtomicBool::new(false));
let c = calls.clone();
let check = poll_until(
"eventually-ok",
Duration::from_secs(2),
Duration::from_millis(10),
move || {
let c = c.clone();
async move {
if c.swap(true, Ordering::SeqCst) {
Ok(())
} else {
Err("not yet".into())
}
}
},
)
.await;
assert!(check.passed);
assert_eq!(check.name, "eventually-ok");
}
#[tokio::test]
async fn poll_until_returns_fail_after_budget_elapses() {
let check = poll_until(
"never-ok",
Duration::from_millis(50),
Duration::from_millis(10),
|| async { Err("still nope".into()) },
)
.await;
assert!(!check.passed);
assert!(
check
.detail
.as_deref()
.is_some_and(|d| d.contains("still nope")),
"detail must carry the last error, got {:?}",
check.detail,
);
}
#[tokio::test]
async fn tcp_reachable_passes_against_real_listener() {
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr: SocketAddr = listener.local_addr().unwrap();
// Accept loop so connect() completes; otherwise SYN-RST.
tokio::spawn(async move {
let _ = listener.accept().await;
});
let check = tcp_reachable("loopback", &addr.to_string(), Duration::from_secs(1)).await;
assert!(check.passed, "loopback connect must succeed: {check:?}");
}
#[tokio::test]
async fn tcp_reachable_fails_with_timeout_when_no_listener() {
// RFC 5737 TEST-NET-1: guaranteed-non-routable; a SYN here
// will sit unanswered and we'll hit our own timeout deterministically.
let check = tcp_reachable("dead-net", "192.0.2.1:1", Duration::from_millis(150)).await;
assert!(!check.passed);
assert!(
check
.detail
.as_deref()
.is_some_and(|d| d.contains("timeout") || d.contains("connect failed")),
);
}
#[tokio::test]
async fn deploy_outcome_smoke_field_is_none_for_no_smoke_deploy() {
let outcome = deploy(CountingScore::new(), &Inventory::empty(), &NoopTopology)
.await
.unwrap();
assert!(outcome.smoke.is_none());
}
}

View File

@@ -1,82 +0,0 @@
//! The Score → SmokeTest pairing contract.
//!
//! [`SmokeTest`] is the companion trait that pairs a [`Score`] with
//! the probes that verify *that specific* Score's deployment worked.
//! The pairing is type-level via the `Score` associated type — the
//! compiler refuses to call
//! [`deploy_with_smoke`](super::deploy::deploy_with_smoke) with a
//! `FleetOperatorSmokeTest` for a `DnsScore`, because their
//! associated types don't match. This is the same trick that makes
//! `K8sBareTopology: PostgreSQL` an unsatisfied trait bound when you
//! accidentally try to run PostgreSQL on a topology that doesn't
//! advertise it — see ADR-024 §2 and JG's *For the Love of
//! Compilers* talk.
//!
//! ## Why not a method on `Score`?
//!
//! Per ADR-023 P7, framework capabilities attach as companions, not
//! as new methods on `Score`/`Interpret`. The base trait surface
//! stays one method (`create_interpret`). Smoke is *something paired
//! with* a Score, not *something on* a Score. The orthogonal
//! benefit: a Score can be deployed without smoke, can be deployed
//! with one of several smoke configurations (dev, prod, e2e), or
//! can have its smoke evolved independently of its interpret logic.
//!
//! ## Why `assemble` returns a value, not a future-of-checks?
//!
//! The probes a smoke test wants to run are usually known up front
//! from the Score's data — "the service name we just deployed,
//! check it on the cluster's default DNS". Resolving that into a
//! concrete `SmokeSuite` *before* the probes run means the dashboard
//! can render the pipeline (probe names, stage count) the moment
//! smoke starts. If it returned a `Stream<Item = ProbeReport>`, the
//! pipeline shape wouldn't be knowable until the run completed.
use async_trait::async_trait;
use thiserror::Error;
use harmony::score::Score;
use harmony::topology::Topology;
use super::suite::SmokeSuite;
/// Pair a [`Score`] with the [`SmokeSuite`] that proves its deploy
/// converged.
///
/// Implementors live next to the Score they pair with — the
/// `FleetOperatorSmokeTest` ships from this crate alongside
/// `FleetOperatorScore`, the same way [`crate::AgentObservation`]
/// lives alongside `FleetAgentScore`. This is the companion
/// placement rule from ADR-023 §"Extend Scores with companions".
#[async_trait]
pub trait SmokeTest<T: Topology>: Send + Sync {
/// The Score this smoke test verifies. The type lock means
/// `SM::Score = S` is enforced at every call site.
type Score: Score<T>;
/// Build the [`SmokeSuite`] that will run after the Score's
/// `interpret` succeeds. Has access to the Score so probes can
/// be parameterized on the deploy's actual inputs (service
/// names, namespaces, ports), and to the topology so probes can
/// pull credentials / endpoints from declared capabilities.
async fn assemble(
&self,
score: &Self::Score,
topology: &T,
) -> Result<SmokeSuite<T>, SmokeAssemblyError>;
}
/// Why a smoke test failed to *build* (before any probe runs).
///
/// Distinct from probe failure — assembly failure means the Score
/// is in an unexpected shape (missing endpoint, can't resolve a
/// reference). The dashboard shows this differently from a probe
/// failure: it's an authoring / configuration error, not a deployed-
/// system error.
#[derive(Debug, Clone, Error, PartialEq, Eq)]
pub enum SmokeAssemblyError {
#[error("smoke test could not derive endpoint for {what}: {detail}")]
MissingEndpoint { what: String, detail: String },
#[error("smoke test assembly failed: {0}")]
Other(String),
}

View File

@@ -1,387 +0,0 @@
//! `deploy` and `deploy_with_smoke` — the framework-glue wrappers
//! that bind a [`Score`]'s interpret to an optional [`SmokeTest`].
//!
//! These are free functions, not methods on [`Score`]/[`Maestro`]
//! /[`Interpret`]. The framework's domain directory is left
//! untouched in this PR (ADR-023 P7 — "extend Scores with companions,
//! not API changes"). If a later PR proves this shape, promoting
//! `deploy_with_smoke` to a `Maestro` convenience is a one-line
//! additive change.
//!
//! ## Opting in or out
//!
//! Two entry points, one decision at the call site:
//!
//! - [`deploy`] — run the Score's interpret, no smoke.
//! - [`deploy_with_smoke`] — run the Score's interpret, then the
//! paired [`SmokeTest`]. Returns only after the smoke run
//! completes; a failed smoke is a deploy error per the project
//! rule "deploy blocks on smoke-test".
//!
//! Callers (CLI binaries, the e2e harness, the operator's
//! reconciler) choose which to call. There is no global config knob
//! and no per-Score override — the choice is one function call.
use harmony::interpret::{InterpretError, Outcome};
use harmony::inventory::Inventory;
use harmony::score::Score;
use harmony::topology::Topology;
use thiserror::Error;
use tracing::{info, info_span};
use super::contract::{SmokeAssemblyError, SmokeTest};
use super::suite::SmokeReport;
/// What [`deploy`]/[`deploy_with_smoke`] return on success.
///
/// `interpret` is the underlying `Score::interpret` result —
/// preserved verbatim so callers that previously consumed `Outcome`
/// directly continue to work after switching to these wrappers.
/// `smoke` is `None` when the no-smoke variant was used; `Some`
/// otherwise, with `passed() == true` guaranteed (a failed smoke is
/// a [`DeployError::SmokeFailed`]).
#[derive(Debug, Clone)]
pub struct DeployOutcome {
pub interpret: Outcome,
pub smoke: Option<SmokeReport>,
}
/// All the ways a deploy can fail. Three arms, each pointing at a
/// concrete cause the operator can act on:
///
/// - `Interpret` — the Score's own work failed (helm error, kube
/// apply error, etc). Same shape callers got before smoke existed.
/// - `SmokeAssembly` — the Score deployed, but its `SmokeTest`
/// couldn't even *build* a suite. Typically means the test's
/// author is missing an endpoint reference; an authoring bug, not
/// a runtime bug.
/// - `SmokeFailed` — the Score deployed and the suite built, but
/// one or more probes failed. The full [`SmokeReport`] is
/// attached so the dashboard can render which stages passed and
/// which didn't, instead of a single opaque "deploy failed".
#[derive(Debug, Error)]
pub enum DeployError {
#[error("score interpret failed: {0}")]
Interpret(#[from] InterpretError),
#[error("smoke assembly failed: {0}")]
SmokeAssembly(#[from] SmokeAssemblyError),
#[error("smoke test failed ({} of {} probe(s) did not pass)", .report.failed().count(), .report.probes.len())]
SmokeFailed {
/// The Score deployed cleanly — `interpret` is preserved so
/// callers (and the dashboard) can render the underlying
/// success message alongside the smoke failure.
interpret: Outcome,
report: SmokeReport,
},
}
/// Deploy a Score without running any smoke checks.
///
/// Returns immediately after `score.interpret` returns. Equivalent
/// to calling `score.interpret(...)` directly — provided here so
/// callers can switch between smoke / no-smoke without changing
/// the surrounding control flow.
pub async fn deploy<T, S>(
score: S,
inventory: &Inventory,
topology: &T,
) -> Result<DeployOutcome, DeployError>
where
T: Topology,
S: Score<T>,
{
let span = info_span!("deploy", score = score.name());
let _g = span.enter();
info!("deploy: starting interpret (no smoke)");
let interpret = score.interpret(inventory, topology).await?;
info!(status = %interpret.status, "deploy: interpret returned");
Ok(DeployOutcome {
interpret,
smoke: None,
})
}
/// Deploy a Score, then run its paired SmokeTest. Block on the
/// suite — see project rule "deploy blocks on smoke-test".
///
/// The type bound `SM: SmokeTest<T, Score = S>` is where the
/// compile-time pairing lives: the `SmokeTest` you pass must
/// declare *this* Score type as its associated `Score`. A mismatch
/// is rejected at the call site, not at runtime. (See ADR-024 §2
/// and JG's compile-time-feedback principle.)
pub async fn deploy_with_smoke<T, S, SM>(
score: S,
smoke: SM,
inventory: &Inventory,
topology: &T,
) -> Result<DeployOutcome, DeployError>
where
T: Topology,
S: Score<T>,
SM: SmokeTest<T, Score = S>,
{
let span = info_span!("deploy", score = score.name(), smoke = true);
let _g = span.enter();
info!("deploy: starting interpret");
let interpret = score.interpret(inventory, topology).await?;
info!(status = %interpret.status, "deploy: interpret returned, assembling smoke");
let suite = smoke.assemble(&score, topology).await?;
info!(stages = suite.len(), "deploy: smoke suite assembled");
let report = suite.run(topology).await;
info!(
passed = report.passed(),
elapsed_ms = report.elapsed.as_millis() as u64,
"deploy: smoke finished",
);
if !report.passed() {
return Err(DeployError::SmokeFailed { interpret, report });
}
Ok(DeployOutcome {
interpret,
smoke: Some(report),
})
}
// ---- tests -----------------------------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName, RetryPolicy};
use crate::companion::smoke::suite::SmokeSuite;
use async_trait::async_trait;
use harmony::data::Version;
use harmony::interpret::{Interpret, InterpretName, InterpretStatus};
use harmony::topology::PreparationOutcome;
use harmony_types::id::Id;
use serde::Serialize;
use std::sync::Mutex;
// -- test fixtures: a NoopTopology + minimal Score + minimal Probe --------
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(
&self,
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
/// Minimal Score whose interpret can be scripted to succeed or
/// fail. Demonstrates that `deploy_with_smoke` short-circuits
/// correctly when interpret fails.
#[derive(Debug, Clone, Serialize)]
struct TrivialScore {
label: String,
should_fail: bool,
}
impl<T: Topology> Score<T> for TrivialScore {
fn create_interpret(&self) -> Box<dyn Interpret<T>> {
Box::new(TrivialInterpret {
should_fail: self.should_fail,
label: self.label.clone(),
})
}
fn name(&self) -> String {
format!("TrivialScore({})", self.label)
}
}
#[derive(Debug)]
struct TrivialInterpret {
should_fail: bool,
label: String,
}
#[async_trait]
impl<T: Topology> Interpret<T> for TrivialInterpret {
async fn execute(
&self,
_inventory: &Inventory,
_topology: &T,
) -> Result<Outcome, InterpretError> {
if self.should_fail {
Err(InterpretError::new(format!(
"scripted failure: {}",
self.label
)))
} else {
Ok(Outcome::success(format!("deployed {}", self.label)))
}
}
fn get_name(&self) -> InterpretName {
InterpretName::Custom("TrivialInterpret")
}
fn get_version(&self) -> Version {
Version::from("0.0.0").unwrap()
}
fn get_status(&self) -> InterpretStatus {
InterpretStatus::QUEUED
}
fn get_children(&self) -> Vec<Id> {
vec![]
}
}
/// A probe whose every attempt is hardcoded. Lets us script
/// "this stage passes" or "this stage rejects" in tests.
#[derive(Debug)]
struct Always {
name: ProbeName,
attempt: ProbeAttempt,
}
impl Always {
fn new(name: &str, attempt: ProbeAttempt) -> Self {
Self {
name: ProbeName::try_new(name).unwrap(),
attempt,
}
}
}
#[async_trait]
impl<T: Topology> Probe<T> for Always {
fn name(&self) -> &ProbeName {
&self.name
}
async fn check(&self, _t: &T) -> ProbeAttempt {
self.attempt.clone()
}
}
/// The shape we expect every fleet smoke test to follow: it
/// owns no state beyond what the Score gives it.
#[derive(Debug)]
struct TrivialSmoke {
outcomes: Mutex<Vec<ProbeAttempt>>,
}
impl TrivialSmoke {
fn new(outcomes: Vec<ProbeAttempt>) -> Self {
Self {
outcomes: Mutex::new(outcomes),
}
}
}
#[async_trait]
impl<T: Topology> SmokeTest<T> for TrivialSmoke {
type Score = TrivialScore;
async fn assemble(
&self,
score: &Self::Score,
_topology: &T,
) -> Result<SmokeSuite<T>, SmokeAssemblyError> {
let mut suite = SmokeSuite::new();
for (i, attempt) in self.outcomes.lock().unwrap().iter().enumerate() {
let name = format!("{}-probe-{i}", score.label);
suite = suite.stage(Always::new(&name, attempt.clone()), RetryPolicy::once());
}
Ok(suite)
}
}
// -- the actual contract tests --------------------------------------------
fn inv() -> Inventory {
Inventory::empty()
}
#[tokio::test]
async fn deploy_without_smoke_returns_outcome_and_no_report() {
let s = TrivialScore {
label: "alpha".to_string(),
should_fail: false,
};
let r = deploy(s, &inv(), &NoopTopology).await.expect("deploy ok");
assert_eq!(r.interpret.status, InterpretStatus::SUCCESS);
assert!(
r.smoke.is_none(),
"no-smoke variant must not synthesize a report"
);
}
#[tokio::test]
async fn interpret_failure_short_circuits_before_smoke_assembly() {
// If interpret fails, we MUST NOT call `smoke.assemble` —
// there is nothing deployed to verify, and running smoke
// would produce confusing cascade failures in the report.
let s = TrivialScore {
label: "beta".to_string(),
should_fail: true,
};
// Use a smoke that would panic if assembled — proves we
// never got there.
struct PanicSmoke;
#[async_trait]
impl<T: Topology> SmokeTest<T> for PanicSmoke {
type Score = TrivialScore;
async fn assemble(
&self,
_: &Self::Score,
_: &T,
) -> Result<SmokeSuite<T>, SmokeAssemblyError> {
panic!("assemble must not be called when interpret fails");
}
}
match deploy_with_smoke(s, PanicSmoke, &inv(), &NoopTopology).await {
Err(DeployError::Interpret(e)) => {
assert!(format!("{e}").contains("scripted failure"));
}
other => panic!("expected Interpret error, got {other:?}"),
}
}
#[tokio::test]
async fn all_probes_pass_yields_deployoutcome_with_report() {
let s = TrivialScore {
label: "gamma".to_string(),
should_fail: false,
};
let smoke = TrivialSmoke::new(vec![
ProbeAttempt::Ok(Some("first stage ok".to_string())),
ProbeAttempt::Ok(None),
]);
let r = deploy_with_smoke(s, smoke, &inv(), &NoopTopology)
.await
.expect("deploy + smoke ok");
let report = r.smoke.expect("smoke report present when smoke runs");
assert!(report.passed());
assert_eq!(report.probes.len(), 2);
}
#[tokio::test]
async fn probe_failure_blocks_deploy() {
let s = TrivialScore {
label: "delta".to_string(),
should_fail: false,
};
let smoke = TrivialSmoke::new(vec![
ProbeAttempt::Ok(None),
ProbeAttempt::Fatal("definitely broken".to_string()),
]);
match deploy_with_smoke(s, smoke, &inv(), &NoopTopology).await {
Err(DeployError::SmokeFailed { interpret, report }) => {
// The Score itself succeeded — that's preserved, so
// the dashboard can show "deploy converged, smoke
// disagrees".
assert_eq!(interpret.status, InterpretStatus::SUCCESS);
assert!(!report.passed());
assert_eq!(report.failed().count(), 1);
assert_eq!(report.probes.last().unwrap().name.as_str(), "delta-probe-1");
}
other => panic!("expected SmokeFailed, got {other:?}"),
}
}
}

View File

@@ -1,62 +0,0 @@
//! Smoke-test contract — the second Score companion (ADR-023 P7,
//! ADR-023 P4 "deploy returns only after smoke-test success").
//!
//! ## The big picture
//!
//! ```text
//! ┌────────────┐ pair ┌──────────────┐ assemble ┌────────────┐
//! │ *Score │◀─type─────│ SmokeTest │─────────────▶│ SmokeSuite │
//! └────────────┘ locked └──────────────┘ └────────────┘
//! ▲ │
//! │ │
//! deploy_with_smoke ─── interpret ─── then ── run probes ──▶ SmokeReport
//! ```
//!
//! - [`Probe`] is the single-attempt unit, classifying its
//! observation as [`Ok`/`Retry`/`Fatal`](probe::ProbeAttempt). It's
//! also the stage the dashboard renders.
//! - [`SmokeSuite`] is an ordered sequence of probes with per-stage
//! [`RetryPolicy`]. Sequential by design; parallel stages deferred.
//! - [`SmokeTest`] pairs a [`Score`](harmony::score::Score) with a
//! suite by associated type — a `SmokeTest<Score = FleetOperatorScore>`
//! cannot be passed to `deploy_with_smoke(DnsScore, …)` because the
//! compiler refuses the mismatch.
//! - [`deploy`] / [`deploy_with_smoke`] are free functions that
//! bind a Score's interpret to an optional SmokeTest. Returns
//! only after the smoke run completes — see
//! [`DeployError::SmokeFailed`] for the failure shape.
//!
//! ## Where this lives
//!
//! Inside `harmony-fleet-deploy` for Phase 0, alongside the
//! existing [`AgentObservation`](super::AgentObservation) companion.
//! When a second consumer (PostgreSQL deploy, monitoring stack)
//! adopts this contract, the module promotes to a top-level
//! `harmony-smoke` crate with no API churn — the move is mechanical
//! because nothing in here depends on fleet types.
//!
//! ## What this does NOT do
//!
//! - **No new `HarmonyEvent` variants in Phase 0.** Stage progress
//! is emitted via `tracing::info!` events inside `info_span!`s.
//! A later PR can add `HarmonyEvent::SmokeStage{Started,Finished}`
//! if the dashboard wants the typed seam.
//! - **No `Score`/`Interpret` trait edits.** Per ADR-023 P7. The
//! smoke seam is purely companion-shaped.
//! - **No CLI flag yet.** Phase 1 wires `harmony-fleet-deploy <cmd>
//! --no-smoke` once a real `FleetOperatorSmokeTest` exists.
pub mod contract;
pub mod deploy;
pub mod probe;
pub mod probes;
pub mod suite;
pub use contract::{SmokeAssemblyError, SmokeTest};
pub use deploy::{DeployError, DeployOutcome, deploy, deploy_with_smoke};
pub use probe::{
InvalidProbeName, Probe, ProbeAttempt, ProbeFailure, ProbeName, ProbeOutcome, RetryPolicy,
run_probe,
};
pub use probes::TcpReachable;
pub use suite::{ProbeReport, SmokeReport, SmokeSuite};

View File

@@ -1,515 +0,0 @@
//! Probe contract — the single-attempt unit of a smoke run.
//!
//! A [`Probe`] is what gets visualized as one stage of the smoke
//! pipeline. Each implementation answers one question against a
//! deployed system: "is this port reachable", "did this pod become
//! ready", "is this KV key set to the value we expect".
//!
//! The retry loop is **not** a probe responsibility. A probe's
//! [`check`](Probe::check) returns one of three classifications via
//! [`ProbeAttempt`] — `Ok`, `Retry`, or `Fatal` — and [`run_probe`]
//! drives the polling timer + timeout budget around it. This keeps
//! probes single-responsibility and makes their unit tests
//! deterministic.
//!
//! Each public type here is cardinality-matched to the concept it
//! represents (see ADR-024 §2 and JG's *For the Love of Compilers*
//! talk). `ProbeName` is a validated newtype, not `String`, because
//! "an empty probe name" or "a probe name with a newline in it"
//! cannot mean anything useful and would corrupt the dashboard
//! rendering. `ProbeAttempt` is a three-arm sum, not `Result<(), Error>`,
//! because "not ready yet" and "definitively no" must drive different
//! orchestration paths.
use std::fmt;
use std::time::{Duration, Instant};
use async_trait::async_trait;
use thiserror::Error;
use tracing::{debug, warn};
use harmony::topology::Topology;
/// One attempted check by a probe.
///
/// The probe classifies its own observation rather than letting the
/// orchestrator guess: a 5xx response is fundamentally different from
/// a connection refused, even if both look like "failure" to a naive
/// caller. The orchestrator ([`run_probe`]) uses this classification
/// to decide whether to keep polling or bail out.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeAttempt {
/// The probe's invariant holds *right now*. Optional detail is
/// surfaced in the smoke report (e.g. "HTTP 200 in 12 ms",
/// "found 3 ready pods").
Ok(Option<String>),
/// The probe didn't observe what it's looking for yet, but the
/// state is consistent with eventual success. Keep polling until
/// the timeout. The string is the most recent observation, kept
/// so that a final timeout can quote the last thing we saw.
Retry(String),
/// The probe got a definitive negative answer — no amount of
/// further polling will change it. Skip the rest of the budget
/// and report failure now. Example: HTTP 404 from a URL that
/// must exist after deploy, or a `Service` with the wrong
/// selector.
Fatal(String),
}
/// Result of a full probe run after [`run_probe`] has consumed the
/// retry budget (or short-circuited).
///
/// This is the per-probe shape that lands in
/// [`crate::companion::smoke::SmokeReport`]. The orchestrator wraps
/// it with the [`ProbeName`] and the elapsed time so the dashboard
/// can render each stage independently.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ProbeOutcome {
/// At least one attempt returned [`ProbeAttempt::Ok`] within the
/// retry budget.
Pass {
elapsed: Duration,
attempts: u32,
detail: Option<String>,
},
/// Either we exhausted the budget with only `Retry`s (timeout)
/// or a single `Fatal` short-circuited us (rejected).
Fail {
elapsed: Duration,
attempts: u32,
failure: ProbeFailure,
},
}
impl ProbeOutcome {
pub fn passed(&self) -> bool {
matches!(self, Self::Pass { .. })
}
}
/// Why a probe failed. Two arms, deliberately — they map to two
/// different operator actions: "I waited and nothing happened" needs
/// a longer budget or a stuck-component investigation; "something
/// gave me a hard no" needs a configuration fix.
#[derive(Debug, Clone, PartialEq, Eq, Error)]
pub enum ProbeFailure {
#[error("timed out after {budget:?} (last observation: {last_observation})")]
Timeout {
budget: Duration,
last_observation: String,
},
#[error("rejected: {detail}")]
Rejected { detail: String },
}
/// Polling parameters for one probe.
///
/// `interval` is the wait between attempts. `timeout` is the total
/// wall-clock budget from the first attempt to the last; the loop
/// will not start a new attempt past this budget but will let an
/// in-flight one finish.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetryPolicy {
interval: Duration,
timeout: Duration,
}
impl RetryPolicy {
/// Standard polling shape: try, wait `interval`, try again, until
/// either an `Ok`/`Fatal` happens or `timeout` elapses.
///
/// Panics if `interval` is zero — a zero interval combined with
/// any non-trivial check would spin the executor and produce no
/// useful retry semantics. Express "run once" via [`Self::once`]
/// instead.
pub fn polling(interval: Duration, timeout: Duration) -> Self {
assert!(
!interval.is_zero(),
"RetryPolicy::polling requires a non-zero interval; use RetryPolicy::once for a single attempt",
);
Self { interval, timeout }
}
/// Single-shot: one attempt, no retries. Useful for probes whose
/// success criterion is binary the moment we check (e.g. "is
/// this string equal to that string").
pub fn once() -> Self {
Self {
interval: Duration::from_millis(1),
timeout: Duration::ZERO,
}
}
pub fn interval(&self) -> Duration {
self.interval
}
pub fn timeout(&self) -> Duration {
self.timeout
}
}
/// A validated probe name.
///
/// Probe names appear in `tracing` events, error messages, the smoke
/// report, and the eventual dashboard. Constraining them at
/// construction means the dashboard never has to defend against
/// surprising bytes (control chars, newlines, etc).
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct ProbeName(String);
#[derive(Debug, Error, PartialEq, Eq)]
pub enum InvalidProbeName {
#[error("probe name must not be empty")]
Empty,
#[error("probe name must not exceed 128 bytes (got {0})")]
TooLong(usize),
#[error("probe name must not contain control characters")]
ControlChar,
}
impl ProbeName {
pub fn try_new(s: impl Into<String>) -> Result<Self, InvalidProbeName> {
let s = s.into();
if s.is_empty() {
return Err(InvalidProbeName::Empty);
}
if s.len() > 128 {
return Err(InvalidProbeName::TooLong(s.len()));
}
if s.chars().any(|c| c.is_control()) {
return Err(InvalidProbeName::ControlChar);
}
Ok(Self(s))
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl fmt::Display for ProbeName {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.write_str(&self.0)
}
}
/// One pipeline stage of a [`SmokeSuite`](super::SmokeSuite).
///
/// Object-safe so a suite can hold a heterogeneous `Vec<Box<dyn
/// Probe<T>>>`. The associated retry behavior is supplied separately
/// by the suite (via [`RetryPolicy`]) so that the same probe type
/// can be reused with different budgets in different suites.
#[async_trait]
pub trait Probe<T: Topology>: Send + Sync + std::fmt::Debug {
fn name(&self) -> &ProbeName;
async fn check(&self, topology: &T) -> ProbeAttempt;
}
/// Drive a probe through its retry policy and return the consolidated
/// outcome.
///
/// Behavior:
///
/// - Every `interval` we run `probe.check(...)`. A successful attempt
/// wins immediately; a `Fatal` short-circuits to `Rejected`; a
/// `Retry` records its observation and we wait.
/// - If `timeout` is `Duration::ZERO` we run exactly one attempt
/// (the "once" shape).
/// - Otherwise we keep attempting until the elapsed wall time
/// exceeds `timeout`. The last `Retry` observation becomes the
/// `Timeout::last_observation` field so the failure message tells
/// the operator what we kept seeing.
///
/// The function emits a `tracing` event per attempt at `debug` level
/// and one at `warn` for the final failure (so a Phase-1 dashboard
/// can subscribe via a `tracing` layer without us needing to add a
/// new `HarmonyEvent` variant yet).
pub async fn run_probe<T, P>(probe: &P, topology: &T, policy: RetryPolicy) -> ProbeOutcome
where
T: Topology,
P: Probe<T> + ?Sized,
{
let start = Instant::now();
let mut attempts: u32 = 0;
loop {
attempts += 1;
// Each iteration produces either an early return (Ok / Fatal)
// or the most recent Retry observation. Threading that
// observation back into the next iteration only happens
// implicitly via the timeout check below, so there's no
// long-lived `last_observation` to lint over.
let last_observation = match probe.check(topology).await {
ProbeAttempt::Ok(detail) => {
debug!(
probe = %probe.name(),
attempts,
elapsed_ms = start.elapsed().as_millis() as u64,
"probe passed",
);
return ProbeOutcome::Pass {
elapsed: start.elapsed(),
attempts,
detail,
};
}
ProbeAttempt::Fatal(detail) => {
warn!(
probe = %probe.name(),
attempts,
%detail,
"probe rejected (fatal)",
);
return ProbeOutcome::Fail {
elapsed: start.elapsed(),
attempts,
failure: ProbeFailure::Rejected { detail },
};
}
ProbeAttempt::Retry(obs) => {
debug!(
probe = %probe.name(),
attempts,
observation = %obs,
"probe retry",
);
obs
}
};
if start.elapsed() >= policy.timeout {
warn!(
probe = %probe.name(),
attempts,
budget_ms = policy.timeout.as_millis() as u64,
last_observation = %last_observation,
"probe timed out",
);
return ProbeOutcome::Fail {
elapsed: start.elapsed(),
attempts,
failure: ProbeFailure::Timeout {
budget: policy.timeout,
last_observation,
},
};
}
tokio::time::sleep(policy.interval).await;
}
}
#[cfg(test)]
mod tests {
use super::*;
use harmony::topology::PreparationOutcome;
use std::sync::Mutex;
use std::sync::atomic::{AtomicU32, Ordering};
// -- ProbeName -----------------------------------------------------------
#[test]
fn probe_name_accepts_normal_strings() {
assert!(ProbeName::try_new("nats-reachable").is_ok());
assert!(ProbeName::try_new("HTTP / health").is_ok());
}
#[test]
fn probe_name_rejects_empty() {
assert_eq!(ProbeName::try_new(""), Err(InvalidProbeName::Empty));
}
#[test]
fn probe_name_rejects_control_chars() {
// Dashboard renders these as garbage — fail at construction.
assert_eq!(
ProbeName::try_new("bad\nname"),
Err(InvalidProbeName::ControlChar)
);
}
#[test]
fn probe_name_rejects_overlong() {
let long = "x".repeat(129);
assert_eq!(
ProbeName::try_new(long.clone()),
Err(InvalidProbeName::TooLong(129))
);
}
// -- RetryPolicy ---------------------------------------------------------
#[test]
fn retry_policy_once_is_single_attempt() {
let p = RetryPolicy::once();
assert_eq!(p.timeout(), Duration::ZERO);
}
#[test]
#[should_panic(expected = "non-zero interval")]
fn retry_policy_polling_rejects_zero_interval() {
// Zero interval + non-trivial check is a spin loop. The
// type system can't catch this, so a runtime assert at
// construction is the next best thing — fail loud, fail
// early, not on the operator's deploy log.
let _ = RetryPolicy::polling(Duration::ZERO, Duration::from_secs(5));
}
// -- run_probe orchestration --------------------------------------------
/// Test probe that returns a scripted sequence of attempts, then
/// loops forever on the last one. Lets us simulate
/// "retry-then-ok" without real network I/O.
#[derive(Debug)]
struct ScriptedProbe {
name: ProbeName,
script: Mutex<Vec<ProbeAttempt>>,
calls: AtomicU32,
}
impl ScriptedProbe {
fn new(name: &str, script: Vec<ProbeAttempt>) -> Self {
Self {
name: ProbeName::try_new(name).unwrap(),
script: Mutex::new(script),
calls: AtomicU32::new(0),
}
}
}
#[async_trait]
impl<T: Topology> Probe<T> for ScriptedProbe {
fn name(&self) -> &ProbeName {
&self.name
}
async fn check(&self, _t: &T) -> ProbeAttempt {
self.calls.fetch_add(1, Ordering::SeqCst);
let mut s = self.script.lock().unwrap();
if s.len() == 1 {
s[0].clone()
} else {
s.remove(0)
}
}
}
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(
&self,
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
#[tokio::test]
async fn ok_on_first_attempt_yields_pass() {
let probe = ScriptedProbe::new("instant", vec![ProbeAttempt::Ok(Some("hi".to_string()))]);
let outcome = run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(1), Duration::from_millis(50)),
)
.await;
match outcome {
ProbeOutcome::Pass {
attempts, detail, ..
} => {
assert_eq!(attempts, 1);
assert_eq!(detail.as_deref(), Some("hi"));
}
other => panic!("expected Pass, got {other:?}"),
}
}
#[tokio::test]
async fn retries_until_ok_within_budget() {
let probe = ScriptedProbe::new(
"eventual",
vec![
ProbeAttempt::Retry("not yet (1)".to_string()),
ProbeAttempt::Retry("not yet (2)".to_string()),
ProbeAttempt::Ok(None),
],
);
let outcome = run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(2), Duration::from_millis(500)),
)
.await;
match outcome {
ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 3),
other => panic!("expected Pass after retries, got {other:?}"),
}
}
#[tokio::test]
async fn fatal_short_circuits_without_consuming_full_budget() {
let probe = ScriptedProbe::new(
"definite-no",
vec![ProbeAttempt::Fatal("wrong selector".to_string())],
);
let start = Instant::now();
let outcome = run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(5), Duration::from_secs(10)),
)
.await;
// The whole point of Fatal is that we don't burn the budget.
// Generous upper bound — CI variance, not a real check.
assert!(
start.elapsed() < Duration::from_secs(1),
"Fatal should return immediately, took {:?}",
start.elapsed()
);
match outcome {
ProbeOutcome::Fail {
failure: ProbeFailure::Rejected { detail },
..
} => assert_eq!(detail, "wrong selector"),
other => panic!("expected Rejected, got {other:?}"),
}
}
#[tokio::test]
async fn always_retry_yields_timeout_with_last_observation() {
let probe = ScriptedProbe::new(
"never-ready",
vec![ProbeAttempt::Retry("still pending".to_string())],
);
let outcome = run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(5), Duration::from_millis(40)),
)
.await;
match outcome {
ProbeOutcome::Fail {
failure:
ProbeFailure::Timeout {
budget,
last_observation,
},
attempts,
..
} => {
assert_eq!(budget, Duration::from_millis(40));
assert_eq!(last_observation, "still pending");
assert!(
attempts >= 2,
"expected at least two attempts, got {attempts}"
);
}
other => panic!("expected Timeout, got {other:?}"),
}
}
}

View File

@@ -1,15 +0,0 @@
//! First-party [`Probe`](super::Probe) implementations.
//!
//! Phase 0 ships exactly one — [`TcpReachable`] — to validate the
//! trait shape against real network I/O. Phase 1 adds HTTP, K8s
//! pod-ready, NATS KV. Each future probe is one self-contained
//! file under this module.
//!
//! Probes that need their own crate dependencies (e.g. a future
//! `HelmReleaseHealthy` that talks to the helm client) can be
//! gated behind a Cargo feature here without touching the core
//! smoke types.
pub mod tcp;
pub use tcp::TcpReachable;

View File

@@ -1,225 +0,0 @@
//! `TcpReachable` — the smallest useful probe: open a TCP connection
//! to `host:port` within a timeout.
//!
//! Used as the first stage of practically every fleet smoke suite:
//! "before checking the operator's `/healthz`, prove the cluster IP
//! even routes". A connect-refused is a `Retry` (we may have caught
//! the service mid-startup); a DNS or address-syntax error is
//! `Fatal` (no amount of polling will fix it).
//!
//! Implemented with `tokio::net::TcpStream`. No new crate
//! dependencies — `tokio` is already pulled in with the `full`
//! feature by this crate's `Cargo.toml`.
use std::io;
use std::time::Duration;
use async_trait::async_trait;
use tokio::net::TcpStream;
use harmony::topology::Topology;
use crate::companion::smoke::probe::{Probe, ProbeAttempt, ProbeName};
/// Probe that connects to `host:port` and immediately drops the
/// socket. Pass on `connect succeeded`.
#[derive(Debug, Clone)]
pub struct TcpReachable {
name: ProbeName,
address: String,
connect_timeout: Duration,
}
impl TcpReachable {
/// Default connect timeout is 1s — short enough that a polling
/// retry sees several attempts; long enough that a slow host
/// doesn't false-fail. Override with
/// [`Self::with_connect_timeout`].
pub const DEFAULT_CONNECT_TIMEOUT: Duration = Duration::from_secs(1);
pub fn new(name: ProbeName, address: impl Into<String>) -> Self {
Self {
name,
address: address.into(),
connect_timeout: Self::DEFAULT_CONNECT_TIMEOUT,
}
}
pub fn with_connect_timeout(mut self, t: Duration) -> Self {
self.connect_timeout = t;
self
}
pub fn address(&self) -> &str {
&self.address
}
}
#[async_trait]
impl<T: Topology> Probe<T> for TcpReachable {
fn name(&self) -> &ProbeName {
&self.name
}
async fn check(&self, _topology: &T) -> ProbeAttempt {
let connect = TcpStream::connect(self.address.as_str());
match tokio::time::timeout(self.connect_timeout, connect).await {
Ok(Ok(_stream)) => ProbeAttempt::Ok(Some(format!("connected to {}", self.address))),
Ok(Err(e)) => classify_connect_error(&self.address, e),
Err(_elapsed) => ProbeAttempt::Retry(format!(
"connect to {} timed out after {:?}",
self.address, self.connect_timeout
)),
}
}
}
/// Most connect errors look like "not ready yet" — we want to keep
/// polling. A small set is definitive and should short-circuit the
/// retry budget. `InvalidInput` and `Unsupported` both mean the
/// address is unparseable / unroutable on this host — no number of
/// retries will fix that.
fn classify_connect_error(address: &str, e: io::Error) -> ProbeAttempt {
match e.kind() {
io::ErrorKind::InvalidInput => {
ProbeAttempt::Fatal(format!("invalid address {address}: {e}"))
}
io::ErrorKind::Unsupported => {
ProbeAttempt::Fatal(format!("unsupported address {address}: {e}"))
}
// ConnectionRefused / ConnectionReset / TimedOut /
// HostUnreachable / NetworkUnreachable / NotFound (rare) —
// all consistent with "service not up yet". Keep polling.
_ => ProbeAttempt::Retry(format!("connect to {address}: {e}")),
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::companion::smoke::probe::{ProbeFailure, ProbeOutcome, RetryPolicy, run_probe};
use async_trait::async_trait;
use harmony::topology::PreparationOutcome;
use tokio::net::TcpListener;
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(
&self,
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
fn name(s: &str) -> ProbeName {
ProbeName::try_new(s).unwrap()
}
#[tokio::test]
async fn passes_against_a_bound_listener() {
// Bind to a kernel-assigned port and prove the probe sees
// the listener. Using 127.0.0.1 keeps this hermetic — no
// outbound network, no flakiness from name resolution.
let listener = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = listener.local_addr().unwrap();
let probe = TcpReachable::new(name("local-listener"), addr.to_string());
match run_probe(&probe, &NoopTopology, RetryPolicy::once()).await {
ProbeOutcome::Pass { attempts, .. } => assert_eq!(attempts, 1),
other => panic!("expected pass, got {other:?}"),
}
}
#[tokio::test]
async fn retries_then_passes_when_listener_appears_late() {
// Pick a free port, then race: probe runs immediately, then
// we bind 30 ms later. The polling retry should catch it.
let pre = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = pre.local_addr().unwrap();
drop(pre); // free the port
let probe = TcpReachable::new(name("late-listener"), addr.to_string())
.with_connect_timeout(Duration::from_millis(50));
let (probe_result, _) = tokio::join!(
run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(10), Duration::from_secs(2)),
),
async {
tokio::time::sleep(Duration::from_millis(40)).await;
let listener = TcpListener::bind(addr).await.expect("rebind");
// Hold the listener until the probe is satisfied.
tokio::time::sleep(Duration::from_millis(500)).await;
drop(listener);
}
);
match probe_result {
ProbeOutcome::Pass { attempts, .. } => {
assert!(
attempts >= 2,
"expected at least 2 attempts, got {attempts}"
);
}
other => panic!("expected eventual pass, got {other:?}"),
}
}
#[tokio::test]
async fn times_out_on_unused_port() {
// Bind+drop to confirm the port is free in the kernel's
// sense, then probe against it with a tight budget. With
// nothing listening, the OS returns ECONNREFUSED, which we
// classify as Retry → budget expiry → Timeout.
let pre = TcpListener::bind("127.0.0.1:0").await.unwrap();
let addr = pre.local_addr().unwrap();
drop(pre);
let probe = TcpReachable::new(name("nothing-here"), addr.to_string())
.with_connect_timeout(Duration::from_millis(50));
let outcome = run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(20), Duration::from_millis(100)),
)
.await;
match outcome {
ProbeOutcome::Fail {
failure: ProbeFailure::Timeout { .. },
..
} => {}
other => panic!("expected Timeout, got {other:?}"),
}
}
#[tokio::test]
async fn fatal_on_unparseable_address() {
// Connect to a syntactically invalid address. No amount of
// retrying will help; we should short-circuit to Fatal.
let probe = TcpReachable::new(name("bad-addr"), "not-an-address");
let outcome = run_probe(
&probe,
&NoopTopology,
RetryPolicy::polling(Duration::from_millis(20), Duration::from_secs(5)),
)
.await;
match outcome {
ProbeOutcome::Fail {
failure: ProbeFailure::Rejected { detail },
attempts,
..
} => {
assert!(detail.contains("not-an-address"));
assert_eq!(attempts, 1, "Fatal must short-circuit on attempt 1");
}
other => panic!("expected Rejected, got {other:?}"),
}
}
}

View File

@@ -1,287 +0,0 @@
//! Smoke suite — an ordered sequence of [`Probe`]s with per-probe
//! retry budgets, and the report that comes back from running it.
//!
//! Suites are sequential by design in Phase 0. The mental model is
//! "first prove the basic connectivity, then prove the derived
//! state" — which is inherently ordered. Parallel stages are a
//! deferred extension; the public API never exposed sequentiality
//! as a guarantee, so adding `parallel_stage` later is additive.
//!
//! A [`SmokeReport`] is a pure value — easy to log, easy to
//! serialize for the dashboard, easy to assert against in tests. It
//! deliberately mirrors the
//! [`Outcome`](harmony::interpret::Outcome)/[`InterpretError`](harmony::interpret::InterpretError)
//! split that the rest of the framework uses: one type per "what
//! happened", not a Result soup.
use std::time::{Duration, Instant};
use harmony::topology::Topology;
use tracing::{info, info_span};
use super::probe::{Probe, ProbeName, ProbeOutcome, RetryPolicy, run_probe};
/// One stage in a [`SmokeSuite`]: a probe plus the retry budget that
/// applies to *this* invocation.
struct SmokeStage<T: Topology> {
probe: Box<dyn Probe<T>>,
policy: RetryPolicy,
}
/// An ordered, retry-aware composition of probes for one
/// [`SmokeTest`](super::SmokeTest).
///
/// Built via [`SmokeSuite::new`] + [`SmokeSuite::stage`]. Run via
/// [`SmokeSuite::run`]. The same probe type can appear multiple
/// times with different policies — that's the point of supplying
/// the policy at the stage level, not on the probe itself.
pub struct SmokeSuite<T: Topology> {
stages: Vec<SmokeStage<T>>,
}
impl<T: Topology> Default for SmokeSuite<T> {
fn default() -> Self {
Self::new()
}
}
impl<T: Topology> std::fmt::Debug for SmokeSuite<T> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SmokeSuite")
.field(
"stages",
&self
.stages
.iter()
.map(|s| s.probe.name().as_str())
.collect::<Vec<_>>(),
)
.finish()
}
}
impl<T: Topology> SmokeSuite<T> {
pub fn new() -> Self {
Self { stages: Vec::new() }
}
/// Append a stage. Builder-style so an `assemble` method reads
/// like a checklist:
///
/// ```ignore
/// SmokeSuite::new()
/// .stage(TcpReachable::new(name, "nats:4222"), RetryPolicy::polling(...))
/// .stage(HttpHealthy::new(name, "/healthz"), RetryPolicy::polling(...))
/// ```
pub fn stage<P>(mut self, probe: P, policy: RetryPolicy) -> Self
where
P: Probe<T> + 'static,
{
self.stages.push(SmokeStage {
probe: Box::new(probe) as Box<dyn Probe<T>>,
policy,
});
self
}
pub fn len(&self) -> usize {
self.stages.len()
}
pub fn is_empty(&self) -> bool {
self.stages.is_empty()
}
/// Drive every stage in declared order. Stops on the first
/// failure — a smoke test is a chain of preconditions, so
/// continuing past a failure rarely produces useful information
/// and risks flooding the dashboard with cascade failures.
pub async fn run(self, topology: &T) -> SmokeReport {
let span = info_span!("smoke_suite", probes = self.stages.len());
let _guard = span.enter();
let start = Instant::now();
let mut probes = Vec::with_capacity(self.stages.len());
for stage in self.stages {
let name = stage.probe.name().clone();
info!(probe = %name, "smoke stage started");
let outcome = run_probe(&*stage.probe, topology, stage.policy).await;
let passed = outcome.passed();
info!(
probe = %name,
passed,
elapsed_ms = elapsed_ms(&outcome),
"smoke stage finished",
);
probes.push(ProbeReport { name, outcome });
if !passed {
break;
}
}
SmokeReport {
probes,
elapsed: start.elapsed(),
}
}
}
fn elapsed_ms(outcome: &ProbeOutcome) -> u64 {
let d = match outcome {
ProbeOutcome::Pass { elapsed, .. } | ProbeOutcome::Fail { elapsed, .. } => *elapsed,
};
d.as_millis() as u64
}
/// What one probe stage produced. The `name` is repeated here (also
/// owned by the probe itself) so a `SmokeReport` is fully
/// self-contained — the probe `Box` is dropped after the suite runs.
#[derive(Debug, Clone)]
pub struct ProbeReport {
pub name: ProbeName,
pub outcome: ProbeOutcome,
}
impl ProbeReport {
pub fn passed(&self) -> bool {
self.outcome.passed()
}
}
/// What [`SmokeSuite::run`] returns. The dashboard reads probes top
/// to bottom; the orchestrator reads `passed()` to gate the deploy.
#[derive(Debug, Clone)]
pub struct SmokeReport {
pub probes: Vec<ProbeReport>,
pub elapsed: Duration,
}
impl SmokeReport {
/// `true` iff every probe in the suite passed.
///
/// An empty suite passes — there's nothing to fail. Callers who
/// want "a smoke test ran" semantics should check `!is_empty()`
/// in addition.
pub fn passed(&self) -> bool {
self.probes.iter().all(ProbeReport::passed)
}
pub fn is_empty(&self) -> bool {
self.probes.is_empty()
}
/// Iterator over only the failed probes — convenient for
/// rendering "X failures of Y" without filtering at the call
/// site.
pub fn failed(&self) -> impl Iterator<Item = &ProbeReport> {
self.probes.iter().filter(|p| !p.passed())
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::companion::smoke::probe::{ProbeAttempt, ProbeFailure};
use async_trait::async_trait;
use harmony::topology::PreparationOutcome;
#[derive(Debug)]
struct NoopTopology;
#[async_trait]
impl Topology for NoopTopology {
fn name(&self) -> &str {
"noop"
}
async fn ensure_ready(
&self,
) -> Result<PreparationOutcome, harmony::topology::PreparationError> {
Ok(PreparationOutcome::Noop)
}
}
#[derive(Debug)]
struct Always {
name: ProbeName,
attempt: ProbeAttempt,
}
impl Always {
fn new(name: &str, attempt: ProbeAttempt) -> Self {
Self {
name: ProbeName::try_new(name).unwrap(),
attempt,
}
}
}
#[async_trait]
impl<T: Topology> Probe<T> for Always {
fn name(&self) -> &ProbeName {
&self.name
}
async fn check(&self, _t: &T) -> ProbeAttempt {
self.attempt.clone()
}
}
#[tokio::test]
async fn empty_suite_passes_vacuously() {
let report = SmokeSuite::<NoopTopology>::new().run(&NoopTopology).await;
assert!(report.passed());
assert!(report.is_empty());
}
#[tokio::test]
async fn all_pass_stages_aggregates_pass() {
let report = SmokeSuite::<NoopTopology>::new()
.stage(
Always::new("a", ProbeAttempt::Ok(None)),
RetryPolicy::once(),
)
.stage(
Always::new("b", ProbeAttempt::Ok(Some("ok".to_string()))),
RetryPolicy::once(),
)
.run(&NoopTopology)
.await;
assert!(report.passed());
assert_eq!(report.probes.len(), 2);
assert!(report.failed().count() == 0);
}
#[tokio::test]
async fn first_failure_short_circuits_remaining_stages() {
// The third stage must never run. We assert by name —
// its presence in the report would prove a regression.
let report = SmokeSuite::<NoopTopology>::new()
.stage(
Always::new("a", ProbeAttempt::Ok(None)),
RetryPolicy::once(),
)
.stage(
Always::new("b", ProbeAttempt::Fatal("nope".to_string())),
RetryPolicy::once(),
)
.stage(
Always::new("c", ProbeAttempt::Ok(None)),
RetryPolicy::once(),
)
.run(&NoopTopology)
.await;
assert!(!report.passed());
assert_eq!(report.probes.len(), 2);
assert_eq!(report.probes[0].name.as_str(), "a");
assert_eq!(report.probes[1].name.as_str(), "b");
// Cascading failures are noise — confirm only the actual
// culprit appears.
match &report.probes[1].outcome {
ProbeOutcome::Fail {
failure: ProbeFailure::Rejected { detail },
..
} => assert_eq!(detail, "nope"),
other => panic!("expected Rejected, got {other:?}"),
}
}
}

View File

@@ -25,6 +25,13 @@
//! companion seam from P7. First-party `TcpReachable` probe ships
//! in this PR; HTTP/K8s/NATS probes and a real
//! `FleetOperatorSmokeTest` follow in Phase 1.
//! - [`companion::logs`] — the log-tail contract. `LogQuery`
//! companion trait, `LogChunk` value, `PodmanLogQuery` NATS
//! request/reply implementation. Powers the dashboard's "View
//! logs" UX. Trait + transport-side impl + unit tests ship in
//! this PR; the agent-side `Verb::Logs` handler and the operator
//! dashboard handler (`/deployments/<name>/devices/<id>/logs`)
//! land in follow-up PRs against the contract locked here.
//!
//! Out of scope for now:
//!
@@ -42,6 +49,7 @@ pub mod server;
pub use agent::{FleetAgentScore, PodTarget};
pub use companion::AgentObservation;
pub use companion::logs;
pub use companion::smoke;
pub use nats::{FleetNatsScore, UserPassCredentials};
pub use operator::{FleetOperatorScore, OperatorCredentials};

View File

@@ -15,6 +15,7 @@
//! messages on the inbox (streaming verbs set the `X-Harmony-Final`
//! header on the last frame).
use chrono::{DateTime, Utc};
use serde::{Deserialize, Serialize};
use harmony_types::id::Id;
@@ -42,6 +43,10 @@ pub const SUBJECT_PREFIX: &str = "device-commands";
#[serde(rename_all = "lowercase")]
pub enum Verb {
Ping,
/// Fetch the last N lines of a deployment's container logs from
/// the device. Single-shot reply; live tail (streaming) is a
/// later verb.
Logs,
}
impl Verb {
@@ -51,6 +56,7 @@ impl Verb {
pub fn as_subject_token(&self) -> &'static str {
match self {
Verb::Ping => "ping",
Verb::Logs => "logs",
}
}
}
@@ -67,12 +73,14 @@ pub fn device_command_subscription(device_id: &str) -> String {
format!("{SUBJECT_PREFIX}.{device_id}.>")
}
/// JSON body of an outbound command request. v1 carries only `Ping`
/// (which has no payload). Future verbs add their own struct + variant.
/// JSON body of an outbound command request. v1 carried only `Ping`;
/// `Logs` is the second verb. Future verbs add their own struct +
/// variant.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "verb", rename_all = "lowercase")]
pub enum CommandRequest {
Ping,
Logs(LogsRequest),
}
/// JSON body of a `Verb::Ping` reply.
@@ -83,6 +91,55 @@ pub struct PingReply {
pub uptime_s: u64,
}
/// JSON body of a `Verb::Logs` request.
///
/// The deployment lives in the body (not in the subject) so the
/// agent's existing wildcard subscription
/// `device-commands.<id>.>` keeps working unchanged — the verb stays
/// the trailing token. `lines` is bounded at deserialization on the
/// agent side; values above [`LOGS_MAX_LINES`] are clamped down so a
/// hostile operator can't ask for an unbounded transfer.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LogsRequest {
/// Which deployment on the target device to read logs from.
/// Validated by the typed [`crate::DeploymentName`] newtype.
pub deployment: crate::DeploymentName,
/// How many trailing lines to return. The agent clamps to
/// [`LOGS_MAX_LINES`] before invoking the runtime.
pub lines: u32,
}
/// JSON body of a `Verb::Logs` reply. Mirrors
/// `harmony_fleet_deploy::companion::logs::LogChunk` on the wire —
/// kept here (not in the deploy crate) so the agent build, which
/// must not depend on harmony, can construct and serialize it.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
pub struct LogsReply {
/// `ProbeName`-equivalent identifier for the log stream. The deploy
/// crate wraps this back into a validated `ProbeName` on receipt.
/// On the wire it's a `String` so the agent crate stays free of
/// the smoke-companion's validated-newtype surface.
pub source: String,
/// When the agent captured the logs. The dashboard renders this
/// alongside the lines so the user knows the chunk is fresh
/// (not a stale cache).
pub captured_at: DateTime<Utc>,
/// Tail lines, oldest-first, one container line per entry. Empty
/// when the container has produced no output yet.
pub lines: Vec<String>,
/// `true` when the agent's runtime returned a stream that
/// exceeded the per-request size budget and we cut it. Surfacing
/// this lets the dashboard show a "showing last N of more"
/// hint instead of silently truncating.
pub truncated: bool,
}
/// Hard upper bound on `LogsRequest::lines`. The agent clamps
/// requests above this. 1000 lines comfortably fits in a single
/// NATS message under the default 8 MiB max payload and matches the
/// `--tail` budget a typical operator wants from the dashboard.
pub const LOGS_MAX_LINES: u32 = 1000;
/// Stable error categories the agent reports on the reply payload
/// when a verb can't be handled. The operator-side client maps these
/// to its own typed error enum; everything else (no_responders,
@@ -155,4 +212,68 @@ mod tests {
let json = serde_json::to_string(&CommandRequest::Ping).unwrap();
assert_eq!(json, r#"{"verb":"ping"}"#);
}
#[test]
fn logs_subject_matches_documented_format() {
// Subject suffix is the verb token only — the deployment
// lives in the request body. Keeps the agent's existing
// wildcard subscription (`device-commands.<id>.>`) and
// permission template valid without change.
assert_eq!(
device_command_subject("pi-42", Verb::Logs),
"device-commands.pi-42.logs"
);
}
#[test]
fn logs_request_round_trips_through_json() {
let original = LogsRequest {
deployment: crate::DeploymentName::try_new("hello-web").expect("valid"),
lines: 100,
};
let json = serde_json::to_vec(&original).unwrap();
let back: LogsRequest = serde_json::from_slice(&json).unwrap();
assert_eq!(back, original);
}
#[test]
fn logs_reply_round_trips_through_json() {
let original = LogsReply {
source: "podman:hello-web".to_string(),
captured_at: chrono::DateTime::parse_from_rfc3339("2026-01-15T12:00:00Z")
.unwrap()
.with_timezone(&Utc),
lines: vec!["first".to_string(), "second".to_string()],
truncated: false,
};
let json = serde_json::to_vec(&original).unwrap();
let back: LogsReply = serde_json::from_slice(&json).unwrap();
assert_eq!(back, original);
}
#[test]
fn command_request_logs_tagged_form_is_stable() {
// Locking the wire shape: an older agent that learns about
// `Logs` must continue to recognize this exact bytes
// sequence as a Logs request.
let req = CommandRequest::Logs(LogsRequest {
deployment: crate::DeploymentName::try_new("hello-web").expect("valid"),
lines: 50,
});
let json = serde_json::to_string(&req).unwrap();
assert_eq!(
json,
r#"{"verb":"logs","deployment":"hello-web","lines":50}"#
);
}
#[test]
fn logs_max_lines_is_a_sane_budget() {
// The constant exists; assert it's not zero (which would
// make every legal request return no lines) and not so
// huge that a single NATS message would exceed the default
// server payload limit.
assert!(LOGS_MAX_LINES > 0);
assert!(LOGS_MAX_LINES <= 10_000);
}
}

View File

@@ -23,8 +23,8 @@ pub mod status;
pub use commands::{
CommandRequest, ErrorKind, ErrorReply, HDR_DEADLINE, HDR_FINAL, HDR_OPERATOR_SUB,
HDR_REQUEST_ID, PingReply, SUBJECT_PREFIX, Verb, device_command_subject,
device_command_subscription,
HDR_REQUEST_ID, LOGS_MAX_LINES, LogsReply, LogsRequest, PingReply, SUBJECT_PREFIX, Verb,
device_command_subject, device_command_subscription,
};
pub use fleet::{
DeploymentName, DeploymentState, DeviceInfo, HeartbeatPayload, InvalidDeploymentName,